1use std::{
55 collections::VecDeque,
56 hash::Hash,
57 sync::{
58 Mutex,
59 atomic::{AtomicBool, Ordering},
60 },
61};
62
63use ahash::AHashSet;
64use dashmap::{DashMap, DashSet};
65use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
66use nautilus_live::ExecutionEventEmitter;
67use nautilus_model::{
68 enums::{OrderSide, OrderStatus, OrderType},
69 events::{
70 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
71 OrderTriggered, OrderUpdated,
72 },
73 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
74 reports::{FillReport, OrderStatusReport},
75 types::{Price, Quantity},
76};
77use ustr::Ustr;
78
/// Capacity shared by the dedup containers in this file: the size of the trade-id FIFO built in
/// `WsDispatchState::default`, and the entry count at which `WsDispatchState::evict_if_full`
/// clears a marker set.
pub const DEDUP_CAPACITY: usize = 10_000;
80
/// Order attributes stored per `ClientOrderId` in `WsDispatchState::order_identities`.
///
/// The dispatch functions in this file read these fields to populate the order events they emit.
#[derive(Debug, Clone)]
pub struct OrderIdentity {
    /// Strategy identifier copied into every emitted event.
    pub strategy_id: StrategyId,
    /// Instrument identifier copied into every emitted event.
    pub instrument_id: InstrumentId,
    /// Order side, passed to `OrderFilled` events.
    pub order_side: OrderSide,
    /// Order type, passed to `OrderFilled` events and checked before emitting `OrderTriggered`.
    pub order_type: OrderType,
    /// Order quantity; cumulative fills are compared against it to detect a full fill, and a
    /// cancel-replace acceptance overwrites it.
    pub quantity: Quantity,
    /// Last known price, if any; used as the fallback when a cancel-replace acceptance report
    /// carries no price.
    pub price: Option<Price>,
}
104
/// Membership set that remembers a bounded number of values and forgets the oldest first.
///
/// Insertion order is kept in a queue alongside a hash set used for lookups.
#[derive(Debug)]
pub struct BoundedDedup<T>
where
    T: Eq + Hash + Clone,
{
    /// Remembered values in insertion order; the front is the next one to be evicted.
    order: VecDeque<T>,
    /// The same values as `order`, kept for constant-time membership checks.
    set: AHashSet<T>,
    /// Number of values at which an insert evicts the oldest entry first.
    capacity: usize,
}
121
122impl<T> BoundedDedup<T>
123where
124 T: Eq + Hash + Clone,
125{
126 #[must_use]
128 pub fn new(capacity: usize) -> Self {
129 Self {
130 order: VecDeque::with_capacity(capacity),
131 set: AHashSet::with_capacity(capacity),
132 capacity,
133 }
134 }
135
136 pub fn insert(&mut self, value: T) -> bool {
138 if self.set.contains(&value) {
139 return true;
140 }
141
142 if self.order.len() >= self.capacity
143 && let Some(evicted) = self.order.pop_front()
144 {
145 self.set.remove(&evicted);
146 }
147
148 self.order.push_back(value.clone());
149 self.set.insert(value);
150 false
151 }
152
153 #[must_use]
155 pub fn len(&self) -> usize {
156 self.set.len()
157 }
158
159 #[must_use]
161 pub fn is_empty(&self) -> bool {
162 self.set.is_empty()
163 }
164
165 #[must_use]
167 pub fn contains(&self, value: &T) -> bool {
168 self.set.contains(value)
169 }
170}
171
/// Shared bookkeeping used by the dispatch functions in this file to turn venue reports into
/// order events.
///
/// All fields use interior mutability, so every method takes `&self`.
#[derive(Debug)]
pub struct WsDispatchState {
    /// Identities of orders registered through `register_identity`, keyed by client order id.
    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
    /// Client order ids for which an `OrderAccepted` event has already been emitted.
    pub emitted_accepted: DashSet<ClientOrderId>,
    /// Client order ids marked as finished. Despite the name, the handlers insert here after a
    /// full fill, a cancel, an expiry and a rejection; later reports for these ids are skipped.
    pub filled_orders: DashSet<ClientOrderId>,
    /// Trade ids already emitted as fills, bounded with first-in-first-out eviction.
    pub emitted_trades: Mutex<BoundedDedup<TradeId>>,
    /// Most recently recorded venue order id per client order id.
    pub cached_venue_order_ids: DashMap<ClientOrderId, VenueOrderId>,
    /// Venue order id supplied to `mark_pending_modify` (the pre-modify id) for orders with a
    /// modify in flight.
    pub pending_modify_keys: DashMap<ClientOrderId, VenueOrderId>,
    /// Target quantity supplied to `mark_pending_modify` for orders with a modify in flight.
    pub pending_modify_target_qty: DashMap<ClientOrderId, Quantity>,
    /// Fills held back while a modify is pending and the fill's venue order id differs from the
    /// cached one; they are replayed once the replacement order is accepted.
    pub buffered_fills: DashMap<ClientOrderId, Vec<FillReport>>,
    /// Cumulative filled quantity per client order id.
    pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
    /// Flag that lets only one caller at a time run the clear in `evict_if_full`.
    clearing: AtomicBool,
}
226
227impl Default for WsDispatchState {
228 fn default() -> Self {
229 Self {
230 order_identities: DashMap::new(),
231 emitted_accepted: DashSet::default(),
232 filled_orders: DashSet::default(),
233 emitted_trades: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
234 cached_venue_order_ids: DashMap::new(),
235 pending_modify_keys: DashMap::new(),
236 pending_modify_target_qty: DashMap::new(),
237 buffered_fills: DashMap::new(),
238 order_filled_qty: DashMap::new(),
239 clearing: AtomicBool::new(false),
240 }
241 }
242}
243
244impl WsDispatchState {
245 #[must_use]
247 pub fn new() -> Self {
248 Self::default()
249 }
250
251 pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
254 self.order_identities.insert(client_order_id, identity);
255 }
256
257 #[must_use]
259 pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
260 self.order_identities
261 .get(client_order_id)
262 .map(|r| r.clone())
263 }
264
265 pub fn update_identity_price(&self, client_order_id: &ClientOrderId, price: Option<Price>) {
268 if let Some(price) = price
269 && let Some(mut entry) = self.order_identities.get_mut(client_order_id)
270 {
271 entry.price = Some(price);
272 }
273 }
274
275 pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
277 if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
278 entry.quantity = quantity;
279 }
280 }
281
282 pub fn insert_accepted(&self, cid: ClientOrderId) {
284 self.evict_if_full(&self.emitted_accepted);
285 self.emitted_accepted.insert(cid);
286 }
287
288 pub fn insert_filled(&self, cid: ClientOrderId) {
290 self.evict_if_full(&self.filled_orders);
291 self.filled_orders.insert(cid);
292 }
293
294 #[allow(
299 clippy::missing_panics_doc,
300 reason = "dedup mutex poisoning is not expected"
301 )]
302 pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
303 let mut set = self.emitted_trades.lock().expect(MUTEX_POISONED);
304 set.insert(trade_id)
305 }
306
307 pub fn record_venue_order_id(
309 &self,
310 client_order_id: ClientOrderId,
311 venue_order_id: VenueOrderId,
312 ) {
313 self.cached_venue_order_ids
314 .insert(client_order_id, venue_order_id);
315 }
316
317 #[must_use]
319 pub fn cached_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
320 self.cached_venue_order_ids.get(client_order_id).map(|r| *r)
321 }
322
323 pub fn mark_pending_modify(
326 &self,
327 client_order_id: ClientOrderId,
328 old_venue_order_id: VenueOrderId,
329 target_qty: Quantity,
330 ) {
331 self.pending_modify_keys
332 .insert(client_order_id, old_venue_order_id);
333 self.pending_modify_target_qty
334 .insert(client_order_id, target_qty);
335 }
336
337 pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
339 self.pending_modify_keys.remove(client_order_id);
340 self.pending_modify_target_qty.remove(client_order_id);
341 }
342
343 #[must_use]
345 pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
346 self.pending_modify_keys.get(client_order_id).map(|r| *r)
347 }
348
349 #[must_use]
351 pub fn pending_modify_target_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
352 self.pending_modify_target_qty
353 .get(client_order_id)
354 .map(|r| *r)
355 }
356
357 pub fn buffer_fill(&self, client_order_id: ClientOrderId, fill: FillReport) {
359 self.buffered_fills
360 .entry(client_order_id)
361 .or_default()
362 .push(fill);
363 }
364
365 #[must_use]
367 pub fn drain_buffered_fills(&self, client_order_id: &ClientOrderId) -> Vec<FillReport> {
368 self.buffered_fills
369 .remove(client_order_id)
370 .map(|(_, v)| v)
371 .unwrap_or_default()
372 }
373
374 #[must_use]
376 pub fn buffered_fill_count(&self, client_order_id: &ClientOrderId) -> usize {
377 self.buffered_fills
378 .get(client_order_id)
379 .map_or(0, |r| r.len())
380 }
381
382 pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
384 self.order_filled_qty.insert(client_order_id, qty);
385 }
386
387 #[must_use]
389 pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
390 self.order_filled_qty.get(client_order_id).map(|r| *r)
391 }
392
393 pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
398 self.order_identities.remove(client_order_id);
399 self.emitted_accepted.remove(client_order_id);
400 self.cached_venue_order_ids.remove(client_order_id);
401 self.pending_modify_keys.remove(client_order_id);
402 self.pending_modify_target_qty.remove(client_order_id);
403 self.buffered_fills.remove(client_order_id);
404 self.order_filled_qty.remove(client_order_id);
405 }
406
407 fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
408 if set.len() >= DEDUP_CAPACITY
409 && self
410 .clearing
411 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
412 .is_ok()
413 {
414 set.clear();
415 self.clearing.store(false, Ordering::Release);
416 }
417 }
418}
419
/// Result of routing a venue report through the dispatch functions in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchOutcome {
    /// The report belonged to a registered order and was handled here (an event may or may not
    /// have been emitted).
    Tracked,
    /// The report carries no client order id, or no identity is registered for it, so nothing
    /// was done here. NOTE(review): presumably the caller handles such reports itself — confirm.
    External,
    /// The report was recognised but deliberately dropped, e.g. it targets an order already
    /// marked finished or refers to a stale venue order id.
    Skip,
}
437
438pub fn dispatch_order_status_report(
448 report: &OrderStatusReport,
449 state: &WsDispatchState,
450 emitter: &ExecutionEventEmitter,
451 ts_init: UnixNanos,
452) -> DispatchOutcome {
453 let Some(client_order_id) = report.client_order_id else {
454 return DispatchOutcome::External;
455 };
456
457 if state.filled_orders.contains(&client_order_id) {
458 log::debug!(
459 "Skipping stale report for filled order: cid={client_order_id}, status={:?}",
460 report.order_status,
461 );
462 return DispatchOutcome::Skip;
463 }
464
465 let Some(identity) = state.lookup_identity(&client_order_id) else {
466 return DispatchOutcome::External;
467 };
468
469 match report.order_status {
470 OrderStatus::Accepted => {
471 handle_accepted(report, client_order_id, &identity, state, emitter, ts_init)
472 }
473 OrderStatus::Triggered => {
474 handle_triggered(report, client_order_id, &identity, state, emitter, ts_init)
475 }
476 OrderStatus::Canceled => {
477 handle_canceled(report, client_order_id, &identity, state, emitter, ts_init)
478 }
479 OrderStatus::Expired => {
480 handle_expired(report, client_order_id, &identity, state, emitter, ts_init)
481 }
482 OrderStatus::Rejected => {
483 handle_rejected(report, client_order_id, &identity, state, emitter, ts_init)
484 }
485 OrderStatus::Filled => handle_filled_marker(client_order_id, state),
486 OrderStatus::PartiallyFilled => {
487 DispatchOutcome::Tracked
489 }
490 OrderStatus::PendingUpdate
491 | OrderStatus::PendingCancel
492 | OrderStatus::Submitted
493 | OrderStatus::Initialized
494 | OrderStatus::Denied
495 | OrderStatus::Released
496 | OrderStatus::Emulated => DispatchOutcome::Tracked,
497 }
498}
499
500pub fn dispatch_fill_report(
511 report: &FillReport,
512 state: &WsDispatchState,
513 emitter: &ExecutionEventEmitter,
514 ts_init: UnixNanos,
515) -> DispatchOutcome {
516 let Some(client_order_id) = report.client_order_id else {
517 return DispatchOutcome::External;
518 };
519
520 if state.filled_orders.contains(&client_order_id) {
521 log::debug!(
522 "Skipping stale fill for filled order: cid={client_order_id}, trade_id={}",
523 report.trade_id,
524 );
525 return DispatchOutcome::Skip;
526 }
527
528 let Some(identity) = state.lookup_identity(&client_order_id) else {
529 return DispatchOutcome::External;
530 };
531
532 if state.pending_modify(&client_order_id).is_some()
537 && let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
538 && report.venue_order_id != cached_voi
539 {
540 log::debug!(
541 "Buffering cancel-replace fill for {client_order_id}: \
542 report_voi={}, cached_voi={cached_voi}, trade_id={}",
543 report.venue_order_id,
544 report.trade_id,
545 );
546 state.buffer_fill(client_order_id, report.clone());
547 return DispatchOutcome::Tracked;
548 }
549
550 if state.check_and_insert_trade(report.trade_id) {
551 log::debug!(
552 "Skipping duplicate fill for {client_order_id}: trade_id={}",
553 report.trade_id
554 );
555 return DispatchOutcome::Tracked;
556 }
557
558 ensure_accepted_emitted(
559 client_order_id,
560 report.venue_order_id,
561 report.account_id,
562 &identity,
563 state,
564 emitter,
565 report.ts_event,
566 ts_init,
567 );
568
569 let filled = OrderFilled::new(
570 emitter.trader_id(),
571 identity.strategy_id,
572 identity.instrument_id,
573 client_order_id,
574 report.venue_order_id,
575 report.account_id,
576 report.trade_id,
577 identity.order_side,
578 identity.order_type,
579 report.last_qty,
580 report.last_px,
581 report.commission.currency,
582 report.liquidity_side,
583 UUID4::new(),
584 report.ts_event,
585 ts_init,
586 false,
587 report.venue_position_id,
588 Some(report.commission),
589 );
590 emitter.send_order_event(OrderEventAny::Filled(filled));
591
592 let previous = state
593 .previous_filled_qty(&client_order_id)
594 .unwrap_or_else(|| Quantity::zero(report.last_qty.precision));
595 let cumulative = previous + report.last_qty;
596 state.record_filled_qty(client_order_id, cumulative);
597
598 if cumulative >= identity.quantity {
599 state.insert_filled(client_order_id);
600 state.cleanup_terminal(&client_order_id);
601 }
602
603 DispatchOutcome::Tracked
604}
605
606fn handle_accepted(
607 report: &OrderStatusReport,
608 client_order_id: ClientOrderId,
609 identity: &OrderIdentity,
610 state: &WsDispatchState,
611 emitter: &ExecutionEventEmitter,
612 ts_init: UnixNanos,
613) -> DispatchOutcome {
614 let venue_order_id = report.venue_order_id;
615 let ts_event = report.ts_last;
616 let account_id = report.account_id;
617
618 if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
623 && cached_voi != venue_order_id
624 {
625 let price = report.price.or(identity.price);
626 let Some(price) = price else {
627 log::warn!(
628 "Cannot emit OrderUpdated for cancel-replace {client_order_id}: \
629 no price on report and no cached price on identity",
630 );
631 return DispatchOutcome::Skip;
632 };
633
634 let updated_quantity = state
637 .pending_modify_target_qty(&client_order_id)
638 .unwrap_or(report.quantity);
639
640 state.record_venue_order_id(client_order_id, venue_order_id);
641 state.update_identity_quantity(&client_order_id, updated_quantity);
642 state.update_identity_price(&client_order_id, Some(price));
643 state.clear_pending_modify(&client_order_id);
644
645 let updated = OrderUpdated::new(
646 emitter.trader_id(),
647 identity.strategy_id,
648 identity.instrument_id,
649 client_order_id,
650 updated_quantity,
651 UUID4::new(),
652 ts_event,
653 ts_init,
654 false,
655 Some(venue_order_id),
656 Some(account_id),
657 Some(price),
658 report.trigger_price,
659 None,
660 false,
661 );
662 emitter.send_order_event(OrderEventAny::Updated(updated));
663
664 let buffered = state.drain_buffered_fills(&client_order_id);
667 for fill in buffered {
668 dispatch_fill_report(&fill, state, emitter, ts_init);
669 }
670 return DispatchOutcome::Tracked;
671 }
672
673 if state.emitted_accepted.contains(&client_order_id) {
674 state.update_identity_price(&client_order_id, report.price);
678 return DispatchOutcome::Tracked;
679 }
680
681 state.insert_accepted(client_order_id);
682 state.record_venue_order_id(client_order_id, venue_order_id);
683 state.update_identity_price(&client_order_id, report.price);
684
685 let accepted = OrderAccepted::new(
686 emitter.trader_id(),
687 identity.strategy_id,
688 identity.instrument_id,
689 client_order_id,
690 venue_order_id,
691 account_id,
692 UUID4::new(),
693 ts_event,
694 ts_init,
695 false,
696 );
697 emitter.send_order_event(OrderEventAny::Accepted(accepted));
698 DispatchOutcome::Tracked
699}
700
701fn handle_triggered(
702 report: &OrderStatusReport,
703 client_order_id: ClientOrderId,
704 identity: &OrderIdentity,
705 state: &WsDispatchState,
706 emitter: &ExecutionEventEmitter,
707 ts_init: UnixNanos,
708) -> DispatchOutcome {
709 if !matches!(
710 identity.order_type,
711 OrderType::StopLimit | OrderType::TrailingStopLimit | OrderType::LimitIfTouched
712 ) {
713 log::debug!(
714 "Ignoring TRIGGERED status for non-triggerable order type {:?}: {client_order_id}",
715 identity.order_type,
716 );
717 return DispatchOutcome::Tracked;
718 }
719
720 ensure_accepted_emitted(
721 client_order_id,
722 report.venue_order_id,
723 report.account_id,
724 identity,
725 state,
726 emitter,
727 report.ts_last,
728 ts_init,
729 );
730
731 let triggered = OrderTriggered::new(
732 emitter.trader_id(),
733 identity.strategy_id,
734 identity.instrument_id,
735 client_order_id,
736 UUID4::new(),
737 report.ts_last,
738 ts_init,
739 false,
740 Some(report.venue_order_id),
741 Some(report.account_id),
742 );
743 emitter.send_order_event(OrderEventAny::Triggered(triggered));
744 DispatchOutcome::Tracked
745}
746
747fn handle_canceled(
748 report: &OrderStatusReport,
749 client_order_id: ClientOrderId,
750 identity: &OrderIdentity,
751 state: &WsDispatchState,
752 emitter: &ExecutionEventEmitter,
753 ts_init: UnixNanos,
754) -> DispatchOutcome {
755 let venue_order_id = report.venue_order_id;
756
757 if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
761 && cached_voi != venue_order_id
762 {
763 log::debug!(
764 "Skipping stale CANCELED for {venue_order_id} (cached {cached_voi}) on {client_order_id}",
765 );
766 return DispatchOutcome::Skip;
767 }
768
769 if let Some(pending_old) = state.pending_modify(&client_order_id)
775 && pending_old == venue_order_id
776 {
777 log::debug!(
778 "Skipping cancel-before-accept leg for {client_order_id}: venue_order_id={venue_order_id}",
779 );
780 return DispatchOutcome::Skip;
781 }
782
783 ensure_accepted_emitted(
784 client_order_id,
785 venue_order_id,
786 report.account_id,
787 identity,
788 state,
789 emitter,
790 report.ts_last,
791 ts_init,
792 );
793
794 let canceled = OrderCanceled::new(
795 emitter.trader_id(),
796 identity.strategy_id,
797 identity.instrument_id,
798 client_order_id,
799 UUID4::new(),
800 report.ts_last,
801 ts_init,
802 false,
803 Some(venue_order_id),
804 Some(report.account_id),
805 );
806 emitter.send_order_event(OrderEventAny::Canceled(canceled));
807
808 state.insert_filled(client_order_id);
811 state.cleanup_terminal(&client_order_id);
812 DispatchOutcome::Tracked
813}
814
815fn handle_expired(
816 report: &OrderStatusReport,
817 client_order_id: ClientOrderId,
818 identity: &OrderIdentity,
819 state: &WsDispatchState,
820 emitter: &ExecutionEventEmitter,
821 ts_init: UnixNanos,
822) -> DispatchOutcome {
823 ensure_accepted_emitted(
824 client_order_id,
825 report.venue_order_id,
826 report.account_id,
827 identity,
828 state,
829 emitter,
830 report.ts_last,
831 ts_init,
832 );
833
834 let expired = OrderExpired::new(
835 emitter.trader_id(),
836 identity.strategy_id,
837 identity.instrument_id,
838 client_order_id,
839 UUID4::new(),
840 report.ts_last,
841 ts_init,
842 false,
843 Some(report.venue_order_id),
844 Some(report.account_id),
845 );
846 emitter.send_order_event(OrderEventAny::Expired(expired));
847 state.insert_filled(client_order_id);
848 state.cleanup_terminal(&client_order_id);
849 DispatchOutcome::Tracked
850}
851
852fn handle_rejected(
853 report: &OrderStatusReport,
854 client_order_id: ClientOrderId,
855 identity: &OrderIdentity,
856 state: &WsDispatchState,
857 emitter: &ExecutionEventEmitter,
858 ts_init: UnixNanos,
859) -> DispatchOutcome {
860 let reason = report
861 .cancel_reason
862 .clone()
863 .unwrap_or_else(|| "Order rejected by exchange".to_string());
864 let rejected = OrderRejected::new(
865 emitter.trader_id(),
866 identity.strategy_id,
867 identity.instrument_id,
868 client_order_id,
869 report.account_id,
870 Ustr::from(&reason),
871 UUID4::new(),
872 report.ts_last,
873 ts_init,
874 false,
875 false,
876 );
877 emitter.send_order_event(OrderEventAny::Rejected(rejected));
878 state.insert_filled(client_order_id);
879 state.cleanup_terminal(&client_order_id);
880 DispatchOutcome::Tracked
881}
882
/// Handles a `Filled` status report by doing nothing beyond reporting it as tracked.
///
/// Both arguments are unused. The bookkeeping for a fully filled order (marking it finished and
/// cleaning up its state) is performed in `dispatch_fill_report` once cumulative fills reach
/// the order quantity, not here.
fn handle_filled_marker(
    _client_order_id: ClientOrderId,
    _state: &WsDispatchState,
) -> DispatchOutcome {
    DispatchOutcome::Tracked
}
896
897#[allow(clippy::too_many_arguments)]
905fn ensure_accepted_emitted(
906 client_order_id: ClientOrderId,
907 venue_order_id: VenueOrderId,
908 account_id: AccountId,
909 identity: &OrderIdentity,
910 state: &WsDispatchState,
911 emitter: &ExecutionEventEmitter,
912 ts_event: UnixNanos,
913 ts_init: UnixNanos,
914) {
915 if state.emitted_accepted.contains(&client_order_id) {
916 return;
917 }
918 state.insert_accepted(client_order_id);
919 state.record_venue_order_id(client_order_id, venue_order_id);
920
921 let accepted = OrderAccepted::new(
922 emitter.trader_id(),
923 identity.strategy_id,
924 identity.instrument_id,
925 client_order_id,
926 venue_order_id,
927 account_id,
928 UUID4::new(),
929 ts_event,
930 ts_init,
931 false,
932 );
933 emitter.send_order_event(OrderEventAny::Accepted(accepted));
934}
935
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId};
    use rstest::rstest;

    use super::*;

    /// Builds a fixed limit-buy identity without a price, used as a fixture.
    fn make_identity() -> OrderIdentity {
        let strategy_id = StrategyId::from("S-001");
        let instrument_id = InstrumentId::from("BTC-USD-PERP.HYPERLIQUID");
        OrderIdentity {
            strategy_id,
            instrument_id,
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            quantity: Quantity::from("0.0001"),
            price: None,
        }
    }

    /// A registered identity can be read back with its fields intact.
    #[rstest]
    fn test_register_and_lookup_identity() {
        let state = WsDispatchState::new();
        let cid = ClientOrderId::new("O-001");
        state.register_identity(cid, make_identity());

        let identity = state
            .lookup_identity(&cid)
            .expect("identity should be registered");
        assert_eq!(identity.strategy_id.as_str(), "S-001");
        assert_eq!(identity.order_side, OrderSide::Buy);
    }

    /// Looking up an id that was never registered yields `None`.
    #[rstest]
    fn test_lookup_identity_missing_returns_none() {
        let state = WsDispatchState::new();
        let unknown = ClientOrderId::new("not-tracked");
        assert!(state.lookup_identity(&unknown).is_none());
    }

    /// Inserting the same accepted marker twice leaves it present.
    #[rstest]
    fn test_insert_accepted_dedup() {
        let state = WsDispatchState::new();
        let cid = ClientOrderId::new("O-002");
        assert!(!state.emitted_accepted.contains(&cid));

        for _ in 0..2 {
            state.insert_accepted(cid);
            assert!(state.emitted_accepted.contains(&cid));
        }
    }

    /// The first sighting of a trade id is new; the second is a duplicate.
    #[rstest]
    fn test_check_and_insert_trade_detects_duplicates() {
        let state = WsDispatchState::new();
        let trade = TradeId::new("trade-1");

        let first_seen = state.check_and_insert_trade(trade);
        let second_seen = state.check_and_insert_trade(trade);

        assert!(!first_seen);
        assert!(second_seen);
    }

    /// When full, the oldest id is evicted and the newer ones are kept.
    #[rstest]
    fn test_bounded_dedup_fifo_eviction_preserves_recent_ids() {
        let mut dedup: BoundedDedup<TradeId> = BoundedDedup::new(3);
        for id in ["t-0", "t-1", "t-2"] {
            assert!(!dedup.insert(TradeId::new(id)));
        }
        assert_eq!(dedup.len(), 3);

        // A fourth insert pushes out the first id.
        assert!(!dedup.insert(TradeId::new("t-3")));
        assert_eq!(dedup.len(), 3);
        assert!(!dedup.contains(&TradeId::new("t-0")));
        assert!(dedup.contains(&TradeId::new("t-1")));
        assert!(dedup.contains(&TradeId::new("t-3")));
    }

    /// Marking and clearing a pending modify round-trips both stored values.
    #[rstest]
    fn test_pending_modify_roundtrip() {
        let state = WsDispatchState::new();
        let cid = ClientOrderId::new("O-010");
        let voi = VenueOrderId::new("v-1");
        let target_qty = Quantity::from("0.0001");

        assert!(state.pending_modify(&cid).is_none());
        assert!(state.pending_modify_target_qty(&cid).is_none());

        state.mark_pending_modify(cid, voi, target_qty);
        assert_eq!(state.pending_modify(&cid), Some(voi));
        assert_eq!(state.pending_modify_target_qty(&cid), Some(target_qty));

        state.clear_pending_modify(&cid);
        assert!(state.pending_modify(&cid).is_none());
        assert!(state.pending_modify_target_qty(&cid).is_none());
    }

    /// Terminal cleanup drops per-order state but keeps the finished marker.
    #[rstest]
    fn test_cleanup_terminal_preserves_filled_marker() {
        let state = WsDispatchState::new();
        let cid = ClientOrderId::new("O-020");
        let old_voi = VenueOrderId::new("v-1");
        let target_qty = Quantity::from("0.0001");

        state.register_identity(cid, make_identity());
        state.insert_accepted(cid);
        state.mark_pending_modify(cid, old_voi, target_qty);
        state.insert_filled(cid);
        state.cleanup_terminal(&cid);

        assert!(state.lookup_identity(&cid).is_none());
        assert!(!state.emitted_accepted.contains(&cid));
        assert!(state.pending_modify(&cid).is_none());
        assert!(state.pending_modify_target_qty(&cid).is_none());
        assert!(state.filled_orders.contains(&cid));
    }
}