Skip to main content

rangebar_core/
export_processor.rs

1//! Export-oriented range bar processor
2//! Extracted from processor.rs (Phase 2d refactoring)
3
4use crate::errors::ProcessingError;
5use crate::fixed_point::FixedPoint;
6use crate::types::{AggTrade, RangeBar};
7
8/// Internal state for range bar construction with fixed-point precision
9#[derive(Debug, Clone)]
10pub(crate) struct InternalRangeBar {
11    open_time: i64,
12    close_time: i64,
13    open: FixedPoint,
14    high: FixedPoint,
15    low: FixedPoint,
16    close: FixedPoint,
17    // Issue #88: i128 volume accumulators to prevent FixedPoint(i64) overflow
18    // on high-token-count symbols like SHIBUSDT
19    volume: i128,
20    turnover: i128,
21    individual_trade_count: i64,
22    agg_record_count: u32,
23    first_trade_id: i64,
24    last_trade_id: i64,
25    /// First aggregate trade ID in this range bar (Issue #72)
26    first_agg_trade_id: i64,
27    /// Last aggregate trade ID in this range bar (Issue #72)
28    last_agg_trade_id: i64,
29    /// Volume from buy-side trades (is_buyer_maker = false)
30    // Issue #88: i128 to prevent overflow
31    buy_volume: i128,
32    /// Volume from sell-side trades (is_buyer_maker = true)
33    // Issue #88: i128 to prevent overflow
34    sell_volume: i128,
35    /// Number of buy-side trades
36    buy_trade_count: i64,
37    /// Number of sell-side trades
38    sell_trade_count: i64,
39    /// Volume Weighted Average Price
40    vwap: FixedPoint,
41    /// Turnover from buy-side trades
42    buy_turnover: i128,
43    /// Turnover from sell-side trades
44    sell_turnover: i128,
45}
46
47/// Export-oriented range bar processor for streaming use cases
48///
49/// This implementation uses the proven fixed-point arithmetic algorithm
50/// that achieves 100% breach consistency compliance in multi-year processing.
51pub struct ExportRangeBarProcessor {
52    threshold_decimal_bps: u32,
53    current_bar: Option<InternalRangeBar>,
54    completed_bars: Vec<RangeBar>,
55    /// Prevent bars from closing on same timestamp as they opened (Issue #36)
56    prevent_same_timestamp_close: bool,
57    /// Deferred bar open flag (Issue #46) - next trade opens new bar after breach
58    defer_open: bool,
59}
60
61impl ExportRangeBarProcessor {
62    /// Create new export processor with given threshold
63    ///
64    /// Uses default behavior: `prevent_same_timestamp_close = true` (Issue #36)
65    ///
66    /// # Arguments
67    ///
68    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
69    ///   - Example: `250` -> 25bps = 0.25%
70    ///   - Example: `10` -> 1bps = 0.01%
71    ///   - Minimum: `1` -> 0.1bps = 0.001%
72    ///
73    /// # Breaking Change (v3.0.0)
74    ///
75    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
76    /// **Migration**: Multiply all threshold values by 10.
77    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
78        Self::with_options(threshold_decimal_bps, true)
79    }
80
81    /// Create new export processor with explicit timestamp gating control
82    pub fn with_options(
83        threshold_decimal_bps: u32,
84        prevent_same_timestamp_close: bool,
85    ) -> Result<Self, ProcessingError> {
86        // Validation bounds (v3.0.0: dbps units)
87        // Min: 1 dbps = 0.001%
88        // Max: 100,000 dbps = 100%
89        if threshold_decimal_bps < 1 {
90            return Err(ProcessingError::InvalidThreshold {
91                threshold_decimal_bps,
92            });
93        }
94        if threshold_decimal_bps > 100_000 {
95            return Err(ProcessingError::InvalidThreshold {
96                threshold_decimal_bps,
97            });
98        }
99
100        Ok(Self {
101            threshold_decimal_bps,
102            current_bar: None,
103            completed_bars: Vec::new(),
104            prevent_same_timestamp_close,
105            defer_open: false,
106        })
107    }
108
109    /// Process trades continuously using proven fixed-point algorithm
110    /// This method maintains 100% breach consistency by using precise integer arithmetic
111    pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
112        for trade in trades {
113            self.process_single_trade_fixed_point(trade);
114        }
115    }
116
117    /// Process single trade using proven fixed-point algorithm (100% breach consistency)
118    fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
119        // Issue #46: If previous trade triggered a breach, this trade opens the new bar.
120        // This matches the batch path's defer_open semantics.
121        if self.defer_open {
122            self.defer_open = false;
123            self.current_bar = None; // Clear any stale state
124            // Fall through to the is_none() branch below to start new bar
125        }
126
127        if self.current_bar.is_none() {
128            // Start new bar
129            let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
130
131            self.current_bar = Some(InternalRangeBar {
132                open_time: trade.timestamp,
133                close_time: trade.timestamp,
134                open: trade.price,
135                high: trade.price,
136                low: trade.price,
137                close: trade.price,
138                // Issue #88: i128 volume accumulators
139                volume: trade.volume.0 as i128,
140                turnover: trade_turnover,
141                individual_trade_count: 1,
142                agg_record_count: 1,
143                first_trade_id: trade.first_trade_id,
144                last_trade_id: trade.last_trade_id,
145                // Issue #72: Track aggregate trade IDs
146                first_agg_trade_id: trade.agg_trade_id,
147                last_agg_trade_id: trade.agg_trade_id,
148                // Market microstructure fields (Issue #88: i128)
149                buy_volume: if trade.is_buyer_maker {
150                    0i128
151                } else {
152                    trade.volume.0 as i128
153                },
154                sell_volume: if trade.is_buyer_maker {
155                    trade.volume.0 as i128
156                } else {
157                    0i128
158                },
159                buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
160                sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
161                vwap: trade.price,
162                buy_turnover: if trade.is_buyer_maker {
163                    0
164                } else {
165                    trade_turnover
166                },
167                sell_turnover: if trade.is_buyer_maker {
168                    trade_turnover
169                } else {
170                    0
171                },
172            });
173            return;
174        }
175
176        // Process existing bar - work with reference
177        // SAFETY: current_bar guaranteed Some - early return above if None
178        let bar = self.current_bar.as_mut().unwrap();
179        let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
180
181        // CRITICAL FIX: Use fixed-point integer arithmetic for precise threshold calculation
182        // v3.0.0: threshold now in dbps, using BASIS_POINTS_SCALE = 100_000
183        let price_val = trade.price.0;
184        let bar_open_val = bar.open.0;
185        let threshold_decimal_bps = self.threshold_decimal_bps as i64;
186        let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
187        let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
188
189        // Update bar with new trade
190        bar.close_time = trade.timestamp;
191        bar.close = trade.price;
192        bar.volume += trade.volume.0 as i128; // Issue #88: i128 accumulator
193        bar.turnover += trade_turnover;
194        bar.individual_trade_count += 1;
195        bar.agg_record_count += 1;
196        bar.last_trade_id = trade.last_trade_id;
197        bar.last_agg_trade_id = trade.agg_trade_id; // Issue #72
198
199        // Update high/low
200        if price_val > bar.high.0 {
201            bar.high = trade.price;
202        }
203        if price_val < bar.low.0 {
204            bar.low = trade.price;
205        }
206
207        // Update market microstructure
208        if trade.is_buyer_maker {
209            bar.sell_volume += trade.volume.0 as i128; // Issue #88: i128 accumulator
210            bar.sell_turnover += trade_turnover;
211            bar.sell_trade_count += 1;
212        } else {
213            bar.buy_volume += trade.volume.0 as i128; // Issue #88: i128 accumulator
214            bar.buy_turnover += trade_turnover;
215            bar.buy_trade_count += 1;
216        }
217
218        // CRITICAL: Fixed-point threshold breach detection (matches proven 100% compliance algorithm)
219        let price_breaches = price_val >= upper_threshold || price_val <= lower_threshold;
220
221        // Timestamp gate (Issue #36): prevent bars from closing on same timestamp
222        let timestamp_allows_close =
223            !self.prevent_same_timestamp_close || trade.timestamp != bar.open_time;
224
225        if price_breaches && timestamp_allows_close {
226            // Close current bar and move to completed
227            // SAFETY: current_bar guaranteed Some - checked at line 688/734
228            let completed_bar = self.current_bar.take().unwrap();
229
230            // Convert to export format — uses ..Default::default() for all
231            // microstructure/inter-bar/intra-bar fields (0/0.0/None)
232            let mut export_bar = RangeBar {
233                open_time: completed_bar.open_time,
234                close_time: completed_bar.close_time,
235                open: completed_bar.open,
236                high: completed_bar.high,
237                low: completed_bar.low,
238                close: completed_bar.close,
239                volume: completed_bar.volume,
240                turnover: completed_bar.turnover,
241                individual_trade_count: completed_bar.individual_trade_count as u32,
242                agg_record_count: completed_bar.agg_record_count,
243                first_trade_id: completed_bar.first_trade_id,
244                last_trade_id: completed_bar.last_trade_id,
245                first_agg_trade_id: completed_bar.first_agg_trade_id, // Issue #72
246                last_agg_trade_id: completed_bar.last_agg_trade_id,
247                buy_volume: completed_bar.buy_volume,
248                sell_volume: completed_bar.sell_volume,
249                buy_trade_count: completed_bar.buy_trade_count as u32,
250                sell_trade_count: completed_bar.sell_trade_count as u32,
251                vwap: completed_bar.vwap,
252                buy_turnover: completed_bar.buy_turnover,
253                sell_turnover: completed_bar.sell_turnover,
254                ..Default::default() // Issue #25/#59: microstructure computed below; inter/intra-bar not used
255            };
256
257            // Compute microstructure features at bar finalization (Issue #25)
258            export_bar.compute_microstructure_features();
259
260            self.completed_bars.push(export_bar);
261
262            // Issue #46: Don't start new bar with breaching trade.
263            // Next trade will open the new bar via defer_open.
264            self.current_bar = None;
265            self.defer_open = true;
266        }
267    }
268
269    /// Get all completed bars accumulated so far
270    /// This drains the internal buffer to avoid memory leaks
271    pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
272        std::mem::take(&mut self.completed_bars)
273    }
274
275    /// Get incomplete bar if exists (for final bar processing)
276    pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
277        self.current_bar.as_ref().map(|incomplete| {
278            let mut bar = RangeBar {
279                open_time: incomplete.open_time,
280                close_time: incomplete.close_time,
281                open: incomplete.open,
282                high: incomplete.high,
283                low: incomplete.low,
284                close: incomplete.close,
285                volume: incomplete.volume,
286                turnover: incomplete.turnover,
287
288                // Enhanced fields
289                individual_trade_count: incomplete.individual_trade_count as u32,
290                agg_record_count: incomplete.agg_record_count,
291                first_trade_id: incomplete.first_trade_id,
292                last_trade_id: incomplete.last_trade_id,
293                first_agg_trade_id: incomplete.first_agg_trade_id,
294                last_agg_trade_id: incomplete.last_agg_trade_id,
295                data_source: crate::types::DataSource::default(),
296
297                // Market microstructure fields
298                buy_volume: incomplete.buy_volume,
299                sell_volume: incomplete.sell_volume,
300                buy_trade_count: incomplete.buy_trade_count as u32,
301                sell_trade_count: incomplete.sell_trade_count as u32,
302                vwap: incomplete.vwap,
303                buy_turnover: incomplete.buy_turnover,
304                sell_turnover: incomplete.sell_turnover,
305
306                // All microstructure, inter-bar, and intra-bar features default to 0/None
307                ..Default::default()
308            };
309            // Compute microstructure features for incomplete bar (Issue #25)
310            bar.compute_microstructure_features();
311            bar
312        })
313    }
314}