use crate::{
errors::OrderbookError,
orderbooks::{OrderbookUpdate, OrderbookUpdateType},
utils::current_timestamp_ms,
};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, str::FromStr};
/// Order book message with its levels still in string form.
///
/// A value is either a full snapshot or an incremental delta, selected by
/// `is_snapshot`; `OrderbookDelta::process` routes it accordingly.
#[derive(Debug, Clone)]
pub struct NormalizedDelta {
/// Symbol the message refers to; compared against the book's symbol in `OrderbookDelta::process`.
pub symbol: String,
/// Bid levels as `(price, size)` pairs of decimal strings.
pub bids: Vec<(String, String)>,
/// Ask levels as `(price, size)` pairs of decimal strings.
pub asks: Vec<(String, String)>,
/// Update identifier; copied into the book's `last_update_id` once the message is applied.
pub update_id: u64,
/// Sequence number; copied into the book's `sequence` once the message is applied.
pub sequence: u64,
/// `true` replaces the whole book, `false` applies the levels incrementally.
pub is_snapshot: bool,
}
/// In-memory order book for a single symbol, maintained from snapshots and deltas.
///
/// Both sides map price to size in ascending price order, so the best bid is the
/// last key of `bids` and the best ask is the first key of `asks`.
#[derive(Debug, Clone, Serialize)]
pub struct OrderbookDelta {
/// Bid side: price -> size.
pub bids: BTreeMap<Decimal, Decimal>,
/// Ask side: price -> size.
pub asks: BTreeMap<Decimal, Decimal>,
/// `update_id` of the most recently applied message (0 until one is applied or supplied).
pub last_update_id: u64,
/// `sequence` of the most recently applied message (0 until one is applied or supplied).
pub sequence: u64,
/// Symbol this book tracks; overwritten with the message's symbol when a snapshot is applied.
pub symbol: String,
/// Exchange name; `new` leaves it empty, the `from_*` constructors take it as an argument.
pub exchange: String,
/// Whether the book holds a usable base state; deltas are rejected while this is `false`.
pub initialized: bool,
/// Number of deltas applied since the last snapshot (a snapshot resets it to 0).
pub delta_count: u64,
}
impl OrderbookDelta {
    /// Creates an empty, uninitialized book for `symbol`.
    ///
    /// The exchange name is left blank and all counters start at zero. Deltas are
    /// rejected until a snapshot has been processed.
    pub fn new(symbol: impl Into<String>) -> Self {
        Self {
            bids: BTreeMap::new(),
            asks: BTreeMap::new(),
            last_update_id: 0,
            sequence: 0,
            symbol: symbol.into(),
            exchange: String::new(),
            initialized: false,
            delta_count: 0,
        }
    }

    /// Returns `true` once the book holds a base state that deltas can be applied to.
    #[inline]
    pub fn is_initialized(&self) -> bool {
        self.initialized
    }

    /// Returns the symbol this book tracks.
    #[inline]
    pub fn symbol(&self) -> &str {
        &self.symbol
    }

    /// Returns the exchange name (empty for books created with [`OrderbookDelta::new`]).
    #[inline]
    pub fn exchange(&self) -> &str {
        &self.exchange
    }

    /// Returns the `update_id` of the most recently applied message.
    #[inline]
    pub fn last_update_id(&self) -> u64 {
        self.last_update_id
    }

    /// Returns the `sequence` of the most recently applied message.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.sequence
    }

    /// Returns how many deltas have been applied since the last snapshot.
    #[inline]
    pub fn delta_count(&self) -> u64 {
        self.delta_count
    }

    /// Applies a snapshot or delta message to the book and reports what changed.
    ///
    /// # Errors
    ///
    /// * `OrderbookError::SymbolMismatch` if the book already has a non-empty symbol
    ///   and the message carries a different one.
    /// * `OrderbookError::NotInitialized` if a delta arrives before any snapshot.
    /// * `OrderbookError::ParseError` if any price or size is not a valid decimal.
    ///
    /// On any error the book is left exactly as it was before the call.
    pub fn process(
        &mut self,
        delta: &NormalizedDelta,
    ) -> Result<OrderbookUpdate, OrderbookError> {
        // A book created without a symbol accepts anything; the first snapshot names it.
        let symbol_conflicts = !self.symbol.is_empty() && delta.symbol != self.symbol;
        if symbol_conflicts {
            return Err(OrderbookError::SymbolMismatch {
                expected: self.symbol.clone(),
                received: delta.symbol.clone(),
            });
        }

        if delta.is_snapshot {
            self.apply_snapshot(delta)
        } else {
            self.apply_delta(delta)
        }
    }

    /// Replaces both sides of the book with the levels in `delta`.
    ///
    /// Levels whose size is not strictly positive are skipped. The book adopts the
    /// message's symbol, update id and sequence, is marked initialized, and its
    /// delta counter restarts from zero.
    ///
    /// All levels are parsed before the existing book is touched, so a malformed
    /// level returns `OrderbookError::ParseError` without discarding the old state.
    fn apply_snapshot(
        &mut self,
        delta: &NormalizedDelta,
    ) -> Result<OrderbookUpdate, OrderbookError> {
        let parsed_bids = Self::parse_levels(&delta.bids)?;
        let parsed_asks = Self::parse_levels(&delta.asks)?;

        // The counters tally accepted entries (duplicates included), not distinct prices.
        let mut bids = BTreeMap::new();
        let mut bids_modified = 0;
        for (price, size) in parsed_bids {
            if size > Decimal::ZERO {
                bids.insert(price, size);
                bids_modified += 1;
            }
        }

        let mut asks = BTreeMap::new();
        let mut asks_modified = 0;
        for (price, size) in parsed_asks {
            if size > Decimal::ZERO {
                asks.insert(price, size);
                asks_modified += 1;
            }
        }

        // Nothing below can fail, so the swap is all-or-nothing.
        self.bids = bids;
        self.asks = asks;
        self.last_update_id = delta.update_id;
        self.sequence = delta.sequence;
        self.symbol.clone_from(&delta.symbol);
        self.initialized = true;
        self.delta_count = 0;

        Ok(OrderbookUpdate {
            update_type: OrderbookUpdateType::Snapshot,
            bids_modified,
            asks_modified,
            levels_deleted: 0,
            levels_inserted: bids_modified + asks_modified,
            was_reset: true,
        })
    }

    /// Applies the incremental level changes in `delta` to an initialized book.
    ///
    /// A level with size zero removes that price; any other size inserts or
    /// overwrites it. Every level in the message counts as "modified", whether or
    /// not it actually changed the book.
    ///
    /// All levels are parsed before the book is mutated, so a malformed level
    /// returns `OrderbookError::ParseError` without a partially applied delta.
    fn apply_delta(
        &mut self,
        delta: &NormalizedDelta,
    ) -> Result<OrderbookUpdate, OrderbookError> {
        if !self.initialized {
            return Err(OrderbookError::NotInitialized);
        }

        let bid_levels = Self::parse_levels(&delta.bids)?;
        let ask_levels = Self::parse_levels(&delta.asks)?;

        let mut bids_modified = 0;
        let mut asks_modified = 0;
        let mut levels_deleted = 0;
        let mut levels_inserted = 0;

        for (price, size) in bid_levels {
            let (deleted, inserted) = Self::apply_level(&mut self.bids, price, size);
            bids_modified += 1;
            if deleted {
                levels_deleted += 1;
            }
            if inserted {
                levels_inserted += 1;
            }
        }

        for (price, size) in ask_levels {
            let (deleted, inserted) = Self::apply_level(&mut self.asks, price, size);
            asks_modified += 1;
            if deleted {
                levels_deleted += 1;
            }
            if inserted {
                levels_inserted += 1;
            }
        }

        self.last_update_id = delta.update_id;
        self.sequence = delta.sequence;
        self.delta_count += 1;

        Ok(OrderbookUpdate {
            update_type: OrderbookUpdateType::Delta,
            bids_modified,
            asks_modified,
            levels_deleted,
            levels_inserted,
            was_reset: false,
        })
    }

    /// Builds a serializable, string-based view of the full book plus summary metrics.
    ///
    /// Bids are listed from highest to lowest price and asks from lowest to highest.
    /// Derived metrics are rounded before formatting (prices, spread and volumes to
    /// 8 places, basis points to 4, imbalance to 6).
    ///
    /// The book is only read; the `&mut` receiver is kept so existing callers
    /// continue to compile.
    pub fn produce_snapshot(ob: &mut Self) -> OrderbookSnapshot {
        let bids: Vec<[String; 2]> = ob
            .bids
            .iter()
            .rev()
            .map(|(price, size)| [price.to_string(), size.to_string()])
            .collect();
        let asks: Vec<[String; 2]> = ob
            .asks
            .iter()
            .map(|(price, size)| [price.to_string(), size.to_string()])
            .collect();

        OrderbookSnapshot {
            timestamp_ms: current_timestamp_ms(),
            symbol: ob.symbol().to_string(),
            update_id: ob.last_update_id(),
            sequence: ob.sequence(),
            delta_count: ob.delta_count(),
            bid_depth: ob.bid_depth(),
            ask_depth: ob.ask_depth(),
            mid_price: ob.mid_price().map(|d| d.round_dp(8).to_string()),
            spread: ob.spread().map(|d| d.round_dp(8).to_string()),
            spread_bps: ob.spread_bps().map(|d| d.round_dp(4).to_string()),
            volume_imbalance: ob.volume_imbalance().map(|d| d.round_dp(6).to_string()),
            total_bid_volume: ob.total_bid_volume().round_dp(8).to_string(),
            total_ask_volume: ob.total_ask_volume().round_dp(8).to_string(),
            bids,
            asks,
        }
    }

    /// Applies one `(price, size)` change to a book side.
    ///
    /// Returns `(deleted, inserted)`: `deleted` is `true` when a zero size removed
    /// an existing level, `inserted` is `true` when a non-zero size created a level
    /// that was not there before. Overwriting an existing level yields
    /// `(false, false)`. Only an exact zero deletes; any other size is stored as given.
    fn apply_level(
        book_side: &mut BTreeMap<Decimal, Decimal>,
        price: Decimal,
        size: Decimal,
    ) -> (bool, bool) {
        if size == Decimal::ZERO {
            (book_side.remove(&price).is_some(), false)
        } else {
            // `insert` returns the previous value, so `None` means the level is new.
            (false, book_side.insert(price, size).is_none())
        }
    }

    /// Parses a decimal string, mapping failures to `OrderbookError::ParseError`.
    fn parse_decimal(s: &str) -> Result<Decimal, OrderbookError> {
        Decimal::from_str(s).map_err(|e| OrderbookError::ParseError(e.to_string()))
    }

    /// Parses every `(price, size)` pair, stopping at the first invalid value.
    ///
    /// Pairs are visited in order and the price is parsed before the size, so the
    /// error reported is the first one encountered in the message.
    fn parse_levels(
        levels: &[(String, String)],
    ) -> Result<Vec<(Decimal, Decimal)>, OrderbookError> {
        let mut parsed = Vec::with_capacity(levels.len());
        for (price, size) in levels {
            parsed.push((Self::parse_decimal(price)?, Self::parse_decimal(size)?));
        }
        Ok(parsed)
    }

    /// Returns the highest-priced bid as `(price, size)`, or `None` if there are no bids.
    #[inline]
    pub fn best_bid(&self) -> Option<(Decimal, Decimal)> {
        self.bids.last_key_value().map(|(&p, &s)| (p, s))
    }

    /// Returns the lowest-priced ask as `(price, size)`, or `None` if there are no asks.
    #[inline]
    pub fn best_ask(&self) -> Option<(Decimal, Decimal)> {
        self.asks.first_key_value().map(|(&p, &s)| (p, s))
    }

    /// Returns `(bid_price, bid_size, ask_price, ask_size)` for the top of the book.
    ///
    /// `None` if either side is empty.
    pub fn bbo(&self) -> Option<(Decimal, Decimal, Decimal, Decimal)> {
        self.best_bid()
            .zip(self.best_ask())
            .map(|((bid_price, bid_size), (ask_price, ask_size))| {
                (bid_price, bid_size, ask_price, ask_size)
            })
    }

    /// Returns the midpoint of the best bid and best ask prices, or `None` if either side is empty.
    pub fn mid_price(&self) -> Option<Decimal> {
        let (bid, _) = self.best_bid()?;
        let (ask, _) = self.best_ask()?;
        Some((bid + ask) / Decimal::TWO)
    }

    /// Returns best ask price minus best bid price, or `None` if either side is empty.
    pub fn spread(&self) -> Option<Decimal> {
        let (bid, _) = self.best_bid()?;
        let (ask, _) = self.best_ask()?;
        Some(ask - bid)
    }

    /// Returns the spread relative to the mid price, in basis points.
    ///
    /// `None` if either side is empty or the mid price is not strictly positive.
    pub fn spread_bps(&self) -> Option<Decimal> {
        let spread = self.spread()?;
        let mid = self.mid_price()?;
        (mid > Decimal::ZERO).then(|| (spread / mid) * Decimal::from(10_000))
    }

    /// Returns the number of distinct bid price levels.
    #[inline]
    pub fn bid_depth(&self) -> usize {
        self.bids.len()
    }

    /// Returns the number of distinct ask price levels.
    #[inline]
    pub fn ask_depth(&self) -> usize {
        self.asks.len()
    }

    /// Returns the sum of sizes across all bid levels.
    pub fn total_bid_volume(&self) -> Decimal {
        self.bids.values().copied().sum()
    }

    /// Returns the sum of sizes across all ask levels.
    pub fn total_ask_volume(&self) -> Decimal {
        self.asks.values().copied().sum()
    }

    /// Returns `(bid_volume - ask_volume) / (bid_volume + ask_volume)`.
    ///
    /// `None` when the combined volume is not strictly positive.
    pub fn volume_imbalance(&self) -> Option<Decimal> {
        let bid_vol = self.total_bid_volume();
        let ask_vol = self.total_ask_volume();
        let total = bid_vol + ask_vol;
        (total > Decimal::ZERO).then(|| (bid_vol - ask_vol) / total)
    }

    /// Returns up to `n` bid levels, highest price first.
    pub fn top_bids(&self, n: usize) -> Vec<(Decimal, Decimal)> {
        self.bids
            .iter()
            .rev()
            .take(n)
            .map(|(&p, &s)| (p, s))
            .collect()
    }

    /// Returns up to `n` ask levels, lowest price first.
    pub fn top_asks(&self, n: usize) -> Vec<(Decimal, Decimal)> {
        self.asks.iter().take(n).map(|(&p, &s)| (p, s)).collect()
    }

    /// Sums the sizes of all bids priced at or above `best_bid - depth`.
    ///
    /// Returns zero when there are no bids.
    pub fn bid_volume_within(&self, depth: Decimal) -> Decimal {
        let Some((best_bid, _)) = self.best_bid() else {
            return Decimal::ZERO;
        };
        let threshold = best_bid - depth;
        self.bids.range(threshold..).map(|(_, &size)| size).sum()
    }

    /// Sums the sizes of all asks priced at or below `best_ask + depth`.
    ///
    /// Returns zero when there are no asks.
    pub fn ask_volume_within(&self, depth: Decimal) -> Decimal {
        let Some((best_ask, _)) = self.best_ask() else {
            return Decimal::ZERO;
        };
        let threshold = best_ask + depth;
        self.asks.range(..=threshold).map(|(_, &size)| size).sum()
    }

    /// Builds a book from ready-made price maps with all ids and counters at zero.
    ///
    /// The book is considered initialized if at least one side has a level.
    pub fn from_maps(
        symbol: impl Into<String>,
        exchange: impl Into<String>,
        bids: BTreeMap<Decimal, Decimal>,
        asks: BTreeMap<Decimal, Decimal>,
    ) -> Self {
        Self::from_snapshot(symbol, exchange, bids, asks, 0, 0, 0)
    }

    /// Builds a book from ready-made price maps together with its ids and delta counter.
    ///
    /// The book is considered initialized if at least one side has a level.
    pub fn from_snapshot(
        symbol: impl Into<String>,
        exchange: impl Into<String>,
        bids: BTreeMap<Decimal, Decimal>,
        asks: BTreeMap<Decimal, Decimal>,
        update_id: u64,
        sequence: u64,
        delta_count: u64,
    ) -> Self {
        // Computed before the maps are moved into the struct.
        let initialized = !bids.is_empty() || !asks.is_empty();
        Self {
            symbol: symbol.into(),
            exchange: exchange.into(),
            bids,
            asks,
            last_update_id: update_id,
            sequence,
            initialized,
            delta_count,
        }
    }

    /// Builds a book from iterators of `(price, size)` levels.
    ///
    /// If a price occurs more than once on a side, the last occurrence wins.
    pub fn from_levels(
        symbol: impl Into<String>,
        exchange: impl Into<String>,
        bids: impl IntoIterator<Item = (Decimal, Decimal)>,
        asks: impl IntoIterator<Item = (Decimal, Decimal)>,
    ) -> Self {
        Self::from_maps(
            symbol,
            exchange,
            bids.into_iter().collect(),
            asks.into_iter().collect(),
        )
    }
}
/// Serializable, string-based view of an order book with summary metrics.
///
/// Produced by `OrderbookDelta::produce_snapshot`; the per-field notes below
/// describe the values as that function fills them in.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct OrderbookSnapshot {
/// Value returned by `current_timestamp_ms()` when the snapshot was built
/// (presumably wall-clock milliseconds; confirm against that helper).
pub timestamp_ms: u64,
/// Symbol of the source book.
pub symbol: String,
/// `last_update_id` of the source book.
pub update_id: u64,
/// `sequence` of the source book.
pub sequence: u64,
/// Deltas applied to the source book since its last snapshot.
pub delta_count: u64,
/// Number of bid price levels in the source book.
pub bid_depth: usize,
/// Number of ask price levels in the source book.
pub ask_depth: usize,
/// Mid price rounded to 8 decimal places; `None` if either side was empty.
pub mid_price: Option<String>,
/// Best ask minus best bid, rounded to 8 decimal places; `None` if either side was empty.
pub spread: Option<String>,
/// Spread in basis points of the mid price, rounded to 4 decimal places.
pub spread_bps: Option<String>,
/// `(bid_volume - ask_volume) / total_volume`, rounded to 6 decimal places.
pub volume_imbalance: Option<String>,
/// Sum of all bid sizes, rounded to 8 decimal places.
pub total_bid_volume: String,
/// Sum of all ask sizes, rounded to 8 decimal places.
pub total_ask_volume: String,
/// Bid levels as `[price, size]`, highest price first.
pub bids: Vec<[String; 2]>,
/// Ask levels as `[price, size]`, lowest price first.
pub asks: Vec<[String; 2]>,
}
impl OrderbookSnapshot {
    /// Prints the snapshot to stdout as a boxed two-column table.
    ///
    /// At most `levels` rows are shown, capped by the shorter of the two sides.
    /// Each row pairs the ask and the bid at the same index, and rows run from
    /// the highest index shown down to index 0, so the first entries of `asks`
    /// and `bids` share the bottom row.
    pub fn display(&self, levels: usize) {
        println!("╔══════════════════════════════════════════════════════════════╗");
        println!(
            "║ {} | update_id: {} | seq: {} | deltas: {}",
            self.symbol, self.update_id, self.sequence, self.delta_count
        );
        println!("╠══════════════════════════════════════════════════════════════╣");
        println!(
            "║ {:^15} │ {:^12} ║ {:^12} │ {:^15} ║",
            "ASK PRICE", "ASK SIZE", "BID SIZE", "BID PRICE"
        );
        println!("╠══════════════════════════════════════════════════════════════╣");

        // Both sides are trimmed to the same row count, then walked back to front in lockstep.
        let shown = levels.min(self.bids.len()).min(self.asks.len());
        let ask_rows = self.asks[..shown].iter().rev();
        let bid_rows = self.bids[..shown].iter().rev();
        for ([ask_price, ask_size], [bid_price, bid_size]) in ask_rows.zip(bid_rows) {
            println!(
                "║ {:>15} │ {:>12} ║ {:>12} │ {:>15} ║",
                ask_price, ask_size, bid_size, bid_price
            );
        }

        println!("╚══════════════════════════════════════════════════════════════╝");
    }
}