Skip to main content

nautilus_hyperliquid/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket execution dispatch for the Hyperliquid execution client.
17//!
18//! Implements the two-tier execution dispatch contract from
19//! `docs/developer_guide/adapters.md` (lines 1232-1296):
20//!
21//! 1. The execution client registers an [`OrderIdentity`] in [`WsDispatchState`]
22//!    when it submits an order, and refreshes the cached venue order id when a
23//!    modify is sent so the WebSocket consumer can detect cancel-replace.
24//! 2. Incoming [`OrderStatusReport`] and [`FillReport`] messages are routed
25//!    through [`dispatch_order_status_report`] and [`dispatch_fill_report`].
26//!    For tracked orders these build typed [`OrderEventAny`] events and emit
27//!    them via [`ExecutionEventEmitter::send_order_event`]. For untracked /
28//!    external orders the dispatch falls back to forwarding the raw report.
29//!
30//! The dispatch state lives in an `Arc<WsDispatchState>` shared between the
31//! main client task (which registers identities at submission time) and the
32//! spawned WebSocket consumer task.
33//!
34//! # GH-3827 cancel-replace handling
35//!
36//! Hyperliquid implements `modify` as a cancel-and-replace: the venue emits an
37//! `ACCEPTED(new_voi)` together with a `CANCELED(old_voi)` under the same
38//! `client_order_id`. The dispatch detects the replacement leg by comparing
39//! `report.venue_order_id` to the last cached value, promotes it to an
40//! `OrderUpdated` event, and suppresses the stale cancel so strategies never
41//! observe a spurious termination.
42//!
43//! The pending-modify marker (keyed on `client_order_id`) is set by
44//! `modify_order` before the HTTP call and cleared on either the matching
45//! `ACCEPTED(new_voi)` or any modify failure. It lets dispatch skip an
46//! early `CANCELED(old_voi)` that arrives before the replacement
47//! `ACCEPTED(new_voi)` on the WebSocket, regardless of whether the WS
48//! message races ahead of the HTTP response.
49//!
50//! [`dispatch_fill_report`] buffers fills for in-flight cancel-replace
51//! modifies into [`WsDispatchState::buffered_fills`]; `handle_accepted`
52//! drains them on the replacement ACCEPTED. See GH-3972.
53
54use std::{
55    collections::VecDeque,
56    hash::Hash,
57    sync::{
58        Mutex,
59        atomic::{AtomicBool, Ordering},
60    },
61};
62
63use ahash::AHashSet;
64use dashmap::{DashMap, DashSet};
65use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
66use nautilus_live::ExecutionEventEmitter;
67use nautilus_model::{
68    enums::{OrderSide, OrderStatus, OrderType},
69    events::{
70        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
71        OrderTriggered, OrderUpdated,
72    },
73    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
74    reports::{FillReport, OrderStatusReport},
75    types::{Price, Quantity},
76};
77use ustr::Ustr;
78
79pub const DEDUP_CAPACITY: usize = 10_000;
80
81/// Identity metadata captured when an order is submitted through this client.
82///
83/// Stored in [`WsDispatchState::order_identities`] keyed by the full Nautilus
84/// [`ClientOrderId`]. The dispatch functions use the identity to build typed
85/// order events for tracked orders without needing access to the engine cache
86/// (which is `!Send` and unreachable from the spawned WebSocket task).
87#[derive(Debug, Clone)]
88pub struct OrderIdentity {
89    /// Strategy that owns the order.
90    pub strategy_id: StrategyId,
91    /// Instrument the order targets.
92    pub instrument_id: InstrumentId,
93    /// Order side captured at submission.
94    pub order_side: OrderSide,
95    /// Order type captured at submission.
96    pub order_type: OrderType,
97    /// Order quantity captured at submission.
98    pub quantity: Quantity,
99    /// Last known order price. Populated on submission and refreshed from
100    /// subsequent status reports so a cancel-replace `ACCEPTED` that omits
101    /// `price` can still produce an `OrderUpdated` carrying an accurate value.
102    pub price: Option<Price>,
103}
104
105/// Bounded FIFO deduplication set.
106///
107/// When the capacity is reached, the oldest entry is evicted on the next
108/// insert. A simple `clear()` at the threshold would drop every recent trade
109/// id at once, opening a window where a reconnect or replay right after the
110/// rollover could re-emit duplicate `OrderFilled` events; the FIFO window
111/// slides instead.
112#[derive(Debug)]
113pub struct BoundedDedup<T>
114where
115    T: Eq + Hash + Clone,
116{
117    order: VecDeque<T>,
118    set: AHashSet<T>,
119    capacity: usize,
120}
121
122impl<T> BoundedDedup<T>
123where
124    T: Eq + Hash + Clone,
125{
126    /// Creates a new bounded dedup set with the given `capacity`.
127    #[must_use]
128    pub fn new(capacity: usize) -> Self {
129        Self {
130            order: VecDeque::with_capacity(capacity),
131            set: AHashSet::with_capacity(capacity),
132            capacity,
133        }
134    }
135
136    /// Inserts a value. Returns `true` when the value was already present.
137    pub fn insert(&mut self, value: T) -> bool {
138        if self.set.contains(&value) {
139            return true;
140        }
141
142        if self.order.len() >= self.capacity
143            && let Some(evicted) = self.order.pop_front()
144        {
145            self.set.remove(&evicted);
146        }
147
148        self.order.push_back(value.clone());
149        self.set.insert(value);
150        false
151    }
152
153    /// Returns the number of entries currently tracked.
154    #[must_use]
155    pub fn len(&self) -> usize {
156        self.set.len()
157    }
158
159    /// Returns whether the dedup set is empty.
160    #[must_use]
161    pub fn is_empty(&self) -> bool {
162        self.set.is_empty()
163    }
164
165    /// Returns whether the value is currently tracked.
166    #[must_use]
167    pub fn contains(&self, value: &T) -> bool {
168        self.set.contains(value)
169    }
170}
171
172/// Per-client dispatch state shared between order submission and the
173/// WebSocket consumer task.
174///
175/// Tracks which orders were submitted through this client (so we can route
176/// venue events to typed [`OrderEventAny`] emissions for tracked orders and
177/// fall back to reports for external orders), provides cross-stream dedup
178/// for `OrderAccepted` and `OrderFilled` emissions, and carries the
179/// GH-3827 cancel-replace state (`cached_venue_order_ids` and
180/// `pending_modify_keys`).
181#[derive(Debug)]
182pub struct WsDispatchState {
183    /// Tracked orders keyed by full Nautilus [`ClientOrderId`].
184    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
185    /// Client order IDs for which an `OrderAccepted` event has been emitted.
186    pub emitted_accepted: DashSet<ClientOrderId>,
187    /// Client order IDs that have reached the filled terminal state.
188    ///
189    /// Retained past `cleanup_terminal` so that late replay of the same
190    /// status or fill does not re-emit events.
191    pub filled_orders: DashSet<ClientOrderId>,
192    /// Trade IDs for which an `OrderFilled` event has been emitted.
193    ///
194    /// Bounded FIFO dedup to bound memory while keeping recent trade ids
195    /// deduped across reconnects.
196    pub emitted_trades: Mutex<BoundedDedup<TradeId>>,
197    /// Last venue order id observed for a tracked client order id.
198    ///
199    /// Populated on the first `OrderAccepted` and refreshed on every
200    /// cancel-replace promotion. A later `ACCEPTED` with a different venue
201    /// order id under the same client order id is treated as the
202    /// replacement leg of a Hyperliquid modify and emitted as `OrderUpdated`.
203    pub cached_venue_order_ids: DashMap<ClientOrderId, VenueOrderId>,
204    /// Maps `client_order_id` to the old venue order id of an in-flight
205    /// modify. Populated by `modify_order` before the HTTP call so the WS
206    /// cancel handler sees the marker even when `CANCELED(old_voi)` arrives
207    /// before the HTTP response. Cleared on the matching `ACCEPTED(new_voi)`
208    /// or on any modify failure. A `CANCELED(old_voi)` arriving while the
209    /// marker is set is treated as the cancel leg of a cancel-before-accept
210    /// race and suppressed so the later `ACCEPTED(new_voi)` can flow through
211    /// the `OrderUpdated` path.
212    pub pending_modify_keys: DashMap<ClientOrderId, VenueOrderId>,
213    /// User-intended absolute total qty for an in-flight modify; the
214    /// cancel-replace promotion uses it instead of the venue's
215    /// remaining-only `report.quantity`.
216    pub pending_modify_target_qty: DashMap<ClientOrderId, Quantity>,
217    /// `FillReport`s buffered while a cancel-replace modify is in flight,
218    /// drained by the cancel-replace branch of `handle_accepted`. See
219    /// GH-3972.
220    pub buffered_fills: DashMap<ClientOrderId, Vec<FillReport>>,
221    /// Cumulative filled quantity per tracked order. Compared against
222    /// `OrderIdentity::quantity` to decide when to clean up tracked state.
223    pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
224    clearing: AtomicBool,
225}
226
227impl Default for WsDispatchState {
228    fn default() -> Self {
229        Self {
230            order_identities: DashMap::new(),
231            emitted_accepted: DashSet::default(),
232            filled_orders: DashSet::default(),
233            emitted_trades: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
234            cached_venue_order_ids: DashMap::new(),
235            pending_modify_keys: DashMap::new(),
236            pending_modify_target_qty: DashMap::new(),
237            buffered_fills: DashMap::new(),
238            order_filled_qty: DashMap::new(),
239            clearing: AtomicBool::new(false),
240        }
241    }
242}
243
244impl WsDispatchState {
245    /// Creates a new empty dispatch state.
246    #[must_use]
247    pub fn new() -> Self {
248        Self::default()
249    }
250
251    /// Registers an order identity. Called by the execution client at order
252    /// submission time, before any WebSocket events for the order can arrive.
253    pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
254        self.order_identities.insert(client_order_id, identity);
255    }
256
257    /// Returns a clone of the identity for the given client order id, if any.
258    #[must_use]
259    pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
260        self.order_identities
261            .get(client_order_id)
262            .map(|r| r.clone())
263    }
264
265    /// Refreshes the tracked price for a modify ack when the new report
266    /// carries an updated price.
267    pub fn update_identity_price(&self, client_order_id: &ClientOrderId, price: Option<Price>) {
268        if let Some(price) = price
269            && let Some(mut entry) = self.order_identities.get_mut(client_order_id)
270        {
271            entry.price = Some(price);
272        }
273    }
274
275    /// Refreshes the tracked quantity for a modify ack.
276    pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
277        if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
278            entry.quantity = quantity;
279        }
280    }
281
282    /// Marks an `OrderAccepted` event as emitted for this order.
283    pub fn insert_accepted(&self, cid: ClientOrderId) {
284        self.evict_if_full(&self.emitted_accepted);
285        self.emitted_accepted.insert(cid);
286    }
287
288    /// Marks an order as having reached the filled terminal state.
289    pub fn insert_filled(&self, cid: ClientOrderId) {
290        self.evict_if_full(&self.filled_orders);
291        self.filled_orders.insert(cid);
292    }
293
294    /// Atomically inserts a trade id into the dedup set.
295    ///
296    /// Returns `true` when the trade was already present (i.e. it is a
297    /// duplicate), `false` otherwise.
298    #[allow(
299        clippy::missing_panics_doc,
300        reason = "dedup mutex poisoning is not expected"
301    )]
302    pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
303        let mut set = self.emitted_trades.lock().expect(MUTEX_POISONED);
304        set.insert(trade_id)
305    }
306
307    /// Caches the venue order id observed for a tracked client order id.
308    pub fn record_venue_order_id(
309        &self,
310        client_order_id: ClientOrderId,
311        venue_order_id: VenueOrderId,
312    ) {
313        self.cached_venue_order_ids
314            .insert(client_order_id, venue_order_id);
315    }
316
317    /// Returns the previously cached venue order id, if any.
318    #[must_use]
319    pub fn cached_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
320        self.cached_venue_order_ids.get(client_order_id).map(|r| *r)
321    }
322
323    /// Marks an in-flight modify for cancel-before-accept suppression and
324    /// records the target absolute total qty for the cancel-replace promotion.
325    pub fn mark_pending_modify(
326        &self,
327        client_order_id: ClientOrderId,
328        old_venue_order_id: VenueOrderId,
329        target_qty: Quantity,
330    ) {
331        self.pending_modify_keys
332            .insert(client_order_id, old_venue_order_id);
333        self.pending_modify_target_qty
334            .insert(client_order_id, target_qty);
335    }
336
337    /// Clears the pending modify marker for a client order id.
338    pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
339        self.pending_modify_keys.remove(client_order_id);
340        self.pending_modify_target_qty.remove(client_order_id);
341    }
342
343    /// Returns the pending modify marker for a client order id, if any.
344    #[must_use]
345    pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
346        self.pending_modify_keys.get(client_order_id).map(|r| *r)
347    }
348
349    /// Returns the recorded target absolute total qty, if any.
350    #[must_use]
351    pub fn pending_modify_target_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
352        self.pending_modify_target_qty
353            .get(client_order_id)
354            .map(|r| *r)
355    }
356
357    /// Buffers a `FillReport` arrived during an in-flight cancel-replace.
358    pub fn buffer_fill(&self, client_order_id: ClientOrderId, fill: FillReport) {
359        self.buffered_fills
360            .entry(client_order_id)
361            .or_default()
362            .push(fill);
363    }
364
365    /// Removes and returns buffered fills for the cid, in arrival order.
366    #[must_use]
367    pub fn drain_buffered_fills(&self, client_order_id: &ClientOrderId) -> Vec<FillReport> {
368        self.buffered_fills
369            .remove(client_order_id)
370            .map(|(_, v)| v)
371            .unwrap_or_default()
372    }
373
374    /// Number of buffered fills for the cid.
375    #[must_use]
376    pub fn buffered_fill_count(&self, client_order_id: &ClientOrderId) -> usize {
377        self.buffered_fills
378            .get(client_order_id)
379            .map_or(0, |r| r.len())
380    }
381
382    /// Records cumulative filled quantity for a tracked order.
383    pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
384        self.order_filled_qty.insert(client_order_id, qty);
385    }
386
387    /// Returns the previously recorded cumulative filled quantity, if any.
388    #[must_use]
389    pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
390        self.order_filled_qty.get(client_order_id).map(|r| *r)
391    }
392
393    /// Removes all dispatch state for an order that has reached a terminal state.
394    ///
395    /// `filled_orders` is intentionally *not* cleared here: the marker is
396    /// used to suppress stale replays and must outlive the identity cleanup.
397    pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
398        self.order_identities.remove(client_order_id);
399        self.emitted_accepted.remove(client_order_id);
400        self.cached_venue_order_ids.remove(client_order_id);
401        self.pending_modify_keys.remove(client_order_id);
402        self.pending_modify_target_qty.remove(client_order_id);
403        self.buffered_fills.remove(client_order_id);
404        self.order_filled_qty.remove(client_order_id);
405    }
406
407    fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
408        if set.len() >= DEDUP_CAPACITY
409            && self
410                .clearing
411                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
412                .is_ok()
413        {
414            set.clear();
415            self.clearing.store(false, Ordering::Release);
416        }
417    }
418}
419
420/// Outcome of a single dispatch call.
421#[derive(Debug, Clone, Copy, PartialEq, Eq)]
422pub enum DispatchOutcome {
423    /// The report was for a tracked order. Typed events have been emitted
424    /// (or intentionally skipped, e.g. dedup hit). The caller must not
425    /// forward the report as a fallback.
426    Tracked,
427    /// The report is for an external / untracked order. The caller should
428    /// forward the report via [`ExecutionEventEmitter::send_order_status_report`]
429    /// or [`ExecutionEventEmitter::send_fill_report`] so the engine can
430    /// reconcile.
431    External,
432    /// The report was recognised as stale (e.g. cancel leg of a
433    /// cancel-replace modify, or replay after terminal state). The caller
434    /// must drop it without forwarding.
435    Skip,
436}
437
438/// Dispatches an [`OrderStatusReport`] using the two-tier routing contract.
439///
440/// Returns [`DispatchOutcome::Tracked`] when the report maps to a tracked
441/// order (typed events have been emitted or dedup hit), [`External`] when
442/// the caller should forward the report as an untracked fallback, or
443/// [`Skip`] when the report is a stale / race leg that must be dropped.
444///
445/// [`External`]: DispatchOutcome::External
446/// [`Skip`]: DispatchOutcome::Skip
447pub fn dispatch_order_status_report(
448    report: &OrderStatusReport,
449    state: &WsDispatchState,
450    emitter: &ExecutionEventEmitter,
451    ts_init: UnixNanos,
452) -> DispatchOutcome {
453    let Some(client_order_id) = report.client_order_id else {
454        return DispatchOutcome::External;
455    };
456
457    if state.filled_orders.contains(&client_order_id) {
458        log::debug!(
459            "Skipping stale report for filled order: cid={client_order_id}, status={:?}",
460            report.order_status,
461        );
462        return DispatchOutcome::Skip;
463    }
464
465    let Some(identity) = state.lookup_identity(&client_order_id) else {
466        return DispatchOutcome::External;
467    };
468
469    match report.order_status {
470        OrderStatus::Accepted => {
471            handle_accepted(report, client_order_id, &identity, state, emitter, ts_init)
472        }
473        OrderStatus::Triggered => {
474            handle_triggered(report, client_order_id, &identity, state, emitter, ts_init)
475        }
476        OrderStatus::Canceled => {
477            handle_canceled(report, client_order_id, &identity, state, emitter, ts_init)
478        }
479        OrderStatus::Expired => {
480            handle_expired(report, client_order_id, &identity, state, emitter, ts_init)
481        }
482        OrderStatus::Rejected => {
483            handle_rejected(report, client_order_id, &identity, state, emitter, ts_init)
484        }
485        OrderStatus::Filled => handle_filled_marker(client_order_id, state),
486        OrderStatus::PartiallyFilled => {
487            // Fills come via `FillReport`; nothing to emit from the status path.
488            DispatchOutcome::Tracked
489        }
490        OrderStatus::PendingUpdate
491        | OrderStatus::PendingCancel
492        | OrderStatus::Submitted
493        | OrderStatus::Initialized
494        | OrderStatus::Denied
495        | OrderStatus::Released
496        | OrderStatus::Emulated => DispatchOutcome::Tracked,
497    }
498}
499
500/// Dispatches a [`FillReport`] using the two-tier routing contract.
501///
502/// Returns [`DispatchOutcome::Tracked`] when the fill has been emitted as
503/// an `OrderFilled` event (or skipped via trade dedup), [`External`] when
504/// the caller should forward the fill via
505/// [`ExecutionEventEmitter::send_fill_report`], or [`Skip`] when the fill
506/// is a replay for an already-terminal order and must be dropped.
507///
508/// [`External`]: DispatchOutcome::External
509/// [`Skip`]: DispatchOutcome::Skip
510pub fn dispatch_fill_report(
511    report: &FillReport,
512    state: &WsDispatchState,
513    emitter: &ExecutionEventEmitter,
514    ts_init: UnixNanos,
515) -> DispatchOutcome {
516    let Some(client_order_id) = report.client_order_id else {
517        return DispatchOutcome::External;
518    };
519
520    if state.filled_orders.contains(&client_order_id) {
521        log::debug!(
522            "Skipping stale fill for filled order: cid={client_order_id}, trade_id={}",
523            report.trade_id,
524        );
525        return DispatchOutcome::Skip;
526    }
527
528    let Some(identity) = state.lookup_identity(&client_order_id) else {
529        return DispatchOutcome::External;
530    };
531
532    // Buffer fills for an in-flight cancel-replace. Marker required so a
533    // stale old-leg fill after promotion falls through instead of being
534    // stranded; a delayed earlier-leg fill during a chained modify is a
535    // known limitation. See GH-3972.
536    if state.pending_modify(&client_order_id).is_some()
537        && let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
538        && report.venue_order_id != cached_voi
539    {
540        log::debug!(
541            "Buffering cancel-replace fill for {client_order_id}: \
542             report_voi={}, cached_voi={cached_voi}, trade_id={}",
543            report.venue_order_id,
544            report.trade_id,
545        );
546        state.buffer_fill(client_order_id, report.clone());
547        return DispatchOutcome::Tracked;
548    }
549
550    if state.check_and_insert_trade(report.trade_id) {
551        log::debug!(
552            "Skipping duplicate fill for {client_order_id}: trade_id={}",
553            report.trade_id
554        );
555        return DispatchOutcome::Tracked;
556    }
557
558    ensure_accepted_emitted(
559        client_order_id,
560        report.venue_order_id,
561        report.account_id,
562        &identity,
563        state,
564        emitter,
565        report.ts_event,
566        ts_init,
567    );
568
569    let filled = OrderFilled::new(
570        emitter.trader_id(),
571        identity.strategy_id,
572        identity.instrument_id,
573        client_order_id,
574        report.venue_order_id,
575        report.account_id,
576        report.trade_id,
577        identity.order_side,
578        identity.order_type,
579        report.last_qty,
580        report.last_px,
581        report.commission.currency,
582        report.liquidity_side,
583        UUID4::new(),
584        report.ts_event,
585        ts_init,
586        false,
587        report.venue_position_id,
588        Some(report.commission),
589    );
590    emitter.send_order_event(OrderEventAny::Filled(filled));
591
592    let previous = state
593        .previous_filled_qty(&client_order_id)
594        .unwrap_or_else(|| Quantity::zero(report.last_qty.precision));
595    let cumulative = previous + report.last_qty;
596    state.record_filled_qty(client_order_id, cumulative);
597
598    if cumulative >= identity.quantity {
599        state.insert_filled(client_order_id);
600        state.cleanup_terminal(&client_order_id);
601    }
602
603    DispatchOutcome::Tracked
604}
605
606fn handle_accepted(
607    report: &OrderStatusReport,
608    client_order_id: ClientOrderId,
609    identity: &OrderIdentity,
610    state: &WsDispatchState,
611    emitter: &ExecutionEventEmitter,
612    ts_init: UnixNanos,
613) -> DispatchOutcome {
614    let venue_order_id = report.venue_order_id;
615    let ts_event = report.ts_last;
616    let account_id = report.account_id;
617
618    // Cancel-replace detection: if an earlier ACCEPTED cached a different
619    // venue_order_id under the same client_order_id, this ACCEPTED is the
620    // replacement leg of a Hyperliquid modify and must be promoted to
621    // OrderUpdated. See GH-3827.
622    if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
623        && cached_voi != venue_order_id
624    {
625        let price = report.price.or(identity.price);
626        let Some(price) = price else {
627            log::warn!(
628                "Cannot emit OrderUpdated for cancel-replace {client_order_id}: \
629                 no price on report and no cached price on identity",
630            );
631            return DispatchOutcome::Skip;
632        };
633
634        // Prefer user target over venue's remaining-only `report.quantity`;
635        // fall back when no marker (external modify).
636        let updated_quantity = state
637            .pending_modify_target_qty(&client_order_id)
638            .unwrap_or(report.quantity);
639
640        state.record_venue_order_id(client_order_id, venue_order_id);
641        state.update_identity_quantity(&client_order_id, updated_quantity);
642        state.update_identity_price(&client_order_id, Some(price));
643        state.clear_pending_modify(&client_order_id);
644
645        let updated = OrderUpdated::new(
646            emitter.trader_id(),
647            identity.strategy_id,
648            identity.instrument_id,
649            client_order_id,
650            updated_quantity,
651            UUID4::new(),
652            ts_event,
653            ts_init,
654            false,
655            Some(venue_order_id),
656            Some(account_id),
657            Some(price),
658            report.trigger_price,
659            None,
660            false,
661        );
662        emitter.send_order_event(OrderEventAny::Updated(updated));
663
664        // Drain buffered fills. Bypasses `handle_execution_report`;
665        // FIFO-bounded caches make any residue benign. See GH-3972.
666        let buffered = state.drain_buffered_fills(&client_order_id);
667        for fill in buffered {
668            dispatch_fill_report(&fill, state, emitter, ts_init);
669        }
670        return DispatchOutcome::Tracked;
671    }
672
673    if state.emitted_accepted.contains(&client_order_id) {
674        // Repeat ACCEPTED for an already-accepted order. Nothing to emit;
675        // refresh the cached price so a subsequent cancel-replace without a
676        // report price can still recover an accurate value.
677        state.update_identity_price(&client_order_id, report.price);
678        return DispatchOutcome::Tracked;
679    }
680
681    state.insert_accepted(client_order_id);
682    state.record_venue_order_id(client_order_id, venue_order_id);
683    state.update_identity_price(&client_order_id, report.price);
684
685    let accepted = OrderAccepted::new(
686        emitter.trader_id(),
687        identity.strategy_id,
688        identity.instrument_id,
689        client_order_id,
690        venue_order_id,
691        account_id,
692        UUID4::new(),
693        ts_event,
694        ts_init,
695        false,
696    );
697    emitter.send_order_event(OrderEventAny::Accepted(accepted));
698    DispatchOutcome::Tracked
699}
700
701fn handle_triggered(
702    report: &OrderStatusReport,
703    client_order_id: ClientOrderId,
704    identity: &OrderIdentity,
705    state: &WsDispatchState,
706    emitter: &ExecutionEventEmitter,
707    ts_init: UnixNanos,
708) -> DispatchOutcome {
709    if !matches!(
710        identity.order_type,
711        OrderType::StopLimit | OrderType::TrailingStopLimit | OrderType::LimitIfTouched
712    ) {
713        log::debug!(
714            "Ignoring TRIGGERED status for non-triggerable order type {:?}: {client_order_id}",
715            identity.order_type,
716        );
717        return DispatchOutcome::Tracked;
718    }
719
720    ensure_accepted_emitted(
721        client_order_id,
722        report.venue_order_id,
723        report.account_id,
724        identity,
725        state,
726        emitter,
727        report.ts_last,
728        ts_init,
729    );
730
731    let triggered = OrderTriggered::new(
732        emitter.trader_id(),
733        identity.strategy_id,
734        identity.instrument_id,
735        client_order_id,
736        UUID4::new(),
737        report.ts_last,
738        ts_init,
739        false,
740        Some(report.venue_order_id),
741        Some(report.account_id),
742    );
743    emitter.send_order_event(OrderEventAny::Triggered(triggered));
744    DispatchOutcome::Tracked
745}
746
747fn handle_canceled(
748    report: &OrderStatusReport,
749    client_order_id: ClientOrderId,
750    identity: &OrderIdentity,
751    state: &WsDispatchState,
752    emitter: &ExecutionEventEmitter,
753    ts_init: UnixNanos,
754) -> DispatchOutcome {
755    let venue_order_id = report.venue_order_id;
756
757    // Stale cancel suppression: if the cached venue_order_id has already
758    // been advanced by a cancel-replace promotion, this CANCELED refers to
759    // the old leg and has already been handled as OrderUpdated. See GH-3827.
760    if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
761        && cached_voi != venue_order_id
762    {
763        log::debug!(
764            "Skipping stale CANCELED for {venue_order_id} (cached {cached_voi}) on {client_order_id}",
765        );
766        return DispatchOutcome::Skip;
767    }
768
769    // Cancel-before-accept race: an in-flight modify may deliver
770    // CANCELED(old_voi) before the replacement ACCEPTED(new_voi). The
771    // pending marker (set before the modify HTTP call and cleared on
772    // failure) lets us suppress the old leg so the later ACCEPTED can route
773    // through OrderUpdated. See GH-3827.
774    if let Some(pending_old) = state.pending_modify(&client_order_id)
775        && pending_old == venue_order_id
776    {
777        log::debug!(
778            "Skipping cancel-before-accept leg for {client_order_id}: venue_order_id={venue_order_id}",
779        );
780        return DispatchOutcome::Skip;
781    }
782
783    ensure_accepted_emitted(
784        client_order_id,
785        venue_order_id,
786        report.account_id,
787        identity,
788        state,
789        emitter,
790        report.ts_last,
791        ts_init,
792    );
793
794    let canceled = OrderCanceled::new(
795        emitter.trader_id(),
796        identity.strategy_id,
797        identity.instrument_id,
798        client_order_id,
799        UUID4::new(),
800        report.ts_last,
801        ts_init,
802        false,
803        Some(venue_order_id),
804        Some(report.account_id),
805    );
806    emitter.send_order_event(OrderEventAny::Canceled(canceled));
807
808    // Retain the filled marker so any late replay of the cancel is
809    // suppressed even after the identity state has been cleaned up.
810    state.insert_filled(client_order_id);
811    state.cleanup_terminal(&client_order_id);
812    DispatchOutcome::Tracked
813}
814
815fn handle_expired(
816    report: &OrderStatusReport,
817    client_order_id: ClientOrderId,
818    identity: &OrderIdentity,
819    state: &WsDispatchState,
820    emitter: &ExecutionEventEmitter,
821    ts_init: UnixNanos,
822) -> DispatchOutcome {
823    ensure_accepted_emitted(
824        client_order_id,
825        report.venue_order_id,
826        report.account_id,
827        identity,
828        state,
829        emitter,
830        report.ts_last,
831        ts_init,
832    );
833
834    let expired = OrderExpired::new(
835        emitter.trader_id(),
836        identity.strategy_id,
837        identity.instrument_id,
838        client_order_id,
839        UUID4::new(),
840        report.ts_last,
841        ts_init,
842        false,
843        Some(report.venue_order_id),
844        Some(report.account_id),
845    );
846    emitter.send_order_event(OrderEventAny::Expired(expired));
847    state.insert_filled(client_order_id);
848    state.cleanup_terminal(&client_order_id);
849    DispatchOutcome::Tracked
850}
851
852fn handle_rejected(
853    report: &OrderStatusReport,
854    client_order_id: ClientOrderId,
855    identity: &OrderIdentity,
856    state: &WsDispatchState,
857    emitter: &ExecutionEventEmitter,
858    ts_init: UnixNanos,
859) -> DispatchOutcome {
860    let reason = report
861        .cancel_reason
862        .clone()
863        .unwrap_or_else(|| "Order rejected by exchange".to_string());
864    let rejected = OrderRejected::new(
865        emitter.trader_id(),
866        identity.strategy_id,
867        identity.instrument_id,
868        client_order_id,
869        report.account_id,
870        Ustr::from(&reason),
871        UUID4::new(),
872        report.ts_last,
873        ts_init,
874        false,
875        false,
876    );
877    emitter.send_order_event(OrderEventAny::Rejected(rejected));
878    state.insert_filled(client_order_id);
879    state.cleanup_terminal(&client_order_id);
880    DispatchOutcome::Tracked
881}
882
883fn handle_filled_marker(
884    _client_order_id: ClientOrderId,
885    _state: &WsDispatchState,
886) -> DispatchOutcome {
887    // A status-only `FILLED` marker does not carry fill data; the actual
888    // `OrderFilled` is emitted from `dispatch_fill_report` when the matching
889    // trade arrives. Do *not* set `filled_orders` here, otherwise the
890    // follow-up fill would be classified as a stale replay and dropped
891    // before the terminal `OrderFilled` event can be emitted. The fill
892    // path installs the marker itself once the cumulative fill quantity
893    // matches the tracked order quantity.
894    DispatchOutcome::Tracked
895}
896
897/// Synthesizes and emits an `OrderAccepted` event when one has not yet been
898/// emitted for the given order.
899///
900/// Used before emitting non-Accepted events so strategies always observe the
901/// canonical `Submitted -> Accepted -> ...` lifecycle even when the venue
902/// compresses the placement and follow-up event into a single message (fast
903/// fills).
904#[allow(clippy::too_many_arguments)]
905fn ensure_accepted_emitted(
906    client_order_id: ClientOrderId,
907    venue_order_id: VenueOrderId,
908    account_id: AccountId,
909    identity: &OrderIdentity,
910    state: &WsDispatchState,
911    emitter: &ExecutionEventEmitter,
912    ts_event: UnixNanos,
913    ts_init: UnixNanos,
914) {
915    if state.emitted_accepted.contains(&client_order_id) {
916        return;
917    }
918    state.insert_accepted(client_order_id);
919    state.record_venue_order_id(client_order_id, venue_order_id);
920
921    let accepted = OrderAccepted::new(
922        emitter.trader_id(),
923        identity.strategy_id,
924        identity.instrument_id,
925        client_order_id,
926        venue_order_id,
927        account_id,
928        UUID4::new(),
929        ts_event,
930        ts_init,
931        false,
932    );
933    emitter.send_order_event(OrderEventAny::Accepted(accepted));
934}
935
936#[cfg(test)]
937mod tests {
938    use nautilus_model::identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId};
939    use rstest::rstest;
940
941    use super::*;
942
943    fn make_identity() -> OrderIdentity {
944        OrderIdentity {
945            strategy_id: StrategyId::from("S-001"),
946            instrument_id: InstrumentId::from("BTC-USD-PERP.HYPERLIQUID"),
947            order_side: OrderSide::Buy,
948            order_type: OrderType::Limit,
949            quantity: Quantity::from("0.0001"),
950            price: None,
951        }
952    }
953
954    #[rstest]
955    fn test_register_and_lookup_identity() {
956        let state = WsDispatchState::new();
957        let cid = ClientOrderId::new("O-001");
958        state.register_identity(cid, make_identity());
959
960        let found = state.lookup_identity(&cid);
961        assert!(found.is_some());
962        let identity = found.unwrap();
963        assert_eq!(identity.strategy_id.as_str(), "S-001");
964        assert_eq!(identity.order_side, OrderSide::Buy);
965    }
966
967    #[rstest]
968    fn test_lookup_identity_missing_returns_none() {
969        let state = WsDispatchState::new();
970        let cid = ClientOrderId::new("not-tracked");
971        assert!(state.lookup_identity(&cid).is_none());
972    }
973
974    #[rstest]
975    fn test_insert_accepted_dedup() {
976        let state = WsDispatchState::new();
977        let cid = ClientOrderId::new("O-002");
978        assert!(!state.emitted_accepted.contains(&cid));
979        state.insert_accepted(cid);
980        assert!(state.emitted_accepted.contains(&cid));
981        state.insert_accepted(cid);
982        assert!(state.emitted_accepted.contains(&cid));
983    }
984
985    #[rstest]
986    fn test_check_and_insert_trade_detects_duplicates() {
987        let state = WsDispatchState::new();
988        let trade = TradeId::new("trade-1");
989        assert!(!state.check_and_insert_trade(trade));
990        assert!(state.check_and_insert_trade(trade));
991    }
992
993    #[rstest]
994    fn test_bounded_dedup_fifo_eviction_preserves_recent_ids() {
995        let mut dedup: BoundedDedup<TradeId> = BoundedDedup::new(3);
996        assert!(!dedup.insert(TradeId::new("t-0")));
997        assert!(!dedup.insert(TradeId::new("t-1")));
998        assert!(!dedup.insert(TradeId::new("t-2")));
999        assert_eq!(dedup.len(), 3);
1000
1001        // Overflow evicts the oldest.
1002        assert!(!dedup.insert(TradeId::new("t-3")));
1003        assert_eq!(dedup.len(), 3);
1004        assert!(!dedup.contains(&TradeId::new("t-0")));
1005        assert!(dedup.contains(&TradeId::new("t-1")));
1006        assert!(dedup.contains(&TradeId::new("t-3")));
1007    }
1008
1009    #[rstest]
1010    fn test_pending_modify_roundtrip() {
1011        let state = WsDispatchState::new();
1012        let cid = ClientOrderId::new("O-010");
1013        let voi = VenueOrderId::new("v-1");
1014        let target_qty = Quantity::from("0.0001");
1015
1016        assert!(state.pending_modify(&cid).is_none());
1017        assert!(state.pending_modify_target_qty(&cid).is_none());
1018        state.mark_pending_modify(cid, voi, target_qty);
1019        assert_eq!(state.pending_modify(&cid), Some(voi));
1020        assert_eq!(state.pending_modify_target_qty(&cid), Some(target_qty));
1021        state.clear_pending_modify(&cid);
1022        assert!(state.pending_modify(&cid).is_none());
1023        assert!(state.pending_modify_target_qty(&cid).is_none());
1024    }
1025
1026    #[rstest]
1027    fn test_cleanup_terminal_preserves_filled_marker() {
1028        let state = WsDispatchState::new();
1029        let cid = ClientOrderId::new("O-020");
1030        state.register_identity(cid, make_identity());
1031        state.insert_accepted(cid);
1032        state.mark_pending_modify(cid, VenueOrderId::new("v-1"), Quantity::from("0.0001"));
1033        state.insert_filled(cid);
1034        state.cleanup_terminal(&cid);
1035
1036        assert!(state.lookup_identity(&cid).is_none());
1037        assert!(!state.emitted_accepted.contains(&cid));
1038        assert!(state.pending_modify(&cid).is_none());
1039        assert!(state.pending_modify_target_qty(&cid).is_none());
1040        // `filled_orders` outlives `cleanup_terminal` so replays stay suppressed.
1041        assert!(state.filled_orders.contains(&cid));
1042    }
1043}