1use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
/// Interval between keep-alive pings, in milliseconds.
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Delay between reconnection attempts, in milliseconds.
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Upper bound on reconnection attempts handed to the WebSocket client.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
27
28pub struct BitgetWs {
32 client: Arc<WsClient>,
34 subscriptions: Arc<RwLock<Vec<String>>>,
36}
37
38impl BitgetWs {
39 pub fn new(url: String) -> Self {
45 let config = WsConfig {
46 url: url.clone(),
47 connect_timeout: 10000,
48 ping_interval: DEFAULT_PING_INTERVAL_MS,
49 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
50 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
51 auto_reconnect: true,
52 enable_compression: false,
53 pong_timeout: 90000,
54 };
55
56 Self {
57 client: Arc::new(WsClient::new(config)),
58 subscriptions: Arc::new(RwLock::new(Vec::new())),
59 }
60 }
61
62 pub async fn connect(&self) -> Result<()> {
64 self.client.connect().await
65 }
66
67 pub async fn disconnect(&self) -> Result<()> {
69 self.client.disconnect().await
70 }
71
72 pub async fn state(&self) -> WsConnectionState {
74 self.client.state().await
75 }
76
77 pub async fn is_connected(&self) -> bool {
79 self.client.is_connected().await
80 }
81
82 pub async fn receive(&self) -> Option<Value> {
84 self.client.receive().await
85 }
86
87 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
93 let mut arg_map = serde_json::Map::new();
94 arg_map.insert(
95 "instType".to_string(),
96 serde_json::Value::String("SPOT".to_string()),
97 );
98 arg_map.insert(
99 "channel".to_string(),
100 serde_json::Value::String("ticker".to_string()),
101 );
102 arg_map.insert(
103 "instId".to_string(),
104 serde_json::Value::String(symbol.to_string()),
105 );
106 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
107
108 let mut msg_map = serde_json::Map::new();
109 msg_map.insert(
110 "op".to_string(),
111 serde_json::Value::String("subscribe".to_string()),
112 );
113 msg_map.insert("args".to_string(), args);
114 let msg = serde_json::Value::Object(msg_map);
115
116 self.client.send_json(&msg).await?;
117
118 let sub_key = format!("ticker:{}", symbol);
119 self.subscriptions.write().await.push(sub_key);
120
121 Ok(())
122 }
123
124 pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
131 let channel = match depth {
132 5 => "books5",
133 15 => "books15",
134 _ => "books",
135 };
136
137 let mut arg_map = serde_json::Map::new();
138 arg_map.insert(
139 "instType".to_string(),
140 serde_json::Value::String("SPOT".to_string()),
141 );
142 arg_map.insert(
143 "channel".to_string(),
144 serde_json::Value::String(channel.to_string()),
145 );
146 arg_map.insert(
147 "instId".to_string(),
148 serde_json::Value::String(symbol.to_string()),
149 );
150 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
151
152 let mut msg_map = serde_json::Map::new();
153 msg_map.insert(
154 "op".to_string(),
155 serde_json::Value::String("subscribe".to_string()),
156 );
157 msg_map.insert("args".to_string(), args);
158 let msg = serde_json::Value::Object(msg_map);
159
160 self.client.send_json(&msg).await?;
161
162 let sub_key = format!("orderbook:{}", symbol);
163 self.subscriptions.write().await.push(sub_key);
164
165 Ok(())
166 }
167
168 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
174 let mut arg_map = serde_json::Map::new();
175 arg_map.insert(
176 "instType".to_string(),
177 serde_json::Value::String("SPOT".to_string()),
178 );
179 arg_map.insert(
180 "channel".to_string(),
181 serde_json::Value::String("trade".to_string()),
182 );
183 arg_map.insert(
184 "instId".to_string(),
185 serde_json::Value::String(symbol.to_string()),
186 );
187 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
188
189 let mut msg_map = serde_json::Map::new();
190 msg_map.insert(
191 "op".to_string(),
192 serde_json::Value::String("subscribe".to_string()),
193 );
194 msg_map.insert("args".to_string(), args);
195 let msg = serde_json::Value::Object(msg_map);
196
197 self.client.send_json(&msg).await?;
198
199 let sub_key = format!("trades:{}", symbol);
200 self.subscriptions.write().await.push(sub_key);
201
202 Ok(())
203 }
204
205 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
212 let channel = format!("candle{}", interval);
213
214 let mut arg_map = serde_json::Map::new();
215 arg_map.insert(
216 "instType".to_string(),
217 serde_json::Value::String("SPOT".to_string()),
218 );
219 arg_map.insert(
220 "channel".to_string(),
221 serde_json::Value::String(channel.clone()),
222 );
223 arg_map.insert(
224 "instId".to_string(),
225 serde_json::Value::String(symbol.to_string()),
226 );
227 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
228
229 let mut msg_map = serde_json::Map::new();
230 msg_map.insert(
231 "op".to_string(),
232 serde_json::Value::String("subscribe".to_string()),
233 );
234 msg_map.insert("args".to_string(), args);
235 let msg = serde_json::Value::Object(msg_map);
236
237 self.client.send_json(&msg).await?;
238
239 let sub_key = format!("kline:{}:{}", symbol, interval);
240 self.subscriptions.write().await.push(sub_key);
241
242 Ok(())
243 }
244
245 pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
251 let parts: Vec<&str> = stream_name.split(':').collect();
253 if parts.len() < 2 {
254 return Err(Error::invalid_request(format!(
255 "Invalid stream name: {}",
256 stream_name
257 )));
258 }
259
260 let channel = parts[0];
261 let symbol = parts[1];
262
263 let bitget_channel = match channel {
264 "ticker" => "ticker",
265 "orderbook" => "books",
266 "trades" => "trade",
267 "kline" => {
268 if parts.len() >= 3 {
269 return self.unsubscribe_kline(symbol, parts[2]).await;
271 }
272 return Err(Error::invalid_request(
273 "Kline unsubscribe requires interval",
274 ));
275 }
276 _ => channel,
277 };
278
279 let mut arg_map = serde_json::Map::new();
280 arg_map.insert(
281 "instType".to_string(),
282 serde_json::Value::String("SPOT".to_string()),
283 );
284 arg_map.insert(
285 "channel".to_string(),
286 serde_json::Value::String(bitget_channel.to_string()),
287 );
288 arg_map.insert(
289 "instId".to_string(),
290 serde_json::Value::String(symbol.to_string()),
291 );
292 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
293
294 let mut msg_map = serde_json::Map::new();
295 msg_map.insert(
296 "op".to_string(),
297 serde_json::Value::String("unsubscribe".to_string()),
298 );
299 msg_map.insert("args".to_string(), args);
300 let msg = serde_json::Value::Object(msg_map);
301
302 self.client.send_json(&msg).await?;
303
304 let mut subs = self.subscriptions.write().await;
306 subs.retain(|s| s != &stream_name);
307
308 Ok(())
309 }
310
311 async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
313 let channel = format!("candle{}", interval);
314
315 let mut arg_map = serde_json::Map::new();
316 arg_map.insert(
317 "instType".to_string(),
318 serde_json::Value::String("SPOT".to_string()),
319 );
320 arg_map.insert(
321 "channel".to_string(),
322 serde_json::Value::String(channel.clone()),
323 );
324 arg_map.insert(
325 "instId".to_string(),
326 serde_json::Value::String(symbol.to_string()),
327 );
328 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
329
330 let mut msg_map = serde_json::Map::new();
331 msg_map.insert(
332 "op".to_string(),
333 serde_json::Value::String("unsubscribe".to_string()),
334 );
335 msg_map.insert("args".to_string(), args);
336 let msg = serde_json::Value::Object(msg_map);
337
338 self.client.send_json(&msg).await?;
339
340 let sub_key = format!("kline:{}:{}", symbol, interval);
341 let mut subs = self.subscriptions.write().await;
342 subs.retain(|s| s != &sub_key);
343
344 Ok(())
345 }
346
347 pub async fn subscriptions(&self) -> Vec<String> {
349 self.subscriptions.read().await.clone()
350 }
351
352 pub async fn watch_ticker(
385 &self,
386 symbol: &str,
387 market: Option<Market>,
388 ) -> Result<MessageStream<Ticker>> {
389 if !self.is_connected().await {
391 self.connect().await?;
392 }
393
394 self.subscribe_ticker(symbol).await?;
396
397 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
399 let symbol_owned = symbol.to_string();
400 let client = Arc::clone(&self.client);
401
402 tokio::spawn(async move {
404 while let Some(msg) = client.receive().await {
405 if is_ticker_message(&msg, &symbol_owned) {
407 match parse_ws_ticker(&msg, market.as_ref()) {
408 Ok(ticker) => {
409 if tx.send(Ok(ticker)).is_err() {
410 break; }
412 }
413 Err(e) => {
414 if tx.send(Err(e)).is_err() {
415 break;
416 }
417 }
418 }
419 }
420 }
421 });
422
423 Ok(Box::pin(ReceiverStream::new(rx)))
424 }
425
426 pub async fn watch_order_book(
459 &self,
460 symbol: &str,
461 limit: Option<u32>,
462 ) -> Result<MessageStream<OrderBook>> {
463 if !self.is_connected().await {
465 self.connect().await?;
466 }
467
468 let depth = limit.unwrap_or(15);
470 self.subscribe_orderbook(symbol, depth).await?;
471
472 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
474 let symbol_owned = symbol.to_string();
475 let unified_symbol = format_unified_symbol(&symbol_owned);
476 let client = Arc::clone(&self.client);
477
478 tokio::spawn(async move {
480 while let Some(msg) = client.receive().await {
481 if is_orderbook_message(&msg, &symbol_owned) {
483 match parse_ws_orderbook(&msg, unified_symbol.clone()) {
484 Ok(orderbook) => {
485 if tx.send(Ok(orderbook)).is_err() {
486 break; }
488 }
489 Err(e) => {
490 if tx.send(Err(e)).is_err() {
491 break;
492 }
493 }
494 }
495 }
496 }
497 });
498
499 Ok(Box::pin(ReceiverStream::new(rx)))
500 }
501
502 pub async fn watch_trades(
539 &self,
540 symbol: &str,
541 market: Option<Market>,
542 ) -> Result<MessageStream<Vec<Trade>>> {
543 if !self.is_connected().await {
545 self.connect().await?;
546 }
547
548 self.subscribe_trades(symbol).await?;
550
551 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
553 let symbol_owned = symbol.to_string();
554 let client = Arc::clone(&self.client);
555
556 tokio::spawn(async move {
558 while let Some(msg) = client.receive().await {
559 if is_trade_message(&msg, &symbol_owned) {
561 match parse_ws_trades(&msg, market.as_ref()) {
562 Ok(trades) => {
563 if tx.send(Ok(trades)).is_err() {
564 break; }
566 }
567 Err(e) => {
568 if tx.send(Err(e)).is_err() {
569 break;
570 }
571 }
572 }
573 }
574 }
575 });
576
577 Ok(Box::pin(ReceiverStream::new(rx)))
578 }
579}
580
581struct ReceiverStream<T> {
587 receiver: mpsc::UnboundedReceiver<T>,
588}
589
590impl<T> ReceiverStream<T> {
591 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
592 Self { receiver }
593 }
594}
595
596impl<T> Stream for ReceiverStream<T> {
597 type Item = T;
598
599 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
600 self.receiver.poll_recv(cx)
601 }
602}
603
604fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
610 if let Some(arg) = msg.get("arg") {
611 let channel = arg.get("channel").and_then(|c| c.as_str());
612 let inst_id = arg.get("instId").and_then(|i| i.as_str());
613
614 channel == Some("ticker") && inst_id == Some(symbol)
615 } else {
616 false
617 }
618}
619
620fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
622 if let Some(arg) = msg.get("arg") {
623 let channel = arg.get("channel").and_then(|c| c.as_str());
624 let inst_id = arg.get("instId").and_then(|i| i.as_str());
625
626 let is_orderbook_channel = channel.map(|c| c.starts_with("books")).unwrap_or(false);
628 is_orderbook_channel && inst_id == Some(symbol)
629 } else {
630 false
631 }
632}
633
634fn is_trade_message(msg: &Value, symbol: &str) -> bool {
636 if let Some(arg) = msg.get("arg") {
637 let channel = arg.get("channel").and_then(|c| c.as_str());
638 let inst_id = arg.get("instId").and_then(|i| i.as_str());
639
640 channel == Some("trade") && inst_id == Some(symbol)
641 } else {
642 false
643 }
644}
645
/// Converts an exchange symbol such as `BTCUSDT` into `BASE/QUOTE` form.
///
/// The quote currency is detected by suffix, trying the entries of
/// `QUOTE_CURRENCIES` in order (so `USDT`/`USDC` win over `USD`). A suffix only
/// counts when a non-empty base remains. The input is returned unchanged when
/// no quote matches, or when it already contains a `/` separator.
fn format_unified_symbol(symbol: &str) -> String {
    const QUOTE_CURRENCIES: [&str; 6] = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];

    // Already in unified form: splitting again would yield e.g. `BTC//USDT`.
    if symbol.contains('/') {
        return symbol.to_string();
    }

    QUOTE_CURRENCIES
        .iter()
        .find_map(|quote| {
            symbol
                .strip_suffix(*quote)
                .filter(|base| !base.is_empty())
                .map(|base| format!("{base}/{quote}"))
        })
        .unwrap_or_else(|| symbol.to_string())
}
663
664pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
670 let data = msg
673 .get("data")
674 .and_then(|d| d.as_array())
675 .and_then(|arr| arr.first())
676 .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
677
678 parse_ticker(data, market)
679}
680
681pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
683 let data = msg
686 .get("data")
687 .and_then(|d| d.as_array())
688 .and_then(|arr| arr.first())
689 .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
690
691 parse_orderbook(data, symbol)
692}
693
694pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
696 let data = msg
699 .get("data")
700 .and_then(|d| d.as_array())
701 .and_then(|arr| arr.first())
702 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
703
704 parse_trade(data, market)
705}
706
707pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
709 let data_array = msg
712 .get("data")
713 .and_then(|d| d.as_array())
714 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
715
716 let mut trades = Vec::with_capacity(data_array.len());
717 for data in data_array {
718 trades.push(parse_trade(data, market)?);
719 }
720
721 Ok(trades)
722}
723
#[cfg(test)]
mod tests {
    use super::*;
    use ccxt_core::types::OrderSide;
    use ccxt_core::types::financial::{Amount, Price};
    use rust_decimal_macros::dec;

    /// Public endpoint used when constructing clients; no test connects to it.
    const WS_URL: &str = "wss://ws.bitget.com/v2/ws/public";

    /// Builds a SPOT message with the given `arg` fields, plus optional
    /// `action` and `data` members (`data` is spliced in as raw JSON).
    fn message(action: Option<&str>, channel: &str, inst_id: &str, data: Option<&str>) -> Value {
        let mut fields = Vec::new();
        if let Some(action) = action {
            fields.push(format!(r#""action":"{action}""#));
        }
        fields.push(format!(
            r#""arg":{{"instType":"SPOT","channel":"{channel}","instId":"{inst_id}"}}"#
        ));
        if let Some(data) = data {
            fields.push(format!(r#""data":{data}"#));
        }

        serde_json::from_str(&format!("{{{}}}", fields.join(","))).unwrap()
    }

    /// Snapshot message for `channel`/`inst_id` with an optional `data` member.
    fn snapshot(channel: &str, inst_id: &str, data: Option<&str>) -> Value {
        message(Some("snapshot"), channel, inst_id, data)
    }

    /// Message consisting solely of the `arg` object.
    fn arg_only(channel: &str, inst_id: &str) -> Value {
        message(None, channel, inst_id, None)
    }

    // ---- client construction ----

    #[test]
    fn test_bitget_ws_creation() {
        let ws = BitgetWs::new(WS_URL.to_string());
        assert!(ws.subscriptions.try_read().is_ok());
    }

    #[tokio::test]
    async fn test_subscriptions_empty_by_default() {
        let ws = BitgetWs::new(WS_URL.to_string());
        assert!(ws.subscriptions().await.is_empty());
    }

    // ---- ticker parsing ----

    #[test]
    fn test_parse_ws_ticker_snapshot() {
        let msg = snapshot(
            "ticker",
            "BTCUSDT",
            Some(
                r#"[{
                    "instId": "BTCUSDT",
                    "lastPr": "50000.00",
                    "high24h": "51000.00",
                    "low24h": "49000.00",
                    "bidPr": "49999.00",
                    "askPr": "50001.00",
                    "baseVolume": "1000.5",
                    "ts": "1700000000000"
                }]"#,
            ),
        );

        let ticker = parse_ws_ticker(&msg, None).unwrap();
        assert_eq!(ticker.symbol, "BTCUSDT");
        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
        assert_eq!(ticker.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_ws_ticker_with_market() {
        let msg = snapshot(
            "ticker",
            "BTCUSDT",
            Some(r#"[{"instId": "BTCUSDT", "lastPr": "50000.00", "ts": "1700000000000"}]"#),
        );
        let market = Market {
            id: "BTCUSDT".to_string(),
            symbol: "BTC/USDT".to_string(),
            base: "BTC".to_string(),
            quote: "USDT".to_string(),
            ..Default::default()
        };

        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
        assert_eq!(ticker.symbol, "BTC/USDT");
        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
    }

    #[test]
    fn test_parse_ws_ticker_missing_data() {
        let msg = snapshot("ticker", "BTCUSDT", None);
        assert!(parse_ws_ticker(&msg, None).is_err());
    }

    #[test]
    fn test_parse_ws_ticker_empty_data_array() {
        let msg = snapshot("ticker", "BTCUSDT", Some("[]"));
        assert!(parse_ws_ticker(&msg, None).is_err());
    }

    // ---- order book parsing ----

    #[test]
    fn test_parse_ws_orderbook_snapshot() {
        let msg = snapshot(
            "books5",
            "BTCUSDT",
            Some(
                r#"[{
                    "bids": [["50000.00", "1.5"], ["49999.00", "2.0"], ["49998.00", "0.5"]],
                    "asks": [["50001.00", "1.0"], ["50002.00", "3.0"], ["50003.00", "2.5"]],
                    "ts": "1700000000000"
                }]"#,
            ),
        );

        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
        assert_eq!(orderbook.symbol, "BTC/USDT");
        assert_eq!(orderbook.bids.len(), 3);
        assert_eq!(orderbook.asks.len(), 3);

        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));

        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
    }

    #[test]
    fn test_parse_ws_orderbook_update() {
        let msg = message(
            Some("update"),
            "books",
            "ETHUSDT",
            Some(
                r#"[{
                    "bids": [["2000.00", "10.0"]],
                    "asks": [["2001.00", "5.0"]],
                    "ts": "1700000000001"
                }]"#,
            ),
        );

        let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
        assert_eq!(orderbook.symbol, "ETH/USDT");
        assert_eq!(orderbook.bids.len(), 1);
        assert_eq!(orderbook.asks.len(), 1);
        assert_eq!(orderbook.timestamp, 1700000000001);
    }

    #[test]
    fn test_parse_ws_orderbook_missing_data() {
        let msg = snapshot("books5", "BTCUSDT", None);
        assert!(parse_ws_orderbook(&msg, "BTC/USDT".to_string()).is_err());
    }

    #[test]
    fn test_parse_ws_orderbook_empty_sides() {
        let msg = snapshot(
            "books5",
            "BTCUSDT",
            Some(r#"[{"bids": [], "asks": [], "ts": "1700000000000"}]"#),
        );

        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
        assert!(orderbook.bids.is_empty());
        assert!(orderbook.asks.is_empty());
    }

    // ---- trade parsing ----

    #[test]
    fn test_parse_ws_trade_single() {
        let msg = snapshot(
            "trade",
            "BTCUSDT",
            Some(
                r#"[{
                    "tradeId": "123456789",
                    "symbol": "BTCUSDT",
                    "side": "buy",
                    "price": "50000.00",
                    "size": "0.5",
                    "ts": "1700000000000"
                }]"#,
            ),
        );

        let trade = parse_ws_trade(&msg, None).unwrap();
        assert_eq!(trade.id, Some("123456789".to_string()));
        assert_eq!(trade.side, OrderSide::Buy);
        assert_eq!(trade.price, Price::new(dec!(50000.00)));
        assert_eq!(trade.amount, Amount::new(dec!(0.5)));
        assert_eq!(trade.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_ws_trades_multiple() {
        let msg = snapshot(
            "trade",
            "BTCUSDT",
            Some(
                r#"[
                    {
                        "tradeId": "123456789",
                        "symbol": "BTCUSDT",
                        "side": "buy",
                        "price": "50000.00",
                        "size": "0.5",
                        "ts": "1700000000000"
                    },
                    {
                        "tradeId": "123456790",
                        "symbol": "BTCUSDT",
                        "side": "sell",
                        "price": "50001.00",
                        "size": "1.0",
                        "ts": "1700000000001"
                    }
                ]"#,
            ),
        );

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert_eq!(trades.len(), 2);

        assert_eq!(trades[0].id, Some("123456789".to_string()));
        assert_eq!(trades[0].side, OrderSide::Buy);

        assert_eq!(trades[1].id, Some("123456790".to_string()));
        assert_eq!(trades[1].side, OrderSide::Sell);
    }

    #[test]
    fn test_parse_ws_trade_sell_side() {
        let msg = snapshot(
            "trade",
            "BTCUSDT",
            Some(
                r#"[{
                    "tradeId": "123456789",
                    "symbol": "BTCUSDT",
                    "side": "sell",
                    "price": "50000.00",
                    "size": "0.5",
                    "ts": "1700000000000"
                }]"#,
            ),
        );

        let trade = parse_ws_trade(&msg, None).unwrap();
        assert_eq!(trade.side, OrderSide::Sell);
    }

    #[test]
    fn test_parse_ws_trade_missing_data() {
        let msg = snapshot("trade", "BTCUSDT", None);
        assert!(parse_ws_trade(&msg, None).is_err());
    }

    #[test]
    fn test_parse_ws_trades_empty_array() {
        let msg = snapshot("trade", "BTCUSDT", Some("[]"));

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert!(trades.is_empty());
    }

    // ---- message classification ----

    #[test]
    fn test_is_ticker_message_true() {
        let msg = snapshot("ticker", "BTCUSDT", Some("[{}]"));
        assert!(is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_symbol() {
        let msg = snapshot("ticker", "ETHUSDT", Some("[{}]"));
        assert!(!is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_channel() {
        let msg = snapshot("trade", "BTCUSDT", Some("[{}]"));
        assert!(!is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_books5() {
        assert!(is_orderbook_message(&arg_only("books5", "BTCUSDT"), "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_books15() {
        assert!(is_orderbook_message(&arg_only("books15", "BTCUSDT"), "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_books() {
        assert!(is_orderbook_message(&arg_only("books", "BTCUSDT"), "BTCUSDT"));
    }

    #[test]
    fn test_is_trade_message_true() {
        assert!(is_trade_message(&arg_only("trade", "BTCUSDT"), "BTCUSDT"));
    }

    #[test]
    fn test_is_trade_message_wrong_channel() {
        assert!(!is_trade_message(&arg_only("ticker", "BTCUSDT"), "BTCUSDT"));
    }

    // ---- symbol formatting ----

    #[test]
    fn test_format_unified_symbol_usdt() {
        assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
        assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
    }

    #[test]
    fn test_format_unified_symbol_usdc() {
        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
    }

    #[test]
    fn test_format_unified_symbol_btc() {
        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
    }

    #[test]
    fn test_format_unified_symbol_unknown() {
        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
    }
}