ccxt_exchanges/binance/ws/
subscriptions.rs1use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::time::Instant;
8use tokio::sync::RwLock;
9
/// Category of data carried by a WebSocket stream.
///
/// Only `Ticker`, `OrderBook`, `Trades`, `Kline`, `MarkPrice` and `BookTicker`
/// can be derived from a stream name by [`SubscriptionType::from_stream`]; the
/// remaining variants are never produced by it (presumably they describe
/// private/user-data streams — confirm against the callers).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Stream whose name contains `@ticker`.
    Ticker,
    /// Stream whose name contains `@depth`.
    OrderBook,
    /// Stream whose name contains `@trade` or `@aggTrade`.
    Trades,
    /// Stream whose name contains `@kline_`; the payload is the text that
    /// follows that marker (the interval, e.g. `"1m"`).
    Kline(String),
    /// Not derivable from a stream name.
    Balance,
    /// Not derivable from a stream name.
    Orders,
    /// Not derivable from a stream name.
    Positions,
    /// Not derivable from a stream name.
    MyTrades,
    /// Stream whose name contains `@markPrice`.
    MarkPrice,
    /// Stream whose name contains `@bookTicker`.
    BookTicker,
}

impl SubscriptionType {
    /// Infers the subscription type from a stream name such as
    /// `"btcusdt@kline_1m"`.
    ///
    /// Markers are tested in a fixed order (`@ticker`, `@depth`,
    /// `@trade`/`@aggTrade`, `@kline_`, `@markPrice`, `@bookTicker`) and the
    /// first one found wins. Returns `None` when no marker is present, or when
    /// `@kline_` occurs more than once in the name.
    pub fn from_stream(stream: &str) -> Option<Self> {
        let has = |marker: &str| stream.contains(marker);

        if has("@ticker") {
            return Some(Self::Ticker);
        }
        if has("@depth") {
            return Some(Self::OrderBook);
        }
        if has("@trade") || has("@aggTrade") {
            return Some(Self::Trades);
        }
        if has("@kline_") {
            // Exactly two pieces means the marker occurs exactly once; the
            // second piece is the interval.
            let mut pieces = stream.split("@kline_");
            return match (pieces.next(), pieces.next(), pieces.next()) {
                (Some(_), Some(interval), None) => Some(Self::Kline(interval.to_string())),
                _ => None,
            };
        }
        if has("@markPrice") {
            return Some(Self::MarkPrice);
        }
        if has("@bookTicker") {
            return Some(Self::BookTicker);
        }

        None
    }
}
60
61#[derive(Clone)]
63pub struct Subscription {
64 pub stream: String,
66 pub symbol: String,
68 pub sub_type: SubscriptionType,
70 pub subscribed_at: Instant,
72 senders: Arc<std::sync::Mutex<Vec<tokio::sync::mpsc::Sender<Value>>>>,
74 ref_count: Arc<AtomicUsize>,
76}
77
78impl Subscription {
79 pub fn new(
81 stream: String,
82 symbol: String,
83 sub_type: SubscriptionType,
84 sender: tokio::sync::mpsc::Sender<Value>,
85 ) -> Self {
86 Self {
87 stream,
88 symbol,
89 sub_type,
90 subscribed_at: Instant::now(),
91 senders: Arc::new(std::sync::Mutex::new(vec![sender])),
92 ref_count: Arc::new(AtomicUsize::new(1)),
93 }
94 }
95
96 pub fn add_sender(&self, sender: tokio::sync::mpsc::Sender<Value>) {
98 if let Ok(mut senders) = self.senders.lock() {
99 senders.push(sender);
100 }
101 }
102
103 pub fn send(&self, message: Value) -> bool {
107 if let Ok(mut senders) = self.senders.lock() {
108 let mut any_sent = false;
109 senders.retain(|sender| {
110 match sender.try_send(message.clone()) {
111 Ok(()) => {
112 any_sent = true;
113 true }
115 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
116 true
118 }
119 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
120 false
122 }
123 }
124 });
125 any_sent || !senders.is_empty()
126 } else {
127 false
128 }
129 }
130
131 pub fn add_ref(&self) -> usize {
133 self.ref_count.fetch_add(1, Ordering::SeqCst) + 1
134 }
135
136 pub fn remove_ref(&self) -> usize {
138 let prev = self.ref_count.fetch_sub(1, Ordering::SeqCst);
139 prev.saturating_sub(1)
140 }
141
142 pub fn ref_count(&self) -> usize {
144 self.ref_count.load(Ordering::SeqCst)
145 }
146}
147
148pub struct SubscriptionManager {
150 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
152 symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
154 active_count: Arc<std::sync::atomic::AtomicUsize>,
156}
157
158impl SubscriptionManager {
159 pub fn new() -> Self {
161 Self {
162 subscriptions: Arc::new(RwLock::new(HashMap::new())),
163 symbol_index: Arc::new(RwLock::new(HashMap::new())),
164 active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
165 }
166 }
167
168 pub async fn add_subscription(
175 &self,
176 stream: String,
177 symbol: String,
178 sub_type: SubscriptionType,
179 sender: tokio::sync::mpsc::Sender<Value>,
180 ) -> ccxt_core::error::Result<bool> {
181 let mut subs = self.subscriptions.write().await;
182
183 if let Some(existing) = subs.get(&stream) {
185 existing.add_sender(sender);
186 existing.add_ref();
187 return Ok(false);
188 }
189
190 let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
191
192 subs.insert(stream.clone(), subscription);
193
194 let mut index = self.symbol_index.write().await;
195 index.entry(symbol).or_insert_with(Vec::new).push(stream);
196
197 self.active_count
198 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
199
200 Ok(true)
201 }
202
203 pub async fn remove_subscription(&self, stream: &str) -> ccxt_core::error::Result<bool> {
208 let mut subs = self.subscriptions.write().await;
209
210 if let Some(subscription) = subs.get(stream) {
211 let remaining = subscription.remove_ref();
212 if remaining > 0 {
213 return Ok(false);
215 }
216
217 let Some(subscription) = subs.remove(stream) else {
219 return Ok(false);
220 };
221 let mut index = self.symbol_index.write().await;
222 if let Some(streams) = index.get_mut(&subscription.symbol) {
223 streams.retain(|s| s != stream);
224 if streams.is_empty() {
225 index.remove(&subscription.symbol);
226 }
227 }
228
229 self.active_count
230 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
231 Ok(true)
232 } else {
233 Ok(false)
234 }
235 }
236
237 pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
239 let subs = self.subscriptions.read().await;
240 subs.get(stream).cloned()
241 }
242
243 pub async fn has_subscription(&self, stream: &str) -> bool {
245 let subs = self.subscriptions.read().await;
246 subs.contains_key(stream)
247 }
248
249 pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
251 let subs = self.subscriptions.read().await;
252 subs.values().cloned().collect()
253 }
254
255 pub fn get_all_subscriptions_sync(&self) -> Vec<Subscription> {
257 if let Ok(subs) = self.subscriptions.try_read() {
258 subs.values().cloned().collect()
259 } else {
260 Vec::new()
261 }
262 }
263
264 pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
266 let index = self.symbol_index.read().await;
267 let subs = self.subscriptions.read().await;
268
269 if let Some(streams) = index.get(symbol) {
270 streams
271 .iter()
272 .filter_map(|stream| subs.get(stream).cloned())
273 .collect()
274 } else {
275 Vec::new()
276 }
277 }
278
279 pub fn active_count(&self) -> usize {
281 self.active_count.load(std::sync::atomic::Ordering::SeqCst)
282 }
283
284 pub async fn clear(&self) {
286 let mut subs = self.subscriptions.write().await;
287 let mut index = self.symbol_index.write().await;
288
289 subs.clear();
290 index.clear();
291 self.active_count
292 .store(0, std::sync::atomic::Ordering::SeqCst);
293 }
294
295 pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
297 let subs = self.subscriptions.read().await;
298 if let Some(subscription) = subs.get(stream) {
299 if subscription.send(message) {
300 return true;
301 }
302 } else {
303 return false;
304 }
305 drop(subs);
306
307 let _ = self.remove_subscription(stream).await;
308 false
309 }
310
311 pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
313 let index = self.symbol_index.read().await;
314 let subs = self.subscriptions.read().await;
315
316 let mut sent_count = 0;
317 let mut streams_to_remove = Vec::new();
318
319 if let Some(streams) = index.get(symbol) {
320 for stream in streams {
321 if let Some(subscription) = subs.get(stream) {
322 if subscription.send(message.clone()) {
323 sent_count += 1;
324 } else {
325 streams_to_remove.push(stream.clone());
326 }
327 }
328 }
329 }
330 drop(subs);
331 drop(index);
332
333 for stream in streams_to_remove {
334 let _ = self.remove_subscription(&stream).await;
335 }
336
337 sent_count
338 }
339
340 pub async fn get_active_streams(&self) -> Vec<String> {
342 let subs = self.subscriptions.read().await;
343 subs.keys().cloned().collect()
344 }
345}
346
347impl Default for SubscriptionManager {
348 fn default() -> Self {
349 Self::new()
350 }
351}
352
/// Exponential-backoff settings for reconnect attempts.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether reconnecting is allowed at all.
    pub enabled: bool,

    /// Delay used for attempt 0, in milliseconds.
    pub initial_delay_ms: u64,

    /// Upper bound on the computed delay, in milliseconds.
    pub max_delay_ms: u64,

    /// Factor the delay is multiplied by for each further attempt.
    pub backoff_multiplier: f64,

    /// Maximum number of attempts; `0` means unlimited.
    pub max_attempts: usize,
}

impl Default for ReconnectConfig {
    /// Enabled, 1 s initial delay, 30 s cap, doubling backoff, unlimited
    /// attempts.
    fn default() -> Self {
        Self {
            enabled: true,
            initial_delay_ms: 1000,
            max_delay_ms: 30000,
            backoff_multiplier: 2.0,
            max_attempts: 0,
        }
    }
}

impl ReconnectConfig {
    /// Returns the delay in milliseconds before reconnect attempt `attempt`
    /// (zero-based): `initial_delay_ms * backoff_multiplier^attempt`, capped
    /// at `max_delay_ms`.
    pub fn calculate_delay(&self, attempt: usize) -> u64 {
        // A zero base delay stays zero for every attempt. Handling it up front
        // avoids `0 * inf = NaN` once the power overflows, which `f64::min`
        // would otherwise turn into `max_delay_ms`.
        if self.initial_delay_ms == 0 {
            return 0;
        }

        // Clamp instead of `as i32`: a truncating cast turns huge attempt
        // numbers into negative exponents and therefore tiny delays.
        let exponent = attempt.min(i32::MAX as usize) as i32;
        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(exponent);

        // Float-to-int `as` saturates, so negative results become 0.
        delay.min(self.max_delay_ms as f64) as u64
    }

    /// Returns whether attempt number `attempt` may be made: reconnecting must
    /// be enabled and, unless `max_attempts` is `0` (unlimited), `attempt`
    /// must be below `max_attempts`.
    pub fn should_retry(&self, attempt: usize) -> bool {
        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
    }
}
396
397pub struct SubscriptionHandle {
411 stream: String,
413 subscription_manager: Arc<SubscriptionManager>,
415 message_router: Option<Arc<crate::binance::ws::handlers::MessageRouter>>,
417 released: bool,
419}
420
421impl SubscriptionHandle {
422 pub fn new(
424 stream: String,
425 subscription_manager: Arc<SubscriptionManager>,
426 message_router: Option<Arc<crate::binance::ws::handlers::MessageRouter>>,
427 ) -> Self {
428 Self {
429 stream,
430 subscription_manager,
431 message_router,
432 released: false,
433 }
434 }
435
436 pub fn stream(&self) -> &str {
438 &self.stream
439 }
440
441 pub async fn release(mut self) -> ccxt_core::error::Result<()> {
446 self.released = true;
447 self.do_release().await
448 }
449
450 async fn do_release(&self) -> ccxt_core::error::Result<()> {
452 let fully_removed = self
453 .subscription_manager
454 .remove_subscription(&self.stream)
455 .await?;
456
457 if fully_removed {
458 if let Some(router) = &self.message_router {
459 router.unsubscribe(vec![self.stream.clone()]).await?;
460 }
461 }
462
463 Ok(())
464 }
465}
466
467impl Drop for SubscriptionHandle {
468 fn drop(&mut self) {
469 if self.released {
470 return;
471 }
472
473 let stream = self.stream.clone();
475 let subscription_manager = self.subscription_manager.clone();
476 let message_router = self.message_router.clone();
477
478 tokio::spawn(async move {
479 let fully_removed = subscription_manager
480 .remove_subscription(&stream)
481 .await
482 .unwrap_or(false);
483
484 if fully_removed {
485 if let Some(router) = &message_router {
486 let _ = router.unsubscribe(vec![stream]).await;
487 }
488 }
489 });
490 }
491}