ccxt_exchanges/binance/ws/
mod.rs

1//! Binance WebSocket implementation
2//!
3//! Provides WebSocket real-time data stream subscriptions for the Binance exchange
4
5mod handlers;
6mod listen_key;
7mod streams;
8mod subscriptions;
9mod user_data;
10
11// Re-export public types for backward compatibility
12pub use handlers::MessageRouter;
13pub use listen_key::ListenKeyManager;
14pub use streams::*;
15pub use subscriptions::{ReconnectConfig, Subscription, SubscriptionManager, SubscriptionType};
16
17use crate::binance::{Binance, parser};
18use ccxt_core::error::{Error, Result};
19use ccxt_core::types::{
20    Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
21};
22use ccxt_core::ws_client::{WsClient, WsConfig};
23use serde_json::Value;
24use std::collections::{HashMap, VecDeque};
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::{Mutex, RwLock};
28
29/// Binance WebSocket client wrapper
30pub struct BinanceWs {
31    pub(crate) client: Arc<WsClient>,
32    listen_key: Arc<RwLock<Option<String>>>,
33    listen_key_manager: Option<Arc<ListenKeyManager>>,
34    auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
35    pub(crate) tickers: Arc<Mutex<HashMap<String, Ticker>>>,
36    pub(crate) bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
37    #[allow(dead_code)]
38    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
39    pub(crate) orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
40    pub(crate) trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
41    pub(crate) ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
42    pub(crate) balances: Arc<RwLock<HashMap<String, Balance>>>,
43    pub(crate) orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
44    pub(crate) my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
45    pub(crate) positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
46}
47
48impl std::fmt::Debug for BinanceWs {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("BinanceWs")
51            .field("is_connected", &self.client.is_connected())
52            .field("state", &self.client.state())
53            .finish_non_exhaustive()
54    }
55}
56
57impl BinanceWs {
58    /// Creates a new Binance WebSocket client
59    pub fn new(url: String) -> Self {
60        let config = WsConfig {
61            url,
62            connect_timeout: 10000,
63            ping_interval: 180000,
64            reconnect_interval: 5000,
65            max_reconnect_attempts: 5,
66            auto_reconnect: true,
67            enable_compression: false,
68            pong_timeout: 90000,
69            ..Default::default()
70        };
71
72        Self {
73            client: Arc::new(WsClient::new(config)),
74            listen_key: Arc::new(RwLock::new(None)),
75            listen_key_manager: None,
76            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
77            tickers: Arc::new(Mutex::new(HashMap::new())),
78            bids_asks: Arc::new(Mutex::new(HashMap::new())),
79            mark_prices: Arc::new(Mutex::new(HashMap::new())),
80            orderbooks: Arc::new(Mutex::new(HashMap::new())),
81            trades: Arc::new(Mutex::new(HashMap::new())),
82            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
83            balances: Arc::new(RwLock::new(HashMap::new())),
84            orders: Arc::new(RwLock::new(HashMap::new())),
85            my_trades: Arc::new(RwLock::new(HashMap::new())),
86            positions: Arc::new(RwLock::new(HashMap::new())),
87        }
88    }
89
90    /// Creates a WebSocket client with a listen key manager
91    pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
92        let config = WsConfig {
93            url,
94            connect_timeout: 10000,
95            ping_interval: 180000,
96            reconnect_interval: 5000,
97            max_reconnect_attempts: 5,
98            auto_reconnect: true,
99            enable_compression: false,
100            pong_timeout: 90000,
101            ..Default::default()
102        };
103
104        Self {
105            client: Arc::new(WsClient::new(config)),
106            listen_key: Arc::new(RwLock::new(None)),
107            listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
108            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
109            tickers: Arc::new(Mutex::new(HashMap::new())),
110            bids_asks: Arc::new(Mutex::new(HashMap::new())),
111            mark_prices: Arc::new(Mutex::new(HashMap::new())),
112            orderbooks: Arc::new(Mutex::new(HashMap::new())),
113            trades: Arc::new(Mutex::new(HashMap::new())),
114            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
115            balances: Arc::new(RwLock::new(HashMap::new())),
116            orders: Arc::new(RwLock::new(HashMap::new())),
117            my_trades: Arc::new(RwLock::new(HashMap::new())),
118            positions: Arc::new(RwLock::new(HashMap::new())),
119        }
120    }
121
122    /// Connects to the WebSocket server
123    pub async fn connect(&self) -> Result<()> {
124        self.client.connect().await?;
125
126        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
127        if coordinator_guard.is_none() {
128            let coordinator = self.client.clone().create_auto_reconnect_coordinator();
129            coordinator.start().await;
130            *coordinator_guard = Some(coordinator);
131            tracing::info!("Auto-reconnect coordinator started");
132        }
133
134        Ok(())
135    }
136
137    /// Disconnects from the WebSocket server
138    pub async fn disconnect(&self) -> Result<()> {
139        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
140        if let Some(coordinator) = coordinator_guard.take() {
141            coordinator.stop().await;
142            tracing::info!("Auto-reconnect coordinator stopped");
143        }
144
145        if let Some(manager) = &self.listen_key_manager {
146            manager.stop_auto_refresh().await;
147        }
148
149        self.client.disconnect().await
150    }
151
152    /// Connects to the user data stream
153    pub async fn connect_user_stream(&self) -> Result<()> {
154        let manager = self.listen_key_manager.as_ref()
155            .ok_or_else(|| Error::invalid_request(
156                "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
157            ))?;
158
159        let listen_key = manager.get_or_create().await?;
160        let _user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);
161
162        self.client.connect().await?;
163        manager.start_auto_refresh().await;
164        *self.listen_key.write().await = Some(listen_key);
165
166        Ok(())
167    }
168
169    /// Closes the user data stream
170    pub async fn close_user_stream(&self) -> Result<()> {
171        if let Some(manager) = &self.listen_key_manager {
172            manager.delete().await?;
173        }
174        *self.listen_key.write().await = None;
175        Ok(())
176    }
177
178    /// Returns the active listen key, when available
179    pub async fn get_listen_key(&self) -> Option<String> {
180        if let Some(manager) = &self.listen_key_manager {
181            manager.get_current().await
182        } else {
183            self.listen_key.read().await.clone()
184        }
185    }
186
187    /// Subscribes to the ticker stream for a symbol
188    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
189        let stream = format!("{}@ticker", symbol.to_lowercase());
190        self.client
191            .subscribe(stream, Some(symbol.to_string()), None)
192            .await
193    }
194
195    /// Subscribes to the 24-hour ticker stream for all symbols
196    pub async fn subscribe_all_tickers(&self) -> Result<()> {
197        self.client
198            .subscribe("!ticker@arr".to_string(), None, None)
199            .await
200    }
201
202    /// Subscribes to real-time trade executions for a symbol
203    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
204        let stream = format!("{}@trade", symbol.to_lowercase());
205        self.client
206            .subscribe(stream, Some(symbol.to_string()), None)
207            .await
208    }
209
210    /// Subscribes to the aggregated trade stream for a symbol
211    pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
212        let stream = format!("{}@aggTrade", symbol.to_lowercase());
213        self.client
214            .subscribe(stream, Some(symbol.to_string()), None)
215            .await
216    }
217
218    /// Subscribes to the order book depth stream
219    pub async fn subscribe_orderbook(
220        &self,
221        symbol: &str,
222        levels: u32,
223        update_speed: &str,
224    ) -> Result<()> {
225        let stream = if update_speed == "100ms" {
226            format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
227        } else {
228            format!("{}@depth{}", symbol.to_lowercase(), levels)
229        };
230
231        self.client
232            .subscribe(stream, Some(symbol.to_string()), None)
233            .await
234    }
235
236    /// Subscribes to the diff order book stream
237    pub async fn subscribe_orderbook_diff(
238        &self,
239        symbol: &str,
240        update_speed: Option<&str>,
241    ) -> Result<()> {
242        let stream = if let Some(speed) = update_speed {
243            if speed == "100ms" {
244                format!("{}@depth@100ms", symbol.to_lowercase())
245            } else {
246                format!("{}@depth", symbol.to_lowercase())
247            }
248        } else {
249            format!("{}@depth", symbol.to_lowercase())
250        };
251
252        self.client
253            .subscribe(stream, Some(symbol.to_string()), None)
254            .await
255    }
256
257    /// Subscribes to Kline (candlestick) data for a symbol
258    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
259        let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
260        self.client
261            .subscribe(stream, Some(symbol.to_string()), None)
262            .await
263    }
264
265    /// Subscribes to the mini ticker stream for a symbol
266    pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
267        let stream = format!("{}@miniTicker", symbol.to_lowercase());
268        self.client
269            .subscribe(stream, Some(symbol.to_string()), None)
270            .await
271    }
272
273    /// Subscribes to the mini ticker stream for all symbols
274    pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
275        self.client
276            .subscribe("!miniTicker@arr".to_string(), None, None)
277            .await
278    }
279
280    /// Cancels an existing subscription
281    pub async fn unsubscribe(&self, stream: String) -> Result<()> {
282        self.client.unsubscribe(stream, None).await
283    }
284
285    /// Receives the next available message
286    pub async fn receive(&self) -> Option<Value> {
287        self.client.receive().await
288    }
289
290    /// Indicates whether the WebSocket connection is active
291    pub fn is_connected(&self) -> bool {
292        self.client.is_connected()
293    }
294
295    /// Returns the current connection state.
296    pub fn state(&self) -> ccxt_core::ws_client::WsConnectionState {
297        self.client.state()
298    }
299
300    /// Returns the list of active subscriptions.
301    ///
302    /// Returns a vector of subscription channel names that are currently active.
303    /// This method retrieves the actual subscriptions from the underlying WsClient's
304    /// subscription manager, providing accurate state tracking.
305    pub fn subscriptions(&self) -> Vec<String> {
306        self.client.subscriptions()
307    }
308
309    /// Returns the number of active subscriptions.
310    pub fn subscription_count(&self) -> usize {
311        self.client.subscription_count()
312    }
313
314    /// Watches a single ticker stream (internal helper)
315    async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
316        let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
317
318        self.client
319            .subscribe(stream.clone(), Some(symbol.to_string()), None)
320            .await?;
321
322        loop {
323            if let Some(message) = self.client.receive().await {
324                if message.get("result").is_some() {
325                    continue;
326                }
327
328                if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
329                    let mut tickers = self.tickers.lock().await;
330                    tickers.insert(ticker.symbol.clone(), ticker.clone());
331                    return Ok(ticker);
332                }
333            }
334        }
335    }
336
337    /// Watches multiple ticker streams (internal helper)
338    async fn watch_tickers_internal(
339        &self,
340        symbols: Option<Vec<String>>,
341        channel_name: &str,
342    ) -> Result<HashMap<String, Ticker>> {
343        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
344            syms.iter()
345                .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
346                .collect()
347        } else {
348            vec![format!("!{}@arr", channel_name)]
349        };
350
351        for stream in &streams {
352            self.client.subscribe(stream.clone(), None, None).await?;
353        }
354
355        let mut result = HashMap::new();
356
357        loop {
358            if let Some(message) = self.client.receive().await {
359                if message.get("result").is_some() {
360                    continue;
361                }
362
363                if let Some(arr) = message.as_array() {
364                    for item in arr {
365                        if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
366                            let symbol = ticker.symbol.clone();
367
368                            if let Some(syms) = &symbols {
369                                if syms.contains(&symbol.to_lowercase()) {
370                                    result.insert(symbol.clone(), ticker.clone());
371                                }
372                            } else {
373                                result.insert(symbol.clone(), ticker.clone());
374                            }
375
376                            let mut tickers = self.tickers.lock().await;
377                            tickers.insert(symbol, ticker);
378                        }
379                    }
380
381                    if let Some(syms) = &symbols {
382                        if result.len() == syms.len() {
383                            return Ok(result);
384                        }
385                    } else {
386                        return Ok(result);
387                    }
388                } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
389                    let symbol = ticker.symbol.clone();
390                    result.insert(symbol.clone(), ticker.clone());
391
392                    let mut tickers = self.tickers.lock().await;
393                    tickers.insert(symbol, ticker);
394
395                    if let Some(syms) = &symbols {
396                        if result.len() == syms.len() {
397                            return Ok(result);
398                        }
399                    }
400                }
401            }
402        }
403    }
404
405    /// Processes an order book delta update (internal helper)
406    async fn handle_orderbook_delta(
407        &self,
408        symbol: &str,
409        delta_message: &Value,
410        is_futures: bool,
411    ) -> Result<()> {
412        handlers::handle_orderbook_delta(symbol, delta_message, is_futures, &self.orderbooks).await
413    }
414
415    /// Retrieves an order book snapshot and initializes cached state (internal helper)
416    async fn fetch_orderbook_snapshot(
417        &self,
418        exchange: &Binance,
419        symbol: &str,
420        limit: Option<i64>,
421        is_futures: bool,
422    ) -> Result<OrderBook> {
423        handlers::fetch_orderbook_snapshot(exchange, symbol, limit, is_futures, &self.orderbooks)
424            .await
425    }
426
427    /// Watches a single order book stream (internal helper)
428    async fn watch_orderbook_internal(
429        &self,
430        exchange: &Binance,
431        symbol: &str,
432        limit: Option<i64>,
433        update_speed: i32,
434        is_futures: bool,
435    ) -> Result<OrderBook> {
436        let stream = if update_speed == 100 {
437            format!("{}@depth@100ms", symbol.to_lowercase())
438        } else {
439            format!("{}@depth", symbol.to_lowercase())
440        };
441
442        self.client
443            .subscribe(stream.clone(), Some(symbol.to_string()), None)
444            .await?;
445
446        tokio::time::sleep(Duration::from_millis(500)).await;
447
448        let _snapshot = self
449            .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
450            .await?;
451
452        loop {
453            if let Some(message) = self.client.receive().await {
454                if message.get("result").is_some() {
455                    continue;
456                }
457
458                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
459                    if event_type == "depthUpdate" {
460                        match self
461                            .handle_orderbook_delta(symbol, &message, is_futures)
462                            .await
463                        {
464                            Ok(()) => {
465                                let orderbooks = self.orderbooks.lock().await;
466                                if let Some(ob) = orderbooks.get(symbol) {
467                                    if ob.is_synced {
468                                        return Ok(ob.clone());
469                                    }
470                                }
471                            }
472                            Err(e) => {
473                                let err_msg = e.to_string();
474
475                                if err_msg.contains("RESYNC_NEEDED") {
476                                    tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
477
478                                    let current_time = chrono::Utc::now().timestamp_millis();
479                                    let should_resync = {
480                                        let orderbooks = self.orderbooks.lock().await;
481                                        if let Some(ob) = orderbooks.get(symbol) {
482                                            ob.should_resync(current_time)
483                                        } else {
484                                            true
485                                        }
486                                    };
487
488                                    if should_resync {
489                                        tracing::info!("Initiating resync for {}", symbol);
490
491                                        {
492                                            let mut orderbooks = self.orderbooks.lock().await;
493                                            if let Some(ob) = orderbooks.get_mut(symbol) {
494                                                ob.reset_for_resync();
495                                                ob.mark_resync_initiated(current_time);
496                                            }
497                                        }
498
499                                        tokio::time::sleep(Duration::from_millis(500)).await;
500
501                                        match self
502                                            .fetch_orderbook_snapshot(
503                                                exchange, symbol, limit, is_futures,
504                                            )
505                                            .await
506                                        {
507                                            Ok(_) => {
508                                                tracing::info!(
509                                                    "Resync completed successfully for {}",
510                                                    symbol
511                                                );
512                                                continue;
513                                            }
514                                            Err(resync_err) => {
515                                                tracing::error!(
516                                                    "Resync failed for {}: {}",
517                                                    symbol,
518                                                    resync_err
519                                                );
520                                                return Err(resync_err);
521                                            }
522                                        }
523                                    }
524                                    tracing::debug!("Resync rate limited for {}, skipping", symbol);
525                                }
526                                tracing::error!("Failed to handle orderbook delta: {}", err_msg);
527                            }
528                        }
529                    }
530                }
531            }
532        }
533    }
534
535    /// Watches multiple order book streams (internal helper)
536    async fn watch_orderbooks_internal(
537        &self,
538        exchange: &Binance,
539        symbols: Vec<String>,
540        limit: Option<i64>,
541        update_speed: i32,
542        is_futures: bool,
543    ) -> Result<HashMap<String, OrderBook>> {
544        if symbols.len() > 200 {
545            return Err(Error::invalid_request(
546                "Binance supports max 200 symbols per connection",
547            ));
548        }
549
550        for symbol in &symbols {
551            let stream = if update_speed == 100 {
552                format!("{}@depth@100ms", symbol.to_lowercase())
553            } else {
554                format!("{}@depth", symbol.to_lowercase())
555            };
556
557            self.client
558                .subscribe(stream, Some(symbol.clone()), None)
559                .await?;
560        }
561
562        tokio::time::sleep(Duration::from_millis(500)).await;
563
564        for symbol in &symbols {
565            let _ = self
566                .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
567                .await;
568        }
569
570        let mut result = HashMap::new();
571        let mut update_count = 0;
572
573        while update_count < symbols.len() {
574            if let Some(message) = self.client.receive().await {
575                if message.get("result").is_some() {
576                    continue;
577                }
578
579                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
580                    if event_type == "depthUpdate" {
581                        if let Some(msg_symbol) =
582                            message.get("s").and_then(serde_json::Value::as_str)
583                        {
584                            if let Err(e) = self
585                                .handle_orderbook_delta(msg_symbol, &message, is_futures)
586                                .await
587                            {
588                                tracing::error!("Failed to handle orderbook delta: {}", e);
589                                continue;
590                            }
591
592                            update_count += 1;
593                        }
594                    }
595                }
596            }
597        }
598
599        let orderbooks = self.orderbooks.lock().await;
600        for symbol in &symbols {
601            if let Some(ob) = orderbooks.get(symbol) {
602                result.insert(symbol.clone(), ob.clone());
603            }
604        }
605
606        Ok(result)
607    }
608
609    /// Returns cached ticker snapshot
610    pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
611        let tickers = self.tickers.lock().await;
612        tickers.get(symbol).cloned()
613    }
614
615    /// Returns all cached ticker snapshots
616    pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
617        let tickers = self.tickers.lock().await;
618        tickers.clone()
619    }
620
621    /// Handles balance update messages (internal helper)
622    async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
623        user_data::handle_balance_message(message, account_type, &self.balances).await
624    }
625
626    /// Parses a WebSocket trade message
627    fn parse_ws_trade(data: &Value) -> Result<Trade> {
628        user_data::parse_ws_trade(data)
629    }
630
631    /// Filters cached personal trades by symbol, time range, and limit
632    async fn filter_my_trades(
633        &self,
634        symbol: Option<&str>,
635        since: Option<i64>,
636        limit: Option<usize>,
637    ) -> Result<Vec<Trade>> {
638        let trades_map = self.my_trades.read().await;
639
640        let mut trades: Vec<Trade> = if let Some(sym) = symbol {
641            trades_map
642                .get(sym)
643                .map(|symbol_trades| symbol_trades.iter().cloned().collect())
644                .unwrap_or_default()
645        } else {
646            trades_map
647                .values()
648                .flat_map(|symbol_trades| symbol_trades.iter().cloned())
649                .collect()
650        };
651
652        if let Some(since_ts) = since {
653            trades.retain(|trade| trade.timestamp >= since_ts);
654        }
655
656        trades.sort_by(|a, b| {
657            let ts_a = a.timestamp;
658            let ts_b = b.timestamp;
659            ts_b.cmp(&ts_a)
660        });
661
662        if let Some(lim) = limit {
663            trades.truncate(lim);
664        }
665
666        Ok(trades)
667    }
668
669    /// Parses a WebSocket position payload
670    fn parse_ws_position(data: &Value) -> Result<Position> {
671        user_data::parse_ws_position(data)
672    }
673
674    /// Filters cached positions by symbol, time range, and limit
675    async fn filter_positions(
676        &self,
677        symbols: Option<&[String]>,
678        since: Option<i64>,
679        limit: Option<usize>,
680    ) -> Result<Vec<Position>> {
681        let positions_map = self.positions.read().await;
682
683        let mut positions: Vec<Position> = if let Some(syms) = symbols {
684            syms.iter()
685                .filter_map(|sym| positions_map.get(sym))
686                .flat_map(|side_map| side_map.values().cloned())
687                .collect()
688        } else {
689            positions_map
690                .values()
691                .flat_map(|side_map| side_map.values().cloned())
692                .collect()
693        };
694
695        if let Some(since_ts) = since {
696            positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
697        }
698
699        positions.sort_by(|a, b| {
700            let ts_a = a.timestamp.unwrap_or(0);
701            let ts_b = b.timestamp.unwrap_or(0);
702            ts_b.cmp(&ts_a)
703        });
704
705        if let Some(lim) = limit {
706            positions.truncate(lim);
707        }
708
709        Ok(positions)
710    }
711}
712
713// Include Binance impl methods in a separate file to keep mod.rs manageable
714include!("binance_impl.rs");
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719
720    #[test]
721    fn test_binance_ws_creation() {
722        let ws = BinanceWs::new(WS_BASE_URL.to_string());
723        assert!(ws.listen_key.try_read().is_ok());
724    }
725
726    #[test]
727    fn test_stream_format() {
728        let symbol = "btcusdt";
729
730        let ticker_stream = format!("{}@ticker", symbol);
731        assert_eq!(ticker_stream, "btcusdt@ticker");
732
733        let trade_stream = format!("{}@trade", symbol);
734        assert_eq!(trade_stream, "btcusdt@trade");
735
736        let depth_stream = format!("{}@depth20", symbol);
737        assert_eq!(depth_stream, "btcusdt@depth20");
738
739        let kline_stream = format!("{}@kline_1m", symbol);
740        assert_eq!(kline_stream, "btcusdt@kline_1m");
741    }
742
743    #[tokio::test]
744    async fn test_subscription_manager_basic() {
745        let manager = SubscriptionManager::new();
746        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
747
748        assert_eq!(manager.active_count(), 0);
749        assert!(!manager.has_subscription("btcusdt@ticker").await);
750
751        manager
752            .add_subscription(
753                "btcusdt@ticker".to_string(),
754                "BTCUSDT".to_string(),
755                SubscriptionType::Ticker,
756                tx.clone(),
757            )
758            .await
759            .unwrap();
760
761        assert_eq!(manager.active_count(), 1);
762        assert!(manager.has_subscription("btcusdt@ticker").await);
763
764        let sub = manager.get_subscription("btcusdt@ticker").await;
765        assert!(sub.is_some());
766        let sub = sub.unwrap();
767        assert_eq!(sub.stream, "btcusdt@ticker");
768        assert_eq!(sub.symbol, "BTCUSDT");
769        assert_eq!(sub.sub_type, SubscriptionType::Ticker);
770
771        manager.remove_subscription("btcusdt@ticker").await.unwrap();
772        assert_eq!(manager.active_count(), 0);
773        assert!(!manager.has_subscription("btcusdt@ticker").await);
774    }
775
776    #[test]
777    fn test_symbol_conversion() {
778        let symbol = "BTC/USDT";
779        let binance_symbol = symbol.replace('/', "").to_lowercase();
780        assert_eq!(binance_symbol, "btcusdt");
781    }
782}