1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31 clock::{Clock, TestClock},
32 timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35 UnixNanos,
36 correctness::{self, FAILED},
37 datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40 data::{
41 QuoteTick, TradeTick,
42 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43 },
44 enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
48type BarHandler = Box<dyn FnMut(Bar)>;
50
51pub trait BarAggregator: Any + Debug {
55 fn bar_type(&self) -> BarType;
57 fn is_running(&self) -> bool;
59 fn set_is_running(&mut self, value: bool);
61 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63 fn handle_quote(&mut self, quote: QuoteTick) {
65 let spec = self.bar_type().spec();
66 self.update(
67 quote.extract_price(spec.price_type),
68 quote.extract_size(spec.price_type),
69 quote.ts_init,
70 );
71 }
72 fn handle_trade(&mut self, trade: TradeTick) {
74 self.update(trade.price, trade.size, trade.ts_init);
75 }
76 fn handle_bar(&mut self, bar: Bar) {
78 self.update_bar(bar, bar.volume, bar.ts_init);
79 }
80 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81 fn stop(&mut self) {}
83 fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85 fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87 fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89 fn build_bar(&mut self, _event: TimeEvent) {}
91 fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95 fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101 pub fn as_any(&self) -> &dyn Any {
103 self
104 }
105 pub fn as_any_mut(&mut self) -> &mut dyn Any {
107 self
108 }
109}
110
111#[derive(Debug)]
113pub struct BarBuilder {
114 bar_type: BarType,
115 price_precision: u8,
116 size_precision: u8,
117 initialized: bool,
118 ts_last: UnixNanos,
119 count: usize,
120 last_close: Option<Price>,
121 open: Option<Price>,
122 high: Option<Price>,
123 low: Option<Price>,
124 close: Option<Price>,
125 volume: Quantity,
126}
127
128impl BarBuilder {
129 #[must_use]
135 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
136 correctness::check_equal(
137 &bar_type.aggregation_source(),
138 &AggregationSource::Internal,
139 "bar_type.aggregation_source",
140 "AggregationSource::Internal",
141 )
142 .expect(FAILED);
143
144 Self {
145 bar_type,
146 price_precision,
147 size_precision,
148 initialized: false,
149 ts_last: UnixNanos::default(),
150 count: 0,
151 last_close: None,
152 open: None,
153 high: None,
154 low: None,
155 close: None,
156 volume: Quantity::zero(size_precision),
157 }
158 }
159
160 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
166 if ts_init < self.ts_last {
167 return; }
169
170 if self.open.is_none() {
171 self.open = Some(price);
172 self.high = Some(price);
173 self.low = Some(price);
174 self.initialized = true;
175 } else {
176 if price > self.high.unwrap() {
177 self.high = Some(price);
178 }
179 if price < self.low.unwrap() {
180 self.low = Some(price);
181 }
182 }
183
184 self.close = Some(price);
185 self.volume = self.volume.add(size);
186 self.count += 1;
187 self.ts_last = ts_init;
188 }
189
190 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
196 if ts_init < self.ts_last {
197 return; }
199
200 if self.open.is_none() {
201 self.open = Some(bar.open);
202 self.high = Some(bar.high);
203 self.low = Some(bar.low);
204 self.initialized = true;
205 } else {
206 if bar.high > self.high.unwrap() {
207 self.high = Some(bar.high);
208 }
209 if bar.low < self.low.unwrap() {
210 self.low = Some(bar.low);
211 }
212 }
213
214 self.close = Some(bar.close);
215 self.volume = self.volume.add(volume);
216 self.count += 1;
217 self.ts_last = ts_init;
218 }
219
220 pub fn reset(&mut self) {
224 self.open = None;
225 self.high = None;
226 self.low = None;
227 self.volume = Quantity::zero(self.size_precision);
228 self.count = 0;
229 }
230
231 pub fn build_now(&mut self) -> Bar {
233 self.build(self.ts_last, self.ts_last)
234 }
235
236 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
242 if self.open.is_none() {
243 self.open = self.last_close;
244 self.high = self.last_close;
245 self.low = self.last_close;
246 self.close = self.last_close;
247 }
248
249 if let (Some(close), Some(low)) = (self.close, self.low)
250 && close < low
251 {
252 self.low = Some(close);
253 }
254
255 if let (Some(close), Some(high)) = (self.close, self.high)
256 && close > high
257 {
258 self.high = Some(close);
259 }
260
261 let bar = Bar::new(
263 self.bar_type,
264 self.open.unwrap(),
265 self.high.unwrap(),
266 self.low.unwrap(),
267 self.close.unwrap(),
268 self.volume,
269 ts_event,
270 ts_init,
271 );
272
273 self.last_close = self.close;
274 self.reset();
275 bar
276 }
277}
278
279pub struct BarAggregatorCore {
281 bar_type: BarType,
282 builder: BarBuilder,
283 handler: BarHandler,
284 is_running: bool,
285}
286
287impl Debug for BarAggregatorCore {
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 f.debug_struct(stringify!(BarAggregatorCore))
290 .field("bar_type", &self.bar_type)
291 .field("builder", &self.builder)
292 .field("is_running", &self.is_running)
293 .finish()
294 }
295}
296
297impl BarAggregatorCore {
298 pub fn new<H: FnMut(Bar) + 'static>(
304 bar_type: BarType,
305 price_precision: u8,
306 size_precision: u8,
307 handler: H,
308 ) -> Self {
309 Self {
310 bar_type,
311 builder: BarBuilder::new(bar_type, price_precision, size_precision),
312 handler: Box::new(handler),
313 is_running: false,
314 }
315 }
316
317 pub const fn set_is_running(&mut self, value: bool) {
319 self.is_running = value;
320 }
321 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
322 self.builder.update(price, size, ts_init);
323 }
324
325 fn build_now_and_send(&mut self) {
326 let bar = self.builder.build_now();
327 (self.handler)(bar);
328 }
329
330 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
331 let bar = self.builder.build(ts_event, ts_init);
332 (self.handler)(bar);
333 }
334}
335
336pub struct TickBarAggregator {
341 core: BarAggregatorCore,
342}
343
344impl Debug for TickBarAggregator {
345 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346 f.debug_struct(stringify!(TickBarAggregator))
347 .field("core", &self.core)
348 .finish()
349 }
350}
351
352impl TickBarAggregator {
353 pub fn new<H: FnMut(Bar) + 'static>(
359 bar_type: BarType,
360 price_precision: u8,
361 size_precision: u8,
362 handler: H,
363 ) -> Self {
364 Self {
365 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
366 }
367 }
368}
369
370impl BarAggregator for TickBarAggregator {
371 fn bar_type(&self) -> BarType {
372 self.core.bar_type
373 }
374
375 fn is_running(&self) -> bool {
376 self.core.is_running
377 }
378
379 fn set_is_running(&mut self, value: bool) {
380 self.core.set_is_running(value);
381 }
382
383 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
385 self.core.apply_update(price, size, ts_init);
386 let spec = self.core.bar_type.spec();
387
388 if self.core.builder.count >= spec.step.get() {
389 self.core.build_now_and_send();
390 }
391 }
392
393 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
394 self.core.builder.update_bar(bar, volume, ts_init);
395 let spec = self.core.bar_type.spec();
396
397 if self.core.builder.count >= spec.step.get() {
398 self.core.build_now_and_send();
399 }
400 }
401}
402
403pub struct TickImbalanceBarAggregator {
408 core: BarAggregatorCore,
409 imbalance: isize,
410}
411
412impl Debug for TickImbalanceBarAggregator {
413 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
414 f.debug_struct(stringify!(TickImbalanceBarAggregator))
415 .field("core", &self.core)
416 .field("imbalance", &self.imbalance)
417 .finish()
418 }
419}
420
421impl TickImbalanceBarAggregator {
422 pub fn new<H: FnMut(Bar) + 'static>(
428 bar_type: BarType,
429 price_precision: u8,
430 size_precision: u8,
431 handler: H,
432 ) -> Self {
433 Self {
434 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
435 imbalance: 0,
436 }
437 }
438}
439
440impl BarAggregator for TickImbalanceBarAggregator {
441 fn bar_type(&self) -> BarType {
442 self.core.bar_type
443 }
444
445 fn is_running(&self) -> bool {
446 self.core.is_running
447 }
448
449 fn set_is_running(&mut self, value: bool) {
450 self.core.set_is_running(value);
451 }
452
453 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
458 self.core.apply_update(price, size, ts_init);
459 }
460
461 fn handle_trade(&mut self, trade: TradeTick) {
462 self.core
463 .apply_update(trade.price, trade.size, trade.ts_init);
464
465 let delta = match trade.aggressor_side {
466 AggressorSide::Buyer => 1,
467 AggressorSide::Seller => -1,
468 AggressorSide::NoAggressor => 0,
469 };
470
471 if delta == 0 {
472 return;
473 }
474
475 self.imbalance += delta;
476 let threshold = self.core.bar_type.spec().step.get();
477 if self.imbalance.unsigned_abs() >= threshold {
478 self.core.build_now_and_send();
479 self.imbalance = 0;
480 }
481 }
482
483 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
484 self.core.builder.update_bar(bar, volume, ts_init);
485 }
486}
487
488pub struct TickRunsBarAggregator {
490 core: BarAggregatorCore,
491 current_run_side: Option<AggressorSide>,
492 run_count: usize,
493}
494
495impl Debug for TickRunsBarAggregator {
496 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497 f.debug_struct(stringify!(TickRunsBarAggregator))
498 .field("core", &self.core)
499 .field("current_run_side", &self.current_run_side)
500 .field("run_count", &self.run_count)
501 .finish()
502 }
503}
504
505impl TickRunsBarAggregator {
506 pub fn new<H: FnMut(Bar) + 'static>(
512 bar_type: BarType,
513 price_precision: u8,
514 size_precision: u8,
515 handler: H,
516 ) -> Self {
517 Self {
518 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
519 current_run_side: None,
520 run_count: 0,
521 }
522 }
523}
524
525impl BarAggregator for TickRunsBarAggregator {
526 fn bar_type(&self) -> BarType {
527 self.core.bar_type
528 }
529
530 fn is_running(&self) -> bool {
531 self.core.is_running
532 }
533
534 fn set_is_running(&mut self, value: bool) {
535 self.core.set_is_running(value);
536 }
537
538 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
543 self.core.apply_update(price, size, ts_init);
544 }
545
546 fn handle_trade(&mut self, trade: TradeTick) {
547 let side = match trade.aggressor_side {
548 AggressorSide::Buyer => Some(AggressorSide::Buyer),
549 AggressorSide::Seller => Some(AggressorSide::Seller),
550 AggressorSide::NoAggressor => None,
551 };
552
553 if let Some(side) = side {
554 if self.current_run_side != Some(side) {
555 self.current_run_side = Some(side);
556 self.run_count = 0;
557 self.core.builder.reset();
558 }
559
560 self.core
561 .apply_update(trade.price, trade.size, trade.ts_init);
562 self.run_count += 1;
563
564 let threshold = self.core.bar_type.spec().step.get();
565 if self.run_count >= threshold {
566 self.core.build_now_and_send();
567 self.run_count = 0;
568 self.current_run_side = None;
569 }
570 } else {
571 self.core
572 .apply_update(trade.price, trade.size, trade.ts_init);
573 }
574 }
575
576 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
577 self.core.builder.update_bar(bar, volume, ts_init);
578 }
579}
580
581pub struct VolumeBarAggregator {
583 core: BarAggregatorCore,
584}
585
586impl Debug for VolumeBarAggregator {
587 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588 f.debug_struct(stringify!(VolumeBarAggregator))
589 .field("core", &self.core)
590 .finish()
591 }
592}
593
594impl VolumeBarAggregator {
595 pub fn new<H: FnMut(Bar) + 'static>(
601 bar_type: BarType,
602 price_precision: u8,
603 size_precision: u8,
604 handler: H,
605 ) -> Self {
606 Self {
607 core: BarAggregatorCore::new(
608 bar_type.standard(),
609 price_precision,
610 size_precision,
611 handler,
612 ),
613 }
614 }
615}
616
617impl BarAggregator for VolumeBarAggregator {
618 fn bar_type(&self) -> BarType {
619 self.core.bar_type
620 }
621
622 fn is_running(&self) -> bool {
623 self.core.is_running
624 }
625
626 fn set_is_running(&mut self, value: bool) {
627 self.core.set_is_running(value);
628 }
629
630 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
632 let mut raw_size_update = size.raw;
633 let spec = self.core.bar_type.spec();
634 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
635
636 while raw_size_update > 0 {
637 if self.core.builder.volume.raw + raw_size_update < raw_step {
638 self.core.apply_update(
639 price,
640 Quantity::from_raw(raw_size_update, size.precision),
641 ts_init,
642 );
643 break;
644 }
645
646 let raw_size_diff = raw_step - self.core.builder.volume.raw;
647 self.core.apply_update(
648 price,
649 Quantity::from_raw(raw_size_diff, size.precision),
650 ts_init,
651 );
652
653 self.core.build_now_and_send();
654 raw_size_update -= raw_size_diff;
655 }
656 }
657
658 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
659 let mut raw_volume_update = volume.raw;
660 let spec = self.core.bar_type.spec();
661 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
662
663 while raw_volume_update > 0 {
664 if self.core.builder.volume.raw + raw_volume_update < raw_step {
665 self.core.builder.update_bar(
666 bar,
667 Quantity::from_raw(raw_volume_update, volume.precision),
668 ts_init,
669 );
670 break;
671 }
672
673 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
674 self.core.builder.update_bar(
675 bar,
676 Quantity::from_raw(raw_volume_diff, volume.precision),
677 ts_init,
678 );
679
680 self.core.build_now_and_send();
681 raw_volume_update -= raw_volume_diff;
682 }
683 }
684}
685
686pub struct VolumeImbalanceBarAggregator {
688 core: BarAggregatorCore,
689 imbalance_raw: i128,
690 raw_step: i128,
691}
692
693impl Debug for VolumeImbalanceBarAggregator {
694 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
695 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
696 .field("core", &self.core)
697 .field("imbalance_raw", &self.imbalance_raw)
698 .field("raw_step", &self.raw_step)
699 .finish()
700 }
701}
702
703impl VolumeImbalanceBarAggregator {
704 pub fn new<H: FnMut(Bar) + 'static>(
710 bar_type: BarType,
711 price_precision: u8,
712 size_precision: u8,
713 handler: H,
714 ) -> Self {
715 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
716 Self {
717 core: BarAggregatorCore::new(
718 bar_type.standard(),
719 price_precision,
720 size_precision,
721 handler,
722 ),
723 imbalance_raw: 0,
724 raw_step,
725 }
726 }
727}
728
729impl BarAggregator for VolumeImbalanceBarAggregator {
730 fn bar_type(&self) -> BarType {
731 self.core.bar_type
732 }
733
734 fn is_running(&self) -> bool {
735 self.core.is_running
736 }
737
738 fn set_is_running(&mut self, value: bool) {
739 self.core.set_is_running(value);
740 }
741
742 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
747 self.core.apply_update(price, size, ts_init);
748 }
749
750 fn handle_trade(&mut self, trade: TradeTick) {
751 let side = match trade.aggressor_side {
752 AggressorSide::Buyer => 1,
753 AggressorSide::Seller => -1,
754 AggressorSide::NoAggressor => {
755 self.core
756 .apply_update(trade.price, trade.size, trade.ts_init);
757 return;
758 }
759 };
760
761 let mut raw_remaining = trade.size.raw as i128;
762 while raw_remaining > 0 {
763 let imbalance_abs = self.imbalance_raw.abs();
764 let needed = (self.raw_step - imbalance_abs).max(1);
765 let raw_chunk = raw_remaining.min(needed);
766 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
767
768 self.core
769 .apply_update(trade.price, qty_chunk, trade.ts_init);
770
771 self.imbalance_raw += side * raw_chunk;
772 raw_remaining -= raw_chunk;
773
774 if self.imbalance_raw.abs() >= self.raw_step {
775 self.core.build_now_and_send();
776 self.imbalance_raw = 0;
777 }
778 }
779 }
780
781 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
782 self.core.builder.update_bar(bar, volume, ts_init);
783 }
784}
785
786pub struct VolumeRunsBarAggregator {
788 core: BarAggregatorCore,
789 current_run_side: Option<AggressorSide>,
790 run_volume_raw: QuantityRaw,
791 raw_step: QuantityRaw,
792}
793
794impl Debug for VolumeRunsBarAggregator {
795 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
796 f.debug_struct(stringify!(VolumeRunsBarAggregator))
797 .field("core", &self.core)
798 .field("current_run_side", &self.current_run_side)
799 .field("run_volume_raw", &self.run_volume_raw)
800 .field("raw_step", &self.raw_step)
801 .finish()
802 }
803}
804
805impl VolumeRunsBarAggregator {
806 pub fn new<H: FnMut(Bar) + 'static>(
812 bar_type: BarType,
813 price_precision: u8,
814 size_precision: u8,
815 handler: H,
816 ) -> Self {
817 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
818 Self {
819 core: BarAggregatorCore::new(
820 bar_type.standard(),
821 price_precision,
822 size_precision,
823 handler,
824 ),
825 current_run_side: None,
826 run_volume_raw: 0,
827 raw_step,
828 }
829 }
830}
831
832impl BarAggregator for VolumeRunsBarAggregator {
833 fn bar_type(&self) -> BarType {
834 self.core.bar_type
835 }
836
837 fn is_running(&self) -> bool {
838 self.core.is_running
839 }
840
841 fn set_is_running(&mut self, value: bool) {
842 self.core.set_is_running(value);
843 }
844
845 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
850 self.core.apply_update(price, size, ts_init);
851 }
852
853 fn handle_trade(&mut self, trade: TradeTick) {
854 let side = match trade.aggressor_side {
855 AggressorSide::Buyer => Some(AggressorSide::Buyer),
856 AggressorSide::Seller => Some(AggressorSide::Seller),
857 AggressorSide::NoAggressor => None,
858 };
859
860 let Some(side) = side else {
861 self.core
862 .apply_update(trade.price, trade.size, trade.ts_init);
863 return;
864 };
865
866 if self.current_run_side != Some(side) {
867 self.current_run_side = Some(side);
868 self.run_volume_raw = 0;
869 self.core.builder.reset();
870 }
871
872 let mut raw_remaining = trade.size.raw;
873 while raw_remaining > 0 {
874 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
875 let raw_chunk = raw_remaining.min(needed);
876
877 self.core.apply_update(
878 trade.price,
879 Quantity::from_raw(raw_chunk, trade.size.precision),
880 trade.ts_init,
881 );
882
883 self.run_volume_raw += raw_chunk;
884 raw_remaining -= raw_chunk;
885
886 if self.run_volume_raw >= self.raw_step {
887 self.core.build_now_and_send();
888 self.run_volume_raw = 0;
889 self.current_run_side = None;
890 }
891 }
892 }
893
894 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
895 self.core.builder.update_bar(bar, volume, ts_init);
896 }
897}
898
899pub struct ValueBarAggregator {
904 core: BarAggregatorCore,
905 cum_value: f64,
906}
907
908impl Debug for ValueBarAggregator {
909 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
910 f.debug_struct(stringify!(ValueBarAggregator))
911 .field("core", &self.core)
912 .field("cum_value", &self.cum_value)
913 .finish()
914 }
915}
916
917impl ValueBarAggregator {
918 pub fn new<H: FnMut(Bar) + 'static>(
924 bar_type: BarType,
925 price_precision: u8,
926 size_precision: u8,
927 handler: H,
928 ) -> Self {
929 Self {
930 core: BarAggregatorCore::new(
931 bar_type.standard(),
932 price_precision,
933 size_precision,
934 handler,
935 ),
936 cum_value: 0.0,
937 }
938 }
939
940 #[must_use]
941 pub const fn get_cumulative_value(&self) -> f64 {
943 self.cum_value
944 }
945}
946
947impl BarAggregator for ValueBarAggregator {
948 fn bar_type(&self) -> BarType {
949 self.core.bar_type
950 }
951
952 fn is_running(&self) -> bool {
953 self.core.is_running
954 }
955
956 fn set_is_running(&mut self, value: bool) {
957 self.core.set_is_running(value);
958 }
959
960 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
962 let mut size_update = size.as_f64();
963 let spec = self.core.bar_type.spec();
964
965 while size_update > 0.0 {
966 let value_update = price.as_f64() * size_update;
967 if value_update == 0.0 {
968 self.core
970 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
971 break;
972 }
973
974 if self.cum_value + value_update < spec.step.get() as f64 {
975 self.cum_value += value_update;
976 self.core
977 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
978 break;
979 }
980
981 let value_diff = spec.step.get() as f64 - self.cum_value;
982 let mut size_diff = size_update * (value_diff / value_update);
983
984 if is_below_min_size(size_diff, size.precision) {
986 if is_below_min_size(size_update, size.precision) {
987 break;
988 }
989 size_diff = min_size_f64(size.precision);
990 }
991
992 self.core
993 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
994
995 self.core.build_now_and_send();
996 self.cum_value = 0.0;
997 size_update -= size_diff;
998 }
999 }
1000
1001 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1002 let mut volume_update = volume;
1003 let average_price = Price::new(
1004 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1005 self.core.builder.price_precision,
1006 );
1007
1008 while volume_update.as_f64() > 0.0 {
1009 let value_update = average_price.as_f64() * volume_update.as_f64();
1010 if value_update == 0.0 {
1011 self.core.builder.update_bar(bar, volume_update, ts_init);
1013 break;
1014 }
1015
1016 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1017 self.cum_value += value_update;
1018 self.core.builder.update_bar(bar, volume_update, ts_init);
1019 break;
1020 }
1021
1022 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1023 let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1024
1025 if is_below_min_size(volume_diff, volume_update.precision) {
1027 if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1028 break;
1029 }
1030 volume_diff = min_size_f64(volume_update.precision);
1031 }
1032
1033 self.core.builder.update_bar(
1034 bar,
1035 Quantity::new(volume_diff, volume_update.precision),
1036 ts_init,
1037 );
1038
1039 self.core.build_now_and_send();
1040 self.cum_value = 0.0;
1041 volume_update = Quantity::new(
1042 volume_update.as_f64() - volume_diff,
1043 volume_update.precision,
1044 );
1045 }
1046 }
1047}
1048
1049pub struct ValueImbalanceBarAggregator {
1051 core: BarAggregatorCore,
1052 imbalance_value: f64,
1053 step_value: f64,
1054}
1055
1056impl Debug for ValueImbalanceBarAggregator {
1057 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1059 .field("core", &self.core)
1060 .field("imbalance_value", &self.imbalance_value)
1061 .field("step_value", &self.step_value)
1062 .finish()
1063 }
1064}
1065
1066impl ValueImbalanceBarAggregator {
1067 pub fn new<H: FnMut(Bar) + 'static>(
1073 bar_type: BarType,
1074 price_precision: u8,
1075 size_precision: u8,
1076 handler: H,
1077 ) -> Self {
1078 Self {
1079 core: BarAggregatorCore::new(
1080 bar_type.standard(),
1081 price_precision,
1082 size_precision,
1083 handler,
1084 ),
1085 imbalance_value: 0.0,
1086 step_value: bar_type.spec().step.get() as f64,
1087 }
1088 }
1089}
1090
1091impl BarAggregator for ValueImbalanceBarAggregator {
1092 fn bar_type(&self) -> BarType {
1093 self.core.bar_type
1094 }
1095
1096 fn is_running(&self) -> bool {
1097 self.core.is_running
1098 }
1099
1100 fn set_is_running(&mut self, value: bool) {
1101 self.core.set_is_running(value);
1102 }
1103
1104 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1109 self.core.apply_update(price, size, ts_init);
1110 }
1111
1112 fn handle_trade(&mut self, trade: TradeTick) {
1113 let price_f64 = trade.price.as_f64();
1114 if price_f64 == 0.0 {
1115 self.core
1116 .apply_update(trade.price, trade.size, trade.ts_init);
1117 return;
1118 }
1119
1120 let side_sign = match trade.aggressor_side {
1121 AggressorSide::Buyer => 1.0,
1122 AggressorSide::Seller => -1.0,
1123 AggressorSide::NoAggressor => {
1124 self.core
1125 .apply_update(trade.price, trade.size, trade.ts_init);
1126 return;
1127 }
1128 };
1129
1130 let mut size_remaining = trade.size.as_f64();
1131 while size_remaining > 0.0 {
1132 let value_remaining = price_f64 * size_remaining;
1133 if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
1134 let needed = self.step_value - self.imbalance_value.abs();
1135 if value_remaining <= needed {
1136 self.imbalance_value += side_sign * value_remaining;
1137 self.core.apply_update(
1138 trade.price,
1139 Quantity::new(size_remaining, trade.size.precision),
1140 trade.ts_init,
1141 );
1142
1143 if self.imbalance_value.abs() >= self.step_value {
1144 self.core.build_now_and_send();
1145 self.imbalance_value = 0.0;
1146 }
1147 break;
1148 }
1149
1150 let mut value_chunk = needed;
1151 let mut size_chunk = value_chunk / price_f64;
1152
1153 if is_below_min_size(size_chunk, trade.size.precision) {
1155 if is_below_min_size(size_remaining, trade.size.precision) {
1156 break;
1157 }
1158 size_chunk = min_size_f64(trade.size.precision);
1159 value_chunk = price_f64 * size_chunk;
1160 }
1161
1162 self.core.apply_update(
1163 trade.price,
1164 Quantity::new(size_chunk, trade.size.precision),
1165 trade.ts_init,
1166 );
1167 self.imbalance_value += side_sign * value_chunk;
1168 size_remaining -= size_chunk;
1169
1170 if self.imbalance_value.abs() >= self.step_value {
1171 self.core.build_now_and_send();
1172 self.imbalance_value = 0.0;
1173 }
1174 } else {
1175 let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1177 let mut size_chunk = value_to_flatten / price_f64;
1178
1179 if is_below_min_size(size_chunk, trade.size.precision) {
1181 if is_below_min_size(size_remaining, trade.size.precision) {
1182 break;
1183 }
1184 size_chunk = min_size_f64(trade.size.precision);
1185 value_to_flatten = price_f64 * size_chunk;
1186 }
1187
1188 self.core.apply_update(
1189 trade.price,
1190 Quantity::new(size_chunk, trade.size.precision),
1191 trade.ts_init,
1192 );
1193 self.imbalance_value += side_sign * value_to_flatten;
1194
1195 if self.imbalance_value.abs() >= self.step_value {
1197 self.core.build_now_and_send();
1198 self.imbalance_value = 0.0;
1199 }
1200 size_remaining -= size_chunk;
1201 }
1202 }
1203 }
1204
1205 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1206 self.core.builder.update_bar(bar, volume, ts_init);
1207 }
1208}
1209
1210pub struct ValueRunsBarAggregator {
1212 core: BarAggregatorCore,
1213 current_run_side: Option<AggressorSide>,
1214 run_value: f64,
1215 step_value: f64,
1216}
1217
1218impl Debug for ValueRunsBarAggregator {
1219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1220 f.debug_struct(stringify!(ValueRunsBarAggregator))
1221 .field("core", &self.core)
1222 .field("current_run_side", &self.current_run_side)
1223 .field("run_value", &self.run_value)
1224 .field("step_value", &self.step_value)
1225 .finish()
1226 }
1227}
1228
1229impl ValueRunsBarAggregator {
1230 pub fn new<H: FnMut(Bar) + 'static>(
1236 bar_type: BarType,
1237 price_precision: u8,
1238 size_precision: u8,
1239 handler: H,
1240 ) -> Self {
1241 Self {
1242 core: BarAggregatorCore::new(
1243 bar_type.standard(),
1244 price_precision,
1245 size_precision,
1246 handler,
1247 ),
1248 current_run_side: None,
1249 run_value: 0.0,
1250 step_value: bar_type.spec().step.get() as f64,
1251 }
1252 }
1253}
1254
1255impl BarAggregator for ValueRunsBarAggregator {
1256 fn bar_type(&self) -> BarType {
1257 self.core.bar_type
1258 }
1259
1260 fn is_running(&self) -> bool {
1261 self.core.is_running
1262 }
1263
1264 fn set_is_running(&mut self, value: bool) {
1265 self.core.set_is_running(value);
1266 }
1267
1268 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1273 self.core.apply_update(price, size, ts_init);
1274 }
1275
1276 fn handle_trade(&mut self, trade: TradeTick) {
1277 let price_f64 = trade.price.as_f64();
1278 if price_f64 == 0.0 {
1279 self.core
1280 .apply_update(trade.price, trade.size, trade.ts_init);
1281 return;
1282 }
1283
1284 let side = match trade.aggressor_side {
1285 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1286 AggressorSide::Seller => Some(AggressorSide::Seller),
1287 AggressorSide::NoAggressor => None,
1288 };
1289
1290 let Some(side) = side else {
1291 self.core
1292 .apply_update(trade.price, trade.size, trade.ts_init);
1293 return;
1294 };
1295
1296 if self.current_run_side != Some(side) {
1297 self.current_run_side = Some(side);
1298 self.run_value = 0.0;
1299 self.core.builder.reset();
1300 }
1301
1302 let mut size_remaining = trade.size.as_f64();
1303 while size_remaining > 0.0 {
1304 let value_update = price_f64 * size_remaining;
1305 if self.run_value + value_update < self.step_value {
1306 self.run_value += value_update;
1307 self.core.apply_update(
1308 trade.price,
1309 Quantity::new(size_remaining, trade.size.precision),
1310 trade.ts_init,
1311 );
1312 break;
1313 }
1314
1315 let value_needed = self.step_value - self.run_value;
1316 let mut size_chunk = value_needed / price_f64;
1317
1318 if is_below_min_size(size_chunk, trade.size.precision) {
1320 if is_below_min_size(size_remaining, trade.size.precision) {
1321 break;
1322 }
1323 size_chunk = min_size_f64(trade.size.precision);
1324 }
1325
1326 self.core.apply_update(
1327 trade.price,
1328 Quantity::new(size_chunk, trade.size.precision),
1329 trade.ts_init,
1330 );
1331
1332 self.core.build_now_and_send();
1333 self.run_value = 0.0;
1334 self.current_run_side = None;
1335 size_remaining -= size_chunk;
1336 }
1337 }
1338
1339 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1340 self.core.builder.update_bar(bar, volume, ts_init);
1341 }
1342}
1343
1344pub struct RenkoBarAggregator {
1350 core: BarAggregatorCore,
1351 pub brick_size: PriceRaw,
1352 last_close: Option<Price>,
1353}
1354
1355impl Debug for RenkoBarAggregator {
1356 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1357 f.debug_struct(stringify!(RenkoBarAggregator))
1358 .field("core", &self.core)
1359 .field("brick_size", &self.brick_size)
1360 .field("last_close", &self.last_close)
1361 .finish()
1362 }
1363}
1364
1365impl RenkoBarAggregator {
1366 pub fn new<H: FnMut(Bar) + 'static>(
1372 bar_type: BarType,
1373 price_precision: u8,
1374 size_precision: u8,
1375 price_increment: Price,
1376 handler: H,
1377 ) -> Self {
1378 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1380
1381 Self {
1382 core: BarAggregatorCore::new(
1383 bar_type.standard(),
1384 price_precision,
1385 size_precision,
1386 handler,
1387 ),
1388 brick_size,
1389 last_close: None,
1390 }
1391 }
1392}
1393
1394impl BarAggregator for RenkoBarAggregator {
1395 fn bar_type(&self) -> BarType {
1396 self.core.bar_type
1397 }
1398
1399 fn is_running(&self) -> bool {
1400 self.core.is_running
1401 }
1402
1403 fn set_is_running(&mut self, value: bool) {
1404 self.core.set_is_running(value);
1405 }
1406
1407 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1412 self.core.apply_update(price, size, ts_init);
1414
1415 if self.last_close.is_none() {
1417 self.last_close = Some(price);
1418 return;
1419 }
1420
1421 let last_close = self.last_close.unwrap();
1422
1423 let current_raw = price.raw;
1425 let last_close_raw = last_close.raw;
1426 let price_diff_raw = current_raw - last_close_raw;
1427 let abs_price_diff_raw = price_diff_raw.abs();
1428
1429 if abs_price_diff_raw >= self.brick_size {
1431 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1432 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1433 let mut current_close = last_close;
1434
1435 let total_volume = self.core.builder.volume;
1437
1438 for _i in 0..num_bricks {
1439 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1441 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1442
1443 let (brick_high, brick_low) = if direction > 0.0 {
1445 (brick_close, current_close)
1446 } else {
1447 (current_close, brick_close)
1448 };
1449
1450 self.core.builder.reset();
1452 self.core.builder.open = Some(current_close);
1453 self.core.builder.high = Some(brick_high);
1454 self.core.builder.low = Some(brick_low);
1455 self.core.builder.close = Some(brick_close);
1456 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1458 self.core.builder.ts_last = ts_init;
1459 self.core.builder.initialized = true;
1460
1461 self.core.build_and_send(ts_init, ts_init);
1463
1464 current_close = brick_close;
1466 self.last_close = Some(brick_close);
1467 }
1468 }
1469 }
1470
1471 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1472 self.core.builder.update_bar(bar, volume, ts_init);
1474
1475 if self.last_close.is_none() {
1477 self.last_close = Some(bar.close);
1478 return;
1479 }
1480
1481 let last_close = self.last_close.unwrap();
1482
1483 let current_raw = bar.close.raw;
1485 let last_close_raw = last_close.raw;
1486 let price_diff_raw = current_raw - last_close_raw;
1487 let abs_price_diff_raw = price_diff_raw.abs();
1488
1489 if abs_price_diff_raw >= self.brick_size {
1491 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1492 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1493 let mut current_close = last_close;
1494
1495 let total_volume = self.core.builder.volume;
1497
1498 for _i in 0..num_bricks {
1499 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1501 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1502
1503 let (brick_high, brick_low) = if direction > 0.0 {
1505 (brick_close, current_close)
1506 } else {
1507 (current_close, brick_close)
1508 };
1509
1510 self.core.builder.reset();
1512 self.core.builder.open = Some(current_close);
1513 self.core.builder.high = Some(brick_high);
1514 self.core.builder.low = Some(brick_low);
1515 self.core.builder.close = Some(brick_close);
1516 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1518 self.core.builder.ts_last = ts_init;
1519 self.core.builder.initialized = true;
1520
1521 self.core.build_and_send(ts_init, ts_init);
1523
1524 current_close = brick_close;
1526 self.last_close = Some(brick_close);
1527 }
1528 }
1529 }
1530}
1531
/// Bar aggregator that builds bars on timer events from a clock.
///
/// Regular intervals are driven by a repeating timer; month and year intervals are
/// driven by one-shot time alerts that are re-armed after each bar.
pub struct TimeBarAggregator {
    /// Shared aggregation state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Clock used to read the current time and to register timers and alerts.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event is ignored if the builder saw no updates.
    build_with_no_updates: bool,
    /// For left-open intervals, stamp bars with the close (event) time instead of the
    /// stored open time.
    timestamp_on_close: bool,
    /// `true` when the interval type is `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar currently being built; set to the event time after each build.
    stored_open_ns: UnixNanos,
    /// Name under which the timer/alert is registered (the bar type rendered as a string).
    timer_name: String,
    /// Bar interval as returned by `get_bar_interval_ns` for the bar type.
    interval_ns: UnixNanos,
    /// Time at which the next bar is expected to close.
    next_close_ns: UnixNanos,
    /// Delay added to the computed start time, interpreted as microseconds.
    bar_build_delay: u64,
    /// Optional origin offset forwarded to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When set, the first bar is discarded (builder reset) instead of being sent.
    skip_first_non_full_bar: bool,
    /// When `true`, updates advance a `TestClock` and process the resulting events inline.
    pub historical_mode: bool,
    /// Time events waiting to be turned into bars in historical mode.
    historical_events: Vec<TimeEvent>,
    /// Weak handle to the shared aggregator, used by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1552
1553impl Debug for TimeBarAggregator {
1554 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1555 f.debug_struct(stringify!(TimeBarAggregator))
1556 .field("core", &self.core)
1557 .field("build_with_no_updates", &self.build_with_no_updates)
1558 .field("timestamp_on_close", &self.timestamp_on_close)
1559 .field("is_left_open", &self.is_left_open)
1560 .field("timer_name", &self.timer_name)
1561 .field("interval_ns", &self.interval_ns)
1562 .field("bar_build_delay", &self.bar_build_delay)
1563 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1564 .finish()
1565 }
1566}
1567
1568impl TimeBarAggregator {
1569 #[allow(clippy::too_many_arguments)]
1575 pub fn new<H: FnMut(Bar) + 'static>(
1576 bar_type: BarType,
1577 price_precision: u8,
1578 size_precision: u8,
1579 clock: Rc<RefCell<dyn Clock>>,
1580 handler: H,
1581 build_with_no_updates: bool,
1582 timestamp_on_close: bool,
1583 interval_type: BarIntervalType,
1584 time_bars_origin_offset: Option<TimeDelta>,
1585 bar_build_delay: u64,
1586 skip_first_non_full_bar: bool,
1587 ) -> Self {
1588 let is_left_open = match interval_type {
1589 BarIntervalType::LeftOpen => true,
1590 BarIntervalType::RightOpen => false,
1591 };
1592
1593 let core = BarAggregatorCore::new(
1594 bar_type.standard(),
1595 price_precision,
1596 size_precision,
1597 handler,
1598 );
1599
1600 Self {
1601 core,
1602 clock,
1603 build_with_no_updates,
1604 timestamp_on_close,
1605 is_left_open,
1606 stored_open_ns: UnixNanos::default(),
1607 timer_name: bar_type.to_string(),
1608 interval_ns: get_bar_interval_ns(&bar_type),
1609 next_close_ns: UnixNanos::default(),
1610 bar_build_delay,
1611 time_bars_origin_offset,
1612 skip_first_non_full_bar,
1613 historical_mode: false,
1614 historical_events: Vec::new(),
1615 aggregator_weak: None,
1616 }
1617 }
1618
1619 pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1621 self.clock = clock;
1622 }
1623
1624 pub fn start_timer_internal(
1633 &mut self,
1634 aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1635 ) {
1636 let aggregator_weak = if let Some(rc) = aggregator_rc {
1638 let weak = Rc::downgrade(&rc);
1640 self.aggregator_weak = Some(weak.clone());
1641 weak
1642 } else {
1643 self.aggregator_weak
1645 .as_ref()
1646 .expect("Aggregator weak reference must be set before calling start_timer()")
1647 .clone()
1648 };
1649
1650 let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1651 if let Some(agg) = aggregator_weak.upgrade() {
1652 agg.borrow_mut().build_bar(event);
1653 }
1654 }));
1655
1656 let now = self.clock.borrow().utc_now();
1658 let mut start_time =
1659 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1660 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1661
1662 let fire_immediately = start_time == now;
1664
1665 self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1666
1667 let spec = &self.bar_type().spec();
1668 let start_time_ns = UnixNanos::from(start_time);
1669
1670 if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1671 self.clock
1672 .borrow_mut()
1673 .set_timer_ns(
1674 &self.timer_name,
1675 self.interval_ns.as_u64(),
1676 Some(start_time_ns),
1677 None,
1678 Some(callback),
1679 Some(true), Some(fire_immediately),
1681 )
1682 .expect(FAILED);
1683
1684 if fire_immediately {
1685 self.next_close_ns = start_time_ns;
1686 } else {
1687 let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1688 self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1689 }
1690
1691 self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1692 } else {
1693 let alert_time = if fire_immediately {
1695 start_time
1696 } else {
1697 let step = spec.step.get() as u32;
1698 if spec.aggregation == BarAggregation::Month {
1699 add_n_months(start_time, step).expect(FAILED)
1700 } else {
1701 add_n_years(start_time, step).expect(FAILED)
1703 }
1704 };
1705
1706 self.clock
1707 .borrow_mut()
1708 .set_time_alert_ns(
1709 &self.timer_name,
1710 UnixNanos::from(alert_time),
1711 Some(callback),
1712 Some(true), )
1714 .expect(FAILED);
1715
1716 self.next_close_ns = UnixNanos::from(alert_time);
1717 self.stored_open_ns = UnixNanos::from(start_time);
1718 }
1719
1720 log::debug!(
1721 "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1722 self.timer_name,
1723 start_time,
1724 self.historical_mode,
1725 fire_immediately,
1726 now,
1727 self.bar_build_delay
1728 );
1729 }
1730
1731 pub fn stop(&mut self) {
1733 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1734 }
1735
1736 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1737 if self.skip_first_non_full_bar {
1738 self.core.builder.reset();
1739 self.skip_first_non_full_bar = false;
1740 } else {
1741 self.core.build_and_send(ts_event, ts_init);
1742 }
1743 }
1744
1745 fn build_bar(&mut self, event: TimeEvent) {
1746 if !self.core.builder.initialized {
1747 return;
1748 }
1749
1750 if !self.build_with_no_updates && self.core.builder.count == 0 {
1751 return; }
1753
1754 let ts_init = event.ts_event;
1755 let ts_event = if self.is_left_open {
1756 if self.timestamp_on_close {
1757 event.ts_event
1758 } else {
1759 self.stored_open_ns
1760 }
1761 } else {
1762 self.stored_open_ns
1763 };
1764
1765 self.build_and_send(ts_event, ts_init);
1766
1767 self.stored_open_ns = event.ts_event;
1769
1770 if self.bar_type().spec().aggregation == BarAggregation::Month {
1771 let step = self.bar_type().spec().step.get() as u32;
1772 let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1773
1774 self.clock
1775 .borrow_mut()
1776 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1777 .expect(FAILED);
1778
1779 self.next_close_ns = alert_time_ns;
1780 } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1781 let step = self.bar_type().spec().step.get() as u32;
1782 let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1783
1784 self.clock
1785 .borrow_mut()
1786 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1787 .expect(FAILED);
1788
1789 self.next_close_ns = alert_time_ns;
1790 } else {
1791 self.next_close_ns = self
1793 .clock
1794 .borrow()
1795 .next_time_ns(&self.timer_name)
1796 .unwrap_or_default();
1797 }
1798 }
1799
1800 fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1801 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1802 {
1804 let mut clock_borrow = self.clock.borrow_mut();
1805 let test_clock = clock_borrow
1806 .as_any_mut()
1807 .downcast_mut::<TestClock>()
1808 .expect("Expected TestClock in historical mode");
1809 test_clock.set_time(ts_init);
1810 }
1811 self.start_timer_internal(None);
1813 }
1814
1815 {
1817 let mut clock_borrow = self.clock.borrow_mut();
1818 let test_clock = clock_borrow
1819 .as_any_mut()
1820 .downcast_mut::<TestClock>()
1821 .expect("Expected TestClock in historical mode");
1822 self.historical_events = test_clock.advance_time(ts_init, true);
1823 }
1824 }
1825
1826 fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1827 let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1830 for event in events {
1831 self.build_bar(event);
1832 }
1833 }
1834
1835 pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1837 self.historical_events = events;
1838 }
1839}
1840
1841impl BarAggregator for TimeBarAggregator {
1842 fn bar_type(&self) -> BarType {
1843 self.core.bar_type
1844 }
1845
1846 fn is_running(&self) -> bool {
1847 self.core.is_running
1848 }
1849
1850 fn set_is_running(&mut self, value: bool) {
1851 self.core.set_is_running(value);
1852 }
1853
1854 fn stop(&mut self) {
1856 Self::stop(self);
1857 }
1858
1859 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1860 if self.historical_mode {
1861 self.preprocess_historical_events(ts_init);
1862 }
1863
1864 self.core.apply_update(price, size, ts_init);
1865
1866 if self.historical_mode {
1867 self.postprocess_historical_events(ts_init);
1868 }
1869 }
1870
1871 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1872 if self.historical_mode {
1873 self.preprocess_historical_events(ts_init);
1874 }
1875
1876 self.core.builder.update_bar(bar, volume, ts_init);
1877
1878 if self.historical_mode {
1879 self.postprocess_historical_events(ts_init);
1880 }
1881 }
1882
1883 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1884 self.historical_mode = historical_mode;
1885 self.core.handler = handler;
1886 }
1887
1888 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1889 self.set_historical_events_internal(events);
1890 }
1891
1892 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1893 self.set_clock_internal(clock);
1894 }
1895
1896 fn build_bar(&mut self, event: TimeEvent) {
1897 {
1900 #[allow(clippy::use_self)]
1901 TimeBarAggregator::build_bar(self, event);
1902 }
1903 }
1904
1905 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1906 self.aggregator_weak = Some(weak);
1907 }
1908
1909 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1910 self.start_timer_internal(aggregator_rc);
1911 }
1912}
1913
1914fn is_below_min_size(size: f64, precision: u8) -> bool {
1915 Quantity::new(size, precision).raw == 0
1916}
1917
/// Returns `10^(-precision)` as an `f64`, i.e. one unit in the last decimal place at `precision`.
fn min_size_f64(precision: u8) -> f64 {
    let exponent = -i32::from(precision);
    10_f64.powi(exponent)
}
1921
1922#[cfg(test)]
1923mod tests {
1924 use std::sync::{Arc, Mutex};
1925
1926 use nautilus_common::clock::TestClock;
1927 use nautilus_core::{MUTEX_POISONED, UUID4};
1928 use nautilus_model::{
1929 data::{BarSpecification, BarType},
1930 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1931 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1932 types::{Price, Quantity},
1933 };
1934 use rstest::rstest;
1935 use ustr::Ustr;
1936
1937 use super::*;
1938
1939 #[rstest]
1940 fn test_bar_builder_initialization(equity_aapl: Equity) {
1941 let instrument = InstrumentAny::Equity(equity_aapl);
1942 let bar_type = BarType::new(
1943 instrument.id(),
1944 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1945 AggregationSource::Internal,
1946 );
1947 let builder = BarBuilder::new(
1948 bar_type,
1949 instrument.price_precision(),
1950 instrument.size_precision(),
1951 );
1952
1953 assert!(!builder.initialized);
1954 assert_eq!(builder.ts_last, 0);
1955 assert_eq!(builder.count, 0);
1956 }
1957
1958 #[rstest]
1959 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1960 let instrument = InstrumentAny::Equity(equity_aapl);
1961 let bar_type = BarType::new(
1962 instrument.id(),
1963 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1964 AggregationSource::Internal,
1965 );
1966 let mut builder = BarBuilder::new(
1967 bar_type,
1968 instrument.price_precision(),
1969 instrument.size_precision(),
1970 );
1971
1972 builder.update(
1973 Price::from("100.00"),
1974 Quantity::from(1),
1975 UnixNanos::from(1000),
1976 );
1977 builder.update(
1978 Price::from("95.00"),
1979 Quantity::from(1),
1980 UnixNanos::from(2000),
1981 );
1982 builder.update(
1983 Price::from("105.00"),
1984 Quantity::from(1),
1985 UnixNanos::from(3000),
1986 );
1987
1988 let bar = builder.build_now();
1989 assert!(bar.high > bar.low);
1990 assert_eq!(bar.open, Price::from("100.00"));
1991 assert_eq!(bar.high, Price::from("105.00"));
1992 assert_eq!(bar.low, Price::from("95.00"));
1993 assert_eq!(bar.close, Price::from("105.00"));
1994 }
1995
1996 #[rstest]
1997 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1998 let instrument = InstrumentAny::Equity(equity_aapl);
1999 let bar_type = BarType::new(
2000 instrument.id(),
2001 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2002 AggregationSource::Internal,
2003 );
2004 let mut builder = BarBuilder::new(
2005 bar_type,
2006 instrument.price_precision(),
2007 instrument.size_precision(),
2008 );
2009
2010 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2011 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2012
2013 assert_eq!(builder.ts_last, 1_000);
2014 assert_eq!(builder.count, 1);
2015 }
2016
2017 #[rstest]
2018 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2019 let instrument = InstrumentAny::Equity(equity_aapl);
2020 let bar_type = BarType::new(
2021 instrument.id(),
2022 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2023 AggregationSource::Internal,
2024 );
2025 let mut builder = BarBuilder::new(
2026 bar_type,
2027 instrument.price_precision(),
2028 instrument.size_precision(),
2029 );
2030
2031 builder.update(
2032 Price::from("1.00000"),
2033 Quantity::from(1),
2034 UnixNanos::default(),
2035 );
2036
2037 assert!(builder.initialized);
2038 assert_eq!(builder.ts_last, 0);
2039 assert_eq!(builder.count, 1);
2040 }
2041
2042 #[rstest]
2043 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2044 equity_aapl: Equity,
2045 ) {
2046 let instrument = InstrumentAny::Equity(equity_aapl);
2047 let bar_type = BarType::new(
2048 instrument.id(),
2049 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2050 AggregationSource::Internal,
2051 );
2052 let mut builder = BarBuilder::new(bar_type, 2, 0);
2053
2054 builder.update(
2055 Price::from("1.00000"),
2056 Quantity::from(1),
2057 UnixNanos::from(1_000),
2058 );
2059 builder.update(
2060 Price::from("1.00001"),
2061 Quantity::from(1),
2062 UnixNanos::from(500),
2063 );
2064
2065 assert!(builder.initialized);
2066 assert_eq!(builder.ts_last, 1_000);
2067 assert_eq!(builder.count, 1);
2068 }
2069
2070 #[rstest]
2071 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2072 let instrument = InstrumentAny::Equity(equity_aapl);
2073 let bar_type = BarType::new(
2074 instrument.id(),
2075 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2076 AggregationSource::Internal,
2077 );
2078 let mut builder = BarBuilder::new(
2079 bar_type,
2080 instrument.price_precision(),
2081 instrument.size_precision(),
2082 );
2083
2084 for _ in 0..5 {
2085 builder.update(
2086 Price::from("1.00000"),
2087 Quantity::from(1),
2088 UnixNanos::from(1_000),
2089 );
2090 }
2091
2092 assert_eq!(builder.count, 5);
2093 }
2094
2095 #[rstest]
2096 #[should_panic]
2097 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2098 let instrument = InstrumentAny::Equity(equity_aapl);
2099 let bar_type = BarType::new(
2100 instrument.id(),
2101 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2102 AggregationSource::Internal,
2103 );
2104 let mut builder = BarBuilder::new(
2105 bar_type,
2106 instrument.price_precision(),
2107 instrument.size_precision(),
2108 );
2109 let _ = builder.build_now();
2110 }
2111
2112 #[rstest]
2113 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2114 let instrument = InstrumentAny::Equity(equity_aapl);
2115 let bar_type = BarType::new(
2116 instrument.id(),
2117 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2118 AggregationSource::Internal,
2119 );
2120 let mut builder = BarBuilder::new(
2121 bar_type,
2122 instrument.price_precision(),
2123 instrument.size_precision(),
2124 );
2125
2126 builder.update(
2127 Price::from("1.00001"),
2128 Quantity::from(2),
2129 UnixNanos::default(),
2130 );
2131 builder.update(
2132 Price::from("1.00002"),
2133 Quantity::from(2),
2134 UnixNanos::default(),
2135 );
2136 builder.update(
2137 Price::from("1.00000"),
2138 Quantity::from(1),
2139 UnixNanos::from(1_000_000_000),
2140 );
2141
2142 let bar = builder.build_now();
2143
2144 assert_eq!(bar.open, Price::from("1.00001"));
2145 assert_eq!(bar.high, Price::from("1.00002"));
2146 assert_eq!(bar.low, Price::from("1.00000"));
2147 assert_eq!(bar.close, Price::from("1.00000"));
2148 assert_eq!(bar.volume, Quantity::from(5));
2149 assert_eq!(bar.ts_init, 1_000_000_000);
2150 assert_eq!(builder.ts_last, 1_000_000_000);
2151 assert_eq!(builder.count, 0);
2152 }
2153
2154 #[rstest]
2155 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2156 let instrument = InstrumentAny::Equity(equity_aapl);
2157 let bar_type = BarType::new(
2158 instrument.id(),
2159 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2160 AggregationSource::Internal,
2161 );
2162 let mut builder = BarBuilder::new(bar_type, 2, 0);
2163
2164 builder.update(
2165 Price::from("1.00001"),
2166 Quantity::from(1),
2167 UnixNanos::default(),
2168 );
2169 builder.build_now();
2170
2171 builder.update(
2172 Price::from("1.00000"),
2173 Quantity::from(1),
2174 UnixNanos::default(),
2175 );
2176 builder.update(
2177 Price::from("1.00003"),
2178 Quantity::from(1),
2179 UnixNanos::default(),
2180 );
2181 builder.update(
2182 Price::from("1.00002"),
2183 Quantity::from(1),
2184 UnixNanos::default(),
2185 );
2186
2187 let bar = builder.build_now();
2188
2189 assert_eq!(bar.open, Price::from("1.00000"));
2190 assert_eq!(bar.high, Price::from("1.00003"));
2191 assert_eq!(bar.low, Price::from("1.00000"));
2192 assert_eq!(bar.close, Price::from("1.00002"));
2193 assert_eq!(bar.volume, Quantity::from(3));
2194 }
2195
2196 #[rstest]
2197 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2198 let instrument = InstrumentAny::Equity(equity_aapl);
2199 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2200 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2201 let handler = Arc::new(Mutex::new(Vec::new()));
2202 let handler_clone = Arc::clone(&handler);
2203
2204 let mut aggregator = TickBarAggregator::new(
2205 bar_type,
2206 instrument.price_precision(),
2207 instrument.size_precision(),
2208 move |bar: Bar| {
2209 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2210 handler_guard.push(bar);
2211 },
2212 );
2213
2214 let trade = TradeTick::default();
2215 aggregator.handle_trade(trade);
2216
2217 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2218 assert_eq!(handler_guard.len(), 0);
2219 }
2220
2221 #[rstest]
2222 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2223 let instrument = InstrumentAny::Equity(equity_aapl);
2224 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2225 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2226 let handler = Arc::new(Mutex::new(Vec::new()));
2227 let handler_clone = Arc::clone(&handler);
2228
2229 let mut aggregator = TickBarAggregator::new(
2230 bar_type,
2231 instrument.price_precision(),
2232 instrument.size_precision(),
2233 move |bar: Bar| {
2234 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2235 handler_guard.push(bar);
2236 },
2237 );
2238
2239 let trade = TradeTick::default();
2240 aggregator.handle_trade(trade);
2241 aggregator.handle_trade(trade);
2242 aggregator.handle_trade(trade);
2243
2244 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2245 let bar = handler_guard.first().unwrap();
2246 assert_eq!(handler_guard.len(), 1);
2247 assert_eq!(bar.open, trade.price);
2248 assert_eq!(bar.high, trade.price);
2249 assert_eq!(bar.low, trade.price);
2250 assert_eq!(bar.close, trade.price);
2251 assert_eq!(bar.volume, Quantity::from(300000));
2252 assert_eq!(bar.ts_event, trade.ts_event);
2253 assert_eq!(bar.ts_init, trade.ts_init);
2254 }
2255
2256 #[rstest]
2257 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2258 let instrument = InstrumentAny::Equity(equity_aapl);
2259 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2260 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2261 let handler = Arc::new(Mutex::new(Vec::new()));
2262 let handler_clone = Arc::clone(&handler);
2263
2264 let mut aggregator = TickBarAggregator::new(
2265 bar_type,
2266 instrument.price_precision(),
2267 instrument.size_precision(),
2268 move |bar: Bar| {
2269 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2270 handler_guard.push(bar);
2271 },
2272 );
2273
2274 aggregator.update(
2275 Price::from("1.00001"),
2276 Quantity::from(1),
2277 UnixNanos::default(),
2278 );
2279 aggregator.update(
2280 Price::from("1.00002"),
2281 Quantity::from(1),
2282 UnixNanos::from(1000),
2283 );
2284 aggregator.update(
2285 Price::from("1.00003"),
2286 Quantity::from(1),
2287 UnixNanos::from(2000),
2288 );
2289
2290 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2291 assert_eq!(handler_guard.len(), 1);
2292
2293 let bar = handler_guard.first().unwrap();
2294 assert_eq!(bar.open, Price::from("1.00001"));
2295 assert_eq!(bar.high, Price::from("1.00003"));
2296 assert_eq!(bar.low, Price::from("1.00001"));
2297 assert_eq!(bar.close, Price::from("1.00003"));
2298 assert_eq!(bar.volume, Quantity::from(3));
2299 }
2300
2301 #[rstest]
2302 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2303 let instrument = InstrumentAny::Equity(equity_aapl);
2304 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2305 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2306 let handler = Arc::new(Mutex::new(Vec::new()));
2307 let handler_clone = Arc::clone(&handler);
2308
2309 let mut aggregator = TickBarAggregator::new(
2310 bar_type,
2311 instrument.price_precision(),
2312 instrument.size_precision(),
2313 move |bar: Bar| {
2314 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2315 handler_guard.push(bar);
2316 },
2317 );
2318
2319 aggregator.update(
2320 Price::from("1.00001"),
2321 Quantity::from(1),
2322 UnixNanos::default(),
2323 );
2324 aggregator.update(
2325 Price::from("1.00002"),
2326 Quantity::from(1),
2327 UnixNanos::from(1000),
2328 );
2329 aggregator.update(
2330 Price::from("1.00003"),
2331 Quantity::from(1),
2332 UnixNanos::from(2000),
2333 );
2334 aggregator.update(
2335 Price::from("1.00004"),
2336 Quantity::from(1),
2337 UnixNanos::from(3000),
2338 );
2339
2340 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2341 assert_eq!(handler_guard.len(), 2);
2342
2343 let bar1 = &handler_guard[0];
2344 assert_eq!(bar1.open, Price::from("1.00001"));
2345 assert_eq!(bar1.close, Price::from("1.00002"));
2346 assert_eq!(bar1.volume, Quantity::from(2));
2347
2348 let bar2 = &handler_guard[1];
2349 assert_eq!(bar2.open, Price::from("1.00003"));
2350 assert_eq!(bar2.close, Price::from("1.00004"));
2351 assert_eq!(bar2.volume, Quantity::from(2));
2352 }
2353
2354 #[rstest]
2355 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2356 let instrument = InstrumentAny::Equity(equity_aapl);
2357 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2358 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2359 let handler = Arc::new(Mutex::new(Vec::new()));
2360 let handler_clone = Arc::clone(&handler);
2361
2362 let mut aggregator = TickImbalanceBarAggregator::new(
2363 bar_type,
2364 instrument.price_precision(),
2365 instrument.size_precision(),
2366 move |bar: Bar| {
2367 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2368 handler_guard.push(bar);
2369 },
2370 );
2371
2372 let trade = TradeTick::default();
2373 aggregator.handle_trade(trade);
2374 aggregator.handle_trade(trade);
2375
2376 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2377 assert_eq!(handler_guard.len(), 1);
2378 let bar = handler_guard.first().unwrap();
2379 assert_eq!(bar.volume, Quantity::from(200000));
2380 }
2381
2382 #[rstest]
2383 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2384 let instrument = InstrumentAny::Equity(equity_aapl);
2385 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2386 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2387 let handler = Arc::new(Mutex::new(Vec::new()));
2388 let handler_clone = Arc::clone(&handler);
2389
2390 let mut aggregator = TickImbalanceBarAggregator::new(
2391 bar_type,
2392 instrument.price_precision(),
2393 instrument.size_precision(),
2394 move |bar: Bar| {
2395 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2396 handler_guard.push(bar);
2397 },
2398 );
2399
2400 let sell = TradeTick {
2401 aggressor_side: AggressorSide::Seller,
2402 ..TradeTick::default()
2403 };
2404
2405 aggregator.handle_trade(sell);
2406
2407 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2408 assert_eq!(handler_guard.len(), 1);
2409 }
2410
2411 #[rstest]
2412 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2413 let instrument = InstrumentAny::Equity(equity_aapl);
2414 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2415 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2416 let handler = Arc::new(Mutex::new(Vec::new()));
2417 let handler_clone = Arc::clone(&handler);
2418
2419 let mut aggregator = TickRunsBarAggregator::new(
2420 bar_type,
2421 instrument.price_precision(),
2422 instrument.size_precision(),
2423 move |bar: Bar| {
2424 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2425 handler_guard.push(bar);
2426 },
2427 );
2428
2429 let buy = TradeTick::default();
2430 let sell = TradeTick {
2431 aggressor_side: AggressorSide::Seller,
2432 ..buy
2433 };
2434
2435 aggregator.handle_trade(buy);
2436 aggregator.handle_trade(buy);
2437 aggregator.handle_trade(sell);
2438 aggregator.handle_trade(sell);
2439
2440 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2441 assert_eq!(handler_guard.len(), 2);
2442 }
2443
2444 #[rstest]
2445 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2446 let instrument = InstrumentAny::Equity(equity_aapl);
2447 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2448 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2449 let handler = Arc::new(Mutex::new(Vec::new()));
2450 let handler_clone = Arc::clone(&handler);
2451
2452 let mut aggregator = TickRunsBarAggregator::new(
2453 bar_type,
2454 instrument.price_precision(),
2455 instrument.size_precision(),
2456 move |bar: Bar| {
2457 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2458 handler_guard.push(bar);
2459 },
2460 );
2461
2462 let buy = TradeTick {
2463 size: Quantity::from(1),
2464 ..TradeTick::default()
2465 };
2466 let sell = TradeTick {
2467 aggressor_side: AggressorSide::Seller,
2468 size: Quantity::from(1),
2469 ..buy
2470 };
2471
2472 aggregator.handle_trade(buy);
2473 aggregator.handle_trade(buy);
2474 aggregator.handle_trade(sell);
2475 aggregator.handle_trade(sell);
2476
2477 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2478 assert_eq!(handler_guard.len(), 2);
2479 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2480 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2481 }
2482
2483 #[rstest]
2484 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2485 let instrument = InstrumentAny::Equity(equity_aapl);
2486 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2487 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2488 let handler = Arc::new(Mutex::new(Vec::new()));
2489 let handler_clone = Arc::clone(&handler);
2490
2491 let mut aggregator = VolumeBarAggregator::new(
2492 bar_type,
2493 instrument.price_precision(),
2494 instrument.size_precision(),
2495 move |bar: Bar| {
2496 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2497 handler_guard.push(bar);
2498 },
2499 );
2500
2501 aggregator.update(
2502 Price::from("1.00001"),
2503 Quantity::from(25),
2504 UnixNanos::default(),
2505 );
2506
2507 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2508 assert_eq!(handler_guard.len(), 2);
2509 let bar1 = &handler_guard[0];
2510 assert_eq!(bar1.volume, Quantity::from(10));
2511 let bar2 = &handler_guard[1];
2512 assert_eq!(bar2.volume, Quantity::from(10));
2513 }
2514
2515 #[rstest]
2516 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2517 let instrument = InstrumentAny::Equity(equity_aapl);
2518 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2519 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2520 let handler = Arc::new(Mutex::new(Vec::new()));
2521 let handler_clone = Arc::clone(&handler);
2522
2523 let mut aggregator = VolumeRunsBarAggregator::new(
2524 bar_type,
2525 instrument.price_precision(),
2526 instrument.size_precision(),
2527 move |bar: Bar| {
2528 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2529 handler_guard.push(bar);
2530 },
2531 );
2532
2533 let buy = TradeTick {
2534 instrument_id: instrument.id(),
2535 price: Price::from("1.0"),
2536 size: Quantity::from(1),
2537 ..TradeTick::default()
2538 };
2539 let sell = TradeTick {
2540 aggressor_side: AggressorSide::Seller,
2541 ..buy
2542 };
2543
2544 aggregator.handle_trade(buy);
2545 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
2547 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2550 assert!(handler_guard.len() >= 2);
2551 assert!(
2552 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2553 < f64::EPSILON
2554 );
2555 }
2556
2557 #[rstest]
2558 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2559 let instrument = InstrumentAny::Equity(equity_aapl);
2560 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2561 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2562 let handler = Arc::new(Mutex::new(Vec::new()));
2563 let handler_clone = Arc::clone(&handler);
2564
2565 let mut aggregator = VolumeRunsBarAggregator::new(
2566 bar_type,
2567 instrument.price_precision(),
2568 instrument.size_precision(),
2569 move |bar: Bar| {
2570 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2571 handler_guard.push(bar);
2572 },
2573 );
2574
2575 let trade = TradeTick {
2576 instrument_id: instrument.id(),
2577 price: Price::from("1.0"),
2578 size: Quantity::from(5),
2579 ..TradeTick::default()
2580 };
2581
2582 aggregator.handle_trade(trade);
2583
2584 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2585 assert!(!handler_guard.is_empty());
2586 assert!(handler_guard[0].volume.as_f64() > 0.0);
2587 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2588 }
2589
2590 #[rstest]
2591 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2592 let instrument = InstrumentAny::Equity(equity_aapl);
2593 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2594 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2595 let handler = Arc::new(Mutex::new(Vec::new()));
2596 let handler_clone = Arc::clone(&handler);
2597
2598 let mut aggregator = VolumeImbalanceBarAggregator::new(
2599 bar_type,
2600 instrument.price_precision(),
2601 instrument.size_precision(),
2602 move |bar: Bar| {
2603 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2604 handler_guard.push(bar);
2605 },
2606 );
2607
2608 let trade_small = TradeTick {
2609 instrument_id: instrument.id(),
2610 price: Price::from("1.0"),
2611 size: Quantity::from(1),
2612 ..TradeTick::default()
2613 };
2614 let trade_large = TradeTick {
2615 size: Quantity::from(3),
2616 ..trade_small
2617 };
2618
2619 aggregator.handle_trade(trade_small);
2620 aggregator.handle_trade(trade_large);
2621
2622 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2623 assert_eq!(handler_guard.len(), 2);
2624 let total_output = handler_guard
2625 .iter()
2626 .map(|bar| bar.volume.as_f64())
2627 .sum::<f64>();
2628 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2629 assert!((total_output - total_input).abs() < f64::EPSILON);
2630 }
2631
2632 #[rstest]
2633 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2634 let instrument = InstrumentAny::Equity(equity_aapl);
2635 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2637 let handler = Arc::new(Mutex::new(Vec::new()));
2638 let handler_clone = Arc::clone(&handler);
2639
2640 let mut aggregator = ValueBarAggregator::new(
2641 bar_type,
2642 instrument.price_precision(),
2643 instrument.size_precision(),
2644 move |bar: Bar| {
2645 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2646 handler_guard.push(bar);
2647 },
2648 );
2649
2650 aggregator.update(
2652 Price::from("100.00"),
2653 Quantity::from(5),
2654 UnixNanos::default(),
2655 );
2656 aggregator.update(
2657 Price::from("100.00"),
2658 Quantity::from(5),
2659 UnixNanos::from(1000),
2660 );
2661
2662 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2663 assert_eq!(handler_guard.len(), 1);
2664 let bar = handler_guard.first().unwrap();
2665 assert_eq!(bar.volume, Quantity::from(10));
2666 }
2667
2668 #[rstest]
2669 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2670 let instrument = InstrumentAny::Equity(equity_aapl);
2671 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2672 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2673 let handler = Arc::new(Mutex::new(Vec::new()));
2674 let handler_clone = Arc::clone(&handler);
2675
2676 let mut aggregator = ValueBarAggregator::new(
2677 bar_type,
2678 instrument.price_precision(),
2679 instrument.size_precision(),
2680 move |bar: Bar| {
2681 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2682 handler_guard.push(bar);
2683 },
2684 );
2685
2686 aggregator.update(
2688 Price::from("100.00"),
2689 Quantity::from(25),
2690 UnixNanos::default(),
2691 );
2692
2693 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2694 assert_eq!(handler_guard.len(), 2);
2695 let remaining_value = aggregator.get_cumulative_value();
2696 assert!(remaining_value < 1000.0); }
2698
2699 #[rstest]
2700 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2701 let instrument = InstrumentAny::Equity(equity_aapl);
2702 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2703 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2704 let handler = Arc::new(Mutex::new(Vec::new()));
2705 let handler_clone = Arc::clone(&handler);
2706
2707 let mut aggregator = ValueBarAggregator::new(
2708 bar_type,
2709 instrument.price_precision(),
2710 instrument.size_precision(),
2711 move |bar: Bar| {
2712 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2713 handler_guard.push(bar);
2714 },
2715 );
2716
2717 aggregator.update(
2719 Price::from("0.00"),
2720 Quantity::from(100),
2721 UnixNanos::default(),
2722 );
2723
2724 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2726 assert_eq!(handler_guard.len(), 0);
2727
2728 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2730 }
2731
2732 #[rstest]
2733 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2734 let instrument = InstrumentAny::Equity(equity_aapl);
2735 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2736 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2737 let handler = Arc::new(Mutex::new(Vec::new()));
2738 let handler_clone = Arc::clone(&handler);
2739
2740 let mut aggregator = ValueBarAggregator::new(
2741 bar_type,
2742 instrument.price_precision(),
2743 instrument.size_precision(),
2744 move |bar: Bar| {
2745 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2746 handler_guard.push(bar);
2747 },
2748 );
2749
2750 aggregator.update(
2752 Price::from("100.00"),
2753 Quantity::from(0),
2754 UnixNanos::default(),
2755 );
2756
2757 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2759 assert_eq!(handler_guard.len(), 0);
2760
2761 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2763 }
2764
2765 #[rstest]
2766 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2767 let instrument = InstrumentAny::Equity(equity_aapl);
2768 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2769 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2770 let handler = Arc::new(Mutex::new(Vec::new()));
2771 let handler_clone = Arc::clone(&handler);
2772
2773 let mut aggregator = ValueImbalanceBarAggregator::new(
2774 bar_type,
2775 instrument.price_precision(),
2776 instrument.size_precision(),
2777 move |bar: Bar| {
2778 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2779 handler_guard.push(bar);
2780 },
2781 );
2782
2783 let buy = TradeTick {
2784 price: Price::from("5.0"),
2785 size: Quantity::from(2), instrument_id: instrument.id(),
2787 ..TradeTick::default()
2788 };
2789 let sell = TradeTick {
2790 price: Price::from("5.0"),
2791 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
2793 instrument_id: instrument.id(),
2794 ..buy
2795 };
2796
2797 aggregator.handle_trade(buy);
2798 aggregator.handle_trade(sell);
2799
2800 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2801 assert_eq!(handler_guard.len(), 2);
2802 }
2803
2804 #[rstest]
2805 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2806 let instrument = InstrumentAny::Equity(equity_aapl);
2807 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2808 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2809 let handler = Arc::new(Mutex::new(Vec::new()));
2810 let handler_clone = Arc::clone(&handler);
2811
2812 let mut aggregator = ValueRunsBarAggregator::new(
2813 bar_type,
2814 instrument.price_precision(),
2815 instrument.size_precision(),
2816 move |bar: Bar| {
2817 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2818 handler_guard.push(bar);
2819 },
2820 );
2821
2822 let trade = TradeTick {
2823 price: Price::from("10.0"),
2824 size: Quantity::from(5),
2825 instrument_id: instrument.id(),
2826 ..TradeTick::default()
2827 };
2828
2829 aggregator.handle_trade(trade);
2830 aggregator.handle_trade(trade);
2831
2832 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2833 assert_eq!(handler_guard.len(), 1);
2834 let bar = handler_guard.first().unwrap();
2835 assert_eq!(bar.volume, Quantity::from(10));
2836 }
2837
2838 #[rstest]
2839 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2840 let instrument = InstrumentAny::Equity(equity_aapl);
2841 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2842 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2843 let handler = Arc::new(Mutex::new(Vec::new()));
2844 let handler_clone = Arc::clone(&handler);
2845
2846 let mut aggregator = ValueRunsBarAggregator::new(
2847 bar_type,
2848 instrument.price_precision(),
2849 instrument.size_precision(),
2850 move |bar: Bar| {
2851 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2852 handler_guard.push(bar);
2853 },
2854 );
2855
2856 let buy = TradeTick {
2857 price: Price::from("10.0"),
2858 size: Quantity::from(5),
2859 instrument_id: instrument.id(),
2860 ..TradeTick::default()
2861 }; let sell = TradeTick {
2863 price: Price::from("10.0"),
2864 size: Quantity::from(10),
2865 aggressor_side: AggressorSide::Seller,
2866 ..buy
2867 }; aggregator.handle_trade(buy);
2870 aggregator.handle_trade(sell);
2871
2872 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2873 assert_eq!(handler_guard.len(), 1);
2874 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2875 }
2876
2877 #[rstest]
2878 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2879 let instrument = InstrumentAny::Equity(equity_aapl);
2880 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2881 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2882 let handler = Arc::new(Mutex::new(Vec::new()));
2883 let handler_clone = Arc::clone(&handler);
2884
2885 let mut aggregator = TickRunsBarAggregator::new(
2886 bar_type,
2887 instrument.price_precision(),
2888 instrument.size_precision(),
2889 move |bar: Bar| {
2890 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2891 handler_guard.push(bar);
2892 },
2893 );
2894
2895 let buy = TradeTick::default();
2896
2897 aggregator.handle_trade(buy);
2898 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2903 assert_eq!(handler_guard.len(), 2);
2904 }
2905
2906 #[rstest]
2907 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2908 let instrument = InstrumentAny::Equity(equity_aapl);
2909 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2910 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2911 let handler = Arc::new(Mutex::new(Vec::new()));
2912 let handler_clone = Arc::clone(&handler);
2913
2914 let mut aggregator = TickRunsBarAggregator::new(
2915 bar_type,
2916 instrument.price_precision(),
2917 instrument.size_precision(),
2918 move |bar: Bar| {
2919 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2920 handler_guard.push(bar);
2921 },
2922 );
2923
2924 let buy = TradeTick::default();
2925 let no_aggressor = TradeTick {
2926 aggressor_side: AggressorSide::NoAggressor,
2927 ..buy
2928 };
2929
2930 aggregator.handle_trade(buy);
2931 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2936 assert_eq!(handler_guard.len(), 1);
2937 }
2938
2939 #[rstest]
2940 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2941 let instrument = InstrumentAny::Equity(equity_aapl);
2942 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2943 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2944 let handler = Arc::new(Mutex::new(Vec::new()));
2945 let handler_clone = Arc::clone(&handler);
2946
2947 let mut aggregator = VolumeRunsBarAggregator::new(
2948 bar_type,
2949 instrument.price_precision(),
2950 instrument.size_precision(),
2951 move |bar: Bar| {
2952 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2953 handler_guard.push(bar);
2954 },
2955 );
2956
2957 let buy = TradeTick {
2958 instrument_id: instrument.id(),
2959 price: Price::from("1.0"),
2960 size: Quantity::from(1),
2961 ..TradeTick::default()
2962 };
2963
2964 aggregator.handle_trade(buy);
2965 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2970 assert_eq!(handler_guard.len(), 2);
2971 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2972 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2973 }
2974
2975 #[rstest]
2976 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2977 let instrument = InstrumentAny::Equity(equity_aapl);
2978 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2979 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2980 let handler = Arc::new(Mutex::new(Vec::new()));
2981 let handler_clone = Arc::clone(&handler);
2982
2983 let mut aggregator = ValueRunsBarAggregator::new(
2984 bar_type,
2985 instrument.price_precision(),
2986 instrument.size_precision(),
2987 move |bar: Bar| {
2988 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2989 handler_guard.push(bar);
2990 },
2991 );
2992
2993 let buy = TradeTick {
2994 instrument_id: instrument.id(),
2995 price: Price::from("10.0"),
2996 size: Quantity::from(5),
2997 ..TradeTick::default()
2998 }; aggregator.handle_trade(buy);
3001 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3006 assert_eq!(handler_guard.len(), 2);
3007 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3008 assert_eq!(handler_guard[1].volume, Quantity::from(10));
3009 }
3010
3011 #[rstest]
3012 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
3013 let instrument = InstrumentAny::Equity(equity_aapl);
3014 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3016 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3017 let handler = Arc::new(Mutex::new(Vec::new()));
3018 let handler_clone = Arc::clone(&handler);
3019 let clock = Rc::new(RefCell::new(TestClock::new()));
3020
3021 let mut aggregator = TimeBarAggregator::new(
3022 bar_type,
3023 instrument.price_precision(),
3024 instrument.size_precision(),
3025 clock.clone(),
3026 move |bar: Bar| {
3027 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3028 handler_guard.push(bar);
3029 },
3030 true, false, BarIntervalType::LeftOpen,
3033 None, 15, false, );
3037
3038 aggregator.update(
3039 Price::from("100.00"),
3040 Quantity::from(1),
3041 UnixNanos::default(),
3042 );
3043
3044 let next_sec = UnixNanos::from(1_000_000_000);
3045 clock.borrow_mut().set_time(next_sec);
3046
3047 let event = TimeEvent::new(
3048 Ustr::from("1-SECOND-LAST"),
3049 UUID4::new(),
3050 next_sec,
3051 next_sec,
3052 );
3053 aggregator.build_bar(event);
3054
3055 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3056 assert_eq!(handler_guard.len(), 1);
3057 let bar = handler_guard.first().unwrap();
3058 assert_eq!(bar.ts_event, UnixNanos::default());
3059 assert_eq!(bar.ts_init, next_sec);
3060 }
3061
3062 #[rstest]
3063 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3064 let instrument = InstrumentAny::Equity(equity_aapl);
3065 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3066 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3067 let handler = Arc::new(Mutex::new(Vec::new()));
3068 let handler_clone = Arc::clone(&handler);
3069 let clock = Rc::new(RefCell::new(TestClock::new()));
3070
3071 let mut aggregator = TimeBarAggregator::new(
3072 bar_type,
3073 instrument.price_precision(),
3074 instrument.size_precision(),
3075 clock.clone(),
3076 move |bar: Bar| {
3077 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3078 handler_guard.push(bar);
3079 },
3080 true, true, BarIntervalType::LeftOpen,
3083 None,
3084 15,
3085 false, );
3087
3088 aggregator.update(
3090 Price::from("100.00"),
3091 Quantity::from(1),
3092 UnixNanos::default(),
3093 );
3094
3095 let ts1 = UnixNanos::from(1_000_000_000);
3097 clock.borrow_mut().set_time(ts1);
3098 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3099 aggregator.build_bar(event);
3100
3101 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3103
3104 let ts2 = UnixNanos::from(2_000_000_000);
3106 clock.borrow_mut().set_time(ts2);
3107 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3108 aggregator.build_bar(event);
3109
3110 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3111 assert_eq!(handler_guard.len(), 2);
3112
3113 let bar1 = &handler_guard[0];
3114 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
3116 assert_eq!(bar1.close, Price::from("100.00"));
3117 let bar2 = &handler_guard[1];
3118 assert_eq!(bar2.ts_event, ts2);
3119 assert_eq!(bar2.ts_init, ts2);
3120 assert_eq!(bar2.close, Price::from("101.00"));
3121 }
3122
3123 #[rstest]
3124 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3125 let instrument = InstrumentAny::Equity(equity_aapl);
3126 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3127 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3128 let handler = Arc::new(Mutex::new(Vec::new()));
3129 let handler_clone = Arc::clone(&handler);
3130 let clock = Rc::new(RefCell::new(TestClock::new()));
3131 let mut aggregator = TimeBarAggregator::new(
3132 bar_type,
3133 instrument.price_precision(),
3134 instrument.size_precision(),
3135 clock.clone(),
3136 move |bar: Bar| {
3137 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3138 handler_guard.push(bar);
3139 },
3140 true, true, BarIntervalType::RightOpen,
3143 None,
3144 15,
3145 false, );
3147
3148 aggregator.update(
3150 Price::from("100.00"),
3151 Quantity::from(1),
3152 UnixNanos::default(),
3153 );
3154
3155 let ts1 = UnixNanos::from(1_000_000_000);
3157 clock.borrow_mut().set_time(ts1);
3158 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3159 aggregator.build_bar(event);
3160
3161 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3163
3164 let ts2 = UnixNanos::from(2_000_000_000);
3166 clock.borrow_mut().set_time(ts2);
3167 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3168 aggregator.build_bar(event);
3169
3170 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3171 assert_eq!(handler_guard.len(), 2);
3172
3173 let bar1 = &handler_guard[0];
3174 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
3176 assert_eq!(bar1.close, Price::from("100.00"));
3177
3178 let bar2 = &handler_guard[1];
3179 assert_eq!(bar2.ts_event, ts1);
3180 assert_eq!(bar2.ts_init, ts2);
3181 assert_eq!(bar2.close, Price::from("101.00"));
3182 }
3183
3184 #[rstest]
3185 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3186 let instrument = InstrumentAny::Equity(equity_aapl);
3187 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3188 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3189 let handler = Arc::new(Mutex::new(Vec::new()));
3190 let handler_clone = Arc::clone(&handler);
3191 let clock = Rc::new(RefCell::new(TestClock::new()));
3192
3193 let mut aggregator = TimeBarAggregator::new(
3195 bar_type,
3196 instrument.price_precision(),
3197 instrument.size_precision(),
3198 clock.clone(),
3199 move |bar: Bar| {
3200 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3201 handler_guard.push(bar);
3202 },
3203 false, true, BarIntervalType::LeftOpen,
3206 None,
3207 15,
3208 false, );
3210
3211 let ts1 = UnixNanos::from(1_000_000_000);
3213 clock.borrow_mut().set_time(ts1);
3214 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3215 aggregator.build_bar(event);
3216
3217 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3218 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
3220
3221 let handler = Arc::new(Mutex::new(Vec::new()));
3223 let handler_clone = Arc::clone(&handler);
3224 let mut aggregator = TimeBarAggregator::new(
3225 bar_type,
3226 instrument.price_precision(),
3227 instrument.size_precision(),
3228 clock.clone(),
3229 move |bar: Bar| {
3230 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3231 handler_guard.push(bar);
3232 },
3233 true, true, BarIntervalType::LeftOpen,
3236 None,
3237 15,
3238 false, );
3240
3241 aggregator.update(
3242 Price::from("100.00"),
3243 Quantity::from(1),
3244 UnixNanos::default(),
3245 );
3246
3247 let ts1 = UnixNanos::from(1_000_000_000);
3249 clock.borrow_mut().set_time(ts1);
3250 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3251 aggregator.build_bar(event);
3252
3253 let ts2 = UnixNanos::from(2_000_000_000);
3255 clock.borrow_mut().set_time(ts2);
3256 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3257 aggregator.build_bar(event);
3258
3259 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3260 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
3262 assert_eq!(bar1.close, Price::from("100.00"));
3263 let bar2 = &handler_guard[1];
3264 assert_eq!(bar2.close, Price::from("100.00")); }
3266
3267 #[rstest]
3268 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3269 let instrument = InstrumentAny::Equity(equity_aapl);
3270 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3271 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3272 let clock = Rc::new(RefCell::new(TestClock::new()));
3273 let handler = Arc::new(Mutex::new(Vec::new()));
3274 let handler_clone = Arc::clone(&handler);
3275
3276 let mut aggregator = TimeBarAggregator::new(
3277 bar_type,
3278 instrument.price_precision(),
3279 instrument.size_precision(),
3280 clock.clone(),
3281 move |bar: Bar| {
3282 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3283 handler_guard.push(bar);
3284 },
3285 true, true, BarIntervalType::RightOpen,
3288 None,
3289 15,
3290 false, );
3292
3293 let ts1 = UnixNanos::from(1_000_000_000);
3294 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3295
3296 let ts2 = UnixNanos::from(2_000_000_000);
3297 clock.borrow_mut().set_time(ts2);
3298
3299 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3301 aggregator.build_bar(event);
3302
3303 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3304 let bar = handler_guard.first().unwrap();
3305 assert_eq!(bar.ts_event, UnixNanos::default());
3306 assert_eq!(bar.ts_init, ts2);
3307 }
3308
3309 #[rstest]
3310 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3311 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3312 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3314 let handler = Arc::new(Mutex::new(Vec::new()));
3315 let handler_clone = Arc::clone(&handler);
3316
3317 let aggregator = RenkoBarAggregator::new(
3318 bar_type,
3319 instrument.price_precision(),
3320 instrument.size_precision(),
3321 instrument.price_increment(),
3322 move |bar: Bar| {
3323 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3324 handler_guard.push(bar);
3325 },
3326 );
3327
3328 assert_eq!(aggregator.bar_type(), bar_type);
3329 assert!(!aggregator.is_running());
3330 let expected_brick_size = 10 * instrument.price_increment().raw;
3332 assert_eq!(aggregator.brick_size, expected_brick_size);
3333 }
3334
3335 #[rstest]
3336 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3337 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3338 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3340 let handler = Arc::new(Mutex::new(Vec::new()));
3341 let handler_clone = Arc::clone(&handler);
3342
3343 let mut aggregator = RenkoBarAggregator::new(
3344 bar_type,
3345 instrument.price_precision(),
3346 instrument.size_precision(),
3347 instrument.price_increment(),
3348 move |bar: Bar| {
3349 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3350 handler_guard.push(bar);
3351 },
3352 );
3353
3354 aggregator.update(
3356 Price::from("1.00000"),
3357 Quantity::from(1),
3358 UnixNanos::default(),
3359 );
3360 aggregator.update(
3361 Price::from("1.00005"),
3362 Quantity::from(1),
3363 UnixNanos::from(1000),
3364 );
3365
3366 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3367 assert_eq!(handler_guard.len(), 0); }
3369
3370 #[rstest]
3371 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3372 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3373 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3375 let handler = Arc::new(Mutex::new(Vec::new()));
3376 let handler_clone = Arc::clone(&handler);
3377
3378 let mut aggregator = RenkoBarAggregator::new(
3379 bar_type,
3380 instrument.price_precision(),
3381 instrument.size_precision(),
3382 instrument.price_increment(),
3383 move |bar: Bar| {
3384 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3385 handler_guard.push(bar);
3386 },
3387 );
3388
3389 aggregator.update(
3391 Price::from("1.00000"),
3392 Quantity::from(1),
3393 UnixNanos::default(),
3394 );
3395 aggregator.update(
3396 Price::from("1.00015"),
3397 Quantity::from(1),
3398 UnixNanos::from(1000),
3399 );
3400
3401 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3402 assert_eq!(handler_guard.len(), 1);
3403
3404 let bar = handler_guard.first().unwrap();
3405 assert_eq!(bar.open, Price::from("1.00000"));
3406 assert_eq!(bar.high, Price::from("1.00010"));
3407 assert_eq!(bar.low, Price::from("1.00000"));
3408 assert_eq!(bar.close, Price::from("1.00010"));
3409 assert_eq!(bar.volume, Quantity::from(2));
3410 assert_eq!(bar.ts_event, UnixNanos::from(1000));
3411 assert_eq!(bar.ts_init, UnixNanos::from(1000));
3412 }
3413
3414 #[rstest]
3415 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3416 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3417 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3419 let handler = Arc::new(Mutex::new(Vec::new()));
3420 let handler_clone = Arc::clone(&handler);
3421
3422 let mut aggregator = RenkoBarAggregator::new(
3423 bar_type,
3424 instrument.price_precision(),
3425 instrument.size_precision(),
3426 instrument.price_increment(),
3427 move |bar: Bar| {
3428 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3429 handler_guard.push(bar);
3430 },
3431 );
3432
3433 aggregator.update(
3435 Price::from("1.00000"),
3436 Quantity::from(1),
3437 UnixNanos::default(),
3438 );
3439 aggregator.update(
3440 Price::from("1.00025"),
3441 Quantity::from(1),
3442 UnixNanos::from(1000),
3443 );
3444
3445 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3446 assert_eq!(handler_guard.len(), 2);
3447
3448 let bar1 = &handler_guard[0];
3449 assert_eq!(bar1.open, Price::from("1.00000"));
3450 assert_eq!(bar1.high, Price::from("1.00010"));
3451 assert_eq!(bar1.low, Price::from("1.00000"));
3452 assert_eq!(bar1.close, Price::from("1.00010"));
3453
3454 let bar2 = &handler_guard[1];
3455 assert_eq!(bar2.open, Price::from("1.00010"));
3456 assert_eq!(bar2.high, Price::from("1.00020"));
3457 assert_eq!(bar2.low, Price::from("1.00010"));
3458 assert_eq!(bar2.close, Price::from("1.00020"));
3459 }
3460
3461 #[rstest]
3462 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3463 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3464 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3466 let handler = Arc::new(Mutex::new(Vec::new()));
3467 let handler_clone = Arc::clone(&handler);
3468
3469 let mut aggregator = RenkoBarAggregator::new(
3470 bar_type,
3471 instrument.price_precision(),
3472 instrument.size_precision(),
3473 instrument.price_increment(),
3474 move |bar: Bar| {
3475 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3476 handler_guard.push(bar);
3477 },
3478 );
3479
3480 aggregator.update(
3482 Price::from("1.00020"),
3483 Quantity::from(1),
3484 UnixNanos::default(),
3485 );
3486 aggregator.update(
3487 Price::from("1.00005"),
3488 Quantity::from(1),
3489 UnixNanos::from(1000),
3490 );
3491
3492 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3493 assert_eq!(handler_guard.len(), 1);
3494
3495 let bar = handler_guard.first().unwrap();
3496 assert_eq!(bar.open, Price::from("1.00020"));
3497 assert_eq!(bar.high, Price::from("1.00020"));
3498 assert_eq!(bar.low, Price::from("1.00010"));
3499 assert_eq!(bar.close, Price::from("1.00010"));
3500 assert_eq!(bar.volume, Quantity::from(2));
3501 }
3502
3503 #[rstest]
3504 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3505 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3506 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3508 let handler = Arc::new(Mutex::new(Vec::new()));
3509 let handler_clone = Arc::clone(&handler);
3510
3511 let mut aggregator = RenkoBarAggregator::new(
3512 bar_type,
3513 instrument.price_precision(),
3514 instrument.size_precision(),
3515 instrument.price_increment(),
3516 move |bar: Bar| {
3517 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3518 handler_guard.push(bar);
3519 },
3520 );
3521
3522 let input_bar = Bar::new(
3524 BarType::new(
3525 instrument.id(),
3526 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3527 AggregationSource::Internal,
3528 ),
3529 Price::from("1.00000"),
3530 Price::from("1.00005"),
3531 Price::from("0.99995"),
3532 Price::from("1.00005"), Quantity::from(100),
3534 UnixNanos::default(),
3535 UnixNanos::from(1000),
3536 );
3537
3538 aggregator.handle_bar(input_bar);
3539
3540 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3541 assert_eq!(handler_guard.len(), 0); }
3543
3544 #[rstest]
3545 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3546 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3547 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3549 let handler = Arc::new(Mutex::new(Vec::new()));
3550 let handler_clone = Arc::clone(&handler);
3551
3552 let mut aggregator = RenkoBarAggregator::new(
3553 bar_type,
3554 instrument.price_precision(),
3555 instrument.size_precision(),
3556 instrument.price_increment(),
3557 move |bar: Bar| {
3558 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3559 handler_guard.push(bar);
3560 },
3561 );
3562
3563 let bar1 = Bar::new(
3565 BarType::new(
3566 instrument.id(),
3567 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3568 AggregationSource::Internal,
3569 ),
3570 Price::from("1.00000"),
3571 Price::from("1.00005"),
3572 Price::from("0.99995"),
3573 Price::from("1.00000"),
3574 Quantity::from(100),
3575 UnixNanos::default(),
3576 UnixNanos::default(),
3577 );
3578
3579 let bar2 = Bar::new(
3581 BarType::new(
3582 instrument.id(),
3583 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3584 AggregationSource::Internal,
3585 ),
3586 Price::from("1.00000"),
3587 Price::from("1.00015"),
3588 Price::from("0.99995"),
3589 Price::from("1.00010"), Quantity::from(50),
3591 UnixNanos::from(60_000_000_000),
3592 UnixNanos::from(60_000_000_000),
3593 );
3594
3595 aggregator.handle_bar(bar1);
3596 aggregator.handle_bar(bar2);
3597
3598 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3599 assert_eq!(handler_guard.len(), 1);
3600
3601 let bar = handler_guard.first().unwrap();
3602 assert_eq!(bar.open, Price::from("1.00000"));
3603 assert_eq!(bar.high, Price::from("1.00010"));
3604 assert_eq!(bar.low, Price::from("1.00000"));
3605 assert_eq!(bar.close, Price::from("1.00010"));
3606 assert_eq!(bar.volume, Quantity::from(150));
3607 }
3608
3609 #[rstest]
3610 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3611 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3612 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3614 let handler = Arc::new(Mutex::new(Vec::new()));
3615 let handler_clone = Arc::clone(&handler);
3616
3617 let mut aggregator = RenkoBarAggregator::new(
3618 bar_type,
3619 instrument.price_precision(),
3620 instrument.size_precision(),
3621 instrument.price_increment(),
3622 move |bar: Bar| {
3623 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3624 handler_guard.push(bar);
3625 },
3626 );
3627
3628 let bar1 = Bar::new(
3630 BarType::new(
3631 instrument.id(),
3632 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3633 AggregationSource::Internal,
3634 ),
3635 Price::from("1.00000"),
3636 Price::from("1.00005"),
3637 Price::from("0.99995"),
3638 Price::from("1.00000"),
3639 Quantity::from(100),
3640 UnixNanos::default(),
3641 UnixNanos::default(),
3642 );
3643
3644 let bar2 = Bar::new(
3646 BarType::new(
3647 instrument.id(),
3648 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3649 AggregationSource::Internal,
3650 ),
3651 Price::from("1.00000"),
3652 Price::from("1.00035"),
3653 Price::from("0.99995"),
3654 Price::from("1.00030"), Quantity::from(50),
3656 UnixNanos::from(60_000_000_000),
3657 UnixNanos::from(60_000_000_000),
3658 );
3659
3660 aggregator.handle_bar(bar1);
3661 aggregator.handle_bar(bar2);
3662
3663 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3664 assert_eq!(handler_guard.len(), 3);
3665
3666 let bar1 = &handler_guard[0];
3667 assert_eq!(bar1.open, Price::from("1.00000"));
3668 assert_eq!(bar1.close, Price::from("1.00010"));
3669
3670 let bar2 = &handler_guard[1];
3671 assert_eq!(bar2.open, Price::from("1.00010"));
3672 assert_eq!(bar2.close, Price::from("1.00020"));
3673
3674 let bar3 = &handler_guard[2];
3675 assert_eq!(bar3.open, Price::from("1.00020"));
3676 assert_eq!(bar3.close, Price::from("1.00030"));
3677 }
3678
3679 #[rstest]
3680 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3681 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3682 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3684 let handler = Arc::new(Mutex::new(Vec::new()));
3685 let handler_clone = Arc::clone(&handler);
3686
3687 let mut aggregator = RenkoBarAggregator::new(
3688 bar_type,
3689 instrument.price_precision(),
3690 instrument.size_precision(),
3691 instrument.price_increment(),
3692 move |bar: Bar| {
3693 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3694 handler_guard.push(bar);
3695 },
3696 );
3697
3698 let bar1 = Bar::new(
3700 BarType::new(
3701 instrument.id(),
3702 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3703 AggregationSource::Internal,
3704 ),
3705 Price::from("1.00020"),
3706 Price::from("1.00025"),
3707 Price::from("1.00015"),
3708 Price::from("1.00020"),
3709 Quantity::from(100),
3710 UnixNanos::default(),
3711 UnixNanos::default(),
3712 );
3713
3714 let bar2 = Bar::new(
3716 BarType::new(
3717 instrument.id(),
3718 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3719 AggregationSource::Internal,
3720 ),
3721 Price::from("1.00020"),
3722 Price::from("1.00025"),
3723 Price::from("1.00005"),
3724 Price::from("1.00010"), Quantity::from(50),
3726 UnixNanos::from(60_000_000_000),
3727 UnixNanos::from(60_000_000_000),
3728 );
3729
3730 aggregator.handle_bar(bar1);
3731 aggregator.handle_bar(bar2);
3732
3733 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3734 assert_eq!(handler_guard.len(), 1);
3735
3736 let bar = handler_guard.first().unwrap();
3737 assert_eq!(bar.open, Price::from("1.00020"));
3738 assert_eq!(bar.high, Price::from("1.00020"));
3739 assert_eq!(bar.low, Price::from("1.00010"));
3740 assert_eq!(bar.close, Price::from("1.00010"));
3741 assert_eq!(bar.volume, Quantity::from(150));
3742 }
3743
3744 #[rstest]
3745 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3746 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3747
3748 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3751 let handler = Arc::new(Mutex::new(Vec::new()));
3752 let handler_clone = Arc::clone(&handler);
3753
3754 let aggregator_5 = RenkoBarAggregator::new(
3755 bar_type_5,
3756 instrument.price_precision(),
3757 instrument.size_precision(),
3758 instrument.price_increment(),
3759 move |_bar: Bar| {
3760 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3761 handler_guard.push(_bar);
3762 },
3763 );
3764
3765 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3767 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3768
3769 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3771 let handler2 = Arc::new(Mutex::new(Vec::new()));
3772 let handler2_clone = Arc::clone(&handler2);
3773
3774 let aggregator_20 = RenkoBarAggregator::new(
3775 bar_type_20,
3776 instrument.price_precision(),
3777 instrument.size_precision(),
3778 instrument.price_increment(),
3779 move |_bar: Bar| {
3780 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3781 handler_guard.push(_bar);
3782 },
3783 );
3784
3785 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3787 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3788 }
3789
3790 #[rstest]
3791 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3792 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3793 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3795 let handler = Arc::new(Mutex::new(Vec::new()));
3796 let handler_clone = Arc::clone(&handler);
3797
3798 let mut aggregator = RenkoBarAggregator::new(
3799 bar_type,
3800 instrument.price_precision(),
3801 instrument.size_precision(),
3802 instrument.price_increment(),
3803 move |bar: Bar| {
3804 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3805 handler_guard.push(bar);
3806 },
3807 );
3808
3809 aggregator.update(
3811 Price::from("1.00000"),
3812 Quantity::from(1),
3813 UnixNanos::from(1000),
3814 );
3815 aggregator.update(
3816 Price::from("1.00010"),
3817 Quantity::from(1),
3818 UnixNanos::from(2000),
3819 ); aggregator.update(
3821 Price::from("1.00020"),
3822 Quantity::from(1),
3823 UnixNanos::from(3000),
3824 ); aggregator.update(
3826 Price::from("1.00025"),
3827 Quantity::from(1),
3828 UnixNanos::from(4000),
3829 ); aggregator.update(
3831 Price::from("1.00030"),
3832 Quantity::from(1),
3833 UnixNanos::from(5000),
3834 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3837 assert_eq!(handler_guard.len(), 3);
3838
3839 let bar1 = &handler_guard[0];
3840 assert_eq!(bar1.open, Price::from("1.00000"));
3841 assert_eq!(bar1.close, Price::from("1.00010"));
3842
3843 let bar2 = &handler_guard[1];
3844 assert_eq!(bar2.open, Price::from("1.00010"));
3845 assert_eq!(bar2.close, Price::from("1.00020"));
3846
3847 let bar3 = &handler_guard[2];
3848 assert_eq!(bar3.open, Price::from("1.00020"));
3849 assert_eq!(bar3.close, Price::from("1.00030"));
3850 }
3851
3852 #[rstest]
3853 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3854 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3855 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3857 let handler = Arc::new(Mutex::new(Vec::new()));
3858 let handler_clone = Arc::clone(&handler);
3859
3860 let mut aggregator = RenkoBarAggregator::new(
3861 bar_type,
3862 instrument.price_precision(),
3863 instrument.size_precision(),
3864 instrument.price_increment(),
3865 move |bar: Bar| {
3866 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3867 handler_guard.push(bar);
3868 },
3869 );
3870
3871 aggregator.update(
3873 Price::from("1.00000"),
3874 Quantity::from(1),
3875 UnixNanos::from(1000),
3876 );
3877 aggregator.update(
3878 Price::from("1.00010"),
3879 Quantity::from(1),
3880 UnixNanos::from(2000),
3881 ); aggregator.update(
3883 Price::from("0.99990"),
3884 Quantity::from(1),
3885 UnixNanos::from(3000),
3886 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3889 assert_eq!(handler_guard.len(), 3);
3890
3891 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
3893 assert_eq!(bar1.high, Price::from("1.00010"));
3894 assert_eq!(bar1.low, Price::from("1.00000"));
3895 assert_eq!(bar1.close, Price::from("1.00010"));
3896
3897 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
3899 assert_eq!(bar2.high, Price::from("1.00010"));
3900 assert_eq!(bar2.low, Price::from("1.00000"));
3901 assert_eq!(bar2.close, Price::from("1.00000"));
3902
3903 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
3905 assert_eq!(bar3.high, Price::from("1.00000"));
3906 assert_eq!(bar3.low, Price::from("0.99990"));
3907 assert_eq!(bar3.close, Price::from("0.99990"));
3908 }
3909
3910 #[rstest]
3911 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3912 let instrument = InstrumentAny::Equity(equity_aapl);
3913 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3914 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3915 let handler = Arc::new(Mutex::new(Vec::new()));
3916 let handler_clone = Arc::clone(&handler);
3917
3918 let mut aggregator = TickImbalanceBarAggregator::new(
3919 bar_type,
3920 instrument.price_precision(),
3921 instrument.size_precision(),
3922 move |bar: Bar| {
3923 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3924 handler_guard.push(bar);
3925 },
3926 );
3927
3928 let buy = TradeTick {
3929 aggressor_side: AggressorSide::Buyer,
3930 ..TradeTick::default()
3931 };
3932 let sell = TradeTick {
3933 aggressor_side: AggressorSide::Seller,
3934 ..TradeTick::default()
3935 };
3936
3937 aggregator.handle_trade(buy);
3938 aggregator.handle_trade(sell);
3939 aggregator.handle_trade(buy);
3940
3941 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3942 assert_eq!(handler_guard.len(), 0);
3943 }
3944
3945 #[rstest]
3946 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3947 let instrument = InstrumentAny::Equity(equity_aapl);
3948 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3949 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3950 let handler = Arc::new(Mutex::new(Vec::new()));
3951 let handler_clone = Arc::clone(&handler);
3952
3953 let mut aggregator = TickImbalanceBarAggregator::new(
3954 bar_type,
3955 instrument.price_precision(),
3956 instrument.size_precision(),
3957 move |bar: Bar| {
3958 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3959 handler_guard.push(bar);
3960 },
3961 );
3962
3963 let buy = TradeTick {
3964 aggressor_side: AggressorSide::Buyer,
3965 ..TradeTick::default()
3966 };
3967 let no_aggressor = TradeTick {
3968 aggressor_side: AggressorSide::NoAggressor,
3969 ..TradeTick::default()
3970 };
3971
3972 aggregator.handle_trade(buy);
3973 aggregator.handle_trade(no_aggressor);
3974 aggregator.handle_trade(buy);
3975
3976 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3977 assert_eq!(handler_guard.len(), 1);
3978 }
3979
3980 #[rstest]
3981 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3982 let instrument = InstrumentAny::Equity(equity_aapl);
3983 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3984 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3985 let handler = Arc::new(Mutex::new(Vec::new()));
3986 let handler_clone = Arc::clone(&handler);
3987
3988 let mut aggregator = TickRunsBarAggregator::new(
3989 bar_type,
3990 instrument.price_precision(),
3991 instrument.size_precision(),
3992 move |bar: Bar| {
3993 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3994 handler_guard.push(bar);
3995 },
3996 );
3997
3998 let buy = TradeTick {
3999 aggressor_side: AggressorSide::Buyer,
4000 ..TradeTick::default()
4001 };
4002 let sell = TradeTick {
4003 aggressor_side: AggressorSide::Seller,
4004 ..TradeTick::default()
4005 };
4006
4007 aggregator.handle_trade(buy);
4008 aggregator.handle_trade(buy);
4009 aggregator.handle_trade(sell);
4010 aggregator.handle_trade(sell);
4011
4012 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4013 assert_eq!(handler_guard.len(), 2);
4014 }
4015
4016 #[rstest]
4017 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4018 let instrument = InstrumentAny::Equity(equity_aapl);
4019 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4020 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4021 let handler = Arc::new(Mutex::new(Vec::new()));
4022 let handler_clone = Arc::clone(&handler);
4023
4024 let mut aggregator = VolumeImbalanceBarAggregator::new(
4025 bar_type,
4026 instrument.price_precision(),
4027 instrument.size_precision(),
4028 move |bar: Bar| {
4029 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4030 handler_guard.push(bar);
4031 },
4032 );
4033
4034 let large_trade = TradeTick {
4035 size: Quantity::from(25),
4036 aggressor_side: AggressorSide::Buyer,
4037 ..TradeTick::default()
4038 };
4039
4040 aggregator.handle_trade(large_trade);
4041
4042 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4043 assert_eq!(handler_guard.len(), 2);
4044 }
4045
4046 #[rstest]
4047 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4048 equity_aapl: Equity,
4049 ) {
4050 let instrument = InstrumentAny::Equity(equity_aapl);
4051 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4052 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4053 let handler = Arc::new(Mutex::new(Vec::new()));
4054 let handler_clone = Arc::clone(&handler);
4055
4056 let mut aggregator = VolumeImbalanceBarAggregator::new(
4057 bar_type,
4058 instrument.price_precision(),
4059 instrument.size_precision(),
4060 move |bar: Bar| {
4061 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4062 handler_guard.push(bar);
4063 },
4064 );
4065
4066 let buy = TradeTick {
4067 size: Quantity::from(5),
4068 aggressor_side: AggressorSide::Buyer,
4069 ..TradeTick::default()
4070 };
4071 let no_aggressor = TradeTick {
4072 size: Quantity::from(3),
4073 aggressor_side: AggressorSide::NoAggressor,
4074 ..TradeTick::default()
4075 };
4076
4077 aggregator.handle_trade(buy);
4078 aggregator.handle_trade(no_aggressor);
4079 aggregator.handle_trade(buy);
4080
4081 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4082 assert_eq!(handler_guard.len(), 1);
4083 }
4084
4085 #[rstest]
4086 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4087 let instrument = InstrumentAny::Equity(equity_aapl);
4088 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4089 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4090 let handler = Arc::new(Mutex::new(Vec::new()));
4091 let handler_clone = Arc::clone(&handler);
4092
4093 let mut aggregator = VolumeRunsBarAggregator::new(
4094 bar_type,
4095 instrument.price_precision(),
4096 instrument.size_precision(),
4097 move |bar: Bar| {
4098 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4099 handler_guard.push(bar);
4100 },
4101 );
4102
4103 let large_trade = TradeTick {
4104 size: Quantity::from(25),
4105 aggressor_side: AggressorSide::Buyer,
4106 ..TradeTick::default()
4107 };
4108
4109 aggregator.handle_trade(large_trade);
4110
4111 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4112 assert_eq!(handler_guard.len(), 2);
4113 }
4114
4115 #[rstest]
4116 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4117 let instrument = InstrumentAny::Equity(equity_aapl);
4118 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4119 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4120 let handler = Arc::new(Mutex::new(Vec::new()));
4121 let handler_clone = Arc::clone(&handler);
4122
4123 let mut aggregator = ValueRunsBarAggregator::new(
4124 bar_type,
4125 instrument.price_precision(),
4126 instrument.size_precision(),
4127 move |bar: Bar| {
4128 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4129 handler_guard.push(bar);
4130 },
4131 );
4132
4133 let large_trade = TradeTick {
4134 price: Price::from("5.00"),
4135 size: Quantity::from(25),
4136 aggressor_side: AggressorSide::Buyer,
4137 ..TradeTick::default()
4138 };
4139
4140 aggregator.handle_trade(large_trade);
4141
4142 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4143 assert_eq!(handler_guard.len(), 2);
4144 }
4145
4146 #[rstest]
4147 fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4148 let instrument = InstrumentAny::Equity(equity_aapl);
4149 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
4150 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4151 let handler = Arc::new(Mutex::new(Vec::new()));
4152 let handler_clone = Arc::clone(&handler);
4153
4154 let mut aggregator = ValueBarAggregator::new(
4155 bar_type,
4156 instrument.price_precision(),
4157 instrument.size_precision(),
4158 move |bar: Bar| {
4159 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4160 handler_guard.push(bar);
4161 },
4162 );
4163
4164 aggregator.update(
4166 Price::from("1000.00"),
4167 Quantity::from(3),
4168 UnixNanos::default(),
4169 );
4170
4171 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4173 assert_eq!(handler_guard.len(), 3);
4174 for bar in handler_guard.iter() {
4175 assert_eq!(bar.volume, Quantity::from(1));
4176 }
4177 }
4178
4179 #[rstest]
4180 fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4181 let instrument = InstrumentAny::Equity(equity_aapl);
4182 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4183 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4184 let handler = Arc::new(Mutex::new(Vec::new()));
4185 let handler_clone = Arc::clone(&handler);
4186
4187 let mut aggregator = ValueImbalanceBarAggregator::new(
4188 bar_type,
4189 instrument.price_precision(),
4190 instrument.size_precision(),
4191 move |bar: Bar| {
4192 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4193 handler_guard.push(bar);
4194 },
4195 );
4196
4197 let trade = TradeTick {
4198 price: Price::from("1000.00"),
4199 size: Quantity::from(3),
4200 aggressor_side: AggressorSide::Buyer,
4201 instrument_id: instrument.id(),
4202 ..TradeTick::default()
4203 };
4204
4205 aggregator.handle_trade(trade);
4206
4207 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4208 assert_eq!(handler_guard.len(), 3);
4209 for bar in handler_guard.iter() {
4210 assert_eq!(bar.volume, Quantity::from(1));
4211 }
4212 }
4213
4214 #[rstest]
4215 fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
4216 let instrument = InstrumentAny::Equity(equity_aapl);
4217 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4218 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4219 let handler = Arc::new(Mutex::new(Vec::new()));
4220 let handler_clone = Arc::clone(&handler);
4221
4222 let mut aggregator = ValueImbalanceBarAggregator::new(
4223 bar_type,
4224 instrument.price_precision(),
4225 instrument.size_precision(),
4226 move |bar: Bar| {
4227 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4228 handler_guard.push(bar);
4229 },
4230 );
4231
4232 let sell_tick = TradeTick {
4234 price: Price::from("10.00"),
4235 size: Quantity::from(5),
4236 aggressor_side: AggressorSide::Seller,
4237 instrument_id: instrument.id(),
4238 ..TradeTick::default()
4239 };
4240
4241 let buy_tick = TradeTick {
4244 price: Price::from("1000.00"),
4245 size: Quantity::from(1),
4246 aggressor_side: AggressorSide::Buyer,
4247 instrument_id: instrument.id(),
4248 ts_init: UnixNanos::from(1),
4249 ts_event: UnixNanos::from(1),
4250 ..TradeTick::default()
4251 };
4252
4253 aggregator.handle_trade(sell_tick);
4254 aggregator.handle_trade(buy_tick);
4255
4256 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4257 assert_eq!(handler_guard.len(), 1);
4258 assert_eq!(handler_guard[0].volume, Quantity::from(6));
4259 }
4260
4261 #[rstest]
4262 fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4263 let instrument = InstrumentAny::Equity(equity_aapl);
4264 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4265 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4266 let handler = Arc::new(Mutex::new(Vec::new()));
4267 let handler_clone = Arc::clone(&handler);
4268
4269 let mut aggregator = ValueRunsBarAggregator::new(
4270 bar_type,
4271 instrument.price_precision(),
4272 instrument.size_precision(),
4273 move |bar: Bar| {
4274 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4275 handler_guard.push(bar);
4276 },
4277 );
4278
4279 let trade = TradeTick {
4280 price: Price::from("1000.00"),
4281 size: Quantity::from(3),
4282 aggressor_side: AggressorSide::Buyer,
4283 instrument_id: instrument.id(),
4284 ..TradeTick::default()
4285 };
4286
4287 aggregator.handle_trade(trade);
4288
4289 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4290 assert_eq!(handler_guard.len(), 3);
4291 for bar in handler_guard.iter() {
4292 assert_eq!(bar.volume, Quantity::from(1));
4293 }
4294 }
4295}