Skip to main content

nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{
22    any::Any,
23    cell::RefCell,
24    fmt::Debug,
25    ops::Add,
26    rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31    clock::{Clock, TestClock},
32    timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35    UnixNanos,
36    correctness::{self, FAILED},
37    datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40    data::{
41        QuoteTick, TradeTick,
42        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43    },
44    enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45    types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
/// Boxed callback that receives each completed [`Bar`].
///
/// Declared as an alias so that types storing a handler stay readable.
type BarHandler = Box<dyn FnMut(Bar)>;
50
51/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
52///
53/// Implementors receive updates and produce completed bars via handlers.
54pub trait BarAggregator: Any + Debug {
55    /// The [`BarType`] to be aggregated.
56    fn bar_type(&self) -> BarType;
57    /// If the aggregator is running and will receive data from the message bus.
58    fn is_running(&self) -> bool;
59    /// Sets the running state of the aggregator (receiving updates when `true`).
60    fn set_is_running(&mut self, value: bool);
61    /// Updates the aggregator  with the given price and size.
62    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63    /// Updates the aggregator with the given quote.
64    fn handle_quote(&mut self, quote: QuoteTick) {
65        let spec = self.bar_type().spec();
66        self.update(
67            quote.extract_price(spec.price_type),
68            quote.extract_size(spec.price_type),
69            quote.ts_init,
70        );
71    }
72    /// Updates the aggregator with the given trade.
73    fn handle_trade(&mut self, trade: TradeTick) {
74        self.update(trade.price, trade.size, trade.ts_init);
75    }
76    /// Updates the aggregator with the given bar.
77    fn handle_bar(&mut self, bar: Bar) {
78        self.update_bar(bar, bar.volume, bar.ts_init);
79    }
80    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
82    fn stop(&mut self) {}
83    /// Sets historical mode (default implementation does nothing, TimeBarAggregator overrides)
84    fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85    /// Sets historical events (default implementation does nothing, TimeBarAggregator overrides)
86    fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87    /// Sets clock for time bar aggregators (default implementation does nothing, TimeBarAggregator overrides)
88    fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89    /// Builds a bar from a time event (default implementation does nothing, TimeBarAggregator overrides)
90    fn build_bar(&mut self, _event: TimeEvent) {}
91    /// Starts the timer for time bar aggregators.
92    /// Default implementation does nothing, TimeBarAggregator overrides.
93    /// Takes an optional Rc to create weak reference internally.
94    fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95    /// Sets the weak reference to the aggregator wrapper (for historical mode).
96    /// Default implementation does nothing, TimeBarAggregator overrides.
97    fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101    /// Returns a reference to this aggregator as `Any` for downcasting.
102    pub fn as_any(&self) -> &dyn Any {
103        self
104    }
105    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
106    pub fn as_any_mut(&mut self) -> &mut dyn Any {
107        self
108    }
109}
110
111/// Provides a generic bar builder for aggregation.
112#[derive(Debug)]
113pub struct BarBuilder {
114    bar_type: BarType,
115    price_precision: u8,
116    size_precision: u8,
117    initialized: bool,
118    ts_last: UnixNanos,
119    count: usize,
120    last_close: Option<Price>,
121    open: Option<Price>,
122    high: Option<Price>,
123    low: Option<Price>,
124    close: Option<Price>,
125    volume: Quantity,
126}
127
128impl BarBuilder {
129    /// Creates a new [`BarBuilder`] instance.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
134    #[must_use]
135    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
136        correctness::check_equal(
137            &bar_type.aggregation_source(),
138            &AggregationSource::Internal,
139            "bar_type.aggregation_source",
140            "AggregationSource::Internal",
141        )
142        .expect(FAILED);
143
144        Self {
145            bar_type,
146            price_precision,
147            size_precision,
148            initialized: false,
149            ts_last: UnixNanos::default(),
150            count: 0,
151            last_close: None,
152            open: None,
153            high: None,
154            low: None,
155            close: None,
156            volume: Quantity::zero(size_precision),
157        }
158    }
159
160    /// Updates the builder state with the given price, size, and init timestamp.
161    ///
162    /// # Panics
163    ///
164    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
165    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
166        if ts_init < self.ts_last {
167            return; // Not applicable
168        }
169
170        if self.open.is_none() {
171            self.open = Some(price);
172            self.high = Some(price);
173            self.low = Some(price);
174            self.initialized = true;
175        } else {
176            if price > self.high.unwrap() {
177                self.high = Some(price);
178            }
179
180            if price < self.low.unwrap() {
181                self.low = Some(price);
182            }
183        }
184
185        self.close = Some(price);
186        self.volume = self.volume.add(size);
187        self.count += 1;
188        self.ts_last = ts_init;
189    }
190
191    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
192    ///
193    /// # Panics
194    ///
195    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
196    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
197        if ts_init < self.ts_last {
198            return; // Not applicable
199        }
200
201        if self.open.is_none() {
202            self.open = Some(bar.open);
203            self.high = Some(bar.high);
204            self.low = Some(bar.low);
205            self.initialized = true;
206        } else {
207            if bar.high > self.high.unwrap() {
208                self.high = Some(bar.high);
209            }
210
211            if bar.low < self.low.unwrap() {
212                self.low = Some(bar.low);
213            }
214        }
215
216        self.close = Some(bar.close);
217        self.volume = self.volume.add(volume);
218        self.count += 1;
219        self.ts_last = ts_init;
220    }
221
222    /// Reset the bar builder.
223    ///
224    /// All stateful fields are reset to their initial value.
225    pub fn reset(&mut self) {
226        self.open = None;
227        self.high = None;
228        self.low = None;
229        self.volume = Quantity::zero(self.size_precision);
230        self.count = 0;
231    }
232
233    /// Return the aggregated bar and reset.
234    pub fn build_now(&mut self) -> Bar {
235        self.build(self.ts_last, self.ts_last)
236    }
237
238    /// Returns the aggregated bar for the given timestamps, then resets the builder.
239    ///
240    /// # Panics
241    ///
242    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
243    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
244        if self.open.is_none() {
245            self.open = self.last_close;
246            self.high = self.last_close;
247            self.low = self.last_close;
248            self.close = self.last_close;
249        }
250
251        if let (Some(close), Some(low)) = (self.close, self.low)
252            && close < low
253        {
254            self.low = Some(close);
255        }
256
257        if let (Some(close), Some(high)) = (self.close, self.high)
258            && close > high
259        {
260            self.high = Some(close);
261        }
262
263        // The open was checked, so we can assume all prices are Some
264        let bar = Bar::new(
265            self.bar_type,
266            self.open.unwrap(),
267            self.high.unwrap(),
268            self.low.unwrap(),
269            self.close.unwrap(),
270            self.volume,
271            ts_event,
272            ts_init,
273        );
274
275        self.last_close = self.close;
276        self.reset();
277        bar
278    }
279}
280
281/// Provides a means of aggregating specified bar types and sending to a registered handler.
282pub struct BarAggregatorCore {
283    bar_type: BarType,
284    builder: BarBuilder,
285    handler: BarHandler,
286    is_running: bool,
287}
288
289impl Debug for BarAggregatorCore {
290    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291        f.debug_struct(stringify!(BarAggregatorCore))
292            .field("bar_type", &self.bar_type)
293            .field("builder", &self.builder)
294            .field("is_running", &self.is_running)
295            .finish()
296    }
297}
298
299impl BarAggregatorCore {
300    /// Creates a new [`BarAggregatorCore`] instance.
301    ///
302    /// # Panics
303    ///
304    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
305    pub fn new<H: FnMut(Bar) + 'static>(
306        bar_type: BarType,
307        price_precision: u8,
308        size_precision: u8,
309        handler: H,
310    ) -> Self {
311        Self {
312            bar_type,
313            builder: BarBuilder::new(bar_type, price_precision, size_precision),
314            handler: Box::new(handler),
315            is_running: false,
316        }
317    }
318
319    /// Sets the running state of the aggregator (receives updates when `true`).
320    pub const fn set_is_running(&mut self, value: bool) {
321        self.is_running = value;
322    }
323    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
324        self.builder.update(price, size, ts_init);
325    }
326
327    fn build_now_and_send(&mut self) {
328        let bar = self.builder.build_now();
329        (self.handler)(bar);
330    }
331
332    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
333        let bar = self.builder.build(ts_event, ts_init);
334        (self.handler)(bar);
335    }
336}
337
338/// Provides a means of building tick bars aggregated from quote and trades.
339///
340/// When received tick count reaches the step threshold of the bar
341/// specification, then a bar is created and sent to the handler.
342pub struct TickBarAggregator {
343    core: BarAggregatorCore,
344}
345
346impl Debug for TickBarAggregator {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        f.debug_struct(stringify!(TickBarAggregator))
349            .field("core", &self.core)
350            .finish()
351    }
352}
353
354impl TickBarAggregator {
355    /// Creates a new [`TickBarAggregator`] instance.
356    ///
357    /// # Panics
358    ///
359    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
360    pub fn new<H: FnMut(Bar) + 'static>(
361        bar_type: BarType,
362        price_precision: u8,
363        size_precision: u8,
364        handler: H,
365    ) -> Self {
366        Self {
367            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
368        }
369    }
370}
371
372impl BarAggregator for TickBarAggregator {
373    fn bar_type(&self) -> BarType {
374        self.core.bar_type
375    }
376
377    fn is_running(&self) -> bool {
378        self.core.is_running
379    }
380
381    fn set_is_running(&mut self, value: bool) {
382        self.core.set_is_running(value);
383    }
384
385    /// Apply the given update to the aggregator.
386    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
387        self.core.apply_update(price, size, ts_init);
388        let spec = self.core.bar_type.spec();
389
390        if self.core.builder.count >= spec.step.get() {
391            self.core.build_now_and_send();
392        }
393    }
394
395    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
396        self.core.builder.update_bar(bar, volume, ts_init);
397        let spec = self.core.bar_type.spec();
398
399        if self.core.builder.count >= spec.step.get() {
400            self.core.build_now_and_send();
401        }
402    }
403}
404
405/// Aggregates bars based on tick buy/sell imbalance.
406///
407/// Increments imbalance by +1 for buyer-aggressed trades and -1 for seller-aggressed trades.
408/// Emits a bar when the absolute imbalance reaches the step threshold.
409pub struct TickImbalanceBarAggregator {
410    core: BarAggregatorCore,
411    imbalance: isize,
412}
413
414impl Debug for TickImbalanceBarAggregator {
415    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416        f.debug_struct(stringify!(TickImbalanceBarAggregator))
417            .field("core", &self.core)
418            .field("imbalance", &self.imbalance)
419            .finish()
420    }
421}
422
423impl TickImbalanceBarAggregator {
424    /// Creates a new [`TickImbalanceBarAggregator`] instance.
425    ///
426    /// # Panics
427    ///
428    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
429    pub fn new<H: FnMut(Bar) + 'static>(
430        bar_type: BarType,
431        price_precision: u8,
432        size_precision: u8,
433        handler: H,
434    ) -> Self {
435        Self {
436            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
437            imbalance: 0,
438        }
439    }
440}
441
442impl BarAggregator for TickImbalanceBarAggregator {
443    fn bar_type(&self) -> BarType {
444        self.core.bar_type
445    }
446
447    fn is_running(&self) -> bool {
448        self.core.is_running
449    }
450
451    fn set_is_running(&mut self, value: bool) {
452        self.core.set_is_running(value);
453    }
454
455    /// Apply the given update to the aggregator.
456    ///
457    /// Note: side-aware logic lives in `handle_trade`. This method is used for
458    /// quote/bar updates where no aggressor side is available.
459    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
460        self.core.apply_update(price, size, ts_init);
461    }
462
463    fn handle_trade(&mut self, trade: TradeTick) {
464        self.core
465            .apply_update(trade.price, trade.size, trade.ts_init);
466
467        let delta = match trade.aggressor_side {
468            AggressorSide::Buyer => 1,
469            AggressorSide::Seller => -1,
470            AggressorSide::NoAggressor => 0,
471        };
472
473        if delta == 0 {
474            return;
475        }
476
477        self.imbalance += delta;
478        let threshold = self.core.bar_type.spec().step.get();
479        if self.imbalance.unsigned_abs() >= threshold {
480            self.core.build_now_and_send();
481            self.imbalance = 0;
482        }
483    }
484
485    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
486        self.core.builder.update_bar(bar, volume, ts_init);
487    }
488}
489
490/// Aggregates bars based on consecutive buy/sell tick runs.
491pub struct TickRunsBarAggregator {
492    core: BarAggregatorCore,
493    current_run_side: Option<AggressorSide>,
494    run_count: usize,
495}
496
497impl Debug for TickRunsBarAggregator {
498    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499        f.debug_struct(stringify!(TickRunsBarAggregator))
500            .field("core", &self.core)
501            .field("current_run_side", &self.current_run_side)
502            .field("run_count", &self.run_count)
503            .finish()
504    }
505}
506
507impl TickRunsBarAggregator {
508    /// Creates a new [`TickRunsBarAggregator`] instance.
509    ///
510    /// # Panics
511    ///
512    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
513    pub fn new<H: FnMut(Bar) + 'static>(
514        bar_type: BarType,
515        price_precision: u8,
516        size_precision: u8,
517        handler: H,
518    ) -> Self {
519        Self {
520            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
521            current_run_side: None,
522            run_count: 0,
523        }
524    }
525}
526
527impl BarAggregator for TickRunsBarAggregator {
528    fn bar_type(&self) -> BarType {
529        self.core.bar_type
530    }
531
532    fn is_running(&self) -> bool {
533        self.core.is_running
534    }
535
536    fn set_is_running(&mut self, value: bool) {
537        self.core.set_is_running(value);
538    }
539
540    /// Apply the given update to the aggregator.
541    ///
542    /// Note: side-aware logic lives in `handle_trade`. This method is used for
543    /// quote/bar updates where no aggressor side is available.
544    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
545        self.core.apply_update(price, size, ts_init);
546    }
547
548    fn handle_trade(&mut self, trade: TradeTick) {
549        let side = match trade.aggressor_side {
550            AggressorSide::Buyer => Some(AggressorSide::Buyer),
551            AggressorSide::Seller => Some(AggressorSide::Seller),
552            AggressorSide::NoAggressor => None,
553        };
554
555        if let Some(side) = side {
556            if self.current_run_side != Some(side) {
557                self.current_run_side = Some(side);
558                self.run_count = 0;
559                self.core.builder.reset();
560            }
561
562            self.core
563                .apply_update(trade.price, trade.size, trade.ts_init);
564            self.run_count += 1;
565
566            let threshold = self.core.bar_type.spec().step.get();
567            if self.run_count >= threshold {
568                self.core.build_now_and_send();
569                self.run_count = 0;
570                self.current_run_side = None;
571            }
572        } else {
573            self.core
574                .apply_update(trade.price, trade.size, trade.ts_init);
575        }
576    }
577
578    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
579        self.core.builder.update_bar(bar, volume, ts_init);
580    }
581}
582
583/// Provides a means of building volume bars aggregated from quote and trades.
584pub struct VolumeBarAggregator {
585    core: BarAggregatorCore,
586}
587
588impl Debug for VolumeBarAggregator {
589    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
590        f.debug_struct(stringify!(VolumeBarAggregator))
591            .field("core", &self.core)
592            .finish()
593    }
594}
595
596impl VolumeBarAggregator {
597    /// Creates a new [`VolumeBarAggregator`] instance.
598    ///
599    /// # Panics
600    ///
601    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
602    pub fn new<H: FnMut(Bar) + 'static>(
603        bar_type: BarType,
604        price_precision: u8,
605        size_precision: u8,
606        handler: H,
607    ) -> Self {
608        Self {
609            core: BarAggregatorCore::new(
610                bar_type.standard(),
611                price_precision,
612                size_precision,
613                handler,
614            ),
615        }
616    }
617}
618
619impl BarAggregator for VolumeBarAggregator {
620    fn bar_type(&self) -> BarType {
621        self.core.bar_type
622    }
623
624    fn is_running(&self) -> bool {
625        self.core.is_running
626    }
627
628    fn set_is_running(&mut self, value: bool) {
629        self.core.set_is_running(value);
630    }
631
632    /// Apply the given update to the aggregator.
633    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
634        let mut raw_size_update = size.raw;
635        let spec = self.core.bar_type.spec();
636        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
637
638        while raw_size_update > 0 {
639            if self.core.builder.volume.raw + raw_size_update < raw_step {
640                self.core.apply_update(
641                    price,
642                    Quantity::from_raw(raw_size_update, size.precision),
643                    ts_init,
644                );
645                break;
646            }
647
648            let raw_size_diff = raw_step - self.core.builder.volume.raw;
649            self.core.apply_update(
650                price,
651                Quantity::from_raw(raw_size_diff, size.precision),
652                ts_init,
653            );
654
655            self.core.build_now_and_send();
656            raw_size_update -= raw_size_diff;
657        }
658    }
659
660    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
661        let mut raw_volume_update = volume.raw;
662        let spec = self.core.bar_type.spec();
663        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
664
665        while raw_volume_update > 0 {
666            if self.core.builder.volume.raw + raw_volume_update < raw_step {
667                self.core.builder.update_bar(
668                    bar,
669                    Quantity::from_raw(raw_volume_update, volume.precision),
670                    ts_init,
671                );
672                break;
673            }
674
675            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
676            self.core.builder.update_bar(
677                bar,
678                Quantity::from_raw(raw_volume_diff, volume.precision),
679                ts_init,
680            );
681
682            self.core.build_now_and_send();
683            raw_volume_update -= raw_volume_diff;
684        }
685    }
686}
687
688/// Aggregates bars based on buy/sell volume imbalance.
689pub struct VolumeImbalanceBarAggregator {
690    core: BarAggregatorCore,
691    imbalance_raw: i128,
692    raw_step: i128,
693}
694
695impl Debug for VolumeImbalanceBarAggregator {
696    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
697        f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
698            .field("core", &self.core)
699            .field("imbalance_raw", &self.imbalance_raw)
700            .field("raw_step", &self.raw_step)
701            .finish()
702    }
703}
704
705impl VolumeImbalanceBarAggregator {
706    /// Creates a new [`VolumeImbalanceBarAggregator`] instance.
707    ///
708    /// # Panics
709    ///
710    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
711    pub fn new<H: FnMut(Bar) + 'static>(
712        bar_type: BarType,
713        price_precision: u8,
714        size_precision: u8,
715        handler: H,
716    ) -> Self {
717        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
718        Self {
719            core: BarAggregatorCore::new(
720                bar_type.standard(),
721                price_precision,
722                size_precision,
723                handler,
724            ),
725            imbalance_raw: 0,
726            raw_step,
727        }
728    }
729}
730
731impl BarAggregator for VolumeImbalanceBarAggregator {
732    fn bar_type(&self) -> BarType {
733        self.core.bar_type
734    }
735
736    fn is_running(&self) -> bool {
737        self.core.is_running
738    }
739
740    fn set_is_running(&mut self, value: bool) {
741        self.core.set_is_running(value);
742    }
743
744    /// Apply the given update to the aggregator.
745    ///
746    /// Note: side-aware logic lives in `handle_trade`. This method is used for
747    /// quote/bar updates where no aggressor side is available.
748    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
749        self.core.apply_update(price, size, ts_init);
750    }
751
752    fn handle_trade(&mut self, trade: TradeTick) {
753        let side = match trade.aggressor_side {
754            AggressorSide::Buyer => 1,
755            AggressorSide::Seller => -1,
756            AggressorSide::NoAggressor => {
757                self.core
758                    .apply_update(trade.price, trade.size, trade.ts_init);
759                return;
760            }
761        };
762
763        let mut raw_remaining = trade.size.raw as i128;
764        while raw_remaining > 0 {
765            let imbalance_abs = self.imbalance_raw.abs();
766            let needed = (self.raw_step - imbalance_abs).max(1);
767            let raw_chunk = raw_remaining.min(needed);
768            let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
769
770            self.core
771                .apply_update(trade.price, qty_chunk, trade.ts_init);
772
773            self.imbalance_raw += side * raw_chunk;
774            raw_remaining -= raw_chunk;
775
776            if self.imbalance_raw.abs() >= self.raw_step {
777                self.core.build_now_and_send();
778                self.imbalance_raw = 0;
779            }
780        }
781    }
782
783    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
784        self.core.builder.update_bar(bar, volume, ts_init);
785    }
786}
787
788/// Aggregates bars based on consecutive buy/sell volume runs.
789pub struct VolumeRunsBarAggregator {
790    core: BarAggregatorCore,
791    current_run_side: Option<AggressorSide>,
792    run_volume_raw: QuantityRaw,
793    raw_step: QuantityRaw,
794}
795
796impl Debug for VolumeRunsBarAggregator {
797    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
798        f.debug_struct(stringify!(VolumeRunsBarAggregator))
799            .field("core", &self.core)
800            .field("current_run_side", &self.current_run_side)
801            .field("run_volume_raw", &self.run_volume_raw)
802            .field("raw_step", &self.raw_step)
803            .finish()
804    }
805}
806
807impl VolumeRunsBarAggregator {
808    /// Creates a new [`VolumeRunsBarAggregator`] instance.
809    ///
810    /// # Panics
811    ///
812    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
813    pub fn new<H: FnMut(Bar) + 'static>(
814        bar_type: BarType,
815        price_precision: u8,
816        size_precision: u8,
817        handler: H,
818    ) -> Self {
819        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
820        Self {
821            core: BarAggregatorCore::new(
822                bar_type.standard(),
823                price_precision,
824                size_precision,
825                handler,
826            ),
827            current_run_side: None,
828            run_volume_raw: 0,
829            raw_step,
830        }
831    }
832}
833
834impl BarAggregator for VolumeRunsBarAggregator {
835    fn bar_type(&self) -> BarType {
836        self.core.bar_type
837    }
838
839    fn is_running(&self) -> bool {
840        self.core.is_running
841    }
842
843    fn set_is_running(&mut self, value: bool) {
844        self.core.set_is_running(value);
845    }
846
847    /// Apply the given update to the aggregator.
848    ///
849    /// Note: side-aware logic lives in `handle_trade`. This method is used for
850    /// quote/bar updates where no aggressor side is available.
851    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
852        self.core.apply_update(price, size, ts_init);
853    }
854
855    fn handle_trade(&mut self, trade: TradeTick) {
856        let side = match trade.aggressor_side {
857            AggressorSide::Buyer => Some(AggressorSide::Buyer),
858            AggressorSide::Seller => Some(AggressorSide::Seller),
859            AggressorSide::NoAggressor => None,
860        };
861
862        let Some(side) = side else {
863            self.core
864                .apply_update(trade.price, trade.size, trade.ts_init);
865            return;
866        };
867
868        if self.current_run_side != Some(side) {
869            self.current_run_side = Some(side);
870            self.run_volume_raw = 0;
871            self.core.builder.reset();
872        }
873
874        let mut raw_remaining = trade.size.raw;
875        while raw_remaining > 0 {
876            let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
877            let raw_chunk = raw_remaining.min(needed);
878
879            self.core.apply_update(
880                trade.price,
881                Quantity::from_raw(raw_chunk, trade.size.precision),
882                trade.ts_init,
883            );
884
885            self.run_volume_raw += raw_chunk;
886            raw_remaining -= raw_chunk;
887
888            if self.run_volume_raw >= self.raw_step {
889                self.core.build_now_and_send();
890                self.run_volume_raw = 0;
891                self.current_run_side = None;
892            }
893        }
894    }
895
896    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
897        self.core.builder.update_bar(bar, volume, ts_init);
898    }
899}
900
901/// Provides a means of building value bars aggregated from quote and trades.
902///
903/// When received value reaches the step threshold of the bar
904/// specification, then a bar is created and sent to the handler.
905pub struct ValueBarAggregator {
906    core: BarAggregatorCore,
907    cum_value: f64,
908}
909
910impl Debug for ValueBarAggregator {
911    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
912        f.debug_struct(stringify!(ValueBarAggregator))
913            .field("core", &self.core)
914            .field("cum_value", &self.cum_value)
915            .finish()
916    }
917}
918
919impl ValueBarAggregator {
920    /// Creates a new [`ValueBarAggregator`] instance.
921    ///
922    /// # Panics
923    ///
924    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
925    pub fn new<H: FnMut(Bar) + 'static>(
926        bar_type: BarType,
927        price_precision: u8,
928        size_precision: u8,
929        handler: H,
930    ) -> Self {
931        Self {
932            core: BarAggregatorCore::new(
933                bar_type.standard(),
934                price_precision,
935                size_precision,
936                handler,
937            ),
938            cum_value: 0.0,
939        }
940    }
941
942    #[must_use]
943    /// Returns the cumulative value for the aggregator.
944    pub const fn get_cumulative_value(&self) -> f64 {
945        self.cum_value
946    }
947}
948
949impl BarAggregator for ValueBarAggregator {
950    fn bar_type(&self) -> BarType {
951        self.core.bar_type
952    }
953
954    fn is_running(&self) -> bool {
955        self.core.is_running
956    }
957
958    fn set_is_running(&mut self, value: bool) {
959        self.core.set_is_running(value);
960    }
961
962    /// Apply the given update to the aggregator.
963    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
964        let mut size_update = size.as_f64();
965        let spec = self.core.bar_type.spec();
966
967        while size_update > 0.0 {
968            let value_update = price.as_f64() * size_update;
969            if value_update == 0.0 {
970                // Prevent division by zero - apply remaining size without triggering bar
971                self.core
972                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
973                break;
974            }
975
976            if self.cum_value + value_update < spec.step.get() as f64 {
977                self.cum_value += value_update;
978                self.core
979                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
980                break;
981            }
982
983            let value_diff = spec.step.get() as f64 - self.cum_value;
984            let mut size_diff = size_update * (value_diff / value_update);
985
986            // Clamp to minimum representable size to avoid zero-volume bars
987            if is_below_min_size(size_diff, size.precision) {
988                if is_below_min_size(size_update, size.precision) {
989                    break;
990                }
991                size_diff = min_size_f64(size.precision);
992            }
993
994            self.core
995                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
996
997            self.core.build_now_and_send();
998            self.cum_value = 0.0;
999            size_update -= size_diff;
1000        }
1001    }
1002
1003    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1004        let mut volume_update = volume;
1005        let average_price = Price::new(
1006            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1007            self.core.builder.price_precision,
1008        );
1009
1010        while volume_update.as_f64() > 0.0 {
1011            let value_update = average_price.as_f64() * volume_update.as_f64();
1012            if value_update == 0.0 {
1013                // Prevent division by zero - apply remaining volume without triggering bar
1014                self.core.builder.update_bar(bar, volume_update, ts_init);
1015                break;
1016            }
1017
1018            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1019                self.cum_value += value_update;
1020                self.core.builder.update_bar(bar, volume_update, ts_init);
1021                break;
1022            }
1023
1024            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1025            let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1026
1027            // Clamp to minimum representable size to avoid zero-volume bars
1028            if is_below_min_size(volume_diff, volume_update.precision) {
1029                if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1030                    break;
1031                }
1032                volume_diff = min_size_f64(volume_update.precision);
1033            }
1034
1035            self.core.builder.update_bar(
1036                bar,
1037                Quantity::new(volume_diff, volume_update.precision),
1038                ts_init,
1039            );
1040
1041            self.core.build_now_and_send();
1042            self.cum_value = 0.0;
1043            volume_update = Quantity::new(
1044                volume_update.as_f64() - volume_diff,
1045                volume_update.precision,
1046            );
1047        }
1048    }
1049}
1050
/// Aggregates bars based on buy/sell notional imbalance.
pub struct ValueImbalanceBarAggregator {
    /// Shared aggregation core holding the bar type, builder and running state.
    core: BarAggregatorCore,
    /// Signed running imbalance: buyer notional adds, seller notional subtracts.
    imbalance_value: f64,
    /// Absolute imbalance at which a bar is built (the bar specification step as `f64`).
    step_value: f64,
}
1057
1058impl Debug for ValueImbalanceBarAggregator {
1059    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1060        f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1061            .field("core", &self.core)
1062            .field("imbalance_value", &self.imbalance_value)
1063            .field("step_value", &self.step_value)
1064            .finish()
1065    }
1066}
1067
1068impl ValueImbalanceBarAggregator {
1069    /// Creates a new [`ValueImbalanceBarAggregator`] instance.
1070    ///
1071    /// # Panics
1072    ///
1073    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1074    pub fn new<H: FnMut(Bar) + 'static>(
1075        bar_type: BarType,
1076        price_precision: u8,
1077        size_precision: u8,
1078        handler: H,
1079    ) -> Self {
1080        Self {
1081            core: BarAggregatorCore::new(
1082                bar_type.standard(),
1083                price_precision,
1084                size_precision,
1085                handler,
1086            ),
1087            imbalance_value: 0.0,
1088            step_value: bar_type.spec().step.get() as f64,
1089        }
1090    }
1091}
1092
1093impl BarAggregator for ValueImbalanceBarAggregator {
1094    fn bar_type(&self) -> BarType {
1095        self.core.bar_type
1096    }
1097
1098    fn is_running(&self) -> bool {
1099        self.core.is_running
1100    }
1101
1102    fn set_is_running(&mut self, value: bool) {
1103        self.core.set_is_running(value);
1104    }
1105
1106    /// Apply the given update to the aggregator.
1107    ///
1108    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1109    /// quote/bar updates where no aggressor side is available.
1110    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1111        self.core.apply_update(price, size, ts_init);
1112    }
1113
1114    fn handle_trade(&mut self, trade: TradeTick) {
1115        let price_f64 = trade.price.as_f64();
1116        if price_f64 == 0.0 {
1117            self.core
1118                .apply_update(trade.price, trade.size, trade.ts_init);
1119            return;
1120        }
1121
1122        let side_sign = match trade.aggressor_side {
1123            AggressorSide::Buyer => 1.0,
1124            AggressorSide::Seller => -1.0,
1125            AggressorSide::NoAggressor => {
1126                self.core
1127                    .apply_update(trade.price, trade.size, trade.ts_init);
1128                return;
1129            }
1130        };
1131
1132        let mut size_remaining = trade.size.as_f64();
1133        while size_remaining > 0.0 {
1134            let value_remaining = price_f64 * size_remaining;
1135
1136            if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
1137                let needed = self.step_value - self.imbalance_value.abs();
1138                if value_remaining <= needed {
1139                    self.imbalance_value += side_sign * value_remaining;
1140                    self.core.apply_update(
1141                        trade.price,
1142                        Quantity::new(size_remaining, trade.size.precision),
1143                        trade.ts_init,
1144                    );
1145
1146                    if self.imbalance_value.abs() >= self.step_value {
1147                        self.core.build_now_and_send();
1148                        self.imbalance_value = 0.0;
1149                    }
1150                    break;
1151                }
1152
1153                let mut value_chunk = needed;
1154                let mut size_chunk = value_chunk / price_f64;
1155
1156                // Clamp to minimum representable size to avoid zero-volume bars
1157                if is_below_min_size(size_chunk, trade.size.precision) {
1158                    if is_below_min_size(size_remaining, trade.size.precision) {
1159                        break;
1160                    }
1161                    size_chunk = min_size_f64(trade.size.precision);
1162                    value_chunk = price_f64 * size_chunk;
1163                }
1164
1165                self.core.apply_update(
1166                    trade.price,
1167                    Quantity::new(size_chunk, trade.size.precision),
1168                    trade.ts_init,
1169                );
1170                self.imbalance_value += side_sign * value_chunk;
1171                size_remaining -= size_chunk;
1172
1173                if self.imbalance_value.abs() >= self.step_value {
1174                    self.core.build_now_and_send();
1175                    self.imbalance_value = 0.0;
1176                }
1177            } else {
1178                // Opposing side: first neutralize existing imbalance
1179                let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1180                let mut size_chunk = value_to_flatten / price_f64;
1181
1182                // Clamp to minimum representable size to avoid zero-volume bars
1183                if is_below_min_size(size_chunk, trade.size.precision) {
1184                    if is_below_min_size(size_remaining, trade.size.precision) {
1185                        break;
1186                    }
1187                    size_chunk = min_size_f64(trade.size.precision);
1188                    value_to_flatten = price_f64 * size_chunk;
1189                }
1190
1191                self.core.apply_update(
1192                    trade.price,
1193                    Quantity::new(size_chunk, trade.size.precision),
1194                    trade.ts_init,
1195                );
1196                self.imbalance_value += side_sign * value_to_flatten;
1197
1198                // Min-size clamp can overshoot past threshold
1199                if self.imbalance_value.abs() >= self.step_value {
1200                    self.core.build_now_and_send();
1201                    self.imbalance_value = 0.0;
1202                }
1203                size_remaining -= size_chunk;
1204            }
1205        }
1206    }
1207
1208    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1209        self.core.builder.update_bar(bar, volume, ts_init);
1210    }
1211}
1212
/// Aggregates bars based on consecutive buy/sell notional runs.
pub struct ValueRunsBarAggregator {
    /// Shared aggregation core holding the bar type, builder and running state.
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress, or `None` when no run is active.
    current_run_side: Option<AggressorSide>,
    /// Notional (price * size) accumulated by the current run.
    run_value: f64,
    /// Run notional at which a bar is built (the bar specification step as `f64`).
    step_value: f64,
}
1220
1221impl Debug for ValueRunsBarAggregator {
1222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1223        f.debug_struct(stringify!(ValueRunsBarAggregator))
1224            .field("core", &self.core)
1225            .field("current_run_side", &self.current_run_side)
1226            .field("run_value", &self.run_value)
1227            .field("step_value", &self.step_value)
1228            .finish()
1229    }
1230}
1231
1232impl ValueRunsBarAggregator {
1233    /// Creates a new [`ValueRunsBarAggregator`] instance.
1234    ///
1235    /// # Panics
1236    ///
1237    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1238    pub fn new<H: FnMut(Bar) + 'static>(
1239        bar_type: BarType,
1240        price_precision: u8,
1241        size_precision: u8,
1242        handler: H,
1243    ) -> Self {
1244        Self {
1245            core: BarAggregatorCore::new(
1246                bar_type.standard(),
1247                price_precision,
1248                size_precision,
1249                handler,
1250            ),
1251            current_run_side: None,
1252            run_value: 0.0,
1253            step_value: bar_type.spec().step.get() as f64,
1254        }
1255    }
1256}
1257
1258impl BarAggregator for ValueRunsBarAggregator {
1259    fn bar_type(&self) -> BarType {
1260        self.core.bar_type
1261    }
1262
1263    fn is_running(&self) -> bool {
1264        self.core.is_running
1265    }
1266
1267    fn set_is_running(&mut self, value: bool) {
1268        self.core.set_is_running(value);
1269    }
1270
1271    /// Apply the given update to the aggregator.
1272    ///
1273    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1274    /// quote/bar updates where no aggressor side is available.
1275    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1276        self.core.apply_update(price, size, ts_init);
1277    }
1278
1279    fn handle_trade(&mut self, trade: TradeTick) {
1280        let price_f64 = trade.price.as_f64();
1281        if price_f64 == 0.0 {
1282            self.core
1283                .apply_update(trade.price, trade.size, trade.ts_init);
1284            return;
1285        }
1286
1287        let side = match trade.aggressor_side {
1288            AggressorSide::Buyer => Some(AggressorSide::Buyer),
1289            AggressorSide::Seller => Some(AggressorSide::Seller),
1290            AggressorSide::NoAggressor => None,
1291        };
1292
1293        let Some(side) = side else {
1294            self.core
1295                .apply_update(trade.price, trade.size, trade.ts_init);
1296            return;
1297        };
1298
1299        if self.current_run_side != Some(side) {
1300            self.current_run_side = Some(side);
1301            self.run_value = 0.0;
1302            self.core.builder.reset();
1303        }
1304
1305        let mut size_remaining = trade.size.as_f64();
1306        while size_remaining > 0.0 {
1307            let value_update = price_f64 * size_remaining;
1308            if self.run_value + value_update < self.step_value {
1309                self.run_value += value_update;
1310                self.core.apply_update(
1311                    trade.price,
1312                    Quantity::new(size_remaining, trade.size.precision),
1313                    trade.ts_init,
1314                );
1315                break;
1316            }
1317
1318            let value_needed = self.step_value - self.run_value;
1319            let mut size_chunk = value_needed / price_f64;
1320
1321            // Clamp to minimum representable size to avoid zero-volume bars
1322            if is_below_min_size(size_chunk, trade.size.precision) {
1323                if is_below_min_size(size_remaining, trade.size.precision) {
1324                    break;
1325                }
1326                size_chunk = min_size_f64(trade.size.precision);
1327            }
1328
1329            self.core.apply_update(
1330                trade.price,
1331                Quantity::new(size_chunk, trade.size.precision),
1332                trade.ts_init,
1333            );
1334
1335            self.core.build_now_and_send();
1336            self.run_value = 0.0;
1337            self.current_run_side = None;
1338            size_remaining -= size_chunk;
1339        }
1340    }
1341
1342    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1343        self.core.builder.update_bar(bar, volume, ts_init);
1344    }
1345}
1346
/// Provides a means of building Renko bars aggregated from quote and trades.
///
/// Renko bars are created when the price moves by a fixed amount (brick size)
/// regardless of time or volume. Each bar represents a price movement equal
/// to the step size in the bar specification.
pub struct RenkoBarAggregator {
    /// Shared aggregation core holding the bar type, builder and running state.
    core: BarAggregatorCore,
    /// Brick size in raw price units (bar specification step times the raw price increment).
    pub brick_size: PriceRaw,
    /// Reference price for the next brick: the first observed price, then the close of the
    /// most recently built brick. `None` until the first update arrives.
    last_close: Option<Price>,
}
1357
1358impl Debug for RenkoBarAggregator {
1359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1360        f.debug_struct(stringify!(RenkoBarAggregator))
1361            .field("core", &self.core)
1362            .field("brick_size", &self.brick_size)
1363            .field("last_close", &self.last_close)
1364            .finish()
1365    }
1366}
1367
1368impl RenkoBarAggregator {
1369    /// Creates a new [`RenkoBarAggregator`] instance.
1370    ///
1371    /// # Panics
1372    ///
1373    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1374    pub fn new<H: FnMut(Bar) + 'static>(
1375        bar_type: BarType,
1376        price_precision: u8,
1377        size_precision: u8,
1378        price_increment: Price,
1379        handler: H,
1380    ) -> Self {
1381        // Calculate brick size in raw price units (step * price_increment.raw)
1382        let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1383
1384        Self {
1385            core: BarAggregatorCore::new(
1386                bar_type.standard(),
1387                price_precision,
1388                size_precision,
1389                handler,
1390            ),
1391            brick_size,
1392            last_close: None,
1393        }
1394    }
1395}
1396
1397impl BarAggregator for RenkoBarAggregator {
1398    fn bar_type(&self) -> BarType {
1399        self.core.bar_type
1400    }
1401
1402    fn is_running(&self) -> bool {
1403        self.core.is_running
1404    }
1405
1406    fn set_is_running(&mut self, value: bool) {
1407        self.core.set_is_running(value);
1408    }
1409
1410    /// Apply the given update to the aggregator.
1411    ///
1412    /// For Renko bars, we check if the price movement from the last close
1413    /// is greater than or equal to the brick size. If so, we create new bars.
1414    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1415        // Always update the builder with the current tick
1416        self.core.apply_update(price, size, ts_init);
1417
1418        // Initialize last_close if this is the first update
1419        if self.last_close.is_none() {
1420            self.last_close = Some(price);
1421            return;
1422        }
1423
1424        let last_close = self.last_close.unwrap();
1425
1426        // Convert prices to raw units (integers) to avoid floating point precision issues
1427        let current_raw = price.raw;
1428        let last_close_raw = last_close.raw;
1429        let price_diff_raw = current_raw - last_close_raw;
1430        let abs_price_diff_raw = price_diff_raw.abs();
1431
1432        // Check if we need to create one or more Renko bars
1433        if abs_price_diff_raw >= self.brick_size {
1434            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1435            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1436            let mut current_close = last_close;
1437
1438            // Store the current builder volume to distribute across bricks
1439            let total_volume = self.core.builder.volume;
1440
1441            for _i in 0..num_bricks {
1442                // Calculate the close price for this brick using raw price units
1443                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1444                let brick_close = Price::from_raw(brick_close_raw, price.precision);
1445
1446                // For Renko bars: open = previous close, high/low depend on direction
1447                let (brick_high, brick_low) = if direction > 0.0 {
1448                    (brick_close, current_close)
1449                } else {
1450                    (current_close, brick_close)
1451                };
1452
1453                // Reset builder for this brick
1454                self.core.builder.reset();
1455                self.core.builder.open = Some(current_close);
1456                self.core.builder.high = Some(brick_high);
1457                self.core.builder.low = Some(brick_low);
1458                self.core.builder.close = Some(brick_close);
1459                self.core.builder.volume = total_volume; // Each brick gets the full volume
1460                self.core.builder.count = 1;
1461                self.core.builder.ts_last = ts_init;
1462                self.core.builder.initialized = true;
1463
1464                // Build and send the bar
1465                self.core.build_and_send(ts_init, ts_init);
1466
1467                // Update for the next brick
1468                current_close = brick_close;
1469                self.last_close = Some(brick_close);
1470            }
1471        }
1472    }
1473
1474    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1475        // Always update the builder with the current bar
1476        self.core.builder.update_bar(bar, volume, ts_init);
1477
1478        // Initialize last_close if this is the first update
1479        if self.last_close.is_none() {
1480            self.last_close = Some(bar.close);
1481            return;
1482        }
1483
1484        let last_close = self.last_close.unwrap();
1485
1486        // Convert prices to raw units (integers) to avoid floating point precision issues
1487        let current_raw = bar.close.raw;
1488        let last_close_raw = last_close.raw;
1489        let price_diff_raw = current_raw - last_close_raw;
1490        let abs_price_diff_raw = price_diff_raw.abs();
1491
1492        // Check if we need to create one or more Renko bars
1493        if abs_price_diff_raw >= self.brick_size {
1494            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1495            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1496            let mut current_close = last_close;
1497
1498            // Store the current builder volume to distribute across bricks
1499            let total_volume = self.core.builder.volume;
1500
1501            for _i in 0..num_bricks {
1502                // Calculate the close price for this brick using raw price units
1503                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1504                let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1505
1506                // For Renko bars: open = previous close, high/low depend on direction
1507                let (brick_high, brick_low) = if direction > 0.0 {
1508                    (brick_close, current_close)
1509                } else {
1510                    (current_close, brick_close)
1511                };
1512
1513                // Reset builder for this brick
1514                self.core.builder.reset();
1515                self.core.builder.open = Some(current_close);
1516                self.core.builder.high = Some(brick_high);
1517                self.core.builder.low = Some(brick_low);
1518                self.core.builder.close = Some(brick_close);
1519                self.core.builder.volume = total_volume; // Each brick gets the full volume
1520                self.core.builder.count = 1;
1521                self.core.builder.ts_last = ts_init;
1522                self.core.builder.initialized = true;
1523
1524                // Build and send the bar
1525                self.core.build_and_send(ts_init, ts_init);
1526
1527                // Update for the next brick
1528                current_close = brick_close;
1529                self.last_close = Some(brick_close);
1530            }
1531        }
1532    }
1533}
1534
/// Provides a means of building time bars aggregated from quote and trades.
///
/// At each aggregation time interval, a bar is created and sent to the handler.
pub struct TimeBarAggregator {
    /// Shared aggregation core holding the bar type, builder and running state.
    core: BarAggregatorCore,
    /// Clock on which the bar timer or time alert is registered.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, no bar is built for an interval in which the builder saw no updates.
    build_with_no_updates: bool,
    /// For left-open intervals, stamps the bar event time with the close time instead of the open time.
    timestamp_on_close: bool,
    /// `true` when the interval type is `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar currently being built.
    stored_open_ns: UnixNanos,
    /// Name under which the timer is registered on the clock (the bar type as a string).
    timer_name: String,
    /// Bar interval in nanoseconds, as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Time at which the next bar is scheduled to close.
    next_close_ns: UnixNanos,
    /// Delay added to the computed bar start time, interpreted as microseconds.
    bar_build_delay: u64,
    /// Optional origin offset passed to `get_time_bar_start` when computing the start time.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When `true`, the first bar is discarded instead of being sent.
    skip_first_non_full_bar: bool,
    /// NOTE(review): presumably set when replaying historical data; only logged in the code visible here.
    pub historical_mode: bool,
    /// NOTE(review): presumably time events collected in historical mode; usage not visible here.
    historical_events: Vec<TimeEvent>,
    /// Weak reference to the shared aggregator, upgraded by the timer callback to call `build_bar`.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1555
1556impl Debug for TimeBarAggregator {
1557    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1558        f.debug_struct(stringify!(TimeBarAggregator))
1559            .field("core", &self.core)
1560            .field("build_with_no_updates", &self.build_with_no_updates)
1561            .field("timestamp_on_close", &self.timestamp_on_close)
1562            .field("is_left_open", &self.is_left_open)
1563            .field("timer_name", &self.timer_name)
1564            .field("interval_ns", &self.interval_ns)
1565            .field("bar_build_delay", &self.bar_build_delay)
1566            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1567            .finish()
1568    }
1569}
1570
1571impl TimeBarAggregator {
1572    /// Creates a new [`TimeBarAggregator`] instance.
1573    ///
1574    /// # Panics
1575    ///
1576    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1577    #[allow(clippy::too_many_arguments)]
1578    pub fn new<H: FnMut(Bar) + 'static>(
1579        bar_type: BarType,
1580        price_precision: u8,
1581        size_precision: u8,
1582        clock: Rc<RefCell<dyn Clock>>,
1583        handler: H,
1584        build_with_no_updates: bool,
1585        timestamp_on_close: bool,
1586        interval_type: BarIntervalType,
1587        time_bars_origin_offset: Option<TimeDelta>,
1588        bar_build_delay: u64,
1589        skip_first_non_full_bar: bool,
1590    ) -> Self {
1591        let is_left_open = match interval_type {
1592            BarIntervalType::LeftOpen => true,
1593            BarIntervalType::RightOpen => false,
1594        };
1595
1596        let core = BarAggregatorCore::new(
1597            bar_type.standard(),
1598            price_precision,
1599            size_precision,
1600            handler,
1601        );
1602
1603        Self {
1604            core,
1605            clock,
1606            build_with_no_updates,
1607            timestamp_on_close,
1608            is_left_open,
1609            stored_open_ns: UnixNanos::default(),
1610            timer_name: bar_type.to_string(),
1611            interval_ns: get_bar_interval_ns(&bar_type),
1612            next_close_ns: UnixNanos::default(),
1613            bar_build_delay,
1614            time_bars_origin_offset,
1615            skip_first_non_full_bar,
1616            historical_mode: false,
1617            historical_events: Vec::new(),
1618            aggregator_weak: None,
1619        }
1620    }
1621
    /// Sets the clock for the aggregator (internal method).
    ///
    /// Replaces the stored clock handle; a timer already registered on the previous clock
    /// is not touched by this call.
    pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.clock = clock;
    }
1626
1627    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
1628    ///
1629    /// This matches the Cython `start_timer()` method exactly.
1630    /// Creates a callback to `build_bar` using a weak reference to the aggregator.
1631    ///
1632    /// # Panics
1633    ///
1634    /// Panics if aggregator_rc is None and aggregator_weak hasn't been set, or if timer registration fails.
1635    pub fn start_timer_internal(
1636        &mut self,
1637        aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1638    ) {
1639        // Create callback that calls build_bar through the weak reference
1640        let aggregator_weak = if let Some(rc) = aggregator_rc {
1641            // Store weak reference for future use (e.g., in build_bar for month/year)
1642            let weak = Rc::downgrade(&rc);
1643            self.aggregator_weak = Some(weak.clone());
1644            weak
1645        } else {
1646            // Use existing weak reference (for historical mode where it was set earlier)
1647            self.aggregator_weak
1648                .as_ref()
1649                .expect("Aggregator weak reference must be set before calling start_timer()")
1650                .clone()
1651        };
1652
1653        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1654            if let Some(agg) = aggregator_weak.upgrade() {
1655                agg.borrow_mut().build_bar(event);
1656            }
1657        }));
1658
1659        // Computing start_time
1660        let now = self.clock.borrow().utc_now();
1661        let mut start_time =
1662            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1663        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1664
1665        // Closing a partial bar at the transition from historical to backtest data
1666        let fire_immediately = start_time == now;
1667
1668        self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1669
1670        let spec = &self.bar_type().spec();
1671        let start_time_ns = UnixNanos::from(start_time);
1672
1673        if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1674            self.clock
1675                .borrow_mut()
1676                .set_timer_ns(
1677                    &self.timer_name,
1678                    self.interval_ns.as_u64(),
1679                    Some(start_time_ns),
1680                    None,
1681                    Some(callback),
1682                    Some(true), // allow_past
1683                    Some(fire_immediately),
1684                )
1685                .expect(FAILED);
1686
1687            if fire_immediately {
1688                self.next_close_ns = start_time_ns;
1689            } else {
1690                let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1691                self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1692            }
1693
1694            self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1695        } else {
1696            // The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
1697            let alert_time = if fire_immediately {
1698                start_time
1699            } else {
1700                let step = spec.step.get() as u32;
1701                if spec.aggregation == BarAggregation::Month {
1702                    add_n_months(start_time, step).expect(FAILED)
1703                } else {
1704                    // Year aggregation
1705                    add_n_years(start_time, step).expect(FAILED)
1706                }
1707            };
1708
1709            self.clock
1710                .borrow_mut()
1711                .set_time_alert_ns(
1712                    &self.timer_name,
1713                    UnixNanos::from(alert_time),
1714                    Some(callback),
1715                    Some(true), // allow_past
1716                )
1717                .expect(FAILED);
1718
1719            self.next_close_ns = UnixNanos::from(alert_time);
1720            self.stored_open_ns = UnixNanos::from(start_time);
1721        }
1722
1723        log::debug!(
1724            "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1725            self.timer_name,
1726            start_time,
1727            self.historical_mode,
1728            fire_immediately,
1729            now,
1730            self.bar_build_delay
1731        );
1732    }
1733
1734    /// Stops the time bar aggregator.
1735    pub fn stop(&mut self) {
1736        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1737    }
1738
1739    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1740        if self.skip_first_non_full_bar {
1741            self.core.builder.reset();
1742            self.skip_first_non_full_bar = false;
1743        } else {
1744            self.core.build_and_send(ts_event, ts_init);
1745        }
1746    }
1747
1748    fn build_bar(&mut self, event: TimeEvent) {
1749        if !self.core.builder.initialized {
1750            return;
1751        }
1752
1753        if !self.build_with_no_updates && self.core.builder.count == 0 {
1754            return; // Do not build bar when no update
1755        }
1756
1757        let ts_init = event.ts_event;
1758        let ts_event = if self.is_left_open {
1759            if self.timestamp_on_close {
1760                event.ts_event
1761            } else {
1762                self.stored_open_ns
1763            }
1764        } else {
1765            self.stored_open_ns
1766        };
1767
1768        self.build_and_send(ts_event, ts_init);
1769
1770        // Close time becomes the next open time
1771        self.stored_open_ns = event.ts_event;
1772
1773        if self.bar_type().spec().aggregation == BarAggregation::Month {
1774            let step = self.bar_type().spec().step.get() as u32;
1775            let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1776
1777            self.clock
1778                .borrow_mut()
1779                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1780                .expect(FAILED);
1781
1782            self.next_close_ns = alert_time_ns;
1783        } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1784            let step = self.bar_type().spec().step.get() as u32;
1785            let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1786
1787            self.clock
1788                .borrow_mut()
1789                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1790                .expect(FAILED);
1791
1792            self.next_close_ns = alert_time_ns;
1793        } else {
1794            // On receiving this event, timer should now have a new `next_time_ns`
1795            self.next_close_ns = self
1796                .clock
1797                .borrow()
1798                .next_time_ns(&self.timer_name)
1799                .unwrap_or_default();
1800        }
1801    }
1802
1803    fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1804        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1805            // In historical mode, clock is always a TestClock (set by data engine)
1806            {
1807                let mut clock_borrow = self.clock.borrow_mut();
1808                let test_clock = clock_borrow
1809                    .as_any_mut()
1810                    .downcast_mut::<TestClock>()
1811                    .expect("Expected TestClock in historical mode");
1812                test_clock.set_time(ts_init);
1813            }
1814            // In historical mode, weak reference should already be set
1815            self.start_timer_internal(None);
1816        }
1817
1818        // Advance this aggregator's independent clock and collect timer events
1819        {
1820            let mut clock_borrow = self.clock.borrow_mut();
1821            let test_clock = clock_borrow
1822                .as_any_mut()
1823                .downcast_mut::<TestClock>()
1824                .expect("Expected TestClock in historical mode");
1825            self.historical_events = test_clock.advance_time(ts_init, true);
1826        }
1827    }
1828
1829    fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1830        // Process timer events after data processing
1831        // Collect events first to avoid borrow checker issues
1832        let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1833        for event in events {
1834            self.build_bar(event);
1835        }
1836    }
1837
1838    /// Sets historical events (called by data engine after advancing clock)
1839    pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1840        self.historical_events = events;
1841    }
1842}
1843
1844impl BarAggregator for TimeBarAggregator {
1845    fn bar_type(&self) -> BarType {
1846        self.core.bar_type
1847    }
1848
1849    fn is_running(&self) -> bool {
1850        self.core.is_running
1851    }
1852
1853    fn set_is_running(&mut self, value: bool) {
1854        self.core.set_is_running(value);
1855    }
1856
1857    /// Stop time-based aggregator by canceling its timer.
1858    fn stop(&mut self) {
1859        Self::stop(self);
1860    }
1861
1862    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1863        if self.historical_mode {
1864            self.preprocess_historical_events(ts_init);
1865        }
1866
1867        self.core.apply_update(price, size, ts_init);
1868
1869        if self.historical_mode {
1870            self.postprocess_historical_events(ts_init);
1871        }
1872    }
1873
1874    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1875        if self.historical_mode {
1876            self.preprocess_historical_events(ts_init);
1877        }
1878
1879        self.core.builder.update_bar(bar, volume, ts_init);
1880
1881        if self.historical_mode {
1882            self.postprocess_historical_events(ts_init);
1883        }
1884    }
1885
1886    fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1887        self.historical_mode = historical_mode;
1888        self.core.handler = handler;
1889    }
1890
1891    fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1892        self.set_historical_events_internal(events);
1893    }
1894
1895    fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1896        self.set_clock_internal(clock);
1897    }
1898
1899    fn build_bar(&mut self, event: TimeEvent) {
1900        // Delegate to the implementation method
1901        // We use the struct name here to disambiguate from the trait method
1902        {
1903            #[allow(clippy::use_self)]
1904            TimeBarAggregator::build_bar(self, event);
1905        }
1906    }
1907
1908    fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1909        self.aggregator_weak = Some(weak);
1910    }
1911
1912    fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1913        self.start_timer_internal(aggregator_rc);
1914    }
1915}
1916
1917fn is_below_min_size(size: f64, precision: u8) -> bool {
1918    Quantity::new(size, precision).raw == 0
1919}
1920
/// Returns `10^(-precision)` as an `f64`, i.e. one unit in the last decimal place
/// at the given precision.
fn min_size_f64(precision: u8) -> f64 {
    let exponent = -(precision as i32);
    10_f64.powi(exponent)
}
1924
1925#[cfg(test)]
1926mod tests {
1927    use std::sync::{Arc, Mutex};
1928
1929    use nautilus_common::clock::TestClock;
1930    use nautilus_core::{MUTEX_POISONED, UUID4};
1931    use nautilus_model::{
1932        data::{BarSpecification, BarType},
1933        enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1934        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1935        types::{Price, Quantity},
1936    };
1937    use rstest::rstest;
1938    use ustr::Ustr;
1939
1940    use super::*;
1941
1942    #[rstest]
1943    fn test_bar_builder_initialization(equity_aapl: Equity) {
1944        let instrument = InstrumentAny::Equity(equity_aapl);
1945        let bar_type = BarType::new(
1946            instrument.id(),
1947            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1948            AggregationSource::Internal,
1949        );
1950        let builder = BarBuilder::new(
1951            bar_type,
1952            instrument.price_precision(),
1953            instrument.size_precision(),
1954        );
1955
1956        assert!(!builder.initialized);
1957        assert_eq!(builder.ts_last, 0);
1958        assert_eq!(builder.count, 0);
1959    }
1960
1961    #[rstest]
1962    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1963        let instrument = InstrumentAny::Equity(equity_aapl);
1964        let bar_type = BarType::new(
1965            instrument.id(),
1966            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1967            AggregationSource::Internal,
1968        );
1969        let mut builder = BarBuilder::new(
1970            bar_type,
1971            instrument.price_precision(),
1972            instrument.size_precision(),
1973        );
1974
1975        builder.update(
1976            Price::from("100.00"),
1977            Quantity::from(1),
1978            UnixNanos::from(1000),
1979        );
1980        builder.update(
1981            Price::from("95.00"),
1982            Quantity::from(1),
1983            UnixNanos::from(2000),
1984        );
1985        builder.update(
1986            Price::from("105.00"),
1987            Quantity::from(1),
1988            UnixNanos::from(3000),
1989        );
1990
1991        let bar = builder.build_now();
1992        assert!(bar.high > bar.low);
1993        assert_eq!(bar.open, Price::from("100.00"));
1994        assert_eq!(bar.high, Price::from("105.00"));
1995        assert_eq!(bar.low, Price::from("95.00"));
1996        assert_eq!(bar.close, Price::from("105.00"));
1997    }
1998
1999    #[rstest]
2000    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
2001        let instrument = InstrumentAny::Equity(equity_aapl);
2002        let bar_type = BarType::new(
2003            instrument.id(),
2004            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2005            AggregationSource::Internal,
2006        );
2007        let mut builder = BarBuilder::new(
2008            bar_type,
2009            instrument.price_precision(),
2010            instrument.size_precision(),
2011        );
2012
2013        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2014        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2015
2016        assert_eq!(builder.ts_last, 1_000);
2017        assert_eq!(builder.count, 1);
2018    }
2019
2020    #[rstest]
2021    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2022        let instrument = InstrumentAny::Equity(equity_aapl);
2023        let bar_type = BarType::new(
2024            instrument.id(),
2025            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2026            AggregationSource::Internal,
2027        );
2028        let mut builder = BarBuilder::new(
2029            bar_type,
2030            instrument.price_precision(),
2031            instrument.size_precision(),
2032        );
2033
2034        builder.update(
2035            Price::from("1.00000"),
2036            Quantity::from(1),
2037            UnixNanos::default(),
2038        );
2039
2040        assert!(builder.initialized);
2041        assert_eq!(builder.ts_last, 0);
2042        assert_eq!(builder.count, 1);
2043    }
2044
2045    #[rstest]
2046    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2047        equity_aapl: Equity,
2048    ) {
2049        let instrument = InstrumentAny::Equity(equity_aapl);
2050        let bar_type = BarType::new(
2051            instrument.id(),
2052            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2053            AggregationSource::Internal,
2054        );
2055        let mut builder = BarBuilder::new(bar_type, 2, 0);
2056
2057        builder.update(
2058            Price::from("1.00000"),
2059            Quantity::from(1),
2060            UnixNanos::from(1_000),
2061        );
2062        builder.update(
2063            Price::from("1.00001"),
2064            Quantity::from(1),
2065            UnixNanos::from(500),
2066        );
2067
2068        assert!(builder.initialized);
2069        assert_eq!(builder.ts_last, 1_000);
2070        assert_eq!(builder.count, 1);
2071    }
2072
2073    #[rstest]
2074    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2075        let instrument = InstrumentAny::Equity(equity_aapl);
2076        let bar_type = BarType::new(
2077            instrument.id(),
2078            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2079            AggregationSource::Internal,
2080        );
2081        let mut builder = BarBuilder::new(
2082            bar_type,
2083            instrument.price_precision(),
2084            instrument.size_precision(),
2085        );
2086
2087        for _ in 0..5 {
2088            builder.update(
2089                Price::from("1.00000"),
2090                Quantity::from(1),
2091                UnixNanos::from(1_000),
2092            );
2093        }
2094
2095        assert_eq!(builder.count, 5);
2096    }
2097
2098    #[rstest]
2099    #[should_panic]
2100    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2101        let instrument = InstrumentAny::Equity(equity_aapl);
2102        let bar_type = BarType::new(
2103            instrument.id(),
2104            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2105            AggregationSource::Internal,
2106        );
2107        let mut builder = BarBuilder::new(
2108            bar_type,
2109            instrument.price_precision(),
2110            instrument.size_precision(),
2111        );
2112        let _ = builder.build_now();
2113    }
2114
2115    #[rstest]
2116    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2117        let instrument = InstrumentAny::Equity(equity_aapl);
2118        let bar_type = BarType::new(
2119            instrument.id(),
2120            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2121            AggregationSource::Internal,
2122        );
2123        let mut builder = BarBuilder::new(
2124            bar_type,
2125            instrument.price_precision(),
2126            instrument.size_precision(),
2127        );
2128
2129        builder.update(
2130            Price::from("1.00001"),
2131            Quantity::from(2),
2132            UnixNanos::default(),
2133        );
2134        builder.update(
2135            Price::from("1.00002"),
2136            Quantity::from(2),
2137            UnixNanos::default(),
2138        );
2139        builder.update(
2140            Price::from("1.00000"),
2141            Quantity::from(1),
2142            UnixNanos::from(1_000_000_000),
2143        );
2144
2145        let bar = builder.build_now();
2146
2147        assert_eq!(bar.open, Price::from("1.00001"));
2148        assert_eq!(bar.high, Price::from("1.00002"));
2149        assert_eq!(bar.low, Price::from("1.00000"));
2150        assert_eq!(bar.close, Price::from("1.00000"));
2151        assert_eq!(bar.volume, Quantity::from(5));
2152        assert_eq!(bar.ts_init, 1_000_000_000);
2153        assert_eq!(builder.ts_last, 1_000_000_000);
2154        assert_eq!(builder.count, 0);
2155    }
2156
2157    #[rstest]
2158    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2159        let instrument = InstrumentAny::Equity(equity_aapl);
2160        let bar_type = BarType::new(
2161            instrument.id(),
2162            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2163            AggregationSource::Internal,
2164        );
2165        let mut builder = BarBuilder::new(bar_type, 2, 0);
2166
2167        builder.update(
2168            Price::from("1.00001"),
2169            Quantity::from(1),
2170            UnixNanos::default(),
2171        );
2172        builder.build_now();
2173
2174        builder.update(
2175            Price::from("1.00000"),
2176            Quantity::from(1),
2177            UnixNanos::default(),
2178        );
2179        builder.update(
2180            Price::from("1.00003"),
2181            Quantity::from(1),
2182            UnixNanos::default(),
2183        );
2184        builder.update(
2185            Price::from("1.00002"),
2186            Quantity::from(1),
2187            UnixNanos::default(),
2188        );
2189
2190        let bar = builder.build_now();
2191
2192        assert_eq!(bar.open, Price::from("1.00000"));
2193        assert_eq!(bar.high, Price::from("1.00003"));
2194        assert_eq!(bar.low, Price::from("1.00000"));
2195        assert_eq!(bar.close, Price::from("1.00002"));
2196        assert_eq!(bar.volume, Quantity::from(3));
2197    }
2198
2199    #[rstest]
2200    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2201        let instrument = InstrumentAny::Equity(equity_aapl);
2202        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2203        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2204        let handler = Arc::new(Mutex::new(Vec::new()));
2205        let handler_clone = Arc::clone(&handler);
2206
2207        let mut aggregator = TickBarAggregator::new(
2208            bar_type,
2209            instrument.price_precision(),
2210            instrument.size_precision(),
2211            move |bar: Bar| {
2212                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2213                handler_guard.push(bar);
2214            },
2215        );
2216
2217        let trade = TradeTick::default();
2218        aggregator.handle_trade(trade);
2219
2220        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2221        assert_eq!(handler_guard.len(), 0);
2222    }
2223
2224    #[rstest]
2225    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2226        let instrument = InstrumentAny::Equity(equity_aapl);
2227        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2228        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2229        let handler = Arc::new(Mutex::new(Vec::new()));
2230        let handler_clone = Arc::clone(&handler);
2231
2232        let mut aggregator = TickBarAggregator::new(
2233            bar_type,
2234            instrument.price_precision(),
2235            instrument.size_precision(),
2236            move |bar: Bar| {
2237                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2238                handler_guard.push(bar);
2239            },
2240        );
2241
2242        let trade = TradeTick::default();
2243        aggregator.handle_trade(trade);
2244        aggregator.handle_trade(trade);
2245        aggregator.handle_trade(trade);
2246
2247        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2248        let bar = handler_guard.first().unwrap();
2249        assert_eq!(handler_guard.len(), 1);
2250        assert_eq!(bar.open, trade.price);
2251        assert_eq!(bar.high, trade.price);
2252        assert_eq!(bar.low, trade.price);
2253        assert_eq!(bar.close, trade.price);
2254        assert_eq!(bar.volume, Quantity::from(300000));
2255        assert_eq!(bar.ts_event, trade.ts_event);
2256        assert_eq!(bar.ts_init, trade.ts_init);
2257    }
2258
2259    #[rstest]
2260    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2261        let instrument = InstrumentAny::Equity(equity_aapl);
2262        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2263        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2264        let handler = Arc::new(Mutex::new(Vec::new()));
2265        let handler_clone = Arc::clone(&handler);
2266
2267        let mut aggregator = TickBarAggregator::new(
2268            bar_type,
2269            instrument.price_precision(),
2270            instrument.size_precision(),
2271            move |bar: Bar| {
2272                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2273                handler_guard.push(bar);
2274            },
2275        );
2276
2277        aggregator.update(
2278            Price::from("1.00001"),
2279            Quantity::from(1),
2280            UnixNanos::default(),
2281        );
2282        aggregator.update(
2283            Price::from("1.00002"),
2284            Quantity::from(1),
2285            UnixNanos::from(1000),
2286        );
2287        aggregator.update(
2288            Price::from("1.00003"),
2289            Quantity::from(1),
2290            UnixNanos::from(2000),
2291        );
2292
2293        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2294        assert_eq!(handler_guard.len(), 1);
2295
2296        let bar = handler_guard.first().unwrap();
2297        assert_eq!(bar.open, Price::from("1.00001"));
2298        assert_eq!(bar.high, Price::from("1.00003"));
2299        assert_eq!(bar.low, Price::from("1.00001"));
2300        assert_eq!(bar.close, Price::from("1.00003"));
2301        assert_eq!(bar.volume, Quantity::from(3));
2302    }
2303
2304    #[rstest]
2305    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2306        let instrument = InstrumentAny::Equity(equity_aapl);
2307        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2308        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2309        let handler = Arc::new(Mutex::new(Vec::new()));
2310        let handler_clone = Arc::clone(&handler);
2311
2312        let mut aggregator = TickBarAggregator::new(
2313            bar_type,
2314            instrument.price_precision(),
2315            instrument.size_precision(),
2316            move |bar: Bar| {
2317                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2318                handler_guard.push(bar);
2319            },
2320        );
2321
2322        aggregator.update(
2323            Price::from("1.00001"),
2324            Quantity::from(1),
2325            UnixNanos::default(),
2326        );
2327        aggregator.update(
2328            Price::from("1.00002"),
2329            Quantity::from(1),
2330            UnixNanos::from(1000),
2331        );
2332        aggregator.update(
2333            Price::from("1.00003"),
2334            Quantity::from(1),
2335            UnixNanos::from(2000),
2336        );
2337        aggregator.update(
2338            Price::from("1.00004"),
2339            Quantity::from(1),
2340            UnixNanos::from(3000),
2341        );
2342
2343        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2344        assert_eq!(handler_guard.len(), 2);
2345
2346        let bar1 = &handler_guard[0];
2347        assert_eq!(bar1.open, Price::from("1.00001"));
2348        assert_eq!(bar1.close, Price::from("1.00002"));
2349        assert_eq!(bar1.volume, Quantity::from(2));
2350
2351        let bar2 = &handler_guard[1];
2352        assert_eq!(bar2.open, Price::from("1.00003"));
2353        assert_eq!(bar2.close, Price::from("1.00004"));
2354        assert_eq!(bar2.volume, Quantity::from(2));
2355    }
2356
2357    #[rstest]
2358    fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2359        let instrument = InstrumentAny::Equity(equity_aapl);
2360        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2361        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2362        let handler = Arc::new(Mutex::new(Vec::new()));
2363        let handler_clone = Arc::clone(&handler);
2364
2365        let mut aggregator = TickImbalanceBarAggregator::new(
2366            bar_type,
2367            instrument.price_precision(),
2368            instrument.size_precision(),
2369            move |bar: Bar| {
2370                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2371                handler_guard.push(bar);
2372            },
2373        );
2374
2375        let trade = TradeTick::default();
2376        aggregator.handle_trade(trade);
2377        aggregator.handle_trade(trade);
2378
2379        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2380        assert_eq!(handler_guard.len(), 1);
2381        let bar = handler_guard.first().unwrap();
2382        assert_eq!(bar.volume, Quantity::from(200000));
2383    }
2384
2385    #[rstest]
2386    fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2387        let instrument = InstrumentAny::Equity(equity_aapl);
2388        let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2389        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2390        let handler = Arc::new(Mutex::new(Vec::new()));
2391        let handler_clone = Arc::clone(&handler);
2392
2393        let mut aggregator = TickImbalanceBarAggregator::new(
2394            bar_type,
2395            instrument.price_precision(),
2396            instrument.size_precision(),
2397            move |bar: Bar| {
2398                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2399                handler_guard.push(bar);
2400            },
2401        );
2402
2403        let sell = TradeTick {
2404            aggressor_side: AggressorSide::Seller,
2405            ..TradeTick::default()
2406        };
2407
2408        aggregator.handle_trade(sell);
2409
2410        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2411        assert_eq!(handler_guard.len(), 1);
2412    }
2413
2414    #[rstest]
2415    fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2416        let instrument = InstrumentAny::Equity(equity_aapl);
2417        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2418        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2419        let handler = Arc::new(Mutex::new(Vec::new()));
2420        let handler_clone = Arc::clone(&handler);
2421
2422        let mut aggregator = TickRunsBarAggregator::new(
2423            bar_type,
2424            instrument.price_precision(),
2425            instrument.size_precision(),
2426            move |bar: Bar| {
2427                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2428                handler_guard.push(bar);
2429            },
2430        );
2431
2432        let buy = TradeTick::default();
2433        let sell = TradeTick {
2434            aggressor_side: AggressorSide::Seller,
2435            ..buy
2436        };
2437
2438        aggregator.handle_trade(buy);
2439        aggregator.handle_trade(buy);
2440        aggregator.handle_trade(sell);
2441        aggregator.handle_trade(sell);
2442
2443        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2444        assert_eq!(handler_guard.len(), 2);
2445    }
2446
2447    #[rstest]
2448    fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2449        let instrument = InstrumentAny::Equity(equity_aapl);
2450        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2451        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2452        let handler = Arc::new(Mutex::new(Vec::new()));
2453        let handler_clone = Arc::clone(&handler);
2454
2455        let mut aggregator = TickRunsBarAggregator::new(
2456            bar_type,
2457            instrument.price_precision(),
2458            instrument.size_precision(),
2459            move |bar: Bar| {
2460                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2461                handler_guard.push(bar);
2462            },
2463        );
2464
2465        let buy = TradeTick {
2466            size: Quantity::from(1),
2467            ..TradeTick::default()
2468        };
2469        let sell = TradeTick {
2470            aggressor_side: AggressorSide::Seller,
2471            size: Quantity::from(1),
2472            ..buy
2473        };
2474
2475        aggregator.handle_trade(buy);
2476        aggregator.handle_trade(buy);
2477        aggregator.handle_trade(sell);
2478        aggregator.handle_trade(sell);
2479
2480        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2481        assert_eq!(handler_guard.len(), 2);
2482        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2483        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2484    }
2485
2486    #[rstest]
2487    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2488        let instrument = InstrumentAny::Equity(equity_aapl);
2489        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2490        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2491        let handler = Arc::new(Mutex::new(Vec::new()));
2492        let handler_clone = Arc::clone(&handler);
2493
2494        let mut aggregator = VolumeBarAggregator::new(
2495            bar_type,
2496            instrument.price_precision(),
2497            instrument.size_precision(),
2498            move |bar: Bar| {
2499                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2500                handler_guard.push(bar);
2501            },
2502        );
2503
2504        aggregator.update(
2505            Price::from("1.00001"),
2506            Quantity::from(25),
2507            UnixNanos::default(),
2508        );
2509
2510        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2511        assert_eq!(handler_guard.len(), 2);
2512        let bar1 = &handler_guard[0];
2513        assert_eq!(bar1.volume, Quantity::from(10));
2514        let bar2 = &handler_guard[1];
2515        assert_eq!(bar2.volume, Quantity::from(10));
2516    }
2517
2518    #[rstest]
2519    fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2520        let instrument = InstrumentAny::Equity(equity_aapl);
2521        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2522        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2523        let handler = Arc::new(Mutex::new(Vec::new()));
2524        let handler_clone = Arc::clone(&handler);
2525
2526        let mut aggregator = VolumeRunsBarAggregator::new(
2527            bar_type,
2528            instrument.price_precision(),
2529            instrument.size_precision(),
2530            move |bar: Bar| {
2531                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2532                handler_guard.push(bar);
2533            },
2534        );
2535
2536        let buy = TradeTick {
2537            instrument_id: instrument.id(),
2538            price: Price::from("1.0"),
2539            size: Quantity::from(1),
2540            ..TradeTick::default()
2541        };
2542        let sell = TradeTick {
2543            aggressor_side: AggressorSide::Seller,
2544            ..buy
2545        };
2546
2547        aggregator.handle_trade(buy);
2548        aggregator.handle_trade(buy); // emit first bar at 2
2549        aggregator.handle_trade(sell);
2550        aggregator.handle_trade(sell); // emit second bar at 2 sell-side
2551
2552        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2553        assert!(handler_guard.len() >= 2);
2554        assert!(
2555            (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2556                < f64::EPSILON
2557        );
2558    }
2559
2560    #[rstest]
2561    fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2562        let instrument = InstrumentAny::Equity(equity_aapl);
2563        let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2564        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2565        let handler = Arc::new(Mutex::new(Vec::new()));
2566        let handler_clone = Arc::clone(&handler);
2567
2568        let mut aggregator = VolumeRunsBarAggregator::new(
2569            bar_type,
2570            instrument.price_precision(),
2571            instrument.size_precision(),
2572            move |bar: Bar| {
2573                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2574                handler_guard.push(bar);
2575            },
2576        );
2577
2578        let trade = TradeTick {
2579            instrument_id: instrument.id(),
2580            price: Price::from("1.0"),
2581            size: Quantity::from(5),
2582            ..TradeTick::default()
2583        };
2584
2585        aggregator.handle_trade(trade);
2586
2587        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2588        assert!(!handler_guard.is_empty());
2589        assert!(handler_guard[0].volume.as_f64() > 0.0);
2590        assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2591    }
2592
2593    #[rstest]
2594    fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2595        let instrument = InstrumentAny::Equity(equity_aapl);
2596        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2597        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2598        let handler = Arc::new(Mutex::new(Vec::new()));
2599        let handler_clone = Arc::clone(&handler);
2600
2601        let mut aggregator = VolumeImbalanceBarAggregator::new(
2602            bar_type,
2603            instrument.price_precision(),
2604            instrument.size_precision(),
2605            move |bar: Bar| {
2606                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2607                handler_guard.push(bar);
2608            },
2609        );
2610
2611        let trade_small = TradeTick {
2612            instrument_id: instrument.id(),
2613            price: Price::from("1.0"),
2614            size: Quantity::from(1),
2615            ..TradeTick::default()
2616        };
2617        let trade_large = TradeTick {
2618            size: Quantity::from(3),
2619            ..trade_small
2620        };
2621
2622        aggregator.handle_trade(trade_small);
2623        aggregator.handle_trade(trade_large);
2624
2625        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2626        assert_eq!(handler_guard.len(), 2);
2627        let total_output = handler_guard
2628            .iter()
2629            .map(|bar| bar.volume.as_f64())
2630            .sum::<f64>();
2631        let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2632        assert!((total_output - total_input).abs() < f64::EPSILON);
2633    }
2634
2635    #[rstest]
2636    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2637        let instrument = InstrumentAny::Equity(equity_aapl);
2638        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
2639        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2640        let handler = Arc::new(Mutex::new(Vec::new()));
2641        let handler_clone = Arc::clone(&handler);
2642
2643        let mut aggregator = ValueBarAggregator::new(
2644            bar_type,
2645            instrument.price_precision(),
2646            instrument.size_precision(),
2647            move |bar: Bar| {
2648                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2649                handler_guard.push(bar);
2650            },
2651        );
2652
2653        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
2654        aggregator.update(
2655            Price::from("100.00"),
2656            Quantity::from(5),
2657            UnixNanos::default(),
2658        );
2659        aggregator.update(
2660            Price::from("100.00"),
2661            Quantity::from(5),
2662            UnixNanos::from(1000),
2663        );
2664
2665        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2666        assert_eq!(handler_guard.len(), 1);
2667        let bar = handler_guard.first().unwrap();
2668        assert_eq!(bar.volume, Quantity::from(10));
2669    }
2670
2671    #[rstest]
2672    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2673        let instrument = InstrumentAny::Equity(equity_aapl);
2674        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2675        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2676        let handler = Arc::new(Mutex::new(Vec::new()));
2677        let handler_clone = Arc::clone(&handler);
2678
2679        let mut aggregator = ValueBarAggregator::new(
2680            bar_type,
2681            instrument.price_precision(),
2682            instrument.size_precision(),
2683            move |bar: Bar| {
2684                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2685                handler_guard.push(bar);
2686            },
2687        );
2688
2689        // Single large update: $100 * 25 = $2500 (should create 2 bars)
2690        aggregator.update(
2691            Price::from("100.00"),
2692            Quantity::from(25),
2693            UnixNanos::default(),
2694        );
2695
2696        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2697        assert_eq!(handler_guard.len(), 2);
2698        let remaining_value = aggregator.get_cumulative_value();
2699        assert!(remaining_value < 1000.0); // Should be less than threshold
2700    }
2701
2702    #[rstest]
2703    fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2704        let instrument = InstrumentAny::Equity(equity_aapl);
2705        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2706        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2707        let handler = Arc::new(Mutex::new(Vec::new()));
2708        let handler_clone = Arc::clone(&handler);
2709
2710        let mut aggregator = ValueBarAggregator::new(
2711            bar_type,
2712            instrument.price_precision(),
2713            instrument.size_precision(),
2714            move |bar: Bar| {
2715                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2716                handler_guard.push(bar);
2717            },
2718        );
2719
2720        // Update with zero price should not cause division by zero
2721        aggregator.update(
2722            Price::from("0.00"),
2723            Quantity::from(100),
2724            UnixNanos::default(),
2725        );
2726
2727        // No bars should be emitted since value is zero
2728        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2729        assert_eq!(handler_guard.len(), 0);
2730
2731        // Cumulative value should remain zero
2732        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2733    }
2734
2735    #[rstest]
2736    fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2737        let instrument = InstrumentAny::Equity(equity_aapl);
2738        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2739        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2740        let handler = Arc::new(Mutex::new(Vec::new()));
2741        let handler_clone = Arc::clone(&handler);
2742
2743        let mut aggregator = ValueBarAggregator::new(
2744            bar_type,
2745            instrument.price_precision(),
2746            instrument.size_precision(),
2747            move |bar: Bar| {
2748                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2749                handler_guard.push(bar);
2750            },
2751        );
2752
2753        // Update with zero size should not cause issues
2754        aggregator.update(
2755            Price::from("100.00"),
2756            Quantity::from(0),
2757            UnixNanos::default(),
2758        );
2759
2760        // No bars should be emitted
2761        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2762        assert_eq!(handler_guard.len(), 0);
2763
2764        // Cumulative value should remain zero
2765        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2766    }
2767
2768    #[rstest]
2769    fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2770        let instrument = InstrumentAny::Equity(equity_aapl);
2771        let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2772        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2773        let handler = Arc::new(Mutex::new(Vec::new()));
2774        let handler_clone = Arc::clone(&handler);
2775
2776        let mut aggregator = ValueImbalanceBarAggregator::new(
2777            bar_type,
2778            instrument.price_precision(),
2779            instrument.size_precision(),
2780            move |bar: Bar| {
2781                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2782                handler_guard.push(bar);
2783            },
2784        );
2785
2786        let buy = TradeTick {
2787            price: Price::from("5.0"),
2788            size: Quantity::from(2), // value 10, should emit one bar
2789            instrument_id: instrument.id(),
2790            ..TradeTick::default()
2791        };
2792        let sell = TradeTick {
2793            price: Price::from("5.0"),
2794            size: Quantity::from(2), // value 10, should emit another bar
2795            aggressor_side: AggressorSide::Seller,
2796            instrument_id: instrument.id(),
2797            ..buy
2798        };
2799
2800        aggregator.handle_trade(buy);
2801        aggregator.handle_trade(sell);
2802
2803        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2804        assert_eq!(handler_guard.len(), 2);
2805    }
2806
2807    #[rstest]
2808    fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2809        let instrument = InstrumentAny::Equity(equity_aapl);
2810        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2811        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2812        let handler = Arc::new(Mutex::new(Vec::new()));
2813        let handler_clone = Arc::clone(&handler);
2814
2815        let mut aggregator = ValueRunsBarAggregator::new(
2816            bar_type,
2817            instrument.price_precision(),
2818            instrument.size_precision(),
2819            move |bar: Bar| {
2820                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2821                handler_guard.push(bar);
2822            },
2823        );
2824
2825        let trade = TradeTick {
2826            price: Price::from("10.0"),
2827            size: Quantity::from(5),
2828            instrument_id: instrument.id(),
2829            ..TradeTick::default()
2830        };
2831
2832        aggregator.handle_trade(trade);
2833        aggregator.handle_trade(trade);
2834
2835        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2836        assert_eq!(handler_guard.len(), 1);
2837        let bar = handler_guard.first().unwrap();
2838        assert_eq!(bar.volume, Quantity::from(10));
2839    }
2840
2841    #[rstest]
2842    fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2843        let instrument = InstrumentAny::Equity(equity_aapl);
2844        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2845        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2846        let handler = Arc::new(Mutex::new(Vec::new()));
2847        let handler_clone = Arc::clone(&handler);
2848
2849        let mut aggregator = ValueRunsBarAggregator::new(
2850            bar_type,
2851            instrument.price_precision(),
2852            instrument.size_precision(),
2853            move |bar: Bar| {
2854                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2855                handler_guard.push(bar);
2856            },
2857        );
2858
2859        let buy = TradeTick {
2860            price: Price::from("10.0"),
2861            size: Quantity::from(5),
2862            instrument_id: instrument.id(),
2863            ..TradeTick::default()
2864        }; // value 50
2865        let sell = TradeTick {
2866            price: Price::from("10.0"),
2867            size: Quantity::from(10),
2868            aggressor_side: AggressorSide::Seller,
2869            ..buy
2870        }; // value 100
2871
2872        aggregator.handle_trade(buy);
2873        aggregator.handle_trade(sell);
2874
2875        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2876        assert_eq!(handler_guard.len(), 1);
2877        assert_eq!(handler_guard[0].volume, Quantity::from(10));
2878    }
2879
2880    #[rstest]
2881    fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2882        let instrument = InstrumentAny::Equity(equity_aapl);
2883        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2884        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2885        let handler = Arc::new(Mutex::new(Vec::new()));
2886        let handler_clone = Arc::clone(&handler);
2887
2888        let mut aggregator = TickRunsBarAggregator::new(
2889            bar_type,
2890            instrument.price_precision(),
2891            instrument.size_precision(),
2892            move |bar: Bar| {
2893                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2894                handler_guard.push(bar);
2895            },
2896        );
2897
2898        let buy = TradeTick::default();
2899
2900        aggregator.handle_trade(buy);
2901        aggregator.handle_trade(buy); // Emit bar 1 (run complete)
2902        aggregator.handle_trade(buy); // Start new run
2903        aggregator.handle_trade(buy); // Emit bar 2 (new run complete)
2904
2905        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2906        assert_eq!(handler_guard.len(), 2);
2907    }
2908
2909    #[rstest]
2910    fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2911        let instrument = InstrumentAny::Equity(equity_aapl);
2912        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2913        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2914        let handler = Arc::new(Mutex::new(Vec::new()));
2915        let handler_clone = Arc::clone(&handler);
2916
2917        let mut aggregator = TickRunsBarAggregator::new(
2918            bar_type,
2919            instrument.price_precision(),
2920            instrument.size_precision(),
2921            move |bar: Bar| {
2922                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2923                handler_guard.push(bar);
2924            },
2925        );
2926
2927        let buy = TradeTick::default();
2928        let no_aggressor = TradeTick {
2929            aggressor_side: AggressorSide::NoAggressor,
2930            ..buy
2931        };
2932
2933        aggregator.handle_trade(buy);
2934        aggregator.handle_trade(no_aggressor); // Should not affect run count
2935        aggregator.handle_trade(no_aggressor); // Should not affect run count
2936        aggregator.handle_trade(buy); // Continue run to threshold
2937
2938        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2939        assert_eq!(handler_guard.len(), 1);
2940    }
2941
2942    #[rstest]
2943    fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2944        let instrument = InstrumentAny::Equity(equity_aapl);
2945        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2946        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2947        let handler = Arc::new(Mutex::new(Vec::new()));
2948        let handler_clone = Arc::clone(&handler);
2949
2950        let mut aggregator = VolumeRunsBarAggregator::new(
2951            bar_type,
2952            instrument.price_precision(),
2953            instrument.size_precision(),
2954            move |bar: Bar| {
2955                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2956                handler_guard.push(bar);
2957            },
2958        );
2959
2960        let buy = TradeTick {
2961            instrument_id: instrument.id(),
2962            price: Price::from("1.0"),
2963            size: Quantity::from(1),
2964            ..TradeTick::default()
2965        };
2966
2967        aggregator.handle_trade(buy);
2968        aggregator.handle_trade(buy); // Emit bar 1 (2.0 volume reached)
2969        aggregator.handle_trade(buy); // Start new run
2970        aggregator.handle_trade(buy); // Emit bar 2 (new 2.0 volume reached)
2971
2972        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2973        assert_eq!(handler_guard.len(), 2);
2974        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2975        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2976    }
2977
2978    #[rstest]
2979    fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2980        let instrument = InstrumentAny::Equity(equity_aapl);
2981        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2982        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2983        let handler = Arc::new(Mutex::new(Vec::new()));
2984        let handler_clone = Arc::clone(&handler);
2985
2986        let mut aggregator = ValueRunsBarAggregator::new(
2987            bar_type,
2988            instrument.price_precision(),
2989            instrument.size_precision(),
2990            move |bar: Bar| {
2991                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2992                handler_guard.push(bar);
2993            },
2994        );
2995
2996        let buy = TradeTick {
2997            instrument_id: instrument.id(),
2998            price: Price::from("10.0"),
2999            size: Quantity::from(5),
3000            ..TradeTick::default()
3001        }; // value 50 per trade
3002
3003        aggregator.handle_trade(buy);
3004        aggregator.handle_trade(buy); // Emit bar 1 (100 value reached)
3005        aggregator.handle_trade(buy); // Start new run
3006        aggregator.handle_trade(buy); // Emit bar 2 (new 100 value reached)
3007
3008        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3009        assert_eq!(handler_guard.len(), 2);
3010        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3011        assert_eq!(handler_guard[1].volume, Quantity::from(10));
3012    }
3013
3014    #[rstest]
3015    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
3016        let instrument = InstrumentAny::Equity(equity_aapl);
3017        // One second bars
3018        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3019        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3020        let handler = Arc::new(Mutex::new(Vec::new()));
3021        let handler_clone = Arc::clone(&handler);
3022        let clock = Rc::new(RefCell::new(TestClock::new()));
3023
3024        let mut aggregator = TimeBarAggregator::new(
3025            bar_type,
3026            instrument.price_precision(),
3027            instrument.size_precision(),
3028            clock.clone(),
3029            move |bar: Bar| {
3030                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3031                handler_guard.push(bar);
3032            },
3033            true,  // build_with_no_updates
3034            false, // timestamp_on_close
3035            BarIntervalType::LeftOpen,
3036            None,  // time_bars_origin_offset
3037            15,    // bar_build_delay
3038            false, // skip_first_non_full_bar
3039        );
3040
3041        aggregator.update(
3042            Price::from("100.00"),
3043            Quantity::from(1),
3044            UnixNanos::default(),
3045        );
3046
3047        let next_sec = UnixNanos::from(1_000_000_000);
3048        clock.borrow_mut().set_time(next_sec);
3049
3050        let event = TimeEvent::new(
3051            Ustr::from("1-SECOND-LAST"),
3052            UUID4::new(),
3053            next_sec,
3054            next_sec,
3055        );
3056        aggregator.build_bar(event);
3057
3058        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3059        assert_eq!(handler_guard.len(), 1);
3060        let bar = handler_guard.first().unwrap();
3061        assert_eq!(bar.ts_event, UnixNanos::default());
3062        assert_eq!(bar.ts_init, next_sec);
3063    }
3064
3065    #[rstest]
3066    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3067        let instrument = InstrumentAny::Equity(equity_aapl);
3068        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3069        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3070        let handler = Arc::new(Mutex::new(Vec::new()));
3071        let handler_clone = Arc::clone(&handler);
3072        let clock = Rc::new(RefCell::new(TestClock::new()));
3073
3074        let mut aggregator = TimeBarAggregator::new(
3075            bar_type,
3076            instrument.price_precision(),
3077            instrument.size_precision(),
3078            clock.clone(),
3079            move |bar: Bar| {
3080                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3081                handler_guard.push(bar);
3082            },
3083            true, // build_with_no_updates
3084            true, // timestamp_on_close - changed to true to verify left-open behavior
3085            BarIntervalType::LeftOpen,
3086            None,
3087            15,
3088            false, // skip_first_non_full_bar
3089        );
3090
3091        // Update in first interval
3092        aggregator.update(
3093            Price::from("100.00"),
3094            Quantity::from(1),
3095            UnixNanos::default(),
3096        );
3097
3098        // First interval close
3099        let ts1 = UnixNanos::from(1_000_000_000);
3100        clock.borrow_mut().set_time(ts1);
3101        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3102        aggregator.build_bar(event);
3103
3104        // Update in second interval
3105        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3106
3107        // Second interval close
3108        let ts2 = UnixNanos::from(2_000_000_000);
3109        clock.borrow_mut().set_time(ts2);
3110        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3111        aggregator.build_bar(event);
3112
3113        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3114        assert_eq!(handler_guard.len(), 2);
3115
3116        let bar1 = &handler_guard[0];
3117        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
3118        assert_eq!(bar1.ts_init, ts1);
3119        assert_eq!(bar1.close, Price::from("100.00"));
3120        let bar2 = &handler_guard[1];
3121        assert_eq!(bar2.ts_event, ts2);
3122        assert_eq!(bar2.ts_init, ts2);
3123        assert_eq!(bar2.close, Price::from("101.00"));
3124    }
3125
3126    #[rstest]
3127    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3128        let instrument = InstrumentAny::Equity(equity_aapl);
3129        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3130        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3131        let handler = Arc::new(Mutex::new(Vec::new()));
3132        let handler_clone = Arc::clone(&handler);
3133        let clock = Rc::new(RefCell::new(TestClock::new()));
3134        let mut aggregator = TimeBarAggregator::new(
3135            bar_type,
3136            instrument.price_precision(),
3137            instrument.size_precision(),
3138            clock.clone(),
3139            move |bar: Bar| {
3140                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3141                handler_guard.push(bar);
3142            },
3143            true, // build_with_no_updates
3144            true, // timestamp_on_close
3145            BarIntervalType::RightOpen,
3146            None,
3147            15,
3148            false, // skip_first_non_full_bar
3149        );
3150
3151        // Update in first interval
3152        aggregator.update(
3153            Price::from("100.00"),
3154            Quantity::from(1),
3155            UnixNanos::default(),
3156        );
3157
3158        // First interval close
3159        let ts1 = UnixNanos::from(1_000_000_000);
3160        clock.borrow_mut().set_time(ts1);
3161        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3162        aggregator.build_bar(event);
3163
3164        // Update in second interval
3165        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3166
3167        // Second interval close
3168        let ts2 = UnixNanos::from(2_000_000_000);
3169        clock.borrow_mut().set_time(ts2);
3170        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3171        aggregator.build_bar(event);
3172
3173        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3174        assert_eq!(handler_guard.len(), 2);
3175
3176        let bar1 = &handler_guard[0];
3177        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
3178        assert_eq!(bar1.ts_init, ts1);
3179        assert_eq!(bar1.close, Price::from("100.00"));
3180
3181        let bar2 = &handler_guard[1];
3182        assert_eq!(bar2.ts_event, ts1);
3183        assert_eq!(bar2.ts_init, ts2);
3184        assert_eq!(bar2.close, Price::from("101.00"));
3185    }
3186
3187    #[rstest]
3188    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3189        let instrument = InstrumentAny::Equity(equity_aapl);
3190        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3191        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3192        let handler = Arc::new(Mutex::new(Vec::new()));
3193        let handler_clone = Arc::clone(&handler);
3194        let clock = Rc::new(RefCell::new(TestClock::new()));
3195
3196        // First test with build_with_no_updates = false
3197        let mut aggregator = TimeBarAggregator::new(
3198            bar_type,
3199            instrument.price_precision(),
3200            instrument.size_precision(),
3201            clock.clone(),
3202            move |bar: Bar| {
3203                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3204                handler_guard.push(bar);
3205            },
3206            false, // build_with_no_updates disabled
3207            true,  // timestamp_on_close
3208            BarIntervalType::LeftOpen,
3209            None,
3210            15,
3211            false, // skip_first_non_full_bar
3212        );
3213
3214        // No updates, just interval close
3215        let ts1 = UnixNanos::from(1_000_000_000);
3216        clock.borrow_mut().set_time(ts1);
3217        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3218        aggregator.build_bar(event);
3219
3220        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3221        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
3222        drop(handler_guard);
3223
3224        // Now test with build_with_no_updates = true
3225        let handler = Arc::new(Mutex::new(Vec::new()));
3226        let handler_clone = Arc::clone(&handler);
3227        let mut aggregator = TimeBarAggregator::new(
3228            bar_type,
3229            instrument.price_precision(),
3230            instrument.size_precision(),
3231            clock.clone(),
3232            move |bar: Bar| {
3233                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3234                handler_guard.push(bar);
3235            },
3236            true, // build_with_no_updates enabled
3237            true, // timestamp_on_close
3238            BarIntervalType::LeftOpen,
3239            None,
3240            15,
3241            false, // skip_first_non_full_bar
3242        );
3243
3244        aggregator.update(
3245            Price::from("100.00"),
3246            Quantity::from(1),
3247            UnixNanos::default(),
3248        );
3249
3250        // First interval with update
3251        let ts1 = UnixNanos::from(1_000_000_000);
3252        clock.borrow_mut().set_time(ts1);
3253        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3254        aggregator.build_bar(event);
3255
3256        // Second interval without updates
3257        let ts2 = UnixNanos::from(2_000_000_000);
3258        clock.borrow_mut().set_time(ts2);
3259        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3260        aggregator.build_bar(event);
3261
3262        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3263        assert_eq!(handler_guard.len(), 2); // Both bars should be built
3264        let bar1 = &handler_guard[0];
3265        assert_eq!(bar1.close, Price::from("100.00"));
3266        let bar2 = &handler_guard[1];
3267        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
3268    }
3269
3270    #[rstest]
3271    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3272        let instrument = InstrumentAny::Equity(equity_aapl);
3273        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3274        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3275        let clock = Rc::new(RefCell::new(TestClock::new()));
3276        let handler = Arc::new(Mutex::new(Vec::new()));
3277        let handler_clone = Arc::clone(&handler);
3278
3279        let mut aggregator = TimeBarAggregator::new(
3280            bar_type,
3281            instrument.price_precision(),
3282            instrument.size_precision(),
3283            clock.clone(),
3284            move |bar: Bar| {
3285                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3286                handler_guard.push(bar);
3287            },
3288            true, // build_with_no_updates
3289            true, // timestamp_on_close
3290            BarIntervalType::RightOpen,
3291            None,
3292            15,
3293            false, // skip_first_non_full_bar
3294        );
3295
3296        let ts1 = UnixNanos::from(1_000_000_000);
3297        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3298
3299        let ts2 = UnixNanos::from(2_000_000_000);
3300        clock.borrow_mut().set_time(ts2);
3301
3302        // Simulate timestamp on close
3303        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3304        aggregator.build_bar(event);
3305
3306        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3307        let bar = handler_guard.first().unwrap();
3308        assert_eq!(bar.ts_event, UnixNanos::default());
3309        assert_eq!(bar.ts_init, ts2);
3310    }
3311
3312    #[rstest]
3313    fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3314        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3315        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3316        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3317        let handler = Arc::new(Mutex::new(Vec::new()));
3318        let handler_clone = Arc::clone(&handler);
3319
3320        let aggregator = RenkoBarAggregator::new(
3321            bar_type,
3322            instrument.price_precision(),
3323            instrument.size_precision(),
3324            instrument.price_increment(),
3325            move |bar: Bar| {
3326                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3327                handler_guard.push(bar);
3328            },
3329        );
3330
3331        assert_eq!(aggregator.bar_type(), bar_type);
3332        assert!(!aggregator.is_running());
3333        // 10 pips * price_increment.raw (depends on precision mode)
3334        let expected_brick_size = 10 * instrument.price_increment().raw;
3335        assert_eq!(aggregator.brick_size, expected_brick_size);
3336    }
3337
3338    #[rstest]
3339    fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3340        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3341        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3342        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3343        let handler = Arc::new(Mutex::new(Vec::new()));
3344        let handler_clone = Arc::clone(&handler);
3345
3346        let mut aggregator = RenkoBarAggregator::new(
3347            bar_type,
3348            instrument.price_precision(),
3349            instrument.size_precision(),
3350            instrument.price_increment(),
3351            move |bar: Bar| {
3352                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3353                handler_guard.push(bar);
3354            },
3355        );
3356
3357        // Small price movement (5 pips, less than 10 pip brick size)
3358        aggregator.update(
3359            Price::from("1.00000"),
3360            Quantity::from(1),
3361            UnixNanos::default(),
3362        );
3363        aggregator.update(
3364            Price::from("1.00005"),
3365            Quantity::from(1),
3366            UnixNanos::from(1000),
3367        );
3368
3369        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3370        assert_eq!(handler_guard.len(), 0); // No bar created yet
3371    }
3372
3373    #[rstest]
3374    fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3375        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3376        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3377        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3378        let handler = Arc::new(Mutex::new(Vec::new()));
3379        let handler_clone = Arc::clone(&handler);
3380
3381        let mut aggregator = RenkoBarAggregator::new(
3382            bar_type,
3383            instrument.price_precision(),
3384            instrument.size_precision(),
3385            instrument.price_increment(),
3386            move |bar: Bar| {
3387                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3388                handler_guard.push(bar);
3389            },
3390        );
3391
3392        // Price movement exceeding brick size (15 pips)
3393        aggregator.update(
3394            Price::from("1.00000"),
3395            Quantity::from(1),
3396            UnixNanos::default(),
3397        );
3398        aggregator.update(
3399            Price::from("1.00015"),
3400            Quantity::from(1),
3401            UnixNanos::from(1000),
3402        );
3403
3404        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3405        assert_eq!(handler_guard.len(), 1);
3406
3407        let bar = handler_guard.first().unwrap();
3408        assert_eq!(bar.open, Price::from("1.00000"));
3409        assert_eq!(bar.high, Price::from("1.00010"));
3410        assert_eq!(bar.low, Price::from("1.00000"));
3411        assert_eq!(bar.close, Price::from("1.00010"));
3412        assert_eq!(bar.volume, Quantity::from(2));
3413        assert_eq!(bar.ts_event, UnixNanos::from(1000));
3414        assert_eq!(bar.ts_init, UnixNanos::from(1000));
3415    }
3416
3417    #[rstest]
3418    fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3419        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3420        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3421        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3422        let handler = Arc::new(Mutex::new(Vec::new()));
3423        let handler_clone = Arc::clone(&handler);
3424
3425        let mut aggregator = RenkoBarAggregator::new(
3426            bar_type,
3427            instrument.price_precision(),
3428            instrument.size_precision(),
3429            instrument.price_increment(),
3430            move |bar: Bar| {
3431                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3432                handler_guard.push(bar);
3433            },
3434        );
3435
3436        // Large price movement creating multiple bricks (25 pips = 2 bricks)
3437        aggregator.update(
3438            Price::from("1.00000"),
3439            Quantity::from(1),
3440            UnixNanos::default(),
3441        );
3442        aggregator.update(
3443            Price::from("1.00025"),
3444            Quantity::from(1),
3445            UnixNanos::from(1000),
3446        );
3447
3448        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3449        assert_eq!(handler_guard.len(), 2);
3450
3451        let bar1 = &handler_guard[0];
3452        assert_eq!(bar1.open, Price::from("1.00000"));
3453        assert_eq!(bar1.high, Price::from("1.00010"));
3454        assert_eq!(bar1.low, Price::from("1.00000"));
3455        assert_eq!(bar1.close, Price::from("1.00010"));
3456
3457        let bar2 = &handler_guard[1];
3458        assert_eq!(bar2.open, Price::from("1.00010"));
3459        assert_eq!(bar2.high, Price::from("1.00020"));
3460        assert_eq!(bar2.low, Price::from("1.00010"));
3461        assert_eq!(bar2.close, Price::from("1.00020"));
3462    }
3463
3464    #[rstest]
3465    fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3466        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3467        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3468        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3469        let handler = Arc::new(Mutex::new(Vec::new()));
3470        let handler_clone = Arc::clone(&handler);
3471
3472        let mut aggregator = RenkoBarAggregator::new(
3473            bar_type,
3474            instrument.price_precision(),
3475            instrument.size_precision(),
3476            instrument.price_increment(),
3477            move |bar: Bar| {
3478                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3479                handler_guard.push(bar);
3480            },
3481        );
3482
3483        // Start at higher price and move down
3484        aggregator.update(
3485            Price::from("1.00020"),
3486            Quantity::from(1),
3487            UnixNanos::default(),
3488        );
3489        aggregator.update(
3490            Price::from("1.00005"),
3491            Quantity::from(1),
3492            UnixNanos::from(1000),
3493        );
3494
3495        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3496        assert_eq!(handler_guard.len(), 1);
3497
3498        let bar = handler_guard.first().unwrap();
3499        assert_eq!(bar.open, Price::from("1.00020"));
3500        assert_eq!(bar.high, Price::from("1.00020"));
3501        assert_eq!(bar.low, Price::from("1.00010"));
3502        assert_eq!(bar.close, Price::from("1.00010"));
3503        assert_eq!(bar.volume, Quantity::from(2));
3504    }
3505
3506    #[rstest]
3507    fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3508        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3509        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3510        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3511        let handler = Arc::new(Mutex::new(Vec::new()));
3512        let handler_clone = Arc::clone(&handler);
3513
3514        let mut aggregator = RenkoBarAggregator::new(
3515            bar_type,
3516            instrument.price_precision(),
3517            instrument.size_precision(),
3518            instrument.price_increment(),
3519            move |bar: Bar| {
3520                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3521                handler_guard.push(bar);
3522            },
3523        );
3524
3525        // Create a bar with small price movement (5 pips)
3526        let input_bar = Bar::new(
3527            BarType::new(
3528                instrument.id(),
3529                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3530                AggregationSource::Internal,
3531            ),
3532            Price::from("1.00000"),
3533            Price::from("1.00005"),
3534            Price::from("0.99995"),
3535            Price::from("1.00005"), // 5 pip move up (less than 10 pip brick)
3536            Quantity::from(100),
3537            UnixNanos::default(),
3538            UnixNanos::from(1000),
3539        );
3540
3541        aggregator.handle_bar(input_bar);
3542
3543        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3544        assert_eq!(handler_guard.len(), 0); // No bar created yet
3545    }
3546
3547    #[rstest]
3548    fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3549        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3550        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3551        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3552        let handler = Arc::new(Mutex::new(Vec::new()));
3553        let handler_clone = Arc::clone(&handler);
3554
3555        let mut aggregator = RenkoBarAggregator::new(
3556            bar_type,
3557            instrument.price_precision(),
3558            instrument.size_precision(),
3559            instrument.price_increment(),
3560            move |bar: Bar| {
3561                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3562                handler_guard.push(bar);
3563            },
3564        );
3565
3566        // First bar to establish baseline
3567        let bar1 = Bar::new(
3568            BarType::new(
3569                instrument.id(),
3570                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3571                AggregationSource::Internal,
3572            ),
3573            Price::from("1.00000"),
3574            Price::from("1.00005"),
3575            Price::from("0.99995"),
3576            Price::from("1.00000"),
3577            Quantity::from(100),
3578            UnixNanos::default(),
3579            UnixNanos::default(),
3580        );
3581
3582        // Second bar with price movement exceeding brick size (10 pips)
3583        let bar2 = Bar::new(
3584            BarType::new(
3585                instrument.id(),
3586                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3587                AggregationSource::Internal,
3588            ),
3589            Price::from("1.00000"),
3590            Price::from("1.00015"),
3591            Price::from("0.99995"),
3592            Price::from("1.00010"), // 10 pip move up (exactly 1 brick)
3593            Quantity::from(50),
3594            UnixNanos::from(60_000_000_000),
3595            UnixNanos::from(60_000_000_000),
3596        );
3597
3598        aggregator.handle_bar(bar1);
3599        aggregator.handle_bar(bar2);
3600
3601        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3602        assert_eq!(handler_guard.len(), 1);
3603
3604        let bar = handler_guard.first().unwrap();
3605        assert_eq!(bar.open, Price::from("1.00000"));
3606        assert_eq!(bar.high, Price::from("1.00010"));
3607        assert_eq!(bar.low, Price::from("1.00000"));
3608        assert_eq!(bar.close, Price::from("1.00010"));
3609        assert_eq!(bar.volume, Quantity::from(150));
3610    }
3611
3612    #[rstest]
3613    fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3614        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3615        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3616        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3617        let handler = Arc::new(Mutex::new(Vec::new()));
3618        let handler_clone = Arc::clone(&handler);
3619
3620        let mut aggregator = RenkoBarAggregator::new(
3621            bar_type,
3622            instrument.price_precision(),
3623            instrument.size_precision(),
3624            instrument.price_increment(),
3625            move |bar: Bar| {
3626                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3627                handler_guard.push(bar);
3628            },
3629        );
3630
3631        // First bar to establish baseline
3632        let bar1 = Bar::new(
3633            BarType::new(
3634                instrument.id(),
3635                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3636                AggregationSource::Internal,
3637            ),
3638            Price::from("1.00000"),
3639            Price::from("1.00005"),
3640            Price::from("0.99995"),
3641            Price::from("1.00000"),
3642            Quantity::from(100),
3643            UnixNanos::default(),
3644            UnixNanos::default(),
3645        );
3646
3647        // Second bar with large price movement (30 pips = 3 bricks)
3648        let bar2 = Bar::new(
3649            BarType::new(
3650                instrument.id(),
3651                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3652                AggregationSource::Internal,
3653            ),
3654            Price::from("1.00000"),
3655            Price::from("1.00035"),
3656            Price::from("0.99995"),
3657            Price::from("1.00030"), // 30 pip move up (exactly 3 bricks)
3658            Quantity::from(50),
3659            UnixNanos::from(60_000_000_000),
3660            UnixNanos::from(60_000_000_000),
3661        );
3662
3663        aggregator.handle_bar(bar1);
3664        aggregator.handle_bar(bar2);
3665
3666        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3667        assert_eq!(handler_guard.len(), 3);
3668
3669        let bar1 = &handler_guard[0];
3670        assert_eq!(bar1.open, Price::from("1.00000"));
3671        assert_eq!(bar1.close, Price::from("1.00010"));
3672
3673        let bar2 = &handler_guard[1];
3674        assert_eq!(bar2.open, Price::from("1.00010"));
3675        assert_eq!(bar2.close, Price::from("1.00020"));
3676
3677        let bar3 = &handler_guard[2];
3678        assert_eq!(bar3.open, Price::from("1.00020"));
3679        assert_eq!(bar3.close, Price::from("1.00030"));
3680    }
3681
3682    #[rstest]
3683    fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3684        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3685        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3686        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3687        let handler = Arc::new(Mutex::new(Vec::new()));
3688        let handler_clone = Arc::clone(&handler);
3689
3690        let mut aggregator = RenkoBarAggregator::new(
3691            bar_type,
3692            instrument.price_precision(),
3693            instrument.size_precision(),
3694            instrument.price_increment(),
3695            move |bar: Bar| {
3696                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3697                handler_guard.push(bar);
3698            },
3699        );
3700
3701        // First bar to establish baseline
3702        let bar1 = Bar::new(
3703            BarType::new(
3704                instrument.id(),
3705                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3706                AggregationSource::Internal,
3707            ),
3708            Price::from("1.00020"),
3709            Price::from("1.00025"),
3710            Price::from("1.00015"),
3711            Price::from("1.00020"),
3712            Quantity::from(100),
3713            UnixNanos::default(),
3714            UnixNanos::default(),
3715        );
3716
3717        // Second bar with downward price movement (10 pips down)
3718        let bar2 = Bar::new(
3719            BarType::new(
3720                instrument.id(),
3721                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3722                AggregationSource::Internal,
3723            ),
3724            Price::from("1.00020"),
3725            Price::from("1.00025"),
3726            Price::from("1.00005"),
3727            Price::from("1.00010"), // 10 pip move down (exactly 1 brick)
3728            Quantity::from(50),
3729            UnixNanos::from(60_000_000_000),
3730            UnixNanos::from(60_000_000_000),
3731        );
3732
3733        aggregator.handle_bar(bar1);
3734        aggregator.handle_bar(bar2);
3735
3736        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3737        assert_eq!(handler_guard.len(), 1);
3738
3739        let bar = handler_guard.first().unwrap();
3740        assert_eq!(bar.open, Price::from("1.00020"));
3741        assert_eq!(bar.high, Price::from("1.00020"));
3742        assert_eq!(bar.low, Price::from("1.00010"));
3743        assert_eq!(bar.close, Price::from("1.00010"));
3744        assert_eq!(bar.volume, Quantity::from(150));
3745    }
3746
3747    #[rstest]
3748    fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3749        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3750
3751        // Test different brick sizes
3752        let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); // 5 pip brick size
3753        let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3754        let handler = Arc::new(Mutex::new(Vec::new()));
3755        let handler_clone = Arc::clone(&handler);
3756
3757        let aggregator_5 = RenkoBarAggregator::new(
3758            bar_type_5,
3759            instrument.price_precision(),
3760            instrument.size_precision(),
3761            instrument.price_increment(),
3762            move |_bar: Bar| {
3763                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3764                handler_guard.push(_bar);
3765            },
3766        );
3767
3768        // 5 pips * price_increment.raw (depends on precision mode)
3769        let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3770        assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3771
3772        let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); // 20 pip brick size
3773        let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3774        let handler2 = Arc::new(Mutex::new(Vec::new()));
3775        let handler2_clone = Arc::clone(&handler2);
3776
3777        let aggregator_20 = RenkoBarAggregator::new(
3778            bar_type_20,
3779            instrument.price_precision(),
3780            instrument.size_precision(),
3781            instrument.price_increment(),
3782            move |_bar: Bar| {
3783                let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3784                handler_guard.push(_bar);
3785            },
3786        );
3787
3788        // 20 pips * price_increment.raw (depends on precision mode)
3789        let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3790        assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3791    }
3792
3793    #[rstest]
3794    fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3795        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3796        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3797        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3798        let handler = Arc::new(Mutex::new(Vec::new()));
3799        let handler_clone = Arc::clone(&handler);
3800
3801        let mut aggregator = RenkoBarAggregator::new(
3802            bar_type,
3803            instrument.price_precision(),
3804            instrument.size_precision(),
3805            instrument.price_increment(),
3806            move |bar: Bar| {
3807                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3808                handler_guard.push(bar);
3809            },
3810        );
3811
3812        // Sequential updates creating multiple bars
3813        aggregator.update(
3814            Price::from("1.00000"),
3815            Quantity::from(1),
3816            UnixNanos::from(1000),
3817        );
3818        aggregator.update(
3819            Price::from("1.00010"),
3820            Quantity::from(1),
3821            UnixNanos::from(2000),
3822        ); // First brick
3823        aggregator.update(
3824            Price::from("1.00020"),
3825            Quantity::from(1),
3826            UnixNanos::from(3000),
3827        ); // Second brick
3828        aggregator.update(
3829            Price::from("1.00025"),
3830            Quantity::from(1),
3831            UnixNanos::from(4000),
3832        ); // Partial third brick
3833        aggregator.update(
3834            Price::from("1.00030"),
3835            Quantity::from(1),
3836            UnixNanos::from(5000),
3837        ); // Complete third brick
3838
3839        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3840        assert_eq!(handler_guard.len(), 3);
3841
3842        let bar1 = &handler_guard[0];
3843        assert_eq!(bar1.open, Price::from("1.00000"));
3844        assert_eq!(bar1.close, Price::from("1.00010"));
3845
3846        let bar2 = &handler_guard[1];
3847        assert_eq!(bar2.open, Price::from("1.00010"));
3848        assert_eq!(bar2.close, Price::from("1.00020"));
3849
3850        let bar3 = &handler_guard[2];
3851        assert_eq!(bar3.open, Price::from("1.00020"));
3852        assert_eq!(bar3.close, Price::from("1.00030"));
3853    }
3854
3855    #[rstest]
3856    fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3857        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3858        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3859        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3860        let handler = Arc::new(Mutex::new(Vec::new()));
3861        let handler_clone = Arc::clone(&handler);
3862
3863        let mut aggregator = RenkoBarAggregator::new(
3864            bar_type,
3865            instrument.price_precision(),
3866            instrument.size_precision(),
3867            instrument.price_increment(),
3868            move |bar: Bar| {
3869                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3870                handler_guard.push(bar);
3871            },
3872        );
3873
3874        // Mixed direction movement: up then down
3875        aggregator.update(
3876            Price::from("1.00000"),
3877            Quantity::from(1),
3878            UnixNanos::from(1000),
3879        );
3880        aggregator.update(
3881            Price::from("1.00010"),
3882            Quantity::from(1),
3883            UnixNanos::from(2000),
3884        ); // Up brick
3885        aggregator.update(
3886            Price::from("0.99990"),
3887            Quantity::from(1),
3888            UnixNanos::from(3000),
3889        ); // Down 2 bricks (20 pips)
3890
3891        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3892        assert_eq!(handler_guard.len(), 3);
3893
3894        let bar1 = &handler_guard[0]; // Up brick
3895        assert_eq!(bar1.open, Price::from("1.00000"));
3896        assert_eq!(bar1.high, Price::from("1.00010"));
3897        assert_eq!(bar1.low, Price::from("1.00000"));
3898        assert_eq!(bar1.close, Price::from("1.00010"));
3899
3900        let bar2 = &handler_guard[1]; // First down brick
3901        assert_eq!(bar2.open, Price::from("1.00010"));
3902        assert_eq!(bar2.high, Price::from("1.00010"));
3903        assert_eq!(bar2.low, Price::from("1.00000"));
3904        assert_eq!(bar2.close, Price::from("1.00000"));
3905
3906        let bar3 = &handler_guard[2]; // Second down brick
3907        assert_eq!(bar3.open, Price::from("1.00000"));
3908        assert_eq!(bar3.high, Price::from("1.00000"));
3909        assert_eq!(bar3.low, Price::from("0.99990"));
3910        assert_eq!(bar3.close, Price::from("0.99990"));
3911    }
3912
3913    #[rstest]
3914    fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3915        let instrument = InstrumentAny::Equity(equity_aapl);
3916        let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3917        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3918        let handler = Arc::new(Mutex::new(Vec::new()));
3919        let handler_clone = Arc::clone(&handler);
3920
3921        let mut aggregator = TickImbalanceBarAggregator::new(
3922            bar_type,
3923            instrument.price_precision(),
3924            instrument.size_precision(),
3925            move |bar: Bar| {
3926                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3927                handler_guard.push(bar);
3928            },
3929        );
3930
3931        let buy = TradeTick {
3932            aggressor_side: AggressorSide::Buyer,
3933            ..TradeTick::default()
3934        };
3935        let sell = TradeTick {
3936            aggressor_side: AggressorSide::Seller,
3937            ..TradeTick::default()
3938        };
3939
3940        aggregator.handle_trade(buy);
3941        aggregator.handle_trade(sell);
3942        aggregator.handle_trade(buy);
3943
3944        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3945        assert_eq!(handler_guard.len(), 0);
3946    }
3947
3948    #[rstest]
3949    fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3950        let instrument = InstrumentAny::Equity(equity_aapl);
3951        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3952        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3953        let handler = Arc::new(Mutex::new(Vec::new()));
3954        let handler_clone = Arc::clone(&handler);
3955
3956        let mut aggregator = TickImbalanceBarAggregator::new(
3957            bar_type,
3958            instrument.price_precision(),
3959            instrument.size_precision(),
3960            move |bar: Bar| {
3961                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3962                handler_guard.push(bar);
3963            },
3964        );
3965
3966        let buy = TradeTick {
3967            aggressor_side: AggressorSide::Buyer,
3968            ..TradeTick::default()
3969        };
3970        let no_aggressor = TradeTick {
3971            aggressor_side: AggressorSide::NoAggressor,
3972            ..TradeTick::default()
3973        };
3974
3975        aggregator.handle_trade(buy);
3976        aggregator.handle_trade(no_aggressor);
3977        aggregator.handle_trade(buy);
3978
3979        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3980        assert_eq!(handler_guard.len(), 1);
3981    }
3982
3983    #[rstest]
3984    fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3985        let instrument = InstrumentAny::Equity(equity_aapl);
3986        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3987        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3988        let handler = Arc::new(Mutex::new(Vec::new()));
3989        let handler_clone = Arc::clone(&handler);
3990
3991        let mut aggregator = TickRunsBarAggregator::new(
3992            bar_type,
3993            instrument.price_precision(),
3994            instrument.size_precision(),
3995            move |bar: Bar| {
3996                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3997                handler_guard.push(bar);
3998            },
3999        );
4000
4001        let buy = TradeTick {
4002            aggressor_side: AggressorSide::Buyer,
4003            ..TradeTick::default()
4004        };
4005        let sell = TradeTick {
4006            aggressor_side: AggressorSide::Seller,
4007            ..TradeTick::default()
4008        };
4009
4010        aggregator.handle_trade(buy);
4011        aggregator.handle_trade(buy);
4012        aggregator.handle_trade(sell);
4013        aggregator.handle_trade(sell);
4014
4015        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4016        assert_eq!(handler_guard.len(), 2);
4017    }
4018
4019    #[rstest]
4020    fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4021        let instrument = InstrumentAny::Equity(equity_aapl);
4022        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4023        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4024        let handler = Arc::new(Mutex::new(Vec::new()));
4025        let handler_clone = Arc::clone(&handler);
4026
4027        let mut aggregator = VolumeImbalanceBarAggregator::new(
4028            bar_type,
4029            instrument.price_precision(),
4030            instrument.size_precision(),
4031            move |bar: Bar| {
4032                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4033                handler_guard.push(bar);
4034            },
4035        );
4036
4037        let large_trade = TradeTick {
4038            size: Quantity::from(25),
4039            aggressor_side: AggressorSide::Buyer,
4040            ..TradeTick::default()
4041        };
4042
4043        aggregator.handle_trade(large_trade);
4044
4045        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4046        assert_eq!(handler_guard.len(), 2);
4047    }
4048
4049    #[rstest]
4050    fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4051        equity_aapl: Equity,
4052    ) {
4053        let instrument = InstrumentAny::Equity(equity_aapl);
4054        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4055        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4056        let handler = Arc::new(Mutex::new(Vec::new()));
4057        let handler_clone = Arc::clone(&handler);
4058
4059        let mut aggregator = VolumeImbalanceBarAggregator::new(
4060            bar_type,
4061            instrument.price_precision(),
4062            instrument.size_precision(),
4063            move |bar: Bar| {
4064                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4065                handler_guard.push(bar);
4066            },
4067        );
4068
4069        let buy = TradeTick {
4070            size: Quantity::from(5),
4071            aggressor_side: AggressorSide::Buyer,
4072            ..TradeTick::default()
4073        };
4074        let no_aggressor = TradeTick {
4075            size: Quantity::from(3),
4076            aggressor_side: AggressorSide::NoAggressor,
4077            ..TradeTick::default()
4078        };
4079
4080        aggregator.handle_trade(buy);
4081        aggregator.handle_trade(no_aggressor);
4082        aggregator.handle_trade(buy);
4083
4084        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4085        assert_eq!(handler_guard.len(), 1);
4086    }
4087
4088    #[rstest]
4089    fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4090        let instrument = InstrumentAny::Equity(equity_aapl);
4091        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4092        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4093        let handler = Arc::new(Mutex::new(Vec::new()));
4094        let handler_clone = Arc::clone(&handler);
4095
4096        let mut aggregator = VolumeRunsBarAggregator::new(
4097            bar_type,
4098            instrument.price_precision(),
4099            instrument.size_precision(),
4100            move |bar: Bar| {
4101                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4102                handler_guard.push(bar);
4103            },
4104        );
4105
4106        let large_trade = TradeTick {
4107            size: Quantity::from(25),
4108            aggressor_side: AggressorSide::Buyer,
4109            ..TradeTick::default()
4110        };
4111
4112        aggregator.handle_trade(large_trade);
4113
4114        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4115        assert_eq!(handler_guard.len(), 2);
4116    }
4117
4118    #[rstest]
4119    fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4120        let instrument = InstrumentAny::Equity(equity_aapl);
4121        let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4122        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4123        let handler = Arc::new(Mutex::new(Vec::new()));
4124        let handler_clone = Arc::clone(&handler);
4125
4126        let mut aggregator = ValueRunsBarAggregator::new(
4127            bar_type,
4128            instrument.price_precision(),
4129            instrument.size_precision(),
4130            move |bar: Bar| {
4131                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4132                handler_guard.push(bar);
4133            },
4134        );
4135
4136        let large_trade = TradeTick {
4137            price: Price::from("5.00"),
4138            size: Quantity::from(25),
4139            aggressor_side: AggressorSide::Buyer,
4140            ..TradeTick::default()
4141        };
4142
4143        aggregator.handle_trade(large_trade);
4144
4145        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4146        assert_eq!(handler_guard.len(), 2);
4147    }
4148
4149    #[rstest]
4150    fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4151        let instrument = InstrumentAny::Equity(equity_aapl);
4152        let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
4153        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4154        let handler = Arc::new(Mutex::new(Vec::new()));
4155        let handler_clone = Arc::clone(&handler);
4156
4157        let mut aggregator = ValueBarAggregator::new(
4158            bar_type,
4159            instrument.price_precision(),
4160            instrument.size_precision(),
4161            move |bar: Bar| {
4162                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4163                handler_guard.push(bar);
4164            },
4165        );
4166
4167        // price=1000, size=3, value=3000, step=100 → size_chunk=0.1 rounds to 0 at precision 0
4168        aggregator.update(
4169            Price::from("1000.00"),
4170            Quantity::from(3),
4171            UnixNanos::default(),
4172        );
4173
4174        // 3 bars (one per min-size unit), not 30 zero-volume bars
4175        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4176        assert_eq!(handler_guard.len(), 3);
4177        for bar in handler_guard.iter() {
4178            assert_eq!(bar.volume, Quantity::from(1));
4179        }
4180    }
4181
4182    #[rstest]
4183    fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4184        let instrument = InstrumentAny::Equity(equity_aapl);
4185        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4186        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4187        let handler = Arc::new(Mutex::new(Vec::new()));
4188        let handler_clone = Arc::clone(&handler);
4189
4190        let mut aggregator = ValueImbalanceBarAggregator::new(
4191            bar_type,
4192            instrument.price_precision(),
4193            instrument.size_precision(),
4194            move |bar: Bar| {
4195                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4196                handler_guard.push(bar);
4197            },
4198        );
4199
4200        let trade = TradeTick {
4201            price: Price::from("1000.00"),
4202            size: Quantity::from(3),
4203            aggressor_side: AggressorSide::Buyer,
4204            instrument_id: instrument.id(),
4205            ..TradeTick::default()
4206        };
4207
4208        aggregator.handle_trade(trade);
4209
4210        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4211        assert_eq!(handler_guard.len(), 3);
4212        for bar in handler_guard.iter() {
4213            assert_eq!(bar.volume, Quantity::from(1));
4214        }
4215    }
4216
4217    #[rstest]
4218    fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
4219        let instrument = InstrumentAny::Equity(equity_aapl);
4220        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4221        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4222        let handler = Arc::new(Mutex::new(Vec::new()));
4223        let handler_clone = Arc::clone(&handler);
4224
4225        let mut aggregator = ValueImbalanceBarAggregator::new(
4226            bar_type,
4227            instrument.price_precision(),
4228            instrument.size_precision(),
4229            move |bar: Bar| {
4230                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4231                handler_guard.push(bar);
4232            },
4233        );
4234
4235        // Build seller imbalance of -50 (below step=100, no bar yet)
4236        let sell_tick = TradeTick {
4237            price: Price::from("10.00"),
4238            size: Quantity::from(5),
4239            aggressor_side: AggressorSide::Seller,
4240            instrument_id: instrument.id(),
4241            ..TradeTick::default()
4242        };
4243
4244        // Opposite-side buyer: flatten amount 50/1000=0.05 < min_size (1),
4245        // clamp overshoots imbalance from -50 to +950, crossing threshold
4246        let buy_tick = TradeTick {
4247            price: Price::from("1000.00"),
4248            size: Quantity::from(1),
4249            aggressor_side: AggressorSide::Buyer,
4250            instrument_id: instrument.id(),
4251            ts_init: UnixNanos::from(1),
4252            ts_event: UnixNanos::from(1),
4253            ..TradeTick::default()
4254        };
4255
4256        aggregator.handle_trade(sell_tick);
4257        aggregator.handle_trade(buy_tick);
4258
4259        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4260        assert_eq!(handler_guard.len(), 1);
4261        assert_eq!(handler_guard[0].volume, Quantity::from(6));
4262    }
4263
4264    #[rstest]
4265    fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4266        let instrument = InstrumentAny::Equity(equity_aapl);
4267        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4268        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4269        let handler = Arc::new(Mutex::new(Vec::new()));
4270        let handler_clone = Arc::clone(&handler);
4271
4272        let mut aggregator = ValueRunsBarAggregator::new(
4273            bar_type,
4274            instrument.price_precision(),
4275            instrument.size_precision(),
4276            move |bar: Bar| {
4277                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4278                handler_guard.push(bar);
4279            },
4280        );
4281
4282        let trade = TradeTick {
4283            price: Price::from("1000.00"),
4284            size: Quantity::from(3),
4285            aggressor_side: AggressorSide::Buyer,
4286            instrument_id: instrument.id(),
4287            ..TradeTick::default()
4288        };
4289
4290        aggregator.handle_trade(trade);
4291
4292        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4293        assert_eq!(handler_guard.len(), 3);
4294        for bar in handler_guard.iter() {
4295            assert_eq!(bar.volume, Quantity::from(1));
4296        }
4297    }
4298
4299    #[rstest]
4300    #[case(1000_u64)]
4301    #[case(1500_u64)]
4302    fn test_volume_imbalance_bar_aggregator_large_step_no_overflow(
4303        equity_aapl: Equity,
4304        #[case] step: u64,
4305    ) {
4306        let instrument = InstrumentAny::Equity(equity_aapl);
4307        let bar_spec = BarSpecification::new(
4308            step as usize,
4309            BarAggregation::VolumeImbalance,
4310            PriceType::Last,
4311        );
4312        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4313        let handler = Arc::new(Mutex::new(Vec::new()));
4314        let handler_clone = Arc::clone(&handler);
4315
4316        let mut aggregator = VolumeImbalanceBarAggregator::new(
4317            bar_type,
4318            instrument.price_precision(),
4319            instrument.size_precision(),
4320            move |bar: Bar| {
4321                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4322                handler_guard.push(bar);
4323            },
4324        );
4325
4326        let trade = TradeTick {
4327            size: Quantity::from(step * 2),
4328            aggressor_side: AggressorSide::Buyer,
4329            ..TradeTick::default()
4330        };
4331
4332        aggregator.handle_trade(trade);
4333
4334        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4335        assert_eq!(handler_guard.len(), 2);
4336        for bar in handler_guard.iter() {
4337            assert_eq!(bar.volume.as_f64(), step as f64);
4338        }
4339    }
4340
4341    #[rstest]
4342    fn test_volume_imbalance_bar_aggregator_different_large_steps_produce_different_bar_counts(
4343        equity_aapl: Equity,
4344    ) {
4345        let instrument = InstrumentAny::Equity(equity_aapl);
4346        let total_volume = 3000_u64;
4347        let mut results = Vec::new();
4348
4349        for step in [1000_usize, 1500] {
4350            let bar_spec =
4351                BarSpecification::new(step, BarAggregation::VolumeImbalance, PriceType::Last);
4352            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4353            let handler = Arc::new(Mutex::new(Vec::new()));
4354            let handler_clone = Arc::clone(&handler);
4355
4356            let mut aggregator = VolumeImbalanceBarAggregator::new(
4357                bar_type,
4358                instrument.price_precision(),
4359                instrument.size_precision(),
4360                move |bar: Bar| {
4361                    let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4362                    handler_guard.push(bar);
4363                },
4364            );
4365
4366            let trade = TradeTick {
4367                size: Quantity::from(total_volume),
4368                aggressor_side: AggressorSide::Buyer,
4369                ..TradeTick::default()
4370            };
4371
4372            aggregator.handle_trade(trade);
4373
4374            let handler_guard = handler.lock().expect(MUTEX_POISONED);
4375            results.push(handler_guard.len());
4376        }
4377
4378        assert_eq!(results[0], 3); // 3000 / 1000
4379        assert_eq!(results[1], 2); // 3000 / 1500
4380        assert_ne!(results[0], results[1]);
4381    }
4382
4383    #[rstest]
4384    #[case(1000_u64)]
4385    #[case(1500_u64)]
4386    fn test_volume_runs_bar_aggregator_large_step_no_overflow(
4387        equity_aapl: Equity,
4388        #[case] step: u64,
4389    ) {
4390        let instrument = InstrumentAny::Equity(equity_aapl);
4391        let bar_spec =
4392            BarSpecification::new(step as usize, BarAggregation::VolumeRuns, PriceType::Last);
4393        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4394        let handler = Arc::new(Mutex::new(Vec::new()));
4395        let handler_clone = Arc::clone(&handler);
4396
4397        let mut aggregator = VolumeRunsBarAggregator::new(
4398            bar_type,
4399            instrument.price_precision(),
4400            instrument.size_precision(),
4401            move |bar: Bar| {
4402                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4403                handler_guard.push(bar);
4404            },
4405        );
4406
4407        let trade = TradeTick {
4408            size: Quantity::from(step * 2),
4409            aggressor_side: AggressorSide::Buyer,
4410            ..TradeTick::default()
4411        };
4412
4413        aggregator.handle_trade(trade);
4414
4415        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4416        assert_eq!(handler_guard.len(), 2);
4417        for bar in handler_guard.iter() {
4418            assert_eq!(bar.volume.as_f64(), step as f64);
4419        }
4420    }
4421
4422    #[rstest]
4423    fn test_volume_runs_bar_aggregator_different_large_steps_produce_different_bar_counts(
4424        equity_aapl: Equity,
4425    ) {
4426        let instrument = InstrumentAny::Equity(equity_aapl);
4427        let total_volume = 3000_u64;
4428        let mut results = Vec::new();
4429
4430        for step in [1000_usize, 1500] {
4431            let bar_spec = BarSpecification::new(step, BarAggregation::VolumeRuns, PriceType::Last);
4432            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4433            let handler = Arc::new(Mutex::new(Vec::new()));
4434            let handler_clone = Arc::clone(&handler);
4435
4436            let mut aggregator = VolumeRunsBarAggregator::new(
4437                bar_type,
4438                instrument.price_precision(),
4439                instrument.size_precision(),
4440                move |bar: Bar| {
4441                    let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4442                    handler_guard.push(bar);
4443                },
4444            );
4445
4446            let trade = TradeTick {
4447                size: Quantity::from(total_volume),
4448                aggressor_side: AggressorSide::Buyer,
4449                ..TradeTick::default()
4450            };
4451
4452            aggregator.handle_trade(trade);
4453
4454            let handler_guard = handler.lock().expect(MUTEX_POISONED);
4455            results.push(handler_guard.len());
4456        }
4457
4458        assert_eq!(results[0], 3); // 3000 / 1000
4459        assert_eq!(results[1], 2); // 3000 / 1500
4460        assert_ne!(results[0], results[1]);
4461    }
4462}