1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use ahash::AHashMap;
30use chrono::{Duration, TimeDelta};
31use nautilus_common::{
32 clock::{Clock, TestClock},
33 timer::{TimeEvent, TimeEventCallback},
34};
35use nautilus_core::{
36 UnixNanos,
37 correctness::{self, FAILED},
38 datetime::{
39 add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos, subtract_n_months_nanos,
40 subtract_n_years_nanos,
41 },
42};
43use nautilus_model::{
44 data::{
45 QuoteTick, TradeTick,
46 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
47 },
48 enums::{
49 AggregationSource, AggressorSide, BarAggregation, BarIntervalType,
50 ContinuousFutureAdjustmentType,
51 },
52 identifiers::InstrumentId,
53 instruments::{FixedTickScheme, TickSchemeRule},
54 types::{
55 Price, Quantity,
56 fixed::{FIXED_PRECISION, FIXED_SCALAR, mantissa_exponent_to_fixed_i128},
57 price::PriceRaw,
58 quantity::QuantityRaw,
59 },
60};
61use rust_decimal::{Decimal, prelude::ToPrimitive};
62
63type BarHandler = Box<dyn FnMut(Bar)>;
65
66pub trait BarAggregator: Any + Debug {
70 fn bar_type(&self) -> BarType;
72 fn is_running(&self) -> bool;
74 fn set_is_running(&mut self, value: bool);
76 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
78 fn handle_quote(&mut self, quote: QuoteTick) {
80 let spec = self.bar_type().spec();
81 self.update(
82 quote.extract_price(spec.price_type),
83 quote.extract_size(spec.price_type),
84 quote.ts_init,
85 );
86 }
87 fn handle_trade(&mut self, trade: TradeTick) {
89 self.update(trade.price, trade.size, trade.ts_init);
90 }
91 fn handle_bar(&mut self, bar: Bar) {
93 self.update_bar(bar, bar.volume, bar.ts_init);
94 }
95 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
96 fn stop(&mut self) {}
98 fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
100 fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
102 fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
104 fn build_bar(&mut self, _event: &TimeEvent) {}
106 fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
110 fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
113 fn set_adjustment(&mut self, _adjustment: Decimal, _mode: ContinuousFutureAdjustmentType) {}
115}
116
117impl dyn BarAggregator {
118 pub fn as_any(&self) -> &dyn Any {
120 self
121 }
122 pub fn as_any_mut(&mut self) -> &mut dyn Any {
124 self
125 }
126}
127
128#[derive(Debug)]
130pub struct BarBuilder {
131 bar_type: BarType,
132 price_precision: u8,
133 size_precision: u8,
134 initialized: bool,
135 ts_last: UnixNanos,
136 count: usize,
137 last_close: Option<Price>,
138 open: Option<Price>,
139 high: Option<Price>,
140 low: Option<Price>,
141 close: Option<Price>,
142 volume: Quantity,
143 adjustment_mode: ContinuousFutureAdjustmentType,
144 adjustment_raw: PriceRaw,
145 adjustment_ratio: f64,
146 adjustment_active: bool,
147 adjustment_is_ratio: bool,
148}
149
150impl BarBuilder {
151 #[must_use]
157 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
158 correctness::check_equal(
159 &bar_type.aggregation_source(),
160 &AggregationSource::Internal,
161 "bar_type.aggregation_source",
162 "AggregationSource::Internal",
163 )
164 .expect(FAILED);
165
166 Self {
167 bar_type,
168 price_precision,
169 size_precision,
170 initialized: false,
171 ts_last: UnixNanos::default(),
172 count: 0,
173 last_close: None,
174 open: None,
175 high: None,
176 low: None,
177 close: None,
178 volume: Quantity::zero(size_precision),
179 adjustment_mode: ContinuousFutureAdjustmentType::default(),
180 adjustment_raw: 0,
181 adjustment_ratio: 1.0,
182 adjustment_active: false,
183 adjustment_is_ratio: false,
184 }
185 }
186
187 pub fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
198 self.adjustment_mode = mode;
199
200 if mode.is_ratio() {
201 self.adjustment_is_ratio = true;
202 self.adjustment_ratio = adjustment.to_f64().unwrap_or(1.0);
203 self.adjustment_active = adjustment != Decimal::ONE;
204 return;
205 }
206
207 self.adjustment_is_ratio = false;
211 let exponent = -(adjustment.scale() as i8);
212 let raw_i128 =
213 mantissa_exponent_to_fixed_i128(adjustment.mantissa(), exponent, FIXED_PRECISION)
214 .expect("Failed to scale continuous-future adjustment to fixed precision");
215
216 #[allow(
217 clippy::useless_conversion,
218 reason = "i128 to PriceRaw is real when not high-precision"
219 )]
220 let raw: PriceRaw = raw_i128
221 .try_into()
222 .expect("Continuous-future adjustment exceeds PriceRaw range");
223
224 self.adjustment_raw = raw;
225 self.adjustment_active = self.adjustment_raw != 0;
226 }
227
228 fn apply_adjustment_to_price(&self, price: Price) -> Price {
229 if !self.adjustment_active {
230 return price;
231 }
232
233 if self.adjustment_is_ratio {
234 return Price::new(price.as_f64() * self.adjustment_ratio, price.precision);
237 }
238
239 Price::from_raw(price.raw + self.adjustment_raw, price.precision)
241 }
242
243 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
249 if ts_init < self.ts_last {
250 return; }
252
253 let price = self.apply_adjustment_to_price(price);
254
255 if self.open.is_none() {
256 self.open = Some(price);
257 self.high = Some(price);
258 self.low = Some(price);
259 self.initialized = true;
260 } else {
261 if price > self.high.unwrap() {
262 self.high = Some(price);
263 }
264
265 if price < self.low.unwrap() {
266 self.low = Some(price);
267 }
268 }
269
270 self.close = Some(price);
271 self.volume = self.volume.add(size);
272 self.count += 1;
273 self.ts_last = ts_init;
274 }
275
276 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
282 if ts_init < self.ts_last {
283 return; }
285
286 let bar_open = self.apply_adjustment_to_price(bar.open);
287 let bar_high = self.apply_adjustment_to_price(bar.high);
288 let bar_low = self.apply_adjustment_to_price(bar.low);
289 let bar_close = self.apply_adjustment_to_price(bar.close);
290
291 if self.open.is_none() {
292 self.open = Some(bar_open);
293 self.high = Some(bar_high);
294 self.low = Some(bar_low);
295 self.initialized = true;
296 } else {
297 if bar_high > self.high.unwrap() {
298 self.high = Some(bar_high);
299 }
300
301 if bar_low < self.low.unwrap() {
302 self.low = Some(bar_low);
303 }
304 }
305
306 self.close = Some(bar_close);
307 self.volume = self.volume.add(volume);
308 self.count += 1;
309 self.ts_last = ts_init;
310 }
311
312 pub fn reset(&mut self) {
317 self.open = None;
318 self.high = None;
319 self.low = None;
320 self.volume = Quantity::zero(self.size_precision);
321 self.count = 0;
322 }
323
324 pub fn build_now(&mut self) -> Bar {
326 self.build(self.ts_last, self.ts_last)
327 }
328
329 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
335 if self.open.is_none() {
336 self.open = self.last_close;
337 self.high = self.last_close;
338 self.low = self.last_close;
339 self.close = self.last_close;
340 }
341
342 if let (Some(close), Some(low)) = (self.close, self.low)
343 && close < low
344 {
345 self.low = Some(close);
346 }
347
348 if let (Some(close), Some(high)) = (self.close, self.high)
349 && close > high
350 {
351 self.high = Some(close);
352 }
353
354 let bar = Bar::new(
356 self.bar_type,
357 self.open.unwrap(),
358 self.high.unwrap(),
359 self.low.unwrap(),
360 self.close.unwrap(),
361 self.volume,
362 ts_event,
363 ts_init,
364 );
365
366 self.last_close = self.close;
367 self.reset();
368 bar
369 }
370}
371
372pub struct BarAggregatorCore {
374 bar_type: BarType,
375 builder: BarBuilder,
376 handler: BarHandler,
377 is_running: bool,
378}
379
380impl Debug for BarAggregatorCore {
381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382 f.debug_struct(stringify!(BarAggregatorCore))
383 .field("bar_type", &self.bar_type)
384 .field("builder", &self.builder)
385 .field("is_running", &self.is_running)
386 .finish()
387 }
388}
389
390impl BarAggregatorCore {
391 pub fn new<H: FnMut(Bar) + 'static>(
397 bar_type: BarType,
398 price_precision: u8,
399 size_precision: u8,
400 handler: H,
401 ) -> Self {
402 Self {
403 bar_type,
404 builder: BarBuilder::new(bar_type, price_precision, size_precision),
405 handler: Box::new(handler),
406 is_running: false,
407 }
408 }
409
410 pub const fn set_is_running(&mut self, value: bool) {
412 self.is_running = value;
413 }
414
415 fn set_handler(&mut self, handler: BarHandler) {
416 self.handler = handler;
417 }
418
419 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
420 self.builder.update(price, size, ts_init);
421 }
422
423 fn build_now_and_send(&mut self) {
424 let bar = self.builder.build_now();
425 (self.handler)(bar);
426 }
427
428 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
429 let bar = self.builder.build(ts_event, ts_init);
430 (self.handler)(bar);
431 }
432
433 fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
434 self.builder.set_adjustment(adjustment, mode);
435 }
436}
437
// Expands to a `set_historical_mode` method for a type with a `core` field:
// the mode flag is ignored and the given handler replaces the core's handler.
macro_rules! impl_set_historical_handler {
    () => {
        fn set_historical_mode(&mut self, _historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
            BarAggregatorCore::set_handler(&mut self.core, handler);
        }
    };
}

// Expands to a `set_adjustment` method for a type with a `core` field,
// forwarding both arguments to the core.
macro_rules! impl_set_adjustment {
    () => {
        fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
            BarAggregatorCore::set_adjustment(&mut self.core, adjustment, mode);
        }
    };
}
453
454pub struct TickBarAggregator {
459 core: BarAggregatorCore,
460}
461
462impl Debug for TickBarAggregator {
463 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464 f.debug_struct(stringify!(TickBarAggregator))
465 .field("core", &self.core)
466 .finish()
467 }
468}
469
470impl TickBarAggregator {
471 pub fn new<H: FnMut(Bar) + 'static>(
477 bar_type: BarType,
478 price_precision: u8,
479 size_precision: u8,
480 handler: H,
481 ) -> Self {
482 Self {
483 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
484 }
485 }
486}
487
488impl BarAggregator for TickBarAggregator {
489 fn bar_type(&self) -> BarType {
490 self.core.bar_type
491 }
492
493 fn is_running(&self) -> bool {
494 self.core.is_running
495 }
496
497 fn set_is_running(&mut self, value: bool) {
498 self.core.set_is_running(value);
499 }
500
501 impl_set_historical_handler!();
502 impl_set_adjustment!();
503
504 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
506 self.core.apply_update(price, size, ts_init);
507 let spec = self.core.bar_type.spec();
508
509 if self.core.builder.count >= spec.step.get() {
510 self.core.build_now_and_send();
511 }
512 }
513
514 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
515 self.core.builder.update_bar(bar, volume, ts_init);
516 let spec = self.core.bar_type.spec();
517
518 if self.core.builder.count >= spec.step.get() {
519 self.core.build_now_and_send();
520 }
521 }
522}
523
524pub struct TickImbalanceBarAggregator {
529 core: BarAggregatorCore,
530 imbalance: isize,
531}
532
533impl Debug for TickImbalanceBarAggregator {
534 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
535 f.debug_struct(stringify!(TickImbalanceBarAggregator))
536 .field("core", &self.core)
537 .field("imbalance", &self.imbalance)
538 .finish()
539 }
540}
541
542impl TickImbalanceBarAggregator {
543 pub fn new<H: FnMut(Bar) + 'static>(
549 bar_type: BarType,
550 price_precision: u8,
551 size_precision: u8,
552 handler: H,
553 ) -> Self {
554 Self {
555 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
556 imbalance: 0,
557 }
558 }
559}
560
561impl BarAggregator for TickImbalanceBarAggregator {
562 fn bar_type(&self) -> BarType {
563 self.core.bar_type
564 }
565
566 fn is_running(&self) -> bool {
567 self.core.is_running
568 }
569
570 fn set_is_running(&mut self, value: bool) {
571 self.core.set_is_running(value);
572 }
573
574 impl_set_historical_handler!();
575 impl_set_adjustment!();
576
577 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
582 self.core.apply_update(price, size, ts_init);
583 }
584
585 fn handle_trade(&mut self, trade: TradeTick) {
586 self.core
587 .apply_update(trade.price, trade.size, trade.ts_init);
588
589 let delta = match trade.aggressor_side {
590 AggressorSide::Buyer => 1,
591 AggressorSide::Seller => -1,
592 AggressorSide::NoAggressor => 0,
593 };
594
595 if delta == 0 {
596 return;
597 }
598
599 self.imbalance += delta;
600 let threshold = self.core.bar_type.spec().step.get();
601 if self.imbalance.unsigned_abs() >= threshold {
602 self.core.build_now_and_send();
603 self.imbalance = 0;
604 }
605 }
606
607 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
608 self.core.builder.update_bar(bar, volume, ts_init);
609 }
610}
611
612pub struct TickRunsBarAggregator {
614 core: BarAggregatorCore,
615 current_run_side: Option<AggressorSide>,
616 run_count: usize,
617}
618
619impl Debug for TickRunsBarAggregator {
620 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
621 f.debug_struct(stringify!(TickRunsBarAggregator))
622 .field("core", &self.core)
623 .field("current_run_side", &self.current_run_side)
624 .field("run_count", &self.run_count)
625 .finish()
626 }
627}
628
629impl TickRunsBarAggregator {
630 pub fn new<H: FnMut(Bar) + 'static>(
636 bar_type: BarType,
637 price_precision: u8,
638 size_precision: u8,
639 handler: H,
640 ) -> Self {
641 Self {
642 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
643 current_run_side: None,
644 run_count: 0,
645 }
646 }
647}
648
649impl BarAggregator for TickRunsBarAggregator {
650 fn bar_type(&self) -> BarType {
651 self.core.bar_type
652 }
653
654 fn is_running(&self) -> bool {
655 self.core.is_running
656 }
657
658 fn set_is_running(&mut self, value: bool) {
659 self.core.set_is_running(value);
660 }
661
662 impl_set_historical_handler!();
663 impl_set_adjustment!();
664
665 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
670 self.core.apply_update(price, size, ts_init);
671 }
672
673 fn handle_trade(&mut self, trade: TradeTick) {
674 let side = match trade.aggressor_side {
675 AggressorSide::Buyer => Some(AggressorSide::Buyer),
676 AggressorSide::Seller => Some(AggressorSide::Seller),
677 AggressorSide::NoAggressor => None,
678 };
679
680 if let Some(side) = side {
681 if self.current_run_side != Some(side) {
682 self.current_run_side = Some(side);
683 self.run_count = 0;
684 self.core.builder.reset();
685 }
686
687 self.core
688 .apply_update(trade.price, trade.size, trade.ts_init);
689 self.run_count += 1;
690
691 let threshold = self.core.bar_type.spec().step.get();
692 if self.run_count >= threshold {
693 self.core.build_now_and_send();
694 self.run_count = 0;
695 self.current_run_side = None;
696 }
697 } else {
698 self.core
699 .apply_update(trade.price, trade.size, trade.ts_init);
700 }
701 }
702
703 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
704 self.core.builder.update_bar(bar, volume, ts_init);
705 }
706}
707
708pub struct VolumeBarAggregator {
710 core: BarAggregatorCore,
711}
712
713impl Debug for VolumeBarAggregator {
714 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
715 f.debug_struct(stringify!(VolumeBarAggregator))
716 .field("core", &self.core)
717 .finish()
718 }
719}
720
721impl VolumeBarAggregator {
722 pub fn new<H: FnMut(Bar) + 'static>(
728 bar_type: BarType,
729 price_precision: u8,
730 size_precision: u8,
731 handler: H,
732 ) -> Self {
733 Self {
734 core: BarAggregatorCore::new(
735 bar_type.standard(),
736 price_precision,
737 size_precision,
738 handler,
739 ),
740 }
741 }
742}
743
744impl BarAggregator for VolumeBarAggregator {
745 fn bar_type(&self) -> BarType {
746 self.core.bar_type
747 }
748
749 fn is_running(&self) -> bool {
750 self.core.is_running
751 }
752
753 fn set_is_running(&mut self, value: bool) {
754 self.core.set_is_running(value);
755 }
756
757 impl_set_historical_handler!();
758 impl_set_adjustment!();
759
760 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
762 let mut raw_size_update = size.raw;
763 let spec = self.core.bar_type.spec();
764 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
765
766 while raw_size_update > 0 {
767 if self.core.builder.volume.raw + raw_size_update < raw_step {
768 self.core.apply_update(
769 price,
770 Quantity::from_raw(raw_size_update, size.precision),
771 ts_init,
772 );
773 break;
774 }
775
776 let raw_size_diff = raw_step - self.core.builder.volume.raw;
777 self.core.apply_update(
778 price,
779 Quantity::from_raw(raw_size_diff, size.precision),
780 ts_init,
781 );
782
783 self.core.build_now_and_send();
784 raw_size_update -= raw_size_diff;
785 }
786 }
787
788 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
789 let mut raw_volume_update = volume.raw;
790 let spec = self.core.bar_type.spec();
791 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
792
793 while raw_volume_update > 0 {
794 if self.core.builder.volume.raw + raw_volume_update < raw_step {
795 self.core.builder.update_bar(
796 bar,
797 Quantity::from_raw(raw_volume_update, volume.precision),
798 ts_init,
799 );
800 break;
801 }
802
803 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
804 self.core.builder.update_bar(
805 bar,
806 Quantity::from_raw(raw_volume_diff, volume.precision),
807 ts_init,
808 );
809
810 self.core.build_now_and_send();
811 raw_volume_update -= raw_volume_diff;
812 }
813 }
814}
815
816pub struct VolumeImbalanceBarAggregator {
818 core: BarAggregatorCore,
819 imbalance_raw: i128,
820 raw_step: i128,
821}
822
823impl Debug for VolumeImbalanceBarAggregator {
824 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
826 .field("core", &self.core)
827 .field("imbalance_raw", &self.imbalance_raw)
828 .field("raw_step", &self.raw_step)
829 .finish()
830 }
831}
832
833impl VolumeImbalanceBarAggregator {
834 pub fn new<H: FnMut(Bar) + 'static>(
840 bar_type: BarType,
841 price_precision: u8,
842 size_precision: u8,
843 handler: H,
844 ) -> Self {
845 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
846 Self {
847 core: BarAggregatorCore::new(
848 bar_type.standard(),
849 price_precision,
850 size_precision,
851 handler,
852 ),
853 imbalance_raw: 0,
854 raw_step,
855 }
856 }
857}
858
859impl BarAggregator for VolumeImbalanceBarAggregator {
860 fn bar_type(&self) -> BarType {
861 self.core.bar_type
862 }
863
864 fn is_running(&self) -> bool {
865 self.core.is_running
866 }
867
868 fn set_is_running(&mut self, value: bool) {
869 self.core.set_is_running(value);
870 }
871
872 impl_set_historical_handler!();
873 impl_set_adjustment!();
874
875 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
880 self.core.apply_update(price, size, ts_init);
881 }
882
883 fn handle_trade(&mut self, trade: TradeTick) {
884 let side = match trade.aggressor_side {
885 AggressorSide::Buyer => 1,
886 AggressorSide::Seller => -1,
887 AggressorSide::NoAggressor => {
888 self.core
889 .apply_update(trade.price, trade.size, trade.ts_init);
890 return;
891 }
892 };
893
894 let mut raw_remaining = trade.size.raw as i128;
895 while raw_remaining > 0 {
896 let imbalance_abs = self.imbalance_raw.abs();
897 let needed = (self.raw_step - imbalance_abs).max(1);
898 let raw_chunk = raw_remaining.min(needed);
899 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
900
901 self.core
902 .apply_update(trade.price, qty_chunk, trade.ts_init);
903
904 self.imbalance_raw += side * raw_chunk;
905 raw_remaining -= raw_chunk;
906
907 if self.imbalance_raw.abs() >= self.raw_step {
908 self.core.build_now_and_send();
909 self.imbalance_raw = 0;
910 }
911 }
912 }
913
914 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
915 self.core.builder.update_bar(bar, volume, ts_init);
916 }
917}
918
919pub struct VolumeRunsBarAggregator {
921 core: BarAggregatorCore,
922 current_run_side: Option<AggressorSide>,
923 run_volume_raw: QuantityRaw,
924 raw_step: QuantityRaw,
925}
926
927impl Debug for VolumeRunsBarAggregator {
928 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
929 f.debug_struct(stringify!(VolumeRunsBarAggregator))
930 .field("core", &self.core)
931 .field("current_run_side", &self.current_run_side)
932 .field("run_volume_raw", &self.run_volume_raw)
933 .field("raw_step", &self.raw_step)
934 .finish()
935 }
936}
937
938impl VolumeRunsBarAggregator {
939 pub fn new<H: FnMut(Bar) + 'static>(
945 bar_type: BarType,
946 price_precision: u8,
947 size_precision: u8,
948 handler: H,
949 ) -> Self {
950 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
951 Self {
952 core: BarAggregatorCore::new(
953 bar_type.standard(),
954 price_precision,
955 size_precision,
956 handler,
957 ),
958 current_run_side: None,
959 run_volume_raw: 0,
960 raw_step,
961 }
962 }
963}
964
965impl BarAggregator for VolumeRunsBarAggregator {
966 fn bar_type(&self) -> BarType {
967 self.core.bar_type
968 }
969
970 fn is_running(&self) -> bool {
971 self.core.is_running
972 }
973
974 fn set_is_running(&mut self, value: bool) {
975 self.core.set_is_running(value);
976 }
977
978 impl_set_historical_handler!();
979 impl_set_adjustment!();
980
981 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
986 self.core.apply_update(price, size, ts_init);
987 }
988
989 fn handle_trade(&mut self, trade: TradeTick) {
990 let side = match trade.aggressor_side {
991 AggressorSide::Buyer => Some(AggressorSide::Buyer),
992 AggressorSide::Seller => Some(AggressorSide::Seller),
993 AggressorSide::NoAggressor => None,
994 };
995
996 let Some(side) = side else {
997 self.core
998 .apply_update(trade.price, trade.size, trade.ts_init);
999 return;
1000 };
1001
1002 if self.current_run_side != Some(side) {
1003 self.current_run_side = Some(side);
1004 self.run_volume_raw = 0;
1005 self.core.builder.reset();
1006 }
1007
1008 let mut raw_remaining = trade.size.raw;
1009 while raw_remaining > 0 {
1010 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
1011 let raw_chunk = raw_remaining.min(needed);
1012
1013 self.core.apply_update(
1014 trade.price,
1015 Quantity::from_raw(raw_chunk, trade.size.precision),
1016 trade.ts_init,
1017 );
1018
1019 self.run_volume_raw += raw_chunk;
1020 raw_remaining -= raw_chunk;
1021
1022 if self.run_volume_raw >= self.raw_step {
1023 self.core.build_now_and_send();
1024 self.run_volume_raw = 0;
1025 self.current_run_side = None;
1026 }
1027 }
1028 }
1029
1030 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1031 self.core.builder.update_bar(bar, volume, ts_init);
1032 }
1033}
1034
1035pub struct ValueBarAggregator {
1040 core: BarAggregatorCore,
1041 cum_value: f64,
1042}
1043
1044impl Debug for ValueBarAggregator {
1045 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1046 f.debug_struct(stringify!(ValueBarAggregator))
1047 .field("core", &self.core)
1048 .field("cum_value", &self.cum_value)
1049 .finish()
1050 }
1051}
1052
1053impl ValueBarAggregator {
1054 pub fn new<H: FnMut(Bar) + 'static>(
1060 bar_type: BarType,
1061 price_precision: u8,
1062 size_precision: u8,
1063 handler: H,
1064 ) -> Self {
1065 Self {
1066 core: BarAggregatorCore::new(
1067 bar_type.standard(),
1068 price_precision,
1069 size_precision,
1070 handler,
1071 ),
1072 cum_value: 0.0,
1073 }
1074 }
1075
1076 #[must_use]
1077 pub const fn get_cumulative_value(&self) -> f64 {
1079 self.cum_value
1080 }
1081}
1082
1083impl BarAggregator for ValueBarAggregator {
1084 fn bar_type(&self) -> BarType {
1085 self.core.bar_type
1086 }
1087
1088 fn is_running(&self) -> bool {
1089 self.core.is_running
1090 }
1091
1092 fn set_is_running(&mut self, value: bool) {
1093 self.core.set_is_running(value);
1094 }
1095
1096 impl_set_historical_handler!();
1097 impl_set_adjustment!();
1098
1099 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1101 let mut size_update = size.as_f64();
1102 let spec = self.core.bar_type.spec();
1103
1104 while size_update > 0.0 {
1105 let value_update = price.as_f64() * size_update;
1106 if value_update == 0.0 {
1107 self.core
1109 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
1110 break;
1111 }
1112
1113 if self.cum_value + value_update < spec.step.get() as f64 {
1114 self.cum_value += value_update;
1115 self.core
1116 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
1117 break;
1118 }
1119
1120 let value_diff = spec.step.get() as f64 - self.cum_value;
1121 let mut size_diff = size_update * (value_diff / value_update);
1122
1123 if is_below_min_size(size_diff, size.precision) {
1125 if is_below_min_size(size_update, size.precision) {
1126 break;
1127 }
1128 size_diff = min_size_f64(size.precision);
1129 }
1130
1131 self.core
1132 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1133
1134 self.core.build_now_and_send();
1135 self.cum_value = 0.0;
1136 size_update -= size_diff;
1137 }
1138 }
1139
1140 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1141 let mut volume_update = volume;
1142 let average_price = Price::new(
1143 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1144 self.core.builder.price_precision,
1145 );
1146
1147 while volume_update.as_f64() > 0.0 {
1148 let value_update = average_price.as_f64() * volume_update.as_f64();
1149 if value_update == 0.0 {
1150 self.core.builder.update_bar(bar, volume_update, ts_init);
1152 break;
1153 }
1154
1155 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1156 self.cum_value += value_update;
1157 self.core.builder.update_bar(bar, volume_update, ts_init);
1158 break;
1159 }
1160
1161 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1162 let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1163
1164 if is_below_min_size(volume_diff, volume_update.precision) {
1166 if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1167 break;
1168 }
1169 volume_diff = min_size_f64(volume_update.precision);
1170 }
1171
1172 self.core.builder.update_bar(
1173 bar,
1174 Quantity::new(volume_diff, volume_update.precision),
1175 ts_init,
1176 );
1177
1178 self.core.build_now_and_send();
1179 self.cum_value = 0.0;
1180 volume_update = Quantity::new(
1181 volume_update.as_f64() - volume_diff,
1182 volume_update.precision,
1183 );
1184 }
1185 }
1186}
1187
1188pub struct ValueImbalanceBarAggregator {
1190 core: BarAggregatorCore,
1191 imbalance_value: f64,
1192 step_value: f64,
1193}
1194
1195impl Debug for ValueImbalanceBarAggregator {
1196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1197 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1198 .field("core", &self.core)
1199 .field("imbalance_value", &self.imbalance_value)
1200 .field("step_value", &self.step_value)
1201 .finish()
1202 }
1203}
1204
1205impl ValueImbalanceBarAggregator {
1206 pub fn new<H: FnMut(Bar) + 'static>(
1212 bar_type: BarType,
1213 price_precision: u8,
1214 size_precision: u8,
1215 handler: H,
1216 ) -> Self {
1217 Self {
1218 core: BarAggregatorCore::new(
1219 bar_type.standard(),
1220 price_precision,
1221 size_precision,
1222 handler,
1223 ),
1224 imbalance_value: 0.0,
1225 step_value: bar_type.spec().step.get() as f64,
1226 }
1227 }
1228}
1229
1230impl BarAggregator for ValueImbalanceBarAggregator {
1231 fn bar_type(&self) -> BarType {
1232 self.core.bar_type
1233 }
1234
1235 fn is_running(&self) -> bool {
1236 self.core.is_running
1237 }
1238
1239 fn set_is_running(&mut self, value: bool) {
1240 self.core.set_is_running(value);
1241 }
1242
1243 impl_set_historical_handler!();
1244 impl_set_adjustment!();
1245
1246 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1251 self.core.apply_update(price, size, ts_init);
1252 }
1253
1254 fn handle_trade(&mut self, trade: TradeTick) {
1255 let price_f64 = trade.price.as_f64();
1256 if price_f64 == 0.0 {
1257 self.core
1258 .apply_update(trade.price, trade.size, trade.ts_init);
1259 return;
1260 }
1261
1262 let side_sign = match trade.aggressor_side {
1263 AggressorSide::Buyer => 1.0,
1264 AggressorSide::Seller => -1.0,
1265 AggressorSide::NoAggressor => {
1266 self.core
1267 .apply_update(trade.price, trade.size, trade.ts_init);
1268 return;
1269 }
1270 };
1271
1272 let mut size_remaining = trade.size.as_f64();
1273 while size_remaining > 0.0 {
1274 let value_remaining = price_f64 * size_remaining;
1275
1276 #[allow(clippy::float_cmp, reason = "exact-zero check on accumulator")]
1277 if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
1278 let needed = self.step_value - self.imbalance_value.abs();
1279 if value_remaining <= needed {
1280 self.imbalance_value += side_sign * value_remaining;
1281 self.core.apply_update(
1282 trade.price,
1283 Quantity::new(size_remaining, trade.size.precision),
1284 trade.ts_init,
1285 );
1286
1287 if self.imbalance_value.abs() >= self.step_value {
1288 self.core.build_now_and_send();
1289 self.imbalance_value = 0.0;
1290 }
1291 break;
1292 }
1293
1294 let mut value_chunk = needed;
1295 let mut size_chunk = value_chunk / price_f64;
1296
1297 if is_below_min_size(size_chunk, trade.size.precision) {
1299 if is_below_min_size(size_remaining, trade.size.precision) {
1300 break;
1301 }
1302 size_chunk = min_size_f64(trade.size.precision);
1303 value_chunk = price_f64 * size_chunk;
1304 }
1305
1306 self.core.apply_update(
1307 trade.price,
1308 Quantity::new(size_chunk, trade.size.precision),
1309 trade.ts_init,
1310 );
1311 self.imbalance_value += side_sign * value_chunk;
1312 size_remaining -= size_chunk;
1313
1314 if self.imbalance_value.abs() >= self.step_value {
1315 self.core.build_now_and_send();
1316 self.imbalance_value = 0.0;
1317 }
1318 } else {
1319 let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1321 let mut size_chunk = value_to_flatten / price_f64;
1322
1323 if is_below_min_size(size_chunk, trade.size.precision) {
1325 if is_below_min_size(size_remaining, trade.size.precision) {
1326 break;
1327 }
1328 size_chunk = min_size_f64(trade.size.precision);
1329 value_to_flatten = price_f64 * size_chunk;
1330 }
1331
1332 self.core.apply_update(
1333 trade.price,
1334 Quantity::new(size_chunk, trade.size.precision),
1335 trade.ts_init,
1336 );
1337 self.imbalance_value += side_sign * value_to_flatten;
1338
1339 if self.imbalance_value.abs() >= self.step_value {
1341 self.core.build_now_and_send();
1342 self.imbalance_value = 0.0;
1343 }
1344 size_remaining -= size_chunk;
1345 }
1346 }
1347 }
1348
1349 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1350 self.core.builder.update_bar(bar, volume, ts_init);
1351 }
1352}
1353
1354pub struct ValueRunsBarAggregator {
1356 core: BarAggregatorCore,
1357 current_run_side: Option<AggressorSide>,
1358 run_value: f64,
1359 step_value: f64,
1360}
1361
1362impl Debug for ValueRunsBarAggregator {
1363 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1364 f.debug_struct(stringify!(ValueRunsBarAggregator))
1365 .field("core", &self.core)
1366 .field("current_run_side", &self.current_run_side)
1367 .field("run_value", &self.run_value)
1368 .field("step_value", &self.step_value)
1369 .finish()
1370 }
1371}
1372
1373impl ValueRunsBarAggregator {
1374 pub fn new<H: FnMut(Bar) + 'static>(
1380 bar_type: BarType,
1381 price_precision: u8,
1382 size_precision: u8,
1383 handler: H,
1384 ) -> Self {
1385 Self {
1386 core: BarAggregatorCore::new(
1387 bar_type.standard(),
1388 price_precision,
1389 size_precision,
1390 handler,
1391 ),
1392 current_run_side: None,
1393 run_value: 0.0,
1394 step_value: bar_type.spec().step.get() as f64,
1395 }
1396 }
1397}
1398
1399impl BarAggregator for ValueRunsBarAggregator {
1400 fn bar_type(&self) -> BarType {
1401 self.core.bar_type
1402 }
1403
1404 fn is_running(&self) -> bool {
1405 self.core.is_running
1406 }
1407
1408 fn set_is_running(&mut self, value: bool) {
1409 self.core.set_is_running(value);
1410 }
1411
1412 impl_set_historical_handler!();
1413 impl_set_adjustment!();
1414
1415 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1420 self.core.apply_update(price, size, ts_init);
1421 }
1422
1423 fn handle_trade(&mut self, trade: TradeTick) {
1424 let price_f64 = trade.price.as_f64();
1425 if price_f64 == 0.0 {
1426 self.core
1427 .apply_update(trade.price, trade.size, trade.ts_init);
1428 return;
1429 }
1430
1431 let side = match trade.aggressor_side {
1432 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1433 AggressorSide::Seller => Some(AggressorSide::Seller),
1434 AggressorSide::NoAggressor => None,
1435 };
1436
1437 let Some(side) = side else {
1438 self.core
1439 .apply_update(trade.price, trade.size, trade.ts_init);
1440 return;
1441 };
1442
1443 if self.current_run_side != Some(side) {
1444 self.current_run_side = Some(side);
1445 self.run_value = 0.0;
1446 self.core.builder.reset();
1447 }
1448
1449 let mut size_remaining = trade.size.as_f64();
1450 while size_remaining > 0.0 {
1451 let value_update = price_f64 * size_remaining;
1452 if self.run_value + value_update < self.step_value {
1453 self.run_value += value_update;
1454 self.core.apply_update(
1455 trade.price,
1456 Quantity::new(size_remaining, trade.size.precision),
1457 trade.ts_init,
1458 );
1459 break;
1460 }
1461
1462 let value_needed = self.step_value - self.run_value;
1463 let mut size_chunk = value_needed / price_f64;
1464
1465 if is_below_min_size(size_chunk, trade.size.precision) {
1467 if is_below_min_size(size_remaining, trade.size.precision) {
1468 break;
1469 }
1470 size_chunk = min_size_f64(trade.size.precision);
1471 }
1472
1473 self.core.apply_update(
1474 trade.price,
1475 Quantity::new(size_chunk, trade.size.precision),
1476 trade.ts_init,
1477 );
1478
1479 self.core.build_now_and_send();
1480 self.run_value = 0.0;
1481 self.current_run_side = None;
1482 size_remaining -= size_chunk;
1483 }
1484 }
1485
1486 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1487 self.core.builder.update_bar(bar, volume, ts_init);
1488 }
1489}
1490
/// Bar aggregator that emits fixed-size price bricks.
///
/// A bar is built for every whole `brick_size` the price moves away from the
/// close of the last brick.
pub struct RenkoBarAggregator {
    /// Shared aggregation core that owns the bar builder.
    core: BarAggregatorCore,
    /// Brick height in raw price units (bar specification step times the
    /// raw price increment).
    pub brick_size: PriceRaw,
    /// Close of the most recent brick, or the first observed price before any
    /// brick has been built; `None` until the first update arrives.
    last_close: Option<Price>,
}
1501
1502impl Debug for RenkoBarAggregator {
1503 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1504 f.debug_struct(stringify!(RenkoBarAggregator))
1505 .field("core", &self.core)
1506 .field("brick_size", &self.brick_size)
1507 .field("last_close", &self.last_close)
1508 .finish()
1509 }
1510}
1511
1512impl RenkoBarAggregator {
1513 pub fn new<H: FnMut(Bar) + 'static>(
1519 bar_type: BarType,
1520 price_precision: u8,
1521 size_precision: u8,
1522 price_increment: Price,
1523 handler: H,
1524 ) -> Self {
1525 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1527
1528 Self {
1529 core: BarAggregatorCore::new(
1530 bar_type.standard(),
1531 price_precision,
1532 size_precision,
1533 handler,
1534 ),
1535 brick_size,
1536 last_close: None,
1537 }
1538 }
1539}
1540
1541impl BarAggregator for RenkoBarAggregator {
1542 fn bar_type(&self) -> BarType {
1543 self.core.bar_type
1544 }
1545
1546 fn is_running(&self) -> bool {
1547 self.core.is_running
1548 }
1549
1550 fn set_is_running(&mut self, value: bool) {
1551 self.core.set_is_running(value);
1552 }
1553
1554 impl_set_historical_handler!();
1555 impl_set_adjustment!();
1556
1557 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1562 self.core.apply_update(price, size, ts_init);
1564
1565 if self.last_close.is_none() {
1567 self.last_close = Some(price);
1568 return;
1569 }
1570
1571 let last_close = self.last_close.unwrap();
1572
1573 let current_raw = price.raw;
1575 let last_close_raw = last_close.raw;
1576 let price_diff_raw = current_raw - last_close_raw;
1577 let abs_price_diff_raw = price_diff_raw.abs();
1578
1579 if abs_price_diff_raw >= self.brick_size {
1581 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1582 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1583 let mut current_close = last_close;
1584
1585 let total_volume = self.core.builder.volume;
1587
1588 for _i in 0..num_bricks {
1589 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1591 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1592
1593 let (brick_high, brick_low) = if direction > 0.0 {
1595 (brick_close, current_close)
1596 } else {
1597 (current_close, brick_close)
1598 };
1599
1600 self.core.builder.reset();
1602 self.core.builder.open = Some(current_close);
1603 self.core.builder.high = Some(brick_high);
1604 self.core.builder.low = Some(brick_low);
1605 self.core.builder.close = Some(brick_close);
1606 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1608 self.core.builder.ts_last = ts_init;
1609 self.core.builder.initialized = true;
1610
1611 self.core.build_and_send(ts_init, ts_init);
1613
1614 current_close = brick_close;
1616 self.last_close = Some(brick_close);
1617 }
1618 }
1619 }
1620
1621 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1622 self.core.builder.update_bar(bar, volume, ts_init);
1624
1625 if self.last_close.is_none() {
1627 self.last_close = Some(bar.close);
1628 return;
1629 }
1630
1631 let last_close = self.last_close.unwrap();
1632
1633 let current_raw = bar.close.raw;
1635 let last_close_raw = last_close.raw;
1636 let price_diff_raw = current_raw - last_close_raw;
1637 let abs_price_diff_raw = price_diff_raw.abs();
1638
1639 if abs_price_diff_raw >= self.brick_size {
1641 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1642 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1643 let mut current_close = last_close;
1644
1645 let total_volume = self.core.builder.volume;
1647
1648 for _i in 0..num_bricks {
1649 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1651 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1652
1653 let (brick_high, brick_low) = if direction > 0.0 {
1655 (brick_close, current_close)
1656 } else {
1657 (current_close, brick_close)
1658 };
1659
1660 self.core.builder.reset();
1662 self.core.builder.open = Some(current_close);
1663 self.core.builder.high = Some(brick_high);
1664 self.core.builder.low = Some(brick_low);
1665 self.core.builder.close = Some(brick_close);
1666 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1668 self.core.builder.ts_last = ts_init;
1669 self.core.builder.initialized = true;
1670
1671 self.core.build_and_send(ts_init, ts_init);
1673
1674 current_close = brick_close;
1676 self.last_close = Some(brick_close);
1677 }
1678 }
1679 }
1680}
1681
/// Bar aggregator that closes bars on clock timer events.
pub struct TimeBarAggregator {
    /// Shared aggregation core that owns the bar builder and handler.
    core: BarAggregatorCore,
    /// Clock used to register and cancel the bar timer and to read the time.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event is ignored if the builder count is zero.
    build_with_no_updates: bool,
    /// For left-open intervals, selects the close time (`true`) or the stored
    /// open time (`false`) as the bar event timestamp.
    timestamp_on_close: bool,
    /// `true` when the interval type is `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar in progress; set to the close time after each
    /// bar is processed.
    stored_open_ns: UnixNanos,
    /// Name under which the timer or alert is registered on the clock.
    timer_name: String,
    /// Bar interval in nanoseconds, from `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Time of the next scheduled bar close.
    next_close_ns: UnixNanos,
    /// First scheduled close; used together with `skip_first_non_full_bar`.
    first_close_ns: UnixNanos,
    /// Delay added to the timer start time, in microseconds.
    bar_build_delay: u64,
    /// Optional origin offset passed to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// While `true`, bars closing at or before `first_close_ns` are discarded.
    skip_first_non_full_bar: bool,
    /// When `true`, updates drive the clock themselves (historical replay).
    pub historical_mode: bool,
    /// Events stored by `set_historical_events_internal`.
    historical_events: Vec<TimeEvent>,
    /// Timer event whose timestamp equals the update being processed; it is
    /// held back until that update has been applied.
    historical_event_at_ts_init: Option<TimeEvent>,
    /// Weak handle to the shared cell wrapping this aggregator, used by the
    /// timer callback.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1704
1705impl Debug for TimeBarAggregator {
1706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1707 f.debug_struct(stringify!(TimeBarAggregator))
1708 .field("core", &self.core)
1709 .field("build_with_no_updates", &self.build_with_no_updates)
1710 .field("timestamp_on_close", &self.timestamp_on_close)
1711 .field("is_left_open", &self.is_left_open)
1712 .field("timer_name", &self.timer_name)
1713 .field("interval_ns", &self.interval_ns)
1714 .field("bar_build_delay", &self.bar_build_delay)
1715 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1716 .finish()
1717 }
1718}
1719
impl TimeBarAggregator {
    #[expect(clippy::too_many_arguments)]
    /// Creates a new [`TimeBarAggregator`].
    ///
    /// No timer is registered here; `start_timer_internal` schedules the bar
    /// closes on the clock. The aggregator starts in non-historical mode with
    /// all stored timestamps at their default value.
    ///
    /// `bar_build_delay` is applied in microseconds when the timer start time
    /// is computed.
    pub fn new<H: FnMut(Bar) + 'static>(
        bar_type: BarType,
        price_precision: u8,
        size_precision: u8,
        clock: Rc<RefCell<dyn Clock>>,
        handler: H,
        build_with_no_updates: bool,
        timestamp_on_close: bool,
        interval_type: BarIntervalType,
        time_bars_origin_offset: Option<TimeDelta>,
        bar_build_delay: u64,
        skip_first_non_full_bar: bool,
    ) -> Self {
        let is_left_open = match interval_type {
            BarIntervalType::LeftOpen => true,
            BarIntervalType::RightOpen => false,
        };

        let core = BarAggregatorCore::new(
            bar_type.standard(),
            price_precision,
            size_precision,
            handler,
        );

        Self {
            core,
            clock,
            build_with_no_updates,
            timestamp_on_close,
            is_left_open,
            stored_open_ns: UnixNanos::default(),
            // The timer name uses the bar type as passed in, not the
            // standardized one given to the core.
            timer_name: format!("TIME_BAR_{bar_type}"),
            interval_ns: get_bar_interval_ns(&bar_type),
            next_close_ns: UnixNanos::default(),
            first_close_ns: UnixNanos::default(),
            bar_build_delay,
            time_bars_origin_offset,
            skip_first_non_full_bar,
            historical_mode: false,
            historical_events: Vec::new(),
            historical_event_at_ts_init: None,
            aggregator_weak: None,
        }
    }

    /// Replaces the clock used for timers and time queries.
    pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.clock = clock;
    }

    /// Registers the bar-close timer (or alert) on the clock and initializes
    /// `next_close_ns` and `stored_open_ns`.
    ///
    /// When `aggregator_rc` is given, a weak handle to it is stored and used
    /// by the timer callback; otherwise the previously stored weak handle is
    /// used.
    ///
    /// Month and year aggregations use a one-shot time alert (re-armed from
    /// `build_bar`); every other aggregation uses a repeating timer with
    /// `interval_ns`.
    ///
    /// # Panics
    ///
    /// Panics if `aggregator_rc` is `None` and no weak handle was stored
    /// before, or if the clock rejects the timer or alert.
    pub fn start_timer_internal(
        &mut self,
        aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
    ) {
        let aggregator_weak = if let Some(rc) = aggregator_rc {
            let weak = Rc::downgrade(&rc);
            self.aggregator_weak = Some(weak.clone());
            weak
        } else {
            self.aggregator_weak
                .as_ref()
                .expect("Aggregator weak reference must be set before calling start_timer()")
                .clone()
        };

        // The callback holds only a weak handle, so it becomes a no-op once
        // the aggregator has been dropped.
        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
            if let Some(agg) = aggregator_weak.upgrade() {
                agg.borrow_mut().build_bar(&event);
            }
        }));

        let now = self.clock.borrow().utc_now();
        let mut start_time =
            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);

        // True only when the current time is exactly the (delayed) start.
        let fire_immediately = start_time == now;

        let spec = &self.bar_type().spec();
        let start_time_ns = UnixNanos::from(start_time);
        let step = spec.step.get() as u32;

        if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
            self.clock
                .borrow_mut()
                .set_timer_ns(
                    &self.timer_name,
                    self.interval_ns.as_u64(),
                    Some(start_time_ns),
                    None,
                    Some(callback),
                    // NOTE(review): presumably the `allow_past` flag — confirm
                    // against `Clock::set_timer_ns`.
                    Some(true),
                    Some(fire_immediately),
                )
                .expect(FAILED);

            if fire_immediately {
                self.next_close_ns = start_time_ns;
            } else {
                let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
                self.next_close_ns = UnixNanos::from(start_time + interval_duration);
            }

            self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
        } else {
            // Calendar intervals vary in length, so the close time is
            // computed with month/year arithmetic instead of a fixed interval.
            let alert_time = if fire_immediately {
                start_time
            } else if spec.aggregation == BarAggregation::Month {
                add_n_months(start_time, step).expect(FAILED)
            } else {
                add_n_years(start_time, step).expect(FAILED)
            };

            self.clock
                .borrow_mut()
                .set_time_alert_ns(
                    &self.timer_name,
                    UnixNanos::from(alert_time),
                    Some(callback),
                    // NOTE(review): presumably the `allow_past` flag — confirm
                    // against `Clock::set_time_alert_ns`.
                    Some(true),
                )
                .expect(FAILED);

            self.next_close_ns = UnixNanos::from(alert_time);
            self.stored_open_ns = if fire_immediately {
                // The bar closing right now opened one step earlier.
                if spec.aggregation == BarAggregation::Month {
                    subtract_n_months_nanos(start_time_ns, step).expect(FAILED)
                } else {
                    subtract_n_years_nanos(start_time_ns, step).expect(FAILED)
                }
            } else {
                start_time_ns
            };
        }

        if self.skip_first_non_full_bar {
            self.first_close_ns = self.next_close_ns;
        }

        log::debug!(
            "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
            self.timer_name,
            start_time,
            self.historical_mode,
            fire_immediately,
            now,
            self.bar_build_delay
        );
    }

    /// Cancels the timer registered under `timer_name` on the clock.
    pub fn stop(&mut self) {
        self.clock.borrow_mut().cancel_timer(&self.timer_name);
    }

    /// Builds and sends a bar, unless the first partial bar is being skipped.
    ///
    /// While `skip_first_non_full_bar` is set and `ts_init` is not after
    /// `first_close_ns`, the builder is reset and nothing is sent. The first
    /// bar that passes clears the flag.
    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
        if self.skip_first_non_full_bar && ts_init <= self.first_close_ns {
            self.core.builder.reset();
        } else {
            self.skip_first_non_full_bar = false;
            self.core.build_and_send(ts_event, ts_init);
        }
    }

    /// Handles a timer event by closing the current bar and scheduling the
    /// next close.
    ///
    /// Nothing happens when the builder is not initialized, or when it has
    /// no updates and `build_with_no_updates` is `false`.
    ///
    /// NOTE(review): both early returns also skip the code below that
    /// advances `stored_open_ns` and re-arms the one-shot month/year alert.
    /// It looks like a skipped month/year bar would leave no alert scheduled
    /// — confirm whether that is intended.
    fn build_bar(&mut self, event: &TimeEvent) {
        if !self.core.builder.initialized {
            return;
        }

        if !self.build_with_no_updates && self.core.builder.count == 0 {
            return;
        }

        let ts_init = event.ts_event;
        // Only left-open bars with `timestamp_on_close` are stamped with the
        // close time; every other combination uses the stored open time.
        let ts_event = if self.is_left_open {
            if self.timestamp_on_close {
                event.ts_event
            } else {
                self.stored_open_ns
            }
        } else {
            self.stored_open_ns
        };

        self.build_and_send(ts_event, ts_init);

        // The close of this bar is the open of the next one.
        self.stored_open_ns = event.ts_event;

        if self.bar_type().spec().aggregation == BarAggregation::Month {
            let step = self.bar_type().spec().step.get() as u32;
            let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);

            // No callback is passed when re-arming the alert.
            self.clock
                .borrow_mut()
                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
                .expect(FAILED);

            self.next_close_ns = alert_time_ns;
        } else if self.bar_type().spec().aggregation == BarAggregation::Year {
            let step = self.bar_type().spec().step.get() as u32;
            let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);

            self.clock
                .borrow_mut()
                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
                .expect(FAILED);

            self.next_close_ns = alert_time_ns;
        } else {
            // Repeating timer: ask the clock for the next fire time, falling
            // back to the default value when it has none.
            self.next_close_ns = self
                .clock
                .borrow()
                .next_time_ns(&self.timer_name)
                .unwrap_or_default();
        }
    }

    /// Advances the test clock to `ts_init` and processes the timer events
    /// that fall before it.
    ///
    /// On first use (clock timestamp still at its default value) the clock
    /// is set to `ts_init` and the timer is started. An event stamped exactly
    /// at `ts_init` is stored for `postprocess_historical_events` instead of
    /// being processed here.
    ///
    /// # Panics
    ///
    /// Panics if the clock is not a `TestClock`.
    fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
            {
                // Scoped so the mutable borrow ends before the timer start
                // borrows the clock again.
                let mut clock_borrow = self.clock.borrow_mut();
                let test_clock = clock_borrow
                    .as_any_mut()
                    .downcast_mut::<TestClock>()
                    .expect("Expected TestClock in historical mode");
                test_clock.set_time(ts_init);
            }
            self.start_timer_internal(None);
        }

        let events = {
            let mut clock_borrow = self.clock.borrow_mut();
            let test_clock = clock_borrow
                .as_any_mut()
                .downcast_mut::<TestClock>()
                .expect("Expected TestClock in historical mode");
            // NOTE(review): the `true` flag presumably also moves the clock
            // time to `ts_init` — confirm against `TestClock::advance_time`.
            test_clock.advance_time(ts_init, true)
        };

        for event in events {
            if event.ts_event == ts_init {
                self.historical_event_at_ts_init = Some(event);
            } else {
                self.build_bar(&event);
            }
        }
    }

    /// Processes the timer event that was held back for the current update,
    /// if there is one.
    fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
        if let Some(ref event) = self.historical_event_at_ts_init.take() {
            self.build_bar(event);
        }
    }

    /// Stores the given events in `historical_events`, replacing any
    /// previously stored ones.
    pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
        self.historical_events = events;
    }
}
2010
2011impl BarAggregator for TimeBarAggregator {
2012 fn bar_type(&self) -> BarType {
2013 self.core.bar_type
2014 }
2015
2016 fn is_running(&self) -> bool {
2017 self.core.is_running
2018 }
2019
2020 fn set_is_running(&mut self, value: bool) {
2021 self.core.set_is_running(value);
2022 }
2023
2024 fn stop(&mut self) {
2026 Self::stop(self);
2027 }
2028
2029 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
2030 if self.historical_mode {
2031 self.preprocess_historical_events(ts_init);
2032 }
2033
2034 self.core.apply_update(price, size, ts_init);
2035
2036 if self.historical_mode {
2037 self.postprocess_historical_events(ts_init);
2038 }
2039 }
2040
2041 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
2042 if self.historical_mode {
2043 self.preprocess_historical_events(ts_init);
2044 }
2045
2046 self.core.builder.update_bar(bar, volume, ts_init);
2047
2048 if self.historical_mode {
2049 self.postprocess_historical_events(ts_init);
2050 }
2051 }
2052
2053 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
2054 self.historical_mode = historical_mode;
2055 self.core.handler = handler;
2056 }
2057
2058 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
2059 self.set_historical_events_internal(events);
2060 }
2061
2062 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
2063 self.set_clock_internal(clock);
2064 }
2065
2066 fn build_bar(&mut self, event: &TimeEvent) {
2067 {
2070 #[expect(clippy::use_self)]
2071 TimeBarAggregator::build_bar(self, event);
2072 }
2073 }
2074
2075 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
2076 self.aggregator_weak = Some(weak);
2077 }
2078
2079 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
2080 self.start_timer_internal(aggregator_rc);
2081 }
2082
2083 fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
2084 self.core.set_adjustment(adjustment, mode);
2085 }
2086}
2087
2088fn is_below_min_size(size: f64, precision: u8) -> bool {
2089 Quantity::new(size, precision).raw == 0
2090}
2091
/// Returns the smallest positive size at `precision` decimal places,
/// i.e. `10^-precision`.
fn min_size_f64(precision: u8) -> f64 {
    let exponent = -i32::from(precision);
    10_f64.powi(exponent)
}
2095
/// Source of per-leg vega values used when pricing option spreads.
pub trait VegaProvider {
    /// Returns the vega for the given leg, or `None` when none is available.
    fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64>;
}
2101
/// Strategy for converting raw spread bid/ask values into [`Price`]s.
pub trait SpreadPriceRounder {
    /// Rounds the raw bid and ask to prices with the given precision and
    /// returns them as `(bid, ask)`.
    fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price);
}
2107
/// [`VegaProvider`] backed by an in-memory map from instrument ID to vega.
#[derive(Debug, Default)]
pub struct MapVegaProvider {
    /// Vega per instrument ID.
    vegas: AHashMap<InstrumentId, f64>,
}
2113
2114impl MapVegaProvider {
2115 pub fn new() -> Self {
2116 Self {
2117 vegas: AHashMap::new(),
2118 }
2119 }
2120
2121 pub fn insert(&mut self, instrument_id: InstrumentId, vega: f64) {
2122 self.vegas.insert(instrument_id, vega);
2123 }
2124
2125 pub fn get(&self, instrument_id: &InstrumentId) -> Option<f64> {
2126 self.vegas.get(instrument_id).copied()
2127 }
2128}
2129
2130impl VegaProvider for MapVegaProvider {
2131 fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64> {
2132 self.vegas.get(&instrument_id).copied()
2133 }
2134}
2135
/// [`SpreadPriceRounder`] that rounds prices using a [`FixedTickScheme`].
#[derive(Debug)]
pub struct FixedTickSchemeRounder {
    /// Tick scheme used to find the next bid or ask price.
    scheme: FixedTickScheme,
}
2141
2142impl FixedTickSchemeRounder {
2143 pub fn new(tick: f64) -> anyhow::Result<Self> {
2149 Ok(Self {
2150 scheme: FixedTickScheme::new(tick)?,
2151 })
2152 }
2153
2154 fn round_one(&self, raw: f64, precision: u8, use_bid_rounding: bool) -> Price {
2155 if raw >= 0.0 {
2156 let p = if use_bid_rounding {
2157 self.scheme.next_bid_price(raw, 0, precision)
2158 } else {
2159 self.scheme.next_ask_price(raw, 0, precision)
2160 };
2161 p.unwrap_or_else(|| price_from_f64(raw, precision))
2162 } else {
2163 let p = if use_bid_rounding {
2164 self.scheme.next_ask_price(-raw, 0, precision)
2165 } else {
2166 self.scheme.next_bid_price(-raw, 0, precision)
2167 };
2168 p.map_or_else(
2169 || price_from_f64(raw, precision),
2170 |q| price_from_f64(-q.as_f64(), precision),
2171 )
2172 }
2173 }
2174}
2175
2176impl SpreadPriceRounder for FixedTickSchemeRounder {
2177 fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price) {
2178 let bid = self.round_one(raw_bid, precision, true);
2179 let ask = self.round_one(raw_ask, precision, false);
2180 (bid, ask)
2181 }
2182}
2183
/// Aggregates the latest quotes of several legs into quotes for a spread
/// instrument.
///
/// Without an update interval a spread quote is emitted on every leg quote
/// once all legs have quoted; with an interval, quotes are emitted from a
/// clock timer.
pub struct SpreadQuoteAggregator {
    /// Instrument ID stamped on emitted spread quotes.
    spread_instrument_id: InstrumentId,
    /// Leg instrument IDs, in the order given at construction.
    leg_ids: Vec<InstrumentId>,
    /// Signed leg ratios, parallel to `leg_ids`; never zero.
    ratios: Vec<i64>,
    /// Number of legs.
    n_legs: usize,
    /// Selects futures-spread pricing instead of option-spread pricing.
    is_futures_spread: bool,
    /// Precision of emitted prices.
    price_precision: u8,
    /// Precision of emitted sizes.
    size_precision: u8,
    /// Most recent quote received per instrument ID.
    last_quotes: AHashMap<InstrumentId, QuoteTick>,
    /// Per-leg mid prices, refreshed when a spread quote is built
    /// (option spreads only).
    mid_prices: Vec<f64>,
    /// Per-leg bid prices, refreshed when a spread quote is built.
    bid_prices: Vec<f64>,
    /// Per-leg ask prices, refreshed when a spread quote is built.
    ask_prices: Vec<f64>,
    /// Per-leg vegas; a leg keeps its previous value (initially 0.0) when
    /// the provider returns nothing for it.
    vegas: Vec<f64>,
    /// Per-leg ask-minus-bid widths (option spreads only).
    bid_ask_spreads: Vec<f64>,
    /// Per-leg bid sizes, refreshed when a spread quote is built.
    bid_sizes: Vec<f64>,
    /// Per-leg ask sizes, refreshed when a spread quote is built.
    ask_sizes: Vec<f64>,
    /// Callback receiving every emitted spread quote.
    handler: Box<dyn FnMut(QuoteTick)>,
    /// Clock used for the update timer.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `true`, incoming quotes drive the clock themselves.
    historical_mode: bool,
    /// Timer interval in seconds; `None` selects quote-driven mode.
    update_interval_seconds: Option<u64>,
    /// Offset added to the timer start; multiplied by 1_000 to get
    /// nanoseconds.
    quote_build_delay: u64,
    /// Set when a leg quote arrives and cleared when a spread quote is sent.
    has_update: bool,
    /// Name under which the timer is registered on the clock.
    timer_name: String,
    /// Timer event stamped exactly at the quote being processed, held back
    /// until that quote has been stored.
    historical_event_at_ts_init: Option<TimeEvent>,
    /// Optional source of per-leg vegas for option spreads.
    vega_provider: Option<Box<dyn VegaProvider>>,
    /// Optional rounding strategy for the emitted bid/ask prices.
    price_rounder: Option<Box<dyn SpreadPriceRounder>>,
    /// Running flag set through `set_running`.
    is_running: bool,
    /// Weak handle to the shared cell wrapping this aggregator, used by the
    /// timer callback.
    aggregator_weak: Option<Weak<RefCell<Self>>>,
}
2218
2219impl Debug for SpreadQuoteAggregator {
2220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2221 f.debug_struct(stringify!(SpreadQuoteAggregator))
2222 .field("spread_instrument_id", &self.spread_instrument_id)
2223 .field("n_legs", &self.n_legs)
2224 .field("is_futures_spread", &self.is_futures_spread)
2225 .field("update_interval_seconds", &self.update_interval_seconds)
2226 .finish()
2227 }
2228}
2229
impl SpreadQuoteAggregator {
    #[expect(clippy::too_many_arguments)]
    /// Creates a new [`SpreadQuoteAggregator`].
    ///
    /// `legs` pairs each leg instrument ID with its signed ratio. All per-leg
    /// buffers start at zero, no quotes are stored, and the aggregator is not
    /// running.
    ///
    /// # Panics
    ///
    /// Panics if fewer than two legs are given or if any ratio is zero.
    pub fn new(
        spread_instrument_id: InstrumentId,
        legs: &[(InstrumentId, i64)],
        is_futures_spread: bool,
        price_precision: u8,
        size_precision: u8,
        handler: Box<dyn FnMut(QuoteTick)>,
        clock: Rc<RefCell<dyn Clock>>,
        historical_mode: bool,
        update_interval_seconds: Option<u64>,
        quote_build_delay: u64,
        vega_provider: Option<Box<dyn VegaProvider>>,
        price_rounder: Option<Box<dyn SpreadPriceRounder>>,
    ) -> Self {
        assert!(legs.len() >= 2, "Spread must have more than one leg");
        let n_legs = legs.len();
        let leg_ids: Vec<InstrumentId> = legs.iter().map(|(id, _)| *id).collect();
        let ratios: Vec<i64> = legs.iter().map(|(_, r)| *r).collect();
        for &r in &ratios {
            assert!(r != 0, "Ratio cannot be zero");
        }
        let timer_name = format!("SPREAD_QUOTE_{spread_instrument_id}");
        Self {
            spread_instrument_id,
            leg_ids,
            ratios,
            n_legs,
            is_futures_spread,
            price_precision,
            size_precision,
            last_quotes: AHashMap::new(),
            mid_prices: vec![0.0; n_legs],
            bid_prices: vec![0.0; n_legs],
            ask_prices: vec![0.0; n_legs],
            vegas: vec![0.0; n_legs],
            bid_ask_spreads: vec![0.0; n_legs],
            bid_sizes: vec![0.0; n_legs],
            ask_sizes: vec![0.0; n_legs],
            handler,
            clock,
            historical_mode,
            update_interval_seconds,
            quote_build_delay,
            has_update: false,
            timer_name,
            historical_event_at_ts_init: None,
            vega_provider,
            price_rounder,
            is_running: false,
            aggregator_weak: None,
        }
    }

    /// Stores the weak handle used by the timer callback.
    pub fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Self>>) {
        self.aggregator_weak = Some(weak);
    }

    /// Stores a weak handle to `self_rc` so that `start_timer(None)` can be
    /// called later.
    pub fn prepare_for_timer_mode(&mut self, self_rc: &Rc<RefCell<Self>>) {
        self.aggregator_weak = Some(Rc::downgrade(self_rc));
    }

    /// Sets historical mode and replaces the quote handler.
    ///
    /// The vega provider is replaced only when a new one is given; passing
    /// `None` keeps the current provider.
    pub fn set_historical_mode(
        &mut self,
        historical_mode: bool,
        handler: Box<dyn FnMut(QuoteTick)>,
        vega_provider: Option<Box<dyn VegaProvider>>,
    ) {
        self.historical_mode = historical_mode;
        self.handler = handler;

        if let Some(vp) = vega_provider {
            self.vega_provider = Some(vp);
        }
    }

    /// Sets the running flag.
    pub fn set_running(&mut self, is_running: bool) {
        self.is_running = is_running;
    }

    /// Replaces the clock used for the update timer.
    pub fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.clock = clock;
    }

    /// Registers the repeating update timer on the clock.
    ///
    /// Does nothing in quote-driven mode (`update_interval_seconds` is
    /// `None`). The timer start is the current time floored to a multiple of
    /// the interval, plus the build delay.
    ///
    /// # Panics
    ///
    /// Panics if `aggregator_rc` is `None` and no weak handle was stored
    /// before, or if the clock rejects the timer.
    ///
    /// NOTE(review): an interval of `Some(0)` makes `interval_ns` zero and
    /// the division below panic — confirm callers never pass zero.
    pub fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Self>>>) {
        let Some(interval_secs) = self.update_interval_seconds else {
            return;
        };
        let aggregator_weak = if let Some(rc) = aggregator_rc {
            let weak = Rc::downgrade(&rc);
            self.aggregator_weak = Some(weak.clone());
            weak
        } else {
            self.aggregator_weak.clone().expect(
                "SpreadQuoteAggregator: timer mode requires prepare_for_timer_mode(rc) to be \
                 called first with the Rc that wraps this aggregator (before feeding quotes in \
                 historical mode or before start_timer(None)).",
            )
        };

        // The callback holds only a weak handle, so it becomes a no-op once
        // the aggregator has been dropped.
        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
            if let Some(agg) = aggregator_weak.upgrade() {
                agg.borrow_mut().on_timer_fire(event.ts_event);
            }
        }));

        let now_ns = self.clock.borrow().timestamp_ns();
        let interval_ns = interval_secs * 1_000_000_000;
        // Floor the current time to the interval grid.
        let start_ns = (now_ns.as_u64() / interval_ns) * interval_ns;
        // The build delay is scaled by 1_000 to nanoseconds.
        let start_ns = start_ns + self.quote_build_delay * 1_000;
        let start_time = UnixNanos::from(start_ns);
        let fire_immediately = now_ns == start_time;
        self.clock
            .borrow_mut()
            .set_timer_ns(
                &self.timer_name,
                interval_ns,
                Some(start_time),
                None,
                Some(callback),
                // NOTE(review): presumably the `allow_past` flag — confirm
                // against `Clock::set_timer_ns`.
                Some(true),
                Some(fire_immediately),
            )
            .expect("Failed to set spread quote timer");
    }

    /// Timer callback: builds a spread quote once every leg has quoted.
    pub fn on_timer_fire(&mut self, ts_event: UnixNanos) {
        if self.last_quotes.len() == self.n_legs {
            self.build_and_send_quote(ts_event);
        }
    }

    /// Cancels the update timer if it is registered on the clock.
    ///
    /// Does nothing in quote-driven mode.
    pub fn stop_timer(&mut self) {
        if self.update_interval_seconds.is_none() {
            return;
        }

        if self
            .clock
            .borrow()
            .timer_names()
            .contains(&self.timer_name.as_str())
        {
            self.clock.borrow_mut().cancel_timer(&self.timer_name);
        }
    }

    /// Stores a leg quote and, in quote-driven mode, emits a spread quote
    /// once every leg has quoted.
    ///
    /// In historical timer mode the clock is advanced to the quote time
    /// before the quote is stored.
    ///
    /// NOTE(review): the tick is stored without checking that its instrument
    /// is one of the legs. A non-leg quote would make `last_quotes.len()`
    /// exceed `n_legs` and stop the equality checks from passing — confirm
    /// callers only feed leg quotes.
    pub fn handle_quote_tick(&mut self, tick: QuoteTick) {
        let ts_init = tick.ts_init;

        if self.update_interval_seconds.is_some() && self.historical_mode {
            self.process_historical_events(ts_init);
        }
        self.last_quotes.insert(tick.instrument_id, tick);
        self.has_update = true;

        if self.update_interval_seconds.is_none() && self.last_quotes.len() == self.n_legs {
            self.build_and_send_quote(ts_init);
        }
    }

    /// Emits the spread quote for a held-back historical timer event, if any.
    ///
    /// Only acts in historical timer mode. The held event is always consumed,
    /// but a quote is built only when every leg has quoted.
    pub fn flush_pending_historical_quote(&mut self) {
        if self.update_interval_seconds.is_none() || !self.historical_mode {
            return;
        }

        let Some(event) = self.historical_event_at_ts_init.take() else {
            return;
        };

        if self.last_quotes.len() == self.n_legs {
            self.build_and_send_quote(event.ts_event);
        }
    }

    /// Advances the test clock to `ts_init` and emits spread quotes for the
    /// timer events that fall before it.
    ///
    /// On first use (clock timestamp still at its default value) the clock
    /// is set to `ts_init` and the timer is started. An event stamped exactly
    /// at `ts_init` is held back so the incoming quote is stored first.
    ///
    /// # Panics
    ///
    /// Panics if the clock is not a `TestClock`.
    fn process_historical_events(&mut self, ts_init: UnixNanos) {
        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
            let mut clock_borrow = self.clock.borrow_mut();
            let test_clock = clock_borrow
                .as_any_mut()
                .downcast_mut::<TestClock>()
                .expect("Expected TestClock in historical mode");
            test_clock.set_time(ts_init);
            // Released explicitly because `start_timer` borrows the clock.
            drop(clock_borrow);
            self.start_timer(None);
        }

        // An event held back by an earlier quote is now strictly in the
        // past, so it can be emitted before the clock advances.
        if self.last_quotes.len() == self.n_legs
            && let Some(ref event) = self.historical_event_at_ts_init
            && event.ts_event < ts_init
        {
            let event = self.historical_event_at_ts_init.take().unwrap();
            self.build_and_send_quote(event.ts_event);
        }

        let events = {
            let mut clock_borrow = self.clock.borrow_mut();
            let test_clock = clock_borrow
                .as_any_mut()
                .downcast_mut::<TestClock>()
                .expect("Expected TestClock in historical mode");
            // NOTE(review): the `true` flag presumably also moves the clock
            // time to `ts_init` — confirm against `TestClock::advance_time`.
            test_clock.advance_time(ts_init, true)
        };

        for event in events {
            if event.ts_event == ts_init {
                self.historical_event_at_ts_init = Some(event);
            } else if self.last_quotes.len() == self.n_legs {
                self.build_and_send_quote(event.ts_event);
            }
        }
    }

    /// Builds a spread quote from the latest leg quotes and passes it to the
    /// handler.
    ///
    /// Returns without sending when no leg quote has arrived since the last
    /// spread quote, or (after logging an error) when a leg has no quote.
    fn build_and_send_quote(&mut self, ts_event: UnixNanos) {
        if !self.has_update {
            return;
        }

        // Refresh the per-leg buffers from the stored quotes.
        for (idx, &leg_id) in self.leg_ids.iter().enumerate() {
            let Some(tick) = self.last_quotes.get(&leg_id) else {
                log::error!(
                    "SpreadQuoteAggregator[{}]: Missing quote for leg {}",
                    self.spread_instrument_id,
                    leg_id
                );
                return;
            };
            let ask_price = tick.ask_price.as_f64();
            let bid_price = tick.bid_price.as_f64();
            self.bid_prices[idx] = bid_price;
            self.ask_prices[idx] = ask_price;
            self.bid_sizes[idx] = tick.bid_size.as_f64();
            self.ask_sizes[idx] = tick.ask_size.as_f64();

            if !self.is_futures_spread {
                self.mid_prices[idx] = f64::midpoint(ask_price, bid_price);
                self.bid_ask_spreads[idx] = ask_price - bid_price;

                // Without a provider value the leg keeps its previous vega.
                if let Some(ref vp) = self.vega_provider
                    && let Some(vega) = vp.vega_for_leg(leg_id)
                {
                    self.vegas[idx] = vega;
                }
            }
        }
        let (raw_bid, raw_ask) = if self.is_futures_spread {
            self.create_futures_spread_prices()
        } else {
            self.create_option_spread_prices()
        };
        let spread_quote = self.create_quote_tick_from_raw_prices(raw_bid, raw_ask, ts_event);
        self.has_update = false;
        (self.handler)(spread_quote);
    }

    /// Computes the raw `(bid, ask)` of an option spread.
    ///
    /// The spread width is the absolute ratio-weighted vega sum multiplied by
    /// the mean absolute width-per-vega of the legs with a non-zero vega, and
    /// it is centered on the ratio-weighted mid price. Falls back to
    /// `create_futures_spread_prices` (with a warning) when no leg yields a
    /// non-zero width-per-vega.
    fn create_option_spread_prices(&self) -> (f64, f64) {
        // Width per unit of vega for each leg; legs without vega give 0.
        let vega_multipliers: Vec<f64> = (0..self.n_legs)
            .map(|i| {
                if self.vegas[i] == 0.0 {
                    0.0
                } else {
                    self.bid_ask_spreads[i] / self.vegas[i]
                }
            })
            .collect();
        let non_zero: Vec<f64> = vega_multipliers
            .iter()
            .copied()
            .filter(|&x| x != 0.0)
            .collect();

        if non_zero.is_empty() {
            log::warn!(
                "No vega information available for the components of {}. Will generate spread quote using component quotes only",
                self.spread_instrument_id
            );
            return self.create_futures_spread_prices();
        }
        let vega_multiplier = non_zero.iter().map(|x| x.abs()).sum::<f64>() / non_zero.len() as f64;
        let spread_vega = self
            .vegas
            .iter()
            .zip(self.ratios.iter())
            .map(|(v, r)| v * (*r as f64))
            .sum::<f64>()
            .abs();
        let bid_ask_spread = spread_vega * vega_multiplier;
        let spread_mid_price: f64 = self
            .mid_prices
            .iter()
            .zip(self.ratios.iter())
            .map(|(m, r)| m * (*r as f64))
            .sum();
        let raw_bid = spread_mid_price - bid_ask_spread * 0.5;
        let raw_ask = spread_mid_price + bid_ask_spread * 0.5;
        (raw_bid, raw_ask)
    }

    /// Computes the raw `(bid, ask)` of the spread directly from leg quotes.
    ///
    /// Legs with a non-negative ratio contribute their ask to the spread ask
    /// and their bid to the spread bid; legs with a negative ratio contribute
    /// the opposite side.
    fn create_futures_spread_prices(&self) -> (f64, f64) {
        let mut raw_ask = 0.0_f64;
        let mut raw_bid = 0.0_f64;

        for i in 0..self.n_legs {
            let r = self.ratios[i] as f64;
            if self.ratios[i] >= 0 {
                raw_ask += r * self.ask_prices[i];
                raw_bid += r * self.bid_prices[i];
            } else {
                raw_ask += r * self.bid_prices[i];
                raw_bid += r * self.ask_prices[i];
            }
        }
        (raw_bid, raw_ask)
    }

    /// Builds the spread [`QuoteTick`] from raw bid/ask values.
    ///
    /// Prices go through the configured rounder when there is one, otherwise
    /// they are converted at `price_precision`. Each spread size is the
    /// smallest leg size divided by the absolute ratio, taking the opposite
    /// side of the book for legs with a negative ratio. `ts_event` is passed
    /// for both timestamp arguments of `QuoteTick::new`.
    fn create_quote_tick_from_raw_prices(
        &self,
        raw_bid_price: f64,
        raw_ask_price: f64,
        ts_event: UnixNanos,
    ) -> QuoteTick {
        let (bid_price, ask_price) = if let Some(ref rounder) = self.price_rounder {
            rounder.round_prices(raw_bid_price, raw_ask_price, self.price_precision)
        } else {
            let bid = price_from_f64(raw_bid_price, self.price_precision);
            let ask = price_from_f64(raw_ask_price, self.price_precision);
            (bid, ask)
        };
        let mut min_bid_size = f64::INFINITY;
        let mut min_ask_size = f64::INFINITY;
        for i in 0..self.n_legs {
            let abs_ratio = self.ratios[i].unsigned_abs() as f64;
            if self.ratios[i] >= 0 {
                let b = self.bid_sizes[i] / abs_ratio;
                if b < min_bid_size {
                    min_bid_size = b;
                }
                let a = self.ask_sizes[i] / abs_ratio;
                if a < min_ask_size {
                    min_ask_size = a;
                }
            } else {
                // Negative ratio: this leg's ask side limits the spread bid
                // size and its bid side limits the spread ask size.
                let b = self.ask_sizes[i] / abs_ratio;
                if b < min_bid_size {
                    min_bid_size = b;
                }
                let a = self.bid_sizes[i] / abs_ratio;
                if a < min_ask_size {
                    min_ask_size = a;
                }
            }
        }
        let bid_size = Quantity::new(min_bid_size, self.size_precision);
        let ask_size = Quantity::new(min_ask_size, self.size_precision);
        QuoteTick::new(
            self.spread_instrument_id,
            bid_price,
            ask_price,
            bid_size,
            ask_size,
            ts_event,
            ts_event,
        )
    }
}
2631
2632fn price_from_f64(v: f64, precision: u8) -> Price {
2633 Price::new(v, precision)
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638 use std::sync::{Arc, Mutex};
2639
2640 use nautilus_common::{clock::TestClock, timer::TimeEvent};
2641 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
2642 use nautilus_model::{
2643 data::{BarSpecification, BarType, QuoteTick},
2644 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
2645 identifiers::InstrumentId,
2646 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
2647 types::{Price, Quantity},
2648 };
2649 use rstest::rstest;
2650 use ustr::Ustr;
2651
2652 use super::*;
2653
2654 #[rstest]
2655 fn test_bar_builder_initialization(equity_aapl: Equity) {
2656 let instrument = InstrumentAny::Equity(equity_aapl);
2657 let bar_type = BarType::new(
2658 instrument.id(),
2659 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2660 AggregationSource::Internal,
2661 );
2662 let builder = BarBuilder::new(
2663 bar_type,
2664 instrument.price_precision(),
2665 instrument.size_precision(),
2666 );
2667
2668 assert!(!builder.initialized);
2669 assert_eq!(builder.ts_last, 0);
2670 assert_eq!(builder.count, 0);
2671 }
2672
2673 #[rstest]
2674 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
2675 let instrument = InstrumentAny::Equity(equity_aapl);
2676 let bar_type = BarType::new(
2677 instrument.id(),
2678 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2679 AggregationSource::Internal,
2680 );
2681 let mut builder = BarBuilder::new(
2682 bar_type,
2683 instrument.price_precision(),
2684 instrument.size_precision(),
2685 );
2686
2687 builder.update(
2688 Price::from("100.00"),
2689 Quantity::from(1),
2690 UnixNanos::from(1000),
2691 );
2692 builder.update(
2693 Price::from("95.00"),
2694 Quantity::from(1),
2695 UnixNanos::from(2000),
2696 );
2697 builder.update(
2698 Price::from("105.00"),
2699 Quantity::from(1),
2700 UnixNanos::from(3000),
2701 );
2702
2703 let bar = builder.build_now();
2704 assert!(bar.high > bar.low);
2705 assert_eq!(bar.open, Price::from("100.00"));
2706 assert_eq!(bar.high, Price::from("105.00"));
2707 assert_eq!(bar.low, Price::from("95.00"));
2708 assert_eq!(bar.close, Price::from("105.00"));
2709 }
2710
2711 #[rstest]
2712 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
2713 let instrument = InstrumentAny::Equity(equity_aapl);
2714 let bar_type = BarType::new(
2715 instrument.id(),
2716 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2717 AggregationSource::Internal,
2718 );
2719 let mut builder = BarBuilder::new(
2720 bar_type,
2721 instrument.price_precision(),
2722 instrument.size_precision(),
2723 );
2724
2725 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2726 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2727
2728 assert_eq!(builder.ts_last, 1_000);
2729 assert_eq!(builder.count, 1);
2730 }
2731
2732 #[rstest]
2733 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2734 let instrument = InstrumentAny::Equity(equity_aapl);
2735 let bar_type = BarType::new(
2736 instrument.id(),
2737 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2738 AggregationSource::Internal,
2739 );
2740 let mut builder = BarBuilder::new(
2741 bar_type,
2742 instrument.price_precision(),
2743 instrument.size_precision(),
2744 );
2745
2746 builder.update(
2747 Price::from("1.00000"),
2748 Quantity::from(1),
2749 UnixNanos::default(),
2750 );
2751
2752 assert!(builder.initialized);
2753 assert_eq!(builder.ts_last, 0);
2754 assert_eq!(builder.count, 1);
2755 }
2756
2757 #[rstest]
2758 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2759 equity_aapl: Equity,
2760 ) {
2761 let instrument = InstrumentAny::Equity(equity_aapl);
2762 let bar_type = BarType::new(
2763 instrument.id(),
2764 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2765 AggregationSource::Internal,
2766 );
2767 let mut builder = BarBuilder::new(bar_type, 2, 0);
2768
2769 builder.update(
2770 Price::from("1.00000"),
2771 Quantity::from(1),
2772 UnixNanos::from(1_000),
2773 );
2774 builder.update(
2775 Price::from("1.00001"),
2776 Quantity::from(1),
2777 UnixNanos::from(500),
2778 );
2779
2780 assert!(builder.initialized);
2781 assert_eq!(builder.ts_last, 1_000);
2782 assert_eq!(builder.count, 1);
2783 }
2784
2785 #[rstest]
2786 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2787 let instrument = InstrumentAny::Equity(equity_aapl);
2788 let bar_type = BarType::new(
2789 instrument.id(),
2790 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2791 AggregationSource::Internal,
2792 );
2793 let mut builder = BarBuilder::new(
2794 bar_type,
2795 instrument.price_precision(),
2796 instrument.size_precision(),
2797 );
2798
2799 for _ in 0..5 {
2800 builder.update(
2801 Price::from("1.00000"),
2802 Quantity::from(1),
2803 UnixNanos::from(1_000),
2804 );
2805 }
2806
2807 assert_eq!(builder.count, 5);
2808 }
2809
2810 #[rstest]
2811 #[should_panic]
2812 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2813 let instrument = InstrumentAny::Equity(equity_aapl);
2814 let bar_type = BarType::new(
2815 instrument.id(),
2816 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2817 AggregationSource::Internal,
2818 );
2819 let mut builder = BarBuilder::new(
2820 bar_type,
2821 instrument.price_precision(),
2822 instrument.size_precision(),
2823 );
2824 let _ = builder.build_now();
2825 }
2826
2827 #[rstest]
2828 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2829 let instrument = InstrumentAny::Equity(equity_aapl);
2830 let bar_type = BarType::new(
2831 instrument.id(),
2832 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2833 AggregationSource::Internal,
2834 );
2835 let mut builder = BarBuilder::new(
2836 bar_type,
2837 instrument.price_precision(),
2838 instrument.size_precision(),
2839 );
2840
2841 builder.update(
2842 Price::from("1.00001"),
2843 Quantity::from(2),
2844 UnixNanos::default(),
2845 );
2846 builder.update(
2847 Price::from("1.00002"),
2848 Quantity::from(2),
2849 UnixNanos::default(),
2850 );
2851 builder.update(
2852 Price::from("1.00000"),
2853 Quantity::from(1),
2854 UnixNanos::from(1_000_000_000),
2855 );
2856
2857 let bar = builder.build_now();
2858
2859 assert_eq!(bar.open, Price::from("1.00001"));
2860 assert_eq!(bar.high, Price::from("1.00002"));
2861 assert_eq!(bar.low, Price::from("1.00000"));
2862 assert_eq!(bar.close, Price::from("1.00000"));
2863 assert_eq!(bar.volume, Quantity::from(5));
2864 assert_eq!(bar.ts_init, 1_000_000_000);
2865 assert_eq!(builder.ts_last, 1_000_000_000);
2866 assert_eq!(builder.count, 0);
2867 }
2868
2869 #[rstest]
2870 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2871 let instrument = InstrumentAny::Equity(equity_aapl);
2872 let bar_type = BarType::new(
2873 instrument.id(),
2874 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2875 AggregationSource::Internal,
2876 );
2877 let mut builder = BarBuilder::new(bar_type, 2, 0);
2878
2879 builder.update(
2880 Price::from("1.00001"),
2881 Quantity::from(1),
2882 UnixNanos::default(),
2883 );
2884 builder.build_now();
2885
2886 builder.update(
2887 Price::from("1.00000"),
2888 Quantity::from(1),
2889 UnixNanos::default(),
2890 );
2891 builder.update(
2892 Price::from("1.00003"),
2893 Quantity::from(1),
2894 UnixNanos::default(),
2895 );
2896 builder.update(
2897 Price::from("1.00002"),
2898 Quantity::from(1),
2899 UnixNanos::default(),
2900 );
2901
2902 let bar = builder.build_now();
2903
2904 assert_eq!(bar.open, Price::from("1.00000"));
2905 assert_eq!(bar.high, Price::from("1.00003"));
2906 assert_eq!(bar.low, Price::from("1.00000"));
2907 assert_eq!(bar.close, Price::from("1.00002"));
2908 assert_eq!(bar.volume, Quantity::from(3));
2909 }
2910
2911 #[rstest]
2912 fn test_bar_builder_update_bar_initializes_then_accumulates(equity_aapl: Equity) {
2913 let instrument = InstrumentAny::Equity(equity_aapl);
2914 let bar_type = BarType::new(
2915 instrument.id(),
2916 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2917 AggregationSource::Internal,
2918 );
2919 let mut builder = BarBuilder::new(
2920 bar_type,
2921 instrument.price_precision(),
2922 instrument.size_precision(),
2923 );
2924
2925 let bar_one = Bar::new(
2926 bar_type,
2927 Price::from("100.00"),
2928 Price::from("102.00"),
2929 Price::from("99.00"),
2930 Price::from("101.00"),
2931 Quantity::from(10),
2932 UnixNanos::from(1_000),
2933 UnixNanos::from(1_000),
2934 );
2935 let bar_two = Bar::new(
2936 bar_type,
2937 Price::from("101.00"),
2938 Price::from("103.00"),
2939 Price::from("98.00"),
2940 Price::from("102.00"),
2941 Quantity::from(5),
2942 UnixNanos::from(2_000),
2943 UnixNanos::from(2_000),
2944 );
2945
2946 builder.update_bar(bar_one, bar_one.volume, bar_one.ts_init);
2947 builder.update_bar(bar_two, bar_two.volume, bar_two.ts_init);
2948 let bar = builder.build_now();
2949
2950 assert_eq!(bar.open, Price::from("100.00"));
2951 assert_eq!(bar.high, Price::from("103.00"));
2952 assert_eq!(bar.low, Price::from("98.00"));
2953 assert_eq!(bar.close, Price::from("102.00"));
2954 assert_eq!(bar.volume, Quantity::from(15));
2955 assert_eq!(builder.count, 0);
2956 }
2957
2958 #[rstest]
2959 fn test_bar_builder_update_bar_ignores_earlier_timestamp(equity_aapl: Equity) {
2960 let instrument = InstrumentAny::Equity(equity_aapl);
2961 let bar_type = BarType::new(
2962 instrument.id(),
2963 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2964 AggregationSource::Internal,
2965 );
2966 let mut builder = BarBuilder::new(
2967 bar_type,
2968 instrument.price_precision(),
2969 instrument.size_precision(),
2970 );
2971
2972 let bar_later = Bar::new(
2973 bar_type,
2974 Price::from("100.00"),
2975 Price::from("101.00"),
2976 Price::from("99.00"),
2977 Price::from("100.50"),
2978 Quantity::from(10),
2979 UnixNanos::from(2_000),
2980 UnixNanos::from(2_000),
2981 );
2982 let bar_earlier = Bar::new(
2983 bar_type,
2984 Price::from("200.00"),
2985 Price::from("210.00"),
2986 Price::from("190.00"),
2987 Price::from("205.00"),
2988 Quantity::from(50),
2989 UnixNanos::from(1_000),
2990 UnixNanos::from(1_000),
2991 );
2992
2993 builder.update_bar(bar_later, bar_later.volume, bar_later.ts_init);
2994 builder.update_bar(bar_earlier, bar_earlier.volume, bar_earlier.ts_init);
2995
2996 assert_eq!(builder.ts_last, 2_000);
2997 assert_eq!(builder.count, 1);
2998 assert_eq!(builder.volume, Quantity::from(10));
2999 }
3000
3001 #[rstest]
3002 #[case::spread_zero_inactive(
3003 Decimal::ZERO,
3004 ContinuousFutureAdjustmentType::BackwardSpread,
3005 false
3006 )]
3007 #[case::spread_positive_active(
3008 Decimal::new(150, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3010 true,
3011 )]
3012 #[case::spread_negative_active(
3013 Decimal::new(-250, 2), ContinuousFutureAdjustmentType::ForwardSpread,
3015 true,
3016 )]
3017 #[case::spread_sub_precision_inactive(
3018 Decimal::new(1, 28),
3020 ContinuousFutureAdjustmentType::BackwardSpread,
3021 false,
3022 )]
3023 #[case::ratio_one_inactive(Decimal::ONE, ContinuousFutureAdjustmentType::BackwardRatio, false)]
3024 #[case::ratio_non_one_active(
3025 Decimal::new(105, 2), ContinuousFutureAdjustmentType::ForwardRatio,
3027 true,
3028 )]
3029 fn test_bar_builder_set_adjustment_active_flag(
3030 equity_aapl: Equity,
3031 #[case] adjustment: Decimal,
3032 #[case] mode: ContinuousFutureAdjustmentType,
3033 #[case] expected_active: bool,
3034 ) {
3035 let instrument = InstrumentAny::Equity(equity_aapl);
3036 let bar_type = BarType::new(
3037 instrument.id(),
3038 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3039 AggregationSource::Internal,
3040 );
3041 let mut builder = BarBuilder::new(bar_type, 2, 0);
3042
3043 builder.set_adjustment(adjustment, mode);
3044
3045 assert_eq!(builder.adjustment_active, expected_active);
3046 assert_eq!(builder.adjustment_is_ratio, mode.is_ratio());
3047 assert_eq!(builder.adjustment_mode, mode);
3048 }
3049
3050 #[rstest]
3051 fn test_bar_builder_set_adjustment_mode_switch_resets_flags(equity_aapl: Equity) {
3052 let instrument = InstrumentAny::Equity(equity_aapl);
3053 let bar_type = BarType::new(
3054 instrument.id(),
3055 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3056 AggregationSource::Internal,
3057 );
3058 let mut builder = BarBuilder::new(bar_type, 2, 0);
3059
3060 builder.set_adjustment(
3062 Decimal::new(150, 2), ContinuousFutureAdjustmentType::BackwardRatio,
3064 );
3065 builder.set_adjustment(
3066 Decimal::new(50, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3068 );
3069 assert!(!builder.adjustment_is_ratio);
3070 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3071 assert_eq!(builder.build_now().close, Price::from("100.50"));
3072
3073 builder.set_adjustment(
3075 Decimal::new(11, 1), ContinuousFutureAdjustmentType::ForwardRatio,
3077 );
3078 assert!(builder.adjustment_is_ratio);
3079 builder.update(Price::from("100.00"), Quantity::from(1), 2_000.into());
3080 assert_eq!(builder.build_now().close, Price::from("110.00"));
3081 }
3082
3083 #[rstest]
3084 fn test_bar_builder_update_applies_backward_spread_adjustment(equity_aapl: Equity) {
3085 let instrument = InstrumentAny::Equity(equity_aapl);
3086 let bar_type = BarType::new(
3087 instrument.id(),
3088 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3089 AggregationSource::Internal,
3090 );
3091 let mut builder = BarBuilder::new(bar_type, 2, 0);
3092
3093 builder.set_adjustment(
3094 Decimal::new(250, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3096 );
3097
3098 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3099 builder.update(Price::from("99.00"), Quantity::from(1), 2_000.into());
3100 builder.update(Price::from("101.00"), Quantity::from(1), 3_000.into());
3101
3102 let bar = builder.build_now();
3103 assert_eq!(bar.open, Price::from("102.50"));
3104 assert_eq!(bar.high, Price::from("103.50"));
3105 assert_eq!(bar.low, Price::from("101.50"));
3106 assert_eq!(bar.close, Price::from("103.50"));
3107 }
3108
3109 #[rstest]
3110 fn test_bar_builder_update_applies_forward_ratio_adjustment(equity_aapl: Equity) {
3111 let instrument = InstrumentAny::Equity(equity_aapl);
3112 let bar_type = BarType::new(
3113 instrument.id(),
3114 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3115 AggregationSource::Internal,
3116 );
3117 let mut builder = BarBuilder::new(bar_type, 2, 0);
3118
3119 builder.set_adjustment(
3120 Decimal::new(11, 1), ContinuousFutureAdjustmentType::ForwardRatio,
3122 );
3123
3124 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3125 builder.update(Price::from("90.00"), Quantity::from(1), 2_000.into());
3126 builder.update(Price::from("110.00"), Quantity::from(1), 3_000.into());
3127
3128 let bar = builder.build_now();
3129 assert_eq!(bar.open, Price::from("110.00"));
3130 assert_eq!(bar.high, Price::from("121.00"));
3131 assert_eq!(bar.low, Price::from("99.00"));
3132 assert_eq!(bar.close, Price::from("121.00"));
3133 }
3134
3135 #[rstest]
3136 fn test_bar_builder_update_bar_applies_adjustment_to_ohlc(equity_aapl: Equity) {
3137 let instrument = InstrumentAny::Equity(equity_aapl);
3138 let bar_type = BarType::new(
3139 instrument.id(),
3140 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3141 AggregationSource::Internal,
3142 );
3143 let mut builder = BarBuilder::new(bar_type, 2, 0);
3144
3145 builder.set_adjustment(
3146 Decimal::new(-100, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3148 );
3149
3150 let input = Bar::new(
3151 bar_type,
3152 Price::from("100.00"),
3153 Price::from("105.00"),
3154 Price::from("99.00"),
3155 Price::from("102.00"),
3156 Quantity::from(10),
3157 UnixNanos::from(1_000),
3158 UnixNanos::from(1_000),
3159 );
3160 builder.update_bar(input, input.volume, input.ts_init);
3161
3162 let bar = builder.build_now();
3163 assert_eq!(bar.open, Price::from("99.00"));
3164 assert_eq!(bar.high, Price::from("104.00"));
3165 assert_eq!(bar.low, Price::from("98.00"));
3166 assert_eq!(bar.close, Price::from("101.00"));
3167 }
3168
3169 #[rstest]
3170 fn test_bar_builder_reset_retains_adjustment(equity_aapl: Equity) {
3171 let instrument = InstrumentAny::Equity(equity_aapl);
3172 let bar_type = BarType::new(
3173 instrument.id(),
3174 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3175 AggregationSource::Internal,
3176 );
3177 let mut builder = BarBuilder::new(bar_type, 2, 0);
3178
3179 builder.set_adjustment(
3180 Decimal::new(500, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3182 );
3183 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3184 let bar_one = builder.build_now();
3185 assert_eq!(bar_one.close, Price::from("105.00"));
3186
3187 assert!(builder.adjustment_active);
3189
3190 builder.update(Price::from("110.00"), Quantity::from(1), 2_000.into());
3191 let bar_two = builder.build_now();
3192 assert_eq!(bar_two.close, Price::from("115.00"));
3193 }
3194
3195 #[rstest]
3196 fn test_bar_builder_update_bar_applies_ratio_adjustment(equity_aapl: Equity) {
3197 let instrument = InstrumentAny::Equity(equity_aapl);
3198 let bar_type = BarType::new(
3199 instrument.id(),
3200 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3201 AggregationSource::Internal,
3202 );
3203 let mut builder = BarBuilder::new(bar_type, 2, 0);
3204
3205 builder.set_adjustment(
3206 Decimal::new(11, 1), ContinuousFutureAdjustmentType::ForwardRatio,
3208 );
3209
3210 let input = Bar::new(
3211 bar_type,
3212 Price::from("100.00"),
3213 Price::from("110.00"),
3214 Price::from("90.00"),
3215 Price::from("105.00"),
3216 Quantity::from(10),
3217 UnixNanos::from(1_000),
3218 UnixNanos::from(1_000),
3219 );
3220 builder.update_bar(input, input.volume, input.ts_init);
3221
3222 let bar = builder.build_now();
3223 assert_eq!(bar.open, Price::from("110.00"));
3224 assert_eq!(bar.high, Price::from("121.00"));
3225 assert_eq!(bar.low, Price::from("99.00"));
3226 assert_eq!(bar.close, Price::from("115.50"));
3227 }
3228
3229 #[rstest]
3230 fn test_bar_builder_spread_below_zero_representable(equity_aapl: Equity) {
3231 let instrument = InstrumentAny::Equity(equity_aapl);
3234 let bar_type = BarType::new(
3235 instrument.id(),
3236 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3237 AggregationSource::Internal,
3238 );
3239 let mut builder = BarBuilder::new(bar_type, 2, 0);
3240
3241 builder.set_adjustment(
3242 Decimal::new(-15000, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3244 );
3245
3246 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3247 let bar = builder.build_now();
3248 assert_eq!(bar.close, Price::from("-50.00"));
3249 assert!(bar.close.raw < 0);
3250 assert_eq!(bar.close.precision, 2);
3251 }
3252
3253 #[rstest]
3254 fn test_bar_builder_build_promotes_close_above_high_from_previous_close(equity_aapl: Equity) {
3255 let instrument = InstrumentAny::Equity(equity_aapl);
3256 let bar_type = BarType::new(
3257 instrument.id(),
3258 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3259 AggregationSource::Internal,
3260 );
3261 let mut builder = BarBuilder::new(bar_type, 2, 0);
3262
3263 builder.update(
3264 Price::from("110.00"),
3265 Quantity::from(1),
3266 UnixNanos::from(100),
3267 );
3268 builder.build_now();
3269
3270 builder.update(
3271 Price::from("100.00"),
3272 Quantity::from(1),
3273 UnixNanos::from(200),
3274 );
3275 builder.update(
3276 Price::from("101.00"),
3277 Quantity::from(1),
3278 UnixNanos::from(300),
3279 );
3280 builder.update(
3281 Price::from("200.00"),
3282 Quantity::from(1),
3283 UnixNanos::from(400),
3284 );
3285
3286 let bar = builder.build_now();
3287 assert_eq!(bar.open, Price::from("100.00"));
3288 assert_eq!(bar.high, Price::from("200.00"));
3289 assert_eq!(bar.low, Price::from("100.00"));
3290 assert_eq!(bar.close, Price::from("200.00"));
3291 }
3292
3293 #[rstest]
3294 fn test_bar_builder_build_clamps_low_to_close(equity_aapl: Equity) {
3295 let instrument = InstrumentAny::Equity(equity_aapl);
3299 let bar_type = BarType::new(
3300 instrument.id(),
3301 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3302 AggregationSource::Internal,
3303 );
3304 let mut builder = BarBuilder::new(bar_type, 2, 0);
3305
3306 builder.update(
3307 Price::from("100.00"),
3308 Quantity::from(1),
3309 UnixNanos::from(100),
3310 );
3311 builder.close = Some(Price::from("50.00"));
3312
3313 let bar = builder.build_now();
3314 assert_eq!(bar.low, Price::from("50.00"));
3315 assert_eq!(bar.close, Price::from("50.00"));
3316 assert!(bar.low <= bar.open);
3317 }
3318
3319 #[rstest]
3320 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
3321 let instrument = InstrumentAny::Equity(equity_aapl);
3322 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3323 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3324 let handler = Arc::new(Mutex::new(Vec::new()));
3325 let handler_clone = Arc::clone(&handler);
3326
3327 let mut aggregator = TickBarAggregator::new(
3328 bar_type,
3329 instrument.price_precision(),
3330 instrument.size_precision(),
3331 move |bar: Bar| {
3332 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3333 handler_guard.push(bar);
3334 },
3335 );
3336
3337 let trade = TradeTick::default();
3338 aggregator.handle_trade(trade);
3339
3340 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3341 assert_eq!(handler_guard.len(), 0);
3342 }
3343
3344 #[rstest]
3345 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
3346 let instrument = InstrumentAny::Equity(equity_aapl);
3347 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3348 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3349 let handler = Arc::new(Mutex::new(Vec::new()));
3350 let handler_clone = Arc::clone(&handler);
3351
3352 let mut aggregator = TickBarAggregator::new(
3353 bar_type,
3354 instrument.price_precision(),
3355 instrument.size_precision(),
3356 move |bar: Bar| {
3357 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3358 handler_guard.push(bar);
3359 },
3360 );
3361
3362 let trade = TradeTick::default();
3363 aggregator.handle_trade(trade);
3364 aggregator.handle_trade(trade);
3365 aggregator.handle_trade(trade);
3366
3367 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3368 let bar = handler_guard.first().unwrap();
3369 assert_eq!(handler_guard.len(), 1);
3370 assert_eq!(bar.open, trade.price);
3371 assert_eq!(bar.high, trade.price);
3372 assert_eq!(bar.low, trade.price);
3373 assert_eq!(bar.close, trade.price);
3374 assert_eq!(bar.volume, Quantity::from(300000));
3375 assert_eq!(bar.ts_event, trade.ts_event);
3376 assert_eq!(bar.ts_init, trade.ts_init);
3377 }
3378
3379 #[rstest]
3380 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
3381 let instrument = InstrumentAny::Equity(equity_aapl);
3382 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3383 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3384 let handler = Arc::new(Mutex::new(Vec::new()));
3385 let handler_clone = Arc::clone(&handler);
3386
3387 let mut aggregator = TickBarAggregator::new(
3388 bar_type,
3389 instrument.price_precision(),
3390 instrument.size_precision(),
3391 move |bar: Bar| {
3392 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3393 handler_guard.push(bar);
3394 },
3395 );
3396
3397 aggregator.update(
3398 Price::from("1.00001"),
3399 Quantity::from(1),
3400 UnixNanos::default(),
3401 );
3402 aggregator.update(
3403 Price::from("1.00002"),
3404 Quantity::from(1),
3405 UnixNanos::from(1000),
3406 );
3407 aggregator.update(
3408 Price::from("1.00003"),
3409 Quantity::from(1),
3410 UnixNanos::from(2000),
3411 );
3412
3413 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3414 assert_eq!(handler_guard.len(), 1);
3415
3416 let bar = handler_guard.first().unwrap();
3417 assert_eq!(bar.open, Price::from("1.00001"));
3418 assert_eq!(bar.high, Price::from("1.00003"));
3419 assert_eq!(bar.low, Price::from("1.00001"));
3420 assert_eq!(bar.close, Price::from("1.00003"));
3421 assert_eq!(bar.volume, Quantity::from(3));
3422 }
3423
3424 #[rstest]
3425 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
3426 let instrument = InstrumentAny::Equity(equity_aapl);
3427 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
3428 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3429 let handler = Arc::new(Mutex::new(Vec::new()));
3430 let handler_clone = Arc::clone(&handler);
3431
3432 let mut aggregator = TickBarAggregator::new(
3433 bar_type,
3434 instrument.price_precision(),
3435 instrument.size_precision(),
3436 move |bar: Bar| {
3437 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3438 handler_guard.push(bar);
3439 },
3440 );
3441
3442 aggregator.update(
3443 Price::from("1.00001"),
3444 Quantity::from(1),
3445 UnixNanos::default(),
3446 );
3447 aggregator.update(
3448 Price::from("1.00002"),
3449 Quantity::from(1),
3450 UnixNanos::from(1000),
3451 );
3452 aggregator.update(
3453 Price::from("1.00003"),
3454 Quantity::from(1),
3455 UnixNanos::from(2000),
3456 );
3457 aggregator.update(
3458 Price::from("1.00004"),
3459 Quantity::from(1),
3460 UnixNanos::from(3000),
3461 );
3462
3463 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3464 assert_eq!(handler_guard.len(), 2);
3465
3466 let bar1 = &handler_guard[0];
3467 assert_eq!(bar1.open, Price::from("1.00001"));
3468 assert_eq!(bar1.close, Price::from("1.00002"));
3469 assert_eq!(bar1.volume, Quantity::from(2));
3470
3471 let bar2 = &handler_guard[1];
3472 assert_eq!(bar2.open, Price::from("1.00003"));
3473 assert_eq!(bar2.close, Price::from("1.00004"));
3474 assert_eq!(bar2.volume, Quantity::from(2));
3475 }
3476
3477 #[rstest]
3478 fn test_non_time_bar_aggregators_use_historical_handler(
3479 equity_aapl: Equity,
3480 audusd_sim: CurrencyPair,
3481 ) {
3482 let instrument = InstrumentAny::Equity(equity_aapl);
3483 let instrument_id = instrument.id();
3484 let price_precision = instrument.price_precision();
3485 let size_precision = instrument.size_precision();
3486 let make_sink = |bars: Arc<Mutex<Vec<Bar>>>| {
3487 move |bar: Bar| {
3488 bars.lock().expect(MUTEX_POISONED).push(bar);
3489 }
3490 };
3491 let make_trade = |price: &str, size: i64, ts: u64| TradeTick {
3492 instrument_id,
3493 price: Price::from(price),
3494 size: Quantity::from(size),
3495 aggressor_side: AggressorSide::Buyer,
3496 ts_event: UnixNanos::from(ts),
3497 ts_init: UnixNanos::from(ts),
3498 ..TradeTick::default()
3499 };
3500
3501 macro_rules! assert_historical_sink_receives {
3502 ($name:expr, $aggregator:expr, $update:expr) => {{
3503 let initial_bars = Arc::new(Mutex::new(Vec::new()));
3504 let historical_bars = Arc::new(Mutex::new(Vec::new()));
3505 let mut aggregator = $aggregator(Arc::clone(&initial_bars));
3506 aggregator
3507 .set_historical_mode(true, Box::new(make_sink(Arc::clone(&historical_bars))));
3508 {
3509 let aggregator: &mut dyn BarAggregator = &mut aggregator;
3510 $update(aggregator);
3511 }
3512
3513 assert_eq!(
3514 initial_bars.lock().expect(MUTEX_POISONED).len(),
3515 0,
3516 "{}",
3517 $name,
3518 );
3519 assert_eq!(
3520 historical_bars.lock().expect(MUTEX_POISONED).len(),
3521 1,
3522 "{}",
3523 $name,
3524 );
3525 }};
3526 }
3527
3528 let tick_type = BarType::new(
3529 instrument_id,
3530 BarSpecification::new(1, BarAggregation::Tick, PriceType::Last),
3531 AggregationSource::Internal,
3532 );
3533 assert_historical_sink_receives!(
3534 "TickBarAggregator",
3535 |bars| TickBarAggregator::new(
3536 tick_type,
3537 price_precision,
3538 size_precision,
3539 make_sink(bars)
3540 ),
3541 |aggregator: &mut dyn BarAggregator| {
3542 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3543 }
3544 );
3545
3546 let tick_imbalance_type = BarType::new(
3547 instrument_id,
3548 BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last),
3549 AggregationSource::Internal,
3550 );
3551 assert_historical_sink_receives!(
3552 "TickImbalanceBarAggregator",
3553 |bars| TickImbalanceBarAggregator::new(
3554 tick_imbalance_type,
3555 price_precision,
3556 size_precision,
3557 make_sink(bars),
3558 ),
3559 |aggregator: &mut dyn BarAggregator| {
3560 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3561 }
3562 );
3563
3564 let tick_runs_type = BarType::new(
3565 instrument_id,
3566 BarSpecification::new(1, BarAggregation::TickRuns, PriceType::Last),
3567 AggregationSource::Internal,
3568 );
3569 assert_historical_sink_receives!(
3570 "TickRunsBarAggregator",
3571 |bars| TickRunsBarAggregator::new(
3572 tick_runs_type,
3573 price_precision,
3574 size_precision,
3575 make_sink(bars),
3576 ),
3577 |aggregator: &mut dyn BarAggregator| {
3578 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3579 }
3580 );
3581
3582 let volume_type = BarType::new(
3583 instrument_id,
3584 BarSpecification::new(1, BarAggregation::Volume, PriceType::Last),
3585 AggregationSource::Internal,
3586 );
3587 assert_historical_sink_receives!(
3588 "VolumeBarAggregator",
3589 |bars| VolumeBarAggregator::new(
3590 volume_type,
3591 price_precision,
3592 size_precision,
3593 make_sink(bars),
3594 ),
3595 |aggregator: &mut dyn BarAggregator| {
3596 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3597 }
3598 );
3599
3600 let volume_imbalance_type = BarType::new(
3601 instrument_id,
3602 BarSpecification::new(1, BarAggregation::VolumeImbalance, PriceType::Last),
3603 AggregationSource::Internal,
3604 );
3605 assert_historical_sink_receives!(
3606 "VolumeImbalanceBarAggregator",
3607 |bars| VolumeImbalanceBarAggregator::new(
3608 volume_imbalance_type,
3609 price_precision,
3610 size_precision,
3611 make_sink(bars),
3612 ),
3613 |aggregator: &mut dyn BarAggregator| {
3614 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3615 }
3616 );
3617
3618 let volume_runs_type = BarType::new(
3619 instrument_id,
3620 BarSpecification::new(1, BarAggregation::VolumeRuns, PriceType::Last),
3621 AggregationSource::Internal,
3622 );
3623 assert_historical_sink_receives!(
3624 "VolumeRunsBarAggregator",
3625 |bars| VolumeRunsBarAggregator::new(
3626 volume_runs_type,
3627 price_precision,
3628 size_precision,
3629 make_sink(bars),
3630 ),
3631 |aggregator: &mut dyn BarAggregator| {
3632 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3633 }
3634 );
3635
3636 let value_type = BarType::new(
3637 instrument_id,
3638 BarSpecification::new(100, BarAggregation::Value, PriceType::Last),
3639 AggregationSource::Internal,
3640 );
3641 assert_historical_sink_receives!(
3642 "ValueBarAggregator",
3643 |bars| ValueBarAggregator::new(
3644 value_type,
3645 price_precision,
3646 size_precision,
3647 make_sink(bars)
3648 ),
3649 |aggregator: &mut dyn BarAggregator| {
3650 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3651 }
3652 );
3653
3654 let value_imbalance_type = BarType::new(
3655 instrument_id,
3656 BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last),
3657 AggregationSource::Internal,
3658 );
3659 assert_historical_sink_receives!(
3660 "ValueImbalanceBarAggregator",
3661 |bars| ValueImbalanceBarAggregator::new(
3662 value_imbalance_type,
3663 price_precision,
3664 size_precision,
3665 make_sink(bars),
3666 ),
3667 |aggregator: &mut dyn BarAggregator| {
3668 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3669 }
3670 );
3671
3672 let value_runs_type = BarType::new(
3673 instrument_id,
3674 BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last),
3675 AggregationSource::Internal,
3676 );
3677 assert_historical_sink_receives!(
3678 "ValueRunsBarAggregator",
3679 |bars| ValueRunsBarAggregator::new(
3680 value_runs_type,
3681 price_precision,
3682 size_precision,
3683 make_sink(bars),
3684 ),
3685 |aggregator: &mut dyn BarAggregator| {
3686 aggregator.handle_trade(make_trade("100.00", 1, 1_000));
3687 }
3688 );
3689
3690 let fx = InstrumentAny::CurrencyPair(audusd_sim);
3691 let renko_type = BarType::new(
3692 fx.id(),
3693 BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid),
3694 AggregationSource::Internal,
3695 );
3696 let fx_price_precision = fx.price_precision();
3697 let fx_size_precision = fx.size_precision();
3698 let fx_price_increment = fx.price_increment();
3699 assert_historical_sink_receives!(
3700 "RenkoBarAggregator",
3701 |bars| RenkoBarAggregator::new(
3702 renko_type,
3703 fx_price_precision,
3704 fx_size_precision,
3705 fx_price_increment,
3706 make_sink(bars),
3707 ),
3708 |aggregator: &mut dyn BarAggregator| {
3709 aggregator.update(
3710 Price::from("1.00000"),
3711 Quantity::from(1),
3712 UnixNanos::from(1_000),
3713 );
3714 aggregator.update(
3715 Price::from("1.00010"),
3716 Quantity::from(1),
3717 UnixNanos::from(2_000),
3718 );
3719 }
3720 );
3721 }
3722
3723 #[rstest]
3724 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
3725 let instrument = InstrumentAny::Equity(equity_aapl);
3726 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3727 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3728 let handler = Arc::new(Mutex::new(Vec::new()));
3729 let handler_clone = Arc::clone(&handler);
3730
3731 let mut aggregator = TickImbalanceBarAggregator::new(
3732 bar_type,
3733 instrument.price_precision(),
3734 instrument.size_precision(),
3735 move |bar: Bar| {
3736 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3737 handler_guard.push(bar);
3738 },
3739 );
3740
3741 let trade = TradeTick::default();
3742 aggregator.handle_trade(trade);
3743 aggregator.handle_trade(trade);
3744
3745 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3746 assert_eq!(handler_guard.len(), 1);
3747 let bar = handler_guard.first().unwrap();
3748 assert_eq!(bar.volume, Quantity::from(200000));
3749 }
3750
3751 #[rstest]
3752 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
3753 let instrument = InstrumentAny::Equity(equity_aapl);
3754 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
3755 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3756 let handler = Arc::new(Mutex::new(Vec::new()));
3757 let handler_clone = Arc::clone(&handler);
3758
3759 let mut aggregator = TickImbalanceBarAggregator::new(
3760 bar_type,
3761 instrument.price_precision(),
3762 instrument.size_precision(),
3763 move |bar: Bar| {
3764 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3765 handler_guard.push(bar);
3766 },
3767 );
3768
3769 let sell = TradeTick {
3770 aggressor_side: AggressorSide::Seller,
3771 ..TradeTick::default()
3772 };
3773
3774 aggregator.handle_trade(sell);
3775
3776 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3777 assert_eq!(handler_guard.len(), 1);
3778 }
3779
3780 #[rstest]
3781 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
3782 let instrument = InstrumentAny::Equity(equity_aapl);
3783 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3784 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3785 let handler = Arc::new(Mutex::new(Vec::new()));
3786 let handler_clone = Arc::clone(&handler);
3787
3788 let mut aggregator = TickRunsBarAggregator::new(
3789 bar_type,
3790 instrument.price_precision(),
3791 instrument.size_precision(),
3792 move |bar: Bar| {
3793 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3794 handler_guard.push(bar);
3795 },
3796 );
3797
3798 let buy = TradeTick::default();
3799 let sell = TradeTick {
3800 aggressor_side: AggressorSide::Seller,
3801 ..buy
3802 };
3803
3804 aggregator.handle_trade(buy);
3805 aggregator.handle_trade(buy);
3806 aggregator.handle_trade(sell);
3807 aggregator.handle_trade(sell);
3808
3809 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3810 assert_eq!(handler_guard.len(), 2);
3811 }
3812
3813 #[rstest]
3814 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
3815 let instrument = InstrumentAny::Equity(equity_aapl);
3816 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3817 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3818 let handler = Arc::new(Mutex::new(Vec::new()));
3819 let handler_clone = Arc::clone(&handler);
3820
3821 let mut aggregator = TickRunsBarAggregator::new(
3822 bar_type,
3823 instrument.price_precision(),
3824 instrument.size_precision(),
3825 move |bar: Bar| {
3826 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3827 handler_guard.push(bar);
3828 },
3829 );
3830
3831 let buy = TradeTick {
3832 size: Quantity::from(1),
3833 ..TradeTick::default()
3834 };
3835 let sell = TradeTick {
3836 aggressor_side: AggressorSide::Seller,
3837 size: Quantity::from(1),
3838 ..buy
3839 };
3840
3841 aggregator.handle_trade(buy);
3842 aggregator.handle_trade(buy);
3843 aggregator.handle_trade(sell);
3844 aggregator.handle_trade(sell);
3845
3846 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3847 assert_eq!(handler_guard.len(), 2);
3848 assert_eq!(handler_guard[0].volume, Quantity::from(2));
3849 assert_eq!(handler_guard[1].volume, Quantity::from(2));
3850 }
3851
3852 #[rstest]
3853 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
3854 let instrument = InstrumentAny::Equity(equity_aapl);
3855 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3856 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3857 let handler = Arc::new(Mutex::new(Vec::new()));
3858 let handler_clone = Arc::clone(&handler);
3859
3860 let mut aggregator = VolumeBarAggregator::new(
3861 bar_type,
3862 instrument.price_precision(),
3863 instrument.size_precision(),
3864 move |bar: Bar| {
3865 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3866 handler_guard.push(bar);
3867 },
3868 );
3869
3870 aggregator.update(
3871 Price::from("1.00001"),
3872 Quantity::from(25),
3873 UnixNanos::default(),
3874 );
3875
3876 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3877 assert_eq!(handler_guard.len(), 2);
3878 let bar1 = &handler_guard[0];
3879 assert_eq!(bar1.volume, Quantity::from(10));
3880 let bar2 = &handler_guard[1];
3881 assert_eq!(bar2.volume, Quantity::from(10));
3882 }
3883
3884 #[rstest]
3885 fn test_volume_bar_aggregator_zero_size_update_is_noop(equity_aapl: Equity) {
3886 let instrument = InstrumentAny::Equity(equity_aapl);
3887 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3888 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3889 let handler = Arc::new(Mutex::new(Vec::new()));
3890 let handler_clone = Arc::clone(&handler);
3891
3892 let mut aggregator = VolumeBarAggregator::new(
3893 bar_type,
3894 instrument.price_precision(),
3895 instrument.size_precision(),
3896 move |bar: Bar| {
3897 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3898 handler_guard.push(bar);
3899 },
3900 );
3901
3902 aggregator.update(
3903 Price::from("100.00"),
3904 Quantity::from(0),
3905 UnixNanos::default(),
3906 );
3907
3908 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3909 assert_eq!(handler_guard.len(), 0);
3910 }
3911
3912 #[rstest]
3913 fn test_volume_bar_aggregator_exact_threshold_emits_single_bar(equity_aapl: Equity) {
3914 let instrument = InstrumentAny::Equity(equity_aapl);
3915 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3916 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3917 let handler = Arc::new(Mutex::new(Vec::new()));
3918 let handler_clone = Arc::clone(&handler);
3919
3920 let mut aggregator = VolumeBarAggregator::new(
3921 bar_type,
3922 instrument.price_precision(),
3923 instrument.size_precision(),
3924 move |bar: Bar| {
3925 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3926 handler_guard.push(bar);
3927 },
3928 );
3929
3930 aggregator.update(
3931 Price::from("100.00"),
3932 Quantity::from(7),
3933 UnixNanos::from(1_000),
3934 );
3935 aggregator.update(
3936 Price::from("101.00"),
3937 Quantity::from(3),
3938 UnixNanos::from(2_000),
3939 );
3940
3941 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3942 assert_eq!(handler_guard.len(), 1);
3943 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3944 assert_eq!(handler_guard[0].close, Price::from("101.00"));
3945 }
3946
3947 #[rstest]
3948 fn test_volume_bar_aggregator_step_of_one_emits_per_unit(equity_aapl: Equity) {
3949 let instrument = InstrumentAny::Equity(equity_aapl);
3950 let bar_spec = BarSpecification::new(1, BarAggregation::Volume, PriceType::Last);
3951 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3952 let handler = Arc::new(Mutex::new(Vec::new()));
3953 let handler_clone = Arc::clone(&handler);
3954
3955 let mut aggregator = VolumeBarAggregator::new(
3956 bar_type,
3957 instrument.price_precision(),
3958 instrument.size_precision(),
3959 move |bar: Bar| {
3960 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3961 handler_guard.push(bar);
3962 },
3963 );
3964
3965 aggregator.update(
3966 Price::from("100.00"),
3967 Quantity::from(1),
3968 UnixNanos::default(),
3969 );
3970
3971 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3972 assert_eq!(handler_guard.len(), 1);
3973 assert_eq!(handler_guard[0].volume, Quantity::from(1));
3974 }
3975
3976 #[rstest]
3977 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
3978 let instrument = InstrumentAny::Equity(equity_aapl);
3979 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
3980 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3981 let handler = Arc::new(Mutex::new(Vec::new()));
3982 let handler_clone = Arc::clone(&handler);
3983
3984 let mut aggregator = VolumeRunsBarAggregator::new(
3985 bar_type,
3986 instrument.price_precision(),
3987 instrument.size_precision(),
3988 move |bar: Bar| {
3989 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3990 handler_guard.push(bar);
3991 },
3992 );
3993
3994 let buy = TradeTick {
3995 instrument_id: instrument.id(),
3996 price: Price::from("1.0"),
3997 size: Quantity::from(1),
3998 ..TradeTick::default()
3999 };
4000 let sell = TradeTick {
4001 aggressor_side: AggressorSide::Seller,
4002 ..buy
4003 };
4004
4005 aggregator.handle_trade(buy);
4006 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
4008 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4011 assert!(handler_guard.len() >= 2);
4012 assert!(
4013 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
4014 < f64::EPSILON
4015 );
4016 }
4017
4018 #[rstest]
4019 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
4020 let instrument = InstrumentAny::Equity(equity_aapl);
4021 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
4022 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4023 let handler = Arc::new(Mutex::new(Vec::new()));
4024 let handler_clone = Arc::clone(&handler);
4025
4026 let mut aggregator = VolumeRunsBarAggregator::new(
4027 bar_type,
4028 instrument.price_precision(),
4029 instrument.size_precision(),
4030 move |bar: Bar| {
4031 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4032 handler_guard.push(bar);
4033 },
4034 );
4035
4036 let trade = TradeTick {
4037 instrument_id: instrument.id(),
4038 price: Price::from("1.0"),
4039 size: Quantity::from(5),
4040 ..TradeTick::default()
4041 };
4042
4043 aggregator.handle_trade(trade);
4044
4045 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4046 assert!(!handler_guard.is_empty());
4047 assert!(handler_guard[0].volume.as_f64() > 0.0);
4048 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
4049 }
4050
4051 #[rstest]
4052 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
4053 let instrument = InstrumentAny::Equity(equity_aapl);
4054 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
4055 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4056 let handler = Arc::new(Mutex::new(Vec::new()));
4057 let handler_clone = Arc::clone(&handler);
4058
4059 let mut aggregator = VolumeImbalanceBarAggregator::new(
4060 bar_type,
4061 instrument.price_precision(),
4062 instrument.size_precision(),
4063 move |bar: Bar| {
4064 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4065 handler_guard.push(bar);
4066 },
4067 );
4068
4069 let trade_small = TradeTick {
4070 instrument_id: instrument.id(),
4071 price: Price::from("1.0"),
4072 size: Quantity::from(1),
4073 ..TradeTick::default()
4074 };
4075 let trade_large = TradeTick {
4076 size: Quantity::from(3),
4077 ..trade_small
4078 };
4079
4080 aggregator.handle_trade(trade_small);
4081 aggregator.handle_trade(trade_large);
4082
4083 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4084 assert_eq!(handler_guard.len(), 2);
4085 let total_output = handler_guard
4086 .iter()
4087 .map(|bar| bar.volume.as_f64())
4088 .sum::<f64>();
4089 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
4090 assert!((total_output - total_input).abs() < f64::EPSILON);
4091 }
4092
4093 #[rstest]
4094 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
4095 let instrument = InstrumentAny::Equity(equity_aapl);
4096 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4098 let handler = Arc::new(Mutex::new(Vec::new()));
4099 let handler_clone = Arc::clone(&handler);
4100
4101 let mut aggregator = ValueBarAggregator::new(
4102 bar_type,
4103 instrument.price_precision(),
4104 instrument.size_precision(),
4105 move |bar: Bar| {
4106 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4107 handler_guard.push(bar);
4108 },
4109 );
4110
4111 aggregator.update(
4113 Price::from("100.00"),
4114 Quantity::from(5),
4115 UnixNanos::default(),
4116 );
4117 aggregator.update(
4118 Price::from("100.00"),
4119 Quantity::from(5),
4120 UnixNanos::from(1000),
4121 );
4122
4123 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4124 assert_eq!(handler_guard.len(), 1);
4125 let bar = handler_guard.first().unwrap();
4126 assert_eq!(bar.volume, Quantity::from(10));
4127 }
4128
4129 #[rstest]
4130 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
4131 let instrument = InstrumentAny::Equity(equity_aapl);
4132 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
4133 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4134 let handler = Arc::new(Mutex::new(Vec::new()));
4135 let handler_clone = Arc::clone(&handler);
4136
4137 let mut aggregator = ValueBarAggregator::new(
4138 bar_type,
4139 instrument.price_precision(),
4140 instrument.size_precision(),
4141 move |bar: Bar| {
4142 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4143 handler_guard.push(bar);
4144 },
4145 );
4146
4147 aggregator.update(
4149 Price::from("100.00"),
4150 Quantity::from(25),
4151 UnixNanos::default(),
4152 );
4153
4154 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4155 assert_eq!(handler_guard.len(), 2);
4156 let remaining_value = aggregator.get_cumulative_value();
4157 assert!(remaining_value < 1000.0); }
4159
4160 #[rstest]
4161 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
4162 let instrument = InstrumentAny::Equity(equity_aapl);
4163 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
4164 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4165 let handler = Arc::new(Mutex::new(Vec::new()));
4166 let handler_clone = Arc::clone(&handler);
4167
4168 let mut aggregator = ValueBarAggregator::new(
4169 bar_type,
4170 instrument.price_precision(),
4171 instrument.size_precision(),
4172 move |bar: Bar| {
4173 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4174 handler_guard.push(bar);
4175 },
4176 );
4177
4178 aggregator.update(
4180 Price::from("0.00"),
4181 Quantity::from(100),
4182 UnixNanos::default(),
4183 );
4184
4185 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4187 assert_eq!(handler_guard.len(), 0);
4188
4189 assert_eq!(aggregator.get_cumulative_value(), 0.0);
4191 }
4192
4193 #[rstest]
4194 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
4195 let instrument = InstrumentAny::Equity(equity_aapl);
4196 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
4197 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4198 let handler = Arc::new(Mutex::new(Vec::new()));
4199 let handler_clone = Arc::clone(&handler);
4200
4201 let mut aggregator = ValueBarAggregator::new(
4202 bar_type,
4203 instrument.price_precision(),
4204 instrument.size_precision(),
4205 move |bar: Bar| {
4206 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4207 handler_guard.push(bar);
4208 },
4209 );
4210
4211 aggregator.update(
4213 Price::from("100.00"),
4214 Quantity::from(0),
4215 UnixNanos::default(),
4216 );
4217
4218 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4220 assert_eq!(handler_guard.len(), 0);
4221
4222 assert_eq!(aggregator.get_cumulative_value(), 0.0);
4224 }
4225
4226 #[rstest]
4227 fn test_value_bar_aggregator_exact_threshold_emits_one_bar(equity_aapl: Equity) {
4228 let instrument = InstrumentAny::Equity(equity_aapl);
4229 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
4230 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4231 let handler = Arc::new(Mutex::new(Vec::new()));
4232 let handler_clone = Arc::clone(&handler);
4233
4234 let mut aggregator = ValueBarAggregator::new(
4235 bar_type,
4236 instrument.price_precision(),
4237 instrument.size_precision(),
4238 move |bar: Bar| {
4239 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4240 handler_guard.push(bar);
4241 },
4242 );
4243
4244 aggregator.update(
4245 Price::from("100.00"),
4246 Quantity::from(5),
4247 UnixNanos::from(1_000),
4248 );
4249 aggregator.update(
4250 Price::from("100.00"),
4251 Quantity::from(5),
4252 UnixNanos::from(2_000),
4253 );
4254
4255 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4256 assert_eq!(handler_guard.len(), 1);
4257 assert_eq!(handler_guard[0].volume, Quantity::from(10));
4258 assert_eq!(aggregator.get_cumulative_value(), 0.0);
4259 }
4260
4261 #[rstest]
4262 fn test_value_bar_aggregator_precision_boundary_min_size_clamp(equity_aapl: Equity) {
4263 let instrument = InstrumentAny::Equity(equity_aapl);
4267 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
4268 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4269 let handler = Arc::new(Mutex::new(Vec::new()));
4270 let handler_clone = Arc::clone(&handler);
4271
4272 let mut aggregator = ValueBarAggregator::new(
4273 bar_type,
4274 instrument.price_precision(),
4275 instrument.size_precision(),
4276 move |bar: Bar| {
4277 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4278 handler_guard.push(bar);
4279 },
4280 );
4281
4282 aggregator.update(
4284 Price::from("100.00"),
4285 Quantity::from(4),
4286 UnixNanos::default(),
4287 );
4288
4289 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4290 assert_eq!(handler_guard.len(), 4);
4291 for bar in handler_guard.iter() {
4292 assert_eq!(bar.volume, Quantity::from(1));
4293 }
4294 }
4295
4296 #[rstest]
4297 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
4298 let instrument = InstrumentAny::Equity(equity_aapl);
4299 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
4300 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4301 let handler = Arc::new(Mutex::new(Vec::new()));
4302 let handler_clone = Arc::clone(&handler);
4303
4304 let mut aggregator = ValueImbalanceBarAggregator::new(
4305 bar_type,
4306 instrument.price_precision(),
4307 instrument.size_precision(),
4308 move |bar: Bar| {
4309 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4310 handler_guard.push(bar);
4311 },
4312 );
4313
4314 let buy = TradeTick {
4315 price: Price::from("5.0"),
4316 size: Quantity::from(2), instrument_id: instrument.id(),
4318 ..TradeTick::default()
4319 };
4320 let sell = TradeTick {
4321 price: Price::from("5.0"),
4322 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
4324 instrument_id: instrument.id(),
4325 ..buy
4326 };
4327
4328 aggregator.handle_trade(buy);
4329 aggregator.handle_trade(sell);
4330
4331 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4332 assert_eq!(handler_guard.len(), 2);
4333 }
4334
4335 #[rstest]
4336 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
4337 let instrument = InstrumentAny::Equity(equity_aapl);
4338 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4339 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4340 let handler = Arc::new(Mutex::new(Vec::new()));
4341 let handler_clone = Arc::clone(&handler);
4342
4343 let mut aggregator = ValueRunsBarAggregator::new(
4344 bar_type,
4345 instrument.price_precision(),
4346 instrument.size_precision(),
4347 move |bar: Bar| {
4348 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4349 handler_guard.push(bar);
4350 },
4351 );
4352
4353 let trade = TradeTick {
4354 price: Price::from("10.0"),
4355 size: Quantity::from(5),
4356 instrument_id: instrument.id(),
4357 ..TradeTick::default()
4358 };
4359
4360 aggregator.handle_trade(trade);
4361 aggregator.handle_trade(trade);
4362
4363 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4364 assert_eq!(handler_guard.len(), 1);
4365 let bar = handler_guard.first().unwrap();
4366 assert_eq!(bar.volume, Quantity::from(10));
4367 }
4368
4369 #[rstest]
4370 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
4371 let instrument = InstrumentAny::Equity(equity_aapl);
4372 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4373 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4374 let handler = Arc::new(Mutex::new(Vec::new()));
4375 let handler_clone = Arc::clone(&handler);
4376
4377 let mut aggregator = ValueRunsBarAggregator::new(
4378 bar_type,
4379 instrument.price_precision(),
4380 instrument.size_precision(),
4381 move |bar: Bar| {
4382 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4383 handler_guard.push(bar);
4384 },
4385 );
4386
4387 let buy = TradeTick {
4388 price: Price::from("10.0"),
4389 size: Quantity::from(5),
4390 instrument_id: instrument.id(),
4391 ..TradeTick::default()
4392 }; let sell = TradeTick {
4394 price: Price::from("10.0"),
4395 size: Quantity::from(10),
4396 aggressor_side: AggressorSide::Seller,
4397 ..buy
4398 }; aggregator.handle_trade(buy);
4401 aggregator.handle_trade(sell);
4402
4403 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4404 assert_eq!(handler_guard.len(), 1);
4405 assert_eq!(handler_guard[0].volume, Quantity::from(10));
4406 }
4407
4408 #[rstest]
4409 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4410 let instrument = InstrumentAny::Equity(equity_aapl);
4411 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4412 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4413 let handler = Arc::new(Mutex::new(Vec::new()));
4414 let handler_clone = Arc::clone(&handler);
4415
4416 let mut aggregator = TickRunsBarAggregator::new(
4417 bar_type,
4418 instrument.price_precision(),
4419 instrument.size_precision(),
4420 move |bar: Bar| {
4421 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4422 handler_guard.push(bar);
4423 },
4424 );
4425
4426 let buy = TradeTick::default();
4427
4428 aggregator.handle_trade(buy);
4429 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4434 assert_eq!(handler_guard.len(), 2);
4435 }
4436
4437 #[rstest]
4438 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
4439 let instrument = InstrumentAny::Equity(equity_aapl);
4440 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4441 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4442 let handler = Arc::new(Mutex::new(Vec::new()));
4443 let handler_clone = Arc::clone(&handler);
4444
4445 let mut aggregator = TickRunsBarAggregator::new(
4446 bar_type,
4447 instrument.price_precision(),
4448 instrument.size_precision(),
4449 move |bar: Bar| {
4450 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4451 handler_guard.push(bar);
4452 },
4453 );
4454
4455 let buy = TradeTick::default();
4456 let no_aggressor = TradeTick {
4457 aggressor_side: AggressorSide::NoAggressor,
4458 ..buy
4459 };
4460
4461 aggregator.handle_trade(buy);
4462 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4467 assert_eq!(handler_guard.len(), 1);
4468 }
4469
4470 #[rstest]
4471 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4472 let instrument = InstrumentAny::Equity(equity_aapl);
4473 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
4474 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4475 let handler = Arc::new(Mutex::new(Vec::new()));
4476 let handler_clone = Arc::clone(&handler);
4477
4478 let mut aggregator = VolumeRunsBarAggregator::new(
4479 bar_type,
4480 instrument.price_precision(),
4481 instrument.size_precision(),
4482 move |bar: Bar| {
4483 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4484 handler_guard.push(bar);
4485 },
4486 );
4487
4488 let buy = TradeTick {
4489 instrument_id: instrument.id(),
4490 price: Price::from("1.0"),
4491 size: Quantity::from(1),
4492 ..TradeTick::default()
4493 };
4494
4495 aggregator.handle_trade(buy);
4496 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4501 assert_eq!(handler_guard.len(), 2);
4502 assert_eq!(handler_guard[0].volume, Quantity::from(2));
4503 assert_eq!(handler_guard[1].volume, Quantity::from(2));
4504 }
4505
4506 #[rstest]
4507 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4508 let instrument = InstrumentAny::Equity(equity_aapl);
4509 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4510 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4511 let handler = Arc::new(Mutex::new(Vec::new()));
4512 let handler_clone = Arc::clone(&handler);
4513
4514 let mut aggregator = ValueRunsBarAggregator::new(
4515 bar_type,
4516 instrument.price_precision(),
4517 instrument.size_precision(),
4518 move |bar: Bar| {
4519 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4520 handler_guard.push(bar);
4521 },
4522 );
4523
4524 let buy = TradeTick {
4525 instrument_id: instrument.id(),
4526 price: Price::from("10.0"),
4527 size: Quantity::from(5),
4528 ..TradeTick::default()
4529 }; aggregator.handle_trade(buy);
4532 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4537 assert_eq!(handler_guard.len(), 2);
4538 assert_eq!(handler_guard[0].volume, Quantity::from(10));
4539 assert_eq!(handler_guard[1].volume, Quantity::from(10));
4540 }
4541
4542 #[rstest]
4543 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
4544 let instrument = InstrumentAny::Equity(equity_aapl);
4545 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4547 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4548 let handler = Arc::new(Mutex::new(Vec::new()));
4549 let handler_clone = Arc::clone(&handler);
4550 let clock = Rc::new(RefCell::new(TestClock::new()));
4551
4552 let mut aggregator = TimeBarAggregator::new(
4553 bar_type,
4554 instrument.price_precision(),
4555 instrument.size_precision(),
4556 clock.clone(),
4557 move |bar: Bar| {
4558 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4559 handler_guard.push(bar);
4560 },
4561 true, false, BarIntervalType::LeftOpen,
4564 None, 15, false, );
4568
4569 aggregator.update(
4570 Price::from("100.00"),
4571 Quantity::from(1),
4572 UnixNanos::default(),
4573 );
4574
4575 let next_sec = UnixNanos::from(1_000_000_000);
4576 clock.borrow_mut().set_time(next_sec);
4577
4578 let event = TimeEvent::new(
4579 Ustr::from("1-SECOND-LAST"),
4580 UUID4::new(),
4581 next_sec,
4582 next_sec,
4583 );
4584 aggregator.build_bar(&event);
4585
4586 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4587 assert_eq!(handler_guard.len(), 1);
4588 let bar = handler_guard.first().unwrap();
4589 assert_eq!(bar.ts_event, UnixNanos::default());
4590 assert_eq!(bar.ts_init, next_sec);
4591 }
4592
4593 #[rstest]
4594 fn test_time_bar_aggregator_stop_clears_timer_and_allows_restart(equity_aapl: Equity) {
4595 let instrument = InstrumentAny::Equity(equity_aapl);
4596 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4597 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4598 let timer_name = format!("TIME_BAR_{bar_type}");
4599 let clock = Rc::new(RefCell::new(TestClock::new()));
4600
4601 let aggregator = TimeBarAggregator::new(
4602 bar_type,
4603 instrument.price_precision(),
4604 instrument.size_precision(),
4605 clock.clone(),
4606 |_bar: Bar| {},
4607 true,
4608 false,
4609 BarIntervalType::LeftOpen,
4610 None,
4611 15,
4612 false,
4613 );
4614
4615 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
4616 let rc = Rc::new(RefCell::new(boxed));
4617
4618 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
4619 assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
4620
4621 rc.borrow_mut().stop();
4622 assert!(clock.borrow().timer_names().is_empty());
4623
4624 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
4625 assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
4626 }
4627
4628 #[rstest]
4629 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
4630 let instrument = InstrumentAny::Equity(equity_aapl);
4631 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4632 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4633 let handler = Arc::new(Mutex::new(Vec::new()));
4634 let handler_clone = Arc::clone(&handler);
4635 let clock = Rc::new(RefCell::new(TestClock::new()));
4636
4637 let mut aggregator = TimeBarAggregator::new(
4638 bar_type,
4639 instrument.price_precision(),
4640 instrument.size_precision(),
4641 clock.clone(),
4642 move |bar: Bar| {
4643 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4644 handler_guard.push(bar);
4645 },
4646 true, true, BarIntervalType::LeftOpen,
4649 None,
4650 15,
4651 false, );
4653
4654 aggregator.update(
4656 Price::from("100.00"),
4657 Quantity::from(1),
4658 UnixNanos::default(),
4659 );
4660
4661 let ts1 = UnixNanos::from(1_000_000_000);
4663 clock.borrow_mut().set_time(ts1);
4664 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4665 aggregator.build_bar(&event);
4666
4667 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4669
4670 let ts2 = UnixNanos::from(2_000_000_000);
4672 clock.borrow_mut().set_time(ts2);
4673 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4674 aggregator.build_bar(&event);
4675
4676 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4677 assert_eq!(handler_guard.len(), 2);
4678
4679 let bar1 = &handler_guard[0];
4680 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
4682 assert_eq!(bar1.close, Price::from("100.00"));
4683 let bar2 = &handler_guard[1];
4684 assert_eq!(bar2.ts_event, ts2);
4685 assert_eq!(bar2.ts_init, ts2);
4686 assert_eq!(bar2.close, Price::from("101.00"));
4687 }
4688
4689 #[rstest]
4690 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
4691 let instrument = InstrumentAny::Equity(equity_aapl);
4692 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4693 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4694 let handler = Arc::new(Mutex::new(Vec::new()));
4695 let handler_clone = Arc::clone(&handler);
4696 let clock = Rc::new(RefCell::new(TestClock::new()));
4697 let mut aggregator = TimeBarAggregator::new(
4698 bar_type,
4699 instrument.price_precision(),
4700 instrument.size_precision(),
4701 clock.clone(),
4702 move |bar: Bar| {
4703 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4704 handler_guard.push(bar);
4705 },
4706 true, true, BarIntervalType::RightOpen,
4709 None,
4710 15,
4711 false, );
4713
4714 aggregator.update(
4716 Price::from("100.00"),
4717 Quantity::from(1),
4718 UnixNanos::default(),
4719 );
4720
4721 let ts1 = UnixNanos::from(1_000_000_000);
4723 clock.borrow_mut().set_time(ts1);
4724 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4725 aggregator.build_bar(&event);
4726
4727 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4729
4730 let ts2 = UnixNanos::from(2_000_000_000);
4732 clock.borrow_mut().set_time(ts2);
4733 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4734 aggregator.build_bar(&event);
4735
4736 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4737 assert_eq!(handler_guard.len(), 2);
4738
4739 let bar1 = &handler_guard[0];
4740 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
4742 assert_eq!(bar1.close, Price::from("100.00"));
4743
4744 let bar2 = &handler_guard[1];
4745 assert_eq!(bar2.ts_event, ts1);
4746 assert_eq!(bar2.ts_init, ts2);
4747 assert_eq!(bar2.close, Price::from("101.00"));
4748 }
4749
4750 #[rstest]
4751 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
4752 let instrument = InstrumentAny::Equity(equity_aapl);
4753 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4754 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4755 let handler = Arc::new(Mutex::new(Vec::new()));
4756 let handler_clone = Arc::clone(&handler);
4757 let clock = Rc::new(RefCell::new(TestClock::new()));
4758
4759 let mut aggregator = TimeBarAggregator::new(
4761 bar_type,
4762 instrument.price_precision(),
4763 instrument.size_precision(),
4764 clock.clone(),
4765 move |bar: Bar| {
4766 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4767 handler_guard.push(bar);
4768 },
4769 false, true, BarIntervalType::LeftOpen,
4772 None,
4773 15,
4774 false, );
4776
4777 let ts1 = UnixNanos::from(1_000_000_000);
4779 clock.borrow_mut().set_time(ts1);
4780 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4781 aggregator.build_bar(&event);
4782
4783 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4784 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
4786
4787 let handler = Arc::new(Mutex::new(Vec::new()));
4789 let handler_clone = Arc::clone(&handler);
4790 let mut aggregator = TimeBarAggregator::new(
4791 bar_type,
4792 instrument.price_precision(),
4793 instrument.size_precision(),
4794 clock.clone(),
4795 move |bar: Bar| {
4796 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4797 handler_guard.push(bar);
4798 },
4799 true, true, BarIntervalType::LeftOpen,
4802 None,
4803 15,
4804 false, );
4806
4807 aggregator.update(
4808 Price::from("100.00"),
4809 Quantity::from(1),
4810 UnixNanos::default(),
4811 );
4812
4813 let ts1 = UnixNanos::from(1_000_000_000);
4815 clock.borrow_mut().set_time(ts1);
4816 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4817 aggregator.build_bar(&event);
4818
4819 let ts2 = UnixNanos::from(2_000_000_000);
4821 clock.borrow_mut().set_time(ts2);
4822 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4823 aggregator.build_bar(&event);
4824
4825 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4826 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
4828 assert_eq!(bar1.close, Price::from("100.00"));
4829 let bar2 = &handler_guard[1];
4830 assert_eq!(bar2.close, Price::from("100.00")); }
4832
4833 #[rstest]
4834 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
4835 let instrument = InstrumentAny::Equity(equity_aapl);
4836 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4837 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4838 let clock = Rc::new(RefCell::new(TestClock::new()));
4839 let handler = Arc::new(Mutex::new(Vec::new()));
4840 let handler_clone = Arc::clone(&handler);
4841
4842 let mut aggregator = TimeBarAggregator::new(
4843 bar_type,
4844 instrument.price_precision(),
4845 instrument.size_precision(),
4846 clock.clone(),
4847 move |bar: Bar| {
4848 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4849 handler_guard.push(bar);
4850 },
4851 true, true, BarIntervalType::RightOpen,
4854 None,
4855 15,
4856 false, );
4858
4859 let ts1 = UnixNanos::from(1_000_000_000);
4860 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
4861
4862 let ts2 = UnixNanos::from(2_000_000_000);
4863 clock.borrow_mut().set_time(ts2);
4864
4865 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4867 aggregator.build_bar(&event);
4868
4869 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4870 let bar = handler_guard.first().unwrap();
4871 assert_eq!(bar.ts_event, UnixNanos::default());
4872 assert_eq!(bar.ts_init, ts2);
4873 }
4874
4875 #[rstest]
4876 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
4877 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4878 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4880 let handler = Arc::new(Mutex::new(Vec::new()));
4881 let handler_clone = Arc::clone(&handler);
4882
4883 let aggregator = RenkoBarAggregator::new(
4884 bar_type,
4885 instrument.price_precision(),
4886 instrument.size_precision(),
4887 instrument.price_increment(),
4888 move |bar: Bar| {
4889 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4890 handler_guard.push(bar);
4891 },
4892 );
4893
4894 assert_eq!(aggregator.bar_type(), bar_type);
4895 assert!(!aggregator.is_running());
4896 let expected_brick_size = 10 * instrument.price_increment().raw;
4898 assert_eq!(aggregator.brick_size, expected_brick_size);
4899 }
4900
4901 #[rstest]
4902 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
4903 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4904 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4906 let handler = Arc::new(Mutex::new(Vec::new()));
4907 let handler_clone = Arc::clone(&handler);
4908
4909 let mut aggregator = RenkoBarAggregator::new(
4910 bar_type,
4911 instrument.price_precision(),
4912 instrument.size_precision(),
4913 instrument.price_increment(),
4914 move |bar: Bar| {
4915 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4916 handler_guard.push(bar);
4917 },
4918 );
4919
4920 aggregator.update(
4922 Price::from("1.00000"),
4923 Quantity::from(1),
4924 UnixNanos::default(),
4925 );
4926 aggregator.update(
4927 Price::from("1.00005"),
4928 Quantity::from(1),
4929 UnixNanos::from(1000),
4930 );
4931
4932 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4933 assert_eq!(handler_guard.len(), 0); }
4935
4936 #[rstest]
4937 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
4938 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4939 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4941 let handler = Arc::new(Mutex::new(Vec::new()));
4942 let handler_clone = Arc::clone(&handler);
4943
4944 let mut aggregator = RenkoBarAggregator::new(
4945 bar_type,
4946 instrument.price_precision(),
4947 instrument.size_precision(),
4948 instrument.price_increment(),
4949 move |bar: Bar| {
4950 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4951 handler_guard.push(bar);
4952 },
4953 );
4954
4955 aggregator.update(
4957 Price::from("1.00000"),
4958 Quantity::from(1),
4959 UnixNanos::default(),
4960 );
4961 aggregator.update(
4962 Price::from("1.00015"),
4963 Quantity::from(1),
4964 UnixNanos::from(1000),
4965 );
4966
4967 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4968 assert_eq!(handler_guard.len(), 1);
4969
4970 let bar = handler_guard.first().unwrap();
4971 assert_eq!(bar.open, Price::from("1.00000"));
4972 assert_eq!(bar.high, Price::from("1.00010"));
4973 assert_eq!(bar.low, Price::from("1.00000"));
4974 assert_eq!(bar.close, Price::from("1.00010"));
4975 assert_eq!(bar.volume, Quantity::from(2));
4976 assert_eq!(bar.ts_event, UnixNanos::from(1000));
4977 assert_eq!(bar.ts_init, UnixNanos::from(1000));
4978 }
4979
4980 #[rstest]
4981 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
4982 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4983 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4985 let handler = Arc::new(Mutex::new(Vec::new()));
4986 let handler_clone = Arc::clone(&handler);
4987
4988 let mut aggregator = RenkoBarAggregator::new(
4989 bar_type,
4990 instrument.price_precision(),
4991 instrument.size_precision(),
4992 instrument.price_increment(),
4993 move |bar: Bar| {
4994 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4995 handler_guard.push(bar);
4996 },
4997 );
4998
4999 aggregator.update(
5001 Price::from("1.00000"),
5002 Quantity::from(1),
5003 UnixNanos::default(),
5004 );
5005 aggregator.update(
5006 Price::from("1.00025"),
5007 Quantity::from(1),
5008 UnixNanos::from(1000),
5009 );
5010
5011 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5012 assert_eq!(handler_guard.len(), 2);
5013
5014 let bar1 = &handler_guard[0];
5015 assert_eq!(bar1.open, Price::from("1.00000"));
5016 assert_eq!(bar1.high, Price::from("1.00010"));
5017 assert_eq!(bar1.low, Price::from("1.00000"));
5018 assert_eq!(bar1.close, Price::from("1.00010"));
5019
5020 let bar2 = &handler_guard[1];
5021 assert_eq!(bar2.open, Price::from("1.00010"));
5022 assert_eq!(bar2.high, Price::from("1.00020"));
5023 assert_eq!(bar2.low, Price::from("1.00010"));
5024 assert_eq!(bar2.close, Price::from("1.00020"));
5025 }
5026
5027 #[rstest]
5028 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
5029 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5030 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5032 let handler = Arc::new(Mutex::new(Vec::new()));
5033 let handler_clone = Arc::clone(&handler);
5034
5035 let mut aggregator = RenkoBarAggregator::new(
5036 bar_type,
5037 instrument.price_precision(),
5038 instrument.size_precision(),
5039 instrument.price_increment(),
5040 move |bar: Bar| {
5041 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5042 handler_guard.push(bar);
5043 },
5044 );
5045
5046 aggregator.update(
5048 Price::from("1.00020"),
5049 Quantity::from(1),
5050 UnixNanos::default(),
5051 );
5052 aggregator.update(
5053 Price::from("1.00005"),
5054 Quantity::from(1),
5055 UnixNanos::from(1000),
5056 );
5057
5058 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5059 assert_eq!(handler_guard.len(), 1);
5060
5061 let bar = handler_guard.first().unwrap();
5062 assert_eq!(bar.open, Price::from("1.00020"));
5063 assert_eq!(bar.high, Price::from("1.00020"));
5064 assert_eq!(bar.low, Price::from("1.00010"));
5065 assert_eq!(bar.close, Price::from("1.00010"));
5066 assert_eq!(bar.volume, Quantity::from(2));
5067 }
5068
5069 #[rstest]
5070 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
5071 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5072 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5074 let handler = Arc::new(Mutex::new(Vec::new()));
5075 let handler_clone = Arc::clone(&handler);
5076
5077 let mut aggregator = RenkoBarAggregator::new(
5078 bar_type,
5079 instrument.price_precision(),
5080 instrument.size_precision(),
5081 instrument.price_increment(),
5082 move |bar: Bar| {
5083 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5084 handler_guard.push(bar);
5085 },
5086 );
5087
5088 let input_bar = Bar::new(
5090 BarType::new(
5091 instrument.id(),
5092 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
5093 AggregationSource::Internal,
5094 ),
5095 Price::from("1.00000"),
5096 Price::from("1.00005"),
5097 Price::from("0.99995"),
5098 Price::from("1.00005"), Quantity::from(100),
5100 UnixNanos::default(),
5101 UnixNanos::from(1000),
5102 );
5103
5104 aggregator.handle_bar(input_bar);
5105
5106 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5107 assert_eq!(handler_guard.len(), 0); }
5109
5110 #[rstest]
5111 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
5112 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5113 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5115 let handler = Arc::new(Mutex::new(Vec::new()));
5116 let handler_clone = Arc::clone(&handler);
5117
5118 let mut aggregator = RenkoBarAggregator::new(
5119 bar_type,
5120 instrument.price_precision(),
5121 instrument.size_precision(),
5122 instrument.price_increment(),
5123 move |bar: Bar| {
5124 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5125 handler_guard.push(bar);
5126 },
5127 );
5128
5129 let bar1 = Bar::new(
5131 BarType::new(
5132 instrument.id(),
5133 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
5134 AggregationSource::Internal,
5135 ),
5136 Price::from("1.00000"),
5137 Price::from("1.00005"),
5138 Price::from("0.99995"),
5139 Price::from("1.00000"),
5140 Quantity::from(100),
5141 UnixNanos::default(),
5142 UnixNanos::default(),
5143 );
5144
5145 let bar2 = Bar::new(
5147 BarType::new(
5148 instrument.id(),
5149 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
5150 AggregationSource::Internal,
5151 ),
5152 Price::from("1.00000"),
5153 Price::from("1.00015"),
5154 Price::from("0.99995"),
5155 Price::from("1.00010"), Quantity::from(50),
5157 UnixNanos::from(60_000_000_000),
5158 UnixNanos::from(60_000_000_000),
5159 );
5160
5161 aggregator.handle_bar(bar1);
5162 aggregator.handle_bar(bar2);
5163
5164 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5165 assert_eq!(handler_guard.len(), 1);
5166
5167 let bar = handler_guard.first().unwrap();
5168 assert_eq!(bar.open, Price::from("1.00000"));
5169 assert_eq!(bar.high, Price::from("1.00010"));
5170 assert_eq!(bar.low, Price::from("1.00000"));
5171 assert_eq!(bar.close, Price::from("1.00010"));
5172 assert_eq!(bar.volume, Quantity::from(150));
5173 }
5174
5175 #[rstest]
5176 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
5177 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5178 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5180 let handler = Arc::new(Mutex::new(Vec::new()));
5181 let handler_clone = Arc::clone(&handler);
5182
5183 let mut aggregator = RenkoBarAggregator::new(
5184 bar_type,
5185 instrument.price_precision(),
5186 instrument.size_precision(),
5187 instrument.price_increment(),
5188 move |bar: Bar| {
5189 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5190 handler_guard.push(bar);
5191 },
5192 );
5193
5194 let bar1 = Bar::new(
5196 BarType::new(
5197 instrument.id(),
5198 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
5199 AggregationSource::Internal,
5200 ),
5201 Price::from("1.00000"),
5202 Price::from("1.00005"),
5203 Price::from("0.99995"),
5204 Price::from("1.00000"),
5205 Quantity::from(100),
5206 UnixNanos::default(),
5207 UnixNanos::default(),
5208 );
5209
5210 let bar2 = Bar::new(
5212 BarType::new(
5213 instrument.id(),
5214 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
5215 AggregationSource::Internal,
5216 ),
5217 Price::from("1.00000"),
5218 Price::from("1.00035"),
5219 Price::from("0.99995"),
5220 Price::from("1.00030"), Quantity::from(50),
5222 UnixNanos::from(60_000_000_000),
5223 UnixNanos::from(60_000_000_000),
5224 );
5225
5226 aggregator.handle_bar(bar1);
5227 aggregator.handle_bar(bar2);
5228
5229 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5230 assert_eq!(handler_guard.len(), 3);
5231
5232 let bar1 = &handler_guard[0];
5233 assert_eq!(bar1.open, Price::from("1.00000"));
5234 assert_eq!(bar1.close, Price::from("1.00010"));
5235
5236 let bar2 = &handler_guard[1];
5237 assert_eq!(bar2.open, Price::from("1.00010"));
5238 assert_eq!(bar2.close, Price::from("1.00020"));
5239
5240 let bar3 = &handler_guard[2];
5241 assert_eq!(bar3.open, Price::from("1.00020"));
5242 assert_eq!(bar3.close, Price::from("1.00030"));
5243 }
5244
5245 #[rstest]
5246 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
5247 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5248 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5250 let handler = Arc::new(Mutex::new(Vec::new()));
5251 let handler_clone = Arc::clone(&handler);
5252
5253 let mut aggregator = RenkoBarAggregator::new(
5254 bar_type,
5255 instrument.price_precision(),
5256 instrument.size_precision(),
5257 instrument.price_increment(),
5258 move |bar: Bar| {
5259 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5260 handler_guard.push(bar);
5261 },
5262 );
5263
5264 let bar1 = Bar::new(
5266 BarType::new(
5267 instrument.id(),
5268 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
5269 AggregationSource::Internal,
5270 ),
5271 Price::from("1.00020"),
5272 Price::from("1.00025"),
5273 Price::from("1.00015"),
5274 Price::from("1.00020"),
5275 Quantity::from(100),
5276 UnixNanos::default(),
5277 UnixNanos::default(),
5278 );
5279
5280 let bar2 = Bar::new(
5282 BarType::new(
5283 instrument.id(),
5284 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
5285 AggregationSource::Internal,
5286 ),
5287 Price::from("1.00020"),
5288 Price::from("1.00025"),
5289 Price::from("1.00005"),
5290 Price::from("1.00010"), Quantity::from(50),
5292 UnixNanos::from(60_000_000_000),
5293 UnixNanos::from(60_000_000_000),
5294 );
5295
5296 aggregator.handle_bar(bar1);
5297 aggregator.handle_bar(bar2);
5298
5299 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5300 assert_eq!(handler_guard.len(), 1);
5301
5302 let bar = handler_guard.first().unwrap();
5303 assert_eq!(bar.open, Price::from("1.00020"));
5304 assert_eq!(bar.high, Price::from("1.00020"));
5305 assert_eq!(bar.low, Price::from("1.00010"));
5306 assert_eq!(bar.close, Price::from("1.00010"));
5307 assert_eq!(bar.volume, Quantity::from(150));
5308 }
5309
5310 #[rstest]
5311 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
5312 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5313
5314 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
5317 let handler = Arc::new(Mutex::new(Vec::new()));
5318 let handler_clone = Arc::clone(&handler);
5319
5320 let aggregator_5 = RenkoBarAggregator::new(
5321 bar_type_5,
5322 instrument.price_precision(),
5323 instrument.size_precision(),
5324 instrument.price_increment(),
5325 move |_bar: Bar| {
5326 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5327 handler_guard.push(_bar);
5328 },
5329 );
5330
5331 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
5333 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
5334
5335 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
5337 let handler2 = Arc::new(Mutex::new(Vec::new()));
5338 let handler2_clone = Arc::clone(&handler2);
5339
5340 let aggregator_20 = RenkoBarAggregator::new(
5341 bar_type_20,
5342 instrument.price_precision(),
5343 instrument.size_precision(),
5344 instrument.price_increment(),
5345 move |_bar: Bar| {
5346 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
5347 handler_guard.push(_bar);
5348 },
5349 );
5350
5351 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
5353 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
5354 }
5355
5356 #[rstest]
5357 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
5358 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5359 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5361 let handler = Arc::new(Mutex::new(Vec::new()));
5362 let handler_clone = Arc::clone(&handler);
5363
5364 let mut aggregator = RenkoBarAggregator::new(
5365 bar_type,
5366 instrument.price_precision(),
5367 instrument.size_precision(),
5368 instrument.price_increment(),
5369 move |bar: Bar| {
5370 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5371 handler_guard.push(bar);
5372 },
5373 );
5374
5375 aggregator.update(
5377 Price::from("1.00000"),
5378 Quantity::from(1),
5379 UnixNanos::from(1000),
5380 );
5381 aggregator.update(
5382 Price::from("1.00010"),
5383 Quantity::from(1),
5384 UnixNanos::from(2000),
5385 ); aggregator.update(
5387 Price::from("1.00020"),
5388 Quantity::from(1),
5389 UnixNanos::from(3000),
5390 ); aggregator.update(
5392 Price::from("1.00025"),
5393 Quantity::from(1),
5394 UnixNanos::from(4000),
5395 ); aggregator.update(
5397 Price::from("1.00030"),
5398 Quantity::from(1),
5399 UnixNanos::from(5000),
5400 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
5403 assert_eq!(handler_guard.len(), 3);
5404
5405 let bar1 = &handler_guard[0];
5406 assert_eq!(bar1.open, Price::from("1.00000"));
5407 assert_eq!(bar1.close, Price::from("1.00010"));
5408
5409 let bar2 = &handler_guard[1];
5410 assert_eq!(bar2.open, Price::from("1.00010"));
5411 assert_eq!(bar2.close, Price::from("1.00020"));
5412
5413 let bar3 = &handler_guard[2];
5414 assert_eq!(bar3.open, Price::from("1.00020"));
5415 assert_eq!(bar3.close, Price::from("1.00030"));
5416 }
5417
5418 #[rstest]
5419 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
5420 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5421 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5423 let handler = Arc::new(Mutex::new(Vec::new()));
5424 let handler_clone = Arc::clone(&handler);
5425
5426 let mut aggregator = RenkoBarAggregator::new(
5427 bar_type,
5428 instrument.price_precision(),
5429 instrument.size_precision(),
5430 instrument.price_increment(),
5431 move |bar: Bar| {
5432 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5433 handler_guard.push(bar);
5434 },
5435 );
5436
5437 aggregator.update(
5439 Price::from("1.00000"),
5440 Quantity::from(1),
5441 UnixNanos::from(1000),
5442 );
5443 aggregator.update(
5444 Price::from("1.00010"),
5445 Quantity::from(1),
5446 UnixNanos::from(2000),
5447 ); aggregator.update(
5449 Price::from("0.99990"),
5450 Quantity::from(1),
5451 UnixNanos::from(3000),
5452 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
5455 assert_eq!(handler_guard.len(), 3);
5456
5457 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
5459 assert_eq!(bar1.high, Price::from("1.00010"));
5460 assert_eq!(bar1.low, Price::from("1.00000"));
5461 assert_eq!(bar1.close, Price::from("1.00010"));
5462
5463 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
5465 assert_eq!(bar2.high, Price::from("1.00010"));
5466 assert_eq!(bar2.low, Price::from("1.00000"));
5467 assert_eq!(bar2.close, Price::from("1.00000"));
5468
5469 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
5471 assert_eq!(bar3.high, Price::from("1.00000"));
5472 assert_eq!(bar3.low, Price::from("0.99990"));
5473 assert_eq!(bar3.close, Price::from("0.99990"));
5474 }
5475
5476 #[rstest]
5477 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
5478 let instrument = InstrumentAny::Equity(equity_aapl);
5479 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
5480 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5481 let handler = Arc::new(Mutex::new(Vec::new()));
5482 let handler_clone = Arc::clone(&handler);
5483
5484 let mut aggregator = TickImbalanceBarAggregator::new(
5485 bar_type,
5486 instrument.price_precision(),
5487 instrument.size_precision(),
5488 move |bar: Bar| {
5489 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5490 handler_guard.push(bar);
5491 },
5492 );
5493
5494 let buy = TradeTick {
5495 aggressor_side: AggressorSide::Buyer,
5496 ..TradeTick::default()
5497 };
5498 let sell = TradeTick {
5499 aggressor_side: AggressorSide::Seller,
5500 ..TradeTick::default()
5501 };
5502
5503 aggregator.handle_trade(buy);
5504 aggregator.handle_trade(sell);
5505 aggregator.handle_trade(buy);
5506
5507 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5508 assert_eq!(handler_guard.len(), 0);
5509 }
5510
5511 #[rstest]
5512 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
5513 let instrument = InstrumentAny::Equity(equity_aapl);
5514 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
5515 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5516 let handler = Arc::new(Mutex::new(Vec::new()));
5517 let handler_clone = Arc::clone(&handler);
5518
5519 let mut aggregator = TickImbalanceBarAggregator::new(
5520 bar_type,
5521 instrument.price_precision(),
5522 instrument.size_precision(),
5523 move |bar: Bar| {
5524 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5525 handler_guard.push(bar);
5526 },
5527 );
5528
5529 let buy = TradeTick {
5530 aggressor_side: AggressorSide::Buyer,
5531 ..TradeTick::default()
5532 };
5533 let no_aggressor = TradeTick {
5534 aggressor_side: AggressorSide::NoAggressor,
5535 ..TradeTick::default()
5536 };
5537
5538 aggregator.handle_trade(buy);
5539 aggregator.handle_trade(no_aggressor);
5540 aggregator.handle_trade(buy);
5541
5542 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5543 assert_eq!(handler_guard.len(), 1);
5544 }
5545
5546 #[rstest]
5547 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
5548 let instrument = InstrumentAny::Equity(equity_aapl);
5549 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
5550 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5551 let handler = Arc::new(Mutex::new(Vec::new()));
5552 let handler_clone = Arc::clone(&handler);
5553
5554 let mut aggregator = TickRunsBarAggregator::new(
5555 bar_type,
5556 instrument.price_precision(),
5557 instrument.size_precision(),
5558 move |bar: Bar| {
5559 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5560 handler_guard.push(bar);
5561 },
5562 );
5563
5564 let buy = TradeTick {
5565 aggressor_side: AggressorSide::Buyer,
5566 ..TradeTick::default()
5567 };
5568 let sell = TradeTick {
5569 aggressor_side: AggressorSide::Seller,
5570 ..TradeTick::default()
5571 };
5572
5573 aggregator.handle_trade(buy);
5574 aggregator.handle_trade(buy);
5575 aggregator.handle_trade(sell);
5576 aggregator.handle_trade(sell);
5577
5578 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5579 assert_eq!(handler_guard.len(), 2);
5580 }
5581
5582 #[rstest]
5583 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5584 let instrument = InstrumentAny::Equity(equity_aapl);
5585 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
5586 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5587 let handler = Arc::new(Mutex::new(Vec::new()));
5588 let handler_clone = Arc::clone(&handler);
5589
5590 let mut aggregator = VolumeImbalanceBarAggregator::new(
5591 bar_type,
5592 instrument.price_precision(),
5593 instrument.size_precision(),
5594 move |bar: Bar| {
5595 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5596 handler_guard.push(bar);
5597 },
5598 );
5599
5600 let large_trade = TradeTick {
5601 size: Quantity::from(25),
5602 aggressor_side: AggressorSide::Buyer,
5603 ..TradeTick::default()
5604 };
5605
5606 aggregator.handle_trade(large_trade);
5607
5608 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5609 assert_eq!(handler_guard.len(), 2);
5610 }
5611
5612 #[rstest]
5613 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
5614 equity_aapl: Equity,
5615 ) {
5616 let instrument = InstrumentAny::Equity(equity_aapl);
5617 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
5618 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5619 let handler = Arc::new(Mutex::new(Vec::new()));
5620 let handler_clone = Arc::clone(&handler);
5621
5622 let mut aggregator = VolumeImbalanceBarAggregator::new(
5623 bar_type,
5624 instrument.price_precision(),
5625 instrument.size_precision(),
5626 move |bar: Bar| {
5627 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5628 handler_guard.push(bar);
5629 },
5630 );
5631
5632 let buy = TradeTick {
5633 size: Quantity::from(5),
5634 aggressor_side: AggressorSide::Buyer,
5635 ..TradeTick::default()
5636 };
5637 let no_aggressor = TradeTick {
5638 size: Quantity::from(3),
5639 aggressor_side: AggressorSide::NoAggressor,
5640 ..TradeTick::default()
5641 };
5642
5643 aggregator.handle_trade(buy);
5644 aggregator.handle_trade(no_aggressor);
5645 aggregator.handle_trade(buy);
5646
5647 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5648 assert_eq!(handler_guard.len(), 1);
5649 }
5650
5651 #[rstest]
5652 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5653 let instrument = InstrumentAny::Equity(equity_aapl);
5654 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
5655 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5656 let handler = Arc::new(Mutex::new(Vec::new()));
5657 let handler_clone = Arc::clone(&handler);
5658
5659 let mut aggregator = VolumeRunsBarAggregator::new(
5660 bar_type,
5661 instrument.price_precision(),
5662 instrument.size_precision(),
5663 move |bar: Bar| {
5664 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5665 handler_guard.push(bar);
5666 },
5667 );
5668
5669 let large_trade = TradeTick {
5670 size: Quantity::from(25),
5671 aggressor_side: AggressorSide::Buyer,
5672 ..TradeTick::default()
5673 };
5674
5675 aggregator.handle_trade(large_trade);
5676
5677 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5678 assert_eq!(handler_guard.len(), 2);
5679 }
5680
5681 #[rstest]
5682 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5683 let instrument = InstrumentAny::Equity(equity_aapl);
5684 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
5685 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5686 let handler = Arc::new(Mutex::new(Vec::new()));
5687 let handler_clone = Arc::clone(&handler);
5688
5689 let mut aggregator = ValueRunsBarAggregator::new(
5690 bar_type,
5691 instrument.price_precision(),
5692 instrument.size_precision(),
5693 move |bar: Bar| {
5694 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5695 handler_guard.push(bar);
5696 },
5697 );
5698
5699 let large_trade = TradeTick {
5700 price: Price::from("5.00"),
5701 size: Quantity::from(25),
5702 aggressor_side: AggressorSide::Buyer,
5703 ..TradeTick::default()
5704 };
5705
5706 aggregator.handle_trade(large_trade);
5707
5708 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5709 assert_eq!(handler_guard.len(), 2);
5710 }
5711
5712 #[rstest]
5713 fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5714 let instrument = InstrumentAny::Equity(equity_aapl);
5715 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
5716 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5717 let handler = Arc::new(Mutex::new(Vec::new()));
5718 let handler_clone = Arc::clone(&handler);
5719
5720 let mut aggregator = ValueBarAggregator::new(
5721 bar_type,
5722 instrument.price_precision(),
5723 instrument.size_precision(),
5724 move |bar: Bar| {
5725 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5726 handler_guard.push(bar);
5727 },
5728 );
5729
5730 aggregator.update(
5732 Price::from("1000.00"),
5733 Quantity::from(3),
5734 UnixNanos::default(),
5735 );
5736
5737 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5739 assert_eq!(handler_guard.len(), 3);
5740 for bar in handler_guard.iter() {
5741 assert_eq!(bar.volume, Quantity::from(1));
5742 }
5743 }
5744
5745 #[rstest]
5746 fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5747 let instrument = InstrumentAny::Equity(equity_aapl);
5748 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5749 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5750 let handler = Arc::new(Mutex::new(Vec::new()));
5751 let handler_clone = Arc::clone(&handler);
5752
5753 let mut aggregator = ValueImbalanceBarAggregator::new(
5754 bar_type,
5755 instrument.price_precision(),
5756 instrument.size_precision(),
5757 move |bar: Bar| {
5758 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5759 handler_guard.push(bar);
5760 },
5761 );
5762
5763 let trade = TradeTick {
5764 price: Price::from("1000.00"),
5765 size: Quantity::from(3),
5766 aggressor_side: AggressorSide::Buyer,
5767 instrument_id: instrument.id(),
5768 ..TradeTick::default()
5769 };
5770
5771 aggregator.handle_trade(trade);
5772
5773 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5774 assert_eq!(handler_guard.len(), 3);
5775 for bar in handler_guard.iter() {
5776 assert_eq!(bar.volume, Quantity::from(1));
5777 }
5778 }
5779
5780 #[rstest]
5781 fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
5782 let instrument = InstrumentAny::Equity(equity_aapl);
5783 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5784 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5785 let handler = Arc::new(Mutex::new(Vec::new()));
5786 let handler_clone = Arc::clone(&handler);
5787
5788 let mut aggregator = ValueImbalanceBarAggregator::new(
5789 bar_type,
5790 instrument.price_precision(),
5791 instrument.size_precision(),
5792 move |bar: Bar| {
5793 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5794 handler_guard.push(bar);
5795 },
5796 );
5797
5798 let sell_tick = TradeTick {
5800 price: Price::from("10.00"),
5801 size: Quantity::from(5),
5802 aggressor_side: AggressorSide::Seller,
5803 instrument_id: instrument.id(),
5804 ..TradeTick::default()
5805 };
5806
5807 let buy_tick = TradeTick {
5810 price: Price::from("1000.00"),
5811 size: Quantity::from(1),
5812 aggressor_side: AggressorSide::Buyer,
5813 instrument_id: instrument.id(),
5814 ts_init: UnixNanos::from(1),
5815 ts_event: UnixNanos::from(1),
5816 ..TradeTick::default()
5817 };
5818
5819 aggregator.handle_trade(sell_tick);
5820 aggregator.handle_trade(buy_tick);
5821
5822 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5823 assert_eq!(handler_guard.len(), 1);
5824 assert_eq!(handler_guard[0].volume, Quantity::from(6));
5825 }
5826
5827 #[rstest]
5828 fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5829 let instrument = InstrumentAny::Equity(equity_aapl);
5830 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
5831 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5832 let handler = Arc::new(Mutex::new(Vec::new()));
5833 let handler_clone = Arc::clone(&handler);
5834
5835 let mut aggregator = ValueRunsBarAggregator::new(
5836 bar_type,
5837 instrument.price_precision(),
5838 instrument.size_precision(),
5839 move |bar: Bar| {
5840 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5841 handler_guard.push(bar);
5842 },
5843 );
5844
5845 let trade = TradeTick {
5846 price: Price::from("1000.00"),
5847 size: Quantity::from(3),
5848 aggressor_side: AggressorSide::Buyer,
5849 instrument_id: instrument.id(),
5850 ..TradeTick::default()
5851 };
5852
5853 aggregator.handle_trade(trade);
5854
5855 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5856 assert_eq!(handler_guard.len(), 3);
5857 for bar in handler_guard.iter() {
5858 assert_eq!(bar.volume, Quantity::from(1));
5859 }
5860 }
5861
5862 #[rstest]
5863 #[case(1000_u64)]
5864 #[case(1500_u64)]
5865 fn test_volume_imbalance_bar_aggregator_large_step_no_overflow(
5866 equity_aapl: Equity,
5867 #[case] step: u64,
5868 ) {
5869 let instrument = InstrumentAny::Equity(equity_aapl);
5870 let bar_spec = BarSpecification::new(
5871 step as usize,
5872 BarAggregation::VolumeImbalance,
5873 PriceType::Last,
5874 );
5875 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5876 let handler = Arc::new(Mutex::new(Vec::new()));
5877 let handler_clone = Arc::clone(&handler);
5878
5879 let mut aggregator = VolumeImbalanceBarAggregator::new(
5880 bar_type,
5881 instrument.price_precision(),
5882 instrument.size_precision(),
5883 move |bar: Bar| {
5884 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5885 handler_guard.push(bar);
5886 },
5887 );
5888
5889 let trade = TradeTick {
5890 size: Quantity::from(step * 2),
5891 aggressor_side: AggressorSide::Buyer,
5892 ..TradeTick::default()
5893 };
5894
5895 aggregator.handle_trade(trade);
5896
5897 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5898 assert_eq!(handler_guard.len(), 2);
5899 for bar in handler_guard.iter() {
5900 assert_eq!(bar.volume.as_f64(), step as f64);
5901 }
5902 }
5903
5904 #[rstest]
5905 fn test_volume_imbalance_bar_aggregator_different_large_steps_produce_different_bar_counts(
5906 equity_aapl: Equity,
5907 ) {
5908 let instrument = InstrumentAny::Equity(equity_aapl);
5909 let total_volume = 3000_u64;
5910 let mut results = Vec::new();
5911
5912 for step in [1000_usize, 1500] {
5913 let bar_spec =
5914 BarSpecification::new(step, BarAggregation::VolumeImbalance, PriceType::Last);
5915 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5916 let handler = Arc::new(Mutex::new(Vec::new()));
5917 let handler_clone = Arc::clone(&handler);
5918
5919 let mut aggregator = VolumeImbalanceBarAggregator::new(
5920 bar_type,
5921 instrument.price_precision(),
5922 instrument.size_precision(),
5923 move |bar: Bar| {
5924 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5925 handler_guard.push(bar);
5926 },
5927 );
5928
5929 let trade = TradeTick {
5930 size: Quantity::from(total_volume),
5931 aggressor_side: AggressorSide::Buyer,
5932 ..TradeTick::default()
5933 };
5934
5935 aggregator.handle_trade(trade);
5936
5937 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5938 results.push(handler_guard.len());
5939 }
5940
5941 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
5944 }
5945
5946 #[rstest]
5947 #[case(1000_u64)]
5948 #[case(1500_u64)]
5949 fn test_volume_runs_bar_aggregator_large_step_no_overflow(
5950 equity_aapl: Equity,
5951 #[case] step: u64,
5952 ) {
5953 let instrument = InstrumentAny::Equity(equity_aapl);
5954 let bar_spec =
5955 BarSpecification::new(step as usize, BarAggregation::VolumeRuns, PriceType::Last);
5956 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5957 let handler = Arc::new(Mutex::new(Vec::new()));
5958 let handler_clone = Arc::clone(&handler);
5959
5960 let mut aggregator = VolumeRunsBarAggregator::new(
5961 bar_type,
5962 instrument.price_precision(),
5963 instrument.size_precision(),
5964 move |bar: Bar| {
5965 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5966 handler_guard.push(bar);
5967 },
5968 );
5969
5970 let trade = TradeTick {
5971 size: Quantity::from(step * 2),
5972 aggressor_side: AggressorSide::Buyer,
5973 ..TradeTick::default()
5974 };
5975
5976 aggregator.handle_trade(trade);
5977
5978 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5979 assert_eq!(handler_guard.len(), 2);
5980 for bar in handler_guard.iter() {
5981 assert_eq!(bar.volume.as_f64(), step as f64);
5982 }
5983 }
5984
5985 #[rstest]
5986 fn test_volume_runs_bar_aggregator_different_large_steps_produce_different_bar_counts(
5987 equity_aapl: Equity,
5988 ) {
5989 let instrument = InstrumentAny::Equity(equity_aapl);
5990 let total_volume = 3000_u64;
5991 let mut results = Vec::new();
5992
5993 for step in [1000_usize, 1500] {
5994 let bar_spec = BarSpecification::new(step, BarAggregation::VolumeRuns, PriceType::Last);
5995 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5996 let handler = Arc::new(Mutex::new(Vec::new()));
5997 let handler_clone = Arc::clone(&handler);
5998
5999 let mut aggregator = VolumeRunsBarAggregator::new(
6000 bar_type,
6001 instrument.price_precision(),
6002 instrument.size_precision(),
6003 move |bar: Bar| {
6004 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
6005 handler_guard.push(bar);
6006 },
6007 );
6008
6009 let trade = TradeTick {
6010 size: Quantity::from(total_volume),
6011 aggressor_side: AggressorSide::Buyer,
6012 ..TradeTick::default()
6013 };
6014
6015 aggregator.handle_trade(trade);
6016
6017 let handler_guard = handler.lock().expect(MUTEX_POISONED);
6018 results.push(handler_guard.len());
6019 }
6020
6021 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
6024 }
6025
6026 #[rstest]
6028 fn test_time_bar_historical_defers_event_at_ts_init_until_after_update(equity_aapl: Equity) {
6029 let instrument = InstrumentAny::Equity(equity_aapl);
6030 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6031 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6032 let handler = Arc::new(Mutex::new(Vec::new()));
6033 let handler_clone = Arc::clone(&handler);
6034 let clock = Rc::new(RefCell::new(TestClock::new()));
6035
6036 let mut agg = TimeBarAggregator::new(
6037 bar_type,
6038 instrument.price_precision(),
6039 instrument.size_precision(),
6040 clock.clone(),
6041 move |bar: Bar| {
6042 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6043 h.push(bar);
6044 },
6045 true,
6046 true,
6047 BarIntervalType::LeftOpen,
6048 None,
6049 0,
6050 false,
6051 );
6052 agg.historical_mode = true;
6053 agg.set_clock_internal(clock);
6054 let boxed: Box<dyn BarAggregator> = Box::new(agg);
6055 let rc = Rc::new(RefCell::new(boxed));
6056 rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
6057
6058 rc.borrow_mut().update(
6059 Price::from("100.00"),
6060 Quantity::from(1),
6061 UnixNanos::default(),
6062 );
6063 rc.borrow_mut().update(
6064 Price::from("100.00"),
6065 Quantity::from(1),
6066 UnixNanos::from(1_000_000_000),
6067 );
6068
6069 let bars = handler.lock().expect(MUTEX_POISONED);
6070 assert!(
6071 !bars.is_empty(),
6072 "deferred event at ts_init should produce a bar that includes the update"
6073 );
6074 let last_bar = bars.last().unwrap();
6075 assert_eq!(last_bar.close, Price::from("100.00"));
6076 assert!(
6077 last_bar.volume.as_f64() >= 1.0,
6078 "bar built after deferred event should include the update at ts_init"
6079 );
6080 }
6081
6082 #[rstest]
6083 fn test_spread_quote_quote_driven_emits_when_all_legs_received(equity_aapl: Equity) {
6084 let instrument = InstrumentAny::Equity(equity_aapl);
6085 let leg1 = instrument.id();
6086 let leg2 = InstrumentId::from("MSFT.XNAS");
6087 let spread_id = InstrumentId::from("SPREAD.XNAS");
6088 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6089 let handler = Arc::new(Mutex::new(Vec::new()));
6090 let handler_clone = Arc::clone(&handler);
6091 let clock = Rc::new(RefCell::new(TestClock::new()));
6092
6093 let mut agg = SpreadQuoteAggregator::new(
6094 spread_id,
6095 &legs,
6096 true,
6097 instrument.price_precision(),
6098 0,
6099 Box::new(move |q: QuoteTick| {
6100 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6101 }),
6102 clock,
6103 false,
6104 None,
6105 0,
6106 None,
6107 None,
6108 );
6109
6110 let ts = UnixNanos::from(1_000_000_000);
6111 agg.handle_quote_tick(QuoteTick::new(
6112 leg1,
6113 Price::from("100.00"),
6114 Price::from("100.10"),
6115 Quantity::from(10),
6116 Quantity::from(10),
6117 ts,
6118 ts,
6119 ));
6120 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6121
6122 agg.handle_quote_tick(QuoteTick::new(
6123 leg2,
6124 Price::from("99.00"),
6125 Price::from("99.10"),
6126 Quantity::from(10),
6127 Quantity::from(10),
6128 ts,
6129 ts,
6130 ));
6131 let quotes = handler.lock().expect(MUTEX_POISONED);
6132 assert_eq!(quotes.len(), 1);
6133 assert_eq!(quotes[0].instrument_id, spread_id);
6134 assert!(quotes[0].bid_price < quotes[0].ask_price);
6135 }
6136
6137 #[rstest]
6138 fn test_spread_quote_futures_pricing_signed_ratios(equity_aapl: Equity) {
6139 let instrument = InstrumentAny::Equity(equity_aapl);
6140 let leg1 = instrument.id();
6141 let leg2 = InstrumentId::from("MSFT.XNAS");
6142 let spread_id = InstrumentId::from("SPREAD.XNAS");
6143 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6144 let handler = Arc::new(Mutex::new(Vec::new()));
6145 let handler_clone = Arc::clone(&handler);
6146 let clock = Rc::new(RefCell::new(TestClock::new()));
6147
6148 let mut agg = SpreadQuoteAggregator::new(
6149 spread_id,
6150 &legs,
6151 true,
6152 instrument.price_precision(),
6153 0,
6154 Box::new(move |q: QuoteTick| {
6155 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6156 }),
6157 clock,
6158 false,
6159 None,
6160 0,
6161 None,
6162 None,
6163 );
6164
6165 let ts = UnixNanos::from(1_000_000_000);
6166 agg.handle_quote_tick(QuoteTick::new(
6167 leg1,
6168 Price::from("10.00"),
6169 Price::from("10.10"),
6170 Quantity::from(100),
6171 Quantity::from(100),
6172 ts,
6173 ts,
6174 ));
6175 agg.handle_quote_tick(QuoteTick::new(
6176 leg2,
6177 Price::from("20.00"),
6178 Price::from("20.10"),
6179 Quantity::from(100),
6180 Quantity::from(100),
6181 ts,
6182 ts,
6183 ));
6184 let quotes = handler.lock().expect(MUTEX_POISONED);
6185 assert_eq!(quotes.len(), 1);
6186 let q = "es[0];
6187 assert_eq!(q.instrument_id, spread_id);
6188 assert_eq!(q.bid_price, Price::from("-10.10"));
6189 assert_eq!(q.ask_price, Price::from("-9.90"));
6190 }
6191
6192 #[rstest]
6193 fn test_spread_quote_size_calculation_non_unit_ratios(equity_aapl: Equity) {
6194 let instrument = InstrumentAny::Equity(equity_aapl);
6195 let leg1 = instrument.id();
6196 let leg2 = InstrumentId::from("MSFT.XNAS");
6197 let spread_id = InstrumentId::from("SPREAD.XNAS");
6198 let legs = vec![(leg1, 2_i64), (leg2, -1_i64)];
6199 let handler = Arc::new(Mutex::new(Vec::new()));
6200 let handler_clone = Arc::clone(&handler);
6201 let clock = Rc::new(RefCell::new(TestClock::new()));
6202
6203 let mut agg = SpreadQuoteAggregator::new(
6204 spread_id,
6205 &legs,
6206 true,
6207 instrument.price_precision(),
6208 0,
6209 Box::new(move |q: QuoteTick| {
6210 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6211 }),
6212 clock,
6213 false,
6214 None,
6215 0,
6216 None,
6217 None,
6218 );
6219
6220 let ts = UnixNanos::from(1_000_000_000);
6221 agg.handle_quote_tick(QuoteTick::new(
6222 leg1,
6223 Price::from("10.00"),
6224 Price::from("10.10"),
6225 Quantity::from(100),
6226 Quantity::from(40),
6227 ts,
6228 ts,
6229 ));
6230 agg.handle_quote_tick(QuoteTick::new(
6231 leg2,
6232 Price::from("10.00"),
6233 Price::from("10.10"),
6234 Quantity::from(50),
6235 Quantity::from(30),
6236 ts,
6237 ts,
6238 ));
6239 let quotes = handler.lock().expect(MUTEX_POISONED);
6240 assert_eq!(quotes.len(), 1);
6241 let q = "es[0];
6242 assert_eq!(q.bid_size.as_f64(), 30.0);
6243 assert_eq!(q.ask_size.as_f64(), 20.0);
6244 }
6245
6246 #[rstest]
6247 fn test_spread_quote_timer_driven_emission_cadence(equity_aapl: Equity) {
6248 let instrument = InstrumentAny::Equity(equity_aapl);
6249 let leg1 = instrument.id();
6250 let leg2 = InstrumentId::from("MSFT.XNAS");
6251 let spread_id = InstrumentId::from("SPREAD.XNAS");
6252 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6253 let handler = Arc::new(Mutex::new(Vec::new()));
6254 let handler_clone = Arc::clone(&handler);
6255 let clock = Rc::new(RefCell::new(TestClock::new()));
6256 clock.borrow_mut().set_time(UnixNanos::from(0));
6257
6258 let agg = SpreadQuoteAggregator::new(
6259 spread_id,
6260 &legs,
6261 true,
6262 instrument.price_precision(),
6263 0,
6264 Box::new(move |q: QuoteTick| {
6265 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6266 }),
6267 clock.clone(),
6268 false,
6269 Some(1),
6270 0,
6271 None,
6272 None,
6273 );
6274 let rc = Rc::new(RefCell::new(agg));
6275 rc.borrow_mut().prepare_for_timer_mode(&rc);
6276 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6277
6278 for event in clock.borrow_mut().advance_time(UnixNanos::from(0), true) {
6279 rc.borrow_mut().on_timer_fire(event.ts_event);
6280 }
6281 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6282
6283 let ts1 = UnixNanos::from(1_000_000_000);
6284 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6285 leg1,
6286 Price::from("100.00"),
6287 Price::from("100.10"),
6288 Quantity::from(10),
6289 Quantity::from(10),
6290 ts1,
6291 ts1,
6292 ));
6293 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6294 leg2,
6295 Price::from("99.00"),
6296 Price::from("99.10"),
6297 Quantity::from(10),
6298 Quantity::from(10),
6299 ts1,
6300 ts1,
6301 ));
6302
6303 for event in clock.borrow_mut().advance_time(ts1, true) {
6304 rc.borrow_mut().on_timer_fire(event.ts_event);
6305 }
6306
6307 {
6308 let quotes = handler.lock().expect(MUTEX_POISONED);
6309 assert_eq!(quotes.len(), 1);
6310 assert_eq!(quotes[0].ts_event, ts1);
6311 assert_eq!(quotes[0].ts_init, ts1);
6312 }
6313
6314 let ts2 = UnixNanos::from(2_000_000_000);
6315 for event in clock.borrow_mut().advance_time(ts2, true) {
6316 rc.borrow_mut().on_timer_fire(event.ts_event);
6317 }
6318
6319 let quotes = handler.lock().expect(MUTEX_POISONED);
6320 assert_eq!(quotes.len(), 1);
6321 }
6322
6323 #[rstest]
6324 fn test_spread_quote_historical_timer_waits_for_all_legs(equity_aapl: Equity) {
6325 let instrument = InstrumentAny::Equity(equity_aapl);
6326 let leg1 = instrument.id();
6327 let leg2 = InstrumentId::from("MSFT.XNAS");
6328 let spread_id = InstrumentId::from("SPREAD.XNAS");
6329 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6330 let handler = Arc::new(Mutex::new(Vec::new()));
6331 let handler_clone = Arc::clone(&handler);
6332 let clock = Rc::new(RefCell::new(TestClock::new()));
6333
6334 let agg = SpreadQuoteAggregator::new(
6335 spread_id,
6336 &legs,
6337 true,
6338 instrument.price_precision(),
6339 0,
6340 Box::new(move |q: QuoteTick| {
6341 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6342 }),
6343 clock.clone(),
6345 true,
6346 Some(1),
6347 0,
6348 None,
6349 None,
6350 );
6351 let rc = Rc::new(RefCell::new(agg));
6352 rc.borrow_mut().prepare_for_timer_mode(&rc);
6353 rc.borrow_mut().set_clock(clock);
6354
6355 let ts1 = UnixNanos::from(1_000_000_000);
6356 let ts2 = UnixNanos::from(2_000_000_000);
6357 let ts3 = UnixNanos::from(3_000_000_000);
6358 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6359 leg1,
6360 Price::from("100.00"),
6361 Price::from("100.10"),
6362 Quantity::from(10),
6363 Quantity::from(10),
6364 ts1,
6365 ts1,
6366 ));
6367 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6368
6369 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6370 leg2,
6371 Price::from("99.00"),
6372 Price::from("99.10"),
6373 Quantity::from(10),
6374 Quantity::from(10),
6375 ts2,
6376 ts2,
6377 ));
6378 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6379
6380 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6381 leg1,
6382 Price::from("100.00"),
6383 Price::from("100.10"),
6384 Quantity::from(10),
6385 Quantity::from(10),
6386 ts3,
6387 ts3,
6388 ));
6389 let quotes = handler.lock().expect(MUTEX_POISONED);
6390 assert_eq!(
6391 quotes.len(),
6392 1,
6393 "deferred event at ts2 is processed when we have all legs and advance to ts3"
6394 );
6395 }
6396
6397 #[rstest]
6398 fn test_spread_quote_historical_flush_emits_pending_final_quote(equity_aapl: Equity) {
6399 let instrument = InstrumentAny::Equity(equity_aapl);
6400 let leg1 = instrument.id();
6401 let leg2 = InstrumentId::from("MSFT.XNAS");
6402 let spread_id = InstrumentId::from("SPREAD.XNAS");
6403 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6404 let handler = Arc::new(Mutex::new(Vec::new()));
6405 let handler_clone = Arc::clone(&handler);
6406 let clock = Rc::new(RefCell::new(TestClock::new()));
6407
6408 let agg = SpreadQuoteAggregator::new(
6409 spread_id,
6410 &legs,
6411 true,
6412 instrument.price_precision(),
6413 0,
6414 Box::new(move |q: QuoteTick| {
6415 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6416 }),
6417 clock.clone(),
6419 true,
6420 Some(1),
6421 0,
6422 None,
6423 None,
6424 );
6425 let rc = Rc::new(RefCell::new(agg));
6426 rc.borrow_mut().prepare_for_timer_mode(&rc);
6427 rc.borrow_mut().set_clock(clock);
6428
6429 let ts1 = UnixNanos::from(1_000_000_000);
6430 let ts2 = UnixNanos::from(2_000_000_000);
6431 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6432 leg1,
6433 Price::from("100.00"),
6434 Price::from("100.10"),
6435 Quantity::from(10),
6436 Quantity::from(10),
6437 ts1,
6438 ts1,
6439 ));
6440 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6441 leg2,
6442 Price::from("99.00"),
6443 Price::from("99.10"),
6444 Quantity::from(10),
6445 Quantity::from(10),
6446 ts2,
6447 ts2,
6448 ));
6449
6450 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6451
6452 rc.borrow_mut().flush_pending_historical_quote();
6453
6454 let quotes = handler.lock().expect(MUTEX_POISONED);
6455 assert_eq!(
6456 quotes.len(),
6457 1,
6458 "final historical quote should be emitted when the deferred event is flushed",
6459 );
6460 assert_eq!(quotes[0].ts_event, ts2);
6461 }
6462
6463 #[rstest]
6464 fn test_spread_quote_option_vega_weighting(equity_aapl: Equity) {
6465 let instrument = InstrumentAny::Equity(equity_aapl);
6466 let leg1 = instrument.id();
6467 let leg2 = InstrumentId::from("MSFT.XNAS");
6468 let spread_id = InstrumentId::from("SPREAD.XNAS");
6469 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6470 let handler = Arc::new(Mutex::new(Vec::new()));
6471 let handler_clone = Arc::clone(&handler);
6472 let clock = Rc::new(RefCell::new(TestClock::new()));
6473
6474 let mut vega_provider = MapVegaProvider::new();
6475 vega_provider.insert(leg1, 0.15);
6476 vega_provider.insert(leg2, 0.12);
6477
6478 let mut agg = SpreadQuoteAggregator::new(
6479 spread_id,
6480 &legs,
6481 false,
6482 instrument.price_precision(),
6483 0,
6484 Box::new(move |q: QuoteTick| {
6485 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6486 }),
6487 clock,
6488 false,
6489 None,
6490 0,
6491 Some(Box::new(vega_provider)),
6492 None,
6493 );
6494
6495 let ts = UnixNanos::from(1_000_000_000);
6496 agg.handle_quote_tick(QuoteTick::new(
6497 leg1,
6498 Price::from("10.00"),
6499 Price::from("10.20"),
6500 Quantity::from(100),
6501 Quantity::from(100),
6502 ts,
6503 ts,
6504 ));
6505 agg.handle_quote_tick(QuoteTick::new(
6506 leg2,
6507 Price::from("11.00"),
6508 Price::from("11.20"),
6509 Quantity::from(100),
6510 Quantity::from(100),
6511 ts,
6512 ts,
6513 ));
6514 let quotes = handler.lock().expect(MUTEX_POISONED);
6515 assert_eq!(quotes.len(), 1);
6516 let q = "es[0];
6517 assert!(q.bid_price < q.ask_price);
6518 assert!(q.ask_price.as_f64() - q.bid_price.as_f64() > 0.0);
6519 }
6520
6521 #[rstest]
6522 fn test_spread_quote_all_zero_vega_fallback(equity_aapl: Equity) {
6523 let instrument = InstrumentAny::Equity(equity_aapl);
6524 let leg1 = instrument.id();
6525 let leg2 = InstrumentId::from("MSFT.XNAS");
6526 let spread_id = InstrumentId::from("SPREAD.XNAS");
6527 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6528 let handler = Arc::new(Mutex::new(Vec::new()));
6529 let handler_clone = Arc::clone(&handler);
6530 let clock = Rc::new(RefCell::new(TestClock::new()));
6531
6532 let mut vega_provider = MapVegaProvider::new();
6533 vega_provider.insert(leg1, 0.0);
6534 vega_provider.insert(leg2, 0.0);
6535
6536 let mut agg = SpreadQuoteAggregator::new(
6537 spread_id,
6538 &legs,
6539 false,
6540 instrument.price_precision(),
6541 0,
6542 Box::new(move |q: QuoteTick| {
6543 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6544 }),
6545 clock,
6546 false,
6547 None,
6548 0,
6549 Some(Box::new(vega_provider)),
6550 None,
6551 );
6552
6553 let ts = UnixNanos::from(1_000_000_000);
6554 agg.handle_quote_tick(QuoteTick::new(
6555 leg1,
6556 Price::from("10.00"),
6557 Price::from("10.10"),
6558 Quantity::from(100),
6559 Quantity::from(100),
6560 ts,
6561 ts,
6562 ));
6563 agg.handle_quote_tick(QuoteTick::new(
6564 leg2,
6565 Price::from("20.00"),
6566 Price::from("20.10"),
6567 Quantity::from(100),
6568 Quantity::from(100),
6569 ts,
6570 ts,
6571 ));
6572 let quotes = handler.lock().expect(MUTEX_POISONED);
6573 assert_eq!(quotes.len(), 1);
6574 let q = "es[0];
6575 assert_eq!(q.bid_price, Price::from("-10.10"));
6576 assert_eq!(q.ask_price, Price::from("-9.90"));
6577 }
6578
6579 #[rstest]
6580 fn test_spread_quote_negative_prices_tick_scheme(equity_aapl: Equity) {
6581 let instrument = InstrumentAny::Equity(equity_aapl);
6582 let leg1 = instrument.id();
6583 let leg2 = InstrumentId::from("MSFT.XNAS");
6584 let spread_id = InstrumentId::from("SPREAD.XNAS");
6585 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6586 let handler = Arc::new(Mutex::new(Vec::new()));
6587 let handler_clone = Arc::clone(&handler);
6588 let clock = Rc::new(RefCell::new(TestClock::new()));
6589 let rounder = FixedTickSchemeRounder::new(0.01).unwrap();
6590
6591 let mut agg = SpreadQuoteAggregator::new(
6592 spread_id,
6593 &legs,
6594 true,
6595 2,
6596 0,
6597 Box::new(move |q: QuoteTick| {
6598 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6599 }),
6600 clock,
6601 false,
6602 None,
6603 0,
6604 None,
6605 Some(Box::new(rounder)),
6606 );
6607
6608 let ts = UnixNanos::from(1_000_000_000);
6609 agg.handle_quote_tick(QuoteTick::new(
6610 leg1,
6611 Price::from("10.00"),
6612 Price::from("10.10"),
6613 Quantity::from(100),
6614 Quantity::from(100),
6615 ts,
6616 ts,
6617 ));
6618 agg.handle_quote_tick(QuoteTick::new(
6619 leg2,
6620 Price::from("20.00"),
6621 Price::from("20.10"),
6622 Quantity::from(100),
6623 Quantity::from(100),
6624 ts,
6625 ts,
6626 ));
6627 let quotes = handler.lock().expect(MUTEX_POISONED);
6628 assert_eq!(quotes.len(), 1);
6629 let q = "es[0];
6630 assert!(q.bid_price.as_f64() < 0.0);
6631 assert!(q.ask_price.as_f64() < 0.0);
6632 assert!(q.bid_price < q.ask_price);
6633 }
6634
6635 #[rstest]
6636 #[case(BarIntervalType::LeftOpen)]
6637 #[case(BarIntervalType::RightOpen)]
6638 fn test_time_bar_skip_first_non_full_bar_noop_on_boundary(
6639 equity_aapl: Equity,
6640 #[case] interval_type: BarIntervalType,
6641 ) {
6642 let instrument = InstrumentAny::Equity(equity_aapl);
6647 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6648 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6649 let handler = Arc::new(Mutex::new(Vec::new()));
6650 let handler_clone = Arc::clone(&handler);
6651 let clock = Rc::new(RefCell::new(TestClock::new()));
6652 clock.borrow_mut().set_time(UnixNanos::from(1_000_000_000));
6653 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6654
6655 let aggregator = TimeBarAggregator::new(
6656 bar_type,
6657 instrument.price_precision(),
6658 instrument.size_precision(),
6659 clock,
6660 move |bar: Bar| {
6661 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6662 h.push(bar);
6663 },
6664 false,
6665 false,
6666 interval_type,
6667 None,
6668 0,
6669 true, );
6671
6672 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6673 let rc = Rc::new(RefCell::new(boxed));
6674 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6675
6676 rc.borrow_mut().update(
6677 Price::from("100.00"),
6678 Quantity::from(1),
6679 UnixNanos::from(1_000_000_000),
6680 );
6681 rc.borrow_mut().build_bar(&TimeEvent::new(
6682 event_name,
6683 UUID4::new(),
6684 UnixNanos::from(2_000_000_000),
6685 UnixNanos::from(2_000_000_000),
6686 ));
6687 rc.borrow_mut().update(
6688 Price::from("101.00"),
6689 Quantity::from(1),
6690 UnixNanos::from(2_500_000_000),
6691 );
6692 rc.borrow_mut().build_bar(&TimeEvent::new(
6693 event_name,
6694 UUID4::new(),
6695 UnixNanos::from(3_000_000_000),
6696 UnixNanos::from(3_000_000_000),
6697 ));
6698
6699 let bars = handler.lock().expect(MUTEX_POISONED);
6700 assert_eq!(bars.len(), 2);
6701 assert_eq!(bars[0].close, Price::from("100.00"));
6702 assert_eq!(bars[1].close, Price::from("101.00"));
6703 }
6704
6705 #[rstest]
6706 #[case(BarIntervalType::LeftOpen)]
6707 #[case(BarIntervalType::RightOpen)]
6708 fn test_time_bar_skip_first_non_full_bar_drops_partial_bar(
6709 equity_aapl: Equity,
6710 #[case] interval_type: BarIntervalType,
6711 ) {
6712 let instrument = InstrumentAny::Equity(equity_aapl);
6716 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6717 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6718 let handler = Arc::new(Mutex::new(Vec::new()));
6719 let handler_clone = Arc::clone(&handler);
6720 let clock = Rc::new(RefCell::new(TestClock::new()));
6721 clock.borrow_mut().set_time(UnixNanos::from(1_500_000_000));
6722 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6723
6724 let aggregator = TimeBarAggregator::new(
6725 bar_type,
6726 instrument.price_precision(),
6727 instrument.size_precision(),
6728 clock,
6729 move |bar: Bar| {
6730 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6731 h.push(bar);
6732 },
6733 false,
6734 false,
6735 interval_type,
6736 None,
6737 0,
6738 true, );
6740
6741 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6742 let rc = Rc::new(RefCell::new(boxed));
6743 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6744
6745 rc.borrow_mut().update(
6746 Price::from("100.00"),
6747 Quantity::from(1),
6748 UnixNanos::from(1_500_000_000),
6749 );
6750 rc.borrow_mut().build_bar(&TimeEvent::new(
6751 event_name,
6752 UUID4::new(),
6753 UnixNanos::from(2_000_000_000),
6754 UnixNanos::from(2_000_000_000),
6755 ));
6756 rc.borrow_mut().update(
6757 Price::from("101.00"),
6758 Quantity::from(1),
6759 UnixNanos::from(2_500_000_000),
6760 );
6761 rc.borrow_mut().build_bar(&TimeEvent::new(
6762 event_name,
6763 UUID4::new(),
6764 UnixNanos::from(3_000_000_000),
6765 UnixNanos::from(3_000_000_000),
6766 ));
6767
6768 let bars = handler.lock().expect(MUTEX_POISONED);
6769 assert_eq!(bars.len(), 1);
6770 assert_eq!(bars[0].close, Price::from("101.00"));
6771 }
6772
6773 #[rstest]
6774 fn test_time_bar_skip_first_non_full_bar_skips_every_call_before_first_close(
6775 equity_aapl: Equity,
6776 ) {
6777 let instrument = InstrumentAny::Equity(equity_aapl);
6781 let bar_spec = BarSpecification::new(10, BarAggregation::Second, PriceType::Last);
6782 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6783 let handler = Arc::new(Mutex::new(Vec::new()));
6784 let handler_clone = Arc::clone(&handler);
6785 let clock = Rc::new(RefCell::new(TestClock::new()));
6786 clock.borrow_mut().set_time(UnixNanos::from(5_000_000_000));
6787 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6788
6789 let aggregator = TimeBarAggregator::new(
6790 bar_type,
6791 instrument.price_precision(),
6792 instrument.size_precision(),
6793 clock,
6794 move |bar: Bar| {
6795 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6796 h.push(bar);
6797 },
6798 false,
6799 false,
6800 BarIntervalType::LeftOpen,
6801 None,
6802 0,
6803 true, );
6805
6806 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6807 let rc = Rc::new(RefCell::new(boxed));
6808 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6809
6810 for (price, update_ts, event_ts) in [
6814 ("100.00", 5_500_000_000_u64, 7_000_000_000_u64),
6815 ("101.00", 7_500_000_000_u64, 8_000_000_000_u64),
6816 ("102.00", 9_000_000_000_u64, 10_000_000_000_u64),
6817 ] {
6818 rc.borrow_mut().update(
6819 Price::from(price),
6820 Quantity::from(1),
6821 UnixNanos::from(update_ts),
6822 );
6823 rc.borrow_mut().build_bar(&TimeEvent::new(
6824 event_name,
6825 UUID4::new(),
6826 UnixNanos::from(event_ts),
6827 UnixNanos::from(event_ts),
6828 ));
6829 }
6830
6831 rc.borrow_mut().update(
6833 Price::from("103.00"),
6834 Quantity::from(1),
6835 UnixNanos::from(10_500_000_000),
6836 );
6837 rc.borrow_mut().build_bar(&TimeEvent::new(
6838 event_name,
6839 UUID4::new(),
6840 UnixNanos::from(11_000_000_000),
6841 UnixNanos::from(11_000_000_000),
6842 ));
6843
6844 let bars = handler.lock().expect(MUTEX_POISONED);
6845 assert_eq!(bars.len(), 1);
6846 assert_eq!(bars[0].close, Price::from("103.00"));
6847 }
6848
6849 #[rstest]
6850 fn test_time_bar_skip_first_non_full_bar_skips_when_build_delay_shifts_start(
6851 equity_aapl: Equity,
6852 ) {
6853 let instrument = InstrumentAny::Equity(equity_aapl);
6858 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6859 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6860 let handler = Arc::new(Mutex::new(Vec::new()));
6861 let handler_clone = Arc::clone(&handler);
6862 let clock = Rc::new(RefCell::new(TestClock::new()));
6863 clock.borrow_mut().set_time(UnixNanos::from(2_000_000_000));
6864 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6865
6866 let aggregator = TimeBarAggregator::new(
6867 bar_type,
6868 instrument.price_precision(),
6869 instrument.size_precision(),
6870 clock,
6871 move |bar: Bar| {
6872 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6873 h.push(bar);
6874 },
6875 false,
6876 false,
6877 BarIntervalType::LeftOpen,
6878 None,
6879 100, true, );
6882
6883 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6884 let rc = Rc::new(RefCell::new(boxed));
6885 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6886
6887 rc.borrow_mut().update(
6889 Price::from("100.00"),
6890 Quantity::from(1),
6891 UnixNanos::from(2_500_000_000),
6892 );
6893 rc.borrow_mut().build_bar(&TimeEvent::new(
6894 event_name,
6895 UUID4::new(),
6896 UnixNanos::from(3_000_100_000),
6897 UnixNanos::from(3_000_100_000),
6898 ));
6899 rc.borrow_mut().update(
6900 Price::from("101.00"),
6901 Quantity::from(1),
6902 UnixNanos::from(3_500_000_000),
6903 );
6904 rc.borrow_mut().build_bar(&TimeEvent::new(
6905 event_name,
6906 UUID4::new(),
6907 UnixNanos::from(4_000_100_000),
6908 UnixNanos::from(4_000_100_000),
6909 ));
6910
6911 let bars = handler.lock().expect(MUTEX_POISONED);
6912 assert_eq!(bars.len(), 1);
6913 assert_eq!(bars[0].close, Price::from("101.00"));
6914 }
6915
6916 #[rstest]
6917 #[case(
6918 BarAggregation::Month,
6919 1_735_689_600_000_000_000_u64,
6920 1_733_011_200_000_000_000_u64
6921 )]
6922 #[case(
6923 BarAggregation::Year,
6924 1_735_689_600_000_000_000_u64,
6925 1_704_067_200_000_000_000_u64
6926 )]
6927 fn test_time_bar_fire_immediately_month_year_stored_open_points_to_previous_period(
6928 equity_aapl: Equity,
6929 #[case] aggregation: BarAggregation,
6930 #[case] start_ns: u64,
6931 #[case] expected_stored_open_ns: u64,
6932 ) {
6933 let instrument = InstrumentAny::Equity(equity_aapl);
6938 let bar_spec = BarSpecification::new(1, aggregation, PriceType::Last);
6939 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6940 let handler = Arc::new(Mutex::new(Vec::new()));
6941 let handler_clone = Arc::clone(&handler);
6942 let clock = Rc::new(RefCell::new(TestClock::new()));
6943 clock.borrow_mut().set_time(UnixNanos::from(start_ns));
6944 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6945
6946 let aggregator = TimeBarAggregator::new(
6947 bar_type,
6948 instrument.price_precision(),
6949 instrument.size_precision(),
6950 clock,
6951 move |bar: Bar| {
6952 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6953 h.push(bar);
6954 },
6955 false,
6956 false,
6957 BarIntervalType::RightOpen, None,
6959 0,
6960 false, );
6962
6963 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6964 let rc = Rc::new(RefCell::new(boxed));
6965 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6966
6967 rc.borrow_mut().update(
6968 Price::from("100.00"),
6969 Quantity::from(1),
6970 UnixNanos::from(start_ns),
6971 );
6972 rc.borrow_mut().build_bar(&TimeEvent::new(
6973 event_name,
6974 UUID4::new(),
6975 UnixNanos::from(start_ns),
6976 UnixNanos::from(start_ns),
6977 ));
6978
6979 let bars = handler.lock().expect(MUTEX_POISONED);
6980 assert_eq!(bars.len(), 1);
6981 assert_eq!(bars[0].ts_event, UnixNanos::from(expected_stored_open_ns));
6982 assert_eq!(bars[0].ts_init, UnixNanos::from(start_ns));
6983 }
6984
6985 #[rstest]
6986 fn test_time_bar_historical_prevents_bars_for_timer_before_last_data(equity_aapl: Equity) {
6987 let instrument = InstrumentAny::Equity(equity_aapl);
6988 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6989 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6990 let handler = Arc::new(Mutex::new(Vec::new()));
6991 let handler_clone = Arc::clone(&handler);
6992 let clock = Rc::new(RefCell::new(TestClock::new()));
6993
6994 let mut agg = TimeBarAggregator::new(
6995 bar_type,
6996 instrument.price_precision(),
6997 instrument.size_precision(),
6998 clock.clone(),
6999 move |bar: Bar| {
7000 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
7001 h.push(bar);
7002 },
7003 true,
7004 true,
7005 BarIntervalType::LeftOpen,
7006 None,
7007 0,
7008 false,
7009 );
7010 agg.historical_mode = true;
7011 agg.set_clock_internal(clock);
7012 let boxed: Box<dyn BarAggregator> = Box::new(agg);
7013 let rc = Rc::new(RefCell::new(boxed));
7014 rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
7015
7016 let ts1 = UnixNanos::from(2_000_000_000);
7017 rc.borrow_mut()
7018 .update(Price::from("100.00"), Quantity::from(1), ts1);
7019
7020 let ts2 = UnixNanos::from(3_000_000_000);
7021 rc.borrow_mut()
7022 .update(Price::from("101.00"), Quantity::from(1), ts2);
7023
7024 let bars = handler.lock().expect(MUTEX_POISONED);
7025 assert!(
7026 !bars.is_empty(),
7027 "advancing time from ts1 to ts2 should produce at least one bar"
7028 );
7029 assert_eq!(bars[0].close, Price::from("100.00"));
7030 }
7031}
7032
7033#[cfg(test)]
7034mod property_tests {
7035 use std::{
7036 cell::RefCell,
7037 rc::Rc,
7038 sync::{Arc, Mutex},
7039 };
7040
7041 use nautilus_common::{clock::TestClock, timer::TimeEvent};
7042 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
7043 use nautilus_model::{
7044 data::{Bar, BarSpecification, BarType, bar::get_bar_interval_ns},
7045 enums::{AggregationSource, BarAggregation, BarIntervalType, PriceType},
7046 instruments::{Instrument, InstrumentAny, stubs::equity_aapl},
7047 types::{Price, Quantity},
7048 };
7049 use proptest::prelude::*;
7050 use rstest::rstest;
7051 use ustr::Ustr;
7052
7053 use super::*;
7054
7055 fn time_bar_spec_strategy() -> impl Strategy<Value = (BarAggregation, usize)> {
7056 prop_oneof![
7057 (Just(BarAggregation::Second), 1usize..=5),
7058 (Just(BarAggregation::Minute), 1usize..=5),
7059 (Just(BarAggregation::Hour), 1usize..=4),
7060 ]
7061 }
7062
7063 fn interval_type_strategy() -> impl Strategy<Value = BarIntervalType> {
7064 prop_oneof![
7065 Just(BarIntervalType::LeftOpen),
7066 Just(BarIntervalType::RightOpen),
7067 ]
7068 }
7069
7070 proptest! {
7071 #[rstest]
7072 fn prop_skip_first_drops_partial_then_emits(
7073 (aggregation, step) in time_bar_spec_strategy(),
7074 interval_type in interval_type_strategy(),
7075 skip_first in any::<bool>(),
7076 ) {
7077 let instrument = InstrumentAny::Equity(equity_aapl());
7078 let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
7079 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7080 let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
7081
7082 let now_ns = interval_ns + interval_ns / 2;
7085
7086 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
7087 let handler_clone = Arc::clone(&handler);
7088 let clock = Rc::new(RefCell::new(TestClock::new()));
7089 clock.borrow_mut().set_time(UnixNanos::from(now_ns));
7090 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
7091
7092 let aggregator = TimeBarAggregator::new(
7093 bar_type,
7094 instrument.price_precision(),
7095 instrument.size_precision(),
7096 clock,
7097 move |bar: Bar| {
7098 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
7099 h.push(bar);
7100 },
7101 false,
7102 false,
7103 interval_type,
7104 None,
7105 0,
7106 skip_first,
7107 );
7108
7109 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
7110 let rc = Rc::new(RefCell::new(boxed));
7111 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
7112
7113 rc.borrow_mut().update(
7116 Price::from("100.00"),
7117 Quantity::from(1),
7118 UnixNanos::from(now_ns),
7119 );
7120 let first_close = 2 * interval_ns;
7121 rc.borrow_mut().build_bar(&TimeEvent::new(
7122 event_name,
7123 UUID4::new(),
7124 UnixNanos::from(first_close),
7125 UnixNanos::from(first_close),
7126 ));
7127
7128 rc.borrow_mut().update(
7130 Price::from("101.00"),
7131 Quantity::from(1),
7132 UnixNanos::from(first_close + interval_ns / 2),
7133 );
7134 let second_close = first_close + interval_ns;
7135 rc.borrow_mut().build_bar(&TimeEvent::new(
7136 event_name,
7137 UUID4::new(),
7138 UnixNanos::from(second_close),
7139 UnixNanos::from(second_close),
7140 ));
7141
7142 let bars = handler.lock().expect(MUTEX_POISONED);
7143 let expected = if skip_first { 1 } else { 2 };
7144 prop_assert_eq!(bars.len(), expected);
7145 prop_assert_eq!(bars.last().unwrap().close, Price::from("101.00"));
7146 for bar in bars.iter() {
7147 prop_assert!(bar.high >= bar.open);
7148 prop_assert!(bar.high >= bar.close);
7149 prop_assert!(bar.low <= bar.open);
7150 prop_assert!(bar.low <= bar.close);
7151 }
7152 }
7153
7154 #[rstest]
7155 fn prop_skip_first_noop_on_exact_boundary(
7156 (aggregation, step) in time_bar_spec_strategy(),
7157 interval_type in interval_type_strategy(),
7158 ) {
7159 let instrument = InstrumentAny::Equity(equity_aapl());
7160 let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
7161 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7162 let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
7163
7164 let now_ns = interval_ns;
7167 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
7168 let handler_clone = Arc::clone(&handler);
7169 let clock = Rc::new(RefCell::new(TestClock::new()));
7170 clock.borrow_mut().set_time(UnixNanos::from(now_ns));
7171 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
7172
7173 let aggregator = TimeBarAggregator::new(
7174 bar_type,
7175 instrument.price_precision(),
7176 instrument.size_precision(),
7177 clock,
7178 move |bar: Bar| {
7179 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
7180 h.push(bar);
7181 },
7182 false,
7183 false,
7184 interval_type,
7185 None,
7186 0,
7187 true, );
7189
7190 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
7191 let rc = Rc::new(RefCell::new(boxed));
7192 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
7193
7194 rc.borrow_mut().update(
7195 Price::from("100.00"),
7196 Quantity::from(1),
7197 UnixNanos::from(now_ns),
7198 );
7199 let next_close = now_ns + interval_ns;
7200 rc.borrow_mut().build_bar(&TimeEvent::new(
7201 event_name,
7202 UUID4::new(),
7203 UnixNanos::from(next_close),
7204 UnixNanos::from(next_close),
7205 ));
7206
7207 let bars = handler.lock().expect(MUTEX_POISONED);
7208 prop_assert_eq!(bars.len(), 1);
7209 prop_assert_eq!(bars[0].close, Price::from("100.00"));
7210 }
7211
7212 #[rstest]
7213 fn prop_bar_builder_ohlc_invariants(
7214 updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=50),
7215 ) {
7216 let instrument = InstrumentAny::Equity(equity_aapl());
7217 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7218 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7219 let mut builder = BarBuilder::new(bar_type, 2, 0);
7220
7221 let mut total_volume: u64 = 0;
7222
7223 for (i, (price_cents, size)) in updates.iter().enumerate() {
7224 let price = Price::new((*price_cents as f64) / 100.0, 2);
7225 let qty = Quantity::new(*size as f64, 0);
7226 let ts = UnixNanos::from((i as u64 + 1) * 1_000);
7227 total_volume += *size;
7228 builder.update(price, qty, ts);
7229 }
7230
7231 let bar = builder.build_now();
7232 prop_assert!(bar.low <= bar.open);
7233 prop_assert!(bar.low <= bar.close);
7234 prop_assert!(bar.high >= bar.open);
7235 prop_assert!(bar.high >= bar.close);
7236 prop_assert!(bar.low <= bar.high);
7237 prop_assert_eq!(bar.volume.as_f64(), total_volume as f64);
7238 }
7239
7240 #[rstest]
7241 fn prop_tick_bar_aggregator_volume_conservation(
7242 ticks in prop::collection::vec((1i64..=1_000i64, 1u64..=100u64), 3..=60),
7243 step in 1usize..=5,
7244 ) {
7245 let instrument = InstrumentAny::Equity(equity_aapl());
7246 let bar_spec = BarSpecification::new(step, BarAggregation::Tick, PriceType::Last);
7247 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7248 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
7249 let handler_clone = Arc::clone(&handler);
7250
7251 let mut aggregator = TickBarAggregator::new(
7252 bar_type,
7253 instrument.price_precision(),
7254 instrument.size_precision(),
7255 move |bar: Bar| {
7256 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
7257 },
7258 );
7259
7260 let mut total_input: u64 = 0;
7261
7262 for (i, (price_cents, size)) in ticks.iter().enumerate() {
7263 let price = Price::new((*price_cents as f64) / 100.0, 2);
7264 let qty = Quantity::new(*size as f64, 0);
7265 aggregator.update(price, qty, UnixNanos::from((i as u64 + 1) * 1_000));
7266 total_input += *size;
7267 }
7268
7269 let bars = handler.lock().expect(MUTEX_POISONED);
7270 let emitted_count = bars.len();
7271 prop_assert_eq!(emitted_count, ticks.len() / step);
7272
7273 let mut sum_emitted: f64 = 0.0;
7274
7275 for bar in bars.iter() {
7276 prop_assert!(bar.low <= bar.open);
7277 prop_assert!(bar.low <= bar.close);
7278 prop_assert!(bar.high >= bar.open);
7279 prop_assert!(bar.high >= bar.close);
7280 sum_emitted += bar.volume.as_f64();
7281 }
7282
7283 let pending_size: u64 = ticks.iter()
7285 .skip(emitted_count * step)
7286 .map(|(_, s)| *s)
7287 .sum();
7288 prop_assert!((sum_emitted + pending_size as f64 - total_input as f64).abs() < 1e-6);
7289 }
7290
7291 #[rstest]
7292 fn prop_volume_bar_aggregator_conservation(
7293 sizes in prop::collection::vec(1u64..=50u64, 3..=40),
7294 step in 2u64..=10u64,
7295 ) {
7296 let instrument = InstrumentAny::Equity(equity_aapl());
7297 let bar_spec = BarSpecification::new(step as usize, BarAggregation::Volume, PriceType::Last);
7298 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7299 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
7300 let handler_clone = Arc::clone(&handler);
7301
7302 let mut aggregator = VolumeBarAggregator::new(
7303 bar_type,
7304 instrument.price_precision(),
7305 instrument.size_precision(),
7306 move |bar: Bar| {
7307 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
7308 },
7309 );
7310
7311 let mut total_input: u64 = 0;
7312
7313 for (i, size) in sizes.iter().enumerate() {
7314 aggregator.update(
7315 Price::from("100.00"),
7316 Quantity::new(*size as f64, 0),
7317 UnixNanos::from((i as u64 + 1) * 1_000),
7318 );
7319 total_input += *size;
7320 }
7321
7322 let bars = handler.lock().expect(MUTEX_POISONED);
7323
7324 for bar in bars.iter() {
7326 prop_assert_eq!(bar.volume, Quantity::from(step));
7327 prop_assert!(bar.low <= bar.open);
7328 prop_assert!(bar.low <= bar.close);
7329 prop_assert!(bar.high >= bar.open);
7330 prop_assert!(bar.high >= bar.close);
7331 }
7332
7333 let emitted_total: u64 = bars.len() as u64 * step;
7335 let pending = aggregator.core.builder.volume.as_f64();
7336 prop_assert!((emitted_total as f64 + pending - total_input as f64).abs() < 1e-6);
7337 }
7338
7339 #[rstest]
7340 fn prop_bar_builder_spread_adjustment_is_additive(
7341 updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7342 spread_cents in -10_000i64..=10_000i64,
7343 backward in any::<bool>(),
7344 ) {
7345 let instrument = InstrumentAny::Equity(equity_aapl());
7346 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7347 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7348 let mut builder = BarBuilder::new(bar_type, 2, 0);
7349
7350 let spread = Decimal::new(spread_cents, 2);
7351 let mode = if backward {
7352 ContinuousFutureAdjustmentType::BackwardSpread
7353 } else {
7354 ContinuousFutureAdjustmentType::ForwardSpread
7355 };
7356 builder.set_adjustment(spread, mode);
7357
7358 let mut min_cents = i64::MAX;
7359 let mut max_cents = i64::MIN;
7360
7361 for (i, (price_cents, size)) in updates.iter().enumerate() {
7362 if *price_cents < min_cents {
7363 min_cents = *price_cents;
7364 }
7365
7366 if *price_cents > max_cents {
7367 max_cents = *price_cents;
7368 }
7369
7370 builder.update(
7371 Price::new((*price_cents as f64) / 100.0, 2),
7372 Quantity::new(*size as f64, 0),
7373 UnixNanos::from((i as u64 + 1) * 1_000),
7374 );
7375 }
7376
7377 let bar = builder.build_now();
7378 let first_decimal = Decimal::new(updates.first().unwrap().0, 2);
7379 let last_decimal = Decimal::new(updates.last().unwrap().0, 2);
7380 let min_decimal = Decimal::new(min_cents, 2);
7381 let max_decimal = Decimal::new(max_cents, 2);
7382
7383 prop_assert_eq!(bar.open.as_decimal(), first_decimal + spread);
7384 prop_assert_eq!(bar.close.as_decimal(), last_decimal + spread);
7385 prop_assert_eq!(bar.low.as_decimal(), min_decimal + spread);
7386 prop_assert_eq!(bar.high.as_decimal(), max_decimal + spread);
7387 }
7388
7389 #[rstest]
7390 fn prop_bar_builder_inactive_adjustment_is_identity(
7391 updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=20),
7392 use_ratio in any::<bool>(),
7393 ) {
7394 let instrument = InstrumentAny::Equity(equity_aapl());
7395 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7396 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7397
7398 let mut adjusted = BarBuilder::new(bar_type, 2, 0);
7399 let mut baseline = BarBuilder::new(bar_type, 2, 0);
7400
7401 let (input, mode) = if use_ratio {
7403 (Decimal::ONE, ContinuousFutureAdjustmentType::BackwardRatio)
7404 } else {
7405 (Decimal::ZERO, ContinuousFutureAdjustmentType::BackwardSpread)
7406 };
7407 adjusted.set_adjustment(input, mode);
7408
7409 for (i, (price_cents, size)) in updates.iter().enumerate() {
7410 let price = Price::new((*price_cents as f64) / 100.0, 2);
7411 let qty = Quantity::new(*size as f64, 0);
7412 let ts = UnixNanos::from((i as u64 + 1) * 1_000);
7413 adjusted.update(price, qty, ts);
7414 baseline.update(price, qty, ts);
7415 }
7416
7417 let bar_adjusted = adjusted.build_now();
7418 let bar_baseline = baseline.build_now();
7419 prop_assert_eq!(bar_adjusted.open, bar_baseline.open);
7420 prop_assert_eq!(bar_adjusted.high, bar_baseline.high);
7421 prop_assert_eq!(bar_adjusted.low, bar_baseline.low);
7422 prop_assert_eq!(bar_adjusted.close, bar_baseline.close);
7423 prop_assert_eq!(bar_adjusted.volume, bar_baseline.volume);
7424 }
7425
7426 #[rstest]
7427 fn prop_bar_builder_spread_preserves_raw_arithmetic(
7428 updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7429 spread_micro in -10_000i64..=10_000i64,
7432 ) {
7433 let instrument = InstrumentAny::Equity(equity_aapl());
7434 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7435 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7436 let mut builder = BarBuilder::new(bar_type, 2, 0);
7437
7438 let spread = Decimal::new(spread_micro, 4);
7439 builder.set_adjustment(spread, ContinuousFutureAdjustmentType::BackwardSpread);
7440
7441 let adjustment_raw_i128 = mantissa_exponent_to_fixed_i128(
7442 spread.mantissa(),
7443 -(spread.scale() as i8),
7444 FIXED_PRECISION,
7445 )
7446 .expect("scale within range");
7447 #[allow(
7448 clippy::useless_conversion,
7449 reason = "i128 to PriceRaw is real when not high-precision"
7450 )]
7451 let expected_adjustment_raw: PriceRaw =
7452 adjustment_raw_i128.try_into().expect("within PriceRaw range");
7453
7454 let mut min_cents = i64::MAX;
7455 let mut max_cents = i64::MIN;
7456 let mut last_price = Price::new(0.0, 2);
7457 let mut first_price = Price::new(0.0, 2);
7458
7459 for (i, (price_cents, size)) in updates.iter().enumerate() {
7460 if *price_cents < min_cents {
7461 min_cents = *price_cents;
7462 }
7463
7464 if *price_cents > max_cents {
7465 max_cents = *price_cents;
7466 }
7467
7468 let price = Price::new((*price_cents as f64) / 100.0, 2);
7469
7470 if i == 0 {
7471 first_price = price;
7472 }
7473
7474 last_price = price;
7475 builder.update(
7476 price,
7477 Quantity::new(*size as f64, 0),
7478 UnixNanos::from((i as u64 + 1) * 1_000),
7479 );
7480 }
7481
7482 let bar = builder.build_now();
7483 let min_price = Price::new((min_cents as f64) / 100.0, 2);
7484 let max_price = Price::new((max_cents as f64) / 100.0, 2);
7485 prop_assert_eq!(bar.open.raw, first_price.raw + expected_adjustment_raw);
7486 prop_assert_eq!(bar.close.raw, last_price.raw + expected_adjustment_raw);
7487 prop_assert_eq!(bar.low.raw, min_price.raw + expected_adjustment_raw);
7488 prop_assert_eq!(bar.high.raw, max_price.raw + expected_adjustment_raw);
7489 prop_assert_eq!(bar.open.precision, 2);
7490 prop_assert_eq!(bar.high.precision, 2);
7491 prop_assert_eq!(bar.low.precision, 2);
7492 prop_assert_eq!(bar.close.precision, 2);
7493 }
7494
7495 #[rstest]
7496 fn prop_bar_builder_active_ratio_scales_each_ohlc(
7497 updates in prop::collection::vec((1_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7498 ratio_centi in prop_oneof![50i64..=99i64, 101i64..=200i64],
7500 backward in any::<bool>(),
7501 ) {
7502 let instrument = InstrumentAny::Equity(equity_aapl());
7503 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7504 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7505 let mut builder = BarBuilder::new(bar_type, 2, 0);
7506
7507 let ratio_decimal = Decimal::new(ratio_centi, 2);
7508 let ratio_f64 = (ratio_centi as f64) / 100.0;
7509 let mode = if backward {
7510 ContinuousFutureAdjustmentType::BackwardRatio
7511 } else {
7512 ContinuousFutureAdjustmentType::ForwardRatio
7513 };
7514 builder.set_adjustment(ratio_decimal, mode);
7515
7516 let mut min_cents = i64::MAX;
7517 let mut max_cents = i64::MIN;
7518 let mut first_cents = 0i64;
7519 let mut last_cents = 0i64;
7520
7521 for (i, (price_cents, size)) in updates.iter().enumerate() {
7522 if *price_cents < min_cents {
7523 min_cents = *price_cents;
7524 }
7525
7526 if *price_cents > max_cents {
7527 max_cents = *price_cents;
7528 }
7529
7530 if i == 0 {
7531 first_cents = *price_cents;
7532 }
7533
7534 last_cents = *price_cents;
7535 builder.update(
7536 Price::new((*price_cents as f64) / 100.0, 2),
7537 Quantity::new(*size as f64, 0),
7538 UnixNanos::from((i as u64 + 1) * 1_000),
7539 );
7540 }
7541
7542 let bar = builder.build_now();
7543 let expect = |cents: i64| Price::new((cents as f64) / 100.0 * ratio_f64, 2);
7545 prop_assert_eq!(bar.open, expect(first_cents));
7546 prop_assert_eq!(bar.close, expect(last_cents));
7547 prop_assert_eq!(bar.low, expect(min_cents));
7549 prop_assert_eq!(bar.high, expect(max_cents));
7550 }
7551
7552 #[rstest]
7553 fn prop_bar_builder_spread_mode_direction_is_metadata_only(
7554 updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7555 spread_cents in -10_000i64..=10_000i64,
7556 ) {
7557 let instrument = InstrumentAny::Equity(equity_aapl());
7558 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7559 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7560
7561 let spread = Decimal::new(spread_cents, 2);
7562 let mut backward = BarBuilder::new(bar_type, 2, 0);
7563 let mut forward = BarBuilder::new(bar_type, 2, 0);
7564 backward.set_adjustment(spread, ContinuousFutureAdjustmentType::BackwardSpread);
7565 forward.set_adjustment(spread, ContinuousFutureAdjustmentType::ForwardSpread);
7566
7567 for (i, (price_cents, size)) in updates.iter().enumerate() {
7568 let price = Price::new((*price_cents as f64) / 100.0, 2);
7569 let qty = Quantity::new(*size as f64, 0);
7570 let ts = UnixNanos::from((i as u64 + 1) * 1_000);
7571 backward.update(price, qty, ts);
7572 forward.update(price, qty, ts);
7573 }
7574
7575 let bar_backward = backward.build_now();
7576 let bar_forward = forward.build_now();
7577 prop_assert_eq!(bar_backward.open, bar_forward.open);
7578 prop_assert_eq!(bar_backward.high, bar_forward.high);
7579 prop_assert_eq!(bar_backward.low, bar_forward.low);
7580 prop_assert_eq!(bar_backward.close, bar_forward.close);
7581 }
7582
7583 #[rstest]
7584 fn prop_value_bar_aggregator_ohlc_invariants(
7585 ticks in prop::collection::vec((50i64..=500i64, 1u64..=20u64), 2..=30),
7586 step in 100u64..=2_000u64,
7587 ) {
7588 let instrument = InstrumentAny::Equity(equity_aapl());
7589 let bar_spec = BarSpecification::new(step as usize, BarAggregation::Value, PriceType::Last);
7590 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7591 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
7592 let handler_clone = Arc::clone(&handler);
7593
7594 let mut aggregator = ValueBarAggregator::new(
7595 bar_type,
7596 instrument.price_precision(),
7597 instrument.size_precision(),
7598 move |bar: Bar| {
7599 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
7600 },
7601 );
7602
7603 for (i, (price_cents, size)) in ticks.iter().enumerate() {
7604 aggregator.update(
7605 Price::new((*price_cents as f64) / 100.0, 2),
7606 Quantity::new(*size as f64, 0),
7607 UnixNanos::from((i as u64 + 1) * 1_000),
7608 );
7609 }
7610
7611 let bars = handler.lock().expect(MUTEX_POISONED);
7612 for bar in bars.iter() {
7613 prop_assert!(bar.low <= bar.open);
7614 prop_assert!(bar.low <= bar.close);
7615 prop_assert!(bar.high >= bar.open);
7616 prop_assert!(bar.high >= bar.close);
7617 prop_assert!(bar.volume.as_f64() > 0.0);
7618 }
7619 }
7620 }
7621}