Skip to main content

ccxt_exchanges/okx/
ws_exchange_impl.rs

//! WsExchange trait implementation for OKX
//!
//! This module implements the unified `WsExchange` trait from `ccxt-core` for OKX,
//! providing real-time WebSocket data streaming capabilities.
5
6use async_trait::async_trait;
7use ccxt_core::{
8    error::{Error, Result},
9    types::{Balance, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
10    ws_client::WsConnectionState,
11    ws_exchange::{MessageStream, WsExchange},
12};
13
14use super::Okx;
15
16#[async_trait]
17impl WsExchange for Okx {
18    // ==================== Connection Management ====================
19
20    async fn ws_connect(&self) -> Result<()> {
21        let ws = self.create_ws();
22        ws.connect().await?;
23        Ok(())
24    }
25
26    async fn ws_disconnect(&self) -> Result<()> {
27        let ws = self.create_ws();
28        ws.disconnect().await
29    }
30
31    fn ws_is_connected(&self) -> bool {
32        // Without persistent state tracking, return false
33        // In a full implementation, we'd track the active connection
34        false
35    }
36
37    fn ws_state(&self) -> WsConnectionState {
38        // Return disconnected since we don't have persistent state
39        WsConnectionState::Disconnected
40    }
41
42    // ==================== Public Data Streams ====================
43
44    async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
45        let ws = self.create_ws();
46
47        // Convert unified symbol (BTC/USDT) to OKX format (BTC-USDT)
48        let okx_symbol = symbol.replace('/', "-");
49
50        // Try to get market info if available (optional)
51        let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
52
53        ws.watch_ticker(&okx_symbol, market).await
54    }
55
56    async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
57        let ws = self.create_ws();
58
59        // Convert unified symbols (BTC/USDT) to OKX format (BTC-USDT)
60        let okx_symbols: Vec<String> = symbols.iter().map(|s| s.replace('/', "-")).collect();
61
62        ws.watch_tickers(&okx_symbols).await
63    }
64
65    async fn watch_order_book(
66        &self,
67        symbol: &str,
68        limit: Option<u32>,
69    ) -> Result<MessageStream<OrderBook>> {
70        let ws = self.create_ws();
71
72        // Convert unified symbol (BTC/USDT) to OKX format (BTC-USDT)
73        let okx_symbol = symbol.replace('/', "-");
74
75        ws.watch_order_book(&okx_symbol, limit).await
76    }
77
78    async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
79        let ws = self.create_ws();
80
81        // Convert unified symbol (BTC/USDT) to OKX format (BTC-USDT)
82        let okx_symbol = symbol.replace('/', "-");
83
84        // Try to get market info if available (optional)
85        let market = self.base().market(symbol).await.ok().map(|m| (*m).clone());
86
87        ws.watch_trades(&okx_symbol, market).await
88    }
89
90    async fn watch_ohlcv(
91        &self,
92        _symbol: &str,
93        _timeframe: Timeframe,
94    ) -> Result<MessageStream<Ohlcv>> {
95        Err(Error::not_implemented(
96            "watch_ohlcv not yet implemented for OKX",
97        ))
98    }
99
100    // ==================== Private Data Streams ====================
101
102    async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
103        let ws = self.create_private_ws();
104        let auth = self.get_auth()?;
105        ws.login(&auth).await?;
106        ws.watch_balance().await
107    }
108
109    async fn watch_orders(&self, symbol: Option<&str>) -> Result<MessageStream<Order>> {
110        let ws = self.create_private_ws();
111        let auth = self.get_auth()?;
112        ws.login(&auth).await?;
113        ws.watch_orders(symbol.map(ToString::to_string)).await
114    }
115
116    async fn watch_my_trades(&self, symbol: Option<&str>) -> Result<MessageStream<Trade>> {
117        let ws = self.create_private_ws();
118        let auth = self.get_auth()?;
119        ws.login(&auth).await?;
120        ws.watch_my_trades(symbol.map(ToString::to_string)).await
121    }
122
123    // ==================== Subscription Management ====================
124
125    async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
126        Err(Error::not_implemented(
127            "subscribe not yet implemented for OKX",
128        ))
129    }
130
131    async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
132        Err(Error::not_implemented(
133            "unsubscribe not yet implemented for OKX",
134        ))
135    }
136
137    fn subscriptions(&self) -> Vec<String> {
138        Vec::new()
139    }
140}