Skip to main content

ccxt_exchanges/binance/ws/
types.rs

1//! WebSocket types and configuration for Binance
2//!
3//! This module contains configuration structures, enums, and types used
4//! throughout the Binance WebSocket implementation.
5
6use ccxt_core::ws_client::BackpressureStrategy;
7use std::sync::atomic::{AtomicBool, AtomicU64};
8use std::time::Duration;
9
10/// Default channel capacity for ticker streams
11pub const DEFAULT_TICKER_CAPACITY: usize = 256;
12/// Default channel capacity for orderbook streams
13pub const DEFAULT_ORDERBOOK_CAPACITY: usize = 512;
14/// Default channel capacity for trade streams
15pub const DEFAULT_TRADES_CAPACITY: usize = 1024;
16/// Default channel capacity for user data streams
17pub const DEFAULT_USER_DATA_CAPACITY: usize = 256;
18
19/// Configuration for WebSocket channel capacities.
20///
21/// Different data streams have different message frequencies:
22/// - Trades are highest frequency, need larger buffers
23/// - Tickers and user data are lower frequency
24#[derive(Debug, Clone)]
25pub struct WsChannelConfig {
26    /// Channel capacity for ticker streams (default: 256)
27    pub ticker_capacity: usize,
28    /// Channel capacity for orderbook streams (default: 512)
29    pub orderbook_capacity: usize,
30    /// Channel capacity for trade streams (default: 1024)
31    pub trades_capacity: usize,
32    /// Channel capacity for user data streams (default: 256)
33    pub user_data_capacity: usize,
34}
35
36impl Default for WsChannelConfig {
37    fn default() -> Self {
38        Self {
39            ticker_capacity: DEFAULT_TICKER_CAPACITY,
40            orderbook_capacity: DEFAULT_ORDERBOOK_CAPACITY,
41            trades_capacity: DEFAULT_TRADES_CAPACITY,
42            user_data_capacity: DEFAULT_USER_DATA_CAPACITY,
43        }
44    }
45}
46
47/// Order book depth levels supported by Binance.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
49pub enum DepthLevel {
50    /// 5 levels of depth
51    L5 = 5,
52    /// 10 levels of depth
53    L10 = 10,
54    /// 20 levels of depth
55    #[default]
56    L20 = 20,
57}
58
59impl DepthLevel {
60    /// Returns the numeric value of the depth level
61    pub fn as_u32(self) -> u32 {
62        self as u32
63    }
64}
65
66/// Update speed for order book streams.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
68pub enum UpdateSpeed {
69    /// 100 millisecond updates (faster, more bandwidth)
70    #[default]
71    Ms100,
72    /// 1000 millisecond updates (slower, less bandwidth)
73    Ms1000,
74}
75
76impl UpdateSpeed {
77    /// Returns the string representation for Binance API
78    pub fn as_str(&self) -> &'static str {
79        match self {
80            Self::Ms100 => "100ms",
81            Self::Ms1000 => "1000ms",
82        }
83    }
84
85    /// Returns the millisecond value
86    pub fn as_millis(&self) -> u64 {
87        match self {
88            Self::Ms100 => 100,
89            Self::Ms1000 => 1000,
90        }
91    }
92}
93
94/// Error recovery strategy for WebSocket errors.
95///
96/// This enum helps callers understand how to handle different error types.
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub enum WsErrorRecovery {
99    /// Retry the operation with exponential backoff
100    Retry {
101        /// Maximum number of retry attempts
102        max_attempts: u32,
103        /// Base backoff duration
104        backoff: Duration,
105    },
106    /// Resync state (e.g., fetch fresh orderbook snapshot)
107    Resync,
108    /// Reconnect the WebSocket connection
109    Reconnect,
110    /// Fatal error - requires user intervention
111    Fatal,
112}
113
114impl WsErrorRecovery {
115    /// Creates a retry recovery with default settings
116    pub fn retry_default() -> Self {
117        Self::Retry {
118            max_attempts: 3,
119            backoff: Duration::from_secs(1),
120        }
121    }
122
123    /// Returns true if this is a recoverable error
124    pub fn is_recoverable(&self) -> bool {
125        !matches!(self, Self::Fatal)
126    }
127
128    /// Determines the recovery strategy for a given error message.
129    ///
130    /// This classifies errors into transient (retryable), resync-needed,
131    /// reconnect-needed, or fatal categories.
132    pub fn from_error_message(error_msg: &str) -> Self {
133        let lower = error_msg.to_lowercase();
134
135        // Fatal errors - require user intervention
136        if is_fatal_error(&lower) {
137            return Self::Fatal;
138        }
139
140        // Resync errors - need to refresh state
141        if is_resync_error(&lower) {
142            return Self::Resync;
143        }
144
145        // Reconnect errors - need new connection
146        if is_reconnect_error(&lower) {
147            return Self::Reconnect;
148        }
149
150        // Default to retry for transient errors
151        Self::retry_default()
152    }
153}
154
155/// Checks if an error message indicates a fatal (non-recoverable) error.
156///
157/// Fatal errors include:
158/// - Authentication failures
159/// - Invalid API keys
160/// - Permission denied
161/// - Account issues
162fn is_fatal_error(error_msg: &str) -> bool {
163    const FATAL_PATTERNS: &[&str] = &[
164        "invalid api",
165        "api key",
166        "authentication",
167        "unauthorized",
168        "permission denied",
169        "forbidden",
170        "account",
171        "banned",
172        "ip banned",
173        "invalid signature",
174        "signature",
175    ];
176
177    FATAL_PATTERNS.iter().any(|p| error_msg.contains(p))
178}
179
180/// Checks if an error message indicates a resync is needed.
181///
182/// Resync errors include:
183/// - Sequence gaps in orderbook
184/// - Stale data
185/// - Out of sync
186fn is_resync_error(error_msg: &str) -> bool {
187    const RESYNC_PATTERNS: &[&str] = &[
188        "resync",
189        "sequence",
190        "out of sync",
191        "stale",
192        "gap",
193        "missing update",
194    ];
195
196    RESYNC_PATTERNS.iter().any(|p| error_msg.contains(p))
197}
198
199/// Checks if an error message indicates a reconnection is needed.
200///
201/// Reconnect errors include:
202/// - Connection closed
203/// - Connection reset
204/// - Server disconnect
205fn is_reconnect_error(error_msg: &str) -> bool {
206    const RECONNECT_PATTERNS: &[&str] = &[
207        "connection closed",
208        "connection reset",
209        "disconnected",
210        "eof",
211        "broken pipe",
212        "connection refused",
213        "server closed",
214    ];
215
216    RECONNECT_PATTERNS.iter().any(|p| error_msg.contains(p))
217}
218
219/// Health snapshot for WebSocket connection monitoring.
220///
221/// Provides metrics for monitoring connection health and performance.
222#[derive(Debug, Clone, Default)]
223pub struct WsHealthSnapshot {
224    /// Round-trip latency in milliseconds (from ping/pong)
225    pub latency_ms: Option<i64>,
226    /// Total number of messages received
227    pub messages_received: u64,
228    /// Number of messages dropped due to backpressure
229    pub messages_dropped: u64,
230    /// Timestamp of the last received message (milliseconds since epoch)
231    pub last_message_time: Option<i64>,
232    /// Connection uptime in milliseconds
233    pub connection_uptime_ms: u64,
234    /// Number of reconnection attempts
235    pub reconnect_count: u32,
236}
237
238impl WsHealthSnapshot {
239    /// Returns true if the connection appears healthy
240    pub fn is_healthy(&self) -> bool {
241        // Consider unhealthy if:
242        // - No messages received in last 60 seconds
243        // - Drop rate > 10%
244        if let Some(last_time) = self.last_message_time {
245            let now = chrono::Utc::now().timestamp_millis();
246            if now - last_time > 60_000 {
247                return false;
248            }
249        }
250
251        if self.messages_received > 0 {
252            let drop_rate = self.messages_dropped as f64 / self.messages_received as f64;
253            if drop_rate > 0.1 {
254                return false;
255            }
256        }
257
258        true
259    }
260}
261
262/// Internal statistics tracking for WebSocket connections.
263#[derive(Debug, Default)]
264pub struct WsStats {
265    /// Total messages received
266    pub messages_received: AtomicU64,
267    /// Total messages dropped
268    pub messages_dropped: AtomicU64,
269    /// Last message timestamp
270    pub last_message_time: AtomicU64,
271    /// Connection start time
272    pub connection_start_time: AtomicU64,
273    /// Reconnect count
274    pub reconnect_count: std::sync::atomic::AtomicU32,
275}
276
277/// Configuration for Binance WebSocket client.
278///
279/// Combines all configuration options for creating a BinanceWs instance.
280#[derive(Debug, Clone)]
281pub struct BinanceWsConfig {
282    /// WebSocket endpoint URL
283    pub url: String,
284    /// Channel capacity configuration
285    pub channel_config: WsChannelConfig,
286    /// Backpressure strategy for handling channel overflow
287    pub backpressure_strategy: BackpressureStrategy,
288    /// Shutdown timeout in milliseconds
289    pub shutdown_timeout_ms: u64,
290}
291
292impl BinanceWsConfig {
293    /// Creates a new configuration with the given URL
294    pub fn new(url: String) -> Self {
295        Self {
296            url,
297            channel_config: WsChannelConfig::default(),
298            backpressure_strategy: BackpressureStrategy::DropOldest,
299            shutdown_timeout_ms: 5000,
300        }
301    }
302
303    /// Sets the channel configuration
304    pub fn with_channel_config(mut self, config: WsChannelConfig) -> Self {
305        self.channel_config = config;
306        self
307    }
308
309    /// Sets the backpressure strategy
310    pub fn with_backpressure(mut self, strategy: BackpressureStrategy) -> Self {
311        self.backpressure_strategy = strategy;
312        self
313    }
314
315    /// Sets the shutdown timeout
316    pub fn with_shutdown_timeout(mut self, timeout_ms: u64) -> Self {
317        self.shutdown_timeout_ms = timeout_ms;
318        self
319    }
320}
321
322/// Shutdown state tracking
323#[derive(Debug, Default)]
324pub struct ShutdownState {
325    /// Whether shutdown has been initiated
326    pub is_shutting_down: AtomicBool,
327    /// Whether shutdown has completed
328    pub shutdown_complete: AtomicBool,
329}
330
331#[cfg(test)]
332mod tests {
333    use super::*;
334
335    #[test]
336    fn test_depth_level_values() {
337        assert_eq!(DepthLevel::L5.as_u32(), 5);
338        assert_eq!(DepthLevel::L10.as_u32(), 10);
339        assert_eq!(DepthLevel::L20.as_u32(), 20);
340    }
341
342    #[test]
343    fn test_update_speed_str() {
344        assert_eq!(UpdateSpeed::Ms100.as_str(), "100ms");
345        assert_eq!(UpdateSpeed::Ms1000.as_str(), "1000ms");
346    }
347
348    #[test]
349    fn test_update_speed_millis() {
350        assert_eq!(UpdateSpeed::Ms100.as_millis(), 100);
351        assert_eq!(UpdateSpeed::Ms1000.as_millis(), 1000);
352    }
353
354    #[test]
355    fn test_ws_error_recovery_is_recoverable() {
356        assert!(WsErrorRecovery::retry_default().is_recoverable());
357        assert!(WsErrorRecovery::Resync.is_recoverable());
358        assert!(WsErrorRecovery::Reconnect.is_recoverable());
359        assert!(!WsErrorRecovery::Fatal.is_recoverable());
360    }
361
362    #[test]
363    fn test_ws_channel_config_default() {
364        let config = WsChannelConfig::default();
365        assert_eq!(config.ticker_capacity, DEFAULT_TICKER_CAPACITY);
366        assert_eq!(config.orderbook_capacity, DEFAULT_ORDERBOOK_CAPACITY);
367        assert_eq!(config.trades_capacity, DEFAULT_TRADES_CAPACITY);
368        assert_eq!(config.user_data_capacity, DEFAULT_USER_DATA_CAPACITY);
369    }
370
371    #[test]
372    fn test_binance_ws_config_builder() {
373        let config = BinanceWsConfig::new("wss://test.com".to_string())
374            .with_backpressure(BackpressureStrategy::DropNewest)
375            .with_shutdown_timeout(10000);
376
377        assert_eq!(config.url, "wss://test.com");
378        assert_eq!(
379            config.backpressure_strategy,
380            BackpressureStrategy::DropNewest
381        );
382        assert_eq!(config.shutdown_timeout_ms, 10000);
383    }
384
385    #[test]
386    fn test_ws_error_recovery_fatal_errors() {
387        assert_eq!(
388            WsErrorRecovery::from_error_message("Invalid API key"),
389            WsErrorRecovery::Fatal
390        );
391        assert_eq!(
392            WsErrorRecovery::from_error_message("Authentication failed"),
393            WsErrorRecovery::Fatal
394        );
395        assert_eq!(
396            WsErrorRecovery::from_error_message("Permission denied"),
397            WsErrorRecovery::Fatal
398        );
399    }
400
401    #[test]
402    fn test_ws_error_recovery_resync_errors() {
403        assert_eq!(
404            WsErrorRecovery::from_error_message("RESYNC_NEEDED: sequence gap"),
405            WsErrorRecovery::Resync
406        );
407        assert_eq!(
408            WsErrorRecovery::from_error_message("Out of sync with server"),
409            WsErrorRecovery::Resync
410        );
411    }
412
413    #[test]
414    fn test_ws_error_recovery_reconnect_errors() {
415        assert_eq!(
416            WsErrorRecovery::from_error_message("Connection closed by server"),
417            WsErrorRecovery::Reconnect
418        );
419        assert_eq!(
420            WsErrorRecovery::from_error_message("Connection reset"),
421            WsErrorRecovery::Reconnect
422        );
423    }
424
425    #[test]
426    fn test_ws_error_recovery_transient_errors() {
427        // Unknown errors default to retry
428        let recovery = WsErrorRecovery::from_error_message("Network timeout");
429        assert!(matches!(recovery, WsErrorRecovery::Retry { .. }));
430    }
431}