1use std::collections::BTreeMap;
31use std::cmp::Ordering;
32
33use crate::error::BybitError;
34use crate::ws::types::{OrderbookData, OrderbookEntry, WsStreamMessage};
35
36#[derive(Debug, Clone, PartialEq)]
38pub struct PriceLevel {
39 pub price: String,
41 pub price_f64: f64,
43 pub size: String,
45 pub size_f64: f64,
47}
48
49impl PriceLevel {
50 pub fn new(price: String, size: String) -> Result<Self, BybitError> {
52 let price_f64 = price
53 .parse::<f64>()
54 .map_err(|e| BybitError::InvalidParameter(format!("Invalid price '{}': {}", price, e)))?;
55 let size_f64 = size
56 .parse::<f64>()
57 .map_err(|e| BybitError::InvalidParameter(format!("Invalid size '{}': {}", size, e)))?;
58
59 Ok(Self {
60 price,
61 price_f64,
62 size,
63 size_f64,
64 })
65 }
66
67 pub fn from_entry(entry: &OrderbookEntry) -> Result<Self, BybitError> {
69 Self::new(entry.price.clone(), entry.size.clone())
70 }
71}
72
73#[derive(Debug, Clone, Copy)]
76struct OrderedPrice {
77 price: f64,
78 descending: bool,
80}
81
82impl PartialEq for OrderedPrice {
83 fn eq(&self, other: &Self) -> bool {
84 self.price == other.price
85 }
86}
87
88impl Eq for OrderedPrice {}
89
90impl PartialOrd for OrderedPrice {
91 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
92 Some(self.cmp(other))
93 }
94}
95
96impl Ord for OrderedPrice {
97 fn cmp(&self, other: &Self) -> Ordering {
98 let cmp = self
99 .price
100 .partial_cmp(&other.price)
101 .unwrap_or(Ordering::Equal);
102 if self.descending {
103 cmp.reverse()
104 } else {
105 cmp
106 }
107 }
108}
109
110#[derive(Debug)]
115pub struct LocalOrderbook {
116 symbol: String,
118 bids: BTreeMap<OrderedPrice, PriceLevel>,
120 asks: BTreeMap<OrderedPrice, PriceLevel>,
122 update_id: u64,
124 sequence: Option<u64>,
126 last_update_ts: u64,
128 initialized: bool,
130}
131
132impl LocalOrderbook {
133 pub fn new(symbol: impl Into<String>) -> Self {
135 Self {
136 symbol: symbol.into(),
137 bids: BTreeMap::new(),
138 asks: BTreeMap::new(),
139 update_id: 0,
140 sequence: None,
141 last_update_ts: 0,
142 initialized: false,
143 }
144 }
145
146 pub fn symbol(&self) -> &str {
148 &self.symbol
149 }
150
151 pub fn is_initialized(&self) -> bool {
153 self.initialized
154 }
155
156 pub fn update_id(&self) -> u64 {
158 self.update_id
159 }
160
161 pub fn sequence(&self) -> Option<u64> {
163 self.sequence
164 }
165
166 pub fn last_update_ts(&self) -> u64 {
168 self.last_update_ts
169 }
170
171 pub fn apply_update(
182 &mut self,
183 update: &WsStreamMessage<OrderbookData>,
184 ) -> Result<(), BybitError> {
185 if update.data.symbol != self.symbol {
186 return Err(BybitError::InvalidParameter(format!(
187 "Symbol mismatch: expected {}, got {}",
188 self.symbol, update.data.symbol
189 )));
190 }
191
192 let is_snapshot = update.update_type == "snapshot" || update.data.update_id == 1;
193
194 if is_snapshot {
195 self.apply_snapshot(&update.data)?;
196 } else {
197 if !self.initialized {
198 return Err(BybitError::InvalidParameter(
199 "Received delta before snapshot".to_string(),
200 ));
201 }
202 self.apply_delta(&update.data)?;
203 }
204
205 self.update_id = update.data.update_id;
206 self.sequence = update.data.seq;
207 self.last_update_ts = update.ts;
208
209 Ok(())
210 }
211
212 fn apply_snapshot(&mut self, data: &OrderbookData) -> Result<(), BybitError> {
214 self.bids.clear();
215 self.asks.clear();
216
217 for entry in &data.bids {
218 let level = PriceLevel::from_entry(entry)?;
219 let key = OrderedPrice {
220 price: level.price_f64,
221 descending: true,
222 };
223 self.bids.insert(key, level);
224 }
225
226 for entry in &data.asks {
227 let level = PriceLevel::from_entry(entry)?;
228 let key = OrderedPrice {
229 price: level.price_f64,
230 descending: false,
231 };
232 self.asks.insert(key, level);
233 }
234
235 self.initialized = true;
236 Ok(())
237 }
238
239 fn apply_delta(&mut self, data: &OrderbookData) -> Result<(), BybitError> {
241 for entry in &data.bids {
242 let level = PriceLevel::from_entry(entry)?;
243 let key = OrderedPrice {
244 price: level.price_f64,
245 descending: true,
246 };
247
248 if level.size_f64 == 0.0 {
249 self.bids.remove(&key);
250 } else {
251 self.bids.insert(key, level);
252 }
253 }
254
255 for entry in &data.asks {
256 let level = PriceLevel::from_entry(entry)?;
257 let key = OrderedPrice {
258 price: level.price_f64,
259 descending: false,
260 };
261
262 if level.size_f64 == 0.0 {
263 self.asks.remove(&key);
264 } else {
265 self.asks.insert(key, level);
266 }
267 }
268
269 Ok(())
270 }
271
272 pub fn best_bid(&self) -> Option<&PriceLevel> {
274 self.bids.values().next()
275 }
276
277 pub fn best_ask(&self) -> Option<&PriceLevel> {
279 self.asks.values().next()
280 }
281
282 pub fn spread(&self) -> Option<f64> {
284 match (self.best_ask(), self.best_bid()) {
285 (Some(ask), Some(bid)) => Some(ask.price_f64 - bid.price_f64),
286 _ => None,
287 }
288 }
289
290 pub fn mid_price(&self) -> Option<f64> {
292 match (self.best_ask(), self.best_bid()) {
293 (Some(ask), Some(bid)) => Some((ask.price_f64 + bid.price_f64) / 2.0),
294 _ => None,
295 }
296 }
297
298 pub fn top_bids(&self, n: usize) -> Vec<&PriceLevel> {
300 self.bids.values().take(n).collect()
301 }
302
303 pub fn top_asks(&self, n: usize) -> Vec<&PriceLevel> {
305 self.asks.values().take(n).collect()
306 }
307
308 pub fn bids(&self) -> impl Iterator<Item = &PriceLevel> {
310 self.bids.values()
311 }
312
313 pub fn asks(&self) -> impl Iterator<Item = &PriceLevel> {
315 self.asks.values()
316 }
317
318 pub fn bid_depth(&self) -> f64 {
320 self.bids.values().map(|l| l.size_f64).sum()
321 }
322
323 pub fn ask_depth(&self) -> f64 {
325 self.asks.values().map(|l| l.size_f64).sum()
326 }
327
328 pub fn bid_levels(&self) -> usize {
330 self.bids.len()
331 }
332
333 pub fn ask_levels(&self) -> usize {
335 self.asks.len()
336 }
337
338 pub fn clear(&mut self) {
340 self.bids.clear();
341 self.asks.clear();
342 self.update_id = 0;
343 self.sequence = None;
344 self.last_update_ts = 0;
345 self.initialized = false;
346 }
347
348 pub fn imbalance(&self) -> f64 {
355 let bid_depth = self.bid_depth();
356 let ask_depth = self.ask_depth();
357 let total = bid_depth + ask_depth;
358
359 if total == 0.0 {
360 0.0
361 } else {
362 (bid_depth - ask_depth) / total
363 }
364 }
365}
366
367#[cfg(test)]
368mod tests {
369 use super::*;
370
371 fn make_entry(price: &str, size: &str) -> OrderbookEntry {
372 OrderbookEntry {
373 price: price.to_string(),
374 size: size.to_string(),
375 }
376 }
377
378 fn make_snapshot(
379 symbol: &str,
380 bids: Vec<(&str, &str)>,
381 asks: Vec<(&str, &str)>,
382 ) -> WsStreamMessage<OrderbookData> {
383 WsStreamMessage {
384 topic: format!("orderbook.50.{}", symbol),
385 update_type: "snapshot".to_string(),
386 ts: 1234567890000,
387 data: OrderbookData {
388 symbol: symbol.to_string(),
389 bids: bids.iter().map(|(p, s)| make_entry(p, s)).collect(),
390 asks: asks.iter().map(|(p, s)| make_entry(p, s)).collect(),
391 update_id: 1,
392 seq: Some(100),
393 },
394 cts: None,
395 }
396 }
397
398 fn make_delta(
399 symbol: &str,
400 bids: Vec<(&str, &str)>,
401 asks: Vec<(&str, &str)>,
402 update_id: u64,
403 ) -> WsStreamMessage<OrderbookData> {
404 WsStreamMessage {
405 topic: format!("orderbook.50.{}", symbol),
406 update_type: "delta".to_string(),
407 ts: 1234567890001,
408 data: OrderbookData {
409 symbol: symbol.to_string(),
410 bids: bids.iter().map(|(p, s)| make_entry(p, s)).collect(),
411 asks: asks.iter().map(|(p, s)| make_entry(p, s)).collect(),
412 update_id,
413 seq: Some(101),
414 },
415 cts: None,
416 }
417 }
418
419 #[test]
420 fn test_new_orderbook() {
421 let ob = LocalOrderbook::new("BTCUSDT");
422 assert_eq!(ob.symbol(), "BTCUSDT");
423 assert!(!ob.is_initialized());
424 assert_eq!(ob.bid_levels(), 0);
425 assert_eq!(ob.ask_levels(), 0);
426 }
427
428 #[test]
429 fn test_apply_snapshot() {
430 let mut ob = LocalOrderbook::new("BTCUSDT");
431 let snapshot = make_snapshot(
432 "BTCUSDT",
433 vec![("50000", "1.5"), ("49999", "2.0"), ("49998", "3.0")],
434 vec![("50001", "0.8"), ("50002", "1.2")],
435 );
436
437 if let Err(err) = ob.apply_update(&snapshot) {
438 panic!("Failed to apply snapshot: {}", err);
439 }
440
441 assert!(ob.is_initialized());
442 assert_eq!(ob.bid_levels(), 3);
443 assert_eq!(ob.ask_levels(), 2);
444
445 let best_bid = match ob.best_bid() {
446 Some(level) => level,
447 None => panic!("Expected best bid"),
448 };
449 assert_eq!(best_bid.price, "50000");
450 assert_eq!(best_bid.size, "1.5");
451
452 let best_ask = match ob.best_ask() {
453 Some(level) => level,
454 None => panic!("Expected best ask"),
455 };
456 assert_eq!(best_ask.price, "50001");
457 assert_eq!(best_ask.size, "0.8");
458 }
459
460 #[test]
461 fn test_apply_delta_insert() {
462 let mut ob = LocalOrderbook::new("BTCUSDT");
463 let snapshot = make_snapshot(
464 "BTCUSDT",
465 vec![("50000", "1.5")],
466 vec![("50001", "0.8")],
467 );
468 if let Err(err) = ob.apply_update(&snapshot) {
469 panic!("Failed to apply snapshot: {}", err);
470 }
471
472 let delta = make_delta("BTCUSDT", vec![("49999", "2.0")], vec![], 2);
473 if let Err(err) = ob.apply_update(&delta) {
474 panic!("Failed to apply delta: {}", err);
475 }
476
477 assert_eq!(ob.bid_levels(), 2);
478 let top_bids = ob.top_bids(2);
479 assert_eq!(top_bids[0].price, "50000");
480 assert_eq!(top_bids[1].price, "49999");
481 }
482
483 #[test]
484 fn test_apply_delta_update() {
485 let mut ob = LocalOrderbook::new("BTCUSDT");
486 let snapshot = make_snapshot(
487 "BTCUSDT",
488 vec![("50000", "1.5")],
489 vec![("50001", "0.8")],
490 );
491 if let Err(err) = ob.apply_update(&snapshot) {
492 panic!("Failed to apply snapshot: {}", err);
493 }
494
495 let delta = make_delta("BTCUSDT", vec![("50000", "3.0")], vec![], 2);
496 if let Err(err) = ob.apply_update(&delta) {
497 panic!("Failed to apply delta: {}", err);
498 }
499
500 assert_eq!(ob.bid_levels(), 1);
501 let best_bid = match ob.best_bid() {
502 Some(level) => level,
503 None => panic!("Expected best bid"),
504 };
505 assert_eq!(best_bid.price, "50000");
506 assert_eq!(best_bid.size, "3.0");
507 }
508
509 #[test]
510 fn test_apply_delta_delete() {
511 let mut ob = LocalOrderbook::new("BTCUSDT");
512 let snapshot = make_snapshot(
513 "BTCUSDT",
514 vec![("50000", "1.5"), ("49999", "2.0")],
515 vec![("50001", "0.8")],
516 );
517 if let Err(err) = ob.apply_update(&snapshot) {
518 panic!("Failed to apply snapshot: {}", err);
519 }
520
521 let delta = make_delta("BTCUSDT", vec![("50000", "0")], vec![], 2);
522 if let Err(err) = ob.apply_update(&delta) {
523 panic!("Failed to apply delta: {}", err);
524 }
525
526 assert_eq!(ob.bid_levels(), 1);
527 let best_bid = match ob.best_bid() {
528 Some(level) => level,
529 None => panic!("Expected best bid"),
530 };
531 assert_eq!(best_bid.price, "49999");
532 }
533
534 #[test]
535 fn test_spread_and_mid_price() {
536 let mut ob = LocalOrderbook::new("BTCUSDT");
537 let snapshot = make_snapshot(
538 "BTCUSDT",
539 vec![("50000", "1.0")],
540 vec![("50010", "1.0")],
541 );
542 if let Err(err) = ob.apply_update(&snapshot) {
543 panic!("Failed to apply snapshot: {}", err);
544 }
545
546 assert_eq!(ob.spread(), Some(10.0));
547 assert_eq!(ob.mid_price(), Some(50005.0));
548 }
549
550 #[test]
551 fn test_depth_calculation() {
552 let mut ob = LocalOrderbook::new("BTCUSDT");
553 let snapshot = make_snapshot(
554 "BTCUSDT",
555 vec![("50000", "1.0"), ("49999", "2.0")],
556 vec![("50001", "0.5"), ("50002", "1.5")],
557 );
558 if let Err(err) = ob.apply_update(&snapshot) {
559 panic!("Failed to apply snapshot: {}", err);
560 }
561
562 assert_eq!(ob.bid_depth(), 3.0);
563 assert_eq!(ob.ask_depth(), 2.0);
564 }
565
566 #[test]
567 fn test_imbalance() {
568 let mut ob = LocalOrderbook::new("BTCUSDT");
569 let snapshot = make_snapshot(
570 "BTCUSDT",
571 vec![("50000", "3.0")],
572 vec![("50001", "1.0")],
573 );
574 if let Err(err) = ob.apply_update(&snapshot) {
575 panic!("Failed to apply snapshot: {}", err);
576 }
577
578 assert_eq!(ob.imbalance(), 0.5);
579 }
580
581 #[test]
582 fn test_symbol_mismatch() {
583 let mut ob = LocalOrderbook::new("BTCUSDT");
584 let snapshot = make_snapshot("ETHUSDT", vec![], vec![]);
585
586 let result = ob.apply_update(&snapshot);
587 assert!(result.is_err());
588 }
589
590 #[test]
591 fn test_delta_before_snapshot() {
592 let mut ob = LocalOrderbook::new("BTCUSDT");
593 let delta = make_delta("BTCUSDT", vec![("50000", "1.0")], vec![], 2);
594
595 let result = ob.apply_update(&delta);
596 assert!(result.is_err());
597 }
598
599 #[test]
600 fn test_reset_on_update_id_1() {
601 let mut ob = LocalOrderbook::new("BTCUSDT");
602 let snapshot = make_snapshot(
603 "BTCUSDT",
604 vec![("50000", "1.5")],
605 vec![("50001", "0.8")],
606 );
607 if let Err(err) = ob.apply_update(&snapshot) {
608 panic!("Failed to apply snapshot: {}", err);
609 }
610
611 let reset = WsStreamMessage {
612 topic: "orderbook.50.BTCUSDT".to_string(),
613 update_type: "delta".to_string(),
614 ts: 1234567890002,
615 data: OrderbookData {
616 symbol: "BTCUSDT".to_string(),
617 bids: vec![make_entry("49000", "5.0")],
618 asks: vec![make_entry("49001", "4.0")],
619 update_id: 1,
620 seq: Some(1),
621 },
622 cts: None,
623 };
624 if let Err(err) = ob.apply_update(&reset) {
625 panic!("Failed to apply reset update: {}", err);
626 }
627
628 assert_eq!(ob.bid_levels(), 1);
629 let best_bid = match ob.best_bid() {
630 Some(level) => level,
631 None => panic!("Expected best bid after reset"),
632 };
633 assert_eq!(best_bid.price, "49000");
634 }
635
636 #[test]
637 fn test_clear() {
638 let mut ob = LocalOrderbook::new("BTCUSDT");
639 let snapshot = make_snapshot(
640 "BTCUSDT",
641 vec![("50000", "1.5")],
642 vec![("50001", "0.8")],
643 );
644 if let Err(err) = ob.apply_update(&snapshot) {
645 panic!("Failed to apply snapshot: {}", err);
646 }
647
648 ob.clear();
649
650 assert!(!ob.is_initialized());
651 assert_eq!(ob.bid_levels(), 0);
652 assert_eq!(ob.ask_levels(), 0);
653 }
654}