1mod handlers;
6mod listen_key;
7mod streams;
8mod subscriptions;
9mod user_data;
10
11pub use handlers::MessageRouter;
13pub use listen_key::ListenKeyManager;
14pub use streams::*;
15pub use subscriptions::{ReconnectConfig, Subscription, SubscriptionManager, SubscriptionType};
16
17use crate::binance::{Binance, parser};
18use ccxt_core::error::{Error, Result};
19use ccxt_core::types::{
20 Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
21};
22use ccxt_core::ws_client::{WsClient, WsConfig};
23use serde_json::Value;
24use std::collections::{HashMap, VecDeque};
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::{Mutex, RwLock};
28
29pub struct BinanceWs {
31 pub(crate) client: Arc<WsClient>,
32 listen_key: Arc<RwLock<Option<String>>>,
33 listen_key_manager: Option<Arc<ListenKeyManager>>,
34 auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
35 pub(crate) tickers: Arc<Mutex<HashMap<String, Ticker>>>,
36 pub(crate) bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
37 #[allow(dead_code)]
38 mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
39 pub(crate) orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
40 pub(crate) trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
41 pub(crate) ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
42 pub(crate) balances: Arc<RwLock<HashMap<String, Balance>>>,
43 pub(crate) orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
44 pub(crate) my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
45 pub(crate) positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
46}
47
48impl std::fmt::Debug for BinanceWs {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("BinanceWs")
51 .field("is_connected", &self.client.is_connected())
52 .field("state", &self.client.state())
53 .finish_non_exhaustive()
54 }
55}
56
57impl BinanceWs {
58 pub fn new(url: String) -> Self {
60 let config = WsConfig {
61 url,
62 connect_timeout: 10000,
63 ping_interval: 180000,
64 reconnect_interval: 5000,
65 max_reconnect_attempts: 5,
66 auto_reconnect: true,
67 enable_compression: false,
68 pong_timeout: 90000,
69 ..Default::default()
70 };
71
72 Self {
73 client: Arc::new(WsClient::new(config)),
74 listen_key: Arc::new(RwLock::new(None)),
75 listen_key_manager: None,
76 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
77 tickers: Arc::new(Mutex::new(HashMap::new())),
78 bids_asks: Arc::new(Mutex::new(HashMap::new())),
79 mark_prices: Arc::new(Mutex::new(HashMap::new())),
80 orderbooks: Arc::new(Mutex::new(HashMap::new())),
81 trades: Arc::new(Mutex::new(HashMap::new())),
82 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
83 balances: Arc::new(RwLock::new(HashMap::new())),
84 orders: Arc::new(RwLock::new(HashMap::new())),
85 my_trades: Arc::new(RwLock::new(HashMap::new())),
86 positions: Arc::new(RwLock::new(HashMap::new())),
87 }
88 }
89
90 pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
92 let config = WsConfig {
93 url,
94 connect_timeout: 10000,
95 ping_interval: 180000,
96 reconnect_interval: 5000,
97 max_reconnect_attempts: 5,
98 auto_reconnect: true,
99 enable_compression: false,
100 pong_timeout: 90000,
101 ..Default::default()
102 };
103
104 Self {
105 client: Arc::new(WsClient::new(config)),
106 listen_key: Arc::new(RwLock::new(None)),
107 listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
108 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
109 tickers: Arc::new(Mutex::new(HashMap::new())),
110 bids_asks: Arc::new(Mutex::new(HashMap::new())),
111 mark_prices: Arc::new(Mutex::new(HashMap::new())),
112 orderbooks: Arc::new(Mutex::new(HashMap::new())),
113 trades: Arc::new(Mutex::new(HashMap::new())),
114 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
115 balances: Arc::new(RwLock::new(HashMap::new())),
116 orders: Arc::new(RwLock::new(HashMap::new())),
117 my_trades: Arc::new(RwLock::new(HashMap::new())),
118 positions: Arc::new(RwLock::new(HashMap::new())),
119 }
120 }
121
122 pub async fn connect(&self) -> Result<()> {
124 self.client.connect().await?;
125
126 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
127 if coordinator_guard.is_none() {
128 let coordinator = self.client.clone().create_auto_reconnect_coordinator();
129 coordinator.start().await;
130 *coordinator_guard = Some(coordinator);
131 tracing::info!("Auto-reconnect coordinator started");
132 }
133
134 Ok(())
135 }
136
137 pub async fn disconnect(&self) -> Result<()> {
139 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
140 if let Some(coordinator) = coordinator_guard.take() {
141 coordinator.stop().await;
142 tracing::info!("Auto-reconnect coordinator stopped");
143 }
144
145 if let Some(manager) = &self.listen_key_manager {
146 manager.stop_auto_refresh().await;
147 }
148
149 self.client.disconnect().await
150 }
151
152 pub async fn connect_user_stream(&self) -> Result<()> {
154 let manager = self.listen_key_manager.as_ref()
155 .ok_or_else(|| Error::invalid_request(
156 "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
157 ))?;
158
159 let listen_key = manager.get_or_create().await?;
160 let _user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);
161
162 self.client.connect().await?;
163 manager.start_auto_refresh().await;
164 *self.listen_key.write().await = Some(listen_key);
165
166 Ok(())
167 }
168
169 pub async fn close_user_stream(&self) -> Result<()> {
171 if let Some(manager) = &self.listen_key_manager {
172 manager.delete().await?;
173 }
174 *self.listen_key.write().await = None;
175 Ok(())
176 }
177
178 pub async fn get_listen_key(&self) -> Option<String> {
180 if let Some(manager) = &self.listen_key_manager {
181 manager.get_current().await
182 } else {
183 self.listen_key.read().await.clone()
184 }
185 }
186
187 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
189 let stream = format!("{}@ticker", symbol.to_lowercase());
190 self.client
191 .subscribe(stream, Some(symbol.to_string()), None)
192 .await
193 }
194
195 pub async fn subscribe_all_tickers(&self) -> Result<()> {
197 self.client
198 .subscribe("!ticker@arr".to_string(), None, None)
199 .await
200 }
201
202 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
204 let stream = format!("{}@trade", symbol.to_lowercase());
205 self.client
206 .subscribe(stream, Some(symbol.to_string()), None)
207 .await
208 }
209
210 pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
212 let stream = format!("{}@aggTrade", symbol.to_lowercase());
213 self.client
214 .subscribe(stream, Some(symbol.to_string()), None)
215 .await
216 }
217
218 pub async fn subscribe_orderbook(
220 &self,
221 symbol: &str,
222 levels: u32,
223 update_speed: &str,
224 ) -> Result<()> {
225 let stream = if update_speed == "100ms" {
226 format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
227 } else {
228 format!("{}@depth{}", symbol.to_lowercase(), levels)
229 };
230
231 self.client
232 .subscribe(stream, Some(symbol.to_string()), None)
233 .await
234 }
235
236 pub async fn subscribe_orderbook_diff(
238 &self,
239 symbol: &str,
240 update_speed: Option<&str>,
241 ) -> Result<()> {
242 let stream = if let Some(speed) = update_speed {
243 if speed == "100ms" {
244 format!("{}@depth@100ms", symbol.to_lowercase())
245 } else {
246 format!("{}@depth", symbol.to_lowercase())
247 }
248 } else {
249 format!("{}@depth", symbol.to_lowercase())
250 };
251
252 self.client
253 .subscribe(stream, Some(symbol.to_string()), None)
254 .await
255 }
256
257 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
259 let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
260 self.client
261 .subscribe(stream, Some(symbol.to_string()), None)
262 .await
263 }
264
265 pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
267 let stream = format!("{}@miniTicker", symbol.to_lowercase());
268 self.client
269 .subscribe(stream, Some(symbol.to_string()), None)
270 .await
271 }
272
273 pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
275 self.client
276 .subscribe("!miniTicker@arr".to_string(), None, None)
277 .await
278 }
279
280 pub async fn unsubscribe(&self, stream: String) -> Result<()> {
282 self.client.unsubscribe(stream, None).await
283 }
284
285 pub async fn receive(&self) -> Option<Value> {
287 self.client.receive().await
288 }
289
290 pub fn is_connected(&self) -> bool {
292 self.client.is_connected()
293 }
294
295 pub fn state(&self) -> ccxt_core::ws_client::WsConnectionState {
297 self.client.state()
298 }
299
300 pub fn subscriptions(&self) -> Vec<String> {
306 self.client.subscriptions()
307 }
308
309 pub fn subscription_count(&self) -> usize {
311 self.client.subscription_count()
312 }
313
314 async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
316 let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
317
318 self.client
319 .subscribe(stream.clone(), Some(symbol.to_string()), None)
320 .await?;
321
322 loop {
323 if let Some(message) = self.client.receive().await {
324 if message.get("result").is_some() {
325 continue;
326 }
327
328 if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
329 let mut tickers = self.tickers.lock().await;
330 tickers.insert(ticker.symbol.clone(), ticker.clone());
331 return Ok(ticker);
332 }
333 }
334 }
335 }
336
337 async fn watch_tickers_internal(
339 &self,
340 symbols: Option<Vec<String>>,
341 channel_name: &str,
342 ) -> Result<HashMap<String, Ticker>> {
343 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
344 syms.iter()
345 .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
346 .collect()
347 } else {
348 vec![format!("!{}@arr", channel_name)]
349 };
350
351 for stream in &streams {
352 self.client.subscribe(stream.clone(), None, None).await?;
353 }
354
355 let mut result = HashMap::new();
356
357 loop {
358 if let Some(message) = self.client.receive().await {
359 if message.get("result").is_some() {
360 continue;
361 }
362
363 if let Some(arr) = message.as_array() {
364 for item in arr {
365 if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
366 let symbol = ticker.symbol.clone();
367
368 if let Some(syms) = &symbols {
369 if syms.contains(&symbol.to_lowercase()) {
370 result.insert(symbol.clone(), ticker.clone());
371 }
372 } else {
373 result.insert(symbol.clone(), ticker.clone());
374 }
375
376 let mut tickers = self.tickers.lock().await;
377 tickers.insert(symbol, ticker);
378 }
379 }
380
381 if let Some(syms) = &symbols {
382 if result.len() == syms.len() {
383 return Ok(result);
384 }
385 } else {
386 return Ok(result);
387 }
388 } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
389 let symbol = ticker.symbol.clone();
390 result.insert(symbol.clone(), ticker.clone());
391
392 let mut tickers = self.tickers.lock().await;
393 tickers.insert(symbol, ticker);
394
395 if let Some(syms) = &symbols {
396 if result.len() == syms.len() {
397 return Ok(result);
398 }
399 }
400 }
401 }
402 }
403 }
404
405 async fn handle_orderbook_delta(
407 &self,
408 symbol: &str,
409 delta_message: &Value,
410 is_futures: bool,
411 ) -> Result<()> {
412 handlers::handle_orderbook_delta(symbol, delta_message, is_futures, &self.orderbooks).await
413 }
414
415 async fn fetch_orderbook_snapshot(
417 &self,
418 exchange: &Binance,
419 symbol: &str,
420 limit: Option<i64>,
421 is_futures: bool,
422 ) -> Result<OrderBook> {
423 handlers::fetch_orderbook_snapshot(exchange, symbol, limit, is_futures, &self.orderbooks)
424 .await
425 }
426
427 async fn watch_orderbook_internal(
429 &self,
430 exchange: &Binance,
431 symbol: &str,
432 limit: Option<i64>,
433 update_speed: i32,
434 is_futures: bool,
435 ) -> Result<OrderBook> {
436 let stream = if update_speed == 100 {
437 format!("{}@depth@100ms", symbol.to_lowercase())
438 } else {
439 format!("{}@depth", symbol.to_lowercase())
440 };
441
442 self.client
443 .subscribe(stream.clone(), Some(symbol.to_string()), None)
444 .await?;
445
446 tokio::time::sleep(Duration::from_millis(500)).await;
447
448 let _snapshot = self
449 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
450 .await?;
451
452 loop {
453 if let Some(message) = self.client.receive().await {
454 if message.get("result").is_some() {
455 continue;
456 }
457
458 if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
459 if event_type == "depthUpdate" {
460 match self
461 .handle_orderbook_delta(symbol, &message, is_futures)
462 .await
463 {
464 Ok(()) => {
465 let orderbooks = self.orderbooks.lock().await;
466 if let Some(ob) = orderbooks.get(symbol) {
467 if ob.is_synced {
468 return Ok(ob.clone());
469 }
470 }
471 }
472 Err(e) => {
473 let err_msg = e.to_string();
474
475 if err_msg.contains("RESYNC_NEEDED") {
476 tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
477
478 let current_time = chrono::Utc::now().timestamp_millis();
479 let should_resync = {
480 let orderbooks = self.orderbooks.lock().await;
481 if let Some(ob) = orderbooks.get(symbol) {
482 ob.should_resync(current_time)
483 } else {
484 true
485 }
486 };
487
488 if should_resync {
489 tracing::info!("Initiating resync for {}", symbol);
490
491 {
492 let mut orderbooks = self.orderbooks.lock().await;
493 if let Some(ob) = orderbooks.get_mut(symbol) {
494 ob.reset_for_resync();
495 ob.mark_resync_initiated(current_time);
496 }
497 }
498
499 tokio::time::sleep(Duration::from_millis(500)).await;
500
501 match self
502 .fetch_orderbook_snapshot(
503 exchange, symbol, limit, is_futures,
504 )
505 .await
506 {
507 Ok(_) => {
508 tracing::info!(
509 "Resync completed successfully for {}",
510 symbol
511 );
512 continue;
513 }
514 Err(resync_err) => {
515 tracing::error!(
516 "Resync failed for {}: {}",
517 symbol,
518 resync_err
519 );
520 return Err(resync_err);
521 }
522 }
523 }
524 tracing::debug!("Resync rate limited for {}, skipping", symbol);
525 }
526 tracing::error!("Failed to handle orderbook delta: {}", err_msg);
527 }
528 }
529 }
530 }
531 }
532 }
533 }
534
535 async fn watch_orderbooks_internal(
537 &self,
538 exchange: &Binance,
539 symbols: Vec<String>,
540 limit: Option<i64>,
541 update_speed: i32,
542 is_futures: bool,
543 ) -> Result<HashMap<String, OrderBook>> {
544 if symbols.len() > 200 {
545 return Err(Error::invalid_request(
546 "Binance supports max 200 symbols per connection",
547 ));
548 }
549
550 for symbol in &symbols {
551 let stream = if update_speed == 100 {
552 format!("{}@depth@100ms", symbol.to_lowercase())
553 } else {
554 format!("{}@depth", symbol.to_lowercase())
555 };
556
557 self.client
558 .subscribe(stream, Some(symbol.clone()), None)
559 .await?;
560 }
561
562 tokio::time::sleep(Duration::from_millis(500)).await;
563
564 for symbol in &symbols {
565 let _ = self
566 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
567 .await;
568 }
569
570 let mut result = HashMap::new();
571 let mut update_count = 0;
572
573 while update_count < symbols.len() {
574 if let Some(message) = self.client.receive().await {
575 if message.get("result").is_some() {
576 continue;
577 }
578
579 if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
580 if event_type == "depthUpdate" {
581 if let Some(msg_symbol) =
582 message.get("s").and_then(serde_json::Value::as_str)
583 {
584 if let Err(e) = self
585 .handle_orderbook_delta(msg_symbol, &message, is_futures)
586 .await
587 {
588 tracing::error!("Failed to handle orderbook delta: {}", e);
589 continue;
590 }
591
592 update_count += 1;
593 }
594 }
595 }
596 }
597 }
598
599 let orderbooks = self.orderbooks.lock().await;
600 for symbol in &symbols {
601 if let Some(ob) = orderbooks.get(symbol) {
602 result.insert(symbol.clone(), ob.clone());
603 }
604 }
605
606 Ok(result)
607 }
608
609 pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
611 let tickers = self.tickers.lock().await;
612 tickers.get(symbol).cloned()
613 }
614
615 pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
617 let tickers = self.tickers.lock().await;
618 tickers.clone()
619 }
620
621 async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
623 user_data::handle_balance_message(message, account_type, &self.balances).await
624 }
625
626 fn parse_ws_trade(data: &Value) -> Result<Trade> {
628 user_data::parse_ws_trade(data)
629 }
630
631 async fn filter_my_trades(
633 &self,
634 symbol: Option<&str>,
635 since: Option<i64>,
636 limit: Option<usize>,
637 ) -> Result<Vec<Trade>> {
638 let trades_map = self.my_trades.read().await;
639
640 let mut trades: Vec<Trade> = if let Some(sym) = symbol {
641 trades_map
642 .get(sym)
643 .map(|symbol_trades| symbol_trades.iter().cloned().collect())
644 .unwrap_or_default()
645 } else {
646 trades_map
647 .values()
648 .flat_map(|symbol_trades| symbol_trades.iter().cloned())
649 .collect()
650 };
651
652 if let Some(since_ts) = since {
653 trades.retain(|trade| trade.timestamp >= since_ts);
654 }
655
656 trades.sort_by(|a, b| {
657 let ts_a = a.timestamp;
658 let ts_b = b.timestamp;
659 ts_b.cmp(&ts_a)
660 });
661
662 if let Some(lim) = limit {
663 trades.truncate(lim);
664 }
665
666 Ok(trades)
667 }
668
669 fn parse_ws_position(data: &Value) -> Result<Position> {
671 user_data::parse_ws_position(data)
672 }
673
674 async fn filter_positions(
676 &self,
677 symbols: Option<&[String]>,
678 since: Option<i64>,
679 limit: Option<usize>,
680 ) -> Result<Vec<Position>> {
681 let positions_map = self.positions.read().await;
682
683 let mut positions: Vec<Position> = if let Some(syms) = symbols {
684 syms.iter()
685 .filter_map(|sym| positions_map.get(sym))
686 .flat_map(|side_map| side_map.values().cloned())
687 .collect()
688 } else {
689 positions_map
690 .values()
691 .flat_map(|side_map| side_map.values().cloned())
692 .collect()
693 };
694
695 if let Some(since_ts) = since {
696 positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
697 }
698
699 positions.sort_by(|a, b| {
700 let ts_a = a.timestamp.unwrap_or(0);
701 let ts_b = b.timestamp.unwrap_or(0);
702 ts_b.cmp(&ts_a)
703 });
704
705 if let Some(lim) = limit {
706 positions.truncate(lim);
707 }
708
709 Ok(positions)
710 }
711}
712
713include!("binance_impl.rs");
715
716#[cfg(test)]
717mod tests {
718 use super::*;
719
720 #[test]
721 fn test_binance_ws_creation() {
722 let ws = BinanceWs::new(WS_BASE_URL.to_string());
723 assert!(ws.listen_key.try_read().is_ok());
724 }
725
726 #[test]
727 fn test_stream_format() {
728 let symbol = "btcusdt";
729
730 let ticker_stream = format!("{}@ticker", symbol);
731 assert_eq!(ticker_stream, "btcusdt@ticker");
732
733 let trade_stream = format!("{}@trade", symbol);
734 assert_eq!(trade_stream, "btcusdt@trade");
735
736 let depth_stream = format!("{}@depth20", symbol);
737 assert_eq!(depth_stream, "btcusdt@depth20");
738
739 let kline_stream = format!("{}@kline_1m", symbol);
740 assert_eq!(kline_stream, "btcusdt@kline_1m");
741 }
742
743 #[tokio::test]
744 async fn test_subscription_manager_basic() {
745 let manager = SubscriptionManager::new();
746 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
747
748 assert_eq!(manager.active_count(), 0);
749 assert!(!manager.has_subscription("btcusdt@ticker").await);
750
751 manager
752 .add_subscription(
753 "btcusdt@ticker".to_string(),
754 "BTCUSDT".to_string(),
755 SubscriptionType::Ticker,
756 tx.clone(),
757 )
758 .await
759 .unwrap();
760
761 assert_eq!(manager.active_count(), 1);
762 assert!(manager.has_subscription("btcusdt@ticker").await);
763
764 let sub = manager.get_subscription("btcusdt@ticker").await;
765 assert!(sub.is_some());
766 let sub = sub.unwrap();
767 assert_eq!(sub.stream, "btcusdt@ticker");
768 assert_eq!(sub.symbol, "BTCUSDT");
769 assert_eq!(sub.sub_type, SubscriptionType::Ticker);
770
771 manager.remove_subscription("btcusdt@ticker").await.unwrap();
772 assert_eq!(manager.active_count(), 0);
773 assert!(!manager.has_subscription("btcusdt@ticker").await);
774 }
775
776 #[test]
777 fn test_symbol_conversion() {
778 let symbol = "BTC/USDT";
779 let binance_symbol = symbol.replace('/', "").to_lowercase();
780 assert_eq!(binance_symbol, "btcusdt");
781 }
782}