1use crate::errors::ProcessingError;
5use crate::fixed_point::FixedPoint;
6use crate::types::{AggTrade, RangeBar};
7
8#[derive(Debug, Clone)]
10pub(crate) struct InternalRangeBar {
11 open_time: i64,
12 close_time: i64,
13 open: FixedPoint,
14 high: FixedPoint,
15 low: FixedPoint,
16 close: FixedPoint,
17 volume: i128,
20 turnover: i128,
21 individual_trade_count: i64,
22 agg_record_count: u32,
23 first_trade_id: i64,
24 last_trade_id: i64,
25 first_agg_trade_id: i64,
27 last_agg_trade_id: i64,
29 buy_volume: i128,
32 sell_volume: i128,
35 buy_trade_count: i64,
37 sell_trade_count: i64,
39 vwap: FixedPoint,
41 buy_turnover: i128,
43 sell_turnover: i128,
45}
46
47pub struct ExportRangeBarProcessor {
52 threshold_decimal_bps: u32,
53 current_bar: Option<InternalRangeBar>,
54 completed_bars: Vec<RangeBar>,
55 completed_bars_pool: Option<Vec<RangeBar>>,
57 prevent_same_timestamp_close: bool,
59 defer_open: bool,
61}
62
63impl ExportRangeBarProcessor {
64 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
80 Self::with_options(threshold_decimal_bps, true)
81 }
82
83 pub fn with_options(
85 threshold_decimal_bps: u32,
86 prevent_same_timestamp_close: bool,
87 ) -> Result<Self, ProcessingError> {
88 if threshold_decimal_bps < 1 {
92 return Err(ProcessingError::InvalidThreshold {
93 threshold_decimal_bps,
94 });
95 }
96 if threshold_decimal_bps > 100_000 {
97 return Err(ProcessingError::InvalidThreshold {
98 threshold_decimal_bps,
99 });
100 }
101
102 Ok(Self {
103 threshold_decimal_bps,
104 current_bar: None,
105 completed_bars: Vec::new(),
106 completed_bars_pool: None,
107 prevent_same_timestamp_close,
108 defer_open: false,
109 })
110 }
111
112 pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
115 for trade in trades {
116 self.process_single_trade_fixed_point(trade);
117 }
118 }
119
120 fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
122 if self.defer_open {
125 self.defer_open = false;
126 self.current_bar = None; }
129
130 if self.current_bar.is_none() {
131 let trade_turnover = trade.turnover();
134 let vol = trade.volume.0 as i128;
135
136 let (buy_vol, sell_vol, buy_count, sell_count, buy_turn, sell_turn) =
138 if trade.is_buyer_maker {
139 (0i128, vol, 0i64, 1i64, 0i128, trade_turnover)
140 } else {
141 (vol, 0i128, 1i64, 0i64, trade_turnover, 0i128)
142 };
143
144 self.current_bar = Some(InternalRangeBar {
145 open_time: trade.timestamp,
146 close_time: trade.timestamp,
147 open: trade.price,
148 high: trade.price,
149 low: trade.price,
150 close: trade.price,
151 volume: vol,
153 turnover: trade_turnover,
154 individual_trade_count: 1,
155 agg_record_count: 1,
156 first_trade_id: trade.first_trade_id,
157 last_trade_id: trade.last_trade_id,
158 first_agg_trade_id: trade.agg_trade_id,
160 last_agg_trade_id: trade.agg_trade_id,
161 buy_volume: buy_vol,
163 sell_volume: sell_vol,
164 buy_trade_count: buy_count,
165 sell_trade_count: sell_count,
166 vwap: trade.price,
167 buy_turnover: buy_turn,
168 sell_turnover: sell_turn,
169 });
170 return;
171 }
172
173 let bar = self.current_bar.as_mut().unwrap();
176 let trade_turnover = trade.turnover();
178
179 let price_val = trade.price.0;
182 let bar_open_val = bar.open.0;
183 let threshold_decimal_bps = self.threshold_decimal_bps as i64;
184 let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
185 let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
186
187 bar.close_time = trade.timestamp;
189 bar.close = trade.price;
190 bar.volume += trade.volume.0 as i128; bar.turnover += trade_turnover;
192 bar.individual_trade_count += 1;
193 bar.agg_record_count += 1;
194 bar.last_trade_id = trade.last_trade_id;
195 bar.last_agg_trade_id = trade.agg_trade_id; if price_val > bar.high.0 {
199 bar.high = trade.price;
200 }
201 if price_val < bar.low.0 {
202 bar.low = trade.price;
203 }
204
205 if trade.is_buyer_maker {
207 bar.sell_volume += trade.volume.0 as i128; bar.sell_turnover += trade_turnover;
209 bar.sell_trade_count += 1;
210 } else {
211 bar.buy_volume += trade.volume.0 as i128; bar.buy_turnover += trade_turnover;
213 bar.buy_trade_count += 1;
214 }
215
216 let price_breaches = price_val >= upper_threshold || price_val <= lower_threshold;
218
219 let timestamp_allows_close =
221 !self.prevent_same_timestamp_close || trade.timestamp != bar.open_time;
222
223 if price_breaches && timestamp_allows_close {
224 let completed_bar = self.current_bar.take().unwrap();
227
228 let mut export_bar = RangeBar {
231 open_time: completed_bar.open_time,
232 close_time: completed_bar.close_time,
233 open: completed_bar.open,
234 high: completed_bar.high,
235 low: completed_bar.low,
236 close: completed_bar.close,
237 volume: completed_bar.volume,
238 turnover: completed_bar.turnover,
239 individual_trade_count: completed_bar.individual_trade_count as u32,
240 agg_record_count: completed_bar.agg_record_count,
241 first_trade_id: completed_bar.first_trade_id,
242 last_trade_id: completed_bar.last_trade_id,
243 first_agg_trade_id: completed_bar.first_agg_trade_id, last_agg_trade_id: completed_bar.last_agg_trade_id,
245 buy_volume: completed_bar.buy_volume,
246 sell_volume: completed_bar.sell_volume,
247 buy_trade_count: completed_bar.buy_trade_count as u32,
248 sell_trade_count: completed_bar.sell_trade_count as u32,
249 vwap: completed_bar.vwap,
250 buy_turnover: completed_bar.buy_turnover,
251 sell_turnover: completed_bar.sell_turnover,
252 ..Default::default() };
254
255 export_bar.compute_microstructure_features();
257
258 self.completed_bars.push(export_bar);
259
260 self.current_bar = None;
263 self.defer_open = true;
264 }
265 }
266
267 pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
270 let mut result = if let Some(mut pool_vec) = self.completed_bars_pool.take() {
272 pool_vec.clear();
274 pool_vec
275 } else {
276 Vec::new()
278 };
279
280 std::mem::swap(&mut result, &mut self.completed_bars);
282
283 self.completed_bars_pool = Some(std::mem::take(&mut self.completed_bars));
285
286 result
287 }
288
289 pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
291 self.current_bar.as_ref().map(|incomplete| {
292 let mut bar = RangeBar {
293 open_time: incomplete.open_time,
294 close_time: incomplete.close_time,
295 open: incomplete.open,
296 high: incomplete.high,
297 low: incomplete.low,
298 close: incomplete.close,
299 volume: incomplete.volume,
300 turnover: incomplete.turnover,
301
302 individual_trade_count: incomplete.individual_trade_count as u32,
304 agg_record_count: incomplete.agg_record_count,
305 first_trade_id: incomplete.first_trade_id,
306 last_trade_id: incomplete.last_trade_id,
307 first_agg_trade_id: incomplete.first_agg_trade_id,
308 last_agg_trade_id: incomplete.last_agg_trade_id,
309 data_source: crate::types::DataSource::default(),
310
311 buy_volume: incomplete.buy_volume,
313 sell_volume: incomplete.sell_volume,
314 buy_trade_count: incomplete.buy_trade_count as u32,
315 sell_trade_count: incomplete.sell_trade_count as u32,
316 vwap: incomplete.vwap,
317 buy_turnover: incomplete.buy_turnover,
318 sell_turnover: incomplete.sell_turnover,
319
320 ..Default::default()
322 };
323 bar.compute_microstructure_features();
325 bar
326 })
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use crate::test_utils::create_test_agg_trade_with_range;
334
335 fn buy_trade(id: i64, price: &str, vol: &str, ts: i64) -> AggTrade {
337 create_test_agg_trade_with_range(id, price, vol, ts, id * 10, id * 10, false)
338 }
339
340 fn sell_trade(id: i64, price: &str, vol: &str, ts: i64) -> AggTrade {
342 create_test_agg_trade_with_range(id, price, vol, ts, id * 10, id * 10, true)
343 }
344
345 #[test]
346 fn test_new_valid_threshold() {
347 let proc = ExportRangeBarProcessor::new(250);
348 assert!(proc.is_ok());
349 }
350
351 #[test]
352 fn test_new_invalid_threshold_zero() {
353 match ExportRangeBarProcessor::new(0) {
354 Err(ProcessingError::InvalidThreshold {
355 threshold_decimal_bps: 0,
356 }) => {}
357 Err(e) => panic!("Expected InvalidThreshold(0), got error: {e}"),
358 Ok(_) => panic!("Expected error for threshold 0"),
359 }
360 }
361
362 #[test]
363 fn test_new_invalid_threshold_too_high() {
364 let proc = ExportRangeBarProcessor::new(100_001);
365 assert!(proc.is_err());
366 }
367
368 #[test]
369 fn test_new_boundary_thresholds() {
370 assert!(ExportRangeBarProcessor::new(1).is_ok());
372 assert!(ExportRangeBarProcessor::new(100_000).is_ok());
374 }
375
376 #[test]
377 fn test_with_options_timestamp_gating() {
378 let proc = ExportRangeBarProcessor::with_options(250, false);
379 assert!(proc.is_ok());
380 }
381
382 #[test]
383 fn test_single_trade_no_bar_completion() {
384 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
385 let trades = vec![buy_trade(1, "100.0", "1.0", 1000)];
386 proc.process_trades_continuously(&trades);
387
388 let completed = proc.get_all_completed_bars();
389 assert_eq!(completed.len(), 0, "Single trade should not complete a bar");
390
391 let incomplete = proc.get_incomplete_bar();
392 assert!(incomplete.is_some(), "Should have an incomplete bar");
393 let bar = incomplete.unwrap();
394 assert_eq!(bar.open, FixedPoint::from_str("100.0").unwrap());
395 assert_eq!(bar.close, FixedPoint::from_str("100.0").unwrap());
396 }
397
398 #[test]
399 fn test_breach_completes_bar() {
400 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
402 let trades = vec![
403 buy_trade(1, "100.0", "1.0", 1000),
404 buy_trade(2, "100.10", "1.0", 2000),
405 buy_trade(3, "100.25", "1.0", 3000), ];
407 proc.process_trades_continuously(&trades);
408
409 let completed = proc.get_all_completed_bars();
410 assert_eq!(completed.len(), 1, "Breach should complete one bar");
411
412 let bar = &completed[0];
413 assert_eq!(bar.open, FixedPoint::from_str("100.0").unwrap());
414 assert_eq!(bar.close, FixedPoint::from_str("100.25").unwrap());
415 assert_eq!(bar.high, FixedPoint::from_str("100.25").unwrap());
416 assert_eq!(bar.low, FixedPoint::from_str("100.0").unwrap());
417 }
418
419 #[test]
420 fn test_defer_open_semantics() {
421 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
423 let trades = vec![
424 buy_trade(1, "100.0", "1.0", 1000),
425 buy_trade(2, "100.25", "1.0", 2000), buy_trade(3, "100.50", "1.0", 3000), ];
428 proc.process_trades_continuously(&trades);
429
430 let completed = proc.get_all_completed_bars();
431 assert_eq!(completed.len(), 1);
432 assert_eq!(completed[0].open, FixedPoint::from_str("100.0").unwrap());
434 assert_eq!(completed[0].close, FixedPoint::from_str("100.25").unwrap());
435
436 let incomplete = proc.get_incomplete_bar();
438 assert!(incomplete.is_some());
439 let bar2 = incomplete.unwrap();
440 assert_eq!(
441 bar2.open,
442 FixedPoint::from_str("100.50").unwrap(),
443 "Bar 2 should open at trade 3's price, not the breaching trade"
444 );
445 }
446
447 #[test]
448 fn test_timestamp_gate_prevents_same_ts_close() {
449 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
451 let trades = vec![
452 buy_trade(1, "100.0", "1.0", 1000),
453 buy_trade(2, "100.30", "1.0", 1000), ];
455 proc.process_trades_continuously(&trades);
456
457 let completed = proc.get_all_completed_bars();
458 assert_eq!(
459 completed.len(),
460 0,
461 "Timestamp gate should prevent close on same ms"
462 );
463 }
464
465 #[test]
466 fn test_timestamp_gate_disabled() {
467 let mut proc = ExportRangeBarProcessor::with_options(250, false).unwrap();
469 let trades = vec![
470 buy_trade(1, "100.0", "1.0", 1000),
471 buy_trade(2, "100.30", "1.0", 1000), ];
473 proc.process_trades_continuously(&trades);
474
475 let completed = proc.get_all_completed_bars();
476 assert_eq!(
477 completed.len(),
478 1,
479 "With gating disabled, same-ts breach should close"
480 );
481 }
482
483 #[test]
484 fn test_get_all_completed_bars_drains() {
485 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
486 let trades = vec![
487 buy_trade(1, "100.0", "1.0", 1000),
488 buy_trade(2, "100.25", "1.0", 2000), ];
490 proc.process_trades_continuously(&trades);
491
492 let bars1 = proc.get_all_completed_bars();
493 assert_eq!(bars1.len(), 1);
494
495 let bars2 = proc.get_all_completed_bars();
497 assert_eq!(bars2.len(), 0, "get_all_completed_bars should drain buffer");
498 }
499
500 #[test]
501 fn test_vec_reuse_pool() {
502 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
503
504 proc.process_trades_continuously(&[
506 buy_trade(1, "100.0", "1.0", 1000),
507 buy_trade(2, "100.25", "1.0", 2000),
508 ]);
509 let _bars1 = proc.get_all_completed_bars();
510
511 proc.process_trades_continuously(&[
513 sell_trade(3, "100.50", "1.0", 3000),
514 sell_trade(4, "100.75", "1.0", 4000),
515 sell_trade(5, "100.24", "1.0", 5000), ]);
517 let bars2 = proc.get_all_completed_bars();
518 assert_eq!(bars2.len(), 1);
519 }
520
521 #[test]
522 fn test_buy_sell_volume_segregation() {
523 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
524 let trades = vec![
525 buy_trade(1, "100.0", "2.0", 1000), sell_trade(2, "100.05", "3.0", 2000), buy_trade(3, "100.25", "1.0", 3000), ];
529 proc.process_trades_continuously(&trades);
530
531 let bars = proc.get_all_completed_bars();
532 assert_eq!(bars.len(), 1);
533 let bar = &bars[0];
534
535 let buy_vol = bar.buy_volume;
536 let sell_vol = bar.sell_volume;
537 assert_eq!(buy_vol, 300_000_000, "Buy volume should be 3.0 in FixedPoint i128");
539 assert_eq!(sell_vol, 300_000_000, "Sell volume should be 3.0 in FixedPoint i128");
540 }
541
542 #[test]
543 fn test_trade_id_tracking() {
544 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
546 let trades = vec![
547 create_test_agg_trade_with_range(100, "100.0", "1.0", 1000, 1000, 1005, false),
548 create_test_agg_trade_with_range(101, "100.10", "1.0", 2000, 1006, 1010, true),
549 create_test_agg_trade_with_range(102, "100.25", "1.0", 3000, 1011, 1015, false), ];
551 proc.process_trades_continuously(&trades);
552
553 let bars = proc.get_all_completed_bars();
554 assert_eq!(bars.len(), 1);
555 let bar = &bars[0];
556 assert_eq!(bar.first_agg_trade_id, 100);
557 assert_eq!(bar.last_agg_trade_id, 102);
558 assert_eq!(bar.first_trade_id, 1000);
559 assert_eq!(bar.last_trade_id, 1015);
560 }
561
562 #[test]
563 fn test_microstructure_features_computed() {
564 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
565 let trades = vec![
566 buy_trade(1, "100.0", "5.0", 1000),
567 sell_trade(2, "100.10", "3.0", 2000),
568 buy_trade(3, "100.25", "2.0", 3000), ];
570 proc.process_trades_continuously(&trades);
571
572 let bars = proc.get_all_completed_bars();
573 let bar = &bars[0];
574
575 assert!(bar.ofi != 0.0, "OFI should be computed");
578 assert!(bar.trade_intensity > 0.0, "Trade intensity should be > 0");
579 assert!(bar.volume_per_trade > 0.0, "Volume per trade should be > 0");
580 }
581
582 #[test]
583 fn test_incomplete_bar_has_microstructure() {
584 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
585 proc.process_trades_continuously(&[
586 buy_trade(1, "100.0", "5.0", 1000),
587 sell_trade(2, "100.10", "3.0", 2000),
588 ]);
589
590 let incomplete = proc.get_incomplete_bar().unwrap();
591 assert!(
593 incomplete.volume_per_trade > 0.0,
594 "Incomplete bar should have microstructure features"
595 );
596 }
597
598 #[test]
599 fn test_multiple_bars_sequence() {
600 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
601 let trades = vec![
603 buy_trade(1, "100.0", "1.0", 1000),
604 buy_trade(2, "100.25", "1.0", 2000), buy_trade(3, "100.50", "1.0", 3000), buy_trade(4, "100.76", "1.0", 4000), ];
608 proc.process_trades_continuously(&trades);
609
610 let bars = proc.get_all_completed_bars();
611 assert_eq!(bars.len(), 2, "Should produce 2 complete bars");
612 assert_eq!(bars[0].open, FixedPoint::from_str("100.0").unwrap());
613 assert_eq!(bars[1].open, FixedPoint::from_str("100.50").unwrap());
614 }
615
616 #[test]
617 fn test_downward_breach() {
618 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
619 let trades = vec![
620 sell_trade(1, "100.0", "1.0", 1000),
621 sell_trade(2, "99.75", "1.0", 2000), ];
623 proc.process_trades_continuously(&trades);
624
625 let bars = proc.get_all_completed_bars();
626 assert_eq!(bars.len(), 1);
627 assert_eq!(bars[0].close, FixedPoint::from_str("99.75").unwrap());
628 }
629
630 #[test]
631 fn test_empty_trades_no_op() {
632 let mut proc = ExportRangeBarProcessor::new(250).unwrap();
633 proc.process_trades_continuously(&[]);
634 assert_eq!(proc.get_all_completed_bars().len(), 0);
635 assert!(proc.get_incomplete_bar().is_none());
636 }
637
638 #[test]
641 fn test_parity_with_range_bar_processor() {
642 use crate::processor::RangeBarProcessor;
643
644 for threshold in [250, 500, 1000] {
646 let trades: Vec<AggTrade> = (0..20)
648 .map(|i| {
649 let price = format!("{:.8}", 100.0 + (i as f64 * 0.15));
650 buy_trade(i + 1, &price, "1.0", 1000 + i * 1000)
651 })
652 .collect();
653
654 let mut main_proc = RangeBarProcessor::new(threshold).unwrap();
656 let main_bars = main_proc.process_agg_trade_records(&trades).unwrap();
657
658 let mut export_proc = ExportRangeBarProcessor::new(threshold).unwrap();
659 export_proc.process_trades_continuously(&trades);
660 let export_bars = export_proc.get_all_completed_bars();
661
662 assert_eq!(
664 main_bars.len(), export_bars.len(),
665 "threshold={threshold}: bar count mismatch: main={} export={}",
666 main_bars.len(), export_bars.len()
667 );
668
669 for (i, (m, e)) in main_bars.iter().zip(export_bars.iter()).enumerate() {
671 assert_eq!(m.open, e.open, "t={threshold} bar={i}: open mismatch");
672 assert_eq!(m.high, e.high, "t={threshold} bar={i}: high mismatch");
673 assert_eq!(m.low, e.low, "t={threshold} bar={i}: low mismatch");
674 assert_eq!(m.close, e.close, "t={threshold} bar={i}: close mismatch");
675 assert_eq!(m.volume, e.volume, "t={threshold} bar={i}: volume mismatch");
676 assert_eq!(m.open_time, e.open_time, "t={threshold} bar={i}: open_time mismatch");
677 assert_eq!(m.close_time, e.close_time, "t={threshold} bar={i}: close_time mismatch");
678 assert_eq!(m.individual_trade_count, e.individual_trade_count, "t={threshold} bar={i}: trade_count mismatch");
679 assert_eq!(
680 m.first_agg_trade_id, e.first_agg_trade_id,
681 "t={threshold} bar={i}: first_agg_trade_id mismatch"
682 );
683 assert_eq!(
684 m.last_agg_trade_id, e.last_agg_trade_id,
685 "t={threshold} bar={i}: last_agg_trade_id mismatch"
686 );
687 }
688 }
689 }
690}