1use crate::checkpoint::{
8 AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; use crate::types::{AggTrade, RangeBar};
14use smallvec::SmallVec; #[cfg(feature = "python")]
16use pyo3::prelude::*;
17pub use crate::errors::ProcessingError;
19pub use crate::export_processor::ExportRangeBarProcessor;
21
22pub struct RangeBarProcessor {
24 threshold_decimal_bps: u32,
26
27 pub threshold_ratio: i64,
34
35 current_bar_state: Option<RangeBarState>,
38
39 price_window: PriceWindow,
41
42 last_trade_id: Option<i64>,
44
45 last_timestamp_us: i64,
47
48 anomaly_summary: AnomalySummary,
50
51 resumed_from_checkpoint: bool,
54
55 prevent_same_timestamp_close: bool,
64
65 defer_open: bool,
75
76 trade_history: Option<TradeHistory>,
82
83 inter_bar_config: Option<InterBarConfig>,
88
89 include_intra_bar_features: bool,
96
97 max_gap_us: i64,
106}
107
108#[cold]
111#[inline(never)]
112fn find_unsorted_trade(trades: &[AggTrade]) -> Result<(), ProcessingError> {
113 for i in 1..trades.len() {
114 let prev = &trades[i - 1];
115 let curr = &trades[i];
116 if curr.timestamp < prev.timestamp
117 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
118 {
119 return Err(ProcessingError::UnsortedTrades {
120 index: i,
121 prev_time: prev.timestamp,
122 prev_id: prev.agg_trade_id,
123 curr_time: curr.timestamp,
124 curr_id: curr.agg_trade_id,
125 });
126 }
127 }
128 Ok(())
129}
130
131#[cold]
134#[inline(never)]
135fn unsorted_trade_error(index: usize, prev: &AggTrade, curr: &AggTrade) -> Result<(), ProcessingError> {
136 Err(ProcessingError::UnsortedTrades {
137 index,
138 prev_time: prev.timestamp,
139 prev_id: prev.agg_trade_id,
140 curr_time: curr.timestamp,
141 curr_id: curr.agg_trade_id,
142 })
143}
144
145impl RangeBarProcessor {
146 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
162 Self::with_options(threshold_decimal_bps, true)
163 }
164
165 pub fn with_options(
184 threshold_decimal_bps: u32,
185 prevent_same_timestamp_close: bool,
186 ) -> Result<Self, ProcessingError> {
187 if threshold_decimal_bps < 1 {
191 return Err(ProcessingError::InvalidThreshold {
192 threshold_decimal_bps,
193 });
194 }
195 if threshold_decimal_bps > 100_000 {
196 return Err(ProcessingError::InvalidThreshold {
197 threshold_decimal_bps,
198 });
199 }
200
201 let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
205 / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
206
207 Ok(Self {
208 threshold_decimal_bps,
209 threshold_ratio,
210 current_bar_state: None,
211 price_window: PriceWindow::new(),
212 last_trade_id: None,
213 last_timestamp_us: 0,
214 anomaly_summary: AnomalySummary::default(),
215 resumed_from_checkpoint: false,
216 prevent_same_timestamp_close,
217 defer_open: false,
218 trade_history: None, inter_bar_config: None, include_intra_bar_features: false, max_gap_us: 3_600_000_000, })
223 }
224
225 pub fn prevent_same_timestamp_close(&self) -> bool {
227 self.prevent_same_timestamp_close
228 }
229
230 pub fn with_inter_bar_config(self, config: InterBarConfig) -> Self {
258 self.with_inter_bar_config_and_cache(config, None)
259 }
260
261 pub fn with_inter_bar_config_and_cache(
287 mut self,
288 config: InterBarConfig,
289 external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
290 ) -> Self {
291 self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
292 self.inter_bar_config = Some(config);
293 self
294 }
295
296 pub fn inter_bar_enabled(&self) -> bool {
298 self.inter_bar_config.is_some()
299 }
300
301 pub fn with_max_gap(mut self, max_gap_us: i64) -> Self {
311 self.max_gap_us = max_gap_us;
312 self
313 }
314
315 pub fn max_gap_us(&self) -> i64 {
317 self.max_gap_us
318 }
319
320 pub fn with_intra_bar_features(mut self) -> Self {
340 self.include_intra_bar_features = true;
341 self
342 }
343
344 pub fn intra_bar_enabled(&self) -> bool {
346 self.include_intra_bar_features
347 }
348
349 pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
355 self.set_inter_bar_config_with_cache(config, None);
356 }
357
358 pub fn set_inter_bar_config_with_cache(
364 &mut self,
365 config: InterBarConfig,
366 external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
367 ) {
368 self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
369 self.inter_bar_config = Some(config);
370 }
371
372 pub fn set_intra_bar_features(&mut self, enabled: bool) {
374 self.include_intra_bar_features = enabled;
375 }
376
377 #[inline]
401 pub fn process_single_trade(
402 &mut self,
403 trade: &AggTrade,
404 ) -> Result<Option<RangeBar>, ProcessingError> {
405 self.price_window.push(trade.price);
407 self.last_trade_id = Some(trade.agg_trade_id);
408 self.last_timestamp_us = trade.timestamp;
409
410 if let Some(ref mut history) = self.trade_history {
413 history.push(trade);
414 }
415
416 if self.defer_open {
420 if let Some(ref mut history) = self.trade_history {
422 history.on_bar_open(trade.timestamp);
423 }
424 self.current_bar_state = Some(if self.include_intra_bar_features {
425 RangeBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
426 } else {
427 RangeBarState::new(trade, self.threshold_ratio)
428 });
429 self.defer_open = false;
430 return Ok(None);
431 }
432
433 match &mut self.current_bar_state {
434 None => {
435 if let Some(ref mut history) = self.trade_history {
438 history.on_bar_open(trade.timestamp);
439 }
440 self.current_bar_state = Some(if self.include_intra_bar_features {
441 RangeBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
442 } else {
443 RangeBarState::new(trade, self.threshold_ratio)
444 });
445 Ok(None)
446 }
447 Some(bar_state) => {
448 bar_state.accumulate_trade(trade, self.include_intra_bar_features);
451
452 let price_breaches = bar_state.bar.is_breach(
454 trade.price,
455 bar_state.upper_threshold,
456 bar_state.lower_threshold,
457 );
458
459 let timestamp_allows_close = !self.prevent_same_timestamp_close
463 || trade.timestamp != bar_state.bar.open_time;
464
465 if price_breaches && timestamp_allows_close {
466 bar_state.bar.update_with_trade(trade);
468
469 debug_assert!(
471 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
472 );
473 debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
474
475 bar_state.bar.compute_microstructure_features();
477
478 if let Some(ref mut history) = self.trade_history {
481 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
482 bar_state.bar.set_inter_bar_features(&inter_bar_features);
483 history.on_bar_close();
485 }
486
487 if self.include_intra_bar_features {
489 let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_scratch(
491 &bar_state.accumulated_trades,
492 &mut bar_state.scratch_prices,
493 &mut bar_state.scratch_volumes,
494 );
495 bar_state.bar.set_intra_bar_features(&intra_bar_features);
496 }
497
498 let completed_bar = self.current_bar_state.take().unwrap().bar;
501
502 self.defer_open = true;
505
506 Ok(Some(completed_bar))
507 } else {
508 bar_state.bar.update_with_trade(trade);
510 Ok(None)
511 }
512 }
513 }
514 }
515
516 pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
525 self.current_bar_state
526 .as_ref()
527 .map(|state| state.bar.clone())
528 }
529
530 pub fn process_agg_trade_records_with_incomplete(
545 &mut self,
546 agg_trade_records: &[AggTrade],
547 ) -> Result<Vec<RangeBar>, ProcessingError> {
548 self.process_agg_trade_records_with_options(agg_trade_records, true)
549 }
550
551 pub fn process_agg_trade_records(
566 &mut self,
567 agg_trade_records: &[AggTrade],
568 ) -> Result<Vec<RangeBar>, ProcessingError> {
569 self.process_agg_trade_records_with_options(agg_trade_records, false)
570 }
571
572 pub fn process_agg_trade_records_with_options(
586 &mut self,
587 agg_trade_records: &[AggTrade],
588 include_incomplete: bool,
589 ) -> Result<Vec<RangeBar>, ProcessingError> {
590 if agg_trade_records.is_empty() {
591 return Ok(Vec::new());
592 }
593
594 self.validate_trade_ordering(agg_trade_records)?;
596
597 let mut current_bar: Option<RangeBarState> = if self.resumed_from_checkpoint {
600 self.resumed_from_checkpoint = false; let restored_bar = self.current_bar_state.take();
603
604 if let Some(ref bar_state) = restored_bar {
608 let first_trade_ts = agg_trade_records[0].timestamp;
609 let gap = first_trade_ts - bar_state.bar.close_time;
610 if gap > self.max_gap_us {
611 self.anomaly_summary.record_gap();
612 None
614 } else {
615 restored_bar
616 }
617 } else {
618 restored_bar
619 }
620 } else {
621 self.current_bar_state = None;
623 None
624 };
625
626 let mut bars = Vec::with_capacity(agg_trade_records.len() / 50); let mut defer_open = false;
628
629 for agg_record in agg_trade_records {
630 self.price_window.push(agg_record.price);
632 self.last_trade_id = Some(agg_record.agg_trade_id);
633 self.last_timestamp_us = agg_record.timestamp;
634
635 if let Some(ref mut history) = self.trade_history {
637 history.push(agg_record);
638 }
639
640 if defer_open {
641 if let Some(ref mut history) = self.trade_history {
644 history.on_bar_open(agg_record.timestamp);
645 }
646 current_bar = Some(if self.include_intra_bar_features {
647 RangeBarState::new_with_trade_accumulation(
648 agg_record,
649 self.threshold_ratio,
650 )
651 } else {
652 RangeBarState::new(agg_record, self.threshold_ratio)
653 });
654 defer_open = false;
655 continue;
656 }
657
658 match current_bar {
659 None => {
660 if let Some(ref mut history) = self.trade_history {
663 history.on_bar_open(agg_record.timestamp);
664 }
665 current_bar = Some(if self.include_intra_bar_features {
666 RangeBarState::new_with_trade_accumulation(
667 agg_record,
668 self.threshold_ratio,
669 )
670 } else {
671 RangeBarState::new(agg_record, self.threshold_ratio)
672 });
673 }
674 Some(ref mut bar_state) => {
675 bar_state.accumulate_trade(agg_record, self.include_intra_bar_features);
678
679 let price_breaches = bar_state.bar.is_breach(
681 agg_record.price,
682 bar_state.upper_threshold,
683 bar_state.lower_threshold,
684 );
685
686 let timestamp_allows_close = !self.prevent_same_timestamp_close
690 || agg_record.timestamp != bar_state.bar.open_time;
691
692 if price_breaches && timestamp_allows_close {
693 bar_state.bar.update_with_trade(agg_record);
695
696 debug_assert!(
698 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
699 );
700 debug_assert!(
701 bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
702 );
703
704 bar_state.bar.compute_microstructure_features();
706
707 if let Some(ref mut history) = self.trade_history {
709 let inter_bar_features =
710 history.compute_features(bar_state.bar.open_time);
711 bar_state.bar.set_inter_bar_features(&inter_bar_features);
712 history.on_bar_close();
714 }
715
716 if self.include_intra_bar_features {
718 let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_scratch(
720 &bar_state.accumulated_trades,
721 &mut bar_state.scratch_prices,
722 &mut bar_state.scratch_volumes,
723 );
724 bar_state.bar.set_intra_bar_features(&intra_bar_features);
725 }
726
727 bars.push(current_bar.take().unwrap().bar);
730 defer_open = true; } else {
732 bar_state.bar.update_with_trade(agg_record);
734 }
735 }
736 }
737 }
738
739 if include_incomplete {
743 if let Some(ref state) = current_bar {
747 let mut checkpoint_state = state.clone();
748 let _ = std::mem::take(&mut checkpoint_state.accumulated_trades);
750 self.current_bar_state = Some(checkpoint_state);
751 }
752
753 if let Some(mut bar_state) = current_bar {
756 bar_state.bar.compute_microstructure_features();
758
759 if let Some(ref history) = self.trade_history {
761 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
762 bar_state.bar.set_inter_bar_features(&inter_bar_features);
763 }
764
765 if self.include_intra_bar_features {
767 let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_scratch(
769 &bar_state.accumulated_trades,
770 &mut bar_state.scratch_prices,
771 &mut bar_state.scratch_volumes,
772 );
773 bar_state.bar.set_intra_bar_features(&intra_bar_features);
774 }
775
776 bars.push(bar_state.bar);
777 }
778 } else {
779 self.current_bar_state = current_bar;
781 }
782
783 Ok(bars)
784 }
785
786 pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
808 let (incomplete_bar, thresholds) = match &self.current_bar_state {
809 Some(state) => (
810 Some(state.bar.clone()),
811 Some((state.upper_threshold, state.lower_threshold)),
812 ),
813 None => (None, None),
814 };
815
816 let mut checkpoint = Checkpoint::new(
817 symbol.to_string(),
818 self.threshold_decimal_bps,
819 incomplete_bar,
820 thresholds,
821 self.last_timestamp_us,
822 self.last_trade_id,
823 self.price_window.compute_hash(),
824 self.prevent_same_timestamp_close,
825 );
826 checkpoint.defer_open = self.defer_open;
828 checkpoint
829 }
830
831 pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
849 let checkpoint = Self::migrate_checkpoint(checkpoint);
851
852 const THRESHOLD_MIN: u32 = 1;
855 const THRESHOLD_MAX: u32 = 100_000;
856 if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
857 || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
858 {
859 return Err(CheckpointError::InvalidThreshold {
860 threshold: checkpoint.threshold_decimal_bps,
861 min_threshold: THRESHOLD_MIN,
862 max_threshold: THRESHOLD_MAX,
863 });
864 }
865
866 if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
868 return Err(CheckpointError::MissingThresholds);
869 }
870
871 let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
875 (Some(bar), Some((upper, lower))) => Some(RangeBarState {
876 bar,
877 upper_threshold: upper,
878 lower_threshold: lower,
879 accumulated_trades: SmallVec::new(), scratch_prices: SmallVec::new(),
881 scratch_volumes: SmallVec::new(),
882 }),
883 _ => None,
884 };
885
886 let threshold_ratio = ((checkpoint.threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
888 / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
889
890 Ok(Self {
891 threshold_decimal_bps: checkpoint.threshold_decimal_bps,
892 threshold_ratio,
893 current_bar_state,
894 price_window: PriceWindow::new(), last_trade_id: checkpoint.last_trade_id,
896 last_timestamp_us: checkpoint.last_timestamp_us,
897 anomaly_summary: checkpoint.anomaly_summary,
898 resumed_from_checkpoint: true, prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
900 defer_open: checkpoint.defer_open, trade_history: None, inter_bar_config: None, include_intra_bar_features: false, max_gap_us: 3_600_000_000, })
906 }
907
908 fn migrate_checkpoint(mut checkpoint: Checkpoint) -> Checkpoint {
912 match checkpoint.version {
913 1 => {
914 checkpoint.version = 2;
917 checkpoint
918 }
919 2 => {
920 checkpoint
922 }
923 _ => {
924 eprintln!(
926 "Warning: Checkpoint has unknown version {}, treating as v2",
927 checkpoint.version
928 );
929 checkpoint.version = 2;
930 checkpoint
931 }
932 }
933 }
934
935 pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
960 match self.last_trade_id {
961 Some(last_id) => {
962 let expected_id = last_id + 1;
964 if first_trade.agg_trade_id == expected_id {
965 PositionVerification::Exact
966 } else {
967 let missing_count = first_trade.agg_trade_id - expected_id;
968 PositionVerification::Gap {
969 expected_id,
970 actual_id: first_trade.agg_trade_id,
971 missing_count,
972 }
973 }
974 }
975 None => {
976 let gap_us = first_trade.timestamp - self.last_timestamp_us;
978 let gap_ms = gap_us / 1000;
979 PositionVerification::TimestampOnly { gap_ms }
980 }
981 }
982 }
983
984 pub fn anomaly_summary(&self) -> &AnomalySummary {
986 &self.anomaly_summary
987 }
988
989 pub fn threshold_decimal_bps(&self) -> u32 {
991 self.threshold_decimal_bps
992 }
993
994 fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
1000 if trades.is_empty() {
1001 return Ok(());
1002 }
1003
1004 let first = &trades[0];
1008 let last = &trades[trades.len() - 1];
1009
1010 if last.timestamp < first.timestamp
1011 || (last.timestamp == first.timestamp && last.agg_trade_id <= first.agg_trade_id)
1012 {
1013 return find_unsorted_trade(trades);
1015 }
1016
1017 for i in 1..trades.len() {
1019 let prev = &trades[i - 1];
1020 let curr = &trades[i];
1021
1022 if curr.timestamp < prev.timestamp
1024 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
1025 {
1026 return unsorted_trade_error(i, prev, curr);
1027 }
1028 }
1029
1030 Ok(())
1031 }
1032
1033 pub fn reset_at_ouroboros(&mut self) -> Option<RangeBar> {
1055 let orphaned = self.current_bar_state.take().map(|state| state.bar);
1056 self.price_window = PriceWindow::new();
1057 self.last_trade_id = None;
1058 self.last_timestamp_us = 0;
1059 self.resumed_from_checkpoint = false;
1060 self.defer_open = false;
1061 if let Some(ref mut history) = self.trade_history {
1064 history.reset_bar_boundaries();
1065 }
1066 orphaned
1067 }
1068}
1069
1070#[derive(Clone)]
1072struct RangeBarState {
1073 pub bar: RangeBar,
1075
1076 pub upper_threshold: FixedPoint,
1078
1079 pub lower_threshold: FixedPoint,
1081
1082 pub accumulated_trades: SmallVec<[AggTrade; 64]>,
1091
1092 pub scratch_prices: SmallVec<[f64; 64]>,
1096
1097 pub scratch_volumes: SmallVec<[f64; 64]>,
1100}
1101
1102impl RangeBarState {
1103 #[inline]
1106 fn new(trade: &AggTrade, threshold_ratio: i64) -> Self {
1107 let bar = RangeBar::new(trade);
1108
1109 let (upper_threshold, lower_threshold) =
1112 bar.open.compute_range_thresholds_cached(threshold_ratio);
1113
1114 Self {
1115 bar,
1116 upper_threshold,
1117 lower_threshold,
1118 accumulated_trades: SmallVec::new(),
1119 scratch_prices: SmallVec::new(),
1120 scratch_volumes: SmallVec::new(),
1121 }
1122 }
1123
1124 #[inline]
1127 fn new_with_trade_accumulation(trade: &AggTrade, threshold_ratio: i64) -> Self {
1128 let bar = RangeBar::new(trade);
1129
1130 let (upper_threshold, lower_threshold) =
1133 bar.open.compute_range_thresholds_cached(threshold_ratio);
1134
1135 Self {
1136 bar,
1137 upper_threshold,
1138 lower_threshold,
1139 accumulated_trades: {
1140 let mut sv = SmallVec::new();
1141 sv.push(trade.clone());
1142 sv
1143 },
1144 scratch_prices: SmallVec::new(),
1145 scratch_volumes: SmallVec::new(),
1146 }
1147 }
1148
1149 #[inline]
1156 fn accumulate_trade(&mut self, trade: &AggTrade, include_intra: bool) {
1157 if include_intra {
1158 self.accumulated_trades.push(trade.clone());
1159 }
1160 }
1161}
1162
1163#[cfg(test)]
1164mod tests {
1165 use super::*;
1166 use crate::test_utils::{self, scenarios};
1167
1168 #[test]
1169 fn test_single_bar_no_breach() {
1170 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::no_breach_sequence(250);
1174
1175 let bars = processor.process_agg_trade_records(&trades).unwrap();
1177 assert_eq!(
1178 bars.len(),
1179 0,
1180 "Strict algorithm should not create bars without breach"
1181 );
1182
1183 let bars_with_incomplete = processor
1185 .process_agg_trade_records_with_incomplete(&trades)
1186 .unwrap();
1187 assert_eq!(
1188 bars_with_incomplete.len(),
1189 1,
1190 "Analysis mode should include incomplete bar"
1191 );
1192
1193 let bar = &bars_with_incomplete[0];
1194 assert_eq!(bar.open.to_string(), "50000.00000000");
1195 assert_eq!(bar.high.to_string(), "50100.00000000");
1196 assert_eq!(bar.low.to_string(), "49900.00000000");
1197 assert_eq!(bar.close.to_string(), "49900.00000000");
1198 }
1199
1200 #[test]
1201 fn test_exact_breach_upward() {
1202 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_upward(250);
1205
1206 let bars = processor.process_agg_trade_records(&trades).unwrap();
1208 assert_eq!(
1209 bars.len(),
1210 1,
1211 "Strict algorithm should only return completed bars"
1212 );
1213
1214 let bar1 = &bars[0];
1216 assert_eq!(bar1.open.to_string(), "50000.00000000");
1217 assert_eq!(bar1.close.to_string(), "50125.00000000"); assert_eq!(bar1.high.to_string(), "50125.00000000");
1220 assert_eq!(bar1.low.to_string(), "50000.00000000");
1221
1222 let bars_with_incomplete = processor
1224 .process_agg_trade_records_with_incomplete(&trades)
1225 .unwrap();
1226 assert_eq!(
1227 bars_with_incomplete.len(),
1228 2,
1229 "Analysis mode should include incomplete bars"
1230 );
1231
1232 let bar2 = &bars_with_incomplete[1];
1234 assert_eq!(bar2.open.to_string(), "50500.00000000"); assert_eq!(bar2.close.to_string(), "50500.00000000");
1236 }
1237
1238 #[test]
1239 fn test_exact_breach_downward() {
1240 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_downward(250);
1243
1244 let bars = processor.process_agg_trade_records(&trades).unwrap();
1245
1246 assert_eq!(bars.len(), 1);
1247
1248 let bar = &bars[0];
1249 assert_eq!(bar.open.to_string(), "50000.00000000");
1250 assert_eq!(bar.close.to_string(), "49875.00000000"); assert_eq!(bar.high.to_string(), "50000.00000000");
1252 assert_eq!(bar.low.to_string(), "49875.00000000");
1253 }
1254
1255 #[test]
1256 fn test_large_gap_single_bar() {
1257 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::large_gap_sequence();
1260
1261 let bars = processor.process_agg_trade_records(&trades).unwrap();
1262
1263 assert_eq!(bars.len(), 1);
1265
1266 let bar = &bars[0];
1267 assert_eq!(bar.open.to_string(), "50000.00000000");
1268 assert_eq!(bar.close.to_string(), "51000.00000000");
1269 assert_eq!(bar.high.to_string(), "51000.00000000");
1270 assert_eq!(bar.low.to_string(), "50000.00000000");
1271 }
1272
1273 #[test]
1274 fn test_unsorted_trades_error() {
1275 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::unsorted_sequence();
1278
1279 let result = processor.process_agg_trade_records(&trades);
1280 assert!(result.is_err());
1281
1282 match result {
1283 Err(ProcessingError::UnsortedTrades { index, .. }) => {
1284 assert_eq!(index, 1);
1285 }
1286 _ => panic!("Expected UnsortedTrades error"),
1287 }
1288 }
1289
1290 #[test]
1291 fn test_threshold_calculation() {
1292 let processor = RangeBarProcessor::new(250).unwrap(); let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1295 let bar_state = RangeBarState::new(&trade, processor.threshold_ratio);
1296
1297 assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
1299 assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
1300 }
1301
1302 #[test]
1303 fn test_empty_trades() {
1304 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::empty_sequence();
1306 let bars = processor.process_agg_trade_records(&trades).unwrap();
1307 assert_eq!(bars.len(), 0);
1308 }
1309
1310 #[test]
1311 fn test_debug_streaming_data() {
1312 let mut processor = RangeBarProcessor::new(100).unwrap(); let trades = vec![
1316 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1317 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1319 ];
1320
1321 println!("Test data prices: 50014 -> 50163 -> 50032");
1322 println!("Expected price movements: +0.3% then -0.26%");
1323
1324 let bars = processor.process_agg_trade_records(&trades).unwrap();
1325 println!("Generated {} range bars", bars.len());
1326
1327 for (i, bar) in bars.iter().enumerate() {
1328 println!(
1329 " Bar {}: O={} H={} L={} C={}",
1330 i + 1,
1331 bar.open,
1332 bar.high,
1333 bar.low,
1334 bar.close
1335 );
1336 }
1337
1338 assert!(
1340 !bars.is_empty(),
1341 "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
1342 );
1343 }
1344
1345 #[test]
1346 fn test_threshold_validation() {
1347 assert!(RangeBarProcessor::new(250).is_ok());
1349
1350 assert!(matches!(
1352 RangeBarProcessor::new(0),
1353 Err(ProcessingError::InvalidThreshold {
1354 threshold_decimal_bps: 0
1355 })
1356 ));
1357
1358 assert!(matches!(
1360 RangeBarProcessor::new(150_000),
1361 Err(ProcessingError::InvalidThreshold {
1362 threshold_decimal_bps: 150_000
1363 })
1364 ));
1365
1366 assert!(RangeBarProcessor::new(1).is_ok());
1368
1369 assert!(RangeBarProcessor::new(100_000).is_ok());
1371 }
1372
1373 #[test]
1374 fn test_export_processor_with_manual_trades() {
1375 println!("Testing ExportRangeBarProcessor with same trade data...");
1376
1377 let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); let trades = vec![
1381 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1382 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1384 ];
1385
1386 println!(
1387 "Processing {} trades with ExportRangeBarProcessor...",
1388 trades.len()
1389 );
1390
1391 export_processor.process_trades_continuously(&trades);
1392 let bars = export_processor.get_all_completed_bars();
1393
1394 println!(
1395 "ExportRangeBarProcessor generated {} range bars",
1396 bars.len()
1397 );
1398 for (i, bar) in bars.iter().enumerate() {
1399 println!(
1400 " Bar {}: O={} H={} L={} C={}",
1401 i + 1,
1402 bar.open,
1403 bar.high,
1404 bar.low,
1405 bar.close
1406 );
1407 }
1408
1409 assert!(
1411 !bars.is_empty(),
1412 "ExportRangeBarProcessor should generate same results as basic processor"
1413 );
1414 }
1415
1416 #[test]
1419 fn test_checkpoint_creation() {
1420 let mut processor = RangeBarProcessor::new(250).unwrap();
1421
1422 let trades = scenarios::no_breach_sequence(250);
1424 let _bars = processor.process_agg_trade_records(&trades).unwrap();
1425
1426 let checkpoint = processor.create_checkpoint("BTCUSDT");
1428
1429 assert_eq!(checkpoint.symbol, "BTCUSDT");
1430 assert_eq!(checkpoint.threshold_decimal_bps, 250);
1431 assert!(checkpoint.has_incomplete_bar()); assert!(checkpoint.thresholds.is_some()); assert!(checkpoint.last_trade_id.is_some()); }
1435
1436 #[test]
1437 fn test_checkpoint_serialization_roundtrip() {
1438 let mut processor = RangeBarProcessor::new(250).unwrap();
1439
1440 let trades = scenarios::no_breach_sequence(250);
1442 let _bars = processor.process_agg_trade_records(&trades).unwrap();
1443
1444 let checkpoint = processor.create_checkpoint("BTCUSDT");
1446
1447 let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
1449
1450 let restored: Checkpoint =
1452 serde_json::from_str(&json).expect("Deserialization should succeed");
1453
1454 assert_eq!(restored.symbol, checkpoint.symbol);
1455 assert_eq!(
1456 restored.threshold_decimal_bps,
1457 checkpoint.threshold_decimal_bps
1458 );
1459 assert_eq!(
1460 restored.incomplete_bar.is_some(),
1461 checkpoint.incomplete_bar.is_some()
1462 );
1463 }
1464
1465 #[test]
1466 fn test_cross_file_bar_continuation() {
1467 let mut all_trades = Vec::new();
1472
1473 let base_timestamp = 1640995200000000i64; for i in 0..20 {
1479 let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
1480 let trade = test_utils::create_test_agg_trade(
1481 i + 1,
1482 &format!("{:.8}", price),
1483 "1.0",
1484 base_timestamp + (i * 1000000),
1485 );
1486 all_trades.push(trade);
1487 }
1488
1489 let mut processor_full = RangeBarProcessor::new(100).unwrap(); let bars_full = processor_full
1492 .process_agg_trade_records(&all_trades)
1493 .unwrap();
1494
1495 let split_point = 10; let mut processor_1 = RangeBarProcessor::new(100).unwrap();
1500 let part1_trades = &all_trades[0..split_point];
1501 let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
1502
1503 let checkpoint = processor_1.create_checkpoint("TEST");
1505
1506 let mut processor_2 = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
1508 let part2_trades = &all_trades[split_point..];
1509 let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
1510
1511 let split_total = bars_1.len() + bars_2.len();
1514
1515 println!("Full processing: {} bars", bars_full.len());
1516 println!(
1517 "Split processing: {} + {} = {} bars",
1518 bars_1.len(),
1519 bars_2.len(),
1520 split_total
1521 );
1522
1523 assert_eq!(
1524 split_total,
1525 bars_full.len(),
1526 "Split processing should produce same bar count as full processing"
1527 );
1528
1529 let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
1531 for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
1532 assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
1533 assert_eq!(
1534 full.close.0, split.close.0,
1535 "Bar {} close price mismatch",
1536 i
1537 );
1538 }
1539 }
1540
1541 #[test]
1542 fn test_verify_position_exact() {
1543 let mut processor = RangeBarProcessor::new(250).unwrap();
1544
1545 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1547 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1548
1549 let _ = processor.process_single_trade(&trade1);
1550 let _ = processor.process_single_trade(&trade2);
1551
1552 let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
1554
1555 let verification = processor.verify_position(&next_trade);
1557
1558 assert_eq!(verification, PositionVerification::Exact);
1559 }
1560
1561 #[test]
1562 fn test_verify_position_gap() {
1563 let mut processor = RangeBarProcessor::new(250).unwrap();
1564
1565 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1567 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1568
1569 let _ = processor.process_single_trade(&trade1);
1570 let _ = processor.process_single_trade(&trade2);
1571
1572 let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
1574
1575 let verification = processor.verify_position(&next_trade);
1577
1578 match verification {
1579 PositionVerification::Gap {
1580 expected_id,
1581 actual_id,
1582 missing_count,
1583 } => {
1584 assert_eq!(expected_id, 102);
1585 assert_eq!(actual_id, 105);
1586 assert_eq!(missing_count, 3);
1587 }
1588 _ => panic!("Expected Gap verification, got {:?}", verification),
1589 }
1590 }
1591
1592 #[test]
1594 fn test_verify_position_timestamp_only() {
1595 let processor = RangeBarProcessor::new(250).unwrap();
1597
1598 let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 5000000);
1599 let verification = processor.verify_position(&trade);
1600
1601 match verification {
1604 PositionVerification::TimestampOnly { gap_ms } => {
1605 assert_eq!(gap_ms, 5000, "gap_ms should be (timestamp - 0) / 1000");
1606 }
1607 _ => panic!("Expected TimestampOnly verification, got {:?}", verification),
1608 }
1609 }
1610
1611 #[test]
1612 fn test_checkpoint_clean_completion() {
1613 let mut processor = RangeBarProcessor::new(100).unwrap(); let trades = vec![
1620 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1621 test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), ];
1623
1624 let bars = processor.process_agg_trade_records(&trades).unwrap();
1625 assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1626
1627 let checkpoint = processor.create_checkpoint("TEST");
1630
1631 assert!(
1633 !checkpoint.has_incomplete_bar(),
1634 "No incomplete bar when last trade was a breach with no following trade"
1635 );
1636 }
1637
1638 #[test]
1639 fn test_checkpoint_with_remainder() {
1640 let mut processor = RangeBarProcessor::new(100).unwrap(); let trades = vec![
1645 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1646 test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000), ];
1649
1650 let bars = processor.process_agg_trade_records(&trades).unwrap();
1651 assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1652
1653 let checkpoint = processor.create_checkpoint("TEST");
1655
1656 assert!(
1657 checkpoint.has_incomplete_bar(),
1658 "Should have incomplete bar from trade 3"
1659 );
1660
1661 let incomplete = checkpoint.incomplete_bar.unwrap();
1663 assert_eq!(
1664 incomplete.open.to_string(),
1665 "50110.00000000",
1666 "Incomplete bar should open at trade 3 price"
1667 );
1668 }
1669
1670 #[test]
1677 fn test_streaming_batch_parity() {
1678 let threshold = 250; let trades = test_utils::AggTradeBuilder::new()
1682 .add_trade(1, 1.0, 0) .add_trade(2, 1.001, 1000) .add_trade(3, 1.003, 2000) .add_trade(4, 1.004, 3000) .add_trade(5, 1.005, 4000) .add_trade(6, 1.008, 5000) .add_trade(7, 1.009, 6000) .build();
1690
1691 let mut batch_processor = RangeBarProcessor::new(threshold).unwrap();
1693 let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
1694 let batch_incomplete = batch_processor.get_incomplete_bar();
1695
1696 let mut stream_processor = RangeBarProcessor::new(threshold).unwrap();
1698 let mut stream_bars: Vec<RangeBar> = Vec::new();
1699 for trade in &trades {
1700 if let Some(bar) = stream_processor
1701 .process_single_trade(trade)
1702 .unwrap()
1703 {
1704 stream_bars.push(bar);
1705 }
1706 }
1707 let stream_incomplete = stream_processor.get_incomplete_bar();
1708
1709 assert_eq!(
1711 batch_bars.len(),
1712 stream_bars.len(),
1713 "Batch and streaming should produce same number of completed bars"
1714 );
1715
1716 for (i, (batch_bar, stream_bar)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1717 assert_eq!(
1718 batch_bar.open, stream_bar.open,
1719 "Bar {i}: open price mismatch"
1720 );
1721 assert_eq!(
1722 batch_bar.close, stream_bar.close,
1723 "Bar {i}: close price mismatch"
1724 );
1725 assert_eq!(
1726 batch_bar.high, stream_bar.high,
1727 "Bar {i}: high price mismatch"
1728 );
1729 assert_eq!(batch_bar.low, stream_bar.low, "Bar {i}: low price mismatch");
1730 assert_eq!(
1731 batch_bar.volume, stream_bar.volume,
1732 "Bar {i}: volume mismatch (double-counting?)"
1733 );
1734 assert_eq!(
1735 batch_bar.open_time, stream_bar.open_time,
1736 "Bar {i}: open_time mismatch"
1737 );
1738 assert_eq!(
1739 batch_bar.close_time, stream_bar.close_time,
1740 "Bar {i}: close_time mismatch"
1741 );
1742 assert_eq!(
1743 batch_bar.individual_trade_count, stream_bar.individual_trade_count,
1744 "Bar {i}: trade count mismatch"
1745 );
1746 }
1747
1748 match (batch_incomplete, stream_incomplete) {
1750 (Some(b), Some(s)) => {
1751 assert_eq!(b.open, s.open, "Incomplete bar: open mismatch");
1752 assert_eq!(b.close, s.close, "Incomplete bar: close mismatch");
1753 assert_eq!(b.volume, s.volume, "Incomplete bar: volume mismatch");
1754 }
1755 (None, None) => {} _ => panic!("Incomplete bar presence mismatch between batch and streaming"),
1757 }
1758 }
1759
1760 mod proptest_batch_streaming_parity {
1766 use super::*;
1767 use proptest::prelude::*;
1768
1769 fn trade_sequence(
1771 n: usize,
1772 base_price: f64,
1773 volatility: f64,
1774 ) -> Vec<AggTrade> {
1775 let mut trades = Vec::with_capacity(n);
1776 let mut price = base_price;
1777 let base_ts = 1640995200000i64; for i in 0..n {
1780 let step = ((i as f64 * 0.3).sin() * volatility)
1782 + ((i as f64 * 0.07).cos() * volatility * 0.5);
1783 price += step;
1784 if price < 100.0 {
1786 price = 100.0 + (i as f64 * 0.01).sin().abs() * 50.0;
1787 }
1788
1789 let trade = test_utils::create_test_agg_trade_with_range(
1790 i as i64 + 1,
1791 &format!("{:.8}", price),
1792 "1.50000000",
1793 base_ts + (i as i64 * 500), (i as i64 + 1) * 10,
1795 (i as i64 + 1) * 10,
1796 i % 3 != 0, );
1798 trades.push(trade);
1799 }
1800 trades
1801 }
1802
1803 fn assert_bar_parity(i: usize, batch: &RangeBar, stream: &RangeBar) {
1805 assert_eq!(batch.open_time, stream.open_time, "Bar {i}: open_time");
1807 assert_eq!(batch.close_time, stream.close_time, "Bar {i}: close_time");
1808 assert_eq!(batch.open, stream.open, "Bar {i}: open");
1809 assert_eq!(batch.high, stream.high, "Bar {i}: high");
1810 assert_eq!(batch.low, stream.low, "Bar {i}: low");
1811 assert_eq!(batch.close, stream.close, "Bar {i}: close");
1812
1813 assert_eq!(batch.volume, stream.volume, "Bar {i}: volume");
1815 assert_eq!(batch.turnover, stream.turnover, "Bar {i}: turnover");
1816 assert_eq!(batch.buy_volume, stream.buy_volume, "Bar {i}: buy_volume");
1817 assert_eq!(batch.sell_volume, stream.sell_volume, "Bar {i}: sell_volume");
1818 assert_eq!(batch.buy_turnover, stream.buy_turnover, "Bar {i}: buy_turnover");
1819 assert_eq!(batch.sell_turnover, stream.sell_turnover, "Bar {i}: sell_turnover");
1820
1821 assert_eq!(batch.individual_trade_count, stream.individual_trade_count, "Bar {i}: trade_count");
1823 assert_eq!(batch.agg_record_count, stream.agg_record_count, "Bar {i}: agg_record_count");
1824 assert_eq!(batch.first_trade_id, stream.first_trade_id, "Bar {i}: first_trade_id");
1825 assert_eq!(batch.last_trade_id, stream.last_trade_id, "Bar {i}: last_trade_id");
1826 assert_eq!(batch.first_agg_trade_id, stream.first_agg_trade_id, "Bar {i}: first_agg_trade_id");
1827 assert_eq!(batch.last_agg_trade_id, stream.last_agg_trade_id, "Bar {i}: last_agg_trade_id");
1828 assert_eq!(batch.buy_trade_count, stream.buy_trade_count, "Bar {i}: buy_trade_count");
1829 assert_eq!(batch.sell_trade_count, stream.sell_trade_count, "Bar {i}: sell_trade_count");
1830
1831 assert_eq!(batch.vwap, stream.vwap, "Bar {i}: vwap");
1833
1834 assert_eq!(batch.duration_us, stream.duration_us, "Bar {i}: duration_us");
1836 assert_eq!(batch.ofi.to_bits(), stream.ofi.to_bits(), "Bar {i}: ofi");
1837 assert_eq!(batch.vwap_close_deviation.to_bits(), stream.vwap_close_deviation.to_bits(), "Bar {i}: vwap_close_dev");
1838 assert_eq!(batch.price_impact.to_bits(), stream.price_impact.to_bits(), "Bar {i}: price_impact");
1839 assert_eq!(batch.kyle_lambda_proxy.to_bits(), stream.kyle_lambda_proxy.to_bits(), "Bar {i}: kyle_lambda");
1840 assert_eq!(batch.trade_intensity.to_bits(), stream.trade_intensity.to_bits(), "Bar {i}: trade_intensity");
1841 assert_eq!(batch.volume_per_trade.to_bits(), stream.volume_per_trade.to_bits(), "Bar {i}: vol_per_trade");
1842 assert_eq!(batch.aggression_ratio.to_bits(), stream.aggression_ratio.to_bits(), "Bar {i}: aggression_ratio");
1843 assert_eq!(batch.aggregation_density_f64.to_bits(), stream.aggregation_density_f64.to_bits(), "Bar {i}: agg_density");
1844 assert_eq!(batch.turnover_imbalance.to_bits(), stream.turnover_imbalance.to_bits(), "Bar {i}: turnover_imbalance");
1845 }
1846
1847 proptest! {
1848 #[test]
1850 fn batch_streaming_parity_random(
1851 n in 200usize..500,
1852 volatility in 10.0f64..200.0,
1853 ) {
1854 let trades = trade_sequence(n, 50000.0, volatility);
1855
1856 let mut batch_proc = RangeBarProcessor::new(250).unwrap();
1858 let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1859 let batch_incomplete = batch_proc.get_incomplete_bar();
1860
1861 let mut stream_proc = RangeBarProcessor::new(250).unwrap();
1863 let mut stream_bars: Vec<RangeBar> = Vec::new();
1864 for trade in &trades {
1865 if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1866 stream_bars.push(bar);
1867 }
1868 }
1869 let stream_incomplete = stream_proc.get_incomplete_bar();
1870
1871 prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1873 "Completed bar count mismatch: batch={}, stream={} for n={}, vol={}",
1874 batch_bars.len(), stream_bars.len(), n, volatility);
1875
1876 for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1878 assert_bar_parity(i, b, s);
1879 }
1880
1881 match (&batch_incomplete, &stream_incomplete) {
1883 (Some(b), Some(s)) => assert_bar_parity(batch_bars.len(), b, s),
1884 (None, None) => {}
1885 _ => prop_assert!(false,
1886 "Incomplete bar presence mismatch: batch={}, stream={}",
1887 batch_incomplete.is_some(), stream_incomplete.is_some()),
1888 }
1889 }
1890
1891 #[test]
1893 fn batch_streaming_parity_thresholds(
1894 threshold in 100u32..1000,
1895 ) {
1896 let trades = trade_sequence(300, 50000.0, 80.0);
1897
1898 let mut batch_proc = RangeBarProcessor::new(threshold).unwrap();
1899 let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1900
1901 let mut stream_proc = RangeBarProcessor::new(threshold).unwrap();
1902 let mut stream_bars: Vec<RangeBar> = Vec::new();
1903 for trade in &trades {
1904 if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1905 stream_bars.push(bar);
1906 }
1907 }
1908
1909 prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1910 "Bar count mismatch at threshold={}", threshold);
1911
1912 for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1913 assert_bar_parity(i, b, s);
1914 }
1915 }
1916 }
1917 }
1918
1919 #[test]
1921 fn test_defer_open_new_bar_opens_with_next_trade() {
1922 let mut processor = RangeBarProcessor::new(250).unwrap();
1923
1924 let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1926 assert!(processor.process_single_trade(&t1).unwrap().is_none());
1927
1928 let t2 = test_utils::create_test_agg_trade(2, "50150.0", "2.0", 2000);
1930 let bar = processor.process_single_trade(&t2).unwrap();
1931 assert!(bar.is_some(), "Should close bar on breach");
1932
1933 let closed_bar = bar.unwrap();
1934 assert_eq!(closed_bar.open.to_string(), "50000.00000000");
1935 assert_eq!(closed_bar.close.to_string(), "50150.00000000");
1936
1937 assert!(
1939 processor.get_incomplete_bar().is_none(),
1940 "No incomplete bar after breach - defer_open is true"
1941 );
1942
1943 let t3 = test_utils::create_test_agg_trade(3, "50100.0", "3.0", 3000);
1945 assert!(processor.process_single_trade(&t3).unwrap().is_none());
1946
1947 let incomplete = processor.get_incomplete_bar().unwrap();
1948 assert_eq!(
1949 incomplete.open.to_string(),
1950 "50100.00000000",
1951 "New bar should open at trade 3's price, not trade 2's"
1952 );
1953 }
1954
1955 #[test]
1958 fn test_bar_close_take_single_trade() {
1959 let mut processor = RangeBarProcessor::new(250).unwrap();
1962 let trades = scenarios::single_breach_sequence(250);
1963
1964 for trade in &trades[..trades.len() - 1] {
1965 let result = processor.process_single_trade(trade).unwrap();
1966 assert!(result.is_none());
1967 }
1968
1969 let bar = processor
1971 .process_single_trade(trades.last().unwrap())
1972 .unwrap()
1973 .expect("Should produce completed bar");
1974
1975 assert_eq!(bar.open.to_string(), "50000.00000000");
1977 assert!(bar.high >= bar.open.max(bar.close));
1978 assert!(bar.low <= bar.open.min(bar.close));
1979 assert!(bar.volume > 0);
1980
1981 assert!(processor.get_incomplete_bar().is_none());
1983 }
1984
1985 #[test]
1986 fn test_bar_close_take_batch() {
1987 let mut processor = RangeBarProcessor::new(250).unwrap();
1990 let trades = scenarios::large_sequence(500);
1991
1992 let bars = processor.process_agg_trade_records(&trades).unwrap();
1993 assert!(
1994 !bars.is_empty(),
1995 "Should produce at least one completed bar"
1996 );
1997
1998 for bar in &bars {
2000 assert!(bar.high >= bar.open.max(bar.close));
2001 assert!(bar.low <= bar.open.min(bar.close));
2002 assert!(bar.volume > 0);
2003 assert!(bar.close_time >= bar.open_time);
2004 }
2005 }
2006
2007 #[test]
2008 fn test_checkpoint_conditional_clone() {
2009 let trades = scenarios::no_breach_sequence(250);
2012
2013 let mut processor1 = RangeBarProcessor::new(250).unwrap();
2015 let bars_without = processor1.process_agg_trade_records(&trades).unwrap();
2016 assert_eq!(bars_without.len(), 0);
2017 assert!(processor1.get_incomplete_bar().is_some());
2019
2020 let mut processor2 = RangeBarProcessor::new(250).unwrap();
2022 let bars_with = processor2
2023 .process_agg_trade_records_with_incomplete(&trades)
2024 .unwrap();
2025 assert_eq!(bars_with.len(), 1);
2026 assert!(processor2.get_incomplete_bar().is_some());
2028
2029 let cp1 = processor1.get_incomplete_bar().unwrap();
2031 let cp2 = processor2.get_incomplete_bar().unwrap();
2032 assert_eq!(cp1.open, cp2.open);
2033 assert_eq!(cp1.close, cp2.close);
2034 assert_eq!(cp1.high, cp2.high);
2035 assert_eq!(cp1.low, cp2.low);
2036 }
2037
2038 #[test]
2039 fn test_checkpoint_v1_to_v2_migration() {
2040 let v1_json = r#"{
2043 "symbol": "BTCUSDT",
2044 "threshold_decimal_bps": 250,
2045 "incomplete_bar": null,
2046 "thresholds": null,
2047 "last_timestamp_us": 1640995200000000,
2048 "last_trade_id": 5000,
2049 "price_hash": 0,
2050 "anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0},
2051 "prevent_same_timestamp_close": true,
2052 "defer_open": false
2053 }"#;
2054
2055 let checkpoint: Checkpoint = serde_json::from_str(v1_json).unwrap();
2057 assert_eq!(checkpoint.version, 1, "Old checkpoints should default to v1");
2058 assert_eq!(checkpoint.symbol, "BTCUSDT");
2059 assert_eq!(checkpoint.threshold_decimal_bps, 250);
2060
2061 let mut processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2063
2064 assert!(!processor.get_incomplete_bar().is_some(), "No incomplete bar before processing");
2066
2067 let trades = scenarios::single_breach_sequence(250);
2069 let bars = processor.process_agg_trade_records(&trades).unwrap();
2070
2071 assert!(!bars.is_empty(), "Should produce bars after v1→v2 migration");
2073 assert!(bars[0].volume > 0, "Bar should have volume after migration");
2075 assert!(bars[0].close_time >= bars[0].open_time, "Bar times should be valid");
2076
2077 let new_checkpoint = processor.create_checkpoint("BTCUSDT");
2079 assert_eq!(new_checkpoint.version, 2, "New checkpoints should be v2");
2080 assert_eq!(new_checkpoint.symbol, "BTCUSDT");
2081
2082 let json = serde_json::to_string(&new_checkpoint).unwrap();
2084 let restored: Checkpoint = serde_json::from_str(&json).unwrap();
2085 assert_eq!(restored.version, 2);
2086 assert_eq!(restored.symbol, "BTCUSDT");
2087 }
2088
2089 #[test]
2094 fn test_from_checkpoint_invalid_threshold_zero() {
2095 let checkpoint = Checkpoint::new(
2096 "BTCUSDT".to_string(), 0, None, None, 0, None, 0, true,
2097 );
2098 match RangeBarProcessor::from_checkpoint(checkpoint) {
2099 Err(CheckpointError::InvalidThreshold { threshold: 0, .. }) => {}
2100 other => panic!("Expected InvalidThreshold(0), got {:?}", other.err()),
2101 }
2102 }
2103
2104 #[test]
2105 fn test_from_checkpoint_invalid_threshold_too_high() {
2106 let checkpoint = Checkpoint::new(
2107 "BTCUSDT".to_string(), 200_000, None, None, 0, None, 0, true,
2108 );
2109 match RangeBarProcessor::from_checkpoint(checkpoint) {
2110 Err(CheckpointError::InvalidThreshold { threshold: 200_000, .. }) => {}
2111 other => panic!("Expected InvalidThreshold(200000), got {:?}", other.err()),
2112 }
2113 }
2114
2115 #[test]
2116 fn test_from_checkpoint_missing_thresholds() {
2117 let bar = RangeBar::new(&test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000));
2118 let mut checkpoint = Checkpoint::new(
2119 "BTCUSDT".to_string(), 250, None, None, 0, None, 0, true,
2120 );
2121 checkpoint.incomplete_bar = Some(bar);
2122 checkpoint.thresholds = None;
2123
2124 match RangeBarProcessor::from_checkpoint(checkpoint) {
2125 Err(CheckpointError::MissingThresholds) => {}
2126 other => panic!("Expected MissingThresholds, got {:?}", other.err()),
2127 }
2128 }
2129
2130 #[test]
2131 fn test_from_checkpoint_unknown_version_treated_as_v2() {
2132 let mut checkpoint = Checkpoint::new(
2133 "BTCUSDT".to_string(), 250, None, None, 0, None, 0, true,
2134 );
2135 checkpoint.version = 99;
2136
2137 let processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2138 assert_eq!(processor.threshold_decimal_bps(), 250);
2139 }
2140
2141 #[test]
2142 fn test_from_checkpoint_valid_with_incomplete_bar() {
2143 use crate::fixed_point::FixedPoint;
2144 let bar = RangeBar::new(&test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000));
2145 let upper = FixedPoint::from_str("50125.0").unwrap();
2146 let lower = FixedPoint::from_str("49875.0").unwrap();
2147
2148 let checkpoint = Checkpoint::new(
2149 "BTCUSDT".to_string(), 250, Some(bar), Some((upper, lower)), 0, None, 0, true,
2150 );
2151
2152 let processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2153 assert!(processor.get_incomplete_bar().is_some(), "Should restore incomplete bar");
2154 }
2155
2156 #[test]
2161 fn test_reset_at_ouroboros_with_orphan() {
2162 let mut processor = RangeBarProcessor::new(250).unwrap();
2163
2164 let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2166 let t2 = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 2000);
2167 assert!(processor.process_single_trade(&t1).unwrap().is_none());
2168 assert!(processor.process_single_trade(&t2).unwrap().is_none());
2169 assert!(processor.get_incomplete_bar().is_some(), "Should have incomplete bar");
2170
2171 let orphan = processor.reset_at_ouroboros();
2173 assert!(orphan.is_some(), "Should return orphaned bar");
2174 let orphan_bar = orphan.unwrap();
2175 assert_eq!(orphan_bar.open.to_string(), "50000.00000000");
2176
2177 assert!(processor.get_incomplete_bar().is_none(), "No bar after reset");
2179 }
2180
2181 #[test]
2182 fn test_reset_at_ouroboros_clean_state() {
2183 let mut processor = RangeBarProcessor::new(250).unwrap();
2184
2185 let orphan = processor.reset_at_ouroboros();
2187 assert!(orphan.is_none(), "No orphan when state is clean");
2188 assert!(processor.get_incomplete_bar().is_none());
2189 }
2190
2191 #[test]
2192 fn test_reset_at_ouroboros_clears_defer_open() {
2193 let mut processor = RangeBarProcessor::new(250).unwrap();
2194
2195 let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2197 let t2 = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000); processor.process_single_trade(&t1).unwrap();
2199 let bar = processor.process_single_trade(&t2).unwrap();
2200 assert!(bar.is_some(), "Should breach");
2201
2202 assert!(processor.get_incomplete_bar().is_none());
2204
2205 processor.reset_at_ouroboros();
2207
2208 let t3 = test_utils::create_test_agg_trade(3, "50000.0", "1.0", 3000);
2210 processor.process_single_trade(&t3).unwrap();
2211 assert!(processor.get_incomplete_bar().is_some(), "Should have new bar after reset");
2212 }
2213
2214 #[test]
2217 fn test_single_trade_no_bar() {
2218 let mut processor = RangeBarProcessor::new(250).unwrap();
2220 let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2221 let bars = processor.process_agg_trade_records(&[trade]).unwrap();
2222 assert_eq!(bars.len(), 0, "Single trade should not produce a completed bar");
2223 assert!(processor.get_incomplete_bar().is_some(), "Should have incomplete bar");
2224 }
2225
2226 #[test]
2227 fn test_identical_timestamps_no_close() {
2228 let mut processor = RangeBarProcessor::new(250).unwrap();
2230 let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2231 let t2 = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1000); let bars = processor.process_agg_trade_records(&[t1, t2]).unwrap();
2233 assert_eq!(bars.len(), 0, "Bar should not close on same timestamp as open (Issue #36)");
2234 }
2235
2236 #[test]
2237 fn test_identical_timestamps_then_different_closes() {
2238 let mut processor = RangeBarProcessor::new(250).unwrap();
2240 let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2241 let t2 = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 1000); let t3 = test_utils::create_test_agg_trade(3, "50200.0", "1.0", 2000); let bars = processor.process_agg_trade_records(&[t1, t2, t3]).unwrap();
2244 assert_eq!(bars.len(), 1, "Should close when breach at different timestamp");
2245 }
2246
2247 #[test]
2248 fn test_streaming_defer_open_semantics() {
2249 let mut processor = RangeBarProcessor::new(250).unwrap();
2251 let t1 = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2252 let t2 = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000); let t3 = test_utils::create_test_agg_trade(3, "51000.0", "1.0", 3000); processor.process_single_trade(&t1).unwrap();
2256 let bar = processor.process_single_trade(&t2).unwrap();
2257 assert!(bar.is_some(), "Trade 2 should cause a breach");
2258
2259 assert!(processor.get_incomplete_bar().is_none());
2261
2262 let bar2 = processor.process_single_trade(&t3).unwrap();
2264 assert!(bar2.is_none(), "Trade 3 should open new bar, not breach");
2265 let incomplete = processor.get_incomplete_bar().unwrap();
2266 assert_eq!(incomplete.open.to_f64(), 51000.0, "New bar should open at t3 price");
2267 }
2268
2269 #[test]
2270 fn test_process_empty_then_trades() {
2271 let mut processor = RangeBarProcessor::new(250).unwrap();
2273 let bars = processor.process_agg_trade_records(&[]).unwrap();
2274 assert_eq!(bars.len(), 0);
2275 assert!(processor.get_incomplete_bar().is_none());
2276
2277 let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
2279 let bars = processor.process_agg_trade_records(&[trade]).unwrap();
2280 assert_eq!(bars.len(), 0);
2281 assert!(processor.get_incomplete_bar().is_some());
2282 }
2283
2284 #[test]
2285 fn test_multiple_breaches_in_batch() {
2286 let mut processor = RangeBarProcessor::new(250).unwrap();
2288 let trades = vec![
2289 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
2290 test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000), test_utils::create_test_agg_trade(3, "50500.0", "1.0", 3000), test_utils::create_test_agg_trade(4, "50700.0", "1.0", 4000), test_utils::create_test_agg_trade(5, "51000.0", "1.0", 5000), ];
2295 let bars = processor.process_agg_trade_records(&trades).unwrap();
2296 assert_eq!(bars.len(), 2, "Should produce 2 completed bars from 2 breaches");
2297 }
2298
2299 #[test]
2300 fn test_streaming_batch_parity_extended() {
2301 let threshold = 100;
2304
2305 let mut trades = Vec::new();
2307 let mut price = 50000.0;
2308 for i in 0..20 {
2309 if i % 3 == 0 && i > 0 {
2311 price *= 1.002; } else if i % 3 == 1 && i > 1 {
2313 price *= 0.998; } else {
2315 price *= 1.0005; }
2317 trades.push(test_utils::create_test_agg_trade(
2318 (i + 1) as i64,
2319 &format!("{:.8}", price),
2320 "1.0",
2321 (i as i64 + 1) * 1000,
2322 ));
2323 }
2324
2325 let mut batch_processor = RangeBarProcessor::new(threshold).unwrap();
2327 let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
2328
2329 let mut stream_processor = RangeBarProcessor::new(threshold).unwrap();
2331 let mut stream_bars: Vec<RangeBar> = Vec::new();
2332 for trade in &trades {
2333 if let Some(bar) = stream_processor.process_single_trade(trade).unwrap() {
2334 stream_bars.push(bar);
2335 }
2336 }
2337
2338 assert!(batch_bars.len() >= 3, "Should produce at least 3 bars from zigzag pattern");
2340 assert_eq!(
2341 batch_bars.len(), stream_bars.len(),
2342 "Batch ({}) and streaming ({}) bar count mismatch",
2343 batch_bars.len(), stream_bars.len()
2344 );
2345
2346 for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
2347 assert_eq!(b.open, s.open, "Bar {i}: open mismatch");
2348 assert_eq!(b.close, s.close, "Bar {i}: close mismatch");
2349 assert_eq!(b.high, s.high, "Bar {i}: high mismatch");
2350 assert_eq!(b.low, s.low, "Bar {i}: low mismatch");
2351 assert_eq!(b.volume, s.volume, "Bar {i}: volume mismatch");
2352 assert_eq!(b.open_time, s.open_time, "Bar {i}: open_time mismatch");
2353 assert_eq!(b.close_time, s.close_time, "Bar {i}: close_time mismatch");
2354 assert_eq!(b.individual_trade_count, s.individual_trade_count, "Bar {i}: trade_count mismatch");
2355 }
2356
2357 let batch_inc = batch_processor.get_incomplete_bar();
2359 let stream_inc = stream_processor.get_incomplete_bar();
2360 match (&batch_inc, &stream_inc) {
2361 (Some(b), Some(s)) => {
2362 assert_eq!(b.open, s.open, "Incomplete: open mismatch");
2363 assert_eq!(b.volume, s.volume, "Incomplete: volume mismatch");
2364 }
2365 (None, None) => {}
2366 _ => panic!("Incomplete bar presence mismatch"),
2367 }
2368 }
2369
2370 #[test]
2371 fn test_multi_batch_sequential_state_continuity() {
2372 let mut processor = RangeBarProcessor::new(100).unwrap(); let mut all_bars = Vec::new();
2376
2377 let batch1 = vec![
2379 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
2380 test_utils::create_test_agg_trade(2, "50020.0", "1.0", 2000),
2381 test_utils::create_test_agg_trade(3, "50060.0", "1.0", 3000), ];
2383 let bars1 = processor.process_agg_trade_records(&batch1).unwrap();
2384 all_bars.extend(bars1);
2385
2386 let batch2 = vec![
2388 test_utils::create_test_agg_trade(4, "50100.0", "1.0", 4000), test_utils::create_test_agg_trade(5, "50120.0", "1.0", 5000),
2390 test_utils::create_test_agg_trade(6, "50170.0", "1.0", 6000), ];
2392 let bars2 = processor.process_agg_trade_records(&batch2).unwrap();
2393 all_bars.extend(bars2);
2394
2395 let batch3 = vec![
2397 test_utils::create_test_agg_trade(7, "50200.0", "1.0", 7000), test_utils::create_test_agg_trade(8, "50220.0", "1.0", 8000),
2399 test_utils::create_test_agg_trade(9, "50280.0", "1.0", 9000), ];
2401 let bars3 = processor.process_agg_trade_records(&batch3).unwrap();
2402 all_bars.extend(bars3);
2403
2404 assert!(
2406 all_bars.len() >= 3,
2407 "Expected at least 3 bars from 3 batches, got {}",
2408 all_bars.len()
2409 );
2410
2411 for i in 1..all_bars.len() {
2413 assert!(
2414 all_bars[i].close_time >= all_bars[i - 1].close_time,
2415 "Bar {i}: close_time {} < previous {}",
2416 all_bars[i].close_time,
2417 all_bars[i - 1].close_time
2418 );
2419 }
2420
2421 for i in 1..all_bars.len() {
2423 assert_eq!(
2424 all_bars[i].first_agg_trade_id,
2425 all_bars[i - 1].last_agg_trade_id + 1,
2426 "Bar {i}: trade ID gap (first={}, prev last={})",
2427 all_bars[i].first_agg_trade_id,
2428 all_bars[i - 1].last_agg_trade_id
2429 );
2430 }
2431 }
2432
2433 #[test]
2436 fn test_same_timestamp_prevents_bar_close() {
2437 let mut processor = RangeBarProcessor::new(250).unwrap();
2439 processor.prevent_same_timestamp_close = true;
2440
2441 let trades: Vec<AggTrade> = (0..5)
2443 .map(|i| {
2444 let price_str = if i == 0 {
2445 "50000.0".to_string()
2446 } else {
2447 format!("{}.0", 50000 + (i + 1) * 200)
2449 };
2450 AggTrade {
2451 agg_trade_id: i as i64,
2452 price: FixedPoint::from_str(&price_str).unwrap(),
2453 volume: FixedPoint::from_str("1.0").unwrap(),
2454 first_trade_id: i as i64,
2455 last_trade_id: i as i64,
2456 timestamp: 1000000, is_buyer_maker: false,
2458 is_best_match: None,
2459 }
2460 })
2461 .collect();
2462
2463 let bars = processor.process_agg_trade_records(&trades).unwrap();
2464 assert_eq!(bars.len(), 0, "Same timestamp should prevent bar close (Issue #36)");
2466 }
2467
2468 #[test]
2469 fn test_single_trade_incomplete_bar() {
2470 let mut processor = RangeBarProcessor::new(250).unwrap();
2471
2472 let trade = AggTrade {
2473 agg_trade_id: 1,
2474 price: FixedPoint::from_str("50000.0").unwrap(),
2475 volume: FixedPoint::from_str("10.0").unwrap(),
2476 first_trade_id: 1,
2477 last_trade_id: 1,
2478 timestamp: 1000000,
2479 is_buyer_maker: false,
2480 is_best_match: None,
2481 };
2482
2483 let bars = processor.process_agg_trade_records(&[trade.clone()]).unwrap();
2485 assert_eq!(bars.len(), 0, "Single trade cannot complete a bar");
2486
2487 let mut processor2 = RangeBarProcessor::new(250).unwrap();
2489 let bars_incl = processor2
2490 .process_agg_trade_records_with_incomplete(&[trade])
2491 .unwrap();
2492 assert_eq!(bars_incl.len(), 1, "Should return 1 incomplete bar");
2493 assert_eq!(bars_incl[0].open, bars_incl[0].close);
2494 assert_eq!(bars_incl[0].high, bars_incl[0].low);
2495 }
2496
2497 #[test]
2500 fn test_with_options_gate_disabled_same_timestamp_closes() {
2501 let mut processor = RangeBarProcessor::with_options(250, false).unwrap();
2504 assert!(!processor.prevent_same_timestamp_close());
2505
2506 let trades = vec![
2507 AggTrade {
2508 agg_trade_id: 1, price: FixedPoint::from_str("50000.0").unwrap(),
2509 volume: FixedPoint::from_str("1.0").unwrap(),
2510 first_trade_id: 1, last_trade_id: 1, timestamp: 1000000,
2511 is_buyer_maker: false, is_best_match: None,
2512 },
2513 AggTrade {
2514 agg_trade_id: 2, price: FixedPoint::from_str("50200.0").unwrap(), volume: FixedPoint::from_str("1.0").unwrap(),
2516 first_trade_id: 2, last_trade_id: 2, timestamp: 1000000, is_buyer_maker: false, is_best_match: None,
2518 },
2519 ];
2520 let bars = processor.process_agg_trade_records(&trades).unwrap();
2521 assert_eq!(bars.len(), 1, "Gate disabled: same-timestamp breach should close bar");
2522 }
2523
2524 #[test]
2525 fn test_inter_bar_config_enables_features() {
2526 use crate::interbar::LookbackMode;
2527 let processor = RangeBarProcessor::new(250).unwrap();
2528 assert!(!processor.inter_bar_enabled(), "Default: inter-bar disabled");
2529
2530 let processor = processor.with_inter_bar_config(InterBarConfig {
2531 lookback_mode: LookbackMode::FixedCount(100),
2532 compute_tier2: false,
2533 compute_tier3: false,
2534 });
2535 assert!(processor.inter_bar_enabled(), "After config: inter-bar enabled");
2536 }
2537
2538 #[test]
2539 fn test_intra_bar_feature_toggle() {
2540 let processor = RangeBarProcessor::new(250).unwrap();
2541 assert!(!processor.intra_bar_enabled(), "Default: intra-bar disabled");
2542
2543 let processor = processor.with_intra_bar_features();
2544 assert!(processor.intra_bar_enabled(), "After toggle: intra-bar enabled");
2545 }
2546
2547 #[test]
2548 fn test_set_inter_bar_config_after_construction() {
2549 use crate::interbar::LookbackMode;
2550 let mut processor = RangeBarProcessor::new(500).unwrap();
2551 assert!(!processor.inter_bar_enabled());
2552
2553 processor.set_inter_bar_config(InterBarConfig {
2554 lookback_mode: LookbackMode::FixedCount(200),
2555 compute_tier2: true,
2556 compute_tier3: false,
2557 });
2558 assert!(processor.inter_bar_enabled(), "set_inter_bar_config should enable");
2559 }
2560
2561 #[test]
2562 fn test_process_with_options_incomplete_false_vs_true() {
2563 let trades = scenarios::single_breach_sequence(250);
2564
2565 let mut p1 = RangeBarProcessor::new(250).unwrap();
2567 let bars_strict = p1.process_agg_trade_records_with_options(&trades, false).unwrap();
2568
2569 let mut p2 = RangeBarProcessor::new(250).unwrap();
2571 let bars_incl = p2.process_agg_trade_records_with_options(&trades, true).unwrap();
2572
2573 assert!(
2574 bars_incl.len() >= bars_strict.len(),
2575 "inclusive ({}) must be >= strict ({})", bars_incl.len(), bars_strict.len()
2576 );
2577 }
2578
2579 #[test]
2582 fn test_anomaly_summary_default_no_anomalies() {
2583 let processor = RangeBarProcessor::new(250).unwrap();
2584 let summary = processor.anomaly_summary();
2585 assert_eq!(summary.gaps_detected, 0);
2586 assert_eq!(summary.overlaps_detected, 0);
2587 assert_eq!(summary.timestamp_anomalies, 0);
2588 assert!(!summary.has_anomalies());
2589 assert_eq!(summary.total(), 0);
2590 }
2591
2592 #[test]
2593 fn test_anomaly_summary_preserved_through_checkpoint() {
2594 let mut processor = RangeBarProcessor::new(250).unwrap();
2596 let trades = scenarios::single_breach_sequence(250);
2597 processor.process_agg_trade_records(&trades).unwrap();
2598
2599 let checkpoint = processor.create_checkpoint("TEST");
2600 let restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2601 let summary = restored.anomaly_summary();
2602 assert_eq!(summary.total(), 0);
2604 }
2605
2606 #[test]
2607 fn test_anomaly_summary_from_checkpoint_with_anomalies() {
2608 let json = r#"{
2610 "version": 3,
2611 "symbol": "TESTUSDT",
2612 "threshold_decimal_bps": 250,
2613 "prevent_same_timestamp_close": true,
2614 "defer_open": false,
2615 "current_bar": null,
2616 "thresholds": null,
2617 "last_timestamp_us": 1000000,
2618 "last_trade_id": 5,
2619 "price_hash": 0,
2620 "anomaly_summary": {"gaps_detected": 3, "overlaps_detected": 1, "timestamp_anomalies": 2}
2621 }"#;
2622 let checkpoint: crate::checkpoint::Checkpoint = serde_json::from_str(json).unwrap();
2623 let processor = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2624 let summary = processor.anomaly_summary();
2625 assert_eq!(summary.gaps_detected, 3);
2626 assert_eq!(summary.overlaps_detected, 1);
2627 assert_eq!(summary.timestamp_anomalies, 2);
2628 assert!(summary.has_anomalies());
2629 assert_eq!(summary.total(), 6);
2630 }
2631
2632 #[test]
2633 fn test_with_inter_bar_config_and_cache_shared() {
2634 use crate::entropy_cache_global::get_global_entropy_cache;
2635 use crate::interbar::LookbackMode;
2636
2637 let global_cache = get_global_entropy_cache();
2638 let config = InterBarConfig {
2639 lookback_mode: LookbackMode::FixedCount(100),
2640 compute_tier2: true,
2641 compute_tier3: true,
2642 };
2643
2644 let p1 = RangeBarProcessor::new(250).unwrap()
2646 .with_inter_bar_config_and_cache(config.clone(), Some(global_cache.clone()));
2647 let p2 = RangeBarProcessor::new(500).unwrap()
2648 .with_inter_bar_config_and_cache(config, Some(global_cache));
2649
2650 assert!(p1.inter_bar_enabled());
2651 assert!(p2.inter_bar_enabled());
2652 }
2653
2654 #[test]
2655 fn test_set_inter_bar_config_with_cache_after_checkpoint() {
2656 use crate::entropy_cache_global::get_global_entropy_cache;
2657 use crate::interbar::LookbackMode;
2658
2659 let mut processor = RangeBarProcessor::new(250).unwrap();
2660 let trades = scenarios::single_breach_sequence(250);
2661 processor.process_agg_trade_records(&trades).unwrap();
2662
2663 let checkpoint = processor.create_checkpoint("TEST");
2665 let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2666 assert!(!restored.inter_bar_enabled(), "Checkpoint does not preserve inter-bar config");
2667
2668 let global_cache = get_global_entropy_cache();
2670 restored.set_inter_bar_config_with_cache(
2671 InterBarConfig {
2672 lookback_mode: LookbackMode::FixedCount(100),
2673 compute_tier2: false,
2674 compute_tier3: false,
2675 },
2676 Some(global_cache),
2677 );
2678 assert!(restored.inter_bar_enabled(), "set_inter_bar_config_with_cache should re-enable");
2679 }
2680
2681 #[test]
2682 fn test_threshold_decimal_bps_getter() {
2683 let p250 = RangeBarProcessor::new(250).unwrap();
2684 assert_eq!(p250.threshold_decimal_bps(), 250);
2685
2686 let p1000 = RangeBarProcessor::new(1000).unwrap();
2687 assert_eq!(p1000.threshold_decimal_bps(), 1000);
2688 }
2689
2690 #[test]
2695 fn test_checkpoint_gap_discards_forming_bar() {
2696 let mut processor = RangeBarProcessor::new(250).unwrap();
2698
2699 let trades = vec![
2701 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000), test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000), test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640995202_000_000), ];
2705
2706 let bars = processor.process_agg_trade_records(&trades).unwrap();
2707 assert_eq!(bars.len(), 0, "No breach = no completed bars");
2708
2709 let checkpoint = processor.create_checkpoint("BTCUSDT");
2711 assert!(checkpoint.has_incomplete_bar(), "Should have forming bar");
2712
2713 let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2715
2716 let gap_trades = vec![
2718 test_utils::create_test_agg_trade(4, "50030.0", "1.0", 1641002402_000_000), test_utils::create_test_agg_trade(5, "50040.0", "1.0", 1641002403_000_000),
2720 ];
2721
2722 let bars = restored.process_agg_trade_records(&gap_trades).unwrap();
2723 assert_eq!(bars.len(), 0, "No bars should complete — forming bar was discarded");
2725 assert_eq!(
2726 restored.anomaly_summary().gaps_detected, 1,
2727 "Gap should be recorded in anomaly summary"
2728 );
2729 }
2730
2731 #[test]
2732 fn test_checkpoint_small_gap_continues_bar() {
2733 let mut processor = RangeBarProcessor::new(250).unwrap();
2735
2736 let trades = vec![
2737 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2738 test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000),
2739 ];
2740
2741 let _ = processor.process_agg_trade_records(&trades).unwrap();
2742 let checkpoint = processor.create_checkpoint("BTCUSDT");
2743 let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2744
2745 let small_gap_trades = vec![
2747 test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640997001_000_000), test_utils::create_test_agg_trade(4, "50125.01", "1.0", 1640997002_000_000), ];
2750
2751 let bars = restored.process_agg_trade_records(&small_gap_trades).unwrap();
2752 assert_eq!(bars.len(), 1, "Bar should complete normally with small gap");
2753 assert_eq!(bars[0].open_time, 1640995200_000_000);
2755 assert_eq!(
2756 restored.anomaly_summary().gaps_detected, 0,
2757 "No gap anomaly for small gap"
2758 );
2759 }
2760
2761 #[test]
2762 fn test_checkpoint_gap_custom_max_gap() {
2763 let mut processor = RangeBarProcessor::new(250).unwrap();
2765
2766 let trades = vec![
2767 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2768 ];
2769 let _ = processor.process_agg_trade_records(&trades).unwrap();
2770 let checkpoint = processor.create_checkpoint("BTCUSDT");
2771
2772 let mut restored = RangeBarProcessor::from_checkpoint(checkpoint)
2774 .unwrap()
2775 .with_max_gap(1_800_000_000); let gap_trades = vec![
2779 test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640997900_000_000), ];
2781
2782 let _ = restored.process_agg_trade_records(&gap_trades).unwrap();
2783 assert_eq!(
2784 restored.anomaly_summary().gaps_detected, 1,
2785 "45-min gap should be detected with 30-min threshold"
2786 );
2787 }
2788
2789 #[test]
2790 fn test_is_valid_range_rejects_oversized() {
2791 use crate::fixed_point::FixedPoint;
2792
2793 let threshold_decimal_bps: u32 = 250; let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
2795 / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
2796
2797 let mut oversized = RangeBar::default();
2800 oversized.open = FixedPoint::from_str("50000.0").unwrap();
2801 oversized.high = FixedPoint::from_str("50250.01").unwrap();
2802 oversized.low = FixedPoint::from_str("50000.0").unwrap();
2803 assert!(
2804 !oversized.is_valid_range(threshold_ratio, 2),
2805 "Bar exceeding 2x threshold should be invalid"
2806 );
2807
2808 let mut valid = RangeBar::default();
2810 valid.open = FixedPoint::from_str("50000.0").unwrap();
2811 valid.high = FixedPoint::from_str("50100.0").unwrap();
2812 valid.low = FixedPoint::from_str("50000.0").unwrap();
2813 assert!(
2814 valid.is_valid_range(threshold_ratio, 2),
2815 "Bar within threshold should be valid"
2816 );
2817
2818 let mut exact = RangeBar::default();
2820 exact.open = FixedPoint::from_str("50000.0").unwrap();
2821 exact.high = FixedPoint::from_str("50125.0").unwrap();
2822 exact.low = FixedPoint::from_str("50000.0").unwrap();
2823 assert!(
2824 exact.is_valid_range(threshold_ratio, 2),
2825 "Bar at exact threshold should be valid"
2826 );
2827 }
2828
2829 #[test]
2830 fn test_checkpoint_no_incomplete_bar_gap_is_noop() {
2831 let mut processor = RangeBarProcessor::new(250).unwrap();
2833
2834 let trades = vec![
2836 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
2837 test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1640995201_000_000), ];
2839 let bars = processor.process_agg_trade_records(&trades).unwrap();
2840 assert_eq!(bars.len(), 1);
2841
2842 let checkpoint = processor.create_checkpoint("BTCUSDT");
2843 assert!(!checkpoint.has_incomplete_bar());
2844
2845 let mut restored = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
2846
2847 let gap_trades = vec![
2849 test_utils::create_test_agg_trade(3, "50010.0", "1.0", 1641081600_000_000), ];
2851 let _ = restored.process_agg_trade_records(&gap_trades).unwrap();
2852 assert_eq!(
2853 restored.anomaly_summary().gaps_detected, 0,
2854 "No gap anomaly when no forming bar exists"
2855 );
2856 }
2857}