ccxt_exchanges/bybit/
ws_exchange_impl.rs1use async_trait::async_trait;
7use ccxt_core::{
8 error::{Error, Result},
9 types::{Balance, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
10 ws_client::WsConnectionState,
11 ws_exchange::{MessageStream, WsExchange},
12};
13
14use tokio::sync::mpsc;
15
16use super::Bybit;
17
18struct ReceiverStream<T> {
20 receiver: mpsc::UnboundedReceiver<T>,
21}
22
23impl<T> ReceiverStream<T> {
24 #[allow(dead_code)]
25 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
26 Self { receiver }
27 }
28}
29
30impl<T> futures::Stream for ReceiverStream<T> {
31 type Item = T;
32
33 fn poll_next(
34 mut self: std::pin::Pin<&mut Self>,
35 cx: &mut std::task::Context<'_>,
36 ) -> std::task::Poll<Option<Self::Item>> {
37 self.receiver.poll_recv(cx)
38 }
39}
40
41#[async_trait]
42impl WsExchange for Bybit {
43 async fn ws_connect(&self) -> Result<()> {
46 let ws = self.create_ws();
47 ws.connect().await?;
48 Ok(())
49 }
50
51 async fn ws_disconnect(&self) -> Result<()> {
52 let ws = self.create_ws();
53 ws.disconnect().await
54 }
55
56 fn ws_is_connected(&self) -> bool {
57 false
60 }
61
62 fn ws_state(&self) -> WsConnectionState {
63 WsConnectionState::Disconnected
65 }
66
67 async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
70 let ws = self.create_ws();
71
72 let bybit_symbol = symbol.replace('/', "");
74
75 let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
77
78 ws.watch_ticker(&bybit_symbol, market).await
79 }
80
81 async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
82 let ws = self.create_ws();
83
84 let bybit_symbols: Vec<String> = symbols.iter().map(|s| s.replace('/', "")).collect();
86
87 ws.watch_tickers(&bybit_symbols).await
88 }
89
90 async fn watch_order_book(
91 &self,
92 symbol: &str,
93 limit: Option<u32>,
94 ) -> Result<MessageStream<OrderBook>> {
95 let ws = self.create_ws();
96
97 let bybit_symbol = symbol.replace('/', "");
99
100 ws.watch_order_book(&bybit_symbol, limit).await
101 }
102
103 async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
104 let ws = self.create_ws();
105
106 let bybit_symbol = symbol.replace('/', "");
108
109 let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
111
112 ws.watch_trades(&bybit_symbol, market).await
113 }
114
115 async fn watch_ohlcv(
116 &self,
117 _symbol: &str,
118 _timeframe: Timeframe,
119 ) -> Result<MessageStream<Ohlcv>> {
120 Err(Error::not_implemented(
121 "watch_ohlcv not yet implemented for Bybit",
122 ))
123 }
124
125 async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
128 Err(Error::not_implemented(
129 "watch_balance not yet implemented for Bybit",
130 ))
131 }
132
133 async fn watch_orders(&self, _symbol: Option<&str>) -> Result<MessageStream<Order>> {
134 Err(Error::not_implemented(
135 "watch_orders not yet implemented for Bybit",
136 ))
137 }
138
139 async fn watch_my_trades(&self, _symbol: Option<&str>) -> Result<MessageStream<Trade>> {
140 Err(Error::not_implemented(
141 "watch_my_trades not yet implemented for Bybit",
142 ))
143 }
144
145 async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
148 Err(Error::not_implemented(
149 "subscribe not yet implemented for Bybit",
150 ))
151 }
152
153 async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
154 Err(Error::not_implemented(
155 "unsubscribe not yet implemented for Bybit",
156 ))
157 }
158
159 fn subscriptions(&self) -> Vec<String> {
160 Vec::new()
161 }
162}