1use crate::binance::Binance;
6use crate::binance::parser;
7use ccxt_core::error::{Error, Result};
8use ccxt_core::types::financial::{Amount, Cost, Price};
9use ccxt_core::types::{
10 Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
11};
12use ccxt_core::ws_client::{WsClient, WsConfig};
13use serde_json::Value;
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio_tungstenite::tungstenite::protocol::Message;
20
21#[allow(dead_code)]
23const WS_BASE_URL: &str = "wss://stream.binance.com:9443/ws";
24#[allow(dead_code)]
25const WS_TESTNET_URL: &str = "wss://testnet.binance.vision/ws";
26
27const LISTEN_KEY_REFRESH_INTERVAL: Duration = Duration::from_secs(30 * 60);
29
30pub struct ListenKeyManager {
38 binance: Arc<Binance>,
40 listen_key: Arc<RwLock<Option<String>>>,
42 created_at: Arc<RwLock<Option<Instant>>>,
44 refresh_interval: Duration,
46 refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
48}
49
50impl ListenKeyManager {
51 pub fn new(binance: Arc<Binance>) -> Self {
59 Self {
60 binance,
61 listen_key: Arc::new(RwLock::new(None)),
62 created_at: Arc::new(RwLock::new(None)),
63 refresh_interval: LISTEN_KEY_REFRESH_INTERVAL,
64 refresh_task: Arc::new(Mutex::new(None)),
65 }
66 }
67
68 pub async fn get_or_create(&self) -> Result<String> {
79 let key_opt = self.listen_key.read().await.clone();
81
82 if let Some(key) = key_opt {
83 let created = self.created_at.read().await;
85 if let Some(created_time) = *created {
86 let elapsed = created_time.elapsed();
87 if elapsed > Duration::from_secs(50 * 60) {
89 drop(created);
90 return self.create_new().await;
91 }
92 }
93 return Ok(key);
94 }
95
96 self.create_new().await
98 }
99
100 async fn create_new(&self) -> Result<String> {
105 let key = self.binance.create_listen_key().await?;
106
107 *self.listen_key.write().await = Some(key.clone());
109 *self.created_at.write().await = Some(Instant::now());
110
111 Ok(key)
112 }
113
114 pub async fn refresh(&self) -> Result<()> {
121 let key_opt = self.listen_key.read().await.clone();
122
123 if let Some(key) = key_opt {
124 self.binance.refresh_listen_key(&key).await?;
125 *self.created_at.write().await = Some(Instant::now());
127 Ok(())
128 } else {
129 Err(Error::invalid_request("No listen key to refresh"))
130 }
131 }
132
133 pub async fn start_auto_refresh(&self) {
137 self.stop_auto_refresh().await;
139
140 let listen_key = self.listen_key.clone();
141 let created_at = self.created_at.clone();
142 let binance = self.binance.clone();
143 let interval = self.refresh_interval;
144
145 let handle = tokio::spawn(async move {
146 loop {
147 tokio::time::sleep(interval).await;
148
149 let key_opt = listen_key.read().await.clone();
151 if let Some(key) = key_opt {
152 match binance.refresh_listen_key(&key).await {
154 Ok(_) => {
155 *created_at.write().await = Some(Instant::now());
156 }
158 Err(_e) => {
159 *listen_key.write().await = None;
161 *created_at.write().await = None;
162 break;
163 }
164 }
165 } else {
166 break;
168 }
169 }
170 });
171
172 *self.refresh_task.lock().await = Some(handle);
173 }
174
175 pub async fn stop_auto_refresh(&self) {
177 let mut task_opt = self.refresh_task.lock().await;
178 if let Some(handle) = task_opt.take() {
179 handle.abort();
180 }
181 }
182
183 pub async fn delete(&self) -> Result<()> {
190 self.stop_auto_refresh().await;
192
193 let key_opt = self.listen_key.read().await.clone();
194
195 if let Some(key) = key_opt {
196 self.binance.delete_listen_key(&key).await?;
197
198 *self.listen_key.write().await = None;
200 *self.created_at.write().await = None;
201
202 Ok(())
203 } else {
204 Ok(()) }
206 }
207
208 pub async fn get_current(&self) -> Option<String> {
210 self.listen_key.read().await.clone()
211 }
212
213 pub async fn is_valid(&self) -> bool {
215 let key_opt = self.listen_key.read().await;
216 if key_opt.is_none() {
217 return false;
218 }
219
220 let created = self.created_at.read().await;
221 if let Some(created_time) = *created {
222 created_time.elapsed() < Duration::from_secs(55 * 60)
224 } else {
225 false
226 }
227 }
228}
229
230impl Drop for ListenKeyManager {
231 fn drop(&mut self) {
232 }
235}
236#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238pub enum SubscriptionType {
239 Ticker,
241 OrderBook,
243 Trades,
245 Kline(String),
247 Balance,
249 Orders,
251 Positions,
253 MyTrades,
255 MarkPrice,
257 BookTicker,
259}
260
261impl SubscriptionType {
262 pub fn from_stream(stream: &str) -> Option<Self> {
270 if stream.contains("@ticker") {
271 Some(Self::Ticker)
272 } else if stream.contains("@depth") {
273 Some(Self::OrderBook)
274 } else if stream.contains("@trade") || stream.contains("@aggTrade") {
275 Some(Self::Trades)
276 } else if stream.contains("@kline_") {
277 let parts: Vec<&str> = stream.split("@kline_").collect();
279 if parts.len() == 2 {
280 Some(Self::Kline(parts[1].to_string()))
281 } else {
282 None
283 }
284 } else if stream.contains("@markPrice") {
285 Some(Self::MarkPrice)
286 } else if stream.contains("@bookTicker") {
287 Some(Self::BookTicker)
288 } else {
289 None
290 }
291 }
292}
293
294#[derive(Clone)]
296pub struct Subscription {
297 pub stream: String,
299 pub symbol: String,
301 pub sub_type: SubscriptionType,
303 pub subscribed_at: Instant,
305 pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
307}
308
309impl Subscription {
310 pub fn new(
312 stream: String,
313 symbol: String,
314 sub_type: SubscriptionType,
315 sender: tokio::sync::mpsc::UnboundedSender<Value>,
316 ) -> Self {
317 Self {
318 stream,
319 symbol,
320 sub_type,
321 subscribed_at: Instant::now(),
322 sender,
323 }
324 }
325
326 pub fn send(&self, message: Value) -> bool {
334 self.sender.send(message).is_ok()
335 }
336}
337
338pub struct SubscriptionManager {
345 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
347 symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
349 active_count: Arc<std::sync::atomic::AtomicUsize>,
351}
352
353impl SubscriptionManager {
354 pub fn new() -> Self {
356 Self {
357 subscriptions: Arc::new(RwLock::new(HashMap::new())),
358 symbol_index: Arc::new(RwLock::new(HashMap::new())),
359 active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
360 }
361 }
362
363 pub async fn add_subscription(
374 &self,
375 stream: String,
376 symbol: String,
377 sub_type: SubscriptionType,
378 sender: tokio::sync::mpsc::UnboundedSender<Value>,
379 ) -> Result<()> {
380 let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
381
382 let mut subs = self.subscriptions.write().await;
384 subs.insert(stream.clone(), subscription);
385
386 let mut index = self.symbol_index.write().await;
388 index.entry(symbol).or_insert_with(Vec::new).push(stream);
389
390 self.active_count
392 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
393
394 Ok(())
395 }
396
397 pub async fn remove_subscription(&self, stream: &str) -> Result<()> {
405 let mut subs = self.subscriptions.write().await;
406
407 if let Some(subscription) = subs.remove(stream) {
408 let mut index = self.symbol_index.write().await;
410 if let Some(streams) = index.get_mut(&subscription.symbol) {
411 streams.retain(|s| s != stream);
412 if streams.is_empty() {
414 index.remove(&subscription.symbol);
415 }
416 }
417
418 self.active_count
420 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
421 }
422
423 Ok(())
424 }
425
426 pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
434 let subs = self.subscriptions.read().await;
435 subs.get(stream).cloned()
436 }
437
438 pub async fn has_subscription(&self, stream: &str) -> bool {
446 let subs = self.subscriptions.read().await;
447 subs.contains_key(stream)
448 }
449
450 pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
452 let subs = self.subscriptions.read().await;
453 subs.values().cloned().collect()
454 }
455
456 pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
461 let index = self.symbol_index.read().await;
462 let subs = self.subscriptions.read().await;
463
464 if let Some(streams) = index.get(symbol) {
465 streams
466 .iter()
467 .filter_map(|stream| subs.get(stream).cloned())
468 .collect()
469 } else {
470 Vec::new()
471 }
472 }
473
474 pub fn active_count(&self) -> usize {
476 self.active_count.load(std::sync::atomic::Ordering::SeqCst)
477 }
478
479 pub async fn clear(&self) {
481 let mut subs = self.subscriptions.write().await;
482 let mut index = self.symbol_index.write().await;
483
484 subs.clear();
485 index.clear();
486 self.active_count
487 .store(0, std::sync::atomic::Ordering::SeqCst);
488 }
489
490 pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
499 let subs = self.subscriptions.read().await;
500 if let Some(subscription) = subs.get(stream) {
501 subscription.send(message)
502 } else {
503 false
504 }
505 }
506
507 pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
516 let index = self.symbol_index.read().await;
517 let subs = self.subscriptions.read().await;
518
519 let mut sent_count = 0;
520
521 if let Some(streams) = index.get(symbol) {
522 for stream in streams {
523 if let Some(subscription) = subs.get(stream) {
524 if subscription.send(message.clone()) {
525 sent_count += 1;
526 }
527 }
528 }
529 }
530
531 sent_count
532 }
533}
534#[derive(Debug, Clone)]
538pub struct ReconnectConfig {
539 pub enabled: bool,
541
542 pub initial_delay_ms: u64,
544
545 pub max_delay_ms: u64,
547
548 pub backoff_multiplier: f64,
550
551 pub max_attempts: usize,
553}
554
555impl Default for ReconnectConfig {
556 fn default() -> Self {
557 Self {
558 enabled: true,
559 initial_delay_ms: 1000, max_delay_ms: 30000, backoff_multiplier: 2.0, max_attempts: 0, }
564 }
565}
566
567impl ReconnectConfig {
568 pub fn calculate_delay(&self, attempt: usize) -> u64 {
578 let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(attempt as i32);
579 delay.min(self.max_delay_ms as f64) as u64
580 }
581
582 pub fn should_retry(&self, attempt: usize) -> bool {
590 self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
591 }
592}
593
594pub struct MessageRouter {
602 ws_client: Arc<RwLock<Option<WsClient>>>,
604
605 subscription_manager: Arc<SubscriptionManager>,
607
608 router_task: Arc<Mutex<Option<JoinHandle<()>>>>,
610
611 is_connected: Arc<std::sync::atomic::AtomicBool>,
613
614 reconnect_config: Arc<RwLock<ReconnectConfig>>,
616
617 ws_url: String,
619
620 request_id: Arc<std::sync::atomic::AtomicU64>,
622}
623
624impl MessageRouter {
625 pub fn new(ws_url: String, subscription_manager: Arc<SubscriptionManager>) -> Self {
634 Self {
635 ws_client: Arc::new(RwLock::new(None)),
636 subscription_manager,
637 router_task: Arc::new(Mutex::new(None)),
638 is_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
639 reconnect_config: Arc::new(RwLock::new(ReconnectConfig::default())),
640 ws_url,
641 request_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
642 }
643 }
644
645 pub async fn start(&self) -> Result<()> {
652 if self.is_connected() {
654 self.stop().await?;
655 }
656
657 let config = WsConfig {
659 url: self.ws_url.clone(),
660 ..Default::default()
661 };
662 let client = WsClient::new(config);
663 client.connect().await?;
664
665 *self.ws_client.write().await = Some(client);
667
668 self.is_connected
670 .store(true, std::sync::atomic::Ordering::SeqCst);
671
672 let ws_client = self.ws_client.clone();
674 let subscription_manager = self.subscription_manager.clone();
675 let is_connected = self.is_connected.clone();
676 let reconnect_config = self.reconnect_config.clone();
677 let ws_url = self.ws_url.clone();
678
679 let handle = tokio::spawn(async move {
680 Self::message_loop(
681 ws_client,
682 subscription_manager,
683 is_connected,
684 reconnect_config,
685 ws_url,
686 )
687 .await
688 });
689
690 *self.router_task.lock().await = Some(handle);
691
692 Ok(())
693 }
694
695 pub async fn stop(&self) -> Result<()> {
702 self.is_connected
704 .store(false, std::sync::atomic::Ordering::SeqCst);
705
706 let mut task_opt = self.router_task.lock().await;
708 if let Some(handle) = task_opt.take() {
709 handle.abort();
710 }
711
712 let mut client_opt = self.ws_client.write().await;
714 if let Some(client) = client_opt.take() {
715 let _ = client.disconnect().await;
716 }
717
718 Ok(())
719 }
720
721 pub async fn restart(&self) -> Result<()> {
728 self.stop().await?;
729 tokio::time::sleep(Duration::from_millis(100)).await;
730 self.start().await
731 }
732
733 pub fn is_connected(&self) -> bool {
735 self.is_connected.load(std::sync::atomic::Ordering::SeqCst)
736 }
737
738 pub async fn set_reconnect_config(&self, config: ReconnectConfig) {
743 *self.reconnect_config.write().await = config;
744 }
745
746 pub async fn get_reconnect_config(&self) -> ReconnectConfig {
748 self.reconnect_config.read().await.clone()
749 }
750
751 pub async fn subscribe(&self, streams: Vec<String>) -> Result<()> {
758 if streams.is_empty() {
759 return Ok(());
760 }
761
762 let client_opt = self.ws_client.read().await;
763 let client = client_opt
764 .as_ref()
765 .ok_or_else(|| Error::network("WebSocket not connected"))?;
766
767 let id = self
769 .request_id
770 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
771
772 #[allow(clippy::disallowed_methods)]
774 let request = serde_json::json!({
775 "method": "SUBSCRIBE",
776 "params": streams,
777 "id": id
778 });
779
780 client
782 .send(Message::Text(request.to_string().into()))
783 .await?;
784
785 Ok(())
786 }
787
788 pub async fn unsubscribe(&self, streams: Vec<String>) -> Result<()> {
795 if streams.is_empty() {
796 return Ok(());
797 }
798
799 let client_opt = self.ws_client.read().await;
800 let client = client_opt
801 .as_ref()
802 .ok_or_else(|| Error::network("WebSocket not connected"))?;
803
804 let id = self
806 .request_id
807 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
808
809 #[allow(clippy::disallowed_methods)]
811 let request = serde_json::json!({
812 "method": "UNSUBSCRIBE",
813 "params": streams,
814 "id": id
815 });
816
817 client
819 .send(Message::Text(request.to_string().into()))
820 .await?;
821
822 Ok(())
823 }
824
825 async fn message_loop(
829 ws_client: Arc<RwLock<Option<WsClient>>>,
830 subscription_manager: Arc<SubscriptionManager>,
831 is_connected: Arc<std::sync::atomic::AtomicBool>,
832 reconnect_config: Arc<RwLock<ReconnectConfig>>,
833 ws_url: String,
834 ) {
835 let mut reconnect_attempt = 0;
836
837 loop {
838 if !is_connected.load(std::sync::atomic::Ordering::SeqCst) {
840 break;
841 }
842
843 let has_client = ws_client.read().await.is_some();
845
846 if !has_client {
847 let config = reconnect_config.read().await;
849 if config.should_retry(reconnect_attempt) {
850 let delay = config.calculate_delay(reconnect_attempt);
851 drop(config);
852
853 tokio::time::sleep(Duration::from_millis(delay)).await;
854
855 match Self::reconnect(&ws_url, ws_client.clone()).await {
856 Ok(_) => {
857 reconnect_attempt = 0; continue;
859 }
860 Err(_) => {
861 reconnect_attempt += 1;
862 continue;
863 }
864 }
865 } else {
866 is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
868 break;
869 }
870 }
871
872 let message_opt = {
874 let guard = ws_client.read().await;
875 if let Some(client) = guard.as_ref() {
876 client.receive().await
877 } else {
878 None
879 }
880 };
881
882 match message_opt {
883 Some(value) => {
884 if let Err(_e) = Self::handle_message(value, subscription_manager.clone()).await
886 {
887 continue;
888 }
889
890 reconnect_attempt = 0;
892 }
893 None => {
894 let config = reconnect_config.read().await;
896 if config.should_retry(reconnect_attempt) {
897 let delay = config.calculate_delay(reconnect_attempt);
898 drop(config);
899
900 tokio::time::sleep(Duration::from_millis(delay)).await;
901
902 match Self::reconnect(&ws_url, ws_client.clone()).await {
903 Ok(_) => {
904 reconnect_attempt = 0;
905 continue;
906 }
907 Err(_) => {
908 reconnect_attempt += 1;
909 continue;
910 }
911 }
912 } else {
913 is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
915 break;
916 }
917 }
918 }
919 }
920 }
921
922 async fn handle_message(
926 message: Value,
927 subscription_manager: Arc<SubscriptionManager>,
928 ) -> Result<()> {
929 let stream_name = Self::extract_stream_name(&message)?;
931
932 let sent = subscription_manager
934 .send_to_stream(&stream_name, message)
935 .await;
936
937 if sent {
938 Ok(())
939 } else {
940 Err(Error::generic("No subscribers for stream"))
941 }
942 }
943
944 fn extract_stream_name(message: &Value) -> Result<String> {
956 if let Some(stream) = message.get("stream").and_then(|s| s.as_str()) {
958 return Ok(stream.to_string());
959 }
960
961 if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
963 if let Some(symbol) = message.get("s").and_then(|s| s.as_str()) {
964 let stream = match event_type {
966 "24hrTicker" => format!("{}@ticker", symbol.to_lowercase()),
967 "depthUpdate" => format!("{}@depth", symbol.to_lowercase()),
968 "aggTrade" => format!("{}@aggTrade", symbol.to_lowercase()),
969 "trade" => format!("{}@trade", symbol.to_lowercase()),
970 "kline" => {
971 if let Some(kline) = message.get("k") {
973 if let Some(interval) = kline.get("i").and_then(|i| i.as_str()) {
974 format!("{}@kline_{}", symbol.to_lowercase(), interval)
975 } else {
976 return Err(Error::generic("Missing kline interval"));
977 }
978 } else {
979 return Err(Error::generic("Missing kline data"));
980 }
981 }
982 "markPriceUpdate" => format!("{}@markPrice", symbol.to_lowercase()),
983 "bookTicker" => format!("{}@bookTicker", symbol.to_lowercase()),
984 _ => {
985 return Err(Error::generic(format!(
986 "Unknown event type: {}",
987 event_type
988 )));
989 }
990 };
991 return Ok(stream);
992 }
993 }
994
995 if message.get("result").is_some() || message.get("error").is_some() {
997 return Err(Error::generic("Subscription response, skip routing"));
998 }
999
1000 Err(Error::generic("Cannot extract stream name from message"))
1001 }
1002
1003 async fn reconnect(ws_url: &str, ws_client: Arc<RwLock<Option<WsClient>>>) -> Result<()> {
1007 {
1009 let mut client_opt = ws_client.write().await;
1010 if let Some(client) = client_opt.take() {
1011 let _ = client.disconnect().await;
1012 }
1013 }
1014
1015 let config = WsConfig {
1017 url: ws_url.to_string(),
1018 ..Default::default()
1019 };
1020 let new_client = WsClient::new(config);
1021
1022 new_client.connect().await?;
1024
1025 *ws_client.write().await = Some(new_client);
1027
1028 Ok(())
1029 }
1030}
1031
1032impl Drop for MessageRouter {
1033 fn drop(&mut self) {
1034 }
1037}
1038
1039impl Default for SubscriptionManager {
1040 fn default() -> Self {
1041 Self::new()
1042 }
1043}
1044
1045pub struct BinanceWs {
1047 client: Arc<WsClient>,
1048 listen_key: Arc<RwLock<Option<String>>>,
1049 listen_key_manager: Option<Arc<ListenKeyManager>>,
1051 auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
1053 tickers: Arc<Mutex<HashMap<String, Ticker>>>,
1055 bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
1057 #[allow(dead_code)]
1059 mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
1060 orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
1062 trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
1064 ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
1066 balances: Arc<RwLock<HashMap<String, Balance>>>,
1068 orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
1070 my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
1072 positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
1074}
1075
1076impl BinanceWs {
1077 pub fn new(url: String) -> Self {
1085 let config = WsConfig {
1086 url,
1087 connect_timeout: 10000,
1088 ping_interval: 180000, reconnect_interval: 5000,
1090 max_reconnect_attempts: 5,
1091 auto_reconnect: true,
1092 enable_compression: false,
1093 pong_timeout: 90000, };
1095
1096 Self {
1097 client: Arc::new(WsClient::new(config)),
1098 listen_key: Arc::new(RwLock::new(None)),
1099 listen_key_manager: None,
1100 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1101 tickers: Arc::new(Mutex::new(HashMap::new())),
1102 bids_asks: Arc::new(Mutex::new(HashMap::new())),
1103 mark_prices: Arc::new(Mutex::new(HashMap::new())),
1104 orderbooks: Arc::new(Mutex::new(HashMap::new())),
1105 trades: Arc::new(Mutex::new(HashMap::new())),
1106 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1107 balances: Arc::new(RwLock::new(HashMap::new())),
1108 orders: Arc::new(RwLock::new(HashMap::new())),
1109 my_trades: Arc::new(RwLock::new(HashMap::new())),
1110 positions: Arc::new(RwLock::new(HashMap::new())),
1111 }
1112 }
1113
1114 pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
1123 let config = WsConfig {
1124 url,
1125 connect_timeout: 10000,
1126 ping_interval: 180000, reconnect_interval: 5000,
1128 max_reconnect_attempts: 5,
1129 auto_reconnect: true,
1130 enable_compression: false,
1131 pong_timeout: 90000, };
1133
1134 Self {
1135 client: Arc::new(WsClient::new(config)),
1136 listen_key: Arc::new(RwLock::new(None)),
1137 listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
1138 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1139 tickers: Arc::new(Mutex::new(HashMap::new())),
1140 bids_asks: Arc::new(Mutex::new(HashMap::new())),
1141 mark_prices: Arc::new(Mutex::new(HashMap::new())),
1142 orderbooks: Arc::new(Mutex::new(HashMap::new())),
1143 trades: Arc::new(Mutex::new(HashMap::new())),
1144 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1145 balances: Arc::new(RwLock::new(HashMap::new())),
1146 orders: Arc::new(RwLock::new(HashMap::new())),
1147 my_trades: Arc::new(RwLock::new(HashMap::new())),
1148 positions: Arc::new(RwLock::new(HashMap::new())),
1149 }
1150 }
1151
1152 pub async fn connect(&self) -> Result<()> {
1154 self.client.connect().await?;
1156
1157 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1159 if coordinator_guard.is_none() {
1160 let coordinator = self.client.clone().create_auto_reconnect_coordinator();
1161 coordinator.start().await;
1162 *coordinator_guard = Some(coordinator);
1163 tracing::info!("Auto-reconnect coordinator started");
1164 }
1165
1166 Ok(())
1167 }
1168
1169 pub async fn disconnect(&self) -> Result<()> {
1171 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1173 if let Some(coordinator) = coordinator_guard.take() {
1174 coordinator.stop().await;
1175 tracing::info!("Auto-reconnect coordinator stopped");
1176 }
1177
1178 if let Some(manager) = &self.listen_key_manager {
1180 manager.stop_auto_refresh().await;
1181 }
1182
1183 self.client.disconnect().await
1184 }
1185
1186 pub async fn connect_user_stream(&self) -> Result<()> {
1210 let manager = self.listen_key_manager.as_ref()
1211 .ok_or_else(|| Error::invalid_request(
1212 "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
1213 ))?;
1214
1215 let listen_key = manager.get_or_create().await?;
1217
1218 let user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);
1220
1221 let config = WsConfig {
1223 url: user_stream_url,
1224 connect_timeout: 10000,
1225 ping_interval: 180000,
1226 reconnect_interval: 5000,
1227 max_reconnect_attempts: 5,
1228 auto_reconnect: true,
1229 enable_compression: false,
1230 pong_timeout: 90000, };
1232
1233 let _new_client = Arc::new(WsClient::new(config));
1235 self.client.connect().await?;
1239
1240 manager.start_auto_refresh().await;
1242
1243 *self.listen_key.write().await = Some(listen_key);
1245
1246 Ok(())
1247 }
1248
1249 pub async fn close_user_stream(&self) -> Result<()> {
1256 if let Some(manager) = &self.listen_key_manager {
1257 manager.delete().await?;
1258 }
1259 *self.listen_key.write().await = None;
1260 Ok(())
1261 }
1262
1263 pub async fn get_listen_key(&self) -> Option<String> {
1265 if let Some(manager) = &self.listen_key_manager {
1266 manager.get_current().await
1267 } else {
1268 self.listen_key.read().await.clone()
1269 }
1270 }
1271
1272 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
1280 let stream = format!("{}@ticker", symbol.to_lowercase());
1281 self.client
1282 .subscribe(stream, Some(symbol.to_string()), None)
1283 .await
1284 }
1285
1286 pub async fn subscribe_all_tickers(&self) -> Result<()> {
1288 self.client
1289 .subscribe("!ticker@arr".to_string(), None, None)
1290 .await
1291 }
1292
1293 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
1301 let stream = format!("{}@trade", symbol.to_lowercase());
1302 self.client
1303 .subscribe(stream, Some(symbol.to_string()), None)
1304 .await
1305 }
1306
1307 pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
1315 let stream = format!("{}@aggTrade", symbol.to_lowercase());
1316 self.client
1317 .subscribe(stream, Some(symbol.to_string()), None)
1318 .await
1319 }
1320
1321 pub async fn subscribe_orderbook(
1331 &self,
1332 symbol: &str,
1333 levels: u32,
1334 update_speed: &str,
1335 ) -> Result<()> {
1336 let stream = if update_speed == "100ms" {
1337 format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
1338 } else {
1339 format!("{}@depth{}", symbol.to_lowercase(), levels)
1340 };
1341
1342 self.client
1343 .subscribe(stream, Some(symbol.to_string()), None)
1344 .await
1345 }
1346
1347 pub async fn subscribe_orderbook_diff(
1356 &self,
1357 symbol: &str,
1358 update_speed: Option<&str>,
1359 ) -> Result<()> {
1360 let stream = if let Some(speed) = update_speed {
1361 if speed == "100ms" {
1362 format!("{}@depth@100ms", symbol.to_lowercase())
1363 } else {
1364 format!("{}@depth", symbol.to_lowercase())
1365 }
1366 } else {
1367 format!("{}@depth", symbol.to_lowercase())
1368 };
1369
1370 self.client
1371 .subscribe(stream, Some(symbol.to_string()), None)
1372 .await
1373 }
1374
1375 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
1384 let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
1385 self.client
1386 .subscribe(stream, Some(symbol.to_string()), None)
1387 .await
1388 }
1389
1390 pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
1398 let stream = format!("{}@miniTicker", symbol.to_lowercase());
1399 self.client
1400 .subscribe(stream, Some(symbol.to_string()), None)
1401 .await
1402 }
1403
1404 pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
1406 self.client
1407 .subscribe("!miniTicker@arr".to_string(), None, None)
1408 .await
1409 }
1410
1411 pub async fn unsubscribe(&self, stream: String) -> Result<()> {
1419 self.client.unsubscribe(stream, None).await
1420 }
1421
1422 pub async fn receive(&self) -> Option<Value> {
1427 self.client.receive().await
1428 }
1429
1430 pub async fn is_connected(&self) -> bool {
1432 self.client.is_connected().await
1433 }
1434
1435 async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1444 let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
1445
1446 self.client
1448 .subscribe(stream.clone(), Some(symbol.to_string()), None)
1449 .await?;
1450
1451 loop {
1453 if let Some(message) = self.client.receive().await {
1454 if message.get("result").is_some() {
1456 continue;
1457 }
1458
1459 if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1461 let mut tickers = self.tickers.lock().await;
1463 tickers.insert(ticker.symbol.clone(), ticker.clone());
1464
1465 return Ok(ticker);
1466 }
1467 }
1468 }
1469 }
1470
1471 async fn watch_tickers_internal(
1480 &self,
1481 symbols: Option<Vec<String>>,
1482 channel_name: &str,
1483 ) -> Result<HashMap<String, Ticker>> {
1484 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1485 syms.iter()
1487 .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1488 .collect()
1489 } else {
1490 vec![format!("!{}@arr", channel_name)]
1492 };
1493
1494 for stream in &streams {
1496 self.client.subscribe(stream.clone(), None, None).await?;
1497 }
1498
1499 let mut result = HashMap::new();
1501
1502 loop {
1503 if let Some(message) = self.client.receive().await {
1504 if message.get("result").is_some() {
1506 continue;
1507 }
1508
1509 if let Some(arr) = message.as_array() {
1511 for item in arr {
1512 if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1513 let symbol = ticker.symbol.clone();
1514
1515 if let Some(syms) = &symbols {
1517 if syms.contains(&symbol.to_lowercase()) {
1518 result.insert(symbol.clone(), ticker.clone());
1519 }
1520 } else {
1521 result.insert(symbol.clone(), ticker.clone());
1522 }
1523
1524 let mut tickers = self.tickers.lock().await;
1526 tickers.insert(symbol, ticker);
1527 }
1528 }
1529
1530 if let Some(syms) = &symbols {
1532 if result.len() == syms.len() {
1533 return Ok(result);
1534 }
1535 } else {
1536 return Ok(result);
1537 }
1538 } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1539 let symbol = ticker.symbol.clone();
1541 result.insert(symbol.clone(), ticker.clone());
1542
1543 let mut tickers = self.tickers.lock().await;
1545 tickers.insert(symbol, ticker);
1546
1547 if let Some(syms) = &symbols {
1549 if result.len() == syms.len() {
1550 return Ok(result);
1551 }
1552 }
1553 }
1554 }
1555 }
1556 }
1557
1558 async fn handle_orderbook_delta(
1568 &self,
1569 symbol: &str,
1570 delta_message: &Value,
1571 is_futures: bool,
1572 ) -> Result<()> {
1573 use ccxt_core::types::orderbook::{OrderBookDelta, OrderBookEntry};
1574 use rust_decimal::Decimal;
1575
1576 let first_update_id = delta_message["U"]
1578 .as_i64()
1579 .ok_or_else(|| Error::invalid_request("Missing first update ID in delta message"))?;
1580
1581 let final_update_id = delta_message["u"]
1582 .as_i64()
1583 .ok_or_else(|| Error::invalid_request("Missing final update ID in delta message"))?;
1584
1585 let prev_final_update_id = if is_futures {
1586 delta_message["pu"].as_i64()
1587 } else {
1588 None
1589 };
1590
1591 let timestamp = delta_message["E"]
1592 .as_i64()
1593 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1594
1595 let mut bids = Vec::new();
1597 if let Some(bids_arr) = delta_message["b"].as_array() {
1598 for bid in bids_arr {
1599 if let (Some(price_str), Some(amount_str)) = (bid[0].as_str(), bid[1].as_str()) {
1600 if let (Ok(price), Ok(amount)) =
1601 (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1602 {
1603 bids.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1604 }
1605 }
1606 }
1607 }
1608
1609 let mut asks = Vec::new();
1611 if let Some(asks_arr) = delta_message["a"].as_array() {
1612 for ask in asks_arr {
1613 if let (Some(price_str), Some(amount_str)) = (ask[0].as_str(), ask[1].as_str()) {
1614 if let (Ok(price), Ok(amount)) =
1615 (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1616 {
1617 asks.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1618 }
1619 }
1620 }
1621 }
1622
1623 let delta = OrderBookDelta {
1625 symbol: symbol.to_string(),
1626 first_update_id,
1627 final_update_id,
1628 prev_final_update_id,
1629 timestamp,
1630 bids,
1631 asks,
1632 };
1633
1634 let mut orderbooks = self.orderbooks.lock().await;
1636 let orderbook = orderbooks
1637 .entry(symbol.to_string())
1638 .or_insert_with(|| OrderBook::new(symbol.to_string(), timestamp));
1639
1640 if !orderbook.is_synced {
1642 orderbook.buffer_delta(delta);
1643 return Ok(());
1644 }
1645
1646 if let Err(e) = orderbook.apply_delta(&delta, is_futures) {
1648 if orderbook.needs_resync {
1650 tracing::warn!("Orderbook {} needs resync due to: {}", symbol, e);
1651 orderbook.buffer_delta(delta);
1653 return Err(Error::invalid_request(format!("RESYNC_NEEDED: {}", e)));
1655 }
1656 return Err(Error::invalid_request(e));
1657 }
1658
1659 Ok(())
1660 }
1661
1662 async fn fetch_orderbook_snapshot(
1673 &self,
1674 exchange: &Binance,
1675 symbol: &str,
1676 limit: Option<i64>,
1677 is_futures: bool,
1678 ) -> Result<OrderBook> {
1679 let mut params = HashMap::new();
1681 if let Some(l) = limit {
1682 #[allow(clippy::disallowed_methods)]
1684 let limit_value = serde_json::json!(l);
1685 params.insert("limit".to_string(), limit_value);
1686 }
1687
1688 let mut snapshot = exchange.fetch_order_book(symbol, None).await?;
1689
1690 snapshot.is_synced = true;
1692
1693 let mut orderbooks = self.orderbooks.lock().await;
1695 if let Some(cached_ob) = orderbooks.get_mut(symbol) {
1696 snapshot.buffered_deltas = cached_ob.buffered_deltas.clone();
1698
1699 if let Ok(processed) = snapshot.process_buffered_deltas(is_futures) {
1701 tracing::debug!("Processed {} buffered deltas for {}", processed, symbol);
1702 }
1703 }
1704
1705 orderbooks.insert(symbol.to_string(), snapshot.clone());
1707
1708 Ok(snapshot)
1709 }
1710
1711 async fn watch_orderbook_internal(
1723 &self,
1724 exchange: &Binance,
1725 symbol: &str,
1726 limit: Option<i64>,
1727 update_speed: i32,
1728 is_futures: bool,
1729 ) -> Result<OrderBook> {
1730 let stream = if update_speed == 100 {
1732 format!("{}@depth@100ms", symbol.to_lowercase())
1733 } else {
1734 format!("{}@depth", symbol.to_lowercase())
1735 };
1736
1737 self.client
1739 .subscribe(stream.clone(), Some(symbol.to_string()), None)
1740 .await?;
1741
1742 let snapshot_fetched = Arc::new(Mutex::new(false));
1744 let _snapshot_fetched_clone = snapshot_fetched.clone();
1745
1746 let _orderbooks_clone = self.orderbooks.clone();
1748 let _symbol_clone = symbol.to_string();
1749
1750 tokio::spawn(async move {
1751 });
1753
1754 tokio::time::sleep(Duration::from_millis(500)).await;
1756
1757 let _snapshot = self
1759 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1760 .await?;
1761
1762 *snapshot_fetched.lock().await = true;
1763
1764 loop {
1766 if let Some(message) = self.client.receive().await {
1767 if message.get("result").is_some() {
1769 continue;
1770 }
1771
1772 if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1774 if event_type == "depthUpdate" {
1775 match self
1776 .handle_orderbook_delta(symbol, &message, is_futures)
1777 .await
1778 {
1779 Ok(_) => {
1780 let orderbooks = self.orderbooks.lock().await;
1782 if let Some(ob) = orderbooks.get(symbol) {
1783 if ob.is_synced {
1784 return Ok(ob.clone());
1785 }
1786 }
1787 }
1788 Err(e) => {
1789 let err_msg = e.to_string();
1790
1791 if err_msg.contains("RESYNC_NEEDED") {
1793 tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1794
1795 let current_time = chrono::Utc::now().timestamp_millis();
1796 let should_resync = {
1797 let orderbooks = self.orderbooks.lock().await;
1798 if let Some(ob) = orderbooks.get(symbol) {
1799 ob.should_resync(current_time)
1800 } else {
1801 true
1802 }
1803 };
1804
1805 if should_resync {
1806 tracing::info!("Initiating resync for {}", symbol);
1807
1808 {
1810 let mut orderbooks = self.orderbooks.lock().await;
1811 if let Some(ob) = orderbooks.get_mut(symbol) {
1812 ob.reset_for_resync();
1813 ob.mark_resync_initiated(current_time);
1814 }
1815 }
1816
1817 tokio::time::sleep(Duration::from_millis(500)).await;
1819
1820 match self
1822 .fetch_orderbook_snapshot(
1823 exchange, symbol, limit, is_futures,
1824 )
1825 .await
1826 {
1827 Ok(_) => {
1828 tracing::info!(
1829 "Resync completed successfully for {}",
1830 symbol
1831 );
1832 continue;
1833 }
1834 Err(resync_err) => {
1835 tracing::error!(
1836 "Resync failed for {}: {}",
1837 symbol,
1838 resync_err
1839 );
1840 return Err(resync_err);
1841 }
1842 }
1843 } else {
1844 tracing::debug!(
1845 "Resync rate limited for {}, skipping",
1846 symbol
1847 );
1848 continue;
1849 }
1850 } else {
1851 tracing::error!(
1852 "Failed to handle orderbook delta: {}",
1853 err_msg
1854 );
1855 continue;
1856 }
1857 }
1858 }
1859 }
1860 }
1861 }
1862 }
1863 }
1864
1865 async fn watch_orderbooks_internal(
1877 &self,
1878 exchange: &Binance,
1879 symbols: Vec<String>,
1880 limit: Option<i64>,
1881 update_speed: i32,
1882 is_futures: bool,
1883 ) -> Result<HashMap<String, OrderBook>> {
1884 if symbols.len() > 200 {
1886 return Err(Error::invalid_request(
1887 "Binance supports max 200 symbols per connection",
1888 ));
1889 }
1890
1891 for symbol in &symbols {
1893 let stream = if update_speed == 100 {
1894 format!("{}@depth@100ms", symbol.to_lowercase())
1895 } else {
1896 format!("{}@depth", symbol.to_lowercase())
1897 };
1898
1899 self.client
1900 .subscribe(stream, Some(symbol.clone()), None)
1901 .await?;
1902 }
1903
1904 tokio::time::sleep(Duration::from_millis(500)).await;
1906
1907 for symbol in &symbols {
1909 let _ = self
1910 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1911 .await;
1912 }
1913
1914 let mut result = HashMap::new();
1916 let mut update_count = 0;
1917
1918 while update_count < symbols.len() {
1919 if let Some(message) = self.client.receive().await {
1920 if message.get("result").is_some() {
1922 continue;
1923 }
1924
1925 if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1927 if event_type == "depthUpdate" {
1928 if let Some(msg_symbol) = message.get("s").and_then(|v| v.as_str()) {
1929 if let Err(e) = self
1930 .handle_orderbook_delta(msg_symbol, &message, is_futures)
1931 .await
1932 {
1933 tracing::error!("Failed to handle orderbook delta: {}", e);
1934 continue;
1935 }
1936
1937 update_count += 1;
1938 }
1939 }
1940 }
1941 }
1942 }
1943
1944 let orderbooks = self.orderbooks.lock().await;
1946 for symbol in &symbols {
1947 if let Some(ob) = orderbooks.get(symbol) {
1948 result.insert(symbol.clone(), ob.clone());
1949 }
1950 }
1951
1952 Ok(result)
1953 }
1954
1955 pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1962 let tickers = self.tickers.lock().await;
1963 tickers.get(symbol).cloned()
1964 }
1965
1966 pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1968 let tickers = self.tickers.lock().await;
1969 tickers.clone()
1970 }
1971
1972 async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1981 use rust_decimal::Decimal;
1982 use std::str::FromStr;
1983
1984 let event_type = message
1986 .get("e")
1987 .and_then(|e| e.as_str())
1988 .ok_or_else(|| Error::invalid_request("Missing event type in balance message"))?;
1989
1990 let mut balances = self.balances.write().await;
1992 let balance = balances
1993 .entry(account_type.to_string())
1994 .or_insert_with(Balance::new);
1995
1996 match event_type {
1997 "balanceUpdate" => {
1999 let asset = message
2000 .get("a")
2001 .and_then(|a| a.as_str())
2002 .ok_or_else(|| Error::invalid_request("Missing asset in balanceUpdate"))?;
2003
2004 let delta_str = message
2005 .get("d")
2006 .and_then(|d| d.as_str())
2007 .ok_or_else(|| Error::invalid_request("Missing delta in balanceUpdate"))?;
2008
2009 let delta = Decimal::from_str(delta_str)
2010 .map_err(|e| Error::invalid_request(format!("Invalid delta value: {}", e)))?;
2011
2012 balance.apply_delta(asset.to_string(), delta);
2014 }
2015
2016 "outboundAccountPosition" => {
2018 if let Some(balances_array) = message.get("B").and_then(|b| b.as_array()) {
2019 for balance_item in balances_array {
2020 let asset =
2021 balance_item
2022 .get("a")
2023 .and_then(|a| a.as_str())
2024 .ok_or_else(|| {
2025 Error::invalid_request("Missing asset in balance item")
2026 })?;
2027
2028 let free_str = balance_item
2029 .get("f")
2030 .and_then(|f| f.as_str())
2031 .ok_or_else(|| Error::invalid_request("Missing free balance"))?;
2032
2033 let locked_str = balance_item
2034 .get("l")
2035 .and_then(|l| l.as_str())
2036 .ok_or_else(|| Error::invalid_request("Missing locked balance"))?;
2037
2038 let free = Decimal::from_str(free_str).map_err(|e| {
2039 Error::invalid_request(format!("Invalid free value: {}", e))
2040 })?;
2041
2042 let locked = Decimal::from_str(locked_str).map_err(|e| {
2043 Error::invalid_request(format!("Invalid locked value: {}", e))
2044 })?;
2045
2046 balance.update_balance(asset.to_string(), free, locked);
2048 }
2049 }
2050 }
2051
2052 "ACCOUNT_UPDATE" => {
2054 if let Some(account_data) = message.get("a") {
2055 if let Some(balances_array) = account_data.get("B").and_then(|b| b.as_array()) {
2057 for balance_item in balances_array {
2058 let asset = balance_item.get("a").and_then(|a| a.as_str()).ok_or_else(
2059 || Error::invalid_request("Missing asset in balance item"),
2060 )?;
2061
2062 let wallet_balance_str = balance_item
2063 .get("wb")
2064 .and_then(|wb| wb.as_str())
2065 .ok_or_else(|| Error::invalid_request("Missing wallet balance"))?;
2066
2067 let wallet_balance =
2068 Decimal::from_str(wallet_balance_str).map_err(|e| {
2069 Error::invalid_request(format!("Invalid wallet balance: {}", e))
2070 })?;
2071
2072 let cross_wallet = balance_item
2074 .get("cw")
2075 .and_then(|cw| cw.as_str())
2076 .and_then(|s| Decimal::from_str(s).ok());
2077
2078 balance.update_wallet(asset.to_string(), wallet_balance, cross_wallet);
2080 }
2081 }
2082
2083 }
2085 }
2086
2087 _ => {
2088 return Err(Error::invalid_request(format!(
2089 "Unknown balance event type: {}",
2090 event_type
2091 )));
2092 }
2093 }
2094
2095 Ok(())
2096 }
2097
2098 fn parse_ws_trade(&self, data: &Value) -> Result<Trade> {
2108 use ccxt_core::types::{Fee, OrderSide, OrderType, TakerOrMaker};
2109 use rust_decimal::Decimal;
2110 use std::str::FromStr;
2111
2112 let symbol = data
2114 .get("s")
2115 .and_then(|v| v.as_str())
2116 .ok_or_else(|| Error::invalid_request("Missing symbol field".to_string()))?
2117 .to_string();
2118
2119 let id = data
2121 .get("t")
2122 .and_then(|v| v.as_i64())
2123 .map(|v| v.to_string());
2124
2125 let timestamp = data.get("T").and_then(|v| v.as_i64()).unwrap_or(0);
2127
2128 let price = data
2130 .get("L")
2131 .and_then(|v| v.as_str())
2132 .and_then(|s| Decimal::from_str(s).ok())
2133 .unwrap_or(Decimal::ZERO);
2134
2135 let amount = data
2137 .get("l")
2138 .and_then(|v| v.as_str())
2139 .and_then(|s| Decimal::from_str(s).ok())
2140 .unwrap_or(Decimal::ZERO);
2141
2142 let cost = data
2144 .get("Y")
2145 .and_then(|v| v.as_str())
2146 .and_then(|s| Decimal::from_str(s).ok())
2147 .or_else(|| {
2148 if price > Decimal::ZERO && amount > Decimal::ZERO {
2150 Some(price * amount)
2151 } else {
2152 None
2153 }
2154 });
2155
2156 let side = data
2158 .get("S")
2159 .and_then(|v| v.as_str())
2160 .and_then(|s| match s.to_uppercase().as_str() {
2161 "BUY" => Some(OrderSide::Buy),
2162 "SELL" => Some(OrderSide::Sell),
2163 _ => None,
2164 })
2165 .unwrap_or(OrderSide::Buy);
2166
2167 let trade_type =
2169 data.get("o")
2170 .and_then(|v| v.as_str())
2171 .and_then(|s| match s.to_uppercase().as_str() {
2172 "LIMIT" => Some(OrderType::Limit),
2173 "MARKET" => Some(OrderType::Market),
2174 _ => None,
2175 });
2176
2177 let order_id = data
2179 .get("i")
2180 .and_then(|v| v.as_i64())
2181 .map(|v| v.to_string());
2182
2183 let taker_or_maker = data.get("m").and_then(|v| v.as_bool()).map(|is_maker| {
2185 if is_maker {
2186 TakerOrMaker::Maker
2187 } else {
2188 TakerOrMaker::Taker
2189 }
2190 });
2191
2192 let fee = if let Some(fee_cost_str) = data.get("n").and_then(|v| v.as_str()) {
2194 if let Ok(fee_cost) = Decimal::from_str(fee_cost_str) {
2195 let currency = data
2196 .get("N")
2197 .and_then(|v| v.as_str())
2198 .unwrap_or("UNKNOWN")
2199 .to_string();
2200 Some(Fee {
2201 currency,
2202 cost: fee_cost,
2203 rate: None,
2204 })
2205 } else {
2206 None
2207 }
2208 } else {
2209 None
2210 };
2211
2212 let datetime = chrono::DateTime::from_timestamp_millis(timestamp)
2214 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string());
2215
2216 let mut info = HashMap::new();
2218 if let Value::Object(map) = data {
2219 for (k, v) in map.iter() {
2220 info.insert(k.clone(), v.clone());
2221 }
2222 }
2223
2224 Ok(Trade {
2225 id,
2226 order: order_id,
2227 symbol,
2228 trade_type,
2229 side,
2230 taker_or_maker,
2231 price: Price::from(price),
2232 amount: Amount::from(amount),
2233 cost: cost.map(Cost::from),
2234 fee,
2235 timestamp,
2236 datetime,
2237 info,
2238 })
2239 }
2240
2241 async fn filter_my_trades(
2248 &self,
2249 symbol: Option<&str>,
2250 since: Option<i64>,
2251 limit: Option<usize>,
2252 ) -> Result<Vec<Trade>> {
2253 let trades_map = self.my_trades.read().await;
2254
2255 let mut trades: Vec<Trade> = if let Some(sym) = symbol {
2257 trades_map
2258 .get(sym)
2259 .map(|symbol_trades| symbol_trades.iter().cloned().collect())
2260 .unwrap_or_default()
2261 } else {
2262 trades_map
2263 .values()
2264 .flat_map(|symbol_trades| symbol_trades.iter().cloned())
2265 .collect()
2266 };
2267
2268 if let Some(since_ts) = since {
2270 trades.retain(|trade| trade.timestamp >= since_ts);
2271 }
2272
2273 trades.sort_by(|a, b| {
2275 let ts_a = a.timestamp;
2276 let ts_b = b.timestamp;
2277 ts_b.cmp(&ts_a)
2278 });
2279
2280 if let Some(lim) = limit {
2282 trades.truncate(lim);
2283 }
2284
2285 Ok(trades)
2286 }
2287
2288 async fn parse_ws_position(&self, data: &Value) -> Result<Position> {
2311 let symbol = data["s"]
2313 .as_str()
2314 .ok_or_else(|| Error::invalid_request("Missing symbol field"))?
2315 .to_string();
2316
2317 let position_amount_str = data["pa"]
2318 .as_str()
2319 .ok_or_else(|| Error::invalid_request("Missing position amount"))?;
2320
2321 let position_amount = position_amount_str
2322 .parse::<f64>()
2323 .map_err(|e| Error::invalid_request(format!("Invalid position amount: {}", e)))?;
2324
2325 let position_side = data["ps"]
2327 .as_str()
2328 .ok_or_else(|| Error::invalid_request("Missing position side"))?
2329 .to_uppercase();
2330
2331 let (side, hedged) = if position_side == "BOTH" {
2335 let actual_side = if position_amount < 0.0 {
2336 "short"
2337 } else {
2338 "long"
2339 };
2340 (actual_side.to_string(), false)
2341 } else {
2342 (position_side.to_lowercase(), true)
2343 };
2344
2345 let entry_price = data["ep"].as_str().and_then(|s| s.parse::<f64>().ok());
2347 let unrealized_pnl = data["up"].as_str().and_then(|s| s.parse::<f64>().ok());
2348 let realized_pnl = data["cr"].as_str().and_then(|s| s.parse::<f64>().ok());
2349 let margin_mode = data["mt"].as_str().map(|s| s.to_string());
2350 let initial_margin = data["iw"].as_str().and_then(|s| s.parse::<f64>().ok());
2351 let _margin_asset = data["ma"].as_str().map(|s| s.to_string());
2352
2353 Ok(Position {
2355 info: data.clone(),
2356 id: None,
2357 symbol,
2358 side: Some(side),
2359 contracts: Some(position_amount.abs()), contract_size: None,
2361 entry_price,
2362 mark_price: None,
2363 notional: None,
2364 leverage: None,
2365 collateral: initial_margin, initial_margin,
2367 initial_margin_percentage: None,
2368 maintenance_margin: None,
2369 maintenance_margin_percentage: None,
2370 unrealized_pnl,
2371 realized_pnl,
2372 liquidation_price: None,
2373 margin_ratio: None,
2374 margin_mode,
2375 hedged: Some(hedged),
2376 percentage: None,
2377 position_side: None,
2378 dual_side_position: None,
2379 timestamp: Some(chrono::Utc::now().timestamp_millis() as u64),
2380 datetime: Some(chrono::Utc::now().to_rfc3339()),
2381 })
2382 }
2383
2384 async fn filter_positions(
2394 &self,
2395 symbols: Option<&[String]>,
2396 since: Option<i64>,
2397 limit: Option<usize>,
2398 ) -> Result<Vec<Position>> {
2399 let positions_map = self.positions.read().await;
2400
2401 let mut positions: Vec<Position> = if let Some(syms) = symbols {
2403 syms.iter()
2404 .filter_map(|sym| positions_map.get(sym))
2405 .flat_map(|side_map| side_map.values().cloned())
2406 .collect()
2407 } else {
2408 positions_map
2409 .values()
2410 .flat_map(|side_map| side_map.values().cloned())
2411 .collect()
2412 };
2413
2414 if let Some(since_ts) = since {
2416 positions.retain(|pos| {
2417 pos.timestamp
2418 .map(|ts| ts as i64 >= since_ts)
2419 .unwrap_or(false)
2420 });
2421 }
2422
2423 positions.sort_by(|a, b| {
2425 let ts_a = a.timestamp.unwrap_or(0);
2426 let ts_b = b.timestamp.unwrap_or(0);
2427 ts_b.cmp(&ts_a)
2428 });
2429
2430 if let Some(lim) = limit {
2432 positions.truncate(lim);
2433 }
2434
2435 Ok(positions)
2436 }
2437}
2438
2439impl Binance {
2440 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
2448 let ws = self.create_ws();
2449 ws.connect().await?;
2450
2451 let binance_symbol = symbol.replace('/', "").to_lowercase();
2453 ws.subscribe_ticker(&binance_symbol).await
2454 }
2455
2456 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
2464 let ws = self.create_ws();
2465 ws.connect().await?;
2466
2467 let binance_symbol = symbol.replace('/', "").to_lowercase();
2468 ws.subscribe_trades(&binance_symbol).await
2469 }
2470
2471 pub async fn subscribe_orderbook(&self, symbol: &str, levels: Option<u32>) -> Result<()> {
2480 let ws = self.create_ws();
2481 ws.connect().await?;
2482
2483 let binance_symbol = symbol.replace('/', "").to_lowercase();
2484 let depth_levels = levels.unwrap_or(20);
2485 ws.subscribe_orderbook(&binance_symbol, depth_levels, "1000ms")
2486 .await
2487 }
2488
2489 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
2498 let ws = self.create_ws();
2499 ws.connect().await?;
2500
2501 let binance_symbol = symbol.replace('/', "").to_lowercase();
2502 ws.subscribe_kline(&binance_symbol, interval).await
2503 }
2504
2505 pub async fn watch_ticker(
2526 &self,
2527 symbol: &str,
2528 params: Option<HashMap<String, Value>>,
2529 ) -> Result<Ticker> {
2530 self.load_markets(false).await?;
2532
2533 let market = self.base.market(symbol).await?;
2535 let binance_symbol = market.id.to_lowercase();
2536
2537 let channel_name = if let Some(p) = ¶ms {
2539 p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2540 } else {
2541 "ticker"
2542 };
2543
2544 let ws = self.create_ws();
2546 ws.connect().await?;
2547
2548 ws.watch_ticker_internal(&binance_symbol, channel_name)
2550 .await
2551 }
2552
2553 pub async fn watch_tickers(
2581 &self,
2582 symbols: Option<Vec<String>>,
2583 params: Option<HashMap<String, Value>>,
2584 ) -> Result<HashMap<String, Ticker>> {
2585 self.load_markets(false).await?;
2587
2588 let channel_name = if let Some(p) = ¶ms {
2590 p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2591 } else {
2592 "ticker"
2593 };
2594
2595 if channel_name == "bookTicker" {
2597 return Err(Error::invalid_request(
2598 "To subscribe for bids-asks, use watch_bids_asks() method instead",
2599 ));
2600 }
2601
2602 let binance_symbols = if let Some(syms) = symbols {
2604 let mut result = Vec::new();
2605 for symbol in syms {
2606 let market = self.base.market(&symbol).await?;
2607 result.push(market.id.to_lowercase());
2608 }
2609 Some(result)
2610 } else {
2611 None
2612 };
2613
2614 let ws = self.create_ws();
2616 ws.connect().await?;
2617
2618 ws.watch_tickers_internal(binance_symbols, channel_name)
2620 .await
2621 }
2622
2623 pub async fn watch_mark_price(
2652 &self,
2653 symbol: &str,
2654 params: Option<HashMap<String, Value>>,
2655 ) -> Result<Ticker> {
2656 self.load_markets(false).await?;
2658
2659 let market = self.base.market(symbol).await?;
2661 if market.market_type != MarketType::Swap && market.market_type != MarketType::Futures {
2662 return Err(Error::invalid_request(format!(
2663 "watch_mark_price() does not support {} markets",
2664 market.market_type
2665 )));
2666 }
2667
2668 let binance_symbol = market.id.to_lowercase();
2669
2670 let use_1s_freq = if let Some(p) = ¶ms {
2672 p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2673 } else {
2674 true
2675 };
2676
2677 let channel_name = if use_1s_freq {
2679 "markPrice@1s"
2680 } else {
2681 "markPrice"
2682 };
2683
2684 let ws = self.create_ws();
2686 ws.connect().await?;
2687
2688 ws.watch_ticker_internal(&binance_symbol, channel_name)
2690 .await
2691 }
2692
2693 pub async fn watch_order_book(
2729 &self,
2730 symbol: &str,
2731 limit: Option<i64>,
2732 params: Option<HashMap<String, Value>>,
2733 ) -> Result<OrderBook> {
2734 self.load_markets(false).await?;
2736
2737 let market = self.base.market(symbol).await?;
2739 let binance_symbol = market.id.to_lowercase();
2740
2741 let is_futures =
2743 market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2744
2745 let update_speed = if let Some(p) = ¶ms {
2747 p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2748 } else {
2749 100
2750 };
2751
2752 if update_speed != 100 && update_speed != 1000 {
2754 return Err(Error::invalid_request(
2755 "Update speed must be 100 or 1000 milliseconds",
2756 ));
2757 }
2758
2759 let ws = self.create_ws();
2761 ws.connect().await?;
2762
2763 ws.watch_orderbook_internal(self, &binance_symbol, limit, update_speed, is_futures)
2765 .await
2766 }
2767
2768 pub async fn watch_order_books(
2798 &self,
2799 symbols: Vec<String>,
2800 limit: Option<i64>,
2801 params: Option<HashMap<String, Value>>,
2802 ) -> Result<HashMap<String, OrderBook>> {
2803 if symbols.is_empty() {
2805 return Err(Error::invalid_request("Symbols list cannot be empty"));
2806 }
2807
2808 if symbols.len() > 200 {
2809 return Err(Error::invalid_request(
2810 "Binance supports max 200 symbols per connection",
2811 ));
2812 }
2813
2814 self.load_markets(false).await?;
2816
2817 let mut binance_symbols = Vec::new();
2819 let mut is_futures = false;
2820
2821 for symbol in &symbols {
2822 let market = self.base.market(symbol).await?;
2823 binance_symbols.push(market.id.to_lowercase());
2824
2825 let current_is_futures =
2826 market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2827 if !binance_symbols.is_empty() && current_is_futures != is_futures {
2828 return Err(Error::invalid_request(
2829 "Cannot mix spot and futures markets in watch_order_books",
2830 ));
2831 }
2832 is_futures = current_is_futures;
2833 }
2834
2835 let update_speed = if let Some(p) = ¶ms {
2837 p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2838 } else {
2839 100
2840 };
2841
2842 let ws = self.create_ws();
2844 ws.connect().await?;
2845
2846 ws.watch_orderbooks_internal(self, binance_symbols, limit, update_speed, is_futures)
2848 .await
2849 }
2850
2851 pub async fn watch_mark_prices(
2861 &self,
2862 symbols: Option<Vec<String>>,
2863 params: Option<HashMap<String, Value>>,
2864 ) -> Result<HashMap<String, Ticker>> {
2865 self.load_markets(false).await?;
2867
2868 let use_1s_freq = if let Some(p) = ¶ms {
2870 p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2871 } else {
2872 true
2873 };
2874
2875 let channel_name = if use_1s_freq {
2877 "markPrice@1s"
2878 } else {
2879 "markPrice"
2880 };
2881
2882 let binance_symbols = if let Some(syms) = symbols {
2884 let mut result = Vec::new();
2885 for symbol in syms {
2886 let market = self.base.market(&symbol).await?;
2887 if market.market_type != MarketType::Swap
2888 && market.market_type != MarketType::Futures
2889 {
2890 return Err(Error::invalid_request(format!(
2891 "watch_mark_prices() does not support {} markets",
2892 market.market_type
2893 )));
2894 }
2895 result.push(market.id.to_lowercase());
2896 }
2897 Some(result)
2898 } else {
2899 None
2900 };
2901
2902 let ws = self.create_ws();
2904 ws.connect().await?;
2905
2906 ws.watch_tickers_internal(binance_symbols, channel_name)
2908 .await
2909 }
2910 pub async fn watch_trades(
2920 &self,
2921 symbol: &str,
2922 since: Option<i64>,
2923 limit: Option<usize>,
2924 ) -> Result<Vec<Trade>> {
2925 self.base.load_markets(false).await?;
2927
2928 let market = self.base.market(symbol).await?;
2930 let binance_symbol = market.id.to_lowercase();
2931
2932 let ws = self.create_ws();
2934 ws.connect().await?;
2935
2936 ws.subscribe_trades(&binance_symbol).await?;
2938
2939 let mut retries = 0;
2941 const MAX_RETRIES: u32 = 50;
2942
2943 while retries < MAX_RETRIES {
2944 if let Some(msg) = ws.client.receive().await {
2945 if msg.get("result").is_some() || msg.get("id").is_some() {
2947 continue;
2948 }
2949
2950 if let Ok(trade) = parser::parse_ws_trade(&msg, Some(&market)) {
2952 let mut trades_map = ws.trades.lock().await;
2954 let trades = trades_map
2955 .entry(symbol.to_string())
2956 .or_insert_with(VecDeque::new);
2957
2958 const MAX_TRADES: usize = 1000;
2960 if trades.len() >= MAX_TRADES {
2961 trades.pop_front();
2962 }
2963 trades.push_back(trade);
2964
2965 let mut result: Vec<Trade> = trades.iter().cloned().collect();
2967
2968 if let Some(since_ts) = since {
2970 result.retain(|t| t.timestamp >= since_ts);
2971 }
2972
2973 if let Some(limit_size) = limit {
2975 if result.len() > limit_size {
2976 result = result.split_off(result.len() - limit_size);
2977 }
2978 }
2979
2980 return Ok(result);
2981 }
2982 }
2983
2984 retries += 1;
2985 tokio::time::sleep(Duration::from_millis(100)).await;
2986 }
2987
2988 Err(Error::network("Timeout waiting for trade data"))
2989 }
2990
2991 pub async fn watch_ohlcv(
3002 &self,
3003 symbol: &str,
3004 timeframe: &str,
3005 since: Option<i64>,
3006 limit: Option<usize>,
3007 ) -> Result<Vec<OHLCV>> {
3008 self.base.load_markets(false).await?;
3010
3011 let market = self.base.market(symbol).await?;
3013 let binance_symbol = market.id.to_lowercase();
3014
3015 let ws = self.create_ws();
3017 ws.connect().await?;
3018
3019 ws.subscribe_kline(&binance_symbol, timeframe).await?;
3021
3022 let mut retries = 0;
3024 const MAX_RETRIES: u32 = 50;
3025
3026 while retries < MAX_RETRIES {
3027 if let Some(msg) = ws.client.receive().await {
3028 if msg.get("result").is_some() || msg.get("id").is_some() {
3030 continue;
3031 }
3032
3033 if let Ok(ohlcv) = parser::parse_ws_ohlcv(&msg) {
3035 let cache_key = format!("{}:{}", symbol, timeframe);
3037 let mut ohlcvs_map = ws.ohlcvs.lock().await;
3038 let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
3039
3040 const MAX_OHLCVS: usize = 1000;
3042 if ohlcvs.len() >= MAX_OHLCVS {
3043 ohlcvs.pop_front();
3044 }
3045 ohlcvs.push_back(ohlcv);
3046
3047 let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
3049
3050 if let Some(since_ts) = since {
3052 result.retain(|o| o.timestamp >= since_ts);
3053 }
3054
3055 if let Some(limit_size) = limit {
3057 if result.len() > limit_size {
3058 result = result.split_off(result.len() - limit_size);
3059 }
3060 }
3061
3062 return Ok(result);
3063 }
3064 }
3065
3066 retries += 1;
3067 tokio::time::sleep(Duration::from_millis(100)).await;
3068 }
3069
3070 Err(Error::network("Timeout waiting for OHLCV data"))
3071 }
3072
3073 pub async fn watch_bids_asks(&self, symbol: &str) -> Result<BidAsk> {
3081 self.base.load_markets(false).await?;
3083
3084 let market = self.base.market(symbol).await?;
3086 let binance_symbol = market.id.to_lowercase();
3087
3088 let ws = self.create_ws();
3090 ws.connect().await?;
3091
3092 let stream_name = format!("{}@bookTicker", binance_symbol);
3094 ws.client
3095 .subscribe(stream_name, Some(symbol.to_string()), None)
3096 .await?;
3097
3098 let mut retries = 0;
3100 const MAX_RETRIES: u32 = 50;
3101
3102 while retries < MAX_RETRIES {
3103 if let Some(msg) = ws.client.receive().await {
3104 if msg.get("result").is_some() || msg.get("id").is_some() {
3106 continue;
3107 }
3108
3109 if let Ok(bid_ask) = parser::parse_ws_bid_ask(&msg) {
3111 let mut bids_asks_map = ws.bids_asks.lock().await;
3113 bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
3114
3115 return Ok(bid_ask);
3116 }
3117 }
3118
3119 retries += 1;
3120 tokio::time::sleep(Duration::from_millis(100)).await;
3121 }
3122
3123 Err(Error::network("Timeout waiting for BidAsk data"))
3124 }
3125
3126 pub async fn watch_balance(
3161 self: Arc<Self>,
3162 params: Option<HashMap<String, Value>>,
3163 ) -> Result<Balance> {
3164 self.base.load_markets(false).await?;
3166
3167 let account_type = if let Some(p) = ¶ms {
3169 p.get("type")
3170 .and_then(|v| v.as_str())
3171 .unwrap_or(&self.options.default_type)
3172 } else {
3173 &self.options.default_type
3174 };
3175
3176 let fetch_snapshot = if let Some(p) = ¶ms {
3178 p.get("fetchBalanceSnapshot")
3179 .and_then(|v| v.as_bool())
3180 .unwrap_or(false)
3181 } else {
3182 false
3183 };
3184
3185 let await_snapshot = if let Some(p) = ¶ms {
3186 p.get("awaitBalanceSnapshot")
3187 .and_then(|v| v.as_bool())
3188 .unwrap_or(true)
3189 } else {
3190 true
3191 };
3192
3193 let ws = self.create_authenticated_ws();
3195 ws.connect().await?;
3196
3197 if fetch_snapshot {
3199 let snapshot = self.fetch_balance(params.clone()).await?;
3200
3201 let mut balances = ws.balances.write().await;
3203 balances.insert(account_type.to_string(), snapshot.clone());
3204
3205 if !await_snapshot {
3206 return Ok(snapshot);
3207 }
3208 }
3209
3210 let mut retries = 0;
3212 const MAX_RETRIES: u32 = 100;
3213
3214 while retries < MAX_RETRIES {
3215 if let Some(msg) = ws.client.receive().await {
3216 if msg.get("result").is_some() || msg.get("id").is_some() {
3218 continue;
3219 }
3220
3221 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3223 match event_type {
3225 "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE" => {
3226 if let Ok(()) = ws.handle_balance_message(&msg, account_type).await {
3228 let balances = ws.balances.read().await;
3230 if let Some(balance) = balances.get(account_type) {
3231 return Ok(balance.clone());
3232 }
3233 }
3234 }
3235 _ => {}
3236 }
3237 }
3238 }
3239
3240 retries += 1;
3241 tokio::time::sleep(Duration::from_millis(100)).await;
3242 }
3243
3244 Err(Error::network("Timeout waiting for balance data"))
3245 }
3246
3247 pub async fn watch_orders(
3280 self: Arc<Self>,
3281 symbol: Option<&str>,
3282 since: Option<i64>,
3283 limit: Option<usize>,
3284 _params: Option<HashMap<String, Value>>,
3285 ) -> Result<Vec<Order>> {
3286 self.base.load_markets(false).await?;
3287
3288 let ws = self.create_authenticated_ws();
3289 ws.connect().await?;
3290
3291 loop {
3293 if let Some(msg) = ws.client.receive().await {
3294 if let Value::Object(data) = msg {
3295 if let Some(event_type) = data.get("e").and_then(|v| v.as_str()) {
3296 match event_type {
3297 "executionReport" => {
3298 let order = self.parse_ws_order(&data)?;
3300
3301 let mut orders = ws.orders.write().await;
3303 let symbol_orders = orders
3304 .entry(order.symbol.clone())
3305 .or_insert_with(HashMap::new);
3306 symbol_orders.insert(order.id.clone(), order.clone());
3307 drop(orders);
3308
3309 if let Some(exec_type) = data.get("x").and_then(|v| v.as_str()) {
3311 if exec_type == "TRADE" {
3312 if let Ok(trade) =
3314 ws.parse_ws_trade(&Value::Object(data.clone()))
3315 {
3316 let mut trades = ws.my_trades.write().await;
3318 let symbol_trades = trades
3319 .entry(trade.symbol.clone())
3320 .or_insert_with(VecDeque::new);
3321
3322 symbol_trades.push_front(trade);
3324 if symbol_trades.len() > 1000 {
3325 symbol_trades.pop_back();
3326 }
3327 }
3328 }
3329 }
3330
3331 return self.filter_orders(&ws, symbol, since, limit).await;
3333 }
3334 _ => continue,
3335 }
3336 }
3337 }
3338 } else {
3339 tokio::time::sleep(Duration::from_millis(100)).await;
3340 }
3341 }
3342 }
3343
3344 fn parse_ws_order(&self, data: &serde_json::Map<String, Value>) -> Result<Order> {
3346 use ccxt_core::types::{OrderSide, OrderStatus, OrderType};
3347 use rust_decimal::Decimal;
3348 use std::str::FromStr;
3349
3350 let symbol = data.get("s").and_then(|v| v.as_str()).unwrap_or("");
3352 let order_id = data
3353 .get("i")
3354 .and_then(|v| v.as_i64())
3355 .map(|id| id.to_string())
3356 .unwrap_or_default();
3357 let client_order_id = data.get("c").and_then(|v| v.as_str()).map(String::from);
3358
3359 let status_str = data.get("X").and_then(|v| v.as_str()).unwrap_or("NEW");
3361 let status = match status_str {
3362 "NEW" => OrderStatus::Open,
3363 "PARTIALLY_FILLED" => OrderStatus::Open,
3364 "FILLED" => OrderStatus::Closed,
3365 "CANCELED" => OrderStatus::Canceled,
3366 "REJECTED" => OrderStatus::Rejected,
3367 "EXPIRED" => OrderStatus::Expired,
3368 _ => OrderStatus::Open,
3369 };
3370
3371 let side_str = data.get("S").and_then(|v| v.as_str()).unwrap_or("BUY");
3373 let side = match side_str {
3374 "BUY" => OrderSide::Buy,
3375 "SELL" => OrderSide::Sell,
3376 _ => OrderSide::Buy,
3377 };
3378
3379 let type_str = data.get("o").and_then(|v| v.as_str()).unwrap_or("LIMIT");
3381 let order_type = match type_str {
3382 "LIMIT" => OrderType::Limit,
3383 "MARKET" => OrderType::Market,
3384 "STOP_LOSS" => OrderType::StopLoss,
3385 "STOP_LOSS_LIMIT" => OrderType::StopLossLimit,
3386 "TAKE_PROFIT" => OrderType::TakeProfit,
3387 "TAKE_PROFIT_LIMIT" => OrderType::TakeProfitLimit,
3388 "LIMIT_MAKER" => OrderType::LimitMaker,
3389 _ => OrderType::Limit,
3390 };
3391
3392 let amount = data
3394 .get("q")
3395 .and_then(|v| v.as_str())
3396 .and_then(|s| Decimal::from_str(s).ok())
3397 .unwrap_or(Decimal::ZERO);
3398
3399 let price = data
3400 .get("p")
3401 .and_then(|v| v.as_str())
3402 .and_then(|s| Decimal::from_str(s).ok());
3403
3404 let filled = data
3405 .get("z")
3406 .and_then(|v| v.as_str())
3407 .and_then(|s| Decimal::from_str(s).ok());
3408
3409 let cost = data
3410 .get("Z")
3411 .and_then(|v| v.as_str())
3412 .and_then(|s| Decimal::from_str(s).ok());
3413
3414 let remaining = match filled {
3416 Some(fill) => Some(amount - fill),
3417 None => None,
3418 };
3419
3420 let average = match (filled, cost) {
3422 (Some(fill), Some(c)) if fill > Decimal::ZERO && c > Decimal::ZERO => Some(c / fill),
3423 _ => None,
3424 };
3425
3426 let timestamp = data.get("T").and_then(|v| v.as_i64());
3428 let last_trade_timestamp = data.get("T").and_then(|v| v.as_i64());
3429
3430 Ok(Order {
3431 id: order_id,
3432 client_order_id,
3433 timestamp,
3434 datetime: timestamp.map(|ts| {
3435 chrono::DateTime::from_timestamp_millis(ts)
3436 .map(|dt| dt.to_rfc3339())
3437 .unwrap_or_default()
3438 }),
3439 last_trade_timestamp,
3440 symbol: symbol.to_string(),
3441 order_type,
3442 side,
3443 price,
3444 average,
3445 amount,
3446 cost,
3447 filled,
3448 remaining,
3449 status,
3450 fee: None,
3451 fees: None,
3452 trades: None,
3453 time_in_force: data.get("f").and_then(|v| v.as_str()).map(String::from),
3454 post_only: None,
3455 reduce_only: None,
3456 stop_price: data
3457 .get("P")
3458 .and_then(|v| v.as_str())
3459 .and_then(|s| Decimal::from_str(s).ok()),
3460 trigger_price: None,
3461 take_profit_price: None,
3462 stop_loss_price: None,
3463 trailing_delta: None,
3464 trailing_percent: None,
3465 activation_price: None,
3466 callback_rate: None,
3467 working_type: data.get("wt").and_then(|v| v.as_str()).map(String::from),
3468 info: data.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
3469 })
3470 }
3471
3472 async fn filter_orders(
3474 &self,
3475 ws: &BinanceWs,
3476 symbol: Option<&str>,
3477 since: Option<i64>,
3478 limit: Option<usize>,
3479 ) -> Result<Vec<Order>> {
3480 let orders_map = ws.orders.read().await;
3481
3482 let mut orders: Vec<Order> = if let Some(sym) = symbol {
3484 orders_map
3485 .get(sym)
3486 .map(|symbol_orders| symbol_orders.values().cloned().collect())
3487 .unwrap_or_default()
3488 } else {
3489 orders_map
3490 .values()
3491 .flat_map(|symbol_orders| symbol_orders.values().cloned())
3492 .collect()
3493 };
3494
3495 if let Some(since_ts) = since {
3497 orders.retain(|order| order.timestamp.map_or(false, |ts| ts >= since_ts));
3498 }
3499
3500 orders.sort_by(|a, b| {
3502 let ts_a = a.timestamp.unwrap_or(0);
3503 let ts_b = b.timestamp.unwrap_or(0);
3504 ts_b.cmp(&ts_a)
3505 });
3506
3507 if let Some(lim) = limit {
3509 orders.truncate(lim);
3510 }
3511
3512 Ok(orders)
3513 }
3514
3515 pub async fn watch_my_trades(
3542 self: Arc<Self>,
3543 symbol: Option<&str>,
3544 since: Option<i64>,
3545 limit: Option<usize>,
3546 _params: Option<HashMap<String, Value>>,
3547 ) -> Result<Vec<Trade>> {
3548 let ws = self.create_authenticated_ws();
3550 ws.connect().await?;
3551
3552 let mut retries = 0;
3554 const MAX_RETRIES: u32 = 100;
3555
3556 while retries < MAX_RETRIES {
3557 if let Some(msg) = ws.client.receive().await {
3558 if msg.get("result").is_some() || msg.get("id").is_some() {
3560 continue;
3561 }
3562
3563 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3565 if event_type == "executionReport" {
3567 if let Ok(trade) = ws.parse_ws_trade(&msg) {
3568 let symbol_key = trade.symbol.clone();
3569
3570 let mut trades_map = ws.my_trades.write().await;
3572 let symbol_trades =
3573 trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
3574
3575 symbol_trades.push_front(trade);
3577 if symbol_trades.len() > 1000 {
3578 symbol_trades.pop_back();
3579 }
3580 }
3581 }
3582 }
3583 } else {
3584 tokio::time::sleep(Duration::from_millis(100)).await;
3585 }
3586
3587 retries += 1;
3588 }
3589
3590 ws.filter_my_trades(symbol, since, limit).await
3592 }
3593
3594 pub async fn watch_positions(
3658 self: Arc<Self>,
3659 symbols: Option<Vec<String>>,
3660 since: Option<i64>,
3661 limit: Option<usize>,
3662 _params: Option<HashMap<String, Value>>,
3663 ) -> Result<Vec<Position>> {
3664 let ws = self.create_authenticated_ws();
3666 ws.connect().await?;
3667
3668 let mut retries = 0;
3670 const MAX_RETRIES: u32 = 100;
3671
3672 while retries < MAX_RETRIES {
3673 if let Some(msg) = ws.client.receive().await {
3674 if msg.get("result").is_some() || msg.get("id").is_some() {
3676 continue;
3677 }
3678
3679 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3681 if event_type == "ACCOUNT_UPDATE" {
3682 if let Some(account_data) = msg.get("a") {
3683 if let Some(positions_array) =
3684 account_data.get("P").and_then(|p| p.as_array())
3685 {
3686 for position_data in positions_array {
3687 if let Ok(position) = ws.parse_ws_position(position_data).await
3688 {
3689 let symbol_key = position.symbol.clone();
3690 let side_key = position
3691 .side
3692 .clone()
3693 .unwrap_or_else(|| "both".to_string());
3694
3695 let mut positions_map = ws.positions.write().await;
3697 let symbol_positions = positions_map
3698 .entry(symbol_key)
3699 .or_insert_with(HashMap::new);
3700
3701 if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
3703 symbol_positions.remove(&side_key);
3704 if symbol_positions.is_empty() {
3705 positions_map.remove(&position.symbol);
3706 }
3707 } else {
3708 symbol_positions.insert(side_key, position);
3709 }
3710 }
3711 }
3712 }
3713 }
3714 }
3715 }
3716 } else {
3717 tokio::time::sleep(Duration::from_millis(100)).await;
3718 }
3719
3720 retries += 1;
3721 }
3722
3723 let symbols_ref = symbols.as_ref().map(|v| v.as_slice());
3725 ws.filter_positions(symbols_ref, since, limit).await
3726 }
3727}
3728
3729#[cfg(test)]
3730mod tests {
3731 use super::*;
3732
3733 #[test]
3734 fn test_binance_ws_creation() {
3735 let ws = BinanceWs::new(WS_BASE_URL.to_string());
3736 assert!(ws.listen_key.try_read().is_ok());
3738 }
3739
3740 #[test]
3741 fn test_stream_format() {
3742 let symbol = "btcusdt";
3743
3744 let ticker_stream = format!("{}@ticker", symbol);
3746 assert_eq!(ticker_stream, "btcusdt@ticker");
3747
3748 let trade_stream = format!("{}@trade", symbol);
3750 assert_eq!(trade_stream, "btcusdt@trade");
3751
3752 let depth_stream = format!("{}@depth20", symbol);
3754 assert_eq!(depth_stream, "btcusdt@depth20");
3755
3756 let kline_stream = format!("{}@kline_1m", symbol);
3758 assert_eq!(kline_stream, "btcusdt@kline_1m");
3759 }
3760
3761 #[tokio::test]
3762 async fn test_subscription_manager_basic() {
3763 let manager = SubscriptionManager::new();
3764 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
3765
3766 assert_eq!(manager.active_count(), 0);
3768 assert!(!manager.has_subscription("btcusdt@ticker").await);
3769
3770 manager
3772 .add_subscription(
3773 "btcusdt@ticker".to_string(),
3774 "BTCUSDT".to_string(),
3775 SubscriptionType::Ticker,
3776 tx.clone(),
3777 )
3778 .await
3779 .unwrap();
3780
3781 assert_eq!(manager.active_count(), 1);
3782 assert!(manager.has_subscription("btcusdt@ticker").await);
3783
3784 let sub = manager.get_subscription("btcusdt@ticker").await;
3786 assert!(sub.is_some());
3787 let sub = sub.unwrap();
3788 assert_eq!(sub.stream, "btcusdt@ticker");
3789 assert_eq!(sub.symbol, "BTCUSDT");
3790 assert_eq!(sub.sub_type, SubscriptionType::Ticker);
3791
3792 manager.remove_subscription("btcusdt@ticker").await.unwrap();
3794 assert_eq!(manager.active_count(), 0);
3795 assert!(!manager.has_subscription("btcusdt@ticker").await);
3796 }
3797
3798 #[tokio::test]
3799 async fn test_subscription_manager_multiple() {
3800 let manager = SubscriptionManager::new();
3801 let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
3802 let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
3803 let (tx3, _rx3) = tokio::sync::mpsc::unbounded_channel();
3804
3805 manager
3807 .add_subscription(
3808 "btcusdt@ticker".to_string(),
3809 "BTCUSDT".to_string(),
3810 SubscriptionType::Ticker,
3811 tx1,
3812 )
3813 .await
3814 .unwrap();
3815
3816 manager
3817 .add_subscription(
3818 "btcusdt@depth".to_string(),
3819 "BTCUSDT".to_string(),
3820 SubscriptionType::OrderBook,
3821 tx2,
3822 )
3823 .await
3824 .unwrap();
3825
3826 manager
3827 .add_subscription(
3828 "ethusdt@ticker".to_string(),
3829 "ETHUSDT".to_string(),
3830 SubscriptionType::Ticker,
3831 tx3,
3832 )
3833 .await
3834 .unwrap();
3835
3836 assert_eq!(manager.active_count(), 3);
3837
3838 let btc_subs = manager.get_subscriptions_by_symbol("BTCUSDT").await;
3840 assert_eq!(btc_subs.len(), 2);
3841
3842 let eth_subs = manager.get_subscriptions_by_symbol("ETHUSDT").await;
3843 assert_eq!(eth_subs.len(), 1);
3844
3845 let all_subs = manager.get_all_subscriptions().await;
3847 assert_eq!(all_subs.len(), 3);
3848
3849 manager.clear().await;
3851 assert_eq!(manager.active_count(), 0);
3852 }
3853
3854 #[tokio::test]
3855 async fn test_subscription_type_from_stream() {
3856 let sub_type = SubscriptionType::from_stream("btcusdt@ticker");
3858 assert_eq!(sub_type, Some(SubscriptionType::Ticker));
3859
3860 let sub_type = SubscriptionType::from_stream("btcusdt@depth");
3862 assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3863
3864 let sub_type = SubscriptionType::from_stream("btcusdt@depth@100ms");
3865 assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3866
3867 let sub_type = SubscriptionType::from_stream("btcusdt@trade");
3869 assert_eq!(sub_type, Some(SubscriptionType::Trades));
3870
3871 let sub_type = SubscriptionType::from_stream("btcusdt@aggTrade");
3872 assert_eq!(sub_type, Some(SubscriptionType::Trades));
3873
3874 let sub_type = SubscriptionType::from_stream("btcusdt@kline_1m");
3876 assert_eq!(sub_type, Some(SubscriptionType::Kline("1m".to_string())));
3877
3878 let sub_type = SubscriptionType::from_stream("btcusdt@kline_1h");
3879 assert_eq!(sub_type, Some(SubscriptionType::Kline("1h".to_string())));
3880
3881 let sub_type = SubscriptionType::from_stream("btcusdt@markPrice");
3883 assert_eq!(sub_type, Some(SubscriptionType::MarkPrice));
3884
3885 let sub_type = SubscriptionType::from_stream("btcusdt@bookTicker");
3887 assert_eq!(sub_type, Some(SubscriptionType::BookTicker));
3888
3889 let sub_type = SubscriptionType::from_stream("btcusdt@unknown");
3891 assert_eq!(sub_type, None);
3892 }
3893
3894 #[tokio::test]
3895 async fn test_subscription_send_message() {
3896 let manager = SubscriptionManager::new();
3897 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
3898
3899 manager
3901 .add_subscription(
3902 "btcusdt@ticker".to_string(),
3903 "BTCUSDT".to_string(),
3904 SubscriptionType::Ticker,
3905 tx,
3906 )
3907 .await
3908 .unwrap();
3909
3910 let test_msg = serde_json::json!({
3912 "e": "24hrTicker",
3913 "s": "BTCUSDT",
3914 "c": "50000"
3915 });
3916
3917 let sent = manager
3918 .send_to_stream("btcusdt@ticker", test_msg.clone())
3919 .await;
3920 assert!(sent);
3921
3922 let received = rx.recv().await;
3924 assert!(received.is_some());
3925 assert_eq!(received.unwrap(), test_msg);
3926 }
3927
3928 #[tokio::test]
3929 async fn test_subscription_send_to_symbol() {
3930 let manager = SubscriptionManager::new();
3931 let (tx1, mut rx1) = tokio::sync::mpsc::unbounded_channel();
3932 let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel();
3933
3934 manager
3936 .add_subscription(
3937 "btcusdt@ticker".to_string(),
3938 "BTCUSDT".to_string(),
3939 SubscriptionType::Ticker,
3940 tx1,
3941 )
3942 .await
3943 .unwrap();
3944
3945 manager
3946 .add_subscription(
3947 "btcusdt@depth".to_string(),
3948 "BTCUSDT".to_string(),
3949 SubscriptionType::OrderBook,
3950 tx2,
3951 )
3952 .await
3953 .unwrap();
3954
3955 let test_msg = serde_json::json!({
3957 "s": "BTCUSDT",
3958 "data": "test"
3959 });
3960
3961 let sent_count = manager.send_to_symbol("BTCUSDT", &test_msg).await;
3962 assert_eq!(sent_count, 2);
3963
3964 let received1 = rx1.recv().await;
3966 assert!(received1.is_some());
3967 assert_eq!(received1.unwrap(), test_msg);
3968
3969 let received2 = rx2.recv().await;
3970 assert!(received2.is_some());
3971 assert_eq!(received2.unwrap(), test_msg);
3972 }
3973
3974 #[test]
3975 fn test_symbol_conversion() {
3976 let symbol = "BTC/USDT";
3977 let binance_symbol = symbol.replace('/', "").to_lowercase();
3978 assert_eq!(binance_symbol, "btcusdt");
3979 }
3980
3981 #[test]
3984 fn test_reconnect_config_default() {
3985 let config = ReconnectConfig::default();
3986
3987 assert!(config.enabled);
3988 assert_eq!(config.initial_delay_ms, 1000);
3989 assert_eq!(config.max_delay_ms, 30000);
3990 assert_eq!(config.backoff_multiplier, 2.0);
3991 assert_eq!(config.max_attempts, 0); }
3993
3994 #[test]
3995 fn test_reconnect_config_calculate_delay() {
3996 let config = ReconnectConfig::default();
3997
3998 assert_eq!(config.calculate_delay(0), 1000); assert_eq!(config.calculate_delay(1), 2000); assert_eq!(config.calculate_delay(2), 4000); assert_eq!(config.calculate_delay(3), 8000); assert_eq!(config.calculate_delay(4), 16000); assert_eq!(config.calculate_delay(5), 30000); assert_eq!(config.calculate_delay(6), 30000); }
4007
4008 #[test]
4009 fn test_reconnect_config_should_retry() {
4010 let mut config = ReconnectConfig::default();
4011
4012 assert!(config.should_retry(0));
4014 assert!(config.should_retry(10));
4015 assert!(config.should_retry(100));
4016
4017 config.max_attempts = 3;
4019 assert!(config.should_retry(0));
4020 assert!(config.should_retry(1));
4021 assert!(config.should_retry(2));
4022 assert!(!config.should_retry(3));
4023 assert!(!config.should_retry(4));
4024
4025 config.enabled = false;
4027 assert!(!config.should_retry(0));
4028 assert!(!config.should_retry(1));
4029 }
4030
4031 #[test]
4032 fn test_message_router_extract_stream_name_combined() {
4033 let message = serde_json::json!({
4035 "stream": "btcusdt@ticker",
4036 "data": {
4037 "e": "24hrTicker",
4038 "s": "BTCUSDT"
4039 }
4040 });
4041
4042 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4043 assert_eq!(stream_name, "btcusdt@ticker");
4044 }
4045
4046 #[test]
4047 fn test_message_router_extract_stream_name_ticker() {
4048 let message = serde_json::json!({
4050 "e": "24hrTicker",
4051 "s": "BTCUSDT",
4052 "E": 1672531200000_u64,
4053 "c": "16950.00",
4054 "h": "17100.00"
4055 });
4056
4057 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4058 assert_eq!(stream_name, "btcusdt@ticker");
4059 }
4060
4061 #[test]
4062 fn test_message_router_extract_stream_name_depth() {
4063 let message = serde_json::json!({
4065 "e": "depthUpdate",
4066 "s": "ETHUSDT",
4067 "E": 1672531200000_u64,
4068 "U": 157,
4069 "u": 160
4070 });
4071
4072 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4073 assert_eq!(stream_name, "ethusdt@depth");
4074 }
4075
4076 #[test]
4077 fn test_message_router_extract_stream_name_trade() {
4078 let message = serde_json::json!({
4080 "e": "trade",
4081 "s": "BNBUSDT",
4082 "E": 1672531200000_u64,
4083 "t": 12345
4084 });
4085
4086 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4087 assert_eq!(stream_name, "bnbusdt@trade");
4088 }
4089
4090 #[test]
4091 fn test_message_router_extract_stream_name_kline() {
4092 let message = serde_json::json!({
4094 "e": "kline",
4095 "s": "BTCUSDT",
4096 "E": 1672531200000_u64,
4097 "k": {
4098 "i": "1m",
4099 "t": 1672531200000_u64,
4100 "o": "16950.00"
4101 }
4102 });
4103
4104 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4105 assert_eq!(stream_name, "btcusdt@kline_1m");
4106 }
4107
4108 #[test]
4109 fn test_message_router_extract_stream_name_mark_price() {
4110 let message = serde_json::json!({
4112 "e": "markPriceUpdate",
4113 "s": "BTCUSDT",
4114 "E": 1672531200000_u64,
4115 "p": "16950.00"
4116 });
4117
4118 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4119 assert_eq!(stream_name, "btcusdt@markPrice");
4120 }
4121
4122 #[test]
4123 fn test_message_router_extract_stream_name_book_ticker() {
4124 let message = serde_json::json!({
4126 "e": "bookTicker",
4127 "s": "ETHUSDT",
4128 "E": 1672531200000_u64,
4129 "b": "1200.00",
4130 "a": "1200.50"
4131 });
4132
4133 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4134 assert_eq!(stream_name, "ethusdt@bookTicker");
4135 }
4136
4137 #[test]
4138 fn test_message_router_extract_stream_name_subscription_response() {
4139 let message = serde_json::json!({
4141 "result": null,
4142 "id": 1
4143 });
4144
4145 let result = MessageRouter::extract_stream_name(&message);
4146 assert!(result.is_err());
4147 }
4148
4149 #[test]
4150 fn test_message_router_extract_stream_name_error_response() {
4151 let message = serde_json::json!({
4153 "error": {
4154 "code": -1,
4155 "msg": "Invalid request"
4156 },
4157 "id": 1
4158 });
4159
4160 let result = MessageRouter::extract_stream_name(&message);
4161 assert!(result.is_err());
4162 }
4163
4164 #[test]
4165 fn test_message_router_extract_stream_name_invalid() {
4166 let message = serde_json::json!({
4168 "unknown": "data"
4169 });
4170
4171 let result = MessageRouter::extract_stream_name(&message);
4172 assert!(result.is_err());
4173 }
4174
4175 #[tokio::test]
4176 async fn test_message_router_creation() {
4177 let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4178 let subscription_manager = Arc::new(SubscriptionManager::new());
4179
4180 let router = MessageRouter::new(ws_url.clone(), subscription_manager);
4181
4182 assert!(!router.is_connected());
4184 assert_eq!(router.ws_url, ws_url);
4185 }
4186
4187 #[tokio::test]
4188 async fn test_message_router_reconnect_config() {
4189 let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4190 let subscription_manager = Arc::new(SubscriptionManager::new());
4191
4192 let router = MessageRouter::new(ws_url, subscription_manager);
4193
4194 let config = router.get_reconnect_config().await;
4196 assert!(config.enabled);
4197 assert_eq!(config.initial_delay_ms, 1000);
4198
4199 let new_config = ReconnectConfig {
4201 enabled: false,
4202 initial_delay_ms: 2000,
4203 max_delay_ms: 60000,
4204 backoff_multiplier: 1.5,
4205 max_attempts: 5,
4206 };
4207
4208 router.set_reconnect_config(new_config.clone()).await;
4209
4210 let updated_config = router.get_reconnect_config().await;
4211 assert!(!updated_config.enabled);
4212 assert_eq!(updated_config.initial_delay_ms, 2000);
4213 assert_eq!(updated_config.max_delay_ms, 60000);
4214 assert_eq!(updated_config.backoff_multiplier, 1.5);
4215 assert_eq!(updated_config.max_attempts, 5);
4216 }
4217}