nt_execution/
fill_reconciliation.rs

1// Fill reconciliation and matching
2//
3// Features:
4// - Track expected fills vs actual fills
5// - Detect fill price anomalies
6// - Handle partial fills
7// - Reconciliation reports
8
9use crate::{BrokerClient, OrderResponse, OrderStatus, Result};
10use chrono::{DateTime, Utc};
11use dashmap::DashMap;
12use nt_core::types::Symbol;
13use rust_decimal::Decimal;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tracing::{debug, error, warn};
18
19/// Fill reconciliation result
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ReconciliationResult {
22    pub order_id: String,
23    pub status: ReconciliationStatus,
24    pub discrepancies: Vec<Discrepancy>,
25    pub checked_at: DateTime<Utc>,
26}
27
28/// Reconciliation status
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(rename_all = "lowercase")]
31pub enum ReconciliationStatus {
32    /// Fill matches expected
33    Matched,
34    /// Minor discrepancy within tolerance
35    Warning,
36    /// Significant discrepancy requiring attention
37    Error,
38}
39
40/// Discrepancy type
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub enum Discrepancy {
43    /// Quantity mismatch
44    QuantityMismatch {
45        expected: u32,
46        actual: u32,
47        difference: i32,
48    },
49    /// Price deviation
50    PriceDeviation {
51        expected: Decimal,
52        actual: Decimal,
53        deviation_pct: Decimal,
54    },
55    /// Unexpected order status
56    StatusMismatch {
57        expected: OrderStatus,
58        actual: OrderStatus,
59    },
60    /// Timing issue
61    TimingAnomaly { message: String },
62}
63
64/// Expected order information for reconciliation
65#[derive(Debug, Clone)]
66struct ExpectedOrder {
67    order_id: String,
68    symbol: Symbol,
69    expected_qty: u32,
70    expected_price_range: Option<(Decimal, Decimal)>,
71    placed_at: DateTime<Utc>,
72}
73
74/// Fill reconciler
75pub struct FillReconciler {
76    expected_orders: Arc<DashMap<String, ExpectedOrder>>,
77    /// Maximum allowed price deviation percentage (e.g., 0.01 = 1%)
78    max_price_deviation: Decimal,
79}
80
81impl FillReconciler {
82    /// Create a new fill reconciler
83    pub fn new(max_price_deviation: Decimal) -> Self {
84        Self {
85            expected_orders: Arc::new(DashMap::new()),
86            max_price_deviation,
87        }
88    }
89
90    /// Register an order for reconciliation
91    pub fn register_order(
92        &self,
93        order_id: String,
94        symbol: Symbol,
95        expected_qty: u32,
96        expected_price_range: Option<(Decimal, Decimal)>,
97    ) {
98        debug!("Registering order for reconciliation: {}", order_id);
99
100        self.expected_orders.insert(
101            order_id.clone(),
102            ExpectedOrder {
103                order_id,
104                symbol,
105                expected_qty,
106                expected_price_range,
107                placed_at: Utc::now(),
108            },
109        );
110    }
111
112    /// Reconcile an order against actual fill
113    pub async fn reconcile(
114        &self,
115        order_id: &str,
116        broker: &dyn BrokerClient,
117    ) -> Result<ReconciliationResult> {
118        debug!("Reconciling order: {}", order_id);
119
120        // Get expected order
121        let expected = match self.expected_orders.get(order_id) {
122            Some(order) => order.clone(),
123            None => {
124                warn!("No expected order found for reconciliation: {}", order_id);
125                return Ok(ReconciliationResult {
126                    order_id: order_id.to_string(),
127                    status: ReconciliationStatus::Warning,
128                    discrepancies: vec![],
129                    checked_at: Utc::now(),
130                });
131            }
132        };
133
134        // Get actual order from broker
135        let actual = broker.get_order(order_id).await?;
136
137        // Perform checks
138        let mut discrepancies = Vec::new();
139
140        // Check quantity
141        if actual.status == OrderStatus::Filled {
142            if actual.filled_qty != expected.expected_qty {
143                let difference = actual.filled_qty as i32 - expected.expected_qty as i32;
144                discrepancies.push(Discrepancy::QuantityMismatch {
145                    expected: expected.expected_qty,
146                    actual: actual.filled_qty,
147                    difference,
148                });
149            }
150        } else if actual.status == OrderStatus::PartiallyFilled {
151            if actual.filled_qty < expected.expected_qty {
152                let difference = actual.filled_qty as i32 - expected.expected_qty as i32;
153                discrepancies.push(Discrepancy::QuantityMismatch {
154                    expected: expected.expected_qty,
155                    actual: actual.filled_qty,
156                    difference,
157                });
158            }
159        }
160
161        // Check fill price
162        if let Some(filled_price) = actual.filled_avg_price {
163            if let Some((min_price, max_price)) = expected.expected_price_range {
164                if filled_price < min_price || filled_price > max_price {
165                    let mid_price = (min_price + max_price) / Decimal::from(2);
166                    let deviation = ((filled_price - mid_price).abs() / mid_price)
167                        * Decimal::from(100);
168
169                    discrepancies.push(Discrepancy::PriceDeviation {
170                        expected: mid_price,
171                        actual: filled_price,
172                        deviation_pct: deviation,
173                    });
174                }
175            }
176        }
177
178        // Check timing (warn if fill took too long)
179        if let Some(filled_at) = actual.filled_at {
180            let execution_time = (filled_at - expected.placed_at)
181                .num_milliseconds()
182                .abs();
183
184            if execution_time > 60000 {
185                // > 1 minute
186                discrepancies.push(Discrepancy::TimingAnomaly {
187                    message: format!(
188                        "Order took {} seconds to fill",
189                        execution_time / 1000
190                    ),
191                });
192            }
193        }
194
195        // Determine overall status
196        let status = if discrepancies.is_empty() {
197            ReconciliationStatus::Matched
198        } else if discrepancies.iter().any(|d| matches!(d, Discrepancy::PriceDeviation { deviation_pct, .. } if *deviation_pct > self.max_price_deviation * Decimal::from(100))) {
199            error!("Significant price deviation detected for order {}", order_id);
200            ReconciliationStatus::Error
201        } else {
202            ReconciliationStatus::Warning
203        };
204
205        let result = ReconciliationResult {
206            order_id: order_id.to_string(),
207            status,
208            discrepancies,
209            checked_at: Utc::now(),
210        };
211
212        // Log results
213        match result.status {
214            ReconciliationStatus::Matched => {
215                debug!("Order {} reconciled successfully", order_id);
216            }
217            ReconciliationStatus::Warning => {
218                warn!("Order {} reconciliation warnings: {:?}", order_id, result.discrepancies);
219            }
220            ReconciliationStatus::Error => {
221                error!("Order {} reconciliation errors: {:?}", order_id, result.discrepancies);
222            }
223        }
224
225        // Remove from expected orders
226        self.expected_orders.remove(order_id);
227
228        Ok(result)
229    }
230
231    /// Reconcile all pending orders
232    pub async fn reconcile_all(
233        &self,
234        broker: &dyn BrokerClient,
235    ) -> Result<Vec<ReconciliationResult>> {
236        let order_ids: Vec<String> = self
237            .expected_orders
238            .iter()
239            .map(|entry| entry.key().clone())
240            .collect();
241
242        let mut results = Vec::new();
243
244        for order_id in order_ids {
245            match self.reconcile(&order_id, broker).await {
246                Ok(result) => results.push(result),
247                Err(e) => {
248                    error!("Failed to reconcile order {}: {}", order_id, e);
249                }
250            }
251        }
252
253        Ok(results)
254    }
255
256    /// Get number of pending reconciliations
257    pub fn pending_count(&self) -> usize {
258        self.expected_orders.len()
259    }
260
261    /// Clear all expected orders
262    pub fn clear(&self) {
263        self.expected_orders.clear();
264    }
265}
266
267#[cfg(test)]
268mod tests {
269    use super::*;
270
271    #[test]
272    fn test_reconciliation_status() {
273        let result = ReconciliationResult {
274            order_id: "test123".to_string(),
275            status: ReconciliationStatus::Matched,
276            discrepancies: vec![],
277            checked_at: Utc::now(),
278        };
279
280        assert_eq!(result.status, ReconciliationStatus::Matched);
281        assert!(result.discrepancies.is_empty());
282    }
283
284    #[test]
285    fn test_quantity_discrepancy() {
286        let discrepancy = Discrepancy::QuantityMismatch {
287            expected: 100,
288            actual: 95,
289            difference: -5,
290        };
291
292        match discrepancy {
293            Discrepancy::QuantityMismatch {
294                expected,
295                actual,
296                difference,
297            } => {
298                assert_eq!(expected, 100);
299                assert_eq!(actual, 95);
300                assert_eq!(difference, -5);
301            }
302            _ => panic!("Wrong discrepancy type"),
303        }
304    }
305}