// bybit_client/ws/orderbook.rs

//! Orderbook management with delta update support.
//!
//! This module provides a local orderbook that can be maintained from
//! WebSocket snapshot and delta messages.
//!
//! # Usage
//!
//! ```no_run
//! use bybit_client::ws::{LocalOrderbook, WsClient, WsChannel, WsMessage};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let (client, mut receiver) = WsClient::connect_public(WsChannel::PublicLinear).await?;
//!     client.subscribe(&["orderbook.50.BTCUSDT"]).await?;
//!
//!     let mut orderbook = LocalOrderbook::new("BTCUSDT");
//!
//!     while let Some(msg) = receiver.recv().await {
//!         if let WsMessage::Orderbook(update) = msg {
//!             orderbook.apply_update(&update)?;
//!             println!("Best bid: {:?}, Best ask: {:?}",
//!                 orderbook.best_bid(), orderbook.best_ask());
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```

30use std::collections::BTreeMap;
31use std::cmp::Ordering;
32
33use crate::error::BybitError;
34use crate::ws::types::{OrderbookData, OrderbookEntry, WsStreamMessage};
35
/// One price level of an orderbook side.
///
/// Price and size are each stored twice: as the original string (so no
/// decimal precision is lost) and as an `f64` copy used for arithmetic and
/// ordering.
#[derive(Debug, Clone, PartialEq)]
pub struct PriceLevel {
    /// Price in its string form, preserving decimal precision.
    pub price: String,
    /// Numeric form of `price`, used for comparison and sorting.
    pub price_f64: f64,
    /// Size/quantity at this price level, in its string form.
    pub size: String,
    /// Numeric form of `size`.
    pub size_f64: f64,
}
48
49impl PriceLevel {
50    /// Create a new price level from strings.
51    pub fn new(price: String, size: String) -> Result<Self, BybitError> {
52        let price_f64 = price
53            .parse::<f64>()
54            .map_err(|e| BybitError::InvalidParameter(format!("Invalid price '{}': {}", price, e)))?;
55        let size_f64 = size
56            .parse::<f64>()
57            .map_err(|e| BybitError::InvalidParameter(format!("Invalid size '{}': {}", size, e)))?;
58
59        Ok(Self {
60            price,
61            price_f64,
62            size,
63            size_f64,
64        })
65    }
66
67    /// Create from an orderbook entry.
68    pub fn from_entry(entry: &OrderbookEntry) -> Result<Self, BybitError> {
69        Self::new(entry.price.clone(), entry.size.clone())
70    }
71}
72
/// Wrapper for price that implements `Ord` for use as a `BTreeMap` key.
///
/// Bids are sorted descending (highest first), asks ascending (lowest first).
///
/// The ordering is a lawful total order even for NaN: every NaN compares
/// equal to every other NaN and sorts after all numbers in ascending
/// direction. Equality is defined through `cmp`, so `Eq` and `Ord` always
/// agree. The `descending` flag is not part of the comparison between two
/// keys; only the flag of the left-hand key decides the direction.
#[derive(Debug, Clone, Copy)]
struct OrderedPrice {
    /// Numeric price used as the sort key.
    price: f64,
    /// If true, sort descending (for bids). If false, sort ascending (for asks).
    descending: bool,
}

impl PartialEq for OrderedPrice {
    fn eq(&self, other: &Self) -> bool {
        // Defined via `cmp` so equality stays reflexive for NaN and can never
        // disagree with the ordering.
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for OrderedPrice {}

impl PartialOrd for OrderedPrice {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedPrice {
    fn cmp(&self, other: &Self) -> Ordering {
        let ascending = match self.price.partial_cmp(&other.price) {
            Some(ordering) => ordering,
            // `partial_cmp` is `None` only when at least one side is NaN.
            // Rank NaN above every number (and equal to other NaNs) instead
            // of pretending it equals everything.
            None => self.price.is_nan().cmp(&other.price.is_nan()),
        };

        if self.descending {
            ascending.reverse()
        } else {
            ascending
        }
    }
}
109
110/// Local orderbook maintained from WebSocket updates.
111///
112/// This structure maintains a sorted orderbook from snapshot and delta updates.
113/// Bids are sorted by price descending (highest first), asks ascending (lowest first).
114#[derive(Debug)]
115pub struct LocalOrderbook {
116    /// Symbol this orderbook is for.
117    symbol: String,
118    /// Bid price levels (sorted by price descending).
119    bids: BTreeMap<OrderedPrice, PriceLevel>,
120    /// Ask price levels (sorted by price ascending).
121    asks: BTreeMap<OrderedPrice, PriceLevel>,
122    /// Current update ID.
123    update_id: u64,
124    /// Sequence number for cross-depth comparison.
125    sequence: Option<u64>,
126    /// Last update timestamp.
127    last_update_ts: u64,
128    /// Whether the orderbook has received a snapshot.
129    initialized: bool,
130}
131
132impl LocalOrderbook {
133    /// Create a new empty orderbook for a symbol.
134    pub fn new(symbol: impl Into<String>) -> Self {
135        Self {
136            symbol: symbol.into(),
137            bids: BTreeMap::new(),
138            asks: BTreeMap::new(),
139            update_id: 0,
140            sequence: None,
141            last_update_ts: 0,
142            initialized: false,
143        }
144    }
145
146    /// Get the symbol this orderbook is for.
147    pub fn symbol(&self) -> &str {
148        &self.symbol
149    }
150
151    /// Check if the orderbook has been initialized with a snapshot.
152    pub fn is_initialized(&self) -> bool {
153        self.initialized
154    }
155
156    /// Get the current update ID.
157    pub fn update_id(&self) -> u64 {
158        self.update_id
159    }
160
161    /// Get the sequence number.
162    pub fn sequence(&self) -> Option<u64> {
163        self.sequence
164    }
165
166    /// Get the last update timestamp.
167    pub fn last_update_ts(&self) -> u64 {
168        self.last_update_ts
169    }
170
171    /// Apply an orderbook update (snapshot or delta).
172    ///
173    /// # Update Rules
174    ///
175    /// - Snapshots replace the entire orderbook.
176    /// - Deltas are applied incrementally:
177    ///   - If size is "0", the price level is removed.
178    ///   - If the price level does not exist, it is inserted.
179    ///   - If the price level exists, the size is updated.
180    /// - If `update_id` is 1, treat it as a snapshot.
181    pub fn apply_update(
182        &mut self,
183        update: &WsStreamMessage<OrderbookData>,
184    ) -> Result<(), BybitError> {
185        if update.data.symbol != self.symbol {
186            return Err(BybitError::InvalidParameter(format!(
187                "Symbol mismatch: expected {}, got {}",
188                self.symbol, update.data.symbol
189            )));
190        }
191
192        let is_snapshot = update.update_type == "snapshot" || update.data.update_id == 1;
193
194        if is_snapshot {
195            self.apply_snapshot(&update.data)?;
196        } else {
197            if !self.initialized {
198                return Err(BybitError::InvalidParameter(
199                    "Received delta before snapshot".to_string(),
200                ));
201            }
202            self.apply_delta(&update.data)?;
203        }
204
205        self.update_id = update.data.update_id;
206        self.sequence = update.data.seq;
207        self.last_update_ts = update.ts;
208
209        Ok(())
210    }
211
212    /// Apply a snapshot update (replaces entire orderbook).
213    fn apply_snapshot(&mut self, data: &OrderbookData) -> Result<(), BybitError> {
214        self.bids.clear();
215        self.asks.clear();
216
217        for entry in &data.bids {
218            let level = PriceLevel::from_entry(entry)?;
219            let key = OrderedPrice {
220                price: level.price_f64,
221                descending: true,
222            };
223            self.bids.insert(key, level);
224        }
225
226        for entry in &data.asks {
227            let level = PriceLevel::from_entry(entry)?;
228            let key = OrderedPrice {
229                price: level.price_f64,
230                descending: false,
231            };
232            self.asks.insert(key, level);
233        }
234
235        self.initialized = true;
236        Ok(())
237    }
238
239    /// Apply a delta update (incremental changes).
240    fn apply_delta(&mut self, data: &OrderbookData) -> Result<(), BybitError> {
241        for entry in &data.bids {
242            let level = PriceLevel::from_entry(entry)?;
243            let key = OrderedPrice {
244                price: level.price_f64,
245                descending: true,
246            };
247
248            if level.size_f64 == 0.0 {
249                self.bids.remove(&key);
250            } else {
251                self.bids.insert(key, level);
252            }
253        }
254
255        for entry in &data.asks {
256            let level = PriceLevel::from_entry(entry)?;
257            let key = OrderedPrice {
258                price: level.price_f64,
259                descending: false,
260            };
261
262            if level.size_f64 == 0.0 {
263                self.asks.remove(&key);
264            } else {
265                self.asks.insert(key, level);
266            }
267        }
268
269        Ok(())
270    }
271
272    /// Get the best bid (highest buy price).
273    pub fn best_bid(&self) -> Option<&PriceLevel> {
274        self.bids.values().next()
275    }
276
277    /// Get the best ask (lowest sell price).
278    pub fn best_ask(&self) -> Option<&PriceLevel> {
279        self.asks.values().next()
280    }
281
282    /// Get the bid-ask spread.
283    pub fn spread(&self) -> Option<f64> {
284        match (self.best_ask(), self.best_bid()) {
285            (Some(ask), Some(bid)) => Some(ask.price_f64 - bid.price_f64),
286            _ => None,
287        }
288    }
289
290    /// Get the mid price (average of best bid and best ask).
291    pub fn mid_price(&self) -> Option<f64> {
292        match (self.best_ask(), self.best_bid()) {
293            (Some(ask), Some(bid)) => Some((ask.price_f64 + bid.price_f64) / 2.0),
294            _ => None,
295        }
296    }
297
298    /// Get the top N bids (sorted by price descending).
299    pub fn top_bids(&self, n: usize) -> Vec<&PriceLevel> {
300        self.bids.values().take(n).collect()
301    }
302
303    /// Get the top N asks (sorted by price ascending).
304    pub fn top_asks(&self, n: usize) -> Vec<&PriceLevel> {
305        self.asks.values().take(n).collect()
306    }
307
308    /// Get all bids (sorted by price descending).
309    pub fn bids(&self) -> impl Iterator<Item = &PriceLevel> {
310        self.bids.values()
311    }
312
313    /// Get all asks (sorted by price ascending).
314    pub fn asks(&self) -> impl Iterator<Item = &PriceLevel> {
315        self.asks.values()
316    }
317
318    /// Get the total bid depth (sum of all bid sizes).
319    pub fn bid_depth(&self) -> f64 {
320        self.bids.values().map(|l| l.size_f64).sum()
321    }
322
323    /// Get the total ask depth (sum of all ask sizes).
324    pub fn ask_depth(&self) -> f64 {
325        self.asks.values().map(|l| l.size_f64).sum()
326    }
327
328    /// Get the number of bid levels.
329    pub fn bid_levels(&self) -> usize {
330        self.bids.len()
331    }
332
333    /// Get the number of ask levels.
334    pub fn ask_levels(&self) -> usize {
335        self.asks.len()
336    }
337
338    /// Clear the orderbook and reset to uninitialized state.
339    pub fn clear(&mut self) {
340        self.bids.clear();
341        self.asks.clear();
342        self.update_id = 0;
343        self.sequence = None;
344        self.last_update_ts = 0;
345        self.initialized = false;
346    }
347
348    /// Get orderbook imbalance ratio.
349    ///
350    /// Returns a value between -1 and 1.
351    /// - Positive values indicate more bid depth (buying pressure).
352    /// - Negative values indicate more ask depth (selling pressure).
353    /// - Zero indicates balanced depth.
354    pub fn imbalance(&self) -> f64 {
355        let bid_depth = self.bid_depth();
356        let ask_depth = self.ask_depth();
357        let total = bid_depth + ask_depth;
358
359        if total == 0.0 {
360            0.0
361        } else {
362            (bid_depth - ask_depth) / total
363        }
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Symbol used by every test book.
    const SYMBOL: &str = "BTCUSDT";

    /// Build a single orderbook entry from string slices.
    fn make_entry(price: &str, size: &str) -> OrderbookEntry {
        OrderbookEntry {
            price: price.to_owned(),
            size: size.to_owned(),
        }
    }

    /// Convert `(price, size)` pairs into orderbook entries.
    fn to_entries(levels: &[(&str, &str)]) -> Vec<OrderbookEntry> {
        levels
            .iter()
            .map(|&(price, size)| make_entry(price, size))
            .collect()
    }

    /// Build a snapshot message (update ID 1, sequence 100).
    fn make_snapshot(
        symbol: &str,
        bids: Vec<(&str, &str)>,
        asks: Vec<(&str, &str)>,
    ) -> WsStreamMessage<OrderbookData> {
        let data = OrderbookData {
            symbol: symbol.to_owned(),
            bids: to_entries(&bids),
            asks: to_entries(&asks),
            update_id: 1,
            seq: Some(100),
        };
        WsStreamMessage {
            topic: format!("orderbook.50.{}", symbol),
            update_type: "snapshot".to_owned(),
            ts: 1234567890000,
            data,
            cts: None,
        }
    }

    /// Build a delta message with the given update ID (sequence 101).
    fn make_delta(
        symbol: &str,
        bids: Vec<(&str, &str)>,
        asks: Vec<(&str, &str)>,
        update_id: u64,
    ) -> WsStreamMessage<OrderbookData> {
        let data = OrderbookData {
            symbol: symbol.to_owned(),
            bids: to_entries(&bids),
            asks: to_entries(&asks),
            update_id,
            seq: Some(101),
        };
        WsStreamMessage {
            topic: format!("orderbook.50.{}", symbol),
            update_type: "delta".to_owned(),
            ts: 1234567890001,
            data,
            cts: None,
        }
    }

    /// Apply `msg` to `ob`, panicking with "Failed to apply <what>: <err>" on error.
    fn apply_or_panic(
        ob: &mut LocalOrderbook,
        msg: &WsStreamMessage<OrderbookData>,
        what: &str,
    ) {
        ob.apply_update(msg)
            .unwrap_or_else(|err| panic!("Failed to apply {}: {}", what, err));
    }

    /// Create a book for `SYMBOL` that already holds the given snapshot.
    fn seeded_book(bids: Vec<(&str, &str)>, asks: Vec<(&str, &str)>) -> LocalOrderbook {
        let mut ob = LocalOrderbook::new(SYMBOL);
        apply_or_panic(&mut ob, &make_snapshot(SYMBOL, bids, asks), "snapshot");
        ob
    }

    /// Unwrap an optional level, panicking with `msg` when it is absent.
    fn expect_level<'a>(level: Option<&'a PriceLevel>, msg: &str) -> &'a PriceLevel {
        level.unwrap_or_else(|| panic!("{}", msg))
    }

    #[test]
    fn test_new_orderbook() {
        let ob = LocalOrderbook::new(SYMBOL);

        assert_eq!(ob.symbol(), "BTCUSDT");
        assert!(!ob.is_initialized());
        assert_eq!(ob.bid_levels(), 0);
        assert_eq!(ob.ask_levels(), 0);
    }

    #[test]
    fn test_apply_snapshot() {
        let ob = seeded_book(
            vec![("50000", "1.5"), ("49999", "2.0"), ("49998", "3.0")],
            vec![("50001", "0.8"), ("50002", "1.2")],
        );

        assert!(ob.is_initialized());
        assert_eq!(ob.bid_levels(), 3);
        assert_eq!(ob.ask_levels(), 2);

        let best_bid = expect_level(ob.best_bid(), "Expected best bid");
        assert_eq!(best_bid.price, "50000");
        assert_eq!(best_bid.size, "1.5");

        let best_ask = expect_level(ob.best_ask(), "Expected best ask");
        assert_eq!(best_ask.price, "50001");
        assert_eq!(best_ask.size, "0.8");
    }

    #[test]
    fn test_apply_delta_insert() {
        let mut ob = seeded_book(vec![("50000", "1.5")], vec![("50001", "0.8")]);

        let delta = make_delta(SYMBOL, vec![("49999", "2.0")], vec![], 2);
        apply_or_panic(&mut ob, &delta, "delta");

        assert_eq!(ob.bid_levels(), 2);
        let top_bids = ob.top_bids(2);
        assert_eq!(top_bids[0].price, "50000");
        assert_eq!(top_bids[1].price, "49999");
    }

    #[test]
    fn test_apply_delta_update() {
        let mut ob = seeded_book(vec![("50000", "1.5")], vec![("50001", "0.8")]);

        let delta = make_delta(SYMBOL, vec![("50000", "3.0")], vec![], 2);
        apply_or_panic(&mut ob, &delta, "delta");

        assert_eq!(ob.bid_levels(), 1);
        let best_bid = expect_level(ob.best_bid(), "Expected best bid");
        assert_eq!(best_bid.price, "50000");
        assert_eq!(best_bid.size, "3.0");
    }

    #[test]
    fn test_apply_delta_delete() {
        let mut ob = seeded_book(
            vec![("50000", "1.5"), ("49999", "2.0")],
            vec![("50001", "0.8")],
        );

        let delta = make_delta(SYMBOL, vec![("50000", "0")], vec![], 2);
        apply_or_panic(&mut ob, &delta, "delta");

        assert_eq!(ob.bid_levels(), 1);
        let best_bid = expect_level(ob.best_bid(), "Expected best bid");
        assert_eq!(best_bid.price, "49999");
    }

    #[test]
    fn test_spread_and_mid_price() {
        let ob = seeded_book(vec![("50000", "1.0")], vec![("50010", "1.0")]);

        assert_eq!(ob.spread(), Some(10.0));
        assert_eq!(ob.mid_price(), Some(50005.0));
    }

    #[test]
    fn test_depth_calculation() {
        let ob = seeded_book(
            vec![("50000", "1.0"), ("49999", "2.0")],
            vec![("50001", "0.5"), ("50002", "1.5")],
        );

        assert_eq!(ob.bid_depth(), 3.0);
        assert_eq!(ob.ask_depth(), 2.0);
    }

    #[test]
    fn test_imbalance() {
        let ob = seeded_book(vec![("50000", "3.0")], vec![("50001", "1.0")]);

        assert_eq!(ob.imbalance(), 0.5);
    }

    #[test]
    fn test_symbol_mismatch() {
        let mut ob = LocalOrderbook::new(SYMBOL);
        let foreign = make_snapshot("ETHUSDT", vec![], vec![]);

        assert!(ob.apply_update(&foreign).is_err());
    }

    #[test]
    fn test_delta_before_snapshot() {
        let mut ob = LocalOrderbook::new(SYMBOL);
        let delta = make_delta(SYMBOL, vec![("50000", "1.0")], vec![], 2);

        assert!(ob.apply_update(&delta).is_err());
    }

    #[test]
    fn test_reset_on_update_id_1() {
        let mut ob = seeded_book(vec![("50000", "1.5")], vec![("50001", "0.8")]);

        // Labelled as a delta, but update ID 1 must replace the whole book.
        let reset = WsStreamMessage {
            topic: "orderbook.50.BTCUSDT".to_owned(),
            update_type: "delta".to_owned(),
            ts: 1234567890002,
            data: OrderbookData {
                symbol: "BTCUSDT".to_owned(),
                bids: vec![make_entry("49000", "5.0")],
                asks: vec![make_entry("49001", "4.0")],
                update_id: 1,
                seq: Some(1),
            },
            cts: None,
        };
        apply_or_panic(&mut ob, &reset, "reset update");

        assert_eq!(ob.bid_levels(), 1);
        let best_bid = expect_level(ob.best_bid(), "Expected best bid after reset");
        assert_eq!(best_bid.price, "49000");
    }

    #[test]
    fn test_clear() {
        let mut ob = seeded_book(vec![("50000", "1.5")], vec![("50001", "0.8")]);

        ob.clear();

        assert!(!ob.is_initialized());
        assert_eq!(ob.bid_levels(), 0);
        assert_eq!(ob.ask_levels(), 0);
    }
}