rangebar_core/
processor.rs

1//! Core range bar processing algorithm
2//!
3//! Implements non-lookahead bias range bar construction where bars close when
4//! price moves ±threshold bps from the bar's OPEN price.
5
6use crate::fixed_point::FixedPoint;
7use crate::types::{AggTrade, RangeBar};
8#[cfg(feature = "python")]
9use pyo3::prelude::*;
10use thiserror::Error;
11
12/// Range bar processor with non-lookahead bias guarantee
13pub struct RangeBarProcessor {
14    /// Threshold in tenths of basis points (250 = 25bps, v3.0.0+)
15    threshold_bps: u32,
16
17    /// Current bar state for streaming processing (Q19)
18    /// Enables get_incomplete_bar() and stateful process_single_trade()
19    current_bar_state: Option<RangeBarState>,
20}
21
22impl RangeBarProcessor {
23    /// Create new processor with given threshold
24    ///
25    /// # Arguments
26    ///
27    /// * `threshold_bps` - Threshold in **tenths of basis points** (0.1bps units)
28    ///   - Example: `250` → 25bps = 0.25%
29    ///   - Example: `10` → 1bps = 0.01%
30    ///   - Minimum: `1` → 0.1bps = 0.001%
31    ///
32    /// # Breaking Change (v3.0.0)
33    ///
34    /// Prior to v3.0.0, `threshold_bps` was in 1bps units.
35    /// **Migration**: Multiply all threshold values by 10.
36    pub fn new(threshold_bps: u32) -> Result<Self, ProcessingError> {
37        // Validation bounds (v3.0.0: 0.1bps units)
38        // Min: 1 × 0.1bps = 0.1bps = 0.001%
39        // Max: 100,000 × 0.1bps = 10,000bps = 100%
40        if threshold_bps < 1 {
41            return Err(ProcessingError::InvalidThreshold { threshold_bps });
42        }
43        if threshold_bps > 100_000 {
44            return Err(ProcessingError::InvalidThreshold { threshold_bps });
45        }
46
47        Ok(Self {
48            threshold_bps,
49            current_bar_state: None,
50        })
51    }
52
53    /// Process a single trade and return completed bar if any
54    ///
55    /// Maintains internal state for streaming use case. State persists across calls
56    /// until a bar completes (threshold breach), enabling get_incomplete_bar().
57    ///
58    /// # Arguments
59    ///
60    /// * `trade` - Single aggregated trade to process
61    ///
62    /// # Returns
63    ///
64    /// `Some(RangeBar)` if a bar was completed, `None` otherwise
65    ///
66    /// # State Management
67    ///
68    /// - First trade: Initializes new bar state
69    /// - Subsequent trades: Updates existing bar or closes on breach
70    /// - Breach: Returns completed bar, starts new bar with breaching trade
71    pub fn process_single_trade(
72        &mut self,
73        trade: AggTrade,
74    ) -> Result<Option<RangeBar>, ProcessingError> {
75        match &mut self.current_bar_state {
76            None => {
77                // First trade - initialize new bar
78                self.current_bar_state = Some(RangeBarState::new(&trade, self.threshold_bps));
79                Ok(None)
80            }
81            Some(bar_state) => {
82                // Check for threshold breach
83                if bar_state.bar.is_breach(
84                    trade.price,
85                    bar_state.upper_threshold,
86                    bar_state.lower_threshold,
87                ) {
88                    // Breach detected - close current bar
89                    bar_state.bar.update_with_trade(&trade);
90
91                    // Validation: Ensure high/low include open/close extremes
92                    debug_assert!(
93                        bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
94                    );
95                    debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
96
97                    let completed_bar = bar_state.bar.clone();
98
99                    // Start new bar with breaching trade
100                    self.current_bar_state = Some(RangeBarState::new(&trade, self.threshold_bps));
101
102                    Ok(Some(completed_bar))
103                } else {
104                    // No breach - update existing bar
105                    bar_state.bar.update_with_trade(&trade);
106                    Ok(None)
107                }
108            }
109        }
110    }
111
112    /// Get any incomplete bar currently being processed
113    ///
114    /// Returns clone of current bar state for inspection without consuming it.
115    /// Useful for final bar at stream end or progress monitoring.
116    ///
117    /// # Returns
118    ///
119    /// `Some(RangeBar)` if bar is in progress, `None` if no active bar
120    pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
121        self.current_bar_state
122            .as_ref()
123            .map(|state| state.bar.clone())
124    }
125
126    /// Process AggTrade records into range bars including incomplete bars for analysis
127    ///
128    /// # Arguments
129    ///
130    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
131    ///
132    /// # Returns
133    ///
134    /// Vector of range bars including incomplete bars at end of data
135    ///
136    /// # Warning
137    ///
138    /// This method is for analysis purposes only. Incomplete bars violate the
139    /// fundamental range bar algorithm and should not be used for production trading.
140    pub fn process_agg_trade_records_with_incomplete(
141        &mut self,
142        agg_trade_records: &[AggTrade],
143    ) -> Result<Vec<RangeBar>, ProcessingError> {
144        self.process_agg_trade_records_with_options(agg_trade_records, true)
145    }
146
147    /// Process Binance aggregated trade records into range bars
148    ///
149    /// This is the primary method for converting AggTrade records (which aggregate
150    /// multiple individual trades) into range bars based on price movement thresholds.
151    ///
152    /// # Parameters
153    ///
154    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
155    ///   Each record represents multiple individual trades aggregated at same price
156    ///
157    /// # Returns
158    ///
159    /// Vector of completed range bars (ONLY bars that breached thresholds).
160    /// Each bar tracks both individual trade count and AggTrade record count.
161    pub fn process_agg_trade_records(
162        &mut self,
163        agg_trade_records: &[AggTrade],
164    ) -> Result<Vec<RangeBar>, ProcessingError> {
165        self.process_agg_trade_records_with_options(agg_trade_records, false)
166    }
167
168    /// Process AggTrade records with options for including incomplete bars
169    ///
170    /// Batch processing mode: Clears any existing state before processing.
171    /// Use process_single_trade() for stateful streaming instead.
172    ///
173    /// # Parameters
174    ///
175    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
176    /// * `include_incomplete` - Whether to include incomplete bars at end of processing
177    ///
178    /// # Returns
179    ///
180    /// Vector of range bars (completed + incomplete if requested)
181    pub fn process_agg_trade_records_with_options(
182        &mut self,
183        agg_trade_records: &[AggTrade],
184        include_incomplete: bool,
185    ) -> Result<Vec<RangeBar>, ProcessingError> {
186        if agg_trade_records.is_empty() {
187            return Ok(Vec::new());
188        }
189
190        // Validate records are sorted
191        self.validate_trade_ordering(agg_trade_records)?;
192
193        // Clear streaming state - batch mode uses local state
194        self.current_bar_state = None;
195
196        let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); // Heuristic capacity
197        let mut current_bar: Option<RangeBarState> = None;
198        let mut defer_open = false;
199
200        for agg_record in agg_trade_records {
201            if defer_open {
202                // Previous bar closed, this agg_record opens new bar
203                current_bar = Some(RangeBarState::new(agg_record, self.threshold_bps));
204                defer_open = false;
205                continue;
206            }
207
208            match current_bar {
209                None => {
210                    // First bar initialization
211                    current_bar = Some(RangeBarState::new(agg_record, self.threshold_bps));
212                }
213                Some(ref mut bar_state) => {
214                    // Check if this AggTrade record breaches the threshold
215                    if bar_state.bar.is_breach(
216                        agg_record.price,
217                        bar_state.upper_threshold,
218                        bar_state.lower_threshold,
219                    ) {
220                        // Breach detected - update bar with breaching record (includes microstructure)
221                        bar_state.bar.update_with_trade(agg_record);
222
223                        // Validation: Ensure high/low include open/close extremes
224                        debug_assert!(
225                            bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
226                        );
227                        debug_assert!(
228                            bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
229                        );
230
231                        bars.push(bar_state.bar.clone());
232                        current_bar = None;
233                        defer_open = true; // Next record will open new bar
234                    } else {
235                        // No breach: normal update with microstructure calculations
236                        bar_state.bar.update_with_trade(agg_record);
237                    }
238                }
239            }
240        }
241
242        // Add final partial bar only if explicitly requested
243        // This preserves algorithm integrity: bars should only close on threshold breach
244        if include_incomplete && let Some(bar_state) = current_bar {
245            bars.push(bar_state.bar);
246        }
247
248        Ok(bars)
249    }
250
251    /// Validate that trades are properly sorted for deterministic processing
252    fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
253        for i in 1..trades.len() {
254            let prev = &trades[i - 1];
255            let curr = &trades[i];
256
257            // Check ordering: (timestamp, agg_trade_id) ascending
258            if curr.timestamp < prev.timestamp
259                || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
260            {
261                return Err(ProcessingError::UnsortedTrades {
262                    index: i,
263                    prev_time: prev.timestamp,
264                    prev_id: prev.agg_trade_id,
265                    curr_time: curr.timestamp,
266                    curr_id: curr.agg_trade_id,
267                });
268            }
269        }
270
271        Ok(())
272    }
273}
274
275/// Internal state for a range bar being built
276struct RangeBarState {
277    /// The range bar being constructed
278    pub bar: RangeBar,
279
280    /// Upper breach threshold (FIXED from bar open)
281    pub upper_threshold: FixedPoint,
282
283    /// Lower breach threshold (FIXED from bar open)
284    pub lower_threshold: FixedPoint,
285}
286
287impl RangeBarState {
288    /// Create new range bar state from opening trade
289    fn new(trade: &AggTrade, threshold_bps: u32) -> Self {
290        let bar = RangeBar::new(trade);
291
292        // Compute FIXED thresholds from opening price
293        let (upper_threshold, lower_threshold) = bar.open.compute_range_thresholds(threshold_bps);
294
295        Self {
296            bar,
297            upper_threshold,
298            lower_threshold,
299        }
300    }
301}
302
303/// Processing errors
304#[derive(Error, Debug)]
305pub enum ProcessingError {
306    #[error(
307        "Trades not sorted at index {index}: prev=({prev_time}, {prev_id}), curr=({curr_time}, {curr_id})"
308    )]
309    UnsortedTrades {
310        index: usize,
311        prev_time: i64,
312        prev_id: i64,
313        curr_time: i64,
314        curr_id: i64,
315    },
316
317    #[error("Empty trade data")]
318    EmptyData,
319
320    #[error(
321        "Invalid threshold: {threshold_bps} (0.1bps units). Valid range: 1-100,000 (0.001%-100%)"
322    )]
323    InvalidThreshold { threshold_bps: u32 },
324}
325
326#[cfg(feature = "python")]
327impl From<ProcessingError> for PyErr {
328    fn from(err: ProcessingError) -> PyErr {
329        match err {
330            ProcessingError::UnsortedTrades {
331                index,
332                prev_time,
333                prev_id,
334                curr_time,
335                curr_id,
336            } => pyo3::exceptions::PyValueError::new_err(format!(
337                "Trades not sorted at index {}: prev=({}, {}), curr=({}, {})",
338                index, prev_time, prev_id, curr_time, curr_id
339            )),
340            ProcessingError::EmptyData => {
341                pyo3::exceptions::PyValueError::new_err("Empty trade data")
342            }
343            ProcessingError::InvalidThreshold { threshold_bps } => {
344                pyo3::exceptions::PyValueError::new_err(format!(
345                    "Invalid threshold: {} (0.1bps units). Valid range: 1-100,000 (0.001%-100%)",
346                    threshold_bps
347                ))
348            }
349        }
350    }
351}
352
353#[cfg(test)]
354mod tests {
355    use super::*;
356    use crate::test_utils::{self, scenarios};
357
358    #[test]
359    fn test_single_bar_no_breach() {
360        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
361
362        // Create trades that stay within 25 bps threshold
363        let trades = scenarios::no_breach_sequence(250);
364
365        // Test strict algorithm compliance: no bars should be created without breach
366        let bars = processor.process_agg_trade_records(&trades).unwrap();
367        assert_eq!(
368            bars.len(),
369            0,
370            "Strict algorithm should not create bars without breach"
371        );
372
373        // Test analysis mode: incomplete bar should be available for analysis
374        let bars_with_incomplete = processor
375            .process_agg_trade_records_with_incomplete(&trades)
376            .unwrap();
377        assert_eq!(
378            bars_with_incomplete.len(),
379            1,
380            "Analysis mode should include incomplete bar"
381        );
382
383        let bar = &bars_with_incomplete[0];
384        assert_eq!(bar.open.to_string(), "50000.00000000");
385        assert_eq!(bar.high.to_string(), "50100.00000000");
386        assert_eq!(bar.low.to_string(), "49900.00000000");
387        assert_eq!(bar.close.to_string(), "49900.00000000");
388    }
389
390    #[test]
391    fn test_exact_breach_upward() {
392        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
393
394        let trades = scenarios::exact_breach_upward(250);
395
396        // Test strict algorithm: only completed bars (with breach)
397        let bars = processor.process_agg_trade_records(&trades).unwrap();
398        assert_eq!(
399            bars.len(),
400            1,
401            "Strict algorithm should only return completed bars"
402        );
403
404        // First bar should close at breach
405        let bar1 = &bars[0];
406        assert_eq!(bar1.open.to_string(), "50000.00000000");
407        // Breach at 25 bps = 0.25% = 50000 * 1.0025 = 50125
408        assert_eq!(bar1.close.to_string(), "50125.00000000"); // Breach tick included
409        assert_eq!(bar1.high.to_string(), "50125.00000000");
410        assert_eq!(bar1.low.to_string(), "50000.00000000");
411
412        // Test analysis mode: includes incomplete second bar
413        let bars_with_incomplete = processor
414            .process_agg_trade_records_with_incomplete(&trades)
415            .unwrap();
416        assert_eq!(
417            bars_with_incomplete.len(),
418            2,
419            "Analysis mode should include incomplete bars"
420        );
421
422        // Second bar should start at next tick price (not breach price)
423        let bar2 = &bars_with_incomplete[1];
424        assert_eq!(bar2.open.to_string(), "50500.00000000"); // Next tick after breach
425        assert_eq!(bar2.close.to_string(), "50500.00000000");
426    }
427
428    #[test]
429    fn test_exact_breach_downward() {
430        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
431
432        let trades = scenarios::exact_breach_downward(250);
433
434        let bars = processor.process_agg_trade_records(&trades).unwrap();
435
436        assert_eq!(bars.len(), 1);
437
438        let bar = &bars[0];
439        assert_eq!(bar.open.to_string(), "50000.00000000");
440        assert_eq!(bar.close.to_string(), "49875.00000000"); // Breach tick included
441        assert_eq!(bar.high.to_string(), "50000.00000000");
442        assert_eq!(bar.low.to_string(), "49875.00000000");
443    }
444
445    #[test]
446    fn test_large_gap_single_bar() {
447        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
448
449        let trades = scenarios::large_gap_sequence();
450
451        let bars = processor.process_agg_trade_records(&trades).unwrap();
452
453        // Should create exactly ONE bar, not multiple bars to "fill the gap"
454        assert_eq!(bars.len(), 1);
455
456        let bar = &bars[0];
457        assert_eq!(bar.open.to_string(), "50000.00000000");
458        assert_eq!(bar.close.to_string(), "51000.00000000");
459        assert_eq!(bar.high.to_string(), "51000.00000000");
460        assert_eq!(bar.low.to_string(), "50000.00000000");
461    }
462
463    #[test]
464    fn test_unsorted_trades_error() {
465        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
466
467        let trades = scenarios::unsorted_sequence();
468
469        let result = processor.process_agg_trade_records(&trades);
470        assert!(result.is_err());
471
472        match result {
473            Err(ProcessingError::UnsortedTrades { index, .. }) => {
474                assert_eq!(index, 1);
475            }
476            _ => panic!("Expected UnsortedTrades error"),
477        }
478    }
479
480    #[test]
481    fn test_threshold_calculation() {
482        let processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
483
484        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
485        let bar_state = RangeBarState::new(&trade, processor.threshold_bps);
486
487        // 50000 * 0.0025 = 125 (25bps = 0.25%)
488        assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
489        assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
490    }
491
492    #[test]
493    fn test_empty_trades() {
494        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
495        let trades = scenarios::empty_sequence();
496        let bars = processor.process_agg_trade_records(&trades).unwrap();
497        assert_eq!(bars.len(), 0);
498    }
499
500    #[test]
501    fn test_debug_streaming_data() {
502        let mut processor = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
503
504        // Create trades similar to our test data
505        let trades = vec![
506            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
507            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
508            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
509        ];
510
511        println!("Test data prices: 50014 -> 50163 -> 50032");
512        println!("Expected price movements: +0.3% then -0.26%");
513
514        let bars = processor.process_agg_trade_records(&trades).unwrap();
515        println!("Generated {} range bars", bars.len());
516
517        for (i, bar) in bars.iter().enumerate() {
518            println!(
519                "  Bar {}: O={} H={} L={} C={}",
520                i + 1,
521                bar.open,
522                bar.high,
523                bar.low,
524                bar.close
525            );
526        }
527
528        // With a 0.1% threshold and 0.3% price movement, we should get at least 1 bar
529        assert!(
530            !bars.is_empty(),
531            "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
532        );
533    }
534
535    #[test]
536    fn test_threshold_validation() {
537        // Valid threshold
538        assert!(RangeBarProcessor::new(250).is_ok());
539
540        // Invalid: too low (0 × 0.1bps = 0%)
541        assert!(matches!(
542            RangeBarProcessor::new(0),
543            Err(ProcessingError::InvalidThreshold { threshold_bps: 0 })
544        ));
545
546        // Invalid: too high (150,000 × 0.1bps = 15,000bps = 150%)
547        assert!(matches!(
548            RangeBarProcessor::new(150_000),
549            Err(ProcessingError::InvalidThreshold {
550                threshold_bps: 150_000
551            })
552        ));
553
554        // Valid boundary: minimum (1 × 0.1bps = 0.1bps = 0.001%)
555        assert!(RangeBarProcessor::new(1).is_ok());
556
557        // Valid boundary: maximum (100,000 × 0.1bps = 10,000bps = 100%)
558        assert!(RangeBarProcessor::new(100_000).is_ok());
559    }
560
561    #[test]
562    fn test_export_processor_with_manual_trades() {
563        println!("Testing ExportRangeBarProcessor with same trade data...");
564
565        let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
566
567        // Use same trades as the working basic test
568        let trades = vec![
569            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
570            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
571            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
572        ];
573
574        println!(
575            "Processing {} trades with ExportRangeBarProcessor...",
576            trades.len()
577        );
578
579        export_processor.process_trades_continuously(&trades);
580        let bars = export_processor.get_all_completed_bars();
581
582        println!(
583            "ExportRangeBarProcessor generated {} range bars",
584            bars.len()
585        );
586        for (i, bar) in bars.iter().enumerate() {
587            println!(
588                "  Bar {}: O={} H={} L={} C={}",
589                i + 1,
590                bar.open,
591                bar.high,
592                bar.low,
593                bar.close
594            );
595        }
596
597        // Should match the basic processor results (1 bar)
598        assert!(
599            !bars.is_empty(),
600            "ExportRangeBarProcessor should generate same results as basic processor"
601        );
602    }
603}
604
605/// Internal state for range bar construction with fixed-point precision
606#[derive(Debug, Clone)]
607struct InternalRangeBar {
608    open_time: i64,
609    close_time: i64,
610    open: FixedPoint,
611    high: FixedPoint,
612    low: FixedPoint,
613    close: FixedPoint,
614    volume: FixedPoint,
615    turnover: i128,
616    individual_trade_count: i64,
617    agg_record_count: u32,
618    first_trade_id: i64,
619    last_trade_id: i64,
620    /// Volume from buy-side trades (is_buyer_maker = false)
621    buy_volume: FixedPoint,
622    /// Volume from sell-side trades (is_buyer_maker = true)
623    sell_volume: FixedPoint,
624    /// Number of buy-side trades
625    buy_trade_count: i64,
626    /// Number of sell-side trades
627    sell_trade_count: i64,
628    /// Volume Weighted Average Price
629    vwap: FixedPoint,
630    /// Turnover from buy-side trades
631    buy_turnover: i128,
632    /// Turnover from sell-side trades
633    sell_turnover: i128,
634}
635
636/// Export-oriented range bar processor for streaming use cases
637///
638/// This implementation uses the proven fixed-point arithmetic algorithm
639/// that achieves 100% breach consistency compliance in multi-year processing.
640pub struct ExportRangeBarProcessor {
641    threshold_bps: u32,
642    current_bar: Option<InternalRangeBar>,
643    completed_bars: Vec<RangeBar>,
644}
645
646impl ExportRangeBarProcessor {
647    /// Create new export processor with given threshold
648    ///
649    /// # Arguments
650    ///
651    /// * `threshold_bps` - Threshold in **tenths of basis points** (0.1bps units)
652    ///   - Example: `250` → 25bps = 0.25%
653    ///   - Example: `10` → 1bps = 0.01%
654    ///   - Minimum: `1` → 0.1bps = 0.001%
655    ///
656    /// # Breaking Change (v3.0.0)
657    ///
658    /// Prior to v3.0.0, `threshold_bps` was in 1bps units.
659    /// **Migration**: Multiply all threshold values by 10.
660    pub fn new(threshold_bps: u32) -> Result<Self, ProcessingError> {
661        // Validation bounds (v3.0.0: 0.1bps units)
662        // Min: 1 × 0.1bps = 0.1bps = 0.001%
663        // Max: 100,000 × 0.1bps = 10,000bps = 100%
664        if threshold_bps < 1 {
665            return Err(ProcessingError::InvalidThreshold { threshold_bps });
666        }
667        if threshold_bps > 100_000 {
668            return Err(ProcessingError::InvalidThreshold { threshold_bps });
669        }
670
671        Ok(Self {
672            threshold_bps,
673            current_bar: None,
674            completed_bars: Vec::new(),
675        })
676    }
677
678    /// Process trades continuously using proven fixed-point algorithm
679    /// This method maintains 100% breach consistency by using precise integer arithmetic
680    pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
681        for trade in trades {
682            self.process_single_trade_fixed_point(trade);
683        }
684    }
685
686    /// Process single trade using proven fixed-point algorithm (100% breach consistency)
687    fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
688        if self.current_bar.is_none() {
689            // Start new bar
690            let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
691
692            self.current_bar = Some(InternalRangeBar {
693                open_time: trade.timestamp,
694                close_time: trade.timestamp,
695                open: trade.price,
696                high: trade.price,
697                low: trade.price,
698                close: trade.price,
699                volume: trade.volume,
700                turnover: trade_turnover,
701                individual_trade_count: 1,
702                agg_record_count: 1,
703                first_trade_id: trade.agg_trade_id,
704                last_trade_id: trade.agg_trade_id,
705                // Market microstructure fields
706                buy_volume: if trade.is_buyer_maker {
707                    FixedPoint(0)
708                } else {
709                    trade.volume
710                },
711                sell_volume: if trade.is_buyer_maker {
712                    trade.volume
713                } else {
714                    FixedPoint(0)
715                },
716                buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
717                sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
718                vwap: trade.price,
719                buy_turnover: if trade.is_buyer_maker {
720                    0
721                } else {
722                    trade_turnover
723                },
724                sell_turnover: if trade.is_buyer_maker {
725                    trade_turnover
726                } else {
727                    0
728                },
729            });
730            return;
731        }
732
733        // Process existing bar - work with reference
734        // SAFETY: current_bar guaranteed Some - early return above if None
735        let bar = self.current_bar.as_mut().unwrap();
736        let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
737
738        // CRITICAL FIX: Use fixed-point integer arithmetic for precise threshold calculation
739        // v3.0.0: threshold_bps now in 0.1bps units, using BASIS_POINTS_SCALE = 100_000
740        let price_val = trade.price.0;
741        let bar_open_val = bar.open.0;
742        let threshold_bps = self.threshold_bps as i64;
743        let upper_threshold = bar_open_val + (bar_open_val * threshold_bps) / 100_000;
744        let lower_threshold = bar_open_val - (bar_open_val * threshold_bps) / 100_000;
745
746        // Update bar with new trade
747        bar.close_time = trade.timestamp;
748        bar.close = trade.price;
749        bar.volume.0 += trade.volume.0;
750        bar.turnover += trade_turnover;
751        bar.individual_trade_count += 1;
752        bar.agg_record_count += 1;
753        bar.last_trade_id = trade.agg_trade_id;
754
755        // Update high/low
756        if price_val > bar.high.0 {
757            bar.high = trade.price;
758        }
759        if price_val < bar.low.0 {
760            bar.low = trade.price;
761        }
762
763        // Update market microstructure
764        if trade.is_buyer_maker {
765            bar.sell_volume.0 += trade.volume.0;
766            bar.sell_turnover += trade_turnover;
767            bar.sell_trade_count += 1;
768        } else {
769            bar.buy_volume.0 += trade.volume.0;
770            bar.buy_turnover += trade_turnover;
771            bar.buy_trade_count += 1;
772        }
773
774        // CRITICAL: Fixed-point threshold breach detection (matches proven 100% compliance algorithm)
775        if price_val >= upper_threshold || price_val <= lower_threshold {
776            // Close current bar and move to completed
777            // SAFETY: current_bar guaranteed Some - checked at line 688/734
778            let completed_bar = self.current_bar.take().unwrap();
779
780            // Convert to export format (this is from an old internal structure)
781            let export_bar = RangeBar {
782                open_time: completed_bar.open_time,
783                close_time: completed_bar.close_time,
784                open: completed_bar.open,
785                high: completed_bar.high,
786                low: completed_bar.low,
787                close: completed_bar.close,
788                volume: completed_bar.volume,
789                turnover: completed_bar.turnover,
790
791                // Enhanced fields
792                individual_trade_count: completed_bar.individual_trade_count as u32,
793                agg_record_count: completed_bar.agg_record_count,
794                first_trade_id: completed_bar.first_trade_id,
795                last_trade_id: completed_bar.last_trade_id,
796                data_source: crate::types::DataSource::default(),
797
798                // Market microstructure fields
799                buy_volume: completed_bar.buy_volume,
800                sell_volume: completed_bar.sell_volume,
801                buy_trade_count: completed_bar.buy_trade_count as u32,
802                sell_trade_count: completed_bar.sell_trade_count as u32,
803                vwap: completed_bar.vwap,
804                buy_turnover: completed_bar.buy_turnover,
805                sell_turnover: completed_bar.sell_turnover,
806            };
807
808            self.completed_bars.push(export_bar);
809
810            // Start new bar with breaching trade
811            let initial_buy_turnover = if trade.is_buyer_maker {
812                0
813            } else {
814                trade_turnover
815            };
816            let initial_sell_turnover = if trade.is_buyer_maker {
817                trade_turnover
818            } else {
819                0
820            };
821
822            self.current_bar = Some(InternalRangeBar {
823                open_time: trade.timestamp,
824                close_time: trade.timestamp,
825                open: trade.price,
826                high: trade.price,
827                low: trade.price,
828                close: trade.price,
829                volume: trade.volume,
830                turnover: trade_turnover,
831                individual_trade_count: 1,
832                agg_record_count: 1,
833                first_trade_id: trade.agg_trade_id,
834                last_trade_id: trade.agg_trade_id,
835                // Market microstructure fields
836                buy_volume: if trade.is_buyer_maker {
837                    FixedPoint(0)
838                } else {
839                    trade.volume
840                },
841                sell_volume: if trade.is_buyer_maker {
842                    trade.volume
843                } else {
844                    FixedPoint(0)
845                },
846                buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
847                sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
848                vwap: trade.price,
849                buy_turnover: initial_buy_turnover,
850                sell_turnover: initial_sell_turnover,
851            });
852        }
853    }
854
855    /// Get all completed bars accumulated so far
856    /// This drains the internal buffer to avoid memory leaks
857    pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
858        std::mem::take(&mut self.completed_bars)
859    }
860
861    /// Get incomplete bar if exists (for final bar processing)
862    pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
863        self.current_bar.as_ref().map(|incomplete| RangeBar {
864            open_time: incomplete.open_time,
865            close_time: incomplete.close_time,
866            open: incomplete.open,
867            high: incomplete.high,
868            low: incomplete.low,
869            close: incomplete.close,
870            volume: incomplete.volume,
871            turnover: incomplete.turnover,
872
873            // Enhanced fields
874            individual_trade_count: incomplete.individual_trade_count as u32,
875            agg_record_count: incomplete.agg_record_count,
876            first_trade_id: incomplete.first_trade_id,
877            last_trade_id: incomplete.last_trade_id,
878            data_source: crate::types::DataSource::default(),
879
880            // Market microstructure fields
881            buy_volume: incomplete.buy_volume,
882            sell_volume: incomplete.sell_volume,
883            buy_trade_count: incomplete.buy_trade_count as u32,
884            sell_trade_count: incomplete.sell_trade_count as u32,
885            vwap: incomplete.vwap,
886            buy_turnover: incomplete.buy_turnover,
887            sell_turnover: incomplete.sell_turnover,
888        })
889    }
890}