rangebar_core/
checkpoint.rs

//! Checkpoint system for cross-file range bar continuation.
//!
//! Enables seamless processing across file boundaries by serializing the
//! state of an incomplete bar together with its IMMUTABLE thresholds.
//!
//! ## Primary Use Case
//!
//! ```text
//! File 1 ends with incomplete bar → save Checkpoint
//! File 2 starts → load Checkpoint → continue building bar
//! ```
//!
//! ## Key Invariants
//!
//! - Thresholds are computed from `bar.open` and are IMMUTABLE for the bar's lifetime
//! - Incomplete bar state is preserved across file boundaries
//! - Note: `bar[i+1].open` may differ from `bar[i].close` (the next bar opens at the
//!   first tick after the previous bar closes, not at the close price itself)
//! - Works with both Binance (has `agg_trade_id`) and Exness (timestamp-only)
20
21use crate::fixed_point::FixedPoint;
22use crate::types::RangeBar;
23use ahash::AHasher;
24use serde::{Deserialize, Serialize};
25use std::hash::Hasher;
26use thiserror::Error;
27
/// How many of the most recent prices feed the position-verification hash.
const PRICE_WINDOW_SIZE: usize = 8;
30
31/// Checkpoint for cross-file range bar continuation
32///
33/// Enables seamless processing across any file boundaries (Binance daily, Exness monthly).
34/// Captures minimal state needed to continue building an incomplete bar.
35///
36/// # Example
37///
38/// ```ignore
39/// // Process first file
40/// let bars_1 = processor.process_agg_trade_records(&file1_trades)?;
41/// let checkpoint = processor.create_checkpoint("BTCUSDT");
42///
43/// // Serialize and save checkpoint
44/// let json = serde_json::to_string(&checkpoint)?;
45/// std::fs::write("checkpoint.json", json)?;
46///
47/// // ... later, load checkpoint and continue processing ...
48/// let json = std::fs::read_to_string("checkpoint.json")?;
49/// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
50/// let mut processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
51/// let bars_2 = processor.process_agg_trade_records(&file2_trades)?;
52/// ```
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct Checkpoint {
55    // === IDENTIFICATION (2 fields) ===
56    /// Symbol being processed (e.g., "BTCUSDT", "EURUSD")
57    pub symbol: String,
58
59    /// Threshold in decimal basis points (v3.0.0+: 0.1bps units)
60    /// Example: 250 = 25bps = 0.25%
61    pub threshold_decimal_bps: u32,
62
63    // === BAR STATE (2 fields) ===
64    /// Incomplete bar at file boundary (None = last bar completed cleanly)
65    /// REUSES existing RangeBar type - no separate BarState needed!
66    pub incomplete_bar: Option<RangeBar>,
67
68    /// Fixed thresholds for incomplete bar (computed from bar.open, IMMUTABLE)
69    /// Stored as (upper_threshold, lower_threshold)
70    pub thresholds: Option<(FixedPoint, FixedPoint)>,
71
72    // === POSITION TRACKING (2 fields) ===
73    /// Last processed timestamp in microseconds (universal, works for all sources)
74    pub last_timestamp_us: i64,
75
76    /// Last trade ID (Some for Binance, None for Exness)
77    /// Binance: agg_trade_id is strictly sequential, never resets
78    pub last_trade_id: Option<i64>,
79
80    // === INTEGRITY (1 field) ===
81    /// Price window hash (ahash of last 8 prices for position verification)
82    /// Used to verify we're resuming at the correct position in data stream
83    pub price_hash: u64,
84
85    // === MONITORING (1 field) ===
86    /// Anomaly summary counts for debugging
87    pub anomaly_summary: AnomalySummary,
88}
89
90impl Checkpoint {
91    /// Create a new checkpoint with the given parameters
92    pub fn new(
93        symbol: String,
94        threshold_decimal_bps: u32,
95        incomplete_bar: Option<RangeBar>,
96        thresholds: Option<(FixedPoint, FixedPoint)>,
97        last_timestamp_us: i64,
98        last_trade_id: Option<i64>,
99        price_hash: u64,
100    ) -> Self {
101        Self {
102            symbol,
103            threshold_decimal_bps,
104            incomplete_bar,
105            thresholds,
106            last_timestamp_us,
107            last_trade_id,
108            price_hash,
109            anomaly_summary: AnomalySummary::default(),
110        }
111    }
112
113    /// Check if there's an incomplete bar that needs to continue
114    pub fn has_incomplete_bar(&self) -> bool {
115        self.incomplete_bar.is_some()
116    }
117
118    /// Get the library version that created this checkpoint
119    pub fn library_version() -> &'static str {
120        env!("CARGO_PKG_VERSION")
121    }
122}
123
124/// Anomaly summary for quick inspection (counts only)
125///
126/// Tracks anomalies detected during processing for debugging purposes.
127/// Does NOT affect processing - purely for monitoring.
128#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
129pub struct AnomalySummary {
130    /// Number of gaps detected (missing trade IDs or timestamp jumps)
131    pub gaps_detected: u32,
132
133    /// Number of overlaps detected (duplicate or out-of-order data)
134    pub overlaps_detected: u32,
135
136    /// Number of timestamp anomalies (negative intervals, etc.)
137    pub timestamp_anomalies: u32,
138}
139
140impl AnomalySummary {
141    /// Increment gap counter
142    pub fn record_gap(&mut self) {
143        self.gaps_detected += 1;
144    }
145
146    /// Increment overlap counter
147    pub fn record_overlap(&mut self) {
148        self.overlaps_detected += 1;
149    }
150
151    /// Increment timestamp anomaly counter
152    pub fn record_timestamp_anomaly(&mut self) {
153        self.timestamp_anomalies += 1;
154    }
155
156    /// Check if any anomalies were detected
157    pub fn has_anomalies(&self) -> bool {
158        self.gaps_detected > 0 || self.overlaps_detected > 0 || self.timestamp_anomalies > 0
159    }
160
161    /// Get total anomaly count
162    pub fn total(&self) -> u32 {
163        self.gaps_detected + self.overlaps_detected + self.timestamp_anomalies
164    }
165}
166
/// Result of checking where a resumed stream picks up relative to a checkpoint.
#[derive(Clone, Debug, PartialEq)]
pub enum PositionVerification {
    /// The next trade ID is exactly the expected one (Binance: `last_id + 1`).
    Exact,

    /// The next trade ID skips past the expected one (Binance only).
    Gap {
        /// Trade ID that should have come next.
        expected_id: i64,
        /// Trade ID that actually arrived.
        actual_id: i64,
        /// How many trades are missing in between.
        missing_count: i64,
    },

    /// No trade IDs are available, so only timestamps can be checked (Exness).
    TimestampOnly {
        /// Gap since the last checkpoint, in milliseconds.
        gap_ms: i64,
    },
}

impl PositionVerification {
    /// Returns `true` only for the [`PositionVerification::Gap`] variant.
    pub fn has_gap(&self) -> bool {
        match self {
            Self::Gap { .. } => true,
            Self::Exact | Self::TimestampOnly { .. } => false,
        }
    }
}
192
193/// Checkpoint-related errors
194#[derive(Error, Debug, Clone, PartialEq)]
195pub enum CheckpointError {
196    /// Symbol mismatch between checkpoint and processor
197    #[error("Symbol mismatch: checkpoint has '{checkpoint}', expected '{expected}'")]
198    SymbolMismatch {
199        checkpoint: String,
200        expected: String,
201    },
202
203    /// Threshold mismatch between checkpoint and processor
204    #[error(
205        "Threshold mismatch: checkpoint has {checkpoint} decimal bps, expected {expected} decimal bps"
206    )]
207    ThresholdMismatch { checkpoint: u32, expected: u32 },
208
209    /// Price hash mismatch indicates wrong position in data stream
210    #[error("Price hash mismatch: checkpoint has {checkpoint}, computed {computed}")]
211    PriceHashMismatch { checkpoint: u64, computed: u64 },
212
213    /// Checkpoint has incomplete bar but no thresholds
214    #[error("Checkpoint has incomplete bar but missing thresholds - corrupted checkpoint")]
215    MissingThresholds,
216
217    /// Checkpoint serialization/deserialization error
218    #[error("Checkpoint serialization error: {message}")]
219    SerializationError { message: String },
220}
221
222/// Price window for computing position verification hash
223///
224/// Maintains a circular buffer of the last N prices for hash computation.
225#[derive(Debug, Clone)]
226pub struct PriceWindow {
227    prices: [i64; PRICE_WINDOW_SIZE],
228    index: usize,
229    count: usize,
230}
231
232impl Default for PriceWindow {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
238impl PriceWindow {
239    /// Create a new empty price window
240    pub fn new() -> Self {
241        Self {
242            prices: [0; PRICE_WINDOW_SIZE],
243            index: 0,
244            count: 0,
245        }
246    }
247
248    /// Add a price to the window (circular buffer)
249    pub fn push(&mut self, price: FixedPoint) {
250        self.prices[self.index] = price.0;
251        self.index = (self.index + 1) % PRICE_WINDOW_SIZE;
252        if self.count < PRICE_WINDOW_SIZE {
253            self.count += 1;
254        }
255    }
256
257    /// Compute hash of the price window using ahash
258    ///
259    /// Returns a 64-bit hash that can be used to verify position in data stream.
260    pub fn compute_hash(&self) -> u64 {
261        let mut hasher = AHasher::default();
262
263        // Hash prices in order they were added (oldest to newest)
264        if self.count < PRICE_WINDOW_SIZE {
265            // Buffer not full yet - hash from start
266            for i in 0..self.count {
267                hasher.write_i64(self.prices[i]);
268            }
269        } else {
270            // Buffer full - hash from current index (oldest) around
271            for i in 0..PRICE_WINDOW_SIZE {
272                let idx = (self.index + i) % PRICE_WINDOW_SIZE;
273                hasher.write_i64(self.prices[idx]);
274            }
275        }
276
277        hasher.finish()
278    }
279
280    /// Get the number of prices in the window
281    pub fn len(&self) -> usize {
282        self.count
283    }
284
285    /// Check if the window is empty
286    pub fn is_empty(&self) -> bool {
287        self.count == 0
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a window by pushing `prices` (raw fixed-point values) oldest first.
    fn window_from(prices: &[i64]) -> PriceWindow {
        let mut window = PriceWindow::new();
        for &price in prices {
            window.push(FixedPoint(price));
        }
        window
    }

    #[test]
    fn test_checkpoint_creation() {
        let checkpoint = Checkpoint::new(
            "BTCUSDT".to_string(),
            250, // 25bps
            None,
            None,
            1_640_995_200_000_000, // timestamp in microseconds
            Some(12345),
            0,
        );

        assert_eq!(checkpoint.symbol, "BTCUSDT");
        assert_eq!(checkpoint.threshold_decimal_bps, 250);
        assert!(!checkpoint.has_incomplete_bar());
        assert_eq!(checkpoint.last_timestamp_us, 1_640_995_200_000_000);
        assert_eq!(checkpoint.last_trade_id, Some(12345));
        // `new` always starts with zeroed anomaly counters.
        assert_eq!(checkpoint.anomaly_summary, AnomalySummary::default());
    }

    #[test]
    fn test_checkpoint_serialization() {
        let checkpoint = Checkpoint::new(
            "EURUSD".to_string(),
            10, // 1bps
            None,
            None,
            1_640_995_200_000_000,
            None, // Exness has no trade IDs
            12345678,
        );

        // Serialize to JSON
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("EURUSD"));
        assert!(json.contains("\"threshold_decimal_bps\":10"));

        // Deserialize back: every field must survive the round trip.
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.symbol, "EURUSD");
        assert_eq!(restored.threshold_decimal_bps, 10);
        assert!(restored.incomplete_bar.is_none());
        assert!(restored.thresholds.is_none());
        assert_eq!(restored.last_timestamp_us, 1_640_995_200_000_000);
        assert_eq!(restored.last_trade_id, None);
        assert_eq!(restored.price_hash, 12345678);
        assert_eq!(restored.anomaly_summary, AnomalySummary::default());
    }

    #[test]
    fn test_anomaly_summary() {
        let mut summary = AnomalySummary::default();
        assert!(!summary.has_anomalies());
        assert_eq!(summary.total(), 0);

        summary.record_gap();
        summary.record_gap();
        summary.record_timestamp_anomaly();

        assert!(summary.has_anomalies());
        assert_eq!(summary.gaps_detected, 2);
        assert_eq!(summary.overlaps_detected, 0);
        assert_eq!(summary.timestamp_anomalies, 1);
        assert_eq!(summary.total(), 3);
    }

    #[test]
    fn test_price_window() {
        let mut window = PriceWindow::new();
        assert!(window.is_empty());

        // Add some prices
        window.push(FixedPoint(5000000000000)); // 50000.0
        window.push(FixedPoint(5001000000000)); // 50010.0
        window.push(FixedPoint(5002000000000)); // 50020.0

        assert_eq!(window.len(), 3);
        assert!(!window.is_empty());

        let hash1 = window.compute_hash();

        // Same prices should produce same hash
        let hash2 = window_from(&[5000000000000, 5001000000000, 5002000000000]).compute_hash();
        assert_eq!(hash1, hash2);

        // Different prices should produce different hash
        let hash3 = window_from(&[5000000000000, 5001000000000, 5003000000000]).compute_hash();
        assert_ne!(hash1, hash3);
    }

    #[test]
    fn test_price_window_circular() {
        // Fill the window beyond capacity
        let all: Vec<i64> = (0..12).map(|i| i * 100_000_000).collect();
        let window = window_from(&all);

        // Should only contain last 8 prices
        assert_eq!(window.len(), PRICE_WINDOW_SIZE);

        // Hash should be consistent
        assert_eq!(window.compute_hash(), window.compute_hash());

        // The hash must cover exactly the newest PRICE_WINDOW_SIZE prices, so a
        // window fed only those prices (oldest first) must hash identically.
        let newest = &all[all.len() - PRICE_WINDOW_SIZE..];
        assert_eq!(window.compute_hash(), window_from(newest).compute_hash());

        // Order matters: the same prices pushed in reverse must hash differently.
        let reversed: Vec<i64> = newest.iter().rev().copied().collect();
        assert_ne!(window.compute_hash(), window_from(&reversed).compute_hash());
    }

    #[test]
    fn test_position_verification() {
        let exact = PositionVerification::Exact;
        assert!(!exact.has_gap());

        let gap = PositionVerification::Gap {
            expected_id: 100,
            actual_id: 105,
            missing_count: 5,
        };
        assert!(gap.has_gap());

        let timestamp_only = PositionVerification::TimestampOnly { gap_ms: 1000 };
        assert!(!timestamp_only.has_gap());
    }

    #[test]
    fn test_library_version() {
        let version = Checkpoint::library_version();
        // Reports the version of the crate under test, which is semver-shaped.
        assert_eq!(version, env!("CARGO_PKG_VERSION"));
        assert!(version.contains('.'));
    }
}