tesser_data/
analytics.rs

1use std::collections::HashMap;
2use std::fs::{self, File};
3use std::path::{Path, PathBuf};
4
5use anyhow::{anyhow, bail, Context, Result};
6use arrow::array::{Array, Decimal128Array, Int8Array, StringArray, TimestampNanosecondArray};
7use arrow::datatypes::SchemaRef;
8use arrow::record_batch::RecordBatch;
9use chrono::{DateTime, Utc};
10use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11use rust_decimal::Decimal;
12use tesser_core::{Side, Symbol};
13
/// Request payload for execution analytics.
#[derive(Debug, Clone)]
pub struct ExecutionAnalysisRequest {
    /// Directory that contains `orders/`, `fills/` and `ticks/` sub-directories.
    pub data_dir: PathBuf,
    /// Optional inclusive lower bound for the analyzed window.
    ///
    /// The window is applied to each order's `created_at` timestamp; fills and
    /// ticks themselves are not range-filtered.
    pub start: Option<DateTime<Utc>>,
    /// Optional inclusive upper bound for the analyzed window.
    pub end: Option<DateTime<Utc>>,
}
24
/// High-level metrics computed for either the entire data set or one algo bucket.
#[derive(Debug, Clone)]
pub struct ExecutionStats {
    /// Bucket name: `"ALL"` for the totals row, otherwise the inferred algo label.
    pub label: String,
    /// Number of orders that contributed at least one fill.
    pub order_count: usize,
    /// Total number of fills across those orders.
    pub fill_count: usize,
    /// Orders for which an arrival price was found in the tick history.
    pub orders_with_arrival: usize,
    /// Sum of filled quantities.
    pub filled_quantity: Decimal,
    /// Sum of `price * quantity` across all fills.
    pub notional: Decimal,
    /// Sum of fees; fills with a null fee contribute zero.
    pub total_fees: Decimal,
    /// Signed shortfall value, side-adjusted so adverse executions are positive.
    pub implementation_shortfall: Decimal,
    /// Quantity-weighted average slippage in basis points; `None` when no
    /// order in the bucket had a usable arrival price.
    pub avg_slippage_bps: Option<Decimal>,
}
38
39impl ExecutionStats {
40    fn empty(label: impl Into<String>) -> Self {
41        Self {
42            label: label.into(),
43            order_count: 0,
44            fill_count: 0,
45            orders_with_arrival: 0,
46            filled_quantity: Decimal::ZERO,
47            notional: Decimal::ZERO,
48            total_fees: Decimal::ZERO,
49            implementation_shortfall: Decimal::ZERO,
50            avg_slippage_bps: None,
51        }
52    }
53}
54
/// Execution analysis output, containing totals and the algo breakdown.
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// Requested window start, copied from the request (`None` = unbounded).
    pub period_start: Option<DateTime<Utc>>,
    /// Requested window end, copied from the request (`None` = unbounded).
    pub period_end: Option<DateTime<Utc>>,
    /// Aggregate stats across every analyzed order (label `"ALL"`).
    pub totals: ExecutionStats,
    /// Per-algo buckets, sorted by descending notional.
    pub per_algo: Vec<ExecutionStats>,
    /// Orders inside the window that had no usable fills.
    pub skipped_orders: usize,
}
64
65fn bps_factor() -> Decimal {
66    Decimal::new(10_000, 0)
67}
68
/// Compute a [`ExecutionReport`] by scanning flight-recorder parquet files.
///
/// `data_dir` must contain `orders/` and `fills/` (both mandatory) and may
/// contain `ticks/`; each holds parquet partitions. Orders are filtered by
/// `created_at` against the requested window and joined to their fills by
/// order id. Orders without fills, or whose fills sum to a non-positive
/// quantity, are counted in `skipped_orders` rather than in the stats.
pub fn analyze_execution(request: &ExecutionAnalysisRequest) -> Result<ExecutionReport> {
    let range = TimeRange::new(request.start, request.end)?;
    let orders_dir = request.data_dir.join("orders");
    let fills_dir = request.data_dir.join("fills");
    if !orders_dir.exists() {
        bail!(
            "orders directory missing at {}",
            orders_dir.to_string_lossy()
        );
    }
    if !fills_dir.exists() {
        bail!("fills directory missing at {}", fills_dir.to_string_lossy());
    }
    // Ticks are optional: a missing directory just yields no arrival prices.
    let ticks_dir = request.data_dir.join("ticks");

    let order_paths = collect_parquet_files(&orders_dir)?;
    if order_paths.is_empty() {
        bail!(
            "no parquet files found under {}",
            orders_dir.to_string_lossy()
        );
    }
    let fill_paths = collect_parquet_files(&fills_dir)?;
    if fill_paths.is_empty() {
        bail!(
            "no parquet files found under {}",
            fills_dir.to_string_lossy()
        );
    }
    let tick_paths = collect_parquet_files(&ticks_dir)?;

    // Only orders are range-filtered; fills and ticks are loaded in full so
    // that fills landing after the window still attribute to in-window orders.
    let orders = load_orders(&order_paths, &range)?;
    if orders.is_empty() {
        return Ok(ExecutionReport {
            period_start: request.start,
            period_end: request.end,
            totals: ExecutionStats::empty("ALL"),
            per_algo: Vec::new(),
            skipped_orders: 0,
        });
    }
    let fills = load_fills(&fill_paths)?;
    let ticks = load_ticks(&tick_paths)?;

    // Index fills by order id for the join below.
    let mut fills_by_order: HashMap<String, Vec<FillRow>> = HashMap::new();
    for fill in fills {
        fills_by_order
            .entry(fill.order_id.clone())
            .or_default()
            .push(fill);
    }

    let arrival_lookup = ArrivalLookup::new(ticks);
    let mut aggregator = StatsAggregator::new();
    let mut skipped = 0usize;

    for order in orders {
        let Some(fill_rows) = fills_by_order.get(&order.id) else {
            skipped += 1;
            continue;
        };
        if fill_rows.is_empty() {
            skipped += 1;
            continue;
        }
        // `summarize_order` returns None for non-positive filled quantity.
        match summarize_order(&order, fill_rows, &arrival_lookup) {
            Some(summary) => aggregator.record(&order.algo_label, &summary),
            None => skipped += 1,
        }
    }

    let (totals, mut per_algo) = aggregator.finish();
    // Largest buckets first, sorted by traded notional.
    per_algo.sort_by(|a, b| b.notional.cmp(&a.notional));

    Ok(ExecutionReport {
        period_start: request.start,
        period_end: request.end,
        totals,
        per_algo,
        skipped_orders: skipped,
    })
}
152
/// Routes per-order summaries into a grand total plus per-algo buckets.
struct StatsAggregator {
    /// Running totals across every recorded order (label `"ALL"`).
    totals: StatsAccumulator,
    /// One accumulator per algo label, created lazily on first sighting.
    groups: HashMap<String, StatsAccumulator>,
}
157
158impl StatsAggregator {
159    fn new() -> Self {
160        Self {
161            totals: StatsAccumulator::new("ALL"),
162            groups: HashMap::new(),
163        }
164    }
165
166    fn record(&mut self, label: &str, summary: &OrderSummary) {
167        self.totals.ingest(summary);
168        self.groups
169            .entry(label.to_string())
170            .or_insert_with(|| StatsAccumulator::new(label))
171            .ingest(summary);
172    }
173
174    fn finish(self) -> (ExecutionStats, Vec<ExecutionStats>) {
175        let totals = self.totals.into_stats();
176        let groups = self
177            .groups
178            .into_values()
179            .map(|acc| acc.into_stats())
180            .collect();
181        (totals, groups)
182    }
183}
184
/// Mutable working state behind one [`ExecutionStats`] bucket.
struct StatsAccumulator {
    /// Stats being built up; `avg_slippage_bps` is only set in `into_stats`.
    stats: ExecutionStats,
    /// Sum of `slippage_bps * filled_quantity` over orders with slippage.
    slippage_weighted_sum: Decimal,
    /// Sum of `filled_quantity` over orders with slippage (the total weight).
    slippage_weight: Decimal,
}
190
191impl StatsAccumulator {
192    fn new(label: &str) -> Self {
193        Self {
194            stats: ExecutionStats::empty(label),
195            slippage_weighted_sum: Decimal::ZERO,
196            slippage_weight: Decimal::ZERO,
197        }
198    }
199
200    fn ingest(&mut self, summary: &OrderSummary) {
201        self.stats.order_count += 1;
202        self.stats.fill_count += summary.fill_count;
203        if summary.has_arrival {
204            self.stats.orders_with_arrival += 1;
205        }
206        self.stats.filled_quantity += summary.filled_quantity;
207        self.stats.notional += summary.notional;
208        self.stats.total_fees += summary.total_fees;
209        self.stats.implementation_shortfall += summary.shortfall_value;
210        if let Some(bps) = summary.slippage_bps {
211            self.slippage_weighted_sum += bps * summary.filled_quantity;
212            self.slippage_weight += summary.filled_quantity;
213        }
214    }
215
216    fn into_stats(mut self) -> ExecutionStats {
217        self.stats.avg_slippage_bps = if self.slippage_weight > Decimal::ZERO {
218            Some(self.slippage_weighted_sum / self.slippage_weight)
219        } else {
220            None
221        };
222        self.stats
223    }
224}
225
/// Minimal projection of an order row needed for execution analysis.
#[derive(Clone)]
struct OrderRow {
    // Order id; join key against `FillRow::order_id`.
    id: String,
    // Instrument the order was placed on.
    symbol: Symbol,
    // Buy or sell; determines the sign of the slippage calculation.
    side: Side,
    // Creation time; used for range filtering and arrival-price lookup.
    created_at: DateTime<Utc>,
    // Bucket label derived from the client order id (see `infer_algo_label`).
    algo_label: String,
}
234
/// Minimal projection of a fill row.
#[derive(Clone)]
struct FillRow {
    // Id of the order this fill belongs to.
    order_id: String,
    // Execution price of this fill.
    price: Decimal,
    // Executed quantity of this fill.
    quantity: Decimal,
    // Fee charged; zero when the source column was null.
    fee: Decimal,
}
242
/// Per-order aggregate produced by [`summarize_order`].
struct OrderSummary {
    // Number of fills the order received.
    fill_count: usize,
    // Total executed quantity (strictly positive by construction).
    filled_quantity: Decimal,
    // Sum of `price * quantity` over all fills.
    notional: Decimal,
    // Sum of fill fees.
    total_fees: Decimal,
    // Side-adjusted slippage vs. arrival price in basis points; `None` when
    // no positive arrival price was available.
    slippage_bps: Option<Decimal>,
    // Signed implementation shortfall; zero without an arrival price.
    shortfall_value: Decimal,
    // Whether an arrival price was found for the order.
    has_arrival: bool,
}
252
/// Per-symbol tick series used to find the prevailing price at order arrival.
struct ArrivalLookup {
    // Ticks grouped by symbol, each series sorted by ascending timestamp.
    ticks: HashMap<Symbol, Vec<TickPoint>>,
}
256
257impl ArrivalLookup {
258    fn new(rows: Vec<TickPoint>) -> Self {
259        let mut ticks: HashMap<Symbol, Vec<TickPoint>> = HashMap::new();
260        for row in rows {
261            ticks.entry(row.symbol.clone()).or_default().push(row);
262        }
263        for series in ticks.values_mut() {
264            series.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
265        }
266        Self { ticks }
267    }
268
269    fn price_at(&self, symbol: &str, timestamp: DateTime<Utc>) -> Option<Decimal> {
270        let series = self.ticks.get(symbol)?;
271        if series.is_empty() {
272            return None;
273        }
274        let idx = series.partition_point(|point| point.timestamp <= timestamp);
275        if idx == 0 {
276            Some(series[0].price)
277        } else {
278            Some(series[idx - 1].price)
279        }
280    }
281}
282
/// One price observation used for arrival-price lookup.
#[derive(Clone)]
struct TickPoint {
    // Instrument the tick belongs to.
    symbol: Symbol,
    // Price observed at `timestamp`.
    price: Decimal,
    // Exchange timestamp of the tick.
    timestamp: DateTime<Utc>,
}
289
/// Optional inclusive time window; a `None` bound means unbounded on that side.
#[derive(Clone)]
struct TimeRange {
    // Inclusive lower bound, if any.
    start: Option<DateTime<Utc>>,
    // Inclusive upper bound, if any.
    end: Option<DateTime<Utc>>,
}
295
296impl TimeRange {
297    fn new(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> Result<Self> {
298        if let (Some(s), Some(e)) = (start, end) {
299            if e < s {
300                bail!("end time must be after start time");
301            }
302        }
303        Ok(Self { start, end })
304    }
305
306    fn contains(&self, ts: DateTime<Utc>) -> bool {
307        if let Some(start) = self.start {
308            if ts < start {
309                return false;
310            }
311        }
312        if let Some(end) = self.end {
313            if ts > end {
314                return false;
315            }
316        }
317        true
318    }
319}
320
321fn summarize_order(
322    order: &OrderRow,
323    fills: &[FillRow],
324    arrival_lookup: &ArrivalLookup,
325) -> Option<OrderSummary> {
326    if fills.is_empty() {
327        return None;
328    }
329
330    let mut filled_quantity = Decimal::ZERO;
331    let mut notional = Decimal::ZERO;
332    let mut total_fees = Decimal::ZERO;
333    for fill in fills {
334        filled_quantity += fill.quantity;
335        notional += fill.price * fill.quantity;
336        total_fees += fill.fee;
337    }
338    if filled_quantity <= Decimal::ZERO {
339        return None;
340    }
341    let avg_fill_price = notional / filled_quantity;
342    let arrival = arrival_lookup.price_at(&order.symbol, order.created_at);
343    let mut slippage_bps = None;
344    let mut shortfall_value = Decimal::ZERO;
345    if let Some(arrival_price) = arrival {
346        if arrival_price > Decimal::ZERO {
347            let price_delta = (avg_fill_price - arrival_price) * side_sign(order.side);
348            shortfall_value = price_delta * filled_quantity;
349            let ratio = price_delta / arrival_price;
350            slippage_bps = Some(ratio * bps_factor());
351        }
352    }
353    Some(OrderSummary {
354        fill_count: fills.len(),
355        filled_quantity,
356        notional,
357        total_fees,
358        slippage_bps,
359        shortfall_value,
360        has_arrival: arrival.is_some(),
361    })
362}
363
364fn side_sign(side: Side) -> Decimal {
365    match side {
366        Side::Buy => Decimal::ONE,
367        Side::Sell => -Decimal::ONE,
368    }
369}
370
371fn collect_parquet_files(dir: &Path) -> Result<Vec<PathBuf>> {
372    if !dir.exists() {
373        return Ok(Vec::new());
374    }
375    let mut stack = vec![dir.to_path_buf()];
376    let mut files = Vec::new();
377    while let Some(path) = stack.pop() {
378        let metadata = fs::metadata(&path)
379            .with_context(|| format!("failed to inspect {}", path.to_string_lossy()))?;
380        if metadata.is_dir() {
381            for entry in fs::read_dir(&path)
382                .with_context(|| format!("failed to list {}", path.to_string_lossy()))?
383            {
384                let entry = entry?;
385                stack.push(entry.path());
386            }
387        } else if path
388            .extension()
389            .and_then(|ext| ext.to_str())
390            .map(|ext| ext.eq_ignore_ascii_case("parquet"))
391            .unwrap_or(false)
392        {
393            files.push(path);
394        }
395    }
396    files.sort();
397    Ok(files)
398}
399
/// Load order rows from the given parquet files, keeping only orders whose
/// `created_at` falls inside `range`.
fn load_orders(paths: &[PathBuf], range: &TimeRange) -> Result<Vec<OrderRow>> {
    let mut rows = Vec::new();
    for path in paths {
        let file =
            File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_batch_size(4096)
            .build()?;
        // Column indices are resolved once from the first batch's schema and
        // reused for the rest of the file.
        let mut columns: Option<OrderColumns> = None;
        for batch in reader {
            let batch = batch?;
            if columns.is_none() {
                columns = Some(OrderColumns::from_schema(&batch.schema())?);
            }
            let columns = columns.as_ref().expect("order columns should be set");
            for row in 0..batch.num_rows() {
                // Filter on timestamp first to skip decoding the other cells.
                let created_at = timestamp_value(&batch, columns.created_at, row)?;
                if !range.contains(created_at) {
                    continue;
                }
                let id = string_value(&batch, columns.id, row)?;
                let symbol = string_value(&batch, columns.symbol, row)?;
                let side = side_value(&batch, columns.side, row)?;
                let client_order_id = string_option(&batch, columns.client_order_id, row)?;
                // The algo bucket is inferred from the client order id.
                let algo_label = infer_algo_label(client_order_id.as_deref());
                rows.push(OrderRow {
                    id,
                    symbol,
                    side,
                    created_at,
                    algo_label,
                });
            }
        }
    }
    Ok(rows)
}
437
/// Load every fill row from the given parquet files. No time filtering is
/// applied here: fills are attributed to orders, which are filtered upstream.
fn load_fills(paths: &[PathBuf]) -> Result<Vec<FillRow>> {
    let mut rows = Vec::new();
    for path in paths {
        let file =
            File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_batch_size(4096)
            .build()?;
        // Column indices resolved from the first batch and reused thereafter.
        let mut columns: Option<FillColumns> = None;
        for batch in reader {
            let batch = batch?;
            if columns.is_none() {
                columns = Some(FillColumns::from_schema(&batch.schema())?);
            }
            let columns = columns.as_ref().expect("fill columns set");
            for row in 0..batch.num_rows() {
                rows.push(FillRow {
                    order_id: string_value(&batch, columns.order_id, row)?,
                    price: decimal_value(&batch, columns.price, row)?,
                    quantity: decimal_value(&batch, columns.quantity, row)?,
                    // A null fee cell is treated as zero.
                    fee: decimal_option(&batch, columns.fee, row)?.unwrap_or(Decimal::ZERO),
                });
            }
        }
    }
    Ok(rows)
}
465
/// Load every tick row from the given parquet files (possibly none when the
/// ticks directory is absent), for use in arrival-price lookup.
fn load_ticks(paths: &[PathBuf]) -> Result<Vec<TickPoint>> {
    let mut rows = Vec::new();
    for path in paths {
        let file =
            File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_batch_size(4096)
            .build()?;
        // Column indices resolved from the first batch and reused thereafter.
        let mut columns: Option<TickColumns> = None;
        for batch in reader {
            let batch = batch?;
            if columns.is_none() {
                columns = Some(TickColumns::from_schema(&batch.schema())?);
            }
            let columns = columns.as_ref().expect("tick columns set");
            for row in 0..batch.num_rows() {
                rows.push(TickPoint {
                    symbol: string_value(&batch, columns.symbol, row)?,
                    price: decimal_value(&batch, columns.price, row)?,
                    // The exchange-side timestamp is what arrival lookup uses.
                    timestamp: timestamp_value(&batch, columns.exchange_ts, row)?,
                });
            }
        }
    }
    Ok(rows)
}
492
/// Map a client order id onto an algo bucket label.
///
/// Prefixes are checked first (case-insensitively), then `-sl` / `-tp`
/// suffixes; anything else — including a missing id — falls back to "SIGNAL".
fn infer_algo_label(client_order_id: Option<&str>) -> String {
    const PREFIX_LABELS: [(&str, &str); 5] = [
        ("twap", "TWAP"),
        ("vwap", "VWAP"),
        ("iceberg", "ICEBERG"),
        ("pegged", "PEGGED"),
        ("sniper", "SNIPER"),
    ];
    let normalized = client_order_id.unwrap_or("unlabeled").to_ascii_lowercase();
    for (prefix, label) in PREFIX_LABELS {
        if normalized.starts_with(prefix) {
            return label.to_string();
        }
    }
    if normalized.ends_with("-sl") {
        "STOP_LOSS".to_string()
    } else if normalized.ends_with("-tp") {
        "TAKE_PROFIT".to_string()
    } else {
        "SIGNAL".to_string()
    }
}
514
/// Resolved column indices for the orders parquet schema.
struct OrderColumns {
    id: usize,
    symbol: usize,
    side: usize,
    client_order_id: usize,
    created_at: usize,
}
522
523impl OrderColumns {
524    fn from_schema(schema: &SchemaRef) -> Result<Self> {
525        Ok(Self {
526            id: column_index(schema, "id")?,
527            symbol: column_index(schema, "symbol")?,
528            side: column_index(schema, "side")?,
529            client_order_id: column_index(schema, "client_order_id")?,
530            created_at: column_index(schema, "created_at")?,
531        })
532    }
533}
534
/// Resolved column indices for the fills parquet schema.
struct FillColumns {
    order_id: usize,
    price: usize,
    quantity: usize,
    fee: usize,
}
541
542impl FillColumns {
543    fn from_schema(schema: &SchemaRef) -> Result<Self> {
544        Ok(Self {
545            order_id: column_index(schema, "order_id")?,
546            price: column_index(schema, "fill_price")?,
547            quantity: column_index(schema, "fill_quantity")?,
548            fee: column_index(schema, "fee")?,
549        })
550    }
551}
552
/// Resolved column indices for the ticks parquet schema.
struct TickColumns {
    symbol: usize,
    price: usize,
    exchange_ts: usize,
}
558
559impl TickColumns {
560    fn from_schema(schema: &SchemaRef) -> Result<Self> {
561        Ok(Self {
562            symbol: column_index(schema, "symbol")?,
563            price: column_index(schema, "price")?,
564            exchange_ts: column_index(schema, "exchange_timestamp")?,
565        })
566    }
567}
568
569fn column_index(schema: &SchemaRef, name: &str) -> Result<usize> {
570    schema
571        .column_with_name(name)
572        .map(|(idx, _)| idx)
573        .ok_or_else(|| anyhow!("column '{name}' missing from parquet schema"))
574}
575
576fn as_array<T: Array + 'static>(batch: &RecordBatch, column: usize) -> Result<&T> {
577    batch
578        .column(column)
579        .as_any()
580        .downcast_ref::<T>()
581        .ok_or_else(|| anyhow!("column {column} type mismatch"))
582}
583
584fn string_value(batch: &RecordBatch, column: usize, row: usize) -> Result<String> {
585    let array = as_array::<StringArray>(batch, column)?;
586    if array.is_null(row) {
587        return Err(anyhow!("column {column} contains null string"));
588    }
589    Ok(array.value(row).to_string())
590}
591
592fn string_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<String>> {
593    let array = as_array::<StringArray>(batch, column)?;
594    if array.is_null(row) {
595        return Ok(None);
596    }
597    Ok(Some(array.value(row).to_string()))
598}
599
600fn decimal_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Decimal> {
601    let array = as_array::<Decimal128Array>(batch, column)?;
602    if array.is_null(row) {
603        return Err(anyhow!("column {column} contains null decimal"));
604    }
605    Ok(Decimal::from_i128_with_scale(
606        array.value(row),
607        array.scale() as u32,
608    ))
609}
610
611fn decimal_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<Decimal>> {
612    let array = as_array::<Decimal128Array>(batch, column)?;
613    if array.is_null(row) {
614        return Ok(None);
615    }
616    Ok(Some(Decimal::from_i128_with_scale(
617        array.value(row),
618        array.scale() as u32,
619    )))
620}
621
622fn timestamp_value(batch: &RecordBatch, column: usize, row: usize) -> Result<DateTime<Utc>> {
623    let array = as_array::<TimestampNanosecondArray>(batch, column)?;
624    if array.is_null(row) {
625        return Err(anyhow!("column {column} contains null timestamp"));
626    }
627    let nanos = array.value(row);
628    let secs = nanos.div_euclid(1_000_000_000);
629    let sub = nanos.rem_euclid(1_000_000_000) as u32;
630    DateTime::<Utc>::from_timestamp(secs, sub)
631        .ok_or_else(|| anyhow!("timestamp overflow for value {nanos}"))
632}
633
634fn side_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Side> {
635    let array = as_array::<Int8Array>(batch, column)?;
636    if array.is_null(row) {
637        return Err(anyhow!("column {column} contains null side"));
638    }
639    Ok(if array.value(row) >= 0 {
640        Side::Buy
641    } else {
642        Side::Sell
643    })
644}
645
646#[cfg(test)]
647mod tests {
648    use super::*;
649    use arrow::record_batch::RecordBatch;
650    use chrono::TimeZone;
651    use parquet::arrow::ArrowWriter;
652    use parquet::file::properties::WriterProperties;
653    use rust_decimal::prelude::FromPrimitive;
654    use tempfile::tempdir;
655    use tesser_core::{Fill, Order, OrderRequest, OrderStatus, OrderType, Tick, TimeInForce};
656
657    use crate::encoding::{fills_to_batch, orders_to_batch, ticks_to_batch};
658
    /// End-to-end fixture: one buy order with two fills (101 and 102) against
    /// an arrival tick of 100 must yield an average fill of 101.5, a shortfall
    /// of 3.0 and 150 bps of slippage in the "TWAP" bucket.
    #[test]
    fn computes_slippage_from_mock_data() -> Result<()> {
        let dir = tempdir()?;
        let root = dir.path();
        let order_id = "order-1".to_string();
        let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        // "twap-demo-1" client id routes this order into the TWAP bucket.
        let order = Order {
            id: order_id.clone(),
            request: OrderRequest {
                symbol: "BTCUSDT".to_string(),
                side: Side::Buy,
                order_type: OrderType::Market,
                quantity: Decimal::from_i64(2).unwrap(),
                price: None,
                trigger_price: None,
                time_in_force: Some(TimeInForce::GoodTilCanceled),
                client_order_id: Some("twap-demo-1".to_string()),
                take_profit: None,
                stop_loss: None,
                display_quantity: None,
            },
            status: OrderStatus::Filled,
            filled_quantity: Decimal::from_i64(2).unwrap(),
            avg_fill_price: None,
            created_at,
            updated_at: created_at,
        };
        let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
        write_partition(root, "orders", created_at, &orders_batch)?;

        // Two one-unit fills at 101 and 102, each with a 0.01 fee.
        let fill_one = Fill {
            order_id: order_id.clone(),
            symbol: order.request.symbol.clone(),
            side: order.request.side,
            fill_price: Decimal::from_f64(101.0).unwrap(),
            fill_quantity: Decimal::ONE,
            fee: Some(Decimal::new(1, 2)),
            timestamp: created_at,
        };
        let fill_two = Fill {
            order_id: order_id.clone(),
            symbol: order.request.symbol.clone(),
            side: order.request.side,
            fill_price: Decimal::from_f64(102.0).unwrap(),
            fill_quantity: Decimal::ONE,
            fee: Some(Decimal::new(1, 2)),
            timestamp: created_at,
        };
        let fills_batch = fills_to_batch(&[fill_one, fill_two])?;
        write_partition(root, "fills", created_at, &fills_batch)?;

        // Single tick at order creation provides the arrival price of 100.
        let tick = Tick {
            symbol: order.request.symbol.clone(),
            price: Decimal::from_f64(100.0).unwrap(),
            size: Decimal::ONE,
            side: Side::Buy,
            exchange_timestamp: created_at,
            received_at: created_at,
        };
        let ticks_batch = ticks_to_batch(std::slice::from_ref(&tick))?;
        write_partition(root, "ticks", created_at, &ticks_batch)?;

        let report = analyze_execution(&ExecutionAnalysisRequest {
            data_dir: root.into(),
            start: None,
            end: None,
        })?;

        assert_eq!(report.totals.order_count, 1);
        assert_eq!(report.totals.fill_count, 2);
        assert_eq!(report.totals.filled_quantity, Decimal::from_i64(2).unwrap());
        assert_eq!(report.totals.orders_with_arrival, 1);
        assert_eq!(report.totals.total_fees, Decimal::from_f64(0.02).unwrap());
        // Shortfall: (101.5 - 100) * 2 = 3.0 for a buy.
        assert_eq!(
            report.totals.implementation_shortfall,
            Decimal::from_f64(3.0).unwrap()
        );
        // Slippage: 1.5 / 100 * 10_000 = 150 bps.
        let bps = report.totals.avg_slippage_bps.expect("slippage available");
        assert_eq!(bps, Decimal::from_i64(150).unwrap());
        let algo = report
            .per_algo
            .iter()
            .find(|entry| entry.label == "TWAP")
            .expect("twap bucket exists");
        assert_eq!(algo.order_count, 1);
        Ok(())
    }
746
    /// A window that excludes the only order must yield an empty report:
    /// zero counted orders and zero skipped (out-of-window orders are
    /// filtered before the fill join, not skipped).
    #[test]
    fn handles_missing_orders_in_window() -> Result<()> {
        let dir = tempdir()?;
        let root = dir.path();
        let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        let order = Order {
            id: "order-1".to_string(),
            request: OrderRequest {
                symbol: "BTCUSDT".to_string(),
                side: Side::Buy,
                order_type: OrderType::Market,
                quantity: Decimal::ONE,
                price: None,
                trigger_price: None,
                time_in_force: Some(TimeInForce::FillOrKill),
                client_order_id: Some("sniper-1".to_string()),
                take_profit: None,
                stop_loss: None,
                display_quantity: None,
            },
            status: OrderStatus::Canceled,
            filled_quantity: Decimal::ZERO,
            avg_fill_price: None,
            created_at,
            updated_at: created_at,
        };
        let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
        write_partition(root, "orders", created_at, &orders_batch)?;

        // Empty fill/tick partitions still need parquet files on disk.
        let fills_batch = fills_to_batch(&[])?;
        write_partition(root, "fills", created_at, &fills_batch)?;

        let ticks_batch = ticks_to_batch(&[])?;
        write_partition(root, "ticks", created_at, &ticks_batch)?;

        // Window starts a year after the order was created.
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let report = analyze_execution(&ExecutionAnalysisRequest {
            data_dir: root.into(),
            start: Some(start),
            end: None,
        })?;
        assert_eq!(report.totals.order_count, 0);
        assert_eq!(report.skipped_orders, 0);
        Ok(())
    }
792
    /// Write `batch` to `<root>/<kind>/<YYYY-MM-DD>/part-000.parquet`,
    /// mirroring the day-partitioned layout the analyzer scans.
    fn write_partition(
        root: &Path,
        kind: &str,
        timestamp: DateTime<Utc>,
        batch: &RecordBatch,
    ) -> Result<()> {
        let day = timestamp.date_naive().to_string();
        let dir = root.join(kind).join(day);
        std::fs::create_dir_all(&dir)
            .with_context(|| format!("failed to create {}", dir.display()))?;
        let path = dir.join("part-000.parquet");
        let file =
            File::create(&path).with_context(|| format!("failed to create {}", path.display()))?;
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
        writer.write(batch)?;
        // `close` flushes the footer; map its metadata result away.
        writer.close().map(|_| ()).map_err(Into::into)
    }
811}