1pub mod connection_manager;
7mod handlers;
8mod listen_key;
9mod streams;
10mod subscriptions;
11pub(crate) mod user_data;
12
13pub use connection_manager::BinanceConnectionManager;
15pub use handlers::MessageRouter;
16pub use listen_key::ListenKeyManager;
17pub use streams::*;
18pub use subscriptions::{ReconnectConfig, Subscription, SubscriptionManager, SubscriptionType};
19
20use crate::binance::{Binance, parser};
21use ccxt_core::error::{Error, Result};
22use ccxt_core::types::{
23 Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
24};
25use serde_json::Value;
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28use std::time::Duration;
29use tokio::sync::{Mutex, RwLock};
30
/// Upper bound on the number of public trades cached per symbol; once the
/// cache is full the oldest entry is evicted before a new one is appended.
const MAX_TRADES: usize = 1000;
/// Upper bound on the number of candles cached per `symbol:timeframe` key; once
/// the cache is full the oldest entry is evicted before a new one is appended.
const MAX_OHLCVS: usize = 1000;
33
/// WebSocket client for Binance streams.
///
/// Bundles the message router, the subscription registry, the optional
/// listen-key manager for user-data streams, and in-memory caches that the
/// `watch_*` methods fill as messages arrive.
pub struct BinanceWs {
    /// Router used to start/stop the connection and to send
    /// subscribe/unsubscribe requests.
    pub(crate) message_router: Arc<MessageRouter>,
    /// Registry of stream subscriptions and the channels they deliver to.
    pub(crate) subscription_manager: Arc<SubscriptionManager>,
    /// Listen key recorded by `connect_user_stream`; reset to `None` by
    /// `close_user_stream`.
    listen_key: Arc<RwLock<Option<String>>>,
    /// Listen-key manager; `None` when built with `new`, `Some` when built
    /// with `new_with_auth`.
    listen_key_manager: Option<Arc<ListenKeyManager>>,
    /// Most recent ticker, keyed by the parsed ticker's symbol.
    pub(crate) tickers: Arc<Mutex<HashMap<String, Ticker>>>,
    /// Most recent best bid/ask, keyed by symbol.
    pub(crate) bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
    /// Most recent mark price, keyed by the parsed mark price's symbol.
    /// Only written (never read) by the methods visible in this file section.
    #[allow(dead_code)]
    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
    /// Order books keyed by symbol.
    pub(crate) orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
    /// Recent public trades per symbol, oldest first.
    pub(crate) trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
    /// Recent candles keyed by `"{symbol}:{timeframe}"`, oldest first.
    pub(crate) ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
    /// Balances keyed by account type.
    pub(crate) balances: Arc<RwLock<HashMap<String, Balance>>>,
    /// Orders keyed by symbol, then by order id.
    pub(crate) orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
    /// The account's own trades per symbol, newest first.
    pub(crate) my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
    /// Positions keyed by symbol, then by side (`"both"` when the side is absent).
    pub(crate) positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
}
52
53impl std::fmt::Debug for BinanceWs {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("BinanceWs")
56 .field("is_connected", &self.message_router.is_connected())
57 .finish_non_exhaustive()
58 }
59}
60
61impl BinanceWs {
62 pub fn new(url: String) -> Self {
64 let subscription_manager = Arc::new(SubscriptionManager::new());
65 let message_router = Arc::new(MessageRouter::new(url, subscription_manager.clone(), None));
66
67 let router_clone = message_router.clone();
69 tokio::spawn(async move {
70 if let Err(e) = router_clone.start(None).await {
71 tracing::error!("Failed to start MessageRouter: {}", e);
72 }
73 });
74
75 Self {
76 message_router,
77 subscription_manager,
78 listen_key: Arc::new(RwLock::new(None)),
79 listen_key_manager: None,
80 tickers: Arc::new(Mutex::new(HashMap::new())),
81 bids_asks: Arc::new(Mutex::new(HashMap::new())),
82 mark_prices: Arc::new(Mutex::new(HashMap::new())),
83 orderbooks: Arc::new(Mutex::new(HashMap::new())),
84 trades: Arc::new(Mutex::new(HashMap::new())),
85 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
86 balances: Arc::new(RwLock::new(HashMap::new())),
87 orders: Arc::new(RwLock::new(HashMap::new())),
88 my_trades: Arc::new(RwLock::new(HashMap::new())),
89 positions: Arc::new(RwLock::new(HashMap::new())),
90 }
91 }
92
93 pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
95 let subscription_manager = Arc::new(SubscriptionManager::new());
96 let listen_key_manager = Arc::new(ListenKeyManager::new(binance));
97 let message_router = Arc::new(MessageRouter::new(
98 url,
99 subscription_manager.clone(),
100 Some(listen_key_manager.clone()),
101 ));
102
103 let router_clone = message_router.clone();
105 tokio::spawn(async move {
106 if let Err(e) = router_clone.start(None).await {
107 tracing::error!("Failed to start MessageRouter: {}", e);
108 }
109 });
110
111 Self {
112 message_router,
113 subscription_manager,
114 listen_key: Arc::new(RwLock::new(None)),
115 listen_key_manager: Some(listen_key_manager),
116 tickers: Arc::new(Mutex::new(HashMap::new())),
117 bids_asks: Arc::new(Mutex::new(HashMap::new())),
118 mark_prices: Arc::new(Mutex::new(HashMap::new())),
119 orderbooks: Arc::new(Mutex::new(HashMap::new())),
120 trades: Arc::new(Mutex::new(HashMap::new())),
121 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
122 balances: Arc::new(RwLock::new(HashMap::new())),
123 orders: Arc::new(RwLock::new(HashMap::new())),
124 my_trades: Arc::new(RwLock::new(HashMap::new())),
125 positions: Arc::new(RwLock::new(HashMap::new())),
126 }
127 }
128
129 pub async fn connect(&self) -> Result<()> {
131 if self.is_connected() {
132 return Ok(());
133 }
134
135 self.message_router.start(None).await?;
136
137 Ok(())
141 }
142
143 pub async fn disconnect(&self) -> Result<()> {
145 self.message_router.stop().await?;
146
147 if let Some(manager) = &self.listen_key_manager {
148 manager.stop_auto_refresh().await;
149 }
150
151 Ok(())
152 }
153
154 pub async fn connect_user_stream(&self) -> Result<()> {
156 let manager = self.listen_key_manager.as_ref()
157 .ok_or_else(|| Error::invalid_request(
158 "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
159 ))?;
160
161 let listen_key = manager.get_or_create().await?;
162
163 let base_url = self.message_router.get_url();
164 let base_url = if let Some(stripped) = base_url.strip_suffix('/') {
165 stripped
166 } else {
167 &base_url
168 };
169
170 let url = format!("{}/{}", base_url, listen_key);
171
172 self.message_router.start(Some(url)).await?;
173 manager.start_auto_refresh().await;
174 *self.listen_key.write().await = Some(listen_key);
175
176 Ok(())
177 }
178
179 pub async fn close_user_stream(&self) -> Result<()> {
181 if let Some(manager) = &self.listen_key_manager {
182 manager.delete().await?;
183 }
184 *self.listen_key.write().await = None;
185 Ok(())
186 }
187
188 pub async fn get_listen_key(&self) -> Option<String> {
190 if let Some(manager) = &self.listen_key_manager {
191 manager.get_current().await
192 } else {
193 self.listen_key.read().await.clone()
194 }
195 }
196
197 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
199 let stream = format!("{}@ticker", symbol.to_lowercase());
200 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
201
202 self.subscription_manager
203 .add_subscription(
204 stream.clone(),
205 symbol.to_string(),
206 SubscriptionType::Ticker,
207 tx,
208 )
209 .await?;
210
211 self.message_router.subscribe(vec![stream]).await
212 }
213
214 pub async fn subscribe_all_tickers(&self) -> Result<()> {
216 let stream = "!ticker@arr".to_string();
217 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
218
219 self.subscription_manager
220 .add_subscription(
221 stream.clone(),
222 "all".to_string(),
223 SubscriptionType::Ticker,
224 tx,
225 )
226 .await?;
227
228 self.message_router.subscribe(vec![stream]).await
229 }
230
231 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
233 let stream = format!("{}@trade", symbol.to_lowercase());
234 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
235
236 self.subscription_manager
237 .add_subscription(
238 stream.clone(),
239 symbol.to_string(),
240 SubscriptionType::Trades,
241 tx,
242 )
243 .await?;
244
245 self.message_router.subscribe(vec![stream]).await
246 }
247
248 pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
250 let stream = format!("{}@aggTrade", symbol.to_lowercase());
251 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
252
253 self.subscription_manager
254 .add_subscription(
255 stream.clone(),
256 symbol.to_string(),
257 SubscriptionType::Trades,
258 tx,
259 )
260 .await?;
261
262 self.message_router.subscribe(vec![stream]).await
263 }
264
265 pub async fn subscribe_orderbook(
267 &self,
268 symbol: &str,
269 levels: u32,
270 update_speed: &str,
271 ) -> Result<()> {
272 let stream = if update_speed == "100ms" {
273 format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
274 } else {
275 format!("{}@depth{}", symbol.to_lowercase(), levels)
276 };
277 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
278
279 self.subscription_manager
280 .add_subscription(
281 stream.clone(),
282 symbol.to_string(),
283 SubscriptionType::OrderBook,
284 tx,
285 )
286 .await?;
287
288 self.message_router.subscribe(vec![stream]).await
289 }
290
291 pub async fn subscribe_orderbook_diff(
293 &self,
294 symbol: &str,
295 update_speed: Option<&str>,
296 ) -> Result<()> {
297 let stream = if let Some(speed) = update_speed {
298 if speed == "100ms" {
299 format!("{}@depth@100ms", symbol.to_lowercase())
300 } else {
301 format!("{}@depth", symbol.to_lowercase())
302 }
303 } else {
304 format!("{}@depth", symbol.to_lowercase())
305 };
306 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
307
308 self.subscription_manager
309 .add_subscription(
310 stream.clone(),
311 symbol.to_string(),
312 SubscriptionType::OrderBook,
313 tx,
314 )
315 .await?;
316
317 self.message_router.subscribe(vec![stream]).await
318 }
319
320 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
322 let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
323 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
324
325 self.subscription_manager
326 .add_subscription(
327 stream.clone(),
328 symbol.to_string(),
329 SubscriptionType::Kline(interval.to_string()),
330 tx,
331 )
332 .await?;
333
334 self.message_router.subscribe(vec![stream]).await
335 }
336
337 pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
339 let stream = format!("{}@miniTicker", symbol.to_lowercase());
340 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
341
342 self.subscription_manager
343 .add_subscription(
344 stream.clone(),
345 symbol.to_string(),
346 SubscriptionType::Ticker,
347 tx,
348 )
349 .await?;
350
351 self.message_router.subscribe(vec![stream]).await
352 }
353
354 pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
356 let stream = "!miniTicker@arr".to_string();
357 let (tx, _rx) = tokio::sync::mpsc::channel(1024);
358
359 self.subscription_manager
360 .add_subscription(
361 stream.clone(),
362 "all".to_string(),
363 SubscriptionType::Ticker,
364 tx,
365 )
366 .await?;
367
368 self.message_router.subscribe(vec![stream]).await
369 }
370
371 pub async fn unsubscribe(&self, stream: String) -> Result<()> {
373 self.subscription_manager
374 .remove_subscription(&stream)
375 .await?;
376 self.message_router.unsubscribe(vec![stream]).await
377 }
378
/// Always returns `None`; no message is ever produced by this method.
// NOTE(review): looks like a placeholder kept for API compatibility — confirm.
pub fn receive(&self) -> Option<Value> {
    None
}
383
/// Reports whether the message router currently considers itself connected.
pub fn is_connected(&self) -> bool {
    self.message_router.is_connected()
}
388
389 pub fn state(&self) -> ccxt_core::ws_client::WsConnectionState {
391 if self.message_router.is_connected() {
392 ccxt_core::ws_client::WsConnectionState::Connected
393 } else {
394 ccxt_core::ws_client::WsConnectionState::Disconnected
395 }
396 }
397
398 pub fn subscriptions(&self) -> Vec<String> {
404 let subs = self.subscription_manager.get_all_subscriptions_sync();
405 subs.into_iter().map(|s| s.stream).collect()
406 }
407
/// Returns the subscription manager's count of active subscriptions.
pub fn subscription_count(&self) -> usize {
    self.subscription_manager.active_count()
}
412
413 async fn watch_mark_price_internal(
415 &self,
416 symbol: &str,
417 channel_name: &str,
418 ) -> Result<MarkPrice> {
419 let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
420 tracing::debug!(
421 "watch_mark_price_internal: stream={}, symbol={}",
422 stream,
423 symbol
424 );
425
426 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
427 self.subscription_manager
428 .add_subscription(
429 stream.clone(),
430 symbol.to_string(),
431 SubscriptionType::MarkPrice,
432 tx,
433 )
434 .await?;
435
436 self.message_router.subscribe(vec![stream.clone()]).await?;
437
438 loop {
439 if let Some(message) = rx.recv().await {
440 if message.get("result").is_some() {
441 tracing::debug!("Received subscription result for {}", stream);
442 continue;
443 }
444
445 tracing::debug!("Received mark price message for {}", stream);
446
447 match parser::parse_ws_mark_price(&message) {
448 Ok(mark_price) => {
449 let mut mark_prices = self.mark_prices.lock().await;
450 mark_prices.insert(mark_price.symbol.clone(), mark_price.clone());
451 return Ok(mark_price);
452 }
453 Err(e) => {
454 tracing::warn!(
455 "Failed to parse mark price message for stream {}: {:?}. Payload: {:?}",
456 stream,
457 e,
458 message
459 );
460 }
461 }
462 } else {
463 tracing::warn!("Subscription channel closed for {}", stream);
464 return Err(Error::network("Subscription channel closed"));
465 }
466 }
467 }
468
469 async fn watch_mark_prices_internal(
471 &self,
472 symbols: Option<Vec<String>>,
473 channel_name: &str,
474 ) -> Result<HashMap<String, MarkPrice>> {
475 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
476
477 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
478 let mut streams = Vec::with_capacity(syms.len());
479 for sym in syms {
480 let symbol = sym.to_lowercase();
481 let stream = format!("{}@{}", symbol, channel_name);
482 self.subscription_manager
483 .add_subscription(
484 stream.clone(),
485 symbol,
486 SubscriptionType::MarkPrice,
487 tx.clone(),
488 )
489 .await?;
490 streams.push(stream);
491 }
492 streams
493 } else {
494 let stream = format!("!{}@arr", channel_name);
495 self.subscription_manager
496 .add_subscription(
497 stream.clone(),
498 "all".to_string(),
499 SubscriptionType::MarkPrice,
500 tx.clone(),
501 )
502 .await?;
503 vec![stream]
504 };
505
506 self.message_router.subscribe(streams.clone()).await?;
507
508 let mut result = HashMap::new();
509
510 loop {
511 if let Some(message) = rx.recv().await {
512 if message.get("result").is_some() {
513 continue;
514 }
515
516 if let Some(arr) = message.as_array() {
517 for item in arr {
518 if let Ok(mark_price) = parser::parse_ws_mark_price(item) {
519 let symbol = mark_price.symbol.clone();
520
521 if let Some(syms) = &symbols {
522 if syms.contains(&symbol.to_lowercase()) {
523 result.insert(symbol.clone(), mark_price.clone());
524 }
525 } else {
526 result.insert(symbol.clone(), mark_price.clone());
527 }
528
529 let mut mark_prices = self.mark_prices.lock().await;
530 mark_prices.insert(symbol, mark_price);
531 } else {
532 tracing::warn!("Failed to parse item in mark price array: {:?}", item);
533 }
534 }
535
536 if let Some(syms) = &symbols {
537 if result.len() >= syms.len() {
538 return Ok(result);
539 }
540 } else {
541 return Ok(result);
543 }
544 } else {
545 match parser::parse_ws_mark_price(&message) {
546 Ok(mark_price) => {
547 let symbol = mark_price.symbol.clone();
548 result.insert(symbol.clone(), mark_price.clone());
549
550 let mut mark_prices = self.mark_prices.lock().await;
551 mark_prices.insert(symbol, mark_price);
552
553 if let Some(syms) = &symbols {
554 if result.len() >= syms.len() {
555 return Ok(result);
556 }
557 }
558 }
559 Err(e) => {
560 tracing::warn!(
561 "Failed to parse mark price message: {:?}. Payload: {:?}",
562 e,
563 message
564 );
565 }
566 }
567 }
568 } else {
569 return Err(Error::network("Subscription channel closed"));
570 }
571 }
572 }
573
574 async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
576 let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
577
578 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
579 self.subscription_manager
580 .add_subscription(
581 stream.clone(),
582 symbol.to_string(),
583 SubscriptionType::Ticker,
584 tx,
585 )
586 .await?;
587
588 self.message_router.subscribe(vec![stream.clone()]).await?;
589
590 loop {
591 if let Some(message) = rx.recv().await {
592 if message.get("result").is_some() {
593 continue;
594 }
595
596 match parser::parse_ws_ticker(&message, None) {
597 Ok(ticker) => {
598 let mut tickers = self.tickers.lock().await;
599 tickers.insert(ticker.symbol.clone(), ticker.clone());
600 return Ok(ticker);
601 }
602 Err(e) => {
603 tracing::warn!(
604 "Failed to parse ticker message for stream {}: {:?}. Payload: {:?}",
605 stream,
606 e,
607 message
608 );
609 }
611 }
612 } else {
613 return Err(Error::network("Subscription channel closed"));
614 }
615 }
616 }
617
618 async fn watch_tickers_internal(
620 &self,
621 symbols: Option<Vec<String>>,
622 channel_name: &str,
623 ) -> Result<HashMap<String, Ticker>> {
624 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
625 syms.iter()
626 .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
627 .collect()
628 } else {
629 vec![format!("!{}@arr", channel_name)]
630 };
631
632 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
633
634 for stream in &streams {
635 self.subscription_manager
636 .add_subscription(
637 stream.clone(),
638 "all".to_string(),
639 SubscriptionType::Ticker,
640 tx.clone(),
641 )
642 .await?;
643 }
644
645 self.message_router.subscribe(streams.clone()).await?;
646
647 let mut result = HashMap::new();
648
649 loop {
650 if let Some(message) = rx.recv().await {
651 if message.get("result").is_some() {
652 continue;
653 }
654
655 if let Some(arr) = message.as_array() {
656 for item in arr {
657 if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
658 let symbol = ticker.symbol.clone();
659
660 if let Some(syms) = &symbols {
661 if syms.contains(&symbol.to_lowercase()) {
662 result.insert(symbol.clone(), ticker.clone());
663 }
664 } else {
665 result.insert(symbol.clone(), ticker.clone());
666 }
667
668 let mut tickers = self.tickers.lock().await;
669 tickers.insert(symbol, ticker);
670 } else {
671 tracing::warn!("Failed to parse item in ticker array: {:?}", item);
672 }
673 }
674
675 if let Some(syms) = &symbols {
676 if result.len() >= syms.len() {
677 return Ok(result);
678 }
679 } else {
680 return Ok(result);
684 }
685 } else {
686 match parser::parse_ws_ticker(&message, None) {
687 Ok(ticker) => {
688 let symbol = ticker.symbol.clone();
689 result.insert(symbol.clone(), ticker.clone());
690
691 let mut tickers = self.tickers.lock().await;
692 tickers.insert(symbol, ticker);
693
694 if let Some(syms) = &symbols {
695 if result.len() >= syms.len() {
696 return Ok(result);
697 }
698 }
699 }
700 Err(e) => {
701 tracing::warn!(
702 "Failed to parse ticker message: {:?}. Payload: {:?}",
703 e,
704 message
705 );
706 }
707 }
708 }
709 } else {
710 return Err(Error::network("Subscription channel closed"));
711 }
712 }
713 }
714
/// Forwards a depth-update message for `symbol` to
/// `handlers::handle_orderbook_delta`, passing this client's order-book cache.
///
/// # Errors
/// Returns whatever error the handler reports.
async fn handle_orderbook_delta(
    &self,
    symbol: &str,
    delta_message: &Value,
    is_futures: bool,
) -> Result<()> {
    handlers::handle_orderbook_delta(symbol, delta_message, is_futures, &self.orderbooks).await
}
724
/// Forwards to `handlers::fetch_orderbook_snapshot`, passing this client's
/// order-book cache, and returns the resulting order book.
///
/// # Errors
/// Returns whatever error the handler reports.
async fn fetch_orderbook_snapshot(
    &self,
    exchange: &Binance,
    symbol: &str,
    limit: Option<i64>,
    is_futures: bool,
) -> Result<OrderBook> {
    handlers::fetch_orderbook_snapshot(exchange, symbol, limit, is_futures, &self.orderbooks)
        .await
}
736
/// Subscribes to the diff-depth stream for `symbol`, fetches a snapshot, and
/// returns the cached order book as soon as a delta leaves it marked synced.
///
/// The stream is `<symbol>@depth@100ms` when `update_speed == 100`, otherwise
/// `<symbol>@depth`. When a delta fails with an error whose text contains
/// `RESYNC_NEEDED`, the book is reset and a fresh snapshot is fetched, subject
/// to the book's own `should_resync` check.
///
/// # Errors
/// Propagates subscription and snapshot errors (including a failed resync
/// snapshot) and returns a network error when the delivery channel closes.
async fn watch_orderbook_internal(
    &self,
    exchange: &Binance,
    symbol: &str,
    limit: Option<i64>,
    update_speed: i32,
    is_futures: bool,
) -> Result<OrderBook> {
    let stream = if update_speed == 100 {
        format!("{}@depth@100ms", symbol.to_lowercase())
    } else {
        format!("{}@depth", symbol.to_lowercase())
    };

    let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
    self.subscription_manager
        .add_subscription(
            stream.clone(),
            symbol.to_string(),
            SubscriptionType::OrderBook,
            tx,
        )
        .await?;

    self.message_router.subscribe(vec![stream.clone()]).await?;

    // Pause between subscribing and requesting the snapshot.
    // NOTE(review): presumably so deltas are already flowing when the
    // snapshot is taken — confirm.
    tokio::time::sleep(Duration::from_millis(500)).await;

    // The returned snapshot is unused here; the cache handed to the helper is
    // what the loop below reads.
    let _snapshot = self
        .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
        .await?;

    loop {
        if let Some(message) = rx.recv().await {
            // Subscription acknowledgement, not market data.
            if message.get("result").is_some() {
                continue;
            }

            if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
                if event_type == "depthUpdate" {
                    match self
                        .handle_orderbook_delta(symbol, &message, is_futures)
                        .await
                    {
                        Ok(()) => {
                            // Return only once the cached book reports itself synced;
                            // otherwise keep consuming deltas.
                            let orderbooks = self.orderbooks.lock().await;
                            if let Some(ob) = orderbooks.get(symbol) {
                                if ob.is_synced {
                                    return Ok(ob.clone());
                                }
                            }
                        }
                        Err(e) => {
                            let err_msg = e.to_string();

                            // Resync requests are recognised by a marker in the error text.
                            if err_msg.contains("RESYNC_NEEDED") {
                                tracing::warn!("Resync needed for {}: {}", symbol, err_msg);

                                let current_time = chrono::Utc::now().timestamp_millis();
                                // A symbol with no cached book always resyncs.
                                let should_resync = {
                                    let orderbooks = self.orderbooks.lock().await;
                                    if let Some(ob) = orderbooks.get(symbol) {
                                        ob.should_resync(current_time)
                                    } else {
                                        true
                                    }
                                };

                                if should_resync {
                                    tracing::info!("Initiating resync for {}", symbol);

                                    // Scoped so the lock is released before the sleep
                                    // and the snapshot request below.
                                    {
                                        let mut orderbooks = self.orderbooks.lock().await;
                                        if let Some(ob) = orderbooks.get_mut(symbol) {
                                            ob.reset_for_resync();
                                            ob.mark_resync_initiated(current_time);
                                        }
                                    }

                                    tokio::time::sleep(Duration::from_millis(500)).await;

                                    match self
                                        .fetch_orderbook_snapshot(
                                            exchange, symbol, limit, is_futures,
                                        )
                                        .await
                                    {
                                        Ok(_) => {
                                            tracing::info!(
                                                "Resync completed successfully for {}",
                                                symbol
                                            );
                                            continue;
                                        }
                                        Err(resync_err) => {
                                            tracing::error!(
                                                "Resync failed for {}: {}",
                                                symbol,
                                                resync_err
                                            );
                                            return Err(resync_err);
                                        }
                                    }
                                }
                                tracing::debug!("Resync rate limited for {}, skipping", symbol);
                            }
                            // NOTE(review): this line is also reached after the
                            // "rate limited" debug log above, so a skipped resync is
                            // additionally reported at error level — confirm intended.
                            tracing::error!("Failed to handle orderbook delta: {}", err_msg);
                        }
                    }
                }
            }
        } else {
            return Err(Error::network("Subscription channel closed"));
        }
    }
}
854
855 async fn watch_orderbooks_internal(
857 &self,
858 exchange: &Binance,
859 symbols: Vec<String>,
860 limit: Option<i64>,
861 update_speed: i32,
862 is_futures: bool,
863 ) -> Result<HashMap<String, OrderBook>> {
864 if symbols.len() > 200 {
865 return Err(Error::invalid_request(
866 "Binance supports max 200 symbols per connection",
867 ));
868 }
869
870 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
871 let mut streams = Vec::new();
872
873 for symbol in &symbols {
874 let stream = if update_speed == 100 {
875 format!("{}@depth@100ms", symbol.to_lowercase())
876 } else {
877 format!("{}@depth", symbol.to_lowercase())
878 };
879
880 streams.push(stream.clone());
881
882 self.subscription_manager
883 .add_subscription(
884 stream,
885 symbol.clone(),
886 SubscriptionType::OrderBook,
887 tx.clone(),
888 )
889 .await?;
890 }
891
892 self.message_router.subscribe(streams).await?;
893
894 tokio::time::sleep(Duration::from_millis(500)).await;
895
896 for symbol in &symbols {
897 let _ = self
898 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
899 .await;
900 }
901
902 let mut result = HashMap::new();
903 let mut synced_symbols = std::collections::HashSet::new();
904
905 while synced_symbols.len() < symbols.len() {
906 if let Some(message) = rx.recv().await {
907 if message.get("result").is_some() {
908 continue;
909 }
910
911 if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
912 if event_type == "depthUpdate" {
913 if let Some(msg_symbol) =
914 message.get("s").and_then(serde_json::Value::as_str)
915 {
916 if let Err(e) = self
917 .handle_orderbook_delta(msg_symbol, &message, is_futures)
918 .await
919 {
920 tracing::error!("Failed to handle orderbook delta: {}", e);
921 continue;
922 }
923
924 let orderbooks = self.orderbooks.lock().await;
925 if let Some(ob) = orderbooks.get(msg_symbol) {
926 if ob.is_synced {
927 synced_symbols.insert(msg_symbol.to_string());
928 }
929 }
930 }
931 }
932 }
933 } else {
934 return Err(Error::network("Subscription channel closed"));
935 }
936 }
937
938 let orderbooks = self.orderbooks.lock().await;
939 for symbol in &symbols {
940 if let Some(ob) = orderbooks.get(symbol) {
941 result.insert(symbol.clone(), ob.clone());
942 }
943 }
944
945 Ok(result)
946 }
947
948 async fn watch_bids_asks_internal(&self, symbol: &str, market_id: &str) -> Result<BidAsk> {
950 let stream = format!("{}@bookTicker", market_id.to_lowercase());
951 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
952
953 self.subscription_manager
954 .add_subscription(
955 stream.clone(),
956 symbol.to_string(),
957 SubscriptionType::BookTicker,
958 tx,
959 )
960 .await?;
961
962 self.message_router.subscribe(vec![stream.clone()]).await?;
963
964 loop {
965 if let Some(message) = rx.recv().await {
966 if message.get("result").is_some() {
967 continue;
968 }
969
970 if let Ok(bid_ask) = parser::parse_ws_bid_ask(&message) {
971 let mut bids_asks_map = self.bids_asks.lock().await;
972 bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
973 return Ok(bid_ask);
974 }
975 } else {
976 return Err(Error::network("Subscription channel closed"));
977 }
978 }
979 }
980
981 async fn watch_trades_internal(
983 &self,
984 symbol: &str,
985 market_id: &str,
986 since: Option<i64>,
987 limit: Option<usize>,
988 market: Option<&ccxt_core::types::Market>,
989 ) -> Result<Vec<Trade>> {
990 let stream = format!("{}@trade", market_id.to_lowercase());
991 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
992
993 self.subscription_manager
994 .add_subscription(
995 stream.clone(),
996 symbol.to_string(),
997 SubscriptionType::Trades,
998 tx,
999 )
1000 .await?;
1001
1002 self.message_router.subscribe(vec![stream.clone()]).await?;
1003
1004 loop {
1008 if let Some(message) = rx.recv().await {
1009 if message.get("result").is_some() {
1010 continue;
1011 }
1012
1013 if let Ok(trade) = parser::parse_ws_trade(&message, market) {
1014 let mut trades_map = self.trades.lock().await;
1015 let trades = trades_map
1016 .entry(symbol.to_string())
1017 .or_insert_with(VecDeque::new);
1018
1019 if trades.len() >= MAX_TRADES {
1020 trades.pop_front();
1021 }
1022 trades.push_back(trade);
1023
1024 let mut result: Vec<Trade> = trades.iter().cloned().collect();
1025
1026 if let Some(since_ts) = since {
1027 result.retain(|t| t.timestamp >= since_ts);
1028 }
1029
1030 if let Some(limit_size) = limit {
1031 if result.len() > limit_size {
1032 result = result.split_off(result.len() - limit_size);
1033 }
1034 }
1035
1036 return Ok(result);
1037 }
1038 } else {
1039 return Err(Error::network("Subscription channel closed"));
1040 }
1041 }
1042 }
1043
1044 async fn watch_ohlcv_internal(
1046 &self,
1047 symbol: &str,
1048 market_id: &str,
1049 timeframe: &str,
1050 since: Option<i64>,
1051 limit: Option<usize>,
1052 ) -> Result<Vec<OHLCV>> {
1053 let stream = format!("{}@kline_{}", market_id.to_lowercase(), timeframe);
1054 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1055
1056 self.subscription_manager
1057 .add_subscription(
1058 stream.clone(),
1059 symbol.to_string(),
1060 SubscriptionType::Kline(timeframe.to_string()),
1061 tx,
1062 )
1063 .await?;
1064
1065 self.message_router.subscribe(vec![stream.clone()]).await?;
1066
1067 loop {
1068 if let Some(message) = rx.recv().await {
1069 if message.get("result").is_some() {
1070 continue;
1071 }
1072
1073 if let Ok(ohlcv) = parser::parse_ws_ohlcv(&message) {
1074 let cache_key = format!("{}:{}", symbol, timeframe);
1075 let mut ohlcvs_map = self.ohlcvs.lock().await;
1076 let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
1077
1078 if ohlcvs.len() >= MAX_OHLCVS {
1079 ohlcvs.pop_front();
1080 }
1081 ohlcvs.push_back(ohlcv);
1082
1083 let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
1084
1085 if let Some(since_ts) = since {
1086 result.retain(|o| o.timestamp >= since_ts);
1087 }
1088
1089 if let Some(limit_size) = limit {
1090 if result.len() > limit_size {
1091 result = result.split_off(result.len() - limit_size);
1092 }
1093 }
1094
1095 return Ok(result);
1096 }
1097 } else {
1098 return Err(Error::network("Subscription channel closed"));
1099 }
1100 }
1101 }
1102
1103 pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1105 let tickers = self.tickers.lock().await;
1106 tickers.get(symbol).cloned()
1107 }
1108
1109 pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1111 let tickers = self.tickers.lock().await;
1112 tickers.clone()
1113 }
1114
/// Forwards a balance-related message to `user_data::handle_balance_message`,
/// passing this client's balance cache.
///
/// # Errors
/// Returns whatever error the handler reports.
async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
    user_data::handle_balance_message(message, account_type, &self.balances).await
}
1119
1120 async fn watch_balance_internal(&self, account_type: &str) -> Result<Balance> {
1122 self.connect_user_stream().await?;
1123
1124 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1125
1126 self.subscription_manager
1127 .add_subscription(
1128 "!userData".to_string(),
1129 "user".to_string(),
1130 SubscriptionType::Balance,
1131 tx,
1132 )
1133 .await?;
1134
1135 loop {
1136 if let Some(message) = rx.recv().await {
1137 if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
1138 if matches!(
1139 event_type,
1140 "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE"
1141 ) {
1142 if let Ok(()) = self.handle_balance_message(&message, account_type).await {
1143 let balances = self.balances.read().await;
1144 if let Some(balance) = balances.get(account_type) {
1145 return Ok(balance.clone());
1146 }
1147 }
1148 }
1149 }
1150 } else {
1151 return Err(Error::network("Subscription channel closed"));
1152 }
1153 }
1154 }
1155
1156 async fn watch_orders_internal(
1158 &self,
1159 symbol: Option<&str>,
1160 since: Option<i64>,
1161 limit: Option<usize>,
1162 ) -> Result<Vec<Order>> {
1163 self.connect_user_stream().await?;
1164
1165 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1166
1167 self.subscription_manager
1168 .add_subscription(
1169 "!userData".to_string(),
1170 "user".to_string(),
1171 SubscriptionType::Orders,
1172 tx,
1173 )
1174 .await?;
1175
1176 loop {
1177 if let Some(message) = rx.recv().await {
1178 if let Value::Object(data) = message {
1179 if let Some(event_type) = data.get("e").and_then(serde_json::Value::as_str) {
1180 if event_type == "executionReport" {
1181 let order = user_data::parse_ws_order(&data);
1182
1183 let mut orders = self.orders.write().await;
1184 let symbol_orders = orders
1185 .entry(order.symbol.clone())
1186 .or_insert_with(HashMap::new);
1187 symbol_orders.insert(order.id.clone(), order.clone());
1188 drop(orders);
1189
1190 if let Some(exec_type) =
1191 data.get("x").and_then(serde_json::Value::as_str)
1192 {
1193 if exec_type == "TRADE" {
1194 if let Ok(trade) =
1195 BinanceWs::parse_ws_trade(&Value::Object(data.clone()))
1196 {
1197 let mut trades = self.my_trades.write().await;
1198 let symbol_trades = trades
1199 .entry(trade.symbol.clone())
1200 .or_insert_with(VecDeque::new);
1201
1202 symbol_trades.push_front(trade);
1203 if symbol_trades.len() > 1000 {
1204 symbol_trades.pop_back();
1205 }
1206 }
1207 }
1208 }
1209
1210 return self.filter_orders(symbol, since, limit).await;
1211 }
1212 }
1213 }
1214 } else {
1215 return Err(Error::network("Subscription channel closed"));
1216 }
1217 }
1218 }
1219
1220 async fn watch_my_trades_internal(
1222 &self,
1223 symbol: Option<&str>,
1224 since: Option<i64>,
1225 limit: Option<usize>,
1226 ) -> Result<Vec<Trade>> {
1227 self.connect_user_stream().await?;
1228
1229 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1230
1231 self.subscription_manager
1232 .add_subscription(
1233 "!userData".to_string(),
1234 "user".to_string(),
1235 SubscriptionType::MyTrades,
1236 tx,
1237 )
1238 .await?;
1239
1240 loop {
1241 if let Some(msg) = rx.recv().await {
1242 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1243 if event_type == "executionReport" {
1244 if let Ok(trade) = BinanceWs::parse_ws_trade(&msg) {
1245 let symbol_key = trade.symbol.clone();
1246
1247 let mut trades_map = self.my_trades.write().await;
1248 let symbol_trades =
1249 trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
1250
1251 symbol_trades.push_front(trade);
1252 if symbol_trades.len() > 1000 {
1253 symbol_trades.pop_back();
1254 }
1255
1256 drop(trades_map);
1257 return self.filter_my_trades(symbol, since, limit).await;
1258 }
1259 }
1260 }
1261 } else {
1262 return Err(Error::network("Subscription channel closed"));
1263 }
1264 }
1265 }
1266
1267 async fn watch_positions_internal(
1269 &self,
1270 symbols: Option<Vec<String>>,
1271 since: Option<i64>,
1272 limit: Option<usize>,
1273 ) -> Result<Vec<Position>> {
1274 self.connect_user_stream().await?;
1275
1276 let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
1277
1278 self.subscription_manager
1279 .add_subscription(
1280 "!userData".to_string(),
1281 "user".to_string(),
1282 SubscriptionType::Positions,
1283 tx,
1284 )
1285 .await?;
1286
1287 loop {
1288 if let Some(msg) = rx.recv().await {
1289 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
1290 if event_type == "ACCOUNT_UPDATE" {
1291 if let Some(account_data) = msg.get("a") {
1292 if let Some(positions_array) =
1293 account_data.get("P").and_then(|p| p.as_array())
1294 {
1295 for position_data in positions_array {
1296 if let Ok(position) =
1297 BinanceWs::parse_ws_position(position_data)
1298 {
1299 let symbol_key = position.symbol.clone();
1300 let side_key = position
1301 .side
1302 .clone()
1303 .unwrap_or_else(|| "both".to_string());
1304
1305 let mut positions_map = self.positions.write().await;
1306 let symbol_positions = positions_map
1307 .entry(symbol_key)
1308 .or_insert_with(HashMap::new);
1309
1310 if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
1311 symbol_positions.remove(&side_key);
1312 if symbol_positions.is_empty() {
1313 positions_map.remove(&position.symbol);
1314 }
1315 } else {
1316 symbol_positions.insert(side_key, position);
1317 }
1318 }
1319 }
1320
1321 let symbols_ref = symbols.as_deref();
1322 return self.filter_positions(symbols_ref, since, limit).await;
1323 }
1324 }
1325 }
1326 }
1327 } else {
1328 return Err(Error::network("Subscription channel closed"));
1329 }
1330 }
1331 }
1332
1333 async fn filter_orders(
1335 &self,
1336 symbol: Option<&str>,
1337 since: Option<i64>,
1338 limit: Option<usize>,
1339 ) -> Result<Vec<Order>> {
1340 let orders_map = self.orders.read().await;
1341
1342 let mut orders: Vec<Order> = if let Some(sym) = symbol {
1343 orders_map
1344 .get(sym)
1345 .map(|symbol_orders| symbol_orders.values().cloned().collect())
1346 .unwrap_or_default()
1347 } else {
1348 orders_map
1349 .values()
1350 .flat_map(|symbol_orders| symbol_orders.values().cloned())
1351 .collect()
1352 };
1353
1354 if let Some(since_ts) = since {
1355 orders.retain(|order| order.timestamp.is_some_and(|ts| ts >= since_ts));
1356 }
1357
1358 orders.sort_by(|a, b| {
1359 let ts_a = a.timestamp.unwrap_or(0);
1360 let ts_b = b.timestamp.unwrap_or(0);
1361 ts_b.cmp(&ts_a)
1362 });
1363
1364 if let Some(lim) = limit {
1365 orders.truncate(lim);
1366 }
1367
1368 Ok(orders)
1369 }
1370
/// Converts a JSON value into a [`Trade`].
///
/// Thin associated-function wrapper: `data` is forwarded unchanged to
/// `user_data::parse_ws_trade` and its result is returned as-is.
// NOTE(review): the expected payload shape and the failure conditions are
// defined by `user_data::parse_ws_trade`, which is not visible here.
fn parse_ws_trade(data: &Value) -> Result<Trade> {
    user_data::parse_ws_trade(data)
}
1375
1376 async fn filter_my_trades(
1378 &self,
1379 symbol: Option<&str>,
1380 since: Option<i64>,
1381 limit: Option<usize>,
1382 ) -> Result<Vec<Trade>> {
1383 let trades_map = self.my_trades.read().await;
1384
1385 let mut trades: Vec<Trade> = if let Some(sym) = symbol {
1386 trades_map
1387 .get(sym)
1388 .map(|symbol_trades| symbol_trades.iter().cloned().collect())
1389 .unwrap_or_default()
1390 } else {
1391 trades_map
1392 .values()
1393 .flat_map(|symbol_trades| symbol_trades.iter().cloned())
1394 .collect()
1395 };
1396
1397 if let Some(since_ts) = since {
1398 trades.retain(|trade| trade.timestamp >= since_ts);
1399 }
1400
1401 trades.sort_by(|a, b| {
1402 let ts_a = a.timestamp;
1403 let ts_b = b.timestamp;
1404 ts_b.cmp(&ts_a)
1405 });
1406
1407 if let Some(lim) = limit {
1408 trades.truncate(lim);
1409 }
1410
1411 Ok(trades)
1412 }
1413
/// Converts a JSON value into a [`Position`].
///
/// Thin associated-function wrapper: `data` is forwarded unchanged to
/// `user_data::parse_ws_position` and its result is returned as-is.
// NOTE(review): the expected payload shape and the failure conditions are
// defined by `user_data::parse_ws_position`, which is not visible here.
fn parse_ws_position(data: &Value) -> Result<Position> {
    user_data::parse_ws_position(data)
}
1418
1419 async fn filter_positions(
1421 &self,
1422 symbols: Option<&[String]>,
1423 since: Option<i64>,
1424 limit: Option<usize>,
1425 ) -> Result<Vec<Position>> {
1426 let positions_map = self.positions.read().await;
1427
1428 let mut positions: Vec<Position> = if let Some(syms) = symbols {
1429 syms.iter()
1430 .filter_map(|sym| positions_map.get(sym))
1431 .flat_map(|side_map| side_map.values().cloned())
1432 .collect()
1433 } else {
1434 positions_map
1435 .values()
1436 .flat_map(|side_map| side_map.values().cloned())
1437 .collect()
1438 };
1439
1440 if let Some(since_ts) = since {
1441 positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
1442 }
1443
1444 positions.sort_by(|a, b| {
1445 let ts_a = a.timestamp.unwrap_or(0);
1446 let ts_b = b.timestamp.unwrap_or(0);
1447 ts_b.cmp(&ts_a)
1448 });
1449
1450 if let Some(lim) = limit {
1451 positions.truncate(lim);
1452 }
1453
1454 Ok(positions)
1455 }
1456}
1457
1458include!("binance_impl.rs");
1460
#[cfg(test)]
#[allow(clippy::disallowed_methods)]
mod tests {
    use super::*;

    /// Stream name shared by the subscription-manager assertions.
    const TICKER_STREAM: &str = "btcusdt@ticker";

    /// A freshly constructed client exposes a listen-key lock that can be read
    /// without contention.
    #[tokio::test]
    async fn test_binance_ws_creation() {
        let client = BinanceWs::new(WS_BASE_URL.to_string());
        let read_attempt = client.listen_key.try_read();
        assert!(read_attempt.is_ok());
    }

    /// Stream names are built as `<symbol>@<channel>`.
    #[test]
    fn test_stream_format() {
        let symbol = "btcusdt";

        let cases = [
            ("ticker", "btcusdt@ticker"),
            ("trade", "btcusdt@trade"),
            ("depth20", "btcusdt@depth20"),
            ("kline_1m", "btcusdt@kline_1m"),
        ];

        for (channel, expected) in cases {
            assert_eq!(format!("{symbol}@{channel}"), expected);
        }
    }

    /// Adding, looking up, and removing a subscription is reflected by the
    /// manager's active count and lookup methods.
    #[tokio::test]
    async fn test_subscription_manager_basic() {
        let manager = SubscriptionManager::new();
        let (tx, _rx) = tokio::sync::mpsc::channel(1024);

        // Nothing is registered initially.
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_subscription(TICKER_STREAM).await);

        manager
            .add_subscription(
                TICKER_STREAM.to_string(),
                "BTCUSDT".to_string(),
                SubscriptionType::Ticker,
                tx.clone(),
            )
            .await
            .unwrap();

        // The new subscription is visible and counted.
        assert_eq!(manager.active_count(), 1);
        assert!(manager.has_subscription(TICKER_STREAM).await);

        let lookup = manager.get_subscription(TICKER_STREAM).await;
        assert!(lookup.is_some());
        let stored = lookup.unwrap();
        assert_eq!(stored.stream, "btcusdt@ticker");
        assert_eq!(stored.symbol, "BTCUSDT");
        assert_eq!(stored.sub_type, SubscriptionType::Ticker);

        // Removal returns the manager to its initial state.
        manager.remove_subscription(TICKER_STREAM).await.unwrap();
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_subscription(TICKER_STREAM).await);
    }

    /// A unified `BASE/QUOTE` symbol maps to a lowercase stream symbol with
    /// the slash removed.
    #[test]
    fn test_symbol_conversion() {
        let unified = "BTC/USDT";
        let stream_symbol = unified.replace('/', "").to_lowercase();
        assert_eq!(stream_symbol, "btcusdt");
    }
}