Skip to main content

nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and parses incoming messages into structured Nautilus domain objects.
22
23use std::{
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT,
37    env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40    data::bar::BarType,
41    enums::OrderType,
42    identifiers::{AccountId, ClientOrderId, InstrumentId},
43    instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46    http::USER_AGENT,
47    mode::ConnectionMode,
48    websocket::{
49        AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50        WebSocketConfig, channel_message_handler,
51    },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57    enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58    error::BitmexWsError,
59    handler::{FeedHandler, HandlerCommand},
60    messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61    parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64    consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65    credential::{Credential, credential_env_vars},
66};
67
68/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
69///
70/// Key runtime patterns:
71/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
72///   occur only after BitMEX acknowledges `authKey` messages.
73/// - The subscription state maintains pending and confirmed topics so reconnection replay is
74///   deterministic and per-topic errors are surfaced.
75#[derive(Clone, Debug)]
76#[cfg_attr(
77    feature = "python",
78    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
79)]
80pub struct BitmexWebSocketClient {
81    url: String,
82    credential: Option<Credential>,
83    heartbeat: Option<u64>,
84    account_id: AccountId,
85    auth_tracker: AuthTracker,
86    signal: Arc<AtomicBool>,
87    connection_mode: Arc<ArcSwap<AtomicU8>>,
88    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
89    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
90    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
91    subscriptions: SubscriptionState,
92    tracked_subscriptions: Arc<DashMap<String, ()>>,
93    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
94    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
95    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
96}
97
98impl BitmexWebSocketClient {
99    /// Creates a new [`BitmexWebSocketClient`] instance.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
104    pub fn new(
105        url: Option<String>,
106        api_key: Option<String>,
107        api_secret: Option<String>,
108        account_id: Option<AccountId>,
109        heartbeat: Option<u64>,
110    ) -> anyhow::Result<Self> {
111        let credential = match (api_key, api_secret) {
112            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113            (None, None) => None,
114            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115        };
116
117        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122        // We don't have a handler yet; this placeholder keeps cache_instrument() working,
123        // connect() swaps in the real channel and replays any queued instruments so the
124        // handler sees them once it starts.
125        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127        Ok(Self {
128            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129            credential,
130            heartbeat,
131            account_id,
132            auth_tracker: AuthTracker::new(),
133            signal: Arc::new(AtomicBool::new(false)),
134            connection_mode,
135            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136            out_rx: None,
137            task_handle: None,
138            subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139            tracked_subscriptions: Arc::new(DashMap::new()),
140            instruments_cache: Arc::new(DashMap::new()),
141            order_type_cache: Arc::new(DashMap::new()),
142            order_symbol_cache: Arc::new(DashMap::new()),
143        })
144    }
145
146    /// Creates a new [`BitmexWebSocketClient`] with environment variable credential resolution.
147    ///
148    /// If `api_key` or `api_secret` are not provided, they will be loaded from
149    /// environment variables based on the `testnet` flag:
150    /// - Testnet: `BITMEX_TESTNET_API_KEY`, `BITMEX_TESTNET_API_SECRET`
151    /// - Mainnet: `BITMEX_API_KEY`, `BITMEX_API_SECRET`
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if only one of `api_key` or `api_secret` is provided.
156    pub fn new_with_env(
157        url: Option<String>,
158        api_key: Option<String>,
159        api_secret: Option<String>,
160        account_id: Option<AccountId>,
161        heartbeat: Option<u64>,
162        testnet: bool,
163    ) -> anyhow::Result<Self> {
164        let (api_key_env, api_secret_env) = credential_env_vars(testnet);
165
166        let key = get_or_env_var_opt(api_key, api_key_env);
167        let secret = get_or_env_var_opt(api_secret, api_secret_env);
168
169        Self::new(url, key, secret, account_id, heartbeat)
170    }
171
172    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if environment variables are not set or credentials are invalid.
177    pub fn from_env() -> anyhow::Result<Self> {
178        let url = get_env_var("BITMEX_WS_URL")?;
179        let (key_var, secret_var) = credential_env_vars(false);
180        let api_key = get_env_var(key_var)?;
181        let api_secret = get_env_var(secret_var)?;
182
183        Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
184    }
185
186    /// Returns the websocket url being used by the client.
187    #[must_use]
188    pub const fn url(&self) -> &str {
189        self.url.as_str()
190    }
191
192    /// Returns the public API key being used by the client.
193    #[must_use]
194    pub fn api_key(&self) -> Option<&str> {
195        self.credential.as_ref().map(|c| c.api_key())
196    }
197
198    /// Returns a masked version of the API key for logging purposes.
199    #[must_use]
200    pub fn api_key_masked(&self) -> Option<String> {
201        self.credential.as_ref().map(|c| c.api_key_masked())
202    }
203
204    /// Returns a value indicating whether the client is active.
205    #[must_use]
206    pub fn is_active(&self) -> bool {
207        let connection_mode_arc = self.connection_mode.load();
208        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
209            && !self.signal.load(Ordering::Relaxed)
210    }
211
212    /// Returns a value indicating whether the client is closed.
213    #[must_use]
214    pub fn is_closed(&self) -> bool {
215        let connection_mode_arc = self.connection_mode.load();
216        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
217            || self.signal.load(Ordering::Relaxed)
218    }
219
220    /// Sets the account ID.
221    pub fn set_account_id(&mut self, account_id: AccountId) {
222        self.account_id = account_id;
223    }
224
225    /// Caches multiple instruments.
226    ///
227    /// Clears the existing cache first, then adds all provided instruments.
228    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
229        self.instruments_cache.clear();
230        let mut count = 0;
231
232        log::debug!("Initializing BitMEX instrument cache");
233
234        for inst in instruments {
235            let symbol = inst.symbol().inner();
236            self.instruments_cache.insert(symbol, inst.clone());
237            log::debug!("Cached instrument: {symbol}");
238            count += 1;
239        }
240
241        log::info!("BitMEX instrument cache initialized with {count} instruments");
242    }
243
244    /// Caches a single instrument.
245    ///
246    /// Any existing instrument with the same symbol will be replaced.
247    pub fn cache_instrument(&self, instrument: InstrumentAny) {
248        self.instruments_cache
249            .insert(instrument.symbol().inner(), instrument.clone());
250
251        // Before connect() the handler isn't running; this send will fail and that's expected
252        // because connect() replays the instruments via InitializeInstruments
253        if let Ok(cmd_tx) = self.cmd_tx.try_read()
254            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
255        {
256            log::debug!("Failed to send instrument update to handler: {e}");
257        }
258    }
259
260    /// Connect to the BitMEX WebSocket server.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
265    ///
266    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
267        let (client, raw_rx) = self.connect_inner().await?;
268
269        // Reset shutdown signal so is_active() works after close+reconnect
270        self.signal.store(false, Ordering::Relaxed);
271
272        // Replace connection state so all clones see the underlying WebSocketClient's state
273        self.connection_mode.store(client.connection_mode_atomic());
274
275        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
276        self.out_rx = Some(Arc::new(out_rx));
277
278        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
279        *self.cmd_tx.write().await = cmd_tx.clone();
280
281        // Send WebSocketClient to handler
282        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
283            return Err(BitmexWsError::ClientError(format!(
284                "Failed to send WebSocketClient to handler: {e}"
285            )));
286        }
287
288        // Replay cached instruments to the new handler via the new channel
289        if !self.instruments_cache.is_empty() {
290            let cached_instruments: Vec<InstrumentAny> = self
291                .instruments_cache
292                .iter()
293                .map(|entry| entry.value().clone())
294                .collect();
295
296            if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
297                log::error!("Failed to replay instruments to handler: {e}");
298            }
299        }
300
301        let signal = self.signal.clone();
302        let account_id = self.account_id;
303        let credential = self.credential.clone();
304        let auth_tracker = self.auth_tracker.clone();
305        let subscriptions = self.subscriptions.clone();
306        let order_type_cache = self.order_type_cache.clone();
307        let order_symbol_cache = self.order_symbol_cache.clone();
308        let cmd_tx_for_reconnect = cmd_tx.clone();
309
310        let stream_handle = get_runtime().spawn(async move {
311            let mut handler = FeedHandler::new(
312                signal.clone(),
313                cmd_rx,
314                raw_rx,
315                out_tx,
316                account_id,
317                auth_tracker.clone(),
318                subscriptions.clone(),
319                order_type_cache,
320                order_symbol_cache,
321            );
322
323            // Helper closure to resubscribe all tracked subscriptions after reconnection
324            let resubscribe_all = || {
325                // Use SubscriptionState as source of truth for what to restore
326                let topics = subscriptions.all_topics();
327
328                if topics.is_empty() {
329                    return;
330                }
331
332                log::debug!(
333                    "Resubscribing to confirmed subscriptions: count={}",
334                    topics.len()
335                );
336
337                for topic in &topics {
338                    subscriptions.mark_subscribe(topic.as_str());
339                }
340
341                // Serialize subscription messages
342                let mut payloads = Vec::with_capacity(topics.len());
343                for topic in &topics {
344                    let message = BitmexSubscription {
345                        op: BitmexWsOperation::Subscribe,
346                        args: vec![Ustr::from(topic.as_ref())],
347                    };
348
349                    if let Ok(payload) = serde_json::to_string(&message) {
350                        payloads.push(payload);
351                    }
352                }
353
354                if let Err(e) =
355                    cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
356                {
357                    log::error!("Failed to send resubscribe command: {e}");
358                }
359            };
360
361            // Run message processing with reconnection handling
362            loop {
363                match handler.next().await {
364                    Some(NautilusWsMessage::Reconnected) => {
365                        if signal.load(Ordering::Relaxed) {
366                            continue;
367                        }
368
369                        log::info!("WebSocket reconnected");
370
371                        // Mark all confirmed subscriptions as failed so they transition to pending state
372                        let confirmed_topics: Vec<String> = {
373                            let confirmed = subscriptions.confirmed();
374                            let mut topics = Vec::new();
375
376                            for entry in confirmed.iter() {
377                                let (channel, symbols) = entry.pair();
378
379                                if *channel == BitmexWsTopic::Instrument.as_ref() {
380                                    continue;
381                                }
382
383                                for symbol in symbols {
384                                    if symbol.is_empty() {
385                                        topics.push(channel.to_string());
386                                    } else {
387                                        topics.push(format!("{channel}:{symbol}"));
388                                    }
389                                }
390                            }
391
392                            topics
393                        };
394
395                        if !confirmed_topics.is_empty() {
396                            log::debug!(
397                                "Marking confirmed subscriptions as pending for replay: count={}",
398                                confirmed_topics.len()
399                            );
400                            for topic in confirmed_topics {
401                                subscriptions.mark_failure(&topic);
402                            }
403                        }
404
405                        if let Some(cred) = &credential {
406                            log::debug!("Re-authenticating after reconnection");
407
408                            let expires =
409                                (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
410                            let signature = cred.sign("GET", "/realtime", expires, "");
411
412                            let auth_message = BitmexAuthentication {
413                                op: BitmexWsAuthAction::AuthKeyExpires,
414                                args: (cred.api_key().to_string(), expires, signature),
415                            };
416
417                            if let Ok(payload) = serde_json::to_string(&auth_message) {
418                                if let Err(e) = cmd_tx_for_reconnect
419                                    .send(HandlerCommand::Authenticate { payload })
420                                {
421                                    log::error!("Failed to send reconnection auth command: {e}");
422                                }
423                            } else {
424                                log::error!("Failed to serialize reconnection auth message");
425                            }
426                        }
427
428                        // Unauthenticated sessions resubscribe immediately after reconnection,
429                        // authenticated sessions wait for Authenticated message
430                        if credential.is_none() {
431                            log::debug!("No authentication required, resubscribing immediately");
432                            resubscribe_all();
433                        }
434
435                        if handler.send(NautilusWsMessage::Reconnected).is_err() {
436                            log::error!("Failed to forward reconnect event (receiver dropped)");
437                            break;
438                        }
439                    }
440                    Some(NautilusWsMessage::Authenticated) => {
441                        log::debug!("Authenticated after reconnection, resubscribing");
442                        resubscribe_all();
443                    }
444                    Some(msg) => {
445                        if handler.send(msg).is_err() {
446                            log::error!("Failed to send message (receiver dropped)");
447                            break;
448                        }
449                    }
450                    None => {
451                        // Stream ended - check if it's a stop signal
452                        if handler.is_stopped() {
453                            log::debug!("Stop signal received, ending message processing");
454                            break;
455                        }
456                        // Otherwise it's an unexpected stream end
457                        log::warn!("WebSocket stream ended unexpectedly");
458                        break;
459                    }
460                }
461            }
462
463            log::debug!("Handler task exiting");
464        });
465
466        self.task_handle = Some(Arc::new(stream_handle));
467
468        if self.credential.is_some()
469            && let Err(e) = self.authenticate().await
470        {
471            if let Some(handle) = self.task_handle.take() {
472                handle.abort();
473            }
474            self.signal.store(true, Ordering::Relaxed);
475            return Err(e);
476        }
477
478        // Subscribe to instrument topic
479        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
480        self.subscriptions.mark_subscribe(&instrument_topic);
481        self.tracked_subscriptions.insert(instrument_topic, ());
482
483        let subscribe_msg = BitmexSubscription {
484            op: BitmexWsOperation::Subscribe,
485            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
486        };
487
488        match serde_json::to_string(&subscribe_msg) {
489            Ok(subscribe_json) => {
490                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
491                    topics: vec![subscribe_json],
492                }) {
493                    log::error!("Failed to send subscribe command for instruments: {e}");
494                } else {
495                    log::debug!("Subscribed to all instruments");
496                }
497            }
498            Err(e) => {
499                log::error!("Failed to serialize subscribe message: {e}");
500            }
501        }
502
503        Ok(())
504    }
505
506    /// Connect to the WebSocket and return a message receiver.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
511    async fn connect_inner(
512        &mut self,
513    ) -> Result<
514        (
515            WebSocketClient,
516            tokio::sync::mpsc::UnboundedReceiver<Message>,
517        ),
518        BitmexWsError,
519    > {
520        let (message_handler, rx) = channel_message_handler();
521
522        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
523        // in the message loop for minimal latency (see handler.rs pong response)
524        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
525            // Handler responds to pings internally via select! loop
526        });
527
528        let config = WebSocketConfig {
529            url: self.url.clone(),
530            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
531            heartbeat: self.heartbeat,
532            heartbeat_msg: None,
533            reconnect_timeout_ms: Some(5_000),
534            reconnect_delay_initial_ms: None, // Use default
535            reconnect_delay_max_ms: None,     // Use default
536            reconnect_backoff_factor: None,   // Use default
537            reconnect_jitter_ms: None,        // Use default
538            reconnect_max_attempts: None,
539            idle_timeout_ms: None,
540        };
541
542        let keyed_quotas = vec![];
543        let client = WebSocketClient::connect(
544            config,
545            Some(message_handler),
546            Some(ping_handler),
547            None, // post_reconnection
548            keyed_quotas,
549            None, // default_quota
550        )
551        .await
552        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
553
554        Ok((client, rx))
555    }
556
557    /// Authenticate the WebSocket connection using the provided credentials.
558    ///
559    /// # Errors
560    ///
561    /// Returns an error if the WebSocket is not connected, if authentication fails,
562    /// or if credentials are not available.
563    async fn authenticate(&self) -> Result<(), BitmexWsError> {
564        let credential = match &self.credential {
565            Some(credential) => credential,
566            None => {
567                return Err(BitmexWsError::AuthenticationError(
568                    "API credentials not available to authenticate".to_string(),
569                ));
570            }
571        };
572
573        let receiver = self.auth_tracker.begin();
574
575        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
576        let signature = credential.sign("GET", "/realtime", expires, "");
577
578        let auth_message = BitmexAuthentication {
579            op: BitmexWsAuthAction::AuthKeyExpires,
580            args: (credential.api_key().to_string(), expires, signature),
581        };
582
583        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
584            let msg = format!("Failed to serialize auth message: {e}");
585            self.auth_tracker.fail(msg.clone());
586            BitmexWsError::AuthenticationError(msg)
587        })?;
588
589        // Send Authenticate command to handler
590        self.cmd_tx
591            .read()
592            .await
593            .send(HandlerCommand::Authenticate { payload: auth_json })
594            .map_err(|e| {
595                let msg = format!("Failed to send authenticate command: {e}");
596                self.auth_tracker.fail(msg.clone());
597                BitmexWsError::AuthenticationError(msg)
598            })?;
599
600        self.auth_tracker
601            .wait_for_result::<BitmexWsError>(
602                Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
603                receiver,
604            )
605            .await
606    }
607
608    /// Wait until the WebSocket connection is active.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the connection times out.
613    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
614        let timeout = Duration::from_secs_f64(timeout_secs);
615
616        tokio::time::timeout(timeout, async {
617            while !self.is_active() {
618                tokio::time::sleep(Duration::from_millis(10)).await;
619            }
620        })
621        .await
622        .map_err(|_| {
623            BitmexWsError::ClientError(format!(
624                "WebSocket connection timeout after {timeout_secs} seconds"
625            ))
626        })?;
627
628        Ok(())
629    }
630
631    /// Provides the internal stream as a channel-based stream.
632    ///
633    /// # Panics
634    ///
635    /// This function panics:
636    /// - If the websocket is not connected.
637    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
638    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
639        let rx = self
640            .out_rx
641            .take()
642            .expect("Stream receiver already taken or not connected");
643        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
644        async_stream::stream! {
645            while let Some(msg) = rx.recv().await {
646                yield msg;
647            }
648        }
649    }
650
651    /// Closes the client.
652    ///
653    /// # Errors
654    ///
655    /// Returns an error if the WebSocket is not connected or if closing fails.
656    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
657        log::debug!("Starting close process");
658
659        self.signal.store(true, Ordering::Relaxed);
660
661        // Send Disconnect command to handler
662        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
663            log::debug!(
664                "Failed to send disconnect command (handler may already be shut down): {e}"
665            );
666        }
667
668        // Clean up task handle with timeout
669        if let Some(task_handle) = self.task_handle.take() {
670            match Arc::try_unwrap(task_handle) {
671                Ok(handle) => {
672                    log::debug!("Waiting for task handle to complete");
673                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
674                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
675                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
676                        Err(_) => {
677                            log::warn!(
678                                "Timeout waiting for task handle, task may still be running"
679                            );
680                            // The task will be dropped and should clean up automatically
681                        }
682                    }
683                }
684                Err(arc_handle) => {
685                    log::debug!(
686                        "Cannot take ownership of task handle - other references exist, aborting task"
687                    );
688                    arc_handle.abort();
689                }
690            }
691        } else {
692            log::debug!("No task handle to await");
693        }
694
695        log::debug!("Closed");
696
697        Ok(())
698    }
699
700    /// Subscribe to the specified topics.
701    ///
702    /// # Errors
703    ///
704    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
705    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
706        log::debug!("Subscribing to topics: {topics:?}");
707
708        for topic in &topics {
709            self.subscriptions.mark_subscribe(topic.as_str());
710            self.tracked_subscriptions.insert(topic.clone(), ());
711        }
712
713        // Serialize subscription messages
714        let mut payloads = Vec::with_capacity(topics.len());
715        for topic in &topics {
716            let message = BitmexSubscription {
717                op: BitmexWsOperation::Subscribe,
718                args: vec![Ustr::from(topic.as_ref())],
719            };
720            let payload = serde_json::to_string(&message).map_err(|e| {
721                BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
722            })?;
723            payloads.push(payload);
724        }
725
726        // Send Subscribe command to handler
727        let cmd = HandlerCommand::Subscribe { topics: payloads };
728
729        self.send_cmd(cmd).await.map_err(|e| {
730            BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
731        })
732    }
733
734    /// Unsubscribe from the specified topics.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
739    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
740        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
741
742        if self.signal.load(Ordering::Relaxed) {
743            log::debug!("Shutdown signal detected, skipping unsubscribe");
744            return Ok(());
745        }
746
747        for topic in &topics {
748            self.subscriptions.mark_unsubscribe(topic.as_str());
749            self.tracked_subscriptions.remove(topic);
750        }
751
752        // Serialize unsubscription messages
753        let mut payloads = Vec::with_capacity(topics.len());
754        for topic in &topics {
755            let message = BitmexSubscription {
756                op: BitmexWsOperation::Unsubscribe,
757                args: vec![Ustr::from(topic.as_ref())],
758            };
759
760            if let Ok(payload) = serde_json::to_string(&message) {
761                payloads.push(payload);
762            }
763        }
764
765        // Send Unsubscribe command to handler
766        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
767
768        if let Err(e) = self.send_cmd(cmd).await {
769            log::debug!("Failed to send unsubscribe command: {e}");
770        }
771
772        Ok(())
773    }
774
775    /// Get the current number of active subscriptions.
776    #[must_use]
777    pub fn subscription_count(&self) -> usize {
778        self.subscriptions.len()
779    }
780
781    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
782        let symbol = instrument_id.symbol.inner();
783        let confirmed = self.subscriptions.confirmed();
784        let mut channels = Vec::with_capacity(confirmed.len());
785
786        for entry in confirmed.iter() {
787            let (channel, symbols) = entry.pair();
788            if symbols.contains(&symbol) {
789                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
790                channels.push(format!("{channel}:{symbol}"));
791            } else {
792                let has_channel_marker = symbols.iter().any(|s| s.is_empty());
793                if has_channel_marker
794                    && (*channel == BitmexWsAuthChannel::Execution.as_ref()
795                        || *channel == BitmexWsAuthChannel::Order.as_ref())
796                {
797                    // These are account-level subscriptions without symbols
798                    channels.push(channel.to_string());
799                }
800            }
801        }
802
803        channels
804    }
805
806    /// Subscribe to instrument updates for all instruments on the venue.
807    ///
808    /// # Errors
809    ///
810    /// Returns an error if the WebSocket is not connected or if the subscription fails.
811    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
812        // Already subscribed automatically on connection
813        log::debug!("Already subscribed to all instruments on connection, skipping");
814        Ok(())
815    }
816
817    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the WebSocket is not connected or if the subscription fails.
822    pub async fn subscribe_instrument(
823        &self,
824        instrument_id: InstrumentId,
825    ) -> Result<(), BitmexWsError> {
826        // Already subscribed to all instruments on connection
827        log::debug!(
828            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
829        );
830        Ok(())
831    }
832
833    /// Subscribe to order book updates for the specified instrument.
834    ///
835    /// # Errors
836    ///
837    /// Returns an error if the WebSocket is not connected or if the subscription fails.
838    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
839        let topic = BitmexWsTopic::OrderBookL2;
840        let symbol = instrument_id.symbol.inner();
841        self.subscribe(vec![format!("{topic}:{symbol}")]).await
842    }
843
844    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
845    ///
846    /// # Errors
847    ///
848    /// Returns an error if the WebSocket is not connected or if the subscription fails.
849    pub async fn subscribe_book_25(
850        &self,
851        instrument_id: InstrumentId,
852    ) -> Result<(), BitmexWsError> {
853        let topic = BitmexWsTopic::OrderBookL2_25;
854        let symbol = instrument_id.symbol.inner();
855        self.subscribe(vec![format!("{topic}:{symbol}")]).await
856    }
857
858    /// Subscribe to order book depth 10 updates for the specified instrument.
859    ///
860    /// # Errors
861    ///
862    /// Returns an error if the WebSocket is not connected or if the subscription fails.
863    pub async fn subscribe_book_depth10(
864        &self,
865        instrument_id: InstrumentId,
866    ) -> Result<(), BitmexWsError> {
867        let topic = BitmexWsTopic::OrderBook10;
868        let symbol = instrument_id.symbol.inner();
869        self.subscribe(vec![format!("{topic}:{symbol}")]).await
870    }
871
872    /// Subscribe to quote updates for the specified instrument.
873    ///
874    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
875    ///
876    /// # Errors
877    ///
878    /// Returns an error if the WebSocket is not connected or if the subscription fails.
879    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
880        let symbol = instrument_id.symbol.inner();
881
882        // Index symbols don't have quotes (bid/ask), only a single price
883        if is_index_symbol(&instrument_id.symbol.inner()) {
884            log::warn!("Ignoring quote subscription for index symbol: {symbol}");
885            return Ok(());
886        }
887
888        let topic = BitmexWsTopic::Quote;
889        self.subscribe(vec![format!("{topic}:{symbol}")]).await
890    }
891
892    /// Subscribe to trade updates for the specified instrument.
893    ///
894    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
895    ///
896    /// # Errors
897    ///
898    /// Returns an error if the WebSocket is not connected or if the subscription fails.
899    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
900        let symbol = instrument_id.symbol.inner();
901
902        // Index symbols don't have trades
903        if is_index_symbol(&symbol) {
904            log::warn!("Ignoring trade subscription for index symbol: {symbol}");
905            return Ok(());
906        }
907
908        let topic = BitmexWsTopic::Trade;
909        self.subscribe(vec![format!("{topic}:{symbol}")]).await
910    }
911
912    /// Subscribe to mark price updates for the specified instrument.
913    ///
914    /// # Errors
915    ///
916    /// Returns an error if the WebSocket is not connected or if the subscription fails.
917    pub async fn subscribe_mark_prices(
918        &self,
919        instrument_id: InstrumentId,
920    ) -> Result<(), BitmexWsError> {
921        self.subscribe_instrument(instrument_id).await
922    }
923
924    /// Subscribe to index price updates for the specified instrument.
925    ///
926    /// # Errors
927    ///
928    /// Returns an error if the WebSocket is not connected or if the subscription fails.
929    pub async fn subscribe_index_prices(
930        &self,
931        instrument_id: InstrumentId,
932    ) -> Result<(), BitmexWsError> {
933        self.subscribe_instrument(instrument_id).await
934    }
935
936    /// Subscribe to funding rate updates for the specified instrument.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if the WebSocket is not connected or if the subscription fails.
941    pub async fn subscribe_funding_rates(
942        &self,
943        instrument_id: InstrumentId,
944    ) -> Result<(), BitmexWsError> {
945        let topic = BitmexWsTopic::Funding;
946        let symbol = instrument_id.symbol.inner();
947        self.subscribe(vec![format!("{topic}:{symbol}")]).await
948    }
949
950    /// Subscribe to bar updates for the specified bar type.
951    ///
952    /// # Errors
953    ///
954    /// Returns an error if the WebSocket is not connected or if the subscription fails.
955    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
956        let topic = topic_from_bar_spec(bar_type.spec());
957        let symbol = bar_type.instrument_id().symbol.inner();
958        self.subscribe(vec![format!("{topic}:{symbol}")]).await
959    }
960
961    /// Unsubscribe from instrument updates for all instruments on the venue.
962    ///
963    /// # Errors
964    ///
965    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
966    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
967        // No-op: instruments are required for proper operation
968        log::debug!(
969            "Instruments subscription maintained for proper operation, skipping unsubscribe"
970        );
971        Ok(())
972    }
973
974    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
979    pub async fn unsubscribe_instrument(
980        &self,
981        instrument_id: InstrumentId,
982    ) -> Result<(), BitmexWsError> {
983        // No-op: instruments are required for proper operation
984        log::debug!(
985            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
986        );
987        Ok(())
988    }
989
990    /// Unsubscribe from order book updates for the specified instrument.
991    ///
992    /// # Errors
993    ///
994    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
995    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
996        let topic = BitmexWsTopic::OrderBookL2;
997        let symbol = instrument_id.symbol.inner();
998        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
999    }
1000
1001    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1006    pub async fn unsubscribe_book_25(
1007        &self,
1008        instrument_id: InstrumentId,
1009    ) -> Result<(), BitmexWsError> {
1010        let topic = BitmexWsTopic::OrderBookL2_25;
1011        let symbol = instrument_id.symbol.inner();
1012        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1013    }
1014
1015    /// Unsubscribe from order book depth 10 updates for the specified instrument.
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1020    pub async fn unsubscribe_book_depth10(
1021        &self,
1022        instrument_id: InstrumentId,
1023    ) -> Result<(), BitmexWsError> {
1024        let topic = BitmexWsTopic::OrderBook10;
1025        let symbol = instrument_id.symbol.inner();
1026        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1027    }
1028
1029    /// Unsubscribe from quote updates for the specified instrument.
1030    ///
1031    /// # Errors
1032    ///
1033    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1034    pub async fn unsubscribe_quotes(
1035        &self,
1036        instrument_id: InstrumentId,
1037    ) -> Result<(), BitmexWsError> {
1038        let symbol = instrument_id.symbol.inner();
1039
1040        // Index symbols don't have quotes
1041        if is_index_symbol(&symbol) {
1042            return Ok(());
1043        }
1044
1045        let topic = BitmexWsTopic::Quote;
1046        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1047    }
1048
1049    /// Unsubscribe from trade updates for the specified instrument.
1050    ///
1051    /// # Errors
1052    ///
1053    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1054    pub async fn unsubscribe_trades(
1055        &self,
1056        instrument_id: InstrumentId,
1057    ) -> Result<(), BitmexWsError> {
1058        let symbol = instrument_id.symbol.inner();
1059
1060        // Index symbols don't have trades
1061        if is_index_symbol(&symbol) {
1062            return Ok(());
1063        }
1064
1065        let topic = BitmexWsTopic::Trade;
1066        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1067    }
1068
1069    /// Unsubscribe from mark price updates for the specified instrument.
1070    ///
1071    /// # Errors
1072    ///
1073    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1074    pub async fn unsubscribe_mark_prices(
1075        &self,
1076        instrument_id: InstrumentId,
1077    ) -> Result<(), BitmexWsError> {
1078        // No-op: instrument channel shared with index prices
1079        log::debug!(
1080            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1081        );
1082        Ok(())
1083    }
1084
1085    /// Unsubscribe from index price updates for the specified instrument.
1086    ///
1087    /// # Errors
1088    ///
1089    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1090    pub async fn unsubscribe_index_prices(
1091        &self,
1092        instrument_id: InstrumentId,
1093    ) -> Result<(), BitmexWsError> {
1094        // No-op: instrument channel shared with mark prices
1095        log::debug!(
1096            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1097        );
1098        Ok(())
1099    }
1100
1101    /// Unsubscribe from funding rate updates for the specified instrument.
1102    ///
1103    /// # Errors
1104    ///
1105    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1106    pub async fn unsubscribe_funding_rates(
1107        &self,
1108        instrument_id: InstrumentId,
1109    ) -> Result<(), BitmexWsError> {
1110        // No-op: unsubscribing during shutdown causes race conditions
1111        log::debug!(
1112            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1113        );
1114        Ok(())
1115    }
1116
1117    /// Unsubscribe from bar updates for the specified bar type.
1118    ///
1119    /// # Errors
1120    ///
1121    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1122    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1123        let topic = topic_from_bar_spec(bar_type.spec());
1124        let symbol = bar_type.instrument_id().symbol.inner();
1125        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1126    }
1127
1128    /// Subscribe to order updates for the authenticated account.
1129    ///
1130    /// # Errors
1131    ///
1132    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1133    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1134        if self.credential.is_none() {
1135            return Err(BitmexWsError::MissingCredentials);
1136        }
1137        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1138            .await
1139    }
1140
1141    /// Subscribe to execution updates for the authenticated account.
1142    ///
1143    /// # Errors
1144    ///
1145    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1146    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1147        if self.credential.is_none() {
1148            return Err(BitmexWsError::MissingCredentials);
1149        }
1150        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1151            .await
1152    }
1153
1154    /// Subscribe to position updates for the authenticated account.
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1159    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1160        if self.credential.is_none() {
1161            return Err(BitmexWsError::MissingCredentials);
1162        }
1163        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1164            .await
1165    }
1166
1167    /// Subscribe to margin updates for the authenticated account.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1172    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1173        if self.credential.is_none() {
1174            return Err(BitmexWsError::MissingCredentials);
1175        }
1176        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1177            .await
1178    }
1179
1180    /// Subscribe to wallet updates for the authenticated account.
1181    ///
1182    /// # Errors
1183    ///
1184    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1185    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1186        if self.credential.is_none() {
1187            return Err(BitmexWsError::MissingCredentials);
1188        }
1189        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1190            .await
1191    }
1192
1193    /// Unsubscribe from order updates for the authenticated account.
1194    ///
1195    /// # Errors
1196    ///
1197    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1198    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1199        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1200            .await
1201    }
1202
1203    /// Unsubscribe from execution updates for the authenticated account.
1204    ///
1205    /// # Errors
1206    ///
1207    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1208    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1209        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1210            .await
1211    }
1212
1213    /// Unsubscribe from position updates for the authenticated account.
1214    ///
1215    /// # Errors
1216    ///
1217    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1218    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1219        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1220            .await
1221    }
1222
1223    /// Unsubscribe from margin updates for the authenticated account.
1224    ///
1225    /// # Errors
1226    ///
1227    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1228    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1229        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1230            .await
1231    }
1232
1233    /// Unsubscribe from wallet updates for the authenticated account.
1234    ///
1235    /// # Errors
1236    ///
1237    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1238    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1239        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1240            .await
1241    }
1242
1243    /// Sends a command to the handler.
1244    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1245        self.cmd_tx
1246            .read()
1247            .await
1248            .send(cmd)
1249            .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1250    }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255    use ahash::AHashSet;
1256    use rstest::rstest;
1257    use ustr::Ustr;
1258
1259    use super::*;
1260
1261    #[rstest]
1262    fn test_reconnect_topics_restoration_logic() {
1263        // Create real client with credentials
1264        let client = BitmexWebSocketClient::new(
1265            Some("ws://test.com".to_string()),
1266            Some("test_key".to_string()),
1267            Some("test_secret".to_string()),
1268            Some(AccountId::new("BITMEX-TEST")),
1269            None,
1270        )
1271        .unwrap();
1272
1273        // Populate subscriptions like they would be during normal operation
1274        let subs = client.subscriptions.confirmed();
1275        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1276            let mut set = AHashSet::new();
1277            set.insert(Ustr::from("XBTUSD"));
1278            set.insert(Ustr::from("ETHUSD"));
1279            set
1280        });
1281
1282        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1283            let mut set = AHashSet::new();
1284            set.insert(Ustr::from("XBTUSD"));
1285            set
1286        });
1287
1288        // Private channels (no symbols)
1289        subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1290            let mut set = AHashSet::new();
1291            set.insert(Ustr::from(""));
1292            set
1293        });
1294        subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1295            let mut set = AHashSet::new();
1296            set.insert(Ustr::from(""));
1297            set
1298        });
1299
1300        // Test the actual reconnection topic building logic
1301        let mut topics_to_restore = Vec::new();
1302        for entry in subs.iter() {
1303            let (channel, symbols) = entry.pair();
1304            for symbol in symbols {
1305                if symbol.is_empty() {
1306                    topics_to_restore.push(channel.to_string());
1307                } else {
1308                    topics_to_restore.push(format!("{channel}:{symbol}"));
1309                }
1310            }
1311        }
1312
1313        // Verify it builds the correct restoration topics
1314        assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1315        assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1316        assert!(
1317            topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1318        );
1319        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1320        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1321        assert_eq!(topics_to_restore.len(), 5);
1322    }
1323
1324    #[rstest]
1325    fn test_reconnect_auth_message_building() {
1326        // Test with credentials
1327        let client_with_creds = BitmexWebSocketClient::new(
1328            Some("ws://test.com".to_string()),
1329            Some("test_key".to_string()),
1330            Some("test_secret".to_string()),
1331            Some(AccountId::new("BITMEX-TEST")),
1332            None,
1333        )
1334        .unwrap();
1335
1336        // Test the actual auth message building logic from lines 220-228
1337        if let Some(cred) = &client_with_creds.credential {
1338            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1339            let signature = cred.sign("GET", "/realtime", expires, "");
1340
1341            let auth_message = BitmexAuthentication {
1342                op: BitmexWsAuthAction::AuthKeyExpires,
1343                args: (cred.api_key().to_string(), expires, signature),
1344            };
1345
1346            // Verify auth message structure
1347            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1348            assert_eq!(auth_message.args.0, "test_key");
1349            assert!(auth_message.args.1 > 0); // expires should be positive
1350            assert!(!auth_message.args.2.is_empty()); // signature should exist
1351        } else {
1352            panic!("Client should have credentials");
1353        }
1354
1355        // Test without credentials
1356        let client_no_creds = BitmexWebSocketClient::new(
1357            Some("ws://test.com".to_string()),
1358            None,
1359            None,
1360            Some(AccountId::new("BITMEX-TEST")),
1361            None,
1362        )
1363        .unwrap();
1364
1365        assert!(client_no_creds.credential.is_none());
1366    }
1367
1368    #[rstest]
1369    fn test_subscription_state_after_unsubscribe() {
1370        let client = BitmexWebSocketClient::new(
1371            Some("ws://test.com".to_string()),
1372            Some("test_key".to_string()),
1373            Some("test_secret".to_string()),
1374            Some(AccountId::new("BITMEX-TEST")),
1375            None,
1376        )
1377        .unwrap();
1378
1379        // Set up initial subscriptions
1380        let subs = client.subscriptions.confirmed();
1381        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1382            let mut set = AHashSet::new();
1383            set.insert(Ustr::from("XBTUSD"));
1384            set.insert(Ustr::from("ETHUSD"));
1385            set
1386        });
1387
1388        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1389            let mut set = AHashSet::new();
1390            set.insert(Ustr::from("XBTUSD"));
1391            set
1392        });
1393
1394        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
1395        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1396        if let Some((channel, symbol)) = topic.split_once(':')
1397            && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1398        {
1399            entry.remove(&Ustr::from(symbol));
1400            if entry.is_empty() {
1401                drop(entry);
1402                subs.remove(&Ustr::from(channel));
1403            }
1404        }
1405
1406        // Build restoration topics after unsubscribe
1407        let mut topics_to_restore = Vec::new();
1408        for entry in subs.iter() {
1409            let (channel, symbols) = entry.pair();
1410            for symbol in symbols {
1411                if symbol.is_empty() {
1412                    topics_to_restore.push(channel.to_string());
1413                } else {
1414                    topics_to_restore.push(format!("{channel}:{symbol}"));
1415                }
1416            }
1417        }
1418
1419        // Should have XBTUSD trade but not ETHUSD trade
1420        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1421        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1422        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1423
1424        assert!(topics_to_restore.contains(&trade_xbt));
1425        assert!(!topics_to_restore.contains(&trade_eth));
1426        assert!(topics_to_restore.contains(&book_xbt));
1427        assert_eq!(topics_to_restore.len(), 2);
1428    }
1429
1430    #[rstest]
1431    fn test_race_unsubscribe_failure_recovery() {
1432        // Simulates the race condition where venue rejects an unsubscribe request.
1433        // The adapter must perform the 3-step recovery:
1434        // 1. confirm_unsubscribe() - clear pending_unsubscribe
1435        // 2. mark_subscribe() - mark as subscribing again
1436        // 3. confirm_subscribe() - restore to confirmed state
1437        let client = BitmexWebSocketClient::new(
1438            Some("ws://test.com".to_string()),
1439            None,
1440            None,
1441            Some(AccountId::new("BITMEX-TEST")),
1442            None,
1443        )
1444        .unwrap();
1445
1446        let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1447
1448        // Initial subscribe flow
1449        client.subscriptions.mark_subscribe(&topic);
1450        client.subscriptions.confirm_subscribe(&topic);
1451        assert_eq!(client.subscriptions.len(), 1);
1452
1453        // User unsubscribes
1454        client.subscriptions.mark_unsubscribe(&topic);
1455        assert_eq!(client.subscriptions.len(), 0);
1456        assert_eq!(
1457            client.subscriptions.pending_unsubscribe_topics(),
1458            vec![topic.clone()]
1459        );
1460
1461        // Venue REJECTS the unsubscribe (error message)
1462        // Adapter must perform 3-step recovery (from lines 1884-1891)
1463        client.subscriptions.confirm_unsubscribe(&topic); // Step 1: clear pending_unsubscribe
1464        client.subscriptions.mark_subscribe(&topic); // Step 2: mark as subscribing
1465        client.subscriptions.confirm_subscribe(&topic); // Step 3: confirm subscription
1466
1467        // Verify recovery: topic should be back in confirmed state
1468        assert_eq!(client.subscriptions.len(), 1);
1469        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1470        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1471
1472        // Verify topic is in all_topics() for reconnect
1473        let all = client.subscriptions.all_topics();
1474        assert_eq!(all.len(), 1);
1475        assert!(all.contains(&topic));
1476    }
1477
1478    #[rstest]
1479    fn test_race_resubscribe_before_unsubscribe_ack() {
1480        // Simulates: User unsubscribes, then immediately resubscribes before
1481        // the unsubscribe ACK arrives from the venue.
1482        // This is the race condition fixed in the subscription tracker.
1483        let client = BitmexWebSocketClient::new(
1484            Some("ws://test.com".to_string()),
1485            None,
1486            None,
1487            Some(AccountId::new("BITMEX-TEST")),
1488            None,
1489        )
1490        .unwrap();
1491
1492        let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1493
1494        // Initial subscribe
1495        client.subscriptions.mark_subscribe(&topic);
1496        client.subscriptions.confirm_subscribe(&topic);
1497        assert_eq!(client.subscriptions.len(), 1);
1498
1499        // User unsubscribes
1500        client.subscriptions.mark_unsubscribe(&topic);
1501        assert_eq!(client.subscriptions.len(), 0);
1502        assert_eq!(
1503            client.subscriptions.pending_unsubscribe_topics(),
1504            vec![topic.clone()]
1505        );
1506
1507        // User immediately changes mind and resubscribes (before unsubscribe ACK)
1508        client.subscriptions.mark_subscribe(&topic);
1509        assert_eq!(
1510            client.subscriptions.pending_subscribe_topics(),
1511            vec![topic.clone()]
1512        );
1513
1514        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
1515        client.subscriptions.confirm_unsubscribe(&topic);
1516        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1517        assert_eq!(
1518            client.subscriptions.pending_subscribe_topics(),
1519            vec![topic.clone()]
1520        );
1521
1522        // Subscribe ACK arrives
1523        client.subscriptions.confirm_subscribe(&topic);
1524        assert_eq!(client.subscriptions.len(), 1);
1525        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1526
1527        // Verify final state is correct
1528        let all = client.subscriptions.all_topics();
1529        assert_eq!(all.len(), 1);
1530        assert!(all.contains(&topic));
1531    }
1532
1533    #[rstest]
1534    fn test_race_channel_level_reconnection_with_pending_states() {
1535        // Simulates reconnection with mixed pending states including channel-level subscriptions.
1536        let client = BitmexWebSocketClient::new(
1537            Some("ws://test.com".to_string()),
1538            Some("test_key".to_string()),
1539            Some("test_secret".to_string()),
1540            Some(AccountId::new("BITMEX-TEST")),
1541            None,
1542        )
1543        .unwrap();
1544
1545        // Set up mixed state before reconnection
1546        // Confirmed: trade:XBTUSD
1547        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1548        client.subscriptions.mark_subscribe(&trade_xbt);
1549        client.subscriptions.confirm_subscribe(&trade_xbt);
1550
1551        // Confirmed: order (channel-level, no symbol)
1552        let order_channel = BitmexWsAuthChannel::Order.as_ref();
1553        client.subscriptions.mark_subscribe(order_channel);
1554        client.subscriptions.confirm_subscribe(order_channel);
1555
1556        // Pending subscribe: trade:ETHUSD
1557        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1558        client.subscriptions.mark_subscribe(&trade_eth);
1559
1560        // Pending unsubscribe: orderBookL2:XBTUSD (user cancelled)
1561        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1562        client.subscriptions.mark_subscribe(&book_xbt);
1563        client.subscriptions.confirm_subscribe(&book_xbt);
1564        client.subscriptions.mark_unsubscribe(&book_xbt);
1565
1566        // Get topics for reconnection
1567        let topics_to_restore = client.subscriptions.all_topics();
1568
1569        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
1570        assert_eq!(topics_to_restore.len(), 3);
1571        assert!(topics_to_restore.contains(&trade_xbt));
1572        assert!(topics_to_restore.contains(&order_channel.to_string()));
1573        assert!(topics_to_restore.contains(&trade_eth));
1574        assert!(!topics_to_restore.contains(&book_xbt)); // Excluded
1575
1576        // Verify channel-level marker is handled correctly
1577        // order channel should not have ':' delimiter
1578        for topic in &topics_to_restore {
1579            if topic == order_channel {
1580                assert!(
1581                    !topic.contains(':'),
1582                    "Channel-level topic should not have delimiter"
1583                );
1584            }
1585        }
1586    }
1587}