ccxt_exchanges/bitget/
ws.rs

1//! Bitget WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for Bitget exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (balance, orders) with automatic reconnection.
6
7use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
19/// Default ping interval for Bitget WebSocket (30 seconds).
20const DEFAULT_PING_INTERVAL_MS: u64 = 30000;
21
22/// Default reconnect delay (5 seconds).
23const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
24
25/// Maximum reconnect attempts.
26const MAX_RECONNECT_ATTEMPTS: u32 = 10;
27
28/// Bitget WebSocket client.
29///
30/// Provides real-time data streaming for Bitget exchange.
31pub struct BitgetWs {
32    /// WebSocket client instance.
33    client: Arc<WsClient>,
34    /// Active subscriptions.
35    subscriptions: Arc<RwLock<Vec<String>>>,
36}
37
38impl BitgetWs {
39    /// Creates a new Bitget WebSocket client.
40    ///
41    /// # Arguments
42    ///
43    /// * `url` - WebSocket server URL
44    pub fn new(url: String) -> Self {
45        let config = WsConfig {
46            url: url.clone(),
47            connect_timeout: 10000,
48            ping_interval: DEFAULT_PING_INTERVAL_MS,
49            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
50            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
51            auto_reconnect: true,
52            enable_compression: false,
53            pong_timeout: 90000,
54        };
55
56        Self {
57            client: Arc::new(WsClient::new(config)),
58            subscriptions: Arc::new(RwLock::new(Vec::new())),
59        }
60    }
61
62    /// Connects to the WebSocket server.
63    pub async fn connect(&self) -> Result<()> {
64        self.client.connect().await
65    }
66
67    /// Disconnects from the WebSocket server.
68    pub async fn disconnect(&self) -> Result<()> {
69        self.client.disconnect().await
70    }
71
72    /// Returns the current connection state.
73    pub async fn state(&self) -> WsConnectionState {
74        self.client.state().await
75    }
76
77    /// Checks if the WebSocket is connected.
78    pub async fn is_connected(&self) -> bool {
79        self.client.is_connected().await
80    }
81
82    /// Receives the next message from the WebSocket.
83    pub async fn receive(&self) -> Option<Value> {
84        self.client.receive().await
85    }
86
87    /// Subscribes to a ticker stream.
88    ///
89    /// # Arguments
90    ///
91    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
92    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
93        let mut arg_map = serde_json::Map::new();
94        arg_map.insert(
95            "instType".to_string(),
96            serde_json::Value::String("SPOT".to_string()),
97        );
98        arg_map.insert(
99            "channel".to_string(),
100            serde_json::Value::String("ticker".to_string()),
101        );
102        arg_map.insert(
103            "instId".to_string(),
104            serde_json::Value::String(symbol.to_string()),
105        );
106        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
107
108        let mut msg_map = serde_json::Map::new();
109        msg_map.insert(
110            "op".to_string(),
111            serde_json::Value::String("subscribe".to_string()),
112        );
113        msg_map.insert("args".to_string(), args);
114        let msg = serde_json::Value::Object(msg_map);
115
116        self.client.send_json(&msg).await?;
117
118        let sub_key = format!("ticker:{}", symbol);
119        self.subscriptions.write().await.push(sub_key);
120
121        Ok(())
122    }
123
124    /// Subscribes to an orderbook stream.
125    ///
126    /// # Arguments
127    ///
128    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
129    /// * `depth` - Orderbook depth (5, 15, or default)
130    pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
131        let channel = match depth {
132            5 => "books5",
133            15 => "books15",
134            _ => "books",
135        };
136
137        let mut arg_map = serde_json::Map::new();
138        arg_map.insert(
139            "instType".to_string(),
140            serde_json::Value::String("SPOT".to_string()),
141        );
142        arg_map.insert(
143            "channel".to_string(),
144            serde_json::Value::String(channel.to_string()),
145        );
146        arg_map.insert(
147            "instId".to_string(),
148            serde_json::Value::String(symbol.to_string()),
149        );
150        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
151
152        let mut msg_map = serde_json::Map::new();
153        msg_map.insert(
154            "op".to_string(),
155            serde_json::Value::String("subscribe".to_string()),
156        );
157        msg_map.insert("args".to_string(), args);
158        let msg = serde_json::Value::Object(msg_map);
159
160        self.client.send_json(&msg).await?;
161
162        let sub_key = format!("orderbook:{}", symbol);
163        self.subscriptions.write().await.push(sub_key);
164
165        Ok(())
166    }
167
168    /// Subscribes to a trades stream.
169    ///
170    /// # Arguments
171    ///
172    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
173    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
174        let mut arg_map = serde_json::Map::new();
175        arg_map.insert(
176            "instType".to_string(),
177            serde_json::Value::String("SPOT".to_string()),
178        );
179        arg_map.insert(
180            "channel".to_string(),
181            serde_json::Value::String("trade".to_string()),
182        );
183        arg_map.insert(
184            "instId".to_string(),
185            serde_json::Value::String(symbol.to_string()),
186        );
187        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
188
189        let mut msg_map = serde_json::Map::new();
190        msg_map.insert(
191            "op".to_string(),
192            serde_json::Value::String("subscribe".to_string()),
193        );
194        msg_map.insert("args".to_string(), args);
195        let msg = serde_json::Value::Object(msg_map);
196
197        self.client.send_json(&msg).await?;
198
199        let sub_key = format!("trades:{}", symbol);
200        self.subscriptions.write().await.push(sub_key);
201
202        Ok(())
203    }
204
205    /// Subscribes to a kline/candlestick stream.
206    ///
207    /// # Arguments
208    ///
209    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
210    /// * `interval` - Kline interval (e.g., "1m", "5m", "1H")
211    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
212        let channel = format!("candle{}", interval);
213
214        let mut arg_map = serde_json::Map::new();
215        arg_map.insert(
216            "instType".to_string(),
217            serde_json::Value::String("SPOT".to_string()),
218        );
219        arg_map.insert(
220            "channel".to_string(),
221            serde_json::Value::String(channel.clone()),
222        );
223        arg_map.insert(
224            "instId".to_string(),
225            serde_json::Value::String(symbol.to_string()),
226        );
227        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
228
229        let mut msg_map = serde_json::Map::new();
230        msg_map.insert(
231            "op".to_string(),
232            serde_json::Value::String("subscribe".to_string()),
233        );
234        msg_map.insert("args".to_string(), args);
235        let msg = serde_json::Value::Object(msg_map);
236
237        self.client.send_json(&msg).await?;
238
239        let sub_key = format!("kline:{}:{}", symbol, interval);
240        self.subscriptions.write().await.push(sub_key);
241
242        Ok(())
243    }
244
245    /// Unsubscribes from a stream.
246    ///
247    /// # Arguments
248    ///
249    /// * `stream_name` - Stream identifier to unsubscribe from
250    pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
251        // Parse stream name to determine channel and symbol
252        let parts: Vec<&str> = stream_name.split(':').collect();
253        if parts.len() < 2 {
254            return Err(Error::invalid_request(format!(
255                "Invalid stream name: {}",
256                stream_name
257            )));
258        }
259
260        let channel = parts[0];
261        let symbol = parts[1];
262
263        let bitget_channel = match channel {
264            "ticker" => "ticker",
265            "orderbook" => "books",
266            "trades" => "trade",
267            "kline" => {
268                if parts.len() >= 3 {
269                    // For kline, we need the interval
270                    return self.unsubscribe_kline(symbol, parts[2]).await;
271                }
272                return Err(Error::invalid_request(
273                    "Kline unsubscribe requires interval",
274                ));
275            }
276            _ => channel,
277        };
278
279        let mut arg_map = serde_json::Map::new();
280        arg_map.insert(
281            "instType".to_string(),
282            serde_json::Value::String("SPOT".to_string()),
283        );
284        arg_map.insert(
285            "channel".to_string(),
286            serde_json::Value::String(bitget_channel.to_string()),
287        );
288        arg_map.insert(
289            "instId".to_string(),
290            serde_json::Value::String(symbol.to_string()),
291        );
292        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
293
294        let mut msg_map = serde_json::Map::new();
295        msg_map.insert(
296            "op".to_string(),
297            serde_json::Value::String("unsubscribe".to_string()),
298        );
299        msg_map.insert("args".to_string(), args);
300        let msg = serde_json::Value::Object(msg_map);
301
302        self.client.send_json(&msg).await?;
303
304        // Remove from subscriptions
305        let mut subs = self.subscriptions.write().await;
306        subs.retain(|s| s != &stream_name);
307
308        Ok(())
309    }
310
311    /// Unsubscribes from a kline stream.
312    async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
313        let channel = format!("candle{}", interval);
314
315        let mut arg_map = serde_json::Map::new();
316        arg_map.insert(
317            "instType".to_string(),
318            serde_json::Value::String("SPOT".to_string()),
319        );
320        arg_map.insert(
321            "channel".to_string(),
322            serde_json::Value::String(channel.clone()),
323        );
324        arg_map.insert(
325            "instId".to_string(),
326            serde_json::Value::String(symbol.to_string()),
327        );
328        let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
329
330        let mut msg_map = serde_json::Map::new();
331        msg_map.insert(
332            "op".to_string(),
333            serde_json::Value::String("unsubscribe".to_string()),
334        );
335        msg_map.insert("args".to_string(), args);
336        let msg = serde_json::Value::Object(msg_map);
337
338        self.client.send_json(&msg).await?;
339
340        let sub_key = format!("kline:{}:{}", symbol, interval);
341        let mut subs = self.subscriptions.write().await;
342        subs.retain(|s| s != &sub_key);
343
344        Ok(())
345    }
346
347    /// Returns the list of active subscriptions.
348    pub async fn subscriptions(&self) -> Vec<String> {
349        self.subscriptions.read().await.clone()
350    }
351
352    /// Watches ticker updates for a symbol.
353    ///
354    /// Returns a stream of `Ticker` updates for the specified symbol.
355    ///
356    /// # Arguments
357    ///
358    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
359    /// * `market` - Optional market information for symbol resolution
360    ///
361    /// # Returns
362    ///
363    /// A `MessageStream<Ticker>` that yields ticker updates.
364    ///
365    /// # Example
366    ///
367    /// ```no_run
368    /// use ccxt_exchanges::bitget::ws::BitgetWs;
369    /// use futures::StreamExt;
370    ///
371    /// # async fn example() -> ccxt_core::error::Result<()> {
372    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
373    /// ws.connect().await?;
374    /// let mut stream = ws.watch_ticker("BTCUSDT", None).await?;
375    /// while let Some(result) = stream.next().await {
376    ///     match result {
377    ///         Ok(ticker) => println!("Ticker: {:?}", ticker.last),
378    ///         Err(e) => eprintln!("Error: {}", e),
379    ///     }
380    /// }
381    /// # Ok(())
382    /// # }
383    /// ```
384    pub async fn watch_ticker(
385        &self,
386        symbol: &str,
387        market: Option<Market>,
388    ) -> Result<MessageStream<Ticker>> {
389        // Ensure connected
390        if !self.is_connected().await {
391            self.connect().await?;
392        }
393
394        // Subscribe to ticker channel
395        self.subscribe_ticker(symbol).await?;
396
397        // Create channel for ticker updates
398        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
399        let symbol_owned = symbol.to_string();
400        let client = Arc::clone(&self.client);
401
402        // Spawn task to process messages and filter ticker updates
403        tokio::spawn(async move {
404            while let Some(msg) = client.receive().await {
405                // Check if this is a ticker message for our symbol
406                if is_ticker_message(&msg, &symbol_owned) {
407                    match parse_ws_ticker(&msg, market.as_ref()) {
408                        Ok(ticker) => {
409                            if tx.send(Ok(ticker)).is_err() {
410                                break; // Receiver dropped
411                            }
412                        }
413                        Err(e) => {
414                            if tx.send(Err(e)).is_err() {
415                                break;
416                            }
417                        }
418                    }
419                }
420            }
421        });
422
423        Ok(Box::pin(ReceiverStream::new(rx)))
424    }
425
426    /// Watches order book updates for a symbol.
427    ///
428    /// Returns a stream of `OrderBook` updates for the specified symbol.
429    ///
430    /// # Arguments
431    ///
432    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
433    /// * `limit` - Optional depth limit (5, 15, or full depth)
434    ///
435    /// # Returns
436    ///
437    /// A `MessageStream<OrderBook>` that yields order book updates.
438    ///
439    /// # Example
440    ///
441    /// ```no_run
442    /// use ccxt_exchanges::bitget::ws::BitgetWs;
443    /// use futures::StreamExt;
444    ///
445    /// # async fn example() -> ccxt_core::error::Result<()> {
446    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
447    /// ws.connect().await?;
448    /// let mut stream = ws.watch_order_book("BTCUSDT", Some(5)).await?;
449    /// while let Some(result) = stream.next().await {
450    ///     match result {
451    ///         Ok(orderbook) => println!("Best bid: {:?}", orderbook.bids.first()),
452    ///         Err(e) => eprintln!("Error: {}", e),
453    ///     }
454    /// }
455    /// # Ok(())
456    /// # }
457    /// ```
458    pub async fn watch_order_book(
459        &self,
460        symbol: &str,
461        limit: Option<u32>,
462    ) -> Result<MessageStream<OrderBook>> {
463        // Ensure connected
464        if !self.is_connected().await {
465            self.connect().await?;
466        }
467
468        // Subscribe to orderbook channel
469        let depth = limit.unwrap_or(15);
470        self.subscribe_orderbook(symbol, depth).await?;
471
472        // Create channel for orderbook updates
473        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
474        let symbol_owned = symbol.to_string();
475        let unified_symbol = format_unified_symbol(&symbol_owned);
476        let client = Arc::clone(&self.client);
477
478        // Spawn task to process messages and filter orderbook updates
479        tokio::spawn(async move {
480            while let Some(msg) = client.receive().await {
481                // Check if this is an orderbook message for our symbol
482                if is_orderbook_message(&msg, &symbol_owned) {
483                    match parse_ws_orderbook(&msg, unified_symbol.clone()) {
484                        Ok(orderbook) => {
485                            if tx.send(Ok(orderbook)).is_err() {
486                                break; // Receiver dropped
487                            }
488                        }
489                        Err(e) => {
490                            if tx.send(Err(e)).is_err() {
491                                break;
492                            }
493                        }
494                    }
495                }
496            }
497        });
498
499        Ok(Box::pin(ReceiverStream::new(rx)))
500    }
501
502    /// Watches trade updates for a symbol.
503    ///
504    /// Returns a stream of `Trade` updates for the specified symbol.
505    ///
506    /// # Arguments
507    ///
508    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
509    /// * `market` - Optional market information for symbol resolution
510    ///
511    /// # Returns
512    ///
513    /// A `MessageStream<Vec<Trade>>` that yields trade updates.
514    ///
515    /// # Example
516    ///
517    /// ```no_run
518    /// use ccxt_exchanges::bitget::ws::BitgetWs;
519    /// use futures::StreamExt;
520    ///
521    /// # async fn example() -> ccxt_core::error::Result<()> {
522    /// let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
523    /// ws.connect().await?;
524    /// let mut stream = ws.watch_trades("BTCUSDT", None).await?;
525    /// while let Some(result) = stream.next().await {
526    ///     match result {
527    ///         Ok(trades) => {
528    ///             for trade in trades {
529    ///                 println!("Trade: {:?} @ {:?}", trade.amount, trade.price);
530    ///             }
531    ///         }
532    ///         Err(e) => eprintln!("Error: {}", e),
533    ///     }
534    /// }
535    /// # Ok(())
536    /// # }
537    /// ```
538    pub async fn watch_trades(
539        &self,
540        symbol: &str,
541        market: Option<Market>,
542    ) -> Result<MessageStream<Vec<Trade>>> {
543        // Ensure connected
544        if !self.is_connected().await {
545            self.connect().await?;
546        }
547
548        // Subscribe to trades channel
549        self.subscribe_trades(symbol).await?;
550
551        // Create channel for trade updates
552        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
553        let symbol_owned = symbol.to_string();
554        let client = Arc::clone(&self.client);
555
556        // Spawn task to process messages and filter trade updates
557        tokio::spawn(async move {
558            while let Some(msg) = client.receive().await {
559                // Check if this is a trade message for our symbol
560                if is_trade_message(&msg, &symbol_owned) {
561                    match parse_ws_trades(&msg, market.as_ref()) {
562                        Ok(trades) => {
563                            if tx.send(Ok(trades)).is_err() {
564                                break; // Receiver dropped
565                            }
566                        }
567                        Err(e) => {
568                            if tx.send(Err(e)).is_err() {
569                                break;
570                            }
571                        }
572                    }
573                }
574            }
575        });
576
577        Ok(Box::pin(ReceiverStream::new(rx)))
578    }
579}
580
581// ============================================================================
582// Stream Wrapper
583// ============================================================================
584
585/// A stream wrapper that converts an mpsc receiver into a futures Stream.
586struct ReceiverStream<T> {
587    receiver: mpsc::UnboundedReceiver<T>,
588}
589
590impl<T> ReceiverStream<T> {
591    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
592        Self { receiver }
593    }
594}
595
596impl<T> Stream for ReceiverStream<T> {
597    type Item = T;
598
599    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
600        self.receiver.poll_recv(cx)
601    }
602}
603
604// ============================================================================
605// Message Type Detection Helpers
606// ============================================================================
607
608/// Check if a WebSocket message is a ticker message for the given symbol.
609fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
610    if let Some(arg) = msg.get("arg") {
611        let channel = arg.get("channel").and_then(|c| c.as_str());
612        let inst_id = arg.get("instId").and_then(|i| i.as_str());
613
614        channel == Some("ticker") && inst_id == Some(symbol)
615    } else {
616        false
617    }
618}
619
620/// Check if a WebSocket message is an orderbook message for the given symbol.
621fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
622    if let Some(arg) = msg.get("arg") {
623        let channel = arg.get("channel").and_then(|c| c.as_str());
624        let inst_id = arg.get("instId").and_then(|i| i.as_str());
625
626        // Bitget uses books, books5, books15 for orderbook channels
627        let is_orderbook_channel = channel.map(|c| c.starts_with("books")).unwrap_or(false);
628        is_orderbook_channel && inst_id == Some(symbol)
629    } else {
630        false
631    }
632}
633
634/// Check if a WebSocket message is a trade message for the given symbol.
635fn is_trade_message(msg: &Value, symbol: &str) -> bool {
636    if let Some(arg) = msg.get("arg") {
637        let channel = arg.get("channel").and_then(|c| c.as_str());
638        let inst_id = arg.get("instId").and_then(|i| i.as_str());
639
640        channel == Some("trade") && inst_id == Some(symbol)
641    } else {
642        false
643    }
644}
645
646/// Format a Bitget symbol (e.g., "BTCUSDT") to unified format (e.g., "BTC/USDT").
647fn format_unified_symbol(symbol: &str) -> String {
648    // Common quote currencies to detect
649    let quote_currencies = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];
650
651    for quote in &quote_currencies {
652        if symbol.ends_with(quote) {
653            let base = &symbol[..symbol.len() - quote.len()];
654            if !base.is_empty() {
655                return format!("{}/{}", base, quote);
656            }
657        }
658    }
659
660    // If no known quote currency found, return as-is
661    symbol.to_string()
662}
663
664// ============================================================================
665// WebSocket Message Parsers
666// ============================================================================
667
668/// Parse a WebSocket ticker message.
669pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
670    // Bitget WebSocket ticker format:
671    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"ticker","instId":"BTCUSDT"},"data":[{...}]}
672    let data = msg
673        .get("data")
674        .and_then(|d| d.as_array())
675        .and_then(|arr| arr.first())
676        .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
677
678    parse_ticker(data, market)
679}
680
681/// Parse a WebSocket orderbook message.
682pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
683    // Bitget WebSocket orderbook format:
684    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"books5","instId":"BTCUSDT"},"data":[{...}]}
685    let data = msg
686        .get("data")
687        .and_then(|d| d.as_array())
688        .and_then(|arr| arr.first())
689        .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
690
691    parse_orderbook(data, symbol)
692}
693
694/// Parse a WebSocket trade message (single trade).
695pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
696    // Bitget WebSocket trade format:
697    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}]}
698    let data = msg
699        .get("data")
700        .and_then(|d| d.as_array())
701        .and_then(|arr| arr.first())
702        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
703
704    parse_trade(data, market)
705}
706
707/// Parse a WebSocket trade message (multiple trades).
708pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
709    // Bitget WebSocket trade format:
710    // {"action":"snapshot","arg":{"instType":"SPOT","channel":"trade","instId":"BTCUSDT"},"data":[{...}, {...}]}
711    let data_array = msg
712        .get("data")
713        .and_then(|d| d.as_array())
714        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
715
716    let mut trades = Vec::with_capacity(data_array.len());
717    for data in data_array {
718        trades.push(parse_trade(data, market)?);
719    }
720
721    Ok(trades)
722}
723
724#[cfg(test)]
725mod tests {
726    use super::*;
727    use ccxt_core::types::financial::Price;
728    use rust_decimal_macros::dec;
729
730    #[test]
731    fn test_bitget_ws_creation() {
732        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
733        // WsClient is created successfully - config is private so we just verify creation works
734        assert!(ws.subscriptions.try_read().is_ok());
735    }
736
737    #[tokio::test]
738    async fn test_subscriptions_empty_by_default() {
739        let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
740        let subs = ws.subscriptions().await;
741        assert!(subs.is_empty());
742    }
743
744    // ==================== Ticker Message Parsing Tests ====================
745
746    #[test]
747    fn test_parse_ws_ticker_snapshot() {
748        let msg = serde_json::from_str(
749            r#"{
750                "action": "snapshot",
751                "arg": {
752                    "instType": "SPOT",
753                    "channel": "ticker",
754                    "instId": "BTCUSDT"
755                },
756                "data": [{
757                    "instId": "BTCUSDT",
758                    "lastPr": "50000.00",
759                    "high24h": "51000.00",
760                    "low24h": "49000.00",
761                    "bidPr": "49999.00",
762                    "askPr": "50001.00",
763                    "baseVolume": "1000.5",
764                    "ts": "1700000000000"
765                }]
766            }"#,
767        )
768        .unwrap();
769
770        let ticker = parse_ws_ticker(&msg, None).unwrap();
771        assert_eq!(ticker.symbol, "BTCUSDT");
772        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
773        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
774        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
775        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
776        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
777        assert_eq!(ticker.timestamp, 1700000000000);
778    }
779
780    #[test]
781    fn test_parse_ws_ticker_with_market() {
782        let msg = serde_json::from_str(
783            r#"{
784                "action": "snapshot",
785                "arg": {
786                    "instType": "SPOT",
787                    "channel": "ticker",
788                    "instId": "BTCUSDT"
789                },
790                "data": [{
791                    "instId": "BTCUSDT",
792                    "lastPr": "50000.00",
793                    "ts": "1700000000000"
794                }]
795            }"#,
796        )
797        .unwrap();
798
799        let market = Market {
800            id: "BTCUSDT".to_string(),
801            symbol: "BTC/USDT".to_string(),
802            base: "BTC".to_string(),
803            quote: "USDT".to_string(),
804            ..Default::default()
805        };
806
807        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
808        assert_eq!(ticker.symbol, "BTC/USDT");
809        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
810    }
811
812    #[test]
813    fn test_parse_ws_ticker_missing_data() {
814        let msg = serde_json::from_str(
815            r#"{
816                "action": "snapshot",
817                "arg": {
818                    "instType": "SPOT",
819                    "channel": "ticker",
820                    "instId": "BTCUSDT"
821                }
822            }"#,
823        )
824        .unwrap();
825
826        let result = parse_ws_ticker(&msg, None);
827        assert!(result.is_err());
828    }
829
830    #[test]
831    fn test_parse_ws_ticker_empty_data_array() {
832        let msg = serde_json::from_str(
833            r#"{
834                "action": "snapshot",
835                "arg": {
836                    "instType": "SPOT",
837                    "channel": "ticker",
838                    "instId": "BTCUSDT"
839                },
840                "data": []
841            }"#,
842        )
843        .unwrap();
844
845        let result = parse_ws_ticker(&msg, None);
846        assert!(result.is_err());
847    }
848
849    // ==================== OrderBook Message Parsing Tests ====================
850
851    #[test]
852    fn test_parse_ws_orderbook_snapshot() {
853        let msg = serde_json::from_str(
854            r#"{
855                "action": "snapshot",
856                "arg": {
857                    "instType": "SPOT",
858                    "channel": "books5",
859                    "instId": "BTCUSDT"
860                },
861                "data": [{
862                    "bids": [
863                        ["50000.00", "1.5"],
864                        ["49999.00", "2.0"],
865                        ["49998.00", "0.5"]
866                    ],
867                    "asks": [
868                        ["50001.00", "1.0"],
869                        ["50002.00", "3.0"],
870                        ["50003.00", "2.5"]
871                    ],
872                    "ts": "1700000000000"
873                }]
874            }"#,
875        )
876        .unwrap();
877
878        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
879        assert_eq!(orderbook.symbol, "BTC/USDT");
880        assert_eq!(orderbook.bids.len(), 3);
881        assert_eq!(orderbook.asks.len(), 3);
882
883        // Verify bids are sorted in descending order
884        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
885        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
886        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
887
888        // Verify asks are sorted in ascending order
889        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
890        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
891        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
892    }
893
894    #[test]
895    fn test_parse_ws_orderbook_update() {
896        let msg = serde_json::from_str(
897            r#"{
898                "action": "update",
899                "arg": {
900                    "instType": "SPOT",
901                    "channel": "books",
902                    "instId": "ETHUSDT"
903                },
904                "data": [{
905                    "bids": [
906                        ["2000.00", "10.0"]
907                    ],
908                    "asks": [
909                        ["2001.00", "5.0"]
910                    ],
911                    "ts": "1700000000001"
912                }]
913            }"#,
914        )
915        .unwrap();
916
917        let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
918        assert_eq!(orderbook.symbol, "ETH/USDT");
919        assert_eq!(orderbook.bids.len(), 1);
920        assert_eq!(orderbook.asks.len(), 1);
921        assert_eq!(orderbook.timestamp, 1700000000001);
922    }
923
924    #[test]
925    fn test_parse_ws_orderbook_missing_data() {
926        let msg = serde_json::from_str(
927            r#"{
928                "action": "snapshot",
929                "arg": {
930                    "instType": "SPOT",
931                    "channel": "books5",
932                    "instId": "BTCUSDT"
933                }
934            }"#,
935        )
936        .unwrap();
937
938        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
939        assert!(result.is_err());
940    }
941
942    #[test]
943    fn test_parse_ws_orderbook_empty_sides() {
944        let msg = serde_json::from_str(
945            r#"{
946                "action": "snapshot",
947                "arg": {
948                    "instType": "SPOT",
949                    "channel": "books5",
950                    "instId": "BTCUSDT"
951                },
952                "data": [{
953                    "bids": [],
954                    "asks": [],
955                    "ts": "1700000000000"
956                }]
957            }"#,
958        )
959        .unwrap();
960
961        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
962        assert!(orderbook.bids.is_empty());
963        assert!(orderbook.asks.is_empty());
964    }
965
966    // ==================== Trade Message Parsing Tests ====================
967
968    #[test]
969    fn test_parse_ws_trade_single() {
970        let msg = serde_json::from_str(
971            r#"{
972                "action": "snapshot",
973                "arg": {
974                    "instType": "SPOT",
975                    "channel": "trade",
976                    "instId": "BTCUSDT"
977                },
978                "data": [{
979                    "tradeId": "123456789",
980                    "symbol": "BTCUSDT",
981                    "side": "buy",
982                    "price": "50000.00",
983                    "size": "0.5",
984                    "ts": "1700000000000"
985                }]
986            }"#,
987        )
988        .unwrap();
989
990        let trade = parse_ws_trade(&msg, None).unwrap();
991        assert_eq!(trade.id, Some("123456789".to_string()));
992        assert_eq!(trade.side, ccxt_core::types::OrderSide::Buy);
993        assert_eq!(trade.price, Price::new(dec!(50000.00)));
994        assert_eq!(
995            trade.amount,
996            ccxt_core::types::financial::Amount::new(dec!(0.5))
997        );
998        assert_eq!(trade.timestamp, 1700000000000);
999    }
1000
1001    #[test]
1002    fn test_parse_ws_trades_multiple() {
1003        let msg = serde_json::from_str(
1004            r#"{
1005                "action": "snapshot",
1006                "arg": {
1007                    "instType": "SPOT",
1008                    "channel": "trade",
1009                    "instId": "BTCUSDT"
1010                },
1011                "data": [
1012                    {
1013                        "tradeId": "123456789",
1014                        "symbol": "BTCUSDT",
1015                        "side": "buy",
1016                        "price": "50000.00",
1017                        "size": "0.5",
1018                        "ts": "1700000000000"
1019                    },
1020                    {
1021                        "tradeId": "123456790",
1022                        "symbol": "BTCUSDT",
1023                        "side": "sell",
1024                        "price": "50001.00",
1025                        "size": "1.0",
1026                        "ts": "1700000000001"
1027                    }
1028                ]
1029            }"#,
1030        )
1031        .unwrap();
1032
1033        let trades = parse_ws_trades(&msg, None).unwrap();
1034        assert_eq!(trades.len(), 2);
1035
1036        assert_eq!(trades[0].id, Some("123456789".to_string()));
1037        assert_eq!(trades[0].side, ccxt_core::types::OrderSide::Buy);
1038
1039        assert_eq!(trades[1].id, Some("123456790".to_string()));
1040        assert_eq!(trades[1].side, ccxt_core::types::OrderSide::Sell);
1041    }
1042
1043    #[test]
1044    fn test_parse_ws_trade_sell_side() {
1045        let msg = serde_json::from_str(
1046            r#"{
1047                "action": "snapshot",
1048                "arg": {
1049                    "instType": "SPOT",
1050                    "channel": "trade",
1051                    "instId": "BTCUSDT"
1052                },
1053                "data": [{
1054                    "tradeId": "123456789",
1055                    "symbol": "BTCUSDT",
1056                    "side": "sell",
1057                    "price": "50000.00",
1058                    "size": "0.5",
1059                    "ts": "1700000000000"
1060                }]
1061            }"#,
1062        )
1063        .unwrap();
1064
1065        let trade = parse_ws_trade(&msg, None).unwrap();
1066        assert_eq!(trade.side, ccxt_core::types::OrderSide::Sell);
1067    }
1068
1069    #[test]
1070    fn test_parse_ws_trade_missing_data() {
1071        let msg = serde_json::from_str(
1072            r#"{
1073                "action": "snapshot",
1074                "arg": {
1075                    "instType": "SPOT",
1076                    "channel": "trade",
1077                    "instId": "BTCUSDT"
1078                }
1079            }"#,
1080        )
1081        .unwrap();
1082
1083        let result = parse_ws_trade(&msg, None);
1084        assert!(result.is_err());
1085    }
1086
1087    #[test]
1088    fn test_parse_ws_trades_empty_array() {
1089        let msg = serde_json::from_str(
1090            r#"{
1091                "action": "snapshot",
1092                "arg": {
1093                    "instType": "SPOT",
1094                    "channel": "trade",
1095                    "instId": "BTCUSDT"
1096                },
1097                "data": []
1098            }"#,
1099        )
1100        .unwrap();
1101
1102        let trades = parse_ws_trades(&msg, None).unwrap();
1103        assert!(trades.is_empty());
1104    }
1105
1106    // ==================== Message Type Detection Tests ====================
1107
1108    #[test]
1109    fn test_is_ticker_message_true() {
1110        let msg = serde_json::from_str(
1111            r#"{
1112                "action": "snapshot",
1113                "arg": {
1114                    "instType": "SPOT",
1115                    "channel": "ticker",
1116                    "instId": "BTCUSDT"
1117                },
1118                "data": [{}]
1119            }"#,
1120        )
1121        .unwrap();
1122
1123        assert!(is_ticker_message(&msg, "BTCUSDT"));
1124    }
1125
1126    #[test]
1127    fn test_is_ticker_message_wrong_symbol() {
1128        let msg = serde_json::from_str(
1129            r#"{
1130                "action": "snapshot",
1131                "arg": {
1132                    "instType": "SPOT",
1133                    "channel": "ticker",
1134                    "instId": "ETHUSDT"
1135                },
1136                "data": [{}]
1137            }"#,
1138        )
1139        .unwrap();
1140
1141        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1142    }
1143
1144    #[test]
1145    fn test_is_ticker_message_wrong_channel() {
1146        let msg = serde_json::from_str(
1147            r#"{
1148                "action": "snapshot",
1149                "arg": {
1150                    "instType": "SPOT",
1151                    "channel": "trade",
1152                    "instId": "BTCUSDT"
1153                },
1154                "data": [{}]
1155            }"#,
1156        )
1157        .unwrap();
1158
1159        assert!(!is_ticker_message(&msg, "BTCUSDT"));
1160    }
1161
1162    #[test]
1163    fn test_is_orderbook_message_books5() {
1164        let msg = serde_json::from_str(
1165            r#"{
1166                "arg": {
1167                    "instType": "SPOT",
1168                    "channel": "books5",
1169                    "instId": "BTCUSDT"
1170                }
1171            }"#,
1172        )
1173        .unwrap();
1174
1175        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1176    }
1177
1178    #[test]
1179    fn test_is_orderbook_message_books15() {
1180        let msg = serde_json::from_str(
1181            r#"{
1182                "arg": {
1183                    "instType": "SPOT",
1184                    "channel": "books15",
1185                    "instId": "BTCUSDT"
1186                }
1187            }"#,
1188        )
1189        .unwrap();
1190
1191        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1192    }
1193
1194    #[test]
1195    fn test_is_orderbook_message_books() {
1196        let msg = serde_json::from_str(
1197            r#"{
1198                "arg": {
1199                    "instType": "SPOT",
1200                    "channel": "books",
1201                    "instId": "BTCUSDT"
1202                }
1203            }"#,
1204        )
1205        .unwrap();
1206
1207        assert!(is_orderbook_message(&msg, "BTCUSDT"));
1208    }
1209
1210    #[test]
1211    fn test_is_trade_message_true() {
1212        let msg = serde_json::from_str(
1213            r#"{
1214                "arg": {
1215                    "instType": "SPOT",
1216                    "channel": "trade",
1217                    "instId": "BTCUSDT"
1218                }
1219            }"#,
1220        )
1221        .unwrap();
1222
1223        assert!(is_trade_message(&msg, "BTCUSDT"));
1224    }
1225
1226    #[test]
1227    fn test_is_trade_message_wrong_channel() {
1228        let msg = serde_json::from_str(
1229            r#"{
1230                "arg": {
1231                    "instType": "SPOT",
1232                    "channel": "ticker",
1233                    "instId": "BTCUSDT"
1234                }
1235            }"#,
1236        )
1237        .unwrap();
1238
1239        assert!(!is_trade_message(&msg, "BTCUSDT"));
1240    }
1241
1242    // ==================== Symbol Formatting Tests ====================
1243
1244    #[test]
1245    fn test_format_unified_symbol_usdt() {
1246        assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1247        assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1248    }
1249
1250    #[test]
1251    fn test_format_unified_symbol_usdc() {
1252        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1253    }
1254
1255    #[test]
1256    fn test_format_unified_symbol_btc() {
1257        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1258    }
1259
1260    #[test]
1261    fn test_format_unified_symbol_unknown() {
1262        // Unknown quote currency returns as-is
1263        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1264    }
1265}