tesser_data/
analytics.rs

1use std::collections::HashMap;
2use std::fs::{self, File};
3use std::path::{Path, PathBuf};
4
5use anyhow::{anyhow, bail, Context, Result};
6use arrow::array::{Array, Decimal128Array, Int8Array, StringArray, TimestampNanosecondArray};
7use arrow::datatypes::SchemaRef;
8use arrow::record_batch::RecordBatch;
9use chrono::{DateTime, Utc};
10use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11use rust_decimal::Decimal;
12use tesser_core::{Side, Symbol};
13
/// Request payload for execution analytics.
#[derive(Debug, Clone)]
pub struct ExecutionAnalysisRequest {
    /// Directory that contains `orders/`, `fills/` and `ticks/` sub-directories.
    /// `orders/` and `fills/` are mandatory; `ticks/` is optional (no arrival
    /// prices are computed when it is absent).
    pub data_dir: PathBuf,
    /// Optional inclusive lower bound for the analyzed window.
    pub start: Option<DateTime<Utc>>,
    /// Optional inclusive upper bound for the analyzed window.
    pub end: Option<DateTime<Utc>>,
}
24
/// High-level metrics computed for either the entire data set or one algo bucket.
#[derive(Debug, Clone)]
pub struct ExecutionStats {
    /// Bucket name: `"ALL"` for the totals row, otherwise the inferred algo label.
    pub label: String,
    /// Number of summarized orders in this bucket.
    pub order_count: usize,
    /// Total number of individual fills across those orders.
    pub fill_count: usize,
    /// Orders for which an arrival (reference) price could be found.
    pub orders_with_arrival: usize,
    /// Sum of filled quantities.
    pub filled_quantity: Decimal,
    /// Sum of `price * quantity` across all fills.
    pub notional: Decimal,
    /// Sum of fees (fills with no fee recorded contribute zero).
    pub total_fees: Decimal,
    /// Signed shortfall: `(avg fill - arrival) * qty`, positive = adverse for buys.
    pub implementation_shortfall: Decimal,
    /// Fill-quantity-weighted average slippage in basis points, when any order
    /// had a usable arrival price.
    pub avg_slippage_bps: Option<Decimal>,
}
38
39impl ExecutionStats {
40    fn empty(label: impl Into<String>) -> Self {
41        Self {
42            label: label.into(),
43            order_count: 0,
44            fill_count: 0,
45            orders_with_arrival: 0,
46            filled_quantity: Decimal::ZERO,
47            notional: Decimal::ZERO,
48            total_fees: Decimal::ZERO,
49            implementation_shortfall: Decimal::ZERO,
50            avg_slippage_bps: None,
51        }
52    }
53}
54
/// Execution analysis output, containing totals and the algo breakdown.
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// Echo of the requested window start (not the first observed order).
    pub period_start: Option<DateTime<Utc>>,
    /// Echo of the requested window end.
    pub period_end: Option<DateTime<Utc>>,
    /// Aggregate metrics over every summarized order (the "ALL" bucket).
    pub totals: ExecutionStats,
    /// Per-algo buckets, sorted by descending notional.
    pub per_algo: Vec<ExecutionStats>,
    /// Orders inside the window that had no fills or could not be summarized.
    pub skipped_orders: usize,
}
64
/// Scale factor converting a price ratio into basis points (1 bp = 1/10_000).
fn bps_factor() -> Decimal {
    Decimal::new(10_000, 0)
}
68
/// Compute an [`ExecutionReport`] by scanning flight-recorder parquet files.
///
/// Expects `orders/` and `fills/` sub-directories under `request.data_dir`
/// (both mandatory); `ticks/` is optional and only used to derive arrival
/// prices. Orders are filtered by `created_at` against the requested window;
/// fills and ticks are loaded unfiltered and joined by order id / symbol.
///
/// # Errors
/// Fails when the window is inverted, a mandatory directory or its parquet
/// files are missing, or any parquet file cannot be read or decoded.
pub fn analyze_execution(request: &ExecutionAnalysisRequest) -> Result<ExecutionReport> {
    let range = TimeRange::new(request.start, request.end)?;
    let orders_dir = request.data_dir.join("orders");
    let fills_dir = request.data_dir.join("fills");
    if !orders_dir.exists() {
        bail!(
            "orders directory missing at {}",
            orders_dir.to_string_lossy()
        );
    }
    if !fills_dir.exists() {
        bail!("fills directory missing at {}", fills_dir.to_string_lossy());
    }
    // Ticks are optional: a missing directory simply yields no arrival prices.
    let ticks_dir = request.data_dir.join("ticks");

    let order_paths = collect_parquet_files(&orders_dir)?;
    if order_paths.is_empty() {
        bail!(
            "no parquet files found under {}",
            orders_dir.to_string_lossy()
        );
    }
    let fill_paths = collect_parquet_files(&fills_dir)?;
    if fill_paths.is_empty() {
        bail!(
            "no parquet files found under {}",
            fills_dir.to_string_lossy()
        );
    }
    let tick_paths = collect_parquet_files(&ticks_dir)?;

    let orders = load_orders(&order_paths, &range)?;
    if orders.is_empty() {
        // Nothing in the window: return an empty report rather than an error.
        return Ok(ExecutionReport {
            period_start: request.start,
            period_end: request.end,
            totals: ExecutionStats::empty("ALL"),
            per_algo: Vec::new(),
            skipped_orders: 0,
        });
    }
    let fills = load_fills(&fill_paths)?;
    let ticks = load_ticks(&tick_paths)?;

    // Index fills by their parent order id for the per-order join below.
    let mut fills_by_order: HashMap<String, Vec<FillRow>> = HashMap::new();
    for fill in fills {
        fills_by_order
            .entry(fill.order_id.clone())
            .or_default()
            .push(fill);
    }

    let arrival_lookup = ArrivalLookup::new(ticks);
    let mut aggregator = StatsAggregator::new();
    let mut skipped = 0usize;

    for order in orders {
        // Orders with no fills (e.g. canceled) are counted as skipped, not aggregated.
        let Some(fill_rows) = fills_by_order.get(&order.id) else {
            skipped += 1;
            continue;
        };
        if fill_rows.is_empty() {
            skipped += 1;
            continue;
        }
        match summarize_order(&order, fill_rows, &arrival_lookup) {
            Some(summary) => aggregator.record(&order.algo_label, &summary),
            None => skipped += 1,
        }
    }

    let (totals, mut per_algo) = aggregator.finish();
    // Largest notional first so the most significant buckets lead the report.
    per_algo.sort_by(|a, b| b.notional.cmp(&a.notional));

    Ok(ExecutionReport {
        period_start: request.start,
        period_end: request.end,
        totals,
        per_algo,
        skipped_orders: skipped,
    })
}
152
/// Accumulates per-order summaries into an "ALL" bucket plus per-label buckets.
struct StatsAggregator {
    // Global bucket receiving every summary.
    totals: StatsAccumulator,
    // One accumulator per algo label, created lazily on first sight.
    groups: HashMap<String, StatsAccumulator>,
}
157
158impl StatsAggregator {
159    fn new() -> Self {
160        Self {
161            totals: StatsAccumulator::new("ALL"),
162            groups: HashMap::new(),
163        }
164    }
165
166    fn record(&mut self, label: &str, summary: &OrderSummary) {
167        self.totals.ingest(summary);
168        self.groups
169            .entry(label.to_string())
170            .or_insert_with(|| StatsAccumulator::new(label))
171            .ingest(summary);
172    }
173
174    fn finish(self) -> (ExecutionStats, Vec<ExecutionStats>) {
175        let totals = self.totals.into_stats();
176        let groups = self
177            .groups
178            .into_values()
179            .map(|acc| acc.into_stats())
180            .collect();
181        (totals, groups)
182    }
183}
184
/// Running stats for one bucket, plus the state needed to finalize the
/// fill-quantity-weighted average slippage.
struct StatsAccumulator {
    stats: ExecutionStats,
    // Numerator: sum of slippage_bps * filled_quantity over priced orders.
    slippage_weighted_sum: Decimal,
    // Denominator: sum of filled_quantity over priced orders.
    slippage_weight: Decimal,
}
190
191impl StatsAccumulator {
192    fn new(label: &str) -> Self {
193        Self {
194            stats: ExecutionStats::empty(label),
195            slippage_weighted_sum: Decimal::ZERO,
196            slippage_weight: Decimal::ZERO,
197        }
198    }
199
200    fn ingest(&mut self, summary: &OrderSummary) {
201        self.stats.order_count += 1;
202        self.stats.fill_count += summary.fill_count;
203        if summary.has_arrival {
204            self.stats.orders_with_arrival += 1;
205        }
206        self.stats.filled_quantity += summary.filled_quantity;
207        self.stats.notional += summary.notional;
208        self.stats.total_fees += summary.total_fees;
209        self.stats.implementation_shortfall += summary.shortfall_value;
210        if let Some(bps) = summary.slippage_bps {
211            self.slippage_weighted_sum += bps * summary.filled_quantity;
212            self.slippage_weight += summary.filled_quantity;
213        }
214    }
215
216    fn into_stats(mut self) -> ExecutionStats {
217        self.stats.avg_slippage_bps = if self.slippage_weight > Decimal::ZERO {
218            Some(self.slippage_weighted_sum / self.slippage_weight)
219        } else {
220            None
221        };
222        self.stats
223    }
224}
225
/// Minimal order projection read from the parquet `orders` files.
#[derive(Clone)]
struct OrderRow {
    id: String,
    symbol: Symbol,
    side: Side,
    /// Creation time; used both for window filtering and the arrival lookup.
    created_at: DateTime<Utc>,
    /// Bucket label inferred from the client order id.
    algo_label: String,
}
234
/// Minimal fill projection read from the parquet `fills` files.
#[derive(Clone)]
struct FillRow {
    /// Parent order id used to join fills onto orders.
    order_id: String,
    price: Decimal,
    quantity: Decimal,
    /// Fee for this fill; missing fees are normalized to zero on load.
    fee: Decimal,
}
242
/// Per-order aggregate produced by [`summarize_order`].
struct OrderSummary {
    fill_count: usize,
    filled_quantity: Decimal,
    /// Sum of `price * quantity` across the order's fills.
    notional: Decimal,
    total_fees: Decimal,
    /// Slippage vs. arrival price in basis points; `None` when no arrival
    /// price greater than zero was available.
    slippage_bps: Option<Decimal>,
    /// `(avg fill - arrival) * signed quantity`; zero without an arrival price.
    shortfall_value: Decimal,
    /// Whether an arrival price was found at all.
    has_arrival: bool,
}
252
/// Per-symbol tick series, sorted by timestamp, for arrival-price lookups.
struct ArrivalLookup {
    ticks: HashMap<Symbol, Vec<TickPoint>>,
}
256
257impl ArrivalLookup {
258    fn new(rows: Vec<TickPoint>) -> Self {
259        let mut ticks: HashMap<Symbol, Vec<TickPoint>> = HashMap::new();
260        for row in rows {
261            ticks.entry(row.symbol).or_default().push(row);
262        }
263        for series in ticks.values_mut() {
264            series.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
265        }
266        Self { ticks }
267    }
268
269    fn price_at(&self, symbol: &Symbol, timestamp: DateTime<Utc>) -> Option<Decimal> {
270        let series = self.ticks.get(symbol)?;
271        if series.is_empty() {
272            return None;
273        }
274        let idx = series.partition_point(|point| point.timestamp <= timestamp);
275        if idx == 0 {
276            Some(series[0].price)
277        } else {
278            Some(series[idx - 1].price)
279        }
280    }
281}
282
/// One price observation used as an arrival-price candidate.
#[derive(Clone)]
struct TickPoint {
    symbol: Symbol,
    price: Decimal,
    /// Exchange timestamp of the tick (loaded from `exchange_timestamp`).
    timestamp: DateTime<Utc>,
}
289
/// Optional inclusive `[start, end]` window over timestamps.
#[derive(Clone)]
struct TimeRange {
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
}
295
296impl TimeRange {
297    fn new(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> Result<Self> {
298        if let (Some(s), Some(e)) = (start, end) {
299            if e < s {
300                bail!("end time must be after start time");
301            }
302        }
303        Ok(Self { start, end })
304    }
305
306    fn contains(&self, ts: DateTime<Utc>) -> bool {
307        if let Some(start) = self.start {
308            if ts < start {
309                return false;
310            }
311        }
312        if let Some(end) = self.end {
313            if ts > end {
314                return false;
315            }
316        }
317        true
318    }
319}
320
321fn summarize_order(
322    order: &OrderRow,
323    fills: &[FillRow],
324    arrival_lookup: &ArrivalLookup,
325) -> Option<OrderSummary> {
326    if fills.is_empty() {
327        return None;
328    }
329
330    let mut filled_quantity = Decimal::ZERO;
331    let mut notional = Decimal::ZERO;
332    let mut total_fees = Decimal::ZERO;
333    for fill in fills {
334        filled_quantity += fill.quantity;
335        notional += fill.price * fill.quantity;
336        total_fees += fill.fee;
337    }
338    if filled_quantity <= Decimal::ZERO {
339        return None;
340    }
341    let avg_fill_price = notional / filled_quantity;
342    let arrival = arrival_lookup.price_at(&order.symbol, order.created_at);
343    let mut slippage_bps = None;
344    let mut shortfall_value = Decimal::ZERO;
345    if let Some(arrival_price) = arrival {
346        if arrival_price > Decimal::ZERO {
347            let price_delta = (avg_fill_price - arrival_price) * side_sign(order.side);
348            shortfall_value = price_delta * filled_quantity;
349            let ratio = price_delta / arrival_price;
350            slippage_bps = Some(ratio * bps_factor());
351        }
352    }
353    Some(OrderSummary {
354        fill_count: fills.len(),
355        filled_quantity,
356        notional,
357        total_fees,
358        slippage_bps,
359        shortfall_value,
360        has_arrival: arrival.is_some(),
361    })
362}
363
364fn side_sign(side: Side) -> Decimal {
365    match side {
366        Side::Buy => Decimal::ONE,
367        Side::Sell => -Decimal::ONE,
368    }
369}
370
371pub fn collect_parquet_files(dir: &Path) -> Result<Vec<PathBuf>> {
372    if !dir.exists() {
373        return Ok(Vec::new());
374    }
375    let mut stack = vec![dir.to_path_buf()];
376    let mut files = Vec::new();
377    while let Some(path) = stack.pop() {
378        let metadata = fs::metadata(&path)
379            .with_context(|| format!("failed to inspect {}", path.to_string_lossy()))?;
380        if metadata.is_dir() {
381            for entry in fs::read_dir(&path)
382                .with_context(|| format!("failed to list {}", path.to_string_lossy()))?
383            {
384                let entry = entry?;
385                stack.push(entry.path());
386            }
387        } else if path
388            .extension()
389            .and_then(|ext| ext.to_str())
390            .map(|ext| ext.eq_ignore_ascii_case("parquet"))
391            .unwrap_or(false)
392        {
393            files.push(path);
394        }
395    }
396    files.sort();
397    Ok(files)
398}
399
/// Read every order row from the given parquet files, keeping only orders
/// whose `created_at` falls inside `range`.
fn load_orders(paths: &[PathBuf], range: &TimeRange) -> Result<Vec<OrderRow>> {
    let mut rows = Vec::new();
    for path in paths {
        let file =
            File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_batch_size(4096)
            .build()?;
        // Column indices are resolved lazily from the first batch's schema,
        // once per file (schemas may differ between files).
        let mut columns: Option<OrderColumns> = None;
        for batch in reader {
            let batch = batch?;
            if columns.is_none() {
                columns = Some(OrderColumns::from_schema(&batch.schema())?);
            }
            let columns = columns.as_ref().expect("order columns should be set");
            for row in 0..batch.num_rows() {
                // Filter on creation time before decoding the rest of the row.
                let created_at = timestamp_value(&batch, columns.created_at, row)?;
                if !range.contains(created_at) {
                    continue;
                }
                let id = string_value(&batch, columns.id, row)?;
                let symbol = string_value(&batch, columns.symbol, row)?;
                let side = side_value(&batch, columns.side, row)?;
                let client_order_id = string_option(&batch, columns.client_order_id, row)?;
                let algo_label = infer_algo_label(client_order_id.as_deref());
                rows.push(OrderRow {
                    id,
                    symbol: Symbol::from(symbol.as_str()),
                    side,
                    created_at,
                    algo_label,
                });
            }
        }
    }
    Ok(rows)
}
437
438fn load_fills(paths: &[PathBuf]) -> Result<Vec<FillRow>> {
439    let mut rows = Vec::new();
440    for path in paths {
441        let file =
442            File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
443        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
444            .with_batch_size(4096)
445            .build()?;
446        let mut columns: Option<FillColumns> = None;
447        for batch in reader {
448            let batch = batch?;
449            if columns.is_none() {
450                columns = Some(FillColumns::from_schema(&batch.schema())?);
451            }
452            let columns = columns.as_ref().expect("fill columns set");
453            for row in 0..batch.num_rows() {
454                rows.push(FillRow {
455                    order_id: string_value(&batch, columns.order_id, row)?,
456                    price: decimal_value(&batch, columns.price, row)?,
457                    quantity: decimal_value(&batch, columns.quantity, row)?,
458                    fee: decimal_option(&batch, columns.fee, row)?.unwrap_or(Decimal::ZERO),
459                });
460            }
461        }
462    }
463    Ok(rows)
464}
465
/// Read every tick row from the given parquet files, keyed by the exchange
/// timestamp (not the local receive time).
fn load_ticks(paths: &[PathBuf]) -> Result<Vec<TickPoint>> {
    let mut rows = Vec::new();
    for path in paths {
        let file =
            File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_batch_size(4096)
            .build()?;
        // Column indices are resolved lazily from the first batch's schema.
        let mut columns: Option<TickColumns> = None;
        for batch in reader {
            let batch = batch?;
            if columns.is_none() {
                columns = Some(TickColumns::from_schema(&batch.schema())?);
            }
            let columns = columns.as_ref().expect("tick columns set");
            for row in 0..batch.num_rows() {
                let symbol = string_value(&batch, columns.symbol, row)?;
                rows.push(TickPoint {
                    symbol: Symbol::from(symbol.as_str()),
                    price: decimal_value(&batch, columns.price, row)?,
                    timestamp: timestamp_value(&batch, columns.exchange_ts, row)?,
                });
            }
        }
    }
    Ok(rows)
}
493
/// Map a client order id onto a coarse algo bucket label.
///
/// Matching is case-insensitive. Known prefixes win over the `-sl`/`-tp`
/// suffixes, and anything unrecognized (including a missing id) lands in
/// the "SIGNAL" bucket.
fn infer_algo_label(client_order_id: Option<&str>) -> String {
    // Checked in order; prefixes take priority over suffixes.
    const PREFIX_BUCKETS: [(&str, &str); 5] = [
        ("twap", "TWAP"),
        ("vwap", "VWAP"),
        ("iceberg", "ICEBERG"),
        ("pegged", "PEGGED"),
        ("sniper", "SNIPER"),
    ];
    const SUFFIX_BUCKETS: [(&str, &str); 2] = [("-sl", "STOP_LOSS"), ("-tp", "TAKE_PROFIT")];

    let normalized = client_order_id.unwrap_or("unlabeled").to_ascii_lowercase();
    for (prefix, label) in PREFIX_BUCKETS {
        if normalized.starts_with(prefix) {
            return label.to_string();
        }
    }
    for (suffix, label) in SUFFIX_BUCKETS {
        if normalized.ends_with(suffix) {
            return label.to_string();
        }
    }
    "SIGNAL".to_string()
}
515
/// Column indices resolved from the orders parquet schema.
struct OrderColumns {
    id: usize,
    symbol: usize,
    side: usize,
    client_order_id: usize,
    created_at: usize,
}
523
524impl OrderColumns {
525    fn from_schema(schema: &SchemaRef) -> Result<Self> {
526        Ok(Self {
527            id: column_index(schema, "id")?,
528            symbol: column_index(schema, "symbol")?,
529            side: column_index(schema, "side")?,
530            client_order_id: column_index(schema, "client_order_id")?,
531            created_at: column_index(schema, "created_at")?,
532        })
533    }
534}
535
/// Column indices resolved from the fills parquet schema.
struct FillColumns {
    order_id: usize,
    price: usize,
    quantity: usize,
    fee: usize,
}
542
543impl FillColumns {
544    fn from_schema(schema: &SchemaRef) -> Result<Self> {
545        Ok(Self {
546            order_id: column_index(schema, "order_id")?,
547            price: column_index(schema, "fill_price")?,
548            quantity: column_index(schema, "fill_quantity")?,
549            fee: column_index(schema, "fee")?,
550        })
551    }
552}
553
/// Column indices resolved from the ticks parquet schema.
struct TickColumns {
    symbol: usize,
    price: usize,
    exchange_ts: usize,
}
559
560impl TickColumns {
561    fn from_schema(schema: &SchemaRef) -> Result<Self> {
562        Ok(Self {
563            symbol: column_index(schema, "symbol")?,
564            price: column_index(schema, "price")?,
565            exchange_ts: column_index(schema, "exchange_timestamp")?,
566        })
567    }
568}
569
570fn column_index(schema: &SchemaRef, name: &str) -> Result<usize> {
571    schema
572        .column_with_name(name)
573        .map(|(idx, _)| idx)
574        .ok_or_else(|| anyhow!("column '{name}' missing from parquet schema"))
575}
576
577fn as_array<T: Array + 'static>(batch: &RecordBatch, column: usize) -> Result<&T> {
578    batch
579        .column(column)
580        .as_any()
581        .downcast_ref::<T>()
582        .ok_or_else(|| anyhow!("column {column} type mismatch"))
583}
584
585fn string_value(batch: &RecordBatch, column: usize, row: usize) -> Result<String> {
586    let array = as_array::<StringArray>(batch, column)?;
587    if array.is_null(row) {
588        return Err(anyhow!("column {column} contains null string"));
589    }
590    Ok(array.value(row).to_string())
591}
592
593fn string_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<String>> {
594    let array = as_array::<StringArray>(batch, column)?;
595    if array.is_null(row) {
596        return Ok(None);
597    }
598    Ok(Some(array.value(row).to_string()))
599}
600
601fn decimal_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Decimal> {
602    let array = as_array::<Decimal128Array>(batch, column)?;
603    if array.is_null(row) {
604        return Err(anyhow!("column {column} contains null decimal"));
605    }
606    Ok(Decimal::from_i128_with_scale(
607        array.value(row),
608        array.scale() as u32,
609    ))
610}
611
612fn decimal_option(batch: &RecordBatch, column: usize, row: usize) -> Result<Option<Decimal>> {
613    let array = as_array::<Decimal128Array>(batch, column)?;
614    if array.is_null(row) {
615        return Ok(None);
616    }
617    Ok(Some(Decimal::from_i128_with_scale(
618        array.value(row),
619        array.scale() as u32,
620    )))
621}
622
623fn timestamp_value(batch: &RecordBatch, column: usize, row: usize) -> Result<DateTime<Utc>> {
624    let array = as_array::<TimestampNanosecondArray>(batch, column)?;
625    if array.is_null(row) {
626        return Err(anyhow!("column {column} contains null timestamp"));
627    }
628    let nanos = array.value(row);
629    let secs = nanos.div_euclid(1_000_000_000);
630    let sub = nanos.rem_euclid(1_000_000_000) as u32;
631    DateTime::<Utc>::from_timestamp(secs, sub)
632        .ok_or_else(|| anyhow!("timestamp overflow for value {nanos}"))
633}
634
635fn side_value(batch: &RecordBatch, column: usize, row: usize) -> Result<Side> {
636    let array = as_array::<Int8Array>(batch, column)?;
637    if array.is_null(row) {
638        return Err(anyhow!("column {column} contains null side"));
639    }
640    Ok(if array.value(row) >= 0 {
641        Side::Buy
642    } else {
643        Side::Sell
644    })
645}
646
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::record_batch::RecordBatch;
    use chrono::TimeZone;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;
    use rust_decimal::prelude::FromPrimitive;
    use tempfile::tempdir;
    use tesser_core::{
        Fill, Order, OrderRequest, OrderStatus, OrderType, Symbol, Tick, TimeInForce,
    };

    use crate::encoding::{fills_to_batch, orders_to_batch, ticks_to_batch};

    // End-to-end happy path: one buy order, two fills at 101/102, arrival
    // tick at 100. Avg fill = 101.5, so shortfall = 1.5 * 2 = 3 and
    // slippage = 1.5 / 100 * 10_000 = 150 bps; the "twap-" client order id
    // routes the order into the TWAP bucket.
    #[test]
    fn computes_slippage_from_mock_data() -> Result<()> {
        let dir = tempdir()?;
        let root = dir.path();
        let order_id = "order-1".to_string();
        let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        let order = Order {
            id: order_id.clone(),
            request: OrderRequest {
                symbol: Symbol::from("BTCUSDT"),
                side: Side::Buy,
                order_type: OrderType::Market,
                quantity: Decimal::from_i64(2).unwrap(),
                price: None,
                trigger_price: None,
                time_in_force: Some(TimeInForce::GoodTilCanceled),
                client_order_id: Some("twap-demo-1".to_string()),
                take_profit: None,
                stop_loss: None,
                display_quantity: None,
            },
            status: OrderStatus::Filled,
            filled_quantity: Decimal::from_i64(2).unwrap(),
            avg_fill_price: None,
            created_at,
            updated_at: created_at,
        };
        let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
        write_partition(root, "orders", created_at, &orders_batch)?;

        let fill_one = Fill {
            order_id: order_id.clone(),
            symbol: order.request.symbol,
            side: order.request.side,
            fill_price: Decimal::from_f64(101.0).unwrap(),
            fill_quantity: Decimal::ONE,
            fee: Some(Decimal::new(1, 2)),
            fee_asset: None,
            timestamp: created_at,
        };
        let fill_two = Fill {
            order_id: order_id.clone(),
            symbol: order.request.symbol,
            side: order.request.side,
            fill_price: Decimal::from_f64(102.0).unwrap(),
            fill_quantity: Decimal::ONE,
            fee: Some(Decimal::new(1, 2)),
            fee_asset: None,
            timestamp: created_at,
        };
        let fills_batch = fills_to_batch(&[fill_one, fill_two])?;
        write_partition(root, "fills", created_at, &fills_batch)?;

        // Arrival tick at exactly the order's creation time.
        let tick = Tick {
            symbol: order.request.symbol,
            price: Decimal::from_f64(100.0).unwrap(),
            size: Decimal::ONE,
            side: Side::Buy,
            exchange_timestamp: created_at,
            received_at: created_at,
        };
        let ticks_batch = ticks_to_batch(std::slice::from_ref(&tick))?;
        write_partition(root, "ticks", created_at, &ticks_batch)?;

        let report = analyze_execution(&ExecutionAnalysisRequest {
            data_dir: root.into(),
            start: None,
            end: None,
        })?;

        assert_eq!(report.totals.order_count, 1);
        assert_eq!(report.totals.fill_count, 2);
        assert_eq!(report.totals.filled_quantity, Decimal::from_i64(2).unwrap());
        assert_eq!(report.totals.orders_with_arrival, 1);
        assert_eq!(report.totals.total_fees, Decimal::from_f64(0.02).unwrap());
        assert_eq!(
            report.totals.implementation_shortfall,
            Decimal::from_f64(3.0).unwrap()
        );
        let bps = report.totals.avg_slippage_bps.expect("slippage available");
        assert_eq!(bps, Decimal::from_i64(150).unwrap());
        let algo = report
            .per_algo
            .iter()
            .find(|entry| entry.label == "TWAP")
            .expect("twap bucket exists");
        assert_eq!(algo.order_count, 1);
        Ok(())
    }

    // Window starts in 2025 but the only order is from 2024: nothing is
    // analyzed and nothing is counted as skipped (empty-window early return).
    #[test]
    fn handles_missing_orders_in_window() -> Result<()> {
        let dir = tempdir()?;
        let root = dir.path();
        let created_at = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        let order = Order {
            id: "order-1".to_string(),
            request: OrderRequest {
                symbol: Symbol::from("BTCUSDT"),
                side: Side::Buy,
                order_type: OrderType::Market,
                quantity: Decimal::ONE,
                price: None,
                trigger_price: None,
                time_in_force: Some(TimeInForce::FillOrKill),
                client_order_id: Some("sniper-1".to_string()),
                take_profit: None,
                stop_loss: None,
                display_quantity: None,
            },
            status: OrderStatus::Canceled,
            filled_quantity: Decimal::ZERO,
            avg_fill_price: None,
            created_at,
            updated_at: created_at,
        };
        let orders_batch = orders_to_batch(std::slice::from_ref(&order))?;
        write_partition(root, "orders", created_at, &orders_batch)?;

        let fills_batch = fills_to_batch(&[])?;
        write_partition(root, "fills", created_at, &fills_batch)?;

        let ticks_batch = ticks_to_batch(&[])?;
        write_partition(root, "ticks", created_at, &ticks_batch)?;

        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let report = analyze_execution(&ExecutionAnalysisRequest {
            data_dir: root.into(),
            start: Some(start),
            end: None,
        })?;
        assert_eq!(report.totals.order_count, 0);
        assert_eq!(report.skipped_orders, 0);
        Ok(())
    }

    // Write `batch` as a single parquet part file under `<root>/<kind>/<date>/`,
    // mirroring the flight recorder's on-disk partition layout.
    fn write_partition(
        root: &Path,
        kind: &str,
        timestamp: DateTime<Utc>,
        batch: &RecordBatch,
    ) -> Result<()> {
        let day = timestamp.date_naive().to_string();
        let dir = root.join(kind).join(day);
        std::fs::create_dir_all(&dir)
            .with_context(|| format!("failed to create {}", dir.display()))?;
        let path = dir.join("part-000.parquet");
        let file =
            File::create(&path).with_context(|| format!("failed to create {}", path.display()))?;
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
        writer.write(batch)?;
        writer.close().map(|_| ()).map_err(Into::into)
    }
}