Skip to main content

opendeviationbar_core/
checkpoint.rs

//! Checkpoint system for cross-file open deviation bar continuation
//!
//! Enables seamless processing across file boundaries by serializing
//! incomplete bar state together with its IMMUTABLE thresholds.
//!
//! ## Primary Use Case
//!
//! ```text
//! File 1 ends with incomplete bar → save Checkpoint
//! File 2 starts → load Checkpoint → continue building bar
//! ```
//!
//! ## Key Invariants
//!
//! - Thresholds are computed from bar.open and are IMMUTABLE for the bar's lifetime
//! - Incomplete bar state is preserved across file boundaries
//! - Note: `bar[i+1].open` may differ from `bar[i].close` (the next bar opens at the first
//!   tick after the previous bar closes, not at the close price itself)
//! - Works with both Binance (has agg_trade_id) and Exness (timestamp-only)
20
21use crate::fixed_point::FixedPoint;
22use crate::types::OpenDeviationBar;
23use foldhash::fast::FixedState;
24use serde::{Deserialize, Serialize};
25use std::hash::{BuildHasher, Hasher};
26use thiserror::Error;
27
/// Number of most recent prices folded into the position-verification hash
/// (the capacity of [`PriceWindow`]).
const PRICE_WINDOW_SIZE: usize = 8;
30
31/// Checkpoint for cross-file open deviation bar continuation
32///
33/// Enables seamless processing across any file boundaries (Binance daily, Exness monthly).
34/// Captures minimal state needed to continue building an incomplete bar.
35///
36/// # Example
37///
38/// ```ignore
39/// // Process first file
40/// let bars_1 = processor.process_agg_trade_records(&file1_trades)?;
41/// let checkpoint = processor.create_checkpoint("BTCUSDT");
42///
43/// // Serialize and save checkpoint
44/// let json = serde_json::to_string(&checkpoint)?;
45/// std::fs::write("checkpoint.json", json)?;
46///
47/// // ... later, load checkpoint and continue processing ...
48/// let json = std::fs::read_to_string("checkpoint.json")?;
49/// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
50/// let mut processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint)?;
51/// let bars_2 = processor.process_agg_trade_records(&file2_trades)?;
52/// ```
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct Checkpoint {
55    // === VERSIONING (1 field) ===
56    /// Schema version for checkpoint format (Issue #85: Phase 2)
57    /// v1: Original format (default for old checkpoints)
58    /// v2: Field-reordered OpenDeviationBar (no behavioral changes, safe for deserialization)
59    #[serde(default = "default_checkpoint_version")]
60    pub version: u32,
61
62    // === IDENTIFICATION (2 fields) ===
63    /// Symbol being processed (e.g., "BTCUSDT", "EURUSD")
64    pub symbol: String,
65
66    /// Threshold in decimal basis points (v3.0.0+: 0.1bps units)
67    /// Example: 250 = 25bps = 0.25%
68    pub threshold_decimal_bps: u32,
69
70    // === BAR STATE (2 fields) ===
71    /// Incomplete bar at file boundary (None = last bar completed cleanly)
72    /// REUSES existing OpenDeviationBar type - no separate BarState needed!
73    pub incomplete_bar: Option<OpenDeviationBar>,
74
75    /// Fixed thresholds for incomplete bar (computed from bar.open, IMMUTABLE)
76    /// Stored as (upper_threshold, lower_threshold)
77    pub thresholds: Option<(FixedPoint, FixedPoint)>,
78
79    // === POSITION TRACKING (2 fields) ===
80    /// Last processed timestamp in microseconds (universal, works for all sources)
81    pub last_timestamp_us: i64,
82
83    /// Last trade ID (Some for Binance, None for Exness)
84    /// Binance: agg_trade_id is strictly sequential, never resets
85    pub last_trade_id: Option<i64>,
86
87    // === INTEGRITY (1 field) ===
88    /// Price window hash (ahash of last 8 prices for position verification)
89    /// Used to verify we're resuming at the correct position in data stream
90    pub price_hash: u64,
91
92    // === MONITORING (1 field) ===
93    /// Anomaly summary counts for debugging
94    pub anomaly_summary: AnomalySummary,
95
96    // === BEHAVIOR FLAGS (2 fields) ===
97    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
98    ///
99    /// When true (default): A bar cannot close until a trade arrives with a
100    /// different timestamp than the bar's open_time. This prevents flash crash
101    /// scenarios from creating thousands of bars at identical timestamps.
102    ///
103    /// When false: Legacy v8 behavior - bars can close immediately on breach.
104    #[serde(default = "default_prevent_same_timestamp_close")]
105    pub prevent_same_timestamp_close: bool,
106
107    /// Deferred bar open flag (Issue #46)
108    ///
109    /// When true: The last trade before checkpoint triggered a threshold breach.
110    /// On resume, the next trade should open a new bar instead of continuing.
111    /// This matches the batch path's `defer_open` semantics.
112    #[serde(default)]
113    pub defer_open: bool,
114}
115
/// Serde fallback for `Checkpoint::version`.
///
/// Checkpoints serialized before the `version` field existed are read back as
/// schema v1, keeping them loadable.
const fn default_checkpoint_version() -> u32 {
    1
}
120
/// Serde fallback for `Checkpoint::prevent_same_timestamp_close`.
///
/// When the field is missing, timestamp gating stays switched on.
const fn default_prevent_same_timestamp_close() -> bool {
    true
}
125
126impl Checkpoint {
127    /// Create a new checkpoint with the given parameters
128    #[allow(clippy::too_many_arguments)]
129    pub fn new(
130        symbol: String,
131        threshold_decimal_bps: u32,
132        incomplete_bar: Option<OpenDeviationBar>,
133        thresholds: Option<(FixedPoint, FixedPoint)>,
134        last_timestamp_us: i64,
135        last_trade_id: Option<i64>,
136        price_hash: u64,
137        prevent_same_timestamp_close: bool,
138    ) -> Self {
139        Self {
140            version: 2, // New checkpoints created with current version
141            symbol,
142            threshold_decimal_bps,
143            incomplete_bar,
144            thresholds,
145            last_timestamp_us,
146            last_trade_id,
147            price_hash,
148            anomaly_summary: AnomalySummary::default(),
149            prevent_same_timestamp_close,
150            defer_open: false,
151        }
152    }
153
154    /// Check if there's an incomplete bar that needs to continue
155    pub fn has_incomplete_bar(&self) -> bool {
156        self.incomplete_bar.is_some()
157    }
158
159    /// Get the library version that created this checkpoint
160    pub fn library_version() -> &'static str {
161        env!("CARGO_PKG_VERSION")
162    }
163}
164
165/// Anomaly summary for quick inspection (counts only)
166///
167/// Tracks anomalies detected during processing for debugging purposes.
168/// Does NOT affect processing - purely for monitoring.
169#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
170pub struct AnomalySummary {
171    /// Number of gaps detected (missing trade IDs or timestamp jumps)
172    pub gaps_detected: u32,
173
174    /// Number of overlaps detected (duplicate or out-of-order data)
175    pub overlaps_detected: u32,
176
177    /// Number of timestamp anomalies (negative intervals, etc.)
178    pub timestamp_anomalies: u32,
179}
180
181impl AnomalySummary {
182    /// Increment gap counter
183    pub fn record_gap(&mut self) {
184        self.gaps_detected += 1;
185    }
186
187    /// Increment overlap counter
188    pub fn record_overlap(&mut self) {
189        self.overlaps_detected += 1;
190    }
191
192    /// Increment timestamp anomaly counter
193    pub fn record_timestamp_anomaly(&mut self) {
194        self.timestamp_anomalies += 1;
195    }
196
197    /// Check if any anomalies were detected
198    pub fn has_anomalies(&self) -> bool {
199        self.gaps_detected > 0 || self.overlaps_detected > 0 || self.timestamp_anomalies > 0
200    }
201
202    /// Get total anomaly count
203    pub fn total(&self) -> u32 {
204        self.gaps_detected + self.overlaps_detected + self.timestamp_anomalies
205    }
206}
207
/// Outcome of checking where a resumed data stream picks up relative to a checkpoint.
#[derive(Debug, Clone, PartialEq)]
pub enum PositionVerification {
    /// The incoming trade ID is exactly the expected one (Binance: `last_id + 1`).
    Exact,

    /// The incoming trade ID skips past the expected one (Binance only).
    Gap {
        /// Trade ID that should have arrived next.
        expected_id: i64,
        /// Trade ID that actually arrived.
        actual_id: i64,
        /// Number of trades missing in between.
        missing_count: i64,
    },

    /// No trade ID is available, so only timestamps could be compared (Exness).
    TimestampOnly {
        /// Gap since the last checkpoint, in milliseconds.
        gap_ms: i64,
    },
}

impl PositionVerification {
    /// Returns `true` only for the [`PositionVerification::Gap`] variant;
    /// `Exact` and `TimestampOnly` never report a data gap.
    pub fn has_gap(&self) -> bool {
        match self {
            Self::Gap { .. } => true,
            Self::Exact | Self::TimestampOnly { .. } => false,
        }
    }
}
233
234/// Checkpoint-related errors
235#[derive(Error, Debug, Clone, PartialEq)]
236pub enum CheckpointError {
237    /// Symbol mismatch between checkpoint and processor
238    #[error("Symbol mismatch: checkpoint has '{checkpoint}', expected '{expected}'")]
239    SymbolMismatch {
240        checkpoint: String,
241        expected: String,
242    },
243
244    /// Threshold mismatch between checkpoint and processor
245    #[error("Threshold mismatch: checkpoint has {checkpoint} dbps, expected {expected} dbps")]
246    ThresholdMismatch { checkpoint: u32, expected: u32 },
247
248    /// Price hash mismatch indicates wrong position in data stream
249    #[error("Price hash mismatch: checkpoint has {checkpoint}, computed {computed}")]
250    PriceHashMismatch { checkpoint: u64, computed: u64 },
251
252    /// Checkpoint has incomplete bar but no thresholds
253    #[error("Checkpoint has incomplete bar but missing thresholds - corrupted checkpoint")]
254    MissingThresholds,
255
256    /// Checkpoint serialization/deserialization error
257    #[error("Checkpoint serialization error: {message}")]
258    SerializationError { message: String },
259
260    /// Invalid threshold in checkpoint (Issue #62: crypto minimum threshold enforcement)
261    #[error(
262        "Invalid threshold in checkpoint: {threshold} dbps. Valid range: {min_threshold}-{max_threshold} dbps"
263    )]
264    InvalidThreshold {
265        threshold: u32,
266        min_threshold: u32,
267        max_threshold: u32,
268    },
269}
270
271/// Price window for computing position verification hash
272///
273/// Maintains a circular buffer of the last N prices for hash computation.
274#[derive(Debug, Clone)]
275pub struct PriceWindow {
276    prices: [i64; PRICE_WINDOW_SIZE],
277    index: usize,
278    count: usize,
279}
280
281impl Default for PriceWindow {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
287impl PriceWindow {
288    /// Create a new empty price window
289    pub fn new() -> Self {
290        Self {
291            prices: [0; PRICE_WINDOW_SIZE],
292            index: 0,
293            count: 0,
294        }
295    }
296
297    /// Add a price to the window (circular buffer)
298    pub fn push(&mut self, price: FixedPoint) {
299        self.prices[self.index] = price.0;
300        self.index = (self.index + 1) % PRICE_WINDOW_SIZE;
301        if self.count < PRICE_WINDOW_SIZE {
302            self.count += 1;
303        }
304    }
305
306    /// Compute hash of the price window using foldhash
307    ///
308    /// Returns a 64-bit hash that can be used to verify position in data stream.
309    pub fn compute_hash(&self) -> u64 {
310        let mut hasher = FixedState::default().build_hasher();
311
312        // Hash prices in order they were added (oldest to newest)
313        if self.count < PRICE_WINDOW_SIZE {
314            // Buffer not full yet - hash from start
315            for i in 0..self.count {
316                hasher.write_i64(self.prices[i]);
317            }
318        } else {
319            // Buffer full - hash from current index (oldest) around
320            for i in 0..PRICE_WINDOW_SIZE {
321                let idx = (self.index + i) % PRICE_WINDOW_SIZE;
322                hasher.write_i64(self.prices[idx]);
323            }
324        }
325
326        hasher.finish()
327    }
328
329    /// Get the number of prices in the window
330    pub fn len(&self) -> usize {
331        self.count
332    }
333
334    /// Check if the window is empty
335    pub fn is_empty(&self) -> bool {
336        self.count == 0
337    }
338}
339
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_checkpoint_creation() {
        let checkpoint = Checkpoint::new(
            "BTCUSDT".to_string(),
            250, // 25bps
            None,
            None,
            1640995200000000, // timestamp in microseconds
            Some(12345),
            0,
            true, // prevent_same_timestamp_close
        );

        assert_eq!(checkpoint.symbol, "BTCUSDT");
        assert_eq!(checkpoint.threshold_decimal_bps, 250);
        assert!(!checkpoint.has_incomplete_bar());
        assert_eq!(checkpoint.last_trade_id, Some(12345));
        assert!(checkpoint.prevent_same_timestamp_close);
        // Fields not taken as constructor arguments start out cleared.
        assert!(!checkpoint.defer_open);
        assert_eq!(checkpoint.anomaly_summary, AnomalySummary::default());
    }

    #[test]
    fn test_checkpoint_serialization() {
        let checkpoint = Checkpoint::new(
            "EURUSD".to_string(),
            10, // 1bps
            None,
            None,
            1640995200000000,
            None, // Exness has no trade IDs
            12345678,
            true, // prevent_same_timestamp_close
        );

        // Serialize to JSON
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("EURUSD"));
        assert!(json.contains("\"threshold_decimal_bps\":10"));
        assert!(json.contains("\"prevent_same_timestamp_close\":true"));

        // Deserialize back
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.symbol, "EURUSD");
        assert_eq!(restored.threshold_decimal_bps, 10);
        assert_eq!(restored.price_hash, 12345678);
        assert!(restored.prevent_same_timestamp_close);
    }

    #[test]
    fn test_checkpoint_serialization_toggle_false() {
        let checkpoint = Checkpoint::new(
            "BTCUSDT".to_string(),
            100, // 10bps
            None,
            None,
            1640995200000000,
            Some(999),
            12345678,
            false, // Legacy behavior
        );

        // Serialize to JSON
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("\"prevent_same_timestamp_close\":false"));

        // Deserialize back
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
        assert!(!restored.prevent_same_timestamp_close);
    }

    #[test]
    fn test_checkpoint_deserialization_default() {
        // Test that old checkpoints without the newer fields get their serde defaults
        let json = r#"{
            "symbol": "BTCUSDT",
            "threshold_decimal_bps": 100,
            "incomplete_bar": null,
            "thresholds": null,
            "last_timestamp_us": 1640995200000000,
            "last_trade_id": 12345,
            "price_hash": 0,
            "anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0}
        }"#;

        let checkpoint: Checkpoint = serde_json::from_str(json).unwrap();
        // Missing field should default to true (new behavior)
        assert!(checkpoint.prevent_same_timestamp_close);
        // Missing defer_open defaults to false; missing version defaults to v1
        assert!(!checkpoint.defer_open);
        assert_eq!(checkpoint.version, 1);
    }

    #[test]
    fn test_anomaly_summary() {
        let mut summary = AnomalySummary::default();
        assert!(!summary.has_anomalies());
        assert_eq!(summary.total(), 0);

        summary.record_gap();
        summary.record_gap();
        summary.record_timestamp_anomaly();

        assert!(summary.has_anomalies());
        assert_eq!(summary.gaps_detected, 2);
        assert_eq!(summary.timestamp_anomalies, 1);
        assert_eq!(summary.total(), 3);

        // Overlaps are counted too
        summary.record_overlap();
        assert_eq!(summary.overlaps_detected, 1);
        assert_eq!(summary.total(), 4);
    }

    #[test]
    fn test_price_window() {
        let mut window = PriceWindow::new();
        assert!(window.is_empty());

        // Add some prices
        window.push(FixedPoint(5000000000000)); // 50000.0
        window.push(FixedPoint(5001000000000)); // 50010.0
        window.push(FixedPoint(5002000000000)); // 50020.0

        assert_eq!(window.len(), 3);
        assert!(!window.is_empty());

        let hash1 = window.compute_hash();

        // Same prices should produce same hash
        let mut window2 = PriceWindow::new();
        window2.push(FixedPoint(5000000000000));
        window2.push(FixedPoint(5001000000000));
        window2.push(FixedPoint(5002000000000));

        let hash2 = window2.compute_hash();
        assert_eq!(hash1, hash2);

        // Different prices should produce different hash
        let mut window3 = PriceWindow::new();
        window3.push(FixedPoint(5000000000000));
        window3.push(FixedPoint(5001000000000));
        window3.push(FixedPoint(5003000000000)); // Different!

        let hash3 = window3.compute_hash();
        assert_ne!(hash1, hash3);
    }

    #[test]
    fn test_price_window_circular() {
        let mut window = PriceWindow::new();

        // Fill the window beyond capacity
        for i in 0..12 {
            window.push(FixedPoint(i * 100000000));
        }

        // Should only contain last 8 prices
        assert_eq!(window.len(), PRICE_WINDOW_SIZE);

        // Hash should be consistent
        let hash1 = window.compute_hash();
        let hash2 = window.compute_hash();
        assert_eq!(hash1, hash2);

        // ...and must equal the hash of a window holding exactly the last N prices
        let mut expected = PriceWindow::new();
        for i in (12 - PRICE_WINDOW_SIZE as i64)..12 {
            expected.push(FixedPoint(i * 100000000));
        }
        assert_eq!(hash1, expected.compute_hash());
    }

    #[test]
    fn test_position_verification() {
        let exact = PositionVerification::Exact;
        assert!(!exact.has_gap());

        let gap = PositionVerification::Gap {
            expected_id: 100,
            actual_id: 105,
            missing_count: 5,
        };
        assert!(gap.has_gap());

        let timestamp_only = PositionVerification::TimestampOnly { gap_ms: 1000 };
        assert!(!timestamp_only.has_gap());
    }

    #[test]
    fn test_library_version() {
        let version = Checkpoint::library_version();
        // Should be a valid semver string
        assert!(version.contains('.'));
        println!("Library version: {}", version);
    }

    #[test]
    fn test_checkpoint_versioning() {
        let checkpoint = Checkpoint::new(
            "BTCUSDT".to_string(),
            250, // 25bps
            None,
            None,
            1640995200000000,
            Some(12345),
            0,
            true,
        );

        // New checkpoints should have v2
        assert_eq!(checkpoint.version, 2);
    }

    #[test]
    fn test_checkpoint_v1_backward_compat() {
        // Issue #85: Simulate old v1 checkpoint (without version field)
        let json = r#"{
            "symbol": "BTCUSDT",
            "threshold_decimal_bps": 100,
            "incomplete_bar": null,
            "thresholds": null,
            "last_timestamp_us": 1640995200000000,
            "last_trade_id": 12345,
            "price_hash": 0,
            "anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0},
            "prevent_same_timestamp_close": true,
            "defer_open": false
        }"#;

        // Old v1 checkpoints should deserialize with version defaulting to 1
        let checkpoint: Checkpoint = serde_json::from_str(json).unwrap();
        assert_eq!(checkpoint.version, 1); // Deserialized from default
        assert_eq!(checkpoint.symbol, "BTCUSDT");
        assert_eq!(checkpoint.threshold_decimal_bps, 100);

        // Migration is applied by OpenDeviationBarProcessor::from_checkpoint()
        // which is tested in processor.rs::test_checkpoint_v1_to_v2_migration
    }

    #[test]
    fn test_checkpoint_v2_serialization() {
        let checkpoint = Checkpoint::new(
            "EURUSD".to_string(),
            10,
            None,
            None,
            1640995200000000,
            None,
            12345678,
            true,
        );

        // Serialize v2 checkpoint
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("\"version\":2"));

        // Deserialize back
        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.version, 2);
        assert_eq!(restored.symbol, "EURUSD");
    }

    // =========================================================================
    // Issue #96: PriceWindow circular buffer edge case tests
    // =========================================================================

    #[test]
    fn test_price_window_empty() {
        let pw = PriceWindow::new();
        assert!(pw.is_empty());
        assert_eq!(pw.len(), 0);
        // Hash of empty window should be deterministic
        let hash1 = pw.compute_hash();
        let hash2 = PriceWindow::new().compute_hash();
        assert_eq!(hash1, hash2, "Empty window hash must be deterministic");
    }

    #[test]
    fn test_price_window_partial_fill() {
        let mut pw = PriceWindow::new();
        pw.push(FixedPoint(100_000_000)); // 1.0
        pw.push(FixedPoint(200_000_000)); // 2.0
        pw.push(FixedPoint(300_000_000)); // 3.0

        assert_eq!(pw.len(), 3);
        assert!(!pw.is_empty());

        // Hash should be deterministic for same sequence
        let mut pw2 = PriceWindow::new();
        pw2.push(FixedPoint(100_000_000));
        pw2.push(FixedPoint(200_000_000));
        pw2.push(FixedPoint(300_000_000));
        assert_eq!(pw.compute_hash(), pw2.compute_hash(), "Same prices = same hash");
    }

    #[test]
    fn test_price_window_full_capacity() {
        let mut pw = PriceWindow::new();
        for i in 1..=PRICE_WINDOW_SIZE {
            pw.push(FixedPoint(i as i64 * 100_000_000));
        }
        assert_eq!(pw.len(), PRICE_WINDOW_SIZE);

        // Verify hash consistency
        let hash1 = pw.compute_hash();
        let hash2 = pw.compute_hash();
        assert_eq!(hash1, hash2, "Hash must be idempotent");
    }

    #[test]
    fn test_price_window_wrapping() {
        let mut pw = PriceWindow::new();
        // Fill to capacity
        for i in 1..=PRICE_WINDOW_SIZE {
            pw.push(FixedPoint(i as i64 * 100_000_000));
        }
        let hash_before = pw.compute_hash();

        // Push one more (wraps, oldest evicted)
        pw.push(FixedPoint(999_000_000));
        assert_eq!(pw.len(), PRICE_WINDOW_SIZE, "Length stays at capacity after wrap");

        let hash_after = pw.compute_hash();
        assert_ne!(hash_before, hash_after, "Hash must change after circular overwrite");
    }

    #[test]
    fn test_price_window_order_sensitivity() {
        // Same prices, different order → different hash
        let mut pw1 = PriceWindow::new();
        pw1.push(FixedPoint(100_000_000));
        pw1.push(FixedPoint(200_000_000));
        pw1.push(FixedPoint(300_000_000));

        let mut pw2 = PriceWindow::new();
        pw2.push(FixedPoint(300_000_000));
        pw2.push(FixedPoint(200_000_000));
        pw2.push(FixedPoint(100_000_000));

        assert_ne!(
            pw1.compute_hash(),
            pw2.compute_hash(),
            "Different order must produce different hash"
        );
    }

    #[test]
    fn test_price_window_push_beyond_capacity() {
        let mut pw = PriceWindow::new();
        // Push 2x capacity to exercise full circular behavior
        for i in 1..=(PRICE_WINDOW_SIZE * 2) {
            pw.push(FixedPoint(i as i64 * 100_000_000));
        }
        assert_eq!(pw.len(), PRICE_WINDOW_SIZE);

        // Should only contain the last PRICE_WINDOW_SIZE prices
        // Hash should match a fresh window with those same prices
        let mut pw_expected = PriceWindow::new();
        for i in (PRICE_WINDOW_SIZE + 1)..=(PRICE_WINDOW_SIZE * 2) {
            pw_expected.push(FixedPoint(i as i64 * 100_000_000));
        }
        assert_eq!(
            pw.compute_hash(),
            pw_expected.compute_hash(),
            "After full wrap, hash must match the last N prices"
        );
    }
}
689            pw.compute_hash(), pw_expected.compute_hash(),
690            "After full wrap, hash must match the last N prices"
691        );
692    }
693}