ccxt_exchanges/bitget/
ws_exchange_impl.rs

1//! WsExchange trait implementation for Bitget
2//!
3//! This module implements the unified `WsExchange` trait from `ccxt-core` for Bitget,
4//! providing real-time WebSocket data streaming capabilities.
5
6use async_trait::async_trait;
7use ccxt_core::{
8    error::{Error, Result},
9    exchange::Exchange,
10    types::{Balance, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
11    ws_client::WsConnectionState,
12    ws_exchange::{MessageStream, WsExchange},
13};
14
15use tokio::sync::mpsc;
16
17use super::Bitget;
18
19/// A simple stream wrapper that converts an mpsc receiver into a Stream
20struct ReceiverStream<T> {
21    receiver: mpsc::UnboundedReceiver<T>,
22}
23
24impl<T> ReceiverStream<T> {
25    #[allow(dead_code)]
26    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
27        Self { receiver }
28    }
29}
30
31impl<T> futures::Stream for ReceiverStream<T> {
32    type Item = T;
33
34    fn poll_next(
35        mut self: std::pin::Pin<&mut Self>,
36        cx: &mut std::task::Context<'_>,
37    ) -> std::task::Poll<Option<Self::Item>> {
38        self.receiver.poll_recv(cx)
39    }
40}
41
42#[async_trait]
43impl WsExchange for Bitget {
44    // ==================== Connection Management ====================
45
46    async fn ws_connect(&self) -> Result<()> {
47        let ws = self.create_ws();
48        ws.connect().await?;
49        Ok(())
50    }
51
52    async fn ws_disconnect(&self) -> Result<()> {
53        let ws = self.create_ws();
54        ws.disconnect().await
55    }
56
57    fn ws_is_connected(&self) -> bool {
58        // Without persistent state tracking, return false
59        // In a full implementation, we'd track the active connection
60        false
61    }
62
63    fn ws_state(&self) -> WsConnectionState {
64        // Return disconnected since we don't have persistent state
65        WsConnectionState::Disconnected
66    }
67
68    // ==================== Public Data Streams ====================
69
70    async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
71        let ws = self.create_ws();
72
73        // Convert unified symbol (BTC/USDT) to Bitget format (BTCUSDT)
74        let bitget_symbol = symbol.replace('/', "");
75
76        // Try to get market info if available (optional)
77        let market = self.market(symbol).await.ok().map(|m| (*m).clone());
78
79        ws.watch_ticker(&bitget_symbol, market).await
80    }
81
82    async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
83        let ws = self.create_ws();
84
85        // Convert unified symbols (BTC/USDT) to Bitget format (BTCUSDT)
86        let bitget_symbols: Vec<String> = symbols.iter().map(|s| s.replace('/', "")).collect();
87
88        ws.watch_tickers(&bitget_symbols).await
89    }
90
91    async fn watch_order_book(
92        &self,
93        symbol: &str,
94        limit: Option<u32>,
95    ) -> Result<MessageStream<OrderBook>> {
96        let ws = self.create_ws();
97
98        // Convert unified symbol (BTC/USDT) to Bitget format (BTCUSDT)
99        let bitget_symbol = symbol.replace('/', "");
100
101        ws.watch_order_book(&bitget_symbol, limit).await
102    }
103
104    async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
105        let ws = self.create_ws();
106
107        // Convert unified symbol (BTC/USDT) to Bitget format (BTCUSDT)
108        let bitget_symbol = symbol.replace('/', "");
109
110        // Try to get market info if available (optional)
111        let market = self.market(symbol).await.ok().map(|m| (*m).clone());
112
113        ws.watch_trades(&bitget_symbol, market).await
114    }
115
116    async fn watch_ohlcv(
117        &self,
118        _symbol: &str,
119        _timeframe: Timeframe,
120    ) -> Result<MessageStream<Ohlcv>> {
121        Err(Error::not_implemented(
122            "watch_ohlcv not yet implemented for Bitget",
123        ))
124    }
125
126    // ==================== Private Data Streams ====================
127
128    async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
129        Err(Error::not_implemented(
130            "watch_balance not yet implemented for Bitget",
131        ))
132    }
133
134    async fn watch_orders(&self, _symbol: Option<&str>) -> Result<MessageStream<Order>> {
135        Err(Error::not_implemented(
136            "watch_orders not yet implemented for Bitget",
137        ))
138    }
139
140    async fn watch_my_trades(&self, _symbol: Option<&str>) -> Result<MessageStream<Trade>> {
141        Err(Error::not_implemented(
142            "watch_my_trades not yet implemented for Bitget",
143        ))
144    }
145
146    // ==================== Subscription Management ====================
147
148    async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
149        Err(Error::not_implemented(
150            "subscribe not yet implemented for Bitget",
151        ))
152    }
153
154    async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
155        Err(Error::not_implemented(
156            "unsubscribe not yet implemented for Bitget",
157        ))
158    }
159
160    fn subscriptions(&self) -> Vec<String> {
161        Vec::new()
162    }
163}