Skip to main content

ccxt_exchanges/binance/ws/
mod.rs

1//! Binance WebSocket implementation
2//!
3//! Provides WebSocket real-time data stream subscriptions for the Binance exchange.
4//!
5//! # Architecture
6//!
7//! The WebSocket implementation is organized into several modules:
8//!
9//! - [`types`] - Configuration structures and enums (`BinanceWsConfig`, `DepthLevel`, etc.)
10//! - [`parsers`] - Stream message parsers implementing [`StreamParser`] trait
11//! - [`connection_manager`] - Connection sharding and management
12//! - `handlers` - Message routing and dispatch
13//! - `subscriptions` - Subscription tracking and management
14//!
15//! # Usage
16//!
17//! ```rust,ignore
18//! use ccxt_exchanges::binance::ws::{BinanceWs, BinanceWsConfig, DepthLevel, UpdateSpeed};
19//!
20//! // Create a WebSocket client
21//! let ws = BinanceWs::new("wss://stream.binance.com:9443/ws".to_string());
22//!
23//! // Subscribe to ticker updates
24//! let ticker = ws.watch_ticker("BTC/USDT", MarketType::Spot).await?;
25//!
26//! // Check connection health
27//! let health = ws.health();
28//! println!("Latency: {:?}ms", health.latency_ms);
29//! ```
30//!
31//! # Health Monitoring
32//!
33//! Use the [`BinanceWs::health`] method to get connection metrics:
34//! - Latency (from ping/pong)
35//! - Message counts (received/dropped)
36//! - Connection uptime
37//! - Reconnection count
38//!
39//! # Error Handling
40//!
41//! Errors are classified into recovery strategies via [`WsErrorRecovery`]:
42//! - `Retry` - Transient errors, retry with backoff
43//! - `Resync` - State out of sync, need to refresh (e.g., orderbook)
44//! - `Reconnect` - Connection lost, need to reconnect
45//! - `Fatal` - Unrecoverable, requires user intervention
46
47/// Connection manager module
48pub mod connection_manager;
49mod handlers;
50mod listen_key;
51/// Stream parsers for WebSocket messages
52pub mod parsers;
53mod streams;
54mod subscriptions;
55/// Types and configuration for WebSocket
56pub mod types;
57pub(crate) mod user_data;
58
59// Re-export public types for backward compatibility
60pub use connection_manager::BinanceConnectionManager;
61pub use handlers::MessageRouter;
62pub use listen_key::ListenKeyManager;
63pub use parsers::{
64    BidAskParser, MarkPriceParser, OhlcvParser, StreamParser, TickerParser, TradeParser,
65};
66pub use streams::normalize_symbol;
67pub use subscriptions::{
68    ReconnectConfig, Subscription, SubscriptionHandle, SubscriptionManager, SubscriptionType,
69};
70pub use types::{
71    BinanceWsConfig, DepthLevel, UpdateSpeed, WsChannelConfig, WsErrorRecovery, WsHealthSnapshot,
72};
73
74use crate::binance::{Binance, parser};
75use ccxt_core::error::{Error, Result};
76use ccxt_core::types::{
77    Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
78};
79use serde_json::Value;
80use std::collections::{HashMap, VecDeque};
81use std::sync::Arc;
82use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
83use std::time::Duration;
84use tokio::sync::{Mutex, RwLock};
85
/// Limit of 1,000 entries. Presumably the per-symbol cap for cached trades;
/// the code that uses it is not in this part of the file — confirm at the use site.
const MAX_TRADES: usize = 1_000;
/// Limit of 1,000 entries. Presumably the per-symbol cap for cached OHLCV candles;
/// the code that uses it is not in this part of the file — confirm at the use site.
const MAX_OHLCVS: usize = 1_000;
/// How long `shutdown()` waits for the message router to stop, in milliseconds.
const DEFAULT_SHUTDOWN_TIMEOUT_MS: u64 = 5_000;
90
91/// Binance WebSocket client wrapper
92pub struct BinanceWs {
93    pub(crate) message_router: Arc<MessageRouter>,
94    pub(crate) subscription_manager: Arc<SubscriptionManager>,
95    listen_key: Arc<RwLock<Option<String>>>,
96    listen_key_manager: Option<Arc<ListenKeyManager>>,
97    pub(crate) tickers: Arc<Mutex<HashMap<String, Ticker>>>,
98    pub(crate) bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
99    #[allow(dead_code)]
100    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
101    pub(crate) orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
102    pub(crate) trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
103    pub(crate) ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
104    pub(crate) balances: Arc<RwLock<HashMap<String, Balance>>>,
105    pub(crate) orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
106    pub(crate) my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
107    pub(crate) positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
108    /// Channel capacity configuration
109    channel_config: WsChannelConfig,
110    /// Statistics for health monitoring
111    messages_received: Arc<AtomicU64>,
112    messages_dropped: Arc<AtomicU64>,
113    last_message_time: Arc<AtomicU64>,
114    connection_start_time: Arc<AtomicU64>,
115    /// Shutdown state tracking
116    is_shutting_down: Arc<AtomicBool>,
117    shutdown_complete: Arc<AtomicBool>,
118}
119
120impl std::fmt::Debug for BinanceWs {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("BinanceWs")
123            .field("is_connected", &self.message_router.is_connected())
124            .finish_non_exhaustive()
125    }
126}
127
128impl Drop for BinanceWs {
129    fn drop(&mut self) {
130        // Warn if shutdown() was not called before dropping
131        if !self.shutdown_complete.load(Ordering::Acquire)
132            && !self.is_shutting_down.load(Ordering::Acquire)
133        {
134            tracing::warn!(
135                "BinanceWs dropped without calling shutdown(). \
136                 This may leave resources uncleaned. \
137                 Consider calling shutdown() before dropping."
138            );
139        }
140    }
141}
142
143impl BinanceWs {
144    /// Creates a new Binance WebSocket client
145    pub fn new(url: String) -> Self {
146        let subscription_manager = Arc::new(SubscriptionManager::new());
147        let message_router = Arc::new(MessageRouter::new(url, subscription_manager.clone(), None));
148
149        Self {
150            message_router,
151            subscription_manager,
152            listen_key: Arc::new(RwLock::new(None)),
153            listen_key_manager: None,
154            tickers: Arc::new(Mutex::new(HashMap::new())),
155            bids_asks: Arc::new(Mutex::new(HashMap::new())),
156            mark_prices: Arc::new(Mutex::new(HashMap::new())),
157            orderbooks: Arc::new(Mutex::new(HashMap::new())),
158            trades: Arc::new(Mutex::new(HashMap::new())),
159            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
160            balances: Arc::new(RwLock::new(HashMap::new())),
161            orders: Arc::new(RwLock::new(HashMap::new())),
162            my_trades: Arc::new(RwLock::new(HashMap::new())),
163            positions: Arc::new(RwLock::new(HashMap::new())),
164            channel_config: WsChannelConfig::default(),
165            messages_received: Arc::new(AtomicU64::new(0)),
166            messages_dropped: Arc::new(AtomicU64::new(0)),
167            last_message_time: Arc::new(AtomicU64::new(0)),
168            connection_start_time: Arc::new(AtomicU64::new(
169                chrono::Utc::now().timestamp_millis() as u64
170            )),
171            is_shutting_down: Arc::new(AtomicBool::new(false)),
172            shutdown_complete: Arc::new(AtomicBool::new(false)),
173        }
174    }
175
176    /// Creates a new Binance WebSocket client with custom configuration.
177    ///
178    /// # Arguments
179    /// * `config` - Configuration options including URL, channel capacities, and backpressure strategy
180    ///
181    /// # Example
182    /// ```rust,ignore
183    /// use ccxt_exchanges::binance::ws::{BinanceWs, BinanceWsConfig, WsChannelConfig};
184    /// use ccxt_core::ws_client::BackpressureStrategy;
185    ///
186    /// let config = BinanceWsConfig::new("wss://stream.binance.com:9443/ws".to_string())
187    ///     .with_backpressure(BackpressureStrategy::DropOldest);
188    /// let ws = BinanceWs::new_with_config(config);
189    /// ```
190    pub fn new_with_config(config: BinanceWsConfig) -> Self {
191        let subscription_manager = Arc::new(SubscriptionManager::new());
192        let message_router = Arc::new(MessageRouter::new(
193            config.url,
194            subscription_manager.clone(),
195            None,
196        ));
197
198        Self {
199            message_router,
200            subscription_manager,
201            listen_key: Arc::new(RwLock::new(None)),
202            listen_key_manager: None,
203            tickers: Arc::new(Mutex::new(HashMap::new())),
204            bids_asks: Arc::new(Mutex::new(HashMap::new())),
205            mark_prices: Arc::new(Mutex::new(HashMap::new())),
206            orderbooks: Arc::new(Mutex::new(HashMap::new())),
207            trades: Arc::new(Mutex::new(HashMap::new())),
208            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
209            balances: Arc::new(RwLock::new(HashMap::new())),
210            orders: Arc::new(RwLock::new(HashMap::new())),
211            my_trades: Arc::new(RwLock::new(HashMap::new())),
212            positions: Arc::new(RwLock::new(HashMap::new())),
213            channel_config: config.channel_config,
214            messages_received: Arc::new(AtomicU64::new(0)),
215            messages_dropped: Arc::new(AtomicU64::new(0)),
216            last_message_time: Arc::new(AtomicU64::new(0)),
217            connection_start_time: Arc::new(AtomicU64::new(
218                chrono::Utc::now().timestamp_millis() as u64
219            )),
220            is_shutting_down: Arc::new(AtomicBool::new(false)),
221            shutdown_complete: Arc::new(AtomicBool::new(false)),
222        }
223    }
224
225    /// Creates a WebSocket client with a listen key manager for a specific market type
226    pub fn new_with_auth(url: String, binance: Arc<Binance>, market_type: MarketType) -> Self {
227        let subscription_manager = Arc::new(SubscriptionManager::new());
228        let listen_key_manager = Arc::new(ListenKeyManager::new_for_market(binance, market_type));
229        let message_router = Arc::new(MessageRouter::new(
230            url,
231            subscription_manager.clone(),
232            Some(listen_key_manager.clone()),
233        ));
234
235        Self {
236            message_router,
237            subscription_manager,
238            listen_key: Arc::new(RwLock::new(None)),
239            listen_key_manager: Some(listen_key_manager),
240            tickers: Arc::new(Mutex::new(HashMap::new())),
241            bids_asks: Arc::new(Mutex::new(HashMap::new())),
242            mark_prices: Arc::new(Mutex::new(HashMap::new())),
243            orderbooks: Arc::new(Mutex::new(HashMap::new())),
244            trades: Arc::new(Mutex::new(HashMap::new())),
245            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
246            balances: Arc::new(RwLock::new(HashMap::new())),
247            orders: Arc::new(RwLock::new(HashMap::new())),
248            my_trades: Arc::new(RwLock::new(HashMap::new())),
249            positions: Arc::new(RwLock::new(HashMap::new())),
250            channel_config: WsChannelConfig::default(),
251            messages_received: Arc::new(AtomicU64::new(0)),
252            messages_dropped: Arc::new(AtomicU64::new(0)),
253            last_message_time: Arc::new(AtomicU64::new(0)),
254            connection_start_time: Arc::new(AtomicU64::new(
255                chrono::Utc::now().timestamp_millis() as u64
256            )),
257            is_shutting_down: Arc::new(AtomicBool::new(false)),
258            shutdown_complete: Arc::new(AtomicBool::new(false)),
259        }
260    }
261
262    /// Returns the channel capacity for a given subscription type
263    fn channel_capacity_for(&self, sub_type: &SubscriptionType) -> usize {
264        match sub_type {
265            SubscriptionType::Ticker | SubscriptionType::BookTicker => {
266                self.channel_config.ticker_capacity
267            }
268            SubscriptionType::OrderBook => self.channel_config.orderbook_capacity,
269            SubscriptionType::Trades | SubscriptionType::Kline(_) | SubscriptionType::MarkPrice => {
270                self.channel_config.trades_capacity
271            }
272            SubscriptionType::Balance
273            | SubscriptionType::Orders
274            | SubscriptionType::MyTrades
275            | SubscriptionType::Positions => self.channel_config.user_data_capacity,
276        }
277    }
278
279    /// Connects to the WebSocket server
280    pub async fn connect(&self) -> Result<()> {
281        if self.is_connected() {
282            return Ok(());
283        }
284
285        self.message_router.start(None).await?;
286
287        // No auto-reconnect coordinator needed as MessageRouter handles it internally.
288        // The router's loop manages connection state and retries.
289
290        Ok(())
291    }
292
293    /// Disconnects from the WebSocket server
294    pub async fn disconnect(&self) -> Result<()> {
295        self.message_router.stop().await?;
296
297        if let Some(manager) = &self.listen_key_manager {
298            manager.stop_auto_refresh().await;
299        }
300
301        Ok(())
302    }
303
304    /// Gracefully shuts down the WebSocket client.
305    ///
306    /// This method performs a complete shutdown sequence:
307    /// 1. Sets the shutting down flag to reject new subscriptions
308    /// 2. Stops the message router
309    /// 3. Stops listen key auto-refresh
310    /// 4. Clears all subscriptions
311    /// 5. Clears cached data
312    ///
313    /// After calling this method, the client cannot be reused.
314    /// This method is idempotent - calling it multiple times is safe.
315    pub async fn shutdown(&self) -> Result<()> {
316        // Check if already shutting down or complete
317        if self.shutdown_complete.load(Ordering::Acquire) {
318            return Ok(());
319        }
320
321        // Set shutting down flag to reject new subscriptions
322        if self.is_shutting_down.swap(true, Ordering::AcqRel) {
323            // Another shutdown is in progress, wait for it
324            while !self.shutdown_complete.load(Ordering::Acquire) {
325                tokio::time::sleep(Duration::from_millis(10)).await;
326            }
327            return Ok(());
328        }
329
330        tracing::info!("Initiating graceful shutdown of BinanceWs");
331
332        // Stop the message router (sends close frame)
333        let shutdown_result = tokio::time::timeout(
334            Duration::from_millis(DEFAULT_SHUTDOWN_TIMEOUT_MS),
335            self.message_router.stop(),
336        )
337        .await;
338
339        if shutdown_result.is_err() {
340            tracing::warn!("Shutdown timeout exceeded, forcing close");
341        }
342
343        // Stop listen key auto-refresh
344        if let Some(manager) = &self.listen_key_manager {
345            manager.stop_auto_refresh().await;
346        }
347
348        // Clear all subscriptions
349        self.subscription_manager.clear().await;
350
351        // Clear cached data
352        self.tickers.lock().await.clear();
353        self.bids_asks.lock().await.clear();
354        self.mark_prices.lock().await.clear();
355        self.orderbooks.lock().await.clear();
356        self.trades.lock().await.clear();
357        self.ohlcvs.lock().await.clear();
358        self.balances.write().await.clear();
359        self.orders.write().await.clear();
360        self.my_trades.write().await.clear();
361        self.positions.write().await.clear();
362
363        // Mark shutdown as complete
364        self.shutdown_complete.store(true, Ordering::Release);
365
366        tracing::info!("BinanceWs shutdown complete");
367        Ok(())
368    }
369
370    /// Returns true if the client is shutting down or has shut down.
371    #[inline]
372    pub fn is_shutting_down(&self) -> bool {
373        self.is_shutting_down.load(Ordering::Acquire)
374    }
375
376    /// Checks if shutdown is allowed and returns an error if shutting down.
377    #[inline]
378    #[allow(dead_code)]
379    fn check_not_shutting_down(&self) -> Result<()> {
380        if self.is_shutting_down.load(Ordering::Acquire) {
381            return Err(Error::invalid_request("WebSocket client is shutting down"));
382        }
383        Ok(())
384    }
385
386    /// Connects to the user data stream
387    pub async fn connect_user_stream(&self) -> Result<()> {
388        let manager = self.listen_key_manager.as_ref()
389            .ok_or_else(|| Error::invalid_request(
390                "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
391            ))?;
392
393        let listen_key = manager.get_or_create().await?;
394
395        let base_url = self.message_router.get_url();
396        let base_url = if let Some(stripped) = base_url.strip_suffix('/') {
397            stripped
398        } else {
399            &base_url
400        };
401
402        let url = format!("{}/{}", base_url, listen_key);
403
404        self.message_router.start(Some(url)).await?;
405        manager.start_auto_refresh().await;
406        *self.listen_key.write().await = Some(listen_key);
407
408        Ok(())
409    }
410
411    /// Closes the user data stream
412    pub async fn close_user_stream(&self) -> Result<()> {
413        if let Some(manager) = &self.listen_key_manager {
414            manager.delete().await?;
415        }
416        *self.listen_key.write().await = None;
417        Ok(())
418    }
419
420    /// Returns the active listen key, when available
421    pub async fn get_listen_key(&self) -> Option<String> {
422        if let Some(manager) = &self.listen_key_manager {
423            manager.get_current().await
424        } else {
425            self.listen_key.read().await.clone()
426        }
427    }
428
429    /// Subscribes to the ticker stream for a symbol.
430    ///
431    /// Returns a receiver that will receive ticker updates as JSON values.
432    /// The caller is responsible for consuming messages from the receiver.
433    pub async fn subscribe_ticker(
434        &self,
435        symbol: &str,
436    ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
437        let normalized = normalize_symbol(symbol);
438        let stream = format!("{}@ticker", normalized);
439        let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
440        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
441
442        let is_new = self
443            .subscription_manager
444            .add_subscription(
445                stream.clone(),
446                symbol.to_string(),
447                SubscriptionType::Ticker,
448                tx,
449            )
450            .await?;
451
452        if is_new {
453            self.message_router.subscribe(vec![stream]).await?;
454        }
455        Ok(rx)
456    }
457
458    /// Subscribes to the 24-hour ticker stream for all symbols.
459    ///
460    /// Returns a receiver that will receive ticker updates as JSON values.
461    pub async fn subscribe_all_tickers(&self) -> Result<tokio::sync::mpsc::Receiver<Value>> {
462        let stream = "!ticker@arr".to_string();
463        let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
464        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
465
466        let is_new = self
467            .subscription_manager
468            .add_subscription(
469                stream.clone(),
470                "all".to_string(),
471                SubscriptionType::Ticker,
472                tx,
473            )
474            .await?;
475
476        if is_new {
477            self.message_router.subscribe(vec![stream]).await?;
478        }
479        Ok(rx)
480    }
481
482    /// Subscribes to real-time trade executions for a symbol.
483    ///
484    /// Returns a receiver that will receive trade updates as JSON values.
485    pub async fn subscribe_trades(
486        &self,
487        symbol: &str,
488    ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
489        let normalized = normalize_symbol(symbol);
490        let stream = format!("{}@trade", normalized);
491        let capacity = self.channel_capacity_for(&SubscriptionType::Trades);
492        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
493
494        let is_new = self
495            .subscription_manager
496            .add_subscription(
497                stream.clone(),
498                symbol.to_string(),
499                SubscriptionType::Trades,
500                tx,
501            )
502            .await?;
503
504        if is_new {
505            self.message_router.subscribe(vec![stream]).await?;
506        }
507        Ok(rx)
508    }
509
510    /// Subscribes to the aggregated trade stream for a symbol.
511    ///
512    /// Returns a receiver that will receive aggregated trade updates as JSON values.
513    pub async fn subscribe_agg_trades(
514        &self,
515        symbol: &str,
516    ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
517        let normalized = normalize_symbol(symbol);
518        let stream = format!("{}@aggTrade", normalized);
519        let capacity = self.channel_capacity_for(&SubscriptionType::Trades);
520        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
521
522        let is_new = self
523            .subscription_manager
524            .add_subscription(
525                stream.clone(),
526                symbol.to_string(),
527                SubscriptionType::Trades,
528                tx,
529            )
530            .await?;
531
532        if is_new {
533            self.message_router.subscribe(vec![stream]).await?;
534        }
535        Ok(rx)
536    }
537
538    /// Subscribes to the order book depth stream.
539    ///
540    /// Returns a receiver that will receive order book updates as JSON values.
541    ///
542    /// # Arguments
543    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT" or "BTC/USDT")
544    /// * `levels` - Depth level (L5, L10, or L20)
545    /// * `update_speed` - Update frequency (Ms100 or Ms1000)
546    ///
547    /// # Example
548    /// ```rust,ignore
549    /// use ccxt_exchanges::binance::ws::{DepthLevel, UpdateSpeed};
550    /// let mut rx = ws.subscribe_orderbook("BTCUSDT", DepthLevel::L20, UpdateSpeed::Ms100).await?;
551    /// while let Some(msg) = rx.recv().await {
552    ///     println!("Order book update: {:?}", msg);
553    /// }
554    /// ```
555    pub async fn subscribe_orderbook(
556        &self,
557        symbol: &str,
558        levels: DepthLevel,
559        update_speed: UpdateSpeed,
560    ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
561        let normalized = normalize_symbol(symbol);
562        let stream = match update_speed {
563            UpdateSpeed::Ms100 => format!("{}@depth{}@100ms", normalized, levels.as_u32()),
564            UpdateSpeed::Ms1000 => format!("{}@depth{}", normalized, levels.as_u32()),
565        };
566        let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
567        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
568
569        let is_new = self
570            .subscription_manager
571            .add_subscription(
572                stream.clone(),
573                symbol.to_string(),
574                SubscriptionType::OrderBook,
575                tx,
576            )
577            .await?;
578
579        if is_new {
580            self.message_router.subscribe(vec![stream]).await?;
581        }
582        Ok(rx)
583    }
584
585    /// Subscribes to the diff order book stream.
586    ///
587    /// Returns a receiver that will receive order book diff updates as JSON values.
588    ///
589    /// # Arguments
590    /// * `symbol` - Trading pair symbol
591    /// * `update_speed` - Optional update frequency
592    pub async fn subscribe_orderbook_diff(
593        &self,
594        symbol: &str,
595        update_speed: Option<UpdateSpeed>,
596    ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
597        let normalized = normalize_symbol(symbol);
598        let stream = match update_speed {
599            Some(UpdateSpeed::Ms100) => format!("{}@depth@100ms", normalized),
600            _ => format!("{}@depth", normalized),
601        };
602        let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
603        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
604
605        let is_new = self
606            .subscription_manager
607            .add_subscription(
608                stream.clone(),
609                symbol.to_string(),
610                SubscriptionType::OrderBook,
611                tx,
612            )
613            .await?;
614
615        if is_new {
616            self.message_router.subscribe(vec![stream]).await?;
617        }
618        Ok(rx)
619    }
620
621    /// Subscribes to Kline (candlestick) data for a symbol.
622    ///
623    /// Returns a receiver that will receive kline updates as JSON values.
624    pub async fn subscribe_kline(
625        &self,
626        symbol: &str,
627        interval: &str,
628    ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
629        let normalized = normalize_symbol(symbol);
630        let stream = format!("{}@kline_{}", normalized, interval);
631        let sub_type = SubscriptionType::Kline(interval.to_string());
632        let capacity = self.channel_capacity_for(&sub_type);
633        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
634
635        let is_new = self
636            .subscription_manager
637            .add_subscription(stream.clone(), symbol.to_string(), sub_type, tx)
638            .await?;
639
640        if is_new {
641            self.message_router.subscribe(vec![stream]).await?;
642        }
643        Ok(rx)
644    }
645
646    /// Subscribes to the mini ticker stream for a symbol.
647    ///
648    /// Returns a receiver that will receive mini ticker updates as JSON values.
649    pub async fn subscribe_mini_ticker(
650        &self,
651        symbol: &str,
652    ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
653        let normalized = normalize_symbol(symbol);
654        let stream = format!("{}@miniTicker", normalized);
655        let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
656        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
657
658        let is_new = self
659            .subscription_manager
660            .add_subscription(
661                stream.clone(),
662                symbol.to_string(),
663                SubscriptionType::Ticker,
664                tx,
665            )
666            .await?;
667
668        if is_new {
669            self.message_router.subscribe(vec![stream]).await?;
670        }
671        Ok(rx)
672    }
673
674    /// Subscribes to the mini ticker stream for all symbols.
675    ///
676    /// Returns a receiver that will receive mini ticker updates as JSON values.
677    pub async fn subscribe_all_mini_tickers(&self) -> Result<tokio::sync::mpsc::Receiver<Value>> {
678        let stream = "!miniTicker@arr".to_string();
679        let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
680        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
681
682        let is_new = self
683            .subscription_manager
684            .add_subscription(
685                stream.clone(),
686                "all".to_string(),
687                SubscriptionType::Ticker,
688                tx,
689            )
690            .await?;
691
692        if is_new {
693            self.message_router.subscribe(vec![stream]).await?;
694        }
695        Ok(rx)
696    }
697
698    /// Cancels an existing subscription.
699    ///
700    /// Only sends the UNSUBSCRIBE command to the server when the reference count
701    /// reaches zero (no more active handles for this stream).
702    pub async fn unsubscribe(&self, stream: String) -> Result<()> {
703        let fully_removed = self
704            .subscription_manager
705            .remove_subscription(&stream)
706            .await?;
707
708        // Only send UNSUBSCRIBE to the server if the subscription was fully removed
709        if fully_removed {
710            self.message_router.unsubscribe(vec![stream]).await?;
711        }
712        Ok(())
713    }
714
715    /// Receives the next available message
716    pub fn receive(&self) -> Option<Value> {
717        None
718    }
719
720    /// Indicates whether the WebSocket connection is active
721    pub fn is_connected(&self) -> bool {
722        self.message_router.is_connected()
723    }
724
725    /// Returns the current connection state.
726    pub fn state(&self) -> ccxt_core::ws_client::WsConnectionState {
727        if self.message_router.is_connected() {
728            ccxt_core::ws_client::WsConnectionState::Connected
729        } else {
730            ccxt_core::ws_client::WsConnectionState::Disconnected
731        }
732    }
733
734    /// Returns the list of active subscriptions.
735    ///
736    /// Returns a vector of subscription channel names that are currently active.
737    /// This method retrieves the actual subscriptions from the underlying WsClient's
738    /// subscription manager, providing accurate state tracking.
739    pub fn subscriptions(&self) -> Vec<String> {
740        let subs = self.subscription_manager.get_all_subscriptions_sync();
741        subs.into_iter().map(|s| s.stream).collect()
742    }
743
744    /// Returns the number of active subscriptions.
745    pub fn subscription_count(&self) -> usize {
746        self.subscription_manager.active_count()
747    }
748
749    /// Returns a health snapshot for monitoring the WebSocket connection.
750    ///
751    /// This provides metrics useful for monitoring connection health:
752    /// - Latency from ping/pong
753    /// - Message counts (received and dropped)
754    /// - Connection uptime
755    /// - Reconnection count
756    pub fn health(&self) -> WsHealthSnapshot {
757        let now = chrono::Utc::now().timestamp_millis() as u64;
758        let start_time = self.connection_start_time.load(Ordering::Relaxed);
759        let last_msg = self.last_message_time.load(Ordering::Relaxed);
760
761        WsHealthSnapshot {
762            latency_ms: self.message_router.latency(),
763            messages_received: self.messages_received.load(Ordering::Relaxed),
764            messages_dropped: self.messages_dropped.load(Ordering::Relaxed),
765            last_message_time: if last_msg > 0 {
766                Some(last_msg as i64)
767            } else {
768                None
769            },
770            connection_uptime_ms: if start_time > 0 {
771                now.saturating_sub(start_time)
772            } else {
773                0
774            },
775            reconnect_count: self.message_router.reconnect_count(),
776        }
777    }
778
779    /// Records that a message was received (for health tracking).
780    #[inline]
781    #[allow(dead_code)]
782    pub(crate) fn record_message_received(&self) {
783        self.messages_received.fetch_add(1, Ordering::Relaxed);
784        self.last_message_time.store(
785            chrono::Utc::now().timestamp_millis() as u64,
786            Ordering::Relaxed,
787        );
788    }
789
790    /// Records that a message was dropped (for health tracking).
791    #[inline]
792    #[allow(dead_code)]
793    pub(crate) fn record_message_dropped(&self) {
794        let count = self.messages_dropped.fetch_add(1, Ordering::Relaxed) + 1;
795        // Log every 100th drop to avoid log spam
796        if count % 100 == 1 {
797            tracing::warn!(dropped_count = count, "Message dropped due to backpressure");
798        }
799    }
800
801    /// Generic method for watching a WebSocket stream.
802    ///
803    /// This method abstracts the common pattern used by all watch_* methods:
804    /// 1. Create a channel for receiving messages
805    /// 2. Add subscription to the manager
806    /// 3. Subscribe via the message router
807    /// 4. Wait for and parse messages
808    ///
809    /// # Type Parameters
810    /// * `T` - The output type to parse messages into
811    /// * `P` - The parser type implementing `StreamParser<Output = T>`
812    ///
813    /// # Arguments
814    /// * `stream` - The stream name (e.g., "btcusdt@ticker")
815    /// * `symbol` - The symbol for subscription tracking
816    /// * `sub_type` - The subscription type
817    /// * `market` - Optional market info for parsing
818    async fn watch_stream<T, P>(
819        &self,
820        stream: String,
821        symbol: String,
822        sub_type: SubscriptionType,
823        market: Option<&ccxt_core::types::Market>,
824    ) -> Result<T>
825    where
826        P: parsers::StreamParser<Output = T>,
827    {
828        let capacity = self.channel_capacity_for(&sub_type);
829        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
830        let is_new = self
831            .subscription_manager
832            .add_subscription(stream.clone(), symbol, sub_type, tx)
833            .await?;
834
835        if is_new {
836            self.message_router.subscribe(vec![stream.clone()]).await?;
837        }
838
839        loop {
840            if let Some(message) = rx.recv().await {
841                // Skip subscription confirmation messages
842                if message.get("result").is_some() {
843                    continue;
844                }
845
846                match P::parse(&message, market) {
847                    Ok(data) => return Ok(data),
848                    Err(e) => {
849                        tracing::warn!(
850                            "Failed to parse message for stream {}: {:?}. Payload: {:?}",
851                            stream,
852                            e,
853                            message
854                        );
855                        // Continue waiting for the next message
856                    }
857                }
858            } else {
859                return Err(Error::network("Subscription channel closed"));
860            }
861        }
862    }
863
864    /// Watches a single mark price stream (internal helper)
865    async fn watch_mark_price_internal(
866        &self,
867        symbol: &str,
868        channel_name: &str,
869    ) -> Result<MarkPrice> {
870        let normalized = normalize_symbol(symbol);
871        let stream = format!("{}@{}", normalized, channel_name);
872        tracing::debug!(
873            "watch_mark_price_internal: stream={}, symbol={}",
874            stream,
875            symbol
876        );
877
878        let mark_price = self
879            .watch_stream::<MarkPrice, parsers::MarkPriceParser>(
880                stream,
881                symbol.to_string(),
882                SubscriptionType::MarkPrice,
883                None,
884            )
885            .await?;
886
887        // Update cache
888        let mut mark_prices = self.mark_prices.lock().await;
889        mark_prices.insert(mark_price.symbol.clone(), mark_price.clone());
890
891        Ok(mark_price)
892    }
893
894    /// Watches multiple mark price streams (internal helper)
895    async fn watch_mark_prices_internal(
896        &self,
897        symbols: Option<Vec<String>>,
898        channel_name: &str,
899    ) -> Result<HashMap<String, MarkPrice>> {
900        let capacity = self.channel_capacity_for(&SubscriptionType::MarkPrice);
901        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
902
903        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
904            let mut streams = Vec::with_capacity(syms.len());
905            for sym in syms {
906                let symbol = sym.to_lowercase();
907                let stream = format!("{}@{}", symbol, channel_name);
908                let is_new = self
909                    .subscription_manager
910                    .add_subscription(
911                        stream.clone(),
912                        symbol,
913                        SubscriptionType::MarkPrice,
914                        tx.clone(),
915                    )
916                    .await?;
917                if is_new {
918                    streams.push(stream);
919                }
920            }
921            streams
922        } else {
923            let stream = format!("!{}@arr", channel_name);
924            let is_new = self
925                .subscription_manager
926                .add_subscription(
927                    stream.clone(),
928                    "all".to_string(),
929                    SubscriptionType::MarkPrice,
930                    tx.clone(),
931                )
932                .await?;
933            if is_new { vec![stream] } else { vec![] }
934        };
935
936        if !streams.is_empty() {
937            self.message_router.subscribe(streams.clone()).await?;
938        }
939
940        let mut result = HashMap::new();
941
942        loop {
943            if let Some(message) = rx.recv().await {
944                if message.get("result").is_some() {
945                    continue;
946                }
947
948                if let Some(arr) = message.as_array() {
949                    for item in arr {
950                        if let Ok(mark_price) = parser::parse_ws_mark_price(item) {
951                            let symbol = mark_price.symbol.clone();
952
953                            if let Some(syms) = &symbols {
954                                if syms.contains(&symbol.to_lowercase()) {
955                                    result.insert(symbol.clone(), mark_price.clone());
956                                }
957                            } else {
958                                result.insert(symbol.clone(), mark_price.clone());
959                            }
960
961                            let mut mark_prices = self.mark_prices.lock().await;
962                            mark_prices.insert(symbol, mark_price);
963                        } else {
964                            tracing::warn!("Failed to parse item in mark price array: {:?}", item);
965                        }
966                    }
967
968                    if let Some(syms) = &symbols {
969                        if result.len() >= syms.len() {
970                            return Ok(result);
971                        }
972                    } else {
973                        // For array updates without specific symbols filter, return what we got
974                        return Ok(result);
975                    }
976                } else {
977                    match parser::parse_ws_mark_price(&message) {
978                        Ok(mark_price) => {
979                            let symbol = mark_price.symbol.clone();
980                            result.insert(symbol.clone(), mark_price.clone());
981
982                            let mut mark_prices = self.mark_prices.lock().await;
983                            mark_prices.insert(symbol, mark_price);
984
985                            if let Some(syms) = &symbols {
986                                if result.len() >= syms.len() {
987                                    return Ok(result);
988                                }
989                            }
990                        }
991                        Err(e) => {
992                            tracing::warn!(
993                                "Failed to parse mark price message: {:?}. Payload: {:?}",
994                                e,
995                                message
996                            );
997                        }
998                    }
999                }
1000            } else {
1001                return Err(Error::network("Subscription channel closed"));
1002            }
1003        }
1004    }
1005
1006    /// Watches a single ticker stream (internal helper)
1007    async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1008        let normalized = normalize_symbol(symbol);
1009        let stream = format!("{}@{}", normalized, channel_name);
1010
1011        let ticker = self
1012            .watch_stream::<Ticker, parsers::TickerParser>(
1013                stream,
1014                symbol.to_string(),
1015                SubscriptionType::Ticker,
1016                None,
1017            )
1018            .await?;
1019
1020        // Update cache
1021        let mut tickers = self.tickers.lock().await;
1022        tickers.insert(ticker.symbol.clone(), ticker.clone());
1023
1024        Ok(ticker)
1025    }
1026
1027    /// Watches multiple ticker streams (internal helper)
1028    async fn watch_tickers_internal(
1029        &self,
1030        symbols: Option<Vec<String>>,
1031        channel_name: &str,
1032    ) -> Result<HashMap<String, Ticker>> {
1033        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1034            syms.iter()
1035                .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1036                .collect()
1037        } else {
1038            vec![format!("!{}@arr", channel_name)]
1039        };
1040
1041        let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
1042        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1043
1044        let mut new_streams = Vec::new();
1045
1046        for stream in &streams {
1047            let is_new = self
1048                .subscription_manager
1049                .add_subscription(
1050                    stream.clone(),
1051                    "all".to_string(),
1052                    SubscriptionType::Ticker,
1053                    tx.clone(),
1054                )
1055                .await?;
1056            if is_new {
1057                new_streams.push(stream.clone());
1058            }
1059        }
1060
1061        if !new_streams.is_empty() {
1062            self.message_router.subscribe(new_streams).await?;
1063        }
1064
1065        let mut result = HashMap::new();
1066
1067        loop {
1068            if let Some(message) = rx.recv().await {
1069                if message.get("result").is_some() {
1070                    continue;
1071                }
1072
1073                if let Some(arr) = message.as_array() {
1074                    for item in arr {
1075                        if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1076                            let symbol = ticker.symbol.clone();
1077
1078                            if let Some(syms) = &symbols {
1079                                if syms.contains(&symbol.to_lowercase()) {
1080                                    result.insert(symbol.clone(), ticker.clone());
1081                                }
1082                            } else {
1083                                result.insert(symbol.clone(), ticker.clone());
1084                            }
1085
1086                            let mut tickers = self.tickers.lock().await;
1087                            tickers.insert(symbol, ticker);
1088                        } else {
1089                            tracing::warn!("Failed to parse item in ticker array: {:?}", item);
1090                        }
1091                    }
1092
1093                    if let Some(syms) = &symbols {
1094                        if result.len() >= syms.len() {
1095                            return Ok(result);
1096                        }
1097                    } else {
1098                        // If we received an array but we are not waiting for specific symbols,
1099                        // we can assume we got the update for "all" markets.
1100                        // However, !ticker@arr returns ALL tickers in one message usually.
1101                        return Ok(result);
1102                    }
1103                } else {
1104                    match parser::parse_ws_ticker(&message, None) {
1105                        Ok(ticker) => {
1106                            let symbol = ticker.symbol.clone();
1107                            result.insert(symbol.clone(), ticker.clone());
1108
1109                            let mut tickers = self.tickers.lock().await;
1110                            tickers.insert(symbol, ticker);
1111
1112                            if let Some(syms) = &symbols {
1113                                if result.len() >= syms.len() {
1114                                    return Ok(result);
1115                                }
1116                            }
1117                        }
1118                        Err(e) => {
1119                            tracing::warn!(
1120                                "Failed to parse ticker message: {:?}. Payload: {:?}",
1121                                e,
1122                                message
1123                            );
1124                        }
1125                    }
1126                }
1127            } else {
1128                return Err(Error::network("Subscription channel closed"));
1129            }
1130        }
1131    }
1132
1133    /// Processes an order book delta update (internal helper)
1134    async fn handle_orderbook_delta(
1135        &self,
1136        symbol: &str,
1137        delta_message: &Value,
1138        is_futures: bool,
1139    ) -> Result<()> {
1140        handlers::handle_orderbook_delta(symbol, delta_message, is_futures, &self.orderbooks).await
1141    }
1142
1143    /// Retrieves an order book snapshot and initializes cached state (internal helper)
1144    async fn fetch_orderbook_snapshot(
1145        &self,
1146        exchange: &Binance,
1147        symbol: &str,
1148        limit: Option<i64>,
1149        is_futures: bool,
1150    ) -> Result<OrderBook> {
1151        handlers::fetch_orderbook_snapshot(exchange, symbol, limit, is_futures, &self.orderbooks)
1152            .await
1153    }
1154
1155    /// Watches a single order book stream (internal helper)
1156    async fn watch_orderbook_internal(
1157        &self,
1158        exchange: &Binance,
1159        symbol: &str,
1160        limit: Option<i64>,
1161        update_speed: UpdateSpeed,
1162        is_futures: bool,
1163    ) -> Result<OrderBook> {
1164        let stream = match update_speed {
1165            UpdateSpeed::Ms100 => format!("{}@depth@100ms", symbol.to_lowercase()),
1166            UpdateSpeed::Ms1000 => format!("{}@depth", symbol.to_lowercase()),
1167        };
1168
1169        let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
1170        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1171        let is_new = self
1172            .subscription_manager
1173            .add_subscription(
1174                stream.clone(),
1175                symbol.to_string(),
1176                SubscriptionType::OrderBook,
1177                tx,
1178            )
1179            .await?;
1180
1181        if is_new {
1182            self.message_router.subscribe(vec![stream.clone()]).await?;
1183        }
1184
1185        tokio::time::sleep(Duration::from_millis(500)).await;
1186
1187        let _snapshot = self
1188            .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1189            .await?;
1190
1191        loop {
1192            if let Some(message) = rx.recv().await {
1193                if message.get("result").is_some() {
1194                    continue;
1195                }
1196
1197                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1198                    if event_type == "depthUpdate" {
1199                        match self
1200                            .handle_orderbook_delta(symbol, &message, is_futures)
1201                            .await
1202                        {
1203                            Ok(()) => {
1204                                let orderbooks = self.orderbooks.lock().await;
1205                                if let Some(ob) = orderbooks.get(symbol) {
1206                                    if ob.is_synced {
1207                                        return Ok(ob.clone());
1208                                    }
1209                                }
1210                            }
1211                            Err(e) => {
1212                                let err_msg = e.to_string();
1213                                let recovery = WsErrorRecovery::from_error_message(&err_msg);
1214
1215                                match recovery {
1216                                    WsErrorRecovery::Resync => {
1217                                        tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1218                                        match self
1219                                            .resync_orderbook(exchange, symbol, limit, is_futures)
1220                                            .await
1221                                        {
1222                                            Ok(true) => {
1223                                                tracing::info!(
1224                                                    "Resync completed successfully for {}",
1225                                                    symbol
1226                                                );
1227                                            }
1228                                            Ok(false) => {
1229                                                tracing::debug!(
1230                                                    "Resync rate limited for {}, skipping",
1231                                                    symbol
1232                                                );
1233                                            }
1234                                            Err(resync_err) => {
1235                                                tracing::error!(
1236                                                    "Resync failed for {}: {}",
1237                                                    symbol,
1238                                                    resync_err
1239                                                );
1240                                                return Err(resync_err);
1241                                            }
1242                                        }
1243                                    }
1244                                    WsErrorRecovery::Fatal => {
1245                                        tracing::error!(
1246                                            "Fatal error handling orderbook delta: {}",
1247                                            err_msg
1248                                        );
1249                                        return Err(e);
1250                                    }
1251                                    _ => {
1252                                        tracing::error!(
1253                                            "Failed to handle orderbook delta: {}",
1254                                            err_msg
1255                                        );
1256                                    }
1257                                }
1258                            }
1259                        }
1260                    }
1261                }
1262            } else {
1263                return Err(Error::network("Subscription channel closed"));
1264            }
1265        }
1266    }
1267
1268    /// Resyncs the orderbook for a symbol by fetching a fresh snapshot.
1269    ///
1270    /// Returns `Ok(true)` if resync was performed, `Ok(false)` if rate limited.
1271    async fn resync_orderbook(
1272        &self,
1273        exchange: &Binance,
1274        symbol: &str,
1275        limit: Option<i64>,
1276        is_futures: bool,
1277    ) -> Result<bool> {
1278        let current_time = chrono::Utc::now().timestamp_millis();
1279
1280        // Check rate limit
1281        let should_resync = {
1282            let orderbooks = self.orderbooks.lock().await;
1283            if let Some(ob) = orderbooks.get(symbol) {
1284                ob.should_resync(current_time)
1285            } else {
1286                true
1287            }
1288        };
1289
1290        if !should_resync {
1291            return Ok(false);
1292        }
1293
1294        // Reset orderbook state
1295        {
1296            let mut orderbooks = self.orderbooks.lock().await;
1297            if let Some(ob) = orderbooks.get_mut(symbol) {
1298                ob.reset_for_resync();
1299                ob.mark_resync_initiated(current_time);
1300            }
1301        }
1302
1303        // Wait before fetching snapshot
1304        tokio::time::sleep(Duration::from_millis(500)).await;
1305
1306        // Fetch fresh snapshot
1307        self.fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1308            .await?;
1309
1310        Ok(true)
1311    }
1312
1313    /// Watches multiple order book streams (internal helper)
1314    async fn watch_orderbooks_internal(
1315        &self,
1316        exchange: &Binance,
1317        symbols: Vec<String>,
1318        limit: Option<i64>,
1319        update_speed: UpdateSpeed,
1320        is_futures: bool,
1321    ) -> Result<HashMap<String, OrderBook>> {
1322        if symbols.len() > 200 {
1323            return Err(Error::invalid_request(
1324                "Binance supports max 200 symbols per connection",
1325            ));
1326        }
1327
1328        let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
1329        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1330        let mut new_streams = Vec::new();
1331
1332        for symbol in &symbols {
1333            let stream = match update_speed {
1334                UpdateSpeed::Ms100 => format!("{}@depth@100ms", symbol.to_lowercase()),
1335                UpdateSpeed::Ms1000 => format!("{}@depth", symbol.to_lowercase()),
1336            };
1337
1338            let is_new = self
1339                .subscription_manager
1340                .add_subscription(
1341                    stream.clone(),
1342                    symbol.clone(),
1343                    SubscriptionType::OrderBook,
1344                    tx.clone(),
1345                )
1346                .await?;
1347
1348            if is_new {
1349                new_streams.push(stream);
1350            }
1351        }
1352
1353        if !new_streams.is_empty() {
1354            self.message_router.subscribe(new_streams).await?;
1355        }
1356
1357        tokio::time::sleep(Duration::from_millis(500)).await;
1358
1359        for symbol in &symbols {
1360            let _ = self
1361                .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1362                .await;
1363        }
1364
1365        let mut result = HashMap::new();
1366        let mut synced_symbols = std::collections::HashSet::new();
1367
1368        while synced_symbols.len() < symbols.len() {
1369            if let Some(message) = rx.recv().await {
1370                if message.get("result").is_some() {
1371                    continue;
1372                }
1373
1374                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1375                    if event_type == "depthUpdate" {
1376                        if let Some(msg_symbol) =
1377                            message.get("s").and_then(serde_json::Value::as_str)
1378                        {
1379                            if let Err(e) = self
1380                                .handle_orderbook_delta(msg_symbol, &message, is_futures)
1381                                .await
1382                            {
1383                                tracing::error!("Failed to handle orderbook delta: {}", e);
1384                                continue;
1385                            }
1386
1387                            let orderbooks = self.orderbooks.lock().await;
1388                            if let Some(ob) = orderbooks.get(msg_symbol) {
1389                                if ob.is_synced {
1390                                    synced_symbols.insert(msg_symbol.to_string());
1391                                }
1392                            }
1393                        }
1394                    }
1395                }
1396            } else {
1397                return Err(Error::network("Subscription channel closed"));
1398            }
1399        }
1400
1401        let orderbooks = self.orderbooks.lock().await;
1402        for symbol in &symbols {
1403            if let Some(ob) = orderbooks.get(symbol) {
1404                result.insert(symbol.clone(), ob.clone());
1405            }
1406        }
1407
1408        Ok(result)
1409    }
1410
1411    /// Watches the best bid/ask data for a unified symbol (internal helper)
1412    async fn watch_bids_asks_internal(&self, symbol: &str, market_id: &str) -> Result<BidAsk> {
1413        let normalized = normalize_symbol(market_id);
1414        let stream = format!("{}@bookTicker", normalized);
1415
1416        let bid_ask = self
1417            .watch_stream::<BidAsk, parsers::BidAskParser>(
1418                stream,
1419                symbol.to_string(),
1420                SubscriptionType::BookTicker,
1421                None,
1422            )
1423            .await?;
1424
1425        // Update cache
1426        let mut bids_asks_map = self.bids_asks.lock().await;
1427        bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
1428
1429        Ok(bid_ask)
1430    }
1431
1432    /// Streams trade data for a unified symbol (internal helper)
1433    async fn watch_trades_internal(
1434        &self,
1435        symbol: &str,
1436        market_id: &str,
1437        since: Option<i64>,
1438        limit: Option<usize>,
1439        market: Option<&ccxt_core::types::Market>,
1440    ) -> Result<Vec<Trade>> {
1441        let stream = format!("{}@trade", market_id.to_lowercase());
1442        let capacity = self.channel_capacity_for(&SubscriptionType::Trades);
1443        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1444
1445        let is_new = self
1446            .subscription_manager
1447            .add_subscription(
1448                stream.clone(),
1449                symbol.to_string(),
1450                SubscriptionType::Trades,
1451                tx,
1452            )
1453            .await?;
1454
1455        if is_new {
1456            self.message_router.subscribe(vec![stream.clone()]).await?;
1457        }
1458
1459        // Wait for at least one trade or use a loop with timeout if we want to mimic "polling" until data arrives?
1460        // Usually watch_trades returns the latest trades.
1461
1462        loop {
1463            if let Some(message) = rx.recv().await {
1464                if message.get("result").is_some() {
1465                    continue;
1466                }
1467
1468                if let Ok(trade) = parser::parse_ws_trade(&message, market) {
1469                    let mut trades_map = self.trades.lock().await;
1470                    let trades = trades_map
1471                        .entry(symbol.to_string())
1472                        .or_insert_with(VecDeque::new);
1473
1474                    if trades.len() >= MAX_TRADES {
1475                        trades.pop_front();
1476                    }
1477                    trades.push_back(trade);
1478
1479                    let mut result: Vec<Trade> = trades.iter().cloned().collect();
1480
1481                    if let Some(since_ts) = since {
1482                        result.retain(|t| t.timestamp >= since_ts);
1483                    }
1484
1485                    if let Some(limit_size) = limit {
1486                        if result.len() > limit_size {
1487                            result = result.split_off(result.len() - limit_size);
1488                        }
1489                    }
1490
1491                    return Ok(result);
1492                }
1493            } else {
1494                return Err(Error::network("Subscription channel closed"));
1495            }
1496        }
1497    }
1498
1499    /// Streams OHLCV data for a unified symbol (internal helper)
1500    async fn watch_ohlcv_internal(
1501        &self,
1502        symbol: &str,
1503        market_id: &str,
1504        timeframe: &str,
1505        since: Option<i64>,
1506        limit: Option<usize>,
1507    ) -> Result<Vec<OHLCV>> {
1508        let stream = format!("{}@kline_{}", market_id.to_lowercase(), timeframe);
1509        let sub_type = SubscriptionType::Kline(timeframe.to_string());
1510        let capacity = self.channel_capacity_for(&sub_type);
1511        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1512
1513        let is_new = self
1514            .subscription_manager
1515            .add_subscription(stream.clone(), symbol.to_string(), sub_type, tx)
1516            .await?;
1517
1518        if is_new {
1519            self.message_router.subscribe(vec![stream.clone()]).await?;
1520        }
1521
1522        loop {
1523            if let Some(message) = rx.recv().await {
1524                if message.get("result").is_some() {
1525                    continue;
1526                }
1527
1528                if let Ok(ohlcv) = parser::parse_ws_ohlcv(&message) {
1529                    let cache_key = format!("{}:{}", symbol, timeframe);
1530                    let mut ohlcvs_map = self.ohlcvs.lock().await;
1531                    let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
1532
1533                    if ohlcvs.len() >= MAX_OHLCVS {
1534                        ohlcvs.pop_front();
1535                    }
1536                    ohlcvs.push_back(ohlcv);
1537
1538                    let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
1539
1540                    if let Some(since_ts) = since {
1541                        result.retain(|o| o.timestamp >= since_ts);
1542                    }
1543
1544                    if let Some(limit_size) = limit {
1545                        if result.len() > limit_size {
1546                            result = result.split_off(result.len() - limit_size);
1547                        }
1548                    }
1549
1550                    return Ok(result);
1551                }
1552            } else {
1553                return Err(Error::network("Subscription channel closed"));
1554            }
1555        }
1556    }
1557
1558    /// Returns cached ticker snapshot
1559    pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1560        let tickers = self.tickers.lock().await;
1561        tickers.get(symbol).cloned()
1562    }
1563
1564    /// Returns all cached ticker snapshots
1565    pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1566        let tickers = self.tickers.lock().await;
1567        tickers.clone()
1568    }
1569
1570    /// Handles balance update messages (internal helper)
1571    async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1572        user_data::handle_balance_message(message, account_type, &self.balances).await
1573    }
1574
1575    /// Watches for balance updates (internal helper)
1576    async fn watch_balance_internal(&self, account_type: &str) -> Result<Balance> {
1577        self.connect_user_stream().await?;
1578
1579        let capacity = self.channel_capacity_for(&SubscriptionType::Balance);
1580        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1581
1582        self.subscription_manager
1583            .add_subscription(
1584                "!userData".to_string(),
1585                "user".to_string(),
1586                SubscriptionType::Balance,
1587                tx,
1588            )
1589            .await?;
1590
1591        // Note: userData stream doesn't need explicit SUBSCRIBE - it's implicit via listen key
1592        loop {
1593            if let Some(message) = rx.recv().await {
1594                if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
1595                    if matches!(
1596                        event_type,
1597                        "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE"
1598                    ) {
1599                        if let Ok(()) = self.handle_balance_message(&message, account_type).await {
1600                            let balances = self.balances.read().await;
1601                            if let Some(balance) = balances.get(account_type) {
1602                                return Ok(balance.clone());
1603                            }
1604                        }
1605                    }
1606                }
1607            } else {
1608                return Err(Error::network("Subscription channel closed"));
1609            }
1610        }
1611    }
1612
1613    /// Watches for order updates (internal helper)
1614    async fn watch_orders_internal(
1615        &self,
1616        symbol: Option<&str>,
1617        since: Option<i64>,
1618        limit: Option<usize>,
1619    ) -> Result<Vec<Order>> {
1620        self.connect_user_stream().await?;
1621
1622        let capacity = self.channel_capacity_for(&SubscriptionType::Orders);
1623        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1624
1625        self.subscription_manager
1626            .add_subscription(
1627                "!userData".to_string(),
1628                "user".to_string(),
1629                SubscriptionType::Orders,
1630                tx,
1631            )
1632            .await?;
1633
1634        loop {
1635            if let Some(message) = rx.recv().await {
1636                if let Value::Object(data) = message {
1637                    if let Some(event_type) = data.get("e").and_then(serde_json::Value::as_str) {
1638                        if event_type == "executionReport" {
1639                            let order = user_data::parse_ws_order(&data);
1640
1641                            let mut orders = self.orders.write().await;
1642                            let symbol_orders = orders
1643                                .entry(order.symbol.clone())
1644                                .or_insert_with(HashMap::new);
1645                            symbol_orders.insert(order.id.clone(), order.clone());
1646                            drop(orders);
1647
1648                            if let Some(exec_type) =
1649                                data.get("x").and_then(serde_json::Value::as_str)
1650                            {
1651                                if exec_type == "TRADE" {
1652                                    if let Ok(trade) =
1653                                        BinanceWs::parse_ws_trade(&Value::Object(data.clone()))
1654                                    {
1655                                        let mut trades = self.my_trades.write().await;
1656                                        let symbol_trades = trades
1657                                            .entry(trade.symbol.clone())
1658                                            .or_insert_with(VecDeque::new);
1659
1660                                        symbol_trades.push_front(trade);
1661                                        if symbol_trades.len() > 1000 {
1662                                            symbol_trades.pop_back();
1663                                        }
1664                                    }
1665                                }
1666                            }
1667
1668                            return self.filter_orders(symbol, since, limit).await;
1669                        }
1670                    }
1671                }
1672            } else {
1673                return Err(Error::network("Subscription channel closed"));
1674            }
1675        }
1676    }
1677
1678    /// Watches for my trades (internal helper)
1679    async fn watch_my_trades_internal(
1680        &self,
1681        symbol: Option<&str>,
1682        since: Option<i64>,
1683        limit: Option<usize>,
1684    ) -> Result<Vec<Trade>> {
1685        self.connect_user_stream().await?;
1686
1687        let capacity = self.channel_capacity_for(&SubscriptionType::MyTrades);
1688        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1689
1690        self.subscription_manager
1691            .add_subscription(
1692                "!userData".to_string(),
1693                "user".to_string(),
1694                SubscriptionType::MyTrades,
1695                tx,
1696            )
1697            .await?;
1698
1699        loop {
1700            if let Some(msg) = rx.recv().await {
1701                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1702                    if event_type == "executionReport" {
1703                        if let Ok(trade) = BinanceWs::parse_ws_trade(&msg) {
1704                            let symbol_key = trade.symbol.clone();
1705
1706                            let mut trades_map = self.my_trades.write().await;
1707                            let symbol_trades =
1708                                trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
1709
1710                            symbol_trades.push_front(trade);
1711                            if symbol_trades.len() > 1000 {
1712                                symbol_trades.pop_back();
1713                            }
1714
1715                            drop(trades_map);
1716                            return self.filter_my_trades(symbol, since, limit).await;
1717                        }
1718                    }
1719                }
1720            } else {
1721                return Err(Error::network("Subscription channel closed"));
1722            }
1723        }
1724    }
1725
1726    /// Watches for positions (internal helper)
1727    async fn watch_positions_internal(
1728        &self,
1729        symbols: Option<Vec<String>>,
1730        since: Option<i64>,
1731        limit: Option<usize>,
1732    ) -> Result<Vec<Position>> {
1733        self.connect_user_stream().await?;
1734
1735        let capacity = self.channel_capacity_for(&SubscriptionType::Positions);
1736        let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1737
1738        self.subscription_manager
1739            .add_subscription(
1740                "!userData".to_string(),
1741                "user".to_string(),
1742                SubscriptionType::Positions,
1743                tx,
1744            )
1745            .await?;
1746
1747        loop {
1748            if let Some(msg) = rx.recv().await {
1749                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1750                    if event_type == "ACCOUNT_UPDATE" {
1751                        if let Some(account_data) = msg.get("a") {
1752                            if let Some(positions_array) =
1753                                account_data.get("P").and_then(|p| p.as_array())
1754                            {
1755                                for position_data in positions_array {
1756                                    if let Ok(position) =
1757                                        BinanceWs::parse_ws_position(position_data)
1758                                    {
1759                                        let symbol_key = position.symbol.clone();
1760                                        let side_key = position
1761                                            .side
1762                                            .clone()
1763                                            .unwrap_or_else(|| "both".to_string());
1764
1765                                        let mut positions_map = self.positions.write().await;
1766                                        let symbol_positions = positions_map
1767                                            .entry(symbol_key)
1768                                            .or_insert_with(HashMap::new);
1769
1770                                        if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
1771                                            symbol_positions.remove(&side_key);
1772                                            if symbol_positions.is_empty() {
1773                                                positions_map.remove(&position.symbol);
1774                                            }
1775                                        } else {
1776                                            symbol_positions.insert(side_key, position);
1777                                        }
1778                                    }
1779                                }
1780
1781                                let symbols_ref = symbols.as_deref();
1782                                return self.filter_positions(symbols_ref, since, limit).await;
1783                            }
1784                        }
1785                    }
1786                }
1787            } else {
1788                return Err(Error::network("Subscription channel closed"));
1789            }
1790        }
1791    }
1792
1793    /// Filters cached orders by symbol, time range, and limit
1794    async fn filter_orders(
1795        &self,
1796        symbol: Option<&str>,
1797        since: Option<i64>,
1798        limit: Option<usize>,
1799    ) -> Result<Vec<Order>> {
1800        let orders_map = self.orders.read().await;
1801
1802        let mut orders: Vec<Order> = if let Some(sym) = symbol {
1803            orders_map
1804                .get(sym)
1805                .map(|symbol_orders| symbol_orders.values().cloned().collect())
1806                .unwrap_or_default()
1807        } else {
1808            orders_map
1809                .values()
1810                .flat_map(|symbol_orders| symbol_orders.values().cloned())
1811                .collect()
1812        };
1813
1814        if let Some(since_ts) = since {
1815            orders.retain(|order| order.timestamp.is_some_and(|ts| ts >= since_ts));
1816        }
1817
1818        orders.sort_by(|a, b| {
1819            let ts_a = a.timestamp.unwrap_or(0);
1820            let ts_b = b.timestamp.unwrap_or(0);
1821            ts_b.cmp(&ts_a)
1822        });
1823
1824        if let Some(lim) = limit {
1825            orders.truncate(lim);
1826        }
1827
1828        Ok(orders)
1829    }
1830
    /// Parses a WebSocket trade message
    ///
    /// Thin wrapper around `user_data::parse_ws_trade`; `data` is passed
    /// through as-is and the delegate's result is returned unchanged.
    // NOTE(review): the accepted payload shape and the failure conditions are
    // defined by `user_data::parse_ws_trade` — confirm there.
    fn parse_ws_trade(data: &Value) -> Result<Trade> {
        user_data::parse_ws_trade(data)
    }
1835
1836    /// Filters cached personal trades by symbol, time range, and limit
1837    async fn filter_my_trades(
1838        &self,
1839        symbol: Option<&str>,
1840        since: Option<i64>,
1841        limit: Option<usize>,
1842    ) -> Result<Vec<Trade>> {
1843        let trades_map = self.my_trades.read().await;
1844
1845        let mut trades: Vec<Trade> = if let Some(sym) = symbol {
1846            trades_map
1847                .get(sym)
1848                .map(|symbol_trades| symbol_trades.iter().cloned().collect())
1849                .unwrap_or_default()
1850        } else {
1851            trades_map
1852                .values()
1853                .flat_map(|symbol_trades| symbol_trades.iter().cloned())
1854                .collect()
1855        };
1856
1857        if let Some(since_ts) = since {
1858            trades.retain(|trade| trade.timestamp >= since_ts);
1859        }
1860
1861        trades.sort_by(|a, b| {
1862            let ts_a = a.timestamp;
1863            let ts_b = b.timestamp;
1864            ts_b.cmp(&ts_a)
1865        });
1866
1867        if let Some(lim) = limit {
1868            trades.truncate(lim);
1869        }
1870
1871        Ok(trades)
1872    }
1873
    /// Parses a WebSocket position payload
    ///
    /// Thin wrapper around `user_data::parse_ws_position`; `data` is passed
    /// through as-is and the delegate's result is returned unchanged.
    // NOTE(review): within this file the argument is one element of the `a.P`
    // array of an `ACCOUNT_UPDATE` event; the exact schema is defined by
    // `user_data::parse_ws_position` — confirm there.
    fn parse_ws_position(data: &Value) -> Result<Position> {
        user_data::parse_ws_position(data)
    }
1878
1879    /// Filters cached positions by symbol, time range, and limit
1880    async fn filter_positions(
1881        &self,
1882        symbols: Option<&[String]>,
1883        since: Option<i64>,
1884        limit: Option<usize>,
1885    ) -> Result<Vec<Position>> {
1886        let positions_map = self.positions.read().await;
1887
1888        let mut positions: Vec<Position> = if let Some(syms) = symbols {
1889            syms.iter()
1890                .filter_map(|sym| positions_map.get(sym))
1891                .flat_map(|side_map| side_map.values().cloned())
1892                .collect()
1893        } else {
1894            positions_map
1895                .values()
1896                .flat_map(|side_map| side_map.values().cloned())
1897                .collect()
1898        };
1899
1900        if let Some(since_ts) = since {
1901            positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
1902        }
1903
1904        positions.sort_by(|a, b| {
1905            let ts_a = a.timestamp.unwrap_or(0);
1906            let ts_b = b.timestamp.unwrap_or(0);
1907            ts_b.cmp(&ts_a)
1908        });
1909
1910        if let Some(lim) = limit {
1911            positions.truncate(lim);
1912        }
1913
1914        Ok(positions)
1915    }
1916}
1917
1918// Include Binance impl methods in a separate file to keep mod.rs manageable
1919include!("binance_impl.rs");
1920
#[cfg(test)]
#[allow(clippy::disallowed_methods)]
mod tests {
    use super::*;
    use streams::WS_BASE_URL;
    use types::{
        DEFAULT_ORDERBOOK_CAPACITY, DEFAULT_TICKER_CAPACITY, DEFAULT_TRADES_CAPACITY,
        DEFAULT_USER_DATA_CAPACITY,
    };

    /// Endpoint literal used by the tests that do not go through `WS_BASE_URL`.
    const SPOT_STREAM_URL: &str = "wss://stream.binance.com:9443/ws";

    /// A freshly constructed client exposes a readable `listen_key` lock.
    #[tokio::test]
    async fn test_binance_ws_creation() {
        let client = BinanceWs::new(WS_BASE_URL.to_string());
        assert!(client.listen_key.try_read().is_ok());
    }

    /// Stream names are `<symbol>@<channel>`.
    #[test]
    fn test_stream_format() {
        let symbol = "btcusdt";
        let cases = [
            ("ticker", "btcusdt@ticker"),
            ("trade", "btcusdt@trade"),
            ("depth20", "btcusdt@depth20"),
            ("kline_1m", "btcusdt@kline_1m"),
        ];

        for (channel, expected) in cases {
            assert_eq!(format!("{}@{}", symbol, channel), expected);
        }
    }

    /// Adding, looking up, and removing a subscription keeps the manager's counters in sync.
    #[tokio::test]
    async fn test_subscription_manager_basic() {
        const STREAM: &str = "btcusdt@ticker";

        let manager = SubscriptionManager::new();
        let (tx, _rx) = tokio::sync::mpsc::channel(1024);

        // Nothing registered yet.
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_subscription(STREAM).await);

        manager
            .add_subscription(
                STREAM.to_string(),
                "BTCUSDT".to_string(),
                SubscriptionType::Ticker,
                tx.clone(),
            )
            .await
            .unwrap();

        assert_eq!(manager.active_count(), 1);
        assert!(manager.has_subscription(STREAM).await);

        let stored = manager.get_subscription(STREAM).await;
        assert!(stored.is_some());
        let stored = stored.unwrap();
        assert_eq!(stored.stream, STREAM);
        assert_eq!(stored.symbol, "BTCUSDT");
        assert_eq!(stored.sub_type, SubscriptionType::Ticker);

        // Removal brings the manager back to its initial state.
        manager.remove_subscription(STREAM).await.unwrap();
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_subscription(STREAM).await);
    }

    /// Unified symbols map to Binance stream symbols by dropping `/` and lowercasing.
    #[test]
    fn test_symbol_conversion() {
        let unified = "BTC/USDT";
        assert_eq!(unified.replace('/', "").to_lowercase(), "btcusdt");
    }

    /// A default health snapshot has zeroed counters and no optional metrics.
    #[test]
    fn test_ws_health_snapshot_default() {
        let snapshot = WsHealthSnapshot::default();

        assert!(snapshot.latency_ms.is_none());
        assert!(snapshot.last_message_time.is_none());
        assert_eq!(snapshot.messages_received, 0);
        assert_eq!(snapshot.messages_dropped, 0);
        assert_eq!(snapshot.connection_uptime_ms, 0);
        assert_eq!(snapshot.reconnect_count, 0);
    }

    /// Health depends on the ratio of dropped to received messages.
    #[test]
    fn test_ws_health_snapshot_is_healthy() {
        let mut snapshot = WsHealthSnapshot::default();

        // Empty snapshot is healthy (no data yet)
        assert!(snapshot.is_healthy());

        // High drop rate is unhealthy: 20 of 100 messages dropped (20%).
        snapshot.messages_received = 100;
        snapshot.messages_dropped = 20;
        assert!(!snapshot.is_healthy());

        // Low drop rate is healthy: 5 of 100 messages dropped (5%).
        snapshot.messages_dropped = 5;
        assert!(snapshot.is_healthy());
    }

    /// `shutdown` flips the shutting-down flag.
    #[tokio::test]
    async fn test_shutdown_sets_flags() {
        let client = BinanceWs::new(SPOT_STREAM_URL.to_string());

        // Initially not shutting down
        assert!(!client.is_shutting_down());

        // After shutdown, flag should be set
        let _ = client.shutdown().await;
        assert!(client.is_shutting_down());
    }

    /// Calling `shutdown` twice succeeds both times.
    #[tokio::test]
    async fn test_shutdown_is_idempotent() {
        let client = BinanceWs::new(SPOT_STREAM_URL.to_string());

        // Multiple shutdowns should not panic
        let first = client.shutdown().await;
        let second = client.shutdown().await;

        assert!(first.is_ok());
        assert!(second.is_ok());
    }

    /// A client built with defaults reports the default channel capacities.
    #[tokio::test]
    async fn test_channel_capacity_configuration() {
        let client = BinanceWs::new(SPOT_STREAM_URL.to_string());

        let expectations = [
            (SubscriptionType::Ticker, DEFAULT_TICKER_CAPACITY),
            (SubscriptionType::OrderBook, DEFAULT_ORDERBOOK_CAPACITY),
            (SubscriptionType::Trades, DEFAULT_TRADES_CAPACITY),
            (SubscriptionType::Balance, DEFAULT_USER_DATA_CAPACITY),
        ];

        for (sub_type, expected) in expectations {
            assert_eq!(client.channel_capacity_for(&sub_type), expected);
        }
    }

    /// Custom channel capacities are honored; all user-data subscription types share one value.
    #[tokio::test]
    async fn test_custom_channel_capacity_configuration() {
        use ccxt_core::ws_client::BackpressureStrategy;

        let channel_config = WsChannelConfig {
            ticker_capacity: 128,
            orderbook_capacity: 256,
            trades_capacity: 512,
            user_data_capacity: 64,
        };

        let client = BinanceWs::new_with_config(
            BinanceWsConfig::new(SPOT_STREAM_URL.to_string())
                .with_channel_config(channel_config)
                .with_backpressure(BackpressureStrategy::DropOldest),
        );

        let expectations = [
            (SubscriptionType::Ticker, 128),
            (SubscriptionType::OrderBook, 256),
            (SubscriptionType::Trades, 512),
            (SubscriptionType::Balance, 64),
            (SubscriptionType::Orders, 64),
            (SubscriptionType::MyTrades, 64),
            (SubscriptionType::Positions, 64),
        ];

        for (sub_type, expected) in expectations {
            assert_eq!(client.channel_capacity_for(&sub_type), expected);
        }
    }
}