Skip to main content

ccxt_exchanges/binance/ws/
connection_manager.rs

1use crate::binance::Binance;
2use crate::binance::ws::BinanceWs;
3use ccxt_core::error::Result;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6
7const MAX_STREAMS_PER_CONNECTION: usize = 200;
8
9/// Manages multiple WebSocket connections for Binance to handle sharding and isolation
10#[derive(Debug)]
11pub struct BinanceConnectionManager {
12    /// Pool of public data connections (shards)
13    public_shards: RwLock<Vec<Arc<BinanceWs>>>,
14    /// Dedicated connection for private user data
15    private_client: RwLock<Option<Arc<BinanceWs>>>,
16    /// Base WebSocket URL
17    base_ws_url: String,
18}
19
20impl BinanceConnectionManager {
21    /// Creates a new connection manager
22    pub fn new(base_ws_url: String) -> Self {
23        Self {
24            public_shards: RwLock::new(Vec::new()),
25            private_client: RwLock::new(None),
26            base_ws_url,
27        }
28    }
29
30    /// Gets or creates a public connection suitable for a new subscription
31    pub async fn get_public_connection(&self) -> Result<Arc<BinanceWs>> {
32        let mut shards = self.public_shards.write().await;
33
34        // Try to find an existing shard with capacity
35        for shard in shards.iter() {
36            if shard.subscription_count() < MAX_STREAMS_PER_CONNECTION {
37                return Ok(shard.clone());
38            }
39        }
40
41        // If no shard available or all full, create a new one
42        let new_shard = Arc::new(BinanceWs::new(self.base_ws_url.clone()));
43
44        // Connect the new shard immediately
45        new_shard.connect().await?;
46
47        shards.push(new_shard.clone());
48        Ok(new_shard)
49    }
50
51    /// Gets or creates the dedicated private connection
52    pub async fn get_private_connection(&self, binance: &Arc<Binance>) -> Result<Arc<BinanceWs>> {
53        let mut client_lock = self.private_client.write().await;
54
55        if let Some(client) = client_lock.as_ref() {
56            if client.is_connected() {
57                return Ok(client.clone());
58            }
59        }
60
61        // Create new authenticated client
62        let new_client = Arc::new(BinanceWs::new_with_auth(
63            self.base_ws_url.clone(),
64            binance.clone(),
65        ));
66
67        // Connect user stream (this handles listenKey generation and management)
68        new_client.connect_user_stream().await?;
69
70        *client_lock = Some(new_client.clone());
71        Ok(new_client)
72    }
73
74    /// Returns the number of active public shards
75    pub async fn active_shards_count(&self) -> usize {
76        self.public_shards.read().await.len()
77    }
78
79    /// Disconnects all active connections
80    pub async fn disconnect_all(&self) -> Result<()> {
81        let mut shards = self.public_shards.write().await;
82        for shard in shards.iter() {
83            let _ = shard.disconnect().await;
84        }
85        shards.clear();
86
87        let mut private = self.private_client.write().await;
88        if let Some(client) = private.take() {
89            let _ = client.disconnect().await;
90        }
91
92        Ok(())
93    }
94
95    /// Checks if any connection is active (non-blocking)
96    pub fn is_connected(&self) -> bool {
97        if let Ok(shards) = self.public_shards.try_read() {
98            if !shards.is_empty() {
99                // Check if at least one shard is connected?
100                for shard in shards.iter() {
101                    if shard.is_connected() {
102                        return true;
103                    }
104                }
105            }
106        }
107
108        if let Ok(private) = self.private_client.try_read() {
109            if let Some(client) = private.as_ref() {
110                if client.is_connected() {
111                    return true;
112                }
113            }
114        }
115
116        false
117    }
118
119    /// Returns a list of all active subscriptions across all connections
120    pub fn get_all_subscriptions(&self) -> Vec<String> {
121        let mut subs = Vec::new();
122        if let Ok(shards) = self.public_shards.try_read() {
123            for shard in shards.iter() {
124                subs.extend(shard.subscriptions());
125            }
126        }
127        if let Ok(private) = self.private_client.try_read() {
128            if let Some(client) = private.as_ref() {
129                subs.extend(client.subscriptions());
130            }
131        }
132        subs
133    }
134}