use crate::{
funding::FundingRate, liquidations::Liquidation, open_interest::OpenInterest,
orderbooks::Orderbook, snapshots::MarketSnapshot, trades::Trade,
};
/// Kinds of market events that can be configured as snapshot triggers
/// ("reference events") for `EventSynchronizer`.
///
/// When an event of a configured kind is fed to the synchronizer (and it has
/// already seen an orderbook), a `MarketSnapshot` is appended to its buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReferenceEventType {
/// Orderbook updates, delivered through `EventSynchronizer::on_orderbook`.
Orderbook,
/// Trades, delivered through `EventSynchronizer::on_trade`.
Trade,
/// Liquidations, delivered through `EventSynchronizer::on_liquidation`.
Liquidation,
}
/// Collects orderbook, trade, liquidation, funding-rate and open-interest
/// events and turns them into `MarketSnapshot`s whenever a configured
/// reference event arrives.
///
/// Emitted snapshots accumulate in `buffer` until they are taken out with
/// `drain`.
pub struct EventSynchronizer {
/// Event kinds whose arrival triggers a snapshot.
reference_events: Vec<ReferenceEventType>,
/// Set once the first orderbook has been received; trades and liquidations
/// do not trigger snapshots before that.
initialized: bool,
/// Most recent orderbook received; a clone of it goes into every snapshot.
last_orderbook: Option<Orderbook>,
/// Trades received since the last emitted snapshot (moved into the next one).
current_trades: Vec<Trade>,
/// Liquidations received since the last emitted snapshot (moved into the next one).
current_liquidations: Vec<Liquidation>,
/// Holds only the latest funding rate received (each update replaces the
/// previous content); cloned into every snapshot.
current_funding_rates: Vec<FundingRate>,
/// Holds only the latest open-interest value received (each update replaces
/// the previous content); cloned into every snapshot.
current_open_interests: Vec<OpenInterest>,
/// Snapshots emitted so far and not yet drained.
pub buffer: Vec<MarketSnapshot>,
/// Number of snapshots emitted since construction; draining the buffer does
/// not reset it.
pub total_captured: usize,
}
impl EventSynchronizer {
    /// Creates a synchronizer that emits a snapshot whenever an event of one of
    /// the `reference_events` kinds arrives.
    ///
    /// # Panics
    ///
    /// Panics if `reference_events` is empty.
    pub fn new(reference_events: Vec<ReferenceEventType>) -> Self {
        assert!(
            !reference_events.is_empty(),
            "at least one reference event type is required"
        );
        Self {
            reference_events,
            initialized: false,
            last_orderbook: None,
            current_trades: Vec::new(),
            current_liquidations: Vec::new(),
            current_funding_rates: Vec::new(),
            current_open_interests: Vec::new(),
            buffer: Vec::new(),
            total_captured: 0,
        }
    }

    /// Synchronizer that snapshots on every orderbook update only.
    pub fn orderbook_only() -> Self {
        Self::new(vec![ReferenceEventType::Orderbook])
    }

    /// Synchronizer that snapshots on every trade only.
    pub fn trade_only() -> Self {
        Self::new(vec![ReferenceEventType::Trade])
    }

    /// Synchronizer that snapshots on every liquidation only.
    pub fn liquidation_only() -> Self {
        Self::new(vec![ReferenceEventType::Liquidation])
    }

    /// Synchronizer that snapshots on orderbook updates, trades and liquidations.
    pub fn all_events() -> Self {
        Self::new(vec![
            ReferenceEventType::Orderbook,
            ReferenceEventType::Trade,
            ReferenceEventType::Liquidation,
        ])
    }

    /// Returns `true` if `event_type` was configured as a snapshot trigger.
    #[inline]
    fn is_reference(&self, event_type: ReferenceEventType) -> bool {
        self.reference_events.contains(&event_type)
    }

    /// Converts a trade/liquidation timestamp to the snapshot `ts_ns` scale.
    ///
    /// The factor of 1_000_000 implies the event timestamps are milliseconds
    /// (NOTE(review): confirm against the feed). The multiplication saturates
    /// so that an out-of-range timestamp clamps to `u64::MAX` instead of
    /// panicking in debug builds or wrapping to an arbitrary value in release.
    #[inline]
    fn event_ts_to_ns(event_ts: u64) -> u64 {
        event_ts.saturating_mul(1_000_000)
    }

    /// Appends a snapshot stamped `ts_ns` to the buffer.
    ///
    /// Pending trades and liquidations are moved into the snapshot (leaving the
    /// pending lists empty); the last orderbook, funding rate and open interest
    /// are cloned so they carry over to later snapshots. The cloned orderbook's
    /// `orderbook_ts` is overwritten with `ts_ns`.
    fn emit_snapshot(&mut self, ts_ns: u64) {
        let orderbook = self.last_orderbook.clone().map(|mut book| {
            book.orderbook_ts = ts_ns;
            book
        });
        let trades = std::mem::take(&mut self.current_trades);
        let liquidations = std::mem::take(&mut self.current_liquidations);
        self.buffer.push(MarketSnapshot {
            ts_ns,
            orderbook,
            trades,
            liquidations,
            funding_rate: self.current_funding_rates.clone(),
            open_interest: self.current_open_interests.clone(),
        });
        self.total_captured += 1;
    }

    /// Records a new orderbook observed at `ts_ns` and marks the synchronizer
    /// as initialized.
    ///
    /// `_symbol` is currently unused. Returns the number of snapshots emitted:
    /// 1 if orderbooks are a reference event, otherwise 0.
    pub fn on_orderbook(&mut self, _symbol: &str, ts_ns: u64, snapshot: Orderbook) -> usize {
        let mut book = snapshot;
        // Keep the stored book on the same clock as every emitted snapshot.
        // Emitted clones get their `orderbook_ts` overwritten anyway, so this
        // only affects `finalize`, which reads the stored timestamp and would
        // otherwise stamp the flush snapshot with the raw feed value.
        book.orderbook_ts = ts_ns;
        self.last_orderbook = Some(book);
        self.initialized = true;

        if self.is_reference(ReferenceEventType::Orderbook) {
            self.emit_snapshot(ts_ns);
            1
        } else {
            0
        }
    }

    /// Queues `trade` for the next snapshot.
    ///
    /// If trades are a reference event and an orderbook has already been seen,
    /// a snapshot stamped with the converted `trade_ts` is emitted immediately.
    /// Trades received before the first orderbook stay queued and end up in
    /// the first snapshot that is emitted. Returns the number of snapshots
    /// emitted (0 or 1).
    pub fn on_trade(&mut self, trade: Trade) -> usize {
        let ts_ns = Self::event_ts_to_ns(trade.trade_ts);
        self.current_trades.push(trade);

        if self.initialized && self.is_reference(ReferenceEventType::Trade) {
            self.emit_snapshot(ts_ns);
            1
        } else {
            0
        }
    }

    /// Queues `liq` for the next snapshot.
    ///
    /// If liquidations are a reference event and an orderbook has already been
    /// seen, a snapshot stamped with the converted `liquidation_ts` is emitted
    /// immediately. Returns the number of snapshots emitted (0 or 1).
    pub fn on_liquidation(&mut self, liq: Liquidation) -> usize {
        let ts_ns = Self::event_ts_to_ns(liq.liquidation_ts);
        self.current_liquidations.push(liq);

        if self.initialized && self.is_reference(ReferenceEventType::Liquidation) {
            self.emit_snapshot(ts_ns);
            1
        } else {
            0
        }
    }

    /// Replaces the tracked funding rate with `fr`.
    ///
    /// Never emits a snapshot; always returns 0.
    pub fn on_funding(&mut self, fr: FundingRate) -> usize {
        self.current_funding_rates = vec![fr];
        0
    }

    /// Replaces the tracked open interest with `oi`.
    ///
    /// Never emits a snapshot; always returns 0.
    pub fn on_open_interest(&mut self, oi: OpenInterest) -> usize {
        self.current_open_interests = vec![oi];
        0
    }

    /// Flushes trades and liquidations that are still pending into one last
    /// snapshot.
    ///
    /// Does nothing if no orderbook has been received yet or if nothing is
    /// pending (a changed funding rate or open interest alone does not trigger
    /// a flush). The snapshot is stamped with the `ts_ns` of the most recent
    /// orderbook.
    pub fn finalize(&mut self) {
        if !self.initialized {
            return;
        }
        let nothing_pending =
            self.current_trades.is_empty() && self.current_liquidations.is_empty();
        if nothing_pending {
            return;
        }
        let ts_ns = self
            .last_orderbook
            .as_ref()
            .map_or(0, |book| book.orderbook_ts);
        self.emit_snapshot(ts_ns);
    }

    /// Takes all buffered snapshots, leaving the buffer empty.
    ///
    /// `total_captured` is not affected.
    pub fn drain(&mut self) -> Vec<MarketSnapshot> {
        std::mem::take(&mut self.buffer)
    }

    /// Number of snapshots currently waiting in the buffer.
    #[inline]
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Number of snapshots emitted since construction.
    #[inline]
    pub fn total_captured(&self) -> usize {
        self.total_captured
    }
}