Skip to main content

nautilus_derive/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Derive adapter.
17//!
18//! Mirrors the Hyperliquid adapter's structural pattern: an
19//! [`ExecutionClientCore`] holds identity and connection state, an
20//! [`ExecutionEventEmitter`] publishes order/account events back to the live
21//! engine, and the venue clients ([`DeriveHttpClient`], [`DeriveWebSocketClient`])
22//! handle the wire. All state-changing requests are EIP-712 typed-data signed
23//! against the per-action module contracts on the Derive Chain; the
24//! `private/order` body in particular is built by [`order_to_derive_payload`].
25
26use std::{
27    sync::{
28        Arc, Mutex,
29        atomic::{AtomicBool, Ordering},
30    },
31    time::{Duration, Instant},
32};
33
34use ahash::AHashSet;
35use anyhow::Context;
36use async_trait::async_trait;
37use nautilus_common::{
38    clients::ExecutionClient,
39    live::{get_runtime, runner::get_exec_event_sender},
40    messages::execution::{
41        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
42        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
43        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
44    },
45};
46use nautilus_core::{
47    AtomicMap, MUTEX_POISONED, UUID4, UnixNanos,
48    time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
51use nautilus_model::{
52    accounts::AccountAny,
53    data::QuoteTick,
54    enums::{OmsType, OrderSide, OrderStatus, OrderType, PositionSideSpecified},
55    events::{
56        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
57    },
58    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Symbol, Venue, VenueOrderId},
59    instruments::InstrumentAny,
60    orders::Order,
61    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
62    types::{AccountBalance, Currency, MarginBalance, Price, Quantity},
63};
64use rust_decimal::Decimal;
65use tokio::task::JoinHandle;
66use tokio_util::sync::CancellationToken;
67use ustr::Ustr;
68
69use crate::{
70    common::{
71        consts::{
72            DERIVE_ACCOUNT_REGISTRATION_TIMEOUT_SECS, DERIVE_VENUE, MIN_SIGNATURE_TTL,
73            TRIGGER_ORDER_SIGNATURE_TTL,
74        },
75        credential::DeriveCredential,
76        enums::{DeriveInstrumentType, DeriveOrderSide},
77        parse::{derive_rejection_due_post_only, format_instrument_id, format_venue_symbol},
78        retry::{http_retry_config, is_write_outcome_ambiguous_ws},
79    },
80    config::DeriveExecClientConfig,
81    http::{
82        DeriveCredentials, DeriveHttpClient,
83        models::{DeriveInstrument, DeriveOrder, DeriveTrade},
84        parse::{
85            parse_derive_order_to_report, parse_derive_position_to_report,
86            parse_derive_subaccount_to_balances, parse_derive_trade_to_fill_report,
87        },
88        query::{
89            DeriveCancelAllParams, DeriveCancelParams, DeriveCancelTriggerOrderParams,
90            DeriveGetOpenOrdersParams, DeriveGetOrderHistoryParams, DeriveGetOrderParams,
91            DeriveGetPositionsParams, DeriveGetSubaccountParams, DeriveGetTradeHistoryParams,
92            DeriveGetTriggerOrdersParams, order_replace_to_derive_payload, order_to_derive_payload,
93            trigger_order_to_derive_payload,
94        },
95    },
96    signing::{
97        context::{SigningContext, resolve_signing_context},
98        nonce::NonceManager,
99    },
100    websocket::{
101        DeriveOrdersSubscriptionData, DeriveTradesSubscriptionData, DeriveWebSocketClient,
102        DeriveWsChannel, DeriveWsCredentials, DeriveWsError, DeriveWsExecutionHandle,
103        DeriveWsMessage, OrderIdentity, WsDispatchState, parse::parse_ticker_quote_from_rest,
104    },
105};
106
/// Page size passed to the paginated private history requests issued by this
/// client (`DeriveGetOrderHistoryParams` and `DeriveGetTradeHistoryParams`).
const DERIVE_PRIVATE_PAGE_SIZE: u32 = 500;
108
/// Live execution client for Derive.
///
/// Owns the HTTP and WebSocket clients used to talk to the venue plus an
/// [`ExecutionEventEmitter`] that publishes order/account events back to the
/// live engine. Order operations are signed against the per-environment
/// EIP-712 signing context resolved at construction.
#[derive(Debug)]
pub struct DeriveExecutionClient {
    /// Shared execution-client core: identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Realtime atomic clock used to timestamp emitted events and reports.
    clock: &'static AtomicTime,
    /// Adapter configuration this client was constructed with.
    config: DeriveExecClientConfig,
    /// Credential (wallet address, session key, subaccount id) resolved in `new`.
    credential: DeriveCredential,
    /// Publishes account/order events; its sender is attached in `start`.
    emitter: ExecutionEventEmitter,
    /// REST client used for account snapshots and report generation.
    http_client: DeriveHttpClient,
    /// WebSocket client carrying the private order/trade/balance channels.
    ws_client: DeriveWebSocketClient,
    /// Execution handle obtained from `ws_client.execution_handle()` in `new`.
    ws_exec: DeriveWsExecutionHandle,
    /// Venue instrument records keyed by instrument ID (see `cache_instrument`).
    instruments: Arc<AtomicMap<InstrumentId, DeriveInstrument>>,
    /// Nonce source for signed requests.
    // NOTE(review): not exercised in the portion of the file reviewed here —
    // confirm usage against the order submission path.
    nonce_manager: Arc<NonceManager>,
    /// Signing context produced by `resolve_signing_context` in `new`.
    signing: SigningContext,
    /// Connection flag set by `connect` and cleared by `stop`/`disconnect`.
    is_connected: AtomicBool,
    /// Token cancelled on stop/disconnect/teardown to end the WS dispatch task;
    /// replaced with a fresh token by `connect` when already cancelled.
    cancellation_token: CancellationToken,
    /// Handles of tasks started through `spawn_task`, kept so they can be aborted.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the WS dispatch task started by `start_ws_dispatch`.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
    /// State shared with the WS dispatch task; also consulted by
    /// `generate_fill_reports` to drop trades that were already seen.
    dispatch_state: Arc<WsDispatchState>,
}
134
135impl DeriveExecutionClient {
136    /// Creates a new [`DeriveExecutionClient`].
137    ///
138    /// Resolves wallet/session-key/subaccount from the supplied config, falling
139    /// back to the documented environment variables when fields are unset, and
140    /// parses the EIP-712 signing constants (domain separator, action typehash,
141    /// trade-module address) from config overrides or the shipped per-environment
142    /// defaults.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error when:
147    /// - Required credentials are not provided via config or environment.
148    /// - Signing constants are still placeholders or cannot be parsed as hex.
149    /// - The HTTP or WebSocket client cannot be constructed.
150    pub fn new(core: ExecutionClientCore, config: DeriveExecClientConfig) -> anyhow::Result<Self> {
151        let credential = DeriveCredential::resolve(
152            config.wallet_address.clone(),
153            config.session_key.clone(),
154            config.subaccount_id,
155            config.environment,
156        )?;
157
158        let http_credentials = DeriveCredentials::new(
159            credential.wallet_address().to_string(),
160            credential.session_key(),
161        )
162        .context("failed to build Derive HTTP credentials")?;
163        let retry_config = http_retry_config(
164            config.max_retries,
165            config.retry_delay_initial_ms,
166            config.retry_delay_max_ms,
167        );
168        let http_client = DeriveHttpClient::with_credentials(
169            config.rest_url(),
170            http_credentials,
171            Some(config.http_timeout_secs),
172            config.proxy_url.clone(),
173            Some(retry_config),
174        )
175        .context("failed to create Derive HTTP client")?;
176
177        let ws_credentials = DeriveWsCredentials::new(
178            credential.wallet_address().to_string(),
179            credential.session_key(),
180        )
181        .context("failed to build Derive WebSocket credentials")?;
182        let ws_client = DeriveWebSocketClient::with_credentials(
183            Some(config.ws_url()),
184            config.environment,
185            config.transport_backend,
186            config.proxy_url.clone(),
187            ws_credentials,
188            config.max_matching_requests_per_second,
189        );
190        // The handle shares the client's command channel, which survives the
191        // reconnect swap, so it stays valid for the client's lifetime.
192        let ws_exec = ws_client.execution_handle();
193
194        let signing = resolve_signing_context(&credential, &config)?;
195
196        let clock = get_atomic_clock_realtime();
197        let emitter = ExecutionEventEmitter::new(
198            clock,
199            core.trader_id,
200            core.account_id,
201            core.account_type,
202            core.base_currency,
203        );
204
205        Ok(Self {
206            core,
207            clock,
208            config,
209            credential,
210            emitter,
211            http_client,
212            ws_client,
213            ws_exec,
214            instruments: Arc::new(AtomicMap::new()),
215            nonce_manager: Arc::new(NonceManager::new()),
216            signing,
217            is_connected: AtomicBool::new(false),
218            cancellation_token: CancellationToken::new(),
219            pending_tasks: Mutex::new(Vec::new()),
220            ws_stream_handle: Mutex::new(None),
221            dispatch_state: Arc::new(WsDispatchState::new()),
222        })
223    }
224
    /// Returns the subaccount id carried by the credential resolved in `new`.
    #[must_use]
    pub const fn subaccount_id(&self) -> u64 {
        self.credential.subaccount_id()
    }
230
    /// Returns a shared reference to the configuration this client was built with.
    #[must_use]
    pub fn config(&self) -> &DeriveExecClientConfig {
        &self.config
    }
236
    /// Returns a shared reference to the REST client owned by this execution client.
    #[must_use]
    pub fn http_client(&self) -> &DeriveHttpClient {
        &self.http_client
    }
242
243    /// Caches a Derive instrument by instrument ID so order submission can
244    /// resolve `base_asset_address` and `base_asset_sub_id` without
245    /// re-querying the venue.
246    pub fn cache_instrument(&self, instrument: DeriveInstrument) {
247        let instrument_id = format_instrument_id(instrument.instrument_name);
248        self.instruments.insert(instrument_id, instrument);
249    }
250
251    /// Spawns a fire-and-forget task tracked in `pending_tasks` for teardown.
252    fn spawn_task<F>(&self, description: &'static str, fut: F)
253    where
254        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
255    {
256        let runtime = get_runtime();
257        let handle = runtime.spawn(async move {
258            if let Err(e) = fut.await {
259                log::warn!("{description} failed: {e:?}");
260            }
261        });
262
263        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
264        tasks.retain(|handle| !handle.is_finished());
265        tasks.push(handle);
266    }
267
268    fn abort_pending_tasks(&self) {
269        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
270        for handle in tasks.drain(..) {
271            handle.abort();
272        }
273    }
274
275    async fn ensure_instruments_initialized(&self) -> anyhow::Result<()> {
276        if self.core.instruments_initialized() {
277            return Ok(());
278        }
279        // Lazy bootstrap: exec-side fetches per-instrument on first reference.
280        // Marking the flag prevents duplicate work across reconnect cycles.
281        self.core.set_instruments_initialized();
282        Ok(())
283    }
284
285    async fn refresh_account_state(&self) -> anyhow::Result<()> {
286        let value = self
287            .http_client
288            .get_subaccount(&DeriveGetSubaccountParams::new(
289                self.credential.subaccount_id(),
290            ))
291            .await
292            .context("failed to fetch Derive subaccount snapshot")?;
293        let (balances, margins) = parse_derive_subaccount_to_balances(&value)
294            .context("failed to parse Derive subaccount balances")?;
295        let ts_event = self.clock.get_time_ns();
296        self.emitter
297            .emit_account_state(balances, margins, true, ts_event);
298        Ok(())
299    }
300
301    /// Blocks until the account appears in the cache, or `timeout_secs` elapses.
302    ///
303    /// The execution engine populates the cache from the [`refresh_account_state`]
304    /// event asynchronously; strategies that begin issuing orders before the
305    /// account is registered race the portfolio. Connecting blocks here so the
306    /// runner can rely on `core.cache().account(account_id)` immediately after
307    /// `connect()` returns.
308    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
309        let account_id = self.core.account_id;
310
311        if self.core.cache().account(&account_id).is_some() {
312            log::info!("Account {account_id} registered");
313            return Ok(());
314        }
315
316        let start = Instant::now();
317        let timeout = Duration::from_secs_f64(timeout_secs);
318        let interval = Duration::from_millis(10);
319
320        loop {
321            tokio::time::sleep(interval).await;
322
323            if self.core.cache().account(&account_id).is_some() {
324                log::info!("Account {account_id} registered");
325                return Ok(());
326            }
327
328            if start.elapsed() >= timeout {
329                anyhow::bail!(
330                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
331                );
332            }
333        }
334    }
335
336    /// Reverses the partial state `connect()` set up before the failing step:
337    /// cancels the shared cancellation token, aborts the WS dispatch task,
338    /// and closes the WS client. Used when initial account state cannot be
339    /// loaded so that the next `connect()` call starts from a clean slate.
340    async fn teardown_partial_connect(&mut self) {
341        self.cancellation_token.cancel();
342
343        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
344            handle.abort();
345        }
346
347        if let Err(e) = self.ws_client.disconnect().await {
348            log::warn!("Error tearing down Derive WebSocket after connect failure: {e}");
349        }
350        self.abort_pending_tasks();
351    }
352
353    fn start_ws_dispatch(&self, rx: tokio::sync::mpsc::UnboundedReceiver<DeriveWsMessage>) {
354        let emitter = self.emitter.clone();
355        let account_id = self.core.account_id;
356        let clock = self.clock;
357        let cancellation = self.cancellation_token.clone();
358        let dispatch_state = self.dispatch_state.clone();
359
360        let handle = get_runtime().spawn(async move {
361            let mut rx = rx;
362
363            loop {
364                tokio::select! {
365                    biased;
366                    () = cancellation.cancelled() => break,
367                    maybe = rx.recv() => {
368                        match maybe {
369                            Some(message) => handle_ws_message(
370                                message,
371                                &emitter,
372                                account_id,
373                                clock,
374                                &dispatch_state,
375                            ),
376                            None => break,
377                        }
378                    }
379                }
380            }
381        });
382        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
383    }
384}
385
386#[async_trait(?Send)]
387impl ExecutionClient for DeriveExecutionClient {
    /// Returns the client's own connection flag, which `connect` sets and
    /// `stop`/`disconnect` clear.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
391
    /// Returns the client id held by the execution core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
395
    /// Returns the account id held by the execution core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
399
    /// Returns the Derive venue constant (`DERIVE_VENUE`).
    fn venue(&self) -> Venue {
        *DERIVE_VENUE
    }
403
    /// Returns the order management system type held by the execution core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
407
    /// Looks up this client's account in the core cache and returns it by
    /// value, or `None` when the cache has no entry for the account id.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account_owned(&self.core.account_id)
    }
411
412    fn start(&mut self) -> anyhow::Result<()> {
413        if self.core.is_started() {
414            return Ok(());
415        }
416
417        let sender = get_exec_event_sender();
418        self.emitter.set_sender(sender);
419        self.core.set_started();
420
421        log::info!(
422            "Started: client_id={}, account_id={}, subaccount_id={}, environment={:?}, proxy_url={:?}",
423            self.core.client_id,
424            self.core.account_id,
425            self.credential.subaccount_id(),
426            self.config.environment,
427            self.config.proxy_url,
428        );
429        Ok(())
430    }
431
432    fn stop(&mut self) -> anyhow::Result<()> {
433        if self.core.is_stopped() {
434            return Ok(());
435        }
436
437        log::info!("Stopping Derive execution client");
438
439        self.cancellation_token.cancel();
440
441        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
442            handle.abort();
443        }
444        self.abort_pending_tasks();
445
446        self.core.set_disconnected();
447        self.core.set_stopped();
448        self.is_connected.store(false, Ordering::Release);
449
450        log::info!("Derive execution client stopped");
451        Ok(())
452    }
453
454    async fn connect(&mut self) -> anyhow::Result<()> {
455        if self.is_connected() {
456            return Ok(());
457        }
458
459        log::info!("Connecting Derive execution client");
460
461        if self.cancellation_token.is_cancelled() {
462            self.cancellation_token = CancellationToken::new();
463        }
464
465        self.ensure_instruments_initialized()
466            .await
467            .context("failed to initialize Derive instruments")?;
468
469        self.ws_client
470            .connect()
471            .await
472            .context("failed to connect Derive WebSocket")?;
473        let rx = self
474            .ws_client
475            .take_event_receiver()
476            .context("Derive execution WS event receiver not initialized")?;
477
478        let subaccount_id = self.credential.subaccount_id();
479        let channels = vec![
480            DeriveWsChannel::orders(subaccount_id),
481            DeriveWsChannel::private_trades(subaccount_id),
482            DeriveWsChannel::balances(subaccount_id),
483        ];
484
485        if let Err(e) = self.ws_client.subscribe_channels(channels).await {
486            log::warn!("Derive private WS subscriptions failed: {e}");
487        }
488
489        self.start_ws_dispatch(rx);
490
491        // Fail-fast if the initial account snapshot cannot load: without it,
492        // `await_account_registered` would block the full timeout window and
493        // surface a misleading registration timeout. Tear down the WS we
494        // already started so the caller does not leak the dispatch task.
495        if let Err(e) = self.refresh_account_state().await {
496            log::warn!("Initial Derive account state refresh failed: {e}; tearing down");
497            self.teardown_partial_connect().await;
498            return Err(e.context("failed initial Derive account state refresh"));
499        }
500
501        if let Err(e) = self
502            .await_account_registered(DERIVE_ACCOUNT_REGISTRATION_TIMEOUT_SECS)
503            .await
504        {
505            log::warn!("Derive account did not register in time: {e}; tearing down");
506            self.teardown_partial_connect().await;
507            return Err(e.context("failed waiting for Derive account registration"));
508        }
509
510        self.core.set_connected();
511        self.is_connected.store(true, Ordering::Release);
512        log::info!(
513            "Connected Derive execution client ({:?})",
514            self.config.environment
515        );
516        Ok(())
517    }
518
519    async fn disconnect(&mut self) -> anyhow::Result<()> {
520        if !self.is_connected() {
521            return Ok(());
522        }
523
524        log::info!("Disconnecting Derive execution client");
525        self.cancellation_token.cancel();
526
527        if let Err(e) = self.ws_client.disconnect().await {
528            log::warn!("Error while disconnecting Derive execution WebSocket: {e}");
529        }
530
531        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
532            handle.abort();
533        }
534        self.abort_pending_tasks();
535
536        self.core.set_disconnected();
537        self.is_connected.store(false, Ordering::Release);
538        log::info!("Derive execution client disconnected");
539        Ok(())
540    }
541
    /// Emits an account state event built from the supplied balances and
    /// margins through the event emitter.
    ///
    /// `reported` and `ts_event` are passed to the emitter unchanged. Always
    /// returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
553
    /// Intentionally ignores instruments published on the bus; the local
    /// instrument cache is filled through `cache_instrument` instead.
    fn on_instrument(&mut self, _instrument: InstrumentAny) {
        // The exec-side instrument cache holds `DeriveInstrument` records so
        // signing can pull `base_asset_address` / `base_asset_sub_id`; the
        // generic `InstrumentAny` shape published on the bus does not carry
        // those, so the data client populates the cache via
        // [`Self::cache_instrument`] from its bootstrap pass instead.
    }
561
562    async fn generate_order_status_report(
563        &self,
564        cmd: &GenerateOrderStatusReport,
565    ) -> anyhow::Result<Option<OrderStatusReport>> {
566        if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
567            log::warn!(
568                "Derive generate_order_status_report requires venue_order_id or client_order_id"
569            );
570            return Ok(None);
571        }
572
573        let subaccount_id = self.credential.subaccount_id();
574        let order = if let Some(venue_order_id) = cmd.venue_order_id {
575            match self
576                .http_client
577                .get_order(&DeriveGetOrderParams::new(
578                    subaccount_id,
579                    venue_order_id.as_str(),
580                ))
581                .await
582            {
583                Ok(order) => Some(order),
584                Err(e) => {
585                    let trigger_orders = self
586                        .http_client
587                        .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
588                        .await?
589                        .orders;
590
591                    match trigger_orders
592                        .into_iter()
593                        .find(|o| o.order_id.as_str() == venue_order_id.as_str())
594                    {
595                        Some(order) => Some(order),
596                        None => return Err(e.into()),
597                    }
598                }
599            }
600        } else {
601            // Derive has no by-label lookup endpoint; scan open orders first,
602            // then trigger orders, then fall through to paginated history so
603            // terminal orders resolve for reconcilers that only carry the
604            // client_order_id.
605            let label = cmd.client_order_id.expect("guarded above");
606            let open_orders = self
607                .http_client
608                .get_open_orders(&DeriveGetOpenOrdersParams::new(subaccount_id))
609                .await?
610                .orders;
611            let mut found = open_orders
612                .into_iter()
613                .find(|o| o.label.as_str() == label.as_str());
614
615            if found.is_none() {
616                let trigger_orders = self
617                    .http_client
618                    .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
619                    .await?
620                    .orders;
621                found = trigger_orders
622                    .into_iter()
623                    .find(|o| o.label.as_str() == label.as_str());
624            }
625
626            if found.is_none() {
627                let instrument_name = cmd.instrument_id.map(|id| id.symbol.as_str().to_string());
628                let mut page: u32 = 1;
629
630                'history: loop {
631                    let mut params = DeriveGetOrderHistoryParams::new(
632                        subaccount_id,
633                        page,
634                        DERIVE_PRIVATE_PAGE_SIZE,
635                    );
636
637                    if let Some(name) = instrument_name.as_deref() {
638                        params = params.with_instrument_name(name);
639                    }
640
641                    let result = self.http_client.get_order_history(&params).await?;
642                    let total_pages = result.pagination.num_pages;
643
644                    for order in result.orders {
645                        if order.label.as_str() == label.as_str() {
646                            found = Some(order);
647                            break 'history;
648                        }
649                    }
650
651                    if (page as i64) >= total_pages || total_pages == 0 {
652                        break;
653                    }
654                    page += 1;
655                }
656            }
657            found
658        };
659
660        let Some(order) = order else {
661            return Ok(None);
662        };
663
664        if let Some(instrument_id) = cmd.instrument_id
665            && InstrumentId::new(Symbol::new(order.instrument_name.as_str()), *DERIVE_VENUE)
666                != instrument_id
667        {
668            log::warn!(
669                "Derive order {} is for {} but report requested {}",
670                order.order_id,
671                order.instrument_name.as_str(),
672                instrument_id,
673            );
674            return Ok(None);
675        }
676
677        let ts_init = self.clock.get_time_ns();
678        let mut report = parse_derive_order_to_report(&order, self.core.account_id, ts_init)?;
679        // Prefer the parsed label (the venue's source of truth); only stamp
680        // the cmd's id when the venue order has no label at all.
681        if report.client_order_id.is_none()
682            && let Some(client_order_id) = cmd.client_order_id
683        {
684            report = report.with_client_order_id(client_order_id);
685        }
686        Ok(Some(report))
687    }
688
689    async fn generate_order_status_reports(
690        &self,
691        cmd: &GenerateOrderStatusReports,
692    ) -> anyhow::Result<Vec<OrderStatusReport>> {
693        let subaccount_id = self.credential.subaccount_id();
694        let instrument_name = cmd.instrument_id.map(|id| id.symbol.as_str().to_string());
695
696        // open_only routes to private/get_open_orders and
697        // private/get_trigger_orders regardless of window; the venue
698        // endpoints have no time bound but the caller's start/end is applied
699        // below. For full history we walk private/get_order_history pages,
700        // scoped to the optional window.
701        let orders: Vec<DeriveOrder> = if cmd.open_only {
702            let mut orders = self
703                .http_client
704                .get_open_orders(&DeriveGetOpenOrdersParams::new(subaccount_id))
705                .await?
706                .orders;
707            orders.extend(
708                self.http_client
709                    .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
710                    .await?
711                    .orders,
712            );
713            orders
714        } else {
715            let start_ms = cmd.start.map(|t| t.as_millis() as i64);
716            let end_ms = cmd.end.map(|t| t.as_millis() as i64);
717            let mut page: u32 = 1;
718            let mut collected: Vec<DeriveOrder> = Vec::new();
719
720            loop {
721                let mut params =
722                    DeriveGetOrderHistoryParams::new(subaccount_id, page, DERIVE_PRIVATE_PAGE_SIZE)
723                        .with_window(start_ms, end_ms);
724
725                if let Some(name) = instrument_name.as_deref() {
726                    params = params.with_instrument_name(name);
727                }
728
729                let result = self.http_client.get_order_history(&params).await?;
730                let total_pages = result.pagination.num_pages;
731                collected.extend(result.orders);
732
733                if (page as i64) >= total_pages || total_pages == 0 {
734                    break;
735                }
736                page += 1;
737            }
738            collected
739        };
740
741        let ts_init = self.clock.get_time_ns();
742        let start_ms = cmd.start.map(|t| t.as_millis() as i64);
743        let end_ms = cmd.end.map(|t| t.as_millis() as i64);
744        let mut reports = Vec::with_capacity(orders.len());
745        for order in orders {
746            if let Some(instrument_id) = cmd.instrument_id
747                && InstrumentId::new(Symbol::new(order.instrument_name.as_str()), *DERIVE_VENUE)
748                    != instrument_id
749            {
750                continue;
751            }
752            // open_only routed via private/get_open_orders ignores time bounds
753            // at the venue level; apply the command's window here so callers
754            // asking for "open orders since X" get exactly that.
755            if let Some(start) = start_ms
756                && order.last_update_timestamp < start
757            {
758                continue;
759            }
760
761            if let Some(end) = end_ms
762                && order.last_update_timestamp > end
763            {
764                continue;
765            }
766
767            match parse_derive_order_to_report(&order, self.core.account_id, ts_init) {
768                Ok(report) => reports.push(report),
769                Err(e) => log::warn!("Skipping order in status report: {e}"),
770            }
771        }
772        Ok(reports)
773    }
774
775    async fn generate_fill_reports(
776        &self,
777        cmd: GenerateFillReports,
778    ) -> anyhow::Result<Vec<FillReport>> {
779        let instrument_name = cmd.instrument_id.map(|id| id.symbol.as_str().to_string());
780        let mut page: u32 = 1;
781        let mut all_trades: Vec<DeriveTrade> = Vec::new();
782
783        loop {
784            let mut params = DeriveGetTradeHistoryParams::new(
785                self.credential.subaccount_id(),
786                page,
787                DERIVE_PRIVATE_PAGE_SIZE,
788            )
789            .with_window(
790                cmd.start.map(|t| t.as_millis() as i64),
791                cmd.end.map(|t| t.as_millis() as i64),
792            );
793
794            if let Some(name) = instrument_name.as_deref() {
795                params = params.with_instrument_name(name);
796            }
797
798            let result = self.http_client.get_private_trade_history(&params).await?;
799            let total_pages = result.pagination.num_pages;
800            all_trades.extend(result.trades);
801
802            if (page as i64) >= total_pages || total_pages == 0 {
803                break;
804            }
805            page += 1;
806        }
807
808        let ts_init = self.clock.get_time_ns();
809        let fee_currency = Currency::USDC();
810        let venue_order_id_filter = cmd
811            .venue_order_id
812            .as_ref()
813            .map(|id| id.as_str().to_string());
814        let mut reports = Vec::with_capacity(all_trades.len());
815        for trade in all_trades {
816            if let Some(target) = venue_order_id_filter.as_deref()
817                && trade.order_id != target
818            {
819                continue;
820            }
821
822            match parse_derive_trade_to_fill_report(
823                &trade,
824                self.core.account_id,
825                fee_currency,
826                ts_init,
827            ) {
828                Ok(Some(report)) => {
829                    // Cross-source dedup against the WS dispatch path: if the
830                    // live stream already emitted this trade, the reconciler
831                    // should not see it again.
832                    if self.dispatch_state.check_and_insert_trade(report.trade_id) {
833                        log::debug!(
834                            "Skipping duplicate Derive fill (trade_id={}) in generate_fill_reports",
835                            report.trade_id,
836                        );
837                        continue;
838                    }
839                    reports.push(report);
840                }
841                Ok(None) => {}
842                Err(e) => log::warn!("Skipping trade in fill report: {e}"),
843            }
844        }
845        Ok(reports)
846    }
847
848    async fn generate_position_status_reports(
849        &self,
850        cmd: &GeneratePositionStatusReports,
851    ) -> anyhow::Result<Vec<PositionStatusReport>> {
852        let positions = self
853            .http_client
854            .get_positions(&DeriveGetPositionsParams::new(
855                self.credential.subaccount_id(),
856            ))
857            .await?
858            .positions;
859        let ts_init = self.clock.get_time_ns();
860        let mut reports = Vec::with_capacity(positions.len());
861        for position in positions {
862            if let Some(target) = cmd.instrument_id
863                && InstrumentId::new(
864                    Symbol::new(position.instrument_name.as_str()),
865                    *DERIVE_VENUE,
866                ) != target
867            {
868                continue;
869            }
870
871            match parse_derive_position_to_report(&position, self.core.account_id, ts_init) {
872                Ok(report) => reports.push(report),
873                Err(e) => log::warn!("Skipping position in status report: {e}"),
874            }
875        }
876        Ok(reports)
877    }
878
879    async fn generate_mass_status(
880        &self,
881        lookback_mins: Option<u64>,
882    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
883        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
884
885        let ts_now = self.clock.get_time_ns();
886        let start = lookback_mins.map(|mins| {
887            let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
888            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
889        });
890
891        let open_order_cmd = GenerateOrderStatusReports::new(
892            UUID4::new(),
893            ts_now,
894            true,
895            None,
896            None,
897            None,
898            None,
899            None,
900        );
901        let history_order_cmd = GenerateOrderStatusReports::new(
902            UUID4::new(),
903            ts_now,
904            false,
905            None,
906            start,
907            None,
908            None,
909            None,
910        );
911        let fill_cmd =
912            GenerateFillReports::new(UUID4::new(), ts_now, None, None, start, None, None, None);
913        let position_cmd =
914            GeneratePositionStatusReports::new(UUID4::new(), ts_now, None, None, None, None, None);
915
916        let (history_order_reports, open_order_reports, fill_reports, position_reports) = tokio::try_join!(
917            self.generate_order_status_reports(&history_order_cmd),
918            self.generate_order_status_reports(&open_order_cmd),
919            self.generate_fill_reports(fill_cmd),
920            self.generate_position_status_reports(&position_cmd),
921        )?;
922
923        log::info!(
924            "Received {} historical OrderStatusReports",
925            history_order_reports.len()
926        );
927        log::info!(
928            "Received {} open OrderStatusReports",
929            open_order_reports.len()
930        );
931        log::info!("Received {} FillReports", fill_reports.len());
932        log::info!("Received {} PositionReports", position_reports.len());
933
934        let mut touched_instruments = AHashSet::new();
935
936        for report in history_order_reports
937            .iter()
938            .chain(open_order_reports.iter())
939        {
940            touched_instruments.insert(report.instrument_id);
941        }
942
943        for report in &fill_reports {
944            touched_instruments.insert(report.instrument_id);
945        }
946
947        let mut mass_status = ExecutionMassStatus::new(
948            self.core.client_id,
949            self.core.account_id,
950            *DERIVE_VENUE,
951            ts_now,
952            None,
953        );
954
955        mass_status.add_order_reports(history_order_reports);
956        mass_status.add_order_reports(open_order_reports);
957        mass_status.add_fill_reports(fill_reports);
958        mass_status.add_position_reports(position_reports);
959        add_missing_flat_position_reports(
960            &mut mass_status,
961            self.core.account_id,
962            touched_instruments,
963            ts_now,
964        );
965
966        Ok(Some(mass_status))
967    }
968
969    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
970        let order = self
971            .core
972            .cache()
973            .order(&cmd.client_order_id)
974            .map(|o| o.clone())
975            .ok_or_else(|| {
976                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
977            })?;
978
979        if order.is_closed() {
980            log::warn!("Cannot submit closed order {}", order.client_order_id());
981            return Ok(());
982        }
983
984        // Spot has no position to reduce; the venue rejects reduce-only
985        // unconditionally (11025), so deny locally. Perp/option reduce-only is
986        // position-conditional and must still reach the venue.
987        if order.is_reduce_only()
988            && matches!(
989                self.core.cache().instrument(&cmd.instrument_id),
990                Some(InstrumentAny::CurrencyPair(_))
991            )
992        {
993            let reason = format!(
994                "reduce-only is not supported for spot instrument {}; Derive spot has no position to reduce",
995                cmd.instrument_id,
996            );
997            log::warn!("{reason}");
998            self.emitter.emit_order_denied(&order, &reason);
999            return Ok(());
1000        }
1001
1002        // Keep the existing OrderDenied path here, then refresh before signing
1003        let is_trigger_order = is_derive_trigger_order_type(order.order_type());
1004        let market_quote = if order.order_type() == OrderType::Market {
1005            match self.core.cache().quote(&cmd.instrument_id) {
1006                Some(_) => Some(()),
1007                None => {
1008                    let reason = format!(
1009                        "no cached quote for {}; subscribe to quote data before submitting market orders",
1010                        cmd.instrument_id,
1011                    );
1012                    log::warn!("{reason}");
1013                    self.emitter.emit_order_denied(&order, &reason);
1014                    return Ok(());
1015                }
1016            }
1017        } else {
1018            None
1019        };
1020
1021        let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1022        let http_client = self.http_client.clone();
1023        let ws_exec = self.ws_exec.clone();
1024        let signing = self.signing.clone();
1025        let nonce_manager = self.nonce_manager.clone();
1026        let wallet_str = self.credential.wallet_address().to_string();
1027        let emitter = self.emitter.clone();
1028        let clock = self.clock;
1029        let instruments = self.instruments.clone();
1030        let instrument_id = cmd.instrument_id;
1031        let order_for_task = order.clone();
1032        let account_id = self.core.account_id;
1033
1034        // Capture identity so the WS dispatch can route subsequent updates
1035        // for this order to proper events rather than execution reports.
1036        let identity = OrderIdentity {
1037            instrument_id: order.instrument_id(),
1038            strategy_id: order.strategy_id(),
1039            order_side: order.order_side(),
1040            order_type: order.order_type(),
1041        };
1042        self.dispatch_state
1043            .register_identity(order.client_order_id(), identity);
1044
1045        self.emitter.emit_order_submitted(&order);
1046
1047        let slippage_bps = self.signing.market_order_slippage_bps;
1048        let dispatch_state = self.dispatch_state.clone();
1049
1050        self.spawn_task("submit_order", async move {
1051            let instrument = match cached_or_fetch_instrument(
1052                &http_client,
1053                &instruments,
1054                &instrument_id,
1055                &venue_symbol,
1056            )
1057            .await
1058            {
1059                Ok(i) => i,
1060                Err(e) => {
1061                    log::warn!("Failed to resolve instrument {venue_symbol}: {e}");
1062                    dispatch_state.forget(&order_for_task.client_order_id());
1063                    let ts = clock.get_time_ns();
1064                    emitter.emit_order_rejected(
1065                        &order_for_task,
1066                        &format!("instrument resolution failed: {e}"),
1067                        ts,
1068                        false,
1069                    );
1070                    return Ok(());
1071                }
1072            };
1073
1074            // Lazy-resolution net: the synchronous deny is skipped when the
1075            // cache was empty at submit time. OrderSubmitted already fired, so
1076            // reject here rather than deny.
1077            if order_for_task.is_reduce_only()
1078                && instrument.instrument_type == DeriveInstrumentType::Erc20
1079            {
1080                let reason = format!(
1081                    "reduce-only is not supported for spot instrument {}; Derive spot has no position to reduce",
1082                    order_for_task.instrument_id(),
1083                );
1084                log::warn!("{reason}");
1085                dispatch_state.forget(&order_for_task.client_order_id());
1086                let ts = clock.get_time_ns();
1087                emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1088                return Ok(());
1089            }
1090
1091            // Avoid signing against a quote captured before instrument resolution
1092            let explicit_price = if market_quote.is_some() {
1093                let quote = match refresh_market_order_quote(
1094                    &http_client,
1095                    &venue_symbol,
1096                    &instrument,
1097                    clock,
1098                )
1099                .await
1100                {
1101                    Ok(quote) => quote,
1102                    Err(e) => {
1103                        let reason = format!(
1104                            "market-order quote refresh failed for {}: {e}",
1105                            order_for_task.client_order_id(),
1106                        );
1107                        log::warn!("{reason}");
1108                        dispatch_state.forget(&order_for_task.client_order_id());
1109                        let ts = clock.get_time_ns();
1110                        emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1111                        return Ok(());
1112                    }
1113                };
1114
1115                match market_order_limit_price(
1116                    &quote,
1117                    order_for_task.order_side(),
1118                    slippage_bps,
1119                    instrument.tick_size,
1120                ) {
1121                    Some(p) => Some(p),
1122                    None => {
1123                        let reason = format!(
1124                            "market-order slippage bound is non-positive for {} ({} bps)",
1125                            order_for_task.client_order_id(),
1126                            slippage_bps,
1127                        );
1128                        log::warn!("{reason}");
1129                        dispatch_state.forget(&order_for_task.client_order_id());
1130                        let ts = clock.get_time_ns();
1131                        emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1132                        return Ok(());
1133                    }
1134                }
1135            } else if matches!(
1136                order_for_task.order_type(),
1137                OrderType::StopMarket | OrderType::MarketIfTouched
1138            ) {
1139                let trigger_price = match order_for_task.trigger_price() {
1140                    Some(price) => price.as_decimal(),
1141                    None => {
1142                        let reason = format!(
1143                            "trigger market order {} is missing trigger_price",
1144                            order_for_task.client_order_id(),
1145                        );
1146                        log::warn!("{reason}");
1147                        dispatch_state.forget(&order_for_task.client_order_id());
1148                        let ts = clock.get_time_ns();
1149                        emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1150                        return Ok(());
1151                    }
1152                };
1153
1154                match trigger_market_limit_price(
1155                    trigger_price,
1156                    order_for_task.order_side(),
1157                    slippage_bps,
1158                    instrument.tick_size,
1159                ) {
1160                    Some(p) => Some(p),
1161                    None => {
1162                        let reason = format!(
1163                            "trigger market-order slippage bound is non-positive for {} ({} bps)",
1164                            order_for_task.client_order_id(),
1165                            slippage_bps,
1166                        );
1167                        log::warn!("{reason}");
1168                        dispatch_state.forget(&order_for_task.client_order_id());
1169                        let ts = clock.get_time_ns();
1170                        emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1171                        return Ok(());
1172                    }
1173                }
1174            } else {
1175                None
1176            };
1177
1178            if is_trigger_order {
1179                let nonce = nonce_manager.next_nonce(&wallet_str, signing.subaccount_id)?;
1180                let expiry = trigger_order_signature_expiry(clock);
1181                let payload = match trigger_order_to_derive_payload(
1182                    &order_for_task,
1183                    &instrument,
1184                    signing.subaccount_id,
1185                    signing.wallet_address,
1186                    &signing.signer,
1187                    nonce,
1188                    expiry,
1189                    signing.trade_module_address,
1190                    signing.domain_separator,
1191                    signing.action_typehash,
1192                    signing.max_fee_per_contract,
1193                    explicit_price,
1194                    ws_exec.conn_id(),
1195                    UUID4::new().to_string(),
1196                ) {
1197                    Ok(p) => p,
1198                    Err(e) => {
1199                        log::warn!(
1200                            "Trigger order encode failed for {}: {e}",
1201                            order_for_task.client_order_id()
1202                        );
1203                        dispatch_state.forget(&order_for_task.client_order_id());
1204                        let ts = clock.get_time_ns();
1205                        emitter.emit_order_rejected(
1206                            &order_for_task,
1207                            &format!("order encoding failed: {e}"),
1208                            ts,
1209                            false,
1210                        );
1211                        return Ok(());
1212                    }
1213                };
1214
1215                log::debug!(
1216                    "Derive trigger submit payload client_order_id={} instrument_name={} direction={} order_type={} time_in_force={} amount={} limit_price={} trigger_price={:?} trigger_price_type={:?} trigger_type={:?}",
1217                    order_for_task.client_order_id(),
1218                    payload.order.instrument_name.as_str(),
1219                    payload.order.direction,
1220                    payload.order.order_type,
1221                    payload.order.time_in_force,
1222                    payload.order.amount,
1223                    payload.order.limit_price,
1224                    payload.order.trigger_price,
1225                    payload.order.trigger_price_type,
1226                    payload.order.trigger_type,
1227                );
1228
1229                match ws_exec.submit_trigger_order(&payload).await {
1230                    Ok(order) => {
1231                        let venue_order_id = VenueOrderId::new(order.order_id.as_str());
1232                        dispatch_state.record_venue_order_id(
1233                            order_for_task.client_order_id(),
1234                            venue_order_id,
1235                        );
1236                        let ts_now = clock.get_time_ns();
1237                        ensure_accepted_emitted(
1238                            &emitter,
1239                            &dispatch_state,
1240                            order_for_task.client_order_id(),
1241                            identity,
1242                            venue_order_id,
1243                            account_id,
1244                            ts_now,
1245                            ts_now,
1246                        );
1247                        log::debug!(
1248                            "Trigger order submitted: client_order_id={} venue_order_id={venue_order_id}",
1249                            order_for_task.client_order_id(),
1250                        );
1251                    }
1252                    Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1253                        log::warn!(
1254                            "Derive trigger submit for {} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1255                            order_for_task.client_order_id(),
1256                        );
1257                    }
1258                    Err(e) => {
1259                        let (reason, due_post_only) = ws_rejection_reason(&e);
1260                        log::debug!(
1261                            "Derive rejected trigger order {}: {reason}",
1262                            order_for_task.client_order_id(),
1263                        );
1264                        dispatch_state.forget(&order_for_task.client_order_id());
1265                        let ts = clock.get_time_ns();
1266                        emitter.emit_order_rejected(
1267                            &order_for_task,
1268                            &reason,
1269                            ts,
1270                            due_post_only,
1271                        );
1272                    }
1273                }
1274                return Ok(());
1275            }
1276
1277            let expiry =
1278                match normal_order_signature_expiry(clock, signing.signature_expiry_secs) {
1279                    Ok(expiry) => expiry,
1280                    Err(e) => {
1281                        log::warn!(
1282                            "Order expiry validation failed for {}: {e}",
1283                            order_for_task.client_order_id()
1284                        );
1285                        dispatch_state.forget(&order_for_task.client_order_id());
1286                        let ts = clock.get_time_ns();
1287                        emitter.emit_order_rejected(
1288                            &order_for_task,
1289                            &format!("order expiry validation failed: {e}"),
1290                            ts,
1291                            false,
1292                        );
1293                        return Ok(());
1294                    }
1295                };
1296            let nonce = nonce_manager.next_nonce(&wallet_str, signing.subaccount_id)?;
1297            let payload = match order_to_derive_payload(
1298                &order_for_task,
1299                &instrument,
1300                signing.subaccount_id,
1301                signing.wallet_address,
1302                &signing.signer,
1303                nonce,
1304                expiry,
1305                signing.trade_module_address,
1306                signing.domain_separator,
1307                signing.action_typehash,
1308                signing.max_fee_per_contract,
1309                explicit_price,
1310            ) {
1311                Ok(p) => p,
1312                Err(e) => {
1313                    log::warn!("Order encode failed for {}: {e}", order_for_task.client_order_id());
1314                    dispatch_state.forget(&order_for_task.client_order_id());
1315                    let ts = clock.get_time_ns();
1316                    emitter.emit_order_rejected(
1317                        &order_for_task,
1318                        &format!("order encoding failed: {e}"),
1319                        ts,
1320                        false,
1321                    );
1322                    return Ok(());
1323                }
1324            };
1325
1326            // Pre-flight debug log so a venue 11012-style rejection can be
1327            // diagnosed without re-running with full payload tracing.
1328            log::debug!(
1329                "Derive submit payload client_order_id={} instrument_name={} direction={} order_type={} time_in_force={} amount={} limit_price={}",
1330                order_for_task.client_order_id(),
1331                payload.instrument_name.as_str(),
1332                payload.direction,
1333                payload.order_type,
1334                payload.time_in_force,
1335                payload.amount,
1336                payload.limit_price,
1337            );
1338
1339            // Discard the result (and any `trades` it carries): fills arrive on
1340            // the `.trades` channel and are deduped by trade id.
1341            match ws_exec.submit_order(&payload).await {
1342                Ok(_) => {
1343                    log::debug!(
1344                        "Order submitted: client_order_id={}",
1345                        order_for_task.client_order_id(),
1346                    );
1347                }
1348                // See docs/integrations/derive.md "Order rejection semantics".
1349                Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1350                    log::warn!(
1351                        "Derive submit for {} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1352                        order_for_task.client_order_id(),
1353                    );
1354                }
1355                Err(e) => {
1356                    let (reason, due_post_only) = ws_rejection_reason(&e);
1357                    log::debug!(
1358                        "Derive rejected order {}: {reason}",
1359                        order_for_task.client_order_id(),
1360                    );
1361                    dispatch_state.forget(&order_for_task.client_order_id());
1362                    let ts = clock.get_time_ns();
1363                    emitter.emit_order_rejected(&order_for_task, &reason, ts, due_post_only);
1364                }
1365            }
1366            Ok(())
1367        });
1368
1369        Ok(())
1370    }
1371
1372    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1373        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1374        for order in orders {
1375            let sub = SubmitOrder::from_order(
1376                &order,
1377                cmd.trader_id,
1378                cmd.client_id,
1379                cmd.position_id,
1380                UUID4::new(),
1381                cmd.ts_init,
1382            );
1383            self.submit_order(sub)?;
1384        }
1385        Ok(())
1386    }
1387
1388    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1389        let Some(venue_order_id) = cmd.venue_order_id else {
1390            log::warn!(
1391                "Derive cancel_order requires venue_order_id (client_order_id={})",
1392                cmd.client_order_id,
1393            );
1394            return Ok(());
1395        };
1396        let ws_exec = self.ws_exec.clone();
1397        let subaccount_id = self.credential.subaccount_id();
1398        let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1399        let voi = venue_order_id.to_string();
1400        let emitter = self.emitter.clone();
1401        let clock = self.clock;
1402        let strategy_id = cmd.strategy_id;
1403        let instrument_id = cmd.instrument_id;
1404        let client_order_id = cmd.client_order_id;
1405        let stale_venue_order_id = venue_order_id;
1406        let is_trigger_order = self
1407            .core
1408            .cache()
1409            .order(&client_order_id)
1410            .is_some_and(|order| is_derive_trigger_order_type(order.order_type()));
1411
1412        self.spawn_task("cancel_order", async move {
1413            let outcome = if is_trigger_order {
1414                ws_exec
1415                    .cancel_trigger_order(&DeriveCancelTriggerOrderParams::new(
1416                        subaccount_id,
1417                        voi.as_str(),
1418                    ))
1419                    .await
1420                    .map(|_| ())
1421            } else {
1422                ws_exec
1423                    .cancel_order(&DeriveCancelParams::new(
1424                        subaccount_id,
1425                        venue_symbol.as_str(),
1426                        voi.as_str(),
1427                    ))
1428                    .await
1429            };
1430
1431            match outcome {
1432                Ok(()) => {}
1433                // See docs/integrations/derive.md "Order rejection semantics".
1434                Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1435                    log::warn!(
1436                        "Derive cancel for {client_order_id} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1437                    );
1438                }
1439                Err(e) => {
1440                    let (reason, _) = ws_rejection_reason(&e);
1441                    log::debug!("Derive rejected cancel for {client_order_id}: {reason}");
1442                    let ts = clock.get_time_ns();
1443                    emitter.emit_order_cancel_rejected_event(
1444                        strategy_id,
1445                        instrument_id,
1446                        client_order_id,
1447                        Some(stale_venue_order_id),
1448                        &reason,
1449                        ts,
1450                    );
1451                }
1452            }
1453            Ok(())
1454        });
1455        Ok(())
1456    }
1457
1458    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1459        let http_client = self.http_client.clone();
1460        let ws_exec = self.ws_exec.clone();
1461        let subaccount_id = self.credential.subaccount_id();
1462        let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1463        let side_filter = cmd.order_side;
1464
1465        self.spawn_task("cancel_all_orders", async move {
1466            // The venue endpoint scopes by instrument only, so when the
1467            // caller asks for a single side we list open orders (an idempotent
1468            // private read kept on HTTP), filter by side, and cancel each one
1469            // over the WebSocket. Calling `cancel_all` directly would drop both
1470            // sides and violate the command's filter.
1471            if matches!(side_filter, OrderSide::Buy | OrderSide::Sell) {
1472                let open_params = DeriveGetOpenOrdersParams::new(subaccount_id);
1473                let mut orders = match http_client.get_open_orders(&open_params).await {
1474                    Ok(v) => v,
1475                    Err(e) => {
1476                        log::warn!(
1477                            "Derive cancel_all_orders: failed to list open orders for side filter {side_filter:?}: {e}",
1478                        );
1479                        return Ok(());
1480                    }
1481                }
1482                .orders;
1483
1484                match http_client
1485                    .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
1486                    .await
1487                {
1488                    Ok(result) => orders.extend(result.orders),
1489                    Err(e) => {
1490                        log::warn!(
1491                            "Derive cancel_all_orders: failed to list trigger orders for side filter {side_filter:?}: {e}",
1492                        );
1493                    }
1494                }
1495
1496                for order in orders {
1497                    if order.instrument_name.as_str() != venue_symbol {
1498                        continue;
1499                    }
1500                    let order_side = match order.direction {
1501                        DeriveOrderSide::Buy => OrderSide::Buy,
1502                        DeriveOrderSide::Sell => OrderSide::Sell,
1503                    };
1504
1505                    if order_side != side_filter {
1506                        continue;
1507                    }
1508
1509                    let outcome = if order.trigger_type.is_some() {
1510                        ws_exec
1511                            .cancel_trigger_order(&DeriveCancelTriggerOrderParams::new(
1512                                subaccount_id,
1513                                order.order_id.as_str(),
1514                            ))
1515                            .await
1516                            .map(|_| ())
1517                    } else {
1518                        ws_exec
1519                            .cancel_order(&DeriveCancelParams::new(
1520                                subaccount_id,
1521                                venue_symbol.as_str(),
1522                                order.order_id.as_str(),
1523                            ))
1524                            .await
1525                    };
1526
1527                    if let Err(e) = outcome {
1528                        log::warn!(
1529                            "Derive cancel_all_orders: cancel for {} failed: {e}",
1530                            order.order_id,
1531                        );
1532                    }
1533                }
1534            } else if let Err(e) = ws_exec
1535                .cancel_all_orders(
1536                    &DeriveCancelAllParams::new(subaccount_id)
1537                        .with_instrument_name(venue_symbol.as_str()),
1538                )
1539                .await
1540            {
1541                log::warn!("Derive cancel_all_orders failed for {venue_symbol}: {e}");
1542            }
1543
1544            if !matches!(side_filter, OrderSide::Buy | OrderSide::Sell) {
1545                let trigger_orders = match http_client
1546                    .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
1547                    .await
1548                {
1549                    Ok(result) => result.orders,
1550                    Err(e) => {
1551                        log::warn!(
1552                            "Derive cancel_all_orders: failed to list trigger orders for {venue_symbol}: {e}",
1553                        );
1554                        return Ok(());
1555                    }
1556                };
1557
1558                for order in trigger_orders {
1559                    if order.instrument_name.as_str() != venue_symbol {
1560                        continue;
1561                    }
1562
1563                    if let Err(e) = ws_exec
1564                        .cancel_trigger_order(&DeriveCancelTriggerOrderParams::new(
1565                            subaccount_id,
1566                            order.order_id.as_str(),
1567                        ))
1568                        .await
1569                    {
1570                        log::warn!(
1571                            "Derive cancel_all_orders: trigger cancel for {} failed: {e}",
1572                            order.order_id,
1573                        );
1574                    }
1575                }
1576            }
1577            Ok(())
1578        });
1579        Ok(())
1580    }
1581
1582    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1583        for inner in cmd.cancels {
1584            self.cancel_order(inner)?;
1585        }
1586        Ok(())
1587    }
1588
1589    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1590        let ts_now = self.clock.get_time_ns();
1591
1592        let Some(venue_order_id) = cmd.venue_order_id else {
1593            let reason = "venue_order_id is required for modify";
1594            log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
1595            self.emitter.emit_order_modify_rejected_event(
1596                cmd.strategy_id,
1597                cmd.instrument_id,
1598                cmd.client_order_id,
1599                None,
1600                reason,
1601                ts_now,
1602            );
1603            return Ok(());
1604        };
1605
1606        let Some(order) = self
1607            .core
1608            .cache()
1609            .order(&cmd.client_order_id)
1610            .map(|o| o.clone())
1611        else {
1612            let reason = "order not found in cache";
1613            log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
1614            self.emitter.emit_order_modify_rejected_event(
1615                cmd.strategy_id,
1616                cmd.instrument_id,
1617                cmd.client_order_id,
1618                Some(venue_order_id),
1619                reason,
1620                ts_now,
1621            );
1622            return Ok(());
1623        };
1624
1625        if is_derive_trigger_order_type(order.order_type()) {
1626            let reason = "Derive trigger orders cannot be modified; cancel and resubmit";
1627            log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
1628            self.emitter.emit_order_modify_rejected_event(
1629                cmd.strategy_id,
1630                cmd.instrument_id,
1631                cmd.client_order_id,
1632                Some(venue_order_id),
1633                reason,
1634                ts_now,
1635            );
1636            return Ok(());
1637        }
1638
1639        let target_quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
1640        let target_price = cmd.price.or_else(|| order.price());
1641
1642        let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1643        let http_client = self.http_client.clone();
1644        let ws_exec = self.ws_exec.clone();
1645        let signing = self.signing.clone();
1646        let nonce_manager = self.nonce_manager.clone();
1647        let wallet_str = self.credential.wallet_address().to_string();
1648        let emitter = self.emitter.clone();
1649        let clock = self.clock;
1650        let instruments = self.instruments.clone();
1651        let dispatch_state = self.dispatch_state.clone();
1652        let order_for_task = order;
1653        let strategy_id = cmd.strategy_id;
1654        let instrument_id = cmd.instrument_id;
1655        let client_order_id = cmd.client_order_id;
1656        let stale_venue_order_id = venue_order_id;
1657        let voi_str = venue_order_id.to_string();
1658
1659        self.spawn_task("modify_order", async move {
1660            let instrument = match cached_or_fetch_instrument(
1661                &http_client,
1662                &instruments,
1663                &instrument_id,
1664                &venue_symbol,
1665            )
1666            .await
1667            {
1668                Ok(i) => i,
1669                Err(e) => {
1670                    let reason = format!("instrument resolution failed: {e}");
1671                    log::warn!("Cannot modify order {client_order_id}: {reason}");
1672                    let ts = clock.get_time_ns();
1673                    emitter.emit_order_modify_rejected_event(
1674                        strategy_id,
1675                        instrument_id,
1676                        client_order_id,
1677                        Some(stale_venue_order_id),
1678                        &reason,
1679                        ts,
1680                    );
1681                    return Ok(());
1682                }
1683            };
1684
1685            let expiry = match normal_order_signature_expiry(clock, signing.signature_expiry_secs) {
1686                Ok(expiry) => expiry,
1687                Err(e) => {
1688                    let reason = format!("replace expiry validation failed: {e}");
1689                    log::warn!("Cannot modify order {client_order_id}: {reason}");
1690                    let ts = clock.get_time_ns();
1691                    emitter.emit_order_modify_rejected_event(
1692                        strategy_id,
1693                        instrument_id,
1694                        client_order_id,
1695                        Some(stale_venue_order_id),
1696                        &reason,
1697                        ts,
1698                    );
1699                    return Ok(());
1700                }
1701            };
1702            let nonce = nonce_manager.next_nonce(&wallet_str, signing.subaccount_id)?;
1703
1704            let payload = match order_replace_to_derive_payload(
1705                &order_for_task,
1706                &instrument,
1707                signing.subaccount_id,
1708                signing.wallet_address,
1709                &signing.signer,
1710                nonce,
1711                expiry,
1712                signing.trade_module_address,
1713                signing.domain_separator,
1714                signing.action_typehash,
1715                signing.max_fee_per_contract,
1716                Some(target_quantity.as_decimal()),
1717                target_price.map(|p| p.as_decimal()),
1718                &voi_str,
1719            ) {
1720                Ok(p) => p,
1721                Err(e) => {
1722                    let reason = format!("replace encoding failed: {e}");
1723                    log::warn!("Cannot modify order {client_order_id}: {reason}");
1724                    let ts = clock.get_time_ns();
1725                    emitter.emit_order_modify_rejected_event(
1726                        strategy_id,
1727                        instrument_id,
1728                        client_order_id,
1729                        Some(stale_venue_order_id),
1730                        &reason,
1731                        ts,
1732                    );
1733                    return Ok(());
1734                }
1735            };
1736
1737            // Mark before sending so the cancel-of-old leg is suppressed even if
1738            // it arrives before this response.
1739            dispatch_state.mark_pending_modify(client_order_id, stale_venue_order_id);
1740
1741            match ws_exec.modify_order(&payload).await {
1742                Ok(order) => {
1743                    let new_voi = VenueOrderId::new(order.order_id.as_str());
1744                    log::debug!(
1745                        "Order replaced: client_order_id={client_order_id}, new venue_order_id={new_voi}",
1746                    );
1747                    // Rebind before clearing the marker so a later cancel-of-old
1748                    // stays suppressed by the bound-id check.
1749                    dispatch_state.record_venue_order_id(client_order_id, new_voi);
1750                    dispatch_state.clear_pending_modify(&client_order_id);
1751                    let ts = clock.get_time_ns();
1752                    emitter.emit_order_updated(
1753                        &order_for_task,
1754                        new_voi,
1755                        target_quantity,
1756                        target_price,
1757                        None,
1758                        None,
1759                        ts,
1760                    );
1761                }
1762                // See docs/integrations/derive.md "Order rejection semantics".
1763                Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1764                    dispatch_state.clear_pending_modify(&client_order_id);
1765                    log::warn!(
1766                        "Derive modify for {client_order_id} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1767                    );
1768                }
1769                Err(e) => {
1770                    dispatch_state.clear_pending_modify(&client_order_id);
1771                    let (reason, _) = ws_rejection_reason(&e);
1772                    log::debug!("Derive rejected modify for {client_order_id}: {reason}");
1773                    let ts = clock.get_time_ns();
1774                    emitter.emit_order_modify_rejected_event(
1775                        strategy_id,
1776                        instrument_id,
1777                        client_order_id,
1778                        Some(stale_venue_order_id),
1779                        &reason,
1780                        ts,
1781                    );
1782                }
1783            }
1784            Ok(())
1785        });
1786        Ok(())
1787    }
1788
1789    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1790        let http_client = self.http_client.clone();
1791        let subaccount_id = self.credential.subaccount_id();
1792        let emitter = self.emitter.clone();
1793        let clock = self.clock;
1794        self.spawn_task("query_account", async move {
1795            let subaccount = http_client
1796                .get_subaccount(&DeriveGetSubaccountParams::new(subaccount_id))
1797                .await?;
1798            let (balances, margins) = parse_derive_subaccount_to_balances(&subaccount)?;
1799            let ts_event = clock.get_time_ns();
1800            emitter.emit_account_state(balances, margins, true, ts_event);
1801            Ok(())
1802        });
1803        Ok(())
1804    }
1805
1806    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1807        let Some(venue_order_id) = cmd.venue_order_id else {
1808            log::warn!(
1809                "Derive query_order requires venue_order_id (client_order_id={})",
1810                cmd.client_order_id,
1811            );
1812            return Ok(());
1813        };
1814        let http_client = self.http_client.clone();
1815        let subaccount_id = self.credential.subaccount_id();
1816        let account_id = self.core.account_id;
1817        let emitter = self.emitter.clone();
1818        let clock = self.clock;
1819        let voi = venue_order_id.to_string();
1820
1821        self.spawn_task("query_order", async move {
1822            let order = match http_client
1823                .get_order(&DeriveGetOrderParams::new(subaccount_id, voi.as_str()))
1824                .await
1825            {
1826                Ok(o) => o,
1827                Err(e) => {
1828                    let trigger_orders = match http_client
1829                        .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
1830                        .await
1831                    {
1832                        Ok(result) => result.orders,
1833                        Err(trigger_err) => {
1834                            log::warn!(
1835                                "Failed to fetch Derive order {voi}: {e}; trigger lookup also failed: {trigger_err}",
1836                            );
1837                            return Ok(());
1838                        }
1839                    };
1840
1841                    match trigger_orders
1842                        .into_iter()
1843                        .find(|o| o.order_id.as_str() == voi.as_str())
1844                    {
1845                        Some(order) => order,
1846                        None => {
1847                            log::warn!("Failed to fetch Derive order {voi}: {e}");
1848                            return Ok(());
1849                        }
1850                    }
1851                }
1852            };
1853            let ts_init = clock.get_time_ns();
1854            let report = parse_derive_order_to_report(&order, account_id, ts_init)?;
1855            emitter.send_order_status_report(report);
1856            Ok(())
1857        });
1858        Ok(())
1859    }
1860}
1861
1862// Reason text and post-only classification for a definitive WS write failure.
1863// Non-JSON-RPC errors carry no venue code and are never post-only crossings.
1864fn ws_rejection_reason(error: &DeriveWsError) -> (String, bool) {
1865    match error {
1866        DeriveWsError::JsonRpc { code, message, .. } => (
1867            format!("JSON-RPC {code}: {message}"),
1868            derive_rejection_due_post_only(Some(*code), message),
1869        ),
1870        other => (other.to_string(), false),
1871    }
1872}
1873
1874fn add_missing_flat_position_reports(
1875    mass_status: &mut ExecutionMassStatus,
1876    account_id: AccountId,
1877    touched_instruments: AHashSet<InstrumentId>,
1878    ts_init: UnixNanos,
1879) {
1880    let active_position_instruments: AHashSet<InstrumentId> =
1881        mass_status.position_reports().keys().copied().collect();
1882    let mut flat_reports = Vec::new();
1883
1884    for instrument_id in touched_instruments {
1885        if active_position_instruments.contains(&instrument_id) {
1886            continue;
1887        }
1888
1889        flat_reports.push(PositionStatusReport::new(
1890            account_id,
1891            instrument_id,
1892            PositionSideSpecified::Flat,
1893            Quantity::from("0"),
1894            ts_init,
1895            ts_init,
1896            Some(UUID4::new()),
1897            None,
1898            None,
1899        ));
1900    }
1901
1902    if !flat_reports.is_empty() {
1903        log::info!(
1904            "Added {} flat PositionReports for Derive instruments absent from current positions",
1905            flat_reports.len()
1906        );
1907        mass_status.add_position_reports(flat_reports);
1908    }
1909}
1910
1911fn handle_ws_message(
1912    message: DeriveWsMessage,
1913    emitter: &ExecutionEventEmitter,
1914    account_id: AccountId,
1915    clock: &'static AtomicTime,
1916    dispatch_state: &WsDispatchState,
1917) {
1918    let payload = match message {
1919        DeriveWsMessage::Subscription(payload) => payload,
1920        DeriveWsMessage::Authenticated | DeriveWsMessage::Reconnected => return,
1921    };
1922
1923    let is_orders_channel = payload.channel.as_str().ends_with(".orders");
1924    let is_trades_channel = payload.channel.as_str().ends_with(".trades");
1925
1926    if is_orders_channel {
1927        let data = match serde_json::from_str::<DeriveOrdersSubscriptionData>(payload.data.get()) {
1928            Ok(data) => data,
1929            Err(_) => return,
1930        };
1931        dispatch_orders_payload(data, emitter, account_id, clock, dispatch_state);
1932    } else if is_trades_channel {
1933        let data = match serde_json::from_str::<DeriveTradesSubscriptionData>(payload.data.get()) {
1934            Ok(data) => data,
1935            Err(_) => return,
1936        };
1937        dispatch_trades_payload(data, emitter, account_id, clock, dispatch_state);
1938    }
1939}
1940
1941/// Dispatches a parsed `{subaccount_id}.orders` payload to the execution event
1942/// emitter.
1943///
1944/// Emits tracked order events when an order's client order id resolves to a
1945/// registered identity in `dispatch_state`, and forwards a raw status report
1946/// otherwise.
1947pub fn dispatch_orders_payload(
1948    data: DeriveOrdersSubscriptionData,
1949    emitter: &ExecutionEventEmitter,
1950    account_id: AccountId,
1951    clock: &'static AtomicTime,
1952    dispatch_state: &WsDispatchState,
1953) {
1954    let ts_init = clock.get_time_ns();
1955    for order in data.orders {
1956        let report = match parse_derive_order_to_report(&order, account_id, ts_init) {
1957            Ok(report) => report,
1958            Err(e) => {
1959                log::warn!("Failed to parse Derive order WS update: {e}");
1960                continue;
1961            }
1962        };
1963
1964        let identity = tracked_order_identity(report.client_order_id, dispatch_state);
1965
1966        match identity {
1967            Some((client_order_id, identity)) => emit_tracked_order_event(
1968                emitter,
1969                dispatch_state,
1970                client_order_id,
1971                identity,
1972                &report,
1973                account_id,
1974                ts_init,
1975            ),
1976            None => emitter.send_order_status_report(report),
1977        }
1978    }
1979}
1980
1981/// Dispatches a parsed `{subaccount_id}.trades` payload to the execution event
1982/// emitter.
1983///
1984/// Deduplicates by trade id, then emits a tracked fill when the trade's client
1985/// order id resolves to a registered identity in `dispatch_state`, and forwards
1986/// a raw fill report otherwise.
1987pub fn dispatch_trades_payload(
1988    data: DeriveTradesSubscriptionData,
1989    emitter: &ExecutionEventEmitter,
1990    account_id: AccountId,
1991    clock: &'static AtomicTime,
1992    dispatch_state: &WsDispatchState,
1993) {
1994    let ts_init = clock.get_time_ns();
1995    let fee_currency = Currency::USDC();
1996    for trade in data.trades {
1997        match parse_derive_trade_to_fill_report(&trade, account_id, fee_currency, ts_init) {
1998            Ok(Some(report)) => {
1999                if dispatch_state.check_and_insert_trade(report.trade_id) {
2000                    log::debug!(
2001                        "Skipping duplicate Derive fill (trade_id={}) on WS dispatch",
2002                        report.trade_id,
2003                    );
2004                    continue;
2005                }
2006
2007                let identity = tracked_order_identity(report.client_order_id, dispatch_state);
2008
2009                match identity {
2010                    Some((client_order_id, identity)) => emit_tracked_fill(
2011                        emitter,
2012                        dispatch_state,
2013                        client_order_id,
2014                        identity,
2015                        &report,
2016                        account_id,
2017                        ts_init,
2018                    ),
2019                    None => emitter.send_fill_report(report),
2020                }
2021            }
2022            Ok(None) => {}
2023            Err(e) => log::warn!("Failed to parse Derive trade WS update: {e}"),
2024        }
2025    }
2026}
2027
2028fn tracked_order_identity(
2029    client_order_id: Option<ClientOrderId>,
2030    dispatch_state: &WsDispatchState,
2031) -> Option<(ClientOrderId, OrderIdentity)> {
2032    client_order_id.and_then(|cid| {
2033        dispatch_state
2034            .identity(&cid)
2035            .map(|identity| (cid, identity))
2036    })
2037}
2038
2039/// Synthesizes and emits `OrderAccepted` when one has not yet been emitted
2040/// for the order. Used to guarantee the `Submitted -> Accepted -> ...`
2041/// lifecycle when a fill or terminal event arrives before (or instead of)
2042/// the venue's `Open` notice.
2043#[expect(clippy::too_many_arguments)]
2044fn ensure_accepted_emitted(
2045    emitter: &ExecutionEventEmitter,
2046    dispatch_state: &WsDispatchState,
2047    client_order_id: ClientOrderId,
2048    identity: OrderIdentity,
2049    venue_order_id: VenueOrderId,
2050    account_id: AccountId,
2051    ts_event: UnixNanos,
2052    ts_init: UnixNanos,
2053) {
2054    if dispatch_state.mark_accepted(client_order_id) {
2055        return;
2056    }
2057    let accepted = OrderAccepted::new(
2058        emitter.trader_id(),
2059        identity.strategy_id,
2060        identity.instrument_id,
2061        client_order_id,
2062        venue_order_id,
2063        account_id,
2064        UUID4::new(),
2065        ts_event,
2066        ts_init,
2067        false,
2068    );
2069    emitter.send_order_event(OrderEventAny::Accepted(accepted));
2070}
2071
2072fn emit_tracked_order_event(
2073    emitter: &ExecutionEventEmitter,
2074    dispatch_state: &WsDispatchState,
2075    client_order_id: ClientOrderId,
2076    identity: OrderIdentity,
2077    report: &OrderStatusReport,
2078    account_id: AccountId,
2079    ts_init: UnixNanos,
2080) {
2081    let venue_order_id = report.venue_order_id;
2082    let ts_accepted = report.ts_accepted;
2083    let ts_event = report.ts_last;
2084
2085    // A `private/replace` cancels the old order and opens a new one under the
2086    // same label; suppress events for the superseded old venue order id so they
2087    // don't terminate the order that `modify_order` rebinds via `OrderUpdated`.
2088    // `pending_modify` covers the in-flight window; the bound-id check covers
2089    // after the rebind.
2090    if dispatch_state.pending_modify(&client_order_id) == Some(venue_order_id) {
2091        log::debug!(
2092            "Skipping cancel-replace leg for {client_order_id}: stale venue_order_id={venue_order_id}",
2093        );
2094        return;
2095    }
2096
2097    if let Some(bound) = dispatch_state.bound_venue_order_id(&client_order_id)
2098        && bound != venue_order_id
2099    {
2100        log::debug!(
2101            "Skipping stale {:?} for {client_order_id}: venue_order_id={venue_order_id} superseded by {bound}",
2102            report.order_status,
2103        );
2104        return;
2105    }
2106
2107    match report.order_status {
2108        OrderStatus::Accepted | OrderStatus::PartiallyFilled => {
2109            if dispatch_state.contains_filled(&client_order_id) {
2110                log::debug!("Skipping stale Accepted for {client_order_id} (already filled)",);
2111                return;
2112            }
2113            dispatch_state.record_venue_order_id(client_order_id, venue_order_id);
2114            ensure_accepted_emitted(
2115                emitter,
2116                dispatch_state,
2117                client_order_id,
2118                identity,
2119                venue_order_id,
2120                account_id,
2121                ts_accepted,
2122                ts_init,
2123            );
2124        }
2125        OrderStatus::Filled => {
2126            dispatch_state.record_venue_order_id(client_order_id, venue_order_id);
2127            ensure_accepted_emitted(
2128                emitter,
2129                dispatch_state,
2130                client_order_id,
2131                identity,
2132                venue_order_id,
2133                account_id,
2134                ts_accepted,
2135                ts_init,
2136            );
2137            // Mark the order terminal so replayed Accepted frames are
2138            // suppressed, but keep its identity alive: the matching
2139            // `.trades` frame may arrive after this `.orders` Filled
2140            // notice and still needs the tracked path to emit a proper
2141            // `OrderFilled`. Identity is retired by Canceled/Expired/
2142            // Rejected paths; full-fill leaks are bounded by submission
2143            // throughput.
2144            dispatch_state.mark_filled(client_order_id);
2145        }
2146        OrderStatus::Canceled => {
2147            ensure_accepted_emitted(
2148                emitter,
2149                dispatch_state,
2150                client_order_id,
2151                identity,
2152                venue_order_id,
2153                account_id,
2154                ts_accepted,
2155                ts_init,
2156            );
2157            let canceled = OrderCanceled::new(
2158                emitter.trader_id(),
2159                identity.strategy_id,
2160                identity.instrument_id,
2161                client_order_id,
2162                UUID4::new(),
2163                ts_event,
2164                ts_init,
2165                false,
2166                Some(venue_order_id),
2167                Some(account_id),
2168            );
2169            emitter.send_order_event(OrderEventAny::Canceled(canceled));
2170            dispatch_state.forget(&client_order_id);
2171        }
2172        OrderStatus::Expired => {
2173            ensure_accepted_emitted(
2174                emitter,
2175                dispatch_state,
2176                client_order_id,
2177                identity,
2178                venue_order_id,
2179                account_id,
2180                ts_accepted,
2181                ts_init,
2182            );
2183            let expired = OrderExpired::new(
2184                emitter.trader_id(),
2185                identity.strategy_id,
2186                identity.instrument_id,
2187                client_order_id,
2188                UUID4::new(),
2189                ts_event,
2190                ts_init,
2191                false,
2192                Some(venue_order_id),
2193                Some(account_id),
2194            );
2195            emitter.send_order_event(OrderEventAny::Expired(expired));
2196            dispatch_state.forget(&client_order_id);
2197        }
2198        OrderStatus::Rejected => {
2199            let reason = report
2200                .cancel_reason
2201                .as_deref()
2202                .unwrap_or("Order rejected by Derive");
2203            let due_post_only = derive_rejection_due_post_only(None, reason);
2204            let rejected = OrderRejected::new(
2205                emitter.trader_id(),
2206                identity.strategy_id,
2207                identity.instrument_id,
2208                client_order_id,
2209                account_id,
2210                Ustr::from(reason),
2211                UUID4::new(),
2212                ts_event,
2213                ts_init,
2214                false,
2215                due_post_only,
2216            );
2217            emitter.send_order_event(OrderEventAny::Rejected(rejected));
2218            dispatch_state.forget(&client_order_id);
2219        }
2220        other => {
2221            log::debug!(
2222                "Unhandled tracked order status {other:?} for {client_order_id}, sending as report",
2223            );
2224            emitter.send_order_status_report(report.clone());
2225        }
2226    }
2227}
2228
2229fn emit_tracked_fill(
2230    emitter: &ExecutionEventEmitter,
2231    dispatch_state: &WsDispatchState,
2232    client_order_id: ClientOrderId,
2233    identity: OrderIdentity,
2234    report: &FillReport,
2235    account_id: AccountId,
2236    ts_init: UnixNanos,
2237) {
2238    ensure_accepted_emitted(
2239        emitter,
2240        dispatch_state,
2241        client_order_id,
2242        identity,
2243        report.venue_order_id,
2244        account_id,
2245        report.ts_event,
2246        ts_init,
2247    );
2248
2249    let filled = OrderFilled::new(
2250        emitter.trader_id(),
2251        identity.strategy_id,
2252        identity.instrument_id,
2253        client_order_id,
2254        report.venue_order_id,
2255        account_id,
2256        report.trade_id,
2257        identity.order_side,
2258        identity.order_type,
2259        report.last_qty,
2260        report.last_px,
2261        report.commission.currency,
2262        report.liquidity_side,
2263        UUID4::new(),
2264        report.ts_event,
2265        ts_init,
2266        false,
2267        report.venue_position_id,
2268        Some(report.commission),
2269    );
2270    emitter.send_order_event(OrderEventAny::Filled(filled));
2271}
2272
2273/// Derives the worst-acceptable limit price for a market order from the
2274/// top-of-book quote and a slippage bound in basis points, rounded to the
2275/// instrument's `tick_size`.
2276///
2277/// Buys lift the ask by `slippage_bps` then round up to the next tick; sells
2278/// drop the bid by the same and round down. The result is the signed
2279/// `limit_price` slot in the EIP-712 trade module data; the venue uses it
2280/// as a worst-case bound while the order sweeps. A non-positive sell bound
2281/// is rejected (`None`) so the caller can deny the order rather than sign
2282/// an invalid zero limit.
2283fn market_order_limit_price(
2284    quote: &QuoteTick,
2285    side: OrderSide,
2286    slippage_bps: u32,
2287    tick_size: Decimal,
2288) -> Option<Decimal> {
2289    let bps = Decimal::from(slippage_bps);
2290    let scale = Decimal::from(10_000_u32);
2291    let one = Decimal::ONE;
2292    let raw = match side {
2293        OrderSide::Buy => quote.ask_price.as_decimal() * (one + bps / scale),
2294        OrderSide::Sell => quote.bid_price.as_decimal() * (one - bps / scale),
2295        // NoOrderSide is rejected upstream by `order_side_to_derive`.
2296        OrderSide::NoOrderSide => return None,
2297    };
2298    let rounded = round_to_tick(raw, tick_size, side);
2299    if rounded <= Decimal::ZERO {
2300        return None;
2301    }
2302    Some(rounded)
2303}
2304
2305fn trigger_market_limit_price(
2306    trigger_price: Decimal,
2307    side: OrderSide,
2308    slippage_bps: u32,
2309    tick_size: Decimal,
2310) -> Option<Decimal> {
2311    let bps = Decimal::from(slippage_bps);
2312    let scale = Decimal::from(10_000_u32);
2313    let one = Decimal::ONE;
2314    let raw = match side {
2315        OrderSide::Buy => trigger_price * (one + bps / scale),
2316        OrderSide::Sell => trigger_price * (one - bps / scale),
2317        OrderSide::NoOrderSide => return None,
2318    };
2319    let rounded = round_to_tick(raw, tick_size, side);
2320    if rounded <= Decimal::ZERO {
2321        return None;
2322    }
2323    Some(rounded)
2324}
2325
2326fn is_derive_trigger_order_type(order_type: OrderType) -> bool {
2327    matches!(
2328        order_type,
2329        OrderType::StopMarket
2330            | OrderType::StopLimit
2331            | OrderType::MarketIfTouched
2332            | OrderType::LimitIfTouched
2333    )
2334}
2335
2336fn trigger_order_signature_expiry(clock: &'static AtomicTime) -> i64 {
2337    let now_secs = (clock.get_time_ns().as_u64() / 1_000_000_000) as i64;
2338    now_secs + TRIGGER_ORDER_SIGNATURE_TTL.as_secs() as i64
2339}
2340
2341fn normal_order_signature_expiry(
2342    clock: &'static AtomicTime,
2343    signature_expiry_secs: u64,
2344) -> anyhow::Result<i64> {
2345    let min_ttl_secs = MIN_SIGNATURE_TTL.as_secs();
2346    if signature_expiry_secs <= min_ttl_secs {
2347        anyhow::bail!(
2348            "signature_expiry_secs {signature_expiry_secs}s must be greater than the Derive minimum {min_ttl_secs}s"
2349        );
2350    }
2351
2352    let now_secs_u64 = clock.get_time_ns().as_u64() / 1_000_000_000;
2353    let now_secs = i64::try_from(now_secs_u64).with_context(|| {
2354        format!("current UNIX time {now_secs_u64}s cannot fit in Derive signature_expiry_sec")
2355    })?;
2356    let ttl_secs = i64::try_from(signature_expiry_secs).with_context(|| {
2357        format!(
2358            "signature_expiry_secs {signature_expiry_secs}s cannot fit in Derive signature_expiry_sec"
2359        )
2360    })?;
2361
2362    now_secs.checked_add(ttl_secs).ok_or_else(|| {
2363        anyhow::anyhow!(
2364            "signature expiry overflows Derive signature_expiry_sec: now {now_secs}s plus TTL {ttl_secs}s"
2365        )
2366    })
2367}
2368
2369async fn refresh_market_order_quote(
2370    http_client: &DeriveHttpClient,
2371    venue_symbol: &str,
2372    instrument: &DeriveInstrument,
2373    clock: &'static AtomicTime,
2374) -> anyhow::Result<QuoteTick> {
2375    let ticker = http_client.get_ticker(venue_symbol).await?;
2376    let price_precision = Price::from_decimal(instrument.tick_size)
2377        .with_context(|| format!("invalid Derive tick_size for {venue_symbol}"))?
2378        .precision;
2379    let size_precision = Quantity::from_decimal(instrument.amount_step)
2380        .with_context(|| format!("invalid Derive amount_step for {venue_symbol}"))?
2381        .precision;
2382
2383    parse_ticker_quote_from_rest(
2384        &ticker,
2385        price_precision,
2386        size_precision,
2387        clock.get_time_ns(),
2388    )
2389}
2390
2391/// Rounds `value` to the nearest multiple of `tick_size`. Buys round up so
2392/// the signed bound remains acceptable to the venue; sells round down so the
2393/// caller does not accidentally tighten the floor. A non-positive `tick_size`
2394/// is treated as a no-op.
2395fn round_to_tick(value: Decimal, tick_size: Decimal, side: OrderSide) -> Decimal {
2396    if tick_size <= Decimal::ZERO {
2397        return value;
2398    }
2399    let ratio = value / tick_size;
2400    let ticks = match side {
2401        OrderSide::Buy => ratio.ceil(),
2402        OrderSide::Sell => ratio.floor(),
2403        OrderSide::NoOrderSide => ratio.round(),
2404    };
2405    ticks * tick_size
2406}
2407
2408async fn cached_or_fetch_instrument(
2409    http_client: &DeriveHttpClient,
2410    instruments: &Arc<AtomicMap<InstrumentId, DeriveInstrument>>,
2411    instrument_id: &InstrumentId,
2412    venue_symbol: &str,
2413) -> anyhow::Result<DeriveInstrument> {
2414    if let Some(cached) = instruments.get_cloned(instrument_id) {
2415        return Ok(cached);
2416    }
2417    let instrument = http_client
2418        .get_instrument(venue_symbol)
2419        .await
2420        .with_context(|| format!("failed to fetch instrument {venue_symbol}"))?;
2421    instruments.insert(*instrument_id, instrument.clone());
2422    Ok(instrument)
2423}
2424
2425#[cfg(test)]
2426mod tests {
2427    use std::{cell::RefCell, rc::Rc};
2428
2429    use nautilus_common::{cache::Cache, messages::ExecutionEvent};
2430    use nautilus_core::UnixNanos;
2431    use nautilus_live::ExecutionClientCore;
2432    use nautilus_model::{
2433        data::QuoteTick,
2434        enums::{AccountType, OmsType, TimeInForce},
2435        identifiers::{AccountId, ClientId, InstrumentId, StrategyId, TraderId},
2436        types::{Price, Quantity},
2437    };
2438    use rstest::rstest;
2439    use rust_decimal_macros::dec;
2440
2441    use super::*;
2442    use crate::common::{consts::DERIVE, enums::DeriveEnvironment};
2443
    // Fixture credentials fed into `test_config()` as the wallet address,
    // session key, and subaccount id.
    // NOTE(review): presumably throwaway values with no real funds; confirm.
    const TEST_WALLET: &str = "0x0000000000000000000000000000000000001234";
    const TEST_SESSION_KEY: &str =
        "0x2ae8be44db8a590d20bffbe3b6872df9b569147d3bf6801a35a28281a4816bbd";
    const TEST_SUBACCOUNT: u64 = 30769;
2448
2449    fn test_core() -> ExecutionClientCore {
2450        let cache = Rc::new(RefCell::new(Cache::default()));
2451        ExecutionClientCore::new(
2452            TraderId::from("TRADER-001"),
2453            ClientId::from(DERIVE),
2454            *DERIVE_VENUE,
2455            OmsType::Netting,
2456            AccountId::from("DERIVE-001"),
2457            AccountType::Margin,
2458            None,
2459            cache,
2460        )
2461    }
2462
2463    fn test_config() -> DeriveExecClientConfig {
2464        DeriveExecClientConfig {
2465            wallet_address: Some(TEST_WALLET.to_string()),
2466            session_key: Some(TEST_SESSION_KEY.to_string()),
2467            subaccount_id: Some(TEST_SUBACCOUNT),
2468            environment: DeriveEnvironment::Testnet,
2469            domain_separator: Some(
2470                "0x2222222222222222222222222222222222222222222222222222222222222222".to_string(),
2471            ),
2472            action_typehash: Some(
2473                "0x1111111111111111111111111111111111111111111111111111111111111111".to_string(),
2474            ),
2475            trade_module_address: Some("0x000000000000000000000000000000000000bbbb".to_string()),
2476            ..DeriveExecClientConfig::default()
2477        }
2478    }
2479
2480    #[rstest]
2481    fn test_market_order_limit_price_buy_lifts_ask_and_rounds_up_to_tick() {
2482        let quote = QuoteTick::new(
2483            InstrumentId::from("ETH-PERP.DERIVE"),
2484            Price::from("3500.00"),
2485            Price::from("3501.00"),
2486            Quantity::from("1.000"),
2487            Quantity::from("1.000"),
2488            UnixNanos::from(0),
2489            UnixNanos::from(0),
2490        );
2491        // 50 bps; raw = 3501 * 1.005 = 3518.505; tick 0.01 rounds up to 3518.51.
2492        let price = market_order_limit_price(&quote, OrderSide::Buy, 50, dec!(0.01)).unwrap();
2493        assert_eq!(price, dec!(3518.51));
2494    }
2495
2496    #[rstest]
2497    fn test_market_order_limit_price_sell_drops_bid_rounds_down_and_denies_non_positive() {
2498        let quote = QuoteTick::new(
2499            InstrumentId::from("ETH-PERP.DERIVE"),
2500            Price::from("3500.00"),
2501            Price::from("3501.00"),
2502            Quantity::from("1.000"),
2503            Quantity::from("1.000"),
2504            UnixNanos::from(0),
2505            UnixNanos::from(0),
2506        );
2507        // 50 bps; raw = 3500 * 0.995 = 3482.5; tick 0.01 stays at 3482.5.
2508        let price = market_order_limit_price(&quote, OrderSide::Sell, 50, dec!(0.01)).unwrap();
2509        assert_eq!(price, dec!(3482.5));
2510
2511        // 20_000 bps = 200% slippage drives the rounded bound below zero; deny.
2512        let zero = market_order_limit_price(&quote, OrderSide::Sell, 20_000, dec!(0.01));
2513        assert!(zero.is_none());
2514    }
2515
2516    #[rstest]
2517    fn test_trigger_market_limit_price_uses_trigger_price_bound() {
2518        let buy = trigger_market_limit_price(dec!(3600), OrderSide::Buy, 50, dec!(0.01)).unwrap();
2519        let sell = trigger_market_limit_price(dec!(3600), OrderSide::Sell, 50, dec!(0.01)).unwrap();
2520        let zero = trigger_market_limit_price(dec!(1), OrderSide::Sell, 20_000, dec!(0.01));
2521
2522        assert_eq!(buy, dec!(3618));
2523        assert_eq!(sell, dec!(3582));
2524        assert!(zero.is_none());
2525    }
2526
2527    #[rstest]
2528    fn test_normal_order_signature_expiry_accepts_ttl_above_minimum() {
2529        let clock = get_atomic_clock_realtime();
2530        let start_secs = (clock.get_time_ns().as_u64() / 1_000_000_000) as i64;
2531        let ttl_secs = MIN_SIGNATURE_TTL.as_secs() + 1;
2532
2533        let expiry = normal_order_signature_expiry(clock, ttl_secs).expect("expiry is valid");
2534
2535        assert!(expiry >= start_secs + ttl_secs as i64);
2536    }
2537
2538    #[rstest]
2539    #[case(MIN_SIGNATURE_TTL.as_secs(), "must be greater than the Derive minimum")]
2540    #[case(MIN_SIGNATURE_TTL.as_secs() - 1, "must be greater than the Derive minimum")]
2541    fn test_normal_order_signature_expiry_rejects_minimum_or_lower_ttl(
2542        #[case] ttl_secs: u64,
2543        #[case] reason_fragment: &str,
2544    ) {
2545        let clock = get_atomic_clock_realtime();
2546
2547        let err = normal_order_signature_expiry(clock, ttl_secs).expect_err("TTL is too short");
2548
2549        assert!(
2550            err.to_string().contains(reason_fragment),
2551            "unexpected error: {err}",
2552        );
2553    }
2554
2555    #[rstest]
2556    #[case(i64::MAX as u64, "overflows Derive signature_expiry_sec")]
2557    #[case(u64::MAX, "cannot fit in Derive signature_expiry_sec")]
2558    fn test_normal_order_signature_expiry_rejects_extreme_ttl(
2559        #[case] ttl_secs: u64,
2560        #[case] reason_fragment: &str,
2561    ) {
2562        let clock = get_atomic_clock_realtime();
2563
2564        let err = normal_order_signature_expiry(clock, ttl_secs).expect_err("TTL is invalid");
2565
2566        assert!(
2567            err.to_string().contains(reason_fragment),
2568            "unexpected error: {err}",
2569        );
2570    }
2571
2572    #[rstest]
2573    #[case(OrderType::StopMarket, true)]
2574    #[case(OrderType::StopLimit, true)]
2575    #[case(OrderType::MarketIfTouched, true)]
2576    #[case(OrderType::LimitIfTouched, true)]
2577    #[case(OrderType::Market, false)]
2578    #[case(OrderType::Limit, false)]
2579    #[case(OrderType::MarketToLimit, false)]
2580    #[case(OrderType::TrailingStopMarket, false)]
2581    fn test_is_derive_trigger_order_type(#[case] order_type: OrderType, #[case] expected: bool) {
2582        assert_eq!(is_derive_trigger_order_type(order_type), expected);
2583    }
2584
2585    #[rstest]
2586    #[case(dec!(0))]
2587    #[case(dec!(-1))]
2588    fn test_round_to_tick_treats_non_positive_tick_as_no_op(#[case] tick: Decimal) {
2589        // Non-positive tick must pass through both sides untouched so the
2590        // signing path does not divide by zero or amplify garbage tick data.
2591        assert_eq!(
2592            round_to_tick(dec!(3501.55), tick, OrderSide::Buy),
2593            dec!(3501.55)
2594        );
2595        assert_eq!(
2596            round_to_tick(dec!(3501.55), tick, OrderSide::Sell),
2597            dec!(3501.55)
2598        );
2599    }
2600
2601    #[rstest]
2602    fn test_resolve_signing_context_rejects_placeholder_domain_separator() {
2603        // The shipped mainnet defaults are real Protocol Constants, so force
2604        // an explicit placeholder via the config override to verify the
2605        // placeholder-detection path still refuses to construct.
2606        let mut config = test_config();
2607        config.environment = DeriveEnvironment::Mainnet;
2608        config.domain_separator =
2609            Some("0x<paste_from_docs.derive.xyz_protocol_constants>".to_string());
2610        let err = DeriveExecutionClient::new(test_core(), config).expect_err("must reject");
2611        let msg = err.to_string();
2612        assert!(msg.contains("placeholder"), "unexpected error: {msg}",);
2613    }
2614
2615    #[rstest]
2616    fn test_resolve_signing_context_uses_mainnet_defaults() {
2617        let mut config = test_config();
2618        config.environment = DeriveEnvironment::Mainnet;
2619        config.domain_separator = None;
2620        config.action_typehash = None;
2621        config.trade_module_address = None;
2622
2623        DeriveExecutionClient::new(test_core(), config).expect("mainnet defaults should parse");
2624    }
2625
2626    #[rstest]
2627    fn test_resolve_signing_context_uses_testnet_defaults() {
2628        let mut config = test_config();
2629        config.environment = DeriveEnvironment::Testnet;
2630        config.domain_separator = None;
2631        config.action_typehash = None;
2632        config.trade_module_address = None;
2633
2634        DeriveExecutionClient::new(test_core(), config).expect("testnet defaults should parse");
2635    }
2636
2637    #[rstest]
2638    fn test_market_order_limit_price_rounds_to_coarse_tick() {
2639        // Coarse tick = 1.0 (e.g. weekly option strikes); raw 3518.505 rounds
2640        // up to 3519, raw 3482.5 rounds down to 3482.
2641        let quote = QuoteTick::new(
2642            InstrumentId::from("ETH-20260627-3500-C.DERIVE"),
2643            Price::from("3500"),
2644            Price::from("3501"),
2645            Quantity::from("1.000"),
2646            Quantity::from("1.000"),
2647            UnixNanos::from(0),
2648            UnixNanos::from(0),
2649        );
2650        let buy = market_order_limit_price(&quote, OrderSide::Buy, 50, dec!(1)).unwrap();
2651        assert_eq!(buy, dec!(3519));
2652        let sell = market_order_limit_price(&quote, OrderSide::Sell, 50, dec!(1)).unwrap();
2653        assert_eq!(sell, dec!(3482));
2654    }
2655
2656    #[rstest]
2657    fn test_new_populates_identity() {
2658        let core = test_core();
2659        let client = DeriveExecutionClient::new(core, test_config()).unwrap();
2660
2661        assert_eq!(client.client_id(), ClientId::from(DERIVE));
2662        assert_eq!(client.account_id(), AccountId::from("DERIVE-001"));
2663        assert_eq!(client.venue(), *DERIVE_VENUE);
2664        assert_eq!(client.oms_type(), OmsType::Netting);
2665        assert_eq!(client.subaccount_id(), TEST_SUBACCOUNT);
2666        assert!(!client.is_connected());
2667    }
2668
2669    #[rstest]
2670    fn test_emit_tracked_event_suppresses_in_flight_replace_cancel_leg() {
2671        // Derive's `private/replace` cancels the old order; the `.orders`
2672        // cancel-of-old leg can arrive before `modify_order` rebinds the order,
2673        // i.e. while the replace is in flight. In that window only the
2674        // `pending_modify` marker (not the bound-id check) can suppress it. The
2675        // integration suite covers the post-rebind bound-id branch; this covers
2676        // the in-flight branch, which is otherwise unexercised end to end.
2677        let clock = get_atomic_clock_realtime();
2678        let account_id = AccountId::from("DERIVE-001");
2679        let instrument_id = InstrumentId::from("ETH-PERP.DERIVE");
2680        let cid = ClientOrderId::from("STRAT-MOD-INFLIGHT");
2681        let stale_voi = VenueOrderId::from("ord-stale-1");
2682        let identity = OrderIdentity {
2683            instrument_id,
2684            strategy_id: StrategyId::from("S-1"),
2685            order_side: OrderSide::Buy,
2686            order_type: OrderType::Limit,
2687        };
2688        // A `cancelled` report for the stale leg, identical across both cases:
2689        // only the dispatch-state marker differs.
2690        let report = OrderStatusReport::new(
2691            account_id,
2692            instrument_id,
2693            Some(cid),
2694            stale_voi,
2695            OrderSide::Buy,
2696            OrderType::Limit,
2697            TimeInForce::Gtc,
2698            OrderStatus::Canceled,
2699            Quantity::from("1.000"),
2700            Quantity::from("0.000"),
2701            UnixNanos::from(1_000),
2702            UnixNanos::from(2_000),
2703            UnixNanos::from(3_000),
2704            None,
2705        );
2706
2707        let new_emitter = || {
2708            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2709            let mut emitter = ExecutionEventEmitter::new(
2710                clock,
2711                TraderId::from("TRADER-001"),
2712                account_id,
2713                AccountType::Margin,
2714                Some(Currency::USDC()),
2715            );
2716            emitter.set_sender(tx);
2717            (emitter, rx)
2718        };
2719
2720        // Marker targets the cancel's venue order id and no bound id is
2721        // recorded, so suppression can only come from the in-flight branch.
2722        let (emitter, mut rx) = new_emitter();
2723        let state = WsDispatchState::new();
2724        state.mark_pending_modify(cid, stale_voi);
2725        emit_tracked_order_event(
2726            &emitter,
2727            &state,
2728            cid,
2729            identity,
2730            &report,
2731            account_id,
2732            UnixNanos::from(0),
2733        );
2734        let suppressed = rx.try_recv().is_err();
2735
2736        // A marker for a different venue order id must not suppress: the guard
2737        // keys on the specific id, so the cancel-of-old still terminates.
2738        let (emitter, mut rx) = new_emitter();
2739        let state = WsDispatchState::new();
2740        state.mark_pending_modify(cid, VenueOrderId::from("ord-other"));
2741        emit_tracked_order_event(
2742            &emitter,
2743            &state,
2744            cid,
2745            identity,
2746            &report,
2747            account_id,
2748            UnixNanos::from(0),
2749        );
2750        let mut saw_canceled = false;
2751
2752        while let Ok(event) = rx.try_recv() {
2753            if matches!(event, ExecutionEvent::Order(OrderEventAny::Canceled(_))) {
2754                saw_canceled = true;
2755            }
2756        }
2757
2758        assert!(
2759            suppressed,
2760            "in-flight cancel-of-old leg must be suppressed by the pending-modify marker",
2761        );
2762        assert!(
2763            saw_canceled,
2764            "a pending-modify marker for a different venue order id must not suppress",
2765        );
2766    }
2767}