nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25    clock::Clock,
26    timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29    UnixNanos,
30    correctness::{self, FAILED},
31    datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34    data::{
35        QuoteTick, TradeTick,
36        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37    },
38    enums::{AggregationSource, BarAggregation, BarIntervalType},
39    types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
42/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
43///
44/// Implementors receive updates and produce completed bars via handlers, with support for partial and batch updates.
45pub trait BarAggregator: Any + Debug {
46    /// The [`BarType`] to be aggregated.
47    fn bar_type(&self) -> BarType;
48    /// If the aggregator is running and will receive data from the message bus.
49    fn is_running(&self) -> bool;
50    fn set_await_partial(&mut self, value: bool);
51    /// Enables or disables awaiting a partial bar before full aggregation.
52    fn set_is_running(&mut self, value: bool);
53    /// Sets the running state of the aggregator (receiving updates when `true`).
54    /// Updates the aggregator  with the given price and size.
55    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
56    /// Updates the aggregator with the given quote.
57    fn handle_quote(&mut self, quote: QuoteTick) {
58        let spec = self.bar_type().spec();
59        if !self.await_partial() {
60            self.update(
61                quote.extract_price(spec.price_type),
62                quote.extract_size(spec.price_type),
63                quote.ts_event,
64            );
65        }
66    }
67    /// Updates the aggregator with the given trade.
68    fn handle_trade(&mut self, trade: TradeTick) {
69        if !self.await_partial() {
70            self.update(trade.price, trade.size, trade.ts_event);
71        }
72    }
73    /// Updates the aggregator with the given bar.
74    fn handle_bar(&mut self, bar: Bar) {
75        if !self.await_partial() {
76            self.update_bar(bar, bar.volume, bar.ts_init);
77        }
78    }
79    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
80    /// Incorporates an existing bar and its volume into aggregation at the given init timestamp.
81    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
82    /// Starts batch mode, sending bars to the supplied handler for the given time context.
83    fn stop_batch_update(&mut self);
84    /// Stops batch mode and restores the standard bar handler.
85    fn await_partial(&self) -> bool;
86    /// Returns `true` if awaiting a partial bar before processing updates.
87    /// Sets the initial values for a partially completed bar.
88    fn set_partial(&mut self, partial_bar: Bar);
89    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
90    fn stop(&mut self) {}
91}
92
93impl dyn BarAggregator {
94    /// Returns a reference to this aggregator as `Any` for downcasting.
95    pub fn as_any(&self) -> &dyn Any {
96        self
97    }
98    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
99    pub fn as_any_mut(&mut self) -> &mut dyn Any {
100        self
101    }
102}
103
104/// Provides a generic bar builder for aggregation.
105#[derive(Debug)]
106pub struct BarBuilder {
107    bar_type: BarType,
108    price_precision: u8,
109    size_precision: u8,
110    initialized: bool,
111    ts_last: UnixNanos,
112    count: usize,
113    partial_set: bool,
114    last_close: Option<Price>,
115    open: Option<Price>,
116    high: Option<Price>,
117    low: Option<Price>,
118    close: Option<Price>,
119    volume: Quantity,
120}
121
122impl BarBuilder {
123    /// Creates a new [`BarBuilder`] instance.
124    ///
125    /// # Panics
126    ///
127    /// This function panics if:
128    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
129    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
130    #[must_use]
131    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132        correctness::check_equal(
133            &bar_type.aggregation_source(),
134            &AggregationSource::Internal,
135            "bar_type.aggregation_source",
136            "AggregationSource::Internal",
137        )
138        .expect(FAILED);
139
140        Self {
141            bar_type,
142            price_precision,
143            size_precision,
144            initialized: false,
145            ts_last: UnixNanos::default(),
146            count: 0,
147            partial_set: false,
148            last_close: None,
149            open: None,
150            high: None,
151            low: None,
152            close: None,
153            volume: Quantity::zero(size_precision),
154        }
155    }
156
157    /// Set the initial values for a partially completed bar.
158    ///
159    /// # Panics
160    ///
161    /// Panics if internal values for `high` or `low` are unexpectedly missing.
162    pub fn set_partial(&mut self, partial_bar: Bar) {
163        if self.partial_set {
164            return; // Already updated
165        }
166
167        self.open = Some(partial_bar.open);
168
169        if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170            self.high = Some(partial_bar.high);
171        }
172
173        if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174            self.low = Some(partial_bar.low);
175        }
176
177        if self.close.is_none() {
178            self.close = Some(partial_bar.close);
179        }
180
181        self.volume = partial_bar.volume;
182
183        if self.ts_last == 0 {
184            self.ts_last = partial_bar.ts_init;
185        }
186
187        self.partial_set = true;
188        self.initialized = true;
189    }
190
191    /// Updates the builder state with the given price, size, and event timestamp.
192    ///
193    /// # Panics
194    ///
195    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
196    pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
197        if ts_event < self.ts_last {
198            return; // Not applicable
199        }
200
201        if self.open.is_none() {
202            self.open = Some(price);
203            self.high = Some(price);
204            self.low = Some(price);
205            self.initialized = true;
206        } else {
207            if price > self.high.unwrap() {
208                self.high = Some(price);
209            }
210            if price < self.low.unwrap() {
211                self.low = Some(price);
212            }
213        }
214
215        self.close = Some(price);
216        self.volume = self.volume.add(size);
217        self.count += 1;
218        self.ts_last = ts_event;
219    }
220
221    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
222    ///
223    /// # Panics
224    ///
225    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
226    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227        if ts_init < self.ts_last {
228            return; // Not applicable
229        }
230
231        if self.open.is_none() {
232            self.open = Some(bar.open);
233            self.high = Some(bar.high);
234            self.low = Some(bar.low);
235            self.initialized = true;
236        } else {
237            if bar.high > self.high.unwrap() {
238                self.high = Some(bar.high);
239            }
240            if bar.low < self.low.unwrap() {
241                self.low = Some(bar.low);
242            }
243        }
244
245        self.close = Some(bar.close);
246        self.volume = self.volume.add(volume);
247        self.count += 1;
248        self.ts_last = ts_init;
249    }
250
251    /// Reset the bar builder.
252    ///
253    /// All stateful fields are reset to their initial value.
254    pub fn reset(&mut self) {
255        self.open = None;
256        self.high = None;
257        self.low = None;
258        self.volume = Quantity::zero(self.size_precision);
259        self.count = 0;
260    }
261
262    /// Return the aggregated bar and reset.
263    pub fn build_now(&mut self) -> Bar {
264        self.build(self.ts_last, self.ts_last)
265    }
266
267    /// Returns the aggregated bar for the given timestamps, then resets the builder.
268    ///
269    /// # Panics
270    ///
271    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
272    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273        if self.open.is_none() {
274            self.open = self.last_close;
275            self.high = self.last_close;
276            self.low = self.last_close;
277            self.close = self.last_close;
278        }
279
280        if let (Some(close), Some(low)) = (self.close, self.low) {
281            if close < low {
282                self.low = Some(close);
283            }
284        }
285
286        if let (Some(close), Some(high)) = (self.close, self.high) {
287            if close > high {
288                self.high = Some(close);
289            }
290        }
291
292        // SAFETY: The open was checked, so we can assume all prices are Some
293        let bar = Bar::new(
294            self.bar_type,
295            self.open.unwrap(),
296            self.high.unwrap(),
297            self.low.unwrap(),
298            self.close.unwrap(),
299            self.volume,
300            ts_event,
301            ts_init,
302        );
303
304        self.last_close = self.close;
305        self.reset();
306        bar
307    }
308}
309
310/// Provides a means of aggregating specified bar types and sending to a registered handler.
311pub struct BarAggregatorCore<H>
312where
313    H: FnMut(Bar),
314{
315    bar_type: BarType,
316    builder: BarBuilder,
317    handler: H,
318    handler_backup: Option<H>,
319    batch_handler: Option<Box<dyn FnMut(Bar)>>,
320    await_partial: bool,
321    is_running: bool,
322    batch_mode: bool,
323}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327        f.debug_struct(stringify!(BarAggregatorCore))
328            .field("bar_type", &self.bar_type)
329            .field("builder", &self.builder)
330            .field("await_partial", &self.await_partial)
331            .field("is_running", &self.is_running)
332            .field("batch_mode", &self.batch_mode)
333            .finish()
334    }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339    H: FnMut(Bar),
340{
341    /// Creates a new [`BarAggregatorCore`] instance.
342    ///
343    /// # Panics
344    ///
345    /// This function panics if:
346    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
347    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
348    pub fn new(
349        bar_type: BarType,
350        price_precision: u8,
351        size_precision: u8,
352        handler: H,
353        await_partial: bool,
354    ) -> Self {
355        Self {
356            bar_type,
357            builder: BarBuilder::new(bar_type, price_precision, size_precision),
358            handler,
359            handler_backup: None,
360            batch_handler: None,
361            await_partial,
362            is_running: false,
363            batch_mode: false,
364        }
365    }
366
367    /// Sets whether to await a partial bar before processing new updates.
368    pub const fn set_await_partial(&mut self, value: bool) {
369        self.await_partial = value;
370    }
371
372    /// Sets the running state of the aggregator (receives updates when `true`).
373    pub const fn set_is_running(&mut self, value: bool) {
374        self.is_running = value;
375    }
376
377    /// Returns `true` if the aggregator is awaiting a partial bar to complete before aggregation.
378    pub const fn await_partial(&self) -> bool {
379        self.await_partial
380    }
381
382    /// Initializes builder state with a partially completed bar.
383    pub fn set_partial(&mut self, partial_bar: Bar) {
384        self.builder.set_partial(partial_bar);
385    }
386
387    fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
388        self.builder.update(price, size, ts_event);
389    }
390
391    fn build_now_and_send(&mut self) {
392        let bar = self.builder.build_now();
393        (self.handler)(bar);
394    }
395
396    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397        let bar = self.builder.build(ts_event, ts_init);
398
399        if self.batch_mode {
400            if let Some(handler) = &mut self.batch_handler {
401                handler(bar);
402            }
403        } else {
404            (self.handler)(bar);
405        }
406    }
407
408    /// Enables batch update mode, sending bars to the provided handler instead of immediate dispatch.
409    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410        self.batch_mode = true;
411        self.batch_handler = Some(handler);
412    }
413
414    /// Disables batch update mode and restores the original bar handler.
415    pub fn stop_batch_update(&mut self) {
416        self.batch_mode = false;
417
418        if let Some(handler) = self.handler_backup.take() {
419            self.handler = handler;
420        }
421    }
422}
423
424/// Provides a means of building tick bars aggregated from quote and trades.
425///
426/// When received tick count reaches the step threshold of the bar
427/// specification, then a bar is created and sent to the handler.
428pub struct TickBarAggregator<H>
429where
430    H: FnMut(Bar),
431{
432    core: BarAggregatorCore<H>,
433    cum_value: f64,
434}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        f.debug_struct(stringify!(TickBarAggregator))
439            .field("core", &self.core)
440            .field("cum_value", &self.cum_value)
441            .finish()
442    }
443}
444
445impl<H> TickBarAggregator<H>
446where
447    H: FnMut(Bar),
448{
449    /// Creates a new [`TickBarAggregator`] instance.
450    ///
451    /// # Panics
452    ///
453    /// This function panics if:
454    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
455    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
456    pub fn new(
457        bar_type: BarType,
458        price_precision: u8,
459        size_precision: u8,
460        handler: H,
461        await_partial: bool,
462    ) -> Self {
463        Self {
464            core: BarAggregatorCore::new(
465                bar_type,
466                price_precision,
467                size_precision,
468                handler,
469                await_partial,
470            ),
471            cum_value: 0.0,
472        }
473    }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478    H: FnMut(Bar) + 'static,
479{
480    fn bar_type(&self) -> BarType {
481        self.core.bar_type
482    }
483
484    fn is_running(&self) -> bool {
485        self.core.is_running
486    }
487
488    fn set_await_partial(&mut self, value: bool) {
489        self.core.set_await_partial(value);
490    }
491
492    fn set_is_running(&mut self, value: bool) {
493        self.core.set_is_running(value);
494    }
495
496    fn await_partial(&self) -> bool {
497        self.core.await_partial()
498    }
499
500    /// Apply the given update to the aggregator.
501    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
502        self.core.apply_update(price, size, ts_event);
503        let spec = self.core.bar_type.spec();
504
505        if self.core.builder.count >= spec.step.get() {
506            self.core.build_now_and_send();
507        }
508    }
509
510    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511        let mut volume_update = volume;
512        let average_price = Price::new(
513            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514            self.core.builder.price_precision,
515        );
516
517        while volume_update.as_f64() > 0.0 {
518            let value_update = average_price.as_f64() * volume_update.as_f64();
519            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520                self.cum_value += value_update;
521                self.core.builder.update_bar(bar, volume_update, ts_init);
522                break;
523            }
524
525            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527            self.core.builder.update_bar(
528                bar,
529                Quantity::new(volume_diff, volume_update.precision),
530                ts_init,
531            );
532
533            self.core.build_now_and_send();
534            self.cum_value = 0.0;
535            volume_update = Quantity::new(
536                volume_update.as_f64() - volume_diff,
537                volume_update.precision,
538            );
539        }
540    }
541
542    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543        self.core.start_batch_update(handler);
544    }
545
546    fn stop_batch_update(&mut self) {
547        self.core.stop_batch_update();
548    }
549
550    fn set_partial(&mut self, partial_bar: Bar) {
551        self.core.set_partial(partial_bar);
552    }
553}
554
555/// Provides a means of building volume bars aggregated from quote and trades.
556pub struct VolumeBarAggregator<H>
557where
558    H: FnMut(Bar),
559{
560    core: BarAggregatorCore<H>,
561}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565        f.debug_struct(stringify!(VolumeBarAggregator))
566            .field("core", &self.core)
567            .finish()
568    }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573    H: FnMut(Bar),
574{
575    /// Creates a new [`VolumeBarAggregator`] instance.
576    ///
577    /// # Panics
578    ///
579    /// This function panics if:
580    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
581    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
582    pub fn new(
583        bar_type: BarType,
584        price_precision: u8,
585        size_precision: u8,
586        handler: H,
587        await_partial: bool,
588    ) -> Self {
589        Self {
590            core: BarAggregatorCore::new(
591                bar_type.standard(),
592                price_precision,
593                size_precision,
594                handler,
595                await_partial,
596            ),
597        }
598    }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603    H: FnMut(Bar) + 'static,
604{
605    fn bar_type(&self) -> BarType {
606        self.core.bar_type
607    }
608
609    fn is_running(&self) -> bool {
610        self.core.is_running
611    }
612
613    fn set_await_partial(&mut self, value: bool) {
614        self.core.set_await_partial(value);
615    }
616
617    fn set_is_running(&mut self, value: bool) {
618        self.core.set_is_running(value);
619    }
620
621    fn await_partial(&self) -> bool {
622        self.core.await_partial()
623    }
624
625    /// Apply the given update to the aggregator.
626    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
627        let mut raw_size_update = size.raw;
628        let spec = self.core.bar_type.spec();
629        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631        while raw_size_update > 0 {
632            if self.core.builder.volume.raw + raw_size_update < raw_step {
633                self.core.apply_update(
634                    price,
635                    Quantity::from_raw(raw_size_update, size.precision),
636                    ts_event,
637                );
638                break;
639            }
640
641            let raw_size_diff = raw_step - self.core.builder.volume.raw;
642            self.core.apply_update(
643                price,
644                Quantity::from_raw(raw_size_diff, size.precision),
645                ts_event,
646            );
647
648            self.core.build_now_and_send();
649            raw_size_update -= raw_size_diff;
650        }
651    }
652
653    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654        let mut raw_volume_update = volume.raw;
655        let spec = self.core.bar_type.spec();
656        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658        while raw_volume_update > 0 {
659            if self.core.builder.volume.raw + raw_volume_update < raw_step {
660                self.core.builder.update_bar(
661                    bar,
662                    Quantity::from_raw(raw_volume_update, volume.precision),
663                    ts_init,
664                );
665                break;
666            }
667
668            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669            self.core.builder.update_bar(
670                bar,
671                Quantity::from_raw(raw_volume_diff, volume.precision),
672                ts_init,
673            );
674
675            self.core.build_now_and_send();
676            raw_volume_update -= raw_volume_diff;
677        }
678    }
679
680    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681        self.core.start_batch_update(handler);
682    }
683
684    fn stop_batch_update(&mut self) {
685        self.core.stop_batch_update();
686    }
687
688    fn set_partial(&mut self, partial_bar: Bar) {
689        self.core.set_partial(partial_bar);
690    }
691}
692
693/// Provides a means of building value bars aggregated from quote and trades.
694///
695/// When received value reaches the step threshold of the bar
696/// specification, then a bar is created and sent to the handler.
697pub struct ValueBarAggregator<H>
698where
699    H: FnMut(Bar),
700{
701    core: BarAggregatorCore<H>,
702    cum_value: f64,
703}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        f.debug_struct(stringify!(ValueBarAggregator))
708            .field("core", &self.core)
709            .field("cum_value", &self.cum_value)
710            .finish()
711    }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716    H: FnMut(Bar),
717{
718    /// Creates a new [`ValueBarAggregator`] instance.
719    ///
720    /// # Panics
721    ///
722    /// This function panics if:
723    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
724    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
725    pub fn new(
726        bar_type: BarType,
727        price_precision: u8,
728        size_precision: u8,
729        handler: H,
730        await_partial: bool,
731    ) -> Self {
732        Self {
733            core: BarAggregatorCore::new(
734                bar_type.standard(),
735                price_precision,
736                size_precision,
737                handler,
738                await_partial,
739            ),
740            cum_value: 0.0,
741        }
742    }
743
744    #[must_use]
745    /// Returns the cumulative value for the aggregator.
746    pub const fn get_cumulative_value(&self) -> f64 {
747        self.cum_value
748    }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753    H: FnMut(Bar) + 'static,
754{
755    fn bar_type(&self) -> BarType {
756        self.core.bar_type
757    }
758
759    fn is_running(&self) -> bool {
760        self.core.is_running
761    }
762
763    fn set_await_partial(&mut self, value: bool) {
764        self.core.set_await_partial(value);
765    }
766
767    fn set_is_running(&mut self, value: bool) {
768        self.core.set_is_running(value);
769    }
770
771    fn await_partial(&self) -> bool {
772        self.core.await_partial()
773    }
774
775    /// Apply the given update to the aggregator.
776    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
777        let mut size_update = size.as_f64();
778        let spec = self.core.bar_type.spec();
779
780        while size_update > 0.0 {
781            let value_update = price.as_f64() * size_update;
782            if self.cum_value + value_update < spec.step.get() as f64 {
783                self.cum_value += value_update;
784                self.core
785                    .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
786                break;
787            }
788
789            let value_diff = spec.step.get() as f64 - self.cum_value;
790            let size_diff = size_update * (value_diff / value_update);
791            self.core
792                .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
793
794            self.core.build_now_and_send();
795            self.cum_value = 0.0;
796            size_update -= size_diff;
797        }
798    }
799
800    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801        let mut volume_update = volume;
802        let average_price = Price::new(
803            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804            self.core.builder.price_precision,
805        );
806
807        while volume_update.as_f64() > 0.0 {
808            let value_update = average_price.as_f64() * volume_update.as_f64();
809            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810                self.cum_value += value_update;
811                self.core.builder.update_bar(bar, volume_update, ts_init);
812                break;
813            }
814
815            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817            self.core.builder.update_bar(
818                bar,
819                Quantity::new(volume_diff, volume_update.precision),
820                ts_init,
821            );
822
823            self.core.build_now_and_send();
824            self.cum_value = 0.0;
825            volume_update = Quantity::new(
826                volume_update.as_f64() - volume_diff,
827                volume_update.precision,
828            );
829        }
830    }
831
832    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833        self.core.start_batch_update(handler);
834    }
835
836    fn stop_batch_update(&mut self) {
837        self.core.stop_batch_update();
838    }
839
840    fn set_partial(&mut self, partial_bar: Bar) {
841        self.core.set_partial(partial_bar);
842    }
843}
844
845/// Provides a means of building time bars aggregated from quote and trades.
846///
847/// At each aggregation time interval, a bar is created and sent to the handler.
848pub struct TimeBarAggregator<H>
849where
850    H: FnMut(Bar),
851{
852    core: BarAggregatorCore<H>,
853    clock: Rc<RefCell<dyn Clock>>,
854    build_with_no_updates: bool,
855    timestamp_on_close: bool,
856    is_left_open: bool,
857    build_on_next_tick: bool,
858    stored_open_ns: UnixNanos,
859    stored_close_ns: UnixNanos,
860    timer_name: String,
861    interval_ns: UnixNanos,
862    next_close_ns: UnixNanos,
863    composite_bar_build_delay: i64,
864    add_delay: bool,
865    batch_open_ns: UnixNanos,
866    batch_next_close_ns: UnixNanos,
867    time_bars_origin: Option<TimeDelta>,
868    skip_first_non_full_bar: bool,
869}
870
871impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
872    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
873        f.debug_struct(stringify!(TimeBarAggregator))
874            .field("core", &self.core)
875            .field("build_with_no_updates", &self.build_with_no_updates)
876            .field("timestamp_on_close", &self.timestamp_on_close)
877            .field("is_left_open", &self.is_left_open)
878            .field("timer_name", &self.timer_name)
879            .field("interval_ns", &self.interval_ns)
880            .field("composite_bar_build_delay", &self.composite_bar_build_delay)
881            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
882            .finish()
883    }
884}
885
886#[derive(Clone, Debug)]
887pub struct NewBarCallback<H: FnMut(Bar)> {
888    aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
889}
890
891impl<H: FnMut(Bar)> NewBarCallback<H> {
892    /// Creates a new callback that invokes the time bar aggregator on timer events.
893    #[must_use]
894    pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
895        Self { aggregator }
896    }
897}
898
899impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
900    fn from(value: NewBarCallback<H>) -> Self {
901        Self::Rust(Rc::new(move |event: TimeEvent| {
902            value.aggregator.borrow_mut().build_bar(event);
903        }))
904    }
905}
906
907impl<H> TimeBarAggregator<H>
908where
909    H: FnMut(Bar) + 'static,
910{
911    /// Creates a new [`TimeBarAggregator`] instance.
912    ///
913    /// # Panics
914    ///
915    /// This function panics if:
916    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
917    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
918    #[allow(clippy::too_many_arguments)]
919    pub fn new(
920        bar_type: BarType,
921        price_precision: u8,
922        size_precision: u8,
923        clock: Rc<RefCell<dyn Clock>>,
924        handler: H,
925        await_partial: bool,
926        build_with_no_updates: bool,
927        timestamp_on_close: bool,
928        interval_type: BarIntervalType,
929        time_bars_origin: Option<TimeDelta>,
930        composite_bar_build_delay: i64,
931        skip_first_non_full_bar: bool,
932    ) -> Self {
933        let is_left_open = match interval_type {
934            BarIntervalType::LeftOpen => true,
935            BarIntervalType::RightOpen => false,
936        };
937
938        let add_delay = bar_type.is_composite()
939            && bar_type.composite().aggregation_source() == AggregationSource::Internal;
940
941        let core = BarAggregatorCore::new(
942            bar_type.standard(),
943            price_precision,
944            size_precision,
945            handler,
946            await_partial,
947        );
948
949        Self {
950            core,
951            clock,
952            build_with_no_updates,
953            timestamp_on_close,
954            is_left_open,
955            build_on_next_tick: false,
956            stored_open_ns: UnixNanos::default(),
957            stored_close_ns: UnixNanos::default(),
958            timer_name: bar_type.to_string(),
959            interval_ns: get_bar_interval_ns(&bar_type),
960            next_close_ns: UnixNanos::default(),
961            composite_bar_build_delay,
962            add_delay,
963            batch_open_ns: UnixNanos::default(),
964            batch_next_close_ns: UnixNanos::default(),
965            time_bars_origin,
966            skip_first_non_full_bar,
967        }
968    }
969
970    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
971    ///
972    /// # Errors
973    ///
974    /// Returns an error if setting up the underlying clock timer fails.
975    ///
976    /// # Panics
977    ///
978    /// Panics if the underlying clock timer registration fails.
979    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
980        let now = self.clock.borrow().utc_now();
981        let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);
982
983        if start_time == now {
984            self.skip_first_non_full_bar = false;
985        }
986
987        if self.add_delay {
988            start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
989        }
990
991        let spec = &self.bar_type().spec();
992        let start_time_ns = UnixNanos::from(start_time);
993
994        if spec.aggregation == BarAggregation::Month {
995            let step = spec.step.get() as u32;
996            let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
997
998            self.clock
999                .borrow_mut()
1000                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1001                .expect(FAILED);
1002        } else {
1003            self.clock
1004                .borrow_mut()
1005                .set_timer_ns(
1006                    &self.timer_name,
1007                    self.interval_ns.as_u64(),
1008                    start_time_ns,
1009                    None,
1010                    Some(callback.into()),
1011                    None,
1012                )
1013                .expect(FAILED);
1014        }
1015
1016        log::debug!("Started timer {}", self.timer_name);
1017        Ok(())
1018    }
1019
1020    /// Stops the time bar aggregator.
1021    pub fn stop(&mut self) {
1022        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1023    }
1024
1025    /// Starts batch time for bar aggregation.
1026    ///
1027    /// # Panics
1028    ///
1029    /// Panics if month arithmetic operations fail for monthly aggregation intervals.
1030    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1031        let spec = self.bar_type().spec();
1032        self.core.batch_mode = true;
1033
1034        let time = time_ns.to_datetime_utc();
1035        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
1036        self.batch_open_ns = UnixNanos::from(start_time);
1037
1038        if spec.aggregation == BarAggregation::Month {
1039            let step = spec.step.get() as u32;
1040
1041            if self.batch_open_ns == time_ns {
1042                self.batch_open_ns =
1043                    subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1044            }
1045
1046            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1047        } else {
1048            if self.batch_open_ns == time_ns {
1049                self.batch_open_ns -= self.interval_ns;
1050            }
1051
1052            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1053        }
1054    }
1055
1056    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1057        if self.is_left_open {
1058            if self.timestamp_on_close {
1059                close_ns
1060            } else {
1061                open_ns
1062            }
1063        } else {
1064            open_ns
1065        }
1066    }
1067
1068    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1069        if self.skip_first_non_full_bar {
1070            self.core.builder.reset();
1071            self.skip_first_non_full_bar = false;
1072        } else {
1073            self.core.build_and_send(ts_event, ts_init);
1074        }
1075    }
1076
1077    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1078        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1079            let ts_init = self.batch_next_close_ns;
1080            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1081            self.build_and_send(ts_event, ts_init);
1082        }
1083    }
1084
1085    fn batch_post_update(&mut self, time_ns: UnixNanos) {
1086        let step = self.bar_type().spec().step.get() as u32;
1087
1088        // If not in batch mode and time matches next close, reset batch close
1089        if !self.core.batch_mode
1090            && time_ns == self.batch_next_close_ns
1091            && time_ns > self.stored_open_ns
1092        {
1093            self.batch_next_close_ns = UnixNanos::default();
1094            return;
1095        }
1096
1097        if time_ns > self.batch_next_close_ns {
1098            // Ensure batch times are coherent with last builder update
1099            if self.bar_type().spec().aggregation == BarAggregation::Month {
1100                while self.batch_next_close_ns < time_ns {
1101                    self.batch_next_close_ns =
1102                        add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1103                }
1104
1105                self.batch_open_ns =
1106                    subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1107            } else {
1108                while self.batch_next_close_ns < time_ns {
1109                    self.batch_next_close_ns += self.interval_ns;
1110                }
1111
1112                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1113            }
1114        }
1115
1116        if time_ns == self.batch_next_close_ns {
1117            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1118            self.build_and_send(ts_event, time_ns);
1119            self.batch_open_ns = self.batch_next_close_ns;
1120
1121            if self.bar_type().spec().aggregation == BarAggregation::Month {
1122                self.batch_next_close_ns =
1123                    add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1124            } else {
1125                self.batch_next_close_ns += self.interval_ns;
1126            }
1127        }
1128
1129        // Delay resetting batch_next_close_ns to allow creating a last historical bar when transitioning to regular bars
1130        if !self.core.batch_mode {
1131            self.batch_next_close_ns = UnixNanos::default();
1132        }
1133    }
1134
1135    fn build_bar(&mut self, event: TimeEvent) {
1136        if !self.core.builder.initialized {
1137            self.build_on_next_tick = true;
1138            self.stored_close_ns = self.next_close_ns;
1139            return;
1140        }
1141
1142        if !self.build_with_no_updates && self.core.builder.count == 0 {
1143            return;
1144        }
1145
1146        let ts_init = event.ts_event;
1147        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1148        self.build_and_send(ts_event, ts_init);
1149
1150        self.stored_open_ns = ts_init;
1151
1152        if self.bar_type().spec().aggregation == BarAggregation::Month {
1153            let step = self.bar_type().spec().step.get() as u32;
1154            let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1155
1156            self.clock
1157                .borrow_mut()
1158                .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1159                .expect(FAILED);
1160
1161            self.next_close_ns = next_alert_ns;
1162        } else {
1163            self.next_close_ns = self
1164                .clock
1165                .borrow()
1166                .next_time_ns(&self.timer_name)
1167                .unwrap_or_default();
1168        }
1169    }
1170}
1171
1172impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1173where
1174    H: FnMut(Bar) + 'static,
1175{
1176    fn bar_type(&self) -> BarType {
1177        self.core.bar_type
1178    }
1179
1180    fn is_running(&self) -> bool {
1181        self.core.is_running
1182    }
1183
1184    fn set_await_partial(&mut self, value: bool) {
1185        self.core.set_await_partial(value);
1186    }
1187
1188    fn set_is_running(&mut self, value: bool) {
1189        self.core.set_is_running(value);
1190    }
1191
1192    fn await_partial(&self) -> bool {
1193        self.core.await_partial()
1194    }
1195    /// Stop time-based aggregator by cancelling its timer.
1196    fn stop(&mut self) {
1197        Self::stop(self);
1198    }
1199
1200    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1201        if self.batch_next_close_ns != UnixNanos::default() {
1202            self.batch_pre_update(ts_event);
1203        }
1204
1205        self.core.apply_update(price, size, ts_event);
1206
1207        if self.build_on_next_tick {
1208            if ts_event <= self.stored_close_ns {
1209                let ts_init = ts_event;
1210                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1211                self.build_and_send(ts_event, ts_init);
1212            }
1213
1214            self.build_on_next_tick = false;
1215            self.stored_close_ns = UnixNanos::default();
1216        }
1217
1218        if self.batch_next_close_ns != UnixNanos::default() {
1219            self.batch_post_update(ts_event);
1220        }
1221    }
1222
1223    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1224        if self.batch_next_close_ns != UnixNanos::default() {
1225            self.batch_pre_update(ts_init);
1226        }
1227
1228        self.core.builder.update_bar(bar, volume, ts_init);
1229
1230        if self.build_on_next_tick {
1231            if ts_init <= self.stored_close_ns {
1232                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1233                self.build_and_send(ts_event, ts_init);
1234            }
1235
1236            // Reset flag and clear stored close
1237            self.build_on_next_tick = false;
1238            self.stored_close_ns = UnixNanos::default();
1239        }
1240
1241        if self.batch_next_close_ns != UnixNanos::default() {
1242            self.batch_post_update(ts_init);
1243        }
1244    }
1245
1246    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1247        self.core.start_batch_update(handler);
1248        self.start_batch_time(time_ns);
1249    }
1250
1251    fn stop_batch_update(&mut self) {
1252        self.core.stop_batch_update();
1253    }
1254
1255    fn set_partial(&mut self, partial_bar: Bar) {
1256        self.core.set_partial(partial_bar);
1257    }
1258}
1259
1260////////////////////////////////////////////////////////////////////////////////
1261// Tests
1262////////////////////////////////////////////////////////////////////////////////
1263#[cfg(test)]
1264mod tests {
1265    use std::sync::{Arc, Mutex};
1266
1267    use nautilus_common::clock::TestClock;
1268    use nautilus_core::UUID4;
1269    use nautilus_model::{
1270        data::{BarSpecification, BarType},
1271        enums::{AggregationSource, BarAggregation, PriceType},
1272        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1273        types::{Price, Quantity},
1274    };
1275    use rstest::rstest;
1276    use ustr::Ustr;
1277
1278    use super::*;
1279
1280    #[rstest]
1281    fn test_bar_builder_initialization(equity_aapl: Equity) {
1282        let instrument = InstrumentAny::Equity(equity_aapl);
1283        let bar_type = BarType::new(
1284            instrument.id(),
1285            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1286            AggregationSource::Internal,
1287        );
1288        let builder = BarBuilder::new(
1289            bar_type,
1290            instrument.price_precision(),
1291            instrument.size_precision(),
1292        );
1293
1294        assert!(!builder.initialized);
1295        assert_eq!(builder.ts_last, 0);
1296        assert_eq!(builder.count, 0);
1297    }
1298
1299    #[rstest]
1300    fn test_set_partial_update(equity_aapl: Equity) {
1301        let instrument = InstrumentAny::Equity(equity_aapl);
1302        let bar_type = BarType::new(
1303            instrument.id(),
1304            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1305            AggregationSource::Internal,
1306        );
1307        let mut builder = BarBuilder::new(
1308            bar_type,
1309            instrument.price_precision(),
1310            instrument.size_precision(),
1311        );
1312
1313        let partial_bar = Bar::new(
1314            bar_type,
1315            Price::from("101.00"),
1316            Price::from("102.00"),
1317            Price::from("100.00"),
1318            Price::from("101.00"),
1319            Quantity::from(100),
1320            UnixNanos::from(1),
1321            UnixNanos::from(2),
1322        );
1323
1324        builder.set_partial(partial_bar);
1325        let bar = builder.build_now();
1326
1327        assert_eq!(bar.open, partial_bar.open);
1328        assert_eq!(bar.high, partial_bar.high);
1329        assert_eq!(bar.low, partial_bar.low);
1330        assert_eq!(bar.close, partial_bar.close);
1331        assert_eq!(bar.volume, partial_bar.volume);
1332        assert_eq!(builder.ts_last, 2);
1333    }
1334
1335    #[rstest]
1336    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1337        let instrument = InstrumentAny::Equity(equity_aapl);
1338        let bar_type = BarType::new(
1339            instrument.id(),
1340            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1341            AggregationSource::Internal,
1342        );
1343        let mut builder = BarBuilder::new(
1344            bar_type,
1345            instrument.price_precision(),
1346            instrument.size_precision(),
1347        );
1348
1349        builder.update(
1350            Price::from("100.00"),
1351            Quantity::from(1),
1352            UnixNanos::from(1000),
1353        );
1354        builder.update(
1355            Price::from("95.00"),
1356            Quantity::from(1),
1357            UnixNanos::from(2000),
1358        );
1359        builder.update(
1360            Price::from("105.00"),
1361            Quantity::from(1),
1362            UnixNanos::from(3000),
1363        );
1364
1365        let bar = builder.build_now();
1366        assert!(bar.high > bar.low);
1367        assert_eq!(bar.open, Price::from("100.00"));
1368        assert_eq!(bar.high, Price::from("105.00"));
1369        assert_eq!(bar.low, Price::from("95.00"));
1370        assert_eq!(bar.close, Price::from("105.00"));
1371    }
1372
1373    #[rstest]
1374    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1375        let instrument = InstrumentAny::Equity(equity_aapl);
1376        let bar_type = BarType::new(
1377            instrument.id(),
1378            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1379            AggregationSource::Internal,
1380        );
1381        let mut builder = BarBuilder::new(
1382            bar_type,
1383            instrument.price_precision(),
1384            instrument.size_precision(),
1385        );
1386
1387        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1388        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1389
1390        assert_eq!(builder.ts_last, 1_000);
1391        assert_eq!(builder.count, 1);
1392    }
1393
1394    #[rstest]
1395    fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1396        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1397        let bar_type = BarType::new(
1398            instrument.id(),
1399            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1400            AggregationSource::Internal,
1401        );
1402        let mut builder = BarBuilder::new(
1403            bar_type,
1404            instrument.price_precision(),
1405            instrument.size_precision(),
1406        );
1407
1408        let partial_bar = Bar::new(
1409            bar_type,
1410            Price::from("1.00001"),
1411            Price::from("1.00010"),
1412            Price::from("1.00000"),
1413            Price::from("1.00002"),
1414            Quantity::from(1),
1415            UnixNanos::from(1_000_000_000),
1416            UnixNanos::from(2_000_000_000),
1417        );
1418
1419        builder.set_partial(partial_bar);
1420        let bar = builder.build_now();
1421
1422        assert_eq!(bar.open, Price::from("1.00001"));
1423        assert_eq!(bar.high, Price::from("1.00010"));
1424        assert_eq!(bar.low, Price::from("1.00000"));
1425        assert_eq!(bar.close, Price::from("1.00002"));
1426        assert_eq!(bar.volume, Quantity::from(1));
1427        assert_eq!(bar.ts_init, 2_000_000_000);
1428        assert_eq!(builder.ts_last, 2_000_000_000);
1429    }
1430
1431    #[rstest]
1432    fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1433        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1434        let bar_type = BarType::new(
1435            instrument.id(),
1436            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1437            AggregationSource::Internal,
1438        );
1439        let mut builder = BarBuilder::new(
1440            bar_type,
1441            instrument.price_precision(),
1442            instrument.size_precision(),
1443        );
1444
1445        let partial_bar1 = Bar::new(
1446            bar_type,
1447            Price::from("1.00001"),
1448            Price::from("1.00010"),
1449            Price::from("1.00000"),
1450            Price::from("1.00002"),
1451            Quantity::from(1),
1452            UnixNanos::from(1_000_000_000),
1453            UnixNanos::from(1_000_000_000),
1454        );
1455
1456        let partial_bar2 = Bar::new(
1457            bar_type,
1458            Price::from("2.00001"),
1459            Price::from("2.00010"),
1460            Price::from("2.00000"),
1461            Price::from("2.00002"),
1462            Quantity::from(2),
1463            UnixNanos::from(3_000_000_000),
1464            UnixNanos::from(3_000_000_000),
1465        );
1466
1467        builder.set_partial(partial_bar1);
1468        builder.set_partial(partial_bar2);
1469        let bar = builder.build(
1470            UnixNanos::from(4_000_000_000),
1471            UnixNanos::from(4_000_000_000),
1472        );
1473
1474        assert_eq!(bar.open, Price::from("1.00001"));
1475        assert_eq!(bar.high, Price::from("1.00010"));
1476        assert_eq!(bar.low, Price::from("1.00000"));
1477        assert_eq!(bar.close, Price::from("1.00002"));
1478        assert_eq!(bar.volume, Quantity::from(1));
1479        assert_eq!(bar.ts_init, 4_000_000_000);
1480        assert_eq!(builder.ts_last, 1_000_000_000);
1481    }
1482
1483    #[rstest]
1484    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1485        let instrument = InstrumentAny::Equity(equity_aapl);
1486        let bar_type = BarType::new(
1487            instrument.id(),
1488            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1489            AggregationSource::Internal,
1490        );
1491        let mut builder = BarBuilder::new(
1492            bar_type,
1493            instrument.price_precision(),
1494            instrument.size_precision(),
1495        );
1496
1497        builder.update(
1498            Price::from("1.00000"),
1499            Quantity::from(1),
1500            UnixNanos::default(),
1501        );
1502
1503        assert!(builder.initialized);
1504        assert_eq!(builder.ts_last, 0);
1505        assert_eq!(builder.count, 1);
1506    }
1507
1508    #[rstest]
1509    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1510        equity_aapl: Equity,
1511    ) {
1512        let instrument = InstrumentAny::Equity(equity_aapl);
1513        let bar_type = BarType::new(
1514            instrument.id(),
1515            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1516            AggregationSource::Internal,
1517        );
1518        let mut builder = BarBuilder::new(bar_type, 2, 0);
1519
1520        builder.update(
1521            Price::from("1.00000"),
1522            Quantity::from(1),
1523            UnixNanos::from(1_000),
1524        );
1525        builder.update(
1526            Price::from("1.00001"),
1527            Quantity::from(1),
1528            UnixNanos::from(500),
1529        );
1530
1531        assert!(builder.initialized);
1532        assert_eq!(builder.ts_last, 1_000);
1533        assert_eq!(builder.count, 1);
1534    }
1535
1536    #[rstest]
1537    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1538        let instrument = InstrumentAny::Equity(equity_aapl);
1539        let bar_type = BarType::new(
1540            instrument.id(),
1541            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1542            AggregationSource::Internal,
1543        );
1544        let mut builder = BarBuilder::new(
1545            bar_type,
1546            instrument.price_precision(),
1547            instrument.size_precision(),
1548        );
1549
1550        for _ in 0..5 {
1551            builder.update(
1552                Price::from("1.00000"),
1553                Quantity::from(1),
1554                UnixNanos::from(1_000),
1555            );
1556        }
1557
1558        assert_eq!(builder.count, 5);
1559    }
1560
1561    #[rstest]
1562    #[should_panic]
1563    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1564        let instrument = InstrumentAny::Equity(equity_aapl);
1565        let bar_type = BarType::new(
1566            instrument.id(),
1567            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1568            AggregationSource::Internal,
1569        );
1570        let mut builder = BarBuilder::new(
1571            bar_type,
1572            instrument.price_precision(),
1573            instrument.size_precision(),
1574        );
1575        let _ = builder.build_now();
1576    }
1577
1578    #[rstest]
1579    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1580        let instrument = InstrumentAny::Equity(equity_aapl);
1581        let bar_type = BarType::new(
1582            instrument.id(),
1583            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1584            AggregationSource::Internal,
1585        );
1586        let mut builder = BarBuilder::new(
1587            bar_type,
1588            instrument.price_precision(),
1589            instrument.size_precision(),
1590        );
1591
1592        builder.update(
1593            Price::from("1.00001"),
1594            Quantity::from(2),
1595            UnixNanos::default(),
1596        );
1597        builder.update(
1598            Price::from("1.00002"),
1599            Quantity::from(2),
1600            UnixNanos::default(),
1601        );
1602        builder.update(
1603            Price::from("1.00000"),
1604            Quantity::from(1),
1605            UnixNanos::from(1_000_000_000),
1606        );
1607
1608        let bar = builder.build_now();
1609
1610        assert_eq!(bar.open, Price::from("1.00001"));
1611        assert_eq!(bar.high, Price::from("1.00002"));
1612        assert_eq!(bar.low, Price::from("1.00000"));
1613        assert_eq!(bar.close, Price::from("1.00000"));
1614        assert_eq!(bar.volume, Quantity::from(5));
1615        assert_eq!(bar.ts_init, 1_000_000_000);
1616        assert_eq!(builder.ts_last, 1_000_000_000);
1617        assert_eq!(builder.count, 0);
1618    }
1619
1620    #[rstest]
1621    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1622        let instrument = InstrumentAny::Equity(equity_aapl);
1623        let bar_type = BarType::new(
1624            instrument.id(),
1625            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1626            AggregationSource::Internal,
1627        );
1628        let mut builder = BarBuilder::new(bar_type, 2, 0);
1629
1630        builder.update(
1631            Price::from("1.00001"),
1632            Quantity::from(1),
1633            UnixNanos::default(),
1634        );
1635        builder.build_now();
1636
1637        builder.update(
1638            Price::from("1.00000"),
1639            Quantity::from(1),
1640            UnixNanos::default(),
1641        );
1642        builder.update(
1643            Price::from("1.00003"),
1644            Quantity::from(1),
1645            UnixNanos::default(),
1646        );
1647        builder.update(
1648            Price::from("1.00002"),
1649            Quantity::from(1),
1650            UnixNanos::default(),
1651        );
1652
1653        let bar = builder.build_now();
1654
1655        assert_eq!(bar.open, Price::from("1.00000"));
1656        assert_eq!(bar.high, Price::from("1.00003"));
1657        assert_eq!(bar.low, Price::from("1.00000"));
1658        assert_eq!(bar.close, Price::from("1.00002"));
1659        assert_eq!(bar.volume, Quantity::from(3));
1660    }
1661
1662    #[rstest]
1663    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1664        let instrument = InstrumentAny::Equity(equity_aapl);
1665        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1666        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1667        let handler = Arc::new(Mutex::new(Vec::new()));
1668        let handler_clone = Arc::clone(&handler);
1669
1670        let mut aggregator = TickBarAggregator::new(
1671            bar_type,
1672            instrument.price_precision(),
1673            instrument.size_precision(),
1674            move |bar: Bar| {
1675                let mut handler_guard = handler_clone.lock().unwrap();
1676                handler_guard.push(bar);
1677            },
1678            false,
1679        );
1680
1681        let trade = TradeTick::default();
1682        aggregator.handle_trade(trade);
1683
1684        let handler_guard = handler.lock().unwrap();
1685        assert_eq!(handler_guard.len(), 0);
1686    }
1687
1688    #[rstest]
1689    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1690        let instrument = InstrumentAny::Equity(equity_aapl);
1691        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1692        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1693        let handler = Arc::new(Mutex::new(Vec::new()));
1694        let handler_clone = Arc::clone(&handler);
1695
1696        let mut aggregator = TickBarAggregator::new(
1697            bar_type,
1698            instrument.price_precision(),
1699            instrument.size_precision(),
1700            move |bar: Bar| {
1701                let mut handler_guard = handler_clone.lock().unwrap();
1702                handler_guard.push(bar);
1703            },
1704            false,
1705        );
1706
1707        let trade = TradeTick::default();
1708        aggregator.handle_trade(trade);
1709        aggregator.handle_trade(trade);
1710        aggregator.handle_trade(trade);
1711
1712        let handler_guard = handler.lock().unwrap();
1713        let bar = handler_guard.first().unwrap();
1714        assert_eq!(handler_guard.len(), 1);
1715        assert_eq!(bar.open, trade.price);
1716        assert_eq!(bar.high, trade.price);
1717        assert_eq!(bar.low, trade.price);
1718        assert_eq!(bar.close, trade.price);
1719        assert_eq!(bar.volume, Quantity::from(300000));
1720        assert_eq!(bar.ts_event, trade.ts_event);
1721        assert_eq!(bar.ts_init, trade.ts_init);
1722    }
1723
1724    #[rstest]
1725    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1726        let instrument = InstrumentAny::Equity(equity_aapl);
1727        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1728        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1729        let handler = Arc::new(Mutex::new(Vec::new()));
1730        let handler_clone = Arc::clone(&handler);
1731
1732        let mut aggregator = TickBarAggregator::new(
1733            bar_type,
1734            instrument.price_precision(),
1735            instrument.size_precision(),
1736            move |bar: Bar| {
1737                let mut handler_guard = handler_clone.lock().unwrap();
1738                handler_guard.push(bar);
1739            },
1740            false,
1741        );
1742
1743        aggregator.update(
1744            Price::from("1.00001"),
1745            Quantity::from(1),
1746            UnixNanos::default(),
1747        );
1748        aggregator.update(
1749            Price::from("1.00002"),
1750            Quantity::from(1),
1751            UnixNanos::from(1000),
1752        );
1753        aggregator.update(
1754            Price::from("1.00003"),
1755            Quantity::from(1),
1756            UnixNanos::from(2000),
1757        );
1758
1759        let handler_guard = handler.lock().unwrap();
1760        assert_eq!(handler_guard.len(), 1);
1761
1762        let bar = handler_guard.first().unwrap();
1763        assert_eq!(bar.open, Price::from("1.00001"));
1764        assert_eq!(bar.high, Price::from("1.00003"));
1765        assert_eq!(bar.low, Price::from("1.00001"));
1766        assert_eq!(bar.close, Price::from("1.00003"));
1767        assert_eq!(bar.volume, Quantity::from(3));
1768    }
1769
1770    #[rstest]
1771    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1772        let instrument = InstrumentAny::Equity(equity_aapl);
1773        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1774        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1775        let handler = Arc::new(Mutex::new(Vec::new()));
1776        let handler_clone = Arc::clone(&handler);
1777
1778        let mut aggregator = TickBarAggregator::new(
1779            bar_type,
1780            instrument.price_precision(),
1781            instrument.size_precision(),
1782            move |bar: Bar| {
1783                let mut handler_guard = handler_clone.lock().unwrap();
1784                handler_guard.push(bar);
1785            },
1786            false,
1787        );
1788
1789        aggregator.update(
1790            Price::from("1.00001"),
1791            Quantity::from(1),
1792            UnixNanos::default(),
1793        );
1794        aggregator.update(
1795            Price::from("1.00002"),
1796            Quantity::from(1),
1797            UnixNanos::from(1000),
1798        );
1799        aggregator.update(
1800            Price::from("1.00003"),
1801            Quantity::from(1),
1802            UnixNanos::from(2000),
1803        );
1804        aggregator.update(
1805            Price::from("1.00004"),
1806            Quantity::from(1),
1807            UnixNanos::from(3000),
1808        );
1809
1810        let handler_guard = handler.lock().unwrap();
1811        assert_eq!(handler_guard.len(), 2);
1812
1813        let bar1 = &handler_guard[0];
1814        assert_eq!(bar1.open, Price::from("1.00001"));
1815        assert_eq!(bar1.close, Price::from("1.00002"));
1816        assert_eq!(bar1.volume, Quantity::from(2));
1817
1818        let bar2 = &handler_guard[1];
1819        assert_eq!(bar2.open, Price::from("1.00003"));
1820        assert_eq!(bar2.close, Price::from("1.00004"));
1821        assert_eq!(bar2.volume, Quantity::from(2));
1822    }
1823
1824    #[rstest]
1825    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1826        let instrument = InstrumentAny::Equity(equity_aapl);
1827        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1828        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1829        let handler = Arc::new(Mutex::new(Vec::new()));
1830        let handler_clone = Arc::clone(&handler);
1831
1832        let mut aggregator = VolumeBarAggregator::new(
1833            bar_type,
1834            instrument.price_precision(),
1835            instrument.size_precision(),
1836            move |bar: Bar| {
1837                let mut handler_guard = handler_clone.lock().unwrap();
1838                handler_guard.push(bar);
1839            },
1840            false,
1841        );
1842
1843        aggregator.update(
1844            Price::from("1.00001"),
1845            Quantity::from(25),
1846            UnixNanos::default(),
1847        );
1848
1849        let handler_guard = handler.lock().unwrap();
1850        assert_eq!(handler_guard.len(), 2);
1851        let bar1 = &handler_guard[0];
1852        assert_eq!(bar1.volume, Quantity::from(10));
1853        let bar2 = &handler_guard[1];
1854        assert_eq!(bar2.volume, Quantity::from(10));
1855    }
1856
1857    #[rstest]
1858    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1859        let instrument = InstrumentAny::Equity(equity_aapl);
1860        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1861        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1862        let handler = Arc::new(Mutex::new(Vec::new()));
1863        let handler_clone = Arc::clone(&handler);
1864
1865        let mut aggregator = ValueBarAggregator::new(
1866            bar_type,
1867            instrument.price_precision(),
1868            instrument.size_precision(),
1869            move |bar: Bar| {
1870                let mut handler_guard = handler_clone.lock().unwrap();
1871                handler_guard.push(bar);
1872            },
1873            false,
1874        );
1875
1876        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1877        aggregator.update(
1878            Price::from("100.00"),
1879            Quantity::from(5),
1880            UnixNanos::default(),
1881        );
1882        aggregator.update(
1883            Price::from("100.00"),
1884            Quantity::from(5),
1885            UnixNanos::from(1000),
1886        );
1887
1888        let handler_guard = handler.lock().unwrap();
1889        assert_eq!(handler_guard.len(), 1);
1890        let bar = handler_guard.first().unwrap();
1891        assert_eq!(bar.volume, Quantity::from(10));
1892    }
1893
1894    #[rstest]
1895    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1896        let instrument = InstrumentAny::Equity(equity_aapl);
1897        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1898        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1899        let handler = Arc::new(Mutex::new(Vec::new()));
1900        let handler_clone = Arc::clone(&handler);
1901
1902        let mut aggregator = ValueBarAggregator::new(
1903            bar_type,
1904            instrument.price_precision(),
1905            instrument.size_precision(),
1906            move |bar: Bar| {
1907                let mut handler_guard = handler_clone.lock().unwrap();
1908                handler_guard.push(bar);
1909            },
1910            false,
1911        );
1912
1913        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1914        aggregator.update(
1915            Price::from("100.00"),
1916            Quantity::from(25),
1917            UnixNanos::default(),
1918        );
1919
1920        let handler_guard = handler.lock().unwrap();
1921        assert_eq!(handler_guard.len(), 2);
1922        let remaining_value = aggregator.get_cumulative_value();
1923        assert!(remaining_value < 1000.0); // Should be less than threshold
1924    }
1925
1926    #[rstest]
1927    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1928        let instrument = InstrumentAny::Equity(equity_aapl);
1929        // One second bars
1930        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1931        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1932        let handler = Arc::new(Mutex::new(Vec::new()));
1933        let handler_clone = Arc::clone(&handler);
1934        let clock = Rc::new(RefCell::new(TestClock::new()));
1935
1936        let mut aggregator = TimeBarAggregator::new(
1937            bar_type,
1938            instrument.price_precision(),
1939            instrument.size_precision(),
1940            clock.clone(),
1941            move |bar: Bar| {
1942                let mut handler_guard = handler_clone.lock().unwrap();
1943                handler_guard.push(bar);
1944            },
1945            false, // await_partial
1946            true,  // build_with_no_updates
1947            false, // timestamp_on_close
1948            BarIntervalType::LeftOpen,
1949            None,  // time_bars_origin
1950            15,    // composite_bar_build_delay
1951            false, // skip_first_non_full_bar
1952        );
1953
1954        aggregator.update(
1955            Price::from("100.00"),
1956            Quantity::from(1),
1957            UnixNanos::default(),
1958        );
1959
1960        let next_sec = UnixNanos::from(1_000_000_000);
1961        clock.borrow_mut().set_time(next_sec);
1962
1963        let event = TimeEvent::new(
1964            Ustr::from("1-SECOND-LAST"),
1965            UUID4::new(),
1966            next_sec,
1967            next_sec,
1968        );
1969        aggregator.build_bar(event);
1970
1971        let handler_guard = handler.lock().unwrap();
1972        assert_eq!(handler_guard.len(), 1);
1973        let bar = handler_guard.first().unwrap();
1974        assert_eq!(bar.ts_event, UnixNanos::default());
1975        assert_eq!(bar.ts_init, next_sec);
1976    }
1977
1978    #[rstest]
1979    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1980        let instrument = InstrumentAny::Equity(equity_aapl);
1981        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1982        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1983        let handler = Arc::new(Mutex::new(Vec::new()));
1984        let handler_clone = Arc::clone(&handler);
1985        let clock = Rc::new(RefCell::new(TestClock::new()));
1986
1987        let mut aggregator = TimeBarAggregator::new(
1988            bar_type,
1989            instrument.price_precision(),
1990            instrument.size_precision(),
1991            clock.clone(),
1992            move |bar: Bar| {
1993                let mut handler_guard = handler_clone.lock().unwrap();
1994                handler_guard.push(bar);
1995            },
1996            false, // await_partial
1997            true,  // build_with_no_updates
1998            true,  // timestamp_on_close - changed to true to verify left-open behavior
1999            BarIntervalType::LeftOpen,
2000            None,
2001            15,
2002            false, // skip_first_non_full_bar
2003        );
2004
2005        // Update in first interval
2006        aggregator.update(
2007            Price::from("100.00"),
2008            Quantity::from(1),
2009            UnixNanos::default(),
2010        );
2011
2012        // First interval close
2013        let ts1 = UnixNanos::from(1_000_000_000);
2014        clock.borrow_mut().set_time(ts1);
2015        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2016        aggregator.build_bar(event);
2017
2018        // Update in second interval
2019        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2020
2021        // Second interval close
2022        let ts2 = UnixNanos::from(2_000_000_000);
2023        clock.borrow_mut().set_time(ts2);
2024        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2025        aggregator.build_bar(event);
2026
2027        let handler_guard = handler.lock().unwrap();
2028        assert_eq!(handler_guard.len(), 2);
2029
2030        let bar1 = &handler_guard[0];
2031        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
2032        assert_eq!(bar1.ts_init, ts1);
2033        assert_eq!(bar1.close, Price::from("100.00"));
2034        let bar2 = &handler_guard[1];
2035        assert_eq!(bar2.ts_event, ts2);
2036        assert_eq!(bar2.ts_init, ts2);
2037        assert_eq!(bar2.close, Price::from("101.00"));
2038    }
2039
2040    #[rstest]
2041    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2042        let instrument = InstrumentAny::Equity(equity_aapl);
2043        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2044        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2045        let handler = Arc::new(Mutex::new(Vec::new()));
2046        let handler_clone = Arc::clone(&handler);
2047        let clock = Rc::new(RefCell::new(TestClock::new()));
2048        let mut aggregator = TimeBarAggregator::new(
2049            bar_type,
2050            instrument.price_precision(),
2051            instrument.size_precision(),
2052            clock.clone(),
2053            move |bar: Bar| {
2054                let mut handler_guard = handler_clone.lock().unwrap();
2055                handler_guard.push(bar);
2056            },
2057            false, // await_partial
2058            true,  // build_with_no_updates
2059            true,  // timestamp_on_close
2060            BarIntervalType::RightOpen,
2061            None,
2062            15,
2063            false, // skip_first_non_full_bar
2064        );
2065
2066        // Update in first interval
2067        aggregator.update(
2068            Price::from("100.00"),
2069            Quantity::from(1),
2070            UnixNanos::default(),
2071        );
2072
2073        // First interval close
2074        let ts1 = UnixNanos::from(1_000_000_000);
2075        clock.borrow_mut().set_time(ts1);
2076        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2077        aggregator.build_bar(event);
2078
2079        // Update in second interval
2080        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2081
2082        // Second interval close
2083        let ts2 = UnixNanos::from(2_000_000_000);
2084        clock.borrow_mut().set_time(ts2);
2085        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2086        aggregator.build_bar(event);
2087
2088        let handler_guard = handler.lock().unwrap();
2089        assert_eq!(handler_guard.len(), 2);
2090
2091        let bar1 = &handler_guard[0];
2092        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
2093        assert_eq!(bar1.ts_init, ts1);
2094        assert_eq!(bar1.close, Price::from("100.00"));
2095
2096        let bar2 = &handler_guard[1];
2097        assert_eq!(bar2.ts_event, ts1);
2098        assert_eq!(bar2.ts_init, ts2);
2099        assert_eq!(bar2.close, Price::from("101.00"));
2100    }
2101
2102    #[rstest]
2103    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2104        let instrument = InstrumentAny::Equity(equity_aapl);
2105        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2106        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2107        let handler = Arc::new(Mutex::new(Vec::new()));
2108        let handler_clone = Arc::clone(&handler);
2109        let clock = Rc::new(RefCell::new(TestClock::new()));
2110
2111        // First test with build_with_no_updates = false
2112        let mut aggregator = TimeBarAggregator::new(
2113            bar_type,
2114            instrument.price_precision(),
2115            instrument.size_precision(),
2116            clock.clone(),
2117            move |bar: Bar| {
2118                let mut handler_guard = handler_clone.lock().unwrap();
2119                handler_guard.push(bar);
2120            },
2121            false, // await_partial
2122            false, // build_with_no_updates disabled
2123            true,  // timestamp_on_close
2124            BarIntervalType::LeftOpen,
2125            None,
2126            15,
2127            false, // skip_first_non_full_bar
2128        );
2129
2130        // No updates, just interval close
2131        let ts1 = UnixNanos::from(1_000_000_000);
2132        clock.borrow_mut().set_time(ts1);
2133        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2134        aggregator.build_bar(event);
2135
2136        let handler_guard = handler.lock().unwrap();
2137        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
2138        drop(handler_guard);
2139
2140        // Now test with build_with_no_updates = true
2141        let handler = Arc::new(Mutex::new(Vec::new()));
2142        let handler_clone = Arc::clone(&handler);
2143        let mut aggregator = TimeBarAggregator::new(
2144            bar_type,
2145            instrument.price_precision(),
2146            instrument.size_precision(),
2147            clock.clone(),
2148            move |bar: Bar| {
2149                let mut handler_guard = handler_clone.lock().unwrap();
2150                handler_guard.push(bar);
2151            },
2152            false,
2153            true, // build_with_no_updates enabled
2154            true, // timestamp_on_close
2155            BarIntervalType::LeftOpen,
2156            None,
2157            15,
2158            false, // skip_first_non_full_bar
2159        );
2160
2161        aggregator.update(
2162            Price::from("100.00"),
2163            Quantity::from(1),
2164            UnixNanos::default(),
2165        );
2166
2167        // First interval with update
2168        let ts1 = UnixNanos::from(1_000_000_000);
2169        clock.borrow_mut().set_time(ts1);
2170        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2171        aggregator.build_bar(event);
2172
2173        // Second interval without updates
2174        let ts2 = UnixNanos::from(2_000_000_000);
2175        clock.borrow_mut().set_time(ts2);
2176        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2177        aggregator.build_bar(event);
2178
2179        let handler_guard = handler.lock().unwrap();
2180        assert_eq!(handler_guard.len(), 2); // Both bars should be built
2181        let bar1 = &handler_guard[0];
2182        assert_eq!(bar1.close, Price::from("100.00"));
2183        let bar2 = &handler_guard[1];
2184        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2185    }
2186
2187    #[rstest]
2188    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2189        let instrument = InstrumentAny::Equity(equity_aapl);
2190        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2191        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2192        let clock = Rc::new(RefCell::new(TestClock::new()));
2193        let handler = Arc::new(Mutex::new(Vec::new()));
2194        let handler_clone = Arc::clone(&handler);
2195
2196        let mut aggregator = TimeBarAggregator::new(
2197            bar_type,
2198            instrument.price_precision(),
2199            instrument.size_precision(),
2200            clock.clone(),
2201            move |bar: Bar| {
2202                let mut handler_guard = handler_clone.lock().unwrap();
2203                handler_guard.push(bar);
2204            },
2205            false, // await_partial
2206            true,  // build_with_no_updates
2207            true,  // timestamp_on_close
2208            BarIntervalType::RightOpen,
2209            None,
2210            15,
2211            false, // skip_first_non_full_bar
2212        );
2213
2214        let ts1 = UnixNanos::from(1_000_000_000);
2215        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2216
2217        let ts2 = UnixNanos::from(2_000_000_000);
2218        clock.borrow_mut().set_time(ts2);
2219
2220        // Simulate timestamp on close
2221        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2222        aggregator.build_bar(event);
2223
2224        let handler_guard = handler.lock().unwrap();
2225        let bar = handler_guard.first().unwrap();
2226        assert_eq!(bar.ts_event, UnixNanos::default());
2227        assert_eq!(bar.ts_init, ts2);
2228    }
2229
2230    #[rstest]
2231    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2232        let instrument = InstrumentAny::Equity(equity_aapl);
2233        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2234        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2235        let clock = Rc::new(RefCell::new(TestClock::new()));
2236        let handler = Arc::new(Mutex::new(Vec::new()));
2237        let handler_clone = Arc::clone(&handler);
2238
2239        let mut aggregator = TimeBarAggregator::new(
2240            bar_type,
2241            instrument.price_precision(),
2242            instrument.size_precision(),
2243            clock.clone(),
2244            move |bar: Bar| {
2245                let mut handler_guard = handler_clone.lock().unwrap();
2246                handler_guard.push(bar);
2247            },
2248            false, // await_partial
2249            true,  // build_with_no_updates
2250            true,  // timestamp_on_close
2251            BarIntervalType::LeftOpen,
2252            None,
2253            15,
2254            false, // skip_first_non_full_bar
2255        );
2256
2257        let ts1 = UnixNanos::from(1_000_000_000);
2258        clock.borrow_mut().set_time(ts1);
2259
2260        let initial_time = clock.borrow().utc_now();
2261        aggregator.start_batch_time(UnixNanos::from(
2262            initial_time.timestamp_nanos_opt().unwrap() as u64
2263        ));
2264
2265        let handler_guard = handler.lock().unwrap();
2266        assert_eq!(handler_guard.len(), 0);
2267    }
2268}