use std::sync::Mutex;
use dashmap::DashMap;
use nautilus_common::cache::fifo::FifoCache;
use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
use nautilus_live::ExecutionEventEmitter;
use nautilus_model::{
enums::{OrderSide, OrderType},
events::{OrderAccepted, OrderEventAny},
identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, VenueOrderId},
types::Price,
};
#[derive(Debug, Clone, Copy)]
pub enum PendingOperation {
Place,
Cancel,
Modify,
}
#[derive(Debug, Clone)]
pub struct PendingRequest {
pub client_order_id: ClientOrderId,
pub venue_order_id: Option<VenueOrderId>,
pub operation: PendingOperation,
}
#[derive(Debug, Clone)]
pub struct OrderIdentity {
pub instrument_id: InstrumentId,
pub strategy_id: StrategyId,
pub order_side: OrderSide,
pub order_type: OrderType,
pub price: Option<Price>,
}
#[derive(Debug)]
pub struct WsDispatchState {
pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
pub pending_requests: DashMap<String, PendingRequest>,
emitted_accepted: Mutex<FifoCache<ClientOrderId, 10_000>>,
filled_orders: Mutex<FifoCache<ClientOrderId, 10_000>>,
}
impl Default for WsDispatchState {
fn default() -> Self {
Self {
order_identities: DashMap::new(),
pending_requests: DashMap::new(),
emitted_accepted: Mutex::new(FifoCache::new()),
filled_orders: Mutex::new(FifoCache::new()),
}
}
}
#[allow(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
impl WsDispatchState {
pub fn has_emitted_accepted(&self, cid: &ClientOrderId) -> bool {
self.emitted_accepted
.lock()
.expect(MUTEX_POISONED)
.contains(cid)
}
pub fn insert_accepted(&self, cid: ClientOrderId) {
self.emitted_accepted.lock().expect(MUTEX_POISONED).add(cid);
}
pub fn has_filled(&self, cid: &ClientOrderId) -> bool {
self.filled_orders
.lock()
.expect(MUTEX_POISONED)
.contains(cid)
}
pub fn insert_filled(&self, cid: ClientOrderId) {
self.filled_orders.lock().expect(MUTEX_POISONED).add(cid);
}
pub fn cleanup_terminal(&self, cid: ClientOrderId) {
self.order_identities.remove(&cid);
self.emitted_accepted
.lock()
.expect(MUTEX_POISONED)
.remove(&cid);
self.filled_orders
.lock()
.expect(MUTEX_POISONED)
.remove(&cid);
}
}
pub fn ensure_accepted_emitted(
client_order_id: ClientOrderId,
account_id: AccountId,
venue_order_id: VenueOrderId,
identity: &OrderIdentity,
emitter: &ExecutionEventEmitter,
state: &WsDispatchState,
ts_init: UnixNanos,
) {
if state.has_emitted_accepted(&client_order_id) {
return;
}
state.insert_accepted(client_order_id);
let accepted = OrderAccepted::new(
emitter.trader_id(),
identity.strategy_id,
identity.instrument_id,
client_order_id,
venue_order_id,
account_id,
UUID4::new(),
ts_init,
ts_init,
false,
);
emitter.send_order_event(OrderEventAny::Accepted(accepted));
}