Skip to main content

ccxt_exchanges/bybit/
ws_exchange_impl.rs

1//! WsExchange trait implementation for Bybit
2//!
3//! This module implements the unified `WsExchange` trait from `ccxt-core` for Bybit,
4//! providing real-time WebSocket data streaming capabilities.
5
6use async_trait::async_trait;
7use ccxt_core::{
8    error::{Error, Result},
9    types::{Balance, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
10    ws_client::WsConnectionState,
11    ws_exchange::{MessageStream, WsExchange},
12};
13
14use tokio::sync::mpsc;
15
16use super::Bybit;
17
18/// A simple stream wrapper that converts an mpsc receiver into a Stream
19struct ReceiverStream<T> {
20    receiver: mpsc::UnboundedReceiver<T>,
21}
22
23impl<T> ReceiverStream<T> {
24    #[allow(dead_code)]
25    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
26        Self { receiver }
27    }
28}
29
30impl<T> futures::Stream for ReceiverStream<T> {
31    type Item = T;
32
33    fn poll_next(
34        mut self: std::pin::Pin<&mut Self>,
35        cx: &mut std::task::Context<'_>,
36    ) -> std::task::Poll<Option<Self::Item>> {
37        self.receiver.poll_recv(cx)
38    }
39}
40
41#[async_trait]
42impl WsExchange for Bybit {
43    // ==================== Connection Management ====================
44
45    async fn ws_connect(&self) -> Result<()> {
46        let ws = self.create_ws();
47        ws.connect().await?;
48        Ok(())
49    }
50
51    async fn ws_disconnect(&self) -> Result<()> {
52        let ws = self.create_ws();
53        ws.disconnect().await
54    }
55
56    fn ws_is_connected(&self) -> bool {
57        // Without persistent state tracking, return false
58        // In a full implementation, we'd track the active connection
59        false
60    }
61
62    fn ws_state(&self) -> WsConnectionState {
63        // Return disconnected since we don't have persistent state
64        WsConnectionState::Disconnected
65    }
66
67    // ==================== Public Data Streams ====================
68
69    async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
70        let ws = self.create_ws();
71
72        // Convert unified symbol (BTC/USDT) to Bybit format (BTCUSDT)
73        let bybit_symbol = symbol.replace('/', "");
74
75        // Try to get market info if available (optional)
76        let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
77
78        ws.watch_ticker(&bybit_symbol, market).await
79    }
80
81    async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
82        let ws = self.create_ws();
83
84        // Convert unified symbols (BTC/USDT) to Bybit format (BTCUSDT)
85        let bybit_symbols: Vec<String> = symbols.iter().map(|s| s.replace('/', "")).collect();
86
87        ws.watch_tickers(&bybit_symbols).await
88    }
89
90    async fn watch_order_book(
91        &self,
92        symbol: &str,
93        limit: Option<u32>,
94    ) -> Result<MessageStream<OrderBook>> {
95        let ws = self.create_ws();
96
97        // Convert unified symbol (BTC/USDT) to Bybit format (BTCUSDT)
98        let bybit_symbol = symbol.replace('/', "");
99
100        ws.watch_order_book(&bybit_symbol, limit).await
101    }
102
103    async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
104        let ws = self.create_ws();
105
106        // Convert unified symbol (BTC/USDT) to Bybit format (BTCUSDT)
107        let bybit_symbol = symbol.replace('/', "");
108
109        // Try to get market info if available (optional)
110        let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
111
112        ws.watch_trades(&bybit_symbol, market).await
113    }
114
115    async fn watch_ohlcv(
116        &self,
117        _symbol: &str,
118        _timeframe: Timeframe,
119    ) -> Result<MessageStream<Ohlcv>> {
120        Err(Error::not_implemented(
121            "watch_ohlcv not yet implemented for Bybit",
122        ))
123    }
124
125    // ==================== Private Data Streams ====================
126
127    async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
128        Err(Error::not_implemented(
129            "watch_balance not yet implemented for Bybit",
130        ))
131    }
132
133    async fn watch_orders(&self, _symbol: Option<&str>) -> Result<MessageStream<Order>> {
134        Err(Error::not_implemented(
135            "watch_orders not yet implemented for Bybit",
136        ))
137    }
138
139    async fn watch_my_trades(&self, _symbol: Option<&str>) -> Result<MessageStream<Trade>> {
140        Err(Error::not_implemented(
141            "watch_my_trades not yet implemented for Bybit",
142        ))
143    }
144
145    // ==================== Subscription Management ====================
146
147    async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
148        Err(Error::not_implemented(
149            "subscribe not yet implemented for Bybit",
150        ))
151    }
152
153    async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
154        Err(Error::not_implemented(
155            "unsubscribe not yet implemented for Bybit",
156        ))
157    }
158
159    fn subscriptions(&self) -> Vec<String> {
160        Vec::new()
161    }
162}