1use std::{
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT,
37 env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40 data::bar::BarType,
41 enums::OrderType,
42 identifiers::{AccountId, ClientOrderId, InstrumentId},
43 instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46 http::USER_AGENT,
47 mode::ConnectionMode,
48 websocket::{
49 AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50 WebSocketConfig, channel_message_handler,
51 },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57 enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58 error::BitmexWsError,
59 handler::{FeedHandler, HandlerCommand},
60 messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61 parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64 consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65 credential::{Credential, credential_env_vars},
66};
67
68#[derive(Clone, Debug)]
76#[cfg_attr(
77 feature = "python",
78 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
79)]
80pub struct BitmexWebSocketClient {
81 url: String,
82 credential: Option<Credential>,
83 heartbeat: Option<u64>,
84 account_id: AccountId,
85 auth_tracker: AuthTracker,
86 signal: Arc<AtomicBool>,
87 connection_mode: Arc<ArcSwap<AtomicU8>>,
88 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
89 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
90 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
91 subscriptions: SubscriptionState,
92 tracked_subscriptions: Arc<DashMap<String, ()>>,
93 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
94 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
95 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
96}
97
98impl BitmexWebSocketClient {
99 pub fn new(
105 url: Option<String>,
106 api_key: Option<String>,
107 api_secret: Option<String>,
108 account_id: Option<AccountId>,
109 heartbeat: Option<u64>,
110 ) -> anyhow::Result<Self> {
111 let credential = match (api_key, api_secret) {
112 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113 (None, None) => None,
114 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115 };
116
117 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127 Ok(Self {
128 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129 credential,
130 heartbeat,
131 account_id,
132 auth_tracker: AuthTracker::new(),
133 signal: Arc::new(AtomicBool::new(false)),
134 connection_mode,
135 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136 out_rx: None,
137 task_handle: None,
138 subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139 tracked_subscriptions: Arc::new(DashMap::new()),
140 instruments_cache: Arc::new(DashMap::new()),
141 order_type_cache: Arc::new(DashMap::new()),
142 order_symbol_cache: Arc::new(DashMap::new()),
143 })
144 }
145
146 pub fn new_with_env(
157 url: Option<String>,
158 api_key: Option<String>,
159 api_secret: Option<String>,
160 account_id: Option<AccountId>,
161 heartbeat: Option<u64>,
162 testnet: bool,
163 ) -> anyhow::Result<Self> {
164 let (api_key_env, api_secret_env) = credential_env_vars(testnet);
165
166 let key = get_or_env_var_opt(api_key, api_key_env);
167 let secret = get_or_env_var_opt(api_secret, api_secret_env);
168
169 Self::new(url, key, secret, account_id, heartbeat)
170 }
171
172 pub fn from_env() -> anyhow::Result<Self> {
178 let url = get_env_var("BITMEX_WS_URL")?;
179 let (key_var, secret_var) = credential_env_vars(false);
180 let api_key = get_env_var(key_var)?;
181 let api_secret = get_env_var(secret_var)?;
182
183 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
184 }
185
186 #[must_use]
188 pub const fn url(&self) -> &str {
189 self.url.as_str()
190 }
191
192 #[must_use]
194 pub fn api_key(&self) -> Option<&str> {
195 self.credential.as_ref().map(|c| c.api_key())
196 }
197
198 #[must_use]
200 pub fn api_key_masked(&self) -> Option<String> {
201 self.credential.as_ref().map(|c| c.api_key_masked())
202 }
203
204 #[must_use]
206 pub fn is_active(&self) -> bool {
207 let connection_mode_arc = self.connection_mode.load();
208 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
209 && !self.signal.load(Ordering::Relaxed)
210 }
211
212 #[must_use]
214 pub fn is_closed(&self) -> bool {
215 let connection_mode_arc = self.connection_mode.load();
216 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
217 || self.signal.load(Ordering::Relaxed)
218 }
219
220 pub fn set_account_id(&mut self, account_id: AccountId) {
222 self.account_id = account_id;
223 }
224
225 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
229 self.instruments_cache.clear();
230 let mut count = 0;
231
232 log::debug!("Initializing BitMEX instrument cache");
233
234 for inst in instruments {
235 let symbol = inst.symbol().inner();
236 self.instruments_cache.insert(symbol, inst.clone());
237 log::debug!("Cached instrument: {symbol}");
238 count += 1;
239 }
240
241 log::info!("BitMEX instrument cache initialized with {count} instruments");
242 }
243
244 pub fn cache_instrument(&self, instrument: InstrumentAny) {
248 self.instruments_cache
249 .insert(instrument.symbol().inner(), instrument.clone());
250
251 if let Ok(cmd_tx) = self.cmd_tx.try_read()
254 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
255 {
256 log::debug!("Failed to send instrument update to handler: {e}");
257 }
258 }
259
260 pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
267 let (client, raw_rx) = self.connect_inner().await?;
268
269 self.signal.store(false, Ordering::Relaxed);
271
272 self.connection_mode.store(client.connection_mode_atomic());
274
275 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
276 self.out_rx = Some(Arc::new(out_rx));
277
278 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
279 *self.cmd_tx.write().await = cmd_tx.clone();
280
281 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
283 return Err(BitmexWsError::ClientError(format!(
284 "Failed to send WebSocketClient to handler: {e}"
285 )));
286 }
287
288 if !self.instruments_cache.is_empty() {
290 let cached_instruments: Vec<InstrumentAny> = self
291 .instruments_cache
292 .iter()
293 .map(|entry| entry.value().clone())
294 .collect();
295
296 if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
297 log::error!("Failed to replay instruments to handler: {e}");
298 }
299 }
300
301 let signal = self.signal.clone();
302 let account_id = self.account_id;
303 let credential = self.credential.clone();
304 let auth_tracker = self.auth_tracker.clone();
305 let subscriptions = self.subscriptions.clone();
306 let order_type_cache = self.order_type_cache.clone();
307 let order_symbol_cache = self.order_symbol_cache.clone();
308 let cmd_tx_for_reconnect = cmd_tx.clone();
309
310 let stream_handle = get_runtime().spawn(async move {
311 let mut handler = FeedHandler::new(
312 signal.clone(),
313 cmd_rx,
314 raw_rx,
315 out_tx,
316 account_id,
317 auth_tracker.clone(),
318 subscriptions.clone(),
319 order_type_cache,
320 order_symbol_cache,
321 );
322
323 let resubscribe_all = || {
325 let topics = subscriptions.all_topics();
327
328 if topics.is_empty() {
329 return;
330 }
331
332 log::debug!(
333 "Resubscribing to confirmed subscriptions: count={}",
334 topics.len()
335 );
336
337 for topic in &topics {
338 subscriptions.mark_subscribe(topic.as_str());
339 }
340
341 let mut payloads = Vec::with_capacity(topics.len());
343 for topic in &topics {
344 let message = BitmexSubscription {
345 op: BitmexWsOperation::Subscribe,
346 args: vec![Ustr::from(topic.as_ref())],
347 };
348
349 if let Ok(payload) = serde_json::to_string(&message) {
350 payloads.push(payload);
351 }
352 }
353
354 if let Err(e) =
355 cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
356 {
357 log::error!("Failed to send resubscribe command: {e}");
358 }
359 };
360
361 loop {
363 match handler.next().await {
364 Some(NautilusWsMessage::Reconnected) => {
365 if signal.load(Ordering::Relaxed) {
366 continue;
367 }
368
369 log::info!("WebSocket reconnected");
370
371 let confirmed_topics: Vec<String> = {
373 let confirmed = subscriptions.confirmed();
374 let mut topics = Vec::new();
375
376 for entry in confirmed.iter() {
377 let (channel, symbols) = entry.pair();
378
379 if *channel == BitmexWsTopic::Instrument.as_ref() {
380 continue;
381 }
382
383 for symbol in symbols {
384 if symbol.is_empty() {
385 topics.push(channel.to_string());
386 } else {
387 topics.push(format!("{channel}:{symbol}"));
388 }
389 }
390 }
391
392 topics
393 };
394
395 if !confirmed_topics.is_empty() {
396 log::debug!(
397 "Marking confirmed subscriptions as pending for replay: count={}",
398 confirmed_topics.len()
399 );
400 for topic in confirmed_topics {
401 subscriptions.mark_failure(&topic);
402 }
403 }
404
405 if let Some(cred) = &credential {
406 log::debug!("Re-authenticating after reconnection");
407
408 let expires =
409 (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
410 let signature = cred.sign("GET", "/realtime", expires, "");
411
412 let auth_message = BitmexAuthentication {
413 op: BitmexWsAuthAction::AuthKeyExpires,
414 args: (cred.api_key().to_string(), expires, signature),
415 };
416
417 if let Ok(payload) = serde_json::to_string(&auth_message) {
418 if let Err(e) = cmd_tx_for_reconnect
419 .send(HandlerCommand::Authenticate { payload })
420 {
421 log::error!("Failed to send reconnection auth command: {e}");
422 }
423 } else {
424 log::error!("Failed to serialize reconnection auth message");
425 }
426 }
427
428 if credential.is_none() {
431 log::debug!("No authentication required, resubscribing immediately");
432 resubscribe_all();
433 }
434
435 if handler.send(NautilusWsMessage::Reconnected).is_err() {
436 log::error!("Failed to forward reconnect event (receiver dropped)");
437 break;
438 }
439 }
440 Some(NautilusWsMessage::Authenticated) => {
441 log::debug!("Authenticated after reconnection, resubscribing");
442 resubscribe_all();
443 }
444 Some(msg) => {
445 if handler.send(msg).is_err() {
446 log::error!("Failed to send message (receiver dropped)");
447 break;
448 }
449 }
450 None => {
451 if handler.is_stopped() {
453 log::debug!("Stop signal received, ending message processing");
454 break;
455 }
456 log::warn!("WebSocket stream ended unexpectedly");
458 break;
459 }
460 }
461 }
462
463 log::debug!("Handler task exiting");
464 });
465
466 self.task_handle = Some(Arc::new(stream_handle));
467
468 if self.credential.is_some()
469 && let Err(e) = self.authenticate().await
470 {
471 if let Some(handle) = self.task_handle.take() {
472 handle.abort();
473 }
474 self.signal.store(true, Ordering::Relaxed);
475 return Err(e);
476 }
477
478 let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
480 self.subscriptions.mark_subscribe(&instrument_topic);
481 self.tracked_subscriptions.insert(instrument_topic, ());
482
483 let subscribe_msg = BitmexSubscription {
484 op: BitmexWsOperation::Subscribe,
485 args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
486 };
487
488 match serde_json::to_string(&subscribe_msg) {
489 Ok(subscribe_json) => {
490 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
491 topics: vec![subscribe_json],
492 }) {
493 log::error!("Failed to send subscribe command for instruments: {e}");
494 } else {
495 log::debug!("Subscribed to all instruments");
496 }
497 }
498 Err(e) => {
499 log::error!("Failed to serialize subscribe message: {e}");
500 }
501 }
502
503 Ok(())
504 }
505
506 async fn connect_inner(
512 &mut self,
513 ) -> Result<
514 (
515 WebSocketClient,
516 tokio::sync::mpsc::UnboundedReceiver<Message>,
517 ),
518 BitmexWsError,
519 > {
520 let (message_handler, rx) = channel_message_handler();
521
522 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
525 });
527
528 let config = WebSocketConfig {
529 url: self.url.clone(),
530 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
531 heartbeat: self.heartbeat,
532 heartbeat_msg: None,
533 reconnect_timeout_ms: Some(5_000),
534 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, reconnect_max_attempts: None,
539 idle_timeout_ms: None,
540 };
541
542 let keyed_quotas = vec![];
543 let client = WebSocketClient::connect(
544 config,
545 Some(message_handler),
546 Some(ping_handler),
547 None, keyed_quotas,
549 None, )
551 .await
552 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
553
554 Ok((client, rx))
555 }
556
557 async fn authenticate(&self) -> Result<(), BitmexWsError> {
564 let credential = match &self.credential {
565 Some(credential) => credential,
566 None => {
567 return Err(BitmexWsError::AuthenticationError(
568 "API credentials not available to authenticate".to_string(),
569 ));
570 }
571 };
572
573 let receiver = self.auth_tracker.begin();
574
575 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
576 let signature = credential.sign("GET", "/realtime", expires, "");
577
578 let auth_message = BitmexAuthentication {
579 op: BitmexWsAuthAction::AuthKeyExpires,
580 args: (credential.api_key().to_string(), expires, signature),
581 };
582
583 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
584 let msg = format!("Failed to serialize auth message: {e}");
585 self.auth_tracker.fail(msg.clone());
586 BitmexWsError::AuthenticationError(msg)
587 })?;
588
589 self.cmd_tx
591 .read()
592 .await
593 .send(HandlerCommand::Authenticate { payload: auth_json })
594 .map_err(|e| {
595 let msg = format!("Failed to send authenticate command: {e}");
596 self.auth_tracker.fail(msg.clone());
597 BitmexWsError::AuthenticationError(msg)
598 })?;
599
600 self.auth_tracker
601 .wait_for_result::<BitmexWsError>(
602 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
603 receiver,
604 )
605 .await
606 }
607
608 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
614 let timeout = Duration::from_secs_f64(timeout_secs);
615
616 tokio::time::timeout(timeout, async {
617 while !self.is_active() {
618 tokio::time::sleep(Duration::from_millis(10)).await;
619 }
620 })
621 .await
622 .map_err(|_| {
623 BitmexWsError::ClientError(format!(
624 "WebSocket connection timeout after {timeout_secs} seconds"
625 ))
626 })?;
627
628 Ok(())
629 }
630
631 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
639 let rx = self
640 .out_rx
641 .take()
642 .expect("Stream receiver already taken or not connected");
643 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
644 async_stream::stream! {
645 while let Some(msg) = rx.recv().await {
646 yield msg;
647 }
648 }
649 }
650
651 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
657 log::debug!("Starting close process");
658
659 self.signal.store(true, Ordering::Relaxed);
660
661 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
663 log::debug!(
664 "Failed to send disconnect command (handler may already be shut down): {e}"
665 );
666 }
667
668 if let Some(task_handle) = self.task_handle.take() {
670 match Arc::try_unwrap(task_handle) {
671 Ok(handle) => {
672 log::debug!("Waiting for task handle to complete");
673 match tokio::time::timeout(Duration::from_secs(2), handle).await {
674 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
675 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
676 Err(_) => {
677 log::warn!(
678 "Timeout waiting for task handle, task may still be running"
679 );
680 }
682 }
683 }
684 Err(arc_handle) => {
685 log::debug!(
686 "Cannot take ownership of task handle - other references exist, aborting task"
687 );
688 arc_handle.abort();
689 }
690 }
691 } else {
692 log::debug!("No task handle to await");
693 }
694
695 log::debug!("Closed");
696
697 Ok(())
698 }
699
700 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
706 log::debug!("Subscribing to topics: {topics:?}");
707
708 for topic in &topics {
709 self.subscriptions.mark_subscribe(topic.as_str());
710 self.tracked_subscriptions.insert(topic.clone(), ());
711 }
712
713 let mut payloads = Vec::with_capacity(topics.len());
715 for topic in &topics {
716 let message = BitmexSubscription {
717 op: BitmexWsOperation::Subscribe,
718 args: vec![Ustr::from(topic.as_ref())],
719 };
720 let payload = serde_json::to_string(&message).map_err(|e| {
721 BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
722 })?;
723 payloads.push(payload);
724 }
725
726 let cmd = HandlerCommand::Subscribe { topics: payloads };
728
729 self.send_cmd(cmd).await.map_err(|e| {
730 BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
731 })
732 }
733
734 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
740 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
741
742 if self.signal.load(Ordering::Relaxed) {
743 log::debug!("Shutdown signal detected, skipping unsubscribe");
744 return Ok(());
745 }
746
747 for topic in &topics {
748 self.subscriptions.mark_unsubscribe(topic.as_str());
749 self.tracked_subscriptions.remove(topic);
750 }
751
752 let mut payloads = Vec::with_capacity(topics.len());
754 for topic in &topics {
755 let message = BitmexSubscription {
756 op: BitmexWsOperation::Unsubscribe,
757 args: vec![Ustr::from(topic.as_ref())],
758 };
759
760 if let Ok(payload) = serde_json::to_string(&message) {
761 payloads.push(payload);
762 }
763 }
764
765 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
767
768 if let Err(e) = self.send_cmd(cmd).await {
769 log::debug!("Failed to send unsubscribe command: {e}");
770 }
771
772 Ok(())
773 }
774
775 #[must_use]
777 pub fn subscription_count(&self) -> usize {
778 self.subscriptions.len()
779 }
780
781 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
782 let symbol = instrument_id.symbol.inner();
783 let confirmed = self.subscriptions.confirmed();
784 let mut channels = Vec::with_capacity(confirmed.len());
785
786 for entry in confirmed.iter() {
787 let (channel, symbols) = entry.pair();
788 if symbols.contains(&symbol) {
789 channels.push(format!("{channel}:{symbol}"));
791 } else {
792 let has_channel_marker = symbols.iter().any(|s| s.is_empty());
793 if has_channel_marker
794 && (*channel == BitmexWsAuthChannel::Execution.as_ref()
795 || *channel == BitmexWsAuthChannel::Order.as_ref())
796 {
797 channels.push(channel.to_string());
799 }
800 }
801 }
802
803 channels
804 }
805
806 pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
812 log::debug!("Already subscribed to all instruments on connection, skipping");
814 Ok(())
815 }
816
817 pub async fn subscribe_instrument(
823 &self,
824 instrument_id: InstrumentId,
825 ) -> Result<(), BitmexWsError> {
826 log::debug!(
828 "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
829 );
830 Ok(())
831 }
832
833 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
839 let topic = BitmexWsTopic::OrderBookL2;
840 let symbol = instrument_id.symbol.inner();
841 self.subscribe(vec![format!("{topic}:{symbol}")]).await
842 }
843
844 pub async fn subscribe_book_25(
850 &self,
851 instrument_id: InstrumentId,
852 ) -> Result<(), BitmexWsError> {
853 let topic = BitmexWsTopic::OrderBookL2_25;
854 let symbol = instrument_id.symbol.inner();
855 self.subscribe(vec![format!("{topic}:{symbol}")]).await
856 }
857
858 pub async fn subscribe_book_depth10(
864 &self,
865 instrument_id: InstrumentId,
866 ) -> Result<(), BitmexWsError> {
867 let topic = BitmexWsTopic::OrderBook10;
868 let symbol = instrument_id.symbol.inner();
869 self.subscribe(vec![format!("{topic}:{symbol}")]).await
870 }
871
872 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
880 let symbol = instrument_id.symbol.inner();
881
882 if is_index_symbol(&instrument_id.symbol.inner()) {
884 log::warn!("Ignoring quote subscription for index symbol: {symbol}");
885 return Ok(());
886 }
887
888 let topic = BitmexWsTopic::Quote;
889 self.subscribe(vec![format!("{topic}:{symbol}")]).await
890 }
891
892 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
900 let symbol = instrument_id.symbol.inner();
901
902 if is_index_symbol(&symbol) {
904 log::warn!("Ignoring trade subscription for index symbol: {symbol}");
905 return Ok(());
906 }
907
908 let topic = BitmexWsTopic::Trade;
909 self.subscribe(vec![format!("{topic}:{symbol}")]).await
910 }
911
912 pub async fn subscribe_mark_prices(
918 &self,
919 instrument_id: InstrumentId,
920 ) -> Result<(), BitmexWsError> {
921 self.subscribe_instrument(instrument_id).await
922 }
923
924 pub async fn subscribe_index_prices(
930 &self,
931 instrument_id: InstrumentId,
932 ) -> Result<(), BitmexWsError> {
933 self.subscribe_instrument(instrument_id).await
934 }
935
936 pub async fn subscribe_funding_rates(
942 &self,
943 instrument_id: InstrumentId,
944 ) -> Result<(), BitmexWsError> {
945 let topic = BitmexWsTopic::Funding;
946 let symbol = instrument_id.symbol.inner();
947 self.subscribe(vec![format!("{topic}:{symbol}")]).await
948 }
949
950 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
956 let topic = topic_from_bar_spec(bar_type.spec());
957 let symbol = bar_type.instrument_id().symbol.inner();
958 self.subscribe(vec![format!("{topic}:{symbol}")]).await
959 }
960
961 pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
967 log::debug!(
969 "Instruments subscription maintained for proper operation, skipping unsubscribe"
970 );
971 Ok(())
972 }
973
974 pub async fn unsubscribe_instrument(
980 &self,
981 instrument_id: InstrumentId,
982 ) -> Result<(), BitmexWsError> {
983 log::debug!(
985 "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
986 );
987 Ok(())
988 }
989
990 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
996 let topic = BitmexWsTopic::OrderBookL2;
997 let symbol = instrument_id.symbol.inner();
998 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
999 }
1000
1001 pub async fn unsubscribe_book_25(
1007 &self,
1008 instrument_id: InstrumentId,
1009 ) -> Result<(), BitmexWsError> {
1010 let topic = BitmexWsTopic::OrderBookL2_25;
1011 let symbol = instrument_id.symbol.inner();
1012 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1013 }
1014
1015 pub async fn unsubscribe_book_depth10(
1021 &self,
1022 instrument_id: InstrumentId,
1023 ) -> Result<(), BitmexWsError> {
1024 let topic = BitmexWsTopic::OrderBook10;
1025 let symbol = instrument_id.symbol.inner();
1026 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1027 }
1028
1029 pub async fn unsubscribe_quotes(
1035 &self,
1036 instrument_id: InstrumentId,
1037 ) -> Result<(), BitmexWsError> {
1038 let symbol = instrument_id.symbol.inner();
1039
1040 if is_index_symbol(&symbol) {
1042 return Ok(());
1043 }
1044
1045 let topic = BitmexWsTopic::Quote;
1046 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1047 }
1048
1049 pub async fn unsubscribe_trades(
1055 &self,
1056 instrument_id: InstrumentId,
1057 ) -> Result<(), BitmexWsError> {
1058 let symbol = instrument_id.symbol.inner();
1059
1060 if is_index_symbol(&symbol) {
1062 return Ok(());
1063 }
1064
1065 let topic = BitmexWsTopic::Trade;
1066 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1067 }
1068
1069 pub async fn unsubscribe_mark_prices(
1075 &self,
1076 instrument_id: InstrumentId,
1077 ) -> Result<(), BitmexWsError> {
1078 log::debug!(
1080 "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1081 );
1082 Ok(())
1083 }
1084
1085 pub async fn unsubscribe_index_prices(
1091 &self,
1092 instrument_id: InstrumentId,
1093 ) -> Result<(), BitmexWsError> {
1094 log::debug!(
1096 "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1097 );
1098 Ok(())
1099 }
1100
1101 pub async fn unsubscribe_funding_rates(
1107 &self,
1108 instrument_id: InstrumentId,
1109 ) -> Result<(), BitmexWsError> {
1110 log::debug!(
1112 "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1113 );
1114 Ok(())
1115 }
1116
1117 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1123 let topic = topic_from_bar_spec(bar_type.spec());
1124 let symbol = bar_type.instrument_id().symbol.inner();
1125 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1126 }
1127
1128 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1134 if self.credential.is_none() {
1135 return Err(BitmexWsError::MissingCredentials);
1136 }
1137 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1138 .await
1139 }
1140
1141 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1147 if self.credential.is_none() {
1148 return Err(BitmexWsError::MissingCredentials);
1149 }
1150 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1151 .await
1152 }
1153
1154 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1160 if self.credential.is_none() {
1161 return Err(BitmexWsError::MissingCredentials);
1162 }
1163 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1164 .await
1165 }
1166
1167 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1173 if self.credential.is_none() {
1174 return Err(BitmexWsError::MissingCredentials);
1175 }
1176 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1177 .await
1178 }
1179
1180 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1186 if self.credential.is_none() {
1187 return Err(BitmexWsError::MissingCredentials);
1188 }
1189 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1190 .await
1191 }
1192
1193 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1199 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1200 .await
1201 }
1202
1203 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1209 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1210 .await
1211 }
1212
1213 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1219 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1220 .await
1221 }
1222
1223 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1229 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1230 .await
1231 }
1232
1233 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1239 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1240 .await
1241 }
1242
1243 async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1245 self.cmd_tx
1246 .read()
1247 .await
1248 .send(cmd)
1249 .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1250 }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255 use ahash::AHashSet;
1256 use rstest::rstest;
1257 use ustr::Ustr;
1258
1259 use super::*;
1260
1261 #[rstest]
1262 fn test_reconnect_topics_restoration_logic() {
1263 let client = BitmexWebSocketClient::new(
1265 Some("ws://test.com".to_string()),
1266 Some("test_key".to_string()),
1267 Some("test_secret".to_string()),
1268 Some(AccountId::new("BITMEX-TEST")),
1269 None,
1270 )
1271 .unwrap();
1272
1273 let subs = client.subscriptions.confirmed();
1275 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1276 let mut set = AHashSet::new();
1277 set.insert(Ustr::from("XBTUSD"));
1278 set.insert(Ustr::from("ETHUSD"));
1279 set
1280 });
1281
1282 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1283 let mut set = AHashSet::new();
1284 set.insert(Ustr::from("XBTUSD"));
1285 set
1286 });
1287
1288 subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1290 let mut set = AHashSet::new();
1291 set.insert(Ustr::from(""));
1292 set
1293 });
1294 subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1295 let mut set = AHashSet::new();
1296 set.insert(Ustr::from(""));
1297 set
1298 });
1299
1300 let mut topics_to_restore = Vec::new();
1302 for entry in subs.iter() {
1303 let (channel, symbols) = entry.pair();
1304 for symbol in symbols {
1305 if symbol.is_empty() {
1306 topics_to_restore.push(channel.to_string());
1307 } else {
1308 topics_to_restore.push(format!("{channel}:{symbol}"));
1309 }
1310 }
1311 }
1312
1313 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1315 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1316 assert!(
1317 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1318 );
1319 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1320 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1321 assert_eq!(topics_to_restore.len(), 5);
1322 }
1323
1324 #[rstest]
1325 fn test_reconnect_auth_message_building() {
1326 let client_with_creds = BitmexWebSocketClient::new(
1328 Some("ws://test.com".to_string()),
1329 Some("test_key".to_string()),
1330 Some("test_secret".to_string()),
1331 Some(AccountId::new("BITMEX-TEST")),
1332 None,
1333 )
1334 .unwrap();
1335
1336 if let Some(cred) = &client_with_creds.credential {
1338 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1339 let signature = cred.sign("GET", "/realtime", expires, "");
1340
1341 let auth_message = BitmexAuthentication {
1342 op: BitmexWsAuthAction::AuthKeyExpires,
1343 args: (cred.api_key().to_string(), expires, signature),
1344 };
1345
1346 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1348 assert_eq!(auth_message.args.0, "test_key");
1349 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
1352 panic!("Client should have credentials");
1353 }
1354
1355 let client_no_creds = BitmexWebSocketClient::new(
1357 Some("ws://test.com".to_string()),
1358 None,
1359 None,
1360 Some(AccountId::new("BITMEX-TEST")),
1361 None,
1362 )
1363 .unwrap();
1364
1365 assert!(client_no_creds.credential.is_none());
1366 }
1367
1368 #[rstest]
1369 fn test_subscription_state_after_unsubscribe() {
1370 let client = BitmexWebSocketClient::new(
1371 Some("ws://test.com".to_string()),
1372 Some("test_key".to_string()),
1373 Some("test_secret".to_string()),
1374 Some(AccountId::new("BITMEX-TEST")),
1375 None,
1376 )
1377 .unwrap();
1378
1379 let subs = client.subscriptions.confirmed();
1381 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1382 let mut set = AHashSet::new();
1383 set.insert(Ustr::from("XBTUSD"));
1384 set.insert(Ustr::from("ETHUSD"));
1385 set
1386 });
1387
1388 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1389 let mut set = AHashSet::new();
1390 set.insert(Ustr::from("XBTUSD"));
1391 set
1392 });
1393
1394 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1396 if let Some((channel, symbol)) = topic.split_once(':')
1397 && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1398 {
1399 entry.remove(&Ustr::from(symbol));
1400 if entry.is_empty() {
1401 drop(entry);
1402 subs.remove(&Ustr::from(channel));
1403 }
1404 }
1405
1406 let mut topics_to_restore = Vec::new();
1408 for entry in subs.iter() {
1409 let (channel, symbols) = entry.pair();
1410 for symbol in symbols {
1411 if symbol.is_empty() {
1412 topics_to_restore.push(channel.to_string());
1413 } else {
1414 topics_to_restore.push(format!("{channel}:{symbol}"));
1415 }
1416 }
1417 }
1418
1419 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1421 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1422 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1423
1424 assert!(topics_to_restore.contains(&trade_xbt));
1425 assert!(!topics_to_restore.contains(&trade_eth));
1426 assert!(topics_to_restore.contains(&book_xbt));
1427 assert_eq!(topics_to_restore.len(), 2);
1428 }
1429
1430 #[rstest]
1431 fn test_race_unsubscribe_failure_recovery() {
1432 let client = BitmexWebSocketClient::new(
1438 Some("ws://test.com".to_string()),
1439 None,
1440 None,
1441 Some(AccountId::new("BITMEX-TEST")),
1442 None,
1443 )
1444 .unwrap();
1445
1446 let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1447
1448 client.subscriptions.mark_subscribe(&topic);
1450 client.subscriptions.confirm_subscribe(&topic);
1451 assert_eq!(client.subscriptions.len(), 1);
1452
1453 client.subscriptions.mark_unsubscribe(&topic);
1455 assert_eq!(client.subscriptions.len(), 0);
1456 assert_eq!(
1457 client.subscriptions.pending_unsubscribe_topics(),
1458 vec![topic.clone()]
1459 );
1460
1461 client.subscriptions.confirm_unsubscribe(&topic); client.subscriptions.mark_subscribe(&topic); client.subscriptions.confirm_subscribe(&topic); assert_eq!(client.subscriptions.len(), 1);
1469 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1470 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1471
1472 let all = client.subscriptions.all_topics();
1474 assert_eq!(all.len(), 1);
1475 assert!(all.contains(&topic));
1476 }
1477
1478 #[rstest]
1479 fn test_race_resubscribe_before_unsubscribe_ack() {
1480 let client = BitmexWebSocketClient::new(
1484 Some("ws://test.com".to_string()),
1485 None,
1486 None,
1487 Some(AccountId::new("BITMEX-TEST")),
1488 None,
1489 )
1490 .unwrap();
1491
1492 let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1493
1494 client.subscriptions.mark_subscribe(&topic);
1496 client.subscriptions.confirm_subscribe(&topic);
1497 assert_eq!(client.subscriptions.len(), 1);
1498
1499 client.subscriptions.mark_unsubscribe(&topic);
1501 assert_eq!(client.subscriptions.len(), 0);
1502 assert_eq!(
1503 client.subscriptions.pending_unsubscribe_topics(),
1504 vec![topic.clone()]
1505 );
1506
1507 client.subscriptions.mark_subscribe(&topic);
1509 assert_eq!(
1510 client.subscriptions.pending_subscribe_topics(),
1511 vec![topic.clone()]
1512 );
1513
1514 client.subscriptions.confirm_unsubscribe(&topic);
1516 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1517 assert_eq!(
1518 client.subscriptions.pending_subscribe_topics(),
1519 vec![topic.clone()]
1520 );
1521
1522 client.subscriptions.confirm_subscribe(&topic);
1524 assert_eq!(client.subscriptions.len(), 1);
1525 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1526
1527 let all = client.subscriptions.all_topics();
1529 assert_eq!(all.len(), 1);
1530 assert!(all.contains(&topic));
1531 }
1532
1533 #[rstest]
1534 fn test_race_channel_level_reconnection_with_pending_states() {
1535 let client = BitmexWebSocketClient::new(
1537 Some("ws://test.com".to_string()),
1538 Some("test_key".to_string()),
1539 Some("test_secret".to_string()),
1540 Some(AccountId::new("BITMEX-TEST")),
1541 None,
1542 )
1543 .unwrap();
1544
1545 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1548 client.subscriptions.mark_subscribe(&trade_xbt);
1549 client.subscriptions.confirm_subscribe(&trade_xbt);
1550
1551 let order_channel = BitmexWsAuthChannel::Order.as_ref();
1553 client.subscriptions.mark_subscribe(order_channel);
1554 client.subscriptions.confirm_subscribe(order_channel);
1555
1556 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1558 client.subscriptions.mark_subscribe(&trade_eth);
1559
1560 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1562 client.subscriptions.mark_subscribe(&book_xbt);
1563 client.subscriptions.confirm_subscribe(&book_xbt);
1564 client.subscriptions.mark_unsubscribe(&book_xbt);
1565
1566 let topics_to_restore = client.subscriptions.all_topics();
1568
1569 assert_eq!(topics_to_restore.len(), 3);
1571 assert!(topics_to_restore.contains(&trade_xbt));
1572 assert!(topics_to_restore.contains(&order_channel.to_string()));
1573 assert!(topics_to_restore.contains(&trade_eth));
1574 assert!(!topics_to_restore.contains(&book_xbt)); for topic in &topics_to_restore {
1579 if topic == order_channel {
1580 assert!(
1581 !topic.contains(':'),
1582 "Channel-level topic should not have delimiter"
1583 );
1584 }
1585 }
1586 }
1587}