ccxt_exchanges/binance/ws/
subscriptions.rs1use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tokio::sync::RwLock;
8
9#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11pub enum SubscriptionType {
12 Ticker,
14 OrderBook,
16 Trades,
18 Kline(String),
20 Balance,
22 Orders,
24 Positions,
26 MyTrades,
28 MarkPrice,
30 BookTicker,
32}
33
34impl SubscriptionType {
35 pub fn from_stream(stream: &str) -> Option<Self> {
37 if stream.contains("@ticker") {
38 Some(Self::Ticker)
39 } else if stream.contains("@depth") {
40 Some(Self::OrderBook)
41 } else if stream.contains("@trade") || stream.contains("@aggTrade") {
42 Some(Self::Trades)
43 } else if stream.contains("@kline_") {
44 let parts: Vec<&str> = stream.split("@kline_").collect();
45 if parts.len() == 2 {
46 Some(Self::Kline(parts[1].to_string()))
47 } else {
48 None
49 }
50 } else if stream.contains("@markPrice") {
51 Some(Self::MarkPrice)
52 } else if stream.contains("@bookTicker") {
53 Some(Self::BookTicker)
54 } else {
55 None
56 }
57 }
58}
59
60#[derive(Clone)]
62pub struct Subscription {
63 pub stream: String,
65 pub symbol: String,
67 pub sub_type: SubscriptionType,
69 pub subscribed_at: Instant,
71 pub sender: tokio::sync::mpsc::Sender<Value>,
73}
74
75impl Subscription {
76 pub fn new(
78 stream: String,
79 symbol: String,
80 sub_type: SubscriptionType,
81 sender: tokio::sync::mpsc::Sender<Value>,
82 ) -> Self {
83 Self {
84 stream,
85 symbol,
86 sub_type,
87 subscribed_at: Instant::now(),
88 sender,
89 }
90 }
91
92 pub fn send(&self, message: Value) -> bool {
94 self.sender.try_send(message).is_ok()
96 }
97}
98
99pub struct SubscriptionManager {
101 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
103 symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
105 active_count: Arc<std::sync::atomic::AtomicUsize>,
107}
108
109impl SubscriptionManager {
110 pub fn new() -> Self {
112 Self {
113 subscriptions: Arc::new(RwLock::new(HashMap::new())),
114 symbol_index: Arc::new(RwLock::new(HashMap::new())),
115 active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
116 }
117 }
118
119 pub async fn add_subscription(
121 &self,
122 stream: String,
123 symbol: String,
124 sub_type: SubscriptionType,
125 sender: tokio::sync::mpsc::Sender<Value>,
126 ) -> ccxt_core::error::Result<()> {
127 let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
128
129 let mut subs = self.subscriptions.write().await;
130 subs.insert(stream.clone(), subscription);
131
132 let mut index = self.symbol_index.write().await;
133 index.entry(symbol).or_insert_with(Vec::new).push(stream);
134
135 self.active_count
136 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
137
138 Ok(())
139 }
140
141 pub async fn remove_subscription(&self, stream: &str) -> ccxt_core::error::Result<()> {
143 let mut subs = self.subscriptions.write().await;
144
145 if let Some(subscription) = subs.remove(stream) {
146 let mut index = self.symbol_index.write().await;
147 if let Some(streams) = index.get_mut(&subscription.symbol) {
148 streams.retain(|s| s != stream);
149 if streams.is_empty() {
150 index.remove(&subscription.symbol);
151 }
152 }
153
154 self.active_count
155 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
156 }
157
158 Ok(())
159 }
160
161 pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
163 let subs = self.subscriptions.read().await;
164 subs.get(stream).cloned()
165 }
166
167 pub async fn has_subscription(&self, stream: &str) -> bool {
169 let subs = self.subscriptions.read().await;
170 subs.contains_key(stream)
171 }
172
173 pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
175 let subs = self.subscriptions.read().await;
176 subs.values().cloned().collect()
177 }
178
179 pub fn get_all_subscriptions_sync(&self) -> Vec<Subscription> {
181 if let Ok(subs) = self.subscriptions.try_read() {
182 subs.values().cloned().collect()
183 } else {
184 Vec::new()
185 }
186 }
187
188 pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
190 let index = self.symbol_index.read().await;
191 let subs = self.subscriptions.read().await;
192
193 if let Some(streams) = index.get(symbol) {
194 streams
195 .iter()
196 .filter_map(|stream| subs.get(stream).cloned())
197 .collect()
198 } else {
199 Vec::new()
200 }
201 }
202
203 pub fn active_count(&self) -> usize {
205 self.active_count.load(std::sync::atomic::Ordering::SeqCst)
206 }
207
208 pub async fn clear(&self) {
210 let mut subs = self.subscriptions.write().await;
211 let mut index = self.symbol_index.write().await;
212
213 subs.clear();
214 index.clear();
215 self.active_count
216 .store(0, std::sync::atomic::Ordering::SeqCst);
217 }
218
219 pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
221 let subs = self.subscriptions.read().await;
222 if let Some(subscription) = subs.get(stream) {
223 if subscription.send(message) {
224 return true;
225 }
226 } else {
227 return false;
228 }
229 drop(subs);
230
231 let _ = self.remove_subscription(stream).await;
232 false
233 }
234
235 pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
237 let index = self.symbol_index.read().await;
238 let subs = self.subscriptions.read().await;
239
240 let mut sent_count = 0;
241 let mut streams_to_remove = Vec::new();
242
243 if let Some(streams) = index.get(symbol) {
244 for stream in streams {
245 if let Some(subscription) = subs.get(stream) {
246 if subscription.send(message.clone()) {
247 sent_count += 1;
248 } else {
249 streams_to_remove.push(stream.clone());
250 }
251 }
252 }
253 }
254 drop(subs);
255 drop(index);
256
257 for stream in streams_to_remove {
258 let _ = self.remove_subscription(&stream).await;
259 }
260
261 sent_count
262 }
263
264 pub async fn get_active_streams(&self) -> Vec<String> {
266 let subs = self.subscriptions.read().await;
267 subs.keys().cloned().collect()
268 }
269}
270
271impl Default for SubscriptionManager {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277#[derive(Debug, Clone)]
279pub struct ReconnectConfig {
280 pub enabled: bool,
282
283 pub initial_delay_ms: u64,
285
286 pub max_delay_ms: u64,
288
289 pub backoff_multiplier: f64,
291
292 pub max_attempts: usize,
294}
295
296impl Default for ReconnectConfig {
297 fn default() -> Self {
298 Self {
299 enabled: true,
300 initial_delay_ms: 1000,
301 max_delay_ms: 30000,
302 backoff_multiplier: 2.0,
303 max_attempts: 0,
304 }
305 }
306}
307
308impl ReconnectConfig {
309 pub fn calculate_delay(&self, attempt: usize) -> u64 {
311 let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(attempt as i32);
312 delay.min(self.max_delay_ms as f64) as u64
313 }
314
315 pub fn should_retry(&self, attempt: usize) -> bool {
317 self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
318 }
319}