ccxt_exchanges/bitget/
ws_exchange_impl.rs1use async_trait::async_trait;
7use ccxt_core::{
8 error::{Error, Result},
9 exchange::Exchange,
10 types::{Balance, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
11 ws_client::WsConnectionState,
12 ws_exchange::{MessageStream, WsExchange},
13};
14
15use tokio::sync::mpsc;
16
17use super::Bitget;
18
19struct ReceiverStream<T> {
21 receiver: mpsc::UnboundedReceiver<T>,
22}
23
24impl<T> ReceiverStream<T> {
25 #[allow(dead_code)]
26 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
27 Self { receiver }
28 }
29}
30
31impl<T> futures::Stream for ReceiverStream<T> {
32 type Item = T;
33
34 fn poll_next(
35 mut self: std::pin::Pin<&mut Self>,
36 cx: &mut std::task::Context<'_>,
37 ) -> std::task::Poll<Option<Self::Item>> {
38 self.receiver.poll_recv(cx)
39 }
40}
41
42#[async_trait]
43impl WsExchange for Bitget {
44 async fn ws_connect(&self) -> Result<()> {
47 let ws = self.create_ws();
48 ws.connect().await?;
49 Ok(())
50 }
51
52 async fn ws_disconnect(&self) -> Result<()> {
53 let ws = self.create_ws();
54 ws.disconnect().await
55 }
56
57 fn ws_is_connected(&self) -> bool {
58 false
61 }
62
63 fn ws_state(&self) -> WsConnectionState {
64 WsConnectionState::Disconnected
66 }
67
68 async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
71 let ws = self.create_ws();
72
73 let bitget_symbol = symbol.replace('/', "");
75
76 let market = self.market(symbol).await.ok().map(|m| (*m).clone());
78
79 ws.watch_ticker(&bitget_symbol, market).await
80 }
81
82 async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
83 let ws = self.create_ws();
84
85 let bitget_symbols: Vec<String> = symbols.iter().map(|s| s.replace('/', "")).collect();
87
88 ws.watch_tickers(&bitget_symbols).await
89 }
90
91 async fn watch_order_book(
92 &self,
93 symbol: &str,
94 limit: Option<u32>,
95 ) -> Result<MessageStream<OrderBook>> {
96 let ws = self.create_ws();
97
98 let bitget_symbol = symbol.replace('/', "");
100
101 ws.watch_order_book(&bitget_symbol, limit).await
102 }
103
104 async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
105 let ws = self.create_ws();
106
107 let bitget_symbol = symbol.replace('/', "");
109
110 let market = self.market(symbol).await.ok().map(|m| (*m).clone());
112
113 ws.watch_trades(&bitget_symbol, market).await
114 }
115
116 async fn watch_ohlcv(
117 &self,
118 _symbol: &str,
119 _timeframe: Timeframe,
120 ) -> Result<MessageStream<Ohlcv>> {
121 Err(Error::not_implemented(
122 "watch_ohlcv not yet implemented for Bitget",
123 ))
124 }
125
126 async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
129 let ws = self.create_private_ws();
130 let auth = self.get_auth()?;
131 ws.login(&auth).await?;
132 ws.watch_balance().await
133 }
134
135 async fn watch_orders(&self, symbol: Option<&str>) -> Result<MessageStream<Order>> {
136 let ws = self.create_private_ws();
137 let auth = self.get_auth()?;
138 ws.login(&auth).await?;
139 ws.watch_orders(symbol.map(ToString::to_string)).await
140 }
141
142 async fn watch_my_trades(&self, symbol: Option<&str>) -> Result<MessageStream<Trade>> {
143 let ws = self.create_private_ws();
144 let auth = self.get_auth()?;
145 ws.login(&auth).await?;
146 ws.watch_my_trades(symbol.map(ToString::to_string)).await
147 }
148
149 async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
152 Err(Error::not_implemented(
153 "subscribe not yet implemented for Bitget",
154 ))
155 }
156
157 async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
158 Err(Error::not_implemented(
159 "unsubscribe not yet implemented for Bitget",
160 ))
161 }
162
163 fn subscriptions(&self) -> Vec<String> {
164 Vec::new()
165 }
166}