ccxt_exchanges/binance/
ws.rs

1//! Binance WebSocket implementation
2//!
3//! Provides WebSocket real-time data stream subscriptions for the Binance exchange
4
5use crate::binance::Binance;
6use crate::binance::parser;
7use ccxt_core::error::{Error, Result};
8use ccxt_core::types::financial::{Amount, Cost, Price};
9use ccxt_core::types::{
10    Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
11};
12use ccxt_core::ws_client::{WsClient, WsConfig};
13use serde_json::Value;
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio_tungstenite::tungstenite::protocol::Message;
20
/// Binance WebSocket endpoint (`stream.binance.com`)
#[allow(dead_code)]
const WS_BASE_URL: &str = "wss://stream.binance.com:9443/ws";
/// Binance testnet WebSocket endpoint (`testnet.binance.vision`)
#[allow(dead_code)]
const WS_TESTNET_URL: &str = "wss://testnet.binance.vision/ws";

/// Listen key refresh interval (30 minutes)
///
/// `ListenKeyManager::new` uses this value as the default `refresh_interval`.
const LISTEN_KEY_REFRESH_INTERVAL: Duration = Duration::from_secs(30 * 60);
29
/// Listen key manager
///
/// Caches the listen key used for Binance user data streams together with the
/// instant it was created or last refreshed, and owns the optional background
/// task that keeps the key alive:
/// - `get_or_create` returns the cached key, or requests a new one when nothing
///   is cached or the cached key is more than 50 minutes old
/// - `start_auto_refresh` refreshes the key once per `refresh_interval`
/// - `delete` stops the refresh task and invalidates the key
///
/// The cached state is stored behind `Arc`s so the spawned refresh task can
/// share it with the manager.
pub struct ListenKeyManager {
    /// Reference to the Binance instance used for every listen key request
    binance: Arc<Binance>,
    /// Currently cached listen key (`None` until one is created, or after it is cleared)
    listen_key: Arc<RwLock<Option<String>>>,
    /// Instant at which the cached key was created or last refreshed
    created_at: Arc<RwLock<Option<Instant>>>,
    /// Pause between two automatic refresh attempts
    refresh_interval: Duration,
    /// Handle to the auto-refresh task, when one has been started
    refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
49
50impl ListenKeyManager {
51    /// Creates a new `ListenKeyManager`
52    ///
53    /// # Arguments
54    /// * `binance` - Reference-counted Binance instance
55    ///
56    /// # Returns
57    /// Configured `ListenKeyManager`
58    pub fn new(binance: Arc<Binance>) -> Self {
59        Self {
60            binance,
61            listen_key: Arc::new(RwLock::new(None)),
62            created_at: Arc::new(RwLock::new(None)),
63            refresh_interval: LISTEN_KEY_REFRESH_INTERVAL,
64            refresh_task: Arc::new(Mutex::new(None)),
65        }
66    }
67
68    /// Retrieves or creates a listen key
69    ///
70    /// Returns an existing valid listen key when present; otherwise creates a new one.
71    ///
72    /// # Returns
73    /// Listen key string
74    ///
75    /// # Errors
76    /// - Failed to create the listen key
77    /// - Missing API credentials
78    pub async fn get_or_create(&self) -> Result<String> {
79        // Check if a listen key already exists
80        let key_opt = self.listen_key.read().await.clone();
81
82        if let Some(key) = key_opt {
83            // Check whether the key needs to be refreshed
84            let created = self.created_at.read().await;
85            if let Some(created_time) = *created {
86                let elapsed = created_time.elapsed();
87                // If more than 50 minutes have elapsed, create a new key
88                if elapsed > Duration::from_secs(50 * 60) {
89                    drop(created);
90                    return self.create_new().await;
91                }
92            }
93            return Ok(key);
94        }
95
96        // Create a new listen key
97        self.create_new().await
98    }
99
100    /// Creates a new listen key
101    ///
102    /// # Returns
103    /// Newly created listen key
104    async fn create_new(&self) -> Result<String> {
105        let key = self.binance.create_listen_key().await?;
106
107        // Update cache
108        *self.listen_key.write().await = Some(key.clone());
109        *self.created_at.write().await = Some(Instant::now());
110
111        Ok(key)
112    }
113
114    /// Refreshes the current listen key
115    ///
116    /// Extends the listen key lifetime by 60 minutes
117    ///
118    /// # Returns
119    /// Refresh result
120    pub async fn refresh(&self) -> Result<()> {
121        let key_opt = self.listen_key.read().await.clone();
122
123        if let Some(key) = key_opt {
124            self.binance.refresh_listen_key(&key).await?;
125            // Update the creation timestamp
126            *self.created_at.write().await = Some(Instant::now());
127            Ok(())
128        } else {
129            Err(Error::invalid_request("No listen key to refresh"))
130        }
131    }
132
133    /// Starts the auto-refresh task
134    ///
135    /// Refreshes the listen key every 30 minutes
136    pub async fn start_auto_refresh(&self) {
137        // Stop any existing task
138        self.stop_auto_refresh().await;
139
140        let listen_key = self.listen_key.clone();
141        let created_at = self.created_at.clone();
142        let binance = self.binance.clone();
143        let interval = self.refresh_interval;
144
145        let handle = tokio::spawn(async move {
146            loop {
147                tokio::time::sleep(interval).await;
148
149                // Check whether a listen key exists
150                let key_opt = listen_key.read().await.clone();
151                if let Some(key) = key_opt {
152                    // Attempt to refresh the key
153                    match binance.refresh_listen_key(&key).await {
154                        Ok(_) => {
155                            *created_at.write().await = Some(Instant::now());
156                            // Listen key refreshed successfully
157                        }
158                        Err(_e) => {
159                            // Failed to refresh; clear cache so a new key will be created next time
160                            *listen_key.write().await = None;
161                            *created_at.write().await = None;
162                            break;
163                        }
164                    }
165                } else {
166                    // No listen key available; stop the task
167                    break;
168                }
169            }
170        });
171
172        *self.refresh_task.lock().await = Some(handle);
173    }
174
175    /// Stops the auto-refresh task
176    pub async fn stop_auto_refresh(&self) {
177        let mut task_opt = self.refresh_task.lock().await;
178        if let Some(handle) = task_opt.take() {
179            handle.abort();
180        }
181    }
182
183    /// Deletes the listen key
184    ///
185    /// Closes the user data stream and invalidates the key
186    ///
187    /// # Returns
188    /// Result of the deletion
189    pub async fn delete(&self) -> Result<()> {
190        // Stop the auto-refresh first
191        self.stop_auto_refresh().await;
192
193        let key_opt = self.listen_key.read().await.clone();
194
195        if let Some(key) = key_opt {
196            self.binance.delete_listen_key(&key).await?;
197
198            // Clear cached state
199            *self.listen_key.write().await = None;
200            *self.created_at.write().await = None;
201
202            Ok(())
203        } else {
204            Ok(()) // No listen key; treat as success
205        }
206    }
207
208    /// Returns the current listen key when available
209    pub async fn get_current(&self) -> Option<String> {
210        self.listen_key.read().await.clone()
211    }
212
213    /// Checks whether the listen key is still valid
214    pub async fn is_valid(&self) -> bool {
215        let key_opt = self.listen_key.read().await;
216        if key_opt.is_none() {
217            return false;
218        }
219
220        let created = self.created_at.read().await;
221        if let Some(created_time) = *created {
222            // Key is considered valid if less than 55 minutes old
223            created_time.elapsed() < Duration::from_secs(55 * 60)
224        } else {
225            false
226        }
227    }
228}
229
impl Drop for ListenKeyManager {
    /// Intentionally a no-op: dropping the manager performs no cleanup.
    fn drop(&mut self) {
        // Note: Drop is synchronous, so we cannot await asynchronous operations here.
        // Callers should explicitly invoke `delete()` to release resources.
        // This body neither deletes the listen key nor aborts the auto-refresh task.
    }
}
/// Kinds of subscription handled by the Binance WebSocket layer.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// 24-hour ticker stream
    Ticker,
    /// Order book depth stream
    OrderBook,
    /// Real-time trade stream
    Trades,
    /// Kline (candlestick) stream; carries the interval (e.g. "1m", "5m", "1h")
    Kline(String),
    /// Account balance stream
    Balance,
    /// Order update stream
    Orders,
    /// Position update stream
    Positions,
    /// Personal trade execution stream
    MyTrades,
    /// Mark price stream
    MarkPrice,
    /// Book ticker (best bid/ask) stream
    BookTicker,
}

impl SubscriptionType {
    /// Derives the subscription type from a stream name
    ///
    /// The markers are tested in a fixed order and the first match wins:
    /// `@ticker`, `@depth`, `@trade`/`@aggTrade`, `@kline_`, `@markPrice`,
    /// `@bookTicker`.
    ///
    /// # Arguments
    /// * `stream` - Stream identifier (e.g. "btcusdt@ticker")
    ///
    /// # Returns
    /// The matching subscription type, or `None` when no marker is present or
    /// the name contains more than one `@kline_` marker
    pub fn from_stream(stream: &str) -> Option<Self> {
        if stream.contains("@ticker") {
            return Some(Self::Ticker);
        }
        if stream.contains("@depth") {
            return Some(Self::OrderBook);
        }
        if stream.contains("@trade") || stream.contains("@aggTrade") {
            return Some(Self::Trades);
        }
        if let Some((_, interval)) = stream.split_once("@kline_") {
            // Everything after the marker is the interval; a second marker
            // makes the name ambiguous and is rejected.
            return if interval.contains("@kline_") {
                None
            } else {
                Some(Self::Kline(interval.to_string()))
            };
        }
        if stream.contains("@markPrice") {
            return Some(Self::MarkPrice);
        }
        if stream.contains("@bookTicker") {
            return Some(Self::BookTicker);
        }
        None
    }
}
293
/// Subscription metadata
///
/// Describes one registered stream and carries the channel sender used to hand
/// its messages to the consumer. Cloning a `Subscription` clones the sender,
/// so every clone feeds the same channel.
#[derive(Clone)]
pub struct Subscription {
    /// Stream name (e.g. "btcusdt@ticker")
    pub stream: String,
    /// Normalized trading symbol (e.g. "BTCUSDT")
    pub symbol: String,
    /// Subscription type descriptor
    pub sub_type: SubscriptionType,
    /// Instant at which this record was constructed (set by `Subscription::new`)
    pub subscribed_at: Instant,
    /// Sender for forwarding WebSocket messages to consumers
    pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
}
308
309impl Subscription {
310    /// Creates a new subscription with the provided parameters
311    pub fn new(
312        stream: String,
313        symbol: String,
314        sub_type: SubscriptionType,
315        sender: tokio::sync::mpsc::UnboundedSender<Value>,
316    ) -> Self {
317        Self {
318            stream,
319            symbol,
320            sub_type,
321            subscribed_at: Instant::now(),
322            sender,
323        }
324    }
325
326    /// Sends a message to the subscriber
327    ///
328    /// # Arguments
329    /// * `message` - WebSocket payload to forward
330    ///
331    /// # Returns
332    /// Whether the message was delivered successfully
333    pub fn send(&self, message: Value) -> bool {
334        self.sender.send(message).is_ok()
335    }
336}
337
/// Subscription manager
///
/// Manages the lifecycle of all WebSocket subscriptions, including:
/// - Adding and removing subscriptions
/// - Querying and validating subscriptions
/// - Tracking the number of active subscriptions
///
/// The two maps are guarded by separate locks and the counter is a separate
/// atomic, so the three are kept in step by the methods rather than by the
/// types themselves.
pub struct SubscriptionManager {
    /// Mapping of `stream_name -> Subscription`
    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
    /// Index by symbol: `symbol -> Vec<stream_name>`
    symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Counter of active subscriptions, readable without taking a lock
    active_count: Arc<std::sync::atomic::AtomicUsize>,
}
352
353impl SubscriptionManager {
354    /// Creates a new `SubscriptionManager`
355    pub fn new() -> Self {
356        Self {
357            subscriptions: Arc::new(RwLock::new(HashMap::new())),
358            symbol_index: Arc::new(RwLock::new(HashMap::new())),
359            active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
360        }
361    }
362
363    /// Adds a subscription to the manager
364    ///
365    /// # Arguments
366    /// * `stream` - Stream identifier (e.g. "btcusdt@ticker")
367    /// * `symbol` - Normalized trading symbol (e.g. "BTCUSDT")
368    /// * `sub_type` - Subscription classification
369    /// * `sender` - Channel for dispatching messages
370    ///
371    /// # Returns
372    /// Result of the insertion
373    pub async fn add_subscription(
374        &self,
375        stream: String,
376        symbol: String,
377        sub_type: SubscriptionType,
378        sender: tokio::sync::mpsc::UnboundedSender<Value>,
379    ) -> Result<()> {
380        let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
381
382        // Insert into the subscription map
383        let mut subs = self.subscriptions.write().await;
384        subs.insert(stream.clone(), subscription);
385
386        // Update the per-symbol index
387        let mut index = self.symbol_index.write().await;
388        index.entry(symbol).or_insert_with(Vec::new).push(stream);
389
390        // Increment the active subscription count
391        self.active_count
392            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
393
394        Ok(())
395    }
396
397    /// Removes a subscription by stream name
398    ///
399    /// # Arguments
400    /// * `stream` - Stream identifier to remove
401    ///
402    /// # Returns
403    /// Result of the removal
404    pub async fn remove_subscription(&self, stream: &str) -> Result<()> {
405        let mut subs = self.subscriptions.write().await;
406
407        if let Some(subscription) = subs.remove(stream) {
408            // Remove from the symbol index as well
409            let mut index = self.symbol_index.write().await;
410            if let Some(streams) = index.get_mut(&subscription.symbol) {
411                streams.retain(|s| s != stream);
412                // Drop the symbol entry when no subscriptions remain
413                if streams.is_empty() {
414                    index.remove(&subscription.symbol);
415                }
416            }
417
418            // Decrement the active subscription counter
419            self.active_count
420                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
421        }
422
423        Ok(())
424    }
425
426    /// Retrieves a subscription by stream name
427    ///
428    /// # Arguments
429    /// * `stream` - Stream identifier
430    ///
431    /// # Returns
432    /// Optional subscription record
433    pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
434        let subs = self.subscriptions.read().await;
435        subs.get(stream).cloned()
436    }
437
438    /// Checks whether a subscription exists for the given stream
439    ///
440    /// # Arguments
441    /// * `stream` - Stream identifier
442    ///
443    /// # Returns
444    /// `true` if the subscription exists, otherwise `false`
445    pub async fn has_subscription(&self, stream: &str) -> bool {
446        let subs = self.subscriptions.read().await;
447        subs.contains_key(stream)
448    }
449
450    /// Returns all registered subscriptions
451    pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
452        let subs = self.subscriptions.read().await;
453        subs.values().cloned().collect()
454    }
455
456    /// Returns all subscriptions associated with a symbol
457    ///
458    /// # Arguments
459    /// * `symbol` - Trading symbol
460    pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
461        let index = self.symbol_index.read().await;
462        let subs = self.subscriptions.read().await;
463
464        if let Some(streams) = index.get(symbol) {
465            streams
466                .iter()
467                .filter_map(|stream| subs.get(stream).cloned())
468                .collect()
469        } else {
470            Vec::new()
471        }
472    }
473
474    /// Returns the number of active subscriptions
475    pub fn active_count(&self) -> usize {
476        self.active_count.load(std::sync::atomic::Ordering::SeqCst)
477    }
478
479    /// Removes all subscriptions and clears indexes
480    pub async fn clear(&self) {
481        let mut subs = self.subscriptions.write().await;
482        let mut index = self.symbol_index.write().await;
483
484        subs.clear();
485        index.clear();
486        self.active_count
487            .store(0, std::sync::atomic::Ordering::SeqCst);
488    }
489
490    /// Sends a message to subscribers of a specific stream
491    ///
492    /// # Arguments
493    /// * `stream` - Stream identifier
494    /// * `message` - WebSocket payload to broadcast
495    ///
496    /// # Returns
497    /// `true` when the message was delivered, otherwise `false`
498    pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
499        let subs = self.subscriptions.read().await;
500        if let Some(subscription) = subs.get(stream) {
501            subscription.send(message)
502        } else {
503            false
504        }
505    }
506
507    /// Sends a message to all subscribers of a symbol
508    ///
509    /// # Arguments
510    /// * `symbol` - Trading symbol
511    /// * `message` - Shared WebSocket payload
512    ///
513    /// # Returns
514    /// Number of subscribers that received the message
515    pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
516        let index = self.symbol_index.read().await;
517        let subs = self.subscriptions.read().await;
518
519        let mut sent_count = 0;
520
521        if let Some(streams) = index.get(symbol) {
522            for stream in streams {
523                if let Some(subscription) = subs.get(stream) {
524                    if subscription.send(message.clone()) {
525                        sent_count += 1;
526                    }
527                }
528            }
529        }
530
531        sent_count
532    }
533}
/// Reconnect configuration
///
/// Defines the automatic reconnection strategy after a WebSocket disconnect
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Enables or disables automatic reconnection
    pub enabled: bool,

    /// Initial reconnection delay in milliseconds
    pub initial_delay_ms: u64,

    /// Maximum reconnection delay in milliseconds
    pub max_delay_ms: u64,

    /// Exponential backoff multiplier
    pub backoff_multiplier: f64,

    /// Maximum number of reconnection attempts (0 means unlimited)
    pub max_attempts: usize,
}

impl Default for ReconnectConfig {
    /// Reconnection enabled, 1 s initial delay doubling up to 30 s, unlimited attempts
    fn default() -> Self {
        Self {
            enabled: true,
            initial_delay_ms: 1000,  // 1 second
            max_delay_ms: 30000,     // 30 seconds
            backoff_multiplier: 2.0, // Exponential backoff
            max_attempts: 0,         // Unlimited retries
        }
    }
}

impl ReconnectConfig {
    /// Calculates the reconnection delay
    ///
    /// Computes `initial_delay_ms * backoff_multiplier^attempt`, capped at
    /// `max_delay_ms`.
    ///
    /// # Arguments
    /// * `attempt` - Current reconnection attempt (zero-based)
    ///
    /// # Returns
    /// Delay duration in milliseconds
    pub fn calculate_delay(&self, attempt: usize) -> u64 {
        // `powi` takes an `i32`. A plain `as i32` cast wraps for large attempt
        // numbers (e.g. `usize::MAX` becomes -1), which would shrink the delay
        // instead of capping it, so the exponent is saturated first.
        let exponent = attempt.min(i32::MAX as usize) as i32;
        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(exponent);
        // The float-to-integer cast saturates, so an oversized product cannot wrap.
        delay.min(self.max_delay_ms as f64) as u64
    }

    /// Determines whether another reconnection attempt should be made
    ///
    /// # Arguments
    /// * `attempt` - Current reconnection attempt
    ///
    /// # Returns
    /// `true` when reconnection is enabled and either `max_attempts` is 0
    /// (unlimited) or `attempt` is still below `max_attempts`
    pub fn should_retry(&self, attempt: usize) -> bool {
        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
    }
}
593
/// Message router
///
/// Handles reception, parsing, and dispatching of WebSocket messages. Core responsibilities:
/// - Managing WebSocket connections
/// - Receiving and routing messages
/// - Coordinating automatic reconnection
/// - Managing subscriptions
///
/// Most fields are wrapped in `Arc` so they can be shared with the background
/// task spawned by `start`.
pub struct MessageRouter {
    /// WebSocket client instance (`None` while no client is stored)
    ws_client: Arc<RwLock<Option<WsClient>>>,

    /// Subscription manager registry that incoming messages are routed through
    subscription_manager: Arc<SubscriptionManager>,

    /// Handle to the background routing task
    router_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// Connection state flag; the routing loop exits once it reads `false`
    is_connected: Arc<std::sync::atomic::AtomicBool>,

    /// Configuration for reconnection behavior
    reconnect_config: Arc<RwLock<ReconnectConfig>>,

    /// WebSocket endpoint URL
    ws_url: String,

    /// Request ID counter (used for subscribe/unsubscribe), starting at 1
    request_id: Arc<std::sync::atomic::AtomicU64>,
}
623
624impl MessageRouter {
625    /// Creates a new message router
626    ///
627    /// # Arguments
628    /// * `ws_url` - WebSocket connection URL
629    /// * `subscription_manager` - Subscription manager handle
630    ///
631    /// # Returns
632    /// Configured router instance
633    pub fn new(ws_url: String, subscription_manager: Arc<SubscriptionManager>) -> Self {
634        Self {
635            ws_client: Arc::new(RwLock::new(None)),
636            subscription_manager,
637            router_task: Arc::new(Mutex::new(None)),
638            is_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
639            reconnect_config: Arc::new(RwLock::new(ReconnectConfig::default())),
640            ws_url,
641            request_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
642        }
643    }
644
645    /// Starts the message router
646    ///
647    /// Establishes the WebSocket connection and launches the message loop
648    ///
649    /// # Returns
650    /// Result of the startup sequence
651    pub async fn start(&self) -> Result<()> {
652        // Stop any running instance before starting again
653        if self.is_connected() {
654            self.stop().await?;
655        }
656
657        // Establish a new WebSocket connection
658        let config = WsConfig {
659            url: self.ws_url.clone(),
660            ..Default::default()
661        };
662        let client = WsClient::new(config);
663        client.connect().await?;
664
665        // Store the client handle
666        *self.ws_client.write().await = Some(client);
667
668        // Update connection state
669        self.is_connected
670            .store(true, std::sync::atomic::Ordering::SeqCst);
671
672        // Spawn the message processing loop
673        let ws_client = self.ws_client.clone();
674        let subscription_manager = self.subscription_manager.clone();
675        let is_connected = self.is_connected.clone();
676        let reconnect_config = self.reconnect_config.clone();
677        let ws_url = self.ws_url.clone();
678
679        let handle = tokio::spawn(async move {
680            Self::message_loop(
681                ws_client,
682                subscription_manager,
683                is_connected,
684                reconnect_config,
685                ws_url,
686            )
687            .await
688        });
689
690        *self.router_task.lock().await = Some(handle);
691
692        Ok(())
693    }
694
695    /// Stops the message router
696    ///
697    /// Terminates the message loop and disconnects the WebSocket client
698    ///
699    /// # Returns
700    /// Result of the shutdown procedure
701    pub async fn stop(&self) -> Result<()> {
702        // Mark the connection as inactive
703        self.is_connected
704            .store(false, std::sync::atomic::Ordering::SeqCst);
705
706        // Cancel the background routing task
707        let mut task_opt = self.router_task.lock().await;
708        if let Some(handle) = task_opt.take() {
709            handle.abort();
710        }
711
712        // Disconnect the WebSocket client if present
713        let mut client_opt = self.ws_client.write().await;
714        if let Some(client) = client_opt.take() {
715            let _ = client.disconnect().await;
716        }
717
718        Ok(())
719    }
720
721    /// Restarts the message router
722    ///
723    /// Stops the current connection and restarts it, typically used during reconnect scenarios
724    ///
725    /// # Returns
726    /// Result of the restart sequence
727    pub async fn restart(&self) -> Result<()> {
728        self.stop().await?;
729        tokio::time::sleep(Duration::from_millis(100)).await;
730        self.start().await
731    }
732
733    /// Returns the current connection state
734    pub fn is_connected(&self) -> bool {
735        self.is_connected.load(std::sync::atomic::Ordering::SeqCst)
736    }
737
738    /// Applies a new reconnection configuration
739    ///
740    /// # Arguments
741    /// * `config` - Updated reconnection configuration
742    pub async fn set_reconnect_config(&self, config: ReconnectConfig) {
743        *self.reconnect_config.write().await = config;
744    }
745
746    /// Retrieves the current reconnection configuration
747    pub async fn get_reconnect_config(&self) -> ReconnectConfig {
748        self.reconnect_config.read().await.clone()
749    }
750
751    /// Subscribes to the provided streams
752    ///
753    /// Sends a subscription request to Binance
754    ///
755    /// # Arguments
756    /// * `streams` - Collection of stream identifiers
757    pub async fn subscribe(&self, streams: Vec<String>) -> Result<()> {
758        if streams.is_empty() {
759            return Ok(());
760        }
761
762        let client_opt = self.ws_client.read().await;
763        let client = client_opt
764            .as_ref()
765            .ok_or_else(|| Error::network("WebSocket not connected"))?;
766
767        // Generate a request identifier
768        let id = self
769            .request_id
770            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
771
772        // Build the subscribe payload
773        #[allow(clippy::disallowed_methods)]
774        let request = serde_json::json!({
775            "method": "SUBSCRIBE",
776            "params": streams,
777            "id": id
778        });
779
780        // Send the request to Binance
781        client
782            .send(Message::Text(request.to_string().into()))
783            .await?;
784
785        Ok(())
786    }
787
788    /// Unsubscribes from the provided streams
789    ///
790    /// Sends an unsubscribe request to Binance
791    ///
792    /// # Arguments
793    /// * `streams` - Collection of stream identifiers
794    pub async fn unsubscribe(&self, streams: Vec<String>) -> Result<()> {
795        if streams.is_empty() {
796            return Ok(());
797        }
798
799        let client_opt = self.ws_client.read().await;
800        let client = client_opt
801            .as_ref()
802            .ok_or_else(|| Error::network("WebSocket not connected"))?;
803
804        // Generate a request identifier
805        let id = self
806            .request_id
807            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
808
809        // Build the unsubscribe payload
810        #[allow(clippy::disallowed_methods)]
811        let request = serde_json::json!({
812            "method": "UNSUBSCRIBE",
813            "params": streams,
814            "id": id
815        });
816
817        // Send the request to Binance
818        client
819            .send(Message::Text(request.to_string().into()))
820            .await?;
821
822        Ok(())
823    }
824
825    /// Message reception loop
826    ///
827    /// Continuously receives WebSocket messages and routes them to subscribers
828    async fn message_loop(
829        ws_client: Arc<RwLock<Option<WsClient>>>,
830        subscription_manager: Arc<SubscriptionManager>,
831        is_connected: Arc<std::sync::atomic::AtomicBool>,
832        reconnect_config: Arc<RwLock<ReconnectConfig>>,
833        ws_url: String,
834    ) {
835        let mut reconnect_attempt = 0;
836
837        loop {
838            // Exit if instructed to stop
839            if !is_connected.load(std::sync::atomic::Ordering::SeqCst) {
840                break;
841            }
842
843            // Confirm that a client instance exists
844            let has_client = ws_client.read().await.is_some();
845
846            if !has_client {
847                // Attempt to reconnect when no client is present
848                let config = reconnect_config.read().await;
849                if config.should_retry(reconnect_attempt) {
850                    let delay = config.calculate_delay(reconnect_attempt);
851                    drop(config);
852
853                    tokio::time::sleep(Duration::from_millis(delay)).await;
854
855                    match Self::reconnect(&ws_url, ws_client.clone()).await {
856                        Ok(_) => {
857                            reconnect_attempt = 0; // Reset counter on success
858                            continue;
859                        }
860                        Err(_) => {
861                            reconnect_attempt += 1;
862                            continue;
863                        }
864                    }
865                } else {
866                    // Stop attempting to reconnect
867                    is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
868                    break;
869                }
870            }
871
872            // Receive the next message (requires re-acquiring the lock)
873            let message_opt = {
874                let guard = ws_client.read().await;
875                if let Some(client) = guard.as_ref() {
876                    client.receive().await
877                } else {
878                    None
879                }
880            };
881
882            match message_opt {
883                Some(value) => {
884                    // Handle the message; ignore failures but continue the loop
885                    if let Err(_e) = Self::handle_message(value, subscription_manager.clone()).await
886                    {
887                        continue;
888                    }
889
890                    // Reset the reconnection counter because the connection is healthy
891                    reconnect_attempt = 0;
892                }
893                None => {
894                    // Connection failure; attempt to reconnect
895                    let config = reconnect_config.read().await;
896                    if config.should_retry(reconnect_attempt) {
897                        let delay = config.calculate_delay(reconnect_attempt);
898                        drop(config);
899
900                        tokio::time::sleep(Duration::from_millis(delay)).await;
901
902                        match Self::reconnect(&ws_url, ws_client.clone()).await {
903                            Ok(_) => {
904                                reconnect_attempt = 0;
905                                continue;
906                            }
907                            Err(_) => {
908                                reconnect_attempt += 1;
909                                continue;
910                            }
911                        }
912                    } else {
913                        // Reconnection not permitted anymore; stop the loop
914                        is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
915                        break;
916                    }
917                }
918            }
919        }
920    }
921
922    /// Processes a WebSocket message
923    ///
924    /// Parses the message and routes it to relevant subscribers
925    async fn handle_message(
926        message: Value,
927        subscription_manager: Arc<SubscriptionManager>,
928    ) -> Result<()> {
929        // Determine the stream name for routing
930        let stream_name = Self::extract_stream_name(&message)?;
931
932        // Forward the message to the corresponding subscribers
933        let sent = subscription_manager
934            .send_to_stream(&stream_name, message)
935            .await;
936
937        if sent {
938            Ok(())
939        } else {
940            Err(Error::generic("No subscribers for stream"))
941        }
942    }
943
944    /// Extracts the stream name from an incoming message
945    ///
946    /// Supports two formats:
947    /// 1. Combined stream: `{"stream":"btcusdt@ticker","data":{...}}`
948    /// 2. Single stream: `{"e":"24hrTicker","s":"BTCUSDT",...}`
949    ///
950    /// # Arguments
951    /// * `message` - Raw WebSocket payload
952    ///
953    /// # Returns
954    /// Stream identifier for routing
955    fn extract_stream_name(message: &Value) -> Result<String> {
956        // Attempt to read the combined-stream format
957        if let Some(stream) = message.get("stream").and_then(|s| s.as_str()) {
958            return Ok(stream.to_string());
959        }
960
961        // Fall back to single-stream format: event@symbol (lowercase)
962        if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
963            if let Some(symbol) = message.get("s").and_then(|s| s.as_str()) {
964                // Build the stream identifier
965                let stream = match event_type {
966                    "24hrTicker" => format!("{}@ticker", symbol.to_lowercase()),
967                    "depthUpdate" => format!("{}@depth", symbol.to_lowercase()),
968                    "aggTrade" => format!("{}@aggTrade", symbol.to_lowercase()),
969                    "trade" => format!("{}@trade", symbol.to_lowercase()),
970                    "kline" => {
971                        // Kline messages carry the interval within the "k" object
972                        if let Some(kline) = message.get("k") {
973                            if let Some(interval) = kline.get("i").and_then(|i| i.as_str()) {
974                                format!("{}@kline_{}", symbol.to_lowercase(), interval)
975                            } else {
976                                return Err(Error::generic("Missing kline interval"));
977                            }
978                        } else {
979                            return Err(Error::generic("Missing kline data"));
980                        }
981                    }
982                    "markPriceUpdate" => format!("{}@markPrice", symbol.to_lowercase()),
983                    "bookTicker" => format!("{}@bookTicker", symbol.to_lowercase()),
984                    _ => {
985                        return Err(Error::generic(format!(
986                            "Unknown event type: {}",
987                            event_type
988                        )));
989                    }
990                };
991                return Ok(stream);
992            }
993        }
994
995        // Ignore subscription acknowledgements and error responses
996        if message.get("result").is_some() || message.get("error").is_some() {
997            return Err(Error::generic("Subscription response, skip routing"));
998        }
999
1000        Err(Error::generic("Cannot extract stream name from message"))
1001    }
1002
1003    /// Reconnects the WebSocket client
1004    ///
1005    /// Closes the existing connection and establishes a new one
1006    async fn reconnect(ws_url: &str, ws_client: Arc<RwLock<Option<WsClient>>>) -> Result<()> {
1007        // Close the previous connection
1008        {
1009            let mut client_opt = ws_client.write().await;
1010            if let Some(client) = client_opt.take() {
1011                let _ = client.disconnect().await;
1012            }
1013        }
1014
1015        // Establish a fresh connection
1016        let config = WsConfig {
1017            url: ws_url.to_string(),
1018            ..Default::default()
1019        };
1020        let new_client = WsClient::new(config);
1021
1022        // Connect to the WebSocket endpoint
1023        new_client.connect().await?;
1024
1025        // Store the new client handle
1026        *ws_client.write().await = Some(new_client);
1027
1028        Ok(())
1029    }
1030}
1031
impl Drop for MessageRouter {
    fn drop(&mut self) {
        // Intentionally empty: `Drop::drop` is synchronous, so nothing that
        // needs `.await` can run here. Callers should explicitly invoke
        // `stop()` to release resources.
    }
}
1038
1039impl Default for SubscriptionManager {
1040    fn default() -> Self {
1041        Self::new()
1042    }
1043}
1044
/// Binance WebSocket client wrapper
///
/// Bundles the underlying [`WsClient`] with optional listen key management,
/// an auto-reconnect coordinator, and in-memory caches for market and
/// account data.
pub struct BinanceWs {
    /// Underlying WebSocket client used to connect, subscribe and receive messages
    client: Arc<WsClient>,
    /// Locally cached listen key (set by `connect_user_stream`, cleared by `close_user_stream`)
    listen_key: Arc<RwLock<Option<String>>>,
    /// Listen key manager; `None` unless the client was built with `new_with_auth`
    listen_key_manager: Option<Arc<ListenKeyManager>>,
    /// Auto-reconnect coordinator; populated by `connect` and taken by `disconnect`
    auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
    /// Cached ticker data
    tickers: Arc<Mutex<HashMap<String, Ticker>>>,
    /// Cached bid/ask snapshots
    bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
    /// Cached mark price entries
    // NOTE(review): presumably not read anywhere yet, hence the `dead_code` allowance — confirm.
    #[allow(dead_code)]
    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
    /// Cached order book snapshots
    orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
    /// Cached trade history (retains the latest 1000 entries)
    trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
    /// Cached OHLCV candles
    ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
    /// Cached balance data grouped by account type
    balances: Arc<RwLock<HashMap<String, Balance>>>,
    /// Cached orders grouped by symbol then order ID
    orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
    /// Cached personal trades grouped by symbol (retains the latest 1000 entries)
    my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
    /// Cached positions grouped by symbol and side
    positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
}
1075
1076impl BinanceWs {
1077    /// Creates a new Binance WebSocket client
1078    ///
1079    /// # Arguments
1080    /// * `url` - WebSocket server URL
1081    ///
1082    /// # Returns
1083    /// Initialized Binance WebSocket client
1084    pub fn new(url: String) -> Self {
1085        let config = WsConfig {
1086            url,
1087            connect_timeout: 10000,
1088            ping_interval: 180000, // Binance recommends 3 minutes
1089            reconnect_interval: 5000,
1090            max_reconnect_attempts: 5,
1091            auto_reconnect: true,
1092            enable_compression: false,
1093            pong_timeout: 90000, // Set pong timeout to 90 seconds
1094        };
1095
1096        Self {
1097            client: Arc::new(WsClient::new(config)),
1098            listen_key: Arc::new(RwLock::new(None)),
1099            listen_key_manager: None,
1100            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1101            tickers: Arc::new(Mutex::new(HashMap::new())),
1102            bids_asks: Arc::new(Mutex::new(HashMap::new())),
1103            mark_prices: Arc::new(Mutex::new(HashMap::new())),
1104            orderbooks: Arc::new(Mutex::new(HashMap::new())),
1105            trades: Arc::new(Mutex::new(HashMap::new())),
1106            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1107            balances: Arc::new(RwLock::new(HashMap::new())),
1108            orders: Arc::new(RwLock::new(HashMap::new())),
1109            my_trades: Arc::new(RwLock::new(HashMap::new())),
1110            positions: Arc::new(RwLock::new(HashMap::new())),
1111        }
1112    }
1113
1114    /// Creates a WebSocket client with a listen key manager
1115    ///
1116    /// # Arguments
1117    /// * `url` - WebSocket server URL
1118    /// * `binance` - Shared Binance instance
1119    ///
1120    /// # Returns
1121    /// Binance WebSocket client configured with a listen key manager
1122    pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
1123        let config = WsConfig {
1124            url,
1125            connect_timeout: 10000,
1126            ping_interval: 180000, // Binance recommends 3 minutes
1127            reconnect_interval: 5000,
1128            max_reconnect_attempts: 5,
1129            auto_reconnect: true,
1130            enable_compression: false,
1131            pong_timeout: 90000, // Set pong timeout to 90 seconds
1132        };
1133
1134        Self {
1135            client: Arc::new(WsClient::new(config)),
1136            listen_key: Arc::new(RwLock::new(None)),
1137            listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
1138            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1139            tickers: Arc::new(Mutex::new(HashMap::new())),
1140            bids_asks: Arc::new(Mutex::new(HashMap::new())),
1141            mark_prices: Arc::new(Mutex::new(HashMap::new())),
1142            orderbooks: Arc::new(Mutex::new(HashMap::new())),
1143            trades: Arc::new(Mutex::new(HashMap::new())),
1144            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1145            balances: Arc::new(RwLock::new(HashMap::new())),
1146            orders: Arc::new(RwLock::new(HashMap::new())),
1147            my_trades: Arc::new(RwLock::new(HashMap::new())),
1148            positions: Arc::new(RwLock::new(HashMap::new())),
1149        }
1150    }
1151
1152    /// Connects to the WebSocket server
1153    pub async fn connect(&self) -> Result<()> {
1154        // Establish the WebSocket connection
1155        self.client.connect().await?;
1156
1157        // Start the auto-reconnect coordinator when not already running
1158        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1159        if coordinator_guard.is_none() {
1160            let coordinator = self.client.clone().create_auto_reconnect_coordinator();
1161            coordinator.start().await;
1162            *coordinator_guard = Some(coordinator);
1163            tracing::info!("Auto-reconnect coordinator started");
1164        }
1165
1166        Ok(())
1167    }
1168
1169    /// Disconnects from the WebSocket server
1170    pub async fn disconnect(&self) -> Result<()> {
1171        // Stop the auto-reconnect coordinator first
1172        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1173        if let Some(coordinator) = coordinator_guard.take() {
1174            coordinator.stop().await;
1175            tracing::info!("Auto-reconnect coordinator stopped");
1176        }
1177
1178        // Stop automatic listen key refresh if available
1179        if let Some(manager) = &self.listen_key_manager {
1180            manager.stop_auto_refresh().await;
1181        }
1182
1183        self.client.disconnect().await
1184    }
1185
    /// Connects to the user data stream
    ///
    /// Obtains a listen key from the listen key manager, connects the client
    /// held by this instance, starts listen key auto-refresh, and caches the
    /// listen key locally.
    ///
    /// NOTE(review): a client configured with the listen-key URL is built here
    /// but is never connected or stored; the connection is made through
    /// `self.client`, presumably against the URL given to the constructor.
    /// Replacing the client is still outstanding work.
    ///
    /// # Returns
    /// Result of the connection attempt
    ///
    /// # Errors
    /// - Listen key manager is missing (requires `new_with_auth` constructor)
    /// - Listen key creation failed
    /// - WebSocket connection failed
    ///
    /// # Example
    /// ```rust,no_run
    /// # use ccxt_exchanges::binance::Binance;
    /// # use ccxt_core::ExchangeConfig;
    /// # use std::sync::Arc;
    /// # async fn example() -> ccxt_core::error::Result<()> {
    /// let binance = Arc::new(Binance::new(ExchangeConfig::default())?);
    /// let ws = binance.create_authenticated_ws();
    /// ws.connect_user_stream().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect_user_stream(&self) -> Result<()> {
        // Without a manager there is no way to obtain a listen key
        let manager = self.listen_key_manager.as_ref()
            .ok_or_else(|| Error::invalid_request(
                "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
            ))?;

        // Obtain an existing listen key or create a new one
        let listen_key = manager.get_or_create().await?;

        // Build the user-stream URL from the listen key
        // NOTE(review): the production host is hard-coded here — confirm testnet handling
        let user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);

        // Configuration for a client bound to the user-stream URL
        let config = WsConfig {
            url: user_stream_url,
            connect_timeout: 10000,
            ping_interval: 180000,
            reconnect_interval: 5000,
            max_reconnect_attempts: 5,
            auto_reconnect: true,
            enable_compression: false,
            pong_timeout: 90000, // Default 90-second timeout
        };

        // Initialize a new client instance (retained for future replacement logic).
        // It is intentionally unused: the underscore prefix silences the warning.
        let _new_client = Arc::new(WsClient::new(config));
        // An `Arc` cannot be modified in place; concrete handling is deferred for future work

        // Connect the existing client (not the one built above)
        self.client.connect().await?;

        // Start automatic listen key refresh
        manager.start_auto_refresh().await;

        // Cache the current listen key locally
        *self.listen_key.write().await = Some(listen_key);

        Ok(())
    }
1249
1250    /// Closes the user data stream
1251    ///
1252    /// Stops auto-refresh and deletes the listen key
1253    ///
1254    /// # Returns
1255    /// Result of the shutdown
1256    pub async fn close_user_stream(&self) -> Result<()> {
1257        if let Some(manager) = &self.listen_key_manager {
1258            manager.delete().await?;
1259        }
1260        *self.listen_key.write().await = None;
1261        Ok(())
1262    }
1263
1264    /// Returns the active listen key, when available
1265    pub async fn get_listen_key(&self) -> Option<String> {
1266        if let Some(manager) = &self.listen_key_manager {
1267            manager.get_current().await
1268        } else {
1269            self.listen_key.read().await.clone()
1270        }
1271    }
1272
1273    /// Subscribes to the ticker stream for a symbol
1274    ///
1275    /// # Arguments
1276    /// * `symbol` - Trading pair (e.g. "btcusdt")
1277    ///
1278    /// # Returns
1279    /// Result of the subscription request
1280    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
1281        let stream = format!("{}@ticker", symbol.to_lowercase());
1282        self.client
1283            .subscribe(stream, Some(symbol.to_string()), None)
1284            .await
1285    }
1286
1287    /// Subscribes to the 24-hour ticker stream for all symbols
1288    pub async fn subscribe_all_tickers(&self) -> Result<()> {
1289        self.client
1290            .subscribe("!ticker@arr".to_string(), None, None)
1291            .await
1292    }
1293
1294    /// Subscribes to real-time trade executions for a symbol
1295    ///
1296    /// # Arguments
1297    /// * `symbol` - Trading pair (e.g. "btcusdt")
1298    ///
1299    /// # Returns
1300    /// Result of the subscription request
1301    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
1302        let stream = format!("{}@trade", symbol.to_lowercase());
1303        self.client
1304            .subscribe(stream, Some(symbol.to_string()), None)
1305            .await
1306    }
1307
1308    /// Subscribes to the aggregated trade stream for a symbol
1309    ///
1310    /// # Arguments
1311    /// * `symbol` - Trading pair (e.g. "btcusdt")
1312    ///
1313    /// # Returns
1314    /// Result of the subscription request
1315    pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
1316        let stream = format!("{}@aggTrade", symbol.to_lowercase());
1317        self.client
1318            .subscribe(stream, Some(symbol.to_string()), None)
1319            .await
1320    }
1321
1322    /// Subscribes to the order book depth stream
1323    ///
1324    /// # Arguments
1325    /// * `symbol` - Trading pair (e.g. "btcusdt")
1326    /// * `levels` - Depth levels (5, 10, 20)
1327    /// * `update_speed` - Update frequency ("100ms" or "1000ms")
1328    ///
1329    /// # Returns
1330    /// Result of the subscription request
1331    pub async fn subscribe_orderbook(
1332        &self,
1333        symbol: &str,
1334        levels: u32,
1335        update_speed: &str,
1336    ) -> Result<()> {
1337        let stream = if update_speed == "100ms" {
1338            format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
1339        } else {
1340            format!("{}@depth{}", symbol.to_lowercase(), levels)
1341        };
1342
1343        self.client
1344            .subscribe(stream, Some(symbol.to_string()), None)
1345            .await
1346    }
1347
1348    /// Subscribes to the diff order book stream
1349    ///
1350    /// # Arguments
1351    /// * `symbol` - Trading pair (e.g. "btcusdt")
1352    /// * `update_speed` - Update frequency ("100ms" or "1000ms")
1353    ///
1354    /// # Returns
1355    /// Result of the subscription request
1356    pub async fn subscribe_orderbook_diff(
1357        &self,
1358        symbol: &str,
1359        update_speed: Option<&str>,
1360    ) -> Result<()> {
1361        let stream = if let Some(speed) = update_speed {
1362            if speed == "100ms" {
1363                format!("{}@depth@100ms", symbol.to_lowercase())
1364            } else {
1365                format!("{}@depth", symbol.to_lowercase())
1366            }
1367        } else {
1368            format!("{}@depth", symbol.to_lowercase())
1369        };
1370
1371        self.client
1372            .subscribe(stream, Some(symbol.to_string()), None)
1373            .await
1374    }
1375
1376    /// Subscribes to Kline (candlestick) data for a symbol
1377    ///
1378    /// # Arguments
1379    /// * `symbol` - Trading pair (e.g. "btcusdt")
1380    /// * `interval` - Kline interval (1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M)
1381    ///
1382    /// # Returns
1383    /// Result of the subscription request
1384    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
1385        let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
1386        self.client
1387            .subscribe(stream, Some(symbol.to_string()), None)
1388            .await
1389    }
1390
1391    /// Subscribes to the mini ticker stream for a symbol
1392    ///
1393    /// # Arguments
1394    /// * `symbol` - Trading pair (e.g. "btcusdt")
1395    ///
1396    /// # Returns
1397    /// Result of the subscription request
1398    pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
1399        let stream = format!("{}@miniTicker", symbol.to_lowercase());
1400        self.client
1401            .subscribe(stream, Some(symbol.to_string()), None)
1402            .await
1403    }
1404
1405    /// Subscribes to the mini ticker stream for all symbols
1406    pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
1407        self.client
1408            .subscribe("!miniTicker@arr".to_string(), None, None)
1409            .await
1410    }
1411
    /// Cancels an existing subscription
    ///
    /// Delegates directly to the underlying client's `unsubscribe`, passing
    /// `None` as its second argument.
    ///
    /// # Arguments
    /// * `stream` - Stream identifier to unsubscribe from (e.g. "btcusdt@ticker")
    ///
    /// # Returns
    /// Result of the unsubscribe request
    pub async fn unsubscribe(&self, stream: String) -> Result<()> {
        self.client.unsubscribe(stream, None).await
    }
1422
    /// Receives the next available message
    ///
    /// Delegates directly to the underlying client's `receive`.
    ///
    /// # Returns
    /// Optional message payload when available
    // NOTE(review): elsewhere in this file a `None` result is treated as a
    // connection failure — confirm against `WsClient::receive`.
    pub async fn receive(&self) -> Option<Value> {
        self.client.receive().await
    }
1430
    /// Indicates whether the WebSocket connection is active
    ///
    /// Reports the state of the underlying client.
    pub async fn is_connected(&self) -> bool {
        self.client.is_connected().await
    }
1435
1436    /// Watches a single ticker stream (internal helper)
1437    ///
1438    /// # Arguments
1439    /// * `symbol` - Lowercase trading pair (e.g. "btcusdt")
1440    /// * `channel_name` - Channel identifier (ticker/miniTicker/markPrice/bookTicker)
1441    ///
1442    /// # Returns
1443    /// Parsed ticker data
1444    async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1445        let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
1446
1447        // Subscribe to the requested stream
1448        self.client
1449            .subscribe(stream.clone(), Some(symbol.to_string()), None)
1450            .await?;
1451
1452        // Wait for and parse incoming messages
1453        loop {
1454            if let Some(message) = self.client.receive().await {
1455                // Ignore subscription acknowledgements
1456                if message.get("result").is_some() {
1457                    continue;
1458                }
1459
1460                // Parse ticker payload
1461                if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1462                    // Cache the ticker for later retrieval
1463                    let mut tickers = self.tickers.lock().await;
1464                    tickers.insert(ticker.symbol.clone(), ticker.clone());
1465
1466                    return Ok(ticker);
1467                }
1468            }
1469        }
1470    }
1471
1472    /// Watches multiple ticker streams (internal helper)
1473    ///
1474    /// # Arguments
1475    /// * `symbols` - Optional list of lowercase trading pairs
1476    /// * `channel_name` - Target channel name
1477    ///
1478    /// # Returns
1479    /// Mapping of symbol to ticker payloads
1480    async fn watch_tickers_internal(
1481        &self,
1482        symbols: Option<Vec<String>>,
1483        channel_name: &str,
1484    ) -> Result<HashMap<String, Ticker>> {
1485        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1486            // Subscribe to specific trading pairs
1487            syms.iter()
1488                .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1489                .collect()
1490        } else {
1491            // Subscribe to the aggregated ticker stream
1492            vec![format!("!{}@arr", channel_name)]
1493        };
1494
1495        // Issue subscription requests
1496        for stream in &streams {
1497            self.client.subscribe(stream.clone(), None, None).await?;
1498        }
1499
1500        // Collect and parse messages
1501        let mut result = HashMap::new();
1502
1503        loop {
1504            if let Some(message) = self.client.receive().await {
1505                // Skip subscription acknowledgements
1506                if message.get("result").is_some() {
1507                    continue;
1508                }
1509
1510                // Handle array payloads (all tickers)
1511                if let Some(arr) = message.as_array() {
1512                    for item in arr {
1513                        if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1514                            let symbol = ticker.symbol.clone();
1515
1516                            // Return only requested symbols when provided
1517                            if let Some(syms) = &symbols {
1518                                if syms.contains(&symbol.to_lowercase()) {
1519                                    result.insert(symbol.clone(), ticker.clone());
1520                                }
1521                            } else {
1522                                result.insert(symbol.clone(), ticker.clone());
1523                            }
1524
1525                            // Cache the ticker payload
1526                            let mut tickers = self.tickers.lock().await;
1527                            tickers.insert(symbol, ticker);
1528                        }
1529                    }
1530
1531                    // Exit once all requested tickers have been observed
1532                    if let Some(syms) = &symbols {
1533                        if result.len() == syms.len() {
1534                            return Ok(result);
1535                        }
1536                    } else {
1537                        return Ok(result);
1538                    }
1539                } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1540                    // Handle single-ticker payloads
1541                    let symbol = ticker.symbol.clone();
1542                    result.insert(symbol.clone(), ticker.clone());
1543
1544                    // Cache the ticker payload
1545                    let mut tickers = self.tickers.lock().await;
1546                    tickers.insert(symbol, ticker);
1547
1548                    // Exit once all requested tickers have been observed
1549                    if let Some(syms) = &symbols {
1550                        if result.len() == syms.len() {
1551                            return Ok(result);
1552                        }
1553                    }
1554                }
1555            }
1556        }
1557    }
1558
1559    /// Processes an order book delta update (internal helper)
1560    ///
1561    /// # Arguments
1562    /// * `symbol` - Trading pair
1563    /// * `delta_message` - Raw WebSocket delta payload
1564    /// * `is_futures` - Whether the feed originates from the futures market
1565    ///
1566    /// # Returns
1567    /// Result of the update; returns a special error when resynchronization is required
1568    async fn handle_orderbook_delta(
1569        &self,
1570        symbol: &str,
1571        delta_message: &Value,
1572        is_futures: bool,
1573    ) -> Result<()> {
1574        use ccxt_core::types::orderbook::{OrderBookDelta, OrderBookEntry};
1575        use rust_decimal::Decimal;
1576
1577        // Parse the delta message
1578        let first_update_id = delta_message["U"]
1579            .as_i64()
1580            .ok_or_else(|| Error::invalid_request("Missing first update ID in delta message"))?;
1581
1582        let final_update_id = delta_message["u"]
1583            .as_i64()
1584            .ok_or_else(|| Error::invalid_request("Missing final update ID in delta message"))?;
1585
1586        let prev_final_update_id = if is_futures {
1587            delta_message["pu"].as_i64()
1588        } else {
1589            None
1590        };
1591
1592        let timestamp = delta_message["E"]
1593            .as_i64()
1594            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1595
1596        // Parse bid updates
1597        let mut bids = Vec::new();
1598        if let Some(bids_arr) = delta_message["b"].as_array() {
1599            for bid in bids_arr {
1600                if let (Some(price_str), Some(amount_str)) = (bid[0].as_str(), bid[1].as_str()) {
1601                    if let (Ok(price), Ok(amount)) =
1602                        (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1603                    {
1604                        bids.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1605                    }
1606                }
1607            }
1608        }
1609
1610        // Parse ask updates
1611        let mut asks = Vec::new();
1612        if let Some(asks_arr) = delta_message["a"].as_array() {
1613            for ask in asks_arr {
1614                if let (Some(price_str), Some(amount_str)) = (ask[0].as_str(), ask[1].as_str()) {
1615                    if let (Ok(price), Ok(amount)) =
1616                        (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1617                    {
1618                        asks.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1619                    }
1620                }
1621            }
1622        }
1623
1624        // Build the delta structure
1625        let delta = OrderBookDelta {
1626            symbol: symbol.to_string(),
1627            first_update_id,
1628            final_update_id,
1629            prev_final_update_id,
1630            timestamp,
1631            bids,
1632            asks,
1633        };
1634
1635        // Retrieve or create the cached order book
1636        let mut orderbooks = self.orderbooks.lock().await;
1637        let orderbook = orderbooks
1638            .entry(symbol.to_string())
1639            .or_insert_with(|| OrderBook::new(symbol.to_string(), timestamp));
1640
1641        // If the order book is not synchronized yet, buffer the delta
1642        if !orderbook.is_synced {
1643            orderbook.buffer_delta(delta);
1644            return Ok(());
1645        }
1646
1647        // Apply the delta to the order book
1648        if let Err(e) = orderbook.apply_delta(&delta, is_futures) {
1649            // Check whether a resynchronization cycle is needed
1650            if orderbook.needs_resync {
1651                tracing::warn!("Orderbook {} needs resync due to: {}", symbol, e);
1652                // Buffer the delta so it can be reused after resync
1653                orderbook.buffer_delta(delta);
1654                // Signal that resynchronization is required
1655                return Err(Error::invalid_request(format!("RESYNC_NEEDED: {}", e)));
1656            }
1657            return Err(Error::invalid_request(e));
1658        }
1659
1660        Ok(())
1661    }
1662
1663    /// Retrieves an order book snapshot and initializes cached state (internal helper)
1664    ///
1665    /// # Arguments
1666    /// * `exchange` - Exchange reference used for REST API calls
1667    /// * `symbol` - Trading pair identifier
1668    /// * `limit` - Depth limit to request
1669    /// * `is_futures` - Whether the symbol is a futures market
1670    ///
1671    /// # Returns
1672    /// Initialized order book structure
1673    async fn fetch_orderbook_snapshot(
1674        &self,
1675        exchange: &Binance,
1676        symbol: &str,
1677        limit: Option<i64>,
1678        is_futures: bool,
1679    ) -> Result<OrderBook> {
1680        // Fetch snapshot via REST API
1681        let mut params = HashMap::new();
1682        if let Some(l) = limit {
1683            // json! macro with simple values is infallible
1684            #[allow(clippy::disallowed_methods)]
1685            let limit_value = serde_json::json!(l);
1686            params.insert("limit".to_string(), limit_value);
1687        }
1688
1689        let mut snapshot = exchange.fetch_order_book(symbol, None).await?;
1690
1691        // Mark the snapshot as synchronized
1692        snapshot.is_synced = true;
1693
1694        // Apply buffered deltas captured before the snapshot
1695        let mut orderbooks = self.orderbooks.lock().await;
1696        if let Some(cached_ob) = orderbooks.get_mut(symbol) {
1697            // Transfer buffered deltas to the snapshot instance
1698            snapshot.buffered_deltas = cached_ob.buffered_deltas.clone();
1699
1700            // Apply buffered deltas to catch up
1701            if let Ok(processed) = snapshot.process_buffered_deltas(is_futures) {
1702                tracing::debug!("Processed {} buffered deltas for {}", processed, symbol);
1703            }
1704        }
1705
1706        // Update cache with the synchronized snapshot
1707        orderbooks.insert(symbol.to_string(), snapshot.clone());
1708
1709        Ok(snapshot)
1710    }
1711
1712    /// Watches a single order book stream (internal helper)
1713    ///
1714    /// # Arguments
1715    /// * `exchange` - Exchange reference
1716    /// * `symbol` - Lowercase trading pair
1717    /// * `limit` - Depth limit
1718    /// * `update_speed` - Update frequency (100 or 1000 ms)
1719    /// * `is_futures` - Whether the symbol is a futures market
1720    ///
1721    /// # Returns
1722    /// Order book snapshot enriched with streamed updates
1723    async fn watch_orderbook_internal(
1724        &self,
1725        exchange: &Binance,
1726        symbol: &str,
1727        limit: Option<i64>,
1728        update_speed: i32,
1729        is_futures: bool,
1730    ) -> Result<OrderBook> {
1731        // Construct the stream name
1732        let stream = if update_speed == 100 {
1733            format!("{}@depth@100ms", symbol.to_lowercase())
1734        } else {
1735            format!("{}@depth", symbol.to_lowercase())
1736        };
1737
1738        // Subscribe to depth updates
1739        self.client
1740            .subscribe(stream.clone(), Some(symbol.to_string()), None)
1741            .await?;
1742
1743        // Start buffering deltas before the snapshot is ready
1744        let snapshot_fetched = Arc::new(Mutex::new(false));
1745        let _snapshot_fetched_clone = snapshot_fetched.clone();
1746
1747        // Spawn a placeholder processing loop (actual handling occurs elsewhere)
1748        let _orderbooks_clone = self.orderbooks.clone();
1749        let _symbol_clone = symbol.to_string();
1750
1751        tokio::spawn(async move {
1752            // Placeholder: actual message handling occurs in the main loop
1753        });
1754
1755        // Give the stream time to accumulate initial deltas before fetching a snapshot
1756        tokio::time::sleep(Duration::from_millis(500)).await;
1757
1758        // Fetch the initial snapshot
1759        let _snapshot = self
1760            .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1761            .await?;
1762
1763        *snapshot_fetched.lock().await = true;
1764
1765        // Main processing loop
1766        loop {
1767            if let Some(message) = self.client.receive().await {
1768                // Skip subscription acknowledgements
1769                if message.get("result").is_some() {
1770                    continue;
1771                }
1772
1773                // Process depth updates only
1774                if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1775                    if event_type == "depthUpdate" {
1776                        match self
1777                            .handle_orderbook_delta(symbol, &message, is_futures)
1778                            .await
1779                        {
1780                            Ok(_) => {
1781                                // Return the updated order book once synchronized
1782                                let orderbooks = self.orderbooks.lock().await;
1783                                if let Some(ob) = orderbooks.get(symbol) {
1784                                    if ob.is_synced {
1785                                        return Ok(ob.clone());
1786                                    }
1787                                }
1788                            }
1789                            Err(e) => {
1790                                let err_msg = e.to_string();
1791
1792                                // Initiate resynchronization when instructed
1793                                if err_msg.contains("RESYNC_NEEDED") {
1794                                    tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1795
1796                                    let current_time = chrono::Utc::now().timestamp_millis();
1797                                    let should_resync = {
1798                                        let orderbooks = self.orderbooks.lock().await;
1799                                        if let Some(ob) = orderbooks.get(symbol) {
1800                                            ob.should_resync(current_time)
1801                                        } else {
1802                                            true
1803                                        }
1804                                    };
1805
1806                                    if should_resync {
1807                                        tracing::info!("Initiating resync for {}", symbol);
1808
1809                                        // Reset local state in preparation for resync
1810                                        {
1811                                            let mut orderbooks = self.orderbooks.lock().await;
1812                                            if let Some(ob) = orderbooks.get_mut(symbol) {
1813                                                ob.reset_for_resync();
1814                                                ob.mark_resync_initiated(current_time);
1815                                            }
1816                                        }
1817
1818                                        // Allow some deltas to buffer before fetching a new snapshot
1819                                        tokio::time::sleep(Duration::from_millis(500)).await;
1820
1821                                        // Fetch a fresh snapshot and continue processing
1822                                        match self
1823                                            .fetch_orderbook_snapshot(
1824                                                exchange, symbol, limit, is_futures,
1825                                            )
1826                                            .await
1827                                        {
1828                                            Ok(_) => {
1829                                                tracing::info!(
1830                                                    "Resync completed successfully for {}",
1831                                                    symbol
1832                                                );
1833                                                continue;
1834                                            }
1835                                            Err(resync_err) => {
1836                                                tracing::error!(
1837                                                    "Resync failed for {}: {}",
1838                                                    symbol,
1839                                                    resync_err
1840                                                );
1841                                                return Err(resync_err);
1842                                            }
1843                                        }
1844                                    } else {
1845                                        tracing::debug!(
1846                                            "Resync rate limited for {}, skipping",
1847                                            symbol
1848                                        );
1849                                        continue;
1850                                    }
1851                                } else {
1852                                    tracing::error!(
1853                                        "Failed to handle orderbook delta: {}",
1854                                        err_msg
1855                                    );
1856                                    continue;
1857                                }
1858                            }
1859                        }
1860                    }
1861                }
1862            }
1863        }
1864    }
1865
1866    /// Watches multiple order book streams (internal helper)
1867    ///
1868    /// # Arguments
1869    /// * `exchange` - Exchange reference
1870    /// * `symbols` - List of trading pairs
1871    /// * `limit` - Requested depth level
1872    /// * `update_speed` - Update frequency
1873    /// * `is_futures` - Whether the symbols are futures markets
1874    ///
1875    /// # Returns
1876    /// Mapping of symbol to order book data
1877    async fn watch_orderbooks_internal(
1878        &self,
1879        exchange: &Binance,
1880        symbols: Vec<String>,
1881        limit: Option<i64>,
1882        update_speed: i32,
1883        is_futures: bool,
1884    ) -> Result<HashMap<String, OrderBook>> {
1885        // Binance enforces a 200-symbol limit per connection
1886        if symbols.len() > 200 {
1887            return Err(Error::invalid_request(
1888                "Binance supports max 200 symbols per connection",
1889            ));
1890        }
1891
1892        // Subscribe to each symbol
1893        for symbol in &symbols {
1894            let stream = if update_speed == 100 {
1895                format!("{}@depth@100ms", symbol.to_lowercase())
1896            } else {
1897                format!("{}@depth", symbol.to_lowercase())
1898            };
1899
1900            self.client
1901                .subscribe(stream, Some(symbol.clone()), None)
1902                .await?;
1903        }
1904
1905        // Allow messages to accumulate before snapshot retrieval
1906        tokio::time::sleep(Duration::from_millis(500)).await;
1907
1908        // Fetch snapshots for all symbols
1909        for symbol in &symbols {
1910            let _ = self
1911                .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1912                .await;
1913        }
1914
1915        // Process incremental updates
1916        let mut result = HashMap::new();
1917        let mut update_count = 0;
1918
1919        while update_count < symbols.len() {
1920            if let Some(message) = self.client.receive().await {
1921                // Skip subscription acknowledgements
1922                if message.get("result").is_some() {
1923                    continue;
1924                }
1925
1926                // Handle depth updates
1927                if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1928                    if event_type == "depthUpdate" {
1929                        if let Some(msg_symbol) = message.get("s").and_then(|v| v.as_str()) {
1930                            if let Err(e) = self
1931                                .handle_orderbook_delta(msg_symbol, &message, is_futures)
1932                                .await
1933                            {
1934                                tracing::error!("Failed to handle orderbook delta: {}", e);
1935                                continue;
1936                            }
1937
1938                            update_count += 1;
1939                        }
1940                    }
1941                }
1942            }
1943        }
1944
1945        // Collect the resulting order books
1946        let orderbooks = self.orderbooks.lock().await;
1947        for symbol in &symbols {
1948            if let Some(ob) = orderbooks.get(symbol) {
1949                result.insert(symbol.clone(), ob.clone());
1950            }
1951        }
1952
1953        Ok(result)
1954    }
1955
1956    ///
1957    /// # Arguments
1958    /// * `symbol` - Trading pair identifier
1959    ///
1960    /// # Returns
1961    /// Ticker snapshot when available
1962    pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1963        let tickers = self.tickers.lock().await;
1964        tickers.get(symbol).cloned()
1965    }
1966
1967    /// Returns all cached ticker snapshots
1968    pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1969        let tickers = self.tickers.lock().await;
1970        tickers.clone()
1971    }
1972
1973    /// Handles balance update messages (internal helper)
1974    ///
1975    /// # Arguments
1976    /// * `message` - WebSocket payload
1977    /// * `account_type` - Account category (spot/future/delivery, etc.)
1978    ///
1979    /// # Returns
1980    /// Result of the balance update processing
1981    async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1982        use rust_decimal::Decimal;
1983        use std::str::FromStr;
1984
1985        // Identify the event type
1986        let event_type = message
1987            .get("e")
1988            .and_then(|e| e.as_str())
1989            .ok_or_else(|| Error::invalid_request("Missing event type in balance message"))?;
1990
1991        // Retrieve or create cached balances for the account
1992        let mut balances = self.balances.write().await;
1993        let balance = balances
1994            .entry(account_type.to_string())
1995            .or_insert_with(Balance::new);
1996
1997        match event_type {
1998            // Spot account incremental balance update
1999            "balanceUpdate" => {
2000                let asset = message
2001                    .get("a")
2002                    .and_then(|a| a.as_str())
2003                    .ok_or_else(|| Error::invalid_request("Missing asset in balanceUpdate"))?;
2004
2005                let delta_str = message
2006                    .get("d")
2007                    .and_then(|d| d.as_str())
2008                    .ok_or_else(|| Error::invalid_request("Missing delta in balanceUpdate"))?;
2009
2010                let delta = Decimal::from_str(delta_str)
2011                    .map_err(|e| Error::invalid_request(format!("Invalid delta value: {}", e)))?;
2012
2013                // Apply delta to the cached balance
2014                balance.apply_delta(asset.to_string(), delta);
2015            }
2016
2017            // Spot account full balance update
2018            "outboundAccountPosition" => {
2019                if let Some(balances_array) = message.get("B").and_then(|b| b.as_array()) {
2020                    for balance_item in balances_array {
2021                        let asset =
2022                            balance_item
2023                                .get("a")
2024                                .and_then(|a| a.as_str())
2025                                .ok_or_else(|| {
2026                                    Error::invalid_request("Missing asset in balance item")
2027                                })?;
2028
2029                        let free_str = balance_item
2030                            .get("f")
2031                            .and_then(|f| f.as_str())
2032                            .ok_or_else(|| Error::invalid_request("Missing free balance"))?;
2033
2034                        let locked_str = balance_item
2035                            .get("l")
2036                            .and_then(|l| l.as_str())
2037                            .ok_or_else(|| Error::invalid_request("Missing locked balance"))?;
2038
2039                        let free = Decimal::from_str(free_str).map_err(|e| {
2040                            Error::invalid_request(format!("Invalid free value: {}", e))
2041                        })?;
2042
2043                        let locked = Decimal::from_str(locked_str).map_err(|e| {
2044                            Error::invalid_request(format!("Invalid locked value: {}", e))
2045                        })?;
2046
2047                        // Update the cached balance snapshot
2048                        balance.update_balance(asset.to_string(), free, locked);
2049                    }
2050                }
2051            }
2052
2053            // Futures/delivery account updates
2054            "ACCOUNT_UPDATE" => {
2055                if let Some(account_data) = message.get("a") {
2056                    // Parse balance array
2057                    if let Some(balances_array) = account_data.get("B").and_then(|b| b.as_array()) {
2058                        for balance_item in balances_array {
2059                            let asset = balance_item.get("a").and_then(|a| a.as_str()).ok_or_else(
2060                                || Error::invalid_request("Missing asset in balance item"),
2061                            )?;
2062
2063                            let wallet_balance_str = balance_item
2064                                .get("wb")
2065                                .and_then(|wb| wb.as_str())
2066                                .ok_or_else(|| Error::invalid_request("Missing wallet balance"))?;
2067
2068                            let wallet_balance =
2069                                Decimal::from_str(wallet_balance_str).map_err(|e| {
2070                                    Error::invalid_request(format!("Invalid wallet balance: {}", e))
2071                                })?;
2072
2073                            // Optional cross wallet balance
2074                            let cross_wallet = balance_item
2075                                .get("cw")
2076                                .and_then(|cw| cw.as_str())
2077                                .and_then(|s| Decimal::from_str(s).ok());
2078
2079                            // Update wallet balance snapshot
2080                            balance.update_wallet(asset.to_string(), wallet_balance, cross_wallet);
2081                        }
2082                    }
2083
2084                    // Positions are contained in account_data["P"]; handling occurs in watch_positions
2085                }
2086            }
2087
2088            _ => {
2089                return Err(Error::invalid_request(format!(
2090                    "Unknown balance event type: {}",
2091                    event_type
2092                )));
2093            }
2094        }
2095
2096        Ok(())
2097    }
2098
2099    /// Parses a WebSocket trade message
2100    ///
2101    /// Extracts trade information from an `executionReport` event
2102    ///
2103    /// # Arguments
2104    /// * `data` - Raw WebSocket JSON payload
2105    ///
2106    /// # Returns
2107    /// Parsed `Trade` structure
2108    fn parse_ws_trade(&self, data: &Value) -> Result<Trade> {
2109        use ccxt_core::types::{Fee, OrderSide, OrderType, TakerOrMaker};
2110        use rust_decimal::Decimal;
2111        use std::str::FromStr;
2112
2113        // Extract symbol field
2114        let symbol = data
2115            .get("s")
2116            .and_then(|v| v.as_str())
2117            .ok_or_else(|| Error::invalid_request("Missing symbol field".to_string()))?
2118            .to_string();
2119
2120        // Trade ID (field `t`)
2121        let id = data
2122            .get("t")
2123            .and_then(|v| v.as_i64())
2124            .map(|v| v.to_string());
2125
2126        // Trade timestamp (field `T`)
2127        let timestamp = data.get("T").and_then(|v| v.as_i64()).unwrap_or(0);
2128
2129        // Executed price (field `L` - last executed price)
2130        let price = data
2131            .get("L")
2132            .and_then(|v| v.as_str())
2133            .and_then(|s| Decimal::from_str(s).ok())
2134            .unwrap_or(Decimal::ZERO);
2135
2136        // Executed amount (field `l` - last executed quantity)
2137        let amount = data
2138            .get("l")
2139            .and_then(|v| v.as_str())
2140            .and_then(|s| Decimal::from_str(s).ok())
2141            .unwrap_or(Decimal::ZERO);
2142
2143        // Quote asset amount (field `Y` - last quote asset transacted quantity)
2144        let cost = data
2145            .get("Y")
2146            .and_then(|v| v.as_str())
2147            .and_then(|s| Decimal::from_str(s).ok())
2148            .or_else(|| {
2149                // Fallback: compute from price * amount when `Y` is unavailable
2150                if price > Decimal::ZERO && amount > Decimal::ZERO {
2151                    Some(price * amount)
2152                } else {
2153                    None
2154                }
2155            });
2156
2157        // Trade side (field `S`)
2158        let side = data
2159            .get("S")
2160            .and_then(|v| v.as_str())
2161            .and_then(|s| match s.to_uppercase().as_str() {
2162                "BUY" => Some(OrderSide::Buy),
2163                "SELL" => Some(OrderSide::Sell),
2164                _ => None,
2165            })
2166            .unwrap_or(OrderSide::Buy);
2167
2168        // Order type (field `o`)
2169        let trade_type =
2170            data.get("o")
2171                .and_then(|v| v.as_str())
2172                .and_then(|s| match s.to_uppercase().as_str() {
2173                    "LIMIT" => Some(OrderType::Limit),
2174                    "MARKET" => Some(OrderType::Market),
2175                    _ => None,
2176                });
2177
2178        // Associated order ID (field `i`)
2179        let order_id = data
2180            .get("i")
2181            .and_then(|v| v.as_i64())
2182            .map(|v| v.to_string());
2183
2184        // Maker/taker flag (field `m` - true when buyer is the maker)
2185        let taker_or_maker = data.get("m").and_then(|v| v.as_bool()).map(|is_maker| {
2186            if is_maker {
2187                TakerOrMaker::Maker
2188            } else {
2189                TakerOrMaker::Taker
2190            }
2191        });
2192
2193        // Fee information (fields `n` = fee amount, `N` = fee currency)
2194        let fee = if let Some(fee_cost_str) = data.get("n").and_then(|v| v.as_str()) {
2195            if let Ok(fee_cost) = Decimal::from_str(fee_cost_str) {
2196                let currency = data
2197                    .get("N")
2198                    .and_then(|v| v.as_str())
2199                    .unwrap_or("UNKNOWN")
2200                    .to_string();
2201                Some(Fee {
2202                    currency,
2203                    cost: fee_cost,
2204                    rate: None,
2205                })
2206            } else {
2207                None
2208            }
2209        } else {
2210            None
2211        };
2212
2213        // Derive ISO8601 timestamp string when possible
2214        let datetime = chrono::DateTime::from_timestamp_millis(timestamp)
2215            .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string());
2216
2217        // Preserve the raw payload in the `info` map
2218        let mut info = HashMap::new();
2219        if let Value::Object(map) = data {
2220            for (k, v) in map.iter() {
2221                info.insert(k.clone(), v.clone());
2222            }
2223        }
2224
2225        Ok(Trade {
2226            id,
2227            order: order_id,
2228            symbol,
2229            trade_type,
2230            side,
2231            taker_or_maker,
2232            price: Price::from(price),
2233            amount: Amount::from(amount),
2234            cost: cost.map(Cost::from),
2235            fee,
2236            timestamp,
2237            datetime,
2238            info,
2239        })
2240    }
2241
2242    /// Filters cached personal trades by symbol, time range, and limit
2243    ///
2244    /// # Arguments
2245    /// * `symbol` - Optional symbol filter
2246    /// * `since` - Optional starting timestamp (inclusive)
2247    /// * `limit` - Optional maximum number of trades to return
2248    async fn filter_my_trades(
2249        &self,
2250        symbol: Option<&str>,
2251        since: Option<i64>,
2252        limit: Option<usize>,
2253    ) -> Result<Vec<Trade>> {
2254        let trades_map = self.my_trades.read().await;
2255
2256        // Filter by symbol when provided
2257        let mut trades: Vec<Trade> = if let Some(sym) = symbol {
2258            trades_map
2259                .get(sym)
2260                .map(|symbol_trades| symbol_trades.iter().cloned().collect())
2261                .unwrap_or_default()
2262        } else {
2263            trades_map
2264                .values()
2265                .flat_map(|symbol_trades| symbol_trades.iter().cloned())
2266                .collect()
2267        };
2268
2269        // Apply `since` filter when provided
2270        if let Some(since_ts) = since {
2271            trades.retain(|trade| trade.timestamp >= since_ts);
2272        }
2273
2274        // Sort by timestamp descending (latest first)
2275        trades.sort_by(|a, b| {
2276            let ts_a = a.timestamp;
2277            let ts_b = b.timestamp;
2278            ts_b.cmp(&ts_a)
2279        });
2280
2281        // Apply optional limit
2282        if let Some(lim) = limit {
2283            trades.truncate(lim);
2284        }
2285
2286        Ok(trades)
2287    }
2288
2289    /// Parses a WebSocket position payload
2290    ///
2291    /// # Arguments
2292    /// * `data` - Position data from the ACCOUNT_UPDATE event (`P` array element)
2293    ///
2294    /// # Returns
2295    /// Parsed `Position` instance
2296    ///
2297    /// # Binance WebSocket Position Payload Example
2298    /// ```json
2299    /// {
2300    ///   "s": "BTCUSDT",           // Trading pair
2301    ///   "pa": "-0.089",           // Position amount (negative indicates short)
2302    ///   "ep": "19700.03933",      // Entry price
2303    ///   "cr": "-1260.24809979",   // Accumulated realized PnL
2304    ///   "up": "1.53058860",       // Unrealized PnL
2305    ///   "mt": "isolated",         // Margin mode: isolated/cross
2306    ///   "iw": "87.13658940",      // Isolated wallet balance
2307    ///   "ps": "BOTH",             // Position side: BOTH/LONG/SHORT
2308    ///   "ma": "USDT"              // Margin asset
2309    /// }
2310    /// ```
2311    async fn parse_ws_position(&self, data: &Value) -> Result<Position> {
2312        // Extract required fields
2313        let symbol = data["s"]
2314            .as_str()
2315            .ok_or_else(|| Error::invalid_request("Missing symbol field"))?
2316            .to_string();
2317
2318        let position_amount_str = data["pa"]
2319            .as_str()
2320            .ok_or_else(|| Error::invalid_request("Missing position amount"))?;
2321
2322        let position_amount = position_amount_str
2323            .parse::<f64>()
2324            .map_err(|e| Error::invalid_request(format!("Invalid position amount: {}", e)))?;
2325
2326        // Extract position side
2327        let position_side = data["ps"]
2328            .as_str()
2329            .ok_or_else(|| Error::invalid_request("Missing position side"))?
2330            .to_uppercase();
2331
2332        // Determine hedged mode and actual side
2333        // - If ps = BOTH, hedged = false and use sign of `pa` for actual side
2334        // - If ps = LONG/SHORT, hedged = true and side equals ps
2335        let (side, hedged) = if position_side == "BOTH" {
2336            let actual_side = if position_amount < 0.0 {
2337                "short"
2338            } else {
2339                "long"
2340            };
2341            (actual_side.to_string(), false)
2342        } else {
2343            (position_side.to_lowercase(), true)
2344        };
2345
2346        // Extract additional fields
2347        let entry_price = data["ep"].as_str().and_then(|s| s.parse::<f64>().ok());
2348        let unrealized_pnl = data["up"].as_str().and_then(|s| s.parse::<f64>().ok());
2349        let realized_pnl = data["cr"].as_str().and_then(|s| s.parse::<f64>().ok());
2350        let margin_mode = data["mt"].as_str().map(|s| s.to_string());
2351        let initial_margin = data["iw"].as_str().and_then(|s| s.parse::<f64>().ok());
2352        let _margin_asset = data["ma"].as_str().map(|s| s.to_string());
2353
2354        // Construct the `Position` object
2355        Ok(Position {
2356            info: data.clone(),
2357            id: None,
2358            symbol,
2359            side: Some(side),
2360            contracts: Some(position_amount.abs()), // Absolute contract amount
2361            contract_size: None,
2362            entry_price,
2363            mark_price: None,
2364            notional: None,
2365            leverage: None,
2366            collateral: initial_margin, // Use isolated wallet balance as collateral
2367            initial_margin,
2368            initial_margin_percentage: None,
2369            maintenance_margin: None,
2370            maintenance_margin_percentage: None,
2371            unrealized_pnl,
2372            realized_pnl,
2373            liquidation_price: None,
2374            margin_ratio: None,
2375            margin_mode,
2376            hedged: Some(hedged),
2377            percentage: None,
2378            position_side: None,
2379            dual_side_position: None,
2380            timestamp: Some(chrono::Utc::now().timestamp_millis()),
2381            datetime: Some(chrono::Utc::now().to_rfc3339()),
2382        })
2383    }
2384
2385    /// Filters cached positions by symbol, time range, and limit
2386    ///
2387    /// # Arguments
2388    /// * `symbols` - Optional list of symbols to include
2389    /// * `since` - Optional starting timestamp (inclusive)
2390    /// * `limit` - Optional maximum number of positions to return
2391    ///
2392    /// # Returns
2393    /// Filtered list of positions
2394    async fn filter_positions(
2395        &self,
2396        symbols: Option<&[String]>,
2397        since: Option<i64>,
2398        limit: Option<usize>,
2399    ) -> Result<Vec<Position>> {
2400        let positions_map = self.positions.read().await;
2401
2402        // Filter by symbol list when provided
2403        let mut positions: Vec<Position> = if let Some(syms) = symbols {
2404            syms.iter()
2405                .filter_map(|sym| positions_map.get(sym))
2406                .flat_map(|side_map| side_map.values().cloned())
2407                .collect()
2408        } else {
2409            positions_map
2410                .values()
2411                .flat_map(|side_map| side_map.values().cloned())
2412                .collect()
2413        };
2414
2415        // Apply `since` filter when provided
2416        if let Some(since_ts) = since {
2417            positions.retain(|pos| {
2418                pos.timestamp
2419                    .map(|ts| ts as i64 >= since_ts)
2420                    .unwrap_or(false)
2421            });
2422        }
2423
2424        // Sort by timestamp descending (latest first)
2425        positions.sort_by(|a, b| {
2426            let ts_a = a.timestamp.unwrap_or(0);
2427            let ts_b = b.timestamp.unwrap_or(0);
2428            ts_b.cmp(&ts_a)
2429        });
2430
2431        // Apply optional limit
2432        if let Some(lim) = limit {
2433            positions.truncate(lim);
2434        }
2435
2436        Ok(positions)
2437    }
2438}
2439
2440impl Binance {
2441    /// Subscribes to the ticker stream for a unified symbol
2442    ///
2443    /// # Arguments
2444    /// * `symbol` - Unified trading pair identifier
2445    ///
2446    /// # Returns
2447    /// Result of the subscription call
2448    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
2449        let ws = self.create_ws();
2450        ws.connect().await?;
2451
2452        // Convert symbol format BTC/USDT -> btcusdt
2453        let binance_symbol = symbol.replace('/', "").to_lowercase();
2454        ws.subscribe_ticker(&binance_symbol).await
2455    }
2456
2457    /// Subscribes to the trade stream for a unified symbol
2458    ///
2459    /// # Arguments
2460    /// * `symbol` - Unified trading pair identifier
2461    ///
2462    /// # Returns
2463    /// Result of the subscription call
2464    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
2465        let ws = self.create_ws();
2466        ws.connect().await?;
2467
2468        let binance_symbol = symbol.replace('/', "").to_lowercase();
2469        ws.subscribe_trades(&binance_symbol).await
2470    }
2471
2472    /// Subscribes to the order book stream for a unified symbol
2473    ///
2474    /// # Arguments
2475    /// * `symbol` - Unified trading pair identifier
2476    /// * `levels` - Optional depth limit (default 20)
2477    ///
2478    /// # Returns
2479    /// Result of the subscription call
2480    pub async fn subscribe_orderbook(&self, symbol: &str, levels: Option<u32>) -> Result<()> {
2481        let ws = self.create_ws();
2482        ws.connect().await?;
2483
2484        let binance_symbol = symbol.replace('/', "").to_lowercase();
2485        let depth_levels = levels.unwrap_or(20);
2486        ws.subscribe_orderbook(&binance_symbol, depth_levels, "1000ms")
2487            .await
2488    }
2489
2490    /// Subscribes to the candlestick stream for a unified symbol
2491    ///
2492    /// # Arguments
2493    /// * `symbol` - Unified trading pair identifier
2494    /// * `interval` - Candlestick interval identifier
2495    ///
2496    /// # Returns
2497    /// Result of the subscription call
2498    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
2499        let ws = self.create_ws();
2500        ws.connect().await?;
2501
2502        let binance_symbol = symbol.replace('/', "").to_lowercase();
2503        ws.subscribe_kline(&binance_symbol, interval).await
2504    }
2505
2506    /// Watches a ticker stream for a single unified symbol
2507    ///
2508    /// # Arguments
2509    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2510    /// * `params` - Optional parameters
2511    ///   - `name`: Channel name (ticker/miniTicker, defaults to ticker)
2512    ///
2513    /// # Returns
2514    /// Parsed ticker structure
2515    ///
2516    /// # Example
2517    /// ```rust,no_run
2518    /// # use ccxt_exchanges::binance::Binance;
2519    /// # use ccxt_core::ExchangeConfig;
2520    /// # use ccxt_core::types::Price;
2521    /// # async fn example() -> ccxt_core::error::Result<()> {
2522    /// let exchange = Binance::new(ExchangeConfig::default())?;
2523    /// let ticker = exchange.watch_ticker("BTC/USDT", None).await?;
2524    /// println!("Price: {}", ticker.last.unwrap_or(Price::ZERO));
2525    /// # Ok(())
2526    /// # }
2527    /// ```
2528    pub async fn watch_ticker(
2529        &self,
2530        symbol: &str,
2531        params: Option<HashMap<String, Value>>,
2532    ) -> Result<Ticker> {
2533        // Load market metadata
2534        self.load_markets(false).await?;
2535
2536        // Convert unified symbol to exchange format
2537        let market = self.base.market(symbol).await?;
2538        let binance_symbol = market.id.to_lowercase();
2539
2540        // Select channel name
2541        let channel_name = if let Some(p) = &params {
2542            p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2543        } else {
2544            "ticker"
2545        };
2546
2547        // Establish WebSocket connection
2548        let ws = self.create_ws();
2549        ws.connect().await?;
2550
2551        // Watch ticker
2552        ws.watch_ticker_internal(&binance_symbol, channel_name)
2553            .await
2554    }
2555
2556    /// Watches ticker streams for multiple unified symbols
2557    ///
2558    /// # Arguments
2559    /// * `symbols` - Optional list of unified trading pairs (None subscribes to all)
2560    /// * `params` - Optional parameters
2561    ///   - `name`: Channel name (ticker/miniTicker, defaults to ticker)
2562    ///
2563    /// # Returns
2564    /// Mapping of symbol to ticker data
2565    ///
2566    /// # Example
2567    /// ```rust,no_run
2568    /// # use ccxt_exchanges::binance::Binance;
2569    /// # use ccxt_core::ExchangeConfig;
2570    /// # async fn example() -> ccxt_core::error::Result<()> {
2571    /// let exchange = Binance::new(ExchangeConfig::default())?;
2572    ///
2573    /// // Watch a subset of symbols
2574    /// let tickers = exchange.watch_tickers(
2575    ///     Some(vec!["BTC/USDT".to_string(), "ETH/USDT".to_string()]),
2576    ///     None
2577    /// ).await?;
2578    ///
2579    /// // Watch all symbols
2580    /// let all_tickers = exchange.watch_tickers(None, None).await?;
2581    /// # Ok(())
2582    /// # }
2583    /// ```
2584    pub async fn watch_tickers(
2585        &self,
2586        symbols: Option<Vec<String>>,
2587        params: Option<HashMap<String, Value>>,
2588    ) -> Result<HashMap<String, Ticker>> {
2589        // Load market metadata
2590        self.load_markets(false).await?;
2591
2592        // Determine channel name
2593        let channel_name = if let Some(p) = &params {
2594            p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2595        } else {
2596            "ticker"
2597        };
2598
2599        // Validate channel selection
2600        if channel_name == "bookTicker" {
2601            return Err(Error::invalid_request(
2602                "To subscribe for bids-asks, use watch_bids_asks() method instead",
2603            ));
2604        }
2605
2606        // Convert unified symbols to exchange format
2607        let binance_symbols = if let Some(syms) = symbols {
2608            let mut result = Vec::new();
2609            for symbol in syms {
2610                let market = self.base.market(&symbol).await?;
2611                result.push(market.id.to_lowercase());
2612            }
2613            Some(result)
2614        } else {
2615            None
2616        };
2617
2618        // Establish WebSocket connection
2619        let ws = self.create_ws();
2620        ws.connect().await?;
2621
2622        // Watch tickers
2623        ws.watch_tickers_internal(binance_symbols, channel_name)
2624            .await
2625    }
2626
2627    /// Watches the mark price stream for a futures market
2628    ///
2629    /// # Arguments
2630    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT:USDT")
2631    /// * `params` - Optional parameters
2632    ///   - `use1sFreq`: Whether to use 1-second updates (defaults to true)
2633    ///
2634    /// # Returns
2635    /// Ticker structure representing the mark price
2636    ///
2637    /// # Example
2638    /// ```rust,no_run
2639    /// # use ccxt_exchanges::binance::Binance;
2640    /// # use ccxt_core::ExchangeConfig;
2641    /// # use std::collections::HashMap;
2642    /// # use serde_json::json;
2643    /// # async fn example() -> ccxt_core::error::Result<()> {
2644    /// let exchange = Binance::new(ExchangeConfig::default())?;
2645    ///
2646    /// // Use 1-second updates
2647    /// let ticker = exchange.watch_mark_price("BTC/USDT:USDT", None).await?;
2648    ///
2649    /// // Use 3-second updates
2650    /// let mut params = HashMap::new();
2651    /// params.insert("use1sFreq".to_string(), json!(false));
2652    /// let ticker = exchange.watch_mark_price("BTC/USDT:USDT", Some(params)).await?;
2653    /// # Ok(())
2654    /// # }
2655    /// ```
2656    pub async fn watch_mark_price(
2657        &self,
2658        symbol: &str,
2659        params: Option<HashMap<String, Value>>,
2660    ) -> Result<Ticker> {
2661        // Load market metadata
2662        self.load_markets(false).await?;
2663
2664        // Ensure the symbol belongs to a futures market
2665        let market = self.base.market(symbol).await?;
2666        if market.market_type != MarketType::Swap && market.market_type != MarketType::Futures {
2667            return Err(Error::invalid_request(format!(
2668                "watch_mark_price() does not support {} markets",
2669                market.market_type
2670            )));
2671        }
2672
2673        let binance_symbol = market.id.to_lowercase();
2674
2675        // Determine update frequency
2676        let use_1s_freq = if let Some(p) = &params {
2677            p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2678        } else {
2679            true
2680        };
2681
2682        // Construct channel name
2683        let channel_name = if use_1s_freq {
2684            "markPrice@1s"
2685        } else {
2686            "markPrice"
2687        };
2688
2689        // Establish WebSocket connection
2690        let ws = self.create_ws();
2691        ws.connect().await?;
2692
2693        // Watch mark price
2694        ws.watch_ticker_internal(&binance_symbol, channel_name)
2695            .await
2696    }
2697
2698    /// Watches an order book stream for a unified symbol
2699    ///
2700    /// # Arguments
2701    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2702    /// * `limit` - Optional depth limit (defaults to unlimited)
2703    /// * `params` - Optional parameters
2704    ///   - `speed`: Update frequency (100 or 1000 ms, defaults to 100)
2705    ///
2706    /// # Returns
2707    /// Order book snapshot populated with streaming updates
2708    ///
2709    /// # Example
2710    /// ```rust,no_run
2711    /// # use ccxt_exchanges::binance::Binance;
2712    /// # use ccxt_core::ExchangeConfig;
2713    /// # use std::collections::HashMap;
2714    /// # use serde_json::json;
2715    /// # async fn example() -> ccxt_core::error::Result<()> {
2716    /// let exchange = Binance::new(ExchangeConfig::default())?;
2717    ///
2718    /// // Watch order book with 100 ms updates
2719    /// let orderbook = exchange.watch_order_book("BTC/USDT", None, None).await?;
2720    /// println!("Best bid: {:?}", orderbook.best_bid());
2721    /// println!("Best ask: {:?}", orderbook.best_ask());
2722    ///
2723    /// // Watch order book limited to 100 levels with 1000 ms updates
2724    /// let mut params = HashMap::new();
2725    /// params.insert("speed".to_string(), json!(1000));
2726    /// let orderbook = exchange.watch_order_book(
2727    ///     "BTC/USDT",
2728    ///     Some(100),
2729    ///     Some(params)
2730    /// ).await?;
2731    /// # Ok(())
2732    /// # }
2733    /// ```
2734    pub async fn watch_order_book(
2735        &self,
2736        symbol: &str,
2737        limit: Option<i64>,
2738        params: Option<HashMap<String, Value>>,
2739    ) -> Result<OrderBook> {
2740        // Load market metadata
2741        self.load_markets(false).await?;
2742
2743        // Resolve market details
2744        let market = self.base.market(symbol).await?;
2745        let binance_symbol = market.id.to_lowercase();
2746
2747        // Determine whether this is a futures market
2748        let is_futures =
2749            market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2750
2751        // Determine update speed
2752        let update_speed = if let Some(p) = &params {
2753            p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2754        } else {
2755            100
2756        };
2757
2758        // Validate update speed
2759        if update_speed != 100 && update_speed != 1000 {
2760            return Err(Error::invalid_request(
2761                "Update speed must be 100 or 1000 milliseconds",
2762            ));
2763        }
2764
2765        // Establish WebSocket connection
2766        let ws = self.create_ws();
2767        ws.connect().await?;
2768
2769        // Watch the order book
2770        ws.watch_orderbook_internal(self, &binance_symbol, limit, update_speed, is_futures)
2771            .await
2772    }
2773
2774    /// Watches order books for multiple symbols
2775    ///
2776    /// # Arguments
2777    /// * `symbols` - List of trading pairs (maximum 200)
2778    /// * `limit` - Optional depth limit
2779    /// * `params` - Optional parameters
2780    ///   - `speed`: Update frequency (100 or 1000 ms)
2781    ///
2782    /// # Returns
2783    /// Mapping of symbol to corresponding order book
2784    ///
2785    /// # Example
2786    /// ```rust,no_run
2787    /// # use ccxt_exchanges::binance::Binance;
2788    /// # use ccxt_core::ExchangeConfig;
2789    /// # async fn example() -> ccxt_core::error::Result<()> {
2790    /// let exchange = Binance::new(ExchangeConfig::default())?;
2791    ///
2792    /// let symbols = vec![
2793    ///     "BTC/USDT".to_string(),
2794    ///     "ETH/USDT".to_string(),
2795    /// ];
2796    ///
2797    /// let orderbooks = exchange.watch_order_books(symbols, None, None).await?;
2798    /// for (symbol, ob) in orderbooks {
2799    ///     println!("{}: spread = {:?}", symbol, ob.spread());
2800    /// }
2801    /// # Ok(())
2802    /// # }
2803    /// ```
2804    pub async fn watch_order_books(
2805        &self,
2806        symbols: Vec<String>,
2807        limit: Option<i64>,
2808        params: Option<HashMap<String, Value>>,
2809    ) -> Result<HashMap<String, OrderBook>> {
2810        // Enforce symbol count constraints
2811        if symbols.is_empty() {
2812            return Err(Error::invalid_request("Symbols list cannot be empty"));
2813        }
2814
2815        if symbols.len() > 200 {
2816            return Err(Error::invalid_request(
2817                "Binance supports max 200 symbols per connection",
2818            ));
2819        }
2820
2821        // Load market metadata
2822        self.load_markets(false).await?;
2823
2824        // Convert symbols to exchange format and ensure consistent market type
2825        let mut binance_symbols = Vec::new();
2826        let mut is_futures = false;
2827
2828        for symbol in &symbols {
2829            let market = self.base.market(symbol).await?;
2830            binance_symbols.push(market.id.to_lowercase());
2831
2832            let current_is_futures =
2833                market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2834            if !binance_symbols.is_empty() && current_is_futures != is_futures {
2835                return Err(Error::invalid_request(
2836                    "Cannot mix spot and futures markets in watch_order_books",
2837                ));
2838            }
2839            is_futures = current_is_futures;
2840        }
2841
2842        // Determine update speed
2843        let update_speed = if let Some(p) = &params {
2844            p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2845        } else {
2846            100
2847        };
2848
2849        // Establish WebSocket connection
2850        let ws = self.create_ws();
2851        ws.connect().await?;
2852
2853        // Watch order books
2854        ws.watch_orderbooks_internal(self, binance_symbols, limit, update_speed, is_futures)
2855            .await
2856    }
2857
2858    /// Watches mark prices for multiple futures symbols
2859    ///
2860    /// # Arguments
2861    /// * `symbols` - Optional list of symbols (None subscribes to all)
2862    /// * `params` - Optional parameters
2863    ///   - `use1sFreq`: Whether to use 1-second updates (defaults to true)
2864    ///
2865    /// # Returns
2866    /// Mapping of symbol to ticker data
2867    pub async fn watch_mark_prices(
2868        &self,
2869        symbols: Option<Vec<String>>,
2870        params: Option<HashMap<String, Value>>,
2871    ) -> Result<HashMap<String, Ticker>> {
2872        // Load market metadata
2873        self.load_markets(false).await?;
2874
2875        // Determine update frequency
2876        let use_1s_freq = if let Some(p) = &params {
2877            p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2878        } else {
2879            true
2880        };
2881
2882        // Construct channel name
2883        let channel_name = if use_1s_freq {
2884            "markPrice@1s"
2885        } else {
2886            "markPrice"
2887        };
2888
2889        // Convert symbols and validate market type
2890        let binance_symbols = if let Some(syms) = symbols {
2891            let mut result = Vec::new();
2892            for symbol in syms {
2893                let market = self.base.market(&symbol).await?;
2894                if market.market_type != MarketType::Swap
2895                    && market.market_type != MarketType::Futures
2896                {
2897                    return Err(Error::invalid_request(format!(
2898                        "watch_mark_prices() does not support {} markets",
2899                        market.market_type
2900                    )));
2901                }
2902                result.push(market.id.to_lowercase());
2903            }
2904            Some(result)
2905        } else {
2906            None
2907        };
2908
2909        // Establish WebSocket connection
2910        let ws = self.create_ws();
2911        ws.connect().await?;
2912
2913        // Watch mark prices
2914        ws.watch_tickers_internal(binance_symbols, channel_name)
2915            .await
2916    }
2917    /// Streams trade data for a unified symbol
2918    ///
2919    /// # Arguments
2920    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2921    /// * `since` - Optional starting timestamp in milliseconds
2922    /// * `limit` - Optional maximum number of trades to return
2923    ///
2924    /// # Returns
2925    /// Vector of parsed trade data
2926    pub async fn watch_trades(
2927        &self,
2928        symbol: &str,
2929        since: Option<i64>,
2930        limit: Option<usize>,
2931    ) -> Result<Vec<Trade>> {
2932        // Ensure market metadata is loaded
2933        self.base.load_markets(false).await?;
2934
2935        // Resolve market information
2936        let market = self.base.market(symbol).await?;
2937        let binance_symbol = market.id.to_lowercase();
2938
2939        // Establish WebSocket connection
2940        let ws = self.create_ws();
2941        ws.connect().await?;
2942
2943        // Subscribe to the trade stream
2944        ws.subscribe_trades(&binance_symbol).await?;
2945
2946        // Process incoming messages
2947        let mut retries = 0;
2948        const MAX_RETRIES: u32 = 50;
2949
2950        while retries < MAX_RETRIES {
2951            if let Some(msg) = ws.client.receive().await {
2952                // Skip subscription acknowledgement messages
2953                if msg.get("result").is_some() || msg.get("id").is_some() {
2954                    continue;
2955                }
2956
2957                // Parse trade payload
2958                if let Ok(trade) = parser::parse_ws_trade(&msg, Some(&market)) {
2959                    // Cache trade payload
2960                    let mut trades_map = ws.trades.lock().await;
2961                    let trades = trades_map
2962                        .entry(symbol.to_string())
2963                        .or_insert_with(VecDeque::new);
2964
2965                    // Enforce cache size limit
2966                    const MAX_TRADES: usize = 1000;
2967                    if trades.len() >= MAX_TRADES {
2968                        trades.pop_front();
2969                    }
2970                    trades.push_back(trade);
2971
2972                    // Gather trades from cache
2973                    let mut result: Vec<Trade> = trades.iter().cloned().collect();
2974
2975                    // Apply optional `since` filter
2976                    if let Some(since_ts) = since {
2977                        result.retain(|t| t.timestamp >= since_ts);
2978                    }
2979
2980                    // Apply optional limit
2981                    if let Some(limit_size) = limit {
2982                        if result.len() > limit_size {
2983                            result = result.split_off(result.len() - limit_size);
2984                        }
2985                    }
2986
2987                    return Ok(result);
2988                }
2989            }
2990
2991            retries += 1;
2992            tokio::time::sleep(Duration::from_millis(100)).await;
2993        }
2994
2995        Err(Error::network("Timeout waiting for trade data"))
2996    }
2997
2998    /// Streams OHLCV data for a unified symbol
2999    ///
3000    /// # Arguments
3001    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
3002    /// * `timeframe` - Candlestick interval (e.g. "1m", "5m", "1h", "1d")
3003    /// * `since` - Optional starting timestamp in milliseconds
3004    /// * `limit` - Optional maximum number of entries to return
3005    ///
3006    /// # Returns
3007    /// Vector of OHLCV entries
3008    pub async fn watch_ohlcv(
3009        &self,
3010        symbol: &str,
3011        timeframe: &str,
3012        since: Option<i64>,
3013        limit: Option<usize>,
3014    ) -> Result<Vec<OHLCV>> {
3015        // Ensure market metadata is loaded
3016        self.base.load_markets(false).await?;
3017
3018        // Resolve market information
3019        let market = self.base.market(symbol).await?;
3020        let binance_symbol = market.id.to_lowercase();
3021
3022        // Establish WebSocket connection
3023        let ws = self.create_ws();
3024        ws.connect().await?;
3025
3026        // Subscribe to the Kline stream
3027        ws.subscribe_kline(&binance_symbol, timeframe).await?;
3028
3029        // Process incoming messages
3030        let mut retries = 0;
3031        const MAX_RETRIES: u32 = 50;
3032
3033        while retries < MAX_RETRIES {
3034            if let Some(msg) = ws.client.receive().await {
3035                // Skip subscription acknowledgement messages
3036                if msg.get("result").is_some() || msg.get("id").is_some() {
3037                    continue;
3038                }
3039
3040                // Parse OHLCV payload
3041                if let Ok(ohlcv) = parser::parse_ws_ohlcv(&msg) {
3042                    // Cache OHLCV entries
3043                    let cache_key = format!("{}:{}", symbol, timeframe);
3044                    let mut ohlcvs_map = ws.ohlcvs.lock().await;
3045                    let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
3046
3047                    // Enforce cache size limit
3048                    const MAX_OHLCVS: usize = 1000;
3049                    if ohlcvs.len() >= MAX_OHLCVS {
3050                        ohlcvs.pop_front();
3051                    }
3052                    ohlcvs.push_back(ohlcv);
3053
3054                    // Collect results from cache
3055                    let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
3056
3057                    // Apply optional `since` filter
3058                    if let Some(since_ts) = since {
3059                        result.retain(|o| o.timestamp >= since_ts);
3060                    }
3061
3062                    // Apply optional limit
3063                    if let Some(limit_size) = limit {
3064                        if result.len() > limit_size {
3065                            result = result.split_off(result.len() - limit_size);
3066                        }
3067                    }
3068
3069                    return Ok(result);
3070                }
3071            }
3072
3073            retries += 1;
3074            tokio::time::sleep(Duration::from_millis(100)).await;
3075        }
3076
3077        Err(Error::network("Timeout waiting for OHLCV data"))
3078    }
3079
3080    /// Streams the best bid/ask data for a unified symbol
3081    ///
3082    /// # Arguments
3083    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
3084    ///
3085    /// # Returns
3086    /// Latest bid/ask snapshot
3087    pub async fn watch_bids_asks(&self, symbol: &str) -> Result<BidAsk> {
3088        // Ensure market metadata is loaded
3089        self.base.load_markets(false).await?;
3090
3091        // Resolve market details
3092        let market = self.base.market(symbol).await?;
3093        let binance_symbol = market.id.to_lowercase();
3094
3095        // Establish WebSocket connection
3096        let ws = self.create_ws();
3097        ws.connect().await?;
3098
3099        // Subscribe to the bookTicker stream
3100        let stream_name = format!("{}@bookTicker", binance_symbol);
3101        ws.client
3102            .subscribe(stream_name, Some(symbol.to_string()), None)
3103            .await?;
3104
3105        // Process incoming messages
3106        let mut retries = 0;
3107        const MAX_RETRIES: u32 = 50;
3108
3109        while retries < MAX_RETRIES {
3110            if let Some(msg) = ws.client.receive().await {
3111                // Skip subscription acknowledgement messages
3112                if msg.get("result").is_some() || msg.get("id").is_some() {
3113                    continue;
3114                }
3115
3116                // Parse bid/ask payload
3117                if let Ok(bid_ask) = parser::parse_ws_bid_ask(&msg) {
3118                    // Cache the snapshot
3119                    let mut bids_asks_map = ws.bids_asks.lock().await;
3120                    bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
3121
3122                    return Ok(bid_ask);
3123                }
3124            }
3125
3126            retries += 1;
3127            tokio::time::sleep(Duration::from_millis(100)).await;
3128        }
3129
3130        Err(Error::network("Timeout waiting for BidAsk data"))
3131    }
3132
3133    /// Streams account balance changes (private user data stream)
3134    ///
3135    /// # Arguments
3136    /// * `params` - Optional parameters
3137    ///   - `type`: Account type (spot/future/delivery/margin, etc.)
3138    ///   - `fetchBalanceSnapshot`: Whether to fetch an initial snapshot (default false)
3139    ///   - `awaitBalanceSnapshot`: Whether to wait for snapshot completion (default true)
3140    ///
3141    /// # Returns
3142    /// Updated account balances
3143    ///
3144    /// # Example
3145    ///
3146    /// ```rust,no_run
3147    /// # use ccxt_exchanges::binance::Binance;
3148    /// # use ccxt_core::ExchangeConfig;
3149    /// # use std::collections::HashMap;
3150    /// # use std::sync::Arc;
3151    /// # use serde_json::json;
3152    /// # async fn example() -> ccxt_core::error::Result<()> {
3153    /// let mut config = ExchangeConfig::default();
3154    /// config.api_key = Some("your-api-key".to_string());
3155    /// config.secret = Some("your-secret".to_string());
3156    /// let exchange = Arc::new(Binance::new(config)?);
3157    ///
3158    /// // Watch spot account balance
3159    /// let balance = exchange.clone().watch_balance(None).await?;
3160    ///
3161    /// // Watch futures account balance
3162    /// let mut params = HashMap::new();
3163    /// params.insert("type".to_string(), json!("future"));
3164    /// let futures_balance = exchange.clone().watch_balance(Some(params)).await?;
3165    /// # Ok(())
3166    /// # }
3167    /// ```
3168    pub async fn watch_balance(
3169        self: Arc<Self>,
3170        params: Option<HashMap<String, Value>>,
3171    ) -> Result<Balance> {
3172        // Ensure market metadata is loaded
3173        self.base.load_markets(false).await?;
3174
3175        // Resolve account type
3176        let account_type = if let Some(p) = &params {
3177            p.get("type")
3178                .and_then(|v| v.as_str())
3179                .unwrap_or_else(|| self.options.default_type.as_str())
3180        } else {
3181            self.options.default_type.as_str()
3182        };
3183
3184        // Determine configuration flags
3185        let fetch_snapshot = if let Some(p) = &params {
3186            p.get("fetchBalanceSnapshot")
3187                .and_then(|v| v.as_bool())
3188                .unwrap_or(false)
3189        } else {
3190            false
3191        };
3192
3193        let await_snapshot = if let Some(p) = &params {
3194            p.get("awaitBalanceSnapshot")
3195                .and_then(|v| v.as_bool())
3196                .unwrap_or(true)
3197        } else {
3198            true
3199        };
3200
3201        // Establish authenticated WebSocket connection
3202        let ws = self.create_authenticated_ws();
3203        ws.connect().await?;
3204
3205        // Optionally fetch the initial snapshot
3206        if fetch_snapshot {
3207            // Convert string account type to AccountType enum
3208            let account_type_enum = account_type.parse::<ccxt_core::types::AccountType>().ok();
3209            let snapshot = self.fetch_balance(account_type_enum).await?;
3210
3211            // Update cache with the snapshot
3212            let mut balances = ws.balances.write().await;
3213            balances.insert(account_type.to_string(), snapshot.clone());
3214
3215            if !await_snapshot {
3216                return Ok(snapshot);
3217            }
3218        }
3219
3220        // Process balance update messages
3221        let mut retries = 0;
3222        const MAX_RETRIES: u32 = 100;
3223
3224        while retries < MAX_RETRIES {
3225            if let Some(msg) = ws.client.receive().await {
3226                // Skip subscription acknowledgement messages
3227                if msg.get("result").is_some() || msg.get("id").is_some() {
3228                    continue;
3229                }
3230
3231                // Determine message event type
3232                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3233                    // Handle supported balance message types
3234                    match event_type {
3235                        "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE" => {
3236                            // Parse and update local balance cache
3237                            if let Ok(()) = ws.handle_balance_message(&msg, account_type).await {
3238                                // Retrieve the updated balance snapshot
3239                                let balances = ws.balances.read().await;
3240                                if let Some(balance) = balances.get(account_type) {
3241                                    return Ok(balance.clone());
3242                                }
3243                            }
3244                        }
3245                        _ => {}
3246                    }
3247                }
3248            }
3249
3250            retries += 1;
3251            tokio::time::sleep(Duration::from_millis(100)).await;
3252        }
3253
3254        Err(Error::network("Timeout waiting for balance data"))
3255    }
3256
3257    /// Watches authenticated order updates via the user data stream
3258    ///
3259    /// Streams real-time order status changes delivered by Binance user data WebSocket messages
3260    ///
3261    /// # Arguments
3262    /// * `symbol` - Optional trading pair filter (e.g. "BTC/USDT")
3263    /// * `since` - Optional starting timestamp in milliseconds
3264    /// * `limit` - Optional maximum number of orders to return
3265    /// * `params` - Optional additional parameters
3266    ///
3267    /// # Returns
3268    /// Orders returned in descending chronological order
3269    ///
3270    /// # Examples
3271    /// ```no_run
3272    /// use std::sync::Arc;
3273    /// use ccxt_exchanges::binance::Binance;
3274    ///
3275    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
3276    ///     let exchange = Arc::new(Binance::new(Default::default())?);
3277    ///
3278    ///     // Watch all order updates
3279    ///     let orders = exchange.clone().watch_orders(None, None, None, None).await?;
3280    ///     println!("Received {} order updates", orders.len());
3281    ///
3282    ///     // Watch updates for a specific trading pair
3283    ///     let btc_orders = exchange.clone().watch_orders(Some("BTC/USDT"), None, None, None).await?;
3284    ///     println!("BTC/USDT orders: {:?}", btc_orders);
3285    ///
3286    ///     Ok(())
3287    /// }
3288    /// ```
3289    pub async fn watch_orders(
3290        self: Arc<Self>,
3291        symbol: Option<&str>,
3292        since: Option<i64>,
3293        limit: Option<usize>,
3294        _params: Option<HashMap<String, Value>>,
3295    ) -> Result<Vec<Order>> {
3296        self.base.load_markets(false).await?;
3297
3298        let ws = self.create_authenticated_ws();
3299        ws.connect().await?;
3300
3301        // Receive messages in a loop
3302        loop {
3303            if let Some(msg) = ws.client.receive().await {
3304                if let Value::Object(data) = msg {
3305                    if let Some(event_type) = data.get("e").and_then(|v| v.as_str()) {
3306                        match event_type {
3307                            "executionReport" => {
3308                                // Parse order payload
3309                                let order = self.parse_ws_order(&data)?;
3310
3311                                // Update order cache
3312                                let mut orders = ws.orders.write().await;
3313                                let symbol_orders = orders
3314                                    .entry(order.symbol.clone())
3315                                    .or_insert_with(HashMap::new);
3316                                symbol_orders.insert(order.id.clone(), order.clone());
3317                                drop(orders);
3318
3319                                // Check for trade execution events
3320                                if let Some(exec_type) = data.get("x").and_then(|v| v.as_str()) {
3321                                    if exec_type == "TRADE" {
3322                                        // Parse execution trade payload
3323                                        if let Ok(trade) =
3324                                            ws.parse_ws_trade(&Value::Object(data.clone()))
3325                                        {
3326                                            // Update my trades cache
3327                                            let mut trades = ws.my_trades.write().await;
3328                                            let symbol_trades = trades
3329                                                .entry(trade.symbol.clone())
3330                                                .or_insert_with(VecDeque::new);
3331
3332                                            // Prepend newest trade and enforce max length of 1000
3333                                            symbol_trades.push_front(trade);
3334                                            if symbol_trades.len() > 1000 {
3335                                                symbol_trades.pop_back();
3336                                            }
3337                                        }
3338                                    }
3339                                }
3340
3341                                // Return filtered orders
3342                                return self.filter_orders(&ws, symbol, since, limit).await;
3343                            }
3344                            _ => continue,
3345                        }
3346                    }
3347                }
3348            } else {
3349                tokio::time::sleep(Duration::from_millis(100)).await;
3350            }
3351        }
3352    }
3353
3354    /// Parses a WebSocket order message
3355    fn parse_ws_order(&self, data: &serde_json::Map<String, Value>) -> Result<Order> {
3356        use ccxt_core::types::{OrderSide, OrderStatus, OrderType};
3357        use rust_decimal::Decimal;
3358        use std::str::FromStr;
3359
3360        // Extract core fields
3361        let symbol = data.get("s").and_then(|v| v.as_str()).unwrap_or("");
3362        let order_id = data
3363            .get("i")
3364            .and_then(|v| v.as_i64())
3365            .map(|id| id.to_string())
3366            .unwrap_or_default();
3367        let client_order_id = data.get("c").and_then(|v| v.as_str()).map(String::from);
3368
3369        // Map order status
3370        let status_str = data.get("X").and_then(|v| v.as_str()).unwrap_or("NEW");
3371        let status = match status_str {
3372            "NEW" => OrderStatus::Open,
3373            "PARTIALLY_FILLED" => OrderStatus::Open,
3374            "FILLED" => OrderStatus::Closed,
3375            "CANCELED" => OrderStatus::Cancelled,
3376            "REJECTED" => OrderStatus::Rejected,
3377            "EXPIRED" => OrderStatus::Expired,
3378            _ => OrderStatus::Open,
3379        };
3380
3381        // Map order side
3382        let side_str = data.get("S").and_then(|v| v.as_str()).unwrap_or("BUY");
3383        let side = match side_str {
3384            "BUY" => OrderSide::Buy,
3385            "SELL" => OrderSide::Sell,
3386            _ => OrderSide::Buy,
3387        };
3388
3389        // Map order type
3390        let type_str = data.get("o").and_then(|v| v.as_str()).unwrap_or("LIMIT");
3391        let order_type = match type_str {
3392            "LIMIT" => OrderType::Limit,
3393            "MARKET" => OrderType::Market,
3394            "STOP_LOSS" => OrderType::StopLoss,
3395            "STOP_LOSS_LIMIT" => OrderType::StopLossLimit,
3396            "TAKE_PROFIT" => OrderType::TakeProfit,
3397            "TAKE_PROFIT_LIMIT" => OrderType::TakeProfitLimit,
3398            "LIMIT_MAKER" => OrderType::LimitMaker,
3399            _ => OrderType::Limit,
3400        };
3401
3402        // Parse amount and price fields
3403        let amount = data
3404            .get("q")
3405            .and_then(|v| v.as_str())
3406            .and_then(|s| Decimal::from_str(s).ok())
3407            .unwrap_or(Decimal::ZERO);
3408
3409        let price = data
3410            .get("p")
3411            .and_then(|v| v.as_str())
3412            .and_then(|s| Decimal::from_str(s).ok());
3413
3414        let filled = data
3415            .get("z")
3416            .and_then(|v| v.as_str())
3417            .and_then(|s| Decimal::from_str(s).ok());
3418
3419        let cost = data
3420            .get("Z")
3421            .and_then(|v| v.as_str())
3422            .and_then(|s| Decimal::from_str(s).ok());
3423
3424        // Derive remaining quantity
3425        let remaining = match filled {
3426            Some(fill) => Some(amount - fill),
3427            None => None,
3428        };
3429
3430        // Compute average price
3431        let average = match (filled, cost) {
3432            (Some(fill), Some(c)) if fill > Decimal::ZERO && c > Decimal::ZERO => Some(c / fill),
3433            _ => None,
3434        };
3435
3436        // Parse timestamps
3437        let timestamp = data.get("T").and_then(|v| v.as_i64());
3438        let last_trade_timestamp = data.get("T").and_then(|v| v.as_i64());
3439
3440        Ok(Order {
3441            id: order_id,
3442            client_order_id,
3443            timestamp,
3444            datetime: timestamp.map(|ts| {
3445                chrono::DateTime::from_timestamp_millis(ts)
3446                    .map(|dt| dt.to_rfc3339())
3447                    .unwrap_or_default()
3448            }),
3449            last_trade_timestamp,
3450            symbol: symbol.to_string(),
3451            order_type,
3452            side,
3453            price,
3454            average,
3455            amount,
3456            cost,
3457            filled,
3458            remaining,
3459            status,
3460            fee: None,
3461            fees: None,
3462            trades: None,
3463            time_in_force: data.get("f").and_then(|v| v.as_str()).map(String::from),
3464            post_only: None,
3465            reduce_only: None,
3466            stop_price: data
3467                .get("P")
3468                .and_then(|v| v.as_str())
3469                .and_then(|s| Decimal::from_str(s).ok()),
3470            trigger_price: None,
3471            take_profit_price: None,
3472            stop_loss_price: None,
3473            trailing_delta: None,
3474            trailing_percent: None,
3475            activation_price: None,
3476            callback_rate: None,
3477            working_type: data.get("wt").and_then(|v| v.as_str()).map(String::from),
3478            info: data.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
3479        })
3480    }
3481
3482    /// Filters cached orders by symbol, time range, and limit
3483    async fn filter_orders(
3484        &self,
3485        ws: &BinanceWs,
3486        symbol: Option<&str>,
3487        since: Option<i64>,
3488        limit: Option<usize>,
3489    ) -> Result<Vec<Order>> {
3490        let orders_map = ws.orders.read().await;
3491
3492        // Filter by symbol when provided
3493        let mut orders: Vec<Order> = if let Some(sym) = symbol {
3494            orders_map
3495                .get(sym)
3496                .map(|symbol_orders| symbol_orders.values().cloned().collect())
3497                .unwrap_or_default()
3498        } else {
3499            orders_map
3500                .values()
3501                .flat_map(|symbol_orders| symbol_orders.values().cloned())
3502                .collect()
3503        };
3504
3505        // Apply optional `since` filter
3506        if let Some(since_ts) = since {
3507            orders.retain(|order| order.timestamp.map_or(false, |ts| ts >= since_ts));
3508        }
3509
3510        // Sort by timestamp descending
3511        orders.sort_by(|a, b| {
3512            let ts_a = a.timestamp.unwrap_or(0);
3513            let ts_b = b.timestamp.unwrap_or(0);
3514            ts_b.cmp(&ts_a)
3515        });
3516
3517        // Apply optional limit
3518        if let Some(lim) = limit {
3519            orders.truncate(lim);
3520        }
3521
3522        Ok(orders)
3523    }
3524
3525    /// Watches authenticated user trade updates
3526    ///
3527    /// # Arguments
3528    /// * `symbol` - Optional trading pair to filter (None subscribes to all)
3529    /// * `since` - Starting timestamp in milliseconds
3530    /// * `limit` - Maximum number of trades to return
3531    /// * `params` - Additional parameters
3532    ///
3533    /// # Returns
3534    /// List of trade records
3535    ///
3536    /// # Example
3537    /// ```rust,no_run
3538    /// use std::sync::Arc;
3539    /// use ccxt_exchanges::binance::Binance;
3540    /// use ccxt_core::ExchangeConfig;
3541    ///
3542    /// #[tokio::main]
3543    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
3544    ///     let mut config = ExchangeConfig::default();
3545    ///     config.api_key = Some("your-api-key".to_string());
3546    ///     config.secret = Some("your-secret".to_string());
3547    ///     let exchange = Arc::new(Binance::new(config)?);
3548    ///
3549    ///     // Subscribe to BTC/USDT trade updates
3550    ///     let trades = exchange.clone().watch_my_trades(Some("BTC/USDT"), None, None, None).await?;
3551    ///     println!("My trades: {:?}", trades);
3552    ///     Ok(())
3553    /// }
3554    /// ```
3555    pub async fn watch_my_trades(
3556        self: Arc<Self>,
3557        symbol: Option<&str>,
3558        since: Option<i64>,
3559        limit: Option<usize>,
3560        _params: Option<HashMap<String, Value>>,
3561    ) -> Result<Vec<Trade>> {
3562        // Establish authenticated WebSocket connection
3563        let ws = self.create_authenticated_ws();
3564        ws.connect().await?;
3565
3566        // Process trade update messages
3567        let mut retries = 0;
3568        const MAX_RETRIES: u32 = 100;
3569
3570        while retries < MAX_RETRIES {
3571            if let Some(msg) = ws.client.receive().await {
3572                // Skip subscription acknowledgements
3573                if msg.get("result").is_some() || msg.get("id").is_some() {
3574                    continue;
3575                }
3576
3577                // Identify event type
3578                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3579                    // Handle executionReport events containing trade updates
3580                    if event_type == "executionReport" {
3581                        if let Ok(trade) = ws.parse_ws_trade(&msg) {
3582                            let symbol_key = trade.symbol.clone();
3583
3584                            // Update cached trades
3585                            let mut trades_map = ws.my_trades.write().await;
3586                            let symbol_trades =
3587                                trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
3588
3589                            // Prepend latest trade; bound cache to 1000 entries
3590                            symbol_trades.push_front(trade);
3591                            if symbol_trades.len() > 1000 {
3592                                symbol_trades.pop_back();
3593                            }
3594                        }
3595                    }
3596                }
3597            } else {
3598                tokio::time::sleep(Duration::from_millis(100)).await;
3599            }
3600
3601            retries += 1;
3602        }
3603
3604        // Filter and return personal trades
3605        ws.filter_my_trades(symbol, since, limit).await
3606    }
3607
3608    /// Watches authenticated futures position updates
3609    ///
3610    /// Receives ACCOUNT_UPDATE events via the user data stream to track changes to futures
3611    /// Supports both USD-margined (USD-M) and coin-margined (COIN-M) contracts.
3612    ///
3613    /// # Arguments
3614    /// * `symbols` - Optional list of symbols (None subscribes to all positions)
3615    /// * `since` - Optional starting timestamp
3616    /// * `limit` - Optional maximum number of positions to return
3617    /// * `params` - Optional parameters
3618    ///   - `type`: Market type (`future`/`delivery`, default `future`)
3619    ///   - `subType`: Subtype (`linear`/`inverse`)
3620    ///
3621    /// # Returns
3622    /// Collection of positions
3623    ///
3624    /// # Implementation Details
3625    /// 1. Subscribe to ACCOUNT_UPDATE events through the user data stream.
3626    /// 2. Parse the position data contained in the `P` array.
3627    /// 3. Update the internal position cache.
3628    /// 4. Filter results according to the provided arguments.
3629    ///
3630    /// # WebSocket Message Format
3631    /// ```json
3632    /// {
3633    ///   "e": "ACCOUNT_UPDATE",
3634    ///   "T": 1667881353112,
3635    ///   "E": 1667881353115,
3636    ///   "a": {
3637    ///     "P": [
3638    ///       {
3639    ///         "s": "BTCUSDT",
3640    ///         "pa": "-0.089",
3641    ///         "ep": "19700.03933",
3642    ///         "up": "1.53058860",
3643    ///         "mt": "isolated",
3644    ///         "ps": "BOTH"
3645    ///       }
3646    ///     ]
3647    ///   }
3648    /// }
3649    /// ```
3650    ///
3651    /// # Example
3652    /// ```rust,no_run
3653    /// # use ccxt_exchanges::binance::Binance;
3654    /// # use ccxt_core::ExchangeConfig;
3655    /// # use std::sync::Arc;
3656    /// # async fn example() -> ccxt_core::error::Result<()> {
3657    /// let exchange = Arc::new(Binance::new(ExchangeConfig::default())?);
3658    ///
3659    /// // Watch all positions
3660    /// let positions = exchange.clone().watch_positions(None, None, None, None).await?;
3661    /// for pos in positions {
3662    ///     println!("Symbol: {}, Side: {:?}, Contracts: {:?}",
3663    ///              pos.symbol, pos.side, pos.contracts);
3664    /// }
3665    ///
3666    /// // Watch a subset of symbols
3667    /// let symbols = vec!["BTC/USDT".to_string(), "ETH/USDT".to_string()];
3668    /// let positions = exchange.clone().watch_positions(Some(symbols), None, Some(10), None).await?;
3669    /// # Ok(())
3670    /// # }
3671    /// ```
3672    pub async fn watch_positions(
3673        self: Arc<Self>,
3674        symbols: Option<Vec<String>>,
3675        since: Option<i64>,
3676        limit: Option<usize>,
3677        _params: Option<HashMap<String, Value>>,
3678    ) -> Result<Vec<Position>> {
3679        // Establish authenticated WebSocket connection
3680        let ws = self.create_authenticated_ws();
3681        ws.connect().await?;
3682
3683        // Process position update messages
3684        let mut retries = 0;
3685        const MAX_RETRIES: u32 = 100;
3686
3687        while retries < MAX_RETRIES {
3688            if let Some(msg) = ws.client.receive().await {
3689                // Skip subscription acknowledgement messages
3690                if msg.get("result").is_some() || msg.get("id").is_some() {
3691                    continue;
3692                }
3693
3694                // Handle ACCOUNT_UPDATE events only
3695                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3696                    if event_type == "ACCOUNT_UPDATE" {
3697                        if let Some(account_data) = msg.get("a") {
3698                            if let Some(positions_array) =
3699                                account_data.get("P").and_then(|p| p.as_array())
3700                            {
3701                                for position_data in positions_array {
3702                                    if let Ok(position) = ws.parse_ws_position(position_data).await
3703                                    {
3704                                        let symbol_key = position.symbol.clone();
3705                                        let side_key = position
3706                                            .side
3707                                            .clone()
3708                                            .unwrap_or_else(|| "both".to_string());
3709
3710                                        // Update cached positions
3711                                        let mut positions_map = ws.positions.write().await;
3712                                        let symbol_positions = positions_map
3713                                            .entry(symbol_key)
3714                                            .or_insert_with(HashMap::new);
3715
3716                                        // Remove positions with effectively zero contracts
3717                                        if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
3718                                            symbol_positions.remove(&side_key);
3719                                            if symbol_positions.is_empty() {
3720                                                positions_map.remove(&position.symbol);
3721                                            }
3722                                        } else {
3723                                            symbol_positions.insert(side_key, position);
3724                                        }
3725                                    }
3726                                }
3727                            }
3728                        }
3729                    }
3730                }
3731            } else {
3732                tokio::time::sleep(Duration::from_millis(100)).await;
3733            }
3734
3735            retries += 1;
3736        }
3737
3738        // Filter and return positions
3739        let symbols_ref = symbols.as_ref().map(|v| v.as_slice());
3740        ws.filter_positions(symbols_ref, since, limit).await
3741    }
3742}
3743
3744#[cfg(test)]
3745mod tests {
3746    use super::*;
3747
3748    #[test]
3749    fn test_binance_ws_creation() {
3750        let ws = BinanceWs::new(WS_BASE_URL.to_string());
3751        // Basic creation test: ensure the listen key lock is accessible
3752        assert!(ws.listen_key.try_read().is_ok());
3753    }
3754
3755    #[test]
3756    fn test_stream_format() {
3757        let symbol = "btcusdt";
3758
3759        // Ticker stream format
3760        let ticker_stream = format!("{}@ticker", symbol);
3761        assert_eq!(ticker_stream, "btcusdt@ticker");
3762
3763        // Trade stream format
3764        let trade_stream = format!("{}@trade", symbol);
3765        assert_eq!(trade_stream, "btcusdt@trade");
3766
3767        // Depth stream format
3768        let depth_stream = format!("{}@depth20", symbol);
3769        assert_eq!(depth_stream, "btcusdt@depth20");
3770
3771        // Kline stream format
3772        let kline_stream = format!("{}@kline_1m", symbol);
3773        assert_eq!(kline_stream, "btcusdt@kline_1m");
3774    }
3775
3776    #[tokio::test]
3777    async fn test_subscription_manager_basic() {
3778        let manager = SubscriptionManager::new();
3779        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
3780
3781        // Validate initial state
3782        assert_eq!(manager.active_count(), 0);
3783        assert!(!manager.has_subscription("btcusdt@ticker").await);
3784
3785        // Add a subscription
3786        manager
3787            .add_subscription(
3788                "btcusdt@ticker".to_string(),
3789                "BTCUSDT".to_string(),
3790                SubscriptionType::Ticker,
3791                tx.clone(),
3792            )
3793            .await
3794            .unwrap();
3795
3796        assert_eq!(manager.active_count(), 1);
3797        assert!(manager.has_subscription("btcusdt@ticker").await);
3798
3799        // Retrieve subscription
3800        let sub = manager.get_subscription("btcusdt@ticker").await;
3801        assert!(sub.is_some());
3802        let sub = sub.unwrap();
3803        assert_eq!(sub.stream, "btcusdt@ticker");
3804        assert_eq!(sub.symbol, "BTCUSDT");
3805        assert_eq!(sub.sub_type, SubscriptionType::Ticker);
3806
3807        // Remove subscription
3808        manager.remove_subscription("btcusdt@ticker").await.unwrap();
3809        assert_eq!(manager.active_count(), 0);
3810        assert!(!manager.has_subscription("btcusdt@ticker").await);
3811    }
3812
3813    #[tokio::test]
3814    async fn test_subscription_manager_multiple() {
3815        let manager = SubscriptionManager::new();
3816        let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
3817        let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
3818        let (tx3, _rx3) = tokio::sync::mpsc::unbounded_channel();
3819
3820        // Add multiple subscriptions
3821        manager
3822            .add_subscription(
3823                "btcusdt@ticker".to_string(),
3824                "BTCUSDT".to_string(),
3825                SubscriptionType::Ticker,
3826                tx1,
3827            )
3828            .await
3829            .unwrap();
3830
3831        manager
3832            .add_subscription(
3833                "btcusdt@depth".to_string(),
3834                "BTCUSDT".to_string(),
3835                SubscriptionType::OrderBook,
3836                tx2,
3837            )
3838            .await
3839            .unwrap();
3840
3841        manager
3842            .add_subscription(
3843                "ethusdt@ticker".to_string(),
3844                "ETHUSDT".to_string(),
3845                SubscriptionType::Ticker,
3846                tx3,
3847            )
3848            .await
3849            .unwrap();
3850
3851        assert_eq!(manager.active_count(), 3);
3852
3853        // Query by symbol
3854        let btc_subs = manager.get_subscriptions_by_symbol("BTCUSDT").await;
3855        assert_eq!(btc_subs.len(), 2);
3856
3857        let eth_subs = manager.get_subscriptions_by_symbol("ETHUSDT").await;
3858        assert_eq!(eth_subs.len(), 1);
3859
3860        // Retrieve all subscriptions
3861        let all_subs = manager.get_all_subscriptions().await;
3862        assert_eq!(all_subs.len(), 3);
3863
3864        // Clear all subscriptions
3865        manager.clear().await;
3866        assert_eq!(manager.active_count(), 0);
3867    }
3868
3869    #[tokio::test]
3870    async fn test_subscription_type_from_stream() {
3871        // Ticker stream
3872        let sub_type = SubscriptionType::from_stream("btcusdt@ticker");
3873        assert_eq!(sub_type, Some(SubscriptionType::Ticker));
3874
3875        // Order book streams
3876        let sub_type = SubscriptionType::from_stream("btcusdt@depth");
3877        assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3878
3879        let sub_type = SubscriptionType::from_stream("btcusdt@depth@100ms");
3880        assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3881
3882        // Trade streams
3883        let sub_type = SubscriptionType::from_stream("btcusdt@trade");
3884        assert_eq!(sub_type, Some(SubscriptionType::Trades));
3885
3886        let sub_type = SubscriptionType::from_stream("btcusdt@aggTrade");
3887        assert_eq!(sub_type, Some(SubscriptionType::Trades));
3888
3889        // Kline streams
3890        let sub_type = SubscriptionType::from_stream("btcusdt@kline_1m");
3891        assert_eq!(sub_type, Some(SubscriptionType::Kline("1m".to_string())));
3892
3893        let sub_type = SubscriptionType::from_stream("btcusdt@kline_1h");
3894        assert_eq!(sub_type, Some(SubscriptionType::Kline("1h".to_string())));
3895
3896        // Mark price stream
3897        let sub_type = SubscriptionType::from_stream("btcusdt@markPrice");
3898        assert_eq!(sub_type, Some(SubscriptionType::MarkPrice));
3899
3900        // Book ticker stream
3901        let sub_type = SubscriptionType::from_stream("btcusdt@bookTicker");
3902        assert_eq!(sub_type, Some(SubscriptionType::BookTicker));
3903
3904        // Unknown stream
3905        let sub_type = SubscriptionType::from_stream("btcusdt@unknown");
3906        assert_eq!(sub_type, None);
3907    }
3908
3909    #[tokio::test]
3910    async fn test_subscription_send_message() {
3911        let manager = SubscriptionManager::new();
3912        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
3913
3914        // Add subscription
3915        manager
3916            .add_subscription(
3917                "btcusdt@ticker".to_string(),
3918                "BTCUSDT".to_string(),
3919                SubscriptionType::Ticker,
3920                tx,
3921            )
3922            .await
3923            .unwrap();
3924
3925        // Send message to stream
3926        let test_msg = serde_json::json!({
3927            "e": "24hrTicker",
3928            "s": "BTCUSDT",
3929            "c": "50000"
3930        });
3931
3932        let sent = manager
3933            .send_to_stream("btcusdt@ticker", test_msg.clone())
3934            .await;
3935        assert!(sent);
3936
3937        // Receive message
3938        let received = rx.recv().await;
3939        assert!(received.is_some());
3940        assert_eq!(received.unwrap(), test_msg);
3941    }
3942
3943    #[tokio::test]
3944    async fn test_subscription_send_to_symbol() {
3945        let manager = SubscriptionManager::new();
3946        let (tx1, mut rx1) = tokio::sync::mpsc::unbounded_channel();
3947        let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel();
3948
3949        // Add two subscriptions for the same symbol
3950        manager
3951            .add_subscription(
3952                "btcusdt@ticker".to_string(),
3953                "BTCUSDT".to_string(),
3954                SubscriptionType::Ticker,
3955                tx1,
3956            )
3957            .await
3958            .unwrap();
3959
3960        manager
3961            .add_subscription(
3962                "btcusdt@depth".to_string(),
3963                "BTCUSDT".to_string(),
3964                SubscriptionType::OrderBook,
3965                tx2,
3966            )
3967            .await
3968            .unwrap();
3969
3970        // Send a message to all subscriptions for the symbol
3971        let test_msg = serde_json::json!({
3972            "s": "BTCUSDT",
3973            "data": "test"
3974        });
3975
3976        let sent_count = manager.send_to_symbol("BTCUSDT", &test_msg).await;
3977        assert_eq!(sent_count, 2);
3978
3979        // Receive messages
3980        let received1 = rx1.recv().await;
3981        assert!(received1.is_some());
3982        assert_eq!(received1.unwrap(), test_msg);
3983
3984        let received2 = rx2.recv().await;
3985        assert!(received2.is_some());
3986        assert_eq!(received2.unwrap(), test_msg);
3987    }
3988
3989    #[test]
3990    fn test_symbol_conversion() {
3991        let symbol = "BTC/USDT";
3992        let binance_symbol = symbol.replace('/', "").to_lowercase();
3993        assert_eq!(binance_symbol, "btcusdt");
3994    }
3995
    // ==================== ReconnectConfig and MessageRouter tests ====================
3997
3998    #[test]
3999    fn test_reconnect_config_default() {
4000        let config = ReconnectConfig::default();
4001
4002        assert!(config.enabled);
4003        assert_eq!(config.initial_delay_ms, 1000);
4004        assert_eq!(config.max_delay_ms, 30000);
4005        assert_eq!(config.backoff_multiplier, 2.0);
4006        assert_eq!(config.max_attempts, 0); // Unlimited retries
4007    }
4008
4009    #[test]
4010    fn test_reconnect_config_calculate_delay() {
4011        let config = ReconnectConfig::default();
4012
4013        // Exponential backoff tests
4014        assert_eq!(config.calculate_delay(0), 1000); // 1s
4015        assert_eq!(config.calculate_delay(1), 2000); // 2s
4016        assert_eq!(config.calculate_delay(2), 4000); // 4s
4017        assert_eq!(config.calculate_delay(3), 8000); // 8s
4018        assert_eq!(config.calculate_delay(4), 16000); // 16s
4019        assert_eq!(config.calculate_delay(5), 30000); // 30s (capped at max)
4020        assert_eq!(config.calculate_delay(6), 30000); // 30s (remains at max)
4021    }
4022
4023    #[test]
4024    fn test_reconnect_config_should_retry() {
4025        let mut config = ReconnectConfig::default();
4026
4027        // Unlimited retries
4028        assert!(config.should_retry(0));
4029        assert!(config.should_retry(10));
4030        assert!(config.should_retry(100));
4031
4032        // Finite retries
4033        config.max_attempts = 3;
4034        assert!(config.should_retry(0));
4035        assert!(config.should_retry(1));
4036        assert!(config.should_retry(2));
4037        assert!(!config.should_retry(3));
4038        assert!(!config.should_retry(4));
4039
4040        // Disabled retries
4041        config.enabled = false;
4042        assert!(!config.should_retry(0));
4043        assert!(!config.should_retry(1));
4044    }
4045
4046    #[test]
4047    fn test_message_router_extract_stream_name_combined() {
4048        // Combined stream format
4049        let message = serde_json::json!({
4050            "stream": "btcusdt@ticker",
4051            "data": {
4052                "e": "24hrTicker",
4053                "s": "BTCUSDT"
4054            }
4055        });
4056
4057        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4058        assert_eq!(stream_name, "btcusdt@ticker");
4059    }
4060
4061    #[test]
4062    fn test_message_router_extract_stream_name_ticker() {
4063        // Ticker single stream format
4064        let message = serde_json::json!({
4065            "e": "24hrTicker",
4066            "s": "BTCUSDT",
4067            "E": 1672531200000_u64,
4068            "c": "16950.00",
4069            "h": "17100.00"
4070        });
4071
4072        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4073        assert_eq!(stream_name, "btcusdt@ticker");
4074    }
4075
4076    #[test]
4077    fn test_message_router_extract_stream_name_depth() {
4078        // Depth single stream format
4079        let message = serde_json::json!({
4080            "e": "depthUpdate",
4081            "s": "ETHUSDT",
4082            "E": 1672531200000_u64,
4083            "U": 157,
4084            "u": 160
4085        });
4086
4087        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4088        assert_eq!(stream_name, "ethusdt@depth");
4089    }
4090
4091    #[test]
4092    fn test_message_router_extract_stream_name_trade() {
4093        // Test trade single-stream format
4094        let message = serde_json::json!({
4095            "e": "trade",
4096            "s": "BNBUSDT",
4097            "E": 1672531200000_u64,
4098            "t": 12345
4099        });
4100
4101        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4102        assert_eq!(stream_name, "bnbusdt@trade");
4103    }
4104
4105    #[test]
4106    fn test_message_router_extract_stream_name_kline() {
4107        // Kline single stream format
4108        let message = serde_json::json!({
4109            "e": "kline",
4110            "s": "BTCUSDT",
4111            "E": 1672531200000_u64,
4112            "k": {
4113                "i": "1m",
4114                "t": 1672531200000_u64,
4115                "o": "16950.00"
4116            }
4117        });
4118
4119        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4120        assert_eq!(stream_name, "btcusdt@kline_1m");
4121    }
4122
4123    #[test]
4124    fn test_message_router_extract_stream_name_mark_price() {
4125        // Mark price single stream format
4126        let message = serde_json::json!({
4127            "e": "markPriceUpdate",
4128            "s": "BTCUSDT",
4129            "E": 1672531200000_u64,
4130            "p": "16950.00"
4131        });
4132
4133        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4134        assert_eq!(stream_name, "btcusdt@markPrice");
4135    }
4136
4137    #[test]
4138    fn test_message_router_extract_stream_name_book_ticker() {
4139        // Book ticker single stream format
4140        let message = serde_json::json!({
4141            "e": "bookTicker",
4142            "s": "ETHUSDT",
4143            "E": 1672531200000_u64,
4144            "b": "1200.00",
4145            "a": "1200.50"
4146        });
4147
4148        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4149        assert_eq!(stream_name, "ethusdt@bookTicker");
4150    }
4151
4152    #[test]
4153    fn test_message_router_extract_stream_name_subscription_response() {
4154        // Subscription response should yield an error
4155        let message = serde_json::json!({
4156            "result": null,
4157            "id": 1
4158        });
4159
4160        let result = MessageRouter::extract_stream_name(&message);
4161        assert!(result.is_err());
4162    }
4163
4164    #[test]
4165    fn test_message_router_extract_stream_name_error_response() {
4166        // Error responses should yield an error
4167        let message = serde_json::json!({
4168            "error": {
4169                "code": -1,
4170                "msg": "Invalid request"
4171            },
4172            "id": 1
4173        });
4174
4175        let result = MessageRouter::extract_stream_name(&message);
4176        assert!(result.is_err());
4177    }
4178
4179    #[test]
4180    fn test_message_router_extract_stream_name_invalid() {
4181        // Invalid message format
4182        let message = serde_json::json!({
4183            "unknown": "data"
4184        });
4185
4186        let result = MessageRouter::extract_stream_name(&message);
4187        assert!(result.is_err());
4188    }
4189
4190    #[tokio::test]
4191    async fn test_message_router_creation() {
4192        let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4193        let subscription_manager = Arc::new(SubscriptionManager::new());
4194
4195        let router = MessageRouter::new(ws_url.clone(), subscription_manager);
4196
4197        // Validate initial state
4198        assert!(!router.is_connected());
4199        assert_eq!(router.ws_url, ws_url);
4200    }
4201
4202    #[tokio::test]
4203    async fn test_message_router_reconnect_config() {
4204        let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4205        let subscription_manager = Arc::new(SubscriptionManager::new());
4206
4207        let router = MessageRouter::new(ws_url, subscription_manager);
4208
4209        // Default configuration
4210        let config = router.get_reconnect_config().await;
4211        assert!(config.enabled);
4212        assert_eq!(config.initial_delay_ms, 1000);
4213
4214        // Setting new configuration
4215        let new_config = ReconnectConfig {
4216            enabled: false,
4217            initial_delay_ms: 2000,
4218            max_delay_ms: 60000,
4219            backoff_multiplier: 1.5,
4220            max_attempts: 5,
4221        };
4222
4223        router.set_reconnect_config(new_config.clone()).await;
4224
4225        let updated_config = router.get_reconnect_config().await;
4226        assert!(!updated_config.enabled);
4227        assert_eq!(updated_config.initial_delay_ms, 2000);
4228        assert_eq!(updated_config.max_delay_ms, 60000);
4229        assert_eq!(updated_config.backoff_multiplier, 1.5);
4230        assert_eq!(updated_config.max_attempts, 5);
4231    }
4232}