1use crate::fixed_point::FixedPoint;
7use crate::types::{AggTrade, RangeBar};
8#[cfg(feature = "python")]
9use pyo3::prelude::*;
10use thiserror::Error;
11
12pub struct RangeBarProcessor {
14 threshold_bps: u32,
16
17 current_bar_state: Option<RangeBarState>,
20}
21
22impl RangeBarProcessor {
23 pub fn new(threshold_bps: u32) -> Result<Self, ProcessingError> {
37 if threshold_bps < 1 {
41 return Err(ProcessingError::InvalidThreshold { threshold_bps });
42 }
43 if threshold_bps > 100_000 {
44 return Err(ProcessingError::InvalidThreshold { threshold_bps });
45 }
46
47 Ok(Self {
48 threshold_bps,
49 current_bar_state: None,
50 })
51 }
52
53 pub fn process_single_trade(
72 &mut self,
73 trade: AggTrade,
74 ) -> Result<Option<RangeBar>, ProcessingError> {
75 match &mut self.current_bar_state {
76 None => {
77 self.current_bar_state = Some(RangeBarState::new(&trade, self.threshold_bps));
79 Ok(None)
80 }
81 Some(bar_state) => {
82 if bar_state.bar.is_breach(
84 trade.price,
85 bar_state.upper_threshold,
86 bar_state.lower_threshold,
87 ) {
88 bar_state.bar.update_with_trade(&trade);
90
91 debug_assert!(
93 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
94 );
95 debug_assert!(bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close));
96
97 let completed_bar = bar_state.bar.clone();
98
99 self.current_bar_state = Some(RangeBarState::new(&trade, self.threshold_bps));
101
102 Ok(Some(completed_bar))
103 } else {
104 bar_state.bar.update_with_trade(&trade);
106 Ok(None)
107 }
108 }
109 }
110 }
111
112 pub fn get_incomplete_bar(&self) -> Option<RangeBar> {
121 self.current_bar_state
122 .as_ref()
123 .map(|state| state.bar.clone())
124 }
125
126 pub fn process_agg_trade_records_with_incomplete(
141 &mut self,
142 agg_trade_records: &[AggTrade],
143 ) -> Result<Vec<RangeBar>, ProcessingError> {
144 self.process_agg_trade_records_with_options(agg_trade_records, true)
145 }
146
147 pub fn process_agg_trade_records(
162 &mut self,
163 agg_trade_records: &[AggTrade],
164 ) -> Result<Vec<RangeBar>, ProcessingError> {
165 self.process_agg_trade_records_with_options(agg_trade_records, false)
166 }
167
168 pub fn process_agg_trade_records_with_options(
182 &mut self,
183 agg_trade_records: &[AggTrade],
184 include_incomplete: bool,
185 ) -> Result<Vec<RangeBar>, ProcessingError> {
186 if agg_trade_records.is_empty() {
187 return Ok(Vec::new());
188 }
189
190 self.validate_trade_ordering(agg_trade_records)?;
192
193 self.current_bar_state = None;
195
196 let mut bars = Vec::with_capacity(agg_trade_records.len() / 100); let mut current_bar: Option<RangeBarState> = None;
198 let mut defer_open = false;
199
200 for agg_record in agg_trade_records {
201 if defer_open {
202 current_bar = Some(RangeBarState::new(agg_record, self.threshold_bps));
204 defer_open = false;
205 continue;
206 }
207
208 match current_bar {
209 None => {
210 current_bar = Some(RangeBarState::new(agg_record, self.threshold_bps));
212 }
213 Some(ref mut bar_state) => {
214 if bar_state.bar.is_breach(
216 agg_record.price,
217 bar_state.upper_threshold,
218 bar_state.lower_threshold,
219 ) {
220 bar_state.bar.update_with_trade(agg_record);
222
223 debug_assert!(
225 bar_state.bar.high >= bar_state.bar.open.max(bar_state.bar.close)
226 );
227 debug_assert!(
228 bar_state.bar.low <= bar_state.bar.open.min(bar_state.bar.close)
229 );
230
231 bars.push(bar_state.bar.clone());
232 current_bar = None;
233 defer_open = true; } else {
235 bar_state.bar.update_with_trade(agg_record);
237 }
238 }
239 }
240 }
241
242 if include_incomplete && let Some(bar_state) = current_bar {
245 bars.push(bar_state.bar);
246 }
247
248 Ok(bars)
249 }
250
251 fn validate_trade_ordering(&self, trades: &[AggTrade]) -> Result<(), ProcessingError> {
253 for i in 1..trades.len() {
254 let prev = &trades[i - 1];
255 let curr = &trades[i];
256
257 if curr.timestamp < prev.timestamp
259 || (curr.timestamp == prev.timestamp && curr.agg_trade_id <= prev.agg_trade_id)
260 {
261 return Err(ProcessingError::UnsortedTrades {
262 index: i,
263 prev_time: prev.timestamp,
264 prev_id: prev.agg_trade_id,
265 curr_time: curr.timestamp,
266 curr_id: curr.agg_trade_id,
267 });
268 }
269 }
270
271 Ok(())
272 }
273}
274
275struct RangeBarState {
277 pub bar: RangeBar,
279
280 pub upper_threshold: FixedPoint,
282
283 pub lower_threshold: FixedPoint,
285}
286
287impl RangeBarState {
288 fn new(trade: &AggTrade, threshold_bps: u32) -> Self {
290 let bar = RangeBar::new(trade);
291
292 let (upper_threshold, lower_threshold) = bar.open.compute_range_thresholds(threshold_bps);
294
295 Self {
296 bar,
297 upper_threshold,
298 lower_threshold,
299 }
300 }
301}
302
303#[derive(Error, Debug)]
305pub enum ProcessingError {
306 #[error(
307 "Trades not sorted at index {index}: prev=({prev_time}, {prev_id}), curr=({curr_time}, {curr_id})"
308 )]
309 UnsortedTrades {
310 index: usize,
311 prev_time: i64,
312 prev_id: i64,
313 curr_time: i64,
314 curr_id: i64,
315 },
316
317 #[error("Empty trade data")]
318 EmptyData,
319
320 #[error(
321 "Invalid threshold: {threshold_bps} (0.1bps units). Valid range: 1-100,000 (0.001%-100%)"
322 )]
323 InvalidThreshold { threshold_bps: u32 },
324}
325
326#[cfg(feature = "python")]
327impl From<ProcessingError> for PyErr {
328 fn from(err: ProcessingError) -> PyErr {
329 match err {
330 ProcessingError::UnsortedTrades {
331 index,
332 prev_time,
333 prev_id,
334 curr_time,
335 curr_id,
336 } => pyo3::exceptions::PyValueError::new_err(format!(
337 "Trades not sorted at index {}: prev=({}, {}), curr=({}, {})",
338 index, prev_time, prev_id, curr_time, curr_id
339 )),
340 ProcessingError::EmptyData => {
341 pyo3::exceptions::PyValueError::new_err("Empty trade data")
342 }
343 ProcessingError::InvalidThreshold { threshold_bps } => {
344 pyo3::exceptions::PyValueError::new_err(format!(
345 "Invalid threshold: {} (0.1bps units). Valid range: 1-100,000 (0.001%-100%)",
346 threshold_bps
347 ))
348 }
349 }
350 }
351}
352
353#[cfg(test)]
354mod tests {
355 use super::*;
356 use crate::test_utils::{self, scenarios};
357
358 #[test]
359 fn test_single_bar_no_breach() {
360 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::no_breach_sequence(250);
364
365 let bars = processor.process_agg_trade_records(&trades).unwrap();
367 assert_eq!(
368 bars.len(),
369 0,
370 "Strict algorithm should not create bars without breach"
371 );
372
373 let bars_with_incomplete = processor
375 .process_agg_trade_records_with_incomplete(&trades)
376 .unwrap();
377 assert_eq!(
378 bars_with_incomplete.len(),
379 1,
380 "Analysis mode should include incomplete bar"
381 );
382
383 let bar = &bars_with_incomplete[0];
384 assert_eq!(bar.open.to_string(), "50000.00000000");
385 assert_eq!(bar.high.to_string(), "50100.00000000");
386 assert_eq!(bar.low.to_string(), "49900.00000000");
387 assert_eq!(bar.close.to_string(), "49900.00000000");
388 }
389
390 #[test]
391 fn test_exact_breach_upward() {
392 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_upward(250);
395
396 let bars = processor.process_agg_trade_records(&trades).unwrap();
398 assert_eq!(
399 bars.len(),
400 1,
401 "Strict algorithm should only return completed bars"
402 );
403
404 let bar1 = &bars[0];
406 assert_eq!(bar1.open.to_string(), "50000.00000000");
407 assert_eq!(bar1.close.to_string(), "50125.00000000"); assert_eq!(bar1.high.to_string(), "50125.00000000");
410 assert_eq!(bar1.low.to_string(), "50000.00000000");
411
412 let bars_with_incomplete = processor
414 .process_agg_trade_records_with_incomplete(&trades)
415 .unwrap();
416 assert_eq!(
417 bars_with_incomplete.len(),
418 2,
419 "Analysis mode should include incomplete bars"
420 );
421
422 let bar2 = &bars_with_incomplete[1];
424 assert_eq!(bar2.open.to_string(), "50500.00000000"); assert_eq!(bar2.close.to_string(), "50500.00000000");
426 }
427
428 #[test]
429 fn test_exact_breach_downward() {
430 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::exact_breach_downward(250);
433
434 let bars = processor.process_agg_trade_records(&trades).unwrap();
435
436 assert_eq!(bars.len(), 1);
437
438 let bar = &bars[0];
439 assert_eq!(bar.open.to_string(), "50000.00000000");
440 assert_eq!(bar.close.to_string(), "49875.00000000"); assert_eq!(bar.high.to_string(), "50000.00000000");
442 assert_eq!(bar.low.to_string(), "49875.00000000");
443 }
444
445 #[test]
446 fn test_large_gap_single_bar() {
447 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::large_gap_sequence();
450
451 let bars = processor.process_agg_trade_records(&trades).unwrap();
452
453 assert_eq!(bars.len(), 1);
455
456 let bar = &bars[0];
457 assert_eq!(bar.open.to_string(), "50000.00000000");
458 assert_eq!(bar.close.to_string(), "51000.00000000");
459 assert_eq!(bar.high.to_string(), "51000.00000000");
460 assert_eq!(bar.low.to_string(), "50000.00000000");
461 }
462
463 #[test]
464 fn test_unsorted_trades_error() {
465 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::unsorted_sequence();
468
469 let result = processor.process_agg_trade_records(&trades);
470 assert!(result.is_err());
471
472 match result {
473 Err(ProcessingError::UnsortedTrades { index, .. }) => {
474 assert_eq!(index, 1);
475 }
476 _ => panic!("Expected UnsortedTrades error"),
477 }
478 }
479
480 #[test]
481 fn test_threshold_calculation() {
482 let processor = RangeBarProcessor::new(250).unwrap(); let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
485 let bar_state = RangeBarState::new(&trade, processor.threshold_bps);
486
487 assert_eq!(bar_state.upper_threshold.to_string(), "50125.00000000");
489 assert_eq!(bar_state.lower_threshold.to_string(), "49875.00000000");
490 }
491
492 #[test]
493 fn test_empty_trades() {
494 let mut processor = RangeBarProcessor::new(250).unwrap(); let trades = scenarios::empty_sequence();
496 let bars = processor.process_agg_trade_records(&trades).unwrap();
497 assert_eq!(bars.len(), 0);
498 }
499
500 #[test]
501 fn test_debug_streaming_data() {
502 let mut processor = RangeBarProcessor::new(100).unwrap(); let trades = vec![
506 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
507 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
509 ];
510
511 println!("Test data prices: 50014 -> 50163 -> 50032");
512 println!("Expected price movements: +0.3% then -0.26%");
513
514 let bars = processor.process_agg_trade_records(&trades).unwrap();
515 println!("Generated {} range bars", bars.len());
516
517 for (i, bar) in bars.iter().enumerate() {
518 println!(
519 " Bar {}: O={} H={} L={} C={}",
520 i + 1,
521 bar.open,
522 bar.high,
523 bar.low,
524 bar.close
525 );
526 }
527
528 assert!(
530 !bars.is_empty(),
531 "Expected at least 1 range bar with 0.3% price movement and 0.1% threshold"
532 );
533 }
534
535 #[test]
536 fn test_threshold_validation() {
537 assert!(RangeBarProcessor::new(250).is_ok());
539
540 assert!(matches!(
542 RangeBarProcessor::new(0),
543 Err(ProcessingError::InvalidThreshold { threshold_bps: 0 })
544 ));
545
546 assert!(matches!(
548 RangeBarProcessor::new(150_000),
549 Err(ProcessingError::InvalidThreshold {
550 threshold_bps: 150_000
551 })
552 ));
553
554 assert!(RangeBarProcessor::new(1).is_ok());
556
557 assert!(RangeBarProcessor::new(100_000).is_ok());
559 }
560
561 #[test]
562 fn test_export_processor_with_manual_trades() {
563 println!("Testing ExportRangeBarProcessor with same trade data...");
564
565 let mut export_processor = ExportRangeBarProcessor::new(100).unwrap(); let trades = vec![
569 test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083),
570 test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113), test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770),
572 ];
573
574 println!(
575 "Processing {} trades with ExportRangeBarProcessor...",
576 trades.len()
577 );
578
579 export_processor.process_trades_continuously(&trades);
580 let bars = export_processor.get_all_completed_bars();
581
582 println!(
583 "ExportRangeBarProcessor generated {} range bars",
584 bars.len()
585 );
586 for (i, bar) in bars.iter().enumerate() {
587 println!(
588 " Bar {}: O={} H={} L={} C={}",
589 i + 1,
590 bar.open,
591 bar.high,
592 bar.low,
593 bar.close
594 );
595 }
596
597 assert!(
599 !bars.is_empty(),
600 "ExportRangeBarProcessor should generate same results as basic processor"
601 );
602 }
603}
604
605#[derive(Debug, Clone)]
607struct InternalRangeBar {
608 open_time: i64,
609 close_time: i64,
610 open: FixedPoint,
611 high: FixedPoint,
612 low: FixedPoint,
613 close: FixedPoint,
614 volume: FixedPoint,
615 turnover: i128,
616 individual_trade_count: i64,
617 agg_record_count: u32,
618 first_trade_id: i64,
619 last_trade_id: i64,
620 buy_volume: FixedPoint,
622 sell_volume: FixedPoint,
624 buy_trade_count: i64,
626 sell_trade_count: i64,
628 vwap: FixedPoint,
630 buy_turnover: i128,
632 sell_turnover: i128,
634}
635
636pub struct ExportRangeBarProcessor {
641 threshold_bps: u32,
642 current_bar: Option<InternalRangeBar>,
643 completed_bars: Vec<RangeBar>,
644}
645
646impl ExportRangeBarProcessor {
647 pub fn new(threshold_bps: u32) -> Result<Self, ProcessingError> {
661 if threshold_bps < 1 {
665 return Err(ProcessingError::InvalidThreshold { threshold_bps });
666 }
667 if threshold_bps > 100_000 {
668 return Err(ProcessingError::InvalidThreshold { threshold_bps });
669 }
670
671 Ok(Self {
672 threshold_bps,
673 current_bar: None,
674 completed_bars: Vec::new(),
675 })
676 }
677
678 pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
681 for trade in trades {
682 self.process_single_trade_fixed_point(trade);
683 }
684 }
685
686 fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
688 if self.current_bar.is_none() {
689 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
691
692 self.current_bar = Some(InternalRangeBar {
693 open_time: trade.timestamp,
694 close_time: trade.timestamp,
695 open: trade.price,
696 high: trade.price,
697 low: trade.price,
698 close: trade.price,
699 volume: trade.volume,
700 turnover: trade_turnover,
701 individual_trade_count: 1,
702 agg_record_count: 1,
703 first_trade_id: trade.agg_trade_id,
704 last_trade_id: trade.agg_trade_id,
705 buy_volume: if trade.is_buyer_maker {
707 FixedPoint(0)
708 } else {
709 trade.volume
710 },
711 sell_volume: if trade.is_buyer_maker {
712 trade.volume
713 } else {
714 FixedPoint(0)
715 },
716 buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
717 sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
718 vwap: trade.price,
719 buy_turnover: if trade.is_buyer_maker {
720 0
721 } else {
722 trade_turnover
723 },
724 sell_turnover: if trade.is_buyer_maker {
725 trade_turnover
726 } else {
727 0
728 },
729 });
730 return;
731 }
732
733 let bar = self.current_bar.as_mut().unwrap();
736 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
737
738 let price_val = trade.price.0;
741 let bar_open_val = bar.open.0;
742 let threshold_bps = self.threshold_bps as i64;
743 let upper_threshold = bar_open_val + (bar_open_val * threshold_bps) / 100_000;
744 let lower_threshold = bar_open_val - (bar_open_val * threshold_bps) / 100_000;
745
746 bar.close_time = trade.timestamp;
748 bar.close = trade.price;
749 bar.volume.0 += trade.volume.0;
750 bar.turnover += trade_turnover;
751 bar.individual_trade_count += 1;
752 bar.agg_record_count += 1;
753 bar.last_trade_id = trade.agg_trade_id;
754
755 if price_val > bar.high.0 {
757 bar.high = trade.price;
758 }
759 if price_val < bar.low.0 {
760 bar.low = trade.price;
761 }
762
763 if trade.is_buyer_maker {
765 bar.sell_volume.0 += trade.volume.0;
766 bar.sell_turnover += trade_turnover;
767 bar.sell_trade_count += 1;
768 } else {
769 bar.buy_volume.0 += trade.volume.0;
770 bar.buy_turnover += trade_turnover;
771 bar.buy_trade_count += 1;
772 }
773
774 if price_val >= upper_threshold || price_val <= lower_threshold {
776 let completed_bar = self.current_bar.take().unwrap();
779
780 let export_bar = RangeBar {
782 open_time: completed_bar.open_time,
783 close_time: completed_bar.close_time,
784 open: completed_bar.open,
785 high: completed_bar.high,
786 low: completed_bar.low,
787 close: completed_bar.close,
788 volume: completed_bar.volume,
789 turnover: completed_bar.turnover,
790
791 individual_trade_count: completed_bar.individual_trade_count as u32,
793 agg_record_count: completed_bar.agg_record_count,
794 first_trade_id: completed_bar.first_trade_id,
795 last_trade_id: completed_bar.last_trade_id,
796 data_source: crate::types::DataSource::default(),
797
798 buy_volume: completed_bar.buy_volume,
800 sell_volume: completed_bar.sell_volume,
801 buy_trade_count: completed_bar.buy_trade_count as u32,
802 sell_trade_count: completed_bar.sell_trade_count as u32,
803 vwap: completed_bar.vwap,
804 buy_turnover: completed_bar.buy_turnover,
805 sell_turnover: completed_bar.sell_turnover,
806 };
807
808 self.completed_bars.push(export_bar);
809
810 let initial_buy_turnover = if trade.is_buyer_maker {
812 0
813 } else {
814 trade_turnover
815 };
816 let initial_sell_turnover = if trade.is_buyer_maker {
817 trade_turnover
818 } else {
819 0
820 };
821
822 self.current_bar = Some(InternalRangeBar {
823 open_time: trade.timestamp,
824 close_time: trade.timestamp,
825 open: trade.price,
826 high: trade.price,
827 low: trade.price,
828 close: trade.price,
829 volume: trade.volume,
830 turnover: trade_turnover,
831 individual_trade_count: 1,
832 agg_record_count: 1,
833 first_trade_id: trade.agg_trade_id,
834 last_trade_id: trade.agg_trade_id,
835 buy_volume: if trade.is_buyer_maker {
837 FixedPoint(0)
838 } else {
839 trade.volume
840 },
841 sell_volume: if trade.is_buyer_maker {
842 trade.volume
843 } else {
844 FixedPoint(0)
845 },
846 buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
847 sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
848 vwap: trade.price,
849 buy_turnover: initial_buy_turnover,
850 sell_turnover: initial_sell_turnover,
851 });
852 }
853 }
854
855 pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
858 std::mem::take(&mut self.completed_bars)
859 }
860
861 pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
863 self.current_bar.as_ref().map(|incomplete| RangeBar {
864 open_time: incomplete.open_time,
865 close_time: incomplete.close_time,
866 open: incomplete.open,
867 high: incomplete.high,
868 low: incomplete.low,
869 close: incomplete.close,
870 volume: incomplete.volume,
871 turnover: incomplete.turnover,
872
873 individual_trade_count: incomplete.individual_trade_count as u32,
875 agg_record_count: incomplete.agg_record_count,
876 first_trade_id: incomplete.first_trade_id,
877 last_trade_id: incomplete.last_trade_id,
878 data_source: crate::types::DataSource::default(),
879
880 buy_volume: incomplete.buy_volume,
882 sell_volume: incomplete.sell_volume,
883 buy_trade_count: incomplete.buy_trade_count as u32,
884 sell_trade_count: incomplete.sell_trade_count as u32,
885 vwap: incomplete.vwap,
886 buy_turnover: incomplete.buy_turnover,
887 sell_turnover: incomplete.sell_turnover,
888 })
889 }
890}