1use crate::bybit::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
19const DEFAULT_PING_INTERVAL_MS: u64 = 20000;
22
23const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
25
26const MAX_RECONNECT_ATTEMPTS: u32 = 10;
28
29pub struct BybitWs {
33 client: Arc<WsClient>,
35 subscriptions: Arc<RwLock<Vec<String>>>,
37}
38
39impl BybitWs {
40 pub fn new(url: String) -> Self {
46 let config = WsConfig {
47 url: url.clone(),
48 connect_timeout: 10000,
49 ping_interval: DEFAULT_PING_INTERVAL_MS,
50 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
51 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
52 auto_reconnect: true,
53 enable_compression: false,
54 pong_timeout: 90000,
55 ..Default::default()
56 };
57
58 Self {
59 client: Arc::new(WsClient::new(config)),
60 subscriptions: Arc::new(RwLock::new(Vec::new())),
61 }
62 }
63
64 pub async fn connect(&self) -> Result<()> {
66 self.client.connect().await
67 }
68
69 pub async fn disconnect(&self) -> Result<()> {
71 self.client.disconnect().await
72 }
73
74 pub fn state(&self) -> WsConnectionState {
76 self.client.state()
77 }
78
79 pub fn is_connected(&self) -> bool {
81 self.client.is_connected()
82 }
83
84 pub async fn receive(&self) -> Option<Value> {
86 self.client.receive().await
87 }
88
89 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
95 let topic = format!("tickers.{}", symbol);
97
98 #[allow(clippy::disallowed_methods)]
100 let msg = serde_json::json!({
101 "op": "subscribe",
102 "args": [topic]
103 });
104
105 self.client.send_json(&msg).await?;
106
107 let sub_key = format!("ticker:{}", symbol);
108 self.subscriptions.write().await.push(sub_key);
109
110 Ok(())
111 }
112
113 pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
119 let topics: Vec<String> = symbols.iter().map(|s| format!("tickers.{}", s)).collect();
120
121 #[allow(clippy::disallowed_methods)]
123 let msg = serde_json::json!({
124 "op": "subscribe",
125 "args": topics
126 });
127
128 self.client.send_json(&msg).await?;
129
130 let mut subs = self.subscriptions.write().await;
131 for symbol in symbols {
132 subs.push(format!("ticker:{}", symbol));
133 }
134
135 Ok(())
136 }
137
138 pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
150 if !self.is_connected() {
152 self.connect().await?;
153 }
154
155 self.subscribe_tickers(symbols).await?;
157
158 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
160 let symbols_owned: Vec<String> = symbols.to_vec();
161 let client = Arc::clone(&self.client);
162
163 tokio::spawn(async move {
165 while let Some(msg) = client.receive().await {
166 if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
168 if let Some(symbol_part) = topic.strip_prefix("tickers.") {
169 if symbols_owned.iter().any(|s| s == symbol_part) {
170 match parse_ws_ticker(&msg, None) {
171 Ok(ticker) => {
172 if tx.send(Ok(vec![ticker])).is_err() {
173 break; }
175 }
176 Err(e) => {
177 if tx.send(Err(e)).is_err() {
178 break;
179 }
180 }
181 }
182 }
183 }
184 }
185 }
186 });
187
188 Ok(Box::pin(ReceiverStream::new(rx)))
189 }
190 pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
192 let actual_depth = match depth {
194 1 => 1,
195 d if d <= 50 => 50,
196 d if d <= 200 => 200,
197 _ => 500,
198 };
199
200 let topic = format!("orderbook.{}.{}", actual_depth, symbol);
201
202 #[allow(clippy::disallowed_methods)]
204 let msg = serde_json::json!({
205 "op": "subscribe",
206 "args": [topic]
207 });
208
209 self.client.send_json(&msg).await?;
210
211 let sub_key = format!("orderbook:{}", symbol);
212 self.subscriptions.write().await.push(sub_key);
213
214 Ok(())
215 }
216
217 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
223 let topic = format!("publicTrade.{}", symbol);
224
225 #[allow(clippy::disallowed_methods)]
227 let msg = serde_json::json!({
228 "op": "subscribe",
229 "args": [topic]
230 });
231
232 self.client.send_json(&msg).await?;
233
234 let sub_key = format!("trades:{}", symbol);
235 self.subscriptions.write().await.push(sub_key);
236
237 Ok(())
238 }
239
240 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
247 let topic = format!("kline.{}.{}", interval, symbol);
248
249 #[allow(clippy::disallowed_methods)]
251 let msg = serde_json::json!({
252 "op": "subscribe",
253 "args": [topic]
254 });
255
256 self.client.send_json(&msg).await?;
257
258 let sub_key = format!("kline:{}:{}", symbol, interval);
259 self.subscriptions.write().await.push(sub_key);
260
261 Ok(())
262 }
263
264 pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
270 let parts: Vec<&str> = stream_name.split(':').collect();
272 if parts.len() < 2 {
273 return Err(Error::invalid_request(format!(
274 "Invalid stream name: {}",
275 stream_name
276 )));
277 }
278
279 let channel = parts[0];
280 let symbol = parts[1];
281
282 let topic = match channel {
283 "ticker" => format!("tickers.{}", symbol),
284 "orderbook" => format!("orderbook.50.{}", symbol),
285 "trades" => format!("publicTrade.{}", symbol),
286 "kline" => {
287 if parts.len() >= 3 {
288 format!("kline.{}.{}", parts[2], symbol)
289 } else {
290 return Err(Error::invalid_request(
291 "Kline unsubscribe requires interval",
292 ));
293 }
294 }
295 _ => {
296 return Err(Error::invalid_request(format!(
297 "Unknown channel: {}",
298 channel
299 )));
300 }
301 };
302
303 #[allow(clippy::disallowed_methods)]
305 let msg = serde_json::json!({
306 "op": "unsubscribe",
307 "args": [topic]
308 });
309
310 self.client.send_json(&msg).await?;
311
312 let mut subs = self.subscriptions.write().await;
314 subs.retain(|s| s != &stream_name);
315
316 Ok(())
317 }
318
319 pub async fn subscriptions(&self) -> Vec<String> {
321 self.subscriptions.read().await.clone()
322 }
323
324 pub async fn watch_ticker(
357 &self,
358 symbol: &str,
359 market: Option<Market>,
360 ) -> Result<MessageStream<Ticker>> {
361 if !self.is_connected() {
363 self.connect().await?;
364 }
365
366 self.subscribe_ticker(symbol).await?;
368
369 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
371 let symbol_owned = symbol.to_string();
372 let client = Arc::clone(&self.client);
373
374 tokio::spawn(async move {
376 while let Some(msg) = client.receive().await {
377 if is_ticker_message(&msg, &symbol_owned) {
379 match parse_ws_ticker(&msg, market.as_ref()) {
380 Ok(ticker) => {
381 if tx.send(Ok(ticker)).is_err() {
382 break; }
384 }
385 Err(e) => {
386 if tx.send(Err(e)).is_err() {
387 break;
388 }
389 }
390 }
391 }
392 }
393 });
394
395 Ok(Box::pin(ReceiverStream::new(rx)))
396 }
397
398 pub async fn watch_order_book(
431 &self,
432 symbol: &str,
433 limit: Option<u32>,
434 ) -> Result<MessageStream<OrderBook>> {
435 if !self.is_connected() {
437 self.connect().await?;
438 }
439
440 let depth = limit.unwrap_or(50);
442 self.subscribe_orderbook(symbol, depth).await?;
443
444 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
446 let symbol_owned = symbol.to_string();
447 let unified_symbol = format_unified_symbol(&symbol_owned);
448 let client = Arc::clone(&self.client);
449
450 tokio::spawn(async move {
452 while let Some(msg) = client.receive().await {
453 if is_orderbook_message(&msg, &symbol_owned) {
455 match parse_ws_orderbook(&msg, unified_symbol.clone()) {
456 Ok(orderbook) => {
457 if tx.send(Ok(orderbook)).is_err() {
458 break; }
460 }
461 Err(e) => {
462 if tx.send(Err(e)).is_err() {
463 break;
464 }
465 }
466 }
467 }
468 }
469 });
470
471 Ok(Box::pin(ReceiverStream::new(rx)))
472 }
473
474 pub async fn watch_trades(
511 &self,
512 symbol: &str,
513 market: Option<Market>,
514 ) -> Result<MessageStream<Vec<Trade>>> {
515 if !self.is_connected() {
517 self.connect().await?;
518 }
519
520 self.subscribe_trades(symbol).await?;
522
523 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
525 let symbol_owned = symbol.to_string();
526 let client = Arc::clone(&self.client);
527
528 tokio::spawn(async move {
530 while let Some(msg) = client.receive().await {
531 if is_trade_message(&msg, &symbol_owned) {
533 match parse_ws_trades(&msg, market.as_ref()) {
534 Ok(trades) => {
535 if tx.send(Ok(trades)).is_err() {
536 break; }
538 }
539 Err(e) => {
540 if tx.send(Err(e)).is_err() {
541 break;
542 }
543 }
544 }
545 }
546 }
547 });
548
549 Ok(Box::pin(ReceiverStream::new(rx)))
550 }
551}
552
553struct ReceiverStream<T> {
559 receiver: mpsc::UnboundedReceiver<T>,
560}
561
562impl<T> ReceiverStream<T> {
563 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
564 Self { receiver }
565 }
566}
567
568impl<T> Stream for ReceiverStream<T> {
569 type Item = T;
570
571 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
572 self.receiver.poll_recv(cx)
573 }
574}
575
576fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
582 if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
585 let expected_topic = format!("tickers.{}", symbol);
586 topic == expected_topic
587 } else {
588 false
589 }
590}
591
592fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
594 if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
597 topic.starts_with("orderbook.") && topic.ends_with(symbol)
599 } else {
600 false
601 }
602}
603
604fn is_trade_message(msg: &Value, symbol: &str) -> bool {
606 if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
609 let expected_topic = format!("publicTrade.{}", symbol);
610 topic == expected_topic
611 } else {
612 false
613 }
614}
615
616fn format_unified_symbol(symbol: &str) -> String {
618 let quote_currencies = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];
620
621 for quote in "e_currencies {
622 if let Some(base) = symbol.strip_suffix(quote) {
623 if !base.is_empty() {
624 return format!("{}/{}", base, quote);
625 }
626 }
627 }
628
629 symbol.to_string()
631}
632
633pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
639 let data = msg
642 .get("data")
643 .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
644
645 parse_ticker(data, market)
646}
647
648pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
650 let data = msg
653 .get("data")
654 .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
655
656 parse_orderbook(data, symbol)
657}
658
659pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
661 let data = msg
664 .get("data")
665 .and_then(|d| d.as_array())
666 .and_then(|arr| arr.first())
667 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
668
669 parse_trade(data, market)
670}
671
672pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
674 let data_array = msg
677 .get("data")
678 .and_then(|d| d.as_array())
679 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
680
681 let mut trades = Vec::with_capacity(data_array.len());
682 for data in data_array {
683 trades.push(parse_trade(data, market)?);
684 }
685
686 Ok(trades)
687}
688
689#[cfg(test)]
690mod tests {
691 use super::*;
692 use ccxt_core::types::financial::Price;
693 use rust_decimal_macros::dec;
694
695 #[test]
696 fn test_bybit_ws_creation() {
697 let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
698 assert!(ws.subscriptions.try_read().is_ok());
700 }
701
702 #[tokio::test]
703 async fn test_subscriptions_empty_by_default() {
704 let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
705 let subs = ws.subscriptions().await;
706 assert!(subs.is_empty());
707 }
708
709 #[test]
712 fn test_parse_ws_ticker_snapshot() {
713 let msg = serde_json::from_str(
714 r#"{
715 "topic": "tickers.BTCUSDT",
716 "type": "snapshot",
717 "data": {
718 "symbol": "BTCUSDT",
719 "lastPrice": "50000.00",
720 "highPrice24h": "51000.00",
721 "lowPrice24h": "49000.00",
722 "bid1Price": "49999.00",
723 "ask1Price": "50001.00",
724 "volume24h": "1000.5",
725 "time": "1700000000000"
726 },
727 "ts": 1700000000000
728 }"#,
729 )
730 .unwrap();
731
732 let ticker = parse_ws_ticker(&msg, None).unwrap();
733 assert_eq!(ticker.symbol, "BTCUSDT");
734 assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
735 assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
736 assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
737 assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
738 assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
739 assert_eq!(ticker.timestamp, 1700000000000);
740 }
741
742 #[test]
743 fn test_parse_ws_ticker_with_market() {
744 let msg = serde_json::from_str(
745 r#"{
746 "topic": "tickers.BTCUSDT",
747 "type": "snapshot",
748 "data": {
749 "symbol": "BTCUSDT",
750 "lastPrice": "50000.00",
751 "time": "1700000000000"
752 },
753 "ts": 1700000000000
754 }"#,
755 )
756 .unwrap();
757
758 let market = Market {
759 id: "BTCUSDT".to_string(),
760 symbol: "BTC/USDT".to_string(),
761 base: "BTC".to_string(),
762 quote: "USDT".to_string(),
763 ..Default::default()
764 };
765
766 let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
767 assert_eq!(ticker.symbol, "BTC/USDT");
768 assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
769 }
770
771 #[test]
772 fn test_parse_ws_ticker_missing_data() {
773 let msg = serde_json::from_str(
774 r#"{
775 "topic": "tickers.BTCUSDT",
776 "type": "snapshot",
777 "ts": 1700000000000
778 }"#,
779 )
780 .unwrap();
781
782 let result = parse_ws_ticker(&msg, None);
783 assert!(result.is_err());
784 }
785
786 #[test]
789 fn test_parse_ws_orderbook_snapshot() {
790 let msg = serde_json::from_str(
791 r#"{
792 "topic": "orderbook.50.BTCUSDT",
793 "type": "snapshot",
794 "data": {
795 "s": "BTCUSDT",
796 "b": [
797 ["50000.00", "1.5"],
798 ["49999.00", "2.0"],
799 ["49998.00", "0.5"]
800 ],
801 "a": [
802 ["50001.00", "1.0"],
803 ["50002.00", "3.0"],
804 ["50003.00", "2.5"]
805 ],
806 "u": 12345,
807 "seq": 67890,
808 "ts": "1700000000000"
809 },
810 "ts": 1700000000000
811 }"#,
812 )
813 .unwrap();
814
815 let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
816 assert_eq!(orderbook.symbol, "BTC/USDT");
817 assert_eq!(orderbook.bids.len(), 3);
818 assert_eq!(orderbook.asks.len(), 3);
819
820 assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
822 assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
823 assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
824
825 assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
827 assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
828 assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
829 }
830
831 #[test]
832 fn test_parse_ws_orderbook_missing_data() {
833 let msg = serde_json::from_str(
834 r#"{
835 "topic": "orderbook.50.BTCUSDT",
836 "type": "snapshot",
837 "ts": 1700000000000
838 }"#,
839 )
840 .unwrap();
841
842 let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
843 assert!(result.is_err());
844 }
845
846 #[test]
847 fn test_parse_ws_orderbook_empty_sides() {
848 let msg = serde_json::from_str(
849 r#"{
850 "topic": "orderbook.50.BTCUSDT",
851 "type": "snapshot",
852 "data": {
853 "s": "BTCUSDT",
854 "b": [],
855 "a": [],
856 "u": 12345,
857 "ts": "1700000000000"
858 },
859 "ts": 1700000000000
860 }"#,
861 )
862 .unwrap();
863
864 let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
865 assert!(orderbook.bids.is_empty());
866 assert!(orderbook.asks.is_empty());
867 }
868
869 #[test]
872 fn test_parse_ws_trade_single() {
873 let msg = serde_json::from_str(
874 r#"{
875 "topic": "publicTrade.BTCUSDT",
876 "type": "snapshot",
877 "data": [{
878 "i": "123456789",
879 "T": 1700000000000,
880 "p": "50000.00",
881 "v": "0.5",
882 "S": "Buy",
883 "s": "BTCUSDT"
884 }],
885 "ts": 1700000000000
886 }"#,
887 )
888 .unwrap();
889
890 let trade = parse_ws_trade(&msg, None).unwrap();
891 assert_eq!(trade.timestamp, 1700000000000);
893 }
894
895 #[test]
896 fn test_parse_ws_trades_multiple() {
897 let msg = serde_json::from_str(
898 r#"{
899 "topic": "publicTrade.BTCUSDT",
900 "type": "snapshot",
901 "data": [
902 {
903 "i": "123456789",
904 "T": 1700000000000,
905 "p": "50000.00",
906 "v": "0.5",
907 "S": "Buy",
908 "s": "BTCUSDT",
909 "side": "Buy",
910 "price": "50000.00",
911 "size": "0.5",
912 "time": "1700000000000"
913 },
914 {
915 "i": "123456790",
916 "T": 1700000000001,
917 "p": "50001.00",
918 "v": "1.0",
919 "S": "Sell",
920 "s": "BTCUSDT",
921 "side": "Sell",
922 "price": "50001.00",
923 "size": "1.0",
924 "time": "1700000000001"
925 }
926 ],
927 "ts": 1700000000001
928 }"#,
929 )
930 .unwrap();
931
932 let trades = parse_ws_trades(&msg, None).unwrap();
933 assert_eq!(trades.len(), 2);
934 }
935
936 #[test]
937 fn test_parse_ws_trade_missing_data() {
938 let msg = serde_json::from_str(
939 r#"{
940 "topic": "publicTrade.BTCUSDT",
941 "type": "snapshot",
942 "ts": 1700000000000
943 }"#,
944 )
945 .unwrap();
946
947 let result = parse_ws_trade(&msg, None);
948 assert!(result.is_err());
949 }
950
951 #[test]
952 fn test_parse_ws_trades_empty_array() {
953 let msg = serde_json::from_str(
954 r#"{
955 "topic": "publicTrade.BTCUSDT",
956 "type": "snapshot",
957 "data": [],
958 "ts": 1700000000000
959 }"#,
960 )
961 .unwrap();
962
963 let trades = parse_ws_trades(&msg, None).unwrap();
964 assert!(trades.is_empty());
965 }
966
967 #[test]
970 fn test_is_ticker_message_true() {
971 let msg = serde_json::from_str(
972 r#"{
973 "topic": "tickers.BTCUSDT",
974 "type": "snapshot",
975 "data": {},
976 "ts": 1700000000000
977 }"#,
978 )
979 .unwrap();
980
981 assert!(is_ticker_message(&msg, "BTCUSDT"));
982 }
983
984 #[test]
985 fn test_is_ticker_message_wrong_symbol() {
986 let msg = serde_json::from_str(
987 r#"{
988 "topic": "tickers.ETHUSDT",
989 "type": "snapshot",
990 "data": {},
991 "ts": 1700000000000
992 }"#,
993 )
994 .unwrap();
995
996 assert!(!is_ticker_message(&msg, "BTCUSDT"));
997 }
998
999 #[test]
1000 fn test_is_ticker_message_wrong_topic() {
1001 let msg = serde_json::from_str(
1002 r#"{
1003 "topic": "publicTrade.BTCUSDT",
1004 "type": "snapshot",
1005 "data": [],
1006 "ts": 1700000000000
1007 }"#,
1008 )
1009 .unwrap();
1010
1011 assert!(!is_ticker_message(&msg, "BTCUSDT"));
1012 }
1013
1014 #[test]
1015 fn test_is_orderbook_message_depth_50() {
1016 let msg = serde_json::from_str(
1017 r#"{
1018 "topic": "orderbook.50.BTCUSDT",
1019 "type": "snapshot",
1020 "data": {},
1021 "ts": 1700000000000
1022 }"#,
1023 )
1024 .unwrap();
1025
1026 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1027 }
1028
1029 #[test]
1030 fn test_is_orderbook_message_depth_200() {
1031 let msg = serde_json::from_str(
1032 r#"{
1033 "topic": "orderbook.200.BTCUSDT",
1034 "type": "snapshot",
1035 "data": {},
1036 "ts": 1700000000000
1037 }"#,
1038 )
1039 .unwrap();
1040
1041 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1042 }
1043
1044 #[test]
1045 fn test_is_trade_message_true() {
1046 let msg = serde_json::from_str(
1047 r#"{
1048 "topic": "publicTrade.BTCUSDT",
1049 "type": "snapshot",
1050 "data": [],
1051 "ts": 1700000000000
1052 }"#,
1053 )
1054 .unwrap();
1055
1056 assert!(is_trade_message(&msg, "BTCUSDT"));
1057 }
1058
1059 #[test]
1060 fn test_is_trade_message_wrong_topic() {
1061 let msg = serde_json::from_str(
1062 r#"{
1063 "topic": "tickers.BTCUSDT",
1064 "type": "snapshot",
1065 "data": {},
1066 "ts": 1700000000000
1067 }"#,
1068 )
1069 .unwrap();
1070
1071 assert!(!is_trade_message(&msg, "BTCUSDT"));
1072 }
1073
1074 #[test]
1077 fn test_format_unified_symbol_usdt() {
1078 assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1079 assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1080 }
1081
1082 #[test]
1083 fn test_format_unified_symbol_usdc() {
1084 assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1085 }
1086
1087 #[test]
1088 fn test_format_unified_symbol_btc() {
1089 assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1090 }
1091
1092 #[test]
1093 fn test_format_unified_symbol_unknown() {
1094 assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1096 }
1097}