Skip to main content

rangebar_core/
export_processor.rs

//! Export-oriented range bar processor
//! Extracted from processor.rs (Phase 2d refactoring)
3
4use crate::errors::ProcessingError;
5use crate::fixed_point::FixedPoint;
6use crate::types::{AggTrade, RangeBar};
7
8/// Internal state for range bar construction with fixed-point precision
9#[derive(Debug, Clone)]
10pub(crate) struct InternalRangeBar {
11    open_time: i64,
12    close_time: i64,
13    open: FixedPoint,
14    high: FixedPoint,
15    low: FixedPoint,
16    close: FixedPoint,
17    // Issue #88: i128 volume accumulators to prevent FixedPoint(i64) overflow
18    // on high-token-count symbols like SHIBUSDT
19    volume: i128,
20    turnover: i128,
21    individual_trade_count: i64,
22    agg_record_count: u32,
23    first_trade_id: i64,
24    last_trade_id: i64,
25    /// First aggregate trade ID in this range bar (Issue #72)
26    first_agg_trade_id: i64,
27    /// Last aggregate trade ID in this range bar (Issue #72)
28    last_agg_trade_id: i64,
29    /// Volume from buy-side trades (is_buyer_maker = false)
30    // Issue #88: i128 to prevent overflow
31    buy_volume: i128,
32    /// Volume from sell-side trades (is_buyer_maker = true)
33    // Issue #88: i128 to prevent overflow
34    sell_volume: i128,
35    /// Number of buy-side trades
36    buy_trade_count: i64,
37    /// Number of sell-side trades
38    sell_trade_count: i64,
39    /// Volume Weighted Average Price
40    vwap: FixedPoint,
41    /// Turnover from buy-side trades
42    buy_turnover: i128,
43    /// Turnover from sell-side trades
44    sell_turnover: i128,
45}
46
47/// Export-oriented range bar processor for streaming use cases
48///
49/// This implementation uses the proven fixed-point arithmetic algorithm
50/// that achieves 100% breach consistency compliance in multi-year processing.
51pub struct ExportRangeBarProcessor {
52    threshold_decimal_bps: u32,
53    current_bar: Option<InternalRangeBar>,
54    completed_bars: Vec<RangeBar>,
55    /// Issue #96 Task #71: Reuse pool for completed_bars vec (streaming hot path)
56    completed_bars_pool: Option<Vec<RangeBar>>,
57    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
58    prevent_same_timestamp_close: bool,
59    /// Deferred bar open flag (Issue #46) - next trade opens new bar after breach
60    defer_open: bool,
61}
62
63impl ExportRangeBarProcessor {
64    /// Create new export processor with given threshold
65    ///
66    /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36)
67    ///
68    /// # Arguments
69    ///
70    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
71    ///   - Example: `250` -> 25bps = 0.25%
72    ///   - Example: `10` -> 1bps = 0.01%
73    ///   - Minimum: `1` -> 0.1bps = 0.001%
74    ///
75    /// # Breaking Change (v3.0.0)
76    ///
77    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
78    /// **Migration**: Multiply all threshold values by 10.
79    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
80        Self::with_options(threshold_decimal_bps, true)
81    }
82
83    /// Create new export processor with explicit timestamp gating control
84    pub fn with_options(
85        threshold_decimal_bps: u32,
86        prevent_same_timestamp_close: bool,
87    ) -> Result<Self, ProcessingError> {
88        // Validation bounds (v3.0.0: dbps units)
89        // Min: 1 dbps = 0.001%
90        // Max: 100,000 dbps = 100%
91        if threshold_decimal_bps < 1 {
92            return Err(ProcessingError::InvalidThreshold {
93                threshold_decimal_bps,
94            });
95        }
96        if threshold_decimal_bps > 100_000 {
97            return Err(ProcessingError::InvalidThreshold {
98                threshold_decimal_bps,
99            });
100        }
101
102        Ok(Self {
103            threshold_decimal_bps,
104            current_bar: None,
105            completed_bars: Vec::new(),
106            completed_bars_pool: None,
107            prevent_same_timestamp_close,
108            defer_open: false,
109        })
110    }
111
112    /// Process trades continuously using proven fixed-point algorithm
113    /// This method maintains 100% breach consistency by using precise integer arithmetic
114    pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
115        for trade in trades {
116            self.process_single_trade_fixed_point(trade);
117        }
118    }
119
120    /// Process single trade using proven fixed-point algorithm (100% breach consistency)
121    fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
122        // Issue #46: If previous trade triggered a breach, this trade opens the new bar.
123        // This matches the batch path's defer_open semantics.
124        if self.defer_open {
125            self.defer_open = false;
126            self.current_bar = None; // Clear any stale state
127            // Fall through to the is_none() branch below to start new bar
128        }
129
130        if self.current_bar.is_none() {
131            // Start new bar
132            // Issue #96: Use integer turnover (matches main processor) — eliminates 2 f64 conversions
133            let trade_turnover = trade.turnover();
134            let vol = trade.volume.0 as i128;
135
136            // Single branch for buy/sell classification
137            let (buy_vol, sell_vol, buy_count, sell_count, buy_turn, sell_turn) =
138                if trade.is_buyer_maker {
139                    (0i128, vol, 0i64, 1i64, 0i128, trade_turnover)
140                } else {
141                    (vol, 0i128, 1i64, 0i64, trade_turnover, 0i128)
142                };
143
144            self.current_bar = Some(InternalRangeBar {
145                open_time: trade.timestamp,
146                close_time: trade.timestamp,
147                open: trade.price,
148                high: trade.price,
149                low: trade.price,
150                close: trade.price,
151                // Issue #88: i128 volume accumulators
152                volume: vol,
153                turnover: trade_turnover,
154                individual_trade_count: 1,
155                agg_record_count: 1,
156                first_trade_id: trade.first_trade_id,
157                last_trade_id: trade.last_trade_id,
158                // Issue #72: Track aggregate trade IDs
159                first_agg_trade_id: trade.agg_trade_id,
160                last_agg_trade_id: trade.agg_trade_id,
161                // Market microstructure fields (Issue #88: i128)
162                buy_volume: buy_vol,
163                sell_volume: sell_vol,
164                buy_trade_count: buy_count,
165                sell_trade_count: sell_count,
166                vwap: trade.price,
167                buy_turnover: buy_turn,
168                sell_turnover: sell_turn,
169            });
170            return;
171        }
172
173        // Process existing bar - work with reference
174        // SAFETY: current_bar guaranteed Some - early return above if None
175        let bar = self.current_bar.as_mut().unwrap();
176        // Issue #96: Use integer turnover (matches main processor) — eliminates 2 f64 conversions
177        let trade_turnover = trade.turnover();
178
179        // CRITICAL FIX: Use fixed-point integer arithmetic for precise threshold calculation
180        // v3.0.0: threshold now in dbps, using BASIS_POINTS_SCALE = 100_000
181        let price_val = trade.price.0;
182        let bar_open_val = bar.open.0;
183        let threshold_decimal_bps = self.threshold_decimal_bps as i64;
184        let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
185        let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
186
187        // Update bar with new trade
188        bar.close_time = trade.timestamp;
189        bar.close = trade.price;
190        bar.volume += trade.volume.0 as i128; // Issue #88: i128 accumulator
191        bar.turnover += trade_turnover;
192        bar.individual_trade_count += 1;
193        bar.agg_record_count += 1;
194        bar.last_trade_id = trade.last_trade_id;
195        bar.last_agg_trade_id = trade.agg_trade_id; // Issue #72
196
197        // Update high/low
198        if price_val > bar.high.0 {
199            bar.high = trade.price;
200        }
201        if price_val < bar.low.0 {
202            bar.low = trade.price;
203        }
204
205        // Update market microstructure
206        if trade.is_buyer_maker {
207            bar.sell_volume += trade.volume.0 as i128; // Issue #88: i128 accumulator
208            bar.sell_turnover += trade_turnover;
209            bar.sell_trade_count += 1;
210        } else {
211            bar.buy_volume += trade.volume.0 as i128; // Issue #88: i128 accumulator
212            bar.buy_turnover += trade_turnover;
213            bar.buy_trade_count += 1;
214        }
215
216        // CRITICAL: Fixed-point threshold breach detection (matches proven 100% compliance algorithm)
217        let price_breaches = price_val >= upper_threshold || price_val <= lower_threshold;
218
219        // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
220        let timestamp_allows_close =
221            !self.prevent_same_timestamp_close || trade.timestamp != bar.open_time;
222
223        if price_breaches && timestamp_allows_close {
224            // Close current bar and move to completed
225            // SAFETY: current_bar guaranteed Some - checked at line 688/734
226            let completed_bar = self.current_bar.take().unwrap();
227
228            // Convert to export format — uses ..Default::default() for all
229            // microstructure/inter-bar/intra-bar fields (0/0.0/None)
230            let mut export_bar = RangeBar {
231                open_time: completed_bar.open_time,
232                close_time: completed_bar.close_time,
233                open: completed_bar.open,
234                high: completed_bar.high,
235                low: completed_bar.low,
236                close: completed_bar.close,
237                volume: completed_bar.volume,
238                turnover: completed_bar.turnover,
239                individual_trade_count: completed_bar.individual_trade_count as u32,
240                agg_record_count: completed_bar.agg_record_count,
241                first_trade_id: completed_bar.first_trade_id,
242                last_trade_id: completed_bar.last_trade_id,
243                first_agg_trade_id: completed_bar.first_agg_trade_id, // Issue #72
244                last_agg_trade_id: completed_bar.last_agg_trade_id,
245                buy_volume: completed_bar.buy_volume,
246                sell_volume: completed_bar.sell_volume,
247                buy_trade_count: completed_bar.buy_trade_count as u32,
248                sell_trade_count: completed_bar.sell_trade_count as u32,
249                vwap: completed_bar.vwap,
250                buy_turnover: completed_bar.buy_turnover,
251                sell_turnover: completed_bar.sell_turnover,
252                ..Default::default() // Issue #25/#59: microstructure computed below; inter/intra-bar not used
253            };
254
255            // Compute microstructure features at bar finalization (Issue #25)
256            export_bar.compute_microstructure_features();
257
258            self.completed_bars.push(export_bar);
259
260            // Issue #46: Don't start new bar with breaching trade.
261            // Next trade will open the new bar via defer_open.
262            self.current_bar = None;
263            self.defer_open = true;
264        }
265    }
266
267    /// Get all completed bars accumulated so far
268    /// This drains the internal buffer to avoid memory leaks
269    pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
270        // Issue #96 Task #71: Vec reuse pool to reduce allocation overhead on hot path
271        let mut result = if let Some(mut pool_vec) = self.completed_bars_pool.take() {
272            // Reuse pool vec for next batch
273            pool_vec.clear();
274            pool_vec
275        } else {
276            // First call or pool was None
277            Vec::new()
278        };
279
280        // Swap current completed bars with pool vec
281        std::mem::swap(&mut result, &mut self.completed_bars);
282
283        // Store the now-empty completed_bars in pool for next cycle
284        self.completed_bars_pool = Some(std::mem::take(&mut self.completed_bars));
285
286        result
287    }
288
289    /// Get incomplete bar if exists (for final bar processing)
290    pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
291        self.current_bar.as_ref().map(|incomplete| {
292            let mut bar = RangeBar {
293                open_time: incomplete.open_time,
294                close_time: incomplete.close_time,
295                open: incomplete.open,
296                high: incomplete.high,
297                low: incomplete.low,
298                close: incomplete.close,
299                volume: incomplete.volume,
300                turnover: incomplete.turnover,
301
302                // Enhanced fields
303                individual_trade_count: incomplete.individual_trade_count as u32,
304                agg_record_count: incomplete.agg_record_count,
305                first_trade_id: incomplete.first_trade_id,
306                last_trade_id: incomplete.last_trade_id,
307                first_agg_trade_id: incomplete.first_agg_trade_id,
308                last_agg_trade_id: incomplete.last_agg_trade_id,
309                data_source: crate::types::DataSource::default(),
310
311                // Market microstructure fields
312                buy_volume: incomplete.buy_volume,
313                sell_volume: incomplete.sell_volume,
314                buy_trade_count: incomplete.buy_trade_count as u32,
315                sell_trade_count: incomplete.sell_trade_count as u32,
316                vwap: incomplete.vwap,
317                buy_turnover: incomplete.buy_turnover,
318                sell_turnover: incomplete.sell_turnover,
319
320                // All microstructure, inter-bar, and intra-bar features default to 0/None
321                ..Default::default()
322            };
323            // Compute microstructure features for incomplete bar (Issue #25)
324            bar.compute_microstructure_features();
325            bar
326        })
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::create_test_agg_trade_with_range;

    /// Shorthand: parse a decimal literal into a `FixedPoint`.
    fn fp(text: &str) -> FixedPoint {
        FixedPoint::from_str(text).unwrap()
    }

    /// Helper: create a buy trade at given price/time
    fn buy_trade(id: i64, price: &str, vol: &str, ts: i64) -> AggTrade {
        let trade_id = id * 10;
        create_test_agg_trade_with_range(id, price, vol, ts, trade_id, trade_id, false)
    }

    /// Helper: create a sell trade at given price/time
    fn sell_trade(id: i64, price: &str, vol: &str, ts: i64) -> AggTrade {
        let trade_id = id * 10;
        create_test_agg_trade_with_range(id, price, vol, ts, trade_id, trade_id, true)
    }

    /// Helper: a 250 dbps processor (default gating) that has consumed `trades`.
    fn processor_after(trades: &[AggTrade]) -> ExportRangeBarProcessor {
        let mut processor = ExportRangeBarProcessor::new(250).unwrap();
        processor.process_trades_continuously(trades);
        processor
    }

    #[test]
    fn test_new_valid_threshold() {
        assert!(ExportRangeBarProcessor::new(250).is_ok());
    }

    #[test]
    fn test_new_invalid_threshold_zero() {
        let outcome = ExportRangeBarProcessor::new(0);
        match outcome {
            Ok(_) => panic!("Expected error for threshold 0"),
            Err(ProcessingError::InvalidThreshold {
                threshold_decimal_bps: 0,
            }) => {}
            Err(e) => panic!("Expected InvalidThreshold(0), got error: {e}"),
        }
    }

    #[test]
    fn test_new_invalid_threshold_too_high() {
        assert!(ExportRangeBarProcessor::new(100_001).is_err());
    }

    #[test]
    fn test_new_boundary_thresholds() {
        // Both ends of the accepted range are inclusive.
        let minimum = ExportRangeBarProcessor::new(1);
        let maximum = ExportRangeBarProcessor::new(100_000);
        assert!(minimum.is_ok());
        assert!(maximum.is_ok());
    }

    #[test]
    fn test_with_options_timestamp_gating() {
        assert!(ExportRangeBarProcessor::with_options(250, false).is_ok());
    }

    #[test]
    fn test_single_trade_no_bar_completion() {
        let mut processor = processor_after(&[buy_trade(1, "100.0", "1.0", 1000)]);

        let completed = processor.get_all_completed_bars();
        assert_eq!(completed.len(), 0, "Single trade should not complete a bar");

        let incomplete = processor.get_incomplete_bar();
        assert!(incomplete.is_some(), "Should have an incomplete bar");
        let bar = incomplete.unwrap();
        assert_eq!(bar.open, fp("100.0"));
        assert_eq!(bar.close, fp("100.0"));
    }

    #[test]
    fn test_breach_completes_bar() {
        // 250 dbps = 0.25%. At open=100.0, upper=100.25, lower=99.75
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "1.0", 1000),
            buy_trade(2, "100.10", "1.0", 2000),
            buy_trade(3, "100.25", "1.0", 3000), // Breach: >= upper threshold
        ]);

        let completed = processor.get_all_completed_bars();
        assert_eq!(completed.len(), 1, "Breach should complete one bar");

        let bar = &completed[0];
        assert_eq!(bar.open, fp("100.0"));
        assert_eq!(bar.close, fp("100.25"));
        assert_eq!(bar.high, fp("100.25"));
        assert_eq!(bar.low, fp("100.0"));
    }

    #[test]
    fn test_defer_open_semantics() {
        // Issue #46: Breaching trade should NOT open next bar
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "1.0", 1000),
            buy_trade(2, "100.25", "1.0", 2000), // Breach → completes bar 1
            buy_trade(3, "100.50", "1.0", 3000), // Opens bar 2 (defer_open)
        ]);

        // Bar 1 was opened by trade 1, closed by trade 2
        let completed = processor.get_all_completed_bars();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].open, fp("100.0"));
        assert_eq!(completed[0].close, fp("100.25"));

        // Incomplete bar should be opened by trade 3 (not trade 2)
        let incomplete = processor.get_incomplete_bar();
        assert!(incomplete.is_some());
        let bar2 = incomplete.unwrap();
        assert_eq!(
            bar2.open,
            fp("100.50"),
            "Bar 2 should open at trade 3's price, not the breaching trade"
        );
    }

    #[test]
    fn test_timestamp_gate_prevents_same_ts_close() {
        // Issue #36: Bar cannot close on same timestamp as it opened
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "1.0", 1000),
            buy_trade(2, "100.30", "1.0", 1000), // Same ts as open, breach but gated
        ]);

        let completed = processor.get_all_completed_bars();
        assert_eq!(
            completed.len(),
            0,
            "Timestamp gate should prevent close on same ms"
        );
    }

    #[test]
    fn test_timestamp_gate_disabled() {
        // With timestamp gating off, same-ts breach closes the bar
        let mut processor = ExportRangeBarProcessor::with_options(250, false).unwrap();
        processor.process_trades_continuously(&[
            buy_trade(1, "100.0", "1.0", 1000),
            buy_trade(2, "100.30", "1.0", 1000), // Same ts, breach allowed
        ]);

        let completed = processor.get_all_completed_bars();
        assert_eq!(
            completed.len(),
            1,
            "With gating disabled, same-ts breach should close"
        );
    }

    #[test]
    fn test_get_all_completed_bars_drains() {
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "1.0", 1000),
            buy_trade(2, "100.25", "1.0", 2000), // Breach
        ]);

        let first_drain = processor.get_all_completed_bars();
        assert_eq!(first_drain.len(), 1);

        // Second call should return empty (drained)
        let second_drain = processor.get_all_completed_bars();
        assert_eq!(
            second_drain.len(),
            0,
            "get_all_completed_bars should drain buffer"
        );
    }

    #[test]
    fn test_vec_reuse_pool() {
        // First batch: produce a bar
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "1.0", 1000),
            buy_trade(2, "100.25", "1.0", 2000),
        ]);
        let _first_batch = processor.get_all_completed_bars();

        // Second batch: produce another bar after the buffer was drained once
        processor.process_trades_continuously(&[
            sell_trade(3, "100.50", "1.0", 3000),
            sell_trade(4, "100.75", "1.0", 4000),
            sell_trade(5, "100.24", "1.0", 5000), // Breach lower
        ]);
        let second_batch = processor.get_all_completed_bars();
        assert_eq!(second_batch.len(), 1);
    }

    #[test]
    fn test_buy_sell_volume_segregation() {
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "2.0", 1000),   // Buy: 2.0
            sell_trade(2, "100.05", "3.0", 2000), // Sell: 3.0
            buy_trade(3, "100.25", "1.0", 3000),  // Buy: 1.0, breach
        ]);

        let bars = processor.get_all_completed_bars();
        assert_eq!(bars.len(), 1);

        // Buy trades: 2.0 + 1.0 = 3.0, Sell trades: 3.0
        let (buy_vol, sell_vol) = (bars[0].buy_volume, bars[0].sell_volume);
        assert_eq!(buy_vol, 300_000_000, "Buy volume should be 3.0 in FixedPoint i128");
        assert_eq!(sell_vol, 300_000_000, "Sell volume should be 3.0 in FixedPoint i128");
    }

    #[test]
    fn test_trade_id_tracking() {
        // Issue #72: Verify first/last agg trade ID tracking
        let mut processor = processor_after(&[
            create_test_agg_trade_with_range(100, "100.0", "1.0", 1000, 1000, 1005, false),
            create_test_agg_trade_with_range(101, "100.10", "1.0", 2000, 1006, 1010, true),
            create_test_agg_trade_with_range(102, "100.25", "1.0", 3000, 1011, 1015, false), // Breach
        ]);

        let bars = processor.get_all_completed_bars();
        assert_eq!(bars.len(), 1);

        let bar = &bars[0];
        assert_eq!(bar.first_agg_trade_id, 100);
        assert_eq!(bar.last_agg_trade_id, 102);
        assert_eq!(bar.first_trade_id, 1000);
        assert_eq!(bar.last_trade_id, 1015);
    }

    #[test]
    fn test_microstructure_features_computed() {
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "5.0", 1000),
            sell_trade(2, "100.10", "3.0", 2000),
            buy_trade(3, "100.25", "2.0", 3000), // Breach
        ]);

        let bars = processor.get_all_completed_bars();
        let bar = &bars[0];

        // OFI should be computed (buy_vol > sell_vol → positive)
        // buy = 5.0 + 2.0 = 7.0, sell = 3.0, ofi = (7-3)/10 = 0.4
        assert!(bar.ofi != 0.0, "OFI should be computed");
        assert!(bar.trade_intensity > 0.0, "Trade intensity should be > 0");
        assert!(bar.volume_per_trade > 0.0, "Volume per trade should be > 0");
    }

    #[test]
    fn test_incomplete_bar_has_microstructure() {
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "5.0", 1000),
            sell_trade(2, "100.10", "3.0", 2000),
        ]);

        // Microstructure should be computed on incomplete bars too
        let incomplete = processor.get_incomplete_bar().unwrap();
        assert!(
            incomplete.volume_per_trade > 0.0,
            "Incomplete bar should have microstructure features"
        );
    }

    #[test]
    fn test_multiple_bars_sequence() {
        // Generate enough trades for 2 complete bars
        let mut processor = processor_after(&[
            buy_trade(1, "100.0", "1.0", 1000),
            buy_trade(2, "100.25", "1.0", 2000), // Breach → bar 1
            buy_trade(3, "100.50", "1.0", 3000), // Opens bar 2
            buy_trade(4, "100.76", "1.0", 4000), // Breach → bar 2 (100.50 * 1.0025 = 100.75125)
        ]);

        let bars = processor.get_all_completed_bars();
        assert_eq!(bars.len(), 2, "Should produce 2 complete bars");
        assert_eq!(bars[0].open, fp("100.0"));
        assert_eq!(bars[1].open, fp("100.50"));
    }

    #[test]
    fn test_downward_breach() {
        let mut processor = processor_after(&[
            sell_trade(1, "100.0", "1.0", 1000),
            sell_trade(2, "99.75", "1.0", 2000), // Breach lower: <= 99.75
        ]);

        let bars = processor.get_all_completed_bars();
        assert_eq!(bars.len(), 1);
        assert_eq!(bars[0].close, fp("99.75"));
    }

    #[test]
    fn test_empty_trades_no_op() {
        let mut processor = processor_after(&[]);
        assert_eq!(processor.get_all_completed_bars().len(), 0);
        assert!(processor.get_incomplete_bar().is_none());
    }

    // === Issue #96: Cross-processor parity verification ===

    #[test]
    fn test_parity_with_range_bar_processor() {
        use crate::processor::RangeBarProcessor;

        // Test across multiple thresholds
        for threshold in [250, 500, 1000] {
            // Generate a multi-breach trade sequence: price climbs 0.15 per trade
            let mut trades: Vec<AggTrade> = Vec::with_capacity(20);
            for step in 0..20_i64 {
                let price = format!("{:.8}", 100.0 + (step as f64 * 0.15));
                trades.push(buy_trade(step + 1, &price, "1.0", 1000 + step * 1000));
            }

            // Process with both processors
            let mut main_proc = RangeBarProcessor::new(threshold).unwrap();
            let main_bars = main_proc.process_agg_trade_records(&trades).unwrap();

            let mut export_proc = ExportRangeBarProcessor::new(threshold).unwrap();
            export_proc.process_trades_continuously(&trades);
            let export_bars = export_proc.get_all_completed_bars();

            // Bar count must match
            assert_eq!(
                main_bars.len(), export_bars.len(),
                "threshold={threshold}: bar count mismatch: main={} export={}",
                main_bars.len(), export_bars.len()
            );

            // Per-bar field-level comparison
            for (i, (m, e)) in main_bars.iter().zip(export_bars.iter()).enumerate() {
                assert_eq!(m.open, e.open, "t={threshold} bar={i}: open mismatch");
                assert_eq!(m.high, e.high, "t={threshold} bar={i}: high mismatch");
                assert_eq!(m.low, e.low, "t={threshold} bar={i}: low mismatch");
                assert_eq!(m.close, e.close, "t={threshold} bar={i}: close mismatch");
                assert_eq!(m.volume, e.volume, "t={threshold} bar={i}: volume mismatch");
                assert_eq!(m.open_time, e.open_time, "t={threshold} bar={i}: open_time mismatch");
                assert_eq!(m.close_time, e.close_time, "t={threshold} bar={i}: close_time mismatch");
                assert_eq!(m.individual_trade_count, e.individual_trade_count, "t={threshold} bar={i}: trade_count mismatch");
                assert_eq!(
                    m.first_agg_trade_id, e.first_agg_trade_id,
                    "t={threshold} bar={i}: first_agg_trade_id mismatch"
                );
                assert_eq!(
                    m.last_agg_trade_id, e.last_agg_trade_id,
                    "t={threshold} bar={i}: last_agg_trade_id mismatch"
                );
            }
        }
    }
}