1use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25 clock::Clock,
26 timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29 UnixNanos,
30 correctness::{self, FAILED},
31 datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34 data::{
35 QuoteTick, TradeTick,
36 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37 },
38 enums::{AggregationSource, BarAggregation, BarIntervalType},
39 types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
42pub trait BarAggregator: Any + Debug {
46 fn bar_type(&self) -> BarType;
48 fn is_running(&self) -> bool;
50 fn set_await_partial(&mut self, value: bool);
51 fn set_is_running(&mut self, value: bool);
53 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
56 fn handle_quote(&mut self, quote: QuoteTick) {
58 let spec = self.bar_type().spec();
59 if !self.await_partial() {
60 self.update(
61 quote.extract_price(spec.price_type),
62 quote.extract_size(spec.price_type),
63 quote.ts_event,
64 );
65 }
66 }
67 fn handle_trade(&mut self, trade: TradeTick) {
69 if !self.await_partial() {
70 self.update(trade.price, trade.size, trade.ts_event);
71 }
72 }
73 fn handle_bar(&mut self, bar: Bar) {
75 if !self.await_partial() {
76 self.update_bar(bar, bar.volume, bar.ts_init);
77 }
78 }
79 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
80 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
82 fn stop_batch_update(&mut self);
84 fn await_partial(&self) -> bool;
86 fn set_partial(&mut self, partial_bar: Bar);
89 fn stop(&mut self) {}
91}
92
93impl dyn BarAggregator {
94 pub fn as_any(&self) -> &dyn Any {
96 self
97 }
98 pub fn as_any_mut(&mut self) -> &mut dyn Any {
100 self
101 }
102}
103
/// Accumulates open/high/low/close prices and volume for a single bar type
/// and turns the accumulated state into [`Bar`]s.
#[derive(Debug)]
pub struct BarBuilder {
    /// Bar type stamped on every bar that is built.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision used when the volume is reset to zero.
    size_precision: u8,
    /// Set once the builder has received a first update or a partial bar.
    initialized: bool,
    /// Timestamp of the most recently applied update.
    ts_last: UnixNanos,
    /// Number of updates applied since the last reset.
    count: usize,
    /// Set once a partial bar has been applied; further partial bars are ignored.
    partial_set: bool,
    /// Close price of the most recently built bar, if any.
    last_close: Option<Price>,
    /// Open price of the bar in progress.
    open: Option<Price>,
    /// Highest price of the bar in progress.
    high: Option<Price>,
    /// Lowest price of the bar in progress.
    low: Option<Price>,
    /// Latest price of the bar in progress.
    close: Option<Price>,
    /// Volume accumulated for the bar in progress.
    volume: Quantity,
}
121
122impl BarBuilder {
123 #[must_use]
131 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132 correctness::check_equal(
133 &bar_type.aggregation_source(),
134 &AggregationSource::Internal,
135 "bar_type.aggregation_source",
136 "AggregationSource::Internal",
137 )
138 .expect(FAILED);
139
140 Self {
141 bar_type,
142 price_precision,
143 size_precision,
144 initialized: false,
145 ts_last: UnixNanos::default(),
146 count: 0,
147 partial_set: false,
148 last_close: None,
149 open: None,
150 high: None,
151 low: None,
152 close: None,
153 volume: Quantity::zero(size_precision),
154 }
155 }
156
157 pub fn set_partial(&mut self, partial_bar: Bar) {
163 if self.partial_set {
164 return; }
166
167 self.open = Some(partial_bar.open);
168
169 if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170 self.high = Some(partial_bar.high);
171 }
172
173 if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174 self.low = Some(partial_bar.low);
175 }
176
177 if self.close.is_none() {
178 self.close = Some(partial_bar.close);
179 }
180
181 self.volume = partial_bar.volume;
182
183 if self.ts_last == 0 {
184 self.ts_last = partial_bar.ts_init;
185 }
186
187 self.partial_set = true;
188 self.initialized = true;
189 }
190
191 pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
197 if ts_event < self.ts_last {
198 return; }
200
201 if self.open.is_none() {
202 self.open = Some(price);
203 self.high = Some(price);
204 self.low = Some(price);
205 self.initialized = true;
206 } else {
207 if price > self.high.unwrap() {
208 self.high = Some(price);
209 }
210 if price < self.low.unwrap() {
211 self.low = Some(price);
212 }
213 }
214
215 self.close = Some(price);
216 self.volume = self.volume.add(size);
217 self.count += 1;
218 self.ts_last = ts_event;
219 }
220
221 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227 if ts_init < self.ts_last {
228 return; }
230
231 if self.open.is_none() {
232 self.open = Some(bar.open);
233 self.high = Some(bar.high);
234 self.low = Some(bar.low);
235 self.initialized = true;
236 } else {
237 if bar.high > self.high.unwrap() {
238 self.high = Some(bar.high);
239 }
240 if bar.low < self.low.unwrap() {
241 self.low = Some(bar.low);
242 }
243 }
244
245 self.close = Some(bar.close);
246 self.volume = self.volume.add(volume);
247 self.count += 1;
248 self.ts_last = ts_init;
249 }
250
251 pub fn reset(&mut self) {
255 self.open = None;
256 self.high = None;
257 self.low = None;
258 self.volume = Quantity::zero(self.size_precision);
259 self.count = 0;
260 }
261
262 pub fn build_now(&mut self) -> Bar {
264 self.build(self.ts_last, self.ts_last)
265 }
266
267 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273 if self.open.is_none() {
274 self.open = self.last_close;
275 self.high = self.last_close;
276 self.low = self.last_close;
277 self.close = self.last_close;
278 }
279
280 if let (Some(close), Some(low)) = (self.close, self.low) {
281 if close < low {
282 self.low = Some(close);
283 }
284 }
285
286 if let (Some(close), Some(high)) = (self.close, self.high) {
287 if close > high {
288 self.high = Some(close);
289 }
290 }
291
292 let bar = Bar::new(
294 self.bar_type,
295 self.open.unwrap(),
296 self.high.unwrap(),
297 self.low.unwrap(),
298 self.close.unwrap(),
299 self.volume,
300 ts_event,
301 ts_init,
302 );
303
304 self.last_close = self.close;
305 self.reset();
306 bar
307 }
308}
309
/// State and behaviour shared by the concrete bar aggregators: the bar
/// builder, the bar handlers and the control flags.
pub struct BarAggregatorCore<H>
where
    H: FnMut(Bar),
{
    /// Bar type being aggregated.
    bar_type: BarType,
    /// Builder accumulating the bar in progress.
    builder: BarBuilder,
    /// Handler invoked with built bars outside of batch mode.
    handler: H,
    /// Optional handler restored into `handler` when a batch update is stopped.
    handler_backup: Option<H>,
    /// Handler invoked with built bars while in batch mode.
    batch_handler: Option<Box<dyn FnMut(Bar)>>,
    /// Whether the aggregator is awaiting a partial bar.
    await_partial: bool,
    /// Running flag of the aggregator.
    is_running: bool,
    /// Whether a batch update is in progress.
    batch_mode: bool,
}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327 f.debug_struct(stringify!(BarAggregatorCore))
328 .field("bar_type", &self.bar_type)
329 .field("builder", &self.builder)
330 .field("await_partial", &self.await_partial)
331 .field("is_running", &self.is_running)
332 .field("batch_mode", &self.batch_mode)
333 .finish()
334 }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339 H: FnMut(Bar),
340{
341 pub fn new(
349 bar_type: BarType,
350 price_precision: u8,
351 size_precision: u8,
352 handler: H,
353 await_partial: bool,
354 ) -> Self {
355 Self {
356 bar_type,
357 builder: BarBuilder::new(bar_type, price_precision, size_precision),
358 handler,
359 handler_backup: None,
360 batch_handler: None,
361 await_partial,
362 is_running: false,
363 batch_mode: false,
364 }
365 }
366
367 pub const fn set_await_partial(&mut self, value: bool) {
369 self.await_partial = value;
370 }
371
372 pub const fn set_is_running(&mut self, value: bool) {
374 self.is_running = value;
375 }
376
377 pub const fn await_partial(&self) -> bool {
379 self.await_partial
380 }
381
382 pub fn set_partial(&mut self, partial_bar: Bar) {
384 self.builder.set_partial(partial_bar);
385 }
386
387 fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
388 self.builder.update(price, size, ts_event);
389 }
390
391 fn build_now_and_send(&mut self) {
392 let bar = self.builder.build_now();
393 (self.handler)(bar);
394 }
395
396 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397 let bar = self.builder.build(ts_event, ts_init);
398
399 if self.batch_mode {
400 if let Some(handler) = &mut self.batch_handler {
401 handler(bar);
402 }
403 } else {
404 (self.handler)(bar);
405 }
406 }
407
408 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410 self.batch_mode = true;
411 self.batch_handler = Some(handler);
412 }
413
414 pub fn stop_batch_update(&mut self) {
416 self.batch_mode = false;
417
418 if let Some(handler) = self.handler_backup.take() {
419 self.handler = handler;
420 }
421 }
422}
423
/// Aggregator that builds bars based on the number of updates applied to its builder.
pub struct TickBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
    /// Cumulative value accumulator; starts at zero.
    cum_value: f64,
}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 f.debug_struct(stringify!(TickBarAggregator))
439 .field("core", &self.core)
440 .field("cum_value", &self.cum_value)
441 .finish()
442 }
443}
444
445impl<H> TickBarAggregator<H>
446where
447 H: FnMut(Bar),
448{
449 pub fn new(
457 bar_type: BarType,
458 price_precision: u8,
459 size_precision: u8,
460 handler: H,
461 await_partial: bool,
462 ) -> Self {
463 Self {
464 core: BarAggregatorCore::new(
465 bar_type,
466 price_precision,
467 size_precision,
468 handler,
469 await_partial,
470 ),
471 cum_value: 0.0,
472 }
473 }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478 H: FnMut(Bar) + 'static,
479{
480 fn bar_type(&self) -> BarType {
481 self.core.bar_type
482 }
483
484 fn is_running(&self) -> bool {
485 self.core.is_running
486 }
487
488 fn set_await_partial(&mut self, value: bool) {
489 self.core.set_await_partial(value);
490 }
491
492 fn set_is_running(&mut self, value: bool) {
493 self.core.set_is_running(value);
494 }
495
496 fn await_partial(&self) -> bool {
497 self.core.await_partial()
498 }
499
500 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
502 self.core.apply_update(price, size, ts_event);
503 let spec = self.core.bar_type.spec();
504
505 if self.core.builder.count >= spec.step.get() {
506 self.core.build_now_and_send();
507 }
508 }
509
510 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511 let mut volume_update = volume;
512 let average_price = Price::new(
513 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514 self.core.builder.price_precision,
515 );
516
517 while volume_update.as_f64() > 0.0 {
518 let value_update = average_price.as_f64() * volume_update.as_f64();
519 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520 self.cum_value += value_update;
521 self.core.builder.update_bar(bar, volume_update, ts_init);
522 break;
523 }
524
525 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527 self.core.builder.update_bar(
528 bar,
529 Quantity::new(volume_diff, volume_update.precision),
530 ts_init,
531 );
532
533 self.core.build_now_and_send();
534 self.cum_value = 0.0;
535 volume_update = Quantity::new(
536 volume_update.as_f64() - volume_diff,
537 volume_update.precision,
538 );
539 }
540 }
541
542 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543 self.core.start_batch_update(handler);
544 }
545
546 fn stop_batch_update(&mut self) {
547 self.core.stop_batch_update();
548 }
549
550 fn set_partial(&mut self, partial_bar: Bar) {
551 self.core.set_partial(partial_bar);
552 }
553}
554
/// Aggregator that builds bars whenever the accumulated volume reaches the
/// step of the bar specification.
pub struct VolumeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565 f.debug_struct(stringify!(VolumeBarAggregator))
566 .field("core", &self.core)
567 .finish()
568 }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573 H: FnMut(Bar),
574{
575 pub fn new(
583 bar_type: BarType,
584 price_precision: u8,
585 size_precision: u8,
586 handler: H,
587 await_partial: bool,
588 ) -> Self {
589 Self {
590 core: BarAggregatorCore::new(
591 bar_type.standard(),
592 price_precision,
593 size_precision,
594 handler,
595 await_partial,
596 ),
597 }
598 }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603 H: FnMut(Bar) + 'static,
604{
605 fn bar_type(&self) -> BarType {
606 self.core.bar_type
607 }
608
609 fn is_running(&self) -> bool {
610 self.core.is_running
611 }
612
613 fn set_await_partial(&mut self, value: bool) {
614 self.core.set_await_partial(value);
615 }
616
617 fn set_is_running(&mut self, value: bool) {
618 self.core.set_is_running(value);
619 }
620
621 fn await_partial(&self) -> bool {
622 self.core.await_partial()
623 }
624
625 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
627 let mut raw_size_update = size.raw;
628 let spec = self.core.bar_type.spec();
629 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631 while raw_size_update > 0 {
632 if self.core.builder.volume.raw + raw_size_update < raw_step {
633 self.core.apply_update(
634 price,
635 Quantity::from_raw(raw_size_update, size.precision),
636 ts_event,
637 );
638 break;
639 }
640
641 let raw_size_diff = raw_step - self.core.builder.volume.raw;
642 self.core.apply_update(
643 price,
644 Quantity::from_raw(raw_size_diff, size.precision),
645 ts_event,
646 );
647
648 self.core.build_now_and_send();
649 raw_size_update -= raw_size_diff;
650 }
651 }
652
653 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654 let mut raw_volume_update = volume.raw;
655 let spec = self.core.bar_type.spec();
656 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658 while raw_volume_update > 0 {
659 if self.core.builder.volume.raw + raw_volume_update < raw_step {
660 self.core.builder.update_bar(
661 bar,
662 Quantity::from_raw(raw_volume_update, volume.precision),
663 ts_init,
664 );
665 break;
666 }
667
668 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669 self.core.builder.update_bar(
670 bar,
671 Quantity::from_raw(raw_volume_diff, volume.precision),
672 ts_init,
673 );
674
675 self.core.build_now_and_send();
676 raw_volume_update -= raw_volume_diff;
677 }
678 }
679
680 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681 self.core.start_batch_update(handler);
682 }
683
684 fn stop_batch_update(&mut self) {
685 self.core.stop_batch_update();
686 }
687
688 fn set_partial(&mut self, partial_bar: Bar) {
689 self.core.set_partial(partial_bar);
690 }
691}
692
/// Aggregator that builds bars whenever the accumulated notional value
/// (price multiplied by size) reaches the step of the bar specification.
pub struct ValueBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
    /// Notional value accumulated towards the current bar; reset to zero after each emitted bar.
    cum_value: f64,
}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 f.debug_struct(stringify!(ValueBarAggregator))
708 .field("core", &self.core)
709 .field("cum_value", &self.cum_value)
710 .finish()
711 }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716 H: FnMut(Bar),
717{
718 pub fn new(
726 bar_type: BarType,
727 price_precision: u8,
728 size_precision: u8,
729 handler: H,
730 await_partial: bool,
731 ) -> Self {
732 Self {
733 core: BarAggregatorCore::new(
734 bar_type.standard(),
735 price_precision,
736 size_precision,
737 handler,
738 await_partial,
739 ),
740 cum_value: 0.0,
741 }
742 }
743
744 #[must_use]
745 pub const fn get_cumulative_value(&self) -> f64 {
747 self.cum_value
748 }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753 H: FnMut(Bar) + 'static,
754{
755 fn bar_type(&self) -> BarType {
756 self.core.bar_type
757 }
758
759 fn is_running(&self) -> bool {
760 self.core.is_running
761 }
762
763 fn set_await_partial(&mut self, value: bool) {
764 self.core.set_await_partial(value);
765 }
766
767 fn set_is_running(&mut self, value: bool) {
768 self.core.set_is_running(value);
769 }
770
771 fn await_partial(&self) -> bool {
772 self.core.await_partial()
773 }
774
775 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
777 let mut size_update = size.as_f64();
778 let spec = self.core.bar_type.spec();
779
780 while size_update > 0.0 {
781 let value_update = price.as_f64() * size_update;
782 if self.cum_value + value_update < spec.step.get() as f64 {
783 self.cum_value += value_update;
784 self.core
785 .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
786 break;
787 }
788
789 let value_diff = spec.step.get() as f64 - self.cum_value;
790 let size_diff = size_update * (value_diff / value_update);
791 self.core
792 .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
793
794 self.core.build_now_and_send();
795 self.cum_value = 0.0;
796 size_update -= size_diff;
797 }
798 }
799
800 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801 let mut volume_update = volume;
802 let average_price = Price::new(
803 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804 self.core.builder.price_precision,
805 );
806
807 while volume_update.as_f64() > 0.0 {
808 let value_update = average_price.as_f64() * volume_update.as_f64();
809 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810 self.cum_value += value_update;
811 self.core.builder.update_bar(bar, volume_update, ts_init);
812 break;
813 }
814
815 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817 self.core.builder.update_bar(
818 bar,
819 Quantity::new(volume_diff, volume_update.precision),
820 ts_init,
821 );
822
823 self.core.build_now_and_send();
824 self.cum_value = 0.0;
825 volume_update = Quantity::new(
826 volume_update.as_f64() - volume_diff,
827 volume_update.precision,
828 );
829 }
830 }
831
832 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833 self.core.start_batch_update(handler);
834 }
835
836 fn stop_batch_update(&mut self) {
837 self.core.stop_batch_update();
838 }
839
840 fn set_partial(&mut self, partial_bar: Bar) {
841 self.core.set_partial(partial_bar);
842 }
843}
844
/// Aggregator that builds bars on time boundaries, driven by timer events
/// from a [`Clock`] and, during batch updates, by the timestamps of the
/// incoming data.
pub struct TimeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
    /// Clock used to read the current time and to schedule timers/alerts.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event is skipped if no update arrived since the last bar.
    build_with_no_updates: bool,
    /// For left-open intervals, stamps bars with the close time instead of the open time.
    timestamp_on_close: bool,
    /// Whether the bar interval type is left-open.
    is_left_open: bool,
    /// Set when a timer event fired before the builder was initialized;
    /// the bar is then built on the next update.
    build_on_next_tick: bool,
    /// Open time of the bar in progress (set to the build time of the previous bar).
    stored_open_ns: UnixNanos,
    /// Close time remembered for a deferred build (see `build_on_next_tick`).
    stored_close_ns: UnixNanos,
    /// Name of the timer/alert registered with the clock (the bar type as a string).
    timer_name: String,
    /// Bar interval in nanoseconds.
    interval_ns: UnixNanos,
    /// Next close time as last recorded after building a bar.
    next_close_ns: UnixNanos,
    /// Delay added to the start time when `add_delay` is set (interpreted as microseconds).
    composite_bar_build_delay: i64,
    /// Whether `composite_bar_build_delay` is applied (composite bar type with internal source).
    add_delay: bool,
    /// Open time of the current batch window.
    batch_open_ns: UnixNanos,
    /// Close time of the current batch window; the default value means no batch window is active.
    batch_next_close_ns: UnixNanos,
    /// Optional origin offset passed on when computing bar start times.
    time_bars_origin: Option<TimeDelta>,
    /// When set, the first bar is discarded instead of being sent.
    skip_first_non_full_bar: bool,
}
870
871impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
872 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
873 f.debug_struct(stringify!(TimeBarAggregator))
874 .field("core", &self.core)
875 .field("build_with_no_updates", &self.build_with_no_updates)
876 .field("timestamp_on_close", &self.timestamp_on_close)
877 .field("is_left_open", &self.is_left_open)
878 .field("timer_name", &self.timer_name)
879 .field("interval_ns", &self.interval_ns)
880 .field("composite_bar_build_delay", &self.composite_bar_build_delay)
881 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
882 .finish()
883 }
884}
885
/// Timer callback wrapper holding a shared handle to a [`TimeBarAggregator`];
/// it can be converted into a [`TimeEventCallback`] that builds a bar on each
/// time event.
#[derive(Clone, Debug)]
pub struct NewBarCallback<H: FnMut(Bar)> {
    /// Aggregator whose bar is built when the callback fires.
    aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
}
890
impl<H: FnMut(Bar)> NewBarCallback<H> {
    /// Creates a new [`NewBarCallback`] for the given shared aggregator.
    #[must_use]
    pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
        Self { aggregator }
    }
}
898
899impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
900 fn from(value: NewBarCallback<H>) -> Self {
901 Self::Rust(Rc::new(move |event: TimeEvent| {
902 value.aggregator.borrow_mut().build_bar(event);
903 }))
904 }
905}
906
907impl<H> TimeBarAggregator<H>
908where
909 H: FnMut(Bar) + 'static,
910{
911 #[allow(clippy::too_many_arguments)]
919 pub fn new(
920 bar_type: BarType,
921 price_precision: u8,
922 size_precision: u8,
923 clock: Rc<RefCell<dyn Clock>>,
924 handler: H,
925 await_partial: bool,
926 build_with_no_updates: bool,
927 timestamp_on_close: bool,
928 interval_type: BarIntervalType,
929 time_bars_origin: Option<TimeDelta>,
930 composite_bar_build_delay: i64,
931 skip_first_non_full_bar: bool,
932 ) -> Self {
933 let is_left_open = match interval_type {
934 BarIntervalType::LeftOpen => true,
935 BarIntervalType::RightOpen => false,
936 };
937
938 let add_delay = bar_type.is_composite()
939 && bar_type.composite().aggregation_source() == AggregationSource::Internal;
940
941 let core = BarAggregatorCore::new(
942 bar_type.standard(),
943 price_precision,
944 size_precision,
945 handler,
946 await_partial,
947 );
948
949 Self {
950 core,
951 clock,
952 build_with_no_updates,
953 timestamp_on_close,
954 is_left_open,
955 build_on_next_tick: false,
956 stored_open_ns: UnixNanos::default(),
957 stored_close_ns: UnixNanos::default(),
958 timer_name: bar_type.to_string(),
959 interval_ns: get_bar_interval_ns(&bar_type),
960 next_close_ns: UnixNanos::default(),
961 composite_bar_build_delay,
962 add_delay,
963 batch_open_ns: UnixNanos::default(),
964 batch_next_close_ns: UnixNanos::default(),
965 time_bars_origin,
966 skip_first_non_full_bar,
967 }
968 }
969
970 pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
980 let now = self.clock.borrow().utc_now();
981 let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);
982
983 if start_time == now {
984 self.skip_first_non_full_bar = false;
985 }
986
987 if self.add_delay {
988 start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
989 }
990
991 let spec = &self.bar_type().spec();
992 let start_time_ns = UnixNanos::from(start_time);
993
994 if spec.aggregation == BarAggregation::Month {
995 let step = spec.step.get() as u32;
996 let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
997
998 self.clock
999 .borrow_mut()
1000 .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1001 .expect(FAILED);
1002 } else {
1003 self.clock
1004 .borrow_mut()
1005 .set_timer_ns(
1006 &self.timer_name,
1007 self.interval_ns.as_u64(),
1008 start_time_ns,
1009 None,
1010 Some(callback.into()),
1011 None,
1012 )
1013 .expect(FAILED);
1014 }
1015
1016 log::debug!("Started timer {}", self.timer_name);
1017 Ok(())
1018 }
1019
1020 pub fn stop(&mut self) {
1022 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1023 }
1024
1025 pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1031 let spec = self.bar_type().spec();
1032 self.core.batch_mode = true;
1033
1034 let time = time_ns.to_datetime_utc();
1035 let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
1036 self.batch_open_ns = UnixNanos::from(start_time);
1037
1038 if spec.aggregation == BarAggregation::Month {
1039 let step = spec.step.get() as u32;
1040
1041 if self.batch_open_ns == time_ns {
1042 self.batch_open_ns =
1043 subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1044 }
1045
1046 self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1047 } else {
1048 if self.batch_open_ns == time_ns {
1049 self.batch_open_ns -= self.interval_ns;
1050 }
1051
1052 self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1053 }
1054 }
1055
1056 const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1057 if self.is_left_open {
1058 if self.timestamp_on_close {
1059 close_ns
1060 } else {
1061 open_ns
1062 }
1063 } else {
1064 open_ns
1065 }
1066 }
1067
1068 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1069 if self.skip_first_non_full_bar {
1070 self.core.builder.reset();
1071 self.skip_first_non_full_bar = false;
1072 } else {
1073 self.core.build_and_send(ts_event, ts_init);
1074 }
1075 }
1076
1077 fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1078 if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1079 let ts_init = self.batch_next_close_ns;
1080 let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1081 self.build_and_send(ts_event, ts_init);
1082 }
1083 }
1084
    /// Advances the batch window after an update stamped `time_ns` was applied.
    ///
    /// Steps, in order:
    /// 1. Outside batch mode, if `time_ns` is exactly the window close and lies
    ///    after `stored_open_ns`, the batch window is cleared and nothing is built.
    /// 2. If `time_ns` lies past the window close, the close is rolled forward
    ///    (by months for monthly bars, otherwise by the interval) until it is
    ///    no longer before `time_ns`, and the open is recomputed from it.
    /// 3. If `time_ns` is exactly the window close, a bar is built and sent and
    ///    the window moves on by one step.
    /// 4. Outside batch mode the batch window is cleared.
    ///
    /// # Panics
    ///
    /// Panics if the month arithmetic fails for monthly bars.
    fn batch_post_update(&mut self, time_ns: UnixNanos) {
        let step = self.bar_type().spec().step.get() as u32;

        // Step 1: batch already stopped and the update sits on the window close
        if !self.core.batch_mode
            && time_ns == self.batch_next_close_ns
            && time_ns > self.stored_open_ns
        {
            self.batch_next_close_ns = UnixNanos::default();
            return;
        }

        // Step 2: skip over windows that received no data
        if time_ns > self.batch_next_close_ns {
            if self.bar_type().spec().aggregation == BarAggregation::Month {
                while self.batch_next_close_ns < time_ns {
                    self.batch_next_close_ns =
                        add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
                }

                self.batch_open_ns =
                    subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
            } else {
                while self.batch_next_close_ns < time_ns {
                    self.batch_next_close_ns += self.interval_ns;
                }

                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
            }
        }

        // Step 3: the update closes the current window exactly
        if time_ns == self.batch_next_close_ns {
            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
            self.build_and_send(ts_event, time_ns);
            self.batch_open_ns = self.batch_next_close_ns;

            if self.bar_type().spec().aggregation == BarAggregation::Month {
                self.batch_next_close_ns =
                    add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
            } else {
                self.batch_next_close_ns += self.interval_ns;
            }
        }

        // Step 4: a default close marks the batch window as inactive
        if !self.core.batch_mode {
            self.batch_next_close_ns = UnixNanos::default();
        }
    }
1134
1135 fn build_bar(&mut self, event: TimeEvent) {
1136 if !self.core.builder.initialized {
1137 self.build_on_next_tick = true;
1138 self.stored_close_ns = self.next_close_ns;
1139 return;
1140 }
1141
1142 if !self.build_with_no_updates && self.core.builder.count == 0 {
1143 return;
1144 }
1145
1146 let ts_init = event.ts_event;
1147 let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1148 self.build_and_send(ts_event, ts_init);
1149
1150 self.stored_open_ns = ts_init;
1151
1152 if self.bar_type().spec().aggregation == BarAggregation::Month {
1153 let step = self.bar_type().spec().step.get() as u32;
1154 let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1155
1156 self.clock
1157 .borrow_mut()
1158 .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1159 .expect(FAILED);
1160
1161 self.next_close_ns = next_alert_ns;
1162 } else {
1163 self.next_close_ns = self
1164 .clock
1165 .borrow()
1166 .next_time_ns(&self.timer_name)
1167 .unwrap_or_default();
1168 }
1169 }
1170}
1171
1172impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1173where
1174 H: FnMut(Bar) + 'static,
1175{
1176 fn bar_type(&self) -> BarType {
1177 self.core.bar_type
1178 }
1179
1180 fn is_running(&self) -> bool {
1181 self.core.is_running
1182 }
1183
1184 fn set_await_partial(&mut self, value: bool) {
1185 self.core.set_await_partial(value);
1186 }
1187
1188 fn set_is_running(&mut self, value: bool) {
1189 self.core.set_is_running(value);
1190 }
1191
1192 fn await_partial(&self) -> bool {
1193 self.core.await_partial()
1194 }
1195 fn stop(&mut self) {
1197 Self::stop(self);
1198 }
1199
1200 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1201 if self.batch_next_close_ns != UnixNanos::default() {
1202 self.batch_pre_update(ts_event);
1203 }
1204
1205 self.core.apply_update(price, size, ts_event);
1206
1207 if self.build_on_next_tick {
1208 if ts_event <= self.stored_close_ns {
1209 let ts_init = ts_event;
1210 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1211 self.build_and_send(ts_event, ts_init);
1212 }
1213
1214 self.build_on_next_tick = false;
1215 self.stored_close_ns = UnixNanos::default();
1216 }
1217
1218 if self.batch_next_close_ns != UnixNanos::default() {
1219 self.batch_post_update(ts_event);
1220 }
1221 }
1222
1223 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1224 if self.batch_next_close_ns != UnixNanos::default() {
1225 self.batch_pre_update(ts_init);
1226 }
1227
1228 self.core.builder.update_bar(bar, volume, ts_init);
1229
1230 if self.build_on_next_tick {
1231 if ts_init <= self.stored_close_ns {
1232 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1233 self.build_and_send(ts_event, ts_init);
1234 }
1235
1236 self.build_on_next_tick = false;
1238 self.stored_close_ns = UnixNanos::default();
1239 }
1240
1241 if self.batch_next_close_ns != UnixNanos::default() {
1242 self.batch_post_update(ts_init);
1243 }
1244 }
1245
1246 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1247 self.core.start_batch_update(handler);
1248 self.start_batch_time(time_ns);
1249 }
1250
1251 fn stop_batch_update(&mut self) {
1252 self.core.stop_batch_update();
1253 }
1254
1255 fn set_partial(&mut self, partial_bar: Bar) {
1256 self.core.set_partial(partial_bar);
1257 }
1258}
1259
1260#[cfg(test)]
1264mod tests {
1265 use std::sync::{Arc, Mutex};
1266
1267 use nautilus_common::clock::TestClock;
1268 use nautilus_core::UUID4;
1269 use nautilus_model::{
1270 data::{BarSpecification, BarType},
1271 enums::{AggregationSource, BarAggregation, PriceType},
1272 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1273 types::{Price, Quantity},
1274 };
1275 use rstest::rstest;
1276 use ustr::Ustr;
1277
1278 use super::*;
1279
1280 #[rstest]
1281 fn test_bar_builder_initialization(equity_aapl: Equity) {
1282 let instrument = InstrumentAny::Equity(equity_aapl);
1283 let bar_type = BarType::new(
1284 instrument.id(),
1285 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1286 AggregationSource::Internal,
1287 );
1288 let builder = BarBuilder::new(
1289 bar_type,
1290 instrument.price_precision(),
1291 instrument.size_precision(),
1292 );
1293
1294 assert!(!builder.initialized);
1295 assert_eq!(builder.ts_last, 0);
1296 assert_eq!(builder.count, 0);
1297 }
1298
1299 #[rstest]
1300 fn test_set_partial_update(equity_aapl: Equity) {
1301 let instrument = InstrumentAny::Equity(equity_aapl);
1302 let bar_type = BarType::new(
1303 instrument.id(),
1304 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1305 AggregationSource::Internal,
1306 );
1307 let mut builder = BarBuilder::new(
1308 bar_type,
1309 instrument.price_precision(),
1310 instrument.size_precision(),
1311 );
1312
1313 let partial_bar = Bar::new(
1314 bar_type,
1315 Price::from("101.00"),
1316 Price::from("102.00"),
1317 Price::from("100.00"),
1318 Price::from("101.00"),
1319 Quantity::from(100),
1320 UnixNanos::from(1),
1321 UnixNanos::from(2),
1322 );
1323
1324 builder.set_partial(partial_bar);
1325 let bar = builder.build_now();
1326
1327 assert_eq!(bar.open, partial_bar.open);
1328 assert_eq!(bar.high, partial_bar.high);
1329 assert_eq!(bar.low, partial_bar.low);
1330 assert_eq!(bar.close, partial_bar.close);
1331 assert_eq!(bar.volume, partial_bar.volume);
1332 assert_eq!(builder.ts_last, 2);
1333 }
1334
1335 #[rstest]
1336 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1337 let instrument = InstrumentAny::Equity(equity_aapl);
1338 let bar_type = BarType::new(
1339 instrument.id(),
1340 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1341 AggregationSource::Internal,
1342 );
1343 let mut builder = BarBuilder::new(
1344 bar_type,
1345 instrument.price_precision(),
1346 instrument.size_precision(),
1347 );
1348
1349 builder.update(
1350 Price::from("100.00"),
1351 Quantity::from(1),
1352 UnixNanos::from(1000),
1353 );
1354 builder.update(
1355 Price::from("95.00"),
1356 Quantity::from(1),
1357 UnixNanos::from(2000),
1358 );
1359 builder.update(
1360 Price::from("105.00"),
1361 Quantity::from(1),
1362 UnixNanos::from(3000),
1363 );
1364
1365 let bar = builder.build_now();
1366 assert!(bar.high > bar.low);
1367 assert_eq!(bar.open, Price::from("100.00"));
1368 assert_eq!(bar.high, Price::from("105.00"));
1369 assert_eq!(bar.low, Price::from("95.00"));
1370 assert_eq!(bar.close, Price::from("105.00"));
1371 }
1372
1373 #[rstest]
1374 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1375 let instrument = InstrumentAny::Equity(equity_aapl);
1376 let bar_type = BarType::new(
1377 instrument.id(),
1378 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1379 AggregationSource::Internal,
1380 );
1381 let mut builder = BarBuilder::new(
1382 bar_type,
1383 instrument.price_precision(),
1384 instrument.size_precision(),
1385 );
1386
1387 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1388 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1389
1390 assert_eq!(builder.ts_last, 1_000);
1391 assert_eq!(builder.count, 1);
1392 }
1393
1394 #[rstest]
1395 fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1396 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1397 let bar_type = BarType::new(
1398 instrument.id(),
1399 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1400 AggregationSource::Internal,
1401 );
1402 let mut builder = BarBuilder::new(
1403 bar_type,
1404 instrument.price_precision(),
1405 instrument.size_precision(),
1406 );
1407
1408 let partial_bar = Bar::new(
1409 bar_type,
1410 Price::from("1.00001"),
1411 Price::from("1.00010"),
1412 Price::from("1.00000"),
1413 Price::from("1.00002"),
1414 Quantity::from(1),
1415 UnixNanos::from(1_000_000_000),
1416 UnixNanos::from(2_000_000_000),
1417 );
1418
1419 builder.set_partial(partial_bar);
1420 let bar = builder.build_now();
1421
1422 assert_eq!(bar.open, Price::from("1.00001"));
1423 assert_eq!(bar.high, Price::from("1.00010"));
1424 assert_eq!(bar.low, Price::from("1.00000"));
1425 assert_eq!(bar.close, Price::from("1.00002"));
1426 assert_eq!(bar.volume, Quantity::from(1));
1427 assert_eq!(bar.ts_init, 2_000_000_000);
1428 assert_eq!(builder.ts_last, 2_000_000_000);
1429 }
1430
1431 #[rstest]
1432 fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1433 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1434 let bar_type = BarType::new(
1435 instrument.id(),
1436 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1437 AggregationSource::Internal,
1438 );
1439 let mut builder = BarBuilder::new(
1440 bar_type,
1441 instrument.price_precision(),
1442 instrument.size_precision(),
1443 );
1444
1445 let partial_bar1 = Bar::new(
1446 bar_type,
1447 Price::from("1.00001"),
1448 Price::from("1.00010"),
1449 Price::from("1.00000"),
1450 Price::from("1.00002"),
1451 Quantity::from(1),
1452 UnixNanos::from(1_000_000_000),
1453 UnixNanos::from(1_000_000_000),
1454 );
1455
1456 let partial_bar2 = Bar::new(
1457 bar_type,
1458 Price::from("2.00001"),
1459 Price::from("2.00010"),
1460 Price::from("2.00000"),
1461 Price::from("2.00002"),
1462 Quantity::from(2),
1463 UnixNanos::from(3_000_000_000),
1464 UnixNanos::from(3_000_000_000),
1465 );
1466
1467 builder.set_partial(partial_bar1);
1468 builder.set_partial(partial_bar2);
1469 let bar = builder.build(
1470 UnixNanos::from(4_000_000_000),
1471 UnixNanos::from(4_000_000_000),
1472 );
1473
1474 assert_eq!(bar.open, Price::from("1.00001"));
1475 assert_eq!(bar.high, Price::from("1.00010"));
1476 assert_eq!(bar.low, Price::from("1.00000"));
1477 assert_eq!(bar.close, Price::from("1.00002"));
1478 assert_eq!(bar.volume, Quantity::from(1));
1479 assert_eq!(bar.ts_init, 4_000_000_000);
1480 assert_eq!(builder.ts_last, 1_000_000_000);
1481 }
1482
1483 #[rstest]
1484 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1485 let instrument = InstrumentAny::Equity(equity_aapl);
1486 let bar_type = BarType::new(
1487 instrument.id(),
1488 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1489 AggregationSource::Internal,
1490 );
1491 let mut builder = BarBuilder::new(
1492 bar_type,
1493 instrument.price_precision(),
1494 instrument.size_precision(),
1495 );
1496
1497 builder.update(
1498 Price::from("1.00000"),
1499 Quantity::from(1),
1500 UnixNanos::default(),
1501 );
1502
1503 assert!(builder.initialized);
1504 assert_eq!(builder.ts_last, 0);
1505 assert_eq!(builder.count, 1);
1506 }
1507
1508 #[rstest]
1509 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1510 equity_aapl: Equity,
1511 ) {
1512 let instrument = InstrumentAny::Equity(equity_aapl);
1513 let bar_type = BarType::new(
1514 instrument.id(),
1515 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1516 AggregationSource::Internal,
1517 );
1518 let mut builder = BarBuilder::new(bar_type, 2, 0);
1519
1520 builder.update(
1521 Price::from("1.00000"),
1522 Quantity::from(1),
1523 UnixNanos::from(1_000),
1524 );
1525 builder.update(
1526 Price::from("1.00001"),
1527 Quantity::from(1),
1528 UnixNanos::from(500),
1529 );
1530
1531 assert!(builder.initialized);
1532 assert_eq!(builder.ts_last, 1_000);
1533 assert_eq!(builder.count, 1);
1534 }
1535
1536 #[rstest]
1537 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1538 let instrument = InstrumentAny::Equity(equity_aapl);
1539 let bar_type = BarType::new(
1540 instrument.id(),
1541 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1542 AggregationSource::Internal,
1543 );
1544 let mut builder = BarBuilder::new(
1545 bar_type,
1546 instrument.price_precision(),
1547 instrument.size_precision(),
1548 );
1549
1550 for _ in 0..5 {
1551 builder.update(
1552 Price::from("1.00000"),
1553 Quantity::from(1),
1554 UnixNanos::from(1_000),
1555 );
1556 }
1557
1558 assert_eq!(builder.count, 5);
1559 }
1560
1561 #[rstest]
1562 #[should_panic]
1563 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1564 let instrument = InstrumentAny::Equity(equity_aapl);
1565 let bar_type = BarType::new(
1566 instrument.id(),
1567 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1568 AggregationSource::Internal,
1569 );
1570 let mut builder = BarBuilder::new(
1571 bar_type,
1572 instrument.price_precision(),
1573 instrument.size_precision(),
1574 );
1575 let _ = builder.build_now();
1576 }
1577
1578 #[rstest]
1579 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1580 let instrument = InstrumentAny::Equity(equity_aapl);
1581 let bar_type = BarType::new(
1582 instrument.id(),
1583 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1584 AggregationSource::Internal,
1585 );
1586 let mut builder = BarBuilder::new(
1587 bar_type,
1588 instrument.price_precision(),
1589 instrument.size_precision(),
1590 );
1591
1592 builder.update(
1593 Price::from("1.00001"),
1594 Quantity::from(2),
1595 UnixNanos::default(),
1596 );
1597 builder.update(
1598 Price::from("1.00002"),
1599 Quantity::from(2),
1600 UnixNanos::default(),
1601 );
1602 builder.update(
1603 Price::from("1.00000"),
1604 Quantity::from(1),
1605 UnixNanos::from(1_000_000_000),
1606 );
1607
1608 let bar = builder.build_now();
1609
1610 assert_eq!(bar.open, Price::from("1.00001"));
1611 assert_eq!(bar.high, Price::from("1.00002"));
1612 assert_eq!(bar.low, Price::from("1.00000"));
1613 assert_eq!(bar.close, Price::from("1.00000"));
1614 assert_eq!(bar.volume, Quantity::from(5));
1615 assert_eq!(bar.ts_init, 1_000_000_000);
1616 assert_eq!(builder.ts_last, 1_000_000_000);
1617 assert_eq!(builder.count, 0);
1618 }
1619
1620 #[rstest]
1621 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1622 let instrument = InstrumentAny::Equity(equity_aapl);
1623 let bar_type = BarType::new(
1624 instrument.id(),
1625 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1626 AggregationSource::Internal,
1627 );
1628 let mut builder = BarBuilder::new(bar_type, 2, 0);
1629
1630 builder.update(
1631 Price::from("1.00001"),
1632 Quantity::from(1),
1633 UnixNanos::default(),
1634 );
1635 builder.build_now();
1636
1637 builder.update(
1638 Price::from("1.00000"),
1639 Quantity::from(1),
1640 UnixNanos::default(),
1641 );
1642 builder.update(
1643 Price::from("1.00003"),
1644 Quantity::from(1),
1645 UnixNanos::default(),
1646 );
1647 builder.update(
1648 Price::from("1.00002"),
1649 Quantity::from(1),
1650 UnixNanos::default(),
1651 );
1652
1653 let bar = builder.build_now();
1654
1655 assert_eq!(bar.open, Price::from("1.00000"));
1656 assert_eq!(bar.high, Price::from("1.00003"));
1657 assert_eq!(bar.low, Price::from("1.00000"));
1658 assert_eq!(bar.close, Price::from("1.00002"));
1659 assert_eq!(bar.volume, Quantity::from(3));
1660 }
1661
1662 #[rstest]
1663 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1664 let instrument = InstrumentAny::Equity(equity_aapl);
1665 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1666 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1667 let handler = Arc::new(Mutex::new(Vec::new()));
1668 let handler_clone = Arc::clone(&handler);
1669
1670 let mut aggregator = TickBarAggregator::new(
1671 bar_type,
1672 instrument.price_precision(),
1673 instrument.size_precision(),
1674 move |bar: Bar| {
1675 let mut handler_guard = handler_clone.lock().unwrap();
1676 handler_guard.push(bar);
1677 },
1678 false,
1679 );
1680
1681 let trade = TradeTick::default();
1682 aggregator.handle_trade(trade);
1683
1684 let handler_guard = handler.lock().unwrap();
1685 assert_eq!(handler_guard.len(), 0);
1686 }
1687
1688 #[rstest]
1689 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1690 let instrument = InstrumentAny::Equity(equity_aapl);
1691 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1692 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1693 let handler = Arc::new(Mutex::new(Vec::new()));
1694 let handler_clone = Arc::clone(&handler);
1695
1696 let mut aggregator = TickBarAggregator::new(
1697 bar_type,
1698 instrument.price_precision(),
1699 instrument.size_precision(),
1700 move |bar: Bar| {
1701 let mut handler_guard = handler_clone.lock().unwrap();
1702 handler_guard.push(bar);
1703 },
1704 false,
1705 );
1706
1707 let trade = TradeTick::default();
1708 aggregator.handle_trade(trade);
1709 aggregator.handle_trade(trade);
1710 aggregator.handle_trade(trade);
1711
1712 let handler_guard = handler.lock().unwrap();
1713 let bar = handler_guard.first().unwrap();
1714 assert_eq!(handler_guard.len(), 1);
1715 assert_eq!(bar.open, trade.price);
1716 assert_eq!(bar.high, trade.price);
1717 assert_eq!(bar.low, trade.price);
1718 assert_eq!(bar.close, trade.price);
1719 assert_eq!(bar.volume, Quantity::from(300000));
1720 assert_eq!(bar.ts_event, trade.ts_event);
1721 assert_eq!(bar.ts_init, trade.ts_init);
1722 }
1723
1724 #[rstest]
1725 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1726 let instrument = InstrumentAny::Equity(equity_aapl);
1727 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1728 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1729 let handler = Arc::new(Mutex::new(Vec::new()));
1730 let handler_clone = Arc::clone(&handler);
1731
1732 let mut aggregator = TickBarAggregator::new(
1733 bar_type,
1734 instrument.price_precision(),
1735 instrument.size_precision(),
1736 move |bar: Bar| {
1737 let mut handler_guard = handler_clone.lock().unwrap();
1738 handler_guard.push(bar);
1739 },
1740 false,
1741 );
1742
1743 aggregator.update(
1744 Price::from("1.00001"),
1745 Quantity::from(1),
1746 UnixNanos::default(),
1747 );
1748 aggregator.update(
1749 Price::from("1.00002"),
1750 Quantity::from(1),
1751 UnixNanos::from(1000),
1752 );
1753 aggregator.update(
1754 Price::from("1.00003"),
1755 Quantity::from(1),
1756 UnixNanos::from(2000),
1757 );
1758
1759 let handler_guard = handler.lock().unwrap();
1760 assert_eq!(handler_guard.len(), 1);
1761
1762 let bar = handler_guard.first().unwrap();
1763 assert_eq!(bar.open, Price::from("1.00001"));
1764 assert_eq!(bar.high, Price::from("1.00003"));
1765 assert_eq!(bar.low, Price::from("1.00001"));
1766 assert_eq!(bar.close, Price::from("1.00003"));
1767 assert_eq!(bar.volume, Quantity::from(3));
1768 }
1769
1770 #[rstest]
1771 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1772 let instrument = InstrumentAny::Equity(equity_aapl);
1773 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1774 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1775 let handler = Arc::new(Mutex::new(Vec::new()));
1776 let handler_clone = Arc::clone(&handler);
1777
1778 let mut aggregator = TickBarAggregator::new(
1779 bar_type,
1780 instrument.price_precision(),
1781 instrument.size_precision(),
1782 move |bar: Bar| {
1783 let mut handler_guard = handler_clone.lock().unwrap();
1784 handler_guard.push(bar);
1785 },
1786 false,
1787 );
1788
1789 aggregator.update(
1790 Price::from("1.00001"),
1791 Quantity::from(1),
1792 UnixNanos::default(),
1793 );
1794 aggregator.update(
1795 Price::from("1.00002"),
1796 Quantity::from(1),
1797 UnixNanos::from(1000),
1798 );
1799 aggregator.update(
1800 Price::from("1.00003"),
1801 Quantity::from(1),
1802 UnixNanos::from(2000),
1803 );
1804 aggregator.update(
1805 Price::from("1.00004"),
1806 Quantity::from(1),
1807 UnixNanos::from(3000),
1808 );
1809
1810 let handler_guard = handler.lock().unwrap();
1811 assert_eq!(handler_guard.len(), 2);
1812
1813 let bar1 = &handler_guard[0];
1814 assert_eq!(bar1.open, Price::from("1.00001"));
1815 assert_eq!(bar1.close, Price::from("1.00002"));
1816 assert_eq!(bar1.volume, Quantity::from(2));
1817
1818 let bar2 = &handler_guard[1];
1819 assert_eq!(bar2.open, Price::from("1.00003"));
1820 assert_eq!(bar2.close, Price::from("1.00004"));
1821 assert_eq!(bar2.volume, Quantity::from(2));
1822 }
1823
1824 #[rstest]
1825 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1826 let instrument = InstrumentAny::Equity(equity_aapl);
1827 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1828 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1829 let handler = Arc::new(Mutex::new(Vec::new()));
1830 let handler_clone = Arc::clone(&handler);
1831
1832 let mut aggregator = VolumeBarAggregator::new(
1833 bar_type,
1834 instrument.price_precision(),
1835 instrument.size_precision(),
1836 move |bar: Bar| {
1837 let mut handler_guard = handler_clone.lock().unwrap();
1838 handler_guard.push(bar);
1839 },
1840 false,
1841 );
1842
1843 aggregator.update(
1844 Price::from("1.00001"),
1845 Quantity::from(25),
1846 UnixNanos::default(),
1847 );
1848
1849 let handler_guard = handler.lock().unwrap();
1850 assert_eq!(handler_guard.len(), 2);
1851 let bar1 = &handler_guard[0];
1852 assert_eq!(bar1.volume, Quantity::from(10));
1853 let bar2 = &handler_guard[1];
1854 assert_eq!(bar2.volume, Quantity::from(10));
1855 }
1856
1857 #[rstest]
1858 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1859 let instrument = InstrumentAny::Equity(equity_aapl);
1860 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1862 let handler = Arc::new(Mutex::new(Vec::new()));
1863 let handler_clone = Arc::clone(&handler);
1864
1865 let mut aggregator = ValueBarAggregator::new(
1866 bar_type,
1867 instrument.price_precision(),
1868 instrument.size_precision(),
1869 move |bar: Bar| {
1870 let mut handler_guard = handler_clone.lock().unwrap();
1871 handler_guard.push(bar);
1872 },
1873 false,
1874 );
1875
1876 aggregator.update(
1878 Price::from("100.00"),
1879 Quantity::from(5),
1880 UnixNanos::default(),
1881 );
1882 aggregator.update(
1883 Price::from("100.00"),
1884 Quantity::from(5),
1885 UnixNanos::from(1000),
1886 );
1887
1888 let handler_guard = handler.lock().unwrap();
1889 assert_eq!(handler_guard.len(), 1);
1890 let bar = handler_guard.first().unwrap();
1891 assert_eq!(bar.volume, Quantity::from(10));
1892 }
1893
1894 #[rstest]
1895 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1896 let instrument = InstrumentAny::Equity(equity_aapl);
1897 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1898 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1899 let handler = Arc::new(Mutex::new(Vec::new()));
1900 let handler_clone = Arc::clone(&handler);
1901
1902 let mut aggregator = ValueBarAggregator::new(
1903 bar_type,
1904 instrument.price_precision(),
1905 instrument.size_precision(),
1906 move |bar: Bar| {
1907 let mut handler_guard = handler_clone.lock().unwrap();
1908 handler_guard.push(bar);
1909 },
1910 false,
1911 );
1912
1913 aggregator.update(
1915 Price::from("100.00"),
1916 Quantity::from(25),
1917 UnixNanos::default(),
1918 );
1919
1920 let handler_guard = handler.lock().unwrap();
1921 assert_eq!(handler_guard.len(), 2);
1922 let remaining_value = aggregator.get_cumulative_value();
1923 assert!(remaining_value < 1000.0); }
1925
1926 #[rstest]
1927 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1928 let instrument = InstrumentAny::Equity(equity_aapl);
1929 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1931 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1932 let handler = Arc::new(Mutex::new(Vec::new()));
1933 let handler_clone = Arc::clone(&handler);
1934 let clock = Rc::new(RefCell::new(TestClock::new()));
1935
1936 let mut aggregator = TimeBarAggregator::new(
1937 bar_type,
1938 instrument.price_precision(),
1939 instrument.size_precision(),
1940 clock.clone(),
1941 move |bar: Bar| {
1942 let mut handler_guard = handler_clone.lock().unwrap();
1943 handler_guard.push(bar);
1944 },
1945 false, true, false, BarIntervalType::LeftOpen,
1949 None, 15, false, );
1953
1954 aggregator.update(
1955 Price::from("100.00"),
1956 Quantity::from(1),
1957 UnixNanos::default(),
1958 );
1959
1960 let next_sec = UnixNanos::from(1_000_000_000);
1961 clock.borrow_mut().set_time(next_sec);
1962
1963 let event = TimeEvent::new(
1964 Ustr::from("1-SECOND-LAST"),
1965 UUID4::new(),
1966 next_sec,
1967 next_sec,
1968 );
1969 aggregator.build_bar(event);
1970
1971 let handler_guard = handler.lock().unwrap();
1972 assert_eq!(handler_guard.len(), 1);
1973 let bar = handler_guard.first().unwrap();
1974 assert_eq!(bar.ts_event, UnixNanos::default());
1975 assert_eq!(bar.ts_init, next_sec);
1976 }
1977
1978 #[rstest]
1979 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1980 let instrument = InstrumentAny::Equity(equity_aapl);
1981 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1982 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1983 let handler = Arc::new(Mutex::new(Vec::new()));
1984 let handler_clone = Arc::clone(&handler);
1985 let clock = Rc::new(RefCell::new(TestClock::new()));
1986
1987 let mut aggregator = TimeBarAggregator::new(
1988 bar_type,
1989 instrument.price_precision(),
1990 instrument.size_precision(),
1991 clock.clone(),
1992 move |bar: Bar| {
1993 let mut handler_guard = handler_clone.lock().unwrap();
1994 handler_guard.push(bar);
1995 },
1996 false, true, true, BarIntervalType::LeftOpen,
2000 None,
2001 15,
2002 false, );
2004
2005 aggregator.update(
2007 Price::from("100.00"),
2008 Quantity::from(1),
2009 UnixNanos::default(),
2010 );
2011
2012 let ts1 = UnixNanos::from(1_000_000_000);
2014 clock.borrow_mut().set_time(ts1);
2015 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2016 aggregator.build_bar(event);
2017
2018 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2020
2021 let ts2 = UnixNanos::from(2_000_000_000);
2023 clock.borrow_mut().set_time(ts2);
2024 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2025 aggregator.build_bar(event);
2026
2027 let handler_guard = handler.lock().unwrap();
2028 assert_eq!(handler_guard.len(), 2);
2029
2030 let bar1 = &handler_guard[0];
2031 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
2033 assert_eq!(bar1.close, Price::from("100.00"));
2034 let bar2 = &handler_guard[1];
2035 assert_eq!(bar2.ts_event, ts2);
2036 assert_eq!(bar2.ts_init, ts2);
2037 assert_eq!(bar2.close, Price::from("101.00"));
2038 }
2039
2040 #[rstest]
2041 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2042 let instrument = InstrumentAny::Equity(equity_aapl);
2043 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2044 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2045 let handler = Arc::new(Mutex::new(Vec::new()));
2046 let handler_clone = Arc::clone(&handler);
2047 let clock = Rc::new(RefCell::new(TestClock::new()));
2048 let mut aggregator = TimeBarAggregator::new(
2049 bar_type,
2050 instrument.price_precision(),
2051 instrument.size_precision(),
2052 clock.clone(),
2053 move |bar: Bar| {
2054 let mut handler_guard = handler_clone.lock().unwrap();
2055 handler_guard.push(bar);
2056 },
2057 false, true, true, BarIntervalType::RightOpen,
2061 None,
2062 15,
2063 false, );
2065
2066 aggregator.update(
2068 Price::from("100.00"),
2069 Quantity::from(1),
2070 UnixNanos::default(),
2071 );
2072
2073 let ts1 = UnixNanos::from(1_000_000_000);
2075 clock.borrow_mut().set_time(ts1);
2076 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2077 aggregator.build_bar(event);
2078
2079 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2081
2082 let ts2 = UnixNanos::from(2_000_000_000);
2084 clock.borrow_mut().set_time(ts2);
2085 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2086 aggregator.build_bar(event);
2087
2088 let handler_guard = handler.lock().unwrap();
2089 assert_eq!(handler_guard.len(), 2);
2090
2091 let bar1 = &handler_guard[0];
2092 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
2094 assert_eq!(bar1.close, Price::from("100.00"));
2095
2096 let bar2 = &handler_guard[1];
2097 assert_eq!(bar2.ts_event, ts1);
2098 assert_eq!(bar2.ts_init, ts2);
2099 assert_eq!(bar2.close, Price::from("101.00"));
2100 }
2101
2102 #[rstest]
2103 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2104 let instrument = InstrumentAny::Equity(equity_aapl);
2105 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2106 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2107 let handler = Arc::new(Mutex::new(Vec::new()));
2108 let handler_clone = Arc::clone(&handler);
2109 let clock = Rc::new(RefCell::new(TestClock::new()));
2110
2111 let mut aggregator = TimeBarAggregator::new(
2113 bar_type,
2114 instrument.price_precision(),
2115 instrument.size_precision(),
2116 clock.clone(),
2117 move |bar: Bar| {
2118 let mut handler_guard = handler_clone.lock().unwrap();
2119 handler_guard.push(bar);
2120 },
2121 false, false, true, BarIntervalType::LeftOpen,
2125 None,
2126 15,
2127 false, );
2129
2130 let ts1 = UnixNanos::from(1_000_000_000);
2132 clock.borrow_mut().set_time(ts1);
2133 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2134 aggregator.build_bar(event);
2135
2136 let handler_guard = handler.lock().unwrap();
2137 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
2139
2140 let handler = Arc::new(Mutex::new(Vec::new()));
2142 let handler_clone = Arc::clone(&handler);
2143 let mut aggregator = TimeBarAggregator::new(
2144 bar_type,
2145 instrument.price_precision(),
2146 instrument.size_precision(),
2147 clock.clone(),
2148 move |bar: Bar| {
2149 let mut handler_guard = handler_clone.lock().unwrap();
2150 handler_guard.push(bar);
2151 },
2152 false,
2153 true, true, BarIntervalType::LeftOpen,
2156 None,
2157 15,
2158 false, );
2160
2161 aggregator.update(
2162 Price::from("100.00"),
2163 Quantity::from(1),
2164 UnixNanos::default(),
2165 );
2166
2167 let ts1 = UnixNanos::from(1_000_000_000);
2169 clock.borrow_mut().set_time(ts1);
2170 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2171 aggregator.build_bar(event);
2172
2173 let ts2 = UnixNanos::from(2_000_000_000);
2175 clock.borrow_mut().set_time(ts2);
2176 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2177 aggregator.build_bar(event);
2178
2179 let handler_guard = handler.lock().unwrap();
2180 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
2182 assert_eq!(bar1.close, Price::from("100.00"));
2183 let bar2 = &handler_guard[1];
2184 assert_eq!(bar2.close, Price::from("100.00")); }
2186
2187 #[rstest]
2188 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2189 let instrument = InstrumentAny::Equity(equity_aapl);
2190 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2191 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2192 let clock = Rc::new(RefCell::new(TestClock::new()));
2193 let handler = Arc::new(Mutex::new(Vec::new()));
2194 let handler_clone = Arc::clone(&handler);
2195
2196 let mut aggregator = TimeBarAggregator::new(
2197 bar_type,
2198 instrument.price_precision(),
2199 instrument.size_precision(),
2200 clock.clone(),
2201 move |bar: Bar| {
2202 let mut handler_guard = handler_clone.lock().unwrap();
2203 handler_guard.push(bar);
2204 },
2205 false, true, true, BarIntervalType::RightOpen,
2209 None,
2210 15,
2211 false, );
2213
2214 let ts1 = UnixNanos::from(1_000_000_000);
2215 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2216
2217 let ts2 = UnixNanos::from(2_000_000_000);
2218 clock.borrow_mut().set_time(ts2);
2219
2220 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2222 aggregator.build_bar(event);
2223
2224 let handler_guard = handler.lock().unwrap();
2225 let bar = handler_guard.first().unwrap();
2226 assert_eq!(bar.ts_event, UnixNanos::default());
2227 assert_eq!(bar.ts_init, ts2);
2228 }
2229
2230 #[rstest]
2231 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2232 let instrument = InstrumentAny::Equity(equity_aapl);
2233 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2234 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2235 let clock = Rc::new(RefCell::new(TestClock::new()));
2236 let handler = Arc::new(Mutex::new(Vec::new()));
2237 let handler_clone = Arc::clone(&handler);
2238
2239 let mut aggregator = TimeBarAggregator::new(
2240 bar_type,
2241 instrument.price_precision(),
2242 instrument.size_precision(),
2243 clock.clone(),
2244 move |bar: Bar| {
2245 let mut handler_guard = handler_clone.lock().unwrap();
2246 handler_guard.push(bar);
2247 },
2248 false, true, true, BarIntervalType::LeftOpen,
2252 None,
2253 15,
2254 false, );
2256
2257 let ts1 = UnixNanos::from(1_000_000_000);
2258 clock.borrow_mut().set_time(ts1);
2259
2260 let initial_time = clock.borrow().utc_now();
2261 aggregator.start_batch_time(UnixNanos::from(
2262 initial_time.timestamp_nanos_opt().unwrap() as u64
2263 ));
2264
2265 let handler_guard = handler.lock().unwrap();
2266 assert_eq!(handler_guard.len(), 0);
2267 }
2268}