tesser_data/
encoding.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use anyhow::{anyhow, Context, Result};
5use arrow::array::{
6    ArrayRef, Decimal128Builder, Int8Builder, StringBuilder, TimestampNanosecondBuilder,
7};
8use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use arrow::record_batch::RecordBatch;
10use chrono::{DateTime, Utc};
11use once_cell::sync::Lazy;
12use rust_decimal::prelude::RoundingStrategy;
13use rust_decimal::Decimal;
14use tracing::warn;
15
16use tesser_core::{Candle, Fill, Interval, Order, OrderStatus, OrderType, Tick, TimeInForce};
17
/// Precision (total decimal digits) of the `Decimal128` type used by this module's
/// decimal builders and schema fields.
const DECIMAL_PRECISION: u8 = 38;
/// Scale (fractional decimal digits) of the `Decimal128` type used by this module's
/// decimal builders and schema fields.
const DECIMAL_SCALE: i8 = 18;

/// Flipped to `true` the first time a value has to be rounded down to `DECIMAL_SCALE`,
/// so that the rounding warning is logged only once.
static DECIMAL_ROUND_WARNED: AtomicBool = AtomicBool::new(false);
22
23fn decimal_builder(capacity: usize) -> Decimal128Builder {
24    Decimal128Builder::with_capacity(capacity)
25        .with_data_type(DataType::Decimal128(DECIMAL_PRECISION, DECIMAL_SCALE))
26}
27
28fn timestamp_builder(capacity: usize) -> TimestampNanosecondBuilder {
29    TimestampNanosecondBuilder::with_capacity(capacity)
30        .with_data_type(DataType::Timestamp(TimeUnit::Nanosecond, None))
31}
32
33fn string_builder(capacity: usize) -> StringBuilder {
34    // Allocate a reasonable byte backing assuming ~16 bytes per value.
35    StringBuilder::with_capacity(capacity, capacity.saturating_mul(16))
36}
37
38fn decimal_field(name: &str, nullable: bool) -> Field {
39    Field::new(
40        name,
41        DataType::Decimal128(DECIMAL_PRECISION, DECIMAL_SCALE),
42        nullable,
43    )
44}
45
46fn timestamp_field(name: &str) -> Field {
47    Field::new(name, DataType::Timestamp(TimeUnit::Nanosecond, None), false)
48}
49
50static TICK_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
51    Arc::new(Schema::new(vec![
52        Field::new("symbol", DataType::Utf8, false),
53        decimal_field("price", false),
54        decimal_field("size", false),
55        Field::new("side", DataType::Int8, false),
56        timestamp_field("exchange_timestamp"),
57        timestamp_field("received_at"),
58    ]))
59});
60
61static CANDLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
62    Arc::new(Schema::new(vec![
63        Field::new("symbol", DataType::Utf8, false),
64        Field::new("interval", DataType::Utf8, false),
65        decimal_field("open", false),
66        decimal_field("high", false),
67        decimal_field("low", false),
68        decimal_field("close", false),
69        decimal_field("volume", false),
70        timestamp_field("timestamp"),
71    ]))
72});
73
74static FILL_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
75    Arc::new(Schema::new(vec![
76        Field::new("order_id", DataType::Utf8, false),
77        Field::new("symbol", DataType::Utf8, false),
78        Field::new("side", DataType::Int8, false),
79        decimal_field("fill_price", false),
80        decimal_field("fill_quantity", false),
81        decimal_field("fee", true),
82        timestamp_field("timestamp"),
83    ]))
84});
85
86static ORDER_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
87    Arc::new(Schema::new(vec![
88        Field::new("id", DataType::Utf8, false),
89        Field::new("symbol", DataType::Utf8, false),
90        Field::new("side", DataType::Int8, false),
91        Field::new("order_type", DataType::Utf8, false),
92        Field::new("status", DataType::Int8, false),
93        Field::new("time_in_force", DataType::Utf8, true),
94        decimal_field("quantity", false),
95        decimal_field("price", true),
96        decimal_field("trigger_price", true),
97        Field::new("client_order_id", DataType::Utf8, true),
98        decimal_field("take_profit", true),
99        decimal_field("stop_loss", true),
100        decimal_field("display_quantity", true),
101        decimal_field("filled_quantity", false),
102        decimal_field("avg_fill_price", true),
103        timestamp_field("created_at"),
104        timestamp_field("updated_at"),
105    ]))
106});
107
108/// Returns the schema used when encoding ticks.
109pub fn tick_schema() -> SchemaRef {
110    TICK_SCHEMA.clone()
111}
112
113/// Returns the schema used when encoding candles.
114pub fn candle_schema() -> SchemaRef {
115    CANDLE_SCHEMA.clone()
116}
117
118/// Returns the schema used when encoding fills.
119pub fn fill_schema() -> SchemaRef {
120    FILL_SCHEMA.clone()
121}
122
123/// Returns the schema used when encoding orders.
124pub fn order_schema() -> SchemaRef {
125    ORDER_SCHEMA.clone()
126}
127
128/// Converts a slice of ticks into a [`RecordBatch`].
129pub fn ticks_to_batch(rows: &[Tick]) -> Result<RecordBatch> {
130    let capacity = rows.len();
131    let mut symbols = string_builder(capacity);
132    let mut prices = decimal_builder(capacity);
133    let mut sizes = decimal_builder(capacity);
134    let mut sides = Int8Builder::with_capacity(capacity);
135    let mut exchange_ts = timestamp_builder(capacity);
136    let mut received_ts = timestamp_builder(capacity);
137
138    for tick in rows {
139        symbols.append_value(&tick.symbol);
140        let price = decimal_to_i128(tick.price)?;
141        prices.append_value(price);
142        let size = decimal_to_i128(tick.size)?;
143        sizes.append_value(size);
144        sides.append_value(tick.side.as_i8());
145        exchange_ts.append_value(timestamp_to_nanos(&tick.exchange_timestamp));
146        received_ts.append_value(timestamp_to_nanos(&tick.received_at));
147    }
148
149    let columns: Vec<ArrayRef> = vec![
150        Arc::new(symbols.finish()),
151        Arc::new(prices.finish()),
152        Arc::new(sizes.finish()),
153        Arc::new(sides.finish()),
154        Arc::new(exchange_ts.finish()),
155        Arc::new(received_ts.finish()),
156    ];
157
158    RecordBatch::try_new(tick_schema(), columns).context("failed to build tick batch")
159}
160
161/// Converts a slice of candles into a [`RecordBatch`].
162pub fn candles_to_batch(rows: &[Candle]) -> Result<RecordBatch> {
163    let capacity = rows.len();
164    let mut symbols = string_builder(capacity);
165    let mut intervals = string_builder(capacity);
166    let mut opens = decimal_builder(capacity);
167    let mut highs = decimal_builder(capacity);
168    let mut lows = decimal_builder(capacity);
169    let mut closes = decimal_builder(capacity);
170    let mut volumes = decimal_builder(capacity);
171    let mut timestamps = timestamp_builder(capacity);
172
173    for candle in rows {
174        symbols.append_value(&candle.symbol);
175        intervals.append_value(interval_label(candle.interval));
176        let open = decimal_to_i128(candle.open)?;
177        opens.append_value(open);
178        let high = decimal_to_i128(candle.high)?;
179        highs.append_value(high);
180        let low = decimal_to_i128(candle.low)?;
181        lows.append_value(low);
182        let close = decimal_to_i128(candle.close)?;
183        closes.append_value(close);
184        let volume = decimal_to_i128(candle.volume)?;
185        volumes.append_value(volume);
186        timestamps.append_value(timestamp_to_nanos(&candle.timestamp));
187    }
188
189    let columns: Vec<ArrayRef> = vec![
190        Arc::new(symbols.finish()),
191        Arc::new(intervals.finish()),
192        Arc::new(opens.finish()),
193        Arc::new(highs.finish()),
194        Arc::new(lows.finish()),
195        Arc::new(closes.finish()),
196        Arc::new(volumes.finish()),
197        Arc::new(timestamps.finish()),
198    ];
199
200    RecordBatch::try_new(candle_schema(), columns).context("failed to build candle batch")
201}
202
203/// Converts a slice of fills into a [`RecordBatch`].
204pub fn fills_to_batch(rows: &[Fill]) -> Result<RecordBatch> {
205    let capacity = rows.len();
206    let mut order_ids = string_builder(capacity);
207    let mut symbols = string_builder(capacity);
208    let mut sides = Int8Builder::with_capacity(capacity);
209    let mut prices = decimal_builder(capacity);
210    let mut quantities = decimal_builder(capacity);
211    let mut fees = decimal_builder(capacity);
212    let mut timestamps = timestamp_builder(capacity);
213
214    for fill in rows {
215        order_ids.append_value(&fill.order_id);
216        symbols.append_value(&fill.symbol);
217        sides.append_value(fill.side.as_i8());
218        let price = decimal_to_i128(fill.fill_price)?;
219        prices.append_value(price);
220        let qty = decimal_to_i128(fill.fill_quantity)?;
221        quantities.append_value(qty);
222        append_decimal_option(&mut fees, fill.fee)?;
223        timestamps.append_value(timestamp_to_nanos(&fill.timestamp));
224    }
225
226    let columns: Vec<ArrayRef> = vec![
227        Arc::new(order_ids.finish()),
228        Arc::new(symbols.finish()),
229        Arc::new(sides.finish()),
230        Arc::new(prices.finish()),
231        Arc::new(quantities.finish()),
232        Arc::new(fees.finish()),
233        Arc::new(timestamps.finish()),
234    ];
235
236    RecordBatch::try_new(fill_schema(), columns).context("failed to build fill batch")
237}
238
239/// Converts a slice of orders into a [`RecordBatch`].
240pub fn orders_to_batch(rows: &[Order]) -> Result<RecordBatch> {
241    let capacity = rows.len();
242    let mut ids = string_builder(capacity);
243    let mut symbols = string_builder(capacity);
244    let mut sides = Int8Builder::with_capacity(capacity);
245    let mut types = string_builder(capacity);
246    let mut statuses = Int8Builder::with_capacity(capacity);
247    let mut time_in_force = string_builder(capacity);
248    let mut quantities = decimal_builder(capacity);
249    let mut prices = decimal_builder(capacity);
250    let mut triggers = decimal_builder(capacity);
251    let mut client_order_ids = string_builder(capacity);
252    let mut take_profit = decimal_builder(capacity);
253    let mut stop_loss = decimal_builder(capacity);
254    let mut display_qty = decimal_builder(capacity);
255    let mut filled_qty = decimal_builder(capacity);
256    let mut avg_fill_price = decimal_builder(capacity);
257    let mut created = timestamp_builder(capacity);
258    let mut updated = timestamp_builder(capacity);
259
260    for order in rows {
261        let req = &order.request;
262        ids.append_value(&order.id);
263        symbols.append_value(&req.symbol);
264        sides.append_value(req.side.as_i8());
265        types.append_value(order_type_label(req.order_type));
266        statuses.append_value(status_code(order.status));
267        if let Some(tif) = req.time_in_force {
268            time_in_force.append_value(time_in_force_label(tif));
269        } else {
270            time_in_force.append_null();
271        }
272        let request_qty = decimal_to_i128(req.quantity)?;
273        quantities.append_value(request_qty);
274        append_decimal_option(&mut prices, req.price)?;
275        append_decimal_option(&mut triggers, req.trigger_price)?;
276        append_option_str(&mut client_order_ids, req.client_order_id.as_deref())?;
277        append_decimal_option(&mut take_profit, req.take_profit)?;
278        append_decimal_option(&mut stop_loss, req.stop_loss)?;
279        append_decimal_option(&mut display_qty, req.display_quantity)?;
280        let filled = decimal_to_i128(order.filled_quantity)?;
281        filled_qty.append_value(filled);
282        append_decimal_option(&mut avg_fill_price, order.avg_fill_price)?;
283        created.append_value(timestamp_to_nanos(&order.created_at));
284        updated.append_value(timestamp_to_nanos(&order.updated_at));
285    }
286
287    let columns: Vec<ArrayRef> = vec![
288        Arc::new(ids.finish()),
289        Arc::new(symbols.finish()),
290        Arc::new(sides.finish()),
291        Arc::new(types.finish()),
292        Arc::new(statuses.finish()),
293        Arc::new(time_in_force.finish()),
294        Arc::new(quantities.finish()),
295        Arc::new(prices.finish()),
296        Arc::new(triggers.finish()),
297        Arc::new(client_order_ids.finish()),
298        Arc::new(take_profit.finish()),
299        Arc::new(stop_loss.finish()),
300        Arc::new(display_qty.finish()),
301        Arc::new(filled_qty.finish()),
302        Arc::new(avg_fill_price.finish()),
303        Arc::new(created.finish()),
304        Arc::new(updated.finish()),
305    ];
306
307    RecordBatch::try_new(order_schema(), columns).context("failed to build order batch")
308}
309
310fn decimal_to_i128(value: Decimal) -> Result<i128> {
311    let scale_limit = DECIMAL_SCALE as i32;
312    let mut normalized = value;
313    if normalized.scale() as i32 > scale_limit {
314        if !DECIMAL_ROUND_WARNED.swap(true, Ordering::Relaxed) {
315            warn!(
316                original_scale = normalized.scale(),
317                target_scale = scale_limit,
318                "value scale exceeded flight recorder precision and will be rounded"
319            );
320        }
321        normalized = normalized
322            .round_dp_with_strategy(DECIMAL_SCALE as u32, RoundingStrategy::MidpointNearestEven);
323    }
324    let scale = normalized.scale() as i32;
325    if scale > scale_limit {
326        return Err(anyhow!(
327            "unable to normalize decimal with scale {} for flight recorder",
328            scale
329        ));
330    }
331    let diff = scale_limit - scale;
332    let factor = 10i128
333        .checked_pow(diff as u32)
334        .ok_or_else(|| anyhow!("decimal scaling factor overflow"))?;
335    normalized
336        .mantissa()
337        .checked_mul(factor)
338        .ok_or_else(|| anyhow!("decimal mantissa overflow"))
339}
340
341fn append_decimal_option(builder: &mut Decimal128Builder, value: Option<Decimal>) -> Result<()> {
342    if let Some(v) = value {
343        let scaled = decimal_to_i128(v)?;
344        builder.append_value(scaled);
345    } else {
346        builder.append_null();
347    }
348    Ok(())
349}
350
351fn append_option_str(builder: &mut StringBuilder, value: Option<&str>) -> Result<()> {
352    if let Some(text) = value {
353        builder.append_value(text);
354    } else {
355        builder.append_null();
356    }
357    Ok(())
358}
359
360fn timestamp_to_nanos(ts: &DateTime<Utc>) -> i64 {
361    ts.timestamp_nanos_opt()
362        .unwrap_or_else(|| ts.timestamp_micros() * 1_000)
363}
364
365fn interval_label(interval: Interval) -> &'static str {
366    match interval {
367        Interval::OneSecond => "1s",
368        Interval::OneMinute => "1m",
369        Interval::FiveMinutes => "5m",
370        Interval::FifteenMinutes => "15m",
371        Interval::OneHour => "1h",
372        Interval::FourHours => "4h",
373        Interval::OneDay => "1d",
374    }
375}
376
377fn order_type_label(order_type: OrderType) -> &'static str {
378    match order_type {
379        OrderType::Market => "market",
380        OrderType::Limit => "limit",
381        OrderType::StopMarket => "stop_market",
382    }
383}
384
385fn time_in_force_label(tif: TimeInForce) -> &'static str {
386    match tif {
387        TimeInForce::GoodTilCanceled => "gtc",
388        TimeInForce::ImmediateOrCancel => "ioc",
389        TimeInForce::FillOrKill => "fok",
390    }
391}
392
393fn status_code(status: OrderStatus) -> i8 {
394    match status {
395        OrderStatus::PendingNew => 0,
396        OrderStatus::Accepted => 1,
397        OrderStatus::PartiallyFilled => 2,
398        OrderStatus::Filled => 3,
399        OrderStatus::Canceled => 4,
400        OrderStatus::Rejected => 5,
401    }
402}