ccxt_exchanges/binance/ws/
subscriptions.rs1use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tokio::sync::RwLock;
8
/// Category of data carried by a WebSocket subscription.
///
/// [`SubscriptionType::from_stream`] can recognise `Ticker`, `OrderBook`,
/// `Trades`, `Kline`, `MarkPrice` and `BookTicker` from a stream name; the
/// remaining variants are never produced by that parser.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Matched by stream names containing `@ticker`.
    Ticker,
    /// Matched by stream names containing `@depth`.
    OrderBook,
    /// Matched by stream names containing `@trade` or `@aggTrade`.
    Trades,
    /// Matched by stream names containing `@kline_`; carries the text that
    /// follows the marker (for example `"1m"`).
    Kline(String),
    /// Not derivable from a stream name by `from_stream`.
    Balance,
    /// Not derivable from a stream name by `from_stream`.
    Orders,
    /// Not derivable from a stream name by `from_stream`.
    Positions,
    /// Not derivable from a stream name by `from_stream`.
    MyTrades,
    /// Matched by stream names containing `@markPrice`.
    MarkPrice,
    /// Matched by stream names containing `@bookTicker`.
    BookTicker,
}

impl SubscriptionType {
    /// Infers the subscription type from a stream name such as
    /// `"btcusdt@kline_1m"`.
    ///
    /// Markers are tested in a fixed priority order (`@ticker`, `@depth`,
    /// `@trade`/`@aggTrade`, `@kline_`, `@markPrice`, `@bookTicker`) and the
    /// first one found wins. Matching is case-sensitive.
    ///
    /// Returns `None` when no marker is present, or when `@kline_` occurs
    /// more than once in the name.
    pub fn from_stream(stream: &str) -> Option<Self> {
        let has = |marker: &str| stream.contains(marker);

        if has("@ticker") {
            return Some(Self::Ticker);
        }
        if has("@depth") {
            return Some(Self::OrderBook);
        }
        if has("@trade") || has("@aggTrade") {
            return Some(Self::Trades);
        }
        if has("@kline_") {
            // Accept the name only when the marker splits it into exactly two
            // pieces; the second piece is the interval.
            let mut pieces = stream.split("@kline_");
            return match (pieces.next(), pieces.next(), pieces.next()) {
                (Some(_), Some(interval), None) => Some(Self::Kline(interval.to_owned())),
                _ => None,
            };
        }
        if has("@markPrice") {
            return Some(Self::MarkPrice);
        }
        if has("@bookTicker") {
            Some(Self::BookTicker)
        } else {
            None
        }
    }
}
59
60#[derive(Clone)]
62pub struct Subscription {
63 pub stream: String,
65 pub symbol: String,
67 pub sub_type: SubscriptionType,
69 pub subscribed_at: Instant,
71 pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
73}
74
75impl Subscription {
76 pub fn new(
78 stream: String,
79 symbol: String,
80 sub_type: SubscriptionType,
81 sender: tokio::sync::mpsc::UnboundedSender<Value>,
82 ) -> Self {
83 Self {
84 stream,
85 symbol,
86 sub_type,
87 subscribed_at: Instant::now(),
88 sender,
89 }
90 }
91
92 pub fn send(&self, message: Value) -> bool {
94 self.sender.send(message).is_ok()
95 }
96}
97
98pub struct SubscriptionManager {
100 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
102 symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
104 active_count: Arc<std::sync::atomic::AtomicUsize>,
106}
107
108impl SubscriptionManager {
109 pub fn new() -> Self {
111 Self {
112 subscriptions: Arc::new(RwLock::new(HashMap::new())),
113 symbol_index: Arc::new(RwLock::new(HashMap::new())),
114 active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
115 }
116 }
117
118 pub async fn add_subscription(
120 &self,
121 stream: String,
122 symbol: String,
123 sub_type: SubscriptionType,
124 sender: tokio::sync::mpsc::UnboundedSender<Value>,
125 ) -> ccxt_core::error::Result<()> {
126 let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
127
128 let mut subs = self.subscriptions.write().await;
129 subs.insert(stream.clone(), subscription);
130
131 let mut index = self.symbol_index.write().await;
132 index.entry(symbol).or_insert_with(Vec::new).push(stream);
133
134 self.active_count
135 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
136
137 Ok(())
138 }
139
140 pub async fn remove_subscription(&self, stream: &str) -> ccxt_core::error::Result<()> {
142 let mut subs = self.subscriptions.write().await;
143
144 if let Some(subscription) = subs.remove(stream) {
145 let mut index = self.symbol_index.write().await;
146 if let Some(streams) = index.get_mut(&subscription.symbol) {
147 streams.retain(|s| s != stream);
148 if streams.is_empty() {
149 index.remove(&subscription.symbol);
150 }
151 }
152
153 self.active_count
154 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
155 }
156
157 Ok(())
158 }
159
160 pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
162 let subs = self.subscriptions.read().await;
163 subs.get(stream).cloned()
164 }
165
166 pub async fn has_subscription(&self, stream: &str) -> bool {
168 let subs = self.subscriptions.read().await;
169 subs.contains_key(stream)
170 }
171
172 pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
174 let subs = self.subscriptions.read().await;
175 subs.values().cloned().collect()
176 }
177
178 pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
180 let index = self.symbol_index.read().await;
181 let subs = self.subscriptions.read().await;
182
183 if let Some(streams) = index.get(symbol) {
184 streams
185 .iter()
186 .filter_map(|stream| subs.get(stream).cloned())
187 .collect()
188 } else {
189 Vec::new()
190 }
191 }
192
193 pub fn active_count(&self) -> usize {
195 self.active_count.load(std::sync::atomic::Ordering::SeqCst)
196 }
197
198 pub async fn clear(&self) {
200 let mut subs = self.subscriptions.write().await;
201 let mut index = self.symbol_index.write().await;
202
203 subs.clear();
204 index.clear();
205 self.active_count
206 .store(0, std::sync::atomic::Ordering::SeqCst);
207 }
208
209 pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
211 let subs = self.subscriptions.read().await;
212 if let Some(subscription) = subs.get(stream) {
213 subscription.send(message)
214 } else {
215 false
216 }
217 }
218
219 pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
221 let index = self.symbol_index.read().await;
222 let subs = self.subscriptions.read().await;
223
224 let mut sent_count = 0;
225
226 if let Some(streams) = index.get(symbol) {
227 for stream in streams {
228 if let Some(subscription) = subs.get(stream) {
229 if subscription.send(message.clone()) {
230 sent_count += 1;
231 }
232 }
233 }
234 }
235
236 sent_count
237 }
238}
239
240impl Default for SubscriptionManager {
241 fn default() -> Self {
242 Self::new()
243 }
244}
245
/// Settings that control automatic reconnection with exponential backoff.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether reconnection is attempted at all.
    pub enabled: bool,

    /// Delay used for attempt `0`, in milliseconds.
    pub initial_delay_ms: u64,

    /// Upper bound applied to every computed delay, in milliseconds.
    pub max_delay_ms: u64,

    /// Factor by which the delay grows with each further attempt.
    pub backoff_multiplier: f64,

    /// Maximum number of attempts; `0` means no limit.
    pub max_attempts: usize,
}

impl Default for ReconnectConfig {
    /// Enabled, 1 s initial delay, 30 s cap, doubling backoff, unlimited
    /// attempts.
    fn default() -> Self {
        Self {
            enabled: true,
            initial_delay_ms: 1000,
            max_delay_ms: 30000,
            backoff_multiplier: 2.0,
            max_attempts: 0,
        }
    }
}

impl ReconnectConfig {
    /// Returns the delay in milliseconds before reconnect attempt `attempt`
    /// (zero-based): `initial_delay_ms * backoff_multiplier^attempt`, capped
    /// at `max_delay_ms`.
    ///
    /// Attempt numbers larger than `i32::MAX` are clamped rather than
    /// wrapped, so the delay never drops back down for huge attempt counts.
    pub fn calculate_delay(&self, attempt: usize) -> u64 {
        // `powi` takes an `i32`; clamp first so the cast below is lossless
        // instead of wrapping to a negative exponent.
        let exponent = attempt.min(i32::MAX as usize) as i32;
        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(exponent);
        // The float-to-integer cast saturates, so the result stays in range.
        delay.min(self.max_delay_ms as f64) as u64
    }

    /// Reports whether another reconnect should be tried after `attempt`
    /// attempts: reconnection must be enabled and the attempt limit, if any
    /// (`max_attempts != 0`), must not have been reached.
    pub fn should_retry(&self, attempt: usize) -> bool {
        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
    }
}