1use std::{
19 future::Future,
20 str::FromStr,
21 sync::{
22 Arc, Mutex,
23 atomic::{AtomicBool, Ordering},
24 },
25 time::{Duration, Instant},
26};
27
28use anyhow::Context;
29use async_trait::async_trait;
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32 clients::ExecutionClient,
33 enums::LogLevel,
34 live::{get_runtime, runner::get_exec_event_sender},
35 messages::execution::{
36 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
37 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
38 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
39 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
40 SubmitOrderList,
41 },
42};
43use nautilus_core::{
44 Params, UnixNanos,
45 time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
48use nautilus_model::{
49 accounts::AccountAny,
50 enums::{AccountType, OmsType, OrderSide},
51 events::OrderEventAny,
52 identifiers::{AccountId, ClientId, ClientOrderId, Venue, VenueOrderId},
53 instruments::{Instrument, InstrumentAny},
54 orders::{Order, OrderAny},
55 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
56 types::{AccountBalance, MarginBalance},
57};
58use rust_decimal::prelude::ToPrimitive;
59use tokio::task::JoinHandle;
60
61use crate::{
62 broadcast::{
63 canceller::{CancelBroadcaster, CancelBroadcasterConfig},
64 submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
65 },
66 common::enums::BitmexPegPriceType,
67 config::BitmexExecClientConfig,
68 http::client::BitmexHttpClient,
69 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
70};
71
/// Execution client for the BitMEX venue.
///
/// Bundles the shared execution core, an event emitter, the HTTP and WebSocket
/// clients, the submit/cancel broadcasters, and the handles of the background
/// tasks this client spawns.
#[derive(Debug)]
pub struct BitmexExecutionClient {
    /// Shared execution client state: identifiers, cache access, lifecycle flags.
    core: ExecutionClientCore,
    /// Real-time atomic clock used to timestamp locally generated events.
    clock: &'static AtomicTime,
    /// Venue-specific configuration this client was constructed with.
    config: BitmexExecClientConfig,
    /// Emitter through which order, fill, position, and account events are sent.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for queries, single-transport order submission, order
    /// modification, and the dead man's switch heartbeat.
    http_client: BitmexHttpClient,
    /// WebSocket client providing the private execution stream.
    ws_client: BitmexWebSocketClient,
    /// Broadcaster used for order submission when more than one try is requested.
    _submitter: SubmitBroadcaster,
    /// Broadcaster used for all cancel requests.
    _canceller: CancelBroadcaster,
    /// Handle of the task consuming the WebSocket stream, if running.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of in-flight command tasks, kept so they can be aborted.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the dead man's switch heartbeat task, if running.
    dms_task_handle: Option<JoinHandle<()>>,
    /// Flag polled by the heartbeat task; clearing it ends the heartbeat loop.
    dms_running: Arc<AtomicBool>,
}
87
88impl BitmexExecutionClient {
89 fn log_report_receipt(count: usize, report_type: &str, log_level: LogLevel) {
90 let plural = if count == 1 { "" } else { "s" };
91 let message = format!("Received {count} {report_type}{plural}");
92
93 match log_level {
94 LogLevel::Off => {}
95 LogLevel::Trace => log::trace!("{message}"),
96 LogLevel::Debug => log::debug!("{message}"),
97 LogLevel::Info => log::info!("{message}"),
98 LogLevel::Warning => log::warn!("{message}"),
99 LogLevel::Error => log::error!("{message}"),
100 }
101 }
102
    /// Creates a new BitMEX execution client from a shared core and venue configuration.
    ///
    /// Builds the HTTP client, the execution WebSocket client, and the submit and
    /// cancel broadcasters. The background task handles start out empty and the
    /// dead man's switch flag starts as `false`.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration has no API credentials, or if the
    /// HTTP client, WebSocket client, or either broadcaster cannot be constructed.
    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
        if !config.has_api_credentials() {
            anyhow::bail!("BitMEX execution client requires API key and secret");
        }

        let trader_id = core.trader_id;
        // A configured account ID takes precedence over the one carried by the core.
        let account_id = config.account_id.unwrap_or(core.account_id);
        let clock = get_atomic_clock_realtime();
        let emitter =
            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
        let http_client = BitmexHttpClient::new(
            Some(config.http_base_url()),
            config.api_key.clone(),
            config.api_secret.clone(),
            config.use_testnet,
            config.http_timeout_secs,
            config.max_retries,
            config.retry_delay_initial_ms,
            config.retry_delay_max_ms,
            config.recv_window_ms,
            config.max_requests_per_second,
            config.max_requests_per_minute,
            config.http_proxy_url.clone(),
        )
        .context("failed to construct BitMEX HTTP client")?;
        let ws_client = BitmexWebSocketClient::new_with_env(
            Some(config.ws_url()),
            config.api_key.clone(),
            config.api_secret.clone(),
            Some(account_id),
            config.heartbeat_interval_secs,
            config.use_testnet,
        )
        .context("failed to construct BitMEX execution websocket client")?;

        // Submit broadcaster: defaults to a pool of one. Without an explicit proxy
        // list, every pool slot uses the general HTTP proxy setting.
        // NOTE(review): an explicit proxy list is not checked against `pool_size`
        // here — confirm the broadcaster tolerates a length mismatch.
        let pool_size = config.submitter_pool_size.unwrap_or(1);
        let submitter_proxy_urls = match &config.submitter_proxy_urls {
            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
            None => vec![config.http_proxy_url.clone(); pool_size],
        };

        let submitter_config = SubmitBroadcasterConfig {
            pool_size,
            api_key: config.api_key.clone(),
            api_secret: config.api_secret.clone(),
            base_url: config.base_url_http.clone(),
            testnet: config.use_testnet,
            timeout_secs: config.http_timeout_secs,
            max_retries: config.max_retries,
            retry_delay_ms: config.retry_delay_initial_ms,
            retry_delay_max_ms: config.retry_delay_max_ms,
            recv_window_ms: config.recv_window_ms,
            max_requests_per_second: config.max_requests_per_second,
            max_requests_per_minute: config.max_requests_per_minute,
            proxy_urls: submitter_proxy_urls,
            ..Default::default()
        };

        let _submitter = SubmitBroadcaster::new(submitter_config)
            .context("failed to create SubmitBroadcaster")?;

        // Cancel broadcaster: configured the same way as the submit broadcaster,
        // but with its own pool size and proxy list.
        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
        let canceller_proxy_urls = match &config.canceller_proxy_urls {
            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
            None => vec![config.http_proxy_url.clone(); canceller_pool_size],
        };

        let canceller_config = CancelBroadcasterConfig {
            pool_size: canceller_pool_size,
            api_key: config.api_key.clone(),
            api_secret: config.api_secret.clone(),
            base_url: config.base_url_http.clone(),
            testnet: config.use_testnet,
            timeout_secs: config.http_timeout_secs,
            max_retries: config.max_retries,
            retry_delay_ms: config.retry_delay_initial_ms,
            retry_delay_max_ms: config.retry_delay_max_ms,
            recv_window_ms: config.recv_window_ms,
            max_requests_per_second: config.max_requests_per_second,
            max_requests_per_minute: config.max_requests_per_minute,
            proxy_urls: canceller_proxy_urls,
            ..Default::default()
        };

        let _canceller = CancelBroadcaster::new(canceller_config)
            .context("failed to create CancelBroadcaster")?;

        Ok(Self {
            core,
            clock,
            config,
            emitter,
            http_client,
            ws_client,
            _submitter,
            _canceller,
            ws_stream_handle: None,
            pending_tasks: Mutex::new(Vec::new()),
            dms_task_handle: None,
            dms_running: Arc::new(AtomicBool::new(false)),
        })
    }
210
211 fn spawn_task<F>(&self, label: &'static str, fut: F)
212 where
213 F: Future<Output = anyhow::Result<()>> + Send + 'static,
214 {
215 let handle = get_runtime().spawn(async move {
216 if let Err(e) = fut.await {
217 log::error!("{label}: {e:?}");
218 }
219 });
220
221 let mut guard = self
222 .pending_tasks
223 .lock()
224 .expect("pending task lock poisoned");
225
226 guard.retain(|h| !h.is_finished());
228 guard.push(handle);
229 }
230
231 fn abort_pending_tasks(&self) {
232 let mut guard = self
233 .pending_tasks
234 .lock()
235 .expect("pending task lock poisoned");
236 for handle in guard.drain(..) {
237 handle.abort();
238 }
239 }
240
241 fn start_deadmans_switch(&mut self) {
242 let Some(timeout_secs) = self.config.deadmans_switch_timeout_secs else {
243 return;
244 };
245
246 let timeout_ms = timeout_secs * 1000;
247 let interval_secs = (timeout_secs / 4).max(1);
248
249 log::info!(
250 "Starting dead man's switch: timeout={timeout_secs}s, refresh_interval={interval_secs}s",
251 );
252
253 self.dms_running.store(true, Ordering::SeqCst);
254 let running = self.dms_running.clone();
255 let http_client = self.http_client.clone();
256
257 let handle = get_runtime().spawn(async move {
258 while running.load(Ordering::SeqCst) {
259 if let Err(e) = http_client.cancel_all_after(timeout_ms).await {
260 log::warn!("Dead man's switch heartbeat failed: {e}");
261 }
262 tokio::time::sleep(Duration::from_secs(interval_secs)).await;
263 }
264 });
265
266 self.dms_task_handle = Some(handle);
267 }
268
269 async fn stop_deadmans_switch(&mut self) {
270 if self.config.deadmans_switch_timeout_secs.is_none() {
271 return;
272 }
273
274 self.dms_running.store(false, Ordering::SeqCst);
275
276 if let Some(handle) = self.dms_task_handle.take() {
278 handle.abort();
279 let _ = handle.await;
280 }
281
282 log::info!("Disarming dead man's switch");
283
284 if let Err(e) = self.http_client.cancel_all_after(0).await {
285 log::warn!("Failed to disarm dead man's switch: {e}");
286 }
287 }
288
289 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
290 if self.core.instruments_initialized() {
291 return Ok(());
292 }
293
294 let mut instruments: Vec<InstrumentAny> = {
295 let cache = self.core.cache();
296 cache
297 .instruments(&self.core.venue, None)
298 .into_iter()
299 .cloned()
300 .collect()
301 };
302
303 if instruments.is_empty() {
304 let http = self.http_client.clone();
305 instruments = http
306 .request_instruments(self.config.active_only)
307 .await
308 .context("failed to request BitMEX instruments")?;
309 } else {
310 log::debug!(
311 "Reusing {} cached BitMEX instruments for execution client initialization",
312 instruments.len()
313 );
314 }
315
316 instruments.sort_by_key(|instrument| instrument.id());
317
318 for instrument in &instruments {
319 self.http_client.cache_instrument(instrument.clone());
320 self._submitter.cache_instrument(instrument.clone());
321 self._canceller.cache_instrument(instrument.clone());
322 }
323
324 self.ws_client.cache_instruments(instruments);
325
326 self.core.set_instruments_initialized();
327 Ok(())
328 }
329
330 async fn refresh_account_state(&self) -> anyhow::Result<()> {
331 let account_state = self
332 .http_client
333 .request_account_state(self.core.account_id)
334 .await
335 .context("failed to request BitMEX account state")?;
336
337 self.emitter.send_account_state(account_state);
338 Ok(())
339 }
340
341 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
342 let account_id = self.core.account_id;
343
344 if self.core.cache().account(&account_id).is_some() {
345 log::info!("Account {account_id} registered");
346 return Ok(());
347 }
348
349 let start = Instant::now();
350 let timeout = Duration::from_secs_f64(timeout_secs);
351 let interval = Duration::from_millis(10);
352
353 loop {
354 tokio::time::sleep(interval).await;
355
356 if self.core.cache().account(&account_id).is_some() {
357 log::info!("Account {account_id} registered");
358 return Ok(());
359 }
360
361 if start.elapsed() >= timeout {
362 anyhow::bail!(
363 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
364 );
365 }
366 }
367 }
368
369 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
370 if self.ws_stream_handle.is_some() {
371 return Ok(());
372 }
373
374 let stream = self.ws_client.stream();
375 let emitter = self.emitter.clone();
376
377 let handle = get_runtime().spawn(async move {
378 pin_mut!(stream);
379 while let Some(message) = stream.next().await {
380 dispatch_ws_message(message, &emitter);
381 }
382 });
383
384 self.ws_stream_handle = Some(handle);
385 Ok(())
386 }
387
    /// Submits an order taken from the cache, on a background task.
    ///
    /// A closed order is not submitted: a warning is logged and `Ok(())` is
    /// returned. Otherwise an order-submitted event is emitted straight away and
    /// the request runs on a spawned task labelled `task_label`. The submit
    /// broadcaster is used when `submit_tries` is greater than 1; otherwise the
    /// single HTTP client is used.
    ///
    /// Submission failures never surface through the return value. They are
    /// reported as an order-rejected event, except for errors containing
    /// `IDEMPOTENT_DUPLICATE`, which are only logged as a warning.
    fn submit_cached_order(
        &self,
        order: OrderAny,
        submit_tries: Option<usize>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
        task_label: &'static str,
    ) -> anyhow::Result<()> {
        if order.is_closed() {
            log::warn!("Cannot submit closed order {}", order.client_order_id());
            return Ok(());
        }

        self.emitter.emit_order_submitted(&order);

        // Only fan out across the broadcaster pool when more than one try is requested.
        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
        let http_client = self.http_client.clone();
        let submitter = self._submitter.clone_for_async();
        let emitter = self.emitter.clone();
        let clock = self.clock;
        // Copy the order fields out so the spawned future owns everything it uses.
        let strategy_id = order.strategy_id();
        let instrument_id = order.instrument_id();
        let client_order_id = order.client_order_id();
        let order_side = order.order_side();
        let order_type = order.order_type();
        let quantity = order.quantity();
        let time_in_force = order.time_in_force();
        let price = order.price();
        let trigger_price = order.trigger_price();
        let trigger_type = order.trigger_type();
        let trailing_offset = order.trailing_offset().and_then(|d| d.to_f64());
        let trailing_offset_type = order.trailing_offset_type();
        let display_qty = order.display_qty();
        let post_only = order.is_post_only();
        let reduce_only = order.is_reduce_only();
        let order_list_id = order.order_list_id();
        let contingency_type = order.contingency_type();

        self.spawn_task(task_label, async move {
            let result = if use_broadcaster {
                submitter
                    .broadcast_submit(
                        instrument_id,
                        client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        time_in_force,
                        price,
                        trigger_price,
                        trigger_type,
                        trailing_offset,
                        trailing_offset_type,
                        display_qty,
                        post_only,
                        reduce_only,
                        order_list_id,
                        contingency_type,
                        submit_tries,
                        peg_price_type,
                        peg_offset_value,
                    )
                    .await
            } else {
                http_client
                    .submit_order(
                        instrument_id,
                        client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        time_in_force,
                        price,
                        trigger_price,
                        trigger_type,
                        trailing_offset,
                        trailing_offset_type,
                        display_qty,
                        post_only,
                        reduce_only,
                        order_list_id,
                        contingency_type,
                        peg_price_type,
                        peg_offset_value,
                    )
                    .await
            };

            match result {
                Ok(report) => emitter.send_order_status_report(report),
                Err(e) => {
                    let error_msg = e.to_string();

                    // A duplicate client order ID is not treated as a rejection:
                    // the order may already exist, so only a warning is logged.
                    if error_msg.contains("IDEMPOTENT_DUPLICATE") {
                        log::warn!(
                            "Order {client_order_id} may exist (duplicate clOrdID from all transports), \
                             awaiting WebSocket confirmation",
                        );
                        return Ok(());
                    }

                    let ts_event = clock.get_time_ns();
                    emitter.emit_order_rejected_event(
                        strategy_id,
                        instrument_id,
                        client_order_id,
                        &format!("submit-order-error: {error_msg}"),
                        ts_event,
                        post_only,
                    );
                }
            }
            Ok(())
        });

        Ok(())
    }
507}
508
509#[async_trait(?Send)]
510impl ExecutionClient for BitmexExecutionClient {
    /// Returns whether the execution core reports this client as connected.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
514
    /// Returns the client ID held by the execution core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
518
    /// Returns the account ID held by the execution core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
522
    /// Returns the venue held by the execution core.
    fn venue(&self) -> Venue {
        self.core.venue
    }
526
    /// Returns the order management system type held by the execution core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
530
    /// Returns a clone of the cached account for the core's account ID, or
    /// `None` if the cache has no such account.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
534
    /// Emits an account state event built from the given balances and margins.
    ///
    /// All arguments are passed through to the emitter unchanged; this method
    /// itself always returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
546
547 fn start(&mut self) -> anyhow::Result<()> {
548 if self.core.is_started() {
549 return Ok(());
550 }
551
552 self.emitter.set_sender(get_exec_event_sender());
553 self.core.set_started();
554 log::info!(
555 "BitMEX execution client started: client_id={}, account_id={}, use_testnet={}, submitter_pool_size={:?}, canceller_pool_size={:?}, http_proxy_url={:?}, ws_proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
556 self.core.client_id,
557 self.core.account_id,
558 self.config.use_testnet,
559 self.config.submitter_pool_size,
560 self.config.canceller_pool_size,
561 self.config.http_proxy_url,
562 self.config.ws_proxy_url,
563 self.config.submitter_proxy_urls,
564 self.config.canceller_proxy_urls,
565 );
566 Ok(())
567 }
568
569 fn stop(&mut self) -> anyhow::Result<()> {
570 if self.core.is_stopped() {
571 return Ok(());
572 }
573
574 self.core.set_stopped();
575 self.core.set_disconnected();
576
577 if let Some(handle) = self.ws_stream_handle.take() {
578 handle.abort();
579 }
580
581 if let Some(handle) = self.dms_task_handle.take() {
582 handle.abort();
583 }
584 self.dms_running.store(false, Ordering::SeqCst);
585 self.abort_pending_tasks();
586 log::info!("BitMEX execution client {} stopped", self.core.client_id);
587 Ok(())
588 }
589
    /// Connects the client to BitMEX and brings up all execution streams.
    ///
    /// Does nothing if already connected. Otherwise, in order: resets the HTTP
    /// cancellation token, ensures instruments are loaded, connects the
    /// WebSocket and waits up to 10 seconds for it to become active, starts both
    /// broadcasters, subscribes to the private channels, starts the stream
    /// task, refreshes the account state, and waits up to 30 seconds for the
    /// account to appear in the cache. Only then is the client marked connected
    /// and the dead man's switch started.
    ///
    /// # Errors
    ///
    /// Returns the first error from any step except the margin subscription.
    /// NOTE(review): a failure part-way through returns without undoing the
    /// steps that already succeeded in this function — confirm callers run
    /// `disconnect`/`stop` after a failed connect.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        // `disconnect` calls `cancel_all_requests` on this client, so the token
        // is reset before any new request is made.
        self.http_client.reset_cancellation_token();

        self.ensure_instruments_initialized_async().await?;

        self.ws_client.connect().await?;
        self.ws_client.wait_until_active(10.0).await?;

        self._submitter.start().await?;
        self._canceller.start().await?;

        self.ws_client.subscribe_orders().await?;
        self.ws_client.subscribe_executions().await?;
        self.ws_client.subscribe_positions().await?;
        self.ws_client.subscribe_wallet().await?;
        // A failed margin subscription is tolerated and only logged at debug level.
        if let Err(e) = self.ws_client.subscribe_margin().await {
            log::debug!("Margin subscription unavailable: {e:?}");
        }

        self.start_ws_stream()?;
        self.refresh_account_state().await?;
        self.await_account_registered(30.0).await?;

        self.core.set_connected();
        self.start_deadmans_switch();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
624
    /// Disconnects the client and tears down everything `connect` brought up.
    ///
    /// Does nothing if already disconnected. Otherwise, in order: stops and
    /// disarms the dead man's switch, cancels outstanding HTTP requests, stops
    /// both broadcasters, closes the WebSocket, aborts the stream task and all
    /// pending command tasks, and marks the core disconnected.
    ///
    /// A WebSocket close error is logged as a warning and does not fail the call.
    async fn disconnect(&mut self) -> anyhow::Result<()> {
        if self.core.is_disconnected() {
            return Ok(());
        }

        // Runs first: the disarm request goes through `http_client`, whose
        // requests are cancelled by the next statement.
        self.stop_deadmans_switch().await;

        self.http_client.cancel_all_requests();
        self._submitter.stop().await;
        self._canceller.stop().await;

        if let Err(e) = self.ws_client.close().await {
            log::warn!("Error while closing BitMEX execution websocket: {e:?}");
        }

        if let Some(handle) = self.ws_stream_handle.take() {
            handle.abort();
        }

        self.abort_pending_tasks();
        self.core.set_disconnected();
        log::info!("Disconnected: client_id={}", self.core.client_id);
        Ok(())
    }
650
651 async fn generate_order_status_report(
652 &self,
653 cmd: &GenerateOrderStatusReport,
654 ) -> anyhow::Result<Option<OrderStatusReport>> {
655 let instrument_id = cmd
656 .instrument_id
657 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
658
659 self.http_client
660 .query_order(
661 instrument_id,
662 cmd.client_order_id,
663 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
664 )
665 .await
666 .context("failed to query BitMEX order status")
667 }
668
669 async fn generate_order_status_reports(
670 &self,
671 cmd: &GenerateOrderStatusReports,
672 ) -> anyhow::Result<Vec<OrderStatusReport>> {
673 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
674 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
675
676 let mut reports = self
677 .http_client
678 .request_order_status_reports(cmd.instrument_id, cmd.open_only, start_dt, end_dt, None)
679 .await
680 .context("failed to request BitMEX order status reports")?;
681
682 if let Some(start) = cmd.start {
683 reports.retain(|report| report.ts_last >= start);
684 }
685
686 if let Some(end) = cmd.end {
687 reports.retain(|report| report.ts_last <= end);
688 }
689
690 Self::log_report_receipt(reports.len(), "OrderStatusReport", cmd.log_receipt_level);
691
692 Ok(reports)
693 }
694
695 async fn generate_fill_reports(
696 &self,
697 cmd: GenerateFillReports,
698 ) -> anyhow::Result<Vec<FillReport>> {
699 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
700 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
701
702 let mut reports = self
703 .http_client
704 .request_fill_reports(cmd.instrument_id, start_dt, end_dt, None)
705 .await
706 .context("failed to request BitMEX fill reports")?;
707
708 if let Some(order_id) = cmd.venue_order_id {
709 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
710 }
711
712 if let Some(start) = cmd.start {
713 reports.retain(|report| report.ts_event >= start);
714 }
715
716 if let Some(end) = cmd.end {
717 reports.retain(|report| report.ts_event <= end);
718 }
719
720 Self::log_report_receipt(reports.len(), "FillReport", cmd.log_receipt_level);
721
722 Ok(reports)
723 }
724
725 async fn generate_position_status_reports(
726 &self,
727 cmd: &GeneratePositionStatusReports,
728 ) -> anyhow::Result<Vec<PositionStatusReport>> {
729 let mut reports = self
730 .http_client
731 .request_position_status_reports()
732 .await
733 .context("failed to request BitMEX position reports")?;
734
735 if let Some(instrument_id) = cmd.instrument_id {
736 reports.retain(|report| report.instrument_id == instrument_id);
737 }
738
739 if let Some(start) = cmd.start {
740 reports.retain(|report| report.ts_last >= start);
741 }
742
743 if let Some(end) = cmd.end {
744 reports.retain(|report| report.ts_last <= end);
745 }
746
747 Self::log_report_receipt(reports.len(), "PositionStatusReport", cmd.log_receipt_level);
748
749 Ok(reports)
750 }
751
752 async fn generate_mass_status(
753 &self,
754 lookback_mins: Option<u64>,
755 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
756 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
757
758 let ts_now = self.clock.get_time_ns();
759 let start = lookback_mins.map(|mins| {
760 let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
761 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
762 });
763
764 let order_cmd = GenerateOrderStatusReportsBuilder::default()
765 .ts_init(ts_now)
766 .open_only(false)
767 .start(start)
768 .build()
769 .map_err(|e| anyhow::anyhow!("{e}"))?;
770
771 let fill_cmd = GenerateFillReportsBuilder::default()
772 .ts_init(ts_now)
773 .start(start)
774 .build()
775 .map_err(|e| anyhow::anyhow!("{e}"))?;
776
777 let position_cmd = GeneratePositionStatusReportsBuilder::default()
778 .ts_init(ts_now)
779 .start(start)
780 .build()
781 .map_err(|e| anyhow::anyhow!("{e}"))?;
782
783 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
784 self.generate_order_status_reports(&order_cmd),
785 self.generate_fill_reports(fill_cmd),
786 self.generate_position_status_reports(&position_cmd),
787 )?;
788
789 let mut mass_status = ExecutionMassStatus::new(
790 self.core.client_id,
791 self.core.account_id,
792 self.core.venue,
793 ts_now,
794 None,
795 );
796 mass_status.add_order_reports(order_reports);
797 mass_status.add_fill_reports(fill_reports);
798 mass_status.add_position_reports(position_reports);
799
800 Ok(Some(mass_status))
801 }
802
803 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
804 let http_client = self.http_client.clone();
805 let emitter = self.emitter.clone();
806 let account_id = self.core.account_id;
807
808 self.spawn_task("query_account", async move {
809 match http_client.request_account_state(account_id).await {
810 Ok(account_state) => emitter.send_account_state(account_state),
811 Err(e) => log::error!("BitMEX query account failed: {e:?}"),
812 }
813 Ok(())
814 });
815
816 Ok(())
817 }
818
819 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
820 let http_client = self.http_client.clone();
821 let instrument_id = cmd.instrument_id;
822 let client_order_id = Some(cmd.client_order_id);
823 let venue_order_id = cmd.venue_order_id;
824 let emitter = self.emitter.clone();
825
826 self.spawn_task("query_order", async move {
827 match http_client
828 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
829 .await
830 {
831 Ok(report) => emitter.send_order_status_report(report),
832 Err(e) => log::error!("BitMEX query order failed: {e:?}"),
833 }
834 Ok(())
835 });
836
837 Ok(())
838 }
839
840 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
841 let submit_tries = cmd
842 .params
843 .as_ref()
844 .and_then(|p| p.get_usize("submit_tries"))
845 .filter(|&n| n > 0);
846
847 let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
848 let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
849
850 let order = self
851 .core
852 .cache()
853 .order(&cmd.client_order_id)
854 .cloned()
855 .ok_or_else(|| {
856 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
857 })?;
858
859 self.submit_cached_order(
860 order,
861 submit_tries,
862 peg_price_type,
863 peg_offset_value,
864 "submit_order",
865 )
866 }
867
868 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
869 if cmd.order_list.client_order_ids.is_empty() {
870 log::debug!("submit_order_list called with empty order list");
871 return Ok(());
872 }
873
874 let submit_tries = cmd
875 .params
876 .as_ref()
877 .and_then(|p| p.get_usize("submit_tries"))
878 .filter(|&n| n > 0);
879
880 let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
881 let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
882
883 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
884
885 log::info!(
886 "Submitting BitMEX order list: order_list_id={}, count={}",
887 cmd.order_list.id,
888 orders.len(),
889 );
890
891 for order in orders {
892 self.submit_cached_order(
893 order,
894 submit_tries,
895 peg_price_type,
896 peg_offset_value,
897 "submit_order_list_item",
898 )?;
899 }
900
901 Ok(())
902 }
903
904 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
905 let http_client = self.http_client.clone();
906 let emitter = self.emitter.clone();
907 let instrument_id = cmd.instrument_id;
908 let client_order_id = Some(cmd.client_order_id);
909 let venue_order_id = cmd.venue_order_id;
910 let quantity = cmd.quantity;
911 let price = cmd.price;
912 let trigger_price = cmd.trigger_price;
913
914 self.spawn_task("modify_order", async move {
915 match http_client
916 .modify_order(
917 instrument_id,
918 client_order_id,
919 venue_order_id,
920 quantity,
921 price,
922 trigger_price,
923 )
924 .await
925 {
926 Ok(report) => emitter.send_order_status_report(report),
927 Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
928 }
929 Ok(())
930 });
931
932 Ok(())
933 }
934
935 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
936 let canceller = self._canceller.clone_for_async();
937 let emitter = self.emitter.clone();
938 let instrument_id = cmd.instrument_id;
939 let client_order_id = Some(cmd.client_order_id);
940 let venue_order_id = cmd.venue_order_id;
941
942 self.spawn_task("cancel_order", async move {
943 match canceller
944 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
945 .await
946 {
947 Ok(Some(report)) => emitter.send_order_status_report(report),
948 Ok(None) => {
949 log::debug!("Order already cancelled: {client_order_id:?}");
951 }
952 Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
953 }
954 Ok(())
955 });
956
957 Ok(())
958 }
959
960 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
961 let canceller = self._canceller.clone_for_async();
962 let emitter = self.emitter.clone();
963 let instrument_id = cmd.instrument_id;
964 let order_side = if cmd.order_side == OrderSide::NoOrderSide {
965 log::debug!(
966 "BitMEX cancel_all_orders received NoOrderSide for {instrument_id}, using unfiltered cancel-all",
967 );
968 None
969 } else {
970 Some(cmd.order_side)
971 };
972
973 self.spawn_task("cancel_all_orders", async move {
974 match canceller
975 .broadcast_cancel_all(instrument_id, order_side)
976 .await
977 {
978 Ok(reports) => {
979 for report in reports {
980 emitter.send_order_status_report(report);
981 }
982 }
983 Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
984 }
985 Ok(())
986 });
987
988 Ok(())
989 }
990
991 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
992 let canceller = self._canceller.clone_for_async();
993 let emitter = self.emitter.clone();
994 let instrument_id = cmd.instrument_id;
995
996 let client_ids: Vec<ClientOrderId> = cmd
997 .cancels
998 .iter()
999 .map(|cancel| cancel.client_order_id)
1000 .collect();
1001
1002 let venue_ids: Vec<VenueOrderId> = cmd
1003 .cancels
1004 .iter()
1005 .filter_map(|cancel| cancel.venue_order_id)
1006 .collect();
1007
1008 let client_ids_opt = if client_ids.is_empty() {
1009 None
1010 } else {
1011 Some(client_ids)
1012 };
1013
1014 let venue_ids_opt = if venue_ids.is_empty() {
1015 None
1016 } else {
1017 Some(venue_ids)
1018 };
1019
1020 self.spawn_task("batch_cancel_orders", async move {
1021 match canceller
1022 .broadcast_batch_cancel(instrument_id, client_ids_opt, venue_ids_opt)
1023 .await
1024 {
1025 Ok(reports) => {
1026 for report in reports {
1027 emitter.send_order_status_report(report);
1028 }
1029 }
1030 Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
1031 }
1032 Ok(())
1033 });
1034
1035 Ok(())
1036 }
1037}
1038
1039fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1041 match message {
1042 NautilusWsMessage::OrderStatusReports(reports) => {
1043 for report in reports {
1044 emitter.send_order_status_report(report);
1045 }
1046 }
1047 NautilusWsMessage::FillReports(reports) => {
1048 for report in reports {
1049 emitter.send_fill_report(report);
1050 }
1051 }
1052 NautilusWsMessage::PositionStatusReports(reports) => {
1053 for report in reports {
1054 emitter.send_position_report(report);
1055 }
1056 }
1057 NautilusWsMessage::AccountStates(states) => {
1058 for state in states {
1059 emitter.send_account_state(state);
1060 }
1061 }
1062 NautilusWsMessage::OrderUpdated(event) => {
1063 emitter.send_order_event(OrderEventAny::Updated(*event));
1064 }
1065 NautilusWsMessage::OrderUpdates(events) => {
1066 for event in events {
1067 emitter.send_order_event(OrderEventAny::Updated(event));
1068 }
1069 }
1070 NautilusWsMessage::Data(_)
1071 | NautilusWsMessage::Instruments(_)
1072 | NautilusWsMessage::InstrumentStatus(_)
1073 | NautilusWsMessage::FundingRateUpdates(_) => {
1074 log::debug!("Ignoring BitMEX data message on execution stream");
1075 }
1076 NautilusWsMessage::Reconnected => {
1077 log::info!("BitMEX execution websocket reconnected");
1078 }
1079 NautilusWsMessage::Authenticated => {
1080 log::debug!("BitMEX execution websocket authenticated");
1081 }
1082 }
1083}
1084
1085fn parse_peg_price_type(params: Option<&Params>) -> anyhow::Result<Option<BitmexPegPriceType>> {
1086 let value = params.and_then(|p| p.get_str("peg_price_type"));
1087 match value {
1088 Some(s) => BitmexPegPriceType::from_str(s)
1089 .map(Some)
1090 .map_err(|_| anyhow::anyhow!("Invalid peg_price_type: {s}")),
1091 None => Ok(None),
1092 }
1093}
1094
1095fn parse_peg_offset_value(params: Option<&Params>) -> anyhow::Result<Option<f64>> {
1096 let value = params.and_then(|p| p.get_str("peg_offset_value"));
1097 match value {
1098 Some(s) => s
1099 .parse::<f64>()
1100 .map(Some)
1101 .map_err(|_| anyhow::anyhow!("Invalid peg_offset_value: {s}")),
1102 None => Ok(None),
1103 }
1104}