ccxt_exchanges/binance/ws/
connection_manager.rs1use crate::binance::Binance;
2use crate::binance::BinanceUrls;
3use crate::binance::ws::BinanceWs;
4use ccxt_core::error::Result;
5use ccxt_core::types::MarketType;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10fn max_streams_for(market_type: MarketType) -> usize {
16 match market_type {
17 MarketType::Spot => 1024,
18 MarketType::Futures | MarketType::Swap | MarketType::Option => 200,
19 }
20}
21
22#[derive(Debug)]
27pub struct BinanceConnectionManager {
28 public_shards: RwLock<HashMap<MarketType, Vec<Arc<BinanceWs>>>>,
30 private_clients: RwLock<HashMap<MarketType, Arc<BinanceWs>>>,
32 urls: BinanceUrls,
34 is_sandbox: bool,
36}
37
38impl BinanceConnectionManager {
39 pub fn new(urls: BinanceUrls, is_sandbox: bool) -> Self {
41 Self {
42 public_shards: RwLock::new(HashMap::new()),
43 private_clients: RwLock::new(HashMap::new()),
44 urls,
45 is_sandbox,
46 }
47 }
48
49 pub fn production() -> Self {
51 Self::new(BinanceUrls::production(), false)
52 }
53
54 pub fn testnet() -> Self {
56 Self::new(BinanceUrls::testnet(), true)
57 }
58
59 pub fn get_ws_url(&self, market_type: MarketType) -> &str {
80 match market_type {
81 MarketType::Spot => &self.urls.ws,
82 MarketType::Futures => &self.urls.ws_fapi, MarketType::Swap => &self.urls.ws_dapi, MarketType::Option => &self.urls.ws_eapi,
85 }
86 }
87
88 pub async fn get_public_connection(&self, market_type: MarketType) -> Result<Arc<BinanceWs>> {
100 let mut shards = self.public_shards.write().await;
101
102 let market_shards = shards.entry(market_type).or_insert_with(Vec::new);
104
105 for shard in market_shards.iter() {
107 if shard.subscription_count() < max_streams_for(market_type) {
108 return Ok(shard.clone());
109 }
110 }
111
112 let ws_url = self.get_ws_url(market_type).to_string();
114 let new_shard = Arc::new(BinanceWs::new(ws_url));
115
116 new_shard.connect().await?;
118
119 market_shards.push(new_shard.clone());
120 Ok(new_shard)
121 }
122
123 pub async fn get_private_connection(
134 &self,
135 market_type: MarketType,
136 binance: &Arc<Binance>,
137 ) -> Result<Arc<BinanceWs>> {
138 let mut clients = self.private_clients.write().await;
139
140 if let Some(client) = clients.get(&market_type) {
142 if client.is_connected() {
143 return Ok(client.clone());
144 }
145 }
146
147 let ws_url = self.get_ws_url(market_type).to_string();
149 let new_client = Arc::new(BinanceWs::new_with_auth(
150 ws_url,
151 binance.clone(),
152 market_type,
153 ));
154
155 new_client.connect_user_stream().await?;
157
158 clients.insert(market_type, new_client.clone());
159 Ok(new_client)
160 }
161
162 pub async fn active_shards_count(&self) -> usize {
164 self.public_shards.read().await.values().map(Vec::len).sum()
165 }
166
167 pub async fn active_shards_count_for(&self, market_type: MarketType) -> usize {
169 self.public_shards
170 .read()
171 .await
172 .get(&market_type)
173 .map_or(0, Vec::len)
174 }
175
176 pub async fn disconnect_all(&self) -> Result<()> {
178 let mut shards = self.public_shards.write().await;
180 for market_shards in shards.values() {
181 for shard in market_shards {
182 let _ = shard.disconnect().await;
183 }
184 }
185 shards.clear();
186
187 let mut private = self.private_clients.write().await;
189 for (_, client) in private.drain() {
190 let _ = client.disconnect().await;
191 }
192
193 Ok(())
194 }
195
196 pub async fn disconnect_market(&self, market_type: MarketType) -> Result<()> {
198 let mut shards = self.public_shards.write().await;
200 if let Some(market_shards) = shards.remove(&market_type) {
201 for shard in &market_shards {
202 let _ = shard.disconnect().await;
203 }
204 }
205
206 let mut private = self.private_clients.write().await;
208 if let Some(client) = private.remove(&market_type) {
209 let _ = client.disconnect().await;
210 }
211
212 Ok(())
213 }
214
215 pub fn is_connected(&self) -> bool {
217 if let Ok(shards) = self.public_shards.try_read() {
218 for market_shards in shards.values() {
219 for shard in market_shards {
220 if shard.is_connected() {
221 return true;
222 }
223 }
224 }
225 }
226
227 if let Ok(private) = self.private_clients.try_read() {
228 for (_, client) in private.iter() {
229 if client.is_connected() {
230 return true;
231 }
232 }
233 }
234
235 false
236 }
237
238 pub fn is_connected_for(&self, market_type: MarketType) -> bool {
240 if let Ok(shards) = self.public_shards.try_read() {
241 if let Some(market_shards) = shards.get(&market_type) {
242 for shard in market_shards {
243 if shard.is_connected() {
244 return true;
245 }
246 }
247 }
248 }
249
250 if let Ok(private) = self.private_clients.try_read() {
251 if let Some(client) = private.get(&market_type) {
252 if client.is_connected() {
253 return true;
254 }
255 }
256 }
257
258 false
259 }
260
261 pub fn get_all_subscriptions(&self) -> Vec<String> {
263 let mut subs = Vec::new();
264 if let Ok(shards) = self.public_shards.try_read() {
265 for market_shards in shards.values() {
266 for shard in market_shards {
267 subs.extend(shard.subscriptions());
268 }
269 }
270 }
271 if let Ok(private) = self.private_clients.try_read() {
272 for (_, client) in private.iter() {
273 subs.extend(client.subscriptions());
274 }
275 }
276 subs
277 }
278
279 pub fn get_subscriptions_for(&self, market_type: MarketType) -> Vec<String> {
281 let mut subs = Vec::new();
282 if let Ok(shards) = self.public_shards.try_read() {
283 if let Some(market_shards) = shards.get(&market_type) {
284 for shard in market_shards {
285 subs.extend(shard.subscriptions());
286 }
287 }
288 }
289 if let Ok(private) = self.private_clients.try_read() {
290 if let Some(client) = private.get(&market_type) {
291 subs.extend(client.subscriptions());
292 }
293 }
294 subs
295 }
296
297 pub fn urls(&self) -> &BinanceUrls {
299 &self.urls
300 }
301
302 pub fn is_sandbox(&self) -> bool {
304 self.is_sandbox
305 }
306}
307
308#[cfg(test)]
309mod tests {
310 use super::*;
311
312 #[test]
313 fn test_production_urls() {
314 let manager = BinanceConnectionManager::production();
315 assert!(!manager.is_sandbox());
316
317 assert_eq!(
318 manager.get_ws_url(MarketType::Spot),
319 "wss://stream.binance.com:9443/ws"
320 );
321 assert_eq!(
322 manager.get_ws_url(MarketType::Futures),
323 "wss://fstream.binance.com/ws"
324 );
325 assert_eq!(
326 manager.get_ws_url(MarketType::Swap),
327 "wss://dstream.binance.com/ws"
328 );
329 assert_eq!(
330 manager.get_ws_url(MarketType::Option),
331 "wss://nbstream.binance.com/eoptions/ws"
332 );
333 }
334
335 #[test]
336 fn test_testnet_urls() {
337 let manager = BinanceConnectionManager::testnet();
338 assert!(manager.is_sandbox());
339
340 assert_eq!(
341 manager.get_ws_url(MarketType::Spot),
342 "wss://testnet.binance.vision/ws"
343 );
344 assert_eq!(
345 manager.get_ws_url(MarketType::Futures),
346 "wss://stream.binancefuture.com/ws"
347 );
348 assert_eq!(
349 manager.get_ws_url(MarketType::Swap),
350 "wss://dstream.binancefuture.com/ws"
351 );
352 }
353
354 #[test]
355 fn test_custom_urls() {
356 let mut urls = BinanceUrls::production();
357 urls.ws = "wss://custom.example.com/ws".to_string();
358 urls.ws_fapi = "wss://custom-fapi.example.com/ws".to_string();
359
360 let manager = BinanceConnectionManager::new(urls, false);
361
362 assert_eq!(
363 manager.get_ws_url(MarketType::Spot),
364 "wss://custom.example.com/ws"
365 );
366 assert_eq!(
367 manager.get_ws_url(MarketType::Futures),
368 "wss://custom-fapi.example.com/ws"
369 );
370 assert_eq!(
372 manager.get_ws_url(MarketType::Swap),
373 "wss://dstream.binance.com/ws"
374 );
375 }
376
377 #[test]
378 fn test_initial_state() {
379 let manager = BinanceConnectionManager::production();
380
381 assert!(!manager.is_connected());
383 assert!(!manager.is_connected_for(MarketType::Spot));
384 assert!(!manager.is_connected_for(MarketType::Futures));
385 assert!(!manager.is_connected_for(MarketType::Swap));
386 assert!(!manager.is_connected_for(MarketType::Option));
387
388 assert!(manager.get_all_subscriptions().is_empty());
390 assert!(manager.get_subscriptions_for(MarketType::Spot).is_empty());
391 }
392
393 #[tokio::test]
394 async fn test_active_shards_count_initial() {
395 let manager = BinanceConnectionManager::production();
396
397 assert_eq!(manager.active_shards_count().await, 0);
398 assert_eq!(manager.active_shards_count_for(MarketType::Spot).await, 0);
399 assert_eq!(
400 manager.active_shards_count_for(MarketType::Futures).await,
401 0
402 );
403 }
404
405 #[tokio::test]
406 async fn test_disconnect_all_empty() {
407 let manager = BinanceConnectionManager::production();
408
409 let result = manager.disconnect_all().await;
411 assert!(result.is_ok());
412 }
413
414 #[tokio::test]
415 async fn test_disconnect_market_empty() {
416 let manager = BinanceConnectionManager::production();
417
418 let result = manager.disconnect_market(MarketType::Spot).await;
420 assert!(result.is_ok());
421 }
422
423 #[test]
424 fn test_urls_accessor() {
425 let manager = BinanceConnectionManager::production();
426 let urls = manager.urls();
427
428 assert!(urls.ws.contains("stream.binance.com"));
429 assert!(urls.ws_fapi.contains("fstream.binance.com"));
430 assert!(urls.ws_dapi.contains("dstream.binance.com"));
431 assert!(urls.ws_eapi.contains("nbstream.binance.com"));
432 }
433
434 #[test]
435 fn test_market_type_url_routing_completeness() {
436 let manager = BinanceConnectionManager::production();
437
438 for market_type in &[
440 MarketType::Spot,
441 MarketType::Futures,
442 MarketType::Swap,
443 MarketType::Option,
444 ] {
445 let url = manager.get_ws_url(*market_type);
446 assert!(
447 !url.is_empty(),
448 "URL for {:?} should not be empty",
449 market_type
450 );
451 assert!(
452 url.starts_with("wss://"),
453 "URL for {:?} should start with wss://, got: {}",
454 market_type,
455 url
456 );
457 }
458 }
459
460 #[test]
461 fn test_different_market_types_get_different_urls() {
462 let manager = BinanceConnectionManager::production();
463
464 let spot_url = manager.get_ws_url(MarketType::Spot);
465 let futures_url = manager.get_ws_url(MarketType::Futures);
466 let swap_url = manager.get_ws_url(MarketType::Swap);
467 let option_url = manager.get_ws_url(MarketType::Option);
468
469 assert_ne!(spot_url, futures_url);
471 assert_ne!(spot_url, swap_url);
472 assert_ne!(spot_url, option_url);
473 assert_ne!(futures_url, swap_url);
474 assert_ne!(futures_url, option_url);
475 assert_ne!(swap_url, option_url);
476 }
477}