ccxt_exchanges/binance/ws/
types.rs1use ccxt_core::ws_client::BackpressureStrategy;
7use std::sync::atomic::{AtomicBool, AtomicU64};
8use std::time::Duration;
9
10pub const DEFAULT_TICKER_CAPACITY: usize = 256;
12pub const DEFAULT_ORDERBOOK_CAPACITY: usize = 512;
14pub const DEFAULT_TRADES_CAPACITY: usize = 1024;
16pub const DEFAULT_USER_DATA_CAPACITY: usize = 256;
18
19#[derive(Debug, Clone)]
25pub struct WsChannelConfig {
26 pub ticker_capacity: usize,
28 pub orderbook_capacity: usize,
30 pub trades_capacity: usize,
32 pub user_data_capacity: usize,
34}
35
36impl Default for WsChannelConfig {
37 fn default() -> Self {
38 Self {
39 ticker_capacity: DEFAULT_TICKER_CAPACITY,
40 orderbook_capacity: DEFAULT_ORDERBOOK_CAPACITY,
41 trades_capacity: DEFAULT_TRADES_CAPACITY,
42 user_data_capacity: DEFAULT_USER_DATA_CAPACITY,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
49pub enum DepthLevel {
50 L5 = 5,
52 L10 = 10,
54 #[default]
56 L20 = 20,
57}
58
59impl DepthLevel {
60 pub fn as_u32(self) -> u32 {
62 self as u32
63 }
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
68pub enum UpdateSpeed {
69 #[default]
71 Ms100,
72 Ms1000,
74}
75
76impl UpdateSpeed {
77 pub fn as_str(&self) -> &'static str {
79 match self {
80 Self::Ms100 => "100ms",
81 Self::Ms1000 => "1000ms",
82 }
83 }
84
85 pub fn as_millis(&self) -> u64 {
87 match self {
88 Self::Ms100 => 100,
89 Self::Ms1000 => 1000,
90 }
91 }
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
98pub enum WsErrorRecovery {
99 Retry {
101 max_attempts: u32,
103 backoff: Duration,
105 },
106 Resync,
108 Reconnect,
110 Fatal,
112}
113
114impl WsErrorRecovery {
115 pub fn retry_default() -> Self {
117 Self::Retry {
118 max_attempts: 3,
119 backoff: Duration::from_secs(1),
120 }
121 }
122
123 pub fn is_recoverable(&self) -> bool {
125 !matches!(self, Self::Fatal)
126 }
127
128 pub fn from_error_message(error_msg: &str) -> Self {
133 let lower = error_msg.to_lowercase();
134
135 if is_fatal_error(&lower) {
137 return Self::Fatal;
138 }
139
140 if is_resync_error(&lower) {
142 return Self::Resync;
143 }
144
145 if is_reconnect_error(&lower) {
147 return Self::Reconnect;
148 }
149
150 Self::retry_default()
152 }
153}
154
155fn is_fatal_error(error_msg: &str) -> bool {
163 const FATAL_PATTERNS: &[&str] = &[
164 "invalid api",
165 "api key",
166 "authentication",
167 "unauthorized",
168 "permission denied",
169 "forbidden",
170 "account",
171 "banned",
172 "ip banned",
173 "invalid signature",
174 "signature",
175 ];
176
177 FATAL_PATTERNS.iter().any(|p| error_msg.contains(p))
178}
179
180fn is_resync_error(error_msg: &str) -> bool {
187 const RESYNC_PATTERNS: &[&str] = &[
188 "resync",
189 "sequence",
190 "out of sync",
191 "stale",
192 "gap",
193 "missing update",
194 ];
195
196 RESYNC_PATTERNS.iter().any(|p| error_msg.contains(p))
197}
198
199fn is_reconnect_error(error_msg: &str) -> bool {
206 const RECONNECT_PATTERNS: &[&str] = &[
207 "connection closed",
208 "connection reset",
209 "disconnected",
210 "eof",
211 "broken pipe",
212 "connection refused",
213 "server closed",
214 ];
215
216 RECONNECT_PATTERNS.iter().any(|p| error_msg.contains(p))
217}
218
219#[derive(Debug, Clone, Default)]
223pub struct WsHealthSnapshot {
224 pub latency_ms: Option<i64>,
226 pub messages_received: u64,
228 pub messages_dropped: u64,
230 pub last_message_time: Option<i64>,
232 pub connection_uptime_ms: u64,
234 pub reconnect_count: u32,
236}
237
238impl WsHealthSnapshot {
239 pub fn is_healthy(&self) -> bool {
241 if let Some(last_time) = self.last_message_time {
245 let now = chrono::Utc::now().timestamp_millis();
246 if now - last_time > 60_000 {
247 return false;
248 }
249 }
250
251 if self.messages_received > 0 {
252 let drop_rate = self.messages_dropped as f64 / self.messages_received as f64;
253 if drop_rate > 0.1 {
254 return false;
255 }
256 }
257
258 true
259 }
260}
261
262#[derive(Debug, Default)]
264pub struct WsStats {
265 pub messages_received: AtomicU64,
267 pub messages_dropped: AtomicU64,
269 pub last_message_time: AtomicU64,
271 pub connection_start_time: AtomicU64,
273 pub reconnect_count: std::sync::atomic::AtomicU32,
275}
276
277#[derive(Debug, Clone)]
281pub struct BinanceWsConfig {
282 pub url: String,
284 pub channel_config: WsChannelConfig,
286 pub backpressure_strategy: BackpressureStrategy,
288 pub shutdown_timeout_ms: u64,
290}
291
292impl BinanceWsConfig {
293 pub fn new(url: String) -> Self {
295 Self {
296 url,
297 channel_config: WsChannelConfig::default(),
298 backpressure_strategy: BackpressureStrategy::DropOldest,
299 shutdown_timeout_ms: 5000,
300 }
301 }
302
303 pub fn with_channel_config(mut self, config: WsChannelConfig) -> Self {
305 self.channel_config = config;
306 self
307 }
308
309 pub fn with_backpressure(mut self, strategy: BackpressureStrategy) -> Self {
311 self.backpressure_strategy = strategy;
312 self
313 }
314
315 pub fn with_shutdown_timeout(mut self, timeout_ms: u64) -> Self {
317 self.shutdown_timeout_ms = timeout_ms;
318 self
319 }
320}
321
322#[derive(Debug, Default)]
324pub struct ShutdownState {
325 pub is_shutting_down: AtomicBool,
327 pub shutdown_complete: AtomicBool,
329}
330
331#[cfg(test)]
332mod tests {
333 use super::*;
334
335 #[test]
336 fn test_depth_level_values() {
337 assert_eq!(DepthLevel::L5.as_u32(), 5);
338 assert_eq!(DepthLevel::L10.as_u32(), 10);
339 assert_eq!(DepthLevel::L20.as_u32(), 20);
340 }
341
342 #[test]
343 fn test_update_speed_str() {
344 assert_eq!(UpdateSpeed::Ms100.as_str(), "100ms");
345 assert_eq!(UpdateSpeed::Ms1000.as_str(), "1000ms");
346 }
347
348 #[test]
349 fn test_update_speed_millis() {
350 assert_eq!(UpdateSpeed::Ms100.as_millis(), 100);
351 assert_eq!(UpdateSpeed::Ms1000.as_millis(), 1000);
352 }
353
354 #[test]
355 fn test_ws_error_recovery_is_recoverable() {
356 assert!(WsErrorRecovery::retry_default().is_recoverable());
357 assert!(WsErrorRecovery::Resync.is_recoverable());
358 assert!(WsErrorRecovery::Reconnect.is_recoverable());
359 assert!(!WsErrorRecovery::Fatal.is_recoverable());
360 }
361
362 #[test]
363 fn test_ws_channel_config_default() {
364 let config = WsChannelConfig::default();
365 assert_eq!(config.ticker_capacity, DEFAULT_TICKER_CAPACITY);
366 assert_eq!(config.orderbook_capacity, DEFAULT_ORDERBOOK_CAPACITY);
367 assert_eq!(config.trades_capacity, DEFAULT_TRADES_CAPACITY);
368 assert_eq!(config.user_data_capacity, DEFAULT_USER_DATA_CAPACITY);
369 }
370
371 #[test]
372 fn test_binance_ws_config_builder() {
373 let config = BinanceWsConfig::new("wss://test.com".to_string())
374 .with_backpressure(BackpressureStrategy::DropNewest)
375 .with_shutdown_timeout(10000);
376
377 assert_eq!(config.url, "wss://test.com");
378 assert_eq!(
379 config.backpressure_strategy,
380 BackpressureStrategy::DropNewest
381 );
382 assert_eq!(config.shutdown_timeout_ms, 10000);
383 }
384
385 #[test]
386 fn test_ws_error_recovery_fatal_errors() {
387 assert_eq!(
388 WsErrorRecovery::from_error_message("Invalid API key"),
389 WsErrorRecovery::Fatal
390 );
391 assert_eq!(
392 WsErrorRecovery::from_error_message("Authentication failed"),
393 WsErrorRecovery::Fatal
394 );
395 assert_eq!(
396 WsErrorRecovery::from_error_message("Permission denied"),
397 WsErrorRecovery::Fatal
398 );
399 }
400
401 #[test]
402 fn test_ws_error_recovery_resync_errors() {
403 assert_eq!(
404 WsErrorRecovery::from_error_message("RESYNC_NEEDED: sequence gap"),
405 WsErrorRecovery::Resync
406 );
407 assert_eq!(
408 WsErrorRecovery::from_error_message("Out of sync with server"),
409 WsErrorRecovery::Resync
410 );
411 }
412
413 #[test]
414 fn test_ws_error_recovery_reconnect_errors() {
415 assert_eq!(
416 WsErrorRecovery::from_error_message("Connection closed by server"),
417 WsErrorRecovery::Reconnect
418 );
419 assert_eq!(
420 WsErrorRecovery::from_error_message("Connection reset"),
421 WsErrorRecovery::Reconnect
422 );
423 }
424
425 #[test]
426 fn test_ws_error_recovery_transient_errors() {
427 let recovery = WsErrorRecovery::from_error_message("Network timeout");
429 assert!(matches!(recovery, WsErrorRecovery::Retry { .. }));
430 }
431}