atelier_data 0.0.15

Data Artifacts and I/O for the atelier-rs engine
//! src/orderbooks/sync.rs
//!
//! Time-synchronized orderbook snapshot sampling.
//!
//! Produces uniformly-spaced `Orderbook` snapshots from an irregular stream
//! of full-book updates by selecting the most recent snapshot at each discrete
//! grid point.
//!
//! # Model
//!
//! ```text
//!   stream:  S₁  S₂     S₃  S₄  S₅        S₆   S₇
//!   time: ───┼───┼───────┼───┼───┼──────────┼────┼──→
//!   grid: ───────|───────────|───────────|───────────|
//!            t₀         t₁         t₂         t₃
//!
//!   output:       S₂          S₅          S₇
//!                (@ t₁)      (@ t₂)      (@ t₃ via finalize)
//! ```
//!
//! At each grid boundary, the **last snapshot received before the crossing**
//! is emitted with its `orderbook_ts` reassigned to the grid-aligned
//! nanosecond timestamp. Gaps (periods with no updates) are forward-filled
//! from the previous snapshot.

use crate::{
    Level, OrderSide,
    orderbooks::{Orderbook, OrderbookDelta},
    utils::decimal_to_f64,
};

use std::collections::HashMap;
use tracing::warn;

/// Snapshot both sides of an `OrderbookDelta` as `(bids, asks)` level vectors.
///
/// Every `(price, size)` pair the delta manager reports for its full bid and
/// ask depth becomes one `Level`, numbered by its position in the sequence
/// returned by `top_bids` / `top_asks`. Prices and sizes are converted with
/// `decimal_to_f64`. The per-level orders vector is always left empty, since
/// only aggregate price/size pairs are read here.
pub fn capture_levels(ob: &OrderbookDelta) -> (Vec<Level>, Vec<Level>) {
    // Bid side: request as many levels as the book currently holds.
    let bid_rows = ob.top_bids(ob.bid_depth());
    let mut bids: Vec<Level> = Vec::new();
    for (position, (price, size)) in bid_rows.iter().enumerate() {
        let level = Level::new(
            position as u32,
            OrderSide::Bids,
            decimal_to_f64(*price),
            decimal_to_f64(*size),
            Vec::new(),
        );
        bids.push(level);
    }

    // Ask side: same conversion, tagged with the opposite side.
    let ask_rows = ob.top_asks(ob.ask_depth());
    let mut asks: Vec<Level> = Vec::new();
    for (position, (price, size)) in ask_rows.iter().enumerate() {
        let level = Level::new(
            position as u32,
            OrderSide::Asks,
            decimal_to_f64(*price),
            decimal_to_f64(*size),
            Vec::new(),
        );
        asks.push(level);
    }

    (bids, asks)
}

/// Maximum number of grid periods forward-filled for a single boundary crossing.
///
/// When the gap between two consecutive updates for a symbol spans more
/// periods than this, the number of emitted snapshots is capped at this value.
/// Prevents unbounded memory growth from network disconnects.
const MAX_GAP_FILL: u64 = 10_000;

/// Produces uniformly-spaced `Orderbook` snapshots from an irregular update stream.
///
/// Operates on a discrete time grid with spacing `period_ns`. On each incoming
/// update, checks whether one or more grid points have been crossed since the
/// last update for that symbol. If so, a full `Orderbook` is buffered for each
/// crossed grid point (capped at `MAX_GAP_FILL` per crossing).
///
/// Each buffered `Orderbook` has its `orderbook_ts` set to a grid-aligned
/// nanosecond timestamp (i.e. `period_index × period_ns`).
///
/// # Timestamp Resolution
///
/// Exchange timestamps are supplied with millisecond resolution. The effective
/// minimum period is therefore 1ms = 1_000_000 ns. Periods finer than 1ms
/// will still work but cannot distinguish sub-millisecond ordering.
pub struct ObSynchronizer {
    /// Grid spacing in nanoseconds. Must be non-zero; it is used as a divisor.
    pub period_ns: u64,
    /// Per symbol: period index of the most recent update that anchored or
    /// advanced the grid.
    pub last_period: HashMap<String, u64>,
    /// Most recent snapshot per symbol (refreshed on every `on_snapshot` call).
    last_snapshot: HashMap<String, Orderbook>,
    /// Buffered full orderbook snapshots awaiting flush
    pub buffer: Vec<Orderbook>,
    /// Lifetime capture count (across flushes)
    pub total_captured: usize,
}

impl ObSynchronizer {
    /// Create a synchronizer sampling on a grid of `period_ns` nanoseconds.
    ///
    /// # Panics
    ///
    /// Panics if `period_ns` is zero.
    pub fn new(period_ns: u64) -> Self {
        assert!(period_ns > 0, "period_ns must be positive");
        Self {
            period_ns,
            last_period: HashMap::new(),
            last_snapshot: HashMap::new(),
            buffer: Vec::new(),
            total_captured: 0,
        }
    }

    /// Map an exchange timestamp in milliseconds to its grid period index.
    ///
    /// The millisecond → nanosecond conversion is done in `u128` so the
    /// intermediate product cannot overflow for any `u64` input.
    fn period_index(&self, exchange_ts_ms: u64) -> u64 {
        let ts_ns = u128::from(exchange_ts_ms) * 1_000_000;
        // Narrowing cast kept from the original snapshot path; the quotient
        // fits in `u64` for any millisecond timestamp when `period_ns` is
        // at least 1ms.
        (ts_ns / u128::from(self.period_ns)) as u64
    }

    /// Process an incoming update. Call this BEFORE applying the delta to the
    /// orderbook so the captured state reflects the last update in the
    /// completed period, not the first update in the new one.
    ///
    /// `capture_fn` is invoked lazily — only when a grid boundary is crossed —
    /// and returns `(bids, asks)` as `Vec<Level>`. The synchronizer then
    /// constructs full `Orderbook` objects for each crossed grid point.
    ///
    /// The first update seen for a symbol only anchors the grid; updates whose
    /// period index is not greater than the stored one are ignored. When the
    /// gap exceeds `MAX_GAP_FILL` periods, only the first `MAX_GAP_FILL` grid
    /// points after the previous period are emitted and a warning is logged.
    ///
    /// Returns the number of snapshots captured (0 if still within the same period).
    pub fn on_update<F>(
        &mut self,
        symbol: &str,
        exchange: &str,
        exchange_ts_ms: u64,
        capture_fn: F,
    ) -> usize
    where
        F: FnOnce() -> (Vec<Level>, Vec<Level>),
    {
        let current_period = self.period_index(exchange_ts_ms);

        let prev_period = match self.last_period.get(symbol) {
            Some(&p) => p,
            None => {
                // First update for this symbol — initialize, no capture yet
                self.last_period.insert(symbol.to_string(), current_period);
                return 0;
            }
        };

        if current_period <= prev_period {
            return 0;
        }

        // We crossed at least one grid boundary. Capture the current state.
        let gap = current_period - prev_period;
        let (bids, asks) = capture_fn();

        let fill_count = gap.min(MAX_GAP_FILL);
        if gap > MAX_GAP_FILL {
            // The range below starts right after `prev_period`, so it is the
            // FIRST `MAX_GAP_FILL` grid points of the gap that get filled.
            warn!(
                "[{}] Gap of {} periods exceeds MAX_GAP_FILL ({}), \
                 filling first {} only",
                symbol, gap, MAX_GAP_FILL, MAX_GAP_FILL
            );
        }

        // Build a full Orderbook for each crossed grid point.
        // All share the same book state; only `orderbook_ts` differs.
        let first = prev_period + 1;
        let last = prev_period + fill_count; // inclusive
        let period_ns = self.period_ns;
        self.buffer.extend((first..=last).map(|p| {
            Orderbook::new(
                // NOTE(review): first argument is passed as a constant 0 —
                // presumably an id placeholder; confirm against `Orderbook::new`.
                0,
                p * period_ns,
                symbol.to_string(),
                exchange.to_string(),
                bids.clone(),
                asks.clone(),
            )
        }));

        self.last_period.insert(symbol.to_string(), current_period);
        // `fill_count` is bounded by `MAX_GAP_FILL`, so the cast is lossless.
        self.total_captured += fill_count as usize;
        fill_count as usize
    }

    /// Feed a full-book snapshot received from the exchange.
    ///
    /// `exchange_ts_ms` is the exchange-reported timestamp in **milliseconds**
    /// (i.e. `BybitOrderbookResponse.orderbook_ts`).
    ///
    /// If one or more grid boundaries have been crossed since the previous
    /// call for this symbol, the **previously stored** snapshot is emitted
    /// for each crossed grid point (forward-filled for gaps, capped at
    /// `MAX_GAP_FILL`). The incoming `snapshot` then replaces the stored one.
    ///
    /// If the symbol has a grid anchor but no stored snapshot (for example it
    /// was first seen through `on_update`), nothing can be emitted: the grid
    /// still advances and the snapshot is stored, but the count is 0.
    ///
    /// Returns the number of grid-aligned snapshots appended to the buffer
    /// (0 if still within the same period).
    pub fn on_snapshot(
        &mut self,
        symbol: &str,
        exchange_ts_ms: u64,
        snapshot: Orderbook,
    ) -> usize {
        let current_period = self.period_index(exchange_ts_ms);

        let prev_period = match self.last_period.get(symbol) {
            Some(&p) => p,
            None => {
                // First update for this symbol — anchor the grid.
                self.last_period.insert(symbol.to_string(), current_period);
                self.last_snapshot.insert(symbol.to_string(), snapshot);
                return 0;
            }
        };

        if current_period <= prev_period {
            // Same period — just refresh the stored snapshot.
            self.last_snapshot.insert(symbol.to_string(), snapshot);
            return 0;
        }

        // Grid boundary crossed
        let gap = current_period - prev_period;
        let fill_count = gap.min(MAX_GAP_FILL);

        if gap > MAX_GAP_FILL {
            warn!(
                "[{}] gap of {} periods exceeds MAX_GAP_FILL ({}), capping",
                symbol, gap, MAX_GAP_FILL,
            );
        }

        // Emit the previous snapshot at each crossed grid point. Count only
        // what is actually pushed so the return value and `total_captured`
        // stay truthful when there is no previous snapshot to replay.
        let period_ns = self.period_ns;
        let emitted = match self.last_snapshot.get(symbol) {
            Some(prev_ob) => {
                let crossed = (prev_period + 1)..=(prev_period + fill_count);
                self.buffer.extend(crossed.map(|p| {
                    let mut ob = prev_ob.clone();
                    ob.orderbook_ts = p * period_ns;
                    ob
                }));
                // `fill_count` is bounded by `MAX_GAP_FILL`; cast is lossless.
                fill_count as usize
            }
            None => 0,
        };

        self.last_period.insert(symbol.to_string(), current_period);
        self.last_snapshot.insert(symbol.to_string(), snapshot);
        self.total_captured += emitted;
        emitted
    }

    /// Emit the final snapshot for every tracked symbol, closing the
    /// current (incomplete) period. Call when the stream ends.
    ///
    /// For each symbol that has a stored snapshot, a copy is buffered with
    /// `orderbook_ts` set to the next grid point after its last period.
    /// Symbols without a stored snapshot are skipped. The tracking state is
    /// not modified, so calling this twice buffers the final snapshots twice.
    pub fn finalize(&mut self) {
        // The maps, the buffer and the counter are distinct fields, so they
        // can be read and written side by side without copying the keys.
        for (symbol, &period) in &self.last_period {
            if let Some(ob) = self.last_snapshot.get(symbol) {
                let mut final_ob = ob.clone();
                final_ob.orderbook_ts = (period + 1) * self.period_ns;
                self.buffer.push(final_ob);
                self.total_captured += 1;
            }
        }
    }

    /// Number of snapshots currently held in the buffer.
    #[inline]
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Lifetime number of captured snapshots (not reset by draining).
    #[inline]
    pub fn total_captured(&self) -> usize {
        self.total_captured
    }

    /// Drain the buffer, returning all accumulated snapshots.
    ///
    /// The internal buffer is left empty.
    pub fn drain(&mut self) -> Vec<Orderbook> {
        std::mem::take(&mut self.buffer)
    }

    /// Drain the buffer and write all accumulated snapshots to Parquet.
    ///
    /// The buffer is emptied before the write is attempted, so the drained
    /// snapshots are not put back if `write_ob_parquet` returns an error.
    /// Returns whatever path `write_ob_parquet` reports.
    #[cfg(feature = "parquet")]
    pub fn flush_to_parquet(
        &mut self,
        output_dir: &std::path::Path,
    ) -> anyhow::Result<std::path::PathBuf> {
        use crate::orderbooks::io::ob_parquet::write_ob_parquet;
        let snapshots = std::mem::take(&mut self.buffer);
        // NOTE(review): "sync" is presumably a file-name label — confirm
        // against `write_ob_parquet`.
        write_ob_parquet(&snapshots, output_dir, "sync")
    }
}