rangebar_core/
processor.rs

1//! Core range bar processing algorithm
2//!
3//! Implements non-lookahead bias range bar construction where bars close when
4//! price moves ±threshold bps from the bar's OPEN price.
5
6use crate::checkpoint::{
7    AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
8};
9use crate::fixed_point::FixedPoint;
10use crate::types::{AggTrade, RangeBar};
11#[cfg(feature = "python")]
12use pyo3::prelude::*;
13use thiserror::Error;
14
15/// Range bar processor with non-lookahead bias guarantee
16pub struct RangeBarProcessor {
17    /// Threshold in decimal basis points (250 = 25bps, v3.0.0+)
18    threshold_decimal_bps: u32,
19
20    /// Current bar state for streaming processing (Q19)
21    /// Enables get_incomplete_bar() and stateful process_single_trade()
22    current_bar_state: Option<RangeBarState>,
23
24    /// Price window for checkpoint hash verification
25    price_window: PriceWindow,
26
27    /// Last processed trade ID (for gap detection on resume)
28    last_trade_id: Option<i64>,
29
30    /// Last processed timestamp (for position verification)
31    last_timestamp_us: i64,
32
33    /// Anomaly tracking for debugging
34    anomaly_summary: AnomalySummary,
35
36    /// Flag indicating this processor was created from a checkpoint
37    /// When true, process_agg_trade_records will continue from existing bar state
38    resumed_from_checkpoint: bool,
39}
40
41impl RangeBarProcessor {
42    /// Create new processor with given threshold
43    ///
44    /// # Arguments
45    ///
46    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
47    ///   - Example: `250` → 25bps = 0.25%
48    ///   - Example: `10` → 1bps = 0.01%
49    ///   - Minimum: `1` → 0.1bps = 0.001%
50    ///
51    /// # Breaking Change (v3.0.0)
52    ///
53    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
54    /// **Migration**: Multiply all threshold values by 10.
55    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
56        // Validation bounds (v3.0.0: decimal bps units)
57        // Min: 1 × 0.1bps = 0.1bps = 0.001%
58        // Max: 100,000 × 0.1bps = 10,000bps = 100%
59        if threshold_decimal_bps < 1 {
60            return Err(ProcessingError::InvalidThreshold {
61                threshold_decimal_bps,
62            });
63        }
64        if threshold_decimal_bps > 100_000 {
65            return Err(ProcessingError::InvalidThreshold {
66                threshold_decimal_bps,
67            });
68        }
69
70        Ok(Self {
71            threshold_decimal_bps,
72            current_bar_state: None,
73            price_window: PriceWindow::new(),
74            last_trade_id: None,
75            last_timestamp_us: 0,
76            anomaly_summary: AnomalySummary::default(),
77            resumed_from_checkpoint: false,
78        })
79    }
80
81    /// Process a single trade and return completed bar if any
82    ///
83    /// Maintains internal state for streaming use case. State persists across calls
84    /// until a bar completes (threshold breach), enabling get_incomplete_bar().
85    ///
86    /// # Arguments
87    ///
88    /// * `trade` - Single aggregated trade to process
89    ///
90    /// # Returns
91    ///
92    /// `Some(RangeBar)` if a bar was completed, `None` otherwise
93    ///
94    /// # State Management
95    ///
96    /// - First trade: Initializes new bar state
97    /// - Subsequent trades: Updates existing bar or closes on breach
98    /// - Breach: Returns completed bar, starts new bar with breaching trade
99    pub fn process_single_trade(
100        &mut self,
101        trade: AggTrade,
102    ) -> Result<Option<RangeBar>, ProcessingError> {
103        // Track price and position for checkpoint
104        self.price_window.push(trade.price);
105        self.last_trade_id = Some(trade.agg_trade_id);
106        self.last_timestamp_us = trade.timestamp;
107
108        match &mut self.current_bar_state {
109            None => {
110                // First trade - initialize new bar
111                self.current_bar_state =
112                    Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
113                Ok(None)
114            }
115            Some(bar_state) => {
116                // Check for threshold breach
117                if bar_state.bar.is_breach(
118                    trade.price,
119                    bar_state.upper_threshold,
120                    bar_state.lower_threshold,
121                ) {
122                    // Breach detected - close current bar
123                    bar_state.bar.update_with_trade(&trade);
124
125                    // Validation: Ensure high/low include open/close extremes
126                    debug_assert!(
127                        bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
128                    );
129                    debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
130
131                    let completed_bar = bar_state.bar.clone();
132
133                    // Start new bar with breaching trade
134                    self.current_bar_state =
135                        Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
136
137                    Ok(Some(completed_bar))
138                } else {
139                    // No breach - update existing bar
140                    bar_state.bar.update_with_trade(&trade);
141                    Ok(None)
142                }
143            }
144        }
145    }
146
147    /// Get any incomplete bar currently being processed
148    ///
149    /// Returns clone of current bar state for inspection without consuming it.
150    /// Useful for final bar at stream end or progress monitoring.
151    ///
152    /// # Returns
153    ///
154    /// `Some(RangeBar)` if bar is in progress, `None` if no active bar
155    pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
156        self.current_bar_state
157            .as_ref()
158            .map(|state| state.bar.clone())
159    }
160
161    /// Process AggTrade records into range bars including incomplete bars for analysis
162    ///
163    /// # Arguments
164    ///
165    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
166    ///
167    /// # Returns
168    ///
169    /// Vector of range bars including incomplete bars at end of data
170    ///
171    /// # Warning
172    ///
173    /// This method is for analysis purposes only. Incomplete bars violate the
174    /// fundamental range bar algorithm and should not be used for production trading.
175    pub fn process_agg_trade_records_with_incomplete(
176        &mut self,
177        agg_trade_records: &[AggTrade],
178    ) -> Result<Vec<RangeBar>, ProcessingError> {
179        self.process_agg_trade_records_with_options(agg_trade_records, true)
180    }
181
182    /// Process Binance aggregated trade records into range bars
183    ///
184    /// This is the primary method for converting AggTrade records (which aggregate
185    /// multiple individual trades) into range bars based on price movement thresholds.
186    ///
187    /// # Parameters
188    ///
189    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
190    ///   Each record represents multiple individual trades aggregated at same price
191    ///
192    /// # Returns
193    ///
194    /// Vector of completed range bars (ONLY bars that breached thresholds).
195    /// Each bar tracks both individual trade count and AggTrade record count.
196    pub fn process_agg_trade_records(
197        &mut self,
198        agg_trade_records: &[AggTrade],
199    ) -> Result<Vec<RangeBar>, ProcessingError> {
200        self.process_agg_trade_records_with_options(agg_trade_records, false)
201    }
202
203    /// Process AggTrade records with options for including incomplete bars
204    ///
205    /// Batch processing mode: Clears any existing state before processing.
206    /// Use process_single_trade() for stateful streaming instead.
207    ///
208    /// # Parameters
209    ///
210    /// * `agg_trade_records` - Slice of AggTrade records sorted by (timestamp, agg_trade_id)
211    /// * `include_incomplete` - Whether to include incomplete bars at end of processing
212    ///
213    /// # Returns
214    ///
215    /// Vector of range bars (completed + incomplete if requested)
216    pub fn process_agg_trade_records_with_options(
217        &mut self,
218        agg_trade_records: &[AggTrade],
219        include_incomplete: bool,
220    ) -> Result<Vec<RangeBar>, ProcessingError> {
221        if agg_trade_records.is_empty() {
222            return Ok(Vec::new());
223        }
224
225        // Validate records are sorted
226        self.validate_trade_ordering(agg_trade_records)?;
227
228        // Use existing bar state if resuming from checkpoint, otherwise start fresh
229        // This is CRITICAL for cross-file continuation (Issues #2, #3)
230        let mut current_bar: Option<RangeBarState> = if self.resumed_from_checkpoint {
231            // Continue from checkpoint's incomplete bar
232            self.resumed_from_checkpoint = false; // Consume the flag
233            self.current_bar_state.take()
234        } else {
235            // Start fresh for normal batch processing
236            self.current_bar_state = None;
237            None
238        };
239
240        let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); // Heuristic capacity
241        let mut defer_open = false;
242
243        for agg_record in agg_trade_records {
244            // Track price and position for checkpoint
245            self.price_window.push(agg_record.price);
246            self.last_trade_id = Some(agg_record.agg_trade_id);
247            self.last_timestamp_us = agg_record.timestamp;
248
249            if defer_open {
250                // Previous bar closed, this agg_record opens new bar
251                current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
252                defer_open = false;
253                continue;
254            }
255
256            match current_bar {
257                None => {
258                    // First bar initialization
259                    current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
260                }
261                Some(ref mut bar_state) => {
262                    // Check if this AggTrade record breaches the threshold
263                    if bar_state.bar.is_breach(
264                        agg_record.price,
265                        bar_state.upper_threshold,
266                        bar_state.lower_threshold,
267                    ) {
268                        // Breach detected - update bar with breaching record (includes microstructure)
269                        bar_state.bar.update_with_trade(agg_record);
270
271                        // Validation: Ensure high/low include open/close extremes
272                        debug_assert!(
273                            bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
274                        );
275                        debug_assert!(
276                            bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
277                        );
278
279                        bars.push(bar_state.bar.clone());
280                        current_bar = None;
281                        defer_open = true; // Next record will open new bar
282                    } else {
283                        // No breach: normal update with microstructure calculations
284                        bar_state.bar.update_with_trade(agg_record);
285                    }
286                }
287            }
288        }
289
290        // Save current bar state for checkpoint (preserves incomplete bar)
291        self.current_bar_state = current_bar.clone();
292
293        // Add final partial bar only if explicitly requested
294        // This preserves algorithm integrity: bars should only close on threshold breach
295        if include_incomplete && let Some(bar_state) = current_bar {
296            bars.push(bar_state.bar);
297        }
298
299        Ok(bars)
300    }
301
302    // === CHECKPOINT METHODS ===
303
304    /// Create checkpoint for cross-file continuation
305    ///
306    /// Captures current processing state for seamless continuation:
307    /// - Incomplete bar (if any) with FIXED thresholds
308    /// - Position tracking (timestamp, trade_id if available)
309    /// - Price hash for verification
310    ///
311    /// # Arguments
312    ///
313    /// * `symbol` - Symbol being processed (e.g., "BTCUSDT", "EURUSD")
314    ///
315    /// # Example
316    ///
317    /// ```ignore
318    /// let bars = processor.process_agg_trade_records(&trades)?;
319    /// let checkpoint = processor.create_checkpoint("BTCUSDT");
320    /// let json = serde_json::to_string(&checkpoint)?;
321    /// std::fs::write("checkpoint.json", json)?;
322    /// ```
323    pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
324        let (incomplete_bar, thresholds) = match &self.current_bar_state {
325            Some(state) => (
326                Some(state.bar.clone()),
327                Some((state.upper_threshold, state.lower_threshold)),
328            ),
329            None => (None, None),
330        };
331
332        Checkpoint::new(
333            symbol.to_string(),
334            self.threshold_decimal_bps,
335            incomplete_bar,
336            thresholds,
337            self.last_timestamp_us,
338            self.last_trade_id,
339            self.price_window.compute_hash(),
340        )
341    }
342
343    /// Resume processing from checkpoint
344    ///
345    /// Restores incomplete bar state with IMMUTABLE thresholds.
346    /// Next trade continues building the bar until threshold breach.
347    ///
348    /// # Errors
349    ///
350    /// - `CheckpointError::MissingThresholds` - Checkpoint has bar but no thresholds
351    ///
352    /// # Example
353    ///
354    /// ```ignore
355    /// let json = std::fs::read_to_string("checkpoint.json")?;
356    /// let checkpoint: Checkpoint = serde_json::from_str(&json)?;
357    /// let mut processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
358    /// let bars = processor.process_agg_trade_records(&next_file_trades)?;
359    /// ```
360    pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
361        // Validate checkpoint consistency
362        if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
363            return Err(CheckpointError::MissingThresholds);
364        }
365
366        // Restore bar state if there's an incomplete bar
367        let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
368            (Some(bar), Some((upper, lower))) => Some(RangeBarState {
369                bar,
370                upper_threshold: upper,
371                lower_threshold: lower,
372            }),
373            _ => None,
374        };
375
376        Ok(Self {
377            threshold_decimal_bps: checkpoint.threshold_decimal_bps,
378            current_bar_state,
379            price_window: PriceWindow::new(), // Reset - will be rebuilt from new trades
380            last_trade_id: checkpoint.last_trade_id,
381            last_timestamp_us: checkpoint.last_timestamp_us,
382            anomaly_summary: checkpoint.anomaly_summary,
383            resumed_from_checkpoint: true, // Signal to continue from existing bar state
384        })
385    }
386
387    /// Verify we're at the right position in the data stream
388    ///
389    /// Call with first trade of new file to verify continuity.
390    /// Returns verification result indicating if there's a gap or exact match.
391    ///
392    /// # Arguments
393    ///
394    /// * `first_trade` - First trade of the new file/chunk
395    ///
396    /// # Example
397    ///
398    /// ```ignore
399    /// let processor = RangeBarProcessor::from_checkpoint(checkpoint)?;
400    /// let verification = processor.verify_position(&next_file_trades[0]);
401    /// match verification {
402    ///     PositionVerification::Exact => println!("Perfect continuation!"),
403    ///     PositionVerification::Gap { missing_count, .. } => {
404    ///         println!("Warning: {} trades missing", missing_count);
405    ///     }
406    ///     PositionVerification::TimestampOnly { gap_ms } => {
407    ///         println!("Exness data: {}ms gap", gap_ms);
408    ///     }
409    /// }
410    /// ```
411    pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
412        match self.last_trade_id {
413            Some(last_id) => {
414                // Binance: has trade IDs - check for gaps
415                let expected_id = last_id + 1;
416                if first_trade.agg_trade_id == expected_id {
417                    PositionVerification::Exact
418                } else {
419                    let missing_count = first_trade.agg_trade_id - expected_id;
420                    PositionVerification::Gap {
421                        expected_id,
422                        actual_id: first_trade.agg_trade_id,
423                        missing_count,
424                    }
425                }
426            }
427            None => {
428                // Exness: no trade IDs - use timestamp only
429                let gap_us = first_trade.timestamp - self.last_timestamp_us;
430                let gap_ms = gap_us / 1000;
431                PositionVerification::TimestampOnly { gap_ms }
432            }
433        }
434    }
435
436    /// Get the current anomaly summary
437    pub fn anomaly_summary(&self) -> &AnomalySummary {
438        &self.anomaly_summary
439    }
440
441    /// Get the threshold in decimal basis points
442    pub fn threshold_decimal_bps(&self) -> u32 {
443        self.threshold_decimal_bps
444    }
445
446    /// Validate that trades are properly sorted for deterministic processing
447    fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
448        for i in 1..trades.len() {
449            let prev = &trades[i - 1];
450            let curr = &trades[i];
451
452            // Check ordering: (timestamp, agg_trade_id) ascending
453            if curr.timestamp < prev.timestamp
454                || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
455            {
456                return Err(ProcessingError::UnsortedTrades {
457                    index: i,
458                    prev_time: prev.timestamp,
459                    prev_id: prev.agg_trade_id,
460                    curr_time: curr.timestamp,
461                    curr_id: curr.agg_trade_id,
462                });
463            }
464        }
465
466        Ok(())
467    }
468}
469
470/// Internal state for a range bar being built
471#[derive(Clone)]
472struct RangeBarState {
473    /// The range bar being constructed
474    pub bar: RangeBar,
475
476    /// Upper breach threshold (FIXED from bar open)
477    pub upper_threshold: FixedPoint,
478
479    /// Lower breach threshold (FIXED from bar open)
480    pub lower_threshold: FixedPoint,
481}
482
483impl RangeBarState {
484    /// Create new range bar state from opening trade
485    fn new(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
486        let bar = RangeBar::new(trade);
487
488        // Compute FIXED thresholds from opening price
489        let (upper_threshold, lower_threshold) =
490            bar.open.compute_range_thresholds(threshold_decimal_bps);
491
492        Self {
493            bar,
494            upper_threshold,
495            lower_threshold,
496        }
497    }
498}
499
500/// Processing errors
501#[derive(Error, Debug)]
502pub enum ProcessingError {
503    #[error(
504        "Trades not sorted at index {index}: prev=({prev_time}, {prev_id}), curr=({curr_time}, {curr_id})"
505    )]
506    UnsortedTrades {
507        index: usize,
508        prev_time: i64,
509        prev_id: i64,
510        curr_time: i64,
511        curr_id: i64,
512    },
513
514    #[error("Empty trade data")]
515    EmptyData,
516
517    #[error(
518        "Invalid threshold: {threshold_decimal_bps} (decimal bps). Valid range: 1-100,000 (0.001%-100%)"
519    )]
520    InvalidThreshold { threshold_decimal_bps: u32 },
521}
522
/// Surface processing errors to Python as `ValueError`.
///
/// The message is taken from the error's `Display` implementation (generated
/// by the `#[error(...)]` attributes on [`ProcessingError`]). The Python-facing
/// text is therefore identical to the Rust-facing text and cannot drift from
/// it when a message is edited or a variant is added.
#[cfg(feature = "python")]
impl From<ProcessingError> for PyErr {
    fn from(err: ProcessingError) -> PyErr {
        pyo3::exceptions::PyValueError::new_err(err.to_string())
    }
}
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553    use crate::test_utils::{self, scenarios};
554
555    #[test]
556    fn test_single_bar_no_breach() {
557        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
558
559        // Create trades that stay within 25 bps threshold
560        let trades = scenarios::no_breach_sequence(250);
561
562        // Test strict algorithm compliance: no bars should be created without breach
563        let bars = processor.process_agg_trade_records(&trades).unwrap();
564        assert_eq!(
565            bars.len(),
566            0,
567            "Strict algorithm should not create bars without breach"
568        );
569
570        // Test analysis mode: incomplete bar should be available for analysis
571        let bars_with_incomplete = processor
572            .process_agg_trade_records_with_incomplete(&trades)
573            .unwrap();
574        assert_eq!(
575            bars_with_incomplete.len(),
576            1,
577            "Analysis mode should include incomplete bar"
578        );
579
580        let bar = &bars_with_incomplete[0];
581        assert_eq!(bar.open.to_string(), "50000.00000000");
582        assert_eq!(bar.high.to_string(), "50100.00000000");
583        assert_eq!(bar.low.to_string(), "49900.00000000");
584        assert_eq!(bar.close.to_string(), "49900.00000000");
585    }
586
587    #[test]
588    fn test_exact_breach_upward() {
589        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
590
591        let trades = scenarios::exact_breach_upward(250);
592
593        // Test strict algorithm: only completed bars (with breach)
594        let bars = processor.process_agg_trade_records(&trades).unwrap();
595        assert_eq!(
596            bars.len(),
597            1,
598            "Strict algorithm should only return completed bars"
599        );
600
601        // First bar should close at breach
602        let bar1 = &bars[0];
603        assert_eq!(bar1.open.to_string(), "50000.00000000");
604        // Breach at 25 bps = 0.25% = 50000 * 1.0025 = 50125
605        assert_eq!(bar1.close.to_string(), "50125.00000000"); // Breach tick included
606        assert_eq!(bar1.high.to_string(), "50125.00000000");
607        assert_eq!(bar1.low.to_string(), "50000.00000000");
608
609        // Test analysis mode: includes incomplete second bar
610        let bars_with_incomplete = processor
611            .process_agg_trade_records_with_incomplete(&trades)
612            .unwrap();
613        assert_eq!(
614            bars_with_incomplete.len(),
615            2,
616            "Analysis mode should include incomplete bars"
617        );
618
619        // Second bar should start at next tick price (not breach price)
620        let bar2 = &bars_with_incomplete[1];
621        assert_eq!(bar2.open.to_string(), "50500.00000000"); // Next tick after breach
622        assert_eq!(bar2.close.to_string(), "50500.00000000");
623    }
624
625    #[test]
626    fn test_exact_breach_downward() {
627        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
628
629        let trades = scenarios::exact_breach_downward(250);
630
631        let bars = processor.process_agg_trade_records(&trades).unwrap();
632
633        assert_eq!(bars.len(), 1);
634
635        let bar = &bars[0];
636        assert_eq!(bar.open.to_string(), "50000.00000000");
637        assert_eq!(bar.close.to_string(), "49875.00000000"); // Breach tick included
638        assert_eq!(bar.high.to_string(), "50000.00000000");
639        assert_eq!(bar.low.to_string(), "49875.00000000");
640    }
641
642    #[test]
643    fn test_large_gap_single_bar() {
644        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
645
646        let trades = scenarios::large_gap_sequence();
647
648        let bars = processor.process_agg_trade_records(&trades).unwrap();
649
650        // Should create exactly ONE bar, not multiple bars to "fill the gap"
651        assert_eq!(bars.len(), 1);
652
653        let bar = &bars[0];
654        assert_eq!(bar.open.to_string(), "50000.00000000");
655        assert_eq!(bar.close.to_string(), "51000.00000000");
656        assert_eq!(bar.high.to_string(), "51000.00000000");
657        assert_eq!(bar.low.to_string(), "50000.00000000");
658    }
659
660    #[test]
661    fn test_unsorted_trades_error() {
662        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
663
664        let trades = scenarios::unsorted_sequence();
665
666        let result = processor.process_agg_trade_records(&trades);
667        assert!(result.is_err());
668
669        match result {
670            Err(ProcessingError::UnsortedTrades { index, .. }) => {
671                assert_eq!(index, 1);
672            }
673            _ => panic!("Expected UnsortedTrades error"),
674        }
675    }
676
677    #[test]
678    fn test_threshold_calculation() {
679        let processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps = 0.25%
680
681        let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
682        let bar_state = RangeBarState::new(&trade, processor.threshold_decimal_bps);
683
684        // 50000 * 0.0025 = 125 (25bps = 0.25%)
685        assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
686        assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
687    }
688
689    #[test]
690    fn test_empty_trades() {
691        let mut processor = RangeBarProcessor::new(250).unwrap(); // 250 × 0.1bps = 25bps
692        let trades = scenarios::empty_sequence();
693        let bars = processor.process_agg_trade_records(&trades).unwrap();
694        assert_eq!(bars.len(), 0);
695    }
696
697    #[test]
698    fn test_debug_streaming_data() {
699        let mut processor = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
700
701        // Create trades similar to our test data
702        let trades = vec![
703            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
704            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
705            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
706        ];
707
708        println!("Test data prices: 50014 -> 50163 -> 50032");
709        println!("Expected price movements: +0.3% then -0.26%");
710
711        let bars = processor.process_agg_trade_records(&trades).unwrap();
712        println!("Generated {} range bars", bars.len());
713
714        for (i, bar) in bars.iter().enumerate() {
715            println!(
716                "  Bar {}: O={} H={} L={} C={}",
717                i + 1,
718                bar.open,
719                bar.high,
720                bar.low,
721                bar.close
722            );
723        }
724
725        // With a 0.1% threshold and 0.3% price movement, we should get at least 1 bar
726        assert!(
727            !bars.is_empty(),
728            "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
729        );
730    }
731
732    #[test]
733    fn test_threshold_validation() {
734        // Valid threshold
735        assert!(RangeBarProcessor::new(250).is_ok());
736
737        // Invalid: too low (0 × 0.1bps = 0%)
738        assert!(matches!(
739            RangeBarProcessor::new(0),
740            Err(ProcessingError::InvalidThreshold {
741                threshold_decimal_bps: 0
742            })
743        ));
744
745        // Invalid: too high (150,000 × 0.1bps = 15,000bps = 150%)
746        assert!(matches!(
747            RangeBarProcessor::new(150_000),
748            Err(ProcessingError::InvalidThreshold {
749                threshold_decimal_bps: 150_000
750            })
751        ));
752
753        // Valid boundary: minimum (1 × 0.1bps = 0.1bps = 0.001%)
754        assert!(RangeBarProcessor::new(1).is_ok());
755
756        // Valid boundary: maximum (100,000 × 0.1bps = 10,000bps = 100%)
757        assert!(RangeBarProcessor::new(100_000).is_ok());
758    }
759
760    #[test]
761    fn test_export_processor_with_manual_trades() {
762        println!("Testing ExportRangeBarProcessor with same trade data...");
763
764        let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
765
766        // Use same trades as the working basic test
767        let trades = vec![
768            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
769            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), // ~0.3% increase
770            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
771        ];
772
773        println!(
774            "Processing {} trades with ExportRangeBarProcessor...",
775            trades.len()
776        );
777
778        export_processor.process_trades_continuously(&trades);
779        let bars = export_processor.get_all_completed_bars();
780
781        println!(
782            "ExportRangeBarProcessor generated {} range bars",
783            bars.len()
784        );
785        for (i, bar) in bars.iter().enumerate() {
786            println!(
787                "  Bar {}: O={} H={} L={} C={}",
788                i + 1,
789                bar.open,
790                bar.high,
791                bar.low,
792                bar.close
793            );
794        }
795
796        // Should match the basic processor results (1 bar)
797        assert!(
798            !bars.is_empty(),
799            "ExportRangeBarProcessor should generate same results as basic processor"
800        );
801    }
802
803    // === CHECKPOINT TESTS (Issues #2 and #3) ===
804
805    #[test]
806    fn test_checkpoint_creation() {
807        let mut processor = RangeBarProcessor::new(250).unwrap();
808
809        // Process some trades that don't complete a bar
810        let trades = scenarios::no_breach_sequence(250);
811        let _bars = processor.process_agg_trade_records(&trades).unwrap();
812
813        // Create checkpoint
814        let checkpoint = processor.create_checkpoint("BTCUSDT");
815
816        assert_eq!(checkpoint.symbol, "BTCUSDT");
817        assert_eq!(checkpoint.threshold_decimal_bps, 250);
818        assert!(checkpoint.has_incomplete_bar()); // Should have incomplete bar
819        assert!(checkpoint.thresholds.is_some()); // Thresholds should be saved
820        assert!(checkpoint.last_trade_id.is_some()); // Should track last trade
821    }
822
823    #[test]
824    fn test_checkpoint_serialization_roundtrip() {
825        let mut processor = RangeBarProcessor::new(250).unwrap();
826
827        // Process trades
828        let trades = scenarios::no_breach_sequence(250);
829        let _bars = processor.process_agg_trade_records(&trades).unwrap();
830
831        // Create checkpoint
832        let checkpoint = processor.create_checkpoint("BTCUSDT");
833
834        // Serialize to JSON
835        let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
836
837        // Deserialize back
838        let restored: Checkpoint =
839            serde_json::from_str(&json).expect("Deserialization should succeed");
840
841        assert_eq!(restored.symbol, checkpoint.symbol);
842        assert_eq!(
843            restored.threshold_decimal_bps,
844            checkpoint.threshold_decimal_bps
845        );
846        assert_eq!(
847            restored.incomplete_bar.is_some(),
848            checkpoint.incomplete_bar.is_some()
849        );
850    }
851
852    #[test]
853    fn test_cross_file_bar_continuation() {
854        // This is the PRIMARY test for Issues #2 and #3
855        // Verifies that incomplete bars continue correctly across file boundaries
856
857        // Create trades that span multiple bars
858        let mut all_trades = Vec::new();
859
860        // Generate enough trades to produce multiple bars
861        // Using 100bps threshold (1%) for clearer price movements
862        let base_timestamp = 1640995200000000i64; // Microseconds
863
864        // Create a sequence where we'll have ~3-4 completed bars with remainder
865        for i in 0..20 {
866            let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
867            let trade = test_utils::create_test_agg_trade(
868                i + 1,
869                &format!("{:.8}", price),
870                "1.0",
871                base_timestamp + (i as i64 * 1000000),
872            );
873            all_trades.push(trade);
874        }
875
876        // === FULL PROCESSING (baseline) ===
877        let mut processor_full = RangeBarProcessor::new(100).unwrap(); // 100 × 0.1bps = 10bps = 0.1%
878        let bars_full = processor_full
879            .process_agg_trade_records(&all_trades)
880            .unwrap();
881
882        // === SPLIT PROCESSING WITH CHECKPOINT ===
883        let split_point = 10; // Split in the middle
884
885        // Part 1: Process first half
886        let mut processor_1 = RangeBarProcessor::new(100).unwrap();
887        let part1_trades = &all_trades[0..split_point];
888        let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
889
890        // Create checkpoint
891        let checkpoint = processor_1.create_checkpoint("TEST");
892
893        // Part 2: Resume from checkpoint and process second half
894        let mut processor_2 = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
895        let part2_trades = &all_trades[split_point..];
896        let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
897
898        // === VERIFY CONTINUATION ===
899        // Total completed bars should match full processing
900        let split_total = bars_1.len() + bars_2.len();
901
902        println!("Full processing: {} bars", bars_full.len());
903        println!(
904            "Split processing: {} + {} = {} bars",
905            bars_1.len(),
906            bars_2.len(),
907            split_total
908        );
909
910        assert_eq!(
911            split_total,
912            bars_full.len(),
913            "Split processing should produce same bar count as full processing"
914        );
915
916        // Verify the bars themselves match
917        let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
918        for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
919            assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
920            assert_eq!(
921                full.close.0, split.close.0,
922                "Bar {} close price mismatch",
923                i
924            );
925        }
926    }
927
928    #[test]
929    fn test_verify_position_exact() {
930        let mut processor = RangeBarProcessor::new(250).unwrap();
931
932        // Process some trades
933        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
934        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
935
936        let _ = processor.process_single_trade(trade1);
937        let _ = processor.process_single_trade(trade2);
938
939        // Create next trade in sequence
940        let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
941
942        // Verify position
943        let verification = processor.verify_position(&next_trade);
944
945        assert_eq!(verification, PositionVerification::Exact);
946    }
947
948    #[test]
949    fn test_verify_position_gap() {
950        let mut processor = RangeBarProcessor::new(250).unwrap();
951
952        // Process some trades
953        let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
954        let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
955
956        let _ = processor.process_single_trade(trade1);
957        let _ = processor.process_single_trade(trade2);
958
959        // Create next trade with gap (skip IDs 102-104)
960        let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
961
962        // Verify position
963        let verification = processor.verify_position(&next_trade);
964
965        match verification {
966            PositionVerification::Gap {
967                expected_id,
968                actual_id,
969                missing_count,
970            } => {
971                assert_eq!(expected_id, 102);
972                assert_eq!(actual_id, 105);
973                assert_eq!(missing_count, 3);
974            }
975            _ => panic!("Expected Gap verification, got {:?}", verification),
976        }
977    }
978
979    #[test]
980    fn test_checkpoint_clean_completion() {
981        // Test when last trade completes a bar with no remainder
982        // In range bar algorithm: breach trade closes bar, NEXT trade opens new bar
983        // If there's no next trade, there's no incomplete bar
984        let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
985
986        // Create trades that complete exactly one bar
987        let trades = vec![
988            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
989            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // ~0.2% move, breaches 0.1%
990        ];
991
992        let bars = processor.process_agg_trade_records(&trades).unwrap();
993        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
994
995        // Create checkpoint - should NOT have incomplete bar
996        // (breach trade closes bar, no next trade to open new bar)
997        let checkpoint = processor.create_checkpoint("TEST");
998
999        // With defer_open logic, the next bar isn't started until the next trade
1000        assert!(
1001            !checkpoint.has_incomplete_bar(),
1002            "No incomplete bar when last trade was a breach with no following trade"
1003        );
1004    }
1005
1006    #[test]
1007    fn test_checkpoint_with_remainder() {
1008        // Test when we have trades remaining after a completed bar
1009        let mut processor = RangeBarProcessor::new(100).unwrap(); // 10bps
1010
1011        // Create trades: bar completes at trade 2, trade 3 starts new bar
1012        let trades = vec![
1013            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1014            test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), // Breach
1015            test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000), // Opens new bar
1016        ];
1017
1018        let bars = processor.process_agg_trade_records(&trades).unwrap();
1019        assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1020
1021        // Create checkpoint - should have incomplete bar from trade 3
1022        let checkpoint = processor.create_checkpoint("TEST");
1023
1024        assert!(
1025            checkpoint.has_incomplete_bar(),
1026            "Should have incomplete bar from trade 3"
1027        );
1028
1029        // Verify the incomplete bar has correct data
1030        let incomplete = checkpoint.incomplete_bar.unwrap();
1031        assert_eq!(
1032            incomplete.open.to_string(),
1033            "50110.00000000",
1034            "Incomplete bar should open at trade 3 price"
1035        );
1036    }
1037}
1038
1039/// Internal state for range bar construction with fixed-point precision
1040#[derive(Debug, Clone)]
1041struct InternalRangeBar {
1042    open_time: i64,
1043    close_time: i64,
1044    open: FixedPoint,
1045    high: FixedPoint,
1046    low: FixedPoint,
1047    close: FixedPoint,
1048    volume: FixedPoint,
1049    turnover: i128,
1050    individual_trade_count: i64,
1051    agg_record_count: u32,
1052    first_trade_id: i64,
1053    last_trade_id: i64,
1054    /// Volume from buy-side trades (is_buyer_maker = false)
1055    buy_volume: FixedPoint,
1056    /// Volume from sell-side trades (is_buyer_maker = true)
1057    sell_volume: FixedPoint,
1058    /// Number of buy-side trades
1059    buy_trade_count: i64,
1060    /// Number of sell-side trades
1061    sell_trade_count: i64,
1062    /// Volume Weighted Average Price
1063    vwap: FixedPoint,
1064    /// Turnover from buy-side trades
1065    buy_turnover: i128,
1066    /// Turnover from sell-side trades
1067    sell_turnover: i128,
1068}
1069
1070/// Export-oriented range bar processor for streaming use cases
1071///
1072/// This implementation uses the proven fixed-point arithmetic algorithm
1073/// that achieves 100% breach consistency compliance in multi-year processing.
1074pub struct ExportRangeBarProcessor {
1075    threshold_decimal_bps: u32,
1076    current_bar: Option<InternalRangeBar>,
1077    completed_bars: Vec<RangeBar>,
1078}
1079
1080impl ExportRangeBarProcessor {
1081    /// Create new export processor with given threshold
1082    ///
1083    /// # Arguments
1084    ///
1085    /// * `threshold_decimal_bps` - Threshold in **decimal basis points**
1086    ///   - Example: `250` → 25bps = 0.25%
1087    ///   - Example: `10` → 1bps = 0.01%
1088    ///   - Minimum: `1` → 0.1bps = 0.001%
1089    ///
1090    /// # Breaking Change (v3.0.0)
1091    ///
1092    /// Prior to v3.0.0, `threshold_decimal_bps` was in 1bps units.
1093    /// **Migration**: Multiply all threshold values by 10.
1094    pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
1095        // Validation bounds (v3.0.0: decimal bps units)
1096        // Min: 1 × 0.1bps = 0.1bps = 0.001%
1097        // Max: 100,000 × 0.1bps = 10,000bps = 100%
1098        if threshold_decimal_bps < 1 {
1099            return Err(ProcessingError::InvalidThreshold {
1100                threshold_decimal_bps,
1101            });
1102        }
1103        if threshold_decimal_bps > 100_000 {
1104            return Err(ProcessingError::InvalidThreshold {
1105                threshold_decimal_bps,
1106            });
1107        }
1108
1109        Ok(Self {
1110            threshold_decimal_bps,
1111            current_bar: None,
1112            completed_bars: Vec::new(),
1113        })
1114    }
1115
1116    /// Process trades continuously using proven fixed-point algorithm
1117    /// This method maintains 100% breach consistency by using precise integer arithmetic
1118    pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
1119        for trade in trades {
1120            self.process_single_trade_fixed_point(trade);
1121        }
1122    }
1123
1124    /// Process single trade using proven fixed-point algorithm (100% breach consistency)
1125    fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
1126        if self.current_bar.is_none() {
1127            // Start new bar
1128            let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
1129
1130            self.current_bar = Some(InternalRangeBar {
1131                open_time: trade.timestamp,
1132                close_time: trade.timestamp,
1133                open: trade.price,
1134                high: trade.price,
1135                low: trade.price,
1136                close: trade.price,
1137                volume: trade.volume,
1138                turnover: trade_turnover,
1139                individual_trade_count: 1,
1140                agg_record_count: 1,
1141                first_trade_id: trade.agg_trade_id,
1142                last_trade_id: trade.agg_trade_id,
1143                // Market microstructure fields
1144                buy_volume: if trade.is_buyer_maker {
1145                    FixedPoint(0)
1146                } else {
1147                    trade.volume
1148                },
1149                sell_volume: if trade.is_buyer_maker {
1150                    trade.volume
1151                } else {
1152                    FixedPoint(0)
1153                },
1154                buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
1155                sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
1156                vwap: trade.price,
1157                buy_turnover: if trade.is_buyer_maker {
1158                    0
1159                } else {
1160                    trade_turnover
1161                },
1162                sell_turnover: if trade.is_buyer_maker {
1163                    trade_turnover
1164                } else {
1165                    0
1166                },
1167            });
1168            return;
1169        }
1170
1171        // Process existing bar - work with reference
1172        // SAFETY: current_bar guaranteed Some - early return above if None
1173        let bar = self.current_bar.as_mut().unwrap();
1174        let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
1175
1176        // CRITICAL FIX: Use fixed-point integer arithmetic for precise threshold calculation
1177        // v3.0.0: threshold now in decimal bps, using BASIS_POINTS_SCALE = 100_000
1178        let price_val = trade.price.0;
1179        let bar_open_val = bar.open.0;
1180        let threshold_decimal_bps = self.threshold_decimal_bps as i64;
1181        let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
1182        let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
1183
1184        // Update bar with new trade
1185        bar.close_time = trade.timestamp;
1186        bar.close = trade.price;
1187        bar.volume.0 += trade.volume.0;
1188        bar.turnover += trade_turnover;
1189        bar.individual_trade_count += 1;
1190        bar.agg_record_count += 1;
1191        bar.last_trade_id = trade.agg_trade_id;
1192
1193        // Update high/low
1194        if price_val > bar.high.0 {
1195            bar.high = trade.price;
1196        }
1197        if price_val < bar.low.0 {
1198            bar.low = trade.price;
1199        }
1200
1201        // Update market microstructure
1202        if trade.is_buyer_maker {
1203            bar.sell_volume.0 += trade.volume.0;
1204            bar.sell_turnover += trade_turnover;
1205            bar.sell_trade_count += 1;
1206        } else {
1207            bar.buy_volume.0 += trade.volume.0;
1208            bar.buy_turnover += trade_turnover;
1209            bar.buy_trade_count += 1;
1210        }
1211
1212        // CRITICAL: Fixed-point threshold breach detection (matches proven 100% compliance algorithm)
1213        if price_val >= upper_threshold || price_val <= lower_threshold {
1214            // Close current bar and move to completed
1215            // SAFETY: current_bar guaranteed Some - checked at line 688/734
1216            let completed_bar = self.current_bar.take().unwrap();
1217
1218            // Convert to export format (this is from an old internal structure)
1219            let export_bar = RangeBar {
1220                open_time: completed_bar.open_time,
1221                close_time: completed_bar.close_time,
1222                open: completed_bar.open,
1223                high: completed_bar.high,
1224                low: completed_bar.low,
1225                close: completed_bar.close,
1226                volume: completed_bar.volume,
1227                turnover: completed_bar.turnover,
1228
1229                // Enhanced fields
1230                individual_trade_count: completed_bar.individual_trade_count as u32,
1231                agg_record_count: completed_bar.agg_record_count,
1232                first_trade_id: completed_bar.first_trade_id,
1233                last_trade_id: completed_bar.last_trade_id,
1234                data_source: crate::types::DataSource::default(),
1235
1236                // Market microstructure fields
1237                buy_volume: completed_bar.buy_volume,
1238                sell_volume: completed_bar.sell_volume,
1239                buy_trade_count: completed_bar.buy_trade_count as u32,
1240                sell_trade_count: completed_bar.sell_trade_count as u32,
1241                vwap: completed_bar.vwap,
1242                buy_turnover: completed_bar.buy_turnover,
1243                sell_turnover: completed_bar.sell_turnover,
1244            };
1245
1246            self.completed_bars.push(export_bar);
1247
1248            // Start new bar with breaching trade
1249            let initial_buy_turnover = if trade.is_buyer_maker {
1250                0
1251            } else {
1252                trade_turnover
1253            };
1254            let initial_sell_turnover = if trade.is_buyer_maker {
1255                trade_turnover
1256            } else {
1257                0
1258            };
1259
1260            self.current_bar = Some(InternalRangeBar {
1261                open_time: trade.timestamp,
1262                close_time: trade.timestamp,
1263                open: trade.price,
1264                high: trade.price,
1265                low: trade.price,
1266                close: trade.price,
1267                volume: trade.volume,
1268                turnover: trade_turnover,
1269                individual_trade_count: 1,
1270                agg_record_count: 1,
1271                first_trade_id: trade.agg_trade_id,
1272                last_trade_id: trade.agg_trade_id,
1273                // Market microstructure fields
1274                buy_volume: if trade.is_buyer_maker {
1275                    FixedPoint(0)
1276                } else {
1277                    trade.volume
1278                },
1279                sell_volume: if trade.is_buyer_maker {
1280                    trade.volume
1281                } else {
1282                    FixedPoint(0)
1283                },
1284                buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
1285                sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
1286                vwap: trade.price,
1287                buy_turnover: initial_buy_turnover,
1288                sell_turnover: initial_sell_turnover,
1289            });
1290        }
1291    }
1292
1293    /// Get all completed bars accumulated so far
1294    /// This drains the internal buffer to avoid memory leaks
1295    pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
1296        std::mem::take(&mut self.completed_bars)
1297    }
1298
1299    /// Get incomplete bar if exists (for final bar processing)
1300    pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
1301        self.current_bar.as_ref().map(|incomplete| RangeBar {
1302            open_time: incomplete.open_time,
1303            close_time: incomplete.close_time,
1304            open: incomplete.open,
1305            high: incomplete.high,
1306            low: incomplete.low,
1307            close: incomplete.close,
1308            volume: incomplete.volume,
1309            turnover: incomplete.turnover,
1310
1311            // Enhanced fields
1312            individual_trade_count: incomplete.individual_trade_count as u32,
1313            agg_record_count: incomplete.agg_record_count,
1314            first_trade_id: incomplete.first_trade_id,
1315            last_trade_id: incomplete.last_trade_id,
1316            data_source: crate::types::DataSource::default(),
1317
1318            // Market microstructure fields
1319            buy_volume: incomplete.buy_volume,
1320            sell_volume: incomplete.sell_volume,
1321            buy_trade_count: incomplete.buy_trade_count as u32,
1322            sell_trade_count: incomplete.sell_trade_count as u32,
1323            vwap: incomplete.vwap,
1324            buy_turnover: incomplete.buy_turnover,
1325            sell_turnover: incomplete.sell_turnover,
1326        })
1327    }
1328}