rangebar_core/
checkpoint.rs

1//! Checkpoint system for cross-file range bar continuation
2//!
3//! Enables seamless processing across file boundaries by serializing
4//! incomplete bar state with IMMUTABLE thresholds.
5//!
6//! ## Primary Use Case
7//!
8//! ```text
9//! File 1 ends with incomplete bar → save Checkpoint
10//! File 2 starts → load Checkpoint → continue building bar
11//! ```
12//!
13//! ## Key Invariants
14//!
15//! - Thresholds are computed from bar.open and are IMMUTABLE for bar's lifetime
16//! - `bar[i+1].open == bar[i].close` continuity preserved across files
17//! - Works with both Binance (has agg_trade_id) and Exness (timestamp-only)
18
19use crate::fixed_point::FixedPoint;
20use crate::types::RangeBar;
21use ahash::AHasher;
22use serde::{Deserialize, Serialize};
23use std::hash::Hasher;
24use thiserror::Error;
25
/// Capacity of the rolling price window: the hash covers at most this many most-recent prices.
const PRICE_WINDOW_SIZE: usize = 8;
28
29/// Checkpoint for cross-file range bar continuation
30///
31/// Enables seamless processing across any file boundaries (Binance daily, Exness monthly).
32/// Captures minimal state needed to continue building an incomplete bar.
33///
34/// # Example
35///
36/// ```ignore
37/// // Process first file
38/// let bars_1 = processor.process_agg_trade_records(&file1_trades)?;
39/// let checkpoint = processor.create_checkpoint("BTCUSDT");
40///
41/// // Serialize and save checkpoint
42/// let json = serde_json::to_string(&checkpoint)?;
43/// std::fs::write("checkpoint.json", json)?;
44///
45/// // ... later, load checkpoint and continue processing ...
46/// let json = std::fs::read_to_string("checkpoint.json")?;
47/// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
48/// let mut processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
49/// let bars_2 = processor.process_agg_trade_records(&file2_trades)?;
50/// ```
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct Checkpoint {
53    // === IDENTIFICATION (2 fields) ===
54    /// Symbol being processed (e.g., "BTCUSDT", "EURUSD")
55    pub symbol: String,
56
57    /// Threshold in decimal basis points (v3.0.0+: 0.1bps units)
58    /// Example: 250 = 25bps = 0.25%
59    pub threshold_decimal_bps: u32,
60
61    // === BAR STATE (2 fields) ===
62    /// Incomplete bar at file boundary (None = last bar completed cleanly)
63    /// REUSES existing RangeBar type - no separate BarState needed!
64    pub incomplete_bar: Option<RangeBar>,
65
66    /// Fixed thresholds for incomplete bar (computed from bar.open, IMMUTABLE)
67    /// Stored as (upper_threshold, lower_threshold)
68    pub thresholds: Option<(FixedPoint, FixedPoint)>,
69
70    // === POSITION TRACKING (2 fields) ===
71    /// Last processed timestamp in microseconds (universal, works for all sources)
72    pub last_timestamp_us: i64,
73
74    /// Last trade ID (Some for Binance, None for Exness)
75    /// Binance: agg_trade_id is strictly sequential, never resets
76    pub last_trade_id: Option<i64>,
77
78    // === INTEGRITY (1 field) ===
79    /// Price window hash (ahash of last 8 prices for position verification)
80    /// Used to verify we're resuming at the correct position in data stream
81    pub price_hash: u64,
82
83    // === MONITORING (1 field) ===
84    /// Anomaly summary counts for debugging
85    pub anomaly_summary: AnomalySummary,
86}
87
88impl Checkpoint {
89    /// Create a new checkpoint with the given parameters
90    pub fn new(
91        symbol: String,
92        threshold_decimal_bps: u32,
93        incomplete_bar: Option<RangeBar>,
94        thresholds: Option<(FixedPoint, FixedPoint)>,
95        last_timestamp_us: i64,
96        last_trade_id: Option<i64>,
97        price_hash: u64,
98    ) -> Self {
99        Self {
100            symbol,
101            threshold_decimal_bps,
102            incomplete_bar,
103            thresholds,
104            last_timestamp_us,
105            last_trade_id,
106            price_hash,
107            anomaly_summary: AnomalySummary::default(),
108        }
109    }
110
111    /// Check if there's an incomplete bar that needs to continue
112    pub fn has_incomplete_bar(&self) -> bool {
113        self.incomplete_bar.is_some()
114    }
115
116    /// Get the library version that created this checkpoint
117    pub fn library_version() -> &'static str {
118        env!("CARGO_PKG_VERSION")
119    }
120}
121
122/// Anomaly summary for quick inspection (counts only)
123///
124/// Tracks anomalies detected during processing for debugging purposes.
125/// Does NOT affect processing - purely for monitoring.
126#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
127pub struct AnomalySummary {
128    /// Number of gaps detected (missing trade IDs or timestamp jumps)
129    pub gaps_detected: u32,
130
131    /// Number of overlaps detected (duplicate or out-of-order data)
132    pub overlaps_detected: u32,
133
134    /// Number of timestamp anomalies (negative intervals, etc.)
135    pub timestamp_anomalies: u32,
136}
137
138impl AnomalySummary {
139    /// Increment gap counter
140    pub fn record_gap(&mut self) {
141        self.gaps_detected += 1;
142    }
143
144    /// Increment overlap counter
145    pub fn record_overlap(&mut self) {
146        self.overlaps_detected += 1;
147    }
148
149    /// Increment timestamp anomaly counter
150    pub fn record_timestamp_anomaly(&mut self) {
151        self.timestamp_anomalies += 1;
152    }
153
154    /// Check if any anomalies were detected
155    pub fn has_anomalies(&self) -> bool {
156        self.gaps_detected > 0 || self.overlaps_detected > 0 || self.timestamp_anomalies > 0
157    }
158
159    /// Get total anomaly count
160    pub fn total(&self) -> u32 {
161        self.gaps_detected + self.overlaps_detected + self.timestamp_anomalies
162    }
163}
164
/// Outcome of checking the resume position against a checkpoint.
#[derive(Debug, Clone, PartialEq)]
pub enum PositionVerification {
    /// The trade ID is exactly the expected one (Binance: `last_id + 1`).
    Exact,

    /// A jump in trade IDs was found (Binance only).
    ///
    /// Carries the ID that was expected, the ID actually seen, and how many
    /// trades are missing in between.
    Gap {
        expected_id: i64,
        actual_id: i64,
        missing_count: i64,
    },

    /// No trade ID is available, so only the timestamp was checked (Exness).
    ///
    /// Carries the gap since the last checkpoint, in milliseconds.
    TimestampOnly { gap_ms: i64 },
}
183
184impl PositionVerification {
185    /// Check if position verification indicates a data gap
186    pub fn has_gap(&self) -> bool {
187        matches!(self, PositionVerification::Gap { .. })
188    }
189}
190
191/// Checkpoint-related errors
192#[derive(Error, Debug, Clone, PartialEq)]
193pub enum CheckpointError {
194    /// Symbol mismatch between checkpoint and processor
195    #[error("Symbol mismatch: checkpoint has '{checkpoint}', expected '{expected}'")]
196    SymbolMismatch {
197        checkpoint: String,
198        expected: String,
199    },
200
201    /// Threshold mismatch between checkpoint and processor
202    #[error(
203        "Threshold mismatch: checkpoint has {checkpoint} decimal bps, expected {expected} decimal bps"
204    )]
205    ThresholdMismatch { checkpoint: u32, expected: u32 },
206
207    /// Price hash mismatch indicates wrong position in data stream
208    #[error("Price hash mismatch: checkpoint has {checkpoint}, computed {computed}")]
209    PriceHashMismatch { checkpoint: u64, computed: u64 },
210
211    /// Checkpoint has incomplete bar but no thresholds
212    #[error("Checkpoint has incomplete bar but missing thresholds - corrupted checkpoint")]
213    MissingThresholds,
214
215    /// Checkpoint serialization/deserialization error
216    #[error("Checkpoint serialization error: {message}")]
217    SerializationError { message: String },
218}
219
220/// Price window for computing position verification hash
221///
222/// Maintains a circular buffer of the last N prices for hash computation.
223#[derive(Debug, Clone)]
224pub struct PriceWindow {
225    prices: [i64; PRICE_WINDOW_SIZE],
226    index: usize,
227    count: usize,
228}
229
230impl Default for PriceWindow {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236impl PriceWindow {
237    /// Create a new empty price window
238    pub fn new() -> Self {
239        Self {
240            prices: [0; PRICE_WINDOW_SIZE],
241            index: 0,
242            count: 0,
243        }
244    }
245
246    /// Add a price to the window (circular buffer)
247    pub fn push(&mut self, price: FixedPoint) {
248        self.prices[self.index] = price.0;
249        self.index = (self.index + 1) % PRICE_WINDOW_SIZE;
250        if self.count < PRICE_WINDOW_SIZE {
251            self.count += 1;
252        }
253    }
254
255    /// Compute hash of the price window using ahash
256    ///
257    /// Returns a 64-bit hash that can be used to verify position in data stream.
258    pub fn compute_hash(&self) -> u64 {
259        let mut hasher = AHasher::default();
260
261        // Hash prices in order they were added (oldest to newest)
262        if self.count < PRICE_WINDOW_SIZE {
263            // Buffer not full yet - hash from start
264            for i in 0..self.count {
265                hasher.write_i64(self.prices[i]);
266            }
267        } else {
268            // Buffer full - hash from current index (oldest) around
269            for i in 0..PRICE_WINDOW_SIZE {
270                let idx = (self.index + i) % PRICE_WINDOW_SIZE;
271                hasher.write_i64(self.prices[idx]);
272            }
273        }
274
275        hasher.finish()
276    }
277
278    /// Get the number of prices in the window
279    pub fn len(&self) -> usize {
280        self.count
281    }
282
283    /// Check if the window is empty
284    pub fn is_empty(&self) -> bool {
285        self.count == 0
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// `Checkpoint::new` stores its arguments and starts with a clean anomaly summary.
    #[test]
    fn test_checkpoint_creation() {
        let checkpoint = Checkpoint::new(
            "BTCUSDT".to_string(),
            250, // 25bps
            None,
            None,
            1_640_995_200_000_000, // timestamp in microseconds
            Some(12345),
            0,
        );

        assert_eq!(checkpoint.symbol, "BTCUSDT");
        assert_eq!(checkpoint.threshold_decimal_bps, 250);
        assert!(!checkpoint.has_incomplete_bar());
        assert_eq!(checkpoint.last_trade_id, Some(12345));
        assert_eq!(checkpoint.last_timestamp_us, 1_640_995_200_000_000);
        assert_eq!(checkpoint.anomaly_summary, AnomalySummary::default());
    }

    /// A checkpoint survives a JSON round trip.
    #[test]
    fn test_checkpoint_serialization() {
        let checkpoint = Checkpoint::new(
            "EURUSD".to_string(),
            10, // 1bps
            None,
            None,
            1_640_995_200_000_000,
            None, // Exness has no trade IDs
            12_345_678,
        );

        // Serialize to JSON
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("EURUSD"));
        assert!(json.contains("\"threshold_decimal_bps\":10"));

        // Deserialize back
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.symbol, "EURUSD");
        assert_eq!(restored.threshold_decimal_bps, 10);
        assert_eq!(restored.price_hash, 12_345_678);
        assert_eq!(restored.last_timestamp_us, 1_640_995_200_000_000);
        assert_eq!(restored.last_trade_id, None);
        assert!(!restored.has_incomplete_bar());
        assert_eq!(restored.anomaly_summary, checkpoint.anomaly_summary);
    }

    /// Each `record_*` method bumps only its own counter.
    #[test]
    fn test_anomaly_summary() {
        let mut summary = AnomalySummary::default();
        assert!(!summary.has_anomalies());
        assert_eq!(summary.total(), 0);

        summary.record_gap();
        summary.record_gap();
        summary.record_timestamp_anomaly();

        assert!(summary.has_anomalies());
        assert_eq!(summary.gaps_detected, 2);
        assert_eq!(summary.overlaps_detected, 0);
        assert_eq!(summary.timestamp_anomalies, 1);
        assert_eq!(summary.total(), 3);

        summary.record_overlap();
        assert_eq!(summary.overlaps_detected, 1);
        assert_eq!(summary.total(), 4);
    }

    /// Equal price sequences hash equally; a differing price changes the hash.
    #[test]
    fn test_price_window() {
        let mut window = PriceWindow::new();
        assert!(window.is_empty());

        // Add some prices
        window.push(FixedPoint(5_000_000_000_000)); // 50000.0
        window.push(FixedPoint(5_001_000_000_000)); // 50010.0
        window.push(FixedPoint(5_002_000_000_000)); // 50020.0

        assert_eq!(window.len(), 3);
        assert!(!window.is_empty());

        let hash1 = window.compute_hash();

        // Same prices should produce same hash
        let mut window2 = PriceWindow::new();
        window2.push(FixedPoint(5_000_000_000_000));
        window2.push(FixedPoint(5_001_000_000_000));
        window2.push(FixedPoint(5_002_000_000_000));

        let hash2 = window2.compute_hash();
        assert_eq!(hash1, hash2);

        // Different prices should produce different hash
        let mut window3 = PriceWindow::new();
        window3.push(FixedPoint(5_000_000_000_000));
        window3.push(FixedPoint(5_001_000_000_000));
        window3.push(FixedPoint(5_003_000_000_000)); // Different!

        let hash3 = window3.compute_hash();
        assert_ne!(hash1, hash3);
    }

    /// After wrapping, the window must behave exactly like a fresh window
    /// that only ever saw the last `PRICE_WINDOW_SIZE` prices, in order.
    #[test]
    fn test_price_window_circular() {
        let mut window = PriceWindow::new();

        // Fill the window beyond capacity
        for i in 0..12 {
            window.push(FixedPoint(i * 100_000_000));
        }

        // Should only contain last 8 prices
        assert_eq!(window.len(), PRICE_WINDOW_SIZE);

        // Hash should be stable across repeated calls
        let wrapped_hash = window.compute_hash();
        assert_eq!(wrapped_hash, window.compute_hash());

        // ...and must equal the hash of a window fed just the last 8 prices,
        // which verifies the oldest-to-newest ordering across the wraparound.
        let mut last_eight = PriceWindow::new();
        for i in 4..12 {
            last_eight.push(FixedPoint(i * 100_000_000));
        }
        assert_eq!(last_eight.len(), PRICE_WINDOW_SIZE);
        assert_eq!(wrapped_hash, last_eight.compute_hash());
    }

    /// Only the `Gap` variant counts as a gap.
    #[test]
    fn test_position_verification() {
        let exact = PositionVerification::Exact;
        assert!(!exact.has_gap());

        let gap = PositionVerification::Gap {
            expected_id: 100,
            actual_id: 105,
            missing_count: 5,
        };
        assert!(gap.has_gap());

        let timestamp_only = PositionVerification::TimestampOnly { gap_ms: 1000 };
        assert!(!timestamp_only.has_gap());
    }

    /// Error variants render their documented messages.
    #[test]
    fn test_checkpoint_error_display() {
        let mismatch = CheckpointError::SymbolMismatch {
            checkpoint: "BTCUSDT".to_string(),
            expected: "ETHUSDT".to_string(),
        };
        assert_eq!(
            mismatch.to_string(),
            "Symbol mismatch: checkpoint has 'BTCUSDT', expected 'ETHUSDT'"
        );
        assert_eq!(
            CheckpointError::MissingThresholds.to_string(),
            "Checkpoint has incomplete bar but missing thresholds - corrupted checkpoint"
        );
    }

    /// The reported version looks like a dotted version string.
    #[test]
    fn test_library_version() {
        let version = Checkpoint::library_version();
        assert!(!version.is_empty());
        assert!(version.contains('.'));
    }
}