Skip to main content

nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{
22    any::Any,
23    cell::RefCell,
24    fmt::Debug,
25    ops::Add,
26    rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31    clock::{Clock, TestClock},
32    timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35    UnixNanos,
36    correctness::{self, FAILED},
37    datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40    data::{
41        QuoteTick, TradeTick,
42        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43    },
44    enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45    types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
48/// Type alias for bar handler to reduce type complexity.
49type BarHandler = Box<dyn FnMut(Bar)>;
50
51/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
52///
53/// Implementors receive updates and produce completed bars via handlers.
54pub trait BarAggregator: Any + Debug {
55    /// The [`BarType`] to be aggregated.
56    fn bar_type(&self) -> BarType;
57    /// If the aggregator is running and will receive data from the message bus.
58    fn is_running(&self) -> bool;
59    /// Sets the running state of the aggregator (receiving updates when `true`).
60    fn set_is_running(&mut self, value: bool);
61    /// Updates the aggregator  with the given price and size.
62    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63    /// Updates the aggregator with the given quote.
64    fn handle_quote(&mut self, quote: QuoteTick) {
65        let spec = self.bar_type().spec();
66        self.update(
67            quote.extract_price(spec.price_type),
68            quote.extract_size(spec.price_type),
69            quote.ts_init,
70        );
71    }
72    /// Updates the aggregator with the given trade.
73    fn handle_trade(&mut self, trade: TradeTick) {
74        self.update(trade.price, trade.size, trade.ts_init);
75    }
76    /// Updates the aggregator with the given bar.
77    fn handle_bar(&mut self, bar: Bar) {
78        self.update_bar(bar, bar.volume, bar.ts_init);
79    }
80    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
82    fn stop(&mut self) {}
83    /// Sets historical mode (default implementation does nothing, TimeBarAggregator overrides)
84    fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85    /// Sets historical events (default implementation does nothing, TimeBarAggregator overrides)
86    fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87    /// Sets clock for time bar aggregators (default implementation does nothing, TimeBarAggregator overrides)
88    fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89    /// Builds a bar from a time event (default implementation does nothing, TimeBarAggregator overrides)
90    fn build_bar(&mut self, _event: TimeEvent) {}
91    /// Starts the timer for time bar aggregators.
92    /// Default implementation does nothing, TimeBarAggregator overrides.
93    /// Takes an optional Rc to create weak reference internally.
94    fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95    /// Sets the weak reference to the aggregator wrapper (for historical mode).
96    /// Default implementation does nothing, TimeBarAggregator overrides.
97    fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101    /// Returns a reference to this aggregator as `Any` for downcasting.
102    pub fn as_any(&self) -> &dyn Any {
103        self
104    }
105    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
106    pub fn as_any_mut(&mut self) -> &mut dyn Any {
107        self
108    }
109}
110
111/// Provides a generic bar builder for aggregation.
112#[derive(Debug)]
113pub struct BarBuilder {
114    bar_type: BarType,
115    price_precision: u8,
116    size_precision: u8,
117    initialized: bool,
118    ts_last: UnixNanos,
119    count: usize,
120    last_close: Option<Price>,
121    open: Option<Price>,
122    high: Option<Price>,
123    low: Option<Price>,
124    close: Option<Price>,
125    volume: Quantity,
126}
127
128impl BarBuilder {
129    /// Creates a new [`BarBuilder`] instance.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
134    #[must_use]
135    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
136        correctness::check_equal(
137            &bar_type.aggregation_source(),
138            &AggregationSource::Internal,
139            "bar_type.aggregation_source",
140            "AggregationSource::Internal",
141        )
142        .expect(FAILED);
143
144        Self {
145            bar_type,
146            price_precision,
147            size_precision,
148            initialized: false,
149            ts_last: UnixNanos::default(),
150            count: 0,
151            last_close: None,
152            open: None,
153            high: None,
154            low: None,
155            close: None,
156            volume: Quantity::zero(size_precision),
157        }
158    }
159
160    /// Updates the builder state with the given price, size, and init timestamp.
161    ///
162    /// # Panics
163    ///
164    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
165    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
166        if ts_init < self.ts_last {
167            return; // Not applicable
168        }
169
170        if self.open.is_none() {
171            self.open = Some(price);
172            self.high = Some(price);
173            self.low = Some(price);
174            self.initialized = true;
175        } else {
176            if price > self.high.unwrap() {
177                self.high = Some(price);
178            }
179            if price < self.low.unwrap() {
180                self.low = Some(price);
181            }
182        }
183
184        self.close = Some(price);
185        self.volume = self.volume.add(size);
186        self.count += 1;
187        self.ts_last = ts_init;
188    }
189
190    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
191    ///
192    /// # Panics
193    ///
194    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
195    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
196        if ts_init < self.ts_last {
197            return; // Not applicable
198        }
199
200        if self.open.is_none() {
201            self.open = Some(bar.open);
202            self.high = Some(bar.high);
203            self.low = Some(bar.low);
204            self.initialized = true;
205        } else {
206            if bar.high > self.high.unwrap() {
207                self.high = Some(bar.high);
208            }
209            if bar.low < self.low.unwrap() {
210                self.low = Some(bar.low);
211            }
212        }
213
214        self.close = Some(bar.close);
215        self.volume = self.volume.add(volume);
216        self.count += 1;
217        self.ts_last = ts_init;
218    }
219
220    /// Reset the bar builder.
221    ///
222    /// All stateful fields are reset to their initial value.
223    pub fn reset(&mut self) {
224        self.open = None;
225        self.high = None;
226        self.low = None;
227        self.volume = Quantity::zero(self.size_precision);
228        self.count = 0;
229    }
230
231    /// Return the aggregated bar and reset.
232    pub fn build_now(&mut self) -> Bar {
233        self.build(self.ts_last, self.ts_last)
234    }
235
236    /// Returns the aggregated bar for the given timestamps, then resets the builder.
237    ///
238    /// # Panics
239    ///
240    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
241    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
242        if self.open.is_none() {
243            self.open = self.last_close;
244            self.high = self.last_close;
245            self.low = self.last_close;
246            self.close = self.last_close;
247        }
248
249        if let (Some(close), Some(low)) = (self.close, self.low)
250            && close < low
251        {
252            self.low = Some(close);
253        }
254
255        if let (Some(close), Some(high)) = (self.close, self.high)
256            && close > high
257        {
258            self.high = Some(close);
259        }
260
261        // SAFETY: The open was checked, so we can assume all prices are Some
262        let bar = Bar::new(
263            self.bar_type,
264            self.open.unwrap(),
265            self.high.unwrap(),
266            self.low.unwrap(),
267            self.close.unwrap(),
268            self.volume,
269            ts_event,
270            ts_init,
271        );
272
273        self.last_close = self.close;
274        self.reset();
275        bar
276    }
277}
278
279/// Provides a means of aggregating specified bar types and sending to a registered handler.
280pub struct BarAggregatorCore {
281    bar_type: BarType,
282    builder: BarBuilder,
283    handler: BarHandler,
284    is_running: bool,
285}
286
287impl Debug for BarAggregatorCore {
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        f.debug_struct(stringify!(BarAggregatorCore))
290            .field("bar_type", &self.bar_type)
291            .field("builder", &self.builder)
292            .field("is_running", &self.is_running)
293            .finish()
294    }
295}
296
297impl BarAggregatorCore {
298    /// Creates a new [`BarAggregatorCore`] instance.
299    ///
300    /// # Panics
301    ///
302    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
303    pub fn new<H: FnMut(Bar) + 'static>(
304        bar_type: BarType,
305        price_precision: u8,
306        size_precision: u8,
307        handler: H,
308    ) -> Self {
309        Self {
310            bar_type,
311            builder: BarBuilder::new(bar_type, price_precision, size_precision),
312            handler: Box::new(handler),
313            is_running: false,
314        }
315    }
316
317    /// Sets the running state of the aggregator (receives updates when `true`).
318    pub const fn set_is_running(&mut self, value: bool) {
319        self.is_running = value;
320    }
321    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
322        self.builder.update(price, size, ts_init);
323    }
324
325    fn build_now_and_send(&mut self) {
326        let bar = self.builder.build_now();
327        (self.handler)(bar);
328    }
329
330    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
331        let bar = self.builder.build(ts_event, ts_init);
332        (self.handler)(bar);
333    }
334}
335
336/// Provides a means of building tick bars aggregated from quote and trades.
337///
338/// When received tick count reaches the step threshold of the bar
339/// specification, then a bar is created and sent to the handler.
340pub struct TickBarAggregator {
341    core: BarAggregatorCore,
342}
343
344impl Debug for TickBarAggregator {
345    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346        f.debug_struct(stringify!(TickBarAggregator))
347            .field("core", &self.core)
348            .finish()
349    }
350}
351
352impl TickBarAggregator {
353    /// Creates a new [`TickBarAggregator`] instance.
354    ///
355    /// # Panics
356    ///
357    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
358    pub fn new<H: FnMut(Bar) + 'static>(
359        bar_type: BarType,
360        price_precision: u8,
361        size_precision: u8,
362        handler: H,
363    ) -> Self {
364        Self {
365            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
366        }
367    }
368}
369
370impl BarAggregator for TickBarAggregator {
371    fn bar_type(&self) -> BarType {
372        self.core.bar_type
373    }
374
375    fn is_running(&self) -> bool {
376        self.core.is_running
377    }
378
379    fn set_is_running(&mut self, value: bool) {
380        self.core.set_is_running(value);
381    }
382
383    /// Apply the given update to the aggregator.
384    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
385        self.core.apply_update(price, size, ts_init);
386        let spec = self.core.bar_type.spec();
387
388        if self.core.builder.count >= spec.step.get() {
389            self.core.build_now_and_send();
390        }
391    }
392
393    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
394        self.core.builder.update_bar(bar, volume, ts_init);
395        let spec = self.core.bar_type.spec();
396
397        if self.core.builder.count >= spec.step.get() {
398            self.core.build_now_and_send();
399        }
400    }
401}
402
403/// Aggregates bars based on tick buy/sell imbalance.
404///
405/// Increments imbalance by +1 for buyer-aggressed trades and -1 for seller-aggressed trades.
406/// Emits a bar when the absolute imbalance reaches the step threshold.
407pub struct TickImbalanceBarAggregator {
408    core: BarAggregatorCore,
409    imbalance: isize,
410}
411
412impl Debug for TickImbalanceBarAggregator {
413    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
414        f.debug_struct(stringify!(TickImbalanceBarAggregator))
415            .field("core", &self.core)
416            .field("imbalance", &self.imbalance)
417            .finish()
418    }
419}
420
421impl TickImbalanceBarAggregator {
422    /// Creates a new [`TickImbalanceBarAggregator`] instance.
423    ///
424    /// # Panics
425    ///
426    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
427    pub fn new<H: FnMut(Bar) + 'static>(
428        bar_type: BarType,
429        price_precision: u8,
430        size_precision: u8,
431        handler: H,
432    ) -> Self {
433        Self {
434            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
435            imbalance: 0,
436        }
437    }
438}
439
440impl BarAggregator for TickImbalanceBarAggregator {
441    fn bar_type(&self) -> BarType {
442        self.core.bar_type
443    }
444
445    fn is_running(&self) -> bool {
446        self.core.is_running
447    }
448
449    fn set_is_running(&mut self, value: bool) {
450        self.core.set_is_running(value);
451    }
452
453    /// Apply the given update to the aggregator.
454    ///
455    /// Note: side-aware logic lives in `handle_trade`. This method is used for
456    /// quote/bar updates where no aggressor side is available.
457    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
458        self.core.apply_update(price, size, ts_init);
459    }
460
461    fn handle_trade(&mut self, trade: TradeTick) {
462        self.core
463            .apply_update(trade.price, trade.size, trade.ts_init);
464
465        let delta = match trade.aggressor_side {
466            AggressorSide::Buyer => 1,
467            AggressorSide::Seller => -1,
468            AggressorSide::NoAggressor => 0,
469        };
470
471        if delta == 0 {
472            return;
473        }
474
475        self.imbalance += delta;
476        let threshold = self.core.bar_type.spec().step.get();
477        if self.imbalance.unsigned_abs() >= threshold {
478            self.core.build_now_and_send();
479            self.imbalance = 0;
480        }
481    }
482
483    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
484        self.core.builder.update_bar(bar, volume, ts_init);
485    }
486}
487
488/// Aggregates bars based on consecutive buy/sell tick runs.
489pub struct TickRunsBarAggregator {
490    core: BarAggregatorCore,
491    current_run_side: Option<AggressorSide>,
492    run_count: usize,
493}
494
495impl Debug for TickRunsBarAggregator {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        f.debug_struct(stringify!(TickRunsBarAggregator))
498            .field("core", &self.core)
499            .field("current_run_side", &self.current_run_side)
500            .field("run_count", &self.run_count)
501            .finish()
502    }
503}
504
505impl TickRunsBarAggregator {
506    /// Creates a new [`TickRunsBarAggregator`] instance.
507    ///
508    /// # Panics
509    ///
510    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
511    pub fn new<H: FnMut(Bar) + 'static>(
512        bar_type: BarType,
513        price_precision: u8,
514        size_precision: u8,
515        handler: H,
516    ) -> Self {
517        Self {
518            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
519            current_run_side: None,
520            run_count: 0,
521        }
522    }
523}
524
525impl BarAggregator for TickRunsBarAggregator {
526    fn bar_type(&self) -> BarType {
527        self.core.bar_type
528    }
529
530    fn is_running(&self) -> bool {
531        self.core.is_running
532    }
533
534    fn set_is_running(&mut self, value: bool) {
535        self.core.set_is_running(value);
536    }
537
538    /// Apply the given update to the aggregator.
539    ///
540    /// Note: side-aware logic lives in `handle_trade`. This method is used for
541    /// quote/bar updates where no aggressor side is available.
542    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
543        self.core.apply_update(price, size, ts_init);
544    }
545
546    fn handle_trade(&mut self, trade: TradeTick) {
547        let side = match trade.aggressor_side {
548            AggressorSide::Buyer => Some(AggressorSide::Buyer),
549            AggressorSide::Seller => Some(AggressorSide::Seller),
550            AggressorSide::NoAggressor => None,
551        };
552
553        if let Some(side) = side {
554            if self.current_run_side != Some(side) {
555                self.current_run_side = Some(side);
556                self.run_count = 0;
557                self.core.builder.reset();
558            }
559
560            self.core
561                .apply_update(trade.price, trade.size, trade.ts_init);
562            self.run_count += 1;
563
564            let threshold = self.core.bar_type.spec().step.get();
565            if self.run_count >= threshold {
566                self.core.build_now_and_send();
567                self.run_count = 0;
568                self.current_run_side = None;
569            }
570        } else {
571            self.core
572                .apply_update(trade.price, trade.size, trade.ts_init);
573        }
574    }
575
576    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
577        self.core.builder.update_bar(bar, volume, ts_init);
578    }
579}
580
581/// Provides a means of building volume bars aggregated from quote and trades.
582pub struct VolumeBarAggregator {
583    core: BarAggregatorCore,
584}
585
586impl Debug for VolumeBarAggregator {
587    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588        f.debug_struct(stringify!(VolumeBarAggregator))
589            .field("core", &self.core)
590            .finish()
591    }
592}
593
594impl VolumeBarAggregator {
595    /// Creates a new [`VolumeBarAggregator`] instance.
596    ///
597    /// # Panics
598    ///
599    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
600    pub fn new<H: FnMut(Bar) + 'static>(
601        bar_type: BarType,
602        price_precision: u8,
603        size_precision: u8,
604        handler: H,
605    ) -> Self {
606        Self {
607            core: BarAggregatorCore::new(
608                bar_type.standard(),
609                price_precision,
610                size_precision,
611                handler,
612            ),
613        }
614    }
615}
616
617impl BarAggregator for VolumeBarAggregator {
618    fn bar_type(&self) -> BarType {
619        self.core.bar_type
620    }
621
622    fn is_running(&self) -> bool {
623        self.core.is_running
624    }
625
626    fn set_is_running(&mut self, value: bool) {
627        self.core.set_is_running(value);
628    }
629
630    /// Apply the given update to the aggregator.
631    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
632        let mut raw_size_update = size.raw;
633        let spec = self.core.bar_type.spec();
634        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
635
636        while raw_size_update > 0 {
637            if self.core.builder.volume.raw + raw_size_update < raw_step {
638                self.core.apply_update(
639                    price,
640                    Quantity::from_raw(raw_size_update, size.precision),
641                    ts_init,
642                );
643                break;
644            }
645
646            let raw_size_diff = raw_step - self.core.builder.volume.raw;
647            self.core.apply_update(
648                price,
649                Quantity::from_raw(raw_size_diff, size.precision),
650                ts_init,
651            );
652
653            self.core.build_now_and_send();
654            raw_size_update -= raw_size_diff;
655        }
656    }
657
658    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
659        let mut raw_volume_update = volume.raw;
660        let spec = self.core.bar_type.spec();
661        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
662
663        while raw_volume_update > 0 {
664            if self.core.builder.volume.raw + raw_volume_update < raw_step {
665                self.core.builder.update_bar(
666                    bar,
667                    Quantity::from_raw(raw_volume_update, volume.precision),
668                    ts_init,
669                );
670                break;
671            }
672
673            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
674            self.core.builder.update_bar(
675                bar,
676                Quantity::from_raw(raw_volume_diff, volume.precision),
677                ts_init,
678            );
679
680            self.core.build_now_and_send();
681            raw_volume_update -= raw_volume_diff;
682        }
683    }
684}
685
686/// Aggregates bars based on buy/sell volume imbalance.
687pub struct VolumeImbalanceBarAggregator {
688    core: BarAggregatorCore,
689    imbalance_raw: i128,
690    raw_step: i128,
691}
692
693impl Debug for VolumeImbalanceBarAggregator {
694    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
695        f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
696            .field("core", &self.core)
697            .field("imbalance_raw", &self.imbalance_raw)
698            .field("raw_step", &self.raw_step)
699            .finish()
700    }
701}
702
703impl VolumeImbalanceBarAggregator {
704    /// Creates a new [`VolumeImbalanceBarAggregator`] instance.
705    ///
706    /// # Panics
707    ///
708    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
709    pub fn new<H: FnMut(Bar) + 'static>(
710        bar_type: BarType,
711        price_precision: u8,
712        size_precision: u8,
713        handler: H,
714    ) -> Self {
715        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
716        Self {
717            core: BarAggregatorCore::new(
718                bar_type.standard(),
719                price_precision,
720                size_precision,
721                handler,
722            ),
723            imbalance_raw: 0,
724            raw_step,
725        }
726    }
727}
728
729impl BarAggregator for VolumeImbalanceBarAggregator {
730    fn bar_type(&self) -> BarType {
731        self.core.bar_type
732    }
733
734    fn is_running(&self) -> bool {
735        self.core.is_running
736    }
737
738    fn set_is_running(&mut self, value: bool) {
739        self.core.set_is_running(value);
740    }
741
742    /// Apply the given update to the aggregator.
743    ///
744    /// Note: side-aware logic lives in `handle_trade`. This method is used for
745    /// quote/bar updates where no aggressor side is available.
746    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
747        self.core.apply_update(price, size, ts_init);
748    }
749
750    fn handle_trade(&mut self, trade: TradeTick) {
751        let side = match trade.aggressor_side {
752            AggressorSide::Buyer => 1,
753            AggressorSide::Seller => -1,
754            AggressorSide::NoAggressor => {
755                self.core
756                    .apply_update(trade.price, trade.size, trade.ts_init);
757                return;
758            }
759        };
760
761        let mut raw_remaining = trade.size.raw as i128;
762        while raw_remaining > 0 {
763            let imbalance_abs = self.imbalance_raw.abs();
764            let needed = (self.raw_step - imbalance_abs).max(1);
765            let raw_chunk = raw_remaining.min(needed);
766            let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
767
768            self.core
769                .apply_update(trade.price, qty_chunk, trade.ts_init);
770
771            self.imbalance_raw += side * raw_chunk;
772            raw_remaining -= raw_chunk;
773
774            if self.imbalance_raw.abs() >= self.raw_step {
775                self.core.build_now_and_send();
776                self.imbalance_raw = 0;
777            }
778        }
779    }
780
781    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
782        self.core.builder.update_bar(bar, volume, ts_init);
783    }
784}
785
786/// Aggregates bars based on consecutive buy/sell volume runs.
787pub struct VolumeRunsBarAggregator {
788    core: BarAggregatorCore,
789    current_run_side: Option<AggressorSide>,
790    run_volume_raw: QuantityRaw,
791    raw_step: QuantityRaw,
792}
793
794impl Debug for VolumeRunsBarAggregator {
795    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
796        f.debug_struct(stringify!(VolumeRunsBarAggregator))
797            .field("core", &self.core)
798            .field("current_run_side", &self.current_run_side)
799            .field("run_volume_raw", &self.run_volume_raw)
800            .field("raw_step", &self.raw_step)
801            .finish()
802    }
803}
804
805impl VolumeRunsBarAggregator {
806    /// Creates a new [`VolumeRunsBarAggregator`] instance.
807    ///
808    /// # Panics
809    ///
810    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
811    pub fn new<H: FnMut(Bar) + 'static>(
812        bar_type: BarType,
813        price_precision: u8,
814        size_precision: u8,
815        handler: H,
816    ) -> Self {
817        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
818        Self {
819            core: BarAggregatorCore::new(
820                bar_type.standard(),
821                price_precision,
822                size_precision,
823                handler,
824            ),
825            current_run_side: None,
826            run_volume_raw: 0,
827            raw_step,
828        }
829    }
830}
831
832impl BarAggregator for VolumeRunsBarAggregator {
833    fn bar_type(&self) -> BarType {
834        self.core.bar_type
835    }
836
837    fn is_running(&self) -> bool {
838        self.core.is_running
839    }
840
841    fn set_is_running(&mut self, value: bool) {
842        self.core.set_is_running(value);
843    }
844
845    /// Apply the given update to the aggregator.
846    ///
847    /// Note: side-aware logic lives in `handle_trade`. This method is used for
848    /// quote/bar updates where no aggressor side is available.
849    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
850        self.core.apply_update(price, size, ts_init);
851    }
852
853    fn handle_trade(&mut self, trade: TradeTick) {
854        let side = match trade.aggressor_side {
855            AggressorSide::Buyer => Some(AggressorSide::Buyer),
856            AggressorSide::Seller => Some(AggressorSide::Seller),
857            AggressorSide::NoAggressor => None,
858        };
859
860        let Some(side) = side else {
861            self.core
862                .apply_update(trade.price, trade.size, trade.ts_init);
863            return;
864        };
865
866        if self.current_run_side != Some(side) {
867            self.current_run_side = Some(side);
868            self.run_volume_raw = 0;
869            self.core.builder.reset();
870        }
871
872        let mut raw_remaining = trade.size.raw;
873        while raw_remaining > 0 {
874            let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
875            let raw_chunk = raw_remaining.min(needed);
876
877            self.core.apply_update(
878                trade.price,
879                Quantity::from_raw(raw_chunk, trade.size.precision),
880                trade.ts_init,
881            );
882
883            self.run_volume_raw += raw_chunk;
884            raw_remaining -= raw_chunk;
885
886            if self.run_volume_raw >= self.raw_step {
887                self.core.build_now_and_send();
888                self.run_volume_raw = 0;
889                self.current_run_side = None;
890            }
891        }
892    }
893
894    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
895        self.core.builder.update_bar(bar, volume, ts_init);
896    }
897}
898
899/// Provides a means of building value bars aggregated from quote and trades.
900///
901/// When received value reaches the step threshold of the bar
902/// specification, then a bar is created and sent to the handler.
903pub struct ValueBarAggregator {
904    core: BarAggregatorCore,
905    cum_value: f64,
906}
907
908impl Debug for ValueBarAggregator {
909    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
910        f.debug_struct(stringify!(ValueBarAggregator))
911            .field("core", &self.core)
912            .field("cum_value", &self.cum_value)
913            .finish()
914    }
915}
916
917impl ValueBarAggregator {
918    /// Creates a new [`ValueBarAggregator`] instance.
919    ///
920    /// # Panics
921    ///
922    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
923    pub fn new<H: FnMut(Bar) + 'static>(
924        bar_type: BarType,
925        price_precision: u8,
926        size_precision: u8,
927        handler: H,
928    ) -> Self {
929        Self {
930            core: BarAggregatorCore::new(
931                bar_type.standard(),
932                price_precision,
933                size_precision,
934                handler,
935            ),
936            cum_value: 0.0,
937        }
938    }
939
940    #[must_use]
941    /// Returns the cumulative value for the aggregator.
942    pub const fn get_cumulative_value(&self) -> f64 {
943        self.cum_value
944    }
945}
946
947impl BarAggregator for ValueBarAggregator {
948    fn bar_type(&self) -> BarType {
949        self.core.bar_type
950    }
951
952    fn is_running(&self) -> bool {
953        self.core.is_running
954    }
955
956    fn set_is_running(&mut self, value: bool) {
957        self.core.set_is_running(value);
958    }
959
960    /// Apply the given update to the aggregator.
961    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
962        let mut size_update = size.as_f64();
963        let spec = self.core.bar_type.spec();
964
965        while size_update > 0.0 {
966            let value_update = price.as_f64() * size_update;
967            if value_update == 0.0 {
968                // Prevent division by zero - apply remaining size without triggering bar
969                self.core
970                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
971                break;
972            }
973
974            if self.cum_value + value_update < spec.step.get() as f64 {
975                self.cum_value += value_update;
976                self.core
977                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
978                break;
979            }
980
981            let value_diff = spec.step.get() as f64 - self.cum_value;
982            let mut size_diff = size_update * (value_diff / value_update);
983
984            // Clamp to minimum representable size to avoid zero-volume bars
985            if is_below_min_size(size_diff, size.precision) {
986                if is_below_min_size(size_update, size.precision) {
987                    break;
988                }
989                size_diff = min_size_f64(size.precision);
990            }
991
992            self.core
993                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
994
995            self.core.build_now_and_send();
996            self.cum_value = 0.0;
997            size_update -= size_diff;
998        }
999    }
1000
1001    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1002        let mut volume_update = volume;
1003        let average_price = Price::new(
1004            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1005            self.core.builder.price_precision,
1006        );
1007
1008        while volume_update.as_f64() > 0.0 {
1009            let value_update = average_price.as_f64() * volume_update.as_f64();
1010            if value_update == 0.0 {
1011                // Prevent division by zero - apply remaining volume without triggering bar
1012                self.core.builder.update_bar(bar, volume_update, ts_init);
1013                break;
1014            }
1015
1016            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1017                self.cum_value += value_update;
1018                self.core.builder.update_bar(bar, volume_update, ts_init);
1019                break;
1020            }
1021
1022            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1023            let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1024
1025            // Clamp to minimum representable size to avoid zero-volume bars
1026            if is_below_min_size(volume_diff, volume_update.precision) {
1027                if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1028                    break;
1029                }
1030                volume_diff = min_size_f64(volume_update.precision);
1031            }
1032
1033            self.core.builder.update_bar(
1034                bar,
1035                Quantity::new(volume_diff, volume_update.precision),
1036                ts_init,
1037            );
1038
1039            self.core.build_now_and_send();
1040            self.cum_value = 0.0;
1041            volume_update = Quantity::new(
1042                volume_update.as_f64() - volume_diff,
1043                volume_update.precision,
1044            );
1045        }
1046    }
1047}
1048
/// Aggregates bars based on buy/sell notional imbalance.
///
/// Buyer-initiated trades add their notional value to the imbalance and
/// seller-initiated trades subtract it. A bar is built whenever the absolute
/// imbalance reaches the step threshold.
pub struct ValueImbalanceBarAggregator {
    /// Shared aggregation core (bar type, builder and running flag).
    core: BarAggregatorCore,
    /// Signed running notional imbalance (positive for net buying, negative for
    /// net selling). Set back to zero each time a bar is built.
    imbalance_value: f64,
    /// Imbalance threshold, taken from the bar specification step.
    step_value: f64,
}
1055
1056impl Debug for ValueImbalanceBarAggregator {
1057    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058        f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1059            .field("core", &self.core)
1060            .field("imbalance_value", &self.imbalance_value)
1061            .field("step_value", &self.step_value)
1062            .finish()
1063    }
1064}
1065
1066impl ValueImbalanceBarAggregator {
1067    /// Creates a new [`ValueImbalanceBarAggregator`] instance.
1068    ///
1069    /// # Panics
1070    ///
1071    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1072    pub fn new<H: FnMut(Bar) + 'static>(
1073        bar_type: BarType,
1074        price_precision: u8,
1075        size_precision: u8,
1076        handler: H,
1077    ) -> Self {
1078        Self {
1079            core: BarAggregatorCore::new(
1080                bar_type.standard(),
1081                price_precision,
1082                size_precision,
1083                handler,
1084            ),
1085            imbalance_value: 0.0,
1086            step_value: bar_type.spec().step.get() as f64,
1087        }
1088    }
1089}
1090
impl BarAggregator for ValueImbalanceBarAggregator {
    /// Returns the bar type held by the aggregation core.
    fn bar_type(&self) -> BarType {
        self.core.bar_type
    }

    /// Returns the running flag held by the aggregation core.
    fn is_running(&self) -> bool {
        self.core.is_running
    }

    /// Forwards the running flag to the aggregation core.
    fn set_is_running(&mut self, value: bool) {
        self.core.set_is_running(value);
    }

    /// Apply the given update to the aggregator.
    ///
    /// Note: side-aware logic lives in `handle_trade`. This method is used for
    /// quote/bar updates where no aggressor side is available.
    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
        self.core.apply_update(price, size, ts_init);
    }

    /// Applies a trade, updating the signed notional imbalance and building a
    /// bar each time its absolute value reaches the step threshold.
    ///
    /// Trades with a zero price or no aggressor side are applied to the builder
    /// without touching the imbalance. Otherwise the trade size is consumed in
    /// chunks:
    ///
    /// - When the trade is on the same side as the current imbalance (or the
    ///   imbalance is zero), it is applied up to the amount still needed to
    ///   reach the threshold, a bar is built, and the rest carries over.
    /// - When the trade opposes the current imbalance, it first offsets the
    ///   existing imbalance; whatever is left is then processed by the
    ///   same-side branch on the following iteration.
    fn handle_trade(&mut self, trade: TradeTick) {
        let price_f64 = trade.price.as_f64();
        if price_f64 == 0.0 {
            // A zero price carries no notional value (and would divide by zero below)
            self.core
                .apply_update(trade.price, trade.size, trade.ts_init);
            return;
        }

        // +1.0 for buyer-initiated trades, -1.0 for seller-initiated trades
        let side_sign = match trade.aggressor_side {
            AggressorSide::Buyer => 1.0,
            AggressorSide::Seller => -1.0,
            AggressorSide::NoAggressor => {
                self.core
                    .apply_update(trade.price, trade.size, trade.ts_init);
                return;
            }
        };

        let mut size_remaining = trade.size.as_f64();
        while size_remaining > 0.0 {
            let value_remaining = price_f64 * size_remaining;
            if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
                // Same side: value still required to reach the threshold
                let needed = self.step_value - self.imbalance_value.abs();
                if value_remaining <= needed {
                    // The whole remainder fits without exceeding the threshold
                    self.imbalance_value += side_sign * value_remaining;
                    self.core.apply_update(
                        trade.price,
                        Quantity::new(size_remaining, trade.size.precision),
                        trade.ts_init,
                    );

                    // Reached when the remainder lands exactly on the threshold
                    if self.imbalance_value.abs() >= self.step_value {
                        self.core.build_now_and_send();
                        self.imbalance_value = 0.0;
                    }
                    break;
                }

                // Only the part that completes the bar is applied on this iteration
                let mut value_chunk = needed;
                let mut size_chunk = value_chunk / price_f64;

                // Clamp to minimum representable size to avoid zero-volume bars
                if is_below_min_size(size_chunk, trade.size.precision) {
                    if is_below_min_size(size_remaining, trade.size.precision) {
                        // Remainder is itself below the minimum size and is dropped
                        break;
                    }
                    size_chunk = min_size_f64(trade.size.precision);
                    value_chunk = price_f64 * size_chunk;
                }

                self.core.apply_update(
                    trade.price,
                    Quantity::new(size_chunk, trade.size.precision),
                    trade.ts_init,
                );
                self.imbalance_value += side_sign * value_chunk;
                size_remaining -= size_chunk;

                if self.imbalance_value.abs() >= self.step_value {
                    self.core.build_now_and_send();
                    self.imbalance_value = 0.0;
                }
            } else {
                // Opposing side: first neutralize existing imbalance
                let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
                let mut size_chunk = value_to_flatten / price_f64;

                // Clamp to minimum representable size to avoid zero-volume bars
                if is_below_min_size(size_chunk, trade.size.precision) {
                    if is_below_min_size(size_remaining, trade.size.precision) {
                        // Remainder is itself below the minimum size and is dropped
                        break;
                    }
                    size_chunk = min_size_f64(trade.size.precision);
                    value_to_flatten = price_f64 * size_chunk;
                }

                self.core.apply_update(
                    trade.price,
                    Quantity::new(size_chunk, trade.size.precision),
                    trade.ts_init,
                );
                // Adding the opposing-signed value moves the imbalance towards zero
                self.imbalance_value += side_sign * value_to_flatten;

                // Min-size clamp can overshoot past threshold
                if self.imbalance_value.abs() >= self.step_value {
                    self.core.build_now_and_send();
                    self.imbalance_value = 0.0;
                }
                size_remaining -= size_chunk;
            }
        }
    }

    /// Folds a pre-aggregated bar into the builder without touching the imbalance.
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
        self.core.builder.update_bar(bar, volume, ts_init);
    }
}
1209
/// Aggregates bars based on consecutive buy/sell notional runs.
///
/// Notional value accumulates while trades keep the same aggressor side. A
/// change of side restarts the run, and a bar is built when the run value
/// reaches the step threshold.
pub struct ValueRunsBarAggregator {
    /// Shared aggregation core (bar type, builder and running flag).
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress. `None` initially and after each
    /// built bar.
    current_run_side: Option<AggressorSide>,
    /// Notional value accumulated by the current run. Set back to zero on a
    /// side change and after each built bar.
    run_value: f64,
    /// Run threshold, taken from the bar specification step.
    step_value: f64,
}
1217
1218impl Debug for ValueRunsBarAggregator {
1219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1220        f.debug_struct(stringify!(ValueRunsBarAggregator))
1221            .field("core", &self.core)
1222            .field("current_run_side", &self.current_run_side)
1223            .field("run_value", &self.run_value)
1224            .field("step_value", &self.step_value)
1225            .finish()
1226    }
1227}
1228
1229impl ValueRunsBarAggregator {
1230    /// Creates a new [`ValueRunsBarAggregator`] instance.
1231    ///
1232    /// # Panics
1233    ///
1234    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1235    pub fn new<H: FnMut(Bar) + 'static>(
1236        bar_type: BarType,
1237        price_precision: u8,
1238        size_precision: u8,
1239        handler: H,
1240    ) -> Self {
1241        Self {
1242            core: BarAggregatorCore::new(
1243                bar_type.standard(),
1244                price_precision,
1245                size_precision,
1246                handler,
1247            ),
1248            current_run_side: None,
1249            run_value: 0.0,
1250            step_value: bar_type.spec().step.get() as f64,
1251        }
1252    }
1253}
1254
1255impl BarAggregator for ValueRunsBarAggregator {
1256    fn bar_type(&self) -> BarType {
1257        self.core.bar_type
1258    }
1259
1260    fn is_running(&self) -> bool {
1261        self.core.is_running
1262    }
1263
1264    fn set_is_running(&mut self, value: bool) {
1265        self.core.set_is_running(value);
1266    }
1267
1268    /// Apply the given update to the aggregator.
1269    ///
1270    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1271    /// quote/bar updates where no aggressor side is available.
1272    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1273        self.core.apply_update(price, size, ts_init);
1274    }
1275
1276    fn handle_trade(&mut self, trade: TradeTick) {
1277        let price_f64 = trade.price.as_f64();
1278        if price_f64 == 0.0 {
1279            self.core
1280                .apply_update(trade.price, trade.size, trade.ts_init);
1281            return;
1282        }
1283
1284        let side = match trade.aggressor_side {
1285            AggressorSide::Buyer => Some(AggressorSide::Buyer),
1286            AggressorSide::Seller => Some(AggressorSide::Seller),
1287            AggressorSide::NoAggressor => None,
1288        };
1289
1290        let Some(side) = side else {
1291            self.core
1292                .apply_update(trade.price, trade.size, trade.ts_init);
1293            return;
1294        };
1295
1296        if self.current_run_side != Some(side) {
1297            self.current_run_side = Some(side);
1298            self.run_value = 0.0;
1299            self.core.builder.reset();
1300        }
1301
1302        let mut size_remaining = trade.size.as_f64();
1303        while size_remaining > 0.0 {
1304            let value_update = price_f64 * size_remaining;
1305            if self.run_value + value_update < self.step_value {
1306                self.run_value += value_update;
1307                self.core.apply_update(
1308                    trade.price,
1309                    Quantity::new(size_remaining, trade.size.precision),
1310                    trade.ts_init,
1311                );
1312                break;
1313            }
1314
1315            let value_needed = self.step_value - self.run_value;
1316            let mut size_chunk = value_needed / price_f64;
1317
1318            // Clamp to minimum representable size to avoid zero-volume bars
1319            if is_below_min_size(size_chunk, trade.size.precision) {
1320                if is_below_min_size(size_remaining, trade.size.precision) {
1321                    break;
1322                }
1323                size_chunk = min_size_f64(trade.size.precision);
1324            }
1325
1326            self.core.apply_update(
1327                trade.price,
1328                Quantity::new(size_chunk, trade.size.precision),
1329                trade.ts_init,
1330            );
1331
1332            self.core.build_now_and_send();
1333            self.run_value = 0.0;
1334            self.current_run_side = None;
1335            size_remaining -= size_chunk;
1336        }
1337    }
1338
1339    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1340        self.core.builder.update_bar(bar, volume, ts_init);
1341    }
1342}
1343
/// Provides a means of building Renko bars aggregated from quotes and trades.
///
/// Renko bars are created when the price moves by a fixed amount (brick size)
/// regardless of time or volume. Each bar represents a price movement equal
/// to the step size in the bar specification.
pub struct RenkoBarAggregator {
    /// Shared aggregation core (bar type, builder and running flag).
    core: BarAggregatorCore,
    /// Brick size in raw price units: the bar specification step multiplied by
    /// the raw value of the price increment.
    pub brick_size: PriceRaw,
    /// Close of the most recently emitted brick, or the first observed price
    /// until a brick has been emitted. `None` before the first update.
    last_close: Option<Price>,
}
1354
1355impl Debug for RenkoBarAggregator {
1356    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1357        f.debug_struct(stringify!(RenkoBarAggregator))
1358            .field("core", &self.core)
1359            .field("brick_size", &self.brick_size)
1360            .field("last_close", &self.last_close)
1361            .finish()
1362    }
1363}
1364
1365impl RenkoBarAggregator {
1366    /// Creates a new [`RenkoBarAggregator`] instance.
1367    ///
1368    /// # Panics
1369    ///
1370    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1371    pub fn new<H: FnMut(Bar) + 'static>(
1372        bar_type: BarType,
1373        price_precision: u8,
1374        size_precision: u8,
1375        price_increment: Price,
1376        handler: H,
1377    ) -> Self {
1378        // Calculate brick size in raw price units (step * price_increment.raw)
1379        let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1380
1381        Self {
1382            core: BarAggregatorCore::new(
1383                bar_type.standard(),
1384                price_precision,
1385                size_precision,
1386                handler,
1387            ),
1388            brick_size,
1389            last_close: None,
1390        }
1391    }
1392}
1393
1394impl BarAggregator for RenkoBarAggregator {
1395    fn bar_type(&self) -> BarType {
1396        self.core.bar_type
1397    }
1398
1399    fn is_running(&self) -> bool {
1400        self.core.is_running
1401    }
1402
1403    fn set_is_running(&mut self, value: bool) {
1404        self.core.set_is_running(value);
1405    }
1406
1407    /// Apply the given update to the aggregator.
1408    ///
1409    /// For Renko bars, we check if the price movement from the last close
1410    /// is greater than or equal to the brick size. If so, we create new bars.
1411    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1412        // Always update the builder with the current tick
1413        self.core.apply_update(price, size, ts_init);
1414
1415        // Initialize last_close if this is the first update
1416        if self.last_close.is_none() {
1417            self.last_close = Some(price);
1418            return;
1419        }
1420
1421        let last_close = self.last_close.unwrap();
1422
1423        // Convert prices to raw units (integers) to avoid floating point precision issues
1424        let current_raw = price.raw;
1425        let last_close_raw = last_close.raw;
1426        let price_diff_raw = current_raw - last_close_raw;
1427        let abs_price_diff_raw = price_diff_raw.abs();
1428
1429        // Check if we need to create one or more Renko bars
1430        if abs_price_diff_raw >= self.brick_size {
1431            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1432            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1433            let mut current_close = last_close;
1434
1435            // Store the current builder volume to distribute across bricks
1436            let total_volume = self.core.builder.volume;
1437
1438            for _i in 0..num_bricks {
1439                // Calculate the close price for this brick using raw price units
1440                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1441                let brick_close = Price::from_raw(brick_close_raw, price.precision);
1442
1443                // For Renko bars: open = previous close, high/low depend on direction
1444                let (brick_high, brick_low) = if direction > 0.0 {
1445                    (brick_close, current_close)
1446                } else {
1447                    (current_close, brick_close)
1448                };
1449
1450                // Reset builder for this brick
1451                self.core.builder.reset();
1452                self.core.builder.open = Some(current_close);
1453                self.core.builder.high = Some(brick_high);
1454                self.core.builder.low = Some(brick_low);
1455                self.core.builder.close = Some(brick_close);
1456                self.core.builder.volume = total_volume; // Each brick gets the full volume
1457                self.core.builder.count = 1;
1458                self.core.builder.ts_last = ts_init;
1459                self.core.builder.initialized = true;
1460
1461                // Build and send the bar
1462                self.core.build_and_send(ts_init, ts_init);
1463
1464                // Update for the next brick
1465                current_close = brick_close;
1466                self.last_close = Some(brick_close);
1467            }
1468        }
1469    }
1470
1471    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1472        // Always update the builder with the current bar
1473        self.core.builder.update_bar(bar, volume, ts_init);
1474
1475        // Initialize last_close if this is the first update
1476        if self.last_close.is_none() {
1477            self.last_close = Some(bar.close);
1478            return;
1479        }
1480
1481        let last_close = self.last_close.unwrap();
1482
1483        // Convert prices to raw units (integers) to avoid floating point precision issues
1484        let current_raw = bar.close.raw;
1485        let last_close_raw = last_close.raw;
1486        let price_diff_raw = current_raw - last_close_raw;
1487        let abs_price_diff_raw = price_diff_raw.abs();
1488
1489        // Check if we need to create one or more Renko bars
1490        if abs_price_diff_raw >= self.brick_size {
1491            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1492            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1493            let mut current_close = last_close;
1494
1495            // Store the current builder volume to distribute across bricks
1496            let total_volume = self.core.builder.volume;
1497
1498            for _i in 0..num_bricks {
1499                // Calculate the close price for this brick using raw price units
1500                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1501                let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1502
1503                // For Renko bars: open = previous close, high/low depend on direction
1504                let (brick_high, brick_low) = if direction > 0.0 {
1505                    (brick_close, current_close)
1506                } else {
1507                    (current_close, brick_close)
1508                };
1509
1510                // Reset builder for this brick
1511                self.core.builder.reset();
1512                self.core.builder.open = Some(current_close);
1513                self.core.builder.high = Some(brick_high);
1514                self.core.builder.low = Some(brick_low);
1515                self.core.builder.close = Some(brick_close);
1516                self.core.builder.volume = total_volume; // Each brick gets the full volume
1517                self.core.builder.count = 1;
1518                self.core.builder.ts_last = ts_init;
1519                self.core.builder.initialized = true;
1520
1521                // Build and send the bar
1522                self.core.build_and_send(ts_init, ts_init);
1523
1524                // Update for the next brick
1525                current_close = brick_close;
1526                self.last_close = Some(brick_close);
1527            }
1528        }
1529    }
1530}
1531
/// Provides a means of building time bars aggregated from quotes and trades.
///
/// At each aggregation time interval, a bar is created and sent to the handler.
pub struct TimeBarAggregator {
    /// Shared aggregation core (bar type, builder and running flag).
    core: BarAggregatorCore,
    /// Clock on which the bar timer / time alert is registered and cancelled.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, no bar is built for an interval in which the builder
    /// received no updates.
    build_with_no_updates: bool,
    /// For left-open intervals: stamp the bar's event time with the timer
    /// event (close) time instead of the stored open time.
    timestamp_on_close: bool,
    /// `true` when constructed with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar in progress; updated to the timer event time
    /// after each build.
    stored_open_ns: UnixNanos,
    /// Name of the timer / alert on the clock (the bar type as a string).
    timer_name: String,
    /// Bar interval in nanoseconds, as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Next scheduled bar close time, set when the timer is started.
    next_close_ns: UnixNanos,
    /// Delay added to the timer start time, interpreted as microseconds.
    bar_build_delay: u64,
    /// Optional origin offset passed to `get_time_bar_start` when computing
    /// the timer start time.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When set, the first bar is discarded (builder reset) instead of sent.
    skip_first_non_full_bar: bool,
    /// Historical-mode flag.
    // NOTE(review): only logged in the visible code; semantics are defined elsewhere.
    pub historical_mode: bool,
    // NOTE(review): presumably time events buffered while in historical mode - confirm.
    historical_events: Vec<TimeEvent>,
    /// Weak handle to the shared aggregator, used by the timer callback to
    /// invoke `build_bar`.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1552
1553impl Debug for TimeBarAggregator {
1554    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1555        f.debug_struct(stringify!(TimeBarAggregator))
1556            .field("core", &self.core)
1557            .field("build_with_no_updates", &self.build_with_no_updates)
1558            .field("timestamp_on_close", &self.timestamp_on_close)
1559            .field("is_left_open", &self.is_left_open)
1560            .field("timer_name", &self.timer_name)
1561            .field("interval_ns", &self.interval_ns)
1562            .field("bar_build_delay", &self.bar_build_delay)
1563            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1564            .finish()
1565    }
1566}
1567
1568impl TimeBarAggregator {
1569    /// Creates a new [`TimeBarAggregator`] instance.
1570    ///
1571    /// # Panics
1572    ///
1573    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1574    #[allow(clippy::too_many_arguments)]
1575    pub fn new<H: FnMut(Bar) + 'static>(
1576        bar_type: BarType,
1577        price_precision: u8,
1578        size_precision: u8,
1579        clock: Rc<RefCell<dyn Clock>>,
1580        handler: H,
1581        build_with_no_updates: bool,
1582        timestamp_on_close: bool,
1583        interval_type: BarIntervalType,
1584        time_bars_origin_offset: Option<TimeDelta>,
1585        bar_build_delay: u64,
1586        skip_first_non_full_bar: bool,
1587    ) -> Self {
1588        let is_left_open = match interval_type {
1589            BarIntervalType::LeftOpen => true,
1590            BarIntervalType::RightOpen => false,
1591        };
1592
1593        let core = BarAggregatorCore::new(
1594            bar_type.standard(),
1595            price_precision,
1596            size_precision,
1597            handler,
1598        );
1599
1600        Self {
1601            core,
1602            clock,
1603            build_with_no_updates,
1604            timestamp_on_close,
1605            is_left_open,
1606            stored_open_ns: UnixNanos::default(),
1607            timer_name: bar_type.to_string(),
1608            interval_ns: get_bar_interval_ns(&bar_type),
1609            next_close_ns: UnixNanos::default(),
1610            bar_build_delay,
1611            time_bars_origin_offset,
1612            skip_first_non_full_bar,
1613            historical_mode: false,
1614            historical_events: Vec::new(),
1615            aggregator_weak: None,
1616        }
1617    }
1618
    /// Sets the clock for the aggregator (internal method).
    ///
    /// Only the stored handle is replaced: any timer already registered on the
    /// previous clock is neither cancelled nor moved by this call.
    pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.clock = clock;
    }
1623
    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
    ///
    /// Intended to mirror the Cython `start_timer()` method.
    /// Creates a callback to `build_bar` using a weak reference to the aggregator.
    ///
    /// For fixed-length intervals a recurring timer is registered. For month
    /// and year aggregations a single time alert is registered instead,
    /// because those intervals have no constant length.
    ///
    /// `aggregator_rc` is the shared handle to this aggregator. When `Some`,
    /// a weak reference to it is stored and used by the callback; when `None`,
    /// the previously stored weak reference is reused.
    ///
    /// # Panics
    ///
    /// Panics if aggregator_rc is None and aggregator_weak hasn't been set, or if timer registration fails.
    pub fn start_timer_internal(
        &mut self,
        aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
    ) {
        // Create callback that calls build_bar through the weak reference
        let aggregator_weak = if let Some(rc) = aggregator_rc {
            // Store weak reference for future use (e.g., in build_bar for month/year)
            let weak = Rc::downgrade(&rc);
            self.aggregator_weak = Some(weak.clone());
            weak
        } else {
            // Use existing weak reference (for historical mode where it was set earlier)
            self.aggregator_weak
                .as_ref()
                .expect("Aggregator weak reference must be set before calling start_timer()")
                .clone()
        };

        // The callback is a no-op once the aggregator has been dropped
        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
            if let Some(agg) = aggregator_weak.upgrade() {
                agg.borrow_mut().build_bar(event);
            }
        }));

        // Computing start_time
        let now = self.clock.borrow().utc_now();
        let mut start_time =
            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
        // The build delay is interpreted as microseconds
        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);

        // Closing a partial bar at the transition from historical to backtest data
        let fire_immediately = start_time == now;

        // The first bar is only skipped when `now` is strictly after the start time
        self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;

        let spec = &self.bar_type().spec();
        let start_time_ns = UnixNanos::from(start_time);

        if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
            // Fixed-length interval: one recurring timer
            self.clock
                .borrow_mut()
                .set_timer_ns(
                    &self.timer_name,
                    self.interval_ns.as_u64(),
                    Some(start_time_ns),
                    None,
                    Some(callback),
                    Some(true), // allow_past
                    Some(fire_immediately),
                )
                .expect(FAILED);

            // First close is the start time itself when firing immediately,
            // otherwise one full interval after it
            if fire_immediately {
                self.next_close_ns = start_time_ns;
            } else {
                let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
                self.next_close_ns = UnixNanos::from(start_time + interval_duration);
            }

            // The bar in progress opened one interval before its close
            self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
        } else {
            // The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
            let alert_time = if fire_immediately {
                start_time
            } else {
                let step = spec.step.get() as u32;
                if spec.aggregation == BarAggregation::Month {
                    add_n_months(start_time, step).expect(FAILED)
                } else {
                    // Year aggregation
                    add_n_years(start_time, step).expect(FAILED)
                }
            };

            self.clock
                .borrow_mut()
                .set_time_alert_ns(
                    &self.timer_name,
                    UnixNanos::from(alert_time),
                    Some(callback),
                    Some(true), // allow_past
                )
                .expect(FAILED);

            self.next_close_ns = UnixNanos::from(alert_time);
            self.stored_open_ns = UnixNanos::from(start_time);
        }

        log::debug!(
            "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
            self.timer_name,
            start_time,
            self.historical_mode,
            fire_immediately,
            now,
            self.bar_build_delay
        );
    }
1730
1731    /// Stops the time bar aggregator.
1732    pub fn stop(&mut self) {
1733        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1734    }
1735
1736    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1737        if self.skip_first_non_full_bar {
1738            self.core.builder.reset();
1739            self.skip_first_non_full_bar = false;
1740        } else {
1741            self.core.build_and_send(ts_event, ts_init);
1742        }
1743    }
1744
1745    fn build_bar(&mut self, event: TimeEvent) {
1746        if !self.core.builder.initialized {
1747            return;
1748        }
1749
1750        if !self.build_with_no_updates && self.core.builder.count == 0 {
1751            return; // Do not build bar when no update
1752        }
1753
1754        let ts_init = event.ts_event;
1755        let ts_event = if self.is_left_open {
1756            if self.timestamp_on_close {
1757                event.ts_event
1758            } else {
1759                self.stored_open_ns
1760            }
1761        } else {
1762            self.stored_open_ns
1763        };
1764
1765        self.build_and_send(ts_event, ts_init);
1766
1767        // Close time becomes the next open time
1768        self.stored_open_ns = event.ts_event;
1769
1770        if self.bar_type().spec().aggregation == BarAggregation::Month {
1771            let step = self.bar_type().spec().step.get() as u32;
1772            let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1773
1774            self.clock
1775                .borrow_mut()
1776                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1777                .expect(FAILED);
1778
1779            self.next_close_ns = alert_time_ns;
1780        } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1781            let step = self.bar_type().spec().step.get() as u32;
1782            let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1783
1784            self.clock
1785                .borrow_mut()
1786                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1787                .expect(FAILED);
1788
1789            self.next_close_ns = alert_time_ns;
1790        } else {
1791            // On receiving this event, timer should now have a new `next_time_ns`
1792            self.next_close_ns = self
1793                .clock
1794                .borrow()
1795                .next_time_ns(&self.timer_name)
1796                .unwrap_or_default();
1797        }
1798    }
1799
1800    fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1801        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1802            // In historical mode, clock is always a TestClock (set by data engine)
1803            {
1804                let mut clock_borrow = self.clock.borrow_mut();
1805                let test_clock = clock_borrow
1806                    .as_any_mut()
1807                    .downcast_mut::<TestClock>()
1808                    .expect("Expected TestClock in historical mode");
1809                test_clock.set_time(ts_init);
1810            }
1811            // In historical mode, weak reference should already be set
1812            self.start_timer_internal(None);
1813        }
1814
1815        // Advance this aggregator's independent clock and collect timer events
1816        {
1817            let mut clock_borrow = self.clock.borrow_mut();
1818            let test_clock = clock_borrow
1819                .as_any_mut()
1820                .downcast_mut::<TestClock>()
1821                .expect("Expected TestClock in historical mode");
1822            self.historical_events = test_clock.advance_time(ts_init, true);
1823        }
1824    }
1825
1826    fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1827        // Process timer events after data processing
1828        // Collect events first to avoid borrow checker issues
1829        let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1830        for event in events {
1831            self.build_bar(event);
1832        }
1833    }
1834
1835    /// Sets historical events (called by data engine after advancing clock)
1836    pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1837        self.historical_events = events;
1838    }
1839}
1840
1841impl BarAggregator for TimeBarAggregator {
1842    fn bar_type(&self) -> BarType {
1843        self.core.bar_type
1844    }
1845
1846    fn is_running(&self) -> bool {
1847        self.core.is_running
1848    }
1849
1850    fn set_is_running(&mut self, value: bool) {
1851        self.core.set_is_running(value);
1852    }
1853
1854    /// Stop time-based aggregator by canceling its timer.
1855    fn stop(&mut self) {
1856        Self::stop(self);
1857    }
1858
1859    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1860        if self.historical_mode {
1861            self.preprocess_historical_events(ts_init);
1862        }
1863
1864        self.core.apply_update(price, size, ts_init);
1865
1866        if self.historical_mode {
1867            self.postprocess_historical_events(ts_init);
1868        }
1869    }
1870
1871    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1872        if self.historical_mode {
1873            self.preprocess_historical_events(ts_init);
1874        }
1875
1876        self.core.builder.update_bar(bar, volume, ts_init);
1877
1878        if self.historical_mode {
1879            self.postprocess_historical_events(ts_init);
1880        }
1881    }
1882
1883    fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1884        self.historical_mode = historical_mode;
1885        self.core.handler = handler;
1886    }
1887
1888    fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1889        self.set_historical_events_internal(events);
1890    }
1891
1892    fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1893        self.set_clock_internal(clock);
1894    }
1895
1896    fn build_bar(&mut self, event: TimeEvent) {
1897        // Delegate to the implementation method
1898        // We use the struct name here to disambiguate from the trait method
1899        {
1900            #[allow(clippy::use_self)]
1901            TimeBarAggregator::build_bar(self, event);
1902        }
1903    }
1904
1905    fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1906        self.aggregator_weak = Some(weak);
1907    }
1908
1909    fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1910        self.start_timer_internal(aggregator_rc);
1911    }
1912}
1913
1914fn is_below_min_size(size: f64, precision: u8) -> bool {
1915    Quantity::new(size, precision).raw == 0
1916}
1917
/// Returns `10^(-precision)` as an `f64`, i.e. the smallest increment
/// representable with `precision` decimal places.
fn min_size_f64(precision: u8) -> f64 {
    let exponent = -i32::from(precision);
    10_f64.powi(exponent)
}
1921
1922#[cfg(test)]
1923mod tests {
1924    use std::sync::{Arc, Mutex};
1925
1926    use nautilus_common::clock::TestClock;
1927    use nautilus_core::{MUTEX_POISONED, UUID4};
1928    use nautilus_model::{
1929        data::{BarSpecification, BarType},
1930        enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1931        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1932        types::{Price, Quantity},
1933    };
1934    use rstest::rstest;
1935    use ustr::Ustr;
1936
1937    use super::*;
1938
1939    #[rstest]
1940    fn test_bar_builder_initialization(equity_aapl: Equity) {
1941        let instrument = InstrumentAny::Equity(equity_aapl);
1942        let bar_type = BarType::new(
1943            instrument.id(),
1944            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1945            AggregationSource::Internal,
1946        );
1947        let builder = BarBuilder::new(
1948            bar_type,
1949            instrument.price_precision(),
1950            instrument.size_precision(),
1951        );
1952
1953        assert!(!builder.initialized);
1954        assert_eq!(builder.ts_last, 0);
1955        assert_eq!(builder.count, 0);
1956    }
1957
1958    #[rstest]
1959    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1960        let instrument = InstrumentAny::Equity(equity_aapl);
1961        let bar_type = BarType::new(
1962            instrument.id(),
1963            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1964            AggregationSource::Internal,
1965        );
1966        let mut builder = BarBuilder::new(
1967            bar_type,
1968            instrument.price_precision(),
1969            instrument.size_precision(),
1970        );
1971
1972        builder.update(
1973            Price::from("100.00"),
1974            Quantity::from(1),
1975            UnixNanos::from(1000),
1976        );
1977        builder.update(
1978            Price::from("95.00"),
1979            Quantity::from(1),
1980            UnixNanos::from(2000),
1981        );
1982        builder.update(
1983            Price::from("105.00"),
1984            Quantity::from(1),
1985            UnixNanos::from(3000),
1986        );
1987
1988        let bar = builder.build_now();
1989        assert!(bar.high > bar.low);
1990        assert_eq!(bar.open, Price::from("100.00"));
1991        assert_eq!(bar.high, Price::from("105.00"));
1992        assert_eq!(bar.low, Price::from("95.00"));
1993        assert_eq!(bar.close, Price::from("105.00"));
1994    }
1995
1996    #[rstest]
1997    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1998        let instrument = InstrumentAny::Equity(equity_aapl);
1999        let bar_type = BarType::new(
2000            instrument.id(),
2001            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2002            AggregationSource::Internal,
2003        );
2004        let mut builder = BarBuilder::new(
2005            bar_type,
2006            instrument.price_precision(),
2007            instrument.size_precision(),
2008        );
2009
2010        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2011        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2012
2013        assert_eq!(builder.ts_last, 1_000);
2014        assert_eq!(builder.count, 1);
2015    }
2016
2017    #[rstest]
2018    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2019        let instrument = InstrumentAny::Equity(equity_aapl);
2020        let bar_type = BarType::new(
2021            instrument.id(),
2022            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2023            AggregationSource::Internal,
2024        );
2025        let mut builder = BarBuilder::new(
2026            bar_type,
2027            instrument.price_precision(),
2028            instrument.size_precision(),
2029        );
2030
2031        builder.update(
2032            Price::from("1.00000"),
2033            Quantity::from(1),
2034            UnixNanos::default(),
2035        );
2036
2037        assert!(builder.initialized);
2038        assert_eq!(builder.ts_last, 0);
2039        assert_eq!(builder.count, 1);
2040    }
2041
2042    #[rstest]
2043    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2044        equity_aapl: Equity,
2045    ) {
2046        let instrument = InstrumentAny::Equity(equity_aapl);
2047        let bar_type = BarType::new(
2048            instrument.id(),
2049            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2050            AggregationSource::Internal,
2051        );
2052        let mut builder = BarBuilder::new(bar_type, 2, 0);
2053
2054        builder.update(
2055            Price::from("1.00000"),
2056            Quantity::from(1),
2057            UnixNanos::from(1_000),
2058        );
2059        builder.update(
2060            Price::from("1.00001"),
2061            Quantity::from(1),
2062            UnixNanos::from(500),
2063        );
2064
2065        assert!(builder.initialized);
2066        assert_eq!(builder.ts_last, 1_000);
2067        assert_eq!(builder.count, 1);
2068    }
2069
2070    #[rstest]
2071    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2072        let instrument = InstrumentAny::Equity(equity_aapl);
2073        let bar_type = BarType::new(
2074            instrument.id(),
2075            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2076            AggregationSource::Internal,
2077        );
2078        let mut builder = BarBuilder::new(
2079            bar_type,
2080            instrument.price_precision(),
2081            instrument.size_precision(),
2082        );
2083
2084        for _ in 0..5 {
2085            builder.update(
2086                Price::from("1.00000"),
2087                Quantity::from(1),
2088                UnixNanos::from(1_000),
2089            );
2090        }
2091
2092        assert_eq!(builder.count, 5);
2093    }
2094
2095    #[rstest]
2096    #[should_panic]
2097    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2098        let instrument = InstrumentAny::Equity(equity_aapl);
2099        let bar_type = BarType::new(
2100            instrument.id(),
2101            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2102            AggregationSource::Internal,
2103        );
2104        let mut builder = BarBuilder::new(
2105            bar_type,
2106            instrument.price_precision(),
2107            instrument.size_precision(),
2108        );
2109        let _ = builder.build_now();
2110    }
2111
2112    #[rstest]
2113    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2114        let instrument = InstrumentAny::Equity(equity_aapl);
2115        let bar_type = BarType::new(
2116            instrument.id(),
2117            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2118            AggregationSource::Internal,
2119        );
2120        let mut builder = BarBuilder::new(
2121            bar_type,
2122            instrument.price_precision(),
2123            instrument.size_precision(),
2124        );
2125
2126        builder.update(
2127            Price::from("1.00001"),
2128            Quantity::from(2),
2129            UnixNanos::default(),
2130        );
2131        builder.update(
2132            Price::from("1.00002"),
2133            Quantity::from(2),
2134            UnixNanos::default(),
2135        );
2136        builder.update(
2137            Price::from("1.00000"),
2138            Quantity::from(1),
2139            UnixNanos::from(1_000_000_000),
2140        );
2141
2142        let bar = builder.build_now();
2143
2144        assert_eq!(bar.open, Price::from("1.00001"));
2145        assert_eq!(bar.high, Price::from("1.00002"));
2146        assert_eq!(bar.low, Price::from("1.00000"));
2147        assert_eq!(bar.close, Price::from("1.00000"));
2148        assert_eq!(bar.volume, Quantity::from(5));
2149        assert_eq!(bar.ts_init, 1_000_000_000);
2150        assert_eq!(builder.ts_last, 1_000_000_000);
2151        assert_eq!(builder.count, 0);
2152    }
2153
2154    #[rstest]
2155    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2156        let instrument = InstrumentAny::Equity(equity_aapl);
2157        let bar_type = BarType::new(
2158            instrument.id(),
2159            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2160            AggregationSource::Internal,
2161        );
2162        let mut builder = BarBuilder::new(bar_type, 2, 0);
2163
2164        builder.update(
2165            Price::from("1.00001"),
2166            Quantity::from(1),
2167            UnixNanos::default(),
2168        );
2169        builder.build_now();
2170
2171        builder.update(
2172            Price::from("1.00000"),
2173            Quantity::from(1),
2174            UnixNanos::default(),
2175        );
2176        builder.update(
2177            Price::from("1.00003"),
2178            Quantity::from(1),
2179            UnixNanos::default(),
2180        );
2181        builder.update(
2182            Price::from("1.00002"),
2183            Quantity::from(1),
2184            UnixNanos::default(),
2185        );
2186
2187        let bar = builder.build_now();
2188
2189        assert_eq!(bar.open, Price::from("1.00000"));
2190        assert_eq!(bar.high, Price::from("1.00003"));
2191        assert_eq!(bar.low, Price::from("1.00000"));
2192        assert_eq!(bar.close, Price::from("1.00002"));
2193        assert_eq!(bar.volume, Quantity::from(3));
2194    }
2195
2196    #[rstest]
2197    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2198        let instrument = InstrumentAny::Equity(equity_aapl);
2199        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2200        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2201        let handler = Arc::new(Mutex::new(Vec::new()));
2202        let handler_clone = Arc::clone(&handler);
2203
2204        let mut aggregator = TickBarAggregator::new(
2205            bar_type,
2206            instrument.price_precision(),
2207            instrument.size_precision(),
2208            move |bar: Bar| {
2209                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2210                handler_guard.push(bar);
2211            },
2212        );
2213
2214        let trade = TradeTick::default();
2215        aggregator.handle_trade(trade);
2216
2217        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2218        assert_eq!(handler_guard.len(), 0);
2219    }
2220
2221    #[rstest]
2222    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2223        let instrument = InstrumentAny::Equity(equity_aapl);
2224        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2225        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2226        let handler = Arc::new(Mutex::new(Vec::new()));
2227        let handler_clone = Arc::clone(&handler);
2228
2229        let mut aggregator = TickBarAggregator::new(
2230            bar_type,
2231            instrument.price_precision(),
2232            instrument.size_precision(),
2233            move |bar: Bar| {
2234                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2235                handler_guard.push(bar);
2236            },
2237        );
2238
2239        let trade = TradeTick::default();
2240        aggregator.handle_trade(trade);
2241        aggregator.handle_trade(trade);
2242        aggregator.handle_trade(trade);
2243
2244        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2245        let bar = handler_guard.first().unwrap();
2246        assert_eq!(handler_guard.len(), 1);
2247        assert_eq!(bar.open, trade.price);
2248        assert_eq!(bar.high, trade.price);
2249        assert_eq!(bar.low, trade.price);
2250        assert_eq!(bar.close, trade.price);
2251        assert_eq!(bar.volume, Quantity::from(300000));
2252        assert_eq!(bar.ts_event, trade.ts_event);
2253        assert_eq!(bar.ts_init, trade.ts_init);
2254    }
2255
2256    #[rstest]
2257    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2258        let instrument = InstrumentAny::Equity(equity_aapl);
2259        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2260        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2261        let handler = Arc::new(Mutex::new(Vec::new()));
2262        let handler_clone = Arc::clone(&handler);
2263
2264        let mut aggregator = TickBarAggregator::new(
2265            bar_type,
2266            instrument.price_precision(),
2267            instrument.size_precision(),
2268            move |bar: Bar| {
2269                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2270                handler_guard.push(bar);
2271            },
2272        );
2273
2274        aggregator.update(
2275            Price::from("1.00001"),
2276            Quantity::from(1),
2277            UnixNanos::default(),
2278        );
2279        aggregator.update(
2280            Price::from("1.00002"),
2281            Quantity::from(1),
2282            UnixNanos::from(1000),
2283        );
2284        aggregator.update(
2285            Price::from("1.00003"),
2286            Quantity::from(1),
2287            UnixNanos::from(2000),
2288        );
2289
2290        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2291        assert_eq!(handler_guard.len(), 1);
2292
2293        let bar = handler_guard.first().unwrap();
2294        assert_eq!(bar.open, Price::from("1.00001"));
2295        assert_eq!(bar.high, Price::from("1.00003"));
2296        assert_eq!(bar.low, Price::from("1.00001"));
2297        assert_eq!(bar.close, Price::from("1.00003"));
2298        assert_eq!(bar.volume, Quantity::from(3));
2299    }
2300
2301    #[rstest]
2302    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2303        let instrument = InstrumentAny::Equity(equity_aapl);
2304        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2305        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2306        let handler = Arc::new(Mutex::new(Vec::new()));
2307        let handler_clone = Arc::clone(&handler);
2308
2309        let mut aggregator = TickBarAggregator::new(
2310            bar_type,
2311            instrument.price_precision(),
2312            instrument.size_precision(),
2313            move |bar: Bar| {
2314                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2315                handler_guard.push(bar);
2316            },
2317        );
2318
2319        aggregator.update(
2320            Price::from("1.00001"),
2321            Quantity::from(1),
2322            UnixNanos::default(),
2323        );
2324        aggregator.update(
2325            Price::from("1.00002"),
2326            Quantity::from(1),
2327            UnixNanos::from(1000),
2328        );
2329        aggregator.update(
2330            Price::from("1.00003"),
2331            Quantity::from(1),
2332            UnixNanos::from(2000),
2333        );
2334        aggregator.update(
2335            Price::from("1.00004"),
2336            Quantity::from(1),
2337            UnixNanos::from(3000),
2338        );
2339
2340        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2341        assert_eq!(handler_guard.len(), 2);
2342
2343        let bar1 = &handler_guard[0];
2344        assert_eq!(bar1.open, Price::from("1.00001"));
2345        assert_eq!(bar1.close, Price::from("1.00002"));
2346        assert_eq!(bar1.volume, Quantity::from(2));
2347
2348        let bar2 = &handler_guard[1];
2349        assert_eq!(bar2.open, Price::from("1.00003"));
2350        assert_eq!(bar2.close, Price::from("1.00004"));
2351        assert_eq!(bar2.volume, Quantity::from(2));
2352    }
2353
2354    #[rstest]
2355    fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2356        let instrument = InstrumentAny::Equity(equity_aapl);
2357        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2358        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2359        let handler = Arc::new(Mutex::new(Vec::new()));
2360        let handler_clone = Arc::clone(&handler);
2361
2362        let mut aggregator = TickImbalanceBarAggregator::new(
2363            bar_type,
2364            instrument.price_precision(),
2365            instrument.size_precision(),
2366            move |bar: Bar| {
2367                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2368                handler_guard.push(bar);
2369            },
2370        );
2371
2372        let trade = TradeTick::default();
2373        aggregator.handle_trade(trade);
2374        aggregator.handle_trade(trade);
2375
2376        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2377        assert_eq!(handler_guard.len(), 1);
2378        let bar = handler_guard.first().unwrap();
2379        assert_eq!(bar.volume, Quantity::from(200000));
2380    }
2381
2382    #[rstest]
2383    fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2384        let instrument = InstrumentAny::Equity(equity_aapl);
2385        let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2386        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2387        let handler = Arc::new(Mutex::new(Vec::new()));
2388        let handler_clone = Arc::clone(&handler);
2389
2390        let mut aggregator = TickImbalanceBarAggregator::new(
2391            bar_type,
2392            instrument.price_precision(),
2393            instrument.size_precision(),
2394            move |bar: Bar| {
2395                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2396                handler_guard.push(bar);
2397            },
2398        );
2399
2400        let sell = TradeTick {
2401            aggressor_side: AggressorSide::Seller,
2402            ..TradeTick::default()
2403        };
2404
2405        aggregator.handle_trade(sell);
2406
2407        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2408        assert_eq!(handler_guard.len(), 1);
2409    }
2410
2411    #[rstest]
2412    fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2413        let instrument = InstrumentAny::Equity(equity_aapl);
2414        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2415        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2416        let handler = Arc::new(Mutex::new(Vec::new()));
2417        let handler_clone = Arc::clone(&handler);
2418
2419        let mut aggregator = TickRunsBarAggregator::new(
2420            bar_type,
2421            instrument.price_precision(),
2422            instrument.size_precision(),
2423            move |bar: Bar| {
2424                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2425                handler_guard.push(bar);
2426            },
2427        );
2428
2429        let buy = TradeTick::default();
2430        let sell = TradeTick {
2431            aggressor_side: AggressorSide::Seller,
2432            ..buy
2433        };
2434
2435        aggregator.handle_trade(buy);
2436        aggregator.handle_trade(buy);
2437        aggregator.handle_trade(sell);
2438        aggregator.handle_trade(sell);
2439
2440        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2441        assert_eq!(handler_guard.len(), 2);
2442    }
2443
2444    #[rstest]
2445    fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2446        let instrument = InstrumentAny::Equity(equity_aapl);
2447        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2448        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2449        let handler = Arc::new(Mutex::new(Vec::new()));
2450        let handler_clone = Arc::clone(&handler);
2451
2452        let mut aggregator = TickRunsBarAggregator::new(
2453            bar_type,
2454            instrument.price_precision(),
2455            instrument.size_precision(),
2456            move |bar: Bar| {
2457                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2458                handler_guard.push(bar);
2459            },
2460        );
2461
2462        let buy = TradeTick {
2463            size: Quantity::from(1),
2464            ..TradeTick::default()
2465        };
2466        let sell = TradeTick {
2467            aggressor_side: AggressorSide::Seller,
2468            size: Quantity::from(1),
2469            ..buy
2470        };
2471
2472        aggregator.handle_trade(buy);
2473        aggregator.handle_trade(buy);
2474        aggregator.handle_trade(sell);
2475        aggregator.handle_trade(sell);
2476
2477        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2478        assert_eq!(handler_guard.len(), 2);
2479        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2480        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2481    }
2482
2483    #[rstest]
2484    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2485        let instrument = InstrumentAny::Equity(equity_aapl);
2486        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2487        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2488        let handler = Arc::new(Mutex::new(Vec::new()));
2489        let handler_clone = Arc::clone(&handler);
2490
2491        let mut aggregator = VolumeBarAggregator::new(
2492            bar_type,
2493            instrument.price_precision(),
2494            instrument.size_precision(),
2495            move |bar: Bar| {
2496                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2497                handler_guard.push(bar);
2498            },
2499        );
2500
2501        aggregator.update(
2502            Price::from("1.00001"),
2503            Quantity::from(25),
2504            UnixNanos::default(),
2505        );
2506
2507        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2508        assert_eq!(handler_guard.len(), 2);
2509        let bar1 = &handler_guard[0];
2510        assert_eq!(bar1.volume, Quantity::from(10));
2511        let bar2 = &handler_guard[1];
2512        assert_eq!(bar2.volume, Quantity::from(10));
2513    }
2514
2515    #[rstest]
2516    fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2517        let instrument = InstrumentAny::Equity(equity_aapl);
2518        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2519        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2520        let handler = Arc::new(Mutex::new(Vec::new()));
2521        let handler_clone = Arc::clone(&handler);
2522
2523        let mut aggregator = VolumeRunsBarAggregator::new(
2524            bar_type,
2525            instrument.price_precision(),
2526            instrument.size_precision(),
2527            move |bar: Bar| {
2528                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2529                handler_guard.push(bar);
2530            },
2531        );
2532
2533        let buy = TradeTick {
2534            instrument_id: instrument.id(),
2535            price: Price::from("1.0"),
2536            size: Quantity::from(1),
2537            ..TradeTick::default()
2538        };
2539        let sell = TradeTick {
2540            aggressor_side: AggressorSide::Seller,
2541            ..buy
2542        };
2543
2544        aggregator.handle_trade(buy);
2545        aggregator.handle_trade(buy); // emit first bar at 2
2546        aggregator.handle_trade(sell);
2547        aggregator.handle_trade(sell); // emit second bar at 2 sell-side
2548
2549        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2550        assert!(handler_guard.len() >= 2);
2551        assert!(
2552            (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2553                < f64::EPSILON
2554        );
2555    }
2556
2557    #[rstest]
2558    fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2559        let instrument = InstrumentAny::Equity(equity_aapl);
2560        let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2561        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2562        let handler = Arc::new(Mutex::new(Vec::new()));
2563        let handler_clone = Arc::clone(&handler);
2564
2565        let mut aggregator = VolumeRunsBarAggregator::new(
2566            bar_type,
2567            instrument.price_precision(),
2568            instrument.size_precision(),
2569            move |bar: Bar| {
2570                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2571                handler_guard.push(bar);
2572            },
2573        );
2574
2575        let trade = TradeTick {
2576            instrument_id: instrument.id(),
2577            price: Price::from("1.0"),
2578            size: Quantity::from(5),
2579            ..TradeTick::default()
2580        };
2581
2582        aggregator.handle_trade(trade);
2583
2584        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2585        assert!(!handler_guard.is_empty());
2586        assert!(handler_guard[0].volume.as_f64() > 0.0);
2587        assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2588    }
2589
2590    #[rstest]
2591    fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2592        let instrument = InstrumentAny::Equity(equity_aapl);
2593        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2594        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2595        let handler = Arc::new(Mutex::new(Vec::new()));
2596        let handler_clone = Arc::clone(&handler);
2597
2598        let mut aggregator = VolumeImbalanceBarAggregator::new(
2599            bar_type,
2600            instrument.price_precision(),
2601            instrument.size_precision(),
2602            move |bar: Bar| {
2603                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2604                handler_guard.push(bar);
2605            },
2606        );
2607
2608        let trade_small = TradeTick {
2609            instrument_id: instrument.id(),
2610            price: Price::from("1.0"),
2611            size: Quantity::from(1),
2612            ..TradeTick::default()
2613        };
2614        let trade_large = TradeTick {
2615            size: Quantity::from(3),
2616            ..trade_small
2617        };
2618
2619        aggregator.handle_trade(trade_small);
2620        aggregator.handle_trade(trade_large);
2621
2622        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2623        assert_eq!(handler_guard.len(), 2);
2624        let total_output = handler_guard
2625            .iter()
2626            .map(|bar| bar.volume.as_f64())
2627            .sum::<f64>();
2628        let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2629        assert!((total_output - total_input).abs() < f64::EPSILON);
2630    }
2631
2632    #[rstest]
2633    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2634        let instrument = InstrumentAny::Equity(equity_aapl);
2635        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
2636        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2637        let handler = Arc::new(Mutex::new(Vec::new()));
2638        let handler_clone = Arc::clone(&handler);
2639
2640        let mut aggregator = ValueBarAggregator::new(
2641            bar_type,
2642            instrument.price_precision(),
2643            instrument.size_precision(),
2644            move |bar: Bar| {
2645                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2646                handler_guard.push(bar);
2647            },
2648        );
2649
2650        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
2651        aggregator.update(
2652            Price::from("100.00"),
2653            Quantity::from(5),
2654            UnixNanos::default(),
2655        );
2656        aggregator.update(
2657            Price::from("100.00"),
2658            Quantity::from(5),
2659            UnixNanos::from(1000),
2660        );
2661
2662        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2663        assert_eq!(handler_guard.len(), 1);
2664        let bar = handler_guard.first().unwrap();
2665        assert_eq!(bar.volume, Quantity::from(10));
2666    }
2667
2668    #[rstest]
2669    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2670        let instrument = InstrumentAny::Equity(equity_aapl);
2671        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2672        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2673        let handler = Arc::new(Mutex::new(Vec::new()));
2674        let handler_clone = Arc::clone(&handler);
2675
2676        let mut aggregator = ValueBarAggregator::new(
2677            bar_type,
2678            instrument.price_precision(),
2679            instrument.size_precision(),
2680            move |bar: Bar| {
2681                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2682                handler_guard.push(bar);
2683            },
2684        );
2685
2686        // Single large update: $100 * 25 = $2500 (should create 2 bars)
2687        aggregator.update(
2688            Price::from("100.00"),
2689            Quantity::from(25),
2690            UnixNanos::default(),
2691        );
2692
2693        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2694        assert_eq!(handler_guard.len(), 2);
2695        let remaining_value = aggregator.get_cumulative_value();
2696        assert!(remaining_value < 1000.0); // Should be less than threshold
2697    }
2698
2699    #[rstest]
2700    fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2701        let instrument = InstrumentAny::Equity(equity_aapl);
2702        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2703        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2704        let handler = Arc::new(Mutex::new(Vec::new()));
2705        let handler_clone = Arc::clone(&handler);
2706
2707        let mut aggregator = ValueBarAggregator::new(
2708            bar_type,
2709            instrument.price_precision(),
2710            instrument.size_precision(),
2711            move |bar: Bar| {
2712                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2713                handler_guard.push(bar);
2714            },
2715        );
2716
2717        // Update with zero price should not cause division by zero
2718        aggregator.update(
2719            Price::from("0.00"),
2720            Quantity::from(100),
2721            UnixNanos::default(),
2722        );
2723
2724        // No bars should be emitted since value is zero
2725        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2726        assert_eq!(handler_guard.len(), 0);
2727
2728        // Cumulative value should remain zero
2729        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2730    }
2731
2732    #[rstest]
2733    fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2734        let instrument = InstrumentAny::Equity(equity_aapl);
2735        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2736        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2737        let handler = Arc::new(Mutex::new(Vec::new()));
2738        let handler_clone = Arc::clone(&handler);
2739
2740        let mut aggregator = ValueBarAggregator::new(
2741            bar_type,
2742            instrument.price_precision(),
2743            instrument.size_precision(),
2744            move |bar: Bar| {
2745                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2746                handler_guard.push(bar);
2747            },
2748        );
2749
2750        // Update with zero size should not cause issues
2751        aggregator.update(
2752            Price::from("100.00"),
2753            Quantity::from(0),
2754            UnixNanos::default(),
2755        );
2756
2757        // No bars should be emitted
2758        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2759        assert_eq!(handler_guard.len(), 0);
2760
2761        // Cumulative value should remain zero
2762        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2763    }
2764
2765    #[rstest]
2766    fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2767        let instrument = InstrumentAny::Equity(equity_aapl);
2768        let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2769        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2770        let handler = Arc::new(Mutex::new(Vec::new()));
2771        let handler_clone = Arc::clone(&handler);
2772
2773        let mut aggregator = ValueImbalanceBarAggregator::new(
2774            bar_type,
2775            instrument.price_precision(),
2776            instrument.size_precision(),
2777            move |bar: Bar| {
2778                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2779                handler_guard.push(bar);
2780            },
2781        );
2782
2783        let buy = TradeTick {
2784            price: Price::from("5.0"),
2785            size: Quantity::from(2), // value 10, should emit one bar
2786            instrument_id: instrument.id(),
2787            ..TradeTick::default()
2788        };
2789        let sell = TradeTick {
2790            price: Price::from("5.0"),
2791            size: Quantity::from(2), // value 10, should emit another bar
2792            aggressor_side: AggressorSide::Seller,
2793            instrument_id: instrument.id(),
2794            ..buy
2795        };
2796
2797        aggregator.handle_trade(buy);
2798        aggregator.handle_trade(sell);
2799
2800        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2801        assert_eq!(handler_guard.len(), 2);
2802    }
2803
2804    #[rstest]
2805    fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2806        let instrument = InstrumentAny::Equity(equity_aapl);
2807        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2808        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2809        let handler = Arc::new(Mutex::new(Vec::new()));
2810        let handler_clone = Arc::clone(&handler);
2811
2812        let mut aggregator = ValueRunsBarAggregator::new(
2813            bar_type,
2814            instrument.price_precision(),
2815            instrument.size_precision(),
2816            move |bar: Bar| {
2817                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2818                handler_guard.push(bar);
2819            },
2820        );
2821
2822        let trade = TradeTick {
2823            price: Price::from("10.0"),
2824            size: Quantity::from(5),
2825            instrument_id: instrument.id(),
2826            ..TradeTick::default()
2827        };
2828
2829        aggregator.handle_trade(trade);
2830        aggregator.handle_trade(trade);
2831
2832        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2833        assert_eq!(handler_guard.len(), 1);
2834        let bar = handler_guard.first().unwrap();
2835        assert_eq!(bar.volume, Quantity::from(10));
2836    }
2837
2838    #[rstest]
2839    fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2840        let instrument = InstrumentAny::Equity(equity_aapl);
2841        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2842        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2843        let handler = Arc::new(Mutex::new(Vec::new()));
2844        let handler_clone = Arc::clone(&handler);
2845
2846        let mut aggregator = ValueRunsBarAggregator::new(
2847            bar_type,
2848            instrument.price_precision(),
2849            instrument.size_precision(),
2850            move |bar: Bar| {
2851                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2852                handler_guard.push(bar);
2853            },
2854        );
2855
2856        let buy = TradeTick {
2857            price: Price::from("10.0"),
2858            size: Quantity::from(5),
2859            instrument_id: instrument.id(),
2860            ..TradeTick::default()
2861        }; // value 50
2862        let sell = TradeTick {
2863            price: Price::from("10.0"),
2864            size: Quantity::from(10),
2865            aggressor_side: AggressorSide::Seller,
2866            ..buy
2867        }; // value 100
2868
2869        aggregator.handle_trade(buy);
2870        aggregator.handle_trade(sell);
2871
2872        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2873        assert_eq!(handler_guard.len(), 1);
2874        assert_eq!(handler_guard[0].volume, Quantity::from(10));
2875    }
2876
2877    #[rstest]
2878    fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2879        let instrument = InstrumentAny::Equity(equity_aapl);
2880        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2881        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2882        let handler = Arc::new(Mutex::new(Vec::new()));
2883        let handler_clone = Arc::clone(&handler);
2884
2885        let mut aggregator = TickRunsBarAggregator::new(
2886            bar_type,
2887            instrument.price_precision(),
2888            instrument.size_precision(),
2889            move |bar: Bar| {
2890                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2891                handler_guard.push(bar);
2892            },
2893        );
2894
2895        let buy = TradeTick::default();
2896
2897        aggregator.handle_trade(buy);
2898        aggregator.handle_trade(buy); // Emit bar 1 (run complete)
2899        aggregator.handle_trade(buy); // Start new run
2900        aggregator.handle_trade(buy); // Emit bar 2 (new run complete)
2901
2902        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2903        assert_eq!(handler_guard.len(), 2);
2904    }
2905
2906    #[rstest]
2907    fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2908        let instrument = InstrumentAny::Equity(equity_aapl);
2909        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2910        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2911        let handler = Arc::new(Mutex::new(Vec::new()));
2912        let handler_clone = Arc::clone(&handler);
2913
2914        let mut aggregator = TickRunsBarAggregator::new(
2915            bar_type,
2916            instrument.price_precision(),
2917            instrument.size_precision(),
2918            move |bar: Bar| {
2919                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2920                handler_guard.push(bar);
2921            },
2922        );
2923
2924        let buy = TradeTick::default();
2925        let no_aggressor = TradeTick {
2926            aggressor_side: AggressorSide::NoAggressor,
2927            ..buy
2928        };
2929
2930        aggregator.handle_trade(buy);
2931        aggregator.handle_trade(no_aggressor); // Should not affect run count
2932        aggregator.handle_trade(no_aggressor); // Should not affect run count
2933        aggregator.handle_trade(buy); // Continue run to threshold
2934
2935        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2936        assert_eq!(handler_guard.len(), 1);
2937    }
2938
2939    #[rstest]
2940    fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2941        let instrument = InstrumentAny::Equity(equity_aapl);
2942        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2943        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2944        let handler = Arc::new(Mutex::new(Vec::new()));
2945        let handler_clone = Arc::clone(&handler);
2946
2947        let mut aggregator = VolumeRunsBarAggregator::new(
2948            bar_type,
2949            instrument.price_precision(),
2950            instrument.size_precision(),
2951            move |bar: Bar| {
2952                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2953                handler_guard.push(bar);
2954            },
2955        );
2956
2957        let buy = TradeTick {
2958            instrument_id: instrument.id(),
2959            price: Price::from("1.0"),
2960            size: Quantity::from(1),
2961            ..TradeTick::default()
2962        };
2963
2964        aggregator.handle_trade(buy);
2965        aggregator.handle_trade(buy); // Emit bar 1 (2.0 volume reached)
2966        aggregator.handle_trade(buy); // Start new run
2967        aggregator.handle_trade(buy); // Emit bar 2 (new 2.0 volume reached)
2968
2969        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2970        assert_eq!(handler_guard.len(), 2);
2971        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2972        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2973    }
2974
2975    #[rstest]
2976    fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2977        let instrument = InstrumentAny::Equity(equity_aapl);
2978        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2979        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2980        let handler = Arc::new(Mutex::new(Vec::new()));
2981        let handler_clone = Arc::clone(&handler);
2982
2983        let mut aggregator = ValueRunsBarAggregator::new(
2984            bar_type,
2985            instrument.price_precision(),
2986            instrument.size_precision(),
2987            move |bar: Bar| {
2988                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2989                handler_guard.push(bar);
2990            },
2991        );
2992
2993        let buy = TradeTick {
2994            instrument_id: instrument.id(),
2995            price: Price::from("10.0"),
2996            size: Quantity::from(5),
2997            ..TradeTick::default()
2998        }; // value 50 per trade
2999
3000        aggregator.handle_trade(buy);
3001        aggregator.handle_trade(buy); // Emit bar 1 (100 value reached)
3002        aggregator.handle_trade(buy); // Start new run
3003        aggregator.handle_trade(buy); // Emit bar 2 (new 100 value reached)
3004
3005        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3006        assert_eq!(handler_guard.len(), 2);
3007        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3008        assert_eq!(handler_guard[1].volume, Quantity::from(10));
3009    }
3010
3011    #[rstest]
3012    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
3013        let instrument = InstrumentAny::Equity(equity_aapl);
3014        // One second bars
3015        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3016        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3017        let handler = Arc::new(Mutex::new(Vec::new()));
3018        let handler_clone = Arc::clone(&handler);
3019        let clock = Rc::new(RefCell::new(TestClock::new()));
3020
3021        let mut aggregator = TimeBarAggregator::new(
3022            bar_type,
3023            instrument.price_precision(),
3024            instrument.size_precision(),
3025            clock.clone(),
3026            move |bar: Bar| {
3027                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3028                handler_guard.push(bar);
3029            },
3030            true,  // build_with_no_updates
3031            false, // timestamp_on_close
3032            BarIntervalType::LeftOpen,
3033            None,  // time_bars_origin_offset
3034            15,    // bar_build_delay
3035            false, // skip_first_non_full_bar
3036        );
3037
3038        aggregator.update(
3039            Price::from("100.00"),
3040            Quantity::from(1),
3041            UnixNanos::default(),
3042        );
3043
3044        let next_sec = UnixNanos::from(1_000_000_000);
3045        clock.borrow_mut().set_time(next_sec);
3046
3047        let event = TimeEvent::new(
3048            Ustr::from("1-SECOND-LAST"),
3049            UUID4::new(),
3050            next_sec,
3051            next_sec,
3052        );
3053        aggregator.build_bar(event);
3054
3055        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3056        assert_eq!(handler_guard.len(), 1);
3057        let bar = handler_guard.first().unwrap();
3058        assert_eq!(bar.ts_event, UnixNanos::default());
3059        assert_eq!(bar.ts_init, next_sec);
3060    }
3061
3062    #[rstest]
3063    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3064        let instrument = InstrumentAny::Equity(equity_aapl);
3065        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3066        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3067        let handler = Arc::new(Mutex::new(Vec::new()));
3068        let handler_clone = Arc::clone(&handler);
3069        let clock = Rc::new(RefCell::new(TestClock::new()));
3070
3071        let mut aggregator = TimeBarAggregator::new(
3072            bar_type,
3073            instrument.price_precision(),
3074            instrument.size_precision(),
3075            clock.clone(),
3076            move |bar: Bar| {
3077                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3078                handler_guard.push(bar);
3079            },
3080            true, // build_with_no_updates
3081            true, // timestamp_on_close - changed to true to verify left-open behavior
3082            BarIntervalType::LeftOpen,
3083            None,
3084            15,
3085            false, // skip_first_non_full_bar
3086        );
3087
3088        // Update in first interval
3089        aggregator.update(
3090            Price::from("100.00"),
3091            Quantity::from(1),
3092            UnixNanos::default(),
3093        );
3094
3095        // First interval close
3096        let ts1 = UnixNanos::from(1_000_000_000);
3097        clock.borrow_mut().set_time(ts1);
3098        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3099        aggregator.build_bar(event);
3100
3101        // Update in second interval
3102        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3103
3104        // Second interval close
3105        let ts2 = UnixNanos::from(2_000_000_000);
3106        clock.borrow_mut().set_time(ts2);
3107        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3108        aggregator.build_bar(event);
3109
3110        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3111        assert_eq!(handler_guard.len(), 2);
3112
3113        let bar1 = &handler_guard[0];
3114        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
3115        assert_eq!(bar1.ts_init, ts1);
3116        assert_eq!(bar1.close, Price::from("100.00"));
3117        let bar2 = &handler_guard[1];
3118        assert_eq!(bar2.ts_event, ts2);
3119        assert_eq!(bar2.ts_init, ts2);
3120        assert_eq!(bar2.close, Price::from("101.00"));
3121    }
3122
3123    #[rstest]
3124    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3125        let instrument = InstrumentAny::Equity(equity_aapl);
3126        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3127        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3128        let handler = Arc::new(Mutex::new(Vec::new()));
3129        let handler_clone = Arc::clone(&handler);
3130        let clock = Rc::new(RefCell::new(TestClock::new()));
3131        let mut aggregator = TimeBarAggregator::new(
3132            bar_type,
3133            instrument.price_precision(),
3134            instrument.size_precision(),
3135            clock.clone(),
3136            move |bar: Bar| {
3137                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3138                handler_guard.push(bar);
3139            },
3140            true, // build_with_no_updates
3141            true, // timestamp_on_close
3142            BarIntervalType::RightOpen,
3143            None,
3144            15,
3145            false, // skip_first_non_full_bar
3146        );
3147
3148        // Update in first interval
3149        aggregator.update(
3150            Price::from("100.00"),
3151            Quantity::from(1),
3152            UnixNanos::default(),
3153        );
3154
3155        // First interval close
3156        let ts1 = UnixNanos::from(1_000_000_000);
3157        clock.borrow_mut().set_time(ts1);
3158        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3159        aggregator.build_bar(event);
3160
3161        // Update in second interval
3162        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3163
3164        // Second interval close
3165        let ts2 = UnixNanos::from(2_000_000_000);
3166        clock.borrow_mut().set_time(ts2);
3167        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3168        aggregator.build_bar(event);
3169
3170        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3171        assert_eq!(handler_guard.len(), 2);
3172
3173        let bar1 = &handler_guard[0];
3174        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
3175        assert_eq!(bar1.ts_init, ts1);
3176        assert_eq!(bar1.close, Price::from("100.00"));
3177
3178        let bar2 = &handler_guard[1];
3179        assert_eq!(bar2.ts_event, ts1);
3180        assert_eq!(bar2.ts_init, ts2);
3181        assert_eq!(bar2.close, Price::from("101.00"));
3182    }
3183
3184    #[rstest]
3185    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3186        let instrument = InstrumentAny::Equity(equity_aapl);
3187        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3188        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3189        let handler = Arc::new(Mutex::new(Vec::new()));
3190        let handler_clone = Arc::clone(&handler);
3191        let clock = Rc::new(RefCell::new(TestClock::new()));
3192
3193        // First test with build_with_no_updates = false
3194        let mut aggregator = TimeBarAggregator::new(
3195            bar_type,
3196            instrument.price_precision(),
3197            instrument.size_precision(),
3198            clock.clone(),
3199            move |bar: Bar| {
3200                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3201                handler_guard.push(bar);
3202            },
3203            false, // build_with_no_updates disabled
3204            true,  // timestamp_on_close
3205            BarIntervalType::LeftOpen,
3206            None,
3207            15,
3208            false, // skip_first_non_full_bar
3209        );
3210
3211        // No updates, just interval close
3212        let ts1 = UnixNanos::from(1_000_000_000);
3213        clock.borrow_mut().set_time(ts1);
3214        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3215        aggregator.build_bar(event);
3216
3217        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3218        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
3219        drop(handler_guard);
3220
3221        // Now test with build_with_no_updates = true
3222        let handler = Arc::new(Mutex::new(Vec::new()));
3223        let handler_clone = Arc::clone(&handler);
3224        let mut aggregator = TimeBarAggregator::new(
3225            bar_type,
3226            instrument.price_precision(),
3227            instrument.size_precision(),
3228            clock.clone(),
3229            move |bar: Bar| {
3230                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3231                handler_guard.push(bar);
3232            },
3233            true, // build_with_no_updates enabled
3234            true, // timestamp_on_close
3235            BarIntervalType::LeftOpen,
3236            None,
3237            15,
3238            false, // skip_first_non_full_bar
3239        );
3240
3241        aggregator.update(
3242            Price::from("100.00"),
3243            Quantity::from(1),
3244            UnixNanos::default(),
3245        );
3246
3247        // First interval with update
3248        let ts1 = UnixNanos::from(1_000_000_000);
3249        clock.borrow_mut().set_time(ts1);
3250        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3251        aggregator.build_bar(event);
3252
3253        // Second interval without updates
3254        let ts2 = UnixNanos::from(2_000_000_000);
3255        clock.borrow_mut().set_time(ts2);
3256        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3257        aggregator.build_bar(event);
3258
3259        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3260        assert_eq!(handler_guard.len(), 2); // Both bars should be built
3261        let bar1 = &handler_guard[0];
3262        assert_eq!(bar1.close, Price::from("100.00"));
3263        let bar2 = &handler_guard[1];
3264        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
3265    }
3266
3267    #[rstest]
3268    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3269        let instrument = InstrumentAny::Equity(equity_aapl);
3270        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3271        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3272        let clock = Rc::new(RefCell::new(TestClock::new()));
3273        let handler = Arc::new(Mutex::new(Vec::new()));
3274        let handler_clone = Arc::clone(&handler);
3275
3276        let mut aggregator = TimeBarAggregator::new(
3277            bar_type,
3278            instrument.price_precision(),
3279            instrument.size_precision(),
3280            clock.clone(),
3281            move |bar: Bar| {
3282                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3283                handler_guard.push(bar);
3284            },
3285            true, // build_with_no_updates
3286            true, // timestamp_on_close
3287            BarIntervalType::RightOpen,
3288            None,
3289            15,
3290            false, // skip_first_non_full_bar
3291        );
3292
3293        let ts1 = UnixNanos::from(1_000_000_000);
3294        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3295
3296        let ts2 = UnixNanos::from(2_000_000_000);
3297        clock.borrow_mut().set_time(ts2);
3298
3299        // Simulate timestamp on close
3300        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3301        aggregator.build_bar(event);
3302
3303        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3304        let bar = handler_guard.first().unwrap();
3305        assert_eq!(bar.ts_event, UnixNanos::default());
3306        assert_eq!(bar.ts_init, ts2);
3307    }
3308
3309    #[rstest]
3310    fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3311        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3312        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3313        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3314        let handler = Arc::new(Mutex::new(Vec::new()));
3315        let handler_clone = Arc::clone(&handler);
3316
3317        let aggregator = RenkoBarAggregator::new(
3318            bar_type,
3319            instrument.price_precision(),
3320            instrument.size_precision(),
3321            instrument.price_increment(),
3322            move |bar: Bar| {
3323                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3324                handler_guard.push(bar);
3325            },
3326        );
3327
3328        assert_eq!(aggregator.bar_type(), bar_type);
3329        assert!(!aggregator.is_running());
3330        // 10 pips * price_increment.raw (depends on precision mode)
3331        let expected_brick_size = 10 * instrument.price_increment().raw;
3332        assert_eq!(aggregator.brick_size, expected_brick_size);
3333    }
3334
3335    #[rstest]
3336    fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3337        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3338        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3339        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3340        let handler = Arc::new(Mutex::new(Vec::new()));
3341        let handler_clone = Arc::clone(&handler);
3342
3343        let mut aggregator = RenkoBarAggregator::new(
3344            bar_type,
3345            instrument.price_precision(),
3346            instrument.size_precision(),
3347            instrument.price_increment(),
3348            move |bar: Bar| {
3349                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3350                handler_guard.push(bar);
3351            },
3352        );
3353
3354        // Small price movement (5 pips, less than 10 pip brick size)
3355        aggregator.update(
3356            Price::from("1.00000"),
3357            Quantity::from(1),
3358            UnixNanos::default(),
3359        );
3360        aggregator.update(
3361            Price::from("1.00005"),
3362            Quantity::from(1),
3363            UnixNanos::from(1000),
3364        );
3365
3366        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3367        assert_eq!(handler_guard.len(), 0); // No bar created yet
3368    }
3369
3370    #[rstest]
3371    fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3372        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3373        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3374        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3375        let handler = Arc::new(Mutex::new(Vec::new()));
3376        let handler_clone = Arc::clone(&handler);
3377
3378        let mut aggregator = RenkoBarAggregator::new(
3379            bar_type,
3380            instrument.price_precision(),
3381            instrument.size_precision(),
3382            instrument.price_increment(),
3383            move |bar: Bar| {
3384                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3385                handler_guard.push(bar);
3386            },
3387        );
3388
3389        // Price movement exceeding brick size (15 pips)
3390        aggregator.update(
3391            Price::from("1.00000"),
3392            Quantity::from(1),
3393            UnixNanos::default(),
3394        );
3395        aggregator.update(
3396            Price::from("1.00015"),
3397            Quantity::from(1),
3398            UnixNanos::from(1000),
3399        );
3400
3401        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3402        assert_eq!(handler_guard.len(), 1);
3403
3404        let bar = handler_guard.first().unwrap();
3405        assert_eq!(bar.open, Price::from("1.00000"));
3406        assert_eq!(bar.high, Price::from("1.00010"));
3407        assert_eq!(bar.low, Price::from("1.00000"));
3408        assert_eq!(bar.close, Price::from("1.00010"));
3409        assert_eq!(bar.volume, Quantity::from(2));
3410        assert_eq!(bar.ts_event, UnixNanos::from(1000));
3411        assert_eq!(bar.ts_init, UnixNanos::from(1000));
3412    }
3413
3414    #[rstest]
3415    fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3416        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3417        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3418        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3419        let handler = Arc::new(Mutex::new(Vec::new()));
3420        let handler_clone = Arc::clone(&handler);
3421
3422        let mut aggregator = RenkoBarAggregator::new(
3423            bar_type,
3424            instrument.price_precision(),
3425            instrument.size_precision(),
3426            instrument.price_increment(),
3427            move |bar: Bar| {
3428                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3429                handler_guard.push(bar);
3430            },
3431        );
3432
3433        // Large price movement creating multiple bricks (25 pips = 2 bricks)
3434        aggregator.update(
3435            Price::from("1.00000"),
3436            Quantity::from(1),
3437            UnixNanos::default(),
3438        );
3439        aggregator.update(
3440            Price::from("1.00025"),
3441            Quantity::from(1),
3442            UnixNanos::from(1000),
3443        );
3444
3445        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3446        assert_eq!(handler_guard.len(), 2);
3447
3448        let bar1 = &handler_guard[0];
3449        assert_eq!(bar1.open, Price::from("1.00000"));
3450        assert_eq!(bar1.high, Price::from("1.00010"));
3451        assert_eq!(bar1.low, Price::from("1.00000"));
3452        assert_eq!(bar1.close, Price::from("1.00010"));
3453
3454        let bar2 = &handler_guard[1];
3455        assert_eq!(bar2.open, Price::from("1.00010"));
3456        assert_eq!(bar2.high, Price::from("1.00020"));
3457        assert_eq!(bar2.low, Price::from("1.00010"));
3458        assert_eq!(bar2.close, Price::from("1.00020"));
3459    }
3460
3461    #[rstest]
3462    fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3463        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3464        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3465        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3466        let handler = Arc::new(Mutex::new(Vec::new()));
3467        let handler_clone = Arc::clone(&handler);
3468
3469        let mut aggregator = RenkoBarAggregator::new(
3470            bar_type,
3471            instrument.price_precision(),
3472            instrument.size_precision(),
3473            instrument.price_increment(),
3474            move |bar: Bar| {
3475                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3476                handler_guard.push(bar);
3477            },
3478        );
3479
3480        // Start at higher price and move down
3481        aggregator.update(
3482            Price::from("1.00020"),
3483            Quantity::from(1),
3484            UnixNanos::default(),
3485        );
3486        aggregator.update(
3487            Price::from("1.00005"),
3488            Quantity::from(1),
3489            UnixNanos::from(1000),
3490        );
3491
3492        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3493        assert_eq!(handler_guard.len(), 1);
3494
3495        let bar = handler_guard.first().unwrap();
3496        assert_eq!(bar.open, Price::from("1.00020"));
3497        assert_eq!(bar.high, Price::from("1.00020"));
3498        assert_eq!(bar.low, Price::from("1.00010"));
3499        assert_eq!(bar.close, Price::from("1.00010"));
3500        assert_eq!(bar.volume, Quantity::from(2));
3501    }
3502
3503    #[rstest]
3504    fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3505        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3506        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3507        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3508        let handler = Arc::new(Mutex::new(Vec::new()));
3509        let handler_clone = Arc::clone(&handler);
3510
3511        let mut aggregator = RenkoBarAggregator::new(
3512            bar_type,
3513            instrument.price_precision(),
3514            instrument.size_precision(),
3515            instrument.price_increment(),
3516            move |bar: Bar| {
3517                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3518                handler_guard.push(bar);
3519            },
3520        );
3521
3522        // Create a bar with small price movement (5 pips)
3523        let input_bar = Bar::new(
3524            BarType::new(
3525                instrument.id(),
3526                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3527                AggregationSource::Internal,
3528            ),
3529            Price::from("1.00000"),
3530            Price::from("1.00005"),
3531            Price::from("0.99995"),
3532            Price::from("1.00005"), // 5 pip move up (less than 10 pip brick)
3533            Quantity::from(100),
3534            UnixNanos::default(),
3535            UnixNanos::from(1000),
3536        );
3537
3538        aggregator.handle_bar(input_bar);
3539
3540        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3541        assert_eq!(handler_guard.len(), 0); // No bar created yet
3542    }
3543
3544    #[rstest]
3545    fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3546        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3547        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3548        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3549        let handler = Arc::new(Mutex::new(Vec::new()));
3550        let handler_clone = Arc::clone(&handler);
3551
3552        let mut aggregator = RenkoBarAggregator::new(
3553            bar_type,
3554            instrument.price_precision(),
3555            instrument.size_precision(),
3556            instrument.price_increment(),
3557            move |bar: Bar| {
3558                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3559                handler_guard.push(bar);
3560            },
3561        );
3562
3563        // First bar to establish baseline
3564        let bar1 = Bar::new(
3565            BarType::new(
3566                instrument.id(),
3567                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3568                AggregationSource::Internal,
3569            ),
3570            Price::from("1.00000"),
3571            Price::from("1.00005"),
3572            Price::from("0.99995"),
3573            Price::from("1.00000"),
3574            Quantity::from(100),
3575            UnixNanos::default(),
3576            UnixNanos::default(),
3577        );
3578
3579        // Second bar with price movement exceeding brick size (10 pips)
3580        let bar2 = Bar::new(
3581            BarType::new(
3582                instrument.id(),
3583                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3584                AggregationSource::Internal,
3585            ),
3586            Price::from("1.00000"),
3587            Price::from("1.00015"),
3588            Price::from("0.99995"),
3589            Price::from("1.00010"), // 10 pip move up (exactly 1 brick)
3590            Quantity::from(50),
3591            UnixNanos::from(60_000_000_000),
3592            UnixNanos::from(60_000_000_000),
3593        );
3594
3595        aggregator.handle_bar(bar1);
3596        aggregator.handle_bar(bar2);
3597
3598        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3599        assert_eq!(handler_guard.len(), 1);
3600
3601        let bar = handler_guard.first().unwrap();
3602        assert_eq!(bar.open, Price::from("1.00000"));
3603        assert_eq!(bar.high, Price::from("1.00010"));
3604        assert_eq!(bar.low, Price::from("1.00000"));
3605        assert_eq!(bar.close, Price::from("1.00010"));
3606        assert_eq!(bar.volume, Quantity::from(150));
3607    }
3608
3609    #[rstest]
3610    fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3611        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3612        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3613        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3614        let handler = Arc::new(Mutex::new(Vec::new()));
3615        let handler_clone = Arc::clone(&handler);
3616
3617        let mut aggregator = RenkoBarAggregator::new(
3618            bar_type,
3619            instrument.price_precision(),
3620            instrument.size_precision(),
3621            instrument.price_increment(),
3622            move |bar: Bar| {
3623                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3624                handler_guard.push(bar);
3625            },
3626        );
3627
3628        // First bar to establish baseline
3629        let bar1 = Bar::new(
3630            BarType::new(
3631                instrument.id(),
3632                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3633                AggregationSource::Internal,
3634            ),
3635            Price::from("1.00000"),
3636            Price::from("1.00005"),
3637            Price::from("0.99995"),
3638            Price::from("1.00000"),
3639            Quantity::from(100),
3640            UnixNanos::default(),
3641            UnixNanos::default(),
3642        );
3643
3644        // Second bar with large price movement (30 pips = 3 bricks)
3645        let bar2 = Bar::new(
3646            BarType::new(
3647                instrument.id(),
3648                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3649                AggregationSource::Internal,
3650            ),
3651            Price::from("1.00000"),
3652            Price::from("1.00035"),
3653            Price::from("0.99995"),
3654            Price::from("1.00030"), // 30 pip move up (exactly 3 bricks)
3655            Quantity::from(50),
3656            UnixNanos::from(60_000_000_000),
3657            UnixNanos::from(60_000_000_000),
3658        );
3659
3660        aggregator.handle_bar(bar1);
3661        aggregator.handle_bar(bar2);
3662
3663        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3664        assert_eq!(handler_guard.len(), 3);
3665
3666        let bar1 = &handler_guard[0];
3667        assert_eq!(bar1.open, Price::from("1.00000"));
3668        assert_eq!(bar1.close, Price::from("1.00010"));
3669
3670        let bar2 = &handler_guard[1];
3671        assert_eq!(bar2.open, Price::from("1.00010"));
3672        assert_eq!(bar2.close, Price::from("1.00020"));
3673
3674        let bar3 = &handler_guard[2];
3675        assert_eq!(bar3.open, Price::from("1.00020"));
3676        assert_eq!(bar3.close, Price::from("1.00030"));
3677    }
3678
3679    #[rstest]
3680    fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3681        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3682        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3683        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3684        let handler = Arc::new(Mutex::new(Vec::new()));
3685        let handler_clone = Arc::clone(&handler);
3686
3687        let mut aggregator = RenkoBarAggregator::new(
3688            bar_type,
3689            instrument.price_precision(),
3690            instrument.size_precision(),
3691            instrument.price_increment(),
3692            move |bar: Bar| {
3693                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3694                handler_guard.push(bar);
3695            },
3696        );
3697
3698        // First bar to establish baseline
3699        let bar1 = Bar::new(
3700            BarType::new(
3701                instrument.id(),
3702                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3703                AggregationSource::Internal,
3704            ),
3705            Price::from("1.00020"),
3706            Price::from("1.00025"),
3707            Price::from("1.00015"),
3708            Price::from("1.00020"),
3709            Quantity::from(100),
3710            UnixNanos::default(),
3711            UnixNanos::default(),
3712        );
3713
3714        // Second bar with downward price movement (10 pips down)
3715        let bar2 = Bar::new(
3716            BarType::new(
3717                instrument.id(),
3718                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3719                AggregationSource::Internal,
3720            ),
3721            Price::from("1.00020"),
3722            Price::from("1.00025"),
3723            Price::from("1.00005"),
3724            Price::from("1.00010"), // 10 pip move down (exactly 1 brick)
3725            Quantity::from(50),
3726            UnixNanos::from(60_000_000_000),
3727            UnixNanos::from(60_000_000_000),
3728        );
3729
3730        aggregator.handle_bar(bar1);
3731        aggregator.handle_bar(bar2);
3732
3733        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3734        assert_eq!(handler_guard.len(), 1);
3735
3736        let bar = handler_guard.first().unwrap();
3737        assert_eq!(bar.open, Price::from("1.00020"));
3738        assert_eq!(bar.high, Price::from("1.00020"));
3739        assert_eq!(bar.low, Price::from("1.00010"));
3740        assert_eq!(bar.close, Price::from("1.00010"));
3741        assert_eq!(bar.volume, Quantity::from(150));
3742    }
3743
3744    #[rstest]
3745    fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3746        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3747
3748        // Test different brick sizes
3749        let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); // 5 pip brick size
3750        let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3751        let handler = Arc::new(Mutex::new(Vec::new()));
3752        let handler_clone = Arc::clone(&handler);
3753
3754        let aggregator_5 = RenkoBarAggregator::new(
3755            bar_type_5,
3756            instrument.price_precision(),
3757            instrument.size_precision(),
3758            instrument.price_increment(),
3759            move |_bar: Bar| {
3760                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3761                handler_guard.push(_bar);
3762            },
3763        );
3764
3765        // 5 pips * price_increment.raw (depends on precision mode)
3766        let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3767        assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3768
3769        let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); // 20 pip brick size
3770        let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3771        let handler2 = Arc::new(Mutex::new(Vec::new()));
3772        let handler2_clone = Arc::clone(&handler2);
3773
3774        let aggregator_20 = RenkoBarAggregator::new(
3775            bar_type_20,
3776            instrument.price_precision(),
3777            instrument.size_precision(),
3778            instrument.price_increment(),
3779            move |_bar: Bar| {
3780                let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3781                handler_guard.push(_bar);
3782            },
3783        );
3784
3785        // 20 pips * price_increment.raw (depends on precision mode)
3786        let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3787        assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3788    }
3789
3790    #[rstest]
3791    fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3792        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3793        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3794        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3795        let handler = Arc::new(Mutex::new(Vec::new()));
3796        let handler_clone = Arc::clone(&handler);
3797
3798        let mut aggregator = RenkoBarAggregator::new(
3799            bar_type,
3800            instrument.price_precision(),
3801            instrument.size_precision(),
3802            instrument.price_increment(),
3803            move |bar: Bar| {
3804                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3805                handler_guard.push(bar);
3806            },
3807        );
3808
3809        // Sequential updates creating multiple bars
3810        aggregator.update(
3811            Price::from("1.00000"),
3812            Quantity::from(1),
3813            UnixNanos::from(1000),
3814        );
3815        aggregator.update(
3816            Price::from("1.00010"),
3817            Quantity::from(1),
3818            UnixNanos::from(2000),
3819        ); // First brick
3820        aggregator.update(
3821            Price::from("1.00020"),
3822            Quantity::from(1),
3823            UnixNanos::from(3000),
3824        ); // Second brick
3825        aggregator.update(
3826            Price::from("1.00025"),
3827            Quantity::from(1),
3828            UnixNanos::from(4000),
3829        ); // Partial third brick
3830        aggregator.update(
3831            Price::from("1.00030"),
3832            Quantity::from(1),
3833            UnixNanos::from(5000),
3834        ); // Complete third brick
3835
3836        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3837        assert_eq!(handler_guard.len(), 3);
3838
3839        let bar1 = &handler_guard[0];
3840        assert_eq!(bar1.open, Price::from("1.00000"));
3841        assert_eq!(bar1.close, Price::from("1.00010"));
3842
3843        let bar2 = &handler_guard[1];
3844        assert_eq!(bar2.open, Price::from("1.00010"));
3845        assert_eq!(bar2.close, Price::from("1.00020"));
3846
3847        let bar3 = &handler_guard[2];
3848        assert_eq!(bar3.open, Price::from("1.00020"));
3849        assert_eq!(bar3.close, Price::from("1.00030"));
3850    }
3851
3852    #[rstest]
3853    fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3854        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3855        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3856        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3857        let handler = Arc::new(Mutex::new(Vec::new()));
3858        let handler_clone = Arc::clone(&handler);
3859
3860        let mut aggregator = RenkoBarAggregator::new(
3861            bar_type,
3862            instrument.price_precision(),
3863            instrument.size_precision(),
3864            instrument.price_increment(),
3865            move |bar: Bar| {
3866                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3867                handler_guard.push(bar);
3868            },
3869        );
3870
3871        // Mixed direction movement: up then down
3872        aggregator.update(
3873            Price::from("1.00000"),
3874            Quantity::from(1),
3875            UnixNanos::from(1000),
3876        );
3877        aggregator.update(
3878            Price::from("1.00010"),
3879            Quantity::from(1),
3880            UnixNanos::from(2000),
3881        ); // Up brick
3882        aggregator.update(
3883            Price::from("0.99990"),
3884            Quantity::from(1),
3885            UnixNanos::from(3000),
3886        ); // Down 2 bricks (20 pips)
3887
3888        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3889        assert_eq!(handler_guard.len(), 3);
3890
3891        let bar1 = &handler_guard[0]; // Up brick
3892        assert_eq!(bar1.open, Price::from("1.00000"));
3893        assert_eq!(bar1.high, Price::from("1.00010"));
3894        assert_eq!(bar1.low, Price::from("1.00000"));
3895        assert_eq!(bar1.close, Price::from("1.00010"));
3896
3897        let bar2 = &handler_guard[1]; // First down brick
3898        assert_eq!(bar2.open, Price::from("1.00010"));
3899        assert_eq!(bar2.high, Price::from("1.00010"));
3900        assert_eq!(bar2.low, Price::from("1.00000"));
3901        assert_eq!(bar2.close, Price::from("1.00000"));
3902
3903        let bar3 = &handler_guard[2]; // Second down brick
3904        assert_eq!(bar3.open, Price::from("1.00000"));
3905        assert_eq!(bar3.high, Price::from("1.00000"));
3906        assert_eq!(bar3.low, Price::from("0.99990"));
3907        assert_eq!(bar3.close, Price::from("0.99990"));
3908    }
3909
3910    #[rstest]
3911    fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3912        let instrument = InstrumentAny::Equity(equity_aapl);
3913        let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3914        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3915        let handler = Arc::new(Mutex::new(Vec::new()));
3916        let handler_clone = Arc::clone(&handler);
3917
3918        let mut aggregator = TickImbalanceBarAggregator::new(
3919            bar_type,
3920            instrument.price_precision(),
3921            instrument.size_precision(),
3922            move |bar: Bar| {
3923                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3924                handler_guard.push(bar);
3925            },
3926        );
3927
3928        let buy = TradeTick {
3929            aggressor_side: AggressorSide::Buyer,
3930            ..TradeTick::default()
3931        };
3932        let sell = TradeTick {
3933            aggressor_side: AggressorSide::Seller,
3934            ..TradeTick::default()
3935        };
3936
3937        aggregator.handle_trade(buy);
3938        aggregator.handle_trade(sell);
3939        aggregator.handle_trade(buy);
3940
3941        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3942        assert_eq!(handler_guard.len(), 0);
3943    }
3944
3945    #[rstest]
3946    fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3947        let instrument = InstrumentAny::Equity(equity_aapl);
3948        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3949        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3950        let handler = Arc::new(Mutex::new(Vec::new()));
3951        let handler_clone = Arc::clone(&handler);
3952
3953        let mut aggregator = TickImbalanceBarAggregator::new(
3954            bar_type,
3955            instrument.price_precision(),
3956            instrument.size_precision(),
3957            move |bar: Bar| {
3958                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3959                handler_guard.push(bar);
3960            },
3961        );
3962
3963        let buy = TradeTick {
3964            aggressor_side: AggressorSide::Buyer,
3965            ..TradeTick::default()
3966        };
3967        let no_aggressor = TradeTick {
3968            aggressor_side: AggressorSide::NoAggressor,
3969            ..TradeTick::default()
3970        };
3971
3972        aggregator.handle_trade(buy);
3973        aggregator.handle_trade(no_aggressor);
3974        aggregator.handle_trade(buy);
3975
3976        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3977        assert_eq!(handler_guard.len(), 1);
3978    }
3979
3980    #[rstest]
3981    fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3982        let instrument = InstrumentAny::Equity(equity_aapl);
3983        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3984        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3985        let handler = Arc::new(Mutex::new(Vec::new()));
3986        let handler_clone = Arc::clone(&handler);
3987
3988        let mut aggregator = TickRunsBarAggregator::new(
3989            bar_type,
3990            instrument.price_precision(),
3991            instrument.size_precision(),
3992            move |bar: Bar| {
3993                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3994                handler_guard.push(bar);
3995            },
3996        );
3997
3998        let buy = TradeTick {
3999            aggressor_side: AggressorSide::Buyer,
4000            ..TradeTick::default()
4001        };
4002        let sell = TradeTick {
4003            aggressor_side: AggressorSide::Seller,
4004            ..TradeTick::default()
4005        };
4006
4007        aggregator.handle_trade(buy);
4008        aggregator.handle_trade(buy);
4009        aggregator.handle_trade(sell);
4010        aggregator.handle_trade(sell);
4011
4012        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4013        assert_eq!(handler_guard.len(), 2);
4014    }
4015
4016    #[rstest]
4017    fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4018        let instrument = InstrumentAny::Equity(equity_aapl);
4019        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4020        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4021        let handler = Arc::new(Mutex::new(Vec::new()));
4022        let handler_clone = Arc::clone(&handler);
4023
4024        let mut aggregator = VolumeImbalanceBarAggregator::new(
4025            bar_type,
4026            instrument.price_precision(),
4027            instrument.size_precision(),
4028            move |bar: Bar| {
4029                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4030                handler_guard.push(bar);
4031            },
4032        );
4033
4034        let large_trade = TradeTick {
4035            size: Quantity::from(25),
4036            aggressor_side: AggressorSide::Buyer,
4037            ..TradeTick::default()
4038        };
4039
4040        aggregator.handle_trade(large_trade);
4041
4042        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4043        assert_eq!(handler_guard.len(), 2);
4044    }
4045
4046    #[rstest]
4047    fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4048        equity_aapl: Equity,
4049    ) {
4050        let instrument = InstrumentAny::Equity(equity_aapl);
4051        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4052        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4053        let handler = Arc::new(Mutex::new(Vec::new()));
4054        let handler_clone = Arc::clone(&handler);
4055
4056        let mut aggregator = VolumeImbalanceBarAggregator::new(
4057            bar_type,
4058            instrument.price_precision(),
4059            instrument.size_precision(),
4060            move |bar: Bar| {
4061                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4062                handler_guard.push(bar);
4063            },
4064        );
4065
4066        let buy = TradeTick {
4067            size: Quantity::from(5),
4068            aggressor_side: AggressorSide::Buyer,
4069            ..TradeTick::default()
4070        };
4071        let no_aggressor = TradeTick {
4072            size: Quantity::from(3),
4073            aggressor_side: AggressorSide::NoAggressor,
4074            ..TradeTick::default()
4075        };
4076
4077        aggregator.handle_trade(buy);
4078        aggregator.handle_trade(no_aggressor);
4079        aggregator.handle_trade(buy);
4080
4081        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4082        assert_eq!(handler_guard.len(), 1);
4083    }
4084
4085    #[rstest]
4086    fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4087        let instrument = InstrumentAny::Equity(equity_aapl);
4088        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4089        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4090        let handler = Arc::new(Mutex::new(Vec::new()));
4091        let handler_clone = Arc::clone(&handler);
4092
4093        let mut aggregator = VolumeRunsBarAggregator::new(
4094            bar_type,
4095            instrument.price_precision(),
4096            instrument.size_precision(),
4097            move |bar: Bar| {
4098                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4099                handler_guard.push(bar);
4100            },
4101        );
4102
4103        let large_trade = TradeTick {
4104            size: Quantity::from(25),
4105            aggressor_side: AggressorSide::Buyer,
4106            ..TradeTick::default()
4107        };
4108
4109        aggregator.handle_trade(large_trade);
4110
4111        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4112        assert_eq!(handler_guard.len(), 2);
4113    }
4114
4115    #[rstest]
4116    fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4117        let instrument = InstrumentAny::Equity(equity_aapl);
4118        let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4119        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4120        let handler = Arc::new(Mutex::new(Vec::new()));
4121        let handler_clone = Arc::clone(&handler);
4122
4123        let mut aggregator = ValueRunsBarAggregator::new(
4124            bar_type,
4125            instrument.price_precision(),
4126            instrument.size_precision(),
4127            move |bar: Bar| {
4128                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4129                handler_guard.push(bar);
4130            },
4131        );
4132
4133        let large_trade = TradeTick {
4134            price: Price::from("5.00"),
4135            size: Quantity::from(25),
4136            aggressor_side: AggressorSide::Buyer,
4137            ..TradeTick::default()
4138        };
4139
4140        aggregator.handle_trade(large_trade);
4141
4142        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4143        assert_eq!(handler_guard.len(), 2);
4144    }
4145
4146    #[rstest]
4147    fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4148        let instrument = InstrumentAny::Equity(equity_aapl);
4149        let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
4150        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4151        let handler = Arc::new(Mutex::new(Vec::new()));
4152        let handler_clone = Arc::clone(&handler);
4153
4154        let mut aggregator = ValueBarAggregator::new(
4155            bar_type,
4156            instrument.price_precision(),
4157            instrument.size_precision(),
4158            move |bar: Bar| {
4159                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4160                handler_guard.push(bar);
4161            },
4162        );
4163
4164        // price=1000, size=3, value=3000, step=100 → size_chunk=0.1 rounds to 0 at precision 0
4165        aggregator.update(
4166            Price::from("1000.00"),
4167            Quantity::from(3),
4168            UnixNanos::default(),
4169        );
4170
4171        // 3 bars (one per min-size unit), not 30 zero-volume bars
4172        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4173        assert_eq!(handler_guard.len(), 3);
4174        for bar in handler_guard.iter() {
4175            assert_eq!(bar.volume, Quantity::from(1));
4176        }
4177    }
4178
4179    #[rstest]
4180    fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4181        let instrument = InstrumentAny::Equity(equity_aapl);
4182        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4183        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4184        let handler = Arc::new(Mutex::new(Vec::new()));
4185        let handler_clone = Arc::clone(&handler);
4186
4187        let mut aggregator = ValueImbalanceBarAggregator::new(
4188            bar_type,
4189            instrument.price_precision(),
4190            instrument.size_precision(),
4191            move |bar: Bar| {
4192                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4193                handler_guard.push(bar);
4194            },
4195        );
4196
4197        let trade = TradeTick {
4198            price: Price::from("1000.00"),
4199            size: Quantity::from(3),
4200            aggressor_side: AggressorSide::Buyer,
4201            instrument_id: instrument.id(),
4202            ..TradeTick::default()
4203        };
4204
4205        aggregator.handle_trade(trade);
4206
4207        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4208        assert_eq!(handler_guard.len(), 3);
4209        for bar in handler_guard.iter() {
4210            assert_eq!(bar.volume, Quantity::from(1));
4211        }
4212    }
4213
4214    #[rstest]
4215    fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
4216        let instrument = InstrumentAny::Equity(equity_aapl);
4217        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
4218        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4219        let handler = Arc::new(Mutex::new(Vec::new()));
4220        let handler_clone = Arc::clone(&handler);
4221
4222        let mut aggregator = ValueImbalanceBarAggregator::new(
4223            bar_type,
4224            instrument.price_precision(),
4225            instrument.size_precision(),
4226            move |bar: Bar| {
4227                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4228                handler_guard.push(bar);
4229            },
4230        );
4231
4232        // Build seller imbalance of -50 (below step=100, no bar yet)
4233        let sell_tick = TradeTick {
4234            price: Price::from("10.00"),
4235            size: Quantity::from(5),
4236            aggressor_side: AggressorSide::Seller,
4237            instrument_id: instrument.id(),
4238            ..TradeTick::default()
4239        };
4240
4241        // Opposite-side buyer: flatten amount 50/1000=0.05 < min_size (1),
4242        // clamp overshoots imbalance from -50 to +950, crossing threshold
4243        let buy_tick = TradeTick {
4244            price: Price::from("1000.00"),
4245            size: Quantity::from(1),
4246            aggressor_side: AggressorSide::Buyer,
4247            instrument_id: instrument.id(),
4248            ts_init: UnixNanos::from(1),
4249            ts_event: UnixNanos::from(1),
4250            ..TradeTick::default()
4251        };
4252
4253        aggregator.handle_trade(sell_tick);
4254        aggregator.handle_trade(buy_tick);
4255
4256        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4257        assert_eq!(handler_guard.len(), 1);
4258        assert_eq!(handler_guard[0].volume, Quantity::from(6));
4259    }
4260
4261    #[rstest]
4262    fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
4263        let instrument = InstrumentAny::Equity(equity_aapl);
4264        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4265        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4266        let handler = Arc::new(Mutex::new(Vec::new()));
4267        let handler_clone = Arc::clone(&handler);
4268
4269        let mut aggregator = ValueRunsBarAggregator::new(
4270            bar_type,
4271            instrument.price_precision(),
4272            instrument.size_precision(),
4273            move |bar: Bar| {
4274                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4275                handler_guard.push(bar);
4276            },
4277        );
4278
4279        let trade = TradeTick {
4280            price: Price::from("1000.00"),
4281            size: Quantity::from(3),
4282            aggressor_side: AggressorSide::Buyer,
4283            instrument_id: instrument.id(),
4284            ..TradeTick::default()
4285        };
4286
4287        aggregator.handle_trade(trade);
4288
4289        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4290        assert_eq!(handler_guard.len(), 3);
4291        for bar in handler_guard.iter() {
4292            assert_eq!(bar.volume, Quantity::from(1));
4293        }
4294    }
4295}