Skip to main content

rangebar_core/
checkpoint.rs

//! Checkpoint system for cross-file range bar continuation
//!
//! Enables seamless processing across file boundaries by serializing
//! incomplete bar state with IMMUTABLE thresholds.
//!
//! ## Primary Use Case
//!
//! ```text
//! File 1 ends with incomplete bar → save Checkpoint
//! File 2 starts → load Checkpoint → continue building bar
//! ```
//!
//! ## Key Invariants
//!
//! - Thresholds are computed from bar.open and are IMMUTABLE for bar's lifetime
//! - Incomplete bar state preserved across file boundaries
//! - Note: `bar[i+1].open` may differ from `bar[i].close` (next bar opens at first
//!   tick after previous bar closes, not at the close price itself)
//! - Works with both Binance (has agg_trade_id) and Exness (timestamp-only)
20
21use crate::fixed_point::FixedPoint;
22use crate::types::RangeBar;
23use ahash::AHasher;
24use serde::{Deserialize, Serialize};
25use std::hash::Hasher;
26use thiserror::Error;
27
/// Number of most-recent prices kept by [`PriceWindow`] and fed into its hash.
const PRICE_WINDOW_SIZE: usize = 8;
30
31/// Checkpoint for cross-file range bar continuation
32///
33/// Enables seamless processing across any file boundaries (Binance daily, Exness monthly).
34/// Captures minimal state needed to continue building an incomplete bar.
35///
36/// # Example
37///
38/// ```ignore
39/// // Process first file
40/// let bars_1 = processor.process_agg_trade_records(&file1_trades)?;
41/// let checkpoint = processor.create_checkpoint("BTCUSDT");
42///
43/// // Serialize and save checkpoint
44/// let json = serde_json::to_string(&checkpoint)?;
45/// std::fs::write("checkpoint.json", json)?;
46///
47/// // ... later, load checkpoint and continue processing ...
48/// let json = std::fs::read_to_string("checkpoint.json")?;
49/// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
50/// let mut processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
51/// let bars_2 = processor.process_agg_trade_records(&file2_trades)?;
52/// ```
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct Checkpoint {
55    // === IDENTIFICATION (2 fields) ===
56    /// Symbol being processed (e.g., "BTCUSDT", "EURUSD")
57    pub symbol: String,
58
59    /// Threshold in decimal basis points (v3.0.0+: 0.1bps units)
60    /// Example: 250 = 25bps = 0.25%
61    pub threshold_decimal_bps: u32,
62
63    // === BAR STATE (2 fields) ===
64    /// Incomplete bar at file boundary (None = last bar completed cleanly)
65    /// REUSES existing RangeBar type - no separate BarState needed!
66    pub incomplete_bar: Option<RangeBar>,
67
68    /// Fixed thresholds for incomplete bar (computed from bar.open, IMMUTABLE)
69    /// Stored as (upper_threshold, lower_threshold)
70    pub thresholds: Option<(FixedPoint, FixedPoint)>,
71
72    // === POSITION TRACKING (2 fields) ===
73    /// Last processed timestamp in microseconds (universal, works for all sources)
74    pub last_timestamp_us: i64,
75
76    /// Last trade ID (Some for Binance, None for Exness)
77    /// Binance: agg_trade_id is strictly sequential, never resets
78    pub last_trade_id: Option<i64>,
79
80    // === INTEGRITY (1 field) ===
81    /// Price window hash (ahash of last 8 prices for position verification)
82    /// Used to verify we're resuming at the correct position in data stream
83    pub price_hash: u64,
84
85    // === MONITORING (1 field) ===
86    /// Anomaly summary counts for debugging
87    pub anomaly_summary: AnomalySummary,
88
89    // === BEHAVIOR FLAGS (2 fields) ===
90    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
91    ///
92    /// When true (default): A bar cannot close until a trade arrives with a
93    /// different timestamp than the bar's open_time. This prevents flash crash
94    /// scenarios from creating thousands of bars at identical timestamps.
95    ///
96    /// When false: Legacy v8 behavior - bars can close immediately on breach.
97    #[serde(default = "default_prevent_same_timestamp_close")]
98    pub prevent_same_timestamp_close: bool,
99
100    /// Deferred bar open flag (Issue #46)
101    ///
102    /// When true: The last trade before checkpoint triggered a threshold breach.
103    /// On resume, the next trade should open a new bar instead of continuing.
104    /// This matches the batch path's `defer_open` semantics.
105    #[serde(default)]
106    pub defer_open: bool,
107}
108
/// Serde fallback for `Checkpoint::prevent_same_timestamp_close`.
///
/// Checkpoints serialized without the field deserialize with timestamp gating
/// switched on.
fn default_prevent_same_timestamp_close() -> bool {
    const TIMESTAMP_GATING_ENABLED: bool = true;
    TIMESTAMP_GATING_ENABLED
}
113
114impl Checkpoint {
115    /// Create a new checkpoint with the given parameters
116    #[allow(clippy::too_many_arguments)]
117    pub fn new(
118        symbol: String,
119        threshold_decimal_bps: u32,
120        incomplete_bar: Option<RangeBar>,
121        thresholds: Option<(FixedPoint, FixedPoint)>,
122        last_timestamp_us: i64,
123        last_trade_id: Option<i64>,
124        price_hash: u64,
125        prevent_same_timestamp_close: bool,
126    ) -> Self {
127        Self {
128            symbol,
129            threshold_decimal_bps,
130            incomplete_bar,
131            thresholds,
132            last_timestamp_us,
133            last_trade_id,
134            price_hash,
135            anomaly_summary: AnomalySummary::default(),
136            prevent_same_timestamp_close,
137            defer_open: false,
138        }
139    }
140
141    /// Check if there's an incomplete bar that needs to continue
142    pub fn has_incomplete_bar(&self) -> bool {
143        self.incomplete_bar.is_some()
144    }
145
146    /// Get the library version that created this checkpoint
147    pub fn library_version() -> &'static str {
148        env!("CARGO_PKG_VERSION")
149    }
150}
151
152/// Anomaly summary for quick inspection (counts only)
153///
154/// Tracks anomalies detected during processing for debugging purposes.
155/// Does NOT affect processing - purely for monitoring.
156#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
157pub struct AnomalySummary {
158    /// Number of gaps detected (missing trade IDs or timestamp jumps)
159    pub gaps_detected: u32,
160
161    /// Number of overlaps detected (duplicate or out-of-order data)
162    pub overlaps_detected: u32,
163
164    /// Number of timestamp anomalies (negative intervals, etc.)
165    pub timestamp_anomalies: u32,
166}
167
168impl AnomalySummary {
169    /// Increment gap counter
170    pub fn record_gap(&mut self) {
171        self.gaps_detected += 1;
172    }
173
174    /// Increment overlap counter
175    pub fn record_overlap(&mut self) {
176        self.overlaps_detected += 1;
177    }
178
179    /// Increment timestamp anomaly counter
180    pub fn record_timestamp_anomaly(&mut self) {
181        self.timestamp_anomalies += 1;
182    }
183
184    /// Check if any anomalies were detected
185    pub fn has_anomalies(&self) -> bool {
186        self.gaps_detected > 0 || self.overlaps_detected > 0 || self.timestamp_anomalies > 0
187    }
188
189    /// Get total anomaly count
190    pub fn total(&self) -> u32 {
191        self.gaps_detected + self.overlaps_detected + self.timestamp_anomalies
192    }
193}
194
/// Outcome of checking the resume position against a checkpoint.
#[derive(Clone, Debug, PartialEq)]
pub enum PositionVerification {
    /// The trade ID is exactly the expected one (Binance: last_id + 1).
    Exact,

    /// The trade ID does not follow on from the checkpoint (Binance only).
    /// Carries the expected ID, the ID actually seen, and the number of
    /// missing trades.
    Gap {
        expected_id: i64,
        actual_id: i64,
        missing_count: i64,
    },

    /// No trade ID was available, so only the timestamp was checked (Exness).
    /// Carries the gap since the last checkpoint in milliseconds.
    TimestampOnly { gap_ms: i64 },
}

impl PositionVerification {
    /// Returns `true` only for the [`PositionVerification::Gap`] variant.
    ///
    /// `TimestampOnly` never counts as a gap here, whatever its `gap_ms`.
    pub fn has_gap(&self) -> bool {
        match self {
            Self::Gap { .. } => true,
            Self::Exact | Self::TimestampOnly { .. } => false,
        }
    }
}
220
221/// Checkpoint-related errors
222#[derive(Error, Debug, Clone, PartialEq)]
223pub enum CheckpointError {
224    /// Symbol mismatch between checkpoint and processor
225    #[error("Symbol mismatch: checkpoint has '{checkpoint}', expected '{expected}'")]
226    SymbolMismatch {
227        checkpoint: String,
228        expected: String,
229    },
230
231    /// Threshold mismatch between checkpoint and processor
232    #[error("Threshold mismatch: checkpoint has {checkpoint} dbps, expected {expected} dbps")]
233    ThresholdMismatch { checkpoint: u32, expected: u32 },
234
235    /// Price hash mismatch indicates wrong position in data stream
236    #[error("Price hash mismatch: checkpoint has {checkpoint}, computed {computed}")]
237    PriceHashMismatch { checkpoint: u64, computed: u64 },
238
239    /// Checkpoint has incomplete bar but no thresholds
240    #[error("Checkpoint has incomplete bar but missing thresholds - corrupted checkpoint")]
241    MissingThresholds,
242
243    /// Checkpoint serialization/deserialization error
244    #[error("Checkpoint serialization error: {message}")]
245    SerializationError { message: String },
246
247    /// Invalid threshold in checkpoint (Issue #62: crypto minimum threshold enforcement)
248    #[error(
249        "Invalid threshold in checkpoint: {threshold} dbps. Valid range: {min_threshold}-{max_threshold} dbps"
250    )]
251    InvalidThreshold {
252        threshold: u32,
253        min_threshold: u32,
254        max_threshold: u32,
255    },
256}
257
258/// Price window for computing position verification hash
259///
260/// Maintains a circular buffer of the last N prices for hash computation.
261#[derive(Debug, Clone)]
262pub struct PriceWindow {
263    prices: [i64; PRICE_WINDOW_SIZE],
264    index: usize,
265    count: usize,
266}
267
268impl Default for PriceWindow {
269    fn default() -> Self {
270        Self::new()
271    }
272}
273
274impl PriceWindow {
275    /// Create a new empty price window
276    pub fn new() -> Self {
277        Self {
278            prices: [0; PRICE_WINDOW_SIZE],
279            index: 0,
280            count: 0,
281        }
282    }
283
284    /// Add a price to the window (circular buffer)
285    pub fn push(&mut self, price: FixedPoint) {
286        self.prices[self.index] = price.0;
287        self.index = (self.index + 1) % PRICE_WINDOW_SIZE;
288        if self.count < PRICE_WINDOW_SIZE {
289            self.count += 1;
290        }
291    }
292
293    /// Compute hash of the price window using ahash
294    ///
295    /// Returns a 64-bit hash that can be used to verify position in data stream.
296    pub fn compute_hash(&self) -> u64 {
297        let mut hasher = AHasher::default();
298
299        // Hash prices in order they were added (oldest to newest)
300        if self.count < PRICE_WINDOW_SIZE {
301            // Buffer not full yet - hash from start
302            for i in 0..self.count {
303                hasher.write_i64(self.prices[i]);
304            }
305        } else {
306            // Buffer full - hash from current index (oldest) around
307            for i in 0..PRICE_WINDOW_SIZE {
308                let idx = (self.index + i) % PRICE_WINDOW_SIZE;
309                hasher.write_i64(self.prices[idx]);
310            }
311        }
312
313        hasher.finish()
314    }
315
316    /// Get the number of prices in the window
317    pub fn len(&self) -> usize {
318        self.count
319    }
320
321    /// Check if the window is empty
322    pub fn is_empty(&self) -> bool {
323        self.count == 0
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp (in microseconds) shared by the checkpoint fixtures.
    const TS_US: i64 = 1640995200000000;

    /// The constructor stores its arguments and applies the documented
    /// defaults for the fields it does not take.
    #[test]
    fn test_checkpoint_creation() {
        let checkpoint = Checkpoint::new(
            "BTCUSDT".to_string(),
            250, // 25bps
            None,
            None,
            TS_US,
            Some(12345),
            0,
            true, // prevent_same_timestamp_close
        );

        assert_eq!(checkpoint.symbol, "BTCUSDT");
        assert_eq!(checkpoint.threshold_decimal_bps, 250);
        assert!(!checkpoint.has_incomplete_bar());
        assert!(checkpoint.thresholds.is_none());
        assert_eq!(checkpoint.last_timestamp_us, TS_US);
        assert_eq!(checkpoint.last_trade_id, Some(12345));
        assert_eq!(checkpoint.price_hash, 0);
        assert!(checkpoint.prevent_same_timestamp_close);

        // Fields that are not constructor inputs start at fixed values.
        assert_eq!(checkpoint.anomaly_summary, AnomalySummary::default());
        assert!(!checkpoint.defer_open);
    }

    /// A checkpoint survives a JSON round trip.
    #[test]
    fn test_checkpoint_serialization() {
        let checkpoint = Checkpoint::new(
            "EURUSD".to_string(),
            10, // 1bps
            None,
            None,
            TS_US,
            None, // Exness has no trade IDs
            12345678,
            true, // prevent_same_timestamp_close
        );

        // Serialize to JSON
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("EURUSD"));
        assert!(json.contains("\"threshold_decimal_bps\":10"));
        assert!(json.contains("\"prevent_same_timestamp_close\":true"));

        // Deserialize back
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.symbol, "EURUSD");
        assert_eq!(restored.threshold_decimal_bps, 10);
        assert_eq!(restored.last_timestamp_us, TS_US);
        assert_eq!(restored.last_trade_id, None);
        assert_eq!(restored.price_hash, 12345678);
        assert!(restored.prevent_same_timestamp_close);
        assert!(!restored.defer_open);
        assert!(!restored.has_incomplete_bar());
    }

    /// The legacy (`false`) setting of the gating flag also round-trips.
    #[test]
    fn test_checkpoint_serialization_toggle_false() {
        let checkpoint = Checkpoint::new(
            "BTCUSDT".to_string(),
            100, // 10bps
            None,
            None,
            TS_US,
            Some(999),
            12345678,
            false, // Legacy behavior
        );

        // Serialize to JSON
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("\"prevent_same_timestamp_close\":false"));

        // Deserialize back
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
        assert!(!restored.prevent_same_timestamp_close);
    }

    /// Old checkpoints that predate the behavior flags still deserialize,
    /// and each missing flag takes its serde default.
    #[test]
    fn test_checkpoint_deserialization_default() {
        let json = r#"{
            "symbol": "BTCUSDT",
            "threshold_decimal_bps": 100,
            "incomplete_bar": null,
            "thresholds": null,
            "last_timestamp_us": 1640995200000000,
            "last_trade_id": 12345,
            "price_hash": 0,
            "anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0}
        }"#;

        let checkpoint: Checkpoint = serde_json::from_str(json).unwrap();
        // Missing `prevent_same_timestamp_close` defaults to true (new behavior)
        assert!(checkpoint.prevent_same_timestamp_close);
        // Missing `defer_open` defaults to false
        assert!(!checkpoint.defer_open);
    }

    /// Each recorder bumps only its own counter; `total` sums all of them.
    #[test]
    fn test_anomaly_summary() {
        let mut summary = AnomalySummary::default();
        assert!(!summary.has_anomalies());
        assert_eq!(summary.total(), 0);

        summary.record_gap();
        summary.record_gap();
        summary.record_timestamp_anomaly();

        assert!(summary.has_anomalies());
        assert_eq!(summary.gaps_detected, 2);
        assert_eq!(summary.overlaps_detected, 0);
        assert_eq!(summary.timestamp_anomalies, 1);
        assert_eq!(summary.total(), 3);

        summary.record_overlap();

        assert_eq!(summary.overlaps_detected, 1);
        assert_eq!(summary.total(), 4);
    }

    /// Equal price sequences hash equally; a differing price changes the hash.
    #[test]
    fn test_price_window() {
        let mut window = PriceWindow::new();
        assert!(window.is_empty());
        assert_eq!(window.len(), 0);

        // Add some prices
        window.push(FixedPoint(5000000000000)); // 50000.0
        window.push(FixedPoint(5001000000000)); // 50010.0
        window.push(FixedPoint(5002000000000)); // 50020.0

        assert_eq!(window.len(), 3);
        assert!(!window.is_empty());

        let hash1 = window.compute_hash();

        // Same prices should produce same hash
        let mut window2 = PriceWindow::new();
        window2.push(FixedPoint(5000000000000));
        window2.push(FixedPoint(5001000000000));
        window2.push(FixedPoint(5002000000000));

        let hash2 = window2.compute_hash();
        assert_eq!(hash1, hash2);

        // Different prices should produce different hash
        let mut window3 = PriceWindow::new();
        window3.push(FixedPoint(5000000000000));
        window3.push(FixedPoint(5001000000000));
        window3.push(FixedPoint(5003000000000)); // Different!

        let hash3 = window3.compute_hash();
        assert_ne!(hash1, hash3);
    }

    /// Pushing past capacity evicts the oldest prices, so only the most
    /// recent `PRICE_WINDOW_SIZE` prices determine the hash.
    #[test]
    fn test_price_window_circular() {
        let mut window = PriceWindow::new();

        // Fill the window beyond capacity
        for i in 0..12 {
            window.push(FixedPoint(i * 100000000));
        }

        // Should only contain last 8 prices
        assert_eq!(window.len(), PRICE_WINDOW_SIZE);

        // Hash should be consistent
        let hash1 = window.compute_hash();
        let hash2 = window.compute_hash();
        assert_eq!(hash1, hash2);

        // A fresh window fed only the last 8 prices (4..12) must hash the same,
        // which proves the first four were evicted and order was preserved.
        let mut last_eight = PriceWindow::new();
        for i in 4..12 {
            last_eight.push(FixedPoint(i * 100000000));
        }
        assert_eq!(last_eight.len(), PRICE_WINDOW_SIZE);
        assert_eq!(hash1, last_eight.compute_hash());
    }

    /// Only the `Gap` variant reports a gap.
    #[test]
    fn test_position_verification() {
        let exact = PositionVerification::Exact;
        assert!(!exact.has_gap());

        let gap = PositionVerification::Gap {
            expected_id: 100,
            actual_id: 105,
            missing_count: 5,
        };
        assert!(gap.has_gap());

        let timestamp_only = PositionVerification::TimestampOnly { gap_ms: 1000 };
        assert!(!timestamp_only.has_gap());
    }

    /// The reported version is the crate version baked in at compile time.
    #[test]
    fn test_library_version() {
        let version = Checkpoint::library_version();
        assert!(!version.is_empty());
        // Should look like a semver string
        assert!(version.contains('.'));
        assert_eq!(version, env!("CARGO_PKG_VERSION"));
    }
}