Skip to main content

opendeviationbar_core/
processor.rs

1// FILE-SIZE-OK: 1496 lines — tests are inline (access private OpenDeviationBarState)
2//! Core open deviation bar processing algorithm
3//!
4//! Implements non-lookahead bias open deviation bar construction where bars close when
5//! price moves ±threshold dbps from the bar's OPEN price.
6
7use crate::checkpoint::{
8    AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; // Issue #59
12// Issue #59: Intra-bar features - using compute_intra_bar_features_with_config (Issue #128)
13use crate::types::{AggTrade, OpenDeviationBar};
14use smallvec::SmallVec; // Issue #119: Trade accumulation with inline buffer (512 slots ≈ 29KB)
15#[cfg(feature = "python")]
16use pyo3::prelude::*;
17// Re-export ProcessingError from errors.rs (Phase 2a extraction)
18pub use crate::errors::ProcessingError;
19// Re-export ExportOpenDeviationBarProcessor from export_processor.rs (Phase 2d extraction)
20pub use crate::export_processor::ExportOpenDeviationBarProcessor;
21
22/// Open deviation bar processor with non-lookahead bias guarantee
23pub struct OpenDeviationBarProcessor {
24    /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
25    threshold_decimal_bps: u32,
26
27    /// Issue #96 Task #98: Pre-computed threshold ratio for fast delta calculation
28    /// Stores (threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE as fixed-point
29    /// This allows compute_range_thresholds() to compute delta = (price * ratio) / SCALE
30    /// without repeated division, avoiding i128 arithmetic in hot path.
31    /// Performance: Eliminates BASIS_POINTS_SCALE division on every bar creation.
32    /// Made public for testing purposes.
33    pub threshold_ratio: i64,
34
35    /// Current bar state for streaming processing (Q19)
36    /// Enables get_incomplete_bar() and stateful process_single_trade()
37    current_bar_state: Option<OpenDeviationBarState>,
38
39    /// Price window for checkpoint hash verification
40    price_window: PriceWindow,
41
42    /// Last processed trade ID (for gap detection on resume)
43    last_trade_id: Option<i64>,
44
45    /// Last processed timestamp (for position verification)
46    last_timestamp_us: i64,
47
48    /// Anomaly tracking for debugging
49    anomaly_summary: AnomalySummary,
50
51    /// Flag indicating this processor was created from a checkpoint
52    /// When true, process_agg_trade_records will continue from existing bar state
53    resumed_from_checkpoint: bool,
54
55    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
56    ///
57    /// When true (default): A bar cannot close until a trade arrives with a
58    /// different timestamp than the bar's open_time. This prevents "instant bars"
59    /// during flash crashes where multiple trades occur at the same millisecond.
60    ///
61    /// When false: Legacy behavior - bars can close on any breach regardless
62    /// of timestamp, which may produce bars with identical timestamps.
63    prevent_same_timestamp_close: bool,
64
65    /// Deferred bar open flag (Issue #46)
66    ///
67    /// When true: The previous trade triggered a threshold breach and closed a bar.
68    /// The next trade arriving via `process_single_trade()` should open a new bar
69    /// instead of being treated as a continuation.
70    ///
71    /// This matches the batch path's `defer_open` semantics in
72    /// `process_agg_trade_records()` where the breaching trade closes the current
73    /// bar and the NEXT trade opens the new bar.
74    defer_open: bool,
75
76    /// Trade history for inter-bar feature computation (Issue #59)
77    ///
78    /// Ring buffer of recent trades for computing lookback-based features.
79    /// When Some, features are computed from trades BEFORE each bar's open_time.
80    /// When None, inter-bar features are disabled (all lookback_* fields = None).
81    trade_history: Option<TradeHistory>,
82
83    /// Configuration for inter-bar features (Issue #59)
84    ///
85    /// Controls lookback mode (fixed count or time window) and which feature
86    /// tiers to compute. When None, inter-bar features are disabled.
87    inter_bar_config: Option<InterBarConfig>,
88
89    /// Enable intra-bar feature computation (Issue #59)
90    ///
91    /// When true, the processor accumulates trades during bar construction
92    /// and computes 22 features from trades WITHIN each bar at bar close.
93    /// Features include ITH (Investment Time Horizon), statistical, and
94    /// complexity metrics. When false, all intra_* fields are None.
95    include_intra_bar_features: bool,
96
97    /// Issue #128: Configuration for intra-bar feature computation.
98    /// Controls which complexity features (Hurst, PE) are computed.
99    intra_bar_config: crate::intrabar::IntraBarConfig,
100
101    /// Issue #112: Maximum timestamp gap in microseconds before discarding a forming bar
102    ///
103    /// When resuming from checkpoint with a forming bar, if the gap between
104    /// the forming bar's close_time and the first incoming trade exceeds this
105    /// threshold, the forming bar is discarded as an orphan (same as ouroboros reset).
106    /// This prevents "oversized" bars caused by large data gaps (e.g., 38-hour outages).
107    ///
108    /// Default: 3,600,000,000 μs (1 hour)
109    max_gap_us: i64,
110}
111
112/// Cold path: scan trades to find first unsorted pair and return error
113/// Extracted from validate_trade_ordering() to improve hot-path code layout
114#[cold]
115#[inline(never)]
116fn find_unsorted_trade(trades: &[AggTrade]) -> Result<(), ProcessingError> {
117    for i in 1..trades.len() {
118        let prev = &trades[i - 1];
119        let curr = &trades[i];
120        if curr.timestamp < prev.timestamp
121            || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
122        {
123            return Err(ProcessingError::UnsortedTrades {
124                index: i,
125                prev_time: prev.timestamp,
126                prev_id: prev.agg_trade_id,
127                curr_time: curr.timestamp,
128                curr_id: curr.agg_trade_id,
129            });
130        }
131    }
132    Ok(())
133}
134
135/// Cold path: construct unsorted trade error
136/// Extracted to keep error construction out of the hot validation loop
137#[cold]
138#[inline(never)]
139fn unsorted_trade_error(index: usize, prev: &AggTrade, curr: &AggTrade) -> Result<(), ProcessingError> {
140    Err(ProcessingError::UnsortedTrades {
141        index,
142        prev_time: prev.timestamp,
143        prev_id: prev.agg_trade_id,
144        curr_time: curr.timestamp,
145        curr_id: curr.agg_trade_id,
146    })
147}
148
149impl OpenDeviationBarProcessor {
150    /// Create new processor with given threshold
151    ///
152    /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36)
153    ///
154    /// # Arguments
155    ///
156    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
157    ///   - Example: `250` → 25bps = 0.25%
158    ///   - Example: `10` → 1bps = 0.01%
159    ///   - Minimum: `1` → 0.1bps = 0.001%
160    ///
161    /// # Breaking Change (v3.0.0)
162    ///
163    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
164    /// **Migration**: Multiply all threshold values by 10.
165    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
166        Self::with_options(threshold_decimal_bps, true)
167    }
168
169    /// Create new processor with explicit timestamp gating control
170    ///
171    /// # Arguments
172    ///
173    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
174    /// * `prevent_same_timestamp_close` - If true, bars cannot close until
175    ///   timestamp advances from open_time. This prevents "instant bars" during
176    ///   flash crashes. Set to false for legacy behavior (pre-v9).
177    ///
178    /// # Example
179    ///
180    /// ```ignore
181    /// // Default behavior (v9+): timestamp gating enabled
182    /// let processor = OpenDeviationBarProcessor::new(250)?;
183    ///
184    /// // Legacy behavior: allow instant bars
185    /// let processor = OpenDeviationBarProcessor::with_options(250, false)?;
186    /// ```
187    pub fn with_options(
188        threshold_decimal_bps: u32,
189        prevent_same_timestamp_close: bool,
190    ) -> Result<Self, ProcessingError> {
191        // Validation bounds (v3.0.0: dbps units)
192        // Min: 1 dbps = 0.001%
193        // Max: 100,000 dbps = 100%
194        if threshold_decimal_bps < 1 {
195            return Err(ProcessingError::InvalidThreshold {
196                threshold_decimal_bps,
197            });
198        }
199        if threshold_decimal_bps > 100_000 {
200            return Err(ProcessingError::InvalidThreshold {
201                threshold_decimal_bps,
202            });
203        }
204
205        // Issue #96 Task #98: Pre-compute threshold ratio
206        // Ratio = (threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE
207        // This is used in compute_range_thresholds() for fast delta calculation
208        let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
209            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
210
211        Ok(Self {
212            threshold_decimal_bps,
213            threshold_ratio,
214            current_bar_state: None,
215            price_window: PriceWindow::new(),
216            last_trade_id: None,
217            last_timestamp_us: 0,
218            anomaly_summary: AnomalySummary::default(),
219            resumed_from_checkpoint: false,
220            prevent_same_timestamp_close,
221            defer_open: false,
222            trade_history: None,               // Issue #59: disabled by default
223            inter_bar_config: None,            // Issue #59: disabled by default
224            include_intra_bar_features: false, // Issue #59: disabled by default
225            intra_bar_config: crate::intrabar::IntraBarConfig::default(), // Issue #128
226            max_gap_us: 3_600_000_000,         // Issue #112: 1 hour default
227        })
228    }
229
230    /// Get the prevent_same_timestamp_close setting
231    pub fn prevent_same_timestamp_close(&self) -> bool {
232        self.prevent_same_timestamp_close
233    }
234
235    /// Enable inter-bar feature computation with the given configuration (Issue #59)
236    ///
237    /// When enabled, the processor maintains a trade history buffer and computes
238    /// lookback-based microstructure features on each bar close. Features are
239    /// computed from trades that occurred BEFORE each bar's open_time, ensuring
240    /// no lookahead bias.
241    ///
242    /// Uses a local entropy cache (default behavior, backward compatible).
243    /// For multi-symbol workloads, use `with_inter_bar_config_and_cache()` with a global cache.
244    ///
245    /// # Arguments
246    ///
247    /// * `config` - Configuration controlling lookback mode and feature tiers
248    ///
249    /// # Example
250    ///
251    /// ```ignore
252    /// use opendeviationbar_core::processor::OpenDeviationBarProcessor;
253    /// use opendeviationbar_core::interbar::{InterBarConfig, LookbackMode};
254    ///
255    /// let processor = OpenDeviationBarProcessor::new(1000)?
256    ///     .with_inter_bar_config(InterBarConfig {
257    ///         lookback_mode: LookbackMode::FixedCount(500),
258    ///         compute_tier2: true,
259    ///         compute_tier3: true,
260    ///         ..Default::default()
261    ///     });
262    /// ```
263    pub fn with_inter_bar_config(self, config: InterBarConfig) -> Self {
264        self.with_inter_bar_config_and_cache(config, None)
265    }
266
267    /// Enable inter-bar feature computation with optional external entropy cache
268    ///
269    /// Issue #145 Phase 3: Multi-Symbol Entropy Cache Sharing
270    ///
271    /// # Arguments
272    ///
273    /// * `config` - Configuration controlling lookback mode and feature tiers
274    /// * `external_cache` - Optional shared entropy cache from `get_global_entropy_cache()`
275    ///   - If provided: Uses the shared global cache (recommended for multi-symbol)
276    ///   - If None: Creates a local 128-entry cache (default, backward compatible)
277    ///
278    /// # Usage
279    ///
280    /// ```ignore
281    /// use opendeviationbar_core::{processor::OpenDeviationBarProcessor, entropy_cache_global::get_global_entropy_cache, interbar::InterBarConfig};
282    ///
283    /// // Single-symbol: use local cache (default)
284    /// let processor = OpenDeviationBarProcessor::new(1000)?
285    ///     .with_inter_bar_config(config);
286    ///
287    /// // Multi-symbol: share global cache
288    /// let global_cache = get_global_entropy_cache();
289    /// let processor = OpenDeviationBarProcessor::new(1000)?
290    ///     .with_inter_bar_config_and_cache(config, Some(global_cache));
291    /// ```
292    pub fn with_inter_bar_config_and_cache(
293        mut self,
294        config: InterBarConfig,
295        external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
296    ) -> Self {
297        self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
298        self.inter_bar_config = Some(config);
299        self
300    }
301
302    /// Check if inter-bar features are enabled
303    pub fn inter_bar_enabled(&self) -> bool {
304        self.inter_bar_config.is_some()
305    }
306
307    /// Issue #112: Configure maximum timestamp gap for checkpoint recovery
308    ///
309    /// When resuming from checkpoint with a forming bar, if the gap between
310    /// the forming bar's close_time and the first incoming trade exceeds this
311    /// threshold, the forming bar is discarded as an orphan.
312    ///
313    /// # Arguments
314    ///
315    /// * `max_gap_us` - Maximum gap in microseconds (default: 3,600,000,000 = 1 hour)
316    pub fn with_max_gap(mut self, max_gap_us: i64) -> Self {
317        self.max_gap_us = max_gap_us;
318        self
319    }
320
321    /// Get the maximum gap threshold in microseconds
322    pub fn max_gap_us(&self) -> i64 {
323        self.max_gap_us
324    }
325
326    /// Enable intra-bar feature computation (Issue #59)
327    ///
328    /// When enabled, the processor accumulates trades during bar construction
329    /// and computes 22 features from trades WITHIN each bar at bar close:
330    /// - 8 ITH features (Investment Time Horizon)
331    /// - 12 statistical features (OFI, intensity, Kyle lambda, etc.)
332    /// - 2 complexity features (Hurst exponent, permutation entropy)
333    ///
334    /// # Memory Note
335    ///
336    /// Trades are accumulated per-bar and freed when the bar closes.
337    /// Typical 1000 dbps bar: ~50-500 trades, ~2-24 KB overhead.
338    ///
339    /// # Example
340    ///
341    /// ```ignore
342    /// let processor = OpenDeviationBarProcessor::new(1000)?
343    ///     .with_intra_bar_features();
344    /// ```
345    pub fn with_intra_bar_features(mut self) -> Self {
346        self.include_intra_bar_features = true;
347        self
348    }
349
350    /// Check if intra-bar features are enabled
351    pub fn intra_bar_enabled(&self) -> bool {
352        self.include_intra_bar_features
353    }
354
355    /// Re-enable inter-bar features on an existing processor (Issue #97).
356    ///
357    /// Used after `from_checkpoint()` to restore microstructure config that
358    /// is not preserved in checkpoint state. Uses a local entropy cache by default.
359    /// For multi-symbol workloads, use `set_inter_bar_config_with_cache()` with a global cache.
360    pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
361        self.set_inter_bar_config_with_cache(config, None);
362    }
363
364    /// Re-enable inter-bar features with optional external entropy cache (Issue #145 Phase 3).
365    ///
366    /// Used after `from_checkpoint()` to restore microstructure config that
367    /// is not preserved in checkpoint state. Allows specifying a shared entropy cache
368    /// for multi-symbol processors.
369    pub fn set_inter_bar_config_with_cache(
370        &mut self,
371        config: InterBarConfig,
372        external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
373    ) {
374        self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
375        self.inter_bar_config = Some(config);
376    }
377
378    /// Re-enable intra-bar features on an existing processor (Issue #97).
379    pub fn set_intra_bar_features(&mut self, enabled: bool) {
380        self.include_intra_bar_features = enabled;
381    }
382
383    /// Issue #128: Set intra-bar feature configuration.
384    /// Controls which complexity features (Hurst, PE) are computed.
385    pub fn with_intra_bar_config(mut self, config: crate::intrabar::IntraBarConfig) -> Self {
386        self.intra_bar_config = config;
387        self
388    }
389
390    /// Issue #128: Set intra-bar feature configuration on existing processor.
391    pub fn set_intra_bar_config(&mut self, config: crate::intrabar::IntraBarConfig) {
392        self.intra_bar_config = config;
393    }
394
395    /// Process a single trade and return completed bar if any
396    ///
397    /// Maintains internal state for streaming use case. State persists across calls
398    /// until a bar completes (threshold breach), enabling get_incomplete_bar().
399    ///
400    /// # Arguments
401    ///
402    /// * `trade` - Single aggregated trade to process
403    ///
404    /// # Returns
405    ///
406    /// `Some(OpenDeviationBar)` if a bar was completed, `None` otherwise
407    ///
408    /// # State Management
409    ///
410    /// - First trade: Initializes new bar state
411    /// - Subsequent trades: Updates existing bar or closes on breach
412    /// - Breach: Returns completed bar, starts new bar with breaching trade
413    ///
414    /// Issue #96 Task #78: Accept borrowed AggTrade to eliminate clones in fan-out loops.
415    /// Streaming pipelines (4+ thresholds) were cloning ~57 byte trades per processor.
416    /// Signature change to `&AggTrade` eliminates 4-8x unnecessary allocations.
417    /// Issue #96 Task #84: `#[inline]` — main hot-path entry point called on every trade.
418    #[inline]
419    pub fn process_single_trade(
420        &mut self,
421        trade: &AggTrade,
422    ) -> Result<Option<OpenDeviationBar>, ProcessingError> {
423        // Track price and position for checkpoint
424        self.price_window.push(trade.price);
425        self.last_trade_id = Some(trade.agg_trade_id);
426        self.last_timestamp_us = trade.timestamp;
427
428        // Issue #59: Push trade to history buffer for inter-bar feature computation
429        // This must happen BEFORE bar processing so lookback window includes recent trades
430        if let Some(ref mut history) = self.trade_history {
431            history.push(trade);
432        }
433
434        // Issue #46: If previous call triggered a breach, this trade opens the new bar.
435        // This matches the batch path's defer_open semantics - the breaching trade
436        // closes the current bar, and the NEXT trade opens the new bar.
437        if self.defer_open {
438            // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
439            if let Some(ref mut history) = self.trade_history {
440                history.on_bar_open(trade.timestamp);
441            }
442            self.current_bar_state = Some(if self.include_intra_bar_features {
443                OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
444            } else {
445                OpenDeviationBarState::new(trade, self.threshold_ratio)
446            });
447            self.defer_open = false;
448            return Ok(None);
449        }
450
451        match &mut self.current_bar_state {
452            None => {
453                // First trade - initialize new bar
454                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
455                if let Some(ref mut history) = self.trade_history {
456                    history.on_bar_open(trade.timestamp);
457                }
458                self.current_bar_state = Some(if self.include_intra_bar_features {
459                    OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
460                } else {
461                    OpenDeviationBarState::new(trade, self.threshold_ratio)
462                });
463                Ok(None)
464            }
465            Some(bar_state) => {
466                // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
467                // Only accumulates if features enabled, avoiding unnecessary clones
468                bar_state.accumulate_trade(trade, self.include_intra_bar_features);
469
470                // Check for threshold breach
471                let price_breaches = bar_state.bar.is_breach(
472                    trade.price,
473                    bar_state.upper_threshold,
474                    bar_state.lower_threshold,
475                );
476
477                // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
478                // This eliminates "instant bars" during flash crashes where multiple trades
479                // occur at the same millisecond.
480                let timestamp_allows_close = !self.prevent_same_timestamp_close
481                    || trade.timestamp != bar_state.bar.open_time;
482
483                if price_breaches && timestamp_allows_close {
484                    // Breach detected AND timestamp changed - close current bar
485                    bar_state.bar.update_with_trade(trade);
486
487                    // Validation: Ensure high/low include open/close extremes
488                    debug_assert!(
489                        bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
490                    );
491                    debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
492
493                    // Compute microstructure features at bar finalization (Issue #25)
494                    bar_state.bar.compute_microstructure_features();
495
496                    // Issue #59: Compute inter-bar features from lookback window
497                    // Features are computed from trades BEFORE bar.open_time (no lookahead)
498                    if let Some(ref mut history) = self.trade_history {
499                        let inter_bar_features = history.compute_features(bar_state.bar.open_time);
500                        bar_state.bar.set_inter_bar_features(&inter_bar_features);
501                        // Issue #68: Notify history that bar is closing (resumes normal pruning)
502                        history.on_bar_close();
503                    }
504
505                    // Issue #59: Compute intra-bar features from accumulated trades
506                    if self.include_intra_bar_features {
507                        // Issue #96 Task #173: Use reusable scratch buffers from bar_state
508                        let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_config(
509                            &bar_state.accumulated_trades,
510                            &mut bar_state.scratch_prices,
511                            &mut bar_state.scratch_volumes,
512                            &self.intra_bar_config, // Issue #128
513                        );
514                        bar_state.bar.set_intra_bar_features(&intra_bar_features);
515                    }
516
517                    // Move bar out instead of cloning — bar_state borrow ends after
518                    // last use above (NLL), so take() is safe here.
519                    let completed_bar = self.current_bar_state.take().unwrap().bar;
520
521                    // Issue #46: Don't start new bar with breaching trade.
522                    // Next trade will open the new bar via defer_open.
523                    self.defer_open = true;
524
525                    Ok(Some(completed_bar))
526                } else {
527                    // Either no breach OR same timestamp (gate active) - update existing bar
528                    bar_state.bar.update_with_trade(trade);
529                    Ok(None)
530                }
531            }
532        }
533    }
534
535    /// Get any incomplete bar currently being processed
536    ///
537    /// Returns clone of current bar state for inspection without consuming it.
538    /// Useful for final bar at stream end or progress monitoring.
539    ///
540    /// # Returns
541    ///
542    /// `Some(OpenDeviationBar)` if bar is in progress, `None` if no active bar
543    pub fn get_incomplete_bar(&self) -> Option<OpenDeviationBar> {
544        self.current_bar_state
545            .as_ref()
546            .map(|state| state.bar.clone())
547    }
548
549    /// Process AggTrade records into open deviation bars including incomplete bars for analysis
550    ///
551    /// # Arguments
552    ///
553    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
554    ///
555    /// # Returns
556    ///
557    /// Vector of open deviation bars including incomplete bars at end of data
558    ///
559    /// # Warning
560    ///
561    /// This method is for analysis purposes only. Incomplete bars violate the
562    /// fundamental open deviation bar algorithm and should not be used for production trading.
563    pub fn process_agg_trade_records_with_incomplete(
564        &mut self,
565        agg_trade_records: &[AggTrade],
566    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
567        self.process_agg_trade_records_with_options(agg_trade_records, true)
568    }
569
570    /// Process Binance aggregated trade records into open deviation bars
571    ///
572    /// This is the primary method for converting AggTrade records (which aggregate
573    /// multiple individual trades) into open deviation bars based on price movement thresholds.
574    ///
575    /// # Parameters
576    ///
577    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
578    ///   Each record represents multiple individual trades aggregated at same price
579    ///
580    /// # Returns
581    ///
582    /// Vector of completed open deviation bars (ONLY bars that breached thresholds).
583    /// Each bar tracks both individual trade count and AggTrade record count.
584    pub fn process_agg_trade_records(
585        &mut self,
586        agg_trade_records: &[AggTrade],
587    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
588        self.process_agg_trade_records_with_options(agg_trade_records, false)
589    }
590
591    /// Process AggTrade records with options for including incomplete bars
592    ///
593    /// Batch processing mode: Clears any existing state before processing.
594    /// Use process_single_trade() for stateful streaming instead.
595    ///
596    /// # Parameters
597    ///
598    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
599    /// * `include_incomplete` - Whether to include incomplete bars at end of processing
600    ///
601    /// # Returns
602    ///
603    /// Vector of open deviation bars (completed + incomplete if requested)
604    pub fn process_agg_trade_records_with_options(
605        &mut self,
606        agg_trade_records: &[AggTrade],
607        include_incomplete: bool,
608    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
609        if agg_trade_records.is_empty() {
610            return Ok(Vec::new());
611        }
612
613        // Validate records are sorted
614        self.validate_trade_ordering(agg_trade_records)?;
615
616        // Use existing bar state if resuming from checkpoint, otherwise start fresh
617        // This is CRITICAL for cross-file continuation (Issues #2, #3)
618        let mut current_bar: Option<OpenDeviationBarState> = if self.resumed_from_checkpoint {
619            // Continue from checkpoint's incomplete bar
620            self.resumed_from_checkpoint = false; // Consume the flag
621            let restored_bar = self.current_bar_state.take();
622
623            // Issue #112: Gap-aware checkpoint recovery
624            // If the forming bar's close_time is too far from the first incoming trade,
625            // discard it as an orphan to prevent oversized bars from data gaps.
626            if let Some(ref bar_state) = restored_bar {
627                let first_trade_ts = agg_trade_records[0].timestamp;
628                let gap = first_trade_ts - bar_state.bar.close_time;
629                if gap > self.max_gap_us {
630                    self.anomaly_summary.record_gap();
631                    // Discard forming bar — same treatment as ouroboros reset
632                    None
633                } else {
634                    restored_bar
635                }
636            } else {
637                restored_bar
638            }
639        } else {
640            // Start fresh for normal batch processing
641            self.current_bar_state = None;
642            None
643        };
644
645        let mut bars = Vec::with_capacity(agg_trade_records.len() / 50); // Heuristic: 50 trades/bar covers consolidation regimes
646        let mut defer_open = false;
647
648        for agg_record in agg_trade_records {
649            // Track price and position for checkpoint
650            self.price_window.push(agg_record.price);
651            self.last_trade_id = Some(agg_record.agg_trade_id);
652            self.last_timestamp_us = agg_record.timestamp;
653
654            // Issue #59: Push trade to history buffer for inter-bar feature computation
655            if let Some(ref mut history) = self.trade_history {
656                history.push(agg_record);
657            }
658
659            if defer_open {
660                // Previous bar closed, this agg_record opens new bar
661                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
662                if let Some(ref mut history) = self.trade_history {
663                    history.on_bar_open(agg_record.timestamp);
664                }
665                current_bar = Some(if self.include_intra_bar_features {
666                    OpenDeviationBarState::new_with_trade_accumulation(
667                        agg_record,
668                        self.threshold_ratio,
669                    )
670                } else {
671                    OpenDeviationBarState::new(agg_record, self.threshold_ratio)
672                });
673                defer_open = false;
674                continue;
675            }
676
677            match current_bar {
678                None => {
679                    // First bar initialization
680                    // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
681                    if let Some(ref mut history) = self.trade_history {
682                        history.on_bar_open(agg_record.timestamp);
683                    }
684                    current_bar = Some(if self.include_intra_bar_features {
685                        OpenDeviationBarState::new_with_trade_accumulation(
686                            agg_record,
687                            self.threshold_ratio,
688                        )
689                    } else {
690                        OpenDeviationBarState::new(agg_record, self.threshold_ratio)
691                    });
692                }
693                Some(ref mut bar_state) => {
694                    // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
695                    // Only accumulates if features enabled, avoiding unnecessary clones
696                    bar_state.accumulate_trade(agg_record, self.include_intra_bar_features);
697
698                    // Check if this AggTrade record breaches the threshold
699                    let price_breaches = bar_state.bar.is_breach(
700                        agg_record.price,
701                        bar_state.upper_threshold,
702                        bar_state.lower_threshold,
703                    );
704
705                    // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
706                    // This eliminates "instant bars" during flash crashes where multiple trades
707                    // occur at the same millisecond.
708                    let timestamp_allows_close = !self.prevent_same_timestamp_close
709                        || agg_record.timestamp != bar_state.bar.open_time;
710
711                    if price_breaches && timestamp_allows_close {
712                        // Breach detected AND timestamp changed - update bar with breaching record
713                        bar_state.bar.update_with_trade(agg_record);
714
715                        // Validation: Ensure high/low include open/close extremes
716                        debug_assert!(
717                            bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
718                        );
719                        debug_assert!(
720                            bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
721                        );
722
723                        // Compute microstructure features at bar finalization (Issue #34)
724                        bar_state.bar.compute_microstructure_features();
725
726                        // Issue #59: Compute inter-bar features from lookback window
727                        if let Some(ref mut history) = self.trade_history {
728                            let inter_bar_features =
729                                history.compute_features(bar_state.bar.open_time);
730                            bar_state.bar.set_inter_bar_features(&inter_bar_features);
731                            // Issue #68: Notify history that bar is closing (resumes normal pruning)
732                            history.on_bar_close();
733                        }
734
735                        // Issue #59: Compute intra-bar features from accumulated trades
736                        if self.include_intra_bar_features {
737                            // Issue #96 Task #173: Use reusable scratch buffers from bar_state
738                            let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_config(
739                                &bar_state.accumulated_trades,
740                                &mut bar_state.scratch_prices,
741                                &mut bar_state.scratch_volumes,
742                                &self.intra_bar_config, // Issue #128
743                            );
744                            bar_state.bar.set_intra_bar_features(&intra_bar_features);
745                        }
746
747                        // Move bar out instead of cloning — bar_state borrow ends
748                        // after last use above (NLL), so take() is safe here.
749                        bars.push(current_bar.take().unwrap().bar);
750                        defer_open = true; // Next record will open new bar
751                    } else {
752                        // Either no breach OR same timestamp (gate active) - normal update
753                        bar_state.bar.update_with_trade(agg_record);
754                    }
755                }
756            }
757        }
758
759        // Save current bar state for checkpoint and optionally append incomplete bar.
760        // When include_incomplete=true, clone for checkpoint then consume for output.
761        // When include_incomplete=false, move directly (no clone needed).
762        if include_incomplete {
763            // Issue #96 Task #95: Optimize checkpoint cloning with take() to avoid Vec allocation
764            // (accumulated_trades not needed after intra-bar features computed).
765            // Using std::mem::take() instead of clone+clear reduces allocation overhead.
766            if let Some(ref state) = current_bar {
767                let mut checkpoint_state = state.clone();
768                // Use take() to avoid cloning the Vec - directly moves empty Vec into place
769                let _ = std::mem::take(&mut checkpoint_state.accumulated_trades);
770                self.current_bar_state = Some(checkpoint_state);
771            }
772
773            // Add final partial bar only if explicitly requested
774            // This preserves algorithm integrity: bars should only close on threshold breach
775            if let Some(mut bar_state) = current_bar {
776                // Compute microstructure features for incomplete bar (Issue #34)
777                bar_state.bar.compute_microstructure_features();
778
779                // Issue #59: Compute inter-bar features from lookback window
780                if let Some(ref history) = self.trade_history {
781                    let inter_bar_features = history.compute_features(bar_state.bar.open_time);
782                    bar_state.bar.set_inter_bar_features(&inter_bar_features);
783                }
784
785                // Issue #59: Compute intra-bar features from accumulated trades
786                if self.include_intra_bar_features {
787                    // Issue #96 Task #173: Use reusable scratch buffers from bar_state
788                    let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_config(
789                        &bar_state.accumulated_trades,
790                        &mut bar_state.scratch_prices,
791                        &mut bar_state.scratch_volumes,
792                        &self.intra_bar_config, // Issue #128
793                    );
794                    bar_state.bar.set_intra_bar_features(&intra_bar_features);
795                }
796
797                bars.push(bar_state.bar);
798            }
799        } else {
800            // No incomplete bar appended — move ownership directly, no clone needed
801            self.current_bar_state = current_bar;
802        }
803
804        Ok(bars)
805    }
806
807    // === CHECKPOINT METHODS ===
808
809    /// Create checkpoint for cross-file continuation
810    ///
811    /// Captures current processing state for seamless continuation:
812    /// - Incomplete bar (if any) with FIXED thresholds
813    /// - Position tracking (timestamp, trade_id if available)
814    /// - Price hash for verification
815    ///
816    /// # Arguments
817    ///
818    /// * `symbol` - Symbol being processed (e.g., "BTCUSDT", "EURUSD")
819    ///
820    /// # Example
821    ///
822    /// ```ignore
823    /// let bars = processor.process_agg_trade_records(&trades)?;
824    /// let checkpoint = processor.create_checkpoint("BTCUSDT");
825    /// let json = serde_json::to_string(&checkpoint)?;
826    /// std::fs::write("checkpoint.json", json)?;
827    /// ```
828    pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
829        let (incomplete_bar, thresholds) = match &self.current_bar_state {
830            Some(state) => (
831                Some(state.bar.clone()),
832                Some((state.upper_threshold, state.lower_threshold)),
833            ),
834            None => (None, None),
835        };
836
837        let mut checkpoint = Checkpoint::new(
838            symbol.to_string(),
839            self.threshold_decimal_bps,
840            incomplete_bar,
841            thresholds,
842            self.last_timestamp_us,
843            self.last_trade_id,
844            self.price_window.compute_hash(),
845            self.prevent_same_timestamp_close,
846        );
847        // Issue #46: Persist defer_open state for cross-session continuity
848        checkpoint.defer_open = self.defer_open;
849        checkpoint
850    }
851
852    /// Resume processing from checkpoint
853    ///
854    /// Restores incomplete bar state with IMMUTABLE thresholds.
855    /// Next trade continues building the bar until threshold breach.
856    ///
857    /// # Errors
858    ///
859    /// - `CheckpointError::MissingThresholds` - Checkpoint has bar but no thresholds
860    ///
861    /// # Example
862    ///
863    /// ```ignore
864    /// let json = std::fs::read_to_string("checkpoint.json")?;
865    /// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
866    /// let mut processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint)?;
867    /// let bars = processor.process_agg_trade_records(&next_file_trades)?;
868    /// ```
869    pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
870        // Issue #85 Phase 2: Apply checkpoint schema migration if needed
871        let checkpoint = Self::migrate_checkpoint(checkpoint);
872
873        // Issue #62: Validate threshold range before restoring from checkpoint
874        // Valid range: 1-100,000 dbps (0.0001% to 10%)
875        const THRESHOLD_MIN: u32 = 1;
876        const THRESHOLD_MAX: u32 = 100_000;
877        if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
878            || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
879        {
880            return Err(CheckpointError::InvalidThreshold {
881                threshold: checkpoint.threshold_decimal_bps,
882                min_threshold: THRESHOLD_MIN,
883                max_threshold: THRESHOLD_MAX,
884            });
885        }
886
887        // Validate checkpoint consistency
888        if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
889            return Err(CheckpointError::MissingThresholds);
890        }
891
892        // Restore bar state if there's an incomplete bar
893        // Note: accumulated_trades is reset to empty - intra-bar features won't be
894        // accurate for bars resumed from checkpoint (partial trade history lost)
895        let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
896            (Some(bar), Some((upper, lower))) => Some(OpenDeviationBarState {
897                bar,
898                upper_threshold: upper,
899                lower_threshold: lower,
900                accumulated_trades: SmallVec::new(), // Lost on checkpoint - features may be partial
901                scratch_prices: SmallVec::new(),
902                scratch_volumes: SmallVec::new(),
903            }),
904            _ => None,
905        };
906
907        // Issue #96 Task #98: Pre-compute threshold ratio (same as with_options)
908        let threshold_ratio = ((checkpoint.threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
909            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
910
911        Ok(Self {
912            threshold_decimal_bps: checkpoint.threshold_decimal_bps,
913            threshold_ratio,
914            current_bar_state,
915            price_window: PriceWindow::new(), // Reset - will be rebuilt from new trades
916            last_trade_id: checkpoint.last_trade_id,
917            last_timestamp_us: checkpoint.last_timestamp_us,
918            anomaly_summary: checkpoint.anomaly_summary,
919            resumed_from_checkpoint: true, // Signal to continue from existing bar state
920            prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
921            defer_open: checkpoint.defer_open, // Issue #46: Restore deferred open state
922            trade_history: None,               // Issue #59: Must be re-enabled after restore
923            inter_bar_config: None,            // Issue #59: Must be re-enabled after restore
924            include_intra_bar_features: false, // Issue #59: Must be re-enabled after restore
925            intra_bar_config: crate::intrabar::IntraBarConfig::default(), // Issue #128
926            max_gap_us: 3_600_000_000,         // Issue #112: 1 hour default
927        })
928    }
929
930    /// Migrate checkpoint between schema versions
931    /// Issue #85 Phase 2: Handle v1 → v2 migration
932    /// Safe: JSON deserialization is field-name-based, so old v1 checkpoints load correctly
933    fn migrate_checkpoint(mut checkpoint: Checkpoint) -> Checkpoint {
934        match checkpoint.version {
935            1 => {
936                // v1 → v2: OpenDeviationBar struct field reordering (no behavioral changes)
937                // JSON serialization is position-independent, so no transformation needed
938                checkpoint.version = 2;
939                checkpoint
940            }
941            2 => {
942                // Already current version
943                checkpoint
944            }
945            _ => {
946                // Unknown version - log warning and continue with best effort
947                eprintln!(
948                    "Warning: Checkpoint has unknown version {}, treating as v2",
949                    checkpoint.version
950                );
951                checkpoint.version = 2;
952                checkpoint
953            }
954        }
955    }
956
957    /// Verify we're at the right position in the data stream
958    ///
959    /// Call with first trade of new file to verify continuity.
960    /// Returns verification result indicating if there's a gap or exact match.
961    ///
962    /// # Arguments
963    ///
964    /// * `first_trade` - First trade of the new file/chunk
965    ///
966    /// # Example
967    ///
968    /// ```ignore
969    /// let processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint)?;
970    /// let verification = processor.verify_position(&next_file_trades[0]);
971    /// match verification {
972    ///     PositionVerification::Exact => println!("Perfect continuation!"),
973    ///     PositionVerification::Gap { missing_count, .. } => {
974    ///         println!("Warning: {} trades missing", missing_count);
975    ///     }
976    ///     PositionVerification::TimestampOnly { gap_ms } => {
977    ///         println!("Exness data: {}ms gap", gap_ms);
978    ///     }
979    /// }
980    /// ```
981    pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
982        match self.last_trade_id {
983            Some(last_id) => {
984                // Binance: has trade IDs - check for gaps
985                let expected_id = last_id + 1;
986                if first_trade.agg_trade_id == expected_id {
987                    PositionVerification::Exact
988                } else {
989                    let missing_count = first_trade.agg_trade_id - expected_id;
990                    PositionVerification::Gap {
991                        expected_id,
992                        actual_id: first_trade.agg_trade_id,
993                        missing_count,
994                    }
995                }
996            }
997            None => {
998                // Exness: no trade IDs - use timestamp only
999                let gap_us = first_trade.timestamp - self.last_timestamp_us;
1000                let gap_ms = gap_us / 1000;
1001                PositionVerification::TimestampOnly { gap_ms }
1002            }
1003        }
1004    }
1005
1006    /// Get the current anomaly summary
1007    pub fn anomaly_summary(&self) -> &AnomalySummary {
1008        &self.anomaly_summary
1009    }
1010
1011    /// Get the threshold in decimal basis points
1012    pub fn threshold_decimal_bps(&self) -> u32 {
1013        self.threshold_decimal_bps
1014    }
1015
1016    /// Validate that trades are properly sorted for deterministic processing
1017    ///
1018    /// Issue #96 Task #62: Early-exit optimization for sorted data
1019    /// For typical workloads (95%+ sorted), quick first/last check identifies
1020    /// unsorted batches immediately without full O(n) validation.
1021    fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
1022        if trades.is_empty() {
1023            return Ok(());
1024        }
1025
1026        // Issue #96 Task #62: Fast-path check for obviously unsorted data
1027        // If first and last trades are not ordered, data is definitely unsorted
1028        // This early-exit catches common failures without full validation
1029        let first = &trades[0];
1030        let last = &trades[trades.len() - 1];
1031
1032        if last.timestamp < first.timestamp
1033            || (last.timestamp == first.timestamp && last.agg_trade_id <= first.agg_trade_id)
1034        {
1035            // Definitely unsorted - find exact error location (cold path)
1036            return find_unsorted_trade(trades);
1037        }
1038
1039        // Full validation for typical sorted case
1040        for i in 1..trades.len() {
1041            let prev = &trades[i - 1];
1042            let curr = &trades[i];
1043
1044            // Check ordering: (timestamp, agg_trade_id) ascending
1045            if curr.timestamp < prev.timestamp
1046                || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
1047            {
1048                return unsorted_trade_error(i, prev, curr);
1049            }
1050        }
1051
1052        Ok(())
1053    }
1054
1055    /// Reset processor state at an ouroboros boundary (year/month/week).
1056    ///
1057    /// Clears the incomplete bar and position tracking while preserving
1058    /// the threshold configuration. Use this when starting fresh at a
1059    /// known boundary for reproducibility.
1060    ///
1061    /// # Returns
1062    ///
1063    /// The orphaned incomplete bar (if any) so caller can decide
1064    /// whether to include it in results with `is_orphan=True` flag.
1065    ///
1066    /// # Example
1067    ///
1068    /// ```ignore
1069    /// // At year boundary (Jan 1 00:00:00 UTC)
1070    /// let orphaned = processor.reset_at_ouroboros();
1071    /// if let Some(bar) = orphaned {
1072    ///     // Handle incomplete bar from previous year
1073    /// }
1074    /// // Continue processing new year's data with clean state
1075    /// ```
1076    pub fn reset_at_ouroboros(&mut self) -> Option<OpenDeviationBar> {
1077        let orphaned = self.current_bar_state.take().map(|state| state.bar);
1078        self.price_window = PriceWindow::new();
1079        self.last_trade_id = None;
1080        self.last_timestamp_us = 0;
1081        self.resumed_from_checkpoint = false;
1082        self.defer_open = false;
1083        // Issue #81: Clear bar boundary tracking at ouroboros reset.
1084        // Trades are preserved — still valid lookback for first bar of new segment.
1085        if let Some(ref mut history) = self.trade_history {
1086            history.reset_bar_boundaries();
1087        }
1088        orphaned
1089    }
1090}
1091
1092/// Internal state for a open deviation bar being built
1093#[derive(Clone)]
1094struct OpenDeviationBarState {
1095    /// The open deviation bar being constructed
1096    pub bar: OpenDeviationBar,
1097
1098    /// Upper breach threshold (FIXED from bar open)
1099    pub upper_threshold: FixedPoint,
1100
1101    /// Lower breach threshold (FIXED from bar open)
1102    pub lower_threshold: FixedPoint,
1103
1104    /// Accumulated trades for intra-bar feature computation (Issue #59)
1105    ///
1106    /// When intra-bar features are enabled, trades are accumulated here
1107    /// during bar construction and used to compute features at bar close.
1108    /// Cleared when bar closes to free memory.
1109    /// Issue #136: Optimized from 512 to 64 slots (saves 87.5% stack memory).
1110    /// Profile data: max trades/bar = 26 (P99 = 14), so 64 slots provides
1111    /// 2.5x safety margin while reducing from 32KB to 4KB per bar.
1112    pub accumulated_trades: SmallVec<[AggTrade; 64]>,
1113
1114    /// Issue #96: Scratch buffer for intra-bar price extraction
1115    /// SmallVec<[f64; 64]> keeps 95%+ of bars on stack (P99 trades/bar = 14, max = 26)
1116    /// Eliminates heap allocation for typical bars, spills transparently for large ones
1117    pub scratch_prices: SmallVec<[f64; 64]>,
1118
1119    /// Issue #96: Scratch buffer for intra-bar volume extraction
1120    /// Same sizing rationale as scratch_prices
1121    pub scratch_volumes: SmallVec<[f64; 64]>,
1122}
1123
1124impl OpenDeviationBarState {
1125    /// Create new open deviation bar state from opening trade
1126    /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1127    #[inline]
1128    fn new(trade: &AggTrade, threshold_ratio: i64) -> Self {
1129        let bar = OpenDeviationBar::new(trade);
1130
1131        // Issue #96 Task #98: Use cached ratio instead of repeated division
1132        // This avoids BASIS_POINTS_SCALE division on every bar creation
1133        let (upper_threshold, lower_threshold) =
1134            bar.open.compute_range_thresholds_cached(threshold_ratio);
1135
1136        Self {
1137            bar,
1138            upper_threshold,
1139            lower_threshold,
1140            accumulated_trades: SmallVec::new(),
1141            scratch_prices: SmallVec::new(),
1142            scratch_volumes: SmallVec::new(),
1143        }
1144    }
1145
1146    /// Create new open deviation bar state with intra-bar feature accumulation
1147    /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1148    #[inline]
1149    fn new_with_trade_accumulation(trade: &AggTrade, threshold_ratio: i64) -> Self {
1150        let bar = OpenDeviationBar::new(trade);
1151
1152        // Issue #96 Task #98: Use cached ratio instead of repeated division
1153        // This avoids BASIS_POINTS_SCALE division on every bar creation
1154        let (upper_threshold, lower_threshold) =
1155            bar.open.compute_range_thresholds_cached(threshold_ratio);
1156
1157        Self {
1158            bar,
1159            upper_threshold,
1160            lower_threshold,
1161            accumulated_trades: {
1162                let mut sv = SmallVec::new();
1163                sv.push(trade.clone());
1164                sv
1165            },
1166            scratch_prices: SmallVec::new(),
1167            scratch_volumes: SmallVec::new(),
1168        }
1169    }
1170
1171    /// Accumulate a trade for intra-bar feature computation
1172    ///
1173    /// Issue #96 Task #44: Only accumulates if intra-bar features are enabled,
1174    /// avoiding unnecessary clones for the majority of use cases where they're disabled.
1175    /// Issue #96 Task #79: #[inline] allows compiler to fold invariant branch
1176    /// (include_intra is constant for processor lifetime)
1177    #[inline]
1178    fn accumulate_trade(&mut self, trade: &AggTrade, include_intra: bool) {
1179        if include_intra {
1180            self.accumulated_trades.push(trade.clone());
1181        }
1182    }
1183}
1184
1185#[cfg(test)]
1186mod tests {
1187    use super::*;
1188    use crate::test_utils::{self, scenarios};
1189
1190    #[test]
1191    fn test_single_bar_no_breach() {
1192        let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
1193
1194        // Create trades that stay within 250 dbps threshold
1195        let trades = scenarios::no_breach_sequence(250);
1196
1197        // Test strict algorithm compliance: no bars should be created without breach
1198        let bars = processor.process_agg_trade_records(&trades).unwrap();
1199        assert_eq!(
1200            bars.len(),
1201            0,
1202            "Strict algorithm should not create bars without breach"
1203        );
1204
1205        // Test analysis mode: incomplete bar should be available for analysis
1206        let bars_with_incomplete = processor
1207            .process_agg_trade_records_with_incomplete(&trades)
1208            .unwrap();
1209        assert_eq!(
1210            bars_with_incomplete.len(),
1211            1,
1212            "Analysis mode should include incomplete bar"
1213        );
1214
1215        let bar = &bars_with_incomplete[0];
1216        assert_eq!(bar.open.to_string(), "50000.00000000");
1217        assert_eq!(bar.high.to_string(), "50100.00000000");
1218        assert_eq!(bar.low.to_string(), "49900.00000000");
1219        assert_eq!(bar.close.to_string(), "49900.00000000");
1220    }
1221
1222    #[test]
1223    fn test_exact_breach_upward() {
1224        let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
1225
1226        let trades = scenarios::exact_breach_upward(250);
1227
1228        // Test strict algorithm: only completed bars (with breach)
1229        let bars = processor.process_agg_trade_records(&trades).unwrap();
1230        assert_eq!(
1231            bars.len(),
1232            1,
1233            "Strict algorithm should only return completed bars"
1234        );
1235
1236        // First bar should close at breach
1237        let bar1 = &bars[0];
1238        assert_eq!(bar1.open.to_string(), "50000.00000000");
1239        // Breach at 250 dbps = 0.25% = 50000 * 1.0025 = 50125
1240        assert_eq!(bar1.close.to_string(), "50125.00000000"); // Breach tick included
1241        assert_eq!(bar1.high.to_string(), "50125.00000000");
1242        assert_eq!(bar1.low.to_string(), "50000.00000000");
1243
1244        // Test analysis mode: includes incomplete second bar
1245        let bars_with_incomplete = processor
1246            .process_agg_trade_records_with_incomplete(&trades)
1247            .unwrap();
1248        assert_eq!(
1249            bars_with_incomplete.len(),
1250            2,
1251            "Analysis mode should include incomplete bars"
1252        );
1253
1254        // Second bar should start at next tick price (not breach price)
1255        let bar2 = &bars_with_incomplete[1];
1256        assert_eq!(bar2.open.to_string(), "50500.00000000"); // Next tick after breach
1257        assert_eq!(bar2.close.to_string(), "50500.00000000");
1258    }
1259
1260    #[test]
1261    fn test_exact_breach_downward() {
1262        let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1263
1264        let trades = scenarios::exact_breach_downward(250);
1265
1266        let bars = processor.process_agg_trade_records(&trades).unwrap();
1267
1268        assert_eq!(bars.len(), 1);
1269
1270        let bar = &bars[0];
1271        assert_eq!(bar.open.to_string(), "50000.00000000");
1272        assert_eq!(bar.close.to_string(), "49875.00000000"); // Breach tick included
1273        assert_eq!(bar.high.to_string(), "50000.00000000");
1274        assert_eq!(bar.low.to_string(), "49875.00000000");
1275    }
1276
1277    #[test]
1278    fn test_large_gap_single_bar() {
1279        let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1280
1281        let trades = scenarios::large_gap_sequence();
1282
1283        let bars = processor.process_agg_trade_records(&trades).unwrap();
1284
1285        // Should create exactly ONE bar, not multiple bars to "fill the gap"
1286        assert_eq!(bars.len(), 1);
1287
1288        let bar = &bars[0];
1289        assert_eq!(bar.open.to_string(), "50000.00000000");
1290        assert_eq!(bar.close.to_string(), "51000.00000000");
1291        assert_eq!(bar.high.to_string(), "51000.00000000");
1292        assert_eq!(bar.low.to_string(), "50000.00000000");
1293    }
1294
1295    #[test]
1296    fn test_unsorted_trades_error() {
1297        let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1298
1299        let trades = scenarios::unsorted_sequence();
1300
1301        let result = processor.process_agg_trade_records(&trades);
1302        assert!(result.is_err());
1303
1304        match result {
1305            Err(ProcessingError::UnsortedTrades { index, .. }) => {
1306                assert_eq!(index, 1);
1307            }
1308            _ => panic!("Expected UnsortedTrades error"),
1309        }
1310    }
1311
1312    #[test]
1313    fn test_threshold_calculation() {
1314        let processor = OpenDeviationBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1315
1316        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1317        let bar_state = OpenDeviationBarState::new(&trade, processor.threshold_ratio);
1318
1319        // 50000 * 0.0025 = 125 (25bps = 0.25%)
1320        assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
1321        assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
1322    }
1323
1324    #[test]
1325    fn test_empty_trades() {
1326        let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1327        let trades = scenarios::empty_sequence();
1328        let bars = processor.process_agg_trade_records(&trades).unwrap();
1329        assert_eq!(bars.len(), 0);
1330    }
1331
1332    #[test]
1333    fn test_debug_streaming_data() {
1334        let mut processor = OpenDeviationBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1335
1336        // Create trades similar to our test data
1337        let trades = vec![
1338            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1339            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1340            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1341        ];
1342
1343        println!("Test data prices: 50014 -> 50163 -> 50032");
1344        println!("Expected price movements: +0.3% then -0.26%");
1345
1346        let bars = processor.process_agg_trade_records(&trades).unwrap();
1347        println!("Generated {} open deviation bars", bars.len());
1348
1349        for (i, bar) in bars.iter().enumerate() {
1350            println!(
1351                "  Bar {}: O={} H={} L={} C={}",
1352                i + 1,
1353                bar.open,
1354                bar.high,
1355                bar.low,
1356                bar.close
1357            );
1358        }
1359
1360        // With a 0.1% threshold and 0.3% price movement, we should get at least 1 bar
1361        assert!(
1362            !bars.is_empty(),
1363            "Expected at least 1 open deviation bar with 0.3% price movement and 0.1% threshold"
1364        );
1365    }
1366
1367    #[test]
1368    fn test_threshold_validation() {
1369        // Valid threshold
1370        assert!(OpenDeviationBarProcessor::new(250).is_ok());
1371
1372        // Invalid: too low (0 × 0.1bps = 0%)
1373        assert!(matches!(
1374            OpenDeviationBarProcessor::new(0),
1375            Err(ProcessingError::InvalidThreshold {
1376                threshold_decimal_bps: 0
1377            })
1378        ));
1379
1380        // Invalid: too high (150,000 × 0.1bps = 15,000bps = 150%)
1381        assert!(matches!(
1382            OpenDeviationBarProcessor::new(150_000),
1383            Err(ProcessingError::InvalidThreshold {
1384                threshold_decimal_bps: 150_000
1385            })
1386        ));
1387
1388        // Valid boundary: minimum (1 × 0.1bps = 0.1bps = 0.001%)
1389        assert!(OpenDeviationBarProcessor::new(1).is_ok());
1390
1391        // Valid boundary: maximum (100,000 × 0.1bps = 10,000bps = 100%)
1392        assert!(OpenDeviationBarProcessor::new(100_000).is_ok());
1393    }
1394
1395    #[test]
1396    fn test_export_processor_with_manual_trades() {
1397        println!("Testing ExportOpenDeviationBarProcessor with same trade data...");
1398
1399        let mut export_processor = ExportOpenDeviationBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1400
1401        // Use same trades as the working basic test
1402        let trades = vec![
1403            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1404            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1405            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1406        ];
1407
1408        println!(
1409            "Processing {} trades with ExportOpenDeviationBarProcessor...",
1410            trades.len()
1411        );
1412
1413        export_processor.process_trades_continuously(&trades);
1414        let bars = export_processor.get_all_completed_bars();
1415
1416        println!(
1417            "ExportOpenDeviationBarProcessor generated {} open deviation bars",
1418            bars.len()
1419        );
1420        for (i, bar) in bars.iter().enumerate() {
1421            println!(
1422                "  Bar {}: O={} H={} L={} C={}",
1423                i + 1,
1424                bar.open,
1425                bar.high,
1426                bar.low,
1427                bar.close
1428            );
1429        }
1430
1431        // Should match the basic processor results (1 bar)
1432        assert!(
1433            !bars.is_empty(),
1434            "ExportOpenDeviationBarProcessor should generate same results as basic processor"
1435        );
1436    }
1437
1438    // === CHECKPOINT TESTS (Issues #2 and #3) ===
1439
1440    #[test]
1441    fn test_checkpoint_creation() {
1442        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1443
1444        // Process some trades that don't complete a bar
1445        let trades = scenarios::no_breach_sequence(250);
1446        let _bars = processor.process_agg_trade_records(&trades).unwrap();
1447
1448        // Create checkpoint
1449        let checkpoint = processor.create_checkpoint("BTCUSDT");
1450
1451        assert_eq!(checkpoint.symbol, "BTCUSDT");
1452        assert_eq!(checkpoint.threshold_decimal_bps, 250);
1453        assert!(checkpoint.has_incomplete_bar()); // Should have incomplete bar
1454        assert!(checkpoint.thresholds.is_some()); // Thresholds should be saved
1455        assert!(checkpoint.last_trade_id.is_some()); // Should track last trade
1456    }
1457
1458    #[test]
1459    fn test_checkpoint_serialization_roundtrip() {
1460        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1461
1462        // Process trades
1463        let trades = scenarios::no_breach_sequence(250);
1464        let _bars = processor.process_agg_trade_records(&trades).unwrap();
1465
1466        // Create checkpoint
1467        let checkpoint = processor.create_checkpoint("BTCUSDT");
1468
1469        // Serialize to JSON
1470        let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
1471
1472        // Deserialize back
1473        let restored: Checkpoint =
1474            serde_json::from_str(&json).expect("Deserialization should succeed");
1475
1476        assert_eq!(restored.symbol, checkpoint.symbol);
1477        assert_eq!(
1478            restored.threshold_decimal_bps,
1479            checkpoint.threshold_decimal_bps
1480        );
1481        assert_eq!(
1482            restored.incomplete_bar.is_some(),
1483            checkpoint.incomplete_bar.is_some()
1484        );
1485    }
1486
1487    #[test]
1488    fn test_cross_file_bar_continuation() {
1489        // This is the PRIMARY test for Issues #2 and #3
1490        // Verifies that incomplete bars continue correctly across file boundaries
1491
1492        // Create trades that span multiple bars
1493        let mut all_trades = Vec::new();
1494
1495        // Generate enough trades to produce multiple bars
1496        // Using 100bps threshold (1%) for clearer price movements
1497        let base_timestamp = 1640995200000000i64; // Microseconds
1498
1499        // Create a sequence where we'll have ~3-4 completed bars with remainder
1500        for i in 0..20 {
1501            let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
1502            let trade = test_utils::create_test_agg_trade(
1503                i + 1,
1504                &format!("{:.8}", price),
1505                "1.0",
1506                base_timestamp + (i * 1000000),
1507            );
1508            all_trades.push(trade);
1509        }
1510
1511        // === FULL PROCESSING (baseline) ===
1512        let mut processor_full = OpenDeviationBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1513        let bars_full = processor_full
1514            .process_agg_trade_records(&all_trades)
1515            .unwrap();
1516
1517        // === SPLIT PROCESSING WITH CHECKPOINT ===
1518        let split_point = 10; // Split in the middle
1519
1520        // Part 1: Process first half
1521        let mut processor_1 = OpenDeviationBarProcessor::new(100).unwrap();
1522        let part1_trades = &all_trades[0..split_point];
1523        let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
1524
1525        // Create checkpoint
1526        let checkpoint = processor_1.create_checkpoint("TEST");
1527
1528        // Part 2: Resume from checkpoint and process second half
1529        let mut processor_2 = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
1530        let part2_trades = &all_trades[split_point..];
1531        let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
1532
1533        // === VERIFY CONTINUATION ===
1534        // Total completed bars should match full processing
1535        let split_total = bars_1.len() + bars_2.len();
1536
1537        println!("Full processing: {} bars", bars_full.len());
1538        println!(
1539            "Split processing: {} + {} = {} bars",
1540            bars_1.len(),
1541            bars_2.len(),
1542            split_total
1543        );
1544
1545        assert_eq!(
1546            split_total,
1547            bars_full.len(),
1548            "Split processing should produce same bar count as full processing"
1549        );
1550
1551        // Verify the bars themselves match
1552        let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
1553        for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
1554            assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
1555            assert_eq!(
1556                full.close.0, split.close.0,
1557                "Bar {} close price mismatch",
1558                i
1559            );
1560        }
1561    }
1562
1563    #[test]
1564    fn test_verify_position_exact() {
1565        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1566
1567        // Process some trades
1568        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1569        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1570
1571        let _ = processor.process_single_trade(&trade1);
1572        let _ = processor.process_single_trade(&trade2);
1573
1574        // Create next trade in sequence
1575        let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
1576
1577        // Verify position
1578        let verification = processor.verify_position(&next_trade);
1579
1580        assert_eq!(verification, PositionVerification::Exact);
1581    }
1582
1583    #[test]
1584    fn test_verify_position_gap() {
1585        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1586
1587        // Process some trades
1588        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1589        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1590
1591        let _ = processor.process_single_trade(&trade1);
1592        let _ = processor.process_single_trade(&trade2);
1593
1594        // Create next trade with gap (skip IDs 102-104)
1595        let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
1596
1597        // Verify position
1598        let verification = processor.verify_position(&next_trade);
1599
1600        match verification {
1601            PositionVerification::Gap {
1602                expected_id,
1603                actual_id,
1604                missing_count,
1605            } => {
1606                assert_eq!(expected_id, 102);
1607                assert_eq!(actual_id, 105);
1608                assert_eq!(missing_count, 3);
1609            }
1610            _ => panic!("Expected Gap verification, got {:?}", verification),
1611        }
1612    }
1613
1614    // Issue #96 Task #97: Test TimestampOnly branch (Exness path, no trade IDs)
1615    #[test]
1616    fn test_verify_position_timestamp_only() {
1617        // Fresh processor has last_trade_id = None (simulates Exness path)
1618        let processor = OpenDeviationBarProcessor::new(250).unwrap();
1619
1620        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 5000000);
1621        let verification = processor.verify_position(&trade);
1622
1623        // last_trade_id is None → TimestampOnly branch
1624        // gap_ms = (5000000 - 0) / 1000 = 5000
1625        match verification {
1626            PositionVerification::TimestampOnly { gap_ms } => {
1627                assert_eq!(gap_ms, 5000, "gap_ms should be (timestamp - 0) / 1000");
1628            }
1629            _ => panic!("Expected TimestampOnly verification, got {:?}", verification),
1630        }
1631    }
1632
1633    #[test]
1634    fn test_checkpoint_clean_completion() {
1635        // Test when last trade completes a bar with no remainder
1636        // In open deviation bar algorithm: breach trade closes bar, NEXT trade opens new bar
1637        // If there's no next trade, there's no incomplete bar
1638        let mut processor = OpenDeviationBarProcessor::new(100).unwrap(); // 10bps
1639
1640        // Create trades that complete exactly one bar
1641        let trades = vec![
1642            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1643            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // ~0.2% move, breaches 0.1%
1644        ];
1645
1646        let bars = processor.process_agg_trade_records(&trades).unwrap();
1647        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1648
1649        // Create checkpoint - should NOT have incomplete bar
1650        // (breach trade closes bar, no next trade to open new bar)
1651        let checkpoint = processor.create_checkpoint("TEST");
1652
1653        // With defer_open logic, the next bar isn't started until the next trade
1654        assert!(
1655            !checkpoint.has_incomplete_bar(),
1656            "No incomplete bar when last trade was a breach with no following trade"
1657        );
1658    }
1659
1660    #[test]
1661    fn test_checkpoint_with_remainder() {
1662        // Test when we have trades remaining after a completed bar
1663        let mut processor = OpenDeviationBarProcessor::new(100).unwrap(); // 10bps
1664
1665        // Create trades: bar completes at trade 2, trade 3 starts new bar
1666        let trades = vec![
1667            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1668            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // Breach
1669            test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000), // Opens new bar
1670        ];
1671
1672        let bars = processor.process_agg_trade_records(&trades).unwrap();
1673        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1674
1675        // Create checkpoint - should have incomplete bar from trade 3
1676        let checkpoint = processor.create_checkpoint("TEST");
1677
1678        assert!(
1679            checkpoint.has_incomplete_bar(),
1680            "Should have incomplete bar from trade 3"
1681        );
1682
1683        // Verify the incomplete bar has correct data
1684        let incomplete = checkpoint.incomplete_bar.unwrap();
1685        assert_eq!(
1686            incomplete.open.to_string(),
1687            "50110.00000000",
1688            "Incomplete bar should open at trade 3 price"
1689        );
1690    }
1691
1692    /// Issue #46: Verify streaming and batch paths produce identical bars
1693    ///
1694    /// The batch path (`process_agg_trade_records`) and streaming path
1695    /// (`process_single_trade`) must produce identical OHLCV output for
1696    /// the same input trades. This test catches regressions where the
1697    /// breaching trade is double-counted or bar boundaries differ.
1698    #[test]
1699    fn test_streaming_batch_parity() {
1700        let threshold = 250; // 250 dbps = 0.25%
1701
1702        // Build a sequence with multiple breaches
1703        let trades = test_utils::AggTradeBuilder::new()
1704            .add_trade(1, 1.0, 0) // Open first bar at 50000
1705            .add_trade(2, 1.001, 1000) // +0.1% - accumulate
1706            .add_trade(3, 1.003, 2000) // +0.3% - breach (>0.25%)
1707            .add_trade(4, 1.004, 3000) // Opens second bar
1708            .add_trade(5, 1.005, 4000) // Accumulate
1709            .add_trade(6, 1.008, 5000) // +0.4% from bar 2 open - breach
1710            .add_trade(7, 1.009, 6000) // Opens third bar
1711            .build();
1712
1713        // === BATCH PATH ===
1714        let mut batch_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
1715        let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
1716        let batch_incomplete = batch_processor.get_incomplete_bar();
1717
1718        // === STREAMING PATH ===
1719        let mut stream_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
1720        let mut stream_bars: Vec<OpenDeviationBar> = Vec::new();
1721        for trade in &trades {
1722            if let Some(bar) = stream_processor
1723                .process_single_trade(trade)
1724                .unwrap()
1725            {
1726                stream_bars.push(bar);
1727            }
1728        }
1729        let stream_incomplete = stream_processor.get_incomplete_bar();
1730
1731        // === VERIFY PARITY ===
1732        assert_eq!(
1733            batch_bars.len(),
1734            stream_bars.len(),
1735            "Batch and streaming should produce same number of completed bars"
1736        );
1737
1738        for (i, (batch_bar, stream_bar)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1739            assert_eq!(
1740                batch_bar.open, stream_bar.open,
1741                "Bar {i}: open price mismatch"
1742            );
1743            assert_eq!(
1744                batch_bar.close, stream_bar.close,
1745                "Bar {i}: close price mismatch"
1746            );
1747            assert_eq!(
1748                batch_bar.high, stream_bar.high,
1749                "Bar {i}: high price mismatch"
1750            );
1751            assert_eq!(batch_bar.low, stream_bar.low, "Bar {i}: low price mismatch");
1752            assert_eq!(
1753                batch_bar.volume, stream_bar.volume,
1754                "Bar {i}: volume mismatch (double-counting?)"
1755            );
1756            assert_eq!(
1757                batch_bar.open_time, stream_bar.open_time,
1758                "Bar {i}: open_time mismatch"
1759            );
1760            assert_eq!(
1761                batch_bar.close_time, stream_bar.close_time,
1762                "Bar {i}: close_time mismatch"
1763            );
1764            assert_eq!(
1765                batch_bar.individual_trade_count, stream_bar.individual_trade_count,
1766                "Bar {i}: trade count mismatch"
1767            );
1768        }
1769
1770        // Verify incomplete bars match
1771        match (batch_incomplete, stream_incomplete) {
1772            (Some(b), Some(s)) => {
1773                assert_eq!(b.open, s.open, "Incomplete bar: open mismatch");
1774                assert_eq!(b.close, s.close, "Incomplete bar: close mismatch");
1775                assert_eq!(b.volume, s.volume, "Incomplete bar: volume mismatch");
1776            }
1777            (None, None) => {} // Both finished cleanly
1778            _ => panic!("Incomplete bar presence mismatch between batch and streaming"),
1779        }
1780    }
1781
1782    /// Issue #96: Proptest-enhanced batch vs streaming parity
1783    ///
1784    /// Generates random trade sequences (200-500 trades) with realistic price
1785    /// movements and verifies bit-exact parity between batch and streaming paths
1786    /// on ALL bar fields including microstructure features.
1787    mod proptest_batch_streaming_parity {
1788        use super::*;
1789        use proptest::prelude::*;
1790
1791        /// Generate a realistic trade sequence with random price movements
1792        fn trade_sequence(
1793            n: usize,
1794            base_price: f64,
1795            volatility: f64,
1796        ) -> Vec<AggTrade> {
1797            let mut trades = Vec::with_capacity(n);
1798            let mut price = base_price;
1799            let base_ts = 1640995200000i64; // 2022-01-01
1800
1801            for i in 0..n {
1802                // Deterministic price walk using sin/cos (proptest handles seed)
1803                let step = ((i as f64 * 0.3).sin() * volatility)
1804                    + ((i as f64 * 0.07).cos() * volatility * 0.5);
1805                price += step;
1806                // Clamp to avoid negative prices
1807                if price < 100.0 {
1808                    price = 100.0 + (i as f64 * 0.01).sin().abs() * 50.0;
1809                }
1810
1811                let trade = test_utils::create_test_agg_trade_with_range(
1812                    i as i64 + 1,
1813                    &format!("{:.8}", price),
1814                    "1.50000000",
1815                    base_ts + (i as i64 * 500), // 500ms apart
1816                    (i as i64 + 1) * 10,
1817                    (i as i64 + 1) * 10,
1818                    i % 3 != 0, // Mix of buy/sell sides
1819                );
1820                trades.push(trade);
1821            }
1822            trades
1823        }
1824
1825        /// Assert two bars are bit-exact on all OHLCV and microstructure fields
1826        fn assert_bar_parity(i: usize, batch: &OpenDeviationBar, stream: &OpenDeviationBar) {
1827            // Tier 1: OHLCV core
1828            assert_eq!(batch.open_time, stream.open_time, "Bar {i}: open_time");
1829            assert_eq!(batch.close_time, stream.close_time, "Bar {i}: close_time");
1830            assert_eq!(batch.open, stream.open, "Bar {i}: open");
1831            assert_eq!(batch.high, stream.high, "Bar {i}: high");
1832            assert_eq!(batch.low, stream.low, "Bar {i}: low");
1833            assert_eq!(batch.close, stream.close, "Bar {i}: close");
1834
1835            // Tier 2: Volume accumulators
1836            assert_eq!(batch.volume, stream.volume, "Bar {i}: volume");
1837            assert_eq!(batch.turnover, stream.turnover, "Bar {i}: turnover");
1838            assert_eq!(batch.buy_volume, stream.buy_volume, "Bar {i}: buy_volume");
1839            assert_eq!(batch.sell_volume, stream.sell_volume, "Bar {i}: sell_volume");
1840            assert_eq!(batch.buy_turnover, stream.buy_turnover, "Bar {i}: buy_turnover");
1841            assert_eq!(batch.sell_turnover, stream.sell_turnover, "Bar {i}: sell_turnover");
1842
1843            // Tier 3: Trade tracking
1844            assert_eq!(batch.individual_trade_count, stream.individual_trade_count, "Bar {i}: trade_count");
1845            assert_eq!(batch.agg_record_count, stream.agg_record_count, "Bar {i}: agg_record_count");
1846            assert_eq!(batch.first_trade_id, stream.first_trade_id, "Bar {i}: first_trade_id");
1847            assert_eq!(batch.last_trade_id, stream.last_trade_id, "Bar {i}: last_trade_id");
1848            assert_eq!(batch.first_agg_trade_id, stream.first_agg_trade_id, "Bar {i}: first_agg_trade_id");
1849            assert_eq!(batch.last_agg_trade_id, stream.last_agg_trade_id, "Bar {i}: last_agg_trade_id");
1850            assert_eq!(batch.buy_trade_count, stream.buy_trade_count, "Bar {i}: buy_trade_count");
1851            assert_eq!(batch.sell_trade_count, stream.sell_trade_count, "Bar {i}: sell_trade_count");
1852
1853            // Tier 4: VWAP
1854            assert_eq!(batch.vwap, stream.vwap, "Bar {i}: vwap");
1855
1856            // Tier 5: Microstructure f64 features (bit-exact comparison)
1857            assert_eq!(batch.duration_us, stream.duration_us, "Bar {i}: duration_us");
1858            assert_eq!(batch.ofi.to_bits(), stream.ofi.to_bits(), "Bar {i}: ofi");
1859            assert_eq!(batch.vwap_close_deviation.to_bits(), stream.vwap_close_deviation.to_bits(), "Bar {i}: vwap_close_dev");
1860            assert_eq!(batch.price_impact.to_bits(), stream.price_impact.to_bits(), "Bar {i}: price_impact");
1861            assert_eq!(batch.kyle_lambda_proxy.to_bits(), stream.kyle_lambda_proxy.to_bits(), "Bar {i}: kyle_lambda");
1862            assert_eq!(batch.trade_intensity.to_bits(), stream.trade_intensity.to_bits(), "Bar {i}: trade_intensity");
1863            assert_eq!(batch.volume_per_trade.to_bits(), stream.volume_per_trade.to_bits(), "Bar {i}: vol_per_trade");
1864            assert_eq!(batch.aggression_ratio.to_bits(), stream.aggression_ratio.to_bits(), "Bar {i}: aggression_ratio");
1865            assert_eq!(batch.aggregation_density_f64.to_bits(), stream.aggregation_density_f64.to_bits(), "Bar {i}: agg_density");
1866            assert_eq!(batch.turnover_imbalance.to_bits(), stream.turnover_imbalance.to_bits(), "Bar {i}: turnover_imbalance");
1867        }
1868
1869        proptest! {
1870            /// 200-500 random trades, 250 dbps threshold
1871            #[test]
1872            fn batch_streaming_parity_random(
1873                n in 200usize..500,
1874                volatility in 10.0f64..200.0,
1875            ) {
1876                let trades = trade_sequence(n, 50000.0, volatility);
1877
1878                // Batch path
1879                let mut batch_proc = OpenDeviationBarProcessor::new(250).unwrap();
1880                let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1881                let batch_incomplete = batch_proc.get_incomplete_bar();
1882
1883                // Streaming path
1884                let mut stream_proc = OpenDeviationBarProcessor::new(250).unwrap();
1885                let mut stream_bars: Vec<OpenDeviationBar> = Vec::new();
1886                for trade in &trades {
1887                    if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1888                        stream_bars.push(bar);
1889                    }
1890                }
1891                let stream_incomplete = stream_proc.get_incomplete_bar();
1892
1893                // Bar count parity
1894                prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1895                    "Completed bar count mismatch: batch={}, stream={} for n={}, vol={}",
1896                    batch_bars.len(), stream_bars.len(), n, volatility);
1897
1898                // Field-level parity for all completed bars
1899                for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1900                    assert_bar_parity(i, b, s);
1901                }
1902
1903                // Incomplete bar parity
1904                match (&batch_incomplete, &stream_incomplete) {
1905                    (Some(b), Some(s)) => assert_bar_parity(batch_bars.len(), b, s),
1906                    (None, None) => {}
1907                    _ => prop_assert!(false,
1908                        "Incomplete bar presence mismatch: batch={}, stream={}",
1909                        batch_incomplete.is_some(), stream_incomplete.is_some()),
1910                }
1911            }
1912
1913            /// Vary threshold: 100-1000 dbps
1914            #[test]
1915            fn batch_streaming_parity_thresholds(
1916                threshold in 100u32..1000,
1917            ) {
1918                let trades = trade_sequence(300, 50000.0, 80.0);
1919
1920                let mut batch_proc = OpenDeviationBarProcessor::new(threshold).unwrap();
1921                let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1922
1923                let mut stream_proc = OpenDeviationBarProcessor::new(threshold).unwrap();
1924                let mut stream_bars: Vec<OpenDeviationBar> = Vec::new();
1925                for trade in &trades {
1926                    if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1927                        stream_bars.push(bar);
1928                    }
1929                }
1930
1931                prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1932                    "Bar count mismatch at threshold={}", threshold);
1933
1934                for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1935                    assert_bar_parity(i, b, s);
1936                }
1937            }
1938        }
1939    }
1940
1941    /// Issue #46: After breach, next trade opens new bar (not breaching trade)
1942    #[test]
1943    fn test_defer_open_new_bar_opens_with_next_trade() {
1944        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1945
1946        // Trade 1: Opens bar at 50000
1947        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1948        assert!(processor.process_single_trade(&t1).unwrap().is_none());
1949
1950        // Trade 2: Breaches threshold (+0.3%)
1951        let t2 = test_utils::create_test_agg_trade(2, "50150.0", "2.0", 2000);
1952        let bar = processor.process_single_trade(&t2).unwrap();
1953        assert!(bar.is_some(), "Should close bar on breach");
1954
1955        let closed_bar = bar.unwrap();
1956        assert_eq!(closed_bar.open.to_string(), "50000.00000000");
1957        assert_eq!(closed_bar.close.to_string(), "50150.00000000");
1958
1959        // After breach, no incomplete bar should exist
1960        assert!(
1961            processor.get_incomplete_bar().is_none(),
1962            "No incomplete bar after breach - defer_open is true"
1963        );
1964
1965        // Trade 3: Should open NEW bar (not the breaching trade)
1966        let t3 = test_utils::create_test_agg_trade(3, "50100.0", "3.0", 3000);
1967        assert!(processor.process_single_trade(&t3).unwrap().is_none());
1968
1969        let incomplete = processor.get_incomplete_bar().unwrap();
1970        assert_eq!(
1971            incomplete.open.to_string(),
1972            "50100.00000000",
1973            "New bar should open at trade 3's price, not trade 2's"
1974        );
1975    }
1976
1977    // === Memory efficiency tests (R1/R2/R3) ===
1978
1979    #[test]
1980    fn test_bar_close_take_single_trade() {
1981        // R1: Verify bar close via single-trade path produces correct OHLCV after
1982        // clone→take optimization. Uses single_breach_sequence that triggers breach.
1983        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1984        let trades = scenarios::single_breach_sequence(250);
1985
1986        for trade in &trades[..trades.len() - 1] {
1987            let result = processor.process_single_trade(trade).unwrap();
1988            assert!(result.is_none());
1989        }
1990
1991        // Last trade triggers breach
1992        let bar = processor
1993            .process_single_trade(trades.last().unwrap())
1994            .unwrap()
1995            .expect("Should produce completed bar");
1996
1997        // Verify OHLCV integrity after take() optimization
1998        assert_eq!(bar.open.to_string(), "50000.00000000");
1999        assert!(bar.high >= bar.open.max(bar.close));
2000        assert!(bar.low <= bar.open.min(bar.close));
2001        assert!(bar.volume > 0);
2002
2003        // Verify processor state is clean after bar close
2004        assert!(processor.get_incomplete_bar().is_none());
2005    }
2006
2007    #[test]
2008    fn test_bar_close_take_batch() {
2009        // R2: Verify batch path produces correct bars after clone→take optimization.
2010        // large_sequence generates enough trades to trigger multiple breaches.
2011        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
2012        let trades = scenarios::large_sequence(500);
2013
2014        let bars = processor.process_agg_trade_records(&trades).unwrap();
2015        assert!(
2016            !bars.is_empty(),
2017            "Should produce at least one completed bar"
2018        );
2019
2020        // Verify every bar has valid OHLCV invariants
2021        for bar in &bars {
2022            assert!(bar.high >= bar.open.max(bar.close));
2023            assert!(bar.low <= bar.open.min(bar.close));
2024            assert!(bar.volume > 0);
2025            assert!(bar.close_time >= bar.open_time);
2026        }
2027    }
2028
2029    #[test]
2030    fn test_checkpoint_conditional_clone() {
2031        // R3: Verify checkpoint state is preserved correctly with both
2032        // include_incomplete=true and include_incomplete=false.
2033        let trades = scenarios::no_breach_sequence(250);
2034
2035        // Test with include_incomplete=false (move, no clone)
2036        let mut processor1 = OpenDeviationBarProcessor::new(250).unwrap();
2037        let bars_without = processor1.process_agg_trade_records(&trades).unwrap();
2038        assert_eq!(bars_without.len(), 0);
2039        // Checkpoint should be preserved
2040        assert!(processor1.get_incomplete_bar().is_some());
2041
2042        // Test with include_incomplete=true (clone + consume)
2043        let mut processor2 = OpenDeviationBarProcessor::new(250).unwrap();
2044        let bars_with = processor2
2045            .process_agg_trade_records_with_incomplete(&trades)
2046            .unwrap();
2047        assert_eq!(bars_with.len(), 1);
2048        // Checkpoint should ALSO be preserved (cloned before consume)
2049        assert!(processor2.get_incomplete_bar().is_some());
2050
2051        // Both checkpoints should have identical bar content
2052        let cp1 = processor1.get_incomplete_bar().unwrap();
2053        let cp2 = processor2.get_incomplete_bar().unwrap();
2054        assert_eq!(cp1.open, cp2.open);
2055        assert_eq!(cp1.close, cp2.close);
2056        assert_eq!(cp1.high, cp2.high);
2057        assert_eq!(cp1.low, cp2.low);
2058    }
2059
#[test]
fn test_checkpoint_v1_to_v2_migration() {
    // Issue #85 Phase 2: a checkpoint serialized without a `version` field
    // must deserialize as v1, restore into a processor that still produces
    // bars, and be written back out as a v2 checkpoint.
    let v1_json = r#"{
            "symbol": "BTCUSDT",
            "threshold_decimal_bps": 250,
            "incomplete_bar": null,
            "thresholds": null,
            "last_timestamp_us": 1640995200000000,
            "last_trade_id": 5000,
            "price_hash": 0,
            "anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0},
            "prevent_same_timestamp_close": true,
            "defer_open": false
        }"#;

    // The missing `version` field is expected to default to 1.
    let legacy: Checkpoint = serde_json::from_str(v1_json).unwrap();
    assert_eq!(legacy.version, 1, "Old checkpoints should default to v1");
    assert_eq!(legacy.symbol, "BTCUSDT");
    assert_eq!(legacy.threshold_decimal_bps, 250);

    // Restoring from the legacy checkpoint is where the migration happens.
    let mut processor = OpenDeviationBarProcessor::from_checkpoint(legacy).unwrap();

    // The payload carried `"incomplete_bar": null`, so nothing is forming yet.
    assert!(
        processor.get_incomplete_bar().is_none(),
        "No incomplete bar before processing"
    );

    // Drive the restored processor with a sequence that breaches the threshold.
    let trades = scenarios::single_breach_sequence(250);
    let bars = processor.process_agg_trade_records(&trades).unwrap();
    assert!(!bars.is_empty(), "Should produce bars after v1→v2 migration");

    // Sanity-check the first bar (the symbol lives on Checkpoint, not on the bar).
    let first_bar = &bars[0];
    assert!(first_bar.volume > 0, "Bar should have volume after migration");
    assert!(
        first_bar.close_time >= first_bar.open_time,
        "Bar times should be valid"
    );

    // A checkpoint created after the migration must carry the new version.
    let new_checkpoint = processor.create_checkpoint("BTCUSDT");
    assert_eq!(new_checkpoint.version, 2, "New checkpoints should be v2");
    assert_eq!(new_checkpoint.symbol, "BTCUSDT");

    // ...and must survive a JSON round-trip unchanged in version and symbol.
    let json = serde_json::to_string(&new_checkpoint).unwrap();
    let restored: Checkpoint = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.version, 2);
    assert_eq!(restored.symbol, "BTCUSDT");
}
2110
2111    // =========================================================================
2112    // Issue #96: Checkpoint error path tests
2113    // =========================================================================
2114
#[test]
fn test_from_checkpoint_invalid_threshold_zero() {
    // A checkpoint carrying threshold 0 must be rejected with InvalidThreshold.
    let checkpoint = Checkpoint::new(
        String::from("BTCUSDT"),
        0,
        None,
        None,
        0,
        None,
        0,
        true,
    );

    let result = OpenDeviationBarProcessor::from_checkpoint(checkpoint);
    if !matches!(
        result,
        Err(CheckpointError::InvalidThreshold { threshold: 0, .. })
    ) {
        panic!("Expected InvalidThreshold(0), got {:?}", result.err());
    }
}
2125
#[test]
fn test_from_checkpoint_invalid_threshold_too_high() {
    // A checkpoint carrying threshold 200_000 must be rejected with InvalidThreshold.
    let checkpoint = Checkpoint::new(
        String::from("BTCUSDT"),
        200_000,
        None,
        None,
        0,
        None,
        0,
        true,
    );

    let result = OpenDeviationBarProcessor::from_checkpoint(checkpoint);
    if !matches!(
        result,
        Err(CheckpointError::InvalidThreshold { threshold: 200_000, .. })
    ) {
        panic!("Expected InvalidThreshold(200000), got {:?}", result.err());
    }
}
2136
#[test]
fn test_from_checkpoint_missing_thresholds() {
    // An incomplete bar without its thresholds is an inconsistent checkpoint
    // and must be rejected with MissingThresholds.
    let seed_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let bar = OpenDeviationBar::new(&seed_trade);

    let mut checkpoint = Checkpoint::new(
        String::from("BTCUSDT"),
        250,
        None,
        None,
        0,
        None,
        0,
        true,
    );
    // Force the inconsistent state directly on the checkpoint fields.
    checkpoint.incomplete_bar = Some(bar);
    checkpoint.thresholds = None;

    let result = OpenDeviationBarProcessor::from_checkpoint(checkpoint);
    if !matches!(result, Err(CheckpointError::MissingThresholds)) {
        panic!("Expected MissingThresholds, got {:?}", result.err());
    }
}
2151
#[test]
fn test_from_checkpoint_unknown_version_treated_as_v2() {
    // An unrecognized version number must not prevent restoration.
    let mut checkpoint = Checkpoint::new(
        String::from("BTCUSDT"),
        250,
        None,
        None,
        0,
        None,
        0,
        true,
    );
    checkpoint.version = 99;

    let restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
    assert_eq!(restored.threshold_decimal_bps(), 250);
}
2162
#[test]
fn test_from_checkpoint_valid_with_incomplete_bar() {
    use crate::fixed_point::FixedPoint;

    // A checkpoint holding both an incomplete bar and its (upper, lower)
    // thresholds must restore with the bar still present.
    let seed_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let bar = OpenDeviationBar::new(&seed_trade);
    let thresholds = (
        FixedPoint::from_str("50125.0").unwrap(),
        FixedPoint::from_str("49875.0").unwrap(),
    );

    let checkpoint = Checkpoint::new(
        String::from("BTCUSDT"),
        250,
        Some(bar),
        Some(thresholds),
        0,
        None,
        0,
        true,
    );

    let restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
    assert!(
        restored.get_incomplete_bar().is_some(),
        "Should restore incomplete bar"
    );
}
2177
2178    // =========================================================================
2179    // Issue #96: Ouroboros reset tests
2180    // =========================================================================
2181
#[test]
fn test_reset_at_ouroboros_with_orphan() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Two trades that do not close a bar, leaving one forming.
    let warmup = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        test_utils::create_test_agg_trade(2, "50050.0", "1.0", 2000),
    ];
    for trade in &warmup {
        assert!(processor.process_single_trade(trade).unwrap().is_none());
    }
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have incomplete bar"
    );

    // Resetting at the ouroboros boundary hands back the forming bar as an orphan.
    let orphan_bar = processor
        .reset_at_ouroboros()
        .expect("Should return orphaned bar");
    assert_eq!(orphan_bar.open.to_string(), "50000.00000000");

    // The processor itself no longer holds a forming bar.
    assert!(
        processor.get_incomplete_bar().is_none(),
        "No bar after reset"
    );
}
2202
#[test]
fn test_reset_at_ouroboros_clean_state() {
    // A processor that has seen no trades has nothing to orphan.
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let orphan = processor.reset_at_ouroboros();

    assert!(orphan.is_none(), "No orphan when state is clean");
    assert!(processor.get_incomplete_bar().is_none());
}
2212
#[test]
fn test_reset_at_ouroboros_clears_defer_open() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Open at 50000, then jump +0.4% so the bar closes on the second trade.
    let opener = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let breacher = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000);
    processor.process_single_trade(&opener).unwrap();
    let closed = processor.process_single_trade(&breacher).unwrap();
    assert!(closed.is_some(), "Should breach");

    // Post-breach the processor is waiting to open the next bar: nothing forming.
    assert!(processor.get_incomplete_bar().is_none());

    // The reset is expected to clear that deferred-open state as well.
    processor.reset_at_ouroboros();

    // So the very next trade must start a fresh bar.
    let next = test_utils::create_test_agg_trade(3, "50000.0", "1.0", 3000);
    processor.process_single_trade(&next).unwrap();
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have new bar after reset"
    );
}
2235
2236    // === EDGE CASE TESTS (Issue #96 Task #21) ===
2237
#[test]
fn test_single_trade_no_bar() {
    // One trade only opens a bar; it cannot also breach it.
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let only_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);

    let completed = processor.process_agg_trade_records(&[only_trade]).unwrap();

    assert_eq!(
        completed.len(),
        0,
        "Single trade should not produce a completed bar"
    );
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have incomplete bar"
    );
}
2247
#[test]
fn test_identical_timestamps_no_close() {
    // Issue #36: a breach that shares the opening trade's timestamp must not close the bar.
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let batch = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        // Breaching price, but at the same timestamp as the open.
        test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1000),
    ];

    let completed = processor.process_agg_trade_records(&batch).unwrap();

    assert_eq!(
        completed.len(),
        0,
        "Bar should not close on same timestamp as open (Issue #36)"
    );
}
2257
#[test]
fn test_identical_timestamps_then_different_closes() {
    // Trades sharing the open timestamp keep the bar open; a breach at a
    // later timestamp is allowed to close it.
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let batch = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        test_utils::create_test_agg_trade(2, "50050.0", "1.0", 1000), // same ts as open
        test_utils::create_test_agg_trade(3, "50200.0", "1.0", 2000), // later ts, breaches
    ];

    let completed = processor.process_agg_trade_records(&batch).unwrap();

    assert_eq!(
        completed.len(),
        1,
        "Should close when breach at different timestamp"
    );
}
2268
#[test]
fn test_streaming_defer_open_semantics() {
    // Streaming path: once a trade closes a bar, the *following* trade is the
    // one that opens the next bar.
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let opener = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let breacher = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000);
    let reopener = test_utils::create_test_agg_trade(3, "51000.0", "1.0", 3000);

    processor.process_single_trade(&opener).unwrap();
    let closed = processor.process_single_trade(&breacher).unwrap();
    assert!(closed.is_some(), "Trade 2 should cause a breach");

    // Between the breach and the next trade there is no forming bar.
    assert!(processor.get_incomplete_bar().is_none());

    // The third trade starts a new bar rather than closing one.
    let after_reopen = processor.process_single_trade(&reopener).unwrap();
    assert!(
        after_reopen.is_none(),
        "Trade 3 should open new bar, not breach"
    );

    let forming = processor.get_incomplete_bar().unwrap();
    assert_eq!(
        forming.open.to_f64(),
        51000.0,
        "New bar should open at t3 price"
    );
}
2290
#[test]
fn test_process_empty_then_trades() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // An empty batch yields no bars and leaves no forming bar behind.
    let from_empty = processor.process_agg_trade_records(&[]).unwrap();
    assert_eq!(from_empty.len(), 0);
    assert!(processor.get_incomplete_bar().is_none());

    // A subsequent real trade is processed normally: it opens a bar.
    let first_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let from_single = processor
        .process_agg_trade_records(&[first_trade])
        .unwrap();
    assert_eq!(from_single.len(), 0);
    assert!(processor.get_incomplete_bar().is_some());
}
2305
#[test]
fn test_multiple_breaches_in_batch() {
    // One batch containing two breaches must yield two completed bars.
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let batch = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000), // opens bar 1
        test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000), // breach 1
        test_utils::create_test_agg_trade(3, "50500.0", "1.0", 3000), // opens bar 2
        test_utils::create_test_agg_trade(4, "50700.0", "1.0", 4000), // breach 2
        test_utils::create_test_agg_trade(5, "51000.0", "1.0", 5000), // opens bar 3
    ];

    let completed = processor.process_agg_trade_records(&batch).unwrap();

    assert_eq!(
        completed.len(),
        2,
        "Should produce 2 completed bars from 2 breaches"
    );
}
2320
#[test]
fn test_streaming_batch_parity_extended() {
    // Task #29: batch and streaming processing must agree bar-for-bar.
    // Feeds 20 zigzagging trades at a 100 dbps (0.1%) threshold and requires
    // at least 3 completed bars so the comparison is non-trivial.
    let threshold = 100;

    // Price path: every third step jumps +0.2%, the step after it drops
    // -0.2%, and all remaining steps drift +0.05% (too small to breach).
    let mut price = 50000.0;
    let trades: Vec<_> = (0..20)
        .map(|i| {
            price *= match i % 3 {
                0 if i > 0 => 1.002,
                1 if i > 1 => 0.998,
                _ => 1.0005,
            };
            test_utils::create_test_agg_trade(
                (i + 1) as i64,
                &format!("{price:.8}"),
                "1.0",
                (i as i64 + 1) * 1000,
            )
        })
        .collect();

    // Batch path: the whole sequence in one call.
    let mut batch_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();

    // Streaming path: one trade at a time, keeping each completed bar.
    let mut stream_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let stream_bars: Vec<OpenDeviationBar> = trades
        .iter()
        .filter_map(|trade| stream_processor.process_single_trade(trade).unwrap())
        .collect();

    // Completed bars: same count, same content.
    assert!(
        batch_bars.len() >= 3,
        "Should produce at least 3 bars from zigzag pattern"
    );
    assert_eq!(
        batch_bars.len(),
        stream_bars.len(),
        "Batch ({}) and streaming ({}) bar count mismatch",
        batch_bars.len(),
        stream_bars.len()
    );

    for (i, (batch_bar, stream_bar)) in batch_bars.iter().zip(&stream_bars).enumerate() {
        assert_eq!(batch_bar.open, stream_bar.open, "Bar {i}: open mismatch");
        assert_eq!(batch_bar.close, stream_bar.close, "Bar {i}: close mismatch");
        assert_eq!(batch_bar.high, stream_bar.high, "Bar {i}: high mismatch");
        assert_eq!(batch_bar.low, stream_bar.low, "Bar {i}: low mismatch");
        assert_eq!(batch_bar.volume, stream_bar.volume, "Bar {i}: volume mismatch");
        assert_eq!(
            batch_bar.open_time, stream_bar.open_time,
            "Bar {i}: open_time mismatch"
        );
        assert_eq!(
            batch_bar.close_time, stream_bar.close_time,
            "Bar {i}: close_time mismatch"
        );
        assert_eq!(
            batch_bar.individual_trade_count, stream_bar.individual_trade_count,
            "Bar {i}: trade_count mismatch"
        );
    }

    // The still-forming bar must agree too (or be absent on both sides).
    match (
        batch_processor.get_incomplete_bar(),
        stream_processor.get_incomplete_bar(),
    ) {
        (Some(batch_inc), Some(stream_inc)) => {
            assert_eq!(batch_inc.open, stream_inc.open, "Incomplete: open mismatch");
            assert_eq!(
                batch_inc.volume, stream_inc.volume,
                "Incomplete: volume mismatch"
            );
        }
        (None, None) => {}
        _ => panic!("Incomplete bar presence mismatch"),
    }
}
2391
#[test]
fn test_multi_batch_sequential_state_continuity() {
    // Three separate batches are fed to one processor; state must carry over
    // correctly from each call to the next.
    let mut processor = OpenDeviationBarProcessor::new(100).unwrap(); // 100 dbps = 0.10%

    let batches = [
        // Batch 1: opens at 50000; a breach needs a move of more than 0.10% (> 50050).
        vec![
            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
            test_utils::create_test_agg_trade(2, "50020.0", "1.0", 2000),
            test_utils::create_test_agg_trade(3, "50060.0", "1.0", 3000), // breach
        ],
        // Batch 2: first trade opens the next bar, last trade breaches it.
        vec![
            test_utils::create_test_agg_trade(4, "50100.0", "1.0", 4000), // opens new bar
            test_utils::create_test_agg_trade(5, "50120.0", "1.0", 5000),
            test_utils::create_test_agg_trade(6, "50170.0", "1.0", 6000), // breach
        ],
        // Batch 3: same pattern once more.
        vec![
            test_utils::create_test_agg_trade(7, "50200.0", "1.0", 7000), // opens new bar
            test_utils::create_test_agg_trade(8, "50220.0", "1.0", 8000),
            test_utils::create_test_agg_trade(9, "50280.0", "1.0", 9000), // breach
        ],
    ];

    let mut all_bars = Vec::new();
    for batch in &batches {
        all_bars.extend(processor.process_agg_trade_records(batch).unwrap());
    }

    // Each batch is built to close a bar, so at least three are expected.
    assert!(
        all_bars.len() >= 3,
        "Expected at least 3 bars from 3 batches, got {}",
        all_bars.len()
    );

    // Close times must never go backwards (non-decreasing) from bar to bar.
    for (prev_idx, pair) in all_bars.windows(2).enumerate() {
        let i = prev_idx + 1;
        let (prev, cur) = (&pair[0], &pair[1]);
        assert!(
            cur.close_time >= prev.close_time,
            "Bar {i}: close_time {} < previous {}",
            cur.close_time,
            prev.close_time
        );
    }

    // Each bar must start at the trade ID right after the previous bar's last one.
    for (prev_idx, pair) in all_bars.windows(2).enumerate() {
        let i = prev_idx + 1;
        let (prev, cur) = (&pair[0], &pair[1]);
        assert_eq!(
            cur.first_agg_trade_id,
            prev.last_agg_trade_id + 1,
            "Bar {i}: trade ID gap (first={}, prev last={})",
            cur.first_agg_trade_id,
            prev.last_agg_trade_id
        );
    }
}
2454
2455    // Issue #96 Task #93: Edge case tests for processor algorithm invariants
2456
#[test]
fn test_same_timestamp_prevents_bar_close() {
    // Issue #36: with the same-timestamp gate on, a bar cannot close at the
    // timestamp it opened at, however far the price moves.
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    processor.prevent_same_timestamp_close = true;

    // Five trades, all at one timestamp. The first opens at 50000; the rest
    // are priced far beyond the threshold.
    let trades: Vec<AggTrade> = (0..5)
        .map(|i| {
            let price_str = match i {
                0 => "50000.0".to_string(),
                _ => format!("{}.0", 50000 + (i + 1) * 200),
            };
            let id = i as i64;
            AggTrade {
                agg_trade_id: id,
                price: FixedPoint::from_str(&price_str).unwrap(),
                volume: FixedPoint::from_str("1.0").unwrap(),
                first_trade_id: id,
                last_trade_id: id,
                timestamp: 1000000, // identical for every trade
                is_buyer_maker: false,
                is_best_match: None,
            }
        })
        .collect();

    let completed = processor.process_agg_trade_records(&trades).unwrap();

    // The timestamp gate must have blocked every would-be close.
    assert_eq!(
        completed.len(),
        0,
        "Same timestamp should prevent bar close (Issue #36)"
    );
}
2489
#[test]
fn test_single_trade_incomplete_bar() {
    let trade = AggTrade {
        agg_trade_id: 1,
        price: FixedPoint::from_str("50000.0").unwrap(),
        volume: FixedPoint::from_str("10.0").unwrap(),
        first_trade_id: 1,
        last_trade_id: 1,
        timestamp: 1000000,
        is_buyer_maker: false,
        is_best_match: None,
    };

    // Strict mode: a lone trade never completes a bar.
    let mut strict = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = strict
        .process_agg_trade_records(std::slice::from_ref(&trade))
        .unwrap();
    assert_eq!(completed.len(), 0, "Single trade cannot complete a bar");

    // Inclusive mode: the forming bar is returned as well.
    let mut inclusive = OpenDeviationBarProcessor::new(250).unwrap();
    let with_partial = inclusive
        .process_agg_trade_records_with_incomplete(&[trade])
        .unwrap();
    assert_eq!(with_partial.len(), 1, "Should return 1 incomplete bar");

    // Built from a single trade, so open == close and high == low.
    let partial = &with_partial[0];
    assert_eq!(partial.open, partial.close);
    assert_eq!(partial.high, partial.low);
}
2518
2519    // === Issue #96: Configuration method coverage tests ===
2520
#[test]
fn test_with_options_gate_disabled_same_timestamp_closes() {
    // Issue #36: when prevent_same_timestamp_close is false, a breach that
    // shares the opening trade's timestamp is allowed to close the bar.
    let mut processor = OpenDeviationBarProcessor::with_options(250, false).unwrap();
    assert!(!processor.prevent_same_timestamp_close());

    // Both trades share one timestamp; only id and price differ.
    let trade_at_same_ts = |id: i64, price: &str| AggTrade {
        agg_trade_id: id,
        price: FixedPoint::from_str(price).unwrap(),
        volume: FixedPoint::from_str("1.0").unwrap(),
        first_trade_id: id,
        last_trade_id: id,
        timestamp: 1000000,
        is_buyer_maker: false,
        is_best_match: None,
    };
    let trades = vec![
        trade_at_same_ts(1, "50000.0"),
        trade_at_same_ts(2, "50200.0"), // +0.4% > 0.25%
    ];

    let completed = processor.process_agg_trade_records(&trades).unwrap();

    assert_eq!(
        completed.len(),
        1,
        "Gate disabled: same-timestamp breach should close bar"
    );
}
2545
#[test]
fn test_inter_bar_config_enables_features() {
    use crate::interbar::LookbackMode;

    // A freshly constructed processor reports inter-bar features as off.
    let plain = OpenDeviationBarProcessor::new(250).unwrap();
    assert!(!plain.inter_bar_enabled(), "Default: inter-bar disabled");

    // Supplying a config through the builder switches them on.
    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: false,
        compute_tier3: false,
        ..Default::default()
    };
    let configured = plain.with_inter_bar_config(config);
    assert!(
        configured.inter_bar_enabled(),
        "After config: inter-bar enabled"
    );
}
2560
#[test]
fn test_intra_bar_feature_toggle() {
    // Intra-bar features are off until explicitly requested via the builder.
    let plain = OpenDeviationBarProcessor::new(250).unwrap();
    assert!(!plain.intra_bar_enabled(), "Default: intra-bar disabled");

    let toggled = plain.with_intra_bar_features();
    assert!(
        toggled.intra_bar_enabled(),
        "After toggle: intra-bar enabled"
    );
}
2569
#[test]
fn test_set_inter_bar_config_after_construction() {
    use crate::interbar::LookbackMode;

    // The setter (as opposed to the builder) must also enable inter-bar features.
    let mut processor = OpenDeviationBarProcessor::new(500).unwrap();
    assert!(!processor.inter_bar_enabled());

    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(200),
        compute_tier2: true,
        compute_tier3: false,
        ..Default::default()
    };
    processor.set_inter_bar_config(config);

    assert!(
        processor.inter_bar_enabled(),
        "set_inter_bar_config should enable"
    );
}
2584
#[test]
fn test_process_with_options_incomplete_false_vs_true() {
    // Including the incomplete bar can only add to the output, never shrink it.
    let trades = scenarios::single_breach_sequence(250);

    // Two fresh processors over the same trades, differing only in the flag.
    let mut strict = OpenDeviationBarProcessor::new(250).unwrap();
    let bars_strict = strict
        .process_agg_trade_records_with_options(&trades, false)
        .unwrap();

    let mut inclusive = OpenDeviationBarProcessor::new(250).unwrap();
    let bars_incl = inclusive
        .process_agg_trade_records_with_options(&trades, true)
        .unwrap();

    assert!(
        bars_incl.len() >= bars_strict.len(),
        "inclusive ({}) must be >= strict ({})",
        bars_incl.len(),
        bars_strict.len()
    );
}
2602
2603    // === Issue #96: Untested public method coverage ===
2604
#[test]
fn test_anomaly_summary_default_no_anomalies() {
    // A brand-new processor reports an all-zero anomaly summary.
    let processor = OpenDeviationBarProcessor::new(250).unwrap();
    let summary = processor.anomaly_summary();

    // Individual counters.
    assert_eq!(summary.gaps_detected, 0);
    assert_eq!(summary.overlaps_detected, 0);
    assert_eq!(summary.timestamp_anomalies, 0);

    // Aggregate views.
    assert!(!summary.has_anomalies());
    assert_eq!(summary.total(), 0);
}
2615
#[test]
fn test_anomaly_summary_preserved_through_checkpoint() {
    // Run trades through a processor, round-trip it through a checkpoint,
    // and check the anomaly summary of the restored processor.
    let mut original = OpenDeviationBarProcessor::new(250).unwrap();
    let trades = scenarios::single_breach_sequence(250);
    original.process_agg_trade_records(&trades).unwrap();

    let restored =
        OpenDeviationBarProcessor::from_checkpoint(original.create_checkpoint("TEST")).unwrap();

    // No anomalies are expected from this run, and the restored processor
    // must report none either.
    assert_eq!(restored.anomaly_summary().total(), 0);
}
2629
#[test]
fn test_anomaly_summary_from_checkpoint_with_anomalies() {
    // Anomaly counters stored in a checkpoint must be visible on the
    // processor restored from it.
    let json = r#"{
            "version": 3,
            "symbol": "TESTUSDT",
            "threshold_decimal_bps": 250,
            "prevent_same_timestamp_close": true,
            "defer_open": false,
            "current_bar": null,
            "thresholds": null,
            "last_timestamp_us": 1000000,
            "last_trade_id": 5,
            "price_hash": 0,
            "anomaly_summary": {"gaps_detected": 3, "overlaps_detected": 1, "timestamp_anomalies": 2}
        }"#;

    let checkpoint = serde_json::from_str::<crate::checkpoint::Checkpoint>(json).unwrap();
    let processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
    let summary = processor.anomaly_summary();

    // Each counter matches the serialized value...
    assert_eq!(summary.gaps_detected, 3);
    assert_eq!(summary.overlaps_detected, 1);
    assert_eq!(summary.timestamp_anomalies, 2);

    // ...and the aggregates follow from them (3 + 1 + 2).
    assert!(summary.has_anomalies());
    assert_eq!(summary.total(), 6);
}
2655
#[test]
fn test_with_inter_bar_config_and_cache_shared() {
    use crate::entropy_cache_global::get_global_entropy_cache;
    use crate::interbar::LookbackMode;

    // Two processors with different thresholds are handed the same global
    // entropy cache; both must end up with inter-bar features enabled.
    let shared_cache = get_global_entropy_cache();
    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: true,
        compute_tier3: true,
        ..Default::default()
    };

    let first = OpenDeviationBarProcessor::new(250)
        .unwrap()
        .with_inter_bar_config_and_cache(config.clone(), Some(shared_cache.clone()));
    let second = OpenDeviationBarProcessor::new(500)
        .unwrap()
        .with_inter_bar_config_and_cache(config, Some(shared_cache));

    assert!(first.inter_bar_enabled());
    assert!(second.inter_bar_enabled());
}
2678
#[test]
fn test_set_inter_bar_config_with_cache_after_checkpoint() {
    use crate::entropy_cache_global::get_global_entropy_cache;
    use crate::interbar::LookbackMode;

    let mut original = OpenDeviationBarProcessor::new(250).unwrap();
    let trades = scenarios::single_breach_sequence(250);
    original.process_agg_trade_records(&trades).unwrap();

    // Round-trip through a checkpoint; the restored processor is expected to
    // come back with inter-bar features off.
    let mut restored =
        OpenDeviationBarProcessor::from_checkpoint(original.create_checkpoint("TEST")).unwrap();
    assert!(
        !restored.inter_bar_enabled(),
        "Checkpoint does not preserve inter-bar config"
    );

    // Turning them back on with the shared global cache must take effect.
    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: false,
        compute_tier3: false,
        ..Default::default()
    };
    let shared_cache = get_global_entropy_cache();
    restored.set_inter_bar_config_with_cache(config, Some(shared_cache));

    assert!(
        restored.inter_bar_enabled(),
        "set_inter_bar_config_with_cache should re-enable"
    );
}
2706
#[test]
fn test_threshold_decimal_bps_getter() {
    // The getter must echo back whatever threshold the processor was built with.
    let with_250 = OpenDeviationBarProcessor::new(250).unwrap();
    let with_1000 = OpenDeviationBarProcessor::new(1000).unwrap();

    assert_eq!(with_250.threshold_decimal_bps(), 250);
    assert_eq!(with_1000.threshold_decimal_bps(), 1000);
}
2715
2716    // =========================================================================
2717    // Issue #112: Gap-aware checkpoint recovery tests
2718    // =========================================================================
2719
#[test]
fn test_checkpoint_gap_discards_forming_bar() {
    // Scenario: checkpoint while a bar is forming, then resume with trades
    // that arrive two hours later.
    let mut original = OpenDeviationBarProcessor::new(250).unwrap();

    // Three trades one second apart that stay inside the 0.25% band.
    let pre_gap = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1_640_995_200_000_000), // t=0
        test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1_640_995_201_000_000), // t=+1s
        test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1_640_995_202_000_000), // t=+2s
    ];
    let completed = original.process_agg_trade_records(&pre_gap).unwrap();
    assert_eq!(completed.len(), 0, "No breach = no completed bars");

    // The checkpoint captures the forming bar.
    let checkpoint = original.create_checkpoint("BTCUSDT");
    assert!(checkpoint.has_incomplete_bar(), "Should have forming bar");

    let mut restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();

    // Resume 2 hours later (7,200,000,000 μs, beyond the 1-hour default max gap).
    let post_gap = [
        test_utils::create_test_agg_trade(4, "50030.0", "1.0", 1_641_002_402_000_000), // +2h gap
        test_utils::create_test_agg_trade(5, "50040.0", "1.0", 1_641_002_403_000_000),
    ];
    let completed = restored.process_agg_trade_records(&post_gap).unwrap();

    // The stale forming bar is dropped instead of being emitted as an
    // oversized bar, and the gap is counted.
    assert_eq!(
        completed.len(),
        0,
        "No bars should complete — forming bar was discarded"
    );
    assert_eq!(
        restored.anomaly_summary().gaps_detected,
        1,
        "Gap should be recorded in anomaly summary"
    );
}
2756
2757    #[test]
2758    fn test_checkpoint_small_gap_continues_bar() {
2759        // Resume with a gap UNDER the threshold — bar should continue
2760        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
2761
2762        let trades = vec![
2763            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2764            test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000),
2765        ];
2766
2767        let _ = processor.process_agg_trade_records(&trades).unwrap();
2768        let checkpoint = processor.create_checkpoint("BTCUSDT");
2769        let mut restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
2770
2771        // Feed trades with 30-minute gap (1,800,000,000 μs < 1-hour max_gap)
2772        let small_gap_trades = vec![
2773            test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640997001_000_000), // +30m
2774            test_utils::create_test_agg_trade(4, "50125.01", "1.0", 1640997002_000_000), // breach
2775        ];
2776
2777        let bars = restored.process_agg_trade_records(&small_gap_trades).unwrap();
2778        assert_eq!(bars.len(), 1, "Bar should complete normally with small gap");
2779        // Bar should span from original open (trade 1) through gap trades
2780        assert_eq!(bars[0].open_time, 1640995200_000_000);
2781        assert_eq!(
2782            restored.anomaly_summary().gaps_detected, 0,
2783            "No gap anomaly for small gap"
2784        );
2785    }
2786
2787    #[test]
2788    fn test_checkpoint_gap_custom_max_gap() {
2789        // Test with custom max_gap_us set to 30 minutes
2790        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
2791
2792        let trades = vec![
2793            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2794        ];
2795        let _ = processor.process_agg_trade_records(&trades).unwrap();
2796        let checkpoint = processor.create_checkpoint("BTCUSDT");
2797
2798        // Restore with custom 30-minute max gap
2799        let mut restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint)
2800            .unwrap()
2801            .with_max_gap(1_800_000_000); // 30 minutes
2802
2803        // 45-minute gap should discard with 30-min threshold
2804        let gap_trades = vec![
2805            test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640997900_000_000), // +45min
2806        ];
2807
2808        let _ = restored.process_agg_trade_records(&gap_trades).unwrap();
2809        assert_eq!(
2810            restored.anomaly_summary().gaps_detected, 1,
2811            "45-min gap should be detected with 30-min threshold"
2812        );
2813    }
2814
2815    #[test]
2816    fn test_is_valid_range_rejects_oversized() {
2817        use crate::fixed_point::FixedPoint;
2818
2819        let threshold_decimal_bps: u32 = 250; // 0.25%
2820        let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
2821            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
2822
2823        // Bar with 0.50% range — exceeds 2x threshold (2 * 0.25% = 0.50%)
2824        // open=50000.0, high=50250.01, low=50000.0 → range=250.01/50000 ≈ 0.5%+
2825        let mut oversized = OpenDeviationBar::default();
2826        oversized.open = FixedPoint::from_str("50000.0").unwrap();
2827        oversized.high = FixedPoint::from_str("50250.01").unwrap();
2828        oversized.low = FixedPoint::from_str("50000.0").unwrap();
2829        assert!(
2830            !oversized.is_valid_range(threshold_ratio, 2),
2831            "Bar exceeding 2x threshold should be invalid"
2832        );
2833
2834        // Bar with 0.20% range — within threshold
2835        let mut valid = OpenDeviationBar::default();
2836        valid.open = FixedPoint::from_str("50000.0").unwrap();
2837        valid.high = FixedPoint::from_str("50100.0").unwrap();
2838        valid.low = FixedPoint::from_str("50000.0").unwrap();
2839        assert!(
2840            valid.is_valid_range(threshold_ratio, 2),
2841            "Bar within threshold should be valid"
2842        );
2843
2844        // Bar at exact breach boundary (0.25%) — should be valid (breach triggers AT threshold)
2845        let mut exact = OpenDeviationBar::default();
2846        exact.open = FixedPoint::from_str("50000.0").unwrap();
2847        exact.high = FixedPoint::from_str("50125.0").unwrap();
2848        exact.low = FixedPoint::from_str("50000.0").unwrap();
2849        assert!(
2850            exact.is_valid_range(threshold_ratio, 2),
2851            "Bar at exact threshold should be valid"
2852        );
2853    }
2854
2855    #[test]
2856    fn test_checkpoint_no_incomplete_bar_gap_is_noop() {
2857        // If checkpoint has no forming bar, gap detection is irrelevant
2858        let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
2859
2860        // Process trades that complete a bar (produce breach)
2861        let trades = vec![
2862            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2863            test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1640995201_000_000), // breach
2864        ];
2865        let bars = processor.process_agg_trade_records(&trades).unwrap();
2866        assert_eq!(bars.len(), 1);
2867
2868        let checkpoint = processor.create_checkpoint("BTCUSDT");
2869        assert!(!checkpoint.has_incomplete_bar());
2870
2871        let mut restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
2872
2873        // Large gap doesn't matter — no forming bar to discard
2874        let gap_trades = vec![
2875            test_utils::create_test_agg_trade(3, "50010.0", "1.0", 1641081600_000_000), // +24h
2876        ];
2877        let _ = restored.process_agg_trade_records(&gap_trades).unwrap();
2878        assert_eq!(
2879            restored.anomaly_summary().gaps_detected, 0,
2880            "No gap anomaly when no forming bar exists"
2881        );
2882    }
2883}