Skip to main content

opendeviationbar_core/
processor.rs

1// FILE-SIZE-OK: 1184 lines — tests extracted to tests/processor_tests.rs
2//! Core open deviation bar processing algorithm
3//!
4//! Implements non-lookahead bias open deviation bar construction where bars close when
5//! price moves ±threshold dbps from the bar's OPEN price.
6
7use crate::checkpoint::{
8    AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; // Issue #59
12// Issue #59: Intra-bar features - using compute_intra_bar_features_with_config (Issue #128)
13use crate::types::{AggTrade, OpenDeviationBar};
14#[cfg(feature = "python")]
15use pyo3::prelude::*;
16use smallvec::SmallVec; // Issue #119: Trade accumulation with inline buffer (512 slots ≈ 29KB)
17// Re-export ProcessingError from errors.rs (Phase 2a extraction)
18pub use crate::errors::ProcessingError;
19// Re-export ExportOpenDeviationBarProcessor from export_processor.rs (Phase 2d extraction)
20pub use crate::export_processor::ExportOpenDeviationBarProcessor;
21
/// Open deviation bar processor with non-lookahead bias guarantee.
///
/// Holds the threshold configuration, the bar currently being formed (if any),
/// checkpoint/position tracking data, and the optional inter-bar / intra-bar
/// feature configuration used when a bar is finalized.
pub struct OpenDeviationBarProcessor {
    /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
    threshold_decimal_bps: u32,

    /// Issue #96 Task #98: Pre-computed threshold ratio for fast delta calculation
    ///
    /// Stores `(threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE` as fixed-point,
    /// computed once in `with_options()`.
    /// This allows compute_range_thresholds() to compute delta = (price * ratio) / SCALE
    /// without repeated division, avoiding i128 arithmetic in hot path.
    /// Performance: Eliminates BASIS_POINTS_SCALE division on every bar creation.
    /// Made public for testing purposes.
    pub threshold_ratio: i64,

    /// Current bar state for streaming processing (Q19)
    ///
    /// Enables get_incomplete_bar() and stateful process_single_trade().
    /// `None` until the first trade arrives, and again right after a bar completes.
    current_bar_state: Option<OpenDeviationBarState>,

    /// Price window for checkpoint hash verification
    ///
    /// Every processed trade's price is pushed into this window.
    price_window: PriceWindow,

    /// Last processed trade ID (for gap detection on resume)
    ///
    /// Updated on every processed trade; `None` for a fresh processor.
    last_trade_id: Option<i64>,

    /// Last completed bar's trade ID for dedup floor initialization (v1.4)
    ///
    /// Updated ONLY on bar completion (process_single_trade returns Some) and
    /// orphan emission (reset_at_ouroboros). NOT updated on every trade.
    /// This gives committed_floors a floor that excludes the forming bar tail,
    /// preventing suppression of bars at the REST-to-WS junction.
    last_completed_bar_tid: Option<i64>,

    /// Last processed timestamp (for position verification)
    ///
    /// Updated on every processed trade; initialized to 0.
    last_timestamp_us: i64,

    /// Anomaly tracking for debugging
    anomaly_summary: AnomalySummary,

    /// Flag indicating this processor was created from a checkpoint
    ///
    /// When true, process_agg_trade_records will continue from existing bar state.
    /// The batch path consumes the flag (resets it to false) when it reads it.
    resumed_from_checkpoint: bool,

    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
    ///
    /// When true (default): A bar cannot close until a trade arrives with a
    /// different timestamp than the bar's open_time. This prevents "instant bars"
    /// during flash crashes where multiple trades share the same timestamp.
    ///
    /// When false: Legacy behavior - bars can close on any breach regardless
    /// of timestamp, which may produce bars with identical timestamps.
    prevent_same_timestamp_close: bool,

    /// Deferred bar open flag (Issue #46)
    ///
    /// When true: The previous trade triggered a threshold breach and closed a bar.
    /// The next trade arriving via `process_single_trade()` should open a new bar
    /// instead of being treated as a continuation.
    ///
    /// This matches the batch path's `defer_open` semantics in
    /// `process_agg_trade_records()` where the breaching trade closes the current
    /// bar and the NEXT trade opens the new bar.
    defer_open: bool,

    /// Trade history for inter-bar feature computation (Issue #59)
    ///
    /// Ring buffer of recent trades for computing lookback-based features.
    /// When Some, features are computed from trades BEFORE each bar's open_time.
    /// When None, inter-bar features are disabled (all lookback_* fields = None).
    trade_history: Option<TradeHistory>,

    /// Configuration for inter-bar features (Issue #59)
    ///
    /// Controls lookback mode (fixed count or time window) and which feature
    /// tiers to compute. When None, inter-bar features are disabled.
    inter_bar_config: Option<InterBarConfig>,

    /// Enable intra-bar feature computation (Issue #59)
    ///
    /// When true, the processor accumulates trades during bar construction
    /// and computes 22 features from trades WITHIN each bar at bar close.
    /// Features include ITH (Investment Time Horizon), statistical, and
    /// complexity metrics. When false, all intra_* fields are None.
    include_intra_bar_features: bool,

    /// Issue #128: Configuration for intra-bar feature computation.
    ///
    /// Controls which complexity features (Hurst, PE) are computed.
    /// Only consulted when `include_intra_bar_features` is true.
    intra_bar_config: crate::intrabar::IntraBarConfig,

    /// Issue #112: Maximum timestamp gap in microseconds before discarding a forming bar
    ///
    /// When resuming from checkpoint with a forming bar, if the gap between
    /// the forming bar's close_time and the first incoming trade exceeds this
    /// threshold, the forming bar is discarded as an orphan (same as ouroboros reset).
    /// This prevents "oversized" bars caused by large data gaps (e.g., 38-hour outages).
    ///
    /// Default: 3,600,000,000 μs (1 hour)
    max_gap_us: i64,
}
119
120/// Cold path: scan trades to find first unsorted pair and return error
121/// Extracted from validate_trade_ordering() to improve hot-path code layout
122#[cold]
123#[inline(never)]
124fn find_unsorted_trade(trades: &[AggTrade]) -> Result<(), ProcessingError> {
125    for i in 1..trades.len() {
126        let prev = &trades[i - 1];
127        let curr = &trades[i];
128        if curr.timestamp < prev.timestamp
129            || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
130        {
131            return Err(ProcessingError::UnsortedTrades {
132                index: i,
133                prev_time: prev.timestamp,
134                prev_id: prev.agg_trade_id,
135                curr_time: curr.timestamp,
136                curr_id: curr.agg_trade_id,
137            });
138        }
139    }
140    Ok(())
141}
142
143/// Cold path: construct unsorted trade error
144/// Extracted to keep error construction out of the hot validation loop
145#[cold]
146#[inline(never)]
147fn unsorted_trade_error(
148    index: usize,
149    prev: &AggTrade,
150    curr: &AggTrade,
151) -> Result<(), ProcessingError> {
152    Err(ProcessingError::UnsortedTrades {
153        index,
154        prev_time: prev.timestamp,
155        prev_id: prev.agg_trade_id,
156        curr_time: curr.timestamp,
157        curr_id: curr.agg_trade_id,
158    })
159}
160
161impl OpenDeviationBarProcessor {
    /// Create new processor with given threshold
    ///
    /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36).
    /// Equivalent to `with_options(threshold_decimal_bps, true)`.
    ///
    /// # Arguments
    ///
    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
    ///   - Example: `250` → 25bps = 0.25%
    ///   - Example: `10` → 1bps = 0.01%
    ///   - Minimum: `1` → 0.1bps = 0.001%
    ///
    /// # Errors
    ///
    /// Returns `ProcessingError::InvalidThreshold` when `threshold_decimal_bps`
    /// is outside the range accepted by `with_options()` (1 to 100,000 inclusive).
    ///
    /// # Breaking Change (v3.0.0)
    ///
    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
    /// **Migration**: Multiply all threshold values by 10.
    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
        Self::with_options(threshold_decimal_bps, true)
    }
180
181    /// Create new processor with explicit timestamp gating control
182    ///
183    /// # Arguments
184    ///
185    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
186    /// * `prevent_same_timestamp_close` - If true, bars cannot close until
187    ///   timestamp advances from open_time. This prevents "instant bars" during
188    ///   flash crashes. Set to false for legacy behavior (pre-v9).
189    ///
190    /// # Example
191    ///
192    /// ```ignore
193    /// // Default behavior (v9+): timestamp gating enabled
194    /// let processor = OpenDeviationBarProcessor::new(250)?;
195    ///
196    /// // Legacy behavior: allow instant bars
197    /// let processor = OpenDeviationBarProcessor::with_options(250, false)?;
198    /// ```
199    pub fn with_options(
200        threshold_decimal_bps: u32,
201        prevent_same_timestamp_close: bool,
202    ) -> Result<Self, ProcessingError> {
203        // Validation bounds (v3.0.0: dbps units)
204        // Min: 1 dbps = 0.001%
205        // Max: 100,000 dbps = 100%
206        if threshold_decimal_bps < 1 {
207            return Err(ProcessingError::InvalidThreshold {
208                threshold_decimal_bps,
209            });
210        }
211        if threshold_decimal_bps > 100_000 {
212            return Err(ProcessingError::InvalidThreshold {
213                threshold_decimal_bps,
214            });
215        }
216
217        // Issue #96 Task #98: Pre-compute threshold ratio
218        // Ratio = (threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE
219        // This is used in compute_range_thresholds() for fast delta calculation
220        let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
221            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
222
223        Ok(Self {
224            threshold_decimal_bps,
225            threshold_ratio,
226            current_bar_state: None,
227            price_window: PriceWindow::new(),
228            last_trade_id: None,
229            last_completed_bar_tid: None,
230            last_timestamp_us: 0,
231            anomaly_summary: AnomalySummary::default(),
232            resumed_from_checkpoint: false,
233            prevent_same_timestamp_close,
234            defer_open: false,
235            trade_history: None,               // Issue #59: disabled by default
236            inter_bar_config: None,            // Issue #59: disabled by default
237            include_intra_bar_features: false, // Issue #59: disabled by default
238            intra_bar_config: crate::intrabar::IntraBarConfig::default(), // Issue #128
239            max_gap_us: 3_600_000_000,         // Issue #112: 1 hour default
240        })
241    }
242
    /// Get the prevent_same_timestamp_close setting
    ///
    /// Returns true when the Issue #36 timestamp gate is active, i.e. a bar may
    /// not close on a trade whose timestamp equals the bar's open_time.
    pub fn prevent_same_timestamp_close(&self) -> bool {
        self.prevent_same_timestamp_close
    }
247
    /// Enable inter-bar feature computation with the given configuration (Issue #59)
    ///
    /// When enabled, the processor maintains a trade history buffer and computes
    /// lookback-based microstructure features on each bar close. Features are
    /// computed from trades that occurred BEFORE each bar's open_time, ensuring
    /// no lookahead bias.
    ///
    /// Uses a local entropy cache (default behavior, backward compatible):
    /// this forwards to `with_inter_bar_config_and_cache()` with no external cache.
    /// For multi-symbol workloads, use `with_inter_bar_config_and_cache()` with a global cache.
    ///
    /// # Arguments
    ///
    /// * `config` - Configuration controlling lookback mode and feature tiers
    ///
    /// # Example
    ///
    /// ```ignore
    /// use opendeviationbar_core::processor::OpenDeviationBarProcessor;
    /// use opendeviationbar_core::interbar::{InterBarConfig, LookbackMode};
    ///
    /// let processor = OpenDeviationBarProcessor::new(1000)?
    ///     .with_inter_bar_config(InterBarConfig {
    ///         lookback_mode: LookbackMode::FixedCount(500),
    ///         compute_tier2: true,
    ///         compute_tier3: true,
    ///         ..Default::default()
    ///     });
    /// ```
    pub fn with_inter_bar_config(self, config: InterBarConfig) -> Self {
        self.with_inter_bar_config_and_cache(config, None)
    }
279
280    /// Enable inter-bar feature computation with optional external entropy cache
281    ///
282    /// Issue #145 Phase 3: Multi-Symbol Entropy Cache Sharing
283    ///
284    /// # Arguments
285    ///
286    /// * `config` - Configuration controlling lookback mode and feature tiers
287    /// * `external_cache` - Optional shared entropy cache from `get_global_entropy_cache()`
288    ///   - If provided: Uses the shared global cache (recommended for multi-symbol)
289    ///   - If None: Creates a local 128-entry cache (default, backward compatible)
290    ///
291    /// # Usage
292    ///
293    /// ```ignore
294    /// use opendeviationbar_core::{processor::OpenDeviationBarProcessor, entropy_cache_global::get_global_entropy_cache, interbar::InterBarConfig};
295    ///
296    /// // Single-symbol: use local cache (default)
297    /// let processor = OpenDeviationBarProcessor::new(1000)?
298    ///     .with_inter_bar_config(config);
299    ///
300    /// // Multi-symbol: share global cache
301    /// let global_cache = get_global_entropy_cache();
302    /// let processor = OpenDeviationBarProcessor::new(1000)?
303    ///     .with_inter_bar_config_and_cache(config, Some(global_cache));
304    /// ```
305    pub fn with_inter_bar_config_and_cache(
306        mut self,
307        config: InterBarConfig,
308        external_cache: Option<
309            std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>,
310        >,
311    ) -> Self {
312        self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
313        self.inter_bar_config = Some(config);
314        self
315    }
316
    /// Check if inter-bar features are enabled
    ///
    /// True once an `InterBarConfig` has been installed on this processor.
    pub fn inter_bar_enabled(&self) -> bool {
        self.inter_bar_config.is_some()
    }
321
    /// Issue #112: Configure maximum timestamp gap for checkpoint recovery
    ///
    /// When resuming from checkpoint with a forming bar, if the gap between
    /// the forming bar's close_time and the first incoming trade exceeds this
    /// threshold, the forming bar is discarded as an orphan.
    ///
    /// The value is stored as given; no range validation is performed here.
    ///
    /// # Arguments
    ///
    /// * `max_gap_us` - Maximum gap in microseconds (default: 3,600,000,000 = 1 hour)
    pub fn with_max_gap(mut self, max_gap_us: i64) -> Self {
        self.max_gap_us = max_gap_us;
        self
    }
335
    /// Get the maximum gap threshold in microseconds
    ///
    /// This is the Issue #112 limit used to decide whether a forming bar
    /// restored from a checkpoint is discarded as an orphan.
    pub fn max_gap_us(&self) -> i64 {
        self.max_gap_us
    }
340
341    /// Enable intra-bar feature computation (Issue #59)
342    ///
343    /// When enabled, the processor accumulates trades during bar construction
344    /// and computes 22 features from trades WITHIN each bar at bar close:
345    /// - 8 ITH features (Investment Time Horizon)
346    /// - 12 statistical features (OFI, intensity, Kyle lambda, etc.)
347    /// - 2 complexity features (Hurst exponent, permutation entropy)
348    ///
349    /// # Memory Note
350    ///
351    /// Trades are accumulated per-bar and freed when the bar closes.
352    /// Typical 1000 dbps bar: ~50-500 trades, ~2-24 KB overhead.
353    ///
354    /// # Example
355    ///
356    /// ```ignore
357    /// let processor = OpenDeviationBarProcessor::new(1000)?
358    ///     .with_intra_bar_features();
359    /// ```
360    pub fn with_intra_bar_features(mut self) -> Self {
361        self.include_intra_bar_features = true;
362        self
363    }
364
    /// Check if intra-bar features are enabled
    ///
    /// Reflects the flag set by `with_intra_bar_features()` or
    /// `set_intra_bar_features()`.
    pub fn intra_bar_enabled(&self) -> bool {
        self.include_intra_bar_features
    }
369
    /// Re-enable inter-bar features on an existing processor (Issue #97).
    ///
    /// Used after `from_checkpoint()` to restore microstructure config that
    /// is not preserved in checkpoint state. Uses a local entropy cache by default:
    /// this forwards to `set_inter_bar_config_with_cache()` with no external cache.
    /// For multi-symbol workloads, use `set_inter_bar_config_with_cache()` with a global cache.
    pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
        self.set_inter_bar_config_with_cache(config, None);
    }
378
379    /// Re-enable inter-bar features with optional external entropy cache (Issue #145 Phase 3).
380    ///
381    /// Used after `from_checkpoint()` to restore microstructure config that
382    /// is not preserved in checkpoint state. Allows specifying a shared entropy cache
383    /// for multi-symbol processors.
384    pub fn set_inter_bar_config_with_cache(
385        &mut self,
386        config: InterBarConfig,
387        external_cache: Option<
388            std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>,
389        >,
390    ) {
391        self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
392        self.inter_bar_config = Some(config);
393    }
394
    /// Enable or disable intra-bar features on an existing processor (Issue #97).
    ///
    /// Pass `true` to re-enable intra-bar feature computation (e.g. after
    /// restoring a processor), or `false` to turn it off.
    pub fn set_intra_bar_features(&mut self, enabled: bool) {
        self.include_intra_bar_features = enabled;
    }
399
400    /// Issue #128: Set intra-bar feature configuration.
401    /// Controls which complexity features (Hurst, PE) are computed.
402    pub fn with_intra_bar_config(mut self, config: crate::intrabar::IntraBarConfig) -> Self {
403        self.intra_bar_config = config;
404        self
405    }
406
    /// Issue #128: Set intra-bar feature configuration on existing processor.
    ///
    /// Overwrites the stored `IntraBarConfig`; it does not by itself toggle
    /// whether intra-bar features are enabled.
    pub fn set_intra_bar_config(&mut self, config: crate::intrabar::IntraBarConfig) {
        self.intra_bar_config = config;
    }
411
412    /// Process a single trade and return completed bar if any
413    ///
414    /// Maintains internal state for streaming use case. State persists across calls
415    /// until a bar completes (threshold breach), enabling get_incomplete_bar().
416    ///
417    /// # Arguments
418    ///
419    /// * `trade` - Single aggregated trade to process
420    ///
421    /// # Returns
422    ///
423    /// `Some(OpenDeviationBar)` if a bar was completed, `None` otherwise
424    ///
425    /// # State Management
426    ///
427    /// - First trade: Initializes new bar state
428    /// - Subsequent trades: Updates existing bar or closes on breach
429    /// - Breach: Returns completed bar, starts new bar with breaching trade
430    ///
431    /// Issue #96 Task #78: Accept borrowed AggTrade to eliminate clones in fan-out loops.
432    /// Streaming pipelines (4+ thresholds) were cloning ~57 byte trades per processor.
433    /// Signature change to `&AggTrade` eliminates 4-8x unnecessary allocations.
434    /// Issue #96 Task #84: `#[inline]` — main hot-path entry point called on every trade.
435    #[inline]
436    pub fn process_single_trade(
437        &mut self,
438        trade: &AggTrade,
439    ) -> Result<Option<OpenDeviationBar>, ProcessingError> {
440        // Track price and position for checkpoint
441        self.price_window.push(trade.price);
442        self.last_trade_id = Some(trade.agg_trade_id);
443        self.last_timestamp_us = trade.timestamp;
444
445        // Issue #59: Push trade to history buffer for inter-bar feature computation
446        // This must happen BEFORE bar processing so lookback window includes recent trades
447        if let Some(ref mut history) = self.trade_history {
448            history.push(trade);
449        }
450
451        // Issue #46: If previous call triggered a breach, this trade opens the new bar.
452        // This matches the batch path's defer_open semantics - the breaching trade
453        // closes the current bar, and the NEXT trade opens the new bar.
454        if self.defer_open {
455            // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
456            if let Some(ref mut history) = self.trade_history {
457                history.on_bar_open(trade.timestamp);
458            }
459            self.current_bar_state = Some(if self.include_intra_bar_features {
460                OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
461            } else {
462                OpenDeviationBarState::new(trade, self.threshold_ratio)
463            });
464            self.defer_open = false;
465            return Ok(None);
466        }
467
468        match &mut self.current_bar_state {
469            None => {
470                // First trade - initialize new bar
471                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
472                if let Some(ref mut history) = self.trade_history {
473                    history.on_bar_open(trade.timestamp);
474                }
475                self.current_bar_state = Some(if self.include_intra_bar_features {
476                    OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
477                } else {
478                    OpenDeviationBarState::new(trade, self.threshold_ratio)
479                });
480                Ok(None)
481            }
482            Some(bar_state) => {
483                // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
484                // Only accumulates if features enabled, avoiding unnecessary clones
485                bar_state.accumulate_trade(trade, self.include_intra_bar_features);
486
487                // Check for threshold breach
488                let price_breaches = bar_state.bar.is_breach(
489                    trade.price,
490                    bar_state.upper_threshold,
491                    bar_state.lower_threshold,
492                );
493
494                // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
495                // This eliminates "instant bars" during flash crashes where multiple trades
496                // occur at the same millisecond.
497                let timestamp_allows_close = !self.prevent_same_timestamp_close
498                    || trade.timestamp != bar_state.bar.open_time;
499
500                if price_breaches && timestamp_allows_close {
501                    // Breach detected AND timestamp changed - close current bar
502                    bar_state.bar.update_with_trade(trade);
503
504                    // Validation: Ensure high/low include open/close extremes
505                    debug_assert!(
506                        bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
507                    );
508                    debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
509
510                    // Compute microstructure features at bar finalization (Issue #25)
511                    bar_state.bar.compute_microstructure_features();
512
513                    // Issue #59: Compute inter-bar features from lookback window
514                    // Features are computed from trades BEFORE bar.open_time (no lookahead)
515                    if let Some(ref mut history) = self.trade_history {
516                        let inter_bar_features = history.compute_features(bar_state.bar.open_time);
517                        bar_state.bar.set_inter_bar_features(&inter_bar_features);
518                        // Issue #68: Notify history that bar is closing (resumes normal pruning)
519                        history.on_bar_close();
520                    }
521
522                    // Issue #59: Compute intra-bar features from accumulated trades
523                    if self.include_intra_bar_features {
524                        // Issue #96 Task #173: Use reusable scratch buffers from bar_state
525                        let intra_bar_features =
526                            crate::intrabar::compute_intra_bar_features_with_config(
527                                &bar_state.accumulated_trades,
528                                &mut bar_state.scratch_prices,
529                                &mut bar_state.scratch_volumes,
530                                &self.intra_bar_config, // Issue #128
531                            );
532                        bar_state.bar.set_intra_bar_features(&intra_bar_features);
533                    }
534
535                    // Move bar out instead of cloning — bar_state borrow ends after
536                    // last use above (NLL), so take() is safe here.
537                    let completed_bar = self.current_bar_state.take().unwrap().bar;
538
539                    // v1.4: Track last completed bar's trade ID for dedup floor
540                    self.last_completed_bar_tid = Some(completed_bar.last_agg_trade_id);
541
542                    // Issue #46: Don't start new bar with breaching trade.
543                    // Next trade will open the new bar via defer_open.
544                    self.defer_open = true;
545
546                    Ok(Some(completed_bar))
547                } else {
548                    // Either no breach OR same timestamp (gate active) - update existing bar
549                    bar_state.bar.update_with_trade(trade);
550                    Ok(None)
551                }
552            }
553        }
554    }
555
556    /// Get any incomplete bar currently being processed
557    ///
558    /// Returns clone of current bar state for inspection without consuming it.
559    /// Useful for final bar at stream end or progress monitoring.
560    ///
561    /// # Returns
562    ///
563    /// `Some(OpenDeviationBar)` if bar is in progress, `None` if no active bar
564    pub fn get_incomplete_bar(&self) -> Option<OpenDeviationBar> {
565        self.current_bar_state.as_ref().map(|state| {
566            let mut bar = state.bar.clone();
567            // Issue #275: Compute microstructure features for incomplete bars.
568            // Without this, duration_us (and derived trade_intensity) remain at
569            // their default of 0 when the bar is inspected mid-construction.
570            bar.compute_microstructure_features();
571            bar
572        })
573    }
574
    /// Get the last processed aggregate trade ID (for gap detection on reconnect).
    ///
    /// This value is updated on every processed trade, so it includes trades
    /// that belong to the bar still being formed.
    ///
    /// Returns `None` for fresh processors that have not yet processed any trades.
    /// After checkpoint restore, returns the last trade ID from the checkpoint.
    pub fn last_agg_trade_id(&self) -> Option<i64> {
        self.last_trade_id
    }
582
    /// Get the last COMPLETED bar's aggregate trade ID.
    ///
    /// Unlike `last_agg_trade_id()` which includes the forming bar tail,
    /// this returns the trade ID from the most recently completed or orphaned bar.
    /// Used by committed_floors initialization to avoid suppressing junction bars.
    ///
    /// In the streaming path it is set to the completed bar's
    /// `last_agg_trade_id` at the moment the bar is returned.
    ///
    /// Returns `None` for fresh processors that have not yet completed any bar.
    pub fn last_completed_bar_tid(&self) -> Option<i64> {
        self.last_completed_bar_tid
    }
593
    /// Process AggTrade records into open deviation bars including incomplete bars for analysis
    ///
    /// Forwards to `process_agg_trade_records_with_options()` with
    /// `include_incomplete = true`.
    ///
    /// # Arguments
    ///
    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
    ///
    /// # Returns
    ///
    /// Vector of open deviation bars including incomplete bars at end of data
    ///
    /// # Errors
    ///
    /// Propagates any `ProcessingError` from
    /// `process_agg_trade_records_with_options()`, e.g. when the records fail
    /// ordering validation.
    ///
    /// # Warning
    ///
    /// This method is for analysis purposes only. Incomplete bars violate the
    /// fundamental open deviation bar algorithm and should not be used for production trading.
    pub fn process_agg_trade_records_with_incomplete(
        &mut self,
        agg_trade_records: &[AggTrade],
    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
        self.process_agg_trade_records_with_options(agg_trade_records, true)
    }
614
    /// Process Binance aggregated trade records into open deviation bars
    ///
    /// This is the primary method for converting AggTrade records (which aggregate
    /// multiple individual trades) into open deviation bars based on price movement thresholds.
    /// Forwards to `process_agg_trade_records_with_options()` with
    /// `include_incomplete = false`.
    ///
    /// # Parameters
    ///
    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
    ///   Each record represents multiple individual trades aggregated at same price
    ///
    /// # Returns
    ///
    /// Vector of completed open deviation bars (ONLY bars that breached thresholds).
    /// Each bar tracks both individual trade count and AggTrade record count.
    ///
    /// # Errors
    ///
    /// Propagates any `ProcessingError` from
    /// `process_agg_trade_records_with_options()`, e.g. when the records fail
    /// ordering validation.
    pub fn process_agg_trade_records(
        &mut self,
        agg_trade_records: &[AggTrade],
    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
        self.process_agg_trade_records_with_options(agg_trade_records, false)
    }
635
636    /// Process AggTrade records with options for including incomplete bars
637    ///
638    /// Batch processing mode: Clears any existing state before processing.
639    /// Use process_single_trade() for stateful streaming instead.
640    ///
641    /// # Parameters
642    ///
643    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
644    /// * `include_incomplete` - Whether to include incomplete bars at end of processing
645    ///
646    /// # Returns
647    ///
648    /// Vector of open deviation bars (completed + incomplete if requested)
649    pub fn process_agg_trade_records_with_options(
650        &mut self,
651        agg_trade_records: &[AggTrade],
652        include_incomplete: bool,
653    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
654        if agg_trade_records.is_empty() {
655            return Ok(Vec::new());
656        }
657
658        // Validate records are sorted
659        self.validate_trade_ordering(agg_trade_records)?;
660
661        // Use existing bar state if resuming from checkpoint, otherwise start fresh
662        // This is CRITICAL for cross-file continuation (Issues #2, #3)
663        let mut current_bar: Option<OpenDeviationBarState> = if self.resumed_from_checkpoint {
664            // Continue from checkpoint's incomplete bar
665            self.resumed_from_checkpoint = false; // Consume the flag
666            let restored_bar = self.current_bar_state.take();
667
668            // Issue #112: Gap-aware checkpoint recovery
669            // If the forming bar's close_time is too far from the first incoming trade,
670            // discard it as an orphan to prevent oversized bars from data gaps.
671            if let Some(ref bar_state) = restored_bar {
672                let first_trade_ts = agg_trade_records[0].timestamp;
673                let gap = first_trade_ts - bar_state.bar.close_time;
674                if gap > self.max_gap_us {
675                    self.anomaly_summary.record_gap();
676                    // Discard forming bar — same treatment as ouroboros reset
677                    None
678                } else {
679                    restored_bar
680                }
681            } else {
682                restored_bar
683            }
684        } else {
685            // Start fresh for normal batch processing
686            self.current_bar_state = None;
687            None
688        };
689
690        let mut bars = Vec::with_capacity(agg_trade_records.len() / 50); // Heuristic: 50 trades/bar covers consolidation regimes
691        let mut defer_open = false;
692
693        for agg_record in agg_trade_records {
694            // Track price and position for checkpoint
695            self.price_window.push(agg_record.price);
696            self.last_trade_id = Some(agg_record.agg_trade_id);
697            self.last_timestamp_us = agg_record.timestamp;
698
699            // Issue #59: Push trade to history buffer for inter-bar feature computation
700            if let Some(ref mut history) = self.trade_history {
701                history.push(agg_record);
702            }
703
704            if defer_open {
705                // Previous bar closed, this agg_record opens new bar
706                // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
707                if let Some(ref mut history) = self.trade_history {
708                    history.on_bar_open(agg_record.timestamp);
709                }
710                current_bar = Some(if self.include_intra_bar_features {
711                    OpenDeviationBarState::new_with_trade_accumulation(
712                        agg_record,
713                        self.threshold_ratio,
714                    )
715                } else {
716                    OpenDeviationBarState::new(agg_record, self.threshold_ratio)
717                });
718                defer_open = false;
719                continue;
720            }
721
722            match current_bar {
723                None => {
724                    // First bar initialization
725                    // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
726                    if let Some(ref mut history) = self.trade_history {
727                        history.on_bar_open(agg_record.timestamp);
728                    }
729                    current_bar = Some(if self.include_intra_bar_features {
730                        OpenDeviationBarState::new_with_trade_accumulation(
731                            agg_record,
732                            self.threshold_ratio,
733                        )
734                    } else {
735                        OpenDeviationBarState::new(agg_record, self.threshold_ratio)
736                    });
737                }
738                Some(ref mut bar_state) => {
739                    // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
740                    // Only accumulates if features enabled, avoiding unnecessary clones
741                    bar_state.accumulate_trade(agg_record, self.include_intra_bar_features);
742
743                    // Check if this AggTrade record breaches the threshold
744                    let price_breaches = bar_state.bar.is_breach(
745                        agg_record.price,
746                        bar_state.upper_threshold,
747                        bar_state.lower_threshold,
748                    );
749
750                    // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
751                    // This eliminates "instant bars" during flash crashes where multiple trades
752                    // occur at the same millisecond.
753                    let timestamp_allows_close = !self.prevent_same_timestamp_close
754                        || agg_record.timestamp != bar_state.bar.open_time;
755
756                    if price_breaches && timestamp_allows_close {
757                        // Breach detected AND timestamp changed - update bar with breaching record
758                        bar_state.bar.update_with_trade(agg_record);
759
760                        // Validation: Ensure high/low include open/close extremes
761                        debug_assert!(
762                            bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
763                        );
764                        debug_assert!(
765                            bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
766                        );
767
768                        // Compute microstructure features at bar finalization (Issue #34)
769                        bar_state.bar.compute_microstructure_features();
770
771                        // Issue #59: Compute inter-bar features from lookback window
772                        if let Some(ref mut history) = self.trade_history {
773                            let inter_bar_features =
774                                history.compute_features(bar_state.bar.open_time);
775                            bar_state.bar.set_inter_bar_features(&inter_bar_features);
776                            // Issue #68: Notify history that bar is closing (resumes normal pruning)
777                            history.on_bar_close();
778                        }
779
780                        // Issue #59: Compute intra-bar features from accumulated trades
781                        if self.include_intra_bar_features {
782                            // Issue #96 Task #173: Use reusable scratch buffers from bar_state
783                            let intra_bar_features =
784                                crate::intrabar::compute_intra_bar_features_with_config(
785                                    &bar_state.accumulated_trades,
786                                    &mut bar_state.scratch_prices,
787                                    &mut bar_state.scratch_volumes,
788                                    &self.intra_bar_config, // Issue #128
789                                );
790                            bar_state.bar.set_intra_bar_features(&intra_bar_features);
791                        }
792
793                        // Move bar out instead of cloning — bar_state borrow ends
794                        // after last use above (NLL), so take() is safe here.
795                        bars.push(current_bar.take().unwrap().bar);
796                        // v1.4: Track last completed bar's trade ID for dedup floor
797                        self.last_completed_bar_tid = Some(bars.last().unwrap().last_agg_trade_id);
798                        defer_open = true; // Next record will open new bar
799                    } else {
800                        // Either no breach OR same timestamp (gate active) - normal update
801                        bar_state.bar.update_with_trade(agg_record);
802                    }
803                }
804            }
805        }
806
807        // Save current bar state for checkpoint and optionally append incomplete bar.
808        // When include_incomplete=true, clone for checkpoint then consume for output.
809        // When include_incomplete=false, move directly (no clone needed).
810        if include_incomplete {
811            // Issue #96 Task #95: Optimize checkpoint cloning with take() to avoid Vec allocation
812            // (accumulated_trades not needed after intra-bar features computed).
813            // Using std::mem::take() instead of clone+clear reduces allocation overhead.
814            if let Some(ref state) = current_bar {
815                // Construct checkpoint state without cloning accumulated_trades/scratch buffers
816                // (they're not needed for checkpoint restoration). Avoids cloning ~3.5KB inline SmallVec.
817                self.current_bar_state = Some(OpenDeviationBarState {
818                    bar: state.bar.clone(),
819                    upper_threshold: state.upper_threshold,
820                    lower_threshold: state.lower_threshold,
821                    accumulated_trades: SmallVec::new(),
822                    scratch_prices: SmallVec::new(),
823                    scratch_volumes: SmallVec::new(),
824                });
825            }
826
827            // Add final partial bar only if explicitly requested
828            // This preserves algorithm integrity: bars should only close on threshold breach
829            if let Some(mut bar_state) = current_bar {
830                // Compute microstructure features for incomplete bar (Issue #34)
831                bar_state.bar.compute_microstructure_features();
832
833                // Issue #59: Compute inter-bar features from lookback window
834                if let Some(ref history) = self.trade_history {
835                    let inter_bar_features = history.compute_features(bar_state.bar.open_time);
836                    bar_state.bar.set_inter_bar_features(&inter_bar_features);
837                }
838
839                // Issue #59: Compute intra-bar features from accumulated trades
840                if self.include_intra_bar_features {
841                    // Issue #96 Task #173: Use reusable scratch buffers from bar_state
842                    let intra_bar_features =
843                        crate::intrabar::compute_intra_bar_features_with_config(
844                            &bar_state.accumulated_trades,
845                            &mut bar_state.scratch_prices,
846                            &mut bar_state.scratch_volumes,
847                            &self.intra_bar_config, // Issue #128
848                        );
849                    bar_state.bar.set_intra_bar_features(&intra_bar_features);
850                }
851
852                bars.push(bar_state.bar);
853            }
854        } else {
855            // No incomplete bar appended — move ownership directly, no clone needed
856            self.current_bar_state = current_bar;
857        }
858
859        Ok(bars)
860    }
861
862    // === CHECKPOINT METHODS ===
863
864    /// Create checkpoint for cross-file continuation
865    ///
866    /// Captures current processing state for seamless continuation:
867    /// - Incomplete bar (if any) with FIXED thresholds
868    /// - Position tracking (timestamp, trade_id if available)
869    /// - Price hash for verification
870    ///
871    /// # Arguments
872    ///
873    /// * `symbol` - Symbol being processed (e.g., "BTCUSDT", "EURUSD")
874    ///
875    /// # Example
876    ///
877    /// ```ignore
878    /// let bars = processor.process_agg_trade_records(&trades)?;
879    /// let checkpoint = processor.create_checkpoint("BTCUSDT");
880    /// let json = serde_json::to_string(&checkpoint)?;
881    /// std::fs::write("checkpoint.json", json)?;
882    /// ```
883    pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
884        let (incomplete_bar, thresholds) = match &self.current_bar_state {
885            Some(state) => (
886                Some(state.bar.clone()),
887                Some((state.upper_threshold, state.lower_threshold)),
888            ),
889            None => (None, None),
890        };
891
892        let mut checkpoint = Checkpoint::new(
893            symbol.to_string(),
894            self.threshold_decimal_bps,
895            incomplete_bar,
896            thresholds,
897            self.last_timestamp_us,
898            self.last_trade_id,
899            self.price_window.compute_hash(),
900            self.prevent_same_timestamp_close,
901        );
902        // Issue #46: Persist defer_open state for cross-session continuity
903        checkpoint.defer_open = self.defer_open;
904        // v1.4: Persist last_completed_bar_tid for dedup floor initialization
905        checkpoint.last_completed_bar_tid = self.last_completed_bar_tid;
906        checkpoint
907    }
908
909    /// Resume processing from checkpoint
910    ///
911    /// Restores incomplete bar state with IMMUTABLE thresholds.
912    /// Next trade continues building the bar until threshold breach.
913    ///
914    /// # Errors
915    ///
916    /// - `CheckpointError::MissingThresholds` - Checkpoint has bar but no thresholds
917    ///
918    /// # Example
919    ///
920    /// ```ignore
921    /// let json = std::fs::read_to_string("checkpoint.json")?;
922    /// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
923    /// let mut processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint)?;
924    /// let bars = processor.process_agg_trade_records(&next_file_trades)?;
925    /// ```
926    pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
927        // Issue #85 Phase 2: Apply checkpoint schema migration if needed
928        let checkpoint = Self::migrate_checkpoint(checkpoint);
929
930        // Issue #62: Validate threshold range before restoring from checkpoint
931        // Valid range: 1-100,000 dbps (0.0001% to 10%)
932        const THRESHOLD_MIN: u32 = 1;
933        const THRESHOLD_MAX: u32 = 100_000;
934        if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
935            || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
936        {
937            return Err(CheckpointError::InvalidThreshold {
938                threshold: checkpoint.threshold_decimal_bps,
939                min_threshold: THRESHOLD_MIN,
940                max_threshold: THRESHOLD_MAX,
941            });
942        }
943
944        // Validate checkpoint consistency
945        if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
946            return Err(CheckpointError::MissingThresholds);
947        }
948
949        // Restore bar state if there's an incomplete bar
950        // Note: accumulated_trades is reset to empty - intra-bar features won't be
951        // accurate for bars resumed from checkpoint (partial trade history lost)
952        let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
953            (Some(bar), Some((upper, lower))) => Some(OpenDeviationBarState {
954                bar,
955                upper_threshold: upper,
956                lower_threshold: lower,
957                accumulated_trades: SmallVec::new(), // Lost on checkpoint - features may be partial
958                scratch_prices: SmallVec::new(),
959                scratch_volumes: SmallVec::new(),
960            }),
961            _ => None,
962        };
963
964        // Issue #96 Task #98: Pre-compute threshold ratio (same as with_options)
965        let threshold_ratio = ((checkpoint.threshold_decimal_bps as i64)
966            * crate::fixed_point::SCALE)
967            / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
968
969        Ok(Self {
970            threshold_decimal_bps: checkpoint.threshold_decimal_bps,
971            threshold_ratio,
972            current_bar_state,
973            price_window: PriceWindow::new(), // Reset - will be rebuilt from new trades
974            last_trade_id: checkpoint.last_trade_id,
975            last_timestamp_us: checkpoint.last_timestamp_us,
976            anomaly_summary: checkpoint.anomaly_summary,
977            resumed_from_checkpoint: true, // Signal to continue from existing bar state
978            prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
979            defer_open: checkpoint.defer_open, // Issue #46: Restore deferred open state
980            last_completed_bar_tid: checkpoint.last_completed_bar_tid, // v1.4: Restore dedup floor
981            trade_history: None,               // Issue #59: Must be re-enabled after restore
982            inter_bar_config: None,            // Issue #59: Must be re-enabled after restore
983            include_intra_bar_features: false, // Issue #59: Must be re-enabled after restore
984            intra_bar_config: crate::intrabar::IntraBarConfig::default(), // Issue #128
985            max_gap_us: 3_600_000_000,         // Issue #112: 1 hour default
986        })
987    }
988
989    /// Migrate checkpoint between schema versions
990    /// Issue #85 Phase 2: Handle v1 → v2 migration
991    /// Safe: JSON deserialization is field-name-based, so old v1 checkpoints load correctly
992    fn migrate_checkpoint(mut checkpoint: Checkpoint) -> Checkpoint {
993        match checkpoint.version {
994            1 => {
995                // v1 → v2: OpenDeviationBar struct field reordering (no behavioral changes)
996                // JSON serialization is position-independent, so no transformation needed
997                checkpoint.version = 2;
998                checkpoint
999            }
1000            2 => {
1001                // Already current version
1002                checkpoint
1003            }
1004            _ => {
1005                // Unknown version - log warning and continue with best effort
1006                eprintln!(
1007                    "Warning: Checkpoint has unknown version {}, treating as v2",
1008                    checkpoint.version
1009                );
1010                checkpoint.version = 2;
1011                checkpoint
1012            }
1013        }
1014    }
1015
1016    /// Verify we're at the right position in the data stream
1017    ///
1018    /// Call with first trade of new file to verify continuity.
1019    /// Returns verification result indicating if there's a gap or exact match.
1020    ///
1021    /// # Arguments
1022    ///
1023    /// * `first_trade` - First trade of the new file/chunk
1024    ///
1025    /// # Example
1026    ///
1027    /// ```ignore
1028    /// let processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint)?;
1029    /// let verification = processor.verify_position(&next_file_trades[0]);
1030    /// match verification {
1031    ///     PositionVerification::Exact => println!("Perfect continuation!"),
1032    ///     PositionVerification::Gap { missing_count, .. } => {
1033    ///         println!("Warning: {} trades missing", missing_count);
1034    ///     }
1035    ///     PositionVerification::TimestampOnly { gap_ms } => {
1036    ///         println!("Exness data: {}ms gap", gap_ms);
1037    ///     }
1038    /// }
1039    /// ```
1040    pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
1041        match self.last_trade_id {
1042            Some(last_id) => {
1043                // Binance: has trade IDs - check for gaps
1044                let expected_id = last_id + 1;
1045                if first_trade.agg_trade_id == expected_id {
1046                    PositionVerification::Exact
1047                } else {
1048                    let missing_count = first_trade.agg_trade_id - expected_id;
1049                    PositionVerification::Gap {
1050                        expected_id,
1051                        actual_id: first_trade.agg_trade_id,
1052                        missing_count,
1053                    }
1054                }
1055            }
1056            None => {
1057                // Exness: no trade IDs - use timestamp only
1058                let gap_us = first_trade.timestamp - self.last_timestamp_us;
1059                let gap_ms = gap_us / 1000;
1060                PositionVerification::TimestampOnly { gap_ms }
1061            }
1062        }
1063    }
1064
1065    /// Get the current anomaly summary
1066    pub fn anomaly_summary(&self) -> &AnomalySummary {
1067        &self.anomaly_summary
1068    }
1069
1070    /// Get the threshold in decimal basis points
1071    pub fn threshold_decimal_bps(&self) -> u32 {
1072        self.threshold_decimal_bps
1073    }
1074
1075    /// Validate that trades are properly sorted for deterministic processing
1076    ///
1077    /// Issue #96 Task #62: Early-exit optimization for sorted data
1078    /// For typical workloads (95%+ sorted), quick first/last check identifies
1079    /// unsorted batches immediately without full O(n) validation.
1080    fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
1081        if trades.is_empty() {
1082            return Ok(());
1083        }
1084
1085        // Issue #96 Task #62: Fast-path check for obviously unsorted data
1086        // If first and last trades are not ordered, data is definitely unsorted
1087        // This early-exit catches common failures without full validation
1088        let first = &trades[0];
1089        let last = &trades[trades.len() - 1];
1090
1091        if last.timestamp < first.timestamp
1092            || (last.timestamp == first.timestamp && last.agg_trade_id <= first.agg_trade_id)
1093        {
1094            // Definitely unsorted - find exact error location (cold path)
1095            return find_unsorted_trade(trades);
1096        }
1097
1098        // Full validation for typical sorted case
1099        for i in 1..trades.len() {
1100            let prev = &trades[i - 1];
1101            let curr = &trades[i];
1102
1103            // Check ordering: (timestamp, agg_trade_id) ascending
1104            if curr.timestamp < prev.timestamp
1105                || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
1106            {
1107                return unsorted_trade_error(i, prev, curr);
1108            }
1109        }
1110
1111        Ok(())
1112    }
1113
1114    /// Reset processor state at a UTC-midnight ouroboros boundary (day mode only).
1115    ///
1116    /// Clears the incomplete bar and position tracking while preserving
1117    /// the threshold configuration. Use this when starting fresh at a
1118    /// known boundary for reproducibility.
1119    ///
1120    /// # Returns
1121    ///
1122    /// The orphaned incomplete bar (if any) so caller can decide
1123    /// whether to include it in results with `is_orphan=True` flag.
1124    ///
1125    /// # Example
1126    ///
1127    /// ```ignore
1128    /// // At year boundary (Jan 1 00:00:00 UTC)
1129    /// let orphaned = processor.reset_at_ouroboros();
1130    /// if let Some(bar) = orphaned {
1131    ///     // Handle incomplete bar from previous year
1132    /// }
1133    /// // Continue processing new year's data with clean state
1134    /// ```
1135    pub fn reset_at_ouroboros(&mut self) -> Option<OpenDeviationBar> {
1136        let orphaned = self.current_bar_state.take().map(|mut state| {
1137            // Issue #275: Compute microstructure features for orphan bars.
1138            // Without this, duration_us (and derived trade_intensity) remain at
1139            // their default of 0, even though close_time − open_time > 0.
1140            state.bar.compute_microstructure_features();
1141            state.bar
1142        });
1143        // v1.4: Track orphan bar's trade ID for dedup floor (orphan IS a completed bar)
1144        // Do NOT reset last_completed_bar_tid to None — it persists across day boundaries
1145        if let Some(ref bar) = orphaned {
1146            self.last_completed_bar_tid = Some(bar.last_agg_trade_id);
1147        }
1148        self.price_window = PriceWindow::new();
1149        self.last_trade_id = None;
1150        self.last_timestamp_us = 0;
1151        self.resumed_from_checkpoint = false;
1152        self.defer_open = false;
1153        // Issue #81: Clear bar boundary tracking at ouroboros reset.
1154        // Trades are preserved — still valid lookback for first bar of new segment.
1155        if let Some(ref mut history) = self.trade_history {
1156            history.reset_bar_boundaries();
1157        }
1158        orphaned
1159    }
1160}
1161
1162/// Internal state for a open deviation bar being built
1163#[derive(Clone)]
1164struct OpenDeviationBarState {
1165    /// The open deviation bar being constructed
1166    pub bar: OpenDeviationBar,
1167
1168    /// Upper breach threshold (FIXED from bar open)
1169    pub upper_threshold: FixedPoint,
1170
1171    /// Lower breach threshold (FIXED from bar open)
1172    pub lower_threshold: FixedPoint,
1173
1174    /// Accumulated trades for intra-bar feature computation (Issue #59)
1175    ///
1176    /// When intra-bar features are enabled, trades are accumulated here
1177    /// during bar construction and used to compute features at bar close.
1178    /// Cleared when bar closes to free memory.
1179    /// Issue #136: Optimized from 512→64→48 slots.
1180    /// Profile data: max trades/bar = 26 (P99 = 14), so 64 slots provides
1181    /// 2.46x safety margin. SmallVec transparently spills to heap if exceeded.
1182    pub accumulated_trades: SmallVec<[AggTrade; 64]>,
1183
1184    /// Issue #96: Scratch buffer for intra-bar price extraction
1185    /// SmallVec<[f64; 64]> keeps 95%+ of bars on stack (P99 trades/bar = 14, max = 26)
1186    /// Eliminates heap allocation for typical bars, spills transparently for large ones
1187    pub scratch_prices: SmallVec<[f64; 64]>,
1188
1189    /// Issue #96: Scratch buffer for intra-bar volume extraction
1190    /// Same sizing rationale as scratch_prices
1191    pub scratch_volumes: SmallVec<[f64; 64]>,
1192}
1193
1194impl OpenDeviationBarState {
1195    /// Create new open deviation bar state from opening trade
1196    /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1197    #[inline]
1198    fn new(trade: &AggTrade, threshold_ratio: i64) -> Self {
1199        let bar = OpenDeviationBar::new(trade);
1200
1201        // Issue #96 Task #98: Use cached ratio instead of repeated division
1202        // This avoids BASIS_POINTS_SCALE division on every bar creation
1203        let (upper_threshold, lower_threshold) =
1204            bar.open.compute_range_thresholds_cached(threshold_ratio);
1205
1206        Self {
1207            bar,
1208            upper_threshold,
1209            lower_threshold,
1210            accumulated_trades: SmallVec::new(),
1211            scratch_prices: SmallVec::new(),
1212            scratch_volumes: SmallVec::new(),
1213        }
1214    }
1215
1216    /// Create new open deviation bar state with intra-bar feature accumulation
1217    /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1218    #[inline]
1219    fn new_with_trade_accumulation(trade: &AggTrade, threshold_ratio: i64) -> Self {
1220        let bar = OpenDeviationBar::new(trade);
1221
1222        // Issue #96 Task #98: Use cached ratio instead of repeated division
1223        // This avoids BASIS_POINTS_SCALE division on every bar creation
1224        let (upper_threshold, lower_threshold) =
1225            bar.open.compute_range_thresholds_cached(threshold_ratio);
1226
1227        Self {
1228            bar,
1229            upper_threshold,
1230            lower_threshold,
1231            accumulated_trades: {
1232                let mut sv = SmallVec::new();
1233                sv.push(*trade);
1234                sv
1235            },
1236            scratch_prices: SmallVec::new(),
1237            scratch_volumes: SmallVec::new(),
1238        }
1239    }
1240
1241    /// Accumulate a trade for intra-bar feature computation
1242    ///
1243    /// Issue #96 Task #44: Only accumulates if intra-bar features are enabled,
1244    /// avoiding unnecessary clones for the majority of use cases where they're disabled.
1245    /// Issue #96 Task #79: #[inline] allows compiler to fold invariant branch
1246    /// (include_intra is constant for processor lifetime)
1247    #[inline]
1248    fn accumulate_trade(&mut self, trade: &AggTrade, include_intra: bool) {
1249        if include_intra {
1250            self.accumulated_trades.push(*trade);
1251        }
1252    }
1253}