Skip to main content

rangebar_core/
processor.rs

1// FILE-SIZE-OK: 1496 lines — tests are inline (access private RangeBarState)
2//! Core range bar processing algorithm
3//!
4//! Implements non-lookahead bias range bar construction where bars close when
5//! price moves ±threshold dbps from the bar's OPEN price.
6
7use crate::checkpoint::{
8    AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; // Issue #59
12// Issue #59: Intra-bar features - using compute_intra_bar_features_with_scratch (Task #173)
13use crate::types::{AggTrade, RangeBar};
14use smallvec::SmallVec; // Issue #119: Trade accumulation with inline buffer (512 slots ≈ 29KB)
15#[cfg(feature = "python")]
16use pyo3::prelude::*;
17// Re-export ProcessingError from errors.rs (Phase 2a extraction)
18pub use crate::errors::ProcessingError;
19// Re-export ExportRangeBarProcessor from export_processor.rs (Phase 2d extraction)
20pub use crate::export_processor::ExportRangeBarProcessor;
21
22/// Range bar processor with non-lookahead bias guarantee
23pub struct RangeBarProcessor {
24    /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
25    threshold_decimal_bps: u32,
26
27    /// Issue #96 Task #98: Pre-computed threshold ratio for fast delta calculation
28    /// Stores (threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE as fixed-point
29    /// This allows compute_range_thresholds() to compute delta = (price * ratio) / SCALE
30    /// without repeated division, avoiding i128 arithmetic in hot path.
31    /// Performance: Eliminates BASIS_POINTS_SCALE division on every bar creation.
32    /// Made public for testing purposes.
33    pub threshold_ratio: i64,
34
35    /// Current bar state for streaming processing (Q19)
36    /// Enables get_incomplete_bar() and stateful process_single_trade()
37    current_bar_state: Option<RangeBarState>,
38
39    /// Price window for checkpoint hash verification
40    price_window: PriceWindow,
41
42    /// Last processed trade ID (for gap detection on resume)
43    last_trade_id: Option<i64>,
44
45    /// Last processed timestamp (for position verification)
46    last_timestamp_us: i64,
47
48    /// Anomaly tracking for debugging
49    anomaly_summary: AnomalySummary,
50
51    /// Flag indicating this processor was created from a checkpoint
52    /// When true, process_agg_trade_records will continue from existing bar state
53    resumed_from_checkpoint: bool,
54
55    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
56    ///
57    /// When true (default): A bar cannot close until a trade arrives with a
58    /// different timestamp than the bar's open_time. This prevents "instant bars"
59    /// during flash crashes where multiple trades occur at the same millisecond.
60    ///
61    /// When false: Legacy behavior - bars can close on any breach regardless
62    /// of timestamp, which may produce bars with identical timestamps.
63    prevent_same_timestamp_close: bool,
64
65    /// Deferred bar open flag (Issue #46)
66    ///
67    /// When true: The previous trade triggered a threshold breach and closed a bar.
68    /// The next trade arriving via `process_single_trade()` should open a new bar
69    /// instead of being treated as a continuation.
70    ///
71    /// This matches the batch path's `defer_open` semantics in
72    /// `process_agg_trade_records()` where the breaching trade closes the current
73    /// bar and the NEXT trade opens the new bar.
74    defer_open: bool,
75
76    /// Trade history for inter-bar feature computation (Issue #59)
77    ///
78    /// Ring buffer of recent trades for computing lookback-based features.
79    /// When Some, features are computed from trades BEFORE each bar's open_time.
80    /// When None, inter-bar features are disabled (all lookback_* fields = None).
81    trade_history: Option<TradeHistory>,
82
83    /// Configuration for inter-bar features (Issue #59)
84    ///
85    /// Controls lookback mode (fixed count or time window) and which feature
86    /// tiers to compute. When None, inter-bar features are disabled.
87    inter_bar_config: Option<InterBarConfig>,
88
89    /// Enable intra-bar feature computation (Issue #59)
90    ///
91    /// When true, the processor accumulates trades during bar construction
92    /// and computes 22 features from trades WITHIN each bar at bar close.
93    /// Features include ITH (Investment Time Horizon), statistical, and
94    /// complexity metrics. When false, all intra_* fields are None.
95    include_intra_bar_features: bool,
96
97    /// Issue #112: Maximum timestamp gap in microseconds before discarding a forming bar
98    ///
99    /// When resuming from checkpoint with a forming bar, if the gap between
100    /// the forming bar's close_time and the first incoming trade exceeds this
101    /// threshold, the forming bar is discarded as an orphan (same as ouroboros reset).
102    /// This prevents "oversized" bars caused by large data gaps (e.g., 38-hour outages).
103    ///
104    /// Default: 3,600,000,000 μs (1 hour)
105    max_gap_us: i64,
106}
107
108/// Cold path: scan trades to find first unsorted pair and return error
109/// Extracted from validate_trade_ordering() to improve hot-path code layout
110#[cold]
111#[inline(never)]
112fn find_unsorted_trade(trades: &[AggTrade]) -> Result<(), ProcessingError> {
113    for i in 1..trades.len() {
114        let prev = &trades[i - 1];
115        let curr = &trades[i];
116        if curr.timestamp < prev.timestamp
117            || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
118        {
119            return Err(ProcessingError::UnsortedTrades {
120                index: i,
121                prev_time: prev.timestamp,
122                prev_id: prev.agg_trade_id,
123                curr_time: curr.timestamp,
124                curr_id: curr.agg_trade_id,
125            });
126        }
127    }
128    Ok(())
129}
130
131/// Cold path: construct unsorted trade error
132/// Extracted to keep error construction out of the hot validation loop
133#[cold]
134#[inline(never)]
135fn unsorted_trade_error(index: usize, prev: &AggTrade, curr: &AggTrade) -> Result<(), ProcessingError> {
136    Err(ProcessingError::UnsortedTrades {
137        index,
138        prev_time: prev.timestamp,
139        prev_id: prev.agg_trade_id,
140        curr_time: curr.timestamp,
141        curr_id: curr.agg_trade_id,
142    })
143}
144
145impl RangeBarProcessor {
146    /// Create new processor with given threshold
147    ///
148    /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36)
149    ///
150    /// # Arguments
151    ///
152    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
153    ///   - Example: `250` → 25bps = 0.25%
154    ///   - Example: `10` → 1bps = 0.01%
155    ///   - Minimum: `1` → 0.1bps = 0.001%
156    ///
157    /// # Breaking Change (v3.0.0)
158    ///
159    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
160    /// **Migration**: Multiply all threshold values by 10.
161    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
162        Self::with_options(threshold_decimal_bps, true)
163    }
164
165    /// Create new processor with explicit timestamp gating control
166    ///
167    /// # Arguments
168    ///
169    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
170    /// * `prevent_same_timestamp_close` - If true, bars cannot close until
171    ///   timestamp advances from open_time. This prevents "instant bars" during
172    ///   flash crashes. Set to false for legacy behavior (pre-v9).
173    ///
174    /// # Example
175    ///
176    /// ```ignore
177    /// // Default behavior (v9+): timestamp gating enabled
178    /// let processor = RangeBarProcessor::new(250)?;
179    ///
180    /// // Legacy behavior: allow instant bars
181    /// let processor = RangeBarProcessor::with_options(250, false)?;
182    /// ```
183    pub fn with_options(
184        threshold_decimal_bps: u32,
185        prevent_same_timestamp_close: bool,
186    ) -> Result<Self, ProcessingError> {
187        // Validation bounds (v3.0.0: dbps units)
188        // Min: 1 dbps = 0.001%
189        // Max: 100,000 dbps = 100%
190        if threshold_decimal_bps < 1 {
191            return Err(ProcessingError::InvalidThreshold {
192                threshold_decimal_bps,
193            });
194        }
195        if threshold_decimal_bps > 100_000 {
196            return Err(ProcessingError::InvalidThreshold {
197                threshold_decimal_bps,
198            });
199        }
200
201        // Issue #96 Task #98: Pre-compute threshold ratio
202        // Ratio = (threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE
203        // This is used in compute_range_thresholds() for fast delta calculation
204        let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
205            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
206
207        Ok(Self {
208            threshold_decimal_bps,
209            threshold_ratio,
210            current_bar_state: None,
211            price_window: PriceWindow::new(),
212            last_trade_id: None,
213            last_timestamp_us: 0,
214            anomaly_summary: AnomalySummary::default(),
215            resumed_from_checkpoint: false,
216            prevent_same_timestamp_close,
217            defer_open: false,
218            trade_history: None,               // Issue #59: disabled by default
219            inter_bar_config: None,            // Issue #59: disabled by default
220            include_intra_bar_features: false, // Issue #59: disabled by default
221            max_gap_us: 3_600_000_000,         // Issue #112: 1 hour default
222        })
223    }
224
225    /// Get the prevent_same_timestamp_close setting
226    pub fn prevent_same_timestamp_close(&self) -> bool {
227        self.prevent_same_timestamp_close
228    }
229
230    /// Enable inter-bar feature computation with the given configuration (Issue #59)
231    ///
232    /// When enabled, the processor maintains a trade history buffer and computes
233    /// lookback-based microstructure features on each bar close. Features are
234    /// computed from trades that occurred BEFORE each bar's open_time, ensuring
235    /// no lookahead bias.
236    ///
237    /// Uses a local entropy cache (default behavior, backward compatible).
238    /// For multi-symbol workloads, use `with_inter_bar_config_and_cache()` with a global cache.
239    ///
240    /// # Arguments
241    ///
242    /// * `config` - Configuration controlling lookback mode and feature tiers
243    ///
244    /// # Example
245    ///
246    /// ```ignore
247    /// use rangebar_core::processor::RangeBarProcessor;
248    /// use rangebar_core::interbar::{InterBarConfig, LookbackMode};
249    ///
250    /// let processor = RangeBarProcessor::new(1000)?
251    ///     .with_inter_bar_config(InterBarConfig {
252    ///         lookback_mode: LookbackMode::FixedCount(500),
253    ///         compute_tier2: true,
254    ///         compute_tier3: true,
255    ///     });
256    /// ```
257    pub fn with_inter_bar_config(self, config: InterBarConfig) -> Self {
258        self.with_inter_bar_config_and_cache(config, None)
259    }
260
261    /// Enable inter-bar feature computation with optional external entropy cache
262    ///
263    /// Issue #145 Phase 3: Multi-Symbol Entropy Cache Sharing
264    ///
265    /// # Arguments
266    ///
267    /// * `config` - Configuration controlling lookback mode and feature tiers
268    /// * `external_cache` - Optional shared entropy cache from `get_global_entropy_cache()`
269    ///   - If provided: Uses the shared global cache (recommended for multi-symbol)
270    ///   - If None: Creates a local 128-entry cache (default, backward compatible)
271    ///
272    /// # Usage
273    ///
274    /// ```ignore
275    /// use rangebar_core::{processor::RangeBarProcessor, entropy_cache_global::get_global_entropy_cache, interbar::InterBarConfig};
276    ///
277    /// // Single-symbol: use local cache (default)
278    /// let processor = RangeBarProcessor::new(1000)?
279    ///     .with_inter_bar_config(config);
280    ///
281    /// // Multi-symbol: share global cache
282    /// let global_cache = get_global_entropy_cache();
283    /// let processor = RangeBarProcessor::new(1000)?
284    ///     .with_inter_bar_config_and_cache(config, Some(global_cache));
285    /// ```
286    pub fn with_inter_bar_config_and_cache(
287        mut self,
288        config: InterBarConfig,
289        external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
290    ) -> Self {
291        self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
292        self.inter_bar_config = Some(config);
293        self
294    }
295
296    /// Check if inter-bar features are enabled
297    pub fn inter_bar_enabled(&self) -> bool {
298        self.inter_bar_config.is_some()
299    }
300
301    /// Issue #112: Configure maximum timestamp gap for checkpoint recovery
302    ///
303    /// When resuming from checkpoint with a forming bar, if the gap between
304    /// the forming bar's close_time and the first incoming trade exceeds this
305    /// threshold, the forming bar is discarded as an orphan.
306    ///
307    /// # Arguments
308    ///
309    /// * `max_gap_us` - Maximum gap in microseconds (default: 3,600,000,000 = 1 hour)
310    pub fn with_max_gap(mut self, max_gap_us: i64) -> Self {
311        self.max_gap_us = max_gap_us;
312        self
313    }
314
315    /// Get the maximum gap threshold in microseconds
316    pub fn max_gap_us(&self) -> i64 {
317        self.max_gap_us
318    }
319
320    /// Enable intra-bar feature computation (Issue #59)
321    ///
322    /// When enabled, the processor accumulates trades during bar construction
323    /// and computes 22 features from trades WITHIN each bar at bar close:
324    /// - 8 ITH features (Investment Time Horizon)
325    /// - 12 statistical features (OFI, intensity, Kyle lambda, etc.)
326    /// - 2 complexity features (Hurst exponent, permutation entropy)
327    ///
328    /// # Memory Note
329    ///
330    /// Trades are accumulated per-bar and freed when the bar closes.
331    /// Typical 1000 dbps bar: ~50-500 trades, ~2-24 KB overhead.
332    ///
333    /// # Example
334    ///
335    /// ```ignore
336    /// let processor = RangeBarProcessor::new(1000)?
337    ///     .with_intra_bar_features();
338    /// ```
339    pub fn with_intra_bar_features(mut self) -> Self {
340        self.include_intra_bar_features = true;
341        self
342    }
343
344    /// Check if intra-bar features are enabled
345    pub fn intra_bar_enabled(&self) -> bool {
346        self.include_intra_bar_features
347    }
348
349    /// Re-enable inter-bar features on an existing processor (Issue #97).
350    ///
351    /// Used after `from_checkpoint()` to restore microstructure config that
352    /// is not preserved in checkpoint state. Uses a local entropy cache by default.
353    /// For multi-symbol workloads, use `set_inter_bar_config_with_cache()` with a global cache.
354    pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
355        self.set_inter_bar_config_with_cache(config, None);
356    }
357
358    /// Re-enable inter-bar features with optional external entropy cache (Issue #145 Phase 3).
359    ///
360    /// Used after `from_checkpoint()` to restore microstructure config that
361    /// is not preserved in checkpoint state. Allows specifying a shared entropy cache
362    /// for multi-symbol processors.
363    pub fn set_inter_bar_config_with_cache(
364        &mut self,
365        config: InterBarConfig,
366        external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
367    ) {
368        self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
369        self.inter_bar_config = Some(config);
370    }
371
372    /// Re-enable intra-bar features on an existing processor (Issue #97).
373    pub fn set_intra_bar_features(&mut self, enabled: bool) {
374        self.include_intra_bar_features = enabled;
375    }
376
377    /// Process a single trade and return completed bar if any
378    ///
379    /// Maintains internal state for streaming use case. State persists across calls
380    /// until a bar completes (threshold breach), enabling get_incomplete_bar().
381    ///
382    /// # Arguments
383    ///
384    /// * `trade` - Single aggregated trade to process
385    ///
386    /// # Returns
387    ///
388    /// `Some(RangeBar)` if a bar was completed, `None` otherwise
389    ///
390    /// # State Management
391    ///
392    /// - First trade: Initializes new bar state
393    /// - Subsequent trades: Updates existing bar or closes on breach
394    /// - Breach: Returns completed bar, starts new bar with breaching trade
395    ///
396    /// Issue #96 Task #78: Accept borrowed AggTrade to eliminate clones in fan-out loops.
397    /// Streaming pipelines (4+ thresholds) were cloning ~57 byte trades per processor.
398    /// Signature change to `&AggTrade` eliminates 4-8x unnecessary allocations.
399    /// Issue #96 Task #84: `#[inline]` — main hot-path entry point called on every trade.
400    #[inline]
401    pub fn process_single_trade(
402        &mut self,
403        trade: &AggTrade,
404    ) -> Result<Option<RangeBar>, ProcessingError> {
405        // Track price and position for checkpoint
406        self.price_window.push(trade.price);
407        self.last_trade_id = Some(trade.agg_trade_id);
408        self.last_timestamp_us = trade.timestamp;
409
410        // Issue #59: Push trade to history buffer for inter-bar feature computation
411        // This must happen BEFORE bar processing so lookback window includes recent trades
412        if let Some(ref mut history) = self.trade_history {
413            history.push(trade);
414        }
415
416        // Issue #46: If previous call triggered a breach, this trade opens the new bar.
417        // This matches the batch path's defer_open semantics - the breaching trade
418        // closes the current bar, and the NEXT trade opens the new bar.
419        if self.defer_open {
420            // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
421            if let Some(ref mut history) = self.trade_history {
422                history.on_bar_open(trade.timestamp);
423            }
424            self.current_bar_state = Some(if self.include_intra_bar_features {
425                RangeBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
426            } else {
427                RangeBarState::new(trade, self.threshold_ratio)
428            });
429            self.defer_open = false;
430            return Ok(None);
431        }
432
433        match &mut self.current_bar_state {
434            None => {
435                // First trade - initialize new bar
436                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
437                if let Some(ref mut history) = self.trade_history {
438                    history.on_bar_open(trade.timestamp);
439                }
440                self.current_bar_state = Some(if self.include_intra_bar_features {
441                    RangeBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
442                } else {
443                    RangeBarState::new(trade, self.threshold_ratio)
444                });
445                Ok(None)
446            }
447            Some(bar_state) => {
448                // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
449                // Only accumulates if features enabled, avoiding unnecessary clones
450                bar_state.accumulate_trade(trade, self.include_intra_bar_features);
451
452                // Check for threshold breach
453                let price_breaches = bar_state.bar.is_breach(
454                    trade.price,
455                    bar_state.upper_threshold,
456                    bar_state.lower_threshold,
457                );
458
459                // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
460                // This eliminates "instant bars" during flash crashes where multiple trades
461                // occur at the same millisecond.
462                let timestamp_allows_close = !self.prevent_same_timestamp_close
463                    || trade.timestamp != bar_state.bar.open_time;
464
465                if price_breaches && timestamp_allows_close {
466                    // Breach detected AND timestamp changed - close current bar
467                    bar_state.bar.update_with_trade(trade);
468
469                    // Validation: Ensure high/low include open/close extremes
470                    debug_assert!(
471                        bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
472                    );
473                    debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
474
475                    // Compute microstructure features at bar finalization (Issue #25)
476                    bar_state.bar.compute_microstructure_features();
477
478                    // Issue #59: Compute inter-bar features from lookback window
479                    // Features are computed from trades BEFORE bar.open_time (no lookahead)
480                    if let Some(ref mut history) = self.trade_history {
481                        let inter_bar_features = history.compute_features(bar_state.bar.open_time);
482                        bar_state.bar.set_inter_bar_features(&inter_bar_features);
483                        // Issue #68: Notify history that bar is closing (resumes normal pruning)
484                        history.on_bar_close();
485                    }
486
487                    // Issue #59: Compute intra-bar features from accumulated trades
488                    if self.include_intra_bar_features {
489                        // Issue #96 Task #173: Use reusable scratch buffers from bar_state
490                        let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_scratch(
491                            &bar_state.accumulated_trades,
492                            &mut bar_state.scratch_prices,
493                            &mut bar_state.scratch_volumes,
494                        );
495                        bar_state.bar.set_intra_bar_features(&intra_bar_features);
496                    }
497
498                    // Move bar out instead of cloning — bar_state borrow ends after
499                    // last use above (NLL), so take() is safe here.
500                    let completed_bar = self.current_bar_state.take().unwrap().bar;
501
502                    // Issue #46: Don't start new bar with breaching trade.
503                    // Next trade will open the new bar via defer_open.
504                    self.defer_open = true;
505
506                    Ok(Some(completed_bar))
507                } else {
508                    // Either no breach OR same timestamp (gate active) - update existing bar
509                    bar_state.bar.update_with_trade(trade);
510                    Ok(None)
511                }
512            }
513        }
514    }
515
516    /// Get any incomplete bar currently being processed
517    ///
518    /// Returns clone of current bar state for inspection without consuming it.
519    /// Useful for final bar at stream end or progress monitoring.
520    ///
521    /// # Returns
522    ///
523    /// `Some(RangeBar)` if bar is in progress, `None` if no active bar
524    pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
525        self.current_bar_state
526            .as_ref()
527            .map(|state| state.bar.clone())
528    }
529
530    /// Process AggTrade records into range bars including incomplete bars for analysis
531    ///
532    /// # Arguments
533    ///
534    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
535    ///
536    /// # Returns
537    ///
538    /// Vector of range bars including incomplete bars at end of data
539    ///
540    /// # Warning
541    ///
542    /// This method is for analysis purposes only. Incomplete bars violate the
543    /// fundamental range bar algorithm and should not be used for production trading.
544    pub fn process_agg_trade_records_with_incomplete(
545        &mut self,
546        agg_trade_records: &[AggTrade],
547    ) -> Result<Vec<RangeBar>, ProcessingError> {
548        self.process_agg_trade_records_with_options(agg_trade_records, true)
549    }
550
551    /// Process Binance aggregated trade records into range bars
552    ///
553    /// This is the primary method for converting AggTrade records (which aggregate
554    /// multiple individual trades) into range bars based on price movement thresholds.
555    ///
556    /// # Parameters
557    ///
558    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
559    ///   Each record represents multiple individual trades aggregated at same price
560    ///
561    /// # Returns
562    ///
563    /// Vector of completed range bars (ONLY bars that breached thresholds).
564    /// Each bar tracks both individual trade count and AggTrade record count.
565    pub fn process_agg_trade_records(
566        &mut self,
567        agg_trade_records: &[AggTrade],
568    ) -> Result<Vec<RangeBar>, ProcessingError> {
569        self.process_agg_trade_records_with_options(agg_trade_records, false)
570    }
571
572    /// Process AggTrade records with options for including incomplete bars
573    ///
574    /// Batch processing mode: Clears any existing state before processing.
575    /// Use process_single_trade() for stateful streaming instead.
576    ///
577    /// # Parameters
578    ///
579    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
580    /// * `include_incomplete` - Whether to include incomplete bars at end of processing
581    ///
582    /// # Returns
583    ///
584    /// Vector of range bars (completed + incomplete if requested)
585    pub fn process_agg_trade_records_with_options(
586        &mut self,
587        agg_trade_records: &[AggTrade],
588        include_incomplete: bool,
589    ) -> Result<Vec<RangeBar>, ProcessingError> {
590        if agg_trade_records.is_empty() {
591            return Ok(Vec::new());
592        }
593
594        // Validate records are sorted
595        self.validate_trade_ordering(agg_trade_records)?;
596
597        // Use existing bar state if resuming from checkpoint, otherwise start fresh
598        // This is CRITICAL for cross-file continuation (Issues #2, #3)
599        let mut current_bar: Option<RangeBarState> = if self.resumed_from_checkpoint {
600            // Continue from checkpoint's incomplete bar
601            self.resumed_from_checkpoint = false; // Consume the flag
602            let restored_bar = self.current_bar_state.take();
603
604            // Issue #112: Gap-aware checkpoint recovery
605            // If the forming bar's close_time is too far from the first incoming trade,
606            // discard it as an orphan to prevent oversized bars from data gaps.
607            if let Some(ref bar_state) = restored_bar {
608                let first_trade_ts = agg_trade_records[0].timestamp;
609                let gap = first_trade_ts - bar_state.bar.close_time;
610                if gap > self.max_gap_us {
611                    self.anomaly_summary.record_gap();
612                    // Discard forming bar — same treatment as ouroboros reset
613                    None
614                } else {
615                    restored_bar
616                }
617            } else {
618                restored_bar
619            }
620        } else {
621            // Start fresh for normal batch processing
622            self.current_bar_state = None;
623            None
624        };
625
626        let mut bars = Vec::with_capacity(agg_trade_records.len() / 50); // Heuristic: 50 trades/bar covers consolidation regimes
627        let mut defer_open = false;
628
629        for agg_record in agg_trade_records {
630            // Track price and position for checkpoint
631            self.price_window.push(agg_record.price);
632            self.last_trade_id = Some(agg_record.agg_trade_id);
633            self.last_timestamp_us = agg_record.timestamp;
634
635            // Issue #59: Push trade to history buffer for inter-bar feature computation
636            if let Some(ref mut history) = self.trade_history {
637                history.push(agg_record);
638            }
639
640            if defer_open {
641                // Previous bar closed, this agg_record opens new bar
642                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
643                if let Some(ref mut history) = self.trade_history {
644                    history.on_bar_open(agg_record.timestamp);
645                }
646                current_bar = Some(if self.include_intra_bar_features {
647                    RangeBarState::new_with_trade_accumulation(
648                        agg_record,
649                        self.threshold_ratio,
650                    )
651                } else {
652                    RangeBarState::new(agg_record, self.threshold_ratio)
653                });
654                defer_open = false;
655                continue;
656            }
657
658            match current_bar {
659                None => {
660                    // First bar initialization
661                    // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
662                    if let Some(ref mut history) = self.trade_history {
663                        history.on_bar_open(agg_record.timestamp);
664                    }
665                    current_bar = Some(if self.include_intra_bar_features {
666                        RangeBarState::new_with_trade_accumulation(
667                            agg_record,
668                            self.threshold_ratio,
669                        )
670                    } else {
671                        RangeBarState::new(agg_record, self.threshold_ratio)
672                    });
673                }
674                Some(ref mut bar_state) => {
675                    // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
676                    // Only accumulates if features enabled, avoiding unnecessary clones
677                    bar_state.accumulate_trade(agg_record, self.include_intra_bar_features);
678
679                    // Check if this AggTrade record breaches the threshold
680                    let price_breaches = bar_state.bar.is_breach(
681                        agg_record.price,
682                        bar_state.upper_threshold,
683                        bar_state.lower_threshold,
684                    );
685
686                    // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
687                    // This eliminates "instant bars" during flash crashes where multiple trades
688                    // occur at the same millisecond.
689                    let timestamp_allows_close = !self.prevent_same_timestamp_close
690                        || agg_record.timestamp != bar_state.bar.open_time;
691
692                    if price_breaches && timestamp_allows_close {
693                        // Breach detected AND timestamp changed - update bar with breaching record
694                        bar_state.bar.update_with_trade(agg_record);
695
696                        // Validation: Ensure high/low include open/close extremes
697                        debug_assert!(
698                            bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
699                        );
700                        debug_assert!(
701                            bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
702                        );
703
704                        // Compute microstructure features at bar finalization (Issue #34)
705                        bar_state.bar.compute_microstructure_features();
706
707                        // Issue #59: Compute inter-bar features from lookback window
708                        if let Some(ref mut history) = self.trade_history {
709                            let inter_bar_features =
710                                history.compute_features(bar_state.bar.open_time);
711                            bar_state.bar.set_inter_bar_features(&inter_bar_features);
712                            // Issue #68: Notify history that bar is closing (resumes normal pruning)
713                            history.on_bar_close();
714                        }
715
716                        // Issue #59: Compute intra-bar features from accumulated trades
717                        if self.include_intra_bar_features {
718                            // Issue #96 Task #173: Use reusable scratch buffers from bar_state
719                            let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_scratch(
720                                &bar_state.accumulated_trades,
721                                &mut bar_state.scratch_prices,
722                                &mut bar_state.scratch_volumes,
723                            );
724                            bar_state.bar.set_intra_bar_features(&intra_bar_features);
725                        }
726
727                        // Move bar out instead of cloning — bar_state borrow ends
728                        // after last use above (NLL), so take() is safe here.
729                        bars.push(current_bar.take().unwrap().bar);
730                        defer_open = true; // Next record will open new bar
731                    } else {
732                        // Either no breach OR same timestamp (gate active) - normal update
733                        bar_state.bar.update_with_trade(agg_record);
734                    }
735                }
736            }
737        }
738
739        // Save current bar state for checkpoint and optionally append incomplete bar.
740        // When include_incomplete=true, clone for checkpoint then consume for output.
741        // When include_incomplete=false, move directly (no clone needed).
742        if include_incomplete {
743            // Issue #96 Task #95: Optimize checkpoint cloning with take() to avoid Vec allocation
744            // (accumulated_trades not needed after intra-bar features computed).
745            // Using std::mem::take() instead of clone+clear reduces allocation overhead.
746            if let Some(ref state) = current_bar {
747                let mut checkpoint_state = state.clone();
748                // Use take() to avoid cloning the Vec - directly moves empty Vec into place
749                let _ = std::mem::take(&mut checkpoint_state.accumulated_trades);
750                self.current_bar_state = Some(checkpoint_state);
751            }
752
753            // Add final partial bar only if explicitly requested
754            // This preserves algorithm integrity: bars should only close on threshold breach
755            if let Some(mut bar_state) = current_bar {
756                // Compute microstructure features for incomplete bar (Issue #34)
757                bar_state.bar.compute_microstructure_features();
758
759                // Issue #59: Compute inter-bar features from lookback window
760                if let Some(ref history) = self.trade_history {
761                    let inter_bar_features = history.compute_features(bar_state.bar.open_time);
762                    bar_state.bar.set_inter_bar_features(&inter_bar_features);
763                }
764
765                // Issue #59: Compute intra-bar features from accumulated trades
766                if self.include_intra_bar_features {
767                    // Issue #96 Task #173: Use reusable scratch buffers from bar_state
768                    let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_scratch(
769                        &bar_state.accumulated_trades,
770                        &mut bar_state.scratch_prices,
771                        &mut bar_state.scratch_volumes,
772                    );
773                    bar_state.bar.set_intra_bar_features(&intra_bar_features);
774                }
775
776                bars.push(bar_state.bar);
777            }
778        } else {
779            // No incomplete bar appended — move ownership directly, no clone needed
780            self.current_bar_state = current_bar;
781        }
782
783        Ok(bars)
784    }
785
786    // === CHECKPOINT METHODS ===
787
788    /// Create checkpoint for cross-file continuation
789    ///
790    /// Captures current processing state for seamless continuation:
791    /// - Incomplete bar (if any) with FIXED thresholds
792    /// - Position tracking (timestamp, trade_id if available)
793    /// - Price hash for verification
794    ///
795    /// # Arguments
796    ///
797    /// * `symbol` - Symbol being processed (e.g., "BTCUSDT", "EURUSD")
798    ///
799    /// # Example
800    ///
801    /// ```ignore
802    /// let bars = processor.process_agg_trade_records(&trades)?;
803    /// let checkpoint = processor.create_checkpoint("BTCUSDT");
804    /// let json = serde_json::to_string(&checkpoint)?;
805    /// std::fs::write("checkpoint.json", json)?;
806    /// ```
807    pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
808        let (incomplete_bar, thresholds) = match &self.current_bar_state {
809            Some(state) => (
810                Some(state.bar.clone()),
811                Some((state.upper_threshold, state.lower_threshold)),
812            ),
813            None => (None, None),
814        };
815
816        let mut checkpoint = Checkpoint::new(
817            symbol.to_string(),
818            self.threshold_decimal_bps,
819            incomplete_bar,
820            thresholds,
821            self.last_timestamp_us,
822            self.last_trade_id,
823            self.price_window.compute_hash(),
824            self.prevent_same_timestamp_close,
825        );
826        // Issue #46: Persist defer_open state for cross-session continuity
827        checkpoint.defer_open = self.defer_open;
828        checkpoint
829    }
830
831    /// Resume processing from checkpoint
832    ///
833    /// Restores incomplete bar state with IMMUTABLE thresholds.
834    /// Next trade continues building the bar until threshold breach.
835    ///
836    /// # Errors
837    ///
    /// - `CheckpointError::InvalidThreshold` - Checkpoint threshold is outside 1..=100,000 dbps
    /// - `CheckpointError::MissingThresholds` - Checkpoint has bar but no thresholds
839    ///
840    /// # Example
841    ///
842    /// ```ignore
843    /// let json = std::fs::read_to_string("checkpoint.json")?;
844    /// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
845    /// let mut processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
846    /// let bars = processor.process_agg_trade_records(&next_file_trades)?;
847    /// ```
848    pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
849        // Issue #85 Phase 2: Apply checkpoint schema migration if needed
850        let checkpoint = Self::migrate_checkpoint(checkpoint);
851
852        // Issue #62: Validate threshold range before restoring from checkpoint
853        // Valid range: 1-100,000 dbps (0.0001% to 10%)
854        const THRESHOLD_MIN: u32 = 1;
855        const THRESHOLD_MAX: u32 = 100_000;
856        if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
857            || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
858        {
859            return Err(CheckpointError::InvalidThreshold {
860                threshold: checkpoint.threshold_decimal_bps,
861                min_threshold: THRESHOLD_MIN,
862                max_threshold: THRESHOLD_MAX,
863            });
864        }
865
866        // Validate checkpoint consistency
867        if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
868            return Err(CheckpointError::MissingThresholds);
869        }
870
871        // Restore bar state if there's an incomplete bar
872        // Note: accumulated_trades is reset to empty - intra-bar features won't be
873        // accurate for bars resumed from checkpoint (partial trade history lost)
874        let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
875            (Some(bar), Some((upper, lower))) => Some(RangeBarState {
876                bar,
877                upper_threshold: upper,
878                lower_threshold: lower,
879                accumulated_trades: SmallVec::new(), // Lost on checkpoint - features may be partial
880                scratch_prices: SmallVec::new(),
881                scratch_volumes: SmallVec::new(),
882            }),
883            _ => None,
884        };
885
886        // Issue #96 Task #98: Pre-compute threshold ratio (same as with_options)
887        let threshold_ratio = ((checkpoint.threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
888            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
889
890        Ok(Self {
891            threshold_decimal_bps: checkpoint.threshold_decimal_bps,
892            threshold_ratio,
893            current_bar_state,
894            price_window: PriceWindow::new(), // Reset - will be rebuilt from new trades
895            last_trade_id: checkpoint.last_trade_id,
896            last_timestamp_us: checkpoint.last_timestamp_us,
897            anomaly_summary: checkpoint.anomaly_summary,
898            resumed_from_checkpoint: true, // Signal to continue from existing bar state
899            prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
900            defer_open: checkpoint.defer_open, // Issue #46: Restore deferred open state
901            trade_history: None,               // Issue #59: Must be re-enabled after restore
902            inter_bar_config: None,            // Issue #59: Must be re-enabled after restore
903            include_intra_bar_features: false, // Issue #59: Must be re-enabled after restore
904            max_gap_us: 3_600_000_000,         // Issue #112: 1 hour default
905        })
906    }
907
908    /// Migrate checkpoint between schema versions
909    /// Issue #85 Phase 2: Handle v1 → v2 migration
910    /// Safe: JSON deserialization is field-name-based, so old v1 checkpoints load correctly
911    fn migrate_checkpoint(mut checkpoint: Checkpoint) -> Checkpoint {
912        match checkpoint.version {
913            1 => {
914                // v1 → v2: RangeBar struct field reordering (no behavioral changes)
915                // JSON serialization is position-independent, so no transformation needed
916                checkpoint.version = 2;
917                checkpoint
918            }
919            2 => {
920                // Already current version
921                checkpoint
922            }
923            _ => {
924                // Unknown version - log warning and continue with best effort
925                eprintln!(
926                    "Warning: Checkpoint has unknown version {}, treating as v2",
927                    checkpoint.version
928                );
929                checkpoint.version = 2;
930                checkpoint
931            }
932        }
933    }
934
935    /// Verify we're at the right position in the data stream
936    ///
937    /// Call with first trade of new file to verify continuity.
938    /// Returns verification result indicating if there's a gap or exact match.
939    ///
940    /// # Arguments
941    ///
942    /// * `first_trade` - First trade of the new file/chunk
943    ///
944    /// # Example
945    ///
946    /// ```ignore
947    /// let processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
948    /// let verification = processor.verify_position(&next_file_trades[0]);
949    /// match verification {
950    ///     PositionVerification::Exact => println!("Perfect continuation!"),
951    ///     PositionVerification::Gap { missing_count, .. } => {
952    ///         println!("Warning: {} trades missing", missing_count);
953    ///     }
954    ///     PositionVerification::TimestampOnly { gap_ms } => {
955    ///         println!("Exness data: {}ms gap", gap_ms);
956    ///     }
957    /// }
958    /// ```
959    pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
960        match self.last_trade_id {
961            Some(last_id) => {
962                // Binance: has trade IDs - check for gaps
963                let expected_id = last_id + 1;
964                if first_trade.agg_trade_id == expected_id {
965                    PositionVerification::Exact
966                } else {
967                    let missing_count = first_trade.agg_trade_id - expected_id;
968                    PositionVerification::Gap {
969                        expected_id,
970                        actual_id: first_trade.agg_trade_id,
971                        missing_count,
972                    }
973                }
974            }
975            None => {
976                // Exness: no trade IDs - use timestamp only
977                let gap_us = first_trade.timestamp - self.last_timestamp_us;
978                let gap_ms = gap_us / 1000;
979                PositionVerification::TimestampOnly { gap_ms }
980            }
981        }
982    }
983
984    /// Get the current anomaly summary
985    pub fn anomaly_summary(&self) -> &AnomalySummary {
986        &self.anomaly_summary
987    }
988
989    /// Get the threshold in decimal basis points
990    pub fn threshold_decimal_bps(&self) -> u32 {
991        self.threshold_decimal_bps
992    }
993
994    /// Validate that trades are properly sorted for deterministic processing
995    ///
996    /// Issue #96 Task #62: Early-exit optimization for sorted data
997    /// For typical workloads (95%+ sorted), quick first/last check identifies
998    /// unsorted batches immediately without full O(n) validation.
999    fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
1000        if trades.is_empty() {
1001            return Ok(());
1002        }
1003
1004        // Issue #96 Task #62: Fast-path check for obviously unsorted data
1005        // If first and last trades are not ordered, data is definitely unsorted
1006        // This early-exit catches common failures without full validation
1007        let first = &trades[0];
1008        let last = &trades[trades.len() - 1];
1009
1010        if last.timestamp < first.timestamp
1011            || (last.timestamp == first.timestamp && last.agg_trade_id <= first.agg_trade_id)
1012        {
1013            // Definitely unsorted - find exact error location (cold path)
1014            return find_unsorted_trade(trades);
1015        }
1016
1017        // Full validation for typical sorted case
1018        for i in 1..trades.len() {
1019            let prev = &trades[i - 1];
1020            let curr = &trades[i];
1021
1022            // Check ordering: (timestamp, agg_trade_id) ascending
1023            if curr.timestamp < prev.timestamp
1024                || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
1025            {
1026                return unsorted_trade_error(i, prev, curr);
1027            }
1028        }
1029
1030        Ok(())
1031    }
1032
1033    /// Reset processor state at an ouroboros boundary (year/month/week).
1034    ///
1035    /// Clears the incomplete bar and position tracking while preserving
1036    /// the threshold configuration. Use this when starting fresh at a
1037    /// known boundary for reproducibility.
1038    ///
1039    /// # Returns
1040    ///
1041    /// The orphaned incomplete bar (if any) so caller can decide
1042    /// whether to include it in results with `is_orphan=True` flag.
1043    ///
1044    /// # Example
1045    ///
1046    /// ```ignore
1047    /// // At year boundary (Jan 1 00:00:00 UTC)
1048    /// let orphaned = processor.reset_at_ouroboros();
1049    /// if let Some(bar) = orphaned {
1050    ///     // Handle incomplete bar from previous year
1051    /// }
1052    /// // Continue processing new year's data with clean state
1053    /// ```
1054    pub fn reset_at_ouroboros(&mut self) -> Option<RangeBar> {
1055        let orphaned = self.current_bar_state.take().map(|state| state.bar);
1056        self.price_window = PriceWindow::new();
1057        self.last_trade_id = None;
1058        self.last_timestamp_us = 0;
1059        self.resumed_from_checkpoint = false;
1060        self.defer_open = false;
1061        // Issue #81: Clear bar boundary tracking at ouroboros reset.
1062        // Trades are preserved — still valid lookback for first bar of new segment.
1063        if let Some(ref mut history) = self.trade_history {
1064            history.reset_bar_boundaries();
1065        }
1066        orphaned
1067    }
1068}
1069
1070/// Internal state for a range bar being built
1071#[derive(Clone)]
1072struct RangeBarState {
1073    /// The range bar being constructed
1074    pub bar: RangeBar,
1075
1076    /// Upper breach threshold (FIXED from bar open)
1077    pub upper_threshold: FixedPoint,
1078
1079    /// Lower breach threshold (FIXED from bar open)
1080    pub lower_threshold: FixedPoint,
1081
1082    /// Accumulated trades for intra-bar feature computation (Issue #59)
1083    ///
1084    /// When intra-bar features are enabled, trades are accumulated here
1085    /// during bar construction and used to compute features at bar close.
1086    /// Cleared when bar closes to free memory.
1087    /// Issue #136: Optimized from 512 to 64 slots (saves 87.5% stack memory).
1088    /// Profile data: max trades/bar = 26 (P99 = 14), so 64 slots provides
1089    /// 2.5x safety margin while reducing from 32KB to 4KB per bar.
1090    pub accumulated_trades: SmallVec<[AggTrade; 64]>,
1091
1092    /// Issue #96: Scratch buffer for intra-bar price extraction
1093    /// SmallVec<[f64; 64]> keeps 95%+ of bars on stack (P99 trades/bar = 14, max = 26)
1094    /// Eliminates heap allocation for typical bars, spills transparently for large ones
1095    pub scratch_prices: SmallVec<[f64; 64]>,
1096
1097    /// Issue #96: Scratch buffer for intra-bar volume extraction
1098    /// Same sizing rationale as scratch_prices
1099    pub scratch_volumes: SmallVec<[f64; 64]>,
1100}
1101
1102impl RangeBarState {
1103    /// Create new range bar state from opening trade
1104    /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1105    #[inline]
1106    fn new(trade: &AggTrade, threshold_ratio: i64) -> Self {
1107        let bar = RangeBar::new(trade);
1108
1109        // Issue #96 Task #98: Use cached ratio instead of repeated division
1110        // This avoids BASIS_POINTS_SCALE division on every bar creation
1111        let (upper_threshold, lower_threshold) =
1112            bar.open.compute_range_thresholds_cached(threshold_ratio);
1113
1114        Self {
1115            bar,
1116            upper_threshold,
1117            lower_threshold,
1118            accumulated_trades: SmallVec::new(),
1119            scratch_prices: SmallVec::new(),
1120            scratch_volumes: SmallVec::new(),
1121        }
1122    }
1123
1124    /// Create new range bar state with intra-bar feature accumulation
1125    /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1126    #[inline]
1127    fn new_with_trade_accumulation(trade: &AggTrade, threshold_ratio: i64) -> Self {
1128        let bar = RangeBar::new(trade);
1129
1130        // Issue #96 Task #98: Use cached ratio instead of repeated division
1131        // This avoids BASIS_POINTS_SCALE division on every bar creation
1132        let (upper_threshold, lower_threshold) =
1133            bar.open.compute_range_thresholds_cached(threshold_ratio);
1134
1135        Self {
1136            bar,
1137            upper_threshold,
1138            lower_threshold,
1139            accumulated_trades: {
1140                let mut sv = SmallVec::new();
1141                sv.push(trade.clone());
1142                sv
1143            },
1144            scratch_prices: SmallVec::new(),
1145            scratch_volumes: SmallVec::new(),
1146        }
1147    }
1148
1149    /// Accumulate a trade for intra-bar feature computation
1150    ///
1151    /// Issue #96 Task #44: Only accumulates if intra-bar features are enabled,
1152    /// avoiding unnecessary clones for the majority of use cases where they're disabled.
1153    /// Issue #96 Task #79: #[inline] allows compiler to fold invariant branch
1154    /// (include_intra is constant for processor lifetime)
1155    #[inline]
1156    fn accumulate_trade(&mut self, trade: &AggTrade, include_intra: bool) {
1157        if include_intra {
1158            self.accumulated_trades.push(trade.clone());
1159        }
1160    }
1161}
1162
1163#[cfg(test)]
1164mod tests {
1165    use super::*;
1166    use crate::test_utils::{self, scenarios};
1167
1168    #[test]
1169    fn test_single_bar_no_breach() {
1170        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
1171
1172        // Create trades that stay within 250 dbps threshold
1173        let trades = scenarios::no_breach_sequence(250);
1174
1175        // Test strict algorithm compliance: no bars should be created without breach
1176        let bars = processor.process_agg_trade_records(&trades).unwrap();
1177        assert_eq!(
1178            bars.len(),
1179            0,
1180            "Strict algorithm should not create bars without breach"
1181        );
1182
1183        // Test analysis mode: incomplete bar should be available for analysis
1184        let bars_with_incomplete = processor
1185            .process_agg_trade_records_with_incomplete(&trades)
1186            .unwrap();
1187        assert_eq!(
1188            bars_with_incomplete.len(),
1189            1,
1190            "Analysis mode should include incomplete bar"
1191        );
1192
1193        let bar = &bars_with_incomplete[0];
1194        assert_eq!(bar.open.to_string(), "50000.00000000");
1195        assert_eq!(bar.high.to_string(), "50100.00000000");
1196        assert_eq!(bar.low.to_string(), "49900.00000000");
1197        assert_eq!(bar.close.to_string(), "49900.00000000");
1198    }
1199
1200    #[test]
1201    fn test_exact_breach_upward() {
1202        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
1203
1204        let trades = scenarios::exact_breach_upward(250);
1205
1206        // Test strict algorithm: only completed bars (with breach)
1207        let bars = processor.process_agg_trade_records(&trades).unwrap();
1208        assert_eq!(
1209            bars.len(),
1210            1,
1211            "Strict algorithm should only return completed bars"
1212        );
1213
1214        // First bar should close at breach
1215        let bar1 = &bars[0];
1216        assert_eq!(bar1.open.to_string(), "50000.00000000");
1217        // Breach at 250 dbps = 0.25% = 50000 * 1.0025 = 50125
1218        assert_eq!(bar1.close.to_string(), "50125.00000000"); // Breach tick included
1219        assert_eq!(bar1.high.to_string(), "50125.00000000");
1220        assert_eq!(bar1.low.to_string(), "50000.00000000");
1221
1222        // Test analysis mode: includes incomplete second bar
1223        let bars_with_incomplete = processor
1224            .process_agg_trade_records_with_incomplete(&trades)
1225            .unwrap();
1226        assert_eq!(
1227            bars_with_incomplete.len(),
1228            2,
1229            "Analysis mode should include incomplete bars"
1230        );
1231
1232        // Second bar should start at next tick price (not breach price)
1233        let bar2 = &bars_with_incomplete[1];
1234        assert_eq!(bar2.open.to_string(), "50500.00000000"); // Next tick after breach
1235        assert_eq!(bar2.close.to_string(), "50500.00000000");
1236    }
1237
1238    #[test]
1239    fn test_exact_breach_downward() {
1240        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1241
1242        let trades = scenarios::exact_breach_downward(250);
1243
1244        let bars = processor.process_agg_trade_records(&trades).unwrap();
1245
1246        assert_eq!(bars.len(), 1);
1247
1248        let bar = &bars[0];
1249        assert_eq!(bar.open.to_string(), "50000.00000000");
1250        assert_eq!(bar.close.to_string(), "49875.00000000"); // Breach tick included
1251        assert_eq!(bar.high.to_string(), "50000.00000000");
1252        assert_eq!(bar.low.to_string(), "49875.00000000");
1253    }
1254
1255    #[test]
1256    fn test_large_gap_single_bar() {
1257        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1258
1259        let trades = scenarios::large_gap_sequence();
1260
1261        let bars = processor.process_agg_trade_records(&trades).unwrap();
1262
1263        // Should create exactly ONE bar, not multiple bars to "fill the gap"
1264        assert_eq!(bars.len(), 1);
1265
1266        let bar = &bars[0];
1267        assert_eq!(bar.open.to_string(), "50000.00000000");
1268        assert_eq!(bar.close.to_string(), "51000.00000000");
1269        assert_eq!(bar.high.to_string(), "51000.00000000");
1270        assert_eq!(bar.low.to_string(), "50000.00000000");
1271    }
1272
1273    #[test]
1274    fn test_unsorted_trades_error() {
1275        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1276
1277        let trades = scenarios::unsorted_sequence();
1278
1279        let result = processor.process_agg_trade_records(&trades);
1280        assert!(result.is_err());
1281
1282        match result {
1283            Err(ProcessingError::UnsortedTrades { index, .. }) => {
1284                assert_eq!(index, 1);
1285            }
1286            _ => panic!("Expected UnsortedTrades error"),
1287        }
1288    }
1289
1290    #[test]
1291    fn test_threshold_calculation() {
1292        let processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1293
1294        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1295        let bar_state = RangeBarState::new(&trade, processor.threshold_ratio);
1296
1297        // 50000 * 0.0025 = 125 (25bps = 0.25%)
1298        assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
1299        assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
1300    }
1301
1302    #[test]
1303    fn test_empty_trades() {
1304        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1305        let trades = scenarios::empty_sequence();
1306        let bars = processor.process_agg_trade_records(&trades).unwrap();
1307        assert_eq!(bars.len(), 0);
1308    }
1309
1310    #[test]
1311    fn test_debug_streaming_data() {
1312        let mut processor = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1313
1314        // Create trades similar to our test data
1315        let trades = vec![
1316            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1317            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1318            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1319        ];
1320
1321        println!("Test data prices: 50014 -> 50163 -> 50032");
1322        println!("Expected price movements: +0.3% then -0.26%");
1323
1324        let bars = processor.process_agg_trade_records(&trades).unwrap();
1325        println!("Generated {} range bars", bars.len());
1326
1327        for (i, bar) in bars.iter().enumerate() {
1328            println!(
1329                "  Bar {}: O={} H={} L={} C={}",
1330                i + 1,
1331                bar.open,
1332                bar.high,
1333                bar.low,
1334                bar.close
1335            );
1336        }
1337
1338        // With a 0.1% threshold and 0.3% price movement, we should get at least 1 bar
1339        assert!(
1340            !bars.is_empty(),
1341            "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
1342        );
1343    }
1344
1345    #[test]
1346    fn test_threshold_validation() {
1347        // Valid threshold
1348        assert!(RangeBarProcessor::new(250).is_ok());
1349
1350        // Invalid: too low (0 × 0.1bps = 0%)
1351        assert!(matches!(
1352            RangeBarProcessor::new(0),
1353            Err(ProcessingError::InvalidThreshold {
1354                threshold_decimal_bps: 0
1355            })
1356        ));
1357
1358        // Invalid: too high (150,000 × 0.1bps = 15,000bps = 150%)
1359        assert!(matches!(
1360            RangeBarProcessor::new(150_000),
1361            Err(ProcessingError::InvalidThreshold {
1362                threshold_decimal_bps: 150_000
1363            })
1364        ));
1365
1366        // Valid boundary: minimum (1 × 0.1bps = 0.1bps = 0.001%)
1367        assert!(RangeBarProcessor::new(1).is_ok());
1368
1369        // Valid boundary: maximum (100,000 × 0.1bps = 10,000bps = 100%)
1370        assert!(RangeBarProcessor::new(100_000).is_ok());
1371    }
1372
1373    #[test]
1374    fn test_export_processor_with_manual_trades() {
1375        println!("Testing ExportRangeBarProcessor with same trade data...");
1376
1377        let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1378
1379        // Use same trades as the working basic test
1380        let trades = vec![
1381            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1382            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1383            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1384        ];
1385
1386        println!(
1387            "Processing {} trades with ExportRangeBarProcessor...",
1388            trades.len()
1389        );
1390
1391        export_processor.process_trades_continuously(&trades);
1392        let bars = export_processor.get_all_completed_bars();
1393
1394        println!(
1395            "ExportRangeBarProcessor generated {} range bars",
1396            bars.len()
1397        );
1398        for (i, bar) in bars.iter().enumerate() {
1399            println!(
1400                "  Bar {}: O={} H={} L={} C={}",
1401                i + 1,
1402                bar.open,
1403                bar.high,
1404                bar.low,
1405                bar.close
1406            );
1407        }
1408
1409        // Should match the basic processor results (1 bar)
1410        assert!(
1411            !bars.is_empty(),
1412            "ExportRangeBarProcessor should generate same results as basic processor"
1413        );
1414    }
1415
1416    // === CHECKPOINT TESTS (Issues #2 and #3) ===
1417
1418    #[test]
1419    fn test_checkpoint_creation() {
1420        let mut processor = RangeBarProcessor::new(250).unwrap();
1421
1422        // Process some trades that don't complete a bar
1423        let trades = scenarios::no_breach_sequence(250);
1424        let _bars = processor.process_agg_trade_records(&trades).unwrap();
1425
1426        // Create checkpoint
1427        let checkpoint = processor.create_checkpoint("BTCUSDT");
1428
1429        assert_eq!(checkpoint.symbol, "BTCUSDT");
1430        assert_eq!(checkpoint.threshold_decimal_bps, 250);
1431        assert!(checkpoint.has_incomplete_bar()); // Should have incomplete bar
1432        assert!(checkpoint.thresholds.is_some()); // Thresholds should be saved
1433        assert!(checkpoint.last_trade_id.is_some()); // Should track last trade
1434    }
1435
1436    #[test]
1437    fn test_checkpoint_serialization_roundtrip() {
1438        let mut processor = RangeBarProcessor::new(250).unwrap();
1439
1440        // Process trades
1441        let trades = scenarios::no_breach_sequence(250);
1442        let _bars = processor.process_agg_trade_records(&trades).unwrap();
1443
1444        // Create checkpoint
1445        let checkpoint = processor.create_checkpoint("BTCUSDT");
1446
1447        // Serialize to JSON
1448        let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
1449
1450        // Deserialize back
1451        let restored: Checkpoint =
1452            serde_json::from_str(&json).expect("Deserialization should succeed");
1453
1454        assert_eq!(restored.symbol, checkpoint.symbol);
1455        assert_eq!(
1456            restored.threshold_decimal_bps,
1457            checkpoint.threshold_decimal_bps
1458        );
1459        assert_eq!(
1460            restored.incomplete_bar.is_some(),
1461            checkpoint.incomplete_bar.is_some()
1462        );
1463    }
1464
1465    #[test]
1466    fn test_cross_file_bar_continuation() {
1467        // This is the PRIMARY test for Issues #2 and #3
1468        // Verifies that incomplete bars continue correctly across file boundaries
1469
1470        // Create trades that span multiple bars
1471        let mut all_trades = Vec::new();
1472
1473        // Generate enough trades to produce multiple bars
1474        // Using 100bps threshold (1%) for clearer price movements
1475        let base_timestamp = 1640995200000000i64; // Microseconds
1476
1477        // Create a sequence where we'll have ~3-4 completed bars with remainder
1478        for i in 0..20 {
1479            let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
1480            let trade = test_utils::create_test_agg_trade(
1481                i + 1,
1482                &format!("{:.8}", price),
1483                "1.0",
1484                base_timestamp + (i * 1000000),
1485            );
1486            all_trades.push(trade);
1487        }
1488
1489        // === FULL PROCESSING (baseline) ===
1490        let mut processor_full = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1491        let bars_full = processor_full
1492            .process_agg_trade_records(&all_trades)
1493            .unwrap();
1494
1495        // === SPLIT PROCESSING WITH CHECKPOINT ===
1496        let split_point = 10; // Split in the middle
1497
1498        // Part 1: Process first half
1499        let mut processor_1 = RangeBarProcessor::new(100).unwrap();
1500        let part1_trades = &all_trades[0..split_point];
1501        let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
1502
1503        // Create checkpoint
1504        let checkpoint = processor_1.create_checkpoint("TEST");
1505
1506        // Part 2: Resume from checkpoint and process second half
1507        let mut processor_2 = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
1508        let part2_trades = &all_trades[split_point..];
1509        let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
1510
1511        // === VERIFY CONTINUATION ===
1512        // Total completed bars should match full processing
1513        let split_total = bars_1.len() + bars_2.len();
1514
1515        println!("Full processing: {} bars", bars_full.len());
1516        println!(
1517            "Split processing: {} + {} = {} bars",
1518            bars_1.len(),
1519            bars_2.len(),
1520            split_total
1521        );
1522
1523        assert_eq!(
1524            split_total,
1525            bars_full.len(),
1526            "Split processing should produce same bar count as full processing"
1527        );
1528
1529        // Verify the bars themselves match
1530        let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
1531        for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
1532            assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
1533            assert_eq!(
1534                full.close.0, split.close.0,
1535                "Bar {} close price mismatch",
1536                i
1537            );
1538        }
1539    }
1540
1541    #[test]
1542    fn test_verify_position_exact() {
1543        let mut processor = RangeBarProcessor::new(250).unwrap();
1544
1545        // Process some trades
1546        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1547        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1548
1549        let _ = processor.process_single_trade(&trade1);
1550        let _ = processor.process_single_trade(&trade2);
1551
1552        // Create next trade in sequence
1553        let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
1554
1555        // Verify position
1556        let verification = processor.verify_position(&next_trade);
1557
1558        assert_eq!(verification, PositionVerification::Exact);
1559    }
1560
1561    #[test]
1562    fn test_verify_position_gap() {
1563        let mut processor = RangeBarProcessor::new(250).unwrap();
1564
1565        // Process some trades
1566        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1567        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1568
1569        let _ = processor.process_single_trade(&trade1);
1570        let _ = processor.process_single_trade(&trade2);
1571
1572        // Create next trade with gap (skip IDs 102-104)
1573        let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
1574
1575        // Verify position
1576        let verification = processor.verify_position(&next_trade);
1577
1578        match verification {
1579            PositionVerification::Gap {
1580                expected_id,
1581                actual_id,
1582                missing_count,
1583            } => {
1584                assert_eq!(expected_id, 102);
1585                assert_eq!(actual_id, 105);
1586                assert_eq!(missing_count, 3);
1587            }
1588            _ => panic!("Expected Gap verification, got {:?}", verification),
1589        }
1590    }
1591
1592    // Issue #96 Task #97: Test TimestampOnly branch (Exness path, no trade IDs)
1593    #[test]
1594    fn test_verify_position_timestamp_only() {
1595        // Fresh processor has last_trade_id = None (simulates Exness path)
1596        let processor = RangeBarProcessor::new(250).unwrap();
1597
1598        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 5000000);
1599        let verification = processor.verify_position(&trade);
1600
1601        // last_trade_id is None → TimestampOnly branch
1602        // gap_ms = (5000000 - 0) / 1000 = 5000
1603        match verification {
1604            PositionVerification::TimestampOnly { gap_ms } => {
1605                assert_eq!(gap_ms, 5000, "gap_ms should be (timestamp - 0) / 1000");
1606            }
1607            _ => panic!("Expected TimestampOnly verification, got {:?}", verification),
1608        }
1609    }
1610
1611    #[test]
1612    fn test_checkpoint_clean_completion() {
1613        // Test when last trade completes a bar with no remainder
1614        // In range bar algorithm: breach trade closes bar, NEXT trade opens new bar
1615        // If there's no next trade, there's no incomplete bar
1616        let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
1617
1618        // Create trades that complete exactly one bar
1619        let trades = vec![
1620            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1621            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // ~0.2% move, breaches 0.1%
1622        ];
1623
1624        let bars = processor.process_agg_trade_records(&trades).unwrap();
1625        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1626
1627        // Create checkpoint - should NOT have incomplete bar
1628        // (breach trade closes bar, no next trade to open new bar)
1629        let checkpoint = processor.create_checkpoint("TEST");
1630
1631        // With defer_open logic, the next bar isn't started until the next trade
1632        assert!(
1633            !checkpoint.has_incomplete_bar(),
1634            "No incomplete bar when last trade was a breach with no following trade"
1635        );
1636    }
1637
1638    #[test]
1639    fn test_checkpoint_with_remainder() {
1640        // Test when we have trades remaining after a completed bar
1641        let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
1642
1643        // Create trades: bar completes at trade 2, trade 3 starts new bar
1644        let trades = vec![
1645            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1646            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // Breach
1647            test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000), // Opens new bar
1648        ];
1649
1650        let bars = processor.process_agg_trade_records(&trades).unwrap();
1651        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1652
1653        // Create checkpoint - should have incomplete bar from trade 3
1654        let checkpoint = processor.create_checkpoint("TEST");
1655
1656        assert!(
1657            checkpoint.has_incomplete_bar(),
1658            "Should have incomplete bar from trade 3"
1659        );
1660
1661        // Verify the incomplete bar has correct data
1662        let incomplete = checkpoint.incomplete_bar.unwrap();
1663        assert_eq!(
1664            incomplete.open.to_string(),
1665            "50110.00000000",
1666            "Incomplete bar should open at trade 3 price"
1667        );
1668    }
1669
1670    /// Issue #46: Verify streaming and batch paths produce identical bars
1671    ///
1672    /// The batch path (`process_agg_trade_records`) and streaming path
1673    /// (`process_single_trade`) must produce identical OHLCV output for
1674    /// the same input trades. This test catches regressions where the
1675    /// breaching trade is double-counted or bar boundaries differ.
1676    #[test]
1677    fn test_streaming_batch_parity() {
1678        let threshold = 250; // 250 dbps = 0.25%
1679
1680        // Build a sequence with multiple breaches
1681        let trades = test_utils::AggTradeBuilder::new()
1682            .add_trade(1, 1.0, 0) // Open first bar at 50000
1683            .add_trade(2, 1.001, 1000) // +0.1% - accumulate
1684            .add_trade(3, 1.003, 2000) // +0.3% - breach (>0.25%)
1685            .add_trade(4, 1.004, 3000) // Opens second bar
1686            .add_trade(5, 1.005, 4000) // Accumulate
1687            .add_trade(6, 1.008, 5000) // +0.4% from bar 2 open - breach
1688            .add_trade(7, 1.009, 6000) // Opens third bar
1689            .build();
1690
1691        // === BATCH PATH ===
1692        let mut batch_processor = RangeBarProcessor::new(threshold).unwrap();
1693        let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
1694        let batch_incomplete = batch_processor.get_incomplete_bar();
1695
1696        // === STREAMING PATH ===
1697        let mut stream_processor = RangeBarProcessor::new(threshold).unwrap();
1698        let mut stream_bars: Vec<RangeBar> = Vec::new();
1699        for trade in &trades {
1700            if let Some(bar) = stream_processor
1701                .process_single_trade(trade)
1702                .unwrap()
1703            {
1704                stream_bars.push(bar);
1705            }
1706        }
1707        let stream_incomplete = stream_processor.get_incomplete_bar();
1708
1709        // === VERIFY PARITY ===
1710        assert_eq!(
1711            batch_bars.len(),
1712            stream_bars.len(),
1713            "Batch and streaming should produce same number of completed bars"
1714        );
1715
1716        for (i, (batch_bar, stream_bar)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1717            assert_eq!(
1718                batch_bar.open, stream_bar.open,
1719                "Bar {i}: open price mismatch"
1720            );
1721            assert_eq!(
1722                batch_bar.close, stream_bar.close,
1723                "Bar {i}: close price mismatch"
1724            );
1725            assert_eq!(
1726                batch_bar.high, stream_bar.high,
1727                "Bar {i}: high price mismatch"
1728            );
1729            assert_eq!(batch_bar.low, stream_bar.low, "Bar {i}: low price mismatch");
1730            assert_eq!(
1731                batch_bar.volume, stream_bar.volume,
1732                "Bar {i}: volume mismatch (double-counting?)"
1733            );
1734            assert_eq!(
1735                batch_bar.open_time, stream_bar.open_time,
1736                "Bar {i}: open_time mismatch"
1737            );
1738            assert_eq!(
1739                batch_bar.close_time, stream_bar.close_time,
1740                "Bar {i}: close_time mismatch"
1741            );
1742            assert_eq!(
1743                batch_bar.individual_trade_count, stream_bar.individual_trade_count,
1744                "Bar {i}: trade count mismatch"
1745            );
1746        }
1747
1748        // Verify incomplete bars match
1749        match (batch_incomplete, stream_incomplete) {
1750            (Some(b), Some(s)) => {
1751                assert_eq!(b.open, s.open, "Incomplete bar: open mismatch");
1752                assert_eq!(b.close, s.close, "Incomplete bar: close mismatch");
1753                assert_eq!(b.volume, s.volume, "Incomplete bar: volume mismatch");
1754            }
1755            (None, None) => {} // Both finished cleanly
1756            _ => panic!("Incomplete bar presence mismatch between batch and streaming"),
1757        }
1758    }
1759
1760    /// Issue #96: Proptest-enhanced batch vs streaming parity
1761    ///
1762    /// Generates random trade sequences (200-500 trades) with realistic price
1763    /// movements and verifies bit-exact parity between batch and streaming paths
1764    /// on ALL bar fields including microstructure features.
1765    mod proptest_batch_streaming_parity {
1766        use super::*;
1767        use proptest::prelude::*;
1768
1769        /// Generate a realistic trade sequence with random price movements
1770        fn trade_sequence(
1771            n: usize,
1772            base_price: f64,
1773            volatility: f64,
1774        ) -> Vec<AggTrade> {
1775            let mut trades = Vec::with_capacity(n);
1776            let mut price = base_price;
1777            let base_ts = 1640995200000i64; // 2022-01-01
1778
1779            for i in 0..n {
1780                // Deterministic price walk using sin/cos (proptest handles seed)
1781                let step = ((i as f64 * 0.3).sin() * volatility)
1782                    + ((i as f64 * 0.07).cos() * volatility * 0.5);
1783                price += step;
1784                // Clamp to avoid negative prices
1785                if price < 100.0 {
1786                    price = 100.0 + (i as f64 * 0.01).sin().abs() * 50.0;
1787                }
1788
1789                let trade = test_utils::create_test_agg_trade_with_range(
1790                    i as i64 + 1,
1791                    &format!("{:.8}", price),
1792                    "1.50000000",
1793                    base_ts + (i as i64 * 500), // 500ms apart
1794                    (i as i64 + 1) * 10,
1795                    (i as i64 + 1) * 10,
1796                    i % 3 != 0, // Mix of buy/sell sides
1797                );
1798                trades.push(trade);
1799            }
1800            trades
1801        }
1802
1803        /// Assert two bars are bit-exact on all OHLCV and microstructure fields
1804        fn assert_bar_parity(i: usize, batch: &RangeBar, stream: &RangeBar) {
1805            // Tier 1: OHLCV core
1806            assert_eq!(batch.open_time, stream.open_time, "Bar {i}: open_time");
1807            assert_eq!(batch.close_time, stream.close_time, "Bar {i}: close_time");
1808            assert_eq!(batch.open, stream.open, "Bar {i}: open");
1809            assert_eq!(batch.high, stream.high, "Bar {i}: high");
1810            assert_eq!(batch.low, stream.low, "Bar {i}: low");
1811            assert_eq!(batch.close, stream.close, "Bar {i}: close");
1812
1813            // Tier 2: Volume accumulators
1814            assert_eq!(batch.volume, stream.volume, "Bar {i}: volume");
1815            assert_eq!(batch.turnover, stream.turnover, "Bar {i}: turnover");
1816            assert_eq!(batch.buy_volume, stream.buy_volume, "Bar {i}: buy_volume");
1817            assert_eq!(batch.sell_volume, stream.sell_volume, "Bar {i}: sell_volume");
1818            assert_eq!(batch.buy_turnover, stream.buy_turnover, "Bar {i}: buy_turnover");
1819            assert_eq!(batch.sell_turnover, stream.sell_turnover, "Bar {i}: sell_turnover");
1820
1821            // Tier 3: Trade tracking
1822            assert_eq!(batch.individual_trade_count, stream.individual_trade_count, "Bar {i}: trade_count");
1823            assert_eq!(batch.agg_record_count, stream.agg_record_count, "Bar {i}: agg_record_count");
1824            assert_eq!(batch.first_trade_id, stream.first_trade_id, "Bar {i}: first_trade_id");
1825            assert_eq!(batch.last_trade_id, stream.last_trade_id, "Bar {i}: last_trade_id");
1826            assert_eq!(batch.first_agg_trade_id, stream.first_agg_trade_id, "Bar {i}: first_agg_trade_id");
1827            assert_eq!(batch.last_agg_trade_id, stream.last_agg_trade_id, "Bar {i}: last_agg_trade_id");
1828            assert_eq!(batch.buy_trade_count, stream.buy_trade_count, "Bar {i}: buy_trade_count");
1829            assert_eq!(batch.sell_trade_count, stream.sell_trade_count, "Bar {i}: sell_trade_count");
1830
1831            // Tier 4: VWAP
1832            assert_eq!(batch.vwap, stream.vwap, "Bar {i}: vwap");
1833
1834            // Tier 5: Microstructure f64 features (bit-exact comparison)
1835            assert_eq!(batch.duration_us, stream.duration_us, "Bar {i}: duration_us");
1836            assert_eq!(batch.ofi.to_bits(), stream.ofi.to_bits(), "Bar {i}: ofi");
1837            assert_eq!(batch.vwap_close_deviation.to_bits(), stream.vwap_close_deviation.to_bits(), "Bar {i}: vwap_close_dev");
1838            assert_eq!(batch.price_impact.to_bits(), stream.price_impact.to_bits(), "Bar {i}: price_impact");
1839            assert_eq!(batch.kyle_lambda_proxy.to_bits(), stream.kyle_lambda_proxy.to_bits(), "Bar {i}: kyle_lambda");
1840            assert_eq!(batch.trade_intensity.to_bits(), stream.trade_intensity.to_bits(), "Bar {i}: trade_intensity");
1841            assert_eq!(batch.volume_per_trade.to_bits(), stream.volume_per_trade.to_bits(), "Bar {i}: vol_per_trade");
1842            assert_eq!(batch.aggression_ratio.to_bits(), stream.aggression_ratio.to_bits(), "Bar {i}: aggression_ratio");
1843            assert_eq!(batch.aggregation_density_f64.to_bits(), stream.aggregation_density_f64.to_bits(), "Bar {i}: agg_density");
1844            assert_eq!(batch.turnover_imbalance.to_bits(), stream.turnover_imbalance.to_bits(), "Bar {i}: turnover_imbalance");
1845        }
1846
1847        proptest! {
1848            /// 200-500 random trades, 250 dbps threshold
1849            #[test]
1850            fn batch_streaming_parity_random(
1851                n in 200usize..500,
1852                volatility in 10.0f64..200.0,
1853            ) {
1854                let trades = trade_sequence(n, 50000.0, volatility);
1855
1856                // Batch path
1857                let mut batch_proc = RangeBarProcessor::new(250).unwrap();
1858                let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1859                let batch_incomplete = batch_proc.get_incomplete_bar();
1860
1861                // Streaming path
1862                let mut stream_proc = RangeBarProcessor::new(250).unwrap();
1863                let mut stream_bars: Vec<RangeBar> = Vec::new();
1864                for trade in &trades {
1865                    if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1866                        stream_bars.push(bar);
1867                    }
1868                }
1869                let stream_incomplete = stream_proc.get_incomplete_bar();
1870
1871                // Bar count parity
1872                prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1873                    "Completed bar count mismatch: batch={}, stream={} for n={}, vol={}",
1874                    batch_bars.len(), stream_bars.len(), n, volatility);
1875
1876                // Field-level parity for all completed bars
1877                for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1878                    assert_bar_parity(i, b, s);
1879                }
1880
1881                // Incomplete bar parity
1882                match (&batch_incomplete, &stream_incomplete) {
1883                    (Some(b), Some(s)) => assert_bar_parity(batch_bars.len(), b, s),
1884                    (None, None) => {}
1885                    _ => prop_assert!(false,
1886                        "Incomplete bar presence mismatch: batch={}, stream={}",
1887                        batch_incomplete.is_some(), stream_incomplete.is_some()),
1888                }
1889            }
1890
1891            /// Vary threshold: 100-1000 dbps
1892            #[test]
1893            fn batch_streaming_parity_thresholds(
1894                threshold in 100u32..1000,
1895            ) {
1896                let trades = trade_sequence(300, 50000.0, 80.0);
1897
1898                let mut batch_proc = RangeBarProcessor::new(threshold).unwrap();
1899                let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1900
1901                let mut stream_proc = RangeBarProcessor::new(threshold).unwrap();
1902                let mut stream_bars: Vec<RangeBar> = Vec::new();
1903                for trade in &trades {
1904                    if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1905                        stream_bars.push(bar);
1906                    }
1907                }
1908
1909                prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1910                    "Bar count mismatch at threshold={}", threshold);
1911
1912                for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1913                    assert_bar_parity(i, b, s);
1914                }
1915            }
1916        }
1917    }
1918
1919    /// Issue #46: After breach, next trade opens new bar (not breaching trade)
1920    #[test]
1921    fn test_defer_open_new_bar_opens_with_next_trade() {
1922        let mut processor = RangeBarProcessor::new(250).unwrap();
1923
1924        // Trade 1: Opens bar at 50000
1925        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1926        assert!(processor.process_single_trade(&t1).unwrap().is_none());
1927
1928        // Trade 2: Breaches threshold (+0.3%)
1929        let t2 = test_utils::create_test_agg_trade(2, "50150.0", "2.0", 2000);
1930        let bar = processor.process_single_trade(&t2).unwrap();
1931        assert!(bar.is_some(), "Should close bar on breach");
1932
1933        let closed_bar = bar.unwrap();
1934        assert_eq!(closed_bar.open.to_string(), "50000.00000000");
1935        assert_eq!(closed_bar.close.to_string(), "50150.00000000");
1936
1937        // After breach, no incomplete bar should exist
1938        assert!(
1939            processor.get_incomplete_bar().is_none(),
1940            "No incomplete bar after breach - defer_open is true"
1941        );
1942
1943        // Trade 3: Should open NEW bar (not the breaching trade)
1944        let t3 = test_utils::create_test_agg_trade(3, "50100.0", "3.0", 3000);
1945        assert!(processor.process_single_trade(&t3).unwrap().is_none());
1946
1947        let incomplete = processor.get_incomplete_bar().unwrap();
1948        assert_eq!(
1949            incomplete.open.to_string(),
1950            "50100.00000000",
1951            "New bar should open at trade 3's price, not trade 2's"
1952        );
1953    }
1954
1955    // === Memory efficiency tests (R1/R2/R3) ===
1956
1957    #[test]
1958    fn test_bar_close_take_single_trade() {
1959        // R1: Verify bar close via single-trade path produces correct OHLCV after
1960        // clone→take optimization. Uses single_breach_sequence that triggers breach.
1961        let mut processor = RangeBarProcessor::new(250).unwrap();
1962        let trades = scenarios::single_breach_sequence(250);
1963
1964        for trade in &trades[..trades.len() - 1] {
1965            let result = processor.process_single_trade(trade).unwrap();
1966            assert!(result.is_none());
1967        }
1968
1969        // Last trade triggers breach
1970        let bar = processor
1971            .process_single_trade(trades.last().unwrap())
1972            .unwrap()
1973            .expect("Should produce completed bar");
1974
1975        // Verify OHLCV integrity after take() optimization
1976        assert_eq!(bar.open.to_string(), "50000.00000000");
1977        assert!(bar.high >= bar.open.max(bar.close));
1978        assert!(bar.low <= bar.open.min(bar.close));
1979        assert!(bar.volume > 0);
1980
1981        // Verify processor state is clean after bar close
1982        assert!(processor.get_incomplete_bar().is_none());
1983    }
1984
1985    #[test]
1986    fn test_bar_close_take_batch() {
1987        // R2: Verify batch path produces correct bars after clone→take optimization.
1988        // large_sequence generates enough trades to trigger multiple breaches.
1989        let mut processor = RangeBarProcessor::new(250).unwrap();
1990        let trades = scenarios::large_sequence(500);
1991
1992        let bars = processor.process_agg_trade_records(&trades).unwrap();
1993        assert!(
1994            !bars.is_empty(),
1995            "Should produce at least one completed bar"
1996        );
1997
1998        // Verify every bar has valid OHLCV invariants
1999        for bar in &bars {
2000            assert!(bar.high >= bar.open.max(bar.close));
2001            assert!(bar.low <= bar.open.min(bar.close));
2002            assert!(bar.volume > 0);
2003            assert!(bar.close_time >= bar.open_time);
2004        }
2005    }
2006
2007    #[test]
2008    fn test_checkpoint_conditional_clone() {
2009        // R3: Verify checkpoint state is preserved correctly with both
2010        // include_incomplete=true and include_incomplete=false.
2011        let trades = scenarios::no_breach_sequence(250);
2012
2013        // Test with include_incomplete=false (move, no clone)
2014        let mut processor1 = RangeBarProcessor::new(250).unwrap();
2015        let bars_without = processor1.process_agg_trade_records(&trades).unwrap();
2016        assert_eq!(bars_without.len(), 0);
2017        // Checkpoint should be preserved
2018        assert!(processor1.get_incomplete_bar().is_some());
2019
2020        // Test with include_incomplete=true (clone + consume)
2021        let mut processor2 = RangeBarProcessor::new(250).unwrap();
2022        let bars_with = processor2
2023            .process_agg_trade_records_with_incomplete(&trades)
2024            .unwrap();
2025        assert_eq!(bars_with.len(), 1);
2026        // Checkpoint should ALSO be preserved (cloned before consume)
2027        assert!(processor2.get_incomplete_bar().is_some());
2028
2029        // Both checkpoints should have identical bar content
2030        let cp1 = processor1.get_incomplete_bar().unwrap();
2031        let cp2 = processor2.get_incomplete_bar().unwrap();
2032        assert_eq!(cp1.open, cp2.open);
2033        assert_eq!(cp1.close, cp2.close);
2034        assert_eq!(cp1.high, cp2.high);
2035        assert_eq!(cp1.low, cp2.low);
2036    }
2037
2038    #[test]
2039    fn test_checkpoint_v1_to_v2_migration() {
2040        // Issue #85 Phase 2: Verify v1→v2 checkpoint migration
2041        // Simulate loading an old v1 checkpoint (without version field)
2042        let v1_json = r#"{
2043            "symbol": "BTCUSDT",
2044            "threshold_decimal_bps": 250,
2045            "incomplete_bar": null,
2046            "thresholds": null,
2047            "last_timestamp_us": 1640995200000000,
2048            "last_trade_id": 5000,
2049            "price_hash": 0,
2050            "anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0},
2051            "prevent_same_timestamp_close": true,
2052            "defer_open": false
2053        }"#;
2054
2055        // Deserialize old v1 checkpoint
2056        let checkpoint: Checkpoint = serde_json::from_str(v1_json).unwrap();
2057        assert_eq!(checkpoint.version, 1, "Old checkpoints should default to v1");
2058        assert_eq!(checkpoint.symbol, "BTCUSDT");
2059        assert_eq!(checkpoint.threshold_decimal_bps, 250);
2060
2061        // Restore processor from v1 checkpoint (triggers migration)
2062        let mut processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2063
2064        // Verify processor is ready to continue
2065        assert!(!processor.get_incomplete_bar().is_some(), "No incomplete bar before processing");
2066
2067        // Process some trades to verify migration worked
2068        let trades = scenarios::single_breach_sequence(250);
2069        let bars = processor.process_agg_trade_records(&trades).unwrap();
2070
2071        // Should produce a bar after migration
2072        assert!(!bars.is_empty(), "Should produce bars after v1→v2 migration");
2073        // Verify bar has valid OHLCV (symbol is in Checkpoint, not RangeBar)
2074        assert!(bars[0].volume > 0, "Bar should have volume after migration");
2075        assert!(bars[0].close_time >= bars[0].open_time, "Bar times should be valid");
2076
2077        // Create new checkpoint from processor after migration
2078        let new_checkpoint = processor.create_checkpoint("BTCUSDT");
2079        assert_eq!(new_checkpoint.version, 2, "New checkpoints should be v2");
2080        assert_eq!(new_checkpoint.symbol, "BTCUSDT");
2081
2082        // Verify new checkpoint can be serialized and deserialized
2083        let json = serde_json::to_string(&new_checkpoint).unwrap();
2084        let restored: Checkpoint = serde_json::from_str(&json).unwrap();
2085        assert_eq!(restored.version, 2);
2086        assert_eq!(restored.symbol, "BTCUSDT");
2087    }
2088
2089    // =========================================================================
2090    // Issue #96: Checkpoint error path tests
2091    // =========================================================================
2092
2093    #[test]
2094    fn test_from_checkpoint_invalid_threshold_zero() {
2095        let checkpoint = Checkpoint::new(
2096            "BTCUSDT".to_string(), 0, None, None, 0, None, 0, true,
2097        );
2098        match RangeBarProcessor::from_checkpoint(checkpoint) {
2099            Err(CheckpointError::InvalidThreshold { threshold: 0, .. }) => {}
2100            other => panic!("Expected InvalidThreshold(0), got {:?}", other.err()),
2101        }
2102    }
2103
2104    #[test]
2105    fn test_from_checkpoint_invalid_threshold_too_high() {
2106        let checkpoint = Checkpoint::new(
2107            "BTCUSDT".to_string(), 200_000, None, None, 0, None, 0, true,
2108        );
2109        match RangeBarProcessor::from_checkpoint(checkpoint) {
2110            Err(CheckpointError::InvalidThreshold { threshold: 200_000, .. }) => {}
2111            other => panic!("Expected InvalidThreshold(200000), got {:?}", other.err()),
2112        }
2113    }
2114
2115    #[test]
2116    fn test_from_checkpoint_missing_thresholds() {
2117        let bar = RangeBar::new(&test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000));
2118        let mut checkpoint = Checkpoint::new(
2119            "BTCUSDT".to_string(), 250, None, None, 0, None, 0, true,
2120        );
2121        checkpoint.incomplete_bar = Some(bar);
2122        checkpoint.thresholds = None;
2123
2124        match RangeBarProcessor::from_checkpoint(checkpoint) {
2125            Err(CheckpointError::MissingThresholds) => {}
2126            other => panic!("Expected MissingThresholds, got {:?}", other.err()),
2127        }
2128    }
2129
2130    #[test]
2131    fn test_from_checkpoint_unknown_version_treated_as_v2() {
2132        let mut checkpoint = Checkpoint::new(
2133            "BTCUSDT".to_string(), 250, None, None, 0, None, 0, true,
2134        );
2135        checkpoint.version = 99;
2136
2137        let processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2138        assert_eq!(processor.threshold_decimal_bps(), 250);
2139    }
2140
2141    #[test]
2142    fn test_from_checkpoint_valid_with_incomplete_bar() {
2143        use crate::fixed_point::FixedPoint;
2144        let bar = RangeBar::new(&test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000));
2145        let upper = FixedPoint::from_str("50125.0").unwrap();
2146        let lower = FixedPoint::from_str("49875.0").unwrap();
2147
2148        let checkpoint = Checkpoint::new(
2149            "BTCUSDT".to_string(), 250, Some(bar), Some((upper, lower)), 0, None, 0, true,
2150        );
2151
2152        let processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2153        assert!(processor.get_incomplete_bar().is_some(), "Should restore incomplete bar");
2154    }
2155
2156    // =========================================================================
2157    // Issue #96: Ouroboros reset tests
2158    // =========================================================================
2159
2160    #[test]
2161    fn test_reset_at_ouroboros_with_orphan() {
2162        let mut processor = RangeBarProcessor::new(250).unwrap();
2163
2164        // Feed trades to create incomplete bar
2165        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2166        let t2 = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 2000);
2167        assert!(processor.process_single_trade(&t1).unwrap().is_none());
2168        assert!(processor.process_single_trade(&t2).unwrap().is_none());
2169        assert!(processor.get_incomplete_bar().is_some(), "Should have incomplete bar");
2170
2171        // Reset at ouroboros boundary - should return orphaned bar
2172        let orphan = processor.reset_at_ouroboros();
2173        assert!(orphan.is_some(), "Should return orphaned bar");
2174        let orphan_bar = orphan.unwrap();
2175        assert_eq!(orphan_bar.open.to_string(), "50000.00000000");
2176
2177        // After reset, no incomplete bar
2178        assert!(processor.get_incomplete_bar().is_none(), "No bar after reset");
2179    }
2180
2181    #[test]
2182    fn test_reset_at_ouroboros_clean_state() {
2183        let mut processor = RangeBarProcessor::new(250).unwrap();
2184
2185        // Reset without any trades processed - should return None
2186        let orphan = processor.reset_at_ouroboros();
2187        assert!(orphan.is_none(), "No orphan when state is clean");
2188        assert!(processor.get_incomplete_bar().is_none());
2189    }
2190
2191    #[test]
2192    fn test_reset_at_ouroboros_clears_defer_open() {
2193        let mut processor = RangeBarProcessor::new(250).unwrap();
2194
2195        // Create a breach to set defer_open = true
2196        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2197        let t2 = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000); // +0.4% breach
2198        processor.process_single_trade(&t1).unwrap();
2199        let bar = processor.process_single_trade(&t2).unwrap();
2200        assert!(bar.is_some(), "Should breach");
2201
2202        // After breach, defer_open is true - no incomplete bar
2203        assert!(processor.get_incomplete_bar().is_none());
2204
2205        // Reset at ouroboros should clear defer_open
2206        processor.reset_at_ouroboros();
2207
2208        // New trade should open fresh bar (defer_open was cleared)
2209        let t3 = test_utils::create_test_agg_trade(3, "50000.0", "1.0", 3000);
2210        processor.process_single_trade(&t3).unwrap();
2211        assert!(processor.get_incomplete_bar().is_some(), "Should have new bar after reset");
2212    }
2213
2214    // === EDGE CASE TESTS (Issue #96 Task #21) ===
2215
2216    #[test]
2217    fn test_single_trade_no_bar() {
2218        // A single trade cannot breach — no bar should be produced
2219        let mut processor = RangeBarProcessor::new(250).unwrap();
2220        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2221        let bars = processor.process_agg_trade_records(&[trade]).unwrap();
2222        assert_eq!(bars.len(), 0, "Single trade should not produce a completed bar");
2223        assert!(processor.get_incomplete_bar().is_some(), "Should have incomplete bar");
2224    }
2225
2226    #[test]
2227    fn test_identical_timestamps_no_close() {
2228        // Issue #36: Bar cannot close when breach tick has same timestamp as open
2229        let mut processor = RangeBarProcessor::new(250).unwrap();
2230        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2231        let t2 = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1000); // Same timestamp, breaches
2232        let bars = processor.process_agg_trade_records(&[t1, t2]).unwrap();
2233        assert_eq!(bars.len(), 0, "Bar should not close on same timestamp as open (Issue #36)");
2234    }
2235
2236    #[test]
2237    fn test_identical_timestamps_then_different_closes() {
2238        // Same-timestamp trades followed by different-timestamp breach should close
2239        let mut processor = RangeBarProcessor::new(250).unwrap();
2240        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2241        let t2 = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 1000); // Same ts
2242        let t3 = test_utils::create_test_agg_trade(3, "50200.0", "1.0", 2000); // Different ts, breach
2243        let bars = processor.process_agg_trade_records(&[t1, t2, t3]).unwrap();
2244        assert_eq!(bars.len(), 1, "Should close when breach at different timestamp");
2245    }
2246
2247    #[test]
2248    fn test_streaming_defer_open_semantics() {
2249        // After breach via process_single_trade, next trade should open new bar
2250        let mut processor = RangeBarProcessor::new(250).unwrap();
2251        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2252        let t2 = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000); // Breach
2253        let t3 = test_utils::create_test_agg_trade(3, "51000.0", "1.0", 3000); // Opens new bar
2254
2255        processor.process_single_trade(&t1).unwrap();
2256        let bar = processor.process_single_trade(&t2).unwrap();
2257        assert!(bar.is_some(), "Trade 2 should cause a breach");
2258
2259        // After breach, no incomplete bar (defer_open state)
2260        assert!(processor.get_incomplete_bar().is_none());
2261
2262        // Next trade opens a fresh bar
2263        let bar2 = processor.process_single_trade(&t3).unwrap();
2264        assert!(bar2.is_none(), "Trade 3 should open new bar, not breach");
2265        let incomplete = processor.get_incomplete_bar().unwrap();
2266        assert_eq!(incomplete.open.to_f64(), 51000.0, "New bar should open at t3 price");
2267    }
2268
2269    #[test]
2270    fn test_process_empty_then_trades() {
2271        // Processing empty slice should be no-op, then normal processing works
2272        let mut processor = RangeBarProcessor::new(250).unwrap();
2273        let bars = processor.process_agg_trade_records(&[]).unwrap();
2274        assert_eq!(bars.len(), 0);
2275        assert!(processor.get_incomplete_bar().is_none());
2276
2277        // Now process a real trade
2278        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2279        let bars = processor.process_agg_trade_records(&[trade]).unwrap();
2280        assert_eq!(bars.len(), 0);
2281        assert!(processor.get_incomplete_bar().is_some());
2282    }
2283
2284    #[test]
2285    fn test_multiple_breaches_in_batch() {
2286        // Multiple bars should form from a batch with repeated breaches
2287        let mut processor = RangeBarProcessor::new(250).unwrap();
2288        let trades = vec![
2289            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
2290            test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000),  // Breach 1
2291            test_utils::create_test_agg_trade(3, "50500.0", "1.0", 3000),  // Opens bar 2
2292            test_utils::create_test_agg_trade(4, "50700.0", "1.0", 4000),  // Breach 2
2293            test_utils::create_test_agg_trade(5, "51000.0", "1.0", 5000),  // Opens bar 3
2294        ];
2295        let bars = processor.process_agg_trade_records(&trades).unwrap();
2296        assert_eq!(bars.len(), 2, "Should produce 2 completed bars from 2 breaches");
2297    }
2298
2299    #[test]
2300    fn test_streaming_batch_parity_extended() {
2301        // Task #29: Comprehensive streaming/batch parity with 20 trades producing 5+ bars
2302        // Uses 100 dbps (0.1%) threshold for more frequent breaches
2303        let threshold = 100;
2304
2305        // Build a zigzag price sequence that repeatedly breaches 0.1%
2306        let mut trades = Vec::new();
2307        let mut price = 50000.0;
2308        for i in 0..20 {
2309            // Alternate up and down movements exceeding 0.1%
2310            if i % 3 == 0 && i > 0 {
2311                price *= 1.002; // +0.2% → breach upward
2312            } else if i % 3 == 1 && i > 1 {
2313                price *= 0.998; // -0.2% → may breach downward
2314            } else {
2315                price *= 1.0005; // Small move, no breach
2316            }
2317            trades.push(test_utils::create_test_agg_trade(
2318                (i + 1) as i64,
2319                &format!("{:.8}", price),
2320                "1.0",
2321                (i as i64 + 1) * 1000,
2322            ));
2323        }
2324
2325        // === BATCH PATH ===
2326        let mut batch_processor = RangeBarProcessor::new(threshold).unwrap();
2327        let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
2328
2329        // === STREAMING PATH ===
2330        let mut stream_processor = RangeBarProcessor::new(threshold).unwrap();
2331        let mut stream_bars: Vec<RangeBar> = Vec::new();
2332        for trade in &trades {
2333            if let Some(bar) = stream_processor.process_single_trade(trade).unwrap() {
2334                stream_bars.push(bar);
2335            }
2336        }
2337
2338        // === VERIFY PARITY ===
2339        assert!(batch_bars.len() >= 3, "Should produce at least 3 bars from zigzag pattern");
2340        assert_eq!(
2341            batch_bars.len(), stream_bars.len(),
2342            "Batch ({}) and streaming ({}) bar count mismatch",
2343            batch_bars.len(), stream_bars.len()
2344        );
2345
2346        for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
2347            assert_eq!(b.open, s.open, "Bar {i}: open mismatch");
2348            assert_eq!(b.close, s.close, "Bar {i}: close mismatch");
2349            assert_eq!(b.high, s.high, "Bar {i}: high mismatch");
2350            assert_eq!(b.low, s.low, "Bar {i}: low mismatch");
2351            assert_eq!(b.volume, s.volume, "Bar {i}: volume mismatch");
2352            assert_eq!(b.open_time, s.open_time, "Bar {i}: open_time mismatch");
2353            assert_eq!(b.close_time, s.close_time, "Bar {i}: close_time mismatch");
2354            assert_eq!(b.individual_trade_count, s.individual_trade_count, "Bar {i}: trade_count mismatch");
2355        }
2356
2357        // Verify incomplete bars match
2358        let batch_inc = batch_processor.get_incomplete_bar();
2359        let stream_inc = stream_processor.get_incomplete_bar();
2360        match (&batch_inc, &stream_inc) {
2361            (Some(b), Some(s)) => {
2362                assert_eq!(b.open, s.open, "Incomplete: open mismatch");
2363                assert_eq!(b.volume, s.volume, "Incomplete: volume mismatch");
2364            }
2365            (None, None) => {}
2366            _ => panic!("Incomplete bar presence mismatch"),
2367        }
2368    }
2369
2370    #[test]
2371    fn test_multi_batch_sequential_state_continuity() {
2372        // Send 3 separate batches, each producing 1+ bars
2373        // Verify state carries correctly across batch boundaries
2374        let mut processor = RangeBarProcessor::new(100).unwrap(); // 100 dbps = 0.10%
2375        let mut all_bars = Vec::new();
2376
2377        // Batch 1: open at 50000, breach needs > 0.10% = price > 50050
2378        let batch1 = vec![
2379            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
2380            test_utils::create_test_agg_trade(2, "50020.0", "1.0", 2000),
2381            test_utils::create_test_agg_trade(3, "50060.0", "1.0", 3000), // Breach (>0.10%)
2382        ];
2383        let bars1 = processor.process_agg_trade_records(&batch1).unwrap();
2384        all_bars.extend(bars1);
2385
2386        // Batch 2: next trade opens new bar, breach again
2387        let batch2 = vec![
2388            test_utils::create_test_agg_trade(4, "50100.0", "1.0", 4000), // Opens new bar
2389            test_utils::create_test_agg_trade(5, "50120.0", "1.0", 5000),
2390            test_utils::create_test_agg_trade(6, "50170.0", "1.0", 6000), // Breach (>0.10%)
2391        ];
2392        let bars2 = processor.process_agg_trade_records(&batch2).unwrap();
2393        all_bars.extend(bars2);
2394
2395        // Batch 3: another new bar from fresh state
2396        let batch3 = vec![
2397            test_utils::create_test_agg_trade(7, "50200.0", "1.0", 7000), // Opens new bar
2398            test_utils::create_test_agg_trade(8, "50220.0", "1.0", 8000),
2399            test_utils::create_test_agg_trade(9, "50280.0", "1.0", 9000), // Breach (>0.10%)
2400        ];
2401        let bars3 = processor.process_agg_trade_records(&batch3).unwrap();
2402        all_bars.extend(bars3);
2403
2404        // Should have produced at least 3 bars (one per batch boundary)
2405        assert!(
2406            all_bars.len() >= 3,
2407            "Expected at least 3 bars from 3 batches, got {}",
2408            all_bars.len()
2409        );
2410
2411        // Timestamps must be strictly monotonic
2412        for i in 1..all_bars.len() {
2413            assert!(
2414                all_bars[i].close_time >= all_bars[i - 1].close_time,
2415                "Bar {i}: close_time {} < previous {}",
2416                all_bars[i].close_time,
2417                all_bars[i - 1].close_time
2418            );
2419        }
2420
2421        // Trade IDs should be continuous across batches
2422        for i in 1..all_bars.len() {
2423            assert_eq!(
2424                all_bars[i].first_agg_trade_id,
2425                all_bars[i - 1].last_agg_trade_id + 1,
2426                "Bar {i}: trade ID gap (first={}, prev last={})",
2427                all_bars[i].first_agg_trade_id,
2428                all_bars[i - 1].last_agg_trade_id
2429            );
2430        }
2431    }
2432
2433    // Issue #96 Task #93: Edge case tests for processor algorithm invariants
2434
2435    #[test]
2436    fn test_same_timestamp_prevents_bar_close() {
2437        // Issue #36: Bar cannot close on same timestamp as it opened
2438        let mut processor = RangeBarProcessor::new(250).unwrap();
2439        processor.prevent_same_timestamp_close = true;
2440
2441        // All trades at same timestamp but price breaches threshold
2442        let trades: Vec<AggTrade> = (0..5)
2443            .map(|i| {
2444                let price_str = if i == 0 {
2445                    "50000.0".to_string()
2446                } else {
2447                    // Price far above threshold to trigger breach
2448                    format!("{}.0", 50000 + (i + 1) * 200)
2449                };
2450                AggTrade {
2451                    agg_trade_id: i as i64,
2452                    price: FixedPoint::from_str(&price_str).unwrap(),
2453                    volume: FixedPoint::from_str("1.0").unwrap(),
2454                    first_trade_id: i as i64,
2455                    last_trade_id: i as i64,
2456                    timestamp: 1000000, // ALL same timestamp
2457                    is_buyer_maker: false,
2458                    is_best_match: None,
2459                }
2460            })
2461            .collect();
2462
2463        let bars = processor.process_agg_trade_records(&trades).unwrap();
2464        // No bars should close because timestamp gate blocks it
2465        assert_eq!(bars.len(), 0, "Same timestamp should prevent bar close (Issue #36)");
2466    }
2467
2468    #[test]
2469    fn test_single_trade_incomplete_bar() {
2470        let mut processor = RangeBarProcessor::new(250).unwrap();
2471
2472        let trade = AggTrade {
2473            agg_trade_id: 1,
2474            price: FixedPoint::from_str("50000.0").unwrap(),
2475            volume: FixedPoint::from_str("10.0").unwrap(),
2476            first_trade_id: 1,
2477            last_trade_id: 1,
2478            timestamp: 1000000,
2479            is_buyer_maker: false,
2480            is_best_match: None,
2481        };
2482
2483        // Strict mode: 0 completed bars
2484        let bars = processor.process_agg_trade_records(&[trade.clone()]).unwrap();
2485        assert_eq!(bars.len(), 0, "Single trade cannot complete a bar");
2486
2487        // With incomplete: should return 1 incomplete bar
2488        let mut processor2 = RangeBarProcessor::new(250).unwrap();
2489        let bars_incl = processor2
2490            .process_agg_trade_records_with_incomplete(&[trade])
2491            .unwrap();
2492        assert_eq!(bars_incl.len(), 1, "Should return 1 incomplete bar");
2493        assert_eq!(bars_incl[0].open, bars_incl[0].close);
2494        assert_eq!(bars_incl[0].high, bars_incl[0].low);
2495    }
2496
2497    // === Issue #96: Configuration method coverage tests ===
2498
2499    #[test]
2500    fn test_with_options_gate_disabled_same_timestamp_closes() {
2501        // Issue #36: with prevent_same_timestamp_close=false, bar should close
2502        // even when breach trade has same timestamp as open
2503        let mut processor = RangeBarProcessor::with_options(250, false).unwrap();
2504        assert!(!processor.prevent_same_timestamp_close());
2505
2506        let trades = vec![
2507            AggTrade {
2508                agg_trade_id: 1, price: FixedPoint::from_str("50000.0").unwrap(),
2509                volume: FixedPoint::from_str("1.0").unwrap(),
2510                first_trade_id: 1, last_trade_id: 1, timestamp: 1000000,
2511                is_buyer_maker: false, is_best_match: None,
2512            },
2513            AggTrade {
2514                agg_trade_id: 2, price: FixedPoint::from_str("50200.0").unwrap(), // +0.4% > 0.25%
2515                volume: FixedPoint::from_str("1.0").unwrap(),
2516                first_trade_id: 2, last_trade_id: 2, timestamp: 1000000, // Same timestamp!
2517                is_buyer_maker: false, is_best_match: None,
2518            },
2519        ];
2520        let bars = processor.process_agg_trade_records(&trades).unwrap();
2521        assert_eq!(bars.len(), 1, "Gate disabled: same-timestamp breach should close bar");
2522    }
2523
2524    #[test]
2525    fn test_inter_bar_config_enables_features() {
2526        use crate::interbar::LookbackMode;
2527        let processor = RangeBarProcessor::new(250).unwrap();
2528        assert!(!processor.inter_bar_enabled(), "Default: inter-bar disabled");
2529
2530        let processor = processor.with_inter_bar_config(InterBarConfig {
2531            lookback_mode: LookbackMode::FixedCount(100),
2532            compute_tier2: false,
2533            compute_tier3: false,
2534        });
2535        assert!(processor.inter_bar_enabled(), "After config: inter-bar enabled");
2536    }
2537
2538    #[test]
2539    fn test_intra_bar_feature_toggle() {
2540        let processor = RangeBarProcessor::new(250).unwrap();
2541        assert!(!processor.intra_bar_enabled(), "Default: intra-bar disabled");
2542
2543        let processor = processor.with_intra_bar_features();
2544        assert!(processor.intra_bar_enabled(), "After toggle: intra-bar enabled");
2545    }
2546
2547    #[test]
2548    fn test_set_inter_bar_config_after_construction() {
2549        use crate::interbar::LookbackMode;
2550        let mut processor = RangeBarProcessor::new(500).unwrap();
2551        assert!(!processor.inter_bar_enabled());
2552
2553        processor.set_inter_bar_config(InterBarConfig {
2554            lookback_mode: LookbackMode::FixedCount(200),
2555            compute_tier2: true,
2556            compute_tier3: false,
2557        });
2558        assert!(processor.inter_bar_enabled(), "set_inter_bar_config should enable");
2559    }
2560
2561    #[test]
2562    fn test_process_with_options_incomplete_false_vs_true() {
2563        let trades = scenarios::single_breach_sequence(250);
2564
2565        // Without incomplete: only completed bars
2566        let mut p1 = RangeBarProcessor::new(250).unwrap();
2567        let bars_strict = p1.process_agg_trade_records_with_options(&trades, false).unwrap();
2568
2569        // With incomplete: completed + 1 partial
2570        let mut p2 = RangeBarProcessor::new(250).unwrap();
2571        let bars_incl = p2.process_agg_trade_records_with_options(&trades, true).unwrap();
2572
2573        assert!(
2574            bars_incl.len() >= bars_strict.len(),
2575            "inclusive ({}) must be >= strict ({})", bars_incl.len(), bars_strict.len()
2576        );
2577    }
2578
2579    // === Issue #96: Untested public method coverage ===
2580
2581    #[test]
2582    fn test_anomaly_summary_default_no_anomalies() {
2583        let processor = RangeBarProcessor::new(250).unwrap();
2584        let summary = processor.anomaly_summary();
2585        assert_eq!(summary.gaps_detected, 0);
2586        assert_eq!(summary.overlaps_detected, 0);
2587        assert_eq!(summary.timestamp_anomalies, 0);
2588        assert!(!summary.has_anomalies());
2589        assert_eq!(summary.total(), 0);
2590    }
2591
2592    #[test]
2593    fn test_anomaly_summary_preserved_through_checkpoint() {
2594        // Process some trades, create checkpoint, restore, verify anomaly state
2595        let mut processor = RangeBarProcessor::new(250).unwrap();
2596        let trades = scenarios::single_breach_sequence(250);
2597        processor.process_agg_trade_records(&trades).unwrap();
2598
2599        let checkpoint = processor.create_checkpoint("TEST");
2600        let restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2601        let summary = restored.anomaly_summary();
2602        // Default processor has no anomalies; checkpoint preserves that
2603        assert_eq!(summary.total(), 0);
2604    }
2605
2606    #[test]
2607    fn test_anomaly_summary_from_checkpoint_with_anomalies() {
2608        // Deserialize a checkpoint that has anomaly data
2609        let json = r#"{
2610            "version": 3,
2611            "symbol": "TESTUSDT",
2612            "threshold_decimal_bps": 250,
2613            "prevent_same_timestamp_close": true,
2614            "defer_open": false,
2615            "current_bar": null,
2616            "thresholds": null,
2617            "last_timestamp_us": 1000000,
2618            "last_trade_id": 5,
2619            "price_hash": 0,
2620            "anomaly_summary": {"gaps_detected": 3, "overlaps_detected": 1, "timestamp_anomalies": 2}
2621        }"#;
2622        let checkpoint: crate::checkpoint::Checkpoint = serde_json::from_str(json).unwrap();
2623        let processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2624        let summary = processor.anomaly_summary();
2625        assert_eq!(summary.gaps_detected, 3);
2626        assert_eq!(summary.overlaps_detected, 1);
2627        assert_eq!(summary.timestamp_anomalies, 2);
2628        assert!(summary.has_anomalies());
2629        assert_eq!(summary.total(), 6);
2630    }
2631
2632    #[test]
2633    fn test_with_inter_bar_config_and_cache_shared() {
2634        use crate::entropy_cache_global::get_global_entropy_cache;
2635        use crate::interbar::LookbackMode;
2636
2637        let global_cache = get_global_entropy_cache();
2638        let config = InterBarConfig {
2639            lookback_mode: LookbackMode::FixedCount(100),
2640            compute_tier2: true,
2641            compute_tier3: true,
2642        };
2643
2644        // Two processors sharing the same global cache
2645        let p1 = RangeBarProcessor::new(250).unwrap()
2646            .with_inter_bar_config_and_cache(config.clone(), Some(global_cache.clone()));
2647        let p2 = RangeBarProcessor::new(500).unwrap()
2648            .with_inter_bar_config_and_cache(config, Some(global_cache));
2649
2650        assert!(p1.inter_bar_enabled());
2651        assert!(p2.inter_bar_enabled());
2652    }
2653
2654    #[test]
2655    fn test_set_inter_bar_config_with_cache_after_checkpoint() {
2656        use crate::entropy_cache_global::get_global_entropy_cache;
2657        use crate::interbar::LookbackMode;
2658
2659        let mut processor = RangeBarProcessor::new(250).unwrap();
2660        let trades = scenarios::single_breach_sequence(250);
2661        processor.process_agg_trade_records(&trades).unwrap();
2662
2663        // Simulate checkpoint round-trip (inter-bar config not preserved)
2664        let checkpoint = processor.create_checkpoint("TEST");
2665        let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2666        assert!(!restored.inter_bar_enabled(), "Checkpoint does not preserve inter-bar config");
2667
2668        // Re-enable with shared cache
2669        let global_cache = get_global_entropy_cache();
2670        restored.set_inter_bar_config_with_cache(
2671            InterBarConfig {
2672                lookback_mode: LookbackMode::FixedCount(100),
2673                compute_tier2: false,
2674                compute_tier3: false,
2675            },
2676            Some(global_cache),
2677        );
2678        assert!(restored.inter_bar_enabled(), "set_inter_bar_config_with_cache should re-enable");
2679    }
2680
2681    #[test]
2682    fn test_threshold_decimal_bps_getter() {
2683        let p250 = RangeBarProcessor::new(250).unwrap();
2684        assert_eq!(p250.threshold_decimal_bps(), 250);
2685
2686        let p1000 = RangeBarProcessor::new(1000).unwrap();
2687        assert_eq!(p1000.threshold_decimal_bps(), 1000);
2688    }
2689
2690    // =========================================================================
2691    // Issue #112: Gap-aware checkpoint recovery tests
2692    // =========================================================================
2693
2694    #[test]
2695    fn test_checkpoint_gap_discards_forming_bar() {
2696        // Process some trades, checkpoint, then resume with a 2-hour gap
2697        let mut processor = RangeBarProcessor::new(250).unwrap();
2698
2699        // Create trades that don't breach (stay within 0.25%)
2700        let trades = vec![
2701            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000), // t=0
2702            test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000), // t=+1s
2703            test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640995202_000_000), // t=+2s
2704        ];
2705
2706        let bars = processor.process_agg_trade_records(&trades).unwrap();
2707        assert_eq!(bars.len(), 0, "No breach = no completed bars");
2708
2709        // Create checkpoint (should have forming bar)
2710        let checkpoint = processor.create_checkpoint("BTCUSDT");
2711        assert!(checkpoint.has_incomplete_bar(), "Should have forming bar");
2712
2713        // Resume from checkpoint
2714        let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2715
2716        // Feed trades with 2-hour gap (7,200,000,000 μs > 1-hour default max_gap)
2717        let gap_trades = vec![
2718            test_utils::create_test_agg_trade(4, "50030.0", "1.0", 1641002402_000_000), // +2h gap
2719            test_utils::create_test_agg_trade(5, "50040.0", "1.0", 1641002403_000_000),
2720        ];
2721
2722        let bars = restored.process_agg_trade_records(&gap_trades).unwrap();
2723        // Forming bar should have been discarded, no oversized bar emitted
2724        assert_eq!(bars.len(), 0, "No bars should complete — forming bar was discarded");
2725        assert_eq!(
2726            restored.anomaly_summary().gaps_detected, 1,
2727            "Gap should be recorded in anomaly summary"
2728        );
2729    }
2730
2731    #[test]
2732    fn test_checkpoint_small_gap_continues_bar() {
2733        // Resume with a gap UNDER the threshold — bar should continue
2734        let mut processor = RangeBarProcessor::new(250).unwrap();
2735
2736        let trades = vec![
2737            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2738            test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000),
2739        ];
2740
2741        let _ = processor.process_agg_trade_records(&trades).unwrap();
2742        let checkpoint = processor.create_checkpoint("BTCUSDT");
2743        let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2744
2745        // Feed trades with 30-minute gap (1,800,000,000 μs < 1-hour max_gap)
2746        let small_gap_trades = vec![
2747            test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640997001_000_000), // +30m
2748            test_utils::create_test_agg_trade(4, "50125.01", "1.0", 1640997002_000_000), // breach
2749        ];
2750
2751        let bars = restored.process_agg_trade_records(&small_gap_trades).unwrap();
2752        assert_eq!(bars.len(), 1, "Bar should complete normally with small gap");
2753        // Bar should span from original open (trade 1) through gap trades
2754        assert_eq!(bars[0].open_time, 1640995200_000_000);
2755        assert_eq!(
2756            restored.anomaly_summary().gaps_detected, 0,
2757            "No gap anomaly for small gap"
2758        );
2759    }
2760
2761    #[test]
2762    fn test_checkpoint_gap_custom_max_gap() {
2763        // Test with custom max_gap_us set to 30 minutes
2764        let mut processor = RangeBarProcessor::new(250).unwrap();
2765
2766        let trades = vec![
2767            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2768        ];
2769        let _ = processor.process_agg_trade_records(&trades).unwrap();
2770        let checkpoint = processor.create_checkpoint("BTCUSDT");
2771
2772        // Restore with custom 30-minute max gap
2773        let mut restored = RangeBarProcessor::from_checkpoint(checkpoint)
2774            .unwrap()
2775            .with_max_gap(1_800_000_000); // 30 minutes
2776
2777        // 45-minute gap should discard with 30-min threshold
2778        let gap_trades = vec![
2779            test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640997900_000_000), // +45min
2780        ];
2781
2782        let _ = restored.process_agg_trade_records(&gap_trades).unwrap();
2783        assert_eq!(
2784            restored.anomaly_summary().gaps_detected, 1,
2785            "45-min gap should be detected with 30-min threshold"
2786        );
2787    }
2788
2789    #[test]
2790    fn test_is_valid_range_rejects_oversized() {
2791        use crate::fixed_point::FixedPoint;
2792
2793        let threshold_decimal_bps: u32 = 250; // 0.25%
2794        let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
2795            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
2796
2797        // Bar with 0.50% range — exceeds 2x threshold (2 * 0.25% = 0.50%)
2798        // open=50000.0, high=50250.01, low=50000.0 → range=250.01/50000 ≈ 0.5%+
2799        let mut oversized = RangeBar::default();
2800        oversized.open = FixedPoint::from_str("50000.0").unwrap();
2801        oversized.high = FixedPoint::from_str("50250.01").unwrap();
2802        oversized.low = FixedPoint::from_str("50000.0").unwrap();
2803        assert!(
2804            !oversized.is_valid_range(threshold_ratio, 2),
2805            "Bar exceeding 2x threshold should be invalid"
2806        );
2807
2808        // Bar with 0.20% range — within threshold
2809        let mut valid = RangeBar::default();
2810        valid.open = FixedPoint::from_str("50000.0").unwrap();
2811        valid.high = FixedPoint::from_str("50100.0").unwrap();
2812        valid.low = FixedPoint::from_str("50000.0").unwrap();
2813        assert!(
2814            valid.is_valid_range(threshold_ratio, 2),
2815            "Bar within threshold should be valid"
2816        );
2817
2818        // Bar at exact breach boundary (0.25%) — should be valid (breach triggers AT threshold)
2819        let mut exact = RangeBar::default();
2820        exact.open = FixedPoint::from_str("50000.0").unwrap();
2821        exact.high = FixedPoint::from_str("50125.0").unwrap();
2822        exact.low = FixedPoint::from_str("50000.0").unwrap();
2823        assert!(
2824            exact.is_valid_range(threshold_ratio, 2),
2825            "Bar at exact threshold should be valid"
2826        );
2827    }
2828
2829    #[test]
2830    fn test_checkpoint_no_incomplete_bar_gap_is_noop() {
2831        // If checkpoint has no forming bar, gap detection is irrelevant
2832        let mut processor = RangeBarProcessor::new(250).unwrap();
2833
2834        // Process trades that complete a bar (produce breach)
2835        let trades = vec![
2836            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2837            test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1640995201_000_000), // breach
2838        ];
2839        let bars = processor.process_agg_trade_records(&trades).unwrap();
2840        assert_eq!(bars.len(), 1);
2841
2842        let checkpoint = processor.create_checkpoint("BTCUSDT");
2843        assert!(!checkpoint.has_incomplete_bar());
2844
2845        let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2846
2847        // Large gap doesn't matter — no forming bar to discard
2848        let gap_trades = vec![
2849            test_utils::create_test_agg_trade(3, "50010.0", "1.0", 1641081600_000_000), // +24h
2850        ];
2851        let _ = restored.process_agg_trade_records(&gap_trades).unwrap();
2852        assert_eq!(
2853            restored.anomaly_summary().gaps_detected, 0,
2854            "No gap anomaly when no forming bar exists"
2855        );
2856    }
2857}