1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use anyhow::{anyhow, Context, Result};
5use arrow::array::{
6 ArrayRef, Decimal128Builder, Int8Builder, StringBuilder, TimestampNanosecondBuilder,
7};
8use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use arrow::record_batch::RecordBatch;
10use chrono::{DateTime, Utc};
11use once_cell::sync::Lazy;
12use rust_decimal::prelude::RoundingStrategy;
13use rust_decimal::Decimal;
14use tracing::warn;
15
16use tesser_core::{Candle, Fill, Interval, Order, OrderStatus, OrderType, Tick, TimeInForce};
17
/// Total number of decimal digits (precision) declared for every `Decimal128`
/// column built by `decimal_field` / `decimal_builder`.
const DECIMAL_PRECISION: u8 = 38;
/// Number of fractional digits (scale) declared for every `Decimal128` column;
/// `decimal_to_i128` rescales (rounding if necessary) values to this scale.
const DECIMAL_SCALE: i8 = 18;

/// Set the first time `decimal_to_i128` has to round a value, so the rounding
/// warning is logged at most once per process.
static DECIMAL_ROUND_WARNED: AtomicBool = AtomicBool::new(false);
22
23fn decimal_builder(capacity: usize) -> Decimal128Builder {
24 Decimal128Builder::with_capacity(capacity)
25 .with_data_type(DataType::Decimal128(DECIMAL_PRECISION, DECIMAL_SCALE))
26}
27
28fn timestamp_builder(capacity: usize) -> TimestampNanosecondBuilder {
29 TimestampNanosecondBuilder::with_capacity(capacity)
30 .with_data_type(DataType::Timestamp(TimeUnit::Nanosecond, None))
31}
32
33fn string_builder(capacity: usize) -> StringBuilder {
34 StringBuilder::with_capacity(capacity, capacity.saturating_mul(16))
36}
37
38fn decimal_field(name: &str, nullable: bool) -> Field {
39 Field::new(
40 name,
41 DataType::Decimal128(DECIMAL_PRECISION, DECIMAL_SCALE),
42 nullable,
43 )
44}
45
46fn timestamp_field(name: &str) -> Field {
47 Field::new(name, DataType::Timestamp(TimeUnit::Nanosecond, None), false)
48}
49
50static TICK_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
51 Arc::new(Schema::new(vec![
52 Field::new("symbol", DataType::Utf8, false),
53 decimal_field("price", false),
54 decimal_field("size", false),
55 Field::new("side", DataType::Int8, false),
56 timestamp_field("exchange_timestamp"),
57 timestamp_field("received_at"),
58 ]))
59});
60
61static CANDLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
62 Arc::new(Schema::new(vec![
63 Field::new("symbol", DataType::Utf8, false),
64 Field::new("interval", DataType::Utf8, false),
65 decimal_field("open", false),
66 decimal_field("high", false),
67 decimal_field("low", false),
68 decimal_field("close", false),
69 decimal_field("volume", false),
70 timestamp_field("timestamp"),
71 ]))
72});
73
74static FILL_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
75 Arc::new(Schema::new(vec![
76 Field::new("order_id", DataType::Utf8, false),
77 Field::new("symbol", DataType::Utf8, false),
78 Field::new("side", DataType::Int8, false),
79 decimal_field("fill_price", false),
80 decimal_field("fill_quantity", false),
81 decimal_field("fee", true),
82 timestamp_field("timestamp"),
83 ]))
84});
85
86static ORDER_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
87 Arc::new(Schema::new(vec![
88 Field::new("id", DataType::Utf8, false),
89 Field::new("symbol", DataType::Utf8, false),
90 Field::new("side", DataType::Int8, false),
91 Field::new("order_type", DataType::Utf8, false),
92 Field::new("status", DataType::Int8, false),
93 Field::new("time_in_force", DataType::Utf8, true),
94 decimal_field("quantity", false),
95 decimal_field("price", true),
96 decimal_field("trigger_price", true),
97 Field::new("client_order_id", DataType::Utf8, true),
98 decimal_field("take_profit", true),
99 decimal_field("stop_loss", true),
100 decimal_field("display_quantity", true),
101 decimal_field("filled_quantity", false),
102 decimal_field("avg_fill_price", true),
103 timestamp_field("created_at"),
104 timestamp_field("updated_at"),
105 ]))
106});
107
108pub fn tick_schema() -> SchemaRef {
110 TICK_SCHEMA.clone()
111}
112
113pub fn candle_schema() -> SchemaRef {
115 CANDLE_SCHEMA.clone()
116}
117
118pub fn fill_schema() -> SchemaRef {
120 FILL_SCHEMA.clone()
121}
122
123pub fn order_schema() -> SchemaRef {
125 ORDER_SCHEMA.clone()
126}
127
128pub fn ticks_to_batch(rows: &[Tick]) -> Result<RecordBatch> {
130 let capacity = rows.len();
131 let mut symbols = string_builder(capacity);
132 let mut prices = decimal_builder(capacity);
133 let mut sizes = decimal_builder(capacity);
134 let mut sides = Int8Builder::with_capacity(capacity);
135 let mut exchange_ts = timestamp_builder(capacity);
136 let mut received_ts = timestamp_builder(capacity);
137
138 for tick in rows {
139 symbols.append_value(&tick.symbol);
140 let price = decimal_to_i128(tick.price)?;
141 prices.append_value(price);
142 let size = decimal_to_i128(tick.size)?;
143 sizes.append_value(size);
144 sides.append_value(tick.side.as_i8());
145 exchange_ts.append_value(timestamp_to_nanos(&tick.exchange_timestamp));
146 received_ts.append_value(timestamp_to_nanos(&tick.received_at));
147 }
148
149 let columns: Vec<ArrayRef> = vec![
150 Arc::new(symbols.finish()),
151 Arc::new(prices.finish()),
152 Arc::new(sizes.finish()),
153 Arc::new(sides.finish()),
154 Arc::new(exchange_ts.finish()),
155 Arc::new(received_ts.finish()),
156 ];
157
158 RecordBatch::try_new(tick_schema(), columns).context("failed to build tick batch")
159}
160
161pub fn candles_to_batch(rows: &[Candle]) -> Result<RecordBatch> {
163 let capacity = rows.len();
164 let mut symbols = string_builder(capacity);
165 let mut intervals = string_builder(capacity);
166 let mut opens = decimal_builder(capacity);
167 let mut highs = decimal_builder(capacity);
168 let mut lows = decimal_builder(capacity);
169 let mut closes = decimal_builder(capacity);
170 let mut volumes = decimal_builder(capacity);
171 let mut timestamps = timestamp_builder(capacity);
172
173 for candle in rows {
174 symbols.append_value(&candle.symbol);
175 intervals.append_value(interval_label(candle.interval));
176 let open = decimal_to_i128(candle.open)?;
177 opens.append_value(open);
178 let high = decimal_to_i128(candle.high)?;
179 highs.append_value(high);
180 let low = decimal_to_i128(candle.low)?;
181 lows.append_value(low);
182 let close = decimal_to_i128(candle.close)?;
183 closes.append_value(close);
184 let volume = decimal_to_i128(candle.volume)?;
185 volumes.append_value(volume);
186 timestamps.append_value(timestamp_to_nanos(&candle.timestamp));
187 }
188
189 let columns: Vec<ArrayRef> = vec![
190 Arc::new(symbols.finish()),
191 Arc::new(intervals.finish()),
192 Arc::new(opens.finish()),
193 Arc::new(highs.finish()),
194 Arc::new(lows.finish()),
195 Arc::new(closes.finish()),
196 Arc::new(volumes.finish()),
197 Arc::new(timestamps.finish()),
198 ];
199
200 RecordBatch::try_new(candle_schema(), columns).context("failed to build candle batch")
201}
202
203pub fn fills_to_batch(rows: &[Fill]) -> Result<RecordBatch> {
205 let capacity = rows.len();
206 let mut order_ids = string_builder(capacity);
207 let mut symbols = string_builder(capacity);
208 let mut sides = Int8Builder::with_capacity(capacity);
209 let mut prices = decimal_builder(capacity);
210 let mut quantities = decimal_builder(capacity);
211 let mut fees = decimal_builder(capacity);
212 let mut timestamps = timestamp_builder(capacity);
213
214 for fill in rows {
215 order_ids.append_value(&fill.order_id);
216 symbols.append_value(&fill.symbol);
217 sides.append_value(fill.side.as_i8());
218 let price = decimal_to_i128(fill.fill_price)?;
219 prices.append_value(price);
220 let qty = decimal_to_i128(fill.fill_quantity)?;
221 quantities.append_value(qty);
222 append_decimal_option(&mut fees, fill.fee)?;
223 timestamps.append_value(timestamp_to_nanos(&fill.timestamp));
224 }
225
226 let columns: Vec<ArrayRef> = vec![
227 Arc::new(order_ids.finish()),
228 Arc::new(symbols.finish()),
229 Arc::new(sides.finish()),
230 Arc::new(prices.finish()),
231 Arc::new(quantities.finish()),
232 Arc::new(fees.finish()),
233 Arc::new(timestamps.finish()),
234 ];
235
236 RecordBatch::try_new(fill_schema(), columns).context("failed to build fill batch")
237}
238
239pub fn orders_to_batch(rows: &[Order]) -> Result<RecordBatch> {
241 let capacity = rows.len();
242 let mut ids = string_builder(capacity);
243 let mut symbols = string_builder(capacity);
244 let mut sides = Int8Builder::with_capacity(capacity);
245 let mut types = string_builder(capacity);
246 let mut statuses = Int8Builder::with_capacity(capacity);
247 let mut time_in_force = string_builder(capacity);
248 let mut quantities = decimal_builder(capacity);
249 let mut prices = decimal_builder(capacity);
250 let mut triggers = decimal_builder(capacity);
251 let mut client_order_ids = string_builder(capacity);
252 let mut take_profit = decimal_builder(capacity);
253 let mut stop_loss = decimal_builder(capacity);
254 let mut display_qty = decimal_builder(capacity);
255 let mut filled_qty = decimal_builder(capacity);
256 let mut avg_fill_price = decimal_builder(capacity);
257 let mut created = timestamp_builder(capacity);
258 let mut updated = timestamp_builder(capacity);
259
260 for order in rows {
261 let req = &order.request;
262 ids.append_value(&order.id);
263 symbols.append_value(&req.symbol);
264 sides.append_value(req.side.as_i8());
265 types.append_value(order_type_label(req.order_type));
266 statuses.append_value(status_code(order.status));
267 if let Some(tif) = req.time_in_force {
268 time_in_force.append_value(time_in_force_label(tif));
269 } else {
270 time_in_force.append_null();
271 }
272 let request_qty = decimal_to_i128(req.quantity)?;
273 quantities.append_value(request_qty);
274 append_decimal_option(&mut prices, req.price)?;
275 append_decimal_option(&mut triggers, req.trigger_price)?;
276 append_option_str(&mut client_order_ids, req.client_order_id.as_deref())?;
277 append_decimal_option(&mut take_profit, req.take_profit)?;
278 append_decimal_option(&mut stop_loss, req.stop_loss)?;
279 append_decimal_option(&mut display_qty, req.display_quantity)?;
280 let filled = decimal_to_i128(order.filled_quantity)?;
281 filled_qty.append_value(filled);
282 append_decimal_option(&mut avg_fill_price, order.avg_fill_price)?;
283 created.append_value(timestamp_to_nanos(&order.created_at));
284 updated.append_value(timestamp_to_nanos(&order.updated_at));
285 }
286
287 let columns: Vec<ArrayRef> = vec![
288 Arc::new(ids.finish()),
289 Arc::new(symbols.finish()),
290 Arc::new(sides.finish()),
291 Arc::new(types.finish()),
292 Arc::new(statuses.finish()),
293 Arc::new(time_in_force.finish()),
294 Arc::new(quantities.finish()),
295 Arc::new(prices.finish()),
296 Arc::new(triggers.finish()),
297 Arc::new(client_order_ids.finish()),
298 Arc::new(take_profit.finish()),
299 Arc::new(stop_loss.finish()),
300 Arc::new(display_qty.finish()),
301 Arc::new(filled_qty.finish()),
302 Arc::new(avg_fill_price.finish()),
303 Arc::new(created.finish()),
304 Arc::new(updated.finish()),
305 ];
306
307 RecordBatch::try_new(order_schema(), columns).context("failed to build order batch")
308}
309
310fn decimal_to_i128(value: Decimal) -> Result<i128> {
311 let scale_limit = DECIMAL_SCALE as i32;
312 let mut normalized = value;
313 if normalized.scale() as i32 > scale_limit {
314 if !DECIMAL_ROUND_WARNED.swap(true, Ordering::Relaxed) {
315 warn!(
316 original_scale = normalized.scale(),
317 target_scale = scale_limit,
318 "value scale exceeded flight recorder precision and will be rounded"
319 );
320 }
321 normalized = normalized
322 .round_dp_with_strategy(DECIMAL_SCALE as u32, RoundingStrategy::MidpointNearestEven);
323 }
324 let scale = normalized.scale() as i32;
325 if scale > scale_limit {
326 return Err(anyhow!(
327 "unable to normalize decimal with scale {} for flight recorder",
328 scale
329 ));
330 }
331 let diff = scale_limit - scale;
332 let factor = 10i128
333 .checked_pow(diff as u32)
334 .ok_or_else(|| anyhow!("decimal scaling factor overflow"))?;
335 normalized
336 .mantissa()
337 .checked_mul(factor)
338 .ok_or_else(|| anyhow!("decimal mantissa overflow"))
339}
340
341fn append_decimal_option(builder: &mut Decimal128Builder, value: Option<Decimal>) -> Result<()> {
342 if let Some(v) = value {
343 let scaled = decimal_to_i128(v)?;
344 builder.append_value(scaled);
345 } else {
346 builder.append_null();
347 }
348 Ok(())
349}
350
351fn append_option_str(builder: &mut StringBuilder, value: Option<&str>) -> Result<()> {
352 if let Some(text) = value {
353 builder.append_value(text);
354 } else {
355 builder.append_null();
356 }
357 Ok(())
358}
359
360fn timestamp_to_nanos(ts: &DateTime<Utc>) -> i64 {
361 ts.timestamp_nanos_opt()
362 .unwrap_or_else(|| ts.timestamp_micros() * 1_000)
363}
364
365fn interval_label(interval: Interval) -> &'static str {
366 match interval {
367 Interval::OneSecond => "1s",
368 Interval::OneMinute => "1m",
369 Interval::FiveMinutes => "5m",
370 Interval::FifteenMinutes => "15m",
371 Interval::OneHour => "1h",
372 Interval::FourHours => "4h",
373 Interval::OneDay => "1d",
374 }
375}
376
377fn order_type_label(order_type: OrderType) -> &'static str {
378 match order_type {
379 OrderType::Market => "market",
380 OrderType::Limit => "limit",
381 OrderType::StopMarket => "stop_market",
382 }
383}
384
385fn time_in_force_label(tif: TimeInForce) -> &'static str {
386 match tif {
387 TimeInForce::GoodTilCanceled => "gtc",
388 TimeInForce::ImmediateOrCancel => "ioc",
389 TimeInForce::FillOrKill => "fok",
390 }
391}
392
393fn status_code(status: OrderStatus) -> i8 {
394 match status {
395 OrderStatus::PendingNew => 0,
396 OrderStatus::Accepted => 1,
397 OrderStatus::PartiallyFilled => 2,
398 OrderStatus::Filled => 3,
399 OrderStatus::Canceled => 4,
400 OrderStatus::Rejected => 5,
401 }
402}