ccxt_exchanges/bybit/
ws.rs

1//! Bybit WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for Bybit exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (balance, orders) with automatic reconnection.
6
7use crate::bybit::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
19/// Default ping interval for Bybit WebSocket (20 seconds).
20/// Bybit requires ping every 20 seconds to keep connection alive.
21const DEFAULT_PING_INTERVAL_MS: u64 = 20000;
22
23/// Default reconnect delay (5 seconds).
24const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
25
26/// Maximum reconnect attempts.
27const MAX_RECONNECT_ATTEMPTS: u32 = 10;
28
29/// Bybit WebSocket client.
30///
31/// Provides real-time data streaming for Bybit exchange.
32pub struct BybitWs {
33    /// WebSocket client instance.
34    client: Arc<WsClient>,
35    /// Active subscriptions.
36    subscriptions: Arc<RwLock<Vec<String>>>,
37}
38
39impl BybitWs {
40    /// Creates a new Bybit WebSocket client.
41    ///
42    /// # Arguments
43    ///
44    /// * `url` - WebSocket server URL
45    pub fn new(url: String) -> Self {
46        let config = WsConfig {
47            url: url.clone(),
48            connect_timeout: 10000,
49            ping_interval: DEFAULT_PING_INTERVAL_MS,
50            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
51            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
52            auto_reconnect: true,
53            enable_compression: false,
54            pong_timeout: 90000,
55            ..Default::default()
56        };
57
58        Self {
59            client: Arc::new(WsClient::new(config)),
60            subscriptions: Arc::new(RwLock::new(Vec::new())),
61        }
62    }
63
64    /// Connects to the WebSocket server.
65    pub async fn connect(&self) -> Result<()> {
66        self.client.connect().await
67    }
68
69    /// Disconnects from the WebSocket server.
70    pub async fn disconnect(&self) -> Result<()> {
71        self.client.disconnect().await
72    }
73
74    /// Returns the current connection state.
75    pub fn state(&self) -> WsConnectionState {
76        self.client.state()
77    }
78
79    /// Checks if the WebSocket is connected.
80    pub fn is_connected(&self) -> bool {
81        self.client.is_connected()
82    }
83
84    /// Receives the next message from the WebSocket.
85    pub async fn receive(&self) -> Option<Value> {
86        self.client.receive().await
87    }
88
89    /// Subscribes to a ticker stream.
90    ///
91    /// # Arguments
92    ///
93    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
94    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
95        // Bybit V5 WebSocket subscription format
96        let topic = format!("tickers.{}", symbol);
97
98        // json! macro with literal values is infallible
99        #[allow(clippy::disallowed_methods)]
100        let msg = serde_json::json!({
101            "op": "subscribe",
102            "args": [topic]
103        });
104
105        self.client.send_json(&msg).await?;
106
107        let sub_key = format!("ticker:{}", symbol);
108        self.subscriptions.write().await.push(sub_key);
109
110        Ok(())
111    }
112
113    /// Subscribes to an orderbook stream.
114    ///
115    /// # Arguments
116    ///
117    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
118    /// * `depth` - Orderbook depth (1, 50, 200, or 500)
119    pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
120        // Bybit supports orderbook depths: 1, 50, 200, 500
121        let actual_depth = match depth {
122            1 => 1,
123            d if d <= 50 => 50,
124            d if d <= 200 => 200,
125            _ => 500,
126        };
127
128        let topic = format!("orderbook.{}.{}", actual_depth, symbol);
129
130        // json! macro with literal values is infallible
131        #[allow(clippy::disallowed_methods)]
132        let msg = serde_json::json!({
133            "op": "subscribe",
134            "args": [topic]
135        });
136
137        self.client.send_json(&msg).await?;
138
139        let sub_key = format!("orderbook:{}", symbol);
140        self.subscriptions.write().await.push(sub_key);
141
142        Ok(())
143    }
144
145    /// Subscribes to a trades stream.
146    ///
147    /// # Arguments
148    ///
149    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
150    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
151        let topic = format!("publicTrade.{}", symbol);
152
153        // json! macro with literal values is infallible
154        #[allow(clippy::disallowed_methods)]
155        let msg = serde_json::json!({
156            "op": "subscribe",
157            "args": [topic]
158        });
159
160        self.client.send_json(&msg).await?;
161
162        let sub_key = format!("trades:{}", symbol);
163        self.subscriptions.write().await.push(sub_key);
164
165        Ok(())
166    }
167
168    /// Subscribes to a kline/candlestick stream.
169    ///
170    /// # Arguments
171    ///
172    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
173    /// * `interval` - Kline interval (e.g., "1", "5", "60", "D")
174    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
175        let topic = format!("kline.{}.{}", interval, symbol);
176
177        // json! macro with literal values is infallible
178        #[allow(clippy::disallowed_methods)]
179        let msg = serde_json::json!({
180            "op": "subscribe",
181            "args": [topic]
182        });
183
184        self.client.send_json(&msg).await?;
185
186        let sub_key = format!("kline:{}:{}", symbol, interval);
187        self.subscriptions.write().await.push(sub_key);
188
189        Ok(())
190    }
191
192    /// Unsubscribes from a stream.
193    ///
194    /// # Arguments
195    ///
196    /// * `stream_name` - Stream identifier to unsubscribe from
197    pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
198        // Parse stream name to determine topic
199        let parts: Vec<&str> = stream_name.split(':').collect();
200        if parts.len() < 2 {
201            return Err(Error::invalid_request(format!(
202                "Invalid stream name: {}",
203                stream_name
204            )));
205        }
206
207        let channel = parts[0];
208        let symbol = parts[1];
209
210        let topic = match channel {
211            "ticker" => format!("tickers.{}", symbol),
212            "orderbook" => format!("orderbook.50.{}", symbol),
213            "trades" => format!("publicTrade.{}", symbol),
214            "kline" => {
215                if parts.len() >= 3 {
216                    format!("kline.{}.{}", parts[2], symbol)
217                } else {
218                    return Err(Error::invalid_request(
219                        "Kline unsubscribe requires interval",
220                    ));
221                }
222            }
223            _ => {
224                return Err(Error::invalid_request(format!(
225                    "Unknown channel: {}",
226                    channel
227                )));
228            }
229        };
230
231        // json! macro with literal values is infallible
232        #[allow(clippy::disallowed_methods)]
233        let msg = serde_json::json!({
234            "op": "unsubscribe",
235            "args": [topic]
236        });
237
238        self.client.send_json(&msg).await?;
239
240        // Remove from subscriptions
241        let mut subs = self.subscriptions.write().await;
242        subs.retain(|s| s != &stream_name);
243
244        Ok(())
245    }
246
247    /// Returns the list of active subscriptions.
248    pub async fn subscriptions(&self) -> Vec<String> {
249        self.subscriptions.read().await.clone()
250    }
251
252    /// Watches ticker updates for a symbol.
253    ///
254    /// Returns a stream of `Ticker` updates for the specified symbol.
255    ///
256    /// # Arguments
257    ///
258    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
259    /// * `market` - Optional market information for symbol resolution
260    ///
261    /// # Returns
262    ///
263    /// A `MessageStream<Ticker>` that yields ticker updates.
264    ///
265    /// # Example
266    ///
267    /// ```no_run
268    /// use ccxt_exchanges::bybit::ws::BybitWs;
269    /// use futures::StreamExt;
270    ///
271    /// # async fn example() -> ccxt_core::error::Result<()> {
272    /// let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
273    /// ws.connect().await?;
274    /// let mut stream = ws.watch_ticker("BTCUSDT", None).await?;
275    /// while let Some(result) = stream.next().await {
276    ///     match result {
277    ///         Ok(ticker) => println!("Ticker: {:?}", ticker.last),
278    ///         Err(e) => eprintln!("Error: {}", e),
279    ///     }
280    /// }
281    /// # Ok(())
282    /// # }
283    /// ```
284    pub async fn watch_ticker(
285        &self,
286        symbol: &str,
287        market: Option<Market>,
288    ) -> Result<MessageStream<Ticker>> {
289        // Ensure connected
290        if !self.is_connected() {
291            self.connect().await?;
292        }
293
294        // Subscribe to ticker channel
295        self.subscribe_ticker(symbol).await?;
296
297        // Create channel for ticker updates
298        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
299        let symbol_owned = symbol.to_string();
300        let client = Arc::clone(&self.client);
301
302        // Spawn task to process messages and filter ticker updates
303        tokio::spawn(async move {
304            while let Some(msg) = client.receive().await {
305                // Check if this is a ticker message for our symbol
306                if is_ticker_message(&msg, &symbol_owned) {
307                    match parse_ws_ticker(&msg, market.as_ref()) {
308                        Ok(ticker) => {
309                            if tx.send(Ok(ticker)).is_err() {
310                                break; // Receiver dropped
311                            }
312                        }
313                        Err(e) => {
314                            if tx.send(Err(e)).is_err() {
315                                break;
316                            }
317                        }
318                    }
319                }
320            }
321        });
322
323        Ok(Box::pin(ReceiverStream::new(rx)))
324    }
325
326    /// Watches order book updates for a symbol.
327    ///
328    /// Returns a stream of `OrderBook` updates for the specified symbol.
329    ///
330    /// # Arguments
331    ///
332    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
333    /// * `limit` - Optional depth limit (1, 50, 200, or 500)
334    ///
335    /// # Returns
336    ///
337    /// A `MessageStream<OrderBook>` that yields order book updates.
338    ///
339    /// # Example
340    ///
341    /// ```no_run
342    /// use ccxt_exchanges::bybit::ws::BybitWs;
343    /// use futures::StreamExt;
344    ///
345    /// # async fn example() -> ccxt_core::error::Result<()> {
346    /// let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
347    /// ws.connect().await?;
348    /// let mut stream = ws.watch_order_book("BTCUSDT", Some(50)).await?;
349    /// while let Some(result) = stream.next().await {
350    ///     match result {
351    ///         Ok(orderbook) => println!("Best bid: {:?}", orderbook.bids.first()),
352    ///         Err(e) => eprintln!("Error: {}", e),
353    ///     }
354    /// }
355    /// # Ok(())
356    /// # }
357    /// ```
358    pub async fn watch_order_book(
359        &self,
360        symbol: &str,
361        limit: Option<u32>,
362    ) -> Result<MessageStream<OrderBook>> {
363        // Ensure connected
364        if !self.is_connected() {
365            self.connect().await?;
366        }
367
368        // Subscribe to orderbook channel
369        let depth = limit.unwrap_or(50);
370        self.subscribe_orderbook(symbol, depth).await?;
371
372        // Create channel for orderbook updates
373        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
374        let symbol_owned = symbol.to_string();
375        let unified_symbol = format_unified_symbol(&symbol_owned);
376        let client = Arc::clone(&self.client);
377
378        // Spawn task to process messages and filter orderbook updates
379        tokio::spawn(async move {
380            while let Some(msg) = client.receive().await {
381                // Check if this is an orderbook message for our symbol
382                if is_orderbook_message(&msg, &symbol_owned) {
383                    match parse_ws_orderbook(&msg, unified_symbol.clone()) {
384                        Ok(orderbook) => {
385                            if tx.send(Ok(orderbook)).is_err() {
386                                break; // Receiver dropped
387                            }
388                        }
389                        Err(e) => {
390                            if tx.send(Err(e)).is_err() {
391                                break;
392                            }
393                        }
394                    }
395                }
396            }
397        });
398
399        Ok(Box::pin(ReceiverStream::new(rx)))
400    }
401
402    /// Watches trade updates for a symbol.
403    ///
404    /// Returns a stream of `Trade` updates for the specified symbol.
405    ///
406    /// # Arguments
407    ///
408    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
409    /// * `market` - Optional market information for symbol resolution
410    ///
411    /// # Returns
412    ///
413    /// A `MessageStream<Vec<Trade>>` that yields trade updates.
414    ///
415    /// # Example
416    ///
417    /// ```no_run
418    /// use ccxt_exchanges::bybit::ws::BybitWs;
419    /// use futures::StreamExt;
420    ///
421    /// # async fn example() -> ccxt_core::error::Result<()> {
422    /// let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
423    /// ws.connect().await?;
424    /// let mut stream = ws.watch_trades("BTCUSDT", None).await?;
425    /// while let Some(result) = stream.next().await {
426    ///     match result {
427    ///         Ok(trades) => {
428    ///             for trade in trades {
429    ///                 println!("Trade: {:?} @ {:?}", trade.amount, trade.price);
430    ///             }
431    ///         }
432    ///         Err(e) => eprintln!("Error: {}", e),
433    ///     }
434    /// }
435    /// # Ok(())
436    /// # }
437    /// ```
438    pub async fn watch_trades(
439        &self,
440        symbol: &str,
441        market: Option<Market>,
442    ) -> Result<MessageStream<Vec<Trade>>> {
443        // Ensure connected
444        if !self.is_connected() {
445            self.connect().await?;
446        }
447
448        // Subscribe to trades channel
449        self.subscribe_trades(symbol).await?;
450
451        // Create channel for trade updates
452        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
453        let symbol_owned = symbol.to_string();
454        let client = Arc::clone(&self.client);
455
456        // Spawn task to process messages and filter trade updates
457        tokio::spawn(async move {
458            while let Some(msg) = client.receive().await {
459                // Check if this is a trade message for our symbol
460                if is_trade_message(&msg, &symbol_owned) {
461                    match parse_ws_trades(&msg, market.as_ref()) {
462                        Ok(trades) => {
463                            if tx.send(Ok(trades)).is_err() {
464                                break; // Receiver dropped
465                            }
466                        }
467                        Err(e) => {
468                            if tx.send(Err(e)).is_err() {
469                                break;
470                            }
471                        }
472                    }
473                }
474            }
475        });
476
477        Ok(Box::pin(ReceiverStream::new(rx)))
478    }
479}
480
481// ============================================================================
482// Stream Wrapper
483// ============================================================================
484
485/// A stream wrapper that converts an mpsc receiver into a futures Stream.
486struct ReceiverStream<T> {
487    receiver: mpsc::UnboundedReceiver<T>,
488}
489
490impl<T> ReceiverStream<T> {
491    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
492        Self { receiver }
493    }
494}
495
496impl<T> Stream for ReceiverStream<T> {
497    type Item = T;
498
499    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
500        self.receiver.poll_recv(cx)
501    }
502}
503
504// ============================================================================
505// Message Type Detection Helpers
506// ============================================================================
507
508/// Check if a WebSocket message is a ticker message for the given symbol.
509fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
510    // Bybit V5 WebSocket ticker format:
511    // {"topic":"tickers.BTCUSDT","type":"snapshot","data":{...},"ts":...}
512    if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
513        let expected_topic = format!("tickers.{}", symbol);
514        topic == expected_topic
515    } else {
516        false
517    }
518}
519
520/// Check if a WebSocket message is an orderbook message for the given symbol.
521fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
522    // Bybit V5 WebSocket orderbook format:
523    // {"topic":"orderbook.50.BTCUSDT","type":"snapshot","data":{...},"ts":...}
524    if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
525        // Check if topic starts with "orderbook." and ends with the symbol
526        topic.starts_with("orderbook.") && topic.ends_with(symbol)
527    } else {
528        false
529    }
530}
531
532/// Check if a WebSocket message is a trade message for the given symbol.
533fn is_trade_message(msg: &Value, symbol: &str) -> bool {
534    // Bybit V5 WebSocket trade format:
535    // {"topic":"publicTrade.BTCUSDT","type":"snapshot","data":[{...}],"ts":...}
536    if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
537        let expected_topic = format!("publicTrade.{}", symbol);
538        topic == expected_topic
539    } else {
540        false
541    }
542}
543
544/// Format a Bybit symbol (e.g., "BTCUSDT") to unified format (e.g., "BTC/USDT").
545fn format_unified_symbol(symbol: &str) -> String {
546    // Common quote currencies to detect
547    let quote_currencies = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];
548
549    for quote in &quote_currencies {
550        if let Some(base) = symbol.strip_suffix(quote) {
551            if !base.is_empty() {
552                return format!("{}/{}", base, quote);
553            }
554        }
555    }
556
557    // If no known quote currency found, return as-is
558    symbol.to_string()
559}
560
561// ============================================================================
562// WebSocket Message Parsers
563// ============================================================================
564
565/// Parse a WebSocket ticker message.
566pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
567    // Bybit V5 WebSocket ticker format:
568    // {"topic":"tickers.BTCUSDT","type":"snapshot","data":{...},"ts":...}
569    let data = msg
570        .get("data")
571        .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
572
573    parse_ticker(data, market)
574}
575
576/// Parse a WebSocket orderbook message.
577pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
578    // Bybit V5 WebSocket orderbook format:
579    // {"topic":"orderbook.50.BTCUSDT","type":"snapshot","data":{"s":"BTCUSDT","b":[...],"a":[...],"u":...,"seq":...},"ts":...}
580    let data = msg
581        .get("data")
582        .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
583
584    parse_orderbook(data, symbol)
585}
586
587/// Parse a WebSocket trade message (single trade).
588pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
589    // Bybit V5 WebSocket trade format:
590    // {"topic":"publicTrade.BTCUSDT","type":"snapshot","data":[{...}],"ts":...}
591    let data = msg
592        .get("data")
593        .and_then(|d| d.as_array())
594        .and_then(|arr| arr.first())
595        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
596
597    parse_trade(data, market)
598}
599
600/// Parse a WebSocket trade message (multiple trades).
601pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
602    // Bybit V5 WebSocket trade format:
603    // {"topic":"publicTrade.BTCUSDT","type":"snapshot","data":[{...}, {...}],"ts":...}
604    let data_array = msg
605        .get("data")
606        .and_then(|d| d.as_array())
607        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
608
609    let mut trades = Vec::with_capacity(data_array.len());
610    for data in data_array {
611        trades.push(parse_trade(data, market)?);
612    }
613
614    Ok(trades)
615}
616
617#[cfg(test)]
618mod tests {
619    use super::*;
620    use ccxt_core::types::financial::Price;
621    use rust_decimal_macros::dec;
622
623    #[test]
624    fn test_bybit_ws_creation() {
625        let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
626        // WsClient is created successfully - config is private so we just verify creation works
627        assert!(ws.subscriptions.try_read().is_ok());
628    }
629
630    #[tokio::test]
631    async fn test_subscriptions_empty_by_default() {
632        let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
633        let subs = ws.subscriptions().await;
634        assert!(subs.is_empty());
635    }
636
637    // ==================== Ticker Message Parsing Tests ====================
638
639    #[test]
640    fn test_parse_ws_ticker_snapshot() {
641        let msg = serde_json::from_str(
642            r#"{
643                "topic": "tickers.BTCUSDT",
644                "type": "snapshot",
645                "data": {
646                    "symbol": "BTCUSDT",
647                    "lastPrice": "50000.00",
648                    "highPrice24h": "51000.00",
649                    "lowPrice24h": "49000.00",
650                    "bid1Price": "49999.00",
651                    "ask1Price": "50001.00",
652                    "volume24h": "1000.5",
653                    "time": "1700000000000"
654                },
655                "ts": 1700000000000
656            }"#,
657        )
658        .unwrap();
659
660        let ticker = parse_ws_ticker(&msg, None).unwrap();
661        assert_eq!(ticker.symbol, "BTCUSDT");
662        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
663        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
664        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
665        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
666        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
667        assert_eq!(ticker.timestamp, 1700000000000);
668    }
669
670    #[test]
671    fn test_parse_ws_ticker_with_market() {
672        let msg = serde_json::from_str(
673            r#"{
674                "topic": "tickers.BTCUSDT",
675                "type": "snapshot",
676                "data": {
677                    "symbol": "BTCUSDT",
678                    "lastPrice": "50000.00",
679                    "time": "1700000000000"
680                },
681                "ts": 1700000000000
682            }"#,
683        )
684        .unwrap();
685
686        let market = Market {
687            id: "BTCUSDT".to_string(),
688            symbol: "BTC/USDT".to_string(),
689            base: "BTC".to_string(),
690            quote: "USDT".to_string(),
691            ..Default::default()
692        };
693
694        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
695        assert_eq!(ticker.symbol, "BTC/USDT");
696        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
697    }
698
699    #[test]
700    fn test_parse_ws_ticker_missing_data() {
701        let msg = serde_json::from_str(
702            r#"{
703                "topic": "tickers.BTCUSDT",
704                "type": "snapshot",
705                "ts": 1700000000000
706            }"#,
707        )
708        .unwrap();
709
710        let result = parse_ws_ticker(&msg, None);
711        assert!(result.is_err());
712    }
713
714    // ==================== OrderBook Message Parsing Tests ====================
715
716    #[test]
717    fn test_parse_ws_orderbook_snapshot() {
718        let msg = serde_json::from_str(
719            r#"{
720                "topic": "orderbook.50.BTCUSDT",
721                "type": "snapshot",
722                "data": {
723                    "s": "BTCUSDT",
724                    "b": [
725                        ["50000.00", "1.5"],
726                        ["49999.00", "2.0"],
727                        ["49998.00", "0.5"]
728                    ],
729                    "a": [
730                        ["50001.00", "1.0"],
731                        ["50002.00", "3.0"],
732                        ["50003.00", "2.5"]
733                    ],
734                    "u": 12345,
735                    "seq": 67890,
736                    "ts": "1700000000000"
737                },
738                "ts": 1700000000000
739            }"#,
740        )
741        .unwrap();
742
743        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
744        assert_eq!(orderbook.symbol, "BTC/USDT");
745        assert_eq!(orderbook.bids.len(), 3);
746        assert_eq!(orderbook.asks.len(), 3);
747
748        // Verify bids are sorted in descending order
749        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
750        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
751        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
752
753        // Verify asks are sorted in ascending order
754        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
755        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
756        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
757    }
758
759    #[test]
760    fn test_parse_ws_orderbook_missing_data() {
761        let msg = serde_json::from_str(
762            r#"{
763                "topic": "orderbook.50.BTCUSDT",
764                "type": "snapshot",
765                "ts": 1700000000000
766            }"#,
767        )
768        .unwrap();
769
770        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
771        assert!(result.is_err());
772    }
773
774    #[test]
775    fn test_parse_ws_orderbook_empty_sides() {
776        let msg = serde_json::from_str(
777            r#"{
778                "topic": "orderbook.50.BTCUSDT",
779                "type": "snapshot",
780                "data": {
781                    "s": "BTCUSDT",
782                    "b": [],
783                    "a": [],
784                    "u": 12345,
785                    "ts": "1700000000000"
786                },
787                "ts": 1700000000000
788            }"#,
789        )
790        .unwrap();
791
792        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
793        assert!(orderbook.bids.is_empty());
794        assert!(orderbook.asks.is_empty());
795    }
796
797    // ==================== Trade Message Parsing Tests ====================
798
799    #[test]
800    fn test_parse_ws_trade_single() {
801        let msg = serde_json::from_str(
802            r#"{
803                "topic": "publicTrade.BTCUSDT",
804                "type": "snapshot",
805                "data": [{
806                    "i": "123456789",
807                    "T": 1700000000000,
808                    "p": "50000.00",
809                    "v": "0.5",
810                    "S": "Buy",
811                    "s": "BTCUSDT"
812                }],
813                "ts": 1700000000000
814            }"#,
815        )
816        .unwrap();
817
818        let trade = parse_ws_trade(&msg, None).unwrap();
819        // Note: The trade parser uses different field names
820        assert_eq!(trade.timestamp, 1700000000000);
821    }
822
823    #[test]
824    fn test_parse_ws_trades_multiple() {
825        let msg = serde_json::from_str(
826            r#"{
827                "topic": "publicTrade.BTCUSDT",
828                "type": "snapshot",
829                "data": [
830                    {
831                        "i": "123456789",
832                        "T": 1700000000000,
833                        "p": "50000.00",
834                        "v": "0.5",
835                        "S": "Buy",
836                        "s": "BTCUSDT",
837                        "side": "Buy",
838                        "price": "50000.00",
839                        "size": "0.5",
840                        "time": "1700000000000"
841                    },
842                    {
843                        "i": "123456790",
844                        "T": 1700000000001,
845                        "p": "50001.00",
846                        "v": "1.0",
847                        "S": "Sell",
848                        "s": "BTCUSDT",
849                        "side": "Sell",
850                        "price": "50001.00",
851                        "size": "1.0",
852                        "time": "1700000000001"
853                    }
854                ],
855                "ts": 1700000000001
856            }"#,
857        )
858        .unwrap();
859
860        let trades = parse_ws_trades(&msg, None).unwrap();
861        assert_eq!(trades.len(), 2);
862    }
863
864    #[test]
865    fn test_parse_ws_trade_missing_data() {
866        let msg = serde_json::from_str(
867            r#"{
868                "topic": "publicTrade.BTCUSDT",
869                "type": "snapshot",
870                "ts": 1700000000000
871            }"#,
872        )
873        .unwrap();
874
875        let result = parse_ws_trade(&msg, None);
876        assert!(result.is_err());
877    }
878
879    #[test]
880    fn test_parse_ws_trades_empty_array() {
881        let msg = serde_json::from_str(
882            r#"{
883                "topic": "publicTrade.BTCUSDT",
884                "type": "snapshot",
885                "data": [],
886                "ts": 1700000000000
887            }"#,
888        )
889        .unwrap();
890
891        let trades = parse_ws_trades(&msg, None).unwrap();
892        assert!(trades.is_empty());
893    }
894
895    // ==================== Message Type Detection Tests ====================
896
897    #[test]
898    fn test_is_ticker_message_true() {
899        let msg = serde_json::from_str(
900            r#"{
901                "topic": "tickers.BTCUSDT",
902                "type": "snapshot",
903                "data": {},
904                "ts": 1700000000000
905            }"#,
906        )
907        .unwrap();
908
909        assert!(is_ticker_message(&msg, "BTCUSDT"));
910    }
911
912    #[test]
913    fn test_is_ticker_message_wrong_symbol() {
914        let msg = serde_json::from_str(
915            r#"{
916                "topic": "tickers.ETHUSDT",
917                "type": "snapshot",
918                "data": {},
919                "ts": 1700000000000
920            }"#,
921        )
922        .unwrap();
923
924        assert!(!is_ticker_message(&msg, "BTCUSDT"));
925    }
926
927    #[test]
928    fn test_is_ticker_message_wrong_topic() {
929        let msg = serde_json::from_str(
930            r#"{
931                "topic": "publicTrade.BTCUSDT",
932                "type": "snapshot",
933                "data": [],
934                "ts": 1700000000000
935            }"#,
936        )
937        .unwrap();
938
939        assert!(!is_ticker_message(&msg, "BTCUSDT"));
940    }
941
942    #[test]
943    fn test_is_orderbook_message_depth_50() {
944        let msg = serde_json::from_str(
945            r#"{
946                "topic": "orderbook.50.BTCUSDT",
947                "type": "snapshot",
948                "data": {},
949                "ts": 1700000000000
950            }"#,
951        )
952        .unwrap();
953
954        assert!(is_orderbook_message(&msg, "BTCUSDT"));
955    }
956
957    #[test]
958    fn test_is_orderbook_message_depth_200() {
959        let msg = serde_json::from_str(
960            r#"{
961                "topic": "orderbook.200.BTCUSDT",
962                "type": "snapshot",
963                "data": {},
964                "ts": 1700000000000
965            }"#,
966        )
967        .unwrap();
968
969        assert!(is_orderbook_message(&msg, "BTCUSDT"));
970    }
971
972    #[test]
973    fn test_is_trade_message_true() {
974        let msg = serde_json::from_str(
975            r#"{
976                "topic": "publicTrade.BTCUSDT",
977                "type": "snapshot",
978                "data": [],
979                "ts": 1700000000000
980            }"#,
981        )
982        .unwrap();
983
984        assert!(is_trade_message(&msg, "BTCUSDT"));
985    }
986
987    #[test]
988    fn test_is_trade_message_wrong_topic() {
989        let msg = serde_json::from_str(
990            r#"{
991                "topic": "tickers.BTCUSDT",
992                "type": "snapshot",
993                "data": {},
994                "ts": 1700000000000
995            }"#,
996        )
997        .unwrap();
998
999        assert!(!is_trade_message(&msg, "BTCUSDT"));
1000    }
1001
1002    // ==================== Symbol Formatting Tests ====================
1003
1004    #[test]
1005    fn test_format_unified_symbol_usdt() {
1006        assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1007        assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1008    }
1009
1010    #[test]
1011    fn test_format_unified_symbol_usdc() {
1012        assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1013    }
1014
1015    #[test]
1016    fn test_format_unified_symbol_btc() {
1017        assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1018    }
1019
1020    #[test]
1021    fn test_format_unified_symbol_unknown() {
1022        // Unknown quote currency returns as-is
1023        assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1024    }
1025}