use crate::{BrokerClient, OrderResponse, OrderStatus, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use nt_core::types::Symbol;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, error, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReconciliationResult {
pub order_id: String,
pub status: ReconciliationStatus,
pub discrepancies: Vec<Discrepancy>,
pub checked_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ReconciliationStatus {
Matched,
Warning,
Error,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Discrepancy {
QuantityMismatch {
expected: u32,
actual: u32,
difference: i32,
},
PriceDeviation {
expected: Decimal,
actual: Decimal,
deviation_pct: Decimal,
},
StatusMismatch {
expected: OrderStatus,
actual: OrderStatus,
},
TimingAnomaly { message: String },
}
#[derive(Debug, Clone)]
struct ExpectedOrder {
order_id: String,
symbol: Symbol,
expected_qty: u32,
expected_price_range: Option<(Decimal, Decimal)>,
placed_at: DateTime<Utc>,
}
pub struct FillReconciler {
expected_orders: Arc<DashMap<String, ExpectedOrder>>,
max_price_deviation: Decimal,
}
impl FillReconciler {
pub fn new(max_price_deviation: Decimal) -> Self {
Self {
expected_orders: Arc::new(DashMap::new()),
max_price_deviation,
}
}
pub fn register_order(
&self,
order_id: String,
symbol: Symbol,
expected_qty: u32,
expected_price_range: Option<(Decimal, Decimal)>,
) {
debug!("Registering order for reconciliation: {}", order_id);
self.expected_orders.insert(
order_id.clone(),
ExpectedOrder {
order_id,
symbol,
expected_qty,
expected_price_range,
placed_at: Utc::now(),
},
);
}
pub async fn reconcile(
&self,
order_id: &str,
broker: &dyn BrokerClient,
) -> Result<ReconciliationResult> {
debug!("Reconciling order: {}", order_id);
let expected = match self.expected_orders.get(order_id) {
Some(order) => order.clone(),
None => {
warn!("No expected order found for reconciliation: {}", order_id);
return Ok(ReconciliationResult {
order_id: order_id.to_string(),
status: ReconciliationStatus::Warning,
discrepancies: vec![],
checked_at: Utc::now(),
});
}
};
let actual = broker.get_order(order_id).await?;
let mut discrepancies = Vec::new();
if actual.status == OrderStatus::Filled {
if actual.filled_qty != expected.expected_qty {
let difference = actual.filled_qty as i32 - expected.expected_qty as i32;
discrepancies.push(Discrepancy::QuantityMismatch {
expected: expected.expected_qty,
actual: actual.filled_qty,
difference,
});
}
} else if actual.status == OrderStatus::PartiallyFilled {
if actual.filled_qty < expected.expected_qty {
let difference = actual.filled_qty as i32 - expected.expected_qty as i32;
discrepancies.push(Discrepancy::QuantityMismatch {
expected: expected.expected_qty,
actual: actual.filled_qty,
difference,
});
}
}
if let Some(filled_price) = actual.filled_avg_price {
if let Some((min_price, max_price)) = expected.expected_price_range {
if filled_price < min_price || filled_price > max_price {
let mid_price = (min_price + max_price) / Decimal::from(2);
let deviation = ((filled_price - mid_price).abs() / mid_price)
* Decimal::from(100);
discrepancies.push(Discrepancy::PriceDeviation {
expected: mid_price,
actual: filled_price,
deviation_pct: deviation,
});
}
}
}
if let Some(filled_at) = actual.filled_at {
let execution_time = (filled_at - expected.placed_at)
.num_milliseconds()
.abs();
if execution_time > 60000 {
discrepancies.push(Discrepancy::TimingAnomaly {
message: format!(
"Order took {} seconds to fill",
execution_time / 1000
),
});
}
}
let status = if discrepancies.is_empty() {
ReconciliationStatus::Matched
} else if discrepancies.iter().any(|d| matches!(d, Discrepancy::PriceDeviation { deviation_pct, .. } if *deviation_pct > self.max_price_deviation * Decimal::from(100))) {
error!("Significant price deviation detected for order {}", order_id);
ReconciliationStatus::Error
} else {
ReconciliationStatus::Warning
};
let result = ReconciliationResult {
order_id: order_id.to_string(),
status,
discrepancies,
checked_at: Utc::now(),
};
match result.status {
ReconciliationStatus::Matched => {
debug!("Order {} reconciled successfully", order_id);
}
ReconciliationStatus::Warning => {
warn!("Order {} reconciliation warnings: {:?}", order_id, result.discrepancies);
}
ReconciliationStatus::Error => {
error!("Order {} reconciliation errors: {:?}", order_id, result.discrepancies);
}
}
self.expected_orders.remove(order_id);
Ok(result)
}
pub async fn reconcile_all(
&self,
broker: &dyn BrokerClient,
) -> Result<Vec<ReconciliationResult>> {
let order_ids: Vec<String> = self
.expected_orders
.iter()
.map(|entry| entry.key().clone())
.collect();
let mut results = Vec::new();
for order_id in order_ids {
match self.reconcile(&order_id, broker).await {
Ok(result) => results.push(result),
Err(e) => {
error!("Failed to reconcile order {}: {}", order_id, e);
}
}
}
Ok(results)
}
pub fn pending_count(&self) -> usize {
self.expected_orders.len()
}
pub fn clear(&self) {
self.expected_orders.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_reconciliation_status() {
let result = ReconciliationResult {
order_id: "test123".to_string(),
status: ReconciliationStatus::Matched,
discrepancies: vec![],
checked_at: Utc::now(),
};
assert_eq!(result.status, ReconciliationStatus::Matched);
assert!(result.discrepancies.is_empty());
}
#[test]
fn test_quantity_discrepancy() {
let discrepancy = Discrepancy::QuantityMismatch {
expected: 100,
actual: 95,
difference: -5,
};
match discrepancy {
Discrepancy::QuantityMismatch {
expected,
actual,
difference,
} => {
assert_eq!(expected, 100);
assert_eq!(actual, 95);
assert_eq!(difference, -5);
}
_ => panic!("Wrong discrepancy type"),
}
}
}