use crate::{
funding::FundingRate, liquidations::Liquidation, open_interest::OpenInterest,
orderbooks::Orderbook, snapshots::MarketSnapshot, trades::Trade,
};
use std::collections::HashMap;
use tracing::warn;
/// Upper bound on the number of grid periods emitted for a single gap between
/// consecutive clock events; larger gaps are capped (with a warning) to this many.
const MAX_GAP_FILL: u64 = 10_000;
/// Selects which input stream advances the synchronizer's time grid.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ClockMode {
    /// Orderbook updates advance a separate grid position per symbol.
    OrderbookDriven,
    /// Trade timestamps advance one shared grid.
    TradeDriven,
    /// Liquidation timestamps advance one shared grid.
    LiquidationDriven,
    /// The grid advances only through explicit `on_time` calls.
    ExternalClock,
}
/// Outcome of flushing buffered snapshots to parquet.
///
/// Each path is `None` when the corresponding data family had no rows to write.
#[derive(Clone, Debug, Default)]
pub struct FlushResult {
    /// File written for orderbooks, if any.
    pub orderbook_path: Option<std::path::PathBuf>,
    /// File written for trades, if any.
    pub trades_path: Option<std::path::PathBuf>,
    /// File written for liquidations, if any.
    pub liquidations_path: Option<std::path::PathBuf>,
    /// File written for funding rates, if any.
    pub funding_path: Option<std::path::PathBuf>,
    /// File written for open interest, if any.
    pub open_interest_path: Option<std::path::PathBuf>,
    /// Number of snapshots included in the flush.
    pub snapshot_count: usize,
}
/// Aligns asynchronous market-data streams (orderbooks, trades, liquidations,
/// funding rates, open interest) onto a fixed grid of `period_ns` nanoseconds,
/// emitting one [`MarketSnapshot`] per closed grid period.
///
/// Which stream advances the grid is chosen by [`ClockMode`]. Emitted snapshots
/// accumulate in `buffer` until drained, aggregated, or flushed.
pub struct MarketSynchronizer {
    /// Grid period length in nanoseconds (always > 0).
    pub period_ns: u64,
    clock_mode: ClockMode,
    /// Last grid period seen per symbol (used only in `OrderbookDriven` mode).
    last_period: HashMap<String, u64>,
    /// Last grid period seen by the shared clock (all other modes).
    clock_period: Option<u64>,
    /// Most recent orderbook received per symbol.
    last_orderbook: HashMap<String, Orderbook>,
    /// Symbol of the most recently received orderbook. Lets the shared-clock
    /// modes pick the latest book deterministically instead of relying on
    /// `HashMap` iteration order.
    latest_orderbook_symbol: Option<String>,
    // Events received since the last emitted snapshot.
    current_trades: Vec<Trade>,
    current_liquidations: Vec<Liquidation>,
    current_funding_rates: Vec<FundingRate>,
    current_open_interests: Vec<OpenInterest>,
    /// Snapshots emitted but not yet drained or flushed.
    pub buffer: Vec<MarketSnapshot>,
    /// Total number of snapshots emitted over this synchronizer's lifetime.
    pub total_captured: usize,
}

impl MarketSynchronizer {
    /// Creates an orderbook-driven synchronizer with a `period_ns` grid.
    ///
    /// # Panics
    /// Panics if `period_ns` is zero.
    pub fn new(period_ns: u64) -> Self {
        Self::with_clock_mode(period_ns, ClockMode::OrderbookDriven)
    }

    /// Creates a synchronizer with an explicit [`ClockMode`].
    ///
    /// # Panics
    /// Panics if `period_ns` is zero.
    pub fn with_clock_mode(period_ns: u64, clock_mode: ClockMode) -> Self {
        assert!(period_ns > 0, "period_ns must be positive");
        Self {
            period_ns,
            clock_mode,
            last_period: HashMap::new(),
            clock_period: None,
            last_orderbook: HashMap::new(),
            latest_orderbook_symbol: None,
            current_trades: Vec::new(),
            current_liquidations: Vec::new(),
            current_funding_rates: Vec::new(),
            current_open_interests: Vec::new(),
            buffer: Vec::new(),
            total_captured: 0,
        }
    }

    /// Creates a synchronizer whose grid advances only via [`Self::on_time`].
    #[inline]
    pub fn external_clock(period_ns: u64) -> Self {
        Self::with_clock_mode(period_ns, ClockMode::ExternalClock)
    }

    /// Creates a synchronizer whose grid is advanced by trade timestamps.
    #[inline]
    pub fn trade_driven(period_ns: u64) -> Self {
        Self::with_clock_mode(period_ns, ClockMode::TradeDriven)
    }

    /// Creates a synchronizer whose grid is advanced by liquidation timestamps.
    #[inline]
    pub fn liquidation_driven(period_ns: u64) -> Self {
        Self::with_clock_mode(period_ns, ClockMode::LiquidationDriven)
    }

    /// The clock mode this synchronizer was built with.
    #[inline]
    pub fn clock_mode(&self) -> ClockMode {
        self.clock_mode
    }

    /// Returns `orderbook` cloned and re-stamped to grid timestamp `ts`, if present.
    fn restamped(orderbook: Option<&Orderbook>, ts: u64) -> Option<Orderbook> {
        orderbook.map(|ob| {
            let mut ob = ob.clone();
            ob.orderbook_ts = ts;
            ob
        })
    }

    /// Records `snapshot` as the latest book for `symbol` and as the most
    /// recently received book overall.
    fn store_orderbook(&mut self, symbol: &str, snapshot: Orderbook) {
        self.last_orderbook.insert(symbol.to_string(), snapshot);
        if self.latest_orderbook_symbol.as_deref() != Some(symbol) {
            self.latest_orderbook_symbol = Some(symbol.to_string());
        }
    }

    /// The most recently received orderbook across all symbols, if any.
    fn latest_orderbook(&self) -> Option<&Orderbook> {
        self.latest_orderbook_symbol
            .as_deref()
            .and_then(|symbol| self.last_orderbook.get(symbol))
    }

    /// Moves every pending (not yet snapshotted) event out of the synchronizer.
    fn take_pending(
        &mut self,
    ) -> (Vec<Trade>, Vec<Liquidation>, Vec<FundingRate>, Vec<OpenInterest>) {
        (
            std::mem::take(&mut self.current_trades),
            std::mem::take(&mut self.current_liquidations),
            std::mem::take(&mut self.current_funding_rates),
            std::mem::take(&mut self.current_open_interests),
        )
    }

    /// Number of periods to emit when moving from `prev_period` to
    /// `current_period` (requires `current_period > prev_period`), capped at
    /// `MAX_GAP_FILL`. Logs a warning tagged with `label` when capping.
    fn capped_gap(label: &str, prev_period: u64, current_period: u64) -> u64 {
        let gap = current_period - prev_period;
        if gap > MAX_GAP_FILL {
            warn!(
                "[{}] gap of {} periods exceeds MAX_GAP_FILL ({}), capping",
                label, gap, MAX_GAP_FILL,
            );
        }
        gap.min(MAX_GAP_FILL)
    }

    /// Emits one snapshot per period in `prev_period + 1 ..= prev_period + fill_count`,
    /// each stamped at its period boundary `p * period_ns`.
    ///
    /// Every snapshot carries `orderbook` re-stamped to that boundary. All
    /// pending events are moved into the first snapshot; later gap-fill
    /// snapshots carry no events. Returns the number of snapshots emitted.
    fn emit_periods(
        &mut self,
        prev_period: u64,
        fill_count: u64,
        orderbook: Option<Orderbook>,
    ) -> usize {
        let mut pending = Some(self.take_pending());
        self.buffer.reserve(fill_count as usize);
        for p in (prev_period + 1)..=(prev_period + fill_count) {
            let ts = p * self.period_ns;
            // `take()` yields the pending events once; subsequent periods get empty vectors.
            let (trades, liquidations, funding_rate, open_interest) =
                pending.take().unwrap_or_default();
            self.buffer.push(MarketSnapshot {
                ts_ns: ts,
                orderbook: Self::restamped(orderbook.as_ref(), ts),
                trades,
                liquidations,
                funding_rate,
                open_interest,
            });
        }
        let emitted = fill_count as usize;
        self.total_captured += emitted;
        emitted
    }

    /// Advances the shared clock to the period containing `ts_ns`.
    ///
    /// The first call only records the period. Later calls that land in a later
    /// period close every intervening period (at most `MAX_GAP_FILL` of them,
    /// starting right after the previous period). Each closed period is given
    /// the most recently received orderbook. Returns the number of snapshots emitted.
    fn advance_grid(&mut self, ts_ns: u128) -> usize {
        let current_period = (ts_ns / self.period_ns as u128) as u64;
        let prev_period = match self.clock_period {
            Some(p) => p,
            None => {
                self.clock_period = Some(current_period);
                return 0;
            }
        };
        if current_period <= prev_period {
            return 0;
        }
        let fill_count = Self::capped_gap("clock", prev_period, current_period);
        let latest_ob = self.latest_orderbook().cloned();
        let emitted = self.emit_periods(prev_period, fill_count, latest_ob);
        self.clock_period = Some(current_period);
        emitted
    }

    /// Feeds an external clock tick at `ts_ns` nanoseconds.
    ///
    /// Has an effect only in [`ClockMode::ExternalClock`]; otherwise returns 0.
    /// Returns the number of snapshots emitted.
    pub fn on_time(&mut self, ts_ns: u64) -> usize {
        if self.clock_mode != ClockMode::ExternalClock {
            return 0;
        }
        self.advance_grid(ts_ns as u128)
    }

    /// Records an orderbook update for `symbol` at `exchange_ts_ms` milliseconds.
    ///
    /// In [`ClockMode::OrderbookDriven`], crossing into a later period for this
    /// symbol closes the intervening periods using the book as it stood *before*
    /// this update. In other modes the book is only stored. Returns the number
    /// of snapshots emitted.
    pub fn on_orderbook(
        &mut self,
        symbol: &str,
        exchange_ts_ms: u64,
        snapshot: Orderbook,
    ) -> usize {
        if self.clock_mode != ClockMode::OrderbookDriven {
            self.store_orderbook(symbol, snapshot);
            return 0;
        }
        let ts_ns = exchange_ts_ms as u128 * 1_000_000;
        let current_period = (ts_ns / self.period_ns as u128) as u64;
        let prev_period = match self.last_period.get(symbol) {
            Some(&p) => p,
            None => {
                self.last_period.insert(symbol.to_string(), current_period);
                self.store_orderbook(symbol, snapshot);
                return 0;
            }
        };
        if current_period <= prev_period {
            self.store_orderbook(symbol, snapshot);
            return 0;
        }
        let fill_count = Self::capped_gap(symbol, prev_period, current_period);
        // Closed periods reflect the previous book; the new one belongs to `current_period`.
        let prev_ob = self.last_orderbook.get(symbol).cloned();
        let emitted = self.emit_periods(prev_period, fill_count, prev_ob);
        self.last_period.insert(symbol.to_string(), current_period);
        self.store_orderbook(symbol, snapshot);
        emitted
    }

    /// Buffers a trade. In [`ClockMode::TradeDriven`], its timestamp
    /// (`trade_ts`, milliseconds) also advances the grid.
    ///
    /// Returns the number of snapshots emitted.
    pub fn on_trade(&mut self, trade: Trade) -> usize {
        // Advance first: a trade that crosses a period boundary belongs to the
        // new period, not to the snapshots being closed (mirrors `on_orderbook`).
        let emitted = if self.clock_mode == ClockMode::TradeDriven {
            self.advance_grid(trade.trade_ts as u128 * 1_000_000)
        } else {
            0
        };
        self.current_trades.push(trade);
        emitted
    }

    /// Buffers a liquidation. In [`ClockMode::LiquidationDriven`], its
    /// timestamp (`liquidation_ts`, milliseconds) also advances the grid.
    ///
    /// Returns the number of snapshots emitted.
    pub fn on_liquidation(&mut self, liq: Liquidation) -> usize {
        // Advance first so the boundary-crossing liquidation lands in the new period.
        let emitted = if self.clock_mode == ClockMode::LiquidationDriven {
            self.advance_grid(liq.liquidation_ts as u128 * 1_000_000)
        } else {
            0
        };
        self.current_liquidations.push(liq);
        emitted
    }

    /// Buffers a funding-rate update; never advances the grid.
    pub fn on_funding(&mut self, fr: FundingRate) {
        self.current_funding_rates.push(fr);
    }

    /// Buffers an open-interest update; never advances the grid.
    pub fn on_open_interest(&mut self, oi: OpenInterest) {
        self.current_open_interests.push(oi);
    }

    /// Closes the currently open period(s), emitting a final snapshot stamped at
    /// the next period boundary.
    ///
    /// Period state is not advanced, so calling this more than once emits
    /// another snapshot at the same timestamp. If no clock event has been seen
    /// yet, nothing is emitted and pending events are kept.
    pub fn finalize(&mut self) {
        match self.clock_mode {
            ClockMode::OrderbookDriven => self.finalize_ob_driven(),
            ClockMode::TradeDriven | ClockMode::LiquidationDriven | ClockMode::ExternalClock => {
                self.finalize_global_clock()
            }
        }
    }

    /// Emits one closing snapshot per tracked symbol, ordered by (period, symbol).
    /// Pending events go into the first snapshot only, so they are not duplicated
    /// across symbols.
    fn finalize_ob_driven(&mut self) {
        if self.last_period.is_empty() {
            // No symbol has opened a period yet; keep pending events rather than dropping them.
            return;
        }
        // Sort for a deterministic output order (HashMap iteration order is arbitrary).
        let mut periods: Vec<(u64, String)> = self
            .last_period
            .iter()
            .map(|(s, &p)| (p, s.clone()))
            .collect();
        periods.sort_unstable();
        let mut pending = Some(self.take_pending());
        self.buffer.reserve(periods.len());
        for (period, symbol) in periods {
            let ts = (period + 1) * self.period_ns;
            let orderbook = Self::restamped(self.last_orderbook.get(&symbol), ts);
            let (trades, liquidations, funding_rate, open_interest) =
                pending.take().unwrap_or_default();
            self.buffer.push(MarketSnapshot {
                ts_ns: ts,
                orderbook,
                trades,
                liquidations,
                funding_rate,
                open_interest,
            });
            self.total_captured += 1;
        }
    }

    /// Emits a single closing snapshot for the shared clock's current period,
    /// using the most recently received orderbook and all pending events.
    fn finalize_global_clock(&mut self) {
        let period = match self.clock_period {
            Some(p) => p,
            // Clock never started: nothing to close; pending events are retained.
            None => return,
        };
        let ts = (period + 1) * self.period_ns;
        let orderbook = Self::restamped(self.latest_orderbook(), ts);
        let (trades, liquidations, funding_rate, open_interest) = self.take_pending();
        self.buffer.push(MarketSnapshot {
            ts_ns: ts,
            orderbook,
            trades,
            liquidations,
            funding_rate,
            open_interest,
        });
        self.total_captured += 1;
    }

    /// Removes and returns all buffered snapshots.
    pub fn drain(&mut self) -> Vec<MarketSnapshot> {
        std::mem::take(&mut self.buffer)
    }

    /// Number of snapshots currently buffered.
    #[inline]
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Total number of snapshots emitted so far (not reset by draining).
    #[inline]
    pub fn total_captured(&self) -> usize {
        self.total_captured
    }

    /// Drains the buffer and converts each snapshot into a `MarketAggregate`,
    /// threading the previous aggregate's `oi_contracts` (starting at 0.0).
    pub fn market_aggregate(
        &mut self,
    ) -> Vec<crate::snapshots::aggregate::MarketAggregate> {
        let snapshots = std::mem::take(&mut self.buffer);
        Self::aggregate_snapshots(&snapshots)
    }

    /// Aggregates `snapshots` in order without consuming them; the first
    /// aggregate sees a previous open interest of 0.0.
    fn aggregate_snapshots(
        snapshots: &[MarketSnapshot],
    ) -> Vec<crate::snapshots::aggregate::MarketAggregate> {
        use crate::snapshots::aggregate::MarketAggregate;
        let mut aggregates = Vec::with_capacity(snapshots.len());
        let mut prev_oi: f64 = 0.0;
        for snap in snapshots {
            let agg = MarketAggregate::from_snapshot(snap, prev_oi);
            prev_oi = agg.oi_contracts;
            aggregates.push(agg);
        }
        aggregates
    }

    /// Writes the buffered snapshots' contents to parquet files under
    /// `output_dir` (`orderbooks/`, `trades/`, `liquidations/`, `fundings/`,
    /// `open_interests/`). Families with no rows are skipped.
    ///
    /// # Errors
    /// Returns any directory-creation or write error. The buffer is cleared only
    /// after every write succeeds, so a failed flush can be retried. Files that
    /// were written before the failure stay on disk.
    ///
    /// NOTE(review): this variant returns `anyhow::Result` while the
    /// feature-disabled variant returns `PersistError`. Callers see different
    /// types depending on the `parquet` feature.
    #[cfg(feature = "parquet")]
    pub fn flush_to_parquet(
        &mut self,
        output_dir: &std::path::Path,
    ) -> anyhow::Result<FlushResult> {
        use crate::{
            funding::io::funding_parquet::write_funding_parquet_timestamped,
            liquidations::io::liq_parquet::write_liquidations_parquet_timestamped,
            open_interest::io::oi_parquet::write_oi_parquet_timestamped,
            orderbooks::io::ob_parquet::write_ob_parquet,
            trades::io::trades_parquet::write_trades_parquet_timestamped,
        };
        let n = self.buffer.len();
        let mut orderbooks: Vec<Orderbook> = Vec::with_capacity(n);
        let mut trades: Vec<Trade> = Vec::new();
        let mut liquidations: Vec<Liquidation> = Vec::new();
        let mut funding_rates: Vec<FundingRate> = Vec::new();
        let mut open_interests: Vec<OpenInterest> = Vec::new();
        for snap in &self.buffer {
            orderbooks.extend(snap.orderbook.iter().cloned());
            trades.extend(snap.trades.iter().cloned());
            liquidations.extend(snap.liquidations.iter().cloned());
            funding_rates.extend(snap.funding_rate.iter().cloned());
            open_interests.extend(snap.open_interest.iter().cloned());
        }
        let mut result = FlushResult {
            snapshot_count: n,
            ..Default::default()
        };
        if !orderbooks.is_empty() {
            let orderbooks_dir = output_dir.join("orderbooks");
            std::fs::create_dir_all(&orderbooks_dir)?;
            let path = write_ob_parquet(&orderbooks, &orderbooks_dir, "sync")?;
            result.orderbook_path = Some(path);
        }
        if !trades.is_empty() {
            let trades_dir = output_dir.join("trades");
            std::fs::create_dir_all(&trades_dir)?;
            let path = write_trades_parquet_timestamped(&trades, &trades_dir, "sync")?;
            result.trades_path = Some(path);
        }
        if !liquidations.is_empty() {
            let liquidations_dir = output_dir.join("liquidations");
            std::fs::create_dir_all(&liquidations_dir)?;
            let path =
                write_liquidations_parquet_timestamped(&liquidations, &liquidations_dir, "sync")?;
            result.liquidations_path = Some(path);
        }
        if !funding_rates.is_empty() {
            let fundings_dir = output_dir.join("fundings");
            std::fs::create_dir_all(&fundings_dir)?;
            let path = write_funding_parquet_timestamped(&funding_rates, &fundings_dir, "sync")?;
            result.funding_path = Some(path);
        }
        if !open_interests.is_empty() {
            let open_interests_dir = output_dir.join("open_interests");
            std::fs::create_dir_all(&open_interests_dir)?;
            let path =
                write_oi_parquet_timestamped(&open_interests, &open_interests_dir, "sync")?;
            result.open_interest_path = Some(path);
        }
        // Every write succeeded: only now is it safe to discard the buffered snapshots.
        self.buffer.clear();
        Ok(result)
    }

    /// Always fails: parquet support is not compiled in. The buffer is left untouched.
    #[cfg(not(feature = "parquet"))]
    pub fn flush_to_parquet(
        &mut self,
        _output_dir: &std::path::Path,
    ) -> Result<FlushResult, crate::errors::PersistError> {
        Err(crate::errors::PersistError::UnsupportedFormat(
            "parquet support not compiled in (enable 'parquet' feature)".to_string(),
        ))
    }

    /// Aggregates the buffered snapshots and writes them via
    /// `write_market_aggregate_parquet`, returning the written path.
    ///
    /// # Errors
    /// Propagates the writer's error. The buffer is cleared only on success.
    #[cfg(feature = "parquet")]
    pub fn flush_aggregate_to_parquet(
        &mut self,
        output_dir: &std::path::Path,
    ) -> Result<std::path::PathBuf, crate::errors::PersistError> {
        use crate::snapshots::aggregate::write_market_aggregate_parquet;
        let aggregates = Self::aggregate_snapshots(&self.buffer);
        let path = write_market_aggregate_parquet(&aggregates, output_dir)?;
        self.buffer.clear();
        Ok(path)
    }

    /// Always fails: parquet support is not compiled in. The buffer is left untouched.
    #[cfg(not(feature = "parquet"))]
    pub fn flush_aggregate_to_parquet(
        &mut self,
        _output_dir: &std::path::Path,
    ) -> Result<std::path::PathBuf, crate::errors::PersistError> {
        Err(crate::errors::PersistError::UnsupportedFormat(
            "parquet support not compiled in (enable 'parquet' feature)".to_string(),
        ))
    }
}