ccxt_exchanges/bitget/
ws.rs

1//! Bitget WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for Bitget exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (balance, orders) with automatic reconnection.
6
7use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
/// Interval between pings configured for the Bitget WebSocket client, in milliseconds (30 s).
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Reconnect interval configured for the client, in milliseconds (5 s).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Upper bound on reconnect attempts configured for the client.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
27
28/// Bitget WebSocket client.
29///
30/// Provides real-time data streaming for Bitget exchange.
31pub struct BitgetWs {
32    /// WebSocket client instance.
33    client: Arc<WsClient>,
34    /// Active subscriptions.
35    subscriptions: Arc<RwLock<Vec<String>>>,
36}
37
38impl BitgetWs {
39    /// Creates a new Bitget WebSocket client.
40    ///
41    /// # Arguments
42    ///
43    /// * `url` - WebSocket server URL
44    pub fn new(url: String) -> Self {
45        let config = WsConfig {
46            url: url.clone(),
47            connect_timeout: 10000,
48            ping_interval: DEFAULT_PING_INTERVAL_MS,
49            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
50            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
51            auto_reconnect: true,
52            enable_compression: false,
53            pong_timeout: 90000,
54            ..Default::default()
55        };
56
57        Self {
58            client: Arc::new(WsClient::new(config)),
59            subscriptions: Arc::new(RwLock::new(Vec::new())),
60        }
61    }
62
63    /// Connects to the WebSocket server.
64    pub async fn connect(&self) -> Result<()> {
65        self.client.connect().await
66    }
67
68    /// Disconnects from the WebSocket server.
69    pub async fn disconnect(&self) -> Result<()> {
70        self.client.disconnect().await
71    }
72
73    /// Returns the current connection state.
74    pub fn state(&self) -> WsConnectionState {
75        self.client.state()
76    }
77
78    /// Checks if the WebSocket is connected.
79    pub fn is_connected(&self) -> bool {
80        self.client.is_connected()
81    }
82
83    /// Receives the next message from the WebSocket.
84    pub async fn receive(&self) -> Option<Value> {
85        self.client.receive().await
86    }
87
88    /// Subscribes to a ticker stream.
89    ///
90    /// # Arguments
91    ///
92    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
93    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
94        let mut arg_map = serde_json::Map::new();
95        arg_map.insert(
96            "instType".to_string(),
97            serde_json::Value::String("SPOT".to_string()),
98        );
99        arg_map.insert(
100            "channel".to_string(),
101            serde_json::Value::String("ticker".to_string()),
102        );
103        arg_map.insert(
104            "instId".to_string(),
105            serde_json::Value::String(symbol.to_string()),
106        );
107        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
108
109        let mut msg_map = serde_json::Map::new();
110        msg_map.insert(
111            "op".to_string(),
112            serde_json::Value::String("subscribe".to_string()),
113        );
114        msg_map.insert("args".to_string(), args);
115        let msg = serde_json::Value::Object(msg_map);
116
117        self.client.send_json(&msg).await?;
118
119        let sub_key = format!("ticker:{}", symbol);
120        self.subscriptions.write().await.push(sub_key);
121
122        Ok(())
123    }
124
125    /// Subscribes to an orderbook stream.
126    ///
127    /// # Arguments
128    ///
129    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
130    /// * `depth` - Orderbook depth (5, 15, or default)
131    pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
132        let channel = match depth {
133            5 => "books5",
134            15 => "books15",
135            _ => "books",
136        };
137
138        let mut arg_map = serde_json::Map::new();
139        arg_map.insert(
140            "instType".to_string(),
141            serde_json::Value::String("SPOT".to_string()),
142        );
143        arg_map.insert(
144            "channel".to_string(),
145            serde_json::Value::String(channel.to_string()),
146        );
147        arg_map.insert(
148            "instId".to_string(),
149            serde_json::Value::String(symbol.to_string()),
150        );
151        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
152
153        let mut msg_map = serde_json::Map::new();
154        msg_map.insert(
155            "op".to_string(),
156            serde_json::Value::String("subscribe".to_string()),
157        );
158        msg_map.insert("args".to_string(), args);
159        let msg = serde_json::Value::Object(msg_map);
160
161        self.client.send_json(&msg).await?;
162
163        let sub_key = format!("orderbook:{}", symbol);
164        self.subscriptions.write().await.push(sub_key);
165
166        Ok(())
167    }
168
169    /// Subscribes to a trades stream.
170    ///
171    /// # Arguments
172    ///
173    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
174    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
175        let mut arg_map = serde_json::Map::new();
176        arg_map.insert(
177            "instType".to_string(),
178            serde_json::Value::String("SPOT".to_string()),
179        );
180        arg_map.insert(
181            "channel".to_string(),
182            serde_json::Value::String("trade".to_string()),
183        );
184        arg_map.insert(
185            "instId".to_string(),
186            serde_json::Value::String(symbol.to_string()),
187        );
188        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
189
190        let mut msg_map = serde_json::Map::new();
191        msg_map.insert(
192            "op".to_string(),
193            serde_json::Value::String("subscribe".to_string()),
194        );
195        msg_map.insert("args".to_string(), args);
196        let msg = serde_json::Value::Object(msg_map);
197
198        self.client.send_json(&msg).await?;
199
200        let sub_key = format!("trades:{}", symbol);
201        self.subscriptions.write().await.push(sub_key);
202
203        Ok(())
204    }
205
206    /// Subscribes to a kline/candlestick stream.
207    ///
208    /// # Arguments
209    ///
210    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
211    /// * `interval` - Kline interval (e.g., "1m", "5m", "1H")
212    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
213        let channel = format!("candle{}", interval);
214
215        let mut arg_map = serde_json::Map::new();
216        arg_map.insert(
217            "instType".to_string(),
218            serde_json::Value::String("SPOT".to_string()),
219        );
220        arg_map.insert(
221            "channel".to_string(),
222            serde_json::Value::String(channel.clone()),
223        );
224        arg_map.insert(
225            "instId".to_string(),
226            serde_json::Value::String(symbol.to_string()),
227        );
228        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
229
230        let mut msg_map = serde_json::Map::new();
231        msg_map.insert(
232            "op".to_string(),
233            serde_json::Value::String("subscribe".to_string()),
234        );
235        msg_map.insert("args".to_string(), args);
236        let msg = serde_json::Value::Object(msg_map);
237
238        self.client.send_json(&msg).await?;
239
240        let sub_key = format!("kline:{}:{}", symbol, interval);
241        self.subscriptions.write().await.push(sub_key);
242
243        Ok(())
244    }
245
246    /// Unsubscribes from a stream.
247    ///
248    /// # Arguments
249    ///
250    /// * `stream_name` - Stream identifier to unsubscribe from
251    pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
252        // Parse stream name to determine channel and symbol
253        let parts: Vec<&str> = stream_name.split(':').collect();
254        if parts.len() < 2 {
255            return Err(Error::invalid_request(format!(
256                "Invalid stream name: {}",
257                stream_name
258            )));
259        }
260
261        let channel = parts[0];
262        let symbol = parts[1];
263
264        let bitget_channel = match channel {
265            "ticker" => "ticker",
266            "orderbook" => "books",
267            "trades" => "trade",
268            "kline" => {
269                if parts.len() >= 3 {
270                    // For kline, we need the interval
271                    return self.unsubscribe_kline(symbol, parts[2]).await;
272                }
273                return Err(Error::invalid_request(
274                    "Kline unsubscribe requires interval",
275                ));
276            }
277            _ => channel,
278        };
279
280        let mut arg_map = serde_json::Map::new();
281        arg_map.insert(
282            "instType".to_string(),
283            serde_json::Value::String("SPOT".to_string()),
284        );
285        arg_map.insert(
286            "channel".to_string(),
287            serde_json::Value::String(bitget_channel.to_string()),
288        );
289        arg_map.insert(
290            "instId".to_string(),
291            serde_json::Value::String(symbol.to_string()),
292        );
293        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
294
295        let mut msg_map = serde_json::Map::new();
296        msg_map.insert(
297            "op".to_string(),
298            serde_json::Value::String("unsubscribe".to_string()),
299        );
300        msg_map.insert("args".to_string(), args);
301        let msg = serde_json::Value::Object(msg_map);
302
303        self.client.send_json(&msg).await?;
304
305        // Remove from subscriptions
306        let mut subs = self.subscriptions.write().await;
307        subs.retain(|s| s != &stream_name);
308
309        Ok(())
310    }
311
312    /// Unsubscribes from a kline stream.
313    async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
314        let channel = format!("candle{}", interval);
315
316        let mut arg_map = serde_json::Map::new();
317        arg_map.insert(
318            "instType".to_string(),
319            serde_json::Value::String("SPOT".to_string()),
320        );
321        arg_map.insert(
322            "channel".to_string(),
323            serde_json::Value::String(channel.clone()),
324        );
325        arg_map.insert(
326            "instId".to_string(),
327            serde_json::Value::String(symbol.to_string()),
328        );
329        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
330
331        let mut msg_map = serde_json::Map::new();
332        msg_map.insert(
333            "op".to_string(),
334            serde_json::Value::String("unsubscribe".to_string()),
335        );
336        msg_map.insert("args".to_string(), args);
337        let msg = serde_json::Value::Object(msg_map);
338
339        self.client.send_json(&msg).await?;
340
341        let sub_key = format!("kline:{}:{}", symbol, interval);
342        let mut subs = self.subscriptions.write().await;
343        subs.retain(|s| s != &sub_key);
344
345        Ok(())
346    }
347
348    /// Returns the list of active subscriptions.
349    pub async fn subscriptions(&self) -> Vec<String> {
350        self.subscriptions.read().await.clone()
351    }
352
353    /// Watches ticker updates for a symbol.
354    ///
355    /// Returns a stream of `Ticker` updates for the specified symbol.
356    ///
357    /// # Arguments
358    ///
359    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
360    /// * `market` - Optional market information for symbol resolution
361    ///
362    /// # Returns
363    ///
364    /// A `MessageStream<Ticker>` that yields ticker updates.
365    ///
366    /// # Example
367    ///
368    /// ```no_run
369    /// use ccxt_exchanges::bitget::ws::BitgetWs;
370    /// use futures::StreamExt;
371    ///
372    /// # async fn example() -> ccxt_core::error::Result<()> {
373    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
374    /// ws.connect().await?;
375    /// let mut stream = ws.watch_ticker("BTCUSDT", None).await?;
376    /// while let Some(result) = stream.next().await {
377    ///     match result {
378    ///         Ok(ticker) => println!("Ticker: {:?}", ticker.last),
379    ///         Err(e) => eprintln!("Error: {}", e),
380    ///     }
381    /// }
382    /// # Ok(())
383    /// # }
384    /// ```
385    pub async fn watch_ticker(
386        &self,
387        symbol: &str,
388        market: Option<Market>,
389    ) -> Result<MessageStream<Ticker>> {
390        // Ensure connected
391        if !self.is_connected() {
392            self.connect().await?;
393        }
394
395        // Subscribe to ticker channel
396        self.subscribe_ticker(symbol).await?;
397
398        // Create channel for ticker updates
399        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
400        let symbol_owned = symbol.to_string();
401        let client = Arc::clone(&self.client);
402
403        // Spawn task to process messages and filter ticker updates
404        tokio::spawn(async move {
405            while let Some(msg) = client.receive().await {
406                // Check if this is a ticker message for our symbol
407                if is_ticker_message(&msg, &symbol_owned) {
408                    match parse_ws_ticker(&msg, market.as_ref()) {
409                        Ok(ticker) => {
410                            if tx.send(Ok(ticker)).is_err() {
411                                break; // Receiver dropped
412                            }
413                        }
414                        Err(e) => {
415                            if tx.send(Err(e)).is_err() {
416                                break;
417                            }
418                        }
419                    }
420                }
421            }
422        });
423
424        Ok(Box::pin(ReceiverStream::new(rx)))
425    }
426
427    /// Watches order book updates for a symbol.
428    ///
429    /// Returns a stream of `OrderBook` updates for the specified symbol.
430    ///
431    /// # Arguments
432    ///
433    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
434    /// * `limit` - Optional depth limit (5, 15, or full depth)
435    ///
436    /// # Returns
437    ///
438    /// A `MessageStream<OrderBook>` that yields order book updates.
439    ///
440    /// # Example
441    ///
442    /// ```no_run
443    /// use ccxt_exchanges::bitget::ws::BitgetWs;
444    /// use futures::StreamExt;
445    ///
446    /// # async fn example() -> ccxt_core::error::Result<()> {
447    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
448    /// ws.connect().await?;
449    /// let mut stream = ws.watch_order_book("BTCUSDT", Some(5)).await?;
450    /// while let Some(result) = stream.next().await {
451    ///     match result {
452    ///         Ok(orderbook) => println!("Best bid: {:?}", orderbook.bids.first()),
453    ///         Err(e) => eprintln!("Error: {}", e),
454    ///     }
455    /// }
456    /// # Ok(())
457    /// # }
458    /// ```
459    pub async fn watch_order_book(
460        &self,
461        symbol: &str,
462        limit: Option<u32>,
463    ) -> Result<MessageStream<OrderBook>> {
464        // Ensure connected
465        if !self.is_connected() {
466            self.connect().await?;
467        }
468
469        // Subscribe to orderbook channel
470        let depth = limit.unwrap_or(15);
471        self.subscribe_orderbook(symbol, depth).await?;
472
473        // Create channel for orderbook updates
474        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
475        let symbol_owned = symbol.to_string();
476        let unified_symbol = format_unified_symbol(&symbol_owned);
477        let client = Arc::clone(&self.client);
478
479        // Spawn task to process messages and filter orderbook updates
480        tokio::spawn(async move {
481            while let Some(msg) = client.receive().await {
482                // Check if this is an orderbook message for our symbol
483                if is_orderbook_message(&msg, &symbol_owned) {
484                    match parse_ws_orderbook(&msg, unified_symbol.clone()) {
485                        Ok(orderbook) => {
486                            if tx.send(Ok(orderbook)).is_err() {
487                                break; // Receiver dropped
488                            }
489                        }
490                        Err(e) => {
491                            if tx.send(Err(e)).is_err() {
492                                break;
493                            }
494                        }
495                    }
496                }
497            }
498        });
499
500        Ok(Box::pin(ReceiverStream::new(rx)))
501    }
502
503    /// Watches trade updates for a symbol.
504    ///
505    /// Returns a stream of `Trade` updates for the specified symbol.
506    ///
507    /// # Arguments
508    ///
509    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
510    /// * `market` - Optional market information for symbol resolution
511    ///
512    /// # Returns
513    ///
514    /// A `MessageStream<Vec<Trade>>` that yields trade updates.
515    ///
516    /// # Example
517    ///
518    /// ```no_run
519    /// use ccxt_exchanges::bitget::ws::BitgetWs;
520    /// use futures::StreamExt;
521    ///
522    /// # async fn example() -> ccxt_core::error::Result<()> {
523    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
524    /// ws.connect().await?;
525    /// let mut stream = ws.watch_trades("BTCUSDT", None).await?;
526    /// while let Some(result) = stream.next().await {
527    ///     match result {
528    ///         Ok(trades) => {
529    ///             for trade in trades {
530    ///                 println!("Trade: {:?} @ {:?}", trade.amount, trade.price);
531    ///             }
532    ///         }
533    ///         Err(e) => eprintln!("Error: {}", e),
534    ///     }
535    /// }
536    /// # Ok(())
537    /// # }
538    /// ```
539    pub async fn watch_trades(
540        &self,
541        symbol: &str,
542        market: Option<Market>,
543    ) -> Result<MessageStream<Vec<Trade>>> {
544        // Ensure connected
545        if !self.is_connected() {
546            self.connect().await?;
547        }
548
549        // Subscribe to trades channel
550        self.subscribe_trades(symbol).await?;
551
552        // Create channel for trade updates
553        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
554        let symbol_owned = symbol.to_string();
555        let client = Arc::clone(&self.client);
556
557        // Spawn task to process messages and filter trade updates
558        tokio::spawn(async move {
559            while let Some(msg) = client.receive().await {
560                // Check if this is a trade message for our symbol
561                if is_trade_message(&msg, &symbol_owned) {
562                    match parse_ws_trades(&msg, market.as_ref()) {
563                        Ok(trades) => {
564                            if tx.send(Ok(trades)).is_err() {
565                                break; // Receiver dropped
566                            }
567                        }
568                        Err(e) => {
569                            if tx.send(Err(e)).is_err() {
570                                break;
571                            }
572                        }
573                    }
574                }
575            }
576        });
577
578        Ok(Box::pin(ReceiverStream::new(rx)))
579    }
580}
581
582// ============================================================================
583// Stream Wrapper
584// ============================================================================
585
586/// A stream wrapper that converts an mpsc receiver into a futures Stream.
587struct ReceiverStream<T> {
588    receiver: mpsc::UnboundedReceiver<T>,
589}
590
591impl<T> ReceiverStream<T> {
592    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
593        Self { receiver }
594    }
595}
596
597impl<T> Stream for ReceiverStream<T> {
598    type Item = T;
599
600    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
601        self.receiver.poll_recv(cx)
602    }
603}
604
605// ============================================================================
606// Message Type Detection Helpers
607// ============================================================================
608
609/// Check if a WebSocket message is a ticker message for the given symbol.
610fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
611    if let Some(arg) = msg.get("arg") {
612        let channel = arg.get("channel").and_then(|c| c.as_str());
613        let inst_id = arg.get("instId").and_then(|i| i.as_str());
614
615        channel == Some("ticker") && inst_id == Some(symbol)
616    } else {
617        false
618    }
619}
620
621/// Check if a WebSocket message is an orderbook message for the given symbol.
622fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
623    if let Some(arg) = msg.get("arg") {
624        let channel = arg.get("channel").and_then(|c| c.as_str());
625        let inst_id = arg.get("instId").and_then(|i| i.as_str());
626
627        // Bitget uses books, books5, books15 for orderbook channels
628        let is_orderbook_channel = channel.is_some_and(|c| c.starts_with("books"));
629        is_orderbook_channel && inst_id == Some(symbol)
630    } else {
631        false
632    }
633}
634
635/// Check if a WebSocket message is a trade message for the given symbol.
636fn is_trade_message(msg: &Value, symbol: &str) -> bool {
637    if let Some(arg) = msg.get("arg") {
638        let channel = arg.get("channel").and_then(|c| c.as_str());
639        let inst_id = arg.get("instId").and_then(|i| i.as_str());
640
641        channel == Some("trade") && inst_id == Some(symbol)
642    } else {
643        false
644    }
645}
646
/// Convert a Bitget symbol such as "BTCUSDT" into the unified "BTC/USDT" form.
///
/// The known quote currencies are tried in a fixed order and the first one that
/// is a proper suffix of `symbol` (leaving a non-empty base) wins. When none
/// matches, `symbol` is returned unchanged.
fn format_unified_symbol(symbol: &str) -> String {
    // Order matters: "USDT" and "USDC" are tried before the shorter "USD".
    const KNOWN_QUOTES: [&str; 6] = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];

    KNOWN_QUOTES
        .iter()
        .find_map(|&quote| {
            symbol
                .strip_suffix(quote)
                .filter(|base| !base.is_empty())
                .map(|base| format!("{}/{}", base, quote))
        })
        .unwrap_or_else(|| symbol.to_string())
}
663
664// ============================================================================
665// WebSocket Message Parsers
666// ============================================================================
667
668/// Parse a WebSocket ticker message.
669pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
670    // Bitget WebSocket ticker format:
671    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"ticker","instId":"BTCUSDT"},"data":[{...}]}
672    let data = msg
673        .get("data")
674        .and_then(|d| d.as_array())
675        .and_then(|arr| arr.first())
676        .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
677
678    parse_ticker(data, market)
679}
680
681/// Parse a WebSocket orderbook message.
682pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
683    // Bitget WebSocket orderbook format:
684    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"books5","instId":"BTCUSDT"},"data":[{...}]}
685    let data = msg
686        .get("data")
687        .and_then(|d| d.as_array())
688        .and_then(|arr| arr.first())
689        .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
690
691    parse_orderbook(data, symbol)
692}
693
694/// Parse a WebSocket trade message (single trade).
695pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
696    // Bitget WebSocket trade format:
697    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}]}
698    let data = msg
699        .get("data")
700        .and_then(|d| d.as_array())
701        .and_then(|arr| arr.first())
702        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
703
704    parse_trade(data, market)
705}
706
707/// Parse a WebSocket trade message (multiple trades).
708pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
709    // Bitget WebSocket trade format:
710    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}, {...}]}
711    let data_array = msg
712        .get("data")
713        .and_then(|d| d.as_array())
714        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
715
716    let mut trades = Vec::with_capacity(data_array.len());
717    for data in data_array {
718        trades.push(parse_trade(data, market)?);
719    }
720
721    Ok(trades)
722}
723
724#[cfg(test)]
725mod tests {
726    use super::*;
727    use ccxt_core::types::financial::Price;
728    use rust_decimal_macros::dec;
729
730    #[test]
731    fn test_bitget_ws_creation() {
732        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
733        // WsClient is created successfully - config is private so we just verify creation works
734        assert!(ws.subscriptions.try_read().is_ok());
735    }
736
737    #[tokio::test]
738    async fn test_subscriptions_empty_by_default() {
739        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
740        let subs = ws.subscriptions().await;
741        assert!(subs.is_empty());
742    }
743
744    // ==================== Ticker Message Parsing Tests ====================
745
746    #[test]
747    fn test_parse_ws_ticker_snapshot() {
748        let msg = serde_json::from_str(
749            r#"{
750                "action": "snapshot",
751                "arg": {
752                    "instType": "SPOT",
753                    "channel": "ticker",
754                    "instId": "BTCUSDT"
755                },
756                "data": [{
757                    "instId": "BTCUSDT",
758                    "lastPr": "50000.00",
759                    "high24h": "51000.00",
760                    "low24h": "49000.00",
761                    "bidPr": "49999.00",
762                    "askPr": "50001.00",
763                    "baseVolume": "1000.5",
764                    "ts": "1700000000000"
765                }]
766            }"#,
767        )
768        .unwrap();
769
770        let ticker = parse_ws_ticker(&msg, None).unwrap();
771        assert_eq!(ticker.symbol, "BTCUSDT");
772        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
773        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
774        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
775        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
776        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
777        assert_eq!(ticker.timestamp, 1700000000000);
778    }
779
780    #[test]
781    fn test_parse_ws_ticker_with_market() {
782        let msg = serde_json::from_str(
783            r#"{
784                "action": "snapshot",
785                "arg": {
786                    "instType": "SPOT",
787                    "channel": "ticker",
788                    "instId": "BTCUSDT"
789                },
790                "data": [{
791                    "instId": "BTCUSDT",
792                    "lastPr": "50000.00",
793                    "ts": "1700000000000"
794                }]
795            }"#,
796        )
797        .unwrap();
798
799        let market = Market {
800            id: "BTCUSDT".to_string(),
801            symbol: "BTC/USDT".to_string(),
802            base: "BTC".to_string(),
803            quote: "USDT".to_string(),
804            ..Default::default()
805        };
806
807        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
808        assert_eq!(ticker.symbol, "BTC/USDT");
809        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
810    }
811
812    #[test]
813    fn test_parse_ws_ticker_missing_data() {
814        let msg = serde_json::from_str(
815            r#"{
816                "action": "snapshot",
817                "arg": {
818                    "instType": "SPOT",
819                    "channel": "ticker",
820                    "instId": "BTCUSDT"
821                }
822            }"#,
823        )
824        .unwrap();
825
826        let result = parse_ws_ticker(&msg, None);
827        assert!(result.is_err());
828    }
829
830    #[test]
831    fn test_parse_ws_ticker_empty_data_array() {
832        let msg = serde_json::from_str(
833            r#"{
834                "action": "snapshot",
835                "arg": {
836                    "instType": "SPOT",
837                    "channel": "ticker",
838                    "instId": "BTCUSDT"
839                },
840                "data": []
841            }"#,
842        )
843        .unwrap();
844
845        let result = parse_ws_ticker(&msg, None);
846        assert!(result.is_err());
847    }
848
849    // ==================== OrderBook Message Parsing Tests ====================
850
851    #[test]
852    fn test_parse_ws_orderbook_snapshot() {
853        let msg = serde_json::from_str(
854            r#"{
855                "action": "snapshot",
856                "arg": {
857                    "instType": "SPOT",
858                    "channel": "books5",
859                    "instId": "BTCUSDT"
860                },
861                "data": [{
862                    "bids": [
863                        ["50000.00", "1.5"],
864                        ["49999.00", "2.0"],
865                        ["49998.00", "0.5"]
866                    ],
867                    "asks": [
868                        ["50001.00", "1.0"],
869                        ["50002.00", "3.0"],
870                        ["50003.00", "2.5"]
871                    ],
872                    "ts": "1700000000000"
873                }]
874            }"#,
875        )
876        .unwrap();
877
878        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
879        assert_eq!(orderbook.symbol, "BTC/USDT");
880        assert_eq!(orderbook.bids.len(), 3);
881        assert_eq!(orderbook.asks.len(), 3);
882
883        // Verify bids are sorted in descending order
884        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
885        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
886        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
887
888        // Verify asks are sorted in ascending order
889        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
890        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
891        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
892    }
893
894    #[test]
895    fn test_parse_ws_orderbook_update() {
896        let msg = serde_json::from_str(
897            r#"{
898                "action": "update",
899                "arg": {
900                    "instType": "SPOT",
901                    "channel": "books",
902                    "instId": "ETHUSDT"
903                },
904                "data": [{
905                    "bids": [
906                        ["2000.00", "10.0"]
907                    ],
908                    "asks": [
909                        ["2001.00", "5.0"]
910                    ],
911                    "ts": "1700000000001"
912                }]
913            }"#,
914        )
915        .unwrap();
916
917        let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
918        assert_eq!(orderbook.symbol, "ETH/USDT");
919        assert_eq!(orderbook.bids.len(), 1);
920        assert_eq!(orderbook.asks.len(), 1);
921        assert_eq!(orderbook.timestamp, 1700000000001);
922    }
923
924    #[test]
925    fn test_parse_ws_orderbook_missing_data() {
926        let msg = serde_json::from_str(
927            r#"{
928                "action": "snapshot",
929                "arg": {
930                    "instType": "SPOT",
931                    "channel": "books5",
932                    "instId": "BTCUSDT"
933                }
934            }"#,
935        )
936        .unwrap();
937
938        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
939        assert!(result.is_err());
940    }
941
942    #[test]
943    fn test_parse_ws_orderbook_empty_sides() {
944        let msg = serde_json::from_str(
945            r#"{
946                "action": "snapshot",
947                "arg": {
948                    "instType": "SPOT",
949                    "channel": "books5",
950                    "instId": "BTCUSDT"
951                },
952                "data": [{
953                    "bids": [],
954                    "asks": [],
955                    "ts": "1700000000000"
956                }]
957            }"#,
958        )
959        .unwrap();
960
961        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
962        assert!(orderbook.bids.is_empty());
963        assert!(orderbook.asks.is_empty());
964    }
965
966    // ==================== Trade Message Parsing Tests ====================
967
968    #[test]
969    fn test_parse_ws_trade_single() {
970        let msg = serde_json::from_str(
971            r#"{
972                "action": "snapshot",
973                "arg": {
974                    "instType": "SPOT",
975                    "channel": "trade",
976                    "instId": "BTCUSDT"
977                },
978                "data": [{
979                    "tradeId": "123456789",
980                    "symbol": "BTCUSDT",
981                    "side": "buy",
982                    "price": "50000.00",
983                    "size": "0.5",
984                    "ts": "1700000000000"
985                }]
986            }"#,
987        )
988        .unwrap();
989
990        let trade = parse_ws_trade(&msg, None).unwrap();
991        assert_eq!(trade.id, Some("123456789".to_string()));
992        assert_eq!(trade.side, ccxt_core::types::OrderSide::Buy);
993        assert_eq!(trade.price, Price::new(dec!(50000.00)));
994        assert_eq!(
995            trade.amount,
996            ccxt_core::types::financial::Amount::new(dec!(0.5))
997        );
998        assert_eq!(trade.timestamp, 1700000000000);
999    }
1000
1001    #[test]
1002    fn test_parse_ws_trades_multiple() {
1003        let msg = serde_json::from_str(
1004            r#"{
1005                "action": "snapshot",
1006                "arg": {
1007                    "instType": "SPOT",
1008                    "channel": "trade",
1009                    "instId": "BTCUSDT"
1010                },
1011                "data": [
1012                    {
1013                        "tradeId": "123456789",
1014                        "symbol": "BTCUSDT",
1015                        "side": "buy",
1016                        "price": "50000.00",
1017                        "size": "0.5",
1018                        "ts": "1700000000000"
1019                    },
1020                    {
1021                        "tradeId": "123456790",
1022                        "symbol": "BTCUSDT",
1023                        "side": "sell",
1024                        "price": "50001.00",
1025                        "size": "1.0",
1026                        "ts": "1700000000001"
1027                    }
1028                ]
1029            }"#,
1030        )
1031        .unwrap();
1032
1033        let trades = parse_ws_trades(&msg, None).unwrap();
1034        assert_eq!(trades.len(), 2);
1035
1036        assert_eq!(trades[0].id, Some("123456789".to_string()));
1037        assert_eq!(trades[0].side, ccxt_core::types::OrderSide::Buy);
1038
1039        assert_eq!(trades[1].id, Some("123456790".to_string()));
1040        assert_eq!(trades[1].side, ccxt_core::types::OrderSide::Sell);
1041    }
1042
1043    #[test]
1044    fn test_parse_ws_trade_sell_side() {
1045        let msg = serde_json::from_str(
1046            r#"{
1047                "action": "snapshot",
1048                "arg": {
1049                    "instType": "SPOT",
1050                    "channel": "trade",
1051                    "instId": "BTCUSDT"
1052                },
1053                "data": [{
1054                    "tradeId": "123456789",
1055                    "symbol": "BTCUSDT",
1056                    "side": "sell",
1057                    "price": "50000.00",
1058                    "size": "0.5",
1059                    "ts": "1700000000000"
1060                }]
1061            }"#,
1062        )
1063        .unwrap();
1064
1065        let trade = parse_ws_trade(&msg, None).unwrap();
1066        assert_eq!(trade.side, ccxt_core::types::OrderSide::Sell);
1067    }
1068
1069    #[test]
1070    fn test_parse_ws_trade_missing_data() {
1071        let msg = serde_json::from_str(
1072            r#"{
1073                "action": "snapshot",
1074                "arg": {
1075                    "instType": "SPOT",
1076                    "channel": "trade",
1077                    "instId": "BTCUSDT"
1078                }
1079            }"#,
1080        )
1081        .unwrap();
1082
1083        let result = parse_ws_trade(&msg, None);
1084        assert!(result.is_err());
1085    }
1086
1087    #[test]
1088    fn test_parse_ws_trades_empty_array() {
1089        let msg = serde_json::from_str(
1090            r#"{
1091                "action": "snapshot",
1092                "arg": {
1093                    "instType": "SPOT",
1094                    "channel": "trade",
1095                    "instId": "BTCUSDT"
1096                },
1097                "data": []
1098            }"#,
1099        )
1100        .unwrap();
1101
1102        let trades = parse_ws_trades(&msg, None).unwrap();
1103        assert!(trades.is_empty());
1104    }
1105
1106    // ==================== Message Type Detection Tests ====================
1107
1108    #[test]
1109    fn test_is_ticker_message_true() {
1110        let msg = serde_json::from_str(
1111            r#"{
1112                "action": "snapshot",
1113                "arg": {
1114                    "instType": "SPOT",
1115                    "channel": "ticker",
1116                    "instId": "BTCUSDT"
1117                },
1118                "data": [{}]
1119            }"#,
1120        )
1121        .unwrap();
1122
1123        assert!(is_ticker_message(&msg, "BTCUSDT"));
1124    }
1125
1126    #[test]
1127    fn test_is_ticker_message_wrong_symbol() {
1128        let msg = serde_json::from_str(
1129            r#"{
1130                "action": "snapshot",
1131                "arg": {
1132                    "instType": "SPOT",
1133                    "channel": "ticker",
1134                    "instId": "ETHUSDT"
1135                },
1136                "data": [{}]
1137            }"#,
1138        )
1139        .unwrap();
1140
1141        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1142    }
1143
1144    #[test]
1145    fn test_is_ticker_message_wrong_channel() {
1146        let msg = serde_json::from_str(
1147            r#"{
1148                "action": "snapshot",
1149                "arg": {
1150                    "instType": "SPOT",
1151                    "channel": "trade",
1152                    "instId": "BTCUSDT"
1153                },
1154                "data": [{}]
1155            }"#,
1156        )
1157        .unwrap();
1158
1159        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1160    }
1161
1162    #[test]
1163    fn test_is_orderbook_message_books5() {
1164        let msg = serde_json::from_str(
1165            r#"{
1166                "arg": {
1167                    "instType": "SPOT",
1168                    "channel": "books5",
1169                    "instId": "BTCUSDT"
1170                }
1171            }"#,
1172        )
1173        .unwrap();
1174
1175        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1176    }
1177
1178    #[test]
1179    fn test_is_orderbook_message_books15() {
1180        let msg = serde_json::from_str(
1181            r#"{
1182                "arg": {
1183                    "instType": "SPOT",
1184                    "channel": "books15",
1185                    "instId": "BTCUSDT"
1186                }
1187            }"#,
1188        )
1189        .unwrap();
1190
1191        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1192    }
1193
1194    #[test]
1195    fn test_is_orderbook_message_books() {
1196        let msg = serde_json::from_str(
1197            r#"{
1198                "arg": {
1199                    "instType": "SPOT",
1200                    "channel": "books",
1201                    "instId": "BTCUSDT"
1202                }
1203            }"#,
1204        )
1205        .unwrap();
1206
1207        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1208    }
1209
1210    #[test]
1211    fn test_is_trade_message_true() {
1212        let msg = serde_json::from_str(
1213            r#"{
1214                "arg": {
1215                    "instType": "SPOT",
1216                    "channel": "trade",
1217                    "instId": "BTCUSDT"
1218                }
1219            }"#,
1220        )
1221        .unwrap();
1222
1223        assert!(is_trade_message(&msg, "BTCUSDT"));
1224    }
1225
1226    #[test]
1227    fn test_is_trade_message_wrong_channel() {
1228        let msg = serde_json::from_str(
1229            r#"{
1230                "arg": {
1231                    "instType": "SPOT",
1232                    "channel": "ticker",
1233                    "instId": "BTCUSDT"
1234                }
1235            }"#,
1236        )
1237        .unwrap();
1238
1239        assert!(!is_trade_message(&msg, "BTCUSDT"));
1240    }
1241
1242    // ==================== Symbol Formatting Tests ====================
1243
1244    #[test]
1245    fn test_format_unified_symbol_usdt() {
1246        assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1247        assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1248    }
1249
1250    #[test]
1251    fn test_format_unified_symbol_usdc() {
1252        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1253    }
1254
1255    #[test]
1256    fn test_format_unified_symbol_btc() {
1257        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1258    }
1259
1260    #[test]
1261    fn test_format_unified_symbol_unknown() {
1262        // Unknown quote currency returns as-is
1263        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1264    }
1265}