Skip to main content

nautilus_bitmex/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30    clients::DataClient,
31    live::{runner::get_data_event_sender, runtime::get_runtime},
32    messages::{
33        DataEvent,
34        data::{
35            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38            SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
39            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
40            UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
41            UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
42            UnsubscribeQuotes, UnsubscribeTrades,
43        },
44    },
45};
46use nautilus_core::{
47    datetime::datetime_to_unix_nanos,
48    time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_model::{
51    data::Data,
52    enums::BookType,
53    identifiers::{ClientId, InstrumentId, Venue},
54    instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60    common::consts::BITMEX_VENUE,
61    config::BitmexDataClientConfig,
62    http::client::BitmexHttpClient,
63    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
64};
65
/// Order book websocket channel an instrument has been subscribed through.
///
/// Recorded per instrument so the matching unsubscribe call can be issued later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BitmexBookChannel {
    /// Subscribed through the websocket client's `subscribe_book`.
    OrderBookL2,
    /// Subscribed through the websocket client's `subscribe_book_25`.
    OrderBookL2_25,
    /// Subscribed through the websocket client's `subscribe_book_depth10`.
    OrderBook10,
}
72
/// Live market data client for the BitMEX adapter.
///
/// Holds an HTTP client and an optional websocket client, together with the caches and
/// background task handles used while connected.
#[derive(Debug)]
pub struct BitmexDataClient {
    /// Identifier of this client.
    client_id: ClientId,
    /// Configuration the client was constructed with.
    config: BitmexDataClientConfig,
    /// HTTP client used for instrument requests.
    http_client: BitmexHttpClient,
    /// Websocket client; `None` until `connect` constructs it.
    ws_client: Option<BitmexWebSocketClient>,
    /// Connection flag, read and written with relaxed ordering.
    is_connected: AtomicBool,
    /// Token cloned into the spawned stream and refresh tasks to signal them to exit.
    cancellation_token: CancellationToken,
    /// Join handles of the spawned stream and refresh tasks.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with background tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Order book channel recorded per instrument after a successful book subscription.
    book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
    /// Realtime atomic clock obtained at construction.
    clock: &'static AtomicTime,
    /// Whether the periodic instrument refresh task has been spawned.
    instrument_refresh_active: bool,
}
88
89impl BitmexDataClient {
90    /// Creates a new [`BitmexDataClient`] instance.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the HTTP client cannot be constructed.
95    pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
96        let clock = get_atomic_clock_realtime();
97        let data_sender = get_data_event_sender();
98
99        let http_client = BitmexHttpClient::new(
100            Some(config.http_base_url()),
101            config.api_key.clone(),
102            config.api_secret.clone(),
103            config.use_testnet,
104            config.http_timeout_secs,
105            config.max_retries,
106            config.retry_delay_initial_ms,
107            config.retry_delay_max_ms,
108            config.recv_window_ms,
109            config.max_requests_per_second,
110            config.max_requests_per_minute,
111            config.http_proxy_url.clone(),
112        )
113        .context("failed to construct BitMEX HTTP client")?;
114
115        Ok(Self {
116            client_id,
117            config,
118            http_client,
119            ws_client: None,
120            is_connected: AtomicBool::new(false),
121            cancellation_token: CancellationToken::new(),
122            tasks: Vec::new(),
123            data_sender,
124            instruments: Arc::new(RwLock::new(AHashMap::new())),
125            book_channels: Arc::new(RwLock::new(AHashMap::new())),
126            clock,
127            instrument_refresh_active: false,
128        })
129    }
130
    /// Returns the BitMEX venue, copied out of `BITMEX_VENUE`.
    fn venue(&self) -> Venue {
        *BITMEX_VENUE
    }
134
135    fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
136        self.ws_client
137            .as_ref()
138            .context("websocket client not initialized; call connect first")
139    }
140
141    fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
142        self.ws_client
143            .as_mut()
144            .context("websocket client not initialized; call connect first")
145    }
146
147    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
148        if let Err(e) = sender.send(DataEvent::Data(data)) {
149            log::error!("Failed to emit data event: {e}");
150        }
151    }
152
153    fn spawn_ws<F>(&self, fut: F, context: &'static str)
154    where
155        F: Future<Output = anyhow::Result<()>> + Send + 'static,
156    {
157        get_runtime().spawn(async move {
158            if let Err(e) = fut.await {
159                log::error!("{context}: {e:?}");
160            }
161        });
162    }
163
164    fn spawn_stream_task(
165        &mut self,
166        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
167    ) -> anyhow::Result<()> {
168        let data_sender = self.data_sender.clone();
169        let instruments = Arc::clone(&self.instruments);
170        let cancellation = self.cancellation_token.clone();
171
172        let handle = get_runtime().spawn(async move {
173            tokio::pin!(stream);
174
175            loop {
176                tokio::select! {
177                    maybe_msg = stream.next() => {
178                        match maybe_msg {
179                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
180                            None => {
181                                log::debug!("BitMEX websocket stream ended");
182                                break;
183                            }
184                        }
185                    }
186                    () = cancellation.cancelled() => {
187                        log::debug!("BitMEX websocket stream task cancelled");
188                        break;
189                    }
190                }
191            }
192        });
193
194        self.tasks.push(handle);
195        Ok(())
196    }
197
198    fn handle_ws_message(
199        message: NautilusWsMessage,
200        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
201        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
202    ) {
203        match message {
204            NautilusWsMessage::Data(payloads) => {
205                for data in payloads {
206                    Self::send_data(sender, data);
207                }
208            }
209            NautilusWsMessage::Instruments(insts) => {
210                let mut guard = instruments.write().expect("instrument cache lock poisoned");
211                for instrument in insts {
212                    let instrument_id = instrument.id();
213                    guard.insert(instrument_id, instrument.clone());
214                    if let Err(e) = sender.send(DataEvent::Instrument(instrument)) {
215                        log::error!("Failed to send instrument event: {e}");
216                    }
217                }
218            }
219            NautilusWsMessage::InstrumentStatus(status) => {
220                if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
221                    log::error!("Failed to send instrument status event: {e}");
222                }
223            }
224            NautilusWsMessage::FundingRateUpdates(updates) => {
225                for update in updates {
226                    log::debug!(
227                        "Funding rate update: instrument={}, rate={}",
228                        update.instrument_id,
229                        update.rate,
230                    );
231
232                    if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
233                        log::error!("Failed to emit funding rate event: {e}");
234                    }
235                }
236            }
237            NautilusWsMessage::OrderStatusReports(_)
238            | NautilusWsMessage::OrderUpdated(_)
239            | NautilusWsMessage::OrderUpdates(_)
240            | NautilusWsMessage::FillReports(_)
241            | NautilusWsMessage::PositionStatusReports(_)
242            | NautilusWsMessage::AccountStates(_) => {
243                log::debug!("Ignoring trading message on data client");
244            }
245            NautilusWsMessage::Reconnected => {
246                log::info!("BitMEX websocket reconnected");
247            }
248            NautilusWsMessage::Authenticated => {
249                log::debug!("BitMEX websocket authenticated");
250            }
251        }
252    }
253
254    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
255        let http = self.http_client.clone();
256        let mut instruments = http
257            .request_instruments(self.config.active_only)
258            .await
259            .context("failed to request BitMEX instruments")?;
260
261        instruments.sort_by_key(|instrument| instrument.id());
262
263        {
264            let mut guard = self
265                .instruments
266                .write()
267                .expect("instrument cache lock poisoned");
268            guard.clear();
269            for instrument in &instruments {
270                guard.insert(instrument.id(), instrument.clone());
271            }
272        }
273
274        for instrument in &instruments {
275            self.http_client.cache_instrument(instrument.clone());
276
277            if let Err(e) = self
278                .data_sender
279                .send(DataEvent::Instrument(instrument.clone()))
280            {
281                log::warn!(
282                    "Failed to send instrument event for {}: {e}",
283                    instrument.id()
284                );
285            }
286        }
287
288        Ok(instruments)
289    }
290
291    fn is_connected(&self) -> bool {
292        self.is_connected.load(Ordering::Relaxed)
293    }
294
295    fn is_disconnected(&self) -> bool {
296        !self.is_connected()
297    }
298
299    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
300        let Some(minutes) = self.config.update_instruments_interval_mins else {
301            return Ok(());
302        };
303
304        if minutes == 0 || self.instrument_refresh_active {
305            return Ok(());
306        }
307
308        let interval_secs = minutes.saturating_mul(60);
309        if interval_secs == 0 {
310            return Ok(());
311        }
312
313        let interval = Duration::from_secs(interval_secs);
314        let cancellation = self.cancellation_token.clone();
315        let instruments_cache = Arc::clone(&self.instruments);
316        let active_only = self.config.active_only;
317        let client_id = self.client_id;
318        let http_client = self.http_client.clone();
319
320        let handle = get_runtime().spawn(async move {
321            let http_client = http_client;
322            loop {
323                let sleep = tokio::time::sleep(interval);
324                tokio::pin!(sleep);
325                tokio::select! {
326                    () = cancellation.cancelled() => {
327                        log::debug!("BitMEX instrument refresh task cancelled");
328                        break;
329                    }
330                    () = &mut sleep => {
331                        match http_client.request_instruments(active_only).await {
332                            Ok(mut instruments) => {
333                                instruments.sort_by_key(|instrument| instrument.id());
334
335                                {
336                                    let mut guard = instruments_cache
337                                        .write()
338                                        .expect("instrument cache lock poisoned");
339                                    guard.clear();
340                                    for instrument in &instruments {
341                                        guard.insert(instrument.id(), instrument.clone());
342                                    }
343                                }
344
345                                for instrument in instruments {
346                                    http_client.cache_instrument(instrument);
347                                }
348
349                                log::debug!("BitMEX instruments refreshed: client_id={client_id}");
350                            }
351                            Err(e) => {
352                                log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
353                            }
354                        }
355                    }
356                }
357            }
358        });
359
360        self.tasks.push(handle);
361        self.instrument_refresh_active = true;
362        Ok(())
363    }
364}
365
366#[async_trait::async_trait(?Send)]
367impl DataClient for BitmexDataClient {
368    fn client_id(&self) -> ClientId {
369        self.client_id
370    }
371
372    fn venue(&self) -> Option<Venue> {
373        Some(self.venue())
374    }
375
376    fn start(&mut self) -> anyhow::Result<()> {
377        log::info!(
378            "Starting BitMEX data client: client_id={}, use_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
379            self.client_id,
380            self.config.use_testnet,
381            self.config.http_proxy_url,
382            self.config.ws_proxy_url,
383        );
384        Ok(())
385    }
386
387    fn stop(&mut self) -> anyhow::Result<()> {
388        log::info!("Stopping BitMEX data client {id}", id = self.client_id);
389        self.cancellation_token.cancel();
390        self.is_connected.store(false, Ordering::Relaxed);
391        self.instrument_refresh_active = false;
392        Ok(())
393    }
394
395    fn reset(&mut self) -> anyhow::Result<()> {
396        log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
397        self.is_connected.store(false, Ordering::Relaxed);
398        self.cancellation_token = CancellationToken::new();
399        self.tasks.clear();
400        self.book_channels
401            .write()
402            .expect("book channel cache lock poisoned")
403            .clear();
404        self.instrument_refresh_active = false;
405        Ok(())
406    }
407
408    fn dispose(&mut self) -> anyhow::Result<()> {
409        self.stop()
410    }
411
412    async fn connect(&mut self) -> anyhow::Result<()> {
413        if self.is_connected() {
414            return Ok(());
415        }
416
417        if self.ws_client.is_none() {
418            let ws = BitmexWebSocketClient::new_with_env(
419                Some(self.config.ws_url()),
420                self.config.api_key.clone(),
421                self.config.api_secret.clone(),
422                None,
423                self.config.heartbeat_interval_secs,
424                self.config.use_testnet,
425            )
426            .context("failed to construct BitMEX websocket client")?;
427            self.ws_client = Some(ws);
428        }
429
430        let instruments = self.bootstrap_instruments().await?;
431
432        if let Some(ws) = self.ws_client.as_mut() {
433            ws.cache_instruments(instruments);
434        }
435
436        let ws = self.ws_client_mut()?;
437        ws.connect()
438            .await
439            .context("failed to connect BitMEX websocket")?;
440        ws.wait_until_active(10.0)
441            .await
442            .context("BitMEX websocket did not become active")?;
443
444        let stream = ws.stream();
445        self.spawn_stream_task(stream)?;
446        self.maybe_spawn_instrument_refresh()?;
447
448        self.is_connected.store(true, Ordering::Relaxed);
449        log::info!("Connected");
450        Ok(())
451    }
452
453    async fn disconnect(&mut self) -> anyhow::Result<()> {
454        if self.is_disconnected() {
455            return Ok(());
456        }
457
458        self.cancellation_token.cancel();
459
460        if let Some(ws) = self.ws_client.as_mut()
461            && let Err(e) = ws.close().await
462        {
463            log::warn!("Error while closing BitMEX websocket: {e:?}");
464        }
465
466        for handle in self.tasks.drain(..) {
467            if let Err(e) = handle.await {
468                log::error!("Error joining websocket task: {e:?}");
469            }
470        }
471
472        self.cancellation_token = CancellationToken::new();
473        self.is_connected.store(false, Ordering::Relaxed);
474        self.book_channels
475            .write()
476            .expect("book channel cache lock poisoned")
477            .clear();
478        self.instrument_refresh_active = false;
479
480        log::info!("Disconnected");
481        Ok(())
482    }
483
484    fn is_connected(&self) -> bool {
485        self.is_connected()
486    }
487
488    fn is_disconnected(&self) -> bool {
489        self.is_disconnected()
490    }
491
492    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
493        let ws = self.ws_client()?.clone();
494
495        self.spawn_ws(
496            async move {
497                ws.subscribe_instruments()
498                    .await
499                    .map_err(|e| anyhow::anyhow!(e))
500            },
501            "BitMEX instruments subscription",
502        );
503        Ok(())
504    }
505
506    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
507        let instrument_id = cmd.instrument_id;
508
509        if let Some(instrument) = self
510            .instruments
511            .read()
512            .expect("instrument cache lock poisoned")
513            .get(&instrument_id)
514            .cloned()
515        {
516            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
517                log::error!("Failed to send instrument event for {instrument_id}: {e}");
518            }
519            return Ok(());
520        }
521
522        log::warn!("Instrument {instrument_id} not found in BitMEX cache");
523
524        let ws = self.ws_client()?.clone();
525        self.spawn_ws(
526            async move {
527                ws.subscribe_instrument(instrument_id)
528                    .await
529                    .map_err(|e| anyhow::anyhow!(e))
530            },
531            "BitMEX instrument subscription",
532        );
533
534        Ok(())
535    }
536
537    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
538        if cmd.book_type != BookType::L2_MBP {
539            anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
540        }
541
542        let instrument_id = cmd.instrument_id;
543        let depth = cmd.depth.map_or(0, |d| d.get());
544        let channel = if depth > 0 && depth <= 25 {
545            if depth != 25 {
546                log::info!(
547                    "BitMEX only supports depth 25 for L2 deltas, using L2_25 for requested depth {depth}"
548                );
549            }
550            BitmexBookChannel::OrderBookL2_25
551        } else {
552            BitmexBookChannel::OrderBookL2
553        };
554
555        let ws = self.ws_client()?.clone();
556        let book_channels = Arc::clone(&self.book_channels);
557
558        self.spawn_ws(
559            async move {
560                match channel {
561                    BitmexBookChannel::OrderBookL2 => ws
562                        .subscribe_book(instrument_id)
563                        .await
564                        .map_err(|e| anyhow::anyhow!(e))?,
565                    BitmexBookChannel::OrderBookL2_25 => ws
566                        .subscribe_book_25(instrument_id)
567                        .await
568                        .map_err(|e| anyhow::anyhow!(e))?,
569                    BitmexBookChannel::OrderBook10 => unreachable!(),
570                }
571                book_channels
572                    .write()
573                    .expect("book channel cache lock poisoned")
574                    .insert(instrument_id, channel);
575                Ok(())
576            },
577            "BitMEX book delta subscription",
578        );
579
580        Ok(())
581    }
582
583    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
584        let instrument_id = cmd.instrument_id;
585        let ws = self.ws_client()?.clone();
586        let book_channels = Arc::clone(&self.book_channels);
587
588        self.spawn_ws(
589            async move {
590                ws.subscribe_book_depth10(instrument_id)
591                    .await
592                    .map_err(|e| anyhow::anyhow!(e))?;
593                book_channels
594                    .write()
595                    .expect("book channel cache lock poisoned")
596                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
597                Ok(())
598            },
599            "BitMEX book depth10 subscription",
600        );
601        Ok(())
602    }
603
604    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
605        let instrument_id = cmd.instrument_id;
606        let ws = self.ws_client()?.clone();
607
608        self.spawn_ws(
609            async move {
610                ws.subscribe_quotes(instrument_id)
611                    .await
612                    .map_err(|e| anyhow::anyhow!(e))
613            },
614            "BitMEX quote subscription",
615        );
616        Ok(())
617    }
618
619    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
620        let instrument_id = cmd.instrument_id;
621        let ws = self.ws_client()?.clone();
622
623        self.spawn_ws(
624            async move {
625                ws.subscribe_trades(instrument_id)
626                    .await
627                    .map_err(|e| anyhow::anyhow!(e))
628            },
629            "BitMEX trade subscription",
630        );
631        Ok(())
632    }
633
634    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
635        let instrument_id = cmd.instrument_id;
636        let ws = self.ws_client()?.clone();
637
638        self.spawn_ws(
639            async move {
640                ws.subscribe_mark_prices(instrument_id)
641                    .await
642                    .map_err(|e| anyhow::anyhow!(e))
643            },
644            "BitMEX mark price subscription",
645        );
646        Ok(())
647    }
648
649    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
650        let instrument_id = cmd.instrument_id;
651        let ws = self.ws_client()?.clone();
652
653        self.spawn_ws(
654            async move {
655                ws.subscribe_index_prices(instrument_id)
656                    .await
657                    .map_err(|e| anyhow::anyhow!(e))
658            },
659            "BitMEX index price subscription",
660        );
661        Ok(())
662    }
663
664    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
665        let instrument_id = cmd.instrument_id;
666        let ws = self.ws_client()?.clone();
667
668        self.spawn_ws(
669            async move {
670                ws.subscribe_funding_rates(instrument_id)
671                    .await
672                    .map_err(|e| anyhow::anyhow!(e))
673            },
674            "BitMEX funding rate subscription",
675        );
676        Ok(())
677    }
678
679    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
680        let bar_type = cmd.bar_type;
681        let ws = self.ws_client()?.clone();
682
683        self.spawn_ws(
684            async move {
685                ws.subscribe_bars(bar_type)
686                    .await
687                    .map_err(|e| anyhow::anyhow!(e))
688            },
689            "BitMEX bar subscription",
690        );
691        Ok(())
692    }
693
694    fn subscribe_instrument_status(
695        &mut self,
696        cmd: &SubscribeInstrumentStatus,
697    ) -> anyhow::Result<()> {
698        let instrument_id = cmd.instrument_id;
699        let ws = self.ws_client()?.clone();
700
701        self.spawn_ws(
702            async move {
703                ws.subscribe_instrument(instrument_id)
704                    .await
705                    .map_err(|e| anyhow::anyhow!(e))
706            },
707            "BitMEX instrument status subscription",
708        );
709        Ok(())
710    }
711
712    fn unsubscribe_instrument_status(
713        &mut self,
714        cmd: &UnsubscribeInstrumentStatus,
715    ) -> anyhow::Result<()> {
716        let instrument_id = cmd.instrument_id;
717        let ws = self.ws_client()?.clone();
718
719        self.spawn_ws(
720            async move {
721                ws.unsubscribe_instrument(instrument_id)
722                    .await
723                    .map_err(|e| anyhow::anyhow!(e))
724            },
725            "BitMEX instrument status unsubscribe",
726        );
727        Ok(())
728    }
729
730    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
731        let instrument_id = cmd.instrument_id;
732        let ws = self.ws_client()?.clone();
733        let book_channels = Arc::clone(&self.book_channels);
734
735        self.spawn_ws(
736            async move {
737                let channel = book_channels
738                    .write()
739                    .expect("book channel cache lock poisoned")
740                    .remove(&instrument_id);
741
742                match channel {
743                    Some(BitmexBookChannel::OrderBookL2) => ws
744                        .unsubscribe_book(instrument_id)
745                        .await
746                        .map_err(|e| anyhow::anyhow!(e))?,
747                    Some(BitmexBookChannel::OrderBookL2_25) => ws
748                        .unsubscribe_book_25(instrument_id)
749                        .await
750                        .map_err(|e| anyhow::anyhow!(e))?,
751                    Some(BitmexBookChannel::OrderBook10) => ws
752                        .unsubscribe_book_depth10(instrument_id)
753                        .await
754                        .map_err(|e| anyhow::anyhow!(e))?,
755                    None => ws
756                        .unsubscribe_book(instrument_id)
757                        .await
758                        .map_err(|e| anyhow::anyhow!(e))?,
759                }
760                Ok(())
761            },
762            "BitMEX book delta unsubscribe",
763        );
764        Ok(())
765    }
766
767    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
768        let instrument_id = cmd.instrument_id;
769        let ws = self.ws_client()?.clone();
770        let book_channels = Arc::clone(&self.book_channels);
771
772        self.spawn_ws(
773            async move {
774                book_channels
775                    .write()
776                    .expect("book channel cache lock poisoned")
777                    .remove(&instrument_id);
778                ws.unsubscribe_book_depth10(instrument_id)
779                    .await
780                    .map_err(|e| anyhow::anyhow!(e))
781            },
782            "BitMEX book depth10 unsubscribe",
783        );
784        Ok(())
785    }
786
787    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
788        let instrument_id = cmd.instrument_id;
789        let ws = self.ws_client()?.clone();
790
791        self.spawn_ws(
792            async move {
793                ws.unsubscribe_quotes(instrument_id)
794                    .await
795                    .map_err(|e| anyhow::anyhow!(e))
796            },
797            "BitMEX quote unsubscribe",
798        );
799        Ok(())
800    }
801
802    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
803        let instrument_id = cmd.instrument_id;
804        let ws = self.ws_client()?.clone();
805
806        self.spawn_ws(
807            async move {
808                ws.unsubscribe_trades(instrument_id)
809                    .await
810                    .map_err(|e| anyhow::anyhow!(e))
811            },
812            "BitMEX trade unsubscribe",
813        );
814        Ok(())
815    }
816
817    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
818        let ws = self.ws_client()?.clone();
819        let instrument_id = cmd.instrument_id;
820
821        self.spawn_ws(
822            async move {
823                ws.unsubscribe_mark_prices(instrument_id)
824                    .await
825                    .map_err(|e| anyhow::anyhow!(e))
826            },
827            "BitMEX mark price unsubscribe",
828        );
829        Ok(())
830    }
831
832    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
833        let ws = self.ws_client()?.clone();
834        let instrument_id = cmd.instrument_id;
835
836        self.spawn_ws(
837            async move {
838                ws.unsubscribe_index_prices(instrument_id)
839                    .await
840                    .map_err(|e| anyhow::anyhow!(e))
841            },
842            "BitMEX index price unsubscribe",
843        );
844        Ok(())
845    }
846
847    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
848        let ws = self.ws_client()?.clone();
849        let instrument_id = cmd.instrument_id;
850
851        self.spawn_ws(
852            async move {
853                ws.unsubscribe_funding_rates(instrument_id)
854                    .await
855                    .map_err(|e| anyhow::anyhow!(e))
856            },
857            "BitMEX funding rate unsubscribe",
858        );
859        Ok(())
860    }
861
862    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
863        let bar_type = cmd.bar_type;
864        let ws = self.ws_client()?.clone();
865
866        self.spawn_ws(
867            async move {
868                ws.unsubscribe_bars(bar_type)
869                    .await
870                    .map_err(|e| anyhow::anyhow!(e))
871            },
872            "BitMEX bar unsubscribe",
873        );
874        Ok(())
875    }
876
877    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
878        if let Some(req_venue) = request.venue
879            && req_venue != self.venue()
880        {
881            log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
882        }
883        let venue = self.venue();
884
885        let http = self.http_client.clone();
886        let instruments_cache = Arc::clone(&self.instruments);
887        let sender = self.data_sender.clone();
888        let request_id = request.request_id;
889        let client_id = request.client_id.unwrap_or(self.client_id);
890        let params = request.params;
891        let start_nanos = datetime_to_unix_nanos(request.start);
892        let end_nanos = datetime_to_unix_nanos(request.end);
893        let clock = self.clock;
894        let active_only = self.config.active_only;
895
896        get_runtime().spawn(async move {
897            let http_client = http;
898            match http_client
899                .request_instruments(active_only)
900                .await
901                .context("failed to request instruments from BitMEX")
902            {
903                Ok(instruments) => {
904                    {
905                        let mut guard = instruments_cache
906                            .write()
907                            .expect("instrument cache lock poisoned");
908                        guard.clear();
909                        for instrument in &instruments {
910                            guard.insert(instrument.id(), instrument.clone());
911                            http_client.cache_instrument(instrument.clone());
912                        }
913                    }
914
915                    let response = DataResponse::Instruments(InstrumentsResponse::new(
916                        request_id,
917                        client_id,
918                        venue,
919                        instruments,
920                        start_nanos,
921                        end_nanos,
922                        clock.get_time_ns(),
923                        params,
924                    ));
925
926                    if let Err(e) = sender.send(DataEvent::Response(response)) {
927                        log::error!("Failed to send instruments response: {e}");
928                    }
929                }
930                Err(e) => log::error!("Instrument request failed: {e:?}"),
931            }
932        });
933
934        Ok(())
935    }
936
937    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
938        if let Some(instrument) = self
939            .instruments
940            .read()
941            .expect("instrument cache lock poisoned")
942            .get(&request.instrument_id)
943            .cloned()
944        {
945            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
946                request.request_id,
947                request.client_id.unwrap_or(self.client_id),
948                instrument.id(),
949                instrument,
950                datetime_to_unix_nanos(request.start),
951                datetime_to_unix_nanos(request.end),
952                self.clock.get_time_ns(),
953                request.params,
954            )));
955
956            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
957                log::error!("Failed to send instrument response: {e}");
958            }
959            return Ok(());
960        }
961
962        let http_client = self.http_client.clone();
963        let instruments_cache = Arc::clone(&self.instruments);
964        let sender = self.data_sender.clone();
965        let instrument_id = request.instrument_id;
966        let request_id = request.request_id;
967        let client_id = request.client_id.unwrap_or(self.client_id);
968        let start = request.start;
969        let end = request.end;
970        let params = request.params;
971        let clock = self.clock;
972
973        get_runtime().spawn(async move {
974            match http_client
975                .request_instrument(instrument_id)
976                .await
977                .context("failed to request instrument from BitMEX")
978            {
979                Ok(Some(instrument)) => {
980                    http_client.cache_instrument(instrument.clone());
981                    {
982                        let mut guard = instruments_cache
983                            .write()
984                            .expect("instrument cache lock poisoned");
985                        guard.insert(instrument.id(), instrument.clone());
986                    }
987
988                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
989                        request_id,
990                        client_id,
991                        instrument.id(),
992                        instrument,
993                        datetime_to_unix_nanos(start),
994                        datetime_to_unix_nanos(end),
995                        clock.get_time_ns(),
996                        params,
997                    )));
998
999                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1000                        log::error!("Failed to send instrument response: {e}");
1001                    }
1002                }
1003                Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
1004                Err(e) => log::error!("Instrument request failed: {e:?}"),
1005            }
1006        });
1007
1008        Ok(())
1009    }
1010
1011    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1012        let http = self.http_client.clone();
1013        let sender = self.data_sender.clone();
1014        let instrument_id = request.instrument_id;
1015        let start = request.start;
1016        let end = request.end;
1017        let limit = request.limit.map(|n| n.get() as u32);
1018        let request_id = request.request_id;
1019        let client_id = request.client_id.unwrap_or(self.client_id);
1020        let params = request.params;
1021        let clock = self.clock;
1022        let start_nanos = datetime_to_unix_nanos(start);
1023        let end_nanos = datetime_to_unix_nanos(end);
1024
1025        get_runtime().spawn(async move {
1026            match http
1027                .request_trades(instrument_id, start, end, limit)
1028                .await
1029                .context("failed to request trades from BitMEX")
1030            {
1031                Ok(trades) => {
1032                    let response = DataResponse::Trades(TradesResponse::new(
1033                        request_id,
1034                        client_id,
1035                        instrument_id,
1036                        trades,
1037                        start_nanos,
1038                        end_nanos,
1039                        clock.get_time_ns(),
1040                        params,
1041                    ));
1042
1043                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1044                        log::error!("Failed to send trades response: {e}");
1045                    }
1046                }
1047                Err(e) => log::error!("Trade request failed: {e:?}"),
1048            }
1049        });
1050
1051        Ok(())
1052    }
1053
1054    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1055        let http = self.http_client.clone();
1056        let sender = self.data_sender.clone();
1057        let bar_type = request.bar_type;
1058        let start = request.start;
1059        let end = request.end;
1060        let limit = request.limit.map(|n| n.get() as u32);
1061        let request_id = request.request_id;
1062        let client_id = request.client_id.unwrap_or(self.client_id);
1063        let params = request.params;
1064        let clock = self.clock;
1065        let start_nanos = datetime_to_unix_nanos(start);
1066        let end_nanos = datetime_to_unix_nanos(end);
1067
1068        get_runtime().spawn(async move {
1069            match http
1070                .request_bars(bar_type, start, end, limit, false)
1071                .await
1072                .context("failed to request bars from BitMEX")
1073            {
1074                Ok(bars) => {
1075                    let response = DataResponse::Bars(BarsResponse::new(
1076                        request_id,
1077                        client_id,
1078                        bar_type,
1079                        bars,
1080                        start_nanos,
1081                        end_nanos,
1082                        clock.get_time_ns(),
1083                        params,
1084                    ));
1085
1086                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1087                        log::error!("Failed to send bars response: {e}");
1088                    }
1089                }
1090                Err(e) => log::error!("Bar request failed: {e:?}"),
1091            }
1092        });
1093
1094        Ok(())
1095    }
1096}