1use crate::bybit::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
/// Interval between keep-alive pings, in milliseconds.
const DEFAULT_PING_INTERVAL_MS: u64 = 20_000;

/// Delay between reconnection attempts, in milliseconds.
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Upper bound on the number of reconnection attempts handed to the client config.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
28
29pub struct BybitWs {
33 client: Arc<WsClient>,
35 subscriptions: Arc<RwLock<Vec<String>>>,
37}
38
39impl BybitWs {
40 pub fn new(url: String) -> Self {
46 let config = WsConfig {
47 url: url.clone(),
48 connect_timeout: 10000,
49 ping_interval: DEFAULT_PING_INTERVAL_MS,
50 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
51 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
52 auto_reconnect: true,
53 enable_compression: false,
54 pong_timeout: 90000,
55 ..Default::default()
56 };
57
58 Self {
59 client: Arc::new(WsClient::new(config)),
60 subscriptions: Arc::new(RwLock::new(Vec::new())),
61 }
62 }
63
64 pub async fn connect(&self) -> Result<()> {
66 self.client.connect().await
67 }
68
69 pub async fn disconnect(&self) -> Result<()> {
71 self.client.disconnect().await
72 }
73
74 pub fn state(&self) -> WsConnectionState {
76 self.client.state()
77 }
78
79 pub fn is_connected(&self) -> bool {
81 self.client.is_connected()
82 }
83
84 pub async fn receive(&self) -> Option<Value> {
86 self.client.receive().await
87 }
88
89 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
95 let topic = format!("tickers.{}", symbol);
97
98 #[allow(clippy::disallowed_methods)]
100 let msg = serde_json::json!({
101 "op": "subscribe",
102 "args": [topic]
103 });
104
105 self.client.send_json(&msg).await?;
106
107 let sub_key = format!("ticker:{}", symbol);
108 self.subscriptions.write().await.push(sub_key);
109
110 Ok(())
111 }
112
113 pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
120 let actual_depth = match depth {
122 1 => 1,
123 d if d <= 50 => 50,
124 d if d <= 200 => 200,
125 _ => 500,
126 };
127
128 let topic = format!("orderbook.{}.{}", actual_depth, symbol);
129
130 #[allow(clippy::disallowed_methods)]
132 let msg = serde_json::json!({
133 "op": "subscribe",
134 "args": [topic]
135 });
136
137 self.client.send_json(&msg).await?;
138
139 let sub_key = format!("orderbook:{}", symbol);
140 self.subscriptions.write().await.push(sub_key);
141
142 Ok(())
143 }
144
145 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
151 let topic = format!("publicTrade.{}", symbol);
152
153 #[allow(clippy::disallowed_methods)]
155 let msg = serde_json::json!({
156 "op": "subscribe",
157 "args": [topic]
158 });
159
160 self.client.send_json(&msg).await?;
161
162 let sub_key = format!("trades:{}", symbol);
163 self.subscriptions.write().await.push(sub_key);
164
165 Ok(())
166 }
167
168 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
175 let topic = format!("kline.{}.{}", interval, symbol);
176
177 #[allow(clippy::disallowed_methods)]
179 let msg = serde_json::json!({
180 "op": "subscribe",
181 "args": [topic]
182 });
183
184 self.client.send_json(&msg).await?;
185
186 let sub_key = format!("kline:{}:{}", symbol, interval);
187 self.subscriptions.write().await.push(sub_key);
188
189 Ok(())
190 }
191
192 pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
198 let parts: Vec<&str> = stream_name.split(':').collect();
200 if parts.len() < 2 {
201 return Err(Error::invalid_request(format!(
202 "Invalid stream name: {}",
203 stream_name
204 )));
205 }
206
207 let channel = parts[0];
208 let symbol = parts[1];
209
210 let topic = match channel {
211 "ticker" => format!("tickers.{}", symbol),
212 "orderbook" => format!("orderbook.50.{}", symbol),
213 "trades" => format!("publicTrade.{}", symbol),
214 "kline" => {
215 if parts.len() >= 3 {
216 format!("kline.{}.{}", parts[2], symbol)
217 } else {
218 return Err(Error::invalid_request(
219 "Kline unsubscribe requires interval",
220 ));
221 }
222 }
223 _ => {
224 return Err(Error::invalid_request(format!(
225 "Unknown channel: {}",
226 channel
227 )));
228 }
229 };
230
231 #[allow(clippy::disallowed_methods)]
233 let msg = serde_json::json!({
234 "op": "unsubscribe",
235 "args": [topic]
236 });
237
238 self.client.send_json(&msg).await?;
239
240 let mut subs = self.subscriptions.write().await;
242 subs.retain(|s| s != &stream_name);
243
244 Ok(())
245 }
246
247 pub async fn subscriptions(&self) -> Vec<String> {
249 self.subscriptions.read().await.clone()
250 }
251
252 pub async fn watch_ticker(
285 &self,
286 symbol: &str,
287 market: Option<Market>,
288 ) -> Result<MessageStream<Ticker>> {
289 if !self.is_connected() {
291 self.connect().await?;
292 }
293
294 self.subscribe_ticker(symbol).await?;
296
297 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
299 let symbol_owned = symbol.to_string();
300 let client = Arc::clone(&self.client);
301
302 tokio::spawn(async move {
304 while let Some(msg) = client.receive().await {
305 if is_ticker_message(&msg, &symbol_owned) {
307 match parse_ws_ticker(&msg, market.as_ref()) {
308 Ok(ticker) => {
309 if tx.send(Ok(ticker)).is_err() {
310 break; }
312 }
313 Err(e) => {
314 if tx.send(Err(e)).is_err() {
315 break;
316 }
317 }
318 }
319 }
320 }
321 });
322
323 Ok(Box::pin(ReceiverStream::new(rx)))
324 }
325
326 pub async fn watch_order_book(
359 &self,
360 symbol: &str,
361 limit: Option<u32>,
362 ) -> Result<MessageStream<OrderBook>> {
363 if !self.is_connected() {
365 self.connect().await?;
366 }
367
368 let depth = limit.unwrap_or(50);
370 self.subscribe_orderbook(symbol, depth).await?;
371
372 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
374 let symbol_owned = symbol.to_string();
375 let unified_symbol = format_unified_symbol(&symbol_owned);
376 let client = Arc::clone(&self.client);
377
378 tokio::spawn(async move {
380 while let Some(msg) = client.receive().await {
381 if is_orderbook_message(&msg, &symbol_owned) {
383 match parse_ws_orderbook(&msg, unified_symbol.clone()) {
384 Ok(orderbook) => {
385 if tx.send(Ok(orderbook)).is_err() {
386 break; }
388 }
389 Err(e) => {
390 if tx.send(Err(e)).is_err() {
391 break;
392 }
393 }
394 }
395 }
396 }
397 });
398
399 Ok(Box::pin(ReceiverStream::new(rx)))
400 }
401
402 pub async fn watch_trades(
439 &self,
440 symbol: &str,
441 market: Option<Market>,
442 ) -> Result<MessageStream<Vec<Trade>>> {
443 if !self.is_connected() {
445 self.connect().await?;
446 }
447
448 self.subscribe_trades(symbol).await?;
450
451 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
453 let symbol_owned = symbol.to_string();
454 let client = Arc::clone(&self.client);
455
456 tokio::spawn(async move {
458 while let Some(msg) = client.receive().await {
459 if is_trade_message(&msg, &symbol_owned) {
461 match parse_ws_trades(&msg, market.as_ref()) {
462 Ok(trades) => {
463 if tx.send(Ok(trades)).is_err() {
464 break; }
466 }
467 Err(e) => {
468 if tx.send(Err(e)).is_err() {
469 break;
470 }
471 }
472 }
473 }
474 }
475 });
476
477 Ok(Box::pin(ReceiverStream::new(rx)))
478 }
479}
480
481struct ReceiverStream<T> {
487 receiver: mpsc::UnboundedReceiver<T>,
488}
489
490impl<T> ReceiverStream<T> {
491 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
492 Self { receiver }
493 }
494}
495
496impl<T> Stream for ReceiverStream<T> {
497 type Item = T;
498
499 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
500 self.receiver.poll_recv(cx)
501 }
502}
503
504fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
510 if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
513 let expected_topic = format!("tickers.{}", symbol);
514 topic == expected_topic
515 } else {
516 false
517 }
518}
519
520fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
522 if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
525 topic.starts_with("orderbook.") && topic.ends_with(symbol)
527 } else {
528 false
529 }
530}
531
532fn is_trade_message(msg: &Value, symbol: &str) -> bool {
534 if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
537 let expected_topic = format!("publicTrade.{}", symbol);
538 topic == expected_topic
539 } else {
540 false
541 }
542}
543
/// Converts a concatenated exchange symbol such as `BTCUSDT` into
/// `BASE/QUOTE` form (`BTC/USDT`).
///
/// The quote is recognised by suffix from a fixed list. A symbol that ends in
/// none of the known quotes, or that consists of a quote alone (empty base),
/// is returned unchanged.
fn format_unified_symbol(symbol: &str) -> String {
    const QUOTE_CURRENCIES: [&str; 6] = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];

    QUOTE_CURRENCIES
        .iter()
        .find_map(|quote| {
            symbol
                .strip_suffix(*quote)
                // A bare quote such as "USDT" has no base to split off.
                .filter(|base| !base.is_empty())
                .map(|base| format!("{}/{}", base, quote))
        })
        .unwrap_or_else(|| symbol.to_string())
}
560
561pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
567 let data = msg
570 .get("data")
571 .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
572
573 parse_ticker(data, market)
574}
575
576pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
578 let data = msg
581 .get("data")
582 .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
583
584 parse_orderbook(data, symbol)
585}
586
587pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
589 let data = msg
592 .get("data")
593 .and_then(|d| d.as_array())
594 .and_then(|arr| arr.first())
595 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
596
597 parse_trade(data, market)
598}
599
600pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
602 let data_array = msg
605 .get("data")
606 .and_then(|d| d.as_array())
607 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
608
609 let mut trades = Vec::with_capacity(data_array.len());
610 for data in data_array {
611 trades.push(parse_trade(data, market)?);
612 }
613
614 Ok(trades)
615}
616
#[cfg(test)]
mod tests {
    use super::*;
    use ccxt_core::types::financial::Price;
    use rust_decimal_macros::dec;

    /// Parses a JSON literal used as a test fixture; panics on malformed input.
    fn json(raw: &str) -> Value {
        serde_json::from_str(raw).unwrap()
    }

    #[test]
    fn test_bybit_ws_creation() {
        let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
        assert!(ws.subscriptions.try_read().is_ok());
    }

    #[tokio::test]
    async fn test_subscriptions_empty_by_default() {
        let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
        let subs = ws.subscriptions().await;
        assert!(subs.is_empty());
    }

    // ---- ticker parsing ----

    #[test]
    fn test_parse_ws_ticker_snapshot() {
        let msg = json(
            r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "data": {
                    "symbol": "BTCUSDT",
                    "lastPrice": "50000.00",
                    "highPrice24h": "51000.00",
                    "lowPrice24h": "49000.00",
                    "bid1Price": "49999.00",
                    "ask1Price": "50001.00",
                    "volume24h": "1000.5",
                    "time": "1700000000000"
                },
                "ts": 1700000000000
            }"#,
        );

        let ticker = parse_ws_ticker(&msg, None).unwrap();
        assert_eq!(ticker.symbol, "BTCUSDT");
        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
        assert_eq!(ticker.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_ws_ticker_with_market() {
        let msg = json(
            r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "data": {
                    "symbol": "BTCUSDT",
                    "lastPrice": "50000.00",
                    "time": "1700000000000"
                },
                "ts": 1700000000000
            }"#,
        );

        let market = Market {
            id: "BTCUSDT".to_string(),
            symbol: "BTC/USDT".to_string(),
            base: "BTC".to_string(),
            quote: "USDT".to_string(),
            ..Default::default()
        };

        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
        assert_eq!(ticker.symbol, "BTC/USDT");
        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
    }

    #[test]
    fn test_parse_ws_ticker_missing_data() {
        let msg = json(
            r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "ts": 1700000000000
            }"#,
        );

        let result = parse_ws_ticker(&msg, None);
        assert!(result.is_err());
    }

    // ---- order-book parsing ----

    #[test]
    fn test_parse_ws_orderbook_snapshot() {
        let msg = json(
            r#"{
                "topic": "orderbook.50.BTCUSDT",
                "type": "snapshot",
                "data": {
                    "s": "BTCUSDT",
                    "b": [
                        ["50000.00", "1.5"],
                        ["49999.00", "2.0"],
                        ["49998.00", "0.5"]
                    ],
                    "a": [
                        ["50001.00", "1.0"],
                        ["50002.00", "3.0"],
                        ["50003.00", "2.5"]
                    ],
                    "u": 12345,
                    "seq": 67890,
                    "ts": "1700000000000"
                },
                "ts": 1700000000000
            }"#,
        );

        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
        assert_eq!(orderbook.symbol, "BTC/USDT");
        assert_eq!(orderbook.bids.len(), 3);
        assert_eq!(orderbook.asks.len(), 3);

        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));

        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
    }

    #[test]
    fn test_parse_ws_orderbook_missing_data() {
        let msg = json(
            r#"{
                "topic": "orderbook.50.BTCUSDT",
                "type": "snapshot",
                "ts": 1700000000000
            }"#,
        );

        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_ws_orderbook_empty_sides() {
        let msg = json(
            r#"{
                "topic": "orderbook.50.BTCUSDT",
                "type": "snapshot",
                "data": {
                    "s": "BTCUSDT",
                    "b": [],
                    "a": [],
                    "u": 12345,
                    "ts": "1700000000000"
                },
                "ts": 1700000000000
            }"#,
        );

        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
        assert!(orderbook.bids.is_empty());
        assert!(orderbook.asks.is_empty());
    }

    // ---- trade parsing ----

    #[test]
    fn test_parse_ws_trade_single() {
        let msg = json(
            r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [{
                    "i": "123456789",
                    "T": 1700000000000,
                    "p": "50000.00",
                    "v": "0.5",
                    "S": "Buy",
                    "s": "BTCUSDT"
                }],
                "ts": 1700000000000
            }"#,
        );

        let trade = parse_ws_trade(&msg, None).unwrap();
        assert_eq!(trade.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_ws_trades_multiple() {
        let msg = json(
            r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [
                    {
                        "i": "123456789",
                        "T": 1700000000000,
                        "p": "50000.00",
                        "v": "0.5",
                        "S": "Buy",
                        "s": "BTCUSDT",
                        "side": "Buy",
                        "price": "50000.00",
                        "size": "0.5",
                        "time": "1700000000000"
                    },
                    {
                        "i": "123456790",
                        "T": 1700000000001,
                        "p": "50001.00",
                        "v": "1.0",
                        "S": "Sell",
                        "s": "BTCUSDT",
                        "side": "Sell",
                        "price": "50001.00",
                        "size": "1.0",
                        "time": "1700000000001"
                    }
                ],
                "ts": 1700000000001
            }"#,
        );

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert_eq!(trades.len(), 2);
    }

    #[test]
    fn test_parse_ws_trade_missing_data() {
        let msg = json(
            r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "ts": 1700000000000
            }"#,
        );

        let result = parse_ws_trade(&msg, None);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_ws_trades_missing_data() {
        let msg = json(
            r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "ts": 1700000000000
            }"#,
        );

        let result = parse_ws_trades(&msg, None);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_ws_trades_empty_array() {
        let msg = json(
            r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [],
                "ts": 1700000000000
            }"#,
        );

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert!(trades.is_empty());
    }

    // ---- topic predicates ----

    #[test]
    fn test_is_ticker_message_true() {
        let msg = json(
            r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#,
        );

        assert!(is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_symbol() {
        let msg = json(
            r#"{
                "topic": "tickers.ETHUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#,
        );

        assert!(!is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_topic() {
        let msg = json(
            r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [],
                "ts": 1700000000000
            }"#,
        );

        assert!(!is_ticker_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_depth_50() {
        let msg = json(
            r#"{
                "topic": "orderbook.50.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#,
        );

        assert!(is_orderbook_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_depth_200() {
        let msg = json(
            r#"{
                "topic": "orderbook.200.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#,
        );

        assert!(is_orderbook_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_wrong_symbol() {
        let msg = json(
            r#"{
                "topic": "orderbook.50.ETHUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#,
        );

        assert!(!is_orderbook_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_orderbook_message_wrong_topic() {
        let msg = json(
            r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#,
        );

        assert!(!is_orderbook_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_trade_message_true() {
        let msg = json(
            r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [],
                "ts": 1700000000000
            }"#,
        );

        assert!(is_trade_message(&msg, "BTCUSDT"));
    }

    #[test]
    fn test_is_trade_message_wrong_topic() {
        let msg = json(
            r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#,
        );

        assert!(!is_trade_message(&msg, "BTCUSDT"));
    }

    // ---- symbol formatting ----

    #[test]
    fn test_format_unified_symbol_usdt() {
        assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
        assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
    }

    #[test]
    fn test_format_unified_symbol_usdc() {
        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
    }

    #[test]
    fn test_format_unified_symbol_btc() {
        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
    }

    #[test]
    fn test_format_unified_symbol_unknown() {
        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
    }

    #[test]
    fn test_format_unified_symbol_quote_only() {
        // A bare quote currency has no base to split off and is left as-is.
        assert_eq!(format_unified_symbol("USDT"), "USDT");
        assert_eq!(format_unified_symbol(""), "");
    }
}