1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31 clock::{Clock, TestClock},
32 timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35 UnixNanos,
36 correctness::{self, FAILED},
37 datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40 data::{
41 QuoteTick, TradeTick,
42 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43 },
44 enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
/// Boxed callback that receives each [`Bar`] produced by an aggregator.
type BarHandler = Box<dyn FnMut(Bar)>;
50
/// Common interface implemented by the bar aggregators in this module.
///
/// Implementors supply the bar type, the running-flag accessors and the
/// `update` / `update_bar` entry points. Every other method has a default
/// that either forwards to those entry points or does nothing.
pub trait BarAggregator: Any + Debug {
    /// Returns the [`BarType`] associated with this aggregator.
    fn bar_type(&self) -> BarType;
    /// Returns the current value of the running flag.
    fn is_running(&self) -> bool;
    /// Sets the running flag to `value`.
    fn set_is_running(&mut self, value: bool);
    /// Feeds one price/size observation, timestamped `ts_init`, into the aggregator.
    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
    /// Handles a quote by extracting the price and size selected by the bar
    /// specification's price type and forwarding them to `update`.
    fn handle_quote(&mut self, quote: QuoteTick) {
        let spec = self.bar_type().spec();
        self.update(
            quote.extract_price(spec.price_type),
            quote.extract_size(spec.price_type),
            quote.ts_init,
        );
    }
    /// Handles a trade by forwarding its price, size and `ts_init` to `update`.
    fn handle_trade(&mut self, trade: TradeTick) {
        self.update(trade.price, trade.size, trade.ts_init);
    }
    /// Handles a bar by forwarding it, with its own volume and `ts_init`, to `update_bar`.
    fn handle_bar(&mut self, bar: Bar) {
        self.update_bar(bar, bar.volume, bar.ts_init);
    }
    /// Feeds an existing bar into the aggregator, crediting `volume` at `ts_init`.
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
    /// Stop hook; the default implementation does nothing.
    fn stop(&mut self) {}
    /// Historical-mode hook; the default implementation ignores both arguments.
    fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
    /// Historical-events hook; the default implementation ignores the events.
    fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
    /// Clock-injection hook; the default implementation ignores the clock.
    fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
    /// Time-event hook; the default implementation ignores the event.
    // NOTE(review): presumably overridden by timer-driven aggregators — confirm.
    fn build_bar(&mut self, _event: TimeEvent) {}
    /// Timer-start hook; the default implementation does nothing.
    fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
    /// Weak self-reference hook; the default implementation ignores the reference.
    fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
}
99
/// Inherent helpers on the trait object that expose it as [`Any`].
impl dyn BarAggregator {
    /// Returns this aggregator as a shared `&dyn Any` reference.
    pub fn as_any(&self) -> &dyn Any {
        self
    }
    /// Returns this aggregator as an exclusive `&mut dyn Any` reference.
    pub fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
110
/// Accumulates price and volume updates into the OHLCV values of a single bar.
#[derive(Debug)]
pub struct BarBuilder {
    /// Bar type stamped on every bar this builder produces.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision used for the zero volume the builder starts and resets with.
    size_precision: u8,
    /// Set to `true` by the first update that opens a bar.
    initialized: bool,
    /// `ts_init` of the most recently applied update; older updates are ignored.
    ts_last: UnixNanos,
    /// Number of updates applied since the last reset.
    count: usize,
    /// Close of the most recently built bar, carried forward when a bar has no updates.
    last_close: Option<Price>,
    /// Open price of the bar in progress.
    open: Option<Price>,
    /// Highest price of the bar in progress.
    high: Option<Price>,
    /// Lowest price of the bar in progress.
    low: Option<Price>,
    /// Most recent price applied; not cleared by `reset`.
    close: Option<Price>,
    /// Volume accumulated since the last reset.
    volume: Quantity,
}
127
128impl BarBuilder {
129 #[must_use]
135 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
136 correctness::check_equal(
137 &bar_type.aggregation_source(),
138 &AggregationSource::Internal,
139 "bar_type.aggregation_source",
140 "AggregationSource::Internal",
141 )
142 .expect(FAILED);
143
144 Self {
145 bar_type,
146 price_precision,
147 size_precision,
148 initialized: false,
149 ts_last: UnixNanos::default(),
150 count: 0,
151 last_close: None,
152 open: None,
153 high: None,
154 low: None,
155 close: None,
156 volume: Quantity::zero(size_precision),
157 }
158 }
159
160 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
166 if ts_init < self.ts_last {
167 return; }
169
170 if self.open.is_none() {
171 self.open = Some(price);
172 self.high = Some(price);
173 self.low = Some(price);
174 self.initialized = true;
175 } else {
176 if price > self.high.unwrap() {
177 self.high = Some(price);
178 }
179
180 if price < self.low.unwrap() {
181 self.low = Some(price);
182 }
183 }
184
185 self.close = Some(price);
186 self.volume = self.volume.add(size);
187 self.count += 1;
188 self.ts_last = ts_init;
189 }
190
191 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
197 if ts_init < self.ts_last {
198 return; }
200
201 if self.open.is_none() {
202 self.open = Some(bar.open);
203 self.high = Some(bar.high);
204 self.low = Some(bar.low);
205 self.initialized = true;
206 } else {
207 if bar.high > self.high.unwrap() {
208 self.high = Some(bar.high);
209 }
210
211 if bar.low < self.low.unwrap() {
212 self.low = Some(bar.low);
213 }
214 }
215
216 self.close = Some(bar.close);
217 self.volume = self.volume.add(volume);
218 self.count += 1;
219 self.ts_last = ts_init;
220 }
221
222 pub fn reset(&mut self) {
226 self.open = None;
227 self.high = None;
228 self.low = None;
229 self.volume = Quantity::zero(self.size_precision);
230 self.count = 0;
231 }
232
233 pub fn build_now(&mut self) -> Bar {
235 self.build(self.ts_last, self.ts_last)
236 }
237
238 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
244 if self.open.is_none() {
245 self.open = self.last_close;
246 self.high = self.last_close;
247 self.low = self.last_close;
248 self.close = self.last_close;
249 }
250
251 if let (Some(close), Some(low)) = (self.close, self.low)
252 && close < low
253 {
254 self.low = Some(close);
255 }
256
257 if let (Some(close), Some(high)) = (self.close, self.high)
258 && close > high
259 {
260 self.high = Some(close);
261 }
262
263 let bar = Bar::new(
265 self.bar_type,
266 self.open.unwrap(),
267 self.high.unwrap(),
268 self.low.unwrap(),
269 self.close.unwrap(),
270 self.volume,
271 ts_event,
272 ts_init,
273 );
274
275 self.last_close = self.close;
276 self.reset();
277 bar
278 }
279}
280
/// State shared by the aggregators in this module: a bar builder plus the
/// handler that receives each built bar.
pub struct BarAggregatorCore {
    /// Bar type reported by the owning aggregator.
    bar_type: BarType,
    /// Builder accumulating the bar in progress.
    builder: BarBuilder,
    /// Callback invoked with every bar that is built.
    handler: BarHandler,
    /// Running flag exposed through the `BarAggregator` trait.
    is_running: bool,
}
288
289impl Debug for BarAggregatorCore {
290 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291 f.debug_struct(stringify!(BarAggregatorCore))
292 .field("bar_type", &self.bar_type)
293 .field("builder", &self.builder)
294 .field("is_running", &self.is_running)
295 .finish()
296 }
297}
298
299impl BarAggregatorCore {
300 pub fn new<H: FnMut(Bar) + 'static>(
306 bar_type: BarType,
307 price_precision: u8,
308 size_precision: u8,
309 handler: H,
310 ) -> Self {
311 Self {
312 bar_type,
313 builder: BarBuilder::new(bar_type, price_precision, size_precision),
314 handler: Box::new(handler),
315 is_running: false,
316 }
317 }
318
319 pub const fn set_is_running(&mut self, value: bool) {
321 self.is_running = value;
322 }
323 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
324 self.builder.update(price, size, ts_init);
325 }
326
327 fn build_now_and_send(&mut self) {
328 let bar = self.builder.build_now();
329 (self.handler)(bar);
330 }
331
332 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
333 let bar = self.builder.build(ts_event, ts_init);
334 (self.handler)(bar);
335 }
336}
337
/// Aggregator that emits a bar once the number of applied updates reaches
/// the step of the bar specification.
pub struct TickBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
}
345
346impl Debug for TickBarAggregator {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 f.debug_struct(stringify!(TickBarAggregator))
349 .field("core", &self.core)
350 .finish()
351 }
352}
353
354impl TickBarAggregator {
355 pub fn new<H: FnMut(Bar) + 'static>(
361 bar_type: BarType,
362 price_precision: u8,
363 size_precision: u8,
364 handler: H,
365 ) -> Self {
366 Self {
367 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
368 }
369 }
370}
371
372impl BarAggregator for TickBarAggregator {
373 fn bar_type(&self) -> BarType {
374 self.core.bar_type
375 }
376
377 fn is_running(&self) -> bool {
378 self.core.is_running
379 }
380
381 fn set_is_running(&mut self, value: bool) {
382 self.core.set_is_running(value);
383 }
384
385 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
387 self.core.apply_update(price, size, ts_init);
388 let spec = self.core.bar_type.spec();
389
390 if self.core.builder.count >= spec.step.get() {
391 self.core.build_now_and_send();
392 }
393 }
394
395 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
396 self.core.builder.update_bar(bar, volume, ts_init);
397 let spec = self.core.bar_type.spec();
398
399 if self.core.builder.count >= spec.step.get() {
400 self.core.build_now_and_send();
401 }
402 }
403}
404
/// Aggregator that emits a bar once the signed count of buyer- versus
/// seller-initiated trades reaches the specification step in either direction.
pub struct TickImbalanceBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Running imbalance: +1 per buyer-aggressor trade, -1 per seller-aggressor
    /// trade; reset to zero whenever a bar is emitted.
    imbalance: isize,
}
413
414impl Debug for TickImbalanceBarAggregator {
415 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416 f.debug_struct(stringify!(TickImbalanceBarAggregator))
417 .field("core", &self.core)
418 .field("imbalance", &self.imbalance)
419 .finish()
420 }
421}
422
423impl TickImbalanceBarAggregator {
424 pub fn new<H: FnMut(Bar) + 'static>(
430 bar_type: BarType,
431 price_precision: u8,
432 size_precision: u8,
433 handler: H,
434 ) -> Self {
435 Self {
436 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
437 imbalance: 0,
438 }
439 }
440}
441
442impl BarAggregator for TickImbalanceBarAggregator {
443 fn bar_type(&self) -> BarType {
444 self.core.bar_type
445 }
446
447 fn is_running(&self) -> bool {
448 self.core.is_running
449 }
450
451 fn set_is_running(&mut self, value: bool) {
452 self.core.set_is_running(value);
453 }
454
455 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
460 self.core.apply_update(price, size, ts_init);
461 }
462
463 fn handle_trade(&mut self, trade: TradeTick) {
464 self.core
465 .apply_update(trade.price, trade.size, trade.ts_init);
466
467 let delta = match trade.aggressor_side {
468 AggressorSide::Buyer => 1,
469 AggressorSide::Seller => -1,
470 AggressorSide::NoAggressor => 0,
471 };
472
473 if delta == 0 {
474 return;
475 }
476
477 self.imbalance += delta;
478 let threshold = self.core.bar_type.spec().step.get();
479 if self.imbalance.unsigned_abs() >= threshold {
480 self.core.build_now_and_send();
481 self.imbalance = 0;
482 }
483 }
484
485 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
486 self.core.builder.update_bar(bar, volume, ts_init);
487 }
488}
489
/// Aggregator that emits a bar once a run of consecutive same-side aggressor
/// trades reaches the specification step.
pub struct TickRunsBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress, or `None` when no run is active.
    current_run_side: Option<AggressorSide>,
    /// Number of aggressor trades counted in the run in progress.
    run_count: usize,
}
496
497impl Debug for TickRunsBarAggregator {
498 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499 f.debug_struct(stringify!(TickRunsBarAggregator))
500 .field("core", &self.core)
501 .field("current_run_side", &self.current_run_side)
502 .field("run_count", &self.run_count)
503 .finish()
504 }
505}
506
507impl TickRunsBarAggregator {
508 pub fn new<H: FnMut(Bar) + 'static>(
514 bar_type: BarType,
515 price_precision: u8,
516 size_precision: u8,
517 handler: H,
518 ) -> Self {
519 Self {
520 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
521 current_run_side: None,
522 run_count: 0,
523 }
524 }
525}
526
527impl BarAggregator for TickRunsBarAggregator {
528 fn bar_type(&self) -> BarType {
529 self.core.bar_type
530 }
531
532 fn is_running(&self) -> bool {
533 self.core.is_running
534 }
535
536 fn set_is_running(&mut self, value: bool) {
537 self.core.set_is_running(value);
538 }
539
540 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
545 self.core.apply_update(price, size, ts_init);
546 }
547
548 fn handle_trade(&mut self, trade: TradeTick) {
549 let side = match trade.aggressor_side {
550 AggressorSide::Buyer => Some(AggressorSide::Buyer),
551 AggressorSide::Seller => Some(AggressorSide::Seller),
552 AggressorSide::NoAggressor => None,
553 };
554
555 if let Some(side) = side {
556 if self.current_run_side != Some(side) {
557 self.current_run_side = Some(side);
558 self.run_count = 0;
559 self.core.builder.reset();
560 }
561
562 self.core
563 .apply_update(trade.price, trade.size, trade.ts_init);
564 self.run_count += 1;
565
566 let threshold = self.core.bar_type.spec().step.get();
567 if self.run_count >= threshold {
568 self.core.build_now_and_send();
569 self.run_count = 0;
570 self.current_run_side = None;
571 }
572 } else {
573 self.core
574 .apply_update(trade.price, trade.size, trade.ts_init);
575 }
576 }
577
578 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
579 self.core.builder.update_bar(bar, volume, ts_init);
580 }
581}
582
/// Aggregator that emits a bar each time the accumulated volume reaches the
/// specification step, splitting an update across bars when needed.
pub struct VolumeBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
}
587
588impl Debug for VolumeBarAggregator {
589 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
590 f.debug_struct(stringify!(VolumeBarAggregator))
591 .field("core", &self.core)
592 .finish()
593 }
594}
595
596impl VolumeBarAggregator {
597 pub fn new<H: FnMut(Bar) + 'static>(
603 bar_type: BarType,
604 price_precision: u8,
605 size_precision: u8,
606 handler: H,
607 ) -> Self {
608 Self {
609 core: BarAggregatorCore::new(
610 bar_type.standard(),
611 price_precision,
612 size_precision,
613 handler,
614 ),
615 }
616 }
617}
618
619impl BarAggregator for VolumeBarAggregator {
620 fn bar_type(&self) -> BarType {
621 self.core.bar_type
622 }
623
624 fn is_running(&self) -> bool {
625 self.core.is_running
626 }
627
628 fn set_is_running(&mut self, value: bool) {
629 self.core.set_is_running(value);
630 }
631
632 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
634 let mut raw_size_update = size.raw;
635 let spec = self.core.bar_type.spec();
636 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
637
638 while raw_size_update > 0 {
639 if self.core.builder.volume.raw + raw_size_update < raw_step {
640 self.core.apply_update(
641 price,
642 Quantity::from_raw(raw_size_update, size.precision),
643 ts_init,
644 );
645 break;
646 }
647
648 let raw_size_diff = raw_step - self.core.builder.volume.raw;
649 self.core.apply_update(
650 price,
651 Quantity::from_raw(raw_size_diff, size.precision),
652 ts_init,
653 );
654
655 self.core.build_now_and_send();
656 raw_size_update -= raw_size_diff;
657 }
658 }
659
660 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
661 let mut raw_volume_update = volume.raw;
662 let spec = self.core.bar_type.spec();
663 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
664
665 while raw_volume_update > 0 {
666 if self.core.builder.volume.raw + raw_volume_update < raw_step {
667 self.core.builder.update_bar(
668 bar,
669 Quantity::from_raw(raw_volume_update, volume.precision),
670 ts_init,
671 );
672 break;
673 }
674
675 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
676 self.core.builder.update_bar(
677 bar,
678 Quantity::from_raw(raw_volume_diff, volume.precision),
679 ts_init,
680 );
681
682 self.core.build_now_and_send();
683 raw_volume_update -= raw_volume_diff;
684 }
685 }
686}
687
/// Aggregator that emits a bar once the signed buyer-minus-seller volume
/// reaches the specification step in either direction.
pub struct VolumeImbalanceBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Running signed raw volume: buyer volume adds, seller volume subtracts;
    /// reset to zero whenever a bar is emitted.
    imbalance_raw: i128,
    /// Specification step scaled by `FIXED_SCALAR`, i.e. the threshold in raw units.
    raw_step: i128,
}
694
695impl Debug for VolumeImbalanceBarAggregator {
696 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
697 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
698 .field("core", &self.core)
699 .field("imbalance_raw", &self.imbalance_raw)
700 .field("raw_step", &self.raw_step)
701 .finish()
702 }
703}
704
705impl VolumeImbalanceBarAggregator {
706 pub fn new<H: FnMut(Bar) + 'static>(
712 bar_type: BarType,
713 price_precision: u8,
714 size_precision: u8,
715 handler: H,
716 ) -> Self {
717 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
718 Self {
719 core: BarAggregatorCore::new(
720 bar_type.standard(),
721 price_precision,
722 size_precision,
723 handler,
724 ),
725 imbalance_raw: 0,
726 raw_step,
727 }
728 }
729}
730
731impl BarAggregator for VolumeImbalanceBarAggregator {
732 fn bar_type(&self) -> BarType {
733 self.core.bar_type
734 }
735
736 fn is_running(&self) -> bool {
737 self.core.is_running
738 }
739
740 fn set_is_running(&mut self, value: bool) {
741 self.core.set_is_running(value);
742 }
743
744 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
749 self.core.apply_update(price, size, ts_init);
750 }
751
752 fn handle_trade(&mut self, trade: TradeTick) {
753 let side = match trade.aggressor_side {
754 AggressorSide::Buyer => 1,
755 AggressorSide::Seller => -1,
756 AggressorSide::NoAggressor => {
757 self.core
758 .apply_update(trade.price, trade.size, trade.ts_init);
759 return;
760 }
761 };
762
763 let mut raw_remaining = trade.size.raw as i128;
764 while raw_remaining > 0 {
765 let imbalance_abs = self.imbalance_raw.abs();
766 let needed = (self.raw_step - imbalance_abs).max(1);
767 let raw_chunk = raw_remaining.min(needed);
768 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
769
770 self.core
771 .apply_update(trade.price, qty_chunk, trade.ts_init);
772
773 self.imbalance_raw += side * raw_chunk;
774 raw_remaining -= raw_chunk;
775
776 if self.imbalance_raw.abs() >= self.raw_step {
777 self.core.build_now_and_send();
778 self.imbalance_raw = 0;
779 }
780 }
781 }
782
783 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
784 self.core.builder.update_bar(bar, volume, ts_init);
785 }
786}
787
/// Aggregator that emits a bar once the volume of a run of consecutive
/// same-side aggressor trades reaches the specification step.
pub struct VolumeRunsBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress, or `None` when no run is active.
    current_run_side: Option<AggressorSide>,
    /// Raw volume accumulated by the run in progress.
    run_volume_raw: QuantityRaw,
    /// Specification step scaled by `FIXED_SCALAR`, i.e. the threshold in raw units.
    raw_step: QuantityRaw,
}
795
796impl Debug for VolumeRunsBarAggregator {
797 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
798 f.debug_struct(stringify!(VolumeRunsBarAggregator))
799 .field("core", &self.core)
800 .field("current_run_side", &self.current_run_side)
801 .field("run_volume_raw", &self.run_volume_raw)
802 .field("raw_step", &self.raw_step)
803 .finish()
804 }
805}
806
807impl VolumeRunsBarAggregator {
808 pub fn new<H: FnMut(Bar) + 'static>(
814 bar_type: BarType,
815 price_precision: u8,
816 size_precision: u8,
817 handler: H,
818 ) -> Self {
819 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
820 Self {
821 core: BarAggregatorCore::new(
822 bar_type.standard(),
823 price_precision,
824 size_precision,
825 handler,
826 ),
827 current_run_side: None,
828 run_volume_raw: 0,
829 raw_step,
830 }
831 }
832}
833
834impl BarAggregator for VolumeRunsBarAggregator {
835 fn bar_type(&self) -> BarType {
836 self.core.bar_type
837 }
838
839 fn is_running(&self) -> bool {
840 self.core.is_running
841 }
842
843 fn set_is_running(&mut self, value: bool) {
844 self.core.set_is_running(value);
845 }
846
847 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
852 self.core.apply_update(price, size, ts_init);
853 }
854
855 fn handle_trade(&mut self, trade: TradeTick) {
856 let side = match trade.aggressor_side {
857 AggressorSide::Buyer => Some(AggressorSide::Buyer),
858 AggressorSide::Seller => Some(AggressorSide::Seller),
859 AggressorSide::NoAggressor => None,
860 };
861
862 let Some(side) = side else {
863 self.core
864 .apply_update(trade.price, trade.size, trade.ts_init);
865 return;
866 };
867
868 if self.current_run_side != Some(side) {
869 self.current_run_side = Some(side);
870 self.run_volume_raw = 0;
871 self.core.builder.reset();
872 }
873
874 let mut raw_remaining = trade.size.raw;
875 while raw_remaining > 0 {
876 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
877 let raw_chunk = raw_remaining.min(needed);
878
879 self.core.apply_update(
880 trade.price,
881 Quantity::from_raw(raw_chunk, trade.size.precision),
882 trade.ts_init,
883 );
884
885 self.run_volume_raw += raw_chunk;
886 raw_remaining -= raw_chunk;
887
888 if self.run_volume_raw >= self.raw_step {
889 self.core.build_now_and_send();
890 self.run_volume_raw = 0;
891 self.current_run_side = None;
892 }
893 }
894 }
895
896 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
897 self.core.builder.update_bar(bar, volume, ts_init);
898 }
899}
900
/// Aggregator that emits a bar each time the accumulated notional value
/// (price times size) reaches the specification step.
pub struct ValueBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Notional value accumulated since the last emitted bar.
    cum_value: f64,
}
909
910impl Debug for ValueBarAggregator {
911 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
912 f.debug_struct(stringify!(ValueBarAggregator))
913 .field("core", &self.core)
914 .field("cum_value", &self.cum_value)
915 .finish()
916 }
917}
918
919impl ValueBarAggregator {
920 pub fn new<H: FnMut(Bar) + 'static>(
926 bar_type: BarType,
927 price_precision: u8,
928 size_precision: u8,
929 handler: H,
930 ) -> Self {
931 Self {
932 core: BarAggregatorCore::new(
933 bar_type.standard(),
934 price_precision,
935 size_precision,
936 handler,
937 ),
938 cum_value: 0.0,
939 }
940 }
941
942 #[must_use]
943 pub const fn get_cumulative_value(&self) -> f64 {
945 self.cum_value
946 }
947}
948
impl BarAggregator for ValueBarAggregator {
    /// Returns the bar type held by the core.
    fn bar_type(&self) -> BarType {
        self.core.bar_type
    }

    /// Returns the core's running flag.
    fn is_running(&self) -> bool {
        self.core.is_running
    }

    /// Sets the core's running flag.
    fn set_is_running(&mut self, value: bool) {
        self.core.set_is_running(value);
    }

    /// Applies `size` at `price`, emitting a bar every time the cumulative
    /// notional value reaches the specification step. An update that
    /// crosses the threshold is split proportionally across bars.
    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
        let mut size_update = size.as_f64();
        let spec = self.core.bar_type.spec();

        while size_update > 0.0 {
            let value_update = price.as_f64() * size_update;
            if value_update == 0.0 {
                // Zero value would make the ratio below divide by zero:
                // apply the remaining size without triggering a bar
                self.core
                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
                break;
            }

            if self.cum_value + value_update < spec.step.get() as f64 {
                // Still below the threshold: accumulate and stop
                self.cum_value += value_update;
                self.core
                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
                break;
            }

            // Portion of the remaining size that brings the value exactly to the step
            let value_diff = spec.step.get() as f64 - self.cum_value;
            let mut size_diff = size_update * (value_diff / value_update);

            // NOTE(review): `is_below_min_size` / `min_size_f64` are defined
            // outside this view; presumably they test against / return the
            // smallest size representable at the given precision — confirm.
            if is_below_min_size(size_diff, size.precision) {
                if is_below_min_size(size_update, size.precision) {
                    // Nothing applicable is left; drop the residual
                    break;
                }
                size_diff = min_size_f64(size.precision);
            }

            self.core
                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);

            self.core.build_now_and_send();
            self.cum_value = 0.0;
            size_update -= size_diff;
        }
    }

    /// Applies `volume` of `bar`, valuing it at the mean of the bar's high,
    /// low and close, and emits a bar every time the cumulative notional
    /// value reaches the specification step.
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
        let mut volume_update = volume;
        // Mean of high, low and close, expressed at the builder's price precision
        let average_price = Price::new(
            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
            self.core.builder.price_precision,
        );

        while volume_update.as_f64() > 0.0 {
            let value_update = average_price.as_f64() * volume_update.as_f64();
            if value_update == 0.0 {
                // Zero value would make the ratio below divide by zero:
                // apply the remaining volume without triggering a bar
                self.core.builder.update_bar(bar, volume_update, ts_init);
                break;
            }

            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
                // Still below the threshold: accumulate and stop
                self.cum_value += value_update;
                self.core.builder.update_bar(bar, volume_update, ts_init);
                break;
            }

            // Portion of the remaining volume that brings the value exactly to the step
            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
            let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);

            // NOTE(review): helper semantics assumed as in `update` — confirm.
            if is_below_min_size(volume_diff, volume_update.precision) {
                if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
                    // Nothing applicable is left; drop the residual
                    break;
                }
                volume_diff = min_size_f64(volume_update.precision);
            }

            self.core.builder.update_bar(
                bar,
                Quantity::new(volume_diff, volume_update.precision),
                ts_init,
            );

            self.core.build_now_and_send();
            self.cum_value = 0.0;
            volume_update = Quantity::new(
                volume_update.as_f64() - volume_diff,
                volume_update.precision,
            );
        }
    }
}
1050
/// Aggregator that emits a bar once the signed buyer-minus-seller notional
/// value reaches the specification step in either direction.
pub struct ValueImbalanceBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Running signed notional value: buyer value adds, seller value
    /// subtracts; reset to zero whenever a bar is emitted.
    imbalance_value: f64,
    /// Specification step as a float, i.e. the value threshold.
    step_value: f64,
}
1057
1058impl Debug for ValueImbalanceBarAggregator {
1059 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1060 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1061 .field("core", &self.core)
1062 .field("imbalance_value", &self.imbalance_value)
1063 .field("step_value", &self.step_value)
1064 .finish()
1065 }
1066}
1067
1068impl ValueImbalanceBarAggregator {
1069 pub fn new<H: FnMut(Bar) + 'static>(
1075 bar_type: BarType,
1076 price_precision: u8,
1077 size_precision: u8,
1078 handler: H,
1079 ) -> Self {
1080 Self {
1081 core: BarAggregatorCore::new(
1082 bar_type.standard(),
1083 price_precision,
1084 size_precision,
1085 handler,
1086 ),
1087 imbalance_value: 0.0,
1088 step_value: bar_type.spec().step.get() as f64,
1089 }
1090 }
1091}
1092
impl BarAggregator for ValueImbalanceBarAggregator {
    /// Returns the bar type held by the core.
    fn bar_type(&self) -> BarType {
        self.core.bar_type
    }

    /// Returns the core's running flag.
    fn is_running(&self) -> bool {
        self.core.is_running
    }

    /// Sets the core's running flag.
    fn set_is_running(&mut self, value: bool) {
        self.core.set_is_running(value);
    }

    /// Applies the observation to the builder; never emits a bar on its own.
    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
        self.core.apply_update(price, size, ts_init);
    }

    /// Splits the trade into chunks, moving the signed value imbalance by
    /// each chunk and emitting a bar whenever its magnitude reaches the step.
    ///
    /// Trades with a zero price or without an aggressor are applied to the
    /// builder but do not affect the imbalance.
    fn handle_trade(&mut self, trade: TradeTick) {
        let price_f64 = trade.price.as_f64();
        if price_f64 == 0.0 {
            // Zero price carries no value and would divide by zero below
            self.core
                .apply_update(trade.price, trade.size, trade.ts_init);
            return;
        }

        let side_sign = match trade.aggressor_side {
            AggressorSide::Buyer => 1.0,
            AggressorSide::Seller => -1.0,
            AggressorSide::NoAggressor => {
                self.core
                    .apply_update(trade.price, trade.size, trade.ts_init);
                return;
            }
        };

        let mut size_remaining = trade.size.as_f64();
        while size_remaining > 0.0 {
            let value_remaining = price_f64 * size_remaining;

            if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
                // Trade pushes the imbalance further in its current direction
                let needed = self.step_value - self.imbalance_value.abs();
                if value_remaining <= needed {
                    // The whole remainder fits at or below the threshold
                    self.imbalance_value += side_sign * value_remaining;
                    self.core.apply_update(
                        trade.price,
                        Quantity::new(size_remaining, trade.size.precision),
                        trade.ts_init,
                    );

                    if self.imbalance_value.abs() >= self.step_value {
                        self.core.build_now_and_send();
                        self.imbalance_value = 0.0;
                    }
                    break;
                }

                // Only the part needed to reach the threshold is applied now
                let mut value_chunk = needed;
                let mut size_chunk = value_chunk / price_f64;

                // NOTE(review): `is_below_min_size` / `min_size_f64` are defined
                // outside this view; presumably they test against / return the
                // smallest size representable at the given precision — confirm.
                if is_below_min_size(size_chunk, trade.size.precision) {
                    if is_below_min_size(size_remaining, trade.size.precision) {
                        // Nothing applicable is left; drop the residual
                        break;
                    }
                    size_chunk = min_size_f64(trade.size.precision);
                    value_chunk = price_f64 * size_chunk;
                }

                self.core.apply_update(
                    trade.price,
                    Quantity::new(size_chunk, trade.size.precision),
                    trade.ts_init,
                );
                self.imbalance_value += side_sign * value_chunk;
                size_remaining -= size_chunk;

                if self.imbalance_value.abs() >= self.step_value {
                    self.core.build_now_and_send();
                    self.imbalance_value = 0.0;
                }
            } else {
                // Trade opposes the current imbalance: first consume as much of
                // it as the remaining value allows
                let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
                let mut size_chunk = value_to_flatten / price_f64;

                if is_below_min_size(size_chunk, trade.size.precision) {
                    if is_below_min_size(size_remaining, trade.size.precision) {
                        // Nothing applicable is left; drop the residual
                        break;
                    }
                    size_chunk = min_size_f64(trade.size.precision);
                    value_to_flatten = price_f64 * size_chunk;
                }

                self.core.apply_update(
                    trade.price,
                    Quantity::new(size_chunk, trade.size.precision),
                    trade.ts_init,
                );
                self.imbalance_value += side_sign * value_to_flatten;

                if self.imbalance_value.abs() >= self.step_value {
                    self.core.build_now_and_send();
                    self.imbalance_value = 0.0;
                }
                size_remaining -= size_chunk;
            }
        }
    }

    /// Applies the bar to the builder without emitting anything.
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
        self.core.builder.update_bar(bar, volume, ts_init);
    }
}
1212
/// Aggregator that emits a bar once the notional value of a run of
/// consecutive same-side aggressor trades reaches the specification step.
pub struct ValueRunsBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress, or `None` when no run is active.
    current_run_side: Option<AggressorSide>,
    /// Notional value accumulated by the run in progress.
    run_value: f64,
    /// Specification step as a float, i.e. the value threshold.
    step_value: f64,
}
1220
1221impl Debug for ValueRunsBarAggregator {
1222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1223 f.debug_struct(stringify!(ValueRunsBarAggregator))
1224 .field("core", &self.core)
1225 .field("current_run_side", &self.current_run_side)
1226 .field("run_value", &self.run_value)
1227 .field("step_value", &self.step_value)
1228 .finish()
1229 }
1230}
1231
1232impl ValueRunsBarAggregator {
1233 pub fn new<H: FnMut(Bar) + 'static>(
1239 bar_type: BarType,
1240 price_precision: u8,
1241 size_precision: u8,
1242 handler: H,
1243 ) -> Self {
1244 Self {
1245 core: BarAggregatorCore::new(
1246 bar_type.standard(),
1247 price_precision,
1248 size_precision,
1249 handler,
1250 ),
1251 current_run_side: None,
1252 run_value: 0.0,
1253 step_value: bar_type.spec().step.get() as f64,
1254 }
1255 }
1256}
1257
1258impl BarAggregator for ValueRunsBarAggregator {
1259 fn bar_type(&self) -> BarType {
1260 self.core.bar_type
1261 }
1262
1263 fn is_running(&self) -> bool {
1264 self.core.is_running
1265 }
1266
1267 fn set_is_running(&mut self, value: bool) {
1268 self.core.set_is_running(value);
1269 }
1270
1271 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1276 self.core.apply_update(price, size, ts_init);
1277 }
1278
1279 fn handle_trade(&mut self, trade: TradeTick) {
1280 let price_f64 = trade.price.as_f64();
1281 if price_f64 == 0.0 {
1282 self.core
1283 .apply_update(trade.price, trade.size, trade.ts_init);
1284 return;
1285 }
1286
1287 let side = match trade.aggressor_side {
1288 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1289 AggressorSide::Seller => Some(AggressorSide::Seller),
1290 AggressorSide::NoAggressor => None,
1291 };
1292
1293 let Some(side) = side else {
1294 self.core
1295 .apply_update(trade.price, trade.size, trade.ts_init);
1296 return;
1297 };
1298
1299 if self.current_run_side != Some(side) {
1300 self.current_run_side = Some(side);
1301 self.run_value = 0.0;
1302 self.core.builder.reset();
1303 }
1304
1305 let mut size_remaining = trade.size.as_f64();
1306 while size_remaining > 0.0 {
1307 let value_update = price_f64 * size_remaining;
1308 if self.run_value + value_update < self.step_value {
1309 self.run_value += value_update;
1310 self.core.apply_update(
1311 trade.price,
1312 Quantity::new(size_remaining, trade.size.precision),
1313 trade.ts_init,
1314 );
1315 break;
1316 }
1317
1318 let value_needed = self.step_value - self.run_value;
1319 let mut size_chunk = value_needed / price_f64;
1320
1321 if is_below_min_size(size_chunk, trade.size.precision) {
1323 if is_below_min_size(size_remaining, trade.size.precision) {
1324 break;
1325 }
1326 size_chunk = min_size_f64(trade.size.precision);
1327 }
1328
1329 self.core.apply_update(
1330 trade.price,
1331 Quantity::new(size_chunk, trade.size.precision),
1332 trade.ts_init,
1333 );
1334
1335 self.core.build_now_and_send();
1336 self.run_value = 0.0;
1337 self.current_run_side = None;
1338 size_remaining -= size_chunk;
1339 }
1340 }
1341
1342 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1343 self.core.builder.update_bar(bar, volume, ts_init);
1344 }
1345}
1346
/// Aggregator that emits fixed-height "brick" bars whenever the price has
/// moved at least one brick size away from the last brick close.
pub struct RenkoBarAggregator {
    /// Shared builder/handler state.
    core: BarAggregatorCore,
    /// Brick height in raw price units (specification step times the raw price increment).
    pub brick_size: PriceRaw,
    /// Close of the last brick, or the first observed price before any brick is built.
    last_close: Option<Price>,
}
1357
1358impl Debug for RenkoBarAggregator {
1359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1360 f.debug_struct(stringify!(RenkoBarAggregator))
1361 .field("core", &self.core)
1362 .field("brick_size", &self.brick_size)
1363 .field("last_close", &self.last_close)
1364 .finish()
1365 }
1366}
1367
1368impl RenkoBarAggregator {
1369 pub fn new<H: FnMut(Bar) + 'static>(
1375 bar_type: BarType,
1376 price_precision: u8,
1377 size_precision: u8,
1378 price_increment: Price,
1379 handler: H,
1380 ) -> Self {
1381 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1383
1384 Self {
1385 core: BarAggregatorCore::new(
1386 bar_type.standard(),
1387 price_precision,
1388 size_precision,
1389 handler,
1390 ),
1391 brick_size,
1392 last_close: None,
1393 }
1394 }
1395}
1396
1397impl BarAggregator for RenkoBarAggregator {
    /// Returns the bar type held by the core.
    fn bar_type(&self) -> BarType {
        self.core.bar_type
    }
1401
    /// Returns the core's running flag.
    fn is_running(&self) -> bool {
        self.core.is_running
    }
1405
    /// Sets the core's running flag.
    fn set_is_running(&mut self, value: bool) {
        self.core.set_is_running(value);
    }
1409
1410 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1415 self.core.apply_update(price, size, ts_init);
1417
1418 if self.last_close.is_none() {
1420 self.last_close = Some(price);
1421 return;
1422 }
1423
1424 let last_close = self.last_close.unwrap();
1425
1426 let current_raw = price.raw;
1428 let last_close_raw = last_close.raw;
1429 let price_diff_raw = current_raw - last_close_raw;
1430 let abs_price_diff_raw = price_diff_raw.abs();
1431
1432 if abs_price_diff_raw >= self.brick_size {
1434 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1435 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1436 let mut current_close = last_close;
1437
1438 let total_volume = self.core.builder.volume;
1440
1441 for _i in 0..num_bricks {
1442 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1444 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1445
1446 let (brick_high, brick_low) = if direction > 0.0 {
1448 (brick_close, current_close)
1449 } else {
1450 (current_close, brick_close)
1451 };
1452
1453 self.core.builder.reset();
1455 self.core.builder.open = Some(current_close);
1456 self.core.builder.high = Some(brick_high);
1457 self.core.builder.low = Some(brick_low);
1458 self.core.builder.close = Some(brick_close);
1459 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1461 self.core.builder.ts_last = ts_init;
1462 self.core.builder.initialized = true;
1463
1464 self.core.build_and_send(ts_init, ts_init);
1466
1467 current_close = brick_close;
1469 self.last_close = Some(brick_close);
1470 }
1471 }
1472 }
1473
1474 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1475 self.core.builder.update_bar(bar, volume, ts_init);
1477
1478 if self.last_close.is_none() {
1480 self.last_close = Some(bar.close);
1481 return;
1482 }
1483
1484 let last_close = self.last_close.unwrap();
1485
1486 let current_raw = bar.close.raw;
1488 let last_close_raw = last_close.raw;
1489 let price_diff_raw = current_raw - last_close_raw;
1490 let abs_price_diff_raw = price_diff_raw.abs();
1491
1492 if abs_price_diff_raw >= self.brick_size {
1494 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1495 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1496 let mut current_close = last_close;
1497
1498 let total_volume = self.core.builder.volume;
1500
1501 for _i in 0..num_bricks {
1502 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1504 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1505
1506 let (brick_high, brick_low) = if direction > 0.0 {
1508 (brick_close, current_close)
1509 } else {
1510 (current_close, brick_close)
1511 };
1512
1513 self.core.builder.reset();
1515 self.core.builder.open = Some(current_close);
1516 self.core.builder.high = Some(brick_high);
1517 self.core.builder.low = Some(brick_low);
1518 self.core.builder.close = Some(brick_close);
1519 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1521 self.core.builder.ts_last = ts_init;
1522 self.core.builder.initialized = true;
1523
1524 self.core.build_and_send(ts_init, ts_init);
1526
1527 current_close = brick_close;
1529 self.last_close = Some(brick_close);
1530 }
1531 }
1532 }
1533}
1534
/// Bar aggregator that closes bars on clock timer events rather than on data thresholds.
pub struct TimeBarAggregator {
    /// Shared aggregation state (bar type, builder, handler and running flag).
    core: BarAggregatorCore,
    /// Clock used to read the current time and to register timers and time alerts.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event is ignored if the builder saw no updates since the last bar.
    build_with_no_updates: bool,
    /// For left-open intervals: stamp bars with the timer event time instead of the stored
    /// open time.
    timestamp_on_close: bool,
    /// `true` when the aggregator was created with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar currently being built; advanced to the event time after each build.
    stored_open_ns: UnixNanos,
    /// Name under which the timer or alert is registered on the clock.
    timer_name: String,
    /// Bar interval in nanoseconds, as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Time at which the current bar is next expected to close.
    next_close_ns: UnixNanos,
    /// Delay added to the computed start time, interpreted as microseconds.
    bar_build_delay: u64,
    /// Optional origin offset forwarded to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When set, the first timer event resets the builder instead of emitting a bar.
    skip_first_non_full_bar: bool,
    /// When `true`, updates drive a `TestClock` forward and replay the resulting events.
    pub historical_mode: bool,
    /// Timer events collected while advancing the clock, waiting to be turned into bars.
    historical_events: Vec<TimeEvent>,
    /// Weak handle to the shared wrapper around this aggregator, used by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1555
1556impl Debug for TimeBarAggregator {
1557 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1558 f.debug_struct(stringify!(TimeBarAggregator))
1559 .field("core", &self.core)
1560 .field("build_with_no_updates", &self.build_with_no_updates)
1561 .field("timestamp_on_close", &self.timestamp_on_close)
1562 .field("is_left_open", &self.is_left_open)
1563 .field("timer_name", &self.timer_name)
1564 .field("interval_ns", &self.interval_ns)
1565 .field("bar_build_delay", &self.bar_build_delay)
1566 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1567 .finish()
1568 }
1569}
1570
1571impl TimeBarAggregator {
1572 #[allow(clippy::too_many_arguments)]
1578 pub fn new<H: FnMut(Bar) + 'static>(
1579 bar_type: BarType,
1580 price_precision: u8,
1581 size_precision: u8,
1582 clock: Rc<RefCell<dyn Clock>>,
1583 handler: H,
1584 build_with_no_updates: bool,
1585 timestamp_on_close: bool,
1586 interval_type: BarIntervalType,
1587 time_bars_origin_offset: Option<TimeDelta>,
1588 bar_build_delay: u64,
1589 skip_first_non_full_bar: bool,
1590 ) -> Self {
1591 let is_left_open = match interval_type {
1592 BarIntervalType::LeftOpen => true,
1593 BarIntervalType::RightOpen => false,
1594 };
1595
1596 let core = BarAggregatorCore::new(
1597 bar_type.standard(),
1598 price_precision,
1599 size_precision,
1600 handler,
1601 );
1602
1603 Self {
1604 core,
1605 clock,
1606 build_with_no_updates,
1607 timestamp_on_close,
1608 is_left_open,
1609 stored_open_ns: UnixNanos::default(),
1610 timer_name: bar_type.to_string(),
1611 interval_ns: get_bar_interval_ns(&bar_type),
1612 next_close_ns: UnixNanos::default(),
1613 bar_build_delay,
1614 time_bars_origin_offset,
1615 skip_first_non_full_bar,
1616 historical_mode: false,
1617 historical_events: Vec::new(),
1618 aggregator_weak: None,
1619 }
1620 }
1621
/// Replaces the clock used by this aggregator with `clock`.
pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
    self.clock = clock;
}
1626
1627 pub fn start_timer_internal(
1636 &mut self,
1637 aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1638 ) {
1639 let aggregator_weak = if let Some(rc) = aggregator_rc {
1641 let weak = Rc::downgrade(&rc);
1643 self.aggregator_weak = Some(weak.clone());
1644 weak
1645 } else {
1646 self.aggregator_weak
1648 .as_ref()
1649 .expect("Aggregator weak reference must be set before calling start_timer()")
1650 .clone()
1651 };
1652
1653 let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1654 if let Some(agg) = aggregator_weak.upgrade() {
1655 agg.borrow_mut().build_bar(event);
1656 }
1657 }));
1658
1659 let now = self.clock.borrow().utc_now();
1661 let mut start_time =
1662 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1663 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1664
1665 let fire_immediately = start_time == now;
1667
1668 self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1669
1670 let spec = &self.bar_type().spec();
1671 let start_time_ns = UnixNanos::from(start_time);
1672
1673 if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1674 self.clock
1675 .borrow_mut()
1676 .set_timer_ns(
1677 &self.timer_name,
1678 self.interval_ns.as_u64(),
1679 Some(start_time_ns),
1680 None,
1681 Some(callback),
1682 Some(true), Some(fire_immediately),
1684 )
1685 .expect(FAILED);
1686
1687 if fire_immediately {
1688 self.next_close_ns = start_time_ns;
1689 } else {
1690 let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1691 self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1692 }
1693
1694 self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1695 } else {
1696 let alert_time = if fire_immediately {
1698 start_time
1699 } else {
1700 let step = spec.step.get() as u32;
1701 if spec.aggregation == BarAggregation::Month {
1702 add_n_months(start_time, step).expect(FAILED)
1703 } else {
1704 add_n_years(start_time, step).expect(FAILED)
1706 }
1707 };
1708
1709 self.clock
1710 .borrow_mut()
1711 .set_time_alert_ns(
1712 &self.timer_name,
1713 UnixNanos::from(alert_time),
1714 Some(callback),
1715 Some(true), )
1717 .expect(FAILED);
1718
1719 self.next_close_ns = UnixNanos::from(alert_time);
1720 self.stored_open_ns = UnixNanos::from(start_time);
1721 }
1722
1723 log::debug!(
1724 "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1725 self.timer_name,
1726 start_time,
1727 self.historical_mode,
1728 fire_immediately,
1729 now,
1730 self.bar_build_delay
1731 );
1732 }
1733
/// Cancels the timer registered on the clock under this aggregator's `timer_name`.
pub fn stop(&mut self) {
    self.clock.borrow_mut().cancel_timer(&self.timer_name);
}
1738
1739 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1740 if self.skip_first_non_full_bar {
1741 self.core.builder.reset();
1742 self.skip_first_non_full_bar = false;
1743 } else {
1744 self.core.build_and_send(ts_event, ts_init);
1745 }
1746 }
1747
1748 fn build_bar(&mut self, event: TimeEvent) {
1749 if !self.core.builder.initialized {
1750 return;
1751 }
1752
1753 if !self.build_with_no_updates && self.core.builder.count == 0 {
1754 return; }
1756
1757 let ts_init = event.ts_event;
1758 let ts_event = if self.is_left_open {
1759 if self.timestamp_on_close {
1760 event.ts_event
1761 } else {
1762 self.stored_open_ns
1763 }
1764 } else {
1765 self.stored_open_ns
1766 };
1767
1768 self.build_and_send(ts_event, ts_init);
1769
1770 self.stored_open_ns = event.ts_event;
1772
1773 if self.bar_type().spec().aggregation == BarAggregation::Month {
1774 let step = self.bar_type().spec().step.get() as u32;
1775 let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1776
1777 self.clock
1778 .borrow_mut()
1779 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1780 .expect(FAILED);
1781
1782 self.next_close_ns = alert_time_ns;
1783 } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1784 let step = self.bar_type().spec().step.get() as u32;
1785 let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1786
1787 self.clock
1788 .borrow_mut()
1789 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1790 .expect(FAILED);
1791
1792 self.next_close_ns = alert_time_ns;
1793 } else {
1794 self.next_close_ns = self
1796 .clock
1797 .borrow()
1798 .next_time_ns(&self.timer_name)
1799 .unwrap_or_default();
1800 }
1801 }
1802
1803 fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1804 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1805 {
1807 let mut clock_borrow = self.clock.borrow_mut();
1808 let test_clock = clock_borrow
1809 .as_any_mut()
1810 .downcast_mut::<TestClock>()
1811 .expect("Expected TestClock in historical mode");
1812 test_clock.set_time(ts_init);
1813 }
1814 self.start_timer_internal(None);
1816 }
1817
1818 {
1820 let mut clock_borrow = self.clock.borrow_mut();
1821 let test_clock = clock_borrow
1822 .as_any_mut()
1823 .downcast_mut::<TestClock>()
1824 .expect("Expected TestClock in historical mode");
1825 self.historical_events = test_clock.advance_time(ts_init, true);
1826 }
1827 }
1828
1829 fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1830 let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1833 for event in events {
1834 self.build_bar(event);
1835 }
1836 }
1837
/// Replaces the pending historical timer events with `events`.
pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
    self.historical_events = events;
}
1842}
1843
1844impl BarAggregator for TimeBarAggregator {
1845 fn bar_type(&self) -> BarType {
1846 self.core.bar_type
1847 }
1848
1849 fn is_running(&self) -> bool {
1850 self.core.is_running
1851 }
1852
1853 fn set_is_running(&mut self, value: bool) {
1854 self.core.set_is_running(value);
1855 }
1856
1857 fn stop(&mut self) {
1859 Self::stop(self);
1860 }
1861
1862 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1863 if self.historical_mode {
1864 self.preprocess_historical_events(ts_init);
1865 }
1866
1867 self.core.apply_update(price, size, ts_init);
1868
1869 if self.historical_mode {
1870 self.postprocess_historical_events(ts_init);
1871 }
1872 }
1873
1874 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1875 if self.historical_mode {
1876 self.preprocess_historical_events(ts_init);
1877 }
1878
1879 self.core.builder.update_bar(bar, volume, ts_init);
1880
1881 if self.historical_mode {
1882 self.postprocess_historical_events(ts_init);
1883 }
1884 }
1885
1886 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1887 self.historical_mode = historical_mode;
1888 self.core.handler = handler;
1889 }
1890
1891 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1892 self.set_historical_events_internal(events);
1893 }
1894
1895 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1896 self.set_clock_internal(clock);
1897 }
1898
1899 fn build_bar(&mut self, event: TimeEvent) {
1900 {
1903 #[allow(clippy::use_self)]
1904 TimeBarAggregator::build_bar(self, event);
1905 }
1906 }
1907
1908 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1909 self.aggregator_weak = Some(weak);
1910 }
1911
1912 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1913 self.start_timer_internal(aggregator_rc);
1914 }
1915}
1916
1917fn is_below_min_size(size: f64, precision: u8) -> bool {
1918 Quantity::new(size, precision).raw == 0
1919}
1920
/// Returns `10^-precision`, the smallest positive step at the given decimal precision.
fn min_size_f64(precision: u8) -> f64 {
    let exponent = -i32::from(precision);
    10_f64.powi(exponent)
}
1924
1925#[cfg(test)]
1926mod tests {
1927 use std::sync::{Arc, Mutex};
1928
1929 use nautilus_common::clock::TestClock;
1930 use nautilus_core::{MUTEX_POISONED, UUID4};
1931 use nautilus_model::{
1932 data::{BarSpecification, BarType},
1933 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1934 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1935 types::{Price, Quantity},
1936 };
1937 use rstest::rstest;
1938 use ustr::Ustr;
1939
1940 use super::*;
1941
1942 #[rstest]
1943 fn test_bar_builder_initialization(equity_aapl: Equity) {
1944 let instrument = InstrumentAny::Equity(equity_aapl);
1945 let bar_type = BarType::new(
1946 instrument.id(),
1947 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1948 AggregationSource::Internal,
1949 );
1950 let builder = BarBuilder::new(
1951 bar_type,
1952 instrument.price_precision(),
1953 instrument.size_precision(),
1954 );
1955
1956 assert!(!builder.initialized);
1957 assert_eq!(builder.ts_last, 0);
1958 assert_eq!(builder.count, 0);
1959 }
1960
1961 #[rstest]
1962 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1963 let instrument = InstrumentAny::Equity(equity_aapl);
1964 let bar_type = BarType::new(
1965 instrument.id(),
1966 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1967 AggregationSource::Internal,
1968 );
1969 let mut builder = BarBuilder::new(
1970 bar_type,
1971 instrument.price_precision(),
1972 instrument.size_precision(),
1973 );
1974
1975 builder.update(
1976 Price::from("100.00"),
1977 Quantity::from(1),
1978 UnixNanos::from(1000),
1979 );
1980 builder.update(
1981 Price::from("95.00"),
1982 Quantity::from(1),
1983 UnixNanos::from(2000),
1984 );
1985 builder.update(
1986 Price::from("105.00"),
1987 Quantity::from(1),
1988 UnixNanos::from(3000),
1989 );
1990
1991 let bar = builder.build_now();
1992 assert!(bar.high > bar.low);
1993 assert_eq!(bar.open, Price::from("100.00"));
1994 assert_eq!(bar.high, Price::from("105.00"));
1995 assert_eq!(bar.low, Price::from("95.00"));
1996 assert_eq!(bar.close, Price::from("105.00"));
1997 }
1998
1999 #[rstest]
2000 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
2001 let instrument = InstrumentAny::Equity(equity_aapl);
2002 let bar_type = BarType::new(
2003 instrument.id(),
2004 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2005 AggregationSource::Internal,
2006 );
2007 let mut builder = BarBuilder::new(
2008 bar_type,
2009 instrument.price_precision(),
2010 instrument.size_precision(),
2011 );
2012
2013 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2014 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2015
2016 assert_eq!(builder.ts_last, 1_000);
2017 assert_eq!(builder.count, 1);
2018 }
2019
2020 #[rstest]
2021 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2022 let instrument = InstrumentAny::Equity(equity_aapl);
2023 let bar_type = BarType::new(
2024 instrument.id(),
2025 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2026 AggregationSource::Internal,
2027 );
2028 let mut builder = BarBuilder::new(
2029 bar_type,
2030 instrument.price_precision(),
2031 instrument.size_precision(),
2032 );
2033
2034 builder.update(
2035 Price::from("1.00000"),
2036 Quantity::from(1),
2037 UnixNanos::default(),
2038 );
2039
2040 assert!(builder.initialized);
2041 assert_eq!(builder.ts_last, 0);
2042 assert_eq!(builder.count, 1);
2043 }
2044
2045 #[rstest]
2046 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2047 equity_aapl: Equity,
2048 ) {
2049 let instrument = InstrumentAny::Equity(equity_aapl);
2050 let bar_type = BarType::new(
2051 instrument.id(),
2052 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2053 AggregationSource::Internal,
2054 );
2055 let mut builder = BarBuilder::new(bar_type, 2, 0);
2056
2057 builder.update(
2058 Price::from("1.00000"),
2059 Quantity::from(1),
2060 UnixNanos::from(1_000),
2061 );
2062 builder.update(
2063 Price::from("1.00001"),
2064 Quantity::from(1),
2065 UnixNanos::from(500),
2066 );
2067
2068 assert!(builder.initialized);
2069 assert_eq!(builder.ts_last, 1_000);
2070 assert_eq!(builder.count, 1);
2071 }
2072
2073 #[rstest]
2074 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2075 let instrument = InstrumentAny::Equity(equity_aapl);
2076 let bar_type = BarType::new(
2077 instrument.id(),
2078 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2079 AggregationSource::Internal,
2080 );
2081 let mut builder = BarBuilder::new(
2082 bar_type,
2083 instrument.price_precision(),
2084 instrument.size_precision(),
2085 );
2086
2087 for _ in 0..5 {
2088 builder.update(
2089 Price::from("1.00000"),
2090 Quantity::from(1),
2091 UnixNanos::from(1_000),
2092 );
2093 }
2094
2095 assert_eq!(builder.count, 5);
2096 }
2097
2098 #[rstest]
2099 #[should_panic]
2100 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2101 let instrument = InstrumentAny::Equity(equity_aapl);
2102 let bar_type = BarType::new(
2103 instrument.id(),
2104 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2105 AggregationSource::Internal,
2106 );
2107 let mut builder = BarBuilder::new(
2108 bar_type,
2109 instrument.price_precision(),
2110 instrument.size_precision(),
2111 );
2112 let _ = builder.build_now();
2113 }
2114
2115 #[rstest]
2116 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2117 let instrument = InstrumentAny::Equity(equity_aapl);
2118 let bar_type = BarType::new(
2119 instrument.id(),
2120 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2121 AggregationSource::Internal,
2122 );
2123 let mut builder = BarBuilder::new(
2124 bar_type,
2125 instrument.price_precision(),
2126 instrument.size_precision(),
2127 );
2128
2129 builder.update(
2130 Price::from("1.00001"),
2131 Quantity::from(2),
2132 UnixNanos::default(),
2133 );
2134 builder.update(
2135 Price::from("1.00002"),
2136 Quantity::from(2),
2137 UnixNanos::default(),
2138 );
2139 builder.update(
2140 Price::from("1.00000"),
2141 Quantity::from(1),
2142 UnixNanos::from(1_000_000_000),
2143 );
2144
2145 let bar = builder.build_now();
2146
2147 assert_eq!(bar.open, Price::from("1.00001"));
2148 assert_eq!(bar.high, Price::from("1.00002"));
2149 assert_eq!(bar.low, Price::from("1.00000"));
2150 assert_eq!(bar.close, Price::from("1.00000"));
2151 assert_eq!(bar.volume, Quantity::from(5));
2152 assert_eq!(bar.ts_init, 1_000_000_000);
2153 assert_eq!(builder.ts_last, 1_000_000_000);
2154 assert_eq!(builder.count, 0);
2155 }
2156
2157 #[rstest]
2158 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2159 let instrument = InstrumentAny::Equity(equity_aapl);
2160 let bar_type = BarType::new(
2161 instrument.id(),
2162 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2163 AggregationSource::Internal,
2164 );
2165 let mut builder = BarBuilder::new(bar_type, 2, 0);
2166
2167 builder.update(
2168 Price::from("1.00001"),
2169 Quantity::from(1),
2170 UnixNanos::default(),
2171 );
2172 builder.build_now();
2173
2174 builder.update(
2175 Price::from("1.00000"),
2176 Quantity::from(1),
2177 UnixNanos::default(),
2178 );
2179 builder.update(
2180 Price::from("1.00003"),
2181 Quantity::from(1),
2182 UnixNanos::default(),
2183 );
2184 builder.update(
2185 Price::from("1.00002"),
2186 Quantity::from(1),
2187 UnixNanos::default(),
2188 );
2189
2190 let bar = builder.build_now();
2191
2192 assert_eq!(bar.open, Price::from("1.00000"));
2193 assert_eq!(bar.high, Price::from("1.00003"));
2194 assert_eq!(bar.low, Price::from("1.00000"));
2195 assert_eq!(bar.close, Price::from("1.00002"));
2196 assert_eq!(bar.volume, Quantity::from(3));
2197 }
2198
2199 #[rstest]
2200 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2201 let instrument = InstrumentAny::Equity(equity_aapl);
2202 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2203 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2204 let handler = Arc::new(Mutex::new(Vec::new()));
2205 let handler_clone = Arc::clone(&handler);
2206
2207 let mut aggregator = TickBarAggregator::new(
2208 bar_type,
2209 instrument.price_precision(),
2210 instrument.size_precision(),
2211 move |bar: Bar| {
2212 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2213 handler_guard.push(bar);
2214 },
2215 );
2216
2217 let trade = TradeTick::default();
2218 aggregator.handle_trade(trade);
2219
2220 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2221 assert_eq!(handler_guard.len(), 0);
2222 }
2223
2224 #[rstest]
2225 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2226 let instrument = InstrumentAny::Equity(equity_aapl);
2227 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2228 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2229 let handler = Arc::new(Mutex::new(Vec::new()));
2230 let handler_clone = Arc::clone(&handler);
2231
2232 let mut aggregator = TickBarAggregator::new(
2233 bar_type,
2234 instrument.price_precision(),
2235 instrument.size_precision(),
2236 move |bar: Bar| {
2237 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2238 handler_guard.push(bar);
2239 },
2240 );
2241
2242 let trade = TradeTick::default();
2243 aggregator.handle_trade(trade);
2244 aggregator.handle_trade(trade);
2245 aggregator.handle_trade(trade);
2246
2247 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2248 let bar = handler_guard.first().unwrap();
2249 assert_eq!(handler_guard.len(), 1);
2250 assert_eq!(bar.open, trade.price);
2251 assert_eq!(bar.high, trade.price);
2252 assert_eq!(bar.low, trade.price);
2253 assert_eq!(bar.close, trade.price);
2254 assert_eq!(bar.volume, Quantity::from(300000));
2255 assert_eq!(bar.ts_event, trade.ts_event);
2256 assert_eq!(bar.ts_init, trade.ts_init);
2257 }
2258
2259 #[rstest]
2260 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2261 let instrument = InstrumentAny::Equity(equity_aapl);
2262 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2263 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2264 let handler = Arc::new(Mutex::new(Vec::new()));
2265 let handler_clone = Arc::clone(&handler);
2266
2267 let mut aggregator = TickBarAggregator::new(
2268 bar_type,
2269 instrument.price_precision(),
2270 instrument.size_precision(),
2271 move |bar: Bar| {
2272 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2273 handler_guard.push(bar);
2274 },
2275 );
2276
2277 aggregator.update(
2278 Price::from("1.00001"),
2279 Quantity::from(1),
2280 UnixNanos::default(),
2281 );
2282 aggregator.update(
2283 Price::from("1.00002"),
2284 Quantity::from(1),
2285 UnixNanos::from(1000),
2286 );
2287 aggregator.update(
2288 Price::from("1.00003"),
2289 Quantity::from(1),
2290 UnixNanos::from(2000),
2291 );
2292
2293 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2294 assert_eq!(handler_guard.len(), 1);
2295
2296 let bar = handler_guard.first().unwrap();
2297 assert_eq!(bar.open, Price::from("1.00001"));
2298 assert_eq!(bar.high, Price::from("1.00003"));
2299 assert_eq!(bar.low, Price::from("1.00001"));
2300 assert_eq!(bar.close, Price::from("1.00003"));
2301 assert_eq!(bar.volume, Quantity::from(3));
2302 }
2303
2304 #[rstest]
2305 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2306 let instrument = InstrumentAny::Equity(equity_aapl);
2307 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2308 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2309 let handler = Arc::new(Mutex::new(Vec::new()));
2310 let handler_clone = Arc::clone(&handler);
2311
2312 let mut aggregator = TickBarAggregator::new(
2313 bar_type,
2314 instrument.price_precision(),
2315 instrument.size_precision(),
2316 move |bar: Bar| {
2317 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2318 handler_guard.push(bar);
2319 },
2320 );
2321
2322 aggregator.update(
2323 Price::from("1.00001"),
2324 Quantity::from(1),
2325 UnixNanos::default(),
2326 );
2327 aggregator.update(
2328 Price::from("1.00002"),
2329 Quantity::from(1),
2330 UnixNanos::from(1000),
2331 );
2332 aggregator.update(
2333 Price::from("1.00003"),
2334 Quantity::from(1),
2335 UnixNanos::from(2000),
2336 );
2337 aggregator.update(
2338 Price::from("1.00004"),
2339 Quantity::from(1),
2340 UnixNanos::from(3000),
2341 );
2342
2343 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2344 assert_eq!(handler_guard.len(), 2);
2345
2346 let bar1 = &handler_guard[0];
2347 assert_eq!(bar1.open, Price::from("1.00001"));
2348 assert_eq!(bar1.close, Price::from("1.00002"));
2349 assert_eq!(bar1.volume, Quantity::from(2));
2350
2351 let bar2 = &handler_guard[1];
2352 assert_eq!(bar2.open, Price::from("1.00003"));
2353 assert_eq!(bar2.close, Price::from("1.00004"));
2354 assert_eq!(bar2.volume, Quantity::from(2));
2355 }
2356
2357 #[rstest]
2358 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2359 let instrument = InstrumentAny::Equity(equity_aapl);
2360 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2361 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2362 let handler = Arc::new(Mutex::new(Vec::new()));
2363 let handler_clone = Arc::clone(&handler);
2364
2365 let mut aggregator = TickImbalanceBarAggregator::new(
2366 bar_type,
2367 instrument.price_precision(),
2368 instrument.size_precision(),
2369 move |bar: Bar| {
2370 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2371 handler_guard.push(bar);
2372 },
2373 );
2374
2375 let trade = TradeTick::default();
2376 aggregator.handle_trade(trade);
2377 aggregator.handle_trade(trade);
2378
2379 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2380 assert_eq!(handler_guard.len(), 1);
2381 let bar = handler_guard.first().unwrap();
2382 assert_eq!(bar.volume, Quantity::from(200000));
2383 }
2384
2385 #[rstest]
2386 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2387 let instrument = InstrumentAny::Equity(equity_aapl);
2388 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2389 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2390 let handler = Arc::new(Mutex::new(Vec::new()));
2391 let handler_clone = Arc::clone(&handler);
2392
2393 let mut aggregator = TickImbalanceBarAggregator::new(
2394 bar_type,
2395 instrument.price_precision(),
2396 instrument.size_precision(),
2397 move |bar: Bar| {
2398 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2399 handler_guard.push(bar);
2400 },
2401 );
2402
2403 let sell = TradeTick {
2404 aggressor_side: AggressorSide::Seller,
2405 ..TradeTick::default()
2406 };
2407
2408 aggregator.handle_trade(sell);
2409
2410 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2411 assert_eq!(handler_guard.len(), 1);
2412 }
2413
2414 #[rstest]
2415 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2416 let instrument = InstrumentAny::Equity(equity_aapl);
2417 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2418 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2419 let handler = Arc::new(Mutex::new(Vec::new()));
2420 let handler_clone = Arc::clone(&handler);
2421
2422 let mut aggregator = TickRunsBarAggregator::new(
2423 bar_type,
2424 instrument.price_precision(),
2425 instrument.size_precision(),
2426 move |bar: Bar| {
2427 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2428 handler_guard.push(bar);
2429 },
2430 );
2431
2432 let buy = TradeTick::default();
2433 let sell = TradeTick {
2434 aggressor_side: AggressorSide::Seller,
2435 ..buy
2436 };
2437
2438 aggregator.handle_trade(buy);
2439 aggregator.handle_trade(buy);
2440 aggregator.handle_trade(sell);
2441 aggregator.handle_trade(sell);
2442
2443 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2444 assert_eq!(handler_guard.len(), 2);
2445 }
2446
2447 #[rstest]
2448 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2449 let instrument = InstrumentAny::Equity(equity_aapl);
2450 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2451 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2452 let handler = Arc::new(Mutex::new(Vec::new()));
2453 let handler_clone = Arc::clone(&handler);
2454
2455 let mut aggregator = TickRunsBarAggregator::new(
2456 bar_type,
2457 instrument.price_precision(),
2458 instrument.size_precision(),
2459 move |bar: Bar| {
2460 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2461 handler_guard.push(bar);
2462 },
2463 );
2464
2465 let buy = TradeTick {
2466 size: Quantity::from(1),
2467 ..TradeTick::default()
2468 };
2469 let sell = TradeTick {
2470 aggressor_side: AggressorSide::Seller,
2471 size: Quantity::from(1),
2472 ..buy
2473 };
2474
2475 aggregator.handle_trade(buy);
2476 aggregator.handle_trade(buy);
2477 aggregator.handle_trade(sell);
2478 aggregator.handle_trade(sell);
2479
2480 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2481 assert_eq!(handler_guard.len(), 2);
2482 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2483 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2484 }
2485
2486 #[rstest]
2487 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2488 let instrument = InstrumentAny::Equity(equity_aapl);
2489 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2490 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2491 let handler = Arc::new(Mutex::new(Vec::new()));
2492 let handler_clone = Arc::clone(&handler);
2493
2494 let mut aggregator = VolumeBarAggregator::new(
2495 bar_type,
2496 instrument.price_precision(),
2497 instrument.size_precision(),
2498 move |bar: Bar| {
2499 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2500 handler_guard.push(bar);
2501 },
2502 );
2503
2504 aggregator.update(
2505 Price::from("1.00001"),
2506 Quantity::from(25),
2507 UnixNanos::default(),
2508 );
2509
2510 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2511 assert_eq!(handler_guard.len(), 2);
2512 let bar1 = &handler_guard[0];
2513 assert_eq!(bar1.volume, Quantity::from(10));
2514 let bar2 = &handler_guard[1];
2515 assert_eq!(bar2.volume, Quantity::from(10));
2516 }
2517
2518 #[rstest]
2519 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2520 let instrument = InstrumentAny::Equity(equity_aapl);
2521 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2522 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2523 let handler = Arc::new(Mutex::new(Vec::new()));
2524 let handler_clone = Arc::clone(&handler);
2525
2526 let mut aggregator = VolumeRunsBarAggregator::new(
2527 bar_type,
2528 instrument.price_precision(),
2529 instrument.size_precision(),
2530 move |bar: Bar| {
2531 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2532 handler_guard.push(bar);
2533 },
2534 );
2535
2536 let buy = TradeTick {
2537 instrument_id: instrument.id(),
2538 price: Price::from("1.0"),
2539 size: Quantity::from(1),
2540 ..TradeTick::default()
2541 };
2542 let sell = TradeTick {
2543 aggressor_side: AggressorSide::Seller,
2544 ..buy
2545 };
2546
2547 aggregator.handle_trade(buy);
2548 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
2550 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2553 assert!(handler_guard.len() >= 2);
2554 assert!(
2555 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2556 < f64::EPSILON
2557 );
2558 }
2559
2560 #[rstest]
2561 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2562 let instrument = InstrumentAny::Equity(equity_aapl);
2563 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2564 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2565 let handler = Arc::new(Mutex::new(Vec::new()));
2566 let handler_clone = Arc::clone(&handler);
2567
2568 let mut aggregator = VolumeRunsBarAggregator::new(
2569 bar_type,
2570 instrument.price_precision(),
2571 instrument.size_precision(),
2572 move |bar: Bar| {
2573 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2574 handler_guard.push(bar);
2575 },
2576 );
2577
2578 let trade = TradeTick {
2579 instrument_id: instrument.id(),
2580 price: Price::from("1.0"),
2581 size: Quantity::from(5),
2582 ..TradeTick::default()
2583 };
2584
2585 aggregator.handle_trade(trade);
2586
2587 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2588 assert!(!handler_guard.is_empty());
2589 assert!(handler_guard[0].volume.as_f64() > 0.0);
2590 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2591 }
2592
2593 #[rstest]
2594 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2595 let instrument = InstrumentAny::Equity(equity_aapl);
2596 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2597 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2598 let handler = Arc::new(Mutex::new(Vec::new()));
2599 let handler_clone = Arc::clone(&handler);
2600
2601 let mut aggregator = VolumeImbalanceBarAggregator::new(
2602 bar_type,
2603 instrument.price_precision(),
2604 instrument.size_precision(),
2605 move |bar: Bar| {
2606 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2607 handler_guard.push(bar);
2608 },
2609 );
2610
2611 let trade_small = TradeTick {
2612 instrument_id: instrument.id(),
2613 price: Price::from("1.0"),
2614 size: Quantity::from(1),
2615 ..TradeTick::default()
2616 };
2617 let trade_large = TradeTick {
2618 size: Quantity::from(3),
2619 ..trade_small
2620 };
2621
2622 aggregator.handle_trade(trade_small);
2623 aggregator.handle_trade(trade_large);
2624
2625 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2626 assert_eq!(handler_guard.len(), 2);
2627 let total_output = handler_guard
2628 .iter()
2629 .map(|bar| bar.volume.as_f64())
2630 .sum::<f64>();
2631 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2632 assert!((total_output - total_input).abs() < f64::EPSILON);
2633 }
2634
2635 #[rstest]
2636 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2637 let instrument = InstrumentAny::Equity(equity_aapl);
2638 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2640 let handler = Arc::new(Mutex::new(Vec::new()));
2641 let handler_clone = Arc::clone(&handler);
2642
2643 let mut aggregator = ValueBarAggregator::new(
2644 bar_type,
2645 instrument.price_precision(),
2646 instrument.size_precision(),
2647 move |bar: Bar| {
2648 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2649 handler_guard.push(bar);
2650 },
2651 );
2652
2653 aggregator.update(
2655 Price::from("100.00"),
2656 Quantity::from(5),
2657 UnixNanos::default(),
2658 );
2659 aggregator.update(
2660 Price::from("100.00"),
2661 Quantity::from(5),
2662 UnixNanos::from(1000),
2663 );
2664
2665 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2666 assert_eq!(handler_guard.len(), 1);
2667 let bar = handler_guard.first().unwrap();
2668 assert_eq!(bar.volume, Quantity::from(10));
2669 }
2670
2671 #[rstest]
2672 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2673 let instrument = InstrumentAny::Equity(equity_aapl);
2674 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2675 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2676 let handler = Arc::new(Mutex::new(Vec::new()));
2677 let handler_clone = Arc::clone(&handler);
2678
2679 let mut aggregator = ValueBarAggregator::new(
2680 bar_type,
2681 instrument.price_precision(),
2682 instrument.size_precision(),
2683 move |bar: Bar| {
2684 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2685 handler_guard.push(bar);
2686 },
2687 );
2688
2689 aggregator.update(
2691 Price::from("100.00"),
2692 Quantity::from(25),
2693 UnixNanos::default(),
2694 );
2695
2696 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2697 assert_eq!(handler_guard.len(), 2);
2698 let remaining_value = aggregator.get_cumulative_value();
2699 assert!(remaining_value < 1000.0); }
2701
2702 #[rstest]
2703 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2704 let instrument = InstrumentAny::Equity(equity_aapl);
2705 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2706 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2707 let handler = Arc::new(Mutex::new(Vec::new()));
2708 let handler_clone = Arc::clone(&handler);
2709
2710 let mut aggregator = ValueBarAggregator::new(
2711 bar_type,
2712 instrument.price_precision(),
2713 instrument.size_precision(),
2714 move |bar: Bar| {
2715 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2716 handler_guard.push(bar);
2717 },
2718 );
2719
2720 aggregator.update(
2722 Price::from("0.00"),
2723 Quantity::from(100),
2724 UnixNanos::default(),
2725 );
2726
2727 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2729 assert_eq!(handler_guard.len(), 0);
2730
2731 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2733 }
2734
2735 #[rstest]
2736 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2737 let instrument = InstrumentAny::Equity(equity_aapl);
2738 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2739 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2740 let handler = Arc::new(Mutex::new(Vec::new()));
2741 let handler_clone = Arc::clone(&handler);
2742
2743 let mut aggregator = ValueBarAggregator::new(
2744 bar_type,
2745 instrument.price_precision(),
2746 instrument.size_precision(),
2747 move |bar: Bar| {
2748 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2749 handler_guard.push(bar);
2750 },
2751 );
2752
2753 aggregator.update(
2755 Price::from("100.00"),
2756 Quantity::from(0),
2757 UnixNanos::default(),
2758 );
2759
2760 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2762 assert_eq!(handler_guard.len(), 0);
2763
2764 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2766 }
2767
2768 #[rstest]
2769 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2770 let instrument = InstrumentAny::Equity(equity_aapl);
2771 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2772 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2773 let handler = Arc::new(Mutex::new(Vec::new()));
2774 let handler_clone = Arc::clone(&handler);
2775
2776 let mut aggregator = ValueImbalanceBarAggregator::new(
2777 bar_type,
2778 instrument.price_precision(),
2779 instrument.size_precision(),
2780 move |bar: Bar| {
2781 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2782 handler_guard.push(bar);
2783 },
2784 );
2785
2786 let buy = TradeTick {
2787 price: Price::from("5.0"),
2788 size: Quantity::from(2), instrument_id: instrument.id(),
2790 ..TradeTick::default()
2791 };
2792 let sell = TradeTick {
2793 price: Price::from("5.0"),
2794 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
2796 instrument_id: instrument.id(),
2797 ..buy
2798 };
2799
2800 aggregator.handle_trade(buy);
2801 aggregator.handle_trade(sell);
2802
2803 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2804 assert_eq!(handler_guard.len(), 2);
2805 }
2806
2807 #[rstest]
2808 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2809 let instrument = InstrumentAny::Equity(equity_aapl);
2810 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2811 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2812 let handler = Arc::new(Mutex::new(Vec::new()));
2813 let handler_clone = Arc::clone(&handler);
2814
2815 let mut aggregator = ValueRunsBarAggregator::new(
2816 bar_type,
2817 instrument.price_precision(),
2818 instrument.size_precision(),
2819 move |bar: Bar| {
2820 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2821 handler_guard.push(bar);
2822 },
2823 );
2824
2825 let trade = TradeTick {
2826 price: Price::from("10.0"),
2827 size: Quantity::from(5),
2828 instrument_id: instrument.id(),
2829 ..TradeTick::default()
2830 };
2831
2832 aggregator.handle_trade(trade);
2833 aggregator.handle_trade(trade);
2834
2835 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2836 assert_eq!(handler_guard.len(), 1);
2837 let bar = handler_guard.first().unwrap();
2838 assert_eq!(bar.volume, Quantity::from(10));
2839 }
2840
2841 #[rstest]
2842 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2843 let instrument = InstrumentAny::Equity(equity_aapl);
2844 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2845 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2846 let handler = Arc::new(Mutex::new(Vec::new()));
2847 let handler_clone = Arc::clone(&handler);
2848
2849 let mut aggregator = ValueRunsBarAggregator::new(
2850 bar_type,
2851 instrument.price_precision(),
2852 instrument.size_precision(),
2853 move |bar: Bar| {
2854 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2855 handler_guard.push(bar);
2856 },
2857 );
2858
2859 let buy = TradeTick {
2860 price: Price::from("10.0"),
2861 size: Quantity::from(5),
2862 instrument_id: instrument.id(),
2863 ..TradeTick::default()
2864 }; let sell = TradeTick {
2866 price: Price::from("10.0"),
2867 size: Quantity::from(10),
2868 aggressor_side: AggressorSide::Seller,
2869 ..buy
2870 }; aggregator.handle_trade(buy);
2873 aggregator.handle_trade(sell);
2874
2875 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2876 assert_eq!(handler_guard.len(), 1);
2877 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2878 }
2879
2880 #[rstest]
2881 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2882 let instrument = InstrumentAny::Equity(equity_aapl);
2883 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2884 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2885 let handler = Arc::new(Mutex::new(Vec::new()));
2886 let handler_clone = Arc::clone(&handler);
2887
2888 let mut aggregator = TickRunsBarAggregator::new(
2889 bar_type,
2890 instrument.price_precision(),
2891 instrument.size_precision(),
2892 move |bar: Bar| {
2893 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2894 handler_guard.push(bar);
2895 },
2896 );
2897
2898 let buy = TradeTick::default();
2899
2900 aggregator.handle_trade(buy);
2901 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2906 assert_eq!(handler_guard.len(), 2);
2907 }
2908
2909 #[rstest]
2910 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2911 let instrument = InstrumentAny::Equity(equity_aapl);
2912 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2913 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2914 let handler = Arc::new(Mutex::new(Vec::new()));
2915 let handler_clone = Arc::clone(&handler);
2916
2917 let mut aggregator = TickRunsBarAggregator::new(
2918 bar_type,
2919 instrument.price_precision(),
2920 instrument.size_precision(),
2921 move |bar: Bar| {
2922 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2923 handler_guard.push(bar);
2924 },
2925 );
2926
2927 let buy = TradeTick::default();
2928 let no_aggressor = TradeTick {
2929 aggressor_side: AggressorSide::NoAggressor,
2930 ..buy
2931 };
2932
2933 aggregator.handle_trade(buy);
2934 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2939 assert_eq!(handler_guard.len(), 1);
2940 }
2941
2942 #[rstest]
2943 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2944 let instrument = InstrumentAny::Equity(equity_aapl);
2945 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2946 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2947 let handler = Arc::new(Mutex::new(Vec::new()));
2948 let handler_clone = Arc::clone(&handler);
2949
2950 let mut aggregator = VolumeRunsBarAggregator::new(
2951 bar_type,
2952 instrument.price_precision(),
2953 instrument.size_precision(),
2954 move |bar: Bar| {
2955 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2956 handler_guard.push(bar);
2957 },
2958 );
2959
2960 let buy = TradeTick {
2961 instrument_id: instrument.id(),
2962 price: Price::from("1.0"),
2963 size: Quantity::from(1),
2964 ..TradeTick::default()
2965 };
2966
2967 aggregator.handle_trade(buy);
2968 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2973 assert_eq!(handler_guard.len(), 2);
2974 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2975 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2976 }
2977
2978 #[rstest]
2979 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2980 let instrument = InstrumentAny::Equity(equity_aapl);
2981 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2982 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2983 let handler = Arc::new(Mutex::new(Vec::new()));
2984 let handler_clone = Arc::clone(&handler);
2985
2986 let mut aggregator = ValueRunsBarAggregator::new(
2987 bar_type,
2988 instrument.price_precision(),
2989 instrument.size_precision(),
2990 move |bar: Bar| {
2991 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2992 handler_guard.push(bar);
2993 },
2994 );
2995
2996 let buy = TradeTick {
2997 instrument_id: instrument.id(),
2998 price: Price::from("10.0"),
2999 size: Quantity::from(5),
3000 ..TradeTick::default()
3001 }; aggregator.handle_trade(buy);
3004 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3009 assert_eq!(handler_guard.len(), 2);
3010 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3011 assert_eq!(handler_guard[1].volume, Quantity::from(10));
3012 }
3013
3014 #[rstest]
3015 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
3016 let instrument = InstrumentAny::Equity(equity_aapl);
3017 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3019 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3020 let handler = Arc::new(Mutex::new(Vec::new()));
3021 let handler_clone = Arc::clone(&handler);
3022 let clock = Rc::new(RefCell::new(TestClock::new()));
3023
3024 let mut aggregator = TimeBarAggregator::new(
3025 bar_type,
3026 instrument.price_precision(),
3027 instrument.size_precision(),
3028 clock.clone(),
3029 move |bar: Bar| {
3030 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3031 handler_guard.push(bar);
3032 },
3033 true, false, BarIntervalType::LeftOpen,
3036 None, 15, false, );
3040
3041 aggregator.update(
3042 Price::from("100.00"),
3043 Quantity::from(1),
3044 UnixNanos::default(),
3045 );
3046
3047 let next_sec = UnixNanos::from(1_000_000_000);
3048 clock.borrow_mut().set_time(next_sec);
3049
3050 let event = TimeEvent::new(
3051 Ustr::from("1-SECOND-LAST"),
3052 UUID4::new(),
3053 next_sec,
3054 next_sec,
3055 );
3056 aggregator.build_bar(event);
3057
3058 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3059 assert_eq!(handler_guard.len(), 1);
3060 let bar = handler_guard.first().unwrap();
3061 assert_eq!(bar.ts_event, UnixNanos::default());
3062 assert_eq!(bar.ts_init, next_sec);
3063 }
3064
3065 #[rstest]
3066 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3067 let instrument = InstrumentAny::Equity(equity_aapl);
3068 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3069 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3070 let handler = Arc::new(Mutex::new(Vec::new()));
3071 let handler_clone = Arc::clone(&handler);
3072 let clock = Rc::new(RefCell::new(TestClock::new()));
3073
3074 let mut aggregator = TimeBarAggregator::new(
3075 bar_type,
3076 instrument.price_precision(),
3077 instrument.size_precision(),
3078 clock.clone(),
3079 move |bar: Bar| {
3080 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3081 handler_guard.push(bar);
3082 },
3083 true, true, BarIntervalType::LeftOpen,
3086 None,
3087 15,
3088 false, );
3090
3091 aggregator.update(
3093 Price::from("100.00"),
3094 Quantity::from(1),
3095 UnixNanos::default(),
3096 );
3097
3098 let ts1 = UnixNanos::from(1_000_000_000);
3100 clock.borrow_mut().set_time(ts1);
3101 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3102 aggregator.build_bar(event);
3103
3104 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3106
3107 let ts2 = UnixNanos::from(2_000_000_000);
3109 clock.borrow_mut().set_time(ts2);
3110 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3111 aggregator.build_bar(event);
3112
3113 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3114 assert_eq!(handler_guard.len(), 2);
3115
3116 let bar1 = &handler_guard[0];
3117 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
3119 assert_eq!(bar1.close, Price::from("100.00"));
3120 let bar2 = &handler_guard[1];
3121 assert_eq!(bar2.ts_event, ts2);
3122 assert_eq!(bar2.ts_init, ts2);
3123 assert_eq!(bar2.close, Price::from("101.00"));
3124 }
3125
3126 #[rstest]
3127 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3128 let instrument = InstrumentAny::Equity(equity_aapl);
3129 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3130 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3131 let handler = Arc::new(Mutex::new(Vec::new()));
3132 let handler_clone = Arc::clone(&handler);
3133 let clock = Rc::new(RefCell::new(TestClock::new()));
3134 let mut aggregator = TimeBarAggregator::new(
3135 bar_type,
3136 instrument.price_precision(),
3137 instrument.size_precision(),
3138 clock.clone(),
3139 move |bar: Bar| {
3140 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3141 handler_guard.push(bar);
3142 },
3143 true, true, BarIntervalType::RightOpen,
3146 None,
3147 15,
3148 false, );
3150
3151 aggregator.update(
3153 Price::from("100.00"),
3154 Quantity::from(1),
3155 UnixNanos::default(),
3156 );
3157
3158 let ts1 = UnixNanos::from(1_000_000_000);
3160 clock.borrow_mut().set_time(ts1);
3161 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3162 aggregator.build_bar(event);
3163
3164 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3166
3167 let ts2 = UnixNanos::from(2_000_000_000);
3169 clock.borrow_mut().set_time(ts2);
3170 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3171 aggregator.build_bar(event);
3172
3173 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3174 assert_eq!(handler_guard.len(), 2);
3175
3176 let bar1 = &handler_guard[0];
3177 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
3179 assert_eq!(bar1.close, Price::from("100.00"));
3180
3181 let bar2 = &handler_guard[1];
3182 assert_eq!(bar2.ts_event, ts1);
3183 assert_eq!(bar2.ts_init, ts2);
3184 assert_eq!(bar2.close, Price::from("101.00"));
3185 }
3186
3187 #[rstest]
3188 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3189 let instrument = InstrumentAny::Equity(equity_aapl);
3190 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3191 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3192 let handler = Arc::new(Mutex::new(Vec::new()));
3193 let handler_clone = Arc::clone(&handler);
3194 let clock = Rc::new(RefCell::new(TestClock::new()));
3195
3196 let mut aggregator = TimeBarAggregator::new(
3198 bar_type,
3199 instrument.price_precision(),
3200 instrument.size_precision(),
3201 clock.clone(),
3202 move |bar: Bar| {
3203 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3204 handler_guard.push(bar);
3205 },
3206 false, true, BarIntervalType::LeftOpen,
3209 None,
3210 15,
3211 false, );
3213
3214 let ts1 = UnixNanos::from(1_000_000_000);
3216 clock.borrow_mut().set_time(ts1);
3217 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3218 aggregator.build_bar(event);
3219
3220 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3221 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
3223
3224 let handler = Arc::new(Mutex::new(Vec::new()));
3226 let handler_clone = Arc::clone(&handler);
3227 let mut aggregator = TimeBarAggregator::new(
3228 bar_type,
3229 instrument.price_precision(),
3230 instrument.size_precision(),
3231 clock.clone(),
3232 move |bar: Bar| {
3233 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3234 handler_guard.push(bar);
3235 },
3236 true, true, BarIntervalType::LeftOpen,
3239 None,
3240 15,
3241 false, );
3243
3244 aggregator.update(
3245 Price::from("100.00"),
3246 Quantity::from(1),
3247 UnixNanos::default(),
3248 );
3249
3250 let ts1 = UnixNanos::from(1_000_000_000);
3252 clock.borrow_mut().set_time(ts1);
3253 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3254 aggregator.build_bar(event);
3255
3256 let ts2 = UnixNanos::from(2_000_000_000);
3258 clock.borrow_mut().set_time(ts2);
3259 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3260 aggregator.build_bar(event);
3261
3262 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3263 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
3265 assert_eq!(bar1.close, Price::from("100.00"));
3266 let bar2 = &handler_guard[1];
3267 assert_eq!(bar2.close, Price::from("100.00")); }
3269
3270 #[rstest]
3271 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3272 let instrument = InstrumentAny::Equity(equity_aapl);
3273 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3274 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3275 let clock = Rc::new(RefCell::new(TestClock::new()));
3276 let handler = Arc::new(Mutex::new(Vec::new()));
3277 let handler_clone = Arc::clone(&handler);
3278
3279 let mut aggregator = TimeBarAggregator::new(
3280 bar_type,
3281 instrument.price_precision(),
3282 instrument.size_precision(),
3283 clock.clone(),
3284 move |bar: Bar| {
3285 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3286 handler_guard.push(bar);
3287 },
3288 true, true, BarIntervalType::RightOpen,
3291 None,
3292 15,
3293 false, );
3295
3296 let ts1 = UnixNanos::from(1_000_000_000);
3297 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3298
3299 let ts2 = UnixNanos::from(2_000_000_000);
3300 clock.borrow_mut().set_time(ts2);
3301
3302 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3304 aggregator.build_bar(event);
3305
3306 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3307 let bar = handler_guard.first().unwrap();
3308 assert_eq!(bar.ts_event, UnixNanos::default());
3309 assert_eq!(bar.ts_init, ts2);
3310 }
3311
3312 #[rstest]
3313 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3314 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3315 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3317 let handler = Arc::new(Mutex::new(Vec::new()));
3318 let handler_clone = Arc::clone(&handler);
3319
3320 let aggregator = RenkoBarAggregator::new(
3321 bar_type,
3322 instrument.price_precision(),
3323 instrument.size_precision(),
3324 instrument.price_increment(),
3325 move |bar: Bar| {
3326 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3327 handler_guard.push(bar);
3328 },
3329 );
3330
3331 assert_eq!(aggregator.bar_type(), bar_type);
3332 assert!(!aggregator.is_running());
3333 let expected_brick_size = 10 * instrument.price_increment().raw;
3335 assert_eq!(aggregator.brick_size, expected_brick_size);
3336 }
3337
3338 #[rstest]
3339 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3340 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3341 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3343 let handler = Arc::new(Mutex::new(Vec::new()));
3344 let handler_clone = Arc::clone(&handler);
3345
3346 let mut aggregator = RenkoBarAggregator::new(
3347 bar_type,
3348 instrument.price_precision(),
3349 instrument.size_precision(),
3350 instrument.price_increment(),
3351 move |bar: Bar| {
3352 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3353 handler_guard.push(bar);
3354 },
3355 );
3356
3357 aggregator.update(
3359 Price::from("1.00000"),
3360 Quantity::from(1),
3361 UnixNanos::default(),
3362 );
3363 aggregator.update(
3364 Price::from("1.00005"),
3365 Quantity::from(1),
3366 UnixNanos::from(1000),
3367 );
3368
3369 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3370 assert_eq!(handler_guard.len(), 0); }
3372
3373 #[rstest]
3374 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3375 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3376 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3378 let handler = Arc::new(Mutex::new(Vec::new()));
3379 let handler_clone = Arc::clone(&handler);
3380
3381 let mut aggregator = RenkoBarAggregator::new(
3382 bar_type,
3383 instrument.price_precision(),
3384 instrument.size_precision(),
3385 instrument.price_increment(),
3386 move |bar: Bar| {
3387 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3388 handler_guard.push(bar);
3389 },
3390 );
3391
3392 aggregator.update(
3394 Price::from("1.00000"),
3395 Quantity::from(1),
3396 UnixNanos::default(),
3397 );
3398 aggregator.update(
3399 Price::from("1.00015"),
3400 Quantity::from(1),
3401 UnixNanos::from(1000),
3402 );
3403
3404 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3405 assert_eq!(handler_guard.len(), 1);
3406
3407 let bar = handler_guard.first().unwrap();
3408 assert_eq!(bar.open, Price::from("1.00000"));
3409 assert_eq!(bar.high, Price::from("1.00010"));
3410 assert_eq!(bar.low, Price::from("1.00000"));
3411 assert_eq!(bar.close, Price::from("1.00010"));
3412 assert_eq!(bar.volume, Quantity::from(2));
3413 assert_eq!(bar.ts_event, UnixNanos::from(1000));
3414 assert_eq!(bar.ts_init, UnixNanos::from(1000));
3415 }
3416
3417 #[rstest]
3418 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3419 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3420 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3422 let handler = Arc::new(Mutex::new(Vec::new()));
3423 let handler_clone = Arc::clone(&handler);
3424
3425 let mut aggregator = RenkoBarAggregator::new(
3426 bar_type,
3427 instrument.price_precision(),
3428 instrument.size_precision(),
3429 instrument.price_increment(),
3430 move |bar: Bar| {
3431 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3432 handler_guard.push(bar);
3433 },
3434 );
3435
3436 aggregator.update(
3438 Price::from("1.00000"),
3439 Quantity::from(1),
3440 UnixNanos::default(),
3441 );
3442 aggregator.update(
3443 Price::from("1.00025"),
3444 Quantity::from(1),
3445 UnixNanos::from(1000),
3446 );
3447
3448 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3449 assert_eq!(handler_guard.len(), 2);
3450
3451 let bar1 = &handler_guard[0];
3452 assert_eq!(bar1.open, Price::from("1.00000"));
3453 assert_eq!(bar1.high, Price::from("1.00010"));
3454 assert_eq!(bar1.low, Price::from("1.00000"));
3455 assert_eq!(bar1.close, Price::from("1.00010"));
3456
3457 let bar2 = &handler_guard[1];
3458 assert_eq!(bar2.open, Price::from("1.00010"));
3459 assert_eq!(bar2.high, Price::from("1.00020"));
3460 assert_eq!(bar2.low, Price::from("1.00010"));
3461 assert_eq!(bar2.close, Price::from("1.00020"));
3462 }
3463
3464 #[rstest]
3465 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3466 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3467 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3469 let handler = Arc::new(Mutex::new(Vec::new()));
3470 let handler_clone = Arc::clone(&handler);
3471
3472 let mut aggregator = RenkoBarAggregator::new(
3473 bar_type,
3474 instrument.price_precision(),
3475 instrument.size_precision(),
3476 instrument.price_increment(),
3477 move |bar: Bar| {
3478 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3479 handler_guard.push(bar);
3480 },
3481 );
3482
3483 aggregator.update(
3485 Price::from("1.00020"),
3486 Quantity::from(1),
3487 UnixNanos::default(),
3488 );
3489 aggregator.update(
3490 Price::from("1.00005"),
3491 Quantity::from(1),
3492 UnixNanos::from(1000),
3493 );
3494
3495 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3496 assert_eq!(handler_guard.len(), 1);
3497
3498 let bar = handler_guard.first().unwrap();
3499 assert_eq!(bar.open, Price::from("1.00020"));
3500 assert_eq!(bar.high, Price::from("1.00020"));
3501 assert_eq!(bar.low, Price::from("1.00010"));
3502 assert_eq!(bar.close, Price::from("1.00010"));
3503 assert_eq!(bar.volume, Quantity::from(2));
3504 }
3505
3506 #[rstest]
3507 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3508 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3509 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3511 let handler = Arc::new(Mutex::new(Vec::new()));
3512 let handler_clone = Arc::clone(&handler);
3513
3514 let mut aggregator = RenkoBarAggregator::new(
3515 bar_type,
3516 instrument.price_precision(),
3517 instrument.size_precision(),
3518 instrument.price_increment(),
3519 move |bar: Bar| {
3520 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3521 handler_guard.push(bar);
3522 },
3523 );
3524
3525 let input_bar = Bar::new(
3527 BarType::new(
3528 instrument.id(),
3529 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3530 AggregationSource::Internal,
3531 ),
3532 Price::from("1.00000"),
3533 Price::from("1.00005"),
3534 Price::from("0.99995"),
3535 Price::from("1.00005"), Quantity::from(100),
3537 UnixNanos::default(),
3538 UnixNanos::from(1000),
3539 );
3540
3541 aggregator.handle_bar(input_bar);
3542
3543 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3544 assert_eq!(handler_guard.len(), 0); }
3546
3547 #[rstest]
3548 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3549 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3550 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3552 let handler = Arc::new(Mutex::new(Vec::new()));
3553 let handler_clone = Arc::clone(&handler);
3554
3555 let mut aggregator = RenkoBarAggregator::new(
3556 bar_type,
3557 instrument.price_precision(),
3558 instrument.size_precision(),
3559 instrument.price_increment(),
3560 move |bar: Bar| {
3561 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3562 handler_guard.push(bar);
3563 },
3564 );
3565
3566 let bar1 = Bar::new(
3568 BarType::new(
3569 instrument.id(),
3570 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3571 AggregationSource::Internal,
3572 ),
3573 Price::from("1.00000"),
3574 Price::from("1.00005"),
3575 Price::from("0.99995"),
3576 Price::from("1.00000"),
3577 Quantity::from(100),
3578 UnixNanos::default(),
3579 UnixNanos::default(),
3580 );
3581
3582 let bar2 = Bar::new(
3584 BarType::new(
3585 instrument.id(),
3586 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3587 AggregationSource::Internal,
3588 ),
3589 Price::from("1.00000"),
3590 Price::from("1.00015"),
3591 Price::from("0.99995"),
3592 Price::from("1.00010"), Quantity::from(50),
3594 UnixNanos::from(60_000_000_000),
3595 UnixNanos::from(60_000_000_000),
3596 );
3597
3598 aggregator.handle_bar(bar1);
3599 aggregator.handle_bar(bar2);
3600
3601 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3602 assert_eq!(handler_guard.len(), 1);
3603
3604 let bar = handler_guard.first().unwrap();
3605 assert_eq!(bar.open, Price::from("1.00000"));
3606 assert_eq!(bar.high, Price::from("1.00010"));
3607 assert_eq!(bar.low, Price::from("1.00000"));
3608 assert_eq!(bar.close, Price::from("1.00010"));
3609 assert_eq!(bar.volume, Quantity::from(150));
3610 }
3611
3612 #[rstest]
3613 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3614 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3615 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3617 let handler = Arc::new(Mutex::new(Vec::new()));
3618 let handler_clone = Arc::clone(&handler);
3619
3620 let mut aggregator = RenkoBarAggregator::new(
3621 bar_type,
3622 instrument.price_precision(),
3623 instrument.size_precision(),
3624 instrument.price_increment(),
3625 move |bar: Bar| {
3626 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3627 handler_guard.push(bar);
3628 },
3629 );
3630
3631 let bar1 = Bar::new(
3633 BarType::new(
3634 instrument.id(),
3635 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3636 AggregationSource::Internal,
3637 ),
3638 Price::from("1.00000"),
3639 Price::from("1.00005"),
3640 Price::from("0.99995"),
3641 Price::from("1.00000"),
3642 Quantity::from(100),
3643 UnixNanos::default(),
3644 UnixNanos::default(),
3645 );
3646
3647 let bar2 = Bar::new(
3649 BarType::new(
3650 instrument.id(),
3651 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3652 AggregationSource::Internal,
3653 ),
3654 Price::from("1.00000"),
3655 Price::from("1.00035"),
3656 Price::from("0.99995"),
3657 Price::from("1.00030"), Quantity::from(50),
3659 UnixNanos::from(60_000_000_000),
3660 UnixNanos::from(60_000_000_000),
3661 );
3662
3663 aggregator.handle_bar(bar1);
3664 aggregator.handle_bar(bar2);
3665
3666 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3667 assert_eq!(handler_guard.len(), 3);
3668
3669 let bar1 = &handler_guard[0];
3670 assert_eq!(bar1.open, Price::from("1.00000"));
3671 assert_eq!(bar1.close, Price::from("1.00010"));
3672
3673 let bar2 = &handler_guard[1];
3674 assert_eq!(bar2.open, Price::from("1.00010"));
3675 assert_eq!(bar2.close, Price::from("1.00020"));
3676
3677 let bar3 = &handler_guard[2];
3678 assert_eq!(bar3.open, Price::from("1.00020"));
3679 assert_eq!(bar3.close, Price::from("1.00030"));
3680 }
3681
3682 #[rstest]
3683 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3684 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3685 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3687 let handler = Arc::new(Mutex::new(Vec::new()));
3688 let handler_clone = Arc::clone(&handler);
3689
3690 let mut aggregator = RenkoBarAggregator::new(
3691 bar_type,
3692 instrument.price_precision(),
3693 instrument.size_precision(),
3694 instrument.price_increment(),
3695 move |bar: Bar| {
3696 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3697 handler_guard.push(bar);
3698 },
3699 );
3700
3701 let bar1 = Bar::new(
3703 BarType::new(
3704 instrument.id(),
3705 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3706 AggregationSource::Internal,
3707 ),
3708 Price::from("1.00020"),
3709 Price::from("1.00025"),
3710 Price::from("1.00015"),
3711 Price::from("1.00020"),
3712 Quantity::from(100),
3713 UnixNanos::default(),
3714 UnixNanos::default(),
3715 );
3716
3717 let bar2 = Bar::new(
3719 BarType::new(
3720 instrument.id(),
3721 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3722 AggregationSource::Internal,
3723 ),
3724 Price::from("1.00020"),
3725 Price::from("1.00025"),
3726 Price::from("1.00005"),
3727 Price::from("1.00010"), Quantity::from(50),
3729 UnixNanos::from(60_000_000_000),
3730 UnixNanos::from(60_000_000_000),
3731 );
3732
3733 aggregator.handle_bar(bar1);
3734 aggregator.handle_bar(bar2);
3735
3736 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3737 assert_eq!(handler_guard.len(), 1);
3738
3739 let bar = handler_guard.first().unwrap();
3740 assert_eq!(bar.open, Price::from("1.00020"));
3741 assert_eq!(bar.high, Price::from("1.00020"));
3742 assert_eq!(bar.low, Price::from("1.00010"));
3743 assert_eq!(bar.close, Price::from("1.00010"));
3744 assert_eq!(bar.volume, Quantity::from(150));
3745 }
3746
3747 #[rstest]
3748 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3749 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3750
3751 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3754 let handler = Arc::new(Mutex::new(Vec::new()));
3755 let handler_clone = Arc::clone(&handler);
3756
3757 let aggregator_5 = RenkoBarAggregator::new(
3758 bar_type_5,
3759 instrument.price_precision(),
3760 instrument.size_precision(),
3761 instrument.price_increment(),
3762 move |_bar: Bar| {
3763 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3764 handler_guard.push(_bar);
3765 },
3766 );
3767
3768 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3770 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3771
3772 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3774 let handler2 = Arc::new(Mutex::new(Vec::new()));
3775 let handler2_clone = Arc::clone(&handler2);
3776
3777 let aggregator_20 = RenkoBarAggregator::new(
3778 bar_type_20,
3779 instrument.price_precision(),
3780 instrument.size_precision(),
3781 instrument.price_increment(),
3782 move |_bar: Bar| {
3783 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3784 handler_guard.push(_bar);
3785 },
3786 );
3787
3788 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3790 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3791 }
3792
3793 #[rstest]
3794 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3795 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3796 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3798 let handler = Arc::new(Mutex::new(Vec::new()));
3799 let handler_clone = Arc::clone(&handler);
3800
3801 let mut aggregator = RenkoBarAggregator::new(
3802 bar_type,
3803 instrument.price_precision(),
3804 instrument.size_precision(),
3805 instrument.price_increment(),
3806 move |bar: Bar| {
3807 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3808 handler_guard.push(bar);
3809 },
3810 );
3811
3812 aggregator.update(
3814 Price::from("1.00000"),
3815 Quantity::from(1),
3816 UnixNanos::from(1000),
3817 );
3818 aggregator.update(
3819 Price::from("1.00010"),
3820 Quantity::from(1),
3821 UnixNanos::from(2000),
3822 ); aggregator.update(
3824 Price::from("1.00020"),
3825 Quantity::from(1),
3826 UnixNanos::from(3000),
3827 ); aggregator.update(
3829 Price::from("1.00025"),
3830 Quantity::from(1),
3831 UnixNanos::from(4000),
3832 ); aggregator.update(
3834 Price::from("1.00030"),
3835 Quantity::from(1),
3836 UnixNanos::from(5000),
3837 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3840 assert_eq!(handler_guard.len(), 3);
3841
3842 let bar1 = &handler_guard[0];
3843 assert_eq!(bar1.open, Price::from("1.00000"));
3844 assert_eq!(bar1.close, Price::from("1.00010"));
3845
3846 let bar2 = &handler_guard[1];
3847 assert_eq!(bar2.open, Price::from("1.00010"));
3848 assert_eq!(bar2.close, Price::from("1.00020"));
3849
3850 let bar3 = &handler_guard[2];
3851 assert_eq!(bar3.open, Price::from("1.00020"));
3852 assert_eq!(bar3.close, Price::from("1.00030"));
3853 }
3854
3855 #[rstest]
3856 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3857 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3858 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3860 let handler = Arc::new(Mutex::new(Vec::new()));
3861 let handler_clone = Arc::clone(&handler);
3862
3863 let mut aggregator = RenkoBarAggregator::new(
3864 bar_type,
3865 instrument.price_precision(),
3866 instrument.size_precision(),
3867 instrument.price_increment(),
3868 move |bar: Bar| {
3869 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3870 handler_guard.push(bar);
3871 },
3872 );
3873
3874 aggregator.update(
3876 Price::from("1.00000"),
3877 Quantity::from(1),
3878 UnixNanos::from(1000),
3879 );
3880 aggregator.update(
3881 Price::from("1.00010"),
3882 Quantity::from(1),
3883 UnixNanos::from(2000),
3884 ); aggregator.update(
3886 Price::from("0.99990"),
3887 Quantity::from(1),
3888 UnixNanos::from(3000),
3889 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3892 assert_eq!(handler_guard.len(), 3);
3893
3894 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
3896 assert_eq!(bar1.high, Price::from("1.00010"));
3897 assert_eq!(bar1.low, Price::from("1.00000"));
3898 assert_eq!(bar1.close, Price::from("1.00010"));
3899
3900 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
3902 assert_eq!(bar2.high, Price::from("1.00010"));
3903 assert_eq!(bar2.low, Price::from("1.00000"));
3904 assert_eq!(bar2.close, Price::from("1.00000"));
3905
3906 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
3908 assert_eq!(bar3.high, Price::from("1.00000"));
3909 assert_eq!(bar3.low, Price::from("0.99990"));
3910 assert_eq!(bar3.close, Price::from("0.99990"));
3911 }
3912
3913 #[rstest]
3914 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3915 let instrument = InstrumentAny::Equity(equity_aapl);
3916 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3917 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3918 let handler = Arc::new(Mutex::new(Vec::new()));
3919 let handler_clone = Arc::clone(&handler);
3920
3921 let mut aggregator = TickImbalanceBarAggregator::new(
3922 bar_type,
3923 instrument.price_precision(),
3924 instrument.size_precision(),
3925 move |bar: Bar| {
3926 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3927 handler_guard.push(bar);
3928 },
3929 );
3930
3931 let buy = TradeTick {
3932 aggressor_side: AggressorSide::Buyer,
3933 ..TradeTick::default()
3934 };
3935 let sell = TradeTick {
3936 aggressor_side: AggressorSide::Seller,
3937 ..TradeTick::default()
3938 };
3939
3940 aggregator.handle_trade(buy);
3941 aggregator.handle_trade(sell);
3942 aggregator.handle_trade(buy);
3943
3944 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3945 assert_eq!(handler_guard.len(), 0);
3946 }
3947
3948 #[rstest]
3949 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3950 let instrument = InstrumentAny::Equity(equity_aapl);
3951 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3952 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3953 let handler = Arc::new(Mutex::new(Vec::new()));
3954 let handler_clone = Arc::clone(&handler);
3955
3956 let mut aggregator = TickImbalanceBarAggregator::new(
3957 bar_type,
3958 instrument.price_precision(),
3959 instrument.size_precision(),
3960 move |bar: Bar| {
3961 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3962 handler_guard.push(bar);
3963 },
3964 );
3965
3966 let buy = TradeTick {
3967 aggressor_side: AggressorSide::Buyer,
3968 ..TradeTick::default()
3969 };
3970 let no_aggressor = TradeTick {
3971 aggressor_side: AggressorSide::NoAggressor,
3972 ..TradeTick::default()
3973 };
3974
3975 aggregator.handle_trade(buy);
3976 aggregator.handle_trade(no_aggressor);
3977 aggregator.handle_trade(buy);
3978
3979 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3980 assert_eq!(handler_guard.len(), 1);
3981 }
3982
3983 #[rstest]
3984 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3985 let instrument = InstrumentAny::Equity(equity_aapl);
3986 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3987 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3988 let handler = Arc::new(Mutex::new(Vec::new()));
3989 let handler_clone = Arc::clone(&handler);
3990
3991 let mut aggregator = TickRunsBarAggregator::new(
3992 bar_type,
3993 instrument.price_precision(),
3994 instrument.size_precision(),
3995 move |bar: Bar| {
3996 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3997 handler_guard.push(bar);
3998 },
3999 );
4000
4001 let buy = TradeTick {
4002 aggressor_side: AggressorSide::Buyer,
4003 ..TradeTick::default()
4004 };
4005 let sell = TradeTick {
4006 aggressor_side: AggressorSide::Seller,
4007 ..TradeTick::default()
4008 };
4009
4010 aggregator.handle_trade(buy);
4011 aggregator.handle_trade(buy);
4012 aggregator.handle_trade(sell);
4013 aggregator.handle_trade(sell);
4014
4015 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4016 assert_eq!(handler_guard.len(), 2);
4017 }
4018
4019 #[rstest]
4020 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4021 let instrument = InstrumentAny::Equity(equity_aapl);
4022 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4023 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4024 let handler = Arc::new(Mutex::new(Vec::new()));
4025 let handler_clone = Arc::clone(&handler);
4026
4027 let mut aggregator = VolumeImbalanceBarAggregator::new(
4028 bar_type,
4029 instrument.price_precision(),
4030 instrument.size_precision(),
4031 move |bar: Bar| {
4032 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4033 handler_guard.push(bar);
4034 },
4035 );
4036
4037 let large_trade = TradeTick {
4038 size: Quantity::from(25),
4039 aggressor_side: AggressorSide::Buyer,
4040 ..TradeTick::default()
4041 };
4042
4043 aggregator.handle_trade(large_trade);
4044
4045 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4046 assert_eq!(handler_guard.len(), 2);
4047 }
4048
4049 #[rstest]
4050 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4051 equity_aapl: Equity,
4052 ) {
4053 let instrument = InstrumentAny::Equity(equity_aapl);
4054 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4055 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4056 let handler = Arc::new(Mutex::new(Vec::new()));
4057 let handler_clone = Arc::clone(&handler);
4058
4059 let mut aggregator = VolumeImbalanceBarAggregator::new(
4060 bar_type,
4061 instrument.price_precision(),
4062 instrument.size_precision(),
4063 move |bar: Bar| {
4064 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4065 handler_guard.push(bar);
4066 },
4067 );
4068
4069 let buy = TradeTick {
4070 size: Quantity::from(5),
4071 aggressor_side: AggressorSide::Buyer,
4072 ..TradeTick::default()
4073 };
4074 let no_aggressor = TradeTick {
4075 size: Quantity::from(3),
4076 aggressor_side: AggressorSide::NoAggressor,
4077 ..TradeTick::default()
4078 };
4079
4080 aggregator.handle_trade(buy);
4081 aggregator.handle_trade(no_aggressor);
4082 aggregator.handle_trade(buy);
4083
4084 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4085 assert_eq!(handler_guard.len(), 1);
4086 }
4087
4088 #[rstest]
4089 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4090 let instrument = InstrumentAny::Equity(equity_aapl);
4091 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4092 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4093 let handler = Arc::new(Mutex::new(Vec::new()));
4094 let handler_clone = Arc::clone(&handler);
4095
4096 let mut aggregator = VolumeRunsBarAggregator::new(
4097 bar_type,
4098 instrument.price_precision(),
4099 instrument.size_precision(),
4100 move |bar: Bar| {
4101 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4102 handler_guard.push(bar);
4103 },
4104 );
4105
4106 let large_trade = TradeTick {
4107 size: Quantity::from(25),
4108 aggressor_side: AggressorSide::Buyer,
4109 ..TradeTick::default()
4110 };
4111
4112 aggregator.handle_trade(large_trade);
4113
4114 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4115 assert_eq!(handler_guard.len(), 2);
4116 }
4117
4118 #[rstest]
4119 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4120 let instrument = InstrumentAny::Equity(equity_aapl);
4121 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4122 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4123 let handler = Arc::new(Mutex::new(Vec::new()));
4124 let handler_clone = Arc::clone(&handler);
4125
4126 let mut aggregator = ValueRunsBarAggregator::new(
4127 bar_type,
4128 instrument.price_precision(),
4129 instrument.size_precision(),
4130 move |bar: Bar| {
4131 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4132 handler_guard.push(bar);
4133 },
4134 );
4135
4136 let large_trade = TradeTick {
4137 price: Price::from("5.00"),
4138 size: Quantity::from(25),
4139 aggressor_side: AggressorSide::Buyer,
4140 ..TradeTick::default()
4141 };
4142
4143 aggregator.handle_trade(large_trade);
4144
4145 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4146 assert_eq!(handler_guard.len(), 2);
4147 }
4148
4149 #[rstest]
4150 fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4151 let instrument = InstrumentAny::Equity(equity_aapl);
4152 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
4153 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4154 let handler = Arc::new(Mutex::new(Vec::new()));
4155 let handler_clone = Arc::clone(&handler);
4156
4157 let mut aggregator = ValueBarAggregator::new(
4158 bar_type,
4159 instrument.price_precision(),
4160 instrument.size_precision(),
4161 move |bar: Bar| {
4162 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4163 handler_guard.push(bar);
4164 },
4165 );
4166
4167 aggregator.update(
4169 Price::from("1000.00"),
4170 Quantity::from(3),
4171 UnixNanos::default(),
4172 );
4173
4174 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4176 assert_eq!(handler_guard.len(), 3);
4177 for bar in handler_guard.iter() {
4178 assert_eq!(bar.volume, Quantity::from(1));
4179 }
4180 }
4181
4182 #[rstest]
4183 fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4184 let instrument = InstrumentAny::Equity(equity_aapl);
4185 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4186 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4187 let handler = Arc::new(Mutex::new(Vec::new()));
4188 let handler_clone = Arc::clone(&handler);
4189
4190 let mut aggregator = ValueImbalanceBarAggregator::new(
4191 bar_type,
4192 instrument.price_precision(),
4193 instrument.size_precision(),
4194 move |bar: Bar| {
4195 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4196 handler_guard.push(bar);
4197 },
4198 );
4199
4200 let trade = TradeTick {
4201 price: Price::from("1000.00"),
4202 size: Quantity::from(3),
4203 aggressor_side: AggressorSide::Buyer,
4204 instrument_id: instrument.id(),
4205 ..TradeTick::default()
4206 };
4207
4208 aggregator.handle_trade(trade);
4209
4210 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4211 assert_eq!(handler_guard.len(), 3);
4212 for bar in handler_guard.iter() {
4213 assert_eq!(bar.volume, Quantity::from(1));
4214 }
4215 }
4216
4217 #[rstest]
4218 fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
4219 let instrument = InstrumentAny::Equity(equity_aapl);
4220 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4221 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4222 let handler = Arc::new(Mutex::new(Vec::new()));
4223 let handler_clone = Arc::clone(&handler);
4224
4225 let mut aggregator = ValueImbalanceBarAggregator::new(
4226 bar_type,
4227 instrument.price_precision(),
4228 instrument.size_precision(),
4229 move |bar: Bar| {
4230 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4231 handler_guard.push(bar);
4232 },
4233 );
4234
4235 let sell_tick = TradeTick {
4237 price: Price::from("10.00"),
4238 size: Quantity::from(5),
4239 aggressor_side: AggressorSide::Seller,
4240 instrument_id: instrument.id(),
4241 ..TradeTick::default()
4242 };
4243
4244 let buy_tick = TradeTick {
4247 price: Price::from("1000.00"),
4248 size: Quantity::from(1),
4249 aggressor_side: AggressorSide::Buyer,
4250 instrument_id: instrument.id(),
4251 ts_init: UnixNanos::from(1),
4252 ts_event: UnixNanos::from(1),
4253 ..TradeTick::default()
4254 };
4255
4256 aggregator.handle_trade(sell_tick);
4257 aggregator.handle_trade(buy_tick);
4258
4259 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4260 assert_eq!(handler_guard.len(), 1);
4261 assert_eq!(handler_guard[0].volume, Quantity::from(6));
4262 }
4263
4264 #[rstest]
4265 fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4266 let instrument = InstrumentAny::Equity(equity_aapl);
4267 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4268 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4269 let handler = Arc::new(Mutex::new(Vec::new()));
4270 let handler_clone = Arc::clone(&handler);
4271
4272 let mut aggregator = ValueRunsBarAggregator::new(
4273 bar_type,
4274 instrument.price_precision(),
4275 instrument.size_precision(),
4276 move |bar: Bar| {
4277 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4278 handler_guard.push(bar);
4279 },
4280 );
4281
4282 let trade = TradeTick {
4283 price: Price::from("1000.00"),
4284 size: Quantity::from(3),
4285 aggressor_side: AggressorSide::Buyer,
4286 instrument_id: instrument.id(),
4287 ..TradeTick::default()
4288 };
4289
4290 aggregator.handle_trade(trade);
4291
4292 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4293 assert_eq!(handler_guard.len(), 3);
4294 for bar in handler_guard.iter() {
4295 assert_eq!(bar.volume, Quantity::from(1));
4296 }
4297 }
4298
4299 #[rstest]
4300 #[case(1000_u64)]
4301 #[case(1500_u64)]
4302 fn test_volume_imbalance_bar_aggregator_large_step_no_overflow(
4303 equity_aapl: Equity,
4304 #[case] step: u64,
4305 ) {
4306 let instrument = InstrumentAny::Equity(equity_aapl);
4307 let bar_spec = BarSpecification::new(
4308 step as usize,
4309 BarAggregation::VolumeImbalance,
4310 PriceType::Last,
4311 );
4312 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4313 let handler = Arc::new(Mutex::new(Vec::new()));
4314 let handler_clone = Arc::clone(&handler);
4315
4316 let mut aggregator = VolumeImbalanceBarAggregator::new(
4317 bar_type,
4318 instrument.price_precision(),
4319 instrument.size_precision(),
4320 move |bar: Bar| {
4321 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4322 handler_guard.push(bar);
4323 },
4324 );
4325
4326 let trade = TradeTick {
4327 size: Quantity::from(step * 2),
4328 aggressor_side: AggressorSide::Buyer,
4329 ..TradeTick::default()
4330 };
4331
4332 aggregator.handle_trade(trade);
4333
4334 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4335 assert_eq!(handler_guard.len(), 2);
4336 for bar in handler_guard.iter() {
4337 assert_eq!(bar.volume.as_f64(), step as f64);
4338 }
4339 }
4340
4341 #[rstest]
4342 fn test_volume_imbalance_bar_aggregator_different_large_steps_produce_different_bar_counts(
4343 equity_aapl: Equity,
4344 ) {
4345 let instrument = InstrumentAny::Equity(equity_aapl);
4346 let total_volume = 3000_u64;
4347 let mut results = Vec::new();
4348
4349 for step in [1000_usize, 1500] {
4350 let bar_spec =
4351 BarSpecification::new(step, BarAggregation::VolumeImbalance, PriceType::Last);
4352 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4353 let handler = Arc::new(Mutex::new(Vec::new()));
4354 let handler_clone = Arc::clone(&handler);
4355
4356 let mut aggregator = VolumeImbalanceBarAggregator::new(
4357 bar_type,
4358 instrument.price_precision(),
4359 instrument.size_precision(),
4360 move |bar: Bar| {
4361 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4362 handler_guard.push(bar);
4363 },
4364 );
4365
4366 let trade = TradeTick {
4367 size: Quantity::from(total_volume),
4368 aggressor_side: AggressorSide::Buyer,
4369 ..TradeTick::default()
4370 };
4371
4372 aggregator.handle_trade(trade);
4373
4374 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4375 results.push(handler_guard.len());
4376 }
4377
4378 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
4381 }
4382
4383 #[rstest]
4384 #[case(1000_u64)]
4385 #[case(1500_u64)]
4386 fn test_volume_runs_bar_aggregator_large_step_no_overflow(
4387 equity_aapl: Equity,
4388 #[case] step: u64,
4389 ) {
4390 let instrument = InstrumentAny::Equity(equity_aapl);
4391 let bar_spec =
4392 BarSpecification::new(step as usize, BarAggregation::VolumeRuns, PriceType::Last);
4393 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4394 let handler = Arc::new(Mutex::new(Vec::new()));
4395 let handler_clone = Arc::clone(&handler);
4396
4397 let mut aggregator = VolumeRunsBarAggregator::new(
4398 bar_type,
4399 instrument.price_precision(),
4400 instrument.size_precision(),
4401 move |bar: Bar| {
4402 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4403 handler_guard.push(bar);
4404 },
4405 );
4406
4407 let trade = TradeTick {
4408 size: Quantity::from(step * 2),
4409 aggressor_side: AggressorSide::Buyer,
4410 ..TradeTick::default()
4411 };
4412
4413 aggregator.handle_trade(trade);
4414
4415 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4416 assert_eq!(handler_guard.len(), 2);
4417 for bar in handler_guard.iter() {
4418 assert_eq!(bar.volume.as_f64(), step as f64);
4419 }
4420 }
4421
4422 #[rstest]
4423 fn test_volume_runs_bar_aggregator_different_large_steps_produce_different_bar_counts(
4424 equity_aapl: Equity,
4425 ) {
4426 let instrument = InstrumentAny::Equity(equity_aapl);
4427 let total_volume = 3000_u64;
4428 let mut results = Vec::new();
4429
4430 for step in [1000_usize, 1500] {
4431 let bar_spec = BarSpecification::new(step, BarAggregation::VolumeRuns, PriceType::Last);
4432 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4433 let handler = Arc::new(Mutex::new(Vec::new()));
4434 let handler_clone = Arc::clone(&handler);
4435
4436 let mut aggregator = VolumeRunsBarAggregator::new(
4437 bar_type,
4438 instrument.price_precision(),
4439 instrument.size_precision(),
4440 move |bar: Bar| {
4441 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4442 handler_guard.push(bar);
4443 },
4444 );
4445
4446 let trade = TradeTick {
4447 size: Quantity::from(total_volume),
4448 aggressor_side: AggressorSide::Buyer,
4449 ..TradeTick::default()
4450 };
4451
4452 aggregator.handle_trade(trade);
4453
4454 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4455 results.push(handler_guard.len());
4456 }
4457
4458 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
4461 }
4462}