1use crate::binance::Binance;
6use crate::binance::parser;
7use ccxt_core::error::{Error, Result};
8use ccxt_core::types::financial::{Amount, Cost, Price};
9use ccxt_core::types::{
10 Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
11};
12use ccxt_core::ws_client::{WsClient, WsConfig};
13use serde_json::Value;
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio_tungstenite::tungstenite::protocol::Message;
20
/// WebSocket endpoint on `stream.binance.com`.
// Not referenced in the visible part of this file, hence the lint allowance.
#[allow(dead_code)]
const WS_BASE_URL: &str = "wss://stream.binance.com:9443/ws";

/// WebSocket endpoint on `testnet.binance.vision`.
// Not referenced in the visible part of this file, hence the lint allowance.
#[allow(dead_code)]
const WS_TESTNET_URL: &str = "wss://testnet.binance.vision/ws";

/// Default pause between two automatic listen-key refreshes (30 minutes).
const LISTEN_KEY_REFRESH_INTERVAL: Duration = Duration::from_secs(30 * 60);
29
30pub struct ListenKeyManager {
38 binance: Arc<Binance>,
40 listen_key: Arc<RwLock<Option<String>>>,
42 created_at: Arc<RwLock<Option<Instant>>>,
44 refresh_interval: Duration,
46 refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
48}
49
50impl ListenKeyManager {
51 pub fn new(binance: Arc<Binance>) -> Self {
59 Self {
60 binance,
61 listen_key: Arc::new(RwLock::new(None)),
62 created_at: Arc::new(RwLock::new(None)),
63 refresh_interval: LISTEN_KEY_REFRESH_INTERVAL,
64 refresh_task: Arc::new(Mutex::new(None)),
65 }
66 }
67
68 pub async fn get_or_create(&self) -> Result<String> {
79 let key_opt = self.listen_key.read().await.clone();
81
82 if let Some(key) = key_opt {
83 let created = self.created_at.read().await;
85 if let Some(created_time) = *created {
86 let elapsed = created_time.elapsed();
87 if elapsed > Duration::from_secs(50 * 60) {
89 drop(created);
90 return self.create_new().await;
91 }
92 }
93 return Ok(key);
94 }
95
96 self.create_new().await
98 }
99
100 async fn create_new(&self) -> Result<String> {
105 let key = self.binance.create_listen_key().await?;
106
107 *self.listen_key.write().await = Some(key.clone());
109 *self.created_at.write().await = Some(Instant::now());
110
111 Ok(key)
112 }
113
114 pub async fn refresh(&self) -> Result<()> {
121 let key_opt = self.listen_key.read().await.clone();
122
123 if let Some(key) = key_opt {
124 self.binance.refresh_listen_key(&key).await?;
125 *self.created_at.write().await = Some(Instant::now());
127 Ok(())
128 } else {
129 Err(Error::invalid_request("No listen key to refresh"))
130 }
131 }
132
133 pub async fn start_auto_refresh(&self) {
137 self.stop_auto_refresh().await;
139
140 let listen_key = self.listen_key.clone();
141 let created_at = self.created_at.clone();
142 let binance = self.binance.clone();
143 let interval = self.refresh_interval;
144
145 let handle = tokio::spawn(async move {
146 loop {
147 tokio::time::sleep(interval).await;
148
149 let key_opt = listen_key.read().await.clone();
151 if let Some(key) = key_opt {
152 match binance.refresh_listen_key(&key).await {
154 Ok(_) => {
155 *created_at.write().await = Some(Instant::now());
156 }
158 Err(_e) => {
159 *listen_key.write().await = None;
161 *created_at.write().await = None;
162 break;
163 }
164 }
165 } else {
166 break;
168 }
169 }
170 });
171
172 *self.refresh_task.lock().await = Some(handle);
173 }
174
175 pub async fn stop_auto_refresh(&self) {
177 let mut task_opt = self.refresh_task.lock().await;
178 if let Some(handle) = task_opt.take() {
179 handle.abort();
180 }
181 }
182
183 pub async fn delete(&self) -> Result<()> {
190 self.stop_auto_refresh().await;
192
193 let key_opt = self.listen_key.read().await.clone();
194
195 if let Some(key) = key_opt {
196 self.binance.delete_listen_key(&key).await?;
197
198 *self.listen_key.write().await = None;
200 *self.created_at.write().await = None;
201
202 Ok(())
203 } else {
204 Ok(()) }
206 }
207
208 pub async fn get_current(&self) -> Option<String> {
210 self.listen_key.read().await.clone()
211 }
212
213 pub async fn is_valid(&self) -> bool {
215 let key_opt = self.listen_key.read().await;
216 if key_opt.is_none() {
217 return false;
218 }
219
220 let created = self.created_at.read().await;
221 if let Some(created_time) = *created {
222 created_time.elapsed() < Duration::from_secs(55 * 60)
224 } else {
225 false
226 }
227 }
228}
229
230impl Drop for ListenKeyManager {
231 fn drop(&mut self) {
232 }
235}
/// Kind of data a subscription delivers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Ticker stream.
    Ticker,
    /// Order-book (depth) stream.
    OrderBook,
    /// Trade or aggregated-trade stream.
    Trades,
    /// Kline stream; the payload is the text following `@kline_` in the stream name.
    Kline(String),
    /// Balance updates.
    Balance,
    /// Order updates.
    Orders,
    /// Position updates.
    Positions,
    /// Own-trade updates.
    MyTrades,
    /// Mark-price stream.
    MarkPrice,
    /// Book-ticker stream.
    BookTicker,
}

impl SubscriptionType {
    /// Derives the subscription type from a stream name such as `btcusdt@ticker`.
    ///
    /// Markers are tested in a fixed order (`@ticker`, `@depth`, `@trade` /
    /// `@aggTrade`, `@kline_`, `@markPrice`, `@bookTicker`) and the first match
    /// wins. Returns `None` when no marker matches, or when `@kline_` occurs
    /// more than once in the name.
    pub fn from_stream(stream: &str) -> Option<Self> {
        let has = |marker: &str| stream.contains(marker);

        if has("@ticker") {
            return Some(Self::Ticker);
        }
        if has("@depth") {
            return Some(Self::OrderBook);
        }
        if has("@trade") || has("@aggTrade") {
            return Some(Self::Trades);
        }
        if has("@kline_") {
            // Exactly two pieces means exactly one `@kline_` separator.
            let mut pieces = stream.split("@kline_");
            return match (pieces.next(), pieces.next(), pieces.next()) {
                (Some(_), Some(interval), None) => Some(Self::Kline(interval.to_string())),
                _ => None,
            };
        }
        if has("@markPrice") {
            return Some(Self::MarkPrice);
        }
        if has("@bookTicker") {
            return Some(Self::BookTicker);
        }
        None
    }
}
293
294#[derive(Clone)]
296pub struct Subscription {
297 pub stream: String,
299 pub symbol: String,
301 pub sub_type: SubscriptionType,
303 pub subscribed_at: Instant,
305 pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
307}
308
309impl Subscription {
310 pub fn new(
312 stream: String,
313 symbol: String,
314 sub_type: SubscriptionType,
315 sender: tokio::sync::mpsc::UnboundedSender<Value>,
316 ) -> Self {
317 Self {
318 stream,
319 symbol,
320 sub_type,
321 subscribed_at: Instant::now(),
322 sender,
323 }
324 }
325
326 pub fn send(&self, message: Value) -> bool {
334 self.sender.send(message).is_ok()
335 }
336}
337
338pub struct SubscriptionManager {
345 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
347 symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
349 active_count: Arc<std::sync::atomic::AtomicUsize>,
351}
352
353impl SubscriptionManager {
354 pub fn new() -> Self {
356 Self {
357 subscriptions: Arc::new(RwLock::new(HashMap::new())),
358 symbol_index: Arc::new(RwLock::new(HashMap::new())),
359 active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
360 }
361 }
362
363 pub async fn add_subscription(
374 &self,
375 stream: String,
376 symbol: String,
377 sub_type: SubscriptionType,
378 sender: tokio::sync::mpsc::UnboundedSender<Value>,
379 ) -> Result<()> {
380 let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
381
382 let mut subs = self.subscriptions.write().await;
384 subs.insert(stream.clone(), subscription);
385
386 let mut index = self.symbol_index.write().await;
388 index.entry(symbol).or_insert_with(Vec::new).push(stream);
389
390 self.active_count
392 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
393
394 Ok(())
395 }
396
397 pub async fn remove_subscription(&self, stream: &str) -> Result<()> {
405 let mut subs = self.subscriptions.write().await;
406
407 if let Some(subscription) = subs.remove(stream) {
408 let mut index = self.symbol_index.write().await;
410 if let Some(streams) = index.get_mut(&subscription.symbol) {
411 streams.retain(|s| s != stream);
412 if streams.is_empty() {
414 index.remove(&subscription.symbol);
415 }
416 }
417
418 self.active_count
420 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
421 }
422
423 Ok(())
424 }
425
426 pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
434 let subs = self.subscriptions.read().await;
435 subs.get(stream).cloned()
436 }
437
438 pub async fn has_subscription(&self, stream: &str) -> bool {
446 let subs = self.subscriptions.read().await;
447 subs.contains_key(stream)
448 }
449
450 pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
452 let subs = self.subscriptions.read().await;
453 subs.values().cloned().collect()
454 }
455
456 pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
461 let index = self.symbol_index.read().await;
462 let subs = self.subscriptions.read().await;
463
464 if let Some(streams) = index.get(symbol) {
465 streams
466 .iter()
467 .filter_map(|stream| subs.get(stream).cloned())
468 .collect()
469 } else {
470 Vec::new()
471 }
472 }
473
474 pub fn active_count(&self) -> usize {
476 self.active_count.load(std::sync::atomic::Ordering::SeqCst)
477 }
478
479 pub async fn clear(&self) {
481 let mut subs = self.subscriptions.write().await;
482 let mut index = self.symbol_index.write().await;
483
484 subs.clear();
485 index.clear();
486 self.active_count
487 .store(0, std::sync::atomic::Ordering::SeqCst);
488 }
489
490 pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
499 let subs = self.subscriptions.read().await;
500 if let Some(subscription) = subs.get(stream) {
501 subscription.send(message)
502 } else {
503 false
504 }
505 }
506
507 pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
516 let index = self.symbol_index.read().await;
517 let subs = self.subscriptions.read().await;
518
519 let mut sent_count = 0;
520
521 if let Some(streams) = index.get(symbol) {
522 for stream in streams {
523 if let Some(subscription) = subs.get(stream) {
524 if subscription.send(message.clone()) {
525 sent_count += 1;
526 }
527 }
528 }
529 }
530
531 sent_count
532 }
533}
/// Settings for reconnecting with exponential backoff.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether reconnecting is attempted at all.
    pub enabled: bool,

    /// Delay before the first reconnect attempt, in milliseconds.
    pub initial_delay_ms: u64,

    /// Upper bound for the delay between attempts, in milliseconds.
    pub max_delay_ms: u64,

    /// Factor the delay is multiplied by for each further attempt.
    pub backoff_multiplier: f64,

    /// Maximum number of attempts; `0` means no limit.
    pub max_attempts: usize,
}

impl Default for ReconnectConfig {
    /// Enabled, 1 s initial delay, 30 s cap, doubling backoff, unlimited attempts.
    fn default() -> Self {
        Self {
            enabled: true,
            initial_delay_ms: 1000,
            max_delay_ms: 30000,
            backoff_multiplier: 2.0,
            max_attempts: 0,
        }
    }
}

impl ReconnectConfig {
    /// Returns the delay in milliseconds to wait before reconnect attempt
    /// number `attempt` (zero-based): `initial_delay_ms * backoff_multiplier^attempt`,
    /// capped at `max_delay_ms`.
    pub fn calculate_delay(&self, attempt: usize) -> u64 {
        // Saturate instead of wrapping: a plain `as i32` cast turns very large
        // attempt counts into negative exponents, which collapses the delay
        // towards zero instead of staying at the cap.
        let exponent = attempt.min(i32::MAX as usize) as i32;
        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(exponent);
        // The float-to-integer cast saturates, so an infinite product is safe here.
        delay.min(self.max_delay_ms as f64) as u64
    }

    /// Reports whether reconnect attempt number `attempt` (zero-based) is still
    /// allowed: reconnecting must be enabled and the attempt limit, if any,
    /// not yet reached.
    pub fn should_retry(&self, attempt: usize) -> bool {
        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
    }
}
593
594pub struct MessageRouter {
602 ws_client: Arc<RwLock<Option<WsClient>>>,
604
605 subscription_manager: Arc<SubscriptionManager>,
607
608 router_task: Arc<Mutex<Option<JoinHandle<()>>>>,
610
611 is_connected: Arc<std::sync::atomic::AtomicBool>,
613
614 reconnect_config: Arc<RwLock<ReconnectConfig>>,
616
617 ws_url: String,
619
620 request_id: Arc<std::sync::atomic::AtomicU64>,
622}
623
624impl MessageRouter {
625 pub fn new(ws_url: String, subscription_manager: Arc<SubscriptionManager>) -> Self {
634 Self {
635 ws_client: Arc::new(RwLock::new(None)),
636 subscription_manager,
637 router_task: Arc::new(Mutex::new(None)),
638 is_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
639 reconnect_config: Arc::new(RwLock::new(ReconnectConfig::default())),
640 ws_url,
641 request_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
642 }
643 }
644
645 pub async fn start(&self) -> Result<()> {
652 if self.is_connected() {
654 self.stop().await?;
655 }
656
657 let config = WsConfig {
659 url: self.ws_url.clone(),
660 ..Default::default()
661 };
662 let client = WsClient::new(config);
663 client.connect().await?;
664
665 *self.ws_client.write().await = Some(client);
667
668 self.is_connected
670 .store(true, std::sync::atomic::Ordering::SeqCst);
671
672 let ws_client = self.ws_client.clone();
674 let subscription_manager = self.subscription_manager.clone();
675 let is_connected = self.is_connected.clone();
676 let reconnect_config = self.reconnect_config.clone();
677 let ws_url = self.ws_url.clone();
678
679 let handle = tokio::spawn(async move {
680 Self::message_loop(
681 ws_client,
682 subscription_manager,
683 is_connected,
684 reconnect_config,
685 ws_url,
686 )
687 .await
688 });
689
690 *self.router_task.lock().await = Some(handle);
691
692 Ok(())
693 }
694
695 pub async fn stop(&self) -> Result<()> {
702 self.is_connected
704 .store(false, std::sync::atomic::Ordering::SeqCst);
705
706 let mut task_opt = self.router_task.lock().await;
708 if let Some(handle) = task_opt.take() {
709 handle.abort();
710 }
711
712 let mut client_opt = self.ws_client.write().await;
714 if let Some(client) = client_opt.take() {
715 let _ = client.disconnect().await;
716 }
717
718 Ok(())
719 }
720
721 pub async fn restart(&self) -> Result<()> {
728 self.stop().await?;
729 tokio::time::sleep(Duration::from_millis(100)).await;
730 self.start().await
731 }
732
733 pub fn is_connected(&self) -> bool {
735 self.is_connected.load(std::sync::atomic::Ordering::SeqCst)
736 }
737
738 pub async fn set_reconnect_config(&self, config: ReconnectConfig) {
743 *self.reconnect_config.write().await = config;
744 }
745
746 pub async fn get_reconnect_config(&self) -> ReconnectConfig {
748 self.reconnect_config.read().await.clone()
749 }
750
751 pub async fn subscribe(&self, streams: Vec<String>) -> Result<()> {
758 if streams.is_empty() {
759 return Ok(());
760 }
761
762 let client_opt = self.ws_client.read().await;
763 let client = client_opt
764 .as_ref()
765 .ok_or_else(|| Error::network("WebSocket not connected"))?;
766
767 let id = self
769 .request_id
770 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
771
772 #[allow(clippy::disallowed_methods)]
774 let request = serde_json::json!({
775 "method": "SUBSCRIBE",
776 "params": streams,
777 "id": id
778 });
779
780 client
782 .send(Message::Text(request.to_string().into()))
783 .await?;
784
785 Ok(())
786 }
787
788 pub async fn unsubscribe(&self, streams: Vec<String>) -> Result<()> {
795 if streams.is_empty() {
796 return Ok(());
797 }
798
799 let client_opt = self.ws_client.read().await;
800 let client = client_opt
801 .as_ref()
802 .ok_or_else(|| Error::network("WebSocket not connected"))?;
803
804 let id = self
806 .request_id
807 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
808
809 #[allow(clippy::disallowed_methods)]
811 let request = serde_json::json!({
812 "method": "UNSUBSCRIBE",
813 "params": streams,
814 "id": id
815 });
816
817 client
819 .send(Message::Text(request.to_string().into()))
820 .await?;
821
822 Ok(())
823 }
824
825 async fn message_loop(
829 ws_client: Arc<RwLock<Option<WsClient>>>,
830 subscription_manager: Arc<SubscriptionManager>,
831 is_connected: Arc<std::sync::atomic::AtomicBool>,
832 reconnect_config: Arc<RwLock<ReconnectConfig>>,
833 ws_url: String,
834 ) {
835 let mut reconnect_attempt = 0;
836
837 loop {
838 if !is_connected.load(std::sync::atomic::Ordering::SeqCst) {
840 break;
841 }
842
843 let has_client = ws_client.read().await.is_some();
845
846 if !has_client {
847 let config = reconnect_config.read().await;
849 if config.should_retry(reconnect_attempt) {
850 let delay = config.calculate_delay(reconnect_attempt);
851 drop(config);
852
853 tokio::time::sleep(Duration::from_millis(delay)).await;
854
855 match Self::reconnect(&ws_url, ws_client.clone()).await {
856 Ok(_) => {
857 reconnect_attempt = 0; continue;
859 }
860 Err(_) => {
861 reconnect_attempt += 1;
862 continue;
863 }
864 }
865 } else {
866 is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
868 break;
869 }
870 }
871
872 let message_opt = {
874 let guard = ws_client.read().await;
875 if let Some(client) = guard.as_ref() {
876 client.receive().await
877 } else {
878 None
879 }
880 };
881
882 match message_opt {
883 Some(value) => {
884 if let Err(_e) = Self::handle_message(value, subscription_manager.clone()).await
886 {
887 continue;
888 }
889
890 reconnect_attempt = 0;
892 }
893 None => {
894 let config = reconnect_config.read().await;
896 if config.should_retry(reconnect_attempt) {
897 let delay = config.calculate_delay(reconnect_attempt);
898 drop(config);
899
900 tokio::time::sleep(Duration::from_millis(delay)).await;
901
902 match Self::reconnect(&ws_url, ws_client.clone()).await {
903 Ok(_) => {
904 reconnect_attempt = 0;
905 continue;
906 }
907 Err(_) => {
908 reconnect_attempt += 1;
909 continue;
910 }
911 }
912 } else {
913 is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
915 break;
916 }
917 }
918 }
919 }
920 }
921
922 async fn handle_message(
926 message: Value,
927 subscription_manager: Arc<SubscriptionManager>,
928 ) -> Result<()> {
929 let stream_name = Self::extract_stream_name(&message)?;
931
932 let sent = subscription_manager
934 .send_to_stream(&stream_name, message)
935 .await;
936
937 if sent {
938 Ok(())
939 } else {
940 Err(Error::generic("No subscribers for stream"))
941 }
942 }
943
944 fn extract_stream_name(message: &Value) -> Result<String> {
956 if let Some(stream) = message.get("stream").and_then(|s| s.as_str()) {
958 return Ok(stream.to_string());
959 }
960
961 if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
963 if let Some(symbol) = message.get("s").and_then(|s| s.as_str()) {
964 let stream = match event_type {
966 "24hrTicker" => format!("{}@ticker", symbol.to_lowercase()),
967 "depthUpdate" => format!("{}@depth", symbol.to_lowercase()),
968 "aggTrade" => format!("{}@aggTrade", symbol.to_lowercase()),
969 "trade" => format!("{}@trade", symbol.to_lowercase()),
970 "kline" => {
971 if let Some(kline) = message.get("k") {
973 if let Some(interval) = kline.get("i").and_then(|i| i.as_str()) {
974 format!("{}@kline_{}", symbol.to_lowercase(), interval)
975 } else {
976 return Err(Error::generic("Missing kline interval"));
977 }
978 } else {
979 return Err(Error::generic("Missing kline data"));
980 }
981 }
982 "markPriceUpdate" => format!("{}@markPrice", symbol.to_lowercase()),
983 "bookTicker" => format!("{}@bookTicker", symbol.to_lowercase()),
984 _ => {
985 return Err(Error::generic(format!(
986 "Unknown event type: {}",
987 event_type
988 )));
989 }
990 };
991 return Ok(stream);
992 }
993 }
994
995 if message.get("result").is_some() || message.get("error").is_some() {
997 return Err(Error::generic("Subscription response, skip routing"));
998 }
999
1000 Err(Error::generic("Cannot extract stream name from message"))
1001 }
1002
1003 async fn reconnect(ws_url: &str, ws_client: Arc<RwLock<Option<WsClient>>>) -> Result<()> {
1007 {
1009 let mut client_opt = ws_client.write().await;
1010 if let Some(client) = client_opt.take() {
1011 let _ = client.disconnect().await;
1012 }
1013 }
1014
1015 let config = WsConfig {
1017 url: ws_url.to_string(),
1018 ..Default::default()
1019 };
1020 let new_client = WsClient::new(config);
1021
1022 new_client.connect().await?;
1024
1025 *ws_client.write().await = Some(new_client);
1027
1028 Ok(())
1029 }
1030}
1031
1032impl Drop for MessageRouter {
1033 fn drop(&mut self) {
1034 }
1037}
1038
1039impl Default for SubscriptionManager {
1040 fn default() -> Self {
1041 Self::new()
1042 }
1043}
1044
1045pub struct BinanceWs {
1047 client: Arc<WsClient>,
1048 listen_key: Arc<RwLock<Option<String>>>,
1049 listen_key_manager: Option<Arc<ListenKeyManager>>,
1051 auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
1053 tickers: Arc<Mutex<HashMap<String, Ticker>>>,
1055 bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
1057 #[allow(dead_code)]
1059 mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
1060 orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
1062 trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
1064 ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
1066 balances: Arc<RwLock<HashMap<String, Balance>>>,
1068 orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
1070 my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
1072 positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
1074}
1075
1076impl BinanceWs {
1077 pub fn new(url: String) -> Self {
1085 let config = WsConfig {
1086 url,
1087 connect_timeout: 10000,
1088 ping_interval: 180000, reconnect_interval: 5000,
1090 max_reconnect_attempts: 5,
1091 auto_reconnect: true,
1092 enable_compression: false,
1093 pong_timeout: 90000, };
1095
1096 Self {
1097 client: Arc::new(WsClient::new(config)),
1098 listen_key: Arc::new(RwLock::new(None)),
1099 listen_key_manager: None,
1100 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1101 tickers: Arc::new(Mutex::new(HashMap::new())),
1102 bids_asks: Arc::new(Mutex::new(HashMap::new())),
1103 mark_prices: Arc::new(Mutex::new(HashMap::new())),
1104 orderbooks: Arc::new(Mutex::new(HashMap::new())),
1105 trades: Arc::new(Mutex::new(HashMap::new())),
1106 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1107 balances: Arc::new(RwLock::new(HashMap::new())),
1108 orders: Arc::new(RwLock::new(HashMap::new())),
1109 my_trades: Arc::new(RwLock::new(HashMap::new())),
1110 positions: Arc::new(RwLock::new(HashMap::new())),
1111 }
1112 }
1113
1114 pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
1123 let config = WsConfig {
1124 url,
1125 connect_timeout: 10000,
1126 ping_interval: 180000, reconnect_interval: 5000,
1128 max_reconnect_attempts: 5,
1129 auto_reconnect: true,
1130 enable_compression: false,
1131 pong_timeout: 90000, };
1133
1134 Self {
1135 client: Arc::new(WsClient::new(config)),
1136 listen_key: Arc::new(RwLock::new(None)),
1137 listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
1138 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1139 tickers: Arc::new(Mutex::new(HashMap::new())),
1140 bids_asks: Arc::new(Mutex::new(HashMap::new())),
1141 mark_prices: Arc::new(Mutex::new(HashMap::new())),
1142 orderbooks: Arc::new(Mutex::new(HashMap::new())),
1143 trades: Arc::new(Mutex::new(HashMap::new())),
1144 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1145 balances: Arc::new(RwLock::new(HashMap::new())),
1146 orders: Arc::new(RwLock::new(HashMap::new())),
1147 my_trades: Arc::new(RwLock::new(HashMap::new())),
1148 positions: Arc::new(RwLock::new(HashMap::new())),
1149 }
1150 }
1151
1152 pub async fn connect(&self) -> Result<()> {
1154 self.client.connect().await?;
1156
1157 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1159 if coordinator_guard.is_none() {
1160 let coordinator = self.client.clone().create_auto_reconnect_coordinator();
1161 coordinator.start().await;
1162 *coordinator_guard = Some(coordinator);
1163 tracing::info!("Auto-reconnect coordinator started");
1164 }
1165
1166 Ok(())
1167 }
1168
1169 pub async fn disconnect(&self) -> Result<()> {
1171 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1173 if let Some(coordinator) = coordinator_guard.take() {
1174 coordinator.stop().await;
1175 tracing::info!("Auto-reconnect coordinator stopped");
1176 }
1177
1178 if let Some(manager) = &self.listen_key_manager {
1180 manager.stop_auto_refresh().await;
1181 }
1182
1183 self.client.disconnect().await
1184 }
1185
1186 pub async fn connect_user_stream(&self) -> Result<()> {
1211 let manager = self.listen_key_manager.as_ref()
1212 .ok_or_else(|| Error::invalid_request(
1213 "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
1214 ))?;
1215
1216 let listen_key = manager.get_or_create().await?;
1218
1219 let user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);
1221
1222 let config = WsConfig {
1224 url: user_stream_url,
1225 connect_timeout: 10000,
1226 ping_interval: 180000,
1227 reconnect_interval: 5000,
1228 max_reconnect_attempts: 5,
1229 auto_reconnect: true,
1230 enable_compression: false,
1231 pong_timeout: 90000, };
1233
1234 let _new_client = Arc::new(WsClient::new(config));
1236 self.client.connect().await?;
1240
1241 manager.start_auto_refresh().await;
1243
1244 *self.listen_key.write().await = Some(listen_key);
1246
1247 Ok(())
1248 }
1249
1250 pub async fn close_user_stream(&self) -> Result<()> {
1257 if let Some(manager) = &self.listen_key_manager {
1258 manager.delete().await?;
1259 }
1260 *self.listen_key.write().await = None;
1261 Ok(())
1262 }
1263
1264 pub async fn get_listen_key(&self) -> Option<String> {
1266 if let Some(manager) = &self.listen_key_manager {
1267 manager.get_current().await
1268 } else {
1269 self.listen_key.read().await.clone()
1270 }
1271 }
1272
1273 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
1281 let stream = format!("{}@ticker", symbol.to_lowercase());
1282 self.client
1283 .subscribe(stream, Some(symbol.to_string()), None)
1284 .await
1285 }
1286
1287 pub async fn subscribe_all_tickers(&self) -> Result<()> {
1289 self.client
1290 .subscribe("!ticker@arr".to_string(), None, None)
1291 .await
1292 }
1293
1294 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
1302 let stream = format!("{}@trade", symbol.to_lowercase());
1303 self.client
1304 .subscribe(stream, Some(symbol.to_string()), None)
1305 .await
1306 }
1307
1308 pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
1316 let stream = format!("{}@aggTrade", symbol.to_lowercase());
1317 self.client
1318 .subscribe(stream, Some(symbol.to_string()), None)
1319 .await
1320 }
1321
1322 pub async fn subscribe_orderbook(
1332 &self,
1333 symbol: &str,
1334 levels: u32,
1335 update_speed: &str,
1336 ) -> Result<()> {
1337 let stream = if update_speed == "100ms" {
1338 format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
1339 } else {
1340 format!("{}@depth{}", symbol.to_lowercase(), levels)
1341 };
1342
1343 self.client
1344 .subscribe(stream, Some(symbol.to_string()), None)
1345 .await
1346 }
1347
1348 pub async fn subscribe_orderbook_diff(
1357 &self,
1358 symbol: &str,
1359 update_speed: Option<&str>,
1360 ) -> Result<()> {
1361 let stream = if let Some(speed) = update_speed {
1362 if speed == "100ms" {
1363 format!("{}@depth@100ms", symbol.to_lowercase())
1364 } else {
1365 format!("{}@depth", symbol.to_lowercase())
1366 }
1367 } else {
1368 format!("{}@depth", symbol.to_lowercase())
1369 };
1370
1371 self.client
1372 .subscribe(stream, Some(symbol.to_string()), None)
1373 .await
1374 }
1375
1376 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
1385 let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
1386 self.client
1387 .subscribe(stream, Some(symbol.to_string()), None)
1388 .await
1389 }
1390
1391 pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
1399 let stream = format!("{}@miniTicker", symbol.to_lowercase());
1400 self.client
1401 .subscribe(stream, Some(symbol.to_string()), None)
1402 .await
1403 }
1404
1405 pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
1407 self.client
1408 .subscribe("!miniTicker@arr".to_string(), None, None)
1409 .await
1410 }
1411
1412 pub async fn unsubscribe(&self, stream: String) -> Result<()> {
1420 self.client.unsubscribe(stream, None).await
1421 }
1422
1423 pub async fn receive(&self) -> Option<Value> {
1428 self.client.receive().await
1429 }
1430
1431 pub async fn is_connected(&self) -> bool {
1433 self.client.is_connected().await
1434 }
1435
1436 async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1445 let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
1446
1447 self.client
1449 .subscribe(stream.clone(), Some(symbol.to_string()), None)
1450 .await?;
1451
1452 loop {
1454 if let Some(message) = self.client.receive().await {
1455 if message.get("result").is_some() {
1457 continue;
1458 }
1459
1460 if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1462 let mut tickers = self.tickers.lock().await;
1464 tickers.insert(ticker.symbol.clone(), ticker.clone());
1465
1466 return Ok(ticker);
1467 }
1468 }
1469 }
1470 }
1471
1472 async fn watch_tickers_internal(
1481 &self,
1482 symbols: Option<Vec<String>>,
1483 channel_name: &str,
1484 ) -> Result<HashMap<String, Ticker>> {
1485 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1486 syms.iter()
1488 .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1489 .collect()
1490 } else {
1491 vec![format!("!{}@arr", channel_name)]
1493 };
1494
1495 for stream in &streams {
1497 self.client.subscribe(stream.clone(), None, None).await?;
1498 }
1499
1500 let mut result = HashMap::new();
1502
1503 loop {
1504 if let Some(message) = self.client.receive().await {
1505 if message.get("result").is_some() {
1507 continue;
1508 }
1509
1510 if let Some(arr) = message.as_array() {
1512 for item in arr {
1513 if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1514 let symbol = ticker.symbol.clone();
1515
1516 if let Some(syms) = &symbols {
1518 if syms.contains(&symbol.to_lowercase()) {
1519 result.insert(symbol.clone(), ticker.clone());
1520 }
1521 } else {
1522 result.insert(symbol.clone(), ticker.clone());
1523 }
1524
1525 let mut tickers = self.tickers.lock().await;
1527 tickers.insert(symbol, ticker);
1528 }
1529 }
1530
1531 if let Some(syms) = &symbols {
1533 if result.len() == syms.len() {
1534 return Ok(result);
1535 }
1536 } else {
1537 return Ok(result);
1538 }
1539 } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1540 let symbol = ticker.symbol.clone();
1542 result.insert(symbol.clone(), ticker.clone());
1543
1544 let mut tickers = self.tickers.lock().await;
1546 tickers.insert(symbol, ticker);
1547
1548 if let Some(syms) = &symbols {
1550 if result.len() == syms.len() {
1551 return Ok(result);
1552 }
1553 }
1554 }
1555 }
1556 }
1557 }
1558
1559 async fn handle_orderbook_delta(
1569 &self,
1570 symbol: &str,
1571 delta_message: &Value,
1572 is_futures: bool,
1573 ) -> Result<()> {
1574 use ccxt_core::types::orderbook::{OrderBookDelta, OrderBookEntry};
1575 use rust_decimal::Decimal;
1576
1577 let first_update_id = delta_message["U"]
1579 .as_i64()
1580 .ok_or_else(|| Error::invalid_request("Missing first update ID in delta message"))?;
1581
1582 let final_update_id = delta_message["u"]
1583 .as_i64()
1584 .ok_or_else(|| Error::invalid_request("Missing final update ID in delta message"))?;
1585
1586 let prev_final_update_id = if is_futures {
1587 delta_message["pu"].as_i64()
1588 } else {
1589 None
1590 };
1591
1592 let timestamp = delta_message["E"]
1593 .as_i64()
1594 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1595
1596 let mut bids = Vec::new();
1598 if let Some(bids_arr) = delta_message["b"].as_array() {
1599 for bid in bids_arr {
1600 if let (Some(price_str), Some(amount_str)) = (bid[0].as_str(), bid[1].as_str()) {
1601 if let (Ok(price), Ok(amount)) =
1602 (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1603 {
1604 bids.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1605 }
1606 }
1607 }
1608 }
1609
1610 let mut asks = Vec::new();
1612 if let Some(asks_arr) = delta_message["a"].as_array() {
1613 for ask in asks_arr {
1614 if let (Some(price_str), Some(amount_str)) = (ask[0].as_str(), ask[1].as_str()) {
1615 if let (Ok(price), Ok(amount)) =
1616 (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1617 {
1618 asks.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1619 }
1620 }
1621 }
1622 }
1623
1624 let delta = OrderBookDelta {
1626 symbol: symbol.to_string(),
1627 first_update_id,
1628 final_update_id,
1629 prev_final_update_id,
1630 timestamp,
1631 bids,
1632 asks,
1633 };
1634
1635 let mut orderbooks = self.orderbooks.lock().await;
1637 let orderbook = orderbooks
1638 .entry(symbol.to_string())
1639 .or_insert_with(|| OrderBook::new(symbol.to_string(), timestamp));
1640
1641 if !orderbook.is_synced {
1643 orderbook.buffer_delta(delta);
1644 return Ok(());
1645 }
1646
1647 if let Err(e) = orderbook.apply_delta(&delta, is_futures) {
1649 if orderbook.needs_resync {
1651 tracing::warn!("Orderbook {} needs resync due to: {}", symbol, e);
1652 orderbook.buffer_delta(delta);
1654 return Err(Error::invalid_request(format!("RESYNC_NEEDED: {}", e)));
1656 }
1657 return Err(Error::invalid_request(e));
1658 }
1659
1660 Ok(())
1661 }
1662
1663 async fn fetch_orderbook_snapshot(
1674 &self,
1675 exchange: &Binance,
1676 symbol: &str,
1677 limit: Option<i64>,
1678 is_futures: bool,
1679 ) -> Result<OrderBook> {
1680 let mut params = HashMap::new();
1682 if let Some(l) = limit {
1683 #[allow(clippy::disallowed_methods)]
1685 let limit_value = serde_json::json!(l);
1686 params.insert("limit".to_string(), limit_value);
1687 }
1688
1689 let mut snapshot = exchange.fetch_order_book(symbol, None).await?;
1690
1691 snapshot.is_synced = true;
1693
1694 let mut orderbooks = self.orderbooks.lock().await;
1696 if let Some(cached_ob) = orderbooks.get_mut(symbol) {
1697 snapshot.buffered_deltas = cached_ob.buffered_deltas.clone();
1699
1700 if let Ok(processed) = snapshot.process_buffered_deltas(is_futures) {
1702 tracing::debug!("Processed {} buffered deltas for {}", processed, symbol);
1703 }
1704 }
1705
1706 orderbooks.insert(symbol.to_string(), snapshot.clone());
1708
1709 Ok(snapshot)
1710 }
1711
1712 async fn watch_orderbook_internal(
1724 &self,
1725 exchange: &Binance,
1726 symbol: &str,
1727 limit: Option<i64>,
1728 update_speed: i32,
1729 is_futures: bool,
1730 ) -> Result<OrderBook> {
1731 let stream = if update_speed == 100 {
1733 format!("{}@depth@100ms", symbol.to_lowercase())
1734 } else {
1735 format!("{}@depth", symbol.to_lowercase())
1736 };
1737
1738 self.client
1740 .subscribe(stream.clone(), Some(symbol.to_string()), None)
1741 .await?;
1742
1743 let snapshot_fetched = Arc::new(Mutex::new(false));
1745 let _snapshot_fetched_clone = snapshot_fetched.clone();
1746
1747 let _orderbooks_clone = self.orderbooks.clone();
1749 let _symbol_clone = symbol.to_string();
1750
1751 tokio::spawn(async move {
1752 });
1754
1755 tokio::time::sleep(Duration::from_millis(500)).await;
1757
1758 let _snapshot = self
1760 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1761 .await?;
1762
1763 *snapshot_fetched.lock().await = true;
1764
1765 loop {
1767 if let Some(message) = self.client.receive().await {
1768 if message.get("result").is_some() {
1770 continue;
1771 }
1772
1773 if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1775 if event_type == "depthUpdate" {
1776 match self
1777 .handle_orderbook_delta(symbol, &message, is_futures)
1778 .await
1779 {
1780 Ok(_) => {
1781 let orderbooks = self.orderbooks.lock().await;
1783 if let Some(ob) = orderbooks.get(symbol) {
1784 if ob.is_synced {
1785 return Ok(ob.clone());
1786 }
1787 }
1788 }
1789 Err(e) => {
1790 let err_msg = e.to_string();
1791
1792 if err_msg.contains("RESYNC_NEEDED") {
1794 tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1795
1796 let current_time = chrono::Utc::now().timestamp_millis();
1797 let should_resync = {
1798 let orderbooks = self.orderbooks.lock().await;
1799 if let Some(ob) = orderbooks.get(symbol) {
1800 ob.should_resync(current_time)
1801 } else {
1802 true
1803 }
1804 };
1805
1806 if should_resync {
1807 tracing::info!("Initiating resync for {}", symbol);
1808
1809 {
1811 let mut orderbooks = self.orderbooks.lock().await;
1812 if let Some(ob) = orderbooks.get_mut(symbol) {
1813 ob.reset_for_resync();
1814 ob.mark_resync_initiated(current_time);
1815 }
1816 }
1817
1818 tokio::time::sleep(Duration::from_millis(500)).await;
1820
1821 match self
1823 .fetch_orderbook_snapshot(
1824 exchange, symbol, limit, is_futures,
1825 )
1826 .await
1827 {
1828 Ok(_) => {
1829 tracing::info!(
1830 "Resync completed successfully for {}",
1831 symbol
1832 );
1833 continue;
1834 }
1835 Err(resync_err) => {
1836 tracing::error!(
1837 "Resync failed for {}: {}",
1838 symbol,
1839 resync_err
1840 );
1841 return Err(resync_err);
1842 }
1843 }
1844 } else {
1845 tracing::debug!(
1846 "Resync rate limited for {}, skipping",
1847 symbol
1848 );
1849 continue;
1850 }
1851 } else {
1852 tracing::error!(
1853 "Failed to handle orderbook delta: {}",
1854 err_msg
1855 );
1856 continue;
1857 }
1858 }
1859 }
1860 }
1861 }
1862 }
1863 }
1864 }
1865
1866 async fn watch_orderbooks_internal(
1878 &self,
1879 exchange: &Binance,
1880 symbols: Vec<String>,
1881 limit: Option<i64>,
1882 update_speed: i32,
1883 is_futures: bool,
1884 ) -> Result<HashMap<String, OrderBook>> {
1885 if symbols.len() > 200 {
1887 return Err(Error::invalid_request(
1888 "Binance supports max 200 symbols per connection",
1889 ));
1890 }
1891
1892 for symbol in &symbols {
1894 let stream = if update_speed == 100 {
1895 format!("{}@depth@100ms", symbol.to_lowercase())
1896 } else {
1897 format!("{}@depth", symbol.to_lowercase())
1898 };
1899
1900 self.client
1901 .subscribe(stream, Some(symbol.clone()), None)
1902 .await?;
1903 }
1904
1905 tokio::time::sleep(Duration::from_millis(500)).await;
1907
1908 for symbol in &symbols {
1910 let _ = self
1911 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1912 .await;
1913 }
1914
1915 let mut result = HashMap::new();
1917 let mut update_count = 0;
1918
1919 while update_count < symbols.len() {
1920 if let Some(message) = self.client.receive().await {
1921 if message.get("result").is_some() {
1923 continue;
1924 }
1925
1926 if let Some(event_type) = message.get("e").and_then(|v| v.as_str()) {
1928 if event_type == "depthUpdate" {
1929 if let Some(msg_symbol) = message.get("s").and_then(|v| v.as_str()) {
1930 if let Err(e) = self
1931 .handle_orderbook_delta(msg_symbol, &message, is_futures)
1932 .await
1933 {
1934 tracing::error!("Failed to handle orderbook delta: {}", e);
1935 continue;
1936 }
1937
1938 update_count += 1;
1939 }
1940 }
1941 }
1942 }
1943 }
1944
1945 let orderbooks = self.orderbooks.lock().await;
1947 for symbol in &symbols {
1948 if let Some(ob) = orderbooks.get(symbol) {
1949 result.insert(symbol.clone(), ob.clone());
1950 }
1951 }
1952
1953 Ok(result)
1954 }
1955
1956 pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1963 let tickers = self.tickers.lock().await;
1964 tickers.get(symbol).cloned()
1965 }
1966
1967 pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1969 let tickers = self.tickers.lock().await;
1970 tickers.clone()
1971 }
1972
1973 async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1982 use rust_decimal::Decimal;
1983 use std::str::FromStr;
1984
1985 let event_type = message
1987 .get("e")
1988 .and_then(|e| e.as_str())
1989 .ok_or_else(|| Error::invalid_request("Missing event type in balance message"))?;
1990
1991 let mut balances = self.balances.write().await;
1993 let balance = balances
1994 .entry(account_type.to_string())
1995 .or_insert_with(Balance::new);
1996
1997 match event_type {
1998 "balanceUpdate" => {
2000 let asset = message
2001 .get("a")
2002 .and_then(|a| a.as_str())
2003 .ok_or_else(|| Error::invalid_request("Missing asset in balanceUpdate"))?;
2004
2005 let delta_str = message
2006 .get("d")
2007 .and_then(|d| d.as_str())
2008 .ok_or_else(|| Error::invalid_request("Missing delta in balanceUpdate"))?;
2009
2010 let delta = Decimal::from_str(delta_str)
2011 .map_err(|e| Error::invalid_request(format!("Invalid delta value: {}", e)))?;
2012
2013 balance.apply_delta(asset.to_string(), delta);
2015 }
2016
2017 "outboundAccountPosition" => {
2019 if let Some(balances_array) = message.get("B").and_then(|b| b.as_array()) {
2020 for balance_item in balances_array {
2021 let asset =
2022 balance_item
2023 .get("a")
2024 .and_then(|a| a.as_str())
2025 .ok_or_else(|| {
2026 Error::invalid_request("Missing asset in balance item")
2027 })?;
2028
2029 let free_str = balance_item
2030 .get("f")
2031 .and_then(|f| f.as_str())
2032 .ok_or_else(|| Error::invalid_request("Missing free balance"))?;
2033
2034 let locked_str = balance_item
2035 .get("l")
2036 .and_then(|l| l.as_str())
2037 .ok_or_else(|| Error::invalid_request("Missing locked balance"))?;
2038
2039 let free = Decimal::from_str(free_str).map_err(|e| {
2040 Error::invalid_request(format!("Invalid free value: {}", e))
2041 })?;
2042
2043 let locked = Decimal::from_str(locked_str).map_err(|e| {
2044 Error::invalid_request(format!("Invalid locked value: {}", e))
2045 })?;
2046
2047 balance.update_balance(asset.to_string(), free, locked);
2049 }
2050 }
2051 }
2052
2053 "ACCOUNT_UPDATE" => {
2055 if let Some(account_data) = message.get("a") {
2056 if let Some(balances_array) = account_data.get("B").and_then(|b| b.as_array()) {
2058 for balance_item in balances_array {
2059 let asset = balance_item.get("a").and_then(|a| a.as_str()).ok_or_else(
2060 || Error::invalid_request("Missing asset in balance item"),
2061 )?;
2062
2063 let wallet_balance_str = balance_item
2064 .get("wb")
2065 .and_then(|wb| wb.as_str())
2066 .ok_or_else(|| Error::invalid_request("Missing wallet balance"))?;
2067
2068 let wallet_balance =
2069 Decimal::from_str(wallet_balance_str).map_err(|e| {
2070 Error::invalid_request(format!("Invalid wallet balance: {}", e))
2071 })?;
2072
2073 let cross_wallet = balance_item
2075 .get("cw")
2076 .and_then(|cw| cw.as_str())
2077 .and_then(|s| Decimal::from_str(s).ok());
2078
2079 balance.update_wallet(asset.to_string(), wallet_balance, cross_wallet);
2081 }
2082 }
2083
2084 }
2086 }
2087
2088 _ => {
2089 return Err(Error::invalid_request(format!(
2090 "Unknown balance event type: {}",
2091 event_type
2092 )));
2093 }
2094 }
2095
2096 Ok(())
2097 }
2098
2099 fn parse_ws_trade(&self, data: &Value) -> Result<Trade> {
2109 use ccxt_core::types::{Fee, OrderSide, OrderType, TakerOrMaker};
2110 use rust_decimal::Decimal;
2111 use std::str::FromStr;
2112
2113 let symbol = data
2115 .get("s")
2116 .and_then(|v| v.as_str())
2117 .ok_or_else(|| Error::invalid_request("Missing symbol field".to_string()))?
2118 .to_string();
2119
2120 let id = data
2122 .get("t")
2123 .and_then(|v| v.as_i64())
2124 .map(|v| v.to_string());
2125
2126 let timestamp = data.get("T").and_then(|v| v.as_i64()).unwrap_or(0);
2128
2129 let price = data
2131 .get("L")
2132 .and_then(|v| v.as_str())
2133 .and_then(|s| Decimal::from_str(s).ok())
2134 .unwrap_or(Decimal::ZERO);
2135
2136 let amount = data
2138 .get("l")
2139 .and_then(|v| v.as_str())
2140 .and_then(|s| Decimal::from_str(s).ok())
2141 .unwrap_or(Decimal::ZERO);
2142
2143 let cost = data
2145 .get("Y")
2146 .and_then(|v| v.as_str())
2147 .and_then(|s| Decimal::from_str(s).ok())
2148 .or_else(|| {
2149 if price > Decimal::ZERO && amount > Decimal::ZERO {
2151 Some(price * amount)
2152 } else {
2153 None
2154 }
2155 });
2156
2157 let side = data
2159 .get("S")
2160 .and_then(|v| v.as_str())
2161 .and_then(|s| match s.to_uppercase().as_str() {
2162 "BUY" => Some(OrderSide::Buy),
2163 "SELL" => Some(OrderSide::Sell),
2164 _ => None,
2165 })
2166 .unwrap_or(OrderSide::Buy);
2167
2168 let trade_type =
2170 data.get("o")
2171 .and_then(|v| v.as_str())
2172 .and_then(|s| match s.to_uppercase().as_str() {
2173 "LIMIT" => Some(OrderType::Limit),
2174 "MARKET" => Some(OrderType::Market),
2175 _ => None,
2176 });
2177
2178 let order_id = data
2180 .get("i")
2181 .and_then(|v| v.as_i64())
2182 .map(|v| v.to_string());
2183
2184 let taker_or_maker = data.get("m").and_then(|v| v.as_bool()).map(|is_maker| {
2186 if is_maker {
2187 TakerOrMaker::Maker
2188 } else {
2189 TakerOrMaker::Taker
2190 }
2191 });
2192
2193 let fee = if let Some(fee_cost_str) = data.get("n").and_then(|v| v.as_str()) {
2195 if let Ok(fee_cost) = Decimal::from_str(fee_cost_str) {
2196 let currency = data
2197 .get("N")
2198 .and_then(|v| v.as_str())
2199 .unwrap_or("UNKNOWN")
2200 .to_string();
2201 Some(Fee {
2202 currency,
2203 cost: fee_cost,
2204 rate: None,
2205 })
2206 } else {
2207 None
2208 }
2209 } else {
2210 None
2211 };
2212
2213 let datetime = chrono::DateTime::from_timestamp_millis(timestamp)
2215 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string());
2216
2217 let mut info = HashMap::new();
2219 if let Value::Object(map) = data {
2220 for (k, v) in map.iter() {
2221 info.insert(k.clone(), v.clone());
2222 }
2223 }
2224
2225 Ok(Trade {
2226 id,
2227 order: order_id,
2228 symbol,
2229 trade_type,
2230 side,
2231 taker_or_maker,
2232 price: Price::from(price),
2233 amount: Amount::from(amount),
2234 cost: cost.map(Cost::from),
2235 fee,
2236 timestamp,
2237 datetime,
2238 info,
2239 })
2240 }
2241
2242 async fn filter_my_trades(
2249 &self,
2250 symbol: Option<&str>,
2251 since: Option<i64>,
2252 limit: Option<usize>,
2253 ) -> Result<Vec<Trade>> {
2254 let trades_map = self.my_trades.read().await;
2255
2256 let mut trades: Vec<Trade> = if let Some(sym) = symbol {
2258 trades_map
2259 .get(sym)
2260 .map(|symbol_trades| symbol_trades.iter().cloned().collect())
2261 .unwrap_or_default()
2262 } else {
2263 trades_map
2264 .values()
2265 .flat_map(|symbol_trades| symbol_trades.iter().cloned())
2266 .collect()
2267 };
2268
2269 if let Some(since_ts) = since {
2271 trades.retain(|trade| trade.timestamp >= since_ts);
2272 }
2273
2274 trades.sort_by(|a, b| {
2276 let ts_a = a.timestamp;
2277 let ts_b = b.timestamp;
2278 ts_b.cmp(&ts_a)
2279 });
2280
2281 if let Some(lim) = limit {
2283 trades.truncate(lim);
2284 }
2285
2286 Ok(trades)
2287 }
2288
2289 async fn parse_ws_position(&self, data: &Value) -> Result<Position> {
2312 let symbol = data["s"]
2314 .as_str()
2315 .ok_or_else(|| Error::invalid_request("Missing symbol field"))?
2316 .to_string();
2317
2318 let position_amount_str = data["pa"]
2319 .as_str()
2320 .ok_or_else(|| Error::invalid_request("Missing position amount"))?;
2321
2322 let position_amount = position_amount_str
2323 .parse::<f64>()
2324 .map_err(|e| Error::invalid_request(format!("Invalid position amount: {}", e)))?;
2325
2326 let position_side = data["ps"]
2328 .as_str()
2329 .ok_or_else(|| Error::invalid_request("Missing position side"))?
2330 .to_uppercase();
2331
2332 let (side, hedged) = if position_side == "BOTH" {
2336 let actual_side = if position_amount < 0.0 {
2337 "short"
2338 } else {
2339 "long"
2340 };
2341 (actual_side.to_string(), false)
2342 } else {
2343 (position_side.to_lowercase(), true)
2344 };
2345
2346 let entry_price = data["ep"].as_str().and_then(|s| s.parse::<f64>().ok());
2348 let unrealized_pnl = data["up"].as_str().and_then(|s| s.parse::<f64>().ok());
2349 let realized_pnl = data["cr"].as_str().and_then(|s| s.parse::<f64>().ok());
2350 let margin_mode = data["mt"].as_str().map(|s| s.to_string());
2351 let initial_margin = data["iw"].as_str().and_then(|s| s.parse::<f64>().ok());
2352 let _margin_asset = data["ma"].as_str().map(|s| s.to_string());
2353
2354 Ok(Position {
2356 info: data.clone(),
2357 id: None,
2358 symbol,
2359 side: Some(side),
2360 contracts: Some(position_amount.abs()), contract_size: None,
2362 entry_price,
2363 mark_price: None,
2364 notional: None,
2365 leverage: None,
2366 collateral: initial_margin, initial_margin,
2368 initial_margin_percentage: None,
2369 maintenance_margin: None,
2370 maintenance_margin_percentage: None,
2371 unrealized_pnl,
2372 realized_pnl,
2373 liquidation_price: None,
2374 margin_ratio: None,
2375 margin_mode,
2376 hedged: Some(hedged),
2377 percentage: None,
2378 position_side: None,
2379 dual_side_position: None,
2380 timestamp: Some(chrono::Utc::now().timestamp_millis()),
2381 datetime: Some(chrono::Utc::now().to_rfc3339()),
2382 })
2383 }
2384
2385 async fn filter_positions(
2395 &self,
2396 symbols: Option<&[String]>,
2397 since: Option<i64>,
2398 limit: Option<usize>,
2399 ) -> Result<Vec<Position>> {
2400 let positions_map = self.positions.read().await;
2401
2402 let mut positions: Vec<Position> = if let Some(syms) = symbols {
2404 syms.iter()
2405 .filter_map(|sym| positions_map.get(sym))
2406 .flat_map(|side_map| side_map.values().cloned())
2407 .collect()
2408 } else {
2409 positions_map
2410 .values()
2411 .flat_map(|side_map| side_map.values().cloned())
2412 .collect()
2413 };
2414
2415 if let Some(since_ts) = since {
2417 positions.retain(|pos| {
2418 pos.timestamp
2419 .map(|ts| ts as i64 >= since_ts)
2420 .unwrap_or(false)
2421 });
2422 }
2423
2424 positions.sort_by(|a, b| {
2426 let ts_a = a.timestamp.unwrap_or(0);
2427 let ts_b = b.timestamp.unwrap_or(0);
2428 ts_b.cmp(&ts_a)
2429 });
2430
2431 if let Some(lim) = limit {
2433 positions.truncate(lim);
2434 }
2435
2436 Ok(positions)
2437 }
2438}
2439
2440impl Binance {
2441 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
2449 let ws = self.create_ws();
2450 ws.connect().await?;
2451
2452 let binance_symbol = symbol.replace('/', "").to_lowercase();
2454 ws.subscribe_ticker(&binance_symbol).await
2455 }
2456
2457 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
2465 let ws = self.create_ws();
2466 ws.connect().await?;
2467
2468 let binance_symbol = symbol.replace('/', "").to_lowercase();
2469 ws.subscribe_trades(&binance_symbol).await
2470 }
2471
2472 pub async fn subscribe_orderbook(&self, symbol: &str, levels: Option<u32>) -> Result<()> {
2481 let ws = self.create_ws();
2482 ws.connect().await?;
2483
2484 let binance_symbol = symbol.replace('/', "").to_lowercase();
2485 let depth_levels = levels.unwrap_or(20);
2486 ws.subscribe_orderbook(&binance_symbol, depth_levels, "1000ms")
2487 .await
2488 }
2489
2490 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
2499 let ws = self.create_ws();
2500 ws.connect().await?;
2501
2502 let binance_symbol = symbol.replace('/', "").to_lowercase();
2503 ws.subscribe_kline(&binance_symbol, interval).await
2504 }
2505
2506 pub async fn watch_ticker(
2529 &self,
2530 symbol: &str,
2531 params: Option<HashMap<String, Value>>,
2532 ) -> Result<Ticker> {
2533 self.load_markets(false).await?;
2535
2536 let market = self.base.market(symbol).await?;
2538 let binance_symbol = market.id.to_lowercase();
2539
2540 let channel_name = if let Some(p) = ¶ms {
2542 p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2543 } else {
2544 "ticker"
2545 };
2546
2547 let ws = self.create_ws();
2549 ws.connect().await?;
2550
2551 ws.watch_ticker_internal(&binance_symbol, channel_name)
2553 .await
2554 }
2555
2556 pub async fn watch_tickers(
2585 &self,
2586 symbols: Option<Vec<String>>,
2587 params: Option<HashMap<String, Value>>,
2588 ) -> Result<HashMap<String, Ticker>> {
2589 self.load_markets(false).await?;
2591
2592 let channel_name = if let Some(p) = ¶ms {
2594 p.get("name").and_then(|v| v.as_str()).unwrap_or("ticker")
2595 } else {
2596 "ticker"
2597 };
2598
2599 if channel_name == "bookTicker" {
2601 return Err(Error::invalid_request(
2602 "To subscribe for bids-asks, use watch_bids_asks() method instead",
2603 ));
2604 }
2605
2606 let binance_symbols = if let Some(syms) = symbols {
2608 let mut result = Vec::new();
2609 for symbol in syms {
2610 let market = self.base.market(&symbol).await?;
2611 result.push(market.id.to_lowercase());
2612 }
2613 Some(result)
2614 } else {
2615 None
2616 };
2617
2618 let ws = self.create_ws();
2620 ws.connect().await?;
2621
2622 ws.watch_tickers_internal(binance_symbols, channel_name)
2624 .await
2625 }
2626
2627 pub async fn watch_mark_price(
2657 &self,
2658 symbol: &str,
2659 params: Option<HashMap<String, Value>>,
2660 ) -> Result<Ticker> {
2661 self.load_markets(false).await?;
2663
2664 let market = self.base.market(symbol).await?;
2666 if market.market_type != MarketType::Swap && market.market_type != MarketType::Futures {
2667 return Err(Error::invalid_request(format!(
2668 "watch_mark_price() does not support {} markets",
2669 market.market_type
2670 )));
2671 }
2672
2673 let binance_symbol = market.id.to_lowercase();
2674
2675 let use_1s_freq = if let Some(p) = ¶ms {
2677 p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2678 } else {
2679 true
2680 };
2681
2682 let channel_name = if use_1s_freq {
2684 "markPrice@1s"
2685 } else {
2686 "markPrice"
2687 };
2688
2689 let ws = self.create_ws();
2691 ws.connect().await?;
2692
2693 ws.watch_ticker_internal(&binance_symbol, channel_name)
2695 .await
2696 }
2697
2698 pub async fn watch_order_book(
2735 &self,
2736 symbol: &str,
2737 limit: Option<i64>,
2738 params: Option<HashMap<String, Value>>,
2739 ) -> Result<OrderBook> {
2740 self.load_markets(false).await?;
2742
2743 let market = self.base.market(symbol).await?;
2745 let binance_symbol = market.id.to_lowercase();
2746
2747 let is_futures =
2749 market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2750
2751 let update_speed = if let Some(p) = ¶ms {
2753 p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2754 } else {
2755 100
2756 };
2757
2758 if update_speed != 100 && update_speed != 1000 {
2760 return Err(Error::invalid_request(
2761 "Update speed must be 100 or 1000 milliseconds",
2762 ));
2763 }
2764
2765 let ws = self.create_ws();
2767 ws.connect().await?;
2768
2769 ws.watch_orderbook_internal(self, &binance_symbol, limit, update_speed, is_futures)
2771 .await
2772 }
2773
2774 pub async fn watch_order_books(
2805 &self,
2806 symbols: Vec<String>,
2807 limit: Option<i64>,
2808 params: Option<HashMap<String, Value>>,
2809 ) -> Result<HashMap<String, OrderBook>> {
2810 if symbols.is_empty() {
2812 return Err(Error::invalid_request("Symbols list cannot be empty"));
2813 }
2814
2815 if symbols.len() > 200 {
2816 return Err(Error::invalid_request(
2817 "Binance supports max 200 symbols per connection",
2818 ));
2819 }
2820
2821 self.load_markets(false).await?;
2823
2824 let mut binance_symbols = Vec::new();
2826 let mut is_futures = false;
2827
2828 for symbol in &symbols {
2829 let market = self.base.market(symbol).await?;
2830 binance_symbols.push(market.id.to_lowercase());
2831
2832 let current_is_futures =
2833 market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2834 if !binance_symbols.is_empty() && current_is_futures != is_futures {
2835 return Err(Error::invalid_request(
2836 "Cannot mix spot and futures markets in watch_order_books",
2837 ));
2838 }
2839 is_futures = current_is_futures;
2840 }
2841
2842 let update_speed = if let Some(p) = ¶ms {
2844 p.get("speed").and_then(|v| v.as_i64()).unwrap_or(100) as i32
2845 } else {
2846 100
2847 };
2848
2849 let ws = self.create_ws();
2851 ws.connect().await?;
2852
2853 ws.watch_orderbooks_internal(self, binance_symbols, limit, update_speed, is_futures)
2855 .await
2856 }
2857
2858 pub async fn watch_mark_prices(
2868 &self,
2869 symbols: Option<Vec<String>>,
2870 params: Option<HashMap<String, Value>>,
2871 ) -> Result<HashMap<String, Ticker>> {
2872 self.load_markets(false).await?;
2874
2875 let use_1s_freq = if let Some(p) = ¶ms {
2877 p.get("use1sFreq").and_then(|v| v.as_bool()).unwrap_or(true)
2878 } else {
2879 true
2880 };
2881
2882 let channel_name = if use_1s_freq {
2884 "markPrice@1s"
2885 } else {
2886 "markPrice"
2887 };
2888
2889 let binance_symbols = if let Some(syms) = symbols {
2891 let mut result = Vec::new();
2892 for symbol in syms {
2893 let market = self.base.market(&symbol).await?;
2894 if market.market_type != MarketType::Swap
2895 && market.market_type != MarketType::Futures
2896 {
2897 return Err(Error::invalid_request(format!(
2898 "watch_mark_prices() does not support {} markets",
2899 market.market_type
2900 )));
2901 }
2902 result.push(market.id.to_lowercase());
2903 }
2904 Some(result)
2905 } else {
2906 None
2907 };
2908
2909 let ws = self.create_ws();
2911 ws.connect().await?;
2912
2913 ws.watch_tickers_internal(binance_symbols, channel_name)
2915 .await
2916 }
2917 pub async fn watch_trades(
2927 &self,
2928 symbol: &str,
2929 since: Option<i64>,
2930 limit: Option<usize>,
2931 ) -> Result<Vec<Trade>> {
2932 self.base.load_markets(false).await?;
2934
2935 let market = self.base.market(symbol).await?;
2937 let binance_symbol = market.id.to_lowercase();
2938
2939 let ws = self.create_ws();
2941 ws.connect().await?;
2942
2943 ws.subscribe_trades(&binance_symbol).await?;
2945
2946 let mut retries = 0;
2948 const MAX_RETRIES: u32 = 50;
2949
2950 while retries < MAX_RETRIES {
2951 if let Some(msg) = ws.client.receive().await {
2952 if msg.get("result").is_some() || msg.get("id").is_some() {
2954 continue;
2955 }
2956
2957 if let Ok(trade) = parser::parse_ws_trade(&msg, Some(&market)) {
2959 let mut trades_map = ws.trades.lock().await;
2961 let trades = trades_map
2962 .entry(symbol.to_string())
2963 .or_insert_with(VecDeque::new);
2964
2965 const MAX_TRADES: usize = 1000;
2967 if trades.len() >= MAX_TRADES {
2968 trades.pop_front();
2969 }
2970 trades.push_back(trade);
2971
2972 let mut result: Vec<Trade> = trades.iter().cloned().collect();
2974
2975 if let Some(since_ts) = since {
2977 result.retain(|t| t.timestamp >= since_ts);
2978 }
2979
2980 if let Some(limit_size) = limit {
2982 if result.len() > limit_size {
2983 result = result.split_off(result.len() - limit_size);
2984 }
2985 }
2986
2987 return Ok(result);
2988 }
2989 }
2990
2991 retries += 1;
2992 tokio::time::sleep(Duration::from_millis(100)).await;
2993 }
2994
2995 Err(Error::network("Timeout waiting for trade data"))
2996 }
2997
2998 pub async fn watch_ohlcv(
3009 &self,
3010 symbol: &str,
3011 timeframe: &str,
3012 since: Option<i64>,
3013 limit: Option<usize>,
3014 ) -> Result<Vec<OHLCV>> {
3015 self.base.load_markets(false).await?;
3017
3018 let market = self.base.market(symbol).await?;
3020 let binance_symbol = market.id.to_lowercase();
3021
3022 let ws = self.create_ws();
3024 ws.connect().await?;
3025
3026 ws.subscribe_kline(&binance_symbol, timeframe).await?;
3028
3029 let mut retries = 0;
3031 const MAX_RETRIES: u32 = 50;
3032
3033 while retries < MAX_RETRIES {
3034 if let Some(msg) = ws.client.receive().await {
3035 if msg.get("result").is_some() || msg.get("id").is_some() {
3037 continue;
3038 }
3039
3040 if let Ok(ohlcv) = parser::parse_ws_ohlcv(&msg) {
3042 let cache_key = format!("{}:{}", symbol, timeframe);
3044 let mut ohlcvs_map = ws.ohlcvs.lock().await;
3045 let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
3046
3047 const MAX_OHLCVS: usize = 1000;
3049 if ohlcvs.len() >= MAX_OHLCVS {
3050 ohlcvs.pop_front();
3051 }
3052 ohlcvs.push_back(ohlcv);
3053
3054 let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
3056
3057 if let Some(since_ts) = since {
3059 result.retain(|o| o.timestamp >= since_ts);
3060 }
3061
3062 if let Some(limit_size) = limit {
3064 if result.len() > limit_size {
3065 result = result.split_off(result.len() - limit_size);
3066 }
3067 }
3068
3069 return Ok(result);
3070 }
3071 }
3072
3073 retries += 1;
3074 tokio::time::sleep(Duration::from_millis(100)).await;
3075 }
3076
3077 Err(Error::network("Timeout waiting for OHLCV data"))
3078 }
3079
3080 pub async fn watch_bids_asks(&self, symbol: &str) -> Result<BidAsk> {
3088 self.base.load_markets(false).await?;
3090
3091 let market = self.base.market(symbol).await?;
3093 let binance_symbol = market.id.to_lowercase();
3094
3095 let ws = self.create_ws();
3097 ws.connect().await?;
3098
3099 let stream_name = format!("{}@bookTicker", binance_symbol);
3101 ws.client
3102 .subscribe(stream_name, Some(symbol.to_string()), None)
3103 .await?;
3104
3105 let mut retries = 0;
3107 const MAX_RETRIES: u32 = 50;
3108
3109 while retries < MAX_RETRIES {
3110 if let Some(msg) = ws.client.receive().await {
3111 if msg.get("result").is_some() || msg.get("id").is_some() {
3113 continue;
3114 }
3115
3116 if let Ok(bid_ask) = parser::parse_ws_bid_ask(&msg) {
3118 let mut bids_asks_map = ws.bids_asks.lock().await;
3120 bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
3121
3122 return Ok(bid_ask);
3123 }
3124 }
3125
3126 retries += 1;
3127 tokio::time::sleep(Duration::from_millis(100)).await;
3128 }
3129
3130 Err(Error::network("Timeout waiting for BidAsk data"))
3131 }
3132
3133 pub async fn watch_balance(
3169 self: Arc<Self>,
3170 params: Option<HashMap<String, Value>>,
3171 ) -> Result<Balance> {
3172 self.base.load_markets(false).await?;
3174
3175 let account_type = if let Some(p) = ¶ms {
3177 p.get("type")
3178 .and_then(|v| v.as_str())
3179 .unwrap_or_else(|| self.options.default_type.as_str())
3180 } else {
3181 self.options.default_type.as_str()
3182 };
3183
3184 let fetch_snapshot = if let Some(p) = ¶ms {
3186 p.get("fetchBalanceSnapshot")
3187 .and_then(|v| v.as_bool())
3188 .unwrap_or(false)
3189 } else {
3190 false
3191 };
3192
3193 let await_snapshot = if let Some(p) = ¶ms {
3194 p.get("awaitBalanceSnapshot")
3195 .and_then(|v| v.as_bool())
3196 .unwrap_or(true)
3197 } else {
3198 true
3199 };
3200
3201 let ws = self.create_authenticated_ws();
3203 ws.connect().await?;
3204
3205 if fetch_snapshot {
3207 let account_type_enum = account_type.parse::<ccxt_core::types::AccountType>().ok();
3209 let snapshot = self.fetch_balance(account_type_enum).await?;
3210
3211 let mut balances = ws.balances.write().await;
3213 balances.insert(account_type.to_string(), snapshot.clone());
3214
3215 if !await_snapshot {
3216 return Ok(snapshot);
3217 }
3218 }
3219
3220 let mut retries = 0;
3222 const MAX_RETRIES: u32 = 100;
3223
3224 while retries < MAX_RETRIES {
3225 if let Some(msg) = ws.client.receive().await {
3226 if msg.get("result").is_some() || msg.get("id").is_some() {
3228 continue;
3229 }
3230
3231 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3233 match event_type {
3235 "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE" => {
3236 if let Ok(()) = ws.handle_balance_message(&msg, account_type).await {
3238 let balances = ws.balances.read().await;
3240 if let Some(balance) = balances.get(account_type) {
3241 return Ok(balance.clone());
3242 }
3243 }
3244 }
3245 _ => {}
3246 }
3247 }
3248 }
3249
3250 retries += 1;
3251 tokio::time::sleep(Duration::from_millis(100)).await;
3252 }
3253
3254 Err(Error::network("Timeout waiting for balance data"))
3255 }
3256
3257 pub async fn watch_orders(
3290 self: Arc<Self>,
3291 symbol: Option<&str>,
3292 since: Option<i64>,
3293 limit: Option<usize>,
3294 _params: Option<HashMap<String, Value>>,
3295 ) -> Result<Vec<Order>> {
3296 self.base.load_markets(false).await?;
3297
3298 let ws = self.create_authenticated_ws();
3299 ws.connect().await?;
3300
3301 loop {
3303 if let Some(msg) = ws.client.receive().await {
3304 if let Value::Object(data) = msg {
3305 if let Some(event_type) = data.get("e").and_then(|v| v.as_str()) {
3306 match event_type {
3307 "executionReport" => {
3308 let order = self.parse_ws_order(&data)?;
3310
3311 let mut orders = ws.orders.write().await;
3313 let symbol_orders = orders
3314 .entry(order.symbol.clone())
3315 .or_insert_with(HashMap::new);
3316 symbol_orders.insert(order.id.clone(), order.clone());
3317 drop(orders);
3318
3319 if let Some(exec_type) = data.get("x").and_then(|v| v.as_str()) {
3321 if exec_type == "TRADE" {
3322 if let Ok(trade) =
3324 ws.parse_ws_trade(&Value::Object(data.clone()))
3325 {
3326 let mut trades = ws.my_trades.write().await;
3328 let symbol_trades = trades
3329 .entry(trade.symbol.clone())
3330 .or_insert_with(VecDeque::new);
3331
3332 symbol_trades.push_front(trade);
3334 if symbol_trades.len() > 1000 {
3335 symbol_trades.pop_back();
3336 }
3337 }
3338 }
3339 }
3340
3341 return self.filter_orders(&ws, symbol, since, limit).await;
3343 }
3344 _ => continue,
3345 }
3346 }
3347 }
3348 } else {
3349 tokio::time::sleep(Duration::from_millis(100)).await;
3350 }
3351 }
3352 }
3353
3354 fn parse_ws_order(&self, data: &serde_json::Map<String, Value>) -> Result<Order> {
3356 use ccxt_core::types::{OrderSide, OrderStatus, OrderType};
3357 use rust_decimal::Decimal;
3358 use std::str::FromStr;
3359
3360 let symbol = data.get("s").and_then(|v| v.as_str()).unwrap_or("");
3362 let order_id = data
3363 .get("i")
3364 .and_then(|v| v.as_i64())
3365 .map(|id| id.to_string())
3366 .unwrap_or_default();
3367 let client_order_id = data.get("c").and_then(|v| v.as_str()).map(String::from);
3368
3369 let status_str = data.get("X").and_then(|v| v.as_str()).unwrap_or("NEW");
3371 let status = match status_str {
3372 "NEW" => OrderStatus::Open,
3373 "PARTIALLY_FILLED" => OrderStatus::Open,
3374 "FILLED" => OrderStatus::Closed,
3375 "CANCELED" => OrderStatus::Cancelled,
3376 "REJECTED" => OrderStatus::Rejected,
3377 "EXPIRED" => OrderStatus::Expired,
3378 _ => OrderStatus::Open,
3379 };
3380
3381 let side_str = data.get("S").and_then(|v| v.as_str()).unwrap_or("BUY");
3383 let side = match side_str {
3384 "BUY" => OrderSide::Buy,
3385 "SELL" => OrderSide::Sell,
3386 _ => OrderSide::Buy,
3387 };
3388
3389 let type_str = data.get("o").and_then(|v| v.as_str()).unwrap_or("LIMIT");
3391 let order_type = match type_str {
3392 "LIMIT" => OrderType::Limit,
3393 "MARKET" => OrderType::Market,
3394 "STOP_LOSS" => OrderType::StopLoss,
3395 "STOP_LOSS_LIMIT" => OrderType::StopLossLimit,
3396 "TAKE_PROFIT" => OrderType::TakeProfit,
3397 "TAKE_PROFIT_LIMIT" => OrderType::TakeProfitLimit,
3398 "LIMIT_MAKER" => OrderType::LimitMaker,
3399 _ => OrderType::Limit,
3400 };
3401
3402 let amount = data
3404 .get("q")
3405 .and_then(|v| v.as_str())
3406 .and_then(|s| Decimal::from_str(s).ok())
3407 .unwrap_or(Decimal::ZERO);
3408
3409 let price = data
3410 .get("p")
3411 .and_then(|v| v.as_str())
3412 .and_then(|s| Decimal::from_str(s).ok());
3413
3414 let filled = data
3415 .get("z")
3416 .and_then(|v| v.as_str())
3417 .and_then(|s| Decimal::from_str(s).ok());
3418
3419 let cost = data
3420 .get("Z")
3421 .and_then(|v| v.as_str())
3422 .and_then(|s| Decimal::from_str(s).ok());
3423
3424 let remaining = match filled {
3426 Some(fill) => Some(amount - fill),
3427 None => None,
3428 };
3429
3430 let average = match (filled, cost) {
3432 (Some(fill), Some(c)) if fill > Decimal::ZERO && c > Decimal::ZERO => Some(c / fill),
3433 _ => None,
3434 };
3435
3436 let timestamp = data.get("T").and_then(|v| v.as_i64());
3438 let last_trade_timestamp = data.get("T").and_then(|v| v.as_i64());
3439
3440 Ok(Order {
3441 id: order_id,
3442 client_order_id,
3443 timestamp,
3444 datetime: timestamp.map(|ts| {
3445 chrono::DateTime::from_timestamp_millis(ts)
3446 .map(|dt| dt.to_rfc3339())
3447 .unwrap_or_default()
3448 }),
3449 last_trade_timestamp,
3450 symbol: symbol.to_string(),
3451 order_type,
3452 side,
3453 price,
3454 average,
3455 amount,
3456 cost,
3457 filled,
3458 remaining,
3459 status,
3460 fee: None,
3461 fees: None,
3462 trades: None,
3463 time_in_force: data.get("f").and_then(|v| v.as_str()).map(String::from),
3464 post_only: None,
3465 reduce_only: None,
3466 stop_price: data
3467 .get("P")
3468 .and_then(|v| v.as_str())
3469 .and_then(|s| Decimal::from_str(s).ok()),
3470 trigger_price: None,
3471 take_profit_price: None,
3472 stop_loss_price: None,
3473 trailing_delta: None,
3474 trailing_percent: None,
3475 activation_price: None,
3476 callback_rate: None,
3477 working_type: data.get("wt").and_then(|v| v.as_str()).map(String::from),
3478 info: data.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
3479 })
3480 }
3481
3482 async fn filter_orders(
3484 &self,
3485 ws: &BinanceWs,
3486 symbol: Option<&str>,
3487 since: Option<i64>,
3488 limit: Option<usize>,
3489 ) -> Result<Vec<Order>> {
3490 let orders_map = ws.orders.read().await;
3491
3492 let mut orders: Vec<Order> = if let Some(sym) = symbol {
3494 orders_map
3495 .get(sym)
3496 .map(|symbol_orders| symbol_orders.values().cloned().collect())
3497 .unwrap_or_default()
3498 } else {
3499 orders_map
3500 .values()
3501 .flat_map(|symbol_orders| symbol_orders.values().cloned())
3502 .collect()
3503 };
3504
3505 if let Some(since_ts) = since {
3507 orders.retain(|order| order.timestamp.map_or(false, |ts| ts >= since_ts));
3508 }
3509
3510 orders.sort_by(|a, b| {
3512 let ts_a = a.timestamp.unwrap_or(0);
3513 let ts_b = b.timestamp.unwrap_or(0);
3514 ts_b.cmp(&ts_a)
3515 });
3516
3517 if let Some(lim) = limit {
3519 orders.truncate(lim);
3520 }
3521
3522 Ok(orders)
3523 }
3524
3525 pub async fn watch_my_trades(
3556 self: Arc<Self>,
3557 symbol: Option<&str>,
3558 since: Option<i64>,
3559 limit: Option<usize>,
3560 _params: Option<HashMap<String, Value>>,
3561 ) -> Result<Vec<Trade>> {
3562 let ws = self.create_authenticated_ws();
3564 ws.connect().await?;
3565
3566 let mut retries = 0;
3568 const MAX_RETRIES: u32 = 100;
3569
3570 while retries < MAX_RETRIES {
3571 if let Some(msg) = ws.client.receive().await {
3572 if msg.get("result").is_some() || msg.get("id").is_some() {
3574 continue;
3575 }
3576
3577 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3579 if event_type == "executionReport" {
3581 if let Ok(trade) = ws.parse_ws_trade(&msg) {
3582 let symbol_key = trade.symbol.clone();
3583
3584 let mut trades_map = ws.my_trades.write().await;
3586 let symbol_trades =
3587 trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
3588
3589 symbol_trades.push_front(trade);
3591 if symbol_trades.len() > 1000 {
3592 symbol_trades.pop_back();
3593 }
3594 }
3595 }
3596 }
3597 } else {
3598 tokio::time::sleep(Duration::from_millis(100)).await;
3599 }
3600
3601 retries += 1;
3602 }
3603
3604 ws.filter_my_trades(symbol, since, limit).await
3606 }
3607
3608 pub async fn watch_positions(
3673 self: Arc<Self>,
3674 symbols: Option<Vec<String>>,
3675 since: Option<i64>,
3676 limit: Option<usize>,
3677 _params: Option<HashMap<String, Value>>,
3678 ) -> Result<Vec<Position>> {
3679 let ws = self.create_authenticated_ws();
3681 ws.connect().await?;
3682
3683 let mut retries = 0;
3685 const MAX_RETRIES: u32 = 100;
3686
3687 while retries < MAX_RETRIES {
3688 if let Some(msg) = ws.client.receive().await {
3689 if msg.get("result").is_some() || msg.get("id").is_some() {
3691 continue;
3692 }
3693
3694 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3696 if event_type == "ACCOUNT_UPDATE" {
3697 if let Some(account_data) = msg.get("a") {
3698 if let Some(positions_array) =
3699 account_data.get("P").and_then(|p| p.as_array())
3700 {
3701 for position_data in positions_array {
3702 if let Ok(position) = ws.parse_ws_position(position_data).await
3703 {
3704 let symbol_key = position.symbol.clone();
3705 let side_key = position
3706 .side
3707 .clone()
3708 .unwrap_or_else(|| "both".to_string());
3709
3710 let mut positions_map = ws.positions.write().await;
3712 let symbol_positions = positions_map
3713 .entry(symbol_key)
3714 .or_insert_with(HashMap::new);
3715
3716 if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
3718 symbol_positions.remove(&side_key);
3719 if symbol_positions.is_empty() {
3720 positions_map.remove(&position.symbol);
3721 }
3722 } else {
3723 symbol_positions.insert(side_key, position);
3724 }
3725 }
3726 }
3727 }
3728 }
3729 }
3730 }
3731 } else {
3732 tokio::time::sleep(Duration::from_millis(100)).await;
3733 }
3734
3735 retries += 1;
3736 }
3737
3738 let symbols_ref = symbols.as_ref().map(|v| v.as_slice());
3740 ws.filter_positions(symbols_ref, since, limit).await
3741 }
3742}
3743
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built client exposes a `listen_key` lock that can be read immediately.
    #[test]
    fn test_binance_ws_creation() {
        let client = BinanceWs::new(WS_BASE_URL.to_string());
        let listen_key_guard = client.listen_key.try_read();
        assert!(listen_key_guard.is_ok());
    }

    /// Stream names are `<symbol>@<channel>`.
    #[test]
    fn test_stream_format() {
        let symbol = "btcusdt";

        let cases = [
            (format!("{}@ticker", symbol), "btcusdt@ticker"),
            (format!("{}@trade", symbol), "btcusdt@trade"),
            (format!("{}@depth20", symbol), "btcusdt@depth20"),
            (format!("{}@kline_1m", symbol), "btcusdt@kline_1m"),
        ];

        for (actual, expected) in &cases {
            assert_eq!(actual.as_str(), *expected);
        }
    }

    /// Add, look up and remove a single subscription.
    #[tokio::test]
    async fn test_subscription_manager_basic() {
        const STREAM: &str = "btcusdt@ticker";

        let registry = SubscriptionManager::new();
        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();

        // Starts out empty.
        assert_eq!(registry.active_count(), 0);
        assert!(!registry.has_subscription(STREAM).await);

        registry
            .add_subscription(
                STREAM.to_string(),
                "BTCUSDT".to_string(),
                SubscriptionType::Ticker,
                sender.clone(),
            )
            .await
            .unwrap();

        assert_eq!(registry.active_count(), 1);
        assert!(registry.has_subscription(STREAM).await);

        // The stored entry echoes what was registered.
        let subscription = registry.get_subscription(STREAM).await;
        assert!(subscription.is_some());
        let subscription = subscription.unwrap();
        assert_eq!(subscription.stream, STREAM);
        assert_eq!(subscription.symbol, "BTCUSDT");
        assert_eq!(subscription.sub_type, SubscriptionType::Ticker);

        // Removal restores the empty state.
        registry.remove_subscription(STREAM).await.unwrap();
        assert_eq!(registry.active_count(), 0);
        assert!(!registry.has_subscription(STREAM).await);
    }

    /// Several subscriptions can coexist and be queried per symbol.
    #[tokio::test]
    async fn test_subscription_manager_multiple() {
        let registry = SubscriptionManager::new();

        let specs = vec![
            ("btcusdt@ticker", "BTCUSDT", SubscriptionType::Ticker),
            ("btcusdt@depth", "BTCUSDT", SubscriptionType::OrderBook),
            ("ethusdt@ticker", "ETHUSDT", SubscriptionType::Ticker),
        ];

        // Receivers are kept alive for the whole test, one per subscription.
        let mut receivers = Vec::new();
        for (stream, symbol, kind) in specs {
            let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
            registry
                .add_subscription(stream.to_string(), symbol.to_string(), kind, sender)
                .await
                .unwrap();
            receivers.push(receiver);
        }

        assert_eq!(registry.active_count(), 3);

        let btc = registry.get_subscriptions_by_symbol("BTCUSDT").await;
        assert_eq!(btc.len(), 2);

        let eth = registry.get_subscriptions_by_symbol("ETHUSDT").await;
        assert_eq!(eth.len(), 1);

        let everything = registry.get_all_subscriptions().await;
        assert_eq!(everything.len(), 3);

        registry.clear().await;
        assert_eq!(registry.active_count(), 0);
    }

    /// Each known stream suffix maps to its subscription type; unknown ones map to `None`.
    #[tokio::test]
    async fn test_subscription_type_from_stream() {
        let cases = vec![
            ("btcusdt@ticker", Some(SubscriptionType::Ticker)),
            ("btcusdt@depth", Some(SubscriptionType::OrderBook)),
            ("btcusdt@depth@100ms", Some(SubscriptionType::OrderBook)),
            ("btcusdt@trade", Some(SubscriptionType::Trades)),
            ("btcusdt@aggTrade", Some(SubscriptionType::Trades)),
            (
                "btcusdt@kline_1m",
                Some(SubscriptionType::Kline("1m".to_string())),
            ),
            (
                "btcusdt@kline_1h",
                Some(SubscriptionType::Kline("1h".to_string())),
            ),
            ("btcusdt@markPrice", Some(SubscriptionType::MarkPrice)),
            ("btcusdt@bookTicker", Some(SubscriptionType::BookTicker)),
            ("btcusdt@unknown", None),
        ];

        for (stream, expected) in cases {
            assert_eq!(SubscriptionType::from_stream(stream), expected);
        }
    }

    /// A message sent to a stream arrives on that stream's channel unchanged.
    #[tokio::test]
    async fn test_subscription_send_message() {
        let registry = SubscriptionManager::new();
        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();

        registry
            .add_subscription(
                "btcusdt@ticker".to_string(),
                "BTCUSDT".to_string(),
                SubscriptionType::Ticker,
                sender,
            )
            .await
            .unwrap();

        let payload = serde_json::json!({
            "e": "24hrTicker",
            "s": "BTCUSDT",
            "c": "50000"
        });

        let delivered = registry
            .send_to_stream("btcusdt@ticker", payload.clone())
            .await;
        assert!(delivered);

        let inbound = receiver.recv().await;
        assert!(inbound.is_some());
        assert_eq!(inbound.unwrap(), payload);
    }

    /// A message sent to a symbol fans out to every subscription of that symbol.
    #[tokio::test]
    async fn test_subscription_send_to_symbol() {
        let registry = SubscriptionManager::new();
        let (ticker_tx, mut ticker_rx) = tokio::sync::mpsc::unbounded_channel();
        let (depth_tx, mut depth_rx) = tokio::sync::mpsc::unbounded_channel();

        registry
            .add_subscription(
                "btcusdt@ticker".to_string(),
                "BTCUSDT".to_string(),
                SubscriptionType::Ticker,
                ticker_tx,
            )
            .await
            .unwrap();

        registry
            .add_subscription(
                "btcusdt@depth".to_string(),
                "BTCUSDT".to_string(),
                SubscriptionType::OrderBook,
                depth_tx,
            )
            .await
            .unwrap();

        let payload = serde_json::json!({
            "s": "BTCUSDT",
            "data": "test"
        });

        let delivered = registry.send_to_symbol("BTCUSDT", &payload).await;
        assert_eq!(delivered, 2);

        let on_ticker = ticker_rx.recv().await;
        assert!(on_ticker.is_some());
        assert_eq!(on_ticker.unwrap(), payload);

        let on_depth = depth_rx.recv().await;
        assert!(on_depth.is_some());
        assert_eq!(on_depth.unwrap(), payload);
    }

    /// Unified symbols lose their slash and are lower-cased for stream names.
    #[test]
    fn test_symbol_conversion() {
        let unified = "BTC/USDT";
        let exchange_symbol = unified.replace('/', "").to_lowercase();
        assert_eq!(exchange_symbol, "btcusdt");
    }

    /// Pins the default reconnect settings.
    #[test]
    fn test_reconnect_config_default() {
        let defaults = ReconnectConfig::default();

        assert!(defaults.enabled);
        assert_eq!(defaults.initial_delay_ms, 1000);
        assert_eq!(defaults.max_delay_ms, 30000);
        assert_eq!(defaults.backoff_multiplier, 2.0);
        assert_eq!(defaults.max_attempts, 0);
    }

    /// The delay doubles per attempt and saturates at the maximum.
    #[test]
    fn test_reconnect_config_calculate_delay() {
        let defaults = ReconnectConfig::default();

        let schedule = [
            (0, 1000),
            (1, 2000),
            (2, 4000),
            (3, 8000),
            (4, 16000),
            (5, 30000),
            (6, 30000),
        ];

        for &(attempt, expected_ms) in &schedule {
            assert_eq!(defaults.calculate_delay(attempt), expected_ms);
        }
    }

    /// Retry decisions under the default, bounded and disabled configurations.
    #[test]
    fn test_reconnect_config_should_retry() {
        let mut policy = ReconnectConfig::default();

        // Default configuration: every attempt is allowed.
        for &attempt in &[0, 10, 100] {
            assert!(policy.should_retry(attempt));
        }

        // Bounded: attempts below the bound pass, the rest are refused.
        policy.max_attempts = 3;
        for &attempt in &[0, 1, 2] {
            assert!(policy.should_retry(attempt));
        }
        for &attempt in &[3, 4] {
            assert!(!policy.should_retry(attempt));
        }

        // Disabled: nothing is retried.
        policy.enabled = false;
        for &attempt in &[0, 1] {
            assert!(!policy.should_retry(attempt));
        }
    }

    /// Combined-stream envelopes name their stream explicitly.
    #[test]
    fn test_message_router_extract_stream_name_combined() {
        let envelope = serde_json::json!({
            "stream": "btcusdt@ticker",
            "data": {
                "e": "24hrTicker",
                "s": "BTCUSDT"
            }
        });

        assert_eq!(
            MessageRouter::extract_stream_name(&envelope).unwrap(),
            "btcusdt@ticker"
        );
    }

    /// A raw ticker event resolves to `<symbol>@ticker`.
    #[test]
    fn test_message_router_extract_stream_name_ticker() {
        let event = serde_json::json!({
            "e": "24hrTicker",
            "s": "BTCUSDT",
            "E": 1672531200000_u64,
            "c": "16950.00",
            "h": "17100.00"
        });

        assert_eq!(
            MessageRouter::extract_stream_name(&event).unwrap(),
            "btcusdt@ticker"
        );
    }

    /// A raw depth update resolves to `<symbol>@depth`.
    #[test]
    fn test_message_router_extract_stream_name_depth() {
        let event = serde_json::json!({
            "e": "depthUpdate",
            "s": "ETHUSDT",
            "E": 1672531200000_u64,
            "U": 157,
            "u": 160
        });

        assert_eq!(
            MessageRouter::extract_stream_name(&event).unwrap(),
            "ethusdt@depth"
        );
    }

    /// A raw trade event resolves to `<symbol>@trade`.
    #[test]
    fn test_message_router_extract_stream_name_trade() {
        let event = serde_json::json!({
            "e": "trade",
            "s": "BNBUSDT",
            "E": 1672531200000_u64,
            "t": 12345
        });

        assert_eq!(
            MessageRouter::extract_stream_name(&event).unwrap(),
            "bnbusdt@trade"
        );
    }

    /// A raw kline event resolves to `<symbol>@kline_<interval>`.
    #[test]
    fn test_message_router_extract_stream_name_kline() {
        let event = serde_json::json!({
            "e": "kline",
            "s": "BTCUSDT",
            "E": 1672531200000_u64,
            "k": {
                "i": "1m",
                "t": 1672531200000_u64,
                "o": "16950.00"
            }
        });

        assert_eq!(
            MessageRouter::extract_stream_name(&event).unwrap(),
            "btcusdt@kline_1m"
        );
    }

    /// A raw mark-price update resolves to `<symbol>@markPrice`.
    #[test]
    fn test_message_router_extract_stream_name_mark_price() {
        let event = serde_json::json!({
            "e": "markPriceUpdate",
            "s": "BTCUSDT",
            "E": 1672531200000_u64,
            "p": "16950.00"
        });

        assert_eq!(
            MessageRouter::extract_stream_name(&event).unwrap(),
            "btcusdt@markPrice"
        );
    }

    /// A raw book-ticker event resolves to `<symbol>@bookTicker`.
    #[test]
    fn test_message_router_extract_stream_name_book_ticker() {
        let event = serde_json::json!({
            "e": "bookTicker",
            "s": "ETHUSDT",
            "E": 1672531200000_u64,
            "b": "1200.00",
            "a": "1200.50"
        });

        assert_eq!(
            MessageRouter::extract_stream_name(&event).unwrap(),
            "ethusdt@bookTicker"
        );
    }

    /// Subscription acknowledgements carry no stream name.
    #[test]
    fn test_message_router_extract_stream_name_subscription_response() {
        let ack = serde_json::json!({
            "result": null,
            "id": 1
        });

        assert!(MessageRouter::extract_stream_name(&ack).is_err());
    }

    /// Error responses carry no stream name.
    #[test]
    fn test_message_router_extract_stream_name_error_response() {
        let failure = serde_json::json!({
            "error": {
                "code": -1,
                "msg": "Invalid request"
            },
            "id": 1
        });

        assert!(MessageRouter::extract_stream_name(&failure).is_err());
    }

    /// Unrecognised payloads are rejected.
    #[test]
    fn test_message_router_extract_stream_name_invalid() {
        let garbage = serde_json::json!({
            "unknown": "data"
        });

        assert!(MessageRouter::extract_stream_name(&garbage).is_err());
    }

    /// A new router is disconnected and remembers its URL.
    #[tokio::test]
    async fn test_message_router_creation() {
        let endpoint = "wss://stream.binance.com:9443/ws".to_string();
        let registry = Arc::new(SubscriptionManager::new());

        let router = MessageRouter::new(endpoint.clone(), registry);

        assert!(!router.is_connected());
        assert_eq!(router.ws_url, endpoint);
    }

    /// The reconnect configuration can be read and then replaced.
    #[tokio::test]
    async fn test_message_router_reconnect_config() {
        let endpoint = "wss://stream.binance.com:9443/ws".to_string();
        let registry = Arc::new(SubscriptionManager::new());

        let router = MessageRouter::new(endpoint, registry);

        // Initial configuration.
        let initial = router.get_reconnect_config().await;
        assert!(initial.enabled);
        assert_eq!(initial.initial_delay_ms, 1000);

        let replacement = ReconnectConfig {
            enabled: false,
            initial_delay_ms: 2000,
            max_delay_ms: 60000,
            backoff_multiplier: 1.5,
            max_attempts: 5,
        };

        router.set_reconnect_config(replacement.clone()).await;

        // Every field reflects the replacement.
        let current = router.get_reconnect_config().await;
        assert!(!current.enabled);
        assert_eq!(current.initial_delay_ms, 2000);
        assert_eq!(current.max_delay_ms, 60000);
        assert_eq!(current.backoff_multiplier, 1.5);
        assert_eq!(current.max_attempts, 5);
    }
}