1use crate::binance::Binance;
6use crate::binance::parser;
7use ccxt_core::error::{Error, Result};
8use ccxt_core::types::financial::{Amount, Cost, Price};
9use ccxt_core::types::{
10 Balance, BidAsk, MarkPrice, MarketType, OHLCV, Order, OrderBook, Position, Ticker, Trade,
11};
12use ccxt_core::ws_client::{WsClient, WsConfig};
13use serde_json::Value;
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio_tungstenite::tungstenite::protocol::Message;
20
/// Base URL of the Binance production WebSocket endpoint.
#[allow(dead_code)]
const WS_BASE_URL: &str = "wss://stream.binance.com:9443/ws";
/// Base URL of the Binance testnet WebSocket endpoint.
#[allow(dead_code)]
const WS_TESTNET_URL: &str = "wss://testnet.binance.vision/ws";

/// Default pause between two automatic listen-key refresh calls (30 minutes).
const LISTEN_KEY_REFRESH_INTERVAL: Duration = Duration::from_secs(30 * 60);
29
30pub struct ListenKeyManager {
38 binance: Arc<Binance>,
40 listen_key: Arc<RwLock<Option<String>>>,
42 created_at: Arc<RwLock<Option<Instant>>>,
44 refresh_interval: Duration,
46 refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
48}
49
50impl ListenKeyManager {
51 pub fn new(binance: Arc<Binance>) -> Self {
59 Self {
60 binance,
61 listen_key: Arc::new(RwLock::new(None)),
62 created_at: Arc::new(RwLock::new(None)),
63 refresh_interval: LISTEN_KEY_REFRESH_INTERVAL,
64 refresh_task: Arc::new(Mutex::new(None)),
65 }
66 }
67
68 pub async fn get_or_create(&self) -> Result<String> {
79 let key_opt = self.listen_key.read().await.clone();
81
82 if let Some(key) = key_opt {
83 let created = self.created_at.read().await;
85 if let Some(created_time) = *created {
86 let elapsed = created_time.elapsed();
87 if elapsed > Duration::from_secs(50 * 60) {
89 drop(created);
90 return self.create_new().await;
91 }
92 }
93 return Ok(key);
94 }
95
96 self.create_new().await
98 }
99
100 async fn create_new(&self) -> Result<String> {
105 let key = self.binance.create_listen_key().await?;
106
107 *self.listen_key.write().await = Some(key.clone());
109 *self.created_at.write().await = Some(Instant::now());
110
111 Ok(key)
112 }
113
114 pub async fn refresh(&self) -> Result<()> {
121 let key_opt = self.listen_key.read().await.clone();
122
123 if let Some(key) = key_opt {
124 self.binance.refresh_listen_key(&key).await?;
125 *self.created_at.write().await = Some(Instant::now());
127 Ok(())
128 } else {
129 Err(Error::invalid_request("No listen key to refresh"))
130 }
131 }
132
133 pub async fn start_auto_refresh(&self) {
137 self.stop_auto_refresh().await;
139
140 let listen_key = self.listen_key.clone();
141 let created_at = self.created_at.clone();
142 let binance = self.binance.clone();
143 let interval = self.refresh_interval;
144
145 let handle = tokio::spawn(async move {
146 loop {
147 tokio::time::sleep(interval).await;
148
149 let key_opt = listen_key.read().await.clone();
151 if let Some(key) = key_opt {
152 match binance.refresh_listen_key(&key).await {
154 Ok(()) => {
155 *created_at.write().await = Some(Instant::now());
156 }
158 Err(_e) => {
159 *listen_key.write().await = None;
161 *created_at.write().await = None;
162 break;
163 }
164 }
165 } else {
166 break;
168 }
169 }
170 });
171
172 *self.refresh_task.lock().await = Some(handle);
173 }
174
175 pub async fn stop_auto_refresh(&self) {
177 let mut task_opt = self.refresh_task.lock().await;
178 if let Some(handle) = task_opt.take() {
179 handle.abort();
180 }
181 }
182
183 pub async fn delete(&self) -> Result<()> {
190 self.stop_auto_refresh().await;
192
193 let key_opt = self.listen_key.read().await.clone();
194
195 if let Some(key) = key_opt {
196 self.binance.delete_listen_key(&key).await?;
197
198 *self.listen_key.write().await = None;
200 *self.created_at.write().await = None;
201
202 Ok(())
203 } else {
204 Ok(()) }
206 }
207
208 pub async fn get_current(&self) -> Option<String> {
210 self.listen_key.read().await.clone()
211 }
212
213 pub async fn is_valid(&self) -> bool {
215 let key_opt = self.listen_key.read().await;
216 if key_opt.is_none() {
217 return false;
218 }
219
220 let created = self.created_at.read().await;
221 if let Some(created_time) = *created {
222 created_time.elapsed() < Duration::from_secs(55 * 60)
224 } else {
225 false
226 }
227 }
228}
229
230impl Drop for ListenKeyManager {
231 fn drop(&mut self) {
232 }
235}
/// Kind of data a WebSocket subscription delivers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Ticker stream.
    Ticker,
    /// Order book (depth) stream.
    OrderBook,
    /// Trade or aggregated-trade stream.
    Trades,
    /// Kline stream; the payload is the text following `@kline_` in the stream name.
    Kline(String),
    /// Balance updates.
    Balance,
    /// Order updates.
    Orders,
    /// Position updates.
    Positions,
    /// Own-trade updates.
    MyTrades,
    /// Mark price stream.
    MarkPrice,
    /// Book ticker stream.
    BookTicker,
}

impl SubscriptionType {
    /// Derives the subscription type from a stream name.
    ///
    /// Markers are tested in a fixed order (`@ticker`, `@depth`, `@trade` /
    /// `@aggTrade`, `@kline_`, `@markPrice`, `@bookTicker`); the first one
    /// contained in `stream` wins. Returns `None` when no marker matches or
    /// when `@kline_` occurs more than once.
    pub fn from_stream(stream: &str) -> Option<Self> {
        let has = |marker: &str| stream.contains(marker);

        if has("@ticker") {
            return Some(Self::Ticker);
        }
        if has("@depth") {
            return Some(Self::OrderBook);
        }
        if has("@trade") || has("@aggTrade") {
            return Some(Self::Trades);
        }
        if has("@kline_") {
            // Exactly one separator must be present: a prefix, the interval, nothing more.
            let mut pieces = stream.split("@kline_");
            return match (pieces.next(), pieces.next(), pieces.next()) {
                (Some(_), Some(interval), None) => Some(Self::Kline(interval.to_string())),
                _ => None,
            };
        }
        if has("@markPrice") {
            return Some(Self::MarkPrice);
        }
        if has("@bookTicker") {
            return Some(Self::BookTicker);
        }

        None
    }
}
293
294#[derive(Clone)]
296pub struct Subscription {
297 pub stream: String,
299 pub symbol: String,
301 pub sub_type: SubscriptionType,
303 pub subscribed_at: Instant,
305 pub sender: tokio::sync::mpsc::UnboundedSender<Value>,
307}
308
309impl Subscription {
310 pub fn new(
312 stream: String,
313 symbol: String,
314 sub_type: SubscriptionType,
315 sender: tokio::sync::mpsc::UnboundedSender<Value>,
316 ) -> Self {
317 Self {
318 stream,
319 symbol,
320 sub_type,
321 subscribed_at: Instant::now(),
322 sender,
323 }
324 }
325
326 pub fn send(&self, message: Value) -> bool {
334 self.sender.send(message).is_ok()
335 }
336}
337
338pub struct SubscriptionManager {
345 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
347 symbol_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
349 active_count: Arc<std::sync::atomic::AtomicUsize>,
351}
352
353impl SubscriptionManager {
354 pub fn new() -> Self {
356 Self {
357 subscriptions: Arc::new(RwLock::new(HashMap::new())),
358 symbol_index: Arc::new(RwLock::new(HashMap::new())),
359 active_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
360 }
361 }
362
363 pub async fn add_subscription(
374 &self,
375 stream: String,
376 symbol: String,
377 sub_type: SubscriptionType,
378 sender: tokio::sync::mpsc::UnboundedSender<Value>,
379 ) -> Result<()> {
380 let subscription = Subscription::new(stream.clone(), symbol.clone(), sub_type, sender);
381
382 let mut subs = self.subscriptions.write().await;
384 subs.insert(stream.clone(), subscription);
385
386 let mut index = self.symbol_index.write().await;
388 index.entry(symbol).or_insert_with(Vec::new).push(stream);
389
390 self.active_count
392 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
393
394 Ok(())
395 }
396
397 pub async fn remove_subscription(&self, stream: &str) -> Result<()> {
405 let mut subs = self.subscriptions.write().await;
406
407 if let Some(subscription) = subs.remove(stream) {
408 let mut index = self.symbol_index.write().await;
410 if let Some(streams) = index.get_mut(&subscription.symbol) {
411 streams.retain(|s| s != stream);
412 if streams.is_empty() {
414 index.remove(&subscription.symbol);
415 }
416 }
417
418 self.active_count
420 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
421 }
422
423 Ok(())
424 }
425
426 pub async fn get_subscription(&self, stream: &str) -> Option<Subscription> {
434 let subs = self.subscriptions.read().await;
435 subs.get(stream).cloned()
436 }
437
438 pub async fn has_subscription(&self, stream: &str) -> bool {
446 let subs = self.subscriptions.read().await;
447 subs.contains_key(stream)
448 }
449
450 pub async fn get_all_subscriptions(&self) -> Vec<Subscription> {
452 let subs = self.subscriptions.read().await;
453 subs.values().cloned().collect()
454 }
455
456 pub async fn get_subscriptions_by_symbol(&self, symbol: &str) -> Vec<Subscription> {
461 let index = self.symbol_index.read().await;
462 let subs = self.subscriptions.read().await;
463
464 if let Some(streams) = index.get(symbol) {
465 streams
466 .iter()
467 .filter_map(|stream| subs.get(stream).cloned())
468 .collect()
469 } else {
470 Vec::new()
471 }
472 }
473
474 pub fn active_count(&self) -> usize {
476 self.active_count.load(std::sync::atomic::Ordering::SeqCst)
477 }
478
479 pub async fn clear(&self) {
481 let mut subs = self.subscriptions.write().await;
482 let mut index = self.symbol_index.write().await;
483
484 subs.clear();
485 index.clear();
486 self.active_count
487 .store(0, std::sync::atomic::Ordering::SeqCst);
488 }
489
490 pub async fn send_to_stream(&self, stream: &str, message: Value) -> bool {
499 let subs = self.subscriptions.read().await;
500 if let Some(subscription) = subs.get(stream) {
501 subscription.send(message)
502 } else {
503 false
504 }
505 }
506
507 pub async fn send_to_symbol(&self, symbol: &str, message: &Value) -> usize {
516 let index = self.symbol_index.read().await;
517 let subs = self.subscriptions.read().await;
518
519 let mut sent_count = 0;
520
521 if let Some(streams) = index.get(symbol) {
522 for stream in streams {
523 if let Some(subscription) = subs.get(stream) {
524 if subscription.send(message.clone()) {
525 sent_count += 1;
526 }
527 }
528 }
529 }
530
531 sent_count
532 }
533}
/// Exponential-backoff settings for WebSocket reconnection.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether reconnecting is allowed at all.
    pub enabled: bool,

    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,

    /// Upper bound for the computed delay, in milliseconds.
    pub max_delay_ms: u64,

    /// Factor the delay is multiplied by for every further attempt.
    pub backoff_multiplier: f64,

    /// Maximum number of attempts; `0` means no limit.
    pub max_attempts: usize,
}

impl Default for ReconnectConfig {
    /// Enabled, 1 s initial delay, 30 s cap, doubling per attempt, unlimited attempts.
    fn default() -> Self {
        Self {
            enabled: true,
            initial_delay_ms: 1000,
            max_delay_ms: 30000,
            backoff_multiplier: 2.0,
            max_attempts: 0,
        }
    }
}

impl ReconnectConfig {
    /// Returns the delay in milliseconds before retry number `attempt`
    /// (zero-based): `initial_delay_ms * backoff_multiplier^attempt`, capped
    /// at `max_delay_ms`.
    pub fn calculate_delay(&self, attempt: usize) -> u64 {
        // A plain `as i32` cast wraps for large attempts (on 64-bit targets
        // `usize::MAX` becomes -1), which would shrink the delay instead of
        // capping it. Saturate the exponent instead.
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let delay = (self.initial_delay_ms as f64) * self.backoff_multiplier.powi(exponent);
        delay.min(self.max_delay_ms as f64) as u64
    }

    /// Returns `true` when reconnecting is enabled and `attempt` is still
    /// below `max_attempts` (or no limit is configured).
    pub fn should_retry(&self, attempt: usize) -> bool {
        self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
    }
}
593
594pub struct MessageRouter {
602 ws_client: Arc<RwLock<Option<WsClient>>>,
604
605 subscription_manager: Arc<SubscriptionManager>,
607
608 router_task: Arc<Mutex<Option<JoinHandle<()>>>>,
610
611 is_connected: Arc<std::sync::atomic::AtomicBool>,
613
614 reconnect_config: Arc<RwLock<ReconnectConfig>>,
616
617 ws_url: String,
619
620 request_id: Arc<std::sync::atomic::AtomicU64>,
622}
623
624impl MessageRouter {
625 pub fn new(ws_url: String, subscription_manager: Arc<SubscriptionManager>) -> Self {
634 Self {
635 ws_client: Arc::new(RwLock::new(None)),
636 subscription_manager,
637 router_task: Arc::new(Mutex::new(None)),
638 is_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
639 reconnect_config: Arc::new(RwLock::new(ReconnectConfig::default())),
640 ws_url,
641 request_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
642 }
643 }
644
645 pub async fn start(&self) -> Result<()> {
652 if self.is_connected() {
654 self.stop().await?;
655 }
656
657 let config = WsConfig {
659 url: self.ws_url.clone(),
660 ..Default::default()
661 };
662 let client = WsClient::new(config);
663 client.connect().await?;
664
665 *self.ws_client.write().await = Some(client);
667
668 self.is_connected
670 .store(true, std::sync::atomic::Ordering::SeqCst);
671
672 let ws_client = self.ws_client.clone();
674 let subscription_manager = self.subscription_manager.clone();
675 let is_connected = self.is_connected.clone();
676 let reconnect_config = self.reconnect_config.clone();
677 let ws_url = self.ws_url.clone();
678
679 let handle = tokio::spawn(async move {
680 Self::message_loop(
681 ws_client,
682 subscription_manager,
683 is_connected,
684 reconnect_config,
685 ws_url,
686 )
687 .await;
688 });
689
690 *self.router_task.lock().await = Some(handle);
691
692 Ok(())
693 }
694
695 pub async fn stop(&self) -> Result<()> {
702 self.is_connected
704 .store(false, std::sync::atomic::Ordering::SeqCst);
705
706 let mut task_opt = self.router_task.lock().await;
708 if let Some(handle) = task_opt.take() {
709 handle.abort();
710 }
711
712 let mut client_opt = self.ws_client.write().await;
714 if let Some(client) = client_opt.take() {
715 let _ = client.disconnect().await;
716 }
717
718 Ok(())
719 }
720
721 pub async fn restart(&self) -> Result<()> {
728 self.stop().await?;
729 tokio::time::sleep(Duration::from_millis(100)).await;
730 self.start().await
731 }
732
733 pub fn is_connected(&self) -> bool {
735 self.is_connected.load(std::sync::atomic::Ordering::SeqCst)
736 }
737
738 pub async fn set_reconnect_config(&self, config: ReconnectConfig) {
743 *self.reconnect_config.write().await = config;
744 }
745
746 pub async fn get_reconnect_config(&self) -> ReconnectConfig {
748 self.reconnect_config.read().await.clone()
749 }
750
751 pub async fn subscribe(&self, streams: Vec<String>) -> Result<()> {
758 if streams.is_empty() {
759 return Ok(());
760 }
761
762 let client_opt = self.ws_client.read().await;
763 let client = client_opt
764 .as_ref()
765 .ok_or_else(|| Error::network("WebSocket not connected"))?;
766
767 let id = self
769 .request_id
770 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
771
772 #[allow(clippy::disallowed_methods)]
774 let request = serde_json::json!({
775 "method": "SUBSCRIBE",
776 "params": streams,
777 "id": id
778 });
779
780 client
782 .send(Message::Text(request.to_string().into()))
783 .await?;
784
785 Ok(())
786 }
787
788 pub async fn unsubscribe(&self, streams: Vec<String>) -> Result<()> {
795 if streams.is_empty() {
796 return Ok(());
797 }
798
799 let client_opt = self.ws_client.read().await;
800 let client = client_opt
801 .as_ref()
802 .ok_or_else(|| Error::network("WebSocket not connected"))?;
803
804 let id = self
806 .request_id
807 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
808
809 #[allow(clippy::disallowed_methods)]
811 let request = serde_json::json!({
812 "method": "UNSUBSCRIBE",
813 "params": streams,
814 "id": id
815 });
816
817 client
819 .send(Message::Text(request.to_string().into()))
820 .await?;
821
822 Ok(())
823 }
824
825 async fn message_loop(
829 ws_client: Arc<RwLock<Option<WsClient>>>,
830 subscription_manager: Arc<SubscriptionManager>,
831 is_connected: Arc<std::sync::atomic::AtomicBool>,
832 reconnect_config: Arc<RwLock<ReconnectConfig>>,
833 ws_url: String,
834 ) {
835 let mut reconnect_attempt = 0;
836
837 loop {
838 if !is_connected.load(std::sync::atomic::Ordering::SeqCst) {
840 break;
841 }
842
843 let has_client = ws_client.read().await.is_some();
845
846 if !has_client {
847 let config = reconnect_config.read().await;
849 if config.should_retry(reconnect_attempt) {
850 let delay = config.calculate_delay(reconnect_attempt);
851 drop(config);
852
853 tokio::time::sleep(Duration::from_millis(delay)).await;
854
855 if let Ok(()) = Self::reconnect(&ws_url, ws_client.clone()).await {
856 reconnect_attempt = 0; continue;
858 }
859 reconnect_attempt += 1;
860 continue;
861 }
862 is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
864 break;
865 }
866
867 let message_opt = {
869 let guard = ws_client.read().await;
870 if let Some(client) = guard.as_ref() {
871 client.receive().await
872 } else {
873 None
874 }
875 };
876
877 if let Some(value) = message_opt {
878 if let Err(_e) = Self::handle_message(value, subscription_manager.clone()).await {
880 continue;
881 }
882
883 reconnect_attempt = 0;
885 } else {
886 let config = reconnect_config.read().await;
888 if config.should_retry(reconnect_attempt) {
889 let delay = config.calculate_delay(reconnect_attempt);
890 drop(config);
891
892 tokio::time::sleep(Duration::from_millis(delay)).await;
893
894 if let Ok(()) = Self::reconnect(&ws_url, ws_client.clone()).await {
895 reconnect_attempt = 0;
896 continue;
897 }
898 reconnect_attempt += 1;
899 continue;
900 }
901 is_connected.store(false, std::sync::atomic::Ordering::SeqCst);
903 break;
904 }
905 }
906 }
907
908 async fn handle_message(
912 message: Value,
913 subscription_manager: Arc<SubscriptionManager>,
914 ) -> Result<()> {
915 let stream_name = Self::extract_stream_name(&message)?;
917
918 let sent = subscription_manager
920 .send_to_stream(&stream_name, message)
921 .await;
922
923 if sent {
924 Ok(())
925 } else {
926 Err(Error::generic("No subscribers for stream"))
927 }
928 }
929
930 fn extract_stream_name(message: &Value) -> Result<String> {
942 if let Some(stream) = message.get("stream").and_then(|s| s.as_str()) {
944 return Ok(stream.to_string());
945 }
946
947 if let Some(event_type) = message.get("e").and_then(|e| e.as_str()) {
949 if let Some(symbol) = message.get("s").and_then(|s| s.as_str()) {
950 let stream = match event_type {
952 "24hrTicker" => format!("{}@ticker", symbol.to_lowercase()),
953 "depthUpdate" => format!("{}@depth", symbol.to_lowercase()),
954 "aggTrade" => format!("{}@aggTrade", symbol.to_lowercase()),
955 "trade" => format!("{}@trade", symbol.to_lowercase()),
956 "kline" => {
957 if let Some(kline) = message.get("k") {
959 if let Some(interval) = kline.get("i").and_then(|i| i.as_str()) {
960 format!("{}@kline_{}", symbol.to_lowercase(), interval)
961 } else {
962 return Err(Error::generic("Missing kline interval"));
963 }
964 } else {
965 return Err(Error::generic("Missing kline data"));
966 }
967 }
968 "markPriceUpdate" => format!("{}@markPrice", symbol.to_lowercase()),
969 "bookTicker" => format!("{}@bookTicker", symbol.to_lowercase()),
970 _ => {
971 return Err(Error::generic(format!(
972 "Unknown event type: {}",
973 event_type
974 )));
975 }
976 };
977 return Ok(stream);
978 }
979 }
980
981 if message.get("result").is_some() || message.get("error").is_some() {
983 return Err(Error::generic("Subscription response, skip routing"));
984 }
985
986 Err(Error::generic("Cannot extract stream name from message"))
987 }
988
989 async fn reconnect(ws_url: &str, ws_client: Arc<RwLock<Option<WsClient>>>) -> Result<()> {
993 {
995 let mut client_opt = ws_client.write().await;
996 if let Some(client) = client_opt.take() {
997 let _ = client.disconnect().await;
998 }
999 }
1000
1001 let config = WsConfig {
1003 url: ws_url.to_string(),
1004 ..Default::default()
1005 };
1006 let new_client = WsClient::new(config);
1007
1008 new_client.connect().await?;
1010
1011 *ws_client.write().await = Some(new_client);
1013
1014 Ok(())
1015 }
1016}
1017
1018impl Drop for MessageRouter {
1019 fn drop(&mut self) {
1020 }
1023}
1024
1025impl Default for SubscriptionManager {
1026 fn default() -> Self {
1027 Self::new()
1028 }
1029}
1030
/// Binance WebSocket client holding the connection plus in-memory caches for
/// market and account data.
pub struct BinanceWs {
    /// Underlying WebSocket client used for connecting, subscribing and receiving.
    client: Arc<WsClient>,
    /// Listen key recorded by `connect_user_stream` and cleared by `close_user_stream`.
    listen_key: Arc<RwLock<Option<String>>>,
    /// Listen-key manager; only present when built with `new_with_auth`.
    listen_key_manager: Option<Arc<ListenKeyManager>>,
    /// Auto-reconnect coordinator, created by `connect` and stopped by `disconnect`.
    auto_reconnect_coordinator: Arc<Mutex<Option<ccxt_core::ws_client::AutoReconnectCoordinator>>>,
    /// Tickers cached by the ticker watch helpers, keyed by ticker symbol.
    tickers: Arc<Mutex<HashMap<String, Ticker>>>,
    /// Cache of best bid/ask entries (presumably keyed by symbol — TODO confirm).
    bids_asks: Arc<Mutex<HashMap<String, BidAsk>>>,
    /// Cache of mark prices (presumably keyed by symbol — TODO confirm).
    #[allow(dead_code)]
    mark_prices: Arc<Mutex<HashMap<String, MarkPrice>>>,
    /// Order books keyed by symbol, maintained from snapshots and depth deltas.
    orderbooks: Arc<Mutex<HashMap<String, OrderBook>>>,
    /// Cache of public trades (presumably keyed by symbol — TODO confirm).
    trades: Arc<Mutex<HashMap<String, VecDeque<Trade>>>>,
    /// Cache of OHLCV candles (key format not visible here — TODO confirm).
    ohlcvs: Arc<Mutex<HashMap<String, VecDeque<OHLCV>>>>,
    /// Cache of account balances (key meaning not visible here — TODO confirm).
    balances: Arc<RwLock<HashMap<String, Balance>>>,
    /// Cache of orders in a two-level map (key meaning not visible here — TODO confirm).
    orders: Arc<RwLock<HashMap<String, HashMap<String, Order>>>>,
    /// Cache of the account's own trades (presumably keyed by symbol — TODO confirm).
    my_trades: Arc<RwLock<HashMap<String, VecDeque<Trade>>>>,
    /// Cache of positions in a two-level map (key meaning not visible here — TODO confirm).
    positions: Arc<RwLock<HashMap<String, HashMap<String, Position>>>>,
}
1061
1062impl BinanceWs {
1063 pub fn new(url: String) -> Self {
1071 let config = WsConfig {
1072 url,
1073 connect_timeout: 10000,
1074 ping_interval: 180000, reconnect_interval: 5000,
1076 max_reconnect_attempts: 5,
1077 auto_reconnect: true,
1078 enable_compression: false,
1079 pong_timeout: 90000, ..Default::default()
1081 };
1082
1083 Self {
1084 client: Arc::new(WsClient::new(config)),
1085 listen_key: Arc::new(RwLock::new(None)),
1086 listen_key_manager: None,
1087 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1088 tickers: Arc::new(Mutex::new(HashMap::new())),
1089 bids_asks: Arc::new(Mutex::new(HashMap::new())),
1090 mark_prices: Arc::new(Mutex::new(HashMap::new())),
1091 orderbooks: Arc::new(Mutex::new(HashMap::new())),
1092 trades: Arc::new(Mutex::new(HashMap::new())),
1093 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1094 balances: Arc::new(RwLock::new(HashMap::new())),
1095 orders: Arc::new(RwLock::new(HashMap::new())),
1096 my_trades: Arc::new(RwLock::new(HashMap::new())),
1097 positions: Arc::new(RwLock::new(HashMap::new())),
1098 }
1099 }
1100
1101 pub fn new_with_auth(url: String, binance: Arc<Binance>) -> Self {
1110 let config = WsConfig {
1111 url,
1112 connect_timeout: 10000,
1113 ping_interval: 180000, reconnect_interval: 5000,
1115 max_reconnect_attempts: 5,
1116 auto_reconnect: true,
1117 enable_compression: false,
1118 pong_timeout: 90000, ..Default::default()
1120 };
1121
1122 Self {
1123 client: Arc::new(WsClient::new(config)),
1124 listen_key: Arc::new(RwLock::new(None)),
1125 listen_key_manager: Some(Arc::new(ListenKeyManager::new(binance))),
1126 auto_reconnect_coordinator: Arc::new(Mutex::new(None)),
1127 tickers: Arc::new(Mutex::new(HashMap::new())),
1128 bids_asks: Arc::new(Mutex::new(HashMap::new())),
1129 mark_prices: Arc::new(Mutex::new(HashMap::new())),
1130 orderbooks: Arc::new(Mutex::new(HashMap::new())),
1131 trades: Arc::new(Mutex::new(HashMap::new())),
1132 ohlcvs: Arc::new(Mutex::new(HashMap::new())),
1133 balances: Arc::new(RwLock::new(HashMap::new())),
1134 orders: Arc::new(RwLock::new(HashMap::new())),
1135 my_trades: Arc::new(RwLock::new(HashMap::new())),
1136 positions: Arc::new(RwLock::new(HashMap::new())),
1137 }
1138 }
1139
1140 pub async fn connect(&self) -> Result<()> {
1142 self.client.connect().await?;
1144
1145 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1147 if coordinator_guard.is_none() {
1148 let coordinator = self.client.clone().create_auto_reconnect_coordinator();
1149 coordinator.start().await;
1150 *coordinator_guard = Some(coordinator);
1151 tracing::info!("Auto-reconnect coordinator started");
1152 }
1153
1154 Ok(())
1155 }
1156
1157 pub async fn disconnect(&self) -> Result<()> {
1159 let mut coordinator_guard = self.auto_reconnect_coordinator.lock().await;
1161 if let Some(coordinator) = coordinator_guard.take() {
1162 coordinator.stop().await;
1163 tracing::info!("Auto-reconnect coordinator stopped");
1164 }
1165
1166 if let Some(manager) = &self.listen_key_manager {
1168 manager.stop_auto_refresh().await;
1169 }
1170
1171 self.client.disconnect().await
1172 }
1173
1174 pub async fn connect_user_stream(&self) -> Result<()> {
1199 let manager = self.listen_key_manager.as_ref()
1200 .ok_or_else(|| Error::invalid_request(
1201 "Listen key manager not available. Use new_with_auth() to create authenticated WebSocket"
1202 ))?;
1203
1204 let listen_key = manager.get_or_create().await?;
1206
1207 let user_stream_url = format!("wss://stream.binance.com:9443/ws/{}", listen_key);
1209
1210 let config = WsConfig {
1212 url: user_stream_url,
1213 connect_timeout: 10000,
1214 ping_interval: 180000,
1215 reconnect_interval: 5000,
1216 max_reconnect_attempts: 5,
1217 auto_reconnect: true,
1218 enable_compression: false,
1219 pong_timeout: 90000, ..Default::default()
1221 };
1222
1223 let _new_client = Arc::new(WsClient::new(config));
1225 self.client.connect().await?;
1229
1230 manager.start_auto_refresh().await;
1232
1233 *self.listen_key.write().await = Some(listen_key);
1235
1236 Ok(())
1237 }
1238
1239 pub async fn close_user_stream(&self) -> Result<()> {
1246 if let Some(manager) = &self.listen_key_manager {
1247 manager.delete().await?;
1248 }
1249 *self.listen_key.write().await = None;
1250 Ok(())
1251 }
1252
1253 pub async fn get_listen_key(&self) -> Option<String> {
1255 if let Some(manager) = &self.listen_key_manager {
1256 manager.get_current().await
1257 } else {
1258 self.listen_key.read().await.clone()
1259 }
1260 }
1261
1262 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
1270 let stream = format!("{}@ticker", symbol.to_lowercase());
1271 self.client
1272 .subscribe(stream, Some(symbol.to_string()), None)
1273 .await
1274 }
1275
1276 pub async fn subscribe_all_tickers(&self) -> Result<()> {
1278 self.client
1279 .subscribe("!ticker@arr".to_string(), None, None)
1280 .await
1281 }
1282
1283 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
1291 let stream = format!("{}@trade", symbol.to_lowercase());
1292 self.client
1293 .subscribe(stream, Some(symbol.to_string()), None)
1294 .await
1295 }
1296
1297 pub async fn subscribe_agg_trades(&self, symbol: &str) -> Result<()> {
1305 let stream = format!("{}@aggTrade", symbol.to_lowercase());
1306 self.client
1307 .subscribe(stream, Some(symbol.to_string()), None)
1308 .await
1309 }
1310
1311 pub async fn subscribe_orderbook(
1321 &self,
1322 symbol: &str,
1323 levels: u32,
1324 update_speed: &str,
1325 ) -> Result<()> {
1326 let stream = if update_speed == "100ms" {
1327 format!("{}@depth{}@100ms", symbol.to_lowercase(), levels)
1328 } else {
1329 format!("{}@depth{}", symbol.to_lowercase(), levels)
1330 };
1331
1332 self.client
1333 .subscribe(stream, Some(symbol.to_string()), None)
1334 .await
1335 }
1336
1337 pub async fn subscribe_orderbook_diff(
1346 &self,
1347 symbol: &str,
1348 update_speed: Option<&str>,
1349 ) -> Result<()> {
1350 let stream = if let Some(speed) = update_speed {
1351 if speed == "100ms" {
1352 format!("{}@depth@100ms", symbol.to_lowercase())
1353 } else {
1354 format!("{}@depth", symbol.to_lowercase())
1355 }
1356 } else {
1357 format!("{}@depth", symbol.to_lowercase())
1358 };
1359
1360 self.client
1361 .subscribe(stream, Some(symbol.to_string()), None)
1362 .await
1363 }
1364
1365 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
1374 let stream = format!("{}@kline_{}", symbol.to_lowercase(), interval);
1375 self.client
1376 .subscribe(stream, Some(symbol.to_string()), None)
1377 .await
1378 }
1379
1380 pub async fn subscribe_mini_ticker(&self, symbol: &str) -> Result<()> {
1388 let stream = format!("{}@miniTicker", symbol.to_lowercase());
1389 self.client
1390 .subscribe(stream, Some(symbol.to_string()), None)
1391 .await
1392 }
1393
1394 pub async fn subscribe_all_mini_tickers(&self) -> Result<()> {
1396 self.client
1397 .subscribe("!miniTicker@arr".to_string(), None, None)
1398 .await
1399 }
1400
1401 pub async fn unsubscribe(&self, stream: String) -> Result<()> {
1409 self.client.unsubscribe(stream, None).await
1410 }
1411
1412 pub async fn receive(&self) -> Option<Value> {
1417 self.client.receive().await
1418 }
1419
1420 pub fn is_connected(&self) -> bool {
1422 self.client.is_connected()
1423 }
1424
1425 async fn watch_ticker_internal(&self, symbol: &str, channel_name: &str) -> Result<Ticker> {
1434 let stream = format!("{}@{}", symbol.to_lowercase(), channel_name);
1435
1436 self.client
1438 .subscribe(stream.clone(), Some(symbol.to_string()), None)
1439 .await?;
1440
1441 loop {
1443 if let Some(message) = self.client.receive().await {
1444 if message.get("result").is_some() {
1446 continue;
1447 }
1448
1449 if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1451 let mut tickers = self.tickers.lock().await;
1453 tickers.insert(ticker.symbol.clone(), ticker.clone());
1454
1455 return Ok(ticker);
1456 }
1457 }
1458 }
1459 }
1460
1461 async fn watch_tickers_internal(
1470 &self,
1471 symbols: Option<Vec<String>>,
1472 channel_name: &str,
1473 ) -> Result<HashMap<String, Ticker>> {
1474 let streams: Vec<String> = if let Some(syms) = symbols.as_ref() {
1475 syms.iter()
1477 .map(|s| format!("{}@{}", s.to_lowercase(), channel_name))
1478 .collect()
1479 } else {
1480 vec![format!("!{}@arr", channel_name)]
1482 };
1483
1484 for stream in &streams {
1486 self.client.subscribe(stream.clone(), None, None).await?;
1487 }
1488
1489 let mut result = HashMap::new();
1491
1492 loop {
1493 if let Some(message) = self.client.receive().await {
1494 if message.get("result").is_some() {
1496 continue;
1497 }
1498
1499 if let Some(arr) = message.as_array() {
1501 for item in arr {
1502 if let Ok(ticker) = parser::parse_ws_ticker(item, None) {
1503 let symbol = ticker.symbol.clone();
1504
1505 if let Some(syms) = &symbols {
1507 if syms.contains(&symbol.to_lowercase()) {
1508 result.insert(symbol.clone(), ticker.clone());
1509 }
1510 } else {
1511 result.insert(symbol.clone(), ticker.clone());
1512 }
1513
1514 let mut tickers = self.tickers.lock().await;
1516 tickers.insert(symbol, ticker);
1517 }
1518 }
1519
1520 if let Some(syms) = &symbols {
1522 if result.len() == syms.len() {
1523 return Ok(result);
1524 }
1525 } else {
1526 return Ok(result);
1527 }
1528 } else if let Ok(ticker) = parser::parse_ws_ticker(&message, None) {
1529 let symbol = ticker.symbol.clone();
1531 result.insert(symbol.clone(), ticker.clone());
1532
1533 let mut tickers = self.tickers.lock().await;
1535 tickers.insert(symbol, ticker);
1536
1537 if let Some(syms) = &symbols {
1539 if result.len() == syms.len() {
1540 return Ok(result);
1541 }
1542 }
1543 }
1544 }
1545 }
1546 }
1547
1548 async fn handle_orderbook_delta(
1558 &self,
1559 symbol: &str,
1560 delta_message: &Value,
1561 is_futures: bool,
1562 ) -> Result<()> {
1563 use ccxt_core::types::orderbook::{OrderBookDelta, OrderBookEntry};
1564 use rust_decimal::Decimal;
1565
1566 let first_update_id = delta_message["U"]
1568 .as_i64()
1569 .ok_or_else(|| Error::invalid_request("Missing first update ID in delta message"))?;
1570
1571 let final_update_id = delta_message["u"]
1572 .as_i64()
1573 .ok_or_else(|| Error::invalid_request("Missing final update ID in delta message"))?;
1574
1575 let prev_final_update_id = if is_futures {
1576 delta_message["pu"].as_i64()
1577 } else {
1578 None
1579 };
1580
1581 let timestamp = delta_message["E"]
1582 .as_i64()
1583 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1584
1585 let mut bids = Vec::new();
1587 if let Some(bids_arr) = delta_message["b"].as_array() {
1588 for bid in bids_arr {
1589 if let (Some(price_str), Some(amount_str)) = (bid[0].as_str(), bid[1].as_str()) {
1590 if let (Ok(price), Ok(amount)) =
1591 (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1592 {
1593 bids.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1594 }
1595 }
1596 }
1597 }
1598
1599 let mut asks = Vec::new();
1601 if let Some(asks_arr) = delta_message["a"].as_array() {
1602 for ask in asks_arr {
1603 if let (Some(price_str), Some(amount_str)) = (ask[0].as_str(), ask[1].as_str()) {
1604 if let (Ok(price), Ok(amount)) =
1605 (price_str.parse::<Decimal>(), amount_str.parse::<Decimal>())
1606 {
1607 asks.push(OrderBookEntry::new(Price::new(price), Amount::new(amount)));
1608 }
1609 }
1610 }
1611 }
1612
1613 let delta = OrderBookDelta {
1615 symbol: symbol.to_string(),
1616 first_update_id,
1617 final_update_id,
1618 prev_final_update_id,
1619 timestamp,
1620 bids,
1621 asks,
1622 };
1623
1624 let mut orderbooks = self.orderbooks.lock().await;
1626 let orderbook = orderbooks
1627 .entry(symbol.to_string())
1628 .or_insert_with(|| OrderBook::new(symbol.to_string(), timestamp));
1629
1630 if !orderbook.is_synced {
1632 orderbook.buffer_delta(delta);
1633 return Ok(());
1634 }
1635
1636 if let Err(e) = orderbook.apply_delta(&delta, is_futures) {
1638 if orderbook.needs_resync {
1640 tracing::warn!("Orderbook {} needs resync due to: {}", symbol, e);
1641 orderbook.buffer_delta(delta);
1643 return Err(Error::invalid_request(format!("RESYNC_NEEDED: {}", e)));
1645 }
1646 return Err(Error::invalid_request(e));
1647 }
1648
1649 Ok(())
1650 }
1651
1652 async fn fetch_orderbook_snapshot(
1663 &self,
1664 exchange: &Binance,
1665 symbol: &str,
1666 limit: Option<i64>,
1667 is_futures: bool,
1668 ) -> Result<OrderBook> {
1669 let mut params = HashMap::new();
1671 if let Some(l) = limit {
1672 #[allow(clippy::disallowed_methods)]
1674 let limit_value = serde_json::json!(l);
1675 params.insert("limit".to_string(), limit_value);
1676 }
1677
1678 let mut snapshot = exchange.fetch_order_book(symbol, None).await?;
1679
1680 snapshot.is_synced = true;
1682
1683 let mut orderbooks = self.orderbooks.lock().await;
1685 if let Some(cached_ob) = orderbooks.get_mut(symbol) {
1686 snapshot
1688 .buffered_deltas
1689 .clone_from(&cached_ob.buffered_deltas);
1690
1691 if let Ok(processed) = snapshot.process_buffered_deltas(is_futures) {
1693 tracing::debug!("Processed {} buffered deltas for {}", processed, symbol);
1694 }
1695 }
1696
1697 orderbooks.insert(symbol.to_string(), snapshot.clone());
1699
1700 Ok(snapshot)
1701 }
1702
1703 async fn watch_orderbook_internal(
1715 &self,
1716 exchange: &Binance,
1717 symbol: &str,
1718 limit: Option<i64>,
1719 update_speed: i32,
1720 is_futures: bool,
1721 ) -> Result<OrderBook> {
1722 let stream = if update_speed == 100 {
1724 format!("{}@depth@100ms", symbol.to_lowercase())
1725 } else {
1726 format!("{}@depth", symbol.to_lowercase())
1727 };
1728
1729 self.client
1731 .subscribe(stream.clone(), Some(symbol.to_string()), None)
1732 .await?;
1733
1734 let snapshot_fetched = Arc::new(Mutex::new(false));
1736 let _snapshot_fetched_clone = snapshot_fetched.clone();
1737
1738 let _orderbooks_clone = self.orderbooks.clone();
1740 let _symbol_clone = symbol.to_string();
1741
1742 tokio::spawn(async move {
1743 });
1745
1746 tokio::time::sleep(Duration::from_millis(500)).await;
1748
1749 let _snapshot = self
1751 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1752 .await?;
1753
1754 *snapshot_fetched.lock().await = true;
1755
1756 loop {
1758 if let Some(message) = self.client.receive().await {
1759 if message.get("result").is_some() {
1761 continue;
1762 }
1763
1764 if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1766 if event_type == "depthUpdate" {
1767 match self
1768 .handle_orderbook_delta(symbol, &message, is_futures)
1769 .await
1770 {
1771 Ok(()) => {
1772 let orderbooks = self.orderbooks.lock().await;
1774 if let Some(ob) = orderbooks.get(symbol) {
1775 if ob.is_synced {
1776 return Ok(ob.clone());
1777 }
1778 }
1779 }
1780 Err(e) => {
1781 let err_msg = e.to_string();
1782
1783 if err_msg.contains("RESYNC_NEEDED") {
1785 tracing::warn!("Resync needed for {}: {}", symbol, err_msg);
1786
1787 let current_time = chrono::Utc::now().timestamp_millis();
1788 let should_resync = {
1789 let orderbooks = self.orderbooks.lock().await;
1790 if let Some(ob) = orderbooks.get(symbol) {
1791 ob.should_resync(current_time)
1792 } else {
1793 true
1794 }
1795 };
1796
1797 if should_resync {
1798 tracing::info!("Initiating resync for {}", symbol);
1799
1800 {
1802 let mut orderbooks = self.orderbooks.lock().await;
1803 if let Some(ob) = orderbooks.get_mut(symbol) {
1804 ob.reset_for_resync();
1805 ob.mark_resync_initiated(current_time);
1806 }
1807 }
1808
1809 tokio::time::sleep(Duration::from_millis(500)).await;
1811
1812 match self
1814 .fetch_orderbook_snapshot(
1815 exchange, symbol, limit, is_futures,
1816 )
1817 .await
1818 {
1819 Ok(_) => {
1820 tracing::info!(
1821 "Resync completed successfully for {}",
1822 symbol
1823 );
1824 continue;
1825 }
1826 Err(resync_err) => {
1827 tracing::error!(
1828 "Resync failed for {}: {}",
1829 symbol,
1830 resync_err
1831 );
1832 return Err(resync_err);
1833 }
1834 }
1835 }
1836 tracing::debug!("Resync rate limited for {}, skipping", symbol);
1837 }
1838 tracing::error!("Failed to handle orderbook delta: {}", err_msg);
1839 }
1840 }
1841 }
1842 }
1843 }
1844 }
1845 }
1846
1847 async fn watch_orderbooks_internal(
1859 &self,
1860 exchange: &Binance,
1861 symbols: Vec<String>,
1862 limit: Option<i64>,
1863 update_speed: i32,
1864 is_futures: bool,
1865 ) -> Result<HashMap<String, OrderBook>> {
1866 if symbols.len() > 200 {
1868 return Err(Error::invalid_request(
1869 "Binance supports max 200 symbols per connection",
1870 ));
1871 }
1872
1873 for symbol in &symbols {
1875 let stream = if update_speed == 100 {
1876 format!("{}@depth@100ms", symbol.to_lowercase())
1877 } else {
1878 format!("{}@depth", symbol.to_lowercase())
1879 };
1880
1881 self.client
1882 .subscribe(stream, Some(symbol.clone()), None)
1883 .await?;
1884 }
1885
1886 tokio::time::sleep(Duration::from_millis(500)).await;
1888
1889 for symbol in &symbols {
1891 let _ = self
1892 .fetch_orderbook_snapshot(exchange, symbol, limit, is_futures)
1893 .await;
1894 }
1895
1896 let mut result = HashMap::new();
1898 let mut update_count = 0;
1899
1900 while update_count < symbols.len() {
1901 if let Some(message) = self.client.receive().await {
1902 if message.get("result").is_some() {
1904 continue;
1905 }
1906
1907 if let Some(event_type) = message.get("e").and_then(serde_json::Value::as_str) {
1909 if event_type == "depthUpdate" {
1910 if let Some(msg_symbol) =
1911 message.get("s").and_then(serde_json::Value::as_str)
1912 {
1913 if let Err(e) = self
1914 .handle_orderbook_delta(msg_symbol, &message, is_futures)
1915 .await
1916 {
1917 tracing::error!("Failed to handle orderbook delta: {}", e);
1918 continue;
1919 }
1920
1921 update_count += 1;
1922 }
1923 }
1924 }
1925 }
1926 }
1927
1928 let orderbooks = self.orderbooks.lock().await;
1930 for symbol in &symbols {
1931 if let Some(ob) = orderbooks.get(symbol) {
1932 result.insert(symbol.clone(), ob.clone());
1933 }
1934 }
1935
1936 Ok(result)
1937 }
1938
1939 pub async fn get_cached_ticker(&self, symbol: &str) -> Option<Ticker> {
1946 let tickers = self.tickers.lock().await;
1947 tickers.get(symbol).cloned()
1948 }
1949
1950 pub async fn get_all_cached_tickers(&self) -> HashMap<String, Ticker> {
1952 let tickers = self.tickers.lock().await;
1953 tickers.clone()
1954 }
1955
1956 async fn handle_balance_message(&self, message: &Value, account_type: &str) -> Result<()> {
1965 use rust_decimal::Decimal;
1966 use std::str::FromStr;
1967
1968 let event_type = message
1970 .get("e")
1971 .and_then(|e| e.as_str())
1972 .ok_or_else(|| Error::invalid_request("Missing event type in balance message"))?;
1973
1974 let mut balances = self.balances.write().await;
1976 let balance = balances
1977 .entry(account_type.to_string())
1978 .or_insert_with(Balance::new);
1979
1980 match event_type {
1981 "balanceUpdate" => {
1983 let asset = message
1984 .get("a")
1985 .and_then(|a| a.as_str())
1986 .ok_or_else(|| Error::invalid_request("Missing asset in balanceUpdate"))?;
1987
1988 let delta_str = message
1989 .get("d")
1990 .and_then(|d| d.as_str())
1991 .ok_or_else(|| Error::invalid_request("Missing delta in balanceUpdate"))?;
1992
1993 let delta = Decimal::from_str(delta_str)
1994 .map_err(|e| Error::invalid_request(format!("Invalid delta value: {}", e)))?;
1995
1996 balance.apply_delta(asset.to_string(), delta);
1998 }
1999
2000 "outboundAccountPosition" => {
2002 if let Some(balances_array) = message.get("B").and_then(|b| b.as_array()) {
2003 for balance_item in balances_array {
2004 let asset =
2005 balance_item
2006 .get("a")
2007 .and_then(|a| a.as_str())
2008 .ok_or_else(|| {
2009 Error::invalid_request("Missing asset in balance item")
2010 })?;
2011
2012 let free_str = balance_item
2013 .get("f")
2014 .and_then(|f| f.as_str())
2015 .ok_or_else(|| Error::invalid_request("Missing free balance"))?;
2016
2017 let locked_str = balance_item
2018 .get("l")
2019 .and_then(|l| l.as_str())
2020 .ok_or_else(|| Error::invalid_request("Missing locked balance"))?;
2021
2022 let free = Decimal::from_str(free_str).map_err(|e| {
2023 Error::invalid_request(format!("Invalid free value: {}", e))
2024 })?;
2025
2026 let locked = Decimal::from_str(locked_str).map_err(|e| {
2027 Error::invalid_request(format!("Invalid locked value: {}", e))
2028 })?;
2029
2030 balance.update_balance(asset.to_string(), free, locked);
2032 }
2033 }
2034 }
2035
2036 "ACCOUNT_UPDATE" => {
2038 if let Some(account_data) = message.get("a") {
2039 if let Some(balances_array) = account_data.get("B").and_then(|b| b.as_array()) {
2041 for balance_item in balances_array {
2042 let asset = balance_item.get("a").and_then(|a| a.as_str()).ok_or_else(
2043 || Error::invalid_request("Missing asset in balance item"),
2044 )?;
2045
2046 let wallet_balance_str = balance_item
2047 .get("wb")
2048 .and_then(|wb| wb.as_str())
2049 .ok_or_else(|| Error::invalid_request("Missing wallet balance"))?;
2050
2051 let wallet_balance =
2052 Decimal::from_str(wallet_balance_str).map_err(|e| {
2053 Error::invalid_request(format!("Invalid wallet balance: {}", e))
2054 })?;
2055
2056 let cross_wallet = balance_item
2058 .get("cw")
2059 .and_then(|cw| cw.as_str())
2060 .and_then(|s| Decimal::from_str(s).ok());
2061
2062 balance.update_wallet(asset.to_string(), wallet_balance, cross_wallet);
2064 }
2065 }
2066
2067 }
2069 }
2070
2071 _ => {
2072 return Err(Error::invalid_request(format!(
2073 "Unknown balance event type: {}",
2074 event_type
2075 )));
2076 }
2077 }
2078
2079 Ok(())
2080 }
2081
2082 fn parse_ws_trade(data: &Value) -> Result<Trade> {
2092 use ccxt_core::types::{Fee, OrderSide, OrderType, TakerOrMaker};
2093 use rust_decimal::Decimal;
2094 use std::str::FromStr;
2095
2096 let symbol = data
2098 .get("s")
2099 .and_then(serde_json::Value::as_str)
2100 .ok_or_else(|| Error::invalid_request("Missing symbol field".to_string()))?
2101 .to_string();
2102
2103 let id = data
2105 .get("t")
2106 .and_then(serde_json::Value::as_i64)
2107 .map(|v| v.to_string());
2108
2109 let timestamp = data
2111 .get("T")
2112 .and_then(serde_json::Value::as_i64)
2113 .unwrap_or(0);
2114
2115 let price = data
2117 .get("L")
2118 .and_then(serde_json::Value::as_str)
2119 .and_then(|s| Decimal::from_str(s).ok())
2120 .unwrap_or(Decimal::ZERO);
2121
2122 let amount = data
2124 .get("l")
2125 .and_then(serde_json::Value::as_str)
2126 .and_then(|s| Decimal::from_str(s).ok())
2127 .unwrap_or(Decimal::ZERO);
2128
2129 let cost = data
2131 .get("Y")
2132 .and_then(serde_json::Value::as_str)
2133 .and_then(|s| Decimal::from_str(s).ok())
2134 .or_else(|| {
2135 if price > Decimal::ZERO && amount > Decimal::ZERO {
2137 Some(price * amount)
2138 } else {
2139 None
2140 }
2141 });
2142
2143 let side = data
2145 .get("S")
2146 .and_then(serde_json::Value::as_str)
2147 .and_then(|s| match s.to_uppercase().as_str() {
2148 "BUY" => Some(OrderSide::Buy),
2149 "SELL" => Some(OrderSide::Sell),
2150 _ => None,
2151 })
2152 .unwrap_or(OrderSide::Buy);
2153
2154 let trade_type = data
2156 .get("o")
2157 .and_then(serde_json::Value::as_str)
2158 .and_then(|s| match s.to_uppercase().as_str() {
2159 "LIMIT" => Some(OrderType::Limit),
2160 "MARKET" => Some(OrderType::Market),
2161 _ => None,
2162 });
2163
2164 let order_id = data
2166 .get("i")
2167 .and_then(serde_json::Value::as_i64)
2168 .map(|v| v.to_string());
2169
2170 let taker_or_maker = data
2172 .get("m")
2173 .and_then(serde_json::Value::as_bool)
2174 .map(|is_maker| {
2175 if is_maker {
2176 TakerOrMaker::Maker
2177 } else {
2178 TakerOrMaker::Taker
2179 }
2180 });
2181
2182 let fee = if let Some(fee_cost_str) = data.get("n").and_then(serde_json::Value::as_str) {
2184 if let Ok(fee_cost) = Decimal::from_str(fee_cost_str) {
2185 let currency = data
2186 .get("N")
2187 .and_then(serde_json::Value::as_str)
2188 .unwrap_or("UNKNOWN")
2189 .to_string();
2190 Some(Fee {
2191 currency,
2192 cost: fee_cost,
2193 rate: None,
2194 })
2195 } else {
2196 None
2197 }
2198 } else {
2199 None
2200 };
2201
2202 let datetime = chrono::DateTime::from_timestamp_millis(timestamp)
2204 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string());
2205
2206 let mut info = HashMap::new();
2208 if let Value::Object(map) = data {
2209 for (k, v) in map {
2210 info.insert(k.clone(), v.clone());
2211 }
2212 }
2213
2214 Ok(Trade {
2215 id,
2216 order: order_id,
2217 symbol,
2218 trade_type,
2219 side,
2220 taker_or_maker,
2221 price: Price::from(price),
2222 amount: Amount::from(amount),
2223 cost: cost.map(Cost::from),
2224 fee,
2225 timestamp,
2226 datetime,
2227 info,
2228 })
2229 }
2230
2231 async fn filter_my_trades(
2238 &self,
2239 symbol: Option<&str>,
2240 since: Option<i64>,
2241 limit: Option<usize>,
2242 ) -> Result<Vec<Trade>> {
2243 let trades_map = self.my_trades.read().await;
2244
2245 let mut trades: Vec<Trade> = if let Some(sym) = symbol {
2247 trades_map
2248 .get(sym)
2249 .map(|symbol_trades| symbol_trades.iter().cloned().collect())
2250 .unwrap_or_default()
2251 } else {
2252 trades_map
2253 .values()
2254 .flat_map(|symbol_trades| symbol_trades.iter().cloned())
2255 .collect()
2256 };
2257
2258 if let Some(since_ts) = since {
2260 trades.retain(|trade| trade.timestamp >= since_ts);
2261 }
2262
2263 trades.sort_by(|a, b| {
2265 let ts_a = a.timestamp;
2266 let ts_b = b.timestamp;
2267 ts_b.cmp(&ts_a)
2268 });
2269
2270 if let Some(lim) = limit {
2272 trades.truncate(lim);
2273 }
2274
2275 Ok(trades)
2276 }
2277
2278 fn parse_ws_position(data: &Value) -> Result<Position> {
2301 let symbol = data["s"]
2303 .as_str()
2304 .ok_or_else(|| Error::invalid_request("Missing symbol field"))?
2305 .to_string();
2306
2307 let position_amount_str = data["pa"]
2308 .as_str()
2309 .ok_or_else(|| Error::invalid_request("Missing position amount"))?;
2310
2311 let position_amount = position_amount_str
2312 .parse::<f64>()
2313 .map_err(|e| Error::invalid_request(format!("Invalid position amount: {}", e)))?;
2314
2315 let position_side = data["ps"]
2317 .as_str()
2318 .ok_or_else(|| Error::invalid_request("Missing position side"))?
2319 .to_uppercase();
2320
2321 let (side, hedged) = if position_side == "BOTH" {
2325 let actual_side = if position_amount < 0.0 {
2326 "short"
2327 } else {
2328 "long"
2329 };
2330 (actual_side.to_string(), false)
2331 } else {
2332 (position_side.to_lowercase(), true)
2333 };
2334
2335 let entry_price = data["ep"].as_str().and_then(|s| s.parse::<f64>().ok());
2337 let unrealized_pnl = data["up"].as_str().and_then(|s| s.parse::<f64>().ok());
2338 let realized_pnl = data["cr"].as_str().and_then(|s| s.parse::<f64>().ok());
2339 let margin_mode = data["mt"].as_str().map(ToString::to_string);
2340 let initial_margin = data["iw"].as_str().and_then(|s| s.parse::<f64>().ok());
2341 let _margin_asset = data["ma"].as_str().map(ToString::to_string);
2342
2343 Ok(Position {
2345 info: data.clone(),
2346 id: None,
2347 symbol,
2348 side: Some(side),
2349 contracts: Some(position_amount.abs()), contract_size: None,
2351 entry_price,
2352 mark_price: None,
2353 notional: None,
2354 leverage: None,
2355 collateral: initial_margin, initial_margin,
2357 initial_margin_percentage: None,
2358 maintenance_margin: None,
2359 maintenance_margin_percentage: None,
2360 unrealized_pnl,
2361 realized_pnl,
2362 liquidation_price: None,
2363 margin_ratio: None,
2364 margin_mode,
2365 hedged: Some(hedged),
2366 percentage: None,
2367 position_side: None,
2368 dual_side_position: None,
2369 timestamp: Some(chrono::Utc::now().timestamp_millis()),
2370 datetime: Some(chrono::Utc::now().to_rfc3339()),
2371 })
2372 }
2373
2374 async fn filter_positions(
2384 &self,
2385 symbols: Option<&[String]>,
2386 since: Option<i64>,
2387 limit: Option<usize>,
2388 ) -> Result<Vec<Position>> {
2389 let positions_map = self.positions.read().await;
2390
2391 let mut positions: Vec<Position> = if let Some(syms) = symbols {
2393 syms.iter()
2394 .filter_map(|sym| positions_map.get(sym))
2395 .flat_map(|side_map| side_map.values().cloned())
2396 .collect()
2397 } else {
2398 positions_map
2399 .values()
2400 .flat_map(|side_map| side_map.values().cloned())
2401 .collect()
2402 };
2403
2404 if let Some(since_ts) = since {
2406 positions.retain(|pos| pos.timestamp.is_some_and(|ts| ts >= since_ts));
2407 }
2408
2409 positions.sort_by(|a, b| {
2411 let ts_a = a.timestamp.unwrap_or(0);
2412 let ts_b = b.timestamp.unwrap_or(0);
2413 ts_b.cmp(&ts_a)
2414 });
2415
2416 if let Some(lim) = limit {
2418 positions.truncate(lim);
2419 }
2420
2421 Ok(positions)
2422 }
2423}
2424
2425impl Binance {
2426 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
2434 let ws = self.create_ws();
2435 ws.connect().await?;
2436
2437 let binance_symbol = symbol.replace('/', "").to_lowercase();
2439 ws.subscribe_ticker(&binance_symbol).await
2440 }
2441
2442 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
2450 let ws = self.create_ws();
2451 ws.connect().await?;
2452
2453 let binance_symbol = symbol.replace('/', "").to_lowercase();
2454 ws.subscribe_trades(&binance_symbol).await
2455 }
2456
2457 pub async fn subscribe_orderbook(&self, symbol: &str, levels: Option<u32>) -> Result<()> {
2466 let ws = self.create_ws();
2467 ws.connect().await?;
2468
2469 let binance_symbol = symbol.replace('/', "").to_lowercase();
2470 let depth_levels = levels.unwrap_or(20);
2471 ws.subscribe_orderbook(&binance_symbol, depth_levels, "1000ms")
2472 .await
2473 }
2474
2475 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
2484 let ws = self.create_ws();
2485 ws.connect().await?;
2486
2487 let binance_symbol = symbol.replace('/', "").to_lowercase();
2488 ws.subscribe_kline(&binance_symbol, interval).await
2489 }
2490
2491 pub async fn watch_ticker(
2514 &self,
2515 symbol: &str,
2516 params: Option<HashMap<String, Value>>,
2517 ) -> Result<Ticker> {
2518 self.load_markets(false).await?;
2520
2521 let market = self.base.market(symbol).await?;
2523 let binance_symbol = market.id.to_lowercase();
2524
2525 let channel_name = if let Some(p) = ¶ms {
2527 p.get("name")
2528 .and_then(serde_json::Value::as_str)
2529 .unwrap_or("ticker")
2530 } else {
2531 "ticker"
2532 };
2533
2534 let ws = self.create_ws();
2536 ws.connect().await?;
2537
2538 ws.watch_ticker_internal(&binance_symbol, channel_name)
2540 .await
2541 }
2542
2543 pub async fn watch_tickers(
2572 &self,
2573 symbols: Option<Vec<String>>,
2574 params: Option<HashMap<String, Value>>,
2575 ) -> Result<HashMap<String, Ticker>> {
2576 self.load_markets(false).await?;
2578
2579 let channel_name = if let Some(p) = ¶ms {
2581 p.get("name")
2582 .and_then(serde_json::Value::as_str)
2583 .unwrap_or("ticker")
2584 } else {
2585 "ticker"
2586 };
2587
2588 if channel_name == "bookTicker" {
2590 return Err(Error::invalid_request(
2591 "To subscribe for bids-asks, use watch_bids_asks() method instead",
2592 ));
2593 }
2594
2595 let binance_symbols = if let Some(syms) = symbols {
2597 let mut result = Vec::new();
2598 for symbol in syms {
2599 let market = self.base.market(&symbol).await?;
2600 result.push(market.id.to_lowercase());
2601 }
2602 Some(result)
2603 } else {
2604 None
2605 };
2606
2607 let ws = self.create_ws();
2609 ws.connect().await?;
2610
2611 ws.watch_tickers_internal(binance_symbols, channel_name)
2613 .await
2614 }
2615
2616 pub async fn watch_mark_price(
2646 &self,
2647 symbol: &str,
2648 params: Option<HashMap<String, Value>>,
2649 ) -> Result<Ticker> {
2650 self.load_markets(false).await?;
2652
2653 let market = self.base.market(symbol).await?;
2655 if market.market_type != MarketType::Swap && market.market_type != MarketType::Futures {
2656 return Err(Error::invalid_request(format!(
2657 "watch_mark_price() does not support {} markets",
2658 market.market_type
2659 )));
2660 }
2661
2662 let binance_symbol = market.id.to_lowercase();
2663
2664 let use_1s_freq = if let Some(p) = ¶ms {
2666 p.get("use1sFreq")
2667 .and_then(serde_json::Value::as_bool)
2668 .unwrap_or(true)
2669 } else {
2670 true
2671 };
2672
2673 let channel_name = if use_1s_freq {
2675 "markPrice@1s"
2676 } else {
2677 "markPrice"
2678 };
2679
2680 let ws = self.create_ws();
2682 ws.connect().await?;
2683
2684 ws.watch_ticker_internal(&binance_symbol, channel_name)
2686 .await
2687 }
2688
2689 pub async fn watch_order_book(
2726 &self,
2727 symbol: &str,
2728 limit: Option<i64>,
2729 params: Option<HashMap<String, Value>>,
2730 ) -> Result<OrderBook> {
2731 self.load_markets(false).await?;
2733
2734 let market = self.base.market(symbol).await?;
2736 let binance_symbol = market.id.to_lowercase();
2737
2738 let is_futures =
2740 market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2741
2742 let update_speed = if let Some(p) = ¶ms {
2744 p.get("speed")
2745 .and_then(serde_json::Value::as_i64)
2746 .unwrap_or(100) as i32
2747 } else {
2748 100
2749 };
2750
2751 if update_speed != 100 && update_speed != 1000 {
2753 return Err(Error::invalid_request(
2754 "Update speed must be 100 or 1000 milliseconds",
2755 ));
2756 }
2757
2758 let ws = self.create_ws();
2760 ws.connect().await?;
2761
2762 ws.watch_orderbook_internal(self, &binance_symbol, limit, update_speed, is_futures)
2764 .await
2765 }
2766
2767 pub async fn watch_order_books(
2798 &self,
2799 symbols: Vec<String>,
2800 limit: Option<i64>,
2801 params: Option<HashMap<String, Value>>,
2802 ) -> Result<HashMap<String, OrderBook>> {
2803 if symbols.is_empty() {
2805 return Err(Error::invalid_request("Symbols list cannot be empty"));
2806 }
2807
2808 if symbols.len() > 200 {
2809 return Err(Error::invalid_request(
2810 "Binance supports max 200 symbols per connection",
2811 ));
2812 }
2813
2814 self.load_markets(false).await?;
2816
2817 let mut binance_symbols = Vec::new();
2819 let mut is_futures = false;
2820
2821 for symbol in &symbols {
2822 let market = self.base.market(symbol).await?;
2823 binance_symbols.push(market.id.to_lowercase());
2824
2825 let current_is_futures =
2826 market.market_type == MarketType::Swap || market.market_type == MarketType::Futures;
2827 if !binance_symbols.is_empty() && current_is_futures != is_futures {
2828 return Err(Error::invalid_request(
2829 "Cannot mix spot and futures markets in watch_order_books",
2830 ));
2831 }
2832 is_futures = current_is_futures;
2833 }
2834
2835 let update_speed = if let Some(p) = ¶ms {
2837 p.get("speed")
2838 .and_then(serde_json::Value::as_i64)
2839 .unwrap_or(100) as i32
2840 } else {
2841 100
2842 };
2843
2844 let ws = self.create_ws();
2846 ws.connect().await?;
2847
2848 ws.watch_orderbooks_internal(self, binance_symbols, limit, update_speed, is_futures)
2850 .await
2851 }
2852
2853 pub async fn watch_mark_prices(
2863 &self,
2864 symbols: Option<Vec<String>>,
2865 params: Option<HashMap<String, Value>>,
2866 ) -> Result<HashMap<String, Ticker>> {
2867 self.load_markets(false).await?;
2869
2870 let use_1s_freq = if let Some(p) = ¶ms {
2872 p.get("use1sFreq")
2873 .and_then(serde_json::Value::as_bool)
2874 .unwrap_or(true)
2875 } else {
2876 true
2877 };
2878
2879 let channel_name = if use_1s_freq {
2881 "markPrice@1s"
2882 } else {
2883 "markPrice"
2884 };
2885
2886 let binance_symbols = if let Some(syms) = symbols {
2888 let mut result = Vec::new();
2889 for symbol in syms {
2890 let market = self.base.market(&symbol).await?;
2891 if market.market_type != MarketType::Swap
2892 && market.market_type != MarketType::Futures
2893 {
2894 return Err(Error::invalid_request(format!(
2895 "watch_mark_prices() does not support {} markets",
2896 market.market_type
2897 )));
2898 }
2899 result.push(market.id.to_lowercase());
2900 }
2901 Some(result)
2902 } else {
2903 None
2904 };
2905
2906 let ws = self.create_ws();
2908 ws.connect().await?;
2909
2910 ws.watch_tickers_internal(binance_symbols, channel_name)
2912 .await
2913 }
2914 pub async fn watch_trades(
2924 &self,
2925 symbol: &str,
2926 since: Option<i64>,
2927 limit: Option<usize>,
2928 ) -> Result<Vec<Trade>> {
2929 const MAX_RETRIES: u32 = 50;
2930 const MAX_TRADES: usize = 1000;
2931
2932 self.base.load_markets(false).await?;
2934
2935 let market = self.base.market(symbol).await?;
2937 let binance_symbol = market.id.to_lowercase();
2938
2939 let ws = self.create_ws();
2941 ws.connect().await?;
2942
2943 ws.subscribe_trades(&binance_symbol).await?;
2945
2946 let mut retries = 0;
2948
2949 while retries < MAX_RETRIES {
2950 if let Some(msg) = ws.client.receive().await {
2951 if msg.get("result").is_some() || msg.get("id").is_some() {
2953 continue;
2954 }
2955
2956 if let Ok(trade) = parser::parse_ws_trade(&msg, Some(&market)) {
2958 let mut trades_map = ws.trades.lock().await;
2960 let trades = trades_map
2961 .entry(symbol.to_string())
2962 .or_insert_with(VecDeque::new);
2963
2964 if trades.len() >= MAX_TRADES {
2966 trades.pop_front();
2967 }
2968 trades.push_back(trade);
2969
2970 let mut result: Vec<Trade> = trades.iter().cloned().collect();
2972
2973 if let Some(since_ts) = since {
2975 result.retain(|t| t.timestamp >= since_ts);
2976 }
2977
2978 if let Some(limit_size) = limit {
2980 if result.len() > limit_size {
2981 result = result.split_off(result.len() - limit_size);
2982 }
2983 }
2984
2985 return Ok(result);
2986 }
2987 }
2988
2989 retries += 1;
2990 tokio::time::sleep(Duration::from_millis(100)).await;
2991 }
2992
2993 Err(Error::network("Timeout waiting for trade data"))
2994 }
2995
2996 pub async fn watch_ohlcv(
3007 &self,
3008 symbol: &str,
3009 timeframe: &str,
3010 since: Option<i64>,
3011 limit: Option<usize>,
3012 ) -> Result<Vec<OHLCV>> {
3013 const MAX_RETRIES: u32 = 50;
3014 const MAX_OHLCVS: usize = 1000;
3015
3016 self.base.load_markets(false).await?;
3018
3019 let market = self.base.market(symbol).await?;
3021 let binance_symbol = market.id.to_lowercase();
3022
3023 let ws = self.create_ws();
3025 ws.connect().await?;
3026
3027 ws.subscribe_kline(&binance_symbol, timeframe).await?;
3029
3030 let mut retries = 0;
3032
3033 while retries < MAX_RETRIES {
3034 if let Some(msg) = ws.client.receive().await {
3035 if msg.get("result").is_some() || msg.get("id").is_some() {
3037 continue;
3038 }
3039
3040 if let Ok(ohlcv) = parser::parse_ws_ohlcv(&msg) {
3042 let cache_key = format!("{}:{}", symbol, timeframe);
3044 let mut ohlcvs_map = ws.ohlcvs.lock().await;
3045 let ohlcvs = ohlcvs_map.entry(cache_key).or_insert_with(VecDeque::new);
3046
3047 if ohlcvs.len() >= MAX_OHLCVS {
3049 ohlcvs.pop_front();
3050 }
3051 ohlcvs.push_back(ohlcv);
3052
3053 let mut result: Vec<OHLCV> = ohlcvs.iter().cloned().collect();
3055
3056 if let Some(since_ts) = since {
3058 result.retain(|o| o.timestamp >= since_ts);
3059 }
3060
3061 if let Some(limit_size) = limit {
3063 if result.len() > limit_size {
3064 result = result.split_off(result.len() - limit_size);
3065 }
3066 }
3067
3068 return Ok(result);
3069 }
3070 }
3071
3072 retries += 1;
3073 tokio::time::sleep(Duration::from_millis(100)).await;
3074 }
3075
3076 Err(Error::network("Timeout waiting for OHLCV data"))
3077 }
3078
3079 pub async fn watch_bids_asks(&self, symbol: &str) -> Result<BidAsk> {
3087 const MAX_RETRIES: u32 = 50;
3088
3089 self.base.load_markets(false).await?;
3091
3092 let market = self.base.market(symbol).await?;
3094 let binance_symbol = market.id.to_lowercase();
3095
3096 let ws = self.create_ws();
3098 ws.connect().await?;
3099
3100 let stream_name = format!("{}@bookTicker", binance_symbol);
3102 ws.client
3103 .subscribe(stream_name, Some(symbol.to_string()), None)
3104 .await?;
3105
3106 let mut retries = 0;
3108
3109 while retries < MAX_RETRIES {
3110 if let Some(msg) = ws.client.receive().await {
3111 if msg.get("result").is_some() || msg.get("id").is_some() {
3113 continue;
3114 }
3115
3116 if let Ok(bid_ask) = parser::parse_ws_bid_ask(&msg) {
3118 let mut bids_asks_map = ws.bids_asks.lock().await;
3120 bids_asks_map.insert(symbol.to_string(), bid_ask.clone());
3121
3122 return Ok(bid_ask);
3123 }
3124 }
3125
3126 retries += 1;
3127 tokio::time::sleep(Duration::from_millis(100)).await;
3128 }
3129
3130 Err(Error::network("Timeout waiting for BidAsk data"))
3131 }
3132
3133 pub async fn watch_balance(
3169 self: Arc<Self>,
3170 params: Option<HashMap<String, Value>>,
3171 ) -> Result<Balance> {
3172 const MAX_RETRIES: u32 = 100;
3173
3174 self.base.load_markets(false).await?;
3176
3177 let account_type = if let Some(p) = ¶ms {
3179 p.get("type")
3180 .and_then(serde_json::Value::as_str)
3181 .unwrap_or_else(|| self.options.default_type.as_str())
3182 } else {
3183 self.options.default_type.as_str()
3184 };
3185
3186 let fetch_snapshot = if let Some(p) = ¶ms {
3188 p.get("fetchBalanceSnapshot")
3189 .and_then(serde_json::Value::as_bool)
3190 .unwrap_or(false)
3191 } else {
3192 false
3193 };
3194
3195 let await_snapshot = if let Some(p) = ¶ms {
3196 p.get("awaitBalanceSnapshot")
3197 .and_then(serde_json::Value::as_bool)
3198 .unwrap_or(true)
3199 } else {
3200 true
3201 };
3202
3203 let ws = self.create_authenticated_ws();
3205 ws.connect().await?;
3206
3207 if fetch_snapshot {
3209 let account_type_enum = account_type.parse::<ccxt_core::types::AccountType>().ok();
3211 let snapshot = self.fetch_balance(account_type_enum).await?;
3212
3213 let mut balances = ws.balances.write().await;
3215 balances.insert(account_type.to_string(), snapshot.clone());
3216
3217 if !await_snapshot {
3218 return Ok(snapshot);
3219 }
3220 }
3221
3222 let mut retries = 0;
3224
3225 while retries < MAX_RETRIES {
3226 if let Some(msg) = ws.client.receive().await {
3227 if msg.get("result").is_some() || msg.get("id").is_some() {
3229 continue;
3230 }
3231
3232 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3234 if matches!(
3236 event_type,
3237 "balanceUpdate" | "outboundAccountPosition" | "ACCOUNT_UPDATE"
3238 ) {
3239 if let Ok(()) = ws.handle_balance_message(&msg, account_type).await {
3241 let balances = ws.balances.read().await;
3243 if let Some(balance) = balances.get(account_type) {
3244 return Ok(balance.clone());
3245 }
3246 }
3247 }
3248 }
3249 }
3250
3251 retries += 1;
3252 tokio::time::sleep(Duration::from_millis(100)).await;
3253 }
3254
3255 Err(Error::network("Timeout waiting for balance data"))
3256 }
3257
3258 pub async fn watch_orders(
3291 self: Arc<Self>,
3292 symbol: Option<&str>,
3293 since: Option<i64>,
3294 limit: Option<usize>,
3295 _params: Option<HashMap<String, Value>>,
3296 ) -> Result<Vec<Order>> {
3297 self.base.load_markets(false).await?;
3298
3299 let ws = self.create_authenticated_ws();
3300 ws.connect().await?;
3301
3302 loop {
3304 if let Some(msg) = ws.client.receive().await {
3305 if let Value::Object(data) = msg {
3306 if let Some(event_type) = data.get("e").and_then(serde_json::Value::as_str) {
3307 if event_type == "executionReport" {
3308 let order = Binance::parse_ws_order(&data);
3310
3311 let mut orders = ws.orders.write().await;
3313 let symbol_orders = orders
3314 .entry(order.symbol.clone())
3315 .or_insert_with(HashMap::new);
3316 symbol_orders.insert(order.id.clone(), order.clone());
3317 drop(orders);
3318
3319 if let Some(exec_type) =
3321 data.get("x").and_then(serde_json::Value::as_str)
3322 {
3323 if exec_type == "TRADE" {
3324 if let Ok(trade) =
3326 BinanceWs::parse_ws_trade(&Value::Object(data.clone()))
3327 {
3328 let mut trades = ws.my_trades.write().await;
3330 let symbol_trades = trades
3331 .entry(trade.symbol.clone())
3332 .or_insert_with(VecDeque::new);
3333
3334 symbol_trades.push_front(trade);
3336 if symbol_trades.len() > 1000 {
3337 symbol_trades.pop_back();
3338 }
3339 }
3340 }
3341 }
3342
3343 return self.filter_orders(&ws, symbol, since, limit).await;
3345 }
3346 }
3347 }
3348 } else {
3349 tokio::time::sleep(Duration::from_millis(100)).await;
3350 }
3351 }
3352 }
3353
3354 fn parse_ws_order(data: &serde_json::Map<String, Value>) -> Order {
3356 use ccxt_core::types::{OrderSide, OrderStatus, OrderType};
3357 use rust_decimal::Decimal;
3358 use std::str::FromStr;
3359
3360 let symbol = data
3362 .get("s")
3363 .and_then(serde_json::Value::as_str)
3364 .unwrap_or("");
3365 let order_id = data
3366 .get("i")
3367 .and_then(serde_json::Value::as_i64)
3368 .map(|id| id.to_string())
3369 .unwrap_or_default();
3370 let client_order_id = data
3371 .get("c")
3372 .and_then(serde_json::Value::as_str)
3373 .map(String::from);
3374
3375 let status_str = data
3377 .get("X")
3378 .and_then(serde_json::Value::as_str)
3379 .unwrap_or("NEW");
3380 let status = match status_str {
3381 "FILLED" => OrderStatus::Closed,
3382 "CANCELED" => OrderStatus::Cancelled,
3383 "REJECTED" => OrderStatus::Rejected,
3384 "EXPIRED" => OrderStatus::Expired,
3385 _ => OrderStatus::Open,
3386 };
3387
3388 let side_str = data
3390 .get("S")
3391 .and_then(serde_json::Value::as_str)
3392 .unwrap_or("BUY");
3393 let side = match side_str {
3394 "SELL" => OrderSide::Sell,
3395 _ => OrderSide::Buy,
3396 };
3397
3398 let type_str = data
3400 .get("o")
3401 .and_then(serde_json::Value::as_str)
3402 .unwrap_or("LIMIT");
3403 let order_type = match type_str {
3404 "MARKET" => OrderType::Market,
3405 "STOP_LOSS" => OrderType::StopLoss,
3406 "STOP_LOSS_LIMIT" => OrderType::StopLossLimit,
3407 "TAKE_PROFIT" => OrderType::TakeProfit,
3408 "TAKE_PROFIT_LIMIT" => OrderType::TakeProfitLimit,
3409 "LIMIT_MAKER" => OrderType::LimitMaker,
3410 _ => OrderType::Limit,
3411 };
3412
3413 let amount = data
3415 .get("q")
3416 .and_then(serde_json::Value::as_str)
3417 .and_then(|s| Decimal::from_str(s).ok())
3418 .unwrap_or(Decimal::ZERO);
3419
3420 let price = data
3421 .get("p")
3422 .and_then(serde_json::Value::as_str)
3423 .and_then(|s| Decimal::from_str(s).ok());
3424
3425 let filled = data
3426 .get("z")
3427 .and_then(serde_json::Value::as_str)
3428 .and_then(|s| Decimal::from_str(s).ok());
3429
3430 let cost = data
3431 .get("Z")
3432 .and_then(serde_json::Value::as_str)
3433 .and_then(|s| Decimal::from_str(s).ok());
3434
3435 let remaining = filled.map(|fill| amount - fill);
3437
3438 let average = match (filled, cost) {
3440 (Some(fill), Some(c)) if fill > Decimal::ZERO && c > Decimal::ZERO => Some(c / fill),
3441 _ => None,
3442 };
3443
3444 let timestamp = data.get("T").and_then(serde_json::Value::as_i64);
3446 let last_trade_timestamp = data.get("T").and_then(serde_json::Value::as_i64);
3447
3448 Order {
3449 id: order_id,
3450 client_order_id,
3451 timestamp,
3452 datetime: timestamp.map(|ts| {
3453 chrono::DateTime::from_timestamp_millis(ts)
3454 .map(|dt| dt.to_rfc3339())
3455 .unwrap_or_default()
3456 }),
3457 last_trade_timestamp,
3458 symbol: symbol.to_string(),
3459 order_type,
3460 side,
3461 price,
3462 average,
3463 amount,
3464 cost,
3465 filled,
3466 remaining,
3467 status,
3468 fee: None,
3469 fees: None,
3470 trades: None,
3471 time_in_force: data
3472 .get("f")
3473 .and_then(serde_json::Value::as_str)
3474 .map(String::from),
3475 post_only: None,
3476 reduce_only: None,
3477 stop_price: data
3478 .get("P")
3479 .and_then(serde_json::Value::as_str)
3480 .and_then(|s| Decimal::from_str(s).ok()),
3481 trigger_price: None,
3482 take_profit_price: None,
3483 stop_loss_price: None,
3484 trailing_delta: None,
3485 trailing_percent: None,
3486 activation_price: None,
3487 callback_rate: None,
3488 working_type: data
3489 .get("wt")
3490 .and_then(serde_json::Value::as_str)
3491 .map(String::from),
3492 info: data.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
3493 }
3494 }
3495
3496 async fn filter_orders(
3498 &self,
3499 ws: &BinanceWs,
3500 symbol: Option<&str>,
3501 since: Option<i64>,
3502 limit: Option<usize>,
3503 ) -> Result<Vec<Order>> {
3504 let orders_map = ws.orders.read().await;
3505
3506 let mut orders: Vec<Order> = if let Some(sym) = symbol {
3508 orders_map
3509 .get(sym)
3510 .map(|symbol_orders| symbol_orders.values().cloned().collect())
3511 .unwrap_or_default()
3512 } else {
3513 orders_map
3514 .values()
3515 .flat_map(|symbol_orders| symbol_orders.values().cloned())
3516 .collect()
3517 };
3518
3519 if let Some(since_ts) = since {
3521 orders.retain(|order| order.timestamp.is_some_and(|ts| ts >= since_ts));
3522 }
3523
3524 orders.sort_by(|a, b| {
3526 let ts_a = a.timestamp.unwrap_or(0);
3527 let ts_b = b.timestamp.unwrap_or(0);
3528 ts_b.cmp(&ts_a)
3529 });
3530
3531 if let Some(lim) = limit {
3533 orders.truncate(lim);
3534 }
3535
3536 Ok(orders)
3537 }
3538
3539 pub async fn watch_my_trades(
3570 self: Arc<Self>,
3571 symbol: Option<&str>,
3572 since: Option<i64>,
3573 limit: Option<usize>,
3574 _params: Option<HashMap<String, Value>>,
3575 ) -> Result<Vec<Trade>> {
3576 const MAX_RETRIES: u32 = 100;
3577
3578 let ws = self.create_authenticated_ws();
3580 ws.connect().await?;
3581
3582 let mut retries = 0;
3584
3585 while retries < MAX_RETRIES {
3586 if let Some(msg) = ws.client.receive().await {
3587 if msg.get("result").is_some() || msg.get("id").is_some() {
3589 continue;
3590 }
3591
3592 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3594 if event_type == "executionReport" {
3596 if let Ok(trade) = BinanceWs::parse_ws_trade(&msg) {
3597 let symbol_key = trade.symbol.clone();
3598
3599 let mut trades_map = ws.my_trades.write().await;
3601 let symbol_trades =
3602 trades_map.entry(symbol_key).or_insert_with(VecDeque::new);
3603
3604 symbol_trades.push_front(trade);
3606 if symbol_trades.len() > 1000 {
3607 symbol_trades.pop_back();
3608 }
3609 }
3610 }
3611 }
3612 } else {
3613 tokio::time::sleep(Duration::from_millis(100)).await;
3614 }
3615
3616 retries += 1;
3617 }
3618
3619 ws.filter_my_trades(symbol, since, limit).await
3621 }
3622
3623 pub async fn watch_positions(
3688 self: Arc<Self>,
3689 symbols: Option<Vec<String>>,
3690 since: Option<i64>,
3691 limit: Option<usize>,
3692 _params: Option<HashMap<String, Value>>,
3693 ) -> Result<Vec<Position>> {
3694 const MAX_RETRIES: u32 = 100;
3695
3696 let ws = self.create_authenticated_ws();
3698 ws.connect().await?;
3699
3700 let mut retries = 0;
3702
3703 while retries < MAX_RETRIES {
3704 if let Some(msg) = ws.client.receive().await {
3705 if msg.get("result").is_some() || msg.get("id").is_some() {
3707 continue;
3708 }
3709
3710 if let Some(event_type) = msg.get("e").and_then(|e| e.as_str()) {
3712 if event_type == "ACCOUNT_UPDATE" {
3713 if let Some(account_data) = msg.get("a") {
3714 if let Some(positions_array) =
3715 account_data.get("P").and_then(|p| p.as_array())
3716 {
3717 for position_data in positions_array {
3718 if let Ok(position) =
3719 BinanceWs::parse_ws_position(position_data)
3720 {
3721 let symbol_key = position.symbol.clone();
3722 let side_key = position
3723 .side
3724 .clone()
3725 .unwrap_or_else(|| "both".to_string());
3726
3727 let mut positions_map = ws.positions.write().await;
3729 let symbol_positions = positions_map
3730 .entry(symbol_key)
3731 .or_insert_with(HashMap::new);
3732
3733 if position.contracts.unwrap_or(0.0).abs() < 0.000001 {
3735 symbol_positions.remove(&side_key);
3736 if symbol_positions.is_empty() {
3737 positions_map.remove(&position.symbol);
3738 }
3739 } else {
3740 symbol_positions.insert(side_key, position);
3741 }
3742 }
3743 }
3744 }
3745 }
3746 }
3747 }
3748 } else {
3749 tokio::time::sleep(Duration::from_millis(100)).await;
3750 }
3751
3752 retries += 1;
3753 }
3754
3755 let symbols_ref = symbols.as_deref();
3757 ws.filter_positions(symbols_ref, since, limit).await
3758 }
3759}
3760
3761#[cfg(test)]
3762mod tests {
3763 use super::*;
3764
3765 #[test]
3766 fn test_binance_ws_creation() {
3767 let ws = BinanceWs::new(WS_BASE_URL.to_string());
3768 assert!(ws.listen_key.try_read().is_ok());
3770 }
3771
3772 #[test]
3773 fn test_stream_format() {
3774 let symbol = "btcusdt";
3775
3776 let ticker_stream = format!("{}@ticker", symbol);
3778 assert_eq!(ticker_stream, "btcusdt@ticker");
3779
3780 let trade_stream = format!("{}@trade", symbol);
3782 assert_eq!(trade_stream, "btcusdt@trade");
3783
3784 let depth_stream = format!("{}@depth20", symbol);
3786 assert_eq!(depth_stream, "btcusdt@depth20");
3787
3788 let kline_stream = format!("{}@kline_1m", symbol);
3790 assert_eq!(kline_stream, "btcusdt@kline_1m");
3791 }
3792
3793 #[tokio::test]
3794 async fn test_subscription_manager_basic() {
3795 let manager = SubscriptionManager::new();
3796 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
3797
3798 assert_eq!(manager.active_count(), 0);
3800 assert!(!manager.has_subscription("btcusdt@ticker").await);
3801
3802 manager
3804 .add_subscription(
3805 "btcusdt@ticker".to_string(),
3806 "BTCUSDT".to_string(),
3807 SubscriptionType::Ticker,
3808 tx.clone(),
3809 )
3810 .await
3811 .unwrap();
3812
3813 assert_eq!(manager.active_count(), 1);
3814 assert!(manager.has_subscription("btcusdt@ticker").await);
3815
3816 let sub = manager.get_subscription("btcusdt@ticker").await;
3818 assert!(sub.is_some());
3819 let sub = sub.unwrap();
3820 assert_eq!(sub.stream, "btcusdt@ticker");
3821 assert_eq!(sub.symbol, "BTCUSDT");
3822 assert_eq!(sub.sub_type, SubscriptionType::Ticker);
3823
3824 manager.remove_subscription("btcusdt@ticker").await.unwrap();
3826 assert_eq!(manager.active_count(), 0);
3827 assert!(!manager.has_subscription("btcusdt@ticker").await);
3828 }
3829
3830 #[tokio::test]
3831 async fn test_subscription_manager_multiple() {
3832 let manager = SubscriptionManager::new();
3833 let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
3834 let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
3835 let (tx3, _rx3) = tokio::sync::mpsc::unbounded_channel();
3836
3837 manager
3839 .add_subscription(
3840 "btcusdt@ticker".to_string(),
3841 "BTCUSDT".to_string(),
3842 SubscriptionType::Ticker,
3843 tx1,
3844 )
3845 .await
3846 .unwrap();
3847
3848 manager
3849 .add_subscription(
3850 "btcusdt@depth".to_string(),
3851 "BTCUSDT".to_string(),
3852 SubscriptionType::OrderBook,
3853 tx2,
3854 )
3855 .await
3856 .unwrap();
3857
3858 manager
3859 .add_subscription(
3860 "ethusdt@ticker".to_string(),
3861 "ETHUSDT".to_string(),
3862 SubscriptionType::Ticker,
3863 tx3,
3864 )
3865 .await
3866 .unwrap();
3867
3868 assert_eq!(manager.active_count(), 3);
3869
3870 let btc_subs = manager.get_subscriptions_by_symbol("BTCUSDT").await;
3872 assert_eq!(btc_subs.len(), 2);
3873
3874 let eth_subs = manager.get_subscriptions_by_symbol("ETHUSDT").await;
3875 assert_eq!(eth_subs.len(), 1);
3876
3877 let all_subs = manager.get_all_subscriptions().await;
3879 assert_eq!(all_subs.len(), 3);
3880
3881 manager.clear().await;
3883 assert_eq!(manager.active_count(), 0);
3884 }
3885
3886 #[tokio::test]
3887 async fn test_subscription_type_from_stream() {
3888 let sub_type = SubscriptionType::from_stream("btcusdt@ticker");
3890 assert_eq!(sub_type, Some(SubscriptionType::Ticker));
3891
3892 let sub_type = SubscriptionType::from_stream("btcusdt@depth");
3894 assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3895
3896 let sub_type = SubscriptionType::from_stream("btcusdt@depth@100ms");
3897 assert_eq!(sub_type, Some(SubscriptionType::OrderBook));
3898
3899 let sub_type = SubscriptionType::from_stream("btcusdt@trade");
3901 assert_eq!(sub_type, Some(SubscriptionType::Trades));
3902
3903 let sub_type = SubscriptionType::from_stream("btcusdt@aggTrade");
3904 assert_eq!(sub_type, Some(SubscriptionType::Trades));
3905
3906 let sub_type = SubscriptionType::from_stream("btcusdt@kline_1m");
3908 assert_eq!(sub_type, Some(SubscriptionType::Kline("1m".to_string())));
3909
3910 let sub_type = SubscriptionType::from_stream("btcusdt@kline_1h");
3911 assert_eq!(sub_type, Some(SubscriptionType::Kline("1h".to_string())));
3912
3913 let sub_type = SubscriptionType::from_stream("btcusdt@markPrice");
3915 assert_eq!(sub_type, Some(SubscriptionType::MarkPrice));
3916
3917 let sub_type = SubscriptionType::from_stream("btcusdt@bookTicker");
3919 assert_eq!(sub_type, Some(SubscriptionType::BookTicker));
3920
3921 let sub_type = SubscriptionType::from_stream("btcusdt@unknown");
3923 assert_eq!(sub_type, None);
3924 }
3925
3926 #[tokio::test]
3927 async fn test_subscription_send_message() {
3928 let manager = SubscriptionManager::new();
3929 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
3930
3931 manager
3933 .add_subscription(
3934 "btcusdt@ticker".to_string(),
3935 "BTCUSDT".to_string(),
3936 SubscriptionType::Ticker,
3937 tx,
3938 )
3939 .await
3940 .unwrap();
3941
3942 let test_msg = serde_json::json!({
3944 "e": "24hrTicker",
3945 "s": "BTCUSDT",
3946 "c": "50000"
3947 });
3948
3949 let sent = manager
3950 .send_to_stream("btcusdt@ticker", test_msg.clone())
3951 .await;
3952 assert!(sent);
3953
3954 let received = rx.recv().await;
3956 assert!(received.is_some());
3957 assert_eq!(received.unwrap(), test_msg);
3958 }
3959
3960 #[tokio::test]
3961 async fn test_subscription_send_to_symbol() {
3962 let manager = SubscriptionManager::new();
3963 let (tx1, mut rx1) = tokio::sync::mpsc::unbounded_channel();
3964 let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel();
3965
3966 manager
3968 .add_subscription(
3969 "btcusdt@ticker".to_string(),
3970 "BTCUSDT".to_string(),
3971 SubscriptionType::Ticker,
3972 tx1,
3973 )
3974 .await
3975 .unwrap();
3976
3977 manager
3978 .add_subscription(
3979 "btcusdt@depth".to_string(),
3980 "BTCUSDT".to_string(),
3981 SubscriptionType::OrderBook,
3982 tx2,
3983 )
3984 .await
3985 .unwrap();
3986
3987 let test_msg = serde_json::json!({
3989 "s": "BTCUSDT",
3990 "data": "test"
3991 });
3992
3993 let sent_count = manager.send_to_symbol("BTCUSDT", &test_msg).await;
3994 assert_eq!(sent_count, 2);
3995
3996 let received1 = rx1.recv().await;
3998 assert!(received1.is_some());
3999 assert_eq!(received1.unwrap(), test_msg);
4000
4001 let received2 = rx2.recv().await;
4002 assert!(received2.is_some());
4003 assert_eq!(received2.unwrap(), test_msg);
4004 }
4005
4006 #[test]
4007 fn test_symbol_conversion() {
4008 let symbol = "BTC/USDT";
4009 let binance_symbol = symbol.replace('/', "").to_lowercase();
4010 assert_eq!(binance_symbol, "btcusdt");
4011 }
4012
4013 #[test]
4016 fn test_reconnect_config_default() {
4017 let config = ReconnectConfig::default();
4018
4019 assert!(config.enabled);
4020 assert_eq!(config.initial_delay_ms, 1000);
4021 assert_eq!(config.max_delay_ms, 30000);
4022 assert_eq!(config.backoff_multiplier, 2.0);
4023 assert_eq!(config.max_attempts, 0); }
4025
4026 #[test]
4027 fn test_reconnect_config_calculate_delay() {
4028 let config = ReconnectConfig::default();
4029
4030 assert_eq!(config.calculate_delay(0), 1000); assert_eq!(config.calculate_delay(1), 2000); assert_eq!(config.calculate_delay(2), 4000); assert_eq!(config.calculate_delay(3), 8000); assert_eq!(config.calculate_delay(4), 16000); assert_eq!(config.calculate_delay(5), 30000); assert_eq!(config.calculate_delay(6), 30000); }
4039
4040 #[test]
4041 fn test_reconnect_config_should_retry() {
4042 let mut config = ReconnectConfig::default();
4043
4044 assert!(config.should_retry(0));
4046 assert!(config.should_retry(10));
4047 assert!(config.should_retry(100));
4048
4049 config.max_attempts = 3;
4051 assert!(config.should_retry(0));
4052 assert!(config.should_retry(1));
4053 assert!(config.should_retry(2));
4054 assert!(!config.should_retry(3));
4055 assert!(!config.should_retry(4));
4056
4057 config.enabled = false;
4059 assert!(!config.should_retry(0));
4060 assert!(!config.should_retry(1));
4061 }
4062
4063 #[test]
4064 fn test_message_router_extract_stream_name_combined() {
4065 let message = serde_json::json!({
4067 "stream": "btcusdt@ticker",
4068 "data": {
4069 "e": "24hrTicker",
4070 "s": "BTCUSDT"
4071 }
4072 });
4073
4074 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4075 assert_eq!(stream_name, "btcusdt@ticker");
4076 }
4077
4078 #[test]
4079 fn test_message_router_extract_stream_name_ticker() {
4080 let message = serde_json::json!({
4082 "e": "24hrTicker",
4083 "s": "BTCUSDT",
4084 "E": 1672531200000_u64,
4085 "c": "16950.00",
4086 "h": "17100.00"
4087 });
4088
4089 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4090 assert_eq!(stream_name, "btcusdt@ticker");
4091 }
4092
4093 #[test]
4094 fn test_message_router_extract_stream_name_depth() {
4095 let message = serde_json::json!({
4097 "e": "depthUpdate",
4098 "s": "ETHUSDT",
4099 "E": 1672531200000_u64,
4100 "U": 157,
4101 "u": 160
4102 });
4103
4104 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4105 assert_eq!(stream_name, "ethusdt@depth");
4106 }
4107
4108 #[test]
4109 fn test_message_router_extract_stream_name_trade() {
4110 let message = serde_json::json!({
4112 "e": "trade",
4113 "s": "BNBUSDT",
4114 "E": 1672531200000_u64,
4115 "t": 12345
4116 });
4117
4118 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4119 assert_eq!(stream_name, "bnbusdt@trade");
4120 }
4121
4122 #[test]
4123 fn test_message_router_extract_stream_name_kline() {
4124 let message = serde_json::json!({
4126 "e": "kline",
4127 "s": "BTCUSDT",
4128 "E": 1672531200000_u64,
4129 "k": {
4130 "i": "1m",
4131 "t": 1672531200000_u64,
4132 "o": "16950.00"
4133 }
4134 });
4135
4136 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4137 assert_eq!(stream_name, "btcusdt@kline_1m");
4138 }
4139
4140 #[test]
4141 fn test_message_router_extract_stream_name_mark_price() {
4142 let message = serde_json::json!({
4144 "e": "markPriceUpdate",
4145 "s": "BTCUSDT",
4146 "E": 1672531200000_u64,
4147 "p": "16950.00"
4148 });
4149
4150 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4151 assert_eq!(stream_name, "btcusdt@markPrice");
4152 }
4153
4154 #[test]
4155 fn test_message_router_extract_stream_name_book_ticker() {
4156 let message = serde_json::json!({
4158 "e": "bookTicker",
4159 "s": "ETHUSDT",
4160 "E": 1672531200000_u64,
4161 "b": "1200.00",
4162 "a": "1200.50"
4163 });
4164
4165 let stream_name = MessageRouter::extract_stream_name(&message).unwrap();
4166 assert_eq!(stream_name, "ethusdt@bookTicker");
4167 }
4168
4169 #[test]
4170 fn test_message_router_extract_stream_name_subscription_response() {
4171 let message = serde_json::json!({
4173 "result": null,
4174 "id": 1
4175 });
4176
4177 let result = MessageRouter::extract_stream_name(&message);
4178 assert!(result.is_err());
4179 }
4180
4181 #[test]
4182 fn test_message_router_extract_stream_name_error_response() {
4183 let message = serde_json::json!({
4185 "error": {
4186 "code": -1,
4187 "msg": "Invalid request"
4188 },
4189 "id": 1
4190 });
4191
4192 let result = MessageRouter::extract_stream_name(&message);
4193 assert!(result.is_err());
4194 }
4195
4196 #[test]
4197 fn test_message_router_extract_stream_name_invalid() {
4198 let message = serde_json::json!({
4200 "unknown": "data"
4201 });
4202
4203 let result = MessageRouter::extract_stream_name(&message);
4204 assert!(result.is_err());
4205 }
4206
4207 #[tokio::test]
4208 async fn test_message_router_creation() {
4209 let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4210 let subscription_manager = Arc::new(SubscriptionManager::new());
4211
4212 let router = MessageRouter::new(ws_url.clone(), subscription_manager);
4213
4214 assert!(!router.is_connected());
4216 assert_eq!(router.ws_url, ws_url);
4217 }
4218
4219 #[tokio::test]
4220 async fn test_message_router_reconnect_config() {
4221 let ws_url = "wss://stream.binance.com:9443/ws".to_string();
4222 let subscription_manager = Arc::new(SubscriptionManager::new());
4223
4224 let router = MessageRouter::new(ws_url, subscription_manager);
4225
4226 let config = router.get_reconnect_config().await;
4228 assert!(config.enabled);
4229 assert_eq!(config.initial_delay_ms, 1000);
4230
4231 let new_config = ReconnectConfig {
4233 enabled: false,
4234 initial_delay_ms: 2000,
4235 max_delay_ms: 60000,
4236 backoff_multiplier: 1.5,
4237 max_attempts: 5,
4238 };
4239
4240 router.set_reconnect_config(new_config.clone()).await;
4241
4242 let updated_config = router.get_reconnect_config().await;
4243 assert!(!updated_config.enabled);
4244 assert_eq!(updated_config.initial_delay_ms, 2000);
4245 assert_eq!(updated_config.max_delay_ms, 60000);
4246 assert_eq!(updated_config.backoff_multiplier, 1.5);
4247 assert_eq!(updated_config.max_attempts, 5);
4248 }
4249}