Skip to main content

ccxt_exchanges/binance/ws/
mod.rs

1//! Binance WebSocket implementation
2//!
3//! Provides WebSocket real-time data stream subscriptions for the Binance exchange
4
5/// Connection manager module
6pub mod connection_manager;
7mod handlers;
8mod listen_key;
9mod streams;
10mod subscriptions;
11pub(crate) mod user_data;
12
13// Re-export public types for backward compatibility
14pub use connection_manager::BinanceConnectionManager;
15pub use handlers::MessageRouter;
16pub use listen_key::ListenKeyManager;
17pub use streams::*;
18pub use subscriptions::{ReconnectConfig, Subscription, SubscriptionManager, SubscriptionType};
19
20use crate::binance::{Binance, parser};
21use ccxt_core::error::{Error, Result};
22use ccxt_core::types::{
23    Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
24};
25use serde_json::Value;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::time::Duration;
29use tokio::sync::{Mutex, RwLock};
30
31const MAX_TRADES: usize = 1000;
32const MAX_OHLCVS: usize = 1000;
33
34/// Binance WebSocket client wrapper
35pub struct BinanceWs {
36    pub(crate) message_router: Arc<MessageRouter>,
37    pub(crate) subscription_manager: Arc<SubscriptionManager>,
38    listen_key: Arc<RwLock<Option<String>>>,
39    listen_key_manager: Option<Arc<ListenKeyManager>>,
40    pub(crate) tickers: Arc<Mutex<HashMap<String, Ticker>>>,
41    pub(crate) bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
42    #[allow(dead_code)]
43    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
44    pub(crate) orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
45    pub(crate) trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
46    pub(crate) ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
47    pub(crate) balances: Arc<RwLock<HashMap<String, Balance>>>,
48    pub(crate) orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
49    pub(crate) my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
50    pub(crate) positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
51}
52
53impl std::fmt::Debug for BinanceWs {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("BinanceWs")
56            .field("is_connected", &self.message_router.is_connected())
57            .finish_non_exhaustive()
58    }
59}
60
61impl BinanceWs {
62    /// Creates a new Binance WebSocket client
63    pub fn new(url: String) -> Self {
64        let subscription_manager = Arc::new(SubscriptionManager::new());
65        let message_router = Arc::new(MessageRouter::new(url, subscription_manager.clone(), None));
66
67        // Start the router immediately
68        let router_clone = message_router.clone();
69        tokio::spawn(async move {
70            if let Err(e) = router_clone.start(None).await {
71                tracing::error!("Failed to start MessageRouter: {}", e);
72            }
73        });
74
75        Self {
76            message_router,
77            subscription_manager,
78            listen_key: Arc::new(RwLock::new(None)),
79            listen_key_manager: None,
80            tickers: Arc::new(Mutex::new(HashMap::new())),
81            bids_asks: Arc::new(Mutex::new(HashMap::new())),
82            mark_prices: Arc::new(Mutex::new(HashMap::new())),
83            orderbooks: Arc::new(Mutex::new(HashMap::new())),
84            trades: Arc::new(Mutex::new(HashMap::new())),
85            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
86            balances: Arc::new(RwLock::new(HashMap::new())),
87            orders: Arc::new(RwLock::new(HashMap::new())),
88            my_trades: Arc::new(RwLock::new(HashMap::new())),
89            positions: Arc::new(RwLock::new(HashMap::new())),
90        }
91    }
92
93    /// Creates a WebSocket client with a listen key manager
94    pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
95        let subscription_manager = Arc::new(SubscriptionManager::new());
96        let listen_key_manager = Arc::new(ListenKeyManager::new(binance));
97        let message_router = Arc::new(MessageRouter::new(
98            url,
99            subscription_manager.clone(),
100            Some(listen_key_manager.clone()),
101        ));
102
103        // Start the router immediately
104        let router_clone = message_router.clone();
105        tokio::spawn(async move {
106            if let Err(e) = router_clone.start(None).await {
107                tracing::error!("Failed to start MessageRouter: {}", e);
108            }
109        });
110
111        Self {
112            message_router,
113            subscription_manager,
114            listen_key: Arc::new(RwLock::new(None)),
115            listen_key_manager: Some(listen_key_manager),
116            tickers: Arc::new(Mutex::new(HashMap::new())),
117            bids_asks: Arc::new(Mutex::new(HashMap::new())),
118            mark_prices: Arc::new(Mutex::new(HashMap::new())),
119            orderbooks: Arc::new(Mutex::new(HashMap::new())),
120            trades: Arc::new(Mutex::new(HashMap::new())),
121            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
122            balances: Arc::new(RwLock::new(HashMap::new())),
123            orders: Arc::new(RwLock::new(HashMap::new())),
124            my_trades: Arc::new(RwLock::new(HashMap::new())),
125            positions: Arc::new(RwLock::new(HashMap::new())),
126        }
127    }
128
129    /// Connects to the WebSocket server
130    pub async fn connect(&self) -> Result<()> {
131        if self.is_connected() {
132            return Ok(());
133        }
134
135        self.message_router.start(None).await?;
136
137        // No auto-reconnect coordinator needed as MessageRouter handles it internally.
138        // The router's loop manages connection state and retries.
139
140        Ok(())
141    }
142
143    /// Disconnects from the WebSocket server
144    pub async fn disconnect(&self) -> Result<()> {
145        self.message_router.stop().await?;
146
147        if let Some(manager) = &self.listen_key_manager {
148            manager.stop_auto_refresh().await;
149        }
150
151        Ok(())
152    }
153
154    /// Connects to the user data stream
155    pub async fn connect_user_stream(&self) -> Result<()> {
156        let manager = self.listen_key_manager.as_ref()
157            .ok_or_else(|| Error::invalid_request(
158                "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
159            ))?;
160
161        let listen_key = manager.get_or_create().await?;
162
163        let base_url = self.message_router.get_url();
164        let base_url = if let Some(stripped) = base_url.strip_suffix('/') {
165            stripped
166        } else {
167            &base_url
168        };
169
170        let url = format!("{}/{}", base_url, listen_key);
171
172        self.message_router.start(Some(url)).await?;
173        manager.start_auto_refresh().await;
174        *self.listen_key.write().await = Some(listen_key);
175
176        Ok(())
177    }
178
179    /// Closes the user data stream
180    pub async fn close_user_stream(&self) -> Result<()> {
181        if let Some(manager) = &self.listen_key_manager {
182            manager.delete().await?;
183        }
184        *self.listen_key.write().await = None;
185        Ok(())
186    }
187
188    /// Returns the active listen key, when available
189    pub async fn get_listen_key(&self) -> Option<String> {
190        if let Some(manager) = &self.listen_key_manager {
191            manager.get_current().await
192        } else {
193            self.listen_key.read().await.clone()
194        }
195    }
196
197    /// Subscribes to the ticker stream for a symbol
198    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
199        let stream = format!("{}@ticker", symbol.to_lowercase());
200        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
201
202        self.subscription_manager
203            .add_subscription(
204                stream.clone(),
205                symbol.to_string(),
206                SubscriptionType::Ticker,
207                tx,
208            )
209            .await?;
210
211        self.message_router.subscribe(vec![stream]).await
212    }
213
214    /// Subscribes to the 24-hour ticker stream for all symbols
215    pub async fn subscribe_all_tickers(&self) -> Result<()> {
216        let stream = "!ticker@arr".to_string();
217        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
218
219        self.subscription_manager
220            .add_subscription(
221                stream.clone(),
222                "all".to_string(),
223                SubscriptionType::Ticker,
224                tx,
225            )
226            .await?;
227
228        self.message_router.subscribe(vec![stream]).await
229    }
230
231    /// Subscribes to real-time trade executions for a symbol
232    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
233        let stream = format!("{}@trade", symbol.to_lowercase());
234        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
235
236        self.subscription_manager
237            .add_subscription(
238                stream.clone(),
239                symbol.to_string(),
240                SubscriptionType::Trades,
241                tx,
242            )
243            .await?;
244
245        self.message_router.subscribe(vec![stream]).await
246    }
247
248    /// Subscribes to the aggregated trade stream for a symbol
249    pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
250        let stream = format!("{}@aggTrade", symbol.to_lowercase());
251        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
252
253        self.subscription_manager
254            .add_subscription(
255                stream.clone(),
256                symbol.to_string(),
257                SubscriptionType::Trades,
258                tx,
259            )
260            .await?;
261
262        self.message_router.subscribe(vec![stream]).await
263    }
264
265    /// Subscribes to the order book depth stream
266    pub async fn subscribe_orderbook(
267        &self,
268        symbol: &str,
269        levels: u32,
270        update_speed: &str,
271    ) -> Result<()> {
272        let stream = if update_speed == "100ms" {
273            format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
274        } else {
275            format!("{}@depth{}", symbol.to_lowercase(), levels)
276        };
277        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
278
279        self.subscription_manager
280            .add_subscription(
281                stream.clone(),
282                symbol.to_string(),
283                SubscriptionType::OrderBook,
284                tx,
285            )
286            .await?;
287
288        self.message_router.subscribe(vec![stream]).await
289    }
290
291    /// Subscribes to the diff order book stream
292    pub async fn subscribe_orderbook_diff(
293        &self,
294        symbol: &str,
295        update_speed: Option<&str>,
296    ) -> Result<()> {
297        let stream = if let Some(speed) = update_speed {
298            if speed == "100ms" {
299                format!("{}@depth@100ms", symbol.to_lowercase())
300            } else {
301                format!("{}@depth", symbol.to_lowercase())
302            }
303        } else {
304            format!("{}@depth", symbol.to_lowercase())
305        };
306        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
307
308        self.subscription_manager
309            .add_subscription(
310                stream.clone(),
311                symbol.to_string(),
312                SubscriptionType::OrderBook,
313                tx,
314            )
315            .await?;
316
317        self.message_router.subscribe(vec![stream]).await
318    }
319
320    /// Subscribes to Kline (candlestick) data for a symbol
321    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
322        let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
323        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
324
325        self.subscription_manager
326            .add_subscription(
327                stream.clone(),
328                symbol.to_string(),
329                SubscriptionType::Kline(interval.to_string()),
330                tx,
331            )
332            .await?;
333
334        self.message_router.subscribe(vec![stream]).await
335    }
336
337    /// Subscribes to the mini ticker stream for a symbol
338    pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
339        let stream = format!("{}@miniTicker", symbol.to_lowercase());
340        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
341
342        self.subscription_manager
343            .add_subscription(
344                stream.clone(),
345                symbol.to_string(),
346                SubscriptionType::Ticker,
347                tx,
348            )
349            .await?;
350
351        self.message_router.subscribe(vec![stream]).await
352    }
353
354    /// Subscribes to the mini ticker stream for all symbols
355    pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
356        let stream = "!miniTicker@arr".to_string();
357        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
358
359        self.subscription_manager
360            .add_subscription(
361                stream.clone(),
362                "all".to_string(),
363                SubscriptionType::Ticker,
364                tx,
365            )
366            .await?;
367
368        self.message_router.subscribe(vec![stream]).await
369    }
370
371    /// Cancels an existing subscription
372    pub async fn unsubscribe(&self, stream: String) -> Result<()> {
373        self.subscription_manager
374            .remove_subscription(&stream)
375            .await?;
376        self.message_router.unsubscribe(vec![stream]).await
377    }
378
379    /// Receives the next available message
380    pub fn receive(&self) -> Option<Value> {
381        None
382    }
383
384    /// Indicates whether the WebSocket connection is active
385    pub fn is_connected(&self) -> bool {
386        self.message_router.is_connected()
387    }
388
389    /// Returns the current connection state.
390    pub fn state(&self) -> ccxt_core::ws_client::WsConnectionState {
391        if self.message_router.is_connected() {
392            ccxt_core::ws_client::WsConnectionState::Connected
393        } else {
394            ccxt_core::ws_client::WsConnectionState::Disconnected
395        }
396    }
397
398    /// Returns the list of active subscriptions.
399    ///
400    /// Returns a vector of subscription channel names that are currently active.
401    /// This method retrieves the actual subscriptions from the underlying WsClient's
402    /// subscription manager, providing accurate state tracking.
403    pub fn subscriptions(&self) -> Vec<String> {
404        let subs = self.subscription_manager.get_all_subscriptions_sync();
405        subs.into_iter().map(|s| s.stream).collect()
406    }
407
408    /// Returns the number of active subscriptions.
409    pub fn subscription_count(&self) -> usize {
410        self.subscription_manager.active_count()
411    }
412
413    /// Watches a single mark price stream (internal helper)
414    async fn watch_mark_price_internal(
415        &self,
416        symbol: &str,
417        channel_name: &str,
418    ) -> Result<MarkPrice> {
419        let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
420        tracing::debug!(
421            "watch_mark_price_internal: stream={}, symbol={}",
422            stream,
423            symbol
424        );
425
426        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
427        self.subscription_manager
428            .add_subscription(
429                stream.clone(),
430                symbol.to_string(),
431                SubscriptionType::MarkPrice,
432                tx,
433            )
434            .await?;
435
436        self.message_router.subscribe(vec![stream.clone()]).await?;
437
438        loop {
439            if let Some(message) = rx.recv().await {
440                if message.get("result").is_some() {
441                    tracing::debug!("Received subscription result for {}", stream);
442                    continue;
443                }
444
445                tracing::debug!("Received mark price message for {}", stream);
446
447                match parser::parse_ws_mark_price(&message) {
448                    Ok(mark_price) => {
449                        let mut mark_prices = self.mark_prices.lock().await;
450                        mark_prices.insert(mark_price.symbol.clone(), mark_price.clone());
451                        return Ok(mark_price);
452                    }
453                    Err(e) => {
454                        tracing::warn!(
455                            "Failed to parse mark price message for stream {}: {:?}. Payload: {:?}",
456                            stream,
457                            e,
458                            message
459                        );
460                    }
461                }
462            } else {
463                tracing::warn!("Subscription channel closed for {}", stream);
464                return Err(Error::network("Subscription channel closed"));
465            }
466        }
467    }
468
469    /// Watches multiple mark price streams (internal helper)
470    async fn watch_mark_prices_internal(
471        &self,
472        symbols: Option<Vec<String>>,
473        channel_name: &str,
474    ) -> Result<HashMap<String, MarkPrice>> {
475        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
476
477        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
478            let mut streams = Vec::with_capacity(syms.len());
479            for sym in syms {
480                let symbol = sym.to_lowercase();
481                let stream = format!("{}@{}", symbol, channel_name);
482                self.subscription_manager
483                    .add_subscription(
484                        stream.clone(),
485                        symbol,
486                        SubscriptionType::MarkPrice,
487                        tx.clone(),
488                    )
489                    .await?;
490                streams.push(stream);
491            }
492            streams
493        } else {
494            let stream = format!("!{}@arr", channel_name);
495            self.subscription_manager
496                .add_subscription(
497                    stream.clone(),
498                    "all".to_string(),
499                    SubscriptionType::MarkPrice,
500                    tx.clone(),
501                )
502                .await?;
503            vec![stream]
504        };
505
506        self.message_router.subscribe(streams.clone()).await?;
507
508        let mut result = HashMap::new();
509
510        loop {
511            if let Some(message) = rx.recv().await {
512                if message.get("result").is_some() {
513                    continue;
514                }
515
516                if let Some(arr) = message.as_array() {
517                    for item in arr {
518                        if let Ok(mark_price) = parser::parse_ws_mark_price(item) {
519                            let symbol = mark_price.symbol.clone();
520
521                            if let Some(syms) = &symbols {
522                                if syms.contains(&symbol.to_lowercase()) {
523                                    result.insert(symbol.clone(), mark_price.clone());
524                                }
525                            } else {
526                                result.insert(symbol.clone(), mark_price.clone());
527                            }
528
529                            let mut mark_prices = self.mark_prices.lock().await;
530                            mark_prices.insert(symbol, mark_price);
531                        } else {
532                            tracing::warn!("Failed to parse item in mark price array: {:?}", item);
533                        }
534                    }
535
536                    if let Some(syms) = &symbols {
537                        if result.len() >= syms.len() {
538                            return Ok(result);
539                        }
540                    } else {
541                        // For array updates without specific symbols filter, return what we got
542                        return Ok(result);
543                    }
544                } else {
545                    match parser::parse_ws_mark_price(&message) {
546                        Ok(mark_price) => {
547                            let symbol = mark_price.symbol.clone();
548                            result.insert(symbol.clone(), mark_price.clone());
549
550                            let mut mark_prices = self.mark_prices.lock().await;
551                            mark_prices.insert(symbol, mark_price);
552
553                            if let Some(syms) = &symbols {
554                                if result.len() >= syms.len() {
555                                    return Ok(result);
556                                }
557                            }
558                        }
559                        Err(e) => {
560                            tracing::warn!(
561                                "Failed to parse mark price message: {:?}. Payload: {:?}",
562                                e,
563                                message
564                            );
565                        }
566                    }
567                }
568            } else {
569                return Err(Error::network("Subscription channel closed"));
570            }
571        }
572    }
573
574    /// Watches a single ticker stream (internal helper)
575    async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
576        let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
577
578        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
579        self.subscription_manager
580            .add_subscription(
581                stream.clone(),
582                symbol.to_string(),
583                SubscriptionType::Ticker,
584                tx,
585            )
586            .await?;
587
588        self.message_router.subscribe(vec![stream.clone()]).await?;
589
590        loop {
591            if let Some(message) = rx.recv().await {
592                if message.get("result").is_some() {
593                    continue;
594                }
595
596                match parser::parse_ws_ticker(&message, None) {
597                    Ok(ticker) => {
598                        let mut tickers = self.tickers.lock().await;
599                        tickers.insert(ticker.symbol.clone(), ticker.clone());
600                        return Ok(ticker);
601                    }
602                    Err(e) => {
603                        tracing::warn!(
604                            "Failed to parse ticker message for stream {}: {:?}. Payload: {:?}",
605                            stream,
606                            e,
607                            message
608                        );
609                        // Continue waiting for the next message instead of hanging silently
610                    }
611                }
612            } else {
613                return Err(Error::network("Subscription channel closed"));
614            }
615        }
616    }
617
618    /// Watches multiple ticker streams (internal helper)
619    async fn watch_tickers_internal(
620        &self,
621        symbols: Option<Vec<String>>,
622        channel_name: &str,
623    ) -> Result<HashMap<String, Ticker>> {
624        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
625            syms.iter()
626                .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
627                .collect()
628        } else {
629            vec![format!("!{}@arr", channel_name)]
630        };
631
632        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
633
634        for stream in &streams {
635            self.subscription_manager
636                .add_subscription(
637                    stream.clone(),
638                    "all".to_string(),
639                    SubscriptionType::Ticker,
640                    tx.clone(),
641                )
642                .await?;
643        }
644
645        self.message_router.subscribe(streams.clone()).await?;
646
647        let mut result = HashMap::new();
648
649        loop {
650            if let Some(message) = rx.recv().await {
651                if message.get("result").is_some() {
652                    continue;
653                }
654
655                if let Some(arr) = message.as_array() {
656                    for item in arr {
657                        if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
658                            let symbol = ticker.symbol.clone();
659
660                            if let Some(syms) = &symbols {
661                                if syms.contains(&symbol.to_lowercase()) {
662                                    result.insert(symbol.clone(), ticker.clone());
663                                }
664                            } else {
665                                result.insert(symbol.clone(), ticker.clone());
666                            }
667
668                            let mut tickers = self.tickers.lock().await;
669                            tickers.insert(symbol, ticker);
670                        } else {
671                            tracing::warn!("Failed to parse item in ticker array: {:?}", item);
672                        }
673                    }
674
675                    if let Some(syms) = &symbols {
676                        if result.len() >= syms.len() {
677                            return Ok(result);
678                        }
679                    } else {
680                        // If we received an array but we are not waiting for specific symbols,
681                        // we can assume we got the update for "all" markets.
682                        // However, !ticker@arr returns ALL tickers in one message usually.
683                        return Ok(result);
684                    }
685                } else {
686                    match parser::parse_ws_ticker(&message, None) {
687                        Ok(ticker) => {
688                            let symbol = ticker.symbol.clone();
689                            result.insert(symbol.clone(), ticker.clone());
690
691                            let mut tickers = self.tickers.lock().await;
692                            tickers.insert(symbol, ticker);
693
694                            if let Some(syms) = &symbols {
695                                if result.len() >= syms.len() {
696                                    return Ok(result);
697                                }
698                            }
699                        }
700                        Err(e) => {
701                            tracing::warn!(
702                                "Failed to parse ticker message: {:?}. Payload: {:?}",
703                                e,
704                                message
705                            );
706                        }
707                    }
708                }
709            } else {
710                return Err(Error::network("Subscription channel closed"));
711            }
712        }
713    }
714
715    /// Processes an order book delta update (internal helper)
716    async fn handle_orderbook_delta(
717        &self,
718        symbol: &str,
719        delta_message: &Value,
720        is_futures: bool,
721    ) -> Result<()> {
722        handlers::handle_orderbook_delta(symbol, delta_message, is_futures, &self.orderbooks).await
723    }
724
725    /// Retrieves an order book snapshot and initializes cached state (internal helper)
726    async fn fetch_orderbook_snapshot(
727        &self,
728        exchange: &Binance,
729        symbol: &str,
730        limit: Option<i64>,
731        is_futures: bool,
732    ) -> Result<OrderBook> {
733        handlers::fetch_orderbook_snapshot(exchange, symbol, limit, is_futures, &self.orderbooks)
734            .await
735    }
736
737    /// Watches a single order book stream (internal helper)
738    async fn watch_orderbook_internal(
739        &self,
740        exchange: &Binance,
741        symbol: &str,
742        limit: Option<i64>,
743        update_speed: i32,
744        is_futures: bool,
745    ) -> Result<OrderBook> {
746        let stream = if update_speed == 100 {
747            format!("{}@depth@100ms", symbol.to_lowercase())
748        } else {
749            format!("{}@depth", symbol.to_lowercase())
750        };
751
752        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
753        self.subscription_manager
754            .add_subscription(
755                stream.clone(),
756                symbol.to_string(),
757                SubscriptionType::OrderBook,
758                tx,
759            )
760            .await?;
761
762        self.message_router.subscribe(vec![stream.clone()]).await?;
763
764        tokio::time::sleep(Duration::from_millis(500)).await;
765
766        let _snapshot = self
767            .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
768            .await?;
769
770        loop {
771            if let Some(message) = rx.recv().await {
772                if message.get("result").is_some() {
773                    continue;
774                }
775
776                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
777                    if event_type == "depthUpdate" {
778                        match self
779                            .handle_orderbook_delta(symbol, &message, is_futures)
780                            .await
781                        {
782                            Ok(()) => {
783                                let orderbooks = self.orderbooks.lock().await;
784                                if let Some(ob) = orderbooks.get(symbol) {
785                                    if ob.is_synced {
786                                        return Ok(ob.clone());
787                                    }
788                                }
789                            }
790                            Err(e) => {
791                                let err_msg = e.to_string();
792
793                                if err_msg.contains("RESYNC_NEEDED") {
794                                    tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
795
796                                    let current_time = chrono::Utc::now().timestamp_millis();
797                                    let should_resync = {
798                                        let orderbooks = self.orderbooks.lock().await;
799                                        if let Some(ob) = orderbooks.get(symbol) {
800                                            ob.should_resync(current_time)
801                                        } else {
802                                            true
803                                        }
804                                    };
805
806                                    if should_resync {
807                                        tracing::info!("Initiating resync for {}", symbol);
808
809                                        {
810                                            let mut orderbooks = self.orderbooks.lock().await;
811                                            if let Some(ob) = orderbooks.get_mut(symbol) {
812                                                ob.reset_for_resync();
813                                                ob.mark_resync_initiated(current_time);
814                                            }
815                                        }
816
817                                        tokio::time::sleep(Duration::from_millis(500)).await;
818
819                                        match self
820                                            .fetch_orderbook_snapshot(
821                                                exchange, symbol, limit, is_futures,
822                                            )
823                                            .await
824                                        {
825                                            Ok(_) => {
826                                                tracing::info!(
827                                                    "Resync completed successfully for {}",
828                                                    symbol
829                                                );
830                                                continue;
831                                            }
832                                            Err(resync_err) => {
833                                                tracing::error!(
834                                                    "Resync failed for {}: {}",
835                                                    symbol,
836                                                    resync_err
837                                                );
838                                                return Err(resync_err);
839                                            }
840                                        }
841                                    }
842                                    tracing::debug!("Resync rate limited for {}, skipping", symbol);
843                                }
844                                tracing::error!("Failed to handle orderbook delta: {}", err_msg);
845                            }
846                        }
847                    }
848                }
849            } else {
850                return Err(Error::network("Subscription channel closed"));
851            }
852        }
853    }
854
855    /// Watches multiple order book streams (internal helper)
856    async fn watch_orderbooks_internal(
857        &self,
858        exchange: &Binance,
859        symbols: Vec<String>,
860        limit: Option<i64>,
861        update_speed: i32,
862        is_futures: bool,
863    ) -> Result<HashMap<String, OrderBook>> {
864        if symbols.len() > 200 {
865            return Err(Error::invalid_request(
866                "Binance supports max 200 symbols per connection",
867            ));
868        }
869
870        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
871        let mut streams = Vec::new();
872
873        for symbol in &symbols {
874            let stream = if update_speed == 100 {
875                format!("{}@depth@100ms", symbol.to_lowercase())
876            } else {
877                format!("{}@depth", symbol.to_lowercase())
878            };
879
880            streams.push(stream.clone());
881
882            self.subscription_manager
883                .add_subscription(
884                    stream,
885                    symbol.clone(),
886                    SubscriptionType::OrderBook,
887                    tx.clone(),
888                )
889                .await?;
890        }
891
892        self.message_router.subscribe(streams).await?;
893
894        tokio::time::sleep(Duration::from_millis(500)).await;
895
896        for symbol in &symbols {
897            let _ = self
898                .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
899                .await;
900        }
901
902        let mut result = HashMap::new();
903        let mut synced_symbols = std::collections::HashSet::new();
904
905        while synced_symbols.len() < symbols.len() {
906            if let Some(message) = rx.recv().await {
907                if message.get("result").is_some() {
908                    continue;
909                }
910
911                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
912                    if event_type == "depthUpdate" {
913                        if let Some(msg_symbol) =
914                            message.get("s").and_then(serde_json::Value::as_str)
915                        {
916                            if let Err(e) = self
917                                .handle_orderbook_delta(msg_symbol, &message, is_futures)
918                                .await
919                            {
920                                tracing::error!("Failed to handle orderbook delta: {}", e);
921                                continue;
922                            }
923
924                            let orderbooks = self.orderbooks.lock().await;
925                            if let Some(ob) = orderbooks.get(msg_symbol) {
926                                if ob.is_synced {
927                                    synced_symbols.insert(msg_symbol.to_string());
928                                }
929                            }
930                        }
931                    }
932                }
933            } else {
934                return Err(Error::network("Subscription channel closed"));
935            }
936        }
937
938        let orderbooks = self.orderbooks.lock().await;
939        for symbol in &symbols {
940            if let Some(ob) = orderbooks.get(symbol) {
941                result.insert(symbol.clone(), ob.clone());
942            }
943        }
944
945        Ok(result)
946    }
947
948    /// Watches the best bid/ask data for a unified symbol (internal helper)
949    async fn watch_bids_asks_internal(&self, symbol: &str, market_id: &str) -> Result<BidAsk> {
950        let stream = format!("{}@bookTicker", market_id.to_lowercase());
951        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
952
953        self.subscription_manager
954            .add_subscription(
955                stream.clone(),
956                symbol.to_string(),
957                SubscriptionType::BookTicker,
958                tx,
959            )
960            .await?;
961
962        self.message_router.subscribe(vec![stream.clone()]).await?;
963
964        loop {
965            if let Some(message) = rx.recv().await {
966                if message.get("result").is_some() {
967                    continue;
968                }
969
970                if let Ok(bid_ask) = parser::parse_ws_bid_ask(&message) {
971                    let mut bids_asks_map = self.bids_asks.lock().await;
972                    bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
973                    return Ok(bid_ask);
974                }
975            } else {
976                return Err(Error::network("Subscription channel closed"));
977            }
978        }
979    }
980
981    /// Streams trade data for a unified symbol (internal helper)
982    async fn watch_trades_internal(
983        &self,
984        symbol: &str,
985        market_id: &str,
986        since: Option<i64>,
987        limit: Option<usize>,
988        market: Option<&ccxt_core::types::Market>,
989    ) -> Result<Vec<Trade>> {
990        let stream = format!("{}@trade", market_id.to_lowercase());
991        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
992
993        self.subscription_manager
994            .add_subscription(
995                stream.clone(),
996                symbol.to_string(),
997                SubscriptionType::Trades,
998                tx,
999            )
1000            .await?;
1001
1002        self.message_router.subscribe(vec![stream.clone()]).await?;
1003
1004        // Wait for at least one trade or use a loop with timeout if we want to mimic "polling" until data arrives?
1005        // Usually watch_trades returns the latest trades.
1006
1007        loop {
1008            if let Some(message) = rx.recv().await {
1009                if message.get("result").is_some() {
1010                    continue;
1011                }
1012
1013                if let Ok(trade) = parser::parse_ws_trade(&message, market) {
1014                    let mut trades_map = self.trades.lock().await;
1015                    let trades = trades_map
1016                        .entry(symbol.to_string())
1017                        .or_insert_with(VecDeque::new);
1018
1019                    if trades.len() >= MAX_TRADES {
1020                        trades.pop_front();
1021                    }
1022                    trades.push_back(trade);
1023
1024                    let mut result: Vec<Trade> = trades.iter().cloned().collect();
1025
1026                    if let Some(since_ts) = since {
1027                        result.retain(|t| t.timestamp >= since_ts);
1028                    }
1029
1030                    if let Some(limit_size) = limit {
1031                        if result.len() > limit_size {
1032                            result = result.split_off(result.len() - limit_size);
1033                        }
1034                    }
1035
1036                    return Ok(result);
1037                }
1038            } else {
1039                return Err(Error::network("Subscription channel closed"));
1040            }
1041        }
1042    }
1043
1044    /// Streams OHLCV data for a unified symbol (internal helper)
1045    async fn watch_ohlcv_internal(
1046        &self,
1047        symbol: &str,
1048        market_id: &str,
1049        timeframe: &str,
1050        since: Option<i64>,
1051        limit: Option<usize>,
1052    ) -> Result<Vec<OHLCV>> {
1053        let stream = format!("{}@kline_{}", market_id.to_lowercase(), timeframe);
1054        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1055
1056        self.subscription_manager
1057            .add_subscription(
1058                stream.clone(),
1059                symbol.to_string(),
1060                SubscriptionType::Kline(timeframe.to_string()),
1061                tx,
1062            )
1063            .await?;
1064
1065        self.message_router.subscribe(vec![stream.clone()]).await?;
1066
1067        loop {
1068            if let Some(message) = rx.recv().await {
1069                if message.get("result").is_some() {
1070                    continue;
1071                }
1072
1073                if let Ok(ohlcv) = parser::parse_ws_ohlcv(&message) {
1074                    let cache_key = format!("{}:{}", symbol, timeframe);
1075                    let mut ohlcvs_map = self.ohlcvs.lock().await;
1076                    let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
1077
1078                    if ohlcvs.len() >= MAX_OHLCVS {
1079                        ohlcvs.pop_front();
1080                    }
1081                    ohlcvs.push_back(ohlcv);
1082
1083                    let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
1084
1085                    if let Some(since_ts) = since {
1086                        result.retain(|o| o.timestamp >= since_ts);
1087                    }
1088
1089                    if let Some(limit_size) = limit {
1090                        if result.len() > limit_size {
1091                            result = result.split_off(result.len() - limit_size);
1092                        }
1093                    }
1094
1095                    return Ok(result);
1096                }
1097            } else {
1098                return Err(Error::network("Subscription channel closed"));
1099            }
1100        }
1101    }
1102
1103    /// Returns cached ticker snapshot
1104    pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1105        let tickers = self.tickers.lock().await;
1106        tickers.get(symbol).cloned()
1107    }
1108
1109    /// Returns all cached ticker snapshots
1110    pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1111        let tickers = self.tickers.lock().await;
1112        tickers.clone()
1113    }
1114
1115    /// Handles balance update messages (internal helper)
1116    async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1117        user_data::handle_balance_message(message, account_type, &self.balances).await
1118    }
1119
1120    /// Watches for balance updates (internal helper)
1121    async fn watch_balance_internal(&self, account_type: &str) -> Result<Balance> {
1122        self.connect_user_stream().await?;
1123
1124        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1125
1126        self.subscription_manager
1127            .add_subscription(
1128                "!userData".to_string(),
1129                "user".to_string(),
1130                SubscriptionType::Balance,
1131                tx,
1132            )
1133            .await?;
1134
1135        loop {
1136            if let Some(message) = rx.recv().await {
1137                if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
1138                    if matches!(
1139                        event_type,
1140                        "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE"
1141                    ) {
1142                        if let Ok(()) = self.handle_balance_message(&message, account_type).await {
1143                            let balances = self.balances.read().await;
1144                            if let Some(balance) = balances.get(account_type) {
1145                                return Ok(balance.clone());
1146                            }
1147                        }
1148                    }
1149                }
1150            } else {
1151                return Err(Error::network("Subscription channel closed"));
1152            }
1153        }
1154    }
1155
1156    /// Watches for order updates (internal helper)
1157    async fn watch_orders_internal(
1158        &self,
1159        symbol: Option<&str>,
1160        since: Option<i64>,
1161        limit: Option<usize>,
1162    ) -> Result<Vec<Order>> {
1163        self.connect_user_stream().await?;
1164
1165        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1166
1167        self.subscription_manager
1168            .add_subscription(
1169                "!userData".to_string(),
1170                "user".to_string(),
1171                SubscriptionType::Orders,
1172                tx,
1173            )
1174            .await?;
1175
1176        loop {
1177            if let Some(message) = rx.recv().await {
1178                if let Value::Object(data) = message {
1179                    if let Some(event_type) = data.get("e").and_then(serde_json::Value::as_str) {
1180                        if event_type == "executionReport" {
1181                            let order = user_data::parse_ws_order(&data);
1182
1183                            let mut orders = self.orders.write().await;
1184                            let symbol_orders = orders
1185                                .entry(order.symbol.clone())
1186                                .or_insert_with(HashMap::new);
1187                            symbol_orders.insert(order.id.clone(), order.clone());
1188                            drop(orders);
1189
1190                            if let Some(exec_type) =
1191                                data.get("x").and_then(serde_json::Value::as_str)
1192                            {
1193                                if exec_type == "TRADE" {
1194                                    if let Ok(trade) =
1195                                        BinanceWs::parse_ws_trade(&Value::Object(data.clone()))
1196                                    {
1197                                        let mut trades = self.my_trades.write().await;
1198                                        let symbol_trades = trades
1199                                            .entry(trade.symbol.clone())
1200                                            .or_insert_with(VecDeque::new);
1201
1202                                        symbol_trades.push_front(trade);
1203                                        if symbol_trades.len() > 1000 {
1204                                            symbol_trades.pop_back();
1205                                        }
1206                                    }
1207                                }
1208                            }
1209
1210                            return self.filter_orders(symbol, since, limit).await;
1211                        }
1212                    }
1213                }
1214            } else {
1215                return Err(Error::network("Subscription channel closed"));
1216            }
1217        }
1218    }
1219
1220    /// Watches for my trades (internal helper)
1221    async fn watch_my_trades_internal(
1222        &self,
1223        symbol: Option<&str>,
1224        since: Option<i64>,
1225        limit: Option<usize>,
1226    ) -> Result<Vec<Trade>> {
1227        self.connect_user_stream().await?;
1228
1229        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1230
1231        self.subscription_manager
1232            .add_subscription(
1233                "!userData".to_string(),
1234                "user".to_string(),
1235                SubscriptionType::MyTrades,
1236                tx,
1237            )
1238            .await?;
1239
1240        loop {
1241            if let Some(msg) = rx.recv().await {
1242                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1243                    if event_type == "executionReport" {
1244                        if let Ok(trade) = BinanceWs::parse_ws_trade(&msg) {
1245                            let symbol_key = trade.symbol.clone();
1246
1247                            let mut trades_map = self.my_trades.write().await;
1248                            let symbol_trades =
1249                                trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
1250
1251                            symbol_trades.push_front(trade);
1252                            if symbol_trades.len() > 1000 {
1253                                symbol_trades.pop_back();
1254                            }
1255
1256                            drop(trades_map);
1257                            return self.filter_my_trades(symbol, since, limit).await;
1258                        }
1259                    }
1260                }
1261            } else {
1262                return Err(Error::network("Subscription channel closed"));
1263            }
1264        }
1265    }
1266
1267    /// Watches for positions (internal helper)
1268    async fn watch_positions_internal(
1269        &self,
1270        symbols: Option<Vec<String>>,
1271        since: Option<i64>,
1272        limit: Option<usize>,
1273    ) -> Result<Vec<Position>> {
1274        self.connect_user_stream().await?;
1275
1276        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1277
1278        self.subscription_manager
1279            .add_subscription(
1280                "!userData".to_string(),
1281                "user".to_string(),
1282                SubscriptionType::Positions,
1283                tx,
1284            )
1285            .await?;
1286
1287        loop {
1288            if let Some(msg) = rx.recv().await {
1289                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1290                    if event_type == "ACCOUNT_UPDATE" {
1291                        if let Some(account_data) = msg.get("a") {
1292                            if let Some(positions_array) =
1293                                account_data.get("P").and_then(|p| p.as_array())
1294                            {
1295                                for position_data in positions_array {
1296                                    if let Ok(position) =
1297                                        BinanceWs::parse_ws_position(position_data)
1298                                    {
1299                                        let symbol_key = position.symbol.clone();
1300                                        let side_key = position
1301                                            .side
1302                                            .clone()
1303                                            .unwrap_or_else(|| "both".to_string());
1304
1305                                        let mut positions_map = self.positions.write().await;
1306                                        let symbol_positions = positions_map
1307                                            .entry(symbol_key)
1308                                            .or_insert_with(HashMap::new);
1309
1310                                        if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
1311                                            symbol_positions.remove(&side_key);
1312                                            if symbol_positions.is_empty() {
1313                                                positions_map.remove(&position.symbol);
1314                                            }
1315                                        } else {
1316                                            symbol_positions.insert(side_key, position);
1317                                        }
1318                                    }
1319                                }
1320
1321                                let symbols_ref = symbols.as_deref();
1322                                return self.filter_positions(symbols_ref, since, limit).await;
1323                            }
1324                        }
1325                    }
1326                }
1327            } else {
1328                return Err(Error::network("Subscription channel closed"));
1329            }
1330        }
1331    }
1332
1333    /// Filters cached orders by symbol, time range, and limit
1334    async fn filter_orders(
1335        &self,
1336        symbol: Option<&str>,
1337        since: Option<i64>,
1338        limit: Option<usize>,
1339    ) -> Result<Vec<Order>> {
1340        let orders_map = self.orders.read().await;
1341
1342        let mut orders: Vec<Order> = if let Some(sym) = symbol {
1343            orders_map
1344                .get(sym)
1345                .map(|symbol_orders| symbol_orders.values().cloned().collect())
1346                .unwrap_or_default()
1347        } else {
1348            orders_map
1349                .values()
1350                .flat_map(|symbol_orders| symbol_orders.values().cloned())
1351                .collect()
1352        };
1353
1354        if let Some(since_ts) = since {
1355            orders.retain(|order| order.timestamp.is_some_and(|ts| ts >= since_ts));
1356        }
1357
1358        orders.sort_by(|a, b| {
1359            let ts_a = a.timestamp.unwrap_or(0);
1360            let ts_b = b.timestamp.unwrap_or(0);
1361            ts_b.cmp(&ts_a)
1362        });
1363
1364        if let Some(lim) = limit {
1365            orders.truncate(lim);
1366        }
1367
1368        Ok(orders)
1369    }
1370
1371    /// Parses a WebSocket trade message
1372    fn parse_ws_trade(data: &Value) -> Result<Trade> {
1373        user_data::parse_ws_trade(data)
1374    }
1375
1376    /// Filters cached personal trades by symbol, time range, and limit
1377    async fn filter_my_trades(
1378        &self,
1379        symbol: Option<&str>,
1380        since: Option<i64>,
1381        limit: Option<usize>,
1382    ) -> Result<Vec<Trade>> {
1383        let trades_map = self.my_trades.read().await;
1384
1385        let mut trades: Vec<Trade> = if let Some(sym) = symbol {
1386            trades_map
1387                .get(sym)
1388                .map(|symbol_trades| symbol_trades.iter().cloned().collect())
1389                .unwrap_or_default()
1390        } else {
1391            trades_map
1392                .values()
1393                .flat_map(|symbol_trades| symbol_trades.iter().cloned())
1394                .collect()
1395        };
1396
1397        if let Some(since_ts) = since {
1398            trades.retain(|trade| trade.timestamp >= since_ts);
1399        }
1400
1401        trades.sort_by(|a, b| {
1402            let ts_a = a.timestamp;
1403            let ts_b = b.timestamp;
1404            ts_b.cmp(&ts_a)
1405        });
1406
1407        if let Some(lim) = limit {
1408            trades.truncate(lim);
1409        }
1410
1411        Ok(trades)
1412    }
1413
1414    /// Parses a WebSocket position payload
1415    fn parse_ws_position(data: &Value) -> Result<Position> {
1416        user_data::parse_ws_position(data)
1417    }
1418
1419    /// Filters cached positions by symbol, time range, and limit
1420    async fn filter_positions(
1421        &self,
1422        symbols: Option<&[String]>,
1423        since: Option<i64>,
1424        limit: Option<usize>,
1425    ) -> Result<Vec<Position>> {
1426        let positions_map = self.positions.read().await;
1427
1428        let mut positions: Vec<Position> = if let Some(syms) = symbols {
1429            syms.iter()
1430                .filter_map(|sym| positions_map.get(sym))
1431                .flat_map(|side_map| side_map.values().cloned())
1432                .collect()
1433        } else {
1434            positions_map
1435                .values()
1436                .flat_map(|side_map| side_map.values().cloned())
1437                .collect()
1438        };
1439
1440        if let Some(since_ts) = since {
1441            positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
1442        }
1443
1444        positions.sort_by(|a, b| {
1445            let ts_a = a.timestamp.unwrap_or(0);
1446            let ts_b = b.timestamp.unwrap_or(0);
1447            ts_b.cmp(&ts_a)
1448        });
1449
1450        if let Some(lim) = limit {
1451            positions.truncate(lim);
1452        }
1453
1454        Ok(positions)
1455    }
1456}
1457
1458// Include Binance impl methods in a separate file to keep mod.rs manageable
1459include!("binance_impl.rs");
1460
1461#[cfg(test)]
1462#[allow(clippy::disallowed_methods)]
1463mod tests {
1464    use super::*;
1465
1466    #[tokio::test]
1467    async fn test_binance_ws_creation() {
1468        let ws = BinanceWs::new(WS_BASE_URL.to_string());
1469        assert!(ws.listen_key.try_read().is_ok());
1470    }
1471
1472    #[test]
1473    fn test_stream_format() {
1474        let symbol = "btcusdt";
1475
1476        let ticker_stream = format!("{}@ticker", symbol);
1477        assert_eq!(ticker_stream, "btcusdt@ticker");
1478
1479        let trade_stream = format!("{}@trade", symbol);
1480        assert_eq!(trade_stream, "btcusdt@trade");
1481
1482        let depth_stream = format!("{}@depth20", symbol);
1483        assert_eq!(depth_stream, "btcusdt@depth20");
1484
1485        let kline_stream = format!("{}@kline_1m", symbol);
1486        assert_eq!(kline_stream, "btcusdt@kline_1m");
1487    }
1488
1489    #[tokio::test]
1490    async fn test_subscription_manager_basic() {
1491        let manager = SubscriptionManager::new();
1492        let (tx, _rx) = tokio::sync::mpsc::channel(1024);
1493
1494        assert_eq!(manager.active_count(), 0);
1495        assert!(!manager.has_subscription("btcusdt@ticker").await);
1496
1497        manager
1498            .add_subscription(
1499                "btcusdt@ticker".to_string(),
1500                "BTCUSDT".to_string(),
1501                SubscriptionType::Ticker,
1502                tx.clone(),
1503            )
1504            .await
1505            .unwrap();
1506
1507        assert_eq!(manager.active_count(), 1);
1508        assert!(manager.has_subscription("btcusdt@ticker").await);
1509
1510        let sub = manager.get_subscription("btcusdt@ticker").await;
1511        assert!(sub.is_some());
1512        let sub = sub.unwrap();
1513        assert_eq!(sub.stream, "btcusdt@ticker");
1514        assert_eq!(sub.symbol, "BTCUSDT");
1515        assert_eq!(sub.sub_type, SubscriptionType::Ticker);
1516
1517        manager.remove_subscription("btcusdt@ticker").await.unwrap();
1518        assert_eq!(manager.active_count(), 0);
1519        assert!(!manager.has_subscription("btcusdt@ticker").await);
1520    }
1521
1522    #[test]
1523    fn test_symbol_conversion() {
1524        let symbol = "BTC/USDT";
1525        let binance_symbol = symbol.replace('/', "").to_lowercase();
1526        assert_eq!(binance_symbol, "btcusdt");
1527    }
1528}