use dashmap::DashMap;
use pricelevel::Id;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[inline]
fn nanos_since_epoch() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
.unwrap_or(0)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[repr(u8)]
pub enum CancelReason {
UserRequested,
SelfTradePrevention,
TimeInForceExpired,
MassCancelAll,
MassCancelBySide,
MassCancelByUser,
MassCancelByPriceRange,
InsufficientLiquidity,
}
impl std::fmt::Display for CancelReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UserRequested => write!(f, "user requested"),
Self::SelfTradePrevention => write!(f, "self-trade prevention"),
Self::TimeInForceExpired => write!(f, "time-in-force expired"),
Self::MassCancelAll => write!(f, "mass cancel all"),
Self::MassCancelBySide => write!(f, "mass cancel by side"),
Self::MassCancelByUser => write!(f, "mass cancel by user"),
Self::MassCancelByPriceRange => write!(f, "mass cancel by price range"),
Self::InsufficientLiquidity => write!(f, "insufficient liquidity"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OrderStatus {
Open,
PartiallyFilled {
original_quantity: u64,
filled_quantity: u64,
},
Filled {
filled_quantity: u64,
},
Cancelled {
filled_quantity: u64,
reason: CancelReason,
},
Rejected {
reason: String,
},
}
impl OrderStatus {
#[must_use]
#[inline]
pub fn is_terminal(&self) -> bool {
matches!(
self,
OrderStatus::Filled { .. }
| OrderStatus::Cancelled { .. }
| OrderStatus::Rejected { .. }
)
}
#[must_use]
#[inline]
pub fn is_active(&self) -> bool {
matches!(
self,
OrderStatus::Open | OrderStatus::PartiallyFilled { .. }
)
}
#[must_use]
#[inline]
pub fn filled_quantity(&self) -> u64 {
match self {
OrderStatus::Open => 0,
OrderStatus::PartiallyFilled {
filled_quantity, ..
} => *filled_quantity,
OrderStatus::Filled { filled_quantity } => *filled_quantity,
OrderStatus::Cancelled {
filled_quantity, ..
} => *filled_quantity,
OrderStatus::Rejected { .. } => 0,
}
}
}
impl std::fmt::Display for OrderStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OrderStatus::Open => write!(f, "Open"),
OrderStatus::PartiallyFilled {
original_quantity,
filled_quantity,
} => write!(f, "PartiallyFilled({filled_quantity}/{original_quantity})"),
OrderStatus::Filled { filled_quantity } => {
write!(f, "Filled({filled_quantity})")
}
OrderStatus::Cancelled {
filled_quantity,
reason,
} => write!(f, "Cancelled({reason}, filled={filled_quantity})"),
OrderStatus::Rejected { reason } => write!(f, "Rejected({reason})"),
}
}
}
pub type OrderStateListener = Arc<dyn Fn(Id, &OrderStatus, &OrderStatus) + Send + Sync>;
const DEFAULT_RETENTION_CAPACITY: usize = 10_000;
pub struct OrderStateTracker {
states: DashMap<Id, OrderStatus>,
history: DashMap<Id, Vec<(u64, OrderStatus)>>,
terminal_queue: Mutex<VecDeque<Id>>,
retention_capacity: usize,
listener: Option<OrderStateListener>,
}
impl std::fmt::Debug for OrderStateTracker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OrderStateTracker")
.field("tracked_orders", &self.states.len())
.field("retention_capacity", &self.retention_capacity)
.field("has_listener", &self.listener.is_some())
.finish()
}
}
impl Default for OrderStateTracker {
fn default() -> Self {
Self::new()
}
}
impl OrderStateTracker {
#[must_use]
pub fn new() -> Self {
Self {
states: DashMap::new(),
history: DashMap::new(),
terminal_queue: Mutex::new(VecDeque::new()),
retention_capacity: DEFAULT_RETENTION_CAPACITY,
listener: None,
}
}
#[must_use]
pub fn with_capacity(retention_capacity: usize) -> Self {
Self {
states: DashMap::new(),
history: DashMap::new(),
terminal_queue: Mutex::new(VecDeque::new()),
retention_capacity,
listener: None,
}
}
pub fn set_listener(&mut self, listener: OrderStateListener) {
self.listener = Some(listener);
}
#[must_use]
pub fn get(&self, order_id: Id) -> Option<OrderStatus> {
self.states
.get(&order_id)
.map(|entry| entry.value().clone())
}
#[must_use]
#[inline]
pub fn len(&self) -> usize {
self.states.len()
}
#[must_use]
#[inline]
pub fn is_empty(&self) -> bool {
self.states.is_empty()
}
pub fn transition(&self, order_id: Id, new_status: OrderStatus) {
let old_status = self
.states
.get(&order_id)
.map(|entry| entry.value().clone());
self.states.insert(order_id, new_status.clone());
let ts = nanos_since_epoch();
self.history
.entry(order_id)
.or_default()
.push((ts, new_status.clone()));
if let Some(ref listener) = self.listener {
let old = old_status.as_ref().unwrap_or(&new_status);
listener(order_id, old, &new_status);
}
if new_status.is_terminal() {
self.enqueue_terminal(order_id);
}
}
fn enqueue_terminal(&self, order_id: Id) {
if let Ok(mut queue) = self.terminal_queue.lock() {
queue.push_back(order_id);
while queue.len() > self.retention_capacity {
if let Some(evicted_id) = queue.pop_front() {
if let Some(entry) = self.states.get(&evicted_id)
&& entry.value().is_terminal()
{
drop(entry);
self.states.remove(&evicted_id);
self.history.remove(&evicted_id);
}
}
}
}
}
#[must_use]
pub fn get_history(&self, order_id: Id) -> Option<Vec<(u64, OrderStatus)>> {
self.history
.get(&order_id)
.map(|entry| entry.value().clone())
}
#[must_use]
pub fn active_count(&self) -> usize {
self.states.iter().filter(|e| e.value().is_active()).count()
}
#[must_use]
pub fn terminal_count(&self) -> usize {
self.states
.iter()
.filter(|e| e.value().is_terminal())
.count()
}
pub fn purge_terminal_older_than(&self, older_than: Duration) -> usize {
let cutoff = nanos_since_epoch()
.saturating_sub(u64::try_from(older_than.as_nanos()).unwrap_or(u64::MAX));
let mut purged = 0usize;
let to_remove: Vec<Id> = self
.states
.iter()
.filter_map(|entry| {
let id = *entry.key();
let status = entry.value();
if !status.is_terminal() {
return None;
}
let is_old = self
.history
.get(&id)
.and_then(|h| h.value().last().map(|(ts, _)| *ts < cutoff))
.unwrap_or(false);
if is_old { Some(id) } else { None }
})
.collect();
for id in to_remove {
self.states.remove(&id);
self.history.remove(&id);
purged = purged.saturating_add(1);
}
purged
}
pub fn clear(&self) {
self.states.clear();
self.history.clear();
if let Ok(mut queue) = self.terminal_queue.lock() {
queue.clear();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cancel_reason_display() {
assert_eq!(CancelReason::UserRequested.to_string(), "user requested");
assert_eq!(
CancelReason::SelfTradePrevention.to_string(),
"self-trade prevention"
);
assert_eq!(
CancelReason::InsufficientLiquidity.to_string(),
"insufficient liquidity"
);
assert_eq!(CancelReason::MassCancelAll.to_string(), "mass cancel all");
assert_eq!(
CancelReason::MassCancelBySide.to_string(),
"mass cancel by side"
);
assert_eq!(
CancelReason::MassCancelByUser.to_string(),
"mass cancel by user"
);
assert_eq!(
CancelReason::MassCancelByPriceRange.to_string(),
"mass cancel by price range"
);
assert_eq!(
CancelReason::TimeInForceExpired.to_string(),
"time-in-force expired"
);
}
#[test]
fn test_order_status_is_terminal() {
assert!(!OrderStatus::Open.is_terminal());
assert!(
!OrderStatus::PartiallyFilled {
original_quantity: 100,
filled_quantity: 50
}
.is_terminal()
);
assert!(
OrderStatus::Filled {
filled_quantity: 100
}
.is_terminal()
);
assert!(
OrderStatus::Cancelled {
filled_quantity: 0,
reason: CancelReason::UserRequested
}
.is_terminal()
);
assert!(
OrderStatus::Rejected {
reason: "test".to_string()
}
.is_terminal()
);
}
#[test]
fn test_order_status_is_active() {
assert!(OrderStatus::Open.is_active());
assert!(
OrderStatus::PartiallyFilled {
original_quantity: 100,
filled_quantity: 50
}
.is_active()
);
assert!(
!OrderStatus::Filled {
filled_quantity: 100
}
.is_active()
);
assert!(
!OrderStatus::Cancelled {
filled_quantity: 0,
reason: CancelReason::UserRequested
}
.is_active()
);
}
#[test]
fn test_order_status_filled_quantity() {
assert_eq!(OrderStatus::Open.filled_quantity(), 0);
assert_eq!(
OrderStatus::PartiallyFilled {
original_quantity: 100,
filled_quantity: 30
}
.filled_quantity(),
30
);
assert_eq!(
OrderStatus::Filled {
filled_quantity: 100
}
.filled_quantity(),
100
);
assert_eq!(
OrderStatus::Cancelled {
filled_quantity: 20,
reason: CancelReason::UserRequested
}
.filled_quantity(),
20
);
assert_eq!(
OrderStatus::Rejected {
reason: "bad".to_string()
}
.filled_quantity(),
0
);
}
#[test]
fn test_order_status_display() {
assert_eq!(OrderStatus::Open.to_string(), "Open");
assert_eq!(
OrderStatus::PartiallyFilled {
original_quantity: 100,
filled_quantity: 30
}
.to_string(),
"PartiallyFilled(30/100)"
);
assert_eq!(
OrderStatus::Filled {
filled_quantity: 100
}
.to_string(),
"Filled(100)"
);
assert_eq!(
OrderStatus::Cancelled {
filled_quantity: 0,
reason: CancelReason::UserRequested
}
.to_string(),
"Cancelled(user requested, filled=0)"
);
assert_eq!(
OrderStatus::Rejected {
reason: "bad tick".to_string()
}
.to_string(),
"Rejected(bad tick)"
);
}
#[test]
fn test_tracker_new_is_empty() {
let tracker = OrderStateTracker::new();
assert!(tracker.is_empty());
assert_eq!(tracker.len(), 0);
}
#[test]
fn test_tracker_transition_and_get() {
let tracker = OrderStateTracker::new();
let id = Id::new_uuid();
tracker.transition(id, OrderStatus::Open);
let status = tracker.get(id);
assert!(status.is_some());
assert_eq!(status, Some(OrderStatus::Open));
assert_eq!(tracker.len(), 1);
}
#[test]
fn test_tracker_lifecycle_open_to_filled() {
let tracker = OrderStateTracker::new();
let id = Id::new_uuid();
tracker.transition(id, OrderStatus::Open);
tracker.transition(
id,
OrderStatus::PartiallyFilled {
original_quantity: 100,
filled_quantity: 50,
},
);
tracker.transition(
id,
OrderStatus::Filled {
filled_quantity: 100,
},
);
let status = tracker.get(id);
assert_eq!(
status,
Some(OrderStatus::Filled {
filled_quantity: 100
})
);
}
#[test]
fn test_tracker_lifecycle_open_to_cancelled() {
let tracker = OrderStateTracker::new();
let id = Id::new_uuid();
tracker.transition(id, OrderStatus::Open);
tracker.transition(
id,
OrderStatus::Cancelled {
filled_quantity: 0,
reason: CancelReason::UserRequested,
},
);
let status = tracker.get(id);
assert!(matches!(status, Some(OrderStatus::Cancelled { .. })));
}
#[test]
fn test_tracker_rejected_order() {
let tracker = OrderStateTracker::new();
let id = Id::new_uuid();
tracker.transition(
id,
OrderStatus::Rejected {
reason: "invalid tick size".to_string(),
},
);
let status = tracker.get(id);
assert!(matches!(status, Some(OrderStatus::Rejected { .. })));
}
#[test]
fn test_tracker_unknown_order_returns_none() {
let tracker = OrderStateTracker::new();
assert!(tracker.get(Id::new_uuid()).is_none());
}
#[test]
fn test_tracker_retention_evicts_oldest() {
let tracker = OrderStateTracker::with_capacity(3);
for _ in 0..5 {
let id = Id::new_uuid();
tracker.transition(
id,
OrderStatus::Filled {
filled_quantity: 100,
},
);
}
assert!(tracker.len() <= 3);
}
#[test]
fn test_tracker_active_orders_not_evicted() {
let tracker = OrderStateTracker::with_capacity(2);
let active_id = Id::new_uuid();
tracker.transition(active_id, OrderStatus::Open);
for _ in 0..5 {
let id = Id::new_uuid();
tracker.transition(
id,
OrderStatus::Cancelled {
filled_quantity: 0,
reason: CancelReason::MassCancelAll,
},
);
}
assert_eq!(tracker.get(active_id), Some(OrderStatus::Open));
}
#[test]
fn test_tracker_listener_fires_on_transition() {
let mut tracker = OrderStateTracker::new();
let transitions = Arc::new(Mutex::new(Vec::new()));
let transitions_clone = Arc::clone(&transitions);
tracker.set_listener(Arc::new(move |id, old, new| {
if let Ok(mut t) = transitions_clone.lock() {
t.push((id, old.clone(), new.clone()));
}
}));
let id = Id::new_uuid();
tracker.transition(id, OrderStatus::Open);
tracker.transition(
id,
OrderStatus::Filled {
filled_quantity: 50,
},
);
let t = transitions.lock();
assert!(t.is_ok());
let t = t.unwrap_or_else(|_| panic!("lock"));
assert_eq!(t.len(), 2);
assert_eq!(t[0].1, OrderStatus::Open);
assert_eq!(t[0].2, OrderStatus::Open);
assert_eq!(t[1].1, OrderStatus::Open);
assert_eq!(
t[1].2,
OrderStatus::Filled {
filled_quantity: 50
}
);
}
#[test]
fn test_tracker_clear() {
let tracker = OrderStateTracker::new();
let id = Id::new_uuid();
tracker.transition(id, OrderStatus::Open);
assert!(!tracker.is_empty());
tracker.clear();
assert!(tracker.is_empty());
assert!(tracker.get(id).is_none());
}
#[test]
fn test_tracker_concurrent_access() {
use std::thread;
let tracker = Arc::new(OrderStateTracker::new());
let mut handles = Vec::new();
for _ in 0..10 {
let t = Arc::clone(&tracker);
let handle = thread::spawn(move || {
for _ in 0..100 {
let id = Id::new_uuid();
t.transition(id, OrderStatus::Open);
t.transition(
id,
OrderStatus::Filled {
filled_quantity: 100,
},
);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().expect("thread panicked");
}
assert_eq!(tracker.len(), 1000);
}
#[test]
fn test_order_status_serde_roundtrip() {
let statuses = vec![
OrderStatus::Open,
OrderStatus::PartiallyFilled {
original_quantity: 100,
filled_quantity: 30,
},
OrderStatus::Filled {
filled_quantity: 100,
},
OrderStatus::Cancelled {
filled_quantity: 10,
reason: CancelReason::SelfTradePrevention,
},
OrderStatus::Rejected {
reason: "test".to_string(),
},
];
for status in &statuses {
let json = serde_json::to_string(status);
assert!(json.is_ok());
let decoded: Result<OrderStatus, _> = serde_json::from_str(&json.unwrap_or_default());
assert!(decoded.is_ok());
assert_eq!(&decoded.unwrap_or(OrderStatus::Open), status);
}
}
#[test]
fn test_cancel_reason_serde_roundtrip() {
let reasons = vec![
CancelReason::UserRequested,
CancelReason::SelfTradePrevention,
CancelReason::TimeInForceExpired,
CancelReason::MassCancelAll,
CancelReason::MassCancelBySide,
CancelReason::MassCancelByUser,
CancelReason::MassCancelByPriceRange,
CancelReason::InsufficientLiquidity,
];
for reason in &reasons {
let json = serde_json::to_string(reason);
assert!(json.is_ok());
let decoded: Result<CancelReason, _> = serde_json::from_str(&json.unwrap_or_default());
assert!(decoded.is_ok());
assert_eq!(&decoded.unwrap_or(CancelReason::UserRequested), reason);
}
}
}