Skip to main content

ccxt_exchanges/bybit/
ws.rs

1//! Bybit WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for Bybit exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (balance, orders) with automatic reconnection.
6
7use crate::bybit::parser::{parse_orderbook, parse_ticker, parse_trade};
8use ccxt_core::error::{Error, Result};
9use ccxt_core::types::{Market, OrderBook, Ticker, Trade};
10use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
11use ccxt_core::ws_exchange::MessageStream;
12use futures::Stream;
13use serde_json::Value;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use tokio::sync::{RwLock, mpsc};
18
/// Interval between keep-alive pings, in milliseconds (20 seconds).
///
/// Bybit requires a ping every 20 seconds to keep the connection alive.
const DEFAULT_PING_INTERVAL_MS: u64 = 20_000;

/// Delay before a reconnect attempt, in milliseconds (5 seconds).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Upper bound on the number of reconnect attempts.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
28
29/// Bybit WebSocket client.
30///
31/// Provides real-time data streaming for Bybit exchange.
32pub struct BybitWs {
33    /// WebSocket client instance.
34    client: Arc<WsClient>,
35    /// Active subscriptions.
36    subscriptions: Arc<RwLock<Vec<String>>>,
37}
38
39impl BybitWs {
40    /// Creates a new Bybit WebSocket client.
41    ///
42    /// # Arguments
43    ///
44    /// * `url` - WebSocket server URL
45    pub fn new(url: String) -> Self {
46        let config = WsConfig {
47            url: url.clone(),
48            connect_timeout: 10000,
49            ping_interval: DEFAULT_PING_INTERVAL_MS,
50            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
51            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
52            auto_reconnect: true,
53            enable_compression: false,
54            pong_timeout: 90000,
55            ..Default::default()
56        };
57
58        Self {
59            client: Arc::new(WsClient::new(config)),
60            subscriptions: Arc::new(RwLock::new(Vec::new())),
61        }
62    }
63
64    /// Connects to the WebSocket server.
65    pub async fn connect(&self) -> Result<()> {
66        self.client.connect().await
67    }
68
69    /// Disconnects from the WebSocket server.
70    pub async fn disconnect(&self) -> Result<()> {
71        self.client.disconnect().await
72    }
73
74    /// Returns the current connection state.
75    pub fn state(&self) -> WsConnectionState {
76        self.client.state()
77    }
78
79    /// Checks if the WebSocket is connected.
80    pub fn is_connected(&self) -> bool {
81        self.client.is_connected()
82    }
83
84    /// Receives the next message from the WebSocket.
85    pub async fn receive(&self) -> Option<Value> {
86        self.client.receive().await
87    }
88
89    /// Subscribes to a ticker stream.
90    ///
91    /// # Arguments
92    ///
93    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
94    pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
95        // Bybit V5 WebSocket subscription format
96        let topic = format!("tickers.{}", symbol);
97
98        // json! macro with literal values is infallible
99        #[allow(clippy::disallowed_methods)]
100        let msg = serde_json::json!({
101            "op": "subscribe",
102            "args": [topic]
103        });
104
105        self.client.send_json(&msg).await?;
106
107        let sub_key = format!("ticker:{}", symbol);
108        self.subscriptions.write().await.push(sub_key);
109
110        Ok(())
111    }
112
113    /// Subscribes to multiple ticker streams.
114    ///
115    /// # Arguments
116    ///
117    /// * `symbols` - List of trading pair symbols (e.g., `["BTCUSDT", "ETHUSDT"]`)
118    pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
119        let topics: Vec<String> = symbols.iter().map(|s| format!("tickers.{}", s)).collect();
120
121        // json! macro with literal values is infallible
122        #[allow(clippy::disallowed_methods)]
123        let msg = serde_json::json!({
124            "op": "subscribe",
125            "args": topics
126        });
127
128        self.client.send_json(&msg).await?;
129
130        let mut subs = self.subscriptions.write().await;
131        for symbol in symbols {
132            subs.push(format!("ticker:{}", symbol));
133        }
134
135        Ok(())
136    }
137
138    /// Watches ticker updates for multiple symbols.
139    ///
140    /// Returns a stream of `Vec<Ticker>` updates for the specified symbols.
141    ///
142    /// # Arguments
143    ///
144    /// * `symbols` - List of trading pair symbols (e.g., `["BTCUSDT", "ETHUSDT"]`)
145    ///
146    /// # Returns
147    ///
148    /// A `MessageStream<Vec<Ticker>>` that yields ticker updates.
149    pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
150        // Ensure connected
151        if !self.is_connected() {
152            self.connect().await?;
153        }
154
155        // Subscribe to ticker channels
156        self.subscribe_tickers(symbols).await?;
157
158        // Create channel for ticker updates
159        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
160        let symbols_owned: Vec<String> = symbols.to_vec();
161        let client = Arc::clone(&self.client);
162
163        // Spawn task to process messages and filter ticker updates
164        tokio::spawn(async move {
165            while let Some(msg) = client.receive().await {
166                // Check if this is a ticker message for ANY of our symbols
167                if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
168                    if let Some(symbol_part) = topic.strip_prefix("tickers.") {
169                        if symbols_owned.iter().any(|s| s == symbol_part) {
170                            match parse_ws_ticker(&msg, None) {
171                                Ok(ticker) => {
172                                    if tx.send(Ok(vec![ticker])).is_err() {
173                                        break; // Receiver dropped
174                                    }
175                                }
176                                Err(e) => {
177                                    if tx.send(Err(e)).is_err() {
178                                        break;
179                                    }
180                                }
181                            }
182                        }
183                    }
184                }
185            }
186        });
187
188        Ok(Box::pin(ReceiverStream::new(rx)))
189    }
190    /// * `depth` - Orderbook depth (1, 50, 200, or 500)
191    pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
192        // Bybit supports orderbook depths: 1, 50, 200, 500
193        let actual_depth = match depth {
194            1 => 1,
195            d if d <= 50 => 50,
196            d if d <= 200 => 200,
197            _ => 500,
198        };
199
200        let topic = format!("orderbook.{}.{}", actual_depth, symbol);
201
202        // json! macro with literal values is infallible
203        #[allow(clippy::disallowed_methods)]
204        let msg = serde_json::json!({
205            "op": "subscribe",
206            "args": [topic]
207        });
208
209        self.client.send_json(&msg).await?;
210
211        let sub_key = format!("orderbook:{}", symbol);
212        self.subscriptions.write().await.push(sub_key);
213
214        Ok(())
215    }
216
217    /// Subscribes to a trades stream.
218    ///
219    /// # Arguments
220    ///
221    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
222    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
223        let topic = format!("publicTrade.{}", symbol);
224
225        // json! macro with literal values is infallible
226        #[allow(clippy::disallowed_methods)]
227        let msg = serde_json::json!({
228            "op": "subscribe",
229            "args": [topic]
230        });
231
232        self.client.send_json(&msg).await?;
233
234        let sub_key = format!("trades:{}", symbol);
235        self.subscriptions.write().await.push(sub_key);
236
237        Ok(())
238    }
239
240    /// Subscribes to a kline/candlestick stream.
241    ///
242    /// # Arguments
243    ///
244    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
245    /// * `interval` - Kline interval (e.g., "1", "5", "60", "D")
246    pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
247        let topic = format!("kline.{}.{}", interval, symbol);
248
249        // json! macro with literal values is infallible
250        #[allow(clippy::disallowed_methods)]
251        let msg = serde_json::json!({
252            "op": "subscribe",
253            "args": [topic]
254        });
255
256        self.client.send_json(&msg).await?;
257
258        let sub_key = format!("kline:{}:{}", symbol, interval);
259        self.subscriptions.write().await.push(sub_key);
260
261        Ok(())
262    }
263
264    /// Unsubscribes from a stream.
265    ///
266    /// # Arguments
267    ///
268    /// * `stream_name` - Stream identifier to unsubscribe from
269    pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
270        // Parse stream name to determine topic
271        let parts: Vec<&str> = stream_name.split(':').collect();
272        if parts.len() < 2 {
273            return Err(Error::invalid_request(format!(
274                "Invalid stream name: {}",
275                stream_name
276            )));
277        }
278
279        let channel = parts[0];
280        let symbol = parts[1];
281
282        let topic = match channel {
283            "ticker" => format!("tickers.{}", symbol),
284            "orderbook" => format!("orderbook.50.{}", symbol),
285            "trades" => format!("publicTrade.{}", symbol),
286            "kline" => {
287                if parts.len() >= 3 {
288                    format!("kline.{}.{}", parts[2], symbol)
289                } else {
290                    return Err(Error::invalid_request(
291                        "Kline unsubscribe requires interval",
292                    ));
293                }
294            }
295            _ => {
296                return Err(Error::invalid_request(format!(
297                    "Unknown channel: {}",
298                    channel
299                )));
300            }
301        };
302
303        // json! macro with literal values is infallible
304        #[allow(clippy::disallowed_methods)]
305        let msg = serde_json::json!({
306            "op": "unsubscribe",
307            "args": [topic]
308        });
309
310        self.client.send_json(&msg).await?;
311
312        // Remove from subscriptions
313        let mut subs = self.subscriptions.write().await;
314        subs.retain(|s| s != &stream_name);
315
316        Ok(())
317    }
318
319    /// Returns the list of active subscriptions.
320    pub async fn subscriptions(&self) -> Vec<String> {
321        self.subscriptions.read().await.clone()
322    }
323
324    /// Watches ticker updates for a symbol.
325    ///
326    /// Returns a stream of `Ticker` updates for the specified symbol.
327    ///
328    /// # Arguments
329    ///
330    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
331    /// * `market` - Optional market information for symbol resolution
332    ///
333    /// # Returns
334    ///
335    /// A `MessageStream<Ticker>` that yields ticker updates.
336    ///
337    /// # Example
338    ///
339    /// ```no_run
340    /// use ccxt_exchanges::bybit::ws::BybitWs;
341    /// use futures::StreamExt;
342    ///
343    /// # async fn example() -> ccxt_core::error::Result<()> {
344    /// let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
345    /// ws.connect().await?;
346    /// let mut stream = ws.watch_ticker("BTCUSDT", None).await?;
347    /// while let Some(result) = stream.next().await {
348    ///     match result {
349    ///         Ok(ticker) => println!("Ticker: {:?}", ticker.last),
350    ///         Err(e) => eprintln!("Error: {}", e),
351    ///     }
352    /// }
353    /// # Ok(())
354    /// # }
355    /// ```
356    pub async fn watch_ticker(
357        &self,
358        symbol: &str,
359        market: Option<Market>,
360    ) -> Result<MessageStream<Ticker>> {
361        // Ensure connected
362        if !self.is_connected() {
363            self.connect().await?;
364        }
365
366        // Subscribe to ticker channel
367        self.subscribe_ticker(symbol).await?;
368
369        // Create channel for ticker updates
370        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
371        let symbol_owned = symbol.to_string();
372        let client = Arc::clone(&self.client);
373
374        // Spawn task to process messages and filter ticker updates
375        tokio::spawn(async move {
376            while let Some(msg) = client.receive().await {
377                // Check if this is a ticker message for our symbol
378                if is_ticker_message(&msg, &symbol_owned) {
379                    match parse_ws_ticker(&msg, market.as_ref()) {
380                        Ok(ticker) => {
381                            if tx.send(Ok(ticker)).is_err() {
382                                break; // Receiver dropped
383                            }
384                        }
385                        Err(e) => {
386                            if tx.send(Err(e)).is_err() {
387                                break;
388                            }
389                        }
390                    }
391                }
392            }
393        });
394
395        Ok(Box::pin(ReceiverStream::new(rx)))
396    }
397
398    /// Watches order book updates for a symbol.
399    ///
400    /// Returns a stream of `OrderBook` updates for the specified symbol.
401    ///
402    /// # Arguments
403    ///
404    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
405    /// * `limit` - Optional depth limit (1, 50, 200, or 500)
406    ///
407    /// # Returns
408    ///
409    /// A `MessageStream<OrderBook>` that yields order book updates.
410    ///
411    /// # Example
412    ///
413    /// ```no_run
414    /// use ccxt_exchanges::bybit::ws::BybitWs;
415    /// use futures::StreamExt;
416    ///
417    /// # async fn example() -> ccxt_core::error::Result<()> {
418    /// let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
419    /// ws.connect().await?;
420    /// let mut stream = ws.watch_order_book("BTCUSDT", Some(50)).await?;
421    /// while let Some(result) = stream.next().await {
422    ///     match result {
423    ///         Ok(orderbook) => println!("Best bid: {:?}", orderbook.bids.first()),
424    ///         Err(e) => eprintln!("Error: {}", e),
425    ///     }
426    /// }
427    /// # Ok(())
428    /// # }
429    /// ```
430    pub async fn watch_order_book(
431        &self,
432        symbol: &str,
433        limit: Option<u32>,
434    ) -> Result<MessageStream<OrderBook>> {
435        // Ensure connected
436        if !self.is_connected() {
437            self.connect().await?;
438        }
439
440        // Subscribe to orderbook channel
441        let depth = limit.unwrap_or(50);
442        self.subscribe_orderbook(symbol, depth).await?;
443
444        // Create channel for orderbook updates
445        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
446        let symbol_owned = symbol.to_string();
447        let unified_symbol = format_unified_symbol(&symbol_owned);
448        let client = Arc::clone(&self.client);
449
450        // Spawn task to process messages and filter orderbook updates
451        tokio::spawn(async move {
452            while let Some(msg) = client.receive().await {
453                // Check if this is an orderbook message for our symbol
454                if is_orderbook_message(&msg, &symbol_owned) {
455                    match parse_ws_orderbook(&msg, unified_symbol.clone()) {
456                        Ok(orderbook) => {
457                            if tx.send(Ok(orderbook)).is_err() {
458                                break; // Receiver dropped
459                            }
460                        }
461                        Err(e) => {
462                            if tx.send(Err(e)).is_err() {
463                                break;
464                            }
465                        }
466                    }
467                }
468            }
469        });
470
471        Ok(Box::pin(ReceiverStream::new(rx)))
472    }
473
474    /// Watches trade updates for a symbol.
475    ///
476    /// Returns a stream of `Trade` updates for the specified symbol.
477    ///
478    /// # Arguments
479    ///
480    /// * `symbol` - Trading pair symbol (e.g., "BTCUSDT")
481    /// * `market` - Optional market information for symbol resolution
482    ///
483    /// # Returns
484    ///
485    /// A `MessageStream<Vec<Trade>>` that yields trade updates.
486    ///
487    /// # Example
488    ///
489    /// ```no_run
490    /// use ccxt_exchanges::bybit::ws::BybitWs;
491    /// use futures::StreamExt;
492    ///
493    /// # async fn example() -> ccxt_core::error::Result<()> {
494    /// let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
495    /// ws.connect().await?;
496    /// let mut stream = ws.watch_trades("BTCUSDT", None).await?;
497    /// while let Some(result) = stream.next().await {
498    ///     match result {
499    ///         Ok(trades) => {
500    ///             for trade in trades {
501    ///                 println!("Trade: {:?} @ {:?}", trade.amount, trade.price);
502    ///             }
503    ///         }
504    ///         Err(e) => eprintln!("Error: {}", e),
505    ///     }
506    /// }
507    /// # Ok(())
508    /// # }
509    /// ```
510    pub async fn watch_trades(
511        &self,
512        symbol: &str,
513        market: Option<Market>,
514    ) -> Result<MessageStream<Vec<Trade>>> {
515        // Ensure connected
516        if !self.is_connected() {
517            self.connect().await?;
518        }
519
520        // Subscribe to trades channel
521        self.subscribe_trades(symbol).await?;
522
523        // Create channel for trade updates
524        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
525        let symbol_owned = symbol.to_string();
526        let client = Arc::clone(&self.client);
527
528        // Spawn task to process messages and filter trade updates
529        tokio::spawn(async move {
530            while let Some(msg) = client.receive().await {
531                // Check if this is a trade message for our symbol
532                if is_trade_message(&msg, &symbol_owned) {
533                    match parse_ws_trades(&msg, market.as_ref()) {
534                        Ok(trades) => {
535                            if tx.send(Ok(trades)).is_err() {
536                                break; // Receiver dropped
537                            }
538                        }
539                        Err(e) => {
540                            if tx.send(Err(e)).is_err() {
541                                break;
542                            }
543                        }
544                    }
545                }
546            }
547        });
548
549        Ok(Box::pin(ReceiverStream::new(rx)))
550    }
551}
552
553// ============================================================================
554// Stream Wrapper
555// ============================================================================
556
557/// A stream wrapper that converts an mpsc receiver into a futures Stream.
558struct ReceiverStream<T> {
559    receiver: mpsc::UnboundedReceiver<T>,
560}
561
562impl<T> ReceiverStream<T> {
563    fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
564        Self { receiver }
565    }
566}
567
568impl<T> Stream for ReceiverStream<T> {
569    type Item = T;
570
571    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
572        self.receiver.poll_recv(cx)
573    }
574}
575
576// ============================================================================
577// Message Type Detection Helpers
578// ============================================================================
579
580/// Check if a WebSocket message is a ticker message for the given symbol.
581fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
582    // Bybit V5 WebSocket ticker format:
583    // {"topic":"tickers.BTCUSDT","type":"snapshot","data":{...},"ts":...}
584    if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
585        let expected_topic = format!("tickers.{}", symbol);
586        topic == expected_topic
587    } else {
588        false
589    }
590}
591
592/// Check if a WebSocket message is an orderbook message for the given symbol.
593fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
594    // Bybit V5 WebSocket orderbook format:
595    // {"topic":"orderbook.50.BTCUSDT","type":"snapshot","data":{...},"ts":...}
596    if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
597        // Check if topic starts with "orderbook." and ends with the symbol
598        topic.starts_with("orderbook.") && topic.ends_with(symbol)
599    } else {
600        false
601    }
602}
603
604/// Check if a WebSocket message is a trade message for the given symbol.
605fn is_trade_message(msg: &Value, symbol: &str) -> bool {
606    // Bybit V5 WebSocket trade format:
607    // {"topic":"publicTrade.BTCUSDT","type":"snapshot","data":[{...}],"ts":...}
608    if let Some(topic) = msg.get("topic").and_then(|t| t.as_str()) {
609        let expected_topic = format!("publicTrade.{}", symbol);
610        topic == expected_topic
611    } else {
612        false
613    }
614}
615
/// Converts a Bybit symbol (e.g., "BTCUSDT") to unified format (e.g., "BTC/USDT").
///
/// The known quote currencies are tried in a fixed order (`USDT`, `USDC`,
/// `BTC`, `ETH`, `EUR`, `USD`). The first one that is a suffix of `symbol`
/// with a non-empty base in front of it wins. When none applies, the symbol
/// is returned unchanged.
fn format_unified_symbol(symbol: &str) -> String {
    // Order matters: longer quotes such as "USDT" must be tried before "USD".
    const KNOWN_QUOTES: [&str; 6] = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];

    KNOWN_QUOTES
        .iter()
        .copied()
        .find_map(|quote| {
            symbol
                .strip_suffix(quote)
                .filter(|base| !base.is_empty())
                .map(|base| format!("{}/{}", base, quote))
        })
        .unwrap_or_else(|| symbol.to_string())
}
632
633// ============================================================================
634// WebSocket Message Parsers
635// ============================================================================
636
637/// Parse a WebSocket ticker message.
638pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
639    // Bybit V5 WebSocket ticker format:
640    // {"topic":"tickers.BTCUSDT","type":"snapshot","data":{...},"ts":...}
641    let data = msg
642        .get("data")
643        .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
644
645    parse_ticker(data, market)
646}
647
648/// Parse a WebSocket orderbook message.
649pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
650    // Bybit V5 WebSocket orderbook format:
651    // {"topic":"orderbook.50.BTCUSDT","type":"snapshot","data":{"s":"BTCUSDT","b":[...],"a":[...],"u":...,"seq":...},"ts":...}
652    let data = msg
653        .get("data")
654        .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
655
656    parse_orderbook(data, symbol)
657}
658
659/// Parse a WebSocket trade message (single trade).
660pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
661    // Bybit V5 WebSocket trade format:
662    // {"topic":"publicTrade.BTCUSDT","type":"snapshot","data":[{...}],"ts":...}
663    let data = msg
664        .get("data")
665        .and_then(|d| d.as_array())
666        .and_then(|arr| arr.first())
667        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
668
669    parse_trade(data, market)
670}
671
672/// Parse a WebSocket trade message (multiple trades).
673pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
674    // Bybit V5 WebSocket trade format:
675    // {"topic":"publicTrade.BTCUSDT","type":"snapshot","data":[{...}, {...}],"ts":...}
676    let data_array = msg
677        .get("data")
678        .and_then(|d| d.as_array())
679        .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
680
681    let mut trades = Vec::with_capacity(data_array.len());
682    for data in data_array {
683        trades.push(parse_trade(data, market)?);
684    }
685
686    Ok(trades)
687}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692    use ccxt_core::types::financial::Price;
693    use rust_decimal_macros::dec;
694
695    #[test]
696    fn test_bybit_ws_creation() {
697        let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
698        // WsClient is created successfully - config is private so we just verify creation works
699        assert!(ws.subscriptions.try_read().is_ok());
700    }
701
702    #[tokio::test]
703    async fn test_subscriptions_empty_by_default() {
704        let ws = BybitWs::new("wss://stream.bybit.com/v5/public/spot".to_string());
705        let subs = ws.subscriptions().await;
706        assert!(subs.is_empty());
707    }
708
709    // ==================== Ticker Message Parsing Tests ====================
710
711    #[test]
712    fn test_parse_ws_ticker_snapshot() {
713        let msg = serde_json::from_str(
714            r#"{
715                "topic": "tickers.BTCUSDT",
716                "type": "snapshot",
717                "data": {
718                    "symbol": "BTCUSDT",
719                    "lastPrice": "50000.00",
720                    "highPrice24h": "51000.00",
721                    "lowPrice24h": "49000.00",
722                    "bid1Price": "49999.00",
723                    "ask1Price": "50001.00",
724                    "volume24h": "1000.5",
725                    "time": "1700000000000"
726                },
727                "ts": 1700000000000
728            }"#,
729        )
730        .unwrap();
731
732        let ticker = parse_ws_ticker(&msg, None).unwrap();
733        assert_eq!(ticker.symbol, "BTCUSDT");
734        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
735        assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
736        assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
737        assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
738        assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
739        assert_eq!(ticker.timestamp, 1700000000000);
740    }
741
742    #[test]
743    fn test_parse_ws_ticker_with_market() {
744        let msg = serde_json::from_str(
745            r#"{
746                "topic": "tickers.BTCUSDT",
747                "type": "snapshot",
748                "data": {
749                    "symbol": "BTCUSDT",
750                    "lastPrice": "50000.00",
751                    "time": "1700000000000"
752                },
753                "ts": 1700000000000
754            }"#,
755        )
756        .unwrap();
757
758        let market = Market {
759            id: "BTCUSDT".to_string(),
760            symbol: "BTC/USDT".to_string(),
761            base: "BTC".to_string(),
762            quote: "USDT".to_string(),
763            ..Default::default()
764        };
765
766        let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
767        assert_eq!(ticker.symbol, "BTC/USDT");
768        assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
769    }
770
771    #[test]
772    fn test_parse_ws_ticker_missing_data() {
773        let msg = serde_json::from_str(
774            r#"{
775                "topic": "tickers.BTCUSDT",
776                "type": "snapshot",
777                "ts": 1700000000000
778            }"#,
779        )
780        .unwrap();
781
782        let result = parse_ws_ticker(&msg, None);
783        assert!(result.is_err());
784    }
785
786    // ==================== OrderBook Message Parsing Tests ====================
787
788    #[test]
789    fn test_parse_ws_orderbook_snapshot() {
790        let msg = serde_json::from_str(
791            r#"{
792                "topic": "orderbook.50.BTCUSDT",
793                "type": "snapshot",
794                "data": {
795                    "s": "BTCUSDT",
796                    "b": [
797                        ["50000.00", "1.5"],
798                        ["49999.00", "2.0"],
799                        ["49998.00", "0.5"]
800                    ],
801                    "a": [
802                        ["50001.00", "1.0"],
803                        ["50002.00", "3.0"],
804                        ["50003.00", "2.5"]
805                    ],
806                    "u": 12345,
807                    "seq": 67890,
808                    "ts": "1700000000000"
809                },
810                "ts": 1700000000000
811            }"#,
812        )
813        .unwrap();
814
815        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
816        assert_eq!(orderbook.symbol, "BTC/USDT");
817        assert_eq!(orderbook.bids.len(), 3);
818        assert_eq!(orderbook.asks.len(), 3);
819
820        // Verify bids are sorted in descending order
821        assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
822        assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
823        assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
824
825        // Verify asks are sorted in ascending order
826        assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
827        assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
828        assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
829    }
830
831    #[test]
832    fn test_parse_ws_orderbook_missing_data() {
833        let msg = serde_json::from_str(
834            r#"{
835                "topic": "orderbook.50.BTCUSDT",
836                "type": "snapshot",
837                "ts": 1700000000000
838            }"#,
839        )
840        .unwrap();
841
842        let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
843        assert!(result.is_err());
844    }
845
846    #[test]
847    fn test_parse_ws_orderbook_empty_sides() {
848        let msg = serde_json::from_str(
849            r#"{
850                "topic": "orderbook.50.BTCUSDT",
851                "type": "snapshot",
852                "data": {
853                    "s": "BTCUSDT",
854                    "b": [],
855                    "a": [],
856                    "u": 12345,
857                    "ts": "1700000000000"
858                },
859                "ts": 1700000000000
860            }"#,
861        )
862        .unwrap();
863
864        let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
865        assert!(orderbook.bids.is_empty());
866        assert!(orderbook.asks.is_empty());
867    }
868
869    // ==================== Trade Message Parsing Tests ====================
870
/// A `publicTrade` frame holding one trade parses via `parse_ws_trade`, and
/// the parsed trade reports the expected timestamp.
#[test]
fn test_parse_ws_trade_single() {
    let payload = r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [{
                    "i": "123456789",
                    "T": 1700000000000,
                    "p": "50000.00",
                    "v": "0.5",
                    "S": "Buy",
                    "s": "BTCUSDT"
                }],
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let parsed = parse_ws_trade(&frame, None).unwrap();
    // Only the timestamp is checked: the trade parser uses different field
    // names than the short keys in this payload.
    // NOTE(review): confirm which keys the parser actually reads.
    assert_eq!(parsed.timestamp, 1700000000000);
}
894
/// A `publicTrade` frame with two entries in `data` yields two trades from
/// `parse_ws_trades`.
#[test]
fn test_parse_ws_trades_multiple() {
    // Each entry carries both the short keys (`i`, `T`, `p`, `v`, `S`, `s`)
    // and the long keys (`side`, `price`, `size`, `time`).
    let payload = r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [
                    {
                        "i": "123456789",
                        "T": 1700000000000,
                        "p": "50000.00",
                        "v": "0.5",
                        "S": "Buy",
                        "s": "BTCUSDT",
                        "side": "Buy",
                        "price": "50000.00",
                        "size": "0.5",
                        "time": "1700000000000"
                    },
                    {
                        "i": "123456790",
                        "T": 1700000000001,
                        "p": "50001.00",
                        "v": "1.0",
                        "S": "Sell",
                        "s": "BTCUSDT",
                        "side": "Sell",
                        "price": "50001.00",
                        "size": "1.0",
                        "time": "1700000000001"
                    }
                ],
                "ts": 1700000000001
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let parsed = parse_ws_trades(&frame, None).unwrap();
    assert_eq!(parsed.len(), 2);
}
935
/// A `publicTrade` frame without a `data` member must make `parse_ws_trade`
/// return an error.
#[test]
fn test_parse_ws_trade_missing_data() {
    // Envelope only; `data` is deliberately absent.
    let payload = r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    assert!(parse_ws_trade(&frame, None).is_err());
}
950
/// An empty `data` array is accepted by `parse_ws_trades` and produces an
/// empty collection of trades.
#[test]
fn test_parse_ws_trades_empty_array() {
    let payload = r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [],
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let parsed = parse_ws_trades(&frame, None).unwrap();
    assert!(parsed.is_empty());
}
966
967    // ==================== Message Type Detection Tests ====================
968
/// A frame on topic `tickers.BTCUSDT` is recognised as a ticker message for
/// `BTCUSDT`.
#[test]
fn test_is_ticker_message_true() {
    let payload = r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let matched = is_ticker_message(&frame, "BTCUSDT");
    assert!(matched);
}
983
/// A ticker frame for `ETHUSDT` must not be treated as a ticker message for
/// `BTCUSDT`.
#[test]
fn test_is_ticker_message_wrong_symbol() {
    let payload = r#"{
                "topic": "tickers.ETHUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let matched = is_ticker_message(&frame, "BTCUSDT");
    assert!(!matched);
}
998
/// A `publicTrade` frame for the right symbol is still not a ticker message.
#[test]
fn test_is_ticker_message_wrong_topic() {
    let payload = r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [],
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let matched = is_ticker_message(&frame, "BTCUSDT");
    assert!(!matched);
}
1013
/// A frame on topic `orderbook.50.BTCUSDT` is recognised as an order book
/// message for `BTCUSDT`.
#[test]
fn test_is_orderbook_message_depth_50() {
    let payload = r#"{
                "topic": "orderbook.50.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let matched = is_orderbook_message(&frame, "BTCUSDT");
    assert!(matched);
}
1028
/// A frame on topic `orderbook.200.BTCUSDT` is also recognised as an order
/// book message for `BTCUSDT`.
#[test]
fn test_is_orderbook_message_depth_200() {
    let payload = r#"{
                "topic": "orderbook.200.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let matched = is_orderbook_message(&frame, "BTCUSDT");
    assert!(matched);
}
1043
/// A frame on topic `publicTrade.BTCUSDT` is recognised as a trade message
/// for `BTCUSDT`.
#[test]
fn test_is_trade_message_true() {
    let payload = r#"{
                "topic": "publicTrade.BTCUSDT",
                "type": "snapshot",
                "data": [],
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let matched = is_trade_message(&frame, "BTCUSDT");
    assert!(matched);
}
1058
/// A `tickers` frame for the right symbol is not a trade message.
#[test]
fn test_is_trade_message_wrong_topic() {
    let payload = r#"{
                "topic": "tickers.BTCUSDT",
                "type": "snapshot",
                "data": {},
                "ts": 1700000000000
            }"#;
    let frame = serde_json::from_str(payload).unwrap();

    let matched = is_trade_message(&frame, "BTCUSDT");
    assert!(!matched);
}
1073
1074    // ==================== Symbol Formatting Tests ====================
1075
/// `BTCUSDT` and `ETHUSDT` are rewritten to `BTC/USDT` and `ETH/USDT`.
#[test]
fn test_format_unified_symbol_usdt() {
    // (exchange symbol, expected unified symbol), checked in order.
    let cases = [("BTCUSDT", "BTC/USDT"), ("ETHUSDT", "ETH/USDT")];
    for &(exchange_symbol, unified) in &cases {
        assert_eq!(format_unified_symbol(exchange_symbol), unified);
    }
}
1081
/// `BTCUSDC` is rewritten to `BTC/USDC`.
#[test]
fn test_format_unified_symbol_usdc() {
    let unified = format_unified_symbol("BTCUSDC");
    assert_eq!(unified, "BTC/USDC");
}
1086
/// `ETHBTC` is rewritten to `ETH/BTC`.
#[test]
fn test_format_unified_symbol_btc() {
    let unified = format_unified_symbol("ETHBTC");
    assert_eq!(unified, "ETH/BTC");
}
1091
/// A symbol with an unknown quote currency is returned unchanged.
#[test]
fn test_format_unified_symbol_unknown() {
    let exchange_symbol = "BTCXYZ";
    // The input has an unknown quote currency, so it is expected back as-is.
    assert_eq!(format_unified_symbol(exchange_symbol), "BTCXYZ");
}
1097}