1use std::{
27 sync::{
28 Arc, Mutex,
29 atomic::{AtomicBool, Ordering},
30 },
31 time::{Duration, Instant},
32};
33
34use ahash::AHashSet;
35use anyhow::Context;
36use async_trait::async_trait;
37use nautilus_common::{
38 clients::ExecutionClient,
39 live::{get_runtime, runner::get_exec_event_sender},
40 messages::execution::{
41 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
42 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
43 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
44 },
45};
46use nautilus_core::{
47 AtomicMap, MUTEX_POISONED, UUID4, UnixNanos,
48 time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
51use nautilus_model::{
52 accounts::AccountAny,
53 data::QuoteTick,
54 enums::{OmsType, OrderSide, OrderStatus, OrderType, PositionSideSpecified},
55 events::{
56 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
57 },
58 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Symbol, Venue, VenueOrderId},
59 instruments::InstrumentAny,
60 orders::Order,
61 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
62 types::{AccountBalance, Currency, MarginBalance, Price, Quantity},
63};
64use rust_decimal::Decimal;
65use tokio::task::JoinHandle;
66use tokio_util::sync::CancellationToken;
67use ustr::Ustr;
68
69use crate::{
70 common::{
71 consts::{
72 DERIVE_ACCOUNT_REGISTRATION_TIMEOUT_SECS, DERIVE_VENUE, MIN_SIGNATURE_TTL,
73 TRIGGER_ORDER_SIGNATURE_TTL,
74 },
75 credential::DeriveCredential,
76 enums::{DeriveInstrumentType, DeriveOrderSide},
77 parse::{derive_rejection_due_post_only, format_instrument_id, format_venue_symbol},
78 retry::{http_retry_config, is_write_outcome_ambiguous_ws},
79 },
80 config::DeriveExecClientConfig,
81 http::{
82 DeriveCredentials, DeriveHttpClient,
83 models::{DeriveInstrument, DeriveOrder, DeriveTrade},
84 parse::{
85 parse_derive_order_to_report, parse_derive_position_to_report,
86 parse_derive_subaccount_to_balances, parse_derive_trade_to_fill_report,
87 },
88 query::{
89 DeriveCancelAllParams, DeriveCancelParams, DeriveCancelTriggerOrderParams,
90 DeriveGetOpenOrdersParams, DeriveGetOrderHistoryParams, DeriveGetOrderParams,
91 DeriveGetPositionsParams, DeriveGetSubaccountParams, DeriveGetTradeHistoryParams,
92 DeriveGetTriggerOrdersParams, order_replace_to_derive_payload, order_to_derive_payload,
93 trigger_order_to_derive_payload,
94 },
95 },
96 signing::{
97 context::{SigningContext, resolve_signing_context},
98 nonce::NonceManager,
99 },
100 websocket::{
101 DeriveOrdersSubscriptionData, DeriveTradesSubscriptionData, DeriveWebSocketClient,
102 DeriveWsChannel, DeriveWsCredentials, DeriveWsError, DeriveWsExecutionHandle,
103 DeriveWsMessage, OrderIdentity, WsDispatchState, parse::parse_ticker_quote_from_rest,
104 },
105};
106
107const DERIVE_PRIVATE_PAGE_SIZE: u32 = 500;
108
109#[derive(Debug)]
116pub struct DeriveExecutionClient {
117 core: ExecutionClientCore,
118 clock: &'static AtomicTime,
119 config: DeriveExecClientConfig,
120 credential: DeriveCredential,
121 emitter: ExecutionEventEmitter,
122 http_client: DeriveHttpClient,
123 ws_client: DeriveWebSocketClient,
124 ws_exec: DeriveWsExecutionHandle,
125 instruments: Arc<AtomicMap<InstrumentId, DeriveInstrument>>,
126 nonce_manager: Arc<NonceManager>,
127 signing: SigningContext,
128 is_connected: AtomicBool,
129 cancellation_token: CancellationToken,
130 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
131 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
132 dispatch_state: Arc<WsDispatchState>,
133}
134
135impl DeriveExecutionClient {
136 pub fn new(core: ExecutionClientCore, config: DeriveExecClientConfig) -> anyhow::Result<Self> {
151 let credential = DeriveCredential::resolve(
152 config.wallet_address.clone(),
153 config.session_key.clone(),
154 config.subaccount_id,
155 config.environment,
156 )?;
157
158 let http_credentials = DeriveCredentials::new(
159 credential.wallet_address().to_string(),
160 credential.session_key(),
161 )
162 .context("failed to build Derive HTTP credentials")?;
163 let retry_config = http_retry_config(
164 config.max_retries,
165 config.retry_delay_initial_ms,
166 config.retry_delay_max_ms,
167 );
168 let http_client = DeriveHttpClient::with_credentials(
169 config.rest_url(),
170 http_credentials,
171 Some(config.http_timeout_secs),
172 config.proxy_url.clone(),
173 Some(retry_config),
174 )
175 .context("failed to create Derive HTTP client")?;
176
177 let ws_credentials = DeriveWsCredentials::new(
178 credential.wallet_address().to_string(),
179 credential.session_key(),
180 )
181 .context("failed to build Derive WebSocket credentials")?;
182 let ws_client = DeriveWebSocketClient::with_credentials(
183 Some(config.ws_url()),
184 config.environment,
185 config.transport_backend,
186 config.proxy_url.clone(),
187 ws_credentials,
188 config.max_matching_requests_per_second,
189 );
190 let ws_exec = ws_client.execution_handle();
193
194 let signing = resolve_signing_context(&credential, &config)?;
195
196 let clock = get_atomic_clock_realtime();
197 let emitter = ExecutionEventEmitter::new(
198 clock,
199 core.trader_id,
200 core.account_id,
201 core.account_type,
202 core.base_currency,
203 );
204
205 Ok(Self {
206 core,
207 clock,
208 config,
209 credential,
210 emitter,
211 http_client,
212 ws_client,
213 ws_exec,
214 instruments: Arc::new(AtomicMap::new()),
215 nonce_manager: Arc::new(NonceManager::new()),
216 signing,
217 is_connected: AtomicBool::new(false),
218 cancellation_token: CancellationToken::new(),
219 pending_tasks: Mutex::new(Vec::new()),
220 ws_stream_handle: Mutex::new(None),
221 dispatch_state: Arc::new(WsDispatchState::new()),
222 })
223 }
224
225 #[must_use]
227 pub const fn subaccount_id(&self) -> u64 {
228 self.credential.subaccount_id()
229 }
230
231 #[must_use]
233 pub fn config(&self) -> &DeriveExecClientConfig {
234 &self.config
235 }
236
237 #[must_use]
239 pub fn http_client(&self) -> &DeriveHttpClient {
240 &self.http_client
241 }
242
243 pub fn cache_instrument(&self, instrument: DeriveInstrument) {
247 let instrument_id = format_instrument_id(instrument.instrument_name);
248 self.instruments.insert(instrument_id, instrument);
249 }
250
251 fn spawn_task<F>(&self, description: &'static str, fut: F)
253 where
254 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
255 {
256 let runtime = get_runtime();
257 let handle = runtime.spawn(async move {
258 if let Err(e) = fut.await {
259 log::warn!("{description} failed: {e:?}");
260 }
261 });
262
263 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
264 tasks.retain(|handle| !handle.is_finished());
265 tasks.push(handle);
266 }
267
268 fn abort_pending_tasks(&self) {
269 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
270 for handle in tasks.drain(..) {
271 handle.abort();
272 }
273 }
274
275 async fn ensure_instruments_initialized(&self) -> anyhow::Result<()> {
276 if self.core.instruments_initialized() {
277 return Ok(());
278 }
279 self.core.set_instruments_initialized();
282 Ok(())
283 }
284
285 async fn refresh_account_state(&self) -> anyhow::Result<()> {
286 let value = self
287 .http_client
288 .get_subaccount(&DeriveGetSubaccountParams::new(
289 self.credential.subaccount_id(),
290 ))
291 .await
292 .context("failed to fetch Derive subaccount snapshot")?;
293 let (balances, margins) = parse_derive_subaccount_to_balances(&value)
294 .context("failed to parse Derive subaccount balances")?;
295 let ts_event = self.clock.get_time_ns();
296 self.emitter
297 .emit_account_state(balances, margins, true, ts_event);
298 Ok(())
299 }
300
301 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
309 let account_id = self.core.account_id;
310
311 if self.core.cache().account(&account_id).is_some() {
312 log::info!("Account {account_id} registered");
313 return Ok(());
314 }
315
316 let start = Instant::now();
317 let timeout = Duration::from_secs_f64(timeout_secs);
318 let interval = Duration::from_millis(10);
319
320 loop {
321 tokio::time::sleep(interval).await;
322
323 if self.core.cache().account(&account_id).is_some() {
324 log::info!("Account {account_id} registered");
325 return Ok(());
326 }
327
328 if start.elapsed() >= timeout {
329 anyhow::bail!(
330 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
331 );
332 }
333 }
334 }
335
336 async fn teardown_partial_connect(&mut self) {
341 self.cancellation_token.cancel();
342
343 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
344 handle.abort();
345 }
346
347 if let Err(e) = self.ws_client.disconnect().await {
348 log::warn!("Error tearing down Derive WebSocket after connect failure: {e}");
349 }
350 self.abort_pending_tasks();
351 }
352
353 fn start_ws_dispatch(&self, rx: tokio::sync::mpsc::UnboundedReceiver<DeriveWsMessage>) {
354 let emitter = self.emitter.clone();
355 let account_id = self.core.account_id;
356 let clock = self.clock;
357 let cancellation = self.cancellation_token.clone();
358 let dispatch_state = self.dispatch_state.clone();
359
360 let handle = get_runtime().spawn(async move {
361 let mut rx = rx;
362
363 loop {
364 tokio::select! {
365 biased;
366 () = cancellation.cancelled() => break,
367 maybe = rx.recv() => {
368 match maybe {
369 Some(message) => handle_ws_message(
370 message,
371 &emitter,
372 account_id,
373 clock,
374 &dispatch_state,
375 ),
376 None => break,
377 }
378 }
379 }
380 }
381 });
382 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
383 }
384}
385
386#[async_trait(?Send)]
387impl ExecutionClient for DeriveExecutionClient {
388 fn is_connected(&self) -> bool {
389 self.is_connected.load(Ordering::Acquire)
390 }
391
392 fn client_id(&self) -> ClientId {
393 self.core.client_id
394 }
395
396 fn account_id(&self) -> AccountId {
397 self.core.account_id
398 }
399
400 fn venue(&self) -> Venue {
401 *DERIVE_VENUE
402 }
403
404 fn oms_type(&self) -> OmsType {
405 self.core.oms_type
406 }
407
408 fn get_account(&self) -> Option<AccountAny> {
409 self.core.cache().account_owned(&self.core.account_id)
410 }
411
412 fn start(&mut self) -> anyhow::Result<()> {
413 if self.core.is_started() {
414 return Ok(());
415 }
416
417 let sender = get_exec_event_sender();
418 self.emitter.set_sender(sender);
419 self.core.set_started();
420
421 log::info!(
422 "Started: client_id={}, account_id={}, subaccount_id={}, environment={:?}, proxy_url={:?}",
423 self.core.client_id,
424 self.core.account_id,
425 self.credential.subaccount_id(),
426 self.config.environment,
427 self.config.proxy_url,
428 );
429 Ok(())
430 }
431
432 fn stop(&mut self) -> anyhow::Result<()> {
433 if self.core.is_stopped() {
434 return Ok(());
435 }
436
437 log::info!("Stopping Derive execution client");
438
439 self.cancellation_token.cancel();
440
441 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
442 handle.abort();
443 }
444 self.abort_pending_tasks();
445
446 self.core.set_disconnected();
447 self.core.set_stopped();
448 self.is_connected.store(false, Ordering::Release);
449
450 log::info!("Derive execution client stopped");
451 Ok(())
452 }
453
454 async fn connect(&mut self) -> anyhow::Result<()> {
455 if self.is_connected() {
456 return Ok(());
457 }
458
459 log::info!("Connecting Derive execution client");
460
461 if self.cancellation_token.is_cancelled() {
462 self.cancellation_token = CancellationToken::new();
463 }
464
465 self.ensure_instruments_initialized()
466 .await
467 .context("failed to initialize Derive instruments")?;
468
469 self.ws_client
470 .connect()
471 .await
472 .context("failed to connect Derive WebSocket")?;
473 let rx = self
474 .ws_client
475 .take_event_receiver()
476 .context("Derive execution WS event receiver not initialized")?;
477
478 let subaccount_id = self.credential.subaccount_id();
479 let channels = vec![
480 DeriveWsChannel::orders(subaccount_id),
481 DeriveWsChannel::private_trades(subaccount_id),
482 DeriveWsChannel::balances(subaccount_id),
483 ];
484
485 if let Err(e) = self.ws_client.subscribe_channels(channels).await {
486 log::warn!("Derive private WS subscriptions failed: {e}");
487 }
488
489 self.start_ws_dispatch(rx);
490
491 if let Err(e) = self.refresh_account_state().await {
496 log::warn!("Initial Derive account state refresh failed: {e}; tearing down");
497 self.teardown_partial_connect().await;
498 return Err(e.context("failed initial Derive account state refresh"));
499 }
500
501 if let Err(e) = self
502 .await_account_registered(DERIVE_ACCOUNT_REGISTRATION_TIMEOUT_SECS)
503 .await
504 {
505 log::warn!("Derive account did not register in time: {e}; tearing down");
506 self.teardown_partial_connect().await;
507 return Err(e.context("failed waiting for Derive account registration"));
508 }
509
510 self.core.set_connected();
511 self.is_connected.store(true, Ordering::Release);
512 log::info!(
513 "Connected Derive execution client ({:?})",
514 self.config.environment
515 );
516 Ok(())
517 }
518
519 async fn disconnect(&mut self) -> anyhow::Result<()> {
520 if !self.is_connected() {
521 return Ok(());
522 }
523
524 log::info!("Disconnecting Derive execution client");
525 self.cancellation_token.cancel();
526
527 if let Err(e) = self.ws_client.disconnect().await {
528 log::warn!("Error while disconnecting Derive execution WebSocket: {e}");
529 }
530
531 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
532 handle.abort();
533 }
534 self.abort_pending_tasks();
535
536 self.core.set_disconnected();
537 self.is_connected.store(false, Ordering::Release);
538 log::info!("Derive execution client disconnected");
539 Ok(())
540 }
541
542 fn generate_account_state(
543 &self,
544 balances: Vec<AccountBalance>,
545 margins: Vec<MarginBalance>,
546 reported: bool,
547 ts_event: UnixNanos,
548 ) -> anyhow::Result<()> {
549 self.emitter
550 .emit_account_state(balances, margins, reported, ts_event);
551 Ok(())
552 }
553
554 fn on_instrument(&mut self, _instrument: InstrumentAny) {
555 }
561
562 async fn generate_order_status_report(
563 &self,
564 cmd: &GenerateOrderStatusReport,
565 ) -> anyhow::Result<Option<OrderStatusReport>> {
566 if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
567 log::warn!(
568 "Derive generate_order_status_report requires venue_order_id or client_order_id"
569 );
570 return Ok(None);
571 }
572
573 let subaccount_id = self.credential.subaccount_id();
574 let order = if let Some(venue_order_id) = cmd.venue_order_id {
575 match self
576 .http_client
577 .get_order(&DeriveGetOrderParams::new(
578 subaccount_id,
579 venue_order_id.as_str(),
580 ))
581 .await
582 {
583 Ok(order) => Some(order),
584 Err(e) => {
585 let trigger_orders = self
586 .http_client
587 .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
588 .await?
589 .orders;
590
591 match trigger_orders
592 .into_iter()
593 .find(|o| o.order_id.as_str() == venue_order_id.as_str())
594 {
595 Some(order) => Some(order),
596 None => return Err(e.into()),
597 }
598 }
599 }
600 } else {
601 let label = cmd.client_order_id.expect("guarded above");
606 let open_orders = self
607 .http_client
608 .get_open_orders(&DeriveGetOpenOrdersParams::new(subaccount_id))
609 .await?
610 .orders;
611 let mut found = open_orders
612 .into_iter()
613 .find(|o| o.label.as_str() == label.as_str());
614
615 if found.is_none() {
616 let trigger_orders = self
617 .http_client
618 .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
619 .await?
620 .orders;
621 found = trigger_orders
622 .into_iter()
623 .find(|o| o.label.as_str() == label.as_str());
624 }
625
626 if found.is_none() {
627 let instrument_name = cmd.instrument_id.map(|id| id.symbol.as_str().to_string());
628 let mut page: u32 = 1;
629
630 'history: loop {
631 let mut params = DeriveGetOrderHistoryParams::new(
632 subaccount_id,
633 page,
634 DERIVE_PRIVATE_PAGE_SIZE,
635 );
636
637 if let Some(name) = instrument_name.as_deref() {
638 params = params.with_instrument_name(name);
639 }
640
641 let result = self.http_client.get_order_history(¶ms).await?;
642 let total_pages = result.pagination.num_pages;
643
644 for order in result.orders {
645 if order.label.as_str() == label.as_str() {
646 found = Some(order);
647 break 'history;
648 }
649 }
650
651 if (page as i64) >= total_pages || total_pages == 0 {
652 break;
653 }
654 page += 1;
655 }
656 }
657 found
658 };
659
660 let Some(order) = order else {
661 return Ok(None);
662 };
663
664 if let Some(instrument_id) = cmd.instrument_id
665 && InstrumentId::new(Symbol::new(order.instrument_name.as_str()), *DERIVE_VENUE)
666 != instrument_id
667 {
668 log::warn!(
669 "Derive order {} is for {} but report requested {}",
670 order.order_id,
671 order.instrument_name.as_str(),
672 instrument_id,
673 );
674 return Ok(None);
675 }
676
677 let ts_init = self.clock.get_time_ns();
678 let mut report = parse_derive_order_to_report(&order, self.core.account_id, ts_init)?;
679 if report.client_order_id.is_none()
682 && let Some(client_order_id) = cmd.client_order_id
683 {
684 report = report.with_client_order_id(client_order_id);
685 }
686 Ok(Some(report))
687 }
688
689 async fn generate_order_status_reports(
690 &self,
691 cmd: &GenerateOrderStatusReports,
692 ) -> anyhow::Result<Vec<OrderStatusReport>> {
693 let subaccount_id = self.credential.subaccount_id();
694 let instrument_name = cmd.instrument_id.map(|id| id.symbol.as_str().to_string());
695
696 let orders: Vec<DeriveOrder> = if cmd.open_only {
702 let mut orders = self
703 .http_client
704 .get_open_orders(&DeriveGetOpenOrdersParams::new(subaccount_id))
705 .await?
706 .orders;
707 orders.extend(
708 self.http_client
709 .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
710 .await?
711 .orders,
712 );
713 orders
714 } else {
715 let start_ms = cmd.start.map(|t| t.as_millis() as i64);
716 let end_ms = cmd.end.map(|t| t.as_millis() as i64);
717 let mut page: u32 = 1;
718 let mut collected: Vec<DeriveOrder> = Vec::new();
719
720 loop {
721 let mut params =
722 DeriveGetOrderHistoryParams::new(subaccount_id, page, DERIVE_PRIVATE_PAGE_SIZE)
723 .with_window(start_ms, end_ms);
724
725 if let Some(name) = instrument_name.as_deref() {
726 params = params.with_instrument_name(name);
727 }
728
729 let result = self.http_client.get_order_history(¶ms).await?;
730 let total_pages = result.pagination.num_pages;
731 collected.extend(result.orders);
732
733 if (page as i64) >= total_pages || total_pages == 0 {
734 break;
735 }
736 page += 1;
737 }
738 collected
739 };
740
741 let ts_init = self.clock.get_time_ns();
742 let start_ms = cmd.start.map(|t| t.as_millis() as i64);
743 let end_ms = cmd.end.map(|t| t.as_millis() as i64);
744 let mut reports = Vec::with_capacity(orders.len());
745 for order in orders {
746 if let Some(instrument_id) = cmd.instrument_id
747 && InstrumentId::new(Symbol::new(order.instrument_name.as_str()), *DERIVE_VENUE)
748 != instrument_id
749 {
750 continue;
751 }
752 if let Some(start) = start_ms
756 && order.last_update_timestamp < start
757 {
758 continue;
759 }
760
761 if let Some(end) = end_ms
762 && order.last_update_timestamp > end
763 {
764 continue;
765 }
766
767 match parse_derive_order_to_report(&order, self.core.account_id, ts_init) {
768 Ok(report) => reports.push(report),
769 Err(e) => log::warn!("Skipping order in status report: {e}"),
770 }
771 }
772 Ok(reports)
773 }
774
775 async fn generate_fill_reports(
776 &self,
777 cmd: GenerateFillReports,
778 ) -> anyhow::Result<Vec<FillReport>> {
779 let instrument_name = cmd.instrument_id.map(|id| id.symbol.as_str().to_string());
780 let mut page: u32 = 1;
781 let mut all_trades: Vec<DeriveTrade> = Vec::new();
782
783 loop {
784 let mut params = DeriveGetTradeHistoryParams::new(
785 self.credential.subaccount_id(),
786 page,
787 DERIVE_PRIVATE_PAGE_SIZE,
788 )
789 .with_window(
790 cmd.start.map(|t| t.as_millis() as i64),
791 cmd.end.map(|t| t.as_millis() as i64),
792 );
793
794 if let Some(name) = instrument_name.as_deref() {
795 params = params.with_instrument_name(name);
796 }
797
798 let result = self.http_client.get_private_trade_history(¶ms).await?;
799 let total_pages = result.pagination.num_pages;
800 all_trades.extend(result.trades);
801
802 if (page as i64) >= total_pages || total_pages == 0 {
803 break;
804 }
805 page += 1;
806 }
807
808 let ts_init = self.clock.get_time_ns();
809 let fee_currency = Currency::USDC();
810 let venue_order_id_filter = cmd
811 .venue_order_id
812 .as_ref()
813 .map(|id| id.as_str().to_string());
814 let mut reports = Vec::with_capacity(all_trades.len());
815 for trade in all_trades {
816 if let Some(target) = venue_order_id_filter.as_deref()
817 && trade.order_id != target
818 {
819 continue;
820 }
821
822 match parse_derive_trade_to_fill_report(
823 &trade,
824 self.core.account_id,
825 fee_currency,
826 ts_init,
827 ) {
828 Ok(Some(report)) => {
829 if self.dispatch_state.check_and_insert_trade(report.trade_id) {
833 log::debug!(
834 "Skipping duplicate Derive fill (trade_id={}) in generate_fill_reports",
835 report.trade_id,
836 );
837 continue;
838 }
839 reports.push(report);
840 }
841 Ok(None) => {}
842 Err(e) => log::warn!("Skipping trade in fill report: {e}"),
843 }
844 }
845 Ok(reports)
846 }
847
848 async fn generate_position_status_reports(
849 &self,
850 cmd: &GeneratePositionStatusReports,
851 ) -> anyhow::Result<Vec<PositionStatusReport>> {
852 let positions = self
853 .http_client
854 .get_positions(&DeriveGetPositionsParams::new(
855 self.credential.subaccount_id(),
856 ))
857 .await?
858 .positions;
859 let ts_init = self.clock.get_time_ns();
860 let mut reports = Vec::with_capacity(positions.len());
861 for position in positions {
862 if let Some(target) = cmd.instrument_id
863 && InstrumentId::new(
864 Symbol::new(position.instrument_name.as_str()),
865 *DERIVE_VENUE,
866 ) != target
867 {
868 continue;
869 }
870
871 match parse_derive_position_to_report(&position, self.core.account_id, ts_init) {
872 Ok(report) => reports.push(report),
873 Err(e) => log::warn!("Skipping position in status report: {e}"),
874 }
875 }
876 Ok(reports)
877 }
878
879 async fn generate_mass_status(
880 &self,
881 lookback_mins: Option<u64>,
882 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
883 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
884
885 let ts_now = self.clock.get_time_ns();
886 let start = lookback_mins.map(|mins| {
887 let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
888 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
889 });
890
891 let open_order_cmd = GenerateOrderStatusReports::new(
892 UUID4::new(),
893 ts_now,
894 true,
895 None,
896 None,
897 None,
898 None,
899 None,
900 );
901 let history_order_cmd = GenerateOrderStatusReports::new(
902 UUID4::new(),
903 ts_now,
904 false,
905 None,
906 start,
907 None,
908 None,
909 None,
910 );
911 let fill_cmd =
912 GenerateFillReports::new(UUID4::new(), ts_now, None, None, start, None, None, None);
913 let position_cmd =
914 GeneratePositionStatusReports::new(UUID4::new(), ts_now, None, None, None, None, None);
915
916 let (history_order_reports, open_order_reports, fill_reports, position_reports) = tokio::try_join!(
917 self.generate_order_status_reports(&history_order_cmd),
918 self.generate_order_status_reports(&open_order_cmd),
919 self.generate_fill_reports(fill_cmd),
920 self.generate_position_status_reports(&position_cmd),
921 )?;
922
923 log::info!(
924 "Received {} historical OrderStatusReports",
925 history_order_reports.len()
926 );
927 log::info!(
928 "Received {} open OrderStatusReports",
929 open_order_reports.len()
930 );
931 log::info!("Received {} FillReports", fill_reports.len());
932 log::info!("Received {} PositionReports", position_reports.len());
933
934 let mut touched_instruments = AHashSet::new();
935
936 for report in history_order_reports
937 .iter()
938 .chain(open_order_reports.iter())
939 {
940 touched_instruments.insert(report.instrument_id);
941 }
942
943 for report in &fill_reports {
944 touched_instruments.insert(report.instrument_id);
945 }
946
947 let mut mass_status = ExecutionMassStatus::new(
948 self.core.client_id,
949 self.core.account_id,
950 *DERIVE_VENUE,
951 ts_now,
952 None,
953 );
954
955 mass_status.add_order_reports(history_order_reports);
956 mass_status.add_order_reports(open_order_reports);
957 mass_status.add_fill_reports(fill_reports);
958 mass_status.add_position_reports(position_reports);
959 add_missing_flat_position_reports(
960 &mut mass_status,
961 self.core.account_id,
962 touched_instruments,
963 ts_now,
964 );
965
966 Ok(Some(mass_status))
967 }
968
969 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
970 let order = self
971 .core
972 .cache()
973 .order(&cmd.client_order_id)
974 .map(|o| o.clone())
975 .ok_or_else(|| {
976 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
977 })?;
978
979 if order.is_closed() {
980 log::warn!("Cannot submit closed order {}", order.client_order_id());
981 return Ok(());
982 }
983
984 if order.is_reduce_only()
988 && matches!(
989 self.core.cache().instrument(&cmd.instrument_id),
990 Some(InstrumentAny::CurrencyPair(_))
991 )
992 {
993 let reason = format!(
994 "reduce-only is not supported for spot instrument {}; Derive spot has no position to reduce",
995 cmd.instrument_id,
996 );
997 log::warn!("{reason}");
998 self.emitter.emit_order_denied(&order, &reason);
999 return Ok(());
1000 }
1001
1002 let is_trigger_order = is_derive_trigger_order_type(order.order_type());
1004 let market_quote = if order.order_type() == OrderType::Market {
1005 match self.core.cache().quote(&cmd.instrument_id) {
1006 Some(_) => Some(()),
1007 None => {
1008 let reason = format!(
1009 "no cached quote for {}; subscribe to quote data before submitting market orders",
1010 cmd.instrument_id,
1011 );
1012 log::warn!("{reason}");
1013 self.emitter.emit_order_denied(&order, &reason);
1014 return Ok(());
1015 }
1016 }
1017 } else {
1018 None
1019 };
1020
1021 let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1022 let http_client = self.http_client.clone();
1023 let ws_exec = self.ws_exec.clone();
1024 let signing = self.signing.clone();
1025 let nonce_manager = self.nonce_manager.clone();
1026 let wallet_str = self.credential.wallet_address().to_string();
1027 let emitter = self.emitter.clone();
1028 let clock = self.clock;
1029 let instruments = self.instruments.clone();
1030 let instrument_id = cmd.instrument_id;
1031 let order_for_task = order.clone();
1032 let account_id = self.core.account_id;
1033
1034 let identity = OrderIdentity {
1037 instrument_id: order.instrument_id(),
1038 strategy_id: order.strategy_id(),
1039 order_side: order.order_side(),
1040 order_type: order.order_type(),
1041 };
1042 self.dispatch_state
1043 .register_identity(order.client_order_id(), identity);
1044
1045 self.emitter.emit_order_submitted(&order);
1046
1047 let slippage_bps = self.signing.market_order_slippage_bps;
1048 let dispatch_state = self.dispatch_state.clone();
1049
1050 self.spawn_task("submit_order", async move {
1051 let instrument = match cached_or_fetch_instrument(
1052 &http_client,
1053 &instruments,
1054 &instrument_id,
1055 &venue_symbol,
1056 )
1057 .await
1058 {
1059 Ok(i) => i,
1060 Err(e) => {
1061 log::warn!("Failed to resolve instrument {venue_symbol}: {e}");
1062 dispatch_state.forget(&order_for_task.client_order_id());
1063 let ts = clock.get_time_ns();
1064 emitter.emit_order_rejected(
1065 &order_for_task,
1066 &format!("instrument resolution failed: {e}"),
1067 ts,
1068 false,
1069 );
1070 return Ok(());
1071 }
1072 };
1073
1074 if order_for_task.is_reduce_only()
1078 && instrument.instrument_type == DeriveInstrumentType::Erc20
1079 {
1080 let reason = format!(
1081 "reduce-only is not supported for spot instrument {}; Derive spot has no position to reduce",
1082 order_for_task.instrument_id(),
1083 );
1084 log::warn!("{reason}");
1085 dispatch_state.forget(&order_for_task.client_order_id());
1086 let ts = clock.get_time_ns();
1087 emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1088 return Ok(());
1089 }
1090
1091 let explicit_price = if market_quote.is_some() {
1093 let quote = match refresh_market_order_quote(
1094 &http_client,
1095 &venue_symbol,
1096 &instrument,
1097 clock,
1098 )
1099 .await
1100 {
1101 Ok(quote) => quote,
1102 Err(e) => {
1103 let reason = format!(
1104 "market-order quote refresh failed for {}: {e}",
1105 order_for_task.client_order_id(),
1106 );
1107 log::warn!("{reason}");
1108 dispatch_state.forget(&order_for_task.client_order_id());
1109 let ts = clock.get_time_ns();
1110 emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1111 return Ok(());
1112 }
1113 };
1114
1115 match market_order_limit_price(
1116 "e,
1117 order_for_task.order_side(),
1118 slippage_bps,
1119 instrument.tick_size,
1120 ) {
1121 Some(p) => Some(p),
1122 None => {
1123 let reason = format!(
1124 "market-order slippage bound is non-positive for {} ({} bps)",
1125 order_for_task.client_order_id(),
1126 slippage_bps,
1127 );
1128 log::warn!("{reason}");
1129 dispatch_state.forget(&order_for_task.client_order_id());
1130 let ts = clock.get_time_ns();
1131 emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1132 return Ok(());
1133 }
1134 }
1135 } else if matches!(
1136 order_for_task.order_type(),
1137 OrderType::StopMarket | OrderType::MarketIfTouched
1138 ) {
1139 let trigger_price = match order_for_task.trigger_price() {
1140 Some(price) => price.as_decimal(),
1141 None => {
1142 let reason = format!(
1143 "trigger market order {} is missing trigger_price",
1144 order_for_task.client_order_id(),
1145 );
1146 log::warn!("{reason}");
1147 dispatch_state.forget(&order_for_task.client_order_id());
1148 let ts = clock.get_time_ns();
1149 emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1150 return Ok(());
1151 }
1152 };
1153
1154 match trigger_market_limit_price(
1155 trigger_price,
1156 order_for_task.order_side(),
1157 slippage_bps,
1158 instrument.tick_size,
1159 ) {
1160 Some(p) => Some(p),
1161 None => {
1162 let reason = format!(
1163 "trigger market-order slippage bound is non-positive for {} ({} bps)",
1164 order_for_task.client_order_id(),
1165 slippage_bps,
1166 );
1167 log::warn!("{reason}");
1168 dispatch_state.forget(&order_for_task.client_order_id());
1169 let ts = clock.get_time_ns();
1170 emitter.emit_order_rejected(&order_for_task, &reason, ts, false);
1171 return Ok(());
1172 }
1173 }
1174 } else {
1175 None
1176 };
1177
1178 if is_trigger_order {
1179 let nonce = nonce_manager.next_nonce(&wallet_str, signing.subaccount_id)?;
1180 let expiry = trigger_order_signature_expiry(clock);
1181 let payload = match trigger_order_to_derive_payload(
1182 &order_for_task,
1183 &instrument,
1184 signing.subaccount_id,
1185 signing.wallet_address,
1186 &signing.signer,
1187 nonce,
1188 expiry,
1189 signing.trade_module_address,
1190 signing.domain_separator,
1191 signing.action_typehash,
1192 signing.max_fee_per_contract,
1193 explicit_price,
1194 ws_exec.conn_id(),
1195 UUID4::new().to_string(),
1196 ) {
1197 Ok(p) => p,
1198 Err(e) => {
1199 log::warn!(
1200 "Trigger order encode failed for {}: {e}",
1201 order_for_task.client_order_id()
1202 );
1203 dispatch_state.forget(&order_for_task.client_order_id());
1204 let ts = clock.get_time_ns();
1205 emitter.emit_order_rejected(
1206 &order_for_task,
1207 &format!("order encoding failed: {e}"),
1208 ts,
1209 false,
1210 );
1211 return Ok(());
1212 }
1213 };
1214
1215 log::debug!(
1216 "Derive trigger submit payload client_order_id={} instrument_name={} direction={} order_type={} time_in_force={} amount={} limit_price={} trigger_price={:?} trigger_price_type={:?} trigger_type={:?}",
1217 order_for_task.client_order_id(),
1218 payload.order.instrument_name.as_str(),
1219 payload.order.direction,
1220 payload.order.order_type,
1221 payload.order.time_in_force,
1222 payload.order.amount,
1223 payload.order.limit_price,
1224 payload.order.trigger_price,
1225 payload.order.trigger_price_type,
1226 payload.order.trigger_type,
1227 );
1228
1229 match ws_exec.submit_trigger_order(&payload).await {
1230 Ok(order) => {
1231 let venue_order_id = VenueOrderId::new(order.order_id.as_str());
1232 dispatch_state.record_venue_order_id(
1233 order_for_task.client_order_id(),
1234 venue_order_id,
1235 );
1236 let ts_now = clock.get_time_ns();
1237 ensure_accepted_emitted(
1238 &emitter,
1239 &dispatch_state,
1240 order_for_task.client_order_id(),
1241 identity,
1242 venue_order_id,
1243 account_id,
1244 ts_now,
1245 ts_now,
1246 );
1247 log::debug!(
1248 "Trigger order submitted: client_order_id={} venue_order_id={venue_order_id}",
1249 order_for_task.client_order_id(),
1250 );
1251 }
1252 Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1253 log::warn!(
1254 "Derive trigger submit for {} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1255 order_for_task.client_order_id(),
1256 );
1257 }
1258 Err(e) => {
1259 let (reason, due_post_only) = ws_rejection_reason(&e);
1260 log::debug!(
1261 "Derive rejected trigger order {}: {reason}",
1262 order_for_task.client_order_id(),
1263 );
1264 dispatch_state.forget(&order_for_task.client_order_id());
1265 let ts = clock.get_time_ns();
1266 emitter.emit_order_rejected(
1267 &order_for_task,
1268 &reason,
1269 ts,
1270 due_post_only,
1271 );
1272 }
1273 }
1274 return Ok(());
1275 }
1276
1277 let expiry =
1278 match normal_order_signature_expiry(clock, signing.signature_expiry_secs) {
1279 Ok(expiry) => expiry,
1280 Err(e) => {
1281 log::warn!(
1282 "Order expiry validation failed for {}: {e}",
1283 order_for_task.client_order_id()
1284 );
1285 dispatch_state.forget(&order_for_task.client_order_id());
1286 let ts = clock.get_time_ns();
1287 emitter.emit_order_rejected(
1288 &order_for_task,
1289 &format!("order expiry validation failed: {e}"),
1290 ts,
1291 false,
1292 );
1293 return Ok(());
1294 }
1295 };
1296 let nonce = nonce_manager.next_nonce(&wallet_str, signing.subaccount_id)?;
1297 let payload = match order_to_derive_payload(
1298 &order_for_task,
1299 &instrument,
1300 signing.subaccount_id,
1301 signing.wallet_address,
1302 &signing.signer,
1303 nonce,
1304 expiry,
1305 signing.trade_module_address,
1306 signing.domain_separator,
1307 signing.action_typehash,
1308 signing.max_fee_per_contract,
1309 explicit_price,
1310 ) {
1311 Ok(p) => p,
1312 Err(e) => {
1313 log::warn!("Order encode failed for {}: {e}", order_for_task.client_order_id());
1314 dispatch_state.forget(&order_for_task.client_order_id());
1315 let ts = clock.get_time_ns();
1316 emitter.emit_order_rejected(
1317 &order_for_task,
1318 &format!("order encoding failed: {e}"),
1319 ts,
1320 false,
1321 );
1322 return Ok(());
1323 }
1324 };
1325
1326 log::debug!(
1329 "Derive submit payload client_order_id={} instrument_name={} direction={} order_type={} time_in_force={} amount={} limit_price={}",
1330 order_for_task.client_order_id(),
1331 payload.instrument_name.as_str(),
1332 payload.direction,
1333 payload.order_type,
1334 payload.time_in_force,
1335 payload.amount,
1336 payload.limit_price,
1337 );
1338
1339 match ws_exec.submit_order(&payload).await {
1342 Ok(_) => {
1343 log::debug!(
1344 "Order submitted: client_order_id={}",
1345 order_for_task.client_order_id(),
1346 );
1347 }
1348 Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1350 log::warn!(
1351 "Derive submit for {} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1352 order_for_task.client_order_id(),
1353 );
1354 }
1355 Err(e) => {
1356 let (reason, due_post_only) = ws_rejection_reason(&e);
1357 log::debug!(
1358 "Derive rejected order {}: {reason}",
1359 order_for_task.client_order_id(),
1360 );
1361 dispatch_state.forget(&order_for_task.client_order_id());
1362 let ts = clock.get_time_ns();
1363 emitter.emit_order_rejected(&order_for_task, &reason, ts, due_post_only);
1364 }
1365 }
1366 Ok(())
1367 });
1368
1369 Ok(())
1370 }
1371
1372 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1373 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1374 for order in orders {
1375 let sub = SubmitOrder::from_order(
1376 &order,
1377 cmd.trader_id,
1378 cmd.client_id,
1379 cmd.position_id,
1380 UUID4::new(),
1381 cmd.ts_init,
1382 );
1383 self.submit_order(sub)?;
1384 }
1385 Ok(())
1386 }
1387
1388 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1389 let Some(venue_order_id) = cmd.venue_order_id else {
1390 log::warn!(
1391 "Derive cancel_order requires venue_order_id (client_order_id={})",
1392 cmd.client_order_id,
1393 );
1394 return Ok(());
1395 };
1396 let ws_exec = self.ws_exec.clone();
1397 let subaccount_id = self.credential.subaccount_id();
1398 let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1399 let voi = venue_order_id.to_string();
1400 let emitter = self.emitter.clone();
1401 let clock = self.clock;
1402 let strategy_id = cmd.strategy_id;
1403 let instrument_id = cmd.instrument_id;
1404 let client_order_id = cmd.client_order_id;
1405 let stale_venue_order_id = venue_order_id;
1406 let is_trigger_order = self
1407 .core
1408 .cache()
1409 .order(&client_order_id)
1410 .is_some_and(|order| is_derive_trigger_order_type(order.order_type()));
1411
1412 self.spawn_task("cancel_order", async move {
1413 let outcome = if is_trigger_order {
1414 ws_exec
1415 .cancel_trigger_order(&DeriveCancelTriggerOrderParams::new(
1416 subaccount_id,
1417 voi.as_str(),
1418 ))
1419 .await
1420 .map(|_| ())
1421 } else {
1422 ws_exec
1423 .cancel_order(&DeriveCancelParams::new(
1424 subaccount_id,
1425 venue_symbol.as_str(),
1426 voi.as_str(),
1427 ))
1428 .await
1429 };
1430
1431 match outcome {
1432 Ok(()) => {}
1433 Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1435 log::warn!(
1436 "Derive cancel for {client_order_id} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1437 );
1438 }
1439 Err(e) => {
1440 let (reason, _) = ws_rejection_reason(&e);
1441 log::debug!("Derive rejected cancel for {client_order_id}: {reason}");
1442 let ts = clock.get_time_ns();
1443 emitter.emit_order_cancel_rejected_event(
1444 strategy_id,
1445 instrument_id,
1446 client_order_id,
1447 Some(stale_venue_order_id),
1448 &reason,
1449 ts,
1450 );
1451 }
1452 }
1453 Ok(())
1454 });
1455 Ok(())
1456 }
1457
1458 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1459 let http_client = self.http_client.clone();
1460 let ws_exec = self.ws_exec.clone();
1461 let subaccount_id = self.credential.subaccount_id();
1462 let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1463 let side_filter = cmd.order_side;
1464
1465 self.spawn_task("cancel_all_orders", async move {
1466 if matches!(side_filter, OrderSide::Buy | OrderSide::Sell) {
1472 let open_params = DeriveGetOpenOrdersParams::new(subaccount_id);
1473 let mut orders = match http_client.get_open_orders(&open_params).await {
1474 Ok(v) => v,
1475 Err(e) => {
1476 log::warn!(
1477 "Derive cancel_all_orders: failed to list open orders for side filter {side_filter:?}: {e}",
1478 );
1479 return Ok(());
1480 }
1481 }
1482 .orders;
1483
1484 match http_client
1485 .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
1486 .await
1487 {
1488 Ok(result) => orders.extend(result.orders),
1489 Err(e) => {
1490 log::warn!(
1491 "Derive cancel_all_orders: failed to list trigger orders for side filter {side_filter:?}: {e}",
1492 );
1493 }
1494 }
1495
1496 for order in orders {
1497 if order.instrument_name.as_str() != venue_symbol {
1498 continue;
1499 }
1500 let order_side = match order.direction {
1501 DeriveOrderSide::Buy => OrderSide::Buy,
1502 DeriveOrderSide::Sell => OrderSide::Sell,
1503 };
1504
1505 if order_side != side_filter {
1506 continue;
1507 }
1508
1509 let outcome = if order.trigger_type.is_some() {
1510 ws_exec
1511 .cancel_trigger_order(&DeriveCancelTriggerOrderParams::new(
1512 subaccount_id,
1513 order.order_id.as_str(),
1514 ))
1515 .await
1516 .map(|_| ())
1517 } else {
1518 ws_exec
1519 .cancel_order(&DeriveCancelParams::new(
1520 subaccount_id,
1521 venue_symbol.as_str(),
1522 order.order_id.as_str(),
1523 ))
1524 .await
1525 };
1526
1527 if let Err(e) = outcome {
1528 log::warn!(
1529 "Derive cancel_all_orders: cancel for {} failed: {e}",
1530 order.order_id,
1531 );
1532 }
1533 }
1534 } else if let Err(e) = ws_exec
1535 .cancel_all_orders(
1536 &DeriveCancelAllParams::new(subaccount_id)
1537 .with_instrument_name(venue_symbol.as_str()),
1538 )
1539 .await
1540 {
1541 log::warn!("Derive cancel_all_orders failed for {venue_symbol}: {e}");
1542 }
1543
1544 if !matches!(side_filter, OrderSide::Buy | OrderSide::Sell) {
1545 let trigger_orders = match http_client
1546 .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
1547 .await
1548 {
1549 Ok(result) => result.orders,
1550 Err(e) => {
1551 log::warn!(
1552 "Derive cancel_all_orders: failed to list trigger orders for {venue_symbol}: {e}",
1553 );
1554 return Ok(());
1555 }
1556 };
1557
1558 for order in trigger_orders {
1559 if order.instrument_name.as_str() != venue_symbol {
1560 continue;
1561 }
1562
1563 if let Err(e) = ws_exec
1564 .cancel_trigger_order(&DeriveCancelTriggerOrderParams::new(
1565 subaccount_id,
1566 order.order_id.as_str(),
1567 ))
1568 .await
1569 {
1570 log::warn!(
1571 "Derive cancel_all_orders: trigger cancel for {} failed: {e}",
1572 order.order_id,
1573 );
1574 }
1575 }
1576 }
1577 Ok(())
1578 });
1579 Ok(())
1580 }
1581
1582 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1583 for inner in cmd.cancels {
1584 self.cancel_order(inner)?;
1585 }
1586 Ok(())
1587 }
1588
1589 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1590 let ts_now = self.clock.get_time_ns();
1591
1592 let Some(venue_order_id) = cmd.venue_order_id else {
1593 let reason = "venue_order_id is required for modify";
1594 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
1595 self.emitter.emit_order_modify_rejected_event(
1596 cmd.strategy_id,
1597 cmd.instrument_id,
1598 cmd.client_order_id,
1599 None,
1600 reason,
1601 ts_now,
1602 );
1603 return Ok(());
1604 };
1605
1606 let Some(order) = self
1607 .core
1608 .cache()
1609 .order(&cmd.client_order_id)
1610 .map(|o| o.clone())
1611 else {
1612 let reason = "order not found in cache";
1613 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
1614 self.emitter.emit_order_modify_rejected_event(
1615 cmd.strategy_id,
1616 cmd.instrument_id,
1617 cmd.client_order_id,
1618 Some(venue_order_id),
1619 reason,
1620 ts_now,
1621 );
1622 return Ok(());
1623 };
1624
1625 if is_derive_trigger_order_type(order.order_type()) {
1626 let reason = "Derive trigger orders cannot be modified; cancel and resubmit";
1627 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
1628 self.emitter.emit_order_modify_rejected_event(
1629 cmd.strategy_id,
1630 cmd.instrument_id,
1631 cmd.client_order_id,
1632 Some(venue_order_id),
1633 reason,
1634 ts_now,
1635 );
1636 return Ok(());
1637 }
1638
1639 let target_quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
1640 let target_price = cmd.price.or_else(|| order.price());
1641
1642 let venue_symbol = format_venue_symbol(&cmd.instrument_id)?.to_string();
1643 let http_client = self.http_client.clone();
1644 let ws_exec = self.ws_exec.clone();
1645 let signing = self.signing.clone();
1646 let nonce_manager = self.nonce_manager.clone();
1647 let wallet_str = self.credential.wallet_address().to_string();
1648 let emitter = self.emitter.clone();
1649 let clock = self.clock;
1650 let instruments = self.instruments.clone();
1651 let dispatch_state = self.dispatch_state.clone();
1652 let order_for_task = order;
1653 let strategy_id = cmd.strategy_id;
1654 let instrument_id = cmd.instrument_id;
1655 let client_order_id = cmd.client_order_id;
1656 let stale_venue_order_id = venue_order_id;
1657 let voi_str = venue_order_id.to_string();
1658
1659 self.spawn_task("modify_order", async move {
1660 let instrument = match cached_or_fetch_instrument(
1661 &http_client,
1662 &instruments,
1663 &instrument_id,
1664 &venue_symbol,
1665 )
1666 .await
1667 {
1668 Ok(i) => i,
1669 Err(e) => {
1670 let reason = format!("instrument resolution failed: {e}");
1671 log::warn!("Cannot modify order {client_order_id}: {reason}");
1672 let ts = clock.get_time_ns();
1673 emitter.emit_order_modify_rejected_event(
1674 strategy_id,
1675 instrument_id,
1676 client_order_id,
1677 Some(stale_venue_order_id),
1678 &reason,
1679 ts,
1680 );
1681 return Ok(());
1682 }
1683 };
1684
1685 let expiry = match normal_order_signature_expiry(clock, signing.signature_expiry_secs) {
1686 Ok(expiry) => expiry,
1687 Err(e) => {
1688 let reason = format!("replace expiry validation failed: {e}");
1689 log::warn!("Cannot modify order {client_order_id}: {reason}");
1690 let ts = clock.get_time_ns();
1691 emitter.emit_order_modify_rejected_event(
1692 strategy_id,
1693 instrument_id,
1694 client_order_id,
1695 Some(stale_venue_order_id),
1696 &reason,
1697 ts,
1698 );
1699 return Ok(());
1700 }
1701 };
1702 let nonce = nonce_manager.next_nonce(&wallet_str, signing.subaccount_id)?;
1703
1704 let payload = match order_replace_to_derive_payload(
1705 &order_for_task,
1706 &instrument,
1707 signing.subaccount_id,
1708 signing.wallet_address,
1709 &signing.signer,
1710 nonce,
1711 expiry,
1712 signing.trade_module_address,
1713 signing.domain_separator,
1714 signing.action_typehash,
1715 signing.max_fee_per_contract,
1716 Some(target_quantity.as_decimal()),
1717 target_price.map(|p| p.as_decimal()),
1718 &voi_str,
1719 ) {
1720 Ok(p) => p,
1721 Err(e) => {
1722 let reason = format!("replace encoding failed: {e}");
1723 log::warn!("Cannot modify order {client_order_id}: {reason}");
1724 let ts = clock.get_time_ns();
1725 emitter.emit_order_modify_rejected_event(
1726 strategy_id,
1727 instrument_id,
1728 client_order_id,
1729 Some(stale_venue_order_id),
1730 &reason,
1731 ts,
1732 );
1733 return Ok(());
1734 }
1735 };
1736
1737 dispatch_state.mark_pending_modify(client_order_id, stale_venue_order_id);
1740
1741 match ws_exec.modify_order(&payload).await {
1742 Ok(order) => {
1743 let new_voi = VenueOrderId::new(order.order_id.as_str());
1744 log::debug!(
1745 "Order replaced: client_order_id={client_order_id}, new venue_order_id={new_voi}",
1746 );
1747 dispatch_state.record_venue_order_id(client_order_id, new_voi);
1750 dispatch_state.clear_pending_modify(&client_order_id);
1751 let ts = clock.get_time_ns();
1752 emitter.emit_order_updated(
1753 &order_for_task,
1754 new_voi,
1755 target_quantity,
1756 target_price,
1757 None,
1758 None,
1759 ts,
1760 );
1761 }
1762 Err(e) if is_write_outcome_ambiguous_ws(&e) => {
1764 dispatch_state.clear_pending_modify(&client_order_id);
1765 log::warn!(
1766 "Derive modify for {client_order_id} returned ambiguous WS outcome: {e}; awaiting reconciliation",
1767 );
1768 }
1769 Err(e) => {
1770 dispatch_state.clear_pending_modify(&client_order_id);
1771 let (reason, _) = ws_rejection_reason(&e);
1772 log::debug!("Derive rejected modify for {client_order_id}: {reason}");
1773 let ts = clock.get_time_ns();
1774 emitter.emit_order_modify_rejected_event(
1775 strategy_id,
1776 instrument_id,
1777 client_order_id,
1778 Some(stale_venue_order_id),
1779 &reason,
1780 ts,
1781 );
1782 }
1783 }
1784 Ok(())
1785 });
1786 Ok(())
1787 }
1788
1789 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1790 let http_client = self.http_client.clone();
1791 let subaccount_id = self.credential.subaccount_id();
1792 let emitter = self.emitter.clone();
1793 let clock = self.clock;
1794 self.spawn_task("query_account", async move {
1795 let subaccount = http_client
1796 .get_subaccount(&DeriveGetSubaccountParams::new(subaccount_id))
1797 .await?;
1798 let (balances, margins) = parse_derive_subaccount_to_balances(&subaccount)?;
1799 let ts_event = clock.get_time_ns();
1800 emitter.emit_account_state(balances, margins, true, ts_event);
1801 Ok(())
1802 });
1803 Ok(())
1804 }
1805
1806 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1807 let Some(venue_order_id) = cmd.venue_order_id else {
1808 log::warn!(
1809 "Derive query_order requires venue_order_id (client_order_id={})",
1810 cmd.client_order_id,
1811 );
1812 return Ok(());
1813 };
1814 let http_client = self.http_client.clone();
1815 let subaccount_id = self.credential.subaccount_id();
1816 let account_id = self.core.account_id;
1817 let emitter = self.emitter.clone();
1818 let clock = self.clock;
1819 let voi = venue_order_id.to_string();
1820
1821 self.spawn_task("query_order", async move {
1822 let order = match http_client
1823 .get_order(&DeriveGetOrderParams::new(subaccount_id, voi.as_str()))
1824 .await
1825 {
1826 Ok(o) => o,
1827 Err(e) => {
1828 let trigger_orders = match http_client
1829 .get_trigger_orders(&DeriveGetTriggerOrdersParams::new(subaccount_id))
1830 .await
1831 {
1832 Ok(result) => result.orders,
1833 Err(trigger_err) => {
1834 log::warn!(
1835 "Failed to fetch Derive order {voi}: {e}; trigger lookup also failed: {trigger_err}",
1836 );
1837 return Ok(());
1838 }
1839 };
1840
1841 match trigger_orders
1842 .into_iter()
1843 .find(|o| o.order_id.as_str() == voi.as_str())
1844 {
1845 Some(order) => order,
1846 None => {
1847 log::warn!("Failed to fetch Derive order {voi}: {e}");
1848 return Ok(());
1849 }
1850 }
1851 }
1852 };
1853 let ts_init = clock.get_time_ns();
1854 let report = parse_derive_order_to_report(&order, account_id, ts_init)?;
1855 emitter.send_order_status_report(report);
1856 Ok(())
1857 });
1858 Ok(())
1859 }
1860}
1861
1862fn ws_rejection_reason(error: &DeriveWsError) -> (String, bool) {
1865 match error {
1866 DeriveWsError::JsonRpc { code, message, .. } => (
1867 format!("JSON-RPC {code}: {message}"),
1868 derive_rejection_due_post_only(Some(*code), message),
1869 ),
1870 other => (other.to_string(), false),
1871 }
1872}
1873
1874fn add_missing_flat_position_reports(
1875 mass_status: &mut ExecutionMassStatus,
1876 account_id: AccountId,
1877 touched_instruments: AHashSet<InstrumentId>,
1878 ts_init: UnixNanos,
1879) {
1880 let active_position_instruments: AHashSet<InstrumentId> =
1881 mass_status.position_reports().keys().copied().collect();
1882 let mut flat_reports = Vec::new();
1883
1884 for instrument_id in touched_instruments {
1885 if active_position_instruments.contains(&instrument_id) {
1886 continue;
1887 }
1888
1889 flat_reports.push(PositionStatusReport::new(
1890 account_id,
1891 instrument_id,
1892 PositionSideSpecified::Flat,
1893 Quantity::from("0"),
1894 ts_init,
1895 ts_init,
1896 Some(UUID4::new()),
1897 None,
1898 None,
1899 ));
1900 }
1901
1902 if !flat_reports.is_empty() {
1903 log::info!(
1904 "Added {} flat PositionReports for Derive instruments absent from current positions",
1905 flat_reports.len()
1906 );
1907 mass_status.add_position_reports(flat_reports);
1908 }
1909}
1910
1911fn handle_ws_message(
1912 message: DeriveWsMessage,
1913 emitter: &ExecutionEventEmitter,
1914 account_id: AccountId,
1915 clock: &'static AtomicTime,
1916 dispatch_state: &WsDispatchState,
1917) {
1918 let payload = match message {
1919 DeriveWsMessage::Subscription(payload) => payload,
1920 DeriveWsMessage::Authenticated | DeriveWsMessage::Reconnected => return,
1921 };
1922
1923 let is_orders_channel = payload.channel.as_str().ends_with(".orders");
1924 let is_trades_channel = payload.channel.as_str().ends_with(".trades");
1925
1926 if is_orders_channel {
1927 let data = match serde_json::from_str::<DeriveOrdersSubscriptionData>(payload.data.get()) {
1928 Ok(data) => data,
1929 Err(_) => return,
1930 };
1931 dispatch_orders_payload(data, emitter, account_id, clock, dispatch_state);
1932 } else if is_trades_channel {
1933 let data = match serde_json::from_str::<DeriveTradesSubscriptionData>(payload.data.get()) {
1934 Ok(data) => data,
1935 Err(_) => return,
1936 };
1937 dispatch_trades_payload(data, emitter, account_id, clock, dispatch_state);
1938 }
1939}
1940
1941pub fn dispatch_orders_payload(
1948 data: DeriveOrdersSubscriptionData,
1949 emitter: &ExecutionEventEmitter,
1950 account_id: AccountId,
1951 clock: &'static AtomicTime,
1952 dispatch_state: &WsDispatchState,
1953) {
1954 let ts_init = clock.get_time_ns();
1955 for order in data.orders {
1956 let report = match parse_derive_order_to_report(&order, account_id, ts_init) {
1957 Ok(report) => report,
1958 Err(e) => {
1959 log::warn!("Failed to parse Derive order WS update: {e}");
1960 continue;
1961 }
1962 };
1963
1964 let identity = tracked_order_identity(report.client_order_id, dispatch_state);
1965
1966 match identity {
1967 Some((client_order_id, identity)) => emit_tracked_order_event(
1968 emitter,
1969 dispatch_state,
1970 client_order_id,
1971 identity,
1972 &report,
1973 account_id,
1974 ts_init,
1975 ),
1976 None => emitter.send_order_status_report(report),
1977 }
1978 }
1979}
1980
1981pub fn dispatch_trades_payload(
1988 data: DeriveTradesSubscriptionData,
1989 emitter: &ExecutionEventEmitter,
1990 account_id: AccountId,
1991 clock: &'static AtomicTime,
1992 dispatch_state: &WsDispatchState,
1993) {
1994 let ts_init = clock.get_time_ns();
1995 let fee_currency = Currency::USDC();
1996 for trade in data.trades {
1997 match parse_derive_trade_to_fill_report(&trade, account_id, fee_currency, ts_init) {
1998 Ok(Some(report)) => {
1999 if dispatch_state.check_and_insert_trade(report.trade_id) {
2000 log::debug!(
2001 "Skipping duplicate Derive fill (trade_id={}) on WS dispatch",
2002 report.trade_id,
2003 );
2004 continue;
2005 }
2006
2007 let identity = tracked_order_identity(report.client_order_id, dispatch_state);
2008
2009 match identity {
2010 Some((client_order_id, identity)) => emit_tracked_fill(
2011 emitter,
2012 dispatch_state,
2013 client_order_id,
2014 identity,
2015 &report,
2016 account_id,
2017 ts_init,
2018 ),
2019 None => emitter.send_fill_report(report),
2020 }
2021 }
2022 Ok(None) => {}
2023 Err(e) => log::warn!("Failed to parse Derive trade WS update: {e}"),
2024 }
2025 }
2026}
2027
2028fn tracked_order_identity(
2029 client_order_id: Option<ClientOrderId>,
2030 dispatch_state: &WsDispatchState,
2031) -> Option<(ClientOrderId, OrderIdentity)> {
2032 client_order_id.and_then(|cid| {
2033 dispatch_state
2034 .identity(&cid)
2035 .map(|identity| (cid, identity))
2036 })
2037}
2038
2039#[expect(clippy::too_many_arguments)]
2044fn ensure_accepted_emitted(
2045 emitter: &ExecutionEventEmitter,
2046 dispatch_state: &WsDispatchState,
2047 client_order_id: ClientOrderId,
2048 identity: OrderIdentity,
2049 venue_order_id: VenueOrderId,
2050 account_id: AccountId,
2051 ts_event: UnixNanos,
2052 ts_init: UnixNanos,
2053) {
2054 if dispatch_state.mark_accepted(client_order_id) {
2055 return;
2056 }
2057 let accepted = OrderAccepted::new(
2058 emitter.trader_id(),
2059 identity.strategy_id,
2060 identity.instrument_id,
2061 client_order_id,
2062 venue_order_id,
2063 account_id,
2064 UUID4::new(),
2065 ts_event,
2066 ts_init,
2067 false,
2068 );
2069 emitter.send_order_event(OrderEventAny::Accepted(accepted));
2070}
2071
2072fn emit_tracked_order_event(
2073 emitter: &ExecutionEventEmitter,
2074 dispatch_state: &WsDispatchState,
2075 client_order_id: ClientOrderId,
2076 identity: OrderIdentity,
2077 report: &OrderStatusReport,
2078 account_id: AccountId,
2079 ts_init: UnixNanos,
2080) {
2081 let venue_order_id = report.venue_order_id;
2082 let ts_accepted = report.ts_accepted;
2083 let ts_event = report.ts_last;
2084
2085 if dispatch_state.pending_modify(&client_order_id) == Some(venue_order_id) {
2091 log::debug!(
2092 "Skipping cancel-replace leg for {client_order_id}: stale venue_order_id={venue_order_id}",
2093 );
2094 return;
2095 }
2096
2097 if let Some(bound) = dispatch_state.bound_venue_order_id(&client_order_id)
2098 && bound != venue_order_id
2099 {
2100 log::debug!(
2101 "Skipping stale {:?} for {client_order_id}: venue_order_id={venue_order_id} superseded by {bound}",
2102 report.order_status,
2103 );
2104 return;
2105 }
2106
2107 match report.order_status {
2108 OrderStatus::Accepted | OrderStatus::PartiallyFilled => {
2109 if dispatch_state.contains_filled(&client_order_id) {
2110 log::debug!("Skipping stale Accepted for {client_order_id} (already filled)",);
2111 return;
2112 }
2113 dispatch_state.record_venue_order_id(client_order_id, venue_order_id);
2114 ensure_accepted_emitted(
2115 emitter,
2116 dispatch_state,
2117 client_order_id,
2118 identity,
2119 venue_order_id,
2120 account_id,
2121 ts_accepted,
2122 ts_init,
2123 );
2124 }
2125 OrderStatus::Filled => {
2126 dispatch_state.record_venue_order_id(client_order_id, venue_order_id);
2127 ensure_accepted_emitted(
2128 emitter,
2129 dispatch_state,
2130 client_order_id,
2131 identity,
2132 venue_order_id,
2133 account_id,
2134 ts_accepted,
2135 ts_init,
2136 );
2137 dispatch_state.mark_filled(client_order_id);
2145 }
2146 OrderStatus::Canceled => {
2147 ensure_accepted_emitted(
2148 emitter,
2149 dispatch_state,
2150 client_order_id,
2151 identity,
2152 venue_order_id,
2153 account_id,
2154 ts_accepted,
2155 ts_init,
2156 );
2157 let canceled = OrderCanceled::new(
2158 emitter.trader_id(),
2159 identity.strategy_id,
2160 identity.instrument_id,
2161 client_order_id,
2162 UUID4::new(),
2163 ts_event,
2164 ts_init,
2165 false,
2166 Some(venue_order_id),
2167 Some(account_id),
2168 );
2169 emitter.send_order_event(OrderEventAny::Canceled(canceled));
2170 dispatch_state.forget(&client_order_id);
2171 }
2172 OrderStatus::Expired => {
2173 ensure_accepted_emitted(
2174 emitter,
2175 dispatch_state,
2176 client_order_id,
2177 identity,
2178 venue_order_id,
2179 account_id,
2180 ts_accepted,
2181 ts_init,
2182 );
2183 let expired = OrderExpired::new(
2184 emitter.trader_id(),
2185 identity.strategy_id,
2186 identity.instrument_id,
2187 client_order_id,
2188 UUID4::new(),
2189 ts_event,
2190 ts_init,
2191 false,
2192 Some(venue_order_id),
2193 Some(account_id),
2194 );
2195 emitter.send_order_event(OrderEventAny::Expired(expired));
2196 dispatch_state.forget(&client_order_id);
2197 }
2198 OrderStatus::Rejected => {
2199 let reason = report
2200 .cancel_reason
2201 .as_deref()
2202 .unwrap_or("Order rejected by Derive");
2203 let due_post_only = derive_rejection_due_post_only(None, reason);
2204 let rejected = OrderRejected::new(
2205 emitter.trader_id(),
2206 identity.strategy_id,
2207 identity.instrument_id,
2208 client_order_id,
2209 account_id,
2210 Ustr::from(reason),
2211 UUID4::new(),
2212 ts_event,
2213 ts_init,
2214 false,
2215 due_post_only,
2216 );
2217 emitter.send_order_event(OrderEventAny::Rejected(rejected));
2218 dispatch_state.forget(&client_order_id);
2219 }
2220 other => {
2221 log::debug!(
2222 "Unhandled tracked order status {other:?} for {client_order_id}, sending as report",
2223 );
2224 emitter.send_order_status_report(report.clone());
2225 }
2226 }
2227}
2228
2229fn emit_tracked_fill(
2230 emitter: &ExecutionEventEmitter,
2231 dispatch_state: &WsDispatchState,
2232 client_order_id: ClientOrderId,
2233 identity: OrderIdentity,
2234 report: &FillReport,
2235 account_id: AccountId,
2236 ts_init: UnixNanos,
2237) {
2238 ensure_accepted_emitted(
2239 emitter,
2240 dispatch_state,
2241 client_order_id,
2242 identity,
2243 report.venue_order_id,
2244 account_id,
2245 report.ts_event,
2246 ts_init,
2247 );
2248
2249 let filled = OrderFilled::new(
2250 emitter.trader_id(),
2251 identity.strategy_id,
2252 identity.instrument_id,
2253 client_order_id,
2254 report.venue_order_id,
2255 account_id,
2256 report.trade_id,
2257 identity.order_side,
2258 identity.order_type,
2259 report.last_qty,
2260 report.last_px,
2261 report.commission.currency,
2262 report.liquidity_side,
2263 UUID4::new(),
2264 report.ts_event,
2265 ts_init,
2266 false,
2267 report.venue_position_id,
2268 Some(report.commission),
2269 );
2270 emitter.send_order_event(OrderEventAny::Filled(filled));
2271}
2272
2273fn market_order_limit_price(
2284 quote: &QuoteTick,
2285 side: OrderSide,
2286 slippage_bps: u32,
2287 tick_size: Decimal,
2288) -> Option<Decimal> {
2289 let bps = Decimal::from(slippage_bps);
2290 let scale = Decimal::from(10_000_u32);
2291 let one = Decimal::ONE;
2292 let raw = match side {
2293 OrderSide::Buy => quote.ask_price.as_decimal() * (one + bps / scale),
2294 OrderSide::Sell => quote.bid_price.as_decimal() * (one - bps / scale),
2295 OrderSide::NoOrderSide => return None,
2297 };
2298 let rounded = round_to_tick(raw, tick_size, side);
2299 if rounded <= Decimal::ZERO {
2300 return None;
2301 }
2302 Some(rounded)
2303}
2304
2305fn trigger_market_limit_price(
2306 trigger_price: Decimal,
2307 side: OrderSide,
2308 slippage_bps: u32,
2309 tick_size: Decimal,
2310) -> Option<Decimal> {
2311 let bps = Decimal::from(slippage_bps);
2312 let scale = Decimal::from(10_000_u32);
2313 let one = Decimal::ONE;
2314 let raw = match side {
2315 OrderSide::Buy => trigger_price * (one + bps / scale),
2316 OrderSide::Sell => trigger_price * (one - bps / scale),
2317 OrderSide::NoOrderSide => return None,
2318 };
2319 let rounded = round_to_tick(raw, tick_size, side);
2320 if rounded <= Decimal::ZERO {
2321 return None;
2322 }
2323 Some(rounded)
2324}
2325
2326fn is_derive_trigger_order_type(order_type: OrderType) -> bool {
2327 matches!(
2328 order_type,
2329 OrderType::StopMarket
2330 | OrderType::StopLimit
2331 | OrderType::MarketIfTouched
2332 | OrderType::LimitIfTouched
2333 )
2334}
2335
2336fn trigger_order_signature_expiry(clock: &'static AtomicTime) -> i64 {
2337 let now_secs = (clock.get_time_ns().as_u64() / 1_000_000_000) as i64;
2338 now_secs + TRIGGER_ORDER_SIGNATURE_TTL.as_secs() as i64
2339}
2340
2341fn normal_order_signature_expiry(
2342 clock: &'static AtomicTime,
2343 signature_expiry_secs: u64,
2344) -> anyhow::Result<i64> {
2345 let min_ttl_secs = MIN_SIGNATURE_TTL.as_secs();
2346 if signature_expiry_secs <= min_ttl_secs {
2347 anyhow::bail!(
2348 "signature_expiry_secs {signature_expiry_secs}s must be greater than the Derive minimum {min_ttl_secs}s"
2349 );
2350 }
2351
2352 let now_secs_u64 = clock.get_time_ns().as_u64() / 1_000_000_000;
2353 let now_secs = i64::try_from(now_secs_u64).with_context(|| {
2354 format!("current UNIX time {now_secs_u64}s cannot fit in Derive signature_expiry_sec")
2355 })?;
2356 let ttl_secs = i64::try_from(signature_expiry_secs).with_context(|| {
2357 format!(
2358 "signature_expiry_secs {signature_expiry_secs}s cannot fit in Derive signature_expiry_sec"
2359 )
2360 })?;
2361
2362 now_secs.checked_add(ttl_secs).ok_or_else(|| {
2363 anyhow::anyhow!(
2364 "signature expiry overflows Derive signature_expiry_sec: now {now_secs}s plus TTL {ttl_secs}s"
2365 )
2366 })
2367}
2368
2369async fn refresh_market_order_quote(
2370 http_client: &DeriveHttpClient,
2371 venue_symbol: &str,
2372 instrument: &DeriveInstrument,
2373 clock: &'static AtomicTime,
2374) -> anyhow::Result<QuoteTick> {
2375 let ticker = http_client.get_ticker(venue_symbol).await?;
2376 let price_precision = Price::from_decimal(instrument.tick_size)
2377 .with_context(|| format!("invalid Derive tick_size for {venue_symbol}"))?
2378 .precision;
2379 let size_precision = Quantity::from_decimal(instrument.amount_step)
2380 .with_context(|| format!("invalid Derive amount_step for {venue_symbol}"))?
2381 .precision;
2382
2383 parse_ticker_quote_from_rest(
2384 &ticker,
2385 price_precision,
2386 size_precision,
2387 clock.get_time_ns(),
2388 )
2389}
2390
2391fn round_to_tick(value: Decimal, tick_size: Decimal, side: OrderSide) -> Decimal {
2396 if tick_size <= Decimal::ZERO {
2397 return value;
2398 }
2399 let ratio = value / tick_size;
2400 let ticks = match side {
2401 OrderSide::Buy => ratio.ceil(),
2402 OrderSide::Sell => ratio.floor(),
2403 OrderSide::NoOrderSide => ratio.round(),
2404 };
2405 ticks * tick_size
2406}
2407
2408async fn cached_or_fetch_instrument(
2409 http_client: &DeriveHttpClient,
2410 instruments: &Arc<AtomicMap<InstrumentId, DeriveInstrument>>,
2411 instrument_id: &InstrumentId,
2412 venue_symbol: &str,
2413) -> anyhow::Result<DeriveInstrument> {
2414 if let Some(cached) = instruments.get_cloned(instrument_id) {
2415 return Ok(cached);
2416 }
2417 let instrument = http_client
2418 .get_instrument(venue_symbol)
2419 .await
2420 .with_context(|| format!("failed to fetch instrument {venue_symbol}"))?;
2421 instruments.insert(*instrument_id, instrument.clone());
2422 Ok(instrument)
2423}
2424
2425#[cfg(test)]
2426mod tests {
2427 use std::{cell::RefCell, rc::Rc};
2428
2429 use nautilus_common::{cache::Cache, messages::ExecutionEvent};
2430 use nautilus_core::UnixNanos;
2431 use nautilus_live::ExecutionClientCore;
2432 use nautilus_model::{
2433 data::QuoteTick,
2434 enums::{AccountType, OmsType, TimeInForce},
2435 identifiers::{AccountId, ClientId, InstrumentId, StrategyId, TraderId},
2436 types::{Price, Quantity},
2437 };
2438 use rstest::rstest;
2439 use rust_decimal_macros::dec;
2440
2441 use super::*;
2442 use crate::common::{consts::DERIVE, enums::DeriveEnvironment};
2443
2444 const TEST_WALLET: &str = "0x0000000000000000000000000000000000001234";
2445 const TEST_SESSION_KEY: &str =
2446 "0x2ae8be44db8a590d20bffbe3b6872df9b569147d3bf6801a35a28281a4816bbd";
2447 const TEST_SUBACCOUNT: u64 = 30769;
2448
2449 fn test_core() -> ExecutionClientCore {
2450 let cache = Rc::new(RefCell::new(Cache::default()));
2451 ExecutionClientCore::new(
2452 TraderId::from("TRADER-001"),
2453 ClientId::from(DERIVE),
2454 *DERIVE_VENUE,
2455 OmsType::Netting,
2456 AccountId::from("DERIVE-001"),
2457 AccountType::Margin,
2458 None,
2459 cache,
2460 )
2461 }
2462
2463 fn test_config() -> DeriveExecClientConfig {
2464 DeriveExecClientConfig {
2465 wallet_address: Some(TEST_WALLET.to_string()),
2466 session_key: Some(TEST_SESSION_KEY.to_string()),
2467 subaccount_id: Some(TEST_SUBACCOUNT),
2468 environment: DeriveEnvironment::Testnet,
2469 domain_separator: Some(
2470 "0x2222222222222222222222222222222222222222222222222222222222222222".to_string(),
2471 ),
2472 action_typehash: Some(
2473 "0x1111111111111111111111111111111111111111111111111111111111111111".to_string(),
2474 ),
2475 trade_module_address: Some("0x000000000000000000000000000000000000bbbb".to_string()),
2476 ..DeriveExecClientConfig::default()
2477 }
2478 }
2479
2480 #[rstest]
2481 fn test_market_order_limit_price_buy_lifts_ask_and_rounds_up_to_tick() {
2482 let quote = QuoteTick::new(
2483 InstrumentId::from("ETH-PERP.DERIVE"),
2484 Price::from("3500.00"),
2485 Price::from("3501.00"),
2486 Quantity::from("1.000"),
2487 Quantity::from("1.000"),
2488 UnixNanos::from(0),
2489 UnixNanos::from(0),
2490 );
2491 let price = market_order_limit_price("e, OrderSide::Buy, 50, dec!(0.01)).unwrap();
2493 assert_eq!(price, dec!(3518.51));
2494 }
2495
2496 #[rstest]
2497 fn test_market_order_limit_price_sell_drops_bid_rounds_down_and_denies_non_positive() {
2498 let quote = QuoteTick::new(
2499 InstrumentId::from("ETH-PERP.DERIVE"),
2500 Price::from("3500.00"),
2501 Price::from("3501.00"),
2502 Quantity::from("1.000"),
2503 Quantity::from("1.000"),
2504 UnixNanos::from(0),
2505 UnixNanos::from(0),
2506 );
2507 let price = market_order_limit_price("e, OrderSide::Sell, 50, dec!(0.01)).unwrap();
2509 assert_eq!(price, dec!(3482.5));
2510
2511 let zero = market_order_limit_price("e, OrderSide::Sell, 20_000, dec!(0.01));
2513 assert!(zero.is_none());
2514 }
2515
2516 #[rstest]
2517 fn test_trigger_market_limit_price_uses_trigger_price_bound() {
2518 let buy = trigger_market_limit_price(dec!(3600), OrderSide::Buy, 50, dec!(0.01)).unwrap();
2519 let sell = trigger_market_limit_price(dec!(3600), OrderSide::Sell, 50, dec!(0.01)).unwrap();
2520 let zero = trigger_market_limit_price(dec!(1), OrderSide::Sell, 20_000, dec!(0.01));
2521
2522 assert_eq!(buy, dec!(3618));
2523 assert_eq!(sell, dec!(3582));
2524 assert!(zero.is_none());
2525 }
2526
2527 #[rstest]
2528 fn test_normal_order_signature_expiry_accepts_ttl_above_minimum() {
2529 let clock = get_atomic_clock_realtime();
2530 let start_secs = (clock.get_time_ns().as_u64() / 1_000_000_000) as i64;
2531 let ttl_secs = MIN_SIGNATURE_TTL.as_secs() + 1;
2532
2533 let expiry = normal_order_signature_expiry(clock, ttl_secs).expect("expiry is valid");
2534
2535 assert!(expiry >= start_secs + ttl_secs as i64);
2536 }
2537
2538 #[rstest]
2539 #[case(MIN_SIGNATURE_TTL.as_secs(), "must be greater than the Derive minimum")]
2540 #[case(MIN_SIGNATURE_TTL.as_secs() - 1, "must be greater than the Derive minimum")]
2541 fn test_normal_order_signature_expiry_rejects_minimum_or_lower_ttl(
2542 #[case] ttl_secs: u64,
2543 #[case] reason_fragment: &str,
2544 ) {
2545 let clock = get_atomic_clock_realtime();
2546
2547 let err = normal_order_signature_expiry(clock, ttl_secs).expect_err("TTL is too short");
2548
2549 assert!(
2550 err.to_string().contains(reason_fragment),
2551 "unexpected error: {err}",
2552 );
2553 }
2554
2555 #[rstest]
2556 #[case(i64::MAX as u64, "overflows Derive signature_expiry_sec")]
2557 #[case(u64::MAX, "cannot fit in Derive signature_expiry_sec")]
2558 fn test_normal_order_signature_expiry_rejects_extreme_ttl(
2559 #[case] ttl_secs: u64,
2560 #[case] reason_fragment: &str,
2561 ) {
2562 let clock = get_atomic_clock_realtime();
2563
2564 let err = normal_order_signature_expiry(clock, ttl_secs).expect_err("TTL is invalid");
2565
2566 assert!(
2567 err.to_string().contains(reason_fragment),
2568 "unexpected error: {err}",
2569 );
2570 }
2571
2572 #[rstest]
2573 #[case(OrderType::StopMarket, true)]
2574 #[case(OrderType::StopLimit, true)]
2575 #[case(OrderType::MarketIfTouched, true)]
2576 #[case(OrderType::LimitIfTouched, true)]
2577 #[case(OrderType::Market, false)]
2578 #[case(OrderType::Limit, false)]
2579 #[case(OrderType::MarketToLimit, false)]
2580 #[case(OrderType::TrailingStopMarket, false)]
2581 fn test_is_derive_trigger_order_type(#[case] order_type: OrderType, #[case] expected: bool) {
2582 assert_eq!(is_derive_trigger_order_type(order_type), expected);
2583 }
2584
2585 #[rstest]
2586 #[case(dec!(0))]
2587 #[case(dec!(-1))]
2588 fn test_round_to_tick_treats_non_positive_tick_as_no_op(#[case] tick: Decimal) {
2589 assert_eq!(
2592 round_to_tick(dec!(3501.55), tick, OrderSide::Buy),
2593 dec!(3501.55)
2594 );
2595 assert_eq!(
2596 round_to_tick(dec!(3501.55), tick, OrderSide::Sell),
2597 dec!(3501.55)
2598 );
2599 }
2600
2601 #[rstest]
2602 fn test_resolve_signing_context_rejects_placeholder_domain_separator() {
2603 let mut config = test_config();
2607 config.environment = DeriveEnvironment::Mainnet;
2608 config.domain_separator =
2609 Some("0x<paste_from_docs.derive.xyz_protocol_constants>".to_string());
2610 let err = DeriveExecutionClient::new(test_core(), config).expect_err("must reject");
2611 let msg = err.to_string();
2612 assert!(msg.contains("placeholder"), "unexpected error: {msg}",);
2613 }
2614
2615 #[rstest]
2616 fn test_resolve_signing_context_uses_mainnet_defaults() {
2617 let mut config = test_config();
2618 config.environment = DeriveEnvironment::Mainnet;
2619 config.domain_separator = None;
2620 config.action_typehash = None;
2621 config.trade_module_address = None;
2622
2623 DeriveExecutionClient::new(test_core(), config).expect("mainnet defaults should parse");
2624 }
2625
2626 #[rstest]
2627 fn test_resolve_signing_context_uses_testnet_defaults() {
2628 let mut config = test_config();
2629 config.environment = DeriveEnvironment::Testnet;
2630 config.domain_separator = None;
2631 config.action_typehash = None;
2632 config.trade_module_address = None;
2633
2634 DeriveExecutionClient::new(test_core(), config).expect("testnet defaults should parse");
2635 }
2636
2637 #[rstest]
2638 fn test_market_order_limit_price_rounds_to_coarse_tick() {
2639 let quote = QuoteTick::new(
2642 InstrumentId::from("ETH-20260627-3500-C.DERIVE"),
2643 Price::from("3500"),
2644 Price::from("3501"),
2645 Quantity::from("1.000"),
2646 Quantity::from("1.000"),
2647 UnixNanos::from(0),
2648 UnixNanos::from(0),
2649 );
2650 let buy = market_order_limit_price("e, OrderSide::Buy, 50, dec!(1)).unwrap();
2651 assert_eq!(buy, dec!(3519));
2652 let sell = market_order_limit_price("e, OrderSide::Sell, 50, dec!(1)).unwrap();
2653 assert_eq!(sell, dec!(3482));
2654 }
2655
2656 #[rstest]
2657 fn test_new_populates_identity() {
2658 let core = test_core();
2659 let client = DeriveExecutionClient::new(core, test_config()).unwrap();
2660
2661 assert_eq!(client.client_id(), ClientId::from(DERIVE));
2662 assert_eq!(client.account_id(), AccountId::from("DERIVE-001"));
2663 assert_eq!(client.venue(), *DERIVE_VENUE);
2664 assert_eq!(client.oms_type(), OmsType::Netting);
2665 assert_eq!(client.subaccount_id(), TEST_SUBACCOUNT);
2666 assert!(!client.is_connected());
2667 }
2668
2669 #[rstest]
2670 fn test_emit_tracked_event_suppresses_in_flight_replace_cancel_leg() {
2671 let clock = get_atomic_clock_realtime();
2678 let account_id = AccountId::from("DERIVE-001");
2679 let instrument_id = InstrumentId::from("ETH-PERP.DERIVE");
2680 let cid = ClientOrderId::from("STRAT-MOD-INFLIGHT");
2681 let stale_voi = VenueOrderId::from("ord-stale-1");
2682 let identity = OrderIdentity {
2683 instrument_id,
2684 strategy_id: StrategyId::from("S-1"),
2685 order_side: OrderSide::Buy,
2686 order_type: OrderType::Limit,
2687 };
2688 let report = OrderStatusReport::new(
2691 account_id,
2692 instrument_id,
2693 Some(cid),
2694 stale_voi,
2695 OrderSide::Buy,
2696 OrderType::Limit,
2697 TimeInForce::Gtc,
2698 OrderStatus::Canceled,
2699 Quantity::from("1.000"),
2700 Quantity::from("0.000"),
2701 UnixNanos::from(1_000),
2702 UnixNanos::from(2_000),
2703 UnixNanos::from(3_000),
2704 None,
2705 );
2706
2707 let new_emitter = || {
2708 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2709 let mut emitter = ExecutionEventEmitter::new(
2710 clock,
2711 TraderId::from("TRADER-001"),
2712 account_id,
2713 AccountType::Margin,
2714 Some(Currency::USDC()),
2715 );
2716 emitter.set_sender(tx);
2717 (emitter, rx)
2718 };
2719
2720 let (emitter, mut rx) = new_emitter();
2723 let state = WsDispatchState::new();
2724 state.mark_pending_modify(cid, stale_voi);
2725 emit_tracked_order_event(
2726 &emitter,
2727 &state,
2728 cid,
2729 identity,
2730 &report,
2731 account_id,
2732 UnixNanos::from(0),
2733 );
2734 let suppressed = rx.try_recv().is_err();
2735
2736 let (emitter, mut rx) = new_emitter();
2739 let state = WsDispatchState::new();
2740 state.mark_pending_modify(cid, VenueOrderId::from("ord-other"));
2741 emit_tracked_order_event(
2742 &emitter,
2743 &state,
2744 cid,
2745 identity,
2746 &report,
2747 account_id,
2748 UnixNanos::from(0),
2749 );
2750 let mut saw_canceled = false;
2751
2752 while let Ok(event) = rx.try_recv() {
2753 if matches!(event, ExecutionEvent::Order(OrderEventAny::Canceled(_))) {
2754 saw_canceled = true;
2755 }
2756 }
2757
2758 assert!(
2759 suppressed,
2760 "in-flight cancel-of-old leg must be suppressed by the pending-modify marker",
2761 );
2762 assert!(
2763 saw_canceled,
2764 "a pending-modify marker for a different venue order id must not suppress",
2765 );
2766 }
2767}