use std::sync::Mutex;
use ahash::AHashMap;
use nautilus_common::cache::fifo::FifoCache;
use nautilus_core::MUTEX_POISONED;
use nautilus_model::{
enums::{OrderSide, OrderType},
identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
};
/// Const-generic capacity argument of the `emitted_trades` [`FifoCache`] held by
/// [`WsDispatchState`].
pub const TRADE_DEDUP_CAPACITY: usize = 4_096;
/// Const-generic capacity argument shared by the `emitted_accepted` and `filled_orders`
/// [`FifoCache`]s held by [`WsDispatchState`].
pub const ORDER_DEDUP_CAPACITY: usize = 1_024;
/// Per-order attributes that [`WsDispatchState`] stores against a [`ClientOrderId`].
///
/// The type is `Copy`, so [`WsDispatchState::identity`] hands back a copy of the stored
/// value rather than a reference into the locked map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OrderIdentity {
/// Instrument ID associated with the order.
pub instrument_id: InstrumentId,
/// Strategy ID associated with the order.
pub strategy_id: StrategyId,
/// Side of the order.
pub order_side: OrderSide,
/// Type of the order.
pub order_type: OrderType,
}
/// Bookkeeping shared by the WebSocket dispatch path: order identities, de-duplication
/// markers, and venue order ID bindings.
///
/// Every field sits behind its own [`Mutex`], which is why all methods take `&self`.
/// Each method locks only the field(s) it touches and never holds two guards at once.
#[derive(Debug, Default)]
pub struct WsDispatchState {
// Written by `register_identity`, read by `identity`, cleared by `forget`.
order_identities: Mutex<AHashMap<ClientOrderId, OrderIdentity>>,
// Client order IDs recorded by `mark_accepted`; cleared per-order by `forget`.
emitted_accepted: Mutex<FifoCache<ClientOrderId, ORDER_DEDUP_CAPACITY>>,
// Client order IDs recorded by `mark_filled`; NOT touched by `forget`.
filled_orders: Mutex<FifoCache<ClientOrderId, ORDER_DEDUP_CAPACITY>>,
// Trade IDs recorded by `check_and_insert_trade`.
emitted_trades: Mutex<FifoCache<TradeId, TRADE_DEDUP_CAPACITY>>,
// Latest venue order ID recorded per client order ID via `record_venue_order_id`.
bound_venue_order_ids: Mutex<AHashMap<ClientOrderId, VenueOrderId>>,
// Venue order ID passed as `old_venue_order_id` to `mark_pending_modify`, per client order ID.
pending_modifies: Mutex<AHashMap<ClientOrderId, VenueOrderId>>,
}
impl WsDispatchState {
    /// Creates an empty state; equivalent to the derived [`Default`] value.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `identity` under `client_order_id`, replacing any earlier entry for that key.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
        let mut identities = self.order_identities.lock().expect(MUTEX_POISONED);
        identities.insert(client_order_id, identity);
    }

    /// Returns a copy of the identity registered for `client_order_id`, if there is one.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    #[must_use]
    pub fn identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
        let identities = self.order_identities.lock().expect(MUTEX_POISONED);
        identities.get(client_order_id).copied()
    }

    /// Drops the identity, accepted marker, bound venue order ID, and pending-modify marker
    /// kept for `client_order_id`.
    ///
    /// The filled-orders marker is left in place.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn forget(&self, client_order_id: &ClientOrderId) {
        // Each guard lives in its own block so that only one lock is held at a time.
        {
            let mut identities = self.order_identities.lock().expect(MUTEX_POISONED);
            identities.remove(client_order_id);
        }
        {
            let mut accepted = self.emitted_accepted.lock().expect(MUTEX_POISONED);
            accepted.remove(client_order_id);
        }
        {
            let mut bound = self.bound_venue_order_ids.lock().expect(MUTEX_POISONED);
            bound.remove(client_order_id);
        }
        {
            let mut pending = self.pending_modifies.lock().expect(MUTEX_POISONED);
            pending.remove(client_order_id);
        }
    }

    /// Binds `venue_order_id` to `client_order_id`, overwriting any previous binding.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn record_venue_order_id(
        &self,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) {
        let mut bound = self.bound_venue_order_ids.lock().expect(MUTEX_POISONED);
        bound.insert(client_order_id, venue_order_id);
    }

    /// Returns the venue order ID currently bound to `client_order_id`, if any.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    #[must_use]
    pub fn bound_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
        let bound = self.bound_venue_order_ids.lock().expect(MUTEX_POISONED);
        bound.get(client_order_id).copied()
    }

    /// Records that a modify is pending for `client_order_id`, remembering
    /// `old_venue_order_id` alongside it.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn mark_pending_modify(
        &self,
        client_order_id: ClientOrderId,
        old_venue_order_id: VenueOrderId,
    ) {
        let mut pending = self.pending_modifies.lock().expect(MUTEX_POISONED);
        pending.insert(client_order_id, old_venue_order_id);
    }

    /// Removes the pending-modify marker for `client_order_id`, if one exists.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
        let mut pending = self.pending_modifies.lock().expect(MUTEX_POISONED);
        pending.remove(client_order_id);
    }

    /// Returns the venue order ID stored with the pending-modify marker for
    /// `client_order_id`, if one exists.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    #[must_use]
    pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
        let pending = self.pending_modifies.lock().expect(MUTEX_POISONED);
        pending.get(client_order_id).copied()
    }

    /// Reports whether `client_order_id` is present in the accepted cache.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    #[must_use]
    pub fn contains_accepted(&self, client_order_id: &ClientOrderId) -> bool {
        let accepted = self.emitted_accepted.lock().expect(MUTEX_POISONED);
        accepted.contains(client_order_id)
    }

    /// Marks `client_order_id` as accepted.
    ///
    /// Returns `true` when the ID was already in the cache (nothing is added), and `false`
    /// when this call added it.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn mark_accepted(&self, client_order_id: ClientOrderId) -> bool {
        let mut accepted = self.emitted_accepted.lock().expect(MUTEX_POISONED);
        let already_marked = accepted.contains(&client_order_id);
        if !already_marked {
            accepted.add(client_order_id);
        }
        already_marked
    }

    /// Reports whether `client_order_id` is present in the filled cache.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    #[must_use]
    pub fn contains_filled(&self, client_order_id: &ClientOrderId) -> bool {
        let filled = self.filled_orders.lock().expect(MUTEX_POISONED);
        filled.contains(client_order_id)
    }

    /// Marks `client_order_id` as filled; an ID that is already cached is not added again.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn mark_filled(&self, client_order_id: ClientOrderId) {
        let mut filled = self.filled_orders.lock().expect(MUTEX_POISONED);
        if filled.contains(&client_order_id) {
            return;
        }
        filled.add(client_order_id);
    }

    /// Checks `trade_id` against the trade cache and adds it when absent.
    ///
    /// Returns `true` when the ID was already cached (a duplicate), and `false` when this
    /// call added it.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
        let mut trades = self.emitted_trades.lock().expect(MUTEX_POISONED);
        let is_duplicate = trades.contains(&trade_id);
        if !is_duplicate {
            trades.add(trade_id);
        }
        is_duplicate
    }

    /// Reports whether `trade_id` is present in the trade cache.
    #[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
    #[must_use]
    pub fn contains_trade(&self, trade_id: &TradeId) -> bool {
        let trades = self.emitted_trades.lock().expect(MUTEX_POISONED);
        trades.contains(trade_id)
    }
}
#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{OrderSide, OrderType},
        identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
    };
    use rstest::rstest;

    use super::*;

    /// Builds the fixed identity used by the round-trip test.
    fn sample_identity() -> OrderIdentity {
        let instrument_id = InstrumentId::from("ETH-PERP.DERIVE");
        let strategy_id = StrategyId::from("S-1");
        OrderIdentity {
            instrument_id,
            strategy_id,
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
        }
    }

    #[rstest]
    fn test_register_and_identity_roundtrip() {
        let dispatch = WsDispatchState::new();
        let order_id = ClientOrderId::from("STRAT-O-1");
        let expected = sample_identity();

        // Nothing is registered on a fresh state.
        assert_eq!(dispatch.identity(&order_id), None);

        dispatch.register_identity(order_id, expected);
        let stored = dispatch.identity(&order_id);
        assert_eq!(stored, Some(expected));

        // Forgetting the order removes the identity again.
        dispatch.forget(&order_id);
        assert_eq!(dispatch.identity(&order_id), None);
    }

    #[rstest]
    fn test_mark_accepted_dedupes_second_call() {
        let dispatch = WsDispatchState::new();
        let order_id = ClientOrderId::from("STRAT-O-1");

        let first_call = dispatch.mark_accepted(order_id);
        assert!(!first_call);
        assert!(dispatch.contains_accepted(&order_id));

        let second_call = dispatch.mark_accepted(order_id);
        assert!(second_call);
    }

    #[rstest]
    fn test_check_and_insert_trade_returns_true_on_duplicate() {
        let dispatch = WsDispatchState::new();
        let trade = TradeId::new("T-1");

        let first_call = dispatch.check_and_insert_trade(trade);
        assert!(!first_call);
        assert!(dispatch.contains_trade(&trade));

        let second_call = dispatch.check_and_insert_trade(trade);
        assert!(second_call);
    }

    #[rstest]
    fn test_forget_clears_accepted_marker() {
        let dispatch = WsDispatchState::new();
        let order_id = ClientOrderId::from("STRAT-O-1");

        let _ = dispatch.mark_accepted(order_id);
        dispatch.forget(&order_id);

        let still_accepted = dispatch.contains_accepted(&order_id);
        assert!(!still_accepted);
    }

    #[rstest]
    fn test_bound_venue_order_id_records_and_advances() {
        let dispatch = WsDispatchState::new();
        let order_id = ClientOrderId::from("STRAT-O-1");
        let first_venue_id = VenueOrderId::from("voi-1");
        let second_venue_id = VenueOrderId::from("voi-2");

        assert_eq!(dispatch.bound_venue_order_id(&order_id), None);

        dispatch.record_venue_order_id(order_id, first_venue_id);
        assert_eq!(dispatch.bound_venue_order_id(&order_id), Some(first_venue_id));

        // A later record for the same order replaces the earlier binding.
        dispatch.record_venue_order_id(order_id, second_venue_id);
        assert_eq!(dispatch.bound_venue_order_id(&order_id), Some(second_venue_id));
    }

    #[rstest]
    fn test_pending_modify_marker_set_and_cleared() {
        let dispatch = WsDispatchState::new();
        let order_id = ClientOrderId::from("STRAT-O-1");
        let previous_venue_id = VenueOrderId::from("voi-1");

        assert_eq!(dispatch.pending_modify(&order_id), None);

        dispatch.mark_pending_modify(order_id, previous_venue_id);
        assert_eq!(dispatch.pending_modify(&order_id), Some(previous_venue_id));

        dispatch.clear_pending_modify(&order_id);
        assert_eq!(dispatch.pending_modify(&order_id), None);
    }

    #[rstest]
    fn test_forget_clears_bound_and_pending() {
        let dispatch = WsDispatchState::new();
        let order_id = ClientOrderId::from("STRAT-O-1");

        dispatch.record_venue_order_id(order_id, VenueOrderId::from("voi-1"));
        dispatch.mark_pending_modify(order_id, VenueOrderId::from("voi-1"));
        dispatch.forget(&order_id);

        assert_eq!(dispatch.bound_venue_order_id(&order_id), None);
        assert_eq!(dispatch.pending_modify(&order_id), None);
    }
}