Skip to main content

ccxt_exchanges/okx/
ws.rs

1//! OKX WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for OKX exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (balance, orders) with automatic reconnection.
6
7use crate::okx::auth::OkxAuth;
8use crate::okx::parser::{parse_orderbook, parse_ticker, parse_trade};
9use ccxt_core::error::{Error, Result};
10use ccxt_core::types::{
11    Balance, BalanceEntry, Fee, Market, Order, OrderBook, OrderSide, OrderStatus, OrderType,
12    Ticker, Trade,
13    financial::{Amount, Cost, Price},
14};
15use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
16use ccxt_core::ws_exchange::MessageStream;
17use futures::Stream;
18use rust_decimal::Decimal;
19use rust_decimal::prelude::FromStr;
20use serde_json::Value;
21use std::collections::HashMap;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll};
25use tokio::sync::{RwLock, mpsc};
26use tracing::{debug, warn};
27
/// Keep-alive ping period in milliseconds.
///
/// Set to 25 s so pings stay inside the 30 s window OKX requires, with a
/// small safety margin.
const DEFAULT_PING_INTERVAL_MS: u64 = 25_000;

/// Delay between reconnect attempts, in milliseconds (5 s).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Upper bound on the number of reconnect attempts.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
37
38/// OKX WebSocket client.
39///
40/// Provides real-time data streaming for OKX exchange.
41pub struct OkxWs {
42    /// WebSocket client instance.
43    client: Arc<WsClient>,
44    /// Active subscriptions.
45    subscriptions: Arc<RwLock<Vec<String>>>,
46}
47
48impl OkxWs {
49    /// Creates a new OKX WebSocket client.
50    ///
51    /// # Arguments
52    ///
53    /// * `url` - WebSocket server URL
54    pub fn new(url: String) -> Self {
55        let config = WsConfig {
56            url: url.clone(),
57            connect_timeout: 10000,
58            ping_interval: DEFAULT_PING_INTERVAL_MS,
59            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
60            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
61            auto_reconnect: true,
62            enable_compression: false,
63            pong_timeout: 90000,
64            ..Default::default()
65        };
66
67        Self {
68            client: Arc::new(WsClient::new(config)),
69            subscriptions: Arc::new(RwLock::new(Vec::new())),
70        }
71    }
72
73    /// Connects to the WebSocket server.
74    pub async fn connect(&self) -> Result<()> {
75        self.client.connect().await
76    }
77
78    /// Disconnects from the WebSocket server.
79    pub async fn disconnect(&self) -> Result<()> {
80        self.client.disconnect().await
81    }
82
83    /// Returns the current connection state.
84    pub fn state(&self) -> WsConnectionState {
85        self.client.state()
86    }
87
88    /// Checks if the WebSocket is connected.
89    pub fn is_connected(&self) -> bool {
90        self.client.is_connected()
91    }
92
93    /// Receives the next message from the WebSocket.
94    pub async fn receive(&self) -> Option<Value> {
95        self.client.receive().await
96    }
97
98    /// Subscribes to a ticker stream.
99    ///
100    /// # Arguments
101    ///
102    /// * `symbol` - Trading pair symbol in OKX format (e.g., "BTC-USDT")
103    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
104        // OKX V5 WebSocket subscription format
105        // json! macro with literal values is infallible
106        #[allow(clippy::disallowed_methods)]
107        let msg = serde_json::json!({
108            "op": "subscribe",
109            "args": [{
110                "channel": "tickers",
111                "instId": symbol
112            }]
113        });
114
115        self.client.send_json(&msg).await?;
116
117        let sub_key = format!("ticker:{}", symbol);
118        self.subscriptions.write().await.push(sub_key);
119
120        Ok(())
121    }
122
123    /// Subscribes to multiple ticker streams.
124    ///
125    /// # Arguments
126    ///
127    /// * `symbols` - List of trading pair symbols (e.g., `["BTC-USDT", "ETH-USDT"]`)
128    pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
129        let mut args = Vec::new();
130        for symbol in symbols {
131            let mut arg_map = serde_json::Map::new();
132            arg_map.insert(
133                "channel".to_string(),
134                serde_json::Value::String("tickers".to_string()),
135            );
136            arg_map.insert(
137                "instId".to_string(),
138                serde_json::Value::String(symbol.clone()),
139            );
140            args.push(serde_json::Value::Object(arg_map));
141        }
142
143        // json! macro with literal values is infallible
144        #[allow(clippy::disallowed_methods)]
145        let msg = serde_json::json!({
146            "op": "subscribe",
147            "args": args
148        });
149
150        self.client.send_json(&msg).await?;
151
152        let mut subs = self.subscriptions.write().await;
153        for symbol in symbols {
154            subs.push(format!("ticker:{}", symbol));
155        }
156
157        Ok(())
158    }
159
160    /// Watches ticker updates for multiple symbols.
161    ///
162    /// Returns a stream of `Vec<Ticker>` updates for the specified symbols.
163    ///
164    /// # Arguments
165    ///
166    /// * `symbols` - List of trading pair symbols (e.g., `["BTC-USDT", "ETH-USDT"]`)
167    ///
168    /// # Returns
169    ///
170    /// A `MessageStream<Vec<Ticker>>` that yields ticker updates.
171    pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
172        // Ensure connected
173        if !self.is_connected() {
174            self.connect().await?;
175        }
176
177        // Subscribe to ticker channels
178        self.subscribe_tickers(symbols).await?;
179
180        // Create channel for ticker updates
181        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
182        let symbols_owned: Vec<String> = symbols.to_vec();
183        let client = Arc::clone(&self.client);
184
185        // Spawn task to process messages and filter ticker updates
186        tokio::spawn(async move {
187            while let Some(msg) = client.receive().await {
188                // Check if this is a ticker message for ANY of our symbols
189                if let Some(arg) = msg.get("arg") {
190                    let channel = arg.get("channel").and_then(|c| c.as_str());
191                    let inst_id = arg.get("instId").and_then(|i| i.as_str());
192
193                    if channel == Some("tickers") {
194                        if let Some(id) = inst_id {
195                            let has_data = msg
196                                .get("data")
197                                .and_then(|d| d.as_array())
198                                .and_then(|arr| arr.first())
199                                .is_some();
200                            if has_data && symbols_owned.iter().any(|s| s == id) {
201                                match parse_ws_ticker(&msg, None) {
202                                    Ok(ticker) => {
203                                        if tx.send(Ok(vec![ticker])).is_err() {
204                                            break; // Receiver dropped
205                                        }
206                                    }
207                                    Err(e) => {
208                                        if tx.send(Err(e)).is_err() {
209                                            break;
210                                        }
211                                    }
212                                }
213                            }
214                        }
215                    }
216                }
217            }
218        });
219
220        Ok(Box::pin(ReceiverStream::new(rx)))
221    }
222    /// * `depth` - Orderbook depth (5, 50, or 400)
223    pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
224        // OKX supports orderbook depths: books5, books, books50-l2
225        let channel = match depth {
226            d if d <= 5 => "books5",
227            d if d <= 50 => "books50-l2",
228            _ => "books",
229        };
230
231        // json! macro with literal values is infallible
232        #[allow(clippy::disallowed_methods)]
233        let msg = serde_json::json!({
234            "op": "subscribe",
235            "args": [{
236                "channel": channel,
237                "instId": symbol
238            }]
239        });
240
241        self.client.send_json(&msg).await?;
242
243        let sub_key = format!("orderbook:{}", symbol);
244        self.subscriptions.write().await.push(sub_key);
245
246        Ok(())
247    }
248
249    /// Subscribes to a trades stream.
250    ///
251    /// # Arguments
252    ///
253    /// * `symbol` - Trading pair symbol in OKX format (e.g., "BTC-USDT")
254    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
255        // json! macro with literal values is infallible
256        #[allow(clippy::disallowed_methods)]
257        let msg = serde_json::json!({
258            "op": "subscribe",
259            "args": [{
260                "channel": "trades",
261                "instId": symbol
262            }]
263        });
264
265        self.client.send_json(&msg).await?;
266
267        let sub_key = format!("trades:{}", symbol);
268        self.subscriptions.write().await.push(sub_key);
269
270        Ok(())
271    }
272
273    /// Subscribes to a kline/candlestick stream.
274    ///
275    /// # Arguments
276    ///
277    /// * `symbol` - Trading pair symbol in OKX format (e.g., "BTC-USDT")
278    /// * `interval` - Kline interval (e.g., "1m", "5m", "1H", "1D")
279    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
280        let channel = format!("candle{}", interval);
281
282        // json! macro with literal values is infallible
283        #[allow(clippy::disallowed_methods)]
284        let msg = serde_json::json!({
285            "op": "subscribe",
286            "args": [{
287                "channel": channel,
288                "instId": symbol
289            }]
290        });
291
292        self.client.send_json(&msg).await?;
293
294        let sub_key = format!("kline:{}:{}", symbol, interval);
295        self.subscriptions.write().await.push(sub_key);
296
297        Ok(())
298    }
299
300    /// Unsubscribes from a stream.
301    ///
302    /// # Arguments
303    ///
304    /// * `stream_name` - Stream identifier to unsubscribe from
305    pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
306        // Parse stream name to determine channel and symbol
307        let parts: Vec<&str> = stream_name.split(':').collect();
308        if parts.len() < 2 {
309            return Err(Error::invalid_request(format!(
310                "Invalid stream name: {}",
311                stream_name
312            )));
313        }
314
315        let channel_type = parts[0];
316        let symbol = parts[1];
317
318        let channel = match channel_type {
319            "ticker" => "tickers".to_string(),
320            "orderbook" => "books5".to_string(),
321            "trades" => "trades".to_string(),
322            "kline" => {
323                if parts.len() >= 3 {
324                    format!("candle{}", parts[2])
325                } else {
326                    return Err(Error::invalid_request(
327                        "Kline unsubscribe requires interval",
328                    ));
329                }
330            }
331            _ => {
332                return Err(Error::invalid_request(format!(
333                    "Unknown channel: {}",
334                    channel_type
335                )));
336            }
337        };
338
339        // json! macro with literal values is infallible
340        #[allow(clippy::disallowed_methods)]
341        let msg = serde_json::json!({
342            "op": "unsubscribe",
343            "args": [{
344                "channel": channel,
345                "instId": symbol
346            }]
347        });
348
349        self.client.send_json(&msg).await?;
350
351        // Remove from subscriptions
352        let mut subs = self.subscriptions.write().await;
353        subs.retain(|s| s != &stream_name);
354
355        Ok(())
356    }
357
358    /// Returns the list of active subscriptions.
359    pub async fn subscriptions(&self) -> Vec<String> {
360        self.subscriptions.read().await.clone()
361    }
362
363    /// Watches ticker updates for a symbol.
364    ///
365    /// Returns a stream of `Ticker` updates for the specified symbol.
366    ///
367    /// # Arguments
368    ///
369    /// * `symbol` - Trading pair symbol in OKX format (e.g., "BTC-USDT")
370    /// * `market` - Optional market information for symbol resolution
371    ///
372    /// # Returns
373    ///
374    /// A `MessageStream<Ticker>` that yields ticker updates.
375    pub async fn watch_ticker(
376        &self,
377        symbol: &str,
378        market: Option<Market>,
379    ) -> Result<MessageStream<Ticker>> {
380        // Ensure connected
381        if !self.is_connected() {
382            self.connect().await?;
383        }
384
385        // Subscribe to ticker channel
386        self.subscribe_ticker(symbol).await?;
387
388        // Create channel for ticker updates
389        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
390        let symbol_owned = symbol.to_string();
391        let client = Arc::clone(&self.client);
392
393        // Spawn task to process messages and filter ticker updates
394        tokio::spawn(async move {
395            while let Some(msg) = client.receive().await {
396                // Check if this is a ticker message for our symbol
397                if is_ticker_message(&msg, &symbol_owned) {
398                    match parse_ws_ticker(&msg, market.as_ref()) {
399                        Ok(ticker) => {
400                            if tx.send(Ok(ticker)).is_err() {
401                                break; // Receiver dropped
402                            }
403                        }
404                        Err(e) => {
405                            if tx.send(Err(e)).is_err() {
406                                break;
407                            }
408                        }
409                    }
410                }
411            }
412        });
413
414        Ok(Box::pin(ReceiverStream::new(rx)))
415    }
416
417    /// Watches order book updates for a symbol.
418    ///
419    /// Returns a stream of `OrderBook` updates for the specified symbol.
420    ///
421    /// # Arguments
422    ///
423    /// * `symbol` - Trading pair symbol in OKX format (e.g., "BTC-USDT")
424    /// * `limit` - Optional depth limit (5, 50, or 400)
425    ///
426    /// # Returns
427    ///
428    /// A `MessageStream<OrderBook>` that yields order book updates.
429    pub async fn watch_order_book(
430        &self,
431        symbol: &str,
432        limit: Option<u32>,
433    ) -> Result<MessageStream<OrderBook>> {
434        // Ensure connected
435        if !self.is_connected() {
436            self.connect().await?;
437        }
438
439        // Subscribe to orderbook channel
440        let depth = limit.unwrap_or(5);
441        self.subscribe_orderbook(symbol, depth).await?;
442
443        // Create channel for orderbook updates
444        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
445        let symbol_owned = symbol.to_string();
446        let unified_symbol = format_unified_symbol(&symbol_owned);
447        let client = Arc::clone(&self.client);
448
449        // Spawn task to process messages and filter orderbook updates
450        tokio::spawn(async move {
451            while let Some(msg) = client.receive().await {
452                // Check if this is an orderbook message for our symbol
453                if is_orderbook_message(&msg, &symbol_owned) {
454                    match parse_ws_orderbook(&msg, unified_symbol.clone()) {
455                        Ok(orderbook) => {
456                            if tx.send(Ok(orderbook)).is_err() {
457                                break; // Receiver dropped
458                            }
459                        }
460                        Err(e) => {
461                            if tx.send(Err(e)).is_err() {
462                                break;
463                            }
464                        }
465                    }
466                }
467            }
468        });
469
470        Ok(Box::pin(ReceiverStream::new(rx)))
471    }
472
473    /// Watches trade updates for a symbol.
474    ///
475    /// Returns a stream of `Trade` updates for the specified symbol.
476    ///
477    /// # Arguments
478    ///
479    /// * `symbol` - Trading pair symbol in OKX format (e.g., "BTC-USDT")
480    /// * `market` - Optional market information for symbol resolution
481    ///
482    /// # Returns
483    ///
484    /// A `MessageStream<Vec<Trade>>` that yields trade updates.
485    pub async fn watch_trades(
486        &self,
487        symbol: &str,
488        market: Option<Market>,
489    ) -> Result<MessageStream<Vec<Trade>>> {
490        // Ensure connected
491        if !self.is_connected() {
492            self.connect().await?;
493        }
494
495        // Subscribe to trades channel
496        self.subscribe_trades(symbol).await?;
497
498        // Create channel for trade updates
499        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
500        let symbol_owned = symbol.to_string();
501        let client = Arc::clone(&self.client);
502
503        // Spawn task to process messages and filter trade updates
504        tokio::spawn(async move {
505            while let Some(msg) = client.receive().await {
506                // Check if this is a trade message for our symbol
507                if is_trade_message(&msg, &symbol_owned) {
508                    match parse_ws_trades(&msg, market.as_ref()) {
509                        Ok(trades) => {
510                            if tx.send(Ok(trades)).is_err() {
511                                break; // Receiver dropped
512                            }
513                        }
514                        Err(e) => {
515                            if tx.send(Err(e)).is_err() {
516                                break;
517                            }
518                        }
519                    }
520                }
521            }
522        });
523
524        Ok(Box::pin(ReceiverStream::new(rx)))
525    }
526
527    // ========================================================================
528    // Private WebSocket Methods
529    // ========================================================================
530
531    /// Sends a login message to authenticate the private WebSocket connection.
532    ///
533    /// OKX WebSocket login format:
534    /// ```json
535    /// {
536    ///   "op": "login",
537    ///   "args": [{
538    ///     "apiKey": "...",
539    ///     "passphrase": "...",
540    ///     "timestamp": "...",
541    ///     "sign": "..."
542    ///   }]
543    /// }
544    /// ```
545    ///
546    /// The signature is: HMAC-SHA256(timestamp + "GET" + "/users/self/verify", secret)
547    pub async fn login(&self, auth: &OkxAuth) -> Result<()> {
548        let timestamp = (chrono::Utc::now().timestamp_millis() / 1000).to_string();
549        let sign = auth.sign(&timestamp, "GET", "/users/self/verify", "");
550
551        #[allow(clippy::disallowed_methods)]
552        let msg = serde_json::json!({
553            "op": "login",
554            "args": [{
555                "apiKey": auth.api_key(),
556                "passphrase": auth.passphrase(),
557                "timestamp": timestamp,
558                "sign": sign
559            }]
560        });
561
562        self.client.send_json(&msg).await?;
563
564        // Wait for login response
565        let timeout = tokio::time::Duration::from_secs(10);
566        let start = tokio::time::Instant::now();
567
568        while start.elapsed() < timeout {
569            if let Some(resp) = self.client.receive().await {
570                if let Some(event) = resp.get("event").and_then(|e| e.as_str()) {
571                    if event == "login" {
572                        let code = resp.get("code").and_then(|c| c.as_str()).unwrap_or("1");
573                        if code == "0" {
574                            debug!("OKX WebSocket login successful");
575                            return Ok(());
576                        }
577                        let msg_text = resp
578                            .get("msg")
579                            .and_then(|m| m.as_str())
580                            .unwrap_or("Unknown error");
581                        return Err(Error::authentication(format!(
582                            "OKX WebSocket login failed: {} (code: {})",
583                            msg_text, code
584                        )));
585                    }
586                }
587            }
588        }
589
590        Err(Error::authentication(
591            "OKX WebSocket login timed out waiting for response",
592        ))
593    }
594
595    /// Subscribes to the orders channel (private).
596    ///
597    /// Receives real-time order updates for all instrument types.
598    pub async fn subscribe_orders(&self, inst_type: Option<&str>) -> Result<()> {
599        let mut arg = serde_json::Map::new();
600        arg.insert("channel".to_string(), Value::String("orders".to_string()));
601        arg.insert(
602            "instType".to_string(),
603            Value::String(inst_type.unwrap_or("ANY").to_string()),
604        );
605
606        #[allow(clippy::disallowed_methods)]
607        let msg = serde_json::json!({
608            "op": "subscribe",
609            "args": [Value::Object(arg)]
610        });
611
612        self.client.send_json(&msg).await?;
613
614        self.subscriptions.write().await.push("orders".to_string());
615
616        Ok(())
617    }
618
619    /// Subscribes to the account channel (private).
620    ///
621    /// Receives real-time balance/account updates.
622    pub async fn subscribe_account(&self) -> Result<()> {
623        #[allow(clippy::disallowed_methods)]
624        let msg = serde_json::json!({
625            "op": "subscribe",
626            "args": [{
627                "channel": "account"
628            }]
629        });
630
631        self.client.send_json(&msg).await?;
632
633        self.subscriptions.write().await.push("account".to_string());
634
635        Ok(())
636    }
637
638    /// Watches balance updates via the private WebSocket.
639    ///
640    /// Requires prior authentication via `login()`.
641    pub async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
642        if !self.is_connected() {
643            self.connect().await?;
644        }
645
646        self.subscribe_account().await?;
647
648        let (tx, rx) = mpsc::unbounded_channel::<Result<Balance>>();
649        let client = Arc::clone(&self.client);
650
651        tokio::spawn(async move {
652            while let Some(msg) = client.receive().await {
653                if is_account_message(&msg) {
654                    match parse_ws_balance(&msg) {
655                        Ok(balance) => {
656                            if tx.send(Ok(balance)).is_err() {
657                                break;
658                            }
659                        }
660                        Err(e) => {
661                            warn!(error = %e, "Failed to parse OKX balance update");
662                            if tx.send(Err(e)).is_err() {
663                                break;
664                            }
665                        }
666                    }
667                }
668            }
669        });
670
671        Ok(Box::pin(ReceiverStream::new(rx)))
672    }
673
674    /// Watches order updates via the private WebSocket.
675    ///
676    /// Requires prior authentication via `login()`.
677    pub async fn watch_orders(
678        &self,
679        symbol_filter: Option<String>,
680    ) -> Result<MessageStream<Order>> {
681        if !self.is_connected() {
682            self.connect().await?;
683        }
684
685        self.subscribe_orders(None).await?;
686
687        let (tx, rx) = mpsc::unbounded_channel::<Result<Order>>();
688        let client = Arc::clone(&self.client);
689
690        tokio::spawn(async move {
691            while let Some(msg) = client.receive().await {
692                if is_orders_message(&msg) {
693                    if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
694                        for data in data_array {
695                            // Apply symbol filter if specified
696                            if let Some(ref filter) = symbol_filter {
697                                let inst_id =
698                                    data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
699                                let unified = inst_id.replace('-', "/");
700                                if unified != *filter && inst_id != filter.as_str() {
701                                    continue;
702                                }
703                            }
704
705                            match parse_ws_order(data) {
706                                Ok(order) => {
707                                    if tx.send(Ok(order)).is_err() {
708                                        return;
709                                    }
710                                }
711                                Err(e) => {
712                                    warn!(error = %e, "Failed to parse OKX order update");
713                                    if tx.send(Err(e)).is_err() {
714                                        return;
715                                    }
716                                }
717                            }
718                        }
719                    }
720                }
721            }
722        });
723
724        Ok(Box::pin(ReceiverStream::new(rx)))
725    }
726
727    /// Watches trade fills via the private WebSocket (orders channel).
728    ///
729    /// Extracts trade information from order execution reports.
730    /// Requires prior authentication via `login()`.
731    pub async fn watch_my_trades(
732        &self,
733        symbol_filter: Option<String>,
734    ) -> Result<MessageStream<Trade>> {
735        if !self.is_connected() {
736            self.connect().await?;
737        }
738
739        self.subscribe_orders(None).await?;
740
741        let (tx, rx) = mpsc::unbounded_channel::<Result<Trade>>();
742        let client = Arc::clone(&self.client);
743
744        tokio::spawn(async move {
745            while let Some(msg) = client.receive().await {
746                if is_orders_message(&msg) {
747                    if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
748                        for data in data_array {
749                            // Only emit trades for filled/partially filled orders
750                            let fill_sz = data
751                                .get("fillSz")
752                                .and_then(|v| v.as_str())
753                                .and_then(|s| s.parse::<f64>().ok())
754                                .unwrap_or(0.0);
755
756                            if fill_sz <= 0.0 {
757                                continue;
758                            }
759
760                            // Apply symbol filter
761                            if let Some(ref filter) = symbol_filter {
762                                let inst_id =
763                                    data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
764                                let unified = inst_id.replace('-', "/");
765                                if unified != *filter && inst_id != filter.as_str() {
766                                    continue;
767                                }
768                            }
769
770                            match parse_ws_my_trade(data) {
771                                Ok(trade) => {
772                                    if tx.send(Ok(trade)).is_err() {
773                                        return;
774                                    }
775                                }
776                                Err(e) => {
777                                    warn!(error = %e, "Failed to parse OKX trade update");
778                                    if tx.send(Err(e)).is_err() {
779                                        return;
780                                    }
781                                }
782                            }
783                        }
784                    }
785                }
786            }
787        });
788
789        Ok(Box::pin(ReceiverStream::new(rx)))
790    }
791}
792
793// ============================================================================
794// Stream Wrapper
795// ============================================================================
796
797/// A stream wrapper that converts an mpsc receiver into a futures Stream.
798struct ReceiverStream<T> {
799    receiver: mpsc::UnboundedReceiver<T>,
800}
801
802impl<T> ReceiverStream<T> {
803    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
804        Self { receiver }
805    }
806}
807
808impl<T> Stream for ReceiverStream<T> {
809    type Item = T;
810
811    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
812        self.receiver.poll_recv(cx)
813    }
814}
815
816// ============================================================================
817// Message Type Detection Helpers
818// ============================================================================
819
820/// Check if a WebSocket message is a ticker message for the given symbol.
821fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
822    // OKX V5 WebSocket ticker format:
823    // {"arg":{"channel":"tickers","instId":"BTC-USDT"},"data":[{...}]}
824    if let Some(arg) = msg.get("arg") {
825        let channel = arg.get("channel").and_then(|c| c.as_str());
826        let inst_id = arg.get("instId").and_then(|i| i.as_str());
827        let has_data = msg
828            .get("data")
829            .and_then(|d| d.as_array())
830            .and_then(|arr| arr.first())
831            .is_some();
832        channel == Some("tickers") && inst_id == Some(symbol) && has_data
833    } else {
834        false
835    }
836}
837
838/// Check if a WebSocket message is an orderbook message for the given symbol.
839fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
840    // OKX V5 WebSocket orderbook format:
841    // {"arg":{"channel":"books5","instId":"BTC-USDT"},"data":[{...}]}
842    if let Some(arg) = msg.get("arg") {
843        let channel = arg.get("channel").and_then(|c| c.as_str());
844        let inst_id = arg.get("instId").and_then(|i| i.as_str());
845        let has_data = msg
846            .get("data")
847            .and_then(|d| d.as_array())
848            .and_then(|arr| arr.first())
849            .is_some();
850        if let (Some(ch), Some(id)) = (channel, inst_id) {
851            ch.starts_with("books") && id == symbol && has_data
852        } else {
853            false
854        }
855    } else {
856        false
857    }
858}
859
860/// Check if a WebSocket message is a trade message for the given symbol.
861fn is_trade_message(msg: &Value, symbol: &str) -> bool {
862    // OKX V5 WebSocket trade format:
863    // {"arg":{"channel":"trades","instId":"BTC-USDT"},"data":[{...}]}
864    if let Some(arg) = msg.get("arg") {
865        let channel = arg.get("channel").and_then(|c| c.as_str());
866        let inst_id = arg.get("instId").and_then(|i| i.as_str());
867        let has_data = msg
868            .get("data")
869            .and_then(|d| d.as_array())
870            .and_then(|arr| arr.first())
871            .is_some();
872        channel == Some("trades") && inst_id == Some(symbol) && has_data
873    } else {
874        false
875    }
876}
877
/// Converts an OKX instrument id (e.g. `"BTC-USDT"`) to the slash-separated
/// form (e.g. `"BTC/USDT"`).
///
/// Every `-` in the input becomes `/`; all other characters are kept as-is.
fn format_unified_symbol(symbol: &str) -> String {
    symbol
        .chars()
        .map(|ch| if ch == '-' { '/' } else { ch })
        .collect()
}
882
883// ============================================================================
884// WebSocket Message Parsers
885// ============================================================================
886
887/// Parse a WebSocket ticker message.
888pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
889    // OKX V5 WebSocket ticker format:
890    // {"arg":{"channel":"tickers","instId":"BTC-USDT"},"data":[{...}]}
891    let data = msg
892        .get("data")
893        .and_then(|d| d.as_array())
894        .and_then(|arr| arr.first())
895        .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
896
897    parse_ticker(data, market)
898}
899
900/// Parse a WebSocket orderbook message.
901pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
902    // OKX V5 WebSocket orderbook format:
903    // {"arg":{"channel":"books5","instId":"BTC-USDT"},"data":[{"asks":[...],"bids":[...],"ts":"..."}]}
904    let data = msg
905        .get("data")
906        .and_then(|d| d.as_array())
907        .and_then(|arr| arr.first())
908        .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
909
910    parse_orderbook(data, symbol)
911}
912
913/// Parse a WebSocket trade message (single trade).
914pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
915    // OKX V5 WebSocket trade format:
916    // {"arg":{"channel":"trades","instId":"BTC-USDT"},"data":[{...}]}
917    let data = msg
918        .get("data")
919        .and_then(|d| d.as_array())
920        .and_then(|arr| arr.first())
921        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
922
923    parse_trade(data, market)
924}
925
926/// Parse a WebSocket trade message (multiple trades).
927pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
928    // OKX V5 WebSocket trade format:
929    // {"arg":{"channel":"trades","instId":"BTC-USDT"},"data":[{...}, {...}]}
930    let data_array = msg
931        .get("data")
932        .and_then(|d| d.as_array())
933        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
934
935    let mut trades = Vec::with_capacity(data_array.len());
936    for data in data_array {
937        trades.push(parse_trade(data, market)?);
938    }
939
940    Ok(trades)
941}
942
943// ============================================================================
944// Private Message Type Detection Helpers
945// ============================================================================
946
947/// Check if a WebSocket message is an account/balance update.
948fn is_account_message(msg: &Value) -> bool {
949    if let Some(arg) = msg.get("arg") {
950        arg.get("channel").and_then(|c| c.as_str()) == Some("account")
951    } else {
952        false
953    }
954}
955
956/// Check if a WebSocket message is an orders update.
957fn is_orders_message(msg: &Value) -> bool {
958    if let Some(arg) = msg.get("arg") {
959        arg.get("channel").and_then(|c| c.as_str()) == Some("orders")
960    } else {
961        false
962    }
963}
964
965// ============================================================================
966// Private WebSocket Message Parsers
967// ============================================================================
968
969/// Parse a WebSocket account/balance message into a Balance.
970///
971/// OKX account update format:
972/// ```json
973/// {
974///   "arg": {"channel": "account"},
975///   "data": [{
976///     "uTime": "1700000000000",
977///     "totalEq": "100000.00",
978///     "details": [
979///       {"ccy": "USDT", "availBal": "50000", "frozenBal": "10000", "eq": "60000"},
980///       {"ccy": "BTC", "availBal": "1.5", "frozenBal": "0.5", "eq": "2.0"}
981///     ]
982///   }]
983/// }
984/// ```
985pub fn parse_ws_balance(msg: &Value) -> Result<Balance> {
986    let data = msg
987        .get("data")
988        .and_then(|d| d.as_array())
989        .and_then(|arr| arr.first())
990        .ok_or_else(|| Error::invalid_request("Missing data in account message"))?;
991
992    // uTime is available but Balance struct doesn't have a timestamp field
993    let mut info = HashMap::new();
994
995    if let Some(details) = data.get("details").and_then(|d| d.as_array()) {
996        for detail in details {
997            let currency = detail
998                .get("ccy")
999                .and_then(|c| c.as_str())
1000                .unwrap_or_default()
1001                .to_string();
1002
1003            if currency.is_empty() {
1004                continue;
1005            }
1006
1007            let free = detail
1008                .get("availBal")
1009                .and_then(|v| v.as_str())
1010                .and_then(|s| Decimal::from_str(s).ok())
1011                .unwrap_or_default();
1012
1013            let used = detail
1014                .get("frozenBal")
1015                .and_then(|v| v.as_str())
1016                .and_then(|s| Decimal::from_str(s).ok())
1017                .unwrap_or_default();
1018
1019            let total = free + used;
1020
1021            info.insert(currency, BalanceEntry { free, used, total });
1022        }
1023    }
1024
1025    Ok(Balance {
1026        balances: info,
1027        info: HashMap::new(),
1028    })
1029}
1030
1031/// Parse a WebSocket order update message into an Order.
1032///
1033/// OKX order update format:
1034/// ```json
1035/// {
1036///   "instId": "BTC-USDT",
1037///   "ordId": "123456",
1038///   "clOrdId": "client123",
1039///   "side": "buy",
1040///   "ordType": "limit",
1041///   "px": "50000",
1042///   "sz": "0.1",
1043///   "fillPx": "49999",
1044///   "fillSz": "0.05",
1045///   "avgPx": "49999",
1046///   "state": "partially_filled",
1047///   "fee": "-0.5",
1048///   "feeCcy": "USDT",
1049///   "uTime": "1700000000000",
1050///   "cTime": "1700000000000"
1051/// }
1052/// ```
1053pub fn parse_ws_order(data: &Value) -> Result<Order> {
1054    let inst_id = data
1055        .get("instId")
1056        .and_then(|v| v.as_str())
1057        .unwrap_or_default();
1058    let symbol = inst_id.replace('-', "/");
1059
1060    let id = data
1061        .get("ordId")
1062        .and_then(|v| v.as_str())
1063        .unwrap_or_default()
1064        .to_string();
1065
1066    let client_order_id = data
1067        .get("clOrdId")
1068        .and_then(|v| v.as_str())
1069        .map(ToString::to_string);
1070
1071    let side = match data.get("side").and_then(|v| v.as_str()) {
1072        Some("sell") => OrderSide::Sell,
1073        _ => OrderSide::Buy,
1074    };
1075
1076    let order_type = match data.get("ordType").and_then(|v| v.as_str()) {
1077        Some("market") => OrderType::Market,
1078        _ => OrderType::Limit,
1079    };
1080
1081    let price = data
1082        .get("px")
1083        .and_then(|v| v.as_str())
1084        .and_then(|s| Decimal::from_str(s).ok());
1085
1086    let amount = data
1087        .get("sz")
1088        .and_then(|v| v.as_str())
1089        .and_then(|s| Decimal::from_str(s).ok())
1090        .unwrap_or_default();
1091
1092    let filled = data
1093        .get("accFillSz")
1094        .and_then(|v| v.as_str())
1095        .and_then(|s| Decimal::from_str(s).ok());
1096
1097    let average = data
1098        .get("avgPx")
1099        .and_then(|v| v.as_str())
1100        .and_then(|s| Decimal::from_str(s).ok())
1101        .filter(|d| !d.is_zero());
1102
1103    let cost = match (filled, average) {
1104        (Some(f), Some(a)) => Some(f * a),
1105        _ => None,
1106    };
1107
1108    let status = match data.get("state").and_then(|v| v.as_str()) {
1109        Some("filled") => OrderStatus::Closed,
1110        Some("canceled" | "cancelled") => OrderStatus::Cancelled,
1111        _ => OrderStatus::Open,
1112    };
1113
1114    let fee = {
1115        let fee_amount = data
1116            .get("fee")
1117            .and_then(|v| v.as_str())
1118            .and_then(|s| Decimal::from_str(s).ok());
1119        let fee_currency = data
1120            .get("feeCcy")
1121            .and_then(|v| v.as_str())
1122            .unwrap_or_default()
1123            .to_string();
1124        fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1125    };
1126
1127    let timestamp = data
1128        .get("cTime")
1129        .and_then(|v| v.as_str())
1130        .and_then(|s| s.parse::<i64>().ok());
1131
1132    Ok(Order {
1133        id,
1134        client_order_id,
1135        symbol,
1136        order_type,
1137        side,
1138        price,
1139        amount,
1140        filled,
1141        remaining: None,
1142        cost,
1143        average,
1144        status,
1145        fee,
1146        fees: None,
1147        timestamp,
1148        datetime: None,
1149        last_trade_timestamp: None,
1150        time_in_force: None,
1151        post_only: None,
1152        reduce_only: None,
1153        stop_price: None,
1154        trigger_price: None,
1155        take_profit_price: None,
1156        stop_loss_price: None,
1157        trailing_delta: None,
1158        trailing_percent: None,
1159        activation_price: None,
1160        callback_rate: None,
1161        working_type: None,
1162        trades: None,
1163        info: HashMap::new(),
1164    })
1165}
1166
1167/// Parse a WebSocket order fill into a Trade (my trade).
1168///
1169/// Extracts the latest fill information from an order update.
1170pub fn parse_ws_my_trade(data: &Value) -> Result<Trade> {
1171    let inst_id = data
1172        .get("instId")
1173        .and_then(|v| v.as_str())
1174        .unwrap_or_default();
1175    let symbol = inst_id.replace('-', "/");
1176
1177    let trade_id = data
1178        .get("tradeId")
1179        .and_then(|v| v.as_str())
1180        .map(ToString::to_string);
1181
1182    let order_id = data
1183        .get("ordId")
1184        .and_then(|v| v.as_str())
1185        .map(ToString::to_string);
1186
1187    let side = match data.get("side").and_then(|v| v.as_str()) {
1188        Some("buy") => OrderSide::Buy,
1189        _ => OrderSide::Sell,
1190    };
1191
1192    let fill_px = data
1193        .get("fillPx")
1194        .and_then(|v| v.as_str())
1195        .and_then(|s| Decimal::from_str(s).ok())
1196        .unwrap_or_default();
1197
1198    let fill_sz = data
1199        .get("fillSz")
1200        .and_then(|v| v.as_str())
1201        .and_then(|s| Decimal::from_str(s).ok())
1202        .unwrap_or_default();
1203
1204    let timestamp = data
1205        .get("fillTime")
1206        .or_else(|| data.get("uTime"))
1207        .and_then(|v| v.as_str())
1208        .and_then(|s| s.parse::<i64>().ok())
1209        .unwrap_or(0);
1210
1211    let fee = {
1212        let fee_amount = data
1213            .get("fillFee")
1214            .or_else(|| data.get("fee"))
1215            .and_then(|v| v.as_str())
1216            .and_then(|s| Decimal::from_str(s).ok());
1217        let fee_currency = data
1218            .get("fillFeeCcy")
1219            .or_else(|| data.get("feeCcy"))
1220            .and_then(|v| v.as_str())
1221            .unwrap_or_default()
1222            .to_string();
1223        fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1224    };
1225
1226    Ok(Trade {
1227        id: trade_id,
1228        order: order_id,
1229        symbol,
1230        trade_type: None,
1231        side,
1232        taker_or_maker: None,
1233        price: Price(fill_px),
1234        amount: Amount(fill_sz),
1235        cost: Some(Cost(fill_px * fill_sz)),
1236        fee,
1237        timestamp,
1238        datetime: None,
1239        info: HashMap::new(),
1240    })
1241}
1242
/// Unit tests for the OKX WebSocket helpers and message parsers.
///
/// Covers client construction, public-channel message detection and parsing
/// (ticker, order book, trades), and private-channel detection and parsing
/// (account, orders, fills).
#[cfg(test)]
mod tests {
    use super::*;
    use ccxt_core::types::financial::Price;
    use rust_decimal_macros::dec;

    #[test]
    fn test_okx_ws_creation() {
        let ws = OkxWs::new("wss://ws.okx.com:8443/ws/v5/public".to_string());
        assert!(ws.subscriptions.try_read().is_ok());
    }

    #[tokio::test]
    async fn test_subscriptions_empty_by_default() {
        let ws = OkxWs::new("wss://ws.okx.com:8443/ws/v5/public".to_string());
        let subs = ws.subscriptions().await;
        assert!(subs.is_empty());
    }

    // ==================== Message Type Detection Tests ====================

    #[test]
    fn test_is_ticker_message_true() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "tickers", "instId": "BTC-USDT"},
                "data": [{}]
            }"#,
        )
        .unwrap();

        assert!(is_ticker_message(&msg, "BTC-USDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_symbol() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "tickers", "instId": "ETH-USDT"},
                "data": [{}]
            }"#,
        )
        .unwrap();

        assert!(!is_ticker_message(&msg, "BTC-USDT"));
    }

    #[test]
    fn test_is_ticker_message_wrong_channel() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "trades", "instId": "BTC-USDT"},
                "data": [{}]
            }"#,
        )
        .unwrap();

        assert!(!is_ticker_message(&msg, "BTC-USDT"));
    }

    #[test]
    fn test_is_orderbook_message_books5() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "books5", "instId": "BTC-USDT"},
                "data": [{}]
            }"#,
        )
        .unwrap();

        assert!(is_orderbook_message(&msg, "BTC-USDT"));
    }

    #[test]
    fn test_is_orderbook_message_books50() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "books50-l2", "instId": "BTC-USDT"},
                "data": [{}]
            }"#,
        )
        .unwrap();

        assert!(is_orderbook_message(&msg, "BTC-USDT"));
    }

    #[test]
    fn test_is_trade_message_true() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "trades", "instId": "BTC-USDT"},
                "data": [{}]
            }"#,
        )
        .unwrap();

        assert!(is_trade_message(&msg, "BTC-USDT"));
    }

    #[test]
    fn test_format_unified_symbol() {
        assert_eq!(format_unified_symbol("BTC-USDT"), "BTC/USDT");
        assert_eq!(format_unified_symbol("ETH-BTC"), "ETH/BTC");
    }

    // ==================== Ticker Message Parsing Tests ====================

    #[test]
    fn test_parse_ws_ticker() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "tickers", "instId": "BTC-USDT"},
                "data": [{
                    "instId": "BTC-USDT",
                    "last": "50000.00",
                    "high24h": "51000.00",
                    "low24h": "49000.00",
                    "bidPx": "49999.00",
                    "askPx": "50001.00",
                    "vol24h": "1000.5",
                    "ts": "1700000000000"
                }]
            }"#,
        )
        .unwrap();

        let ticker = parse_ws_ticker(&msg, None).unwrap();
        // Parser converts BTC-USDT to BTC/USDT (unified format) when no market is provided
        assert_eq!(ticker.symbol, "BTC/USDT");
        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
    }

    #[test]
    fn test_parse_ws_ticker_with_market() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "tickers", "instId": "BTC-USDT"},
                "data": [{
                    "instId": "BTC-USDT",
                    "last": "50000.00",
                    "ts": "1700000000000"
                }]
            }"#,
        )
        .unwrap();

        let market = Market {
            id: "BTC-USDT".to_string(),
            symbol: "BTC/USDT".to_string(),
            base: "BTC".to_string(),
            quote: "USDT".to_string(),
            ..Default::default()
        };

        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
        assert_eq!(ticker.symbol, "BTC/USDT");
    }

    #[test]
    fn test_parse_ws_ticker_missing_data() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "tickers", "instId": "BTC-USDT"}
            }"#,
        )
        .unwrap();

        let result = parse_ws_ticker(&msg, None);
        assert!(result.is_err());
    }

    // ==================== OrderBook Message Parsing Tests ====================

    #[test]
    fn test_parse_ws_orderbook() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "books5", "instId": "BTC-USDT"},
                "data": [{
                    "asks": [
                        ["50001.00", "1.0", "0", "1"],
                        ["50002.00", "3.0", "0", "2"],
                        ["50003.00", "2.5", "0", "1"]
                    ],
                    "bids": [
                        ["50000.00", "1.5", "0", "2"],
                        ["49999.00", "2.0", "0", "1"],
                        ["49998.00", "0.5", "0", "1"]
                    ],
                    "ts": "1700000000000"
                }]
            }"#,
        )
        .unwrap();

        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
        assert_eq!(orderbook.symbol, "BTC/USDT");
        assert_eq!(orderbook.bids.len(), 3);
        assert_eq!(orderbook.asks.len(), 3);

        // Verify bids are sorted in descending order
        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));

        // Verify asks are sorted in ascending order
        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
    }

    #[test]
    fn test_parse_ws_orderbook_missing_data() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "books5", "instId": "BTC-USDT"}
            }"#,
        )
        .unwrap();

        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
        assert!(result.is_err());
    }

    // ==================== Trade Message Parsing Tests ====================

    #[test]
    fn test_parse_ws_trade() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "trades", "instId": "BTC-USDT"},
                "data": [{
                    "instId": "BTC-USDT",
                    "tradeId": "123456789",
                    "px": "50000.00",
                    "sz": "0.5",
                    "side": "buy",
                    "ts": "1700000000000"
                }]
            }"#,
        )
        .unwrap();

        let trade = parse_ws_trade(&msg, None).unwrap();
        assert_eq!(trade.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_ws_trades_multiple() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "trades", "instId": "BTC-USDT"},
                "data": [
                    {
                        "instId": "BTC-USDT",
                        "tradeId": "123456789",
                        "px": "50000.00",
                        "sz": "0.5",
                        "side": "buy",
                        "ts": "1700000000000"
                    },
                    {
                        "instId": "BTC-USDT",
                        "tradeId": "123456790",
                        "px": "50001.00",
                        "sz": "1.0",
                        "side": "sell",
                        "ts": "1700000000001"
                    }
                ]
            }"#,
        )
        .unwrap();

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert_eq!(trades.len(), 2);
    }

    #[test]
    fn test_parse_ws_trade_missing_data() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "trades", "instId": "BTC-USDT"}
            }"#,
        )
        .unwrap();

        let result = parse_ws_trade(&msg, None);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_ws_trades_empty_array() {
        let msg = serde_json::from_str(
            r#"{
                "arg": {"channel": "trades", "instId": "BTC-USDT"},
                "data": []
            }"#,
        )
        .unwrap();

        let trades = parse_ws_trades(&msg, None).unwrap();
        assert!(trades.is_empty());
    }

    // ==================== Private Message Detection Tests ====================

    #[test]
    fn test_is_account_message() {
        let account: Value =
            serde_json::from_str(r#"{"arg": {"channel": "account"}, "data": []}"#).unwrap();
        let orders: Value =
            serde_json::from_str(r#"{"arg": {"channel": "orders"}, "data": []}"#).unwrap();
        let no_arg: Value = serde_json::from_str(r#"{"event": "login"}"#).unwrap();

        assert!(is_account_message(&account));
        assert!(!is_account_message(&orders));
        assert!(!is_account_message(&no_arg));
    }

    #[test]
    fn test_is_orders_message() {
        let orders: Value =
            serde_json::from_str(r#"{"arg": {"channel": "orders"}, "data": []}"#).unwrap();
        let account: Value =
            serde_json::from_str(r#"{"arg": {"channel": "account"}, "data": []}"#).unwrap();
        let no_arg: Value = serde_json::from_str(r#"{"event": "login"}"#).unwrap();

        assert!(is_orders_message(&orders));
        assert!(!is_orders_message(&account));
        assert!(!is_orders_message(&no_arg));
    }

    // ==================== Balance Message Parsing Tests ====================

    #[test]
    fn test_parse_ws_balance() {
        let msg: Value = serde_json::from_str(
            r#"{
                "arg": {"channel": "account"},
                "data": [{
                    "uTime": "1700000000000",
                    "totalEq": "100000.00",
                    "details": [
                        {"ccy": "USDT", "availBal": "50000", "frozenBal": "10000", "eq": "60000"},
                        {"ccy": "BTC", "availBal": "1.5", "frozenBal": "0.5", "eq": "2.0"},
                        {"ccy": "", "availBal": "1", "frozenBal": "1"}
                    ]
                }]
            }"#,
        )
        .unwrap();

        let balance = parse_ws_balance(&msg).unwrap();
        // The entry with an empty currency code is skipped.
        assert_eq!(balance.balances.len(), 2);

        let usdt = balance.balances.get("USDT").unwrap();
        assert_eq!(usdt.free, dec!(50000));
        assert_eq!(usdt.used, dec!(10000));
        assert_eq!(usdt.total, dec!(60000));

        let btc = balance.balances.get("BTC").unwrap();
        assert_eq!(btc.free, dec!(1.5));
        assert_eq!(btc.used, dec!(0.5));
        assert_eq!(btc.total, dec!(2.0));
    }

    #[test]
    fn test_parse_ws_balance_missing_data() {
        let msg: Value = serde_json::from_str(r#"{"arg": {"channel": "account"}}"#).unwrap();

        assert!(parse_ws_balance(&msg).is_err());
    }

    // ==================== Order Message Parsing Tests ====================

    #[test]
    fn test_parse_ws_order() {
        let data: Value = serde_json::from_str(
            r#"{
                "instId": "BTC-USDT",
                "ordId": "123456",
                "clOrdId": "client123",
                "side": "sell",
                "ordType": "limit",
                "px": "50000",
                "sz": "0.1",
                "accFillSz": "0.05",
                "avgPx": "49999",
                "state": "partially_filled",
                "fee": "-0.5",
                "feeCcy": "USDT",
                "uTime": "1700000000001",
                "cTime": "1700000000000"
            }"#,
        )
        .unwrap();

        let order = parse_ws_order(&data).unwrap();
        assert_eq!(order.id, "123456");
        assert_eq!(order.symbol, "BTC/USDT");
        assert_eq!(order.client_order_id.as_deref(), Some("client123"));
        assert!(matches!(order.side, OrderSide::Sell));
        assert!(matches!(order.order_type, OrderType::Limit));
        assert_eq!(order.price, Some(dec!(50000)));
        assert_eq!(order.amount, dec!(0.1));
        assert_eq!(order.filled, Some(dec!(0.05)));
        assert_eq!(order.average, Some(dec!(49999)));
        assert_eq!(order.cost, Some(dec!(0.05) * dec!(49999)));
        assert!(matches!(order.status, OrderStatus::Open));
        assert!(order.fee.is_some());
        // The order timestamp is taken from cTime, not uTime.
        assert_eq!(order.timestamp, Some(1_700_000_000_000));
    }

    #[test]
    fn test_parse_ws_order_terminal_states() {
        let filled: Value = serde_json::from_str(r#"{"state": "filled"}"#).unwrap();
        let canceled: Value = serde_json::from_str(r#"{"state": "canceled"}"#).unwrap();

        let filled = parse_ws_order(&filled).unwrap();
        let canceled = parse_ws_order(&canceled).unwrap();

        assert!(matches!(filled.status, OrderStatus::Closed));
        assert!(matches!(canceled.status, OrderStatus::Cancelled));
    }

    // ==================== My Trade Message Parsing Tests ====================

    #[test]
    fn test_parse_ws_my_trade() {
        let data: Value = serde_json::from_str(
            r#"{
                "instId": "BTC-USDT",
                "tradeId": "987",
                "ordId": "123456",
                "side": "buy",
                "fillPx": "49999",
                "fillSz": "0.05",
                "fillTime": "1700000000123",
                "fillFee": "-0.01",
                "fillFeeCcy": "USDT",
                "uTime": "1700000000456"
            }"#,
        )
        .unwrap();

        let trade = parse_ws_my_trade(&data).unwrap();
        assert_eq!(trade.symbol, "BTC/USDT");
        assert_eq!(trade.id.as_deref(), Some("987"));
        assert_eq!(trade.order.as_deref(), Some("123456"));
        assert!(matches!(trade.side, OrderSide::Buy));
        assert_eq!(trade.price.0, dec!(49999));
        assert_eq!(trade.amount.0, dec!(0.05));
        assert_eq!(
            trade.cost.as_ref().map(|cost| cost.0),
            Some(dec!(49999) * dec!(0.05))
        );
        assert!(trade.fee.is_some());
        // fillTime takes precedence over uTime.
        assert_eq!(trade.timestamp, 1_700_000_000_123);
    }

    #[test]
    fn test_parse_ws_my_trade_timestamp_falls_back_to_utime() {
        let data: Value = serde_json::from_str(
            r#"{
                "instId": "BTC-USDT",
                "ordId": "123456",
                "side": "sell",
                "fillPx": "50000",
                "fillSz": "1",
                "uTime": "1700000000456"
            }"#,
        )
        .unwrap();

        let trade = parse_ws_my_trade(&data).unwrap();
        assert!(matches!(trade.side, OrderSide::Sell));
        assert_eq!(trade.timestamp, 1_700_000_000_456);
        // Neither fillFee nor fee is present, so no fee is reported.
        assert!(trade.fee.is_none());
    }
}