1use crate::{BrokerClient, OrderResponse, OrderStatus, Result};
10use chrono::{DateTime, Utc};
11use dashmap::DashMap;
12use nt_core::types::Symbol;
13use rust_decimal::Decimal;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tracing::{debug, error, warn};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ReconciliationResult {
22 pub order_id: String,
23 pub status: ReconciliationStatus,
24 pub discrepancies: Vec<Discrepancy>,
25 pub checked_at: DateTime<Utc>,
26}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(rename_all = "lowercase")]
31pub enum ReconciliationStatus {
32 Matched,
34 Warning,
36 Error,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub enum Discrepancy {
43 QuantityMismatch {
45 expected: u32,
46 actual: u32,
47 difference: i32,
48 },
49 PriceDeviation {
51 expected: Decimal,
52 actual: Decimal,
53 deviation_pct: Decimal,
54 },
55 StatusMismatch {
57 expected: OrderStatus,
58 actual: OrderStatus,
59 },
60 TimingAnomaly { message: String },
62}
63
64#[derive(Debug, Clone)]
66struct ExpectedOrder {
67 order_id: String,
68 symbol: Symbol,
69 expected_qty: u32,
70 expected_price_range: Option<(Decimal, Decimal)>,
71 placed_at: DateTime<Utc>,
72}
73
74pub struct FillReconciler {
76 expected_orders: Arc<DashMap<String, ExpectedOrder>>,
77 max_price_deviation: Decimal,
79}
80
81impl FillReconciler {
82 pub fn new(max_price_deviation: Decimal) -> Self {
84 Self {
85 expected_orders: Arc::new(DashMap::new()),
86 max_price_deviation,
87 }
88 }
89
90 pub fn register_order(
92 &self,
93 order_id: String,
94 symbol: Symbol,
95 expected_qty: u32,
96 expected_price_range: Option<(Decimal, Decimal)>,
97 ) {
98 debug!("Registering order for reconciliation: {}", order_id);
99
100 self.expected_orders.insert(
101 order_id.clone(),
102 ExpectedOrder {
103 order_id,
104 symbol,
105 expected_qty,
106 expected_price_range,
107 placed_at: Utc::now(),
108 },
109 );
110 }
111
112 pub async fn reconcile(
114 &self,
115 order_id: &str,
116 broker: &dyn BrokerClient,
117 ) -> Result<ReconciliationResult> {
118 debug!("Reconciling order: {}", order_id);
119
120 let expected = match self.expected_orders.get(order_id) {
122 Some(order) => order.clone(),
123 None => {
124 warn!("No expected order found for reconciliation: {}", order_id);
125 return Ok(ReconciliationResult {
126 order_id: order_id.to_string(),
127 status: ReconciliationStatus::Warning,
128 discrepancies: vec![],
129 checked_at: Utc::now(),
130 });
131 }
132 };
133
134 let actual = broker.get_order(order_id).await?;
136
137 let mut discrepancies = Vec::new();
139
140 if actual.status == OrderStatus::Filled {
142 if actual.filled_qty != expected.expected_qty {
143 let difference = actual.filled_qty as i32 - expected.expected_qty as i32;
144 discrepancies.push(Discrepancy::QuantityMismatch {
145 expected: expected.expected_qty,
146 actual: actual.filled_qty,
147 difference,
148 });
149 }
150 } else if actual.status == OrderStatus::PartiallyFilled {
151 if actual.filled_qty < expected.expected_qty {
152 let difference = actual.filled_qty as i32 - expected.expected_qty as i32;
153 discrepancies.push(Discrepancy::QuantityMismatch {
154 expected: expected.expected_qty,
155 actual: actual.filled_qty,
156 difference,
157 });
158 }
159 }
160
161 if let Some(filled_price) = actual.filled_avg_price {
163 if let Some((min_price, max_price)) = expected.expected_price_range {
164 if filled_price < min_price || filled_price > max_price {
165 let mid_price = (min_price + max_price) / Decimal::from(2);
166 let deviation = ((filled_price - mid_price).abs() / mid_price)
167 * Decimal::from(100);
168
169 discrepancies.push(Discrepancy::PriceDeviation {
170 expected: mid_price,
171 actual: filled_price,
172 deviation_pct: deviation,
173 });
174 }
175 }
176 }
177
178 if let Some(filled_at) = actual.filled_at {
180 let execution_time = (filled_at - expected.placed_at)
181 .num_milliseconds()
182 .abs();
183
184 if execution_time > 60000 {
185 discrepancies.push(Discrepancy::TimingAnomaly {
187 message: format!(
188 "Order took {} seconds to fill",
189 execution_time / 1000
190 ),
191 });
192 }
193 }
194
195 let status = if discrepancies.is_empty() {
197 ReconciliationStatus::Matched
198 } else if discrepancies.iter().any(|d| matches!(d, Discrepancy::PriceDeviation { deviation_pct, .. } if *deviation_pct > self.max_price_deviation * Decimal::from(100))) {
199 error!("Significant price deviation detected for order {}", order_id);
200 ReconciliationStatus::Error
201 } else {
202 ReconciliationStatus::Warning
203 };
204
205 let result = ReconciliationResult {
206 order_id: order_id.to_string(),
207 status,
208 discrepancies,
209 checked_at: Utc::now(),
210 };
211
212 match result.status {
214 ReconciliationStatus::Matched => {
215 debug!("Order {} reconciled successfully", order_id);
216 }
217 ReconciliationStatus::Warning => {
218 warn!("Order {} reconciliation warnings: {:?}", order_id, result.discrepancies);
219 }
220 ReconciliationStatus::Error => {
221 error!("Order {} reconciliation errors: {:?}", order_id, result.discrepancies);
222 }
223 }
224
225 self.expected_orders.remove(order_id);
227
228 Ok(result)
229 }
230
231 pub async fn reconcile_all(
233 &self,
234 broker: &dyn BrokerClient,
235 ) -> Result<Vec<ReconciliationResult>> {
236 let order_ids: Vec<String> = self
237 .expected_orders
238 .iter()
239 .map(|entry| entry.key().clone())
240 .collect();
241
242 let mut results = Vec::new();
243
244 for order_id in order_ids {
245 match self.reconcile(&order_id, broker).await {
246 Ok(result) => results.push(result),
247 Err(e) => {
248 error!("Failed to reconcile order {}: {}", order_id, e);
249 }
250 }
251 }
252
253 Ok(results)
254 }
255
256 pub fn pending_count(&self) -> usize {
258 self.expected_orders.len()
259 }
260
261 pub fn clear(&self) {
263 self.expected_orders.clear();
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270
271 #[test]
272 fn test_reconciliation_status() {
273 let result = ReconciliationResult {
274 order_id: "test123".to_string(),
275 status: ReconciliationStatus::Matched,
276 discrepancies: vec![],
277 checked_at: Utc::now(),
278 };
279
280 assert_eq!(result.status, ReconciliationStatus::Matched);
281 assert!(result.discrepancies.is_empty());
282 }
283
284 #[test]
285 fn test_quantity_discrepancy() {
286 let discrepancy = Discrepancy::QuantityMismatch {
287 expected: 100,
288 actual: 95,
289 difference: -5,
290 };
291
292 match discrepancy {
293 Discrepancy::QuantityMismatch {
294 expected,
295 actual,
296 difference,
297 } => {
298 assert_eq!(expected, 100);
299 assert_eq!(actual, 95);
300 assert_eq!(difference, -5);
301 }
302 _ => panic!("Wrong discrepancy type"),
303 }
304 }
305}