Skip to main content

nautilus_bitmex/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    str::FromStr,
21    sync::{
22        Arc, Mutex,
23        atomic::{AtomicBool, Ordering},
24    },
25    time::{Duration, Instant},
26};
27
28use anyhow::Context;
29use async_trait::async_trait;
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32    clients::ExecutionClient,
33    enums::LogLevel,
34    live::{get_runtime, runner::get_exec_event_sender},
35    messages::execution::{
36        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
37        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
38        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
39        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
40        SubmitOrderList,
41    },
42};
43use nautilus_core::{
44    Params, UnixNanos,
45    time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
48use nautilus_model::{
49    accounts::AccountAny,
50    enums::{AccountType, OmsType, OrderSide},
51    events::OrderEventAny,
52    identifiers::{AccountId, ClientId, ClientOrderId, Venue, VenueOrderId},
53    instruments::{Instrument, InstrumentAny},
54    orders::{Order, OrderAny},
55    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
56    types::{AccountBalance, MarginBalance},
57};
58use rust_decimal::prelude::ToPrimitive;
59use tokio::task::JoinHandle;
60
61use crate::{
62    broadcast::{
63        canceller::{CancelBroadcaster, CancelBroadcasterConfig},
64        submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
65    },
66    common::enums::BitmexPegPriceType,
67    config::BitmexExecClientConfig,
68    http::client::BitmexHttpClient,
69    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
70};
71
/// Live execution client for the BitMEX venue.
///
/// Owns the HTTP and WebSocket clients, the submit/cancel broadcasters and the
/// handles of the background tasks it spawns.
#[derive(Debug)]
pub struct BitmexExecutionClient {
    /// Shared execution-client state: identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Real-time atomic clock used to timestamp locally generated events.
    clock: &'static AtomicTime,
    /// Adapter configuration this client was constructed with.
    config: BitmexExecClientConfig,
    /// Emitter used to publish order and account events and reports.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for requests, order submission and the dead man's switch.
    http_client: BitmexHttpClient,
    /// WebSocket client providing the private execution stream.
    ws_client: BitmexWebSocketClient,
    /// Broadcaster used when an order is submitted with `submit_tries > 1`.
    _submitter: SubmitBroadcaster,
    /// Cancel broadcaster; started, stopped and fed instruments alongside the submitter.
    _canceller: CancelBroadcaster,
    /// Handle of the task that drains the WebSocket stream, when running.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks spawned through `spawn_task`, kept so they can be aborted.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the dead man's switch heartbeat task, when running.
    dms_task_handle: Option<JoinHandle<()>>,
    /// Flag polled by the dead man's switch loop; cleared to request shutdown.
    dms_running: Arc<AtomicBool>,
}
87
88impl BitmexExecutionClient {
89    fn log_report_receipt(count: usize, report_type: &str, log_level: LogLevel) {
90        let plural = if count == 1 { "" } else { "s" };
91        let message = format!("Received {count} {report_type}{plural}");
92
93        match log_level {
94            LogLevel::Off => {}
95            LogLevel::Trace => log::trace!("{message}"),
96            LogLevel::Debug => log::debug!("{message}"),
97            LogLevel::Info => log::info!("{message}"),
98            LogLevel::Warning => log::warn!("{message}"),
99            LogLevel::Error => log::error!("{message}"),
100        }
101    }
102
103    /// Creates a new [`BitmexExecutionClient`].
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if either the HTTP or WebSocket client fail to construct.
108    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
109        if !config.has_api_credentials() {
110            anyhow::bail!("BitMEX execution client requires API key and secret");
111        }
112
113        let trader_id = core.trader_id;
114        let account_id = config.account_id.unwrap_or(core.account_id);
115        let clock = get_atomic_clock_realtime();
116        let emitter =
117            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
118        let http_client = BitmexHttpClient::new(
119            Some(config.http_base_url()),
120            config.api_key.clone(),
121            config.api_secret.clone(),
122            config.use_testnet,
123            config.http_timeout_secs,
124            config.max_retries,
125            config.retry_delay_initial_ms,
126            config.retry_delay_max_ms,
127            config.recv_window_ms,
128            config.max_requests_per_second,
129            config.max_requests_per_minute,
130            config.http_proxy_url.clone(),
131        )
132        .context("failed to construct BitMEX HTTP client")?;
133        let ws_client = BitmexWebSocketClient::new_with_env(
134            Some(config.ws_url()),
135            config.api_key.clone(),
136            config.api_secret.clone(),
137            Some(account_id),
138            config.heartbeat_interval_secs,
139            config.use_testnet,
140        )
141        .context("failed to construct BitMEX execution websocket client")?;
142
143        let pool_size = config.submitter_pool_size.unwrap_or(1);
144        let submitter_proxy_urls = match &config.submitter_proxy_urls {
145            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
146            None => vec![config.http_proxy_url.clone(); pool_size],
147        };
148
149        let submitter_config = SubmitBroadcasterConfig {
150            pool_size,
151            api_key: config.api_key.clone(),
152            api_secret: config.api_secret.clone(),
153            base_url: config.base_url_http.clone(),
154            testnet: config.use_testnet,
155            timeout_secs: config.http_timeout_secs,
156            max_retries: config.max_retries,
157            retry_delay_ms: config.retry_delay_initial_ms,
158            retry_delay_max_ms: config.retry_delay_max_ms,
159            recv_window_ms: config.recv_window_ms,
160            max_requests_per_second: config.max_requests_per_second,
161            max_requests_per_minute: config.max_requests_per_minute,
162            proxy_urls: submitter_proxy_urls,
163            ..Default::default()
164        };
165
166        let _submitter = SubmitBroadcaster::new(submitter_config)
167            .context("failed to create SubmitBroadcaster")?;
168
169        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
170        let canceller_proxy_urls = match &config.canceller_proxy_urls {
171            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
172            None => vec![config.http_proxy_url.clone(); canceller_pool_size],
173        };
174
175        let canceller_config = CancelBroadcasterConfig {
176            pool_size: canceller_pool_size,
177            api_key: config.api_key.clone(),
178            api_secret: config.api_secret.clone(),
179            base_url: config.base_url_http.clone(),
180            testnet: config.use_testnet,
181            timeout_secs: config.http_timeout_secs,
182            max_retries: config.max_retries,
183            retry_delay_ms: config.retry_delay_initial_ms,
184            retry_delay_max_ms: config.retry_delay_max_ms,
185            recv_window_ms: config.recv_window_ms,
186            max_requests_per_second: config.max_requests_per_second,
187            max_requests_per_minute: config.max_requests_per_minute,
188            proxy_urls: canceller_proxy_urls,
189            ..Default::default()
190        };
191
192        let _canceller = CancelBroadcaster::new(canceller_config)
193            .context("failed to create CancelBroadcaster")?;
194
195        Ok(Self {
196            core,
197            clock,
198            config,
199            emitter,
200            http_client,
201            ws_client,
202            _submitter,
203            _canceller,
204            ws_stream_handle: None,
205            pending_tasks: Mutex::new(Vec::new()),
206            dms_task_handle: None,
207            dms_running: Arc::new(AtomicBool::new(false)),
208        })
209    }
210
211    fn spawn_task<F>(&self, label: &'static str, fut: F)
212    where
213        F: Future<Output = anyhow::Result<()>> + Send + 'static,
214    {
215        let handle = get_runtime().spawn(async move {
216            if let Err(e) = fut.await {
217                log::error!("{label}: {e:?}");
218            }
219        });
220
221        let mut guard = self
222            .pending_tasks
223            .lock()
224            .expect("pending task lock poisoned");
225
226        // Remove completed tasks to prevent unbounded growth
227        guard.retain(|h| !h.is_finished());
228        guard.push(handle);
229    }
230
231    fn abort_pending_tasks(&self) {
232        let mut guard = self
233            .pending_tasks
234            .lock()
235            .expect("pending task lock poisoned");
236        for handle in guard.drain(..) {
237            handle.abort();
238        }
239    }
240
241    fn start_deadmans_switch(&mut self) {
242        let Some(timeout_secs) = self.config.deadmans_switch_timeout_secs else {
243            return;
244        };
245
246        let timeout_ms = timeout_secs * 1000;
247        let interval_secs = (timeout_secs / 4).max(1);
248
249        log::info!(
250            "Starting dead man's switch: timeout={timeout_secs}s, refresh_interval={interval_secs}s",
251        );
252
253        self.dms_running.store(true, Ordering::SeqCst);
254        let running = self.dms_running.clone();
255        let http_client = self.http_client.clone();
256
257        let handle = get_runtime().spawn(async move {
258            while running.load(Ordering::SeqCst) {
259                if let Err(e) = http_client.cancel_all_after(timeout_ms).await {
260                    log::warn!("Dead man's switch heartbeat failed: {e}");
261                }
262                tokio::time::sleep(Duration::from_secs(interval_secs)).await;
263            }
264        });
265
266        self.dms_task_handle = Some(handle);
267    }
268
269    async fn stop_deadmans_switch(&mut self) {
270        if self.config.deadmans_switch_timeout_secs.is_none() {
271            return;
272        }
273
274        self.dms_running.store(false, Ordering::SeqCst);
275
276        // Abort and await loop shutdown so disconnect does not block on sleep/HTTP timeout.
277        if let Some(handle) = self.dms_task_handle.take() {
278            handle.abort();
279            let _ = handle.await;
280        }
281
282        log::info!("Disarming dead man's switch");
283
284        if let Err(e) = self.http_client.cancel_all_after(0).await {
285            log::warn!("Failed to disarm dead man's switch: {e}");
286        }
287    }
288
289    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
290        if self.core.instruments_initialized() {
291            return Ok(());
292        }
293
294        let mut instruments: Vec<InstrumentAny> = {
295            let cache = self.core.cache();
296            cache
297                .instruments(&self.core.venue, None)
298                .into_iter()
299                .cloned()
300                .collect()
301        };
302
303        if instruments.is_empty() {
304            let http = self.http_client.clone();
305            instruments = http
306                .request_instruments(self.config.active_only)
307                .await
308                .context("failed to request BitMEX instruments")?;
309        } else {
310            log::debug!(
311                "Reusing {} cached BitMEX instruments for execution client initialization",
312                instruments.len()
313            );
314        }
315
316        instruments.sort_by_key(|instrument| instrument.id());
317
318        for instrument in &instruments {
319            self.http_client.cache_instrument(instrument.clone());
320            self._submitter.cache_instrument(instrument.clone());
321            self._canceller.cache_instrument(instrument.clone());
322        }
323
324        self.ws_client.cache_instruments(instruments);
325
326        self.core.set_instruments_initialized();
327        Ok(())
328    }
329
330    async fn refresh_account_state(&self) -> anyhow::Result<()> {
331        let account_state = self
332            .http_client
333            .request_account_state(self.core.account_id)
334            .await
335            .context("failed to request BitMEX account state")?;
336
337        self.emitter.send_account_state(account_state);
338        Ok(())
339    }
340
341    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
342        let account_id = self.core.account_id;
343
344        if self.core.cache().account(&account_id).is_some() {
345            log::info!("Account {account_id} registered");
346            return Ok(());
347        }
348
349        let start = Instant::now();
350        let timeout = Duration::from_secs_f64(timeout_secs);
351        let interval = Duration::from_millis(10);
352
353        loop {
354            tokio::time::sleep(interval).await;
355
356            if self.core.cache().account(&account_id).is_some() {
357                log::info!("Account {account_id} registered");
358                return Ok(());
359            }
360
361            if start.elapsed() >= timeout {
362                anyhow::bail!(
363                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
364                );
365            }
366        }
367    }
368
369    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
370        if self.ws_stream_handle.is_some() {
371            return Ok(());
372        }
373
374        let stream = self.ws_client.stream();
375        let emitter = self.emitter.clone();
376
377        let handle = get_runtime().spawn(async move {
378            pin_mut!(stream);
379            while let Some(message) = stream.next().await {
380                dispatch_ws_message(message, &emitter);
381            }
382        });
383
384        self.ws_stream_handle = Some(handle);
385        Ok(())
386    }
387
    /// Emits `OrderSubmitted` for `order` and submits it to BitMEX in a background task.
    ///
    /// A closed order is not submitted: a warning is logged and `Ok(())` is
    /// returned. When `submit_tries` is greater than 1 the order goes through
    /// the submit broadcaster, otherwise through the plain HTTP client.
    ///
    /// On success the resulting order status report is sent through the emitter.
    /// On failure an order-rejected event is emitted, unless the error text
    /// contains `IDEMPOTENT_DUPLICATE`, in which case only a warning is logged.
    ///
    /// The submission outcome is handled inside the spawned task, so this
    /// function itself always returns `Ok(())`; `task_label` prefixes any error
    /// that task logs.
    fn submit_cached_order(
        &self,
        order: OrderAny,
        submit_tries: Option<usize>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
        task_label: &'static str,
    ) -> anyhow::Result<()> {
        if order.is_closed() {
            log::warn!("Cannot submit closed order {}", order.client_order_id());
            return Ok(());
        }

        self.emitter.emit_order_submitted(&order);

        // Copy everything the task needs out of `order` up front: the spawned
        // future must be `Send + 'static` and cannot borrow from `self` or `order`.
        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
        let http_client = self.http_client.clone();
        let submitter = self._submitter.clone_for_async();
        let emitter = self.emitter.clone();
        let clock = self.clock;
        let strategy_id = order.strategy_id();
        let instrument_id = order.instrument_id();
        let client_order_id = order.client_order_id();
        let order_side = order.order_side();
        let order_type = order.order_type();
        let quantity = order.quantity();
        let time_in_force = order.time_in_force();
        let price = order.price();
        let trigger_price = order.trigger_price();
        let trigger_type = order.trigger_type();
        let trailing_offset = order.trailing_offset().and_then(|d| d.to_f64());
        let trailing_offset_type = order.trailing_offset_type();
        let display_qty = order.display_qty();
        let post_only = order.is_post_only();
        let reduce_only = order.is_reduce_only();
        let order_list_id = order.order_list_id();
        let contingency_type = order.contingency_type();

        self.spawn_task(task_label, async move {
            // The two calls take the same arguments in the same order, except
            // that only the broadcaster receives `submit_tries`.
            let result = if use_broadcaster {
                submitter
                    .broadcast_submit(
                        instrument_id,
                        client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        time_in_force,
                        price,
                        trigger_price,
                        trigger_type,
                        trailing_offset,
                        trailing_offset_type,
                        display_qty,
                        post_only,
                        reduce_only,
                        order_list_id,
                        contingency_type,
                        submit_tries,
                        peg_price_type,
                        peg_offset_value,
                    )
                    .await
            } else {
                http_client
                    .submit_order(
                        instrument_id,
                        client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        time_in_force,
                        price,
                        trigger_price,
                        trigger_type,
                        trailing_offset,
                        trailing_offset_type,
                        display_qty,
                        post_only,
                        reduce_only,
                        order_list_id,
                        contingency_type,
                        peg_price_type,
                        peg_offset_value,
                    )
                    .await
            };

            match result {
                Ok(report) => emitter.send_order_status_report(report),
                Err(e) => {
                    let error_msg = e.to_string();

                    // If all transports returned "Duplicate clOrdID", the order likely exists
                    // but the success response was lost. Wait for WebSocket confirmation.
                    // NOTE(review): this matches on the error's display text only — confirm
                    // the marker is always present in the outermost message.
                    if error_msg.contains("IDEMPOTENT_DUPLICATE") {
                        log::warn!(
                            "Order {client_order_id} may exist (duplicate clOrdID from all transports), \
                             awaiting WebSocket confirmation",
                        );
                        return Ok(());
                    }

                    let ts_event = clock.get_time_ns();
                    emitter.emit_order_rejected_event(
                        strategy_id,
                        instrument_id,
                        client_order_id,
                        &format!("submit-order-error: {error_msg}"),
                        ts_event,
                        post_only,
                    );
                }
            }
            Ok(())
        });

        Ok(())
    }
507}
508
509#[async_trait(?Send)]
510impl ExecutionClient for BitmexExecutionClient {
    /// Returns whether the core currently reports this client as connected.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
514
    /// Returns the client ID held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
518
    /// Returns the account ID held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
522
    /// Returns the venue held by the core.
    fn venue(&self) -> Venue {
        self.core.venue
    }
526
    /// Returns the order management system type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
530
    /// Returns a clone of this client's account from the cache, or `None` if it
    /// is not in the cache.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
534
    /// Emits an account state event built from the given balances and margins.
    ///
    /// Delegates to the event emitter and always returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
546
    /// Marks the client as started and wires the emitter to the execution event sender.
    ///
    /// Returns immediately if the core already reports the client as started.
    /// No network connection is opened here.
    fn start(&mut self) -> anyhow::Result<()> {
        if self.core.is_started() {
            return Ok(());
        }

        self.emitter.set_sender(get_exec_event_sender());
        self.core.set_started();
        // NOTE(review): proxy URLs are logged verbatim — confirm they can never
        // embed credentials (e.g. `http://user:pass@host`).
        log::info!(
            "BitMEX execution client started: client_id={}, account_id={}, use_testnet={}, submitter_pool_size={:?}, canceller_pool_size={:?}, http_proxy_url={:?}, ws_proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
            self.core.client_id,
            self.core.account_id,
            self.config.use_testnet,
            self.config.submitter_pool_size,
            self.config.canceller_pool_size,
            self.config.http_proxy_url,
            self.config.ws_proxy_url,
            self.config.submitter_proxy_urls,
            self.config.canceller_proxy_urls,
        );
        Ok(())
    }
568
569    fn stop(&mut self) -> anyhow::Result<()> {
570        if self.core.is_stopped() {
571            return Ok(());
572        }
573
574        self.core.set_stopped();
575        self.core.set_disconnected();
576
577        if let Some(handle) = self.ws_stream_handle.take() {
578            handle.abort();
579        }
580
581        if let Some(handle) = self.dms_task_handle.take() {
582            handle.abort();
583        }
584        self.dms_running.store(false, Ordering::SeqCst);
585        self.abort_pending_tasks();
586        log::info!("BitMEX execution client {} stopped", self.core.client_id);
587        Ok(())
588    }
589
    /// Connects the client: instruments, WebSocket, broadcasters, subscriptions
    /// and initial account state.
    ///
    /// Returns immediately if the core already reports the client as connected.
    /// After the account has been registered in the cache the client is marked
    /// connected and the dead man's switch (if configured) is started.
    ///
    /// # Errors
    ///
    /// Returns an error if any step fails, except the margin subscription,
    /// whose failure is only logged at debug level.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        // Reset cancellation token so HTTP requests succeed after reconnect
        self.http_client.reset_cancellation_token();

        self.ensure_instruments_initialized_async().await?;

        // NOTE(review): a failure from here on returns early without undoing
        // what was already started — confirm callers run disconnect/stop on error.
        self.ws_client.connect().await?;
        self.ws_client.wait_until_active(10.0).await?;

        // Start submitter/canceller after WS connection succeeds
        self._submitter.start().await?;
        self._canceller.start().await?;

        self.ws_client.subscribe_orders().await?;
        self.ws_client.subscribe_executions().await?;
        self.ws_client.subscribe_positions().await?;
        self.ws_client.subscribe_wallet().await?;
        // Margin is treated as optional: a failed subscription does not abort connect
        if let Err(e) = self.ws_client.subscribe_margin().await {
            log::debug!("Margin subscription unavailable: {e:?}");
        }

        self.start_ws_stream()?;
        self.refresh_account_state().await?;
        self.await_account_registered(30.0).await?;

        self.core.set_connected();
        self.start_deadmans_switch();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
624
    /// Disconnects the client and tears down everything `connect` started.
    ///
    /// Returns immediately if the core already reports the client as
    /// disconnected. An error while closing the WebSocket is logged as a
    /// warning and not propagated, so this function returns `Ok(())` on every
    /// path shown here.
    async fn disconnect(&mut self) -> anyhow::Result<()> {
        if self.core.is_disconnected() {
            return Ok(());
        }

        // Disarm DMS before cancelling requests (needs working HTTP)
        self.stop_deadmans_switch().await;

        self.http_client.cancel_all_requests();
        self._submitter.stop().await;
        self._canceller.stop().await;

        if let Err(e) = self.ws_client.close().await {
            log::warn!("Error while closing BitMEX execution websocket: {e:?}");
        }

        // Stop draining the stream only after the socket has been asked to close
        if let Some(handle) = self.ws_stream_handle.take() {
            handle.abort();
        }

        self.abort_pending_tasks();
        self.core.set_disconnected();
        log::info!("Disconnected: client_id={}", self.core.client_id);
        Ok(())
    }
650
651    async fn generate_order_status_report(
652        &self,
653        cmd: &GenerateOrderStatusReport,
654    ) -> anyhow::Result<Option<OrderStatusReport>> {
655        let instrument_id = cmd
656            .instrument_id
657            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
658
659        self.http_client
660            .query_order(
661                instrument_id,
662                cmd.client_order_id,
663                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
664            )
665            .await
666            .context("failed to query BitMEX order status")
667    }
668
669    async fn generate_order_status_reports(
670        &self,
671        cmd: &GenerateOrderStatusReports,
672    ) -> anyhow::Result<Vec<OrderStatusReport>> {
673        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
674        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
675
676        let mut reports = self
677            .http_client
678            .request_order_status_reports(cmd.instrument_id, cmd.open_only, start_dt, end_dt, None)
679            .await
680            .context("failed to request BitMEX order status reports")?;
681
682        if let Some(start) = cmd.start {
683            reports.retain(|report| report.ts_last >= start);
684        }
685
686        if let Some(end) = cmd.end {
687            reports.retain(|report| report.ts_last <= end);
688        }
689
690        Self::log_report_receipt(reports.len(), "OrderStatusReport", cmd.log_receipt_level);
691
692        Ok(reports)
693    }
694
695    async fn generate_fill_reports(
696        &self,
697        cmd: GenerateFillReports,
698    ) -> anyhow::Result<Vec<FillReport>> {
699        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
700        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
701
702        let mut reports = self
703            .http_client
704            .request_fill_reports(cmd.instrument_id, start_dt, end_dt, None)
705            .await
706            .context("failed to request BitMEX fill reports")?;
707
708        if let Some(order_id) = cmd.venue_order_id {
709            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
710        }
711
712        if let Some(start) = cmd.start {
713            reports.retain(|report| report.ts_event >= start);
714        }
715
716        if let Some(end) = cmd.end {
717            reports.retain(|report| report.ts_event <= end);
718        }
719
720        Self::log_report_receipt(reports.len(), "FillReport", cmd.log_receipt_level);
721
722        Ok(reports)
723    }
724
725    async fn generate_position_status_reports(
726        &self,
727        cmd: &GeneratePositionStatusReports,
728    ) -> anyhow::Result<Vec<PositionStatusReport>> {
729        let mut reports = self
730            .http_client
731            .request_position_status_reports()
732            .await
733            .context("failed to request BitMEX position reports")?;
734
735        if let Some(instrument_id) = cmd.instrument_id {
736            reports.retain(|report| report.instrument_id == instrument_id);
737        }
738
739        if let Some(start) = cmd.start {
740            reports.retain(|report| report.ts_last >= start);
741        }
742
743        if let Some(end) = cmd.end {
744            reports.retain(|report| report.ts_last <= end);
745        }
746
747        Self::log_report_receipt(reports.len(), "PositionStatusReport", cmd.log_receipt_level);
748
749        Ok(reports)
750    }
751
752    async fn generate_mass_status(
753        &self,
754        lookback_mins: Option<u64>,
755    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
756        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
757
758        let ts_now = self.clock.get_time_ns();
759        let start = lookback_mins.map(|mins| {
760            let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
761            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
762        });
763
764        let order_cmd = GenerateOrderStatusReportsBuilder::default()
765            .ts_init(ts_now)
766            .open_only(false)
767            .start(start)
768            .build()
769            .map_err(|e| anyhow::anyhow!("{e}"))?;
770
771        let fill_cmd = GenerateFillReportsBuilder::default()
772            .ts_init(ts_now)
773            .start(start)
774            .build()
775            .map_err(|e| anyhow::anyhow!("{e}"))?;
776
777        let position_cmd = GeneratePositionStatusReportsBuilder::default()
778            .ts_init(ts_now)
779            .start(start)
780            .build()
781            .map_err(|e| anyhow::anyhow!("{e}"))?;
782
783        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
784            self.generate_order_status_reports(&order_cmd),
785            self.generate_fill_reports(fill_cmd),
786            self.generate_position_status_reports(&position_cmd),
787        )?;
788
789        let mut mass_status = ExecutionMassStatus::new(
790            self.core.client_id,
791            self.core.account_id,
792            self.core.venue,
793            ts_now,
794            None,
795        );
796        mass_status.add_order_reports(order_reports);
797        mass_status.add_fill_reports(fill_reports);
798        mass_status.add_position_reports(position_reports);
799
800        Ok(Some(mass_status))
801    }
802
803    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
804        let http_client = self.http_client.clone();
805        let emitter = self.emitter.clone();
806        let account_id = self.core.account_id;
807
808        self.spawn_task("query_account", async move {
809            match http_client.request_account_state(account_id).await {
810                Ok(account_state) => emitter.send_account_state(account_state),
811                Err(e) => log::error!("BitMEX query account failed: {e:?}"),
812            }
813            Ok(())
814        });
815
816        Ok(())
817    }
818
819    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
820        let http_client = self.http_client.clone();
821        let instrument_id = cmd.instrument_id;
822        let client_order_id = Some(cmd.client_order_id);
823        let venue_order_id = cmd.venue_order_id;
824        let emitter = self.emitter.clone();
825
826        self.spawn_task("query_order", async move {
827            match http_client
828                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
829                .await
830            {
831                Ok(report) => emitter.send_order_status_report(report),
832                Err(e) => log::error!("BitMEX query order failed: {e:?}"),
833            }
834            Ok(())
835        });
836
837        Ok(())
838    }
839
840    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
841        let submit_tries = cmd
842            .params
843            .as_ref()
844            .and_then(|p| p.get_usize("submit_tries"))
845            .filter(|&n| n > 0);
846
847        let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
848        let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
849
850        let order = self
851            .core
852            .cache()
853            .order(&cmd.client_order_id)
854            .cloned()
855            .ok_or_else(|| {
856                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
857            })?;
858
859        self.submit_cached_order(
860            order,
861            submit_tries,
862            peg_price_type,
863            peg_offset_value,
864            "submit_order",
865        )
866    }
867
868    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
869        if cmd.order_list.client_order_ids.is_empty() {
870            log::debug!("submit_order_list called with empty order list");
871            return Ok(());
872        }
873
874        let submit_tries = cmd
875            .params
876            .as_ref()
877            .and_then(|p| p.get_usize("submit_tries"))
878            .filter(|&n| n > 0);
879
880        let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
881        let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
882
883        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
884
885        log::info!(
886            "Submitting BitMEX order list: order_list_id={}, count={}",
887            cmd.order_list.id,
888            orders.len(),
889        );
890
891        for order in orders {
892            self.submit_cached_order(
893                order,
894                submit_tries,
895                peg_price_type,
896                peg_offset_value,
897                "submit_order_list_item",
898            )?;
899        }
900
901        Ok(())
902    }
903
904    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
905        let http_client = self.http_client.clone();
906        let emitter = self.emitter.clone();
907        let instrument_id = cmd.instrument_id;
908        let client_order_id = Some(cmd.client_order_id);
909        let venue_order_id = cmd.venue_order_id;
910        let quantity = cmd.quantity;
911        let price = cmd.price;
912        let trigger_price = cmd.trigger_price;
913
914        self.spawn_task("modify_order", async move {
915            match http_client
916                .modify_order(
917                    instrument_id,
918                    client_order_id,
919                    venue_order_id,
920                    quantity,
921                    price,
922                    trigger_price,
923                )
924                .await
925            {
926                Ok(report) => emitter.send_order_status_report(report),
927                Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
928            }
929            Ok(())
930        });
931
932        Ok(())
933    }
934
935    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
936        let canceller = self._canceller.clone_for_async();
937        let emitter = self.emitter.clone();
938        let instrument_id = cmd.instrument_id;
939        let client_order_id = Some(cmd.client_order_id);
940        let venue_order_id = cmd.venue_order_id;
941
942        self.spawn_task("cancel_order", async move {
943            match canceller
944                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
945                .await
946            {
947                Ok(Some(report)) => emitter.send_order_status_report(report),
948                Ok(None) => {
949                    // Idempotent success - order already cancelled
950                    log::debug!("Order already cancelled: {client_order_id:?}");
951                }
952                Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
953            }
954            Ok(())
955        });
956
957        Ok(())
958    }
959
960    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
961        let canceller = self._canceller.clone_for_async();
962        let emitter = self.emitter.clone();
963        let instrument_id = cmd.instrument_id;
964        let order_side = if cmd.order_side == OrderSide::NoOrderSide {
965            log::debug!(
966                "BitMEX cancel_all_orders received NoOrderSide for {instrument_id}, using unfiltered cancel-all",
967            );
968            None
969        } else {
970            Some(cmd.order_side)
971        };
972
973        self.spawn_task("cancel_all_orders", async move {
974            match canceller
975                .broadcast_cancel_all(instrument_id, order_side)
976                .await
977            {
978                Ok(reports) => {
979                    for report in reports {
980                        emitter.send_order_status_report(report);
981                    }
982                }
983                Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
984            }
985            Ok(())
986        });
987
988        Ok(())
989    }
990
991    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
992        let canceller = self._canceller.clone_for_async();
993        let emitter = self.emitter.clone();
994        let instrument_id = cmd.instrument_id;
995
996        let client_ids: Vec<ClientOrderId> = cmd
997            .cancels
998            .iter()
999            .map(|cancel| cancel.client_order_id)
1000            .collect();
1001
1002        let venue_ids: Vec<VenueOrderId> = cmd
1003            .cancels
1004            .iter()
1005            .filter_map(|cancel| cancel.venue_order_id)
1006            .collect();
1007
1008        let client_ids_opt = if client_ids.is_empty() {
1009            None
1010        } else {
1011            Some(client_ids)
1012        };
1013
1014        let venue_ids_opt = if venue_ids.is_empty() {
1015            None
1016        } else {
1017            Some(venue_ids)
1018        };
1019
1020        self.spawn_task("batch_cancel_orders", async move {
1021            match canceller
1022                .broadcast_batch_cancel(instrument_id, client_ids_opt, venue_ids_opt)
1023                .await
1024            {
1025                Ok(reports) => {
1026                    for report in reports {
1027                        emitter.send_order_status_report(report);
1028                    }
1029                }
1030                Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
1031            }
1032            Ok(())
1033        });
1034
1035        Ok(())
1036    }
1037}
1038
1039/// Dispatches a WebSocket message using the event emitter.
1040fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1041    match message {
1042        NautilusWsMessage::OrderStatusReports(reports) => {
1043            for report in reports {
1044                emitter.send_order_status_report(report);
1045            }
1046        }
1047        NautilusWsMessage::FillReports(reports) => {
1048            for report in reports {
1049                emitter.send_fill_report(report);
1050            }
1051        }
1052        NautilusWsMessage::PositionStatusReports(reports) => {
1053            for report in reports {
1054                emitter.send_position_report(report);
1055            }
1056        }
1057        NautilusWsMessage::AccountStates(states) => {
1058            for state in states {
1059                emitter.send_account_state(state);
1060            }
1061        }
1062        NautilusWsMessage::OrderUpdated(event) => {
1063            emitter.send_order_event(OrderEventAny::Updated(*event));
1064        }
1065        NautilusWsMessage::OrderUpdates(events) => {
1066            for event in events {
1067                emitter.send_order_event(OrderEventAny::Updated(event));
1068            }
1069        }
1070        NautilusWsMessage::Data(_)
1071        | NautilusWsMessage::Instruments(_)
1072        | NautilusWsMessage::InstrumentStatus(_)
1073        | NautilusWsMessage::FundingRateUpdates(_) => {
1074            log::debug!("Ignoring BitMEX data message on execution stream");
1075        }
1076        NautilusWsMessage::Reconnected => {
1077            log::info!("BitMEX execution websocket reconnected");
1078        }
1079        NautilusWsMessage::Authenticated => {
1080            log::debug!("BitMEX execution websocket authenticated");
1081        }
1082    }
1083}
1084
1085fn parse_peg_price_type(params: Option<&Params>) -> anyhow::Result<Option<BitmexPegPriceType>> {
1086    let value = params.and_then(|p| p.get_str("peg_price_type"));
1087    match value {
1088        Some(s) => BitmexPegPriceType::from_str(s)
1089            .map(Some)
1090            .map_err(|_| anyhow::anyhow!("Invalid peg_price_type: {s}")),
1091        None => Ok(None),
1092    }
1093}
1094
1095fn parse_peg_offset_value(params: Option<&Params>) -> anyhow::Result<Option<f64>> {
1096    let value = params.and_then(|p| p.get_str("peg_offset_value"));
1097    match value {
1098        Some(s) => s
1099            .parse::<f64>()
1100            .map(Some)
1101            .map_err(|_| anyhow::anyhow!("Invalid peg_offset_value: {s}")),
1102        None => Ok(None),
1103    }
1104}