ccxt_exchanges/binance/
ws.rs

1//! Binance WebSocket implementation
2//!
3//! Provides WebSocket real-time data stream subscriptions for the Binance exchange
4
5use crate::binance::Binance;
6use crate::binance::parser;
7use ccxt_core::error::{Error, Result};
8use ccxt_core::types::financial::{Amount, Cost, Price};
9use ccxt_core::types::{
10    Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
11};
12use ccxt_core::ws_client::{WsClient, WsConfig};
13use serde_json::Value;
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio_tungstenite::tungstenite::protocol::Message;
20
21/// Binance WebSocket endpoints
22#[allow(dead_code)]
23const WS_BASE_URL: &str = "wss://stream.binance.com:9443/ws";
24#[allow(dead_code)]
25const WS_TESTNET_URL: &str = "wss://testnet.binance.vision/ws";
26
27/// Listen key refresh interval (30 minutes)
28const LISTEN_KEY_REFRESH_INTERVAL: Duration = Duration::from_secs(30 * 60);
29
30/// Listen key manager
31///
32/// Automatically manages Binance user data stream listen keys by:
33/// - Creating and caching listen keys
34/// - Refreshing them every 30 minutes
35/// - Detecting expiration and rebuilding
36/// - Tracking connection state
37pub struct ListenKeyManager {
38    /// Reference to the Binance instance
39    binance: Arc<Binance>,
40    /// Currently active listen key
41    listen_key: Arc<RwLock<Option<String>>>,
42    /// Listen key creation timestamp
43    created_at: Arc<RwLock<Option<Instant>>>,
44    /// Configured refresh interval
45    refresh_interval: Duration,
46    /// Handle to the auto-refresh task
47    refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
48}
49
50impl ListenKeyManager {
51    /// Creates a new `ListenKeyManager`
52    ///
53    /// # Arguments
54    /// * `binance` - Reference-counted Binance instance
55    ///
56    /// # Returns
57    /// Configured `ListenKeyManager`
58    pub fn new(binance: Arc<Binance>) -> Self {
59        Self {
60            binance,
61            listen_key: Arc::new(RwLock::new(None)),
62            created_at: Arc::new(RwLock::new(None)),
63            refresh_interval: LISTEN_KEY_REFRESH_INTERVAL,
64            refresh_task: Arc::new(Mutex::new(None)),
65        }
66    }
67
68    /// Retrieves or creates a listen key
69    ///
70    /// Returns an existing valid listen key when present; otherwise creates a new one.
71    ///
72    /// # Returns
73    /// Listen key string
74    ///
75    /// # Errors
76    /// - Failed to create the listen key
77    /// - Missing API credentials
78    pub async fn get_or_create(&self) -> Result<String> {
79        // Check if a listen key already exists
80        let key_opt = self.listen_key.read().await.clone();
81
82        if let Some(key) = key_opt {
83            // Check whether the key needs to be refreshed
84            let created = self.created_at.read().await;
85            if let Some(created_time) = *created {
86                let elapsed = created_time.elapsed();
87                // If more than 50 minutes have elapsed, create a new key
88                if elapsed > Duration::from_secs(50 * 60) {
89                    drop(created);
90                    return self.create_new().await;
91                }
92            }
93            return Ok(key);
94        }
95
96        // Create a new listen key
97        self.create_new().await
98    }
99
100    /// Creates a new listen key
101    ///
102    /// # Returns
103    /// Newly created listen key
104    async fn create_new(&self) -> Result<String> {
105        let key = self.binance.create_listen_key().await?;
106
107        // Update cache
108        *self.listen_key.write().await = Some(key.clone());
109        *self.created_at.write().await = Some(Instant::now());
110
111        Ok(key)
112    }
113
114    /// Refreshes the current listen key
115    ///
116    /// Extends the listen key lifetime by 60 minutes
117    ///
118    /// # Returns
119    /// Refresh result
120    pub async fn refresh(&self) -> Result<()> {
121        let key_opt = self.listen_key.read().await.clone();
122
123        if let Some(key) = key_opt {
124            self.binance.refresh_listen_key(&key).await?;
125            // Update the creation timestamp
126            *self.created_at.write().await = Some(Instant::now());
127            Ok(())
128        } else {
129            Err(Error::invalid_request("No listen key to refresh"))
130        }
131    }
132
133    /// Starts the auto-refresh task
134    ///
135    /// Refreshes the listen key every 30 minutes
136    pub async fn start_auto_refresh(&self) {
137        // Stop any existing task
138        self.stop_auto_refresh().await;
139
140        let listen_key = self.listen_key.clone();
141        let created_at = self.created_at.clone();
142        let binance = self.binance.clone();
143        let interval = self.refresh_interval;
144
145        let handle = tokio::spawn(async move {
146            loop {
147                tokio::time::sleep(interval).await;
148
149                // Check whether a listen key exists
150                let key_opt = listen_key.read().await.clone();
151                if let Some(key) = key_opt {
152                    // Attempt to refresh the key
153                    match binance.refresh_listen_key(&key).await {
154                        Ok(()) => {
155                            *created_at.write().await = Some(Instant::now());
156                            // Listen key refreshed successfully
157                        }
158                        Err(_e) => {
159                            // Failed to refresh; clear cache so a new key will be created next time
160                            *listen_key.write().await = None;
161                            *created_at.write().await = None;
162                            break;
163                        }
164                    }
165                } else {
166                    // No listen key available; stop the task
167                    break;
168                }
169            }
170        });
171
172        *self.refresh_task.lock().await = Some(handle);
173    }
174
175    /// Stops the auto-refresh task
176    pub async fn stop_auto_refresh(&self) {
177        let mut task_opt = self.refresh_task.lock().await;
178        if let Some(handle) = task_opt.take() {
179            handle.abort();
180        }
181    }
182
183    /// Deletes the listen key
184    ///
185    /// Closes the user data stream and invalidates the key
186    ///
187    /// # Returns
188    /// Result of the deletion
189    pub async fn delete(&self) -> Result<()> {
190        // Stop the auto-refresh first
191        self.stop_auto_refresh().await;
192
193        let key_opt = self.listen_key.read().await.clone();
194
195        if let Some(key) = key_opt {
196            self.binance.delete_listen_key(&key).await?;
197
198            // Clear cached state
199            *self.listen_key.write().await = None;
200            *self.created_at.write().await = None;
201
202            Ok(())
203        } else {
204            Ok(()) // No listen key; treat as success
205        }
206    }
207
208    /// Returns the current listen key when available
209    pub async fn get_current(&self) -> Option<String> {
210        self.listen_key.read().await.clone()
211    }
212
213    /// Checks whether the listen key is still valid
214    pub async fn is_valid(&self) -> bool {
215        let key_opt = self.listen_key.read().await;
216        if key_opt.is_none() {
217            return false;
218        }
219
220        let created = self.created_at.read().await;
221        if let Some(created_time) = *created {
222            // Key is considered valid if less than 55 minutes old
223            created_time.elapsed() < Duration::from_secs(55 * 60)
224        } else {
225            false
226        }
227    }
228}
229
230impl Drop for ListenKeyManager {
231    fn drop(&mut self) {
232        // Note: Drop is synchronous, so we cannot await asynchronous operations here.
233        // Callers should explicitly invoke `delete()` to release resources.
234    }
235}
236/// Subscription types supported by the Binance WebSocket API.
237#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238pub enum SubscriptionType {
239    /// 24-hour ticker stream
240    Ticker,
241    /// Order book depth stream
242    OrderBook,
243    /// Real-time trade stream
244    Trades,
245    /// Kline (candlestick) stream with interval (e.g. "1m", "5m", "1h")
246    Kline(String),
247    /// Account balance stream
248    Balance,
249    /// Order update stream
250    Orders,
251    /// Position update stream
252    Positions,
253    /// Personal trade execution stream
254    MyTrades,
255    /// Mark price stream
256    MarkPrice,
257    /// Book ticker (best bid/ask) stream
258    BookTicker,
259}
260
261impl SubscriptionType {
262    /// Infers a subscription type from a stream name
263    ///
264    /// # Arguments
265    /// * `stream` - Stream identifier (e.g. "btcusdt@ticker")
266    ///
267    /// # Returns
268    /// Subscription type, when the stream can be identified
269    pub fn from_stream(stream: &str) -> Option<Self> {
270        if stream.contains("@ticker") {
271            Some(Self::Ticker)
272        } else if stream.contains("@depth") {
273            Some(Self::OrderBook)
274        } else if stream.contains("@trade") || stream.contains("@aggTrade") {
275            Some(Self::Trades)
276        } else if stream.contains("@kline_") {
277            // Extract timeframe suffix
278            let parts: Vec<&str> = stream.split("@kline_").collect();
279            if parts.len() == 2 {
280                Some(Self::Kline(parts[1].to_string()))
281            } else {
282                None
283            }
284        } else if stream.contains("@markPrice") {
285            Some(Self::MarkPrice)
286        } else if stream.contains("@bookTicker") {
287            Some(Self::BookTicker)
288        } else {
289            None
290        }
291    }
292}
293
294/// Subscription metadata
295#[derive(Clone)]
296pub struct Subscription {
297    /// Stream name (e.g. "btcusdt@ticker")
298    pub stream: String,
299    /// Normalized trading symbol (e.g. "BTCUSDT")
300    pub symbol: String,
301    /// Subscription type descriptor
302    pub sub_type: SubscriptionType,
303    /// Timestamp when the subscription was created
304    pub subscribed_at: Instant,
305    /// Sender for forwarding WebSocket messages to consumers
306    pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
307}
308
309impl Subscription {
310    /// Creates a new subscription with the provided parameters
311    pub fn new(
312        stream: String,
313        symbol: String,
314        sub_type: SubscriptionType,
315        sender: tokio::sync::mpsc::UnboundedSender<Value>,
316    ) -> Self {
317        Self {
318            stream,
319            symbol,
320            sub_type,
321            subscribed_at: Instant::now(),
322            sender,
323        }
324    }
325
326    /// Sends a message to the subscriber
327    ///
328    /// # Arguments
329    /// * `message` - WebSocket payload to forward
330    ///
331    /// # Returns
332    /// Whether the message was delivered successfully
333    pub fn send(&self, message: Value) -> bool {
334        self.sender.send(message).is_ok()
335    }
336}
337
338/// Subscription manager
339///
340/// Manages the lifecycle of all WebSocket subscriptions, including:
341/// - Adding and removing subscriptions
342/// - Querying and validating subscriptions
343/// - Tracking the number of active subscriptions
344pub struct SubscriptionManager {
345    /// Mapping of `stream_name -> Subscription`
346    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
347    /// Index by symbol: `symbol -> Vec<stream_name>`
348    symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
349    /// Counter of active subscriptions
350    active_count: Arc<std::sync::atomic::AtomicUsize>,
351}
352
353impl SubscriptionManager {
354    /// Creates a new `SubscriptionManager`
355    pub fn new() -> Self {
356        Self {
357            subscriptions: Arc::new(RwLock::new(HashMap::new())),
358            symbol_index: Arc::new(RwLock::new(HashMap::new())),
359            active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
360        }
361    }
362
363    /// Adds a subscription to the manager
364    ///
365    /// # Arguments
366    /// * `stream` - Stream identifier (e.g. "btcusdt@ticker")
367    /// * `symbol` - Normalized trading symbol (e.g. "BTCUSDT")
368    /// * `sub_type` - Subscription classification
369    /// * `sender` - Channel for dispatching messages
370    ///
371    /// # Returns
372    /// Result of the insertion
373    pub async fn add_subscription(
374        &self,
375        stream: String,
376        symbol: String,
377        sub_type: SubscriptionType,
378        sender: tokio::sync::mpsc::UnboundedSender<Value>,
379    ) -> Result<()> {
380        let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
381
382        // Insert into the subscription map
383        let mut subs = self.subscriptions.write().await;
384        subs.insert(stream.clone(), subscription);
385
386        // Update the per-symbol index
387        let mut index = self.symbol_index.write().await;
388        index.entry(symbol).or_insert_with(Vec::new).push(stream);
389
390        // Increment the active subscription count
391        self.active_count
392            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
393
394        Ok(())
395    }
396
397    /// Removes a subscription by stream name
398    ///
399    /// # Arguments
400    /// * `stream` - Stream identifier to remove
401    ///
402    /// # Returns
403    /// Result of the removal
404    pub async fn remove_subscription(&self, stream: &str) -> Result<()> {
405        let mut subs = self.subscriptions.write().await;
406
407        if let Some(subscription) = subs.remove(stream) {
408            // Remove from the symbol index as well
409            let mut index = self.symbol_index.write().await;
410            if let Some(streams) = index.get_mut(&subscription.symbol) {
411                streams.retain(|s| s != stream);
412                // Drop the symbol entry when no subscriptions remain
413                if streams.is_empty() {
414                    index.remove(&subscription.symbol);
415                }
416            }
417
418            // Decrement the active subscription counter
419            self.active_count
420                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
421        }
422
423        Ok(())
424    }
425
426    /// Retrieves a subscription by stream name
427    ///
428    /// # Arguments
429    /// * `stream` - Stream identifier
430    ///
431    /// # Returns
432    /// Optional subscription record
433    pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
434        let subs = self.subscriptions.read().await;
435        subs.get(stream).cloned()
436    }
437
438    /// Checks whether a subscription exists for the given stream
439    ///
440    /// # Arguments
441    /// * `stream` - Stream identifier
442    ///
443    /// # Returns
444    /// `true` if the subscription exists, otherwise `false`
445    pub async fn has_subscription(&self, stream: &str) -> bool {
446        let subs = self.subscriptions.read().await;
447        subs.contains_key(stream)
448    }
449
450    /// Returns all registered subscriptions
451    pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
452        let subs = self.subscriptions.read().await;
453        subs.values().cloned().collect()
454    }
455
456    /// Returns all subscriptions associated with a symbol
457    ///
458    /// # Arguments
459    /// * `symbol` - Trading symbol
460    pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
461        let index = self.symbol_index.read().await;
462        let subs = self.subscriptions.read().await;
463
464        if let Some(streams) = index.get(symbol) {
465            streams
466                .iter()
467                .filter_map(|stream| subs.get(stream).cloned())
468                .collect()
469        } else {
470            Vec::new()
471        }
472    }
473
474    /// Returns the number of active subscriptions
475    pub fn active_count(&self) -> usize {
476        self.active_count.load(std::sync::atomic::Ordering::SeqCst)
477    }
478
479    /// Removes all subscriptions and clears indexes
480    pub async fn clear(&self) {
481        let mut subs = self.subscriptions.write().await;
482        let mut index = self.symbol_index.write().await;
483
484        subs.clear();
485        index.clear();
486        self.active_count
487            .store(0, std::sync::atomic::Ordering::SeqCst);
488    }
489
490    /// Sends a message to subscribers of a specific stream
491    ///
492    /// # Arguments
493    /// * `stream` - Stream identifier
494    /// * `message` - WebSocket payload to broadcast
495    ///
496    /// # Returns
497    /// `true` when the message was delivered, otherwise `false`
498    pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
499        let subs = self.subscriptions.read().await;
500        if let Some(subscription) = subs.get(stream) {
501            subscription.send(message)
502        } else {
503            false
504        }
505    }
506
507    /// Sends a message to all subscribers of a symbol
508    ///
509    /// # Arguments
510    /// * `symbol` - Trading symbol
511    /// * `message` - Shared WebSocket payload
512    ///
513    /// # Returns
514    /// Number of subscribers that received the message
515    pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
516        let index = self.symbol_index.read().await;
517        let subs = self.subscriptions.read().await;
518
519        let mut sent_count = 0;
520
521        if let Some(streams) = index.get(symbol) {
522            for stream in streams {
523                if let Some(subscription) = subs.get(stream) {
524                    if subscription.send(message.clone()) {
525                        sent_count += 1;
526                    }
527                }
528            }
529        }
530
531        sent_count
532    }
533}
534/// Reconnect configuration
535///
536/// Defines the automatic reconnection strategy after a WebSocket disconnect
537#[derive(Debug, Clone)]
538pub struct ReconnectConfig {
539    /// Enables or disables automatic reconnection
540    pub enabled: bool,
541
542    /// Initial reconnection delay in milliseconds
543    pub initial_delay_ms: u64,
544
545    /// Maximum reconnection delay in milliseconds
546    pub max_delay_ms: u64,
547
548    /// Exponential backoff multiplier
549    pub backoff_multiplier: f64,
550
551    /// Maximum number of reconnection attempts (0 means unlimited)
552    pub max_attempts: usize,
553}
554
555impl Default for ReconnectConfig {
556    fn default() -> Self {
557        Self {
558            enabled: true,
559            initial_delay_ms: 1000,  // 1 second
560            max_delay_ms: 30000,     // 30 seconds
561            backoff_multiplier: 2.0, // Exponential backoff
562            max_attempts: 0,         // Unlimited retries
563        }
564    }
565}
566
567impl ReconnectConfig {
568    /// Calculates the reconnection delay
569    ///
570    /// Uses exponential backoff to determine the delay duration
571    ///
572    /// # Arguments
573    /// * `attempt` - Current reconnection attempt (zero-based)
574    ///
575    /// # Returns
576    /// Delay duration in milliseconds
577    pub fn calculate_delay(&self, attempt: usize) -> u64 {
578        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(attempt as i32);
579        delay.min(self.max_delay_ms as f64) as u64
580    }
581
582    /// Determines whether another reconnection attempt should be made
583    ///
584    /// # Arguments
585    /// * `attempt` - Current reconnection attempt
586    ///
587    /// # Returns
588    /// `true` if another retry should be attempted
589    pub fn should_retry(&self, attempt: usize) -> bool {
590        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
591    }
592}
593
594/// Message router
595///
596/// Handles reception, parsing, and dispatching of WebSocket messages. Core responsibilities:
597/// - Managing WebSocket connections
598/// - Receiving and routing messages
599/// - Coordinating automatic reconnection
600/// - Managing subscriptions
601pub struct MessageRouter {
602    /// WebSocket client instance
603    ws_client: Arc<RwLock<Option<WsClient>>>,
604
605    /// Subscription manager registry
606    subscription_manager: Arc<SubscriptionManager>,
607
608    /// Handle to the background routing task
609    router_task: Arc<Mutex<Option<JoinHandle<()>>>>,
610
611    /// Connection state flag
612    is_connected: Arc<std::sync::atomic::AtomicBool>,
613
614    /// Configuration for reconnection behavior
615    reconnect_config: Arc<RwLock<ReconnectConfig>>,
616
617    /// WebSocket endpoint URL
618    ws_url: String,
619
620    /// Request ID counter (used for subscribe/unsubscribe)
621    request_id: Arc<std::sync::atomic::AtomicU64>,
622}
623
624impl MessageRouter {
625    /// Creates a new message router
626    ///
627    /// # Arguments
628    /// * `ws_url` - WebSocket connection URL
629    /// * `subscription_manager` - Subscription manager handle
630    ///
631    /// # Returns
632    /// Configured router instance
633    pub fn new(ws_url: String, subscription_manager: Arc<SubscriptionManager>) -> Self {
634        Self {
635            ws_client: Arc::new(RwLock::new(None)),
636            subscription_manager,
637            router_task: Arc::new(Mutex::new(None)),
638            is_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
639            reconnect_config: Arc::new(RwLock::new(ReconnectConfig::default())),
640            ws_url,
641            request_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
642        }
643    }
644
645    /// Starts the message router
646    ///
647    /// Establishes the WebSocket connection and launches the message loop
648    ///
649    /// # Returns
650    /// Result of the startup sequence
651    pub async fn start(&self) -> Result<()> {
652        // Stop any running instance before starting again
653        if self.is_connected() {
654            self.stop().await?;
655        }
656
657        // Establish a new WebSocket connection
658        let config = WsConfig {
659            url: self.ws_url.clone(),
660            ..Default::default()
661        };
662        let client = WsClient::new(config);
663        client.connect().await?;
664
665        // Store the client handle
666        *self.ws_client.write().await = Some(client);
667
668        // Update connection state
669        self.is_connected
670            .store(true, std::sync::atomic::Ordering::SeqCst);
671
672        // Spawn the message processing loop
673        let ws_client = self.ws_client.clone();
674        let subscription_manager = self.subscription_manager.clone();
675        let is_connected = self.is_connected.clone();
676        let reconnect_config = self.reconnect_config.clone();
677        let ws_url = self.ws_url.clone();
678
679        let handle = tokio::spawn(async move {
680            Self::message_loop(
681                ws_client,
682                subscription_manager,
683                is_connected,
684                reconnect_config,
685                ws_url,
686            )
687            .await;
688        });
689
690        *self.router_task.lock().await = Some(handle);
691
692        Ok(())
693    }
694
695    /// Stops the message router
696    ///
697    /// Terminates the message loop and disconnects the WebSocket client
698    ///
699    /// # Returns
700    /// Result of the shutdown procedure
701    pub async fn stop(&self) -> Result<()> {
702        // Mark the connection as inactive
703        self.is_connected
704            .store(false, std::sync::atomic::Ordering::SeqCst);
705
706        // Cancel the background routing task
707        let mut task_opt = self.router_task.lock().await;
708        if let Some(handle) = task_opt.take() {
709            handle.abort();
710        }
711
712        // Disconnect the WebSocket client if present
713        let mut client_opt = self.ws_client.write().await;
714        if let Some(client) = client_opt.take() {
715            let _ = client.disconnect().await;
716        }
717
718        Ok(())
719    }
720
721    /// Restarts the message router
722    ///
723    /// Stops the current connection and restarts it, typically used during reconnect scenarios
724    ///
725    /// # Returns
726    /// Result of the restart sequence
727    pub async fn restart(&self) -> Result<()> {
728        self.stop().await?;
729        tokio::time::sleep(Duration::from_millis(100)).await;
730        self.start().await
731    }
732
733    /// Returns the current connection state
734    pub fn is_connected(&self) -> bool {
735        self.is_connected.load(std::sync::atomic::Ordering::SeqCst)
736    }
737
738    /// Applies a new reconnection configuration
739    ///
740    /// # Arguments
741    /// * `config` - Updated reconnection configuration
742    pub async fn set_reconnect_config(&self, config: ReconnectConfig) {
743        *self.reconnect_config.write().await = config;
744    }
745
746    /// Retrieves the current reconnection configuration
747    pub async fn get_reconnect_config(&self) -> ReconnectConfig {
748        self.reconnect_config.read().await.clone()
749    }
750
751    /// Subscribes to the provided streams
752    ///
753    /// Sends a subscription request to Binance
754    ///
755    /// # Arguments
756    /// * `streams` - Collection of stream identifiers
757    pub async fn subscribe(&self, streams: Vec<String>) -> Result<()> {
758        if streams.is_empty() {
759            return Ok(());
760        }
761
762        let client_opt = self.ws_client.read().await;
763        let client = client_opt
764            .as_ref()
765            .ok_or_else(|| Error::network("WebSocket not connected"))?;
766
767        // Generate a request identifier
768        let id = self
769            .request_id
770            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
771
772        // Build the subscribe payload
773        #[allow(clippy::disallowed_methods)]
774        let request = serde_json::json!({
775            "method": "SUBSCRIBE",
776            "params": streams,
777            "id": id
778        });
779
780        // Send the request to Binance
781        client
782            .send(Message::Text(request.to_string().into()))
783            .await?;
784
785        Ok(())
786    }
787
788    /// Unsubscribes from the provided streams
789    ///
790    /// Sends an unsubscribe request to Binance
791    ///
792    /// # Arguments
793    /// * `streams` - Collection of stream identifiers
794    pub async fn unsubscribe(&self, streams: Vec<String>) -> Result<()> {
795        if streams.is_empty() {
796            return Ok(());
797        }
798
799        let client_opt = self.ws_client.read().await;
800        let client = client_opt
801            .as_ref()
802            .ok_or_else(|| Error::network("WebSocket not connected"))?;
803
804        // Generate a request identifier
805        let id = self
806            .request_id
807            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
808
809        // Build the unsubscribe payload
810        #[allow(clippy::disallowed_methods)]
811        let request = serde_json::json!({
812            "method": "UNSUBSCRIBE",
813            "params": streams,
814            "id": id
815        });
816
817        // Send the request to Binance
818        client
819            .send(Message::Text(request.to_string().into()))
820            .await?;
821
822        Ok(())
823    }
824
825    /// Message reception loop
826    ///
827    /// Continuously receives WebSocket messages and routes them to subscribers
828    async fn message_loop(
829        ws_client: Arc<RwLock<Option<WsClient>>>,
830        subscription_manager: Arc<SubscriptionManager>,
831        is_connected: Arc<std::sync::atomic::AtomicBool>,
832        reconnect_config: Arc<RwLock<ReconnectConfig>>,
833        ws_url: String,
834    ) {
835        let mut reconnect_attempt = 0;
836
837        loop {
838            // Exit if instructed to stop
839            if !is_connected.load(std::sync::atomic::Ordering::SeqCst) {
840                break;
841            }
842
843            // Confirm that a client instance exists
844            let has_client = ws_client.read().await.is_some();
845
846            if !has_client {
847                // Attempt to reconnect when no client is present
848                let config = reconnect_config.read().await;
849                if config.should_retry(reconnect_attempt) {
850                    let delay = config.calculate_delay(reconnect_attempt);
851                    drop(config);
852
853                    tokio::time::sleep(Duration::from_millis(delay)).await;
854
855                    if let Ok(()) = Self::reconnect(&ws_url, ws_client.clone()).await {
856                        reconnect_attempt = 0; // Reset counter on success
857                        continue;
858                    }
859                    reconnect_attempt += 1;
860                    continue;
861                }
862                // Stop attempting to reconnect
863                is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
864                break;
865            }
866
867            // Receive the next message (requires re-acquiring the lock)
868            let message_opt = {
869                let guard = ws_client.read().await;
870                if let Some(client) = guard.as_ref() {
871                    client.receive().await
872                } else {
873                    None
874                }
875            };
876
877            if let Some(value) = message_opt {
878                // Handle the message; ignore failures but continue the loop
879                if let Err(_e) = Self::handle_message(value, subscription_manager.clone()).await {
880                    continue;
881                }
882
883                // Reset the reconnection counter because the connection is healthy
884                reconnect_attempt = 0;
885            } else {
886                // Connection failure; attempt to reconnect
887                let config = reconnect_config.read().await;
888                if config.should_retry(reconnect_attempt) {
889                    let delay = config.calculate_delay(reconnect_attempt);
890                    drop(config);
891
892                    tokio::time::sleep(Duration::from_millis(delay)).await;
893
894                    if let Ok(()) = Self::reconnect(&ws_url, ws_client.clone()).await {
895                        reconnect_attempt = 0;
896                        continue;
897                    }
898                    reconnect_attempt += 1;
899                    continue;
900                }
901                // Reconnection not permitted anymore; stop the loop
902                is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
903                break;
904            }
905        }
906    }
907
908    /// Processes a WebSocket message
909    ///
910    /// Parses the message and routes it to relevant subscribers
911    async fn handle_message(
912        message: Value,
913        subscription_manager: Arc<SubscriptionManager>,
914    ) -> Result<()> {
915        // Determine the stream name for routing
916        let stream_name = Self::extract_stream_name(&message)?;
917
918        // Forward the message to the corresponding subscribers
919        let sent = subscription_manager
920            .send_to_stream(&stream_name, message)
921            .await;
922
923        if sent {
924            Ok(())
925        } else {
926            Err(Error::generic("No subscribers for stream"))
927        }
928    }
929
930    /// Extracts the stream name from an incoming message
931    ///
932    /// Supports two formats:
933    /// 1. Combined stream: `{"stream":"btcusdt@ticker","data":{...}}`
934    /// 2. Single stream: `{"e":"24hrTicker","s":"BTCUSDT",...}`
935    ///
936    /// # Arguments
937    /// * `message` - Raw WebSocket payload
938    ///
939    /// # Returns
940    /// Stream identifier for routing
941    fn extract_stream_name(message: &Value) -> Result<String> {
942        // Attempt to read the combined-stream format
943        if let Some(stream) = message.get("stream").and_then(|s| s.as_str()) {
944            return Ok(stream.to_string());
945        }
946
947        // Fall back to single-stream format: event@symbol (lowercase)
948        if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
949            if let Some(symbol) = message.get("s").and_then(|s| s.as_str()) {
950                // Build the stream identifier
951                let stream = match event_type {
952                    "24hrTicker" => format!("{}@ticker", symbol.to_lowercase()),
953                    "depthUpdate" => format!("{}@depth", symbol.to_lowercase()),
954                    "aggTrade" => format!("{}@aggTrade", symbol.to_lowercase()),
955                    "trade" => format!("{}@trade", symbol.to_lowercase()),
956                    "kline" => {
957                        // Kline messages carry the interval within the "k" object
958                        if let Some(kline) = message.get("k") {
959                            if let Some(interval) = kline.get("i").and_then(|i| i.as_str()) {
960                                format!("{}@kline_{}", symbol.to_lowercase(), interval)
961                            } else {
962                                return Err(Error::generic("Missing kline interval"));
963                            }
964                        } else {
965                            return Err(Error::generic("Missing kline data"));
966                        }
967                    }
968                    "markPriceUpdate" => format!("{}@markPrice", symbol.to_lowercase()),
969                    "bookTicker" => format!("{}@bookTicker", symbol.to_lowercase()),
970                    _ => {
971                        return Err(Error::generic(format!(
972                            "Unknown event type: {}",
973                            event_type
974                        )));
975                    }
976                };
977                return Ok(stream);
978            }
979        }
980
981        // Ignore subscription acknowledgements and error responses
982        if message.get("result").is_some() || message.get("error").is_some() {
983            return Err(Error::generic("Subscription response, skip routing"));
984        }
985
986        Err(Error::generic("Cannot extract stream name from message"))
987    }
988
989    /// Reconnects the WebSocket client
990    ///
991    /// Closes the existing connection and establishes a new one
992    async fn reconnect(ws_url: &str, ws_client: Arc<RwLock<Option<WsClient>>>) -> Result<()> {
993        // Close the previous connection
994        {
995            let mut client_opt = ws_client.write().await;
996            if let Some(client) = client_opt.take() {
997                let _ = client.disconnect().await;
998            }
999        }
1000
1001        // Establish a fresh connection
1002        let config = WsConfig {
1003            url: ws_url.to_string(),
1004            ..Default::default()
1005        };
1006        let new_client = WsClient::new(config);
1007
1008        // Connect to the WebSocket endpoint
1009        new_client.connect().await?;
1010
1011        // Store the new client handle
1012        *ws_client.write().await = Some(new_client);
1013
1014        Ok(())
1015    }
1016}
1017
1018impl Drop for MessageRouter {
1019    fn drop(&mut self) {
1020        // Note: Drop is synchronous, so we cannot await asynchronous operations here.
1021        // Callers should explicitly invoke `stop()` to release resources.
1022    }
1023}
1024
1025impl Default for SubscriptionManager {
1026    fn default() -> Self {
1027        Self::new()
1028    }
1029}
1030
1031/// Binance WebSocket client wrapper
1032pub struct BinanceWs {
1033    client: Arc<WsClient>,
1034    listen_key: Arc<RwLock<Option<String>>>,
1035    /// Listen key manager
1036    listen_key_manager: Option<Arc<ListenKeyManager>>,
1037    /// Auto-reconnect coordinator
1038    auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
1039    /// Cached ticker data
1040    tickers: Arc<Mutex<HashMap<String, Ticker>>>,
1041    /// Cached bid/ask snapshots
1042    bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
1043    /// Cached mark price entries
1044    #[allow(dead_code)]
1045    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
1046    /// Cached order book snapshots
1047    orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
1048    /// Cached trade history (retains the latest 1000 entries)
1049    trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
1050    /// Cached OHLCV candles
1051    ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
1052    /// Cached balance data grouped by account type
1053    balances: Arc<RwLock<HashMap<String, Balance>>>,
1054    /// Cached orders grouped by symbol then order ID
1055    orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
1056    /// Cached personal trades grouped by symbol (retains the latest 1000 entries)
1057    my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
1058    /// Cached positions grouped by symbol and side
1059    positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
1060}
1061
1062impl BinanceWs {
1063    /// Creates a new Binance WebSocket client
1064    ///
1065    /// # Arguments
1066    /// * `url` - WebSocket server URL
1067    ///
1068    /// # Returns
1069    /// Initialized Binance WebSocket client
1070    pub fn new(url: String) -> Self {
1071        let config = WsConfig {
1072            url,
1073            connect_timeout: 10000,
1074            ping_interval: 180000, // Binance recommends 3 minutes
1075            reconnect_interval: 5000,
1076            max_reconnect_attempts: 5,
1077            auto_reconnect: true,
1078            enable_compression: false,
1079            pong_timeout: 90000, // Set pong timeout to 90 seconds
1080            ..Default::default()
1081        };
1082
1083        Self {
1084            client: Arc::new(WsClient::new(config)),
1085            listen_key: Arc::new(RwLock::new(None)),
1086            listen_key_manager: None,
1087            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1088            tickers: Arc::new(Mutex::new(HashMap::new())),
1089            bids_asks: Arc::new(Mutex::new(HashMap::new())),
1090            mark_prices: Arc::new(Mutex::new(HashMap::new())),
1091            orderbooks: Arc::new(Mutex::new(HashMap::new())),
1092            trades: Arc::new(Mutex::new(HashMap::new())),
1093            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1094            balances: Arc::new(RwLock::new(HashMap::new())),
1095            orders: Arc::new(RwLock::new(HashMap::new())),
1096            my_trades: Arc::new(RwLock::new(HashMap::new())),
1097            positions: Arc::new(RwLock::new(HashMap::new())),
1098        }
1099    }
1100
1101    /// Creates a WebSocket client with a listen key manager
1102    ///
1103    /// # Arguments
1104    /// * `url` - WebSocket server URL
1105    /// * `binance` - Shared Binance instance
1106    ///
1107    /// # Returns
1108    /// Binance WebSocket client configured with a listen key manager
1109    pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
1110        let config = WsConfig {
1111            url,
1112            connect_timeout: 10000,
1113            ping_interval: 180000, // Binance recommends 3 minutes
1114            reconnect_interval: 5000,
1115            max_reconnect_attempts: 5,
1116            auto_reconnect: true,
1117            enable_compression: false,
1118            pong_timeout: 90000, // Set pong timeout to 90 seconds
1119            ..Default::default()
1120        };
1121
1122        Self {
1123            client: Arc::new(WsClient::new(config)),
1124            listen_key: Arc::new(RwLock::new(None)),
1125            listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
1126            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1127            tickers: Arc::new(Mutex::new(HashMap::new())),
1128            bids_asks: Arc::new(Mutex::new(HashMap::new())),
1129            mark_prices: Arc::new(Mutex::new(HashMap::new())),
1130            orderbooks: Arc::new(Mutex::new(HashMap::new())),
1131            trades: Arc::new(Mutex::new(HashMap::new())),
1132            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1133            balances: Arc::new(RwLock::new(HashMap::new())),
1134            orders: Arc::new(RwLock::new(HashMap::new())),
1135            my_trades: Arc::new(RwLock::new(HashMap::new())),
1136            positions: Arc::new(RwLock::new(HashMap::new())),
1137        }
1138    }
1139
1140    /// Connects to the WebSocket server
1141    pub async fn connect(&self) -> Result<()> {
1142        // Establish the WebSocket connection
1143        self.client.connect().await?;
1144
1145        // Start the auto-reconnect coordinator when not already running
1146        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1147        if coordinator_guard.is_none() {
1148            let coordinator = self.client.clone().create_auto_reconnect_coordinator();
1149            coordinator.start().await;
1150            *coordinator_guard = Some(coordinator);
1151            tracing::info!("Auto-reconnect coordinator started");
1152        }
1153
1154        Ok(())
1155    }
1156
1157    /// Disconnects from the WebSocket server
1158    pub async fn disconnect(&self) -> Result<()> {
1159        // Stop the auto-reconnect coordinator first
1160        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1161        if let Some(coordinator) = coordinator_guard.take() {
1162            coordinator.stop().await;
1163            tracing::info!("Auto-reconnect coordinator stopped");
1164        }
1165
1166        // Stop automatic listen key refresh if available
1167        if let Some(manager) = &self.listen_key_manager {
1168            manager.stop_auto_refresh().await;
1169        }
1170
1171        self.client.disconnect().await
1172    }
1173
1174    /// Connects to the user data stream
1175    ///
1176    /// Creates or retrieves a listen key, connects to the user data WebSocket, and starts auto-refresh
1177    ///
1178    /// # Returns
1179    /// Result of the connection attempt
1180    ///
1181    /// # Errors
1182    /// - Listen key manager is missing (requires `new_with_auth` constructor)
1183    /// - Listen key creation failed
1184    /// - WebSocket connection failed
1185    ///
1186    /// # Example
1187    /// ```rust,no_run
1188    /// # use ccxt_exchanges::binance::Binance;
1189    /// # use ccxt_core::ExchangeConfig;
1190    /// # use std::sync::Arc;
1191    /// # async fn example() -> ccxt_core::error::Result<()> {
1192    /// let binance = Arc::new(Binance::new(ExchangeConfig::default())?);
1193    /// let ws = binance.create_authenticated_ws();
1194    /// ws.connect_user_stream().await?;
1195    /// # Ok(())
1196    /// # }
1197    /// ```
1198    pub async fn connect_user_stream(&self) -> Result<()> {
1199        let manager = self.listen_key_manager.as_ref()
1200            .ok_or_else(|| Error::invalid_request(
1201                "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
1202            ))?;
1203
1204        // Obtain an existing listen key or create a new one
1205        let listen_key = manager.get_or_create().await?;
1206
1207        // Update the WebSocket URL using the listen key
1208        let user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);
1209
1210        // Recreate the WebSocket client configuration
1211        let config = WsConfig {
1212            url: user_stream_url,
1213            connect_timeout: 10000,
1214            ping_interval: 180000,
1215            reconnect_interval: 5000,
1216            max_reconnect_attempts: 5,
1217            auto_reconnect: true,
1218            enable_compression: false,
1219            pong_timeout: 90000, // Default 90-second timeout
1220            ..Default::default()
1221        };
1222
1223        // Initialize a new client instance (retained for future replacement logic)
1224        let _new_client = Arc::new(WsClient::new(config));
1225        // An `Arc` cannot be modified in place; concrete handling is deferred for future work
1226
1227        // Connect to the WebSocket endpoint
1228        self.client.connect().await?;
1229
1230        // Start automatic listen key refresh
1231        manager.start_auto_refresh().await;
1232
1233        // Cache the current listen key locally
1234        *self.listen_key.write().await = Some(listen_key);
1235
1236        Ok(())
1237    }
1238
1239    /// Closes the user data stream
1240    ///
1241    /// Stops auto-refresh and deletes the listen key
1242    ///
1243    /// # Returns
1244    /// Result of the shutdown
1245    pub async fn close_user_stream(&self) -> Result<()> {
1246        if let Some(manager) = &self.listen_key_manager {
1247            manager.delete().await?;
1248        }
1249        *self.listen_key.write().await = None;
1250        Ok(())
1251    }
1252
1253    /// Returns the active listen key, when available
1254    pub async fn get_listen_key(&self) -> Option<String> {
1255        if let Some(manager) = &self.listen_key_manager {
1256            manager.get_current().await
1257        } else {
1258            self.listen_key.read().await.clone()
1259        }
1260    }
1261
1262    /// Subscribes to the ticker stream for a symbol
1263    ///
1264    /// # Arguments
1265    /// * `symbol` - Trading pair (e.g. "btcusdt")
1266    ///
1267    /// # Returns
1268    /// Result of the subscription request
1269    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
1270        let stream = format!("{}@ticker", symbol.to_lowercase());
1271        self.client
1272            .subscribe(stream, Some(symbol.to_string()), None)
1273            .await
1274    }
1275
1276    /// Subscribes to the 24-hour ticker stream for all symbols
1277    pub async fn subscribe_all_tickers(&self) -> Result<()> {
1278        self.client
1279            .subscribe("!ticker@arr".to_string(), None, None)
1280            .await
1281    }
1282
1283    /// Subscribes to real-time trade executions for a symbol
1284    ///
1285    /// # Arguments
1286    /// * `symbol` - Trading pair (e.g. "btcusdt")
1287    ///
1288    /// # Returns
1289    /// Result of the subscription request
1290    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
1291        let stream = format!("{}@trade", symbol.to_lowercase());
1292        self.client
1293            .subscribe(stream, Some(symbol.to_string()), None)
1294            .await
1295    }
1296
1297    /// Subscribes to the aggregated trade stream for a symbol
1298    ///
1299    /// # Arguments
1300    /// * `symbol` - Trading pair (e.g. "btcusdt")
1301    ///
1302    /// # Returns
1303    /// Result of the subscription request
1304    pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
1305        let stream = format!("{}@aggTrade", symbol.to_lowercase());
1306        self.client
1307            .subscribe(stream, Some(symbol.to_string()), None)
1308            .await
1309    }
1310
1311    /// Subscribes to the order book depth stream
1312    ///
1313    /// # Arguments
1314    /// * `symbol` - Trading pair (e.g. "btcusdt")
1315    /// * `levels` - Depth levels (5, 10, 20)
1316    /// * `update_speed` - Update frequency ("100ms" or "1000ms")
1317    ///
1318    /// # Returns
1319    /// Result of the subscription request
1320    pub async fn subscribe_orderbook(
1321        &self,
1322        symbol: &str,
1323        levels: u32,
1324        update_speed: &str,
1325    ) -> Result<()> {
1326        let stream = if update_speed == "100ms" {
1327            format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
1328        } else {
1329            format!("{}@depth{}", symbol.to_lowercase(), levels)
1330        };
1331
1332        self.client
1333            .subscribe(stream, Some(symbol.to_string()), None)
1334            .await
1335    }
1336
1337    /// Subscribes to the diff order book stream
1338    ///
1339    /// # Arguments
1340    /// * `symbol` - Trading pair (e.g. "btcusdt")
1341    /// * `update_speed` - Update frequency ("100ms" or "1000ms")
1342    ///
1343    /// # Returns
1344    /// Result of the subscription request
1345    pub async fn subscribe_orderbook_diff(
1346        &self,
1347        symbol: &str,
1348        update_speed: Option<&str>,
1349    ) -> Result<()> {
1350        let stream = if let Some(speed) = update_speed {
1351            if speed == "100ms" {
1352                format!("{}@depth@100ms", symbol.to_lowercase())
1353            } else {
1354                format!("{}@depth", symbol.to_lowercase())
1355            }
1356        } else {
1357            format!("{}@depth", symbol.to_lowercase())
1358        };
1359
1360        self.client
1361            .subscribe(stream, Some(symbol.to_string()), None)
1362            .await
1363    }
1364
1365    /// Subscribes to Kline (candlestick) data for a symbol
1366    ///
1367    /// # Arguments
1368    /// * `symbol` - Trading pair (e.g. "btcusdt")
1369    /// * `interval` - Kline interval (1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M)
1370    ///
1371    /// # Returns
1372    /// Result of the subscription request
1373    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
1374        let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
1375        self.client
1376            .subscribe(stream, Some(symbol.to_string()), None)
1377            .await
1378    }
1379
1380    /// Subscribes to the mini ticker stream for a symbol
1381    ///
1382    /// # Arguments
1383    /// * `symbol` - Trading pair (e.g. "btcusdt")
1384    ///
1385    /// # Returns
1386    /// Result of the subscription request
1387    pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
1388        let stream = format!("{}@miniTicker", symbol.to_lowercase());
1389        self.client
1390            .subscribe(stream, Some(symbol.to_string()), None)
1391            .await
1392    }
1393
1394    /// Subscribes to the mini ticker stream for all symbols
1395    pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
1396        self.client
1397            .subscribe("!miniTicker@arr".to_string(), None, None)
1398            .await
1399    }
1400
1401    /// Cancels an existing subscription
1402    ///
1403    /// # Arguments
1404    /// * `stream` - Stream identifier to unsubscribe from
1405    ///
1406    /// # Returns
1407    /// Result of the unsubscribe request
1408    pub async fn unsubscribe(&self, stream: String) -> Result<()> {
1409        self.client.unsubscribe(stream, None).await
1410    }
1411
1412    /// Receives the next available message
1413    ///
1414    /// # Returns
1415    /// Optional message payload when available
1416    pub async fn receive(&self) -> Option<Value> {
1417        self.client.receive().await
1418    }
1419
1420    /// Indicates whether the WebSocket connection is active
1421    pub fn is_connected(&self) -> bool {
1422        self.client.is_connected()
1423    }
1424
1425    /// Watches a single ticker stream (internal helper)
1426    ///
1427    /// # Arguments
1428    /// * `symbol` - Lowercase trading pair (e.g. "btcusdt")
1429    /// * `channel_name` - Channel identifier (ticker/miniTicker/markPrice/bookTicker)
1430    ///
1431    /// # Returns
1432    /// Parsed ticker data
1433    async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1434        let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
1435
1436        // Subscribe to the requested stream
1437        self.client
1438            .subscribe(stream.clone(), Some(symbol.to_string()), None)
1439            .await?;
1440
1441        // Wait for and parse incoming messages
1442        loop {
1443            if let Some(message) = self.client.receive().await {
1444                // Ignore subscription acknowledgements
1445                if message.get("result").is_some() {
1446                    continue;
1447                }
1448
1449                // Parse ticker payload
1450                if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1451                    // Cache the ticker for later retrieval
1452                    let mut tickers = self.tickers.lock().await;
1453                    tickers.insert(ticker.symbol.clone(), ticker.clone());
1454
1455                    return Ok(ticker);
1456                }
1457            }
1458        }
1459    }
1460
1461    /// Watches multiple ticker streams (internal helper)
1462    ///
1463    /// # Arguments
1464    /// * `symbols` - Optional list of lowercase trading pairs
1465    /// * `channel_name` - Target channel name
1466    ///
1467    /// # Returns
1468    /// Mapping of symbol to ticker payloads
1469    async fn watch_tickers_internal(
1470        &self,
1471        symbols: Option<Vec<String>>,
1472        channel_name: &str,
1473    ) -> Result<HashMap<String, Ticker>> {
1474        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1475            // Subscribe to specific trading pairs
1476            syms.iter()
1477                .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1478                .collect()
1479        } else {
1480            // Subscribe to the aggregated ticker stream
1481            vec![format!("!{}@arr", channel_name)]
1482        };
1483
1484        // Issue subscription requests
1485        for stream in &streams {
1486            self.client.subscribe(stream.clone(), None, None).await?;
1487        }
1488
1489        // Collect and parse messages
1490        let mut result = HashMap::new();
1491
1492        loop {
1493            if let Some(message) = self.client.receive().await {
1494                // Skip subscription acknowledgements
1495                if message.get("result").is_some() {
1496                    continue;
1497                }
1498
1499                // Handle array payloads (all tickers)
1500                if let Some(arr) = message.as_array() {
1501                    for item in arr {
1502                        if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1503                            let symbol = ticker.symbol.clone();
1504
1505                            // Return only requested symbols when provided
1506                            if let Some(syms) = &symbols {
1507                                if syms.contains(&symbol.to_lowercase()) {
1508                                    result.insert(symbol.clone(), ticker.clone());
1509                                }
1510                            } else {
1511                                result.insert(symbol.clone(), ticker.clone());
1512                            }
1513
1514                            // Cache the ticker payload
1515                            let mut tickers = self.tickers.lock().await;
1516                            tickers.insert(symbol, ticker);
1517                        }
1518                    }
1519
1520                    // Exit once all requested tickers have been observed
1521                    if let Some(syms) = &symbols {
1522                        if result.len() == syms.len() {
1523                            return Ok(result);
1524                        }
1525                    } else {
1526                        return Ok(result);
1527                    }
1528                } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1529                    // Handle single-ticker payloads
1530                    let symbol = ticker.symbol.clone();
1531                    result.insert(symbol.clone(), ticker.clone());
1532
1533                    // Cache the ticker payload
1534                    let mut tickers = self.tickers.lock().await;
1535                    tickers.insert(symbol, ticker);
1536
1537                    // Exit once all requested tickers have been observed
1538                    if let Some(syms) = &symbols {
1539                        if result.len() == syms.len() {
1540                            return Ok(result);
1541                        }
1542                    }
1543                }
1544            }
1545        }
1546    }
1547
1548    /// Processes an order book delta update (internal helper)
1549    ///
1550    /// # Arguments
1551    /// * `symbol` - Trading pair
1552    /// * `delta_message` - Raw WebSocket delta payload
1553    /// * `is_futures` - Whether the feed originates from the futures market
1554    ///
1555    /// # Returns
1556    /// Result of the update; returns a special error when resynchronization is required
1557    async fn handle_orderbook_delta(
1558        &self,
1559        symbol: &str,
1560        delta_message: &Value,
1561        is_futures: bool,
1562    ) -> Result<()> {
1563        use ccxt_core::types::orderbook::{OrderBookDelta, OrderBookEntry};
1564        use rust_decimal::Decimal;
1565
1566        // Parse the delta message
1567        let first_update_id = delta_message["U"]
1568            .as_i64()
1569            .ok_or_else(|| Error::invalid_request("Missing first update ID in delta message"))?;
1570
1571        let final_update_id = delta_message["u"]
1572            .as_i64()
1573            .ok_or_else(|| Error::invalid_request("Missing final update ID in delta message"))?;
1574
1575        let prev_final_update_id = if is_futures {
1576            delta_message["pu"].as_i64()
1577        } else {
1578            None
1579        };
1580
1581        let timestamp = delta_message["E"]
1582            .as_i64()
1583            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1584
1585        // Parse bid updates
1586        let mut bids = Vec::new();
1587        if let Some(bids_arr) = delta_message["b"].as_array() {
1588            for bid in bids_arr {
1589                if let (Some(price_str), Some(amount_str)) = (bid[0].as_str(), bid[1].as_str()) {
1590                    if let (Ok(price), Ok(amount)) =
1591                        (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1592                    {
1593                        bids.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1594                    }
1595                }
1596            }
1597        }
1598
1599        // Parse ask updates
1600        let mut asks = Vec::new();
1601        if let Some(asks_arr) = delta_message["a"].as_array() {
1602            for ask in asks_arr {
1603                if let (Some(price_str), Some(amount_str)) = (ask[0].as_str(), ask[1].as_str()) {
1604                    if let (Ok(price), Ok(amount)) =
1605                        (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1606                    {
1607                        asks.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1608                    }
1609                }
1610            }
1611        }
1612
1613        // Build the delta structure
1614        let delta = OrderBookDelta {
1615            symbol: symbol.to_string(),
1616            first_update_id,
1617            final_update_id,
1618            prev_final_update_id,
1619            timestamp,
1620            bids,
1621            asks,
1622        };
1623
1624        // Retrieve or create the cached order book
1625        let mut orderbooks = self.orderbooks.lock().await;
1626        let orderbook = orderbooks
1627            .entry(symbol.to_string())
1628            .or_insert_with(|| OrderBook::new(symbol.to_string(), timestamp));
1629
1630        // If the order book is not synchronized yet, buffer the delta
1631        if !orderbook.is_synced {
1632            orderbook.buffer_delta(delta);
1633            return Ok(());
1634        }
1635
1636        // Apply the delta to the order book
1637        if let Err(e) = orderbook.apply_delta(&delta, is_futures) {
1638            // Check whether a resynchronization cycle is needed
1639            if orderbook.needs_resync {
1640                tracing::warn!("Orderbook {} needs resync due to: {}", symbol, e);
1641                // Buffer the delta so it can be reused after resync
1642                orderbook.buffer_delta(delta);
1643                // Signal that resynchronization is required
1644                return Err(Error::invalid_request(format!("RESYNC_NEEDED: {}", e)));
1645            }
1646            return Err(Error::invalid_request(e));
1647        }
1648
1649        Ok(())
1650    }
1651
1652    /// Retrieves an order book snapshot and initializes cached state (internal helper)
1653    ///
1654    /// # Arguments
1655    /// * `exchange` - Exchange reference used for REST API calls
1656    /// * `symbol` - Trading pair identifier
1657    /// * `limit` - Depth limit to request
1658    /// * `is_futures` - Whether the symbol is a futures market
1659    ///
1660    /// # Returns
1661    /// Initialized order book structure
1662    async fn fetch_orderbook_snapshot(
1663        &self,
1664        exchange: &Binance,
1665        symbol: &str,
1666        limit: Option<i64>,
1667        is_futures: bool,
1668    ) -> Result<OrderBook> {
1669        // Fetch snapshot via REST API
1670        let mut params = HashMap::new();
1671        if let Some(l) = limit {
1672            // json! macro with simple values is infallible
1673            #[allow(clippy::disallowed_methods)]
1674            let limit_value = serde_json::json!(l);
1675            params.insert("limit".to_string(), limit_value);
1676        }
1677
1678        let mut snapshot = exchange.fetch_order_book(symbol, None).await?;
1679
1680        // Mark the snapshot as synchronized
1681        snapshot.is_synced = true;
1682
1683        // Apply buffered deltas captured before the snapshot
1684        let mut orderbooks = self.orderbooks.lock().await;
1685        if let Some(cached_ob) = orderbooks.get_mut(symbol) {
1686            // Transfer buffered deltas to the snapshot instance
1687            snapshot
1688                .buffered_deltas
1689                .clone_from(&cached_ob.buffered_deltas);
1690
1691            // Apply buffered deltas to catch up
1692            if let Ok(processed) = snapshot.process_buffered_deltas(is_futures) {
1693                tracing::debug!("Processed {} buffered deltas for {}", processed, symbol);
1694            }
1695        }
1696
1697        // Update cache with the synchronized snapshot
1698        orderbooks.insert(symbol.to_string(), snapshot.clone());
1699
1700        Ok(snapshot)
1701    }
1702
1703    /// Watches a single order book stream (internal helper)
1704    ///
1705    /// # Arguments
1706    /// * `exchange` - Exchange reference
1707    /// * `symbol` - Lowercase trading pair
1708    /// * `limit` - Depth limit
1709    /// * `update_speed` - Update frequency (100 or 1000 ms)
1710    /// * `is_futures` - Whether the symbol is a futures market
1711    ///
1712    /// # Returns
1713    /// Order book snapshot enriched with streamed updates
1714    async fn watch_orderbook_internal(
1715        &self,
1716        exchange: &Binance,
1717        symbol: &str,
1718        limit: Option<i64>,
1719        update_speed: i32,
1720        is_futures: bool,
1721    ) -> Result<OrderBook> {
1722        // Construct the stream name
1723        let stream = if update_speed == 100 {
1724            format!("{}@depth@100ms", symbol.to_lowercase())
1725        } else {
1726            format!("{}@depth", symbol.to_lowercase())
1727        };
1728
1729        // Subscribe to depth updates
1730        self.client
1731            .subscribe(stream.clone(), Some(symbol.to_string()), None)
1732            .await?;
1733
1734        // Start buffering deltas before the snapshot is ready
1735        let snapshot_fetched = Arc::new(Mutex::new(false));
1736        let _snapshot_fetched_clone = snapshot_fetched.clone();
1737
1738        // Spawn a placeholder processing loop (actual handling occurs elsewhere)
1739        let _orderbooks_clone = self.orderbooks.clone();
1740        let _symbol_clone = symbol.to_string();
1741
1742        tokio::spawn(async move {
1743            // Placeholder: actual message handling occurs in the main loop
1744        });
1745
1746        // Give the stream time to accumulate initial deltas before fetching a snapshot
1747        tokio::time::sleep(Duration::from_millis(500)).await;
1748
1749        // Fetch the initial snapshot
1750        let _snapshot = self
1751            .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1752            .await?;
1753
1754        *snapshot_fetched.lock().await = true;
1755
1756        // Main processing loop
1757        loop {
1758            if let Some(message) = self.client.receive().await {
1759                // Skip subscription acknowledgements
1760                if message.get("result").is_some() {
1761                    continue;
1762                }
1763
1764                // Process depth updates only
1765                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1766                    if event_type == "depthUpdate" {
1767                        match self
1768                            .handle_orderbook_delta(symbol, &message, is_futures)
1769                            .await
1770                        {
1771                            Ok(()) => {
1772                                // Return the updated order book once synchronized
1773                                let orderbooks = self.orderbooks.lock().await;
1774                                if let Some(ob) = orderbooks.get(symbol) {
1775                                    if ob.is_synced {
1776                                        return Ok(ob.clone());
1777                                    }
1778                                }
1779                            }
1780                            Err(e) => {
1781                                let err_msg = e.to_string();
1782
1783                                // Initiate resynchronization when instructed
1784                                if err_msg.contains("RESYNC_NEEDED") {
1785                                    tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1786
1787                                    let current_time = chrono::Utc::now().timestamp_millis();
1788                                    let should_resync = {
1789                                        let orderbooks = self.orderbooks.lock().await;
1790                                        if let Some(ob) = orderbooks.get(symbol) {
1791                                            ob.should_resync(current_time)
1792                                        } else {
1793                                            true
1794                                        }
1795                                    };
1796
1797                                    if should_resync {
1798                                        tracing::info!("Initiating resync for {}", symbol);
1799
1800                                        // Reset local state in preparation for resync
1801                                        {
1802                                            let mut orderbooks = self.orderbooks.lock().await;
1803                                            if let Some(ob) = orderbooks.get_mut(symbol) {
1804                                                ob.reset_for_resync();
1805                                                ob.mark_resync_initiated(current_time);
1806                                            }
1807                                        }
1808
1809                                        // Allow some deltas to buffer before fetching a new snapshot
1810                                        tokio::time::sleep(Duration::from_millis(500)).await;
1811
1812                                        // Fetch a fresh snapshot and continue processing
1813                                        match self
1814                                            .fetch_orderbook_snapshot(
1815                                                exchange, symbol, limit, is_futures,
1816                                            )
1817                                            .await
1818                                        {
1819                                            Ok(_) => {
1820                                                tracing::info!(
1821                                                    "Resync completed successfully for {}",
1822                                                    symbol
1823                                                );
1824                                                continue;
1825                                            }
1826                                            Err(resync_err) => {
1827                                                tracing::error!(
1828                                                    "Resync failed for {}: {}",
1829                                                    symbol,
1830                                                    resync_err
1831                                                );
1832                                                return Err(resync_err);
1833                                            }
1834                                        }
1835                                    }
1836                                    tracing::debug!("Resync rate limited for {}, skipping", symbol);
1837                                }
1838                                tracing::error!("Failed to handle orderbook delta: {}", err_msg);
1839                            }
1840                        }
1841                    }
1842                }
1843            }
1844        }
1845    }
1846
1847    /// Watches multiple order book streams (internal helper)
1848    ///
1849    /// # Arguments
1850    /// * `exchange` - Exchange reference
1851    /// * `symbols` - List of trading pairs
1852    /// * `limit` - Requested depth level
1853    /// * `update_speed` - Update frequency
1854    /// * `is_futures` - Whether the symbols are futures markets
1855    ///
1856    /// # Returns
1857    /// Mapping of symbol to order book data
1858    async fn watch_orderbooks_internal(
1859        &self,
1860        exchange: &Binance,
1861        symbols: Vec<String>,
1862        limit: Option<i64>,
1863        update_speed: i32,
1864        is_futures: bool,
1865    ) -> Result<HashMap<String, OrderBook>> {
1866        // Binance enforces a 200-symbol limit per connection
1867        if symbols.len() > 200 {
1868            return Err(Error::invalid_request(
1869                "Binance supports max 200 symbols per connection",
1870            ));
1871        }
1872
1873        // Subscribe to each symbol
1874        for symbol in &symbols {
1875            let stream = if update_speed == 100 {
1876                format!("{}@depth@100ms", symbol.to_lowercase())
1877            } else {
1878                format!("{}@depth", symbol.to_lowercase())
1879            };
1880
1881            self.client
1882                .subscribe(stream, Some(symbol.clone()), None)
1883                .await?;
1884        }
1885
1886        // Allow messages to accumulate before snapshot retrieval
1887        tokio::time::sleep(Duration::from_millis(500)).await;
1888
1889        // Fetch snapshots for all symbols
1890        for symbol in &symbols {
1891            let _ = self
1892                .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1893                .await;
1894        }
1895
1896        // Process incremental updates
1897        let mut result = HashMap::new();
1898        let mut update_count = 0;
1899
1900        while update_count < symbols.len() {
1901            if let Some(message) = self.client.receive().await {
1902                // Skip subscription acknowledgements
1903                if message.get("result").is_some() {
1904                    continue;
1905                }
1906
1907                // Handle depth updates
1908                if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1909                    if event_type == "depthUpdate" {
1910                        if let Some(msg_symbol) =
1911                            message.get("s").and_then(serde_json::Value::as_str)
1912                        {
1913                            if let Err(e) = self
1914                                .handle_orderbook_delta(msg_symbol, &message, is_futures)
1915                                .await
1916                            {
1917                                tracing::error!("Failed to handle orderbook delta: {}", e);
1918                                continue;
1919                            }
1920
1921                            update_count += 1;
1922                        }
1923                    }
1924                }
1925            }
1926        }
1927
1928        // Collect the resulting order books
1929        let orderbooks = self.orderbooks.lock().await;
1930        for symbol in &symbols {
1931            if let Some(ob) = orderbooks.get(symbol) {
1932                result.insert(symbol.clone(), ob.clone());
1933            }
1934        }
1935
1936        Ok(result)
1937    }
1938
1939    ///
1940    /// # Arguments
1941    /// * `symbol` - Trading pair identifier
1942    ///
1943    /// # Returns
1944    /// Ticker snapshot when available
1945    pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1946        let tickers = self.tickers.lock().await;
1947        tickers.get(symbol).cloned()
1948    }
1949
1950    /// Returns all cached ticker snapshots
1951    pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1952        let tickers = self.tickers.lock().await;
1953        tickers.clone()
1954    }
1955
1956    /// Handles balance update messages (internal helper)
1957    ///
1958    /// # Arguments
1959    /// * `message` - WebSocket payload
1960    /// * `account_type` - Account category (spot/future/delivery, etc.)
1961    ///
1962    /// # Returns
1963    /// Result of the balance update processing
1964    async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1965        use rust_decimal::Decimal;
1966        use std::str::FromStr;
1967
1968        // Identify the event type
1969        let event_type = message
1970            .get("e")
1971            .and_then(|e| e.as_str())
1972            .ok_or_else(|| Error::invalid_request("Missing event type in balance message"))?;
1973
1974        // Retrieve or create cached balances for the account
1975        let mut balances = self.balances.write().await;
1976        let balance = balances
1977            .entry(account_type.to_string())
1978            .or_insert_with(Balance::new);
1979
1980        match event_type {
1981            // Spot account incremental balance update
1982            "balanceUpdate" => {
1983                let asset = message
1984                    .get("a")
1985                    .and_then(|a| a.as_str())
1986                    .ok_or_else(|| Error::invalid_request("Missing asset in balanceUpdate"))?;
1987
1988                let delta_str = message
1989                    .get("d")
1990                    .and_then(|d| d.as_str())
1991                    .ok_or_else(|| Error::invalid_request("Missing delta in balanceUpdate"))?;
1992
1993                let delta = Decimal::from_str(delta_str)
1994                    .map_err(|e| Error::invalid_request(format!("Invalid delta value: {}", e)))?;
1995
1996                // Apply delta to the cached balance
1997                balance.apply_delta(asset.to_string(), delta);
1998            }
1999
2000            // Spot account full balance update
2001            "outboundAccountPosition" => {
2002                if let Some(balances_array) = message.get("B").and_then(|b| b.as_array()) {
2003                    for balance_item in balances_array {
2004                        let asset =
2005                            balance_item
2006                                .get("a")
2007                                .and_then(|a| a.as_str())
2008                                .ok_or_else(|| {
2009                                    Error::invalid_request("Missing asset in balance item")
2010                                })?;
2011
2012                        let free_str = balance_item
2013                            .get("f")
2014                            .and_then(|f| f.as_str())
2015                            .ok_or_else(|| Error::invalid_request("Missing free balance"))?;
2016
2017                        let locked_str = balance_item
2018                            .get("l")
2019                            .and_then(|l| l.as_str())
2020                            .ok_or_else(|| Error::invalid_request("Missing locked balance"))?;
2021
2022                        let free = Decimal::from_str(free_str).map_err(|e| {
2023                            Error::invalid_request(format!("Invalid free value: {}", e))
2024                        })?;
2025
2026                        let locked = Decimal::from_str(locked_str).map_err(|e| {
2027                            Error::invalid_request(format!("Invalid locked value: {}", e))
2028                        })?;
2029
2030                        // Update the cached balance snapshot
2031                        balance.update_balance(asset.to_string(), free, locked);
2032                    }
2033                }
2034            }
2035
2036            // Futures/delivery account updates
2037            "ACCOUNT_UPDATE" => {
2038                if let Some(account_data) = message.get("a") {
2039                    // Parse balance array
2040                    if let Some(balances_array) = account_data.get("B").and_then(|b| b.as_array()) {
2041                        for balance_item in balances_array {
2042                            let asset = balance_item.get("a").and_then(|a| a.as_str()).ok_or_else(
2043                                || Error::invalid_request("Missing asset in balance item"),
2044                            )?;
2045
2046                            let wallet_balance_str = balance_item
2047                                .get("wb")
2048                                .and_then(|wb| wb.as_str())
2049                                .ok_or_else(|| Error::invalid_request("Missing wallet balance"))?;
2050
2051                            let wallet_balance =
2052                                Decimal::from_str(wallet_balance_str).map_err(|e| {
2053                                    Error::invalid_request(format!("Invalid wallet balance: {}", e))
2054                                })?;
2055
2056                            // Optional cross wallet balance
2057                            let cross_wallet = balance_item
2058                                .get("cw")
2059                                .and_then(|cw| cw.as_str())
2060                                .and_then(|s| Decimal::from_str(s).ok());
2061
2062                            // Update wallet balance snapshot
2063                            balance.update_wallet(asset.to_string(), wallet_balance, cross_wallet);
2064                        }
2065                    }
2066
2067                    // Positions are contained in account_data["P"]; handling occurs in watch_positions
2068                }
2069            }
2070
2071            _ => {
2072                return Err(Error::invalid_request(format!(
2073                    "Unknown balance event type: {}",
2074                    event_type
2075                )));
2076            }
2077        }
2078
2079        Ok(())
2080    }
2081
2082    /// Parses a WebSocket trade message
2083    ///
2084    /// Extracts trade information from an `executionReport` event
2085    ///
2086    /// # Arguments
2087    /// * `data` - Raw WebSocket JSON payload
2088    ///
2089    /// # Returns
2090    /// Parsed `Trade` structure
2091    fn parse_ws_trade(data: &Value) -> Result<Trade> {
2092        use ccxt_core::types::{Fee, OrderSide, OrderType, TakerOrMaker};
2093        use rust_decimal::Decimal;
2094        use std::str::FromStr;
2095
2096        // Extract symbol field
2097        let symbol = data
2098            .get("s")
2099            .and_then(serde_json::Value::as_str)
2100            .ok_or_else(|| Error::invalid_request("Missing symbol field".to_string()))?
2101            .to_string();
2102
2103        // Trade ID (field `t`)
2104        let id = data
2105            .get("t")
2106            .and_then(serde_json::Value::as_i64)
2107            .map(|v| v.to_string());
2108
2109        // Trade timestamp (field `T`)
2110        let timestamp = data
2111            .get("T")
2112            .and_then(serde_json::Value::as_i64)
2113            .unwrap_or(0);
2114
2115        // Executed price (field `L` - last executed price)
2116        let price = data
2117            .get("L")
2118            .and_then(serde_json::Value::as_str)
2119            .and_then(|s| Decimal::from_str(s).ok())
2120            .unwrap_or(Decimal::ZERO);
2121
2122        // Executed amount (field `l` - last executed quantity)
2123        let amount = data
2124            .get("l")
2125            .and_then(serde_json::Value::as_str)
2126            .and_then(|s| Decimal::from_str(s).ok())
2127            .unwrap_or(Decimal::ZERO);
2128
2129        // Quote asset amount (field `Y` - last quote asset transacted quantity)
2130        let cost = data
2131            .get("Y")
2132            .and_then(serde_json::Value::as_str)
2133            .and_then(|s| Decimal::from_str(s).ok())
2134            .or_else(|| {
2135                // Fallback: compute from price * amount when `Y` is unavailable
2136                if price > Decimal::ZERO && amount > Decimal::ZERO {
2137                    Some(price * amount)
2138                } else {
2139                    None
2140                }
2141            });
2142
2143        // Trade side (field `S`)
2144        let side = data
2145            .get("S")
2146            .and_then(serde_json::Value::as_str)
2147            .and_then(|s| match s.to_uppercase().as_str() {
2148                "BUY" => Some(OrderSide::Buy),
2149                "SELL" => Some(OrderSide::Sell),
2150                _ => None,
2151            })
2152            .unwrap_or(OrderSide::Buy);
2153
2154        // Order type (field `o`)
2155        let trade_type = data
2156            .get("o")
2157            .and_then(serde_json::Value::as_str)
2158            .and_then(|s| match s.to_uppercase().as_str() {
2159                "LIMIT" => Some(OrderType::Limit),
2160                "MARKET" => Some(OrderType::Market),
2161                _ => None,
2162            });
2163
2164        // Associated order ID (field `i`)
2165        let order_id = data
2166            .get("i")
2167            .and_then(serde_json::Value::as_i64)
2168            .map(|v| v.to_string());
2169
2170        // Maker/taker flag (field `m` - true when buyer is the maker)
2171        let taker_or_maker = data
2172            .get("m")
2173            .and_then(serde_json::Value::as_bool)
2174            .map(|is_maker| {
2175                if is_maker {
2176                    TakerOrMaker::Maker
2177                } else {
2178                    TakerOrMaker::Taker
2179                }
2180            });
2181
2182        // Fee information (fields `n` = fee amount, `N` = fee currency)
2183        let fee = if let Some(fee_cost_str) = data.get("n").and_then(serde_json::Value::as_str) {
2184            if let Ok(fee_cost) = Decimal::from_str(fee_cost_str) {
2185                let currency = data
2186                    .get("N")
2187                    .and_then(serde_json::Value::as_str)
2188                    .unwrap_or("UNKNOWN")
2189                    .to_string();
2190                Some(Fee {
2191                    currency,
2192                    cost: fee_cost,
2193                    rate: None,
2194                })
2195            } else {
2196                None
2197            }
2198        } else {
2199            None
2200        };
2201
2202        // Derive ISO8601 timestamp string when possible
2203        let datetime = chrono::DateTime::from_timestamp_millis(timestamp)
2204            .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string());
2205
2206        // Preserve the raw payload in the `info` map
2207        let mut info = HashMap::new();
2208        if let Value::Object(map) = data {
2209            for (k, v) in map {
2210                info.insert(k.clone(), v.clone());
2211            }
2212        }
2213
2214        Ok(Trade {
2215            id,
2216            order: order_id,
2217            symbol,
2218            trade_type,
2219            side,
2220            taker_or_maker,
2221            price: Price::from(price),
2222            amount: Amount::from(amount),
2223            cost: cost.map(Cost::from),
2224            fee,
2225            timestamp,
2226            datetime,
2227            info,
2228        })
2229    }
2230
2231    /// Filters cached personal trades by symbol, time range, and limit
2232    ///
2233    /// # Arguments
2234    /// * `symbol` - Optional symbol filter
2235    /// * `since` - Optional starting timestamp (inclusive)
2236    /// * `limit` - Optional maximum number of trades to return
2237    async fn filter_my_trades(
2238        &self,
2239        symbol: Option<&str>,
2240        since: Option<i64>,
2241        limit: Option<usize>,
2242    ) -> Result<Vec<Trade>> {
2243        let trades_map = self.my_trades.read().await;
2244
2245        // Filter by symbol when provided
2246        let mut trades: Vec<Trade> = if let Some(sym) = symbol {
2247            trades_map
2248                .get(sym)
2249                .map(|symbol_trades| symbol_trades.iter().cloned().collect())
2250                .unwrap_or_default()
2251        } else {
2252            trades_map
2253                .values()
2254                .flat_map(|symbol_trades| symbol_trades.iter().cloned())
2255                .collect()
2256        };
2257
2258        // Apply `since` filter when provided
2259        if let Some(since_ts) = since {
2260            trades.retain(|trade| trade.timestamp >= since_ts);
2261        }
2262
2263        // Sort by timestamp descending (latest first)
2264        trades.sort_by(|a, b| {
2265            let ts_a = a.timestamp;
2266            let ts_b = b.timestamp;
2267            ts_b.cmp(&ts_a)
2268        });
2269
2270        // Apply optional limit
2271        if let Some(lim) = limit {
2272            trades.truncate(lim);
2273        }
2274
2275        Ok(trades)
2276    }
2277
2278    /// Parses a WebSocket position payload
2279    ///
2280    /// # Arguments
2281    /// * `data` - Position data from the ACCOUNT_UPDATE event (`P` array element)
2282    ///
2283    /// # Returns
2284    /// Parsed `Position` instance
2285    ///
2286    /// # Binance WebSocket Position Payload Example
2287    /// ```json
2288    /// {
2289    ///   "s": "BTCUSDT",           // Trading pair
2290    ///   "pa": "-0.089",           // Position amount (negative indicates short)
2291    ///   "ep": "19700.03933",      // Entry price
2292    ///   "cr": "-1260.24809979",   // Accumulated realized PnL
2293    ///   "up": "1.53058860",       // Unrealized PnL
2294    ///   "mt": "isolated",         // Margin mode: isolated/cross
2295    ///   "iw": "87.13658940",      // Isolated wallet balance
2296    ///   "ps": "BOTH",             // Position side: BOTH/LONG/SHORT
2297    ///   "ma": "USDT"              // Margin asset
2298    /// }
2299    /// ```
2300    fn parse_ws_position(data: &Value) -> Result<Position> {
2301        // Extract required fields
2302        let symbol = data["s"]
2303            .as_str()
2304            .ok_or_else(|| Error::invalid_request("Missing symbol field"))?
2305            .to_string();
2306
2307        let position_amount_str = data["pa"]
2308            .as_str()
2309            .ok_or_else(|| Error::invalid_request("Missing position amount"))?;
2310
2311        let position_amount = position_amount_str
2312            .parse::<f64>()
2313            .map_err(|e| Error::invalid_request(format!("Invalid position amount: {}", e)))?;
2314
2315        // Extract position side
2316        let position_side = data["ps"]
2317            .as_str()
2318            .ok_or_else(|| Error::invalid_request("Missing position side"))?
2319            .to_uppercase();
2320
2321        // Determine hedged mode and actual side
2322        // - If ps = BOTH, hedged = false and use sign of `pa` for actual side
2323        // - If ps = LONG/SHORT, hedged = true and side equals ps
2324        let (side, hedged) = if position_side == "BOTH" {
2325            let actual_side = if position_amount < 0.0 {
2326                "short"
2327            } else {
2328                "long"
2329            };
2330            (actual_side.to_string(), false)
2331        } else {
2332            (position_side.to_lowercase(), true)
2333        };
2334
2335        // Extract additional fields
2336        let entry_price = data["ep"].as_str().and_then(|s| s.parse::<f64>().ok());
2337        let unrealized_pnl = data["up"].as_str().and_then(|s| s.parse::<f64>().ok());
2338        let realized_pnl = data["cr"].as_str().and_then(|s| s.parse::<f64>().ok());
2339        let margin_mode = data["mt"].as_str().map(ToString::to_string);
2340        let initial_margin = data["iw"].as_str().and_then(|s| s.parse::<f64>().ok());
2341        let _margin_asset = data["ma"].as_str().map(ToString::to_string);
2342
2343        // Construct the `Position` object
2344        Ok(Position {
2345            info: data.clone(),
2346            id: None,
2347            symbol,
2348            side: Some(side),
2349            contracts: Some(position_amount.abs()), // Absolute contract amount
2350            contract_size: None,
2351            entry_price,
2352            mark_price: None,
2353            notional: None,
2354            leverage: None,
2355            collateral: initial_margin, // Use isolated wallet balance as collateral
2356            initial_margin,
2357            initial_margin_percentage: None,
2358            maintenance_margin: None,
2359            maintenance_margin_percentage: None,
2360            unrealized_pnl,
2361            realized_pnl,
2362            liquidation_price: None,
2363            margin_ratio: None,
2364            margin_mode,
2365            hedged: Some(hedged),
2366            percentage: None,
2367            position_side: None,
2368            dual_side_position: None,
2369            timestamp: Some(chrono::Utc::now().timestamp_millis()),
2370            datetime: Some(chrono::Utc::now().to_rfc3339()),
2371        })
2372    }
2373
2374    /// Filters cached positions by symbol, time range, and limit
2375    ///
2376    /// # Arguments
2377    /// * `symbols` - Optional list of symbols to include
2378    /// * `since` - Optional starting timestamp (inclusive)
2379    /// * `limit` - Optional maximum number of positions to return
2380    ///
2381    /// # Returns
2382    /// Filtered list of positions
2383    async fn filter_positions(
2384        &self,
2385        symbols: Option<&[String]>,
2386        since: Option<i64>,
2387        limit: Option<usize>,
2388    ) -> Result<Vec<Position>> {
2389        let positions_map = self.positions.read().await;
2390
2391        // Filter by symbol list when provided
2392        let mut positions: Vec<Position> = if let Some(syms) = symbols {
2393            syms.iter()
2394                .filter_map(|sym| positions_map.get(sym))
2395                .flat_map(|side_map| side_map.values().cloned())
2396                .collect()
2397        } else {
2398            positions_map
2399                .values()
2400                .flat_map(|side_map| side_map.values().cloned())
2401                .collect()
2402        };
2403
2404        // Apply `since` filter when provided
2405        if let Some(since_ts) = since {
2406            positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
2407        }
2408
2409        // Sort by timestamp descending (latest first)
2410        positions.sort_by(|a, b| {
2411            let ts_a = a.timestamp.unwrap_or(0);
2412            let ts_b = b.timestamp.unwrap_or(0);
2413            ts_b.cmp(&ts_a)
2414        });
2415
2416        // Apply optional limit
2417        if let Some(lim) = limit {
2418            positions.truncate(lim);
2419        }
2420
2421        Ok(positions)
2422    }
2423}
2424
2425impl Binance {
2426    /// Subscribes to the ticker stream for a unified symbol
2427    ///
2428    /// # Arguments
2429    /// * `symbol` - Unified trading pair identifier
2430    ///
2431    /// # Returns
2432    /// Result of the subscription call
2433    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
2434        let ws = self.create_ws();
2435        ws.connect().await?;
2436
2437        // Convert symbol format BTC/USDT -> btcusdt
2438        let binance_symbol = symbol.replace('/', "").to_lowercase();
2439        ws.subscribe_ticker(&binance_symbol).await
2440    }
2441
2442    /// Subscribes to the trade stream for a unified symbol
2443    ///
2444    /// # Arguments
2445    /// * `symbol` - Unified trading pair identifier
2446    ///
2447    /// # Returns
2448    /// Result of the subscription call
2449    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
2450        let ws = self.create_ws();
2451        ws.connect().await?;
2452
2453        let binance_symbol = symbol.replace('/', "").to_lowercase();
2454        ws.subscribe_trades(&binance_symbol).await
2455    }
2456
2457    /// Subscribes to the order book stream for a unified symbol
2458    ///
2459    /// # Arguments
2460    /// * `symbol` - Unified trading pair identifier
2461    /// * `levels` - Optional depth limit (default 20)
2462    ///
2463    /// # Returns
2464    /// Result of the subscription call
2465    pub async fn subscribe_orderbook(&self, symbol: &str, levels: Option<u32>) -> Result<()> {
2466        let ws = self.create_ws();
2467        ws.connect().await?;
2468
2469        let binance_symbol = symbol.replace('/', "").to_lowercase();
2470        let depth_levels = levels.unwrap_or(20);
2471        ws.subscribe_orderbook(&binance_symbol, depth_levels, "1000ms")
2472            .await
2473    }
2474
2475    /// Subscribes to the candlestick stream for a unified symbol
2476    ///
2477    /// # Arguments
2478    /// * `symbol` - Unified trading pair identifier
2479    /// * `interval` - Candlestick interval identifier
2480    ///
2481    /// # Returns
2482    /// Result of the subscription call
2483    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
2484        let ws = self.create_ws();
2485        ws.connect().await?;
2486
2487        let binance_symbol = symbol.replace('/', "").to_lowercase();
2488        ws.subscribe_kline(&binance_symbol, interval).await
2489    }
2490
2491    /// Watches a ticker stream for a single unified symbol
2492    ///
2493    /// # Arguments
2494    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2495    /// * `params` - Optional parameters
2496    ///   - `name`: Channel name (ticker/miniTicker, defaults to ticker)
2497    ///
2498    /// # Returns
2499    /// Parsed ticker structure
2500    ///
2501    /// # Example
2502    /// ```rust,no_run
2503    /// # use ccxt_exchanges::binance::Binance;
2504    /// # use ccxt_core::ExchangeConfig;
2505    /// # use ccxt_core::types::Price;
2506    /// # async fn example() -> ccxt_core::error::Result<()> {
2507    /// let exchange = Binance::new(ExchangeConfig::default())?;
2508    /// let ticker = exchange.watch_ticker("BTC/USDT", None).await?;
2509    /// println!("Price: {}", ticker.last.unwrap_or(Price::ZERO));
2510    /// # Ok(())
2511    /// # }
2512    /// ```
2513    pub async fn watch_ticker(
2514        &self,
2515        symbol: &str,
2516        params: Option<HashMap<String, Value>>,
2517    ) -> Result<Ticker> {
2518        // Load market metadata
2519        self.load_markets(false).await?;
2520
2521        // Convert unified symbol to exchange format
2522        let market = self.base.market(symbol).await?;
2523        let binance_symbol = market.id.to_lowercase();
2524
2525        // Select channel name
2526        let channel_name = if let Some(p) = &params {
2527            p.get("name")
2528                .and_then(serde_json::Value::as_str)
2529                .unwrap_or("ticker")
2530        } else {
2531            "ticker"
2532        };
2533
2534        // Establish WebSocket connection
2535        let ws = self.create_ws();
2536        ws.connect().await?;
2537
2538        // Watch ticker
2539        ws.watch_ticker_internal(&binance_symbol, channel_name)
2540            .await
2541    }
2542
2543    /// Watches ticker streams for multiple unified symbols
2544    ///
2545    /// # Arguments
2546    /// * `symbols` - Optional list of unified trading pairs (None subscribes to all)
2547    /// * `params` - Optional parameters
2548    ///   - `name`: Channel name (ticker/miniTicker, defaults to ticker)
2549    ///
2550    /// # Returns
2551    /// Mapping of symbol to ticker data
2552    ///
2553    /// # Example
2554    /// ```rust,no_run
2555    /// # use ccxt_exchanges::binance::Binance;
2556    /// # use ccxt_core::ExchangeConfig;
2557    /// # async fn example() -> ccxt_core::error::Result<()> {
2558    /// let exchange = Binance::new(ExchangeConfig::default())?;
2559    ///
2560    /// // Watch a subset of symbols
2561    /// let tickers = exchange.watch_tickers(
2562    ///     Some(vec!["BTC/USDT".to_string(), "ETH/USDT".to_string()]),
2563    ///     None
2564    /// ).await?;
2565    ///
2566    /// // Watch all symbols
2567    /// let all_tickers = exchange.watch_tickers(None, None).await?;
2568    /// # Ok(())
2569    /// # }
2570    /// ```
2571    pub async fn watch_tickers(
2572        &self,
2573        symbols: Option<Vec<String>>,
2574        params: Option<HashMap<String, Value>>,
2575    ) -> Result<HashMap<String, Ticker>> {
2576        // Load market metadata
2577        self.load_markets(false).await?;
2578
2579        // Determine channel name
2580        let channel_name = if let Some(p) = &params {
2581            p.get("name")
2582                .and_then(serde_json::Value::as_str)
2583                .unwrap_or("ticker")
2584        } else {
2585            "ticker"
2586        };
2587
2588        // Validate channel selection
2589        if channel_name == "bookTicker" {
2590            return Err(Error::invalid_request(
2591                "To subscribe for bids-asks, use watch_bids_asks() method instead",
2592            ));
2593        }
2594
2595        // Convert unified symbols to exchange format
2596        let binance_symbols = if let Some(syms) = symbols {
2597            let mut result = Vec::new();
2598            for symbol in syms {
2599                let market = self.base.market(&symbol).await?;
2600                result.push(market.id.to_lowercase());
2601            }
2602            Some(result)
2603        } else {
2604            None
2605        };
2606
2607        // Establish WebSocket connection
2608        let ws = self.create_ws();
2609        ws.connect().await?;
2610
2611        // Watch tickers
2612        ws.watch_tickers_internal(binance_symbols, channel_name)
2613            .await
2614    }
2615
2616    /// Watches the mark price stream for a futures market
2617    ///
2618    /// # Arguments
2619    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT:USDT")
2620    /// * `params` - Optional parameters
2621    ///   - `use1sFreq`: Whether to use 1-second updates (defaults to true)
2622    ///
2623    /// # Returns
2624    /// Ticker structure representing the mark price
2625    ///
2626    /// # Example
2627    /// ```rust,no_run
2628    /// # use ccxt_exchanges::binance::Binance;
2629    /// # use ccxt_core::ExchangeConfig;
2630    /// # use std::collections::HashMap;
2631    /// # use serde_json::json;
2632    /// # async fn example() -> ccxt_core::error::Result<()> {
2633    /// let exchange = Binance::new(ExchangeConfig::default())?;
2634    ///
2635    /// // Use 1-second updates
2636    /// let ticker = exchange.watch_mark_price("BTC/USDT:USDT", None).await?;
2637    ///
2638    /// // Use 3-second updates
2639    /// let mut params = HashMap::new();
2640    /// params.insert("use1sFreq".to_string(), json!(false));
2641    /// let ticker = exchange.watch_mark_price("BTC/USDT:USDT", Some(params)).await?;
2642    /// # Ok(())
2643    /// # }
2644    /// ```
2645    pub async fn watch_mark_price(
2646        &self,
2647        symbol: &str,
2648        params: Option<HashMap<String, Value>>,
2649    ) -> Result<Ticker> {
2650        // Load market metadata
2651        self.load_markets(false).await?;
2652
2653        // Ensure the symbol belongs to a futures market
2654        let market = self.base.market(symbol).await?;
2655        if market.market_type != MarketType::Swap && market.market_type != MarketType::Futures {
2656            return Err(Error::invalid_request(format!(
2657                "watch_mark_price() does not support {} markets",
2658                market.market_type
2659            )));
2660        }
2661
2662        let binance_symbol = market.id.to_lowercase();
2663
2664        // Determine update frequency
2665        let use_1s_freq = if let Some(p) = &params {
2666            p.get("use1sFreq")
2667                .and_then(serde_json::Value::as_bool)
2668                .unwrap_or(true)
2669        } else {
2670            true
2671        };
2672
2673        // Construct channel name
2674        let channel_name = if use_1s_freq {
2675            "markPrice@1s"
2676        } else {
2677            "markPrice"
2678        };
2679
2680        // Establish WebSocket connection
2681        let ws = self.create_ws();
2682        ws.connect().await?;
2683
2684        // Watch mark price
2685        ws.watch_ticker_internal(&binance_symbol, channel_name)
2686            .await
2687    }
2688
2689    /// Watches an order book stream for a unified symbol
2690    ///
2691    /// # Arguments
2692    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2693    /// * `limit` - Optional depth limit (defaults to unlimited)
2694    /// * `params` - Optional parameters
2695    ///   - `speed`: Update frequency (100 or 1000 ms, defaults to 100)
2696    ///
2697    /// # Returns
2698    /// Order book snapshot populated with streaming updates
2699    ///
2700    /// # Example
2701    /// ```rust,no_run
2702    /// # use ccxt_exchanges::binance::Binance;
2703    /// # use ccxt_core::ExchangeConfig;
2704    /// # use std::collections::HashMap;
2705    /// # use serde_json::json;
2706    /// # async fn example() -> ccxt_core::error::Result<()> {
2707    /// let exchange = Binance::new(ExchangeConfig::default())?;
2708    ///
2709    /// // Watch order book with 100 ms updates
2710    /// let orderbook = exchange.watch_order_book("BTC/USDT", None, None).await?;
2711    /// println!("Best bid: {:?}", orderbook.best_bid());
2712    /// println!("Best ask: {:?}", orderbook.best_ask());
2713    ///
2714    /// // Watch order book limited to 100 levels with 1000 ms updates
2715    /// let mut params = HashMap::new();
2716    /// params.insert("speed".to_string(), json!(1000));
2717    /// let orderbook = exchange.watch_order_book(
2718    ///     "BTC/USDT",
2719    ///     Some(100),
2720    ///     Some(params)
2721    /// ).await?;
2722    /// # Ok(())
2723    /// # }
2724    /// ```
2725    pub async fn watch_order_book(
2726        &self,
2727        symbol: &str,
2728        limit: Option<i64>,
2729        params: Option<HashMap<String, Value>>,
2730    ) -> Result<OrderBook> {
2731        // Load market metadata
2732        self.load_markets(false).await?;
2733
2734        // Resolve market details
2735        let market = self.base.market(symbol).await?;
2736        let binance_symbol = market.id.to_lowercase();
2737
2738        // Determine whether this is a futures market
2739        let is_futures =
2740            market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2741
2742        // Determine update speed
2743        let update_speed = if let Some(p) = &params {
2744            p.get("speed")
2745                .and_then(serde_json::Value::as_i64)
2746                .unwrap_or(100) as i32
2747        } else {
2748            100
2749        };
2750
2751        // Validate update speed
2752        if update_speed != 100 && update_speed != 1000 {
2753            return Err(Error::invalid_request(
2754                "Update speed must be 100 or 1000 milliseconds",
2755            ));
2756        }
2757
2758        // Establish WebSocket connection
2759        let ws = self.create_ws();
2760        ws.connect().await?;
2761
2762        // Watch the order book
2763        ws.watch_orderbook_internal(self, &binance_symbol, limit, update_speed, is_futures)
2764            .await
2765    }
2766
2767    /// Watches order books for multiple symbols
2768    ///
2769    /// # Arguments
2770    /// * `symbols` - List of trading pairs (maximum 200)
2771    /// * `limit` - Optional depth limit
2772    /// * `params` - Optional parameters
2773    ///   - `speed`: Update frequency (100 or 1000 ms)
2774    ///
2775    /// # Returns
2776    /// Mapping of symbol to corresponding order book
2777    ///
2778    /// # Example
2779    /// ```rust,no_run
2780    /// # use ccxt_exchanges::binance::Binance;
2781    /// # use ccxt_core::ExchangeConfig;
2782    /// # async fn example() -> ccxt_core::error::Result<()> {
2783    /// let exchange = Binance::new(ExchangeConfig::default())?;
2784    ///
2785    /// let symbols = vec![
2786    ///     "BTC/USDT".to_string(),
2787    ///     "ETH/USDT".to_string(),
2788    /// ];
2789    ///
2790    /// let orderbooks = exchange.watch_order_books(symbols, None, None).await?;
2791    /// for (symbol, ob) in orderbooks {
2792    ///     println!("{}: spread = {:?}", symbol, ob.spread());
2793    /// }
2794    /// # Ok(())
2795    /// # }
2796    /// ```
2797    pub async fn watch_order_books(
2798        &self,
2799        symbols: Vec<String>,
2800        limit: Option<i64>,
2801        params: Option<HashMap<String, Value>>,
2802    ) -> Result<HashMap<String, OrderBook>> {
2803        // Enforce symbol count constraints
2804        if symbols.is_empty() {
2805            return Err(Error::invalid_request("Symbols list cannot be empty"));
2806        }
2807
2808        if symbols.len() > 200 {
2809            return Err(Error::invalid_request(
2810                "Binance supports max 200 symbols per connection",
2811            ));
2812        }
2813
2814        // Load market metadata
2815        self.load_markets(false).await?;
2816
2817        // Convert symbols to exchange format and ensure consistent market type
2818        let mut binance_symbols = Vec::new();
2819        let mut is_futures = false;
2820
2821        for symbol in &symbols {
2822            let market = self.base.market(symbol).await?;
2823            binance_symbols.push(market.id.to_lowercase());
2824
2825            let current_is_futures =
2826                market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2827            if !binance_symbols.is_empty() && current_is_futures != is_futures {
2828                return Err(Error::invalid_request(
2829                    "Cannot mix spot and futures markets in watch_order_books",
2830                ));
2831            }
2832            is_futures = current_is_futures;
2833        }
2834
2835        // Determine update speed
2836        let update_speed = if let Some(p) = &params {
2837            p.get("speed")
2838                .and_then(serde_json::Value::as_i64)
2839                .unwrap_or(100) as i32
2840        } else {
2841            100
2842        };
2843
2844        // Establish WebSocket connection
2845        let ws = self.create_ws();
2846        ws.connect().await?;
2847
2848        // Watch order books
2849        ws.watch_orderbooks_internal(self, binance_symbols, limit, update_speed, is_futures)
2850            .await
2851    }
2852
2853    /// Watches mark prices for multiple futures symbols
2854    ///
2855    /// # Arguments
2856    /// * `symbols` - Optional list of symbols (None subscribes to all)
2857    /// * `params` - Optional parameters
2858    ///   - `use1sFreq`: Whether to use 1-second updates (defaults to true)
2859    ///
2860    /// # Returns
2861    /// Mapping of symbol to ticker data
2862    pub async fn watch_mark_prices(
2863        &self,
2864        symbols: Option<Vec<String>>,
2865        params: Option<HashMap<String, Value>>,
2866    ) -> Result<HashMap<String, Ticker>> {
2867        // Load market metadata
2868        self.load_markets(false).await?;
2869
2870        // Determine update frequency
2871        let use_1s_freq = if let Some(p) = &params {
2872            p.get("use1sFreq")
2873                .and_then(serde_json::Value::as_bool)
2874                .unwrap_or(true)
2875        } else {
2876            true
2877        };
2878
2879        // Construct channel name
2880        let channel_name = if use_1s_freq {
2881            "markPrice@1s"
2882        } else {
2883            "markPrice"
2884        };
2885
2886        // Convert symbols and validate market type
2887        let binance_symbols = if let Some(syms) = symbols {
2888            let mut result = Vec::new();
2889            for symbol in syms {
2890                let market = self.base.market(&symbol).await?;
2891                if market.market_type != MarketType::Swap
2892                    && market.market_type != MarketType::Futures
2893                {
2894                    return Err(Error::invalid_request(format!(
2895                        "watch_mark_prices() does not support {} markets",
2896                        market.market_type
2897                    )));
2898                }
2899                result.push(market.id.to_lowercase());
2900            }
2901            Some(result)
2902        } else {
2903            None
2904        };
2905
2906        // Establish WebSocket connection
2907        let ws = self.create_ws();
2908        ws.connect().await?;
2909
2910        // Watch mark prices
2911        ws.watch_tickers_internal(binance_symbols, channel_name)
2912            .await
2913    }
2914    /// Streams trade data for a unified symbol
2915    ///
2916    /// # Arguments
2917    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2918    /// * `since` - Optional starting timestamp in milliseconds
2919    /// * `limit` - Optional maximum number of trades to return
2920    ///
2921    /// # Returns
2922    /// Vector of parsed trade data
2923    pub async fn watch_trades(
2924        &self,
2925        symbol: &str,
2926        since: Option<i64>,
2927        limit: Option<usize>,
2928    ) -> Result<Vec<Trade>> {
2929        const MAX_RETRIES: u32 = 50;
2930        const MAX_TRADES: usize = 1000;
2931
2932        // Ensure market metadata is loaded
2933        self.base.load_markets(false).await?;
2934
2935        // Resolve market information
2936        let market = self.base.market(symbol).await?;
2937        let binance_symbol = market.id.to_lowercase();
2938
2939        // Establish WebSocket connection
2940        let ws = self.create_ws();
2941        ws.connect().await?;
2942
2943        // Subscribe to the trade stream
2944        ws.subscribe_trades(&binance_symbol).await?;
2945
2946        // Process incoming messages
2947        let mut retries = 0;
2948
2949        while retries < MAX_RETRIES {
2950            if let Some(msg) = ws.client.receive().await {
2951                // Skip subscription acknowledgement messages
2952                if msg.get("result").is_some() || msg.get("id").is_some() {
2953                    continue;
2954                }
2955
2956                // Parse trade payload
2957                if let Ok(trade) = parser::parse_ws_trade(&msg, Some(&market)) {
2958                    // Cache trade payload
2959                    let mut trades_map = ws.trades.lock().await;
2960                    let trades = trades_map
2961                        .entry(symbol.to_string())
2962                        .or_insert_with(VecDeque::new);
2963
2964                    // Enforce cache size limit
2965                    if trades.len() >= MAX_TRADES {
2966                        trades.pop_front();
2967                    }
2968                    trades.push_back(trade);
2969
2970                    // Gather trades from cache
2971                    let mut result: Vec<Trade> = trades.iter().cloned().collect();
2972
2973                    // Apply optional `since` filter
2974                    if let Some(since_ts) = since {
2975                        result.retain(|t| t.timestamp >= since_ts);
2976                    }
2977
2978                    // Apply optional limit
2979                    if let Some(limit_size) = limit {
2980                        if result.len() > limit_size {
2981                            result = result.split_off(result.len() - limit_size);
2982                        }
2983                    }
2984
2985                    return Ok(result);
2986                }
2987            }
2988
2989            retries += 1;
2990            tokio::time::sleep(Duration::from_millis(100)).await;
2991        }
2992
2993        Err(Error::network("Timeout waiting for trade data"))
2994    }
2995
2996    /// Streams OHLCV data for a unified symbol
2997    ///
2998    /// # Arguments
2999    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
3000    /// * `timeframe` - Candlestick interval (e.g. "1m", "5m", "1h", "1d")
3001    /// * `since` - Optional starting timestamp in milliseconds
3002    /// * `limit` - Optional maximum number of entries to return
3003    ///
3004    /// # Returns
3005    /// Vector of OHLCV entries
3006    pub async fn watch_ohlcv(
3007        &self,
3008        symbol: &str,
3009        timeframe: &str,
3010        since: Option<i64>,
3011        limit: Option<usize>,
3012    ) -> Result<Vec<OHLCV>> {
3013        const MAX_RETRIES: u32 = 50;
3014        const MAX_OHLCVS: usize = 1000;
3015
3016        // Ensure market metadata is loaded
3017        self.base.load_markets(false).await?;
3018
3019        // Resolve market information
3020        let market = self.base.market(symbol).await?;
3021        let binance_symbol = market.id.to_lowercase();
3022
3023        // Establish WebSocket connection
3024        let ws = self.create_ws();
3025        ws.connect().await?;
3026
3027        // Subscribe to the Kline stream
3028        ws.subscribe_kline(&binance_symbol, timeframe).await?;
3029
3030        // Process incoming messages
3031        let mut retries = 0;
3032
3033        while retries < MAX_RETRIES {
3034            if let Some(msg) = ws.client.receive().await {
3035                // Skip subscription acknowledgement messages
3036                if msg.get("result").is_some() || msg.get("id").is_some() {
3037                    continue;
3038                }
3039
3040                // Parse OHLCV payload
3041                if let Ok(ohlcv) = parser::parse_ws_ohlcv(&msg) {
3042                    // Cache OHLCV entries
3043                    let cache_key = format!("{}:{}", symbol, timeframe);
3044                    let mut ohlcvs_map = ws.ohlcvs.lock().await;
3045                    let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
3046
3047                    // Enforce cache size limit
3048                    if ohlcvs.len() >= MAX_OHLCVS {
3049                        ohlcvs.pop_front();
3050                    }
3051                    ohlcvs.push_back(ohlcv);
3052
3053                    // Collect results from cache
3054                    let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
3055
3056                    // Apply optional `since` filter
3057                    if let Some(since_ts) = since {
3058                        result.retain(|o| o.timestamp >= since_ts);
3059                    }
3060
3061                    // Apply optional limit
3062                    if let Some(limit_size) = limit {
3063                        if result.len() > limit_size {
3064                            result = result.split_off(result.len() - limit_size);
3065                        }
3066                    }
3067
3068                    return Ok(result);
3069                }
3070            }
3071
3072            retries += 1;
3073            tokio::time::sleep(Duration::from_millis(100)).await;
3074        }
3075
3076        Err(Error::network("Timeout waiting for OHLCV data"))
3077    }
3078
3079    /// Streams the best bid/ask data for a unified symbol
3080    ///
3081    /// # Arguments
3082    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
3083    ///
3084    /// # Returns
3085    /// Latest bid/ask snapshot
3086    pub async fn watch_bids_asks(&self, symbol: &str) -> Result<BidAsk> {
3087        const MAX_RETRIES: u32 = 50;
3088
3089        // Ensure market metadata is loaded
3090        self.base.load_markets(false).await?;
3091
3092        // Resolve market details
3093        let market = self.base.market(symbol).await?;
3094        let binance_symbol = market.id.to_lowercase();
3095
3096        // Establish WebSocket connection
3097        let ws = self.create_ws();
3098        ws.connect().await?;
3099
3100        // Subscribe to the bookTicker stream
3101        let stream_name = format!("{}@bookTicker", binance_symbol);
3102        ws.client
3103            .subscribe(stream_name, Some(symbol.to_string()), None)
3104            .await?;
3105
3106        // Process incoming messages
3107        let mut retries = 0;
3108
3109        while retries < MAX_RETRIES {
3110            if let Some(msg) = ws.client.receive().await {
3111                // Skip subscription acknowledgement messages
3112                if msg.get("result").is_some() || msg.get("id").is_some() {
3113                    continue;
3114                }
3115
3116                // Parse bid/ask payload
3117                if let Ok(bid_ask) = parser::parse_ws_bid_ask(&msg) {
3118                    // Cache the snapshot
3119                    let mut bids_asks_map = ws.bids_asks.lock().await;
3120                    bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
3121
3122                    return Ok(bid_ask);
3123                }
3124            }
3125
3126            retries += 1;
3127            tokio::time::sleep(Duration::from_millis(100)).await;
3128        }
3129
3130        Err(Error::network("Timeout waiting for BidAsk data"))
3131    }
3132
3133    /// Streams account balance changes (private user data stream)
3134    ///
3135    /// # Arguments
3136    /// * `params` - Optional parameters
3137    ///   - `type`: Account type (spot/future/delivery/margin, etc.)
3138    ///   - `fetchBalanceSnapshot`: Whether to fetch an initial snapshot (default false)
3139    ///   - `awaitBalanceSnapshot`: Whether to wait for snapshot completion (default true)
3140    ///
3141    /// # Returns
3142    /// Updated account balances
3143    ///
3144    /// # Example
3145    ///
3146    /// ```rust,no_run
3147    /// # use ccxt_exchanges::binance::Binance;
3148    /// # use ccxt_core::ExchangeConfig;
3149    /// # use std::collections::HashMap;
3150    /// # use std::sync::Arc;
3151    /// # use serde_json::json;
3152    /// # async fn example() -> ccxt_core::error::Result<()> {
3153    /// let mut config = ExchangeConfig::default();
3154    /// config.api_key = Some("your-api-key".to_string());
3155    /// config.secret = Some("your-secret".to_string());
3156    /// let exchange = Arc::new(Binance::new(config)?);
3157    ///
3158    /// // Watch spot account balance
3159    /// let balance = exchange.clone().watch_balance(None).await?;
3160    ///
3161    /// // Watch futures account balance
3162    /// let mut params = HashMap::new();
3163    /// params.insert("type".to_string(), json!("future"));
3164    /// let futures_balance = exchange.clone().watch_balance(Some(params)).await?;
3165    /// # Ok(())
3166    /// # }
3167    /// ```
3168    pub async fn watch_balance(
3169        self: Arc<Self>,
3170        params: Option<HashMap<String, Value>>,
3171    ) -> Result<Balance> {
3172        const MAX_RETRIES: u32 = 100;
3173
3174        // Ensure market metadata is loaded
3175        self.base.load_markets(false).await?;
3176
3177        // Resolve account type
3178        let account_type = if let Some(p) = &params {
3179            p.get("type")
3180                .and_then(serde_json::Value::as_str)
3181                .unwrap_or_else(|| self.options.default_type.as_str())
3182        } else {
3183            self.options.default_type.as_str()
3184        };
3185
3186        // Determine configuration flags
3187        let fetch_snapshot = if let Some(p) = &params {
3188            p.get("fetchBalanceSnapshot")
3189                .and_then(serde_json::Value::as_bool)
3190                .unwrap_or(false)
3191        } else {
3192            false
3193        };
3194
3195        let await_snapshot = if let Some(p) = &params {
3196            p.get("awaitBalanceSnapshot")
3197                .and_then(serde_json::Value::as_bool)
3198                .unwrap_or(true)
3199        } else {
3200            true
3201        };
3202
3203        // Establish authenticated WebSocket connection
3204        let ws = self.create_authenticated_ws();
3205        ws.connect().await?;
3206
3207        // Optionally fetch the initial snapshot
3208        if fetch_snapshot {
3209            // Convert string account type to AccountType enum
3210            let account_type_enum = account_type.parse::<ccxt_core::types::AccountType>().ok();
3211            let snapshot = self.fetch_balance(account_type_enum).await?;
3212
3213            // Update cache with the snapshot
3214            let mut balances = ws.balances.write().await;
3215            balances.insert(account_type.to_string(), snapshot.clone());
3216
3217            if !await_snapshot {
3218                return Ok(snapshot);
3219            }
3220        }
3221
3222        // Process balance update messages
3223        let mut retries = 0;
3224
3225        while retries < MAX_RETRIES {
3226            if let Some(msg) = ws.client.receive().await {
3227                // Skip subscription acknowledgement messages
3228                if msg.get("result").is_some() || msg.get("id").is_some() {
3229                    continue;
3230                }
3231
3232                // Determine message event type
3233                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3234                    // Handle supported balance message types
3235                    if matches!(
3236                        event_type,
3237                        "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE"
3238                    ) {
3239                        // Parse and update local balance cache
3240                        if let Ok(()) = ws.handle_balance_message(&msg, account_type).await {
3241                            // Retrieve the updated balance snapshot
3242                            let balances = ws.balances.read().await;
3243                            if let Some(balance) = balances.get(account_type) {
3244                                return Ok(balance.clone());
3245                            }
3246                        }
3247                    }
3248                }
3249            }
3250
3251            retries += 1;
3252            tokio::time::sleep(Duration::from_millis(100)).await;
3253        }
3254
3255        Err(Error::network("Timeout waiting for balance data"))
3256    }
3257
3258    /// Watches authenticated order updates via the user data stream
3259    ///
3260    /// Streams real-time order status changes delivered by Binance user data WebSocket messages
3261    ///
3262    /// # Arguments
3263    /// * `symbol` - Optional trading pair filter (e.g. "BTC/USDT")
3264    /// * `since` - Optional starting timestamp in milliseconds
3265    /// * `limit` - Optional maximum number of orders to return
3266    /// * `params` - Optional additional parameters
3267    ///
3268    /// # Returns
3269    /// Orders returned in descending chronological order
3270    ///
3271    /// # Examples
3272    /// ```no_run
3273    /// use std::sync::Arc;
3274    /// use ccxt_exchanges::binance::Binance;
3275    ///
3276    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
3277    ///     let exchange = Arc::new(Binance::new(Default::default())?);
3278    ///
3279    ///     // Watch all order updates
3280    ///     let orders = exchange.clone().watch_orders(None, None, None, None).await?;
3281    ///     println!("Received {} order updates", orders.len());
3282    ///
3283    ///     // Watch updates for a specific trading pair
3284    ///     let btc_orders = exchange.clone().watch_orders(Some("BTC/USDT"), None, None, None).await?;
3285    ///     println!("BTC/USDT orders: {:?}", btc_orders);
3286    ///
3287    ///     Ok(())
3288    /// }
3289    /// ```
3290    pub async fn watch_orders(
3291        self: Arc<Self>,
3292        symbol: Option<&str>,
3293        since: Option<i64>,
3294        limit: Option<usize>,
3295        _params: Option<HashMap<String, Value>>,
3296    ) -> Result<Vec<Order>> {
3297        self.base.load_markets(false).await?;
3298
3299        let ws = self.create_authenticated_ws();
3300        ws.connect().await?;
3301
3302        // Receive messages in a loop
3303        loop {
3304            if let Some(msg) = ws.client.receive().await {
3305                if let Value::Object(data) = msg {
3306                    if let Some(event_type) = data.get("e").and_then(serde_json::Value::as_str) {
3307                        if event_type == "executionReport" {
3308                            // Parse order payload
3309                            let order = Binance::parse_ws_order(&data);
3310
3311                            // Update order cache
3312                            let mut orders = ws.orders.write().await;
3313                            let symbol_orders = orders
3314                                .entry(order.symbol.clone())
3315                                .or_insert_with(HashMap::new);
3316                            symbol_orders.insert(order.id.clone(), order.clone());
3317                            drop(orders);
3318
3319                            // Check for trade execution events
3320                            if let Some(exec_type) =
3321                                data.get("x").and_then(serde_json::Value::as_str)
3322                            {
3323                                if exec_type == "TRADE" {
3324                                    // Parse execution trade payload
3325                                    if let Ok(trade) =
3326                                        BinanceWs::parse_ws_trade(&Value::Object(data.clone()))
3327                                    {
3328                                        // Update my trades cache
3329                                        let mut trades = ws.my_trades.write().await;
3330                                        let symbol_trades = trades
3331                                            .entry(trade.symbol.clone())
3332                                            .or_insert_with(VecDeque::new);
3333
3334                                        // Prepend newest trade and enforce max length of 1000
3335                                        symbol_trades.push_front(trade);
3336                                        if symbol_trades.len() > 1000 {
3337                                            symbol_trades.pop_back();
3338                                        }
3339                                    }
3340                                }
3341                            }
3342
3343                            // Return filtered orders
3344                            return self.filter_orders(&ws, symbol, since, limit).await;
3345                        }
3346                    }
3347                }
3348            } else {
3349                tokio::time::sleep(Duration::from_millis(100)).await;
3350            }
3351        }
3352    }
3353
3354    /// Parses a WebSocket order message
3355    fn parse_ws_order(data: &serde_json::Map<String, Value>) -> Order {
3356        use ccxt_core::types::{OrderSide, OrderStatus, OrderType};
3357        use rust_decimal::Decimal;
3358        use std::str::FromStr;
3359
3360        // Extract core fields
3361        let symbol = data
3362            .get("s")
3363            .and_then(serde_json::Value::as_str)
3364            .unwrap_or("");
3365        let order_id = data
3366            .get("i")
3367            .and_then(serde_json::Value::as_i64)
3368            .map(|id| id.to_string())
3369            .unwrap_or_default();
3370        let client_order_id = data
3371            .get("c")
3372            .and_then(serde_json::Value::as_str)
3373            .map(String::from);
3374
3375        // Map order status
3376        let status_str = data
3377            .get("X")
3378            .and_then(serde_json::Value::as_str)
3379            .unwrap_or("NEW");
3380        let status = match status_str {
3381            "FILLED" => OrderStatus::Closed,
3382            "CANCELED" => OrderStatus::Cancelled,
3383            "REJECTED" => OrderStatus::Rejected,
3384            "EXPIRED" => OrderStatus::Expired,
3385            _ => OrderStatus::Open,
3386        };
3387
3388        // Map order side
3389        let side_str = data
3390            .get("S")
3391            .and_then(serde_json::Value::as_str)
3392            .unwrap_or("BUY");
3393        let side = match side_str {
3394            "SELL" => OrderSide::Sell,
3395            _ => OrderSide::Buy,
3396        };
3397
3398        // Map order type
3399        let type_str = data
3400            .get("o")
3401            .and_then(serde_json::Value::as_str)
3402            .unwrap_or("LIMIT");
3403        let order_type = match type_str {
3404            "MARKET" => OrderType::Market,
3405            "STOP_LOSS" => OrderType::StopLoss,
3406            "STOP_LOSS_LIMIT" => OrderType::StopLossLimit,
3407            "TAKE_PROFIT" => OrderType::TakeProfit,
3408            "TAKE_PROFIT_LIMIT" => OrderType::TakeProfitLimit,
3409            "LIMIT_MAKER" => OrderType::LimitMaker,
3410            _ => OrderType::Limit,
3411        };
3412
3413        // Parse amount and price fields
3414        let amount = data
3415            .get("q")
3416            .and_then(serde_json::Value::as_str)
3417            .and_then(|s| Decimal::from_str(s).ok())
3418            .unwrap_or(Decimal::ZERO);
3419
3420        let price = data
3421            .get("p")
3422            .and_then(serde_json::Value::as_str)
3423            .and_then(|s| Decimal::from_str(s).ok());
3424
3425        let filled = data
3426            .get("z")
3427            .and_then(serde_json::Value::as_str)
3428            .and_then(|s| Decimal::from_str(s).ok());
3429
3430        let cost = data
3431            .get("Z")
3432            .and_then(serde_json::Value::as_str)
3433            .and_then(|s| Decimal::from_str(s).ok());
3434
3435        // Derive remaining quantity
3436        let remaining = filled.map(|fill| amount - fill);
3437
3438        // Compute average price
3439        let average = match (filled, cost) {
3440            (Some(fill), Some(c)) if fill > Decimal::ZERO && c > Decimal::ZERO => Some(c / fill),
3441            _ => None,
3442        };
3443
3444        // Parse timestamps
3445        let timestamp = data.get("T").and_then(serde_json::Value::as_i64);
3446        let last_trade_timestamp = data.get("T").and_then(serde_json::Value::as_i64);
3447
3448        Order {
3449            id: order_id,
3450            client_order_id,
3451            timestamp,
3452            datetime: timestamp.map(|ts| {
3453                chrono::DateTime::from_timestamp_millis(ts)
3454                    .map(|dt| dt.to_rfc3339())
3455                    .unwrap_or_default()
3456            }),
3457            last_trade_timestamp,
3458            symbol: symbol.to_string(),
3459            order_type,
3460            side,
3461            price,
3462            average,
3463            amount,
3464            cost,
3465            filled,
3466            remaining,
3467            status,
3468            fee: None,
3469            fees: None,
3470            trades: None,
3471            time_in_force: data
3472                .get("f")
3473                .and_then(serde_json::Value::as_str)
3474                .map(String::from),
3475            post_only: None,
3476            reduce_only: None,
3477            stop_price: data
3478                .get("P")
3479                .and_then(serde_json::Value::as_str)
3480                .and_then(|s| Decimal::from_str(s).ok()),
3481            trigger_price: None,
3482            take_profit_price: None,
3483            stop_loss_price: None,
3484            trailing_delta: None,
3485            trailing_percent: None,
3486            activation_price: None,
3487            callback_rate: None,
3488            working_type: data
3489                .get("wt")
3490                .and_then(serde_json::Value::as_str)
3491                .map(String::from),
3492            info: data.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
3493        }
3494    }
3495
3496    /// Filters cached orders by symbol, time range, and limit
3497    async fn filter_orders(
3498        &self,
3499        ws: &BinanceWs,
3500        symbol: Option<&str>,
3501        since: Option<i64>,
3502        limit: Option<usize>,
3503    ) -> Result<Vec<Order>> {
3504        let orders_map = ws.orders.read().await;
3505
3506        // Filter by symbol when provided
3507        let mut orders: Vec<Order> = if let Some(sym) = symbol {
3508            orders_map
3509                .get(sym)
3510                .map(|symbol_orders| symbol_orders.values().cloned().collect())
3511                .unwrap_or_default()
3512        } else {
3513            orders_map
3514                .values()
3515                .flat_map(|symbol_orders| symbol_orders.values().cloned())
3516                .collect()
3517        };
3518
3519        // Apply optional `since` filter
3520        if let Some(since_ts) = since {
3521            orders.retain(|order| order.timestamp.is_some_and(|ts| ts >= since_ts));
3522        }
3523
3524        // Sort by timestamp descending
3525        orders.sort_by(|a, b| {
3526            let ts_a = a.timestamp.unwrap_or(0);
3527            let ts_b = b.timestamp.unwrap_or(0);
3528            ts_b.cmp(&ts_a)
3529        });
3530
3531        // Apply optional limit
3532        if let Some(lim) = limit {
3533            orders.truncate(lim);
3534        }
3535
3536        Ok(orders)
3537    }
3538
3539    /// Watches authenticated user trade updates
3540    ///
3541    /// # Arguments
3542    /// * `symbol` - Optional trading pair to filter (None subscribes to all)
3543    /// * `since` - Starting timestamp in milliseconds
3544    /// * `limit` - Maximum number of trades to return
3545    /// * `params` - Additional parameters
3546    ///
3547    /// # Returns
3548    /// List of trade records
3549    ///
3550    /// # Example
3551    /// ```rust,no_run
3552    /// use std::sync::Arc;
3553    /// use ccxt_exchanges::binance::Binance;
3554    /// use ccxt_core::ExchangeConfig;
3555    ///
3556    /// #[tokio::main]
3557    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
3558    ///     let mut config = ExchangeConfig::default();
3559    ///     config.api_key = Some("your-api-key".to_string());
3560    ///     config.secret = Some("your-secret".to_string());
3561    ///     let exchange = Arc::new(Binance::new(config)?);
3562    ///
3563    ///     // Subscribe to BTC/USDT trade updates
3564    ///     let trades = exchange.clone().watch_my_trades(Some("BTC/USDT"), None, None, None).await?;
3565    ///     println!("My trades: {:?}", trades);
3566    ///     Ok(())
3567    /// }
3568    /// ```
3569    pub async fn watch_my_trades(
3570        self: Arc<Self>,
3571        symbol: Option<&str>,
3572        since: Option<i64>,
3573        limit: Option<usize>,
3574        _params: Option<HashMap<String, Value>>,
3575    ) -> Result<Vec<Trade>> {
3576        const MAX_RETRIES: u32 = 100;
3577
3578        // Establish authenticated WebSocket connection
3579        let ws = self.create_authenticated_ws();
3580        ws.connect().await?;
3581
3582        // Process trade update messages
3583        let mut retries = 0;
3584
3585        while retries < MAX_RETRIES {
3586            if let Some(msg) = ws.client.receive().await {
3587                // Skip subscription acknowledgements
3588                if msg.get("result").is_some() || msg.get("id").is_some() {
3589                    continue;
3590                }
3591
3592                // Identify event type
3593                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3594                    // Handle executionReport events containing trade updates
3595                    if event_type == "executionReport" {
3596                        if let Ok(trade) = BinanceWs::parse_ws_trade(&msg) {
3597                            let symbol_key = trade.symbol.clone();
3598
3599                            // Update cached trades
3600                            let mut trades_map = ws.my_trades.write().await;
3601                            let symbol_trades =
3602                                trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
3603
3604                            // Prepend latest trade; bound cache to 1000 entries
3605                            symbol_trades.push_front(trade);
3606                            if symbol_trades.len() > 1000 {
3607                                symbol_trades.pop_back();
3608                            }
3609                        }
3610                    }
3611                }
3612            } else {
3613                tokio::time::sleep(Duration::from_millis(100)).await;
3614            }
3615
3616            retries += 1;
3617        }
3618
3619        // Filter and return personal trades
3620        ws.filter_my_trades(symbol, since, limit).await
3621    }
3622
3623    /// Watches authenticated futures position updates
3624    ///
3625    /// Receives ACCOUNT_UPDATE events via the user data stream to track changes to futures
3626    /// Supports both USD-margined (USD-M) and coin-margined (COIN-M) contracts.
3627    ///
3628    /// # Arguments
3629    /// * `symbols` - Optional list of symbols (None subscribes to all positions)
3630    /// * `since` - Optional starting timestamp
3631    /// * `limit` - Optional maximum number of positions to return
3632    /// * `params` - Optional parameters
3633    ///   - `type`: Market type (`future`/`delivery`, default `future`)
3634    ///   - `subType`: Subtype (`linear`/`inverse`)
3635    ///
3636    /// # Returns
3637    /// Collection of positions
3638    ///
3639    /// # Implementation Details
3640    /// 1. Subscribe to ACCOUNT_UPDATE events through the user data stream.
3641    /// 2. Parse the position data contained in the `P` array.
3642    /// 3. Update the internal position cache.
3643    /// 4. Filter results according to the provided arguments.
3644    ///
3645    /// # WebSocket Message Format
3646    /// ```json
3647    /// {
3648    ///   "e": "ACCOUNT_UPDATE",
3649    ///   "T": 1667881353112,
3650    ///   "E": 1667881353115,
3651    ///   "a": {
3652    ///     "P": [
3653    ///       {
3654    ///         "s": "BTCUSDT",
3655    ///         "pa": "-0.089",
3656    ///         "ep": "19700.03933",
3657    ///         "up": "1.53058860",
3658    ///         "mt": "isolated",
3659    ///         "ps": "BOTH"
3660    ///       }
3661    ///     ]
3662    ///   }
3663    /// }
3664    /// ```
3665    ///
3666    /// # Example
3667    /// ```rust,no_run
3668    /// # use ccxt_exchanges::binance::Binance;
3669    /// # use ccxt_core::ExchangeConfig;
3670    /// # use std::sync::Arc;
3671    /// # async fn example() -> ccxt_core::error::Result<()> {
3672    /// let exchange = Arc::new(Binance::new(ExchangeConfig::default())?);
3673    ///
3674    /// // Watch all positions
3675    /// let positions = exchange.clone().watch_positions(None, None, None, None).await?;
3676    /// for pos in positions {
3677    ///     println!("Symbol: {}, Side: {:?}, Contracts: {:?}",
3678    ///              pos.symbol, pos.side, pos.contracts);
3679    /// }
3680    ///
3681    /// // Watch a subset of symbols
3682    /// let symbols = vec!["BTC/USDT".to_string(), "ETH/USDT".to_string()];
3683    /// let positions = exchange.clone().watch_positions(Some(symbols), None, Some(10), None).await?;
3684    /// # Ok(())
3685    /// # }
3686    /// ```
3687    pub async fn watch_positions(
3688        self: Arc<Self>,
3689        symbols: Option<Vec<String>>,
3690        since: Option<i64>,
3691        limit: Option<usize>,
3692        _params: Option<HashMap<String, Value>>,
3693    ) -> Result<Vec<Position>> {
3694        const MAX_RETRIES: u32 = 100;
3695
3696        // Establish authenticated WebSocket connection
3697        let ws = self.create_authenticated_ws();
3698        ws.connect().await?;
3699
3700        // Process position update messages
3701        let mut retries = 0;
3702
3703        while retries < MAX_RETRIES {
3704            if let Some(msg) = ws.client.receive().await {
3705                // Skip subscription acknowledgement messages
3706                if msg.get("result").is_some() || msg.get("id").is_some() {
3707                    continue;
3708                }
3709
3710                // Handle ACCOUNT_UPDATE events only
3711                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3712                    if event_type == "ACCOUNT_UPDATE" {
3713                        if let Some(account_data) = msg.get("a") {
3714                            if let Some(positions_array) =
3715                                account_data.get("P").and_then(|p| p.as_array())
3716                            {
3717                                for position_data in positions_array {
3718                                    if let Ok(position) =
3719                                        BinanceWs::parse_ws_position(position_data)
3720                                    {
3721                                        let symbol_key = position.symbol.clone();
3722                                        let side_key = position
3723                                            .side
3724                                            .clone()
3725                                            .unwrap_or_else(|| "both".to_string());
3726
3727                                        // Update cached positions
3728                                        let mut positions_map = ws.positions.write().await;
3729                                        let symbol_positions = positions_map
3730                                            .entry(symbol_key)
3731                                            .or_insert_with(HashMap::new);
3732
3733                                        // Remove positions with effectively zero contracts
3734                                        if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
3735                                            symbol_positions.remove(&side_key);
3736                                            if symbol_positions.is_empty() {
3737                                                positions_map.remove(&position.symbol);
3738                                            }
3739                                        } else {
3740                                            symbol_positions.insert(side_key, position);
3741                                        }
3742                                    }
3743                                }
3744                            }
3745                        }
3746                    }
3747                }
3748            } else {
3749                tokio::time::sleep(Duration::from_millis(100)).await;
3750            }
3751
3752            retries += 1;
3753        }
3754
3755        // Filter and return positions
3756        let symbols_ref = symbols.as_deref();
3757        ws.filter_positions(symbols_ref, since, limit).await
3758    }
3759}
3760
3761#[cfg(test)]
3762mod tests {
3763    use super::*;
3764
3765    #[test]
3766    fn test_binance_ws_creation() {
3767        let ws = BinanceWs::new(WS_BASE_URL.to_string());
3768        // Basic creation test: ensure the listen key lock is accessible
3769        assert!(ws.listen_key.try_read().is_ok());
3770    }
3771
3772    #[test]
3773    fn test_stream_format() {
3774        let symbol = "btcusdt";
3775
3776        // Ticker stream format
3777        let ticker_stream = format!("{}@ticker", symbol);
3778        assert_eq!(ticker_stream, "btcusdt@ticker");
3779
3780        // Trade stream format
3781        let trade_stream = format!("{}@trade", symbol);
3782        assert_eq!(trade_stream, "btcusdt@trade");
3783
3784        // Depth stream format
3785        let depth_stream = format!("{}@depth20", symbol);
3786        assert_eq!(depth_stream, "btcusdt@depth20");
3787
3788        // Kline stream format
3789        let kline_stream = format!("{}@kline_1m", symbol);
3790        assert_eq!(kline_stream, "btcusdt@kline_1m");
3791    }
3792
3793    #[tokio::test]
3794    async fn test_subscription_manager_basic() {
3795        let manager = SubscriptionManager::new();
3796        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
3797
3798        // Validate initial state
3799        assert_eq!(manager.active_count(), 0);
3800        assert!(!manager.has_subscription("btcusdt@ticker").await);
3801
3802        // Add a subscription
3803        manager
3804            .add_subscription(
3805                "btcusdt@ticker".to_string(),
3806                "BTCUSDT".to_string(),
3807                SubscriptionType::Ticker,
3808                tx.clone(),
3809            )
3810            .await
3811            .unwrap();
3812
3813        assert_eq!(manager.active_count(), 1);
3814        assert!(manager.has_subscription("btcusdt@ticker").await);
3815
3816        // Retrieve subscription
3817        let sub = manager.get_subscription("btcusdt@ticker").await;
3818        assert!(sub.is_some());
3819        let sub = sub.unwrap();
3820        assert_eq!(sub.stream, "btcusdt@ticker");
3821        assert_eq!(sub.symbol, "BTCUSDT");
3822        assert_eq!(sub.sub_type, SubscriptionType::Ticker);
3823
3824        // Remove subscription
3825        manager.remove_subscription("btcusdt@ticker").await.unwrap();
3826        assert_eq!(manager.active_count(), 0);
3827        assert!(!manager.has_subscription("btcusdt@ticker").await);
3828    }
3829
3830    #[tokio::test]
3831    async fn test_subscription_manager_multiple() {
3832        let manager = SubscriptionManager::new();
3833        let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
3834        let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
3835        let (tx3, _rx3) = tokio::sync::mpsc::unbounded_channel();
3836
3837        // Add multiple subscriptions
3838        manager
3839            .add_subscription(
3840                "btcusdt@ticker".to_string(),
3841                "BTCUSDT".to_string(),
3842                SubscriptionType::Ticker,
3843                tx1,
3844            )
3845            .await
3846            .unwrap();
3847
3848        manager
3849            .add_subscription(
3850                "btcusdt@depth".to_string(),
3851                "BTCUSDT".to_string(),
3852                SubscriptionType::OrderBook,
3853                tx2,
3854            )
3855            .await
3856            .unwrap();
3857
3858        manager
3859            .add_subscription(
3860                "ethusdt@ticker".to_string(),
3861                "ETHUSDT".to_string(),
3862                SubscriptionType::Ticker,
3863                tx3,
3864            )
3865            .await
3866            .unwrap();
3867
3868        assert_eq!(manager.active_count(), 3);
3869
3870        // Query by symbol
3871        let btc_subs = manager.get_subscriptions_by_symbol("BTCUSDT").await;
3872        assert_eq!(btc_subs.len(), 2);
3873
3874        let eth_subs = manager.get_subscriptions_by_symbol("ETHUSDT").await;
3875        assert_eq!(eth_subs.len(), 1);
3876
3877        // Retrieve all subscriptions
3878        let all_subs = manager.get_all_subscriptions().await;
3879        assert_eq!(all_subs.len(), 3);
3880
3881        // Clear all subscriptions
3882        manager.clear().await;
3883        assert_eq!(manager.active_count(), 0);
3884    }
3885
3886    #[tokio::test]
3887    async fn test_subscription_type_from_stream() {
3888        // Ticker stream
3889        let sub_type = SubscriptionType::from_stream("btcusdt@ticker");
3890        assert_eq!(sub_type, Some(SubscriptionType::Ticker));
3891
3892        // Order book streams
3893        let sub_type = SubscriptionType::from_stream("btcusdt@depth");
3894        assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3895
3896        let sub_type = SubscriptionType::from_stream("btcusdt@depth@100ms");
3897        assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3898
3899        // Trade streams
3900        let sub_type = SubscriptionType::from_stream("btcusdt@trade");
3901        assert_eq!(sub_type, Some(SubscriptionType::Trades));
3902
3903        let sub_type = SubscriptionType::from_stream("btcusdt@aggTrade");
3904        assert_eq!(sub_type, Some(SubscriptionType::Trades));
3905
3906        // Kline streams
3907        let sub_type = SubscriptionType::from_stream("btcusdt@kline_1m");
3908        assert_eq!(sub_type, Some(SubscriptionType::Kline("1m".to_string())));
3909
3910        let sub_type = SubscriptionType::from_stream("btcusdt@kline_1h");
3911        assert_eq!(sub_type, Some(SubscriptionType::Kline("1h".to_string())));
3912
3913        // Mark price stream
3914        let sub_type = SubscriptionType::from_stream("btcusdt@markPrice");
3915        assert_eq!(sub_type, Some(SubscriptionType::MarkPrice));
3916
3917        // Book ticker stream
3918        let sub_type = SubscriptionType::from_stream("btcusdt@bookTicker");
3919        assert_eq!(sub_type, Some(SubscriptionType::BookTicker));
3920
3921        // Unknown stream
3922        let sub_type = SubscriptionType::from_stream("btcusdt@unknown");
3923        assert_eq!(sub_type, None);
3924    }
3925
3926    #[tokio::test]
3927    async fn test_subscription_send_message() {
3928        let manager = SubscriptionManager::new();
3929        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
3930
3931        // Add subscription
3932        manager
3933            .add_subscription(
3934                "btcusdt@ticker".to_string(),
3935                "BTCUSDT".to_string(),
3936                SubscriptionType::Ticker,
3937                tx,
3938            )
3939            .await
3940            .unwrap();
3941
3942        // Send message to stream
3943        let test_msg = serde_json::json!({
3944            "e": "24hrTicker",
3945            "s": "BTCUSDT",
3946            "c": "50000"
3947        });
3948
3949        let sent = manager
3950            .send_to_stream("btcusdt@ticker", test_msg.clone())
3951            .await;
3952        assert!(sent);
3953
3954        // Receive message
3955        let received = rx.recv().await;
3956        assert!(received.is_some());
3957        assert_eq!(received.unwrap(), test_msg);
3958    }
3959
3960    #[tokio::test]
3961    async fn test_subscription_send_to_symbol() {
3962        let manager = SubscriptionManager::new();
3963        let (tx1, mut rx1) = tokio::sync::mpsc::unbounded_channel();
3964        let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel();
3965
3966        // Add two subscriptions for the same symbol
3967        manager
3968            .add_subscription(
3969                "btcusdt@ticker".to_string(),
3970                "BTCUSDT".to_string(),
3971                SubscriptionType::Ticker,
3972                tx1,
3973            )
3974            .await
3975            .unwrap();
3976
3977        manager
3978            .add_subscription(
3979                "btcusdt@depth".to_string(),
3980                "BTCUSDT".to_string(),
3981                SubscriptionType::OrderBook,
3982                tx2,
3983            )
3984            .await
3985            .unwrap();
3986
3987        // Send a message to all subscriptions for the symbol
3988        let test_msg = serde_json::json!({
3989            "s": "BTCUSDT",
3990            "data": "test"
3991        });
3992
3993        let sent_count = manager.send_to_symbol("BTCUSDT", &test_msg).await;
3994        assert_eq!(sent_count, 2);
3995
3996        // Receive messages
3997        let received1 = rx1.recv().await;
3998        assert!(received1.is_some());
3999        assert_eq!(received1.unwrap(), test_msg);
4000
4001        let received2 = rx2.recv().await;
4002        assert!(received2.is_some());
4003        assert_eq!(received2.unwrap(), test_msg);
4004    }
4005
4006    #[test]
4007    fn test_symbol_conversion() {
4008        let symbol = "BTC/USDT";
4009        let binance_symbol = symbol.replace('/', "").to_lowercase();
4010        assert_eq!(binance_symbol, "btcusdt");
4011    }
4012
4013    // ==================== MessageRouter tests ====================
4014
4015    #[test]
4016    fn test_reconnect_config_default() {
4017        let config = ReconnectConfig::default();
4018
4019        assert!(config.enabled);
4020        assert_eq!(config.initial_delay_ms, 1000);
4021        assert_eq!(config.max_delay_ms, 30000);
4022        assert_eq!(config.backoff_multiplier, 2.0);
4023        assert_eq!(config.max_attempts, 0); // Unlimited retries
4024    }
4025
4026    #[test]
4027    fn test_reconnect_config_calculate_delay() {
4028        let config = ReconnectConfig::default();
4029
4030        // Exponential backoff tests
4031        assert_eq!(config.calculate_delay(0), 1000); // 1s
4032        assert_eq!(config.calculate_delay(1), 2000); // 2s
4033        assert_eq!(config.calculate_delay(2), 4000); // 4s
4034        assert_eq!(config.calculate_delay(3), 8000); // 8s
4035        assert_eq!(config.calculate_delay(4), 16000); // 16s
4036        assert_eq!(config.calculate_delay(5), 30000); // 30s (capped at max)
4037        assert_eq!(config.calculate_delay(6), 30000); // 30s (remains at max)
4038    }
4039
4040    #[test]
4041    fn test_reconnect_config_should_retry() {
4042        let mut config = ReconnectConfig::default();
4043
4044        // Unlimited retries
4045        assert!(config.should_retry(0));
4046        assert!(config.should_retry(10));
4047        assert!(config.should_retry(100));
4048
4049        // Finite retries
4050        config.max_attempts = 3;
4051        assert!(config.should_retry(0));
4052        assert!(config.should_retry(1));
4053        assert!(config.should_retry(2));
4054        assert!(!config.should_retry(3));
4055        assert!(!config.should_retry(4));
4056
4057        // Disabled retries
4058        config.enabled = false;
4059        assert!(!config.should_retry(0));
4060        assert!(!config.should_retry(1));
4061    }
4062
4063    #[test]
4064    fn test_message_router_extract_stream_name_combined() {
4065        // Combined stream format
4066        let message = serde_json::json!({
4067            "stream": "btcusdt@ticker",
4068            "data": {
4069                "e": "24hrTicker",
4070                "s": "BTCUSDT"
4071            }
4072        });
4073
4074        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4075        assert_eq!(stream_name, "btcusdt@ticker");
4076    }
4077
4078    #[test]
4079    fn test_message_router_extract_stream_name_ticker() {
4080        // Ticker single stream format
4081        let message = serde_json::json!({
4082            "e": "24hrTicker",
4083            "s": "BTCUSDT",
4084            "E": 1672531200000_u64,
4085            "c": "16950.00",
4086            "h": "17100.00"
4087        });
4088
4089        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4090        assert_eq!(stream_name, "btcusdt@ticker");
4091    }
4092
4093    #[test]
4094    fn test_message_router_extract_stream_name_depth() {
4095        // Depth single stream format
4096        let message = serde_json::json!({
4097            "e": "depthUpdate",
4098            "s": "ETHUSDT",
4099            "E": 1672531200000_u64,
4100            "U": 157,
4101            "u": 160
4102        });
4103
4104        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4105        assert_eq!(stream_name, "ethusdt@depth");
4106    }
4107
4108    #[test]
4109    fn test_message_router_extract_stream_name_trade() {
4110        // Test trade single-stream format
4111        let message = serde_json::json!({
4112            "e": "trade",
4113            "s": "BNBUSDT",
4114            "E": 1672531200000_u64,
4115            "t": 12345
4116        });
4117
4118        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4119        assert_eq!(stream_name, "bnbusdt@trade");
4120    }
4121
4122    #[test]
4123    fn test_message_router_extract_stream_name_kline() {
4124        // Kline single stream format
4125        let message = serde_json::json!({
4126            "e": "kline",
4127            "s": "BTCUSDT",
4128            "E": 1672531200000_u64,
4129            "k": {
4130                "i": "1m",
4131                "t": 1672531200000_u64,
4132                "o": "16950.00"
4133            }
4134        });
4135
4136        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4137        assert_eq!(stream_name, "btcusdt@kline_1m");
4138    }
4139
4140    #[test]
4141    fn test_message_router_extract_stream_name_mark_price() {
4142        // Mark price single stream format
4143        let message = serde_json::json!({
4144            "e": "markPriceUpdate",
4145            "s": "BTCUSDT",
4146            "E": 1672531200000_u64,
4147            "p": "16950.00"
4148        });
4149
4150        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4151        assert_eq!(stream_name, "btcusdt@markPrice");
4152    }
4153
4154    #[test]
4155    fn test_message_router_extract_stream_name_book_ticker() {
4156        // Book ticker single stream format
4157        let message = serde_json::json!({
4158            "e": "bookTicker",
4159            "s": "ETHUSDT",
4160            "E": 1672531200000_u64,
4161            "b": "1200.00",
4162            "a": "1200.50"
4163        });
4164
4165        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4166        assert_eq!(stream_name, "ethusdt@bookTicker");
4167    }
4168
4169    #[test]
4170    fn test_message_router_extract_stream_name_subscription_response() {
4171        // Subscription response should yield an error
4172        let message = serde_json::json!({
4173            "result": null,
4174            "id": 1
4175        });
4176
4177        let result = MessageRouter::extract_stream_name(&message);
4178        assert!(result.is_err());
4179    }
4180
4181    #[test]
4182    fn test_message_router_extract_stream_name_error_response() {
4183        // Error responses should yield an error
4184        let message = serde_json::json!({
4185            "error": {
4186                "code": -1,
4187                "msg": "Invalid request"
4188            },
4189            "id": 1
4190        });
4191
4192        let result = MessageRouter::extract_stream_name(&message);
4193        assert!(result.is_err());
4194    }
4195
4196    #[test]
4197    fn test_message_router_extract_stream_name_invalid() {
4198        // Invalid message format
4199        let message = serde_json::json!({
4200            "unknown": "data"
4201        });
4202
4203        let result = MessageRouter::extract_stream_name(&message);
4204        assert!(result.is_err());
4205    }
4206
4207    #[tokio::test]
4208    async fn test_message_router_creation() {
4209        let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4210        let subscription_manager = Arc::new(SubscriptionManager::new());
4211
4212        let router = MessageRouter::new(ws_url.clone(), subscription_manager);
4213
4214        // Validate initial state
4215        assert!(!router.is_connected());
4216        assert_eq!(router.ws_url, ws_url);
4217    }
4218
4219    #[tokio::test]
4220    async fn test_message_router_reconnect_config() {
4221        let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4222        let subscription_manager = Arc::new(SubscriptionManager::new());
4223
4224        let router = MessageRouter::new(ws_url, subscription_manager);
4225
4226        // Default configuration
4227        let config = router.get_reconnect_config().await;
4228        assert!(config.enabled);
4229        assert_eq!(config.initial_delay_ms, 1000);
4230
4231        // Setting new configuration
4232        let new_config = ReconnectConfig {
4233            enabled: false,
4234            initial_delay_ms: 2000,
4235            max_delay_ms: 60000,
4236            backoff_multiplier: 1.5,
4237            max_attempts: 5,
4238        };
4239
4240        router.set_reconnect_config(new_config.clone()).await;
4241
4242        let updated_config = router.get_reconnect_config().await;
4243        assert!(!updated_config.enabled);
4244        assert_eq!(updated_config.initial_delay_ms, 2000);
4245        assert_eq!(updated_config.max_delay_ms, 60000);
4246        assert_eq!(updated_config.backoff_multiplier, 1.5);
4247        assert_eq!(updated_config.max_attempts, 5);
4248    }
4249}