use crate::{
books::{Asks, Bids, Level, OrderBook, OrderBookSide},
subscription::book::OrderBookEvent,
};
use ibapi::market_data::realtime::{MarketDepth, MarketDepths};
use rust_decimal::Decimal;
use super::decimal_from_f64;
const IB_DEPTH_OP_DELETE: i32 = 2;
#[derive(Debug, Default)]
pub struct DepthAggregator {
bids: OrderBookSide<Bids>,
asks: OrderBookSide<Asks>,
sequence: u64,
}
impl DepthAggregator {
pub fn new() -> Self {
Self::default()
}
pub fn update(&mut self, depth: &MarketDepths) -> Option<OrderBookEvent> {
match depth {
MarketDepths::MarketDepth(d) => self.process_depth(d),
MarketDepths::MarketDepthL2(_) => {
tracing::trace!("Discarding MarketDepthL2 event (market maker data not tracked)");
None
}
MarketDepths::Notice(_) => None,
}
}
fn process_depth(&mut self, depth: &MarketDepth) -> Option<OrderBookEvent> {
let price = decimal_from_f64(depth.price)?;
let size = if depth.operation == IB_DEPTH_OP_DELETE {
Decimal::ZERO
} else {
decimal_from_f64(depth.size)?
};
match depth.side {
0 => self.update_asks(price, size),
1 => self.update_bids(price, size),
other => {
tracing::warn!(
side = other,
price = %price,
"Unknown IB depth side, skipping"
);
return None;
}
}
self.sequence += 1;
Some(OrderBookEvent::Snapshot(self.to_order_book()))
}
fn update_bids(&mut self, price: Decimal, size: Decimal) {
let level = Level {
price,
amount: size,
};
self.bids
.upsert_single(level, |existing| existing.price.cmp(&level.price).reverse());
}
fn update_asks(&mut self, price: Decimal, size: Decimal) {
let level = Level {
price,
amount: size,
};
self.asks
.upsert_single(level, |existing| existing.price.cmp(&level.price));
}
fn to_order_book(&self) -> OrderBook {
OrderBook::from_sides(self.sequence, None, self.bids.clone(), self.asks.clone())
}
pub fn clear(&mut self) {
self.bids = OrderBookSide::default();
self.asks = OrderBookSide::default();
self.sequence = 0;
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use rust_decimal_macros::dec;
fn depth(side: i32, operation: i32, price: f64, size: f64) -> MarketDepths {
MarketDepths::MarketDepth(MarketDepth {
position: 0,
operation,
side,
price,
size,
})
}
#[test]
fn insert_bid() {
let mut agg = DepthAggregator::new();
let event = agg.update(&depth(1, 0, 100.0, 10.0)).unwrap();
match event {
OrderBookEvent::Snapshot(book) => {
assert_eq!(book.bids().levels().len(), 1);
assert_eq!(book.bids().levels()[0].price, dec!(100));
assert_eq!(book.bids().levels()[0].amount, dec!(10));
assert!(book.asks().levels().is_empty());
}
_ => panic!("Expected Snapshot"),
}
}
#[test]
fn insert_ask() {
let mut agg = DepthAggregator::new();
let event = agg.update(&depth(0, 0, 101.0, 5.0)).unwrap();
match event {
OrderBookEvent::Snapshot(book) => {
assert_eq!(book.asks().levels().len(), 1);
assert_eq!(book.asks().levels()[0].price, dec!(101));
assert_eq!(book.asks().levels()[0].amount, dec!(5));
assert!(book.bids().levels().is_empty());
}
_ => panic!("Expected Snapshot"),
}
}
#[test]
fn update_level() {
let mut agg = DepthAggregator::new();
agg.update(&depth(1, 0, 100.0, 10.0));
let event = agg.update(&depth(1, 1, 100.0, 15.0)).unwrap();
match event {
OrderBookEvent::Snapshot(book) => {
assert_eq!(book.bids().levels().len(), 1);
assert_eq!(book.bids().levels()[0].amount, dec!(15));
}
_ => panic!("Expected Snapshot"),
}
}
#[test]
fn delete_level() {
let mut agg = DepthAggregator::new();
agg.update(&depth(1, 0, 100.0, 10.0));
agg.update(&depth(1, 0, 99.0, 5.0));
let event = agg.update(&depth(1, 2, 100.0, 0.0)).unwrap();
match event {
OrderBookEvent::Snapshot(book) => {
assert_eq!(book.bids().levels().len(), 1);
assert_eq!(book.bids().levels()[0].price, dec!(99));
}
_ => panic!("Expected Snapshot"),
}
}
#[test]
fn multiple_levels() {
let mut agg = DepthAggregator::new();
agg.update(&depth(1, 0, 100.0, 10.0));
agg.update(&depth(1, 0, 99.0, 20.0));
agg.update(&depth(0, 0, 101.0, 5.0));
let event = agg.update(&depth(0, 0, 102.0, 3.0)).unwrap();
match event {
OrderBookEvent::Snapshot(book) => {
assert_eq!(book.bids().levels().len(), 2);
assert_eq!(book.asks().levels().len(), 2);
let bids = book.bids().levels();
assert!(
bids[0].price > bids[1].price,
"bids should be sorted descending: {:?}",
bids
);
let asks = book.asks().levels();
assert!(
asks[0].price < asks[1].price,
"asks should be sorted ascending: {:?}",
asks
);
}
_ => panic!("Expected Snapshot"),
}
}
#[test]
fn notice_ignored() {
let mut agg = DepthAggregator::new();
let notice = MarketDepths::Notice(ibapi::messages::Notice {
code: 2100,
message: "test notice".to_string(),
error_time: None,
});
let result = agg.update(¬ice);
assert!(result.is_none());
}
#[test]
fn invalid_price_skipped() {
let mut agg = DepthAggregator::new();
let result = agg.update(&depth(1, 0, f64::NAN, 10.0));
assert!(result.is_none());
let result = agg.update(&depth(1, 0, f64::INFINITY, 10.0));
assert!(result.is_none());
let result = agg.update(&depth(1, 0, 100.0, 10.0));
assert!(result.is_some());
}
#[test]
fn delete_with_nan_size_removes_level() {
let mut agg = DepthAggregator::new();
agg.update(&depth(1, 0, 100.0, 10.0));
let event = agg.update(&depth(1, 2, 100.0, f64::NAN)).unwrap();
match event {
OrderBookEvent::Snapshot(book) => {
assert!(
book.bids().levels().is_empty(),
"Delete with NaN size should remove level"
);
}
_ => panic!("Expected Snapshot"),
}
}
#[test]
fn clear_resets_state() {
let mut agg = DepthAggregator::new();
agg.update(&depth(1, 0, 100.0, 10.0));
agg.update(&depth(1, 0, 99.0, 5.0));
agg.update(&depth(0, 0, 101.0, 8.0));
assert_eq!(agg.sequence, 3);
agg.clear();
assert_eq!(agg.sequence, 0, "sequence should reset to 0");
assert!(agg.bids.levels().is_empty(), "bids should be empty");
assert!(agg.asks.levels().is_empty(), "asks should be empty");
let event = agg.update(&depth(1, 0, 50.0, 1.0)).unwrap();
match event {
OrderBookEvent::Snapshot(book) => {
assert_eq!(book.sequence(), 1);
assert_eq!(book.bids().levels().len(), 1);
}
_ => panic!("Expected Snapshot"),
}
}
}