use crate::{
Level, OrderSide,
orderbooks::{Orderbook, OrderbookDelta},
utils::decimal_to_f64,
};
use std::collections::HashMap;
use tracing::warn;
/// Converts the bid and ask levels of `ob` into two lists of [`Level`].
///
/// The bid side is read with `top_bids(bid_depth())` and the ask side with
/// `top_asks(ask_depth())`. Each entry becomes a `Level` carrying its position
/// in that listing as index, the matching [`OrderSide`], the price and size
/// converted through `decimal_to_f64`, and an empty vector as the final
/// constructor argument.
///
/// Returns `(bids, asks)`.
pub fn capture_levels(ob: &OrderbookDelta) -> (Vec<Level>, Vec<Level>) {
    let bid_side = ob.top_bids(ob.bid_depth());
    let bids = bid_side
        .iter()
        .enumerate()
        .map(|(rank, (px, qty))| {
            Level::new(
                rank as u32,
                OrderSide::Bids,
                decimal_to_f64(*px),
                decimal_to_f64(*qty),
                Vec::new(),
            )
        })
        .collect::<Vec<Level>>();

    let ask_side = ob.top_asks(ob.ask_depth());
    let asks = ask_side
        .iter()
        .enumerate()
        .map(|(rank, (px, qty))| {
            Level::new(
                rank as u32,
                OrderSide::Asks,
                decimal_to_f64(*px),
                decimal_to_f64(*qty),
                Vec::new(),
            )
        })
        .collect::<Vec<Level>>();

    (bids, asks)
}
/// Upper bound on the number of period snapshots emitted for a single gap.
/// Gaps spanning more periods than this are capped and reported with a warning.
const MAX_GAP_FILL: u64 = 10_000;
/// Buffers order book snapshots aligned to a fixed period grid, tracked per symbol.
///
/// Incoming timestamps are mapped to a period index (`timestamp_ns / period_ns`).
/// When a symbol moves to a later period, one snapshot is buffered per elapsed
/// period, up to `MAX_GAP_FILL` per call.
pub struct ObSynchronizer {
/// Length of one period in nanoseconds; `new` rejects zero.
pub period_ns: u64,
/// Most recent period index recorded for each symbol.
pub last_period: HashMap<String, u64>,
/// Most recent snapshot handed to `on_snapshot` for each symbol; used to fill
/// later periods and by `finalize`.
last_snapshot: HashMap<String, Orderbook>,
/// Snapshots emitted so far and not yet drained or flushed.
pub buffer: Vec<Orderbook>,
/// Running total of snapshots counted by `on_update`, `on_snapshot` and `finalize`.
pub total_captured: usize,
}
impl ObSynchronizer {
/// Creates an empty synchronizer that samples on a grid of `period_ns` nanoseconds.
///
/// # Panics
///
/// Panics with `"period_ns must be positive"` when `period_ns` is zero, since
/// the period length is later used as a divisor.
pub fn new(period_ns: u64) -> Self {
    assert!(period_ns != 0, "period_ns must be positive");
    Self {
        total_captured: 0,
        buffer: Vec::default(),
        last_snapshot: HashMap::default(),
        last_period: HashMap::default(),
        period_ns,
    }
}
/// Emits period-aligned snapshots when an update moves `symbol` into a later period.
///
/// The exchange timestamp (milliseconds) is converted to nanoseconds and divided
/// by `period_ns` to obtain the current period index.
///
/// * First update seen for `symbol`: the period is recorded, `capture_fn` is not
///   called and `0` is returned.
/// * Same or earlier period than the recorded one: nothing happens, `0` is returned.
/// * Later period: `capture_fn` is called once and one `Orderbook` per elapsed
///   period is pushed to the buffer, each stamped `period * period_ns` and all
///   sharing the captured levels.
///
/// When more than `MAX_GAP_FILL` periods elapsed, a warning is logged and only
/// the most recent `MAX_GAP_FILL` periods (those ending at the current period)
/// are emitted, as the warning text states.
///
/// Returns the number of snapshots pushed to the buffer.
pub fn on_update<F>(
    &mut self,
    symbol: &str,
    exchange: &str,
    exchange_ts_ms: u64,
    capture_fn: F,
) -> usize
where
    F: FnOnce() -> (Vec<Level>, Vec<Level>),
{
    // Widen to u128 for the ms -> ns conversion, the same way `on_snapshot`
    // does, so the multiplication itself cannot overflow u64.
    let ts_ns = exchange_ts_ms as u128 * 1_000_000;
    let current_period = (ts_ns / self.period_ns as u128) as u64;

    let prev_period = match self.last_period.get(symbol) {
        Some(&p) => p,
        None => {
            // Nothing to compare against yet: only remember where we are.
            self.last_period.insert(symbol.to_string(), current_period);
            return 0;
        }
    };
    if current_period <= prev_period {
        return 0;
    }

    let gap = current_period - prev_period;
    let (bids, asks) = capture_fn();
    let fill_count = gap.min(MAX_GAP_FILL);
    if gap > MAX_GAP_FILL {
        warn!(
            "[{}] Gap of {} periods exceeds MAX_GAP_FILL ({}), \
             filling last {} only",
            symbol, gap, MAX_GAP_FILL, MAX_GAP_FILL
        );
    }

    // Fill the periods that END at `current_period`. For an uncapped gap this is
    // exactly `prev_period + 1 ..= current_period`; for a capped gap it keeps the
    // most recent periods. `1 <= fill_count <= gap <= current_period`, so the
    // subtraction cannot underflow.
    let start = current_period - fill_count + 1;
    self.buffer.reserve(fill_count as usize);
    for p in start..=current_period {
        self.buffer.push(Orderbook::new(
            0,
            p * self.period_ns,
            symbol.to_string(),
            exchange.to_string(),
            bids.clone(),
            asks.clone(),
        ));
    }

    self.last_period.insert(symbol.to_string(), current_period);
    self.total_captured += fill_count as usize;
    fill_count as usize
}
/// Records a new snapshot for `symbol` and fills elapsed periods with the previous one.
///
/// The exchange timestamp (milliseconds) is converted to a period index. The
/// supplied `snapshot` always becomes the stored snapshot for `symbol`.
///
/// * First call for `symbol`, or same/earlier period: only the stored state is
///   updated and `0` is returned.
/// * Later period: for each elapsed period after the recorded one (at most
///   `MAX_GAP_FILL`, starting right after the recorded period) a clone of the
///   previously stored snapshot is pushed with `orderbook_ts` set to
///   `period * period_ns`. A warning is logged when the gap is capped.
///
/// If the period for `symbol` is known but no earlier snapshot is stored (for
/// example because only `on_update` has seen the symbol so far), there is
/// nothing to fill with: no snapshot is pushed and nothing is counted.
///
/// Returns the number of snapshots actually pushed to the buffer.
pub fn on_snapshot(
    &mut self,
    symbol: &str,
    exchange_ts_ms: u64,
    snapshot: Orderbook,
) -> usize {
    let ts_ns = exchange_ts_ms as u128 * 1_000_000;
    let current_period = (ts_ns / self.period_ns as u128) as u64;

    let prev_period = match self.last_period.get(symbol) {
        Some(&p) => p,
        None => {
            self.last_period.insert(symbol.to_string(), current_period);
            self.last_snapshot.insert(symbol.to_string(), snapshot);
            return 0;
        }
    };
    if current_period <= prev_period {
        self.last_snapshot.insert(symbol.to_string(), snapshot);
        return 0;
    }

    let gap = current_period - prev_period;
    let fill_count = gap.min(MAX_GAP_FILL);
    if gap > MAX_GAP_FILL {
        warn!(
            "[{}] gap of {} periods exceeds MAX_GAP_FILL ({}), capping",
            symbol, gap, MAX_GAP_FILL,
        );
    }

    // Count only what really lands in the buffer, so the return value and
    // `total_captured` stay in step with the buffer contents.
    let pushed = match self.last_snapshot.get(symbol) {
        Some(prev_ob) => {
            for p in (prev_period + 1)..=(prev_period + fill_count) {
                let mut ob = prev_ob.clone();
                ob.orderbook_ts = p * self.period_ns;
                self.buffer.push(ob);
            }
            fill_count as usize
        }
        None => 0,
    };

    self.last_period.insert(symbol.to_string(), current_period);
    self.last_snapshot.insert(symbol.to_string(), snapshot);
    self.total_captured += pushed;
    pushed
}
pub fn finalize(&mut self) {
let periods: Vec<(String, u64)> = self
.last_period
.iter()
.map(|(s, &p)| (s.clone(), p))
.collect();
for (symbol, period) in periods {
if let Some(ob) = self.last_snapshot.get(&symbol) {
let mut final_ob = ob.clone();
final_ob.orderbook_ts = (period + 1) * self.period_ns;
self.buffer.push(final_ob);
self.total_captured += 1;
}
}
}
/// Returns the number of snapshots currently held in the buffer.
#[inline]
pub fn buffer_len(&self) -> usize {
self.buffer.len()
}
/// Returns the running total of captured snapshots; draining the buffer does
/// not reset it.
#[inline]
pub fn total_captured(&self) -> usize {
self.total_captured
}
/// Hands all buffered snapshots to the caller and leaves the buffer empty.
/// `total_captured` is not changed.
pub fn drain(&mut self) -> Vec<Orderbook> {
// `mem::take` swaps in a fresh empty Vec, so no snapshot is cloned.
std::mem::take(&mut self.buffer)
}
/// Writes all buffered snapshots via `write_ob_parquet` and empties the buffer.
///
/// The buffer is cleared only after the write succeeded, so a failed write
/// leaves the snapshots in place for a retry or a later [`drain`](Self::drain).
/// The literal `"sync"` is forwarded unchanged as the third argument of
/// `write_ob_parquet`.
///
/// # Errors
///
/// Returns whatever error `write_ob_parquet` reports; the buffer is left
/// untouched in that case.
#[cfg(feature = "parquet")]
pub fn flush_to_parquet(
    &mut self,
    output_dir: &std::path::Path,
) -> anyhow::Result<std::path::PathBuf> {
    use crate::orderbooks::io::ob_parquet::write_ob_parquet;

    let written = write_ob_parquet(&self.buffer, output_dir, "sync")?;
    // Only drop the snapshots once they are safely written.
    self.buffer.clear();
    Ok(written)
}
}