1use std::{
55 collections::VecDeque,
56 hash::Hash,
57 sync::{
58 Mutex,
59 atomic::{AtomicBool, Ordering},
60 },
61};
62
63use ahash::AHashSet;
64use dashmap::{DashMap, DashSet};
65use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
66use nautilus_live::ExecutionEventEmitter;
67use nautilus_model::{
68 enums::{OrderSide, OrderStatus, OrderType},
69 events::{
70 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
71 OrderTriggered, OrderUpdated,
72 },
73 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
74 reports::{FillReport, OrderStatusReport},
75 types::{Price, Quantity},
76};
77use ustr::Ustr;
78
79use crate::http::models::HyperliquidExecPlaceOrderRequest;
80
81pub const DEDUP_CAPACITY: usize = 10_000;
82
83#[derive(Debug, Clone)]
90pub struct OrderIdentity {
91 pub strategy_id: StrategyId,
93 pub instrument_id: InstrumentId,
95 pub order_side: OrderSide,
97 pub order_type: OrderType,
99 pub quantity: Quantity,
101 pub price: Option<Price>,
105}
106
107#[derive(Debug)]
115pub struct BoundedDedup<T>
116where
117 T: Eq + Hash + Clone,
118{
119 order: VecDeque<T>,
120 set: AHashSet<T>,
121 capacity: usize,
122}
123
124impl<T> BoundedDedup<T>
125where
126 T: Eq + Hash + Clone,
127{
128 #[must_use]
130 pub fn new(capacity: usize) -> Self {
131 Self {
132 order: VecDeque::with_capacity(capacity),
133 set: AHashSet::with_capacity(capacity),
134 capacity,
135 }
136 }
137
138 pub fn insert(&mut self, value: T) -> bool {
140 if self.set.contains(&value) {
141 return true;
142 }
143
144 if self.order.len() >= self.capacity
145 && let Some(evicted) = self.order.pop_front()
146 {
147 self.set.remove(&evicted);
148 }
149
150 self.order.push_back(value.clone());
151 self.set.insert(value);
152 false
153 }
154
155 #[must_use]
157 pub fn len(&self) -> usize {
158 self.set.len()
159 }
160
161 #[must_use]
163 pub fn is_empty(&self) -> bool {
164 self.set.is_empty()
165 }
166
167 #[must_use]
169 pub fn contains(&self, value: &T) -> bool {
170 self.set.contains(value)
171 }
172}
173
174#[derive(Debug)]
184pub struct WsDispatchState {
185 pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
187 pub emitted_accepted: DashSet<ClientOrderId>,
189 pub filled_orders: DashSet<ClientOrderId>,
194 pub emitted_trades: Mutex<BoundedDedup<TradeId>>,
199 pub terminal_cloids: Mutex<BoundedDedup<Ustr>>,
202 pub cached_venue_order_ids: DashMap<ClientOrderId, VenueOrderId>,
209 pub pending_modify_keys: DashMap<ClientOrderId, VenueOrderId>,
218 pub pending_modify_target_qty: DashMap<ClientOrderId, Quantity>,
222 pub buffered_fills: DashMap<ClientOrderId, Vec<FillReport>>,
226 pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
229 pub pending_modify_request: DashMap<ClientOrderId, HyperliquidExecPlaceOrderRequest>,
232 pub pending_corrective: DashMap<ClientOrderId, (u64, HyperliquidExecPlaceOrderRequest)>,
235 clearing: AtomicBool,
236}
237
238impl Default for WsDispatchState {
239 fn default() -> Self {
240 Self {
241 order_identities: DashMap::new(),
242 emitted_accepted: DashSet::default(),
243 filled_orders: DashSet::default(),
244 emitted_trades: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
245 terminal_cloids: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
246 cached_venue_order_ids: DashMap::new(),
247 pending_modify_keys: DashMap::new(),
248 pending_modify_target_qty: DashMap::new(),
249 buffered_fills: DashMap::new(),
250 order_filled_qty: DashMap::new(),
251 pending_modify_request: DashMap::new(),
252 pending_corrective: DashMap::new(),
253 clearing: AtomicBool::new(false),
254 }
255 }
256}
257
258impl WsDispatchState {
259 #[must_use]
261 pub fn new() -> Self {
262 Self::default()
263 }
264
265 pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
268 self.order_identities.insert(client_order_id, identity);
269 }
270
271 #[must_use]
273 pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
274 self.order_identities
275 .get(client_order_id)
276 .map(|r| r.clone())
277 }
278
279 pub fn update_identity_price(&self, client_order_id: &ClientOrderId, price: Option<Price>) {
282 if let Some(price) = price
283 && let Some(mut entry) = self.order_identities.get_mut(client_order_id)
284 {
285 entry.price = Some(price);
286 }
287 }
288
289 pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
291 if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
292 entry.quantity = quantity;
293 }
294 }
295
296 pub fn insert_accepted(&self, cid: ClientOrderId) {
298 self.evict_if_full(&self.emitted_accepted);
299 self.emitted_accepted.insert(cid);
300 }
301
302 pub fn insert_filled(&self, cid: ClientOrderId) -> bool {
307 self.evict_if_full(&self.filled_orders);
308 self.filled_orders.insert(cid)
309 }
310
311 #[allow(
316 clippy::missing_panics_doc,
317 reason = "dedup mutex poisoning is not expected"
318 )]
319 pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
320 let mut set = self.emitted_trades.lock().expect(MUTEX_POISONED);
321 set.insert(trade_id)
322 }
323
324 #[allow(
331 clippy::missing_panics_doc,
332 reason = "terminal cloid mutex poisoning is not expected"
333 )]
334 pub fn insert_terminal_cloid(&self, cloid: Ustr) {
335 let mut set = self.terminal_cloids.lock().expect(MUTEX_POISONED);
336 set.insert(cloid);
337 }
338
339 #[allow(
342 clippy::missing_panics_doc,
343 reason = "terminal cloid mutex poisoning is not expected"
344 )]
345 #[must_use]
346 pub fn terminal_cloid_seen(&self, cloid: &Ustr) -> bool {
347 let set = self.terminal_cloids.lock().expect(MUTEX_POISONED);
348 set.contains(cloid)
349 }
350
351 pub fn record_venue_order_id(
353 &self,
354 client_order_id: ClientOrderId,
355 venue_order_id: VenueOrderId,
356 ) {
357 self.cached_venue_order_ids
358 .insert(client_order_id, venue_order_id);
359 }
360
361 #[must_use]
363 pub fn cached_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
364 self.cached_venue_order_ids.get(client_order_id).map(|r| *r)
365 }
366
367 pub fn mark_pending_modify(
370 &self,
371 client_order_id: ClientOrderId,
372 old_venue_order_id: VenueOrderId,
373 target_qty: Quantity,
374 ) {
375 self.pending_modify_keys
376 .insert(client_order_id, old_venue_order_id);
377 self.pending_modify_target_qty
378 .insert(client_order_id, target_qty);
379 }
380
381 pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
383 self.pending_modify_keys.remove(client_order_id);
384 self.pending_modify_target_qty.remove(client_order_id);
385 self.pending_modify_request.remove(client_order_id);
386 }
387
388 pub fn stash_modify_request(
390 &self,
391 client_order_id: ClientOrderId,
392 request: HyperliquidExecPlaceOrderRequest,
393 ) {
394 self.pending_modify_request.insert(client_order_id, request);
395 }
396
397 #[must_use]
399 pub fn modify_request(
400 &self,
401 client_order_id: &ClientOrderId,
402 ) -> Option<HyperliquidExecPlaceOrderRequest> {
403 self.pending_modify_request
404 .get(client_order_id)
405 .map(|r| r.clone())
406 }
407
408 pub fn queue_corrective(
410 &self,
411 client_order_id: ClientOrderId,
412 oid: u64,
413 request: HyperliquidExecPlaceOrderRequest,
414 ) {
415 self.pending_corrective
416 .insert(client_order_id, (oid, request));
417 }
418
419 #[must_use]
421 pub fn take_corrective(
422 &self,
423 client_order_id: &ClientOrderId,
424 ) -> Option<(u64, HyperliquidExecPlaceOrderRequest)> {
425 self.pending_corrective
426 .remove(client_order_id)
427 .map(|(_, v)| v)
428 }
429
430 #[must_use]
432 pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
433 self.pending_modify_keys.get(client_order_id).map(|r| *r)
434 }
435
436 #[must_use]
438 pub fn pending_modify_target_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
439 self.pending_modify_target_qty
440 .get(client_order_id)
441 .map(|r| *r)
442 }
443
444 pub fn buffer_fill(&self, client_order_id: ClientOrderId, fill: FillReport) {
446 self.buffered_fills
447 .entry(client_order_id)
448 .or_default()
449 .push(fill);
450 }
451
452 #[must_use]
454 pub fn drain_buffered_fills(&self, client_order_id: &ClientOrderId) -> Vec<FillReport> {
455 self.buffered_fills
456 .remove(client_order_id)
457 .map(|(_, v)| v)
458 .unwrap_or_default()
459 }
460
461 #[must_use]
463 pub fn buffered_fill_count(&self, client_order_id: &ClientOrderId) -> usize {
464 self.buffered_fills
465 .get(client_order_id)
466 .map_or(0, |r| r.len())
467 }
468
469 pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
471 self.order_filled_qty.insert(client_order_id, qty);
472 }
473
474 #[must_use]
476 pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
477 self.order_filled_qty.get(client_order_id).map(|r| *r)
478 }
479
480 pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
485 self.order_identities.remove(client_order_id);
486 self.emitted_accepted.remove(client_order_id);
487 self.cached_venue_order_ids.remove(client_order_id);
488 self.pending_modify_keys.remove(client_order_id);
489 self.pending_modify_target_qty.remove(client_order_id);
490 self.pending_modify_request.remove(client_order_id);
491 self.pending_corrective.remove(client_order_id);
492 self.buffered_fills.remove(client_order_id);
493 self.order_filled_qty.remove(client_order_id);
494 }
495
496 fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
497 if set.len() >= DEDUP_CAPACITY
498 && self
499 .clearing
500 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
501 .is_ok()
502 {
503 set.clear();
504 self.clearing.store(false, Ordering::Release);
505 }
506 }
507}
508
509#[derive(Debug, Clone, Copy, PartialEq, Eq)]
511pub enum DispatchOutcome {
512 Tracked,
516 External,
521 Skip,
525}
526
527pub fn dispatch_order_event(
537 report: &OrderStatusReport,
538 state: &WsDispatchState,
539 emitter: &ExecutionEventEmitter,
540 ts_init: UnixNanos,
541) -> DispatchOutcome {
542 let Some(client_order_id) = report.client_order_id else {
543 return DispatchOutcome::External;
544 };
545
546 if state.filled_orders.contains(&client_order_id) {
547 log::debug!(
548 "Skipping stale report for filled order: cid={client_order_id}, status={:?}",
549 report.order_status,
550 );
551 return DispatchOutcome::Skip;
552 }
553
554 let client_order_id_str = client_order_id.as_str();
555 if client_order_id_str.starts_with("0x")
556 && state.terminal_cloid_seen(&Ustr::from(client_order_id_str))
557 {
558 log::debug!(
559 "Skipping stale terminal report for raw cloid: cid={client_order_id}, status={:?}",
560 report.order_status,
561 );
562 return DispatchOutcome::Skip;
563 }
564
565 let Some(identity) = state.lookup_identity(&client_order_id) else {
566 return DispatchOutcome::External;
567 };
568
569 match report.order_status {
570 OrderStatus::Accepted => {
571 handle_accepted(report, client_order_id, &identity, state, emitter, ts_init)
572 }
573 OrderStatus::Triggered => {
574 handle_triggered(report, client_order_id, &identity, state, emitter, ts_init)
575 }
576 OrderStatus::Canceled => {
577 handle_canceled(report, client_order_id, &identity, state, emitter, ts_init)
578 }
579 OrderStatus::Expired => {
580 handle_expired(report, client_order_id, &identity, state, emitter, ts_init)
581 }
582 OrderStatus::Rejected => {
583 handle_rejected(report, client_order_id, &identity, state, emitter, ts_init)
584 }
585 OrderStatus::Filled => handle_filled_marker(client_order_id, state),
586 OrderStatus::PartiallyFilled => {
587 DispatchOutcome::Tracked
589 }
590 OrderStatus::PendingUpdate
591 | OrderStatus::PendingCancel
592 | OrderStatus::Submitted
593 | OrderStatus::Initialized
594 | OrderStatus::Denied
595 | OrderStatus::Released
596 | OrderStatus::Emulated => DispatchOutcome::Tracked,
597 }
598}
599
600pub fn dispatch_order_fill(
611 report: &FillReport,
612 state: &WsDispatchState,
613 emitter: &ExecutionEventEmitter,
614 ts_init: UnixNanos,
615) -> DispatchOutcome {
616 let Some(client_order_id) = report.client_order_id else {
617 return DispatchOutcome::External;
618 };
619
620 if state.filled_orders.contains(&client_order_id) {
621 log::debug!(
622 "Skipping stale fill for filled order: cid={client_order_id}, trade_id={}",
623 report.trade_id,
624 );
625 return DispatchOutcome::Skip;
626 }
627
628 let Some(identity) = state.lookup_identity(&client_order_id) else {
629 return DispatchOutcome::External;
630 };
631
632 if state.pending_modify(&client_order_id).is_some()
637 && let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
638 && report.venue_order_id != cached_voi
639 {
640 log::debug!(
641 "Buffering cancel-replace fill for {client_order_id}: \
642 report_voi={}, cached_voi={cached_voi}, trade_id={}",
643 report.venue_order_id,
644 report.trade_id,
645 );
646 state.buffer_fill(client_order_id, report.clone());
647 return DispatchOutcome::Tracked;
648 }
649
650 if state.check_and_insert_trade(report.trade_id) {
651 log::debug!(
652 "Skipping duplicate fill for {client_order_id}: trade_id={}",
653 report.trade_id
654 );
655 return DispatchOutcome::Tracked;
656 }
657
658 let previous = state
659 .previous_filled_qty(&client_order_id)
660 .unwrap_or_else(|| Quantity::zero(report.last_qty.precision));
661 let cumulative = previous + report.last_qty;
662
663 let is_terminal_fill = cumulative >= identity.quantity;
664 if is_terminal_fill && !claim_terminal_order(client_order_id, state, OrderStatus::Filled) {
665 return DispatchOutcome::Skip;
666 }
667
668 ensure_accepted_emitted(
669 client_order_id,
670 report.venue_order_id,
671 report.account_id,
672 &identity,
673 state,
674 emitter,
675 report.ts_event,
676 ts_init,
677 );
678
679 let filled = OrderFilled::new(
680 emitter.trader_id(),
681 identity.strategy_id,
682 identity.instrument_id,
683 client_order_id,
684 report.venue_order_id,
685 report.account_id,
686 report.trade_id,
687 identity.order_side,
688 identity.order_type,
689 report.last_qty,
690 report.last_px,
691 report.commission.currency,
692 report.liquidity_side,
693 UUID4::new(),
694 report.ts_event,
695 ts_init,
696 false,
697 report.venue_position_id,
698 Some(report.commission),
699 );
700 emitter.send_order_event(OrderEventAny::Filled(filled));
701
702 state.record_filled_qty(client_order_id, cumulative);
703
704 if is_terminal_fill {
705 state.cleanup_terminal(&client_order_id);
706 }
707
708 DispatchOutcome::Tracked
709}
710
711fn handle_accepted(
712 report: &OrderStatusReport,
713 client_order_id: ClientOrderId,
714 identity: &OrderIdentity,
715 state: &WsDispatchState,
716 emitter: &ExecutionEventEmitter,
717 ts_init: UnixNanos,
718) -> DispatchOutcome {
719 let venue_order_id = report.venue_order_id;
720 let ts_event = report.ts_last;
721 let account_id = report.account_id;
722
723 if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
728 && cached_voi != venue_order_id
729 {
730 let price = report.price.or(identity.price);
731 let Some(price) = price else {
732 log::warn!(
733 "Cannot emit OrderUpdated for cancel-replace {client_order_id}: \
734 no price on report and no cached price on identity",
735 );
736 return DispatchOutcome::Skip;
737 };
738
739 let target_total_qty = state.pending_modify_target_qty(&client_order_id);
742 let updated_quantity = target_total_qty.unwrap_or(report.quantity);
743 let sent_request = state.modify_request(&client_order_id);
744
745 state.record_venue_order_id(client_order_id, venue_order_id);
746 state.update_identity_quantity(&client_order_id, updated_quantity);
747 state.update_identity_price(&client_order_id, Some(price));
748 state.clear_pending_modify(&client_order_id);
749
750 let updated = OrderUpdated::new(
751 emitter.trader_id(),
752 identity.strategy_id,
753 identity.instrument_id,
754 client_order_id,
755 updated_quantity,
756 UUID4::new(),
757 ts_event,
758 ts_init,
759 false,
760 Some(venue_order_id),
761 Some(account_id),
762 Some(price),
763 report.trigger_price,
764 None,
765 false,
766 );
767 emitter.send_order_event(OrderEventAny::Updated(updated));
768
769 let buffered = state.drain_buffered_fills(&client_order_id);
772 for fill in buffered {
773 dispatch_order_fill(&fill, state, emitter, ts_init);
774 }
775
776 if let (Some(target), Some(sent_request)) = (target_total_qty, sent_request)
781 && let Ok(new_oid) = venue_order_id.as_str().parse::<u64>()
782 {
783 let filled = state
784 .previous_filled_qty(&client_order_id)
785 .unwrap_or_else(|| Quantity::zero(target.precision));
786 if filled < target {
787 let remaining = (target - filled).as_decimal().normalize();
788 let sent_size = sent_request.size;
789 if sent_size > remaining {
790 let mut corrective = sent_request;
791 corrective.size = remaining;
792
793 state.mark_pending_modify(client_order_id, venue_order_id, target);
794 state.stash_modify_request(client_order_id, corrective.clone());
795 state.queue_corrective(client_order_id, new_oid, corrective);
796
797 log::info!(
798 "Cancel-replace left {client_order_id} oversized on {venue_order_id} \
799 (sent {sent_size}, remaining {remaining}); queuing corrective reduce",
800 );
801 }
802 }
803 }
804
805 return DispatchOutcome::Tracked;
806 }
807
808 if state.emitted_accepted.contains(&client_order_id) {
809 state.update_identity_price(&client_order_id, report.price);
813 return DispatchOutcome::Tracked;
814 }
815
816 state.insert_accepted(client_order_id);
817 state.record_venue_order_id(client_order_id, venue_order_id);
818 state.update_identity_price(&client_order_id, report.price);
819
820 let accepted = OrderAccepted::new(
821 emitter.trader_id(),
822 identity.strategy_id,
823 identity.instrument_id,
824 client_order_id,
825 venue_order_id,
826 account_id,
827 UUID4::new(),
828 ts_event,
829 ts_init,
830 false,
831 );
832 emitter.send_order_event(OrderEventAny::Accepted(accepted));
833 DispatchOutcome::Tracked
834}
835
836fn handle_triggered(
837 report: &OrderStatusReport,
838 client_order_id: ClientOrderId,
839 identity: &OrderIdentity,
840 state: &WsDispatchState,
841 emitter: &ExecutionEventEmitter,
842 ts_init: UnixNanos,
843) -> DispatchOutcome {
844 if !matches!(
845 identity.order_type,
846 OrderType::StopLimit | OrderType::TrailingStopLimit | OrderType::LimitIfTouched
847 ) {
848 log::debug!(
849 "Ignoring TRIGGERED status for non-triggerable order type {:?}: {client_order_id}",
850 identity.order_type,
851 );
852 return DispatchOutcome::Tracked;
853 }
854
855 ensure_accepted_emitted(
856 client_order_id,
857 report.venue_order_id,
858 report.account_id,
859 identity,
860 state,
861 emitter,
862 report.ts_last,
863 ts_init,
864 );
865
866 let triggered = OrderTriggered::new(
867 emitter.trader_id(),
868 identity.strategy_id,
869 identity.instrument_id,
870 client_order_id,
871 UUID4::new(),
872 report.ts_last,
873 ts_init,
874 false,
875 Some(report.venue_order_id),
876 Some(report.account_id),
877 );
878 emitter.send_order_event(OrderEventAny::Triggered(triggered));
879 DispatchOutcome::Tracked
880}
881
882fn handle_canceled(
883 report: &OrderStatusReport,
884 client_order_id: ClientOrderId,
885 identity: &OrderIdentity,
886 state: &WsDispatchState,
887 emitter: &ExecutionEventEmitter,
888 ts_init: UnixNanos,
889) -> DispatchOutcome {
890 let venue_order_id = report.venue_order_id;
891
892 if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
896 && cached_voi != venue_order_id
897 {
898 log::debug!(
899 "Skipping stale CANCELED for {venue_order_id} (cached {cached_voi}) on {client_order_id}",
900 );
901 return DispatchOutcome::Skip;
902 }
903
904 if let Some(pending_old) = state.pending_modify(&client_order_id)
910 && pending_old == venue_order_id
911 {
912 log::debug!(
913 "Skipping cancel-before-accept leg for {client_order_id}: venue_order_id={venue_order_id}",
914 );
915 return DispatchOutcome::Skip;
916 }
917
918 if !claim_terminal_order(client_order_id, state, report.order_status) {
919 return DispatchOutcome::Skip;
920 }
921
922 ensure_accepted_emitted(
923 client_order_id,
924 venue_order_id,
925 report.account_id,
926 identity,
927 state,
928 emitter,
929 report.ts_last,
930 ts_init,
931 );
932
933 let canceled = OrderCanceled::new(
934 emitter.trader_id(),
935 identity.strategy_id,
936 identity.instrument_id,
937 client_order_id,
938 UUID4::new(),
939 report.ts_last,
940 ts_init,
941 false,
942 Some(venue_order_id),
943 Some(report.account_id),
944 );
945 emitter.send_order_event(OrderEventAny::Canceled(canceled));
946
947 state.cleanup_terminal(&client_order_id);
948 DispatchOutcome::Tracked
949}
950
951fn handle_expired(
952 report: &OrderStatusReport,
953 client_order_id: ClientOrderId,
954 identity: &OrderIdentity,
955 state: &WsDispatchState,
956 emitter: &ExecutionEventEmitter,
957 ts_init: UnixNanos,
958) -> DispatchOutcome {
959 if !claim_terminal_order(client_order_id, state, report.order_status) {
960 return DispatchOutcome::Skip;
961 }
962
963 ensure_accepted_emitted(
964 client_order_id,
965 report.venue_order_id,
966 report.account_id,
967 identity,
968 state,
969 emitter,
970 report.ts_last,
971 ts_init,
972 );
973
974 let expired = OrderExpired::new(
975 emitter.trader_id(),
976 identity.strategy_id,
977 identity.instrument_id,
978 client_order_id,
979 UUID4::new(),
980 report.ts_last,
981 ts_init,
982 false,
983 Some(report.venue_order_id),
984 Some(report.account_id),
985 );
986 emitter.send_order_event(OrderEventAny::Expired(expired));
987 state.cleanup_terminal(&client_order_id);
988 DispatchOutcome::Tracked
989}
990
991fn handle_rejected(
992 report: &OrderStatusReport,
993 client_order_id: ClientOrderId,
994 identity: &OrderIdentity,
995 state: &WsDispatchState,
996 emitter: &ExecutionEventEmitter,
997 ts_init: UnixNanos,
998) -> DispatchOutcome {
999 if !claim_terminal_order(client_order_id, state, report.order_status) {
1000 return DispatchOutcome::Skip;
1001 }
1002
1003 let reason = report
1004 .cancel_reason
1005 .clone()
1006 .unwrap_or_else(|| "Order rejected by exchange".to_string());
1007 let rejected = OrderRejected::new(
1008 emitter.trader_id(),
1009 identity.strategy_id,
1010 identity.instrument_id,
1011 client_order_id,
1012 report.account_id,
1013 Ustr::from(&reason),
1014 UUID4::new(),
1015 report.ts_last,
1016 ts_init,
1017 false,
1018 false,
1019 );
1020 emitter.send_order_event(OrderEventAny::Rejected(rejected));
1021 state.cleanup_terminal(&client_order_id);
1022 DispatchOutcome::Tracked
1023}
1024
1025fn claim_terminal_order(
1026 client_order_id: ClientOrderId,
1027 state: &WsDispatchState,
1028 status: OrderStatus,
1029) -> bool {
1030 let claimed = state.insert_filled(client_order_id);
1031 if !claimed {
1032 log::debug!("Skipping duplicate terminal event for {client_order_id}: status={status:?}",);
1033 }
1034
1035 claimed
1036}
1037
1038fn handle_filled_marker(
1039 _client_order_id: ClientOrderId,
1040 _state: &WsDispatchState,
1041) -> DispatchOutcome {
1042 DispatchOutcome::Tracked
1050}
1051
1052#[allow(clippy::too_many_arguments)]
1060fn ensure_accepted_emitted(
1061 client_order_id: ClientOrderId,
1062 venue_order_id: VenueOrderId,
1063 account_id: AccountId,
1064 identity: &OrderIdentity,
1065 state: &WsDispatchState,
1066 emitter: &ExecutionEventEmitter,
1067 ts_event: UnixNanos,
1068 ts_init: UnixNanos,
1069) {
1070 if state.emitted_accepted.contains(&client_order_id) {
1071 return;
1072 }
1073 state.insert_accepted(client_order_id);
1074 state.record_venue_order_id(client_order_id, venue_order_id);
1075
1076 let accepted = OrderAccepted::new(
1077 emitter.trader_id(),
1078 identity.strategy_id,
1079 identity.instrument_id,
1080 client_order_id,
1081 venue_order_id,
1082 account_id,
1083 UUID4::new(),
1084 ts_event,
1085 ts_init,
1086 false,
1087 );
1088 emitter.send_order_event(OrderEventAny::Accepted(accepted));
1089}
1090
1091#[cfg(test)]
1092mod tests {
1093 use nautilus_model::identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId};
1094 use rstest::rstest;
1095 use rust_decimal::Decimal;
1096
1097 use super::*;
1098 use crate::http::models::{
1099 HyperliquidExecLimitParams, HyperliquidExecOrderKind, HyperliquidExecTif,
1100 };
1101
1102 fn make_identity() -> OrderIdentity {
1103 OrderIdentity {
1104 strategy_id: StrategyId::from("S-001"),
1105 instrument_id: InstrumentId::from("BTC-USD-PERP.HYPERLIQUID"),
1106 order_side: OrderSide::Buy,
1107 order_type: OrderType::Limit,
1108 quantity: Quantity::from("0.0001"),
1109 price: None,
1110 }
1111 }
1112
1113 #[rstest]
1114 fn test_register_and_lookup_identity() {
1115 let state = WsDispatchState::new();
1116 let cid = ClientOrderId::new("O-001");
1117 state.register_identity(cid, make_identity());
1118
1119 let found = state.lookup_identity(&cid);
1120 assert!(found.is_some());
1121 let identity = found.unwrap();
1122 assert_eq!(identity.strategy_id.as_str(), "S-001");
1123 assert_eq!(identity.order_side, OrderSide::Buy);
1124 }
1125
1126 #[rstest]
1127 fn test_lookup_identity_missing_returns_none() {
1128 let state = WsDispatchState::new();
1129 let cid = ClientOrderId::new("not-tracked");
1130 assert!(state.lookup_identity(&cid).is_none());
1131 }
1132
1133 #[rstest]
1134 fn test_insert_accepted_dedup() {
1135 let state = WsDispatchState::new();
1136 let cid = ClientOrderId::new("O-002");
1137 assert!(!state.emitted_accepted.contains(&cid));
1138 state.insert_accepted(cid);
1139 assert!(state.emitted_accepted.contains(&cid));
1140 state.insert_accepted(cid);
1141 assert!(state.emitted_accepted.contains(&cid));
1142 }
1143
1144 #[rstest]
1145 fn test_check_and_insert_trade_detects_duplicates() {
1146 let state = WsDispatchState::new();
1147 let trade = TradeId::new("trade-1");
1148 assert!(!state.check_and_insert_trade(trade));
1149 assert!(state.check_and_insert_trade(trade));
1150 }
1151
1152 #[rstest]
1153 fn test_bounded_dedup_fifo_eviction_preserves_recent_ids() {
1154 let mut dedup: BoundedDedup<TradeId> = BoundedDedup::new(3);
1155 assert!(!dedup.insert(TradeId::new("t-0")));
1156 assert!(!dedup.insert(TradeId::new("t-1")));
1157 assert!(!dedup.insert(TradeId::new("t-2")));
1158 assert_eq!(dedup.len(), 3);
1159
1160 assert!(!dedup.insert(TradeId::new("t-3")));
1162 assert_eq!(dedup.len(), 3);
1163 assert!(!dedup.contains(&TradeId::new("t-0")));
1164 assert!(dedup.contains(&TradeId::new("t-1")));
1165 assert!(dedup.contains(&TradeId::new("t-3")));
1166 }
1167
1168 #[rstest]
1169 fn test_pending_modify_roundtrip() {
1170 let state = WsDispatchState::new();
1171 let cid = ClientOrderId::new("O-010");
1172 let voi = VenueOrderId::new("v-1");
1173 let target_qty = Quantity::from("0.0001");
1174
1175 assert!(state.pending_modify(&cid).is_none());
1176 assert!(state.pending_modify_target_qty(&cid).is_none());
1177 state.mark_pending_modify(cid, voi, target_qty);
1178 assert_eq!(state.pending_modify(&cid), Some(voi));
1179 assert_eq!(state.pending_modify_target_qty(&cid), Some(target_qty));
1180 state.clear_pending_modify(&cid);
1181 assert!(state.pending_modify(&cid).is_none());
1182 assert!(state.pending_modify_target_qty(&cid).is_none());
1183 }
1184
1185 #[rstest]
1186 fn test_cleanup_terminal_preserves_filled_marker() {
1187 let state = WsDispatchState::new();
1188 let cid = ClientOrderId::new("O-020");
1189 state.register_identity(cid, make_identity());
1190 state.insert_accepted(cid);
1191 state.mark_pending_modify(cid, VenueOrderId::new("v-1"), Quantity::from("0.0001"));
1192 state.insert_filled(cid);
1193 state.cleanup_terminal(&cid);
1194
1195 assert!(state.lookup_identity(&cid).is_none());
1196 assert!(!state.emitted_accepted.contains(&cid));
1197 assert!(state.pending_modify(&cid).is_none());
1198 assert!(state.pending_modify_target_qty(&cid).is_none());
1199 assert!(state.filled_orders.contains(&cid));
1201 }
1202
1203 #[rstest]
1204 fn test_cleanup_terminal_clears_corrective_state() {
1205 let state = WsDispatchState::new();
1206 let cid = ClientOrderId::new("O-021");
1207 let request = HyperliquidExecPlaceOrderRequest {
1208 asset: 0,
1209 is_buy: true,
1210 price: "100".parse::<Decimal>().unwrap(),
1211 size: Decimal::from(1),
1212 reduce_only: false,
1213 kind: HyperliquidExecOrderKind::Limit {
1214 limit: HyperliquidExecLimitParams {
1215 tif: HyperliquidExecTif::Gtc,
1216 },
1217 },
1218 cloid: None,
1219 };
1220 state.mark_pending_modify(cid, VenueOrderId::new("v-1"), Quantity::from("1"));
1221 state.stash_modify_request(cid, request.clone());
1222 state.queue_corrective(cid, 1, request);
1223 assert!(state.modify_request(&cid).is_some());
1224
1225 state.cleanup_terminal(&cid);
1226
1227 assert!(state.modify_request(&cid).is_none());
1228 assert!(state.take_corrective(&cid).is_none());
1229 assert!(state.pending_modify(&cid).is_none());
1230 }
1231}