1use crate::checkpoint::{
8 AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
9};
10use crate::fixed_point::FixedPoint;
11use crate::interbar::{InterBarConfig, TradeHistory}; use crate::types::{AggTrade, OpenDeviationBar};
14use smallvec::SmallVec; #[cfg(feature = "python")]
16use pyo3::prelude::*;
17pub use crate::errors::ProcessingError;
19pub use crate::export_processor::ExportOpenDeviationBarProcessor;
21
/// Stateful processor that turns a stream of aggregated trades into open deviation bars.
///
/// Holds the threshold configuration, the bar currently being built (if any), the
/// position-tracking data that goes into checkpoints, and the optional inter-bar /
/// intra-bar feature settings.
pub struct OpenDeviationBarProcessor {
    /// Threshold exactly as passed to the constructor; constructors accept `1..=100_000`.
    /// In this file's tests a value of 250 yields thresholds of ±0.25% around the open.
    threshold_decimal_bps: u32,

    /// Pre-scaled threshold: `threshold_decimal_bps * SCALE / BASIS_POINTS_SCALE`.
    /// Passed to the bar-state constructors to derive the upper/lower thresholds.
    pub threshold_ratio: i64,

    /// Bar currently being built, together with its thresholds; `None` when no bar is open.
    current_bar_state: Option<OpenDeviationBarState>,

    /// Receives the price of every processed trade; its hash is stored in checkpoints.
    price_window: PriceWindow,

    /// `agg_trade_id` of the most recently processed trade; `None` before any trade
    /// (and after `reset_at_ouroboros`).
    last_trade_id: Option<i64>,

    /// Timestamp of the most recently processed trade; starts at 0.
    /// NOTE(review): the `_us` suffix suggests microseconds - confirm against `AggTrade`.
    last_timestamp_us: i64,

    /// Anomaly counters; a gap is recorded when a restored bar is dropped on resume.
    anomaly_summary: AnomalySummary,

    /// Set by `from_checkpoint`; cleared by the first batch call, which then continues
    /// the restored bar instead of starting from a clean state.
    resumed_from_checkpoint: bool,

    /// When `true`, a breaching trade whose timestamp equals the bar's `open_time`
    /// does not close the bar.
    prevent_same_timestamp_close: bool,

    /// Set after `process_single_trade` completes a bar: the next trade opens a new bar.
    defer_open: bool,

    /// Trade history used for inter-bar features; `None` until an inter-bar config is set.
    trade_history: Option<TradeHistory>,

    /// Inter-bar configuration; `Some` exactly when inter-bar features were enabled.
    inter_bar_config: Option<InterBarConfig>,

    /// When `true`, trades are accumulated per bar and intra-bar features are computed
    /// when the bar is emitted.
    include_intra_bar_features: bool,

    /// Configuration handed to the intra-bar feature computation.
    intra_bar_config: crate::intrabar::IntraBarConfig,

    /// Largest allowed difference between the first trade of a resumed batch and the
    /// restored bar's `close_time`; a larger gap discards the restored bar.
    /// Defaults to 3_600_000_000 (presumably one hour in microseconds - confirm units).
    max_gap_us: i64,
}
111
112#[cold]
115#[inline(never)]
116fn find_unsorted_trade(trades: &[AggTrade]) -> Result<(), ProcessingError> {
117 for i in 1..trades.len() {
118 let prev = &trades[i - 1];
119 let curr = &trades[i];
120 if curr.timestamp < prev.timestamp
121 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
122 {
123 return Err(ProcessingError::UnsortedTrades {
124 index: i,
125 prev_time: prev.timestamp,
126 prev_id: prev.agg_trade_id,
127 curr_time: curr.timestamp,
128 curr_id: curr.agg_trade_id,
129 });
130 }
131 }
132 Ok(())
133}
134
135#[cold]
138#[inline(never)]
139fn unsorted_trade_error(index: usize, prev: &AggTrade, curr: &AggTrade) -> Result<(), ProcessingError> {
140 Err(ProcessingError::UnsortedTrades {
141 index,
142 prev_time: prev.timestamp,
143 prev_id: prev.agg_trade_id,
144 curr_time: curr.timestamp,
145 curr_id: curr.agg_trade_id,
146 })
147}
148
149impl OpenDeviationBarProcessor {
150 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
166 Self::with_options(threshold_decimal_bps, true)
167 }
168
169 pub fn with_options(
188 threshold_decimal_bps: u32,
189 prevent_same_timestamp_close: bool,
190 ) -> Result<Self, ProcessingError> {
191 if threshold_decimal_bps < 1 {
195 return Err(ProcessingError::InvalidThreshold {
196 threshold_decimal_bps,
197 });
198 }
199 if threshold_decimal_bps > 100_000 {
200 return Err(ProcessingError::InvalidThreshold {
201 threshold_decimal_bps,
202 });
203 }
204
205 let threshold_ratio = ((threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
209 / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
210
211 Ok(Self {
212 threshold_decimal_bps,
213 threshold_ratio,
214 current_bar_state: None,
215 price_window: PriceWindow::new(),
216 last_trade_id: None,
217 last_timestamp_us: 0,
218 anomaly_summary: AnomalySummary::default(),
219 resumed_from_checkpoint: false,
220 prevent_same_timestamp_close,
221 defer_open: false,
222 trade_history: None, inter_bar_config: None, include_intra_bar_features: false, intra_bar_config: crate::intrabar::IntraBarConfig::default(), max_gap_us: 3_600_000_000, })
228 }
229
    /// Returns whether a bar is prevented from closing on a trade that shares the
    /// bar's open timestamp.
    pub fn prevent_same_timestamp_close(&self) -> bool {
        self.prevent_same_timestamp_close
    }
234
    /// Builder: enables inter-bar features with `config` and no external entropy cache.
    ///
    /// Shorthand for `with_inter_bar_config_and_cache(config, None)`.
    pub fn with_inter_bar_config(self, config: InterBarConfig) -> Self {
        self.with_inter_bar_config_and_cache(config, None)
    }
266
267 pub fn with_inter_bar_config_and_cache(
293 mut self,
294 config: InterBarConfig,
295 external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
296 ) -> Self {
297 self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
298 self.inter_bar_config = Some(config);
299 self
300 }
301
    /// Returns `true` when an inter-bar configuration has been set.
    pub fn inter_bar_enabled(&self) -> bool {
        self.inter_bar_config.is_some()
    }
306
307 pub fn with_max_gap(mut self, max_gap_us: i64) -> Self {
317 self.max_gap_us = max_gap_us;
318 self
319 }
320
    /// Returns the maximum gap tolerated when resuming a checkpointed bar.
    pub fn max_gap_us(&self) -> i64 {
        self.max_gap_us
    }
325
326 pub fn with_intra_bar_features(mut self) -> Self {
346 self.include_intra_bar_features = true;
347 self
348 }
349
    /// Returns `true` when intra-bar feature computation is enabled.
    pub fn intra_bar_enabled(&self) -> bool {
        self.include_intra_bar_features
    }
354
    /// Enables inter-bar features in place with `config` and no external entropy cache.
    ///
    /// Shorthand for `set_inter_bar_config_with_cache(config, None)`.
    pub fn set_inter_bar_config(&mut self, config: InterBarConfig) {
        self.set_inter_bar_config_with_cache(config, None);
    }
363
364 pub fn set_inter_bar_config_with_cache(
370 &mut self,
371 config: InterBarConfig,
372 external_cache: Option<std::sync::Arc<parking_lot::RwLock<crate::interbar_math::EntropyCache>>>,
373 ) {
374 self.trade_history = Some(TradeHistory::new_with_cache(config.clone(), external_cache));
375 self.inter_bar_config = Some(config);
376 }
377
    /// Enables or disables intra-bar feature computation in place.
    pub fn set_intra_bar_features(&mut self, enabled: bool) {
        self.include_intra_bar_features = enabled;
    }
382
383 pub fn with_intra_bar_config(mut self, config: crate::intrabar::IntraBarConfig) -> Self {
386 self.intra_bar_config = config;
387 self
388 }
389
    /// Replaces the configuration used for intra-bar feature computation, in place.
    pub fn set_intra_bar_config(&mut self, config: crate::intrabar::IntraBarConfig) {
        self.intra_bar_config = config;
    }
394
395 #[inline]
419 pub fn process_single_trade(
420 &mut self,
421 trade: &AggTrade,
422 ) -> Result<Option<OpenDeviationBar>, ProcessingError> {
423 self.price_window.push(trade.price);
425 self.last_trade_id = Some(trade.agg_trade_id);
426 self.last_timestamp_us = trade.timestamp;
427
428 if let Some(ref mut history) = self.trade_history {
431 history.push(trade);
432 }
433
434 if self.defer_open {
438 if let Some(ref mut history) = self.trade_history {
440 history.on_bar_open(trade.timestamp);
441 }
442 self.current_bar_state = Some(if self.include_intra_bar_features {
443 OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
444 } else {
445 OpenDeviationBarState::new(trade, self.threshold_ratio)
446 });
447 self.defer_open = false;
448 return Ok(None);
449 }
450
451 match &mut self.current_bar_state {
452 None => {
453 if let Some(ref mut history) = self.trade_history {
456 history.on_bar_open(trade.timestamp);
457 }
458 self.current_bar_state = Some(if self.include_intra_bar_features {
459 OpenDeviationBarState::new_with_trade_accumulation(trade, self.threshold_ratio)
460 } else {
461 OpenDeviationBarState::new(trade, self.threshold_ratio)
462 });
463 Ok(None)
464 }
465 Some(bar_state) => {
466 bar_state.accumulate_trade(trade, self.include_intra_bar_features);
469
470 let price_breaches = bar_state.bar.is_breach(
472 trade.price,
473 bar_state.upper_threshold,
474 bar_state.lower_threshold,
475 );
476
477 let timestamp_allows_close = !self.prevent_same_timestamp_close
481 || trade.timestamp != bar_state.bar.open_time;
482
483 if price_breaches && timestamp_allows_close {
484 bar_state.bar.update_with_trade(trade);
486
487 debug_assert!(
489 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
490 );
491 debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
492
493 bar_state.bar.compute_microstructure_features();
495
496 if let Some(ref mut history) = self.trade_history {
499 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
500 bar_state.bar.set_inter_bar_features(&inter_bar_features);
501 history.on_bar_close();
503 }
504
505 if self.include_intra_bar_features {
507 let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_config(
509 &bar_state.accumulated_trades,
510 &mut bar_state.scratch_prices,
511 &mut bar_state.scratch_volumes,
512 &self.intra_bar_config, );
514 bar_state.bar.set_intra_bar_features(&intra_bar_features);
515 }
516
517 let completed_bar = self.current_bar_state.take().unwrap().bar;
520
521 self.defer_open = true;
524
525 Ok(Some(completed_bar))
526 } else {
527 bar_state.bar.update_with_trade(trade);
529 Ok(None)
530 }
531 }
532 }
533 }
534
535 pub fn get_incomplete_bar(&self) -> Option<OpenDeviationBar> {
544 self.current_bar_state
545 .as_ref()
546 .map(|state| state.bar.clone())
547 }
548
    /// Batch-processes `agg_trade_records` and additionally appends the still-open bar
    /// (if any) as the last element of the result.
    ///
    /// Shorthand for `process_agg_trade_records_with_options(records, true)`.
    ///
    /// # Errors
    /// Propagates `ProcessingError` from the underlying call (e.g. unsorted trades).
    pub fn process_agg_trade_records_with_incomplete(
        &mut self,
        agg_trade_records: &[AggTrade],
    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
        self.process_agg_trade_records_with_options(agg_trade_records, true)
    }
569
    /// Batch-processes `agg_trade_records` and returns only the bars completed by a breach.
    ///
    /// Shorthand for `process_agg_trade_records_with_options(records, false)`; a bar that
    /// is still open at the end of the batch is kept as processor state, not returned.
    ///
    /// # Errors
    /// Propagates `ProcessingError` from the underlying call (e.g. unsorted trades).
    pub fn process_agg_trade_records(
        &mut self,
        agg_trade_records: &[AggTrade],
    ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
        self.process_agg_trade_records_with_options(agg_trade_records, false)
    }
590
591 pub fn process_agg_trade_records_with_options(
605 &mut self,
606 agg_trade_records: &[AggTrade],
607 include_incomplete: bool,
608 ) -> Result<Vec<OpenDeviationBar>, ProcessingError> {
609 if agg_trade_records.is_empty() {
610 return Ok(Vec::new());
611 }
612
613 self.validate_trade_ordering(agg_trade_records)?;
615
616 let mut current_bar: Option<OpenDeviationBarState> = if self.resumed_from_checkpoint {
619 self.resumed_from_checkpoint = false; let restored_bar = self.current_bar_state.take();
622
623 if let Some(ref bar_state) = restored_bar {
627 let first_trade_ts = agg_trade_records[0].timestamp;
628 let gap = first_trade_ts - bar_state.bar.close_time;
629 if gap > self.max_gap_us {
630 self.anomaly_summary.record_gap();
631 None
633 } else {
634 restored_bar
635 }
636 } else {
637 restored_bar
638 }
639 } else {
640 self.current_bar_state = None;
642 None
643 };
644
645 let mut bars = Vec::with_capacity(agg_trade_records.len() / 50); let mut defer_open = false;
647
648 for agg_record in agg_trade_records {
649 self.price_window.push(agg_record.price);
651 self.last_trade_id = Some(agg_record.agg_trade_id);
652 self.last_timestamp_us = agg_record.timestamp;
653
654 if let Some(ref mut history) = self.trade_history {
656 history.push(agg_record);
657 }
658
659 if defer_open {
660 if let Some(ref mut history) = self.trade_history {
663 history.on_bar_open(agg_record.timestamp);
664 }
665 current_bar = Some(if self.include_intra_bar_features {
666 OpenDeviationBarState::new_with_trade_accumulation(
667 agg_record,
668 self.threshold_ratio,
669 )
670 } else {
671 OpenDeviationBarState::new(agg_record, self.threshold_ratio)
672 });
673 defer_open = false;
674 continue;
675 }
676
677 match current_bar {
678 None => {
679 if let Some(ref mut history) = self.trade_history {
682 history.on_bar_open(agg_record.timestamp);
683 }
684 current_bar = Some(if self.include_intra_bar_features {
685 OpenDeviationBarState::new_with_trade_accumulation(
686 agg_record,
687 self.threshold_ratio,
688 )
689 } else {
690 OpenDeviationBarState::new(agg_record, self.threshold_ratio)
691 });
692 }
693 Some(ref mut bar_state) => {
694 bar_state.accumulate_trade(agg_record, self.include_intra_bar_features);
697
698 let price_breaches = bar_state.bar.is_breach(
700 agg_record.price,
701 bar_state.upper_threshold,
702 bar_state.lower_threshold,
703 );
704
705 let timestamp_allows_close = !self.prevent_same_timestamp_close
709 || agg_record.timestamp != bar_state.bar.open_time;
710
711 if price_breaches && timestamp_allows_close {
712 bar_state.bar.update_with_trade(agg_record);
714
715 debug_assert!(
717 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
718 );
719 debug_assert!(
720 bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
721 );
722
723 bar_state.bar.compute_microstructure_features();
725
726 if let Some(ref mut history) = self.trade_history {
728 let inter_bar_features =
729 history.compute_features(bar_state.bar.open_time);
730 bar_state.bar.set_inter_bar_features(&inter_bar_features);
731 history.on_bar_close();
733 }
734
735 if self.include_intra_bar_features {
737 let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_config(
739 &bar_state.accumulated_trades,
740 &mut bar_state.scratch_prices,
741 &mut bar_state.scratch_volumes,
742 &self.intra_bar_config, );
744 bar_state.bar.set_intra_bar_features(&intra_bar_features);
745 }
746
747 bars.push(current_bar.take().unwrap().bar);
750 defer_open = true; } else {
752 bar_state.bar.update_with_trade(agg_record);
754 }
755 }
756 }
757 }
758
759 if include_incomplete {
763 if let Some(ref state) = current_bar {
767 let mut checkpoint_state = state.clone();
768 let _ = std::mem::take(&mut checkpoint_state.accumulated_trades);
770 self.current_bar_state = Some(checkpoint_state);
771 }
772
773 if let Some(mut bar_state) = current_bar {
776 bar_state.bar.compute_microstructure_features();
778
779 if let Some(ref history) = self.trade_history {
781 let inter_bar_features = history.compute_features(bar_state.bar.open_time);
782 bar_state.bar.set_inter_bar_features(&inter_bar_features);
783 }
784
785 if self.include_intra_bar_features {
787 let intra_bar_features = crate::intrabar::compute_intra_bar_features_with_config(
789 &bar_state.accumulated_trades,
790 &mut bar_state.scratch_prices,
791 &mut bar_state.scratch_volumes,
792 &self.intra_bar_config, );
794 bar_state.bar.set_intra_bar_features(&intra_bar_features);
795 }
796
797 bars.push(bar_state.bar);
798 }
799 } else {
800 self.current_bar_state = current_bar;
802 }
803
804 Ok(bars)
805 }
806
807 pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
829 let (incomplete_bar, thresholds) = match &self.current_bar_state {
830 Some(state) => (
831 Some(state.bar.clone()),
832 Some((state.upper_threshold, state.lower_threshold)),
833 ),
834 None => (None, None),
835 };
836
837 let mut checkpoint = Checkpoint::new(
838 symbol.to_string(),
839 self.threshold_decimal_bps,
840 incomplete_bar,
841 thresholds,
842 self.last_timestamp_us,
843 self.last_trade_id,
844 self.price_window.compute_hash(),
845 self.prevent_same_timestamp_close,
846 );
847 checkpoint.defer_open = self.defer_open;
849 checkpoint
850 }
851
852 pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
870 let checkpoint = Self::migrate_checkpoint(checkpoint);
872
873 const THRESHOLD_MIN: u32 = 1;
876 const THRESHOLD_MAX: u32 = 100_000;
877 if checkpoint.threshold_decimal_bps < THRESHOLD_MIN
878 || checkpoint.threshold_decimal_bps > THRESHOLD_MAX
879 {
880 return Err(CheckpointError::InvalidThreshold {
881 threshold: checkpoint.threshold_decimal_bps,
882 min_threshold: THRESHOLD_MIN,
883 max_threshold: THRESHOLD_MAX,
884 });
885 }
886
887 if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
889 return Err(CheckpointError::MissingThresholds);
890 }
891
892 let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
896 (Some(bar), Some((upper, lower))) => Some(OpenDeviationBarState {
897 bar,
898 upper_threshold: upper,
899 lower_threshold: lower,
900 accumulated_trades: SmallVec::new(), scratch_prices: SmallVec::new(),
902 scratch_volumes: SmallVec::new(),
903 }),
904 _ => None,
905 };
906
907 let threshold_ratio = ((checkpoint.threshold_decimal_bps as i64) * crate::fixed_point::SCALE)
909 / (crate::fixed_point::BASIS_POINTS_SCALE as i64);
910
911 Ok(Self {
912 threshold_decimal_bps: checkpoint.threshold_decimal_bps,
913 threshold_ratio,
914 current_bar_state,
915 price_window: PriceWindow::new(), last_trade_id: checkpoint.last_trade_id,
917 last_timestamp_us: checkpoint.last_timestamp_us,
918 anomaly_summary: checkpoint.anomaly_summary,
919 resumed_from_checkpoint: true, prevent_same_timestamp_close: checkpoint.prevent_same_timestamp_close,
921 defer_open: checkpoint.defer_open, trade_history: None, inter_bar_config: None, include_intra_bar_features: false, intra_bar_config: crate::intrabar::IntraBarConfig::default(), max_gap_us: 3_600_000_000, })
928 }
929
930 fn migrate_checkpoint(mut checkpoint: Checkpoint) -> Checkpoint {
934 match checkpoint.version {
935 1 => {
936 checkpoint.version = 2;
939 checkpoint
940 }
941 2 => {
942 checkpoint
944 }
945 _ => {
946 eprintln!(
948 "Warning: Checkpoint has unknown version {}, treating as v2",
949 checkpoint.version
950 );
951 checkpoint.version = 2;
952 checkpoint
953 }
954 }
955 }
956
957 pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
982 match self.last_trade_id {
983 Some(last_id) => {
984 let expected_id = last_id + 1;
986 if first_trade.agg_trade_id == expected_id {
987 PositionVerification::Exact
988 } else {
989 let missing_count = first_trade.agg_trade_id - expected_id;
990 PositionVerification::Gap {
991 expected_id,
992 actual_id: first_trade.agg_trade_id,
993 missing_count,
994 }
995 }
996 }
997 None => {
998 let gap_us = first_trade.timestamp - self.last_timestamp_us;
1000 let gap_ms = gap_us / 1000;
1001 PositionVerification::TimestampOnly { gap_ms }
1002 }
1003 }
1004 }
1005
    /// Returns the anomaly counters collected by this processor.
    pub fn anomaly_summary(&self) -> &AnomalySummary {
        &self.anomaly_summary
    }
1010
    /// Returns the threshold exactly as it was supplied at construction.
    pub fn threshold_decimal_bps(&self) -> u32 {
        self.threshold_decimal_bps
    }
1015
1016 fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
1022 if trades.is_empty() {
1023 return Ok(());
1024 }
1025
1026 let first = &trades[0];
1030 let last = &trades[trades.len() - 1];
1031
1032 if last.timestamp < first.timestamp
1033 || (last.timestamp == first.timestamp && last.agg_trade_id <= first.agg_trade_id)
1034 {
1035 return find_unsorted_trade(trades);
1037 }
1038
1039 for i in 1..trades.len() {
1041 let prev = &trades[i - 1];
1042 let curr = &trades[i];
1043
1044 if curr.timestamp < prev.timestamp
1046 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
1047 {
1048 return unsorted_trade_error(i, prev, curr);
1049 }
1050 }
1051
1052 Ok(())
1053 }
1054
1055 pub fn reset_at_ouroboros(&mut self) -> Option<OpenDeviationBar> {
1077 let orphaned = self.current_bar_state.take().map(|state| state.bar);
1078 self.price_window = PriceWindow::new();
1079 self.last_trade_id = None;
1080 self.last_timestamp_us = 0;
1081 self.resumed_from_checkpoint = false;
1082 self.defer_open = false;
1083 if let Some(ref mut history) = self.trade_history {
1086 history.reset_bar_boundaries();
1087 }
1088 orphaned
1089 }
1090}
1091
/// Working state of one bar under construction: the bar itself, the thresholds fixed
/// when it was opened, and buffers used for intra-bar features.
#[derive(Clone)]
struct OpenDeviationBarState {
    /// The bar being built.
    pub bar: OpenDeviationBar,

    /// Upper threshold derived from the bar's open price when the bar was created.
    pub upper_threshold: FixedPoint,

    /// Lower threshold derived from the bar's open price when the bar was created.
    pub lower_threshold: FixedPoint,

    /// Trades collected for intra-bar feature computation; only filled when intra-bar
    /// features are enabled.
    pub accumulated_trades: SmallVec<[AggTrade; 64]>,

    /// Scratch buffer passed mutably to the intra-bar feature computation.
    /// NOTE(review): presumably holds per-trade prices - confirm in `crate::intrabar`.
    pub scratch_prices: SmallVec<[f64; 64]>,

    /// Scratch buffer passed mutably to the intra-bar feature computation.
    /// NOTE(review): presumably holds per-trade volumes - confirm in `crate::intrabar`.
    pub scratch_volumes: SmallVec<[f64; 64]>,
}
1123
1124impl OpenDeviationBarState {
1125 #[inline]
1128 fn new(trade: &AggTrade, threshold_ratio: i64) -> Self {
1129 let bar = OpenDeviationBar::new(trade);
1130
1131 let (upper_threshold, lower_threshold) =
1134 bar.open.compute_range_thresholds_cached(threshold_ratio);
1135
1136 Self {
1137 bar,
1138 upper_threshold,
1139 lower_threshold,
1140 accumulated_trades: SmallVec::new(),
1141 scratch_prices: SmallVec::new(),
1142 scratch_volumes: SmallVec::new(),
1143 }
1144 }
1145
1146 #[inline]
1149 fn new_with_trade_accumulation(trade: &AggTrade, threshold_ratio: i64) -> Self {
1150 let bar = OpenDeviationBar::new(trade);
1151
1152 let (upper_threshold, lower_threshold) =
1155 bar.open.compute_range_thresholds_cached(threshold_ratio);
1156
1157 Self {
1158 bar,
1159 upper_threshold,
1160 lower_threshold,
1161 accumulated_trades: {
1162 let mut sv = SmallVec::new();
1163 sv.push(trade.clone());
1164 sv
1165 },
1166 scratch_prices: SmallVec::new(),
1167 scratch_volumes: SmallVec::new(),
1168 }
1169 }
1170
1171 #[inline]
1178 fn accumulate_trade(&mut self, trade: &AggTrade, include_intra: bool) {
1179 if include_intra {
1180 self.accumulated_trades.push(trade.clone());
1181 }
1182 }
1183}
1184
1185#[cfg(test)]
1186mod tests {
1187 use super::*;
1188 use crate::test_utils::{self, scenarios};
1189
1190 #[test]
1191 fn test_single_bar_no_breach() {
1192 let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); let trades = scenarios::no_breach_sequence(250);
1196
1197 let bars = processor.process_agg_trade_records(&trades).unwrap();
1199 assert_eq!(
1200 bars.len(),
1201 0,
1202 "Strict algorithm should not create bars without breach"
1203 );
1204
1205 let bars_with_incomplete = processor
1207 .process_agg_trade_records_with_incomplete(&trades)
1208 .unwrap();
1209 assert_eq!(
1210 bars_with_incomplete.len(),
1211 1,
1212 "Analysis mode should include incomplete bar"
1213 );
1214
1215 let bar = &bars_with_incomplete[0];
1216 assert_eq!(bar.open.to_string(), "50000.00000000");
1217 assert_eq!(bar.high.to_string(), "50100.00000000");
1218 assert_eq!(bar.low.to_string(), "49900.00000000");
1219 assert_eq!(bar.close.to_string(), "49900.00000000");
1220 }
1221
1222 #[test]
1223 fn test_exact_breach_upward() {
1224 let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_upward(250);
1227
1228 let bars = processor.process_agg_trade_records(&trades).unwrap();
1230 assert_eq!(
1231 bars.len(),
1232 1,
1233 "Strict algorithm should only return completed bars"
1234 );
1235
1236 let bar1 = &bars[0];
1238 assert_eq!(bar1.open.to_string(), "50000.00000000");
1239 assert_eq!(bar1.close.to_string(), "50125.00000000"); assert_eq!(bar1.high.to_string(), "50125.00000000");
1242 assert_eq!(bar1.low.to_string(), "50000.00000000");
1243
1244 let bars_with_incomplete = processor
1246 .process_agg_trade_records_with_incomplete(&trades)
1247 .unwrap();
1248 assert_eq!(
1249 bars_with_incomplete.len(),
1250 2,
1251 "Analysis mode should include incomplete bars"
1252 );
1253
1254 let bar2 = &bars_with_incomplete[1];
1256 assert_eq!(bar2.open.to_string(), "50500.00000000"); assert_eq!(bar2.close.to_string(), "50500.00000000");
1258 }
1259
1260 #[test]
1261 fn test_exact_breach_downward() {
1262 let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_downward(250);
1265
1266 let bars = processor.process_agg_trade_records(&trades).unwrap();
1267
1268 assert_eq!(bars.len(), 1);
1269
1270 let bar = &bars[0];
1271 assert_eq!(bar.open.to_string(), "50000.00000000");
1272 assert_eq!(bar.close.to_string(), "49875.00000000"); assert_eq!(bar.high.to_string(), "50000.00000000");
1274 assert_eq!(bar.low.to_string(), "49875.00000000");
1275 }
1276
1277 #[test]
1278 fn test_large_gap_single_bar() {
1279 let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); let trades = scenarios::large_gap_sequence();
1282
1283 let bars = processor.process_agg_trade_records(&trades).unwrap();
1284
1285 assert_eq!(bars.len(), 1);
1287
1288 let bar = &bars[0];
1289 assert_eq!(bar.open.to_string(), "50000.00000000");
1290 assert_eq!(bar.close.to_string(), "51000.00000000");
1291 assert_eq!(bar.high.to_string(), "51000.00000000");
1292 assert_eq!(bar.low.to_string(), "50000.00000000");
1293 }
1294
1295 #[test]
1296 fn test_unsorted_trades_error() {
1297 let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); let trades = scenarios::unsorted_sequence();
1300
1301 let result = processor.process_agg_trade_records(&trades);
1302 assert!(result.is_err());
1303
1304 match result {
1305 Err(ProcessingError::UnsortedTrades { index, .. }) => {
1306 assert_eq!(index, 1);
1307 }
1308 _ => panic!("Expected UnsortedTrades error"),
1309 }
1310 }
1311
1312 #[test]
1313 fn test_threshold_calculation() {
1314 let processor = OpenDeviationBarProcessor::new(250).unwrap(); let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
1317 let bar_state = OpenDeviationBarState::new(&trade, processor.threshold_ratio);
1318
1319 assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
1321 assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
1322 }
1323
1324 #[test]
1325 fn test_empty_trades() {
1326 let mut processor = OpenDeviationBarProcessor::new(250).unwrap(); let trades = scenarios::empty_sequence();
1328 let bars = processor.process_agg_trade_records(&trades).unwrap();
1329 assert_eq!(bars.len(), 0);
1330 }
1331
1332 #[test]
1333 fn test_debug_streaming_data() {
1334 let mut processor = OpenDeviationBarProcessor::new(100).unwrap(); let trades = vec![
1338 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1339 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1341 ];
1342
1343 println!("Test data prices: 50014 -> 50163 -> 50032");
1344 println!("Expected price movements: +0.3% then -0.26%");
1345
1346 let bars = processor.process_agg_trade_records(&trades).unwrap();
1347 println!("Generated {} open deviation bars", bars.len());
1348
1349 for (i, bar) in bars.iter().enumerate() {
1350 println!(
1351 " Bar {}: O={} H={} L={} C={}",
1352 i + 1,
1353 bar.open,
1354 bar.high,
1355 bar.low,
1356 bar.close
1357 );
1358 }
1359
1360 assert!(
1362 !bars.is_empty(),
1363 "Expected at least 1 open deviation bar with 0.3% price movement and 0.1% threshold"
1364 );
1365 }
1366
1367 #[test]
1368 fn test_threshold_validation() {
1369 assert!(OpenDeviationBarProcessor::new(250).is_ok());
1371
1372 assert!(matches!(
1374 OpenDeviationBarProcessor::new(0),
1375 Err(ProcessingError::InvalidThreshold {
1376 threshold_decimal_bps: 0
1377 })
1378 ));
1379
1380 assert!(matches!(
1382 OpenDeviationBarProcessor::new(150_000),
1383 Err(ProcessingError::InvalidThreshold {
1384 threshold_decimal_bps: 150_000
1385 })
1386 ));
1387
1388 assert!(OpenDeviationBarProcessor::new(1).is_ok());
1390
1391 assert!(OpenDeviationBarProcessor::new(100_000).is_ok());
1393 }
1394
1395 #[test]
1396 fn test_export_processor_with_manual_trades() {
1397 println!("Testing ExportOpenDeviationBarProcessor with same trade data...");
1398
1399 let mut export_processor = ExportOpenDeviationBarProcessor::new(100).unwrap(); let trades = vec![
1403 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
1404 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
1406 ];
1407
1408 println!(
1409 "Processing {} trades with ExportOpenDeviationBarProcessor...",
1410 trades.len()
1411 );
1412
1413 export_processor.process_trades_continuously(&trades);
1414 let bars = export_processor.get_all_completed_bars();
1415
1416 println!(
1417 "ExportOpenDeviationBarProcessor generated {} open deviation bars",
1418 bars.len()
1419 );
1420 for (i, bar) in bars.iter().enumerate() {
1421 println!(
1422 " Bar {}: O={} H={} L={} C={}",
1423 i + 1,
1424 bar.open,
1425 bar.high,
1426 bar.low,
1427 bar.close
1428 );
1429 }
1430
1431 assert!(
1433 !bars.is_empty(),
1434 "ExportOpenDeviationBarProcessor should generate same results as basic processor"
1435 );
1436 }
1437
1438 #[test]
1441 fn test_checkpoint_creation() {
1442 let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1443
1444 let trades = scenarios::no_breach_sequence(250);
1446 let _bars = processor.process_agg_trade_records(&trades).unwrap();
1447
1448 let checkpoint = processor.create_checkpoint("BTCUSDT");
1450
1451 assert_eq!(checkpoint.symbol, "BTCUSDT");
1452 assert_eq!(checkpoint.threshold_decimal_bps, 250);
1453 assert!(checkpoint.has_incomplete_bar()); assert!(checkpoint.thresholds.is_some()); assert!(checkpoint.last_trade_id.is_some()); }
1457
1458 #[test]
1459 fn test_checkpoint_serialization_roundtrip() {
1460 let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1461
1462 let trades = scenarios::no_breach_sequence(250);
1464 let _bars = processor.process_agg_trade_records(&trades).unwrap();
1465
1466 let checkpoint = processor.create_checkpoint("BTCUSDT");
1468
1469 let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
1471
1472 let restored: Checkpoint =
1474 serde_json::from_str(&json).expect("Deserialization should succeed");
1475
1476 assert_eq!(restored.symbol, checkpoint.symbol);
1477 assert_eq!(
1478 restored.threshold_decimal_bps,
1479 checkpoint.threshold_decimal_bps
1480 );
1481 assert_eq!(
1482 restored.incomplete_bar.is_some(),
1483 checkpoint.incomplete_bar.is_some()
1484 );
1485 }
1486
1487 #[test]
1488 fn test_cross_file_bar_continuation() {
1489 let mut all_trades = Vec::new();
1494
1495 let base_timestamp = 1640995200000000i64; for i in 0..20 {
1501 let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
1502 let trade = test_utils::create_test_agg_trade(
1503 i + 1,
1504 &format!("{:.8}", price),
1505 "1.0",
1506 base_timestamp + (i * 1000000),
1507 );
1508 all_trades.push(trade);
1509 }
1510
1511 let mut processor_full = OpenDeviationBarProcessor::new(100).unwrap(); let bars_full = processor_full
1514 .process_agg_trade_records(&all_trades)
1515 .unwrap();
1516
1517 let split_point = 10; let mut processor_1 = OpenDeviationBarProcessor::new(100).unwrap();
1522 let part1_trades = &all_trades[0..split_point];
1523 let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
1524
1525 let checkpoint = processor_1.create_checkpoint("TEST");
1527
1528 let mut processor_2 = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
1530 let part2_trades = &all_trades[split_point..];
1531 let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
1532
1533 let split_total = bars_1.len() + bars_2.len();
1536
1537 println!("Full processing: {} bars", bars_full.len());
1538 println!(
1539 "Split processing: {} + {} = {} bars",
1540 bars_1.len(),
1541 bars_2.len(),
1542 split_total
1543 );
1544
1545 assert_eq!(
1546 split_total,
1547 bars_full.len(),
1548 "Split processing should produce same bar count as full processing"
1549 );
1550
1551 let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
1553 for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
1554 assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
1555 assert_eq!(
1556 full.close.0, split.close.0,
1557 "Bar {} close price mismatch",
1558 i
1559 );
1560 }
1561 }
1562
1563 #[test]
1564 fn test_verify_position_exact() {
1565 let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1566
1567 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1569 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1570
1571 let _ = processor.process_single_trade(&trade1);
1572 let _ = processor.process_single_trade(&trade2);
1573
1574 let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
1576
1577 let verification = processor.verify_position(&next_trade);
1579
1580 assert_eq!(verification, PositionVerification::Exact);
1581 }
1582
1583 #[test]
1584 fn test_verify_position_gap() {
1585 let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
1586
1587 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
1589 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
1590
1591 let _ = processor.process_single_trade(&trade1);
1592 let _ = processor.process_single_trade(&trade2);
1593
1594 let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
1596
1597 let verification = processor.verify_position(&next_trade);
1599
1600 match verification {
1601 PositionVerification::Gap {
1602 expected_id,
1603 actual_id,
1604 missing_count,
1605 } => {
1606 assert_eq!(expected_id, 102);
1607 assert_eq!(actual_id, 105);
1608 assert_eq!(missing_count, 3);
1609 }
1610 _ => panic!("Expected Gap verification, got {:?}", verification),
1611 }
1612 }
1613
/// A processor that has not processed any trade is expected to answer
/// `verify_position` with `TimestampOnly`, carrying the gap in milliseconds.
#[test]
fn test_verify_position_timestamp_only() {
    let fresh_processor = OpenDeviationBarProcessor::new(250).unwrap();

    let first_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 5000000);

    let verification = fresh_processor.verify_position(&first_trade);
    if let PositionVerification::TimestampOnly { gap_ms } = verification {
        // A timestamp of 5_000_000 is asserted to yield a 5000 ms gap.
        assert_eq!(gap_ms, 5000, "gap_ms should be (timestamp - 0) / 1000");
    } else {
        panic!("Expected TimestampOnly verification, got {:?}", verification);
    }
}
1632
/// When the final trade of a batch closes a bar and nothing follows it, the
/// checkpoint taken afterwards must not carry an incomplete bar.
#[test]
fn test_checkpoint_clean_completion() {
    // The second trade moves the price from 50000 to 50100, which this test
    // expects to breach the threshold configured below.
    let breach_pair = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
        test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000),
    ];

    let mut processor = OpenDeviationBarProcessor::new(100).unwrap();
    let completed = processor.process_agg_trade_records(&breach_pair).unwrap();
    assert_eq!(completed.len(), 1, "Should have exactly one completed bar");

    let snapshot = processor.create_checkpoint("TEST");
    let carries_partial_bar = snapshot.has_incomplete_bar();

    assert!(
        !carries_partial_bar,
        "No incomplete bar when last trade was a breach with no following trade"
    );
}
1659
/// A trade arriving after a bar-closing breach must show up in the checkpoint
/// as an incomplete bar that opens at that trade's price.
#[test]
fn test_checkpoint_with_remainder() {
    // Trade 2 is expected to close the first bar; trade 3 is the remainder.
    let batch = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
        test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000),
        test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000),
    ];

    let mut processor = OpenDeviationBarProcessor::new(100).unwrap();
    let completed = processor.process_agg_trade_records(&batch).unwrap();
    assert_eq!(completed.len(), 1, "Should have exactly one completed bar");

    let snapshot = processor.create_checkpoint("TEST");
    assert!(
        snapshot.has_incomplete_bar(),
        "Should have incomplete bar from trade 3"
    );

    let partial_bar = snapshot.incomplete_bar.unwrap();
    let partial_open = partial_bar.open.to_string();
    assert_eq!(
        partial_open, "50110.00000000",
        "Incomplete bar should open at trade 3 price"
    );
}
1691
/// Batch processing (`process_agg_trade_records`) and trade-by-trade
/// processing (`process_single_trade`) must produce the same completed bars
/// and agree on the incomplete bar for one fixed trade sequence.
#[test]
fn test_streaming_batch_parity() {
    let threshold = 250;

    let trades = test_utils::AggTradeBuilder::new()
        .add_trade(1, 1.0, 0)
        .add_trade(2, 1.001, 1000)
        .add_trade(3, 1.003, 2000)
        .add_trade(4, 1.004, 3000)
        .add_trade(5, 1.005, 4000)
        .add_trade(6, 1.008, 5000)
        .add_trade(7, 1.009, 6000)
        .build();

    // Batch path: everything in one call.
    let mut batch_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
    let batch_incomplete = batch_processor.get_incomplete_bar();

    // Streaming path: one trade at a time, keeping every bar that closes.
    let mut stream_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let stream_bars: Vec<OpenDeviationBar> = trades
        .iter()
        .filter_map(|trade| stream_processor.process_single_trade(trade).unwrap())
        .collect();
    let stream_incomplete = stream_processor.get_incomplete_bar();

    assert_eq!(
        batch_bars.len(),
        stream_bars.len(),
        "Batch and streaming should produce same number of completed bars"
    );

    // Field-by-field comparison of each completed bar pair.
    for (i, (from_batch, from_stream)) in batch_bars.iter().zip(&stream_bars).enumerate() {
        assert_eq!(
            from_batch.open, from_stream.open,
            "Bar {i}: open price mismatch"
        );
        assert_eq!(
            from_batch.close, from_stream.close,
            "Bar {i}: close price mismatch"
        );
        assert_eq!(
            from_batch.high, from_stream.high,
            "Bar {i}: high price mismatch"
        );
        assert_eq!(
            from_batch.low, from_stream.low,
            "Bar {i}: low price mismatch"
        );
        assert_eq!(
            from_batch.volume, from_stream.volume,
            "Bar {i}: volume mismatch (double-counting?)"
        );
        assert_eq!(
            from_batch.open_time, from_stream.open_time,
            "Bar {i}: open_time mismatch"
        );
        assert_eq!(
            from_batch.close_time, from_stream.close_time,
            "Bar {i}: close_time mismatch"
        );
        assert_eq!(
            from_batch.individual_trade_count, from_stream.individual_trade_count,
            "Bar {i}: trade count mismatch"
        );
    }

    // Both paths must agree on whether a bar is still open, and on its content.
    match (batch_incomplete, stream_incomplete) {
        (None, None) => {}
        (Some(from_batch), Some(from_stream)) => {
            assert_eq!(
                from_batch.open, from_stream.open,
                "Incomplete bar: open mismatch"
            );
            assert_eq!(
                from_batch.close, from_stream.close,
                "Incomplete bar: close mismatch"
            );
            assert_eq!(
                from_batch.volume, from_stream.volume,
                "Incomplete bar: volume mismatch"
            );
        }
        _ => panic!("Incomplete bar presence mismatch between batch and streaming"),
    }
}
1781
1782 mod proptest_batch_streaming_parity {
1788 use super::*;
1789 use proptest::prelude::*;
1790
1791 fn trade_sequence(
1793 n: usize,
1794 base_price: f64,
1795 volatility: f64,
1796 ) -> Vec<AggTrade> {
1797 let mut trades = Vec::with_capacity(n);
1798 let mut price = base_price;
1799 let base_ts = 1640995200000i64; for i in 0..n {
1802 let step = ((i as f64 * 0.3).sin() * volatility)
1804 + ((i as f64 * 0.07).cos() * volatility * 0.5);
1805 price += step;
1806 if price < 100.0 {
1808 price = 100.0 + (i as f64 * 0.01).sin().abs() * 50.0;
1809 }
1810
1811 let trade = test_utils::create_test_agg_trade_with_range(
1812 i as i64 + 1,
1813 &format!("{:.8}", price),
1814 "1.50000000",
1815 base_ts + (i as i64 * 500), (i as i64 + 1) * 10,
1817 (i as i64 + 1) * 10,
1818 i % 3 != 0, );
1820 trades.push(trade);
1821 }
1822 trades
1823 }
1824
1825 fn assert_bar_parity(i: usize, batch: &OpenDeviationBar, stream: &OpenDeviationBar) {
1827 assert_eq!(batch.open_time, stream.open_time, "Bar {i}: open_time");
1829 assert_eq!(batch.close_time, stream.close_time, "Bar {i}: close_time");
1830 assert_eq!(batch.open, stream.open, "Bar {i}: open");
1831 assert_eq!(batch.high, stream.high, "Bar {i}: high");
1832 assert_eq!(batch.low, stream.low, "Bar {i}: low");
1833 assert_eq!(batch.close, stream.close, "Bar {i}: close");
1834
1835 assert_eq!(batch.volume, stream.volume, "Bar {i}: volume");
1837 assert_eq!(batch.turnover, stream.turnover, "Bar {i}: turnover");
1838 assert_eq!(batch.buy_volume, stream.buy_volume, "Bar {i}: buy_volume");
1839 assert_eq!(batch.sell_volume, stream.sell_volume, "Bar {i}: sell_volume");
1840 assert_eq!(batch.buy_turnover, stream.buy_turnover, "Bar {i}: buy_turnover");
1841 assert_eq!(batch.sell_turnover, stream.sell_turnover, "Bar {i}: sell_turnover");
1842
1843 assert_eq!(batch.individual_trade_count, stream.individual_trade_count, "Bar {i}: trade_count");
1845 assert_eq!(batch.agg_record_count, stream.agg_record_count, "Bar {i}: agg_record_count");
1846 assert_eq!(batch.first_trade_id, stream.first_trade_id, "Bar {i}: first_trade_id");
1847 assert_eq!(batch.last_trade_id, stream.last_trade_id, "Bar {i}: last_trade_id");
1848 assert_eq!(batch.first_agg_trade_id, stream.first_agg_trade_id, "Bar {i}: first_agg_trade_id");
1849 assert_eq!(batch.last_agg_trade_id, stream.last_agg_trade_id, "Bar {i}: last_agg_trade_id");
1850 assert_eq!(batch.buy_trade_count, stream.buy_trade_count, "Bar {i}: buy_trade_count");
1851 assert_eq!(batch.sell_trade_count, stream.sell_trade_count, "Bar {i}: sell_trade_count");
1852
1853 assert_eq!(batch.vwap, stream.vwap, "Bar {i}: vwap");
1855
1856 assert_eq!(batch.duration_us, stream.duration_us, "Bar {i}: duration_us");
1858 assert_eq!(batch.ofi.to_bits(), stream.ofi.to_bits(), "Bar {i}: ofi");
1859 assert_eq!(batch.vwap_close_deviation.to_bits(), stream.vwap_close_deviation.to_bits(), "Bar {i}: vwap_close_dev");
1860 assert_eq!(batch.price_impact.to_bits(), stream.price_impact.to_bits(), "Bar {i}: price_impact");
1861 assert_eq!(batch.kyle_lambda_proxy.to_bits(), stream.kyle_lambda_proxy.to_bits(), "Bar {i}: kyle_lambda");
1862 assert_eq!(batch.trade_intensity.to_bits(), stream.trade_intensity.to_bits(), "Bar {i}: trade_intensity");
1863 assert_eq!(batch.volume_per_trade.to_bits(), stream.volume_per_trade.to_bits(), "Bar {i}: vol_per_trade");
1864 assert_eq!(batch.aggression_ratio.to_bits(), stream.aggression_ratio.to_bits(), "Bar {i}: aggression_ratio");
1865 assert_eq!(batch.aggregation_density_f64.to_bits(), stream.aggregation_density_f64.to_bits(), "Bar {i}: agg_density");
1866 assert_eq!(batch.turnover_imbalance.to_bits(), stream.turnover_imbalance.to_bits(), "Bar {i}: turnover_imbalance");
1867 }
1868
1869 proptest! {
1870 #[test]
1872 fn batch_streaming_parity_random(
1873 n in 200usize..500,
1874 volatility in 10.0f64..200.0,
1875 ) {
1876 let trades = trade_sequence(n, 50000.0, volatility);
1877
1878 let mut batch_proc = OpenDeviationBarProcessor::new(250).unwrap();
1880 let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1881 let batch_incomplete = batch_proc.get_incomplete_bar();
1882
1883 let mut stream_proc = OpenDeviationBarProcessor::new(250).unwrap();
1885 let mut stream_bars: Vec<OpenDeviationBar> = Vec::new();
1886 for trade in &trades {
1887 if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1888 stream_bars.push(bar);
1889 }
1890 }
1891 let stream_incomplete = stream_proc.get_incomplete_bar();
1892
1893 prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1895 "Completed bar count mismatch: batch={}, stream={} for n={}, vol={}",
1896 batch_bars.len(), stream_bars.len(), n, volatility);
1897
1898 for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1900 assert_bar_parity(i, b, s);
1901 }
1902
1903 match (&batch_incomplete, &stream_incomplete) {
1905 (Some(b), Some(s)) => assert_bar_parity(batch_bars.len(), b, s),
1906 (None, None) => {}
1907 _ => prop_assert!(false,
1908 "Incomplete bar presence mismatch: batch={}, stream={}",
1909 batch_incomplete.is_some(), stream_incomplete.is_some()),
1910 }
1911 }
1912
1913 #[test]
1915 fn batch_streaming_parity_thresholds(
1916 threshold in 100u32..1000,
1917 ) {
1918 let trades = trade_sequence(300, 50000.0, 80.0);
1919
1920 let mut batch_proc = OpenDeviationBarProcessor::new(threshold).unwrap();
1921 let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
1922
1923 let mut stream_proc = OpenDeviationBarProcessor::new(threshold).unwrap();
1924 let mut stream_bars: Vec<OpenDeviationBar> = Vec::new();
1925 for trade in &trades {
1926 if let Some(bar) = stream_proc.process_single_trade(trade).unwrap() {
1927 stream_bars.push(bar);
1928 }
1929 }
1930
1931 prop_assert_eq!(batch_bars.len(), stream_bars.len(),
1932 "Bar count mismatch at threshold={}", threshold);
1933
1934 for (i, (b, s)) in batch_bars.iter().zip(stream_bars.iter()).enumerate() {
1935 assert_bar_parity(i, b, s);
1936 }
1937 }
1938 }
1939 }
1940
/// After a breach closes a bar, the breaching trade must not seed the next
/// bar: the next bar opens with the trade that follows.
#[test]
fn test_defer_open_new_bar_opens_with_next_trade() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Trade 1 opens the first bar and closes nothing.
    let opening_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let first_result = processor.process_single_trade(&opening_trade).unwrap();
    assert!(first_result.is_none());

    // Trade 2 is expected to breach and close the bar.
    let breaching_trade = test_utils::create_test_agg_trade(2, "50150.0", "2.0", 2000);
    let breach_result = processor.process_single_trade(&breaching_trade).unwrap();
    assert!(breach_result.is_some(), "Should close bar on breach");

    let closed_bar = breach_result.unwrap();
    assert_eq!(closed_bar.open.to_string(), "50000.00000000");
    assert_eq!(closed_bar.close.to_string(), "50150.00000000");

    let pending = processor.get_incomplete_bar();
    assert!(
        pending.is_none(),
        "No incomplete bar after breach - defer_open is true"
    );

    // Trade 3 opens the next bar at its own price.
    let following_trade = test_utils::create_test_agg_trade(3, "50100.0", "3.0", 3000);
    let follow_result = processor.process_single_trade(&following_trade).unwrap();
    assert!(follow_result.is_none());

    let reopened = processor.get_incomplete_bar().unwrap();
    assert_eq!(
        reopened.open.to_string(),
        "50100.00000000",
        "New bar should open at trade 3's price, not trade 2's"
    );
}
1976
/// Streams a single-breach scenario trade by trade: only the last trade may
/// close a bar, the closed bar must be internally consistent, and no
/// incomplete bar may remain afterwards.
#[test]
fn test_bar_close_take_single_trade() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let trades = scenarios::single_breach_sequence(250);

    // `split_last` separates the final trade from the ones before it without
    // manual index arithmetic, and fails with a clear message instead of a
    // usize underflow if the scenario is ever empty.
    let (breach_trade, leading_trades) = trades
        .split_last()
        .expect("single_breach_sequence should yield at least one trade");

    for trade in leading_trades {
        let result = processor.process_single_trade(trade).unwrap();
        assert!(result.is_none());
    }

    let bar = processor
        .process_single_trade(breach_trade)
        .unwrap()
        .expect("Should produce completed bar");

    assert_eq!(bar.open.to_string(), "50000.00000000");
    // High/low must bracket both open and close.
    assert!(bar.high >= bar.open.max(bar.close));
    assert!(bar.low <= bar.open.min(bar.close));
    assert!(bar.volume > 0);

    assert!(processor.get_incomplete_bar().is_none());
}
2006
/// Batch-processes a large scenario and checks basic consistency of every
/// completed bar (high/low bracket open/close, positive volume, ordered
/// times).
#[test]
fn test_bar_close_take_batch() {
    let trades = scenarios::large_sequence(500);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert!(
        !completed.is_empty(),
        "Should produce at least one completed bar"
    );

    completed.iter().for_each(|bar| {
        assert!(bar.high >= bar.open.max(bar.close));
        assert!(bar.low <= bar.open.min(bar.close));
        assert!(bar.volume > 0);
        assert!(bar.close_time >= bar.open_time);
    });
}
2028
/// For a sequence with no breach, the strict call returns no bars and the
/// `_with_incomplete` call returns one, while both processors keep the same
/// incomplete bar state.
#[test]
fn test_checkpoint_conditional_clone() {
    let trades = scenarios::no_breach_sequence(250);

    // Strict path: nothing completes, but the open bar is retained.
    let mut strict_processor = OpenDeviationBarProcessor::new(250).unwrap();
    let strict_bars = strict_processor
        .process_agg_trade_records(&trades)
        .unwrap();
    assert_eq!(strict_bars.len(), 0);
    assert!(strict_processor.get_incomplete_bar().is_some());

    // Inclusive path: the open bar is returned and still retained.
    let mut inclusive_processor = OpenDeviationBarProcessor::new(250).unwrap();
    let inclusive_bars = inclusive_processor
        .process_agg_trade_records_with_incomplete(&trades)
        .unwrap();
    assert_eq!(inclusive_bars.len(), 1);
    assert!(inclusive_processor.get_incomplete_bar().is_some());

    let strict_state = strict_processor.get_incomplete_bar().unwrap();
    let inclusive_state = inclusive_processor.get_incomplete_bar().unwrap();
    assert_eq!(strict_state.open, inclusive_state.open);
    assert_eq!(strict_state.close, inclusive_state.close);
    assert_eq!(strict_state.high, inclusive_state.high);
    assert_eq!(strict_state.low, inclusive_state.low);
}
2059
/// A checkpoint JSON without a `version` field must deserialize as version 1,
/// restore into a working processor, and produce version 2 checkpoints that
/// survive a JSON round trip.
#[test]
fn test_checkpoint_v1_to_v2_migration() {
    // Note: no "version" key — that is what makes this an old-style checkpoint.
    let v1_json = r#"{
        "symbol": "BTCUSDT",
        "threshold_decimal_bps": 250,
        "incomplete_bar": null,
        "thresholds": null,
        "last_timestamp_us": 1640995200000000,
        "last_trade_id": 5000,
        "price_hash": 0,
        "anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0},
        "prevent_same_timestamp_close": true,
        "defer_open": false
    }"#;

    let checkpoint: Checkpoint = serde_json::from_str(v1_json).unwrap();
    assert_eq!(checkpoint.version, 1, "Old checkpoints should default to v1");
    assert_eq!(checkpoint.symbol, "BTCUSDT");
    assert_eq!(checkpoint.threshold_decimal_bps, 250);

    let mut processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();

    // `is_none()` states the intent directly and matches the sibling tests.
    assert!(
        processor.get_incomplete_bar().is_none(),
        "No incomplete bar before processing"
    );

    // The restored processor must still be able to build bars.
    let trades = scenarios::single_breach_sequence(250);
    let bars = processor.process_agg_trade_records(&trades).unwrap();

    assert!(!bars.is_empty(), "Should produce bars after v1→v2 migration");
    assert!(bars[0].volume > 0, "Bar should have volume after migration");
    assert!(
        bars[0].close_time >= bars[0].open_time,
        "Bar times should be valid"
    );

    // Checkpoints created from the restored processor are written as v2.
    let new_checkpoint = processor.create_checkpoint("BTCUSDT");
    assert_eq!(new_checkpoint.version, 2, "New checkpoints should be v2");
    assert_eq!(new_checkpoint.symbol, "BTCUSDT");

    // JSON round trip preserves version and symbol.
    let json = serde_json::to_string(&new_checkpoint).unwrap();
    let restored: Checkpoint = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.version, 2);
    assert_eq!(restored.symbol, "BTCUSDT");
}
2110
/// Restoring from a checkpoint whose threshold is 0 must fail with
/// `CheckpointError::InvalidThreshold`.
#[test]
fn test_from_checkpoint_invalid_threshold_zero() {
    let zero_threshold_checkpoint =
        Checkpoint::new("BTCUSDT".to_string(), 0, None, None, 0, None, 0, true);

    let outcome = OpenDeviationBarProcessor::from_checkpoint(zero_threshold_checkpoint);

    let rejected_as_expected = matches!(
        outcome,
        Err(CheckpointError::InvalidThreshold { threshold: 0, .. })
    );
    if !rejected_as_expected {
        panic!("Expected InvalidThreshold(0), got {:?}", outcome.err());
    }
}
2125
/// Restoring from a checkpoint whose threshold is 200_000 must fail with
/// `CheckpointError::InvalidThreshold`.
#[test]
fn test_from_checkpoint_invalid_threshold_too_high() {
    let oversized_threshold_checkpoint = Checkpoint::new(
        "BTCUSDT".to_string(),
        200_000,
        None,
        None,
        0,
        None,
        0,
        true,
    );

    let outcome = OpenDeviationBarProcessor::from_checkpoint(oversized_threshold_checkpoint);

    let rejected_as_expected = matches!(
        outcome,
        Err(CheckpointError::InvalidThreshold {
            threshold: 200_000,
            ..
        })
    );
    if !rejected_as_expected {
        panic!("Expected InvalidThreshold(200000), got {:?}", outcome.err());
    }
}
2136
/// A checkpoint that carries an incomplete bar but no thresholds must be
/// rejected with `CheckpointError::MissingThresholds`.
#[test]
fn test_from_checkpoint_missing_thresholds() {
    let seed_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let partial_bar = OpenDeviationBar::new(&seed_trade);

    // Build a bar-less checkpoint, then create the inconsistent state by hand.
    let mut checkpoint =
        Checkpoint::new("BTCUSDT".to_string(), 250, None, None, 0, None, 0, true);
    checkpoint.incomplete_bar = Some(partial_bar);
    checkpoint.thresholds = None;

    let outcome = OpenDeviationBarProcessor::from_checkpoint(checkpoint);
    if !matches!(outcome, Err(CheckpointError::MissingThresholds)) {
        panic!("Expected MissingThresholds, got {:?}", outcome.err());
    }
}
2151
/// A checkpoint with an unrecognised version number (99) must still restore
/// successfully and keep its configured threshold.
#[test]
fn test_from_checkpoint_unknown_version_treated_as_v2() {
    let mut future_checkpoint =
        Checkpoint::new("BTCUSDT".to_string(), 250, None, None, 0, None, 0, true);
    future_checkpoint.version = 99;

    let restored = OpenDeviationBarProcessor::from_checkpoint(future_checkpoint).unwrap();
    let restored_threshold = restored.threshold_decimal_bps();
    assert_eq!(restored_threshold, 250);
}
2162
/// A checkpoint that carries both an incomplete bar and its thresholds must
/// restore with that bar still pending.
#[test]
fn test_from_checkpoint_valid_with_incomplete_bar() {
    use crate::fixed_point::FixedPoint;

    let seed_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let partial_bar = OpenDeviationBar::new(&seed_trade);

    // Upper/lower bounds sit 125.0 either side of the 50000.0 open.
    let upper = FixedPoint::from_str("50125.0").unwrap();
    let lower = FixedPoint::from_str("49875.0").unwrap();
    let bounds = (upper, lower);

    let checkpoint = Checkpoint::new(
        "BTCUSDT".to_string(),
        250,
        Some(partial_bar),
        Some(bounds),
        0,
        None,
        0,
        true,
    );

    let restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
    let pending = restored.get_incomplete_bar();
    assert!(pending.is_some(), "Should restore incomplete bar");
}
2177
/// `reset_at_ouroboros` on a processor with an open bar must hand that bar
/// back as an orphan and leave no incomplete bar behind.
#[test]
fn test_reset_at_ouroboros_with_orphan() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Two trades that stay inside the threshold, so the bar remains open.
    let in_range_trades = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        test_utils::create_test_agg_trade(2, "50050.0", "1.0", 2000),
    ];
    for trade in &in_range_trades {
        assert!(processor.process_single_trade(trade).unwrap().is_none());
    }
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have incomplete bar"
    );

    let orphan = processor.reset_at_ouroboros();
    assert!(orphan.is_some(), "Should return orphaned bar");
    let orphan_bar = orphan.unwrap();
    assert_eq!(orphan_bar.open.to_string(), "50000.00000000");

    assert!(
        processor.get_incomplete_bar().is_none(),
        "No bar after reset"
    );
}
2202
/// `reset_at_ouroboros` on a processor that never saw a trade returns no
/// orphan and leaves the processor without an incomplete bar.
#[test]
fn test_reset_at_ouroboros_clean_state() {
    let mut untouched_processor = OpenDeviationBarProcessor::new(250).unwrap();

    let orphan = untouched_processor.reset_at_ouroboros();

    assert!(orphan.is_none(), "No orphan when state is clean");
    let pending = untouched_processor.get_incomplete_bar();
    assert!(pending.is_none());
}
2212
/// After a breach followed by `reset_at_ouroboros`, the next trade must open
/// a fresh bar.
#[test]
fn test_reset_at_ouroboros_clears_defer_open() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let opening_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let breaching_trade = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000);

    processor.process_single_trade(&opening_trade).unwrap();
    let breach_result = processor.process_single_trade(&breaching_trade).unwrap();
    assert!(breach_result.is_some(), "Should breach");

    // Nothing is pending between the breach and the reset.
    assert!(processor.get_incomplete_bar().is_none());

    processor.reset_at_ouroboros();

    let post_reset_trade = test_utils::create_test_agg_trade(3, "50000.0", "1.0", 3000);
    processor.process_single_trade(&post_reset_trade).unwrap();
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have new bar after reset"
    );
}
2235
/// A batch holding a single trade completes no bar but leaves one open.
#[test]
fn test_single_trade_no_bar() {
    let lone_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&[lone_trade]).unwrap();

    assert_eq!(
        completed.len(),
        0,
        "Single trade should not produce a completed bar"
    );
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have incomplete bar"
    );
}
2247
/// A price move large enough to breach must not close the bar when it shares
/// the timestamp of the trade that opened it.
#[test]
fn test_identical_timestamps_no_close() {
    // Both trades carry timestamp 1000.
    let same_instant_trades = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1000),
    ];

    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = processor
        .process_agg_trade_records(&same_instant_trades)
        .unwrap();

    assert_eq!(
        completed.len(),
        0,
        "Bar should not close on same timestamp as open (Issue #36)"
    );
}
2257
/// Trades sharing the open timestamp keep the bar open; a breach at a later
/// timestamp closes it.
#[test]
fn test_identical_timestamps_then_different_closes() {
    let trades = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        // Same timestamp as the open.
        test_utils::create_test_agg_trade(2, "50050.0", "1.0", 1000),
        // Later timestamp, larger move.
        test_utils::create_test_agg_trade(3, "50200.0", "1.0", 2000),
    ];

    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = processor.process_agg_trade_records(&trades).unwrap();

    assert_eq!(
        completed.len(),
        1,
        "Should close when breach at different timestamp"
    );
}
2268
/// In streaming mode the trade after a breach opens a new bar at its own
/// price instead of being measured against the bar that just closed.
#[test]
fn test_streaming_defer_open_semantics() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let opening_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let breaching_trade = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000);
    let far_away_trade = test_utils::create_test_agg_trade(3, "51000.0", "1.0", 3000);

    processor.process_single_trade(&opening_trade).unwrap();
    let breach_result = processor.process_single_trade(&breaching_trade).unwrap();
    assert!(breach_result.is_some(), "Trade 2 should cause a breach");

    assert!(processor.get_incomplete_bar().is_none());

    // Despite the large jump, trade 3 only opens a bar; it closes nothing.
    let follow_result = processor.process_single_trade(&far_away_trade).unwrap();
    assert!(
        follow_result.is_none(),
        "Trade 3 should open new bar, not breach"
    );

    let reopened = processor.get_incomplete_bar().unwrap();
    assert_eq!(
        reopened.open.to_f64(),
        51000.0,
        "New bar should open at t3 price"
    );
}
2290
/// Processing an empty batch leaves the processor untouched; a subsequent
/// single-trade batch then opens a bar as usual.
#[test]
fn test_process_empty_then_trades() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Empty input: no bars, no state.
    let from_empty = processor.process_agg_trade_records(&[]).unwrap();
    assert_eq!(from_empty.len(), 0);
    assert!(processor.get_incomplete_bar().is_none());

    // One trade: still no completed bar, but a bar is now open.
    let lone_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let from_single = processor.process_agg_trade_records(&[lone_trade]).unwrap();
    assert_eq!(from_single.len(), 0);
    assert!(processor.get_incomplete_bar().is_some());
}
2305
/// A single batch containing two breaches must yield two completed bars.
#[test]
fn test_multiple_breaches_in_batch() {
    let batch = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000),
        test_utils::create_test_agg_trade(3, "50500.0", "1.0", 3000),
        test_utils::create_test_agg_trade(4, "50700.0", "1.0", 4000),
        test_utils::create_test_agg_trade(5, "51000.0", "1.0", 5000),
    ];

    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = processor.process_agg_trade_records(&batch).unwrap();

    assert_eq!(
        completed.len(),
        2,
        "Should produce 2 completed bars from 2 breaches"
    );
}
2320
/// Batch/streaming parity on a 20-trade zigzag price path that is expected to
/// produce at least three completed bars.
#[test]
fn test_streaming_batch_parity_extended() {
    let threshold = 100;

    // Zigzag: multiply the running price by a factor chosen from the trade
    // index, then emit a trade at the resulting price.
    let mut price = 50000.0;
    let trades: Vec<_> = (0..20)
        .map(|i| {
            let factor = match (i % 3, i) {
                (0, idx) if idx > 0 => 1.002,
                (1, idx) if idx > 1 => 0.998,
                _ => 1.0005,
            };
            price *= factor;
            test_utils::create_test_agg_trade(
                (i + 1) as i64,
                &format!("{:.8}", price),
                "1.0",
                (i as i64 + 1) * 1000,
            )
        })
        .collect();

    // Batch path.
    let mut batch_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();

    // Streaming path.
    let mut stream_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let stream_bars: Vec<OpenDeviationBar> = trades
        .iter()
        .filter_map(|trade| stream_processor.process_single_trade(trade).unwrap())
        .collect();

    assert!(
        batch_bars.len() >= 3,
        "Should produce at least 3 bars from zigzag pattern"
    );
    assert_eq!(
        batch_bars.len(),
        stream_bars.len(),
        "Batch ({}) and streaming ({}) bar count mismatch",
        batch_bars.len(),
        stream_bars.len()
    );

    for (i, (b, s)) in batch_bars.iter().zip(&stream_bars).enumerate() {
        assert_eq!(b.open, s.open, "Bar {i}: open mismatch");
        assert_eq!(b.close, s.close, "Bar {i}: close mismatch");
        assert_eq!(b.high, s.high, "Bar {i}: high mismatch");
        assert_eq!(b.low, s.low, "Bar {i}: low mismatch");
        assert_eq!(b.volume, s.volume, "Bar {i}: volume mismatch");
        assert_eq!(b.open_time, s.open_time, "Bar {i}: open_time mismatch");
        assert_eq!(b.close_time, s.close_time, "Bar {i}: close_time mismatch");
        assert_eq!(
            b.individual_trade_count, s.individual_trade_count,
            "Bar {i}: trade_count mismatch"
        );
    }

    // Both paths must also agree on the trailing incomplete bar.
    let batch_inc = batch_processor.get_incomplete_bar();
    let stream_inc = stream_processor.get_incomplete_bar();
    match (&batch_inc, &stream_inc) {
        (None, None) => {}
        (Some(b), Some(s)) => {
            assert_eq!(b.open, s.open, "Incomplete: open mismatch");
            assert_eq!(b.volume, s.volume, "Incomplete: volume mismatch");
        }
        _ => panic!("Incomplete bar presence mismatch"),
    }
}
2391
/// Feeds three consecutive batches through one processor and checks that
/// state carries over: at least three bars complete, close times never go
/// backwards, and aggregate trade IDs are contiguous across bars.
#[test]
fn test_multi_batch_sequential_state_continuity() {
    let mut processor = OpenDeviationBarProcessor::new(100).unwrap();

    let batches = [
        vec![
            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
            test_utils::create_test_agg_trade(2, "50020.0", "1.0", 2000),
            test_utils::create_test_agg_trade(3, "50060.0", "1.0", 3000),
        ],
        vec![
            test_utils::create_test_agg_trade(4, "50100.0", "1.0", 4000),
            test_utils::create_test_agg_trade(5, "50120.0", "1.0", 5000),
            test_utils::create_test_agg_trade(6, "50170.0", "1.0", 6000),
        ],
        vec![
            test_utils::create_test_agg_trade(7, "50200.0", "1.0", 7000),
            test_utils::create_test_agg_trade(8, "50220.0", "1.0", 8000),
            test_utils::create_test_agg_trade(9, "50280.0", "1.0", 9000),
        ],
    ];

    // Process the batches in order on the same processor, collecting all bars.
    let mut all_bars = Vec::new();
    for batch in &batches {
        let completed = processor.process_agg_trade_records(batch).unwrap();
        all_bars.extend(completed);
    }

    assert!(
        all_bars.len() >= 3,
        "Expected at least 3 bars from 3 batches, got {}",
        all_bars.len()
    );

    // Close times must be non-decreasing from one bar to the next.
    for (offset, pair) in all_bars.windows(2).enumerate() {
        let i = offset + 1;
        let (previous, current) = (&pair[0], &pair[1]);
        assert!(
            current.close_time >= previous.close_time,
            "Bar {i}: close_time {} < previous {}",
            current.close_time,
            previous.close_time
        );
    }

    // Each bar must start at the aggregate trade ID right after the previous
    // bar's last one.
    for (offset, pair) in all_bars.windows(2).enumerate() {
        let i = offset + 1;
        let (previous, current) = (&pair[0], &pair[1]);
        assert_eq!(
            current.first_agg_trade_id,
            previous.last_agg_trade_id + 1,
            "Bar {i}: trade ID gap (first={}, prev last={})",
            current.first_agg_trade_id,
            previous.last_agg_trade_id
        );
    }
}
2454
/// With `prevent_same_timestamp_close` set, a run of trades that all share
/// one timestamp must not close a bar, however far the price moves.
#[test]
fn test_same_timestamp_prevents_bar_close() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    processor.prevent_same_timestamp_close = true;

    // Five trades at timestamp 1000000: the first at 50000.0, the rest
    // stepping up by 200 per index starting from 50400.0.
    let trades: Vec<AggTrade> = (0..5)
        .map(|i| {
            let price_str = match i {
                0 => "50000.0".to_string(),
                _ => format!("{}.0", 50000 + (i + 1) * 200),
            };
            let id = i as i64;
            AggTrade {
                agg_trade_id: id,
                price: FixedPoint::from_str(&price_str).unwrap(),
                volume: FixedPoint::from_str("1.0").unwrap(),
                first_trade_id: id,
                last_trade_id: id,
                timestamp: 1000000,
                is_buyer_maker: false,
                is_best_match: None,
            }
        })
        .collect();

    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(
        completed.len(),
        0,
        "Same timestamp should prevent bar close (Issue #36)"
    );
}
2489
/// One trade never completes a bar; the `_with_incomplete` variant returns
/// the open bar instead, with open == close and high == low.
#[test]
fn test_single_trade_incomplete_bar() {
    let lone_trade = AggTrade {
        agg_trade_id: 1,
        price: FixedPoint::from_str("50000.0").unwrap(),
        volume: FixedPoint::from_str("10.0").unwrap(),
        first_trade_id: 1,
        last_trade_id: 1,
        timestamp: 1000000,
        is_buyer_maker: false,
        is_best_match: None,
    };

    // Strict call: no completed bars.
    let mut strict_processor = OpenDeviationBarProcessor::new(250).unwrap();
    let strict_bars = strict_processor
        .process_agg_trade_records(&[lone_trade.clone()])
        .unwrap();
    assert_eq!(strict_bars.len(), 0, "Single trade cannot complete a bar");

    // Inclusive call on a fresh processor: the open bar is returned.
    let mut inclusive_processor = OpenDeviationBarProcessor::new(250).unwrap();
    let inclusive_bars = inclusive_processor
        .process_agg_trade_records_with_incomplete(&[lone_trade])
        .unwrap();
    assert_eq!(inclusive_bars.len(), 1, "Should return 1 incomplete bar");

    let only_bar = &inclusive_bars[0];
    assert_eq!(only_bar.open, only_bar.close);
    assert_eq!(only_bar.high, only_bar.low);
}
2518
/// With the same-timestamp gate switched off via `with_options(.., false)`,
/// a breach that shares the opening trade's timestamp closes the bar.
#[test]
fn test_with_options_gate_disabled_same_timestamp_closes() {
    let mut processor = OpenDeviationBarProcessor::with_options(250, false).unwrap();
    assert!(!processor.prevent_same_timestamp_close());

    // Both trades share timestamp 1000000; only the ID and price differ.
    let trade_at = |id: i64, price: &str| AggTrade {
        agg_trade_id: id,
        price: FixedPoint::from_str(price).unwrap(),
        volume: FixedPoint::from_str("1.0").unwrap(),
        first_trade_id: id,
        last_trade_id: id,
        timestamp: 1000000,
        is_buyer_maker: false,
        is_best_match: None,
    };
    let trades = vec![trade_at(1, "50000.0"), trade_at(2, "50200.0")];

    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(
        completed.len(),
        1,
        "Gate disabled: same-timestamp breach should close bar"
    );
}
2545
/// Inter-bar features are off for a default processor and on after
/// `with_inter_bar_config`.
#[test]
fn test_inter_bar_config_enables_features() {
    use crate::interbar::LookbackMode;

    let default_processor = OpenDeviationBarProcessor::new(250).unwrap();
    assert!(
        !default_processor.inter_bar_enabled(),
        "Default: inter-bar disabled"
    );

    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: false,
        compute_tier3: false,
        ..Default::default()
    };
    let configured_processor = default_processor.with_inter_bar_config(config);
    assert!(
        configured_processor.inter_bar_enabled(),
        "After config: inter-bar enabled"
    );
}
2560
/// Intra-bar features are off for a default processor and on after
/// `with_intra_bar_features`.
#[test]
fn test_intra_bar_feature_toggle() {
    let default_processor = OpenDeviationBarProcessor::new(250).unwrap();
    assert!(
        !default_processor.intra_bar_enabled(),
        "Default: intra-bar disabled"
    );

    let toggled_processor = default_processor.with_intra_bar_features();
    assert!(
        toggled_processor.intra_bar_enabled(),
        "After toggle: intra-bar enabled"
    );
}
2569
/// `set_inter_bar_config` enables inter-bar features on a processor that was
/// constructed without them.
#[test]
fn test_set_inter_bar_config_after_construction() {
    use crate::interbar::LookbackMode;

    let mut processor = OpenDeviationBarProcessor::new(500).unwrap();
    assert!(!processor.inter_bar_enabled());

    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(200),
        compute_tier2: true,
        compute_tier3: false,
        ..Default::default()
    };
    processor.set_inter_bar_config(config);

    assert!(
        processor.inter_bar_enabled(),
        "set_inter_bar_config should enable"
    );
}
2584
/// Asking `process_agg_trade_records_with_options` to include the incomplete
/// bar can never return fewer bars than the strict call on the same input.
#[test]
fn test_process_with_options_incomplete_false_vs_true() {
    let trades = scenarios::single_breach_sequence(250);

    let mut strict_processor = OpenDeviationBarProcessor::new(250).unwrap();
    let bars_strict = strict_processor
        .process_agg_trade_records_with_options(&trades, false)
        .unwrap();

    let mut inclusive_processor = OpenDeviationBarProcessor::new(250).unwrap();
    let bars_incl = inclusive_processor
        .process_agg_trade_records_with_options(&trades, true)
        .unwrap();

    let (inclusive_count, strict_count) = (bars_incl.len(), bars_strict.len());
    assert!(
        inclusive_count >= strict_count,
        "inclusive ({}) must be >= strict ({})",
        inclusive_count,
        strict_count
    );
}
2602
/// A brand-new processor exposes an anomaly summary with every counter at
/// zero.
#[test]
fn test_anomaly_summary_default_no_anomalies() {
    let fresh = OpenDeviationBarProcessor::new(250).unwrap();
    let counts = fresh.anomaly_summary();

    // Individual counters first, then the aggregate views.
    assert_eq!(counts.gaps_detected, 0);
    assert_eq!(counts.overlaps_detected, 0);
    assert_eq!(counts.timestamp_anomalies, 0);

    assert!(!counts.has_anomalies());
    assert_eq!(counts.total(), 0);
}
2615
/// After processing the single-breach scenario, a checkpoint round-trip
/// yields a processor whose anomaly total is zero.
#[test]
fn test_anomaly_summary_preserved_through_checkpoint() {
    let mut source = OpenDeviationBarProcessor::new(250).unwrap();
    source
        .process_agg_trade_records(&scenarios::single_breach_sequence(250))
        .unwrap();

    let snapshot = source.create_checkpoint("TEST");
    let revived = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();

    assert_eq!(revived.anomaly_summary().total(), 0);
}
2629
/// A checkpoint deserialized from JSON with non-zero anomaly counters hands
/// those counters to the processor restored from it.
#[test]
fn test_anomaly_summary_from_checkpoint_with_anomalies() {
    // Hand-written checkpoint document carrying 3 gaps, 1 overlap and
    // 2 timestamp anomalies (6 in total).
    let checkpoint_json = r#"{
        "version": 3,
        "symbol": "TESTUSDT",
        "threshold_decimal_bps": 250,
        "prevent_same_timestamp_close": true,
        "defer_open": false,
        "current_bar": null,
        "thresholds": null,
        "last_timestamp_us": 1000000,
        "last_trade_id": 5,
        "price_hash": 0,
        "anomaly_summary": {"gaps_detected": 3, "overlaps_detected": 1, "timestamp_anomalies": 2}
    }"#;

    let parsed: crate::checkpoint::Checkpoint = serde_json::from_str(checkpoint_json).unwrap();
    let revived = OpenDeviationBarProcessor::from_checkpoint(parsed).unwrap();
    let counts = revived.anomaly_summary();

    assert_eq!(counts.gaps_detected, 3);
    assert_eq!(counts.overlaps_detected, 1);
    assert_eq!(counts.timestamp_anomalies, 2);
    assert!(counts.has_anomalies());
    assert_eq!(counts.total(), 6);
}
2655
/// Two processors with different thresholds can be built with the same
/// config and the same global entropy cache handle; both report inter-bar
/// features as enabled.
#[test]
fn test_with_inter_bar_config_and_cache_shared() {
    use crate::entropy_cache_global::get_global_entropy_cache;
    use crate::interbar::LookbackMode;

    let shared_cache = get_global_entropy_cache();
    let tiered_config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: true,
        compute_tier3: true,
        ..Default::default()
    };

    // The first processor gets clones so the originals can move into the second.
    let first = OpenDeviationBarProcessor::new(250)
        .unwrap()
        .with_inter_bar_config_and_cache(tiered_config.clone(), Some(shared_cache.clone()));
    let second = OpenDeviationBarProcessor::new(500)
        .unwrap()
        .with_inter_bar_config_and_cache(tiered_config, Some(shared_cache));

    assert!(first.inter_bar_enabled());
    assert!(second.inter_bar_enabled());
}
2678
/// A processor restored from a checkpoint starts with inter-bar features
/// disabled; `set_inter_bar_config_with_cache` switches them back on.
#[test]
fn test_set_inter_bar_config_with_cache_after_checkpoint() {
    use crate::entropy_cache_global::get_global_entropy_cache;
    use crate::interbar::LookbackMode;

    // Produce a checkpoint from a processor that has seen some trades.
    let mut source = OpenDeviationBarProcessor::new(250).unwrap();
    let trades = scenarios::single_breach_sequence(250);
    source.process_agg_trade_records(&trades).unwrap();
    let snapshot = source.create_checkpoint("TEST");

    let mut revived = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();
    assert!(!revived.inter_bar_enabled(), "Checkpoint does not preserve inter-bar config");

    let shared_cache = get_global_entropy_cache();
    let minimal_config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: false,
        compute_tier3: false,
        ..Default::default()
    };
    revived.set_inter_bar_config_with_cache(minimal_config, Some(shared_cache));

    assert!(revived.inter_bar_enabled(), "set_inter_bar_config_with_cache should re-enable");
}
2706
/// `threshold_decimal_bps()` returns the value the processor was built with.
#[test]
fn test_threshold_decimal_bps_getter() {
    // Same two thresholds as before, checked in the same order.
    for bps in [250_u32, 1000] {
        let processor = OpenDeviationBarProcessor::new(bps).unwrap();
        assert_eq!(processor.threshold_decimal_bps(), bps);
    }
}
2715
/// Resuming from a checkpoint that holds a forming bar, then feeding trades
/// timestamped two hours later, is expected to complete no bars and to
/// record exactly one gap in the anomaly summary.
#[test]
fn test_checkpoint_gap_discards_forming_bar() {
    let mut source = OpenDeviationBarProcessor::new(250).unwrap();

    // Three trades one second apart with small price moves; the test expects
    // no bar to complete from them.
    let warmup_trades = vec![
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000),
        test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640995202_000_000),
    ];

    let completed = source.process_agg_trade_records(&warmup_trades).unwrap();
    assert_eq!(completed.len(), 0, "No breach = no completed bars");

    let snapshot = source.create_checkpoint("BTCUSDT");
    assert!(snapshot.has_incomplete_bar(), "Should have forming bar");

    let mut revived = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();

    // First post-restore trade is 7200 s after the last checkpointed trade.
    let post_gap_trades = vec![
        test_utils::create_test_agg_trade(4, "50030.0", "1.0", 1641002402_000_000),
        test_utils::create_test_agg_trade(5, "50040.0", "1.0", 1641002403_000_000),
    ];

    let completed = revived.process_agg_trade_records(&post_gap_trades).unwrap();
    assert_eq!(completed.len(), 0, "No bars should complete — forming bar was discarded");
    assert_eq!(
        revived.anomaly_summary().gaps_detected, 1,
        "Gap should be recorded in anomaly summary"
    );
}
2756
/// Resuming from a checkpoint and feeding trades 30 minutes later is
/// expected to continue the forming bar: it completes on the later breach,
/// keeps its original open time, and no gap is recorded.
#[test]
fn test_checkpoint_small_gap_continues_bar() {
    let mut source = OpenDeviationBarProcessor::new(250).unwrap();

    let warmup_trades = vec![
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000),
    ];
    let _ = source.process_agg_trade_records(&warmup_trades).unwrap();

    let snapshot = source.create_checkpoint("BTCUSDT");
    let mut revived = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();

    // Trade 3 arrives 1800 s after trade 2; trade 4 is the one the test
    // expects to close the bar.
    let resumed_trades = vec![
        test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640997001_000_000),
        test_utils::create_test_agg_trade(4, "50125.01", "1.0", 1640997002_000_000),
    ];

    let completed = revived.process_agg_trade_records(&resumed_trades).unwrap();
    assert_eq!(completed.len(), 1, "Bar should complete normally with small gap");
    // The bar opened at the very first pre-checkpoint trade.
    assert_eq!(completed[0].open_time, 1640995200_000_000);
    assert_eq!(
        revived.anomaly_summary().gaps_detected, 0,
        "No gap anomaly for small gap"
    );
}
2786
/// With the maximum gap lowered to 1_800_000_000 via `with_max_gap`, a
/// trade arriving 45 minutes after the checkpointed one is expected to be
/// counted as a gap.
#[test]
fn test_checkpoint_gap_custom_max_gap() {
    let mut source = OpenDeviationBarProcessor::new(250).unwrap();

    let opening_trade = vec![
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
    ];
    let _ = source.process_agg_trade_records(&opening_trade).unwrap();
    let snapshot = source.create_checkpoint("BTCUSDT");

    let restored_default = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();
    let mut revived = restored_default.with_max_gap(1_800_000_000);

    // 2700 s after the opening trade.
    let late_trade = vec![
        test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640997900_000_000),
    ];
    let _ = revived.process_agg_trade_records(&late_trade).unwrap();

    assert_eq!(
        revived.anomaly_summary().gaps_detected, 1,
        "45-min gap should be detected with 30-min threshold"
    );
}
2814
/// Checks `OpenDeviationBar::is_valid_range` with a multiplier of 2 against
/// three bars opened at 50000.0: one whose high is past twice the threshold
/// (rejected), one well inside it, and one sitting at the threshold itself
/// (both accepted).
#[test]
fn test_is_valid_range_rejects_oversized() {
    use crate::fixed_point::FixedPoint;

    let threshold_decimal_bps: u32 = 250;
    let threshold_ratio = (i64::from(threshold_decimal_bps) * crate::fixed_point::SCALE)
        / (crate::fixed_point::BASIS_POINTS_SCALE as i64);

    // Default bar with only open/high/low overridden from decimal strings.
    let bar_spanning = |open: &str, high: &str, low: &str| {
        let mut bar = OpenDeviationBar::default();
        bar.open = FixedPoint::from_str(open).unwrap();
        bar.high = FixedPoint::from_str(high).unwrap();
        bar.low = FixedPoint::from_str(low).unwrap();
        bar
    };

    let oversized = bar_spanning("50000.0", "50250.01", "50000.0");
    assert!(
        !oversized.is_valid_range(threshold_ratio, 2),
        "Bar exceeding 2x threshold should be invalid"
    );

    let valid = bar_spanning("50000.0", "50100.0", "50000.0");
    assert!(
        valid.is_valid_range(threshold_ratio, 2),
        "Bar within threshold should be valid"
    );

    let exact = bar_spanning("50000.0", "50125.0", "50000.0");
    assert!(
        exact.is_valid_range(threshold_ratio, 2),
        "Bar at exact threshold should be valid"
    );
}
2854
/// When the checkpoint holds no forming bar, a much later trade after
/// restore is expected not to increment the gap counter.
#[test]
fn test_checkpoint_no_incomplete_bar_gap_is_noop() {
    let mut source = OpenDeviationBarProcessor::new(250).unwrap();

    // The second trade is expected to close the bar opened by the first.
    let closing_pair = vec![
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1640995201_000_000),
    ];
    let completed = source.process_agg_trade_records(&closing_pair).unwrap();
    assert_eq!(completed.len(), 1);

    let snapshot = source.create_checkpoint("BTCUSDT");
    assert!(!snapshot.has_incomplete_bar());

    let mut revived = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();

    // Roughly a day (86399 s) after the last checkpointed trade.
    let late_trade = vec![
        test_utils::create_test_agg_trade(3, "50010.0", "1.0", 1641081600_000_000),
    ];
    let _ = revived.process_agg_trade_records(&late_trade).unwrap();

    assert_eq!(
        revived.anomaly_summary().gaps_detected, 0,
        "No gap anomaly when no forming bar exists"
    );
}
2883}