Skip to main content

ccxt_exchanges/bitget/
ws.rs

1//! Bitget WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for Bitget exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (balance, orders) with automatic reconnection.
6
7use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
19/// Default ping interval for Bitget WebSocket (30 seconds).
20const DEFAULT_PING_INTERVAL_MS: u64 = 30000;
21
22/// Default reconnect delay (5 seconds).
23const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
24
25/// Maximum reconnect attempts.
26const MAX_RECONNECT_ATTEMPTS: u32 = 10;
27
28/// Bitget WebSocket client.
29///
30/// Provides real-time data streaming for Bitget exchange.
31pub struct BitgetWs {
32    /// WebSocket client instance.
33    client: Arc<WsClient>,
34    /// Active subscriptions.
35    subscriptions: Arc<RwLock<Vec<String>>>,
36}
37
38impl BitgetWs {
39    /// Creates a new Bitget WebSocket client.
40    ///
41    /// # Arguments
42    ///
43    /// * `url` - WebSocket server URL
44    pub fn new(url: String) -> Self {
45        let config = WsConfig {
46            url: url.clone(),
47            connect_timeout: 10000,
48            ping_interval: DEFAULT_PING_INTERVAL_MS,
49            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
50            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
51            auto_reconnect: true,
52            enable_compression: false,
53            pong_timeout: 90000,
54            ..Default::default()
55        };
56
57        Self {
58            client: Arc::new(WsClient::new(config)),
59            subscriptions: Arc::new(RwLock::new(Vec::new())),
60        }
61    }
62
63    /// Connects to the WebSocket server.
64    pub async fn connect(&self) -> Result<()> {
65        self.client.connect().await
66    }
67
68    /// Disconnects from the WebSocket server.
69    pub async fn disconnect(&self) -> Result<()> {
70        self.client.disconnect().await
71    }
72
73    /// Returns the current connection state.
74    pub fn state(&self) -> WsConnectionState {
75        self.client.state()
76    }
77
78    /// Checks if the WebSocket is connected.
79    pub fn is_connected(&self) -> bool {
80        self.client.is_connected()
81    }
82
83    /// Receives the next message from the WebSocket.
84    pub async fn receive(&self) -> Option<Value> {
85        self.client.receive().await
86    }
87
88    /// Subscribes to a ticker stream.
89    ///
90    /// # Arguments
91    ///
92    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
93    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
94        let mut arg_map = serde_json::Map::new();
95        arg_map.insert(
96            "instType".to_string(),
97            serde_json::Value::String("SPOT".to_string()),
98        );
99        arg_map.insert(
100            "channel".to_string(),
101            serde_json::Value::String("ticker".to_string()),
102        );
103        arg_map.insert(
104            "instId".to_string(),
105            serde_json::Value::String(symbol.to_string()),
106        );
107        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
108
109        let mut msg_map = serde_json::Map::new();
110        msg_map.insert(
111            "op".to_string(),
112            serde_json::Value::String("subscribe".to_string()),
113        );
114        msg_map.insert("args".to_string(), args);
115        let msg = serde_json::Value::Object(msg_map);
116
117        self.client.send_json(&msg).await?;
118
119        let sub_key = format!("ticker:{}", symbol);
120        self.subscriptions.write().await.push(sub_key);
121
122        Ok(())
123    }
124
125    /// Subscribes to multiple ticker streams.
126    ///
127    /// # Arguments
128    ///
129    /// * `symbols` - List of trading pair symbols (e.g., `["BTCUSDT", "ETHUSDT"]`)
130    pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
131        let mut args = Vec::new();
132        for symbol in symbols {
133            let mut arg_map = serde_json::Map::new();
134            arg_map.insert(
135                "instType".to_string(),
136                serde_json::Value::String("SPOT".to_string()),
137            );
138            arg_map.insert(
139                "channel".to_string(),
140                serde_json::Value::String("ticker".to_string()),
141            );
142            arg_map.insert(
143                "instId".to_string(),
144                serde_json::Value::String(symbol.clone()),
145            );
146            args.push(serde_json::Value::Object(arg_map));
147        }
148
149        let mut msg_map = serde_json::Map::new();
150        msg_map.insert(
151            "op".to_string(),
152            serde_json::Value::String("subscribe".to_string()),
153        );
154        msg_map.insert("args".to_string(), serde_json::Value::Array(args));
155        let msg = serde_json::Value::Object(msg_map);
156
157        self.client.send_json(&msg).await?;
158
159        let mut subs = self.subscriptions.write().await;
160        for symbol in symbols {
161            subs.push(format!("ticker:{}", symbol));
162        }
163
164        Ok(())
165    }
166
167    /// Watches ticker updates for multiple symbols.
168    ///
169    /// Returns a stream of `Vec<Ticker>` updates for the specified symbols.
170    ///
171    /// # Arguments
172    ///
173    /// * `symbols` - List of trading pair symbols (e.g., `["BTCUSDT", "ETHUSDT"]`)
174    ///
175    /// # Returns
176    ///
177    /// A `MessageStream<Vec<Ticker>>` that yields ticker updates.
178    pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
179        // Ensure connected
180        if !self.is_connected() {
181            self.connect().await?;
182        }
183
184        // Subscribe to ticker channels
185        self.subscribe_tickers(symbols).await?;
186
187        // Create channel for ticker updates
188        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
189        let symbols_owned: Vec<String> = symbols.to_vec();
190        let client = Arc::clone(&self.client);
191
192        // Spawn task to process messages and filter ticker updates
193        tokio::spawn(async move {
194            while let Some(msg) = client.receive().await {
195                // Check if this is a ticker message for ANY of our symbols
196                if let Some(arg) = msg.get("arg") {
197                    let channel = arg.get("channel").and_then(|c| c.as_str());
198                    let inst_id = arg.get("instId").and_then(|i| i.as_str());
199
200                    if channel == Some("ticker") {
201                        if let Some(id) = inst_id {
202                            if symbols_owned.iter().any(|s| s == id) {
203                                match parse_ws_ticker(&msg, None) {
204                                    Ok(ticker) => {
205                                        if tx.send(Ok(vec![ticker])).is_err() {
206                                            break; // Receiver dropped
207                                        }
208                                    }
209                                    Err(e) => {
210                                        if tx.send(Err(e)).is_err() {
211                                            break;
212                                        }
213                                    }
214                                }
215                            }
216                        }
217                    }
218                }
219            }
220        });
221
222        Ok(Box::pin(ReceiverStream::new(rx)))
223    }
224    /// * `depth` - Orderbook depth (5, 15, or default)
225    pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
226        let channel = match depth {
227            5 => "books5",
228            15 => "books15",
229            _ => "books",
230        };
231
232        let mut arg_map = serde_json::Map::new();
233        arg_map.insert(
234            "instType".to_string(),
235            serde_json::Value::String("SPOT".to_string()),
236        );
237        arg_map.insert(
238            "channel".to_string(),
239            serde_json::Value::String(channel.to_string()),
240        );
241        arg_map.insert(
242            "instId".to_string(),
243            serde_json::Value::String(symbol.to_string()),
244        );
245        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
246
247        let mut msg_map = serde_json::Map::new();
248        msg_map.insert(
249            "op".to_string(),
250            serde_json::Value::String("subscribe".to_string()),
251        );
252        msg_map.insert("args".to_string(), args);
253        let msg = serde_json::Value::Object(msg_map);
254
255        self.client.send_json(&msg).await?;
256
257        let sub_key = format!("orderbook:{}", symbol);
258        self.subscriptions.write().await.push(sub_key);
259
260        Ok(())
261    }
262
263    /// Subscribes to a trades stream.
264    ///
265    /// # Arguments
266    ///
267    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
268    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
269        let mut arg_map = serde_json::Map::new();
270        arg_map.insert(
271            "instType".to_string(),
272            serde_json::Value::String("SPOT".to_string()),
273        );
274        arg_map.insert(
275            "channel".to_string(),
276            serde_json::Value::String("trade".to_string()),
277        );
278        arg_map.insert(
279            "instId".to_string(),
280            serde_json::Value::String(symbol.to_string()),
281        );
282        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
283
284        let mut msg_map = serde_json::Map::new();
285        msg_map.insert(
286            "op".to_string(),
287            serde_json::Value::String("subscribe".to_string()),
288        );
289        msg_map.insert("args".to_string(), args);
290        let msg = serde_json::Value::Object(msg_map);
291
292        self.client.send_json(&msg).await?;
293
294        let sub_key = format!("trades:{}", symbol);
295        self.subscriptions.write().await.push(sub_key);
296
297        Ok(())
298    }
299
300    /// Subscribes to a kline/candlestick stream.
301    ///
302    /// # Arguments
303    ///
304    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
305    /// * `interval` - Kline interval (e.g., "1m", "5m", "1H")
306    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
307        let channel = format!("candle{}", interval);
308
309        let mut arg_map = serde_json::Map::new();
310        arg_map.insert(
311            "instType".to_string(),
312            serde_json::Value::String("SPOT".to_string()),
313        );
314        arg_map.insert(
315            "channel".to_string(),
316            serde_json::Value::String(channel.clone()),
317        );
318        arg_map.insert(
319            "instId".to_string(),
320            serde_json::Value::String(symbol.to_string()),
321        );
322        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
323
324        let mut msg_map = serde_json::Map::new();
325        msg_map.insert(
326            "op".to_string(),
327            serde_json::Value::String("subscribe".to_string()),
328        );
329        msg_map.insert("args".to_string(), args);
330        let msg = serde_json::Value::Object(msg_map);
331
332        self.client.send_json(&msg).await?;
333
334        let sub_key = format!("kline:{}:{}", symbol, interval);
335        self.subscriptions.write().await.push(sub_key);
336
337        Ok(())
338    }
339
340    /// Unsubscribes from a stream.
341    ///
342    /// # Arguments
343    ///
344    /// * `stream_name` - Stream identifier to unsubscribe from
345    pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
346        // Parse stream name to determine channel and symbol
347        let parts: Vec<&str> = stream_name.split(':').collect();
348        if parts.len() < 2 {
349            return Err(Error::invalid_request(format!(
350                "Invalid stream name: {}",
351                stream_name
352            )));
353        }
354
355        let channel = parts[0];
356        let symbol = parts[1];
357
358        let bitget_channel = match channel {
359            "ticker" => "ticker",
360            "orderbook" => "books",
361            "trades" => "trade",
362            "kline" => {
363                if parts.len() >= 3 {
364                    // For kline, we need the interval
365                    return self.unsubscribe_kline(symbol, parts[2]).await;
366                }
367                return Err(Error::invalid_request(
368                    "Kline unsubscribe requires interval",
369                ));
370            }
371            _ => channel,
372        };
373
374        let mut arg_map = serde_json::Map::new();
375        arg_map.insert(
376            "instType".to_string(),
377            serde_json::Value::String("SPOT".to_string()),
378        );
379        arg_map.insert(
380            "channel".to_string(),
381            serde_json::Value::String(bitget_channel.to_string()),
382        );
383        arg_map.insert(
384            "instId".to_string(),
385            serde_json::Value::String(symbol.to_string()),
386        );
387        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
388
389        let mut msg_map = serde_json::Map::new();
390        msg_map.insert(
391            "op".to_string(),
392            serde_json::Value::String("unsubscribe".to_string()),
393        );
394        msg_map.insert("args".to_string(), args);
395        let msg = serde_json::Value::Object(msg_map);
396
397        self.client.send_json(&msg).await?;
398
399        // Remove from subscriptions
400        let mut subs = self.subscriptions.write().await;
401        subs.retain(|s| s != &stream_name);
402
403        Ok(())
404    }
405
406    /// Unsubscribes from a kline stream.
407    async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
408        let channel = format!("candle{}", interval);
409
410        let mut arg_map = serde_json::Map::new();
411        arg_map.insert(
412            "instType".to_string(),
413            serde_json::Value::String("SPOT".to_string()),
414        );
415        arg_map.insert(
416            "channel".to_string(),
417            serde_json::Value::String(channel.clone()),
418        );
419        arg_map.insert(
420            "instId".to_string(),
421            serde_json::Value::String(symbol.to_string()),
422        );
423        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
424
425        let mut msg_map = serde_json::Map::new();
426        msg_map.insert(
427            "op".to_string(),
428            serde_json::Value::String("unsubscribe".to_string()),
429        );
430        msg_map.insert("args".to_string(), args);
431        let msg = serde_json::Value::Object(msg_map);
432
433        self.client.send_json(&msg).await?;
434
435        let sub_key = format!("kline:{}:{}", symbol, interval);
436        let mut subs = self.subscriptions.write().await;
437        subs.retain(|s| s != &sub_key);
438
439        Ok(())
440    }
441
442    /// Returns the list of active subscriptions.
443    pub async fn subscriptions(&self) -> Vec<String> {
444        self.subscriptions.read().await.clone()
445    }
446
447    /// Watches ticker updates for a symbol.
448    ///
449    /// Returns a stream of `Ticker` updates for the specified symbol.
450    ///
451    /// # Arguments
452    ///
453    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
454    /// * `market` - Optional market information for symbol resolution
455    ///
456    /// # Returns
457    ///
458    /// A `MessageStream<Ticker>` that yields ticker updates.
459    ///
460    /// # Example
461    ///
462    /// ```no_run
463    /// use ccxt_exchanges::bitget::ws::BitgetWs;
464    /// use futures::StreamExt;
465    ///
466    /// # async fn example() -> ccxt_core::error::Result<()> {
467    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
468    /// ws.connect().await?;
469    /// let mut stream = ws.watch_ticker("BTCUSDT", None).await?;
470    /// while let Some(result) = stream.next().await {
471    ///     match result {
472    ///         Ok(ticker) => println!("Ticker: {:?}", ticker.last),
473    ///         Err(e) => eprintln!("Error: {}", e),
474    ///     }
475    /// }
476    /// # Ok(())
477    /// # }
478    /// ```
479    pub async fn watch_ticker(
480        &self,
481        symbol: &str,
482        market: Option<Market>,
483    ) -> Result<MessageStream<Ticker>> {
484        // Ensure connected
485        if !self.is_connected() {
486            self.connect().await?;
487        }
488
489        // Subscribe to ticker channel
490        self.subscribe_ticker(symbol).await?;
491
492        // Create channel for ticker updates
493        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
494        let symbol_owned = symbol.to_string();
495        let client = Arc::clone(&self.client);
496
497        // Spawn task to process messages and filter ticker updates
498        tokio::spawn(async move {
499            while let Some(msg) = client.receive().await {
500                // Check if this is a ticker message for our symbol
501                if is_ticker_message(&msg, &symbol_owned) {
502                    match parse_ws_ticker(&msg, market.as_ref()) {
503                        Ok(ticker) => {
504                            if tx.send(Ok(ticker)).is_err() {
505                                break; // Receiver dropped
506                            }
507                        }
508                        Err(e) => {
509                            if tx.send(Err(e)).is_err() {
510                                break;
511                            }
512                        }
513                    }
514                }
515            }
516        });
517
518        Ok(Box::pin(ReceiverStream::new(rx)))
519    }
520
521    /// Watches order book updates for a symbol.
522    ///
523    /// Returns a stream of `OrderBook` updates for the specified symbol.
524    ///
525    /// # Arguments
526    ///
527    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
528    /// * `limit` - Optional depth limit (5, 15, or full depth)
529    ///
530    /// # Returns
531    ///
532    /// A `MessageStream<OrderBook>` that yields order book updates.
533    ///
534    /// # Example
535    ///
536    /// ```no_run
537    /// use ccxt_exchanges::bitget::ws::BitgetWs;
538    /// use futures::StreamExt;
539    ///
540    /// # async fn example() -> ccxt_core::error::Result<()> {
541    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
542    /// ws.connect().await?;
543    /// let mut stream = ws.watch_order_book("BTCUSDT", Some(5)).await?;
544    /// while let Some(result) = stream.next().await {
545    ///     match result {
546    ///         Ok(orderbook) => println!("Best bid: {:?}", orderbook.bids.first()),
547    ///         Err(e) => eprintln!("Error: {}", e),
548    ///     }
549    /// }
550    /// # Ok(())
551    /// # }
552    /// ```
553    pub async fn watch_order_book(
554        &self,
555        symbol: &str,
556        limit: Option<u32>,
557    ) -> Result<MessageStream<OrderBook>> {
558        // Ensure connected
559        if !self.is_connected() {
560            self.connect().await?;
561        }
562
563        // Subscribe to orderbook channel
564        let depth = limit.unwrap_or(15);
565        self.subscribe_orderbook(symbol, depth).await?;
566
567        // Create channel for orderbook updates
568        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
569        let symbol_owned = symbol.to_string();
570        let unified_symbol = format_unified_symbol(&symbol_owned);
571        let client = Arc::clone(&self.client);
572
573        // Spawn task to process messages and filter orderbook updates
574        tokio::spawn(async move {
575            while let Some(msg) = client.receive().await {
576                // Check if this is an orderbook message for our symbol
577                if is_orderbook_message(&msg, &symbol_owned) {
578                    match parse_ws_orderbook(&msg, unified_symbol.clone()) {
579                        Ok(orderbook) => {
580                            if tx.send(Ok(orderbook)).is_err() {
581                                break; // Receiver dropped
582                            }
583                        }
584                        Err(e) => {
585                            if tx.send(Err(e)).is_err() {
586                                break;
587                            }
588                        }
589                    }
590                }
591            }
592        });
593
594        Ok(Box::pin(ReceiverStream::new(rx)))
595    }
596
597    /// Watches trade updates for a symbol.
598    ///
599    /// Returns a stream of `Trade` updates for the specified symbol.
600    ///
601    /// # Arguments
602    ///
603    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
604    /// * `market` - Optional market information for symbol resolution
605    ///
606    /// # Returns
607    ///
608    /// A `MessageStream<Vec<Trade>>` that yields trade updates.
609    ///
610    /// # Example
611    ///
612    /// ```no_run
613    /// use ccxt_exchanges::bitget::ws::BitgetWs;
614    /// use futures::StreamExt;
615    ///
616    /// # async fn example() -> ccxt_core::error::Result<()> {
617    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
618    /// ws.connect().await?;
619    /// let mut stream = ws.watch_trades("BTCUSDT", None).await?;
620    /// while let Some(result) = stream.next().await {
621    ///     match result {
622    ///         Ok(trades) => {
623    ///             for trade in trades {
624    ///                 println!("Trade: {:?} @ {:?}", trade.amount, trade.price);
625    ///             }
626    ///         }
627    ///         Err(e) => eprintln!("Error: {}", e),
628    ///     }
629    /// }
630    /// # Ok(())
631    /// # }
632    /// ```
633    pub async fn watch_trades(
634        &self,
635        symbol: &str,
636        market: Option<Market>,
637    ) -> Result<MessageStream<Vec<Trade>>> {
638        // Ensure connected
639        if !self.is_connected() {
640            self.connect().await?;
641        }
642
643        // Subscribe to trades channel
644        self.subscribe_trades(symbol).await?;
645
646        // Create channel for trade updates
647        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
648        let symbol_owned = symbol.to_string();
649        let client = Arc::clone(&self.client);
650
651        // Spawn task to process messages and filter trade updates
652        tokio::spawn(async move {
653            while let Some(msg) = client.receive().await {
654                // Check if this is a trade message for our symbol
655                if is_trade_message(&msg, &symbol_owned) {
656                    match parse_ws_trades(&msg, market.as_ref()) {
657                        Ok(trades) => {
658                            if tx.send(Ok(trades)).is_err() {
659                                break; // Receiver dropped
660                            }
661                        }
662                        Err(e) => {
663                            if tx.send(Err(e)).is_err() {
664                                break;
665                            }
666                        }
667                    }
668                }
669            }
670        });
671
672        Ok(Box::pin(ReceiverStream::new(rx)))
673    }
674}
675
676// ============================================================================
677// Stream Wrapper
678// ============================================================================
679
680/// A stream wrapper that converts an mpsc receiver into a futures Stream.
681struct ReceiverStream<T> {
682    receiver: mpsc::UnboundedReceiver<T>,
683}
684
685impl<T> ReceiverStream<T> {
686    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
687        Self { receiver }
688    }
689}
690
691impl<T> Stream for ReceiverStream<T> {
692    type Item = T;
693
694    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
695        self.receiver.poll_recv(cx)
696    }
697}
698
699// ============================================================================
700// Message Type Detection Helpers
701// ============================================================================
702
703/// Check if a WebSocket message is a ticker message for the given symbol.
704fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
705    if let Some(arg) = msg.get("arg") {
706        let channel = arg.get("channel").and_then(|c| c.as_str());
707        let inst_id = arg.get("instId").and_then(|i| i.as_str());
708
709        channel == Some("ticker") && inst_id == Some(symbol)
710    } else {
711        false
712    }
713}
714
715/// Check if a WebSocket message is an orderbook message for the given symbol.
716fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
717    if let Some(arg) = msg.get("arg") {
718        let channel = arg.get("channel").and_then(|c| c.as_str());
719        let inst_id = arg.get("instId").and_then(|i| i.as_str());
720
721        // Bitget uses books, books5, books15 for orderbook channels
722        let is_orderbook_channel = channel.is_some_and(|c| c.starts_with("books"));
723        is_orderbook_channel && inst_id == Some(symbol)
724    } else {
725        false
726    }
727}
728
729/// Check if a WebSocket message is a trade message for the given symbol.
730fn is_trade_message(msg: &Value, symbol: &str) -> bool {
731    if let Some(arg) = msg.get("arg") {
732        let channel = arg.get("channel").and_then(|c| c.as_str());
733        let inst_id = arg.get("instId").and_then(|i| i.as_str());
734
735        channel == Some("trade") && inst_id == Some(symbol)
736    } else {
737        false
738    }
739}
740
741/// Format a Bitget symbol (e.g., "BTCUSDT") to unified format (e.g., "BTC/USDT").
742fn format_unified_symbol(symbol: &str) -> String {
743    // Common quote currencies to detect
744    let quote_currencies = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];
745
746    for quote in &quote_currencies {
747        if let Some(base) = symbol.strip_suffix(quote) {
748            if !base.is_empty() {
749                return format!("{}/{}", base, quote);
750            }
751        }
752    }
753
754    // If no known quote currency found, return as-is
755    symbol.to_string()
756}
757
758// ============================================================================
759// WebSocket Message Parsers
760// ============================================================================
761
762/// Parse a WebSocket ticker message.
763pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
764    // Bitget WebSocket ticker format:
765    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"ticker","instId":"BTCUSDT"},"data":[{...}]}
766    let data = msg
767        .get("data")
768        .and_then(|d| d.as_array())
769        .and_then(|arr| arr.first())
770        .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
771
772    parse_ticker(data, market)
773}
774
775/// Parse a WebSocket orderbook message.
776pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
777    // Bitget WebSocket orderbook format:
778    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"books5","instId":"BTCUSDT"},"data":[{...}]}
779    let data = msg
780        .get("data")
781        .and_then(|d| d.as_array())
782        .and_then(|arr| arr.first())
783        .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
784
785    parse_orderbook(data, symbol)
786}
787
788/// Parse a WebSocket trade message (single trade).
789pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
790    // Bitget WebSocket trade format:
791    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}]}
792    let data = msg
793        .get("data")
794        .and_then(|d| d.as_array())
795        .and_then(|arr| arr.first())
796        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
797
798    parse_trade(data, market)
799}
800
801/// Parse a WebSocket trade message (multiple trades).
802pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
803    // Bitget WebSocket trade format:
804    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}, {...}]}
805    let data_array = msg
806        .get("data")
807        .and_then(|d| d.as_array())
808        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
809
810    let mut trades = Vec::with_capacity(data_array.len());
811    for data in data_array {
812        trades.push(parse_trade(data, market)?);
813    }
814
815    Ok(trades)
816}
817
818#[cfg(test)]
819mod tests {
820    use super::*;
821    use ccxt_core::types::financial::Price;
822    use rust_decimal_macros::dec;
823
824    #[test]
825    fn test_bitget_ws_creation() {
826        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
827        // WsClient is created successfully - config is private so we just verify creation works
828        assert!(ws.subscriptions.try_read().is_ok());
829    }
830
831    #[tokio::test]
832    async fn test_subscriptions_empty_by_default() {
833        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
834        let subs = ws.subscriptions().await;
835        assert!(subs.is_empty());
836    }
837
838    // ==================== Ticker Message Parsing Tests ====================
839
840    #[test]
841    fn test_parse_ws_ticker_snapshot() {
842        let msg = serde_json::from_str(
843            r#"{
844                "action": "snapshot",
845                "arg": {
846                    "instType": "SPOT",
847                    "channel": "ticker",
848                    "instId": "BTCUSDT"
849                },
850                "data": [{
851                    "instId": "BTCUSDT",
852                    "lastPr": "50000.00",
853                    "high24h": "51000.00",
854                    "low24h": "49000.00",
855                    "bidPr": "49999.00",
856                    "askPr": "50001.00",
857                    "baseVolume": "1000.5",
858                    "ts": "1700000000000"
859                }]
860            }"#,
861        )
862        .unwrap();
863
864        let ticker = parse_ws_ticker(&msg, None).unwrap();
865        assert_eq!(ticker.symbol, "BTCUSDT");
866        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
867        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
868        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
869        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
870        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
871        assert_eq!(ticker.timestamp, 1700000000000);
872    }
873
874    #[test]
875    fn test_parse_ws_ticker_with_market() {
876        let msg = serde_json::from_str(
877            r#"{
878                "action": "snapshot",
879                "arg": {
880                    "instType": "SPOT",
881                    "channel": "ticker",
882                    "instId": "BTCUSDT"
883                },
884                "data": [{
885                    "instId": "BTCUSDT",
886                    "lastPr": "50000.00",
887                    "ts": "1700000000000"
888                }]
889            }"#,
890        )
891        .unwrap();
892
893        let market = Market {
894            id: "BTCUSDT".to_string(),
895            symbol: "BTC/USDT".to_string(),
896            base: "BTC".to_string(),
897            quote: "USDT".to_string(),
898            ..Default::default()
899        };
900
901        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
902        assert_eq!(ticker.symbol, "BTC/USDT");
903        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
904    }
905
906    #[test]
907    fn test_parse_ws_ticker_missing_data() {
908        let msg = serde_json::from_str(
909            r#"{
910                "action": "snapshot",
911                "arg": {
912                    "instType": "SPOT",
913                    "channel": "ticker",
914                    "instId": "BTCUSDT"
915                }
916            }"#,
917        )
918        .unwrap();
919
920        let result = parse_ws_ticker(&msg, None);
921        assert!(result.is_err());
922    }
923
924    #[test]
925    fn test_parse_ws_ticker_empty_data_array() {
926        let msg = serde_json::from_str(
927            r#"{
928                "action": "snapshot",
929                "arg": {
930                    "instType": "SPOT",
931                    "channel": "ticker",
932                    "instId": "BTCUSDT"
933                },
934                "data": []
935            }"#,
936        )
937        .unwrap();
938
939        let result = parse_ws_ticker(&msg, None);
940        assert!(result.is_err());
941    }
942
943    // ==================== OrderBook Message Parsing Tests ====================
944
945    #[test]
946    fn test_parse_ws_orderbook_snapshot() {
947        let msg = serde_json::from_str(
948            r#"{
949                "action": "snapshot",
950                "arg": {
951                    "instType": "SPOT",
952                    "channel": "books5",
953                    "instId": "BTCUSDT"
954                },
955                "data": [{
956                    "bids": [
957                        ["50000.00", "1.5"],
958                        ["49999.00", "2.0"],
959                        ["49998.00", "0.5"]
960                    ],
961                    "asks": [
962                        ["50001.00", "1.0"],
963                        ["50002.00", "3.0"],
964                        ["50003.00", "2.5"]
965                    ],
966                    "ts": "1700000000000"
967                }]
968            }"#,
969        )
970        .unwrap();
971
972        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
973        assert_eq!(orderbook.symbol, "BTC/USDT");
974        assert_eq!(orderbook.bids.len(), 3);
975        assert_eq!(orderbook.asks.len(), 3);
976
977        // Verify bids are sorted in descending order
978        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
979        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
980        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
981
982        // Verify asks are sorted in ascending order
983        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
984        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
985        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
986    }
987
988    #[test]
989    fn test_parse_ws_orderbook_update() {
990        let msg = serde_json::from_str(
991            r#"{
992                "action": "update",
993                "arg": {
994                    "instType": "SPOT",
995                    "channel": "books",
996                    "instId": "ETHUSDT"
997                },
998                "data": [{
999                    "bids": [
1000                        ["2000.00", "10.0"]
1001                    ],
1002                    "asks": [
1003                        ["2001.00", "5.0"]
1004                    ],
1005                    "ts": "1700000000001"
1006                }]
1007            }"#,
1008        )
1009        .unwrap();
1010
1011        let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
1012        assert_eq!(orderbook.symbol, "ETH/USDT");
1013        assert_eq!(orderbook.bids.len(), 1);
1014        assert_eq!(orderbook.asks.len(), 1);
1015        assert_eq!(orderbook.timestamp, 1700000000001);
1016    }
1017
1018    #[test]
1019    fn test_parse_ws_orderbook_missing_data() {
1020        let msg = serde_json::from_str(
1021            r#"{
1022                "action": "snapshot",
1023                "arg": {
1024                    "instType": "SPOT",
1025                    "channel": "books5",
1026                    "instId": "BTCUSDT"
1027                }
1028            }"#,
1029        )
1030        .unwrap();
1031
1032        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
1033        assert!(result.is_err());
1034    }
1035
1036    #[test]
1037    fn test_parse_ws_orderbook_empty_sides() {
1038        let msg = serde_json::from_str(
1039            r#"{
1040                "action": "snapshot",
1041                "arg": {
1042                    "instType": "SPOT",
1043                    "channel": "books5",
1044                    "instId": "BTCUSDT"
1045                },
1046                "data": [{
1047                    "bids": [],
1048                    "asks": [],
1049                    "ts": "1700000000000"
1050                }]
1051            }"#,
1052        )
1053        .unwrap();
1054
1055        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
1056        assert!(orderbook.bids.is_empty());
1057        assert!(orderbook.asks.is_empty());
1058    }
1059
1060    // ==================== Trade Message Parsing Tests ====================
1061
1062    #[test]
1063    fn test_parse_ws_trade_single() {
1064        let msg = serde_json::from_str(
1065            r#"{
1066                "action": "snapshot",
1067                "arg": {
1068                    "instType": "SPOT",
1069                    "channel": "trade",
1070                    "instId": "BTCUSDT"
1071                },
1072                "data": [{
1073                    "tradeId": "123456789",
1074                    "symbol": "BTCUSDT",
1075                    "side": "buy",
1076                    "price": "50000.00",
1077                    "size": "0.5",
1078                    "ts": "1700000000000"
1079                }]
1080            }"#,
1081        )
1082        .unwrap();
1083
1084        let trade = parse_ws_trade(&msg, None).unwrap();
1085        assert_eq!(trade.id, Some("123456789".to_string()));
1086        assert_eq!(trade.side, ccxt_core::types::OrderSide::Buy);
1087        assert_eq!(trade.price, Price::new(dec!(50000.00)));
1088        assert_eq!(
1089            trade.amount,
1090            ccxt_core::types::financial::Amount::new(dec!(0.5))
1091        );
1092        assert_eq!(trade.timestamp, 1700000000000);
1093    }
1094
1095    #[test]
1096    fn test_parse_ws_trades_multiple() {
1097        let msg = serde_json::from_str(
1098            r#"{
1099                "action": "snapshot",
1100                "arg": {
1101                    "instType": "SPOT",
1102                    "channel": "trade",
1103                    "instId": "BTCUSDT"
1104                },
1105                "data": [
1106                    {
1107                        "tradeId": "123456789",
1108                        "symbol": "BTCUSDT",
1109                        "side": "buy",
1110                        "price": "50000.00",
1111                        "size": "0.5",
1112                        "ts": "1700000000000"
1113                    },
1114                    {
1115                        "tradeId": "123456790",
1116                        "symbol": "BTCUSDT",
1117                        "side": "sell",
1118                        "price": "50001.00",
1119                        "size": "1.0",
1120                        "ts": "1700000000001"
1121                    }
1122                ]
1123            }"#,
1124        )
1125        .unwrap();
1126
1127        let trades = parse_ws_trades(&msg, None).unwrap();
1128        assert_eq!(trades.len(), 2);
1129
1130        assert_eq!(trades[0].id, Some("123456789".to_string()));
1131        assert_eq!(trades[0].side, ccxt_core::types::OrderSide::Buy);
1132
1133        assert_eq!(trades[1].id, Some("123456790".to_string()));
1134        assert_eq!(trades[1].side, ccxt_core::types::OrderSide::Sell);
1135    }
1136
1137    #[test]
1138    fn test_parse_ws_trade_sell_side() {
1139        let msg = serde_json::from_str(
1140            r#"{
1141                "action": "snapshot",
1142                "arg": {
1143                    "instType": "SPOT",
1144                    "channel": "trade",
1145                    "instId": "BTCUSDT"
1146                },
1147                "data": [{
1148                    "tradeId": "123456789",
1149                    "symbol": "BTCUSDT",
1150                    "side": "sell",
1151                    "price": "50000.00",
1152                    "size": "0.5",
1153                    "ts": "1700000000000"
1154                }]
1155            }"#,
1156        )
1157        .unwrap();
1158
1159        let trade = parse_ws_trade(&msg, None).unwrap();
1160        assert_eq!(trade.side, ccxt_core::types::OrderSide::Sell);
1161    }
1162
1163    #[test]
1164    fn test_parse_ws_trade_missing_data() {
1165        let msg = serde_json::from_str(
1166            r#"{
1167                "action": "snapshot",
1168                "arg": {
1169                    "instType": "SPOT",
1170                    "channel": "trade",
1171                    "instId": "BTCUSDT"
1172                }
1173            }"#,
1174        )
1175        .unwrap();
1176
1177        let result = parse_ws_trade(&msg, None);
1178        assert!(result.is_err());
1179    }
1180
1181    #[test]
1182    fn test_parse_ws_trades_empty_array() {
1183        let msg = serde_json::from_str(
1184            r#"{
1185                "action": "snapshot",
1186                "arg": {
1187                    "instType": "SPOT",
1188                    "channel": "trade",
1189                    "instId": "BTCUSDT"
1190                },
1191                "data": []
1192            }"#,
1193        )
1194        .unwrap();
1195
1196        let trades = parse_ws_trades(&msg, None).unwrap();
1197        assert!(trades.is_empty());
1198    }
1199
1200    // ==================== Message Type Detection Tests ====================
1201
1202    #[test]
1203    fn test_is_ticker_message_true() {
1204        let msg = serde_json::from_str(
1205            r#"{
1206                "action": "snapshot",
1207                "arg": {
1208                    "instType": "SPOT",
1209                    "channel": "ticker",
1210                    "instId": "BTCUSDT"
1211                },
1212                "data": [{}]
1213            }"#,
1214        )
1215        .unwrap();
1216
1217        assert!(is_ticker_message(&msg, "BTCUSDT"));
1218    }
1219
1220    #[test]
1221    fn test_is_ticker_message_wrong_symbol() {
1222        let msg = serde_json::from_str(
1223            r#"{
1224                "action": "snapshot",
1225                "arg": {
1226                    "instType": "SPOT",
1227                    "channel": "ticker",
1228                    "instId": "ETHUSDT"
1229                },
1230                "data": [{}]
1231            }"#,
1232        )
1233        .unwrap();
1234
1235        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1236    }
1237
1238    #[test]
1239    fn test_is_ticker_message_wrong_channel() {
1240        let msg = serde_json::from_str(
1241            r#"{
1242                "action": "snapshot",
1243                "arg": {
1244                    "instType": "SPOT",
1245                    "channel": "trade",
1246                    "instId": "BTCUSDT"
1247                },
1248                "data": [{}]
1249            }"#,
1250        )
1251        .unwrap();
1252
1253        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1254    }
1255
1256    #[test]
1257    fn test_is_orderbook_message_books5() {
1258        let msg = serde_json::from_str(
1259            r#"{
1260                "arg": {
1261                    "instType": "SPOT",
1262                    "channel": "books5",
1263                    "instId": "BTCUSDT"
1264                }
1265            }"#,
1266        )
1267        .unwrap();
1268
1269        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1270    }
1271
1272    #[test]
1273    fn test_is_orderbook_message_books15() {
1274        let msg = serde_json::from_str(
1275            r#"{
1276                "arg": {
1277                    "instType": "SPOT",
1278                    "channel": "books15",
1279                    "instId": "BTCUSDT"
1280                }
1281            }"#,
1282        )
1283        .unwrap();
1284
1285        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1286    }
1287
1288    #[test]
1289    fn test_is_orderbook_message_books() {
1290        let msg = serde_json::from_str(
1291            r#"{
1292                "arg": {
1293                    "instType": "SPOT",
1294                    "channel": "books",
1295                    "instId": "BTCUSDT"
1296                }
1297            }"#,
1298        )
1299        .unwrap();
1300
1301        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1302    }
1303
1304    #[test]
1305    fn test_is_trade_message_true() {
1306        let msg = serde_json::from_str(
1307            r#"{
1308                "arg": {
1309                    "instType": "SPOT",
1310                    "channel": "trade",
1311                    "instId": "BTCUSDT"
1312                }
1313            }"#,
1314        )
1315        .unwrap();
1316
1317        assert!(is_trade_message(&msg, "BTCUSDT"));
1318    }
1319
1320    #[test]
1321    fn test_is_trade_message_wrong_channel() {
1322        let msg = serde_json::from_str(
1323            r#"{
1324                "arg": {
1325                    "instType": "SPOT",
1326                    "channel": "ticker",
1327                    "instId": "BTCUSDT"
1328                }
1329            }"#,
1330        )
1331        .unwrap();
1332
1333        assert!(!is_trade_message(&msg, "BTCUSDT"));
1334    }
1335
1336    // ==================== Symbol Formatting Tests ====================
1337
1338    #[test]
1339    fn test_format_unified_symbol_usdt() {
1340        assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1341        assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1342    }
1343
1344    #[test]
1345    fn test_format_unified_symbol_usdc() {
1346        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1347    }
1348
1349    #[test]
1350    fn test_format_unified_symbol_btc() {
1351        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1352    }
1353
1354    #[test]
1355    fn test_format_unified_symbol_unknown() {
1356        // Unknown quote currency returns as-is
1357        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1358    }
1359}