rangebar_core/processor.rs
1// FILE-SIZE-OK: 1496 lines — tests are inline (access private RangeBarState)
2//! Core range bar processing algorithm
3//!
4//! Implements non-lookahead bias range bar construction where bars close when
5//! price moves ±threshold dbps from the bar's OPEN price.
6
7use crate::checkpoint::{
8 AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; // Issue #59
12use crate::intrabar::compute_intra_bar_features; // Issue #59: Intra-bar features
13use crate::types::{AggTrade, RangeBar};
14#[cfg(feature = "python")]
15use pyo3::prelude::*;
16// Re-export ProcessingError from errors.rs (Phase 2a extraction)
17pub use crate::errors::ProcessingError;
18// Re-export ExportRangeBarProcessor from export_processor.rs (Phase 2d extraction)
19pub use crate::export_processor::ExportRangeBarProcessor;
20
21/// Range bar processor with non-lookahead bias guarantee
22pub struct RangeBarProcessor {
23 /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
24 threshold_decimal_bps: u32,
25
26 /// Current bar state for streaming processing (Q19)
27 /// Enables get_incomplete_bar() and stateful process_single_trade()
28 current_bar_state: Option<RangeBarState>,
29
30 /// Price window for checkpoint hash verification
31 price_window: PriceWindow,
32
33 /// Last processed trade ID (for gap detection on resume)
34 last_trade_id: Option<i64>,
35
36 /// Last processed timestamp (for position verification)
37 last_timestamp_us: i64,
38
39 /// Anomaly tracking for debugging
40 anomaly_summary: AnomalySummary,
41
42 /// Flag indicating this processor was created from a checkpoint
43 /// When true, process_agg_trade_records will continue from existing bar state
44 resumed_from_checkpoint: bool,
45
46 /// Prevent bars from closing on same timestamp as they opened (Issue #36)
47 ///
48 /// When true (default): A bar cannot close until a trade arrives with a
49 /// different timestamp than the bar's open_time. This prevents "instant bars"
50 /// during flash crashes where multiple trades occur at the same millisecond.
51 ///
52 /// When false: Legacy behavior - bars can close on any breach regardless
53 /// of timestamp, which may produce bars with identical timestamps.
54 prevent_same_timestamp_close: bool,
55
56 /// Deferred bar open flag (Issue #46)
57 ///
58 /// When true: The previous trade triggered a threshold breach and closed a bar.
59 /// The next trade arriving via `process_single_trade()` should open a new bar
60 /// instead of being treated as a continuation.
61 ///
62 /// This matches the batch path's `defer_open` semantics in
63 /// `process_agg_trade_records()` where the breaching trade closes the current
64 /// bar and the NEXT trade opens the new bar.
65 defer_open: bool,
66
67 /// Trade history for inter-bar feature computation (Issue #59)
68 ///
69 /// Ring buffer of recent trades for computing lookback-based features.
70 /// When Some, features are computed from trades BEFORE each bar's open_time.
71 /// When None, inter-bar features are disabled (all lookback_* fields = None).
72 trade_history: Option<TradeHistory>,
73
74 /// Configuration for inter-bar features (Issue #59)
75 ///
76 /// Controls lookback mode (fixed count or time window) and which feature
77 /// tiers to compute. When None, inter-bar features are disabled.
78 inter_bar_config: Option<InterBarConfig>,
79
80 /// Enable intra-bar feature computation (Issue #59)
81 ///
82 /// When true, the processor accumulates trades during bar construction
83 /// and computes 22 features from trades WITHIN each bar at bar close.
84 /// Features include ITH (Investment Time Horizon), statistical, and
85 /// complexity metrics. When false, all intra_* fields are None.
86 include_intra_bar_features: bool,
87}
88
89impl RangeBarProcessor {
90 /// Create new processor with given threshold
91 ///
92 /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36)
93 ///
94 /// # Arguments
95 ///
96 /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
97 /// - Example: `250` → 25bps = 0.25%
98 /// - Example: `10` → 1bps = 0.01%
99 /// - Minimum: `1` → 0.1bps = 0.001%
100 ///
101 /// # Breaking Change (v3.0.0)
102 ///
103 /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
104 /// **Migration**: Multiply all threshold values by 10.
105 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
106 Self::with_options(threshold_decimal_bps, true)
107 }
108
109 /// Create new processor with explicit timestamp gating control
110 ///
111 /// # Arguments
112 ///
113 /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
114 /// * `prevent_same_timestamp_close` - If true, bars cannot close until
115 /// timestamp advances from open_time. This prevents "instant bars" during
116 /// flash crashes. Set to false for legacy behavior (pre-v9).
117 ///
118 /// # Example
119 ///
120 /// ```ignore
121 /// // Default behavior (v9+): timestamp gating enabled
122 /// let processor = RangeBarProcessor::new(250)?;
123 ///
124 /// // Legacy behavior: allow instant bars
125 /// let processor = RangeBarProcessor::with_options(250, false)?;
126 /// ```
127 pub fn with_options(
128 threshold_decimal_bps: u32,
129 prevent_same_timestamp_close: bool,
130 ) -> Result<Self, ProcessingError> {
131 // Validation bounds (v3.0.0: dbps units)
132 // Min: 1 dbps = 0.001%
133 // Max: 100,000 dbps = 100%
134 if threshold_decimal_bps < 1 {
135 return Err(ProcessingError::InvalidThreshold {
136 threshold_decimal_bps,
137 });
138 }
139 if threshold_decimal_bps > 100_000 {
140 return Err(ProcessingError::InvalidThreshold {
141 threshold_decimal_bps,
142 });
143 }
144
145 Ok(Self {
146 threshold_decimal_bps,
147 current_bar_state: None,
148 price_window: PriceWindow::new(),
149 last_trade_id: None,
150 last_timestamp_us: 0,
151 anomaly_summary: AnomalySummary::default(),
152 resumed_from_checkpoint: false,
153 prevent_same_timestamp_close,
154 defer_open: false,
155 trade_history: None, // Issue #59: disabled by default
156 inter_bar_config: None, // Issue #59: disabled by default
157 include_intra_bar_features: false, // Issue #59: disabled by default
158 })
159 }
160
161 /// Get the prevent_same_timestamp_close setting
162 pub fn prevent_same_timestamp_close(&self) -> bool {
163 self.prevent_same_timestamp_close
164 }
165
166 /// Enable inter-bar feature computation with the given configuration (Issue #59)
167 ///
168 /// When enabled, the processor maintains a trade history buffer and computes
169 /// lookback-based microstructure features on each bar close. Features are
170 /// computed from trades that occurred BEFORE each bar's open_time, ensuring
171 /// no lookahead bias.
172 ///
173 /// # Arguments
174 ///
175 /// * `config` - Configuration controlling lookback mode and feature tiers
176 ///
177 /// # Example
178 ///
179 /// ```ignore
180 /// use rangebar_core::processor::RangeBarProcessor;
181 /// use rangebar_core::interbar::{InterBarConfig, LookbackMode};
182 ///
183 /// let processor = RangeBarProcessor::new(1000)?
184 /// .with_inter_bar_config(InterBarConfig {
185 /// lookback_mode: LookbackMode::FixedCount(500),
186 /// compute_tier2: true,
187 /// compute_tier3: true,
188 /// });
189 /// ```
190 pub fn with_inter_bar_config(mut self, config: InterBarConfig) -> Self {
191 self.trade_history = Some(TradeHistory::new(config.clone()));
192 self.inter_bar_config = Some(config);
193 self
194 }
195
196 /// Check if inter-bar features are enabled
197 pub fn inter_bar_enabled(&self) -> bool {
198 self.inter_bar_config.is_some()
199 }
200
201 /// Enable intra-bar feature computation (Issue #59)
202 ///
203 /// When enabled, the processor accumulates trades during bar construction
204 /// and computes 22 features from trades WITHIN each bar at bar close:
205 /// - 8 ITH features (Investment Time Horizon)
206 /// - 12 statistical features (OFI, intensity, Kyle lambda, etc.)
207 /// - 2 complexity features (Hurst exponent, permutation entropy)
208 ///
209 /// # Memory Note
210 ///
211 /// Trades are accumulated per-bar and freed when the bar closes.
212 /// Typical 1000 dbps bar: ~50-500 trades, ~2-24 KB overhead.
213 ///
214 /// # Example
215 ///
216 /// ```ignore
217 /// let processor = RangeBarProcessor::new(1000)?
218 /// .with_intra_bar_features();
219 /// ```
220 pub fn with_intra_bar_features(mut self) -> Self {
221 self.include_intra_bar_features = true;
222 self
223 }
224
225 /// Check if intra-bar features are enabled
226 pub fn intra_bar_enabled(&self) -> bool {
227 self.include_intra_bar_features
228 }
229
230 /// Re-enable inter-bar features on an existing processor (Issue #97).
231 ///
232 /// Used after `from_checkpoint()` to restore microstructure config that
233 /// is not preserved in checkpoint state.
234 pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
235 self.trade_history = Some(TradeHistory::new(config.clone()));
236 self.inter_bar_config = Some(config);
237 }
238
239 /// Re-enable intra-bar features on an existing processor (Issue #97).
240 pub fn set_intra_bar_features(&mut self, enabled: bool) {
241 self.include_intra_bar_features = enabled;
242 }
243
244 /// Process a single trade and return completed bar if any
245 ///
246 /// Maintains internal state for streaming use case. State persists across calls
247 /// until a bar completes (threshold breach), enabling get_incomplete_bar().
248 ///
249 /// # Arguments
250 ///
251 /// * `trade` - Single aggregated trade to process
252 ///
253 /// # Returns
254 ///
255 /// `Some(RangeBar)` if a bar was completed, `None` otherwise
256 ///
257 /// # State Management
258 ///
259 /// - First trade: Initializes new bar state
260 /// - Subsequent trades: Updates existing bar or closes on breach
261 /// - Breach: Returns completed bar, starts new bar with breaching trade
262 pub fn process_single_trade(
263 &mut self,
264 trade: AggTrade,
265 ) -> Result<Option<RangeBar>, ProcessingError> {
266 // Track price and position for checkpoint
267 self.price_window.push(trade.price);
268 self.last_trade_id = Some(trade.agg_trade_id);
269 self.last_timestamp_us = trade.timestamp;
270
271 // Issue #59: Push trade to history buffer for inter-bar feature computation
272 // This must happen BEFORE bar processing so lookback window includes recent trades
273 if let Some(ref mut history) = self.trade_history {
274 history.push(&trade);
275 }
276
277 // Issue #46: If previous call triggered a breach, this trade opens the new bar.
278 // This matches the batch path's defer_open semantics - the breaching trade
279 // closes the current bar, and the NEXT trade opens the new bar.
280 if self.defer_open {
281 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
282 if let Some(ref mut history) = self.trade_history {
283 history.on_bar_open(trade.timestamp);
284 }
285 self.current_bar_state = Some(if self.include_intra_bar_features {
286 RangeBarState::new_with_trade_accumulation(&trade, self.threshold_decimal_bps)
287 } else {
288 RangeBarState::new(&trade, self.threshold_decimal_bps)
289 });
290 self.defer_open = false;
291 return Ok(None);
292 }
293
294 match &mut self.current_bar_state {
295 None => {
296 // First trade - initialize new bar
297 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
298 if let Some(ref mut history) = self.trade_history {
299 history.on_bar_open(trade.timestamp);
300 }
301 self.current_bar_state = Some(if self.include_intra_bar_features {
302 RangeBarState::new_with_trade_accumulation(&trade, self.threshold_decimal_bps)
303 } else {
304 RangeBarState::new(&trade, self.threshold_decimal_bps)
305 });
306 Ok(None)
307 }
308 Some(bar_state) => {
309 // Issue #59: Accumulate trade for intra-bar features (before breach check)
310 if self.include_intra_bar_features {
311 bar_state.accumulate_trade(&trade);
312 }
313
314 // Check for threshold breach
315 let price_breaches = bar_state.bar.is_breach(
316 trade.price,
317 bar_state.upper_threshold,
318 bar_state.lower_threshold,
319 );
320
321 // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
322 // This eliminates "instant bars" during flash crashes where multiple trades
323 // occur at the same millisecond.
324 let timestamp_allows_close = !self.prevent_same_timestamp_close
325 || trade.timestamp != bar_state.bar.open_time;
326
327 if price_breaches && timestamp_allows_close {
328 // Breach detected AND timestamp changed - close current bar
329 bar_state.bar.update_with_trade(&trade);
330
331 // Validation: Ensure high/low include open/close extremes
332 debug_assert!(
333 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
334 );
335 debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
336
337 // Compute microstructure features at bar finalization (Issue #25)
338 bar_state.bar.compute_microstructure_features();
339
340 // Issue #59: Compute inter-bar features from lookback window
341 // Features are computed from trades BEFORE bar.open_time (no lookahead)
342 if let Some(ref mut history) = self.trade_history {
343 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
344 bar_state.bar.set_inter_bar_features(&inter_bar_features);
345 // Issue #68: Notify history that bar is closing (resumes normal pruning)
346 history.on_bar_close();
347 }
348
349 // Issue #59: Compute intra-bar features from accumulated trades
350 if self.include_intra_bar_features {
351 let intra_bar_features =
352 compute_intra_bar_features(&bar_state.accumulated_trades);
353 bar_state.bar.set_intra_bar_features(&intra_bar_features);
354 }
355
356 // Move bar out instead of cloning — bar_state borrow ends after
357 // last use above (NLL), so take() is safe here.
358 let completed_bar = self.current_bar_state.take().unwrap().bar;
359
360 // Issue #46: Don't start new bar with breaching trade.
361 // Next trade will open the new bar via defer_open.
362 self.defer_open = true;
363
364 Ok(Some(completed_bar))
365 } else {
366 // Either no breach OR same timestamp (gate active) - update existing bar
367 bar_state.bar.update_with_trade(&trade);
368 Ok(None)
369 }
370 }
371 }
372 }
373
374 /// Get any incomplete bar currently being processed
375 ///
376 /// Returns clone of current bar state for inspection without consuming it.
377 /// Useful for final bar at stream end or progress monitoring.
378 ///
379 /// # Returns
380 ///
381 /// `Some(RangeBar)` if bar is in progress, `None` if no active bar
382 pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
383 self.current_bar_state
384 .as_ref()
385 .map(|state| state.bar.clone())
386 }
387
388 /// Process AggTrade records into range bars including incomplete bars for analysis
389 ///
390 /// # Arguments
391 ///
392 /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
393 ///
394 /// # Returns
395 ///
396 /// Vector of range bars including incomplete bars at end of data
397 ///
398 /// # Warning
399 ///
400 /// This method is for analysis purposes only. Incomplete bars violate the
401 /// fundamental range bar algorithm and should not be used for production trading.
402 pub fn process_agg_trade_records_with_incomplete(
403 &mut self,
404 agg_trade_records: &[AggTrade],
405 ) -> Result<Vec<RangeBar>, ProcessingError> {
406 self.process_agg_trade_records_with_options(agg_trade_records, true)
407 }
408
409 /// Process Binance aggregated trade records into range bars
410 ///
411 /// This is the primary method for converting AggTrade records (which aggregate
412 /// multiple individual trades) into range bars based on price movement thresholds.
413 ///
414 /// # Parameters
415 ///
416 /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
417 /// Each record represents multiple individual trades aggregated at same price
418 ///
419 /// # Returns
420 ///
421 /// Vector of completed range bars (ONLY bars that breached thresholds).
422 /// Each bar tracks both individual trade count and AggTrade record count.
423 pub fn process_agg_trade_records(
424 &mut self,
425 agg_trade_records: &[AggTrade],
426 ) -> Result<Vec<RangeBar>, ProcessingError> {
427 self.process_agg_trade_records_with_options(agg_trade_records, false)
428 }
429
430 /// Process AggTrade records with options for including incomplete bars
431 ///
432 /// Batch processing mode: Clears any existing state before processing.
433 /// Use process_single_trade() for stateful streaming instead.
434 ///
435 /// # Parameters
436 ///
437 /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
438 /// * `include_incomplete` - Whether to include incomplete bars at end of processing
439 ///
440 /// # Returns
441 ///
442 /// Vector of range bars (completed + incomplete if requested)
443 pub fn process_agg_trade_records_with_options(
444 &mut self,
445 agg_trade_records: &[AggTrade],
446 include_incomplete: bool,
447 ) -> Result<Vec<RangeBar>, ProcessingError> {
448 if agg_trade_records.is_empty() {
449 return Ok(Vec::new());
450 }
451
452 // Validate records are sorted
453 self.validate_trade_ordering(agg_trade_records)?;
454
455 // Use existing bar state if resuming from checkpoint, otherwise start fresh
456 // This is CRITICAL for cross-file continuation (Issues #2, #3)
457 let mut current_bar: Option<RangeBarState> = if self.resumed_from_checkpoint {
458 // Continue from checkpoint's incomplete bar
459 self.resumed_from_checkpoint = false; // Consume the flag
460 self.current_bar_state.take()
461 } else {
462 // Start fresh for normal batch processing
463 self.current_bar_state = None;
464 None
465 };
466
467 let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); // Heuristic capacity
468 let mut defer_open = false;
469
470 for agg_record in agg_trade_records {
471 // Track price and position for checkpoint
472 self.price_window.push(agg_record.price);
473 self.last_trade_id = Some(agg_record.agg_trade_id);
474 self.last_timestamp_us = agg_record.timestamp;
475
476 // Issue #59: Push trade to history buffer for inter-bar feature computation
477 if let Some(ref mut history) = self.trade_history {
478 history.push(agg_record);
479 }
480
481 if defer_open {
482 // Previous bar closed, this agg_record opens new bar
483 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
484 if let Some(ref mut history) = self.trade_history {
485 history.on_bar_open(agg_record.timestamp);
486 }
487 current_bar = Some(if self.include_intra_bar_features {
488 RangeBarState::new_with_trade_accumulation(
489 agg_record,
490 self.threshold_decimal_bps,
491 )
492 } else {
493 RangeBarState::new(agg_record, self.threshold_decimal_bps)
494 });
495 defer_open = false;
496 continue;
497 }
498
499 match current_bar {
500 None => {
501 // First bar initialization
502 // Issue #68: Notify history that new bar is opening (preserves pre-bar trades)
503 if let Some(ref mut history) = self.trade_history {
504 history.on_bar_open(agg_record.timestamp);
505 }
506 current_bar = Some(if self.include_intra_bar_features {
507 RangeBarState::new_with_trade_accumulation(
508 agg_record,
509 self.threshold_decimal_bps,
510 )
511 } else {
512 RangeBarState::new(agg_record, self.threshold_decimal_bps)
513 });
514 }
515 Some(ref mut bar_state) => {
516 // Issue #59: Accumulate trade for intra-bar features (before breach check)
517 if self.include_intra_bar_features {
518 bar_state.accumulate_trade(agg_record);
519 }
520
521 // Check if this AggTrade record breaches the threshold
522 let price_breaches = bar_state.bar.is_breach(
523 agg_record.price,
524 bar_state.upper_threshold,
525 bar_state.lower_threshold,
526 );
527
528 // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
529 // This eliminates "instant bars" during flash crashes where multiple trades
530 // occur at the same millisecond.
531 let timestamp_allows_close = !self.prevent_same_timestamp_close
532 || agg_record.timestamp != bar_state.bar.open_time;
533
534 if price_breaches && timestamp_allows_close {
535 // Breach detected AND timestamp changed - update bar with breaching record
536 bar_state.bar.update_with_trade(agg_record);
537
538 // Validation: Ensure high/low include open/close extremes
539 debug_assert!(
540 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
541 );
542 debug_assert!(
543 bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
544 );
545
546 // Compute microstructure features at bar finalization (Issue #34)
547 bar_state.bar.compute_microstructure_features();
548
549 // Issue #59: Compute inter-bar features from lookback window
550 if let Some(ref mut history) = self.trade_history {
551 let inter_bar_features =
552 history.compute_features(bar_state.bar.open_time);
553 bar_state.bar.set_inter_bar_features(&inter_bar_features);
554 // Issue #68: Notify history that bar is closing (resumes normal pruning)
555 history.on_bar_close();
556 }
557
558 // Issue #59: Compute intra-bar features from accumulated trades
559 if self.include_intra_bar_features {
560 let intra_bar_features =
561 compute_intra_bar_features(&bar_state.accumulated_trades);
562 bar_state.bar.set_intra_bar_features(&intra_bar_features);
563 }
564
565 // Move bar out instead of cloning — bar_state borrow ends
566 // after last use above (NLL), so take() is safe here.
567 bars.push(current_bar.take().unwrap().bar);
568 defer_open = true; // Next record will open new bar
569 } else {
570 // Either no breach OR same timestamp (gate active) - normal update
571 bar_state.bar.update_with_trade(agg_record);
572 }
573 }
574 }
575 }
576
577 // Save current bar state for checkpoint and optionally append incomplete bar.
578 // When include_incomplete=true, clone for checkpoint then consume for output.
579 // When include_incomplete=false, move directly (no clone needed).
580 if include_incomplete {
581 self.current_bar_state = current_bar.clone();
582
583 // Add final partial bar only if explicitly requested
584 // This preserves algorithm integrity: bars should only close on threshold breach
585 if let Some(mut bar_state) = current_bar {
586 // Compute microstructure features for incomplete bar (Issue #34)
587 bar_state.bar.compute_microstructure_features();
588
589 // Issue #59: Compute inter-bar features from lookback window
590 if let Some(ref history) = self.trade_history {
591 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
592 bar_state.bar.set_inter_bar_features(&inter_bar_features);
593 }
594
595 // Issue #59: Compute intra-bar features from accumulated trades
596 if self.include_intra_bar_features {
597 let intra_bar_features =
598 compute_intra_bar_features(&bar_state.accumulated_trades);
599 bar_state.bar.set_intra_bar_features(&intra_bar_features);
600 }
601
602 bars.push(bar_state.bar);
603 }
604 } else {
605 // No incomplete bar appended — move ownership directly, no clone needed
606 self.current_bar_state = current_bar;
607 }
608
609 Ok(bars)
610 }
611
612 // === CHECKPOINT METHODS ===
613
614 /// Create checkpoint for cross-file continuation
615 ///
616 /// Captures current processing state for seamless continuation:
617 /// - Incomplete bar (if any) with FIXED thresholds
618 /// - Position tracking (timestamp, trade_id if available)
619 /// - Price hash for verification
620 ///
621 /// # Arguments
622 ///
623 /// * `symbol` - Symbol being processed (e.g., "BTCUSDT", "EURUSD")
624 ///
625 /// # Example
626 ///
627 /// ```ignore
628 /// let bars = processor.process_agg_trade_records(&trades)?;
629 /// let checkpoint = processor.create_checkpoint("BTCUSDT");
630 /// let json = serde_json::to_string(&checkpoint)?;
631 /// std::fs::write("checkpoint.json", json)?;
632 /// ```
633 pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
634 let (incomplete_bar, thresholds) = match &self.current_bar_state {
635 Some(state) => (
636 Some(state.bar.clone()),
637 Some((state.upper_threshold, state.lower_threshold)),
638 ),
639 None => (None, None),
640 };
641
642 let mut checkpoint = Checkpoint::new(
643 symbol.to_string(),
644 self.threshold_decimal_bps,
645 incomplete_bar,
646 thresholds,
647 self.last_timestamp_us,
648 self.last_trade_id,
649 self.price_window.compute_hash(),
650 self.prevent_same_timestamp_close,
651 );
652 // Issue #46: Persist defer_open state for cross-session continuity
653 checkpoint.defer_open = self.defer_open;
654 checkpoint
655 }
656
657 /// Resume processing from checkpoint
658 ///
659 /// Restores incomplete bar state with IMMUTABLE thresholds.
660 /// Next trade continues building the bar until threshold breach.
661 ///
662 /// # Errors
663 ///
664 /// - `CheckpointError::MissingThresholds` - Checkpoint has bar but no thresholds
665 ///
666 /// # Example
667 ///
668 /// ```ignore
669 /// let json = std::fs::read_to_string("checkpoint.json")?;
670 /// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
671 /// let mut processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
672 /// let bars = processor.process_agg_trade_records(&next_file_trades)?;
673 /// ```
674 pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
675 // Issue #62: Validate threshold range before restoring from checkpoint
676 // Valid range: 1-100,000 dbps (0.0001% to 10%)
677 const THRESHOLD_MIN: u32 = 1;
678 const THRESHOLD_MAX: u32 = 100_000;
679 if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
680 || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
681 {
682 return Err(CheckpointError::InvalidThreshold {
683 threshold: checkpoint.threshold_decimal_bps,
684 min_threshold: THRESHOLD_MIN,
685 max_threshold: THRESHOLD_MAX,
686 });
687 }
688
689 // Validate checkpoint consistency
690 if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
691 return Err(CheckpointError::MissingThresholds);
692 }
693
694 // Restore bar state if there's an incomplete bar
695 // Note: accumulated_trades is reset to empty - intra-bar features won't be
696 // accurate for bars resumed from checkpoint (partial trade history lost)
697 let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
698 (Some(bar), Some((upper, lower))) => Some(RangeBarState {
699 bar,
700 upper_threshold: upper,
701 lower_threshold: lower,
702 accumulated_trades: Vec::new(), // Lost on checkpoint - features may be partial
703 }),
704 _ => None,
705 };
706
707 Ok(Self {
708 threshold_decimal_bps: checkpoint.threshold_decimal_bps,
709 current_bar_state,
710 price_window: PriceWindow::new(), // Reset - will be rebuilt from new trades
711 last_trade_id: checkpoint.last_trade_id,
712 last_timestamp_us: checkpoint.last_timestamp_us,
713 anomaly_summary: checkpoint.anomaly_summary,
714 resumed_from_checkpoint: true, // Signal to continue from existing bar state
715 prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
716 defer_open: checkpoint.defer_open, // Issue #46: Restore deferred open state
717 trade_history: None, // Issue #59: Must be re-enabled after restore
718 inter_bar_config: None, // Issue #59: Must be re-enabled after restore
719 include_intra_bar_features: false, // Issue #59: Must be re-enabled after restore
720 })
721 }
722
723 /// Verify we're at the right position in the data stream
724 ///
725 /// Call with first trade of new file to verify continuity.
726 /// Returns verification result indicating if there's a gap or exact match.
727 ///
728 /// # Arguments
729 ///
730 /// * `first_trade` - First trade of the new file/chunk
731 ///
732 /// # Example
733 ///
734 /// ```ignore
735 /// let processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
736 /// let verification = processor.verify_position(&next_file_trades[0]);
737 /// match verification {
738 /// PositionVerification::Exact => println!("Perfect continuation!"),
739 /// PositionVerification::Gap { missing_count, .. } => {
740 /// println!("Warning: {} trades missing", missing_count);
741 /// }
742 /// PositionVerification::TimestampOnly { gap_ms } => {
743 /// println!("Exness data: {}ms gap", gap_ms);
744 /// }
745 /// }
746 /// ```
747 pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
748 match self.last_trade_id {
749 Some(last_id) => {
750 // Binance: has trade IDs - check for gaps
751 let expected_id = last_id + 1;
752 if first_trade.agg_trade_id == expected_id {
753 PositionVerification::Exact
754 } else {
755 let missing_count = first_trade.agg_trade_id - expected_id;
756 PositionVerification::Gap {
757 expected_id,
758 actual_id: first_trade.agg_trade_id,
759 missing_count,
760 }
761 }
762 }
763 None => {
764 // Exness: no trade IDs - use timestamp only
765 let gap_us = first_trade.timestamp - self.last_timestamp_us;
766 let gap_ms = gap_us / 1000;
767 PositionVerification::TimestampOnly { gap_ms }
768 }
769 }
770 }
771
772 /// Get the current anomaly summary
773 pub fn anomaly_summary(&self) -> &AnomalySummary {
774 &self.anomaly_summary
775 }
776
777 /// Get the threshold in decimal basis points
778 pub fn threshold_decimal_bps(&self) -> u32 {
779 self.threshold_decimal_bps
780 }
781
782 /// Validate that trades are properly sorted for deterministic processing
783 fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
784 for i in 1..trades.len() {
785 let prev = &trades[i - 1];
786 let curr = &trades[i];
787
788 // Check ordering: (timestamp, agg_trade_id) ascending
789 if curr.timestamp < prev.timestamp
790 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
791 {
792 return Err(ProcessingError::UnsortedTrades {
793 index: i,
794 prev_time: prev.timestamp,
795 prev_id: prev.agg_trade_id,
796 curr_time: curr.timestamp,
797 curr_id: curr.agg_trade_id,
798 });
799 }
800 }
801
802 Ok(())
803 }
804
805 /// Reset processor state at an ouroboros boundary (year/month/week).
806 ///
807 /// Clears the incomplete bar and position tracking while preserving
808 /// the threshold configuration. Use this when starting fresh at a
809 /// known boundary for reproducibility.
810 ///
811 /// # Returns
812 ///
813 /// The orphaned incomplete bar (if any) so caller can decide
814 /// whether to include it in results with `is_orphan=True` flag.
815 ///
816 /// # Example
817 ///
818 /// ```ignore
819 /// // At year boundary (Jan 1 00:00:00 UTC)
820 /// let orphaned = processor.reset_at_ouroboros();
821 /// if let Some(bar) = orphaned {
822 /// // Handle incomplete bar from previous year
823 /// }
824 /// // Continue processing new year's data with clean state
825 /// ```
826 pub fn reset_at_ouroboros(&mut self) -> Option<RangeBar> {
827 let orphaned = self.current_bar_state.take().map(|state| state.bar);
828 self.price_window = PriceWindow::new();
829 self.last_trade_id = None;
830 self.last_timestamp_us = 0;
831 self.resumed_from_checkpoint = false;
832 self.defer_open = false;
833 // Issue #81: Clear bar boundary tracking at ouroboros reset.
834 // Trades are preserved — still valid lookback for first bar of new segment.
835 if let Some(ref mut history) = self.trade_history {
836 history.reset_bar_boundaries();
837 }
838 orphaned
839 }
840}
841
/// Internal state for a range bar that is still being built.
///
/// Bundles the in-progress [`RangeBar`] with its upper/lower breach
/// thresholds. The thresholds are computed from the bar's OPEN price when the
/// state is created and stay fixed for the bar's lifetime.
#[derive(Clone)]
struct RangeBarState {
    /// The range bar being constructed
    pub bar: RangeBar,

    /// Upper breach threshold (FIXED from bar open)
    pub upper_threshold: FixedPoint,

    /// Lower breach threshold (FIXED from bar open)
    pub lower_threshold: FixedPoint,

    /// Accumulated trades for intra-bar feature computation (Issue #59)
    ///
    /// When intra-bar features are enabled, trades are accumulated here
    /// during bar construction and used to compute features at bar close.
    /// Cleared when bar closes to free memory.
    ///
    /// `RangeBarState::new` leaves this empty, while
    /// `RangeBarState::new_with_trade_accumulation` seeds it with the
    /// opening trade.
    pub accumulated_trades: Vec<AggTrade>,
}
861
862impl RangeBarState {
863 /// Create new range bar state from opening trade
864 fn new(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
865 let bar = RangeBar::new(trade);
866
867 // Compute FIXED thresholds from opening price
868 let (upper_threshold, lower_threshold) =
869 bar.open.compute_range_thresholds(threshold_decimal_bps);
870
871 Self {
872 bar,
873 upper_threshold,
874 lower_threshold,
875 accumulated_trades: Vec::new(),
876 }
877 }
878
879 /// Create new range bar state with intra-bar feature accumulation
880 fn new_with_trade_accumulation(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
881 let bar = RangeBar::new(trade);
882
883 // Compute FIXED thresholds from opening price
884 let (upper_threshold, lower_threshold) =
885 bar.open.compute_range_thresholds(threshold_decimal_bps);
886
887 Self {
888 bar,
889 upper_threshold,
890 lower_threshold,
891 accumulated_trades: vec![trade.clone()],
892 }
893 }
894
895 /// Accumulate a trade for intra-bar feature computation
896 fn accumulate_trade(&mut self, trade: &AggTrade) {
897 self.accumulated_trades.push(trade.clone());
898 }
899}
900
901#[cfg(test)]
902mod tests {
903 use super::*;
904 use crate::test_utils::{self, scenarios};
905
906 #[test]
907 fn test_single_bar_no_breach() {
908 let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
909
910 // Create trades that stay within 250 dbps threshold
911 let trades = scenarios::no_breach_sequence(250);
912
913 // Test strict algorithm compliance: no bars should be created without breach
914 let bars = processor.process_agg_trade_records(&trades).unwrap();
915 assert_eq!(
916 bars.len(),
917 0,
918 "Strict algorithm should not create bars without breach"
919 );
920
921 // Test analysis mode: incomplete bar should be available for analysis
922 let bars_with_incomplete = processor
923 .process_agg_trade_records_with_incomplete(&trades)
924 .unwrap();
925 assert_eq!(
926 bars_with_incomplete.len(),
927 1,
928 "Analysis mode should include incomplete bar"
929 );
930
931 let bar = &bars_with_incomplete[0];
932 assert_eq!(bar.open.to_string(), "50000.00000000");
933 assert_eq!(bar.high.to_string(), "50100.00000000");
934 assert_eq!(bar.low.to_string(), "49900.00000000");
935 assert_eq!(bar.close.to_string(), "49900.00000000");
936 }
937
938 #[test]
939 fn test_exact_breach_upward() {
940 let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 dbps = 0.25%
941
942 let trades = scenarios::exact_breach_upward(250);
943
944 // Test strict algorithm: only completed bars (with breach)
945 let bars = processor.process_agg_trade_records(&trades).unwrap();
946 assert_eq!(
947 bars.len(),
948 1,
949 "Strict algorithm should only return completed bars"
950 );
951
952 // First bar should close at breach
953 let bar1 = &bars[0];
954 assert_eq!(bar1.open.to_string(), "50000.00000000");
955 // Breach at 250 dbps = 0.25% = 50000 * 1.0025 = 50125
956 assert_eq!(bar1.close.to_string(), "50125.00000000"); // Breach tick included
957 assert_eq!(bar1.high.to_string(), "50125.00000000");
958 assert_eq!(bar1.low.to_string(), "50000.00000000");
959
960 // Test analysis mode: includes incomplete second bar
961 let bars_with_incomplete = processor
962 .process_agg_trade_records_with_incomplete(&trades)
963 .unwrap();
964 assert_eq!(
965 bars_with_incomplete.len(),
966 2,
967 "Analysis mode should include incomplete bars"
968 );
969
970 // Second bar should start at next tick price (not breach price)
971 let bar2 = &bars_with_incomplete[1];
972 assert_eq!(bar2.open.to_string(), "50500.00000000"); // Next tick after breach
973 assert_eq!(bar2.close.to_string(), "50500.00000000");
974 }
975
976 #[test]
977 fn test_exact_breach_downward() {
978 let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
979
980 let trades = scenarios::exact_breach_downward(250);
981
982 let bars = processor.process_agg_trade_records(&trades).unwrap();
983
984 assert_eq!(bars.len(), 1);
985
986 let bar = &bars[0];
987 assert_eq!(bar.open.to_string(), "50000.00000000");
988 assert_eq!(bar.close.to_string(), "49875.00000000"); // Breach tick included
989 assert_eq!(bar.high.to_string(), "50000.00000000");
990 assert_eq!(bar.low.to_string(), "49875.00000000");
991 }
992
993 #[test]
994 fn test_large_gap_single_bar() {
995 let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
996
997 let trades = scenarios::large_gap_sequence();
998
999 let bars = processor.process_agg_trade_records(&trades).unwrap();
1000
1001 // Should create exactly ONE bar, not multiple bars to "fill the gap"
1002 assert_eq!(bars.len(), 1);
1003
1004 let bar = &bars[0];
1005 assert_eq!(bar.open.to_string(), "50000.00000000");
1006 assert_eq!(bar.close.to_string(), "51000.00000000");
1007 assert_eq!(bar.high.to_string(), "51000.00000000");
1008 assert_eq!(bar.low.to_string(), "50000.00000000");
1009 }
1010
1011 #[test]
1012 fn test_unsorted_trades_error() {
1013 let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1014
1015 let trades = scenarios::unsorted_sequence();
1016
1017 let result = processor.process_agg_trade_records(&trades);
1018 assert!(result.is_err());
1019
1020 match result {
1021 Err(ProcessingError::UnsortedTrades { index, .. }) => {
1022 assert_eq!(index, 1);
1023 }
1024 _ => panic!("Expected UnsortedTrades error"),
1025 }
1026 }
1027
1028 #[test]
1029 fn test_threshold_calculation() {
1030 let processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
1031
1032 let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1033 let bar_state = RangeBarState::new(&trade, processor.threshold_decimal_bps);
1034
1035 // 50000 * 0.0025 = 125 (25bps = 0.25%)
1036 assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
1037 assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
1038 }
1039
1040 #[test]
1041 fn test_empty_trades() {
1042 let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
1043 let trades = scenarios::empty_sequence();
1044 let bars = processor.process_agg_trade_records(&trades).unwrap();
1045 assert_eq!(bars.len(), 0);
1046 }
1047
1048 #[test]
1049 fn test_debug_streaming_data() {
1050 let mut processor = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1051
1052 // Create trades similar to our test data
1053 let trades = vec![
1054 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1055 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1056 test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1057 ];
1058
1059 println!("Test data prices: 50014 -> 50163 -> 50032");
1060 println!("Expected price movements: +0.3% then -0.26%");
1061
1062 let bars = processor.process_agg_trade_records(&trades).unwrap();
1063 println!("Generated {} range bars", bars.len());
1064
1065 for (i, bar) in bars.iter().enumerate() {
1066 println!(
1067 " Bar {}: O={} H={} L={} C={}",
1068 i + 1,
1069 bar.open,
1070 bar.high,
1071 bar.low,
1072 bar.close
1073 );
1074 }
1075
1076 // With a 0.1% threshold and 0.3% price movement, we should get at least 1 bar
1077 assert!(
1078 !bars.is_empty(),
1079 "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
1080 );
1081 }
1082
1083 #[test]
1084 fn test_threshold_validation() {
1085 // Valid threshold
1086 assert!(RangeBarProcessor::new(250).is_ok());
1087
1088 // Invalid: too low (0 × 0.1bps = 0%)
1089 assert!(matches!(
1090 RangeBarProcessor::new(0),
1091 Err(ProcessingError::InvalidThreshold {
1092 threshold_decimal_bps: 0
1093 })
1094 ));
1095
1096 // Invalid: too high (150,000 × 0.1bps = 15,000bps = 150%)
1097 assert!(matches!(
1098 RangeBarProcessor::new(150_000),
1099 Err(ProcessingError::InvalidThreshold {
1100 threshold_decimal_bps: 150_000
1101 })
1102 ));
1103
1104 // Valid boundary: minimum (1 × 0.1bps = 0.1bps = 0.001%)
1105 assert!(RangeBarProcessor::new(1).is_ok());
1106
1107 // Valid boundary: maximum (100,000 × 0.1bps = 10,000bps = 100%)
1108 assert!(RangeBarProcessor::new(100_000).is_ok());
1109 }
1110
1111 #[test]
1112 fn test_export_processor_with_manual_trades() {
1113 println!("Testing ExportRangeBarProcessor with same trade data...");
1114
1115 let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1116
1117 // Use same trades as the working basic test
1118 let trades = vec![
1119 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1120 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
1121 test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1122 ];
1123
1124 println!(
1125 "Processing {} trades with ExportRangeBarProcessor...",
1126 trades.len()
1127 );
1128
1129 export_processor.process_trades_continuously(&trades);
1130 let bars = export_processor.get_all_completed_bars();
1131
1132 println!(
1133 "ExportRangeBarProcessor generated {} range bars",
1134 bars.len()
1135 );
1136 for (i, bar) in bars.iter().enumerate() {
1137 println!(
1138 " Bar {}: O={} H={} L={} C={}",
1139 i + 1,
1140 bar.open,
1141 bar.high,
1142 bar.low,
1143 bar.close
1144 );
1145 }
1146
1147 // Should match the basic processor results (1 bar)
1148 assert!(
1149 !bars.is_empty(),
1150 "ExportRangeBarProcessor should generate same results as basic processor"
1151 );
1152 }
1153
1154 // === CHECKPOINT TESTS (Issues #2 and #3) ===
1155
1156 #[test]
1157 fn test_checkpoint_creation() {
1158 let mut processor = RangeBarProcessor::new(250).unwrap();
1159
1160 // Process some trades that don't complete a bar
1161 let trades = scenarios::no_breach_sequence(250);
1162 let _bars = processor.process_agg_trade_records(&trades).unwrap();
1163
1164 // Create checkpoint
1165 let checkpoint = processor.create_checkpoint("BTCUSDT");
1166
1167 assert_eq!(checkpoint.symbol, "BTCUSDT");
1168 assert_eq!(checkpoint.threshold_decimal_bps, 250);
1169 assert!(checkpoint.has_incomplete_bar()); // Should have incomplete bar
1170 assert!(checkpoint.thresholds.is_some()); // Thresholds should be saved
1171 assert!(checkpoint.last_trade_id.is_some()); // Should track last trade
1172 }
1173
1174 #[test]
1175 fn test_checkpoint_serialization_roundtrip() {
1176 let mut processor = RangeBarProcessor::new(250).unwrap();
1177
1178 // Process trades
1179 let trades = scenarios::no_breach_sequence(250);
1180 let _bars = processor.process_agg_trade_records(&trades).unwrap();
1181
1182 // Create checkpoint
1183 let checkpoint = processor.create_checkpoint("BTCUSDT");
1184
1185 // Serialize to JSON
1186 let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
1187
1188 // Deserialize back
1189 let restored: Checkpoint =
1190 serde_json::from_str(&json).expect("Deserialization should succeed");
1191
1192 assert_eq!(restored.symbol, checkpoint.symbol);
1193 assert_eq!(
1194 restored.threshold_decimal_bps,
1195 checkpoint.threshold_decimal_bps
1196 );
1197 assert_eq!(
1198 restored.incomplete_bar.is_some(),
1199 checkpoint.incomplete_bar.is_some()
1200 );
1201 }
1202
1203 #[test]
1204 fn test_cross_file_bar_continuation() {
1205 // This is the PRIMARY test for Issues #2 and #3
1206 // Verifies that incomplete bars continue correctly across file boundaries
1207
1208 // Create trades that span multiple bars
1209 let mut all_trades = Vec::new();
1210
1211 // Generate enough trades to produce multiple bars
1212 // Using 100bps threshold (1%) for clearer price movements
1213 let base_timestamp = 1640995200000000i64; // Microseconds
1214
1215 // Create a sequence where we'll have ~3-4 completed bars with remainder
1216 for i in 0..20 {
1217 let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
1218 let trade = test_utils::create_test_agg_trade(
1219 i + 1,
1220 &format!("{:.8}", price),
1221 "1.0",
1222 base_timestamp + (i * 1000000),
1223 );
1224 all_trades.push(trade);
1225 }
1226
1227 // === FULL PROCESSING (baseline) ===
1228 let mut processor_full = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
1229 let bars_full = processor_full
1230 .process_agg_trade_records(&all_trades)
1231 .unwrap();
1232
1233 // === SPLIT PROCESSING WITH CHECKPOINT ===
1234 let split_point = 10; // Split in the middle
1235
1236 // Part 1: Process first half
1237 let mut processor_1 = RangeBarProcessor::new(100).unwrap();
1238 let part1_trades = &all_trades[0..split_point];
1239 let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
1240
1241 // Create checkpoint
1242 let checkpoint = processor_1.create_checkpoint("TEST");
1243
1244 // Part 2: Resume from checkpoint and process second half
1245 let mut processor_2 = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
1246 let part2_trades = &all_trades[split_point..];
1247 let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
1248
1249 // === VERIFY CONTINUATION ===
1250 // Total completed bars should match full processing
1251 let split_total = bars_1.len() + bars_2.len();
1252
1253 println!("Full processing: {} bars", bars_full.len());
1254 println!(
1255 "Split processing: {} + {} = {} bars",
1256 bars_1.len(),
1257 bars_2.len(),
1258 split_total
1259 );
1260
1261 assert_eq!(
1262 split_total,
1263 bars_full.len(),
1264 "Split processing should produce same bar count as full processing"
1265 );
1266
1267 // Verify the bars themselves match
1268 let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
1269 for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
1270 assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
1271 assert_eq!(
1272 full.close.0, split.close.0,
1273 "Bar {} close price mismatch",
1274 i
1275 );
1276 }
1277 }
1278
1279 #[test]
1280 fn test_verify_position_exact() {
1281 let mut processor = RangeBarProcessor::new(250).unwrap();
1282
1283 // Process some trades
1284 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1285 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1286
1287 let _ = processor.process_single_trade(trade1);
1288 let _ = processor.process_single_trade(trade2);
1289
1290 // Create next trade in sequence
1291 let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
1292
1293 // Verify position
1294 let verification = processor.verify_position(&next_trade);
1295
1296 assert_eq!(verification, PositionVerification::Exact);
1297 }
1298
1299 #[test]
1300 fn test_verify_position_gap() {
1301 let mut processor = RangeBarProcessor::new(250).unwrap();
1302
1303 // Process some trades
1304 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1305 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1306
1307 let _ = processor.process_single_trade(trade1);
1308 let _ = processor.process_single_trade(trade2);
1309
1310 // Create next trade with gap (skip IDs 102-104)
1311 let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
1312
1313 // Verify position
1314 let verification = processor.verify_position(&next_trade);
1315
1316 match verification {
1317 PositionVerification::Gap {
1318 expected_id,
1319 actual_id,
1320 missing_count,
1321 } => {
1322 assert_eq!(expected_id, 102);
1323 assert_eq!(actual_id, 105);
1324 assert_eq!(missing_count, 3);
1325 }
1326 _ => panic!("Expected Gap verification, got {:?}", verification),
1327 }
1328 }
1329
1330 #[test]
1331 fn test_checkpoint_clean_completion() {
1332 // Test when last trade completes a bar with no remainder
1333 // In range bar algorithm: breach trade closes bar, NEXT trade opens new bar
1334 // If there's no next trade, there's no incomplete bar
1335 let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
1336
1337 // Create trades that complete exactly one bar
1338 let trades = vec![
1339 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1340 test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // ~0.2% move, breaches 0.1%
1341 ];
1342
1343 let bars = processor.process_agg_trade_records(&trades).unwrap();
1344 assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1345
1346 // Create checkpoint - should NOT have incomplete bar
1347 // (breach trade closes bar, no next trade to open new bar)
1348 let checkpoint = processor.create_checkpoint("TEST");
1349
1350 // With defer_open logic, the next bar isn't started until the next trade
1351 assert!(
1352 !checkpoint.has_incomplete_bar(),
1353 "No incomplete bar when last trade was a breach with no following trade"
1354 );
1355 }
1356
1357 #[test]
1358 fn test_checkpoint_with_remainder() {
1359 // Test when we have trades remaining after a completed bar
1360 let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
1361
1362 // Create trades: bar completes at trade 2, trade 3 starts new bar
1363 let trades = vec![
1364 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1365 test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // Breach
1366 test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000), // Opens new bar
1367 ];
1368
1369 let bars = processor.process_agg_trade_records(&trades).unwrap();
1370 assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1371
1372 // Create checkpoint - should have incomplete bar from trade 3
1373 let checkpoint = processor.create_checkpoint("TEST");
1374
1375 assert!(
1376 checkpoint.has_incomplete_bar(),
1377 "Should have incomplete bar from trade 3"
1378 );
1379
1380 // Verify the incomplete bar has correct data
1381 let incomplete = checkpoint.incomplete_bar.unwrap();
1382 assert_eq!(
1383 incomplete.open.to_string(),
1384 "50110.00000000",
1385 "Incomplete bar should open at trade 3 price"
1386 );
1387 }
1388
1389 /// Issue #46: Verify streaming and batch paths produce identical bars
1390 ///
1391 /// The batch path (`process_agg_trade_records`) and streaming path
1392 /// (`process_single_trade`) must produce identical OHLCV output for
1393 /// the same input trades. This test catches regressions where the
1394 /// breaching trade is double-counted or bar boundaries differ.
1395 #[test]
1396 fn test_streaming_batch_parity() {
1397 let threshold = 250; // 250 dbps = 0.25%
1398
1399 // Build a sequence with multiple breaches
1400 let trades = test_utils::AggTradeBuilder::new()
1401 .add_trade(1, 1.0, 0) // Open first bar at 50000
1402 .add_trade(2, 1.001, 1000) // +0.1% - accumulate
1403 .add_trade(3, 1.003, 2000) // +0.3% - breach (>0.25%)
1404 .add_trade(4, 1.004, 3000) // Opens second bar
1405 .add_trade(5, 1.005, 4000) // Accumulate
1406 .add_trade(6, 1.008, 5000) // +0.4% from bar 2 open - breach
1407 .add_trade(7, 1.009, 6000) // Opens third bar
1408 .build();
1409
1410 // === BATCH PATH ===
1411 let mut batch_processor = RangeBarProcessor::new(threshold).unwrap();
1412 let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
1413 let batch_incomplete = batch_processor.get_incomplete_bar();
1414
1415 // === STREAMING PATH ===
1416 let mut stream_processor = RangeBarProcessor::new(threshold).unwrap();
1417 let mut stream_bars: Vec<RangeBar> = Vec::new();
1418 for trade in &trades {
1419 if let Some(bar) = stream_processor
1420 .process_single_trade(trade.clone())
1421 .unwrap()
1422 {
1423 stream_bars.push(bar);
1424 }
1425 }
1426 let stream_incomplete = stream_processor.get_incomplete_bar();
1427
1428 // === VERIFY PARITY ===
1429 assert_eq!(
1430 batch_bars.len(),
1431 stream_bars.len(),
1432 "Batch and streaming should produce same number of completed bars"
1433 );
1434
1435 for (i, (batch_bar, stream_bar)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1436 assert_eq!(
1437 batch_bar.open, stream_bar.open,
1438 "Bar {i}: open price mismatch"
1439 );
1440 assert_eq!(
1441 batch_bar.close, stream_bar.close,
1442 "Bar {i}: close price mismatch"
1443 );
1444 assert_eq!(
1445 batch_bar.high, stream_bar.high,
1446 "Bar {i}: high price mismatch"
1447 );
1448 assert_eq!(batch_bar.low, stream_bar.low, "Bar {i}: low price mismatch");
1449 assert_eq!(
1450 batch_bar.volume, stream_bar.volume,
1451 "Bar {i}: volume mismatch (double-counting?)"
1452 );
1453 assert_eq!(
1454 batch_bar.open_time, stream_bar.open_time,
1455 "Bar {i}: open_time mismatch"
1456 );
1457 assert_eq!(
1458 batch_bar.close_time, stream_bar.close_time,
1459 "Bar {i}: close_time mismatch"
1460 );
1461 assert_eq!(
1462 batch_bar.individual_trade_count, stream_bar.individual_trade_count,
1463 "Bar {i}: trade count mismatch"
1464 );
1465 }
1466
1467 // Verify incomplete bars match
1468 match (batch_incomplete, stream_incomplete) {
1469 (Some(b), Some(s)) => {
1470 assert_eq!(b.open, s.open, "Incomplete bar: open mismatch");
1471 assert_eq!(b.close, s.close, "Incomplete bar: close mismatch");
1472 assert_eq!(b.volume, s.volume, "Incomplete bar: volume mismatch");
1473 }
1474 (None, None) => {} // Both finished cleanly
1475 _ => panic!("Incomplete bar presence mismatch between batch and streaming"),
1476 }
1477 }
1478
1479 /// Issue #46: After breach, next trade opens new bar (not breaching trade)
1480 #[test]
1481 fn test_defer_open_new_bar_opens_with_next_trade() {
1482 let mut processor = RangeBarProcessor::new(250).unwrap();
1483
1484 // Trade 1: Opens bar at 50000
1485 let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1486 assert!(processor.process_single_trade(t1).unwrap().is_none());
1487
1488 // Trade 2: Breaches threshold (+0.3%)
1489 let t2 = test_utils::create_test_agg_trade(2, "50150.0", "2.0", 2000);
1490 let bar = processor.process_single_trade(t2).unwrap();
1491 assert!(bar.is_some(), "Should close bar on breach");
1492
1493 let closed_bar = bar.unwrap();
1494 assert_eq!(closed_bar.open.to_string(), "50000.00000000");
1495 assert_eq!(closed_bar.close.to_string(), "50150.00000000");
1496
1497 // After breach, no incomplete bar should exist
1498 assert!(
1499 processor.get_incomplete_bar().is_none(),
1500 "No incomplete bar after breach - defer_open is true"
1501 );
1502
1503 // Trade 3: Should open NEW bar (not the breaching trade)
1504 let t3 = test_utils::create_test_agg_trade(3, "50100.0", "3.0", 3000);
1505 assert!(processor.process_single_trade(t3).unwrap().is_none());
1506
1507 let incomplete = processor.get_incomplete_bar().unwrap();
1508 assert_eq!(
1509 incomplete.open.to_string(),
1510 "50100.00000000",
1511 "New bar should open at trade 3's price, not trade 2's"
1512 );
1513 }
1514
1515 // === Memory efficiency tests (R1/R2/R3) ===
1516
1517 #[test]
1518 fn test_bar_close_take_single_trade() {
1519 // R1: Verify bar close via single-trade path produces correct OHLCV after
1520 // clone→take optimization. Uses single_breach_sequence that triggers breach.
1521 let mut processor = RangeBarProcessor::new(250).unwrap();
1522 let trades = scenarios::single_breach_sequence(250);
1523
1524 for trade in &trades[..trades.len() - 1] {
1525 let result = processor.process_single_trade(trade.clone()).unwrap();
1526 assert!(result.is_none());
1527 }
1528
1529 // Last trade triggers breach
1530 let bar = processor
1531 .process_single_trade(trades.last().unwrap().clone())
1532 .unwrap()
1533 .expect("Should produce completed bar");
1534
1535 // Verify OHLCV integrity after take() optimization
1536 assert_eq!(bar.open.to_string(), "50000.00000000");
1537 assert!(bar.high >= bar.open.max(bar.close));
1538 assert!(bar.low <= bar.open.min(bar.close));
1539 assert!(bar.volume > 0);
1540
1541 // Verify processor state is clean after bar close
1542 assert!(processor.get_incomplete_bar().is_none());
1543 }
1544
1545 #[test]
1546 fn test_bar_close_take_batch() {
1547 // R2: Verify batch path produces correct bars after clone→take optimization.
1548 // large_sequence generates enough trades to trigger multiple breaches.
1549 let mut processor = RangeBarProcessor::new(250).unwrap();
1550 let trades = scenarios::large_sequence(500);
1551
1552 let bars = processor.process_agg_trade_records(&trades).unwrap();
1553 assert!(
1554 !bars.is_empty(),
1555 "Should produce at least one completed bar"
1556 );
1557
1558 // Verify every bar has valid OHLCV invariants
1559 for bar in &bars {
1560 assert!(bar.high >= bar.open.max(bar.close));
1561 assert!(bar.low <= bar.open.min(bar.close));
1562 assert!(bar.volume > 0);
1563 assert!(bar.close_time >= bar.open_time);
1564 }
1565 }
1566
1567 #[test]
1568 fn test_checkpoint_conditional_clone() {
1569 // R3: Verify checkpoint state is preserved correctly with both
1570 // include_incomplete=true and include_incomplete=false.
1571 let trades = scenarios::no_breach_sequence(250);
1572
1573 // Test with include_incomplete=false (move, no clone)
1574 let mut processor1 = RangeBarProcessor::new(250).unwrap();
1575 let bars_without = processor1.process_agg_trade_records(&trades).unwrap();
1576 assert_eq!(bars_without.len(), 0);
1577 // Checkpoint should be preserved
1578 assert!(processor1.get_incomplete_bar().is_some());
1579
1580 // Test with include_incomplete=true (clone + consume)
1581 let mut processor2 = RangeBarProcessor::new(250).unwrap();
1582 let bars_with = processor2
1583 .process_agg_trade_records_with_incomplete(&trades)
1584 .unwrap();
1585 assert_eq!(bars_with.len(), 1);
1586 // Checkpoint should ALSO be preserved (cloned before consume)
1587 assert!(processor2.get_incomplete_bar().is_some());
1588
1589 // Both checkpoints should have identical bar content
1590 let cp1 = processor1.get_incomplete_bar().unwrap();
1591 let cp2 = processor2.get_incomplete_bar().unwrap();
1592 assert_eq!(cp1.open, cp2.open);
1593 assert_eq!(cp1.close, cp2.close);
1594 assert_eq!(cp1.high, cp2.high);
1595 assert_eq!(cp1.low, cp2.low);
1596 }
1597}