1use crate::fixed_point::FixedPoint;
7use crate::types::{AggTrade, RangeBar};
8#[cfg(feature = "python")]
9use pyo3::prelude::*;
10use thiserror::Error;
11
12pub struct RangeBarProcessor {
14 threshold_decimal_bps: u32,
16
17 current_bar_state: Option<RangeBarState>,
20}
21
22impl RangeBarProcessor {
23 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
37 if threshold_decimal_bps < 1 {
41 return Err(ProcessingError::InvalidThreshold {
42 threshold_decimal_bps,
43 });
44 }
45 if threshold_decimal_bps > 100_000 {
46 return Err(ProcessingError::InvalidThreshold {
47 threshold_decimal_bps,
48 });
49 }
50
51 Ok(Self {
52 threshold_decimal_bps,
53 current_bar_state: None,
54 })
55 }
56
57 pub fn process_single_trade(
76 &mut self,
77 trade: AggTrade,
78 ) -> Result<Option<RangeBar>, ProcessingError> {
79 match &mut self.current_bar_state {
80 None => {
81 self.current_bar_state =
83 Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
84 Ok(None)
85 }
86 Some(bar_state) => {
87 if bar_state.bar.is_breach(
89 trade.price,
90 bar_state.upper_threshold,
91 bar_state.lower_threshold,
92 ) {
93 bar_state.bar.update_with_trade(&trade);
95
96 debug_assert!(
98 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
99 );
100 debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
101
102 let completed_bar = bar_state.bar.clone();
103
104 self.current_bar_state =
106 Some(RangeBarState::new(&trade, self.threshold_decimal_bps));
107
108 Ok(Some(completed_bar))
109 } else {
110 bar_state.bar.update_with_trade(&trade);
112 Ok(None)
113 }
114 }
115 }
116 }
117
118 pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
127 self.current_bar_state
128 .as_ref()
129 .map(|state| state.bar.clone())
130 }
131
132 pub fn process_agg_trade_records_with_incomplete(
147 &mut self,
148 agg_trade_records: &[AggTrade],
149 ) -> Result<Vec<RangeBar>, ProcessingError> {
150 self.process_agg_trade_records_with_options(agg_trade_records, true)
151 }
152
153 pub fn process_agg_trade_records(
168 &mut self,
169 agg_trade_records: &[AggTrade],
170 ) -> Result<Vec<RangeBar>, ProcessingError> {
171 self.process_agg_trade_records_with_options(agg_trade_records, false)
172 }
173
174 pub fn process_agg_trade_records_with_options(
188 &mut self,
189 agg_trade_records: &[AggTrade],
190 include_incomplete: bool,
191 ) -> Result<Vec<RangeBar>, ProcessingError> {
192 if agg_trade_records.is_empty() {
193 return Ok(Vec::new());
194 }
195
196 self.validate_trade_ordering(agg_trade_records)?;
198
199 self.current_bar_state = None;
201
202 let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); let mut current_bar: Option<RangeBarState> = None;
204 let mut defer_open = false;
205
206 for agg_record in agg_trade_records {
207 if defer_open {
208 current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
210 defer_open = false;
211 continue;
212 }
213
214 match current_bar {
215 None => {
216 current_bar = Some(RangeBarState::new(agg_record, self.threshold_decimal_bps));
218 }
219 Some(ref mut bar_state) => {
220 if bar_state.bar.is_breach(
222 agg_record.price,
223 bar_state.upper_threshold,
224 bar_state.lower_threshold,
225 ) {
226 bar_state.bar.update_with_trade(agg_record);
228
229 debug_assert!(
231 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
232 );
233 debug_assert!(
234 bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
235 );
236
237 bars.push(bar_state.bar.clone());
238 current_bar = None;
239 defer_open = true; } else {
241 bar_state.bar.update_with_trade(agg_record);
243 }
244 }
245 }
246 }
247
248 if include_incomplete && let Some(bar_state) = current_bar {
251 bars.push(bar_state.bar);
252 }
253
254 Ok(bars)
255 }
256
257 fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
259 for i in 1..trades.len() {
260 let prev = &trades[i - 1];
261 let curr = &trades[i];
262
263 if curr.timestamp < prev.timestamp
265 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
266 {
267 return Err(ProcessingError::UnsortedTrades {
268 index: i,
269 prev_time: prev.timestamp,
270 prev_id: prev.agg_trade_id,
271 curr_time: curr.timestamp,
272 curr_id: curr.agg_trade_id,
273 });
274 }
275 }
276
277 Ok(())
278 }
279}
280
281struct RangeBarState {
283 pub bar: RangeBar,
285
286 pub upper_threshold: FixedPoint,
288
289 pub lower_threshold: FixedPoint,
291}
292
293impl RangeBarState {
294 fn new(trade: &AggTrade, threshold_decimal_bps: u32) -> Self {
296 let bar = RangeBar::new(trade);
297
298 let (upper_threshold, lower_threshold) =
300 bar.open.compute_range_thresholds(threshold_decimal_bps);
301
302 Self {
303 bar,
304 upper_threshold,
305 lower_threshold,
306 }
307 }
308}
309
310#[derive(Error, Debug)]
312pub enum ProcessingError {
313 #[error(
314 "Trades not sorted at index {index}: prev=({prev_time}, {prev_id}), curr=({curr_time}, {curr_id})"
315 )]
316 UnsortedTrades {
317 index: usize,
318 prev_time: i64,
319 prev_id: i64,
320 curr_time: i64,
321 curr_id: i64,
322 },
323
324 #[error("Empty trade data")]
325 EmptyData,
326
327 #[error(
328 "Invalid threshold: {threshold_decimal_bps} (decimal bps). Valid range: 1-100,000 (0.001%-100%)"
329 )]
330 InvalidThreshold { threshold_decimal_bps: u32 },
331}
332
#[cfg(feature = "python")]
impl From<ProcessingError> for PyErr {
    /// Maps every [`ProcessingError`] to a Python `ValueError` carrying the
    /// error's display text.
    fn from(err: ProcessingError) -> PyErr {
        // The `#[error(...)]` attributes on `ProcessingError` already define
        // the message for each variant; reusing that output keeps the Python
        // message from drifting out of sync with the Rust one.
        pyo3::exceptions::PyValueError::new_err(err.to_string())
    }
}
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{self, scenarios};

    /// Builds a processor for a threshold that is known to be valid.
    fn make_processor(threshold_decimal_bps: u32) -> RangeBarProcessor {
        RangeBarProcessor::new(threshold_decimal_bps).unwrap()
    }

    /// Without a breach, strict mode yields nothing and analysis mode yields
    /// the single unfinished bar.
    #[test]
    fn test_single_bar_no_breach() {
        let mut processor = make_processor(250);
        let trades = scenarios::no_breach_sequence(250);

        let completed = processor.process_agg_trade_records(&trades).unwrap();
        assert_eq!(
            completed.len(),
            0,
            "Strict algorithm should not create bars without breach"
        );

        let with_partial = processor
            .process_agg_trade_records_with_incomplete(&trades)
            .unwrap();
        assert_eq!(
            with_partial.len(),
            1,
            "Analysis mode should include incomplete bar"
        );

        let only_bar = &with_partial[0];
        assert_eq!(only_bar.open.to_string(), "50000.00000000");
        assert_eq!(only_bar.high.to_string(), "50100.00000000");
        assert_eq!(only_bar.low.to_string(), "49900.00000000");
        assert_eq!(only_bar.close.to_string(), "49900.00000000");
    }

    /// A trade exactly at the upper threshold closes the bar; the following
    /// trade opens the next one.
    #[test]
    fn test_exact_breach_upward() {
        let mut processor = make_processor(250);
        let trades = scenarios::exact_breach_upward(250);

        let completed = processor.process_agg_trade_records(&trades).unwrap();
        assert_eq!(
            completed.len(),
            1,
            "Strict algorithm should only return completed bars"
        );

        let first = &completed[0];
        assert_eq!(first.open.to_string(), "50000.00000000");
        assert_eq!(first.close.to_string(), "50125.00000000");
        assert_eq!(first.high.to_string(), "50125.00000000");
        assert_eq!(first.low.to_string(), "50000.00000000");

        let with_partial = processor
            .process_agg_trade_records_with_incomplete(&trades)
            .unwrap();
        assert_eq!(
            with_partial.len(),
            2,
            "Analysis mode should include incomplete bars"
        );

        let second = &with_partial[1];
        assert_eq!(second.open.to_string(), "50500.00000000");
        assert_eq!(second.close.to_string(), "50500.00000000");
    }

    /// A trade exactly at the lower threshold closes the bar.
    #[test]
    fn test_exact_breach_downward() {
        let mut processor = make_processor(250);
        let trades = scenarios::exact_breach_downward(250);

        let completed = processor.process_agg_trade_records(&trades).unwrap();
        assert_eq!(completed.len(), 1);

        let closed = &completed[0];
        assert_eq!(closed.open.to_string(), "50000.00000000");
        assert_eq!(closed.close.to_string(), "49875.00000000");
        assert_eq!(closed.high.to_string(), "50000.00000000");
        assert_eq!(closed.low.to_string(), "49875.00000000");
    }

    /// A price jump far beyond the threshold still produces exactly one bar.
    #[test]
    fn test_large_gap_single_bar() {
        let mut processor = make_processor(250);
        let trades = scenarios::large_gap_sequence();

        let completed = processor.process_agg_trade_records(&trades).unwrap();
        assert_eq!(completed.len(), 1);

        let closed = &completed[0];
        assert_eq!(closed.open.to_string(), "50000.00000000");
        assert_eq!(closed.close.to_string(), "51000.00000000");
        assert_eq!(closed.high.to_string(), "51000.00000000");
        assert_eq!(closed.low.to_string(), "50000.00000000");
    }

    /// Out-of-order input is rejected and the offending index is reported.
    #[test]
    fn test_unsorted_trades_error() {
        let mut processor = make_processor(250);
        let trades = scenarios::unsorted_sequence();

        let outcome = processor.process_agg_trade_records(&trades);
        assert!(outcome.is_err());

        match outcome {
            Err(ProcessingError::UnsortedTrades { index, .. }) => {
                assert_eq!(index, 1);
            }
            _ => panic!("Expected UnsortedTrades error"),
        }
    }

    /// 250 decimal bps around 50000 gives thresholds of 50125 and 49875.
    #[test]
    fn test_threshold_calculation() {
        let processor = make_processor(250);
        let opening_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
        let state = RangeBarState::new(&opening_trade, processor.threshold_decimal_bps);

        assert_eq!(state.upper_threshold.to_string(), "50125.00000000");
        assert_eq!(state.lower_threshold.to_string(), "49875.00000000");
    }

    /// Empty input produces no bars and no error.
    #[test]
    fn test_empty_trades() {
        let mut processor = make_processor(250);
        let trades = scenarios::empty_sequence();

        let completed = processor.process_agg_trade_records(&trades).unwrap();
        assert_eq!(completed.len(), 0);
    }

    /// Three hand-written trades with a 100 decimal bps threshold must close
    /// at least one bar in batch mode.
    #[test]
    fn test_debug_streaming_data() {
        let mut processor = make_processor(100);
        let trades = vec![
            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113),
            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
        ];

        println!("Test data prices: 50014 -> 50163 -> 50032");
        println!("Expected price movements: +0.3% then -0.26%");

        let completed = processor.process_agg_trade_records(&trades).unwrap();
        println!("Generated {} range bars", completed.len());

        for (position, bar) in completed.iter().enumerate() {
            println!(
                "  Bar {}: O={} H={} L={} C={}",
                position + 1,
                bar.open,
                bar.high,
                bar.low,
                bar.close
            );
        }

        assert!(
            !completed.is_empty(),
            "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
        );
    }

    /// The constructor accepts 1..=100_000 and rejects values outside it.
    #[test]
    fn test_threshold_validation() {
        assert!(RangeBarProcessor::new(250).is_ok());

        assert!(matches!(
            RangeBarProcessor::new(0),
            Err(ProcessingError::InvalidThreshold {
                threshold_decimal_bps: 0
            })
        ));

        assert!(matches!(
            RangeBarProcessor::new(150_000),
            Err(ProcessingError::InvalidThreshold {
                threshold_decimal_bps: 150_000
            })
        ));

        assert!(RangeBarProcessor::new(1).is_ok());

        assert!(RangeBarProcessor::new(100_000).is_ok());
    }

    /// The export processor must close at least one bar on the same three
    /// trades used by `test_debug_streaming_data`.
    #[test]
    fn test_export_processor_with_manual_trades() {
        println!("Testing ExportRangeBarProcessor with same trade data...");

        let mut exporter = ExportRangeBarProcessor::new(100).unwrap();
        let trades = vec![
            test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
            test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113),
            test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
        ];

        println!(
            "Processing {} trades with ExportRangeBarProcessor...",
            trades.len()
        );

        exporter.process_trades_continuously(&trades);
        let completed = exporter.get_all_completed_bars();

        println!(
            "ExportRangeBarProcessor generated {} range bars",
            completed.len()
        );
        for (position, bar) in completed.iter().enumerate() {
            println!(
                "  Bar {}: O={} H={} L={} C={}",
                position + 1,
                bar.open,
                bar.high,
                bar.low,
                bar.close
            );
        }

        assert!(
            !completed.is_empty(),
            "ExportRangeBarProcessor should generate same results as basic processor"
        );
    }
}
613
614#[derive(Debug, Clone)]
616struct InternalRangeBar {
617 open_time: i64,
618 close_time: i64,
619 open: FixedPoint,
620 high: FixedPoint,
621 low: FixedPoint,
622 close: FixedPoint,
623 volume: FixedPoint,
624 turnover: i128,
625 individual_trade_count: i64,
626 agg_record_count: u32,
627 first_trade_id: i64,
628 last_trade_id: i64,
629 buy_volume: FixedPoint,
631 sell_volume: FixedPoint,
633 buy_trade_count: i64,
635 sell_trade_count: i64,
637 vwap: FixedPoint,
639 buy_turnover: i128,
641 sell_turnover: i128,
643}
644
645pub struct ExportRangeBarProcessor {
650 threshold_decimal_bps: u32,
651 current_bar: Option<InternalRangeBar>,
652 completed_bars: Vec<RangeBar>,
653}
654
655impl ExportRangeBarProcessor {
656 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
670 if threshold_decimal_bps < 1 {
674 return Err(ProcessingError::InvalidThreshold {
675 threshold_decimal_bps,
676 });
677 }
678 if threshold_decimal_bps > 100_000 {
679 return Err(ProcessingError::InvalidThreshold {
680 threshold_decimal_bps,
681 });
682 }
683
684 Ok(Self {
685 threshold_decimal_bps,
686 current_bar: None,
687 completed_bars: Vec::new(),
688 })
689 }
690
691 pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
694 for trade in trades {
695 self.process_single_trade_fixed_point(trade);
696 }
697 }
698
699 fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
701 if self.current_bar.is_none() {
702 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
704
705 self.current_bar = Some(InternalRangeBar {
706 open_time: trade.timestamp,
707 close_time: trade.timestamp,
708 open: trade.price,
709 high: trade.price,
710 low: trade.price,
711 close: trade.price,
712 volume: trade.volume,
713 turnover: trade_turnover,
714 individual_trade_count: 1,
715 agg_record_count: 1,
716 first_trade_id: trade.agg_trade_id,
717 last_trade_id: trade.agg_trade_id,
718 buy_volume: if trade.is_buyer_maker {
720 FixedPoint(0)
721 } else {
722 trade.volume
723 },
724 sell_volume: if trade.is_buyer_maker {
725 trade.volume
726 } else {
727 FixedPoint(0)
728 },
729 buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
730 sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
731 vwap: trade.price,
732 buy_turnover: if trade.is_buyer_maker {
733 0
734 } else {
735 trade_turnover
736 },
737 sell_turnover: if trade.is_buyer_maker {
738 trade_turnover
739 } else {
740 0
741 },
742 });
743 return;
744 }
745
746 let bar = self.current_bar.as_mut().unwrap();
749 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
750
751 let price_val = trade.price.0;
754 let bar_open_val = bar.open.0;
755 let threshold_decimal_bps = self.threshold_decimal_bps as i64;
756 let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
757 let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
758
759 bar.close_time = trade.timestamp;
761 bar.close = trade.price;
762 bar.volume.0 += trade.volume.0;
763 bar.turnover += trade_turnover;
764 bar.individual_trade_count += 1;
765 bar.agg_record_count += 1;
766 bar.last_trade_id = trade.agg_trade_id;
767
768 if price_val > bar.high.0 {
770 bar.high = trade.price;
771 }
772 if price_val < bar.low.0 {
773 bar.low = trade.price;
774 }
775
776 if trade.is_buyer_maker {
778 bar.sell_volume.0 += trade.volume.0;
779 bar.sell_turnover += trade_turnover;
780 bar.sell_trade_count += 1;
781 } else {
782 bar.buy_volume.0 += trade.volume.0;
783 bar.buy_turnover += trade_turnover;
784 bar.buy_trade_count += 1;
785 }
786
787 if price_val >= upper_threshold || price_val <= lower_threshold {
789 let completed_bar = self.current_bar.take().unwrap();
792
793 let export_bar = RangeBar {
795 open_time: completed_bar.open_time,
796 close_time: completed_bar.close_time,
797 open: completed_bar.open,
798 high: completed_bar.high,
799 low: completed_bar.low,
800 close: completed_bar.close,
801 volume: completed_bar.volume,
802 turnover: completed_bar.turnover,
803
804 individual_trade_count: completed_bar.individual_trade_count as u32,
806 agg_record_count: completed_bar.agg_record_count,
807 first_trade_id: completed_bar.first_trade_id,
808 last_trade_id: completed_bar.last_trade_id,
809 data_source: crate::types::DataSource::default(),
810
811 buy_volume: completed_bar.buy_volume,
813 sell_volume: completed_bar.sell_volume,
814 buy_trade_count: completed_bar.buy_trade_count as u32,
815 sell_trade_count: completed_bar.sell_trade_count as u32,
816 vwap: completed_bar.vwap,
817 buy_turnover: completed_bar.buy_turnover,
818 sell_turnover: completed_bar.sell_turnover,
819 };
820
821 self.completed_bars.push(export_bar);
822
823 let initial_buy_turnover = if trade.is_buyer_maker {
825 0
826 } else {
827 trade_turnover
828 };
829 let initial_sell_turnover = if trade.is_buyer_maker {
830 trade_turnover
831 } else {
832 0
833 };
834
835 self.current_bar = Some(InternalRangeBar {
836 open_time: trade.timestamp,
837 close_time: trade.timestamp,
838 open: trade.price,
839 high: trade.price,
840 low: trade.price,
841 close: trade.price,
842 volume: trade.volume,
843 turnover: trade_turnover,
844 individual_trade_count: 1,
845 agg_record_count: 1,
846 first_trade_id: trade.agg_trade_id,
847 last_trade_id: trade.agg_trade_id,
848 buy_volume: if trade.is_buyer_maker {
850 FixedPoint(0)
851 } else {
852 trade.volume
853 },
854 sell_volume: if trade.is_buyer_maker {
855 trade.volume
856 } else {
857 FixedPoint(0)
858 },
859 buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
860 sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
861 vwap: trade.price,
862 buy_turnover: initial_buy_turnover,
863 sell_turnover: initial_sell_turnover,
864 });
865 }
866 }
867
868 pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
871 std::mem::take(&mut self.completed_bars)
872 }
873
874 pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
876 self.current_bar.as_ref().map(|incomplete| RangeBar {
877 open_time: incomplete.open_time,
878 close_time: incomplete.close_time,
879 open: incomplete.open,
880 high: incomplete.high,
881 low: incomplete.low,
882 close: incomplete.close,
883 volume: incomplete.volume,
884 turnover: incomplete.turnover,
885
886 individual_trade_count: incomplete.individual_trade_count as u32,
888 agg_record_count: incomplete.agg_record_count,
889 first_trade_id: incomplete.first_trade_id,
890 last_trade_id: incomplete.last_trade_id,
891 data_source: crate::types::DataSource::default(),
892
893 buy_volume: incomplete.buy_volume,
895 sell_volume: incomplete.sell_volume,
896 buy_trade_count: incomplete.buy_trade_count as u32,
897 sell_trade_count: incomplete.sell_trade_count as u32,
898 vwap: incomplete.vwap,
899 buy_turnover: incomplete.buy_turnover,
900 sell_turnover: incomplete.sell_turnover,
901 })
902 }
903}