use std::{
collections::VecDeque,
hash::Hash,
sync::{
Mutex,
atomic::{AtomicBool, Ordering},
},
};
use ahash::AHashSet;
use dashmap::{DashMap, DashSet};
use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
use nautilus_live::ExecutionEventEmitter;
use nautilus_model::{
enums::{OrderSide, OrderStatus, OrderType},
events::{
OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
OrderTriggered, OrderUpdated,
},
identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
reports::{FillReport, OrderStatusReport},
types::{Price, Quantity},
};
use ustr::Ustr;
pub const DEDUP_CAPACITY: usize = 10_000;
#[derive(Debug, Clone)]
pub struct OrderIdentity {
pub strategy_id: StrategyId,
pub instrument_id: InstrumentId,
pub order_side: OrderSide,
pub order_type: OrderType,
pub quantity: Quantity,
pub price: Option<Price>,
}
#[derive(Debug)]
pub struct BoundedDedup<T>
where
T: Eq + Hash + Clone,
{
order: VecDeque<T>,
set: AHashSet<T>,
capacity: usize,
}
impl<T> BoundedDedup<T>
where
T: Eq + Hash + Clone,
{
#[must_use]
pub fn new(capacity: usize) -> Self {
Self {
order: VecDeque::with_capacity(capacity),
set: AHashSet::with_capacity(capacity),
capacity,
}
}
pub fn insert(&mut self, value: T) -> bool {
if self.set.contains(&value) {
return true;
}
if self.order.len() >= self.capacity
&& let Some(evicted) = self.order.pop_front()
{
self.set.remove(&evicted);
}
self.order.push_back(value.clone());
self.set.insert(value);
false
}
#[must_use]
pub fn len(&self) -> usize {
self.set.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.set.is_empty()
}
#[must_use]
pub fn contains(&self, value: &T) -> bool {
self.set.contains(value)
}
}
#[derive(Debug)]
pub struct WsDispatchState {
pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
pub emitted_accepted: DashSet<ClientOrderId>,
pub filled_orders: DashSet<ClientOrderId>,
pub emitted_trades: Mutex<BoundedDedup<TradeId>>,
pub cached_venue_order_ids: DashMap<ClientOrderId, VenueOrderId>,
pub pending_modify_keys: DashMap<ClientOrderId, VenueOrderId>,
pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
clearing: AtomicBool,
}
impl Default for WsDispatchState {
fn default() -> Self {
Self {
order_identities: DashMap::new(),
emitted_accepted: DashSet::default(),
filled_orders: DashSet::default(),
emitted_trades: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
cached_venue_order_ids: DashMap::new(),
pending_modify_keys: DashMap::new(),
order_filled_qty: DashMap::new(),
clearing: AtomicBool::new(false),
}
}
}
impl WsDispatchState {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
self.order_identities.insert(client_order_id, identity);
}
#[must_use]
pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
self.order_identities
.get(client_order_id)
.map(|r| r.clone())
}
pub fn update_identity_price(&self, client_order_id: &ClientOrderId, price: Option<Price>) {
if let Some(price) = price
&& let Some(mut entry) = self.order_identities.get_mut(client_order_id)
{
entry.price = Some(price);
}
}
pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
entry.quantity = quantity;
}
}
pub fn insert_accepted(&self, cid: ClientOrderId) {
self.evict_if_full(&self.emitted_accepted);
self.emitted_accepted.insert(cid);
}
pub fn insert_filled(&self, cid: ClientOrderId) {
self.evict_if_full(&self.filled_orders);
self.filled_orders.insert(cid);
}
#[allow(
clippy::missing_panics_doc,
reason = "dedup mutex poisoning is not expected"
)]
pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
let mut set = self.emitted_trades.lock().expect(MUTEX_POISONED);
set.insert(trade_id)
}
pub fn record_venue_order_id(
&self,
client_order_id: ClientOrderId,
venue_order_id: VenueOrderId,
) {
self.cached_venue_order_ids
.insert(client_order_id, venue_order_id);
}
#[must_use]
pub fn cached_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
self.cached_venue_order_ids.get(client_order_id).map(|r| *r)
}
pub fn mark_pending_modify(
&self,
client_order_id: ClientOrderId,
old_venue_order_id: VenueOrderId,
) {
self.pending_modify_keys
.insert(client_order_id, old_venue_order_id);
}
pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
self.pending_modify_keys.remove(client_order_id);
}
#[must_use]
pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
self.pending_modify_keys.get(client_order_id).map(|r| *r)
}
pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
self.order_filled_qty.insert(client_order_id, qty);
}
#[must_use]
pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
self.order_filled_qty.get(client_order_id).map(|r| *r)
}
pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
self.order_identities.remove(client_order_id);
self.emitted_accepted.remove(client_order_id);
self.cached_venue_order_ids.remove(client_order_id);
self.pending_modify_keys.remove(client_order_id);
self.order_filled_qty.remove(client_order_id);
}
fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
if set.len() >= DEDUP_CAPACITY
&& self
.clearing
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
set.clear();
self.clearing.store(false, Ordering::Release);
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchOutcome {
Tracked,
External,
Skip,
}
pub fn dispatch_order_status_report(
report: &OrderStatusReport,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_init: UnixNanos,
) -> DispatchOutcome {
let Some(client_order_id) = report.client_order_id else {
return DispatchOutcome::External;
};
if state.filled_orders.contains(&client_order_id) {
log::debug!(
"Skipping stale report for filled order: cid={client_order_id}, status={:?}",
report.order_status,
);
return DispatchOutcome::Skip;
}
let Some(identity) = state.lookup_identity(&client_order_id) else {
return DispatchOutcome::External;
};
match report.order_status {
OrderStatus::Accepted => {
handle_accepted(report, client_order_id, &identity, state, emitter, ts_init)
}
OrderStatus::Triggered => {
handle_triggered(report, client_order_id, &identity, state, emitter, ts_init)
}
OrderStatus::Canceled => {
handle_canceled(report, client_order_id, &identity, state, emitter, ts_init)
}
OrderStatus::Expired => {
handle_expired(report, client_order_id, &identity, state, emitter, ts_init)
}
OrderStatus::Rejected => {
handle_rejected(report, client_order_id, &identity, state, emitter, ts_init)
}
OrderStatus::Filled => handle_filled_marker(client_order_id, state),
OrderStatus::PartiallyFilled => {
DispatchOutcome::Tracked
}
OrderStatus::PendingUpdate
| OrderStatus::PendingCancel
| OrderStatus::Submitted
| OrderStatus::Initialized
| OrderStatus::Denied
| OrderStatus::Released
| OrderStatus::Emulated => DispatchOutcome::Tracked,
}
}
pub fn dispatch_fill_report(
report: &FillReport,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_init: UnixNanos,
) -> DispatchOutcome {
let Some(client_order_id) = report.client_order_id else {
return DispatchOutcome::External;
};
if state.filled_orders.contains(&client_order_id) {
log::debug!(
"Skipping stale fill for filled order: cid={client_order_id}, trade_id={}",
report.trade_id,
);
return DispatchOutcome::Skip;
}
let Some(identity) = state.lookup_identity(&client_order_id) else {
return DispatchOutcome::External;
};
if state.check_and_insert_trade(report.trade_id) {
log::debug!(
"Skipping duplicate fill for {client_order_id}: trade_id={}",
report.trade_id
);
return DispatchOutcome::Tracked;
}
ensure_accepted_emitted(
client_order_id,
report.venue_order_id,
report.account_id,
&identity,
state,
emitter,
report.ts_event,
ts_init,
);
let filled = OrderFilled::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
report.venue_order_id,
report.account_id,
report.trade_id,
identity.order_side,
identity.order_type,
report.last_qty,
report.last_px,
report.commission.currency,
report.liquidity_side,
UUID4::new(),
report.ts_event,
ts_init,
false,
report.venue_position_id,
Some(report.commission),
);
emitter.send_order_event(OrderEventAny::Filled(filled));
let previous = state
.previous_filled_qty(&client_order_id)
.unwrap_or_else(|| Quantity::zero(report.last_qty.precision));
let cumulative = previous + report.last_qty;
state.record_filled_qty(client_order_id, cumulative);
if cumulative >= identity.quantity {
state.insert_filled(client_order_id);
state.cleanup_terminal(&client_order_id);
}
DispatchOutcome::Tracked
}
fn handle_accepted(
report: &OrderStatusReport,
client_order_id: ClientOrderId,
identity: &OrderIdentity,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_init: UnixNanos,
) -> DispatchOutcome {
let venue_order_id = report.venue_order_id;
let ts_event = report.ts_last;
let account_id = report.account_id;
if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
&& cached_voi != venue_order_id
{
let price = report.price.or(identity.price);
let Some(price) = price else {
log::warn!(
"Cannot emit OrderUpdated for cancel-replace {client_order_id}: \
no price on report and no cached price on identity",
);
return DispatchOutcome::Skip;
};
state.record_venue_order_id(client_order_id, venue_order_id);
state.update_identity_quantity(&client_order_id, report.quantity);
state.update_identity_price(&client_order_id, Some(price));
state.clear_pending_modify(&client_order_id);
let updated = OrderUpdated::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
report.quantity,
UUID4::new(),
ts_event,
ts_init,
false,
Some(venue_order_id),
Some(account_id),
Some(price),
report.trigger_price,
None,
false,
);
emitter.send_order_event(OrderEventAny::Updated(updated));
return DispatchOutcome::Tracked;
}
if state.emitted_accepted.contains(&client_order_id) {
state.update_identity_price(&client_order_id, report.price);
return DispatchOutcome::Tracked;
}
state.insert_accepted(client_order_id);
state.record_venue_order_id(client_order_id, venue_order_id);
state.update_identity_price(&client_order_id, report.price);
let accepted = OrderAccepted::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
venue_order_id,
account_id,
UUID4::new(),
ts_event,
ts_init,
false,
);
emitter.send_order_event(OrderEventAny::Accepted(accepted));
DispatchOutcome::Tracked
}
fn handle_triggered(
report: &OrderStatusReport,
client_order_id: ClientOrderId,
identity: &OrderIdentity,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_init: UnixNanos,
) -> DispatchOutcome {
if !matches!(
identity.order_type,
OrderType::StopLimit | OrderType::TrailingStopLimit | OrderType::LimitIfTouched
) {
log::debug!(
"Ignoring TRIGGERED status for non-triggerable order type {:?}: {client_order_id}",
identity.order_type,
);
return DispatchOutcome::Tracked;
}
ensure_accepted_emitted(
client_order_id,
report.venue_order_id,
report.account_id,
identity,
state,
emitter,
report.ts_last,
ts_init,
);
let triggered = OrderTriggered::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
UUID4::new(),
report.ts_last,
ts_init,
false,
Some(report.venue_order_id),
Some(report.account_id),
);
emitter.send_order_event(OrderEventAny::Triggered(triggered));
DispatchOutcome::Tracked
}
fn handle_canceled(
report: &OrderStatusReport,
client_order_id: ClientOrderId,
identity: &OrderIdentity,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_init: UnixNanos,
) -> DispatchOutcome {
let venue_order_id = report.venue_order_id;
if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
&& cached_voi != venue_order_id
{
log::debug!(
"Skipping stale CANCELED for {venue_order_id} (cached {cached_voi}) on {client_order_id}",
);
return DispatchOutcome::Skip;
}
if let Some(pending_old) = state.pending_modify(&client_order_id)
&& pending_old == venue_order_id
{
log::debug!(
"Skipping cancel-before-accept leg for {client_order_id}: venue_order_id={venue_order_id}",
);
return DispatchOutcome::Skip;
}
ensure_accepted_emitted(
client_order_id,
venue_order_id,
report.account_id,
identity,
state,
emitter,
report.ts_last,
ts_init,
);
let canceled = OrderCanceled::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
UUID4::new(),
report.ts_last,
ts_init,
false,
Some(venue_order_id),
Some(report.account_id),
);
emitter.send_order_event(OrderEventAny::Canceled(canceled));
state.insert_filled(client_order_id);
state.cleanup_terminal(&client_order_id);
DispatchOutcome::Tracked
}
fn handle_expired(
report: &OrderStatusReport,
client_order_id: ClientOrderId,
identity: &OrderIdentity,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_init: UnixNanos,
) -> DispatchOutcome {
ensure_accepted_emitted(
client_order_id,
report.venue_order_id,
report.account_id,
identity,
state,
emitter,
report.ts_last,
ts_init,
);
let expired = OrderExpired::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
UUID4::new(),
report.ts_last,
ts_init,
false,
Some(report.venue_order_id),
Some(report.account_id),
);
emitter.send_order_event(OrderEventAny::Expired(expired));
state.insert_filled(client_order_id);
state.cleanup_terminal(&client_order_id);
DispatchOutcome::Tracked
}
fn handle_rejected(
report: &OrderStatusReport,
client_order_id: ClientOrderId,
identity: &OrderIdentity,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_init: UnixNanos,
) -> DispatchOutcome {
let reason = report
.cancel_reason
.clone()
.unwrap_or_else(|| "Order rejected by exchange".to_string());
let rejected = OrderRejected::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
report.account_id,
Ustr::from(&reason),
UUID4::new(),
report.ts_last,
ts_init,
false,
false,
);
emitter.send_order_event(OrderEventAny::Rejected(rejected));
state.insert_filled(client_order_id);
state.cleanup_terminal(&client_order_id);
DispatchOutcome::Tracked
}
fn handle_filled_marker(
_client_order_id: ClientOrderId,
_state: &WsDispatchState,
) -> DispatchOutcome {
DispatchOutcome::Tracked
}
#[allow(clippy::too_many_arguments)]
fn ensure_accepted_emitted(
client_order_id: ClientOrderId,
venue_order_id: VenueOrderId,
account_id: AccountId,
identity: &OrderIdentity,
state: &WsDispatchState,
emitter: &ExecutionEventEmitter,
ts_event: UnixNanos,
ts_init: UnixNanos,
) {
if state.emitted_accepted.contains(&client_order_id) {
return;
}
state.insert_accepted(client_order_id);
state.record_venue_order_id(client_order_id, venue_order_id);
let accepted = OrderAccepted::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
venue_order_id,
account_id,
UUID4::new(),
ts_event,
ts_init,
false,
);
emitter.send_order_event(OrderEventAny::Accepted(accepted));
}
#[cfg(test)]
mod tests {
use nautilus_model::identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId};
use rstest::rstest;
use super::*;
fn make_identity() -> OrderIdentity {
OrderIdentity {
strategy_id: StrategyId::from("S-001"),
instrument_id: InstrumentId::from("BTC-USD-PERP.HYPERLIQUID"),
order_side: OrderSide::Buy,
order_type: OrderType::Limit,
quantity: Quantity::from("0.0001"),
price: None,
}
}
#[rstest]
fn test_register_and_lookup_identity() {
let state = WsDispatchState::new();
let cid = ClientOrderId::new("O-001");
state.register_identity(cid, make_identity());
let found = state.lookup_identity(&cid);
assert!(found.is_some());
let identity = found.unwrap();
assert_eq!(identity.strategy_id.as_str(), "S-001");
assert_eq!(identity.order_side, OrderSide::Buy);
}
#[rstest]
fn test_lookup_identity_missing_returns_none() {
let state = WsDispatchState::new();
let cid = ClientOrderId::new("not-tracked");
assert!(state.lookup_identity(&cid).is_none());
}
#[rstest]
fn test_insert_accepted_dedup() {
let state = WsDispatchState::new();
let cid = ClientOrderId::new("O-002");
assert!(!state.emitted_accepted.contains(&cid));
state.insert_accepted(cid);
assert!(state.emitted_accepted.contains(&cid));
state.insert_accepted(cid);
assert!(state.emitted_accepted.contains(&cid));
}
#[rstest]
fn test_check_and_insert_trade_detects_duplicates() {
let state = WsDispatchState::new();
let trade = TradeId::new("trade-1");
assert!(!state.check_and_insert_trade(trade));
assert!(state.check_and_insert_trade(trade));
}
#[rstest]
fn test_bounded_dedup_fifo_eviction_preserves_recent_ids() {
let mut dedup: BoundedDedup<TradeId> = BoundedDedup::new(3);
assert!(!dedup.insert(TradeId::new("t-0")));
assert!(!dedup.insert(TradeId::new("t-1")));
assert!(!dedup.insert(TradeId::new("t-2")));
assert_eq!(dedup.len(), 3);
assert!(!dedup.insert(TradeId::new("t-3")));
assert_eq!(dedup.len(), 3);
assert!(!dedup.contains(&TradeId::new("t-0")));
assert!(dedup.contains(&TradeId::new("t-1")));
assert!(dedup.contains(&TradeId::new("t-3")));
}
#[rstest]
fn test_pending_modify_roundtrip() {
let state = WsDispatchState::new();
let cid = ClientOrderId::new("O-010");
let voi = VenueOrderId::new("v-1");
assert!(state.pending_modify(&cid).is_none());
state.mark_pending_modify(cid, voi);
assert_eq!(state.pending_modify(&cid), Some(voi));
state.clear_pending_modify(&cid);
assert!(state.pending_modify(&cid).is_none());
}
#[rstest]
fn test_cleanup_terminal_preserves_filled_marker() {
let state = WsDispatchState::new();
let cid = ClientOrderId::new("O-020");
state.register_identity(cid, make_identity());
state.insert_accepted(cid);
state.insert_filled(cid);
state.cleanup_terminal(&cid);
assert!(state.lookup_identity(&cid).is_none());
assert!(!state.emitted_accepted.contains(&cid));
assert!(state.filled_orders.contains(&cid));
}
}