1use std::collections::HashMap;
2use std::fs::{self, File};
3use std::path::{Path, PathBuf};
4
5use anyhow::{anyhow, bail, Context, Result};
6use arrow::array::{Array, Decimal128Array, Int8Array, StringArray, TimestampNanosecondArray};
7use arrow::datatypes::SchemaRef;
8use arrow::record_batch::RecordBatch;
9use chrono::{DateTime, Utc};
10use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11use rust_decimal::Decimal;
12use tesser_core::{Side, Symbol};
13
14#[derive(Debug, Clone)]
16pub struct ExecutionAnalysisRequest {
17 pub data_dir: PathBuf,
19 pub start: Option<DateTime<Utc>>,
21 pub end: Option<DateTime<Utc>>,
23}
24
25#[derive(Debug, Clone)]
27pub struct ExecutionStats {
28 pub label: String,
29 pub order_count: usize,
30 pub fill_count: usize,
31 pub orders_with_arrival: usize,
32 pub filled_quantity: Decimal,
33 pub notional: Decimal,
34 pub total_fees: Decimal,
35 pub implementation_shortfall: Decimal,
36 pub avg_slippage_bps: Option<Decimal>,
37}
38
39impl ExecutionStats {
40 fn empty(label: impl Into<String>) -> Self {
41 Self {
42 label: label.into(),
43 order_count: 0,
44 fill_count: 0,
45 orders_with_arrival: 0,
46 filled_quantity: Decimal::ZERO,
47 notional: Decimal::ZERO,
48 total_fees: Decimal::ZERO,
49 implementation_shortfall: Decimal::ZERO,
50 avg_slippage_bps: None,
51 }
52 }
53}
54
55#[derive(Debug, Clone)]
57pub struct ExecutionReport {
58 pub period_start: Option<DateTime<Utc>>,
59 pub period_end: Option<DateTime<Utc>>,
60 pub totals: ExecutionStats,
61 pub per_algo: Vec<ExecutionStats>,
62 pub skipped_orders: usize,
63}
64
65fn bps_factor() -> Decimal {
66 Decimal::new(10_000, 0)
67}
68
69pub fn analyze_execution(request: &ExecutionAnalysisRequest) -> Result<ExecutionReport> {
71 let range = TimeRange::new(request.start, request.end)?;
72 let orders_dir = request.data_dir.join("orders");
73 let fills_dir = request.data_dir.join("fills");
74 if !orders_dir.exists() {
75 bail!(
76 "orders directory missing at {}",
77 orders_dir.to_string_lossy()
78 );
79 }
80 if !fills_dir.exists() {
81 bail!("fills directory missing at {}", fills_dir.to_string_lossy());
82 }
83 let ticks_dir = request.data_dir.join("ticks");
84
85 let order_paths = collect_parquet_files(&orders_dir)?;
86 if order_paths.is_empty() {
87 bail!(
88 "no parquet files found under {}",
89 orders_dir.to_string_lossy()
90 );
91 }
92 let fill_paths = collect_parquet_files(&fills_dir)?;
93 if fill_paths.is_empty() {
94 bail!(
95 "no parquet files found under {}",
96 fills_dir.to_string_lossy()
97 );
98 }
99 let tick_paths = collect_parquet_files(&ticks_dir)?;
100
101 let orders = load_orders(&order_paths, &range)?;
102 if orders.is_empty() {
103 return Ok(ExecutionReport {
104 period_start: request.start,
105 period_end: request.end,
106 totals: ExecutionStats::empty("ALL"),
107 per_algo: Vec::new(),
108 skipped_orders: 0,
109 });
110 }
111 let fills = load_fills(&fill_paths)?;
112 let ticks = load_ticks(&tick_paths)?;
113
114 let mut fills_by_order: HashMap<String, Vec<FillRow>> = HashMap::new();
115 for fill in fills {
116 fills_by_order
117 .entry(fill.order_id.clone())
118 .or_default()
119 .push(fill);
120 }
121
122 let arrival_lookup = ArrivalLookup::new(ticks);
123 let mut aggregator = StatsAggregator::new();
124 let mut skipped = 0usize;
125
126 for order in orders {
127 let Some(fill_rows) = fills_by_order.get(&order.id) else {
128 skipped += 1;
129 continue;
130 };
131 if fill_rows.is_empty() {
132 skipped += 1;
133 continue;
134 }
135 match summarize_order(&order, fill_rows, &arrival_lookup) {
136 Some(summary) => aggregator.record(&order.algo_label, &summary),
137 None => skipped += 1,
138 }
139 }
140
141 let (totals, mut per_algo) = aggregator.finish();
142 per_algo.sort_by(|a, b| b.notional.cmp(&a.notional));
143
144 Ok(ExecutionReport {
145 period_start: request.start,
146 period_end: request.end,
147 totals,
148 per_algo,
149 skipped_orders: skipped,
150 })
151}
152
153struct StatsAggregator {
154 totals: StatsAccumulator,
155 groups: HashMap<String, StatsAccumulator>,
156}
157
158impl StatsAggregator {
159 fn new() -> Self {
160 Self {
161 totals: StatsAccumulator::new("ALL"),
162 groups: HashMap::new(),
163 }
164 }
165
166 fn record(&mut self, label: &str, summary: &OrderSummary) {
167 self.totals.ingest(summary);
168 self.groups
169 .entry(label.to_string())
170 .or_insert_with(|| StatsAccumulator::new(label))
171 .ingest(summary);
172 }
173
174 fn finish(self) -> (ExecutionStats, Vec<ExecutionStats>) {
175 let totals = self.totals.into_stats();
176 let groups = self
177 .groups
178 .into_values()
179 .map(|acc| acc.into_stats())
180 .collect();
181 (totals, groups)
182 }
183}
184
185struct StatsAccumulator {
186 stats: ExecutionStats,
187 slippage_weighted_sum: Decimal,
188 slippage_weight: Decimal,
189}
190
191impl StatsAccumulator {
192 fn new(label: &str) -> Self {
193 Self {
194 stats: ExecutionStats::empty(label),
195 slippage_weighted_sum: Decimal::ZERO,
196 slippage_weight: Decimal::ZERO,
197 }
198 }
199
200 fn ingest(&mut self, summary: &OrderSummary) {
201 self.stats.order_count += 1;
202 self.stats.fill_count += summary.fill_count;
203 if summary.has_arrival {
204 self.stats.orders_with_arrival += 1;
205 }
206 self.stats.filled_quantity += summary.filled_quantity;
207 self.stats.notional += summary.notional;
208 self.stats.total_fees += summary.total_fees;
209 self.stats.implementation_shortfall += summary.shortfall_value;
210 if let Some(bps) = summary.slippage_bps {
211 self.slippage_weighted_sum += bps * summary.filled_quantity;
212 self.slippage_weight += summary.filled_quantity;
213 }
214 }
215
216 fn into_stats(mut self) -> ExecutionStats {
217 self.stats.avg_slippage_bps = if self.slippage_weight > Decimal::ZERO {
218 Some(self.slippage_weighted_sum / self.slippage_weight)
219 } else {
220 None
221 };
222 self.stats
223 }
224}
225
226#[derive(Clone)]
227struct OrderRow {
228 id: String,
229 symbol: Symbol,
230 side: Side,
231 created_at: DateTime<Utc>,
232 algo_label: String,
233}
234
235#[derive(Clone)]
236struct FillRow {
237 order_id: String,
238 price: Decimal,
239 quantity: Decimal,
240 fee: Decimal,
241}
242
243struct OrderSummary {
244 fill_count: usize,
245 filled_quantity: Decimal,
246 notional: Decimal,
247 total_fees: Decimal,
248 slippage_bps: Option<Decimal>,
249 shortfall_value: Decimal,
250 has_arrival: bool,
251}
252
253struct ArrivalLookup {
254 ticks: HashMap<Symbol, Vec<TickPoint>>,
255}
256
257impl ArrivalLookup {
258 fn new(rows: Vec<TickPoint>) -> Self {
259 let mut ticks: HashMap<Symbol, Vec<TickPoint>> = HashMap::new();
260 for row in rows {
261 ticks.entry(row.symbol.clone()).or_default().push(row);
262 }
263 for series in ticks.values_mut() {
264 series.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
265 }
266 Self { ticks }
267 }
268
269 fn price_at(&self, symbol: &str, timestamp: DateTime<Utc>) -> Option<Decimal> {
270 let series = self.ticks.get(symbol)?;
271 if series.is_empty() {
272 return None;
273 }
274 let idx = series.partition_point(|point| point.timestamp <= timestamp);
275 if idx == 0 {
276 Some(series[0].price)
277 } else {
278 Some(series[idx - 1].price)
279 }
280 }
281}
282
283#[derive(Clone)]
284struct TickPoint {
285 symbol: Symbol,
286 price: Decimal,
287 timestamp: DateTime<Utc>,
288}
289
290#[derive(Clone)]
291struct TimeRange {
292 start: Option<DateTime<Utc>>,
293 end: Option<DateTime<Utc>>,
294}
295
296impl TimeRange {
297 fn new(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> Result<Self> {
298 if let (Some(s), Some(e)) = (start, end) {
299 if e < s {
300 bail!("end time must be after start time");
301 }
302 }
303 Ok(Self { start, end })
304 }
305
306 fn contains(&self, ts: DateTime<Utc>) -> bool {
307 if let Some(start) = self.start {
308 if ts < start {
309 return false;
310 }
311 }
312 if let Some(end) = self.end {
313 if ts > end {
314 return false;
315 }
316 }
317 true
318 }
319}
320
321fn summarize_order(
322 order: &OrderRow,
323 fills: &[FillRow],
324 arrival_lookup: &ArrivalLookup,
325) -> Option<OrderSummary> {
326 if fills.is_empty() {
327 return None;
328 }
329
330 let mut filled_quantity = Decimal::ZERO;
331 let mut notional = Decimal::ZERO;
332 let mut total_fees = Decimal::ZERO;
333 for fill in fills {
334 filled_quantity += fill.quantity;
335 notional += fill.price * fill.quantity;
336 total_fees += fill.fee;
337 }
338 if filled_quantity <= Decimal::ZERO {
339 return None;
340 }
341 let avg_fill_price = notional / filled_quantity;
342 let arrival = arrival_lookup.price_at(&order.symbol, order.created_at);
343 let mut slippage_bps = None;
344 let mut shortfall_value = Decimal::ZERO;
345 if let Some(arrival_price) = arrival {
346 if arrival_price > Decimal::ZERO {
347 let price_delta = (avg_fill_price - arrival_price) * side_sign(order.side);
348 shortfall_value = price_delta * filled_quantity;
349 let ratio = price_delta / arrival_price;
350 slippage_bps = Some(ratio * bps_factor());
351 }
352 }
353 Some(OrderSummary {
354 fill_count: fills.len(),
355 filled_quantity,
356 notional,
357 total_fees,
358 slippage_bps,
359 shortfall_value,
360 has_arrival: arrival.is_some(),
361 })
362}
363
364fn side_sign(side: Side) -> Decimal {
365 match side {
366 Side::Buy => Decimal::ONE,
367 Side::Sell => -Decimal::ONE,
368 }
369}
370
371fn collect_parquet_files(dir: &Path) -> Result<Vec<PathBuf>> {
372 if !dir.exists() {
373 return Ok(Vec::new());
374 }
375 let mut stack = vec![dir.to_path_buf()];
376 let mut files = Vec::new();
377 while let Some(path) = stack.pop() {
378 let metadata = fs::metadata(&path)
379 .with_context(|| format!("failed to inspect {}", path.to_string_lossy()))?;
380 if metadata.is_dir() {
381 for entry in fs::read_dir(&path)
382 .with_context(|| format!("failed to list {}", path.to_string_lossy()))?
383 {
384 let entry = entry?;
385 stack.push(entry.path());
386 }
387 } else if path
388 .extension()
389 .and_then(|ext| ext.to_str())
390 .map(|ext| ext.eq_ignore_ascii_case("parquet"))
391 .unwrap_or(false)
392 {
393 files.push(path);
394 }
395 }
396 files.sort();
397 Ok(files)
398}
399
400fn load_orders(paths: &[PathBuf], range: &TimeRange) -> Result<Vec<OrderRow>> {
401 let mut rows = Vec::new();
402 for path in paths {
403 let file =
404 File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
405 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
406 .with_batch_size(4096)
407 .build()?;
408 let mut columns: Option<OrderColumns> = None;
409 for batch in reader {
410 let batch = batch?;
411 if columns.is_none() {
412 columns = Some(OrderColumns::from_schema(&batch.schema())?);
413 }
414 let columns = columns.as_ref().expect("order columns should be set");
415 for row in 0..batch.num_rows() {
416 let created_at = timestamp_value(&batch, columns.created_at, row)?;
417 if !range.contains(created_at) {
418 continue;
419 }
420 let id = string_value(&batch, columns.id, row)?;
421 let symbol = string_value(&batch, columns.symbol, row)?;
422 let side = side_value(&batch, columns.side, row)?;
423 let client_order_id = string_option(&batch, columns.client_order_id, row)?;
424 let algo_label = infer_algo_label(client_order_id.as_deref());
425 rows.push(OrderRow {
426 id,
427 symbol,
428 side,
429 created_at,
430 algo_label,
431 });
432 }
433 }
434 }
435 Ok(rows)
436}
437
438fn load_fills(paths: &[PathBuf]) -> Result<Vec<FillRow>> {
439 let mut rows = Vec::new();
440 for path in paths {
441 let file =
442 File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
443 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
444 .with_batch_size(4096)
445 .build()?;
446 let mut columns: Option<FillColumns> = None;
447 for batch in reader {
448 let batch = batch?;
449 if columns.is_none() {
450 columns = Some(FillColumns::from_schema(&batch.schema())?);
451 }
452 let columns = columns.as_ref().expect("fill columns set");
453 for row in 0..batch.num_rows() {
454 rows.push(FillRow {
455 order_id: string_value(&batch, columns.order_id, row)?,
456 price: decimal_value(&batch, columns.price, row)?,
457 quantity: decimal_value(&batch, columns.quantity, row)?,
458 fee: decimal_option(&batch, columns.fee, row)?.unwrap_or(Decimal::ZERO),
459 });
460 }
461 }
462 }
463 Ok(rows)
464}
465
466fn load_ticks(paths: &[PathBuf]) -> Result<Vec<TickPoint>> {
467 let mut rows = Vec::new();
468 for path in paths {
469 let file =
470 File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
471 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
472 .with_batch_size(4096)
473 .build()?;
474 let mut columns: Option<TickColumns> = None;
475 for batch in reader {
476 let batch = batch?;
477 if columns.is_none() {
478 columns = Some(TickColumns::from_schema(&batch.schema())?);
479 }
480 let columns = columns.as_ref().expect("tick columns set");
481 for row in 0..batch.num_rows() {
482 rows.push(TickPoint {
483 symbol: string_value(&batch, columns.symbol, row)?,
484 price: decimal_value(&batch, columns.price, row)?,
485 timestamp: timestamp_value(&batch, columns.exchange_ts, row)?,
486 });
487 }
488 }
489 }
490 Ok(rows)
491}
492
493fn infer_algo_label(client_order_id: Option<&str>) -> String {
494 let value = client_order_id.unwrap_or("unlabeled");
495 let normalized = value.to_ascii_lowercase();
496 if normalized.starts_with("twap") {
497 "TWAP".to_string()
498 } else if normalized.starts_with("vwap") {
499 "VWAP".to_string()
500 } else if normalized.starts_with("iceberg") {
501 "ICEBERG".to_string()
502 } else if normalized.starts_with("pegged") {
503 "PEGGED".to_string()
504 } else if normalized.starts_with("sniper") {
505 "SNIPER".to_string()
506 } else if normalized.ends_with("-sl") {
507 "STOP_LOSS".to_string()
508 } else if normalized.ends_with("-tp") {
509 "TAKE_PROFIT".to_string()
510 } else {
511 "SIGNAL".to_string()
512 }
513}
514
515struct OrderColumns {
516 id: usize,
517 symbol: usize,
518 side: usize,
519 client_order_id: usize,
520 created_at: usize,
521}
522
523impl OrderColumns {
524 fn from_schema(schema: &SchemaRef) -> Result<Self> {
525 Ok(Self {
526 id: column_index(schema, "id")?,
527 symbol: column_index(schema, "symbol")?,
528 side: column_index(schema, "side")?,
529 client_order_id: column_index(schema, "client_order_id")?,
530 created_at: column_index(schema, "created_at")?,
531 })
532 }
533}
534
535struct FillColumns {
536 order_id: usize,
537 price: usize,
538 quantity: usize,
539 fee: usize,
540}
541
542impl FillColumns {
543 fn from_schema(schema: &SchemaRef) -> Result<Self> {
544 Ok(Self {
545 order_id: column_index(schema, "order_id")?,
546 price: column_index(schema, "fill_price")?,
547 quantity: column_index(schema, "fill_quantity")?,
548 fee: column_index(schema, "fee")?,
549 })
550 }
551}
552
553struct TickColumns {
554 symbol: usize,
555 price: usize,
556 exchange_ts: usize,
557}
558
559impl TickColumns {
560 fn from_schema(schema: &SchemaRef) -> Result<Self> {
561 Ok(Self {
562 symbol: column_index(schema, "symbol")?,
563 price: column_index(schema, "price")?,
564 exchange_ts: column_index(schema, "exchange_timestamp")?,
565 })
566 }
567}
568
569fn column_index(schema: &SchemaRef, name: &str) -> Result<usize> {
570 schema
571 .column_with_name(name)
572 .map(|(idx, _)| idx)
573 .ok_or_else(|| anyhow!("column '{name}' missing from parquet schema"))
574}
575
576fn as_array<T: Array + 'static>(batch: &RecordBatch, column: usize) -> Result<&T> {
577 batch
578 .column(column)
579 .as_any()
580 .downcast_ref::<T>()
581 .ok_or_else(|| anyhow!("column {column} type mismatch"))
582}
583
584fn string_value(batch: &RecordBatch, column: usize, row: usize) -> Result<String> {
585 let array = as_array::<StringArray>(batch, column)?;
586 if array.is_null(row) {
587 return Err(anyhow!("column {column} contains null string"));
588 }
589 Ok(array.value(row).to_string())
590}
591
592fn string_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<String>> {
593 let array = as_array::<StringArray>(batch, column)?;
594 if array.is_null(row) {
595 return Ok(None);
596 }
597 Ok(Some(array.value(row).to_string()))
598}
599
600fn decimal_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Decimal> {
601 let array = as_array::<Decimal128Array>(batch, column)?;
602 if array.is_null(row) {
603 return Err(anyhow!("column {column} contains null decimal"));
604 }
605 Ok(Decimal::from_i128_with_scale(
606 array.value(row),
607 array.scale() as u32,
608 ))
609}
610
611fn decimal_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<Decimal>> {
612 let array = as_array::<Decimal128Array>(batch, column)?;
613 if array.is_null(row) {
614 return Ok(None);
615 }
616 Ok(Some(Decimal::from_i128_with_scale(
617 array.value(row),
618 array.scale() as u32,
619 )))
620}
621
622fn timestamp_value(batch: &RecordBatch, column: usize, row: usize) -> Result<DateTime<Utc>> {
623 let array = as_array::<TimestampNanosecondArray>(batch, column)?;
624 if array.is_null(row) {
625 return Err(anyhow!("column {column} contains null timestamp"));
626 }
627 let nanos = array.value(row);
628 let secs = nanos.div_euclid(1_000_000_000);
629 let sub = nanos.rem_euclid(1_000_000_000) as u32;
630 DateTime::<Utc>::from_timestamp(secs, sub)
631 .ok_or_else(|| anyhow!("timestamp overflow for value {nanos}"))
632}
633
634fn side_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Side> {
635 let array = as_array::<Int8Array>(batch, column)?;
636 if array.is_null(row) {
637 return Err(anyhow!("column {column} contains null side"));
638 }
639 Ok(if array.value(row) >= 0 {
640 Side::Buy
641 } else {
642 Side::Sell
643 })
644}
645
646#[cfg(test)]
647mod tests {
648 use super::*;
649 use arrow::record_batch::RecordBatch;
650 use chrono::TimeZone;
651 use parquet::arrow::ArrowWriter;
652 use parquet::file::properties::WriterProperties;
653 use rust_decimal::prelude::FromPrimitive;
654 use tempfile::tempdir;
655 use tesser_core::{Fill, Order, OrderRequest, OrderStatus, OrderType, Tick, TimeInForce};
656
657 use crate::encoding::{fills_to_batch, orders_to_batch, ticks_to_batch};
658
659 #[test]
660 fn computes_slippage_from_mock_data() -> Result<()> {
661 let dir = tempdir()?;
662 let root = dir.path();
663 let order_id = "order-1".to_string();
664 let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
665 let order = Order {
666 id: order_id.clone(),
667 request: OrderRequest {
668 symbol: "BTCUSDT".to_string(),
669 side: Side::Buy,
670 order_type: OrderType::Market,
671 quantity: Decimal::from_i64(2).unwrap(),
672 price: None,
673 trigger_price: None,
674 time_in_force: Some(TimeInForce::GoodTilCanceled),
675 client_order_id: Some("twap-demo-1".to_string()),
676 take_profit: None,
677 stop_loss: None,
678 display_quantity: None,
679 },
680 status: OrderStatus::Filled,
681 filled_quantity: Decimal::from_i64(2).unwrap(),
682 avg_fill_price: None,
683 created_at,
684 updated_at: created_at,
685 };
686 let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
687 write_partition(root, "orders", created_at, &orders_batch)?;
688
689 let fill_one = Fill {
690 order_id: order_id.clone(),
691 symbol: order.request.symbol.clone(),
692 side: order.request.side,
693 fill_price: Decimal::from_f64(101.0).unwrap(),
694 fill_quantity: Decimal::ONE,
695 fee: Some(Decimal::new(1, 2)),
696 timestamp: created_at,
697 };
698 let fill_two = Fill {
699 order_id: order_id.clone(),
700 symbol: order.request.symbol.clone(),
701 side: order.request.side,
702 fill_price: Decimal::from_f64(102.0).unwrap(),
703 fill_quantity: Decimal::ONE,
704 fee: Some(Decimal::new(1, 2)),
705 timestamp: created_at,
706 };
707 let fills_batch = fills_to_batch(&[fill_one, fill_two])?;
708 write_partition(root, "fills", created_at, &fills_batch)?;
709
710 let tick = Tick {
711 symbol: order.request.symbol.clone(),
712 price: Decimal::from_f64(100.0).unwrap(),
713 size: Decimal::ONE,
714 side: Side::Buy,
715 exchange_timestamp: created_at,
716 received_at: created_at,
717 };
718 let ticks_batch = ticks_to_batch(std::slice::from_ref(&tick))?;
719 write_partition(root, "ticks", created_at, &ticks_batch)?;
720
721 let report = analyze_execution(&ExecutionAnalysisRequest {
722 data_dir: root.into(),
723 start: None,
724 end: None,
725 })?;
726
727 assert_eq!(report.totals.order_count, 1);
728 assert_eq!(report.totals.fill_count, 2);
729 assert_eq!(report.totals.filled_quantity, Decimal::from_i64(2).unwrap());
730 assert_eq!(report.totals.orders_with_arrival, 1);
731 assert_eq!(report.totals.total_fees, Decimal::from_f64(0.02).unwrap());
732 assert_eq!(
733 report.totals.implementation_shortfall,
734 Decimal::from_f64(3.0).unwrap()
735 );
736 let bps = report.totals.avg_slippage_bps.expect("slippage available");
737 assert_eq!(bps, Decimal::from_i64(150).unwrap());
738 let algo = report
739 .per_algo
740 .iter()
741 .find(|entry| entry.label == "TWAP")
742 .expect("twap bucket exists");
743 assert_eq!(algo.order_count, 1);
744 Ok(())
745 }
746
747 #[test]
748 fn handles_missing_orders_in_window() -> Result<()> {
749 let dir = tempdir()?;
750 let root = dir.path();
751 let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
752 let order = Order {
753 id: "order-1".to_string(),
754 request: OrderRequest {
755 symbol: "BTCUSDT".to_string(),
756 side: Side::Buy,
757 order_type: OrderType::Market,
758 quantity: Decimal::ONE,
759 price: None,
760 trigger_price: None,
761 time_in_force: Some(TimeInForce::FillOrKill),
762 client_order_id: Some("sniper-1".to_string()),
763 take_profit: None,
764 stop_loss: None,
765 display_quantity: None,
766 },
767 status: OrderStatus::Canceled,
768 filled_quantity: Decimal::ZERO,
769 avg_fill_price: None,
770 created_at,
771 updated_at: created_at,
772 };
773 let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
774 write_partition(root, "orders", created_at, &orders_batch)?;
775
776 let fills_batch = fills_to_batch(&[])?;
777 write_partition(root, "fills", created_at, &fills_batch)?;
778
779 let ticks_batch = ticks_to_batch(&[])?;
780 write_partition(root, "ticks", created_at, &ticks_batch)?;
781
782 let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
783 let report = analyze_execution(&ExecutionAnalysisRequest {
784 data_dir: root.into(),
785 start: Some(start),
786 end: None,
787 })?;
788 assert_eq!(report.totals.order_count, 0);
789 assert_eq!(report.skipped_orders, 0);
790 Ok(())
791 }
792
793 fn write_partition(
794 root: &Path,
795 kind: &str,
796 timestamp: DateTime<Utc>,
797 batch: &RecordBatch,
798 ) -> Result<()> {
799 let day = timestamp.date_naive().to_string();
800 let dir = root.join(kind).join(day);
801 std::fs::create_dir_all(&dir)
802 .with_context(|| format!("failed to create {}", dir.display()))?;
803 let path = dir.join("part-000.parquet");
804 let file =
805 File::create(&path).with_context(|| format!("failed to create {}", path.display()))?;
806 let props = WriterProperties::builder().build();
807 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
808 writer.write(batch)?;
809 writer.close().map(|_| ()).map_err(Into::into)
810 }
811}