Skip to main content

ccxt_exchanges/bitget/
ws.rs

1//! Bitget WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for Bitget exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (balance, orders) with automatic reconnection.
6
7use crate::bitget::auth::BitgetAuth;
8use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
9use ccxt_core::error::{Error, Result};
10use ccxt_core::types::{
11    Balance, BalanceEntry, Fee, Market, Order, OrderBook, OrderSide, OrderStatus, OrderType,
12    Ticker, Trade,
13    financial::{Amount, Cost, Price},
14};
15use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
16use ccxt_core::ws_exchange::MessageStream;
17use futures::Stream;
18use rust_decimal::Decimal;
19use rust_decimal::prelude::FromStr;
20use serde_json::Value;
21use std::collections::HashMap;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll};
25use tokio::sync::{RwLock, mpsc};
26use tracing::{debug, warn};
27
28/// Default ping interval for Bitget WebSocket (30 seconds).
29const DEFAULT_PING_INTERVAL_MS: u64 = 30000;
30
31/// Default reconnect delay (5 seconds).
32const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
33
34/// Maximum reconnect attempts.
35const MAX_RECONNECT_ATTEMPTS: u32 = 10;
36
37/// Bitget WebSocket client.
38///
39/// Provides real-time data streaming for Bitget exchange.
40pub struct BitgetWs {
41    /// WebSocket client instance.
42    client: Arc<WsClient>,
43    /// Active subscriptions.
44    subscriptions: Arc<RwLock<Vec<String>>>,
45}
46
47impl BitgetWs {
48    /// Creates a new Bitget WebSocket client.
49    ///
50    /// # Arguments
51    ///
52    /// * `url` - WebSocket server URL
53    pub fn new(url: String) -> Self {
54        let config = WsConfig {
55            url: url.clone(),
56            connect_timeout: 10000,
57            ping_interval: DEFAULT_PING_INTERVAL_MS,
58            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
59            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
60            auto_reconnect: true,
61            enable_compression: false,
62            pong_timeout: 90000,
63            ..Default::default()
64        };
65
66        Self {
67            client: Arc::new(WsClient::new(config)),
68            subscriptions: Arc::new(RwLock::new(Vec::new())),
69        }
70    }
71
72    /// Connects to the WebSocket server.
73    pub async fn connect(&self) -> Result<()> {
74        self.client.connect().await
75    }
76
77    /// Disconnects from the WebSocket server.
78    pub async fn disconnect(&self) -> Result<()> {
79        self.client.disconnect().await
80    }
81
82    /// Returns the current connection state.
83    pub fn state(&self) -> WsConnectionState {
84        self.client.state()
85    }
86
87    /// Checks if the WebSocket is connected.
88    pub fn is_connected(&self) -> bool {
89        self.client.is_connected()
90    }
91
92    /// Receives the next message from the WebSocket.
93    pub async fn receive(&self) -> Option<Value> {
94        self.client.receive().await
95    }
96
97    /// Subscribes to a ticker stream.
98    ///
99    /// # Arguments
100    ///
101    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
102    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
103        let mut arg_map = serde_json::Map::new();
104        arg_map.insert(
105            "instType".to_string(),
106            serde_json::Value::String("SPOT".to_string()),
107        );
108        arg_map.insert(
109            "channel".to_string(),
110            serde_json::Value::String("ticker".to_string()),
111        );
112        arg_map.insert(
113            "instId".to_string(),
114            serde_json::Value::String(symbol.to_string()),
115        );
116        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
117
118        let mut msg_map = serde_json::Map::new();
119        msg_map.insert(
120            "op".to_string(),
121            serde_json::Value::String("subscribe".to_string()),
122        );
123        msg_map.insert("args".to_string(), args);
124        let msg = serde_json::Value::Object(msg_map);
125
126        self.client.send_json(&msg).await?;
127
128        let sub_key = format!("ticker:{}", symbol);
129        self.subscriptions.write().await.push(sub_key);
130
131        Ok(())
132    }
133
134    /// Subscribes to multiple ticker streams.
135    ///
136    /// # Arguments
137    ///
138    /// * `symbols` - List of trading pair symbols (e.g., `["BTCUSDT", "ETHUSDT"]`)
139    pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
140        let mut args = Vec::new();
141        for symbol in symbols {
142            let mut arg_map = serde_json::Map::new();
143            arg_map.insert(
144                "instType".to_string(),
145                serde_json::Value::String("SPOT".to_string()),
146            );
147            arg_map.insert(
148                "channel".to_string(),
149                serde_json::Value::String("ticker".to_string()),
150            );
151            arg_map.insert(
152                "instId".to_string(),
153                serde_json::Value::String(symbol.clone()),
154            );
155            args.push(serde_json::Value::Object(arg_map));
156        }
157
158        let mut msg_map = serde_json::Map::new();
159        msg_map.insert(
160            "op".to_string(),
161            serde_json::Value::String("subscribe".to_string()),
162        );
163        msg_map.insert("args".to_string(), serde_json::Value::Array(args));
164        let msg = serde_json::Value::Object(msg_map);
165
166        self.client.send_json(&msg).await?;
167
168        let mut subs = self.subscriptions.write().await;
169        for symbol in symbols {
170            subs.push(format!("ticker:{}", symbol));
171        }
172
173        Ok(())
174    }
175
176    /// Watches ticker updates for multiple symbols.
177    ///
178    /// Returns a stream of `Vec<Ticker>` updates for the specified symbols.
179    ///
180    /// # Arguments
181    ///
182    /// * `symbols` - List of trading pair symbols (e.g., `["BTCUSDT", "ETHUSDT"]`)
183    ///
184    /// # Returns
185    ///
186    /// A `MessageStream<Vec<Ticker>>` that yields ticker updates.
187    pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
188        // Ensure connected
189        if !self.is_connected() {
190            self.connect().await?;
191        }
192
193        // Subscribe to ticker channels
194        self.subscribe_tickers(symbols).await?;
195
196        // Create channel for ticker updates
197        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
198        let symbols_owned: Vec<String> = symbols.to_vec();
199        let client = Arc::clone(&self.client);
200
201        // Spawn task to process messages and filter ticker updates
202        tokio::spawn(async move {
203            while let Some(msg) = client.receive().await {
204                // Check if this is a ticker message for ANY of our symbols
205                if let Some(arg) = msg.get("arg") {
206                    let channel = arg.get("channel").and_then(|c| c.as_str());
207                    let inst_id = arg.get("instId").and_then(|i| i.as_str());
208
209                    if channel == Some("ticker") {
210                        if let Some(id) = inst_id {
211                            if symbols_owned.iter().any(|s| s == id) {
212                                match parse_ws_ticker(&msg, None) {
213                                    Ok(ticker) => {
214                                        if tx.send(Ok(vec![ticker])).is_err() {
215                                            break; // Receiver dropped
216                                        }
217                                    }
218                                    Err(e) => {
219                                        if tx.send(Err(e)).is_err() {
220                                            break;
221                                        }
222                                    }
223                                }
224                            }
225                        }
226                    }
227                }
228            }
229        });
230
231        Ok(Box::pin(ReceiverStream::new(rx)))
232    }
233    /// * `depth` - Orderbook depth (5, 15, or default)
234    pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
235        let channel = match depth {
236            5 => "books5",
237            15 => "books15",
238            _ => "books",
239        };
240
241        let mut arg_map = serde_json::Map::new();
242        arg_map.insert(
243            "instType".to_string(),
244            serde_json::Value::String("SPOT".to_string()),
245        );
246        arg_map.insert(
247            "channel".to_string(),
248            serde_json::Value::String(channel.to_string()),
249        );
250        arg_map.insert(
251            "instId".to_string(),
252            serde_json::Value::String(symbol.to_string()),
253        );
254        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
255
256        let mut msg_map = serde_json::Map::new();
257        msg_map.insert(
258            "op".to_string(),
259            serde_json::Value::String("subscribe".to_string()),
260        );
261        msg_map.insert("args".to_string(), args);
262        let msg = serde_json::Value::Object(msg_map);
263
264        self.client.send_json(&msg).await?;
265
266        let sub_key = format!("orderbook:{}", symbol);
267        self.subscriptions.write().await.push(sub_key);
268
269        Ok(())
270    }
271
272    /// Subscribes to a trades stream.
273    ///
274    /// # Arguments
275    ///
276    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
277    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
278        let mut arg_map = serde_json::Map::new();
279        arg_map.insert(
280            "instType".to_string(),
281            serde_json::Value::String("SPOT".to_string()),
282        );
283        arg_map.insert(
284            "channel".to_string(),
285            serde_json::Value::String("trade".to_string()),
286        );
287        arg_map.insert(
288            "instId".to_string(),
289            serde_json::Value::String(symbol.to_string()),
290        );
291        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
292
293        let mut msg_map = serde_json::Map::new();
294        msg_map.insert(
295            "op".to_string(),
296            serde_json::Value::String("subscribe".to_string()),
297        );
298        msg_map.insert("args".to_string(), args);
299        let msg = serde_json::Value::Object(msg_map);
300
301        self.client.send_json(&msg).await?;
302
303        let sub_key = format!("trades:{}", symbol);
304        self.subscriptions.write().await.push(sub_key);
305
306        Ok(())
307    }
308
309    /// Subscribes to a kline/candlestick stream.
310    ///
311    /// # Arguments
312    ///
313    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
314    /// * `interval` - Kline interval (e.g., "1m", "5m", "1H")
315    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
316        let channel = format!("candle{}", interval);
317
318        let mut arg_map = serde_json::Map::new();
319        arg_map.insert(
320            "instType".to_string(),
321            serde_json::Value::String("SPOT".to_string()),
322        );
323        arg_map.insert(
324            "channel".to_string(),
325            serde_json::Value::String(channel.clone()),
326        );
327        arg_map.insert(
328            "instId".to_string(),
329            serde_json::Value::String(symbol.to_string()),
330        );
331        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
332
333        let mut msg_map = serde_json::Map::new();
334        msg_map.insert(
335            "op".to_string(),
336            serde_json::Value::String("subscribe".to_string()),
337        );
338        msg_map.insert("args".to_string(), args);
339        let msg = serde_json::Value::Object(msg_map);
340
341        self.client.send_json(&msg).await?;
342
343        let sub_key = format!("kline:{}:{}", symbol, interval);
344        self.subscriptions.write().await.push(sub_key);
345
346        Ok(())
347    }
348
349    /// Unsubscribes from a stream.
350    ///
351    /// # Arguments
352    ///
353    /// * `stream_name` - Stream identifier to unsubscribe from
354    pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
355        // Parse stream name to determine channel and symbol
356        let parts: Vec<&str> = stream_name.split(':').collect();
357        if parts.len() < 2 {
358            return Err(Error::invalid_request(format!(
359                "Invalid stream name: {}",
360                stream_name
361            )));
362        }
363
364        let channel = parts[0];
365        let symbol = parts[1];
366
367        let bitget_channel = match channel {
368            "ticker" => "ticker",
369            "orderbook" => "books",
370            "trades" => "trade",
371            "kline" => {
372                if parts.len() >= 3 {
373                    // For kline, we need the interval
374                    return self.unsubscribe_kline(symbol, parts[2]).await;
375                }
376                return Err(Error::invalid_request(
377                    "Kline unsubscribe requires interval",
378                ));
379            }
380            _ => channel,
381        };
382
383        let mut arg_map = serde_json::Map::new();
384        arg_map.insert(
385            "instType".to_string(),
386            serde_json::Value::String("SPOT".to_string()),
387        );
388        arg_map.insert(
389            "channel".to_string(),
390            serde_json::Value::String(bitget_channel.to_string()),
391        );
392        arg_map.insert(
393            "instId".to_string(),
394            serde_json::Value::String(symbol.to_string()),
395        );
396        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
397
398        let mut msg_map = serde_json::Map::new();
399        msg_map.insert(
400            "op".to_string(),
401            serde_json::Value::String("unsubscribe".to_string()),
402        );
403        msg_map.insert("args".to_string(), args);
404        let msg = serde_json::Value::Object(msg_map);
405
406        self.client.send_json(&msg).await?;
407
408        // Remove from subscriptions
409        let mut subs = self.subscriptions.write().await;
410        subs.retain(|s| s != &stream_name);
411
412        Ok(())
413    }
414
415    /// Unsubscribes from a kline stream.
416    async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
417        let channel = format!("candle{}", interval);
418
419        let mut arg_map = serde_json::Map::new();
420        arg_map.insert(
421            "instType".to_string(),
422            serde_json::Value::String("SPOT".to_string()),
423        );
424        arg_map.insert(
425            "channel".to_string(),
426            serde_json::Value::String(channel.clone()),
427        );
428        arg_map.insert(
429            "instId".to_string(),
430            serde_json::Value::String(symbol.to_string()),
431        );
432        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
433
434        let mut msg_map = serde_json::Map::new();
435        msg_map.insert(
436            "op".to_string(),
437            serde_json::Value::String("unsubscribe".to_string()),
438        );
439        msg_map.insert("args".to_string(), args);
440        let msg = serde_json::Value::Object(msg_map);
441
442        self.client.send_json(&msg).await?;
443
444        let sub_key = format!("kline:{}:{}", symbol, interval);
445        let mut subs = self.subscriptions.write().await;
446        subs.retain(|s| s != &sub_key);
447
448        Ok(())
449    }
450
451    /// Returns the list of active subscriptions.
452    pub async fn subscriptions(&self) -> Vec<String> {
453        self.subscriptions.read().await.clone()
454    }
455
456    // ==================== Private WebSocket Methods ====================
457
458    /// Authenticates with the Bitget private WebSocket.
459    ///
460    /// Bitget WS login uses the same HMAC-SHA256 signing as REST API:
461    /// sign(timestamp + "GET" + "/user/verify", secret)
462    pub async fn login(&self, auth: &BitgetAuth) -> Result<()> {
463        let timestamp = (chrono::Utc::now().timestamp_millis() / 1000).to_string();
464        let sign = auth.sign(&timestamp, "GET", "/user/verify", "");
465
466        #[allow(clippy::disallowed_methods)]
467        let msg = serde_json::json!({
468            "op": "login",
469            "args": [{
470                "apiKey": auth.api_key(),
471                "passphrase": auth.passphrase(),
472                "timestamp": timestamp,
473                "sign": sign
474            }]
475        });
476
477        self.client.send_json(&msg).await?;
478
479        // Wait for login response
480        let timeout = tokio::time::Duration::from_secs(10);
481        let start = tokio::time::Instant::now();
482
483        while start.elapsed() < timeout {
484            if let Some(resp) = self.client.receive().await {
485                if let Some(event) = resp.get("event").and_then(|e| e.as_str()) {
486                    if event == "login" {
487                        let code = resp.get("code").and_then(|c| c.as_str()).unwrap_or("1");
488                        if code == "0" {
489                            debug!("Bitget WebSocket login successful");
490                            return Ok(());
491                        }
492                        let msg_text = resp
493                            .get("msg")
494                            .and_then(|m| m.as_str())
495                            .unwrap_or("Unknown error");
496                        return Err(Error::authentication(format!(
497                            "Bitget WebSocket login failed: {} (code: {})",
498                            msg_text, code
499                        )));
500                    }
501                }
502            }
503        }
504
505        Err(Error::authentication(
506            "Bitget WebSocket login timed out waiting for response",
507        ))
508    }
509
510    /// Subscribes to the orders channel (private).
511    ///
512    /// Receives real-time order updates for the specified instrument type.
513    pub async fn subscribe_orders(&self, inst_type: Option<&str>) -> Result<()> {
514        let inst_type = inst_type.unwrap_or("SPOT");
515
516        #[allow(clippy::disallowed_methods)]
517        let msg = serde_json::json!({
518            "op": "subscribe",
519            "args": [{
520                "instType": inst_type,
521                "channel": "orders",
522                "instId": "default"
523            }]
524        });
525
526        self.client.send_json(&msg).await?;
527        self.subscriptions.write().await.push("orders".to_string());
528        Ok(())
529    }
530
531    /// Subscribes to the account channel (private).
532    ///
533    /// Receives real-time balance/account updates.
534    pub async fn subscribe_account(&self) -> Result<()> {
535        #[allow(clippy::disallowed_methods)]
536        let msg = serde_json::json!({
537            "op": "subscribe",
538            "args": [{
539                "instType": "SPOT",
540                "channel": "account",
541                "coin": "default"
542            }]
543        });
544
545        self.client.send_json(&msg).await?;
546        self.subscriptions.write().await.push("account".to_string());
547        Ok(())
548    }
549
550    /// Watches balance updates via the private WebSocket.
551    ///
552    /// Requires prior authentication via `login()`.
553    pub async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
554        if !self.is_connected() {
555            self.connect().await?;
556        }
557
558        self.subscribe_account().await?;
559
560        let (tx, rx) = mpsc::unbounded_channel::<Result<Balance>>();
561        let client = Arc::clone(&self.client);
562
563        tokio::spawn(async move {
564            while let Some(msg) = client.receive().await {
565                if is_account_message(&msg) {
566                    match parse_ws_balance(&msg) {
567                        Ok(balance) => {
568                            if tx.send(Ok(balance)).is_err() {
569                                break;
570                            }
571                        }
572                        Err(e) => {
573                            warn!(error = %e, "Failed to parse Bitget balance update");
574                            if tx.send(Err(e)).is_err() {
575                                break;
576                            }
577                        }
578                    }
579                }
580            }
581        });
582
583        Ok(Box::pin(ReceiverStream::new(rx)))
584    }
585
586    /// Watches order updates via the private WebSocket.
587    ///
588    /// Requires prior authentication via `login()`.
589    pub async fn watch_orders(
590        &self,
591        symbol_filter: Option<String>,
592    ) -> Result<MessageStream<Order>> {
593        if !self.is_connected() {
594            self.connect().await?;
595        }
596
597        self.subscribe_orders(None).await?;
598
599        let (tx, rx) = mpsc::unbounded_channel::<Result<Order>>();
600        let client = Arc::clone(&self.client);
601
602        tokio::spawn(async move {
603            while let Some(msg) = client.receive().await {
604                if is_orders_message(&msg) {
605                    if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
606                        for data in data_array {
607                            // Apply symbol filter if specified
608                            if let Some(ref filter) = symbol_filter {
609                                let inst_id =
610                                    data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
611                                let unified = format_unified_symbol(inst_id);
612                                if unified != *filter && inst_id != filter.as_str() {
613                                    continue;
614                                }
615                            }
616
617                            match parse_ws_order(data) {
618                                Ok(order) => {
619                                    if tx.send(Ok(order)).is_err() {
620                                        return;
621                                    }
622                                }
623                                Err(e) => {
624                                    warn!(error = %e, "Failed to parse Bitget order update");
625                                    if tx.send(Err(e)).is_err() {
626                                        return;
627                                    }
628                                }
629                            }
630                        }
631                    }
632                }
633            }
634        });
635
636        Ok(Box::pin(ReceiverStream::new(rx)))
637    }
638
639    /// Watches trade fills via the private WebSocket (orders channel).
640    ///
641    /// Extracts trade information from order execution reports.
642    /// Requires prior authentication via `login()`.
643    pub async fn watch_my_trades(
644        &self,
645        symbol_filter: Option<String>,
646    ) -> Result<MessageStream<Trade>> {
647        if !self.is_connected() {
648            self.connect().await?;
649        }
650
651        self.subscribe_orders(None).await?;
652
653        let (tx, rx) = mpsc::unbounded_channel::<Result<Trade>>();
654        let client = Arc::clone(&self.client);
655
656        tokio::spawn(async move {
657            while let Some(msg) = client.receive().await {
658                if is_orders_message(&msg) {
659                    if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
660                        for data in data_array {
661                            // Only emit trades for filled/partially filled orders
662                            let fill_sz = data
663                                .get("fillSz")
664                                .or_else(|| data.get("baseVolume"))
665                                .and_then(|v| v.as_str())
666                                .and_then(|s| s.parse::<f64>().ok())
667                                .unwrap_or(0.0);
668
669                            if fill_sz <= 0.0 {
670                                continue;
671                            }
672
673                            // Apply symbol filter
674                            if let Some(ref filter) = symbol_filter {
675                                let inst_id =
676                                    data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
677                                let unified = format_unified_symbol(inst_id);
678                                if unified != *filter && inst_id != filter.as_str() {
679                                    continue;
680                                }
681                            }
682
683                            match parse_ws_my_trade(data) {
684                                Ok(trade) => {
685                                    if tx.send(Ok(trade)).is_err() {
686                                        return;
687                                    }
688                                }
689                                Err(e) => {
690                                    warn!(error = %e, "Failed to parse Bitget trade update");
691                                    if tx.send(Err(e)).is_err() {
692                                        return;
693                                    }
694                                }
695                            }
696                        }
697                    }
698                }
699            }
700        });
701
702        Ok(Box::pin(ReceiverStream::new(rx)))
703    }
704
705    /// Watches ticker updates for a symbol.
706    ///
707    /// Returns a stream of `Ticker` updates for the specified symbol.
708    ///
709    /// # Arguments
710    ///
711    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
712    /// * `market` - Optional market information for symbol resolution
713    ///
714    /// # Returns
715    ///
716    /// A `MessageStream<Ticker>` that yields ticker updates.
717    ///
718    /// # Example
719    ///
720    /// ```no_run
721    /// use ccxt_exchanges::bitget::ws::BitgetWs;
722    /// use futures::StreamExt;
723    ///
724    /// # async fn example() -> ccxt_core::error::Result<()> {
725    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
726    /// ws.connect().await?;
727    /// let mut stream = ws.watch_ticker("BTCUSDT", None).await?;
728    /// while let Some(result) = stream.next().await {
729    ///     match result {
730    ///         Ok(ticker) => println!("Ticker: {:?}", ticker.last),
731    ///         Err(e) => eprintln!("Error: {}", e),
732    ///     }
733    /// }
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub async fn watch_ticker(
738        &self,
739        symbol: &str,
740        market: Option<Market>,
741    ) -> Result<MessageStream<Ticker>> {
742        // Ensure connected
743        if !self.is_connected() {
744            self.connect().await?;
745        }
746
747        // Subscribe to ticker channel
748        self.subscribe_ticker(symbol).await?;
749
750        // Create channel for ticker updates
751        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
752        let symbol_owned = symbol.to_string();
753        let client = Arc::clone(&self.client);
754
755        // Spawn task to process messages and filter ticker updates
756        tokio::spawn(async move {
757            while let Some(msg) = client.receive().await {
758                // Check if this is a ticker message for our symbol
759                if is_ticker_message(&msg, &symbol_owned) {
760                    match parse_ws_ticker(&msg, market.as_ref()) {
761                        Ok(ticker) => {
762                            if tx.send(Ok(ticker)).is_err() {
763                                break; // Receiver dropped
764                            }
765                        }
766                        Err(e) => {
767                            if tx.send(Err(e)).is_err() {
768                                break;
769                            }
770                        }
771                    }
772                }
773            }
774        });
775
776        Ok(Box::pin(ReceiverStream::new(rx)))
777    }
778
779    /// Watches order book updates for a symbol.
780    ///
781    /// Returns a stream of `OrderBook` updates for the specified symbol.
782    ///
783    /// # Arguments
784    ///
785    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
786    /// * `limit` - Optional depth limit (5, 15, or full depth)
787    ///
788    /// # Returns
789    ///
790    /// A `MessageStream<OrderBook>` that yields order book updates.
791    ///
792    /// # Example
793    ///
794    /// ```no_run
795    /// use ccxt_exchanges::bitget::ws::BitgetWs;
796    /// use futures::StreamExt;
797    ///
798    /// # async fn example() -> ccxt_core::error::Result<()> {
799    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
800    /// ws.connect().await?;
801    /// let mut stream = ws.watch_order_book("BTCUSDT", Some(5)).await?;
802    /// while let Some(result) = stream.next().await {
803    ///     match result {
804    ///         Ok(orderbook) => println!("Best bid: {:?}", orderbook.bids.first()),
805    ///         Err(e) => eprintln!("Error: {}", e),
806    ///     }
807    /// }
808    /// # Ok(())
809    /// # }
810    /// ```
811    pub async fn watch_order_book(
812        &self,
813        symbol: &str,
814        limit: Option<u32>,
815    ) -> Result<MessageStream<OrderBook>> {
816        // Ensure connected
817        if !self.is_connected() {
818            self.connect().await?;
819        }
820
821        // Subscribe to orderbook channel
822        let depth = limit.unwrap_or(15);
823        self.subscribe_orderbook(symbol, depth).await?;
824
825        // Create channel for orderbook updates
826        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
827        let symbol_owned = symbol.to_string();
828        let unified_symbol = format_unified_symbol(&symbol_owned);
829        let client = Arc::clone(&self.client);
830
831        // Spawn task to process messages and filter orderbook updates
832        tokio::spawn(async move {
833            while let Some(msg) = client.receive().await {
834                // Check if this is an orderbook message for our symbol
835                if is_orderbook_message(&msg, &symbol_owned) {
836                    match parse_ws_orderbook(&msg, unified_symbol.clone()) {
837                        Ok(orderbook) => {
838                            if tx.send(Ok(orderbook)).is_err() {
839                                break; // Receiver dropped
840                            }
841                        }
842                        Err(e) => {
843                            if tx.send(Err(e)).is_err() {
844                                break;
845                            }
846                        }
847                    }
848                }
849            }
850        });
851
852        Ok(Box::pin(ReceiverStream::new(rx)))
853    }
854
855    /// Watches trade updates for a symbol.
856    ///
857    /// Returns a stream of `Trade` updates for the specified symbol.
858    ///
859    /// # Arguments
860    ///
861    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
862    /// * `market` - Optional market information for symbol resolution
863    ///
864    /// # Returns
865    ///
866    /// A `MessageStream<Vec<Trade>>` that yields trade updates.
867    ///
868    /// # Example
869    ///
870    /// ```no_run
871    /// use ccxt_exchanges::bitget::ws::BitgetWs;
872    /// use futures::StreamExt;
873    ///
874    /// # async fn example() -> ccxt_core::error::Result<()> {
875    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
876    /// ws.connect().await?;
877    /// let mut stream = ws.watch_trades("BTCUSDT", None).await?;
878    /// while let Some(result) = stream.next().await {
879    ///     match result {
880    ///         Ok(trades) => {
881    ///             for trade in trades {
882    ///                 println!("Trade: {:?} @ {:?}", trade.amount, trade.price);
883    ///             }
884    ///         }
885    ///         Err(e) => eprintln!("Error: {}", e),
886    ///     }
887    /// }
888    /// # Ok(())
889    /// # }
890    /// ```
891    pub async fn watch_trades(
892        &self,
893        symbol: &str,
894        market: Option<Market>,
895    ) -> Result<MessageStream<Vec<Trade>>> {
896        // Ensure connected
897        if !self.is_connected() {
898            self.connect().await?;
899        }
900
901        // Subscribe to trades channel
902        self.subscribe_trades(symbol).await?;
903
904        // Create channel for trade updates
905        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
906        let symbol_owned = symbol.to_string();
907        let client = Arc::clone(&self.client);
908
909        // Spawn task to process messages and filter trade updates
910        tokio::spawn(async move {
911            while let Some(msg) = client.receive().await {
912                // Check if this is a trade message for our symbol
913                if is_trade_message(&msg, &symbol_owned) {
914                    match parse_ws_trades(&msg, market.as_ref()) {
915                        Ok(trades) => {
916                            if tx.send(Ok(trades)).is_err() {
917                                break; // Receiver dropped
918                            }
919                        }
920                        Err(e) => {
921                            if tx.send(Err(e)).is_err() {
922                                break;
923                            }
924                        }
925                    }
926                }
927            }
928        });
929
930        Ok(Box::pin(ReceiverStream::new(rx)))
931    }
932}
933
934// ============================================================================
935// Stream Wrapper
936// ============================================================================
937
938/// A stream wrapper that converts an mpsc receiver into a futures Stream.
939struct ReceiverStream<T> {
940    receiver: mpsc::UnboundedReceiver<T>,
941}
942
943impl<T> ReceiverStream<T> {
944    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
945        Self { receiver }
946    }
947}
948
949impl<T> Stream for ReceiverStream<T> {
950    type Item = T;
951
952    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
953        self.receiver.poll_recv(cx)
954    }
955}
956
957// ============================================================================
958// Message Type Detection Helpers
959// ============================================================================
960
961/// Check if a WebSocket message is a ticker message for the given symbol.
962fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
963    if let Some(arg) = msg.get("arg") {
964        let channel = arg.get("channel").and_then(|c| c.as_str());
965        let inst_id = arg.get("instId").and_then(|i| i.as_str());
966
967        channel == Some("ticker") && inst_id == Some(symbol)
968    } else {
969        false
970    }
971}
972
973/// Check if a WebSocket message is an orderbook message for the given symbol.
974fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
975    if let Some(arg) = msg.get("arg") {
976        let channel = arg.get("channel").and_then(|c| c.as_str());
977        let inst_id = arg.get("instId").and_then(|i| i.as_str());
978
979        // Bitget uses books, books5, books15 for orderbook channels
980        let is_orderbook_channel = channel.is_some_and(|c| c.starts_with("books"));
981        is_orderbook_channel && inst_id == Some(symbol)
982    } else {
983        false
984    }
985}
986
987/// Check if a WebSocket message is a trade message for the given symbol.
988fn is_trade_message(msg: &Value, symbol: &str) -> bool {
989    if let Some(arg) = msg.get("arg") {
990        let channel = arg.get("channel").and_then(|c| c.as_str());
991        let inst_id = arg.get("instId").and_then(|i| i.as_str());
992
993        channel == Some("trade") && inst_id == Some(symbol)
994    } else {
995        false
996    }
997}
998
999// ============================================================================
1000// Private Message Type Detection Helpers
1001// ============================================================================
1002
1003/// Check if a WebSocket message is an account/balance update.
1004fn is_account_message(msg: &Value) -> bool {
1005    if let Some(arg) = msg.get("arg") {
1006        arg.get("channel").and_then(|c| c.as_str()) == Some("account")
1007    } else {
1008        false
1009    }
1010}
1011
1012/// Check if a WebSocket message is an orders update.
1013fn is_orders_message(msg: &Value) -> bool {
1014    if let Some(arg) = msg.get("arg") {
1015        arg.get("channel").and_then(|c| c.as_str()) == Some("orders")
1016    } else {
1017        false
1018    }
1019}
1020
1021/// Format a Bitget symbol (e.g., "BTCUSDT") to unified format (e.g., "BTC/USDT").
1022fn format_unified_symbol(symbol: &str) -> String {
1023    // Common quote currencies to detect
1024    let quote_currencies = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];
1025
1026    for quote in &quote_currencies {
1027        if let Some(base) = symbol.strip_suffix(quote) {
1028            if !base.is_empty() {
1029                return format!("{}/{}", base, quote);
1030            }
1031        }
1032    }
1033
1034    // If no known quote currency found, return as-is
1035    symbol.to_string()
1036}
1037
1038// ============================================================================
1039// WebSocket Message Parsers
1040// ============================================================================
1041
1042/// Parse a WebSocket ticker message.
1043pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
1044    // Bitget WebSocket ticker format:
1045    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"ticker","instId":"BTCUSDT"},"data":[{...}]}
1046    let data = msg
1047        .get("data")
1048        .and_then(|d| d.as_array())
1049        .and_then(|arr| arr.first())
1050        .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
1051
1052    parse_ticker(data, market)
1053}
1054
1055/// Parse a WebSocket orderbook message.
1056pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
1057    // Bitget WebSocket orderbook format:
1058    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"books5","instId":"BTCUSDT"},"data":[{...}]}
1059    let data = msg
1060        .get("data")
1061        .and_then(|d| d.as_array())
1062        .and_then(|arr| arr.first())
1063        .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
1064
1065    parse_orderbook(data, symbol)
1066}
1067
1068/// Parse a WebSocket trade message (single trade).
1069pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
1070    // Bitget WebSocket trade format:
1071    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}]}
1072    let data = msg
1073        .get("data")
1074        .and_then(|d| d.as_array())
1075        .and_then(|arr| arr.first())
1076        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
1077
1078    parse_trade(data, market)
1079}
1080
1081/// Parse a WebSocket trade message (multiple trades).
1082pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
1083    // Bitget WebSocket trade format:
1084    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}, {...}]}
1085    let data_array = msg
1086        .get("data")
1087        .and_then(|d| d.as_array())
1088        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
1089
1090    let mut trades = Vec::with_capacity(data_array.len());
1091    for data in data_array {
1092        trades.push(parse_trade(data, market)?);
1093    }
1094
1095    Ok(trades)
1096}
1097
1098// ============================================================================
1099// Private WebSocket Message Parsers
1100// ============================================================================
1101
1102/// Parse a WebSocket account/balance message into a Balance.
1103///
1104/// Bitget account update format:
1105/// ```json
1106/// {
1107///   "arg": {"instType": "SPOT", "channel": "account", "coin": "default"},
1108///   "data": [{
1109///     "coin": "USDT",
1110///     "available": "50000",
1111///     "frozen": "10000",
1112///     "uTime": "1700000000000"
1113///   }]
1114/// }
1115/// ```
1116pub fn parse_ws_balance(msg: &Value) -> Result<Balance> {
1117    let data_array = msg
1118        .get("data")
1119        .and_then(|d| d.as_array())
1120        .ok_or_else(|| Error::invalid_request("Missing data in account message"))?;
1121
1122    let mut balances = HashMap::new();
1123
1124    for data in data_array {
1125        // Bitget V2 sends individual coin updates
1126        let currency = data
1127            .get("coin")
1128            .and_then(|c| c.as_str())
1129            .unwrap_or_default()
1130            .to_string();
1131
1132        if currency.is_empty() {
1133            continue;
1134        }
1135
1136        let free = data
1137            .get("available")
1138            .and_then(|v| v.as_str())
1139            .and_then(|s| Decimal::from_str(s).ok())
1140            .unwrap_or_default();
1141
1142        let used = data
1143            .get("frozen")
1144            .and_then(|v| v.as_str())
1145            .and_then(|s| Decimal::from_str(s).ok())
1146            .unwrap_or_default();
1147
1148        let total = free + used;
1149
1150        balances.insert(currency, BalanceEntry { free, used, total });
1151    }
1152
1153    Ok(Balance {
1154        balances,
1155        info: HashMap::new(),
1156    })
1157}
1158
1159/// Parse a WebSocket order update message into an Order.
1160///
1161/// Bitget order update format:
1162/// ```json
1163/// {
1164///   "instId": "BTCUSDT",
1165///   "ordId": "123456",
1166///   "clOrdId": "client123",
1167///   "side": "buy",
1168///   "ordType": "limit",
1169///   "px": "50000",
1170///   "sz": "0.1",
1171///   "fillPx": "49999",
1172///   "fillSz": "0.05",
1173///   "avgPx": "49999",
1174///   "status": "partially_filled",
1175///   "fee": "-0.5",
1176///   "feeCcy": "USDT",
1177///   "uTime": "1700000000000",
1178///   "cTime": "1700000000000"
1179/// }
1180/// ```
1181pub fn parse_ws_order(data: &Value) -> Result<Order> {
1182    let inst_id = data
1183        .get("instId")
1184        .and_then(|v| v.as_str())
1185        .unwrap_or_default();
1186    let symbol = format_unified_symbol(inst_id);
1187
1188    let id = data
1189        .get("ordId")
1190        .and_then(|v| v.as_str())
1191        .unwrap_or_default()
1192        .to_string();
1193
1194    let client_order_id = data
1195        .get("clOrdId")
1196        .and_then(|v| v.as_str())
1197        .map(ToString::to_string);
1198
1199    let side = match data.get("side").and_then(|v| v.as_str()) {
1200        Some("sell") => OrderSide::Sell,
1201        _ => OrderSide::Buy,
1202    };
1203
1204    let order_type = match data.get("ordType").and_then(|v| v.as_str()) {
1205        Some("market") => OrderType::Market,
1206        _ => OrderType::Limit,
1207    };
1208
1209    let price = data
1210        .get("px")
1211        .or_else(|| data.get("price"))
1212        .and_then(|v| v.as_str())
1213        .and_then(|s| Decimal::from_str(s).ok());
1214
1215    let amount = data
1216        .get("sz")
1217        .or_else(|| data.get("size"))
1218        .and_then(|v| v.as_str())
1219        .and_then(|s| Decimal::from_str(s).ok())
1220        .unwrap_or_default();
1221
1222    let filled = data
1223        .get("accFillSz")
1224        .or_else(|| data.get("baseVolume"))
1225        .and_then(|v| v.as_str())
1226        .and_then(|s| Decimal::from_str(s).ok());
1227
1228    let average = data
1229        .get("avgPx")
1230        .and_then(|v| v.as_str())
1231        .and_then(|s| Decimal::from_str(s).ok())
1232        .filter(|d| !d.is_zero());
1233
1234    let cost = match (filled, average) {
1235        (Some(f), Some(a)) => Some(f * a),
1236        _ => None,
1237    };
1238
1239    let status = match data.get("status").and_then(|v| v.as_str()) {
1240        Some("filled" | "full-fill") => OrderStatus::Closed,
1241        Some("cancelled" | "canceled") => OrderStatus::Cancelled,
1242        _ => OrderStatus::Open,
1243    };
1244
1245    let fee = {
1246        let fee_amount = data
1247            .get("fee")
1248            .or_else(|| data.get("feeDetail"))
1249            .and_then(|v| v.as_str())
1250            .and_then(|s| Decimal::from_str(s).ok());
1251        let fee_currency = data
1252            .get("feeCcy")
1253            .and_then(|v| v.as_str())
1254            .unwrap_or_default()
1255            .to_string();
1256        fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1257    };
1258
1259    let timestamp = data
1260        .get("cTime")
1261        .and_then(|v| v.as_str())
1262        .and_then(|s| s.parse::<i64>().ok());
1263
1264    Ok(Order {
1265        id,
1266        client_order_id,
1267        symbol,
1268        order_type,
1269        side,
1270        price,
1271        amount,
1272        filled,
1273        remaining: None,
1274        cost,
1275        average,
1276        status,
1277        fee,
1278        fees: None,
1279        timestamp,
1280        datetime: None,
1281        last_trade_timestamp: None,
1282        time_in_force: None,
1283        post_only: None,
1284        reduce_only: None,
1285        stop_price: None,
1286        trigger_price: None,
1287        take_profit_price: None,
1288        stop_loss_price: None,
1289        trailing_delta: None,
1290        trailing_percent: None,
1291        activation_price: None,
1292        callback_rate: None,
1293        working_type: None,
1294        trades: None,
1295        info: HashMap::new(),
1296    })
1297}
1298
1299/// Parse a WebSocket order fill into a Trade (my trade).
1300///
1301/// Extracts the latest fill information from an order update.
1302pub fn parse_ws_my_trade(data: &Value) -> Result<Trade> {
1303    let inst_id = data
1304        .get("instId")
1305        .and_then(|v| v.as_str())
1306        .unwrap_or_default();
1307    let symbol = format_unified_symbol(inst_id);
1308
1309    let trade_id = data
1310        .get("tradeId")
1311        .and_then(|v| v.as_str())
1312        .map(ToString::to_string);
1313
1314    let order_id = data
1315        .get("ordId")
1316        .and_then(|v| v.as_str())
1317        .map(ToString::to_string);
1318
1319    let side = match data.get("side").and_then(|v| v.as_str()) {
1320        Some("buy") => OrderSide::Buy,
1321        _ => OrderSide::Sell,
1322    };
1323
1324    let fill_px = data
1325        .get("fillPx")
1326        .or_else(|| data.get("price"))
1327        .and_then(|v| v.as_str())
1328        .and_then(|s| Decimal::from_str(s).ok())
1329        .unwrap_or_default();
1330
1331    let fill_sz = data
1332        .get("fillSz")
1333        .or_else(|| data.get("baseVolume"))
1334        .and_then(|v| v.as_str())
1335        .and_then(|s| Decimal::from_str(s).ok())
1336        .unwrap_or_default();
1337
1338    let timestamp = data
1339        .get("fillTime")
1340        .or_else(|| data.get("uTime"))
1341        .or_else(|| data.get("cTime"))
1342        .and_then(|v| v.as_str())
1343        .and_then(|s| s.parse::<i64>().ok())
1344        .unwrap_or(0);
1345
1346    let fee = {
1347        let fee_amount = data
1348            .get("fillFee")
1349            .or_else(|| data.get("fee"))
1350            .and_then(|v| v.as_str())
1351            .and_then(|s| Decimal::from_str(s).ok());
1352        let fee_currency = data
1353            .get("fillFeeCcy")
1354            .or_else(|| data.get("feeCcy"))
1355            .and_then(|v| v.as_str())
1356            .unwrap_or_default()
1357            .to_string();
1358        fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1359    };
1360
1361    Ok(Trade {
1362        id: trade_id,
1363        order: order_id,
1364        symbol,
1365        trade_type: None,
1366        side,
1367        taker_or_maker: None,
1368        price: Price(fill_px),
1369        amount: Amount(fill_sz),
1370        cost: Some(Cost(fill_px * fill_sz)),
1371        fee,
1372        timestamp,
1373        datetime: None,
1374        info: HashMap::new(),
1375    })
1376}
1377
1378#[cfg(test)]
1379mod tests {
1380    use super::*;
1381    use ccxt_core::types::financial::Price;
1382    use rust_decimal_macros::dec;
1383
1384    #[test]
1385    fn test_bitget_ws_creation() {
1386        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
1387        // WsClient is created successfully - config is private so we just verify creation works
1388        assert!(ws.subscriptions.try_read().is_ok());
1389    }
1390
1391    #[tokio::test]
1392    async fn test_subscriptions_empty_by_default() {
1393        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
1394        let subs = ws.subscriptions().await;
1395        assert!(subs.is_empty());
1396    }
1397
1398    // ==================== Ticker Message Parsing Tests ====================
1399
1400    #[test]
1401    fn test_parse_ws_ticker_snapshot() {
1402        let msg = serde_json::from_str(
1403            r#"{
1404                "action": "snapshot",
1405                "arg": {
1406                    "instType": "SPOT",
1407                    "channel": "ticker",
1408                    "instId": "BTCUSDT"
1409                },
1410                "data": [{
1411                    "instId": "BTCUSDT",
1412                    "lastPr": "50000.00",
1413                    "high24h": "51000.00",
1414                    "low24h": "49000.00",
1415                    "bidPr": "49999.00",
1416                    "askPr": "50001.00",
1417                    "baseVolume": "1000.5",
1418                    "ts": "1700000000000"
1419                }]
1420            }"#,
1421        )
1422        .unwrap();
1423
1424        let ticker = parse_ws_ticker(&msg, None).unwrap();
1425        assert_eq!(ticker.symbol, "BTCUSDT");
1426        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
1427        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
1428        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
1429        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
1430        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
1431        assert_eq!(ticker.timestamp, 1700000000000);
1432    }
1433
1434    #[test]
1435    fn test_parse_ws_ticker_with_market() {
1436        let msg = serde_json::from_str(
1437            r#"{
1438                "action": "snapshot",
1439                "arg": {
1440                    "instType": "SPOT",
1441                    "channel": "ticker",
1442                    "instId": "BTCUSDT"
1443                },
1444                "data": [{
1445                    "instId": "BTCUSDT",
1446                    "lastPr": "50000.00",
1447                    "ts": "1700000000000"
1448                }]
1449            }"#,
1450        )
1451        .unwrap();
1452
1453        let market = Market {
1454            id: "BTCUSDT".to_string(),
1455            symbol: "BTC/USDT".to_string(),
1456            base: "BTC".to_string(),
1457            quote: "USDT".to_string(),
1458            ..Default::default()
1459        };
1460
1461        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
1462        assert_eq!(ticker.symbol, "BTC/USDT");
1463        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
1464    }
1465
1466    #[test]
1467    fn test_parse_ws_ticker_missing_data() {
1468        let msg = serde_json::from_str(
1469            r#"{
1470                "action": "snapshot",
1471                "arg": {
1472                    "instType": "SPOT",
1473                    "channel": "ticker",
1474                    "instId": "BTCUSDT"
1475                }
1476            }"#,
1477        )
1478        .unwrap();
1479
1480        let result = parse_ws_ticker(&msg, None);
1481        assert!(result.is_err());
1482    }
1483
1484    #[test]
1485    fn test_parse_ws_ticker_empty_data_array() {
1486        let msg = serde_json::from_str(
1487            r#"{
1488                "action": "snapshot",
1489                "arg": {
1490                    "instType": "SPOT",
1491                    "channel": "ticker",
1492                    "instId": "BTCUSDT"
1493                },
1494                "data": []
1495            }"#,
1496        )
1497        .unwrap();
1498
1499        let result = parse_ws_ticker(&msg, None);
1500        assert!(result.is_err());
1501    }
1502
1503    // ==================== OrderBook Message Parsing Tests ====================
1504
1505    #[test]
1506    fn test_parse_ws_orderbook_snapshot() {
1507        let msg = serde_json::from_str(
1508            r#"{
1509                "action": "snapshot",
1510                "arg": {
1511                    "instType": "SPOT",
1512                    "channel": "books5",
1513                    "instId": "BTCUSDT"
1514                },
1515                "data": [{
1516                    "bids": [
1517                        ["50000.00", "1.5"],
1518                        ["49999.00", "2.0"],
1519                        ["49998.00", "0.5"]
1520                    ],
1521                    "asks": [
1522                        ["50001.00", "1.0"],
1523                        ["50002.00", "3.0"],
1524                        ["50003.00", "2.5"]
1525                    ],
1526                    "ts": "1700000000000"
1527                }]
1528            }"#,
1529        )
1530        .unwrap();
1531
1532        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
1533        assert_eq!(orderbook.symbol, "BTC/USDT");
1534        assert_eq!(orderbook.bids.len(), 3);
1535        assert_eq!(orderbook.asks.len(), 3);
1536
1537        // Verify bids are sorted in descending order
1538        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
1539        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
1540        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
1541
1542        // Verify asks are sorted in ascending order
1543        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
1544        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
1545        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
1546    }
1547
1548    #[test]
1549    fn test_parse_ws_orderbook_update() {
1550        let msg = serde_json::from_str(
1551            r#"{
1552                "action": "update",
1553                "arg": {
1554                    "instType": "SPOT",
1555                    "channel": "books",
1556                    "instId": "ETHUSDT"
1557                },
1558                "data": [{
1559                    "bids": [
1560                        ["2000.00", "10.0"]
1561                    ],
1562                    "asks": [
1563                        ["2001.00", "5.0"]
1564                    ],
1565                    "ts": "1700000000001"
1566                }]
1567            }"#,
1568        )
1569        .unwrap();
1570
1571        let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
1572        assert_eq!(orderbook.symbol, "ETH/USDT");
1573        assert_eq!(orderbook.bids.len(), 1);
1574        assert_eq!(orderbook.asks.len(), 1);
1575        assert_eq!(orderbook.timestamp, 1700000000001);
1576    }
1577
1578    #[test]
1579    fn test_parse_ws_orderbook_missing_data() {
1580        let msg = serde_json::from_str(
1581            r#"{
1582                "action": "snapshot",
1583                "arg": {
1584                    "instType": "SPOT",
1585                    "channel": "books5",
1586                    "instId": "BTCUSDT"
1587                }
1588            }"#,
1589        )
1590        .unwrap();
1591
1592        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
1593        assert!(result.is_err());
1594    }
1595
1596    #[test]
1597    fn test_parse_ws_orderbook_empty_sides() {
1598        let msg = serde_json::from_str(
1599            r#"{
1600                "action": "snapshot",
1601                "arg": {
1602                    "instType": "SPOT",
1603                    "channel": "books5",
1604                    "instId": "BTCUSDT"
1605                },
1606                "data": [{
1607                    "bids": [],
1608                    "asks": [],
1609                    "ts": "1700000000000"
1610                }]
1611            }"#,
1612        )
1613        .unwrap();
1614
1615        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
1616        assert!(orderbook.bids.is_empty());
1617        assert!(orderbook.asks.is_empty());
1618    }
1619
1620    // ==================== Trade Message Parsing Tests ====================
1621
1622    #[test]
1623    fn test_parse_ws_trade_single() {
1624        let msg = serde_json::from_str(
1625            r#"{
1626                "action": "snapshot",
1627                "arg": {
1628                    "instType": "SPOT",
1629                    "channel": "trade",
1630                    "instId": "BTCUSDT"
1631                },
1632                "data": [{
1633                    "tradeId": "123456789",
1634                    "symbol": "BTCUSDT",
1635                    "side": "buy",
1636                    "price": "50000.00",
1637                    "size": "0.5",
1638                    "ts": "1700000000000"
1639                }]
1640            }"#,
1641        )
1642        .unwrap();
1643
1644        let trade = parse_ws_trade(&msg, None).unwrap();
1645        assert_eq!(trade.id, Some("123456789".to_string()));
1646        assert_eq!(trade.side, ccxt_core::types::OrderSide::Buy);
1647        assert_eq!(trade.price, Price::new(dec!(50000.00)));
1648        assert_eq!(
1649            trade.amount,
1650            ccxt_core::types::financial::Amount::new(dec!(0.5))
1651        );
1652        assert_eq!(trade.timestamp, 1700000000000);
1653    }
1654
1655    #[test]
1656    fn test_parse_ws_trades_multiple() {
1657        let msg = serde_json::from_str(
1658            r#"{
1659                "action": "snapshot",
1660                "arg": {
1661                    "instType": "SPOT",
1662                    "channel": "trade",
1663                    "instId": "BTCUSDT"
1664                },
1665                "data": [
1666                    {
1667                        "tradeId": "123456789",
1668                        "symbol": "BTCUSDT",
1669                        "side": "buy",
1670                        "price": "50000.00",
1671                        "size": "0.5",
1672                        "ts": "1700000000000"
1673                    },
1674                    {
1675                        "tradeId": "123456790",
1676                        "symbol": "BTCUSDT",
1677                        "side": "sell",
1678                        "price": "50001.00",
1679                        "size": "1.0",
1680                        "ts": "1700000000001"
1681                    }
1682                ]
1683            }"#,
1684        )
1685        .unwrap();
1686
1687        let trades = parse_ws_trades(&msg, None).unwrap();
1688        assert_eq!(trades.len(), 2);
1689
1690        assert_eq!(trades[0].id, Some("123456789".to_string()));
1691        assert_eq!(trades[0].side, ccxt_core::types::OrderSide::Buy);
1692
1693        assert_eq!(trades[1].id, Some("123456790".to_string()));
1694        assert_eq!(trades[1].side, ccxt_core::types::OrderSide::Sell);
1695    }
1696
1697    #[test]
1698    fn test_parse_ws_trade_sell_side() {
1699        let msg = serde_json::from_str(
1700            r#"{
1701                "action": "snapshot",
1702                "arg": {
1703                    "instType": "SPOT",
1704                    "channel": "trade",
1705                    "instId": "BTCUSDT"
1706                },
1707                "data": [{
1708                    "tradeId": "123456789",
1709                    "symbol": "BTCUSDT",
1710                    "side": "sell",
1711                    "price": "50000.00",
1712                    "size": "0.5",
1713                    "ts": "1700000000000"
1714                }]
1715            }"#,
1716        )
1717        .unwrap();
1718
1719        let trade = parse_ws_trade(&msg, None).unwrap();
1720        assert_eq!(trade.side, ccxt_core::types::OrderSide::Sell);
1721    }
1722
1723    #[test]
1724    fn test_parse_ws_trade_missing_data() {
1725        let msg = serde_json::from_str(
1726            r#"{
1727                "action": "snapshot",
1728                "arg": {
1729                    "instType": "SPOT",
1730                    "channel": "trade",
1731                    "instId": "BTCUSDT"
1732                }
1733            }"#,
1734        )
1735        .unwrap();
1736
1737        let result = parse_ws_trade(&msg, None);
1738        assert!(result.is_err());
1739    }
1740
1741    #[test]
1742    fn test_parse_ws_trades_empty_array() {
1743        let msg = serde_json::from_str(
1744            r#"{
1745                "action": "snapshot",
1746                "arg": {
1747                    "instType": "SPOT",
1748                    "channel": "trade",
1749                    "instId": "BTCUSDT"
1750                },
1751                "data": []
1752            }"#,
1753        )
1754        .unwrap();
1755
1756        let trades = parse_ws_trades(&msg, None).unwrap();
1757        assert!(trades.is_empty());
1758    }
1759
1760    // ==================== Message Type Detection Tests ====================
1761
1762    #[test]
1763    fn test_is_ticker_message_true() {
1764        let msg = serde_json::from_str(
1765            r#"{
1766                "action": "snapshot",
1767                "arg": {
1768                    "instType": "SPOT",
1769                    "channel": "ticker",
1770                    "instId": "BTCUSDT"
1771                },
1772                "data": [{}]
1773            }"#,
1774        )
1775        .unwrap();
1776
1777        assert!(is_ticker_message(&msg, "BTCUSDT"));
1778    }
1779
1780    #[test]
1781    fn test_is_ticker_message_wrong_symbol() {
1782        let msg = serde_json::from_str(
1783            r#"{
1784                "action": "snapshot",
1785                "arg": {
1786                    "instType": "SPOT",
1787                    "channel": "ticker",
1788                    "instId": "ETHUSDT"
1789                },
1790                "data": [{}]
1791            }"#,
1792        )
1793        .unwrap();
1794
1795        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1796    }
1797
1798    #[test]
1799    fn test_is_ticker_message_wrong_channel() {
1800        let msg = serde_json::from_str(
1801            r#"{
1802                "action": "snapshot",
1803                "arg": {
1804                    "instType": "SPOT",
1805                    "channel": "trade",
1806                    "instId": "BTCUSDT"
1807                },
1808                "data": [{}]
1809            }"#,
1810        )
1811        .unwrap();
1812
1813        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1814    }
1815
1816    #[test]
1817    fn test_is_orderbook_message_books5() {
1818        let msg = serde_json::from_str(
1819            r#"{
1820                "arg": {
1821                    "instType": "SPOT",
1822                    "channel": "books5",
1823                    "instId": "BTCUSDT"
1824                }
1825            }"#,
1826        )
1827        .unwrap();
1828
1829        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1830    }
1831
1832    #[test]
1833    fn test_is_orderbook_message_books15() {
1834        let msg = serde_json::from_str(
1835            r#"{
1836                "arg": {
1837                    "instType": "SPOT",
1838                    "channel": "books15",
1839                    "instId": "BTCUSDT"
1840                }
1841            }"#,
1842        )
1843        .unwrap();
1844
1845        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1846    }
1847
1848    #[test]
1849    fn test_is_orderbook_message_books() {
1850        let msg = serde_json::from_str(
1851            r#"{
1852                "arg": {
1853                    "instType": "SPOT",
1854                    "channel": "books",
1855                    "instId": "BTCUSDT"
1856                }
1857            }"#,
1858        )
1859        .unwrap();
1860
1861        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1862    }
1863
1864    #[test]
1865    fn test_is_trade_message_true() {
1866        let msg = serde_json::from_str(
1867            r#"{
1868                "arg": {
1869                    "instType": "SPOT",
1870                    "channel": "trade",
1871                    "instId": "BTCUSDT"
1872                }
1873            }"#,
1874        )
1875        .unwrap();
1876
1877        assert!(is_trade_message(&msg, "BTCUSDT"));
1878    }
1879
1880    #[test]
1881    fn test_is_trade_message_wrong_channel() {
1882        let msg = serde_json::from_str(
1883            r#"{
1884                "arg": {
1885                    "instType": "SPOT",
1886                    "channel": "ticker",
1887                    "instId": "BTCUSDT"
1888                }
1889            }"#,
1890        )
1891        .unwrap();
1892
1893        assert!(!is_trade_message(&msg, "BTCUSDT"));
1894    }
1895
1896    // ==================== Symbol Formatting Tests ====================
1897
1898    #[test]
1899    fn test_format_unified_symbol_usdt() {
1900        assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1901        assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1902    }
1903
1904    #[test]
1905    fn test_format_unified_symbol_usdc() {
1906        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1907    }
1908
1909    #[test]
1910    fn test_format_unified_symbol_btc() {
1911        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1912    }
1913
1914    #[test]
1915    fn test_format_unified_symbol_unknown() {
1916        // Unknown quote currency returns as-is
1917        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1918    }
1919
1920    // ==================== Private Message Type Detection Tests ====================
1921
1922    #[test]
1923    fn test_is_account_message_true() {
1924        let msg: Value = serde_json::from_str(
1925            r#"{
1926                "arg": {"instType": "SPOT", "channel": "account", "coin": "default"},
1927                "data": [{}]
1928            }"#,
1929        )
1930        .unwrap();
1931        assert!(is_account_message(&msg));
1932    }
1933
1934    #[test]
1935    fn test_is_account_message_wrong_channel() {
1936        let msg: Value = serde_json::from_str(
1937            r#"{
1938                "arg": {"instType": "SPOT", "channel": "orders"},
1939                "data": [{}]
1940            }"#,
1941        )
1942        .unwrap();
1943        assert!(!is_account_message(&msg));
1944    }
1945
1946    #[test]
1947    fn test_is_orders_message_true() {
1948        let msg: Value = serde_json::from_str(
1949            r#"{
1950                "arg": {"instType": "SPOT", "channel": "orders", "instId": "default"},
1951                "data": [{}]
1952            }"#,
1953        )
1954        .unwrap();
1955        assert!(is_orders_message(&msg));
1956    }
1957
1958    #[test]
1959    fn test_is_orders_message_wrong_channel() {
1960        let msg: Value = serde_json::from_str(
1961            r#"{
1962                "arg": {"instType": "SPOT", "channel": "account"},
1963                "data": [{}]
1964            }"#,
1965        )
1966        .unwrap();
1967        assert!(!is_orders_message(&msg));
1968    }
1969
1970    // ==================== Private Balance Parsing Tests ====================
1971
1972    #[test]
1973    fn test_parse_ws_balance_single_coin() {
1974        let msg: Value = serde_json::from_str(
1975            r#"{
1976                "arg": {"instType": "SPOT", "channel": "account", "coin": "default"},
1977                "data": [{
1978                    "coin": "USDT",
1979                    "available": "50000.50",
1980                    "frozen": "10000.25",
1981                    "uTime": "1700000000000"
1982                }]
1983            }"#,
1984        )
1985        .unwrap();
1986
1987        let balance = parse_ws_balance(&msg).unwrap();
1988        assert_eq!(balance.balances.len(), 1);
1989
1990        let usdt = balance.balances.get("USDT").unwrap();
1991        assert_eq!(usdt.free, dec!(50000.50));
1992        assert_eq!(usdt.used, dec!(10000.25));
1993        assert_eq!(usdt.total, dec!(60000.75));
1994    }
1995
1996    #[test]
1997    fn test_parse_ws_balance_multiple_coins() {
1998        let msg: Value = serde_json::from_str(
1999            r#"{
2000                "arg": {"instType": "SPOT", "channel": "account"},
2001                "data": [
2002                    {"coin": "USDT", "available": "50000", "frozen": "10000"},
2003                    {"coin": "BTC", "available": "1.5", "frozen": "0.5"}
2004                ]
2005            }"#,
2006        )
2007        .unwrap();
2008
2009        let balance = parse_ws_balance(&msg).unwrap();
2010        assert_eq!(balance.balances.len(), 2);
2011
2012        let usdt = balance.balances.get("USDT").unwrap();
2013        assert_eq!(usdt.free, dec!(50000));
2014        assert_eq!(usdt.total, dec!(60000));
2015
2016        let btc = balance.balances.get("BTC").unwrap();
2017        assert_eq!(btc.free, dec!(1.5));
2018        assert_eq!(btc.total, dec!(2.0));
2019    }
2020
2021    #[test]
2022    fn test_parse_ws_balance_missing_data() {
2023        let msg: Value = serde_json::from_str(
2024            r#"{
2025                "arg": {"instType": "SPOT", "channel": "account"}
2026            }"#,
2027        )
2028        .unwrap();
2029
2030        let result = parse_ws_balance(&msg);
2031        assert!(result.is_err());
2032    }
2033
2034    // ==================== Private Order Parsing Tests ====================
2035
2036    #[test]
2037    fn test_parse_ws_order_limit_buy() {
2038        let data: Value = serde_json::from_str(
2039            r#"{
2040                "instId": "BTCUSDT",
2041                "ordId": "123456789",
2042                "clOrdId": "client123",
2043                "side": "buy",
2044                "ordType": "limit",
2045                "px": "50000",
2046                "sz": "0.1",
2047                "accFillSz": "0.05",
2048                "avgPx": "49999",
2049                "status": "partially_filled",
2050                "fee": "-0.5",
2051                "feeCcy": "USDT",
2052                "cTime": "1700000000000"
2053            }"#,
2054        )
2055        .unwrap();
2056
2057        let order = parse_ws_order(&data).unwrap();
2058        assert_eq!(order.symbol, "BTC/USDT");
2059        assert_eq!(order.id, "123456789");
2060        assert_eq!(order.client_order_id, Some("client123".to_string()));
2061        assert_eq!(order.side, OrderSide::Buy);
2062        assert_eq!(order.order_type, OrderType::Limit);
2063        assert_eq!(order.price, Some(dec!(50000)));
2064        assert_eq!(order.amount, dec!(0.1));
2065        assert_eq!(order.filled, Some(dec!(0.05)));
2066        assert_eq!(order.average, Some(dec!(49999)));
2067        assert_eq!(order.status, OrderStatus::Open); // partially_filled is still open
2068        assert_eq!(order.timestamp, Some(1700000000000));
2069        assert!(order.fee.is_some());
2070        let fee = order.fee.unwrap();
2071        assert_eq!(fee.cost, dec!(0.5)); // abs value
2072    }
2073
2074    #[test]
2075    fn test_parse_ws_order_market_sell() {
2076        let data: Value = serde_json::from_str(
2077            r#"{
2078                "instId": "ETHUSDT",
2079                "ordId": "987654321",
2080                "side": "sell",
2081                "ordType": "market",
2082                "sz": "1.0",
2083                "accFillSz": "1.0",
2084                "avgPx": "2000",
2085                "status": "filled",
2086                "fee": "-2.0",
2087                "feeCcy": "USDT",
2088                "cTime": "1700000000000"
2089            }"#,
2090        )
2091        .unwrap();
2092
2093        let order = parse_ws_order(&data).unwrap();
2094        assert_eq!(order.symbol, "ETH/USDT");
2095        assert_eq!(order.side, OrderSide::Sell);
2096        assert_eq!(order.order_type, OrderType::Market);
2097        assert_eq!(order.status, OrderStatus::Closed);
2098        assert_eq!(order.cost, Some(dec!(2000))); // 1.0 * 2000
2099    }
2100
2101    #[test]
2102    fn test_parse_ws_order_cancelled() {
2103        let data: Value = serde_json::from_str(
2104            r#"{
2105                "instId": "BTCUSDT",
2106                "ordId": "111222333",
2107                "side": "buy",
2108                "ordType": "limit",
2109                "px": "45000",
2110                "sz": "0.5",
2111                "status": "cancelled",
2112                "cTime": "1700000000000"
2113            }"#,
2114        )
2115        .unwrap();
2116
2117        let order = parse_ws_order(&data).unwrap();
2118        assert_eq!(order.status, OrderStatus::Cancelled);
2119    }
2120
2121    // ==================== Private Trade (My Trade) Parsing Tests ====================
2122
2123    #[test]
2124    fn test_parse_ws_my_trade_basic() {
2125        let data: Value = serde_json::from_str(
2126            r#"{
2127                "instId": "BTCUSDT",
2128                "ordId": "123456789",
2129                "tradeId": "trade001",
2130                "side": "buy",
2131                "fillPx": "50000",
2132                "fillSz": "0.1",
2133                "fillTime": "1700000000000",
2134                "fillFee": "-5.0",
2135                "fillFeeCcy": "USDT"
2136            }"#,
2137        )
2138        .unwrap();
2139
2140        let trade = parse_ws_my_trade(&data).unwrap();
2141        assert_eq!(trade.symbol, "BTC/USDT");
2142        assert_eq!(trade.id, Some("trade001".to_string()));
2143        assert_eq!(trade.order, Some("123456789".to_string()));
2144        assert_eq!(trade.side, OrderSide::Buy);
2145        assert_eq!(trade.price, Price::new(dec!(50000)));
2146        assert_eq!(trade.amount, Amount::new(dec!(0.1)));
2147        assert_eq!(trade.cost, Some(Cost::new(dec!(5000)))); // 50000 * 0.1
2148        assert_eq!(trade.timestamp, 1700000000000);
2149        assert!(trade.fee.is_some());
2150        let fee = trade.fee.unwrap();
2151        assert_eq!(fee.cost, dec!(5.0)); // abs value
2152        assert_eq!(fee.currency, "USDT");
2153    }
2154
2155    #[test]
2156    fn test_parse_ws_my_trade_sell() {
2157        let data: Value = serde_json::from_str(
2158            r#"{
2159                "instId": "ETHUSDT",
2160                "ordId": "987654321",
2161                "tradeId": "trade002",
2162                "side": "sell",
2163                "fillPx": "2000",
2164                "fillSz": "0.5",
2165                "fillTime": "1700000000001"
2166            }"#,
2167        )
2168        .unwrap();
2169
2170        let trade = parse_ws_my_trade(&data).unwrap();
2171        assert_eq!(trade.symbol, "ETH/USDT");
2172        assert_eq!(trade.side, OrderSide::Sell);
2173        assert_eq!(trade.price, Price::new(dec!(2000)));
2174        assert_eq!(trade.amount, Amount::new(dec!(0.5)));
2175        assert_eq!(trade.cost, Some(Cost::new(dec!(1000)))); // 2000 * 0.5
2176        assert!(trade.fee.is_none()); // No fee fields
2177    }
2178
2179    #[test]
2180    fn test_parse_ws_my_trade_fallback_fields() {
2181        // Test fallback field names (price instead of fillPx, etc.)
2182        let data: Value = serde_json::from_str(
2183            r#"{
2184                "instId": "BTCUSDT",
2185                "ordId": "111222333",
2186                "side": "buy",
2187                "price": "48000",
2188                "baseVolume": "0.2",
2189                "uTime": "1700000000002",
2190                "fee": "-1.5",
2191                "feeCcy": "USDT"
2192            }"#,
2193        )
2194        .unwrap();
2195
2196        let trade = parse_ws_my_trade(&data).unwrap();
2197        assert_eq!(trade.price, Price::new(dec!(48000)));
2198        assert_eq!(trade.amount, Amount::new(dec!(0.2)));
2199        assert_eq!(trade.timestamp, 1700000000002);
2200        assert!(trade.fee.is_some());
2201    }
2202}