1use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
19const DEFAULT_PING_INTERVAL_MS: u64 = 30000;
21
22const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
24
25const MAX_RECONNECT_ATTEMPTS: u32 = 10;
27
28pub struct BitgetWs {
32 client: Arc<WsClient>,
34 subscriptions: Arc<RwLock<Vec<String>>>,
36}
37
38impl BitgetWs {
39 pub fn new(url: String) -> Self {
45 let config = WsConfig {
46 url: url.clone(),
47 connect_timeout: 10000,
48 ping_interval: DEFAULT_PING_INTERVAL_MS,
49 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
50 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
51 auto_reconnect: true,
52 enable_compression: false,
53 pong_timeout: 90000,
54 ..Default::default()
55 };
56
57 Self {
58 client: Arc::new(WsClient::new(config)),
59 subscriptions: Arc::new(RwLock::new(Vec::new())),
60 }
61 }
62
63 pub async fn connect(&self) -> Result<()> {
65 self.client.connect().await
66 }
67
68 pub async fn disconnect(&self) -> Result<()> {
70 self.client.disconnect().await
71 }
72
73 pub fn state(&self) -> WsConnectionState {
75 self.client.state()
76 }
77
78 pub fn is_connected(&self) -> bool {
80 self.client.is_connected()
81 }
82
83 pub async fn receive(&self) -> Option<Value> {
85 self.client.receive().await
86 }
87
88 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
94 let mut arg_map = serde_json::Map::new();
95 arg_map.insert(
96 "instType".to_string(),
97 serde_json::Value::String("SPOT".to_string()),
98 );
99 arg_map.insert(
100 "channel".to_string(),
101 serde_json::Value::String("ticker".to_string()),
102 );
103 arg_map.insert(
104 "instId".to_string(),
105 serde_json::Value::String(symbol.to_string()),
106 );
107 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
108
109 let mut msg_map = serde_json::Map::new();
110 msg_map.insert(
111 "op".to_string(),
112 serde_json::Value::String("subscribe".to_string()),
113 );
114 msg_map.insert("args".to_string(), args);
115 let msg = serde_json::Value::Object(msg_map);
116
117 self.client.send_json(&msg).await?;
118
119 let sub_key = format!("ticker:{}", symbol);
120 self.subscriptions.write().await.push(sub_key);
121
122 Ok(())
123 }
124
125 pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
131 let mut args = Vec::new();
132 for symbol in symbols {
133 let mut arg_map = serde_json::Map::new();
134 arg_map.insert(
135 "instType".to_string(),
136 serde_json::Value::String("SPOT".to_string()),
137 );
138 arg_map.insert(
139 "channel".to_string(),
140 serde_json::Value::String("ticker".to_string()),
141 );
142 arg_map.insert(
143 "instId".to_string(),
144 serde_json::Value::String(symbol.clone()),
145 );
146 args.push(serde_json::Value::Object(arg_map));
147 }
148
149 let mut msg_map = serde_json::Map::new();
150 msg_map.insert(
151 "op".to_string(),
152 serde_json::Value::String("subscribe".to_string()),
153 );
154 msg_map.insert("args".to_string(), serde_json::Value::Array(args));
155 let msg = serde_json::Value::Object(msg_map);
156
157 self.client.send_json(&msg).await?;
158
159 let mut subs = self.subscriptions.write().await;
160 for symbol in symbols {
161 subs.push(format!("ticker:{}", symbol));
162 }
163
164 Ok(())
165 }
166
167 pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
179 if !self.is_connected() {
181 self.connect().await?;
182 }
183
184 self.subscribe_tickers(symbols).await?;
186
187 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
189 let symbols_owned: Vec<String> = symbols.to_vec();
190 let client = Arc::clone(&self.client);
191
192 tokio::spawn(async move {
194 while let Some(msg) = client.receive().await {
195 if let Some(arg) = msg.get("arg") {
197 let channel = arg.get("channel").and_then(|c| c.as_str());
198 let inst_id = arg.get("instId").and_then(|i| i.as_str());
199
200 if channel == Some("ticker") {
201 if let Some(id) = inst_id {
202 if symbols_owned.iter().any(|s| s == id) {
203 match parse_ws_ticker(&msg, None) {
204 Ok(ticker) => {
205 if tx.send(Ok(vec![ticker])).is_err() {
206 break; }
208 }
209 Err(e) => {
210 if tx.send(Err(e)).is_err() {
211 break;
212 }
213 }
214 }
215 }
216 }
217 }
218 }
219 }
220 });
221
222 Ok(Box::pin(ReceiverStream::new(rx)))
223 }
224 pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
226 let channel = match depth {
227 5 => "books5",
228 15 => "books15",
229 _ => "books",
230 };
231
232 let mut arg_map = serde_json::Map::new();
233 arg_map.insert(
234 "instType".to_string(),
235 serde_json::Value::String("SPOT".to_string()),
236 );
237 arg_map.insert(
238 "channel".to_string(),
239 serde_json::Value::String(channel.to_string()),
240 );
241 arg_map.insert(
242 "instId".to_string(),
243 serde_json::Value::String(symbol.to_string()),
244 );
245 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
246
247 let mut msg_map = serde_json::Map::new();
248 msg_map.insert(
249 "op".to_string(),
250 serde_json::Value::String("subscribe".to_string()),
251 );
252 msg_map.insert("args".to_string(), args);
253 let msg = serde_json::Value::Object(msg_map);
254
255 self.client.send_json(&msg).await?;
256
257 let sub_key = format!("orderbook:{}", symbol);
258 self.subscriptions.write().await.push(sub_key);
259
260 Ok(())
261 }
262
263 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
269 let mut arg_map = serde_json::Map::new();
270 arg_map.insert(
271 "instType".to_string(),
272 serde_json::Value::String("SPOT".to_string()),
273 );
274 arg_map.insert(
275 "channel".to_string(),
276 serde_json::Value::String("trade".to_string()),
277 );
278 arg_map.insert(
279 "instId".to_string(),
280 serde_json::Value::String(symbol.to_string()),
281 );
282 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
283
284 let mut msg_map = serde_json::Map::new();
285 msg_map.insert(
286 "op".to_string(),
287 serde_json::Value::String("subscribe".to_string()),
288 );
289 msg_map.insert("args".to_string(), args);
290 let msg = serde_json::Value::Object(msg_map);
291
292 self.client.send_json(&msg).await?;
293
294 let sub_key = format!("trades:{}", symbol);
295 self.subscriptions.write().await.push(sub_key);
296
297 Ok(())
298 }
299
300 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
307 let channel = format!("candle{}", interval);
308
309 let mut arg_map = serde_json::Map::new();
310 arg_map.insert(
311 "instType".to_string(),
312 serde_json::Value::String("SPOT".to_string()),
313 );
314 arg_map.insert(
315 "channel".to_string(),
316 serde_json::Value::String(channel.clone()),
317 );
318 arg_map.insert(
319 "instId".to_string(),
320 serde_json::Value::String(symbol.to_string()),
321 );
322 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
323
324 let mut msg_map = serde_json::Map::new();
325 msg_map.insert(
326 "op".to_string(),
327 serde_json::Value::String("subscribe".to_string()),
328 );
329 msg_map.insert("args".to_string(), args);
330 let msg = serde_json::Value::Object(msg_map);
331
332 self.client.send_json(&msg).await?;
333
334 let sub_key = format!("kline:{}:{}", symbol, interval);
335 self.subscriptions.write().await.push(sub_key);
336
337 Ok(())
338 }
339
340 pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
346 let parts: Vec<&str> = stream_name.split(':').collect();
348 if parts.len() < 2 {
349 return Err(Error::invalid_request(format!(
350 "Invalid stream name: {}",
351 stream_name
352 )));
353 }
354
355 let channel = parts[0];
356 let symbol = parts[1];
357
358 let bitget_channel = match channel {
359 "ticker" => "ticker",
360 "orderbook" => "books",
361 "trades" => "trade",
362 "kline" => {
363 if parts.len() >= 3 {
364 return self.unsubscribe_kline(symbol, parts[2]).await;
366 }
367 return Err(Error::invalid_request(
368 "Kline unsubscribe requires interval",
369 ));
370 }
371 _ => channel,
372 };
373
374 let mut arg_map = serde_json::Map::new();
375 arg_map.insert(
376 "instType".to_string(),
377 serde_json::Value::String("SPOT".to_string()),
378 );
379 arg_map.insert(
380 "channel".to_string(),
381 serde_json::Value::String(bitget_channel.to_string()),
382 );
383 arg_map.insert(
384 "instId".to_string(),
385 serde_json::Value::String(symbol.to_string()),
386 );
387 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
388
389 let mut msg_map = serde_json::Map::new();
390 msg_map.insert(
391 "op".to_string(),
392 serde_json::Value::String("unsubscribe".to_string()),
393 );
394 msg_map.insert("args".to_string(), args);
395 let msg = serde_json::Value::Object(msg_map);
396
397 self.client.send_json(&msg).await?;
398
399 let mut subs = self.subscriptions.write().await;
401 subs.retain(|s| s != &stream_name);
402
403 Ok(())
404 }
405
406 async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
408 let channel = format!("candle{}", interval);
409
410 let mut arg_map = serde_json::Map::new();
411 arg_map.insert(
412 "instType".to_string(),
413 serde_json::Value::String("SPOT".to_string()),
414 );
415 arg_map.insert(
416 "channel".to_string(),
417 serde_json::Value::String(channel.clone()),
418 );
419 arg_map.insert(
420 "instId".to_string(),
421 serde_json::Value::String(symbol.to_string()),
422 );
423 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
424
425 let mut msg_map = serde_json::Map::new();
426 msg_map.insert(
427 "op".to_string(),
428 serde_json::Value::String("unsubscribe".to_string()),
429 );
430 msg_map.insert("args".to_string(), args);
431 let msg = serde_json::Value::Object(msg_map);
432
433 self.client.send_json(&msg).await?;
434
435 let sub_key = format!("kline:{}:{}", symbol, interval);
436 let mut subs = self.subscriptions.write().await;
437 subs.retain(|s| s != &sub_key);
438
439 Ok(())
440 }
441
442 pub async fn subscriptions(&self) -> Vec<String> {
444 self.subscriptions.read().await.clone()
445 }
446
447 pub async fn watch_ticker(
480 &self,
481 symbol: &str,
482 market: Option<Market>,
483 ) -> Result<MessageStream<Ticker>> {
484 if !self.is_connected() {
486 self.connect().await?;
487 }
488
489 self.subscribe_ticker(symbol).await?;
491
492 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
494 let symbol_owned = symbol.to_string();
495 let client = Arc::clone(&self.client);
496
497 tokio::spawn(async move {
499 while let Some(msg) = client.receive().await {
500 if is_ticker_message(&msg, &symbol_owned) {
502 match parse_ws_ticker(&msg, market.as_ref()) {
503 Ok(ticker) => {
504 if tx.send(Ok(ticker)).is_err() {
505 break; }
507 }
508 Err(e) => {
509 if tx.send(Err(e)).is_err() {
510 break;
511 }
512 }
513 }
514 }
515 }
516 });
517
518 Ok(Box::pin(ReceiverStream::new(rx)))
519 }
520
521 pub async fn watch_order_book(
554 &self,
555 symbol: &str,
556 limit: Option<u32>,
557 ) -> Result<MessageStream<OrderBook>> {
558 if !self.is_connected() {
560 self.connect().await?;
561 }
562
563 let depth = limit.unwrap_or(15);
565 self.subscribe_orderbook(symbol, depth).await?;
566
567 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
569 let symbol_owned = symbol.to_string();
570 let unified_symbol = format_unified_symbol(&symbol_owned);
571 let client = Arc::clone(&self.client);
572
573 tokio::spawn(async move {
575 while let Some(msg) = client.receive().await {
576 if is_orderbook_message(&msg, &symbol_owned) {
578 match parse_ws_orderbook(&msg, unified_symbol.clone()) {
579 Ok(orderbook) => {
580 if tx.send(Ok(orderbook)).is_err() {
581 break; }
583 }
584 Err(e) => {
585 if tx.send(Err(e)).is_err() {
586 break;
587 }
588 }
589 }
590 }
591 }
592 });
593
594 Ok(Box::pin(ReceiverStream::new(rx)))
595 }
596
597 pub async fn watch_trades(
634 &self,
635 symbol: &str,
636 market: Option<Market>,
637 ) -> Result<MessageStream<Vec<Trade>>> {
638 if !self.is_connected() {
640 self.connect().await?;
641 }
642
643 self.subscribe_trades(symbol).await?;
645
646 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
648 let symbol_owned = symbol.to_string();
649 let client = Arc::clone(&self.client);
650
651 tokio::spawn(async move {
653 while let Some(msg) = client.receive().await {
654 if is_trade_message(&msg, &symbol_owned) {
656 match parse_ws_trades(&msg, market.as_ref()) {
657 Ok(trades) => {
658 if tx.send(Ok(trades)).is_err() {
659 break; }
661 }
662 Err(e) => {
663 if tx.send(Err(e)).is_err() {
664 break;
665 }
666 }
667 }
668 }
669 }
670 });
671
672 Ok(Box::pin(ReceiverStream::new(rx)))
673 }
674}
675
676struct ReceiverStream<T> {
682 receiver: mpsc::UnboundedReceiver<T>,
683}
684
685impl<T> ReceiverStream<T> {
686 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
687 Self { receiver }
688 }
689}
690
691impl<T> Stream for ReceiverStream<T> {
692 type Item = T;
693
694 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
695 self.receiver.poll_recv(cx)
696 }
697}
698
699fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
705 if let Some(arg) = msg.get("arg") {
706 let channel = arg.get("channel").and_then(|c| c.as_str());
707 let inst_id = arg.get("instId").and_then(|i| i.as_str());
708
709 channel == Some("ticker") && inst_id == Some(symbol)
710 } else {
711 false
712 }
713}
714
715fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
717 if let Some(arg) = msg.get("arg") {
718 let channel = arg.get("channel").and_then(|c| c.as_str());
719 let inst_id = arg.get("instId").and_then(|i| i.as_str());
720
721 let is_orderbook_channel = channel.is_some_and(|c| c.starts_with("books"));
723 is_orderbook_channel && inst_id == Some(symbol)
724 } else {
725 false
726 }
727}
728
729fn is_trade_message(msg: &Value, symbol: &str) -> bool {
731 if let Some(arg) = msg.get("arg") {
732 let channel = arg.get("channel").and_then(|c| c.as_str());
733 let inst_id = arg.get("instId").and_then(|i| i.as_str());
734
735 channel == Some("trade") && inst_id == Some(symbol)
736 } else {
737 false
738 }
739}
740
741fn format_unified_symbol(symbol: &str) -> String {
743 let quote_currencies = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];
745
746 for quote in "e_currencies {
747 if let Some(base) = symbol.strip_suffix(quote) {
748 if !base.is_empty() {
749 return format!("{}/{}", base, quote);
750 }
751 }
752 }
753
754 symbol.to_string()
756}
757
758pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
764 let data = msg
767 .get("data")
768 .and_then(|d| d.as_array())
769 .and_then(|arr| arr.first())
770 .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
771
772 parse_ticker(data, market)
773}
774
775pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
777 let data = msg
780 .get("data")
781 .and_then(|d| d.as_array())
782 .and_then(|arr| arr.first())
783 .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
784
785 parse_orderbook(data, symbol)
786}
787
788pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
790 let data = msg
793 .get("data")
794 .and_then(|d| d.as_array())
795 .and_then(|arr| arr.first())
796 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
797
798 parse_trade(data, market)
799}
800
801pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
803 let data_array = msg
806 .get("data")
807 .and_then(|d| d.as_array())
808 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
809
810 let mut trades = Vec::with_capacity(data_array.len());
811 for data in data_array {
812 trades.push(parse_trade(data, market)?);
813 }
814
815 Ok(trades)
816}
817
818#[cfg(test)]
819mod tests {
820 use super::*;
821 use ccxt_core::types::financial::Price;
822 use rust_decimal_macros::dec;
823
824 #[test]
825 fn test_bitget_ws_creation() {
826 let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
827 assert!(ws.subscriptions.try_read().is_ok());
829 }
830
831 #[tokio::test]
832 async fn test_subscriptions_empty_by_default() {
833 let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
834 let subs = ws.subscriptions().await;
835 assert!(subs.is_empty());
836 }
837
838 #[test]
841 fn test_parse_ws_ticker_snapshot() {
842 let msg = serde_json::from_str(
843 r#"{
844 "action": "snapshot",
845 "arg": {
846 "instType": "SPOT",
847 "channel": "ticker",
848 "instId": "BTCUSDT"
849 },
850 "data": [{
851 "instId": "BTCUSDT",
852 "lastPr": "50000.00",
853 "high24h": "51000.00",
854 "low24h": "49000.00",
855 "bidPr": "49999.00",
856 "askPr": "50001.00",
857 "baseVolume": "1000.5",
858 "ts": "1700000000000"
859 }]
860 }"#,
861 )
862 .unwrap();
863
864 let ticker = parse_ws_ticker(&msg, None).unwrap();
865 assert_eq!(ticker.symbol, "BTCUSDT");
866 assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
867 assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
868 assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
869 assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
870 assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
871 assert_eq!(ticker.timestamp, 1700000000000);
872 }
873
874 #[test]
875 fn test_parse_ws_ticker_with_market() {
876 let msg = serde_json::from_str(
877 r#"{
878 "action": "snapshot",
879 "arg": {
880 "instType": "SPOT",
881 "channel": "ticker",
882 "instId": "BTCUSDT"
883 },
884 "data": [{
885 "instId": "BTCUSDT",
886 "lastPr": "50000.00",
887 "ts": "1700000000000"
888 }]
889 }"#,
890 )
891 .unwrap();
892
893 let market = Market {
894 id: "BTCUSDT".to_string(),
895 symbol: "BTC/USDT".to_string(),
896 base: "BTC".to_string(),
897 quote: "USDT".to_string(),
898 ..Default::default()
899 };
900
901 let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
902 assert_eq!(ticker.symbol, "BTC/USDT");
903 assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
904 }
905
906 #[test]
907 fn test_parse_ws_ticker_missing_data() {
908 let msg = serde_json::from_str(
909 r#"{
910 "action": "snapshot",
911 "arg": {
912 "instType": "SPOT",
913 "channel": "ticker",
914 "instId": "BTCUSDT"
915 }
916 }"#,
917 )
918 .unwrap();
919
920 let result = parse_ws_ticker(&msg, None);
921 assert!(result.is_err());
922 }
923
924 #[test]
925 fn test_parse_ws_ticker_empty_data_array() {
926 let msg = serde_json::from_str(
927 r#"{
928 "action": "snapshot",
929 "arg": {
930 "instType": "SPOT",
931 "channel": "ticker",
932 "instId": "BTCUSDT"
933 },
934 "data": []
935 }"#,
936 )
937 .unwrap();
938
939 let result = parse_ws_ticker(&msg, None);
940 assert!(result.is_err());
941 }
942
943 #[test]
946 fn test_parse_ws_orderbook_snapshot() {
947 let msg = serde_json::from_str(
948 r#"{
949 "action": "snapshot",
950 "arg": {
951 "instType": "SPOT",
952 "channel": "books5",
953 "instId": "BTCUSDT"
954 },
955 "data": [{
956 "bids": [
957 ["50000.00", "1.5"],
958 ["49999.00", "2.0"],
959 ["49998.00", "0.5"]
960 ],
961 "asks": [
962 ["50001.00", "1.0"],
963 ["50002.00", "3.0"],
964 ["50003.00", "2.5"]
965 ],
966 "ts": "1700000000000"
967 }]
968 }"#,
969 )
970 .unwrap();
971
972 let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
973 assert_eq!(orderbook.symbol, "BTC/USDT");
974 assert_eq!(orderbook.bids.len(), 3);
975 assert_eq!(orderbook.asks.len(), 3);
976
977 assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
979 assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
980 assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
981
982 assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
984 assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
985 assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
986 }
987
988 #[test]
989 fn test_parse_ws_orderbook_update() {
990 let msg = serde_json::from_str(
991 r#"{
992 "action": "update",
993 "arg": {
994 "instType": "SPOT",
995 "channel": "books",
996 "instId": "ETHUSDT"
997 },
998 "data": [{
999 "bids": [
1000 ["2000.00", "10.0"]
1001 ],
1002 "asks": [
1003 ["2001.00", "5.0"]
1004 ],
1005 "ts": "1700000000001"
1006 }]
1007 }"#,
1008 )
1009 .unwrap();
1010
1011 let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
1012 assert_eq!(orderbook.symbol, "ETH/USDT");
1013 assert_eq!(orderbook.bids.len(), 1);
1014 assert_eq!(orderbook.asks.len(), 1);
1015 assert_eq!(orderbook.timestamp, 1700000000001);
1016 }
1017
1018 #[test]
1019 fn test_parse_ws_orderbook_missing_data() {
1020 let msg = serde_json::from_str(
1021 r#"{
1022 "action": "snapshot",
1023 "arg": {
1024 "instType": "SPOT",
1025 "channel": "books5",
1026 "instId": "BTCUSDT"
1027 }
1028 }"#,
1029 )
1030 .unwrap();
1031
1032 let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
1033 assert!(result.is_err());
1034 }
1035
1036 #[test]
1037 fn test_parse_ws_orderbook_empty_sides() {
1038 let msg = serde_json::from_str(
1039 r#"{
1040 "action": "snapshot",
1041 "arg": {
1042 "instType": "SPOT",
1043 "channel": "books5",
1044 "instId": "BTCUSDT"
1045 },
1046 "data": [{
1047 "bids": [],
1048 "asks": [],
1049 "ts": "1700000000000"
1050 }]
1051 }"#,
1052 )
1053 .unwrap();
1054
1055 let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
1056 assert!(orderbook.bids.is_empty());
1057 assert!(orderbook.asks.is_empty());
1058 }
1059
1060 #[test]
1063 fn test_parse_ws_trade_single() {
1064 let msg = serde_json::from_str(
1065 r#"{
1066 "action": "snapshot",
1067 "arg": {
1068 "instType": "SPOT",
1069 "channel": "trade",
1070 "instId": "BTCUSDT"
1071 },
1072 "data": [{
1073 "tradeId": "123456789",
1074 "symbol": "BTCUSDT",
1075 "side": "buy",
1076 "price": "50000.00",
1077 "size": "0.5",
1078 "ts": "1700000000000"
1079 }]
1080 }"#,
1081 )
1082 .unwrap();
1083
1084 let trade = parse_ws_trade(&msg, None).unwrap();
1085 assert_eq!(trade.id, Some("123456789".to_string()));
1086 assert_eq!(trade.side, ccxt_core::types::OrderSide::Buy);
1087 assert_eq!(trade.price, Price::new(dec!(50000.00)));
1088 assert_eq!(
1089 trade.amount,
1090 ccxt_core::types::financial::Amount::new(dec!(0.5))
1091 );
1092 assert_eq!(trade.timestamp, 1700000000000);
1093 }
1094
1095 #[test]
1096 fn test_parse_ws_trades_multiple() {
1097 let msg = serde_json::from_str(
1098 r#"{
1099 "action": "snapshot",
1100 "arg": {
1101 "instType": "SPOT",
1102 "channel": "trade",
1103 "instId": "BTCUSDT"
1104 },
1105 "data": [
1106 {
1107 "tradeId": "123456789",
1108 "symbol": "BTCUSDT",
1109 "side": "buy",
1110 "price": "50000.00",
1111 "size": "0.5",
1112 "ts": "1700000000000"
1113 },
1114 {
1115 "tradeId": "123456790",
1116 "symbol": "BTCUSDT",
1117 "side": "sell",
1118 "price": "50001.00",
1119 "size": "1.0",
1120 "ts": "1700000000001"
1121 }
1122 ]
1123 }"#,
1124 )
1125 .unwrap();
1126
1127 let trades = parse_ws_trades(&msg, None).unwrap();
1128 assert_eq!(trades.len(), 2);
1129
1130 assert_eq!(trades[0].id, Some("123456789".to_string()));
1131 assert_eq!(trades[0].side, ccxt_core::types::OrderSide::Buy);
1132
1133 assert_eq!(trades[1].id, Some("123456790".to_string()));
1134 assert_eq!(trades[1].side, ccxt_core::types::OrderSide::Sell);
1135 }
1136
1137 #[test]
1138 fn test_parse_ws_trade_sell_side() {
1139 let msg = serde_json::from_str(
1140 r#"{
1141 "action": "snapshot",
1142 "arg": {
1143 "instType": "SPOT",
1144 "channel": "trade",
1145 "instId": "BTCUSDT"
1146 },
1147 "data": [{
1148 "tradeId": "123456789",
1149 "symbol": "BTCUSDT",
1150 "side": "sell",
1151 "price": "50000.00",
1152 "size": "0.5",
1153 "ts": "1700000000000"
1154 }]
1155 }"#,
1156 )
1157 .unwrap();
1158
1159 let trade = parse_ws_trade(&msg, None).unwrap();
1160 assert_eq!(trade.side, ccxt_core::types::OrderSide::Sell);
1161 }
1162
1163 #[test]
1164 fn test_parse_ws_trade_missing_data() {
1165 let msg = serde_json::from_str(
1166 r#"{
1167 "action": "snapshot",
1168 "arg": {
1169 "instType": "SPOT",
1170 "channel": "trade",
1171 "instId": "BTCUSDT"
1172 }
1173 }"#,
1174 )
1175 .unwrap();
1176
1177 let result = parse_ws_trade(&msg, None);
1178 assert!(result.is_err());
1179 }
1180
1181 #[test]
1182 fn test_parse_ws_trades_empty_array() {
1183 let msg = serde_json::from_str(
1184 r#"{
1185 "action": "snapshot",
1186 "arg": {
1187 "instType": "SPOT",
1188 "channel": "trade",
1189 "instId": "BTCUSDT"
1190 },
1191 "data": []
1192 }"#,
1193 )
1194 .unwrap();
1195
1196 let trades = parse_ws_trades(&msg, None).unwrap();
1197 assert!(trades.is_empty());
1198 }
1199
1200 #[test]
1203 fn test_is_ticker_message_true() {
1204 let msg = serde_json::from_str(
1205 r#"{
1206 "action": "snapshot",
1207 "arg": {
1208 "instType": "SPOT",
1209 "channel": "ticker",
1210 "instId": "BTCUSDT"
1211 },
1212 "data": [{}]
1213 }"#,
1214 )
1215 .unwrap();
1216
1217 assert!(is_ticker_message(&msg, "BTCUSDT"));
1218 }
1219
1220 #[test]
1221 fn test_is_ticker_message_wrong_symbol() {
1222 let msg = serde_json::from_str(
1223 r#"{
1224 "action": "snapshot",
1225 "arg": {
1226 "instType": "SPOT",
1227 "channel": "ticker",
1228 "instId": "ETHUSDT"
1229 },
1230 "data": [{}]
1231 }"#,
1232 )
1233 .unwrap();
1234
1235 assert!(!is_ticker_message(&msg, "BTCUSDT"));
1236 }
1237
1238 #[test]
1239 fn test_is_ticker_message_wrong_channel() {
1240 let msg = serde_json::from_str(
1241 r#"{
1242 "action": "snapshot",
1243 "arg": {
1244 "instType": "SPOT",
1245 "channel": "trade",
1246 "instId": "BTCUSDT"
1247 },
1248 "data": [{}]
1249 }"#,
1250 )
1251 .unwrap();
1252
1253 assert!(!is_ticker_message(&msg, "BTCUSDT"));
1254 }
1255
1256 #[test]
1257 fn test_is_orderbook_message_books5() {
1258 let msg = serde_json::from_str(
1259 r#"{
1260 "arg": {
1261 "instType": "SPOT",
1262 "channel": "books5",
1263 "instId": "BTCUSDT"
1264 }
1265 }"#,
1266 )
1267 .unwrap();
1268
1269 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1270 }
1271
1272 #[test]
1273 fn test_is_orderbook_message_books15() {
1274 let msg = serde_json::from_str(
1275 r#"{
1276 "arg": {
1277 "instType": "SPOT",
1278 "channel": "books15",
1279 "instId": "BTCUSDT"
1280 }
1281 }"#,
1282 )
1283 .unwrap();
1284
1285 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1286 }
1287
1288 #[test]
1289 fn test_is_orderbook_message_books() {
1290 let msg = serde_json::from_str(
1291 r#"{
1292 "arg": {
1293 "instType": "SPOT",
1294 "channel": "books",
1295 "instId": "BTCUSDT"
1296 }
1297 }"#,
1298 )
1299 .unwrap();
1300
1301 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1302 }
1303
1304 #[test]
1305 fn test_is_trade_message_true() {
1306 let msg = serde_json::from_str(
1307 r#"{
1308 "arg": {
1309 "instType": "SPOT",
1310 "channel": "trade",
1311 "instId": "BTCUSDT"
1312 }
1313 }"#,
1314 )
1315 .unwrap();
1316
1317 assert!(is_trade_message(&msg, "BTCUSDT"));
1318 }
1319
1320 #[test]
1321 fn test_is_trade_message_wrong_channel() {
1322 let msg = serde_json::from_str(
1323 r#"{
1324 "arg": {
1325 "instType": "SPOT",
1326 "channel": "ticker",
1327 "instId": "BTCUSDT"
1328 }
1329 }"#,
1330 )
1331 .unwrap();
1332
1333 assert!(!is_trade_message(&msg, "BTCUSDT"));
1334 }
1335
1336 #[test]
1339 fn test_format_unified_symbol_usdt() {
1340 assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1341 assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1342 }
1343
1344 #[test]
1345 fn test_format_unified_symbol_usdc() {
1346 assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1347 }
1348
1349 #[test]
1350 fn test_format_unified_symbol_btc() {
1351 assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1352 }
1353
1354 #[test]
1355 fn test_format_unified_symbol_unknown() {
1356 assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1358 }
1359}