ccxt_exchanges/okx/
ws_exchange_impl.rs1use async_trait::async_trait;
7use ccxt_core::{
8 error::{Error, Result},
9 types::{Balance, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
10 ws_client::WsConnectionState,
11 ws_exchange::{MessageStream, WsExchange},
12};
13
14use super::Okx;
15
16#[async_trait]
17impl WsExchange for Okx {
18 async fn ws_connect(&self) -> Result<()> {
21 let ws = self.create_ws();
22 ws.connect().await?;
23 Ok(())
24 }
25
26 async fn ws_disconnect(&self) -> Result<()> {
27 let ws = self.create_ws();
28 ws.disconnect().await
29 }
30
31 fn ws_is_connected(&self) -> bool {
32 false
35 }
36
37 fn ws_state(&self) -> WsConnectionState {
38 WsConnectionState::Disconnected
40 }
41
42 async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
45 let ws = self.create_ws();
46
47 let okx_symbol = symbol.replace('/', "-");
49
50 let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
52
53 ws.watch_ticker(&okx_symbol, market).await
54 }
55
56 async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
57 let ws = self.create_ws();
58
59 let okx_symbols: Vec<String> = symbols.iter().map(|s| s.replace('/', "-")).collect();
61
62 ws.watch_tickers(&okx_symbols).await
63 }
64
65 async fn watch_order_book(
66 &self,
67 symbol: &str,
68 limit: Option<u32>,
69 ) -> Result<MessageStream<OrderBook>> {
70 let ws = self.create_ws();
71
72 let okx_symbol = symbol.replace('/', "-");
74
75 ws.watch_order_book(&okx_symbol, limit).await
76 }
77
78 async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
79 let ws = self.create_ws();
80
81 let okx_symbol = symbol.replace('/', "-");
83
84 let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
86
87 ws.watch_trades(&okx_symbol, market).await
88 }
89
90 async fn watch_ohlcv(
91 &self,
92 _symbol: &str,
93 _timeframe: Timeframe,
94 ) -> Result<MessageStream<Ohlcv>> {
95 Err(Error::not_implemented(
96 "watch_ohlcv not yet implemented for OKX",
97 ))
98 }
99
100 async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
103 let ws = self.create_private_ws();
104 let auth = self.get_auth()?;
105 ws.login(&auth).await?;
106 ws.watch_balance().await
107 }
108
109 async fn watch_orders(&self, symbol: Option<&str>) -> Result<MessageStream<Order>> {
110 let ws = self.create_private_ws();
111 let auth = self.get_auth()?;
112 ws.login(&auth).await?;
113 ws.watch_orders(symbol.map(ToString::to_string)).await
114 }
115
116 async fn watch_my_trades(&self, symbol: Option<&str>) -> Result<MessageStream<Trade>> {
117 let ws = self.create_private_ws();
118 let auth = self.get_auth()?;
119 ws.login(&auth).await?;
120 ws.watch_my_trades(symbol.map(ToString::to_string)).await
121 }
122
123 async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
126 Err(Error::not_implemented(
127 "subscribe not yet implemented for OKX",
128 ))
129 }
130
131 async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
132 Err(Error::not_implemented(
133 "unsubscribe not yet implemented for OKX",
134 ))
135 }
136
137 fn subscriptions(&self) -> Vec<String> {
138 Vec::new()
139 }
140}