1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use anyhow::{anyhow, Context, Result};
5use arrow::array::{
6 ArrayRef, Decimal128Builder, Float64Builder, Int8Builder, ListBuilder, StringBuilder,
7 StructBuilder, TimestampNanosecondBuilder,
8};
9use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
10use arrow::record_batch::RecordBatch;
11use chrono::{DateTime, Utc};
12use once_cell::sync::Lazy;
13use rust_decimal::prelude::RoundingStrategy;
14use rust_decimal::Decimal;
15use serde_json::{json, Map};
16use tracing::warn;
17
18use tesser_core::{
19 Candle, ExecutionHint, Fill, Interval, Order, OrderBook, OrderBookLevel, OrderStatus,
20 OrderType, Signal, SignalKind, Tick, TimeInForce,
21};
22
/// Total number of decimal digits carried by the `Decimal128` columns built here.
const DECIMAL_PRECISION: u8 = 38;
/// Number of fractional digits carried by the `Decimal128` columns built here.
const DECIMAL_SCALE: i8 = 18;

/// Flag flipped the first time a value has to be rounded to `DECIMAL_SCALE`,
/// so that the accompanying warning is only logged once per process.
static DECIMAL_ROUND_WARNED: AtomicBool = AtomicBool::new(false);
27
28fn decimal_builder(capacity: usize) -> Decimal128Builder {
29 Decimal128Builder::with_capacity(capacity)
30 .with_data_type(DataType::Decimal128(DECIMAL_PRECISION, DECIMAL_SCALE))
31}
32
33fn timestamp_builder(capacity: usize) -> TimestampNanosecondBuilder {
34 TimestampNanosecondBuilder::with_capacity(capacity)
35 .with_data_type(DataType::Timestamp(TimeUnit::Nanosecond, None))
36}
37
38fn string_builder(capacity: usize) -> StringBuilder {
39 StringBuilder::with_capacity(capacity, capacity.saturating_mul(16))
41}
42
43fn decimal_field(name: &str, nullable: bool) -> Field {
44 Field::new(
45 name,
46 DataType::Decimal128(DECIMAL_PRECISION, DECIMAL_SCALE),
47 nullable,
48 )
49}
50
51fn timestamp_field(name: &str) -> Field {
52 Field::new(name, DataType::Timestamp(TimeUnit::Nanosecond, None), false)
53}
54
55static TICK_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
56 Arc::new(Schema::new(vec![
57 Field::new("symbol", DataType::Utf8, false),
58 decimal_field("price", false),
59 decimal_field("size", false),
60 Field::new("side", DataType::Int8, false),
61 timestamp_field("exchange_timestamp"),
62 timestamp_field("received_at"),
63 ]))
64});
65
66static CANDLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
67 Arc::new(Schema::new(vec![
68 Field::new("symbol", DataType::Utf8, false),
69 Field::new("interval", DataType::Utf8, false),
70 decimal_field("open", false),
71 decimal_field("high", false),
72 decimal_field("low", false),
73 decimal_field("close", false),
74 decimal_field("volume", false),
75 timestamp_field("timestamp"),
76 ]))
77});
78
79static FILL_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
80 Arc::new(Schema::new(vec![
81 Field::new("order_id", DataType::Utf8, false),
82 Field::new("symbol", DataType::Utf8, false),
83 Field::new("side", DataType::Int8, false),
84 decimal_field("fill_price", false),
85 decimal_field("fill_quantity", false),
86 decimal_field("fee", true),
87 timestamp_field("timestamp"),
88 ]))
89});
90
91static ORDER_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
92 Arc::new(Schema::new(vec![
93 Field::new("id", DataType::Utf8, false),
94 Field::new("symbol", DataType::Utf8, false),
95 Field::new("side", DataType::Int8, false),
96 Field::new("order_type", DataType::Utf8, false),
97 Field::new("status", DataType::Int8, false),
98 Field::new("time_in_force", DataType::Utf8, true),
99 decimal_field("quantity", false),
100 decimal_field("price", true),
101 decimal_field("trigger_price", true),
102 Field::new("client_order_id", DataType::Utf8, true),
103 decimal_field("take_profit", true),
104 decimal_field("stop_loss", true),
105 decimal_field("display_quantity", true),
106 decimal_field("filled_quantity", false),
107 decimal_field("avg_fill_price", true),
108 timestamp_field("created_at"),
109 timestamp_field("updated_at"),
110 ]))
111});
112
113fn order_book_level_fields() -> Fields {
114 Fields::from(vec![
115 decimal_field("price", false),
116 decimal_field("size", false),
117 ])
118}
119
120fn order_book_levels_field(name: &str) -> Field {
121 Field::new(
122 name,
123 DataType::List(Arc::new(Field::new(
124 "item",
125 DataType::Struct(order_book_level_fields()),
126 false,
127 ))),
128 false,
129 )
130}
131
132static ORDER_BOOK_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
133 Arc::new(Schema::new(vec![
134 timestamp_field("timestamp"),
135 Field::new("symbol", DataType::Utf8, false),
136 order_book_levels_field("bids"),
137 order_book_levels_field("asks"),
138 ]))
139});
140
141static SIGNAL_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
142 Arc::new(Schema::new(vec![
143 Field::new("id", DataType::Utf8, false),
144 Field::new("symbol", DataType::Utf8, false),
145 Field::new("kind", DataType::Utf8, false),
146 Field::new("confidence", DataType::Float64, false),
147 decimal_field("quantity", true),
148 decimal_field("stop_loss", true),
149 decimal_field("take_profit", true),
150 timestamp_field("generated_at"),
151 Field::new("metadata", DataType::Utf8, true),
152 Field::new("group_id", DataType::Utf8, true),
153 ]))
154});
155
156pub fn tick_schema() -> SchemaRef {
158 TICK_SCHEMA.clone()
159}
160
161pub fn candle_schema() -> SchemaRef {
163 CANDLE_SCHEMA.clone()
164}
165
166pub fn fill_schema() -> SchemaRef {
168 FILL_SCHEMA.clone()
169}
170
171pub fn order_schema() -> SchemaRef {
173 ORDER_SCHEMA.clone()
174}
175
176pub fn signal_schema() -> SchemaRef {
178 SIGNAL_SCHEMA.clone()
179}
180
181pub fn order_book_schema() -> SchemaRef {
183 ORDER_BOOK_SCHEMA.clone()
184}
185
186pub fn ticks_to_batch(rows: &[Tick]) -> Result<RecordBatch> {
188 let capacity = rows.len();
189 let mut symbols = string_builder(capacity);
190 let mut prices = decimal_builder(capacity);
191 let mut sizes = decimal_builder(capacity);
192 let mut sides = Int8Builder::with_capacity(capacity);
193 let mut exchange_ts = timestamp_builder(capacity);
194 let mut received_ts = timestamp_builder(capacity);
195
196 for tick in rows {
197 symbols.append_value(tick.symbol);
198 let price = decimal_to_i128(tick.price)?;
199 prices.append_value(price);
200 let size = decimal_to_i128(tick.size)?;
201 sizes.append_value(size);
202 sides.append_value(tick.side.as_i8());
203 exchange_ts.append_value(timestamp_to_nanos(&tick.exchange_timestamp));
204 received_ts.append_value(timestamp_to_nanos(&tick.received_at));
205 }
206
207 let columns: Vec<ArrayRef> = vec![
208 Arc::new(symbols.finish()),
209 Arc::new(prices.finish()),
210 Arc::new(sizes.finish()),
211 Arc::new(sides.finish()),
212 Arc::new(exchange_ts.finish()),
213 Arc::new(received_ts.finish()),
214 ];
215
216 RecordBatch::try_new(tick_schema(), columns).context("failed to build tick batch")
217}
218
219pub fn candles_to_batch(rows: &[Candle]) -> Result<RecordBatch> {
221 let capacity = rows.len();
222 let mut symbols = string_builder(capacity);
223 let mut intervals = string_builder(capacity);
224 let mut opens = decimal_builder(capacity);
225 let mut highs = decimal_builder(capacity);
226 let mut lows = decimal_builder(capacity);
227 let mut closes = decimal_builder(capacity);
228 let mut volumes = decimal_builder(capacity);
229 let mut timestamps = timestamp_builder(capacity);
230
231 for candle in rows {
232 symbols.append_value(candle.symbol);
233 intervals.append_value(interval_label(candle.interval));
234 let open = decimal_to_i128(candle.open)?;
235 opens.append_value(open);
236 let high = decimal_to_i128(candle.high)?;
237 highs.append_value(high);
238 let low = decimal_to_i128(candle.low)?;
239 lows.append_value(low);
240 let close = decimal_to_i128(candle.close)?;
241 closes.append_value(close);
242 let volume = decimal_to_i128(candle.volume)?;
243 volumes.append_value(volume);
244 timestamps.append_value(timestamp_to_nanos(&candle.timestamp));
245 }
246
247 let columns: Vec<ArrayRef> = vec![
248 Arc::new(symbols.finish()),
249 Arc::new(intervals.finish()),
250 Arc::new(opens.finish()),
251 Arc::new(highs.finish()),
252 Arc::new(lows.finish()),
253 Arc::new(closes.finish()),
254 Arc::new(volumes.finish()),
255 Arc::new(timestamps.finish()),
256 ];
257
258 RecordBatch::try_new(candle_schema(), columns).context("failed to build candle batch")
259}
260
261pub fn fills_to_batch(rows: &[Fill]) -> Result<RecordBatch> {
263 let capacity = rows.len();
264 let mut order_ids = string_builder(capacity);
265 let mut symbols = string_builder(capacity);
266 let mut sides = Int8Builder::with_capacity(capacity);
267 let mut prices = decimal_builder(capacity);
268 let mut quantities = decimal_builder(capacity);
269 let mut fees = decimal_builder(capacity);
270 let mut timestamps = timestamp_builder(capacity);
271
272 for fill in rows {
273 order_ids.append_value(&fill.order_id);
274 symbols.append_value(fill.symbol);
275 sides.append_value(fill.side.as_i8());
276 let price = decimal_to_i128(fill.fill_price)?;
277 prices.append_value(price);
278 let qty = decimal_to_i128(fill.fill_quantity)?;
279 quantities.append_value(qty);
280 append_decimal_option(&mut fees, fill.fee)?;
281 timestamps.append_value(timestamp_to_nanos(&fill.timestamp));
282 }
283
284 let columns: Vec<ArrayRef> = vec![
285 Arc::new(order_ids.finish()),
286 Arc::new(symbols.finish()),
287 Arc::new(sides.finish()),
288 Arc::new(prices.finish()),
289 Arc::new(quantities.finish()),
290 Arc::new(fees.finish()),
291 Arc::new(timestamps.finish()),
292 ];
293
294 RecordBatch::try_new(fill_schema(), columns).context("failed to build fill batch")
295}
296
297pub fn orders_to_batch(rows: &[Order]) -> Result<RecordBatch> {
299 let capacity = rows.len();
300 let mut ids = string_builder(capacity);
301 let mut symbols = string_builder(capacity);
302 let mut sides = Int8Builder::with_capacity(capacity);
303 let mut types = string_builder(capacity);
304 let mut statuses = Int8Builder::with_capacity(capacity);
305 let mut time_in_force = string_builder(capacity);
306 let mut quantities = decimal_builder(capacity);
307 let mut prices = decimal_builder(capacity);
308 let mut triggers = decimal_builder(capacity);
309 let mut client_order_ids = string_builder(capacity);
310 let mut take_profit = decimal_builder(capacity);
311 let mut stop_loss = decimal_builder(capacity);
312 let mut display_qty = decimal_builder(capacity);
313 let mut filled_qty = decimal_builder(capacity);
314 let mut avg_fill_price = decimal_builder(capacity);
315 let mut created = timestamp_builder(capacity);
316 let mut updated = timestamp_builder(capacity);
317
318 for order in rows {
319 let req = &order.request;
320 ids.append_value(&order.id);
321 symbols.append_value(req.symbol);
322 sides.append_value(req.side.as_i8());
323 types.append_value(order_type_label(req.order_type));
324 statuses.append_value(status_code(order.status));
325 if let Some(tif) = req.time_in_force {
326 time_in_force.append_value(time_in_force_label(tif));
327 } else {
328 time_in_force.append_null();
329 }
330 let request_qty = decimal_to_i128(req.quantity)?;
331 quantities.append_value(request_qty);
332 append_decimal_option(&mut prices, req.price)?;
333 append_decimal_option(&mut triggers, req.trigger_price)?;
334 append_option_str(&mut client_order_ids, req.client_order_id.as_deref())?;
335 append_decimal_option(&mut take_profit, req.take_profit)?;
336 append_decimal_option(&mut stop_loss, req.stop_loss)?;
337 append_decimal_option(&mut display_qty, req.display_quantity)?;
338 let filled = decimal_to_i128(order.filled_quantity)?;
339 filled_qty.append_value(filled);
340 append_decimal_option(&mut avg_fill_price, order.avg_fill_price)?;
341 created.append_value(timestamp_to_nanos(&order.created_at));
342 updated.append_value(timestamp_to_nanos(&order.updated_at));
343 }
344
345 let columns: Vec<ArrayRef> = vec![
346 Arc::new(ids.finish()),
347 Arc::new(symbols.finish()),
348 Arc::new(sides.finish()),
349 Arc::new(types.finish()),
350 Arc::new(statuses.finish()),
351 Arc::new(time_in_force.finish()),
352 Arc::new(quantities.finish()),
353 Arc::new(prices.finish()),
354 Arc::new(triggers.finish()),
355 Arc::new(client_order_ids.finish()),
356 Arc::new(take_profit.finish()),
357 Arc::new(stop_loss.finish()),
358 Arc::new(display_qty.finish()),
359 Arc::new(filled_qty.finish()),
360 Arc::new(avg_fill_price.finish()),
361 Arc::new(created.finish()),
362 Arc::new(updated.finish()),
363 ];
364
365 RecordBatch::try_new(order_schema(), columns).context("failed to build order batch")
366}
367
368pub fn signals_to_batch(rows: &[Signal]) -> Result<RecordBatch> {
370 let capacity = rows.len();
371 let mut ids = string_builder(capacity);
372 let mut symbols = string_builder(capacity);
373 let mut kinds = string_builder(capacity);
374 let mut confidences = Float64Builder::with_capacity(capacity);
375 let mut quantities = decimal_builder(capacity);
376 let mut stop_losses = decimal_builder(capacity);
377 let mut take_profits = decimal_builder(capacity);
378 let mut generated = timestamp_builder(capacity);
379 let mut metadata = string_builder(capacity);
380 let mut group_ids = string_builder(capacity);
381
382 for signal in rows {
383 ids.append_value(signal.id.to_string());
384 symbols.append_value(signal.symbol);
385 kinds.append_value(signal_kind_label(signal.kind));
386 confidences.append_value(signal.confidence);
387 append_decimal_option(&mut quantities, signal.quantity)?;
388 append_decimal_option(&mut stop_losses, signal.stop_loss)?;
389 append_decimal_option(&mut take_profits, signal.take_profit)?;
390 generated.append_value(timestamp_to_nanos(&signal.generated_at));
391 if let Some(meta) = signal_metadata(signal) {
392 metadata.append_value(meta);
393 } else {
394 metadata.append_null();
395 }
396 if let Some(group_id) = signal.group_id {
397 group_ids.append_value(group_id.to_string());
398 } else {
399 group_ids.append_null();
400 }
401 }
402
403 let columns: Vec<ArrayRef> = vec![
404 Arc::new(ids.finish()),
405 Arc::new(symbols.finish()),
406 Arc::new(kinds.finish()),
407 Arc::new(confidences.finish()),
408 Arc::new(quantities.finish()),
409 Arc::new(stop_losses.finish()),
410 Arc::new(take_profits.finish()),
411 Arc::new(generated.finish()),
412 Arc::new(metadata.finish()),
413 Arc::new(group_ids.finish()),
414 ];
415
416 RecordBatch::try_new(signal_schema(), columns).context("failed to build signal batch")
417}
418
419pub fn order_books_to_batch(rows: &[OrderBook]) -> Result<RecordBatch> {
421 let capacity = rows.len();
422 let mut timestamps = timestamp_builder(capacity);
423 let mut symbols = string_builder(capacity);
424 let mut bids = level_list_builder(capacity);
425 let mut asks = level_list_builder(capacity);
426
427 for book in rows {
428 timestamps.append_value(timestamp_to_nanos(&book.timestamp));
429 symbols.append_value(book.symbol);
430 append_order_book_levels(&mut bids, &book.bids)?;
431 append_order_book_levels(&mut asks, &book.asks)?;
432 }
433
434 let columns: Vec<ArrayRef> = vec![
435 Arc::new(timestamps.finish()),
436 Arc::new(symbols.finish()),
437 Arc::new(bids.finish()),
438 Arc::new(asks.finish()),
439 ];
440
441 RecordBatch::try_new(order_book_schema(), columns).context("failed to build order book batch")
442}
443
444fn level_list_builder(capacity: usize) -> ListBuilder<StructBuilder> {
445 let fields = order_book_level_fields();
446 let struct_builder = StructBuilder::from_fields(fields.clone(), capacity);
447 ListBuilder::with_capacity(struct_builder, capacity).with_field(Arc::new(Field::new(
448 "item",
449 DataType::Struct(fields),
450 false,
451 )))
452}
453
454fn append_order_book_levels(
455 builder: &mut ListBuilder<StructBuilder>,
456 levels: &[OrderBookLevel],
457) -> Result<()> {
458 {
459 let values = builder.values();
460 for level in levels {
461 let price = decimal_to_i128(level.price)?;
462 let size = decimal_to_i128(level.size)?;
463 values
464 .field_builder::<Decimal128Builder>(0)
465 .ok_or_else(|| anyhow!("order book builder missing price field"))?
466 .append_value(price);
467 values
468 .field_builder::<Decimal128Builder>(1)
469 .ok_or_else(|| anyhow!("order book builder missing size field"))?
470 .append_value(size);
471 values.append(true);
472 }
473 }
474 builder.append(true);
475 Ok(())
476}
477
478fn decimal_to_i128(value: Decimal) -> Result<i128> {
479 let scale_limit = DECIMAL_SCALE as i32;
480 let mut normalized = value;
481 if normalized.scale() as i32 > scale_limit {
482 if !DECIMAL_ROUND_WARNED.swap(true, Ordering::Relaxed) {
483 warn!(
484 original_scale = normalized.scale(),
485 target_scale = scale_limit,
486 "value scale exceeded flight recorder precision and will be rounded"
487 );
488 }
489 normalized = normalized
490 .round_dp_with_strategy(DECIMAL_SCALE as u32, RoundingStrategy::MidpointNearestEven);
491 }
492 let scale = normalized.scale() as i32;
493 if scale > scale_limit {
494 return Err(anyhow!(
495 "unable to normalize decimal with scale {} for flight recorder",
496 scale
497 ));
498 }
499 let diff = scale_limit - scale;
500 let factor = 10i128
501 .checked_pow(diff as u32)
502 .ok_or_else(|| anyhow!("decimal scaling factor overflow"))?;
503 normalized
504 .mantissa()
505 .checked_mul(factor)
506 .ok_or_else(|| anyhow!("decimal mantissa overflow"))
507}
508
509fn append_decimal_option(builder: &mut Decimal128Builder, value: Option<Decimal>) -> Result<()> {
510 if let Some(v) = value {
511 let scaled = decimal_to_i128(v)?;
512 builder.append_value(scaled);
513 } else {
514 builder.append_null();
515 }
516 Ok(())
517}
518
519fn append_option_str(builder: &mut StringBuilder, value: Option<&str>) -> Result<()> {
520 if let Some(text) = value {
521 builder.append_value(text);
522 } else {
523 builder.append_null();
524 }
525 Ok(())
526}
527
528fn timestamp_to_nanos(ts: &DateTime<Utc>) -> i64 {
529 ts.timestamp_nanos_opt()
530 .unwrap_or_else(|| ts.timestamp_micros() * 1_000)
531}
532
533fn interval_label(interval: Interval) -> &'static str {
534 match interval {
535 Interval::OneSecond => "1s",
536 Interval::OneMinute => "1m",
537 Interval::FiveMinutes => "5m",
538 Interval::FifteenMinutes => "15m",
539 Interval::OneHour => "1h",
540 Interval::FourHours => "4h",
541 Interval::OneDay => "1d",
542 }
543}
544
545fn order_type_label(order_type: OrderType) -> &'static str {
546 match order_type {
547 OrderType::Market => "market",
548 OrderType::Limit => "limit",
549 OrderType::StopMarket => "stop_market",
550 }
551}
552
553fn time_in_force_label(tif: TimeInForce) -> &'static str {
554 match tif {
555 TimeInForce::GoodTilCanceled => "gtc",
556 TimeInForce::ImmediateOrCancel => "ioc",
557 TimeInForce::FillOrKill => "fok",
558 }
559}
560
561fn status_code(status: OrderStatus) -> i8 {
562 match status {
563 OrderStatus::PendingNew => 0,
564 OrderStatus::Accepted => 1,
565 OrderStatus::PartiallyFilled => 2,
566 OrderStatus::Filled => 3,
567 OrderStatus::Canceled => 4,
568 OrderStatus::Rejected => 5,
569 }
570}
571
572fn signal_metadata(signal: &Signal) -> Option<String> {
573 let note = signal.note.as_deref();
574 let hint = signal.execution_hint.as_ref().map(execution_hint_metadata);
575 if note.is_none() && hint.is_none() {
576 return None;
577 }
578 let mut payload = Map::new();
579 if let Some(note) = note {
580 payload.insert("note".into(), json!(note));
581 }
582 if let Some(hint) = hint {
583 payload.insert("execution_hint".into(), hint);
584 }
585 serde_json::to_string(&serde_json::Value::Object(payload)).ok()
586}
587
588fn execution_hint_metadata(hint: &ExecutionHint) -> serde_json::Value {
589 match hint {
590 ExecutionHint::Twap { duration } => json!({
591 "type": "twap",
592 "duration_ms": duration.num_milliseconds(),
593 }),
594 ExecutionHint::Vwap {
595 duration,
596 participation_rate,
597 } => json!({
598 "type": "vwap",
599 "duration_ms": duration.num_milliseconds(),
600 "participation_rate": participation_rate.as_ref().map(|d| d.to_string()),
601 }),
602 ExecutionHint::IcebergSimulated {
603 display_size,
604 limit_offset_bps,
605 } => json!({
606 "type": "iceberg",
607 "display_size": display_size.to_string(),
608 "limit_offset_bps": limit_offset_bps.as_ref().map(|d| d.to_string()),
609 }),
610 ExecutionHint::PeggedBest {
611 offset_bps,
612 clip_size,
613 refresh_secs,
614 min_chase_distance,
615 } => json!({
616 "type": "pegged_best",
617 "offset_bps": offset_bps.to_string(),
618 "clip_size": clip_size.as_ref().map(|d| d.to_string()),
619 "refresh_secs": refresh_secs,
620 "min_chase_distance": min_chase_distance.as_ref().map(|d| d.to_string()),
621 }),
622 ExecutionHint::Sniper {
623 trigger_price,
624 timeout,
625 } => json!({
626 "type": "sniper",
627 "trigger_price": trigger_price.to_string(),
628 "timeout_ms": timeout.map(|d| d.num_milliseconds()),
629 }),
630 ExecutionHint::TrailingStop {
631 activation_price,
632 callback_rate,
633 } => json!({
634 "type": "trailing_stop",
635 "activation_price": activation_price.to_string(),
636 "callback_rate": callback_rate.to_string(),
637 }),
638 ExecutionHint::Plugin { name, params } => json!({
639 "type": "plugin",
640 "name": name,
641 "params": params,
642 }),
643 }
644}
645
646fn signal_kind_label(kind: SignalKind) -> &'static str {
647 match kind {
648 SignalKind::EnterLong => "enter_long",
649 SignalKind::ExitLong => "exit_long",
650 SignalKind::EnterShort => "enter_short",
651 SignalKind::ExitShort => "exit_short",
652 SignalKind::Flatten => "flatten",
653 }
654}