1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use ahash::AHashMap;
30use chrono::{Duration, TimeDelta};
31use nautilus_common::{
32 clock::{Clock, TestClock},
33 timer::{TimeEvent, TimeEventCallback},
34};
35use nautilus_core::{
36 UnixNanos,
37 correctness::{self, FAILED},
38 datetime::{
39 add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos, subtract_n_months_nanos,
40 subtract_n_years_nanos,
41 },
42};
43use nautilus_model::{
44 data::{
45 QuoteTick, TradeTick,
46 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
47 },
48 enums::{
49 AggregationSource, AggressorSide, BarAggregation, BarIntervalType,
50 ContinuousFutureAdjustmentType,
51 },
52 identifiers::InstrumentId,
53 instruments::{FixedTickScheme, TickSchemeRule},
54 types::{
55 Price, Quantity,
56 fixed::{FIXED_PRECISION, FIXED_SCALAR, mantissa_exponent_to_fixed_i128},
57 price::PriceRaw,
58 quantity::QuantityRaw,
59 },
60};
61use rust_decimal::{Decimal, prelude::ToPrimitive};
62
/// Boxed callback invoked with each [`Bar`] an aggregator builds.
type BarHandler = Box<dyn FnMut(Bar)>;
65
66pub trait BarAggregator: Any + Debug {
70 fn bar_type(&self) -> BarType;
72 fn is_running(&self) -> bool;
74 fn set_is_running(&mut self, value: bool);
76 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
78 fn handle_quote(&mut self, quote: QuoteTick) {
80 let spec = self.bar_type().spec();
81 self.update(
82 quote.extract_price(spec.price_type),
83 quote.extract_size(spec.price_type),
84 quote.ts_init,
85 );
86 }
87 fn handle_trade(&mut self, trade: TradeTick) {
89 self.update(trade.price, trade.size, trade.ts_init);
90 }
91 fn handle_bar(&mut self, bar: Bar) {
93 self.update_bar(bar, bar.volume, bar.ts_init);
94 }
95 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
96 fn stop(&mut self) {}
98 fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
100 fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
102 fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
104 fn build_bar(&mut self, _event: &TimeEvent) {}
106 fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
110 fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
113}
114
115impl dyn BarAggregator {
116 pub fn as_any(&self) -> &dyn Any {
118 self
119 }
120 pub fn as_any_mut(&mut self) -> &mut dyn Any {
122 self
123 }
124}
125
/// Accumulates price/size updates into the open, high, low, close and volume
/// of a single bar, and produces [`Bar`] values on demand.
#[derive(Debug)]
pub struct BarBuilder {
    /// Bar type stamped on every bar built.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision; used for the zero volume set on construction and reset.
    size_precision: u8,
    /// Set to `true` once the first update has been accepted.
    initialized: bool,
    /// `ts_init` of the last accepted update; older updates are ignored.
    ts_last: UnixNanos,
    /// Number of updates accepted since the last reset.
    count: usize,
    /// Close of the most recently built bar; seeds a bar built with no updates.
    last_close: Option<Price>,
    /// Open of the bar in progress (`None` until the first update after a reset).
    open: Option<Price>,
    /// High of the bar in progress.
    high: Option<Price>,
    /// Low of the bar in progress.
    low: Option<Price>,
    /// Most recent price seen; not cleared by `reset`.
    close: Option<Price>,
    /// Volume accumulated since the last reset.
    volume: Quantity,
    /// Adjustment mode passed to the last `set_adjustment` call.
    adjustment_mode: ContinuousFutureAdjustmentType,
    /// Raw fixed-point amount added to prices for non-ratio adjustments.
    adjustment_raw: PriceRaw,
    /// Multiplier applied to prices for ratio adjustments.
    adjustment_ratio: f64,
    /// Whether incoming prices are adjusted at all.
    adjustment_active: bool,
    /// Whether the active adjustment is a ratio rather than a raw amount.
    adjustment_is_ratio: bool,
}
147
148impl BarBuilder {
149 #[must_use]
155 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
156 correctness::check_equal(
157 &bar_type.aggregation_source(),
158 &AggregationSource::Internal,
159 "bar_type.aggregation_source",
160 "AggregationSource::Internal",
161 )
162 .expect(FAILED);
163
164 Self {
165 bar_type,
166 price_precision,
167 size_precision,
168 initialized: false,
169 ts_last: UnixNanos::default(),
170 count: 0,
171 last_close: None,
172 open: None,
173 high: None,
174 low: None,
175 close: None,
176 volume: Quantity::zero(size_precision),
177 adjustment_mode: ContinuousFutureAdjustmentType::default(),
178 adjustment_raw: 0,
179 adjustment_ratio: 1.0,
180 adjustment_active: false,
181 adjustment_is_ratio: false,
182 }
183 }
184
185 pub fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
196 self.adjustment_mode = mode;
197
198 if mode.is_ratio() {
199 self.adjustment_is_ratio = true;
200 self.adjustment_ratio = adjustment.to_f64().unwrap_or(1.0);
201 self.adjustment_active = adjustment != Decimal::ONE;
202 return;
203 }
204
205 self.adjustment_is_ratio = false;
209 let exponent = -(adjustment.scale() as i8);
210 let raw_i128 =
211 mantissa_exponent_to_fixed_i128(adjustment.mantissa(), exponent, FIXED_PRECISION)
212 .expect("Failed to scale continuous-future adjustment to fixed precision");
213
214 #[allow(
215 clippy::useless_conversion,
216 reason = "i128 to PriceRaw is real when not high-precision"
217 )]
218 let raw: PriceRaw = raw_i128
219 .try_into()
220 .expect("Continuous-future adjustment exceeds PriceRaw range");
221
222 self.adjustment_raw = raw;
223 self.adjustment_active = self.adjustment_raw != 0;
224 }
225
226 fn apply_adjustment_to_price(&self, price: Price) -> Price {
227 if !self.adjustment_active {
228 return price;
229 }
230
231 if self.adjustment_is_ratio {
232 return Price::new(price.as_f64() * self.adjustment_ratio, price.precision);
235 }
236
237 Price::from_raw(price.raw + self.adjustment_raw, price.precision)
239 }
240
241 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
247 if ts_init < self.ts_last {
248 return; }
250
251 let price = self.apply_adjustment_to_price(price);
252
253 if self.open.is_none() {
254 self.open = Some(price);
255 self.high = Some(price);
256 self.low = Some(price);
257 self.initialized = true;
258 } else {
259 if price > self.high.unwrap() {
260 self.high = Some(price);
261 }
262
263 if price < self.low.unwrap() {
264 self.low = Some(price);
265 }
266 }
267
268 self.close = Some(price);
269 self.volume = self.volume.add(size);
270 self.count += 1;
271 self.ts_last = ts_init;
272 }
273
274 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
280 if ts_init < self.ts_last {
281 return; }
283
284 let bar_open = self.apply_adjustment_to_price(bar.open);
285 let bar_high = self.apply_adjustment_to_price(bar.high);
286 let bar_low = self.apply_adjustment_to_price(bar.low);
287 let bar_close = self.apply_adjustment_to_price(bar.close);
288
289 if self.open.is_none() {
290 self.open = Some(bar_open);
291 self.high = Some(bar_high);
292 self.low = Some(bar_low);
293 self.initialized = true;
294 } else {
295 if bar_high > self.high.unwrap() {
296 self.high = Some(bar_high);
297 }
298
299 if bar_low < self.low.unwrap() {
300 self.low = Some(bar_low);
301 }
302 }
303
304 self.close = Some(bar_close);
305 self.volume = self.volume.add(volume);
306 self.count += 1;
307 self.ts_last = ts_init;
308 }
309
310 pub fn reset(&mut self) {
315 self.open = None;
316 self.high = None;
317 self.low = None;
318 self.volume = Quantity::zero(self.size_precision);
319 self.count = 0;
320 }
321
322 pub fn build_now(&mut self) -> Bar {
324 self.build(self.ts_last, self.ts_last)
325 }
326
327 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
333 if self.open.is_none() {
334 self.open = self.last_close;
335 self.high = self.last_close;
336 self.low = self.last_close;
337 self.close = self.last_close;
338 }
339
340 if let (Some(close), Some(low)) = (self.close, self.low)
341 && close < low
342 {
343 self.low = Some(close);
344 }
345
346 if let (Some(close), Some(high)) = (self.close, self.high)
347 && close > high
348 {
349 self.high = Some(close);
350 }
351
352 let bar = Bar::new(
354 self.bar_type,
355 self.open.unwrap(),
356 self.high.unwrap(),
357 self.low.unwrap(),
358 self.close.unwrap(),
359 self.volume,
360 ts_event,
361 ts_init,
362 );
363
364 self.last_close = self.close;
365 self.reset();
366 bar
367 }
368}
369
/// State shared by the aggregators in this file: the bar type, a
/// [`BarBuilder`], the handler that receives built bars and a running flag.
pub struct BarAggregatorCore {
    /// Bar type this core was created with.
    bar_type: BarType,
    /// Builder accumulating the bar in progress.
    builder: BarBuilder,
    /// Callback invoked with every bar that is built and sent.
    handler: BarHandler,
    /// Running flag; `false` on construction.
    is_running: bool,
}
377
378impl Debug for BarAggregatorCore {
379 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380 f.debug_struct(stringify!(BarAggregatorCore))
381 .field("bar_type", &self.bar_type)
382 .field("builder", &self.builder)
383 .field("is_running", &self.is_running)
384 .finish()
385 }
386}
387
388impl BarAggregatorCore {
389 pub fn new<H: FnMut(Bar) + 'static>(
395 bar_type: BarType,
396 price_precision: u8,
397 size_precision: u8,
398 handler: H,
399 ) -> Self {
400 Self {
401 bar_type,
402 builder: BarBuilder::new(bar_type, price_precision, size_precision),
403 handler: Box::new(handler),
404 is_running: false,
405 }
406 }
407
408 pub const fn set_is_running(&mut self, value: bool) {
410 self.is_running = value;
411 }
412 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
413 self.builder.update(price, size, ts_init);
414 }
415
416 fn build_now_and_send(&mut self) {
417 let bar = self.builder.build_now();
418 (self.handler)(bar);
419 }
420
421 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
422 let bar = self.builder.build(ts_event, ts_init);
423 (self.handler)(bar);
424 }
425}
426
/// Aggregator that emits a bar once the number of accepted updates reaches
/// the bar specification's step.
pub struct TickBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
}
434
435impl Debug for TickBarAggregator {
436 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437 f.debug_struct(stringify!(TickBarAggregator))
438 .field("core", &self.core)
439 .finish()
440 }
441}
442
443impl TickBarAggregator {
444 pub fn new<H: FnMut(Bar) + 'static>(
450 bar_type: BarType,
451 price_precision: u8,
452 size_precision: u8,
453 handler: H,
454 ) -> Self {
455 Self {
456 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
457 }
458 }
459}
460
461impl BarAggregator for TickBarAggregator {
462 fn bar_type(&self) -> BarType {
463 self.core.bar_type
464 }
465
466 fn is_running(&self) -> bool {
467 self.core.is_running
468 }
469
470 fn set_is_running(&mut self, value: bool) {
471 self.core.set_is_running(value);
472 }
473
474 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
476 self.core.apply_update(price, size, ts_init);
477 let spec = self.core.bar_type.spec();
478
479 if self.core.builder.count >= spec.step.get() {
480 self.core.build_now_and_send();
481 }
482 }
483
484 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
485 self.core.builder.update_bar(bar, volume, ts_init);
486 let spec = self.core.bar_type.spec();
487
488 if self.core.builder.count >= spec.step.get() {
489 self.core.build_now_and_send();
490 }
491 }
492}
493
/// Aggregator that emits a bar once the absolute buy/sell trade-count
/// imbalance reaches the bar specification's step.
pub struct TickImbalanceBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Signed running count: +1 per buyer-aggressor trade, -1 per
    /// seller-aggressor trade; reset to zero when a bar is emitted.
    imbalance: isize,
}
502
503impl Debug for TickImbalanceBarAggregator {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct(stringify!(TickImbalanceBarAggregator))
506 .field("core", &self.core)
507 .field("imbalance", &self.imbalance)
508 .finish()
509 }
510}
511
512impl TickImbalanceBarAggregator {
513 pub fn new<H: FnMut(Bar) + 'static>(
519 bar_type: BarType,
520 price_precision: u8,
521 size_precision: u8,
522 handler: H,
523 ) -> Self {
524 Self {
525 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
526 imbalance: 0,
527 }
528 }
529}
530
531impl BarAggregator for TickImbalanceBarAggregator {
532 fn bar_type(&self) -> BarType {
533 self.core.bar_type
534 }
535
536 fn is_running(&self) -> bool {
537 self.core.is_running
538 }
539
540 fn set_is_running(&mut self, value: bool) {
541 self.core.set_is_running(value);
542 }
543
544 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
549 self.core.apply_update(price, size, ts_init);
550 }
551
552 fn handle_trade(&mut self, trade: TradeTick) {
553 self.core
554 .apply_update(trade.price, trade.size, trade.ts_init);
555
556 let delta = match trade.aggressor_side {
557 AggressorSide::Buyer => 1,
558 AggressorSide::Seller => -1,
559 AggressorSide::NoAggressor => 0,
560 };
561
562 if delta == 0 {
563 return;
564 }
565
566 self.imbalance += delta;
567 let threshold = self.core.bar_type.spec().step.get();
568 if self.imbalance.unsigned_abs() >= threshold {
569 self.core.build_now_and_send();
570 self.imbalance = 0;
571 }
572 }
573
574 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
575 self.core.builder.update_bar(bar, volume, ts_init);
576 }
577}
578
/// Aggregator that emits a bar once a run of consecutive same-side trades
/// reaches the bar specification's step.
pub struct TickRunsBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress; `None` before the first sided
    /// trade and after a bar is emitted.
    current_run_side: Option<AggressorSide>,
    /// Number of sided trades in the current run.
    run_count: usize,
}
585
586impl Debug for TickRunsBarAggregator {
587 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588 f.debug_struct(stringify!(TickRunsBarAggregator))
589 .field("core", &self.core)
590 .field("current_run_side", &self.current_run_side)
591 .field("run_count", &self.run_count)
592 .finish()
593 }
594}
595
596impl TickRunsBarAggregator {
597 pub fn new<H: FnMut(Bar) + 'static>(
603 bar_type: BarType,
604 price_precision: u8,
605 size_precision: u8,
606 handler: H,
607 ) -> Self {
608 Self {
609 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
610 current_run_side: None,
611 run_count: 0,
612 }
613 }
614}
615
616impl BarAggregator for TickRunsBarAggregator {
617 fn bar_type(&self) -> BarType {
618 self.core.bar_type
619 }
620
621 fn is_running(&self) -> bool {
622 self.core.is_running
623 }
624
625 fn set_is_running(&mut self, value: bool) {
626 self.core.set_is_running(value);
627 }
628
629 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
634 self.core.apply_update(price, size, ts_init);
635 }
636
637 fn handle_trade(&mut self, trade: TradeTick) {
638 let side = match trade.aggressor_side {
639 AggressorSide::Buyer => Some(AggressorSide::Buyer),
640 AggressorSide::Seller => Some(AggressorSide::Seller),
641 AggressorSide::NoAggressor => None,
642 };
643
644 if let Some(side) = side {
645 if self.current_run_side != Some(side) {
646 self.current_run_side = Some(side);
647 self.run_count = 0;
648 self.core.builder.reset();
649 }
650
651 self.core
652 .apply_update(trade.price, trade.size, trade.ts_init);
653 self.run_count += 1;
654
655 let threshold = self.core.bar_type.spec().step.get();
656 if self.run_count >= threshold {
657 self.core.build_now_and_send();
658 self.run_count = 0;
659 self.current_run_side = None;
660 }
661 } else {
662 self.core
663 .apply_update(trade.price, trade.size, trade.ts_init);
664 }
665 }
666
667 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
668 self.core.builder.update_bar(bar, volume, ts_init);
669 }
670}
671
/// Aggregator that emits a bar each time the accumulated volume reaches the
/// bar specification's step, splitting updates across bars when needed.
pub struct VolumeBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
}
676
677impl Debug for VolumeBarAggregator {
678 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
679 f.debug_struct(stringify!(VolumeBarAggregator))
680 .field("core", &self.core)
681 .finish()
682 }
683}
684
685impl VolumeBarAggregator {
686 pub fn new<H: FnMut(Bar) + 'static>(
692 bar_type: BarType,
693 price_precision: u8,
694 size_precision: u8,
695 handler: H,
696 ) -> Self {
697 Self {
698 core: BarAggregatorCore::new(
699 bar_type.standard(),
700 price_precision,
701 size_precision,
702 handler,
703 ),
704 }
705 }
706}
707
708impl BarAggregator for VolumeBarAggregator {
709 fn bar_type(&self) -> BarType {
710 self.core.bar_type
711 }
712
713 fn is_running(&self) -> bool {
714 self.core.is_running
715 }
716
717 fn set_is_running(&mut self, value: bool) {
718 self.core.set_is_running(value);
719 }
720
721 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
723 let mut raw_size_update = size.raw;
724 let spec = self.core.bar_type.spec();
725 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
726
727 while raw_size_update > 0 {
728 if self.core.builder.volume.raw + raw_size_update < raw_step {
729 self.core.apply_update(
730 price,
731 Quantity::from_raw(raw_size_update, size.precision),
732 ts_init,
733 );
734 break;
735 }
736
737 let raw_size_diff = raw_step - self.core.builder.volume.raw;
738 self.core.apply_update(
739 price,
740 Quantity::from_raw(raw_size_diff, size.precision),
741 ts_init,
742 );
743
744 self.core.build_now_and_send();
745 raw_size_update -= raw_size_diff;
746 }
747 }
748
749 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
750 let mut raw_volume_update = volume.raw;
751 let spec = self.core.bar_type.spec();
752 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
753
754 while raw_volume_update > 0 {
755 if self.core.builder.volume.raw + raw_volume_update < raw_step {
756 self.core.builder.update_bar(
757 bar,
758 Quantity::from_raw(raw_volume_update, volume.precision),
759 ts_init,
760 );
761 break;
762 }
763
764 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
765 self.core.builder.update_bar(
766 bar,
767 Quantity::from_raw(raw_volume_diff, volume.precision),
768 ts_init,
769 );
770
771 self.core.build_now_and_send();
772 raw_volume_update -= raw_volume_diff;
773 }
774 }
775}
776
/// Aggregator that emits a bar once the absolute buy/sell volume imbalance
/// reaches the bar specification's step.
pub struct VolumeImbalanceBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Signed raw volume imbalance: buyer volume adds, seller volume
    /// subtracts; reset to zero when a bar is emitted.
    imbalance_raw: i128,
    /// Bar specification step multiplied by `FIXED_SCALAR`, as a raw amount.
    raw_step: i128,
}
783
784impl Debug for VolumeImbalanceBarAggregator {
785 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
786 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
787 .field("core", &self.core)
788 .field("imbalance_raw", &self.imbalance_raw)
789 .field("raw_step", &self.raw_step)
790 .finish()
791 }
792}
793
794impl VolumeImbalanceBarAggregator {
795 pub fn new<H: FnMut(Bar) + 'static>(
801 bar_type: BarType,
802 price_precision: u8,
803 size_precision: u8,
804 handler: H,
805 ) -> Self {
806 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
807 Self {
808 core: BarAggregatorCore::new(
809 bar_type.standard(),
810 price_precision,
811 size_precision,
812 handler,
813 ),
814 imbalance_raw: 0,
815 raw_step,
816 }
817 }
818}
819
820impl BarAggregator for VolumeImbalanceBarAggregator {
821 fn bar_type(&self) -> BarType {
822 self.core.bar_type
823 }
824
825 fn is_running(&self) -> bool {
826 self.core.is_running
827 }
828
829 fn set_is_running(&mut self, value: bool) {
830 self.core.set_is_running(value);
831 }
832
833 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
838 self.core.apply_update(price, size, ts_init);
839 }
840
841 fn handle_trade(&mut self, trade: TradeTick) {
842 let side = match trade.aggressor_side {
843 AggressorSide::Buyer => 1,
844 AggressorSide::Seller => -1,
845 AggressorSide::NoAggressor => {
846 self.core
847 .apply_update(trade.price, trade.size, trade.ts_init);
848 return;
849 }
850 };
851
852 let mut raw_remaining = trade.size.raw as i128;
853 while raw_remaining > 0 {
854 let imbalance_abs = self.imbalance_raw.abs();
855 let needed = (self.raw_step - imbalance_abs).max(1);
856 let raw_chunk = raw_remaining.min(needed);
857 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
858
859 self.core
860 .apply_update(trade.price, qty_chunk, trade.ts_init);
861
862 self.imbalance_raw += side * raw_chunk;
863 raw_remaining -= raw_chunk;
864
865 if self.imbalance_raw.abs() >= self.raw_step {
866 self.core.build_now_and_send();
867 self.imbalance_raw = 0;
868 }
869 }
870 }
871
872 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
873 self.core.builder.update_bar(bar, volume, ts_init);
874 }
875}
876
/// Aggregator that emits a bar once the volume of a run of same-side trades
/// reaches the bar specification's step.
pub struct VolumeRunsBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress; `None` before the first sided
    /// trade and after a bar is emitted.
    current_run_side: Option<AggressorSide>,
    /// Raw volume accumulated by the current run.
    run_volume_raw: QuantityRaw,
    /// Bar specification step multiplied by `FIXED_SCALAR`, as a raw amount.
    raw_step: QuantityRaw,
}
884
885impl Debug for VolumeRunsBarAggregator {
886 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887 f.debug_struct(stringify!(VolumeRunsBarAggregator))
888 .field("core", &self.core)
889 .field("current_run_side", &self.current_run_side)
890 .field("run_volume_raw", &self.run_volume_raw)
891 .field("raw_step", &self.raw_step)
892 .finish()
893 }
894}
895
896impl VolumeRunsBarAggregator {
897 pub fn new<H: FnMut(Bar) + 'static>(
903 bar_type: BarType,
904 price_precision: u8,
905 size_precision: u8,
906 handler: H,
907 ) -> Self {
908 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
909 Self {
910 core: BarAggregatorCore::new(
911 bar_type.standard(),
912 price_precision,
913 size_precision,
914 handler,
915 ),
916 current_run_side: None,
917 run_volume_raw: 0,
918 raw_step,
919 }
920 }
921}
922
923impl BarAggregator for VolumeRunsBarAggregator {
924 fn bar_type(&self) -> BarType {
925 self.core.bar_type
926 }
927
928 fn is_running(&self) -> bool {
929 self.core.is_running
930 }
931
932 fn set_is_running(&mut self, value: bool) {
933 self.core.set_is_running(value);
934 }
935
936 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
941 self.core.apply_update(price, size, ts_init);
942 }
943
944 fn handle_trade(&mut self, trade: TradeTick) {
945 let side = match trade.aggressor_side {
946 AggressorSide::Buyer => Some(AggressorSide::Buyer),
947 AggressorSide::Seller => Some(AggressorSide::Seller),
948 AggressorSide::NoAggressor => None,
949 };
950
951 let Some(side) = side else {
952 self.core
953 .apply_update(trade.price, trade.size, trade.ts_init);
954 return;
955 };
956
957 if self.current_run_side != Some(side) {
958 self.current_run_side = Some(side);
959 self.run_volume_raw = 0;
960 self.core.builder.reset();
961 }
962
963 let mut raw_remaining = trade.size.raw;
964 while raw_remaining > 0 {
965 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
966 let raw_chunk = raw_remaining.min(needed);
967
968 self.core.apply_update(
969 trade.price,
970 Quantity::from_raw(raw_chunk, trade.size.precision),
971 trade.ts_init,
972 );
973
974 self.run_volume_raw += raw_chunk;
975 raw_remaining -= raw_chunk;
976
977 if self.run_volume_raw >= self.raw_step {
978 self.core.build_now_and_send();
979 self.run_volume_raw = 0;
980 self.current_run_side = None;
981 }
982 }
983 }
984
985 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
986 self.core.builder.update_bar(bar, volume, ts_init);
987 }
988}
989
/// Aggregator that emits a bar each time the accumulated value (price times
/// size) reaches the bar specification's step.
pub struct ValueBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Value accumulated toward the next bar; reset to zero when a bar is emitted.
    cum_value: f64,
}
998
999impl Debug for ValueBarAggregator {
1000 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1001 f.debug_struct(stringify!(ValueBarAggregator))
1002 .field("core", &self.core)
1003 .field("cum_value", &self.cum_value)
1004 .finish()
1005 }
1006}
1007
1008impl ValueBarAggregator {
1009 pub fn new<H: FnMut(Bar) + 'static>(
1015 bar_type: BarType,
1016 price_precision: u8,
1017 size_precision: u8,
1018 handler: H,
1019 ) -> Self {
1020 Self {
1021 core: BarAggregatorCore::new(
1022 bar_type.standard(),
1023 price_precision,
1024 size_precision,
1025 handler,
1026 ),
1027 cum_value: 0.0,
1028 }
1029 }
1030
1031 #[must_use]
1032 pub const fn get_cumulative_value(&self) -> f64 {
1034 self.cum_value
1035 }
1036}
1037
1038impl BarAggregator for ValueBarAggregator {
1039 fn bar_type(&self) -> BarType {
1040 self.core.bar_type
1041 }
1042
1043 fn is_running(&self) -> bool {
1044 self.core.is_running
1045 }
1046
1047 fn set_is_running(&mut self, value: bool) {
1048 self.core.set_is_running(value);
1049 }
1050
1051 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1053 let mut size_update = size.as_f64();
1054 let spec = self.core.bar_type.spec();
1055
1056 while size_update > 0.0 {
1057 let value_update = price.as_f64() * size_update;
1058 if value_update == 0.0 {
1059 self.core
1061 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
1062 break;
1063 }
1064
1065 if self.cum_value + value_update < spec.step.get() as f64 {
1066 self.cum_value += value_update;
1067 self.core
1068 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
1069 break;
1070 }
1071
1072 let value_diff = spec.step.get() as f64 - self.cum_value;
1073 let mut size_diff = size_update * (value_diff / value_update);
1074
1075 if is_below_min_size(size_diff, size.precision) {
1077 if is_below_min_size(size_update, size.precision) {
1078 break;
1079 }
1080 size_diff = min_size_f64(size.precision);
1081 }
1082
1083 self.core
1084 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1085
1086 self.core.build_now_and_send();
1087 self.cum_value = 0.0;
1088 size_update -= size_diff;
1089 }
1090 }
1091
1092 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1093 let mut volume_update = volume;
1094 let average_price = Price::new(
1095 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1096 self.core.builder.price_precision,
1097 );
1098
1099 while volume_update.as_f64() > 0.0 {
1100 let value_update = average_price.as_f64() * volume_update.as_f64();
1101 if value_update == 0.0 {
1102 self.core.builder.update_bar(bar, volume_update, ts_init);
1104 break;
1105 }
1106
1107 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1108 self.cum_value += value_update;
1109 self.core.builder.update_bar(bar, volume_update, ts_init);
1110 break;
1111 }
1112
1113 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1114 let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1115
1116 if is_below_min_size(volume_diff, volume_update.precision) {
1118 if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1119 break;
1120 }
1121 volume_diff = min_size_f64(volume_update.precision);
1122 }
1123
1124 self.core.builder.update_bar(
1125 bar,
1126 Quantity::new(volume_diff, volume_update.precision),
1127 ts_init,
1128 );
1129
1130 self.core.build_now_and_send();
1131 self.cum_value = 0.0;
1132 volume_update = Quantity::new(
1133 volume_update.as_f64() - volume_diff,
1134 volume_update.precision,
1135 );
1136 }
1137 }
1138}
1139
/// Aggregator that emits a bar once the absolute buy/sell value imbalance
/// reaches the bar specification's step.
pub struct ValueImbalanceBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Signed value imbalance: buyer value adds, seller value subtracts;
    /// reset to zero when a bar is emitted.
    imbalance_value: f64,
    /// Bar specification step as an `f64`.
    step_value: f64,
}
1146
1147impl Debug for ValueImbalanceBarAggregator {
1148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1149 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1150 .field("core", &self.core)
1151 .field("imbalance_value", &self.imbalance_value)
1152 .field("step_value", &self.step_value)
1153 .finish()
1154 }
1155}
1156
1157impl ValueImbalanceBarAggregator {
1158 pub fn new<H: FnMut(Bar) + 'static>(
1164 bar_type: BarType,
1165 price_precision: u8,
1166 size_precision: u8,
1167 handler: H,
1168 ) -> Self {
1169 Self {
1170 core: BarAggregatorCore::new(
1171 bar_type.standard(),
1172 price_precision,
1173 size_precision,
1174 handler,
1175 ),
1176 imbalance_value: 0.0,
1177 step_value: bar_type.spec().step.get() as f64,
1178 }
1179 }
1180}
1181
impl BarAggregator for ValueImbalanceBarAggregator {
    /// Returns the bar type held by the core.
    fn bar_type(&self) -> BarType {
        self.core.bar_type
    }

    /// Returns the core's running flag.
    fn is_running(&self) -> bool {
        self.core.is_running
    }

    /// Sets the core's running flag.
    fn set_is_running(&mut self, value: bool) {
        self.core.set_is_running(value);
    }

    /// Records the observation without touching the imbalance or emitting a bar.
    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
        self.core.apply_update(price, size, ts_init);
    }

    /// Feeds the trade's value (price times size) into the signed imbalance
    /// in chunks, emitting a bar each time the imbalance magnitude reaches
    /// the step value.
    ///
    /// Trades with a zero price or without an aggressor are recorded but
    /// leave the imbalance unchanged.
    fn handle_trade(&mut self, trade: TradeTick) {
        let price_f64 = trade.price.as_f64();
        // Chunk sizes below are computed as value / price, so a zero price
        // cannot be apportioned; just record the trade.
        if price_f64 == 0.0 {
            self.core
                .apply_update(trade.price, trade.size, trade.ts_init);
            return;
        }

        let side_sign = match trade.aggressor_side {
            AggressorSide::Buyer => 1.0,
            AggressorSide::Seller => -1.0,
            AggressorSide::NoAggressor => {
                self.core
                    .apply_update(trade.price, trade.size, trade.ts_init);
                return;
            }
        };

        let mut size_remaining = trade.size.as_f64();
        while size_remaining > 0.0 {
            let value_remaining = price_f64 * size_remaining;

            #[allow(clippy::float_cmp, reason = "exact-zero check on accumulator")]
            if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
                // Same side as the imbalance (or no imbalance yet): the trade
                // pushes the imbalance toward the threshold.
                let needed = self.step_value - self.imbalance_value.abs();
                if value_remaining <= needed {
                    // Everything that is left fits; this is the final chunk.
                    self.imbalance_value += side_sign * value_remaining;
                    self.core.apply_update(
                        trade.price,
                        Quantity::new(size_remaining, trade.size.precision),
                        trade.ts_init,
                    );

                    if self.imbalance_value.abs() >= self.step_value {
                        self.core.build_now_and_send();
                        self.imbalance_value = 0.0;
                    }
                    break;
                }

                // Otherwise apply only the size whose value reaches the threshold.
                let mut value_chunk = needed;
                let mut size_chunk = value_chunk / price_f64;

                // Clamp the chunk up to the minimum size (recomputing its
                // value); give up when even the remaining size is below it.
                if is_below_min_size(size_chunk, trade.size.precision) {
                    if is_below_min_size(size_remaining, trade.size.precision) {
                        break;
                    }
                    size_chunk = min_size_f64(trade.size.precision);
                    value_chunk = price_f64 * size_chunk;
                }

                self.core.apply_update(
                    trade.price,
                    Quantity::new(size_chunk, trade.size.precision),
                    trade.ts_init,
                );
                self.imbalance_value += side_sign * value_chunk;
                size_remaining -= size_chunk;

                if self.imbalance_value.abs() >= self.step_value {
                    self.core.build_now_and_send();
                    self.imbalance_value = 0.0;
                }
            } else {
                // Opposite side: first consume as much of the trade as is
                // needed to bring the existing imbalance back to zero.
                let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
                let mut size_chunk = value_to_flatten / price_f64;

                // Same minimum-size clamp as in the same-side branch.
                if is_below_min_size(size_chunk, trade.size.precision) {
                    if is_below_min_size(size_remaining, trade.size.precision) {
                        break;
                    }
                    size_chunk = min_size_f64(trade.size.precision);
                    value_to_flatten = price_f64 * size_chunk;
                }

                self.core.apply_update(
                    trade.price,
                    Quantity::new(size_chunk, trade.size.precision),
                    trade.ts_init,
                );
                self.imbalance_value += side_sign * value_to_flatten;

                // The clamped chunk can carry more value than the imbalance
                // held, so the threshold is checked here as well.
                if self.imbalance_value.abs() >= self.step_value {
                    self.core.build_now_and_send();
                    self.imbalance_value = 0.0;
                }
                size_remaining -= size_chunk;
            }
        }
    }

    /// Records the bar without touching the imbalance or emitting a bar.
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
        self.core.builder.update_bar(bar, volume, ts_init);
    }
}
1302
/// Aggregator that emits a bar once the value of a run of same-side trades
/// reaches the bar specification's step.
pub struct ValueRunsBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress; `None` before the first sided
    /// trade and after a bar is emitted.
    current_run_side: Option<AggressorSide>,
    /// Value accumulated by the current run.
    run_value: f64,
    /// Bar specification step as an `f64`.
    step_value: f64,
}
1310
1311impl Debug for ValueRunsBarAggregator {
1312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1313 f.debug_struct(stringify!(ValueRunsBarAggregator))
1314 .field("core", &self.core)
1315 .field("current_run_side", &self.current_run_side)
1316 .field("run_value", &self.run_value)
1317 .field("step_value", &self.step_value)
1318 .finish()
1319 }
1320}
1321
1322impl ValueRunsBarAggregator {
1323 pub fn new<H: FnMut(Bar) + 'static>(
1329 bar_type: BarType,
1330 price_precision: u8,
1331 size_precision: u8,
1332 handler: H,
1333 ) -> Self {
1334 Self {
1335 core: BarAggregatorCore::new(
1336 bar_type.standard(),
1337 price_precision,
1338 size_precision,
1339 handler,
1340 ),
1341 current_run_side: None,
1342 run_value: 0.0,
1343 step_value: bar_type.spec().step.get() as f64,
1344 }
1345 }
1346}
1347
1348impl BarAggregator for ValueRunsBarAggregator {
1349 fn bar_type(&self) -> BarType {
1350 self.core.bar_type
1351 }
1352
1353 fn is_running(&self) -> bool {
1354 self.core.is_running
1355 }
1356
1357 fn set_is_running(&mut self, value: bool) {
1358 self.core.set_is_running(value);
1359 }
1360
1361 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1366 self.core.apply_update(price, size, ts_init);
1367 }
1368
1369 fn handle_trade(&mut self, trade: TradeTick) {
1370 let price_f64 = trade.price.as_f64();
1371 if price_f64 == 0.0 {
1372 self.core
1373 .apply_update(trade.price, trade.size, trade.ts_init);
1374 return;
1375 }
1376
1377 let side = match trade.aggressor_side {
1378 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1379 AggressorSide::Seller => Some(AggressorSide::Seller),
1380 AggressorSide::NoAggressor => None,
1381 };
1382
1383 let Some(side) = side else {
1384 self.core
1385 .apply_update(trade.price, trade.size, trade.ts_init);
1386 return;
1387 };
1388
1389 if self.current_run_side != Some(side) {
1390 self.current_run_side = Some(side);
1391 self.run_value = 0.0;
1392 self.core.builder.reset();
1393 }
1394
1395 let mut size_remaining = trade.size.as_f64();
1396 while size_remaining > 0.0 {
1397 let value_update = price_f64 * size_remaining;
1398 if self.run_value + value_update < self.step_value {
1399 self.run_value += value_update;
1400 self.core.apply_update(
1401 trade.price,
1402 Quantity::new(size_remaining, trade.size.precision),
1403 trade.ts_init,
1404 );
1405 break;
1406 }
1407
1408 let value_needed = self.step_value - self.run_value;
1409 let mut size_chunk = value_needed / price_f64;
1410
1411 if is_below_min_size(size_chunk, trade.size.precision) {
1413 if is_below_min_size(size_remaining, trade.size.precision) {
1414 break;
1415 }
1416 size_chunk = min_size_f64(trade.size.precision);
1417 }
1418
1419 self.core.apply_update(
1420 trade.price,
1421 Quantity::new(size_chunk, trade.size.precision),
1422 trade.ts_init,
1423 );
1424
1425 self.core.build_now_and_send();
1426 self.run_value = 0.0;
1427 self.current_run_side = None;
1428 size_remaining -= size_chunk;
1429 }
1430 }
1431
1432 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1433 self.core.builder.update_bar(bar, volume, ts_init);
1434 }
1435}
1436
/// Bar aggregator that emits fixed-size price "bricks".
///
/// A brick is emitted each time the incoming price (or the close of an incoming bar) has
/// moved at least `brick_size` raw price units away from the close of the last brick.
pub struct RenkoBarAggregator {
    /// Shared aggregation state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Brick height in raw fixed-point price units; computed by the constructor as the
    /// bar specification step multiplied by the raw price increment.
    pub brick_size: PriceRaw,
    /// Reference close for the next brick: the first observed price until a brick has been
    /// emitted, afterwards the close of the most recently emitted brick.
    last_close: Option<Price>,
}
1447
1448impl Debug for RenkoBarAggregator {
1449 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1450 f.debug_struct(stringify!(RenkoBarAggregator))
1451 .field("core", &self.core)
1452 .field("brick_size", &self.brick_size)
1453 .field("last_close", &self.last_close)
1454 .finish()
1455 }
1456}
1457
1458impl RenkoBarAggregator {
1459 pub fn new<H: FnMut(Bar) + 'static>(
1465 bar_type: BarType,
1466 price_precision: u8,
1467 size_precision: u8,
1468 price_increment: Price,
1469 handler: H,
1470 ) -> Self {
1471 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1473
1474 Self {
1475 core: BarAggregatorCore::new(
1476 bar_type.standard(),
1477 price_precision,
1478 size_precision,
1479 handler,
1480 ),
1481 brick_size,
1482 last_close: None,
1483 }
1484 }
1485}
1486
1487impl BarAggregator for RenkoBarAggregator {
1488 fn bar_type(&self) -> BarType {
1489 self.core.bar_type
1490 }
1491
1492 fn is_running(&self) -> bool {
1493 self.core.is_running
1494 }
1495
1496 fn set_is_running(&mut self, value: bool) {
1497 self.core.set_is_running(value);
1498 }
1499
1500 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1505 self.core.apply_update(price, size, ts_init);
1507
1508 if self.last_close.is_none() {
1510 self.last_close = Some(price);
1511 return;
1512 }
1513
1514 let last_close = self.last_close.unwrap();
1515
1516 let current_raw = price.raw;
1518 let last_close_raw = last_close.raw;
1519 let price_diff_raw = current_raw - last_close_raw;
1520 let abs_price_diff_raw = price_diff_raw.abs();
1521
1522 if abs_price_diff_raw >= self.brick_size {
1524 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1525 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1526 let mut current_close = last_close;
1527
1528 let total_volume = self.core.builder.volume;
1530
1531 for _i in 0..num_bricks {
1532 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1534 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1535
1536 let (brick_high, brick_low) = if direction > 0.0 {
1538 (brick_close, current_close)
1539 } else {
1540 (current_close, brick_close)
1541 };
1542
1543 self.core.builder.reset();
1545 self.core.builder.open = Some(current_close);
1546 self.core.builder.high = Some(brick_high);
1547 self.core.builder.low = Some(brick_low);
1548 self.core.builder.close = Some(brick_close);
1549 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1551 self.core.builder.ts_last = ts_init;
1552 self.core.builder.initialized = true;
1553
1554 self.core.build_and_send(ts_init, ts_init);
1556
1557 current_close = brick_close;
1559 self.last_close = Some(brick_close);
1560 }
1561 }
1562 }
1563
1564 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1565 self.core.builder.update_bar(bar, volume, ts_init);
1567
1568 if self.last_close.is_none() {
1570 self.last_close = Some(bar.close);
1571 return;
1572 }
1573
1574 let last_close = self.last_close.unwrap();
1575
1576 let current_raw = bar.close.raw;
1578 let last_close_raw = last_close.raw;
1579 let price_diff_raw = current_raw - last_close_raw;
1580 let abs_price_diff_raw = price_diff_raw.abs();
1581
1582 if abs_price_diff_raw >= self.brick_size {
1584 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1585 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1586 let mut current_close = last_close;
1587
1588 let total_volume = self.core.builder.volume;
1590
1591 for _i in 0..num_bricks {
1592 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1594 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1595
1596 let (brick_high, brick_low) = if direction > 0.0 {
1598 (brick_close, current_close)
1599 } else {
1600 (current_close, brick_close)
1601 };
1602
1603 self.core.builder.reset();
1605 self.core.builder.open = Some(current_close);
1606 self.core.builder.high = Some(brick_high);
1607 self.core.builder.low = Some(brick_low);
1608 self.core.builder.close = Some(brick_close);
1609 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1611 self.core.builder.ts_last = ts_init;
1612 self.core.builder.initialized = true;
1613
1614 self.core.build_and_send(ts_init, ts_init);
1616
1617 current_close = brick_close;
1619 self.last_close = Some(brick_close);
1620 }
1621 }
1622 }
1623}
1624
/// Bar aggregator that builds bars on clock timer events.
///
/// Fixed-length intervals use a repeating timer; `Month` and `Year` aggregations use a
/// one-shot time alert that is re-armed after every bar.
pub struct TimeBarAggregator {
    /// Shared aggregation state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Clock on which the bar timer / time alert is registered.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event is ignored if the builder saw no updates (`count == 0`).
    build_with_no_updates: bool,
    /// For left-open intervals: stamp bars with the timer event time instead of the stored
    /// open time.
    timestamp_on_close: bool,
    /// `true` when the aggregator was created with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar currently being built; set to the timer event time after every
    /// bar that is built.
    stored_open_ns: UnixNanos,
    /// Timer name, formatted as `TIME_BAR_{bar_type}`.
    timer_name: String,
    /// Bar interval in nanoseconds, as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Time at which the next bar is expected to close.
    next_close_ns: UnixNanos,
    /// Close time of the first bar; recorded when the timer starts if
    /// `skip_first_non_full_bar` is set.
    first_close_ns: UnixNanos,
    /// Delay added to the timer start time, in microseconds.
    bar_build_delay: u64,
    /// Optional origin offset passed to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// While `true`, bars whose `ts_init` is not after `first_close_ns` are discarded.
    skip_first_non_full_bar: bool,
    /// When `true`, updates drive a `TestClock` forward and replay the resulting timer events.
    pub historical_mode: bool,
    /// Events stored through `set_historical_events_internal`.
    historical_events: Vec<TimeEvent>,
    /// Timer event that coincides with the `ts_init` of the update being processed; it is
    /// built only after that update has been applied.
    historical_event_at_ts_init: Option<TimeEvent>,
    /// Weak handle to the shared wrapper around this aggregator, captured by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1647
1648impl Debug for TimeBarAggregator {
1649 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1650 f.debug_struct(stringify!(TimeBarAggregator))
1651 .field("core", &self.core)
1652 .field("build_with_no_updates", &self.build_with_no_updates)
1653 .field("timestamp_on_close", &self.timestamp_on_close)
1654 .field("is_left_open", &self.is_left_open)
1655 .field("timer_name", &self.timer_name)
1656 .field("interval_ns", &self.interval_ns)
1657 .field("bar_build_delay", &self.bar_build_delay)
1658 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1659 .finish()
1660 }
1661}
1662
1663impl TimeBarAggregator {
1664 #[expect(clippy::too_many_arguments)]
1670 pub fn new<H: FnMut(Bar) + 'static>(
1671 bar_type: BarType,
1672 price_precision: u8,
1673 size_precision: u8,
1674 clock: Rc<RefCell<dyn Clock>>,
1675 handler: H,
1676 build_with_no_updates: bool,
1677 timestamp_on_close: bool,
1678 interval_type: BarIntervalType,
1679 time_bars_origin_offset: Option<TimeDelta>,
1680 bar_build_delay: u64,
1681 skip_first_non_full_bar: bool,
1682 ) -> Self {
1683 let is_left_open = match interval_type {
1684 BarIntervalType::LeftOpen => true,
1685 BarIntervalType::RightOpen => false,
1686 };
1687
1688 let core = BarAggregatorCore::new(
1689 bar_type.standard(),
1690 price_precision,
1691 size_precision,
1692 handler,
1693 );
1694
1695 Self {
1696 core,
1697 clock,
1698 build_with_no_updates,
1699 timestamp_on_close,
1700 is_left_open,
1701 stored_open_ns: UnixNanos::default(),
1702 timer_name: format!("TIME_BAR_{bar_type}"),
1703 interval_ns: get_bar_interval_ns(&bar_type),
1704 next_close_ns: UnixNanos::default(),
1705 first_close_ns: UnixNanos::default(),
1706 bar_build_delay,
1707 time_bars_origin_offset,
1708 skip_first_non_full_bar,
1709 historical_mode: false,
1710 historical_events: Vec::new(),
1711 historical_event_at_ts_init: None,
1712 aggregator_weak: None,
1713 }
1714 }
1715
1716 pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1718 self.clock = clock;
1719 }
1720
1721 pub fn start_timer_internal(
1730 &mut self,
1731 aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1732 ) {
1733 let aggregator_weak = if let Some(rc) = aggregator_rc {
1735 let weak = Rc::downgrade(&rc);
1737 self.aggregator_weak = Some(weak.clone());
1738 weak
1739 } else {
1740 self.aggregator_weak
1742 .as_ref()
1743 .expect("Aggregator weak reference must be set before calling start_timer()")
1744 .clone()
1745 };
1746
1747 let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1748 if let Some(agg) = aggregator_weak.upgrade() {
1749 agg.borrow_mut().build_bar(&event);
1750 }
1751 }));
1752
1753 let now = self.clock.borrow().utc_now();
1755 let mut start_time =
1756 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1757 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1758
1759 let fire_immediately = start_time == now;
1761
1762 let spec = &self.bar_type().spec();
1763 let start_time_ns = UnixNanos::from(start_time);
1764 let step = spec.step.get() as u32;
1765
1766 if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1767 self.clock
1768 .borrow_mut()
1769 .set_timer_ns(
1770 &self.timer_name,
1771 self.interval_ns.as_u64(),
1772 Some(start_time_ns),
1773 None,
1774 Some(callback),
1775 Some(true), Some(fire_immediately),
1777 )
1778 .expect(FAILED);
1779
1780 if fire_immediately {
1781 self.next_close_ns = start_time_ns;
1782 } else {
1783 let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1784 self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1785 }
1786
1787 self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1788 } else {
1789 let alert_time = if fire_immediately {
1791 start_time
1792 } else if spec.aggregation == BarAggregation::Month {
1793 add_n_months(start_time, step).expect(FAILED)
1794 } else {
1795 add_n_years(start_time, step).expect(FAILED)
1796 };
1797
1798 self.clock
1799 .borrow_mut()
1800 .set_time_alert_ns(
1801 &self.timer_name,
1802 UnixNanos::from(alert_time),
1803 Some(callback),
1804 Some(true), )
1806 .expect(FAILED);
1807
1808 self.next_close_ns = UnixNanos::from(alert_time);
1809 self.stored_open_ns = if fire_immediately {
1812 if spec.aggregation == BarAggregation::Month {
1813 subtract_n_months_nanos(start_time_ns, step).expect(FAILED)
1814 } else {
1815 subtract_n_years_nanos(start_time_ns, step).expect(FAILED)
1816 }
1817 } else {
1818 start_time_ns
1819 };
1820 }
1821
1822 if self.skip_first_non_full_bar {
1823 self.first_close_ns = self.next_close_ns;
1824 }
1825
1826 log::debug!(
1827 "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1828 self.timer_name,
1829 start_time,
1830 self.historical_mode,
1831 fire_immediately,
1832 now,
1833 self.bar_build_delay
1834 );
1835 }
1836
1837 pub fn stop(&mut self) {
1839 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1840 }
1841
1842 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1843 if self.skip_first_non_full_bar && ts_init <= self.first_close_ns {
1844 self.core.builder.reset();
1845 } else {
1846 self.skip_first_non_full_bar = false;
1849 self.core.build_and_send(ts_event, ts_init);
1850 }
1851 }
1852
1853 fn build_bar(&mut self, event: &TimeEvent) {
1854 if !self.core.builder.initialized {
1855 return;
1856 }
1857
1858 if !self.build_with_no_updates && self.core.builder.count == 0 {
1859 return; }
1861
1862 let ts_init = event.ts_event;
1863 let ts_event = if self.is_left_open {
1864 if self.timestamp_on_close {
1865 event.ts_event
1866 } else {
1867 self.stored_open_ns
1868 }
1869 } else {
1870 self.stored_open_ns
1871 };
1872
1873 self.build_and_send(ts_event, ts_init);
1874
1875 self.stored_open_ns = event.ts_event;
1877
1878 if self.bar_type().spec().aggregation == BarAggregation::Month {
1879 let step = self.bar_type().spec().step.get() as u32;
1880 let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1881
1882 self.clock
1883 .borrow_mut()
1884 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1885 .expect(FAILED);
1886
1887 self.next_close_ns = alert_time_ns;
1888 } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1889 let step = self.bar_type().spec().step.get() as u32;
1890 let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1891
1892 self.clock
1893 .borrow_mut()
1894 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1895 .expect(FAILED);
1896
1897 self.next_close_ns = alert_time_ns;
1898 } else {
1899 self.next_close_ns = self
1901 .clock
1902 .borrow()
1903 .next_time_ns(&self.timer_name)
1904 .unwrap_or_default();
1905 }
1906 }
1907
1908 fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1909 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1910 {
1912 let mut clock_borrow = self.clock.borrow_mut();
1913 let test_clock = clock_borrow
1914 .as_any_mut()
1915 .downcast_mut::<TestClock>()
1916 .expect("Expected TestClock in historical mode");
1917 test_clock.set_time(ts_init);
1918 }
1919 self.start_timer_internal(None);
1921 }
1922
1923 let events = {
1925 let mut clock_borrow = self.clock.borrow_mut();
1926 let test_clock = clock_borrow
1927 .as_any_mut()
1928 .downcast_mut::<TestClock>()
1929 .expect("Expected TestClock in historical mode");
1930 test_clock.advance_time(ts_init, true)
1931 };
1932
1933 for event in events {
1934 if event.ts_event == ts_init {
1935 self.historical_event_at_ts_init = Some(event);
1936 } else {
1937 self.build_bar(&event);
1938 }
1939 }
1940 }
1941
1942 fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1943 if let Some(ref event) = self.historical_event_at_ts_init.take() {
1944 self.build_bar(event);
1945 }
1946 }
1947
1948 pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1950 self.historical_events = events;
1951 }
1952}
1953
1954impl BarAggregator for TimeBarAggregator {
1955 fn bar_type(&self) -> BarType {
1956 self.core.bar_type
1957 }
1958
1959 fn is_running(&self) -> bool {
1960 self.core.is_running
1961 }
1962
1963 fn set_is_running(&mut self, value: bool) {
1964 self.core.set_is_running(value);
1965 }
1966
1967 fn stop(&mut self) {
1969 Self::stop(self);
1970 }
1971
1972 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1973 if self.historical_mode {
1974 self.preprocess_historical_events(ts_init);
1975 }
1976
1977 self.core.apply_update(price, size, ts_init);
1978
1979 if self.historical_mode {
1980 self.postprocess_historical_events(ts_init);
1981 }
1982 }
1983
1984 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1985 if self.historical_mode {
1986 self.preprocess_historical_events(ts_init);
1987 }
1988
1989 self.core.builder.update_bar(bar, volume, ts_init);
1990
1991 if self.historical_mode {
1992 self.postprocess_historical_events(ts_init);
1993 }
1994 }
1995
1996 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1997 self.historical_mode = historical_mode;
1998 self.core.handler = handler;
1999 }
2000
2001 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
2002 self.set_historical_events_internal(events);
2003 }
2004
2005 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
2006 self.set_clock_internal(clock);
2007 }
2008
2009 fn build_bar(&mut self, event: &TimeEvent) {
2010 {
2013 #[expect(clippy::use_self)]
2014 TimeBarAggregator::build_bar(self, event);
2015 }
2016 }
2017
2018 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
2019 self.aggregator_weak = Some(weak);
2020 }
2021
2022 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
2023 self.start_timer_internal(aggregator_rc);
2024 }
2025}
2026
/// Returns `true` when `size` has a raw value of zero once converted to a [`Quantity`]
/// with the given `precision`, i.e. it is too small to be represented.
fn is_below_min_size(size: f64, precision: u8) -> bool {
    Quantity::new(size, precision).raw == 0
}
2030
/// Returns the smallest positive size increment for `precision` decimal places,
/// i.e. `10^-precision`.
fn min_size_f64(precision: u8) -> f64 {
    let exponent = -i32::from(precision);
    10_f64.powi(exponent)
}
2034
/// Source of per-leg vega values.
pub trait VegaProvider {
    /// Returns the vega for the leg identified by `instrument_id`, or `None` when the
    /// provider has no value for it.
    fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64>;
}
2040
/// Strategy for turning raw (floating point) spread prices into [`Price`] values.
pub trait SpreadPriceRounder {
    /// Rounds the raw bid and ask to prices with the given `precision`, returned as
    /// `(bid, ask)`.
    fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price);
}
2046
/// [`VegaProvider`] backed by an in-memory map from instrument ID to vega.
#[derive(Debug, Default)]
pub struct MapVegaProvider {
    /// Vega per instrument ID.
    vegas: AHashMap<InstrumentId, f64>,
}
2052
2053impl MapVegaProvider {
2054 pub fn new() -> Self {
2055 Self {
2056 vegas: AHashMap::new(),
2057 }
2058 }
2059
2060 pub fn insert(&mut self, instrument_id: InstrumentId, vega: f64) {
2061 self.vegas.insert(instrument_id, vega);
2062 }
2063
2064 pub fn get(&self, instrument_id: &InstrumentId) -> Option<f64> {
2065 self.vegas.get(instrument_id).copied()
2066 }
2067}
2068
2069impl VegaProvider for MapVegaProvider {
2070 fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64> {
2071 self.vegas.get(&instrument_id).copied()
2072 }
2073}
2074
/// [`SpreadPriceRounder`] that rounds prices with a [`FixedTickScheme`].
#[derive(Debug)]
pub struct FixedTickSchemeRounder {
    /// Tick scheme used to find the next bid / ask price.
    scheme: FixedTickScheme,
}
2080
2081impl FixedTickSchemeRounder {
2082 pub fn new(tick: f64) -> anyhow::Result<Self> {
2088 Ok(Self {
2089 scheme: FixedTickScheme::new(tick)?,
2090 })
2091 }
2092
2093 fn round_one(&self, raw: f64, precision: u8, use_bid_rounding: bool) -> Price {
2094 if raw >= 0.0 {
2095 let p = if use_bid_rounding {
2096 self.scheme.next_bid_price(raw, 0, precision)
2097 } else {
2098 self.scheme.next_ask_price(raw, 0, precision)
2099 };
2100 p.unwrap_or_else(|| price_from_f64(raw, precision))
2101 } else {
2102 let p = if use_bid_rounding {
2103 self.scheme.next_ask_price(-raw, 0, precision)
2104 } else {
2105 self.scheme.next_bid_price(-raw, 0, precision)
2106 };
2107 p.map_or_else(
2108 || price_from_f64(raw, precision),
2109 |q| price_from_f64(-q.as_f64(), precision),
2110 )
2111 }
2112 }
2113}
2114
2115impl SpreadPriceRounder for FixedTickSchemeRounder {
2116 fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price) {
2117 let bid = self.round_one(raw_bid, precision, true);
2118 let ask = self.round_one(raw_ask, precision, false);
2119 (bid, ask)
2120 }
2121}
2122
/// Aggregates the latest quotes of the legs of a spread into quotes for the spread itself.
///
/// Spread quotes are emitted either on every leg quote (no update interval) or on a timer
/// (update interval set), once a quote is available for every leg.
pub struct SpreadQuoteAggregator {
    /// Instrument ID stamped on the generated spread quotes.
    spread_instrument_id: InstrumentId,
    /// Instrument IDs of the legs, in the order given at construction.
    leg_ids: Vec<InstrumentId>,
    /// Signed, non-zero ratio of each leg, parallel to `leg_ids`.
    ratios: Vec<i64>,
    /// Number of legs.
    n_legs: usize,
    /// Selects ratio-weighted bid/ask pricing instead of vega-weighted option pricing.
    is_futures_spread: bool,
    /// Precision of the generated prices.
    price_precision: u8,
    /// Precision of the generated sizes.
    size_precision: u8,
    /// Most recent quote received per instrument ID.
    last_quotes: AHashMap<InstrumentId, QuoteTick>,
    /// Per-leg mid prices, refreshed when a quote is built (option spreads only).
    mid_prices: Vec<f64>,
    /// Per-leg bid prices, refreshed when a quote is built.
    bid_prices: Vec<f64>,
    /// Per-leg ask prices, refreshed when a quote is built.
    ask_prices: Vec<f64>,
    /// Per-leg vegas; an entry is updated when the vega provider returns a value for the leg.
    vegas: Vec<f64>,
    /// Per-leg bid/ask spreads, refreshed when a quote is built (option spreads only).
    bid_ask_spreads: Vec<f64>,
    /// Per-leg bid sizes, refreshed when a quote is built.
    bid_sizes: Vec<f64>,
    /// Per-leg ask sizes, refreshed when a quote is built.
    ask_sizes: Vec<f64>,
    /// Callback receiving every generated spread quote.
    handler: Box<dyn FnMut(QuoteTick)>,
    /// Clock on which the update timer is registered.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `true` (and an update interval is set), incoming quotes drive a `TestClock`.
    historical_mode: bool,
    /// Timer interval in seconds; `None` means a quote is built on every leg update.
    update_interval_seconds: Option<u64>,
    /// Delay added to the timer start time; multiplied by 1 000 to obtain nanoseconds.
    quote_build_delay: u64,
    /// `true` when a leg quote arrived since the last spread quote was emitted.
    has_update: bool,
    /// Timer name, formatted as `SPREAD_QUOTE_{spread_instrument_id}`.
    timer_name: String,
    /// Timer event that coincides with the `ts_init` of the quote being processed; kept
    /// until it can be built with that quote included.
    historical_event_at_ts_init: Option<TimeEvent>,
    /// Optional source of per-leg vegas for option spread pricing.
    vega_provider: Option<Box<dyn VegaProvider>>,
    /// Optional rounding strategy for the generated prices.
    price_rounder: Option<Box<dyn SpreadPriceRounder>>,
    /// Running flag, set through `set_running`.
    is_running: bool,
    /// Weak handle to the shared wrapper around this aggregator, captured by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Self>>>,
}
2157
2158impl Debug for SpreadQuoteAggregator {
2159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2160 f.debug_struct(stringify!(SpreadQuoteAggregator))
2161 .field("spread_instrument_id", &self.spread_instrument_id)
2162 .field("n_legs", &self.n_legs)
2163 .field("is_futures_spread", &self.is_futures_spread)
2164 .field("update_interval_seconds", &self.update_interval_seconds)
2165 .finish()
2166 }
2167}
2168
2169impl SpreadQuoteAggregator {
2170 #[expect(clippy::too_many_arguments)]
2176 pub fn new(
2177 spread_instrument_id: InstrumentId,
2178 legs: &[(InstrumentId, i64)],
2179 is_futures_spread: bool,
2180 price_precision: u8,
2181 size_precision: u8,
2182 handler: Box<dyn FnMut(QuoteTick)>,
2183 clock: Rc<RefCell<dyn Clock>>,
2184 historical_mode: bool,
2185 update_interval_seconds: Option<u64>,
2186 quote_build_delay: u64,
2187 vega_provider: Option<Box<dyn VegaProvider>>,
2188 price_rounder: Option<Box<dyn SpreadPriceRounder>>,
2189 ) -> Self {
2190 assert!(legs.len() >= 2, "Spread must have more than one leg");
2191 let n_legs = legs.len();
2192 let leg_ids: Vec<InstrumentId> = legs.iter().map(|(id, _)| *id).collect();
2193 let ratios: Vec<i64> = legs.iter().map(|(_, r)| *r).collect();
2194 for &r in &ratios {
2195 assert!(r != 0, "Ratio cannot be zero");
2196 }
2197 let timer_name = format!("SPREAD_QUOTE_{spread_instrument_id}");
2198 Self {
2199 spread_instrument_id,
2200 leg_ids,
2201 ratios,
2202 n_legs,
2203 is_futures_spread,
2204 price_precision,
2205 size_precision,
2206 last_quotes: AHashMap::new(),
2207 mid_prices: vec![0.0; n_legs],
2208 bid_prices: vec![0.0; n_legs],
2209 ask_prices: vec![0.0; n_legs],
2210 vegas: vec![0.0; n_legs],
2211 bid_ask_spreads: vec![0.0; n_legs],
2212 bid_sizes: vec![0.0; n_legs],
2213 ask_sizes: vec![0.0; n_legs],
2214 handler,
2215 clock,
2216 historical_mode,
2217 update_interval_seconds,
2218 quote_build_delay,
2219 has_update: false,
2220 timer_name,
2221 historical_event_at_ts_init: None,
2222 vega_provider,
2223 price_rounder,
2224 is_running: false,
2225 aggregator_weak: None,
2226 }
2227 }
2228
2229 pub fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Self>>) {
2232 self.aggregator_weak = Some(weak);
2233 }
2234
2235 pub fn prepare_for_timer_mode(&mut self, self_rc: &Rc<RefCell<Self>>) {
2240 self.aggregator_weak = Some(Rc::downgrade(self_rc));
2241 }
2242
2243 pub fn set_historical_mode(
2245 &mut self,
2246 historical_mode: bool,
2247 handler: Box<dyn FnMut(QuoteTick)>,
2248 vega_provider: Option<Box<dyn VegaProvider>>,
2249 ) {
2250 self.historical_mode = historical_mode;
2251 self.handler = handler;
2252
2253 if let Some(vp) = vega_provider {
2254 self.vega_provider = Some(vp);
2255 }
2256 }
2257
2258 pub fn set_running(&mut self, is_running: bool) {
2259 self.is_running = is_running;
2260 }
2261
2262 pub fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
2263 self.clock = clock;
2264 }
2265
2266 pub fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Self>>>) {
2275 let Some(interval_secs) = self.update_interval_seconds else {
2276 return;
2277 };
2278 let aggregator_weak = if let Some(rc) = aggregator_rc {
2279 let weak = Rc::downgrade(&rc);
2280 self.aggregator_weak = Some(weak.clone());
2281 weak
2282 } else {
2283 self.aggregator_weak.clone().expect(
2284 "SpreadQuoteAggregator: timer mode requires prepare_for_timer_mode(rc) to be \
2285 called first with the Rc that wraps this aggregator (before feeding quotes in \
2286 historical mode or before start_timer(None)).",
2287 )
2288 };
2289
2290 let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
2291 if let Some(agg) = aggregator_weak.upgrade() {
2292 agg.borrow_mut().on_timer_fire(event.ts_event);
2293 }
2294 }));
2295
2296 let now_ns = self.clock.borrow().timestamp_ns();
2297 let interval_ns = interval_secs * 1_000_000_000;
2298 let start_ns = (now_ns.as_u64() / interval_ns) * interval_ns;
2299 let start_ns = start_ns + self.quote_build_delay * 1_000; let start_time = UnixNanos::from(start_ns);
2301 let fire_immediately = now_ns == start_time;
2302 self.clock
2303 .borrow_mut()
2304 .set_timer_ns(
2305 &self.timer_name,
2306 interval_ns,
2307 Some(start_time),
2308 None,
2309 Some(callback),
2310 Some(true),
2311 Some(fire_immediately),
2312 )
2313 .expect("Failed to set spread quote timer");
2314 }
2315
2316 pub fn on_timer_fire(&mut self, ts_event: UnixNanos) {
2318 if self.last_quotes.len() == self.n_legs {
2319 self.build_and_send_quote(ts_event);
2320 }
2321 }
2322
2323 pub fn stop_timer(&mut self) {
2325 if self.update_interval_seconds.is_none() {
2326 return;
2327 }
2328
2329 if self
2330 .clock
2331 .borrow()
2332 .timer_names()
2333 .contains(&self.timer_name.as_str())
2334 {
2335 self.clock.borrow_mut().cancel_timer(&self.timer_name);
2336 }
2337 }
2338
2339 pub fn handle_quote_tick(&mut self, tick: QuoteTick) {
2341 let ts_init = tick.ts_init;
2342
2343 if self.update_interval_seconds.is_some() && self.historical_mode {
2344 self.process_historical_events(ts_init);
2345 }
2346 self.last_quotes.insert(tick.instrument_id, tick);
2347 self.has_update = true;
2348
2349 if self.update_interval_seconds.is_none() && self.last_quotes.len() == self.n_legs {
2350 self.build_and_send_quote(ts_init);
2351 }
2352 }
2353
2354 pub fn flush_pending_historical_quote(&mut self) {
2360 if self.update_interval_seconds.is_none() || !self.historical_mode {
2361 return;
2362 }
2363
2364 let Some(event) = self.historical_event_at_ts_init.take() else {
2365 return;
2366 };
2367
2368 if self.last_quotes.len() == self.n_legs {
2369 self.build_and_send_quote(event.ts_event);
2370 }
2371 }
2372
2373 fn process_historical_events(&mut self, ts_init: UnixNanos) {
2379 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
2380 let mut clock_borrow = self.clock.borrow_mut();
2381 let test_clock = clock_borrow
2382 .as_any_mut()
2383 .downcast_mut::<TestClock>()
2384 .expect("Expected TestClock in historical mode");
2385 test_clock.set_time(ts_init);
2386 drop(clock_borrow);
2387 self.start_timer(None);
2388 }
2389
2390 if self.last_quotes.len() == self.n_legs
2391 && let Some(ref event) = self.historical_event_at_ts_init
2392 && event.ts_event < ts_init
2393 {
2394 let event = self.historical_event_at_ts_init.take().unwrap();
2396 self.build_and_send_quote(event.ts_event);
2397 }
2398
2399 let events = {
2400 let mut clock_borrow = self.clock.borrow_mut();
2401 let test_clock = clock_borrow
2402 .as_any_mut()
2403 .downcast_mut::<TestClock>()
2404 .expect("Expected TestClock in historical mode");
2405 test_clock.advance_time(ts_init, true)
2406 };
2407
2408 for event in events {
2409 if event.ts_event == ts_init {
2410 self.historical_event_at_ts_init = Some(event);
2411 } else if self.last_quotes.len() == self.n_legs {
2412 self.build_and_send_quote(event.ts_event);
2413 }
2414 }
2415 }
2416
2417 fn build_and_send_quote(&mut self, ts_event: UnixNanos) {
2419 if !self.has_update {
2420 return;
2421 }
2422
2423 for (idx, &leg_id) in self.leg_ids.iter().enumerate() {
2424 let Some(tick) = self.last_quotes.get(&leg_id) else {
2425 log::error!(
2426 "SpreadQuoteAggregator[{}]: Missing quote for leg {}",
2427 self.spread_instrument_id,
2428 leg_id
2429 );
2430 return;
2431 };
2432 let ask_price = tick.ask_price.as_f64();
2433 let bid_price = tick.bid_price.as_f64();
2434 self.bid_prices[idx] = bid_price;
2435 self.ask_prices[idx] = ask_price;
2436 self.bid_sizes[idx] = tick.bid_size.as_f64();
2437 self.ask_sizes[idx] = tick.ask_size.as_f64();
2438
2439 if !self.is_futures_spread {
2440 self.mid_prices[idx] = (ask_price + bid_price) * 0.5;
2441 self.bid_ask_spreads[idx] = ask_price - bid_price;
2442
2443 if let Some(ref vp) = self.vega_provider
2444 && let Some(vega) = vp.vega_for_leg(leg_id)
2445 {
2446 self.vegas[idx] = vega;
2447 }
2448 }
2449 }
2450 let (raw_bid, raw_ask) = if self.is_futures_spread {
2451 self.create_futures_spread_prices()
2452 } else {
2453 self.create_option_spread_prices()
2454 };
2455 let spread_quote = self.create_quote_tick_from_raw_prices(raw_bid, raw_ask, ts_event);
2456 self.has_update = false;
2457 (self.handler)(spread_quote);
2458 }
2459
2460 fn create_option_spread_prices(&self) -> (f64, f64) {
2461 let vega_multipliers: Vec<f64> = (0..self.n_legs)
2462 .map(|i| {
2463 if self.vegas[i] == 0.0 {
2464 0.0
2465 } else {
2466 self.bid_ask_spreads[i] / self.vegas[i]
2467 }
2468 })
2469 .collect();
2470 let non_zero: Vec<f64> = vega_multipliers
2471 .iter()
2472 .copied()
2473 .filter(|&x| x != 0.0)
2474 .collect();
2475
2476 if non_zero.is_empty() {
2477 log::warn!(
2478 "No vega information available for the components of {}. Will generate spread quote using component quotes only",
2479 self.spread_instrument_id
2480 );
2481 return self.create_futures_spread_prices();
2482 }
2483 let vega_multiplier = non_zero.iter().map(|x| x.abs()).sum::<f64>() / non_zero.len() as f64;
2484 let spread_vega = self
2485 .vegas
2486 .iter()
2487 .zip(self.ratios.iter())
2488 .map(|(v, r)| v * (*r as f64))
2489 .sum::<f64>()
2490 .abs();
2491 let bid_ask_spread = spread_vega * vega_multiplier;
2492 let spread_mid_price: f64 = self
2493 .mid_prices
2494 .iter()
2495 .zip(self.ratios.iter())
2496 .map(|(m, r)| m * (*r as f64))
2497 .sum();
2498 let raw_bid = spread_mid_price - bid_ask_spread * 0.5;
2499 let raw_ask = spread_mid_price + bid_ask_spread * 0.5;
2500 (raw_bid, raw_ask)
2501 }
2502
2503 fn create_futures_spread_prices(&self) -> (f64, f64) {
2504 let mut raw_ask = 0.0_f64;
2505 let mut raw_bid = 0.0_f64;
2506
2507 for i in 0..self.n_legs {
2508 let r = self.ratios[i] as f64;
2509 if self.ratios[i] >= 0 {
2510 raw_ask += r * self.ask_prices[i];
2511 raw_bid += r * self.bid_prices[i];
2512 } else {
2513 raw_ask += r * self.bid_prices[i];
2514 raw_bid += r * self.ask_prices[i];
2515 }
2516 }
2517 (raw_bid, raw_ask)
2518 }
2519
2520 fn create_quote_tick_from_raw_prices(
2521 &self,
2522 raw_bid_price: f64,
2523 raw_ask_price: f64,
2524 ts_event: UnixNanos,
2525 ) -> QuoteTick {
2526 let (bid_price, ask_price) = if let Some(ref rounder) = self.price_rounder {
2527 rounder.round_prices(raw_bid_price, raw_ask_price, self.price_precision)
2528 } else {
2529 let bid = price_from_f64(raw_bid_price, self.price_precision);
2530 let ask = price_from_f64(raw_ask_price, self.price_precision);
2531 (bid, ask)
2532 };
2533 let mut min_bid_size = f64::INFINITY;
2534 let mut min_ask_size = f64::INFINITY;
2535 for i in 0..self.n_legs {
2536 let abs_ratio = self.ratios[i].unsigned_abs() as f64;
2537 if self.ratios[i] >= 0 {
2538 let b = self.bid_sizes[i] / abs_ratio;
2539 if b < min_bid_size {
2540 min_bid_size = b;
2541 }
2542 let a = self.ask_sizes[i] / abs_ratio;
2543 if a < min_ask_size {
2544 min_ask_size = a;
2545 }
2546 } else {
2547 let b = self.ask_sizes[i] / abs_ratio;
2548 if b < min_bid_size {
2549 min_bid_size = b;
2550 }
2551 let a = self.bid_sizes[i] / abs_ratio;
2552 if a < min_ask_size {
2553 min_ask_size = a;
2554 }
2555 }
2556 }
2557 let bid_size = Quantity::new(min_bid_size, self.size_precision);
2558 let ask_size = Quantity::new(min_ask_size, self.size_precision);
2559 QuoteTick::new(
2560 self.spread_instrument_id,
2561 bid_price,
2562 ask_price,
2563 bid_size,
2564 ask_size,
2565 ts_event,
2566 ts_event,
2567 )
2568 }
2569}
2570
/// Converts a raw floating point value into a [`Price`] with the given `precision`.
fn price_from_f64(v: f64, precision: u8) -> Price {
    Price::new(v, precision)
}
2574
2575#[cfg(test)]
2576mod tests {
2577 use std::sync::{Arc, Mutex};
2578
2579 use nautilus_common::{clock::TestClock, timer::TimeEvent};
2580 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
2581 use nautilus_model::{
2582 data::{BarSpecification, BarType, QuoteTick},
2583 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
2584 identifiers::InstrumentId,
2585 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
2586 types::{Price, Quantity},
2587 };
2588 use rstest::rstest;
2589 use ustr::Ustr;
2590
2591 use super::*;
2592
2593 #[rstest]
2594 fn test_bar_builder_initialization(equity_aapl: Equity) {
2595 let instrument = InstrumentAny::Equity(equity_aapl);
2596 let bar_type = BarType::new(
2597 instrument.id(),
2598 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2599 AggregationSource::Internal,
2600 );
2601 let builder = BarBuilder::new(
2602 bar_type,
2603 instrument.price_precision(),
2604 instrument.size_precision(),
2605 );
2606
2607 assert!(!builder.initialized);
2608 assert_eq!(builder.ts_last, 0);
2609 assert_eq!(builder.count, 0);
2610 }
2611
2612 #[rstest]
2613 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
2614 let instrument = InstrumentAny::Equity(equity_aapl);
2615 let bar_type = BarType::new(
2616 instrument.id(),
2617 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2618 AggregationSource::Internal,
2619 );
2620 let mut builder = BarBuilder::new(
2621 bar_type,
2622 instrument.price_precision(),
2623 instrument.size_precision(),
2624 );
2625
2626 builder.update(
2627 Price::from("100.00"),
2628 Quantity::from(1),
2629 UnixNanos::from(1000),
2630 );
2631 builder.update(
2632 Price::from("95.00"),
2633 Quantity::from(1),
2634 UnixNanos::from(2000),
2635 );
2636 builder.update(
2637 Price::from("105.00"),
2638 Quantity::from(1),
2639 UnixNanos::from(3000),
2640 );
2641
2642 let bar = builder.build_now();
2643 assert!(bar.high > bar.low);
2644 assert_eq!(bar.open, Price::from("100.00"));
2645 assert_eq!(bar.high, Price::from("105.00"));
2646 assert_eq!(bar.low, Price::from("95.00"));
2647 assert_eq!(bar.close, Price::from("105.00"));
2648 }
2649
2650 #[rstest]
2651 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
2652 let instrument = InstrumentAny::Equity(equity_aapl);
2653 let bar_type = BarType::new(
2654 instrument.id(),
2655 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2656 AggregationSource::Internal,
2657 );
2658 let mut builder = BarBuilder::new(
2659 bar_type,
2660 instrument.price_precision(),
2661 instrument.size_precision(),
2662 );
2663
2664 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2665 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2666
2667 assert_eq!(builder.ts_last, 1_000);
2668 assert_eq!(builder.count, 1);
2669 }
2670
2671 #[rstest]
2672 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2673 let instrument = InstrumentAny::Equity(equity_aapl);
2674 let bar_type = BarType::new(
2675 instrument.id(),
2676 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2677 AggregationSource::Internal,
2678 );
2679 let mut builder = BarBuilder::new(
2680 bar_type,
2681 instrument.price_precision(),
2682 instrument.size_precision(),
2683 );
2684
2685 builder.update(
2686 Price::from("1.00000"),
2687 Quantity::from(1),
2688 UnixNanos::default(),
2689 );
2690
2691 assert!(builder.initialized);
2692 assert_eq!(builder.ts_last, 0);
2693 assert_eq!(builder.count, 1);
2694 }
2695
2696 #[rstest]
2697 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2698 equity_aapl: Equity,
2699 ) {
2700 let instrument = InstrumentAny::Equity(equity_aapl);
2701 let bar_type = BarType::new(
2702 instrument.id(),
2703 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2704 AggregationSource::Internal,
2705 );
2706 let mut builder = BarBuilder::new(bar_type, 2, 0);
2707
2708 builder.update(
2709 Price::from("1.00000"),
2710 Quantity::from(1),
2711 UnixNanos::from(1_000),
2712 );
2713 builder.update(
2714 Price::from("1.00001"),
2715 Quantity::from(1),
2716 UnixNanos::from(500),
2717 );
2718
2719 assert!(builder.initialized);
2720 assert_eq!(builder.ts_last, 1_000);
2721 assert_eq!(builder.count, 1);
2722 }
2723
2724 #[rstest]
2725 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2726 let instrument = InstrumentAny::Equity(equity_aapl);
2727 let bar_type = BarType::new(
2728 instrument.id(),
2729 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2730 AggregationSource::Internal,
2731 );
2732 let mut builder = BarBuilder::new(
2733 bar_type,
2734 instrument.price_precision(),
2735 instrument.size_precision(),
2736 );
2737
2738 for _ in 0..5 {
2739 builder.update(
2740 Price::from("1.00000"),
2741 Quantity::from(1),
2742 UnixNanos::from(1_000),
2743 );
2744 }
2745
2746 assert_eq!(builder.count, 5);
2747 }
2748
2749 #[rstest]
2750 #[should_panic]
2751 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2752 let instrument = InstrumentAny::Equity(equity_aapl);
2753 let bar_type = BarType::new(
2754 instrument.id(),
2755 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2756 AggregationSource::Internal,
2757 );
2758 let mut builder = BarBuilder::new(
2759 bar_type,
2760 instrument.price_precision(),
2761 instrument.size_precision(),
2762 );
2763 let _ = builder.build_now();
2764 }
2765
2766 #[rstest]
2767 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2768 let instrument = InstrumentAny::Equity(equity_aapl);
2769 let bar_type = BarType::new(
2770 instrument.id(),
2771 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2772 AggregationSource::Internal,
2773 );
2774 let mut builder = BarBuilder::new(
2775 bar_type,
2776 instrument.price_precision(),
2777 instrument.size_precision(),
2778 );
2779
2780 builder.update(
2781 Price::from("1.00001"),
2782 Quantity::from(2),
2783 UnixNanos::default(),
2784 );
2785 builder.update(
2786 Price::from("1.00002"),
2787 Quantity::from(2),
2788 UnixNanos::default(),
2789 );
2790 builder.update(
2791 Price::from("1.00000"),
2792 Quantity::from(1),
2793 UnixNanos::from(1_000_000_000),
2794 );
2795
2796 let bar = builder.build_now();
2797
2798 assert_eq!(bar.open, Price::from("1.00001"));
2799 assert_eq!(bar.high, Price::from("1.00002"));
2800 assert_eq!(bar.low, Price::from("1.00000"));
2801 assert_eq!(bar.close, Price::from("1.00000"));
2802 assert_eq!(bar.volume, Quantity::from(5));
2803 assert_eq!(bar.ts_init, 1_000_000_000);
2804 assert_eq!(builder.ts_last, 1_000_000_000);
2805 assert_eq!(builder.count, 0);
2806 }
2807
2808 #[rstest]
2809 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2810 let instrument = InstrumentAny::Equity(equity_aapl);
2811 let bar_type = BarType::new(
2812 instrument.id(),
2813 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2814 AggregationSource::Internal,
2815 );
2816 let mut builder = BarBuilder::new(bar_type, 2, 0);
2817
2818 builder.update(
2819 Price::from("1.00001"),
2820 Quantity::from(1),
2821 UnixNanos::default(),
2822 );
2823 builder.build_now();
2824
2825 builder.update(
2826 Price::from("1.00000"),
2827 Quantity::from(1),
2828 UnixNanos::default(),
2829 );
2830 builder.update(
2831 Price::from("1.00003"),
2832 Quantity::from(1),
2833 UnixNanos::default(),
2834 );
2835 builder.update(
2836 Price::from("1.00002"),
2837 Quantity::from(1),
2838 UnixNanos::default(),
2839 );
2840
2841 let bar = builder.build_now();
2842
2843 assert_eq!(bar.open, Price::from("1.00000"));
2844 assert_eq!(bar.high, Price::from("1.00003"));
2845 assert_eq!(bar.low, Price::from("1.00000"));
2846 assert_eq!(bar.close, Price::from("1.00002"));
2847 assert_eq!(bar.volume, Quantity::from(3));
2848 }
2849
2850 #[rstest]
2851 fn test_bar_builder_update_bar_initializes_then_accumulates(equity_aapl: Equity) {
2852 let instrument = InstrumentAny::Equity(equity_aapl);
2853 let bar_type = BarType::new(
2854 instrument.id(),
2855 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2856 AggregationSource::Internal,
2857 );
2858 let mut builder = BarBuilder::new(
2859 bar_type,
2860 instrument.price_precision(),
2861 instrument.size_precision(),
2862 );
2863
2864 let bar_one = Bar::new(
2865 bar_type,
2866 Price::from("100.00"),
2867 Price::from("102.00"),
2868 Price::from("99.00"),
2869 Price::from("101.00"),
2870 Quantity::from(10),
2871 UnixNanos::from(1_000),
2872 UnixNanos::from(1_000),
2873 );
2874 let bar_two = Bar::new(
2875 bar_type,
2876 Price::from("101.00"),
2877 Price::from("103.00"),
2878 Price::from("98.00"),
2879 Price::from("102.00"),
2880 Quantity::from(5),
2881 UnixNanos::from(2_000),
2882 UnixNanos::from(2_000),
2883 );
2884
2885 builder.update_bar(bar_one, bar_one.volume, bar_one.ts_init);
2886 builder.update_bar(bar_two, bar_two.volume, bar_two.ts_init);
2887 let bar = builder.build_now();
2888
2889 assert_eq!(bar.open, Price::from("100.00"));
2890 assert_eq!(bar.high, Price::from("103.00"));
2891 assert_eq!(bar.low, Price::from("98.00"));
2892 assert_eq!(bar.close, Price::from("102.00"));
2893 assert_eq!(bar.volume, Quantity::from(15));
2894 assert_eq!(builder.count, 0);
2895 }
2896
2897 #[rstest]
2898 fn test_bar_builder_update_bar_ignores_earlier_timestamp(equity_aapl: Equity) {
2899 let instrument = InstrumentAny::Equity(equity_aapl);
2900 let bar_type = BarType::new(
2901 instrument.id(),
2902 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2903 AggregationSource::Internal,
2904 );
2905 let mut builder = BarBuilder::new(
2906 bar_type,
2907 instrument.price_precision(),
2908 instrument.size_precision(),
2909 );
2910
2911 let bar_later = Bar::new(
2912 bar_type,
2913 Price::from("100.00"),
2914 Price::from("101.00"),
2915 Price::from("99.00"),
2916 Price::from("100.50"),
2917 Quantity::from(10),
2918 UnixNanos::from(2_000),
2919 UnixNanos::from(2_000),
2920 );
2921 let bar_earlier = Bar::new(
2922 bar_type,
2923 Price::from("200.00"),
2924 Price::from("210.00"),
2925 Price::from("190.00"),
2926 Price::from("205.00"),
2927 Quantity::from(50),
2928 UnixNanos::from(1_000),
2929 UnixNanos::from(1_000),
2930 );
2931
2932 builder.update_bar(bar_later, bar_later.volume, bar_later.ts_init);
2933 builder.update_bar(bar_earlier, bar_earlier.volume, bar_earlier.ts_init);
2934
2935 assert_eq!(builder.ts_last, 2_000);
2936 assert_eq!(builder.count, 1);
2937 assert_eq!(builder.volume, Quantity::from(10));
2938 }
2939
2940 #[rstest]
2941 #[case::spread_zero_inactive(
2942 Decimal::ZERO,
2943 ContinuousFutureAdjustmentType::BackwardSpread,
2944 false
2945 )]
2946 #[case::spread_positive_active(
2947 Decimal::new(150, 2), ContinuousFutureAdjustmentType::BackwardSpread,
2949 true,
2950 )]
2951 #[case::spread_negative_active(
2952 Decimal::new(-250, 2), ContinuousFutureAdjustmentType::ForwardSpread,
2954 true,
2955 )]
2956 #[case::spread_sub_precision_inactive(
2957 Decimal::new(1, 28),
2959 ContinuousFutureAdjustmentType::BackwardSpread,
2960 false,
2961 )]
2962 #[case::ratio_one_inactive(Decimal::ONE, ContinuousFutureAdjustmentType::BackwardRatio, false)]
2963 #[case::ratio_non_one_active(
2964 Decimal::new(105, 2), ContinuousFutureAdjustmentType::ForwardRatio,
2966 true,
2967 )]
2968 fn test_bar_builder_set_adjustment_active_flag(
2969 equity_aapl: Equity,
2970 #[case] adjustment: Decimal,
2971 #[case] mode: ContinuousFutureAdjustmentType,
2972 #[case] expected_active: bool,
2973 ) {
2974 let instrument = InstrumentAny::Equity(equity_aapl);
2975 let bar_type = BarType::new(
2976 instrument.id(),
2977 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2978 AggregationSource::Internal,
2979 );
2980 let mut builder = BarBuilder::new(bar_type, 2, 0);
2981
2982 builder.set_adjustment(adjustment, mode);
2983
2984 assert_eq!(builder.adjustment_active, expected_active);
2985 assert_eq!(builder.adjustment_is_ratio, mode.is_ratio());
2986 assert_eq!(builder.adjustment_mode, mode);
2987 }
2988
2989 #[rstest]
2990 fn test_bar_builder_set_adjustment_mode_switch_resets_flags(equity_aapl: Equity) {
2991 let instrument = InstrumentAny::Equity(equity_aapl);
2992 let bar_type = BarType::new(
2993 instrument.id(),
2994 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2995 AggregationSource::Internal,
2996 );
2997 let mut builder = BarBuilder::new(bar_type, 2, 0);
2998
2999 builder.set_adjustment(
3001 Decimal::new(150, 2), ContinuousFutureAdjustmentType::BackwardRatio,
3003 );
3004 builder.set_adjustment(
3005 Decimal::new(50, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3007 );
3008 assert!(!builder.adjustment_is_ratio);
3009 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3010 assert_eq!(builder.build_now().close, Price::from("100.50"));
3011
3012 builder.set_adjustment(
3014 Decimal::new(11, 1), ContinuousFutureAdjustmentType::ForwardRatio,
3016 );
3017 assert!(builder.adjustment_is_ratio);
3018 builder.update(Price::from("100.00"), Quantity::from(1), 2_000.into());
3019 assert_eq!(builder.build_now().close, Price::from("110.00"));
3020 }
3021
3022 #[rstest]
3023 fn test_bar_builder_update_applies_backward_spread_adjustment(equity_aapl: Equity) {
3024 let instrument = InstrumentAny::Equity(equity_aapl);
3025 let bar_type = BarType::new(
3026 instrument.id(),
3027 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3028 AggregationSource::Internal,
3029 );
3030 let mut builder = BarBuilder::new(bar_type, 2, 0);
3031
3032 builder.set_adjustment(
3033 Decimal::new(250, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3035 );
3036
3037 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3038 builder.update(Price::from("99.00"), Quantity::from(1), 2_000.into());
3039 builder.update(Price::from("101.00"), Quantity::from(1), 3_000.into());
3040
3041 let bar = builder.build_now();
3042 assert_eq!(bar.open, Price::from("102.50"));
3043 assert_eq!(bar.high, Price::from("103.50"));
3044 assert_eq!(bar.low, Price::from("101.50"));
3045 assert_eq!(bar.close, Price::from("103.50"));
3046 }
3047
3048 #[rstest]
3049 fn test_bar_builder_update_applies_forward_ratio_adjustment(equity_aapl: Equity) {
3050 let instrument = InstrumentAny::Equity(equity_aapl);
3051 let bar_type = BarType::new(
3052 instrument.id(),
3053 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3054 AggregationSource::Internal,
3055 );
3056 let mut builder = BarBuilder::new(bar_type, 2, 0);
3057
3058 builder.set_adjustment(
3059 Decimal::new(11, 1), ContinuousFutureAdjustmentType::ForwardRatio,
3061 );
3062
3063 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3064 builder.update(Price::from("90.00"), Quantity::from(1), 2_000.into());
3065 builder.update(Price::from("110.00"), Quantity::from(1), 3_000.into());
3066
3067 let bar = builder.build_now();
3068 assert_eq!(bar.open, Price::from("110.00"));
3069 assert_eq!(bar.high, Price::from("121.00"));
3070 assert_eq!(bar.low, Price::from("99.00"));
3071 assert_eq!(bar.close, Price::from("121.00"));
3072 }
3073
3074 #[rstest]
3075 fn test_bar_builder_update_bar_applies_adjustment_to_ohlc(equity_aapl: Equity) {
3076 let instrument = InstrumentAny::Equity(equity_aapl);
3077 let bar_type = BarType::new(
3078 instrument.id(),
3079 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3080 AggregationSource::Internal,
3081 );
3082 let mut builder = BarBuilder::new(bar_type, 2, 0);
3083
3084 builder.set_adjustment(
3085 Decimal::new(-100, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3087 );
3088
3089 let input = Bar::new(
3090 bar_type,
3091 Price::from("100.00"),
3092 Price::from("105.00"),
3093 Price::from("99.00"),
3094 Price::from("102.00"),
3095 Quantity::from(10),
3096 UnixNanos::from(1_000),
3097 UnixNanos::from(1_000),
3098 );
3099 builder.update_bar(input, input.volume, input.ts_init);
3100
3101 let bar = builder.build_now();
3102 assert_eq!(bar.open, Price::from("99.00"));
3103 assert_eq!(bar.high, Price::from("104.00"));
3104 assert_eq!(bar.low, Price::from("98.00"));
3105 assert_eq!(bar.close, Price::from("101.00"));
3106 }
3107
3108 #[rstest]
3109 fn test_bar_builder_reset_retains_adjustment(equity_aapl: Equity) {
3110 let instrument = InstrumentAny::Equity(equity_aapl);
3111 let bar_type = BarType::new(
3112 instrument.id(),
3113 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3114 AggregationSource::Internal,
3115 );
3116 let mut builder = BarBuilder::new(bar_type, 2, 0);
3117
3118 builder.set_adjustment(
3119 Decimal::new(500, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3121 );
3122 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3123 let bar_one = builder.build_now();
3124 assert_eq!(bar_one.close, Price::from("105.00"));
3125
3126 assert!(builder.adjustment_active);
3128
3129 builder.update(Price::from("110.00"), Quantity::from(1), 2_000.into());
3130 let bar_two = builder.build_now();
3131 assert_eq!(bar_two.close, Price::from("115.00"));
3132 }
3133
3134 #[rstest]
3135 fn test_bar_builder_update_bar_applies_ratio_adjustment(equity_aapl: Equity) {
3136 let instrument = InstrumentAny::Equity(equity_aapl);
3137 let bar_type = BarType::new(
3138 instrument.id(),
3139 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3140 AggregationSource::Internal,
3141 );
3142 let mut builder = BarBuilder::new(bar_type, 2, 0);
3143
3144 builder.set_adjustment(
3145 Decimal::new(11, 1), ContinuousFutureAdjustmentType::ForwardRatio,
3147 );
3148
3149 let input = Bar::new(
3150 bar_type,
3151 Price::from("100.00"),
3152 Price::from("110.00"),
3153 Price::from("90.00"),
3154 Price::from("105.00"),
3155 Quantity::from(10),
3156 UnixNanos::from(1_000),
3157 UnixNanos::from(1_000),
3158 );
3159 builder.update_bar(input, input.volume, input.ts_init);
3160
3161 let bar = builder.build_now();
3162 assert_eq!(bar.open, Price::from("110.00"));
3163 assert_eq!(bar.high, Price::from("121.00"));
3164 assert_eq!(bar.low, Price::from("99.00"));
3165 assert_eq!(bar.close, Price::from("115.50"));
3166 }
3167
3168 #[rstest]
3169 fn test_bar_builder_spread_below_zero_representable(equity_aapl: Equity) {
3170 let instrument = InstrumentAny::Equity(equity_aapl);
3173 let bar_type = BarType::new(
3174 instrument.id(),
3175 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3176 AggregationSource::Internal,
3177 );
3178 let mut builder = BarBuilder::new(bar_type, 2, 0);
3179
3180 builder.set_adjustment(
3181 Decimal::new(-15000, 2), ContinuousFutureAdjustmentType::BackwardSpread,
3183 );
3184
3185 builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3186 let bar = builder.build_now();
3187 assert_eq!(bar.close, Price::from("-50.00"));
3188 assert!(bar.close.raw < 0);
3189 assert_eq!(bar.close.precision, 2);
3190 }
3191
3192 #[rstest]
3193 fn test_bar_builder_build_promotes_close_above_high_from_previous_close(equity_aapl: Equity) {
3194 let instrument = InstrumentAny::Equity(equity_aapl);
3195 let bar_type = BarType::new(
3196 instrument.id(),
3197 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3198 AggregationSource::Internal,
3199 );
3200 let mut builder = BarBuilder::new(bar_type, 2, 0);
3201
3202 builder.update(
3203 Price::from("110.00"),
3204 Quantity::from(1),
3205 UnixNanos::from(100),
3206 );
3207 builder.build_now();
3208
3209 builder.update(
3210 Price::from("100.00"),
3211 Quantity::from(1),
3212 UnixNanos::from(200),
3213 );
3214 builder.update(
3215 Price::from("101.00"),
3216 Quantity::from(1),
3217 UnixNanos::from(300),
3218 );
3219 builder.update(
3220 Price::from("200.00"),
3221 Quantity::from(1),
3222 UnixNanos::from(400),
3223 );
3224
3225 let bar = builder.build_now();
3226 assert_eq!(bar.open, Price::from("100.00"));
3227 assert_eq!(bar.high, Price::from("200.00"));
3228 assert_eq!(bar.low, Price::from("100.00"));
3229 assert_eq!(bar.close, Price::from("200.00"));
3230 }
3231
3232 #[rstest]
3233 fn test_bar_builder_build_clamps_low_to_close(equity_aapl: Equity) {
3234 let instrument = InstrumentAny::Equity(equity_aapl);
3238 let bar_type = BarType::new(
3239 instrument.id(),
3240 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3241 AggregationSource::Internal,
3242 );
3243 let mut builder = BarBuilder::new(bar_type, 2, 0);
3244
3245 builder.update(
3246 Price::from("100.00"),
3247 Quantity::from(1),
3248 UnixNanos::from(100),
3249 );
3250 builder.close = Some(Price::from("50.00"));
3251
3252 let bar = builder.build_now();
3253 assert_eq!(bar.low, Price::from("50.00"));
3254 assert_eq!(bar.close, Price::from("50.00"));
3255 assert!(bar.low <= bar.open);
3256 }
3257
3258 #[rstest]
3259 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
3260 let instrument = InstrumentAny::Equity(equity_aapl);
3261 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3262 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3263 let handler = Arc::new(Mutex::new(Vec::new()));
3264 let handler_clone = Arc::clone(&handler);
3265
3266 let mut aggregator = TickBarAggregator::new(
3267 bar_type,
3268 instrument.price_precision(),
3269 instrument.size_precision(),
3270 move |bar: Bar| {
3271 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3272 handler_guard.push(bar);
3273 },
3274 );
3275
3276 let trade = TradeTick::default();
3277 aggregator.handle_trade(trade);
3278
3279 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3280 assert_eq!(handler_guard.len(), 0);
3281 }
3282
3283 #[rstest]
3284 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
3285 let instrument = InstrumentAny::Equity(equity_aapl);
3286 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3287 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3288 let handler = Arc::new(Mutex::new(Vec::new()));
3289 let handler_clone = Arc::clone(&handler);
3290
3291 let mut aggregator = TickBarAggregator::new(
3292 bar_type,
3293 instrument.price_precision(),
3294 instrument.size_precision(),
3295 move |bar: Bar| {
3296 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3297 handler_guard.push(bar);
3298 },
3299 );
3300
3301 let trade = TradeTick::default();
3302 aggregator.handle_trade(trade);
3303 aggregator.handle_trade(trade);
3304 aggregator.handle_trade(trade);
3305
3306 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3307 let bar = handler_guard.first().unwrap();
3308 assert_eq!(handler_guard.len(), 1);
3309 assert_eq!(bar.open, trade.price);
3310 assert_eq!(bar.high, trade.price);
3311 assert_eq!(bar.low, trade.price);
3312 assert_eq!(bar.close, trade.price);
3313 assert_eq!(bar.volume, Quantity::from(300000));
3314 assert_eq!(bar.ts_event, trade.ts_event);
3315 assert_eq!(bar.ts_init, trade.ts_init);
3316 }
3317
3318 #[rstest]
3319 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
3320 let instrument = InstrumentAny::Equity(equity_aapl);
3321 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3322 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3323 let handler = Arc::new(Mutex::new(Vec::new()));
3324 let handler_clone = Arc::clone(&handler);
3325
3326 let mut aggregator = TickBarAggregator::new(
3327 bar_type,
3328 instrument.price_precision(),
3329 instrument.size_precision(),
3330 move |bar: Bar| {
3331 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3332 handler_guard.push(bar);
3333 },
3334 );
3335
3336 aggregator.update(
3337 Price::from("1.00001"),
3338 Quantity::from(1),
3339 UnixNanos::default(),
3340 );
3341 aggregator.update(
3342 Price::from("1.00002"),
3343 Quantity::from(1),
3344 UnixNanos::from(1000),
3345 );
3346 aggregator.update(
3347 Price::from("1.00003"),
3348 Quantity::from(1),
3349 UnixNanos::from(2000),
3350 );
3351
3352 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3353 assert_eq!(handler_guard.len(), 1);
3354
3355 let bar = handler_guard.first().unwrap();
3356 assert_eq!(bar.open, Price::from("1.00001"));
3357 assert_eq!(bar.high, Price::from("1.00003"));
3358 assert_eq!(bar.low, Price::from("1.00001"));
3359 assert_eq!(bar.close, Price::from("1.00003"));
3360 assert_eq!(bar.volume, Quantity::from(3));
3361 }
3362
3363 #[rstest]
3364 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
3365 let instrument = InstrumentAny::Equity(equity_aapl);
3366 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
3367 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3368 let handler = Arc::new(Mutex::new(Vec::new()));
3369 let handler_clone = Arc::clone(&handler);
3370
3371 let mut aggregator = TickBarAggregator::new(
3372 bar_type,
3373 instrument.price_precision(),
3374 instrument.size_precision(),
3375 move |bar: Bar| {
3376 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3377 handler_guard.push(bar);
3378 },
3379 );
3380
3381 aggregator.update(
3382 Price::from("1.00001"),
3383 Quantity::from(1),
3384 UnixNanos::default(),
3385 );
3386 aggregator.update(
3387 Price::from("1.00002"),
3388 Quantity::from(1),
3389 UnixNanos::from(1000),
3390 );
3391 aggregator.update(
3392 Price::from("1.00003"),
3393 Quantity::from(1),
3394 UnixNanos::from(2000),
3395 );
3396 aggregator.update(
3397 Price::from("1.00004"),
3398 Quantity::from(1),
3399 UnixNanos::from(3000),
3400 );
3401
3402 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3403 assert_eq!(handler_guard.len(), 2);
3404
3405 let bar1 = &handler_guard[0];
3406 assert_eq!(bar1.open, Price::from("1.00001"));
3407 assert_eq!(bar1.close, Price::from("1.00002"));
3408 assert_eq!(bar1.volume, Quantity::from(2));
3409
3410 let bar2 = &handler_guard[1];
3411 assert_eq!(bar2.open, Price::from("1.00003"));
3412 assert_eq!(bar2.close, Price::from("1.00004"));
3413 assert_eq!(bar2.volume, Quantity::from(2));
3414 }
3415
3416 #[rstest]
3417 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
3418 let instrument = InstrumentAny::Equity(equity_aapl);
3419 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3420 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3421 let handler = Arc::new(Mutex::new(Vec::new()));
3422 let handler_clone = Arc::clone(&handler);
3423
3424 let mut aggregator = TickImbalanceBarAggregator::new(
3425 bar_type,
3426 instrument.price_precision(),
3427 instrument.size_precision(),
3428 move |bar: Bar| {
3429 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3430 handler_guard.push(bar);
3431 },
3432 );
3433
3434 let trade = TradeTick::default();
3435 aggregator.handle_trade(trade);
3436 aggregator.handle_trade(trade);
3437
3438 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3439 assert_eq!(handler_guard.len(), 1);
3440 let bar = handler_guard.first().unwrap();
3441 assert_eq!(bar.volume, Quantity::from(200000));
3442 }
3443
3444 #[rstest]
3445 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
3446 let instrument = InstrumentAny::Equity(equity_aapl);
3447 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
3448 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3449 let handler = Arc::new(Mutex::new(Vec::new()));
3450 let handler_clone = Arc::clone(&handler);
3451
3452 let mut aggregator = TickImbalanceBarAggregator::new(
3453 bar_type,
3454 instrument.price_precision(),
3455 instrument.size_precision(),
3456 move |bar: Bar| {
3457 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3458 handler_guard.push(bar);
3459 },
3460 );
3461
3462 let sell = TradeTick {
3463 aggressor_side: AggressorSide::Seller,
3464 ..TradeTick::default()
3465 };
3466
3467 aggregator.handle_trade(sell);
3468
3469 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3470 assert_eq!(handler_guard.len(), 1);
3471 }
3472
3473 #[rstest]
3474 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
3475 let instrument = InstrumentAny::Equity(equity_aapl);
3476 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3477 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3478 let handler = Arc::new(Mutex::new(Vec::new()));
3479 let handler_clone = Arc::clone(&handler);
3480
3481 let mut aggregator = TickRunsBarAggregator::new(
3482 bar_type,
3483 instrument.price_precision(),
3484 instrument.size_precision(),
3485 move |bar: Bar| {
3486 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3487 handler_guard.push(bar);
3488 },
3489 );
3490
3491 let buy = TradeTick::default();
3492 let sell = TradeTick {
3493 aggressor_side: AggressorSide::Seller,
3494 ..buy
3495 };
3496
3497 aggregator.handle_trade(buy);
3498 aggregator.handle_trade(buy);
3499 aggregator.handle_trade(sell);
3500 aggregator.handle_trade(sell);
3501
3502 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3503 assert_eq!(handler_guard.len(), 2);
3504 }
3505
3506 #[rstest]
3507 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
3508 let instrument = InstrumentAny::Equity(equity_aapl);
3509 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3510 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3511 let handler = Arc::new(Mutex::new(Vec::new()));
3512 let handler_clone = Arc::clone(&handler);
3513
3514 let mut aggregator = TickRunsBarAggregator::new(
3515 bar_type,
3516 instrument.price_precision(),
3517 instrument.size_precision(),
3518 move |bar: Bar| {
3519 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3520 handler_guard.push(bar);
3521 },
3522 );
3523
3524 let buy = TradeTick {
3525 size: Quantity::from(1),
3526 ..TradeTick::default()
3527 };
3528 let sell = TradeTick {
3529 aggressor_side: AggressorSide::Seller,
3530 size: Quantity::from(1),
3531 ..buy
3532 };
3533
3534 aggregator.handle_trade(buy);
3535 aggregator.handle_trade(buy);
3536 aggregator.handle_trade(sell);
3537 aggregator.handle_trade(sell);
3538
3539 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3540 assert_eq!(handler_guard.len(), 2);
3541 assert_eq!(handler_guard[0].volume, Quantity::from(2));
3542 assert_eq!(handler_guard[1].volume, Quantity::from(2));
3543 }
3544
3545 #[rstest]
3546 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
3547 let instrument = InstrumentAny::Equity(equity_aapl);
3548 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3549 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3550 let handler = Arc::new(Mutex::new(Vec::new()));
3551 let handler_clone = Arc::clone(&handler);
3552
3553 let mut aggregator = VolumeBarAggregator::new(
3554 bar_type,
3555 instrument.price_precision(),
3556 instrument.size_precision(),
3557 move |bar: Bar| {
3558 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3559 handler_guard.push(bar);
3560 },
3561 );
3562
3563 aggregator.update(
3564 Price::from("1.00001"),
3565 Quantity::from(25),
3566 UnixNanos::default(),
3567 );
3568
3569 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3570 assert_eq!(handler_guard.len(), 2);
3571 let bar1 = &handler_guard[0];
3572 assert_eq!(bar1.volume, Quantity::from(10));
3573 let bar2 = &handler_guard[1];
3574 assert_eq!(bar2.volume, Quantity::from(10));
3575 }
3576
3577 #[rstest]
3578 fn test_volume_bar_aggregator_zero_size_update_is_noop(equity_aapl: Equity) {
3579 let instrument = InstrumentAny::Equity(equity_aapl);
3580 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3581 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3582 let handler = Arc::new(Mutex::new(Vec::new()));
3583 let handler_clone = Arc::clone(&handler);
3584
3585 let mut aggregator = VolumeBarAggregator::new(
3586 bar_type,
3587 instrument.price_precision(),
3588 instrument.size_precision(),
3589 move |bar: Bar| {
3590 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3591 handler_guard.push(bar);
3592 },
3593 );
3594
3595 aggregator.update(
3596 Price::from("100.00"),
3597 Quantity::from(0),
3598 UnixNanos::default(),
3599 );
3600
3601 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3602 assert_eq!(handler_guard.len(), 0);
3603 }
3604
3605 #[rstest]
3606 fn test_volume_bar_aggregator_exact_threshold_emits_single_bar(equity_aapl: Equity) {
3607 let instrument = InstrumentAny::Equity(equity_aapl);
3608 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3609 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3610 let handler = Arc::new(Mutex::new(Vec::new()));
3611 let handler_clone = Arc::clone(&handler);
3612
3613 let mut aggregator = VolumeBarAggregator::new(
3614 bar_type,
3615 instrument.price_precision(),
3616 instrument.size_precision(),
3617 move |bar: Bar| {
3618 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3619 handler_guard.push(bar);
3620 },
3621 );
3622
3623 aggregator.update(
3624 Price::from("100.00"),
3625 Quantity::from(7),
3626 UnixNanos::from(1_000),
3627 );
3628 aggregator.update(
3629 Price::from("101.00"),
3630 Quantity::from(3),
3631 UnixNanos::from(2_000),
3632 );
3633
3634 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3635 assert_eq!(handler_guard.len(), 1);
3636 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3637 assert_eq!(handler_guard[0].close, Price::from("101.00"));
3638 }
3639
3640 #[rstest]
3641 fn test_volume_bar_aggregator_step_of_one_emits_per_unit(equity_aapl: Equity) {
3642 let instrument = InstrumentAny::Equity(equity_aapl);
3643 let bar_spec = BarSpecification::new(1, BarAggregation::Volume, PriceType::Last);
3644 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3645 let handler = Arc::new(Mutex::new(Vec::new()));
3646 let handler_clone = Arc::clone(&handler);
3647
3648 let mut aggregator = VolumeBarAggregator::new(
3649 bar_type,
3650 instrument.price_precision(),
3651 instrument.size_precision(),
3652 move |bar: Bar| {
3653 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3654 handler_guard.push(bar);
3655 },
3656 );
3657
3658 aggregator.update(
3659 Price::from("100.00"),
3660 Quantity::from(1),
3661 UnixNanos::default(),
3662 );
3663
3664 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3665 assert_eq!(handler_guard.len(), 1);
3666 assert_eq!(handler_guard[0].volume, Quantity::from(1));
3667 }
3668
3669 #[rstest]
3670 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
3671 let instrument = InstrumentAny::Equity(equity_aapl);
3672 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
3673 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3674 let handler = Arc::new(Mutex::new(Vec::new()));
3675 let handler_clone = Arc::clone(&handler);
3676
3677 let mut aggregator = VolumeRunsBarAggregator::new(
3678 bar_type,
3679 instrument.price_precision(),
3680 instrument.size_precision(),
3681 move |bar: Bar| {
3682 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3683 handler_guard.push(bar);
3684 },
3685 );
3686
3687 let buy = TradeTick {
3688 instrument_id: instrument.id(),
3689 price: Price::from("1.0"),
3690 size: Quantity::from(1),
3691 ..TradeTick::default()
3692 };
3693 let sell = TradeTick {
3694 aggressor_side: AggressorSide::Seller,
3695 ..buy
3696 };
3697
3698 aggregator.handle_trade(buy);
3699 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
3701 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3704 assert!(handler_guard.len() >= 2);
3705 assert!(
3706 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
3707 < f64::EPSILON
3708 );
3709 }
3710
3711 #[rstest]
3712 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
3713 let instrument = InstrumentAny::Equity(equity_aapl);
3714 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
3715 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3716 let handler = Arc::new(Mutex::new(Vec::new()));
3717 let handler_clone = Arc::clone(&handler);
3718
3719 let mut aggregator = VolumeRunsBarAggregator::new(
3720 bar_type,
3721 instrument.price_precision(),
3722 instrument.size_precision(),
3723 move |bar: Bar| {
3724 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3725 handler_guard.push(bar);
3726 },
3727 );
3728
3729 let trade = TradeTick {
3730 instrument_id: instrument.id(),
3731 price: Price::from("1.0"),
3732 size: Quantity::from(5),
3733 ..TradeTick::default()
3734 };
3735
3736 aggregator.handle_trade(trade);
3737
3738 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3739 assert!(!handler_guard.is_empty());
3740 assert!(handler_guard[0].volume.as_f64() > 0.0);
3741 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
3742 }
3743
3744 #[rstest]
3745 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
3746 let instrument = InstrumentAny::Equity(equity_aapl);
3747 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
3748 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3749 let handler = Arc::new(Mutex::new(Vec::new()));
3750 let handler_clone = Arc::clone(&handler);
3751
3752 let mut aggregator = VolumeImbalanceBarAggregator::new(
3753 bar_type,
3754 instrument.price_precision(),
3755 instrument.size_precision(),
3756 move |bar: Bar| {
3757 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3758 handler_guard.push(bar);
3759 },
3760 );
3761
3762 let trade_small = TradeTick {
3763 instrument_id: instrument.id(),
3764 price: Price::from("1.0"),
3765 size: Quantity::from(1),
3766 ..TradeTick::default()
3767 };
3768 let trade_large = TradeTick {
3769 size: Quantity::from(3),
3770 ..trade_small
3771 };
3772
3773 aggregator.handle_trade(trade_small);
3774 aggregator.handle_trade(trade_large);
3775
3776 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3777 assert_eq!(handler_guard.len(), 2);
3778 let total_output = handler_guard
3779 .iter()
3780 .map(|bar| bar.volume.as_f64())
3781 .sum::<f64>();
3782 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
3783 assert!((total_output - total_input).abs() < f64::EPSILON);
3784 }
3785
3786 #[rstest]
3787 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
3788 let instrument = InstrumentAny::Equity(equity_aapl);
3789 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3791 let handler = Arc::new(Mutex::new(Vec::new()));
3792 let handler_clone = Arc::clone(&handler);
3793
3794 let mut aggregator = ValueBarAggregator::new(
3795 bar_type,
3796 instrument.price_precision(),
3797 instrument.size_precision(),
3798 move |bar: Bar| {
3799 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3800 handler_guard.push(bar);
3801 },
3802 );
3803
3804 aggregator.update(
3806 Price::from("100.00"),
3807 Quantity::from(5),
3808 UnixNanos::default(),
3809 );
3810 aggregator.update(
3811 Price::from("100.00"),
3812 Quantity::from(5),
3813 UnixNanos::from(1000),
3814 );
3815
3816 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3817 assert_eq!(handler_guard.len(), 1);
3818 let bar = handler_guard.first().unwrap();
3819 assert_eq!(bar.volume, Quantity::from(10));
3820 }
3821
3822 #[rstest]
3823 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
3824 let instrument = InstrumentAny::Equity(equity_aapl);
3825 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3826 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3827 let handler = Arc::new(Mutex::new(Vec::new()));
3828 let handler_clone = Arc::clone(&handler);
3829
3830 let mut aggregator = ValueBarAggregator::new(
3831 bar_type,
3832 instrument.price_precision(),
3833 instrument.size_precision(),
3834 move |bar: Bar| {
3835 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3836 handler_guard.push(bar);
3837 },
3838 );
3839
3840 aggregator.update(
3842 Price::from("100.00"),
3843 Quantity::from(25),
3844 UnixNanos::default(),
3845 );
3846
3847 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3848 assert_eq!(handler_guard.len(), 2);
3849 let remaining_value = aggregator.get_cumulative_value();
3850 assert!(remaining_value < 1000.0); }
3852
3853 #[rstest]
3854 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
3855 let instrument = InstrumentAny::Equity(equity_aapl);
3856 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3857 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3858 let handler = Arc::new(Mutex::new(Vec::new()));
3859 let handler_clone = Arc::clone(&handler);
3860
3861 let mut aggregator = ValueBarAggregator::new(
3862 bar_type,
3863 instrument.price_precision(),
3864 instrument.size_precision(),
3865 move |bar: Bar| {
3866 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3867 handler_guard.push(bar);
3868 },
3869 );
3870
3871 aggregator.update(
3873 Price::from("0.00"),
3874 Quantity::from(100),
3875 UnixNanos::default(),
3876 );
3877
3878 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3880 assert_eq!(handler_guard.len(), 0);
3881
3882 assert_eq!(aggregator.get_cumulative_value(), 0.0);
3884 }
3885
3886 #[rstest]
3887 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
3888 let instrument = InstrumentAny::Equity(equity_aapl);
3889 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3890 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3891 let handler = Arc::new(Mutex::new(Vec::new()));
3892 let handler_clone = Arc::clone(&handler);
3893
3894 let mut aggregator = ValueBarAggregator::new(
3895 bar_type,
3896 instrument.price_precision(),
3897 instrument.size_precision(),
3898 move |bar: Bar| {
3899 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3900 handler_guard.push(bar);
3901 },
3902 );
3903
3904 aggregator.update(
3906 Price::from("100.00"),
3907 Quantity::from(0),
3908 UnixNanos::default(),
3909 );
3910
3911 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3913 assert_eq!(handler_guard.len(), 0);
3914
3915 assert_eq!(aggregator.get_cumulative_value(), 0.0);
3917 }
3918
3919 #[rstest]
3920 fn test_value_bar_aggregator_exact_threshold_emits_one_bar(equity_aapl: Equity) {
3921 let instrument = InstrumentAny::Equity(equity_aapl);
3922 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3923 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3924 let handler = Arc::new(Mutex::new(Vec::new()));
3925 let handler_clone = Arc::clone(&handler);
3926
3927 let mut aggregator = ValueBarAggregator::new(
3928 bar_type,
3929 instrument.price_precision(),
3930 instrument.size_precision(),
3931 move |bar: Bar| {
3932 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3933 handler_guard.push(bar);
3934 },
3935 );
3936
3937 aggregator.update(
3938 Price::from("100.00"),
3939 Quantity::from(5),
3940 UnixNanos::from(1_000),
3941 );
3942 aggregator.update(
3943 Price::from("100.00"),
3944 Quantity::from(5),
3945 UnixNanos::from(2_000),
3946 );
3947
3948 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3949 assert_eq!(handler_guard.len(), 1);
3950 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3951 assert_eq!(aggregator.get_cumulative_value(), 0.0);
3952 }
3953
3954 #[rstest]
3955 fn test_value_bar_aggregator_precision_boundary_min_size_clamp(equity_aapl: Equity) {
3956 let instrument = InstrumentAny::Equity(equity_aapl);
3960 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
3961 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3962 let handler = Arc::new(Mutex::new(Vec::new()));
3963 let handler_clone = Arc::clone(&handler);
3964
3965 let mut aggregator = ValueBarAggregator::new(
3966 bar_type,
3967 instrument.price_precision(),
3968 instrument.size_precision(),
3969 move |bar: Bar| {
3970 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3971 handler_guard.push(bar);
3972 },
3973 );
3974
3975 aggregator.update(
3977 Price::from("100.00"),
3978 Quantity::from(4),
3979 UnixNanos::default(),
3980 );
3981
3982 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3983 assert_eq!(handler_guard.len(), 4);
3984 for bar in handler_guard.iter() {
3985 assert_eq!(bar.volume, Quantity::from(1));
3986 }
3987 }
3988
3989 #[rstest]
3990 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
3991 let instrument = InstrumentAny::Equity(equity_aapl);
3992 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
3993 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3994 let handler = Arc::new(Mutex::new(Vec::new()));
3995 let handler_clone = Arc::clone(&handler);
3996
3997 let mut aggregator = ValueImbalanceBarAggregator::new(
3998 bar_type,
3999 instrument.price_precision(),
4000 instrument.size_precision(),
4001 move |bar: Bar| {
4002 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4003 handler_guard.push(bar);
4004 },
4005 );
4006
4007 let buy = TradeTick {
4008 price: Price::from("5.0"),
4009 size: Quantity::from(2), instrument_id: instrument.id(),
4011 ..TradeTick::default()
4012 };
4013 let sell = TradeTick {
4014 price: Price::from("5.0"),
4015 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
4017 instrument_id: instrument.id(),
4018 ..buy
4019 };
4020
4021 aggregator.handle_trade(buy);
4022 aggregator.handle_trade(sell);
4023
4024 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4025 assert_eq!(handler_guard.len(), 2);
4026 }
4027
4028 #[rstest]
4029 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
4030 let instrument = InstrumentAny::Equity(equity_aapl);
4031 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4032 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4033 let handler = Arc::new(Mutex::new(Vec::new()));
4034 let handler_clone = Arc::clone(&handler);
4035
4036 let mut aggregator = ValueRunsBarAggregator::new(
4037 bar_type,
4038 instrument.price_precision(),
4039 instrument.size_precision(),
4040 move |bar: Bar| {
4041 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4042 handler_guard.push(bar);
4043 },
4044 );
4045
4046 let trade = TradeTick {
4047 price: Price::from("10.0"),
4048 size: Quantity::from(5),
4049 instrument_id: instrument.id(),
4050 ..TradeTick::default()
4051 };
4052
4053 aggregator.handle_trade(trade);
4054 aggregator.handle_trade(trade);
4055
4056 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4057 assert_eq!(handler_guard.len(), 1);
4058 let bar = handler_guard.first().unwrap();
4059 assert_eq!(bar.volume, Quantity::from(10));
4060 }
4061
4062 #[rstest]
4063 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
4064 let instrument = InstrumentAny::Equity(equity_aapl);
4065 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4066 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4067 let handler = Arc::new(Mutex::new(Vec::new()));
4068 let handler_clone = Arc::clone(&handler);
4069
4070 let mut aggregator = ValueRunsBarAggregator::new(
4071 bar_type,
4072 instrument.price_precision(),
4073 instrument.size_precision(),
4074 move |bar: Bar| {
4075 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4076 handler_guard.push(bar);
4077 },
4078 );
4079
4080 let buy = TradeTick {
4081 price: Price::from("10.0"),
4082 size: Quantity::from(5),
4083 instrument_id: instrument.id(),
4084 ..TradeTick::default()
4085 }; let sell = TradeTick {
4087 price: Price::from("10.0"),
4088 size: Quantity::from(10),
4089 aggressor_side: AggressorSide::Seller,
4090 ..buy
4091 }; aggregator.handle_trade(buy);
4094 aggregator.handle_trade(sell);
4095
4096 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4097 assert_eq!(handler_guard.len(), 1);
4098 assert_eq!(handler_guard[0].volume, Quantity::from(10));
4099 }
4100
4101 #[rstest]
4102 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4103 let instrument = InstrumentAny::Equity(equity_aapl);
4104 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4105 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4106 let handler = Arc::new(Mutex::new(Vec::new()));
4107 let handler_clone = Arc::clone(&handler);
4108
4109 let mut aggregator = TickRunsBarAggregator::new(
4110 bar_type,
4111 instrument.price_precision(),
4112 instrument.size_precision(),
4113 move |bar: Bar| {
4114 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4115 handler_guard.push(bar);
4116 },
4117 );
4118
4119 let buy = TradeTick::default();
4120
4121 aggregator.handle_trade(buy);
4122 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4127 assert_eq!(handler_guard.len(), 2);
4128 }
4129
4130 #[rstest]
4131 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
4132 let instrument = InstrumentAny::Equity(equity_aapl);
4133 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4134 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4135 let handler = Arc::new(Mutex::new(Vec::new()));
4136 let handler_clone = Arc::clone(&handler);
4137
4138 let mut aggregator = TickRunsBarAggregator::new(
4139 bar_type,
4140 instrument.price_precision(),
4141 instrument.size_precision(),
4142 move |bar: Bar| {
4143 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4144 handler_guard.push(bar);
4145 },
4146 );
4147
4148 let buy = TradeTick::default();
4149 let no_aggressor = TradeTick {
4150 aggressor_side: AggressorSide::NoAggressor,
4151 ..buy
4152 };
4153
4154 aggregator.handle_trade(buy);
4155 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4160 assert_eq!(handler_guard.len(), 1);
4161 }
4162
4163 #[rstest]
4164 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4165 let instrument = InstrumentAny::Equity(equity_aapl);
4166 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
4167 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4168 let handler = Arc::new(Mutex::new(Vec::new()));
4169 let handler_clone = Arc::clone(&handler);
4170
4171 let mut aggregator = VolumeRunsBarAggregator::new(
4172 bar_type,
4173 instrument.price_precision(),
4174 instrument.size_precision(),
4175 move |bar: Bar| {
4176 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4177 handler_guard.push(bar);
4178 },
4179 );
4180
4181 let buy = TradeTick {
4182 instrument_id: instrument.id(),
4183 price: Price::from("1.0"),
4184 size: Quantity::from(1),
4185 ..TradeTick::default()
4186 };
4187
4188 aggregator.handle_trade(buy);
4189 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4194 assert_eq!(handler_guard.len(), 2);
4195 assert_eq!(handler_guard[0].volume, Quantity::from(2));
4196 assert_eq!(handler_guard[1].volume, Quantity::from(2));
4197 }
4198
4199 #[rstest]
4200 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4201 let instrument = InstrumentAny::Equity(equity_aapl);
4202 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4203 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4204 let handler = Arc::new(Mutex::new(Vec::new()));
4205 let handler_clone = Arc::clone(&handler);
4206
4207 let mut aggregator = ValueRunsBarAggregator::new(
4208 bar_type,
4209 instrument.price_precision(),
4210 instrument.size_precision(),
4211 move |bar: Bar| {
4212 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4213 handler_guard.push(bar);
4214 },
4215 );
4216
4217 let buy = TradeTick {
4218 instrument_id: instrument.id(),
4219 price: Price::from("10.0"),
4220 size: Quantity::from(5),
4221 ..TradeTick::default()
4222 }; aggregator.handle_trade(buy);
4225 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4230 assert_eq!(handler_guard.len(), 2);
4231 assert_eq!(handler_guard[0].volume, Quantity::from(10));
4232 assert_eq!(handler_guard[1].volume, Quantity::from(10));
4233 }
4234
4235 #[rstest]
4236 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
4237 let instrument = InstrumentAny::Equity(equity_aapl);
4238 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4240 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4241 let handler = Arc::new(Mutex::new(Vec::new()));
4242 let handler_clone = Arc::clone(&handler);
4243 let clock = Rc::new(RefCell::new(TestClock::new()));
4244
4245 let mut aggregator = TimeBarAggregator::new(
4246 bar_type,
4247 instrument.price_precision(),
4248 instrument.size_precision(),
4249 clock.clone(),
4250 move |bar: Bar| {
4251 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4252 handler_guard.push(bar);
4253 },
4254 true, false, BarIntervalType::LeftOpen,
4257 None, 15, false, );
4261
4262 aggregator.update(
4263 Price::from("100.00"),
4264 Quantity::from(1),
4265 UnixNanos::default(),
4266 );
4267
4268 let next_sec = UnixNanos::from(1_000_000_000);
4269 clock.borrow_mut().set_time(next_sec);
4270
4271 let event = TimeEvent::new(
4272 Ustr::from("1-SECOND-LAST"),
4273 UUID4::new(),
4274 next_sec,
4275 next_sec,
4276 );
4277 aggregator.build_bar(&event);
4278
4279 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4280 assert_eq!(handler_guard.len(), 1);
4281 let bar = handler_guard.first().unwrap();
4282 assert_eq!(bar.ts_event, UnixNanos::default());
4283 assert_eq!(bar.ts_init, next_sec);
4284 }
4285
4286 #[rstest]
4287 fn test_time_bar_aggregator_stop_clears_timer_and_allows_restart(equity_aapl: Equity) {
4288 let instrument = InstrumentAny::Equity(equity_aapl);
4289 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4290 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4291 let timer_name = format!("TIME_BAR_{bar_type}");
4292 let clock = Rc::new(RefCell::new(TestClock::new()));
4293
4294 let aggregator = TimeBarAggregator::new(
4295 bar_type,
4296 instrument.price_precision(),
4297 instrument.size_precision(),
4298 clock.clone(),
4299 |_bar: Bar| {},
4300 true,
4301 false,
4302 BarIntervalType::LeftOpen,
4303 None,
4304 15,
4305 false,
4306 );
4307
4308 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
4309 let rc = Rc::new(RefCell::new(boxed));
4310
4311 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
4312 assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
4313
4314 rc.borrow_mut().stop();
4315 assert!(clock.borrow().timer_names().is_empty());
4316
4317 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
4318 assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
4319 }
4320
4321 #[rstest]
4322 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
4323 let instrument = InstrumentAny::Equity(equity_aapl);
4324 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4325 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4326 let handler = Arc::new(Mutex::new(Vec::new()));
4327 let handler_clone = Arc::clone(&handler);
4328 let clock = Rc::new(RefCell::new(TestClock::new()));
4329
4330 let mut aggregator = TimeBarAggregator::new(
4331 bar_type,
4332 instrument.price_precision(),
4333 instrument.size_precision(),
4334 clock.clone(),
4335 move |bar: Bar| {
4336 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4337 handler_guard.push(bar);
4338 },
4339 true, true, BarIntervalType::LeftOpen,
4342 None,
4343 15,
4344 false, );
4346
4347 aggregator.update(
4349 Price::from("100.00"),
4350 Quantity::from(1),
4351 UnixNanos::default(),
4352 );
4353
4354 let ts1 = UnixNanos::from(1_000_000_000);
4356 clock.borrow_mut().set_time(ts1);
4357 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4358 aggregator.build_bar(&event);
4359
4360 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4362
4363 let ts2 = UnixNanos::from(2_000_000_000);
4365 clock.borrow_mut().set_time(ts2);
4366 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4367 aggregator.build_bar(&event);
4368
4369 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4370 assert_eq!(handler_guard.len(), 2);
4371
4372 let bar1 = &handler_guard[0];
4373 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
4375 assert_eq!(bar1.close, Price::from("100.00"));
4376 let bar2 = &handler_guard[1];
4377 assert_eq!(bar2.ts_event, ts2);
4378 assert_eq!(bar2.ts_init, ts2);
4379 assert_eq!(bar2.close, Price::from("101.00"));
4380 }
4381
4382 #[rstest]
4383 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
4384 let instrument = InstrumentAny::Equity(equity_aapl);
4385 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4386 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4387 let handler = Arc::new(Mutex::new(Vec::new()));
4388 let handler_clone = Arc::clone(&handler);
4389 let clock = Rc::new(RefCell::new(TestClock::new()));
4390 let mut aggregator = TimeBarAggregator::new(
4391 bar_type,
4392 instrument.price_precision(),
4393 instrument.size_precision(),
4394 clock.clone(),
4395 move |bar: Bar| {
4396 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4397 handler_guard.push(bar);
4398 },
4399 true, true, BarIntervalType::RightOpen,
4402 None,
4403 15,
4404 false, );
4406
4407 aggregator.update(
4409 Price::from("100.00"),
4410 Quantity::from(1),
4411 UnixNanos::default(),
4412 );
4413
4414 let ts1 = UnixNanos::from(1_000_000_000);
4416 clock.borrow_mut().set_time(ts1);
4417 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4418 aggregator.build_bar(&event);
4419
4420 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4422
4423 let ts2 = UnixNanos::from(2_000_000_000);
4425 clock.borrow_mut().set_time(ts2);
4426 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4427 aggregator.build_bar(&event);
4428
4429 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4430 assert_eq!(handler_guard.len(), 2);
4431
4432 let bar1 = &handler_guard[0];
4433 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
4435 assert_eq!(bar1.close, Price::from("100.00"));
4436
4437 let bar2 = &handler_guard[1];
4438 assert_eq!(bar2.ts_event, ts1);
4439 assert_eq!(bar2.ts_init, ts2);
4440 assert_eq!(bar2.close, Price::from("101.00"));
4441 }
4442
4443 #[rstest]
4444 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
4445 let instrument = InstrumentAny::Equity(equity_aapl);
4446 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4447 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4448 let handler = Arc::new(Mutex::new(Vec::new()));
4449 let handler_clone = Arc::clone(&handler);
4450 let clock = Rc::new(RefCell::new(TestClock::new()));
4451
4452 let mut aggregator = TimeBarAggregator::new(
4454 bar_type,
4455 instrument.price_precision(),
4456 instrument.size_precision(),
4457 clock.clone(),
4458 move |bar: Bar| {
4459 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4460 handler_guard.push(bar);
4461 },
4462 false, true, BarIntervalType::LeftOpen,
4465 None,
4466 15,
4467 false, );
4469
4470 let ts1 = UnixNanos::from(1_000_000_000);
4472 clock.borrow_mut().set_time(ts1);
4473 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4474 aggregator.build_bar(&event);
4475
4476 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4477 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
4479
4480 let handler = Arc::new(Mutex::new(Vec::new()));
4482 let handler_clone = Arc::clone(&handler);
4483 let mut aggregator = TimeBarAggregator::new(
4484 bar_type,
4485 instrument.price_precision(),
4486 instrument.size_precision(),
4487 clock.clone(),
4488 move |bar: Bar| {
4489 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4490 handler_guard.push(bar);
4491 },
4492 true, true, BarIntervalType::LeftOpen,
4495 None,
4496 15,
4497 false, );
4499
4500 aggregator.update(
4501 Price::from("100.00"),
4502 Quantity::from(1),
4503 UnixNanos::default(),
4504 );
4505
4506 let ts1 = UnixNanos::from(1_000_000_000);
4508 clock.borrow_mut().set_time(ts1);
4509 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4510 aggregator.build_bar(&event);
4511
4512 let ts2 = UnixNanos::from(2_000_000_000);
4514 clock.borrow_mut().set_time(ts2);
4515 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4516 aggregator.build_bar(&event);
4517
4518 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4519 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
4521 assert_eq!(bar1.close, Price::from("100.00"));
4522 let bar2 = &handler_guard[1];
4523 assert_eq!(bar2.close, Price::from("100.00")); }
4525
4526 #[rstest]
4527 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
4528 let instrument = InstrumentAny::Equity(equity_aapl);
4529 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4530 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4531 let clock = Rc::new(RefCell::new(TestClock::new()));
4532 let handler = Arc::new(Mutex::new(Vec::new()));
4533 let handler_clone = Arc::clone(&handler);
4534
4535 let mut aggregator = TimeBarAggregator::new(
4536 bar_type,
4537 instrument.price_precision(),
4538 instrument.size_precision(),
4539 clock.clone(),
4540 move |bar: Bar| {
4541 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4542 handler_guard.push(bar);
4543 },
4544 true, true, BarIntervalType::RightOpen,
4547 None,
4548 15,
4549 false, );
4551
4552 let ts1 = UnixNanos::from(1_000_000_000);
4553 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
4554
4555 let ts2 = UnixNanos::from(2_000_000_000);
4556 clock.borrow_mut().set_time(ts2);
4557
4558 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4560 aggregator.build_bar(&event);
4561
4562 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4563 let bar = handler_guard.first().unwrap();
4564 assert_eq!(bar.ts_event, UnixNanos::default());
4565 assert_eq!(bar.ts_init, ts2);
4566 }
4567
4568 #[rstest]
4569 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
4570 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4571 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4573 let handler = Arc::new(Mutex::new(Vec::new()));
4574 let handler_clone = Arc::clone(&handler);
4575
4576 let aggregator = RenkoBarAggregator::new(
4577 bar_type,
4578 instrument.price_precision(),
4579 instrument.size_precision(),
4580 instrument.price_increment(),
4581 move |bar: Bar| {
4582 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4583 handler_guard.push(bar);
4584 },
4585 );
4586
4587 assert_eq!(aggregator.bar_type(), bar_type);
4588 assert!(!aggregator.is_running());
4589 let expected_brick_size = 10 * instrument.price_increment().raw;
4591 assert_eq!(aggregator.brick_size, expected_brick_size);
4592 }
4593
4594 #[rstest]
4595 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
4596 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4597 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4599 let handler = Arc::new(Mutex::new(Vec::new()));
4600 let handler_clone = Arc::clone(&handler);
4601
4602 let mut aggregator = RenkoBarAggregator::new(
4603 bar_type,
4604 instrument.price_precision(),
4605 instrument.size_precision(),
4606 instrument.price_increment(),
4607 move |bar: Bar| {
4608 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4609 handler_guard.push(bar);
4610 },
4611 );
4612
4613 aggregator.update(
4615 Price::from("1.00000"),
4616 Quantity::from(1),
4617 UnixNanos::default(),
4618 );
4619 aggregator.update(
4620 Price::from("1.00005"),
4621 Quantity::from(1),
4622 UnixNanos::from(1000),
4623 );
4624
4625 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4626 assert_eq!(handler_guard.len(), 0); }
4628
4629 #[rstest]
4630 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
4631 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4632 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4634 let handler = Arc::new(Mutex::new(Vec::new()));
4635 let handler_clone = Arc::clone(&handler);
4636
4637 let mut aggregator = RenkoBarAggregator::new(
4638 bar_type,
4639 instrument.price_precision(),
4640 instrument.size_precision(),
4641 instrument.price_increment(),
4642 move |bar: Bar| {
4643 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4644 handler_guard.push(bar);
4645 },
4646 );
4647
4648 aggregator.update(
4650 Price::from("1.00000"),
4651 Quantity::from(1),
4652 UnixNanos::default(),
4653 );
4654 aggregator.update(
4655 Price::from("1.00015"),
4656 Quantity::from(1),
4657 UnixNanos::from(1000),
4658 );
4659
4660 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4661 assert_eq!(handler_guard.len(), 1);
4662
4663 let bar = handler_guard.first().unwrap();
4664 assert_eq!(bar.open, Price::from("1.00000"));
4665 assert_eq!(bar.high, Price::from("1.00010"));
4666 assert_eq!(bar.low, Price::from("1.00000"));
4667 assert_eq!(bar.close, Price::from("1.00010"));
4668 assert_eq!(bar.volume, Quantity::from(2));
4669 assert_eq!(bar.ts_event, UnixNanos::from(1000));
4670 assert_eq!(bar.ts_init, UnixNanos::from(1000));
4671 }
4672
4673 #[rstest]
4674 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
4675 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4676 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4678 let handler = Arc::new(Mutex::new(Vec::new()));
4679 let handler_clone = Arc::clone(&handler);
4680
4681 let mut aggregator = RenkoBarAggregator::new(
4682 bar_type,
4683 instrument.price_precision(),
4684 instrument.size_precision(),
4685 instrument.price_increment(),
4686 move |bar: Bar| {
4687 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4688 handler_guard.push(bar);
4689 },
4690 );
4691
4692 aggregator.update(
4694 Price::from("1.00000"),
4695 Quantity::from(1),
4696 UnixNanos::default(),
4697 );
4698 aggregator.update(
4699 Price::from("1.00025"),
4700 Quantity::from(1),
4701 UnixNanos::from(1000),
4702 );
4703
4704 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4705 assert_eq!(handler_guard.len(), 2);
4706
4707 let bar1 = &handler_guard[0];
4708 assert_eq!(bar1.open, Price::from("1.00000"));
4709 assert_eq!(bar1.high, Price::from("1.00010"));
4710 assert_eq!(bar1.low, Price::from("1.00000"));
4711 assert_eq!(bar1.close, Price::from("1.00010"));
4712
4713 let bar2 = &handler_guard[1];
4714 assert_eq!(bar2.open, Price::from("1.00010"));
4715 assert_eq!(bar2.high, Price::from("1.00020"));
4716 assert_eq!(bar2.low, Price::from("1.00010"));
4717 assert_eq!(bar2.close, Price::from("1.00020"));
4718 }
4719
4720 #[rstest]
4721 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
4722 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4723 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4725 let handler = Arc::new(Mutex::new(Vec::new()));
4726 let handler_clone = Arc::clone(&handler);
4727
4728 let mut aggregator = RenkoBarAggregator::new(
4729 bar_type,
4730 instrument.price_precision(),
4731 instrument.size_precision(),
4732 instrument.price_increment(),
4733 move |bar: Bar| {
4734 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4735 handler_guard.push(bar);
4736 },
4737 );
4738
4739 aggregator.update(
4741 Price::from("1.00020"),
4742 Quantity::from(1),
4743 UnixNanos::default(),
4744 );
4745 aggregator.update(
4746 Price::from("1.00005"),
4747 Quantity::from(1),
4748 UnixNanos::from(1000),
4749 );
4750
4751 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4752 assert_eq!(handler_guard.len(), 1);
4753
4754 let bar = handler_guard.first().unwrap();
4755 assert_eq!(bar.open, Price::from("1.00020"));
4756 assert_eq!(bar.high, Price::from("1.00020"));
4757 assert_eq!(bar.low, Price::from("1.00010"));
4758 assert_eq!(bar.close, Price::from("1.00010"));
4759 assert_eq!(bar.volume, Quantity::from(2));
4760 }
4761
4762 #[rstest]
4763 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
4764 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4765 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4767 let handler = Arc::new(Mutex::new(Vec::new()));
4768 let handler_clone = Arc::clone(&handler);
4769
4770 let mut aggregator = RenkoBarAggregator::new(
4771 bar_type,
4772 instrument.price_precision(),
4773 instrument.size_precision(),
4774 instrument.price_increment(),
4775 move |bar: Bar| {
4776 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4777 handler_guard.push(bar);
4778 },
4779 );
4780
4781 let input_bar = Bar::new(
4783 BarType::new(
4784 instrument.id(),
4785 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4786 AggregationSource::Internal,
4787 ),
4788 Price::from("1.00000"),
4789 Price::from("1.00005"),
4790 Price::from("0.99995"),
4791 Price::from("1.00005"), Quantity::from(100),
4793 UnixNanos::default(),
4794 UnixNanos::from(1000),
4795 );
4796
4797 aggregator.handle_bar(input_bar);
4798
4799 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4800 assert_eq!(handler_guard.len(), 0); }
4802
4803 #[rstest]
4804 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
4805 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4806 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4808 let handler = Arc::new(Mutex::new(Vec::new()));
4809 let handler_clone = Arc::clone(&handler);
4810
4811 let mut aggregator = RenkoBarAggregator::new(
4812 bar_type,
4813 instrument.price_precision(),
4814 instrument.size_precision(),
4815 instrument.price_increment(),
4816 move |bar: Bar| {
4817 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4818 handler_guard.push(bar);
4819 },
4820 );
4821
4822 let bar1 = Bar::new(
4824 BarType::new(
4825 instrument.id(),
4826 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4827 AggregationSource::Internal,
4828 ),
4829 Price::from("1.00000"),
4830 Price::from("1.00005"),
4831 Price::from("0.99995"),
4832 Price::from("1.00000"),
4833 Quantity::from(100),
4834 UnixNanos::default(),
4835 UnixNanos::default(),
4836 );
4837
4838 let bar2 = Bar::new(
4840 BarType::new(
4841 instrument.id(),
4842 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4843 AggregationSource::Internal,
4844 ),
4845 Price::from("1.00000"),
4846 Price::from("1.00015"),
4847 Price::from("0.99995"),
4848 Price::from("1.00010"), Quantity::from(50),
4850 UnixNanos::from(60_000_000_000),
4851 UnixNanos::from(60_000_000_000),
4852 );
4853
4854 aggregator.handle_bar(bar1);
4855 aggregator.handle_bar(bar2);
4856
4857 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4858 assert_eq!(handler_guard.len(), 1);
4859
4860 let bar = handler_guard.first().unwrap();
4861 assert_eq!(bar.open, Price::from("1.00000"));
4862 assert_eq!(bar.high, Price::from("1.00010"));
4863 assert_eq!(bar.low, Price::from("1.00000"));
4864 assert_eq!(bar.close, Price::from("1.00010"));
4865 assert_eq!(bar.volume, Quantity::from(150));
4866 }
4867
4868 #[rstest]
4869 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
4870 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4871 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4873 let handler = Arc::new(Mutex::new(Vec::new()));
4874 let handler_clone = Arc::clone(&handler);
4875
4876 let mut aggregator = RenkoBarAggregator::new(
4877 bar_type,
4878 instrument.price_precision(),
4879 instrument.size_precision(),
4880 instrument.price_increment(),
4881 move |bar: Bar| {
4882 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4883 handler_guard.push(bar);
4884 },
4885 );
4886
4887 let bar1 = Bar::new(
4889 BarType::new(
4890 instrument.id(),
4891 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4892 AggregationSource::Internal,
4893 ),
4894 Price::from("1.00000"),
4895 Price::from("1.00005"),
4896 Price::from("0.99995"),
4897 Price::from("1.00000"),
4898 Quantity::from(100),
4899 UnixNanos::default(),
4900 UnixNanos::default(),
4901 );
4902
4903 let bar2 = Bar::new(
4905 BarType::new(
4906 instrument.id(),
4907 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4908 AggregationSource::Internal,
4909 ),
4910 Price::from("1.00000"),
4911 Price::from("1.00035"),
4912 Price::from("0.99995"),
4913 Price::from("1.00030"), Quantity::from(50),
4915 UnixNanos::from(60_000_000_000),
4916 UnixNanos::from(60_000_000_000),
4917 );
4918
4919 aggregator.handle_bar(bar1);
4920 aggregator.handle_bar(bar2);
4921
4922 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4923 assert_eq!(handler_guard.len(), 3);
4924
4925 let bar1 = &handler_guard[0];
4926 assert_eq!(bar1.open, Price::from("1.00000"));
4927 assert_eq!(bar1.close, Price::from("1.00010"));
4928
4929 let bar2 = &handler_guard[1];
4930 assert_eq!(bar2.open, Price::from("1.00010"));
4931 assert_eq!(bar2.close, Price::from("1.00020"));
4932
4933 let bar3 = &handler_guard[2];
4934 assert_eq!(bar3.open, Price::from("1.00020"));
4935 assert_eq!(bar3.close, Price::from("1.00030"));
4936 }
4937
4938 #[rstest]
4939 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
4940 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4941 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4943 let handler = Arc::new(Mutex::new(Vec::new()));
4944 let handler_clone = Arc::clone(&handler);
4945
4946 let mut aggregator = RenkoBarAggregator::new(
4947 bar_type,
4948 instrument.price_precision(),
4949 instrument.size_precision(),
4950 instrument.price_increment(),
4951 move |bar: Bar| {
4952 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4953 handler_guard.push(bar);
4954 },
4955 );
4956
4957 let bar1 = Bar::new(
4959 BarType::new(
4960 instrument.id(),
4961 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4962 AggregationSource::Internal,
4963 ),
4964 Price::from("1.00020"),
4965 Price::from("1.00025"),
4966 Price::from("1.00015"),
4967 Price::from("1.00020"),
4968 Quantity::from(100),
4969 UnixNanos::default(),
4970 UnixNanos::default(),
4971 );
4972
4973 let bar2 = Bar::new(
4975 BarType::new(
4976 instrument.id(),
4977 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4978 AggregationSource::Internal,
4979 ),
4980 Price::from("1.00020"),
4981 Price::from("1.00025"),
4982 Price::from("1.00005"),
4983 Price::from("1.00010"), Quantity::from(50),
4985 UnixNanos::from(60_000_000_000),
4986 UnixNanos::from(60_000_000_000),
4987 );
4988
4989 aggregator.handle_bar(bar1);
4990 aggregator.handle_bar(bar2);
4991
4992 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4993 assert_eq!(handler_guard.len(), 1);
4994
4995 let bar = handler_guard.first().unwrap();
4996 assert_eq!(bar.open, Price::from("1.00020"));
4997 assert_eq!(bar.high, Price::from("1.00020"));
4998 assert_eq!(bar.low, Price::from("1.00010"));
4999 assert_eq!(bar.close, Price::from("1.00010"));
5000 assert_eq!(bar.volume, Quantity::from(150));
5001 }
5002
5003 #[rstest]
5004 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
5005 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5006
5007 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
5010 let handler = Arc::new(Mutex::new(Vec::new()));
5011 let handler_clone = Arc::clone(&handler);
5012
5013 let aggregator_5 = RenkoBarAggregator::new(
5014 bar_type_5,
5015 instrument.price_precision(),
5016 instrument.size_precision(),
5017 instrument.price_increment(),
5018 move |_bar: Bar| {
5019 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5020 handler_guard.push(_bar);
5021 },
5022 );
5023
5024 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
5026 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
5027
5028 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
5030 let handler2 = Arc::new(Mutex::new(Vec::new()));
5031 let handler2_clone = Arc::clone(&handler2);
5032
5033 let aggregator_20 = RenkoBarAggregator::new(
5034 bar_type_20,
5035 instrument.price_precision(),
5036 instrument.size_precision(),
5037 instrument.price_increment(),
5038 move |_bar: Bar| {
5039 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
5040 handler_guard.push(_bar);
5041 },
5042 );
5043
5044 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
5046 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
5047 }
5048
5049 #[rstest]
5050 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
5051 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5052 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5054 let handler = Arc::new(Mutex::new(Vec::new()));
5055 let handler_clone = Arc::clone(&handler);
5056
5057 let mut aggregator = RenkoBarAggregator::new(
5058 bar_type,
5059 instrument.price_precision(),
5060 instrument.size_precision(),
5061 instrument.price_increment(),
5062 move |bar: Bar| {
5063 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5064 handler_guard.push(bar);
5065 },
5066 );
5067
5068 aggregator.update(
5070 Price::from("1.00000"),
5071 Quantity::from(1),
5072 UnixNanos::from(1000),
5073 );
5074 aggregator.update(
5075 Price::from("1.00010"),
5076 Quantity::from(1),
5077 UnixNanos::from(2000),
5078 ); aggregator.update(
5080 Price::from("1.00020"),
5081 Quantity::from(1),
5082 UnixNanos::from(3000),
5083 ); aggregator.update(
5085 Price::from("1.00025"),
5086 Quantity::from(1),
5087 UnixNanos::from(4000),
5088 ); aggregator.update(
5090 Price::from("1.00030"),
5091 Quantity::from(1),
5092 UnixNanos::from(5000),
5093 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
5096 assert_eq!(handler_guard.len(), 3);
5097
5098 let bar1 = &handler_guard[0];
5099 assert_eq!(bar1.open, Price::from("1.00000"));
5100 assert_eq!(bar1.close, Price::from("1.00010"));
5101
5102 let bar2 = &handler_guard[1];
5103 assert_eq!(bar2.open, Price::from("1.00010"));
5104 assert_eq!(bar2.close, Price::from("1.00020"));
5105
5106 let bar3 = &handler_guard[2];
5107 assert_eq!(bar3.open, Price::from("1.00020"));
5108 assert_eq!(bar3.close, Price::from("1.00030"));
5109 }
5110
5111 #[rstest]
5112 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
5113 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5114 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5116 let handler = Arc::new(Mutex::new(Vec::new()));
5117 let handler_clone = Arc::clone(&handler);
5118
5119 let mut aggregator = RenkoBarAggregator::new(
5120 bar_type,
5121 instrument.price_precision(),
5122 instrument.size_precision(),
5123 instrument.price_increment(),
5124 move |bar: Bar| {
5125 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5126 handler_guard.push(bar);
5127 },
5128 );
5129
5130 aggregator.update(
5132 Price::from("1.00000"),
5133 Quantity::from(1),
5134 UnixNanos::from(1000),
5135 );
5136 aggregator.update(
5137 Price::from("1.00010"),
5138 Quantity::from(1),
5139 UnixNanos::from(2000),
5140 ); aggregator.update(
5142 Price::from("0.99990"),
5143 Quantity::from(1),
5144 UnixNanos::from(3000),
5145 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
5148 assert_eq!(handler_guard.len(), 3);
5149
5150 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
5152 assert_eq!(bar1.high, Price::from("1.00010"));
5153 assert_eq!(bar1.low, Price::from("1.00000"));
5154 assert_eq!(bar1.close, Price::from("1.00010"));
5155
5156 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
5158 assert_eq!(bar2.high, Price::from("1.00010"));
5159 assert_eq!(bar2.low, Price::from("1.00000"));
5160 assert_eq!(bar2.close, Price::from("1.00000"));
5161
5162 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
5164 assert_eq!(bar3.high, Price::from("1.00000"));
5165 assert_eq!(bar3.low, Price::from("0.99990"));
5166 assert_eq!(bar3.close, Price::from("0.99990"));
5167 }
5168
5169 #[rstest]
5170 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
5171 let instrument = InstrumentAny::Equity(equity_aapl);
5172 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
5173 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5174 let handler = Arc::new(Mutex::new(Vec::new()));
5175 let handler_clone = Arc::clone(&handler);
5176
5177 let mut aggregator = TickImbalanceBarAggregator::new(
5178 bar_type,
5179 instrument.price_precision(),
5180 instrument.size_precision(),
5181 move |bar: Bar| {
5182 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5183 handler_guard.push(bar);
5184 },
5185 );
5186
5187 let buy = TradeTick {
5188 aggressor_side: AggressorSide::Buyer,
5189 ..TradeTick::default()
5190 };
5191 let sell = TradeTick {
5192 aggressor_side: AggressorSide::Seller,
5193 ..TradeTick::default()
5194 };
5195
5196 aggregator.handle_trade(buy);
5197 aggregator.handle_trade(sell);
5198 aggregator.handle_trade(buy);
5199
5200 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5201 assert_eq!(handler_guard.len(), 0);
5202 }
5203
5204 #[rstest]
5205 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
5206 let instrument = InstrumentAny::Equity(equity_aapl);
5207 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
5208 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5209 let handler = Arc::new(Mutex::new(Vec::new()));
5210 let handler_clone = Arc::clone(&handler);
5211
5212 let mut aggregator = TickImbalanceBarAggregator::new(
5213 bar_type,
5214 instrument.price_precision(),
5215 instrument.size_precision(),
5216 move |bar: Bar| {
5217 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5218 handler_guard.push(bar);
5219 },
5220 );
5221
5222 let buy = TradeTick {
5223 aggressor_side: AggressorSide::Buyer,
5224 ..TradeTick::default()
5225 };
5226 let no_aggressor = TradeTick {
5227 aggressor_side: AggressorSide::NoAggressor,
5228 ..TradeTick::default()
5229 };
5230
5231 aggregator.handle_trade(buy);
5232 aggregator.handle_trade(no_aggressor);
5233 aggregator.handle_trade(buy);
5234
5235 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5236 assert_eq!(handler_guard.len(), 1);
5237 }
5238
5239 #[rstest]
5240 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
5241 let instrument = InstrumentAny::Equity(equity_aapl);
5242 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
5243 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5244 let handler = Arc::new(Mutex::new(Vec::new()));
5245 let handler_clone = Arc::clone(&handler);
5246
5247 let mut aggregator = TickRunsBarAggregator::new(
5248 bar_type,
5249 instrument.price_precision(),
5250 instrument.size_precision(),
5251 move |bar: Bar| {
5252 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5253 handler_guard.push(bar);
5254 },
5255 );
5256
5257 let buy = TradeTick {
5258 aggressor_side: AggressorSide::Buyer,
5259 ..TradeTick::default()
5260 };
5261 let sell = TradeTick {
5262 aggressor_side: AggressorSide::Seller,
5263 ..TradeTick::default()
5264 };
5265
5266 aggregator.handle_trade(buy);
5267 aggregator.handle_trade(buy);
5268 aggregator.handle_trade(sell);
5269 aggregator.handle_trade(sell);
5270
5271 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5272 assert_eq!(handler_guard.len(), 2);
5273 }
5274
5275 #[rstest]
5276 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5277 let instrument = InstrumentAny::Equity(equity_aapl);
5278 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
5279 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5280 let handler = Arc::new(Mutex::new(Vec::new()));
5281 let handler_clone = Arc::clone(&handler);
5282
5283 let mut aggregator = VolumeImbalanceBarAggregator::new(
5284 bar_type,
5285 instrument.price_precision(),
5286 instrument.size_precision(),
5287 move |bar: Bar| {
5288 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5289 handler_guard.push(bar);
5290 },
5291 );
5292
5293 let large_trade = TradeTick {
5294 size: Quantity::from(25),
5295 aggressor_side: AggressorSide::Buyer,
5296 ..TradeTick::default()
5297 };
5298
5299 aggregator.handle_trade(large_trade);
5300
5301 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5302 assert_eq!(handler_guard.len(), 2);
5303 }
5304
5305 #[rstest]
5306 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
5307 equity_aapl: Equity,
5308 ) {
5309 let instrument = InstrumentAny::Equity(equity_aapl);
5310 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
5311 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5312 let handler = Arc::new(Mutex::new(Vec::new()));
5313 let handler_clone = Arc::clone(&handler);
5314
5315 let mut aggregator = VolumeImbalanceBarAggregator::new(
5316 bar_type,
5317 instrument.price_precision(),
5318 instrument.size_precision(),
5319 move |bar: Bar| {
5320 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5321 handler_guard.push(bar);
5322 },
5323 );
5324
5325 let buy = TradeTick {
5326 size: Quantity::from(5),
5327 aggressor_side: AggressorSide::Buyer,
5328 ..TradeTick::default()
5329 };
5330 let no_aggressor = TradeTick {
5331 size: Quantity::from(3),
5332 aggressor_side: AggressorSide::NoAggressor,
5333 ..TradeTick::default()
5334 };
5335
5336 aggregator.handle_trade(buy);
5337 aggregator.handle_trade(no_aggressor);
5338 aggregator.handle_trade(buy);
5339
5340 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5341 assert_eq!(handler_guard.len(), 1);
5342 }
5343
5344 #[rstest]
5345 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5346 let instrument = InstrumentAny::Equity(equity_aapl);
5347 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
5348 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5349 let handler = Arc::new(Mutex::new(Vec::new()));
5350 let handler_clone = Arc::clone(&handler);
5351
5352 let mut aggregator = VolumeRunsBarAggregator::new(
5353 bar_type,
5354 instrument.price_precision(),
5355 instrument.size_precision(),
5356 move |bar: Bar| {
5357 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5358 handler_guard.push(bar);
5359 },
5360 );
5361
5362 let large_trade = TradeTick {
5363 size: Quantity::from(25),
5364 aggressor_side: AggressorSide::Buyer,
5365 ..TradeTick::default()
5366 };
5367
5368 aggregator.handle_trade(large_trade);
5369
5370 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5371 assert_eq!(handler_guard.len(), 2);
5372 }
5373
5374 #[rstest]
5375 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5376 let instrument = InstrumentAny::Equity(equity_aapl);
5377 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
5378 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5379 let handler = Arc::new(Mutex::new(Vec::new()));
5380 let handler_clone = Arc::clone(&handler);
5381
5382 let mut aggregator = ValueRunsBarAggregator::new(
5383 bar_type,
5384 instrument.price_precision(),
5385 instrument.size_precision(),
5386 move |bar: Bar| {
5387 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5388 handler_guard.push(bar);
5389 },
5390 );
5391
5392 let large_trade = TradeTick {
5393 price: Price::from("5.00"),
5394 size: Quantity::from(25),
5395 aggressor_side: AggressorSide::Buyer,
5396 ..TradeTick::default()
5397 };
5398
5399 aggregator.handle_trade(large_trade);
5400
5401 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5402 assert_eq!(handler_guard.len(), 2);
5403 }
5404
5405 #[rstest]
5406 fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5407 let instrument = InstrumentAny::Equity(equity_aapl);
5408 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
5409 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5410 let handler = Arc::new(Mutex::new(Vec::new()));
5411 let handler_clone = Arc::clone(&handler);
5412
5413 let mut aggregator = ValueBarAggregator::new(
5414 bar_type,
5415 instrument.price_precision(),
5416 instrument.size_precision(),
5417 move |bar: Bar| {
5418 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5419 handler_guard.push(bar);
5420 },
5421 );
5422
5423 aggregator.update(
5425 Price::from("1000.00"),
5426 Quantity::from(3),
5427 UnixNanos::default(),
5428 );
5429
5430 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5432 assert_eq!(handler_guard.len(), 3);
5433 for bar in handler_guard.iter() {
5434 assert_eq!(bar.volume, Quantity::from(1));
5435 }
5436 }
5437
5438 #[rstest]
5439 fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5440 let instrument = InstrumentAny::Equity(equity_aapl);
5441 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5442 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5443 let handler = Arc::new(Mutex::new(Vec::new()));
5444 let handler_clone = Arc::clone(&handler);
5445
5446 let mut aggregator = ValueImbalanceBarAggregator::new(
5447 bar_type,
5448 instrument.price_precision(),
5449 instrument.size_precision(),
5450 move |bar: Bar| {
5451 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5452 handler_guard.push(bar);
5453 },
5454 );
5455
5456 let trade = TradeTick {
5457 price: Price::from("1000.00"),
5458 size: Quantity::from(3),
5459 aggressor_side: AggressorSide::Buyer,
5460 instrument_id: instrument.id(),
5461 ..TradeTick::default()
5462 };
5463
5464 aggregator.handle_trade(trade);
5465
5466 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5467 assert_eq!(handler_guard.len(), 3);
5468 for bar in handler_guard.iter() {
5469 assert_eq!(bar.volume, Quantity::from(1));
5470 }
5471 }
5472
5473 #[rstest]
5474 fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
5475 let instrument = InstrumentAny::Equity(equity_aapl);
5476 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5477 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5478 let handler = Arc::new(Mutex::new(Vec::new()));
5479 let handler_clone = Arc::clone(&handler);
5480
5481 let mut aggregator = ValueImbalanceBarAggregator::new(
5482 bar_type,
5483 instrument.price_precision(),
5484 instrument.size_precision(),
5485 move |bar: Bar| {
5486 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5487 handler_guard.push(bar);
5488 },
5489 );
5490
5491 let sell_tick = TradeTick {
5493 price: Price::from("10.00"),
5494 size: Quantity::from(5),
5495 aggressor_side: AggressorSide::Seller,
5496 instrument_id: instrument.id(),
5497 ..TradeTick::default()
5498 };
5499
5500 let buy_tick = TradeTick {
5503 price: Price::from("1000.00"),
5504 size: Quantity::from(1),
5505 aggressor_side: AggressorSide::Buyer,
5506 instrument_id: instrument.id(),
5507 ts_init: UnixNanos::from(1),
5508 ts_event: UnixNanos::from(1),
5509 ..TradeTick::default()
5510 };
5511
5512 aggregator.handle_trade(sell_tick);
5513 aggregator.handle_trade(buy_tick);
5514
5515 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5516 assert_eq!(handler_guard.len(), 1);
5517 assert_eq!(handler_guard[0].volume, Quantity::from(6));
5518 }
5519
5520 #[rstest]
5521 fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5522 let instrument = InstrumentAny::Equity(equity_aapl);
5523 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
5524 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5525 let handler = Arc::new(Mutex::new(Vec::new()));
5526 let handler_clone = Arc::clone(&handler);
5527
5528 let mut aggregator = ValueRunsBarAggregator::new(
5529 bar_type,
5530 instrument.price_precision(),
5531 instrument.size_precision(),
5532 move |bar: Bar| {
5533 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5534 handler_guard.push(bar);
5535 },
5536 );
5537
5538 let trade = TradeTick {
5539 price: Price::from("1000.00"),
5540 size: Quantity::from(3),
5541 aggressor_side: AggressorSide::Buyer,
5542 instrument_id: instrument.id(),
5543 ..TradeTick::default()
5544 };
5545
5546 aggregator.handle_trade(trade);
5547
5548 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5549 assert_eq!(handler_guard.len(), 3);
5550 for bar in handler_guard.iter() {
5551 assert_eq!(bar.volume, Quantity::from(1));
5552 }
5553 }
5554
5555 #[rstest]
5556 #[case(1000_u64)]
5557 #[case(1500_u64)]
5558 fn test_volume_imbalance_bar_aggregator_large_step_no_overflow(
5559 equity_aapl: Equity,
5560 #[case] step: u64,
5561 ) {
5562 let instrument = InstrumentAny::Equity(equity_aapl);
5563 let bar_spec = BarSpecification::new(
5564 step as usize,
5565 BarAggregation::VolumeImbalance,
5566 PriceType::Last,
5567 );
5568 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5569 let handler = Arc::new(Mutex::new(Vec::new()));
5570 let handler_clone = Arc::clone(&handler);
5571
5572 let mut aggregator = VolumeImbalanceBarAggregator::new(
5573 bar_type,
5574 instrument.price_precision(),
5575 instrument.size_precision(),
5576 move |bar: Bar| {
5577 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5578 handler_guard.push(bar);
5579 },
5580 );
5581
5582 let trade = TradeTick {
5583 size: Quantity::from(step * 2),
5584 aggressor_side: AggressorSide::Buyer,
5585 ..TradeTick::default()
5586 };
5587
5588 aggregator.handle_trade(trade);
5589
5590 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5591 assert_eq!(handler_guard.len(), 2);
5592 for bar in handler_guard.iter() {
5593 assert_eq!(bar.volume.as_f64(), step as f64);
5594 }
5595 }
5596
5597 #[rstest]
5598 fn test_volume_imbalance_bar_aggregator_different_large_steps_produce_different_bar_counts(
5599 equity_aapl: Equity,
5600 ) {
5601 let instrument = InstrumentAny::Equity(equity_aapl);
5602 let total_volume = 3000_u64;
5603 let mut results = Vec::new();
5604
5605 for step in [1000_usize, 1500] {
5606 let bar_spec =
5607 BarSpecification::new(step, BarAggregation::VolumeImbalance, PriceType::Last);
5608 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5609 let handler = Arc::new(Mutex::new(Vec::new()));
5610 let handler_clone = Arc::clone(&handler);
5611
5612 let mut aggregator = VolumeImbalanceBarAggregator::new(
5613 bar_type,
5614 instrument.price_precision(),
5615 instrument.size_precision(),
5616 move |bar: Bar| {
5617 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5618 handler_guard.push(bar);
5619 },
5620 );
5621
5622 let trade = TradeTick {
5623 size: Quantity::from(total_volume),
5624 aggressor_side: AggressorSide::Buyer,
5625 ..TradeTick::default()
5626 };
5627
5628 aggregator.handle_trade(trade);
5629
5630 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5631 results.push(handler_guard.len());
5632 }
5633
5634 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
5637 }
5638
5639 #[rstest]
5640 #[case(1000_u64)]
5641 #[case(1500_u64)]
5642 fn test_volume_runs_bar_aggregator_large_step_no_overflow(
5643 equity_aapl: Equity,
5644 #[case] step: u64,
5645 ) {
5646 let instrument = InstrumentAny::Equity(equity_aapl);
5647 let bar_spec =
5648 BarSpecification::new(step as usize, BarAggregation::VolumeRuns, PriceType::Last);
5649 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5650 let handler = Arc::new(Mutex::new(Vec::new()));
5651 let handler_clone = Arc::clone(&handler);
5652
5653 let mut aggregator = VolumeRunsBarAggregator::new(
5654 bar_type,
5655 instrument.price_precision(),
5656 instrument.size_precision(),
5657 move |bar: Bar| {
5658 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5659 handler_guard.push(bar);
5660 },
5661 );
5662
5663 let trade = TradeTick {
5664 size: Quantity::from(step * 2),
5665 aggressor_side: AggressorSide::Buyer,
5666 ..TradeTick::default()
5667 };
5668
5669 aggregator.handle_trade(trade);
5670
5671 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5672 assert_eq!(handler_guard.len(), 2);
5673 for bar in handler_guard.iter() {
5674 assert_eq!(bar.volume.as_f64(), step as f64);
5675 }
5676 }
5677
5678 #[rstest]
5679 fn test_volume_runs_bar_aggregator_different_large_steps_produce_different_bar_counts(
5680 equity_aapl: Equity,
5681 ) {
5682 let instrument = InstrumentAny::Equity(equity_aapl);
5683 let total_volume = 3000_u64;
5684 let mut results = Vec::new();
5685
5686 for step in [1000_usize, 1500] {
5687 let bar_spec = BarSpecification::new(step, BarAggregation::VolumeRuns, PriceType::Last);
5688 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5689 let handler = Arc::new(Mutex::new(Vec::new()));
5690 let handler_clone = Arc::clone(&handler);
5691
5692 let mut aggregator = VolumeRunsBarAggregator::new(
5693 bar_type,
5694 instrument.price_precision(),
5695 instrument.size_precision(),
5696 move |bar: Bar| {
5697 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5698 handler_guard.push(bar);
5699 },
5700 );
5701
5702 let trade = TradeTick {
5703 size: Quantity::from(total_volume),
5704 aggressor_side: AggressorSide::Buyer,
5705 ..TradeTick::default()
5706 };
5707
5708 aggregator.handle_trade(trade);
5709
5710 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5711 results.push(handler_guard.len());
5712 }
5713
5714 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
5717 }
5718
5719 #[rstest]
5721 fn test_time_bar_historical_defers_event_at_ts_init_until_after_update(equity_aapl: Equity) {
5722 let instrument = InstrumentAny::Equity(equity_aapl);
5723 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
5724 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5725 let handler = Arc::new(Mutex::new(Vec::new()));
5726 let handler_clone = Arc::clone(&handler);
5727 let clock = Rc::new(RefCell::new(TestClock::new()));
5728
5729 let mut agg = TimeBarAggregator::new(
5730 bar_type,
5731 instrument.price_precision(),
5732 instrument.size_precision(),
5733 clock.clone(),
5734 move |bar: Bar| {
5735 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
5736 h.push(bar);
5737 },
5738 true,
5739 true,
5740 BarIntervalType::LeftOpen,
5741 None,
5742 0,
5743 false,
5744 );
5745 agg.historical_mode = true;
5746 agg.set_clock_internal(clock);
5747 let boxed: Box<dyn BarAggregator> = Box::new(agg);
5748 let rc = Rc::new(RefCell::new(boxed));
5749 rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
5750
5751 rc.borrow_mut().update(
5752 Price::from("100.00"),
5753 Quantity::from(1),
5754 UnixNanos::default(),
5755 );
5756 rc.borrow_mut().update(
5757 Price::from("100.00"),
5758 Quantity::from(1),
5759 UnixNanos::from(1_000_000_000),
5760 );
5761
5762 let bars = handler.lock().expect(MUTEX_POISONED);
5763 assert!(
5764 !bars.is_empty(),
5765 "deferred event at ts_init should produce a bar that includes the update"
5766 );
5767 let last_bar = bars.last().unwrap();
5768 assert_eq!(last_bar.close, Price::from("100.00"));
5769 assert!(
5770 last_bar.volume.as_f64() >= 1.0,
5771 "bar built after deferred event should include the update at ts_init"
5772 );
5773 }
5774
5775 #[rstest]
5776 fn test_spread_quote_quote_driven_emits_when_all_legs_received(equity_aapl: Equity) {
5777 let instrument = InstrumentAny::Equity(equity_aapl);
5778 let leg1 = instrument.id();
5779 let leg2 = InstrumentId::from("MSFT.XNAS");
5780 let spread_id = InstrumentId::from("SPREAD.XNAS");
5781 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5782 let handler = Arc::new(Mutex::new(Vec::new()));
5783 let handler_clone = Arc::clone(&handler);
5784 let clock = Rc::new(RefCell::new(TestClock::new()));
5785
5786 let mut agg = SpreadQuoteAggregator::new(
5787 spread_id,
5788 &legs,
5789 true,
5790 instrument.price_precision(),
5791 0,
5792 Box::new(move |q: QuoteTick| {
5793 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5794 }),
5795 clock,
5796 false,
5797 None,
5798 0,
5799 None,
5800 None,
5801 );
5802
5803 let ts = UnixNanos::from(1_000_000_000);
5804 agg.handle_quote_tick(QuoteTick::new(
5805 leg1,
5806 Price::from("100.00"),
5807 Price::from("100.10"),
5808 Quantity::from(10),
5809 Quantity::from(10),
5810 ts,
5811 ts,
5812 ));
5813 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5814
5815 agg.handle_quote_tick(QuoteTick::new(
5816 leg2,
5817 Price::from("99.00"),
5818 Price::from("99.10"),
5819 Quantity::from(10),
5820 Quantity::from(10),
5821 ts,
5822 ts,
5823 ));
5824 let quotes = handler.lock().expect(MUTEX_POISONED);
5825 assert_eq!(quotes.len(), 1);
5826 assert_eq!(quotes[0].instrument_id, spread_id);
5827 assert!(quotes[0].bid_price < quotes[0].ask_price);
5828 }
5829
5830 #[rstest]
5831 fn test_spread_quote_futures_pricing_signed_ratios(equity_aapl: Equity) {
5832 let instrument = InstrumentAny::Equity(equity_aapl);
5833 let leg1 = instrument.id();
5834 let leg2 = InstrumentId::from("MSFT.XNAS");
5835 let spread_id = InstrumentId::from("SPREAD.XNAS");
5836 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5837 let handler = Arc::new(Mutex::new(Vec::new()));
5838 let handler_clone = Arc::clone(&handler);
5839 let clock = Rc::new(RefCell::new(TestClock::new()));
5840
5841 let mut agg = SpreadQuoteAggregator::new(
5842 spread_id,
5843 &legs,
5844 true,
5845 instrument.price_precision(),
5846 0,
5847 Box::new(move |q: QuoteTick| {
5848 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5849 }),
5850 clock,
5851 false,
5852 None,
5853 0,
5854 None,
5855 None,
5856 );
5857
5858 let ts = UnixNanos::from(1_000_000_000);
5859 agg.handle_quote_tick(QuoteTick::new(
5860 leg1,
5861 Price::from("10.00"),
5862 Price::from("10.10"),
5863 Quantity::from(100),
5864 Quantity::from(100),
5865 ts,
5866 ts,
5867 ));
5868 agg.handle_quote_tick(QuoteTick::new(
5869 leg2,
5870 Price::from("20.00"),
5871 Price::from("20.10"),
5872 Quantity::from(100),
5873 Quantity::from(100),
5874 ts,
5875 ts,
5876 ));
5877 let quotes = handler.lock().expect(MUTEX_POISONED);
5878 assert_eq!(quotes.len(), 1);
5879 let q = "es[0];
5880 assert_eq!(q.instrument_id, spread_id);
5881 assert_eq!(q.bid_price, Price::from("-10.10"));
5882 assert_eq!(q.ask_price, Price::from("-9.90"));
5883 }
5884
5885 #[rstest]
5886 fn test_spread_quote_size_calculation_non_unit_ratios(equity_aapl: Equity) {
5887 let instrument = InstrumentAny::Equity(equity_aapl);
5888 let leg1 = instrument.id();
5889 let leg2 = InstrumentId::from("MSFT.XNAS");
5890 let spread_id = InstrumentId::from("SPREAD.XNAS");
5891 let legs = vec![(leg1, 2_i64), (leg2, -1_i64)];
5892 let handler = Arc::new(Mutex::new(Vec::new()));
5893 let handler_clone = Arc::clone(&handler);
5894 let clock = Rc::new(RefCell::new(TestClock::new()));
5895
5896 let mut agg = SpreadQuoteAggregator::new(
5897 spread_id,
5898 &legs,
5899 true,
5900 instrument.price_precision(),
5901 0,
5902 Box::new(move |q: QuoteTick| {
5903 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5904 }),
5905 clock,
5906 false,
5907 None,
5908 0,
5909 None,
5910 None,
5911 );
5912
5913 let ts = UnixNanos::from(1_000_000_000);
5914 agg.handle_quote_tick(QuoteTick::new(
5915 leg1,
5916 Price::from("10.00"),
5917 Price::from("10.10"),
5918 Quantity::from(100),
5919 Quantity::from(40),
5920 ts,
5921 ts,
5922 ));
5923 agg.handle_quote_tick(QuoteTick::new(
5924 leg2,
5925 Price::from("10.00"),
5926 Price::from("10.10"),
5927 Quantity::from(50),
5928 Quantity::from(30),
5929 ts,
5930 ts,
5931 ));
5932 let quotes = handler.lock().expect(MUTEX_POISONED);
5933 assert_eq!(quotes.len(), 1);
5934 let q = "es[0];
5935 assert_eq!(q.bid_size.as_f64(), 30.0);
5936 assert_eq!(q.ask_size.as_f64(), 20.0);
5937 }
5938
5939 #[rstest]
5940 fn test_spread_quote_timer_driven_emission_cadence(equity_aapl: Equity) {
5941 let instrument = InstrumentAny::Equity(equity_aapl);
5942 let leg1 = instrument.id();
5943 let leg2 = InstrumentId::from("MSFT.XNAS");
5944 let spread_id = InstrumentId::from("SPREAD.XNAS");
5945 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5946 let handler = Arc::new(Mutex::new(Vec::new()));
5947 let handler_clone = Arc::clone(&handler);
5948 let clock = Rc::new(RefCell::new(TestClock::new()));
5949 clock.borrow_mut().set_time(UnixNanos::from(0));
5950
5951 let agg = SpreadQuoteAggregator::new(
5952 spread_id,
5953 &legs,
5954 true,
5955 instrument.price_precision(),
5956 0,
5957 Box::new(move |q: QuoteTick| {
5958 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5959 }),
5960 clock.clone(),
5961 false,
5962 Some(1),
5963 0,
5964 None,
5965 None,
5966 );
5967 let rc = Rc::new(RefCell::new(agg));
5968 rc.borrow_mut().prepare_for_timer_mode(&rc);
5969 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
5970
5971 for event in clock.borrow_mut().advance_time(UnixNanos::from(0), true) {
5972 rc.borrow_mut().on_timer_fire(event.ts_event);
5973 }
5974 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5975
5976 let ts1 = UnixNanos::from(1_000_000_000);
5977 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5978 leg1,
5979 Price::from("100.00"),
5980 Price::from("100.10"),
5981 Quantity::from(10),
5982 Quantity::from(10),
5983 ts1,
5984 ts1,
5985 ));
5986 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5987 leg2,
5988 Price::from("99.00"),
5989 Price::from("99.10"),
5990 Quantity::from(10),
5991 Quantity::from(10),
5992 ts1,
5993 ts1,
5994 ));
5995
5996 for event in clock.borrow_mut().advance_time(ts1, true) {
5997 rc.borrow_mut().on_timer_fire(event.ts_event);
5998 }
5999
6000 {
6001 let quotes = handler.lock().expect(MUTEX_POISONED);
6002 assert_eq!(quotes.len(), 1);
6003 assert_eq!(quotes[0].ts_event, ts1);
6004 assert_eq!(quotes[0].ts_init, ts1);
6005 }
6006
6007 let ts2 = UnixNanos::from(2_000_000_000);
6008 for event in clock.borrow_mut().advance_time(ts2, true) {
6009 rc.borrow_mut().on_timer_fire(event.ts_event);
6010 }
6011
6012 let quotes = handler.lock().expect(MUTEX_POISONED);
6013 assert_eq!(quotes.len(), 1);
6014 }
6015
6016 #[rstest]
6017 fn test_spread_quote_historical_timer_waits_for_all_legs(equity_aapl: Equity) {
6018 let instrument = InstrumentAny::Equity(equity_aapl);
6019 let leg1 = instrument.id();
6020 let leg2 = InstrumentId::from("MSFT.XNAS");
6021 let spread_id = InstrumentId::from("SPREAD.XNAS");
6022 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6023 let handler = Arc::new(Mutex::new(Vec::new()));
6024 let handler_clone = Arc::clone(&handler);
6025 let clock = Rc::new(RefCell::new(TestClock::new()));
6026
6027 let agg = SpreadQuoteAggregator::new(
6028 spread_id,
6029 &legs,
6030 true,
6031 instrument.price_precision(),
6032 0,
6033 Box::new(move |q: QuoteTick| {
6034 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6035 }),
6036 clock.clone(),
6038 true,
6039 Some(1),
6040 0,
6041 None,
6042 None,
6043 );
6044 let rc = Rc::new(RefCell::new(agg));
6045 rc.borrow_mut().prepare_for_timer_mode(&rc);
6046 rc.borrow_mut().set_clock(clock);
6047
6048 let ts1 = UnixNanos::from(1_000_000_000);
6049 let ts2 = UnixNanos::from(2_000_000_000);
6050 let ts3 = UnixNanos::from(3_000_000_000);
6051 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6052 leg1,
6053 Price::from("100.00"),
6054 Price::from("100.10"),
6055 Quantity::from(10),
6056 Quantity::from(10),
6057 ts1,
6058 ts1,
6059 ));
6060 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6061
6062 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6063 leg2,
6064 Price::from("99.00"),
6065 Price::from("99.10"),
6066 Quantity::from(10),
6067 Quantity::from(10),
6068 ts2,
6069 ts2,
6070 ));
6071 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6072
6073 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6074 leg1,
6075 Price::from("100.00"),
6076 Price::from("100.10"),
6077 Quantity::from(10),
6078 Quantity::from(10),
6079 ts3,
6080 ts3,
6081 ));
6082 let quotes = handler.lock().expect(MUTEX_POISONED);
6083 assert_eq!(
6084 quotes.len(),
6085 1,
6086 "deferred event at ts2 is processed when we have all legs and advance to ts3"
6087 );
6088 }
6089
6090 #[rstest]
6091 fn test_spread_quote_historical_flush_emits_pending_final_quote(equity_aapl: Equity) {
6092 let instrument = InstrumentAny::Equity(equity_aapl);
6093 let leg1 = instrument.id();
6094 let leg2 = InstrumentId::from("MSFT.XNAS");
6095 let spread_id = InstrumentId::from("SPREAD.XNAS");
6096 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6097 let handler = Arc::new(Mutex::new(Vec::new()));
6098 let handler_clone = Arc::clone(&handler);
6099 let clock = Rc::new(RefCell::new(TestClock::new()));
6100
6101 let agg = SpreadQuoteAggregator::new(
6102 spread_id,
6103 &legs,
6104 true,
6105 instrument.price_precision(),
6106 0,
6107 Box::new(move |q: QuoteTick| {
6108 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6109 }),
6110 clock.clone(),
6112 true,
6113 Some(1),
6114 0,
6115 None,
6116 None,
6117 );
6118 let rc = Rc::new(RefCell::new(agg));
6119 rc.borrow_mut().prepare_for_timer_mode(&rc);
6120 rc.borrow_mut().set_clock(clock);
6121
6122 let ts1 = UnixNanos::from(1_000_000_000);
6123 let ts2 = UnixNanos::from(2_000_000_000);
6124 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6125 leg1,
6126 Price::from("100.00"),
6127 Price::from("100.10"),
6128 Quantity::from(10),
6129 Quantity::from(10),
6130 ts1,
6131 ts1,
6132 ));
6133 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6134 leg2,
6135 Price::from("99.00"),
6136 Price::from("99.10"),
6137 Quantity::from(10),
6138 Quantity::from(10),
6139 ts2,
6140 ts2,
6141 ));
6142
6143 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6144
6145 rc.borrow_mut().flush_pending_historical_quote();
6146
6147 let quotes = handler.lock().expect(MUTEX_POISONED);
6148 assert_eq!(
6149 quotes.len(),
6150 1,
6151 "final historical quote should be emitted when the deferred event is flushed",
6152 );
6153 assert_eq!(quotes[0].ts_event, ts2);
6154 }
6155
6156 #[rstest]
6157 fn test_spread_quote_option_vega_weighting(equity_aapl: Equity) {
6158 let instrument = InstrumentAny::Equity(equity_aapl);
6159 let leg1 = instrument.id();
6160 let leg2 = InstrumentId::from("MSFT.XNAS");
6161 let spread_id = InstrumentId::from("SPREAD.XNAS");
6162 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6163 let handler = Arc::new(Mutex::new(Vec::new()));
6164 let handler_clone = Arc::clone(&handler);
6165 let clock = Rc::new(RefCell::new(TestClock::new()));
6166
6167 let mut vega_provider = MapVegaProvider::new();
6168 vega_provider.insert(leg1, 0.15);
6169 vega_provider.insert(leg2, 0.12);
6170
6171 let mut agg = SpreadQuoteAggregator::new(
6172 spread_id,
6173 &legs,
6174 false,
6175 instrument.price_precision(),
6176 0,
6177 Box::new(move |q: QuoteTick| {
6178 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6179 }),
6180 clock,
6181 false,
6182 None,
6183 0,
6184 Some(Box::new(vega_provider)),
6185 None,
6186 );
6187
6188 let ts = UnixNanos::from(1_000_000_000);
6189 agg.handle_quote_tick(QuoteTick::new(
6190 leg1,
6191 Price::from("10.00"),
6192 Price::from("10.20"),
6193 Quantity::from(100),
6194 Quantity::from(100),
6195 ts,
6196 ts,
6197 ));
6198 agg.handle_quote_tick(QuoteTick::new(
6199 leg2,
6200 Price::from("11.00"),
6201 Price::from("11.20"),
6202 Quantity::from(100),
6203 Quantity::from(100),
6204 ts,
6205 ts,
6206 ));
6207 let quotes = handler.lock().expect(MUTEX_POISONED);
6208 assert_eq!(quotes.len(), 1);
6209 let q = "es[0];
6210 assert!(q.bid_price < q.ask_price);
6211 assert!(q.ask_price.as_f64() - q.bid_price.as_f64() > 0.0);
6212 }
6213
6214 #[rstest]
6215 fn test_spread_quote_all_zero_vega_fallback(equity_aapl: Equity) {
6216 let instrument = InstrumentAny::Equity(equity_aapl);
6217 let leg1 = instrument.id();
6218 let leg2 = InstrumentId::from("MSFT.XNAS");
6219 let spread_id = InstrumentId::from("SPREAD.XNAS");
6220 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6221 let handler = Arc::new(Mutex::new(Vec::new()));
6222 let handler_clone = Arc::clone(&handler);
6223 let clock = Rc::new(RefCell::new(TestClock::new()));
6224
6225 let mut vega_provider = MapVegaProvider::new();
6226 vega_provider.insert(leg1, 0.0);
6227 vega_provider.insert(leg2, 0.0);
6228
6229 let mut agg = SpreadQuoteAggregator::new(
6230 spread_id,
6231 &legs,
6232 false,
6233 instrument.price_precision(),
6234 0,
6235 Box::new(move |q: QuoteTick| {
6236 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6237 }),
6238 clock,
6239 false,
6240 None,
6241 0,
6242 Some(Box::new(vega_provider)),
6243 None,
6244 );
6245
6246 let ts = UnixNanos::from(1_000_000_000);
6247 agg.handle_quote_tick(QuoteTick::new(
6248 leg1,
6249 Price::from("10.00"),
6250 Price::from("10.10"),
6251 Quantity::from(100),
6252 Quantity::from(100),
6253 ts,
6254 ts,
6255 ));
6256 agg.handle_quote_tick(QuoteTick::new(
6257 leg2,
6258 Price::from("20.00"),
6259 Price::from("20.10"),
6260 Quantity::from(100),
6261 Quantity::from(100),
6262 ts,
6263 ts,
6264 ));
6265 let quotes = handler.lock().expect(MUTEX_POISONED);
6266 assert_eq!(quotes.len(), 1);
6267 let q = "es[0];
6268 assert_eq!(q.bid_price, Price::from("-10.10"));
6269 assert_eq!(q.ask_price, Price::from("-9.90"));
6270 }
6271
6272 #[rstest]
6273 fn test_spread_quote_negative_prices_tick_scheme(equity_aapl: Equity) {
6274 let instrument = InstrumentAny::Equity(equity_aapl);
6275 let leg1 = instrument.id();
6276 let leg2 = InstrumentId::from("MSFT.XNAS");
6277 let spread_id = InstrumentId::from("SPREAD.XNAS");
6278 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6279 let handler = Arc::new(Mutex::new(Vec::new()));
6280 let handler_clone = Arc::clone(&handler);
6281 let clock = Rc::new(RefCell::new(TestClock::new()));
6282 let rounder = FixedTickSchemeRounder::new(0.01).unwrap();
6283
6284 let mut agg = SpreadQuoteAggregator::new(
6285 spread_id,
6286 &legs,
6287 true,
6288 2,
6289 0,
6290 Box::new(move |q: QuoteTick| {
6291 handler_clone.lock().expect(MUTEX_POISONED).push(q);
6292 }),
6293 clock,
6294 false,
6295 None,
6296 0,
6297 None,
6298 Some(Box::new(rounder)),
6299 );
6300
6301 let ts = UnixNanos::from(1_000_000_000);
6302 agg.handle_quote_tick(QuoteTick::new(
6303 leg1,
6304 Price::from("10.00"),
6305 Price::from("10.10"),
6306 Quantity::from(100),
6307 Quantity::from(100),
6308 ts,
6309 ts,
6310 ));
6311 agg.handle_quote_tick(QuoteTick::new(
6312 leg2,
6313 Price::from("20.00"),
6314 Price::from("20.10"),
6315 Quantity::from(100),
6316 Quantity::from(100),
6317 ts,
6318 ts,
6319 ));
6320 let quotes = handler.lock().expect(MUTEX_POISONED);
6321 assert_eq!(quotes.len(), 1);
6322 let q = "es[0];
6323 assert!(q.bid_price.as_f64() < 0.0);
6324 assert!(q.ask_price.as_f64() < 0.0);
6325 assert!(q.bid_price < q.ask_price);
6326 }
6327
6328 #[rstest]
6329 #[case(BarIntervalType::LeftOpen)]
6330 #[case(BarIntervalType::RightOpen)]
6331 fn test_time_bar_skip_first_non_full_bar_noop_on_boundary(
6332 equity_aapl: Equity,
6333 #[case] interval_type: BarIntervalType,
6334 ) {
6335 let instrument = InstrumentAny::Equity(equity_aapl);
6340 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6341 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6342 let handler = Arc::new(Mutex::new(Vec::new()));
6343 let handler_clone = Arc::clone(&handler);
6344 let clock = Rc::new(RefCell::new(TestClock::new()));
6345 clock.borrow_mut().set_time(UnixNanos::from(1_000_000_000));
6346 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6347
6348 let aggregator = TimeBarAggregator::new(
6349 bar_type,
6350 instrument.price_precision(),
6351 instrument.size_precision(),
6352 clock,
6353 move |bar: Bar| {
6354 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6355 h.push(bar);
6356 },
6357 false,
6358 false,
6359 interval_type,
6360 None,
6361 0,
6362 true, );
6364
6365 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6366 let rc = Rc::new(RefCell::new(boxed));
6367 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6368
6369 rc.borrow_mut().update(
6370 Price::from("100.00"),
6371 Quantity::from(1),
6372 UnixNanos::from(1_000_000_000),
6373 );
6374 rc.borrow_mut().build_bar(&TimeEvent::new(
6375 event_name,
6376 UUID4::new(),
6377 UnixNanos::from(2_000_000_000),
6378 UnixNanos::from(2_000_000_000),
6379 ));
6380 rc.borrow_mut().update(
6381 Price::from("101.00"),
6382 Quantity::from(1),
6383 UnixNanos::from(2_500_000_000),
6384 );
6385 rc.borrow_mut().build_bar(&TimeEvent::new(
6386 event_name,
6387 UUID4::new(),
6388 UnixNanos::from(3_000_000_000),
6389 UnixNanos::from(3_000_000_000),
6390 ));
6391
6392 let bars = handler.lock().expect(MUTEX_POISONED);
6393 assert_eq!(bars.len(), 2);
6394 assert_eq!(bars[0].close, Price::from("100.00"));
6395 assert_eq!(bars[1].close, Price::from("101.00"));
6396 }
6397
6398 #[rstest]
6399 #[case(BarIntervalType::LeftOpen)]
6400 #[case(BarIntervalType::RightOpen)]
6401 fn test_time_bar_skip_first_non_full_bar_drops_partial_bar(
6402 equity_aapl: Equity,
6403 #[case] interval_type: BarIntervalType,
6404 ) {
6405 let instrument = InstrumentAny::Equity(equity_aapl);
6409 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6410 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6411 let handler = Arc::new(Mutex::new(Vec::new()));
6412 let handler_clone = Arc::clone(&handler);
6413 let clock = Rc::new(RefCell::new(TestClock::new()));
6414 clock.borrow_mut().set_time(UnixNanos::from(1_500_000_000));
6415 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6416
6417 let aggregator = TimeBarAggregator::new(
6418 bar_type,
6419 instrument.price_precision(),
6420 instrument.size_precision(),
6421 clock,
6422 move |bar: Bar| {
6423 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6424 h.push(bar);
6425 },
6426 false,
6427 false,
6428 interval_type,
6429 None,
6430 0,
6431 true, );
6433
6434 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6435 let rc = Rc::new(RefCell::new(boxed));
6436 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6437
6438 rc.borrow_mut().update(
6439 Price::from("100.00"),
6440 Quantity::from(1),
6441 UnixNanos::from(1_500_000_000),
6442 );
6443 rc.borrow_mut().build_bar(&TimeEvent::new(
6444 event_name,
6445 UUID4::new(),
6446 UnixNanos::from(2_000_000_000),
6447 UnixNanos::from(2_000_000_000),
6448 ));
6449 rc.borrow_mut().update(
6450 Price::from("101.00"),
6451 Quantity::from(1),
6452 UnixNanos::from(2_500_000_000),
6453 );
6454 rc.borrow_mut().build_bar(&TimeEvent::new(
6455 event_name,
6456 UUID4::new(),
6457 UnixNanos::from(3_000_000_000),
6458 UnixNanos::from(3_000_000_000),
6459 ));
6460
6461 let bars = handler.lock().expect(MUTEX_POISONED);
6462 assert_eq!(bars.len(), 1);
6463 assert_eq!(bars[0].close, Price::from("101.00"));
6464 }
6465
6466 #[rstest]
6467 fn test_time_bar_skip_first_non_full_bar_skips_every_call_before_first_close(
6468 equity_aapl: Equity,
6469 ) {
6470 let instrument = InstrumentAny::Equity(equity_aapl);
6474 let bar_spec = BarSpecification::new(10, BarAggregation::Second, PriceType::Last);
6475 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6476 let handler = Arc::new(Mutex::new(Vec::new()));
6477 let handler_clone = Arc::clone(&handler);
6478 let clock = Rc::new(RefCell::new(TestClock::new()));
6479 clock.borrow_mut().set_time(UnixNanos::from(5_000_000_000));
6480 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6481
6482 let aggregator = TimeBarAggregator::new(
6483 bar_type,
6484 instrument.price_precision(),
6485 instrument.size_precision(),
6486 clock,
6487 move |bar: Bar| {
6488 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6489 h.push(bar);
6490 },
6491 false,
6492 false,
6493 BarIntervalType::LeftOpen,
6494 None,
6495 0,
6496 true, );
6498
6499 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6500 let rc = Rc::new(RefCell::new(boxed));
6501 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6502
6503 for (price, update_ts, event_ts) in [
6507 ("100.00", 5_500_000_000_u64, 7_000_000_000_u64),
6508 ("101.00", 7_500_000_000_u64, 8_000_000_000_u64),
6509 ("102.00", 9_000_000_000_u64, 10_000_000_000_u64),
6510 ] {
6511 rc.borrow_mut().update(
6512 Price::from(price),
6513 Quantity::from(1),
6514 UnixNanos::from(update_ts),
6515 );
6516 rc.borrow_mut().build_bar(&TimeEvent::new(
6517 event_name,
6518 UUID4::new(),
6519 UnixNanos::from(event_ts),
6520 UnixNanos::from(event_ts),
6521 ));
6522 }
6523
6524 rc.borrow_mut().update(
6526 Price::from("103.00"),
6527 Quantity::from(1),
6528 UnixNanos::from(10_500_000_000),
6529 );
6530 rc.borrow_mut().build_bar(&TimeEvent::new(
6531 event_name,
6532 UUID4::new(),
6533 UnixNanos::from(11_000_000_000),
6534 UnixNanos::from(11_000_000_000),
6535 ));
6536
6537 let bars = handler.lock().expect(MUTEX_POISONED);
6538 assert_eq!(bars.len(), 1);
6539 assert_eq!(bars[0].close, Price::from("103.00"));
6540 }
6541
6542 #[rstest]
6543 fn test_time_bar_skip_first_non_full_bar_skips_when_build_delay_shifts_start(
6544 equity_aapl: Equity,
6545 ) {
6546 let instrument = InstrumentAny::Equity(equity_aapl);
6551 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6552 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6553 let handler = Arc::new(Mutex::new(Vec::new()));
6554 let handler_clone = Arc::clone(&handler);
6555 let clock = Rc::new(RefCell::new(TestClock::new()));
6556 clock.borrow_mut().set_time(UnixNanos::from(2_000_000_000));
6557 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6558
6559 let aggregator = TimeBarAggregator::new(
6560 bar_type,
6561 instrument.price_precision(),
6562 instrument.size_precision(),
6563 clock,
6564 move |bar: Bar| {
6565 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6566 h.push(bar);
6567 },
6568 false,
6569 false,
6570 BarIntervalType::LeftOpen,
6571 None,
6572 100, true, );
6575
6576 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6577 let rc = Rc::new(RefCell::new(boxed));
6578 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6579
6580 rc.borrow_mut().update(
6582 Price::from("100.00"),
6583 Quantity::from(1),
6584 UnixNanos::from(2_500_000_000),
6585 );
6586 rc.borrow_mut().build_bar(&TimeEvent::new(
6587 event_name,
6588 UUID4::new(),
6589 UnixNanos::from(3_000_100_000),
6590 UnixNanos::from(3_000_100_000),
6591 ));
6592 rc.borrow_mut().update(
6593 Price::from("101.00"),
6594 Quantity::from(1),
6595 UnixNanos::from(3_500_000_000),
6596 );
6597 rc.borrow_mut().build_bar(&TimeEvent::new(
6598 event_name,
6599 UUID4::new(),
6600 UnixNanos::from(4_000_100_000),
6601 UnixNanos::from(4_000_100_000),
6602 ));
6603
6604 let bars = handler.lock().expect(MUTEX_POISONED);
6605 assert_eq!(bars.len(), 1);
6606 assert_eq!(bars[0].close, Price::from("101.00"));
6607 }
6608
6609 #[rstest]
6610 #[case(
6611 BarAggregation::Month,
6612 1_735_689_600_000_000_000_u64,
6613 1_733_011_200_000_000_000_u64
6614 )]
6615 #[case(
6616 BarAggregation::Year,
6617 1_735_689_600_000_000_000_u64,
6618 1_704_067_200_000_000_000_u64
6619 )]
6620 fn test_time_bar_fire_immediately_month_year_stored_open_points_to_previous_period(
6621 equity_aapl: Equity,
6622 #[case] aggregation: BarAggregation,
6623 #[case] start_ns: u64,
6624 #[case] expected_stored_open_ns: u64,
6625 ) {
6626 let instrument = InstrumentAny::Equity(equity_aapl);
6631 let bar_spec = BarSpecification::new(1, aggregation, PriceType::Last);
6632 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6633 let handler = Arc::new(Mutex::new(Vec::new()));
6634 let handler_clone = Arc::clone(&handler);
6635 let clock = Rc::new(RefCell::new(TestClock::new()));
6636 clock.borrow_mut().set_time(UnixNanos::from(start_ns));
6637 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6638
6639 let aggregator = TimeBarAggregator::new(
6640 bar_type,
6641 instrument.price_precision(),
6642 instrument.size_precision(),
6643 clock,
6644 move |bar: Bar| {
6645 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6646 h.push(bar);
6647 },
6648 false,
6649 false,
6650 BarIntervalType::RightOpen, None,
6652 0,
6653 false, );
6655
6656 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6657 let rc = Rc::new(RefCell::new(boxed));
6658 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6659
6660 rc.borrow_mut().update(
6661 Price::from("100.00"),
6662 Quantity::from(1),
6663 UnixNanos::from(start_ns),
6664 );
6665 rc.borrow_mut().build_bar(&TimeEvent::new(
6666 event_name,
6667 UUID4::new(),
6668 UnixNanos::from(start_ns),
6669 UnixNanos::from(start_ns),
6670 ));
6671
6672 let bars = handler.lock().expect(MUTEX_POISONED);
6673 assert_eq!(bars.len(), 1);
6674 assert_eq!(bars[0].ts_event, UnixNanos::from(expected_stored_open_ns));
6675 assert_eq!(bars[0].ts_init, UnixNanos::from(start_ns));
6676 }
6677
6678 #[rstest]
6679 fn test_time_bar_historical_prevents_bars_for_timer_before_last_data(equity_aapl: Equity) {
6680 let instrument = InstrumentAny::Equity(equity_aapl);
6681 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6682 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6683 let handler = Arc::new(Mutex::new(Vec::new()));
6684 let handler_clone = Arc::clone(&handler);
6685 let clock = Rc::new(RefCell::new(TestClock::new()));
6686
6687 let mut agg = TimeBarAggregator::new(
6688 bar_type,
6689 instrument.price_precision(),
6690 instrument.size_precision(),
6691 clock.clone(),
6692 move |bar: Bar| {
6693 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6694 h.push(bar);
6695 },
6696 true,
6697 true,
6698 BarIntervalType::LeftOpen,
6699 None,
6700 0,
6701 false,
6702 );
6703 agg.historical_mode = true;
6704 agg.set_clock_internal(clock);
6705 let boxed: Box<dyn BarAggregator> = Box::new(agg);
6706 let rc = Rc::new(RefCell::new(boxed));
6707 rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
6708
6709 let ts1 = UnixNanos::from(2_000_000_000);
6710 rc.borrow_mut()
6711 .update(Price::from("100.00"), Quantity::from(1), ts1);
6712
6713 let ts2 = UnixNanos::from(3_000_000_000);
6714 rc.borrow_mut()
6715 .update(Price::from("101.00"), Quantity::from(1), ts2);
6716
6717 let bars = handler.lock().expect(MUTEX_POISONED);
6718 assert!(
6719 !bars.is_empty(),
6720 "advancing time from ts1 to ts2 should produce at least one bar"
6721 );
6722 assert_eq!(bars[0].close, Price::from("100.00"));
6723 }
6724}
6725
6726#[cfg(test)]
6727mod property_tests {
6728 use std::{
6729 cell::RefCell,
6730 rc::Rc,
6731 sync::{Arc, Mutex},
6732 };
6733
6734 use nautilus_common::{clock::TestClock, timer::TimeEvent};
6735 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
6736 use nautilus_model::{
6737 data::{Bar, BarSpecification, BarType, bar::get_bar_interval_ns},
6738 enums::{AggregationSource, BarAggregation, BarIntervalType, PriceType},
6739 instruments::{Instrument, InstrumentAny, stubs::equity_aapl},
6740 types::{Price, Quantity},
6741 };
6742 use proptest::prelude::*;
6743 use rstest::rstest;
6744 use ustr::Ustr;
6745
6746 use super::*;
6747
6748 fn time_bar_spec_strategy() -> impl Strategy<Value = (BarAggregation, usize)> {
6749 prop_oneof![
6750 (Just(BarAggregation::Second), 1usize..=5),
6751 (Just(BarAggregation::Minute), 1usize..=5),
6752 (Just(BarAggregation::Hour), 1usize..=4),
6753 ]
6754 }
6755
6756 fn interval_type_strategy() -> impl Strategy<Value = BarIntervalType> {
6757 prop_oneof![
6758 Just(BarIntervalType::LeftOpen),
6759 Just(BarIntervalType::RightOpen),
6760 ]
6761 }
6762
6763 proptest! {
6764 #[rstest]
6765 fn prop_skip_first_drops_partial_then_emits(
6766 (aggregation, step) in time_bar_spec_strategy(),
6767 interval_type in interval_type_strategy(),
6768 skip_first in any::<bool>(),
6769 ) {
6770 let instrument = InstrumentAny::Equity(equity_aapl());
6771 let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6772 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6773 let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6774
6775 let now_ns = interval_ns + interval_ns / 2;
6778
6779 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6780 let handler_clone = Arc::clone(&handler);
6781 let clock = Rc::new(RefCell::new(TestClock::new()));
6782 clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6783 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6784
6785 let aggregator = TimeBarAggregator::new(
6786 bar_type,
6787 instrument.price_precision(),
6788 instrument.size_precision(),
6789 clock,
6790 move |bar: Bar| {
6791 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6792 h.push(bar);
6793 },
6794 false,
6795 false,
6796 interval_type,
6797 None,
6798 0,
6799 skip_first,
6800 );
6801
6802 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6803 let rc = Rc::new(RefCell::new(boxed));
6804 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6805
6806 rc.borrow_mut().update(
6809 Price::from("100.00"),
6810 Quantity::from(1),
6811 UnixNanos::from(now_ns),
6812 );
6813 let first_close = 2 * interval_ns;
6814 rc.borrow_mut().build_bar(&TimeEvent::new(
6815 event_name,
6816 UUID4::new(),
6817 UnixNanos::from(first_close),
6818 UnixNanos::from(first_close),
6819 ));
6820
6821 rc.borrow_mut().update(
6823 Price::from("101.00"),
6824 Quantity::from(1),
6825 UnixNanos::from(first_close + interval_ns / 2),
6826 );
6827 let second_close = first_close + interval_ns;
6828 rc.borrow_mut().build_bar(&TimeEvent::new(
6829 event_name,
6830 UUID4::new(),
6831 UnixNanos::from(second_close),
6832 UnixNanos::from(second_close),
6833 ));
6834
6835 let bars = handler.lock().expect(MUTEX_POISONED);
6836 let expected = if skip_first { 1 } else { 2 };
6837 prop_assert_eq!(bars.len(), expected);
6838 prop_assert_eq!(bars.last().unwrap().close, Price::from("101.00"));
6839 for bar in bars.iter() {
6840 prop_assert!(bar.high >= bar.open);
6841 prop_assert!(bar.high >= bar.close);
6842 prop_assert!(bar.low <= bar.open);
6843 prop_assert!(bar.low <= bar.close);
6844 }
6845 }
6846
6847 #[rstest]
6848 fn prop_skip_first_noop_on_exact_boundary(
6849 (aggregation, step) in time_bar_spec_strategy(),
6850 interval_type in interval_type_strategy(),
6851 ) {
6852 let instrument = InstrumentAny::Equity(equity_aapl());
6853 let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6854 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6855 let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6856
6857 let now_ns = interval_ns;
6860 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6861 let handler_clone = Arc::clone(&handler);
6862 let clock = Rc::new(RefCell::new(TestClock::new()));
6863 clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6864 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6865
6866 let aggregator = TimeBarAggregator::new(
6867 bar_type,
6868 instrument.price_precision(),
6869 instrument.size_precision(),
6870 clock,
6871 move |bar: Bar| {
6872 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6873 h.push(bar);
6874 },
6875 false,
6876 false,
6877 interval_type,
6878 None,
6879 0,
6880 true, );
6882
6883 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6884 let rc = Rc::new(RefCell::new(boxed));
6885 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6886
6887 rc.borrow_mut().update(
6888 Price::from("100.00"),
6889 Quantity::from(1),
6890 UnixNanos::from(now_ns),
6891 );
6892 let next_close = now_ns + interval_ns;
6893 rc.borrow_mut().build_bar(&TimeEvent::new(
6894 event_name,
6895 UUID4::new(),
6896 UnixNanos::from(next_close),
6897 UnixNanos::from(next_close),
6898 ));
6899
6900 let bars = handler.lock().expect(MUTEX_POISONED);
6901 prop_assert_eq!(bars.len(), 1);
6902 prop_assert_eq!(bars[0].close, Price::from("100.00"));
6903 }
6904
6905 #[rstest]
6906 fn prop_bar_builder_ohlc_invariants(
6907 updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=50),
6908 ) {
6909 let instrument = InstrumentAny::Equity(equity_aapl());
6910 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
6911 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6912 let mut builder = BarBuilder::new(bar_type, 2, 0);
6913
6914 let mut total_volume: u64 = 0;
6915
6916 for (i, (price_cents, size)) in updates.iter().enumerate() {
6917 let price = Price::new((*price_cents as f64) / 100.0, 2);
6918 let qty = Quantity::new(*size as f64, 0);
6919 let ts = UnixNanos::from((i as u64 + 1) * 1_000);
6920 total_volume += *size;
6921 builder.update(price, qty, ts);
6922 }
6923
6924 let bar = builder.build_now();
6925 prop_assert!(bar.low <= bar.open);
6926 prop_assert!(bar.low <= bar.close);
6927 prop_assert!(bar.high >= bar.open);
6928 prop_assert!(bar.high >= bar.close);
6929 prop_assert!(bar.low <= bar.high);
6930 prop_assert_eq!(bar.volume.as_f64(), total_volume as f64);
6931 }
6932
6933 #[rstest]
6934 fn prop_tick_bar_aggregator_volume_conservation(
6935 ticks in prop::collection::vec((1i64..=1_000i64, 1u64..=100u64), 3..=60),
6936 step in 1usize..=5,
6937 ) {
6938 let instrument = InstrumentAny::Equity(equity_aapl());
6939 let bar_spec = BarSpecification::new(step, BarAggregation::Tick, PriceType::Last);
6940 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6941 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6942 let handler_clone = Arc::clone(&handler);
6943
6944 let mut aggregator = TickBarAggregator::new(
6945 bar_type,
6946 instrument.price_precision(),
6947 instrument.size_precision(),
6948 move |bar: Bar| {
6949 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6950 },
6951 );
6952
6953 let mut total_input: u64 = 0;
6954
6955 for (i, (price_cents, size)) in ticks.iter().enumerate() {
6956 let price = Price::new((*price_cents as f64) / 100.0, 2);
6957 let qty = Quantity::new(*size as f64, 0);
6958 aggregator.update(price, qty, UnixNanos::from((i as u64 + 1) * 1_000));
6959 total_input += *size;
6960 }
6961
6962 let bars = handler.lock().expect(MUTEX_POISONED);
6963 let emitted_count = bars.len();
6964 prop_assert_eq!(emitted_count, ticks.len() / step);
6965
6966 let mut sum_emitted: f64 = 0.0;
6967
6968 for bar in bars.iter() {
6969 prop_assert!(bar.low <= bar.open);
6970 prop_assert!(bar.low <= bar.close);
6971 prop_assert!(bar.high >= bar.open);
6972 prop_assert!(bar.high >= bar.close);
6973 sum_emitted += bar.volume.as_f64();
6974 }
6975
6976 let pending_size: u64 = ticks.iter()
6978 .skip(emitted_count * step)
6979 .map(|(_, s)| *s)
6980 .sum();
6981 prop_assert!((sum_emitted + pending_size as f64 - total_input as f64).abs() < 1e-6);
6982 }
6983
6984 #[rstest]
6985 fn prop_volume_bar_aggregator_conservation(
6986 sizes in prop::collection::vec(1u64..=50u64, 3..=40),
6987 step in 2u64..=10u64,
6988 ) {
6989 let instrument = InstrumentAny::Equity(equity_aapl());
6990 let bar_spec = BarSpecification::new(step as usize, BarAggregation::Volume, PriceType::Last);
6991 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6992 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6993 let handler_clone = Arc::clone(&handler);
6994
6995 let mut aggregator = VolumeBarAggregator::new(
6996 bar_type,
6997 instrument.price_precision(),
6998 instrument.size_precision(),
6999 move |bar: Bar| {
7000 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
7001 },
7002 );
7003
7004 let mut total_input: u64 = 0;
7005
7006 for (i, size) in sizes.iter().enumerate() {
7007 aggregator.update(
7008 Price::from("100.00"),
7009 Quantity::new(*size as f64, 0),
7010 UnixNanos::from((i as u64 + 1) * 1_000),
7011 );
7012 total_input += *size;
7013 }
7014
7015 let bars = handler.lock().expect(MUTEX_POISONED);
7016
7017 for bar in bars.iter() {
7019 prop_assert_eq!(bar.volume, Quantity::from(step));
7020 prop_assert!(bar.low <= bar.open);
7021 prop_assert!(bar.low <= bar.close);
7022 prop_assert!(bar.high >= bar.open);
7023 prop_assert!(bar.high >= bar.close);
7024 }
7025
7026 let emitted_total: u64 = bars.len() as u64 * step;
7028 let pending = aggregator.core.builder.volume.as_f64();
7029 prop_assert!((emitted_total as f64 + pending - total_input as f64).abs() < 1e-6);
7030 }
7031
7032 #[rstest]
7033 fn prop_bar_builder_spread_adjustment_is_additive(
7034 updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7035 spread_cents in -10_000i64..=10_000i64,
7036 backward in any::<bool>(),
7037 ) {
7038 let instrument = InstrumentAny::Equity(equity_aapl());
7039 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7040 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7041 let mut builder = BarBuilder::new(bar_type, 2, 0);
7042
7043 let spread = Decimal::new(spread_cents, 2);
7044 let mode = if backward {
7045 ContinuousFutureAdjustmentType::BackwardSpread
7046 } else {
7047 ContinuousFutureAdjustmentType::ForwardSpread
7048 };
7049 builder.set_adjustment(spread, mode);
7050
7051 let mut min_cents = i64::MAX;
7052 let mut max_cents = i64::MIN;
7053
7054 for (i, (price_cents, size)) in updates.iter().enumerate() {
7055 if *price_cents < min_cents {
7056 min_cents = *price_cents;
7057 }
7058
7059 if *price_cents > max_cents {
7060 max_cents = *price_cents;
7061 }
7062
7063 builder.update(
7064 Price::new((*price_cents as f64) / 100.0, 2),
7065 Quantity::new(*size as f64, 0),
7066 UnixNanos::from((i as u64 + 1) * 1_000),
7067 );
7068 }
7069
7070 let bar = builder.build_now();
7071 let first_decimal = Decimal::new(updates.first().unwrap().0, 2);
7072 let last_decimal = Decimal::new(updates.last().unwrap().0, 2);
7073 let min_decimal = Decimal::new(min_cents, 2);
7074 let max_decimal = Decimal::new(max_cents, 2);
7075
7076 prop_assert_eq!(bar.open.as_decimal(), first_decimal + spread);
7077 prop_assert_eq!(bar.close.as_decimal(), last_decimal + spread);
7078 prop_assert_eq!(bar.low.as_decimal(), min_decimal + spread);
7079 prop_assert_eq!(bar.high.as_decimal(), max_decimal + spread);
7080 }
7081
7082 #[rstest]
7083 fn prop_bar_builder_inactive_adjustment_is_identity(
7084 updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=20),
7085 use_ratio in any::<bool>(),
7086 ) {
7087 let instrument = InstrumentAny::Equity(equity_aapl());
7088 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7089 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7090
7091 let mut adjusted = BarBuilder::new(bar_type, 2, 0);
7092 let mut baseline = BarBuilder::new(bar_type, 2, 0);
7093
7094 let (input, mode) = if use_ratio {
7096 (Decimal::ONE, ContinuousFutureAdjustmentType::BackwardRatio)
7097 } else {
7098 (Decimal::ZERO, ContinuousFutureAdjustmentType::BackwardSpread)
7099 };
7100 adjusted.set_adjustment(input, mode);
7101
7102 for (i, (price_cents, size)) in updates.iter().enumerate() {
7103 let price = Price::new((*price_cents as f64) / 100.0, 2);
7104 let qty = Quantity::new(*size as f64, 0);
7105 let ts = UnixNanos::from((i as u64 + 1) * 1_000);
7106 adjusted.update(price, qty, ts);
7107 baseline.update(price, qty, ts);
7108 }
7109
7110 let bar_adjusted = adjusted.build_now();
7111 let bar_baseline = baseline.build_now();
7112 prop_assert_eq!(bar_adjusted.open, bar_baseline.open);
7113 prop_assert_eq!(bar_adjusted.high, bar_baseline.high);
7114 prop_assert_eq!(bar_adjusted.low, bar_baseline.low);
7115 prop_assert_eq!(bar_adjusted.close, bar_baseline.close);
7116 prop_assert_eq!(bar_adjusted.volume, bar_baseline.volume);
7117 }
7118
7119 #[rstest]
7120 fn prop_bar_builder_spread_preserves_raw_arithmetic(
7121 updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7122 spread_micro in -10_000i64..=10_000i64,
7125 ) {
7126 let instrument = InstrumentAny::Equity(equity_aapl());
7127 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7128 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7129 let mut builder = BarBuilder::new(bar_type, 2, 0);
7130
7131 let spread = Decimal::new(spread_micro, 4);
7132 builder.set_adjustment(spread, ContinuousFutureAdjustmentType::BackwardSpread);
7133
7134 let adjustment_raw_i128 = mantissa_exponent_to_fixed_i128(
7135 spread.mantissa(),
7136 -(spread.scale() as i8),
7137 FIXED_PRECISION,
7138 )
7139 .expect("scale within range");
7140 #[allow(
7141 clippy::useless_conversion,
7142 reason = "i128 to PriceRaw is real when not high-precision"
7143 )]
7144 let expected_adjustment_raw: PriceRaw =
7145 adjustment_raw_i128.try_into().expect("within PriceRaw range");
7146
7147 let mut min_cents = i64::MAX;
7148 let mut max_cents = i64::MIN;
7149 let mut last_price = Price::new(0.0, 2);
7150 let mut first_price = Price::new(0.0, 2);
7151
7152 for (i, (price_cents, size)) in updates.iter().enumerate() {
7153 if *price_cents < min_cents {
7154 min_cents = *price_cents;
7155 }
7156
7157 if *price_cents > max_cents {
7158 max_cents = *price_cents;
7159 }
7160
7161 let price = Price::new((*price_cents as f64) / 100.0, 2);
7162
7163 if i == 0 {
7164 first_price = price;
7165 }
7166
7167 last_price = price;
7168 builder.update(
7169 price,
7170 Quantity::new(*size as f64, 0),
7171 UnixNanos::from((i as u64 + 1) * 1_000),
7172 );
7173 }
7174
7175 let bar = builder.build_now();
7176 let min_price = Price::new((min_cents as f64) / 100.0, 2);
7177 let max_price = Price::new((max_cents as f64) / 100.0, 2);
7178 prop_assert_eq!(bar.open.raw, first_price.raw + expected_adjustment_raw);
7179 prop_assert_eq!(bar.close.raw, last_price.raw + expected_adjustment_raw);
7180 prop_assert_eq!(bar.low.raw, min_price.raw + expected_adjustment_raw);
7181 prop_assert_eq!(bar.high.raw, max_price.raw + expected_adjustment_raw);
7182 prop_assert_eq!(bar.open.precision, 2);
7183 prop_assert_eq!(bar.high.precision, 2);
7184 prop_assert_eq!(bar.low.precision, 2);
7185 prop_assert_eq!(bar.close.precision, 2);
7186 }
7187
7188 #[rstest]
7189 fn prop_bar_builder_active_ratio_scales_each_ohlc(
7190 updates in prop::collection::vec((1_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7191 ratio_centi in prop_oneof![50i64..=99i64, 101i64..=200i64],
7193 backward in any::<bool>(),
7194 ) {
7195 let instrument = InstrumentAny::Equity(equity_aapl());
7196 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7197 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7198 let mut builder = BarBuilder::new(bar_type, 2, 0);
7199
7200 let ratio_decimal = Decimal::new(ratio_centi, 2);
7201 let ratio_f64 = (ratio_centi as f64) / 100.0;
7202 let mode = if backward {
7203 ContinuousFutureAdjustmentType::BackwardRatio
7204 } else {
7205 ContinuousFutureAdjustmentType::ForwardRatio
7206 };
7207 builder.set_adjustment(ratio_decimal, mode);
7208
7209 let mut min_cents = i64::MAX;
7210 let mut max_cents = i64::MIN;
7211 let mut first_cents = 0i64;
7212 let mut last_cents = 0i64;
7213
7214 for (i, (price_cents, size)) in updates.iter().enumerate() {
7215 if *price_cents < min_cents {
7216 min_cents = *price_cents;
7217 }
7218
7219 if *price_cents > max_cents {
7220 max_cents = *price_cents;
7221 }
7222
7223 if i == 0 {
7224 first_cents = *price_cents;
7225 }
7226
7227 last_cents = *price_cents;
7228 builder.update(
7229 Price::new((*price_cents as f64) / 100.0, 2),
7230 Quantity::new(*size as f64, 0),
7231 UnixNanos::from((i as u64 + 1) * 1_000),
7232 );
7233 }
7234
7235 let bar = builder.build_now();
7236 let expect = |cents: i64| Price::new((cents as f64) / 100.0 * ratio_f64, 2);
7238 prop_assert_eq!(bar.open, expect(first_cents));
7239 prop_assert_eq!(bar.close, expect(last_cents));
7240 prop_assert_eq!(bar.low, expect(min_cents));
7242 prop_assert_eq!(bar.high, expect(max_cents));
7243 }
7244
7245 #[rstest]
7246 fn prop_bar_builder_spread_mode_direction_is_metadata_only(
7247 updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7248 spread_cents in -10_000i64..=10_000i64,
7249 ) {
7250 let instrument = InstrumentAny::Equity(equity_aapl());
7251 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7252 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7253
7254 let spread = Decimal::new(spread_cents, 2);
7255 let mut backward = BarBuilder::new(bar_type, 2, 0);
7256 let mut forward = BarBuilder::new(bar_type, 2, 0);
7257 backward.set_adjustment(spread, ContinuousFutureAdjustmentType::BackwardSpread);
7258 forward.set_adjustment(spread, ContinuousFutureAdjustmentType::ForwardSpread);
7259
7260 for (i, (price_cents, size)) in updates.iter().enumerate() {
7261 let price = Price::new((*price_cents as f64) / 100.0, 2);
7262 let qty = Quantity::new(*size as f64, 0);
7263 let ts = UnixNanos::from((i as u64 + 1) * 1_000);
7264 backward.update(price, qty, ts);
7265 forward.update(price, qty, ts);
7266 }
7267
7268 let bar_backward = backward.build_now();
7269 let bar_forward = forward.build_now();
7270 prop_assert_eq!(bar_backward.open, bar_forward.open);
7271 prop_assert_eq!(bar_backward.high, bar_forward.high);
7272 prop_assert_eq!(bar_backward.low, bar_forward.low);
7273 prop_assert_eq!(bar_backward.close, bar_forward.close);
7274 }
7275
7276 #[rstest]
7277 fn prop_value_bar_aggregator_ohlc_invariants(
7278 ticks in prop::collection::vec((50i64..=500i64, 1u64..=20u64), 2..=30),
7279 step in 100u64..=2_000u64,
7280 ) {
7281 let instrument = InstrumentAny::Equity(equity_aapl());
7282 let bar_spec = BarSpecification::new(step as usize, BarAggregation::Value, PriceType::Last);
7283 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7284 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
7285 let handler_clone = Arc::clone(&handler);
7286
7287 let mut aggregator = ValueBarAggregator::new(
7288 bar_type,
7289 instrument.price_precision(),
7290 instrument.size_precision(),
7291 move |bar: Bar| {
7292 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
7293 },
7294 );
7295
7296 for (i, (price_cents, size)) in ticks.iter().enumerate() {
7297 aggregator.update(
7298 Price::new((*price_cents as f64) / 100.0, 2),
7299 Quantity::new(*size as f64, 0),
7300 UnixNanos::from((i as u64 + 1) * 1_000),
7301 );
7302 }
7303
7304 let bars = handler.lock().expect(MUTEX_POISONED);
7305 for bar in bars.iter() {
7306 prop_assert!(bar.low <= bar.open);
7307 prop_assert!(bar.low <= bar.close);
7308 prop_assert!(bar.high >= bar.open);
7309 prop_assert!(bar.high >= bar.close);
7310 prop_assert!(bar.volume.as_f64() > 0.0);
7311 }
7312 }
7313 }
7314}