1use std::collections::HashMap;
2use std::fs::{self, File};
3use std::path::{Path, PathBuf};
4
5use anyhow::{anyhow, bail, Context, Result};
6use arrow::array::{Array, Decimal128Array, Int8Array, StringArray, TimestampNanosecondArray};
7use arrow::datatypes::SchemaRef;
8use arrow::record_batch::RecordBatch;
9use chrono::{DateTime, Utc};
10use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11use rust_decimal::Decimal;
12use tesser_core::{Side, Symbol};
13
14#[derive(Debug, Clone)]
16pub struct ExecutionAnalysisRequest {
17 pub data_dir: PathBuf,
19 pub start: Option<DateTime<Utc>>,
21 pub end: Option<DateTime<Utc>>,
23}
24
25#[derive(Debug, Clone)]
27pub struct ExecutionStats {
28 pub label: String,
29 pub order_count: usize,
30 pub fill_count: usize,
31 pub orders_with_arrival: usize,
32 pub filled_quantity: Decimal,
33 pub notional: Decimal,
34 pub total_fees: Decimal,
35 pub implementation_shortfall: Decimal,
36 pub avg_slippage_bps: Option<Decimal>,
37}
38
39impl ExecutionStats {
40 fn empty(label: impl Into<String>) -> Self {
41 Self {
42 label: label.into(),
43 order_count: 0,
44 fill_count: 0,
45 orders_with_arrival: 0,
46 filled_quantity: Decimal::ZERO,
47 notional: Decimal::ZERO,
48 total_fees: Decimal::ZERO,
49 implementation_shortfall: Decimal::ZERO,
50 avg_slippage_bps: None,
51 }
52 }
53}
54
55#[derive(Debug, Clone)]
57pub struct ExecutionReport {
58 pub period_start: Option<DateTime<Utc>>,
59 pub period_end: Option<DateTime<Utc>>,
60 pub totals: ExecutionStats,
61 pub per_algo: Vec<ExecutionStats>,
62 pub skipped_orders: usize,
63}
64
65fn bps_factor() -> Decimal {
66 Decimal::new(10_000, 0)
67}
68
69pub fn analyze_execution(request: &ExecutionAnalysisRequest) -> Result<ExecutionReport> {
71 let range = TimeRange::new(request.start, request.end)?;
72 let orders_dir = request.data_dir.join("orders");
73 let fills_dir = request.data_dir.join("fills");
74 if !orders_dir.exists() {
75 bail!(
76 "orders directory missing at {}",
77 orders_dir.to_string_lossy()
78 );
79 }
80 if !fills_dir.exists() {
81 bail!("fills directory missing at {}", fills_dir.to_string_lossy());
82 }
83 let ticks_dir = request.data_dir.join("ticks");
84
85 let order_paths = collect_parquet_files(&orders_dir)?;
86 if order_paths.is_empty() {
87 bail!(
88 "no parquet files found under {}",
89 orders_dir.to_string_lossy()
90 );
91 }
92 let fill_paths = collect_parquet_files(&fills_dir)?;
93 if fill_paths.is_empty() {
94 bail!(
95 "no parquet files found under {}",
96 fills_dir.to_string_lossy()
97 );
98 }
99 let tick_paths = collect_parquet_files(&ticks_dir)?;
100
101 let orders = load_orders(&order_paths, &range)?;
102 if orders.is_empty() {
103 return Ok(ExecutionReport {
104 period_start: request.start,
105 period_end: request.end,
106 totals: ExecutionStats::empty("ALL"),
107 per_algo: Vec::new(),
108 skipped_orders: 0,
109 });
110 }
111 let fills = load_fills(&fill_paths)?;
112 let ticks = load_ticks(&tick_paths)?;
113
114 let mut fills_by_order: HashMap<String, Vec<FillRow>> = HashMap::new();
115 for fill in fills {
116 fills_by_order
117 .entry(fill.order_id.clone())
118 .or_default()
119 .push(fill);
120 }
121
122 let arrival_lookup = ArrivalLookup::new(ticks);
123 let mut aggregator = StatsAggregator::new();
124 let mut skipped = 0usize;
125
126 for order in orders {
127 let Some(fill_rows) = fills_by_order.get(&order.id) else {
128 skipped += 1;
129 continue;
130 };
131 if fill_rows.is_empty() {
132 skipped += 1;
133 continue;
134 }
135 match summarize_order(&order, fill_rows, &arrival_lookup) {
136 Some(summary) => aggregator.record(&order.algo_label, &summary),
137 None => skipped += 1,
138 }
139 }
140
141 let (totals, mut per_algo) = aggregator.finish();
142 per_algo.sort_by(|a, b| b.notional.cmp(&a.notional));
143
144 Ok(ExecutionReport {
145 period_start: request.start,
146 period_end: request.end,
147 totals,
148 per_algo,
149 skipped_orders: skipped,
150 })
151}
152
153struct StatsAggregator {
154 totals: StatsAccumulator,
155 groups: HashMap<String, StatsAccumulator>,
156}
157
158impl StatsAggregator {
159 fn new() -> Self {
160 Self {
161 totals: StatsAccumulator::new("ALL"),
162 groups: HashMap::new(),
163 }
164 }
165
166 fn record(&mut self, label: &str, summary: &OrderSummary) {
167 self.totals.ingest(summary);
168 self.groups
169 .entry(label.to_string())
170 .or_insert_with(|| StatsAccumulator::new(label))
171 .ingest(summary);
172 }
173
174 fn finish(self) -> (ExecutionStats, Vec<ExecutionStats>) {
175 let totals = self.totals.into_stats();
176 let groups = self
177 .groups
178 .into_values()
179 .map(|acc| acc.into_stats())
180 .collect();
181 (totals, groups)
182 }
183}
184
185struct StatsAccumulator {
186 stats: ExecutionStats,
187 slippage_weighted_sum: Decimal,
188 slippage_weight: Decimal,
189}
190
191impl StatsAccumulator {
192 fn new(label: &str) -> Self {
193 Self {
194 stats: ExecutionStats::empty(label),
195 slippage_weighted_sum: Decimal::ZERO,
196 slippage_weight: Decimal::ZERO,
197 }
198 }
199
200 fn ingest(&mut self, summary: &OrderSummary) {
201 self.stats.order_count += 1;
202 self.stats.fill_count += summary.fill_count;
203 if summary.has_arrival {
204 self.stats.orders_with_arrival += 1;
205 }
206 self.stats.filled_quantity += summary.filled_quantity;
207 self.stats.notional += summary.notional;
208 self.stats.total_fees += summary.total_fees;
209 self.stats.implementation_shortfall += summary.shortfall_value;
210 if let Some(bps) = summary.slippage_bps {
211 self.slippage_weighted_sum += bps * summary.filled_quantity;
212 self.slippage_weight += summary.filled_quantity;
213 }
214 }
215
216 fn into_stats(mut self) -> ExecutionStats {
217 self.stats.avg_slippage_bps = if self.slippage_weight > Decimal::ZERO {
218 Some(self.slippage_weighted_sum / self.slippage_weight)
219 } else {
220 None
221 };
222 self.stats
223 }
224}
225
226#[derive(Clone)]
227struct OrderRow {
228 id: String,
229 symbol: Symbol,
230 side: Side,
231 created_at: DateTime<Utc>,
232 algo_label: String,
233}
234
235#[derive(Clone)]
236struct FillRow {
237 order_id: String,
238 price: Decimal,
239 quantity: Decimal,
240 fee: Decimal,
241}
242
243struct OrderSummary {
244 fill_count: usize,
245 filled_quantity: Decimal,
246 notional: Decimal,
247 total_fees: Decimal,
248 slippage_bps: Option<Decimal>,
249 shortfall_value: Decimal,
250 has_arrival: bool,
251}
252
253struct ArrivalLookup {
254 ticks: HashMap<Symbol, Vec<TickPoint>>,
255}
256
257impl ArrivalLookup {
258 fn new(rows: Vec<TickPoint>) -> Self {
259 let mut ticks: HashMap<Symbol, Vec<TickPoint>> = HashMap::new();
260 for row in rows {
261 ticks.entry(row.symbol).or_default().push(row);
262 }
263 for series in ticks.values_mut() {
264 series.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
265 }
266 Self { ticks }
267 }
268
269 fn price_at(&self, symbol: &Symbol, timestamp: DateTime<Utc>) -> Option<Decimal> {
270 let series = self.ticks.get(symbol)?;
271 if series.is_empty() {
272 return None;
273 }
274 let idx = series.partition_point(|point| point.timestamp <= timestamp);
275 if idx == 0 {
276 Some(series[0].price)
277 } else {
278 Some(series[idx - 1].price)
279 }
280 }
281}
282
283#[derive(Clone)]
284struct TickPoint {
285 symbol: Symbol,
286 price: Decimal,
287 timestamp: DateTime<Utc>,
288}
289
290#[derive(Clone)]
291struct TimeRange {
292 start: Option<DateTime<Utc>>,
293 end: Option<DateTime<Utc>>,
294}
295
296impl TimeRange {
297 fn new(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> Result<Self> {
298 if let (Some(s), Some(e)) = (start, end) {
299 if e < s {
300 bail!("end time must be after start time");
301 }
302 }
303 Ok(Self { start, end })
304 }
305
306 fn contains(&self, ts: DateTime<Utc>) -> bool {
307 if let Some(start) = self.start {
308 if ts < start {
309 return false;
310 }
311 }
312 if let Some(end) = self.end {
313 if ts > end {
314 return false;
315 }
316 }
317 true
318 }
319}
320
321fn summarize_order(
322 order: &OrderRow,
323 fills: &[FillRow],
324 arrival_lookup: &ArrivalLookup,
325) -> Option<OrderSummary> {
326 if fills.is_empty() {
327 return None;
328 }
329
330 let mut filled_quantity = Decimal::ZERO;
331 let mut notional = Decimal::ZERO;
332 let mut total_fees = Decimal::ZERO;
333 for fill in fills {
334 filled_quantity += fill.quantity;
335 notional += fill.price * fill.quantity;
336 total_fees += fill.fee;
337 }
338 if filled_quantity <= Decimal::ZERO {
339 return None;
340 }
341 let avg_fill_price = notional / filled_quantity;
342 let arrival = arrival_lookup.price_at(&order.symbol, order.created_at);
343 let mut slippage_bps = None;
344 let mut shortfall_value = Decimal::ZERO;
345 if let Some(arrival_price) = arrival {
346 if arrival_price > Decimal::ZERO {
347 let price_delta = (avg_fill_price - arrival_price) * side_sign(order.side);
348 shortfall_value = price_delta * filled_quantity;
349 let ratio = price_delta / arrival_price;
350 slippage_bps = Some(ratio * bps_factor());
351 }
352 }
353 Some(OrderSummary {
354 fill_count: fills.len(),
355 filled_quantity,
356 notional,
357 total_fees,
358 slippage_bps,
359 shortfall_value,
360 has_arrival: arrival.is_some(),
361 })
362}
363
364fn side_sign(side: Side) -> Decimal {
365 match side {
366 Side::Buy => Decimal::ONE,
367 Side::Sell => -Decimal::ONE,
368 }
369}
370
371pub fn collect_parquet_files(dir: &Path) -> Result<Vec<PathBuf>> {
372 if !dir.exists() {
373 return Ok(Vec::new());
374 }
375 let mut stack = vec![dir.to_path_buf()];
376 let mut files = Vec::new();
377 while let Some(path) = stack.pop() {
378 let metadata = fs::metadata(&path)
379 .with_context(|| format!("failed to inspect {}", path.to_string_lossy()))?;
380 if metadata.is_dir() {
381 for entry in fs::read_dir(&path)
382 .with_context(|| format!("failed to list {}", path.to_string_lossy()))?
383 {
384 let entry = entry?;
385 stack.push(entry.path());
386 }
387 } else if path
388 .extension()
389 .and_then(|ext| ext.to_str())
390 .map(|ext| ext.eq_ignore_ascii_case("parquet"))
391 .unwrap_or(false)
392 {
393 files.push(path);
394 }
395 }
396 files.sort();
397 Ok(files)
398}
399
400fn load_orders(paths: &[PathBuf], range: &TimeRange) -> Result<Vec<OrderRow>> {
401 let mut rows = Vec::new();
402 for path in paths {
403 let file =
404 File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
405 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
406 .with_batch_size(4096)
407 .build()?;
408 let mut columns: Option<OrderColumns> = None;
409 for batch in reader {
410 let batch = batch?;
411 if columns.is_none() {
412 columns = Some(OrderColumns::from_schema(&batch.schema())?);
413 }
414 let columns = columns.as_ref().expect("order columns should be set");
415 for row in 0..batch.num_rows() {
416 let created_at = timestamp_value(&batch, columns.created_at, row)?;
417 if !range.contains(created_at) {
418 continue;
419 }
420 let id = string_value(&batch, columns.id, row)?;
421 let symbol = string_value(&batch, columns.symbol, row)?;
422 let side = side_value(&batch, columns.side, row)?;
423 let client_order_id = string_option(&batch, columns.client_order_id, row)?;
424 let algo_label = infer_algo_label(client_order_id.as_deref());
425 rows.push(OrderRow {
426 id,
427 symbol: Symbol::from(symbol.as_str()),
428 side,
429 created_at,
430 algo_label,
431 });
432 }
433 }
434 }
435 Ok(rows)
436}
437
438fn load_fills(paths: &[PathBuf]) -> Result<Vec<FillRow>> {
439 let mut rows = Vec::new();
440 for path in paths {
441 let file =
442 File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
443 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
444 .with_batch_size(4096)
445 .build()?;
446 let mut columns: Option<FillColumns> = None;
447 for batch in reader {
448 let batch = batch?;
449 if columns.is_none() {
450 columns = Some(FillColumns::from_schema(&batch.schema())?);
451 }
452 let columns = columns.as_ref().expect("fill columns set");
453 for row in 0..batch.num_rows() {
454 rows.push(FillRow {
455 order_id: string_value(&batch, columns.order_id, row)?,
456 price: decimal_value(&batch, columns.price, row)?,
457 quantity: decimal_value(&batch, columns.quantity, row)?,
458 fee: decimal_option(&batch, columns.fee, row)?.unwrap_or(Decimal::ZERO),
459 });
460 }
461 }
462 }
463 Ok(rows)
464}
465
466fn load_ticks(paths: &[PathBuf]) -> Result<Vec<TickPoint>> {
467 let mut rows = Vec::new();
468 for path in paths {
469 let file =
470 File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
471 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
472 .with_batch_size(4096)
473 .build()?;
474 let mut columns: Option<TickColumns> = None;
475 for batch in reader {
476 let batch = batch?;
477 if columns.is_none() {
478 columns = Some(TickColumns::from_schema(&batch.schema())?);
479 }
480 let columns = columns.as_ref().expect("tick columns set");
481 for row in 0..batch.num_rows() {
482 let symbol = string_value(&batch, columns.symbol, row)?;
483 rows.push(TickPoint {
484 symbol: Symbol::from(symbol.as_str()),
485 price: decimal_value(&batch, columns.price, row)?,
486 timestamp: timestamp_value(&batch, columns.exchange_ts, row)?,
487 });
488 }
489 }
490 }
491 Ok(rows)
492}
493
494fn infer_algo_label(client_order_id: Option<&str>) -> String {
495 let value = client_order_id.unwrap_or("unlabeled");
496 let normalized = value.to_ascii_lowercase();
497 if normalized.starts_with("twap") {
498 "TWAP".to_string()
499 } else if normalized.starts_with("vwap") {
500 "VWAP".to_string()
501 } else if normalized.starts_with("iceberg") {
502 "ICEBERG".to_string()
503 } else if normalized.starts_with("pegged") {
504 "PEGGED".to_string()
505 } else if normalized.starts_with("sniper") {
506 "SNIPER".to_string()
507 } else if normalized.ends_with("-sl") {
508 "STOP_LOSS".to_string()
509 } else if normalized.ends_with("-tp") {
510 "TAKE_PROFIT".to_string()
511 } else {
512 "SIGNAL".to_string()
513 }
514}
515
516struct OrderColumns {
517 id: usize,
518 symbol: usize,
519 side: usize,
520 client_order_id: usize,
521 created_at: usize,
522}
523
524impl OrderColumns {
525 fn from_schema(schema: &SchemaRef) -> Result<Self> {
526 Ok(Self {
527 id: column_index(schema, "id")?,
528 symbol: column_index(schema, "symbol")?,
529 side: column_index(schema, "side")?,
530 client_order_id: column_index(schema, "client_order_id")?,
531 created_at: column_index(schema, "created_at")?,
532 })
533 }
534}
535
536struct FillColumns {
537 order_id: usize,
538 price: usize,
539 quantity: usize,
540 fee: usize,
541}
542
543impl FillColumns {
544 fn from_schema(schema: &SchemaRef) -> Result<Self> {
545 Ok(Self {
546 order_id: column_index(schema, "order_id")?,
547 price: column_index(schema, "fill_price")?,
548 quantity: column_index(schema, "fill_quantity")?,
549 fee: column_index(schema, "fee")?,
550 })
551 }
552}
553
554struct TickColumns {
555 symbol: usize,
556 price: usize,
557 exchange_ts: usize,
558}
559
560impl TickColumns {
561 fn from_schema(schema: &SchemaRef) -> Result<Self> {
562 Ok(Self {
563 symbol: column_index(schema, "symbol")?,
564 price: column_index(schema, "price")?,
565 exchange_ts: column_index(schema, "exchange_timestamp")?,
566 })
567 }
568}
569
570fn column_index(schema: &SchemaRef, name: &str) -> Result<usize> {
571 schema
572 .column_with_name(name)
573 .map(|(idx, _)| idx)
574 .ok_or_else(|| anyhow!("column '{name}' missing from parquet schema"))
575}
576
577fn as_array<T: Array + 'static>(batch: &RecordBatch, column: usize) -> Result<&T> {
578 batch
579 .column(column)
580 .as_any()
581 .downcast_ref::<T>()
582 .ok_or_else(|| anyhow!("column {column} type mismatch"))
583}
584
585fn string_value(batch: &RecordBatch, column: usize, row: usize) -> Result<String> {
586 let array = as_array::<StringArray>(batch, column)?;
587 if array.is_null(row) {
588 return Err(anyhow!("column {column} contains null string"));
589 }
590 Ok(array.value(row).to_string())
591}
592
593fn string_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<String>> {
594 let array = as_array::<StringArray>(batch, column)?;
595 if array.is_null(row) {
596 return Ok(None);
597 }
598 Ok(Some(array.value(row).to_string()))
599}
600
601fn decimal_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Decimal> {
602 let array = as_array::<Decimal128Array>(batch, column)?;
603 if array.is_null(row) {
604 return Err(anyhow!("column {column} contains null decimal"));
605 }
606 Ok(Decimal::from_i128_with_scale(
607 array.value(row),
608 array.scale() as u32,
609 ))
610}
611
612fn decimal_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<Decimal>> {
613 let array = as_array::<Decimal128Array>(batch, column)?;
614 if array.is_null(row) {
615 return Ok(None);
616 }
617 Ok(Some(Decimal::from_i128_with_scale(
618 array.value(row),
619 array.scale() as u32,
620 )))
621}
622
623fn timestamp_value(batch: &RecordBatch, column: usize, row: usize) -> Result<DateTime<Utc>> {
624 let array = as_array::<TimestampNanosecondArray>(batch, column)?;
625 if array.is_null(row) {
626 return Err(anyhow!("column {column} contains null timestamp"));
627 }
628 let nanos = array.value(row);
629 let secs = nanos.div_euclid(1_000_000_000);
630 let sub = nanos.rem_euclid(1_000_000_000) as u32;
631 DateTime::<Utc>::from_timestamp(secs, sub)
632 .ok_or_else(|| anyhow!("timestamp overflow for value {nanos}"))
633}
634
635fn side_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Side> {
636 let array = as_array::<Int8Array>(batch, column)?;
637 if array.is_null(row) {
638 return Err(anyhow!("column {column} contains null side"));
639 }
640 Ok(if array.value(row) >= 0 {
641 Side::Buy
642 } else {
643 Side::Sell
644 })
645}
646
647#[cfg(test)]
648mod tests {
649 use super::*;
650 use arrow::record_batch::RecordBatch;
651 use chrono::TimeZone;
652 use parquet::arrow::ArrowWriter;
653 use parquet::file::properties::WriterProperties;
654 use rust_decimal::prelude::FromPrimitive;
655 use tempfile::tempdir;
656 use tesser_core::{
657 Fill, Order, OrderRequest, OrderStatus, OrderType, Symbol, Tick, TimeInForce,
658 };
659
660 use crate::encoding::{fills_to_batch, orders_to_batch, ticks_to_batch};
661
662 #[test]
663 fn computes_slippage_from_mock_data() -> Result<()> {
664 let dir = tempdir()?;
665 let root = dir.path();
666 let order_id = "order-1".to_string();
667 let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
668 let order = Order {
669 id: order_id.clone(),
670 request: OrderRequest {
671 symbol: Symbol::from("BTCUSDT"),
672 side: Side::Buy,
673 order_type: OrderType::Market,
674 quantity: Decimal::from_i64(2).unwrap(),
675 price: None,
676 trigger_price: None,
677 time_in_force: Some(TimeInForce::GoodTilCanceled),
678 client_order_id: Some("twap-demo-1".to_string()),
679 take_profit: None,
680 stop_loss: None,
681 display_quantity: None,
682 },
683 status: OrderStatus::Filled,
684 filled_quantity: Decimal::from_i64(2).unwrap(),
685 avg_fill_price: None,
686 created_at,
687 updated_at: created_at,
688 };
689 let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
690 write_partition(root, "orders", created_at, &orders_batch)?;
691
692 let fill_one = Fill {
693 order_id: order_id.clone(),
694 symbol: order.request.symbol,
695 side: order.request.side,
696 fill_price: Decimal::from_f64(101.0).unwrap(),
697 fill_quantity: Decimal::ONE,
698 fee: Some(Decimal::new(1, 2)),
699 fee_asset: None,
700 timestamp: created_at,
701 };
702 let fill_two = Fill {
703 order_id: order_id.clone(),
704 symbol: order.request.symbol,
705 side: order.request.side,
706 fill_price: Decimal::from_f64(102.0).unwrap(),
707 fill_quantity: Decimal::ONE,
708 fee: Some(Decimal::new(1, 2)),
709 fee_asset: None,
710 timestamp: created_at,
711 };
712 let fills_batch = fills_to_batch(&[fill_one, fill_two])?;
713 write_partition(root, "fills", created_at, &fills_batch)?;
714
715 let tick = Tick {
716 symbol: order.request.symbol,
717 price: Decimal::from_f64(100.0).unwrap(),
718 size: Decimal::ONE,
719 side: Side::Buy,
720 exchange_timestamp: created_at,
721 received_at: created_at,
722 };
723 let ticks_batch = ticks_to_batch(std::slice::from_ref(&tick))?;
724 write_partition(root, "ticks", created_at, &ticks_batch)?;
725
726 let report = analyze_execution(&ExecutionAnalysisRequest {
727 data_dir: root.into(),
728 start: None,
729 end: None,
730 })?;
731
732 assert_eq!(report.totals.order_count, 1);
733 assert_eq!(report.totals.fill_count, 2);
734 assert_eq!(report.totals.filled_quantity, Decimal::from_i64(2).unwrap());
735 assert_eq!(report.totals.orders_with_arrival, 1);
736 assert_eq!(report.totals.total_fees, Decimal::from_f64(0.02).unwrap());
737 assert_eq!(
738 report.totals.implementation_shortfall,
739 Decimal::from_f64(3.0).unwrap()
740 );
741 let bps = report.totals.avg_slippage_bps.expect("slippage available");
742 assert_eq!(bps, Decimal::from_i64(150).unwrap());
743 let algo = report
744 .per_algo
745 .iter()
746 .find(|entry| entry.label == "TWAP")
747 .expect("twap bucket exists");
748 assert_eq!(algo.order_count, 1);
749 Ok(())
750 }
751
752 #[test]
753 fn handles_missing_orders_in_window() -> Result<()> {
754 let dir = tempdir()?;
755 let root = dir.path();
756 let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
757 let order = Order {
758 id: "order-1".to_string(),
759 request: OrderRequest {
760 symbol: Symbol::from("BTCUSDT"),
761 side: Side::Buy,
762 order_type: OrderType::Market,
763 quantity: Decimal::ONE,
764 price: None,
765 trigger_price: None,
766 time_in_force: Some(TimeInForce::FillOrKill),
767 client_order_id: Some("sniper-1".to_string()),
768 take_profit: None,
769 stop_loss: None,
770 display_quantity: None,
771 },
772 status: OrderStatus::Canceled,
773 filled_quantity: Decimal::ZERO,
774 avg_fill_price: None,
775 created_at,
776 updated_at: created_at,
777 };
778 let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
779 write_partition(root, "orders", created_at, &orders_batch)?;
780
781 let fills_batch = fills_to_batch(&[])?;
782 write_partition(root, "fills", created_at, &fills_batch)?;
783
784 let ticks_batch = ticks_to_batch(&[])?;
785 write_partition(root, "ticks", created_at, &ticks_batch)?;
786
787 let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
788 let report = analyze_execution(&ExecutionAnalysisRequest {
789 data_dir: root.into(),
790 start: Some(start),
791 end: None,
792 })?;
793 assert_eq!(report.totals.order_count, 0);
794 assert_eq!(report.skipped_orders, 0);
795 Ok(())
796 }
797
798 fn write_partition(
799 root: &Path,
800 kind: &str,
801 timestamp: DateTime<Utc>,
802 batch: &RecordBatch,
803 ) -> Result<()> {
804 let day = timestamp.date_naive().to_string();
805 let dir = root.join(kind).join(day);
806 std::fs::create_dir_all(&dir)
807 .with_context(|| format!("failed to create {}", dir.display()))?;
808 let path = dir.join("part-000.parquet");
809 let file =
810 File::create(&path).with_context(|| format!("failed to create {}", path.display()))?;
811 let props = WriterProperties::builder().build();
812 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
813 writer.write(batch)?;
814 writer.close().map(|_| ()).map_err(Into::into)
815 }
816}