use ccxt_core::ws_client::BackpressureStrategy;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::time::Duration;
// NOTE(review): the unit of these capacities (presumably buffered messages per channel)
// is not visible in this file — confirm at the point where the channels are created.

/// Default value for `WsChannelConfig::ticker_capacity`.
pub const DEFAULT_TICKER_CAPACITY: usize = 256;
/// Default value for `WsChannelConfig::orderbook_capacity`.
pub const DEFAULT_ORDERBOOK_CAPACITY: usize = 512;
/// Default value for `WsChannelConfig::trades_capacity`.
pub const DEFAULT_TRADES_CAPACITY: usize = 1024;
/// Default value for `WsChannelConfig::user_data_capacity`.
pub const DEFAULT_USER_DATA_CAPACITY: usize = 256;
/// Per-stream capacity settings for the WebSocket channels.
///
/// NOTE(review): how these numbers are consumed (presumably as bounded channel
/// sizes) is not visible in this file — confirm at the use sites.
#[derive(Debug, Clone)]
pub struct WsChannelConfig {
    /// Capacity used for ticker data.
    pub ticker_capacity: usize,
    /// Capacity used for order book data.
    pub orderbook_capacity: usize,
    /// Capacity used for trade data.
    pub trades_capacity: usize,
    /// Capacity used for user data.
    pub user_data_capacity: usize,
}

impl Default for WsChannelConfig {
    /// Builds a configuration in which every field takes its matching
    /// `DEFAULT_*_CAPACITY` constant.
    fn default() -> Self {
        let ticker_capacity = DEFAULT_TICKER_CAPACITY;
        let orderbook_capacity = DEFAULT_ORDERBOOK_CAPACITY;
        let trades_capacity = DEFAULT_TRADES_CAPACITY;
        let user_data_capacity = DEFAULT_USER_DATA_CAPACITY;

        Self {
            ticker_capacity,
            orderbook_capacity,
            trades_capacity,
            user_data_capacity,
        }
    }
}
/// Depth level selector whose discriminant equals its numeric level.
///
/// NOTE(review): presumably the number of order book price levels requested
/// from the exchange — confirm against the subscription code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum DepthLevel {
    /// Level 5.
    L5 = 5,
    /// Level 10.
    L10 = 10,
    /// Level 20; this is the `Default` variant.
    #[default]
    L20 = 20,
}

impl DepthLevel {
    /// Returns the numeric level (5, 10 or 20) for this variant.
    pub fn as_u32(self) -> u32 {
        // Spelled out per variant; each value equals the declared discriminant.
        match self {
            Self::L5 => 5,
            Self::L10 => 10,
            Self::L20 => 20,
        }
    }
}
/// Update interval selector with two supported speeds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum UpdateSpeed {
    /// 100 millisecond interval; this is the `Default` variant.
    #[default]
    Ms100,
    /// 1000 millisecond interval.
    Ms1000,
}

impl UpdateSpeed {
    /// Returns the textual form of the interval: `"100ms"` or `"1000ms"`.
    pub fn as_str(&self) -> &'static str {
        self.label_and_millis().0
    }

    /// Returns the interval length in milliseconds: `100` or `1000`.
    pub fn as_millis(&self) -> u64 {
        self.label_and_millis().1
    }

    /// Single lookup table pairing each variant's label with its millisecond value.
    fn label_and_millis(self) -> (&'static str, u64) {
        match self {
            Self::Ms100 => ("100ms", 100),
            Self::Ms1000 => ("1000ms", 1000),
        }
    }
}
/// Recovery action chosen in response to a WebSocket error.
///
/// NOTE(review): how each action is carried out is not visible in this file;
/// the variant docs below describe only what the names and fields show.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WsErrorRecovery {
    /// Retry, bounded by `max_attempts`, with `backoff` as the delay setting.
    Retry {
        /// Maximum number of attempts.
        max_attempts: u32,
        /// Backoff duration associated with the retries.
        backoff: Duration,
    },
    /// Resynchronize state.
    Resync,
    /// Re-establish the connection.
    Reconnect,
    /// No recovery; the only variant for which `is_recoverable` returns `false`.
    Fatal,
}

impl WsErrorRecovery {
    /// Returns the default retry policy: 3 attempts with a 1 second backoff.
    pub fn retry_default() -> Self {
        let max_attempts = 3;
        let backoff = Duration::from_secs(1);
        Self::Retry {
            max_attempts,
            backoff,
        }
    }

    /// Returns `true` for every variant except [`WsErrorRecovery::Fatal`].
    pub fn is_recoverable(&self) -> bool {
        match self {
            Self::Fatal => false,
            Self::Retry { .. } | Self::Resync | Self::Reconnect => true,
        }
    }

    /// Classifies an error message into a recovery action.
    ///
    /// The message is lowercased, then checked against the fatal, resync and
    /// reconnect pattern lists in that order; the first list that matches wins.
    /// A message matching none of them yields [`WsErrorRecovery::retry_default`].
    pub fn from_error_message(error_msg: &str) -> Self {
        // The pattern helpers compare against lowercase literals.
        let normalized = error_msg.to_lowercase();

        if is_fatal_error(&normalized) {
            Self::Fatal
        } else if is_resync_error(&normalized) {
            Self::Resync
        } else if is_reconnect_error(&normalized) {
            Self::Reconnect
        } else {
            Self::retry_default()
        }
    }
}
/// Reports whether `error_msg` contains any of the fatal-error substrings.
///
/// Matching is case-sensitive and every pattern is lowercase, so the caller is
/// expected to pass an already lowercased message.
fn is_fatal_error(error_msg: &str) -> bool {
    const FATAL_PATTERNS: &[&str] = &[
        "invalid api",
        "api key",
        "authentication",
        "unauthorized",
        "permission denied",
        "forbidden",
        "account",
        "banned",
        "ip banned",
        "invalid signature",
        "signature",
    ];

    for needle in FATAL_PATTERNS.iter().copied() {
        if error_msg.contains(needle) {
            return true;
        }
    }
    false
}
/// Reports whether `error_msg` contains any of the resync-related substrings.
///
/// Matching is case-sensitive and every pattern is lowercase, so the caller is
/// expected to pass an already lowercased message.
fn is_resync_error(error_msg: &str) -> bool {
    const RESYNC_PATTERNS: &[&str] = &[
        "resync",
        "sequence",
        "out of sync",
        "stale",
        "gap",
        "missing update",
    ];

    for needle in RESYNC_PATTERNS.iter().copied() {
        if error_msg.contains(needle) {
            return true;
        }
    }
    false
}
/// Reports whether `error_msg` contains any of the connection-loss substrings.
///
/// Matching is case-sensitive and every pattern is lowercase, so the caller is
/// expected to pass an already lowercased message.
fn is_reconnect_error(error_msg: &str) -> bool {
    const RECONNECT_PATTERNS: &[&str] = &[
        "connection closed",
        "connection reset",
        "disconnected",
        "eof",
        "broken pipe",
        "connection refused",
        "server closed",
    ];

    for needle in RECONNECT_PATTERNS.iter().copied() {
        if error_msg.contains(needle) {
            return true;
        }
    }
    false
}
/// Point-in-time view of WebSocket connection health counters.
#[derive(Debug, Clone, Default)]
pub struct WsHealthSnapshot {
    /// Latency in milliseconds, if known.
    pub latency_ms: Option<i64>,
    /// Number of messages received.
    pub messages_received: u64,
    /// Number of messages dropped.
    pub messages_dropped: u64,
    /// Time of the last message as Unix epoch milliseconds (it is compared
    /// against `chrono::Utc::now().timestamp_millis()`), if any.
    pub last_message_time: Option<i64>,
    /// Connection uptime in milliseconds.
    pub connection_uptime_ms: u64,
    /// Number of reconnects.
    pub reconnect_count: u32,
}

impl WsHealthSnapshot {
    /// Returns `false` when the connection looks unhealthy, `true` otherwise.
    ///
    /// Two checks are applied in order:
    /// 1. If `last_message_time` is set and lies more than 60 000 ms before the
    ///    current UTC time, the snapshot is unhealthy. With no timestamp this
    ///    check is skipped.
    /// 2. If at least one message was received and
    ///    `messages_dropped / messages_received` exceeds 0.1, the snapshot is
    ///    unhealthy.
    pub fn is_healthy(&self) -> bool {
        const MAX_SILENCE_MS: i64 = 60_000;
        const MAX_DROP_RATIO: f64 = 0.1;

        // The clock is only read when there is a timestamp to compare against.
        let gone_quiet = match self.last_message_time {
            Some(last_seen) => {
                chrono::Utc::now().timestamp_millis() - last_seen > MAX_SILENCE_MS
            }
            None => false,
        };
        if gone_quiet {
            return false;
        }

        // Guarding on a non-zero receive count avoids dividing by zero.
        let dropping_too_much = self.messages_received > 0
            && (self.messages_dropped as f64 / self.messages_received as f64) > MAX_DROP_RATIO;

        !dropping_too_much
    }
}
/// Atomic counters describing WebSocket activity.
///
/// Every field is an atomic integer, so values can be updated through a shared
/// reference. `Default` initializes all of them to zero.
#[derive(Debug, Default)]
pub struct WsStats {
    /// Count of messages received.
    pub messages_received: AtomicU64,
    /// Count of messages dropped.
    pub messages_dropped: AtomicU64,
    /// Time of the last message.
    /// NOTE(review): presumably Unix epoch milliseconds — confirm with the writer.
    pub last_message_time: AtomicU64,
    /// Time the connection started.
    /// NOTE(review): presumably Unix epoch milliseconds — confirm with the writer.
    pub connection_start_time: AtomicU64,
    /// Count of reconnects.
    pub reconnect_count: std::sync::atomic::AtomicU32,
}
/// Configuration for a Binance WebSocket connection.
#[derive(Debug, Clone)]
pub struct BinanceWsConfig {
    /// WebSocket endpoint URL.
    pub url: String,
    /// Per-stream channel capacities.
    pub channel_config: WsChannelConfig,
    /// Strategy applied under backpressure.
    pub backpressure_strategy: BackpressureStrategy,
    /// Shutdown timeout in milliseconds.
    pub shutdown_timeout_ms: u64,
}

impl BinanceWsConfig {
    /// Creates a configuration for `url` with default settings:
    /// `WsChannelConfig::default()`, `BackpressureStrategy::DropOldest` and a
    /// shutdown timeout of 5000 ms.
    pub fn new(url: String) -> Self {
        let channel_config = WsChannelConfig::default();
        let backpressure_strategy = BackpressureStrategy::DropOldest;
        let shutdown_timeout_ms = 5000;

        Self {
            url,
            channel_config,
            backpressure_strategy,
            shutdown_timeout_ms,
        }
    }

    /// Returns the configuration with `channel_config` replaced by `config`.
    pub fn with_channel_config(self, config: WsChannelConfig) -> Self {
        Self {
            channel_config: config,
            ..self
        }
    }

    /// Returns the configuration with `backpressure_strategy` replaced by `strategy`.
    pub fn with_backpressure(self, strategy: BackpressureStrategy) -> Self {
        Self {
            backpressure_strategy: strategy,
            ..self
        }
    }

    /// Returns the configuration with `shutdown_timeout_ms` replaced by `timeout_ms`.
    pub fn with_shutdown_timeout(self, timeout_ms: u64) -> Self {
        Self {
            shutdown_timeout_ms: timeout_ms,
            ..self
        }
    }
}
/// Pair of atomic flags tracking shutdown progress.
///
/// `Default` initializes both flags to `false`.
/// NOTE(review): who sets each flag, and in what order, is not visible in this
/// file — confirm against the shutdown code.
#[derive(Debug, Default)]
pub struct ShutdownState {
    /// Flag indicating that shutdown is in progress.
    pub is_shutting_down: AtomicBool,
    /// Flag indicating that shutdown has completed.
    pub shutdown_complete: AtomicBool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each depth variant converts to its numeric level.
    #[test]
    fn test_depth_level_values() {
        let cases = [
            (DepthLevel::L5, 5),
            (DepthLevel::L10, 10),
            (DepthLevel::L20, 20),
        ];
        for (level, expected) in cases {
            assert_eq!(level.as_u32(), expected);
        }
    }

    /// Each update speed has the expected textual form.
    #[test]
    fn test_update_speed_str() {
        let cases = [
            (UpdateSpeed::Ms100, "100ms"),
            (UpdateSpeed::Ms1000, "1000ms"),
        ];
        for (speed, expected) in cases {
            assert_eq!(speed.as_str(), expected);
        }
    }

    /// Each update speed has the expected millisecond value.
    #[test]
    fn test_update_speed_millis() {
        let cases = [(UpdateSpeed::Ms100, 100), (UpdateSpeed::Ms1000, 1000)];
        for (speed, expected) in cases {
            assert_eq!(speed.as_millis(), expected);
        }
    }

    /// Only `Fatal` is reported as unrecoverable.
    #[test]
    fn test_ws_error_recovery_is_recoverable() {
        let recoverable = [
            WsErrorRecovery::retry_default(),
            WsErrorRecovery::Resync,
            WsErrorRecovery::Reconnect,
        ];
        for recovery in &recoverable {
            assert!(recovery.is_recoverable());
        }
        assert!(!WsErrorRecovery::Fatal.is_recoverable());
    }

    /// The default channel config mirrors the `DEFAULT_*_CAPACITY` constants.
    #[test]
    fn test_ws_channel_config_default() {
        let WsChannelConfig {
            ticker_capacity,
            orderbook_capacity,
            trades_capacity,
            user_data_capacity,
        } = WsChannelConfig::default();

        assert_eq!(ticker_capacity, DEFAULT_TICKER_CAPACITY);
        assert_eq!(orderbook_capacity, DEFAULT_ORDERBOOK_CAPACITY);
        assert_eq!(trades_capacity, DEFAULT_TRADES_CAPACITY);
        assert_eq!(user_data_capacity, DEFAULT_USER_DATA_CAPACITY);
    }

    /// Builder methods override exactly the fields they name.
    #[test]
    fn test_binance_ws_config_builder() {
        let base = BinanceWsConfig::new("wss://test.com".to_string());
        let config = base
            .with_backpressure(BackpressureStrategy::DropNewest)
            .with_shutdown_timeout(10000);

        assert_eq!(config.url, "wss://test.com");
        assert_eq!(
            config.backpressure_strategy,
            BackpressureStrategy::DropNewest
        );
        assert_eq!(config.shutdown_timeout_ms, 10000);
    }

    /// Credential and permission failures classify as `Fatal`.
    #[test]
    fn test_ws_error_recovery_fatal_errors() {
        let messages = [
            "Invalid API key",
            "Authentication failed",
            "Permission denied",
        ];
        for message in messages {
            assert_eq!(
                WsErrorRecovery::from_error_message(message),
                WsErrorRecovery::Fatal
            );
        }
    }

    /// Sequence and sync problems classify as `Resync`.
    #[test]
    fn test_ws_error_recovery_resync_errors() {
        let messages = ["RESYNC_NEEDED: sequence gap", "Out of sync with server"];
        for message in messages {
            assert_eq!(
                WsErrorRecovery::from_error_message(message),
                WsErrorRecovery::Resync
            );
        }
    }

    /// Connection-loss messages classify as `Reconnect`.
    #[test]
    fn test_ws_error_recovery_reconnect_errors() {
        let messages = ["Connection closed by server", "Connection reset"];
        for message in messages {
            assert_eq!(
                WsErrorRecovery::from_error_message(message),
                WsErrorRecovery::Reconnect
            );
        }
    }

    /// Messages matching no pattern fall back to a retry policy.
    #[test]
    fn test_ws_error_recovery_transient_errors() {
        let outcome = WsErrorRecovery::from_error_message("Network timeout");
        assert!(matches!(outcome, WsErrorRecovery::Retry { .. }));
    }
}