1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30 clients::DataClient,
31 live::{runner::get_data_event_sender, runtime::get_runtime},
32 messages::{
33 DataEvent,
34 data::{
35 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38 SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
39 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
40 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
41 UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
42 UnsubscribeQuotes, UnsubscribeTrades,
43 },
44 },
45};
46use nautilus_core::{
47 datetime::datetime_to_unix_nanos,
48 time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_model::{
51 data::Data,
52 enums::BookType,
53 identifiers::{ClientId, InstrumentId, Venue},
54 instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60 common::consts::BITMEX_VENUE,
61 config::BitmexDataClientConfig,
62 http::client::BitmexHttpClient,
63 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
64};
65
66#[derive(Clone, Copy, Debug, Eq, PartialEq)]
67enum BitmexBookChannel {
68 OrderBookL2,
69 OrderBookL2_25,
70 OrderBook10,
71}
72
73#[derive(Debug)]
74pub struct BitmexDataClient {
75 client_id: ClientId,
76 config: BitmexDataClientConfig,
77 http_client: BitmexHttpClient,
78 ws_client: Option<BitmexWebSocketClient>,
79 is_connected: AtomicBool,
80 cancellation_token: CancellationToken,
81 tasks: Vec<JoinHandle<()>>,
82 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
83 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
84 book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
85 clock: &'static AtomicTime,
86 instrument_refresh_active: bool,
87}
88
89impl BitmexDataClient {
90 pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
96 let clock = get_atomic_clock_realtime();
97 let data_sender = get_data_event_sender();
98
99 let http_client = BitmexHttpClient::new(
100 Some(config.http_base_url()),
101 config.api_key.clone(),
102 config.api_secret.clone(),
103 config.use_testnet,
104 config.http_timeout_secs,
105 config.max_retries,
106 config.retry_delay_initial_ms,
107 config.retry_delay_max_ms,
108 config.recv_window_ms,
109 config.max_requests_per_second,
110 config.max_requests_per_minute,
111 config.http_proxy_url.clone(),
112 )
113 .context("failed to construct BitMEX HTTP client")?;
114
115 Ok(Self {
116 client_id,
117 config,
118 http_client,
119 ws_client: None,
120 is_connected: AtomicBool::new(false),
121 cancellation_token: CancellationToken::new(),
122 tasks: Vec::new(),
123 data_sender,
124 instruments: Arc::new(RwLock::new(AHashMap::new())),
125 book_channels: Arc::new(RwLock::new(AHashMap::new())),
126 clock,
127 instrument_refresh_active: false,
128 })
129 }
130
131 fn venue(&self) -> Venue {
132 *BITMEX_VENUE
133 }
134
135 fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
136 self.ws_client
137 .as_ref()
138 .context("websocket client not initialized; call connect first")
139 }
140
141 fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
142 self.ws_client
143 .as_mut()
144 .context("websocket client not initialized; call connect first")
145 }
146
147 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
148 if let Err(e) = sender.send(DataEvent::Data(data)) {
149 log::error!("Failed to emit data event: {e}");
150 }
151 }
152
153 fn spawn_ws<F>(&self, fut: F, context: &'static str)
154 where
155 F: Future<Output = anyhow::Result<()>> + Send + 'static,
156 {
157 get_runtime().spawn(async move {
158 if let Err(e) = fut.await {
159 log::error!("{context}: {e:?}");
160 }
161 });
162 }
163
164 fn spawn_stream_task(
165 &mut self,
166 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
167 ) -> anyhow::Result<()> {
168 let data_sender = self.data_sender.clone();
169 let instruments = Arc::clone(&self.instruments);
170 let cancellation = self.cancellation_token.clone();
171
172 let handle = get_runtime().spawn(async move {
173 tokio::pin!(stream);
174
175 loop {
176 tokio::select! {
177 maybe_msg = stream.next() => {
178 match maybe_msg {
179 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
180 None => {
181 log::debug!("BitMEX websocket stream ended");
182 break;
183 }
184 }
185 }
186 () = cancellation.cancelled() => {
187 log::debug!("BitMEX websocket stream task cancelled");
188 break;
189 }
190 }
191 }
192 });
193
194 self.tasks.push(handle);
195 Ok(())
196 }
197
198 fn handle_ws_message(
199 message: NautilusWsMessage,
200 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
201 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
202 ) {
203 match message {
204 NautilusWsMessage::Data(payloads) => {
205 for data in payloads {
206 Self::send_data(sender, data);
207 }
208 }
209 NautilusWsMessage::Instruments(insts) => {
210 let mut guard = instruments.write().expect("instrument cache lock poisoned");
211 for instrument in insts {
212 let instrument_id = instrument.id();
213 guard.insert(instrument_id, instrument.clone());
214 if let Err(e) = sender.send(DataEvent::Instrument(instrument)) {
215 log::error!("Failed to send instrument event: {e}");
216 }
217 }
218 }
219 NautilusWsMessage::InstrumentStatus(status) => {
220 if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
221 log::error!("Failed to send instrument status event: {e}");
222 }
223 }
224 NautilusWsMessage::FundingRateUpdates(updates) => {
225 for update in updates {
226 log::debug!(
227 "Funding rate update: instrument={}, rate={}",
228 update.instrument_id,
229 update.rate,
230 );
231
232 if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
233 log::error!("Failed to emit funding rate event: {e}");
234 }
235 }
236 }
237 NautilusWsMessage::OrderStatusReports(_)
238 | NautilusWsMessage::OrderUpdated(_)
239 | NautilusWsMessage::OrderUpdates(_)
240 | NautilusWsMessage::FillReports(_)
241 | NautilusWsMessage::PositionStatusReports(_)
242 | NautilusWsMessage::AccountStates(_) => {
243 log::debug!("Ignoring trading message on data client");
244 }
245 NautilusWsMessage::Reconnected => {
246 log::info!("BitMEX websocket reconnected");
247 }
248 NautilusWsMessage::Authenticated => {
249 log::debug!("BitMEX websocket authenticated");
250 }
251 }
252 }
253
254 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
255 let http = self.http_client.clone();
256 let mut instruments = http
257 .request_instruments(self.config.active_only)
258 .await
259 .context("failed to request BitMEX instruments")?;
260
261 instruments.sort_by_key(|instrument| instrument.id());
262
263 {
264 let mut guard = self
265 .instruments
266 .write()
267 .expect("instrument cache lock poisoned");
268 guard.clear();
269 for instrument in &instruments {
270 guard.insert(instrument.id(), instrument.clone());
271 }
272 }
273
274 for instrument in &instruments {
275 self.http_client.cache_instrument(instrument.clone());
276
277 if let Err(e) = self
278 .data_sender
279 .send(DataEvent::Instrument(instrument.clone()))
280 {
281 log::warn!(
282 "Failed to send instrument event for {}: {e}",
283 instrument.id()
284 );
285 }
286 }
287
288 Ok(instruments)
289 }
290
291 fn is_connected(&self) -> bool {
292 self.is_connected.load(Ordering::Relaxed)
293 }
294
295 fn is_disconnected(&self) -> bool {
296 !self.is_connected()
297 }
298
299 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
300 let Some(minutes) = self.config.update_instruments_interval_mins else {
301 return Ok(());
302 };
303
304 if minutes == 0 || self.instrument_refresh_active {
305 return Ok(());
306 }
307
308 let interval_secs = minutes.saturating_mul(60);
309 if interval_secs == 0 {
310 return Ok(());
311 }
312
313 let interval = Duration::from_secs(interval_secs);
314 let cancellation = self.cancellation_token.clone();
315 let instruments_cache = Arc::clone(&self.instruments);
316 let active_only = self.config.active_only;
317 let client_id = self.client_id;
318 let http_client = self.http_client.clone();
319
320 let handle = get_runtime().spawn(async move {
321 let http_client = http_client;
322 loop {
323 let sleep = tokio::time::sleep(interval);
324 tokio::pin!(sleep);
325 tokio::select! {
326 () = cancellation.cancelled() => {
327 log::debug!("BitMEX instrument refresh task cancelled");
328 break;
329 }
330 () = &mut sleep => {
331 match http_client.request_instruments(active_only).await {
332 Ok(mut instruments) => {
333 instruments.sort_by_key(|instrument| instrument.id());
334
335 {
336 let mut guard = instruments_cache
337 .write()
338 .expect("instrument cache lock poisoned");
339 guard.clear();
340 for instrument in &instruments {
341 guard.insert(instrument.id(), instrument.clone());
342 }
343 }
344
345 for instrument in instruments {
346 http_client.cache_instrument(instrument);
347 }
348
349 log::debug!("BitMEX instruments refreshed: client_id={client_id}");
350 }
351 Err(e) => {
352 log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
353 }
354 }
355 }
356 }
357 }
358 });
359
360 self.tasks.push(handle);
361 self.instrument_refresh_active = true;
362 Ok(())
363 }
364}
365
366#[async_trait::async_trait(?Send)]
367impl DataClient for BitmexDataClient {
368 fn client_id(&self) -> ClientId {
369 self.client_id
370 }
371
372 fn venue(&self) -> Option<Venue> {
373 Some(self.venue())
374 }
375
376 fn start(&mut self) -> anyhow::Result<()> {
377 log::info!(
378 "Starting BitMEX data client: client_id={}, use_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
379 self.client_id,
380 self.config.use_testnet,
381 self.config.http_proxy_url,
382 self.config.ws_proxy_url,
383 );
384 Ok(())
385 }
386
387 fn stop(&mut self) -> anyhow::Result<()> {
388 log::info!("Stopping BitMEX data client {id}", id = self.client_id);
389 self.cancellation_token.cancel();
390 self.is_connected.store(false, Ordering::Relaxed);
391 self.instrument_refresh_active = false;
392 Ok(())
393 }
394
395 fn reset(&mut self) -> anyhow::Result<()> {
396 log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
397 self.is_connected.store(false, Ordering::Relaxed);
398 self.cancellation_token = CancellationToken::new();
399 self.tasks.clear();
400 self.book_channels
401 .write()
402 .expect("book channel cache lock poisoned")
403 .clear();
404 self.instrument_refresh_active = false;
405 Ok(())
406 }
407
408 fn dispose(&mut self) -> anyhow::Result<()> {
409 self.stop()
410 }
411
412 async fn connect(&mut self) -> anyhow::Result<()> {
413 if self.is_connected() {
414 return Ok(());
415 }
416
417 if self.ws_client.is_none() {
418 let ws = BitmexWebSocketClient::new_with_env(
419 Some(self.config.ws_url()),
420 self.config.api_key.clone(),
421 self.config.api_secret.clone(),
422 None,
423 self.config.heartbeat_interval_secs,
424 self.config.use_testnet,
425 )
426 .context("failed to construct BitMEX websocket client")?;
427 self.ws_client = Some(ws);
428 }
429
430 let instruments = self.bootstrap_instruments().await?;
431
432 if let Some(ws) = self.ws_client.as_mut() {
433 ws.cache_instruments(instruments);
434 }
435
436 let ws = self.ws_client_mut()?;
437 ws.connect()
438 .await
439 .context("failed to connect BitMEX websocket")?;
440 ws.wait_until_active(10.0)
441 .await
442 .context("BitMEX websocket did not become active")?;
443
444 let stream = ws.stream();
445 self.spawn_stream_task(stream)?;
446 self.maybe_spawn_instrument_refresh()?;
447
448 self.is_connected.store(true, Ordering::Relaxed);
449 log::info!("Connected");
450 Ok(())
451 }
452
453 async fn disconnect(&mut self) -> anyhow::Result<()> {
454 if self.is_disconnected() {
455 return Ok(());
456 }
457
458 self.cancellation_token.cancel();
459
460 if let Some(ws) = self.ws_client.as_mut()
461 && let Err(e) = ws.close().await
462 {
463 log::warn!("Error while closing BitMEX websocket: {e:?}");
464 }
465
466 for handle in self.tasks.drain(..) {
467 if let Err(e) = handle.await {
468 log::error!("Error joining websocket task: {e:?}");
469 }
470 }
471
472 self.cancellation_token = CancellationToken::new();
473 self.is_connected.store(false, Ordering::Relaxed);
474 self.book_channels
475 .write()
476 .expect("book channel cache lock poisoned")
477 .clear();
478 self.instrument_refresh_active = false;
479
480 log::info!("Disconnected");
481 Ok(())
482 }
483
484 fn is_connected(&self) -> bool {
485 self.is_connected()
486 }
487
488 fn is_disconnected(&self) -> bool {
489 self.is_disconnected()
490 }
491
492 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
493 let ws = self.ws_client()?.clone();
494
495 self.spawn_ws(
496 async move {
497 ws.subscribe_instruments()
498 .await
499 .map_err(|e| anyhow::anyhow!(e))
500 },
501 "BitMEX instruments subscription",
502 );
503 Ok(())
504 }
505
506 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
507 let instrument_id = cmd.instrument_id;
508
509 if let Some(instrument) = self
510 .instruments
511 .read()
512 .expect("instrument cache lock poisoned")
513 .get(&instrument_id)
514 .cloned()
515 {
516 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
517 log::error!("Failed to send instrument event for {instrument_id}: {e}");
518 }
519 return Ok(());
520 }
521
522 log::warn!("Instrument {instrument_id} not found in BitMEX cache");
523
524 let ws = self.ws_client()?.clone();
525 self.spawn_ws(
526 async move {
527 ws.subscribe_instrument(instrument_id)
528 .await
529 .map_err(|e| anyhow::anyhow!(e))
530 },
531 "BitMEX instrument subscription",
532 );
533
534 Ok(())
535 }
536
537 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
538 if cmd.book_type != BookType::L2_MBP {
539 anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
540 }
541
542 let instrument_id = cmd.instrument_id;
543 let depth = cmd.depth.map_or(0, |d| d.get());
544 let channel = if depth > 0 && depth <= 25 {
545 if depth != 25 {
546 log::info!(
547 "BitMEX only supports depth 25 for L2 deltas, using L2_25 for requested depth {depth}"
548 );
549 }
550 BitmexBookChannel::OrderBookL2_25
551 } else {
552 BitmexBookChannel::OrderBookL2
553 };
554
555 let ws = self.ws_client()?.clone();
556 let book_channels = Arc::clone(&self.book_channels);
557
558 self.spawn_ws(
559 async move {
560 match channel {
561 BitmexBookChannel::OrderBookL2 => ws
562 .subscribe_book(instrument_id)
563 .await
564 .map_err(|e| anyhow::anyhow!(e))?,
565 BitmexBookChannel::OrderBookL2_25 => ws
566 .subscribe_book_25(instrument_id)
567 .await
568 .map_err(|e| anyhow::anyhow!(e))?,
569 BitmexBookChannel::OrderBook10 => unreachable!(),
570 }
571 book_channels
572 .write()
573 .expect("book channel cache lock poisoned")
574 .insert(instrument_id, channel);
575 Ok(())
576 },
577 "BitMEX book delta subscription",
578 );
579
580 Ok(())
581 }
582
583 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
584 let instrument_id = cmd.instrument_id;
585 let ws = self.ws_client()?.clone();
586 let book_channels = Arc::clone(&self.book_channels);
587
588 self.spawn_ws(
589 async move {
590 ws.subscribe_book_depth10(instrument_id)
591 .await
592 .map_err(|e| anyhow::anyhow!(e))?;
593 book_channels
594 .write()
595 .expect("book channel cache lock poisoned")
596 .insert(instrument_id, BitmexBookChannel::OrderBook10);
597 Ok(())
598 },
599 "BitMEX book depth10 subscription",
600 );
601 Ok(())
602 }
603
604 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
605 let instrument_id = cmd.instrument_id;
606 let ws = self.ws_client()?.clone();
607
608 self.spawn_ws(
609 async move {
610 ws.subscribe_quotes(instrument_id)
611 .await
612 .map_err(|e| anyhow::anyhow!(e))
613 },
614 "BitMEX quote subscription",
615 );
616 Ok(())
617 }
618
619 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
620 let instrument_id = cmd.instrument_id;
621 let ws = self.ws_client()?.clone();
622
623 self.spawn_ws(
624 async move {
625 ws.subscribe_trades(instrument_id)
626 .await
627 .map_err(|e| anyhow::anyhow!(e))
628 },
629 "BitMEX trade subscription",
630 );
631 Ok(())
632 }
633
634 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
635 let instrument_id = cmd.instrument_id;
636 let ws = self.ws_client()?.clone();
637
638 self.spawn_ws(
639 async move {
640 ws.subscribe_mark_prices(instrument_id)
641 .await
642 .map_err(|e| anyhow::anyhow!(e))
643 },
644 "BitMEX mark price subscription",
645 );
646 Ok(())
647 }
648
649 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
650 let instrument_id = cmd.instrument_id;
651 let ws = self.ws_client()?.clone();
652
653 self.spawn_ws(
654 async move {
655 ws.subscribe_index_prices(instrument_id)
656 .await
657 .map_err(|e| anyhow::anyhow!(e))
658 },
659 "BitMEX index price subscription",
660 );
661 Ok(())
662 }
663
664 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
665 let instrument_id = cmd.instrument_id;
666 let ws = self.ws_client()?.clone();
667
668 self.spawn_ws(
669 async move {
670 ws.subscribe_funding_rates(instrument_id)
671 .await
672 .map_err(|e| anyhow::anyhow!(e))
673 },
674 "BitMEX funding rate subscription",
675 );
676 Ok(())
677 }
678
679 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
680 let bar_type = cmd.bar_type;
681 let ws = self.ws_client()?.clone();
682
683 self.spawn_ws(
684 async move {
685 ws.subscribe_bars(bar_type)
686 .await
687 .map_err(|e| anyhow::anyhow!(e))
688 },
689 "BitMEX bar subscription",
690 );
691 Ok(())
692 }
693
694 fn subscribe_instrument_status(
695 &mut self,
696 cmd: &SubscribeInstrumentStatus,
697 ) -> anyhow::Result<()> {
698 let instrument_id = cmd.instrument_id;
699 let ws = self.ws_client()?.clone();
700
701 self.spawn_ws(
702 async move {
703 ws.subscribe_instrument(instrument_id)
704 .await
705 .map_err(|e| anyhow::anyhow!(e))
706 },
707 "BitMEX instrument status subscription",
708 );
709 Ok(())
710 }
711
712 fn unsubscribe_instrument_status(
713 &mut self,
714 cmd: &UnsubscribeInstrumentStatus,
715 ) -> anyhow::Result<()> {
716 let instrument_id = cmd.instrument_id;
717 let ws = self.ws_client()?.clone();
718
719 self.spawn_ws(
720 async move {
721 ws.unsubscribe_instrument(instrument_id)
722 .await
723 .map_err(|e| anyhow::anyhow!(e))
724 },
725 "BitMEX instrument status unsubscribe",
726 );
727 Ok(())
728 }
729
730 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
731 let instrument_id = cmd.instrument_id;
732 let ws = self.ws_client()?.clone();
733 let book_channels = Arc::clone(&self.book_channels);
734
735 self.spawn_ws(
736 async move {
737 let channel = book_channels
738 .write()
739 .expect("book channel cache lock poisoned")
740 .remove(&instrument_id);
741
742 match channel {
743 Some(BitmexBookChannel::OrderBookL2) => ws
744 .unsubscribe_book(instrument_id)
745 .await
746 .map_err(|e| anyhow::anyhow!(e))?,
747 Some(BitmexBookChannel::OrderBookL2_25) => ws
748 .unsubscribe_book_25(instrument_id)
749 .await
750 .map_err(|e| anyhow::anyhow!(e))?,
751 Some(BitmexBookChannel::OrderBook10) => ws
752 .unsubscribe_book_depth10(instrument_id)
753 .await
754 .map_err(|e| anyhow::anyhow!(e))?,
755 None => ws
756 .unsubscribe_book(instrument_id)
757 .await
758 .map_err(|e| anyhow::anyhow!(e))?,
759 }
760 Ok(())
761 },
762 "BitMEX book delta unsubscribe",
763 );
764 Ok(())
765 }
766
767 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
768 let instrument_id = cmd.instrument_id;
769 let ws = self.ws_client()?.clone();
770 let book_channels = Arc::clone(&self.book_channels);
771
772 self.spawn_ws(
773 async move {
774 book_channels
775 .write()
776 .expect("book channel cache lock poisoned")
777 .remove(&instrument_id);
778 ws.unsubscribe_book_depth10(instrument_id)
779 .await
780 .map_err(|e| anyhow::anyhow!(e))
781 },
782 "BitMEX book depth10 unsubscribe",
783 );
784 Ok(())
785 }
786
787 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
788 let instrument_id = cmd.instrument_id;
789 let ws = self.ws_client()?.clone();
790
791 self.spawn_ws(
792 async move {
793 ws.unsubscribe_quotes(instrument_id)
794 .await
795 .map_err(|e| anyhow::anyhow!(e))
796 },
797 "BitMEX quote unsubscribe",
798 );
799 Ok(())
800 }
801
802 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
803 let instrument_id = cmd.instrument_id;
804 let ws = self.ws_client()?.clone();
805
806 self.spawn_ws(
807 async move {
808 ws.unsubscribe_trades(instrument_id)
809 .await
810 .map_err(|e| anyhow::anyhow!(e))
811 },
812 "BitMEX trade unsubscribe",
813 );
814 Ok(())
815 }
816
817 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
818 let ws = self.ws_client()?.clone();
819 let instrument_id = cmd.instrument_id;
820
821 self.spawn_ws(
822 async move {
823 ws.unsubscribe_mark_prices(instrument_id)
824 .await
825 .map_err(|e| anyhow::anyhow!(e))
826 },
827 "BitMEX mark price unsubscribe",
828 );
829 Ok(())
830 }
831
832 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
833 let ws = self.ws_client()?.clone();
834 let instrument_id = cmd.instrument_id;
835
836 self.spawn_ws(
837 async move {
838 ws.unsubscribe_index_prices(instrument_id)
839 .await
840 .map_err(|e| anyhow::anyhow!(e))
841 },
842 "BitMEX index price unsubscribe",
843 );
844 Ok(())
845 }
846
847 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
848 let ws = self.ws_client()?.clone();
849 let instrument_id = cmd.instrument_id;
850
851 self.spawn_ws(
852 async move {
853 ws.unsubscribe_funding_rates(instrument_id)
854 .await
855 .map_err(|e| anyhow::anyhow!(e))
856 },
857 "BitMEX funding rate unsubscribe",
858 );
859 Ok(())
860 }
861
862 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
863 let bar_type = cmd.bar_type;
864 let ws = self.ws_client()?.clone();
865
866 self.spawn_ws(
867 async move {
868 ws.unsubscribe_bars(bar_type)
869 .await
870 .map_err(|e| anyhow::anyhow!(e))
871 },
872 "BitMEX bar unsubscribe",
873 );
874 Ok(())
875 }
876
877 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
878 if let Some(req_venue) = request.venue
879 && req_venue != self.venue()
880 {
881 log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
882 }
883 let venue = self.venue();
884
885 let http = self.http_client.clone();
886 let instruments_cache = Arc::clone(&self.instruments);
887 let sender = self.data_sender.clone();
888 let request_id = request.request_id;
889 let client_id = request.client_id.unwrap_or(self.client_id);
890 let params = request.params;
891 let start_nanos = datetime_to_unix_nanos(request.start);
892 let end_nanos = datetime_to_unix_nanos(request.end);
893 let clock = self.clock;
894 let active_only = self.config.active_only;
895
896 get_runtime().spawn(async move {
897 let http_client = http;
898 match http_client
899 .request_instruments(active_only)
900 .await
901 .context("failed to request instruments from BitMEX")
902 {
903 Ok(instruments) => {
904 {
905 let mut guard = instruments_cache
906 .write()
907 .expect("instrument cache lock poisoned");
908 guard.clear();
909 for instrument in &instruments {
910 guard.insert(instrument.id(), instrument.clone());
911 http_client.cache_instrument(instrument.clone());
912 }
913 }
914
915 let response = DataResponse::Instruments(InstrumentsResponse::new(
916 request_id,
917 client_id,
918 venue,
919 instruments,
920 start_nanos,
921 end_nanos,
922 clock.get_time_ns(),
923 params,
924 ));
925
926 if let Err(e) = sender.send(DataEvent::Response(response)) {
927 log::error!("Failed to send instruments response: {e}");
928 }
929 }
930 Err(e) => log::error!("Instrument request failed: {e:?}"),
931 }
932 });
933
934 Ok(())
935 }
936
937 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
938 if let Some(instrument) = self
939 .instruments
940 .read()
941 .expect("instrument cache lock poisoned")
942 .get(&request.instrument_id)
943 .cloned()
944 {
945 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
946 request.request_id,
947 request.client_id.unwrap_or(self.client_id),
948 instrument.id(),
949 instrument,
950 datetime_to_unix_nanos(request.start),
951 datetime_to_unix_nanos(request.end),
952 self.clock.get_time_ns(),
953 request.params,
954 )));
955
956 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
957 log::error!("Failed to send instrument response: {e}");
958 }
959 return Ok(());
960 }
961
962 let http_client = self.http_client.clone();
963 let instruments_cache = Arc::clone(&self.instruments);
964 let sender = self.data_sender.clone();
965 let instrument_id = request.instrument_id;
966 let request_id = request.request_id;
967 let client_id = request.client_id.unwrap_or(self.client_id);
968 let start = request.start;
969 let end = request.end;
970 let params = request.params;
971 let clock = self.clock;
972
973 get_runtime().spawn(async move {
974 match http_client
975 .request_instrument(instrument_id)
976 .await
977 .context("failed to request instrument from BitMEX")
978 {
979 Ok(Some(instrument)) => {
980 http_client.cache_instrument(instrument.clone());
981 {
982 let mut guard = instruments_cache
983 .write()
984 .expect("instrument cache lock poisoned");
985 guard.insert(instrument.id(), instrument.clone());
986 }
987
988 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
989 request_id,
990 client_id,
991 instrument.id(),
992 instrument,
993 datetime_to_unix_nanos(start),
994 datetime_to_unix_nanos(end),
995 clock.get_time_ns(),
996 params,
997 )));
998
999 if let Err(e) = sender.send(DataEvent::Response(response)) {
1000 log::error!("Failed to send instrument response: {e}");
1001 }
1002 }
1003 Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
1004 Err(e) => log::error!("Instrument request failed: {e:?}"),
1005 }
1006 });
1007
1008 Ok(())
1009 }
1010
1011 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1012 let http = self.http_client.clone();
1013 let sender = self.data_sender.clone();
1014 let instrument_id = request.instrument_id;
1015 let start = request.start;
1016 let end = request.end;
1017 let limit = request.limit.map(|n| n.get() as u32);
1018 let request_id = request.request_id;
1019 let client_id = request.client_id.unwrap_or(self.client_id);
1020 let params = request.params;
1021 let clock = self.clock;
1022 let start_nanos = datetime_to_unix_nanos(start);
1023 let end_nanos = datetime_to_unix_nanos(end);
1024
1025 get_runtime().spawn(async move {
1026 match http
1027 .request_trades(instrument_id, start, end, limit)
1028 .await
1029 .context("failed to request trades from BitMEX")
1030 {
1031 Ok(trades) => {
1032 let response = DataResponse::Trades(TradesResponse::new(
1033 request_id,
1034 client_id,
1035 instrument_id,
1036 trades,
1037 start_nanos,
1038 end_nanos,
1039 clock.get_time_ns(),
1040 params,
1041 ));
1042
1043 if let Err(e) = sender.send(DataEvent::Response(response)) {
1044 log::error!("Failed to send trades response: {e}");
1045 }
1046 }
1047 Err(e) => log::error!("Trade request failed: {e:?}"),
1048 }
1049 });
1050
1051 Ok(())
1052 }
1053
1054 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1055 let http = self.http_client.clone();
1056 let sender = self.data_sender.clone();
1057 let bar_type = request.bar_type;
1058 let start = request.start;
1059 let end = request.end;
1060 let limit = request.limit.map(|n| n.get() as u32);
1061 let request_id = request.request_id;
1062 let client_id = request.client_id.unwrap_or(self.client_id);
1063 let params = request.params;
1064 let clock = self.clock;
1065 let start_nanos = datetime_to_unix_nanos(start);
1066 let end_nanos = datetime_to_unix_nanos(end);
1067
1068 get_runtime().spawn(async move {
1069 match http
1070 .request_bars(bar_type, start, end, limit, false)
1071 .await
1072 .context("failed to request bars from BitMEX")
1073 {
1074 Ok(bars) => {
1075 let response = DataResponse::Bars(BarsResponse::new(
1076 request_id,
1077 client_id,
1078 bar_type,
1079 bars,
1080 start_nanos,
1081 end_nanos,
1082 clock.get_time_ns(),
1083 params,
1084 ));
1085
1086 if let Err(e) = sender.send(DataEvent::Response(response)) {
1087 log::error!("Failed to send bars response: {e}");
1088 }
1089 }
1090 Err(e) => log::error!("Bar request failed: {e:?}"),
1091 }
1092 });
1093
1094 Ok(())
1095 }
1096}