Skip to main content

ccxt_exchanges/binance/ws/
connection_manager.rs

1use crate::binance::Binance;
2use crate::binance::BinanceUrls;
3use crate::binance::ws::BinanceWs;
4use ccxt_core::error::Result;
5use ccxt_core::types::MarketType;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10/// Returns the maximum number of streams per connection for a given market type.
11///
12/// Binance allows different limits:
13/// - Spot: 1024 streams per connection
14/// - Futures/Swap/Option: 200 streams per connection
15fn max_streams_for(market_type: MarketType) -> usize {
16    match market_type {
17        MarketType::Spot => 1024,
18        MarketType::Futures | MarketType::Swap | MarketType::Option => 200,
19    }
20}
21
22/// Manages multiple WebSocket connections for Binance to handle sharding and isolation.
23///
24/// Connections are organized by market type (Spot, Swap, Futures, Option) to ensure
25/// that each market type connects to the correct WebSocket endpoint.
26#[derive(Debug)]
27pub struct BinanceConnectionManager {
28    /// Pool of public data connections organized by market type
29    public_shards: RwLock<HashMap<MarketType, Vec<Arc<BinanceWs>>>>,
30    /// Dedicated connections for private user data organized by market type
31    private_clients: RwLock<HashMap<MarketType, Arc<BinanceWs>>>,
32    /// WebSocket URLs for different market types
33    urls: BinanceUrls,
34    /// Whether to use sandbox/testnet URLs
35    is_sandbox: bool,
36}
37
38impl BinanceConnectionManager {
39    /// Creates a new connection manager with the given URLs.
40    pub fn new(urls: BinanceUrls, is_sandbox: bool) -> Self {
41        Self {
42            public_shards: RwLock::new(HashMap::new()),
43            private_clients: RwLock::new(HashMap::new()),
44            urls,
45            is_sandbox,
46        }
47    }
48
49    /// Creates a new connection manager for production environment.
50    pub fn production() -> Self {
51        Self::new(BinanceUrls::production(), false)
52    }
53
54    /// Creates a new connection manager for testnet environment.
55    pub fn testnet() -> Self {
56        Self::new(BinanceUrls::testnet(), true)
57    }
58
59    /// Returns the WebSocket URL for the given market type.
60    ///
61    /// # URL Routing
62    ///
63    /// - `MarketType::Spot` → `stream.binance.com` (spot and margin trading)
64    /// - `MarketType::Futures` → `fstream.binance.com` (Linear/USDT-margined contracts)
65    /// - `MarketType::Swap` → `dstream.binance.com` (Inverse/Coin-margined contracts)
66    /// - `MarketType::Option` → `nbstream.binance.com` (European-style options)
67    ///
68    /// Callers should select the `MarketType` based on the market's `linear`/`inverse` flags:
69    /// - `market.linear == true` → use `MarketType::Futures`
70    /// - `market.inverse == true` → use `MarketType::Swap`
71    ///
72    /// # Arguments
73    ///
74    /// * `market_type` - The market type to get the URL for
75    ///
76    /// # Returns
77    ///
78    /// The WebSocket URL for the specified market type.
79    pub fn get_ws_url(&self, market_type: MarketType) -> &str {
80        match market_type {
81            MarketType::Spot => &self.urls.ws,
82            MarketType::Futures => &self.urls.ws_fapi, // Linear/USDT-margined futures
83            MarketType::Swap => &self.urls.ws_dapi,    // Inverse/Coin-margined perpetuals
84            MarketType::Option => &self.urls.ws_eapi,
85        }
86    }
87
88    /// Gets or creates a public connection suitable for a new subscription.
89    ///
90    /// Connections are lazily initialized on first subscription for each market type.
91    ///
92    /// # Arguments
93    ///
94    /// * `market_type` - The market type for the connection
95    ///
96    /// # Returns
97    ///
98    /// An Arc to the WebSocket client for the specified market type.
99    pub async fn get_public_connection(&self, market_type: MarketType) -> Result<Arc<BinanceWs>> {
100        let mut shards = self.public_shards.write().await;
101
102        // Get or create the shard list for this market type
103        let market_shards = shards.entry(market_type).or_insert_with(Vec::new);
104
105        // Try to find an existing shard with capacity
106        for shard in market_shards.iter() {
107            if shard.subscription_count() < max_streams_for(market_type) {
108                return Ok(shard.clone());
109            }
110        }
111
112        // If no shard available or all full, create a new one
113        let ws_url = self.get_ws_url(market_type).to_string();
114        let new_shard = Arc::new(BinanceWs::new(ws_url));
115
116        // Connect the new shard immediately
117        new_shard.connect().await?;
118
119        market_shards.push(new_shard.clone());
120        Ok(new_shard)
121    }
122
123    /// Gets or creates the dedicated private connection for the given market type.
124    ///
125    /// # Arguments
126    ///
127    /// * `market_type` - The market type for the connection
128    /// * `binance` - Reference to the Binance instance for authentication
129    ///
130    /// # Returns
131    ///
132    /// An Arc to the authenticated WebSocket client.
133    pub async fn get_private_connection(
134        &self,
135        market_type: MarketType,
136        binance: &Arc<Binance>,
137    ) -> Result<Arc<BinanceWs>> {
138        let mut clients = self.private_clients.write().await;
139
140        // Check if we have an existing connected client for this market type
141        if let Some(client) = clients.get(&market_type) {
142            if client.is_connected() {
143                return Ok(client.clone());
144            }
145        }
146
147        // Create new authenticated client with the correct URL for this market type
148        let ws_url = self.get_ws_url(market_type).to_string();
149        let new_client = Arc::new(BinanceWs::new_with_auth(
150            ws_url,
151            binance.clone(),
152            market_type,
153        ));
154
155        // Connect user stream (this handles listenKey generation and management)
156        new_client.connect_user_stream().await?;
157
158        clients.insert(market_type, new_client.clone());
159        Ok(new_client)
160    }
161
162    /// Returns the number of active public shards across all market types.
163    pub async fn active_shards_count(&self) -> usize {
164        self.public_shards.read().await.values().map(Vec::len).sum()
165    }
166
167    /// Returns the number of active public shards for a specific market type.
168    pub async fn active_shards_count_for(&self, market_type: MarketType) -> usize {
169        self.public_shards
170            .read()
171            .await
172            .get(&market_type)
173            .map_or(0, Vec::len)
174    }
175
176    /// Disconnects all active connections across all market types.
177    pub async fn disconnect_all(&self) -> Result<()> {
178        // Disconnect all public shards
179        let mut shards = self.public_shards.write().await;
180        for market_shards in shards.values() {
181            for shard in market_shards {
182                let _ = shard.disconnect().await;
183            }
184        }
185        shards.clear();
186
187        // Disconnect all private clients
188        let mut private = self.private_clients.write().await;
189        for (_, client) in private.drain() {
190            let _ = client.disconnect().await;
191        }
192
193        Ok(())
194    }
195
196    /// Disconnects all connections for a specific market type.
197    pub async fn disconnect_market(&self, market_type: MarketType) -> Result<()> {
198        // Disconnect public shards for this market type
199        let mut shards = self.public_shards.write().await;
200        if let Some(market_shards) = shards.remove(&market_type) {
201            for shard in &market_shards {
202                let _ = shard.disconnect().await;
203            }
204        }
205
206        // Disconnect private client for this market type
207        let mut private = self.private_clients.write().await;
208        if let Some(client) = private.remove(&market_type) {
209            let _ = client.disconnect().await;
210        }
211
212        Ok(())
213    }
214
215    /// Checks if any connection is active (non-blocking).
216    pub fn is_connected(&self) -> bool {
217        if let Ok(shards) = self.public_shards.try_read() {
218            for market_shards in shards.values() {
219                for shard in market_shards {
220                    if shard.is_connected() {
221                        return true;
222                    }
223                }
224            }
225        }
226
227        if let Ok(private) = self.private_clients.try_read() {
228            for (_, client) in private.iter() {
229                if client.is_connected() {
230                    return true;
231                }
232            }
233        }
234
235        false
236    }
237
238    /// Checks if a connection is active for a specific market type (non-blocking).
239    pub fn is_connected_for(&self, market_type: MarketType) -> bool {
240        if let Ok(shards) = self.public_shards.try_read() {
241            if let Some(market_shards) = shards.get(&market_type) {
242                for shard in market_shards {
243                    if shard.is_connected() {
244                        return true;
245                    }
246                }
247            }
248        }
249
250        if let Ok(private) = self.private_clients.try_read() {
251            if let Some(client) = private.get(&market_type) {
252                if client.is_connected() {
253                    return true;
254                }
255            }
256        }
257
258        false
259    }
260
261    /// Returns a list of all active subscriptions across all connections.
262    pub fn get_all_subscriptions(&self) -> Vec<String> {
263        let mut subs = Vec::new();
264        if let Ok(shards) = self.public_shards.try_read() {
265            for market_shards in shards.values() {
266                for shard in market_shards {
267                    subs.extend(shard.subscriptions());
268                }
269            }
270        }
271        if let Ok(private) = self.private_clients.try_read() {
272            for (_, client) in private.iter() {
273                subs.extend(client.subscriptions());
274            }
275        }
276        subs
277    }
278
279    /// Returns a list of active subscriptions for a specific market type.
280    pub fn get_subscriptions_for(&self, market_type: MarketType) -> Vec<String> {
281        let mut subs = Vec::new();
282        if let Ok(shards) = self.public_shards.try_read() {
283            if let Some(market_shards) = shards.get(&market_type) {
284                for shard in market_shards {
285                    subs.extend(shard.subscriptions());
286                }
287            }
288        }
289        if let Ok(private) = self.private_clients.try_read() {
290            if let Some(client) = private.get(&market_type) {
291                subs.extend(client.subscriptions());
292            }
293        }
294        subs
295    }
296
297    /// Returns a reference to the URLs configuration.
298    pub fn urls(&self) -> &BinanceUrls {
299        &self.urls
300    }
301
302    /// Returns whether this manager is configured for sandbox/testnet.
303    pub fn is_sandbox(&self) -> bool {
304        self.is_sandbox
305    }
306}
307
308#[cfg(test)]
309mod tests {
310    use super::*;
311
312    #[test]
313    fn test_production_urls() {
314        let manager = BinanceConnectionManager::production();
315        assert!(!manager.is_sandbox());
316
317        assert_eq!(
318            manager.get_ws_url(MarketType::Spot),
319            "wss://stream.binance.com:9443/ws"
320        );
321        assert_eq!(
322            manager.get_ws_url(MarketType::Futures),
323            "wss://fstream.binance.com/ws"
324        );
325        assert_eq!(
326            manager.get_ws_url(MarketType::Swap),
327            "wss://dstream.binance.com/ws"
328        );
329        assert_eq!(
330            manager.get_ws_url(MarketType::Option),
331            "wss://nbstream.binance.com/eoptions/ws"
332        );
333    }
334
335    #[test]
336    fn test_testnet_urls() {
337        let manager = BinanceConnectionManager::testnet();
338        assert!(manager.is_sandbox());
339
340        assert_eq!(
341            manager.get_ws_url(MarketType::Spot),
342            "wss://testnet.binance.vision/ws"
343        );
344        assert_eq!(
345            manager.get_ws_url(MarketType::Futures),
346            "wss://stream.binancefuture.com/ws"
347        );
348        assert_eq!(
349            manager.get_ws_url(MarketType::Swap),
350            "wss://dstream.binancefuture.com/ws"
351        );
352    }
353
354    #[test]
355    fn test_custom_urls() {
356        let mut urls = BinanceUrls::production();
357        urls.ws = "wss://custom.example.com/ws".to_string();
358        urls.ws_fapi = "wss://custom-fapi.example.com/ws".to_string();
359
360        let manager = BinanceConnectionManager::new(urls, false);
361
362        assert_eq!(
363            manager.get_ws_url(MarketType::Spot),
364            "wss://custom.example.com/ws"
365        );
366        assert_eq!(
367            manager.get_ws_url(MarketType::Futures),
368            "wss://custom-fapi.example.com/ws"
369        );
370        // Unchanged URLs should still be production defaults
371        assert_eq!(
372            manager.get_ws_url(MarketType::Swap),
373            "wss://dstream.binance.com/ws"
374        );
375    }
376
377    #[test]
378    fn test_initial_state() {
379        let manager = BinanceConnectionManager::production();
380
381        // No connections initially
382        assert!(!manager.is_connected());
383        assert!(!manager.is_connected_for(MarketType::Spot));
384        assert!(!manager.is_connected_for(MarketType::Futures));
385        assert!(!manager.is_connected_for(MarketType::Swap));
386        assert!(!manager.is_connected_for(MarketType::Option));
387
388        // No subscriptions initially
389        assert!(manager.get_all_subscriptions().is_empty());
390        assert!(manager.get_subscriptions_for(MarketType::Spot).is_empty());
391    }
392
393    #[tokio::test]
394    async fn test_active_shards_count_initial() {
395        let manager = BinanceConnectionManager::production();
396
397        assert_eq!(manager.active_shards_count().await, 0);
398        assert_eq!(manager.active_shards_count_for(MarketType::Spot).await, 0);
399        assert_eq!(
400            manager.active_shards_count_for(MarketType::Futures).await,
401            0
402        );
403    }
404
405    #[tokio::test]
406    async fn test_disconnect_all_empty() {
407        let manager = BinanceConnectionManager::production();
408
409        // Should not error when disconnecting with no connections
410        let result = manager.disconnect_all().await;
411        assert!(result.is_ok());
412    }
413
414    #[tokio::test]
415    async fn test_disconnect_market_empty() {
416        let manager = BinanceConnectionManager::production();
417
418        // Should not error when disconnecting a market type with no connections
419        let result = manager.disconnect_market(MarketType::Spot).await;
420        assert!(result.is_ok());
421    }
422
423    #[test]
424    fn test_urls_accessor() {
425        let manager = BinanceConnectionManager::production();
426        let urls = manager.urls();
427
428        assert!(urls.ws.contains("stream.binance.com"));
429        assert!(urls.ws_fapi.contains("fstream.binance.com"));
430        assert!(urls.ws_dapi.contains("dstream.binance.com"));
431        assert!(urls.ws_eapi.contains("nbstream.binance.com"));
432    }
433
434    #[test]
435    fn test_market_type_url_routing_completeness() {
436        let manager = BinanceConnectionManager::production();
437
438        // Ensure all market types return non-empty URLs
439        for market_type in &[
440            MarketType::Spot,
441            MarketType::Futures,
442            MarketType::Swap,
443            MarketType::Option,
444        ] {
445            let url = manager.get_ws_url(*market_type);
446            assert!(
447                !url.is_empty(),
448                "URL for {:?} should not be empty",
449                market_type
450            );
451            assert!(
452                url.starts_with("wss://"),
453                "URL for {:?} should start with wss://, got: {}",
454                market_type,
455                url
456            );
457        }
458    }
459
460    #[test]
461    fn test_different_market_types_get_different_urls() {
462        let manager = BinanceConnectionManager::production();
463
464        let spot_url = manager.get_ws_url(MarketType::Spot);
465        let futures_url = manager.get_ws_url(MarketType::Futures);
466        let swap_url = manager.get_ws_url(MarketType::Swap);
467        let option_url = manager.get_ws_url(MarketType::Option);
468
469        // All URLs should be different
470        assert_ne!(spot_url, futures_url);
471        assert_ne!(spot_url, swap_url);
472        assert_ne!(spot_url, option_url);
473        assert_ne!(futures_url, swap_url);
474        assert_ne!(futures_url, option_url);
475        assert_ne!(swap_url, option_url);
476    }
477}