opendeviationbar_core/processor.rs
1// FILE-SIZE-OK: 1184 lines — tests extracted to tests/processor_tests.rs
2//! Core open deviation bar processing algorithm
3//!
4//! Implements non-lookahead bias open deviation bar construction where bars close when
5//! price moves ±threshold dbps from the bar's OPEN price.
6
7use crate::checkpoint::{
8 AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; // Issue #59
12// Issue #59: Intra-bar features - using compute_intra_bar_features_with_config (Issue #128)
13use crate::types::{AggTrade, OpenDeviationBar};
14#[cfg(feature = "python")]
15use pyo3::prelude::*;
16use smallvec::SmallVec; // Issue #119: Trade accumulation with inline buffer (512 slots ≈ 29KB)
17// Re-export ProcessingError from errors.rs (Phase 2a extraction)
18pub use crate::errors::ProcessingError;
19// Re-export ExportOpenDeviationBarProcessor from export_processor.rs (Phase 2d extraction)
20pub use crate::export_processor::ExportOpenDeviationBarProcessor;
21
22/// Open deviation bar processor with non-lookahead bias guarantee
23pub struct OpenDeviationBarProcessor {
24 /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
25 threshold_decimal_bps: u32,
26
27 /// Issue #96 Task #98: Pre-computed threshold ratio for fast delta calculation
28 /// Stores (threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE as fixed-point
29 /// This allows compute_range_thresholds() to compute delta = (price * ratio) / SCALE
30 /// without repeated division, avoiding i128 arithmetic in hot path.
31 /// Performance: Eliminates BASIS_POINTS_SCALE division on every bar creation.
32 /// Made public for testing purposes.
33 pub threshold_ratio: i64,
34
35 /// Current bar state for streaming processing (Q19)
36 /// Enables get_incomplete_bar() and stateful process_single_trade()
37 current_bar_state: Option<OpenDeviationBarState>,
38
39 /// Price window for checkpoint hash verification
40 price_window: PriceWindow,
41
42 /// Last processed trade ID (for gap detection on resume)
43 last_trade_id: Option<i64>,
44
45 /// Last completed bar's trade ID for dedup floor initialization (v1.4)
46 ///
47 /// Updated ONLY on bar completion (process_single_trade returns Some) and
48 /// orphan emission (reset_at_ouroboros). NOT updated on every trade.
49 /// This gives committed_floors a floor that excludes the forming bar tail,
50 /// preventing suppression of bars at the REST-to-WS junction.
51 last_completed_bar_tid: Option<i64>,
52
53 /// Last processed timestamp (for position verification)
54 last_timestamp_us: i64,
55
56 /// Anomaly tracking for debugging
57 anomaly_summary: AnomalySummary,
58
59 /// Flag indicating this processor was created from a checkpoint
60 /// When true, process_agg_trade_records will continue from existing bar state
61 resumed_from_checkpoint: bool,
62
63 /// Prevent bars from closing on same timestamp as they opened (Issue #36)
64 ///
65 /// When true (default): A bar cannot close until a trade arrives with a
66 /// different timestamp than the bar's open_time. This prevents "instant bars"
67 /// during flash crashes where multiple trades occur at the same millisecond.
68 ///
69 /// When false: Legacy behavior - bars can close on any breach regardless
70 /// of timestamp, which may produce bars with identical timestamps.
71 prevent_same_timestamp_close: bool,
72
73 /// Deferred bar open flag (Issue #46)
74 ///
75 /// When true: The previous trade triggered a threshold breach and closed a bar.
76 /// The next trade arriving via `process_single_trade()` should open a new bar
77 /// instead of being treated as a continuation.
78 ///
79 /// This matches the batch path's `defer_open` semantics in
80 /// `process_agg_trade_records()` where the breaching trade closes the current
81 /// bar and the NEXT trade opens the new bar.
82 defer_open: bool,
83
84 /// Trade history for inter-bar feature computation (Issue #59)
85 ///
86 /// Ring buffer of recent trades for computing lookback-based features.
87 /// When Some, features are computed from trades BEFORE each bar's open_time.
88 /// When None, inter-bar features are disabled (all lookback_* fields = None).
89 trade_history: Option<TradeHistory>,
90
91 /// Configuration for inter-bar features (Issue #59)
92 ///
93 /// Controls lookback mode (fixed count or time window) and which feature
94 /// tiers to compute. When None, inter-bar features are disabled.
95 inter_bar_config: Option<InterBarConfig>,
96
97 /// Enable intra-bar feature computation (Issue #59)
98 ///
99 /// When true, the processor accumulates trades during bar construction
100 /// and computes 22 features from trades WITHIN each bar at bar close.
101 /// Features include ITH (Investment Time Horizon), statistical, and
102 /// complexity metrics. When false, all intra_* fields are None.
103 include_intra_bar_features: bool,
104
105 /// Issue #128: Configuration for intra-bar feature computation.
106 /// Controls which complexity features (Hurst, PE) are computed.
107 intra_bar_config: crate::intrabar::IntraBarConfig,
108
109 /// Issue #112: Maximum timestamp gap in microseconds before discarding a forming bar
110 ///
111 /// When resuming from checkpoint with a forming bar, if the gap between
112 /// the forming bar's close_time and the first incoming trade exceeds this
113 /// threshold, the forming bar is discarded as an orphan (same as ouroboros reset).
114 /// This prevents "oversized" bars caused by large data gaps (e.g., 38-hour outages).
115 ///
116 /// Default: 3,600,000,000 μs (1 hour)
117 max_gap_us: i64,
118}
119
120/// Cold path: scan trades to find first unsorted pair and return error
121/// Extracted from validate_trade_ordering() to improve hot-path code layout
122#[cold]
123#[inline(never)]
124fn find_unsorted_trade(trades: &[AggTrade]) -> Result<(), ProcessingError> {
125 for i in 1..trades.len() {
126 let prev = &trades[i - 1];
127 let curr = &trades[i];
128 if curr.timestamp < prev.timestamp
129 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
130 {
131 return Err(ProcessingError::UnsortedTrades {
132 index: i,
133 prev_time: prev.timestamp,
134 prev_id: prev.agg_trade_id,
135 curr_time: curr.timestamp,
136 curr_id: curr.agg_trade_id,
137 });
138 }
139 }
140 Ok(())
141}
142
143/// Cold path: construct unsorted trade error
144/// Extracted to keep error construction out of the hot validation loop
145#[cold]
146#[inline(never)]
147fn unsorted_trade_error(
148 index: usize,
149 prev: &AggTrade,
150 curr: &AggTrade,
151) -> Result<(), ProcessingError> {
152 Err(ProcessingError::UnsortedTrades {
153 index,
154 prev_time: prev.timestamp,
155 prev_id: prev.agg_trade_id,
156 curr_time: curr.timestamp,
157 curr_id: curr.agg_trade_id,
158 })
159}
160
161impl OpenDeviationBarProcessor {
162 /// Create new processor with given threshold
163 ///
164 /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36)
165 ///
166 /// # Arguments
167 ///
168 /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
169 /// - Example: `250` → 25bps = 0.25%
170 /// - Example: `10` → 1bps = 0.01%
171 /// - Minimum: `1` → 0.1bps = 0.001%
172 ///
173 /// # Breaking Change (v3.0.0)
174 ///
175 /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
176 /// **Migration**: Multiply all threshold values by 10.
177 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
178 Self::with_options(threshold_decimal_bps, true)
179 }
180
181 /// Create new processor with explicit timestamp gating control
182 ///
183 /// # Arguments
184 ///
185 /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
186 /// * `prevent_same_timestamp_close` - If true, bars cannot close until
187 /// timestamp advances from open_time. This prevents "instant bars" during
188 /// flash crashes. Set to false for legacy behavior (pre-v9).
189 ///
190 /// # Example
191 ///
192 /// ```ignore
193 /// // Default behavior (v9+): timestamp gating enabled
194 /// let processor = OpenDeviationBarProcessor::new(250)?;
195 ///
196 /// // Legacy behavior: allow instant bars
197 /// let processor = OpenDeviationBarProcessor::with_options(250, false)?;
198 /// ```
199 pub fn with_options(
200 threshold_decimal_bps: u32,
201 prevent_same_timestamp_close: bool,
202 ) -> Result<Self, ProcessingError> {
203 // Validation bounds (v3.0.0: dbps units)
204 // Min: 1 dbps = 0.001%
205 // Max: 100,000 dbps = 100%
206 if threshold_decimal_bps < 1 {
207 return Err(ProcessingError::InvalidThreshold {
208 threshold_decimal_bps,
209 });
210 }
211 if threshold_decimal_bps > 100_000 {
212 return Err(ProcessingError::InvalidThreshold {
213 threshold_decimal_bps,
214 });
215 }
216
217 // Issue #96 Task #98: Pre-compute threshold ratio
218 // Ratio = (threshold_decimal_bps * SCALE) / BASIS_POINTS_SCALE
219 // This is used in compute_range_thresholds() for fast delta calculation
220 let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
221 / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
222
223 Ok(Self {
224 threshold_decimal_bps,
225 threshold_ratio,
226 current_bar_state: None,
227 price_window: PriceWindow::new(),
228 last_trade_id: None,
229 last_completed_bar_tid: None,
230 last_timestamp_us: 0,
231 anomaly_summary: AnomalySummary::default(),
232 resumed_from_checkpoint: false,
233 prevent_same_timestamp_close,
234 defer_open: false,
235 trade_history: None, // Issue #59: disabled by default
236 inter_bar_config: None, // Issue #59: disabled by default
237 include_intra_bar_features: false, // Issue #59: disabled by default
238 intra_bar_config: crate::intrabar::IntraBarConfig::default(), // Issue #128
239 max_gap_us: 3_600_000_000, // Issue #112: 1 hour default
240 })
241 }
242
243 /// Get the prevent_same_timestamp_close setting
244 pub fn prevent_same_timestamp_close(&self) -> bool {
245 self.prevent_same_timestamp_close
246 }
247
248 /// Enable inter-bar feature computation with the given configuration (Issue #59)
249 ///
250 /// When enabled, the processor maintains a trade history buffer and computes
251 /// lookback-based microstructure features on each bar close. Features are
252 /// computed from trades that occurred BEFORE each bar's open_time, ensuring
253 /// no lookahead bias.
254 ///
255 /// Uses a local entropy cache (default behavior, backward compatible).
256 /// For multi-symbol workloads, use `with_inter_bar_config_and_cache()` with a global cache.
257 ///
258 /// # Arguments
259 ///
260 /// * `config` - Configuration controlling lookback mode and feature tiers
261 ///
262 /// # Example
263 ///
264 /// ```ignore
265 /// use opendeviationbar_core::processor::OpenDeviationBarProcessor;
266 /// use opendeviationbar_core::interbar::{InterBarConfig, LookbackMode};
267 ///
268 /// let processor = OpenDeviationBarProcessor::new(1000)?
269 /// .with_inter_bar_config(InterBarConfig {
270 /// lookback_mode: LookbackMode::FixedCount(500),
271 /// compute_tier2: true,
272 /// compute_tier3: true,
273 /// ..Default::default()
274 /// });
275 /// ```
276 pub fn with_inter_bar_config(self, config: InterBarConfig) -> Self {
277 self.with_inter_bar_config_and_cache(config, None)
278 }
279
280 /// Enable inter-bar feature computation with optional external entropy cache
281 ///
282 /// Issue #145 Phase 3: Multi-Symbol Entropy Cache Sharing
283 ///
284 /// # Arguments
285 ///
286 /// * `config` - Configuration controlling lookback mode and feature tiers
287 /// * `external_cache` - Optional shared entropy cache from `get_global_entropy_cache()`
288 /// - If provided: Uses the shared global cache (recommended for multi-symbol)
289 /// - If None: Creates a local 128-entry cache (default, backward compatible)
290 ///
291 /// # Usage
292 ///
293 /// ```ignore
294 /// use opendeviationbar_core::{processor::OpenDeviationBarProcessor, entropy_cache_global::get_global_entropy_cache, interbar::InterBarConfig};
295 ///
296 /// // Single-symbol: use local cache (default)
297 /// let processor = OpenDeviationBarProcessor::new(1000)?
298 /// .with_inter_bar_config(config);
299 ///
300 /// // Multi-symbol: share global cache
301 /// let global_cache = get_global_entropy_cache();
302 /// let processor = OpenDeviationBarProcessor::new(1000)?
303 /// .with_inter_bar_config_and_cache(config, Some(global_cache));
304 /// ```
305 pub fn with_inter_bar_config_and_cache(
306 mut self,
307 config: InterBarConfig,
308 external_cache: Option<
309 std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>,
310 >,
311 ) -> Self {
312 self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
313 self.inter_bar_config = Some(config);
314 self
315 }
316
317 /// Check if inter-bar features are enabled
318 pub fn inter_bar_enabled(&self) -> bool {
319 self.inter_bar_config.is_some()
320 }
321
322 /// Issue #112: Configure maximum timestamp gap for checkpoint recovery
323 ///
324 /// When resuming from checkpoint with a forming bar, if the gap between
325 /// the forming bar's close_time and the first incoming trade exceeds this
326 /// threshold, the forming bar is discarded as an orphan.
327 ///
328 /// # Arguments
329 ///
330 /// * `max_gap_us` - Maximum gap in microseconds (default: 3,600,000,000 = 1 hour)
331 pub fn with_max_gap(mut self, max_gap_us: i64) -> Self {
332 self.max_gap_us = max_gap_us;
333 self
334 }
335
336 /// Get the maximum gap threshold in microseconds
337 pub fn max_gap_us(&self) -> i64 {
338 self.max_gap_us
339 }
340
341 /// Enable intra-bar feature computation (Issue #59)
342 ///
343 /// When enabled, the processor accumulates trades during bar construction
344 /// and computes 22 features from trades WITHIN each bar at bar close:
345 /// - 8 ITH features (Investment Time Horizon)
346 /// - 12 statistical features (OFI, intensity, Kyle lambda, etc.)
347 /// - 2 complexity features (Hurst exponent, permutation entropy)
348 ///
349 /// # Memory Note
350 ///
351 /// Trades are accumulated per-bar and freed when the bar closes.
352 /// Typical 1000 dbps bar: ~50-500 trades, ~2-24 KB overhead.
353 ///
354 /// # Example
355 ///
356 /// ```ignore
357 /// let processor = OpenDeviationBarProcessor::new(1000)?
358 /// .with_intra_bar_features();
359 /// ```
360 pub fn with_intra_bar_features(mut self) -> Self {
361 self.include_intra_bar_features = true;
362 self
363 }
364
365 /// Check if intra-bar features are enabled
366 pub fn intra_bar_enabled(&self) -> bool {
367 self.include_intra_bar_features
368 }
369
370 /// Re-enable inter-bar features on an existing processor (Issue #97).
371 ///
372 /// Used after `from_checkpoint()` to restore microstructure config that
373 /// is not preserved in checkpoint state. Uses a local entropy cache by default.
374 /// For multi-symbol workloads, use `set_inter_bar_config_with_cache()` with a global cache.
375 pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
376 self.set_inter_bar_config_with_cache(config, None);
377 }
378
379 /// Re-enable inter-bar features with optional external entropy cache (Issue #145 Phase 3).
380 ///
381 /// Used after `from_checkpoint()` to restore microstructure config that
382 /// is not preserved in checkpoint state. Allows specifying a shared entropy cache
383 /// for multi-symbol processors.
384 pub fn set_inter_bar_config_with_cache(
385 &mut self,
386 config: InterBarConfig,
387 external_cache: Option<
388 std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>,
389 >,
390 ) {
391 self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
392 self.inter_bar_config = Some(config);
393 }
394
395 /// Re-enable intra-bar features on an existing processor (Issue #97).
396 pub fn set_intra_bar_features(&mut self, enabled: bool) {
397 self.include_intra_bar_features = enabled;
398 }
399
400 /// Issue #128: Set intra-bar feature configuration.
401 /// Controls which complexity features (Hurst, PE) are computed.
402 pub fn with_intra_bar_config(mut self, config: crate::intrabar::IntraBarConfig) -> Self {
403 self.intra_bar_config = config;
404 self
405 }
406
407 /// Issue #128: Set intra-bar feature configuration on existing processor.
408 pub fn set_intra_bar_config(&mut self, config: crate::intrabar::IntraBarConfig) {
409 self.intra_bar_config = config;
410 }
411
412 /// Process a single trade and return completed bar if any
413 ///
414 /// Maintains internal state for streaming use case. State persists across calls
415 /// until a bar completes (threshold breach), enabling get_incomplete_bar().
416 ///
417 /// # Arguments
418 ///
419 /// * `trade` - Single aggregated trade to process
420 ///
421 /// # Returns
422 ///
423 /// `Some(OpenDeviationBar)` if a bar was completed, `None` otherwise
424 ///
425 /// # State Management
426 ///
427 /// - First trade: Initializes new bar state
428 /// - Subsequent trades: Updates existing bar or closes on breach
429 /// - Breach: Returns completed bar, starts new bar with breaching trade
430 ///
431 /// Issue #96 Task #78: Accept borrowed AggTrade to eliminate clones in fan-out loops.
432 /// Streaming pipelines (4+ thresholds) were cloning ~57 byte trades per processor.
433 /// Signature change to `&AggTrade` eliminates 4-8x unnecessary allocations.
434 /// Issue #96 Task #84: `#[inline]` — main hot-path entry point called on every trade.
435 #[inline]
436 pub fn process_single_trade(
437 &mut self,
438 trade: &AggTrade,
439 ) -> Result<Option<OpenDeviationBar>, ProcessingError> {
440 // Track price and position for checkpoint
441 self.price_window.push(trade.price);
442 self.last_trade_id = Some(trade.agg_trade_id);
443 self.last_timestamp_us = trade.timestamp;
444
445 // Issue #59: Push trade to history buffer for inter-bar feature computation
446 // This must happen BEFORE bar processing so lookback window includes recent trades
447 if let Some(ref mut history) = self.trade_history {
448 history.push(trade);
449 }
450
451 // Issue #46: If previous call triggered a breach, this trade opens the new bar.
452 // This matches the batch path's defer_open semantics - the breaching trade
453 // closes the current bar, and the NEXT trade opens the new bar.
454 if self.defer_open {
455 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
456 if let Some(ref mut history) = self.trade_history {
457 history.on_bar_open(trade.timestamp);
458 }
459 self.current_bar_state = Some(if self.include_intra_bar_features {
460 OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
461 } else {
462 OpenDeviationBarState::new(trade, self.threshold_ratio)
463 });
464 self.defer_open = false;
465 return Ok(None);
466 }
467
468 match &mut self.current_bar_state {
469 None => {
470 // First trade - initialize new bar
471 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
472 if let Some(ref mut history) = self.trade_history {
473 history.on_bar_open(trade.timestamp);
474 }
475 self.current_bar_state = Some(if self.include_intra_bar_features {
476 OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
477 } else {
478 OpenDeviationBarState::new(trade, self.threshold_ratio)
479 });
480 Ok(None)
481 }
482 Some(bar_state) => {
483 // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
484 // Only accumulates if features enabled, avoiding unnecessary clones
485 bar_state.accumulate_trade(trade, self.include_intra_bar_features);
486
487 // Check for threshold breach
488 let price_breaches = bar_state.bar.is_breach(
489 trade.price,
490 bar_state.upper_threshold,
491 bar_state.lower_threshold,
492 );
493
494 // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
495 // This eliminates "instant bars" during flash crashes where multiple trades
496 // occur at the same millisecond.
497 let timestamp_allows_close = !self.prevent_same_timestamp_close
498 || trade.timestamp != bar_state.bar.open_time;
499
500 if price_breaches && timestamp_allows_close {
501 // Breach detected AND timestamp changed - close current bar
502 bar_state.bar.update_with_trade(trade);
503
504 // Validation: Ensure high/low include open/close extremes
505 debug_assert!(
506 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
507 );
508 debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
509
510 // Compute microstructure features at bar finalization (Issue #25)
511 bar_state.bar.compute_microstructure_features();
512
513 // Issue #59: Compute inter-bar features from lookback window
514 // Features are computed from trades BEFORE bar.open_time (no lookahead)
515 if let Some(ref mut history) = self.trade_history {
516 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
517 bar_state.bar.set_inter_bar_features(&inter_bar_features);
518 // Issue #68: Notify history that bar is closing (resumes normal pruning)
519 history.on_bar_close();
520 }
521
522 // Issue #59: Compute intra-bar features from accumulated trades
523 if self.include_intra_bar_features {
524 // Issue #96 Task #173: Use reusable scratch buffers from bar_state
525 let intra_bar_features =
526 crate::intrabar::compute_intra_bar_features_with_config(
527 &bar_state.accumulated_trades,
528 &mut bar_state.scratch_prices,
529 &mut bar_state.scratch_volumes,
530 &self.intra_bar_config, // Issue #128
531 );
532 bar_state.bar.set_intra_bar_features(&intra_bar_features);
533 }
534
535 // Move bar out instead of cloning — bar_state borrow ends after
536 // last use above (NLL), so take() is safe here.
537 let completed_bar = self.current_bar_state.take().unwrap().bar;
538
539 // v1.4: Track last completed bar's trade ID for dedup floor
540 self.last_completed_bar_tid = Some(completed_bar.last_agg_trade_id);
541
542 // Issue #46: Don't start new bar with breaching trade.
543 // Next trade will open the new bar via defer_open.
544 self.defer_open = true;
545
546 Ok(Some(completed_bar))
547 } else {
548 // Either no breach OR same timestamp (gate active) - update existing bar
549 bar_state.bar.update_with_trade(trade);
550 Ok(None)
551 }
552 }
553 }
554 }
555
556 /// Get any incomplete bar currently being processed
557 ///
558 /// Returns clone of current bar state for inspection without consuming it.
559 /// Useful for final bar at stream end or progress monitoring.
560 ///
561 /// # Returns
562 ///
563 /// `Some(OpenDeviationBar)` if bar is in progress, `None` if no active bar
564 pub fn get_incomplete_bar(&self) -> Option<OpenDeviationBar> {
565 self.current_bar_state.as_ref().map(|state| {
566 let mut bar = state.bar.clone();
567 // Issue #275: Compute microstructure features for incomplete bars.
568 // Without this, duration_us (and derived trade_intensity) remain at
569 // their default of 0 when the bar is inspected mid-construction.
570 bar.compute_microstructure_features();
571 bar
572 })
573 }
574
575 /// Get the last processed aggregate trade ID (for gap detection on reconnect).
576 ///
577 /// Returns `None` for fresh processors that have not yet processed any trades.
578 /// After checkpoint restore, returns the last trade ID from the checkpoint.
579 pub fn last_agg_trade_id(&self) -> Option<i64> {
580 self.last_trade_id
581 }
582
583 /// Get the last COMPLETED bar's aggregate trade ID.
584 ///
585 /// Unlike `last_agg_trade_id()` which includes the forming bar tail,
586 /// this returns the trade ID from the most recently completed or orphaned bar.
587 /// Used by committed_floors initialization to avoid suppressing junction bars.
588 ///
589 /// Returns `None` for fresh processors that have not yet completed any bar.
590 pub fn last_completed_bar_tid(&self) -> Option<i64> {
591 self.last_completed_bar_tid
592 }
593
594 /// Process AggTrade records into open deviation bars including incomplete bars for analysis
595 ///
596 /// # Arguments
597 ///
598 /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
599 ///
600 /// # Returns
601 ///
602 /// Vector of open deviation bars including incomplete bars at end of data
603 ///
604 /// # Warning
605 ///
606 /// This method is for analysis purposes only. Incomplete bars violate the
607 /// fundamental open deviation bar algorithm and should not be used for production trading.
608 pub fn process_agg_trade_records_with_incomplete(
609 &mut self,
610 agg_trade_records: &[AggTrade],
611 ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
612 self.process_agg_trade_records_with_options(agg_trade_records, true)
613 }
614
615 /// Process Binance aggregated trade records into open deviation bars
616 ///
617 /// This is the primary method for converting AggTrade records (which aggregate
618 /// multiple individual trades) into open deviation bars based on price movement thresholds.
619 ///
620 /// # Parameters
621 ///
622 /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
623 /// Each record represents multiple individual trades aggregated at same price
624 ///
625 /// # Returns
626 ///
627 /// Vector of completed open deviation bars (ONLY bars that breached thresholds).
628 /// Each bar tracks both individual trade count and AggTrade record count.
629 pub fn process_agg_trade_records(
630 &mut self,
631 agg_trade_records: &[AggTrade],
632 ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
633 self.process_agg_trade_records_with_options(agg_trade_records, false)
634 }
635
636 /// Process AggTrade records with options for including incomplete bars
637 ///
638 /// Batch processing mode: Clears any existing state before processing.
639 /// Use process_single_trade() for stateful streaming instead.
640 ///
641 /// # Parameters
642 ///
643 /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
644 /// * `include_incomplete` - Whether to include incomplete bars at end of processing
645 ///
646 /// # Returns
647 ///
648 /// Vector of open deviation bars (completed + incomplete if requested)
649 pub fn process_agg_trade_records_with_options(
650 &mut self,
651 agg_trade_records: &[AggTrade],
652 include_incomplete: bool,
653 ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
654 if agg_trade_records.is_empty() {
655 return Ok(Vec::new());
656 }
657
658 // Validate records are sorted
659 self.validate_trade_ordering(agg_trade_records)?;
660
661 // Use existing bar state if resuming from checkpoint, otherwise start fresh
662 // This is CRITICAL for cross-file continuation (Issues #2, #3)
663 let mut current_bar: Option<OpenDeviationBarState> = if self.resumed_from_checkpoint {
664 // Continue from checkpoint's incomplete bar
665 self.resumed_from_checkpoint = false; // Consume the flag
666 let restored_bar = self.current_bar_state.take();
667
668 // Issue #112: Gap-aware checkpoint recovery
669 // If the forming bar's close_time is too far from the first incoming trade,
670 // discard it as an orphan to prevent oversized bars from data gaps.
671 if let Some(ref bar_state) = restored_bar {
672 let first_trade_ts = agg_trade_records[0].timestamp;
673 let gap = first_trade_ts - bar_state.bar.close_time;
674 if gap > self.max_gap_us {
675 self.anomaly_summary.record_gap();
676 // Discard forming bar — same treatment as ouroboros reset
677 None
678 } else {
679 restored_bar
680 }
681 } else {
682 restored_bar
683 }
684 } else {
685 // Start fresh for normal batch processing
686 self.current_bar_state = None;
687 None
688 };
689
690 let mut bars = Vec::with_capacity(agg_trade_records.len() / 50); // Heuristic: 50 trades/bar covers consolidation regimes
691 let mut defer_open = false;
692
693 for agg_record in agg_trade_records {
694 // Track price and position for checkpoint
695 self.price_window.push(agg_record.price);
696 self.last_trade_id = Some(agg_record.agg_trade_id);
697 self.last_timestamp_us = agg_record.timestamp;
698
699 // Issue #59: Push trade to history buffer for inter-bar feature computation
700 if let Some(ref mut history) = self.trade_history {
701 history.push(agg_record);
702 }
703
704 if defer_open {
705 // Previous bar closed, this agg_record opens new bar
706 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
707 if let Some(ref mut history) = self.trade_history {
708 history.on_bar_open(agg_record.timestamp);
709 }
710 current_bar = Some(if self.include_intra_bar_features {
711 OpenDeviationBarState::new_with_trade_accumulation(
712 agg_record,
713 self.threshold_ratio,
714 )
715 } else {
716 OpenDeviationBarState::new(agg_record, self.threshold_ratio)
717 });
718 defer_open = false;
719 continue;
720 }
721
722 match current_bar {
723 None => {
724 // First bar initialization
725 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
726 if let Some(ref mut history) = self.trade_history {
727 history.on_bar_open(agg_record.timestamp);
728 }
729 current_bar = Some(if self.include_intra_bar_features {
730 OpenDeviationBarState::new_with_trade_accumulation(
731 agg_record,
732 self.threshold_ratio,
733 )
734 } else {
735 OpenDeviationBarState::new(agg_record, self.threshold_ratio)
736 });
737 }
738 Some(ref mut bar_state) => {
739 // Issue #59 & #96 Task #44: Accumulate trade for intra-bar features (before breach check)
740 // Only accumulates if features enabled, avoiding unnecessary clones
741 bar_state.accumulate_trade(agg_record, self.include_intra_bar_features);
742
743 // Check if this AggTrade record breaches the threshold
744 let price_breaches = bar_state.bar.is_breach(
745 agg_record.price,
746 bar_state.upper_threshold,
747 bar_state.lower_threshold,
748 );
749
750 // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
751 // This eliminates "instant bars" during flash crashes where multiple trades
752 // occur at the same millisecond.
753 let timestamp_allows_close = !self.prevent_same_timestamp_close
754 || agg_record.timestamp != bar_state.bar.open_time;
755
756 if price_breaches && timestamp_allows_close {
757 // Breach detected AND timestamp changed - update bar with breaching record
758 bar_state.bar.update_with_trade(agg_record);
759
760 // Validation: Ensure high/low include open/close extremes
761 debug_assert!(
762 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
763 );
764 debug_assert!(
765 bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
766 );
767
768 // Compute microstructure features at bar finalization (Issue #34)
769 bar_state.bar.compute_microstructure_features();
770
771 // Issue #59: Compute inter-bar features from lookback window
772 if let Some(ref mut history) = self.trade_history {
773 let inter_bar_features =
774 history.compute_features(bar_state.bar.open_time);
775 bar_state.bar.set_inter_bar_features(&inter_bar_features);
776 // Issue #68: Notify history that bar is closing (resumes normal pruning)
777 history.on_bar_close();
778 }
779
780 // Issue #59: Compute intra-bar features from accumulated trades
781 if self.include_intra_bar_features {
782 // Issue #96 Task #173: Use reusable scratch buffers from bar_state
783 let intra_bar_features =
784 crate::intrabar::compute_intra_bar_features_with_config(
785 &bar_state.accumulated_trades,
786 &mut bar_state.scratch_prices,
787 &mut bar_state.scratch_volumes,
788 &self.intra_bar_config, // Issue #128
789 );
790 bar_state.bar.set_intra_bar_features(&intra_bar_features);
791 }
792
793 // Move bar out instead of cloning — bar_state borrow ends
794 // after last use above (NLL), so take() is safe here.
795 bars.push(current_bar.take().unwrap().bar);
796 // v1.4: Track last completed bar's trade ID for dedup floor
797 self.last_completed_bar_tid = Some(bars.last().unwrap().last_agg_trade_id);
798 defer_open = true; // Next record will open new bar
799 } else {
800 // Either no breach OR same timestamp (gate active) - normal update
801 bar_state.bar.update_with_trade(agg_record);
802 }
803 }
804 }
805 }
806
807 // Save current bar state for checkpoint and optionally append incomplete bar.
808 // When include_incomplete=true, clone for checkpoint then consume for output.
809 // When include_incomplete=false, move directly (no clone needed).
810 if include_incomplete {
811 // Issue #96 Task #95: Optimize checkpoint cloning with take() to avoid Vec allocation
812 // (accumulated_trades not needed after intra-bar features computed).
813 // Using std::mem::take() instead of clone+clear reduces allocation overhead.
814 if let Some(ref state) = current_bar {
815 // Construct checkpoint state without cloning accumulated_trades/scratch buffers
816 // (they're not needed for checkpoint restoration). Avoids cloning ~3.5KB inline SmallVec.
817 self.current_bar_state = Some(OpenDeviationBarState {
818 bar: state.bar.clone(),
819 upper_threshold: state.upper_threshold,
820 lower_threshold: state.lower_threshold,
821 accumulated_trades: SmallVec::new(),
822 scratch_prices: SmallVec::new(),
823 scratch_volumes: SmallVec::new(),
824 });
825 }
826
827 // Add final partial bar only if explicitly requested
828 // This preserves algorithm integrity: bars should only close on threshold breach
829 if let Some(mut bar_state) = current_bar {
830 // Compute microstructure features for incomplete bar (Issue #34)
831 bar_state.bar.compute_microstructure_features();
832
833 // Issue #59: Compute inter-bar features from lookback window
834 if let Some(ref history) = self.trade_history {
835 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
836 bar_state.bar.set_inter_bar_features(&inter_bar_features);
837 }
838
839 // Issue #59: Compute intra-bar features from accumulated trades
840 if self.include_intra_bar_features {
841 // Issue #96 Task #173: Use reusable scratch buffers from bar_state
842 let intra_bar_features =
843 crate::intrabar::compute_intra_bar_features_with_config(
844 &bar_state.accumulated_trades,
845 &mut bar_state.scratch_prices,
846 &mut bar_state.scratch_volumes,
847 &self.intra_bar_config, // Issue #128
848 );
849 bar_state.bar.set_intra_bar_features(&intra_bar_features);
850 }
851
852 bars.push(bar_state.bar);
853 }
854 } else {
855 // No incomplete bar appended — move ownership directly, no clone needed
856 self.current_bar_state = current_bar;
857 }
858
859 Ok(bars)
860 }
861
862 // === CHECKPOINT METHODS ===
863
864 /// Create checkpoint for cross-file continuation
865 ///
866 /// Captures current processing state for seamless continuation:
867 /// - Incomplete bar (if any) with FIXED thresholds
868 /// - Position tracking (timestamp, trade_id if available)
869 /// - Price hash for verification
870 ///
871 /// # Arguments
872 ///
873 /// * `symbol` - Symbol being processed (e.g., "BTCUSDT", "EURUSD")
874 ///
875 /// # Example
876 ///
877 /// ```ignore
878 /// let bars = processor.process_agg_trade_records(&trades)?;
879 /// let checkpoint = processor.create_checkpoint("BTCUSDT");
880 /// let json = serde_json::to_string(&checkpoint)?;
881 /// std::fs::write("checkpoint.json", json)?;
882 /// ```
883 pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
884 let (incomplete_bar, thresholds) = match &self.current_bar_state {
885 Some(state) => (
886 Some(state.bar.clone()),
887 Some((state.upper_threshold, state.lower_threshold)),
888 ),
889 None => (None, None),
890 };
891
892 let mut checkpoint = Checkpoint::new(
893 symbol.to_string(),
894 self.threshold_decimal_bps,
895 incomplete_bar,
896 thresholds,
897 self.last_timestamp_us,
898 self.last_trade_id,
899 self.price_window.compute_hash(),
900 self.prevent_same_timestamp_close,
901 );
902 // Issue #46: Persist defer_open state for cross-session continuity
903 checkpoint.defer_open = self.defer_open;
904 // v1.4: Persist last_completed_bar_tid for dedup floor initialization
905 checkpoint.last_completed_bar_tid = self.last_completed_bar_tid;
906 checkpoint
907 }
908
909 /// Resume processing from checkpoint
910 ///
911 /// Restores incomplete bar state with IMMUTABLE thresholds.
912 /// Next trade continues building the bar until threshold breach.
913 ///
914 /// # Errors
915 ///
916 /// - `CheckpointError::MissingThresholds` - Checkpoint has bar but no thresholds
917 ///
918 /// # Example
919 ///
920 /// ```ignore
921 /// let json = std::fs::read_to_string("checkpoint.json")?;
922 /// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
923 /// let mut processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint)?;
924 /// let bars = processor.process_agg_trade_records(&next_file_trades)?;
925 /// ```
926 pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
927 // Issue #85 Phase 2: Apply checkpoint schema migration if needed
928 let checkpoint = Self::migrate_checkpoint(checkpoint);
929
930 // Issue #62: Validate threshold range before restoring from checkpoint
931 // Valid range: 1-100,000 dbps (0.0001% to 10%)
932 const THRESHOLD_MIN: u32 = 1;
933 const THRESHOLD_MAX: u32 = 100_000;
934 if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
935 || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
936 {
937 return Err(CheckpointError::InvalidThreshold {
938 threshold: checkpoint.threshold_decimal_bps,
939 min_threshold: THRESHOLD_MIN,
940 max_threshold: THRESHOLD_MAX,
941 });
942 }
943
944 // Validate checkpoint consistency
945 if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
946 return Err(CheckpointError::MissingThresholds);
947 }
948
949 // Restore bar state if there's an incomplete bar
950 // Note: accumulated_trades is reset to empty - intra-bar features won't be
951 // accurate for bars resumed from checkpoint (partial trade history lost)
952 let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
953 (Some(bar), Some((upper, lower))) => Some(OpenDeviationBarState {
954 bar,
955 upper_threshold: upper,
956 lower_threshold: lower,
957 accumulated_trades: SmallVec::new(), // Lost on checkpoint - features may be partial
958 scratch_prices: SmallVec::new(),
959 scratch_volumes: SmallVec::new(),
960 }),
961 _ => None,
962 };
963
964 // Issue #96 Task #98: Pre-compute threshold ratio (same as with_options)
965 let threshold_ratio = ((checkpoint.threshold_decimal_bps as i64)
966 * crate::fixed_point::SCALE)
967 / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
968
969 Ok(Self {
970 threshold_decimal_bps: checkpoint.threshold_decimal_bps,
971 threshold_ratio,
972 current_bar_state,
973 price_window: PriceWindow::new(), // Reset - will be rebuilt from new trades
974 last_trade_id: checkpoint.last_trade_id,
975 last_timestamp_us: checkpoint.last_timestamp_us,
976 anomaly_summary: checkpoint.anomaly_summary,
977 resumed_from_checkpoint: true, // Signal to continue from existing bar state
978 prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
979 defer_open: checkpoint.defer_open, // Issue #46: Restore deferred open state
980 last_completed_bar_tid: checkpoint.last_completed_bar_tid, // v1.4: Restore dedup floor
981 trade_history: None, // Issue #59: Must be re-enabled after restore
982 inter_bar_config: None, // Issue #59: Must be re-enabled after restore
983 include_intra_bar_features: false, // Issue #59: Must be re-enabled after restore
984 intra_bar_config: crate::intrabar::IntraBarConfig::default(), // Issue #128
985 max_gap_us: 3_600_000_000, // Issue #112: 1 hour default
986 })
987 }
988
989 /// Migrate checkpoint between schema versions
990 /// Issue #85 Phase 2: Handle v1 → v2 migration
991 /// Safe: JSON deserialization is field-name-based, so old v1 checkpoints load correctly
992 fn migrate_checkpoint(mut checkpoint: Checkpoint) -> Checkpoint {
993 match checkpoint.version {
994 1 => {
995 // v1 → v2: OpenDeviationBar struct field reordering (no behavioral changes)
996 // JSON serialization is position-independent, so no transformation needed
997 checkpoint.version = 2;
998 checkpoint
999 }
1000 2 => {
1001 // Already current version
1002 checkpoint
1003 }
1004 _ => {
1005 // Unknown version - log warning and continue with best effort
1006 eprintln!(
1007 "Warning: Checkpoint has unknown version {}, treating as v2",
1008 checkpoint.version
1009 );
1010 checkpoint.version = 2;
1011 checkpoint
1012 }
1013 }
1014 }
1015
1016 /// Verify we're at the right position in the data stream
1017 ///
1018 /// Call with first trade of new file to verify continuity.
1019 /// Returns verification result indicating if there's a gap or exact match.
1020 ///
1021 /// # Arguments
1022 ///
1023 /// * `first_trade` - First trade of the new file/chunk
1024 ///
1025 /// # Example
1026 ///
1027 /// ```ignore
1028 /// let processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint)?;
1029 /// let verification = processor.verify_position(&next_file_trades[0]);
1030 /// match verification {
1031 /// PositionVerification::Exact => println!("Perfect continuation!"),
1032 /// PositionVerification::Gap { missing_count, .. } => {
1033 /// println!("Warning: {} trades missing", missing_count);
1034 /// }
1035 /// PositionVerification::TimestampOnly { gap_ms } => {
1036 /// println!("Exness data: {}ms gap", gap_ms);
1037 /// }
1038 /// }
1039 /// ```
1040 pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
1041 match self.last_trade_id {
1042 Some(last_id) => {
1043 // Binance: has trade IDs - check for gaps
1044 let expected_id = last_id + 1;
1045 if first_trade.agg_trade_id == expected_id {
1046 PositionVerification::Exact
1047 } else {
1048 let missing_count = first_trade.agg_trade_id - expected_id;
1049 PositionVerification::Gap {
1050 expected_id,
1051 actual_id: first_trade.agg_trade_id,
1052 missing_count,
1053 }
1054 }
1055 }
1056 None => {
1057 // Exness: no trade IDs - use timestamp only
1058 let gap_us = first_trade.timestamp - self.last_timestamp_us;
1059 let gap_ms = gap_us / 1000;
1060 PositionVerification::TimestampOnly { gap_ms }
1061 }
1062 }
1063 }
1064
1065 /// Get the current anomaly summary
1066 pub fn anomaly_summary(&self) -> &AnomalySummary {
1067 &self.anomaly_summary
1068 }
1069
1070 /// Get the threshold in decimal basis points
1071 pub fn threshold_decimal_bps(&self) -> u32 {
1072 self.threshold_decimal_bps
1073 }
1074
1075 /// Validate that trades are properly sorted for deterministic processing
1076 ///
1077 /// Issue #96 Task #62: Early-exit optimization for sorted data
1078 /// For typical workloads (95%+ sorted), quick first/last check identifies
1079 /// unsorted batches immediately without full O(n) validation.
1080 fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
1081 if trades.is_empty() {
1082 return Ok(());
1083 }
1084
1085 // Issue #96 Task #62: Fast-path check for obviously unsorted data
1086 // If first and last trades are not ordered, data is definitely unsorted
1087 // This early-exit catches common failures without full validation
1088 let first = &trades[0];
1089 let last = &trades[trades.len() - 1];
1090
1091 if last.timestamp < first.timestamp
1092 || (last.timestamp == first.timestamp && last.agg_trade_id <= first.agg_trade_id)
1093 {
1094 // Definitely unsorted - find exact error location (cold path)
1095 return find_unsorted_trade(trades);
1096 }
1097
1098 // Full validation for typical sorted case
1099 for i in 1..trades.len() {
1100 let prev = &trades[i - 1];
1101 let curr = &trades[i];
1102
1103 // Check ordering: (timestamp, agg_trade_id) ascending
1104 if curr.timestamp < prev.timestamp
1105 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
1106 {
1107 return unsorted_trade_error(i, prev, curr);
1108 }
1109 }
1110
1111 Ok(())
1112 }
1113
1114 /// Reset processor state at a UTC-midnight ouroboros boundary (day mode only).
1115 ///
1116 /// Clears the incomplete bar and position tracking while preserving
1117 /// the threshold configuration. Use this when starting fresh at a
1118 /// known boundary for reproducibility.
1119 ///
1120 /// # Returns
1121 ///
1122 /// The orphaned incomplete bar (if any) so caller can decide
1123 /// whether to include it in results with `is_orphan=True` flag.
1124 ///
1125 /// # Example
1126 ///
1127 /// ```ignore
1128 /// // At year boundary (Jan 1 00:00:00 UTC)
1129 /// let orphaned = processor.reset_at_ouroboros();
1130 /// if let Some(bar) = orphaned {
1131 /// // Handle incomplete bar from previous year
1132 /// }
1133 /// // Continue processing new year's data with clean state
1134 /// ```
1135 pub fn reset_at_ouroboros(&mut self) -> Option<OpenDeviationBar> {
1136 let orphaned = self.current_bar_state.take().map(|mut state| {
1137 // Issue #275: Compute microstructure features for orphan bars.
1138 // Without this, duration_us (and derived trade_intensity) remain at
1139 // their default of 0, even though close_time − open_time > 0.
1140 state.bar.compute_microstructure_features();
1141 state.bar
1142 });
1143 // v1.4: Track orphan bar's trade ID for dedup floor (orphan IS a completed bar)
1144 // Do NOT reset last_completed_bar_tid to None — it persists across day boundaries
1145 if let Some(ref bar) = orphaned {
1146 self.last_completed_bar_tid = Some(bar.last_agg_trade_id);
1147 }
1148 self.price_window = PriceWindow::new();
1149 self.last_trade_id = None;
1150 self.last_timestamp_us = 0;
1151 self.resumed_from_checkpoint = false;
1152 self.defer_open = false;
1153 // Issue #81: Clear bar boundary tracking at ouroboros reset.
1154 // Trades are preserved — still valid lookback for first bar of new segment.
1155 if let Some(ref mut history) = self.trade_history {
1156 history.reset_bar_boundaries();
1157 }
1158 orphaned
1159 }
1160}
1161
1162/// Internal state for a open deviation bar being built
1163#[derive(Clone)]
1164struct OpenDeviationBarState {
1165 /// The open deviation bar being constructed
1166 pub bar: OpenDeviationBar,
1167
1168 /// Upper breach threshold (FIXED from bar open)
1169 pub upper_threshold: FixedPoint,
1170
1171 /// Lower breach threshold (FIXED from bar open)
1172 pub lower_threshold: FixedPoint,
1173
1174 /// Accumulated trades for intra-bar feature computation (Issue #59)
1175 ///
1176 /// When intra-bar features are enabled, trades are accumulated here
1177 /// during bar construction and used to compute features at bar close.
1178 /// Cleared when bar closes to free memory.
1179 /// Issue #136: Optimized from 512→64→48 slots.
1180 /// Profile data: max trades/bar = 26 (P99 = 14), so 64 slots provides
1181 /// 2.46x safety margin. SmallVec transparently spills to heap if exceeded.
1182 pub accumulated_trades: SmallVec<[AggTrade; 64]>,
1183
1184 /// Issue #96: Scratch buffer for intra-bar price extraction
1185 /// SmallVec<[f64; 64]> keeps 95%+ of bars on stack (P99 trades/bar = 14, max = 26)
1186 /// Eliminates heap allocation for typical bars, spills transparently for large ones
1187 pub scratch_prices: SmallVec<[f64; 64]>,
1188
1189 /// Issue #96: Scratch buffer for intra-bar volume extraction
1190 /// Same sizing rationale as scratch_prices
1191 pub scratch_volumes: SmallVec<[f64; 64]>,
1192}
1193
1194impl OpenDeviationBarState {
1195 /// Create new open deviation bar state from opening trade
1196 /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1197 #[inline]
1198 fn new(trade: &AggTrade, threshold_ratio: i64) -> Self {
1199 let bar = OpenDeviationBar::new(trade);
1200
1201 // Issue #96 Task #98: Use cached ratio instead of repeated division
1202 // This avoids BASIS_POINTS_SCALE division on every bar creation
1203 let (upper_threshold, lower_threshold) =
1204 bar.open.compute_range_thresholds_cached(threshold_ratio);
1205
1206 Self {
1207 bar,
1208 upper_threshold,
1209 lower_threshold,
1210 accumulated_trades: SmallVec::new(),
1211 scratch_prices: SmallVec::new(),
1212 scratch_volumes: SmallVec::new(),
1213 }
1214 }
1215
1216 /// Create new open deviation bar state with intra-bar feature accumulation
1217 /// Issue #96 Task #98: Accept pre-computed threshold_ratio for fast threshold calculation
1218 #[inline]
1219 fn new_with_trade_accumulation(trade: &AggTrade, threshold_ratio: i64) -> Self {
1220 let bar = OpenDeviationBar::new(trade);
1221
1222 // Issue #96 Task #98: Use cached ratio instead of repeated division
1223 // This avoids BASIS_POINTS_SCALE division on every bar creation
1224 let (upper_threshold, lower_threshold) =
1225 bar.open.compute_range_thresholds_cached(threshold_ratio);
1226
1227 Self {
1228 bar,
1229 upper_threshold,
1230 lower_threshold,
1231 accumulated_trades: {
1232 let mut sv = SmallVec::new();
1233 sv.push(*trade);
1234 sv
1235 },
1236 scratch_prices: SmallVec::new(),
1237 scratch_volumes: SmallVec::new(),
1238 }
1239 }
1240
1241 /// Accumulate a trade for intra-bar feature computation
1242 ///
1243 /// Issue #96 Task #44: Only accumulates if intra-bar features are enabled,
1244 /// avoiding unnecessary clones for the majority of use cases where they're disabled.
1245 /// Issue #96 Task #79: #[inline] allows compiler to fold invariant branch
1246 /// (include_intra is constant for processor lifetime)
1247 #[inline]
1248 fn accumulate_trade(&mut self, trade: &AggTrade, include_intra: bool) {
1249 if include_intra {
1250 self.accumulated_trades.push(*trade);
1251 }
1252 }
1253}