1pub mod connection_manager;
49mod handlers;
50mod listen_key;
51pub mod parsers;
53mod streams;
54mod subscriptions;
55pub mod types;
57pub(crate) mod user_data;
58
59pub use connection_manager::BinanceConnectionManager;
61pub use handlers::MessageRouter;
62pub use listen_key::ListenKeyManager;
63pub use parsers::{
64 BidAskParser, MarkPriceParser, OhlcvParser, StreamParser, TickerParser, TradeParser,
65};
66pub use streams::normalize_symbol;
67pub use subscriptions::{
68 ReconnectConfig, Subscription, SubscriptionHandle, SubscriptionManager, SubscriptionType,
69};
70pub use types::{
71 BinanceWsConfig, DepthLevel, UpdateSpeed, WsChannelConfig, WsErrorRecovery, WsHealthSnapshot,
72};
73
74use crate::binance::{Binance, parser};
75use ccxt_core::error::{Error, Result};
76use ccxt_core::types::{
77 Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
78};
79use serde_json::Value;
80use std::collections::{HashMap, VecDeque};
81use std::sync::Arc;
82use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
83use std::time::Duration;
84use tokio::sync::{Mutex, RwLock};
85
/// Numeric cap of 1000.
/// NOTE(review): presumably the maximum number of trades cached per symbol; it is not
/// referenced in this part of the file — confirm against the code that uses it.
const MAX_TRADES: usize = 1000;
/// Numeric cap of 1000.
/// NOTE(review): presumably the maximum number of OHLCV candles cached per key; it is not
/// referenced in this part of the file — confirm against the code that uses it.
const MAX_OHLCVS: usize = 1000;
/// Time budget, in milliseconds, that `BinanceWs::shutdown` gives the message router to stop.
const DEFAULT_SHUTDOWN_TIMEOUT_MS: u64 = 5000;
90
/// Binance WebSocket client: owns the message router, the subscription registry, the
/// optional listen-key manager, in-memory caches of received data, and health counters.
pub struct BinanceWs {
    /// Router used to start/stop the connection and to send subscribe/unsubscribe requests.
    pub(crate) message_router: Arc<MessageRouter>,
    /// Registry of active stream subscriptions and their channel senders.
    pub(crate) subscription_manager: Arc<SubscriptionManager>,
    /// Listen key stored by `connect_user_stream` and reset to `None` by `close_user_stream`.
    listen_key: Arc<RwLock<Option<String>>>,
    /// Present only for clients created with `new_with_auth`.
    listen_key_manager: Option<Arc<ListenKeyManager>>,
    /// Latest ticker per ticker symbol, filled by the ticker watch helpers.
    pub(crate) tickers: Arc<Mutex<HashMap<String, Ticker>>>,
    /// Bid/ask cache keyed by string (presumably the symbol — confirm against the writer).
    pub(crate) bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
    /// Latest mark price per mark-price symbol, filled by the mark-price watch helpers.
    #[allow(dead_code)]
    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
    /// Order books shared with the `handlers` snapshot/delta functions.
    pub(crate) orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
    /// Trade cache; only cleared (in `shutdown`) within this part of the file.
    pub(crate) trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
    /// OHLCV cache; only cleared (in `shutdown`) within this part of the file.
    pub(crate) ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
    /// Balance cache; only cleared (in `shutdown`) within this part of the file.
    pub(crate) balances: Arc<RwLock<HashMap<String, Balance>>>,
    /// Nested order cache (outer/inner key meaning not visible here — TODO confirm).
    pub(crate) orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
    /// Cache of the account's own trades; only cleared within this part of the file.
    pub(crate) my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
    /// Nested position cache (outer/inner key meaning not visible here — TODO confirm).
    pub(crate) positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
    /// Per-subscription-type channel capacities, read by `channel_capacity_for`.
    channel_config: WsChannelConfig,
    /// Count of messages recorded via `record_message_received`.
    messages_received: Arc<AtomicU64>,
    /// Count of messages recorded via `record_message_dropped`.
    messages_dropped: Arc<AtomicU64>,
    /// UTC timestamp (ms) of the last recorded message; 0 means none recorded yet.
    last_message_time: Arc<AtomicU64>,
    /// UTC timestamp (ms) taken when the client was constructed.
    connection_start_time: Arc<AtomicU64>,
    /// Set once `shutdown` has started.
    is_shutting_down: Arc<AtomicBool>,
    /// Set once `shutdown` has finished its cleanup.
    shutdown_complete: Arc<AtomicBool>,
}
119
120impl std::fmt::Debug for BinanceWs {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("BinanceWs")
123 .field("is_connected", &self.message_router.is_connected())
124 .finish_non_exhaustive()
125 }
126}
127
128impl Drop for BinanceWs {
129 fn drop(&mut self) {
130 if !self.shutdown_complete.load(Ordering::Acquire)
132 && !self.is_shutting_down.load(Ordering::Acquire)
133 {
134 tracing::warn!(
135 "BinanceWs dropped without calling shutdown(). \
136 This may leave resources uncleaned. \
137 Consider calling shutdown() before dropping."
138 );
139 }
140 }
141}
142
143impl BinanceWs {
144 pub fn new(url: String) -> Self {
146 let subscription_manager = Arc::new(SubscriptionManager::new());
147 let message_router = Arc::new(MessageRouter::new(url, subscription_manager.clone(), None));
148
149 Self {
150 message_router,
151 subscription_manager,
152 listen_key: Arc::new(RwLock::new(None)),
153 listen_key_manager: None,
154 tickers: Arc::new(Mutex::new(HashMap::new())),
155 bids_asks: Arc::new(Mutex::new(HashMap::new())),
156 mark_prices: Arc::new(Mutex::new(HashMap::new())),
157 orderbooks: Arc::new(Mutex::new(HashMap::new())),
158 trades: Arc::new(Mutex::new(HashMap::new())),
159 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
160 balances: Arc::new(RwLock::new(HashMap::new())),
161 orders: Arc::new(RwLock::new(HashMap::new())),
162 my_trades: Arc::new(RwLock::new(HashMap::new())),
163 positions: Arc::new(RwLock::new(HashMap::new())),
164 channel_config: WsChannelConfig::default(),
165 messages_received: Arc::new(AtomicU64::new(0)),
166 messages_dropped: Arc::new(AtomicU64::new(0)),
167 last_message_time: Arc::new(AtomicU64::new(0)),
168 connection_start_time: Arc::new(AtomicU64::new(
169 chrono::Utc::now().timestamp_millis() as u64
170 )),
171 is_shutting_down: Arc::new(AtomicBool::new(false)),
172 shutdown_complete: Arc::new(AtomicBool::new(false)),
173 }
174 }
175
176 pub fn new_with_config(config: BinanceWsConfig) -> Self {
191 let subscription_manager = Arc::new(SubscriptionManager::new());
192 let message_router = Arc::new(MessageRouter::new(
193 config.url,
194 subscription_manager.clone(),
195 None,
196 ));
197
198 Self {
199 message_router,
200 subscription_manager,
201 listen_key: Arc::new(RwLock::new(None)),
202 listen_key_manager: None,
203 tickers: Arc::new(Mutex::new(HashMap::new())),
204 bids_asks: Arc::new(Mutex::new(HashMap::new())),
205 mark_prices: Arc::new(Mutex::new(HashMap::new())),
206 orderbooks: Arc::new(Mutex::new(HashMap::new())),
207 trades: Arc::new(Mutex::new(HashMap::new())),
208 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
209 balances: Arc::new(RwLock::new(HashMap::new())),
210 orders: Arc::new(RwLock::new(HashMap::new())),
211 my_trades: Arc::new(RwLock::new(HashMap::new())),
212 positions: Arc::new(RwLock::new(HashMap::new())),
213 channel_config: config.channel_config,
214 messages_received: Arc::new(AtomicU64::new(0)),
215 messages_dropped: Arc::new(AtomicU64::new(0)),
216 last_message_time: Arc::new(AtomicU64::new(0)),
217 connection_start_time: Arc::new(AtomicU64::new(
218 chrono::Utc::now().timestamp_millis() as u64
219 )),
220 is_shutting_down: Arc::new(AtomicBool::new(false)),
221 shutdown_complete: Arc::new(AtomicBool::new(false)),
222 }
223 }
224
225 pub fn new_with_auth(url: String, binance: Arc<Binance>, market_type: MarketType) -> Self {
227 let subscription_manager = Arc::new(SubscriptionManager::new());
228 let listen_key_manager = Arc::new(ListenKeyManager::new_for_market(binance, market_type));
229 let message_router = Arc::new(MessageRouter::new(
230 url,
231 subscription_manager.clone(),
232 Some(listen_key_manager.clone()),
233 ));
234
235 Self {
236 message_router,
237 subscription_manager,
238 listen_key: Arc::new(RwLock::new(None)),
239 listen_key_manager: Some(listen_key_manager),
240 tickers: Arc::new(Mutex::new(HashMap::new())),
241 bids_asks: Arc::new(Mutex::new(HashMap::new())),
242 mark_prices: Arc::new(Mutex::new(HashMap::new())),
243 orderbooks: Arc::new(Mutex::new(HashMap::new())),
244 trades: Arc::new(Mutex::new(HashMap::new())),
245 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
246 balances: Arc::new(RwLock::new(HashMap::new())),
247 orders: Arc::new(RwLock::new(HashMap::new())),
248 my_trades: Arc::new(RwLock::new(HashMap::new())),
249 positions: Arc::new(RwLock::new(HashMap::new())),
250 channel_config: WsChannelConfig::default(),
251 messages_received: Arc::new(AtomicU64::new(0)),
252 messages_dropped: Arc::new(AtomicU64::new(0)),
253 last_message_time: Arc::new(AtomicU64::new(0)),
254 connection_start_time: Arc::new(AtomicU64::new(
255 chrono::Utc::now().timestamp_millis() as u64
256 )),
257 is_shutting_down: Arc::new(AtomicBool::new(false)),
258 shutdown_complete: Arc::new(AtomicBool::new(false)),
259 }
260 }
261
262 fn channel_capacity_for(&self, sub_type: &SubscriptionType) -> usize {
264 match sub_type {
265 SubscriptionType::Ticker | SubscriptionType::BookTicker => {
266 self.channel_config.ticker_capacity
267 }
268 SubscriptionType::OrderBook => self.channel_config.orderbook_capacity,
269 SubscriptionType::Trades | SubscriptionType::Kline(_) | SubscriptionType::MarkPrice => {
270 self.channel_config.trades_capacity
271 }
272 SubscriptionType::Balance
273 | SubscriptionType::Orders
274 | SubscriptionType::MyTrades
275 | SubscriptionType::Positions => self.channel_config.user_data_capacity,
276 }
277 }
278
279 pub async fn connect(&self) -> Result<()> {
281 if self.is_connected() {
282 return Ok(());
283 }
284
285 self.message_router.start(None).await?;
286
287 Ok(())
291 }
292
293 pub async fn disconnect(&self) -> Result<()> {
295 self.message_router.stop().await?;
296
297 if let Some(manager) = &self.listen_key_manager {
298 manager.stop_auto_refresh().await;
299 }
300
301 Ok(())
302 }
303
304 pub async fn shutdown(&self) -> Result<()> {
316 if self.shutdown_complete.load(Ordering::Acquire) {
318 return Ok(());
319 }
320
321 if self.is_shutting_down.swap(true, Ordering::AcqRel) {
323 while !self.shutdown_complete.load(Ordering::Acquire) {
325 tokio::time::sleep(Duration::from_millis(10)).await;
326 }
327 return Ok(());
328 }
329
330 tracing::info!("Initiating graceful shutdown of BinanceWs");
331
332 let shutdown_result = tokio::time::timeout(
334 Duration::from_millis(DEFAULT_SHUTDOWN_TIMEOUT_MS),
335 self.message_router.stop(),
336 )
337 .await;
338
339 if shutdown_result.is_err() {
340 tracing::warn!("Shutdown timeout exceeded, forcing close");
341 }
342
343 if let Some(manager) = &self.listen_key_manager {
345 manager.stop_auto_refresh().await;
346 }
347
348 self.subscription_manager.clear().await;
350
351 self.tickers.lock().await.clear();
353 self.bids_asks.lock().await.clear();
354 self.mark_prices.lock().await.clear();
355 self.orderbooks.lock().await.clear();
356 self.trades.lock().await.clear();
357 self.ohlcvs.lock().await.clear();
358 self.balances.write().await.clear();
359 self.orders.write().await.clear();
360 self.my_trades.write().await.clear();
361 self.positions.write().await.clear();
362
363 self.shutdown_complete.store(true, Ordering::Release);
365
366 tracing::info!("BinanceWs shutdown complete");
367 Ok(())
368 }
369
/// Returns `true` once `shutdown` has been started (the flag is never reset in the
/// visible code, so it stays `true` after shutdown completes).
#[inline]
pub fn is_shutting_down(&self) -> bool {
    self.is_shutting_down.load(Ordering::Acquire)
}
375
376 #[inline]
378 #[allow(dead_code)]
379 fn check_not_shutting_down(&self) -> Result<()> {
380 if self.is_shutting_down.load(Ordering::Acquire) {
381 return Err(Error::invalid_request("WebSocket client is shutting down"));
382 }
383 Ok(())
384 }
385
386 pub async fn connect_user_stream(&self) -> Result<()> {
388 let manager = self.listen_key_manager.as_ref()
389 .ok_or_else(|| Error::invalid_request(
390 "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
391 ))?;
392
393 let listen_key = manager.get_or_create().await?;
394
395 let base_url = self.message_router.get_url();
396 let base_url = if let Some(stripped) = base_url.strip_suffix('/') {
397 stripped
398 } else {
399 &base_url
400 };
401
402 let url = format!("{}/{}", base_url, listen_key);
403
404 self.message_router.start(Some(url)).await?;
405 manager.start_auto_refresh().await;
406 *self.listen_key.write().await = Some(listen_key);
407
408 Ok(())
409 }
410
411 pub async fn close_user_stream(&self) -> Result<()> {
413 if let Some(manager) = &self.listen_key_manager {
414 manager.delete().await?;
415 }
416 *self.listen_key.write().await = None;
417 Ok(())
418 }
419
420 pub async fn get_listen_key(&self) -> Option<String> {
422 if let Some(manager) = &self.listen_key_manager {
423 manager.get_current().await
424 } else {
425 self.listen_key.read().await.clone()
426 }
427 }
428
429 pub async fn subscribe_ticker(
434 &self,
435 symbol: &str,
436 ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
437 let normalized = normalize_symbol(symbol);
438 let stream = format!("{}@ticker", normalized);
439 let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
440 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
441
442 let is_new = self
443 .subscription_manager
444 .add_subscription(
445 stream.clone(),
446 symbol.to_string(),
447 SubscriptionType::Ticker,
448 tx,
449 )
450 .await?;
451
452 if is_new {
453 self.message_router.subscribe(vec![stream]).await?;
454 }
455 Ok(rx)
456 }
457
458 pub async fn subscribe_all_tickers(&self) -> Result<tokio::sync::mpsc::Receiver<Value>> {
462 let stream = "!ticker@arr".to_string();
463 let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
464 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
465
466 let is_new = self
467 .subscription_manager
468 .add_subscription(
469 stream.clone(),
470 "all".to_string(),
471 SubscriptionType::Ticker,
472 tx,
473 )
474 .await?;
475
476 if is_new {
477 self.message_router.subscribe(vec![stream]).await?;
478 }
479 Ok(rx)
480 }
481
482 pub async fn subscribe_trades(
486 &self,
487 symbol: &str,
488 ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
489 let normalized = normalize_symbol(symbol);
490 let stream = format!("{}@trade", normalized);
491 let capacity = self.channel_capacity_for(&SubscriptionType::Trades);
492 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
493
494 let is_new = self
495 .subscription_manager
496 .add_subscription(
497 stream.clone(),
498 symbol.to_string(),
499 SubscriptionType::Trades,
500 tx,
501 )
502 .await?;
503
504 if is_new {
505 self.message_router.subscribe(vec![stream]).await?;
506 }
507 Ok(rx)
508 }
509
510 pub async fn subscribe_agg_trades(
514 &self,
515 symbol: &str,
516 ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
517 let normalized = normalize_symbol(symbol);
518 let stream = format!("{}@aggTrade", normalized);
519 let capacity = self.channel_capacity_for(&SubscriptionType::Trades);
520 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
521
522 let is_new = self
523 .subscription_manager
524 .add_subscription(
525 stream.clone(),
526 symbol.to_string(),
527 SubscriptionType::Trades,
528 tx,
529 )
530 .await?;
531
532 if is_new {
533 self.message_router.subscribe(vec![stream]).await?;
534 }
535 Ok(rx)
536 }
537
538 pub async fn subscribe_orderbook(
556 &self,
557 symbol: &str,
558 levels: DepthLevel,
559 update_speed: UpdateSpeed,
560 ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
561 let normalized = normalize_symbol(symbol);
562 let stream = match update_speed {
563 UpdateSpeed::Ms100 => format!("{}@depth{}@100ms", normalized, levels.as_u32()),
564 UpdateSpeed::Ms1000 => format!("{}@depth{}", normalized, levels.as_u32()),
565 };
566 let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
567 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
568
569 let is_new = self
570 .subscription_manager
571 .add_subscription(
572 stream.clone(),
573 symbol.to_string(),
574 SubscriptionType::OrderBook,
575 tx,
576 )
577 .await?;
578
579 if is_new {
580 self.message_router.subscribe(vec![stream]).await?;
581 }
582 Ok(rx)
583 }
584
585 pub async fn subscribe_orderbook_diff(
593 &self,
594 symbol: &str,
595 update_speed: Option<UpdateSpeed>,
596 ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
597 let normalized = normalize_symbol(symbol);
598 let stream = match update_speed {
599 Some(UpdateSpeed::Ms100) => format!("{}@depth@100ms", normalized),
600 _ => format!("{}@depth", normalized),
601 };
602 let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
603 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
604
605 let is_new = self
606 .subscription_manager
607 .add_subscription(
608 stream.clone(),
609 symbol.to_string(),
610 SubscriptionType::OrderBook,
611 tx,
612 )
613 .await?;
614
615 if is_new {
616 self.message_router.subscribe(vec![stream]).await?;
617 }
618 Ok(rx)
619 }
620
621 pub async fn subscribe_kline(
625 &self,
626 symbol: &str,
627 interval: &str,
628 ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
629 let normalized = normalize_symbol(symbol);
630 let stream = format!("{}@kline_{}", normalized, interval);
631 let sub_type = SubscriptionType::Kline(interval.to_string());
632 let capacity = self.channel_capacity_for(&sub_type);
633 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
634
635 let is_new = self
636 .subscription_manager
637 .add_subscription(stream.clone(), symbol.to_string(), sub_type, tx)
638 .await?;
639
640 if is_new {
641 self.message_router.subscribe(vec![stream]).await?;
642 }
643 Ok(rx)
644 }
645
646 pub async fn subscribe_mini_ticker(
650 &self,
651 symbol: &str,
652 ) -> Result<tokio::sync::mpsc::Receiver<Value>> {
653 let normalized = normalize_symbol(symbol);
654 let stream = format!("{}@miniTicker", normalized);
655 let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
656 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
657
658 let is_new = self
659 .subscription_manager
660 .add_subscription(
661 stream.clone(),
662 symbol.to_string(),
663 SubscriptionType::Ticker,
664 tx,
665 )
666 .await?;
667
668 if is_new {
669 self.message_router.subscribe(vec![stream]).await?;
670 }
671 Ok(rx)
672 }
673
674 pub async fn subscribe_all_mini_tickers(&self) -> Result<tokio::sync::mpsc::Receiver<Value>> {
678 let stream = "!miniTicker@arr".to_string();
679 let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
680 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
681
682 let is_new = self
683 .subscription_manager
684 .add_subscription(
685 stream.clone(),
686 "all".to_string(),
687 SubscriptionType::Ticker,
688 tx,
689 )
690 .await?;
691
692 if is_new {
693 self.message_router.subscribe(vec![stream]).await?;
694 }
695 Ok(rx)
696 }
697
698 pub async fn unsubscribe(&self, stream: String) -> Result<()> {
703 let fully_removed = self
704 .subscription_manager
705 .remove_subscription(&stream)
706 .await?;
707
708 if fully_removed {
710 self.message_router.unsubscribe(vec![stream]).await?;
711 }
712 Ok(())
713 }
714
/// Always returns `None`; this method does not read from any channel.
/// NOTE(review): looks like a placeholder kept for API compatibility — confirm. Data is
/// delivered through the receivers returned by the `subscribe_*` methods.
pub fn receive(&self) -> Option<Value> {
    None
}
719
/// Returns whether the message router currently reports being connected.
pub fn is_connected(&self) -> bool {
    self.message_router.is_connected()
}
724
725 pub fn state(&self) -> ccxt_core::ws_client::WsConnectionState {
727 if self.message_router.is_connected() {
728 ccxt_core::ws_client::WsConnectionState::Connected
729 } else {
730 ccxt_core::ws_client::WsConnectionState::Disconnected
731 }
732 }
733
734 pub fn subscriptions(&self) -> Vec<String> {
740 let subs = self.subscription_manager.get_all_subscriptions_sync();
741 subs.into_iter().map(|s| s.stream).collect()
742 }
743
/// Returns the subscription manager's active subscription count.
pub fn subscription_count(&self) -> usize {
    self.subscription_manager.active_count()
}
748
749 pub fn health(&self) -> WsHealthSnapshot {
757 let now = chrono::Utc::now().timestamp_millis() as u64;
758 let start_time = self.connection_start_time.load(Ordering::Relaxed);
759 let last_msg = self.last_message_time.load(Ordering::Relaxed);
760
761 WsHealthSnapshot {
762 latency_ms: self.message_router.latency(),
763 messages_received: self.messages_received.load(Ordering::Relaxed),
764 messages_dropped: self.messages_dropped.load(Ordering::Relaxed),
765 last_message_time: if last_msg > 0 {
766 Some(last_msg as i64)
767 } else {
768 None
769 },
770 connection_uptime_ms: if start_time > 0 {
771 now.saturating_sub(start_time)
772 } else {
773 0
774 },
775 reconnect_count: self.message_router.reconnect_count(),
776 }
777 }
778
779 #[inline]
781 #[allow(dead_code)]
782 pub(crate) fn record_message_received(&self) {
783 self.messages_received.fetch_add(1, Ordering::Relaxed);
784 self.last_message_time.store(
785 chrono::Utc::now().timestamp_millis() as u64,
786 Ordering::Relaxed,
787 );
788 }
789
790 #[inline]
792 #[allow(dead_code)]
793 pub(crate) fn record_message_dropped(&self) {
794 let count = self.messages_dropped.fetch_add(1, Ordering::Relaxed) + 1;
795 if count % 100 == 1 {
797 tracing::warn!(dropped_count = count, "Message dropped due to backpressure");
798 }
799 }
800
801 async fn watch_stream<T, P>(
819 &self,
820 stream: String,
821 symbol: String,
822 sub_type: SubscriptionType,
823 market: Option<&ccxt_core::types::Market>,
824 ) -> Result<T>
825 where
826 P: parsers::StreamParser<Output = T>,
827 {
828 let capacity = self.channel_capacity_for(&sub_type);
829 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
830 let is_new = self
831 .subscription_manager
832 .add_subscription(stream.clone(), symbol, sub_type, tx)
833 .await?;
834
835 if is_new {
836 self.message_router.subscribe(vec![stream.clone()]).await?;
837 }
838
839 loop {
840 if let Some(message) = rx.recv().await {
841 if message.get("result").is_some() {
843 continue;
844 }
845
846 match P::parse(&message, market) {
847 Ok(data) => return Ok(data),
848 Err(e) => {
849 tracing::warn!(
850 "Failed to parse message for stream {}: {:?}. Payload: {:?}",
851 stream,
852 e,
853 message
854 );
855 }
857 }
858 } else {
859 return Err(Error::network("Subscription channel closed"));
860 }
861 }
862 }
863
864 async fn watch_mark_price_internal(
866 &self,
867 symbol: &str,
868 channel_name: &str,
869 ) -> Result<MarkPrice> {
870 let normalized = normalize_symbol(symbol);
871 let stream = format!("{}@{}", normalized, channel_name);
872 tracing::debug!(
873 "watch_mark_price_internal: stream={}, symbol={}",
874 stream,
875 symbol
876 );
877
878 let mark_price = self
879 .watch_stream::<MarkPrice, parsers::MarkPriceParser>(
880 stream,
881 symbol.to_string(),
882 SubscriptionType::MarkPrice,
883 None,
884 )
885 .await?;
886
887 let mut mark_prices = self.mark_prices.lock().await;
889 mark_prices.insert(mark_price.symbol.clone(), mark_price.clone());
890
891 Ok(mark_price)
892 }
893
894 async fn watch_mark_prices_internal(
896 &self,
897 symbols: Option<Vec<String>>,
898 channel_name: &str,
899 ) -> Result<HashMap<String, MarkPrice>> {
900 let capacity = self.channel_capacity_for(&SubscriptionType::MarkPrice);
901 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
902
903 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
904 let mut streams = Vec::with_capacity(syms.len());
905 for sym in syms {
906 let symbol = sym.to_lowercase();
907 let stream = format!("{}@{}", symbol, channel_name);
908 let is_new = self
909 .subscription_manager
910 .add_subscription(
911 stream.clone(),
912 symbol,
913 SubscriptionType::MarkPrice,
914 tx.clone(),
915 )
916 .await?;
917 if is_new {
918 streams.push(stream);
919 }
920 }
921 streams
922 } else {
923 let stream = format!("!{}@arr", channel_name);
924 let is_new = self
925 .subscription_manager
926 .add_subscription(
927 stream.clone(),
928 "all".to_string(),
929 SubscriptionType::MarkPrice,
930 tx.clone(),
931 )
932 .await?;
933 if is_new { vec![stream] } else { vec![] }
934 };
935
936 if !streams.is_empty() {
937 self.message_router.subscribe(streams.clone()).await?;
938 }
939
940 let mut result = HashMap::new();
941
942 loop {
943 if let Some(message) = rx.recv().await {
944 if message.get("result").is_some() {
945 continue;
946 }
947
948 if let Some(arr) = message.as_array() {
949 for item in arr {
950 if let Ok(mark_price) = parser::parse_ws_mark_price(item) {
951 let symbol = mark_price.symbol.clone();
952
953 if let Some(syms) = &symbols {
954 if syms.contains(&symbol.to_lowercase()) {
955 result.insert(symbol.clone(), mark_price.clone());
956 }
957 } else {
958 result.insert(symbol.clone(), mark_price.clone());
959 }
960
961 let mut mark_prices = self.mark_prices.lock().await;
962 mark_prices.insert(symbol, mark_price);
963 } else {
964 tracing::warn!("Failed to parse item in mark price array: {:?}", item);
965 }
966 }
967
968 if let Some(syms) = &symbols {
969 if result.len() >= syms.len() {
970 return Ok(result);
971 }
972 } else {
973 return Ok(result);
975 }
976 } else {
977 match parser::parse_ws_mark_price(&message) {
978 Ok(mark_price) => {
979 let symbol = mark_price.symbol.clone();
980 result.insert(symbol.clone(), mark_price.clone());
981
982 let mut mark_prices = self.mark_prices.lock().await;
983 mark_prices.insert(symbol, mark_price);
984
985 if let Some(syms) = &symbols {
986 if result.len() >= syms.len() {
987 return Ok(result);
988 }
989 }
990 }
991 Err(e) => {
992 tracing::warn!(
993 "Failed to parse mark price message: {:?}. Payload: {:?}",
994 e,
995 message
996 );
997 }
998 }
999 }
1000 } else {
1001 return Err(Error::network("Subscription channel closed"));
1002 }
1003 }
1004 }
1005
1006 async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1008 let normalized = normalize_symbol(symbol);
1009 let stream = format!("{}@{}", normalized, channel_name);
1010
1011 let ticker = self
1012 .watch_stream::<Ticker, parsers::TickerParser>(
1013 stream,
1014 symbol.to_string(),
1015 SubscriptionType::Ticker,
1016 None,
1017 )
1018 .await?;
1019
1020 let mut tickers = self.tickers.lock().await;
1022 tickers.insert(ticker.symbol.clone(), ticker.clone());
1023
1024 Ok(ticker)
1025 }
1026
1027 async fn watch_tickers_internal(
1029 &self,
1030 symbols: Option<Vec<String>>,
1031 channel_name: &str,
1032 ) -> Result<HashMap<String, Ticker>> {
1033 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1034 syms.iter()
1035 .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1036 .collect()
1037 } else {
1038 vec![format!("!{}@arr", channel_name)]
1039 };
1040
1041 let capacity = self.channel_capacity_for(&SubscriptionType::Ticker);
1042 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1043
1044 let mut new_streams = Vec::new();
1045
1046 for stream in &streams {
1047 let is_new = self
1048 .subscription_manager
1049 .add_subscription(
1050 stream.clone(),
1051 "all".to_string(),
1052 SubscriptionType::Ticker,
1053 tx.clone(),
1054 )
1055 .await?;
1056 if is_new {
1057 new_streams.push(stream.clone());
1058 }
1059 }
1060
1061 if !new_streams.is_empty() {
1062 self.message_router.subscribe(new_streams).await?;
1063 }
1064
1065 let mut result = HashMap::new();
1066
1067 loop {
1068 if let Some(message) = rx.recv().await {
1069 if message.get("result").is_some() {
1070 continue;
1071 }
1072
1073 if let Some(arr) = message.as_array() {
1074 for item in arr {
1075 if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1076 let symbol = ticker.symbol.clone();
1077
1078 if let Some(syms) = &symbols {
1079 if syms.contains(&symbol.to_lowercase()) {
1080 result.insert(symbol.clone(), ticker.clone());
1081 }
1082 } else {
1083 result.insert(symbol.clone(), ticker.clone());
1084 }
1085
1086 let mut tickers = self.tickers.lock().await;
1087 tickers.insert(symbol, ticker);
1088 } else {
1089 tracing::warn!("Failed to parse item in ticker array: {:?}", item);
1090 }
1091 }
1092
1093 if let Some(syms) = &symbols {
1094 if result.len() >= syms.len() {
1095 return Ok(result);
1096 }
1097 } else {
1098 return Ok(result);
1102 }
1103 } else {
1104 match parser::parse_ws_ticker(&message, None) {
1105 Ok(ticker) => {
1106 let symbol = ticker.symbol.clone();
1107 result.insert(symbol.clone(), ticker.clone());
1108
1109 let mut tickers = self.tickers.lock().await;
1110 tickers.insert(symbol, ticker);
1111
1112 if let Some(syms) = &symbols {
1113 if result.len() >= syms.len() {
1114 return Ok(result);
1115 }
1116 }
1117 }
1118 Err(e) => {
1119 tracing::warn!(
1120 "Failed to parse ticker message: {:?}. Payload: {:?}",
1121 e,
1122 message
1123 );
1124 }
1125 }
1126 }
1127 } else {
1128 return Err(Error::network("Subscription channel closed"));
1129 }
1130 }
1131 }
1132
/// Forwards an order-book delta message to `handlers::handle_orderbook_delta`, passing
/// this client's shared order-book map, and returns that function's result unchanged.
async fn handle_orderbook_delta(
    &self,
    symbol: &str,
    delta_message: &Value,
    is_futures: bool,
) -> Result<()> {
    handlers::handle_orderbook_delta(symbol, delta_message, is_futures, &self.orderbooks).await
}
1142
/// Forwards to `handlers::fetch_orderbook_snapshot`, passing this client's shared
/// order-book map, and returns that function's result unchanged.
/// NOTE(review): presumably the handler also stores the snapshot in the map — confirm
/// in `handlers`.
async fn fetch_orderbook_snapshot(
    &self,
    exchange: &Binance,
    symbol: &str,
    limit: Option<i64>,
    is_futures: bool,
) -> Result<OrderBook> {
    handlers::fetch_orderbook_snapshot(exchange, symbol, limit, is_futures, &self.orderbooks)
        .await
}
1154
1155 async fn watch_orderbook_internal(
1157 &self,
1158 exchange: &Binance,
1159 symbol: &str,
1160 limit: Option<i64>,
1161 update_speed: UpdateSpeed,
1162 is_futures: bool,
1163 ) -> Result<OrderBook> {
1164 let stream = match update_speed {
1165 UpdateSpeed::Ms100 => format!("{}@depth@100ms", symbol.to_lowercase()),
1166 UpdateSpeed::Ms1000 => format!("{}@depth", symbol.to_lowercase()),
1167 };
1168
1169 let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
1170 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1171 let is_new = self
1172 .subscription_manager
1173 .add_subscription(
1174 stream.clone(),
1175 symbol.to_string(),
1176 SubscriptionType::OrderBook,
1177 tx,
1178 )
1179 .await?;
1180
1181 if is_new {
1182 self.message_router.subscribe(vec![stream.clone()]).await?;
1183 }
1184
1185 tokio::time::sleep(Duration::from_millis(500)).await;
1186
1187 let _snapshot = self
1188 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1189 .await?;
1190
1191 loop {
1192 if let Some(message) = rx.recv().await {
1193 if message.get("result").is_some() {
1194 continue;
1195 }
1196
1197 if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1198 if event_type == "depthUpdate" {
1199 match self
1200 .handle_orderbook_delta(symbol, &message, is_futures)
1201 .await
1202 {
1203 Ok(()) => {
1204 let orderbooks = self.orderbooks.lock().await;
1205 if let Some(ob) = orderbooks.get(symbol) {
1206 if ob.is_synced {
1207 return Ok(ob.clone());
1208 }
1209 }
1210 }
1211 Err(e) => {
1212 let err_msg = e.to_string();
1213 let recovery = WsErrorRecovery::from_error_message(&err_msg);
1214
1215 match recovery {
1216 WsErrorRecovery::Resync => {
1217 tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1218 match self
1219 .resync_orderbook(exchange, symbol, limit, is_futures)
1220 .await
1221 {
1222 Ok(true) => {
1223 tracing::info!(
1224 "Resync completed successfully for {}",
1225 symbol
1226 );
1227 }
1228 Ok(false) => {
1229 tracing::debug!(
1230 "Resync rate limited for {}, skipping",
1231 symbol
1232 );
1233 }
1234 Err(resync_err) => {
1235 tracing::error!(
1236 "Resync failed for {}: {}",
1237 symbol,
1238 resync_err
1239 );
1240 return Err(resync_err);
1241 }
1242 }
1243 }
1244 WsErrorRecovery::Fatal => {
1245 tracing::error!(
1246 "Fatal error handling orderbook delta: {}",
1247 err_msg
1248 );
1249 return Err(e);
1250 }
1251 _ => {
1252 tracing::error!(
1253 "Failed to handle orderbook delta: {}",
1254 err_msg
1255 );
1256 }
1257 }
1258 }
1259 }
1260 }
1261 }
1262 } else {
1263 return Err(Error::network("Subscription channel closed"));
1264 }
1265 }
1266 }
1267
1268 async fn resync_orderbook(
1272 &self,
1273 exchange: &Binance,
1274 symbol: &str,
1275 limit: Option<i64>,
1276 is_futures: bool,
1277 ) -> Result<bool> {
1278 let current_time = chrono::Utc::now().timestamp_millis();
1279
1280 let should_resync = {
1282 let orderbooks = self.orderbooks.lock().await;
1283 if let Some(ob) = orderbooks.get(symbol) {
1284 ob.should_resync(current_time)
1285 } else {
1286 true
1287 }
1288 };
1289
1290 if !should_resync {
1291 return Ok(false);
1292 }
1293
1294 {
1296 let mut orderbooks = self.orderbooks.lock().await;
1297 if let Some(ob) = orderbooks.get_mut(symbol) {
1298 ob.reset_for_resync();
1299 ob.mark_resync_initiated(current_time);
1300 }
1301 }
1302
1303 tokio::time::sleep(Duration::from_millis(500)).await;
1305
1306 self.fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1308 .await?;
1309
1310 Ok(true)
1311 }
1312
1313 async fn watch_orderbooks_internal(
1315 &self,
1316 exchange: &Binance,
1317 symbols: Vec<String>,
1318 limit: Option<i64>,
1319 update_speed: UpdateSpeed,
1320 is_futures: bool,
1321 ) -> Result<HashMap<String, OrderBook>> {
1322 if symbols.len() > 200 {
1323 return Err(Error::invalid_request(
1324 "Binance supports max 200 symbols per connection",
1325 ));
1326 }
1327
1328 let capacity = self.channel_capacity_for(&SubscriptionType::OrderBook);
1329 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1330 let mut new_streams = Vec::new();
1331
1332 for symbol in &symbols {
1333 let stream = match update_speed {
1334 UpdateSpeed::Ms100 => format!("{}@depth@100ms", symbol.to_lowercase()),
1335 UpdateSpeed::Ms1000 => format!("{}@depth", symbol.to_lowercase()),
1336 };
1337
1338 let is_new = self
1339 .subscription_manager
1340 .add_subscription(
1341 stream.clone(),
1342 symbol.clone(),
1343 SubscriptionType::OrderBook,
1344 tx.clone(),
1345 )
1346 .await?;
1347
1348 if is_new {
1349 new_streams.push(stream);
1350 }
1351 }
1352
1353 if !new_streams.is_empty() {
1354 self.message_router.subscribe(new_streams).await?;
1355 }
1356
1357 tokio::time::sleep(Duration::from_millis(500)).await;
1358
1359 for symbol in &symbols {
1360 let _ = self
1361 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1362 .await;
1363 }
1364
1365 let mut result = HashMap::new();
1366 let mut synced_symbols = std::collections::HashSet::new();
1367
1368 while synced_symbols.len() < symbols.len() {
1369 if let Some(message) = rx.recv().await {
1370 if message.get("result").is_some() {
1371 continue;
1372 }
1373
1374 if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1375 if event_type == "depthUpdate" {
1376 if let Some(msg_symbol) =
1377 message.get("s").and_then(serde_json::Value::as_str)
1378 {
1379 if let Err(e) = self
1380 .handle_orderbook_delta(msg_symbol, &message, is_futures)
1381 .await
1382 {
1383 tracing::error!("Failed to handle orderbook delta: {}", e);
1384 continue;
1385 }
1386
1387 let orderbooks = self.orderbooks.lock().await;
1388 if let Some(ob) = orderbooks.get(msg_symbol) {
1389 if ob.is_synced {
1390 synced_symbols.insert(msg_symbol.to_string());
1391 }
1392 }
1393 }
1394 }
1395 }
1396 } else {
1397 return Err(Error::network("Subscription channel closed"));
1398 }
1399 }
1400
1401 let orderbooks = self.orderbooks.lock().await;
1402 for symbol in &symbols {
1403 if let Some(ob) = orderbooks.get(symbol) {
1404 result.insert(symbol.clone(), ob.clone());
1405 }
1406 }
1407
1408 Ok(result)
1409 }
1410
1411 async fn watch_bids_asks_internal(&self, symbol: &str, market_id: &str) -> Result<BidAsk> {
1413 let normalized = normalize_symbol(market_id);
1414 let stream = format!("{}@bookTicker", normalized);
1415
1416 let bid_ask = self
1417 .watch_stream::<BidAsk, parsers::BidAskParser>(
1418 stream,
1419 symbol.to_string(),
1420 SubscriptionType::BookTicker,
1421 None,
1422 )
1423 .await?;
1424
1425 let mut bids_asks_map = self.bids_asks.lock().await;
1427 bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
1428
1429 Ok(bid_ask)
1430 }
1431
1432 async fn watch_trades_internal(
1434 &self,
1435 symbol: &str,
1436 market_id: &str,
1437 since: Option<i64>,
1438 limit: Option<usize>,
1439 market: Option<&ccxt_core::types::Market>,
1440 ) -> Result<Vec<Trade>> {
1441 let stream = format!("{}@trade", market_id.to_lowercase());
1442 let capacity = self.channel_capacity_for(&SubscriptionType::Trades);
1443 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1444
1445 let is_new = self
1446 .subscription_manager
1447 .add_subscription(
1448 stream.clone(),
1449 symbol.to_string(),
1450 SubscriptionType::Trades,
1451 tx,
1452 )
1453 .await?;
1454
1455 if is_new {
1456 self.message_router.subscribe(vec![stream.clone()]).await?;
1457 }
1458
1459 loop {
1463 if let Some(message) = rx.recv().await {
1464 if message.get("result").is_some() {
1465 continue;
1466 }
1467
1468 if let Ok(trade) = parser::parse_ws_trade(&message, market) {
1469 let mut trades_map = self.trades.lock().await;
1470 let trades = trades_map
1471 .entry(symbol.to_string())
1472 .or_insert_with(VecDeque::new);
1473
1474 if trades.len() >= MAX_TRADES {
1475 trades.pop_front();
1476 }
1477 trades.push_back(trade);
1478
1479 let mut result: Vec<Trade> = trades.iter().cloned().collect();
1480
1481 if let Some(since_ts) = since {
1482 result.retain(|t| t.timestamp >= since_ts);
1483 }
1484
1485 if let Some(limit_size) = limit {
1486 if result.len() > limit_size {
1487 result = result.split_off(result.len() - limit_size);
1488 }
1489 }
1490
1491 return Ok(result);
1492 }
1493 } else {
1494 return Err(Error::network("Subscription channel closed"));
1495 }
1496 }
1497 }
1498
1499 async fn watch_ohlcv_internal(
1501 &self,
1502 symbol: &str,
1503 market_id: &str,
1504 timeframe: &str,
1505 since: Option<i64>,
1506 limit: Option<usize>,
1507 ) -> Result<Vec<OHLCV>> {
1508 let stream = format!("{}@kline_{}", market_id.to_lowercase(), timeframe);
1509 let sub_type = SubscriptionType::Kline(timeframe.to_string());
1510 let capacity = self.channel_capacity_for(&sub_type);
1511 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1512
1513 let is_new = self
1514 .subscription_manager
1515 .add_subscription(stream.clone(), symbol.to_string(), sub_type, tx)
1516 .await?;
1517
1518 if is_new {
1519 self.message_router.subscribe(vec![stream.clone()]).await?;
1520 }
1521
1522 loop {
1523 if let Some(message) = rx.recv().await {
1524 if message.get("result").is_some() {
1525 continue;
1526 }
1527
1528 if let Ok(ohlcv) = parser::parse_ws_ohlcv(&message) {
1529 let cache_key = format!("{}:{}", symbol, timeframe);
1530 let mut ohlcvs_map = self.ohlcvs.lock().await;
1531 let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
1532
1533 if ohlcvs.len() >= MAX_OHLCVS {
1534 ohlcvs.pop_front();
1535 }
1536 ohlcvs.push_back(ohlcv);
1537
1538 let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
1539
1540 if let Some(since_ts) = since {
1541 result.retain(|o| o.timestamp >= since_ts);
1542 }
1543
1544 if let Some(limit_size) = limit {
1545 if result.len() > limit_size {
1546 result = result.split_off(result.len() - limit_size);
1547 }
1548 }
1549
1550 return Ok(result);
1551 }
1552 } else {
1553 return Err(Error::network("Subscription channel closed"));
1554 }
1555 }
1556 }
1557
1558 pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1560 let tickers = self.tickers.lock().await;
1561 tickers.get(symbol).cloned()
1562 }
1563
1564 pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1566 let tickers = self.tickers.lock().await;
1567 tickers.clone()
1568 }
1569
1570 async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1572 user_data::handle_balance_message(message, account_type, &self.balances).await
1573 }
1574
1575 async fn watch_balance_internal(&self, account_type: &str) -> Result<Balance> {
1577 self.connect_user_stream().await?;
1578
1579 let capacity = self.channel_capacity_for(&SubscriptionType::Balance);
1580 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1581
1582 self.subscription_manager
1583 .add_subscription(
1584 "!userData".to_string(),
1585 "user".to_string(),
1586 SubscriptionType::Balance,
1587 tx,
1588 )
1589 .await?;
1590
1591 loop {
1593 if let Some(message) = rx.recv().await {
1594 if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
1595 if matches!(
1596 event_type,
1597 "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE"
1598 ) {
1599 if let Ok(()) = self.handle_balance_message(&message, account_type).await {
1600 let balances = self.balances.read().await;
1601 if let Some(balance) = balances.get(account_type) {
1602 return Ok(balance.clone());
1603 }
1604 }
1605 }
1606 }
1607 } else {
1608 return Err(Error::network("Subscription channel closed"));
1609 }
1610 }
1611 }
1612
1613 async fn watch_orders_internal(
1615 &self,
1616 symbol: Option<&str>,
1617 since: Option<i64>,
1618 limit: Option<usize>,
1619 ) -> Result<Vec<Order>> {
1620 self.connect_user_stream().await?;
1621
1622 let capacity = self.channel_capacity_for(&SubscriptionType::Orders);
1623 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1624
1625 self.subscription_manager
1626 .add_subscription(
1627 "!userData".to_string(),
1628 "user".to_string(),
1629 SubscriptionType::Orders,
1630 tx,
1631 )
1632 .await?;
1633
1634 loop {
1635 if let Some(message) = rx.recv().await {
1636 if let Value::Object(data) = message {
1637 if let Some(event_type) = data.get("e").and_then(serde_json::Value::as_str) {
1638 if event_type == "executionReport" {
1639 let order = user_data::parse_ws_order(&data);
1640
1641 let mut orders = self.orders.write().await;
1642 let symbol_orders = orders
1643 .entry(order.symbol.clone())
1644 .or_insert_with(HashMap::new);
1645 symbol_orders.insert(order.id.clone(), order.clone());
1646 drop(orders);
1647
1648 if let Some(exec_type) =
1649 data.get("x").and_then(serde_json::Value::as_str)
1650 {
1651 if exec_type == "TRADE" {
1652 if let Ok(trade) =
1653 BinanceWs::parse_ws_trade(&Value::Object(data.clone()))
1654 {
1655 let mut trades = self.my_trades.write().await;
1656 let symbol_trades = trades
1657 .entry(trade.symbol.clone())
1658 .or_insert_with(VecDeque::new);
1659
1660 symbol_trades.push_front(trade);
1661 if symbol_trades.len() > 1000 {
1662 symbol_trades.pop_back();
1663 }
1664 }
1665 }
1666 }
1667
1668 return self.filter_orders(symbol, since, limit).await;
1669 }
1670 }
1671 }
1672 } else {
1673 return Err(Error::network("Subscription channel closed"));
1674 }
1675 }
1676 }
1677
1678 async fn watch_my_trades_internal(
1680 &self,
1681 symbol: Option<&str>,
1682 since: Option<i64>,
1683 limit: Option<usize>,
1684 ) -> Result<Vec<Trade>> {
1685 self.connect_user_stream().await?;
1686
1687 let capacity = self.channel_capacity_for(&SubscriptionType::MyTrades);
1688 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1689
1690 self.subscription_manager
1691 .add_subscription(
1692 "!userData".to_string(),
1693 "user".to_string(),
1694 SubscriptionType::MyTrades,
1695 tx,
1696 )
1697 .await?;
1698
1699 loop {
1700 if let Some(msg) = rx.recv().await {
1701 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1702 if event_type == "executionReport" {
1703 if let Ok(trade) = BinanceWs::parse_ws_trade(&msg) {
1704 let symbol_key = trade.symbol.clone();
1705
1706 let mut trades_map = self.my_trades.write().await;
1707 let symbol_trades =
1708 trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
1709
1710 symbol_trades.push_front(trade);
1711 if symbol_trades.len() > 1000 {
1712 symbol_trades.pop_back();
1713 }
1714
1715 drop(trades_map);
1716 return self.filter_my_trades(symbol, since, limit).await;
1717 }
1718 }
1719 }
1720 } else {
1721 return Err(Error::network("Subscription channel closed"));
1722 }
1723 }
1724 }
1725
1726 async fn watch_positions_internal(
1728 &self,
1729 symbols: Option<Vec<String>>,
1730 since: Option<i64>,
1731 limit: Option<usize>,
1732 ) -> Result<Vec<Position>> {
1733 self.connect_user_stream().await?;
1734
1735 let capacity = self.channel_capacity_for(&SubscriptionType::Positions);
1736 let (tx, mut rx) = tokio::sync::mpsc::channel(capacity);
1737
1738 self.subscription_manager
1739 .add_subscription(
1740 "!userData".to_string(),
1741 "user".to_string(),
1742 SubscriptionType::Positions,
1743 tx,
1744 )
1745 .await?;
1746
1747 loop {
1748 if let Some(msg) = rx.recv().await {
1749 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1750 if event_type == "ACCOUNT_UPDATE" {
1751 if let Some(account_data) = msg.get("a") {
1752 if let Some(positions_array) =
1753 account_data.get("P").and_then(|p| p.as_array())
1754 {
1755 for position_data in positions_array {
1756 if let Ok(position) =
1757 BinanceWs::parse_ws_position(position_data)
1758 {
1759 let symbol_key = position.symbol.clone();
1760 let side_key = position
1761 .side
1762 .clone()
1763 .unwrap_or_else(|| "both".to_string());
1764
1765 let mut positions_map = self.positions.write().await;
1766 let symbol_positions = positions_map
1767 .entry(symbol_key)
1768 .or_insert_with(HashMap::new);
1769
1770 if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
1771 symbol_positions.remove(&side_key);
1772 if symbol_positions.is_empty() {
1773 positions_map.remove(&position.symbol);
1774 }
1775 } else {
1776 symbol_positions.insert(side_key, position);
1777 }
1778 }
1779 }
1780
1781 let symbols_ref = symbols.as_deref();
1782 return self.filter_positions(symbols_ref, since, limit).await;
1783 }
1784 }
1785 }
1786 }
1787 } else {
1788 return Err(Error::network("Subscription channel closed"));
1789 }
1790 }
1791 }
1792
1793 async fn filter_orders(
1795 &self,
1796 symbol: Option<&str>,
1797 since: Option<i64>,
1798 limit: Option<usize>,
1799 ) -> Result<Vec<Order>> {
1800 let orders_map = self.orders.read().await;
1801
1802 let mut orders: Vec<Order> = if let Some(sym) = symbol {
1803 orders_map
1804 .get(sym)
1805 .map(|symbol_orders| symbol_orders.values().cloned().collect())
1806 .unwrap_or_default()
1807 } else {
1808 orders_map
1809 .values()
1810 .flat_map(|symbol_orders| symbol_orders.values().cloned())
1811 .collect()
1812 };
1813
1814 if let Some(since_ts) = since {
1815 orders.retain(|order| order.timestamp.is_some_and(|ts| ts >= since_ts));
1816 }
1817
1818 orders.sort_by(|a, b| {
1819 let ts_a = a.timestamp.unwrap_or(0);
1820 let ts_b = b.timestamp.unwrap_or(0);
1821 ts_b.cmp(&ts_a)
1822 });
1823
1824 if let Some(lim) = limit {
1825 orders.truncate(lim);
1826 }
1827
1828 Ok(orders)
1829 }
1830
1831 fn parse_ws_trade(data: &Value) -> Result<Trade> {
1833 user_data::parse_ws_trade(data)
1834 }
1835
1836 async fn filter_my_trades(
1838 &self,
1839 symbol: Option<&str>,
1840 since: Option<i64>,
1841 limit: Option<usize>,
1842 ) -> Result<Vec<Trade>> {
1843 let trades_map = self.my_trades.read().await;
1844
1845 let mut trades: Vec<Trade> = if let Some(sym) = symbol {
1846 trades_map
1847 .get(sym)
1848 .map(|symbol_trades| symbol_trades.iter().cloned().collect())
1849 .unwrap_or_default()
1850 } else {
1851 trades_map
1852 .values()
1853 .flat_map(|symbol_trades| symbol_trades.iter().cloned())
1854 .collect()
1855 };
1856
1857 if let Some(since_ts) = since {
1858 trades.retain(|trade| trade.timestamp >= since_ts);
1859 }
1860
1861 trades.sort_by(|a, b| {
1862 let ts_a = a.timestamp;
1863 let ts_b = b.timestamp;
1864 ts_b.cmp(&ts_a)
1865 });
1866
1867 if let Some(lim) = limit {
1868 trades.truncate(lim);
1869 }
1870
1871 Ok(trades)
1872 }
1873
1874 fn parse_ws_position(data: &Value) -> Result<Position> {
1876 user_data::parse_ws_position(data)
1877 }
1878
1879 async fn filter_positions(
1881 &self,
1882 symbols: Option<&[String]>,
1883 since: Option<i64>,
1884 limit: Option<usize>,
1885 ) -> Result<Vec<Position>> {
1886 let positions_map = self.positions.read().await;
1887
1888 let mut positions: Vec<Position> = if let Some(syms) = symbols {
1889 syms.iter()
1890 .filter_map(|sym| positions_map.get(sym))
1891 .flat_map(|side_map| side_map.values().cloned())
1892 .collect()
1893 } else {
1894 positions_map
1895 .values()
1896 .flat_map(|side_map| side_map.values().cloned())
1897 .collect()
1898 };
1899
1900 if let Some(since_ts) = since {
1901 positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
1902 }
1903
1904 positions.sort_by(|a, b| {
1905 let ts_a = a.timestamp.unwrap_or(0);
1906 let ts_b = b.timestamp.unwrap_or(0);
1907 ts_b.cmp(&ts_a)
1908 });
1909
1910 if let Some(lim) = limit {
1911 positions.truncate(lim);
1912 }
1913
1914 Ok(positions)
1915 }
1916}
1917
1918include!("binance_impl.rs");
1920
#[cfg(test)]
#[allow(clippy::disallowed_methods)]
mod tests {
    use super::*;
    use streams::WS_BASE_URL;
    use types::{
        DEFAULT_ORDERBOOK_CAPACITY, DEFAULT_TICKER_CAPACITY, DEFAULT_TRADES_CAPACITY,
        DEFAULT_USER_DATA_CAPACITY,
    };

    /// Endpoint URL handed to the client constructors in these tests.
    const TEST_WS_URL: &str = "wss://stream.binance.com:9443/ws";

    /// Stream name used by the subscription manager test.
    const TICKER_STREAM: &str = "btcusdt@ticker";

    /// A freshly built client exposes a readable listen-key slot.
    #[tokio::test]
    async fn test_binance_ws_creation() {
        let client = BinanceWs::new(WS_BASE_URL.to_string());
        assert!(client.listen_key.try_read().is_ok());
    }

    /// Stream names follow the `<symbol>@<channel>` layout.
    #[test]
    fn test_stream_format() {
        let symbol = "btcusdt";

        assert_eq!(format!("{}@ticker", symbol), "btcusdt@ticker");
        assert_eq!(format!("{}@trade", symbol), "btcusdt@trade");
        assert_eq!(format!("{}@depth20", symbol), "btcusdt@depth20");
        assert_eq!(format!("{}@kline_1m", symbol), "btcusdt@kline_1m");
    }

    /// Adding, looking up and removing a subscription keeps the manager's
    /// count and lookup results consistent.
    #[tokio::test]
    async fn test_subscription_manager_basic() {
        let manager = SubscriptionManager::new();
        let (sender, _receiver) = tokio::sync::mpsc::channel(1024);

        // Starts out empty.
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_subscription(TICKER_STREAM).await);

        manager
            .add_subscription(
                TICKER_STREAM.to_string(),
                "BTCUSDT".to_string(),
                SubscriptionType::Ticker,
                sender.clone(),
            )
            .await
            .unwrap();

        // The subscription is now visible.
        assert_eq!(manager.active_count(), 1);
        assert!(manager.has_subscription(TICKER_STREAM).await);

        let lookup = manager.get_subscription(TICKER_STREAM).await;
        assert!(lookup.is_some());
        let subscription = lookup.unwrap();
        assert_eq!(subscription.stream, "btcusdt@ticker");
        assert_eq!(subscription.symbol, "BTCUSDT");
        assert_eq!(subscription.sub_type, SubscriptionType::Ticker);

        // Removing it returns the manager to the empty state.
        manager.remove_subscription(TICKER_STREAM).await.unwrap();
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_subscription(TICKER_STREAM).await);
    }

    /// A unified symbol maps to a lower-case market id without the slash.
    #[test]
    fn test_symbol_conversion() {
        let unified = "BTC/USDT";
        let market_id = unified.replace('/', "").to_lowercase();
        assert_eq!(market_id, "btcusdt");
    }

    /// The default health snapshot has zeroed counters and no timing data.
    #[test]
    fn test_ws_health_snapshot_default() {
        let snapshot = WsHealthSnapshot::default();

        assert_eq!(snapshot.messages_received, 0);
        assert_eq!(snapshot.messages_dropped, 0);
        assert!(snapshot.latency_ms.is_none());
        assert!(snapshot.last_message_time.is_none());
        assert_eq!(snapshot.connection_uptime_ms, 0);
        assert_eq!(snapshot.reconnect_count, 0);
    }

    /// `is_healthy` holds for a default snapshot and for 5 drops out of 100
    /// messages, but not for 20 drops out of 100.
    #[test]
    fn test_ws_health_snapshot_is_healthy() {
        let mut snapshot = WsHealthSnapshot::default();
        assert!(snapshot.is_healthy());

        snapshot.messages_received = 100;

        snapshot.messages_dropped = 20;
        assert!(!snapshot.is_healthy());

        snapshot.messages_dropped = 5;
        assert!(snapshot.is_healthy());
    }

    /// `shutdown` flips the shutting-down flag.
    #[tokio::test]
    async fn test_shutdown_sets_flags() {
        let client = BinanceWs::new(TEST_WS_URL.to_string());
        assert!(!client.is_shutting_down());

        let _ = client.shutdown().await;

        assert!(client.is_shutting_down());
    }

    /// Calling `shutdown` twice succeeds both times.
    #[tokio::test]
    async fn test_shutdown_is_idempotent() {
        let client = BinanceWs::new(TEST_WS_URL.to_string());

        let first = client.shutdown().await;
        let second = client.shutdown().await;

        assert!(first.is_ok());
        assert!(second.is_ok());
    }

    /// Without custom configuration the per-type capacities equal the defaults.
    #[tokio::test]
    async fn test_channel_capacity_configuration() {
        let client = BinanceWs::new(TEST_WS_URL.to_string());

        assert_eq!(
            client.channel_capacity_for(&SubscriptionType::Ticker),
            DEFAULT_TICKER_CAPACITY
        );
        assert_eq!(
            client.channel_capacity_for(&SubscriptionType::OrderBook),
            DEFAULT_ORDERBOOK_CAPACITY
        );
        assert_eq!(
            client.channel_capacity_for(&SubscriptionType::Trades),
            DEFAULT_TRADES_CAPACITY
        );
        assert_eq!(
            client.channel_capacity_for(&SubscriptionType::Balance),
            DEFAULT_USER_DATA_CAPACITY
        );
    }

    /// A custom channel configuration is honoured for every subscription
    /// type; all user-data types share `user_data_capacity`.
    #[tokio::test]
    async fn test_custom_channel_capacity_configuration() {
        use ccxt_core::ws_client::BackpressureStrategy;

        let channels = WsChannelConfig {
            ticker_capacity: 128,
            orderbook_capacity: 256,
            trades_capacity: 512,
            user_data_capacity: 64,
        };

        let config = BinanceWsConfig::new(TEST_WS_URL.to_string())
            .with_channel_config(channels)
            .with_backpressure(BackpressureStrategy::DropOldest);

        let client = BinanceWs::new_with_config(config);

        assert_eq!(client.channel_capacity_for(&SubscriptionType::Ticker), 128);
        assert_eq!(
            client.channel_capacity_for(&SubscriptionType::OrderBook),
            256
        );
        assert_eq!(client.channel_capacity_for(&SubscriptionType::Trades), 512);
        assert_eq!(client.channel_capacity_for(&SubscriptionType::Balance), 64);
        assert_eq!(client.channel_capacity_for(&SubscriptionType::Orders), 64);
        assert_eq!(client.channel_capacity_for(&SubscriptionType::MyTrades), 64);
        assert_eq!(
            client.channel_capacity_for(&SubscriptionType::Positions),
            64
        );
    }
}