Skip to main content

nautilus_hyperliquid/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket execution dispatch for the Hyperliquid execution client.
17//!
18//! Implements the two-tier execution dispatch contract from
19//! `docs/developer_guide/adapters.md` (lines 1232-1296):
20//!
21//! 1. The execution client registers an [`OrderIdentity`] in [`WsDispatchState`]
22//!    when it submits an order, and refreshes the cached venue order id when a
23//!    modify is sent so the WebSocket consumer can detect cancel-replace.
24//! 2. Incoming [`OrderStatusReport`] and [`FillReport`] messages are routed
25//!    through [`dispatch_order_event`] and [`dispatch_order_fill`].
26//!    For tracked orders these build typed [`OrderEventAny`] events and emit
27//!    them via [`ExecutionEventEmitter::send_order_event`]. For untracked /
28//!    external orders the dispatch falls back to forwarding the raw report.
29//!
30//! The dispatch state lives in an `Arc<WsDispatchState>` shared between the
31//! main client task (which registers identities at submission time) and the
32//! spawned WebSocket consumer task.
33//!
34//! # GH-3827 cancel-replace handling
35//!
36//! Hyperliquid implements `modify` as a cancel-and-replace: the venue emits an
37//! `ACCEPTED(new_voi)` together with a `CANCELED(old_voi)` under the same
38//! `client_order_id`. The dispatch detects the replacement leg by comparing
39//! `report.venue_order_id` to the last cached value, promotes it to an
40//! `OrderUpdated` event, and suppresses the stale cancel so strategies never
41//! observe a spurious termination.
42//!
43//! The pending-modify marker (keyed on `client_order_id`) is set by
44//! `modify_order` before the HTTP call and cleared on either the matching
45//! `ACCEPTED(new_voi)` or any modify failure. It lets dispatch skip an
46//! early `CANCELED(old_voi)` that arrives before the replacement
47//! `ACCEPTED(new_voi)` on the WebSocket, regardless of whether the WS
48//! message races ahead of the HTTP response.
49//!
50//! [`dispatch_order_fill`] buffers fills for in-flight cancel-replace
51//! modifies into [`WsDispatchState::buffered_fills`]; `handle_accepted`
52//! drains them on the replacement ACCEPTED. See GH-3972.
53
54use std::{
55    collections::VecDeque,
56    hash::Hash,
57    sync::{
58        Mutex,
59        atomic::{AtomicBool, Ordering},
60    },
61};
62
63use ahash::AHashSet;
64use dashmap::{DashMap, DashSet};
65use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
66use nautilus_live::ExecutionEventEmitter;
67use nautilus_model::{
68    enums::{OrderSide, OrderStatus, OrderType},
69    events::{
70        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
71        OrderTriggered, OrderUpdated,
72    },
73    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
74    reports::{FillReport, OrderStatusReport},
75    types::{Price, Quantity},
76};
77use ustr::Ustr;
78
79use crate::http::models::HyperliquidExecPlaceOrderRequest;
80
/// Maximum number of entries each dispatch dedup set in this module holds
/// before older entries are evicted or the set is reset.
pub const DEDUP_CAPACITY: usize = 10_000;
82
83/// Identity metadata captured when an order is submitted through this client.
84///
85/// Stored in [`WsDispatchState::order_identities`] keyed by the full Nautilus
86/// [`ClientOrderId`]. The dispatch functions use the identity to build typed
87/// order events for tracked orders without needing access to the engine cache
88/// (which is `!Send` and unreachable from the spawned WebSocket task).
89#[derive(Debug, Clone)]
90pub struct OrderIdentity {
91    /// Strategy that owns the order.
92    pub strategy_id: StrategyId,
93    /// Instrument the order targets.
94    pub instrument_id: InstrumentId,
95    /// Order side captured at submission.
96    pub order_side: OrderSide,
97    /// Order type captured at submission.
98    pub order_type: OrderType,
99    /// Order quantity captured at submission.
100    pub quantity: Quantity,
101    /// Last known order price. Populated on submission and refreshed from
102    /// subsequent status reports so a cancel-replace `ACCEPTED` that omits
103    /// `price` can still produce an `OrderUpdated` carrying an accurate value.
104    pub price: Option<Price>,
105}
106
/// Bounded FIFO deduplication set.
///
/// Tracks at most `capacity` distinct values. Once full, inserting a new
/// value first evicts the oldest tracked one, so the window of remembered
/// values slides. A simple `clear()` at the threshold would drop every recent
/// trade id at once, opening a window where a reconnect or replay right after
/// the rollover could re-emit duplicate `OrderFilled` events.
///
/// A `capacity` of zero tracks nothing: every insert reports "not seen" and
/// the set stays empty.
#[derive(Debug)]
pub struct BoundedDedup<T>
where
    T: Eq + Hash + Clone,
{
    /// Tracked values in insertion order; the front is the next to be evicted.
    order: VecDeque<T>,
    /// The same values as `order`, kept for hashed membership checks.
    set: AHashSet<T>,
    /// Maximum number of values tracked at any one time.
    capacity: usize,
}

impl<T> BoundedDedup<T>
where
    T: Eq + Hash + Clone,
{
    /// Creates a new bounded dedup set with the given `capacity`.
    ///
    /// Storage for `capacity` values is reserved up front.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            order: VecDeque::with_capacity(capacity),
            set: AHashSet::with_capacity(capacity),
            capacity,
        }
    }

    /// Inserts a value. Returns `true` when the value was already present.
    ///
    /// When the set is full, the oldest tracked values are evicted to make
    /// room before the new value is recorded. With a capacity of zero the
    /// value is not recorded at all and `false` is returned.
    pub fn insert(&mut self, value: T) -> bool {
        if self.set.contains(&value) {
            return true;
        }

        // Nothing can be retained with zero capacity. Without this guard the
        // eviction below would find nothing to pop, the value would be stored
        // anyway, and the "bounded" set would grow without limit.
        if self.capacity == 0 {
            return false;
        }

        // Slide the window: drop the oldest values until there is room for
        // exactly one more.
        while self.order.len() >= self.capacity {
            let Some(evicted) = self.order.pop_front() else {
                break;
            };
            self.set.remove(&evicted);
        }

        self.order.push_back(value.clone());
        self.set.insert(value);
        false
    }

    /// Returns the number of entries currently tracked.
    #[must_use]
    pub fn len(&self) -> usize {
        self.set.len()
    }

    /// Returns whether the dedup set is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.set.is_empty()
    }

    /// Returns whether the value is currently tracked.
    #[must_use]
    pub fn contains(&self, value: &T) -> bool {
        self.set.contains(value)
    }
}
173
174/// Per-client dispatch state shared between order submission and the
175/// WebSocket consumer task.
176///
177/// Tracks which orders were submitted through this client (so we can route
178/// venue events to typed [`OrderEventAny`] emissions for tracked orders and
179/// fall back to reports for external orders), provides cross-stream dedup
180/// for `OrderAccepted` and `OrderFilled` emissions, and carries the
181/// GH-3827 cancel-replace state (`cached_venue_order_ids` and
182/// `pending_modify_keys`).
183#[derive(Debug)]
184pub struct WsDispatchState {
185    /// Tracked orders keyed by full Nautilus [`ClientOrderId`].
186    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
187    /// Client order IDs for which an `OrderAccepted` event has been emitted.
188    pub emitted_accepted: DashSet<ClientOrderId>,
189    /// Client order IDs that have reached the filled terminal state.
190    ///
191    /// Retained past `cleanup_terminal` so that late replay of the same
192    /// status or fill does not re-emit events.
193    pub filled_orders: DashSet<ClientOrderId>,
194    /// Trade IDs for which an `OrderFilled` event has been emitted.
195    ///
196    /// Bounded FIFO dedup to bound memory while keeping recent trade ids
197    /// deduped across reconnects.
198    pub emitted_trades: Mutex<BoundedDedup<TradeId>>,
199    /// Raw Hyperliquid CLOIDs that reached a terminal state through the post
200    /// response path before the matching `orderUpdates` event arrived.
201    pub terminal_cloids: Mutex<BoundedDedup<Ustr>>,
202    /// Last venue order id observed for a tracked client order id.
203    ///
204    /// Populated on the first `OrderAccepted` and refreshed on every
205    /// cancel-replace promotion. A later `ACCEPTED` with a different venue
206    /// order id under the same client order id is treated as the
207    /// replacement leg of a Hyperliquid modify and emitted as `OrderUpdated`.
208    pub cached_venue_order_ids: DashMap<ClientOrderId, VenueOrderId>,
209    /// Maps `client_order_id` to the old venue order id of an in-flight
210    /// modify. Populated by `modify_order` before the HTTP call so the WS
211    /// cancel handler sees the marker even when `CANCELED(old_voi)` arrives
212    /// before the HTTP response. Cleared on the matching `ACCEPTED(new_voi)`
213    /// or on any modify failure. A `CANCELED(old_voi)` arriving while the
214    /// marker is set is treated as the cancel leg of a cancel-before-accept
215    /// race and suppressed so the later `ACCEPTED(new_voi)` can flow through
216    /// the `OrderUpdated` path.
217    pub pending_modify_keys: DashMap<ClientOrderId, VenueOrderId>,
218    /// User-intended absolute total qty for an in-flight modify; the
219    /// cancel-replace promotion uses it instead of the venue's
220    /// remaining-only `report.quantity`.
221    pub pending_modify_target_qty: DashMap<ClientOrderId, Quantity>,
222    /// `FillReport`s buffered while a cancel-replace modify is in flight,
223    /// drained by the cancel-replace branch of `handle_accepted`. See
224    /// GH-3972.
225    pub buffered_fills: DashMap<ClientOrderId, Vec<FillReport>>,
226    /// Cumulative filled quantity per tracked order. Compared against
227    /// `OrderIdentity::quantity` to decide when to clean up tracked state.
228    pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
229    /// Exact venue request sent for an in-flight modify, used by the
230    /// cancel-replace promotion to build a corrective reduce.
231    pub pending_modify_request: DashMap<ClientOrderId, HyperliquidExecPlaceOrderRequest>,
232    /// Corrective reduce queued by the cancel-replace promotion: client order
233    /// id to (new venue order id, reduced request). Drained by the WS loop.
234    pub pending_corrective: DashMap<ClientOrderId, (u64, HyperliquidExecPlaceOrderRequest)>,
235    clearing: AtomicBool,
236}
237
238impl Default for WsDispatchState {
239    fn default() -> Self {
240        Self {
241            order_identities: DashMap::new(),
242            emitted_accepted: DashSet::default(),
243            filled_orders: DashSet::default(),
244            emitted_trades: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
245            terminal_cloids: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
246            cached_venue_order_ids: DashMap::new(),
247            pending_modify_keys: DashMap::new(),
248            pending_modify_target_qty: DashMap::new(),
249            buffered_fills: DashMap::new(),
250            order_filled_qty: DashMap::new(),
251            pending_modify_request: DashMap::new(),
252            pending_corrective: DashMap::new(),
253            clearing: AtomicBool::new(false),
254        }
255    }
256}
257
258impl WsDispatchState {
259    /// Creates a new empty dispatch state.
260    #[must_use]
261    pub fn new() -> Self {
262        Self::default()
263    }
264
265    /// Registers an order identity. Called by the execution client at order
266    /// submission time, before any WebSocket events for the order can arrive.
267    pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
268        self.order_identities.insert(client_order_id, identity);
269    }
270
271    /// Returns a clone of the identity for the given client order id, if any.
272    #[must_use]
273    pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
274        self.order_identities
275            .get(client_order_id)
276            .map(|r| r.clone())
277    }
278
279    /// Refreshes the tracked price for a modify ack when the new report
280    /// carries an updated price.
281    pub fn update_identity_price(&self, client_order_id: &ClientOrderId, price: Option<Price>) {
282        if let Some(price) = price
283            && let Some(mut entry) = self.order_identities.get_mut(client_order_id)
284        {
285            entry.price = Some(price);
286        }
287    }
288
289    /// Refreshes the tracked quantity for a modify ack.
290    pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
291        if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
292            entry.quantity = quantity;
293        }
294    }
295
296    /// Marks an `OrderAccepted` event as emitted for this order.
297    pub fn insert_accepted(&self, cid: ClientOrderId) {
298        self.evict_if_full(&self.emitted_accepted);
299        self.emitted_accepted.insert(cid);
300    }
301
302    /// Marks an order as having reached a terminal state.
303    ///
304    /// Returns `true` when this call claimed the terminal state, and `false`
305    /// when another path had already claimed it.
306    pub fn insert_filled(&self, cid: ClientOrderId) -> bool {
307        self.evict_if_full(&self.filled_orders);
308        self.filled_orders.insert(cid)
309    }
310
311    /// Atomically inserts a trade id into the dedup set.
312    ///
313    /// Returns `true` when the trade was already present (i.e. it is a
314    /// duplicate), `false` otherwise.
315    #[allow(
316        clippy::missing_panics_doc,
317        reason = "dedup mutex poisoning is not expected"
318    )]
319    pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
320        let mut set = self.emitted_trades.lock().expect(MUTEX_POISONED);
321        set.insert(trade_id)
322    }
323
324    /// Records a terminal raw Hyperliquid CLOID.
325    ///
326    /// Used when the post response rejects an order before the WebSocket
327    /// `orderUpdates` message. The normal CLOID mapping can be removed while a
328    /// late unresolved order update still gets suppressed instead of forwarded
329    /// as an external report.
330    #[allow(
331        clippy::missing_panics_doc,
332        reason = "terminal cloid mutex poisoning is not expected"
333    )]
334    pub fn insert_terminal_cloid(&self, cloid: Ustr) {
335        let mut set = self.terminal_cloids.lock().expect(MUTEX_POISONED);
336        set.insert(cloid);
337    }
338
339    /// Returns whether a raw Hyperliquid CLOID reached a terminal state through
340    /// the post response path.
341    #[allow(
342        clippy::missing_panics_doc,
343        reason = "terminal cloid mutex poisoning is not expected"
344    )]
345    #[must_use]
346    pub fn terminal_cloid_seen(&self, cloid: &Ustr) -> bool {
347        let set = self.terminal_cloids.lock().expect(MUTEX_POISONED);
348        set.contains(cloid)
349    }
350
351    /// Caches the venue order id observed for a tracked client order id.
352    pub fn record_venue_order_id(
353        &self,
354        client_order_id: ClientOrderId,
355        venue_order_id: VenueOrderId,
356    ) {
357        self.cached_venue_order_ids
358            .insert(client_order_id, venue_order_id);
359    }
360
361    /// Returns the previously cached venue order id, if any.
362    #[must_use]
363    pub fn cached_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
364        self.cached_venue_order_ids.get(client_order_id).map(|r| *r)
365    }
366
367    /// Marks an in-flight modify for cancel-before-accept suppression and
368    /// records the target absolute total qty for the cancel-replace promotion.
369    pub fn mark_pending_modify(
370        &self,
371        client_order_id: ClientOrderId,
372        old_venue_order_id: VenueOrderId,
373        target_qty: Quantity,
374    ) {
375        self.pending_modify_keys
376            .insert(client_order_id, old_venue_order_id);
377        self.pending_modify_target_qty
378            .insert(client_order_id, target_qty);
379    }
380
381    /// Clears the pending modify marker for a client order id.
382    pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
383        self.pending_modify_keys.remove(client_order_id);
384        self.pending_modify_target_qty.remove(client_order_id);
385        self.pending_modify_request.remove(client_order_id);
386    }
387
388    /// Stashes the exact venue request sent for an in-flight modify.
389    pub fn stash_modify_request(
390        &self,
391        client_order_id: ClientOrderId,
392        request: HyperliquidExecPlaceOrderRequest,
393    ) {
394        self.pending_modify_request.insert(client_order_id, request);
395    }
396
397    /// Returns a clone of the stashed in-flight modify request, if any.
398    #[must_use]
399    pub fn modify_request(
400        &self,
401        client_order_id: &ClientOrderId,
402    ) -> Option<HyperliquidExecPlaceOrderRequest> {
403        self.pending_modify_request
404            .get(client_order_id)
405            .map(|r| r.clone())
406    }
407
408    /// Queues a corrective reduce for the WebSocket consumer loop to post.
409    pub fn queue_corrective(
410        &self,
411        client_order_id: ClientOrderId,
412        oid: u64,
413        request: HyperliquidExecPlaceOrderRequest,
414    ) {
415        self.pending_corrective
416            .insert(client_order_id, (oid, request));
417    }
418
419    /// Removes and returns a queued corrective reduce, if any.
420    #[must_use]
421    pub fn take_corrective(
422        &self,
423        client_order_id: &ClientOrderId,
424    ) -> Option<(u64, HyperliquidExecPlaceOrderRequest)> {
425        self.pending_corrective
426            .remove(client_order_id)
427            .map(|(_, v)| v)
428    }
429
430    /// Returns the pending modify marker for a client order id, if any.
431    #[must_use]
432    pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
433        self.pending_modify_keys.get(client_order_id).map(|r| *r)
434    }
435
436    /// Returns the recorded target absolute total qty, if any.
437    #[must_use]
438    pub fn pending_modify_target_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
439        self.pending_modify_target_qty
440            .get(client_order_id)
441            .map(|r| *r)
442    }
443
444    /// Buffers a `FillReport` arrived during an in-flight cancel-replace.
445    pub fn buffer_fill(&self, client_order_id: ClientOrderId, fill: FillReport) {
446        self.buffered_fills
447            .entry(client_order_id)
448            .or_default()
449            .push(fill);
450    }
451
452    /// Removes and returns buffered fills for the cid, in arrival order.
453    #[must_use]
454    pub fn drain_buffered_fills(&self, client_order_id: &ClientOrderId) -> Vec<FillReport> {
455        self.buffered_fills
456            .remove(client_order_id)
457            .map(|(_, v)| v)
458            .unwrap_or_default()
459    }
460
461    /// Number of buffered fills for the cid.
462    #[must_use]
463    pub fn buffered_fill_count(&self, client_order_id: &ClientOrderId) -> usize {
464        self.buffered_fills
465            .get(client_order_id)
466            .map_or(0, |r| r.len())
467    }
468
469    /// Records cumulative filled quantity for a tracked order.
470    pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
471        self.order_filled_qty.insert(client_order_id, qty);
472    }
473
474    /// Returns the previously recorded cumulative filled quantity, if any.
475    #[must_use]
476    pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
477        self.order_filled_qty.get(client_order_id).map(|r| *r)
478    }
479
480    /// Removes all dispatch state for an order that has reached a terminal state.
481    ///
482    /// `filled_orders` is intentionally *not* cleared here: the marker is
483    /// used to suppress stale replays and must outlive the identity cleanup.
484    pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
485        self.order_identities.remove(client_order_id);
486        self.emitted_accepted.remove(client_order_id);
487        self.cached_venue_order_ids.remove(client_order_id);
488        self.pending_modify_keys.remove(client_order_id);
489        self.pending_modify_target_qty.remove(client_order_id);
490        self.pending_modify_request.remove(client_order_id);
491        self.pending_corrective.remove(client_order_id);
492        self.buffered_fills.remove(client_order_id);
493        self.order_filled_qty.remove(client_order_id);
494    }
495
496    fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
497        if set.len() >= DEDUP_CAPACITY
498            && self
499                .clearing
500                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
501                .is_ok()
502        {
503            set.clear();
504            self.clearing.store(false, Ordering::Release);
505        }
506    }
507}
508
/// Result of one dispatch call, telling the caller what to do with the
/// report it passed in.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DispatchOutcome {
    /// The report belonged to a tracked order. Typed events were emitted, or
    /// deliberately skipped (for example on a dedup hit). The caller must not
    /// forward the report as a fallback.
    Tracked,
    /// The report belongs to an external / untracked order. The caller should
    /// forward it via [`ExecutionEventEmitter::send_order_status_report`] or
    /// [`ExecutionEventEmitter::send_fill_report`] so the engine can
    /// reconcile.
    External,
    /// The report was identified as stale, such as the cancel leg of a
    /// cancel-replace modify or a replay after a terminal state. The caller
    /// must drop it and not forward it.
    Skip,
}
526
527/// Dispatches an [`OrderStatusReport`] using the two-tier routing contract.
528///
529/// Returns [`DispatchOutcome::Tracked`] when the report maps to a tracked
530/// order (typed events have been emitted or dedup hit), [`External`] when
531/// the caller should forward the report as an untracked fallback, or
532/// [`Skip`] when the report is a stale / race leg that must be dropped.
533///
534/// [`External`]: DispatchOutcome::External
535/// [`Skip`]: DispatchOutcome::Skip
536pub fn dispatch_order_event(
537    report: &OrderStatusReport,
538    state: &WsDispatchState,
539    emitter: &ExecutionEventEmitter,
540    ts_init: UnixNanos,
541) -> DispatchOutcome {
542    let Some(client_order_id) = report.client_order_id else {
543        return DispatchOutcome::External;
544    };
545
546    if state.filled_orders.contains(&client_order_id) {
547        log::debug!(
548            "Skipping stale report for filled order: cid={client_order_id}, status={:?}",
549            report.order_status,
550        );
551        return DispatchOutcome::Skip;
552    }
553
554    let client_order_id_str = client_order_id.as_str();
555    if client_order_id_str.starts_with("0x")
556        && state.terminal_cloid_seen(&Ustr::from(client_order_id_str))
557    {
558        log::debug!(
559            "Skipping stale terminal report for raw cloid: cid={client_order_id}, status={:?}",
560            report.order_status,
561        );
562        return DispatchOutcome::Skip;
563    }
564
565    let Some(identity) = state.lookup_identity(&client_order_id) else {
566        return DispatchOutcome::External;
567    };
568
569    match report.order_status {
570        OrderStatus::Accepted => {
571            handle_accepted(report, client_order_id, &identity, state, emitter, ts_init)
572        }
573        OrderStatus::Triggered => {
574            handle_triggered(report, client_order_id, &identity, state, emitter, ts_init)
575        }
576        OrderStatus::Canceled => {
577            handle_canceled(report, client_order_id, &identity, state, emitter, ts_init)
578        }
579        OrderStatus::Expired => {
580            handle_expired(report, client_order_id, &identity, state, emitter, ts_init)
581        }
582        OrderStatus::Rejected => {
583            handle_rejected(report, client_order_id, &identity, state, emitter, ts_init)
584        }
585        OrderStatus::Filled => handle_filled_marker(client_order_id, state),
586        OrderStatus::PartiallyFilled => {
587            // Fills come via `FillReport`; nothing to emit from the status path.
588            DispatchOutcome::Tracked
589        }
590        OrderStatus::PendingUpdate
591        | OrderStatus::PendingCancel
592        | OrderStatus::Submitted
593        | OrderStatus::Initialized
594        | OrderStatus::Denied
595        | OrderStatus::Released
596        | OrderStatus::Emulated => DispatchOutcome::Tracked,
597    }
598}
599
600/// Dispatches a [`FillReport`] using the two-tier routing contract.
601///
602/// Returns [`DispatchOutcome::Tracked`] when the fill has been emitted as
603/// an `OrderFilled` event (or skipped via trade dedup), [`External`] when
604/// the caller should forward the fill via
605/// [`ExecutionEventEmitter::send_fill_report`], or [`Skip`] when the fill
606/// is a replay for an already-terminal order and must be dropped.
607///
608/// [`External`]: DispatchOutcome::External
609/// [`Skip`]: DispatchOutcome::Skip
610pub fn dispatch_order_fill(
611    report: &FillReport,
612    state: &WsDispatchState,
613    emitter: &ExecutionEventEmitter,
614    ts_init: UnixNanos,
615) -> DispatchOutcome {
616    let Some(client_order_id) = report.client_order_id else {
617        return DispatchOutcome::External;
618    };
619
620    if state.filled_orders.contains(&client_order_id) {
621        log::debug!(
622            "Skipping stale fill for filled order: cid={client_order_id}, trade_id={}",
623            report.trade_id,
624        );
625        return DispatchOutcome::Skip;
626    }
627
628    let Some(identity) = state.lookup_identity(&client_order_id) else {
629        return DispatchOutcome::External;
630    };
631
632    // Buffer fills for an in-flight cancel-replace. Marker required so a
633    // stale old-leg fill after promotion falls through instead of being
634    // stranded; a delayed earlier-leg fill during a chained modify is a
635    // known limitation. See GH-3972.
636    if state.pending_modify(&client_order_id).is_some()
637        && let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
638        && report.venue_order_id != cached_voi
639    {
640        log::debug!(
641            "Buffering cancel-replace fill for {client_order_id}: \
642             report_voi={}, cached_voi={cached_voi}, trade_id={}",
643            report.venue_order_id,
644            report.trade_id,
645        );
646        state.buffer_fill(client_order_id, report.clone());
647        return DispatchOutcome::Tracked;
648    }
649
650    if state.check_and_insert_trade(report.trade_id) {
651        log::debug!(
652            "Skipping duplicate fill for {client_order_id}: trade_id={}",
653            report.trade_id
654        );
655        return DispatchOutcome::Tracked;
656    }
657
658    let previous = state
659        .previous_filled_qty(&client_order_id)
660        .unwrap_or_else(|| Quantity::zero(report.last_qty.precision));
661    let cumulative = previous + report.last_qty;
662
663    let is_terminal_fill = cumulative >= identity.quantity;
664    if is_terminal_fill && !claim_terminal_order(client_order_id, state, OrderStatus::Filled) {
665        return DispatchOutcome::Skip;
666    }
667
668    ensure_accepted_emitted(
669        client_order_id,
670        report.venue_order_id,
671        report.account_id,
672        &identity,
673        state,
674        emitter,
675        report.ts_event,
676        ts_init,
677    );
678
679    let filled = OrderFilled::new(
680        emitter.trader_id(),
681        identity.strategy_id,
682        identity.instrument_id,
683        client_order_id,
684        report.venue_order_id,
685        report.account_id,
686        report.trade_id,
687        identity.order_side,
688        identity.order_type,
689        report.last_qty,
690        report.last_px,
691        report.commission.currency,
692        report.liquidity_side,
693        UUID4::new(),
694        report.ts_event,
695        ts_init,
696        false,
697        report.venue_position_id,
698        Some(report.commission),
699    );
700    emitter.send_order_event(OrderEventAny::Filled(filled));
701
702    state.record_filled_qty(client_order_id, cumulative);
703
704    if is_terminal_fill {
705        state.cleanup_terminal(&client_order_id);
706    }
707
708    DispatchOutcome::Tracked
709}
710
711fn handle_accepted(
712    report: &OrderStatusReport,
713    client_order_id: ClientOrderId,
714    identity: &OrderIdentity,
715    state: &WsDispatchState,
716    emitter: &ExecutionEventEmitter,
717    ts_init: UnixNanos,
718) -> DispatchOutcome {
719    let venue_order_id = report.venue_order_id;
720    let ts_event = report.ts_last;
721    let account_id = report.account_id;
722
723    // Cancel-replace detection: if an earlier ACCEPTED cached a different
724    // venue_order_id under the same client_order_id, this ACCEPTED is the
725    // replacement leg of a Hyperliquid modify and must be promoted to
726    // OrderUpdated. See GH-3827.
727    if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
728        && cached_voi != venue_order_id
729    {
730        let price = report.price.or(identity.price);
731        let Some(price) = price else {
732            log::warn!(
733                "Cannot emit OrderUpdated for cancel-replace {client_order_id}: \
734                 no price on report and no cached price on identity",
735            );
736            return DispatchOutcome::Skip;
737        };
738
739        // Prefer user target over venue's remaining-only `report.quantity`;
740        // fall back when no marker (external modify).
741        let target_total_qty = state.pending_modify_target_qty(&client_order_id);
742        let updated_quantity = target_total_qty.unwrap_or(report.quantity);
743        let sent_request = state.modify_request(&client_order_id);
744
745        state.record_venue_order_id(client_order_id, venue_order_id);
746        state.update_identity_quantity(&client_order_id, updated_quantity);
747        state.update_identity_price(&client_order_id, Some(price));
748        state.clear_pending_modify(&client_order_id);
749
750        let updated = OrderUpdated::new(
751            emitter.trader_id(),
752            identity.strategy_id,
753            identity.instrument_id,
754            client_order_id,
755            updated_quantity,
756            UUID4::new(),
757            ts_event,
758            ts_init,
759            false,
760            Some(venue_order_id),
761            Some(account_id),
762            Some(price),
763            report.trigger_price,
764            None,
765            false,
766        );
767        emitter.send_order_event(OrderEventAny::Updated(updated));
768
769        // Drain buffered fills. Bypasses `handle_execution_report`;
770        // FIFO-bounded caches make any residue benign. See GH-3972.
771        let buffered = state.drain_buffered_fills(&client_order_id);
772        for fill in buffered {
773            dispatch_order_fill(&fill, state, emitter, ts_init);
774        }
775
776        // In-flight-fill overfill guard: reduce a replacement left oversized by
777        // a fill that raced the modify (mechanism in the integration guide).
778        // Not covered (engine overfill guard backstops both): reverse WS
779        // ordering, and filled reaching target (remaining zero).
780        if let (Some(target), Some(sent_request)) = (target_total_qty, sent_request)
781            && let Ok(new_oid) = venue_order_id.as_str().parse::<u64>()
782        {
783            let filled = state
784                .previous_filled_qty(&client_order_id)
785                .unwrap_or_else(|| Quantity::zero(target.precision));
786            if filled < target {
787                let remaining = (target - filled).as_decimal().normalize();
788                let sent_size = sent_request.size;
789                if sent_size > remaining {
790                    let mut corrective = sent_request;
791                    corrective.size = remaining;
792
793                    state.mark_pending_modify(client_order_id, venue_order_id, target);
794                    state.stash_modify_request(client_order_id, corrective.clone());
795                    state.queue_corrective(client_order_id, new_oid, corrective);
796
797                    log::info!(
798                        "Cancel-replace left {client_order_id} oversized on {venue_order_id} \
799                         (sent {sent_size}, remaining {remaining}); queuing corrective reduce",
800                    );
801                }
802            }
803        }
804
805        return DispatchOutcome::Tracked;
806    }
807
808    if state.emitted_accepted.contains(&client_order_id) {
809        // Repeat ACCEPTED for an already-accepted order. Nothing to emit;
810        // refresh the cached price so a subsequent cancel-replace without a
811        // report price can still recover an accurate value.
812        state.update_identity_price(&client_order_id, report.price);
813        return DispatchOutcome::Tracked;
814    }
815
816    state.insert_accepted(client_order_id);
817    state.record_venue_order_id(client_order_id, venue_order_id);
818    state.update_identity_price(&client_order_id, report.price);
819
820    let accepted = OrderAccepted::new(
821        emitter.trader_id(),
822        identity.strategy_id,
823        identity.instrument_id,
824        client_order_id,
825        venue_order_id,
826        account_id,
827        UUID4::new(),
828        ts_event,
829        ts_init,
830        false,
831    );
832    emitter.send_order_event(OrderEventAny::Accepted(accepted));
833    DispatchOutcome::Tracked
834}
835
836fn handle_triggered(
837    report: &OrderStatusReport,
838    client_order_id: ClientOrderId,
839    identity: &OrderIdentity,
840    state: &WsDispatchState,
841    emitter: &ExecutionEventEmitter,
842    ts_init: UnixNanos,
843) -> DispatchOutcome {
844    if !matches!(
845        identity.order_type,
846        OrderType::StopLimit | OrderType::TrailingStopLimit | OrderType::LimitIfTouched
847    ) {
848        log::debug!(
849            "Ignoring TRIGGERED status for non-triggerable order type {:?}: {client_order_id}",
850            identity.order_type,
851        );
852        return DispatchOutcome::Tracked;
853    }
854
855    ensure_accepted_emitted(
856        client_order_id,
857        report.venue_order_id,
858        report.account_id,
859        identity,
860        state,
861        emitter,
862        report.ts_last,
863        ts_init,
864    );
865
866    let triggered = OrderTriggered::new(
867        emitter.trader_id(),
868        identity.strategy_id,
869        identity.instrument_id,
870        client_order_id,
871        UUID4::new(),
872        report.ts_last,
873        ts_init,
874        false,
875        Some(report.venue_order_id),
876        Some(report.account_id),
877    );
878    emitter.send_order_event(OrderEventAny::Triggered(triggered));
879    DispatchOutcome::Tracked
880}
881
882fn handle_canceled(
883    report: &OrderStatusReport,
884    client_order_id: ClientOrderId,
885    identity: &OrderIdentity,
886    state: &WsDispatchState,
887    emitter: &ExecutionEventEmitter,
888    ts_init: UnixNanos,
889) -> DispatchOutcome {
890    let venue_order_id = report.venue_order_id;
891
892    // Stale cancel suppression: if the cached venue_order_id has already
893    // been advanced by a cancel-replace promotion, this CANCELED refers to
894    // the old leg and has already been handled as OrderUpdated. See GH-3827.
895    if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
896        && cached_voi != venue_order_id
897    {
898        log::debug!(
899            "Skipping stale CANCELED for {venue_order_id} (cached {cached_voi}) on {client_order_id}",
900        );
901        return DispatchOutcome::Skip;
902    }
903
904    // Cancel-before-accept race: an in-flight modify may deliver
905    // CANCELED(old_voi) before the replacement ACCEPTED(new_voi). The
906    // pending marker (set before the modify HTTP call and cleared on
907    // failure) lets us suppress the old leg so the later ACCEPTED can route
908    // through OrderUpdated. See GH-3827.
909    if let Some(pending_old) = state.pending_modify(&client_order_id)
910        && pending_old == venue_order_id
911    {
912        log::debug!(
913            "Skipping cancel-before-accept leg for {client_order_id}: venue_order_id={venue_order_id}",
914        );
915        return DispatchOutcome::Skip;
916    }
917
918    if !claim_terminal_order(client_order_id, state, report.order_status) {
919        return DispatchOutcome::Skip;
920    }
921
922    ensure_accepted_emitted(
923        client_order_id,
924        venue_order_id,
925        report.account_id,
926        identity,
927        state,
928        emitter,
929        report.ts_last,
930        ts_init,
931    );
932
933    let canceled = OrderCanceled::new(
934        emitter.trader_id(),
935        identity.strategy_id,
936        identity.instrument_id,
937        client_order_id,
938        UUID4::new(),
939        report.ts_last,
940        ts_init,
941        false,
942        Some(venue_order_id),
943        Some(report.account_id),
944    );
945    emitter.send_order_event(OrderEventAny::Canceled(canceled));
946
947    state.cleanup_terminal(&client_order_id);
948    DispatchOutcome::Tracked
949}
950
951fn handle_expired(
952    report: &OrderStatusReport,
953    client_order_id: ClientOrderId,
954    identity: &OrderIdentity,
955    state: &WsDispatchState,
956    emitter: &ExecutionEventEmitter,
957    ts_init: UnixNanos,
958) -> DispatchOutcome {
959    if !claim_terminal_order(client_order_id, state, report.order_status) {
960        return DispatchOutcome::Skip;
961    }
962
963    ensure_accepted_emitted(
964        client_order_id,
965        report.venue_order_id,
966        report.account_id,
967        identity,
968        state,
969        emitter,
970        report.ts_last,
971        ts_init,
972    );
973
974    let expired = OrderExpired::new(
975        emitter.trader_id(),
976        identity.strategy_id,
977        identity.instrument_id,
978        client_order_id,
979        UUID4::new(),
980        report.ts_last,
981        ts_init,
982        false,
983        Some(report.venue_order_id),
984        Some(report.account_id),
985    );
986    emitter.send_order_event(OrderEventAny::Expired(expired));
987    state.cleanup_terminal(&client_order_id);
988    DispatchOutcome::Tracked
989}
990
991fn handle_rejected(
992    report: &OrderStatusReport,
993    client_order_id: ClientOrderId,
994    identity: &OrderIdentity,
995    state: &WsDispatchState,
996    emitter: &ExecutionEventEmitter,
997    ts_init: UnixNanos,
998) -> DispatchOutcome {
999    if !claim_terminal_order(client_order_id, state, report.order_status) {
1000        return DispatchOutcome::Skip;
1001    }
1002
1003    let reason = report
1004        .cancel_reason
1005        .clone()
1006        .unwrap_or_else(|| "Order rejected by exchange".to_string());
1007    let rejected = OrderRejected::new(
1008        emitter.trader_id(),
1009        identity.strategy_id,
1010        identity.instrument_id,
1011        client_order_id,
1012        report.account_id,
1013        Ustr::from(&reason),
1014        UUID4::new(),
1015        report.ts_last,
1016        ts_init,
1017        false,
1018        false,
1019    );
1020    emitter.send_order_event(OrderEventAny::Rejected(rejected));
1021    state.cleanup_terminal(&client_order_id);
1022    DispatchOutcome::Tracked
1023}
1024
1025fn claim_terminal_order(
1026    client_order_id: ClientOrderId,
1027    state: &WsDispatchState,
1028    status: OrderStatus,
1029) -> bool {
1030    let claimed = state.insert_filled(client_order_id);
1031    if !claimed {
1032        log::debug!("Skipping duplicate terminal event for {client_order_id}: status={status:?}",);
1033    }
1034
1035    claimed
1036}
1037
1038fn handle_filled_marker(
1039    _client_order_id: ClientOrderId,
1040    _state: &WsDispatchState,
1041) -> DispatchOutcome {
1042    // A status-only `FILLED` marker does not carry fill data; the actual
1043    // `OrderFilled` is emitted from `dispatch_order_fill` when the matching
1044    // trade arrives. Do *not* set `filled_orders` here, otherwise the
1045    // follow-up fill would be classified as a stale replay and dropped
1046    // before the terminal `OrderFilled` event can be emitted. The fill
1047    // path installs the marker itself once the cumulative fill quantity
1048    // matches the tracked order quantity.
1049    DispatchOutcome::Tracked
1050}
1051
1052/// Synthesizes and emits an `OrderAccepted` event when one has not yet been
1053/// emitted for the given order.
1054///
1055/// Used before emitting non-Accepted events so strategies always observe the
1056/// canonical `Submitted -> Accepted -> ...` lifecycle even when the venue
1057/// compresses the placement and follow-up event into a single message (fast
1058/// fills).
1059#[allow(clippy::too_many_arguments)]
1060fn ensure_accepted_emitted(
1061    client_order_id: ClientOrderId,
1062    venue_order_id: VenueOrderId,
1063    account_id: AccountId,
1064    identity: &OrderIdentity,
1065    state: &WsDispatchState,
1066    emitter: &ExecutionEventEmitter,
1067    ts_event: UnixNanos,
1068    ts_init: UnixNanos,
1069) {
1070    if state.emitted_accepted.contains(&client_order_id) {
1071        return;
1072    }
1073    state.insert_accepted(client_order_id);
1074    state.record_venue_order_id(client_order_id, venue_order_id);
1075
1076    let accepted = OrderAccepted::new(
1077        emitter.trader_id(),
1078        identity.strategy_id,
1079        identity.instrument_id,
1080        client_order_id,
1081        venue_order_id,
1082        account_id,
1083        UUID4::new(),
1084        ts_event,
1085        ts_init,
1086        false,
1087    );
1088    emitter.send_order_event(OrderEventAny::Accepted(accepted));
1089}
1090
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId};
    use rstest::rstest;
    use rust_decimal::Decimal;

    use super::*;
    use crate::http::models::{
        HyperliquidExecLimitParams, HyperliquidExecOrderKind, HyperliquidExecTif,
    };

    /// Builds the order identity fixture shared by the tests: a limit buy
    /// with no cached price.
    fn make_identity() -> OrderIdentity {
        let strategy_id = StrategyId::from("S-001");
        let instrument_id = InstrumentId::from("BTC-USD-PERP.HYPERLIQUID");
        let quantity = Quantity::from("0.0001");

        OrderIdentity {
            strategy_id,
            instrument_id,
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            quantity,
            price: None,
        }
    }

    /// Builds the limit-order request fixture used by the corrective-state
    /// test.
    fn make_limit_request() -> HyperliquidExecPlaceOrderRequest {
        let kind = HyperliquidExecOrderKind::Limit {
            limit: HyperliquidExecLimitParams {
                tif: HyperliquidExecTif::Gtc,
            },
        };

        HyperliquidExecPlaceOrderRequest {
            asset: 0,
            is_buy: true,
            price: Decimal::from(100),
            size: Decimal::from(1),
            reduce_only: false,
            kind,
            cloid: None,
        }
    }

    #[rstest]
    fn test_register_and_lookup_identity() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-001");
        dispatch_state.register_identity(order_id, make_identity());

        let Some(identity) = dispatch_state.lookup_identity(&order_id) else {
            panic!("registered identity should be found");
        };
        assert_eq!(identity.strategy_id.as_str(), "S-001");
        assert_eq!(identity.order_side, OrderSide::Buy);
    }

    #[rstest]
    fn test_lookup_identity_missing_returns_none() {
        let dispatch_state = WsDispatchState::new();
        let unknown_id = ClientOrderId::new("not-tracked");

        let lookup = dispatch_state.lookup_identity(&unknown_id);
        assert!(lookup.is_none());
    }

    #[rstest]
    fn test_insert_accepted_dedup() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-002");
        assert!(!dispatch_state.emitted_accepted.contains(&order_id));

        // Inserting twice must leave the id present both times.
        for _ in 0..2 {
            dispatch_state.insert_accepted(order_id);
            assert!(dispatch_state.emitted_accepted.contains(&order_id));
        }
    }

    #[rstest]
    fn test_check_and_insert_trade_detects_duplicates() {
        let dispatch_state = WsDispatchState::new();
        let trade_id = TradeId::new("trade-1");

        let first_seen = dispatch_state.check_and_insert_trade(trade_id);
        assert!(!first_seen);

        let second_seen = dispatch_state.check_and_insert_trade(trade_id);
        assert!(second_seen);
    }

    #[rstest]
    fn test_bounded_dedup_fifo_eviction_preserves_recent_ids() {
        let mut dedup: BoundedDedup<TradeId> = BoundedDedup::new(3);

        // Fill to capacity; none of these are duplicates.
        for raw in ["t-0", "t-1", "t-2"] {
            assert!(!dedup.insert(TradeId::new(raw)));
        }
        assert_eq!(dedup.len(), 3);

        // One more insert overflows and evicts the oldest entry.
        assert!(!dedup.insert(TradeId::new("t-3")));
        assert_eq!(dedup.len(), 3);
        assert!(!dedup.contains(&TradeId::new("t-0")));
        assert!(dedup.contains(&TradeId::new("t-1")));
        assert!(dedup.contains(&TradeId::new("t-3")));
    }

    #[rstest]
    fn test_pending_modify_roundtrip() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-010");
        let venue_id = VenueOrderId::new("v-1");
        let target = Quantity::from("0.0001");

        // Nothing pending initially.
        assert!(dispatch_state.pending_modify(&order_id).is_none());
        assert!(dispatch_state.pending_modify_target_qty(&order_id).is_none());

        // Marking makes both the venue id and the target quantity visible.
        dispatch_state.mark_pending_modify(order_id, venue_id, target);
        assert_eq!(dispatch_state.pending_modify(&order_id), Some(venue_id));
        assert_eq!(
            dispatch_state.pending_modify_target_qty(&order_id),
            Some(target)
        );

        // Clearing removes both again.
        dispatch_state.clear_pending_modify(&order_id);
        assert!(dispatch_state.pending_modify(&order_id).is_none());
        assert!(dispatch_state.pending_modify_target_qty(&order_id).is_none());
    }

    #[rstest]
    fn test_cleanup_terminal_preserves_filled_marker() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-020");
        let venue_id = VenueOrderId::new("v-1");
        let target = Quantity::from("0.0001");

        dispatch_state.register_identity(order_id, make_identity());
        dispatch_state.insert_accepted(order_id);
        dispatch_state.mark_pending_modify(order_id, venue_id, target);
        dispatch_state.insert_filled(order_id);
        dispatch_state.cleanup_terminal(&order_id);

        assert!(dispatch_state.lookup_identity(&order_id).is_none());
        assert!(!dispatch_state.emitted_accepted.contains(&order_id));
        assert!(dispatch_state.pending_modify(&order_id).is_none());
        assert!(dispatch_state.pending_modify_target_qty(&order_id).is_none());
        // `filled_orders` outlives `cleanup_terminal` so replays stay suppressed.
        assert!(dispatch_state.filled_orders.contains(&order_id));
    }

    #[rstest]
    fn test_cleanup_terminal_clears_corrective_state() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-021");
        let request = make_limit_request();

        dispatch_state.mark_pending_modify(
            order_id,
            VenueOrderId::new("v-1"),
            Quantity::from("1"),
        );
        dispatch_state.stash_modify_request(order_id, request.clone());
        dispatch_state.queue_corrective(order_id, 1, request);
        assert!(dispatch_state.modify_request(&order_id).is_some());

        dispatch_state.cleanup_terminal(&order_id);

        assert!(dispatch_state.modify_request(&order_id).is_none());
        assert!(dispatch_state.take_corrective(&order_id).is_none());
        assert!(dispatch_state.pending_modify(&order_id).is_none());
    }
}