ccxt_exchanges/binance/ws/
connection_manager.rs1use crate::binance::Binance;
2use crate::binance::ws::BinanceWs;
3use ccxt_core::error::Result;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6
7const MAX_STREAMS_PER_CONNECTION: usize = 200;
8
9#[derive(Debug)]
11pub struct BinanceConnectionManager {
12 public_shards: RwLock<Vec<Arc<BinanceWs>>>,
14 private_client: RwLock<Option<Arc<BinanceWs>>>,
16 base_ws_url: String,
18}
19
20impl BinanceConnectionManager {
21 pub fn new(base_ws_url: String) -> Self {
23 Self {
24 public_shards: RwLock::new(Vec::new()),
25 private_client: RwLock::new(None),
26 base_ws_url,
27 }
28 }
29
30 pub async fn get_public_connection(&self) -> Result<Arc<BinanceWs>> {
32 let mut shards = self.public_shards.write().await;
33
34 for shard in shards.iter() {
36 if shard.subscription_count() < MAX_STREAMS_PER_CONNECTION {
37 return Ok(shard.clone());
38 }
39 }
40
41 let new_shard = Arc::new(BinanceWs::new(self.base_ws_url.clone()));
43
44 new_shard.connect().await?;
46
47 shards.push(new_shard.clone());
48 Ok(new_shard)
49 }
50
51 pub async fn get_private_connection(&self, binance: &Arc<Binance>) -> Result<Arc<BinanceWs>> {
53 let mut client_lock = self.private_client.write().await;
54
55 if let Some(client) = client_lock.as_ref() {
56 if client.is_connected() {
57 return Ok(client.clone());
58 }
59 }
60
61 let new_client = Arc::new(BinanceWs::new_with_auth(
63 self.base_ws_url.clone(),
64 binance.clone(),
65 ));
66
67 new_client.connect_user_stream().await?;
69
70 *client_lock = Some(new_client.clone());
71 Ok(new_client)
72 }
73
74 pub async fn active_shards_count(&self) -> usize {
76 self.public_shards.read().await.len()
77 }
78
79 pub async fn disconnect_all(&self) -> Result<()> {
81 let mut shards = self.public_shards.write().await;
82 for shard in shards.iter() {
83 let _ = shard.disconnect().await;
84 }
85 shards.clear();
86
87 let mut private = self.private_client.write().await;
88 if let Some(client) = private.take() {
89 let _ = client.disconnect().await;
90 }
91
92 Ok(())
93 }
94
95 pub fn is_connected(&self) -> bool {
97 if let Ok(shards) = self.public_shards.try_read() {
98 if !shards.is_empty() {
99 for shard in shards.iter() {
101 if shard.is_connected() {
102 return true;
103 }
104 }
105 }
106 }
107
108 if let Ok(private) = self.private_client.try_read() {
109 if let Some(client) = private.as_ref() {
110 if client.is_connected() {
111 return true;
112 }
113 }
114 }
115
116 false
117 }
118
119 pub fn get_all_subscriptions(&self) -> Vec<String> {
121 let mut subs = Vec::new();
122 if let Ok(shards) = self.public_shards.try_read() {
123 for shard in shards.iter() {
124 subs.extend(shard.subscriptions());
125 }
126 }
127 if let Ok(private) = self.private_client.try_read() {
128 if let Some(client) = private.as_ref() {
129 subs.extend(client.subscriptions());
130 }
131 }
132 subs
133 }
134}