ccxt_exchanges/binance/
ws.rs

1//! Binance WebSocket implementation
2//!
3//! Provides WebSocket real-time data stream subscriptions for the Binance exchange
4
5use crate::binance::Binance;
6use crate::binance::parser;
7use ccxt_core::error::{Error, Result};
8use ccxt_core::types::financial::{Amount, Cost, Price};
9use ccxt_core::types::{
10    Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
11};
12use ccxt_core::ws_client::{WsClient, WsConfig};
13use serde_json::Value;
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio_tungstenite::tungstenite::protocol::Message;
20
21/// Binance WebSocket endpoints
22#[allow(dead_code)]
23const WS_BASE_URL: &str = "wss://stream.binance.com:9443/ws";
24#[allow(dead_code)]
25const WS_TESTNET_URL: &str = "wss://testnet.binance.vision/ws";
26
27/// Listen key refresh interval (30 minutes)
28const LISTEN_KEY_REFRESH_INTERVAL: Duration = Duration::from_secs(30 * 60);
29
30/// Listen key manager
31///
32/// Automatically manages Binance user data stream listen keys by:
33/// - Creating and caching listen keys
34/// - Refreshing them every 30 minutes
35/// - Detecting expiration and rebuilding
36/// - Tracking connection state
37pub struct ListenKeyManager {
38    /// Reference to the Binance instance
39    binance: Arc<Binance>,
40    /// Currently active listen key
41    listen_key: Arc<RwLock<Option<String>>>,
42    /// Listen key creation timestamp
43    created_at: Arc<RwLock<Option<Instant>>>,
44    /// Configured refresh interval
45    refresh_interval: Duration,
46    /// Handle to the auto-refresh task
47    refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
48}
49
50impl ListenKeyManager {
51    /// Creates a new `ListenKeyManager`
52    ///
53    /// # Arguments
54    /// * `binance` - Reference-counted Binance instance
55    ///
56    /// # Returns
57    /// Configured `ListenKeyManager`
58    pub fn new(binance: Arc<Binance>) -> Self {
59        Self {
60            binance,
61            listen_key: Arc::new(RwLock::new(None)),
62            created_at: Arc::new(RwLock::new(None)),
63            refresh_interval: LISTEN_KEY_REFRESH_INTERVAL,
64            refresh_task: Arc::new(Mutex::new(None)),
65        }
66    }
67
68    /// Retrieves or creates a listen key
69    ///
70    /// Returns an existing valid listen key when present; otherwise creates a new one.
71    ///
72    /// # Returns
73    /// Listen key string
74    ///
75    /// # Errors
76    /// - Failed to create the listen key
77    /// - Missing API credentials
78    pub async fn get_or_create(&self) -> Result<String> {
79        // Check if a listen key already exists
80        let key_opt = self.listen_key.read().await.clone();
81
82        if let Some(key) = key_opt {
83            // Check whether the key needs to be refreshed
84            let created = self.created_at.read().await;
85            if let Some(created_time) = *created {
86                let elapsed = created_time.elapsed();
87                // If more than 50 minutes have elapsed, create a new key
88                if elapsed > Duration::from_secs(50 * 60) {
89                    drop(created);
90                    return self.create_new().await;
91                }
92            }
93            return Ok(key);
94        }
95
96        // Create a new listen key
97        self.create_new().await
98    }
99
100    /// Creates a new listen key
101    ///
102    /// # Returns
103    /// Newly created listen key
104    async fn create_new(&self) -> Result<String> {
105        let key = self.binance.create_listen_key().await?;
106
107        // Update cache
108        *self.listen_key.write().await = Some(key.clone());
109        *self.created_at.write().await = Some(Instant::now());
110
111        Ok(key)
112    }
113
114    /// Refreshes the current listen key
115    ///
116    /// Extends the listen key lifetime by 60 minutes
117    ///
118    /// # Returns
119    /// Refresh result
120    pub async fn refresh(&self) -> Result<()> {
121        let key_opt = self.listen_key.read().await.clone();
122
123        if let Some(key) = key_opt {
124            self.binance.refresh_listen_key(&key).await?;
125            // Update the creation timestamp
126            *self.created_at.write().await = Some(Instant::now());
127            Ok(())
128        } else {
129            Err(Error::invalid_request("No listen key to refresh"))
130        }
131    }
132
133    /// Starts the auto-refresh task
134    ///
135    /// Refreshes the listen key every 30 minutes
136    pub async fn start_auto_refresh(&self) {
137        // Stop any existing task
138        self.stop_auto_refresh().await;
139
140        let listen_key = self.listen_key.clone();
141        let created_at = self.created_at.clone();
142        let binance = self.binance.clone();
143        let interval = self.refresh_interval;
144
145        let handle = tokio::spawn(async move {
146            loop {
147                tokio::time::sleep(interval).await;
148
149                // Check whether a listen key exists
150                let key_opt = listen_key.read().await.clone();
151                if let Some(key) = key_opt {
152                    // Attempt to refresh the key
153                    match binance.refresh_listen_key(&key).await {
154                        Ok(_) => {
155                            *created_at.write().await = Some(Instant::now());
156                            // Listen key refreshed successfully
157                        }
158                        Err(_e) => {
159                            // Failed to refresh; clear cache so a new key will be created next time
160                            *listen_key.write().await = None;
161                            *created_at.write().await = None;
162                            break;
163                        }
164                    }
165                } else {
166                    // No listen key available; stop the task
167                    break;
168                }
169            }
170        });
171
172        *self.refresh_task.lock().await = Some(handle);
173    }
174
175    /// Stops the auto-refresh task
176    pub async fn stop_auto_refresh(&self) {
177        let mut task_opt = self.refresh_task.lock().await;
178        if let Some(handle) = task_opt.take() {
179            handle.abort();
180        }
181    }
182
183    /// Deletes the listen key
184    ///
185    /// Closes the user data stream and invalidates the key
186    ///
187    /// # Returns
188    /// Result of the deletion
189    pub async fn delete(&self) -> Result<()> {
190        // Stop the auto-refresh first
191        self.stop_auto_refresh().await;
192
193        let key_opt = self.listen_key.read().await.clone();
194
195        if let Some(key) = key_opt {
196            self.binance.delete_listen_key(&key).await?;
197
198            // Clear cached state
199            *self.listen_key.write().await = None;
200            *self.created_at.write().await = None;
201
202            Ok(())
203        } else {
204            Ok(()) // No listen key; treat as success
205        }
206    }
207
208    /// Returns the current listen key when available
209    pub async fn get_current(&self) -> Option<String> {
210        self.listen_key.read().await.clone()
211    }
212
213    /// Checks whether the listen key is still valid
214    pub async fn is_valid(&self) -> bool {
215        let key_opt = self.listen_key.read().await;
216        if key_opt.is_none() {
217            return false;
218        }
219
220        let created = self.created_at.read().await;
221        if let Some(created_time) = *created {
222            // Key is considered valid if less than 55 minutes old
223            created_time.elapsed() < Duration::from_secs(55 * 60)
224        } else {
225            false
226        }
227    }
228}
229
230impl Drop for ListenKeyManager {
231    fn drop(&mut self) {
232        // Note: Drop is synchronous, so we cannot await asynchronous operations here.
233        // Callers should explicitly invoke `delete()` to release resources.
234    }
235}
236/// Subscription types supported by the Binance WebSocket API.
237#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238pub enum SubscriptionType {
239    /// 24-hour ticker stream
240    Ticker,
241    /// Order book depth stream
242    OrderBook,
243    /// Real-time trade stream
244    Trades,
245    /// Kline (candlestick) stream with interval (e.g. "1m", "5m", "1h")
246    Kline(String),
247    /// Account balance stream
248    Balance,
249    /// Order update stream
250    Orders,
251    /// Position update stream
252    Positions,
253    /// Personal trade execution stream
254    MyTrades,
255    /// Mark price stream
256    MarkPrice,
257    /// Book ticker (best bid/ask) stream
258    BookTicker,
259}
260
261impl SubscriptionType {
262    /// Infers a subscription type from a stream name
263    ///
264    /// # Arguments
265    /// * `stream` - Stream identifier (e.g. "btcusdt@ticker")
266    ///
267    /// # Returns
268    /// Subscription type, when the stream can be identified
269    pub fn from_stream(stream: &str) -> Option<Self> {
270        if stream.contains("@ticker") {
271            Some(Self::Ticker)
272        } else if stream.contains("@depth") {
273            Some(Self::OrderBook)
274        } else if stream.contains("@trade") || stream.contains("@aggTrade") {
275            Some(Self::Trades)
276        } else if stream.contains("@kline_") {
277            // Extract timeframe suffix
278            let parts: Vec<&str> = stream.split("@kline_").collect();
279            if parts.len() == 2 {
280                Some(Self::Kline(parts[1].to_string()))
281            } else {
282                None
283            }
284        } else if stream.contains("@markPrice") {
285            Some(Self::MarkPrice)
286        } else if stream.contains("@bookTicker") {
287            Some(Self::BookTicker)
288        } else {
289            None
290        }
291    }
292}
293
294/// Subscription metadata
295#[derive(Clone)]
296pub struct Subscription {
297    /// Stream name (e.g. "btcusdt@ticker")
298    pub stream: String,
299    /// Normalized trading symbol (e.g. "BTCUSDT")
300    pub symbol: String,
301    /// Subscription type descriptor
302    pub sub_type: SubscriptionType,
303    /// Timestamp when the subscription was created
304    pub subscribed_at: Instant,
305    /// Sender for forwarding WebSocket messages to consumers
306    pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
307}
308
309impl Subscription {
310    /// Creates a new subscription with the provided parameters
311    pub fn new(
312        stream: String,
313        symbol: String,
314        sub_type: SubscriptionType,
315        sender: tokio::sync::mpsc::UnboundedSender<Value>,
316    ) -> Self {
317        Self {
318            stream,
319            symbol,
320            sub_type,
321            subscribed_at: Instant::now(),
322            sender,
323        }
324    }
325
326    /// Sends a message to the subscriber
327    ///
328    /// # Arguments
329    /// * `message` - WebSocket payload to forward
330    ///
331    /// # Returns
332    /// Whether the message was delivered successfully
333    pub fn send(&self, message: Value) -> bool {
334        self.sender.send(message).is_ok()
335    }
336}
337
338/// Subscription manager
339///
340/// Manages the lifecycle of all WebSocket subscriptions, including:
341/// - Adding and removing subscriptions
342/// - Querying and validating subscriptions
343/// - Tracking the number of active subscriptions
344pub struct SubscriptionManager {
345    /// Mapping of `stream_name -> Subscription`
346    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
347    /// Index by symbol: `symbol -> Vec<stream_name>`
348    symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
349    /// Counter of active subscriptions
350    active_count: Arc<std::sync::atomic::AtomicUsize>,
351}
352
353impl SubscriptionManager {
354    /// Creates a new `SubscriptionManager`
355    pub fn new() -> Self {
356        Self {
357            subscriptions: Arc::new(RwLock::new(HashMap::new())),
358            symbol_index: Arc::new(RwLock::new(HashMap::new())),
359            active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
360        }
361    }
362
363    /// Adds a subscription to the manager
364    ///
365    /// # Arguments
366    /// * `stream` - Stream identifier (e.g. "btcusdt@ticker")
367    /// * `symbol` - Normalized trading symbol (e.g. "BTCUSDT")
368    /// * `sub_type` - Subscription classification
369    /// * `sender` - Channel for dispatching messages
370    ///
371    /// # Returns
372    /// Result of the insertion
373    pub async fn add_subscription(
374        &self,
375        stream: String,
376        symbol: String,
377        sub_type: SubscriptionType,
378        sender: tokio::sync::mpsc::UnboundedSender<Value>,
379    ) -> Result<()> {
380        let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
381
382        // Insert into the subscription map
383        let mut subs = self.subscriptions.write().await;
384        subs.insert(stream.clone(), subscription);
385
386        // Update the per-symbol index
387        let mut index = self.symbol_index.write().await;
388        index.entry(symbol).or_insert_with(Vec::new).push(stream);
389
390        // Increment the active subscription count
391        self.active_count
392            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
393
394        Ok(())
395    }
396
397    /// Removes a subscription by stream name
398    ///
399    /// # Arguments
400    /// * `stream` - Stream identifier to remove
401    ///
402    /// # Returns
403    /// Result of the removal
404    pub async fn remove_subscription(&self, stream: &str) -> Result<()> {
405        let mut subs = self.subscriptions.write().await;
406
407        if let Some(subscription) = subs.remove(stream) {
408            // Remove from the symbol index as well
409            let mut index = self.symbol_index.write().await;
410            if let Some(streams) = index.get_mut(&subscription.symbol) {
411                streams.retain(|s| s != stream);
412                // Drop the symbol entry when no subscriptions remain
413                if streams.is_empty() {
414                    index.remove(&subscription.symbol);
415                }
416            }
417
418            // Decrement the active subscription counter
419            self.active_count
420                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
421        }
422
423        Ok(())
424    }
425
426    /// Retrieves a subscription by stream name
427    ///
428    /// # Arguments
429    /// * `stream` - Stream identifier
430    ///
431    /// # Returns
432    /// Optional subscription record
433    pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
434        let subs = self.subscriptions.read().await;
435        subs.get(stream).cloned()
436    }
437
438    /// Checks whether a subscription exists for the given stream
439    ///
440    /// # Arguments
441    /// * `stream` - Stream identifier
442    ///
443    /// # Returns
444    /// `true` if the subscription exists, otherwise `false`
445    pub async fn has_subscription(&self, stream: &str) -> bool {
446        let subs = self.subscriptions.read().await;
447        subs.contains_key(stream)
448    }
449
450    /// Returns all registered subscriptions
451    pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
452        let subs = self.subscriptions.read().await;
453        subs.values().cloned().collect()
454    }
455
456    /// Returns all subscriptions associated with a symbol
457    ///
458    /// # Arguments
459    /// * `symbol` - Trading symbol
460    pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
461        let index = self.symbol_index.read().await;
462        let subs = self.subscriptions.read().await;
463
464        if let Some(streams) = index.get(symbol) {
465            streams
466                .iter()
467                .filter_map(|stream| subs.get(stream).cloned())
468                .collect()
469        } else {
470            Vec::new()
471        }
472    }
473
474    /// Returns the number of active subscriptions
475    pub fn active_count(&self) -> usize {
476        self.active_count.load(std::sync::atomic::Ordering::SeqCst)
477    }
478
479    /// Removes all subscriptions and clears indexes
480    pub async fn clear(&self) {
481        let mut subs = self.subscriptions.write().await;
482        let mut index = self.symbol_index.write().await;
483
484        subs.clear();
485        index.clear();
486        self.active_count
487            .store(0, std::sync::atomic::Ordering::SeqCst);
488    }
489
490    /// Sends a message to subscribers of a specific stream
491    ///
492    /// # Arguments
493    /// * `stream` - Stream identifier
494    /// * `message` - WebSocket payload to broadcast
495    ///
496    /// # Returns
497    /// `true` when the message was delivered, otherwise `false`
498    pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
499        let subs = self.subscriptions.read().await;
500        if let Some(subscription) = subs.get(stream) {
501            subscription.send(message)
502        } else {
503            false
504        }
505    }
506
507    /// Sends a message to all subscribers of a symbol
508    ///
509    /// # Arguments
510    /// * `symbol` - Trading symbol
511    /// * `message` - Shared WebSocket payload
512    ///
513    /// # Returns
514    /// Number of subscribers that received the message
515    pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
516        let index = self.symbol_index.read().await;
517        let subs = self.subscriptions.read().await;
518
519        let mut sent_count = 0;
520
521        if let Some(streams) = index.get(symbol) {
522            for stream in streams {
523                if let Some(subscription) = subs.get(stream) {
524                    if subscription.send(message.clone()) {
525                        sent_count += 1;
526                    }
527                }
528            }
529        }
530
531        sent_count
532    }
533}
534/// Reconnect configuration
535///
536/// Defines the automatic reconnection strategy after a WebSocket disconnect
537#[derive(Debug, Clone)]
538pub struct ReconnectConfig {
539    /// Enables or disables automatic reconnection
540    pub enabled: bool,
541
542    /// Initial reconnection delay in milliseconds
543    pub initial_delay_ms: u64,
544
545    /// Maximum reconnection delay in milliseconds
546    pub max_delay_ms: u64,
547
548    /// Exponential backoff multiplier
549    pub backoff_multiplier: f64,
550
551    /// Maximum number of reconnection attempts (0 means unlimited)
552    pub max_attempts: usize,
553}
554
555impl Default for ReconnectConfig {
556    fn default() -> Self {
557        Self {
558            enabled: true,
559            initial_delay_ms: 1000,  // 1 second
560            max_delay_ms: 30000,     // 30 seconds
561            backoff_multiplier: 2.0, // Exponential backoff
562            max_attempts: 0,         // Unlimited retries
563        }
564    }
565}
566
567impl ReconnectConfig {
568    /// Calculates the reconnection delay
569    ///
570    /// Uses exponential backoff to determine the delay duration
571    ///
572    /// # Arguments
573    /// * `attempt` - Current reconnection attempt (zero-based)
574    ///
575    /// # Returns
576    /// Delay duration in milliseconds
577    pub fn calculate_delay(&self, attempt: usize) -> u64 {
578        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(attempt as i32);
579        delay.min(self.max_delay_ms as f64) as u64
580    }
581
582    /// Determines whether another reconnection attempt should be made
583    ///
584    /// # Arguments
585    /// * `attempt` - Current reconnection attempt
586    ///
587    /// # Returns
588    /// `true` if another retry should be attempted
589    pub fn should_retry(&self, attempt: usize) -> bool {
590        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
591    }
592}
593
594/// Message router
595///
596/// Handles reception, parsing, and dispatching of WebSocket messages. Core responsibilities:
597/// - Managing WebSocket connections
598/// - Receiving and routing messages
599/// - Coordinating automatic reconnection
600/// - Managing subscriptions
601pub struct MessageRouter {
602    /// WebSocket client instance
603    ws_client: Arc<RwLock<Option<WsClient>>>,
604
605    /// Subscription manager registry
606    subscription_manager: Arc<SubscriptionManager>,
607
608    /// Handle to the background routing task
609    router_task: Arc<Mutex<Option<JoinHandle<()>>>>,
610
611    /// Connection state flag
612    is_connected: Arc<std::sync::atomic::AtomicBool>,
613
614    /// Configuration for reconnection behavior
615    reconnect_config: Arc<RwLock<ReconnectConfig>>,
616
617    /// WebSocket endpoint URL
618    ws_url: String,
619
620    /// Request ID counter (used for subscribe/unsubscribe)
621    request_id: Arc<std::sync::atomic::AtomicU64>,
622}
623
624impl MessageRouter {
625    /// Creates a new message router
626    ///
627    /// # Arguments
628    /// * `ws_url` - WebSocket connection URL
629    /// * `subscription_manager` - Subscription manager handle
630    ///
631    /// # Returns
632    /// Configured router instance
633    pub fn new(ws_url: String, subscription_manager: Arc<SubscriptionManager>) -> Self {
634        Self {
635            ws_client: Arc::new(RwLock::new(None)),
636            subscription_manager,
637            router_task: Arc::new(Mutex::new(None)),
638            is_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
639            reconnect_config: Arc::new(RwLock::new(ReconnectConfig::default())),
640            ws_url,
641            request_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
642        }
643    }
644
645    /// Starts the message router
646    ///
647    /// Establishes the WebSocket connection and launches the message loop
648    ///
649    /// # Returns
650    /// Result of the startup sequence
651    pub async fn start(&self) -> Result<()> {
652        // Stop any running instance before starting again
653        if self.is_connected() {
654            self.stop().await?;
655        }
656
657        // Establish a new WebSocket connection
658        let config = WsConfig {
659            url: self.ws_url.clone(),
660            ..Default::default()
661        };
662        let client = WsClient::new(config);
663        client.connect().await?;
664
665        // Store the client handle
666        *self.ws_client.write().await = Some(client);
667
668        // Update connection state
669        self.is_connected
670            .store(true, std::sync::atomic::Ordering::SeqCst);
671
672        // Spawn the message processing loop
673        let ws_client = self.ws_client.clone();
674        let subscription_manager = self.subscription_manager.clone();
675        let is_connected = self.is_connected.clone();
676        let reconnect_config = self.reconnect_config.clone();
677        let ws_url = self.ws_url.clone();
678
679        let handle = tokio::spawn(async move {
680            Self::message_loop(
681                ws_client,
682                subscription_manager,
683                is_connected,
684                reconnect_config,
685                ws_url,
686            )
687            .await
688        });
689
690        *self.router_task.lock().await = Some(handle);
691
692        Ok(())
693    }
694
695    /// Stops the message router
696    ///
697    /// Terminates the message loop and disconnects the WebSocket client
698    ///
699    /// # Returns
700    /// Result of the shutdown procedure
701    pub async fn stop(&self) -> Result<()> {
702        // Mark the connection as inactive
703        self.is_connected
704            .store(false, std::sync::atomic::Ordering::SeqCst);
705
706        // Cancel the background routing task
707        let mut task_opt = self.router_task.lock().await;
708        if let Some(handle) = task_opt.take() {
709            handle.abort();
710        }
711
712        // Disconnect the WebSocket client if present
713        let mut client_opt = self.ws_client.write().await;
714        if let Some(client) = client_opt.take() {
715            let _ = client.disconnect().await;
716        }
717
718        Ok(())
719    }
720
721    /// Restarts the message router
722    ///
723    /// Stops the current connection and restarts it, typically used during reconnect scenarios
724    ///
725    /// # Returns
726    /// Result of the restart sequence
727    pub async fn restart(&self) -> Result<()> {
728        self.stop().await?;
729        tokio::time::sleep(Duration::from_millis(100)).await;
730        self.start().await
731    }
732
733    /// Returns the current connection state
734    pub fn is_connected(&self) -> bool {
735        self.is_connected.load(std::sync::atomic::Ordering::SeqCst)
736    }
737
738    /// Applies a new reconnection configuration
739    ///
740    /// # Arguments
741    /// * `config` - Updated reconnection configuration
742    pub async fn set_reconnect_config(&self, config: ReconnectConfig) {
743        *self.reconnect_config.write().await = config;
744    }
745
746    /// Retrieves the current reconnection configuration
747    pub async fn get_reconnect_config(&self) -> ReconnectConfig {
748        self.reconnect_config.read().await.clone()
749    }
750
751    /// Subscribes to the provided streams
752    ///
753    /// Sends a subscription request to Binance
754    ///
755    /// # Arguments
756    /// * `streams` - Collection of stream identifiers
757    pub async fn subscribe(&self, streams: Vec<String>) -> Result<()> {
758        if streams.is_empty() {
759            return Ok(());
760        }
761
762        let client_opt = self.ws_client.read().await;
763        let client = client_opt
764            .as_ref()
765            .ok_or_else(|| Error::network("WebSocket not connected"))?;
766
767        // Generate a request identifier
768        let id = self
769            .request_id
770            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
771
772        // Build the subscribe payload
773        #[allow(clippy::disallowed_methods)]
774        let request = serde_json::json!({
775            "method": "SUBSCRIBE",
776            "params": streams,
777            "id": id
778        });
779
780        // Send the request to Binance
781        client
782            .send(Message::Text(request.to_string().into()))
783            .await?;
784
785        Ok(())
786    }
787
788    /// Unsubscribes from the provided streams
789    ///
790    /// Sends an unsubscribe request to Binance
791    ///
792    /// # Arguments
793    /// * `streams` - Collection of stream identifiers
794    pub async fn unsubscribe(&self, streams: Vec<String>) -> Result<()> {
795        if streams.is_empty() {
796            return Ok(());
797        }
798
799        let client_opt = self.ws_client.read().await;
800        let client = client_opt
801            .as_ref()
802            .ok_or_else(|| Error::network("WebSocket not connected"))?;
803
804        // Generate a request identifier
805        let id = self
806            .request_id
807            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
808
809        // Build the unsubscribe payload
810        #[allow(clippy::disallowed_methods)]
811        let request = serde_json::json!({
812            "method": "UNSUBSCRIBE",
813            "params": streams,
814            "id": id
815        });
816
817        // Send the request to Binance
818        client
819            .send(Message::Text(request.to_string().into()))
820            .await?;
821
822        Ok(())
823    }
824
825    /// Message reception loop
826    ///
827    /// Continuously receives WebSocket messages and routes them to subscribers
828    async fn message_loop(
829        ws_client: Arc<RwLock<Option<WsClient>>>,
830        subscription_manager: Arc<SubscriptionManager>,
831        is_connected: Arc<std::sync::atomic::AtomicBool>,
832        reconnect_config: Arc<RwLock<ReconnectConfig>>,
833        ws_url: String,
834    ) {
835        let mut reconnect_attempt = 0;
836
837        loop {
838            // Exit if instructed to stop
839            if !is_connected.load(std::sync::atomic::Ordering::SeqCst) {
840                break;
841            }
842
843            // Confirm that a client instance exists
844            let has_client = ws_client.read().await.is_some();
845
846            if !has_client {
847                // Attempt to reconnect when no client is present
848                let config = reconnect_config.read().await;
849                if config.should_retry(reconnect_attempt) {
850                    let delay = config.calculate_delay(reconnect_attempt);
851                    drop(config);
852
853                    tokio::time::sleep(Duration::from_millis(delay)).await;
854
855                    match Self::reconnect(&ws_url, ws_client.clone()).await {
856                        Ok(_) => {
857                            reconnect_attempt = 0; // Reset counter on success
858                            continue;
859                        }
860                        Err(_) => {
861                            reconnect_attempt += 1;
862                            continue;
863                        }
864                    }
865                } else {
866                    // Stop attempting to reconnect
867                    is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
868                    break;
869                }
870            }
871
872            // Receive the next message (requires re-acquiring the lock)
873            let message_opt = {
874                let guard = ws_client.read().await;
875                if let Some(client) = guard.as_ref() {
876                    client.receive().await
877                } else {
878                    None
879                }
880            };
881
882            match message_opt {
883                Some(value) => {
884                    // Handle the message; ignore failures but continue the loop
885                    if let Err(_e) = Self::handle_message(value, subscription_manager.clone()).await
886                    {
887                        continue;
888                    }
889
890                    // Reset the reconnection counter because the connection is healthy
891                    reconnect_attempt = 0;
892                }
893                None => {
894                    // Connection failure; attempt to reconnect
895                    let config = reconnect_config.read().await;
896                    if config.should_retry(reconnect_attempt) {
897                        let delay = config.calculate_delay(reconnect_attempt);
898                        drop(config);
899
900                        tokio::time::sleep(Duration::from_millis(delay)).await;
901
902                        match Self::reconnect(&ws_url, ws_client.clone()).await {
903                            Ok(_) => {
904                                reconnect_attempt = 0;
905                                continue;
906                            }
907                            Err(_) => {
908                                reconnect_attempt += 1;
909                                continue;
910                            }
911                        }
912                    } else {
913                        // Reconnection not permitted anymore; stop the loop
914                        is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
915                        break;
916                    }
917                }
918            }
919        }
920    }
921
922    /// Processes a WebSocket message
923    ///
924    /// Parses the message and routes it to relevant subscribers
925    async fn handle_message(
926        message: Value,
927        subscription_manager: Arc<SubscriptionManager>,
928    ) -> Result<()> {
929        // Determine the stream name for routing
930        let stream_name = Self::extract_stream_name(&message)?;
931
932        // Forward the message to the corresponding subscribers
933        let sent = subscription_manager
934            .send_to_stream(&stream_name, message)
935            .await;
936
937        if sent {
938            Ok(())
939        } else {
940            Err(Error::generic("No subscribers for stream"))
941        }
942    }
943
944    /// Extracts the stream name from an incoming message
945    ///
946    /// Supports two formats:
947    /// 1. Combined stream: `{"stream":"btcusdt@ticker","data":{...}}`
948    /// 2. Single stream: `{"e":"24hrTicker","s":"BTCUSDT",...}`
949    ///
950    /// # Arguments
951    /// * `message` - Raw WebSocket payload
952    ///
953    /// # Returns
954    /// Stream identifier for routing
955    fn extract_stream_name(message: &Value) -> Result<String> {
956        // Attempt to read the combined-stream format
957        if let Some(stream) = message.get("stream").and_then(|s| s.as_str()) {
958            return Ok(stream.to_string());
959        }
960
961        // Fall back to single-stream format: event@symbol (lowercase)
962        if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
963            if let Some(symbol) = message.get("s").and_then(|s| s.as_str()) {
964                // Build the stream identifier
965                let stream = match event_type {
966                    "24hrTicker" => format!("{}@ticker", symbol.to_lowercase()),
967                    "depthUpdate" => format!("{}@depth", symbol.to_lowercase()),
968                    "aggTrade" => format!("{}@aggTrade", symbol.to_lowercase()),
969                    "trade" => format!("{}@trade", symbol.to_lowercase()),
970                    "kline" => {
971                        // Kline messages carry the interval within the "k" object
972                        if let Some(kline) = message.get("k") {
973                            if let Some(interval) = kline.get("i").and_then(|i| i.as_str()) {
974                                format!("{}@kline_{}", symbol.to_lowercase(), interval)
975                            } else {
976                                return Err(Error::generic("Missing kline interval"));
977                            }
978                        } else {
979                            return Err(Error::generic("Missing kline data"));
980                        }
981                    }
982                    "markPriceUpdate" => format!("{}@markPrice", symbol.to_lowercase()),
983                    "bookTicker" => format!("{}@bookTicker", symbol.to_lowercase()),
984                    _ => {
985                        return Err(Error::generic(format!(
986                            "Unknown event type: {}",
987                            event_type
988                        )));
989                    }
990                };
991                return Ok(stream);
992            }
993        }
994
995        // Ignore subscription acknowledgements and error responses
996        if message.get("result").is_some() || message.get("error").is_some() {
997            return Err(Error::generic("Subscription response, skip routing"));
998        }
999
1000        Err(Error::generic("Cannot extract stream name from message"))
1001    }
1002
1003    /// Reconnects the WebSocket client
1004    ///
1005    /// Closes the existing connection and establishes a new one
1006    async fn reconnect(ws_url: &str, ws_client: Arc<RwLock<Option<WsClient>>>) -> Result<()> {
1007        // Close the previous connection
1008        {
1009            let mut client_opt = ws_client.write().await;
1010            if let Some(client) = client_opt.take() {
1011                let _ = client.disconnect().await;
1012            }
1013        }
1014
1015        // Establish a fresh connection
1016        let config = WsConfig {
1017            url: ws_url.to_string(),
1018            ..Default::default()
1019        };
1020        let new_client = WsClient::new(config);
1021
1022        // Connect to the WebSocket endpoint
1023        new_client.connect().await?;
1024
1025        // Store the new client handle
1026        *ws_client.write().await = Some(new_client);
1027
1028        Ok(())
1029    }
1030}
1031
1032impl Drop for MessageRouter {
1033    fn drop(&mut self) {
1034        // Note: Drop is synchronous, so we cannot await asynchronous operations here.
1035        // Callers should explicitly invoke `stop()` to release resources.
1036    }
1037}
1038
1039impl Default for SubscriptionManager {
1040    fn default() -> Self {
1041        Self::new()
1042    }
1043}
1044
1045/// Binance WebSocket client wrapper
1046pub struct BinanceWs {
1047    client: Arc<WsClient>,
1048    listen_key: Arc<RwLock<Option<String>>>,
1049    /// Listen key manager
1050    listen_key_manager: Option<Arc<ListenKeyManager>>,
1051    /// Auto-reconnect coordinator
1052    auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
1053    /// Cached ticker data
1054    tickers: Arc<Mutex<HashMap<String, Ticker>>>,
1055    /// Cached bid/ask snapshots
1056    bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
1057    /// Cached mark price entries
1058    #[allow(dead_code)]
1059    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
1060    /// Cached order book snapshots
1061    orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
1062    /// Cached trade history (retains the latest 1000 entries)
1063    trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
1064    /// Cached OHLCV candles
1065    ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
1066    /// Cached balance data grouped by account type
1067    balances: Arc<RwLock<HashMap<String, Balance>>>,
1068    /// Cached orders grouped by symbol then order ID
1069    orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
1070    /// Cached personal trades grouped by symbol (retains the latest 1000 entries)
1071    my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
1072    /// Cached positions grouped by symbol and side
1073    positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
1074}
1075
1076impl BinanceWs {
1077    /// Creates a new Binance WebSocket client
1078    ///
1079    /// # Arguments
1080    /// * `url` - WebSocket server URL
1081    ///
1082    /// # Returns
1083    /// Initialized Binance WebSocket client
1084    pub fn new(url: String) -> Self {
1085        let config = WsConfig {
1086            url,
1087            connect_timeout: 10000,
1088            ping_interval: 180000, // Binance recommends 3 minutes
1089            reconnect_interval: 5000,
1090            max_reconnect_attempts: 5,
1091            auto_reconnect: true,
1092            enable_compression: false,
1093            pong_timeout: 90000, // Set pong timeout to 90 seconds
1094        };
1095
1096        Self {
1097            client: Arc::new(WsClient::new(config)),
1098            listen_key: Arc::new(RwLock::new(None)),
1099            listen_key_manager: None,
1100            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1101            tickers: Arc::new(Mutex::new(HashMap::new())),
1102            bids_asks: Arc::new(Mutex::new(HashMap::new())),
1103            mark_prices: Arc::new(Mutex::new(HashMap::new())),
1104            orderbooks: Arc::new(Mutex::new(HashMap::new())),
1105            trades: Arc::new(Mutex::new(HashMap::new())),
1106            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1107            balances: Arc::new(RwLock::new(HashMap::new())),
1108            orders: Arc::new(RwLock::new(HashMap::new())),
1109            my_trades: Arc::new(RwLock::new(HashMap::new())),
1110            positions: Arc::new(RwLock::new(HashMap::new())),
1111        }
1112    }
1113
1114    /// Creates a WebSocket client with a listen key manager
1115    ///
1116    /// # Arguments
1117    /// * `url` - WebSocket server URL
1118    /// * `binance` - Shared Binance instance
1119    ///
1120    /// # Returns
1121    /// Binance WebSocket client configured with a listen key manager
1122    pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
1123        let config = WsConfig {
1124            url,
1125            connect_timeout: 10000,
1126            ping_interval: 180000, // Binance recommends 3 minutes
1127            reconnect_interval: 5000,
1128            max_reconnect_attempts: 5,
1129            auto_reconnect: true,
1130            enable_compression: false,
1131            pong_timeout: 90000, // Set pong timeout to 90 seconds
1132        };
1133
1134        Self {
1135            client: Arc::new(WsClient::new(config)),
1136            listen_key: Arc::new(RwLock::new(None)),
1137            listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
1138            auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1139            tickers: Arc::new(Mutex::new(HashMap::new())),
1140            bids_asks: Arc::new(Mutex::new(HashMap::new())),
1141            mark_prices: Arc::new(Mutex::new(HashMap::new())),
1142            orderbooks: Arc::new(Mutex::new(HashMap::new())),
1143            trades: Arc::new(Mutex::new(HashMap::new())),
1144            ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1145            balances: Arc::new(RwLock::new(HashMap::new())),
1146            orders: Arc::new(RwLock::new(HashMap::new())),
1147            my_trades: Arc::new(RwLock::new(HashMap::new())),
1148            positions: Arc::new(RwLock::new(HashMap::new())),
1149        }
1150    }
1151
1152    /// Connects to the WebSocket server
1153    pub async fn connect(&self) -> Result<()> {
1154        // Establish the WebSocket connection
1155        self.client.connect().await?;
1156
1157        // Start the auto-reconnect coordinator when not already running
1158        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1159        if coordinator_guard.is_none() {
1160            let coordinator = self.client.clone().create_auto_reconnect_coordinator();
1161            coordinator.start().await;
1162            *coordinator_guard = Some(coordinator);
1163            tracing::info!("Auto-reconnect coordinator started");
1164        }
1165
1166        Ok(())
1167    }
1168
1169    /// Disconnects from the WebSocket server
1170    pub async fn disconnect(&self) -> Result<()> {
1171        // Stop the auto-reconnect coordinator first
1172        let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1173        if let Some(coordinator) = coordinator_guard.take() {
1174            coordinator.stop().await;
1175            tracing::info!("Auto-reconnect coordinator stopped");
1176        }
1177
1178        // Stop automatic listen key refresh if available
1179        if let Some(manager) = &self.listen_key_manager {
1180            manager.stop_auto_refresh().await;
1181        }
1182
1183        self.client.disconnect().await
1184    }
1185
1186    /// Connects to the user data stream
1187    ///
1188    /// Creates or retrieves a listen key, connects to the user data WebSocket, and starts auto-refresh
1189    ///
1190    /// # Returns
1191    /// Result of the connection attempt
1192    ///
1193    /// # Errors
1194    /// - Listen key manager is missing (requires `new_with_auth` constructor)
1195    /// - Listen key creation failed
1196    /// - WebSocket connection failed
1197    ///
1198    /// # Example
1199    /// ```rust,no_run
1200    /// # use ccxt_exchanges::binance::Binance;
1201    /// # use std::sync::Arc;
1202    /// # async fn example() -> ccxt_core::error::Result<()> {
1203    /// let binance = Arc::new(Binance::new());
1204    /// let ws = binance.create_authenticated_ws();
1205    /// ws.connect_user_stream().await?;
1206    /// # Ok(())
1207    /// # }
1208    /// ```
1209    pub async fn connect_user_stream(&self) -> Result<()> {
1210        let manager = self.listen_key_manager.as_ref()
1211            .ok_or_else(|| Error::invalid_request(
1212                "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
1213            ))?;
1214
1215        // Obtain an existing listen key or create a new one
1216        let listen_key = manager.get_or_create().await?;
1217
1218        // Update the WebSocket URL using the listen key
1219        let user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);
1220
1221        // Recreate the WebSocket client configuration
1222        let config = WsConfig {
1223            url: user_stream_url,
1224            connect_timeout: 10000,
1225            ping_interval: 180000,
1226            reconnect_interval: 5000,
1227            max_reconnect_attempts: 5,
1228            auto_reconnect: true,
1229            enable_compression: false,
1230            pong_timeout: 90000, // Default 90-second timeout
1231        };
1232
1233        // Initialize a new client instance (retained for future replacement logic)
1234        let _new_client = Arc::new(WsClient::new(config));
1235        // An `Arc` cannot be modified in place; concrete handling is deferred for future work
1236
1237        // Connect to the WebSocket endpoint
1238        self.client.connect().await?;
1239
1240        // Start automatic listen key refresh
1241        manager.start_auto_refresh().await;
1242
1243        // Cache the current listen key locally
1244        *self.listen_key.write().await = Some(listen_key);
1245
1246        Ok(())
1247    }
1248
1249    /// Closes the user data stream
1250    ///
1251    /// Stops auto-refresh and deletes the listen key
1252    ///
1253    /// # Returns
1254    /// Result of the shutdown
1255    pub async fn close_user_stream(&self) -> Result<()> {
1256        if let Some(manager) = &self.listen_key_manager {
1257            manager.delete().await?;
1258        }
1259        *self.listen_key.write().await = None;
1260        Ok(())
1261    }
1262
1263    /// Returns the active listen key, when available
1264    pub async fn get_listen_key(&self) -> Option<String> {
1265        if let Some(manager) = &self.listen_key_manager {
1266            manager.get_current().await
1267        } else {
1268            self.listen_key.read().await.clone()
1269        }
1270    }
1271
1272    /// Subscribes to the ticker stream for a symbol
1273    ///
1274    /// # Arguments
1275    /// * `symbol` - Trading pair (e.g. "btcusdt")
1276    ///
1277    /// # Returns
1278    /// Result of the subscription request
1279    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
1280        let stream = format!("{}@ticker", symbol.to_lowercase());
1281        self.client
1282            .subscribe(stream, Some(symbol.to_string()), None)
1283            .await
1284    }
1285
1286    /// Subscribes to the 24-hour ticker stream for all symbols
1287    pub async fn subscribe_all_tickers(&self) -> Result<()> {
1288        self.client
1289            .subscribe("!ticker@arr".to_string(), None, None)
1290            .await
1291    }
1292
1293    /// Subscribes to real-time trade executions for a symbol
1294    ///
1295    /// # Arguments
1296    /// * `symbol` - Trading pair (e.g. "btcusdt")
1297    ///
1298    /// # Returns
1299    /// Result of the subscription request
1300    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
1301        let stream = format!("{}@trade", symbol.to_lowercase());
1302        self.client
1303            .subscribe(stream, Some(symbol.to_string()), None)
1304            .await
1305    }
1306
1307    /// Subscribes to the aggregated trade stream for a symbol
1308    ///
1309    /// # Arguments
1310    /// * `symbol` - Trading pair (e.g. "btcusdt")
1311    ///
1312    /// # Returns
1313    /// Result of the subscription request
1314    pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
1315        let stream = format!("{}@aggTrade", symbol.to_lowercase());
1316        self.client
1317            .subscribe(stream, Some(symbol.to_string()), None)
1318            .await
1319    }
1320
1321    /// Subscribes to the order book depth stream
1322    ///
1323    /// # Arguments
1324    /// * `symbol` - Trading pair (e.g. "btcusdt")
1325    /// * `levels` - Depth levels (5, 10, 20)
1326    /// * `update_speed` - Update frequency ("100ms" or "1000ms")
1327    ///
1328    /// # Returns
1329    /// Result of the subscription request
1330    pub async fn subscribe_orderbook(
1331        &self,
1332        symbol: &str,
1333        levels: u32,
1334        update_speed: &str,
1335    ) -> Result<()> {
1336        let stream = if update_speed == "100ms" {
1337            format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
1338        } else {
1339            format!("{}@depth{}", symbol.to_lowercase(), levels)
1340        };
1341
1342        self.client
1343            .subscribe(stream, Some(symbol.to_string()), None)
1344            .await
1345    }
1346
1347    /// Subscribes to the diff order book stream
1348    ///
1349    /// # Arguments
1350    /// * `symbol` - Trading pair (e.g. "btcusdt")
1351    /// * `update_speed` - Update frequency ("100ms" or "1000ms")
1352    ///
1353    /// # Returns
1354    /// Result of the subscription request
1355    pub async fn subscribe_orderbook_diff(
1356        &self,
1357        symbol: &str,
1358        update_speed: Option<&str>,
1359    ) -> Result<()> {
1360        let stream = if let Some(speed) = update_speed {
1361            if speed == "100ms" {
1362                format!("{}@depth@100ms", symbol.to_lowercase())
1363            } else {
1364                format!("{}@depth", symbol.to_lowercase())
1365            }
1366        } else {
1367            format!("{}@depth", symbol.to_lowercase())
1368        };
1369
1370        self.client
1371            .subscribe(stream, Some(symbol.to_string()), None)
1372            .await
1373    }
1374
1375    /// Subscribes to Kline (candlestick) data for a symbol
1376    ///
1377    /// # Arguments
1378    /// * `symbol` - Trading pair (e.g. "btcusdt")
1379    /// * `interval` - Kline interval (1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M)
1380    ///
1381    /// # Returns
1382    /// Result of the subscription request
1383    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
1384        let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
1385        self.client
1386            .subscribe(stream, Some(symbol.to_string()), None)
1387            .await
1388    }
1389
1390    /// Subscribes to the mini ticker stream for a symbol
1391    ///
1392    /// # Arguments
1393    /// * `symbol` - Trading pair (e.g. "btcusdt")
1394    ///
1395    /// # Returns
1396    /// Result of the subscription request
1397    pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
1398        let stream = format!("{}@miniTicker", symbol.to_lowercase());
1399        self.client
1400            .subscribe(stream, Some(symbol.to_string()), None)
1401            .await
1402    }
1403
1404    /// Subscribes to the mini ticker stream for all symbols
1405    pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
1406        self.client
1407            .subscribe("!miniTicker@arr".to_string(), None, None)
1408            .await
1409    }
1410
1411    /// Cancels an existing subscription
1412    ///
1413    /// # Arguments
1414    /// * `stream` - Stream identifier to unsubscribe from
1415    ///
1416    /// # Returns
1417    /// Result of the unsubscribe request
1418    pub async fn unsubscribe(&self, stream: String) -> Result<()> {
1419        self.client.unsubscribe(stream, None).await
1420    }
1421
1422    /// Receives the next available message
1423    ///
1424    /// # Returns
1425    /// Optional message payload when available
1426    pub async fn receive(&self) -> Option<Value> {
1427        self.client.receive().await
1428    }
1429
1430    /// Indicates whether the WebSocket connection is active
1431    pub async fn is_connected(&self) -> bool {
1432        self.client.is_connected().await
1433    }
1434
1435    /// Watches a single ticker stream (internal helper)
1436    ///
1437    /// # Arguments
1438    /// * `symbol` - Lowercase trading pair (e.g. "btcusdt")
1439    /// * `channel_name` - Channel identifier (ticker/miniTicker/markPrice/bookTicker)
1440    ///
1441    /// # Returns
1442    /// Parsed ticker data
1443    async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1444        let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
1445
1446        // Subscribe to the requested stream
1447        self.client
1448            .subscribe(stream.clone(), Some(symbol.to_string()), None)
1449            .await?;
1450
1451        // Wait for and parse incoming messages
1452        loop {
1453            if let Some(message) = self.client.receive().await {
1454                // Ignore subscription acknowledgements
1455                if message.get("result").is_some() {
1456                    continue;
1457                }
1458
1459                // Parse ticker payload
1460                if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1461                    // Cache the ticker for later retrieval
1462                    let mut tickers = self.tickers.lock().await;
1463                    tickers.insert(ticker.symbol.clone(), ticker.clone());
1464
1465                    return Ok(ticker);
1466                }
1467            }
1468        }
1469    }
1470
1471    /// Watches multiple ticker streams (internal helper)
1472    ///
1473    /// # Arguments
1474    /// * `symbols` - Optional list of lowercase trading pairs
1475    /// * `channel_name` - Target channel name
1476    ///
1477    /// # Returns
1478    /// Mapping of symbol to ticker payloads
1479    async fn watch_tickers_internal(
1480        &self,
1481        symbols: Option<Vec<String>>,
1482        channel_name: &str,
1483    ) -> Result<HashMap<String, Ticker>> {
1484        let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1485            // Subscribe to specific trading pairs
1486            syms.iter()
1487                .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1488                .collect()
1489        } else {
1490            // Subscribe to the aggregated ticker stream
1491            vec![format!("!{}@arr", channel_name)]
1492        };
1493
1494        // Issue subscription requests
1495        for stream in &streams {
1496            self.client.subscribe(stream.clone(), None, None).await?;
1497        }
1498
1499        // Collect and parse messages
1500        let mut result = HashMap::new();
1501
1502        loop {
1503            if let Some(message) = self.client.receive().await {
1504                // Skip subscription acknowledgements
1505                if message.get("result").is_some() {
1506                    continue;
1507                }
1508
1509                // Handle array payloads (all tickers)
1510                if let Some(arr) = message.as_array() {
1511                    for item in arr {
1512                        if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1513                            let symbol = ticker.symbol.clone();
1514
1515                            // Return only requested symbols when provided
1516                            if let Some(syms) = &symbols {
1517                                if syms.contains(&symbol.to_lowercase()) {
1518                                    result.insert(symbol.clone(), ticker.clone());
1519                                }
1520                            } else {
1521                                result.insert(symbol.clone(), ticker.clone());
1522                            }
1523
1524                            // Cache the ticker payload
1525                            let mut tickers = self.tickers.lock().await;
1526                            tickers.insert(symbol, ticker);
1527                        }
1528                    }
1529
1530                    // Exit once all requested tickers have been observed
1531                    if let Some(syms) = &symbols {
1532                        if result.len() == syms.len() {
1533                            return Ok(result);
1534                        }
1535                    } else {
1536                        return Ok(result);
1537                    }
1538                } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1539                    // Handle single-ticker payloads
1540                    let symbol = ticker.symbol.clone();
1541                    result.insert(symbol.clone(), ticker.clone());
1542
1543                    // Cache the ticker payload
1544                    let mut tickers = self.tickers.lock().await;
1545                    tickers.insert(symbol, ticker);
1546
1547                    // Exit once all requested tickers have been observed
1548                    if let Some(syms) = &symbols {
1549                        if result.len() == syms.len() {
1550                            return Ok(result);
1551                        }
1552                    }
1553                }
1554            }
1555        }
1556    }
1557
1558    /// Processes an order book delta update (internal helper)
1559    ///
1560    /// # Arguments
1561    /// * `symbol` - Trading pair
1562    /// * `delta_message` - Raw WebSocket delta payload
1563    /// * `is_futures` - Whether the feed originates from the futures market
1564    ///
1565    /// # Returns
1566    /// Result of the update; returns a special error when resynchronization is required
1567    async fn handle_orderbook_delta(
1568        &self,
1569        symbol: &str,
1570        delta_message: &Value,
1571        is_futures: bool,
1572    ) -> Result<()> {
1573        use ccxt_core::types::orderbook::{OrderBookDelta, OrderBookEntry};
1574        use rust_decimal::Decimal;
1575
1576        // Parse the delta message
1577        let first_update_id = delta_message["U"]
1578            .as_i64()
1579            .ok_or_else(|| Error::invalid_request("Missing first update ID in delta message"))?;
1580
1581        let final_update_id = delta_message["u"]
1582            .as_i64()
1583            .ok_or_else(|| Error::invalid_request("Missing final update ID in delta message"))?;
1584
1585        let prev_final_update_id = if is_futures {
1586            delta_message["pu"].as_i64()
1587        } else {
1588            None
1589        };
1590
1591        let timestamp = delta_message["E"]
1592            .as_i64()
1593            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1594
1595        // Parse bid updates
1596        let mut bids = Vec::new();
1597        if let Some(bids_arr) = delta_message["b"].as_array() {
1598            for bid in bids_arr {
1599                if let (Some(price_str), Some(amount_str)) = (bid[0].as_str(), bid[1].as_str()) {
1600                    if let (Ok(price), Ok(amount)) =
1601                        (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1602                    {
1603                        bids.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1604                    }
1605                }
1606            }
1607        }
1608
1609        // Parse ask updates
1610        let mut asks = Vec::new();
1611        if let Some(asks_arr) = delta_message["a"].as_array() {
1612            for ask in asks_arr {
1613                if let (Some(price_str), Some(amount_str)) = (ask[0].as_str(), ask[1].as_str()) {
1614                    if let (Ok(price), Ok(amount)) =
1615                        (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1616                    {
1617                        asks.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1618                    }
1619                }
1620            }
1621        }
1622
1623        // Build the delta structure
1624        let delta = OrderBookDelta {
1625            symbol: symbol.to_string(),
1626            first_update_id,
1627            final_update_id,
1628            prev_final_update_id,
1629            timestamp,
1630            bids,
1631            asks,
1632        };
1633
1634        // Retrieve or create the cached order book
1635        let mut orderbooks = self.orderbooks.lock().await;
1636        let orderbook = orderbooks
1637            .entry(symbol.to_string())
1638            .or_insert_with(|| OrderBook::new(symbol.to_string(), timestamp));
1639
1640        // If the order book is not synchronized yet, buffer the delta
1641        if !orderbook.is_synced {
1642            orderbook.buffer_delta(delta);
1643            return Ok(());
1644        }
1645
1646        // Apply the delta to the order book
1647        if let Err(e) = orderbook.apply_delta(&delta, is_futures) {
1648            // Check whether a resynchronization cycle is needed
1649            if orderbook.needs_resync {
1650                tracing::warn!("Orderbook {} needs resync due to: {}", symbol, e);
1651                // Buffer the delta so it can be reused after resync
1652                orderbook.buffer_delta(delta);
1653                // Signal that resynchronization is required
1654                return Err(Error::invalid_request(format!("RESYNC_NEEDED: {}", e)));
1655            }
1656            return Err(Error::invalid_request(e));
1657        }
1658
1659        Ok(())
1660    }
1661
1662    /// Retrieves an order book snapshot and initializes cached state (internal helper)
1663    ///
1664    /// # Arguments
1665    /// * `exchange` - Exchange reference used for REST API calls
1666    /// * `symbol` - Trading pair identifier
1667    /// * `limit` - Depth limit to request
1668    /// * `is_futures` - Whether the symbol is a futures market
1669    ///
1670    /// # Returns
1671    /// Initialized order book structure
1672    async fn fetch_orderbook_snapshot(
1673        &self,
1674        exchange: &Binance,
1675        symbol: &str,
1676        limit: Option<i64>,
1677        is_futures: bool,
1678    ) -> Result<OrderBook> {
1679        // Fetch snapshot via REST API
1680        let mut params = HashMap::new();
1681        if let Some(l) = limit {
1682            // json! macro with simple values is infallible
1683            #[allow(clippy::disallowed_methods)]
1684            let limit_value = serde_json::json!(l);
1685            params.insert("limit".to_string(), limit_value);
1686        }
1687
1688        let mut snapshot = exchange.fetch_order_book(symbol, None).await?;
1689
1690        // Mark the snapshot as synchronized
1691        snapshot.is_synced = true;
1692
1693        // Apply buffered deltas captured before the snapshot
1694        let mut orderbooks = self.orderbooks.lock().await;
1695        if let Some(cached_ob) = orderbooks.get_mut(symbol) {
1696            // Transfer buffered deltas to the snapshot instance
1697            snapshot.buffered_deltas = cached_ob.buffered_deltas.clone();
1698
1699            // Apply buffered deltas to catch up
1700            if let Ok(processed) = snapshot.process_buffered_deltas(is_futures) {
1701                tracing::debug!("Processed {} buffered deltas for {}", processed, symbol);
1702            }
1703        }
1704
1705        // Update cache with the synchronized snapshot
1706        orderbooks.insert(symbol.to_string(), snapshot.clone());
1707
1708        Ok(snapshot)
1709    }
1710
1711    /// Watches a single order book stream (internal helper)
1712    ///
1713    /// # Arguments
1714    /// * `exchange` - Exchange reference
1715    /// * `symbol` - Lowercase trading pair
1716    /// * `limit` - Depth limit
1717    /// * `update_speed` - Update frequency (100 or 1000 ms)
1718    /// * `is_futures` - Whether the symbol is a futures market
1719    ///
1720    /// # Returns
1721    /// Order book snapshot enriched with streamed updates
1722    async fn watch_orderbook_internal(
1723        &self,
1724        exchange: &Binance,
1725        symbol: &str,
1726        limit: Option<i64>,
1727        update_speed: i32,
1728        is_futures: bool,
1729    ) -> Result<OrderBook> {
1730        // Construct the stream name
1731        let stream = if update_speed == 100 {
1732            format!("{}@depth@100ms", symbol.to_lowercase())
1733        } else {
1734            format!("{}@depth", symbol.to_lowercase())
1735        };
1736
1737        // Subscribe to depth updates
1738        self.client
1739            .subscribe(stream.clone(), Some(symbol.to_string()), None)
1740            .await?;
1741
1742        // Start buffering deltas before the snapshot is ready
1743        let snapshot_fetched = Arc::new(Mutex::new(false));
1744        let _snapshot_fetched_clone = snapshot_fetched.clone();
1745
1746        // Spawn a placeholder processing loop (actual handling occurs elsewhere)
1747        let _orderbooks_clone = self.orderbooks.clone();
1748        let _symbol_clone = symbol.to_string();
1749
1750        tokio::spawn(async move {
1751            // Placeholder: actual message handling occurs in the main loop
1752        });
1753
1754        // Give the stream time to accumulate initial deltas before fetching a snapshot
1755        tokio::time::sleep(Duration::from_millis(500)).await;
1756
1757        // Fetch the initial snapshot
1758        let _snapshot = self
1759            .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1760            .await?;
1761
1762        *snapshot_fetched.lock().await = true;
1763
1764        // Main processing loop
1765        loop {
1766            if let Some(message) = self.client.receive().await {
1767                // Skip subscription acknowledgements
1768                if message.get("result").is_some() {
1769                    continue;
1770                }
1771
1772                // Process depth updates only
1773                if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1774                    if event_type == "depthUpdate" {
1775                        match self
1776                            .handle_orderbook_delta(symbol, &message, is_futures)
1777                            .await
1778                        {
1779                            Ok(_) => {
1780                                // Return the updated order book once synchronized
1781                                let orderbooks = self.orderbooks.lock().await;
1782                                if let Some(ob) = orderbooks.get(symbol) {
1783                                    if ob.is_synced {
1784                                        return Ok(ob.clone());
1785                                    }
1786                                }
1787                            }
1788                            Err(e) => {
1789                                let err_msg = e.to_string();
1790
1791                                // Initiate resynchronization when instructed
1792                                if err_msg.contains("RESYNC_NEEDED") {
1793                                    tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1794
1795                                    let current_time = chrono::Utc::now().timestamp_millis();
1796                                    let should_resync = {
1797                                        let orderbooks = self.orderbooks.lock().await;
1798                                        if let Some(ob) = orderbooks.get(symbol) {
1799                                            ob.should_resync(current_time)
1800                                        } else {
1801                                            true
1802                                        }
1803                                    };
1804
1805                                    if should_resync {
1806                                        tracing::info!("Initiating resync for {}", symbol);
1807
1808                                        // Reset local state in preparation for resync
1809                                        {
1810                                            let mut orderbooks = self.orderbooks.lock().await;
1811                                            if let Some(ob) = orderbooks.get_mut(symbol) {
1812                                                ob.reset_for_resync();
1813                                                ob.mark_resync_initiated(current_time);
1814                                            }
1815                                        }
1816
1817                                        // Allow some deltas to buffer before fetching a new snapshot
1818                                        tokio::time::sleep(Duration::from_millis(500)).await;
1819
1820                                        // Fetch a fresh snapshot and continue processing
1821                                        match self
1822                                            .fetch_orderbook_snapshot(
1823                                                exchange, symbol, limit, is_futures,
1824                                            )
1825                                            .await
1826                                        {
1827                                            Ok(_) => {
1828                                                tracing::info!(
1829                                                    "Resync completed successfully for {}",
1830                                                    symbol
1831                                                );
1832                                                continue;
1833                                            }
1834                                            Err(resync_err) => {
1835                                                tracing::error!(
1836                                                    "Resync failed for {}: {}",
1837                                                    symbol,
1838                                                    resync_err
1839                                                );
1840                                                return Err(resync_err);
1841                                            }
1842                                        }
1843                                    } else {
1844                                        tracing::debug!(
1845                                            "Resync rate limited for {}, skipping",
1846                                            symbol
1847                                        );
1848                                        continue;
1849                                    }
1850                                } else {
1851                                    tracing::error!(
1852                                        "Failed to handle orderbook delta: {}",
1853                                        err_msg
1854                                    );
1855                                    continue;
1856                                }
1857                            }
1858                        }
1859                    }
1860                }
1861            }
1862        }
1863    }
1864
1865    /// Watches multiple order book streams (internal helper)
1866    ///
1867    /// # Arguments
1868    /// * `exchange` - Exchange reference
1869    /// * `symbols` - List of trading pairs
1870    /// * `limit` - Requested depth level
1871    /// * `update_speed` - Update frequency
1872    /// * `is_futures` - Whether the symbols are futures markets
1873    ///
1874    /// # Returns
1875    /// Mapping of symbol to order book data
1876    async fn watch_orderbooks_internal(
1877        &self,
1878        exchange: &Binance,
1879        symbols: Vec<String>,
1880        limit: Option<i64>,
1881        update_speed: i32,
1882        is_futures: bool,
1883    ) -> Result<HashMap<String, OrderBook>> {
1884        // Binance enforces a 200-symbol limit per connection
1885        if symbols.len() > 200 {
1886            return Err(Error::invalid_request(
1887                "Binance supports max 200 symbols per connection",
1888            ));
1889        }
1890
1891        // Subscribe to each symbol
1892        for symbol in &symbols {
1893            let stream = if update_speed == 100 {
1894                format!("{}@depth@100ms", symbol.to_lowercase())
1895            } else {
1896                format!("{}@depth", symbol.to_lowercase())
1897            };
1898
1899            self.client
1900                .subscribe(stream, Some(symbol.clone()), None)
1901                .await?;
1902        }
1903
1904        // Allow messages to accumulate before snapshot retrieval
1905        tokio::time::sleep(Duration::from_millis(500)).await;
1906
1907        // Fetch snapshots for all symbols
1908        for symbol in &symbols {
1909            let _ = self
1910                .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1911                .await;
1912        }
1913
1914        // Process incremental updates
1915        let mut result = HashMap::new();
1916        let mut update_count = 0;
1917
1918        while update_count < symbols.len() {
1919            if let Some(message) = self.client.receive().await {
1920                // Skip subscription acknowledgements
1921                if message.get("result").is_some() {
1922                    continue;
1923                }
1924
1925                // Handle depth updates
1926                if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1927                    if event_type == "depthUpdate" {
1928                        if let Some(msg_symbol) = message.get("s").and_then(|v| v.as_str()) {
1929                            if let Err(e) = self
1930                                .handle_orderbook_delta(msg_symbol, &message, is_futures)
1931                                .await
1932                            {
1933                                tracing::error!("Failed to handle orderbook delta: {}", e);
1934                                continue;
1935                            }
1936
1937                            update_count += 1;
1938                        }
1939                    }
1940                }
1941            }
1942        }
1943
1944        // Collect the resulting order books
1945        let orderbooks = self.orderbooks.lock().await;
1946        for symbol in &symbols {
1947            if let Some(ob) = orderbooks.get(symbol) {
1948                result.insert(symbol.clone(), ob.clone());
1949            }
1950        }
1951
1952        Ok(result)
1953    }
1954
1955    ///
1956    /// # Arguments
1957    /// * `symbol` - Trading pair identifier
1958    ///
1959    /// # Returns
1960    /// Ticker snapshot when available
1961    pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1962        let tickers = self.tickers.lock().await;
1963        tickers.get(symbol).cloned()
1964    }
1965
1966    /// Returns all cached ticker snapshots
1967    pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1968        let tickers = self.tickers.lock().await;
1969        tickers.clone()
1970    }
1971
1972    /// Handles balance update messages (internal helper)
1973    ///
1974    /// # Arguments
1975    /// * `message` - WebSocket payload
1976    /// * `account_type` - Account category (spot/future/delivery, etc.)
1977    ///
1978    /// # Returns
1979    /// Result of the balance update processing
1980    async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1981        use rust_decimal::Decimal;
1982        use std::str::FromStr;
1983
1984        // Identify the event type
1985        let event_type = message
1986            .get("e")
1987            .and_then(|e| e.as_str())
1988            .ok_or_else(|| Error::invalid_request("Missing event type in balance message"))?;
1989
1990        // Retrieve or create cached balances for the account
1991        let mut balances = self.balances.write().await;
1992        let balance = balances
1993            .entry(account_type.to_string())
1994            .or_insert_with(Balance::new);
1995
1996        match event_type {
1997            // Spot account incremental balance update
1998            "balanceUpdate" => {
1999                let asset = message
2000                    .get("a")
2001                    .and_then(|a| a.as_str())
2002                    .ok_or_else(|| Error::invalid_request("Missing asset in balanceUpdate"))?;
2003
2004                let delta_str = message
2005                    .get("d")
2006                    .and_then(|d| d.as_str())
2007                    .ok_or_else(|| Error::invalid_request("Missing delta in balanceUpdate"))?;
2008
2009                let delta = Decimal::from_str(delta_str)
2010                    .map_err(|e| Error::invalid_request(format!("Invalid delta value: {}", e)))?;
2011
2012                // Apply delta to the cached balance
2013                balance.apply_delta(asset.to_string(), delta);
2014            }
2015
2016            // Spot account full balance update
2017            "outboundAccountPosition" => {
2018                if let Some(balances_array) = message.get("B").and_then(|b| b.as_array()) {
2019                    for balance_item in balances_array {
2020                        let asset =
2021                            balance_item
2022                                .get("a")
2023                                .and_then(|a| a.as_str())
2024                                .ok_or_else(|| {
2025                                    Error::invalid_request("Missing asset in balance item")
2026                                })?;
2027
2028                        let free_str = balance_item
2029                            .get("f")
2030                            .and_then(|f| f.as_str())
2031                            .ok_or_else(|| Error::invalid_request("Missing free balance"))?;
2032
2033                        let locked_str = balance_item
2034                            .get("l")
2035                            .and_then(|l| l.as_str())
2036                            .ok_or_else(|| Error::invalid_request("Missing locked balance"))?;
2037
2038                        let free = Decimal::from_str(free_str).map_err(|e| {
2039                            Error::invalid_request(format!("Invalid free value: {}", e))
2040                        })?;
2041
2042                        let locked = Decimal::from_str(locked_str).map_err(|e| {
2043                            Error::invalid_request(format!("Invalid locked value: {}", e))
2044                        })?;
2045
2046                        // Update the cached balance snapshot
2047                        balance.update_balance(asset.to_string(), free, locked);
2048                    }
2049                }
2050            }
2051
2052            // Futures/delivery account updates
2053            "ACCOUNT_UPDATE" => {
2054                if let Some(account_data) = message.get("a") {
2055                    // Parse balance array
2056                    if let Some(balances_array) = account_data.get("B").and_then(|b| b.as_array()) {
2057                        for balance_item in balances_array {
2058                            let asset = balance_item.get("a").and_then(|a| a.as_str()).ok_or_else(
2059                                || Error::invalid_request("Missing asset in balance item"),
2060                            )?;
2061
2062                            let wallet_balance_str = balance_item
2063                                .get("wb")
2064                                .and_then(|wb| wb.as_str())
2065                                .ok_or_else(|| Error::invalid_request("Missing wallet balance"))?;
2066
2067                            let wallet_balance =
2068                                Decimal::from_str(wallet_balance_str).map_err(|e| {
2069                                    Error::invalid_request(format!("Invalid wallet balance: {}", e))
2070                                })?;
2071
2072                            // Optional cross wallet balance
2073                            let cross_wallet = balance_item
2074                                .get("cw")
2075                                .and_then(|cw| cw.as_str())
2076                                .and_then(|s| Decimal::from_str(s).ok());
2077
2078                            // Update wallet balance snapshot
2079                            balance.update_wallet(asset.to_string(), wallet_balance, cross_wallet);
2080                        }
2081                    }
2082
2083                    // Positions are contained in account_data["P"]; handling occurs in watch_positions
2084                }
2085            }
2086
2087            _ => {
2088                return Err(Error::invalid_request(format!(
2089                    "Unknown balance event type: {}",
2090                    event_type
2091                )));
2092            }
2093        }
2094
2095        Ok(())
2096    }
2097
2098    /// Parses a WebSocket trade message
2099    ///
2100    /// Extracts trade information from an `executionReport` event
2101    ///
2102    /// # Arguments
2103    /// * `data` - Raw WebSocket JSON payload
2104    ///
2105    /// # Returns
2106    /// Parsed `Trade` structure
2107    fn parse_ws_trade(&self, data: &Value) -> Result<Trade> {
2108        use ccxt_core::types::{Fee, OrderSide, OrderType, TakerOrMaker};
2109        use rust_decimal::Decimal;
2110        use std::str::FromStr;
2111
2112        // Extract symbol field
2113        let symbol = data
2114            .get("s")
2115            .and_then(|v| v.as_str())
2116            .ok_or_else(|| Error::invalid_request("Missing symbol field".to_string()))?
2117            .to_string();
2118
2119        // Trade ID (field `t`)
2120        let id = data
2121            .get("t")
2122            .and_then(|v| v.as_i64())
2123            .map(|v| v.to_string());
2124
2125        // Trade timestamp (field `T`)
2126        let timestamp = data.get("T").and_then(|v| v.as_i64()).unwrap_or(0);
2127
2128        // Executed price (field `L` - last executed price)
2129        let price = data
2130            .get("L")
2131            .and_then(|v| v.as_str())
2132            .and_then(|s| Decimal::from_str(s).ok())
2133            .unwrap_or(Decimal::ZERO);
2134
2135        // Executed amount (field `l` - last executed quantity)
2136        let amount = data
2137            .get("l")
2138            .and_then(|v| v.as_str())
2139            .and_then(|s| Decimal::from_str(s).ok())
2140            .unwrap_or(Decimal::ZERO);
2141
2142        // Quote asset amount (field `Y` - last quote asset transacted quantity)
2143        let cost = data
2144            .get("Y")
2145            .and_then(|v| v.as_str())
2146            .and_then(|s| Decimal::from_str(s).ok())
2147            .or_else(|| {
2148                // Fallback: compute from price * amount when `Y` is unavailable
2149                if price > Decimal::ZERO && amount > Decimal::ZERO {
2150                    Some(price * amount)
2151                } else {
2152                    None
2153                }
2154            });
2155
2156        // Trade side (field `S`)
2157        let side = data
2158            .get("S")
2159            .and_then(|v| v.as_str())
2160            .and_then(|s| match s.to_uppercase().as_str() {
2161                "BUY" => Some(OrderSide::Buy),
2162                "SELL" => Some(OrderSide::Sell),
2163                _ => None,
2164            })
2165            .unwrap_or(OrderSide::Buy);
2166
2167        // Order type (field `o`)
2168        let trade_type =
2169            data.get("o")
2170                .and_then(|v| v.as_str())
2171                .and_then(|s| match s.to_uppercase().as_str() {
2172                    "LIMIT" => Some(OrderType::Limit),
2173                    "MARKET" => Some(OrderType::Market),
2174                    _ => None,
2175                });
2176
2177        // Associated order ID (field `i`)
2178        let order_id = data
2179            .get("i")
2180            .and_then(|v| v.as_i64())
2181            .map(|v| v.to_string());
2182
2183        // Maker/taker flag (field `m` - true when buyer is the maker)
2184        let taker_or_maker = data.get("m").and_then(|v| v.as_bool()).map(|is_maker| {
2185            if is_maker {
2186                TakerOrMaker::Maker
2187            } else {
2188                TakerOrMaker::Taker
2189            }
2190        });
2191
2192        // Fee information (fields `n` = fee amount, `N` = fee currency)
2193        let fee = if let Some(fee_cost_str) = data.get("n").and_then(|v| v.as_str()) {
2194            if let Ok(fee_cost) = Decimal::from_str(fee_cost_str) {
2195                let currency = data
2196                    .get("N")
2197                    .and_then(|v| v.as_str())
2198                    .unwrap_or("UNKNOWN")
2199                    .to_string();
2200                Some(Fee {
2201                    currency,
2202                    cost: fee_cost,
2203                    rate: None,
2204                })
2205            } else {
2206                None
2207            }
2208        } else {
2209            None
2210        };
2211
2212        // Derive ISO8601 timestamp string when possible
2213        let datetime = chrono::DateTime::from_timestamp_millis(timestamp)
2214            .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string());
2215
2216        // Preserve the raw payload in the `info` map
2217        let mut info = HashMap::new();
2218        if let Value::Object(map) = data {
2219            for (k, v) in map.iter() {
2220                info.insert(k.clone(), v.clone());
2221            }
2222        }
2223
2224        Ok(Trade {
2225            id,
2226            order: order_id,
2227            symbol,
2228            trade_type,
2229            side,
2230            taker_or_maker,
2231            price: Price::from(price),
2232            amount: Amount::from(amount),
2233            cost: cost.map(Cost::from),
2234            fee,
2235            timestamp,
2236            datetime,
2237            info,
2238        })
2239    }
2240
2241    /// Filters cached personal trades by symbol, time range, and limit
2242    ///
2243    /// # Arguments
2244    /// * `symbol` - Optional symbol filter
2245    /// * `since` - Optional starting timestamp (inclusive)
2246    /// * `limit` - Optional maximum number of trades to return
2247    async fn filter_my_trades(
2248        &self,
2249        symbol: Option<&str>,
2250        since: Option<i64>,
2251        limit: Option<usize>,
2252    ) -> Result<Vec<Trade>> {
2253        let trades_map = self.my_trades.read().await;
2254
2255        // Filter by symbol when provided
2256        let mut trades: Vec<Trade> = if let Some(sym) = symbol {
2257            trades_map
2258                .get(sym)
2259                .map(|symbol_trades| symbol_trades.iter().cloned().collect())
2260                .unwrap_or_default()
2261        } else {
2262            trades_map
2263                .values()
2264                .flat_map(|symbol_trades| symbol_trades.iter().cloned())
2265                .collect()
2266        };
2267
2268        // Apply `since` filter when provided
2269        if let Some(since_ts) = since {
2270            trades.retain(|trade| trade.timestamp >= since_ts);
2271        }
2272
2273        // Sort by timestamp descending (latest first)
2274        trades.sort_by(|a, b| {
2275            let ts_a = a.timestamp;
2276            let ts_b = b.timestamp;
2277            ts_b.cmp(&ts_a)
2278        });
2279
2280        // Apply optional limit
2281        if let Some(lim) = limit {
2282            trades.truncate(lim);
2283        }
2284
2285        Ok(trades)
2286    }
2287
2288    /// Parses a WebSocket position payload
2289    ///
2290    /// # Arguments
2291    /// * `data` - Position data from the ACCOUNT_UPDATE event (`P` array element)
2292    ///
2293    /// # Returns
2294    /// Parsed `Position` instance
2295    ///
2296    /// # Binance WebSocket Position Payload Example
2297    /// ```json
2298    /// {
2299    ///   "s": "BTCUSDT",           // Trading pair
2300    ///   "pa": "-0.089",           // Position amount (negative indicates short)
2301    ///   "ep": "19700.03933",      // Entry price
2302    ///   "cr": "-1260.24809979",   // Accumulated realized PnL
2303    ///   "up": "1.53058860",       // Unrealized PnL
2304    ///   "mt": "isolated",         // Margin mode: isolated/cross
2305    ///   "iw": "87.13658940",      // Isolated wallet balance
2306    ///   "ps": "BOTH",             // Position side: BOTH/LONG/SHORT
2307    ///   "ma": "USDT"              // Margin asset
2308    /// }
2309    /// ```
2310    async fn parse_ws_position(&self, data: &Value) -> Result<Position> {
2311        // Extract required fields
2312        let symbol = data["s"]
2313            .as_str()
2314            .ok_or_else(|| Error::invalid_request("Missing symbol field"))?
2315            .to_string();
2316
2317        let position_amount_str = data["pa"]
2318            .as_str()
2319            .ok_or_else(|| Error::invalid_request("Missing position amount"))?;
2320
2321        let position_amount = position_amount_str
2322            .parse::<f64>()
2323            .map_err(|e| Error::invalid_request(format!("Invalid position amount: {}", e)))?;
2324
2325        // Extract position side
2326        let position_side = data["ps"]
2327            .as_str()
2328            .ok_or_else(|| Error::invalid_request("Missing position side"))?
2329            .to_uppercase();
2330
2331        // Determine hedged mode and actual side
2332        // - If ps = BOTH, hedged = false and use sign of `pa` for actual side
2333        // - If ps = LONG/SHORT, hedged = true and side equals ps
2334        let (side, hedged) = if position_side == "BOTH" {
2335            let actual_side = if position_amount < 0.0 {
2336                "short"
2337            } else {
2338                "long"
2339            };
2340            (actual_side.to_string(), false)
2341        } else {
2342            (position_side.to_lowercase(), true)
2343        };
2344
2345        // Extract additional fields
2346        let entry_price = data["ep"].as_str().and_then(|s| s.parse::<f64>().ok());
2347        let unrealized_pnl = data["up"].as_str().and_then(|s| s.parse::<f64>().ok());
2348        let realized_pnl = data["cr"].as_str().and_then(|s| s.parse::<f64>().ok());
2349        let margin_mode = data["mt"].as_str().map(|s| s.to_string());
2350        let initial_margin = data["iw"].as_str().and_then(|s| s.parse::<f64>().ok());
2351        let _margin_asset = data["ma"].as_str().map(|s| s.to_string());
2352
2353        // Construct the `Position` object
2354        Ok(Position {
2355            info: data.clone(),
2356            id: None,
2357            symbol,
2358            side: Some(side),
2359            contracts: Some(position_amount.abs()), // Absolute contract amount
2360            contract_size: None,
2361            entry_price,
2362            mark_price: None,
2363            notional: None,
2364            leverage: None,
2365            collateral: initial_margin, // Use isolated wallet balance as collateral
2366            initial_margin,
2367            initial_margin_percentage: None,
2368            maintenance_margin: None,
2369            maintenance_margin_percentage: None,
2370            unrealized_pnl,
2371            realized_pnl,
2372            liquidation_price: None,
2373            margin_ratio: None,
2374            margin_mode,
2375            hedged: Some(hedged),
2376            percentage: None,
2377            position_side: None,
2378            dual_side_position: None,
2379            timestamp: Some(chrono::Utc::now().timestamp_millis() as u64),
2380            datetime: Some(chrono::Utc::now().to_rfc3339()),
2381        })
2382    }
2383
2384    /// Filters cached positions by symbol, time range, and limit
2385    ///
2386    /// # Arguments
2387    /// * `symbols` - Optional list of symbols to include
2388    /// * `since` - Optional starting timestamp (inclusive)
2389    /// * `limit` - Optional maximum number of positions to return
2390    ///
2391    /// # Returns
2392    /// Filtered list of positions
2393    async fn filter_positions(
2394        &self,
2395        symbols: Option<&[String]>,
2396        since: Option<i64>,
2397        limit: Option<usize>,
2398    ) -> Result<Vec<Position>> {
2399        let positions_map = self.positions.read().await;
2400
2401        // Filter by symbol list when provided
2402        let mut positions: Vec<Position> = if let Some(syms) = symbols {
2403            syms.iter()
2404                .filter_map(|sym| positions_map.get(sym))
2405                .flat_map(|side_map| side_map.values().cloned())
2406                .collect()
2407        } else {
2408            positions_map
2409                .values()
2410                .flat_map(|side_map| side_map.values().cloned())
2411                .collect()
2412        };
2413
2414        // Apply `since` filter when provided
2415        if let Some(since_ts) = since {
2416            positions.retain(|pos| {
2417                pos.timestamp
2418                    .map(|ts| ts as i64 >= since_ts)
2419                    .unwrap_or(false)
2420            });
2421        }
2422
2423        // Sort by timestamp descending (latest first)
2424        positions.sort_by(|a, b| {
2425            let ts_a = a.timestamp.unwrap_or(0);
2426            let ts_b = b.timestamp.unwrap_or(0);
2427            ts_b.cmp(&ts_a)
2428        });
2429
2430        // Apply optional limit
2431        if let Some(lim) = limit {
2432            positions.truncate(lim);
2433        }
2434
2435        Ok(positions)
2436    }
2437}
2438
2439impl Binance {
2440    /// Subscribes to the ticker stream for a unified symbol
2441    ///
2442    /// # Arguments
2443    /// * `symbol` - Unified trading pair identifier
2444    ///
2445    /// # Returns
2446    /// Result of the subscription call
2447    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
2448        let ws = self.create_ws();
2449        ws.connect().await?;
2450
2451        // Convert symbol format BTC/USDT -> btcusdt
2452        let binance_symbol = symbol.replace('/', "").to_lowercase();
2453        ws.subscribe_ticker(&binance_symbol).await
2454    }
2455
2456    /// Subscribes to the trade stream for a unified symbol
2457    ///
2458    /// # Arguments
2459    /// * `symbol` - Unified trading pair identifier
2460    ///
2461    /// # Returns
2462    /// Result of the subscription call
2463    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
2464        let ws = self.create_ws();
2465        ws.connect().await?;
2466
2467        let binance_symbol = symbol.replace('/', "").to_lowercase();
2468        ws.subscribe_trades(&binance_symbol).await
2469    }
2470
2471    /// Subscribes to the order book stream for a unified symbol
2472    ///
2473    /// # Arguments
2474    /// * `symbol` - Unified trading pair identifier
2475    /// * `levels` - Optional depth limit (default 20)
2476    ///
2477    /// # Returns
2478    /// Result of the subscription call
2479    pub async fn subscribe_orderbook(&self, symbol: &str, levels: Option<u32>) -> Result<()> {
2480        let ws = self.create_ws();
2481        ws.connect().await?;
2482
2483        let binance_symbol = symbol.replace('/', "").to_lowercase();
2484        let depth_levels = levels.unwrap_or(20);
2485        ws.subscribe_orderbook(&binance_symbol, depth_levels, "1000ms")
2486            .await
2487    }
2488
2489    /// Subscribes to the candlestick stream for a unified symbol
2490    ///
2491    /// # Arguments
2492    /// * `symbol` - Unified trading pair identifier
2493    /// * `interval` - Candlestick interval identifier
2494    ///
2495    /// # Returns
2496    /// Result of the subscription call
2497    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
2498        let ws = self.create_ws();
2499        ws.connect().await?;
2500
2501        let binance_symbol = symbol.replace('/', "").to_lowercase();
2502        ws.subscribe_kline(&binance_symbol, interval).await
2503    }
2504
2505    /// Watches a ticker stream for a single unified symbol
2506    ///
2507    /// # Arguments
2508    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2509    /// * `params` - Optional parameters
2510    ///   - `name`: Channel name (ticker/miniTicker, defaults to ticker)
2511    ///
2512    /// # Returns
2513    /// Parsed ticker structure
2514    ///
2515    /// # Example
2516    /// ```rust,no_run
2517    /// # use ccxt_exchanges::binance::Binance;
2518    /// # async fn example() -> ccxt_core::error::Result<()> {
2519    /// let exchange = Binance::new();
2520    /// let ticker = exchange.watch_ticker("BTC/USDT", None).await?;
2521    /// println!("Price: {}", ticker.last.unwrap_or(0.0));
2522    /// # Ok(())
2523    /// # }
2524    /// ```
2525    pub async fn watch_ticker(
2526        &self,
2527        symbol: &str,
2528        params: Option<HashMap<String, Value>>,
2529    ) -> Result<Ticker> {
2530        // Load market metadata
2531        self.load_markets(false).await?;
2532
2533        // Convert unified symbol to exchange format
2534        let market = self.base.market(symbol).await?;
2535        let binance_symbol = market.id.to_lowercase();
2536
2537        // Select channel name
2538        let channel_name = if let Some(p) = &params {
2539            p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2540        } else {
2541            "ticker"
2542        };
2543
2544        // Establish WebSocket connection
2545        let ws = self.create_ws();
2546        ws.connect().await?;
2547
2548        // Watch ticker
2549        ws.watch_ticker_internal(&binance_symbol, channel_name)
2550            .await
2551    }
2552
2553    /// Watches ticker streams for multiple unified symbols
2554    ///
2555    /// # Arguments
2556    /// * `symbols` - Optional list of unified trading pairs (None subscribes to all)
2557    /// * `params` - Optional parameters
2558    ///   - `name`: Channel name (ticker/miniTicker, defaults to ticker)
2559    ///
2560    /// # Returns
2561    /// Mapping of symbol to ticker data
2562    ///
2563    /// # Example
2564    /// ```rust,no_run
2565    /// # use ccxt_exchanges::binance::Binance;
2566    /// # async fn example() -> ccxt_core::error::Result<()> {
2567    /// let exchange = Binance::new();
2568    ///
2569    /// // Watch a subset of symbols
2570    /// let tickers = exchange.watch_tickers(
2571    ///     Some(vec!["BTC/USDT".to_string(), "ETH/USDT".to_string()]),
2572    ///     None
2573    /// ).await?;
2574    ///
2575    /// // Watch all symbols
2576    /// let all_tickers = exchange.watch_tickers(None, None).await?;
2577    /// # Ok(())
2578    /// # }
2579    /// ```
2580    pub async fn watch_tickers(
2581        &self,
2582        symbols: Option<Vec<String>>,
2583        params: Option<HashMap<String, Value>>,
2584    ) -> Result<HashMap<String, Ticker>> {
2585        // Load market metadata
2586        self.load_markets(false).await?;
2587
2588        // Determine channel name
2589        let channel_name = if let Some(p) = &params {
2590            p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2591        } else {
2592            "ticker"
2593        };
2594
2595        // Validate channel selection
2596        if channel_name == "bookTicker" {
2597            return Err(Error::invalid_request(
2598                "To subscribe for bids-asks, use watch_bids_asks() method instead",
2599            ));
2600        }
2601
2602        // Convert unified symbols to exchange format
2603        let binance_symbols = if let Some(syms) = symbols {
2604            let mut result = Vec::new();
2605            for symbol in syms {
2606                let market = self.base.market(&symbol).await?;
2607                result.push(market.id.to_lowercase());
2608            }
2609            Some(result)
2610        } else {
2611            None
2612        };
2613
2614        // Establish WebSocket connection
2615        let ws = self.create_ws();
2616        ws.connect().await?;
2617
2618        // Watch tickers
2619        ws.watch_tickers_internal(binance_symbols, channel_name)
2620            .await
2621    }
2622
2623    /// Watches the mark price stream for a futures market
2624    ///
2625    /// # Arguments
2626    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT:USDT")
2627    /// * `params` - Optional parameters
2628    ///   - `use1sFreq`: Whether to use 1-second updates (defaults to true)
2629    ///
2630    /// # Returns
2631    /// Ticker structure representing the mark price
2632    ///
2633    /// # Example
2634    /// ```rust,no_run
2635    /// # use ccxt_exchanges::binance::Binance;
2636    /// # use std::collections::HashMap;
2637    /// # use serde_json::json;
2638    /// # async fn example() -> ccxt_core::error::Result<()> {
2639    /// let exchange = Binance::new();
2640    ///
2641    /// // Use 1-second updates
2642    /// let ticker = exchange.watch_mark_price("BTC/USDT:USDT", None).await?;
2643    ///
2644    /// // Use 3-second updates
2645    /// let mut params = HashMap::new();
2646    /// params.insert("use1sFreq".to_string(), json!(false));
2647    /// let ticker = exchange.watch_mark_price("BTC/USDT:USDT", Some(params)).await?;
2648    /// # Ok(())
2649    /// # }
2650    /// ```
2651    pub async fn watch_mark_price(
2652        &self,
2653        symbol: &str,
2654        params: Option<HashMap<String, Value>>,
2655    ) -> Result<Ticker> {
2656        // Load market metadata
2657        self.load_markets(false).await?;
2658
2659        // Ensure the symbol belongs to a futures market
2660        let market = self.base.market(symbol).await?;
2661        if market.market_type != MarketType::Swap && market.market_type != MarketType::Futures {
2662            return Err(Error::invalid_request(format!(
2663                "watch_mark_price() does not support {} markets",
2664                market.market_type
2665            )));
2666        }
2667
2668        let binance_symbol = market.id.to_lowercase();
2669
2670        // Determine update frequency
2671        let use_1s_freq = if let Some(p) = &params {
2672            p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2673        } else {
2674            true
2675        };
2676
2677        // Construct channel name
2678        let channel_name = if use_1s_freq {
2679            "markPrice@1s"
2680        } else {
2681            "markPrice"
2682        };
2683
2684        // Establish WebSocket connection
2685        let ws = self.create_ws();
2686        ws.connect().await?;
2687
2688        // Watch mark price
2689        ws.watch_ticker_internal(&binance_symbol, channel_name)
2690            .await
2691    }
2692
2693    /// Watches an order book stream for a unified symbol
2694    ///
2695    /// # Arguments
2696    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2697    /// * `limit` - Optional depth limit (defaults to unlimited)
2698    /// * `params` - Optional parameters
2699    ///   - `speed`: Update frequency (100 or 1000 ms, defaults to 100)
2700    ///
2701    /// # Returns
2702    /// Order book snapshot populated with streaming updates
2703    ///
2704    /// # Example
2705    /// ```rust,no_run
2706    /// # use ccxt_exchanges::binance::Binance;
2707    /// # use std::collections::HashMap;
2708    /// # use serde_json::json;
2709    /// # async fn example() -> ccxt_core::error::Result<()> {
2710    /// let exchange = Binance::new();
2711    ///
2712    /// // Watch order book with 100 ms updates
2713    /// let orderbook = exchange.watch_order_book("BTC/USDT", None, None).await?;
2714    /// println!("Best bid: {:?}", orderbook.best_bid());
2715    /// println!("Best ask: {:?}", orderbook.best_ask());
2716    ///
2717    /// // Watch order book limited to 100 levels with 1000 ms updates
2718    /// let mut params = HashMap::new();
2719    /// params.insert("speed".to_string(), json!(1000));
2720    /// let orderbook = exchange.watch_order_book(
2721    ///     "BTC/USDT",
2722    ///     Some(100),
2723    ///     Some(params)
2724    /// ).await?;
2725    /// # Ok(())
2726    /// # }
2727    /// ```
2728    pub async fn watch_order_book(
2729        &self,
2730        symbol: &str,
2731        limit: Option<i64>,
2732        params: Option<HashMap<String, Value>>,
2733    ) -> Result<OrderBook> {
2734        // Load market metadata
2735        self.load_markets(false).await?;
2736
2737        // Resolve market details
2738        let market = self.base.market(symbol).await?;
2739        let binance_symbol = market.id.to_lowercase();
2740
2741        // Determine whether this is a futures market
2742        let is_futures =
2743            market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2744
2745        // Determine update speed
2746        let update_speed = if let Some(p) = &params {
2747            p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2748        } else {
2749            100
2750        };
2751
2752        // Validate update speed
2753        if update_speed != 100 && update_speed != 1000 {
2754            return Err(Error::invalid_request(
2755                "Update speed must be 100 or 1000 milliseconds",
2756            ));
2757        }
2758
2759        // Establish WebSocket connection
2760        let ws = self.create_ws();
2761        ws.connect().await?;
2762
2763        // Watch the order book
2764        ws.watch_orderbook_internal(self, &binance_symbol, limit, update_speed, is_futures)
2765            .await
2766    }
2767
2768    /// Watches order books for multiple symbols
2769    ///
2770    /// # Arguments
2771    /// * `symbols` - List of trading pairs (maximum 200)
2772    /// * `limit` - Optional depth limit
2773    /// * `params` - Optional parameters
2774    ///   - `speed`: Update frequency (100 or 1000 ms)
2775    ///
2776    /// # Returns
2777    /// Mapping of symbol to corresponding order book
2778    ///
2779    /// # Example
2780    /// ```rust,no_run
2781    /// # use ccxt_exchanges::binance::Binance;
2782    /// # async fn example() -> ccxt_core::error::Result<()> {
2783    /// let exchange = Binance::new();
2784    ///
2785    /// let symbols = vec![
2786    ///     "BTC/USDT".to_string(),
2787    ///     "ETH/USDT".to_string(),
2788    /// ];
2789    ///
2790    /// let orderbooks = exchange.watch_order_books(symbols, None, None).await?;
2791    /// for (symbol, ob) in orderbooks {
2792    ///     println!("{}: spread = {:?}", symbol, ob.spread());
2793    /// }
2794    /// # Ok(())
2795    /// # }
2796    /// ```
2797    pub async fn watch_order_books(
2798        &self,
2799        symbols: Vec<String>,
2800        limit: Option<i64>,
2801        params: Option<HashMap<String, Value>>,
2802    ) -> Result<HashMap<String, OrderBook>> {
2803        // Enforce symbol count constraints
2804        if symbols.is_empty() {
2805            return Err(Error::invalid_request("Symbols list cannot be empty"));
2806        }
2807
2808        if symbols.len() > 200 {
2809            return Err(Error::invalid_request(
2810                "Binance supports max 200 symbols per connection",
2811            ));
2812        }
2813
2814        // Load market metadata
2815        self.load_markets(false).await?;
2816
2817        // Convert symbols to exchange format and ensure consistent market type
2818        let mut binance_symbols = Vec::new();
2819        let mut is_futures = false;
2820
2821        for symbol in &symbols {
2822            let market = self.base.market(symbol).await?;
2823            binance_symbols.push(market.id.to_lowercase());
2824
2825            let current_is_futures =
2826                market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2827            if !binance_symbols.is_empty() && current_is_futures != is_futures {
2828                return Err(Error::invalid_request(
2829                    "Cannot mix spot and futures markets in watch_order_books",
2830                ));
2831            }
2832            is_futures = current_is_futures;
2833        }
2834
2835        // Determine update speed
2836        let update_speed = if let Some(p) = &params {
2837            p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2838        } else {
2839            100
2840        };
2841
2842        // Establish WebSocket connection
2843        let ws = self.create_ws();
2844        ws.connect().await?;
2845
2846        // Watch order books
2847        ws.watch_orderbooks_internal(self, binance_symbols, limit, update_speed, is_futures)
2848            .await
2849    }
2850
2851    /// Watches mark prices for multiple futures symbols
2852    ///
2853    /// # Arguments
2854    /// * `symbols` - Optional list of symbols (None subscribes to all)
2855    /// * `params` - Optional parameters
2856    ///   - `use1sFreq`: Whether to use 1-second updates (defaults to true)
2857    ///
2858    /// # Returns
2859    /// Mapping of symbol to ticker data
2860    pub async fn watch_mark_prices(
2861        &self,
2862        symbols: Option<Vec<String>>,
2863        params: Option<HashMap<String, Value>>,
2864    ) -> Result<HashMap<String, Ticker>> {
2865        // Load market metadata
2866        self.load_markets(false).await?;
2867
2868        // Determine update frequency
2869        let use_1s_freq = if let Some(p) = &params {
2870            p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2871        } else {
2872            true
2873        };
2874
2875        // Construct channel name
2876        let channel_name = if use_1s_freq {
2877            "markPrice@1s"
2878        } else {
2879            "markPrice"
2880        };
2881
2882        // Convert symbols and validate market type
2883        let binance_symbols = if let Some(syms) = symbols {
2884            let mut result = Vec::new();
2885            for symbol in syms {
2886                let market = self.base.market(&symbol).await?;
2887                if market.market_type != MarketType::Swap
2888                    && market.market_type != MarketType::Futures
2889                {
2890                    return Err(Error::invalid_request(format!(
2891                        "watch_mark_prices() does not support {} markets",
2892                        market.market_type
2893                    )));
2894                }
2895                result.push(market.id.to_lowercase());
2896            }
2897            Some(result)
2898        } else {
2899            None
2900        };
2901
2902        // Establish WebSocket connection
2903        let ws = self.create_ws();
2904        ws.connect().await?;
2905
2906        // Watch mark prices
2907        ws.watch_tickers_internal(binance_symbols, channel_name)
2908            .await
2909    }
2910    /// Streams trade data for a unified symbol
2911    ///
2912    /// # Arguments
2913    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2914    /// * `since` - Optional starting timestamp in milliseconds
2915    /// * `limit` - Optional maximum number of trades to return
2916    ///
2917    /// # Returns
2918    /// Vector of parsed trade data
2919    pub async fn watch_trades(
2920        &self,
2921        symbol: &str,
2922        since: Option<i64>,
2923        limit: Option<usize>,
2924    ) -> Result<Vec<Trade>> {
2925        // Ensure market metadata is loaded
2926        self.base.load_markets(false).await?;
2927
2928        // Resolve market information
2929        let market = self.base.market(symbol).await?;
2930        let binance_symbol = market.id.to_lowercase();
2931
2932        // Establish WebSocket connection
2933        let ws = self.create_ws();
2934        ws.connect().await?;
2935
2936        // Subscribe to the trade stream
2937        ws.subscribe_trades(&binance_symbol).await?;
2938
2939        // Process incoming messages
2940        let mut retries = 0;
2941        const MAX_RETRIES: u32 = 50;
2942
2943        while retries < MAX_RETRIES {
2944            if let Some(msg) = ws.client.receive().await {
2945                // Skip subscription acknowledgement messages
2946                if msg.get("result").is_some() || msg.get("id").is_some() {
2947                    continue;
2948                }
2949
2950                // Parse trade payload
2951                if let Ok(trade) = parser::parse_ws_trade(&msg, Some(&market)) {
2952                    // Cache trade payload
2953                    let mut trades_map = ws.trades.lock().await;
2954                    let trades = trades_map
2955                        .entry(symbol.to_string())
2956                        .or_insert_with(VecDeque::new);
2957
2958                    // Enforce cache size limit
2959                    const MAX_TRADES: usize = 1000;
2960                    if trades.len() >= MAX_TRADES {
2961                        trades.pop_front();
2962                    }
2963                    trades.push_back(trade);
2964
2965                    // Gather trades from cache
2966                    let mut result: Vec<Trade> = trades.iter().cloned().collect();
2967
2968                    // Apply optional `since` filter
2969                    if let Some(since_ts) = since {
2970                        result.retain(|t| t.timestamp >= since_ts);
2971                    }
2972
2973                    // Apply optional limit
2974                    if let Some(limit_size) = limit {
2975                        if result.len() > limit_size {
2976                            result = result.split_off(result.len() - limit_size);
2977                        }
2978                    }
2979
2980                    return Ok(result);
2981                }
2982            }
2983
2984            retries += 1;
2985            tokio::time::sleep(Duration::from_millis(100)).await;
2986        }
2987
2988        Err(Error::network("Timeout waiting for trade data"))
2989    }
2990
2991    /// Streams OHLCV data for a unified symbol
2992    ///
2993    /// # Arguments
2994    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
2995    /// * `timeframe` - Candlestick interval (e.g. "1m", "5m", "1h", "1d")
2996    /// * `since` - Optional starting timestamp in milliseconds
2997    /// * `limit` - Optional maximum number of entries to return
2998    ///
2999    /// # Returns
3000    /// Vector of OHLCV entries
3001    pub async fn watch_ohlcv(
3002        &self,
3003        symbol: &str,
3004        timeframe: &str,
3005        since: Option<i64>,
3006        limit: Option<usize>,
3007    ) -> Result<Vec<OHLCV>> {
3008        // Ensure market metadata is loaded
3009        self.base.load_markets(false).await?;
3010
3011        // Resolve market information
3012        let market = self.base.market(symbol).await?;
3013        let binance_symbol = market.id.to_lowercase();
3014
3015        // Establish WebSocket connection
3016        let ws = self.create_ws();
3017        ws.connect().await?;
3018
3019        // Subscribe to the Kline stream
3020        ws.subscribe_kline(&binance_symbol, timeframe).await?;
3021
3022        // Process incoming messages
3023        let mut retries = 0;
3024        const MAX_RETRIES: u32 = 50;
3025
3026        while retries < MAX_RETRIES {
3027            if let Some(msg) = ws.client.receive().await {
3028                // Skip subscription acknowledgement messages
3029                if msg.get("result").is_some() || msg.get("id").is_some() {
3030                    continue;
3031                }
3032
3033                // Parse OHLCV payload
3034                if let Ok(ohlcv) = parser::parse_ws_ohlcv(&msg) {
3035                    // Cache OHLCV entries
3036                    let cache_key = format!("{}:{}", symbol, timeframe);
3037                    let mut ohlcvs_map = ws.ohlcvs.lock().await;
3038                    let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
3039
3040                    // Enforce cache size limit
3041                    const MAX_OHLCVS: usize = 1000;
3042                    if ohlcvs.len() >= MAX_OHLCVS {
3043                        ohlcvs.pop_front();
3044                    }
3045                    ohlcvs.push_back(ohlcv);
3046
3047                    // Collect results from cache
3048                    let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
3049
3050                    // Apply optional `since` filter
3051                    if let Some(since_ts) = since {
3052                        result.retain(|o| o.timestamp >= since_ts);
3053                    }
3054
3055                    // Apply optional limit
3056                    if let Some(limit_size) = limit {
3057                        if result.len() > limit_size {
3058                            result = result.split_off(result.len() - limit_size);
3059                        }
3060                    }
3061
3062                    return Ok(result);
3063                }
3064            }
3065
3066            retries += 1;
3067            tokio::time::sleep(Duration::from_millis(100)).await;
3068        }
3069
3070        Err(Error::network("Timeout waiting for OHLCV data"))
3071    }
3072
3073    /// Streams the best bid/ask data for a unified symbol
3074    ///
3075    /// # Arguments
3076    /// * `symbol` - Unified trading pair identifier (e.g. "BTC/USDT")
3077    ///
3078    /// # Returns
3079    /// Latest bid/ask snapshot
3080    pub async fn watch_bids_asks(&self, symbol: &str) -> Result<BidAsk> {
3081        // Ensure market metadata is loaded
3082        self.base.load_markets(false).await?;
3083
3084        // Resolve market details
3085        let market = self.base.market(symbol).await?;
3086        let binance_symbol = market.id.to_lowercase();
3087
3088        // Establish WebSocket connection
3089        let ws = self.create_ws();
3090        ws.connect().await?;
3091
3092        // Subscribe to the bookTicker stream
3093        let stream_name = format!("{}@bookTicker", binance_symbol);
3094        ws.client
3095            .subscribe(stream_name, Some(symbol.to_string()), None)
3096            .await?;
3097
3098        // Process incoming messages
3099        let mut retries = 0;
3100        const MAX_RETRIES: u32 = 50;
3101
3102        while retries < MAX_RETRIES {
3103            if let Some(msg) = ws.client.receive().await {
3104                // Skip subscription acknowledgement messages
3105                if msg.get("result").is_some() || msg.get("id").is_some() {
3106                    continue;
3107                }
3108
3109                // Parse bid/ask payload
3110                if let Ok(bid_ask) = parser::parse_ws_bid_ask(&msg) {
3111                    // Cache the snapshot
3112                    let mut bids_asks_map = ws.bids_asks.lock().await;
3113                    bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
3114
3115                    return Ok(bid_ask);
3116                }
3117            }
3118
3119            retries += 1;
3120            tokio::time::sleep(Duration::from_millis(100)).await;
3121        }
3122
3123        Err(Error::network("Timeout waiting for BidAsk data"))
3124    }
3125
3126    /// Streams account balance changes (private user data stream)
3127    ///
3128    /// # Arguments
3129    /// * `params` - Optional parameters
3130    ///   - `type`: Account type (spot/future/delivery/margin, etc.)
3131    ///   - `fetchBalanceSnapshot`: Whether to fetch an initial snapshot (default false)
3132    ///   - `awaitBalanceSnapshot`: Whether to wait for snapshot completion (default true)
3133    ///
3134    /// # Returns
3135    /// Updated account balances
3136    ///
3137    /// # Example
3138    ///
3139    /// ```rust,no_run
3140    /// # use ccxt_exchanges::binance::Binance;
3141    /// # use ccxt_core::ExchangeConfig;
3142    /// # use std::collections::HashMap;
3143    /// # use serde_json::json;
3144    /// # async fn example() -> ccxt_core::error::Result<()> {
3145    /// let mut config = ExchangeConfig::default();
3146    /// config.api_key = Some("your-api-key".to_string());
3147    /// config.secret = Some("your-secret".to_string());
3148    /// let exchange = Binance::new(config)?;
3149    ///
3150    /// // Watch spot account balance
3151    /// let balance = exchange.watch_balance(None).await?;
3152    ///
3153    /// // Watch futures account balance
3154    /// let mut params = HashMap::new();
3155    /// params.insert("type".to_string(), json!("future"));
3156    /// let futures_balance = exchange.watch_balance(Some(params)).await?;
3157    /// # Ok(())
3158    /// # }
3159    /// ```
3160    pub async fn watch_balance(
3161        self: Arc<Self>,
3162        params: Option<HashMap<String, Value>>,
3163    ) -> Result<Balance> {
3164        // Ensure market metadata is loaded
3165        self.base.load_markets(false).await?;
3166
3167        // Resolve account type
3168        let account_type = if let Some(p) = &params {
3169            p.get("type")
3170                .and_then(|v| v.as_str())
3171                .unwrap_or(&self.options.default_type)
3172        } else {
3173            &self.options.default_type
3174        };
3175
3176        // Determine configuration flags
3177        let fetch_snapshot = if let Some(p) = &params {
3178            p.get("fetchBalanceSnapshot")
3179                .and_then(|v| v.as_bool())
3180                .unwrap_or(false)
3181        } else {
3182            false
3183        };
3184
3185        let await_snapshot = if let Some(p) = &params {
3186            p.get("awaitBalanceSnapshot")
3187                .and_then(|v| v.as_bool())
3188                .unwrap_or(true)
3189        } else {
3190            true
3191        };
3192
3193        // Establish authenticated WebSocket connection
3194        let ws = self.create_authenticated_ws();
3195        ws.connect().await?;
3196
3197        // Optionally fetch the initial snapshot
3198        if fetch_snapshot {
3199            let snapshot = self.fetch_balance(params.clone()).await?;
3200
3201            // Update cache with the snapshot
3202            let mut balances = ws.balances.write().await;
3203            balances.insert(account_type.to_string(), snapshot.clone());
3204
3205            if !await_snapshot {
3206                return Ok(snapshot);
3207            }
3208        }
3209
3210        // Process balance update messages
3211        let mut retries = 0;
3212        const MAX_RETRIES: u32 = 100;
3213
3214        while retries < MAX_RETRIES {
3215            if let Some(msg) = ws.client.receive().await {
3216                // Skip subscription acknowledgement messages
3217                if msg.get("result").is_some() || msg.get("id").is_some() {
3218                    continue;
3219                }
3220
3221                // Determine message event type
3222                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3223                    // Handle supported balance message types
3224                    match event_type {
3225                        "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE" => {
3226                            // Parse and update local balance cache
3227                            if let Ok(()) = ws.handle_balance_message(&msg, account_type).await {
3228                                // Retrieve the updated balance snapshot
3229                                let balances = ws.balances.read().await;
3230                                if let Some(balance) = balances.get(account_type) {
3231                                    return Ok(balance.clone());
3232                                }
3233                            }
3234                        }
3235                        _ => {}
3236                    }
3237                }
3238            }
3239
3240            retries += 1;
3241            tokio::time::sleep(Duration::from_millis(100)).await;
3242        }
3243
3244        Err(Error::network("Timeout waiting for balance data"))
3245    }
3246
3247    /// Watches authenticated order updates via the user data stream
3248    ///
3249    /// Streams real-time order status changes delivered by Binance user data WebSocket messages
3250    ///
3251    /// # Arguments
3252    /// * `symbol` - Optional trading pair filter (e.g. "BTC/USDT")
3253    /// * `since` - Optional starting timestamp in milliseconds
3254    /// * `limit` - Optional maximum number of orders to return
3255    /// * `params` - Optional additional parameters
3256    ///
3257    /// # Returns
3258    /// Orders returned in descending chronological order
3259    ///
3260    /// # Examples
3261    /// ```no_run
3262    /// use std::sync::Arc;
3263    /// use ccxt_exchanges::binance::Binance;
3264    ///
3265    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
3266    ///     let exchange = Arc::new(Binance::new(Default::default()));
3267    ///
3268    ///     // Watch all order updates
3269    ///     let orders = exchange.watch_orders(None, None, None, None).await?;
3270    ///     println!("Received {} order updates", orders.len());
3271    ///
3272    ///     // Watch updates for a specific trading pair
3273    ///     let btc_orders = exchange.watch_orders(Some("BTC/USDT"), None, None, None).await?;
3274    ///     println!("BTC/USDT orders: {:?}", btc_orders);
3275    ///
3276    ///     Ok(())
3277    /// }
3278    /// ```
3279    pub async fn watch_orders(
3280        self: Arc<Self>,
3281        symbol: Option<&str>,
3282        since: Option<i64>,
3283        limit: Option<usize>,
3284        _params: Option<HashMap<String, Value>>,
3285    ) -> Result<Vec<Order>> {
3286        self.base.load_markets(false).await?;
3287
3288        let ws = self.create_authenticated_ws();
3289        ws.connect().await?;
3290
3291        // Receive messages in a loop
3292        loop {
3293            if let Some(msg) = ws.client.receive().await {
3294                if let Value::Object(data) = msg {
3295                    if let Some(event_type) = data.get("e").and_then(|v| v.as_str()) {
3296                        match event_type {
3297                            "executionReport" => {
3298                                // Parse order payload
3299                                let order = self.parse_ws_order(&data)?;
3300
3301                                // Update order cache
3302                                let mut orders = ws.orders.write().await;
3303                                let symbol_orders = orders
3304                                    .entry(order.symbol.clone())
3305                                    .or_insert_with(HashMap::new);
3306                                symbol_orders.insert(order.id.clone(), order.clone());
3307                                drop(orders);
3308
3309                                // Check for trade execution events
3310                                if let Some(exec_type) = data.get("x").and_then(|v| v.as_str()) {
3311                                    if exec_type == "TRADE" {
3312                                        // Parse execution trade payload
3313                                        if let Ok(trade) =
3314                                            ws.parse_ws_trade(&Value::Object(data.clone()))
3315                                        {
3316                                            // Update my trades cache
3317                                            let mut trades = ws.my_trades.write().await;
3318                                            let symbol_trades = trades
3319                                                .entry(trade.symbol.clone())
3320                                                .or_insert_with(VecDeque::new);
3321
3322                                            // Prepend newest trade and enforce max length of 1000
3323                                            symbol_trades.push_front(trade);
3324                                            if symbol_trades.len() > 1000 {
3325                                                symbol_trades.pop_back();
3326                                            }
3327                                        }
3328                                    }
3329                                }
3330
3331                                // Return filtered orders
3332                                return self.filter_orders(&ws, symbol, since, limit).await;
3333                            }
3334                            _ => continue,
3335                        }
3336                    }
3337                }
3338            } else {
3339                tokio::time::sleep(Duration::from_millis(100)).await;
3340            }
3341        }
3342    }
3343
3344    /// Parses a WebSocket order message
3345    fn parse_ws_order(&self, data: &serde_json::Map<String, Value>) -> Result<Order> {
3346        use ccxt_core::types::{OrderSide, OrderStatus, OrderType};
3347        use rust_decimal::Decimal;
3348        use std::str::FromStr;
3349
3350        // Extract core fields
3351        let symbol = data.get("s").and_then(|v| v.as_str()).unwrap_or("");
3352        let order_id = data
3353            .get("i")
3354            .and_then(|v| v.as_i64())
3355            .map(|id| id.to_string())
3356            .unwrap_or_default();
3357        let client_order_id = data.get("c").and_then(|v| v.as_str()).map(String::from);
3358
3359        // Map order status
3360        let status_str = data.get("X").and_then(|v| v.as_str()).unwrap_or("NEW");
3361        let status = match status_str {
3362            "NEW" => OrderStatus::Open,
3363            "PARTIALLY_FILLED" => OrderStatus::Open,
3364            "FILLED" => OrderStatus::Closed,
3365            "CANCELED" => OrderStatus::Canceled,
3366            "REJECTED" => OrderStatus::Rejected,
3367            "EXPIRED" => OrderStatus::Expired,
3368            _ => OrderStatus::Open,
3369        };
3370
3371        // Map order side
3372        let side_str = data.get("S").and_then(|v| v.as_str()).unwrap_or("BUY");
3373        let side = match side_str {
3374            "BUY" => OrderSide::Buy,
3375            "SELL" => OrderSide::Sell,
3376            _ => OrderSide::Buy,
3377        };
3378
3379        // Map order type
3380        let type_str = data.get("o").and_then(|v| v.as_str()).unwrap_or("LIMIT");
3381        let order_type = match type_str {
3382            "LIMIT" => OrderType::Limit,
3383            "MARKET" => OrderType::Market,
3384            "STOP_LOSS" => OrderType::StopLoss,
3385            "STOP_LOSS_LIMIT" => OrderType::StopLossLimit,
3386            "TAKE_PROFIT" => OrderType::TakeProfit,
3387            "TAKE_PROFIT_LIMIT" => OrderType::TakeProfitLimit,
3388            "LIMIT_MAKER" => OrderType::LimitMaker,
3389            _ => OrderType::Limit,
3390        };
3391
3392        // Parse amount and price fields
3393        let amount = data
3394            .get("q")
3395            .and_then(|v| v.as_str())
3396            .and_then(|s| Decimal::from_str(s).ok())
3397            .unwrap_or(Decimal::ZERO);
3398
3399        let price = data
3400            .get("p")
3401            .and_then(|v| v.as_str())
3402            .and_then(|s| Decimal::from_str(s).ok());
3403
3404        let filled = data
3405            .get("z")
3406            .and_then(|v| v.as_str())
3407            .and_then(|s| Decimal::from_str(s).ok());
3408
3409        let cost = data
3410            .get("Z")
3411            .and_then(|v| v.as_str())
3412            .and_then(|s| Decimal::from_str(s).ok());
3413
3414        // Derive remaining quantity
3415        let remaining = match filled {
3416            Some(fill) => Some(amount - fill),
3417            None => None,
3418        };
3419
3420        // Compute average price
3421        let average = match (filled, cost) {
3422            (Some(fill), Some(c)) if fill > Decimal::ZERO && c > Decimal::ZERO => Some(c / fill),
3423            _ => None,
3424        };
3425
3426        // Parse timestamps
3427        let timestamp = data.get("T").and_then(|v| v.as_i64());
3428        let last_trade_timestamp = data.get("T").and_then(|v| v.as_i64());
3429
3430        Ok(Order {
3431            id: order_id,
3432            client_order_id,
3433            timestamp,
3434            datetime: timestamp.map(|ts| {
3435                chrono::DateTime::from_timestamp_millis(ts)
3436                    .map(|dt| dt.to_rfc3339())
3437                    .unwrap_or_default()
3438            }),
3439            last_trade_timestamp,
3440            symbol: symbol.to_string(),
3441            order_type,
3442            side,
3443            price,
3444            average,
3445            amount,
3446            cost,
3447            filled,
3448            remaining,
3449            status,
3450            fee: None,
3451            fees: None,
3452            trades: None,
3453            time_in_force: data.get("f").and_then(|v| v.as_str()).map(String::from),
3454            post_only: None,
3455            reduce_only: None,
3456            stop_price: data
3457                .get("P")
3458                .and_then(|v| v.as_str())
3459                .and_then(|s| Decimal::from_str(s).ok()),
3460            trigger_price: None,
3461            take_profit_price: None,
3462            stop_loss_price: None,
3463            trailing_delta: None,
3464            trailing_percent: None,
3465            activation_price: None,
3466            callback_rate: None,
3467            working_type: data.get("wt").and_then(|v| v.as_str()).map(String::from),
3468            info: data.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
3469        })
3470    }
3471
3472    /// Filters cached orders by symbol, time range, and limit
3473    async fn filter_orders(
3474        &self,
3475        ws: &BinanceWs,
3476        symbol: Option<&str>,
3477        since: Option<i64>,
3478        limit: Option<usize>,
3479    ) -> Result<Vec<Order>> {
3480        let orders_map = ws.orders.read().await;
3481
3482        // Filter by symbol when provided
3483        let mut orders: Vec<Order> = if let Some(sym) = symbol {
3484            orders_map
3485                .get(sym)
3486                .map(|symbol_orders| symbol_orders.values().cloned().collect())
3487                .unwrap_or_default()
3488        } else {
3489            orders_map
3490                .values()
3491                .flat_map(|symbol_orders| symbol_orders.values().cloned())
3492                .collect()
3493        };
3494
3495        // Apply optional `since` filter
3496        if let Some(since_ts) = since {
3497            orders.retain(|order| order.timestamp.map_or(false, |ts| ts >= since_ts));
3498        }
3499
3500        // Sort by timestamp descending
3501        orders.sort_by(|a, b| {
3502            let ts_a = a.timestamp.unwrap_or(0);
3503            let ts_b = b.timestamp.unwrap_or(0);
3504            ts_b.cmp(&ts_a)
3505        });
3506
3507        // Apply optional limit
3508        if let Some(lim) = limit {
3509            orders.truncate(lim);
3510        }
3511
3512        Ok(orders)
3513    }
3514
3515    /// Watches authenticated user trade updates
3516    ///
3517    /// # Arguments
3518    /// * `symbol` - Optional trading pair to filter (None subscribes to all)
3519    /// * `since` - Starting timestamp in milliseconds
3520    /// * `limit` - Maximum number of trades to return
3521    /// * `params` - Additional parameters
3522    ///
3523    /// # Returns
3524    /// List of trade records
3525    ///
3526    /// # Example
3527    /// ```rust,no_run
3528    /// use std::sync::Arc;
3529    /// use ccxt_exchanges::binance::Binance;
3530    ///
3531    /// #[tokio::main]
3532    /// async fn main() {
3533    ///     let exchange = Arc::new(Binance::new(Default::default()));
3534    ///     let ws = exchange.create_authenticated_ws();
3535    ///     
3536    ///     // Subscribe to BTC/USDT trade updates
3537    ///     let trades = ws.watch_my_trades(Some("BTC/USDT"), None, None, None).await.unwrap();
3538    ///     println!("My trades: {:?}", trades);
3539    /// }
3540    /// ```
3541    pub async fn watch_my_trades(
3542        self: Arc<Self>,
3543        symbol: Option<&str>,
3544        since: Option<i64>,
3545        limit: Option<usize>,
3546        _params: Option<HashMap<String, Value>>,
3547    ) -> Result<Vec<Trade>> {
3548        // Establish authenticated WebSocket connection
3549        let ws = self.create_authenticated_ws();
3550        ws.connect().await?;
3551
3552        // Process trade update messages
3553        let mut retries = 0;
3554        const MAX_RETRIES: u32 = 100;
3555
3556        while retries < MAX_RETRIES {
3557            if let Some(msg) = ws.client.receive().await {
3558                // Skip subscription acknowledgements
3559                if msg.get("result").is_some() || msg.get("id").is_some() {
3560                    continue;
3561                }
3562
3563                // Identify event type
3564                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3565                    // Handle executionReport events containing trade updates
3566                    if event_type == "executionReport" {
3567                        if let Ok(trade) = ws.parse_ws_trade(&msg) {
3568                            let symbol_key = trade.symbol.clone();
3569
3570                            // Update cached trades
3571                            let mut trades_map = ws.my_trades.write().await;
3572                            let symbol_trades =
3573                                trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
3574
3575                            // Prepend latest trade; bound cache to 1000 entries
3576                            symbol_trades.push_front(trade);
3577                            if symbol_trades.len() > 1000 {
3578                                symbol_trades.pop_back();
3579                            }
3580                        }
3581                    }
3582                }
3583            } else {
3584                tokio::time::sleep(Duration::from_millis(100)).await;
3585            }
3586
3587            retries += 1;
3588        }
3589
3590        // Filter and return personal trades
3591        ws.filter_my_trades(symbol, since, limit).await
3592    }
3593
3594    /// Watches authenticated futures position updates
3595    ///
3596    /// Receives ACCOUNT_UPDATE events via the user data stream to track changes to futures
3597    /// Supports both USD-margined (USD-M) and coin-margined (COIN-M) contracts.
3598    ///
3599    /// # Arguments
3600    /// * `symbols` - Optional list of symbols (None subscribes to all positions)
3601    /// * `since` - Optional starting timestamp
3602    /// * `limit` - Optional maximum number of positions to return
3603    /// * `params` - Optional parameters
3604    ///   - `type`: Market type (`future`/`delivery`, default `future`)
3605    ///   - `subType`: Subtype (`linear`/`inverse`)
3606    ///
3607    /// # Returns
3608    /// Collection of positions
3609    ///
3610    /// # Implementation Details
3611    /// 1. Subscribe to ACCOUNT_UPDATE events through the user data stream.
3612    /// 2. Parse the position data contained in the `P` array.
3613    /// 3. Update the internal position cache.
3614    /// 4. Filter results according to the provided arguments.
3615    ///
3616    /// # WebSocket Message Format
3617    /// ```json
3618    /// {
3619    ///   "e": "ACCOUNT_UPDATE",
3620    ///   "T": 1667881353112,
3621    ///   "E": 1667881353115,
3622    ///   "a": {
3623    ///     "P": [
3624    ///       {
3625    ///         "s": "BTCUSDT",
3626    ///         "pa": "-0.089",
3627    ///         "ep": "19700.03933",
3628    ///         "up": "1.53058860",
3629    ///         "mt": "isolated",
3630    ///         "ps": "BOTH"
3631    ///       }
3632    ///     ]
3633    ///   }
3634    /// }
3635    /// ```
3636    ///
3637    /// # Example
3638    /// ```rust,no_run
3639    /// # use ccxt_exchanges::binance::Binance;
3640    /// # use std::sync::Arc;
3641    /// # async fn example() -> ccxt_core::error::Result<()> {
3642    /// let exchange = Arc::new(Binance::new());
3643    ///
3644    /// // Watch all positions
3645    /// let positions = exchange.watch_positions(None, None, None, None).await?;
3646    /// for pos in positions {
3647    ///     println!("Symbol: {}, Side: {:?}, Contracts: {:?}",
3648    ///              pos.symbol, pos.side, pos.contracts);
3649    /// }
3650    ///
3651    /// // Watch a subset of symbols
3652    /// let symbols = vec!["BTC/USDT".to_string(), "ETH/USDT".to_string()];
3653    /// let positions = exchange.watch_positions(Some(symbols), None, Some(10), None).await?;
3654    /// # Ok(())
3655    /// # }
3656    /// ```
3657    pub async fn watch_positions(
3658        self: Arc<Self>,
3659        symbols: Option<Vec<String>>,
3660        since: Option<i64>,
3661        limit: Option<usize>,
3662        _params: Option<HashMap<String, Value>>,
3663    ) -> Result<Vec<Position>> {
3664        // Establish authenticated WebSocket connection
3665        let ws = self.create_authenticated_ws();
3666        ws.connect().await?;
3667
3668        // Process position update messages
3669        let mut retries = 0;
3670        const MAX_RETRIES: u32 = 100;
3671
3672        while retries < MAX_RETRIES {
3673            if let Some(msg) = ws.client.receive().await {
3674                // Skip subscription acknowledgement messages
3675                if msg.get("result").is_some() || msg.get("id").is_some() {
3676                    continue;
3677                }
3678
3679                // Handle ACCOUNT_UPDATE events only
3680                if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3681                    if event_type == "ACCOUNT_UPDATE" {
3682                        if let Some(account_data) = msg.get("a") {
3683                            if let Some(positions_array) =
3684                                account_data.get("P").and_then(|p| p.as_array())
3685                            {
3686                                for position_data in positions_array {
3687                                    if let Ok(position) = ws.parse_ws_position(position_data).await
3688                                    {
3689                                        let symbol_key = position.symbol.clone();
3690                                        let side_key = position
3691                                            .side
3692                                            .clone()
3693                                            .unwrap_or_else(|| "both".to_string());
3694
3695                                        // Update cached positions
3696                                        let mut positions_map = ws.positions.write().await;
3697                                        let symbol_positions = positions_map
3698                                            .entry(symbol_key)
3699                                            .or_insert_with(HashMap::new);
3700
3701                                        // Remove positions with effectively zero contracts
3702                                        if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
3703                                            symbol_positions.remove(&side_key);
3704                                            if symbol_positions.is_empty() {
3705                                                positions_map.remove(&position.symbol);
3706                                            }
3707                                        } else {
3708                                            symbol_positions.insert(side_key, position);
3709                                        }
3710                                    }
3711                                }
3712                            }
3713                        }
3714                    }
3715                }
3716            } else {
3717                tokio::time::sleep(Duration::from_millis(100)).await;
3718            }
3719
3720            retries += 1;
3721        }
3722
3723        // Filter and return positions
3724        let symbols_ref = symbols.as_ref().map(|v| v.as_slice());
3725        ws.filter_positions(symbols_ref, since, limit).await
3726    }
3727}
3728
3729#[cfg(test)]
3730mod tests {
3731    use super::*;
3732
3733    #[test]
3734    fn test_binance_ws_creation() {
3735        let ws = BinanceWs::new(WS_BASE_URL.to_string());
3736        // Basic creation test: ensure the listen key lock is accessible
3737        assert!(ws.listen_key.try_read().is_ok());
3738    }
3739
3740    #[test]
3741    fn test_stream_format() {
3742        let symbol = "btcusdt";
3743
3744        // Ticker stream format
3745        let ticker_stream = format!("{}@ticker", symbol);
3746        assert_eq!(ticker_stream, "btcusdt@ticker");
3747
3748        // Trade stream format
3749        let trade_stream = format!("{}@trade", symbol);
3750        assert_eq!(trade_stream, "btcusdt@trade");
3751
3752        // Depth stream format
3753        let depth_stream = format!("{}@depth20", symbol);
3754        assert_eq!(depth_stream, "btcusdt@depth20");
3755
3756        // Kline stream format
3757        let kline_stream = format!("{}@kline_1m", symbol);
3758        assert_eq!(kline_stream, "btcusdt@kline_1m");
3759    }
3760
3761    #[tokio::test]
3762    async fn test_subscription_manager_basic() {
3763        let manager = SubscriptionManager::new();
3764        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
3765
3766        // Validate initial state
3767        assert_eq!(manager.active_count(), 0);
3768        assert!(!manager.has_subscription("btcusdt@ticker").await);
3769
3770        // Add a subscription
3771        manager
3772            .add_subscription(
3773                "btcusdt@ticker".to_string(),
3774                "BTCUSDT".to_string(),
3775                SubscriptionType::Ticker,
3776                tx.clone(),
3777            )
3778            .await
3779            .unwrap();
3780
3781        assert_eq!(manager.active_count(), 1);
3782        assert!(manager.has_subscription("btcusdt@ticker").await);
3783
3784        // Retrieve subscription
3785        let sub = manager.get_subscription("btcusdt@ticker").await;
3786        assert!(sub.is_some());
3787        let sub = sub.unwrap();
3788        assert_eq!(sub.stream, "btcusdt@ticker");
3789        assert_eq!(sub.symbol, "BTCUSDT");
3790        assert_eq!(sub.sub_type, SubscriptionType::Ticker);
3791
3792        // Remove subscription
3793        manager.remove_subscription("btcusdt@ticker").await.unwrap();
3794        assert_eq!(manager.active_count(), 0);
3795        assert!(!manager.has_subscription("btcusdt@ticker").await);
3796    }
3797
3798    #[tokio::test]
3799    async fn test_subscription_manager_multiple() {
3800        let manager = SubscriptionManager::new();
3801        let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
3802        let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
3803        let (tx3, _rx3) = tokio::sync::mpsc::unbounded_channel();
3804
3805        // Add multiple subscriptions
3806        manager
3807            .add_subscription(
3808                "btcusdt@ticker".to_string(),
3809                "BTCUSDT".to_string(),
3810                SubscriptionType::Ticker,
3811                tx1,
3812            )
3813            .await
3814            .unwrap();
3815
3816        manager
3817            .add_subscription(
3818                "btcusdt@depth".to_string(),
3819                "BTCUSDT".to_string(),
3820                SubscriptionType::OrderBook,
3821                tx2,
3822            )
3823            .await
3824            .unwrap();
3825
3826        manager
3827            .add_subscription(
3828                "ethusdt@ticker".to_string(),
3829                "ETHUSDT".to_string(),
3830                SubscriptionType::Ticker,
3831                tx3,
3832            )
3833            .await
3834            .unwrap();
3835
3836        assert_eq!(manager.active_count(), 3);
3837
3838        // Query by symbol
3839        let btc_subs = manager.get_subscriptions_by_symbol("BTCUSDT").await;
3840        assert_eq!(btc_subs.len(), 2);
3841
3842        let eth_subs = manager.get_subscriptions_by_symbol("ETHUSDT").await;
3843        assert_eq!(eth_subs.len(), 1);
3844
3845        // Retrieve all subscriptions
3846        let all_subs = manager.get_all_subscriptions().await;
3847        assert_eq!(all_subs.len(), 3);
3848
3849        // Clear all subscriptions
3850        manager.clear().await;
3851        assert_eq!(manager.active_count(), 0);
3852    }
3853
3854    #[tokio::test]
3855    async fn test_subscription_type_from_stream() {
3856        // Ticker stream
3857        let sub_type = SubscriptionType::from_stream("btcusdt@ticker");
3858        assert_eq!(sub_type, Some(SubscriptionType::Ticker));
3859
3860        // Order book streams
3861        let sub_type = SubscriptionType::from_stream("btcusdt@depth");
3862        assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3863
3864        let sub_type = SubscriptionType::from_stream("btcusdt@depth@100ms");
3865        assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3866
3867        // Trade streams
3868        let sub_type = SubscriptionType::from_stream("btcusdt@trade");
3869        assert_eq!(sub_type, Some(SubscriptionType::Trades));
3870
3871        let sub_type = SubscriptionType::from_stream("btcusdt@aggTrade");
3872        assert_eq!(sub_type, Some(SubscriptionType::Trades));
3873
3874        // Kline streams
3875        let sub_type = SubscriptionType::from_stream("btcusdt@kline_1m");
3876        assert_eq!(sub_type, Some(SubscriptionType::Kline("1m".to_string())));
3877
3878        let sub_type = SubscriptionType::from_stream("btcusdt@kline_1h");
3879        assert_eq!(sub_type, Some(SubscriptionType::Kline("1h".to_string())));
3880
3881        // Mark price stream
3882        let sub_type = SubscriptionType::from_stream("btcusdt@markPrice");
3883        assert_eq!(sub_type, Some(SubscriptionType::MarkPrice));
3884
3885        // Book ticker stream
3886        let sub_type = SubscriptionType::from_stream("btcusdt@bookTicker");
3887        assert_eq!(sub_type, Some(SubscriptionType::BookTicker));
3888
3889        // Unknown stream
3890        let sub_type = SubscriptionType::from_stream("btcusdt@unknown");
3891        assert_eq!(sub_type, None);
3892    }
3893
3894    #[tokio::test]
3895    async fn test_subscription_send_message() {
3896        let manager = SubscriptionManager::new();
3897        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
3898
3899        // Add subscription
3900        manager
3901            .add_subscription(
3902                "btcusdt@ticker".to_string(),
3903                "BTCUSDT".to_string(),
3904                SubscriptionType::Ticker,
3905                tx,
3906            )
3907            .await
3908            .unwrap();
3909
3910        // Send message to stream
3911        let test_msg = serde_json::json!({
3912            "e": "24hrTicker",
3913            "s": "BTCUSDT",
3914            "c": "50000"
3915        });
3916
3917        let sent = manager
3918            .send_to_stream("btcusdt@ticker", test_msg.clone())
3919            .await;
3920        assert!(sent);
3921
3922        // Receive message
3923        let received = rx.recv().await;
3924        assert!(received.is_some());
3925        assert_eq!(received.unwrap(), test_msg);
3926    }
3927
3928    #[tokio::test]
3929    async fn test_subscription_send_to_symbol() {
3930        let manager = SubscriptionManager::new();
3931        let (tx1, mut rx1) = tokio::sync::mpsc::unbounded_channel();
3932        let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel();
3933
3934        // Add two subscriptions for the same symbol
3935        manager
3936            .add_subscription(
3937                "btcusdt@ticker".to_string(),
3938                "BTCUSDT".to_string(),
3939                SubscriptionType::Ticker,
3940                tx1,
3941            )
3942            .await
3943            .unwrap();
3944
3945        manager
3946            .add_subscription(
3947                "btcusdt@depth".to_string(),
3948                "BTCUSDT".to_string(),
3949                SubscriptionType::OrderBook,
3950                tx2,
3951            )
3952            .await
3953            .unwrap();
3954
3955        // Send a message to all subscriptions for the symbol
3956        let test_msg = serde_json::json!({
3957            "s": "BTCUSDT",
3958            "data": "test"
3959        });
3960
3961        let sent_count = manager.send_to_symbol("BTCUSDT", &test_msg).await;
3962        assert_eq!(sent_count, 2);
3963
3964        // Receive messages
3965        let received1 = rx1.recv().await;
3966        assert!(received1.is_some());
3967        assert_eq!(received1.unwrap(), test_msg);
3968
3969        let received2 = rx2.recv().await;
3970        assert!(received2.is_some());
3971        assert_eq!(received2.unwrap(), test_msg);
3972    }
3973
3974    #[test]
3975    fn test_symbol_conversion() {
3976        let symbol = "BTC/USDT";
3977        let binance_symbol = symbol.replace('/', "").to_lowercase();
3978        assert_eq!(binance_symbol, "btcusdt");
3979    }
3980
3981    // ==================== MessageRouter tests ====================
3982
3983    #[test]
3984    fn test_reconnect_config_default() {
3985        let config = ReconnectConfig::default();
3986
3987        assert!(config.enabled);
3988        assert_eq!(config.initial_delay_ms, 1000);
3989        assert_eq!(config.max_delay_ms, 30000);
3990        assert_eq!(config.backoff_multiplier, 2.0);
3991        assert_eq!(config.max_attempts, 0); // Unlimited retries
3992    }
3993
3994    #[test]
3995    fn test_reconnect_config_calculate_delay() {
3996        let config = ReconnectConfig::default();
3997
3998        // Exponential backoff tests
3999        assert_eq!(config.calculate_delay(0), 1000); // 1s
4000        assert_eq!(config.calculate_delay(1), 2000); // 2s
4001        assert_eq!(config.calculate_delay(2), 4000); // 4s
4002        assert_eq!(config.calculate_delay(3), 8000); // 8s
4003        assert_eq!(config.calculate_delay(4), 16000); // 16s
4004        assert_eq!(config.calculate_delay(5), 30000); // 30s (capped at max)
4005        assert_eq!(config.calculate_delay(6), 30000); // 30s (remains at max)
4006    }
4007
4008    #[test]
4009    fn test_reconnect_config_should_retry() {
4010        let mut config = ReconnectConfig::default();
4011
4012        // Unlimited retries
4013        assert!(config.should_retry(0));
4014        assert!(config.should_retry(10));
4015        assert!(config.should_retry(100));
4016
4017        // Finite retries
4018        config.max_attempts = 3;
4019        assert!(config.should_retry(0));
4020        assert!(config.should_retry(1));
4021        assert!(config.should_retry(2));
4022        assert!(!config.should_retry(3));
4023        assert!(!config.should_retry(4));
4024
4025        // Disabled retries
4026        config.enabled = false;
4027        assert!(!config.should_retry(0));
4028        assert!(!config.should_retry(1));
4029    }
4030
4031    #[test]
4032    fn test_message_router_extract_stream_name_combined() {
4033        // Combined stream format
4034        let message = serde_json::json!({
4035            "stream": "btcusdt@ticker",
4036            "data": {
4037                "e": "24hrTicker",
4038                "s": "BTCUSDT"
4039            }
4040        });
4041
4042        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4043        assert_eq!(stream_name, "btcusdt@ticker");
4044    }
4045
4046    #[test]
4047    fn test_message_router_extract_stream_name_ticker() {
4048        // Ticker single stream format
4049        let message = serde_json::json!({
4050            "e": "24hrTicker",
4051            "s": "BTCUSDT",
4052            "E": 1672531200000_u64,
4053            "c": "16950.00",
4054            "h": "17100.00"
4055        });
4056
4057        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4058        assert_eq!(stream_name, "btcusdt@ticker");
4059    }
4060
4061    #[test]
4062    fn test_message_router_extract_stream_name_depth() {
4063        // Depth single stream format
4064        let message = serde_json::json!({
4065            "e": "depthUpdate",
4066            "s": "ETHUSDT",
4067            "E": 1672531200000_u64,
4068            "U": 157,
4069            "u": 160
4070        });
4071
4072        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4073        assert_eq!(stream_name, "ethusdt@depth");
4074    }
4075
4076    #[test]
4077    fn test_message_router_extract_stream_name_trade() {
4078        // Test trade single-stream format
4079        let message = serde_json::json!({
4080            "e": "trade",
4081            "s": "BNBUSDT",
4082            "E": 1672531200000_u64,
4083            "t": 12345
4084        });
4085
4086        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4087        assert_eq!(stream_name, "bnbusdt@trade");
4088    }
4089
4090    #[test]
4091    fn test_message_router_extract_stream_name_kline() {
4092        // Kline single stream format
4093        let message = serde_json::json!({
4094            "e": "kline",
4095            "s": "BTCUSDT",
4096            "E": 1672531200000_u64,
4097            "k": {
4098                "i": "1m",
4099                "t": 1672531200000_u64,
4100                "o": "16950.00"
4101            }
4102        });
4103
4104        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4105        assert_eq!(stream_name, "btcusdt@kline_1m");
4106    }
4107
4108    #[test]
4109    fn test_message_router_extract_stream_name_mark_price() {
4110        // Mark price single stream format
4111        let message = serde_json::json!({
4112            "e": "markPriceUpdate",
4113            "s": "BTCUSDT",
4114            "E": 1672531200000_u64,
4115            "p": "16950.00"
4116        });
4117
4118        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4119        assert_eq!(stream_name, "btcusdt@markPrice");
4120    }
4121
4122    #[test]
4123    fn test_message_router_extract_stream_name_book_ticker() {
4124        // Book ticker single stream format
4125        let message = serde_json::json!({
4126            "e": "bookTicker",
4127            "s": "ETHUSDT",
4128            "E": 1672531200000_u64,
4129            "b": "1200.00",
4130            "a": "1200.50"
4131        });
4132
4133        let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4134        assert_eq!(stream_name, "ethusdt@bookTicker");
4135    }
4136
4137    #[test]
4138    fn test_message_router_extract_stream_name_subscription_response() {
4139        // Subscription response should yield an error
4140        let message = serde_json::json!({
4141            "result": null,
4142            "id": 1
4143        });
4144
4145        let result = MessageRouter::extract_stream_name(&message);
4146        assert!(result.is_err());
4147    }
4148
4149    #[test]
4150    fn test_message_router_extract_stream_name_error_response() {
4151        // Error responses should yield an error
4152        let message = serde_json::json!({
4153            "error": {
4154                "code": -1,
4155                "msg": "Invalid request"
4156            },
4157            "id": 1
4158        });
4159
4160        let result = MessageRouter::extract_stream_name(&message);
4161        assert!(result.is_err());
4162    }
4163
4164    #[test]
4165    fn test_message_router_extract_stream_name_invalid() {
4166        // Invalid message format
4167        let message = serde_json::json!({
4168            "unknown": "data"
4169        });
4170
4171        let result = MessageRouter::extract_stream_name(&message);
4172        assert!(result.is_err());
4173    }
4174
4175    #[tokio::test]
4176    async fn test_message_router_creation() {
4177        let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4178        let subscription_manager = Arc::new(SubscriptionManager::new());
4179
4180        let router = MessageRouter::new(ws_url.clone(), subscription_manager);
4181
4182        // Validate initial state
4183        assert!(!router.is_connected());
4184        assert_eq!(router.ws_url, ws_url);
4185    }
4186
4187    #[tokio::test]
4188    async fn test_message_router_reconnect_config() {
4189        let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4190        let subscription_manager = Arc::new(SubscriptionManager::new());
4191
4192        let router = MessageRouter::new(ws_url, subscription_manager);
4193
4194        // Default configuration
4195        let config = router.get_reconnect_config().await;
4196        assert!(config.enabled);
4197        assert_eq!(config.initial_delay_ms, 1000);
4198
4199        // Setting new configuration
4200        let new_config = ReconnectConfig {
4201            enabled: false,
4202            initial_delay_ms: 2000,
4203            max_delay_ms: 60000,
4204            backoff_multiplier: 1.5,
4205            max_attempts: 5,
4206        };
4207
4208        router.set_reconnect_config(new_config.clone()).await;
4209
4210        let updated_config = router.get_reconnect_config().await;
4211        assert!(!updated_config.enabled);
4212        assert_eq!(updated_config.initial_delay_ms, 2000);
4213        assert_eq!(updated_config.max_delay_ms, 60000);
4214        assert_eq!(updated_config.backoff_multiplier, 1.5);
4215        assert_eq!(updated_config.max_attempts, 5);
4216    }
4217}