ccxt_exchanges/binance/ws/
subscriptions.rs

1//! Subscription management for Binance WebSocket streams
2
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tokio::sync::RwLock;
8
/// Kinds of Binance WebSocket subscriptions this module can describe.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Rolling 24-hour ticker updates.
    Ticker,
    /// Order book depth updates.
    OrderBook,
    /// Live trade updates.
    Trades,
    /// Candlestick (kline) updates; the payload is the interval text,
    /// for example `"1m"`, `"5m"` or `"1h"`.
    Kline(String),
    /// Account balance updates.
    Balance,
    /// Order status updates.
    Orders,
    /// Position updates.
    Positions,
    /// Executions of the account's own trades.
    MyTrades,
    /// Mark price updates.
    MarkPrice,
    /// Best bid/ask (book ticker) updates.
    BookTicker,
}
33
34impl SubscriptionType {
35    /// Infers a subscription type from a stream name
36    pub fn from_stream(stream: &str) -> Option<Self> {
37        if stream.contains("@ticker") {
38            Some(Self::Ticker)
39        } else if stream.contains("@depth") {
40            Some(Self::OrderBook)
41        } else if stream.contains("@trade") || stream.contains("@aggTrade") {
42            Some(Self::Trades)
43        } else if stream.contains("@kline_") {
44            let parts: Vec<&str> = stream.split("@kline_").collect();
45            if parts.len() == 2 {
46                Some(Self::Kline(parts[1].to_string()))
47            } else {
48                None
49            }
50        } else if stream.contains("@markPrice") {
51            Some(Self::MarkPrice)
52        } else if stream.contains("@bookTicker") {
53            Some(Self::BookTicker)
54        } else {
55            None
56        }
57    }
58}
59
60/// Subscription metadata
61#[derive(Clone)]
62pub struct Subscription {
63    /// Stream name (e.g. "btcusdt@ticker")
64    pub stream: String,
65    /// Normalized trading symbol (e.g. "BTCUSDT")
66    pub symbol: String,
67    /// Subscription type descriptor
68    pub sub_type: SubscriptionType,
69    /// Timestamp when the subscription was created
70    pub subscribed_at: Instant,
71    /// Sender for forwarding WebSocket messages to consumers
72    pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
73}
74
75impl Subscription {
76    /// Creates a new subscription with the provided parameters
77    pub fn new(
78        stream: String,
79        symbol: String,
80        sub_type: SubscriptionType,
81        sender: tokio::sync::mpsc::UnboundedSender<Value>,
82    ) -> Self {
83        Self {
84            stream,
85            symbol,
86            sub_type,
87            subscribed_at: Instant::now(),
88            sender,
89        }
90    }
91
92    /// Sends a message to the subscriber
93    pub fn send(&self, message: Value) -> bool {
94        self.sender.send(message).is_ok()
95    }
96}
97
98/// Subscription manager
99pub struct SubscriptionManager {
100    /// Mapping of `stream_name -> Subscription`
101    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
102    /// Index by symbol: `symbol -> Vec<stream_name>`
103    symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
104    /// Counter of active subscriptions
105    active_count: Arc<std::sync::atomic::AtomicUsize>,
106}
107
108impl SubscriptionManager {
109    /// Creates a new `SubscriptionManager`
110    pub fn new() -> Self {
111        Self {
112            subscriptions: Arc::new(RwLock::new(HashMap::new())),
113            symbol_index: Arc::new(RwLock::new(HashMap::new())),
114            active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
115        }
116    }
117
118    /// Adds a subscription to the manager
119    pub async fn add_subscription(
120        &self,
121        stream: String,
122        symbol: String,
123        sub_type: SubscriptionType,
124        sender: tokio::sync::mpsc::UnboundedSender<Value>,
125    ) -> ccxt_core::error::Result<()> {
126        let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
127
128        let mut subs = self.subscriptions.write().await;
129        subs.insert(stream.clone(), subscription);
130
131        let mut index = self.symbol_index.write().await;
132        index.entry(symbol).or_insert_with(Vec::new).push(stream);
133
134        self.active_count
135            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
136
137        Ok(())
138    }
139
140    /// Removes a subscription by stream name
141    pub async fn remove_subscription(&self, stream: &str) -> ccxt_core::error::Result<()> {
142        let mut subs = self.subscriptions.write().await;
143
144        if let Some(subscription) = subs.remove(stream) {
145            let mut index = self.symbol_index.write().await;
146            if let Some(streams) = index.get_mut(&subscription.symbol) {
147                streams.retain(|s| s != stream);
148                if streams.is_empty() {
149                    index.remove(&subscription.symbol);
150                }
151            }
152
153            self.active_count
154                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
155        }
156
157        Ok(())
158    }
159
160    /// Retrieves a subscription by stream name
161    pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
162        let subs = self.subscriptions.read().await;
163        subs.get(stream).cloned()
164    }
165
166    /// Checks whether a subscription exists for the given stream
167    pub async fn has_subscription(&self, stream: &str) -> bool {
168        let subs = self.subscriptions.read().await;
169        subs.contains_key(stream)
170    }
171
172    /// Returns all registered subscriptions
173    pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
174        let subs = self.subscriptions.read().await;
175        subs.values().cloned().collect()
176    }
177
178    /// Returns all subscriptions associated with a symbol
179    pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
180        let index = self.symbol_index.read().await;
181        let subs = self.subscriptions.read().await;
182
183        if let Some(streams) = index.get(symbol) {
184            streams
185                .iter()
186                .filter_map(|stream| subs.get(stream).cloned())
187                .collect()
188        } else {
189            Vec::new()
190        }
191    }
192
193    /// Returns the number of active subscriptions
194    pub fn active_count(&self) -> usize {
195        self.active_count.load(std::sync::atomic::Ordering::SeqCst)
196    }
197
198    /// Removes all subscriptions and clears indexes
199    pub async fn clear(&self) {
200        let mut subs = self.subscriptions.write().await;
201        let mut index = self.symbol_index.write().await;
202
203        subs.clear();
204        index.clear();
205        self.active_count
206            .store(0, std::sync::atomic::Ordering::SeqCst);
207    }
208
209    /// Sends a message to subscribers of a specific stream
210    pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
211        let subs = self.subscriptions.read().await;
212        if let Some(subscription) = subs.get(stream) {
213            subscription.send(message)
214        } else {
215            false
216        }
217    }
218
219    /// Sends a message to all subscribers of a symbol
220    pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
221        let index = self.symbol_index.read().await;
222        let subs = self.subscriptions.read().await;
223
224        let mut sent_count = 0;
225
226        if let Some(streams) = index.get(symbol) {
227            for stream in streams {
228                if let Some(subscription) = subs.get(stream) {
229                    if subscription.send(message.clone()) {
230                        sent_count += 1;
231                    }
232                }
233            }
234        }
235
236        sent_count
237    }
238}
239
240impl Default for SubscriptionManager {
241    fn default() -> Self {
242        Self::new()
243    }
244}
245
/// Settings that control automatic reconnection and its backoff schedule.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether automatic reconnection is turned on.
    pub enabled: bool,
    /// Delay before the first reconnection attempt, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound on the reconnection delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Factor applied to the delay for each further attempt.
    pub backoff_multiplier: f64,
    /// Limit on the number of reconnection attempts; 0 means no limit.
    pub max_attempts: usize,
}
264
265impl Default for ReconnectConfig {
266    fn default() -> Self {
267        Self {
268            enabled: true,
269            initial_delay_ms: 1000,
270            max_delay_ms: 30000,
271            backoff_multiplier: 2.0,
272            max_attempts: 0,
273        }
274    }
275}
276
277impl ReconnectConfig {
278    /// Calculates the reconnection delay
279    pub fn calculate_delay(&self, attempt: usize) -> u64 {
280        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(attempt as i32);
281        delay.min(self.max_delay_ms as f64) as u64
282    }
283
284    /// Determines whether another reconnection attempt should be made
285    pub fn should_retry(&self, attempt: usize) -> bool {
286        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
287    }
288}