1use crate::checkpoint::{
7 AnomalySummary, Checkpoint, CheckpointError, PositionVerification, PriceWindow,
8};
9use crate::fixed_point::FixedPoint;
10use crate::types::{AggTrade, RangeBar};
11#[cfg(feature = "python")]
12use pyo3::prelude::*;
13use thiserror::Error;
14
15pub struct RangeBarProcessor {
17 threshold_decimal_bps: u32,
19
20 current_bar_state: Option<RangeBarState>,
23
24 price_window: PriceWindow,
26
27 last_trade_id: Option<i64>,
29
30 last_timestamp_us: i64,
32
33 anomaly_summary: AnomalySummary,
35
36 resumed_from_checkpoint: bool,
39}
40
41impl RangeBarProcessor {
42 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
56 if threshold_decimal_bps < 1 {
60 return Err(ProcessingError::InvalidThreshold {
61 threshold_decimal_bps,
62 });
63 }
64 if threshold_decimal_bps > 100_000 {
65 return Err(ProcessingError::InvalidThreshold {
66 threshold_decimal_bps,
67 });
68 }
69
70 Ok(Self {
71 threshold_decimal_bps,
72 current_bar_state: None,
73 price_window: PriceWindow::new(),
74 last_trade_id: None,
75 last_timestamp_us: 0,
76 anomaly_summary: AnomalySummary::default(),
77 resumed_from_checkpoint: false,
78 })
79 }
80
81 pub fn process_single_trade(
100 &mut self,
101 trade: AggTrade,
102 ) -> Result<Option<RangeBar>, ProcessingError> {
103 self.price_window.push(trade.price);
105 self.last_trade_id = Some(trade.agg_trade_id);
106 self.last_timestamp_us = trade.timestamp;
107
108 match &mut self.current_bar_state {
109 None => {
110 self.current_bar_state =
112 Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
113 Ok(None)
114 }
115 Some(bar_state) => {
116 if bar_state.bar.is_breach(
118 trade.price,
119 bar_state.upper_threshold,
120 bar_state.lower_threshold,
121 ) {
122 bar_state.bar.update_with_trade(&trade);
124
125 debug_assert!(
127 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
128 );
129 debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
130
131 let completed_bar = bar_state.bar.clone();
132
133 self.current_bar_state =
135 Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
136
137 Ok(Some(completed_bar))
138 } else {
139 bar_state.bar.update_with_trade(&trade);
141 Ok(None)
142 }
143 }
144 }
145 }
146
147 pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
156 self.current_bar_state
157 .as_ref()
158 .map(|state| state.bar.clone())
159 }
160
161 pub fn process_agg_trade_records_with_incomplete(
176 &mut self,
177 agg_trade_records: &[AggTrade],
178 ) -> Result<Vec<RangeBar>, ProcessingError> {
179 self.process_agg_trade_records_with_options(agg_trade_records, true)
180 }
181
182 pub fn process_agg_trade_records(
197 &mut self,
198 agg_trade_records: &[AggTrade],
199 ) -> Result<Vec<RangeBar>, ProcessingError> {
200 self.process_agg_trade_records_with_options(agg_trade_records, false)
201 }
202
203 pub fn process_agg_trade_records_with_options(
217 &mut self,
218 agg_trade_records: &[AggTrade],
219 include_incomplete: bool,
220 ) -> Result<Vec<RangeBar>, ProcessingError> {
221 if agg_trade_records.is_empty() {
222 return Ok(Vec::new());
223 }
224
225 self.validate_trade_ordering(agg_trade_records)?;
227
228 let mut current_bar: Option<RangeBarState> = if self.resumed_from_checkpoint {
231 self.resumed_from_checkpoint = false; self.current_bar_state.take()
234 } else {
235 self.current_bar_state = None;
237 None
238 };
239
240 let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); let mut defer_open = false;
242
243 for agg_record in agg_trade_records {
244 self.price_window.push(agg_record.price);
246 self.last_trade_id = Some(agg_record.agg_trade_id);
247 self.last_timestamp_us = agg_record.timestamp;
248
249 if defer_open {
250 current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
252 defer_open = false;
253 continue;
254 }
255
256 match current_bar {
257 None => {
258 current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
260 }
261 Some(ref mut bar_state) => {
262 if bar_state.bar.is_breach(
264 agg_record.price,
265 bar_state.upper_threshold,
266 bar_state.lower_threshold,
267 ) {
268 bar_state.bar.update_with_trade(agg_record);
270
271 debug_assert!(
273 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
274 );
275 debug_assert!(
276 bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
277 );
278
279 bars.push(bar_state.bar.clone());
280 current_bar = None;
281 defer_open = true; } else {
283 bar_state.bar.update_with_trade(agg_record);
285 }
286 }
287 }
288 }
289
290 self.current_bar_state = current_bar.clone();
292
293 if include_incomplete && let Some(bar_state) = current_bar {
296 bars.push(bar_state.bar);
297 }
298
299 Ok(bars)
300 }
301
302 pub fn create_checkpoint(&self, symbol: &str) -> Checkpoint {
324 let (incomplete_bar, thresholds) = match &self.current_bar_state {
325 Some(state) => (
326 Some(state.bar.clone()),
327 Some((state.upper_threshold, state.lower_threshold)),
328 ),
329 None => (None, None),
330 };
331
332 Checkpoint::new(
333 symbol.to_string(),
334 self.threshold_decimal_bps,
335 incomplete_bar,
336 thresholds,
337 self.last_timestamp_us,
338 self.last_trade_id,
339 self.price_window.compute_hash(),
340 )
341 }
342
343 pub fn from_checkpoint(checkpoint: Checkpoint) -> Result<Self, CheckpointError> {
361 if checkpoint.incomplete_bar.is_some() && checkpoint.thresholds.is_none() {
363 return Err(CheckpointError::MissingThresholds);
364 }
365
366 let current_bar_state = match (checkpoint.incomplete_bar, checkpoint.thresholds) {
368 (Some(bar), Some((upper, lower))) => Some(RangeBarState {
369 bar,
370 upper_threshold: upper,
371 lower_threshold: lower,
372 }),
373 _ => None,
374 };
375
376 Ok(Self {
377 threshold_decimal_bps: checkpoint.threshold_decimal_bps,
378 current_bar_state,
379 price_window: PriceWindow::new(), last_trade_id: checkpoint.last_trade_id,
381 last_timestamp_us: checkpoint.last_timestamp_us,
382 anomaly_summary: checkpoint.anomaly_summary,
383 resumed_from_checkpoint: true, })
385 }
386
387 pub fn verify_position(&self, first_trade: &AggTrade) -> PositionVerification {
412 match self.last_trade_id {
413 Some(last_id) => {
414 let expected_id = last_id + 1;
416 if first_trade.agg_trade_id == expected_id {
417 PositionVerification::Exact
418 } else {
419 let missing_count = first_trade.agg_trade_id - expected_id;
420 PositionVerification::Gap {
421 expected_id,
422 actual_id: first_trade.agg_trade_id,
423 missing_count,
424 }
425 }
426 }
427 None => {
428 let gap_us = first_trade.timestamp - self.last_timestamp_us;
430 let gap_ms = gap_us / 1000;
431 PositionVerification::TimestampOnly { gap_ms }
432 }
433 }
434 }
435
436 pub fn anomaly_summary(&self) -> &AnomalySummary {
438 &self.anomaly_summary
439 }
440
441 pub fn threshold_decimal_bps(&self) -> u32 {
443 self.threshold_decimal_bps
444 }
445
446 fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
448 for i in 1..trades.len() {
449 let prev = &trades[i - 1];
450 let curr = &trades[i];
451
452 if curr.timestamp < prev.timestamp
454 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
455 {
456 return Err(ProcessingError::UnsortedTrades {
457 index: i,
458 prev_time: prev.timestamp,
459 prev_id: prev.agg_trade_id,
460 curr_time: curr.timestamp,
461 curr_id: curr.agg_trade_id,
462 });
463 }
464 }
465
466 Ok(())
467 }
468}
469
470#[derive(Clone)]
472struct RangeBarState {
473 pub bar: RangeBar,
475
476 pub upper_threshold: FixedPoint,
478
479 pub lower_threshold: FixedPoint,
481}
482
483impl RangeBarState {
484 fn new(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
486 let bar = RangeBar::new(trade);
487
488 let (upper_threshold, lower_threshold) =
490 bar.open.compute_range_thresholds(threshold_decimal_bps);
491
492 Self {
493 bar,
494 upper_threshold,
495 lower_threshold,
496 }
497 }
498}
499
500#[derive(Error, Debug)]
502pub enum ProcessingError {
503 #[error(
504 "Trades not sorted at index {index}: prev=({prev_time}, {prev_id}), curr=({curr_time}, {curr_id})"
505 )]
506 UnsortedTrades {
507 index: usize,
508 prev_time: i64,
509 prev_id: i64,
510 curr_time: i64,
511 curr_id: i64,
512 },
513
514 #[error("Empty trade data")]
515 EmptyData,
516
517 #[error(
518 "Invalid threshold: {threshold_decimal_bps} (decimal bps). Valid range: 1-100,000 (0.001%-100%)"
519 )]
520 InvalidThreshold { threshold_decimal_bps: u32 },
521}
522
523#[cfg(feature = "python")]
524impl From<ProcessingError> for PyErr {
525 fn from(err: ProcessingError) -> PyErr {
526 match err {
527 ProcessingError::UnsortedTrades {
528 index,
529 prev_time,
530 prev_id,
531 curr_time,
532 curr_id,
533 } => pyo3::exceptions::PyValueError::new_err(format!(
534 "Trades not sorted at index {}: prev=({}, {}), curr=({}, {})",
535 index, prev_time, prev_id, curr_time, curr_id
536 )),
537 ProcessingError::EmptyData => {
538 pyo3::exceptions::PyValueError::new_err("Empty trade data")
539 }
540 ProcessingError::InvalidThreshold {
541 threshold_decimal_bps,
542 } => pyo3::exceptions::PyValueError::new_err(format!(
543 "Invalid threshold: {} (decimal bps). Valid range: 1-100,000 (0.001%-100%)",
544 threshold_decimal_bps
545 )),
546 }
547 }
548}
549
550#[cfg(test)]
551mod tests {
552 use super::*;
553 use crate::test_utils::{self, scenarios};
554
555 #[test]
556 fn test_single_bar_no_breach() {
557 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::no_breach_sequence(250);
561
562 let bars = processor.process_agg_trade_records(&trades).unwrap();
564 assert_eq!(
565 bars.len(),
566 0,
567 "Strict algorithm should not create bars without breach"
568 );
569
570 let bars_with_incomplete = processor
572 .process_agg_trade_records_with_incomplete(&trades)
573 .unwrap();
574 assert_eq!(
575 bars_with_incomplete.len(),
576 1,
577 "Analysis mode should include incomplete bar"
578 );
579
580 let bar = &bars_with_incomplete[0];
581 assert_eq!(bar.open.to_string(), "50000.00000000");
582 assert_eq!(bar.high.to_string(), "50100.00000000");
583 assert_eq!(bar.low.to_string(), "49900.00000000");
584 assert_eq!(bar.close.to_string(), "49900.00000000");
585 }
586
587 #[test]
588 fn test_exact_breach_upward() {
589 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_upward(250);
592
593 let bars = processor.process_agg_trade_records(&trades).unwrap();
595 assert_eq!(
596 bars.len(),
597 1,
598 "Strict algorithm should only return completed bars"
599 );
600
601 let bar1 = &bars[0];
603 assert_eq!(bar1.open.to_string(), "50000.00000000");
604 assert_eq!(bar1.close.to_string(), "50125.00000000"); assert_eq!(bar1.high.to_string(), "50125.00000000");
607 assert_eq!(bar1.low.to_string(), "50000.00000000");
608
609 let bars_with_incomplete = processor
611 .process_agg_trade_records_with_incomplete(&trades)
612 .unwrap();
613 assert_eq!(
614 bars_with_incomplete.len(),
615 2,
616 "Analysis mode should include incomplete bars"
617 );
618
619 let bar2 = &bars_with_incomplete[1];
621 assert_eq!(bar2.open.to_string(), "50500.00000000"); assert_eq!(bar2.close.to_string(), "50500.00000000");
623 }
624
625 #[test]
626 fn test_exact_breach_downward() {
627 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_downward(250);
630
631 let bars = processor.process_agg_trade_records(&trades).unwrap();
632
633 assert_eq!(bars.len(), 1);
634
635 let bar = &bars[0];
636 assert_eq!(bar.open.to_string(), "50000.00000000");
637 assert_eq!(bar.close.to_string(), "49875.00000000"); assert_eq!(bar.high.to_string(), "50000.00000000");
639 assert_eq!(bar.low.to_string(), "49875.00000000");
640 }
641
642 #[test]
643 fn test_large_gap_single_bar() {
644 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::large_gap_sequence();
647
648 let bars = processor.process_agg_trade_records(&trades).unwrap();
649
650 assert_eq!(bars.len(), 1);
652
653 let bar = &bars[0];
654 assert_eq!(bar.open.to_string(), "50000.00000000");
655 assert_eq!(bar.close.to_string(), "51000.00000000");
656 assert_eq!(bar.high.to_string(), "51000.00000000");
657 assert_eq!(bar.low.to_string(), "50000.00000000");
658 }
659
660 #[test]
661 fn test_unsorted_trades_error() {
662 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::unsorted_sequence();
665
666 let result = processor.process_agg_trade_records(&trades);
667 assert!(result.is_err());
668
669 match result {
670 Err(ProcessingError::UnsortedTrades { index, .. }) => {
671 assert_eq!(index, 1);
672 }
673 _ => panic!("Expected UnsortedTrades error"),
674 }
675 }
676
677 #[test]
678 fn test_threshold_calculation() {
679 let processor = RangeBarProcessor::new(250).unwrap(); let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
682 let bar_state = RangeBarState::new(&trade, processor.threshold_decimal_bps);
683
684 assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
686 assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
687 }
688
689 #[test]
690 fn test_empty_trades() {
691 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::empty_sequence();
693 let bars = processor.process_agg_trade_records(&trades).unwrap();
694 assert_eq!(bars.len(), 0);
695 }
696
697 #[test]
698 fn test_debug_streaming_data() {
699 let mut processor = RangeBarProcessor::new(100).unwrap(); let trades = vec![
703 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
704 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
706 ];
707
708 println!("Test data prices: 50014 -> 50163 -> 50032");
709 println!("Expected price movements: +0.3% then -0.26%");
710
711 let bars = processor.process_agg_trade_records(&trades).unwrap();
712 println!("Generated {} range bars", bars.len());
713
714 for (i, bar) in bars.iter().enumerate() {
715 println!(
716 " Bar {}: O={} H={} L={} C={}",
717 i + 1,
718 bar.open,
719 bar.high,
720 bar.low,
721 bar.close
722 );
723 }
724
725 assert!(
727 !bars.is_empty(),
728 "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
729 );
730 }
731
732 #[test]
733 fn test_threshold_validation() {
734 assert!(RangeBarProcessor::new(250).is_ok());
736
737 assert!(matches!(
739 RangeBarProcessor::new(0),
740 Err(ProcessingError::InvalidThreshold {
741 threshold_decimal_bps: 0
742 })
743 ));
744
745 assert!(matches!(
747 RangeBarProcessor::new(150_000),
748 Err(ProcessingError::InvalidThreshold {
749 threshold_decimal_bps: 150_000
750 })
751 ));
752
753 assert!(RangeBarProcessor::new(1).is_ok());
755
756 assert!(RangeBarProcessor::new(100_000).is_ok());
758 }
759
760 #[test]
761 fn test_export_processor_with_manual_trades() {
762 println!("Testing ExportRangeBarProcessor with same trade data...");
763
764 let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); let trades = vec![
768 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
769 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
771 ];
772
773 println!(
774 "Processing {} trades with ExportRangeBarProcessor...",
775 trades.len()
776 );
777
778 export_processor.process_trades_continuously(&trades);
779 let bars = export_processor.get_all_completed_bars();
780
781 println!(
782 "ExportRangeBarProcessor generated {} range bars",
783 bars.len()
784 );
785 for (i, bar) in bars.iter().enumerate() {
786 println!(
787 " Bar {}: O={} H={} L={} C={}",
788 i + 1,
789 bar.open,
790 bar.high,
791 bar.low,
792 bar.close
793 );
794 }
795
796 assert!(
798 !bars.is_empty(),
799 "ExportRangeBarProcessor should generate same results as basic processor"
800 );
801 }
802
803 #[test]
806 fn test_checkpoint_creation() {
807 let mut processor = RangeBarProcessor::new(250).unwrap();
808
809 let trades = scenarios::no_breach_sequence(250);
811 let _bars = processor.process_agg_trade_records(&trades).unwrap();
812
813 let checkpoint = processor.create_checkpoint("BTCUSDT");
815
816 assert_eq!(checkpoint.symbol, "BTCUSDT");
817 assert_eq!(checkpoint.threshold_decimal_bps, 250);
818 assert!(checkpoint.has_incomplete_bar()); assert!(checkpoint.thresholds.is_some()); assert!(checkpoint.last_trade_id.is_some()); }
822
823 #[test]
824 fn test_checkpoint_serialization_roundtrip() {
825 let mut processor = RangeBarProcessor::new(250).unwrap();
826
827 let trades = scenarios::no_breach_sequence(250);
829 let _bars = processor.process_agg_trade_records(&trades).unwrap();
830
831 let checkpoint = processor.create_checkpoint("BTCUSDT");
833
834 let json = serde_json::to_string(&checkpoint).expect("Serialization should succeed");
836
837 let restored: Checkpoint =
839 serde_json::from_str(&json).expect("Deserialization should succeed");
840
841 assert_eq!(restored.symbol, checkpoint.symbol);
842 assert_eq!(
843 restored.threshold_decimal_bps,
844 checkpoint.threshold_decimal_bps
845 );
846 assert_eq!(
847 restored.incomplete_bar.is_some(),
848 checkpoint.incomplete_bar.is_some()
849 );
850 }
851
852 #[test]
853 fn test_cross_file_bar_continuation() {
854 let mut all_trades = Vec::new();
859
860 let base_timestamp = 1640995200000000i64; for i in 0..20 {
866 let price = 50000.0 + (i as f64 * 100.0) * if i % 4 < 2 { 1.0 } else { -1.0 };
867 let trade = test_utils::create_test_agg_trade(
868 i + 1,
869 &format!("{:.8}", price),
870 "1.0",
871 base_timestamp + (i as i64 * 1000000),
872 );
873 all_trades.push(trade);
874 }
875
876 let mut processor_full = RangeBarProcessor::new(100).unwrap(); let bars_full = processor_full
879 .process_agg_trade_records(&all_trades)
880 .unwrap();
881
882 let split_point = 10; let mut processor_1 = RangeBarProcessor::new(100).unwrap();
887 let part1_trades = &all_trades[0..split_point];
888 let bars_1 = processor_1.process_agg_trade_records(part1_trades).unwrap();
889
890 let checkpoint = processor_1.create_checkpoint("TEST");
892
893 let mut processor_2 = RangeBarProcessor::from_checkpoint(checkpoint).unwrap();
895 let part2_trades = &all_trades[split_point..];
896 let bars_2 = processor_2.process_agg_trade_records(part2_trades).unwrap();
897
898 let split_total = bars_1.len() + bars_2.len();
901
902 println!("Full processing: {} bars", bars_full.len());
903 println!(
904 "Split processing: {} + {} = {} bars",
905 bars_1.len(),
906 bars_2.len(),
907 split_total
908 );
909
910 assert_eq!(
911 split_total,
912 bars_full.len(),
913 "Split processing should produce same bar count as full processing"
914 );
915
916 let all_split_bars: Vec<_> = bars_1.iter().chain(bars_2.iter()).collect();
918 for (i, (full, split)) in bars_full.iter().zip(all_split_bars.iter()).enumerate() {
919 assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
920 assert_eq!(
921 full.close.0, split.close.0,
922 "Bar {} close price mismatch",
923 i
924 );
925 }
926 }
927
928 #[test]
929 fn test_verify_position_exact() {
930 let mut processor = RangeBarProcessor::new(250).unwrap();
931
932 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
934 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
935
936 let _ = processor.process_single_trade(trade1);
937 let _ = processor.process_single_trade(trade2);
938
939 let next_trade = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
941
942 let verification = processor.verify_position(&next_trade);
944
945 assert_eq!(verification, PositionVerification::Exact);
946 }
947
948 #[test]
949 fn test_verify_position_gap() {
950 let mut processor = RangeBarProcessor::new(250).unwrap();
951
952 let trade1 = test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000);
954 let trade2 = test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000);
955
956 let _ = processor.process_single_trade(trade1);
957 let _ = processor.process_single_trade(trade2);
958
959 let next_trade = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
961
962 let verification = processor.verify_position(&next_trade);
964
965 match verification {
966 PositionVerification::Gap {
967 expected_id,
968 actual_id,
969 missing_count,
970 } => {
971 assert_eq!(expected_id, 102);
972 assert_eq!(actual_id, 105);
973 assert_eq!(missing_count, 3);
974 }
975 _ => panic!("Expected Gap verification, got {:?}", verification),
976 }
977 }
978
979 #[test]
980 fn test_checkpoint_clean_completion() {
981 let mut processor = RangeBarProcessor::new(100).unwrap(); let trades = vec![
988 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
989 test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), ];
991
992 let bars = processor.process_agg_trade_records(&trades).unwrap();
993 assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
994
995 let checkpoint = processor.create_checkpoint("TEST");
998
999 assert!(
1001 !checkpoint.has_incomplete_bar(),
1002 "No incomplete bar when last trade was a breach with no following trade"
1003 );
1004 }
1005
1006 #[test]
1007 fn test_checkpoint_with_remainder() {
1008 let mut processor = RangeBarProcessor::new(100).unwrap(); let trades = vec![
1013 test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000),
1014 test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000), test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000), ];
1017
1018 let bars = processor.process_agg_trade_records(&trades).unwrap();
1019 assert_eq!(bars.len(), 1, "Should have exactly one completed bar");
1020
1021 let checkpoint = processor.create_checkpoint("TEST");
1023
1024 assert!(
1025 checkpoint.has_incomplete_bar(),
1026 "Should have incomplete bar from trade 3"
1027 );
1028
1029 let incomplete = checkpoint.incomplete_bar.unwrap();
1031 assert_eq!(
1032 incomplete.open.to_string(),
1033 "50110.00000000",
1034 "Incomplete bar should open at trade 3 price"
1035 );
1036 }
1037}
1038
1039#[derive(Debug, Clone)]
1041struct InternalRangeBar {
1042 open_time: i64,
1043 close_time: i64,
1044 open: FixedPoint,
1045 high: FixedPoint,
1046 low: FixedPoint,
1047 close: FixedPoint,
1048 volume: FixedPoint,
1049 turnover: i128,
1050 individual_trade_count: i64,
1051 agg_record_count: u32,
1052 first_trade_id: i64,
1053 last_trade_id: i64,
1054 buy_volume: FixedPoint,
1056 sell_volume: FixedPoint,
1058 buy_trade_count: i64,
1060 sell_trade_count: i64,
1062 vwap: FixedPoint,
1064 buy_turnover: i128,
1066 sell_turnover: i128,
1068}
1069
1070pub struct ExportRangeBarProcessor {
1075 threshold_decimal_bps: u32,
1076 current_bar: Option<InternalRangeBar>,
1077 completed_bars: Vec<RangeBar>,
1078}
1079
1080impl ExportRangeBarProcessor {
1081 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
1095 if threshold_decimal_bps < 1 {
1099 return Err(ProcessingError::InvalidThreshold {
1100 threshold_decimal_bps,
1101 });
1102 }
1103 if threshold_decimal_bps > 100_000 {
1104 return Err(ProcessingError::InvalidThreshold {
1105 threshold_decimal_bps,
1106 });
1107 }
1108
1109 Ok(Self {
1110 threshold_decimal_bps,
1111 current_bar: None,
1112 completed_bars: Vec::new(),
1113 })
1114 }
1115
1116 pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
1119 for trade in trades {
1120 self.process_single_trade_fixed_point(trade);
1121 }
1122 }
1123
1124 fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
1126 if self.current_bar.is_none() {
1127 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
1129
1130 self.current_bar = Some(InternalRangeBar {
1131 open_time: trade.timestamp,
1132 close_time: trade.timestamp,
1133 open: trade.price,
1134 high: trade.price,
1135 low: trade.price,
1136 close: trade.price,
1137 volume: trade.volume,
1138 turnover: trade_turnover,
1139 individual_trade_count: 1,
1140 agg_record_count: 1,
1141 first_trade_id: trade.agg_trade_id,
1142 last_trade_id: trade.agg_trade_id,
1143 buy_volume: if trade.is_buyer_maker {
1145 FixedPoint(0)
1146 } else {
1147 trade.volume
1148 },
1149 sell_volume: if trade.is_buyer_maker {
1150 trade.volume
1151 } else {
1152 FixedPoint(0)
1153 },
1154 buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
1155 sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
1156 vwap: trade.price,
1157 buy_turnover: if trade.is_buyer_maker {
1158 0
1159 } else {
1160 trade_turnover
1161 },
1162 sell_turnover: if trade.is_buyer_maker {
1163 trade_turnover
1164 } else {
1165 0
1166 },
1167 });
1168 return;
1169 }
1170
1171 let bar = self.current_bar.as_mut().unwrap();
1174 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
1175
1176 let price_val = trade.price.0;
1179 let bar_open_val = bar.open.0;
1180 let threshold_decimal_bps = self.threshold_decimal_bps as i64;
1181 let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
1182 let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
1183
1184 bar.close_time = trade.timestamp;
1186 bar.close = trade.price;
1187 bar.volume.0 += trade.volume.0;
1188 bar.turnover += trade_turnover;
1189 bar.individual_trade_count += 1;
1190 bar.agg_record_count += 1;
1191 bar.last_trade_id = trade.agg_trade_id;
1192
1193 if price_val > bar.high.0 {
1195 bar.high = trade.price;
1196 }
1197 if price_val < bar.low.0 {
1198 bar.low = trade.price;
1199 }
1200
1201 if trade.is_buyer_maker {
1203 bar.sell_volume.0 += trade.volume.0;
1204 bar.sell_turnover += trade_turnover;
1205 bar.sell_trade_count += 1;
1206 } else {
1207 bar.buy_volume.0 += trade.volume.0;
1208 bar.buy_turnover += trade_turnover;
1209 bar.buy_trade_count += 1;
1210 }
1211
1212 if price_val >= upper_threshold || price_val <= lower_threshold {
1214 let completed_bar = self.current_bar.take().unwrap();
1217
1218 let export_bar = RangeBar {
1220 open_time: completed_bar.open_time,
1221 close_time: completed_bar.close_time,
1222 open: completed_bar.open,
1223 high: completed_bar.high,
1224 low: completed_bar.low,
1225 close: completed_bar.close,
1226 volume: completed_bar.volume,
1227 turnover: completed_bar.turnover,
1228
1229 individual_trade_count: completed_bar.individual_trade_count as u32,
1231 agg_record_count: completed_bar.agg_record_count,
1232 first_trade_id: completed_bar.first_trade_id,
1233 last_trade_id: completed_bar.last_trade_id,
1234 data_source: crate::types::DataSource::default(),
1235
1236 buy_volume: completed_bar.buy_volume,
1238 sell_volume: completed_bar.sell_volume,
1239 buy_trade_count: completed_bar.buy_trade_count as u32,
1240 sell_trade_count: completed_bar.sell_trade_count as u32,
1241 vwap: completed_bar.vwap,
1242 buy_turnover: completed_bar.buy_turnover,
1243 sell_turnover: completed_bar.sell_turnover,
1244 };
1245
1246 self.completed_bars.push(export_bar);
1247
1248 let initial_buy_turnover = if trade.is_buyer_maker {
1250 0
1251 } else {
1252 trade_turnover
1253 };
1254 let initial_sell_turnover = if trade.is_buyer_maker {
1255 trade_turnover
1256 } else {
1257 0
1258 };
1259
1260 self.current_bar = Some(InternalRangeBar {
1261 open_time: trade.timestamp,
1262 close_time: trade.timestamp,
1263 open: trade.price,
1264 high: trade.price,
1265 low: trade.price,
1266 close: trade.price,
1267 volume: trade.volume,
1268 turnover: trade_turnover,
1269 individual_trade_count: 1,
1270 agg_record_count: 1,
1271 first_trade_id: trade.agg_trade_id,
1272 last_trade_id: trade.agg_trade_id,
1273 buy_volume: if trade.is_buyer_maker {
1275 FixedPoint(0)
1276 } else {
1277 trade.volume
1278 },
1279 sell_volume: if trade.is_buyer_maker {
1280 trade.volume
1281 } else {
1282 FixedPoint(0)
1283 },
1284 buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
1285 sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
1286 vwap: trade.price,
1287 buy_turnover: initial_buy_turnover,
1288 sell_turnover: initial_sell_turnover,
1289 });
1290 }
1291 }
1292
1293 pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
1296 std::mem::take(&mut self.completed_bars)
1297 }
1298
1299 pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
1301 self.current_bar.as_ref().map(|incomplete| RangeBar {
1302 open_time: incomplete.open_time,
1303 close_time: incomplete.close_time,
1304 open: incomplete.open,
1305 high: incomplete.high,
1306 low: incomplete.low,
1307 close: incomplete.close,
1308 volume: incomplete.volume,
1309 turnover: incomplete.turnover,
1310
1311 individual_trade_count: incomplete.individual_trade_count as u32,
1313 agg_record_count: incomplete.agg_record_count,
1314 first_trade_id: incomplete.first_trade_id,
1315 last_trade_id: incomplete.last_trade_id,
1316 data_source: crate::types::DataSource::default(),
1317
1318 buy_volume: incomplete.buy_volume,
1320 sell_volume: incomplete.sell_volume,
1321 buy_trade_count: incomplete.buy_trade_count as u32,
1322 sell_trade_count: incomplete.sell_trade_count as u32,
1323 vwap: incomplete.vwap,
1324 buy_turnover: incomplete.buy_turnover,
1325 sell_turnover: incomplete.sell_turnover,
1326 })
1327 }
1328}