1use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
/// Value used for `WsConfig::ping_interval`; per the `_MS` suffix this is
/// expressed in milliseconds (30 seconds).
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Value used for `WsConfig::reconnect_interval`; per the `_MS` suffix this is
/// expressed in milliseconds (5 seconds).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Value used for `WsConfig::max_reconnect_attempts`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
27
28pub struct BitgetWs {
32 client: Arc<WsClient>,
34 subscriptions: Arc<RwLock<Vec<String>>>,
36}
37
38impl BitgetWs {
39 pub fn new(url: String) -> Self {
45 let config = WsConfig {
46 url: url.clone(),
47 connect_timeout: 10000,
48 ping_interval: DEFAULT_PING_INTERVAL_MS,
49 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
50 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
51 auto_reconnect: true,
52 enable_compression: false,
53 pong_timeout: 90000,
54 ..Default::default()
55 };
56
57 Self {
58 client: Arc::new(WsClient::new(config)),
59 subscriptions: Arc::new(RwLock::new(Vec::new())),
60 }
61 }
62
63 pub async fn connect(&self) -> Result<()> {
65 self.client.connect().await
66 }
67
68 pub async fn disconnect(&self) -> Result<()> {
70 self.client.disconnect().await
71 }
72
73 pub fn state(&self) -> WsConnectionState {
75 self.client.state()
76 }
77
78 pub fn is_connected(&self) -> bool {
80 self.client.is_connected()
81 }
82
83 pub async fn receive(&self) -> Option<Value> {
85 self.client.receive().await
86 }
87
88 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
94 let mut arg_map = serde_json::Map::new();
95 arg_map.insert(
96 "instType".to_string(),
97 serde_json::Value::String("SPOT".to_string()),
98 );
99 arg_map.insert(
100 "channel".to_string(),
101 serde_json::Value::String("ticker".to_string()),
102 );
103 arg_map.insert(
104 "instId".to_string(),
105 serde_json::Value::String(symbol.to_string()),
106 );
107 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
108
109 let mut msg_map = serde_json::Map::new();
110 msg_map.insert(
111 "op".to_string(),
112 serde_json::Value::String("subscribe".to_string()),
113 );
114 msg_map.insert("args".to_string(), args);
115 let msg = serde_json::Value::Object(msg_map);
116
117 self.client.send_json(&msg).await?;
118
119 let sub_key = format!("ticker:{}", symbol);
120 self.subscriptions.write().await.push(sub_key);
121
122 Ok(())
123 }
124
125 pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
132 let channel = match depth {
133 5 => "books5",
134 15 => "books15",
135 _ => "books",
136 };
137
138 let mut arg_map = serde_json::Map::new();
139 arg_map.insert(
140 "instType".to_string(),
141 serde_json::Value::String("SPOT".to_string()),
142 );
143 arg_map.insert(
144 "channel".to_string(),
145 serde_json::Value::String(channel.to_string()),
146 );
147 arg_map.insert(
148 "instId".to_string(),
149 serde_json::Value::String(symbol.to_string()),
150 );
151 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
152
153 let mut msg_map = serde_json::Map::new();
154 msg_map.insert(
155 "op".to_string(),
156 serde_json::Value::String("subscribe".to_string()),
157 );
158 msg_map.insert("args".to_string(), args);
159 let msg = serde_json::Value::Object(msg_map);
160
161 self.client.send_json(&msg).await?;
162
163 let sub_key = format!("orderbook:{}", symbol);
164 self.subscriptions.write().await.push(sub_key);
165
166 Ok(())
167 }
168
169 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
175 let mut arg_map = serde_json::Map::new();
176 arg_map.insert(
177 "instType".to_string(),
178 serde_json::Value::String("SPOT".to_string()),
179 );
180 arg_map.insert(
181 "channel".to_string(),
182 serde_json::Value::String("trade".to_string()),
183 );
184 arg_map.insert(
185 "instId".to_string(),
186 serde_json::Value::String(symbol.to_string()),
187 );
188 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
189
190 let mut msg_map = serde_json::Map::new();
191 msg_map.insert(
192 "op".to_string(),
193 serde_json::Value::String("subscribe".to_string()),
194 );
195 msg_map.insert("args".to_string(), args);
196 let msg = serde_json::Value::Object(msg_map);
197
198 self.client.send_json(&msg).await?;
199
200 let sub_key = format!("trades:{}", symbol);
201 self.subscriptions.write().await.push(sub_key);
202
203 Ok(())
204 }
205
206 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
213 let channel = format!("candle{}", interval);
214
215 let mut arg_map = serde_json::Map::new();
216 arg_map.insert(
217 "instType".to_string(),
218 serde_json::Value::String("SPOT".to_string()),
219 );
220 arg_map.insert(
221 "channel".to_string(),
222 serde_json::Value::String(channel.clone()),
223 );
224 arg_map.insert(
225 "instId".to_string(),
226 serde_json::Value::String(symbol.to_string()),
227 );
228 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
229
230 let mut msg_map = serde_json::Map::new();
231 msg_map.insert(
232 "op".to_string(),
233 serde_json::Value::String("subscribe".to_string()),
234 );
235 msg_map.insert("args".to_string(), args);
236 let msg = serde_json::Value::Object(msg_map);
237
238 self.client.send_json(&msg).await?;
239
240 let sub_key = format!("kline:{}:{}", symbol, interval);
241 self.subscriptions.write().await.push(sub_key);
242
243 Ok(())
244 }
245
246 pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
252 let parts: Vec<&str> = stream_name.split(':').collect();
254 if parts.len() < 2 {
255 return Err(Error::invalid_request(format!(
256 "Invalid stream name: {}",
257 stream_name
258 )));
259 }
260
261 let channel = parts[0];
262 let symbol = parts[1];
263
264 let bitget_channel = match channel {
265 "ticker" => "ticker",
266 "orderbook" => "books",
267 "trades" => "trade",
268 "kline" => {
269 if parts.len() >= 3 {
270 return self.unsubscribe_kline(symbol, parts[2]).await;
272 }
273 return Err(Error::invalid_request(
274 "Kline unsubscribe requires interval",
275 ));
276 }
277 _ => channel,
278 };
279
280 let mut arg_map = serde_json::Map::new();
281 arg_map.insert(
282 "instType".to_string(),
283 serde_json::Value::String("SPOT".to_string()),
284 );
285 arg_map.insert(
286 "channel".to_string(),
287 serde_json::Value::String(bitget_channel.to_string()),
288 );
289 arg_map.insert(
290 "instId".to_string(),
291 serde_json::Value::String(symbol.to_string()),
292 );
293 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
294
295 let mut msg_map = serde_json::Map::new();
296 msg_map.insert(
297 "op".to_string(),
298 serde_json::Value::String("unsubscribe".to_string()),
299 );
300 msg_map.insert("args".to_string(), args);
301 let msg = serde_json::Value::Object(msg_map);
302
303 self.client.send_json(&msg).await?;
304
305 let mut subs = self.subscriptions.write().await;
307 subs.retain(|s| s != &stream_name);
308
309 Ok(())
310 }
311
312 async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
314 let channel = format!("candle{}", interval);
315
316 let mut arg_map = serde_json::Map::new();
317 arg_map.insert(
318 "instType".to_string(),
319 serde_json::Value::String("SPOT".to_string()),
320 );
321 arg_map.insert(
322 "channel".to_string(),
323 serde_json::Value::String(channel.clone()),
324 );
325 arg_map.insert(
326 "instId".to_string(),
327 serde_json::Value::String(symbol.to_string()),
328 );
329 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
330
331 let mut msg_map = serde_json::Map::new();
332 msg_map.insert(
333 "op".to_string(),
334 serde_json::Value::String("unsubscribe".to_string()),
335 );
336 msg_map.insert("args".to_string(), args);
337 let msg = serde_json::Value::Object(msg_map);
338
339 self.client.send_json(&msg).await?;
340
341 let sub_key = format!("kline:{}:{}", symbol, interval);
342 let mut subs = self.subscriptions.write().await;
343 subs.retain(|s| s != &sub_key);
344
345 Ok(())
346 }
347
348 pub async fn subscriptions(&self) -> Vec<String> {
350 self.subscriptions.read().await.clone()
351 }
352
353 pub async fn watch_ticker(
386 &self,
387 symbol: &str,
388 market: Option<Market>,
389 ) -> Result<MessageStream<Ticker>> {
390 if !self.is_connected() {
392 self.connect().await?;
393 }
394
395 self.subscribe_ticker(symbol).await?;
397
398 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
400 let symbol_owned = symbol.to_string();
401 let client = Arc::clone(&self.client);
402
403 tokio::spawn(async move {
405 while let Some(msg) = client.receive().await {
406 if is_ticker_message(&msg, &symbol_owned) {
408 match parse_ws_ticker(&msg, market.as_ref()) {
409 Ok(ticker) => {
410 if tx.send(Ok(ticker)).is_err() {
411 break; }
413 }
414 Err(e) => {
415 if tx.send(Err(e)).is_err() {
416 break;
417 }
418 }
419 }
420 }
421 }
422 });
423
424 Ok(Box::pin(ReceiverStream::new(rx)))
425 }
426
427 pub async fn watch_order_book(
460 &self,
461 symbol: &str,
462 limit: Option<u32>,
463 ) -> Result<MessageStream<OrderBook>> {
464 if !self.is_connected() {
466 self.connect().await?;
467 }
468
469 let depth = limit.unwrap_or(15);
471 self.subscribe_orderbook(symbol, depth).await?;
472
473 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
475 let symbol_owned = symbol.to_string();
476 let unified_symbol = format_unified_symbol(&symbol_owned);
477 let client = Arc::clone(&self.client);
478
479 tokio::spawn(async move {
481 while let Some(msg) = client.receive().await {
482 if is_orderbook_message(&msg, &symbol_owned) {
484 match parse_ws_orderbook(&msg, unified_symbol.clone()) {
485 Ok(orderbook) => {
486 if tx.send(Ok(orderbook)).is_err() {
487 break; }
489 }
490 Err(e) => {
491 if tx.send(Err(e)).is_err() {
492 break;
493 }
494 }
495 }
496 }
497 }
498 });
499
500 Ok(Box::pin(ReceiverStream::new(rx)))
501 }
502
503 pub async fn watch_trades(
540 &self,
541 symbol: &str,
542 market: Option<Market>,
543 ) -> Result<MessageStream<Vec<Trade>>> {
544 if !self.is_connected() {
546 self.connect().await?;
547 }
548
549 self.subscribe_trades(symbol).await?;
551
552 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
554 let symbol_owned = symbol.to_string();
555 let client = Arc::clone(&self.client);
556
557 tokio::spawn(async move {
559 while let Some(msg) = client.receive().await {
560 if is_trade_message(&msg, &symbol_owned) {
562 match parse_ws_trades(&msg, market.as_ref()) {
563 Ok(trades) => {
564 if tx.send(Ok(trades)).is_err() {
565 break; }
567 }
568 Err(e) => {
569 if tx.send(Err(e)).is_err() {
570 break;
571 }
572 }
573 }
574 }
575 }
576 });
577
578 Ok(Box::pin(ReceiverStream::new(rx)))
579 }
580}
581
582struct ReceiverStream<T> {
588 receiver: mpsc::UnboundedReceiver<T>,
589}
590
591impl<T> ReceiverStream<T> {
592 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
593 Self { receiver }
594 }
595}
596
597impl<T> Stream for ReceiverStream<T> {
598 type Item = T;
599
600 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
601 self.receiver.poll_recv(cx)
602 }
603}
604
605fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
611 if let Some(arg) = msg.get("arg") {
612 let channel = arg.get("channel").and_then(|c| c.as_str());
613 let inst_id = arg.get("instId").and_then(|i| i.as_str());
614
615 channel == Some("ticker") && inst_id == Some(symbol)
616 } else {
617 false
618 }
619}
620
621fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
623 if let Some(arg) = msg.get("arg") {
624 let channel = arg.get("channel").and_then(|c| c.as_str());
625 let inst_id = arg.get("instId").and_then(|i| i.as_str());
626
627 let is_orderbook_channel = channel.is_some_and(|c| c.starts_with("books"));
629 is_orderbook_channel && inst_id == Some(symbol)
630 } else {
631 false
632 }
633}
634
635fn is_trade_message(msg: &Value, symbol: &str) -> bool {
637 if let Some(arg) = msg.get("arg") {
638 let channel = arg.get("channel").and_then(|c| c.as_str());
639 let inst_id = arg.get("instId").and_then(|i| i.as_str());
640
641 channel == Some("trade") && inst_id == Some(symbol)
642 } else {
643 false
644 }
645}
646
/// Converts an exchange symbol such as `BTCUSDT` into `BASE/QUOTE` form.
///
/// The known quote currencies are tried in order (`USDT`, `USDC`, `BTC`,
/// `ETH`, `EUR`, `USD`); the first one that is a suffix of `symbol` with a
/// non-empty base in front of it wins. Longer quotes are listed before `USD`
/// so that `BTCUSDT` is not split as `BTCUSDT` → `.../USD`-style mismatches.
/// When no quote matches, `symbol` is returned unchanged.
fn format_unified_symbol(symbol: &str) -> String {
    const QUOTE_CURRENCIES: [&str; 6] = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];

    QUOTE_CURRENCIES
        .iter()
        .find_map(|quote| {
            symbol
                .strip_suffix(*quote)
                // A bare quote currency (e.g. "USDT") has no base and is not a pair.
                .filter(|base| !base.is_empty())
                .map(|base| format!("{}/{}", base, quote))
        })
        .unwrap_or_else(|| symbol.to_string())
}
663
664pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
670 let data = msg
673 .get("data")
674 .and_then(|d| d.as_array())
675 .and_then(|arr| arr.first())
676 .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
677
678 parse_ticker(data, market)
679}
680
681pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
683 let data = msg
686 .get("data")
687 .and_then(|d| d.as_array())
688 .and_then(|arr| arr.first())
689 .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
690
691 parse_orderbook(data, symbol)
692}
693
694pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
696 let data = msg
699 .get("data")
700 .and_then(|d| d.as_array())
701 .and_then(|arr| arr.first())
702 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
703
704 parse_trade(data, market)
705}
706
707pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
709 let data_array = msg
712 .get("data")
713 .and_then(|d| d.as_array())
714 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
715
716 let mut trades = Vec::with_capacity(data_array.len());
717 for data in data_array {
718 trades.push(parse_trade(data, market)?);
719 }
720
721 Ok(trades)
722}
723
#[cfg(test)]
mod tests {
    use super::*;
    use ccxt_core::types::OrderSide;
    use ccxt_core::types::financial::{Amount, Price};
    use rust_decimal_macros::dec;
    use serde_json::json;

    /// Public endpoint used by the construction tests; no connection is opened.
    const PUBLIC_URL: &str = "wss://ws.bitget.com/v2/ws/public";

    /// Builds a push frame: optional `action`, the SPOT `arg` routing object
    /// for `channel` / `inst_id`, and an optional `data` payload.
    fn frame(
        action: Option<&str>,
        channel: &str,
        inst_id: &str,
        data: Option<serde_json::Value>,
    ) -> serde_json::Value {
        let mut root = serde_json::Map::new();
        if let Some(action) = action {
            root.insert("action".to_string(), json!(action));
        }
        root.insert(
            "arg".to_string(),
            json!({
                "instType": "SPOT",
                "channel": channel,
                "instId": inst_id
            }),
        );
        if let Some(data) = data {
            root.insert("data".to_string(), data);
        }
        serde_json::Value::Object(root)
    }

    /// Builds one BTCUSDT entry of a `trade` channel payload.
    fn trade_entry(trade_id: &str, side: &str, price: &str, size: &str, ts: &str) -> serde_json::Value {
        json!({
            "tradeId": trade_id,
            "symbol": "BTCUSDT",
            "side": side,
            "price": price,
            "size": size,
            "ts": ts
        })
    }

    // ---- construction -----------------------------------------------------

    #[test]
    fn test_bitget_ws_creation() {
        let ws = BitgetWs::new(PUBLIC_URL.to_string());
        // A fresh client must not hold the registry lock.
        assert!(ws.subscriptions.try_read().is_ok());
    }

    #[tokio::test]
    async fn test_subscriptions_empty_by_default() {
        let ws = BitgetWs::new(PUBLIC_URL.to_string());
        assert!(ws.subscriptions().await.is_empty());
    }

    // ---- ticker parsing ---------------------------------------------------

    #[test]
    fn test_parse_ws_ticker_snapshot() {
        let msg = frame(
            Some("snapshot"),
            "ticker",
            "BTCUSDT",
            Some(json!([{
                "instId": "BTCUSDT",
                "lastPr": "50000.00",
                "high24h": "51000.00",
                "low24h": "49000.00",
                "bidPr": "49999.00",
                "askPr": "50001.00",
                "baseVolume": "1000.5",
                "ts": "1700000000000"
            }])),
        );

        let ticker = parse_ws_ticker(&msg, None).unwrap();
        assert_eq!(ticker.symbol, "BTCUSDT");
        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
        assert_eq!(ticker.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_ws_ticker_with_market() {
        let msg = frame(
            Some("snapshot"),
            "ticker",
            "BTCUSDT",
            Some(json!([{
                "instId": "BTCUSDT",
                "lastPr": "50000.00",
                "ts": "1700000000000"
            }])),
        );

        let market = Market {
            id: "BTCUSDT".to_string(),
            symbol: "BTC/USDT".to_string(),
            base: "BTC".to_string(),
            quote: "USDT".to_string(),
            ..Default::default()
        };

        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
        assert_eq!(ticker.symbol, "BTC/USDT");
        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
    }

    #[test]
    fn test_parse_ws_ticker_missing_data() {
        let msg = frame(Some("snapshot"), "ticker", "BTCUSDT", None);

        assert!(parse_ws_ticker(&msg, None).is_err());
    }

    #[test]
    fn test_parse_ws_ticker_empty_data_array() {
        let msg = frame(Some("snapshot"), "ticker", "BTCUSDT", Some(json!([])));

        assert!(parse_ws_ticker(&msg, None).is_err());
    }

    // ---- order-book parsing -----------------------------------------------

    #[test]
    fn test_parse_ws_orderbook_snapshot() {
        let msg = frame(
            Some("snapshot"),
            "books5",
            "BTCUSDT",
            Some(json!([{
                "bids": [
                    ["50000.00", "1.5"],
                    ["49999.00", "2.0"],
                    ["49998.00", "0.5"]
                ],
                "asks": [
                    ["50001.00", "1.0"],
                    ["50002.00", "3.0"],
                    ["50003.00", "2.5"]
                ],
                "ts": "1700000000000"
            }])),
        );

        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
        assert_eq!(orderbook.symbol, "BTC/USDT");
        assert_eq!(orderbook.bids.len(), 3);
        assert_eq!(orderbook.asks.len(), 3);

        // Levels must come back in the order they were sent.
        let expected_bids = [dec!(50000.00), dec!(49999.00), dec!(49998.00)];
        for (level, expected) in orderbook.bids.iter().zip(expected_bids) {
            assert_eq!(level.price, Price::new(expected));
        }

        let expected_asks = [dec!(50001.00), dec!(50002.00), dec!(50003.00)];
        for (level, expected) in orderbook.asks.iter().zip(expected_asks) {
            assert_eq!(level.price, Price::new(expected));
        }
    }

    #[test]
    fn test_parse_ws_orderbook_update() {
        let msg = frame(
            Some("update"),
            "books",
            "ETHUSDT",
            Some(json!([{
                "bids": [["2000.00", "10.0"]],
                "asks": [["2001.00", "5.0"]],
                "ts": "1700000000001"
            }])),
        );

        let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
        assert_eq!(orderbook.symbol, "ETH/USDT");
        assert_eq!(orderbook.bids.len(), 1);
        assert_eq!(orderbook.asks.len(), 1);
        assert_eq!(orderbook.timestamp, 1700000000001);
    }

    #[test]
    fn test_parse_ws_orderbook_missing_data() {
        let msg = frame(Some("snapshot"), "books5", "BTCUSDT", None);

        assert!(parse_ws_orderbook(&msg, "BTC/USDT".to_string()).is_err());
    }

    #[test]
    fn test_parse_ws_orderbook_empty_sides() {
        let msg = frame(
            Some("snapshot"),
            "books5",
            "BTCUSDT",
            Some(json!([{
                "bids": [],
                "asks": [],
                "ts": "1700000000000"
            }])),
        );

        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
        assert!(orderbook.bids.is_empty());
        assert!(orderbook.asks.is_empty());
    }

    // ---- trade parsing ----------------------------------------------------

    #[test]
    fn test_parse_ws_trade_single() {
        let entry = trade_entry("123456789", "buy", "50000.00", "0.5", "1700000000000");
        let msg = frame(Some("snapshot"), "trade", "BTCUSDT", Some(json!([entry])));

        let trade = parse_ws_trade(&msg, None).unwrap();
        assert_eq!(trade.id, Some("123456789".to_string()));
        assert_eq!(trade.side, OrderSide::Buy);
        assert_eq!(trade.price, Price::new(dec!(50000.00)));
        assert_eq!(trade.amount, Amount::new(dec!(0.5)));
        assert_eq!(trade.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_ws_trades_multiple() {
        let first = trade_entry("123456789", "buy", "50000.00", "0.5", "1700000000000");
        let second = trade_entry("123456790", "sell", "50001.00", "1.0", "1700000000001");
        let msg = frame(
            Some("snapshot"),
            "trade",
            "BTCUSDT",
            Some(json!([first, second])),
        );

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert_eq!(trades.len(), 2);

        assert_eq!(trades[0].id, Some("123456789".to_string()));
        assert_eq!(trades[0].side, OrderSide::Buy);

        assert_eq!(trades[1].id, Some("123456790".to_string()));
        assert_eq!(trades[1].side, OrderSide::Sell);
    }

    #[test]
    fn test_parse_ws_trade_sell_side() {
        let entry = trade_entry("123456789", "sell", "50000.00", "0.5", "1700000000000");
        let msg = frame(Some("snapshot"), "trade", "BTCUSDT", Some(json!([entry])));

        let trade = parse_ws_trade(&msg, None).unwrap();
        assert_eq!(trade.side, OrderSide::Sell);
    }

    #[test]
    fn test_parse_ws_trade_missing_data() {
        let msg = frame(Some("snapshot"), "trade", "BTCUSDT", None);

        assert!(parse_ws_trade(&msg, None).is_err());
    }

    #[test]
    fn test_parse_ws_trades_empty_array() {
        let msg = frame(Some("snapshot"), "trade", "BTCUSDT", Some(json!([])));

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert!(trades.is_empty());
    }

    // ---- message routing --------------------------------------------------

    #[test]
    fn test_is_ticker_message_true() {
        let msg = frame(Some("snapshot"), "ticker", "BTCUSDT", Some(json!([{}])));

        assert!(is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_symbol() {
        let msg = frame(Some("snapshot"), "ticker", "ETHUSDT", Some(json!([{}])));

        assert!(!is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_channel() {
        let msg = frame(Some("snapshot"), "trade", "BTCUSDT", Some(json!([{}])));

        assert!(!is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_books5() {
        let msg = frame(None, "books5", "BTCUSDT", None);

        assert!(is_orderbook_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_books15() {
        let msg = frame(None, "books15", "BTCUSDT", None);

        assert!(is_orderbook_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_books() {
        let msg = frame(None, "books", "BTCUSDT", None);

        assert!(is_orderbook_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_trade_message_true() {
        let msg = frame(None, "trade", "BTCUSDT", None);

        assert!(is_trade_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_trade_message_wrong_channel() {
        let msg = frame(None, "ticker", "BTCUSDT", None);

        assert!(!is_trade_message(&msg, "BTCUSDT"));
    }

    // ---- symbol formatting ------------------------------------------------

    #[test]
    fn test_format_unified_symbol_usdt() {
        for (raw, unified) in [("BTCUSDT", "BTC/USDT"), ("ETHUSDT", "ETH/USDT")] {
            assert_eq!(format_unified_symbol(raw), unified);
        }
    }

    #[test]
    fn test_format_unified_symbol_usdc() {
        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
    }

    #[test]
    fn test_format_unified_symbol_btc() {
        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
    }

    #[test]
    fn test_format_unified_symbol_unknown() {
        // No known quote currency suffix: the input is returned untouched.
        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
    }
}