Skip to main content

rangebar_core/
processor.rs

1// FILE-SIZE-OK: 1496 lines — tests are inline (access private RangeBarState)
2//! Core range bar processing algorithm
3//!
4//! Implements non-lookahead bias range bar construction where bars close when
5//! price moves ±threshold dbps from the bar's OPEN price.
6
7use crate::checkpoint::{
8    AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; // Issue #59
12use crate::intrabar::compute_intra_bar_features; // Issue #59: Intra-bar features
13use crate::types::{AggTrade, RangeBar};
14#[cfg(feature = "python")]
15use pyo3::prelude::*;
16// Re-export ProcessingError from errors.rs (Phase 2a extraction)
17pub use crate::errors::ProcessingError;
18// Re-export ExportRangeBarProcessor from export_processor.rs (Phase 2d extraction)
19pub use crate::export_processor::ExportRangeBarProcessor;
20
21/// Range bar processor with non-lookahead bias guarantee
22pub struct RangeBarProcessor {
23    /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
24    threshold_decimal_bps: u32,
25
26    /// Current bar state for streaming processing (Q19)
27    /// Enables get_incomplete_bar() and stateful process_single_trade()
28    current_bar_state: Option<RangeBarState>,
29
30    /// Price window for checkpoint hash verification
31    price_window: PriceWindow,
32
33    /// Last processed trade ID (for gap detection on resume)
34    last_trade_id: Option<i64>,
35
36    /// Last processed timestamp (for position verification)
37    last_timestamp_us: i64,
38
39    /// Anomaly tracking for debugging
40    anomaly_summary: AnomalySummary,
41
42    /// Flag indicating this processor was created from a checkpoint
43    /// When true, process_agg_trade_records will continue from existing bar state
44    resumed_from_checkpoint: bool,
45
46    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
47    ///
48    /// When true (default): A bar cannot close until a trade arrives with a
49    /// different timestamp than the bar's open_time. This prevents "instant bars"
50    /// during flash crashes where multiple trades occur at the same millisecond.
51    ///
52    /// When false: Legacy behavior - bars can close on any breach regardless
53    /// of timestamp, which may produce bars with identical timestamps.
54    prevent_same_timestamp_close: bool,
55
56    /// Deferred bar open flag (Issue #46)
57    ///
58    /// When true: The previous trade triggered a threshold breach and closed a bar.
59    /// The next trade arriving via `process_single_trade()` should open a new bar
60    /// instead of being treated as a continuation.
61    ///
62    /// This matches the batch path's `defer_open` semantics in
63    /// `process_agg_trade_records()` where the breaching trade closes the current
64    /// bar and the NEXT trade opens the new bar.
65    defer_open: bool,
66
67    /// Trade history for inter-bar feature computation (Issue #59)
68    ///
69    /// Ring buffer of recent trades for computing lookback-based features.
70    /// When Some, features are computed from trades BEFORE each bar's open_time.
71    /// When None, inter-bar features are disabled (all lookback_* fields = None).
72    trade_history: Option<TradeHistory>,
73
74    /// Configuration for inter-bar features (Issue #59)
75    ///
76    /// Controls lookback mode (fixed count or time window) and which feature
77    /// tiers to compute. When None, inter-bar features are disabled.
78    inter_bar_config: Option<InterBarConfig>,
79
80    /// Enable intra-bar feature computation (Issue #59)
81    ///
82    /// When true, the processor accumulates trades during bar construction
83    /// and computes 22 features from trades WITHIN each bar at bar close.
84    /// Features include ITH (Investment Time Horizon), statistical, and
85    /// complexity metrics. When false, all intra_* fields are None.
86    include_intra_bar_features: bool,
87}
88
89impl RangeBarProcessor {
90    /// Create new processor with given threshold
91    ///
92    /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36)
93    ///
94    /// # Arguments
95    ///
96    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
97    ///   - Example: `250` → 25bps = 0.25%
98    ///   - Example: `10` → 1bps = 0.01%
99    ///   - Minimum: `1` → 0.1bps = 0.001%
100    ///
101    /// # Breaking Change (v3.0.0)
102    ///
103    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
104    /// **Migration**: Multiply all threshold values by 10.
105    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
106        Self::with_options(threshold_decimal_bps, true)
107    }
108
109    /// Create new processor with explicit timestamp gating control
110    ///
111    /// # Arguments
112    ///
113    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
114    /// * `prevent_same_timestamp_close` - If true, bars cannot close until
115    ///   timestamp advances from open_time. This prevents "instant bars" during
116    ///   flash crashes. Set to false for legacy behavior (pre-v9).
117    ///
118    /// # Example
119    ///
120    /// ```ignore
121    /// // Default behavior (v9+): timestamp gating enabled
122    /// let processor = RangeBarProcessor::new(250)?;
123    ///
124    /// // Legacy behavior: allow instant bars
125    /// let processor = RangeBarProcessor::with_options(250, false)?;
126    /// ```
127    pub fn with_options(
128        threshold_decimal_bps: u32,
129        prevent_same_timestamp_close: bool,
130    ) -> Result<Self, ProcessingError> {
131        // Validation bounds (v3.0.0: dbps units)
132        // Min: 1 dbps = 0.001%
133        // Max: 100,000 dbps = 100%
134        if threshold_decimal_bps < 1 {
135            return Err(ProcessingError::InvalidThreshold {
136                threshold_decimal_bps,
137            });
138        }
139        if threshold_decimal_bps > 100_000 {
140            return Err(ProcessingError::InvalidThreshold {
141                threshold_decimal_bps,
142            });
143        }
144
145        Ok(Self {
146            threshold_decimal_bps,
147            current_bar_state: None,
148            price_window: PriceWindow::new(),
149            last_trade_id: None,
150            last_timestamp_us: 0,
151            anomaly_summary: AnomalySummary::default(),
152            resumed_from_checkpoint: false,
153            prevent_same_timestamp_close,
154            defer_open: false,
155            trade_history: None,               // Issue #59: disabled by default
156            inter_bar_config: None,            // Issue #59: disabled by default
157            include_intra_bar_features: false, // Issue #59: disabled by default
158        })
159    }
160
161    /// Get the prevent_same_timestamp_close setting
162    pub fn prevent_same_timestamp_close(&self) -> bool {
163        self.prevent_same_timestamp_close
164    }
165
166    /// Enable inter-bar feature computation with the given configuration (Issue #59)
167    ///
168    /// When enabled, the processor maintains a trade history buffer and computes
169    /// lookback-based microstructure features on each bar close. Features are
170    /// computed from trades that occurred BEFORE each bar's open_time, ensuring
171    /// no lookahead bias.
172    ///
173    /// # Arguments
174    ///
175    /// * `config` - Configuration controlling lookback mode and feature tiers
176    ///
177    /// # Example
178    ///
179    /// ```ignore
180    /// use rangebar_core::processor::RangeBarProcessor;
181    /// use rangebar_core::interbar::{InterBarConfig, LookbackMode};
182    ///
183    /// let processor = RangeBarProcessor::new(1000)?
184    ///     .with_inter_bar_config(InterBarConfig {
185    ///         lookback_mode: LookbackMode::FixedCount(500),
186    ///         compute_tier2: true,
187    ///         compute_tier3: true,
188    ///     });
189    /// ```
190    pub fn with_inter_bar_config(mut self, config: InterBarConfig) -> Self {
191        self.trade_history = Some(TradeHistory::new(config.clone()));
192        self.inter_bar_config = Some(config);
193        self
194    }
195
196    /// Check if inter-bar features are enabled
197    pub fn inter_bar_enabled(&self) -> bool {
198        self.inter_bar_config.is_some()
199    }
200
201    /// Enable intra-bar feature computation (Issue #59)
202    ///
203    /// When enabled, the processor accumulates trades during bar construction
204    /// and computes 22 features from trades WITHIN each bar at bar close:
205    /// - 8 ITH features (Investment Time Horizon)
206    /// - 12 statistical features (OFI, intensity, Kyle lambda, etc.)
207    /// - 2 complexity features (Hurst exponent, permutation entropy)
208    ///
209    /// # Memory Note
210    ///
211    /// Trades are accumulated per-bar and freed when the bar closes.
212    /// Typical 1000 dbps bar: ~50-500 trades, ~2-24 KB overhead.
213    ///
214    /// # Example
215    ///
216    /// ```ignore
217    /// let processor = RangeBarProcessor::new(1000)?
218    ///     .with_intra_bar_features();
219    /// ```
220    pub fn with_intra_bar_features(mut self) -> Self {
221        self.include_intra_bar_features = true;
222        self
223    }
224
225    /// Check if intra-bar features are enabled
226    pub fn intra_bar_enabled(&self) -> bool {
227        self.include_intra_bar_features
228    }
229
230    /// Re-enable inter-bar features on an existing processor (Issue #97).
231    ///
232    /// Used after `from_checkpoint()` to restore microstructure config that
233    /// is not preserved in checkpoint state.
234    pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
235        self.trade_history = Some(TradeHistory::new(config.clone()));
236        self.inter_bar_config = Some(config);
237    }
238
239    /// Re-enable intra-bar features on an existing processor (Issue #97).
240    pub fn set_intra_bar_features(&mut self, enabled: bool) {
241        self.include_intra_bar_features = enabled;
242    }
243
244    /// Process a single trade and return completed bar if any
245    ///
246    /// Maintains internal state for streaming use case. State persists across calls
247    /// until a bar completes (threshold breach), enabling get_incomplete_bar().
248    ///
249    /// # Arguments
250    ///
251    /// * `trade` - Single aggregated trade to process
252    ///
253    /// # Returns
254    ///
255    /// `Some(RangeBar)` if a bar was completed, `None` otherwise
256    ///
257    /// # State Management
258    ///
259    /// - First trade: Initializes new bar state
260    /// - Subsequent trades: Updates existing bar or closes on breach
261    /// - Breach: Returns completed bar, starts new bar with breaching trade
262    pub fn process_single_trade(
263        &mut self,
264        trade: AggTrade,
265    ) -> Result<Option<RangeBar>, ProcessingError> {
266        // Track price and position for checkpoint
267        self.price_window.push(trade.price);
268        self.last_trade_id = Some(trade.agg_trade_id);
269        self.last_timestamp_us = trade.timestamp;
270
271        // Issue #59: Push trade to history buffer for inter-bar feature computation
272        // This must happen BEFORE bar processing so lookback window includes recent trades
273        if let Some(ref mut history) = self.trade_history {
274            history.push(&trade);
275        }
276
277        // Issue #46: If previous call triggered a breach, this trade opens the new bar.
278        // This matches the batch path's defer_open semantics - the breaching trade
279        // closes the current bar, and the NEXT trade opens the new bar.
280        if self.defer_open {
281            // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
282            if let Some(ref mut history) = self.trade_history {
283                history.on_bar_open(trade.timestamp);
284            }
285            self.current_bar_state = Some(if self.include_intra_bar_features {
286                RangeBarState::new_with_trade_accumulation(&trade, self.threshold_decimal_bps)
287            } else {
288                RangeBarState::new(&trade, self.threshold_decimal_bps)
289            });
290            self.defer_open = false;
291            return Ok(None);
292        }
293
294        match &mut self.current_bar_state {
295            None => {
296                // First trade - initialize new bar
297                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
298                if let Some(ref mut history) = self.trade_history {
299                    history.on_bar_open(trade.timestamp);
300                }
301                self.current_bar_state = Some(if self.include_intra_bar_features {
302                    RangeBarState::new_with_trade_accumulation(&trade, self.threshold_decimal_bps)
303                } else {
304                    RangeBarState::new(&trade, self.threshold_decimal_bps)
305                });
306                Ok(None)
307            }
308            Some(bar_state) => {
309                // Issue #59: Accumulate trade for intra-bar features (before breach check)
310                if self.include_intra_bar_features {
311                    bar_state.accumulate_trade(&trade);
312                }
313
314                // Check for threshold breach
315                let price_breaches = bar_state.bar.is_breach(
316                    trade.price,
317                    bar_state.upper_threshold,
318                    bar_state.lower_threshold,
319                );
320
321                // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
322                // This eliminates "instant bars" during flash crashes where multiple trades
323                // occur at the same millisecond.
324                let timestamp_allows_close = !self.prevent_same_timestamp_close
325                    || trade.timestamp != bar_state.bar.open_time;
326
327                if price_breaches && timestamp_allows_close {
328                    // Breach detected AND timestamp changed - close current bar
329                    bar_state.bar.update_with_trade(&trade);
330
331                    // Validation: Ensure high/low include open/close extremes
332                    debug_assert!(
333                        bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
334                    );
335                    debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
336
337                    // Compute microstructure features at bar finalization (Issue #25)
338                    bar_state.bar.compute_microstructure_features();
339
340                    // Issue #59: Compute inter-bar features from lookback window
341                    // Features are computed from trades BEFORE bar.open_time (no lookahead)
342                    if let Some(ref mut history) = self.trade_history {
343                        let inter_bar_features = history.compute_features(bar_state.bar.open_time);
344                        bar_state.bar.set_inter_bar_features(&inter_bar_features);
345                        // Issue #68: Notify history that bar is closing (resumes normal pruning)
346                        history.on_bar_close();
347                    }
348
349                    // Issue #59: Compute intra-bar features from accumulated trades
350                    if self.include_intra_bar_features {
351                        let intra_bar_features =
352                            compute_intra_bar_features(&bar_state.accumulated_trades);
353                        bar_state.bar.set_intra_bar_features(&intra_bar_features);
354                    }
355
356                    // Move bar out instead of cloning — bar_state borrow ends after
357                    // last use above (NLL), so take() is safe here.
358                    let completed_bar = self.current_bar_state.take().unwrap().bar;
359
360                    // Issue #46: Don't start new bar with breaching trade.
361                    // Next trade will open the new bar via defer_open.
362                    self.defer_open = true;
363
364                    Ok(Some(completed_bar))
365                } else {
366                    // Either no breach OR same timestamp (gate active) - update existing bar
367                    bar_state.bar.update_with_trade(&trade);
368                    Ok(None)
369                }
370            }
371        }
372    }
373
374    /// Get any incomplete bar currently being processed
375    ///
376    /// Returns clone of current bar state for inspection without consuming it.
377    /// Useful for final bar at stream end or progress monitoring.
378    ///
379    /// # Returns
380    ///
381    /// `Some(RangeBar)` if bar is in progress, `None` if no active bar
382    pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
383        self.current_bar_state
384            .as_ref()
385            .map(|state| state.bar.clone())
386    }
387
388    /// Process AggTrade records into range bars including incomplete bars for analysis
389    ///
390    /// # Arguments
391    ///
392    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
393    ///
394    /// # Returns
395    ///
396    /// Vector of range bars including incomplete bars at end of data
397    ///
398    /// # Warning
399    ///
400    /// This method is for analysis purposes only. Incomplete bars violate the
401    /// fundamental range bar algorithm and should not be used for production trading.
402    pub fn process_agg_trade_records_with_incomplete(
403        &mut self,
404        agg_trade_records: &[AggTrade],
405    ) -> Result<Vec<RangeBar>, ProcessingError> {
406        self.process_agg_trade_records_with_options(agg_trade_records, true)
407    }
408
409    /// Process Binance aggregated trade records into range bars
410    ///
411    /// This is the primary method for converting AggTrade records (which aggregate
412    /// multiple individual trades) into range bars based on price movement thresholds.
413    ///
414    /// # Parameters
415    ///
416    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
417    ///   Each record represents multiple individual trades aggregated at same price
418    ///
419    /// # Returns
420    ///
421    /// Vector of completed range bars (ONLY bars that breached thresholds).
422    /// Each bar tracks both individual trade count and AggTrade record count.
423    pub fn process_agg_trade_records(
424        &mut self,
425        agg_trade_records: &[AggTrade],
426    ) -> Result<Vec<RangeBar>, ProcessingError> {
427        self.process_agg_trade_records_with_options(agg_trade_records, false)
428    }
429
430    /// Process AggTrade records with options for including incomplete bars
431    ///
432    /// Batch processing mode: Clears any existing state before processing.
433    /// Use process_single_trade() for stateful streaming instead.
434    ///
435    /// # Parameters
436    ///
437    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
438    /// * `include_incomplete` - Whether to include incomplete bars at end of processing
439    ///
440    /// # Returns
441    ///
442    /// Vector of range bars (completed + incomplete if requested)
443    pub fn process_agg_trade_records_with_options(
444        &mut self,
445        agg_trade_records: &[AggTrade],
446        include_incomplete: bool,
447    ) -> Result<Vec<RangeBar>, ProcessingError> {
448        if agg_trade_records.is_empty() {
449            return Ok(Vec::new());
450        }
451
452        // Validate records are sorted
453        self.validate_trade_ordering(agg_trade_records)?;
454
455        // Use existing bar state if resuming from checkpoint, otherwise start fresh
456        // This is CRITICAL for cross-file continuation (Issues #2, #3)
457        let mut current_bar: Option<RangeBarState> = if self.resumed_from_checkpoint {
458            // Continue from checkpoint's incomplete bar
459            self.resumed_from_checkpoint = false; // Consume the flag
460            self.current_bar_state.take()
461        } else {
462            // Start fresh for normal batch processing
463            self.current_bar_state = None;
464            None
465        };
466
467        let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); // Heuristic capacity
468        let mut defer_open = false;
469
470        for agg_record in agg_trade_records {
471            // Track price and position for checkpoint
472            self.price_window.push(agg_record.price);
473            self.last_trade_id = Some(agg_record.agg_trade_id);
474            self.last_timestamp_us = agg_record.timestamp;
475
476            // Issue #59: Push trade to history buffer for inter-bar feature computation
477            if let Some(ref mut history) = self.trade_history {
478                history.push(agg_record);
479            }
480
481            if defer_open {
482                // Previous bar closed, this agg_record opens new bar
483                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
484                if let Some(ref mut history) = self.trade_history {
485                    history.on_bar_open(agg_record.timestamp);
486                }
487                current_bar = Some(if self.include_intra_bar_features {
488                    RangeBarState::new_with_trade_accumulation(
489                        agg_record,
490                        self.threshold_decimal_bps,
491                    )
492                } else {
493                    RangeBarState::new(agg_record, self.threshold_decimal_bps)
494                });
495                defer_open = false;
496                continue;
497            }
498
499            match current_bar {
500                None => {
501                    // First bar initialization
502                    // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
503                    if let Some(ref mut history) = self.trade_history {
504                        history.on_bar_open(agg_record.timestamp);
505                    }
506                    current_bar = Some(if self.include_intra_bar_features {
507                        RangeBarState::new_with_trade_accumulation(
508                            agg_record,
509                            self.threshold_decimal_bps,
510                        )
511                    } else {
512                        RangeBarState::new(agg_record, self.threshold_decimal_bps)
513                    });
514                }
515                Some(ref mut bar_state) => {
516                    // Issue #59: Accumulate trade for intra-bar features (before breach check)
517                    if self.include_intra_bar_features {
518                        bar_state.accumulate_trade(agg_record);
519                    }
520
521                    // Check if this AggTrade record breaches the threshold
522                    let price_breaches = bar_state.bar.is_breach(
523                        agg_record.price,
524                        bar_state.upper_threshold,
525                        bar_state.lower_threshold,
526                    );
527
528                    // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
529                    // This eliminates "instant bars" during flash crashes where multiple trades
530                    // occur at the same millisecond.
531                    let timestamp_allows_close = !self.prevent_same_timestamp_close
532                        || agg_record.timestamp != bar_state.bar.open_time;
533
534                    if price_breaches && timestamp_allows_close {
535                        // Breach detected AND timestamp changed - update bar with breaching record
536                        bar_state.bar.update_with_trade(agg_record);
537
538                        // Validation: Ensure high/low include open/close extremes
539                        debug_assert!(
540                            bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
541                        );
542                        debug_assert!(
543                            bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
544                        );
545
546                        // Compute microstructure features at bar finalization (Issue #34)
547                        bar_state.bar.compute_microstructure_features();
548
549                        // Issue #59: Compute inter-bar features from lookback window
550                        if let Some(ref mut history) = self.trade_history {
551                            let inter_bar_features =
552                                history.compute_features(bar_state.bar.open_time);
553                            bar_state.bar.set_inter_bar_features(&inter_bar_features);
554                            // Issue #68: Notify history that bar is closing (resumes normal pruning)
555                            history.on_bar_close();
556                        }
557
558                        // Issue #59: Compute intra-bar features from accumulated trades
559                        if self.include_intra_bar_features {
560                            let intra_bar_features =
561                                compute_intra_bar_features(&bar_state.accumulated_trades);
562                            bar_state.bar.set_intra_bar_features(&intra_bar_features);
563                        }
564
565                        // Move bar out instead of cloning — bar_state borrow ends
566                        // after last use above (NLL), so take() is safe here.
567                        bars.push(current_bar.take().unwrap().bar);
568                        defer_open = true; // Next record will open new bar
569                    } else {
570                        // Either no breach OR same timestamp (gate active) - normal update
571                        bar_state.bar.update_with_trade(agg_record);
572                    }
573                }
574            }
575        }
576
577        // Save current bar state for checkpoint and optionally append incomplete bar.
578        // When include_incomplete=true, clone for checkpoint then consume for output.
579        // When include_incomplete=false, move directly (no clone needed).
580        if include_incomplete {
581            self.current_bar_state = current_bar.clone();
582
583            // Add final partial bar only if explicitly requested
584            // This preserves algorithm integrity: bars should only close on threshold breach
585            if let Some(mut bar_state) = current_bar {
586                // Compute microstructure features for incomplete bar (Issue #34)
587                bar_state.bar.compute_microstructure_features();
588
589                // Issue #59: Compute inter-bar features from lookback window
590                if let Some(ref history) = self.trade_history {
591                    let inter_bar_features = history.compute_features(bar_state.bar.open_time);
592                    bar_state.bar.set_inter_bar_features(&inter_bar_features);
593                }
594
595                // Issue #59: Compute intra-bar features from accumulated trades
596                if self.include_intra_bar_features {
597                    let intra_bar_features =
598                        compute_intra_bar_features(&bar_state.accumulated_trades);
599                    bar_state.bar.set_intra_bar_features(&intra_bar_features);
600                }
601
602                bars.push(bar_state.bar);
603            }
604        } else {
605            // No incomplete bar appended — move ownership directly, no clone needed
606            self.current_bar_state = current_bar;
607        }
608
609        Ok(bars)
610    }
611
612    // === CHECKPOINT METHODS ===
613
614    /// Create checkpoint for cross-file continuation
615    ///
616    /// Captures current processing state for seamless continuation:
617    /// - Incomplete bar (if any) with FIXED thresholds
618    /// - Position tracking (timestamp, trade_id if available)
619    /// - Price hash for verification
620    ///
621    /// # Arguments
622    ///
623    /// * `symbol` - Symbol being processed (e.g., "BTCUSDT", "EURUSD")
624    ///
625    /// # Example
626    ///
627    /// ```ignore
628    /// let bars = processor.process_agg_trade_records(&trades)?;
629    /// let checkpoint = processor.create_checkpoint("BTCUSDT");
630    /// let json = serde_json::to_string(&checkpoint)?;
631    /// std::fs::write("checkpoint.json", json)?;
632    /// ```
633    pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
634        let (incomplete_bar, thresholds) = match &self.current_bar_state {
635            Some(state) => (
636                Some(state.bar.clone()),
637                Some((state.upper_threshold, state.lower_threshold)),
638            ),
639            None => (None, None),
640        };
641
642        let mut checkpoint = Checkpoint::new(
643            symbol.to_string(),
644            self.threshold_decimal_bps,
645            incomplete_bar,
646            thresholds,
647            self.last_timestamp_us,
648            self.last_trade_id,
649            self.price_window.compute_hash(),
650            self.prevent_same_timestamp_close,
651        );
652        // Issue #46: Persist defer_open state for cross-session continuity
653        checkpoint.defer_open = self.defer_open;
654        checkpoint
655    }
656
657    /// Resume processing from checkpoint
658    ///
659    /// Restores incomplete bar state with IMMUTABLE thresholds.
660    /// Next trade continues building the bar until threshold breach.
661    ///
662    /// # Errors
663    ///
664    /// - `CheckpointError::MissingThresholds` - Checkpoint has bar but no thresholds
665    ///
666    /// # Example
667    ///
668    /// ```ignore
669    /// let json = std::fs::read_to_string("checkpoint.json")?;
670    /// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
671    /// let mut processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
672    /// let bars = processor.process_agg_trade_records(&next_file_trades)?;
673    /// ```
674    pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
675        // Issue #62: Validate threshold range before restoring from checkpoint
676        // Valid range: 1-100,000 dbps (0.0001% to 10%)
677        const THRESHOLD_MIN: u32 = 1;
678        const THRESHOLD_MAX: u32 = 100_000;
679        if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
680            || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
681        {
682            return Err(CheckpointError::InvalidThreshold {
683                threshold: checkpoint.threshold_decimal_bps,
684                min_threshold: THRESHOLD_MIN,
685                max_threshold: THRESHOLD_MAX,
686            });
687        }
688
689        // Validate checkpoint consistency
690        if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
691            return Err(CheckpointError::MissingThresholds);
692        }
693
694        // Restore bar state if there's an incomplete bar
695        // Note: accumulated_trades is reset to empty - intra-bar features won't be
696        // accurate for bars resumed from checkpoint (partial trade history lost)
697        let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
698            (Some(bar), Some((upper, lower))) => Some(RangeBarState {
699                bar,
700                upper_threshold: upper,
701                lower_threshold: lower,
702                accumulated_trades: Vec::new(), // Lost on checkpoint - features may be partial
703            }),
704            _ => None,
705        };
706
707        Ok(Self {
708            threshold_decimal_bps: checkpoint.threshold_decimal_bps,
709            current_bar_state,
710            price_window: PriceWindow::new(), // Reset - will be rebuilt from new trades
711            last_trade_id: checkpoint.last_trade_id,
712            last_timestamp_us: checkpoint.last_timestamp_us,
713            anomaly_summary: checkpoint.anomaly_summary,
714            resumed_from_checkpoint: true, // Signal to continue from existing bar state
715            prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
716            defer_open: checkpoint.defer_open, // Issue #46: Restore deferred open state
717            trade_history: None,               // Issue #59: Must be re-enabled after restore
718            inter_bar_config: None,            // Issue #59: Must be re-enabled after restore
719            include_intra_bar_features: false, // Issue #59: Must be re-enabled after restore
720        })
721    }
722
723    /// Verify we're at the right position in the data stream
724    ///
725    /// Call with first trade of new file to verify continuity.
726    /// Returns verification result indicating if there's a gap or exact match.
727    ///
728    /// # Arguments
729    ///
730    /// * `first_trade` - First trade of the new file/chunk
731    ///
732    /// # Example
733    ///
734    /// ```ignore
735    /// let processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
736    /// let verification = processor.verify_position(&next_file_trades[0]);
737    /// match verification {
738    ///     PositionVerification::Exact => println!("Perfect continuation!"),
739    ///     PositionVerification::Gap { missing_count, .. } => {
740    ///         println!("Warning: {} trades missing", missing_count);
741    ///     }
742    ///     PositionVerification::TimestampOnly { gap_ms } => {
743    ///         println!("Exness data: {}ms gap", gap_ms);
744    ///     }
745    /// }
746    /// ```
747    pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
748        match self.last_trade_id {
749            Some(last_id) => {
750                // Binance: has trade IDs - check for gaps
751                let expected_id = last_id + 1;
752                if first_trade.agg_trade_id == expected_id {
753                    PositionVerification::Exact
754                } else {
755                    let missing_count = first_trade.agg_trade_id - expected_id;
756                    PositionVerification::Gap {
757                        expected_id,
758                        actual_id: first_trade.agg_trade_id,
759                        missing_count,
760                    }
761                }
762            }
763            None => {
764                // Exness: no trade IDs - use timestamp only
765                let gap_us = first_trade.timestamp - self.last_timestamp_us;
766                let gap_ms = gap_us / 1000;
767                PositionVerification::TimestampOnly { gap_ms }
768            }
769        }
770    }
771
772    /// Get the current anomaly summary
773    pub fn anomaly_summary(&self) -> &AnomalySummary {
774        &self.anomaly_summary
775    }
776
777    /// Get the threshold in decimal basis points
778    pub fn threshold_decimal_bps(&self) -> u32 {
779        self.threshold_decimal_bps
780    }
781
782    /// Validate that trades are properly sorted for deterministic processing
783    fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
784        for i in 1..trades.len() {
785            let prev = &trades[i - 1];
786            let curr = &trades[i];
787
788            // Check ordering: (timestamp, agg_trade_id) ascending
789            if curr.timestamp < prev.timestamp
790                || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
791            {
792                return Err(ProcessingError::UnsortedTrades {
793                    index: i,
794                    prev_time: prev.timestamp,
795                    prev_id: prev.agg_trade_id,
796                    curr_time: curr.timestamp,
797                    curr_id: curr.agg_trade_id,
798                });
799            }
800        }
801
802        Ok(())
803    }
804
805    /// Reset processor state at an ouroboros boundary (year/month/week).
806    ///
807    /// Clears the incomplete bar and position tracking while preserving
808    /// the threshold configuration. Use this when starting fresh at a
809    /// known boundary for reproducibility.
810    ///
811    /// # Returns
812    ///
813    /// The orphaned incomplete bar (if any) so caller can decide
814    /// whether to include it in results with `is_orphan=True` flag.
815    ///
816    /// # Example
817    ///
818    /// ```ignore
819    /// // At year boundary (Jan 1 00:00:00 UTC)
820    /// let orphaned = processor.reset_at_ouroboros();
821    /// if let Some(bar) = orphaned {
822    ///     // Handle incomplete bar from previous year
823    /// }
824    /// // Continue processing new year's data with clean state
825    /// ```
826    pub fn reset_at_ouroboros(&mut self) -> Option<RangeBar> {
827        let orphaned = self.current_bar_state.take().map(|state| state.bar);
828        self.price_window = PriceWindow::new();
829        self.last_trade_id = None;
830        self.last_timestamp_us = 0;
831        self.resumed_from_checkpoint = false;
832        self.defer_open = false;
833        // Issue #81: Clear bar boundary tracking at ouroboros reset.
834        // Trades are preserved — still valid lookback for first bar of new segment.
835        if let Some(ref mut history) = self.trade_history {
836            history.reset_bar_boundaries();
837        }
838        orphaned
839    }
840}
841
842/// Internal state for a range bar being built
843#[derive(Clone)]
844struct RangeBarState {
845    /// The range bar being constructed
846    pub bar: RangeBar,
847
848    /// Upper breach threshold (FIXED from bar open)
849    pub upper_threshold: FixedPoint,
850
851    /// Lower breach threshold (FIXED from bar open)
852    pub lower_threshold: FixedPoint,
853
854    /// Accumulated trades for intra-bar feature computation (Issue #59)
855    ///
856    /// When intra-bar features are enabled, trades are accumulated here
857    /// during bar construction and used to compute features at bar close.
858    /// Cleared when bar closes to free memory.
859    pub accumulated_trades: Vec<AggTrade>,
860}
861
862impl RangeBarState {
863    /// Create new range bar state from opening trade
864    fn new(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
865        let bar = RangeBar::new(trade);
866
867        // Compute FIXED thresholds from opening price
868        let (upper_threshold, lower_threshold) =
869            bar.open.compute_range_thresholds(threshold_decimal_bps);
870
871        Self {
872            bar,
873            upper_threshold,
874            lower_threshold,
875            accumulated_trades: Vec::new(),
876        }
877    }
878
879    /// Create new range bar state with intra-bar feature accumulation
880    fn new_with_trade_accumulation(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
881        let bar = RangeBar::new(trade);
882
883        // Compute FIXED thresholds from opening price
884        let (upper_threshold, lower_threshold) =
885            bar.open.compute_range_thresholds(threshold_decimal_bps);
886
887        Self {
888            bar,
889            upper_threshold,
890            lower_threshold,
891            accumulated_trades: vec![trade.clone()],
892        }
893    }
894
895    /// Accumulate a trade for intra-bar feature computation
896    fn accumulate_trade(&mut self, trade: &AggTrade) {
897        self.accumulated_trades.push(trade.clone());
898    }
899}
900
901#[cfg(test)]
902mod tests {
903    use super::*;
904    use crate::test_utils::{self, scenarios};
905
906    #[test]
907    fn test_single_bar_no_breach() {
908        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
909
910        // Create trades that stay within 250 dbps threshold
911        let trades = scenarios::no_breach_sequence(250);
912
913        // Test strict algorithm compliance: no bars should be created without breach
914        let bars = processor.process_agg_trade_records(&trades).unwrap();
915        assert_eq!(
916            bars.len(),
917            0,
918            "Strict algorithm should not create bars without breach"
919        );
920
921        // Test analysis mode: incomplete bar should be available for analysis
922        let bars_with_incomplete = processor
923            .process_agg_trade_records_with_incomplete(&trades)
924            .unwrap();
925        assert_eq!(
926            bars_with_incomplete.len(),
927            1,
928            "Analysis mode should include incomplete bar"
929        );
930
931        let bar = &bars_with_incomplete[0];
932        assert_eq!(bar.open.to_string(), "50000.00000000");
933        assert_eq!(bar.high.to_string(), "50100.00000000");
934        assert_eq!(bar.low.to_string(), "49900.00000000");
935        assert_eq!(bar.close.to_string(), "49900.00000000");
936    }
937
938    #[test]
939    fn test_exact_breach_upward() {
940        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
941
942        let trades = scenarios::exact_breach_upward(250);
943
944        // Test strict algorithm: only completed bars (with breach)
945        let bars = processor.process_agg_trade_records(&trades).unwrap();
946        assert_eq!(
947            bars.len(),
948            1,
949            "Strict algorithm should only return completed bars"
950        );
951
952        // First bar should close at breach
953        let bar1 = &bars[0];
954        assert_eq!(bar1.open.to_string(), "50000.00000000");
955        // Breach at 250 dbps = 0.25% = 50000 * 1.0025 = 50125
956        assert_eq!(bar1.close.to_string(), "50125.00000000"); // Breach tick included
957        assert_eq!(bar1.high.to_string(), "50125.00000000");
958        assert_eq!(bar1.low.to_string(), "50000.00000000");
959
960        // Test analysis mode: includes incomplete second bar
961        let bars_with_incomplete = processor
962            .process_agg_trade_records_with_incomplete(&trades)
963            .unwrap();
964        assert_eq!(
965            bars_with_incomplete.len(),
966            2,
967            "Analysis mode should include incomplete bars"
968        );
969
970        // Second bar should start at next tick price (not breach price)
971        let bar2 = &bars_with_incomplete[1];
972        assert_eq!(bar2.open.to_string(), "50500.00000000"); // Next tick after breach
973        assert_eq!(bar2.close.to_string(), "50500.00000000");
974    }
975
976    #[test]
977    fn test_exact_breach_downward() {
978        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
979
980        let trades = scenarios::exact_breach_downward(250);
981
982        let bars = processor.process_agg_trade_records(&trades).unwrap();
983
984        assert_eq!(bars.len(), 1);
985
986        let bar = &bars[0];
987        assert_eq!(bar.open.to_string(), "50000.00000000");
988        assert_eq!(bar.close.to_string(), "49875.00000000"); // Breach tick included
989        assert_eq!(bar.high.to_string(), "50000.00000000");
990        assert_eq!(bar.low.to_string(), "49875.00000000");
991    }
992
993    #[test]
994    fn test_large_gap_single_bar() {
995        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
996
997        let trades = scenarios::large_gap_sequence();
998
999        let bars = processor.process_agg_trade_records(&trades).unwrap();
1000
1001        // Should create exactly ONE bar, not multiple bars to "fill the gap"
1002        assert_eq!(bars.len(), 1);
1003
1004        let bar = &bars[0];
1005        assert_eq!(bar.open.to_string(), "50000.00000000");
1006        assert_eq!(bar.close.to_string(), "51000.00000000");
1007        assert_eq!(bar.high.to_string(), "51000.00000000");
1008        assert_eq!(bar.low.to_string(), "50000.00000000");
1009    }
1010
1011    #[test]
1012    fn test_unsorted_trades_error() {
1013        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1014
1015        let trades = scenarios::unsorted_sequence();
1016
1017        let result = processor.process_agg_trade_records(&trades);
1018        assert!(result.is_err());
1019
1020        match result {
1021            Err(ProcessingError::UnsortedTrades { index, .. }) => {
1022                assert_eq!(index, 1);
1023            }
1024            _ => panic!("Expected UnsortedTrades error"),
1025        }
1026    }
1027
1028    #[test]
1029    fn test_threshold_calculation() {
1030        let processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1031
1032        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1033        let bar_state = RangeBarState::new(&trade, processor.threshold_decimal_bps);
1034
1035        // 50000 * 0.0025 = 125 (25bps = 0.25%)
1036        assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
1037        assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
1038    }
1039
1040    #[test]
1041    fn test_empty_trades() {
1042        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1043        let trades = scenarios::empty_sequence();
1044        let bars = processor.process_agg_trade_records(&trades).unwrap();
1045        assert_eq!(bars.len(), 0);
1046    }
1047
1048    #[test]
1049    fn test_debug_streaming_data() {
1050        let mut processor = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1051
1052        // Create trades similar to our test data
1053        let trades = vec![
1054            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1055            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1056            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1057        ];
1058
1059        println!("Test data prices: 50014 -> 50163 -> 50032");
1060        println!("Expected price movements: +0.3% then -0.26%");
1061
1062        let bars = processor.process_agg_trade_records(&trades).unwrap();
1063        println!("Generated {} range bars", bars.len());
1064
1065        for (i, bar) in bars.iter().enumerate() {
1066            println!(
1067                "  Bar {}: O={} H={} L={} C={}",
1068                i + 1,
1069                bar.open,
1070                bar.high,
1071                bar.low,
1072                bar.close
1073            );
1074        }
1075
1076        // With a 0.1% threshold and 0.3% price movement, we should get at least 1 bar
1077        assert!(
1078            !bars.is_empty(),
1079            "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
1080        );
1081    }
1082
1083    #[test]
1084    fn test_threshold_validation() {
1085        // Valid threshold
1086        assert!(RangeBarProcessor::new(250).is_ok());
1087
1088        // Invalid: too low (0 × 0.1bps = 0%)
1089        assert!(matches!(
1090            RangeBarProcessor::new(0),
1091            Err(ProcessingError::InvalidThreshold {
1092                threshold_decimal_bps: 0
1093            })
1094        ));
1095
1096        // Invalid: too high (150,000 × 0.1bps = 15,000bps = 150%)
1097        assert!(matches!(
1098            RangeBarProcessor::new(150_000),
1099            Err(ProcessingError::InvalidThreshold {
1100                threshold_decimal_bps: 150_000
1101            })
1102        ));
1103
1104        // Valid boundary: minimum (1 × 0.1bps = 0.1bps = 0.001%)
1105        assert!(RangeBarProcessor::new(1).is_ok());
1106
1107        // Valid boundary: maximum (100,000 × 0.1bps = 10,000bps = 100%)
1108        assert!(RangeBarProcessor::new(100_000).is_ok());
1109    }
1110
1111    #[test]
1112    fn test_export_processor_with_manual_trades() {
1113        println!("Testing ExportRangeBarProcessor with same trade data...");
1114
1115        let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1116
1117        // Use same trades as the working basic test
1118        let trades = vec![
1119            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1120            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1121            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1122        ];
1123
1124        println!(
1125            "Processing {} trades with ExportRangeBarProcessor...",
1126            trades.len()
1127        );
1128
1129        export_processor.process_trades_continuously(&trades);
1130        let bars = export_processor.get_all_completed_bars();
1131
1132        println!(
1133            "ExportRangeBarProcessor generated {} range bars",
1134            bars.len()
1135        );
1136        for (i, bar) in bars.iter().enumerate() {
1137            println!(
1138                "  Bar {}: O={} H={} L={} C={}",
1139                i + 1,
1140                bar.open,
1141                bar.high,
1142                bar.low,
1143                bar.close
1144            );
1145        }
1146
1147        // Should match the basic processor results (1 bar)
1148        assert!(
1149            !bars.is_empty(),
1150            "ExportRangeBarProcessor should generate same results as basic processor"
1151        );
1152    }
1153
1154    // === CHECKPOINT TESTS (Issues #2 and #3) ===
1155
1156    #[test]
1157    fn test_checkpoint_creation() {
1158        let mut processor = RangeBarProcessor::new(250).unwrap();
1159
1160        // Process some trades that don't complete a bar
1161        let trades = scenarios::no_breach_sequence(250);
1162        let _bars = processor.process_agg_trade_records(&trades).unwrap();
1163
1164        // Create checkpoint
1165        let checkpoint = processor.create_checkpoint("BTCUSDT");
1166
1167        assert_eq!(checkpoint.symbol, "BTCUSDT");
1168        assert_eq!(checkpoint.threshold_decimal_bps, 250);
1169        assert!(checkpoint.has_incomplete_bar()); // Should have incomplete bar
1170        assert!(checkpoint.thresholds.is_some()); // Thresholds should be saved
1171        assert!(checkpoint.last_trade_id.is_some()); // Should track last trade
1172    }
1173
1174    #[test]
1175    fn test_checkpoint_serialization_roundtrip() {
1176        let mut processor = RangeBarProcessor::new(250).unwrap();
1177
1178        // Process trades
1179        let trades = scenarios::no_breach_sequence(250);
1180        let _bars = processor.process_agg_trade_records(&trades).unwrap();
1181
1182        // Create checkpoint
1183        let checkpoint = processor.create_checkpoint("BTCUSDT");
1184
1185        // Serialize to JSON
1186        let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
1187
1188        // Deserialize back
1189        let restored: Checkpoint =
1190            serde_json::from_str(&json).expect("Deserialization should succeed");
1191
1192        assert_eq!(restored.symbol, checkpoint.symbol);
1193        assert_eq!(
1194            restored.threshold_decimal_bps,
1195            checkpoint.threshold_decimal_bps
1196        );
1197        assert_eq!(
1198            restored.incomplete_bar.is_some(),
1199            checkpoint.incomplete_bar.is_some()
1200        );
1201    }
1202
1203    #[test]
1204    fn test_cross_file_bar_continuation() {
1205        // This is the PRIMARY test for Issues #2 and #3
1206        // Verifies that incomplete bars continue correctly across file boundaries
1207
1208        // Create trades that span multiple bars
1209        let mut all_trades = Vec::new();
1210
1211        // Generate enough trades to produce multiple bars
1212        // Using 100bps threshold (1%) for clearer price movements
1213        let base_timestamp = 1640995200000000i64; // Microseconds
1214
1215        // Create a sequence where we'll have ~3-4 completed bars with remainder
1216        for i in 0..20 {
1217            let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
1218            let trade = test_utils::create_test_agg_trade(
1219                i + 1,
1220                &format!("{:.8}", price),
1221                "1.0",
1222                base_timestamp + (i * 1000000),
1223            );
1224            all_trades.push(trade);
1225        }
1226
1227        // === FULL PROCESSING (baseline) ===
1228        let mut processor_full = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1229        let bars_full = processor_full
1230            .process_agg_trade_records(&all_trades)
1231            .unwrap();
1232
1233        // === SPLIT PROCESSING WITH CHECKPOINT ===
1234        let split_point = 10; // Split in the middle
1235
1236        // Part 1: Process first half
1237        let mut processor_1 = RangeBarProcessor::new(100).unwrap();
1238        let part1_trades = &all_trades[0..split_point];
1239        let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
1240
1241        // Create checkpoint
1242        let checkpoint = processor_1.create_checkpoint("TEST");
1243
1244        // Part 2: Resume from checkpoint and process second half
1245        let mut processor_2 = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
1246        let part2_trades = &all_trades[split_point..];
1247        let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
1248
1249        // === VERIFY CONTINUATION ===
1250        // Total completed bars should match full processing
1251        let split_total = bars_1.len() + bars_2.len();
1252
1253        println!("Full processing: {} bars", bars_full.len());
1254        println!(
1255            "Split processing: {} + {} = {} bars",
1256            bars_1.len(),
1257            bars_2.len(),
1258            split_total
1259        );
1260
1261        assert_eq!(
1262            split_total,
1263            bars_full.len(),
1264            "Split processing should produce same bar count as full processing"
1265        );
1266
1267        // Verify the bars themselves match
1268        let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
1269        for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
1270            assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
1271            assert_eq!(
1272                full.close.0, split.close.0,
1273                "Bar {} close price mismatch",
1274                i
1275            );
1276        }
1277    }
1278
1279    #[test]
1280    fn test_verify_position_exact() {
1281        let mut processor = RangeBarProcessor::new(250).unwrap();
1282
1283        // Process some trades
1284        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1285        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1286
1287        let _ = processor.process_single_trade(trade1);
1288        let _ = processor.process_single_trade(trade2);
1289
1290        // Create next trade in sequence
1291        let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
1292
1293        // Verify position
1294        let verification = processor.verify_position(&next_trade);
1295
1296        assert_eq!(verification, PositionVerification::Exact);
1297    }
1298
1299    #[test]
1300    fn test_verify_position_gap() {
1301        let mut processor = RangeBarProcessor::new(250).unwrap();
1302
1303        // Process some trades
1304        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1305        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1306
1307        let _ = processor.process_single_trade(trade1);
1308        let _ = processor.process_single_trade(trade2);
1309
1310        // Create next trade with gap (skip IDs 102-104)
1311        let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
1312
1313        // Verify position
1314        let verification = processor.verify_position(&next_trade);
1315
1316        match verification {
1317            PositionVerification::Gap {
1318                expected_id,
1319                actual_id,
1320                missing_count,
1321            } => {
1322                assert_eq!(expected_id, 102);
1323                assert_eq!(actual_id, 105);
1324                assert_eq!(missing_count, 3);
1325            }
1326            _ => panic!("Expected Gap verification, got {:?}", verification),
1327        }
1328    }
1329
1330    #[test]
1331    fn test_checkpoint_clean_completion() {
1332        // Test when last trade completes a bar with no remainder
1333        // In range bar algorithm: breach trade closes bar, NEXT trade opens new bar
1334        // If there's no next trade, there's no incomplete bar
1335        let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
1336
1337        // Create trades that complete exactly one bar
1338        let trades = vec![
1339            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1340            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // ~0.2% move, breaches 0.1%
1341        ];
1342
1343        let bars = processor.process_agg_trade_records(&trades).unwrap();
1344        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1345
1346        // Create checkpoint - should NOT have incomplete bar
1347        // (breach trade closes bar, no next trade to open new bar)
1348        let checkpoint = processor.create_checkpoint("TEST");
1349
1350        // With defer_open logic, the next bar isn't started until the next trade
1351        assert!(
1352            !checkpoint.has_incomplete_bar(),
1353            "No incomplete bar when last trade was a breach with no following trade"
1354        );
1355    }
1356
1357    #[test]
1358    fn test_checkpoint_with_remainder() {
1359        // Test when we have trades remaining after a completed bar
1360        let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
1361
1362        // Create trades: bar completes at trade 2, trade 3 starts new bar
1363        let trades = vec![
1364            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1365            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // Breach
1366            test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000), // Opens new bar
1367        ];
1368
1369        let bars = processor.process_agg_trade_records(&trades).unwrap();
1370        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1371
1372        // Create checkpoint - should have incomplete bar from trade 3
1373        let checkpoint = processor.create_checkpoint("TEST");
1374
1375        assert!(
1376            checkpoint.has_incomplete_bar(),
1377            "Should have incomplete bar from trade 3"
1378        );
1379
1380        // Verify the incomplete bar has correct data
1381        let incomplete = checkpoint.incomplete_bar.unwrap();
1382        assert_eq!(
1383            incomplete.open.to_string(),
1384            "50110.00000000",
1385            "Incomplete bar should open at trade 3 price"
1386        );
1387    }
1388
1389    /// Issue #46: Verify streaming and batch paths produce identical bars
1390    ///
1391    /// The batch path (`process_agg_trade_records`) and streaming path
1392    /// (`process_single_trade`) must produce identical OHLCV output for
1393    /// the same input trades. This test catches regressions where the
1394    /// breaching trade is double-counted or bar boundaries differ.
1395    #[test]
1396    fn test_streaming_batch_parity() {
1397        let threshold = 250; // 250 dbps = 0.25%
1398
1399        // Build a sequence with multiple breaches
1400        let trades = test_utils::AggTradeBuilder::new()
1401            .add_trade(1, 1.0, 0) // Open first bar at 50000
1402            .add_trade(2, 1.001, 1000) // +0.1% - accumulate
1403            .add_trade(3, 1.003, 2000) // +0.3% - breach (>0.25%)
1404            .add_trade(4, 1.004, 3000) // Opens second bar
1405            .add_trade(5, 1.005, 4000) // Accumulate
1406            .add_trade(6, 1.008, 5000) // +0.4% from bar 2 open - breach
1407            .add_trade(7, 1.009, 6000) // Opens third bar
1408            .build();
1409
1410        // === BATCH PATH ===
1411        let mut batch_processor = RangeBarProcessor::new(threshold).unwrap();
1412        let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
1413        let batch_incomplete = batch_processor.get_incomplete_bar();
1414
1415        // === STREAMING PATH ===
1416        let mut stream_processor = RangeBarProcessor::new(threshold).unwrap();
1417        let mut stream_bars: Vec<RangeBar> = Vec::new();
1418        for trade in &trades {
1419            if let Some(bar) = stream_processor
1420                .process_single_trade(trade.clone())
1421                .unwrap()
1422            {
1423                stream_bars.push(bar);
1424            }
1425        }
1426        let stream_incomplete = stream_processor.get_incomplete_bar();
1427
1428        // === VERIFY PARITY ===
1429        assert_eq!(
1430            batch_bars.len(),
1431            stream_bars.len(),
1432            "Batch and streaming should produce same number of completed bars"
1433        );
1434
1435        for (i, (batch_bar, stream_bar)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1436            assert_eq!(
1437                batch_bar.open, stream_bar.open,
1438                "Bar {i}: open price mismatch"
1439            );
1440            assert_eq!(
1441                batch_bar.close, stream_bar.close,
1442                "Bar {i}: close price mismatch"
1443            );
1444            assert_eq!(
1445                batch_bar.high, stream_bar.high,
1446                "Bar {i}: high price mismatch"
1447            );
1448            assert_eq!(batch_bar.low, stream_bar.low, "Bar {i}: low price mismatch");
1449            assert_eq!(
1450                batch_bar.volume, stream_bar.volume,
1451                "Bar {i}: volume mismatch (double-counting?)"
1452            );
1453            assert_eq!(
1454                batch_bar.open_time, stream_bar.open_time,
1455                "Bar {i}: open_time mismatch"
1456            );
1457            assert_eq!(
1458                batch_bar.close_time, stream_bar.close_time,
1459                "Bar {i}: close_time mismatch"
1460            );
1461            assert_eq!(
1462                batch_bar.individual_trade_count, stream_bar.individual_trade_count,
1463                "Bar {i}: trade count mismatch"
1464            );
1465        }
1466
1467        // Verify incomplete bars match
1468        match (batch_incomplete, stream_incomplete) {
1469            (Some(b), Some(s)) => {
1470                assert_eq!(b.open, s.open, "Incomplete bar: open mismatch");
1471                assert_eq!(b.close, s.close, "Incomplete bar: close mismatch");
1472                assert_eq!(b.volume, s.volume, "Incomplete bar: volume mismatch");
1473            }
1474            (None, None) => {} // Both finished cleanly
1475            _ => panic!("Incomplete bar presence mismatch between batch and streaming"),
1476        }
1477    }
1478
1479    /// Issue #46: After breach, next trade opens new bar (not breaching trade)
1480    #[test]
1481    fn test_defer_open_new_bar_opens_with_next_trade() {
1482        let mut processor = RangeBarProcessor::new(250).unwrap();
1483
1484        // Trade 1: Opens bar at 50000
1485        let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1486        assert!(processor.process_single_trade(t1).unwrap().is_none());
1487
1488        // Trade 2: Breaches threshold (+0.3%)
1489        let t2 = test_utils::create_test_agg_trade(2, "50150.0", "2.0", 2000);
1490        let bar = processor.process_single_trade(t2).unwrap();
1491        assert!(bar.is_some(), "Should close bar on breach");
1492
1493        let closed_bar = bar.unwrap();
1494        assert_eq!(closed_bar.open.to_string(), "50000.00000000");
1495        assert_eq!(closed_bar.close.to_string(), "50150.00000000");
1496
1497        // After breach, no incomplete bar should exist
1498        assert!(
1499            processor.get_incomplete_bar().is_none(),
1500            "No incomplete bar after breach - defer_open is true"
1501        );
1502
1503        // Trade 3: Should open NEW bar (not the breaching trade)
1504        let t3 = test_utils::create_test_agg_trade(3, "50100.0", "3.0", 3000);
1505        assert!(processor.process_single_trade(t3).unwrap().is_none());
1506
1507        let incomplete = processor.get_incomplete_bar().unwrap();
1508        assert_eq!(
1509            incomplete.open.to_string(),
1510            "50100.00000000",
1511            "New bar should open at trade 3's price, not trade 2's"
1512        );
1513    }
1514
1515    // === Memory efficiency tests (R1/R2/R3) ===
1516
1517    #[test]
1518    fn test_bar_close_take_single_trade() {
1519        // R1: Verify bar close via single-trade path produces correct OHLCV after
1520        // clone→take optimization. Uses single_breach_sequence that triggers breach.
1521        let mut processor = RangeBarProcessor::new(250).unwrap();
1522        let trades = scenarios::single_breach_sequence(250);
1523
1524        for trade in &trades[..trades.len() - 1] {
1525            let result = processor.process_single_trade(trade.clone()).unwrap();
1526            assert!(result.is_none());
1527        }
1528
1529        // Last trade triggers breach
1530        let bar = processor
1531            .process_single_trade(trades.last().unwrap().clone())
1532            .unwrap()
1533            .expect("Should produce completed bar");
1534
1535        // Verify OHLCV integrity after take() optimization
1536        assert_eq!(bar.open.to_string(), "50000.00000000");
1537        assert!(bar.high >= bar.open.max(bar.close));
1538        assert!(bar.low <= bar.open.min(bar.close));
1539        assert!(bar.volume > 0);
1540
1541        // Verify processor state is clean after bar close
1542        assert!(processor.get_incomplete_bar().is_none());
1543    }
1544
1545    #[test]
1546    fn test_bar_close_take_batch() {
1547        // R2: Verify batch path produces correct bars after clone→take optimization.
1548        // large_sequence generates enough trades to trigger multiple breaches.
1549        let mut processor = RangeBarProcessor::new(250).unwrap();
1550        let trades = scenarios::large_sequence(500);
1551
1552        let bars = processor.process_agg_trade_records(&trades).unwrap();
1553        assert!(
1554            !bars.is_empty(),
1555            "Should produce at least one completed bar"
1556        );
1557
1558        // Verify every bar has valid OHLCV invariants
1559        for bar in &bars {
1560            assert!(bar.high >= bar.open.max(bar.close));
1561            assert!(bar.low <= bar.open.min(bar.close));
1562            assert!(bar.volume > 0);
1563            assert!(bar.close_time >= bar.open_time);
1564        }
1565    }
1566
1567    #[test]
1568    fn test_checkpoint_conditional_clone() {
1569        // R3: Verify checkpoint state is preserved correctly with both
1570        // include_incomplete=true and include_incomplete=false.
1571        let trades = scenarios::no_breach_sequence(250);
1572
1573        // Test with include_incomplete=false (move, no clone)
1574        let mut processor1 = RangeBarProcessor::new(250).unwrap();
1575        let bars_without = processor1.process_agg_trade_records(&trades).unwrap();
1576        assert_eq!(bars_without.len(), 0);
1577        // Checkpoint should be preserved
1578        assert!(processor1.get_incomplete_bar().is_some());
1579
1580        // Test with include_incomplete=true (clone + consume)
1581        let mut processor2 = RangeBarProcessor::new(250).unwrap();
1582        let bars_with = processor2
1583            .process_agg_trade_records_with_incomplete(&trades)
1584            .unwrap();
1585        assert_eq!(bars_with.len(), 1);
1586        // Checkpoint should ALSO be preserved (cloned before consume)
1587        assert!(processor2.get_incomplete_bar().is_some());
1588
1589        // Both checkpoints should have identical bar content
1590        let cp1 = processor1.get_incomplete_bar().unwrap();
1591        let cp2 = processor2.get_incomplete_bar().unwrap();
1592        assert_eq!(cp1.open, cp2.open);
1593        assert_eq!(cp1.close, cp2.close);
1594        assert_eq!(cp1.high, cp2.high);
1595        assert_eq!(cp1.low, cp2.low);
1596    }
1597}