rangebar_core/
processor.rs

1//! Core range bar processing algorithm
2//!
3//! Implements non-lookahead bias range bar construction where bars close when
4//! price moves ±threshold bps from the bar's OPEN price.
5
6use crate::fixed_point::FixedPoint;
7use crate::types::{AggTrade, RangeBar};
8#[cfg(feature = "python")]
9use pyo3::prelude::*;
10use thiserror::Error;
11
12/// Range bar processor with non-lookahead bias guarantee
13pub struct RangeBarProcessor {
14    /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
15    threshold_decimal_bps: u32,
16
17    /// Current bar state for streaming processing (Q19)
18    /// Enables get_incomplete_bar() and stateful process_single_trade()
19    current_bar_state: Option<RangeBarState>,
20}
21
22impl RangeBarProcessor {
23    /// Create new processor with given threshold
24    ///
25    /// # Arguments
26    ///
27    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
28    ///   - Example: `250` → 25bps = 0.25%
29    ///   - Example: `10` → 1bps = 0.01%
30    ///   - Minimum: `1` → 0.1bps = 0.001%
31    ///
32    /// # Breaking Change (v3.0.0)
33    ///
34    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
35    /// **Migration**: Multiply all threshold values by 10.
36    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
37        // Validation bounds (v3.0.0: decimal bps units)
38        // Min: 1 × 0.1bps = 0.1bps = 0.001%
39        // Max: 100,000 × 0.1bps = 10,000bps = 100%
40        if threshold_decimal_bps < 1 {
41            return Err(ProcessingError::InvalidThreshold {
42                threshold_decimal_bps,
43            });
44        }
45        if threshold_decimal_bps > 100_000 {
46            return Err(ProcessingError::InvalidThreshold {
47                threshold_decimal_bps,
48            });
49        }
50
51        Ok(Self {
52            threshold_decimal_bps,
53            current_bar_state: None,
54        })
55    }
56
57    /// Process a single trade and return completed bar if any
58    ///
59    /// Maintains internal state for streaming use case. State persists across calls
60    /// until a bar completes (threshold breach), enabling get_incomplete_bar().
61    ///
62    /// # Arguments
63    ///
64    /// * `trade` - Single aggregated trade to process
65    ///
66    /// # Returns
67    ///
68    /// `Some(RangeBar)` if a bar was completed, `None` otherwise
69    ///
70    /// # State Management
71    ///
72    /// - First trade: Initializes new bar state
73    /// - Subsequent trades: Updates existing bar or closes on breach
74    /// - Breach: Returns completed bar, starts new bar with breaching trade
75    pub fn process_single_trade(
76        &mut self,
77        trade: AggTrade,
78    ) -> Result<Option<RangeBar>, ProcessingError> {
79        match &mut self.current_bar_state {
80            None => {
81                // First trade - initialize new bar
82                self.current_bar_state =
83                    Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
84                Ok(None)
85            }
86            Some(bar_state) => {
87                // Check for threshold breach
88                if bar_state.bar.is_breach(
89                    trade.price,
90                    bar_state.upper_threshold,
91                    bar_state.lower_threshold,
92                ) {
93                    // Breach detected - close current bar
94                    bar_state.bar.update_with_trade(&trade);
95
96                    // Validation: Ensure high/low include open/close extremes
97                    debug_assert!(
98                        bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
99                    );
100                    debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
101
102                    let completed_bar = bar_state.bar.clone();
103
104                    // Start new bar with breaching trade
105                    self.current_bar_state =
106                        Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
107
108                    Ok(Some(completed_bar))
109                } else {
110                    // No breach - update existing bar
111                    bar_state.bar.update_with_trade(&trade);
112                    Ok(None)
113                }
114            }
115        }
116    }
117
118    /// Get any incomplete bar currently being processed
119    ///
120    /// Returns clone of current bar state for inspection without consuming it.
121    /// Useful for final bar at stream end or progress monitoring.
122    ///
123    /// # Returns
124    ///
125    /// `Some(RangeBar)` if bar is in progress, `None` if no active bar
126    pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
127        self.current_bar_state
128            .as_ref()
129            .map(|state| state.bar.clone())
130    }
131
132    /// Process AggTrade records into range bars including incomplete bars for analysis
133    ///
134    /// # Arguments
135    ///
136    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
137    ///
138    /// # Returns
139    ///
140    /// Vector of range bars including incomplete bars at end of data
141    ///
142    /// # Warning
143    ///
144    /// This method is for analysis purposes only. Incomplete bars violate the
145    /// fundamental range bar algorithm and should not be used for production trading.
146    pub fn process_agg_trade_records_with_incomplete(
147        &mut self,
148        agg_trade_records: &[AggTrade],
149    ) -> Result<Vec<RangeBar>, ProcessingError> {
150        self.process_agg_trade_records_with_options(agg_trade_records, true)
151    }
152
153    /// Process Binance aggregated trade records into range bars
154    ///
155    /// This is the primary method for converting AggTrade records (which aggregate
156    /// multiple individual trades) into range bars based on price movement thresholds.
157    ///
158    /// # Parameters
159    ///
160    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
161    ///   Each record represents multiple individual trades aggregated at same price
162    ///
163    /// # Returns
164    ///
165    /// Vector of completed range bars (ONLY bars that breached thresholds).
166    /// Each bar tracks both individual trade count and AggTrade record count.
167    pub fn process_agg_trade_records(
168        &mut self,
169        agg_trade_records: &[AggTrade],
170    ) -> Result<Vec<RangeBar>, ProcessingError> {
171        self.process_agg_trade_records_with_options(agg_trade_records, false)
172    }
173
174    /// Process AggTrade records with options for including incomplete bars
175    ///
176    /// Batch processing mode: Clears any existing state before processing.
177    /// Use process_single_trade() for stateful streaming instead.
178    ///
179    /// # Parameters
180    ///
181    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
182    /// * `include_incomplete` - Whether to include incomplete bars at end of processing
183    ///
184    /// # Returns
185    ///
186    /// Vector of range bars (completed + incomplete if requested)
187    pub fn process_agg_trade_records_with_options(
188        &mut self,
189        agg_trade_records: &[AggTrade],
190        include_incomplete: bool,
191    ) -> Result<Vec<RangeBar>, ProcessingError> {
192        if agg_trade_records.is_empty() {
193            return Ok(Vec::new());
194        }
195
196        // Validate records are sorted
197        self.validate_trade_ordering(agg_trade_records)?;
198
199        // Clear streaming state - batch mode uses local state
200        self.current_bar_state = None;
201
202        let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); // Heuristic capacity
203        let mut current_bar: Option<RangeBarState> = None;
204        let mut defer_open = false;
205
206        for agg_record in agg_trade_records {
207            if defer_open {
208                // Previous bar closed, this agg_record opens new bar
209                current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
210                defer_open = false;
211                continue;
212            }
213
214            match current_bar {
215                None => {
216                    // First bar initialization
217                    current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
218                }
219                Some(ref mut bar_state) => {
220                    // Check if this AggTrade record breaches the threshold
221                    if bar_state.bar.is_breach(
222                        agg_record.price,
223                        bar_state.upper_threshold,
224                        bar_state.lower_threshold,
225                    ) {
226                        // Breach detected - update bar with breaching record (includes microstructure)
227                        bar_state.bar.update_with_trade(agg_record);
228
229                        // Validation: Ensure high/low include open/close extremes
230                        debug_assert!(
231                            bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
232                        );
233                        debug_assert!(
234                            bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
235                        );
236
237                        bars.push(bar_state.bar.clone());
238                        current_bar = None;
239                        defer_open = true; // Next record will open new bar
240                    } else {
241                        // No breach: normal update with microstructure calculations
242                        bar_state.bar.update_with_trade(agg_record);
243                    }
244                }
245            }
246        }
247
248        // Add final partial bar only if explicitly requested
249        // This preserves algorithm integrity: bars should only close on threshold breach
250        if include_incomplete && let Some(bar_state) = current_bar {
251            bars.push(bar_state.bar);
252        }
253
254        Ok(bars)
255    }
256
257    /// Validate that trades are properly sorted for deterministic processing
258    fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
259        for i in 1..trades.len() {
260            let prev = &trades[i - 1];
261            let curr = &trades[i];
262
263            // Check ordering: (timestamp, agg_trade_id) ascending
264            if curr.timestamp < prev.timestamp
265                || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
266            {
267                return Err(ProcessingError::UnsortedTrades {
268                    index: i,
269                    prev_time: prev.timestamp,
270                    prev_id: prev.agg_trade_id,
271                    curr_time: curr.timestamp,
272                    curr_id: curr.agg_trade_id,
273                });
274            }
275        }
276
277        Ok(())
278    }
279}
280
281/// Internal state for a range bar being built
282struct RangeBarState {
283    /// The range bar being constructed
284    pub bar: RangeBar,
285
286    /// Upper breach threshold (FIXED from bar open)
287    pub upper_threshold: FixedPoint,
288
289    /// Lower breach threshold (FIXED from bar open)
290    pub lower_threshold: FixedPoint,
291}
292
293impl RangeBarState {
294    /// Create new range bar state from opening trade
295    fn new(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
296        let bar = RangeBar::new(trade);
297
298        // Compute FIXED thresholds from opening price
299        let (upper_threshold, lower_threshold) =
300            bar.open.compute_range_thresholds(threshold_decimal_bps);
301
302        Self {
303            bar,
304            upper_threshold,
305            lower_threshold,
306        }
307    }
308}
309
310/// Processing errors
311#[derive(Error, Debug)]
312pub enum ProcessingError {
313    #[error(
314        "Trades not sorted at index {index}: prev=({prev_time}, {prev_id}), curr=({curr_time}, {curr_id})"
315    )]
316    UnsortedTrades {
317        index: usize,
318        prev_time: i64,
319        prev_id: i64,
320        curr_time: i64,
321        curr_id: i64,
322    },
323
324    #[error("Empty trade data")]
325    EmptyData,
326
327    #[error(
328        "Invalid threshold: {threshold_decimal_bps} (decimal bps). Valid range: 1-100,000 (0.001%-100%)"
329    )]
330    InvalidThreshold { threshold_decimal_bps: u32 },
331}
332
333#[cfg(feature = "python")]
334impl From<ProcessingError> for PyErr {
335    fn from(err: ProcessingError) -> PyErr {
336        match err {
337            ProcessingError::UnsortedTrades {
338                index,
339                prev_time,
340                prev_id,
341                curr_time,
342                curr_id,
343            } => pyo3::exceptions::PyValueError::new_err(format!(
344                "Trades not sorted at index {}: prev=({}, {}), curr=({}, {})",
345                index, prev_time, prev_id, curr_time, curr_id
346            )),
347            ProcessingError::EmptyData => {
348                pyo3::exceptions::PyValueError::new_err("Empty trade data")
349            }
350            ProcessingError::InvalidThreshold {
351                threshold_decimal_bps,
352            } => pyo3::exceptions::PyValueError::new_err(format!(
353                "Invalid threshold: {} (decimal bps). Valid range: 1-100,000 (0.001%-100%)",
354                threshold_decimal_bps
355            )),
356        }
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    use super::*;
363    use crate::test_utils::{self, scenarios};
364
365    #[test]
366    fn test_single_bar_no_breach() {
367        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
368
369        // Create trades that stay within 25 bps threshold
370        let trades = scenarios::no_breach_sequence(250);
371
372        // Test strict algorithm compliance: no bars should be created without breach
373        let bars = processor.process_agg_trade_records(&trades).unwrap();
374        assert_eq!(
375            bars.len(),
376            0,
377            "Strict algorithm should not create bars without breach"
378        );
379
380        // Test analysis mode: incomplete bar should be available for analysis
381        let bars_with_incomplete = processor
382            .process_agg_trade_records_with_incomplete(&trades)
383            .unwrap();
384        assert_eq!(
385            bars_with_incomplete.len(),
386            1,
387            "Analysis mode should include incomplete bar"
388        );
389
390        let bar = &bars_with_incomplete[0];
391        assert_eq!(bar.open.to_string(), "50000.00000000");
392        assert_eq!(bar.high.to_string(), "50100.00000000");
393        assert_eq!(bar.low.to_string(), "49900.00000000");
394        assert_eq!(bar.close.to_string(), "49900.00000000");
395    }
396
397    #[test]
398    fn test_exact_breach_upward() {
399        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
400
401        let trades = scenarios::exact_breach_upward(250);
402
403        // Test strict algorithm: only completed bars (with breach)
404        let bars = processor.process_agg_trade_records(&trades).unwrap();
405        assert_eq!(
406            bars.len(),
407            1,
408            "Strict algorithm should only return completed bars"
409        );
410
411        // First bar should close at breach
412        let bar1 = &bars[0];
413        assert_eq!(bar1.open.to_string(), "50000.00000000");
414        // Breach at 25 bps = 0.25% = 50000 * 1.0025 = 50125
415        assert_eq!(bar1.close.to_string(), "50125.00000000"); // Breach tick included
416        assert_eq!(bar1.high.to_string(), "50125.00000000");
417        assert_eq!(bar1.low.to_string(), "50000.00000000");
418
419        // Test analysis mode: includes incomplete second bar
420        let bars_with_incomplete = processor
421            .process_agg_trade_records_with_incomplete(&trades)
422            .unwrap();
423        assert_eq!(
424            bars_with_incomplete.len(),
425            2,
426            "Analysis mode should include incomplete bars"
427        );
428
429        // Second bar should start at next tick price (not breach price)
430        let bar2 = &bars_with_incomplete[1];
431        assert_eq!(bar2.open.to_string(), "50500.00000000"); // Next tick after breach
432        assert_eq!(bar2.close.to_string(), "50500.00000000");
433    }
434
435    #[test]
436    fn test_exact_breach_downward() {
437        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
438
439        let trades = scenarios::exact_breach_downward(250);
440
441        let bars = processor.process_agg_trade_records(&trades).unwrap();
442
443        assert_eq!(bars.len(), 1);
444
445        let bar = &bars[0];
446        assert_eq!(bar.open.to_string(), "50000.00000000");
447        assert_eq!(bar.close.to_string(), "49875.00000000"); // Breach tick included
448        assert_eq!(bar.high.to_string(), "50000.00000000");
449        assert_eq!(bar.low.to_string(), "49875.00000000");
450    }
451
452    #[test]
453    fn test_large_gap_single_bar() {
454        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
455
456        let trades = scenarios::large_gap_sequence();
457
458        let bars = processor.process_agg_trade_records(&trades).unwrap();
459
460        // Should create exactly ONE bar, not multiple bars to "fill the gap"
461        assert_eq!(bars.len(), 1);
462
463        let bar = &bars[0];
464        assert_eq!(bar.open.to_string(), "50000.00000000");
465        assert_eq!(bar.close.to_string(), "51000.00000000");
466        assert_eq!(bar.high.to_string(), "51000.00000000");
467        assert_eq!(bar.low.to_string(), "50000.00000000");
468    }
469
470    #[test]
471    fn test_unsorted_trades_error() {
472        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
473
474        let trades = scenarios::unsorted_sequence();
475
476        let result = processor.process_agg_trade_records(&trades);
477        assert!(result.is_err());
478
479        match result {
480            Err(ProcessingError::UnsortedTrades { index, .. }) => {
481                assert_eq!(index, 1);
482            }
483            _ => panic!("Expected UnsortedTrades error"),
484        }
485    }
486
487    #[test]
488    fn test_threshold_calculation() {
489        let processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
490
491        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
492        let bar_state = RangeBarState::new(&trade, processor.threshold_decimal_bps);
493
494        // 50000 * 0.0025 = 125 (25bps = 0.25%)
495        assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
496        assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
497    }
498
499    #[test]
500    fn test_empty_trades() {
501        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
502        let trades = scenarios::empty_sequence();
503        let bars = processor.process_agg_trade_records(&trades).unwrap();
504        assert_eq!(bars.len(), 0);
505    }
506
507    #[test]
508    fn test_debug_streaming_data() {
509        let mut processor = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
510
511        // Create trades similar to our test data
512        let trades = vec![
513            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
514            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
515            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
516        ];
517
518        println!("Test data prices: 50014 -> 50163 -> 50032");
519        println!("Expected price movements: +0.3% then -0.26%");
520
521        let bars = processor.process_agg_trade_records(&trades).unwrap();
522        println!("Generated {} range bars", bars.len());
523
524        for (i, bar) in bars.iter().enumerate() {
525            println!(
526                "  Bar {}: O={} H={} L={} C={}",
527                i + 1,
528                bar.open,
529                bar.high,
530                bar.low,
531                bar.close
532            );
533        }
534
535        // With a 0.1% threshold and 0.3% price movement, we should get at least 1 bar
536        assert!(
537            !bars.is_empty(),
538            "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
539        );
540    }
541
542    #[test]
543    fn test_threshold_validation() {
544        // Valid threshold
545        assert!(RangeBarProcessor::new(250).is_ok());
546
547        // Invalid: too low (0 × 0.1bps = 0%)
548        assert!(matches!(
549            RangeBarProcessor::new(0),
550            Err(ProcessingError::InvalidThreshold {
551                threshold_decimal_bps: 0
552            })
553        ));
554
555        // Invalid: too high (150,000 × 0.1bps = 15,000bps = 150%)
556        assert!(matches!(
557            RangeBarProcessor::new(150_000),
558            Err(ProcessingError::InvalidThreshold {
559                threshold_decimal_bps: 150_000
560            })
561        ));
562
563        // Valid boundary: minimum (1 × 0.1bps = 0.1bps = 0.001%)
564        assert!(RangeBarProcessor::new(1).is_ok());
565
566        // Valid boundary: maximum (100,000 × 0.1bps = 10,000bps = 100%)
567        assert!(RangeBarProcessor::new(100_000).is_ok());
568    }
569
570    #[test]
571    fn test_export_processor_with_manual_trades() {
572        println!("Testing ExportRangeBarProcessor with same trade data...");
573
574        let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
575
576        // Use same trades as the working basic test
577        let trades = vec![
578            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
579            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
580            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
581        ];
582
583        println!(
584            "Processing {} trades with ExportRangeBarProcessor...",
585            trades.len()
586        );
587
588        export_processor.process_trades_continuously(&trades);
589        let bars = export_processor.get_all_completed_bars();
590
591        println!(
592            "ExportRangeBarProcessor generated {} range bars",
593            bars.len()
594        );
595        for (i, bar) in bars.iter().enumerate() {
596            println!(
597                "  Bar {}: O={} H={} L={} C={}",
598                i + 1,
599                bar.open,
600                bar.high,
601                bar.low,
602                bar.close
603            );
604        }
605
606        // Should match the basic processor results (1 bar)
607        assert!(
608            !bars.is_empty(),
609            "ExportRangeBarProcessor should generate same results as basic processor"
610        );
611    }
612}
613
614/// Internal state for range bar construction with fixed-point precision
615#[derive(Debug, Clone)]
616struct InternalRangeBar {
617    open_time: i64,
618    close_time: i64,
619    open: FixedPoint,
620    high: FixedPoint,
621    low: FixedPoint,
622    close: FixedPoint,
623    volume: FixedPoint,
624    turnover: i128,
625    individual_trade_count: i64,
626    agg_record_count: u32,
627    first_trade_id: i64,
628    last_trade_id: i64,
629    /// Volume from buy-side trades (is_buyer_maker = false)
630    buy_volume: FixedPoint,
631    /// Volume from sell-side trades (is_buyer_maker = true)
632    sell_volume: FixedPoint,
633    /// Number of buy-side trades
634    buy_trade_count: i64,
635    /// Number of sell-side trades
636    sell_trade_count: i64,
637    /// Volume Weighted Average Price
638    vwap: FixedPoint,
639    /// Turnover from buy-side trades
640    buy_turnover: i128,
641    /// Turnover from sell-side trades
642    sell_turnover: i128,
643}
644
645/// Export-oriented range bar processor for streaming use cases
646///
647/// This implementation uses the proven fixed-point arithmetic algorithm
648/// that achieves 100% breach consistency compliance in multi-year processing.
649pub struct ExportRangeBarProcessor {
650    threshold_decimal_bps: u32,
651    current_bar: Option<InternalRangeBar>,
652    completed_bars: Vec<RangeBar>,
653}
654
655impl ExportRangeBarProcessor {
656    /// Create new export processor with given threshold
657    ///
658    /// # Arguments
659    ///
660    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
661    ///   - Example: `250` → 25bps = 0.25%
662    ///   - Example: `10` → 1bps = 0.01%
663    ///   - Minimum: `1` → 0.1bps = 0.001%
664    ///
665    /// # Breaking Change (v3.0.0)
666    ///
667    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
668    /// **Migration**: Multiply all threshold values by 10.
669    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
670        // Validation bounds (v3.0.0: decimal bps units)
671        // Min: 1 × 0.1bps = 0.1bps = 0.001%
672        // Max: 100,000 × 0.1bps = 10,000bps = 100%
673        if threshold_decimal_bps < 1 {
674            return Err(ProcessingError::InvalidThreshold {
675                threshold_decimal_bps,
676            });
677        }
678        if threshold_decimal_bps > 100_000 {
679            return Err(ProcessingError::InvalidThreshold {
680                threshold_decimal_bps,
681            });
682        }
683
684        Ok(Self {
685            threshold_decimal_bps,
686            current_bar: None,
687            completed_bars: Vec::new(),
688        })
689    }
690
691    /// Process trades continuously using proven fixed-point algorithm
692    /// This method maintains 100% breach consistency by using precise integer arithmetic
693    pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
694        for trade in trades {
695            self.process_single_trade_fixed_point(trade);
696        }
697    }
698
699    /// Process single trade using proven fixed-point algorithm (100% breach consistency)
700    fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
701        if self.current_bar.is_none() {
702            // Start new bar
703            let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
704
705            self.current_bar = Some(InternalRangeBar {
706                open_time: trade.timestamp,
707                close_time: trade.timestamp,
708                open: trade.price,
709                high: trade.price,
710                low: trade.price,
711                close: trade.price,
712                volume: trade.volume,
713                turnover: trade_turnover,
714                individual_trade_count: 1,
715                agg_record_count: 1,
716                first_trade_id: trade.agg_trade_id,
717                last_trade_id: trade.agg_trade_id,
718                // Market microstructure fields
719                buy_volume: if trade.is_buyer_maker {
720                    FixedPoint(0)
721                } else {
722                    trade.volume
723                },
724                sell_volume: if trade.is_buyer_maker {
725                    trade.volume
726                } else {
727                    FixedPoint(0)
728                },
729                buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
730                sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
731                vwap: trade.price,
732                buy_turnover: if trade.is_buyer_maker {
733                    0
734                } else {
735                    trade_turnover
736                },
737                sell_turnover: if trade.is_buyer_maker {
738                    trade_turnover
739                } else {
740                    0
741                },
742            });
743            return;
744        }
745
746        // Process existing bar - work with reference
747        // SAFETY: current_bar guaranteed Some - early return above if None
748        let bar = self.current_bar.as_mut().unwrap();
749        let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
750
751        // CRITICAL FIX: Use fixed-point integer arithmetic for precise threshold calculation
752        // v3.0.0: threshold now in decimal bps, using BASIS_POINTS_SCALE = 100_000
753        let price_val = trade.price.0;
754        let bar_open_val = bar.open.0;
755        let threshold_decimal_bps = self.threshold_decimal_bps as i64;
756        let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
757        let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
758
759        // Update bar with new trade
760        bar.close_time = trade.timestamp;
761        bar.close = trade.price;
762        bar.volume.0 += trade.volume.0;
763        bar.turnover += trade_turnover;
764        bar.individual_trade_count += 1;
765        bar.agg_record_count += 1;
766        bar.last_trade_id = trade.agg_trade_id;
767
768        // Update high/low
769        if price_val > bar.high.0 {
770            bar.high = trade.price;
771        }
772        if price_val < bar.low.0 {
773            bar.low = trade.price;
774        }
775
776        // Update market microstructure
777        if trade.is_buyer_maker {
778            bar.sell_volume.0 += trade.volume.0;
779            bar.sell_turnover += trade_turnover;
780            bar.sell_trade_count += 1;
781        } else {
782            bar.buy_volume.0 += trade.volume.0;
783            bar.buy_turnover += trade_turnover;
784            bar.buy_trade_count += 1;
785        }
786
787        // CRITICAL: Fixed-point threshold breach detection (matches proven 100% compliance algorithm)
788        if price_val >= upper_threshold || price_val <= lower_threshold {
789            // Close current bar and move to completed
790            // SAFETY: current_bar guaranteed Some - checked at line 688/734
791            let completed_bar = self.current_bar.take().unwrap();
792
793            // Convert to export format (this is from an old internal structure)
794            let export_bar = RangeBar {
795                open_time: completed_bar.open_time,
796                close_time: completed_bar.close_time,
797                open: completed_bar.open,
798                high: completed_bar.high,
799                low: completed_bar.low,
800                close: completed_bar.close,
801                volume: completed_bar.volume,
802                turnover: completed_bar.turnover,
803
804                // Enhanced fields
805                individual_trade_count: completed_bar.individual_trade_count as u32,
806                agg_record_count: completed_bar.agg_record_count,
807                first_trade_id: completed_bar.first_trade_id,
808                last_trade_id: completed_bar.last_trade_id,
809                data_source: crate::types::DataSource::default(),
810
811                // Market microstructure fields
812                buy_volume: completed_bar.buy_volume,
813                sell_volume: completed_bar.sell_volume,
814                buy_trade_count: completed_bar.buy_trade_count as u32,
815                sell_trade_count: completed_bar.sell_trade_count as u32,
816                vwap: completed_bar.vwap,
817                buy_turnover: completed_bar.buy_turnover,
818                sell_turnover: completed_bar.sell_turnover,
819            };
820
821            self.completed_bars.push(export_bar);
822
823            // Start new bar with breaching trade
824            let initial_buy_turnover = if trade.is_buyer_maker {
825                0
826            } else {
827                trade_turnover
828            };
829            let initial_sell_turnover = if trade.is_buyer_maker {
830                trade_turnover
831            } else {
832                0
833            };
834
835            self.current_bar = Some(InternalRangeBar {
836                open_time: trade.timestamp,
837                close_time: trade.timestamp,
838                open: trade.price,
839                high: trade.price,
840                low: trade.price,
841                close: trade.price,
842                volume: trade.volume,
843                turnover: trade_turnover,
844                individual_trade_count: 1,
845                agg_record_count: 1,
846                first_trade_id: trade.agg_trade_id,
847                last_trade_id: trade.agg_trade_id,
848                // Market microstructure fields
849                buy_volume: if trade.is_buyer_maker {
850                    FixedPoint(0)
851                } else {
852                    trade.volume
853                },
854                sell_volume: if trade.is_buyer_maker {
855                    trade.volume
856                } else {
857                    FixedPoint(0)
858                },
859                buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
860                sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
861                vwap: trade.price,
862                buy_turnover: initial_buy_turnover,
863                sell_turnover: initial_sell_turnover,
864            });
865        }
866    }
867
868    /// Get all completed bars accumulated so far
869    /// This drains the internal buffer to avoid memory leaks
870    pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
871        std::mem::take(&mut self.completed_bars)
872    }
873
874    /// Get incomplete bar if exists (for final bar processing)
875    pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
876        self.current_bar.as_ref().map(|incomplete| RangeBar {
877            open_time: incomplete.open_time,
878            close_time: incomplete.close_time,
879            open: incomplete.open,
880            high: incomplete.high,
881            low: incomplete.low,
882            close: incomplete.close,
883            volume: incomplete.volume,
884            turnover: incomplete.turnover,
885
886            // Enhanced fields
887            individual_trade_count: incomplete.individual_trade_count as u32,
888            agg_record_count: incomplete.agg_record_count,
889            first_trade_id: incomplete.first_trade_id,
890            last_trade_id: incomplete.last_trade_id,
891            data_source: crate::types::DataSource::default(),
892
893            // Market microstructure fields
894            buy_volume: incomplete.buy_volume,
895            sell_volume: incomplete.sell_volume,
896            buy_trade_count: incomplete.buy_trade_count as u32,
897            sell_trade_count: incomplete.sell_trade_count as u32,
898            vwap: incomplete.vwap,
899            buy_turnover: incomplete.buy_turnover,
900            sell_turnover: incomplete.sell_turnover,
901        })
902    }
903}