Skip to main content

nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{
22    any::Any,
23    cell::RefCell,
24    fmt::Debug,
25    ops::Add,
26    rc::{Rc, Weak},
27};
28
29use ahash::AHashMap;
30use chrono::{Duration, TimeDelta};
31use nautilus_common::{
32    clock::{Clock, TestClock},
33    timer::{TimeEvent, TimeEventCallback},
34};
35use nautilus_core::{
36    UnixNanos,
37    correctness::{self, FAILED},
38    datetime::{
39        add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos, subtract_n_months_nanos,
40        subtract_n_years_nanos,
41    },
42};
43use nautilus_model::{
44    data::{
45        QuoteTick, TradeTick,
46        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
47    },
48    enums::{
49        AggregationSource, AggressorSide, BarAggregation, BarIntervalType,
50        ContinuousFutureAdjustmentType,
51    },
52    identifiers::InstrumentId,
53    instruments::{FixedTickScheme, TickSchemeRule},
54    types::{
55        Price, Quantity,
56        fixed::{FIXED_PRECISION, FIXED_SCALAR, mantissa_exponent_to_fixed_i128},
57        price::PriceRaw,
58        quantity::QuantityRaw,
59    },
60};
61use rust_decimal::{Decimal, prelude::ToPrimitive};
62
63/// Type alias for bar handler to reduce type complexity.
64type BarHandler = Box<dyn FnMut(Bar)>;
65
66/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
67///
68/// Implementors receive updates and produce completed bars via handlers.
69pub trait BarAggregator: Any + Debug {
70    /// The [`BarType`] to be aggregated.
71    fn bar_type(&self) -> BarType;
72    /// If the aggregator is running and will receive data from the message bus.
73    fn is_running(&self) -> bool;
74    /// Sets the running state of the aggregator (receiving updates when `true`).
75    fn set_is_running(&mut self, value: bool);
76    /// Updates the aggregator  with the given price and size.
77    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
78    /// Updates the aggregator with the given quote.
79    fn handle_quote(&mut self, quote: QuoteTick) {
80        let spec = self.bar_type().spec();
81        self.update(
82            quote.extract_price(spec.price_type),
83            quote.extract_size(spec.price_type),
84            quote.ts_init,
85        );
86    }
87    /// Updates the aggregator with the given trade.
88    fn handle_trade(&mut self, trade: TradeTick) {
89        self.update(trade.price, trade.size, trade.ts_init);
90    }
91    /// Updates the aggregator with the given bar.
92    fn handle_bar(&mut self, bar: Bar) {
93        self.update_bar(bar, bar.volume, bar.ts_init);
94    }
95    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
96    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
97    fn stop(&mut self) {}
98    /// Sets historical mode (default implementation does nothing, `TimeBarAggregator` overrides)
99    fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
100    /// Sets historical events (default implementation does nothing, `TimeBarAggregator` overrides)
101    fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
102    /// Sets clock for time bar aggregators (default implementation does nothing, `TimeBarAggregator` overrides)
103    fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
104    /// Builds a bar from a time event (default implementation does nothing, `TimeBarAggregator` overrides)
105    fn build_bar(&mut self, _event: &TimeEvent) {}
106    /// Starts the timer for time bar aggregators.
107    /// Default implementation does nothing, `TimeBarAggregator` overrides.
108    /// Takes an optional Rc to create weak reference internally.
109    fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
110    /// Sets the weak reference to the aggregator wrapper (for historical mode).
111    /// Default implementation does nothing, `TimeBarAggregator` overrides.
112    fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
113}
114
115impl dyn BarAggregator {
116    /// Returns a reference to this aggregator as `Any` for downcasting.
117    pub fn as_any(&self) -> &dyn Any {
118        self
119    }
120    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
121    pub fn as_any_mut(&mut self) -> &mut dyn Any {
122        self
123    }
124}
125
126/// Provides a generic bar builder for aggregation.
127#[derive(Debug)]
128pub struct BarBuilder {
129    bar_type: BarType,
130    price_precision: u8,
131    size_precision: u8,
132    initialized: bool,
133    ts_last: UnixNanos,
134    count: usize,
135    last_close: Option<Price>,
136    open: Option<Price>,
137    high: Option<Price>,
138    low: Option<Price>,
139    close: Option<Price>,
140    volume: Quantity,
141    adjustment_mode: ContinuousFutureAdjustmentType,
142    adjustment_raw: PriceRaw,
143    adjustment_ratio: f64,
144    adjustment_active: bool,
145    adjustment_is_ratio: bool,
146}
147
148impl BarBuilder {
149    /// Creates a new [`BarBuilder`] instance.
150    ///
151    /// # Panics
152    ///
153    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
154    #[must_use]
155    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
156        correctness::check_equal(
157            &bar_type.aggregation_source(),
158            &AggregationSource::Internal,
159            "bar_type.aggregation_source",
160            "AggregationSource::Internal",
161        )
162        .expect(FAILED);
163
164        Self {
165            bar_type,
166            price_precision,
167            size_precision,
168            initialized: false,
169            ts_last: UnixNanos::default(),
170            count: 0,
171            last_close: None,
172            open: None,
173            high: None,
174            low: None,
175            close: None,
176            volume: Quantity::zero(size_precision),
177            adjustment_mode: ContinuousFutureAdjustmentType::default(),
178            adjustment_raw: 0,
179            adjustment_ratio: 1.0,
180            adjustment_active: false,
181            adjustment_is_ratio: false,
182        }
183    }
184
185    /// Configures the per-tick continuous-future price adjustment.
186    ///
187    /// Adjustment applies on ingress in [`Self::update`] and [`Self::update_bar`], so the running
188    /// OHLC state is always in the adjusted (common) frame. The adjustment configuration is
189    /// retained across [`Self::reset`] so it spans subsequent bars within the same continuous-
190    /// future segment.
191    ///
192    /// # Panics
193    ///
194    /// Panics if scaling the spread `adjustment` to the fixed-point representation overflows.
195    pub fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
196        self.adjustment_mode = mode;
197
198        if mode.is_ratio() {
199            self.adjustment_is_ratio = true;
200            self.adjustment_ratio = adjustment.to_f64().unwrap_or(1.0);
201            self.adjustment_active = adjustment != Decimal::ONE;
202            return;
203        }
204
205        // Spread mode: scale the Decimal offset to FIXED_PRECISION once so the hot path
206        // can add it straight onto `price.raw`. Signed PriceRaw supports negatives, so
207        // backward-spread offsets that push prices below zero remain representable.
208        self.adjustment_is_ratio = false;
209        let exponent = -(adjustment.scale() as i8);
210        let raw_i128 =
211            mantissa_exponent_to_fixed_i128(adjustment.mantissa(), exponent, FIXED_PRECISION)
212                .expect("Failed to scale continuous-future adjustment to fixed precision");
213
214        #[allow(
215            clippy::useless_conversion,
216            reason = "i128 to PriceRaw is real when not high-precision"
217        )]
218        let raw: PriceRaw = raw_i128
219            .try_into()
220            .expect("Continuous-future adjustment exceeds PriceRaw range");
221
222        self.adjustment_raw = raw;
223        self.adjustment_active = self.adjustment_raw != 0;
224    }
225
226    fn apply_adjustment_to_price(&self, price: Price) -> Price {
227        if !self.adjustment_active {
228            return price;
229        }
230
231        if self.adjustment_is_ratio {
232            // Multiply in double; `Price::new` rounds to the target precision.
233            // Float can shift 1 ULP for high-precision raws (spread mode is exact).
234            return Price::new(price.as_f64() * self.adjustment_ratio, price.precision);
235        }
236
237        // Spread: signed raw addition.
238        Price::from_raw(price.raw + self.adjustment_raw, price.precision)
239    }
240
241    /// Updates the builder state with the given price, size, and init timestamp.
242    ///
243    /// # Panics
244    ///
245    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
246    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
247        if ts_init < self.ts_last {
248            return; // Not applicable
249        }
250
251        let price = self.apply_adjustment_to_price(price);
252
253        if self.open.is_none() {
254            self.open = Some(price);
255            self.high = Some(price);
256            self.low = Some(price);
257            self.initialized = true;
258        } else {
259            if price > self.high.unwrap() {
260                self.high = Some(price);
261            }
262
263            if price < self.low.unwrap() {
264                self.low = Some(price);
265            }
266        }
267
268        self.close = Some(price);
269        self.volume = self.volume.add(size);
270        self.count += 1;
271        self.ts_last = ts_init;
272    }
273
274    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
275    ///
276    /// # Panics
277    ///
278    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
279    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
280        if ts_init < self.ts_last {
281            return; // Not applicable
282        }
283
284        let bar_open = self.apply_adjustment_to_price(bar.open);
285        let bar_high = self.apply_adjustment_to_price(bar.high);
286        let bar_low = self.apply_adjustment_to_price(bar.low);
287        let bar_close = self.apply_adjustment_to_price(bar.close);
288
289        if self.open.is_none() {
290            self.open = Some(bar_open);
291            self.high = Some(bar_high);
292            self.low = Some(bar_low);
293            self.initialized = true;
294        } else {
295            if bar_high > self.high.unwrap() {
296                self.high = Some(bar_high);
297            }
298
299            if bar_low < self.low.unwrap() {
300                self.low = Some(bar_low);
301            }
302        }
303
304        self.close = Some(bar_close);
305        self.volume = self.volume.add(volume);
306        self.count += 1;
307        self.ts_last = ts_init;
308    }
309
310    /// Resets per-bar OHLCV state.
311    ///
312    /// Adjustment configuration set via [`Self::set_adjustment`] is retained across resets so it
313    /// spans subsequent bars within the same continuous-future segment.
314    pub fn reset(&mut self) {
315        self.open = None;
316        self.high = None;
317        self.low = None;
318        self.volume = Quantity::zero(self.size_precision);
319        self.count = 0;
320    }
321
322    /// Return the aggregated bar and reset.
323    pub fn build_now(&mut self) -> Bar {
324        self.build(self.ts_last, self.ts_last)
325    }
326
327    /// Returns the aggregated bar for the given timestamps, then resets the builder.
328    ///
329    /// # Panics
330    ///
331    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
332    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
333        if self.open.is_none() {
334            self.open = self.last_close;
335            self.high = self.last_close;
336            self.low = self.last_close;
337            self.close = self.last_close;
338        }
339
340        if let (Some(close), Some(low)) = (self.close, self.low)
341            && close < low
342        {
343            self.low = Some(close);
344        }
345
346        if let (Some(close), Some(high)) = (self.close, self.high)
347            && close > high
348        {
349            self.high = Some(close);
350        }
351
352        // The open was checked, so we can assume all prices are Some
353        let bar = Bar::new(
354            self.bar_type,
355            self.open.unwrap(),
356            self.high.unwrap(),
357            self.low.unwrap(),
358            self.close.unwrap(),
359            self.volume,
360            ts_event,
361            ts_init,
362        );
363
364        self.last_close = self.close;
365        self.reset();
366        bar
367    }
368}
369
370/// Provides a means of aggregating specified bar types and sending to a registered handler.
371pub struct BarAggregatorCore {
372    bar_type: BarType,
373    builder: BarBuilder,
374    handler: BarHandler,
375    is_running: bool,
376}
377
378impl Debug for BarAggregatorCore {
379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380        f.debug_struct(stringify!(BarAggregatorCore))
381            .field("bar_type", &self.bar_type)
382            .field("builder", &self.builder)
383            .field("is_running", &self.is_running)
384            .finish()
385    }
386}
387
388impl BarAggregatorCore {
389    /// Creates a new [`BarAggregatorCore`] instance.
390    ///
391    /// # Panics
392    ///
393    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
394    pub fn new<H: FnMut(Bar) + 'static>(
395        bar_type: BarType,
396        price_precision: u8,
397        size_precision: u8,
398        handler: H,
399    ) -> Self {
400        Self {
401            bar_type,
402            builder: BarBuilder::new(bar_type, price_precision, size_precision),
403            handler: Box::new(handler),
404            is_running: false,
405        }
406    }
407
408    /// Sets the running state of the aggregator (receives updates when `true`).
409    pub const fn set_is_running(&mut self, value: bool) {
410        self.is_running = value;
411    }
412    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
413        self.builder.update(price, size, ts_init);
414    }
415
416    fn build_now_and_send(&mut self) {
417        let bar = self.builder.build_now();
418        (self.handler)(bar);
419    }
420
421    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
422        let bar = self.builder.build(ts_event, ts_init);
423        (self.handler)(bar);
424    }
425}
426
427/// Provides a means of building tick bars aggregated from quote and trades.
428///
429/// When received tick count reaches the step threshold of the bar
430/// specification, then a bar is created and sent to the handler.
431pub struct TickBarAggregator {
432    core: BarAggregatorCore,
433}
434
435impl Debug for TickBarAggregator {
436    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437        f.debug_struct(stringify!(TickBarAggregator))
438            .field("core", &self.core)
439            .finish()
440    }
441}
442
443impl TickBarAggregator {
444    /// Creates a new [`TickBarAggregator`] instance.
445    ///
446    /// # Panics
447    ///
448    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
449    pub fn new<H: FnMut(Bar) + 'static>(
450        bar_type: BarType,
451        price_precision: u8,
452        size_precision: u8,
453        handler: H,
454    ) -> Self {
455        Self {
456            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
457        }
458    }
459}
460
461impl BarAggregator for TickBarAggregator {
462    fn bar_type(&self) -> BarType {
463        self.core.bar_type
464    }
465
466    fn is_running(&self) -> bool {
467        self.core.is_running
468    }
469
470    fn set_is_running(&mut self, value: bool) {
471        self.core.set_is_running(value);
472    }
473
474    /// Apply the given update to the aggregator.
475    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
476        self.core.apply_update(price, size, ts_init);
477        let spec = self.core.bar_type.spec();
478
479        if self.core.builder.count >= spec.step.get() {
480            self.core.build_now_and_send();
481        }
482    }
483
484    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
485        self.core.builder.update_bar(bar, volume, ts_init);
486        let spec = self.core.bar_type.spec();
487
488        if self.core.builder.count >= spec.step.get() {
489            self.core.build_now_and_send();
490        }
491    }
492}
493
494/// Aggregates bars based on tick buy/sell imbalance.
495///
496/// Increments imbalance by +1 for buyer-aggressed trades and -1 for seller-aggressed trades.
497/// Emits a bar when the absolute imbalance reaches the step threshold.
498pub struct TickImbalanceBarAggregator {
499    core: BarAggregatorCore,
500    imbalance: isize,
501}
502
503impl Debug for TickImbalanceBarAggregator {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        f.debug_struct(stringify!(TickImbalanceBarAggregator))
506            .field("core", &self.core)
507            .field("imbalance", &self.imbalance)
508            .finish()
509    }
510}
511
512impl TickImbalanceBarAggregator {
513    /// Creates a new [`TickImbalanceBarAggregator`] instance.
514    ///
515    /// # Panics
516    ///
517    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
518    pub fn new<H: FnMut(Bar) + 'static>(
519        bar_type: BarType,
520        price_precision: u8,
521        size_precision: u8,
522        handler: H,
523    ) -> Self {
524        Self {
525            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
526            imbalance: 0,
527        }
528    }
529}
530
531impl BarAggregator for TickImbalanceBarAggregator {
532    fn bar_type(&self) -> BarType {
533        self.core.bar_type
534    }
535
536    fn is_running(&self) -> bool {
537        self.core.is_running
538    }
539
540    fn set_is_running(&mut self, value: bool) {
541        self.core.set_is_running(value);
542    }
543
544    /// Apply the given update to the aggregator.
545    ///
546    /// Note: side-aware logic lives in `handle_trade`. This method is used for
547    /// quote/bar updates where no aggressor side is available.
548    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
549        self.core.apply_update(price, size, ts_init);
550    }
551
552    fn handle_trade(&mut self, trade: TradeTick) {
553        self.core
554            .apply_update(trade.price, trade.size, trade.ts_init);
555
556        let delta = match trade.aggressor_side {
557            AggressorSide::Buyer => 1,
558            AggressorSide::Seller => -1,
559            AggressorSide::NoAggressor => 0,
560        };
561
562        if delta == 0 {
563            return;
564        }
565
566        self.imbalance += delta;
567        let threshold = self.core.bar_type.spec().step.get();
568        if self.imbalance.unsigned_abs() >= threshold {
569            self.core.build_now_and_send();
570            self.imbalance = 0;
571        }
572    }
573
574    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
575        self.core.builder.update_bar(bar, volume, ts_init);
576    }
577}
578
579/// Aggregates bars based on consecutive buy/sell tick runs.
580pub struct TickRunsBarAggregator {
581    core: BarAggregatorCore,
582    current_run_side: Option<AggressorSide>,
583    run_count: usize,
584}
585
586impl Debug for TickRunsBarAggregator {
587    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588        f.debug_struct(stringify!(TickRunsBarAggregator))
589            .field("core", &self.core)
590            .field("current_run_side", &self.current_run_side)
591            .field("run_count", &self.run_count)
592            .finish()
593    }
594}
595
596impl TickRunsBarAggregator {
597    /// Creates a new [`TickRunsBarAggregator`] instance.
598    ///
599    /// # Panics
600    ///
601    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
602    pub fn new<H: FnMut(Bar) + 'static>(
603        bar_type: BarType,
604        price_precision: u8,
605        size_precision: u8,
606        handler: H,
607    ) -> Self {
608        Self {
609            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
610            current_run_side: None,
611            run_count: 0,
612        }
613    }
614}
615
616impl BarAggregator for TickRunsBarAggregator {
617    fn bar_type(&self) -> BarType {
618        self.core.bar_type
619    }
620
621    fn is_running(&self) -> bool {
622        self.core.is_running
623    }
624
625    fn set_is_running(&mut self, value: bool) {
626        self.core.set_is_running(value);
627    }
628
629    /// Apply the given update to the aggregator.
630    ///
631    /// Note: side-aware logic lives in `handle_trade`. This method is used for
632    /// quote/bar updates where no aggressor side is available.
633    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
634        self.core.apply_update(price, size, ts_init);
635    }
636
637    fn handle_trade(&mut self, trade: TradeTick) {
638        let side = match trade.aggressor_side {
639            AggressorSide::Buyer => Some(AggressorSide::Buyer),
640            AggressorSide::Seller => Some(AggressorSide::Seller),
641            AggressorSide::NoAggressor => None,
642        };
643
644        if let Some(side) = side {
645            if self.current_run_side != Some(side) {
646                self.current_run_side = Some(side);
647                self.run_count = 0;
648                self.core.builder.reset();
649            }
650
651            self.core
652                .apply_update(trade.price, trade.size, trade.ts_init);
653            self.run_count += 1;
654
655            let threshold = self.core.bar_type.spec().step.get();
656            if self.run_count >= threshold {
657                self.core.build_now_and_send();
658                self.run_count = 0;
659                self.current_run_side = None;
660            }
661        } else {
662            self.core
663                .apply_update(trade.price, trade.size, trade.ts_init);
664        }
665    }
666
667    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
668        self.core.builder.update_bar(bar, volume, ts_init);
669    }
670}
671
672/// Provides a means of building volume bars aggregated from quote and trades.
673pub struct VolumeBarAggregator {
674    core: BarAggregatorCore,
675}
676
677impl Debug for VolumeBarAggregator {
678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
679        f.debug_struct(stringify!(VolumeBarAggregator))
680            .field("core", &self.core)
681            .finish()
682    }
683}
684
685impl VolumeBarAggregator {
686    /// Creates a new [`VolumeBarAggregator`] instance.
687    ///
688    /// # Panics
689    ///
690    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
691    pub fn new<H: FnMut(Bar) + 'static>(
692        bar_type: BarType,
693        price_precision: u8,
694        size_precision: u8,
695        handler: H,
696    ) -> Self {
697        Self {
698            core: BarAggregatorCore::new(
699                bar_type.standard(),
700                price_precision,
701                size_precision,
702                handler,
703            ),
704        }
705    }
706}
707
708impl BarAggregator for VolumeBarAggregator {
709    fn bar_type(&self) -> BarType {
710        self.core.bar_type
711    }
712
713    fn is_running(&self) -> bool {
714        self.core.is_running
715    }
716
717    fn set_is_running(&mut self, value: bool) {
718        self.core.set_is_running(value);
719    }
720
721    /// Apply the given update to the aggregator.
722    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
723        let mut raw_size_update = size.raw;
724        let spec = self.core.bar_type.spec();
725        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
726
727        while raw_size_update > 0 {
728            if self.core.builder.volume.raw + raw_size_update < raw_step {
729                self.core.apply_update(
730                    price,
731                    Quantity::from_raw(raw_size_update, size.precision),
732                    ts_init,
733                );
734                break;
735            }
736
737            let raw_size_diff = raw_step - self.core.builder.volume.raw;
738            self.core.apply_update(
739                price,
740                Quantity::from_raw(raw_size_diff, size.precision),
741                ts_init,
742            );
743
744            self.core.build_now_and_send();
745            raw_size_update -= raw_size_diff;
746        }
747    }
748
749    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
750        let mut raw_volume_update = volume.raw;
751        let spec = self.core.bar_type.spec();
752        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
753
754        while raw_volume_update > 0 {
755            if self.core.builder.volume.raw + raw_volume_update < raw_step {
756                self.core.builder.update_bar(
757                    bar,
758                    Quantity::from_raw(raw_volume_update, volume.precision),
759                    ts_init,
760                );
761                break;
762            }
763
764            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
765            self.core.builder.update_bar(
766                bar,
767                Quantity::from_raw(raw_volume_diff, volume.precision),
768                ts_init,
769            );
770
771            self.core.build_now_and_send();
772            raw_volume_update -= raw_volume_diff;
773        }
774    }
775}
776
777/// Aggregates bars based on buy/sell volume imbalance.
778pub struct VolumeImbalanceBarAggregator {
779    core: BarAggregatorCore,
780    imbalance_raw: i128,
781    raw_step: i128,
782}
783
784impl Debug for VolumeImbalanceBarAggregator {
785    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
786        f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
787            .field("core", &self.core)
788            .field("imbalance_raw", &self.imbalance_raw)
789            .field("raw_step", &self.raw_step)
790            .finish()
791    }
792}
793
794impl VolumeImbalanceBarAggregator {
795    /// Creates a new [`VolumeImbalanceBarAggregator`] instance.
796    ///
797    /// # Panics
798    ///
799    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
800    pub fn new<H: FnMut(Bar) + 'static>(
801        bar_type: BarType,
802        price_precision: u8,
803        size_precision: u8,
804        handler: H,
805    ) -> Self {
806        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
807        Self {
808            core: BarAggregatorCore::new(
809                bar_type.standard(),
810                price_precision,
811                size_precision,
812                handler,
813            ),
814            imbalance_raw: 0,
815            raw_step,
816        }
817    }
818}
819
820impl BarAggregator for VolumeImbalanceBarAggregator {
821    fn bar_type(&self) -> BarType {
822        self.core.bar_type
823    }
824
825    fn is_running(&self) -> bool {
826        self.core.is_running
827    }
828
829    fn set_is_running(&mut self, value: bool) {
830        self.core.set_is_running(value);
831    }
832
833    /// Apply the given update to the aggregator.
834    ///
835    /// Note: side-aware logic lives in `handle_trade`. This method is used for
836    /// quote/bar updates where no aggressor side is available.
837    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
838        self.core.apply_update(price, size, ts_init);
839    }
840
841    fn handle_trade(&mut self, trade: TradeTick) {
842        let side = match trade.aggressor_side {
843            AggressorSide::Buyer => 1,
844            AggressorSide::Seller => -1,
845            AggressorSide::NoAggressor => {
846                self.core
847                    .apply_update(trade.price, trade.size, trade.ts_init);
848                return;
849            }
850        };
851
852        let mut raw_remaining = trade.size.raw as i128;
853        while raw_remaining > 0 {
854            let imbalance_abs = self.imbalance_raw.abs();
855            let needed = (self.raw_step - imbalance_abs).max(1);
856            let raw_chunk = raw_remaining.min(needed);
857            let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
858
859            self.core
860                .apply_update(trade.price, qty_chunk, trade.ts_init);
861
862            self.imbalance_raw += side * raw_chunk;
863            raw_remaining -= raw_chunk;
864
865            if self.imbalance_raw.abs() >= self.raw_step {
866                self.core.build_now_and_send();
867                self.imbalance_raw = 0;
868            }
869        }
870    }
871
872    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
873        self.core.builder.update_bar(bar, volume, ts_init);
874    }
875}
876
877/// Aggregates bars based on consecutive buy/sell volume runs.
878pub struct VolumeRunsBarAggregator {
879    core: BarAggregatorCore,
880    current_run_side: Option<AggressorSide>,
881    run_volume_raw: QuantityRaw,
882    raw_step: QuantityRaw,
883}
884
885impl Debug for VolumeRunsBarAggregator {
886    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887        f.debug_struct(stringify!(VolumeRunsBarAggregator))
888            .field("core", &self.core)
889            .field("current_run_side", &self.current_run_side)
890            .field("run_volume_raw", &self.run_volume_raw)
891            .field("raw_step", &self.raw_step)
892            .finish()
893    }
894}
895
896impl VolumeRunsBarAggregator {
897    /// Creates a new [`VolumeRunsBarAggregator`] instance.
898    ///
899    /// # Panics
900    ///
901    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
902    pub fn new<H: FnMut(Bar) + 'static>(
903        bar_type: BarType,
904        price_precision: u8,
905        size_precision: u8,
906        handler: H,
907    ) -> Self {
908        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
909        Self {
910            core: BarAggregatorCore::new(
911                bar_type.standard(),
912                price_precision,
913                size_precision,
914                handler,
915            ),
916            current_run_side: None,
917            run_volume_raw: 0,
918            raw_step,
919        }
920    }
921}
922
923impl BarAggregator for VolumeRunsBarAggregator {
924    fn bar_type(&self) -> BarType {
925        self.core.bar_type
926    }
927
928    fn is_running(&self) -> bool {
929        self.core.is_running
930    }
931
932    fn set_is_running(&mut self, value: bool) {
933        self.core.set_is_running(value);
934    }
935
936    /// Apply the given update to the aggregator.
937    ///
938    /// Note: side-aware logic lives in `handle_trade`. This method is used for
939    /// quote/bar updates where no aggressor side is available.
940    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
941        self.core.apply_update(price, size, ts_init);
942    }
943
944    fn handle_trade(&mut self, trade: TradeTick) {
945        let side = match trade.aggressor_side {
946            AggressorSide::Buyer => Some(AggressorSide::Buyer),
947            AggressorSide::Seller => Some(AggressorSide::Seller),
948            AggressorSide::NoAggressor => None,
949        };
950
951        let Some(side) = side else {
952            self.core
953                .apply_update(trade.price, trade.size, trade.ts_init);
954            return;
955        };
956
957        if self.current_run_side != Some(side) {
958            self.current_run_side = Some(side);
959            self.run_volume_raw = 0;
960            self.core.builder.reset();
961        }
962
963        let mut raw_remaining = trade.size.raw;
964        while raw_remaining > 0 {
965            let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
966            let raw_chunk = raw_remaining.min(needed);
967
968            self.core.apply_update(
969                trade.price,
970                Quantity::from_raw(raw_chunk, trade.size.precision),
971                trade.ts_init,
972            );
973
974            self.run_volume_raw += raw_chunk;
975            raw_remaining -= raw_chunk;
976
977            if self.run_volume_raw >= self.raw_step {
978                self.core.build_now_and_send();
979                self.run_volume_raw = 0;
980                self.current_run_side = None;
981            }
982        }
983    }
984
985    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
986        self.core.builder.update_bar(bar, volume, ts_init);
987    }
988}
989
/// Provides a means of building value bars aggregated from quote and trades.
///
/// When received value reaches the step threshold of the bar
/// specification, then a bar is created and sent to the handler.
pub struct ValueBarAggregator {
    /// Shared aggregation state (bar type, builder, running flag).
    core: BarAggregatorCore,
    /// Value (price times size) accumulated towards the next bar; reset to
    /// zero each time a bar is built.
    cum_value: f64,
}
998
999impl Debug for ValueBarAggregator {
1000    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1001        f.debug_struct(stringify!(ValueBarAggregator))
1002            .field("core", &self.core)
1003            .field("cum_value", &self.cum_value)
1004            .finish()
1005    }
1006}
1007
1008impl ValueBarAggregator {
1009    /// Creates a new [`ValueBarAggregator`] instance.
1010    ///
1011    /// # Panics
1012    ///
1013    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1014    pub fn new<H: FnMut(Bar) + 'static>(
1015        bar_type: BarType,
1016        price_precision: u8,
1017        size_precision: u8,
1018        handler: H,
1019    ) -> Self {
1020        Self {
1021            core: BarAggregatorCore::new(
1022                bar_type.standard(),
1023                price_precision,
1024                size_precision,
1025                handler,
1026            ),
1027            cum_value: 0.0,
1028        }
1029    }
1030
1031    #[must_use]
1032    /// Returns the cumulative value for the aggregator.
1033    pub const fn get_cumulative_value(&self) -> f64 {
1034        self.cum_value
1035    }
1036}
1037
1038impl BarAggregator for ValueBarAggregator {
1039    fn bar_type(&self) -> BarType {
1040        self.core.bar_type
1041    }
1042
1043    fn is_running(&self) -> bool {
1044        self.core.is_running
1045    }
1046
1047    fn set_is_running(&mut self, value: bool) {
1048        self.core.set_is_running(value);
1049    }
1050
1051    /// Apply the given update to the aggregator.
1052    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1053        let mut size_update = size.as_f64();
1054        let spec = self.core.bar_type.spec();
1055
1056        while size_update > 0.0 {
1057            let value_update = price.as_f64() * size_update;
1058            if value_update == 0.0 {
1059                // Prevent division by zero - apply remaining size without triggering bar
1060                self.core
1061                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
1062                break;
1063            }
1064
1065            if self.cum_value + value_update < spec.step.get() as f64 {
1066                self.cum_value += value_update;
1067                self.core
1068                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
1069                break;
1070            }
1071
1072            let value_diff = spec.step.get() as f64 - self.cum_value;
1073            let mut size_diff = size_update * (value_diff / value_update);
1074
1075            // Clamp to minimum representable size to avoid zero-volume bars
1076            if is_below_min_size(size_diff, size.precision) {
1077                if is_below_min_size(size_update, size.precision) {
1078                    break;
1079                }
1080                size_diff = min_size_f64(size.precision);
1081            }
1082
1083            self.core
1084                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1085
1086            self.core.build_now_and_send();
1087            self.cum_value = 0.0;
1088            size_update -= size_diff;
1089        }
1090    }
1091
1092    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1093        let mut volume_update = volume;
1094        let average_price = Price::new(
1095            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1096            self.core.builder.price_precision,
1097        );
1098
1099        while volume_update.as_f64() > 0.0 {
1100            let value_update = average_price.as_f64() * volume_update.as_f64();
1101            if value_update == 0.0 {
1102                // Prevent division by zero - apply remaining volume without triggering bar
1103                self.core.builder.update_bar(bar, volume_update, ts_init);
1104                break;
1105            }
1106
1107            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1108                self.cum_value += value_update;
1109                self.core.builder.update_bar(bar, volume_update, ts_init);
1110                break;
1111            }
1112
1113            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1114            let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1115
1116            // Clamp to minimum representable size to avoid zero-volume bars
1117            if is_below_min_size(volume_diff, volume_update.precision) {
1118                if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1119                    break;
1120                }
1121                volume_diff = min_size_f64(volume_update.precision);
1122            }
1123
1124            self.core.builder.update_bar(
1125                bar,
1126                Quantity::new(volume_diff, volume_update.precision),
1127                ts_init,
1128            );
1129
1130            self.core.build_now_and_send();
1131            self.cum_value = 0.0;
1132            volume_update = Quantity::new(
1133                volume_update.as_f64() - volume_diff,
1134                volume_update.precision,
1135            );
1136        }
1137    }
1138}
1139
/// Aggregates bars based on buy/sell notional imbalance.
///
/// Buyer trades add their notional (price times size) to the imbalance and
/// seller trades subtract it; a bar is built when the absolute imbalance
/// reaches `step_value`.
pub struct ValueImbalanceBarAggregator {
    /// Shared aggregation state (bar type, builder, running flag).
    core: BarAggregatorCore,
    /// Signed notional imbalance accumulated since the last bar.
    imbalance_value: f64,
    /// Absolute imbalance at which a bar is built (the bar specification step).
    step_value: f64,
}
1146
1147impl Debug for ValueImbalanceBarAggregator {
1148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1149        f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1150            .field("core", &self.core)
1151            .field("imbalance_value", &self.imbalance_value)
1152            .field("step_value", &self.step_value)
1153            .finish()
1154    }
1155}
1156
1157impl ValueImbalanceBarAggregator {
1158    /// Creates a new [`ValueImbalanceBarAggregator`] instance.
1159    ///
1160    /// # Panics
1161    ///
1162    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1163    pub fn new<H: FnMut(Bar) + 'static>(
1164        bar_type: BarType,
1165        price_precision: u8,
1166        size_precision: u8,
1167        handler: H,
1168    ) -> Self {
1169        Self {
1170            core: BarAggregatorCore::new(
1171                bar_type.standard(),
1172                price_precision,
1173                size_precision,
1174                handler,
1175            ),
1176            imbalance_value: 0.0,
1177            step_value: bar_type.spec().step.get() as f64,
1178        }
1179    }
1180}
1181
1182impl BarAggregator for ValueImbalanceBarAggregator {
1183    fn bar_type(&self) -> BarType {
1184        self.core.bar_type
1185    }
1186
1187    fn is_running(&self) -> bool {
1188        self.core.is_running
1189    }
1190
1191    fn set_is_running(&mut self, value: bool) {
1192        self.core.set_is_running(value);
1193    }
1194
1195    /// Apply the given update to the aggregator.
1196    ///
1197    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1198    /// quote/bar updates where no aggressor side is available.
1199    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1200        self.core.apply_update(price, size, ts_init);
1201    }
1202
1203    fn handle_trade(&mut self, trade: TradeTick) {
1204        let price_f64 = trade.price.as_f64();
1205        if price_f64 == 0.0 {
1206            self.core
1207                .apply_update(trade.price, trade.size, trade.ts_init);
1208            return;
1209        }
1210
1211        let side_sign = match trade.aggressor_side {
1212            AggressorSide::Buyer => 1.0,
1213            AggressorSide::Seller => -1.0,
1214            AggressorSide::NoAggressor => {
1215                self.core
1216                    .apply_update(trade.price, trade.size, trade.ts_init);
1217                return;
1218            }
1219        };
1220
1221        let mut size_remaining = trade.size.as_f64();
1222        while size_remaining > 0.0 {
1223            let value_remaining = price_f64 * size_remaining;
1224
1225            #[allow(clippy::float_cmp, reason = "exact-zero check on accumulator")]
1226            if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
1227                let needed = self.step_value - self.imbalance_value.abs();
1228                if value_remaining <= needed {
1229                    self.imbalance_value += side_sign * value_remaining;
1230                    self.core.apply_update(
1231                        trade.price,
1232                        Quantity::new(size_remaining, trade.size.precision),
1233                        trade.ts_init,
1234                    );
1235
1236                    if self.imbalance_value.abs() >= self.step_value {
1237                        self.core.build_now_and_send();
1238                        self.imbalance_value = 0.0;
1239                    }
1240                    break;
1241                }
1242
1243                let mut value_chunk = needed;
1244                let mut size_chunk = value_chunk / price_f64;
1245
1246                // Clamp to minimum representable size to avoid zero-volume bars
1247                if is_below_min_size(size_chunk, trade.size.precision) {
1248                    if is_below_min_size(size_remaining, trade.size.precision) {
1249                        break;
1250                    }
1251                    size_chunk = min_size_f64(trade.size.precision);
1252                    value_chunk = price_f64 * size_chunk;
1253                }
1254
1255                self.core.apply_update(
1256                    trade.price,
1257                    Quantity::new(size_chunk, trade.size.precision),
1258                    trade.ts_init,
1259                );
1260                self.imbalance_value += side_sign * value_chunk;
1261                size_remaining -= size_chunk;
1262
1263                if self.imbalance_value.abs() >= self.step_value {
1264                    self.core.build_now_and_send();
1265                    self.imbalance_value = 0.0;
1266                }
1267            } else {
1268                // Opposing side: first neutralize existing imbalance
1269                let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1270                let mut size_chunk = value_to_flatten / price_f64;
1271
1272                // Clamp to minimum representable size to avoid zero-volume bars
1273                if is_below_min_size(size_chunk, trade.size.precision) {
1274                    if is_below_min_size(size_remaining, trade.size.precision) {
1275                        break;
1276                    }
1277                    size_chunk = min_size_f64(trade.size.precision);
1278                    value_to_flatten = price_f64 * size_chunk;
1279                }
1280
1281                self.core.apply_update(
1282                    trade.price,
1283                    Quantity::new(size_chunk, trade.size.precision),
1284                    trade.ts_init,
1285                );
1286                self.imbalance_value += side_sign * value_to_flatten;
1287
1288                // Min-size clamp can overshoot past threshold
1289                if self.imbalance_value.abs() >= self.step_value {
1290                    self.core.build_now_and_send();
1291                    self.imbalance_value = 0.0;
1292                }
1293                size_remaining -= size_chunk;
1294            }
1295        }
1296    }
1297
1298    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1299        self.core.builder.update_bar(bar, volume, ts_init);
1300    }
1301}
1302
/// Aggregates bars based on consecutive buy/sell notional runs.
///
/// A bar is built once the notional (price times size) accumulated for the
/// current run reaches `step_value`. A trade whose aggressor side differs
/// from the current run side restarts the run.
pub struct ValueRunsBarAggregator {
    /// Shared aggregation state (bar type, builder, running flag).
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress; `None` before the first
    /// directional trade and right after a bar has been built.
    current_run_side: Option<AggressorSide>,
    /// Notional accumulated by the current run.
    run_value: f64,
    /// Notional at which a run completes a bar (the bar specification step).
    step_value: f64,
}
1310
1311impl Debug for ValueRunsBarAggregator {
1312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1313        f.debug_struct(stringify!(ValueRunsBarAggregator))
1314            .field("core", &self.core)
1315            .field("current_run_side", &self.current_run_side)
1316            .field("run_value", &self.run_value)
1317            .field("step_value", &self.step_value)
1318            .finish()
1319    }
1320}
1321
1322impl ValueRunsBarAggregator {
1323    /// Creates a new [`ValueRunsBarAggregator`] instance.
1324    ///
1325    /// # Panics
1326    ///
1327    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1328    pub fn new<H: FnMut(Bar) + 'static>(
1329        bar_type: BarType,
1330        price_precision: u8,
1331        size_precision: u8,
1332        handler: H,
1333    ) -> Self {
1334        Self {
1335            core: BarAggregatorCore::new(
1336                bar_type.standard(),
1337                price_precision,
1338                size_precision,
1339                handler,
1340            ),
1341            current_run_side: None,
1342            run_value: 0.0,
1343            step_value: bar_type.spec().step.get() as f64,
1344        }
1345    }
1346}
1347
1348impl BarAggregator for ValueRunsBarAggregator {
1349    fn bar_type(&self) -> BarType {
1350        self.core.bar_type
1351    }
1352
1353    fn is_running(&self) -> bool {
1354        self.core.is_running
1355    }
1356
1357    fn set_is_running(&mut self, value: bool) {
1358        self.core.set_is_running(value);
1359    }
1360
1361    /// Apply the given update to the aggregator.
1362    ///
1363    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1364    /// quote/bar updates where no aggressor side is available.
1365    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1366        self.core.apply_update(price, size, ts_init);
1367    }
1368
1369    fn handle_trade(&mut self, trade: TradeTick) {
1370        let price_f64 = trade.price.as_f64();
1371        if price_f64 == 0.0 {
1372            self.core
1373                .apply_update(trade.price, trade.size, trade.ts_init);
1374            return;
1375        }
1376
1377        let side = match trade.aggressor_side {
1378            AggressorSide::Buyer => Some(AggressorSide::Buyer),
1379            AggressorSide::Seller => Some(AggressorSide::Seller),
1380            AggressorSide::NoAggressor => None,
1381        };
1382
1383        let Some(side) = side else {
1384            self.core
1385                .apply_update(trade.price, trade.size, trade.ts_init);
1386            return;
1387        };
1388
1389        if self.current_run_side != Some(side) {
1390            self.current_run_side = Some(side);
1391            self.run_value = 0.0;
1392            self.core.builder.reset();
1393        }
1394
1395        let mut size_remaining = trade.size.as_f64();
1396        while size_remaining > 0.0 {
1397            let value_update = price_f64 * size_remaining;
1398            if self.run_value + value_update < self.step_value {
1399                self.run_value += value_update;
1400                self.core.apply_update(
1401                    trade.price,
1402                    Quantity::new(size_remaining, trade.size.precision),
1403                    trade.ts_init,
1404                );
1405                break;
1406            }
1407
1408            let value_needed = self.step_value - self.run_value;
1409            let mut size_chunk = value_needed / price_f64;
1410
1411            // Clamp to minimum representable size to avoid zero-volume bars
1412            if is_below_min_size(size_chunk, trade.size.precision) {
1413                if is_below_min_size(size_remaining, trade.size.precision) {
1414                    break;
1415                }
1416                size_chunk = min_size_f64(trade.size.precision);
1417            }
1418
1419            self.core.apply_update(
1420                trade.price,
1421                Quantity::new(size_chunk, trade.size.precision),
1422                trade.ts_init,
1423            );
1424
1425            self.core.build_now_and_send();
1426            self.run_value = 0.0;
1427            self.current_run_side = None;
1428            size_remaining -= size_chunk;
1429        }
1430    }
1431
1432    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1433        self.core.builder.update_bar(bar, volume, ts_init);
1434    }
1435}
1436
/// Provides a means of building Renko bars aggregated from quote and trades.
///
/// Renko bars are created when the price moves by a fixed amount (brick size)
/// regardless of time or volume. Each bar represents a price movement equal
/// to the step size in the bar specification.
pub struct RenkoBarAggregator {
    /// Shared aggregation state (bar type, builder, running flag).
    core: BarAggregatorCore,
    /// Brick size in raw price units (bar specification step multiplied by the
    /// raw price increment).
    pub brick_size: PriceRaw,
    /// Close of the most recently emitted brick, or the first observed price
    /// when no brick has been emitted yet; `None` before the first update.
    last_close: Option<Price>,
}
1447
1448impl Debug for RenkoBarAggregator {
1449    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1450        f.debug_struct(stringify!(RenkoBarAggregator))
1451            .field("core", &self.core)
1452            .field("brick_size", &self.brick_size)
1453            .field("last_close", &self.last_close)
1454            .finish()
1455    }
1456}
1457
1458impl RenkoBarAggregator {
1459    /// Creates a new [`RenkoBarAggregator`] instance.
1460    ///
1461    /// # Panics
1462    ///
1463    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1464    pub fn new<H: FnMut(Bar) + 'static>(
1465        bar_type: BarType,
1466        price_precision: u8,
1467        size_precision: u8,
1468        price_increment: Price,
1469        handler: H,
1470    ) -> Self {
1471        // Calculate brick size in raw price units (step * price_increment.raw)
1472        let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1473
1474        Self {
1475            core: BarAggregatorCore::new(
1476                bar_type.standard(),
1477                price_precision,
1478                size_precision,
1479                handler,
1480            ),
1481            brick_size,
1482            last_close: None,
1483        }
1484    }
1485}
1486
1487impl BarAggregator for RenkoBarAggregator {
1488    fn bar_type(&self) -> BarType {
1489        self.core.bar_type
1490    }
1491
1492    fn is_running(&self) -> bool {
1493        self.core.is_running
1494    }
1495
1496    fn set_is_running(&mut self, value: bool) {
1497        self.core.set_is_running(value);
1498    }
1499
1500    /// Apply the given update to the aggregator.
1501    ///
1502    /// For Renko bars, we check if the price movement from the last close
1503    /// is greater than or equal to the brick size. If so, we create new bars.
1504    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1505        // Always update the builder with the current tick
1506        self.core.apply_update(price, size, ts_init);
1507
1508        // Initialize last_close if this is the first update
1509        if self.last_close.is_none() {
1510            self.last_close = Some(price);
1511            return;
1512        }
1513
1514        let last_close = self.last_close.unwrap();
1515
1516        // Convert prices to raw units (integers) to avoid floating point precision issues
1517        let current_raw = price.raw;
1518        let last_close_raw = last_close.raw;
1519        let price_diff_raw = current_raw - last_close_raw;
1520        let abs_price_diff_raw = price_diff_raw.abs();
1521
1522        // Check if we need to create one or more Renko bars
1523        if abs_price_diff_raw >= self.brick_size {
1524            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1525            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1526            let mut current_close = last_close;
1527
1528            // Store the current builder volume to distribute across bricks
1529            let total_volume = self.core.builder.volume;
1530
1531            for _i in 0..num_bricks {
1532                // Calculate the close price for this brick using raw price units
1533                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1534                let brick_close = Price::from_raw(brick_close_raw, price.precision);
1535
1536                // For Renko bars: open = previous close, high/low depend on direction
1537                let (brick_high, brick_low) = if direction > 0.0 {
1538                    (brick_close, current_close)
1539                } else {
1540                    (current_close, brick_close)
1541                };
1542
1543                // Reset builder for this brick
1544                self.core.builder.reset();
1545                self.core.builder.open = Some(current_close);
1546                self.core.builder.high = Some(brick_high);
1547                self.core.builder.low = Some(brick_low);
1548                self.core.builder.close = Some(brick_close);
1549                self.core.builder.volume = total_volume; // Each brick gets the full volume
1550                self.core.builder.count = 1;
1551                self.core.builder.ts_last = ts_init;
1552                self.core.builder.initialized = true;
1553
1554                // Build and send the bar
1555                self.core.build_and_send(ts_init, ts_init);
1556
1557                // Update for the next brick
1558                current_close = brick_close;
1559                self.last_close = Some(brick_close);
1560            }
1561        }
1562    }
1563
1564    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1565        // Always update the builder with the current bar
1566        self.core.builder.update_bar(bar, volume, ts_init);
1567
1568        // Initialize last_close if this is the first update
1569        if self.last_close.is_none() {
1570            self.last_close = Some(bar.close);
1571            return;
1572        }
1573
1574        let last_close = self.last_close.unwrap();
1575
1576        // Convert prices to raw units (integers) to avoid floating point precision issues
1577        let current_raw = bar.close.raw;
1578        let last_close_raw = last_close.raw;
1579        let price_diff_raw = current_raw - last_close_raw;
1580        let abs_price_diff_raw = price_diff_raw.abs();
1581
1582        // Check if we need to create one or more Renko bars
1583        if abs_price_diff_raw >= self.brick_size {
1584            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1585            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1586            let mut current_close = last_close;
1587
1588            // Store the current builder volume to distribute across bricks
1589            let total_volume = self.core.builder.volume;
1590
1591            for _i in 0..num_bricks {
1592                // Calculate the close price for this brick using raw price units
1593                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1594                let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1595
1596                // For Renko bars: open = previous close, high/low depend on direction
1597                let (brick_high, brick_low) = if direction > 0.0 {
1598                    (brick_close, current_close)
1599                } else {
1600                    (current_close, brick_close)
1601                };
1602
1603                // Reset builder for this brick
1604                self.core.builder.reset();
1605                self.core.builder.open = Some(current_close);
1606                self.core.builder.high = Some(brick_high);
1607                self.core.builder.low = Some(brick_low);
1608                self.core.builder.close = Some(brick_close);
1609                self.core.builder.volume = total_volume; // Each brick gets the full volume
1610                self.core.builder.count = 1;
1611                self.core.builder.ts_last = ts_init;
1612                self.core.builder.initialized = true;
1613
1614                // Build and send the bar
1615                self.core.build_and_send(ts_init, ts_init);
1616
1617                // Update for the next brick
1618                current_close = brick_close;
1619                self.last_close = Some(brick_close);
1620            }
1621        }
1622    }
1623}
1624
/// Provides a means of building time bars aggregated from quote and trades.
///
/// At each aggregation time interval, a bar is created and sent to the handler.
pub struct TimeBarAggregator {
    /// Shared aggregation state (bar type, builder, running flag).
    core: BarAggregatorCore,
    /// Clock queried for the current time when the timer is started.
    clock: Rc<RefCell<dyn Clock>>,
    /// Stored as passed to `new`; presumably whether bars are built for
    /// intervals that received no updates - confirm against `build_bar`.
    build_with_no_updates: bool,
    /// Stored as passed to `new`; presumably whether bars are stamped with the
    /// interval close rather than its open - confirm against `build_bar`.
    timestamp_on_close: bool,
    /// `true` when the aggregator was created with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Starts at the default timestamp.
    stored_open_ns: UnixNanos,
    /// Timer name, formatted as `TIME_BAR_{bar_type}`.
    timer_name: String,
    /// Bar interval obtained from `get_bar_interval_ns` for the bar type.
    interval_ns: UnixNanos,
    /// Starts at the default timestamp.
    next_close_ns: UnixNanos,
    /// Starts at the default timestamp.
    first_close_ns: UnixNanos,
    /// Delay added to the computed timer start time, in microseconds.
    bar_build_delay: u64,
    /// Optional origin offset passed to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// Stored as passed to `new`.
    skip_first_non_full_bar: bool,
    /// Starts as `false`.
    pub historical_mode: bool,
    /// Starts empty.
    historical_events: Vec<TimeEvent>,
    /// Starts as `None`.
    historical_event_at_ts_init: Option<TimeEvent>,
    /// Weak handle to the shared aggregator; the timer callback upgrades it to
    /// call `build_bar`.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1647
1648impl Debug for TimeBarAggregator {
1649    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1650        f.debug_struct(stringify!(TimeBarAggregator))
1651            .field("core", &self.core)
1652            .field("build_with_no_updates", &self.build_with_no_updates)
1653            .field("timestamp_on_close", &self.timestamp_on_close)
1654            .field("is_left_open", &self.is_left_open)
1655            .field("timer_name", &self.timer_name)
1656            .field("interval_ns", &self.interval_ns)
1657            .field("bar_build_delay", &self.bar_build_delay)
1658            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1659            .finish()
1660    }
1661}
1662
1663impl TimeBarAggregator {
1664    /// Creates a new [`TimeBarAggregator`] instance.
1665    ///
1666    /// # Panics
1667    ///
1668    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1669    #[expect(clippy::too_many_arguments)]
1670    pub fn new<H: FnMut(Bar) + 'static>(
1671        bar_type: BarType,
1672        price_precision: u8,
1673        size_precision: u8,
1674        clock: Rc<RefCell<dyn Clock>>,
1675        handler: H,
1676        build_with_no_updates: bool,
1677        timestamp_on_close: bool,
1678        interval_type: BarIntervalType,
1679        time_bars_origin_offset: Option<TimeDelta>,
1680        bar_build_delay: u64,
1681        skip_first_non_full_bar: bool,
1682    ) -> Self {
1683        let is_left_open = match interval_type {
1684            BarIntervalType::LeftOpen => true,
1685            BarIntervalType::RightOpen => false,
1686        };
1687
1688        let core = BarAggregatorCore::new(
1689            bar_type.standard(),
1690            price_precision,
1691            size_precision,
1692            handler,
1693        );
1694
1695        Self {
1696            core,
1697            clock,
1698            build_with_no_updates,
1699            timestamp_on_close,
1700            is_left_open,
1701            stored_open_ns: UnixNanos::default(),
1702            timer_name: format!("TIME_BAR_{bar_type}"),
1703            interval_ns: get_bar_interval_ns(&bar_type),
1704            next_close_ns: UnixNanos::default(),
1705            first_close_ns: UnixNanos::default(),
1706            bar_build_delay,
1707            time_bars_origin_offset,
1708            skip_first_non_full_bar,
1709            historical_mode: false,
1710            historical_events: Vec::new(),
1711            historical_event_at_ts_init: None,
1712            aggregator_weak: None,
1713        }
1714    }
1715
    /// Sets the clock for the aggregator (internal method).
    ///
    /// Replaces the stored clock handle; nothing else on the aggregator is
    /// changed by this call.
    pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.clock = clock;
    }
1720
1721    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
1722    ///
1723    /// This matches the Cython `start_timer()` method exactly.
1724    /// Creates a callback to `build_bar` using a weak reference to the aggregator.
1725    ///
1726    /// # Panics
1727    ///
1728    /// Panics if `aggregator_rc` is None and `aggregator_weak` hasn't been set, or if timer registration fails.
1729    pub fn start_timer_internal(
1730        &mut self,
1731        aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1732    ) {
1733        // Create callback that calls build_bar through the weak reference
1734        let aggregator_weak = if let Some(rc) = aggregator_rc {
1735            // Store weak reference for future use (e.g., in build_bar for month/year)
1736            let weak = Rc::downgrade(&rc);
1737            self.aggregator_weak = Some(weak.clone());
1738            weak
1739        } else {
1740            // Use existing weak reference (for historical mode where it was set earlier)
1741            self.aggregator_weak
1742                .as_ref()
1743                .expect("Aggregator weak reference must be set before calling start_timer()")
1744                .clone()
1745        };
1746
1747        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1748            if let Some(agg) = aggregator_weak.upgrade() {
1749                agg.borrow_mut().build_bar(&event);
1750            }
1751        }));
1752
1753        // Computing start_time
1754        let now = self.clock.borrow().utc_now();
1755        let mut start_time =
1756            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1757        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1758
1759        // Closing a partial bar at the transition from historical to backtest data
1760        let fire_immediately = start_time == now;
1761
1762        let spec = &self.bar_type().spec();
1763        let start_time_ns = UnixNanos::from(start_time);
1764        let step = spec.step.get() as u32;
1765
1766        if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1767            self.clock
1768                .borrow_mut()
1769                .set_timer_ns(
1770                    &self.timer_name,
1771                    self.interval_ns.as_u64(),
1772                    Some(start_time_ns),
1773                    None,
1774                    Some(callback),
1775                    Some(true), // allow_past
1776                    Some(fire_immediately),
1777                )
1778                .expect(FAILED);
1779
1780            if fire_immediately {
1781                self.next_close_ns = start_time_ns;
1782            } else {
1783                let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1784                self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1785            }
1786
1787            self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1788        } else {
1789            // The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
1790            let alert_time = if fire_immediately {
1791                start_time
1792            } else if spec.aggregation == BarAggregation::Month {
1793                add_n_months(start_time, step).expect(FAILED)
1794            } else {
1795                add_n_years(start_time, step).expect(FAILED)
1796            };
1797
1798            self.clock
1799                .borrow_mut()
1800                .set_time_alert_ns(
1801                    &self.timer_name,
1802                    UnixNanos::from(alert_time),
1803                    Some(callback),
1804                    Some(true), // allow_past
1805                )
1806                .expect(FAILED);
1807
1808            self.next_close_ns = UnixNanos::from(alert_time);
1809            // Mirror Cython: stored_open = close_time - step, so when fire_immediately the
1810            // current (partial) bar started `step` periods before start_time.
1811            self.stored_open_ns = if fire_immediately {
1812                if spec.aggregation == BarAggregation::Month {
1813                    subtract_n_months_nanos(start_time_ns, step).expect(FAILED)
1814                } else {
1815                    subtract_n_years_nanos(start_time_ns, step).expect(FAILED)
1816                }
1817            } else {
1818                start_time_ns
1819            };
1820        }
1821
1822        if self.skip_first_non_full_bar {
1823            self.first_close_ns = self.next_close_ns;
1824        }
1825
1826        log::debug!(
1827            "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1828            self.timer_name,
1829            start_time,
1830            self.historical_mode,
1831            fire_immediately,
1832            now,
1833            self.bar_build_delay
1834        );
1835    }
1836
1837    /// Stops the time bar aggregator.
1838    pub fn stop(&mut self) {
1839        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1840    }
1841
1842    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1843        if self.skip_first_non_full_bar && ts_init <= self.first_close_ns {
1844            self.core.builder.reset();
1845        } else {
1846            // Clear for the transition from historical to live data; subsequent
1847            // bars always emit regardless of timestamp.
1848            self.skip_first_non_full_bar = false;
1849            self.core.build_and_send(ts_event, ts_init);
1850        }
1851    }
1852
1853    fn build_bar(&mut self, event: &TimeEvent) {
1854        if !self.core.builder.initialized {
1855            return;
1856        }
1857
1858        if !self.build_with_no_updates && self.core.builder.count == 0 {
1859            return; // Do not build bar when no update
1860        }
1861
1862        let ts_init = event.ts_event;
1863        let ts_event = if self.is_left_open {
1864            if self.timestamp_on_close {
1865                event.ts_event
1866            } else {
1867                self.stored_open_ns
1868            }
1869        } else {
1870            self.stored_open_ns
1871        };
1872
1873        self.build_and_send(ts_event, ts_init);
1874
1875        // Close time becomes the next open time
1876        self.stored_open_ns = event.ts_event;
1877
1878        if self.bar_type().spec().aggregation == BarAggregation::Month {
1879            let step = self.bar_type().spec().step.get() as u32;
1880            let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1881
1882            self.clock
1883                .borrow_mut()
1884                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1885                .expect(FAILED);
1886
1887            self.next_close_ns = alert_time_ns;
1888        } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1889            let step = self.bar_type().spec().step.get() as u32;
1890            let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1891
1892            self.clock
1893                .borrow_mut()
1894                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1895                .expect(FAILED);
1896
1897            self.next_close_ns = alert_time_ns;
1898        } else {
1899            // On receiving this event, timer should now have a new `next_time_ns`
1900            self.next_close_ns = self
1901                .clock
1902                .borrow()
1903                .next_time_ns(&self.timer_name)
1904                .unwrap_or_default();
1905        }
1906    }
1907
1908    fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1909        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1910            // In historical mode, clock is always a TestClock (set by data engine)
1911            {
1912                let mut clock_borrow = self.clock.borrow_mut();
1913                let test_clock = clock_borrow
1914                    .as_any_mut()
1915                    .downcast_mut::<TestClock>()
1916                    .expect("Expected TestClock in historical mode");
1917                test_clock.set_time(ts_init);
1918            }
1919            // In historical mode, weak reference should already be set
1920            self.start_timer_internal(None);
1921        }
1922
1923        // Advance this aggregator's independent clock and collect timer events.
1924        let events = {
1925            let mut clock_borrow = self.clock.borrow_mut();
1926            let test_clock = clock_borrow
1927                .as_any_mut()
1928                .downcast_mut::<TestClock>()
1929                .expect("Expected TestClock in historical mode");
1930            test_clock.advance_time(ts_init, true)
1931        };
1932
1933        for event in events {
1934            if event.ts_event == ts_init {
1935                self.historical_event_at_ts_init = Some(event);
1936            } else {
1937                self.build_bar(&event);
1938            }
1939        }
1940    }
1941
1942    fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1943        if let Some(ref event) = self.historical_event_at_ts_init.take() {
1944            self.build_bar(event);
1945        }
1946    }
1947
1948    /// Sets historical events (called by data engine after advancing clock)
1949    pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1950        self.historical_events = events;
1951    }
1952}
1953
1954impl BarAggregator for TimeBarAggregator {
1955    fn bar_type(&self) -> BarType {
1956        self.core.bar_type
1957    }
1958
1959    fn is_running(&self) -> bool {
1960        self.core.is_running
1961    }
1962
1963    fn set_is_running(&mut self, value: bool) {
1964        self.core.set_is_running(value);
1965    }
1966
1967    /// Stop time-based aggregator by canceling its timer.
1968    fn stop(&mut self) {
1969        Self::stop(self);
1970    }
1971
1972    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1973        if self.historical_mode {
1974            self.preprocess_historical_events(ts_init);
1975        }
1976
1977        self.core.apply_update(price, size, ts_init);
1978
1979        if self.historical_mode {
1980            self.postprocess_historical_events(ts_init);
1981        }
1982    }
1983
1984    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1985        if self.historical_mode {
1986            self.preprocess_historical_events(ts_init);
1987        }
1988
1989        self.core.builder.update_bar(bar, volume, ts_init);
1990
1991        if self.historical_mode {
1992            self.postprocess_historical_events(ts_init);
1993        }
1994    }
1995
1996    fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1997        self.historical_mode = historical_mode;
1998        self.core.handler = handler;
1999    }
2000
2001    fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
2002        self.set_historical_events_internal(events);
2003    }
2004
2005    fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
2006        self.set_clock_internal(clock);
2007    }
2008
2009    fn build_bar(&mut self, event: &TimeEvent) {
2010        // Delegate to the implementation method
2011        // We use the struct name here to disambiguate from the trait method
2012        {
2013            #[expect(clippy::use_self)]
2014            TimeBarAggregator::build_bar(self, event);
2015        }
2016    }
2017
2018    fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
2019        self.aggregator_weak = Some(weak);
2020    }
2021
2022    fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
2023        self.start_timer_internal(aggregator_rc);
2024    }
2025}
2026
2027fn is_below_min_size(size: f64, precision: u8) -> bool {
2028    Quantity::new(size, precision).raw == 0
2029}
2030
/// Returns the smallest positive increment at `precision` decimal places, i.e. `10^-precision`.
fn min_size_f64(precision: u8) -> f64 {
    let exponent = -i32::from(precision);
    10_f64.powi(exponent)
}
2034
/// Provider for vega per leg (option spreads). Returns `None` when greeks are unavailable.
///
/// The spread quote aggregator queries the provider once per leg each time a non-futures
/// spread quote is built; a `None` result leaves the vega previously stored for that leg
/// unchanged.
pub trait VegaProvider {
    /// Returns vega for the given leg instrument, or `None` if not available.
    fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64>;
}
2040
/// Rounder for spread bid/ask (e.g. tick scheme). When absent, raw prices are used with instrument precision.
pub trait SpreadPriceRounder {
    /// Rounds raw bid/ask to valid prices (handles negative prices with mirroring when using tick scheme).
    ///
    /// Returns the rounded prices as a `(bid, ask)` pair; `precision` is the price precision
    /// the resulting prices should carry.
    fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price);
}
2046
/// Vega provider that returns leg vegas from a map (e.g. populated from greeks cache).
#[derive(Debug, Default)]
pub struct MapVegaProvider {
    /// Vega per leg instrument; instruments without an entry have no vega available.
    vegas: AHashMap<InstrumentId, f64>,
}
2052
2053impl MapVegaProvider {
2054    pub fn new() -> Self {
2055        Self {
2056            vegas: AHashMap::new(),
2057        }
2058    }
2059
2060    pub fn insert(&mut self, instrument_id: InstrumentId, vega: f64) {
2061        self.vegas.insert(instrument_id, vega);
2062    }
2063
2064    pub fn get(&self, instrument_id: &InstrumentId) -> Option<f64> {
2065        self.vegas.get(instrument_id).copied()
2066    }
2067}
2068
2069impl VegaProvider for MapVegaProvider {
2070    fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64> {
2071        self.vegas.get(&instrument_id).copied()
2072    }
2073}
2074
/// Rounder that uses a fixed tick size; mirrors negative prices for tick alignment (Cython parity).
#[derive(Debug)]
pub struct FixedTickSchemeRounder {
    /// Fixed tick scheme providing the next valid bid/ask price for a raw value.
    scheme: FixedTickScheme,
}
2080
2081impl FixedTickSchemeRounder {
2082    /// Creates a rounder with the given tick size.
2083    ///
2084    /// # Errors
2085    ///
2086    /// Returns an error if `tick` is not positive.
2087    pub fn new(tick: f64) -> anyhow::Result<Self> {
2088        Ok(Self {
2089            scheme: FixedTickScheme::new(tick)?,
2090        })
2091    }
2092
2093    fn round_one(&self, raw: f64, precision: u8, use_bid_rounding: bool) -> Price {
2094        if raw >= 0.0 {
2095            let p = if use_bid_rounding {
2096                self.scheme.next_bid_price(raw, 0, precision)
2097            } else {
2098                self.scheme.next_ask_price(raw, 0, precision)
2099            };
2100            p.unwrap_or_else(|| price_from_f64(raw, precision))
2101        } else {
2102            let p = if use_bid_rounding {
2103                self.scheme.next_ask_price(-raw, 0, precision)
2104            } else {
2105                self.scheme.next_bid_price(-raw, 0, precision)
2106            };
2107            p.map_or_else(
2108                || price_from_f64(raw, precision),
2109                |q| price_from_f64(-q.as_f64(), precision),
2110            )
2111        }
2112    }
2113}
2114
2115impl SpreadPriceRounder for FixedTickSchemeRounder {
2116    fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price) {
2117        let bid = self.round_one(raw_bid, precision, true);
2118        let ask = self.round_one(raw_ask, precision, false);
2119        (bid, ask)
2120    }
2121}
2122
/// Spread quote aggregator: builds synthetic quotes from leg quotes (Cython parity).
///
/// Quote-driven mode (`update_interval_seconds == None`): emits when all legs have quotes.
/// Timer-driven mode: emits on timer fire when `_has_update` is true.
/// Historical mode: defers timer event at `ts_init` until after the update.
pub struct SpreadQuoteAggregator {
    /// Instrument ID stamped on emitted spread quotes; also used to derive the timer name.
    spread_instrument_id: InstrumentId,
    /// Leg instrument IDs, in the order passed to `new`.
    leg_ids: Vec<InstrumentId>,
    /// Signed ratio per leg (same order as `leg_ids`); never zero.
    ratios: Vec<i64>,
    /// Number of legs.
    n_legs: usize,
    /// Selects futures spread pricing instead of option (vega-based) spread pricing.
    is_futures_spread: bool,
    /// Price precision for the emitted spread prices.
    price_precision: u8,
    /// Size precision for the emitted spread sizes.
    size_precision: u8,
    /// Most recent quote received per instrument ID.
    last_quotes: AHashMap<InstrumentId, QuoteTick>,
    // Per-leg scratch buffers (indexed like `leg_ids`), refreshed on each quote build
    mid_prices: Vec<f64>,
    bid_prices: Vec<f64>,
    ask_prices: Vec<f64>,
    /// Per-leg vega; only overwritten when the vega provider returns a value.
    vegas: Vec<f64>,
    bid_ask_spreads: Vec<f64>,
    bid_sizes: Vec<f64>,
    ask_sizes: Vec<f64>,
    /// Callback receiving every emitted spread quote.
    handler: Box<dyn FnMut(QuoteTick)>,
    /// Clock the timer is registered on; must be a `TestClock` in historical timer mode.
    clock: Rc<RefCell<dyn Clock>>,
    /// Whether timer events are driven by incoming quote timestamps instead of the live clock.
    historical_mode: bool,
    /// Timer interval in seconds; `None` selects quote-driven mode.
    update_interval_seconds: Option<u64>,
    /// Offset in microseconds added to the timer start time.
    quote_build_delay: u64,
    /// Set on every incoming leg quote, cleared once a spread quote has been emitted.
    has_update: bool,
    /// Name under which the timer is registered on the clock.
    timer_name: String,
    /// Historical timer event that fell exactly on the latest quote timestamp, pending emission.
    historical_event_at_ts_init: Option<TimeEvent>,
    /// Optional source of per-leg vegas for option spreads.
    vega_provider: Option<Box<dyn VegaProvider>>,
    /// Optional rounder for the raw spread prices.
    price_rounder: Option<Box<dyn SpreadPriceRounder>>,
    /// Running flag controlled through `set_running`.
    is_running: bool,
    /// Weak self-reference used by the timer callback to call back into the aggregator.
    aggregator_weak: Option<Weak<RefCell<Self>>>,
}
2157
2158impl Debug for SpreadQuoteAggregator {
2159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2160        f.debug_struct(stringify!(SpreadQuoteAggregator))
2161            .field("spread_instrument_id", &self.spread_instrument_id)
2162            .field("n_legs", &self.n_legs)
2163            .field("is_futures_spread", &self.is_futures_spread)
2164            .field("update_interval_seconds", &self.update_interval_seconds)
2165            .finish()
2166    }
2167}
2168
2169impl SpreadQuoteAggregator {
2170    /// Creates a new [`SpreadQuoteAggregator`].
2171    ///
2172    /// # Panics
2173    ///
2174    /// Panics if `legs` has fewer than 2 entries or any ratio is zero.
2175    #[expect(clippy::too_many_arguments)]
2176    pub fn new(
2177        spread_instrument_id: InstrumentId,
2178        legs: &[(InstrumentId, i64)],
2179        is_futures_spread: bool,
2180        price_precision: u8,
2181        size_precision: u8,
2182        handler: Box<dyn FnMut(QuoteTick)>,
2183        clock: Rc<RefCell<dyn Clock>>,
2184        historical_mode: bool,
2185        update_interval_seconds: Option<u64>,
2186        quote_build_delay: u64,
2187        vega_provider: Option<Box<dyn VegaProvider>>,
2188        price_rounder: Option<Box<dyn SpreadPriceRounder>>,
2189    ) -> Self {
2190        assert!(legs.len() >= 2, "Spread must have more than one leg");
2191        let n_legs = legs.len();
2192        let leg_ids: Vec<InstrumentId> = legs.iter().map(|(id, _)| *id).collect();
2193        let ratios: Vec<i64> = legs.iter().map(|(_, r)| *r).collect();
2194        for &r in &ratios {
2195            assert!(r != 0, "Ratio cannot be zero");
2196        }
2197        let timer_name = format!("SPREAD_QUOTE_{spread_instrument_id}");
2198        Self {
2199            spread_instrument_id,
2200            leg_ids,
2201            ratios,
2202            n_legs,
2203            is_futures_spread,
2204            price_precision,
2205            size_precision,
2206            last_quotes: AHashMap::new(),
2207            mid_prices: vec![0.0; n_legs],
2208            bid_prices: vec![0.0; n_legs],
2209            ask_prices: vec![0.0; n_legs],
2210            vegas: vec![0.0; n_legs],
2211            bid_ask_spreads: vec![0.0; n_legs],
2212            bid_sizes: vec![0.0; n_legs],
2213            ask_sizes: vec![0.0; n_legs],
2214            handler,
2215            clock,
2216            historical_mode,
2217            update_interval_seconds,
2218            quote_build_delay,
2219            has_update: false,
2220            timer_name,
2221            historical_event_at_ts_init: None,
2222            vega_provider,
2223            price_rounder,
2224            is_running: false,
2225            aggregator_weak: None,
2226        }
2227    }
2228
2229    /// Sets the weak reference to this aggregator (used when starting the timer so the callback can call back).
2230    /// Prefer [`Self::prepare_for_timer_mode`] so the owner passes the owning `Rc` in one step.
2231    pub fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Self>>) {
2232        self.aggregator_weak = Some(weak);
2233    }
2234
2235    /// One-step setup for timer-driven mode (live or historical). Call this with the `Rc` that owns
2236    /// this aggregator before feeding any quotes when `update_interval_seconds` is set. The timer
2237    /// callback will use the stored weak reference to call back into this aggregator; without this,
2238    /// [`Self::start_timer`] will panic in historical mode or when called with `None`.
2239    pub fn prepare_for_timer_mode(&mut self, self_rc: &Rc<RefCell<Self>>) {
2240        self.aggregator_weak = Some(Rc::downgrade(self_rc));
2241    }
2242
2243    /// Sets historical mode and handler (and optionally greeks provider when switching).
2244    pub fn set_historical_mode(
2245        &mut self,
2246        historical_mode: bool,
2247        handler: Box<dyn FnMut(QuoteTick)>,
2248        vega_provider: Option<Box<dyn VegaProvider>>,
2249    ) {
2250        self.historical_mode = historical_mode;
2251        self.handler = handler;
2252
2253        if let Some(vp) = vega_provider {
2254            self.vega_provider = Some(vp);
2255        }
2256    }
2257
2258    pub fn set_running(&mut self, is_running: bool) {
2259        self.is_running = is_running;
2260    }
2261
2262    pub fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
2263        self.clock = clock;
2264    }
2265
2266    /// Starts the timer when `update_interval_seconds` is set (timer-driven mode).
2267    /// In live mode pass `Some(rc)` so the weak is set and the timer can call back.
2268    /// In historical mode the owner must have called [`Self::prepare_for_timer_mode`] with the
2269    /// owning `Rc` before any quote is processed, then call with `None` here.
2270    ///
2271    /// # Panics
2272    ///
2273    /// Panics if called with `None` in timer mode without a prior [`Self::prepare_for_timer_mode`] call.
2274    pub fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Self>>>) {
2275        let Some(interval_secs) = self.update_interval_seconds else {
2276            return;
2277        };
2278        let aggregator_weak = if let Some(rc) = aggregator_rc {
2279            let weak = Rc::downgrade(&rc);
2280            self.aggregator_weak = Some(weak.clone());
2281            weak
2282        } else {
2283            self.aggregator_weak.clone().expect(
2284                "SpreadQuoteAggregator: timer mode requires prepare_for_timer_mode(rc) to be \
2285                 called first with the Rc that wraps this aggregator (before feeding quotes in \
2286                 historical mode or before start_timer(None)).",
2287            )
2288        };
2289
2290        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
2291            if let Some(agg) = aggregator_weak.upgrade() {
2292                agg.borrow_mut().on_timer_fire(event.ts_event);
2293            }
2294        }));
2295
2296        let now_ns = self.clock.borrow().timestamp_ns();
2297        let interval_ns = interval_secs * 1_000_000_000;
2298        let start_ns = (now_ns.as_u64() / interval_ns) * interval_ns;
2299        let start_ns = start_ns + self.quote_build_delay * 1_000; // quote_build_delay in microseconds
2300        let start_time = UnixNanos::from(start_ns);
2301        let fire_immediately = now_ns == start_time;
2302        self.clock
2303            .borrow_mut()
2304            .set_timer_ns(
2305                &self.timer_name,
2306                interval_ns,
2307                Some(start_time),
2308                None,
2309                Some(callback),
2310                Some(true),
2311                Some(fire_immediately),
2312            )
2313            .expect("Failed to set spread quote timer");
2314    }
2315
2316    /// Called when the timer fires (live mode). Builds and sends a spread quote using the timer event timestamp.
2317    pub fn on_timer_fire(&mut self, ts_event: UnixNanos) {
2318        if self.last_quotes.len() == self.n_legs {
2319            self.build_and_send_quote(ts_event);
2320        }
2321    }
2322
2323    /// Stops the timer when in timer-driven mode.
2324    pub fn stop_timer(&mut self) {
2325        if self.update_interval_seconds.is_none() {
2326            return;
2327        }
2328
2329        if self
2330            .clock
2331            .borrow()
2332            .timer_names()
2333            .contains(&self.timer_name.as_str())
2334        {
2335            self.clock.borrow_mut().cancel_timer(&self.timer_name);
2336        }
2337    }
2338
2339    /// Handles an incoming leg quote (Cython `handle_quote_tick`).
2340    pub fn handle_quote_tick(&mut self, tick: QuoteTick) {
2341        let ts_init = tick.ts_init;
2342
2343        if self.update_interval_seconds.is_some() && self.historical_mode {
2344            self.process_historical_events(ts_init);
2345        }
2346        self.last_quotes.insert(tick.instrument_id, tick);
2347        self.has_update = true;
2348
2349        if self.update_interval_seconds.is_none() && self.last_quotes.len() == self.n_legs {
2350            self.build_and_send_quote(ts_init);
2351        }
2352    }
2353
2354    /// Flushes the deferred historical timer event, if any.
2355    ///
2356    /// This is intended for historical request finalization, where we know no more historical
2357    /// quotes will arrive for the requested range and should not require a later live tick just
2358    /// to release the final same-timestamp spread quote.
2359    pub fn flush_pending_historical_quote(&mut self) {
2360        if self.update_interval_seconds.is_none() || !self.historical_mode {
2361            return;
2362        }
2363
2364        let Some(event) = self.historical_event_at_ts_init.take() else {
2365            return;
2366        };
2367
2368        if self.last_quotes.len() == self.n_legs {
2369            self.build_and_send_quote(event.ts_event);
2370        }
2371    }
2372
2373    /// Advances the historical clock and collects timer events. Events at `ts_init` are
2374    /// deferred until the next call when time advances. The deferred event is only flushed
2375    /// when all legs have quotes and time has moved past the deferred timestamp. This
2376    /// prevents building a spread quote with stale leg data when multiple legs update at
2377    /// the same timestamp (Cython parity).
2378    fn process_historical_events(&mut self, ts_init: UnixNanos) {
2379        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
2380            let mut clock_borrow = self.clock.borrow_mut();
2381            let test_clock = clock_borrow
2382                .as_any_mut()
2383                .downcast_mut::<TestClock>()
2384                .expect("Expected TestClock in historical mode");
2385            test_clock.set_time(ts_init);
2386            drop(clock_borrow);
2387            self.start_timer(None);
2388        }
2389
2390        if self.last_quotes.len() == self.n_legs
2391            && let Some(ref event) = self.historical_event_at_ts_init
2392            && event.ts_event < ts_init
2393        {
2394            // Guarded by `let Some(ref event)` above
2395            let event = self.historical_event_at_ts_init.take().unwrap();
2396            self.build_and_send_quote(event.ts_event);
2397        }
2398
2399        let events = {
2400            let mut clock_borrow = self.clock.borrow_mut();
2401            let test_clock = clock_borrow
2402                .as_any_mut()
2403                .downcast_mut::<TestClock>()
2404                .expect("Expected TestClock in historical mode");
2405            test_clock.advance_time(ts_init, true)
2406        };
2407
2408        for event in events {
2409            if event.ts_event == ts_init {
2410                self.historical_event_at_ts_init = Some(event);
2411            } else if self.last_quotes.len() == self.n_legs {
2412                self.build_and_send_quote(event.ts_event);
2413            }
2414        }
2415    }
2416
2417    /// Builds and sends one spread quote (Cython `_build_and_send_quote`).
2418    fn build_and_send_quote(&mut self, ts_event: UnixNanos) {
2419        if !self.has_update {
2420            return;
2421        }
2422
2423        for (idx, &leg_id) in self.leg_ids.iter().enumerate() {
2424            let Some(tick) = self.last_quotes.get(&leg_id) else {
2425                log::error!(
2426                    "SpreadQuoteAggregator[{}]: Missing quote for leg {}",
2427                    self.spread_instrument_id,
2428                    leg_id
2429                );
2430                return;
2431            };
2432            let ask_price = tick.ask_price.as_f64();
2433            let bid_price = tick.bid_price.as_f64();
2434            self.bid_prices[idx] = bid_price;
2435            self.ask_prices[idx] = ask_price;
2436            self.bid_sizes[idx] = tick.bid_size.as_f64();
2437            self.ask_sizes[idx] = tick.ask_size.as_f64();
2438
2439            if !self.is_futures_spread {
2440                self.mid_prices[idx] = (ask_price + bid_price) * 0.5;
2441                self.bid_ask_spreads[idx] = ask_price - bid_price;
2442
2443                if let Some(ref vp) = self.vega_provider
2444                    && let Some(vega) = vp.vega_for_leg(leg_id)
2445                {
2446                    self.vegas[idx] = vega;
2447                }
2448            }
2449        }
2450        let (raw_bid, raw_ask) = if self.is_futures_spread {
2451            self.create_futures_spread_prices()
2452        } else {
2453            self.create_option_spread_prices()
2454        };
2455        let spread_quote = self.create_quote_tick_from_raw_prices(raw_bid, raw_ask, ts_event);
2456        self.has_update = false;
2457        (self.handler)(spread_quote);
2458    }
2459
2460    fn create_option_spread_prices(&self) -> (f64, f64) {
2461        let vega_multipliers: Vec<f64> = (0..self.n_legs)
2462            .map(|i| {
2463                if self.vegas[i] == 0.0 {
2464                    0.0
2465                } else {
2466                    self.bid_ask_spreads[i] / self.vegas[i]
2467                }
2468            })
2469            .collect();
2470        let non_zero: Vec<f64> = vega_multipliers
2471            .iter()
2472            .copied()
2473            .filter(|&x| x != 0.0)
2474            .collect();
2475
2476        if non_zero.is_empty() {
2477            log::warn!(
2478                "No vega information available for the components of {}. Will generate spread quote using component quotes only",
2479                self.spread_instrument_id
2480            );
2481            return self.create_futures_spread_prices();
2482        }
2483        let vega_multiplier = non_zero.iter().map(|x| x.abs()).sum::<f64>() / non_zero.len() as f64;
2484        let spread_vega = self
2485            .vegas
2486            .iter()
2487            .zip(self.ratios.iter())
2488            .map(|(v, r)| v * (*r as f64))
2489            .sum::<f64>()
2490            .abs();
2491        let bid_ask_spread = spread_vega * vega_multiplier;
2492        let spread_mid_price: f64 = self
2493            .mid_prices
2494            .iter()
2495            .zip(self.ratios.iter())
2496            .map(|(m, r)| m * (*r as f64))
2497            .sum();
2498        let raw_bid = spread_mid_price - bid_ask_spread * 0.5;
2499        let raw_ask = spread_mid_price + bid_ask_spread * 0.5;
2500        (raw_bid, raw_ask)
2501    }
2502
2503    fn create_futures_spread_prices(&self) -> (f64, f64) {
2504        let mut raw_ask = 0.0_f64;
2505        let mut raw_bid = 0.0_f64;
2506
2507        for i in 0..self.n_legs {
2508            let r = self.ratios[i] as f64;
2509            if self.ratios[i] >= 0 {
2510                raw_ask += r * self.ask_prices[i];
2511                raw_bid += r * self.bid_prices[i];
2512            } else {
2513                raw_ask += r * self.bid_prices[i];
2514                raw_bid += r * self.ask_prices[i];
2515            }
2516        }
2517        (raw_bid, raw_ask)
2518    }
2519
2520    fn create_quote_tick_from_raw_prices(
2521        &self,
2522        raw_bid_price: f64,
2523        raw_ask_price: f64,
2524        ts_event: UnixNanos,
2525    ) -> QuoteTick {
2526        let (bid_price, ask_price) = if let Some(ref rounder) = self.price_rounder {
2527            rounder.round_prices(raw_bid_price, raw_ask_price, self.price_precision)
2528        } else {
2529            let bid = price_from_f64(raw_bid_price, self.price_precision);
2530            let ask = price_from_f64(raw_ask_price, self.price_precision);
2531            (bid, ask)
2532        };
2533        let mut min_bid_size = f64::INFINITY;
2534        let mut min_ask_size = f64::INFINITY;
2535        for i in 0..self.n_legs {
2536            let abs_ratio = self.ratios[i].unsigned_abs() as f64;
2537            if self.ratios[i] >= 0 {
2538                let b = self.bid_sizes[i] / abs_ratio;
2539                if b < min_bid_size {
2540                    min_bid_size = b;
2541                }
2542                let a = self.ask_sizes[i] / abs_ratio;
2543                if a < min_ask_size {
2544                    min_ask_size = a;
2545                }
2546            } else {
2547                let b = self.ask_sizes[i] / abs_ratio;
2548                if b < min_bid_size {
2549                    min_bid_size = b;
2550                }
2551                let a = self.bid_sizes[i] / abs_ratio;
2552                if a < min_ask_size {
2553                    min_ask_size = a;
2554                }
2555            }
2556        }
2557        let bid_size = Quantity::new(min_bid_size, self.size_precision);
2558        let ask_size = Quantity::new(min_ask_size, self.size_precision);
2559        QuoteTick::new(
2560            self.spread_instrument_id,
2561            bid_price,
2562            ask_price,
2563            bid_size,
2564            ask_size,
2565            ts_event,
2566            ts_event,
2567        )
2568    }
2569}
2570
2571fn price_from_f64(v: f64, precision: u8) -> Price {
2572    Price::new(v, precision)
2573}
2574
2575#[cfg(test)]
2576mod tests {
2577    use std::sync::{Arc, Mutex};
2578
2579    use nautilus_common::{clock::TestClock, timer::TimeEvent};
2580    use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
2581    use nautilus_model::{
2582        data::{BarSpecification, BarType, QuoteTick},
2583        enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
2584        identifiers::InstrumentId,
2585        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
2586        types::{Price, Quantity},
2587    };
2588    use rstest::rstest;
2589    use ustr::Ustr;
2590
2591    use super::*;
2592
2593    #[rstest]
2594    fn test_bar_builder_initialization(equity_aapl: Equity) {
2595        let instrument = InstrumentAny::Equity(equity_aapl);
2596        let bar_type = BarType::new(
2597            instrument.id(),
2598            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2599            AggregationSource::Internal,
2600        );
2601        let builder = BarBuilder::new(
2602            bar_type,
2603            instrument.price_precision(),
2604            instrument.size_precision(),
2605        );
2606
2607        assert!(!builder.initialized);
2608        assert_eq!(builder.ts_last, 0);
2609        assert_eq!(builder.count, 0);
2610    }
2611
2612    #[rstest]
2613    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
2614        let instrument = InstrumentAny::Equity(equity_aapl);
2615        let bar_type = BarType::new(
2616            instrument.id(),
2617            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2618            AggregationSource::Internal,
2619        );
2620        let mut builder = BarBuilder::new(
2621            bar_type,
2622            instrument.price_precision(),
2623            instrument.size_precision(),
2624        );
2625
2626        builder.update(
2627            Price::from("100.00"),
2628            Quantity::from(1),
2629            UnixNanos::from(1000),
2630        );
2631        builder.update(
2632            Price::from("95.00"),
2633            Quantity::from(1),
2634            UnixNanos::from(2000),
2635        );
2636        builder.update(
2637            Price::from("105.00"),
2638            Quantity::from(1),
2639            UnixNanos::from(3000),
2640        );
2641
2642        let bar = builder.build_now();
2643        assert!(bar.high > bar.low);
2644        assert_eq!(bar.open, Price::from("100.00"));
2645        assert_eq!(bar.high, Price::from("105.00"));
2646        assert_eq!(bar.low, Price::from("95.00"));
2647        assert_eq!(bar.close, Price::from("105.00"));
2648    }
2649
2650    #[rstest]
2651    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
2652        let instrument = InstrumentAny::Equity(equity_aapl);
2653        let bar_type = BarType::new(
2654            instrument.id(),
2655            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2656            AggregationSource::Internal,
2657        );
2658        let mut builder = BarBuilder::new(
2659            bar_type,
2660            instrument.price_precision(),
2661            instrument.size_precision(),
2662        );
2663
2664        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2665        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2666
2667        assert_eq!(builder.ts_last, 1_000);
2668        assert_eq!(builder.count, 1);
2669    }
2670
2671    #[rstest]
2672    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2673        let instrument = InstrumentAny::Equity(equity_aapl);
2674        let bar_type = BarType::new(
2675            instrument.id(),
2676            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2677            AggregationSource::Internal,
2678        );
2679        let mut builder = BarBuilder::new(
2680            bar_type,
2681            instrument.price_precision(),
2682            instrument.size_precision(),
2683        );
2684
2685        builder.update(
2686            Price::from("1.00000"),
2687            Quantity::from(1),
2688            UnixNanos::default(),
2689        );
2690
2691        assert!(builder.initialized);
2692        assert_eq!(builder.ts_last, 0);
2693        assert_eq!(builder.count, 1);
2694    }
2695
2696    #[rstest]
2697    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2698        equity_aapl: Equity,
2699    ) {
2700        let instrument = InstrumentAny::Equity(equity_aapl);
2701        let bar_type = BarType::new(
2702            instrument.id(),
2703            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2704            AggregationSource::Internal,
2705        );
2706        let mut builder = BarBuilder::new(bar_type, 2, 0);
2707
2708        builder.update(
2709            Price::from("1.00000"),
2710            Quantity::from(1),
2711            UnixNanos::from(1_000),
2712        );
2713        builder.update(
2714            Price::from("1.00001"),
2715            Quantity::from(1),
2716            UnixNanos::from(500),
2717        );
2718
2719        assert!(builder.initialized);
2720        assert_eq!(builder.ts_last, 1_000);
2721        assert_eq!(builder.count, 1);
2722    }
2723
2724    #[rstest]
2725    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2726        let instrument = InstrumentAny::Equity(equity_aapl);
2727        let bar_type = BarType::new(
2728            instrument.id(),
2729            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2730            AggregationSource::Internal,
2731        );
2732        let mut builder = BarBuilder::new(
2733            bar_type,
2734            instrument.price_precision(),
2735            instrument.size_precision(),
2736        );
2737
2738        for _ in 0..5 {
2739            builder.update(
2740                Price::from("1.00000"),
2741                Quantity::from(1),
2742                UnixNanos::from(1_000),
2743            );
2744        }
2745
2746        assert_eq!(builder.count, 5);
2747    }
2748
2749    #[rstest]
2750    #[should_panic]
2751    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2752        let instrument = InstrumentAny::Equity(equity_aapl);
2753        let bar_type = BarType::new(
2754            instrument.id(),
2755            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2756            AggregationSource::Internal,
2757        );
2758        let mut builder = BarBuilder::new(
2759            bar_type,
2760            instrument.price_precision(),
2761            instrument.size_precision(),
2762        );
2763        let _ = builder.build_now();
2764    }
2765
2766    #[rstest]
2767    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2768        let instrument = InstrumentAny::Equity(equity_aapl);
2769        let bar_type = BarType::new(
2770            instrument.id(),
2771            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2772            AggregationSource::Internal,
2773        );
2774        let mut builder = BarBuilder::new(
2775            bar_type,
2776            instrument.price_precision(),
2777            instrument.size_precision(),
2778        );
2779
2780        builder.update(
2781            Price::from("1.00001"),
2782            Quantity::from(2),
2783            UnixNanos::default(),
2784        );
2785        builder.update(
2786            Price::from("1.00002"),
2787            Quantity::from(2),
2788            UnixNanos::default(),
2789        );
2790        builder.update(
2791            Price::from("1.00000"),
2792            Quantity::from(1),
2793            UnixNanos::from(1_000_000_000),
2794        );
2795
2796        let bar = builder.build_now();
2797
2798        assert_eq!(bar.open, Price::from("1.00001"));
2799        assert_eq!(bar.high, Price::from("1.00002"));
2800        assert_eq!(bar.low, Price::from("1.00000"));
2801        assert_eq!(bar.close, Price::from("1.00000"));
2802        assert_eq!(bar.volume, Quantity::from(5));
2803        assert_eq!(bar.ts_init, 1_000_000_000);
2804        assert_eq!(builder.ts_last, 1_000_000_000);
2805        assert_eq!(builder.count, 0);
2806    }
2807
2808    #[rstest]
2809    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2810        let instrument = InstrumentAny::Equity(equity_aapl);
2811        let bar_type = BarType::new(
2812            instrument.id(),
2813            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2814            AggregationSource::Internal,
2815        );
2816        let mut builder = BarBuilder::new(bar_type, 2, 0);
2817
2818        builder.update(
2819            Price::from("1.00001"),
2820            Quantity::from(1),
2821            UnixNanos::default(),
2822        );
2823        builder.build_now();
2824
2825        builder.update(
2826            Price::from("1.00000"),
2827            Quantity::from(1),
2828            UnixNanos::default(),
2829        );
2830        builder.update(
2831            Price::from("1.00003"),
2832            Quantity::from(1),
2833            UnixNanos::default(),
2834        );
2835        builder.update(
2836            Price::from("1.00002"),
2837            Quantity::from(1),
2838            UnixNanos::default(),
2839        );
2840
2841        let bar = builder.build_now();
2842
2843        assert_eq!(bar.open, Price::from("1.00000"));
2844        assert_eq!(bar.high, Price::from("1.00003"));
2845        assert_eq!(bar.low, Price::from("1.00000"));
2846        assert_eq!(bar.close, Price::from("1.00002"));
2847        assert_eq!(bar.volume, Quantity::from(3));
2848    }
2849
2850    #[rstest]
2851    fn test_bar_builder_update_bar_initializes_then_accumulates(equity_aapl: Equity) {
2852        let instrument = InstrumentAny::Equity(equity_aapl);
2853        let bar_type = BarType::new(
2854            instrument.id(),
2855            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2856            AggregationSource::Internal,
2857        );
2858        let mut builder = BarBuilder::new(
2859            bar_type,
2860            instrument.price_precision(),
2861            instrument.size_precision(),
2862        );
2863
2864        let bar_one = Bar::new(
2865            bar_type,
2866            Price::from("100.00"),
2867            Price::from("102.00"),
2868            Price::from("99.00"),
2869            Price::from("101.00"),
2870            Quantity::from(10),
2871            UnixNanos::from(1_000),
2872            UnixNanos::from(1_000),
2873        );
2874        let bar_two = Bar::new(
2875            bar_type,
2876            Price::from("101.00"),
2877            Price::from("103.00"),
2878            Price::from("98.00"),
2879            Price::from("102.00"),
2880            Quantity::from(5),
2881            UnixNanos::from(2_000),
2882            UnixNanos::from(2_000),
2883        );
2884
2885        builder.update_bar(bar_one, bar_one.volume, bar_one.ts_init);
2886        builder.update_bar(bar_two, bar_two.volume, bar_two.ts_init);
2887        let bar = builder.build_now();
2888
2889        assert_eq!(bar.open, Price::from("100.00"));
2890        assert_eq!(bar.high, Price::from("103.00"));
2891        assert_eq!(bar.low, Price::from("98.00"));
2892        assert_eq!(bar.close, Price::from("102.00"));
2893        assert_eq!(bar.volume, Quantity::from(15));
2894        assert_eq!(builder.count, 0);
2895    }
2896
2897    #[rstest]
2898    fn test_bar_builder_update_bar_ignores_earlier_timestamp(equity_aapl: Equity) {
2899        let instrument = InstrumentAny::Equity(equity_aapl);
2900        let bar_type = BarType::new(
2901            instrument.id(),
2902            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2903            AggregationSource::Internal,
2904        );
2905        let mut builder = BarBuilder::new(
2906            bar_type,
2907            instrument.price_precision(),
2908            instrument.size_precision(),
2909        );
2910
2911        let bar_later = Bar::new(
2912            bar_type,
2913            Price::from("100.00"),
2914            Price::from("101.00"),
2915            Price::from("99.00"),
2916            Price::from("100.50"),
2917            Quantity::from(10),
2918            UnixNanos::from(2_000),
2919            UnixNanos::from(2_000),
2920        );
2921        let bar_earlier = Bar::new(
2922            bar_type,
2923            Price::from("200.00"),
2924            Price::from("210.00"),
2925            Price::from("190.00"),
2926            Price::from("205.00"),
2927            Quantity::from(50),
2928            UnixNanos::from(1_000),
2929            UnixNanos::from(1_000),
2930        );
2931
2932        builder.update_bar(bar_later, bar_later.volume, bar_later.ts_init);
2933        builder.update_bar(bar_earlier, bar_earlier.volume, bar_earlier.ts_init);
2934
2935        assert_eq!(builder.ts_last, 2_000);
2936        assert_eq!(builder.count, 1);
2937        assert_eq!(builder.volume, Quantity::from(10));
2938    }
2939
2940    #[rstest]
2941    #[case::spread_zero_inactive(
2942        Decimal::ZERO,
2943        ContinuousFutureAdjustmentType::BackwardSpread,
2944        false
2945    )]
2946    #[case::spread_positive_active(
2947        Decimal::new(150, 2), // 1.50
2948        ContinuousFutureAdjustmentType::BackwardSpread,
2949        true,
2950    )]
2951    #[case::spread_negative_active(
2952        Decimal::new(-250, 2), // -2.50
2953        ContinuousFutureAdjustmentType::ForwardSpread,
2954        true,
2955    )]
2956    #[case::spread_sub_precision_inactive(
2957        // 1e-28 scales to 0 raw under banker's rounding, so should be inactive.
2958        Decimal::new(1, 28),
2959        ContinuousFutureAdjustmentType::BackwardSpread,
2960        false,
2961    )]
2962    #[case::ratio_one_inactive(Decimal::ONE, ContinuousFutureAdjustmentType::BackwardRatio, false)]
2963    #[case::ratio_non_one_active(
2964        Decimal::new(105, 2), // 1.05
2965        ContinuousFutureAdjustmentType::ForwardRatio,
2966        true,
2967    )]
2968    fn test_bar_builder_set_adjustment_active_flag(
2969        equity_aapl: Equity,
2970        #[case] adjustment: Decimal,
2971        #[case] mode: ContinuousFutureAdjustmentType,
2972        #[case] expected_active: bool,
2973    ) {
2974        let instrument = InstrumentAny::Equity(equity_aapl);
2975        let bar_type = BarType::new(
2976            instrument.id(),
2977            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2978            AggregationSource::Internal,
2979        );
2980        let mut builder = BarBuilder::new(bar_type, 2, 0);
2981
2982        builder.set_adjustment(adjustment, mode);
2983
2984        assert_eq!(builder.adjustment_active, expected_active);
2985        assert_eq!(builder.adjustment_is_ratio, mode.is_ratio());
2986        assert_eq!(builder.adjustment_mode, mode);
2987    }
2988
2989    #[rstest]
2990    fn test_bar_builder_set_adjustment_mode_switch_resets_flags(equity_aapl: Equity) {
2991        let instrument = InstrumentAny::Equity(equity_aapl);
2992        let bar_type = BarType::new(
2993            instrument.id(),
2994            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2995            AggregationSource::Internal,
2996        );
2997        let mut builder = BarBuilder::new(bar_type, 2, 0);
2998
2999        // ratio -> spread: subsequent update must shift, not scale.
3000        builder.set_adjustment(
3001            Decimal::new(150, 2), // 1.50
3002            ContinuousFutureAdjustmentType::BackwardRatio,
3003        );
3004        builder.set_adjustment(
3005            Decimal::new(50, 2), // +0.50
3006            ContinuousFutureAdjustmentType::BackwardSpread,
3007        );
3008        assert!(!builder.adjustment_is_ratio);
3009        builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3010        assert_eq!(builder.build_now().close, Price::from("100.50"));
3011
3012        // spread -> ratio: subsequent update must scale, not shift.
3013        builder.set_adjustment(
3014            Decimal::new(11, 1), // 1.1
3015            ContinuousFutureAdjustmentType::ForwardRatio,
3016        );
3017        assert!(builder.adjustment_is_ratio);
3018        builder.update(Price::from("100.00"), Quantity::from(1), 2_000.into());
3019        assert_eq!(builder.build_now().close, Price::from("110.00"));
3020    }
3021
3022    #[rstest]
3023    fn test_bar_builder_update_applies_backward_spread_adjustment(equity_aapl: Equity) {
3024        let instrument = InstrumentAny::Equity(equity_aapl);
3025        let bar_type = BarType::new(
3026            instrument.id(),
3027            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3028            AggregationSource::Internal,
3029        );
3030        let mut builder = BarBuilder::new(bar_type, 2, 0);
3031
3032        builder.set_adjustment(
3033            Decimal::new(250, 2), // +2.50
3034            ContinuousFutureAdjustmentType::BackwardSpread,
3035        );
3036
3037        builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3038        builder.update(Price::from("99.00"), Quantity::from(1), 2_000.into());
3039        builder.update(Price::from("101.00"), Quantity::from(1), 3_000.into());
3040
3041        let bar = builder.build_now();
3042        assert_eq!(bar.open, Price::from("102.50"));
3043        assert_eq!(bar.high, Price::from("103.50"));
3044        assert_eq!(bar.low, Price::from("101.50"));
3045        assert_eq!(bar.close, Price::from("103.50"));
3046    }
3047
3048    #[rstest]
3049    fn test_bar_builder_update_applies_forward_ratio_adjustment(equity_aapl: Equity) {
3050        let instrument = InstrumentAny::Equity(equity_aapl);
3051        let bar_type = BarType::new(
3052            instrument.id(),
3053            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3054            AggregationSource::Internal,
3055        );
3056        let mut builder = BarBuilder::new(bar_type, 2, 0);
3057
3058        builder.set_adjustment(
3059            Decimal::new(11, 1), // 1.1
3060            ContinuousFutureAdjustmentType::ForwardRatio,
3061        );
3062
3063        builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3064        builder.update(Price::from("90.00"), Quantity::from(1), 2_000.into());
3065        builder.update(Price::from("110.00"), Quantity::from(1), 3_000.into());
3066
3067        let bar = builder.build_now();
3068        assert_eq!(bar.open, Price::from("110.00"));
3069        assert_eq!(bar.high, Price::from("121.00"));
3070        assert_eq!(bar.low, Price::from("99.00"));
3071        assert_eq!(bar.close, Price::from("121.00"));
3072    }
3073
3074    #[rstest]
3075    fn test_bar_builder_update_bar_applies_adjustment_to_ohlc(equity_aapl: Equity) {
3076        let instrument = InstrumentAny::Equity(equity_aapl);
3077        let bar_type = BarType::new(
3078            instrument.id(),
3079            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3080            AggregationSource::Internal,
3081        );
3082        let mut builder = BarBuilder::new(bar_type, 2, 0);
3083
3084        builder.set_adjustment(
3085            Decimal::new(-100, 2), // -1.00
3086            ContinuousFutureAdjustmentType::BackwardSpread,
3087        );
3088
3089        let input = Bar::new(
3090            bar_type,
3091            Price::from("100.00"),
3092            Price::from("105.00"),
3093            Price::from("99.00"),
3094            Price::from("102.00"),
3095            Quantity::from(10),
3096            UnixNanos::from(1_000),
3097            UnixNanos::from(1_000),
3098        );
3099        builder.update_bar(input, input.volume, input.ts_init);
3100
3101        let bar = builder.build_now();
3102        assert_eq!(bar.open, Price::from("99.00"));
3103        assert_eq!(bar.high, Price::from("104.00"));
3104        assert_eq!(bar.low, Price::from("98.00"));
3105        assert_eq!(bar.close, Price::from("101.00"));
3106    }
3107
3108    #[rstest]
3109    fn test_bar_builder_reset_retains_adjustment(equity_aapl: Equity) {
3110        let instrument = InstrumentAny::Equity(equity_aapl);
3111        let bar_type = BarType::new(
3112            instrument.id(),
3113            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3114            AggregationSource::Internal,
3115        );
3116        let mut builder = BarBuilder::new(bar_type, 2, 0);
3117
3118        builder.set_adjustment(
3119            Decimal::new(500, 2), // +5.00
3120            ContinuousFutureAdjustmentType::BackwardSpread,
3121        );
3122        builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3123        let bar_one = builder.build_now();
3124        assert_eq!(bar_one.close, Price::from("105.00"));
3125
3126        // Adjustment must persist across the reset triggered by build_now.
3127        assert!(builder.adjustment_active);
3128
3129        builder.update(Price::from("110.00"), Quantity::from(1), 2_000.into());
3130        let bar_two = builder.build_now();
3131        assert_eq!(bar_two.close, Price::from("115.00"));
3132    }
3133
3134    #[rstest]
3135    fn test_bar_builder_update_bar_applies_ratio_adjustment(equity_aapl: Equity) {
3136        let instrument = InstrumentAny::Equity(equity_aapl);
3137        let bar_type = BarType::new(
3138            instrument.id(),
3139            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3140            AggregationSource::Internal,
3141        );
3142        let mut builder = BarBuilder::new(bar_type, 2, 0);
3143
3144        builder.set_adjustment(
3145            Decimal::new(11, 1), // 1.1
3146            ContinuousFutureAdjustmentType::ForwardRatio,
3147        );
3148
3149        let input = Bar::new(
3150            bar_type,
3151            Price::from("100.00"),
3152            Price::from("110.00"),
3153            Price::from("90.00"),
3154            Price::from("105.00"),
3155            Quantity::from(10),
3156            UnixNanos::from(1_000),
3157            UnixNanos::from(1_000),
3158        );
3159        builder.update_bar(input, input.volume, input.ts_init);
3160
3161        let bar = builder.build_now();
3162        assert_eq!(bar.open, Price::from("110.00"));
3163        assert_eq!(bar.high, Price::from("121.00"));
3164        assert_eq!(bar.low, Price::from("99.00"));
3165        assert_eq!(bar.close, Price::from("115.50"));
3166    }
3167
3168    #[rstest]
3169    fn test_bar_builder_spread_below_zero_representable(equity_aapl: Equity) {
3170        // Cython documents that backward-spread offsets pushing prices below zero
3171        // remain representable in PriceRaw; verify the same on the Rust side.
3172        let instrument = InstrumentAny::Equity(equity_aapl);
3173        let bar_type = BarType::new(
3174            instrument.id(),
3175            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3176            AggregationSource::Internal,
3177        );
3178        let mut builder = BarBuilder::new(bar_type, 2, 0);
3179
3180        builder.set_adjustment(
3181            Decimal::new(-15000, 2), // -150.00
3182            ContinuousFutureAdjustmentType::BackwardSpread,
3183        );
3184
3185        builder.update(Price::from("100.00"), Quantity::from(1), 1_000.into());
3186        let bar = builder.build_now();
3187        assert_eq!(bar.close, Price::from("-50.00"));
3188        assert!(bar.close.raw < 0);
3189        assert_eq!(bar.close.precision, 2);
3190    }
3191
3192    #[rstest]
3193    fn test_bar_builder_build_promotes_close_above_high_from_previous_close(equity_aapl: Equity) {
3194        let instrument = InstrumentAny::Equity(equity_aapl);
3195        let bar_type = BarType::new(
3196            instrument.id(),
3197            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3198            AggregationSource::Internal,
3199        );
3200        let mut builder = BarBuilder::new(bar_type, 2, 0);
3201
3202        builder.update(
3203            Price::from("110.00"),
3204            Quantity::from(1),
3205            UnixNanos::from(100),
3206        );
3207        builder.build_now();
3208
3209        builder.update(
3210            Price::from("100.00"),
3211            Quantity::from(1),
3212            UnixNanos::from(200),
3213        );
3214        builder.update(
3215            Price::from("101.00"),
3216            Quantity::from(1),
3217            UnixNanos::from(300),
3218        );
3219        builder.update(
3220            Price::from("200.00"),
3221            Quantity::from(1),
3222            UnixNanos::from(400),
3223        );
3224
3225        let bar = builder.build_now();
3226        assert_eq!(bar.open, Price::from("100.00"));
3227        assert_eq!(bar.high, Price::from("200.00"));
3228        assert_eq!(bar.low, Price::from("100.00"));
3229        assert_eq!(bar.close, Price::from("200.00"));
3230    }
3231
3232    #[rstest]
3233    fn test_bar_builder_build_clamps_low_to_close(equity_aapl: Equity) {
3234        // Rust BarBuilder mirrors Cython: on `build`, if `close < low` the low is pulled down to close.
3235        // Reaching this branch requires bypassing `update`'s low tracking (e.g. via bar updates where
3236        // a later bar's close is below the accumulated low). We simulate by direct field assignment.
3237        let instrument = InstrumentAny::Equity(equity_aapl);
3238        let bar_type = BarType::new(
3239            instrument.id(),
3240            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
3241            AggregationSource::Internal,
3242        );
3243        let mut builder = BarBuilder::new(bar_type, 2, 0);
3244
3245        builder.update(
3246            Price::from("100.00"),
3247            Quantity::from(1),
3248            UnixNanos::from(100),
3249        );
3250        builder.close = Some(Price::from("50.00"));
3251
3252        let bar = builder.build_now();
3253        assert_eq!(bar.low, Price::from("50.00"));
3254        assert_eq!(bar.close, Price::from("50.00"));
3255        assert!(bar.low <= bar.open);
3256    }
3257
3258    #[rstest]
3259    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
3260        let instrument = InstrumentAny::Equity(equity_aapl);
3261        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3262        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3263        let handler = Arc::new(Mutex::new(Vec::new()));
3264        let handler_clone = Arc::clone(&handler);
3265
3266        let mut aggregator = TickBarAggregator::new(
3267            bar_type,
3268            instrument.price_precision(),
3269            instrument.size_precision(),
3270            move |bar: Bar| {
3271                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3272                handler_guard.push(bar);
3273            },
3274        );
3275
3276        let trade = TradeTick::default();
3277        aggregator.handle_trade(trade);
3278
3279        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3280        assert_eq!(handler_guard.len(), 0);
3281    }
3282
3283    #[rstest]
3284    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
3285        let instrument = InstrumentAny::Equity(equity_aapl);
3286        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3287        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3288        let handler = Arc::new(Mutex::new(Vec::new()));
3289        let handler_clone = Arc::clone(&handler);
3290
3291        let mut aggregator = TickBarAggregator::new(
3292            bar_type,
3293            instrument.price_precision(),
3294            instrument.size_precision(),
3295            move |bar: Bar| {
3296                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3297                handler_guard.push(bar);
3298            },
3299        );
3300
3301        let trade = TradeTick::default();
3302        aggregator.handle_trade(trade);
3303        aggregator.handle_trade(trade);
3304        aggregator.handle_trade(trade);
3305
3306        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3307        let bar = handler_guard.first().unwrap();
3308        assert_eq!(handler_guard.len(), 1);
3309        assert_eq!(bar.open, trade.price);
3310        assert_eq!(bar.high, trade.price);
3311        assert_eq!(bar.low, trade.price);
3312        assert_eq!(bar.close, trade.price);
3313        assert_eq!(bar.volume, Quantity::from(300000));
3314        assert_eq!(bar.ts_event, trade.ts_event);
3315        assert_eq!(bar.ts_init, trade.ts_init);
3316    }
3317
3318    #[rstest]
3319    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
3320        let instrument = InstrumentAny::Equity(equity_aapl);
3321        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
3322        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3323        let handler = Arc::new(Mutex::new(Vec::new()));
3324        let handler_clone = Arc::clone(&handler);
3325
3326        let mut aggregator = TickBarAggregator::new(
3327            bar_type,
3328            instrument.price_precision(),
3329            instrument.size_precision(),
3330            move |bar: Bar| {
3331                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3332                handler_guard.push(bar);
3333            },
3334        );
3335
3336        aggregator.update(
3337            Price::from("1.00001"),
3338            Quantity::from(1),
3339            UnixNanos::default(),
3340        );
3341        aggregator.update(
3342            Price::from("1.00002"),
3343            Quantity::from(1),
3344            UnixNanos::from(1000),
3345        );
3346        aggregator.update(
3347            Price::from("1.00003"),
3348            Quantity::from(1),
3349            UnixNanos::from(2000),
3350        );
3351
3352        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3353        assert_eq!(handler_guard.len(), 1);
3354
3355        let bar = handler_guard.first().unwrap();
3356        assert_eq!(bar.open, Price::from("1.00001"));
3357        assert_eq!(bar.high, Price::from("1.00003"));
3358        assert_eq!(bar.low, Price::from("1.00001"));
3359        assert_eq!(bar.close, Price::from("1.00003"));
3360        assert_eq!(bar.volume, Quantity::from(3));
3361    }
3362
3363    #[rstest]
3364    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
3365        let instrument = InstrumentAny::Equity(equity_aapl);
3366        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
3367        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3368        let handler = Arc::new(Mutex::new(Vec::new()));
3369        let handler_clone = Arc::clone(&handler);
3370
3371        let mut aggregator = TickBarAggregator::new(
3372            bar_type,
3373            instrument.price_precision(),
3374            instrument.size_precision(),
3375            move |bar: Bar| {
3376                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3377                handler_guard.push(bar);
3378            },
3379        );
3380
3381        aggregator.update(
3382            Price::from("1.00001"),
3383            Quantity::from(1),
3384            UnixNanos::default(),
3385        );
3386        aggregator.update(
3387            Price::from("1.00002"),
3388            Quantity::from(1),
3389            UnixNanos::from(1000),
3390        );
3391        aggregator.update(
3392            Price::from("1.00003"),
3393            Quantity::from(1),
3394            UnixNanos::from(2000),
3395        );
3396        aggregator.update(
3397            Price::from("1.00004"),
3398            Quantity::from(1),
3399            UnixNanos::from(3000),
3400        );
3401
3402        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3403        assert_eq!(handler_guard.len(), 2);
3404
3405        let bar1 = &handler_guard[0];
3406        assert_eq!(bar1.open, Price::from("1.00001"));
3407        assert_eq!(bar1.close, Price::from("1.00002"));
3408        assert_eq!(bar1.volume, Quantity::from(2));
3409
3410        let bar2 = &handler_guard[1];
3411        assert_eq!(bar2.open, Price::from("1.00003"));
3412        assert_eq!(bar2.close, Price::from("1.00004"));
3413        assert_eq!(bar2.volume, Quantity::from(2));
3414    }
3415
3416    #[rstest]
3417    fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
3418        let instrument = InstrumentAny::Equity(equity_aapl);
3419        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3420        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3421        let handler = Arc::new(Mutex::new(Vec::new()));
3422        let handler_clone = Arc::clone(&handler);
3423
3424        let mut aggregator = TickImbalanceBarAggregator::new(
3425            bar_type,
3426            instrument.price_precision(),
3427            instrument.size_precision(),
3428            move |bar: Bar| {
3429                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3430                handler_guard.push(bar);
3431            },
3432        );
3433
3434        let trade = TradeTick::default();
3435        aggregator.handle_trade(trade);
3436        aggregator.handle_trade(trade);
3437
3438        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3439        assert_eq!(handler_guard.len(), 1);
3440        let bar = handler_guard.first().unwrap();
3441        assert_eq!(bar.volume, Quantity::from(200000));
3442    }
3443
3444    #[rstest]
3445    fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
3446        let instrument = InstrumentAny::Equity(equity_aapl);
3447        let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
3448        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3449        let handler = Arc::new(Mutex::new(Vec::new()));
3450        let handler_clone = Arc::clone(&handler);
3451
3452        let mut aggregator = TickImbalanceBarAggregator::new(
3453            bar_type,
3454            instrument.price_precision(),
3455            instrument.size_precision(),
3456            move |bar: Bar| {
3457                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3458                handler_guard.push(bar);
3459            },
3460        );
3461
3462        let sell = TradeTick {
3463            aggressor_side: AggressorSide::Seller,
3464            ..TradeTick::default()
3465        };
3466
3467        aggregator.handle_trade(sell);
3468
3469        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3470        assert_eq!(handler_guard.len(), 1);
3471    }
3472
3473    #[rstest]
3474    fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
3475        let instrument = InstrumentAny::Equity(equity_aapl);
3476        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3477        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3478        let handler = Arc::new(Mutex::new(Vec::new()));
3479        let handler_clone = Arc::clone(&handler);
3480
3481        let mut aggregator = TickRunsBarAggregator::new(
3482            bar_type,
3483            instrument.price_precision(),
3484            instrument.size_precision(),
3485            move |bar: Bar| {
3486                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3487                handler_guard.push(bar);
3488            },
3489        );
3490
3491        let buy = TradeTick::default();
3492        let sell = TradeTick {
3493            aggressor_side: AggressorSide::Seller,
3494            ..buy
3495        };
3496
3497        aggregator.handle_trade(buy);
3498        aggregator.handle_trade(buy);
3499        aggregator.handle_trade(sell);
3500        aggregator.handle_trade(sell);
3501
3502        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3503        assert_eq!(handler_guard.len(), 2);
3504    }
3505
3506    #[rstest]
3507    fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
3508        let instrument = InstrumentAny::Equity(equity_aapl);
3509        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3510        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3511        let handler = Arc::new(Mutex::new(Vec::new()));
3512        let handler_clone = Arc::clone(&handler);
3513
3514        let mut aggregator = TickRunsBarAggregator::new(
3515            bar_type,
3516            instrument.price_precision(),
3517            instrument.size_precision(),
3518            move |bar: Bar| {
3519                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3520                handler_guard.push(bar);
3521            },
3522        );
3523
3524        let buy = TradeTick {
3525            size: Quantity::from(1),
3526            ..TradeTick::default()
3527        };
3528        let sell = TradeTick {
3529            aggressor_side: AggressorSide::Seller,
3530            size: Quantity::from(1),
3531            ..buy
3532        };
3533
3534        aggregator.handle_trade(buy);
3535        aggregator.handle_trade(buy);
3536        aggregator.handle_trade(sell);
3537        aggregator.handle_trade(sell);
3538
3539        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3540        assert_eq!(handler_guard.len(), 2);
3541        assert_eq!(handler_guard[0].volume, Quantity::from(2));
3542        assert_eq!(handler_guard[1].volume, Quantity::from(2));
3543    }
3544
3545    #[rstest]
3546    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
3547        let instrument = InstrumentAny::Equity(equity_aapl);
3548        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3549        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3550        let handler = Arc::new(Mutex::new(Vec::new()));
3551        let handler_clone = Arc::clone(&handler);
3552
3553        let mut aggregator = VolumeBarAggregator::new(
3554            bar_type,
3555            instrument.price_precision(),
3556            instrument.size_precision(),
3557            move |bar: Bar| {
3558                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3559                handler_guard.push(bar);
3560            },
3561        );
3562
3563        aggregator.update(
3564            Price::from("1.00001"),
3565            Quantity::from(25),
3566            UnixNanos::default(),
3567        );
3568
3569        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3570        assert_eq!(handler_guard.len(), 2);
3571        let bar1 = &handler_guard[0];
3572        assert_eq!(bar1.volume, Quantity::from(10));
3573        let bar2 = &handler_guard[1];
3574        assert_eq!(bar2.volume, Quantity::from(10));
3575    }
3576
3577    #[rstest]
3578    fn test_volume_bar_aggregator_zero_size_update_is_noop(equity_aapl: Equity) {
3579        let instrument = InstrumentAny::Equity(equity_aapl);
3580        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3581        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3582        let handler = Arc::new(Mutex::new(Vec::new()));
3583        let handler_clone = Arc::clone(&handler);
3584
3585        let mut aggregator = VolumeBarAggregator::new(
3586            bar_type,
3587            instrument.price_precision(),
3588            instrument.size_precision(),
3589            move |bar: Bar| {
3590                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3591                handler_guard.push(bar);
3592            },
3593        );
3594
3595        aggregator.update(
3596            Price::from("100.00"),
3597            Quantity::from(0),
3598            UnixNanos::default(),
3599        );
3600
3601        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3602        assert_eq!(handler_guard.len(), 0);
3603    }
3604
3605    #[rstest]
3606    fn test_volume_bar_aggregator_exact_threshold_emits_single_bar(equity_aapl: Equity) {
3607        let instrument = InstrumentAny::Equity(equity_aapl);
3608        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3609        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3610        let handler = Arc::new(Mutex::new(Vec::new()));
3611        let handler_clone = Arc::clone(&handler);
3612
3613        let mut aggregator = VolumeBarAggregator::new(
3614            bar_type,
3615            instrument.price_precision(),
3616            instrument.size_precision(),
3617            move |bar: Bar| {
3618                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3619                handler_guard.push(bar);
3620            },
3621        );
3622
3623        aggregator.update(
3624            Price::from("100.00"),
3625            Quantity::from(7),
3626            UnixNanos::from(1_000),
3627        );
3628        aggregator.update(
3629            Price::from("101.00"),
3630            Quantity::from(3),
3631            UnixNanos::from(2_000),
3632        );
3633
3634        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3635        assert_eq!(handler_guard.len(), 1);
3636        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3637        assert_eq!(handler_guard[0].close, Price::from("101.00"));
3638    }
3639
3640    #[rstest]
3641    fn test_volume_bar_aggregator_step_of_one_emits_per_unit(equity_aapl: Equity) {
3642        let instrument = InstrumentAny::Equity(equity_aapl);
3643        let bar_spec = BarSpecification::new(1, BarAggregation::Volume, PriceType::Last);
3644        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3645        let handler = Arc::new(Mutex::new(Vec::new()));
3646        let handler_clone = Arc::clone(&handler);
3647
3648        let mut aggregator = VolumeBarAggregator::new(
3649            bar_type,
3650            instrument.price_precision(),
3651            instrument.size_precision(),
3652            move |bar: Bar| {
3653                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3654                handler_guard.push(bar);
3655            },
3656        );
3657
3658        aggregator.update(
3659            Price::from("100.00"),
3660            Quantity::from(1),
3661            UnixNanos::default(),
3662        );
3663
3664        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3665        assert_eq!(handler_guard.len(), 1);
3666        assert_eq!(handler_guard[0].volume, Quantity::from(1));
3667    }
3668
3669    #[rstest]
3670    fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
3671        let instrument = InstrumentAny::Equity(equity_aapl);
3672        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
3673        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3674        let handler = Arc::new(Mutex::new(Vec::new()));
3675        let handler_clone = Arc::clone(&handler);
3676
3677        let mut aggregator = VolumeRunsBarAggregator::new(
3678            bar_type,
3679            instrument.price_precision(),
3680            instrument.size_precision(),
3681            move |bar: Bar| {
3682                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3683                handler_guard.push(bar);
3684            },
3685        );
3686
3687        let buy = TradeTick {
3688            instrument_id: instrument.id(),
3689            price: Price::from("1.0"),
3690            size: Quantity::from(1),
3691            ..TradeTick::default()
3692        };
3693        let sell = TradeTick {
3694            aggressor_side: AggressorSide::Seller,
3695            ..buy
3696        };
3697
3698        aggregator.handle_trade(buy);
3699        aggregator.handle_trade(buy); // emit first bar at 2
3700        aggregator.handle_trade(sell);
3701        aggregator.handle_trade(sell); // emit second bar at 2 sell-side
3702
3703        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3704        assert!(handler_guard.len() >= 2);
3705        assert!(
3706            (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
3707                < f64::EPSILON
3708        );
3709    }
3710
3711    #[rstest]
3712    fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
3713        let instrument = InstrumentAny::Equity(equity_aapl);
3714        let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
3715        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3716        let handler = Arc::new(Mutex::new(Vec::new()));
3717        let handler_clone = Arc::clone(&handler);
3718
3719        let mut aggregator = VolumeRunsBarAggregator::new(
3720            bar_type,
3721            instrument.price_precision(),
3722            instrument.size_precision(),
3723            move |bar: Bar| {
3724                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3725                handler_guard.push(bar);
3726            },
3727        );
3728
3729        let trade = TradeTick {
3730            instrument_id: instrument.id(),
3731            price: Price::from("1.0"),
3732            size: Quantity::from(5),
3733            ..TradeTick::default()
3734        };
3735
3736        aggregator.handle_trade(trade);
3737
3738        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3739        assert!(!handler_guard.is_empty());
3740        assert!(handler_guard[0].volume.as_f64() > 0.0);
3741        assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
3742    }
3743
3744    #[rstest]
3745    fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
3746        let instrument = InstrumentAny::Equity(equity_aapl);
3747        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
3748        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3749        let handler = Arc::new(Mutex::new(Vec::new()));
3750        let handler_clone = Arc::clone(&handler);
3751
3752        let mut aggregator = VolumeImbalanceBarAggregator::new(
3753            bar_type,
3754            instrument.price_precision(),
3755            instrument.size_precision(),
3756            move |bar: Bar| {
3757                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3758                handler_guard.push(bar);
3759            },
3760        );
3761
3762        let trade_small = TradeTick {
3763            instrument_id: instrument.id(),
3764            price: Price::from("1.0"),
3765            size: Quantity::from(1),
3766            ..TradeTick::default()
3767        };
3768        let trade_large = TradeTick {
3769            size: Quantity::from(3),
3770            ..trade_small
3771        };
3772
3773        aggregator.handle_trade(trade_small);
3774        aggregator.handle_trade(trade_large);
3775
3776        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3777        assert_eq!(handler_guard.len(), 2);
3778        let total_output = handler_guard
3779            .iter()
3780            .map(|bar| bar.volume.as_f64())
3781            .sum::<f64>();
3782        let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
3783        assert!((total_output - total_input).abs() < f64::EPSILON);
3784    }
3785
3786    #[rstest]
3787    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
3788        let instrument = InstrumentAny::Equity(equity_aapl);
3789        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
3790        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3791        let handler = Arc::new(Mutex::new(Vec::new()));
3792        let handler_clone = Arc::clone(&handler);
3793
3794        let mut aggregator = ValueBarAggregator::new(
3795            bar_type,
3796            instrument.price_precision(),
3797            instrument.size_precision(),
3798            move |bar: Bar| {
3799                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3800                handler_guard.push(bar);
3801            },
3802        );
3803
3804        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
3805        aggregator.update(
3806            Price::from("100.00"),
3807            Quantity::from(5),
3808            UnixNanos::default(),
3809        );
3810        aggregator.update(
3811            Price::from("100.00"),
3812            Quantity::from(5),
3813            UnixNanos::from(1000),
3814        );
3815
3816        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3817        assert_eq!(handler_guard.len(), 1);
3818        let bar = handler_guard.first().unwrap();
3819        assert_eq!(bar.volume, Quantity::from(10));
3820    }
3821
3822    #[rstest]
3823    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
3824        let instrument = InstrumentAny::Equity(equity_aapl);
3825        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3826        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3827        let handler = Arc::new(Mutex::new(Vec::new()));
3828        let handler_clone = Arc::clone(&handler);
3829
3830        let mut aggregator = ValueBarAggregator::new(
3831            bar_type,
3832            instrument.price_precision(),
3833            instrument.size_precision(),
3834            move |bar: Bar| {
3835                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3836                handler_guard.push(bar);
3837            },
3838        );
3839
3840        // Single large update: $100 * 25 = $2500 (should create 2 bars)
3841        aggregator.update(
3842            Price::from("100.00"),
3843            Quantity::from(25),
3844            UnixNanos::default(),
3845        );
3846
3847        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3848        assert_eq!(handler_guard.len(), 2);
3849        let remaining_value = aggregator.get_cumulative_value();
3850        assert!(remaining_value < 1000.0); // Should be less than threshold
3851    }
3852
3853    #[rstest]
3854    fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
3855        let instrument = InstrumentAny::Equity(equity_aapl);
3856        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3857        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3858        let handler = Arc::new(Mutex::new(Vec::new()));
3859        let handler_clone = Arc::clone(&handler);
3860
3861        let mut aggregator = ValueBarAggregator::new(
3862            bar_type,
3863            instrument.price_precision(),
3864            instrument.size_precision(),
3865            move |bar: Bar| {
3866                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3867                handler_guard.push(bar);
3868            },
3869        );
3870
3871        // Update with zero price should not cause division by zero
3872        aggregator.update(
3873            Price::from("0.00"),
3874            Quantity::from(100),
3875            UnixNanos::default(),
3876        );
3877
3878        // No bars should be emitted since value is zero
3879        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3880        assert_eq!(handler_guard.len(), 0);
3881
3882        // Cumulative value should remain zero
3883        assert_eq!(aggregator.get_cumulative_value(), 0.0);
3884    }
3885
3886    #[rstest]
3887    fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
3888        let instrument = InstrumentAny::Equity(equity_aapl);
3889        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3890        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3891        let handler = Arc::new(Mutex::new(Vec::new()));
3892        let handler_clone = Arc::clone(&handler);
3893
3894        let mut aggregator = ValueBarAggregator::new(
3895            bar_type,
3896            instrument.price_precision(),
3897            instrument.size_precision(),
3898            move |bar: Bar| {
3899                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3900                handler_guard.push(bar);
3901            },
3902        );
3903
3904        // Update with zero size should not cause issues
3905        aggregator.update(
3906            Price::from("100.00"),
3907            Quantity::from(0),
3908            UnixNanos::default(),
3909        );
3910
3911        // No bars should be emitted
3912        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3913        assert_eq!(handler_guard.len(), 0);
3914
3915        // Cumulative value should remain zero
3916        assert_eq!(aggregator.get_cumulative_value(), 0.0);
3917    }
3918
3919    #[rstest]
3920    fn test_value_bar_aggregator_exact_threshold_emits_one_bar(equity_aapl: Equity) {
3921        let instrument = InstrumentAny::Equity(equity_aapl);
3922        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3923        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3924        let handler = Arc::new(Mutex::new(Vec::new()));
3925        let handler_clone = Arc::clone(&handler);
3926
3927        let mut aggregator = ValueBarAggregator::new(
3928            bar_type,
3929            instrument.price_precision(),
3930            instrument.size_precision(),
3931            move |bar: Bar| {
3932                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3933                handler_guard.push(bar);
3934            },
3935        );
3936
3937        aggregator.update(
3938            Price::from("100.00"),
3939            Quantity::from(5),
3940            UnixNanos::from(1_000),
3941        );
3942        aggregator.update(
3943            Price::from("100.00"),
3944            Quantity::from(5),
3945            UnixNanos::from(2_000),
3946        );
3947
3948        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3949        assert_eq!(handler_guard.len(), 1);
3950        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3951        assert_eq!(aggregator.get_cumulative_value(), 0.0);
3952    }
3953
3954    #[rstest]
3955    fn test_value_bar_aggregator_precision_boundary_min_size_clamp(equity_aapl: Equity) {
3956        // step=100, price=100 per-unit value=100 with size_precision=0 lands the divided
3957        // size_chunk at the precision floor. Verifies the min-size clamp branch in update()
3958        // emits one bar per unit rather than looping on zero-volume chunks.
3959        let instrument = InstrumentAny::Equity(equity_aapl);
3960        let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
3961        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3962        let handler = Arc::new(Mutex::new(Vec::new()));
3963        let handler_clone = Arc::clone(&handler);
3964
3965        let mut aggregator = ValueBarAggregator::new(
3966            bar_type,
3967            instrument.price_precision(),
3968            instrument.size_precision(),
3969            move |bar: Bar| {
3970                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3971                handler_guard.push(bar);
3972            },
3973        );
3974
3975        // 4 units at $100 = $400 value, with step $100 gives 4 bars exactly.
3976        aggregator.update(
3977            Price::from("100.00"),
3978            Quantity::from(4),
3979            UnixNanos::default(),
3980        );
3981
3982        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3983        assert_eq!(handler_guard.len(), 4);
3984        for bar in handler_guard.iter() {
3985            assert_eq!(bar.volume, Quantity::from(1));
3986        }
3987    }
3988
3989    #[rstest]
3990    fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
3991        let instrument = InstrumentAny::Equity(equity_aapl);
3992        let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
3993        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3994        let handler = Arc::new(Mutex::new(Vec::new()));
3995        let handler_clone = Arc::clone(&handler);
3996
3997        let mut aggregator = ValueImbalanceBarAggregator::new(
3998            bar_type,
3999            instrument.price_precision(),
4000            instrument.size_precision(),
4001            move |bar: Bar| {
4002                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4003                handler_guard.push(bar);
4004            },
4005        );
4006
4007        let buy = TradeTick {
4008            price: Price::from("5.0"),
4009            size: Quantity::from(2), // value 10, should emit one bar
4010            instrument_id: instrument.id(),
4011            ..TradeTick::default()
4012        };
4013        let sell = TradeTick {
4014            price: Price::from("5.0"),
4015            size: Quantity::from(2), // value 10, should emit another bar
4016            aggressor_side: AggressorSide::Seller,
4017            instrument_id: instrument.id(),
4018            ..buy
4019        };
4020
4021        aggregator.handle_trade(buy);
4022        aggregator.handle_trade(sell);
4023
4024        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4025        assert_eq!(handler_guard.len(), 2);
4026    }
4027
4028    #[rstest]
4029    fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
4030        let instrument = InstrumentAny::Equity(equity_aapl);
4031        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4032        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4033        let handler = Arc::new(Mutex::new(Vec::new()));
4034        let handler_clone = Arc::clone(&handler);
4035
4036        let mut aggregator = ValueRunsBarAggregator::new(
4037            bar_type,
4038            instrument.price_precision(),
4039            instrument.size_precision(),
4040            move |bar: Bar| {
4041                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4042                handler_guard.push(bar);
4043            },
4044        );
4045
4046        let trade = TradeTick {
4047            price: Price::from("10.0"),
4048            size: Quantity::from(5),
4049            instrument_id: instrument.id(),
4050            ..TradeTick::default()
4051        };
4052
4053        aggregator.handle_trade(trade);
4054        aggregator.handle_trade(trade);
4055
4056        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4057        assert_eq!(handler_guard.len(), 1);
4058        let bar = handler_guard.first().unwrap();
4059        assert_eq!(bar.volume, Quantity::from(10));
4060    }
4061
4062    #[rstest]
4063    fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
4064        let instrument = InstrumentAny::Equity(equity_aapl);
4065        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4066        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4067        let handler = Arc::new(Mutex::new(Vec::new()));
4068        let handler_clone = Arc::clone(&handler);
4069
4070        let mut aggregator = ValueRunsBarAggregator::new(
4071            bar_type,
4072            instrument.price_precision(),
4073            instrument.size_precision(),
4074            move |bar: Bar| {
4075                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4076                handler_guard.push(bar);
4077            },
4078        );
4079
4080        let buy = TradeTick {
4081            price: Price::from("10.0"),
4082            size: Quantity::from(5),
4083            instrument_id: instrument.id(),
4084            ..TradeTick::default()
4085        }; // value 50
4086        let sell = TradeTick {
4087            price: Price::from("10.0"),
4088            size: Quantity::from(10),
4089            aggressor_side: AggressorSide::Seller,
4090            ..buy
4091        }; // value 100
4092
4093        aggregator.handle_trade(buy);
4094        aggregator.handle_trade(sell);
4095
4096        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4097        assert_eq!(handler_guard.len(), 1);
4098        assert_eq!(handler_guard[0].volume, Quantity::from(10));
4099    }
4100
4101    #[rstest]
4102    fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4103        let instrument = InstrumentAny::Equity(equity_aapl);
4104        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4105        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4106        let handler = Arc::new(Mutex::new(Vec::new()));
4107        let handler_clone = Arc::clone(&handler);
4108
4109        let mut aggregator = TickRunsBarAggregator::new(
4110            bar_type,
4111            instrument.price_precision(),
4112            instrument.size_precision(),
4113            move |bar: Bar| {
4114                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4115                handler_guard.push(bar);
4116            },
4117        );
4118
4119        let buy = TradeTick::default();
4120
4121        aggregator.handle_trade(buy);
4122        aggregator.handle_trade(buy); // Emit bar 1 (run complete)
4123        aggregator.handle_trade(buy); // Start new run
4124        aggregator.handle_trade(buy); // Emit bar 2 (new run complete)
4125
4126        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4127        assert_eq!(handler_guard.len(), 2);
4128    }
4129
4130    #[rstest]
4131    fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
4132        let instrument = InstrumentAny::Equity(equity_aapl);
4133        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4134        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4135        let handler = Arc::new(Mutex::new(Vec::new()));
4136        let handler_clone = Arc::clone(&handler);
4137
4138        let mut aggregator = TickRunsBarAggregator::new(
4139            bar_type,
4140            instrument.price_precision(),
4141            instrument.size_precision(),
4142            move |bar: Bar| {
4143                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4144                handler_guard.push(bar);
4145            },
4146        );
4147
4148        let buy = TradeTick::default();
4149        let no_aggressor = TradeTick {
4150            aggressor_side: AggressorSide::NoAggressor,
4151            ..buy
4152        };
4153
4154        aggregator.handle_trade(buy);
4155        aggregator.handle_trade(no_aggressor); // Should not affect run count
4156        aggregator.handle_trade(no_aggressor); // Should not affect run count
4157        aggregator.handle_trade(buy); // Continue run to threshold
4158
4159        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4160        assert_eq!(handler_guard.len(), 1);
4161    }
4162
4163    #[rstest]
4164    fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4165        let instrument = InstrumentAny::Equity(equity_aapl);
4166        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
4167        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4168        let handler = Arc::new(Mutex::new(Vec::new()));
4169        let handler_clone = Arc::clone(&handler);
4170
4171        let mut aggregator = VolumeRunsBarAggregator::new(
4172            bar_type,
4173            instrument.price_precision(),
4174            instrument.size_precision(),
4175            move |bar: Bar| {
4176                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4177                handler_guard.push(bar);
4178            },
4179        );
4180
4181        let buy = TradeTick {
4182            instrument_id: instrument.id(),
4183            price: Price::from("1.0"),
4184            size: Quantity::from(1),
4185            ..TradeTick::default()
4186        };
4187
4188        aggregator.handle_trade(buy);
4189        aggregator.handle_trade(buy); // Emit bar 1 (2.0 volume reached)
4190        aggregator.handle_trade(buy); // Start new run
4191        aggregator.handle_trade(buy); // Emit bar 2 (new 2.0 volume reached)
4192
4193        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4194        assert_eq!(handler_guard.len(), 2);
4195        assert_eq!(handler_guard[0].volume, Quantity::from(2));
4196        assert_eq!(handler_guard[1].volume, Quantity::from(2));
4197    }
4198
4199    #[rstest]
4200    fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
4201        let instrument = InstrumentAny::Equity(equity_aapl);
4202        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
4203        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4204        let handler = Arc::new(Mutex::new(Vec::new()));
4205        let handler_clone = Arc::clone(&handler);
4206
4207        let mut aggregator = ValueRunsBarAggregator::new(
4208            bar_type,
4209            instrument.price_precision(),
4210            instrument.size_precision(),
4211            move |bar: Bar| {
4212                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4213                handler_guard.push(bar);
4214            },
4215        );
4216
4217        let buy = TradeTick {
4218            instrument_id: instrument.id(),
4219            price: Price::from("10.0"),
4220            size: Quantity::from(5),
4221            ..TradeTick::default()
4222        }; // value 50 per trade
4223
4224        aggregator.handle_trade(buy);
4225        aggregator.handle_trade(buy); // Emit bar 1 (100 value reached)
4226        aggregator.handle_trade(buy); // Start new run
4227        aggregator.handle_trade(buy); // Emit bar 2 (new 100 value reached)
4228
4229        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4230        assert_eq!(handler_guard.len(), 2);
4231        assert_eq!(handler_guard[0].volume, Quantity::from(10));
4232        assert_eq!(handler_guard[1].volume, Quantity::from(10));
4233    }
4234
4235    #[rstest]
4236    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
4237        let instrument = InstrumentAny::Equity(equity_aapl);
4238        // One second bars
4239        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4240        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4241        let handler = Arc::new(Mutex::new(Vec::new()));
4242        let handler_clone = Arc::clone(&handler);
4243        let clock = Rc::new(RefCell::new(TestClock::new()));
4244
4245        let mut aggregator = TimeBarAggregator::new(
4246            bar_type,
4247            instrument.price_precision(),
4248            instrument.size_precision(),
4249            clock.clone(),
4250            move |bar: Bar| {
4251                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4252                handler_guard.push(bar);
4253            },
4254            true,  // build_with_no_updates
4255            false, // timestamp_on_close
4256            BarIntervalType::LeftOpen,
4257            None,  // time_bars_origin_offset
4258            15,    // bar_build_delay
4259            false, // skip_first_non_full_bar
4260        );
4261
4262        aggregator.update(
4263            Price::from("100.00"),
4264            Quantity::from(1),
4265            UnixNanos::default(),
4266        );
4267
4268        let next_sec = UnixNanos::from(1_000_000_000);
4269        clock.borrow_mut().set_time(next_sec);
4270
4271        let event = TimeEvent::new(
4272            Ustr::from("1-SECOND-LAST"),
4273            UUID4::new(),
4274            next_sec,
4275            next_sec,
4276        );
4277        aggregator.build_bar(&event);
4278
4279        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4280        assert_eq!(handler_guard.len(), 1);
4281        let bar = handler_guard.first().unwrap();
4282        assert_eq!(bar.ts_event, UnixNanos::default());
4283        assert_eq!(bar.ts_init, next_sec);
4284    }
4285
4286    #[rstest]
4287    fn test_time_bar_aggregator_stop_clears_timer_and_allows_restart(equity_aapl: Equity) {
4288        let instrument = InstrumentAny::Equity(equity_aapl);
4289        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4290        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4291        let timer_name = format!("TIME_BAR_{bar_type}");
4292        let clock = Rc::new(RefCell::new(TestClock::new()));
4293
4294        let aggregator = TimeBarAggregator::new(
4295            bar_type,
4296            instrument.price_precision(),
4297            instrument.size_precision(),
4298            clock.clone(),
4299            |_bar: Bar| {},
4300            true,
4301            false,
4302            BarIntervalType::LeftOpen,
4303            None,
4304            15,
4305            false,
4306        );
4307
4308        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
4309        let rc = Rc::new(RefCell::new(boxed));
4310
4311        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
4312        assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
4313
4314        rc.borrow_mut().stop();
4315        assert!(clock.borrow().timer_names().is_empty());
4316
4317        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
4318        assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
4319    }
4320
4321    #[rstest]
4322    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
4323        let instrument = InstrumentAny::Equity(equity_aapl);
4324        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4325        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4326        let handler = Arc::new(Mutex::new(Vec::new()));
4327        let handler_clone = Arc::clone(&handler);
4328        let clock = Rc::new(RefCell::new(TestClock::new()));
4329
4330        let mut aggregator = TimeBarAggregator::new(
4331            bar_type,
4332            instrument.price_precision(),
4333            instrument.size_precision(),
4334            clock.clone(),
4335            move |bar: Bar| {
4336                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4337                handler_guard.push(bar);
4338            },
4339            true, // build_with_no_updates
4340            true, // timestamp_on_close - changed to true to verify left-open behavior
4341            BarIntervalType::LeftOpen,
4342            None,
4343            15,
4344            false, // skip_first_non_full_bar
4345        );
4346
4347        // Update in first interval
4348        aggregator.update(
4349            Price::from("100.00"),
4350            Quantity::from(1),
4351            UnixNanos::default(),
4352        );
4353
4354        // First interval close
4355        let ts1 = UnixNanos::from(1_000_000_000);
4356        clock.borrow_mut().set_time(ts1);
4357        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4358        aggregator.build_bar(&event);
4359
4360        // Update in second interval
4361        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4362
4363        // Second interval close
4364        let ts2 = UnixNanos::from(2_000_000_000);
4365        clock.borrow_mut().set_time(ts2);
4366        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4367        aggregator.build_bar(&event);
4368
4369        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4370        assert_eq!(handler_guard.len(), 2);
4371
4372        let bar1 = &handler_guard[0];
4373        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
4374        assert_eq!(bar1.ts_init, ts1);
4375        assert_eq!(bar1.close, Price::from("100.00"));
4376        let bar2 = &handler_guard[1];
4377        assert_eq!(bar2.ts_event, ts2);
4378        assert_eq!(bar2.ts_init, ts2);
4379        assert_eq!(bar2.close, Price::from("101.00"));
4380    }
4381
4382    #[rstest]
4383    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
4384        let instrument = InstrumentAny::Equity(equity_aapl);
4385        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4386        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4387        let handler = Arc::new(Mutex::new(Vec::new()));
4388        let handler_clone = Arc::clone(&handler);
4389        let clock = Rc::new(RefCell::new(TestClock::new()));
4390        let mut aggregator = TimeBarAggregator::new(
4391            bar_type,
4392            instrument.price_precision(),
4393            instrument.size_precision(),
4394            clock.clone(),
4395            move |bar: Bar| {
4396                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4397                handler_guard.push(bar);
4398            },
4399            true, // build_with_no_updates
4400            true, // timestamp_on_close
4401            BarIntervalType::RightOpen,
4402            None,
4403            15,
4404            false, // skip_first_non_full_bar
4405        );
4406
4407        // Update in first interval
4408        aggregator.update(
4409            Price::from("100.00"),
4410            Quantity::from(1),
4411            UnixNanos::default(),
4412        );
4413
4414        // First interval close
4415        let ts1 = UnixNanos::from(1_000_000_000);
4416        clock.borrow_mut().set_time(ts1);
4417        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4418        aggregator.build_bar(&event);
4419
4420        // Update in second interval
4421        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4422
4423        // Second interval close
4424        let ts2 = UnixNanos::from(2_000_000_000);
4425        clock.borrow_mut().set_time(ts2);
4426        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4427        aggregator.build_bar(&event);
4428
4429        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4430        assert_eq!(handler_guard.len(), 2);
4431
4432        let bar1 = &handler_guard[0];
4433        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
4434        assert_eq!(bar1.ts_init, ts1);
4435        assert_eq!(bar1.close, Price::from("100.00"));
4436
4437        let bar2 = &handler_guard[1];
4438        assert_eq!(bar2.ts_event, ts1);
4439        assert_eq!(bar2.ts_init, ts2);
4440        assert_eq!(bar2.close, Price::from("101.00"));
4441    }
4442
4443    #[rstest]
4444    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
4445        let instrument = InstrumentAny::Equity(equity_aapl);
4446        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4447        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4448        let handler = Arc::new(Mutex::new(Vec::new()));
4449        let handler_clone = Arc::clone(&handler);
4450        let clock = Rc::new(RefCell::new(TestClock::new()));
4451
4452        // First test with build_with_no_updates = false
4453        let mut aggregator = TimeBarAggregator::new(
4454            bar_type,
4455            instrument.price_precision(),
4456            instrument.size_precision(),
4457            clock.clone(),
4458            move |bar: Bar| {
4459                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4460                handler_guard.push(bar);
4461            },
4462            false, // build_with_no_updates disabled
4463            true,  // timestamp_on_close
4464            BarIntervalType::LeftOpen,
4465            None,
4466            15,
4467            false, // skip_first_non_full_bar
4468        );
4469
4470        // No updates, just interval close
4471        let ts1 = UnixNanos::from(1_000_000_000);
4472        clock.borrow_mut().set_time(ts1);
4473        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4474        aggregator.build_bar(&event);
4475
4476        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4477        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
4478        drop(handler_guard);
4479
4480        // Now test with build_with_no_updates = true
4481        let handler = Arc::new(Mutex::new(Vec::new()));
4482        let handler_clone = Arc::clone(&handler);
4483        let mut aggregator = TimeBarAggregator::new(
4484            bar_type,
4485            instrument.price_precision(),
4486            instrument.size_precision(),
4487            clock.clone(),
4488            move |bar: Bar| {
4489                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4490                handler_guard.push(bar);
4491            },
4492            true, // build_with_no_updates enabled
4493            true, // timestamp_on_close
4494            BarIntervalType::LeftOpen,
4495            None,
4496            15,
4497            false, // skip_first_non_full_bar
4498        );
4499
4500        aggregator.update(
4501            Price::from("100.00"),
4502            Quantity::from(1),
4503            UnixNanos::default(),
4504        );
4505
4506        // First interval with update
4507        let ts1 = UnixNanos::from(1_000_000_000);
4508        clock.borrow_mut().set_time(ts1);
4509        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4510        aggregator.build_bar(&event);
4511
4512        // Second interval without updates
4513        let ts2 = UnixNanos::from(2_000_000_000);
4514        clock.borrow_mut().set_time(ts2);
4515        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4516        aggregator.build_bar(&event);
4517
4518        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4519        assert_eq!(handler_guard.len(), 2); // Both bars should be built
4520        let bar1 = &handler_guard[0];
4521        assert_eq!(bar1.close, Price::from("100.00"));
4522        let bar2 = &handler_guard[1];
4523        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
4524    }
4525
4526    #[rstest]
4527    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
4528        let instrument = InstrumentAny::Equity(equity_aapl);
4529        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4530        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4531        let clock = Rc::new(RefCell::new(TestClock::new()));
4532        let handler = Arc::new(Mutex::new(Vec::new()));
4533        let handler_clone = Arc::clone(&handler);
4534
4535        let mut aggregator = TimeBarAggregator::new(
4536            bar_type,
4537            instrument.price_precision(),
4538            instrument.size_precision(),
4539            clock.clone(),
4540            move |bar: Bar| {
4541                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4542                handler_guard.push(bar);
4543            },
4544            true, // build_with_no_updates
4545            true, // timestamp_on_close
4546            BarIntervalType::RightOpen,
4547            None,
4548            15,
4549            false, // skip_first_non_full_bar
4550        );
4551
4552        let ts1 = UnixNanos::from(1_000_000_000);
4553        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
4554
4555        let ts2 = UnixNanos::from(2_000_000_000);
4556        clock.borrow_mut().set_time(ts2);
4557
4558        // Simulate timestamp on close
4559        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4560        aggregator.build_bar(&event);
4561
4562        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4563        let bar = handler_guard.first().unwrap();
4564        assert_eq!(bar.ts_event, UnixNanos::default());
4565        assert_eq!(bar.ts_init, ts2);
4566    }
4567
4568    #[rstest]
4569    fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
4570        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4571        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4572        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4573        let handler = Arc::new(Mutex::new(Vec::new()));
4574        let handler_clone = Arc::clone(&handler);
4575
4576        let aggregator = RenkoBarAggregator::new(
4577            bar_type,
4578            instrument.price_precision(),
4579            instrument.size_precision(),
4580            instrument.price_increment(),
4581            move |bar: Bar| {
4582                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4583                handler_guard.push(bar);
4584            },
4585        );
4586
4587        assert_eq!(aggregator.bar_type(), bar_type);
4588        assert!(!aggregator.is_running());
4589        // 10 pips * price_increment.raw (depends on precision mode)
4590        let expected_brick_size = 10 * instrument.price_increment().raw;
4591        assert_eq!(aggregator.brick_size, expected_brick_size);
4592    }
4593
4594    #[rstest]
4595    fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
4596        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4597        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4598        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4599        let handler = Arc::new(Mutex::new(Vec::new()));
4600        let handler_clone = Arc::clone(&handler);
4601
4602        let mut aggregator = RenkoBarAggregator::new(
4603            bar_type,
4604            instrument.price_precision(),
4605            instrument.size_precision(),
4606            instrument.price_increment(),
4607            move |bar: Bar| {
4608                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4609                handler_guard.push(bar);
4610            },
4611        );
4612
4613        // Small price movement (5 pips, less than 10 pip brick size)
4614        aggregator.update(
4615            Price::from("1.00000"),
4616            Quantity::from(1),
4617            UnixNanos::default(),
4618        );
4619        aggregator.update(
4620            Price::from("1.00005"),
4621            Quantity::from(1),
4622            UnixNanos::from(1000),
4623        );
4624
4625        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4626        assert_eq!(handler_guard.len(), 0); // No bar created yet
4627    }
4628
4629    #[rstest]
4630    fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
4631        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4632        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4633        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4634        let handler = Arc::new(Mutex::new(Vec::new()));
4635        let handler_clone = Arc::clone(&handler);
4636
4637        let mut aggregator = RenkoBarAggregator::new(
4638            bar_type,
4639            instrument.price_precision(),
4640            instrument.size_precision(),
4641            instrument.price_increment(),
4642            move |bar: Bar| {
4643                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4644                handler_guard.push(bar);
4645            },
4646        );
4647
4648        // Price movement exceeding brick size (15 pips)
4649        aggregator.update(
4650            Price::from("1.00000"),
4651            Quantity::from(1),
4652            UnixNanos::default(),
4653        );
4654        aggregator.update(
4655            Price::from("1.00015"),
4656            Quantity::from(1),
4657            UnixNanos::from(1000),
4658        );
4659
4660        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4661        assert_eq!(handler_guard.len(), 1);
4662
4663        let bar = handler_guard.first().unwrap();
4664        assert_eq!(bar.open, Price::from("1.00000"));
4665        assert_eq!(bar.high, Price::from("1.00010"));
4666        assert_eq!(bar.low, Price::from("1.00000"));
4667        assert_eq!(bar.close, Price::from("1.00010"));
4668        assert_eq!(bar.volume, Quantity::from(2));
4669        assert_eq!(bar.ts_event, UnixNanos::from(1000));
4670        assert_eq!(bar.ts_init, UnixNanos::from(1000));
4671    }
4672
4673    #[rstest]
4674    fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
4675        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4676        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4677        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4678        let handler = Arc::new(Mutex::new(Vec::new()));
4679        let handler_clone = Arc::clone(&handler);
4680
4681        let mut aggregator = RenkoBarAggregator::new(
4682            bar_type,
4683            instrument.price_precision(),
4684            instrument.size_precision(),
4685            instrument.price_increment(),
4686            move |bar: Bar| {
4687                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4688                handler_guard.push(bar);
4689            },
4690        );
4691
4692        // Large price movement creating multiple bricks (25 pips = 2 bricks)
4693        aggregator.update(
4694            Price::from("1.00000"),
4695            Quantity::from(1),
4696            UnixNanos::default(),
4697        );
4698        aggregator.update(
4699            Price::from("1.00025"),
4700            Quantity::from(1),
4701            UnixNanos::from(1000),
4702        );
4703
4704        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4705        assert_eq!(handler_guard.len(), 2);
4706
4707        let bar1 = &handler_guard[0];
4708        assert_eq!(bar1.open, Price::from("1.00000"));
4709        assert_eq!(bar1.high, Price::from("1.00010"));
4710        assert_eq!(bar1.low, Price::from("1.00000"));
4711        assert_eq!(bar1.close, Price::from("1.00010"));
4712
4713        let bar2 = &handler_guard[1];
4714        assert_eq!(bar2.open, Price::from("1.00010"));
4715        assert_eq!(bar2.high, Price::from("1.00020"));
4716        assert_eq!(bar2.low, Price::from("1.00010"));
4717        assert_eq!(bar2.close, Price::from("1.00020"));
4718    }
4719
4720    #[rstest]
4721    fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
4722        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4723        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4724        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4725        let handler = Arc::new(Mutex::new(Vec::new()));
4726        let handler_clone = Arc::clone(&handler);
4727
4728        let mut aggregator = RenkoBarAggregator::new(
4729            bar_type,
4730            instrument.price_precision(),
4731            instrument.size_precision(),
4732            instrument.price_increment(),
4733            move |bar: Bar| {
4734                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4735                handler_guard.push(bar);
4736            },
4737        );
4738
4739        // Start at higher price and move down
4740        aggregator.update(
4741            Price::from("1.00020"),
4742            Quantity::from(1),
4743            UnixNanos::default(),
4744        );
4745        aggregator.update(
4746            Price::from("1.00005"),
4747            Quantity::from(1),
4748            UnixNanos::from(1000),
4749        );
4750
4751        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4752        assert_eq!(handler_guard.len(), 1);
4753
4754        let bar = handler_guard.first().unwrap();
4755        assert_eq!(bar.open, Price::from("1.00020"));
4756        assert_eq!(bar.high, Price::from("1.00020"));
4757        assert_eq!(bar.low, Price::from("1.00010"));
4758        assert_eq!(bar.close, Price::from("1.00010"));
4759        assert_eq!(bar.volume, Quantity::from(2));
4760    }
4761
4762    #[rstest]
4763    fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
4764        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4765        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4766        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4767        let handler = Arc::new(Mutex::new(Vec::new()));
4768        let handler_clone = Arc::clone(&handler);
4769
4770        let mut aggregator = RenkoBarAggregator::new(
4771            bar_type,
4772            instrument.price_precision(),
4773            instrument.size_precision(),
4774            instrument.price_increment(),
4775            move |bar: Bar| {
4776                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4777                handler_guard.push(bar);
4778            },
4779        );
4780
4781        // Create a bar with small price movement (5 pips)
4782        let input_bar = Bar::new(
4783            BarType::new(
4784                instrument.id(),
4785                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4786                AggregationSource::Internal,
4787            ),
4788            Price::from("1.00000"),
4789            Price::from("1.00005"),
4790            Price::from("0.99995"),
4791            Price::from("1.00005"), // 5 pip move up (less than 10 pip brick)
4792            Quantity::from(100),
4793            UnixNanos::default(),
4794            UnixNanos::from(1000),
4795        );
4796
4797        aggregator.handle_bar(input_bar);
4798
4799        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4800        assert_eq!(handler_guard.len(), 0); // No bar created yet
4801    }
4802
4803    #[rstest]
4804    fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
4805        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4806        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4807        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4808        let handler = Arc::new(Mutex::new(Vec::new()));
4809        let handler_clone = Arc::clone(&handler);
4810
4811        let mut aggregator = RenkoBarAggregator::new(
4812            bar_type,
4813            instrument.price_precision(),
4814            instrument.size_precision(),
4815            instrument.price_increment(),
4816            move |bar: Bar| {
4817                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4818                handler_guard.push(bar);
4819            },
4820        );
4821
4822        // First bar to establish baseline
4823        let bar1 = Bar::new(
4824            BarType::new(
4825                instrument.id(),
4826                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4827                AggregationSource::Internal,
4828            ),
4829            Price::from("1.00000"),
4830            Price::from("1.00005"),
4831            Price::from("0.99995"),
4832            Price::from("1.00000"),
4833            Quantity::from(100),
4834            UnixNanos::default(),
4835            UnixNanos::default(),
4836        );
4837
4838        // Second bar with price movement exceeding brick size (10 pips)
4839        let bar2 = Bar::new(
4840            BarType::new(
4841                instrument.id(),
4842                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4843                AggregationSource::Internal,
4844            ),
4845            Price::from("1.00000"),
4846            Price::from("1.00015"),
4847            Price::from("0.99995"),
4848            Price::from("1.00010"), // 10 pip move up (exactly 1 brick)
4849            Quantity::from(50),
4850            UnixNanos::from(60_000_000_000),
4851            UnixNanos::from(60_000_000_000),
4852        );
4853
4854        aggregator.handle_bar(bar1);
4855        aggregator.handle_bar(bar2);
4856
4857        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4858        assert_eq!(handler_guard.len(), 1);
4859
4860        let bar = handler_guard.first().unwrap();
4861        assert_eq!(bar.open, Price::from("1.00000"));
4862        assert_eq!(bar.high, Price::from("1.00010"));
4863        assert_eq!(bar.low, Price::from("1.00000"));
4864        assert_eq!(bar.close, Price::from("1.00010"));
4865        assert_eq!(bar.volume, Quantity::from(150));
4866    }
4867
4868    #[rstest]
4869    fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
4870        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4871        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4872        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4873        let handler = Arc::new(Mutex::new(Vec::new()));
4874        let handler_clone = Arc::clone(&handler);
4875
4876        let mut aggregator = RenkoBarAggregator::new(
4877            bar_type,
4878            instrument.price_precision(),
4879            instrument.size_precision(),
4880            instrument.price_increment(),
4881            move |bar: Bar| {
4882                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4883                handler_guard.push(bar);
4884            },
4885        );
4886
4887        // First bar to establish baseline
4888        let bar1 = Bar::new(
4889            BarType::new(
4890                instrument.id(),
4891                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4892                AggregationSource::Internal,
4893            ),
4894            Price::from("1.00000"),
4895            Price::from("1.00005"),
4896            Price::from("0.99995"),
4897            Price::from("1.00000"),
4898            Quantity::from(100),
4899            UnixNanos::default(),
4900            UnixNanos::default(),
4901        );
4902
4903        // Second bar with large price movement (30 pips = 3 bricks)
4904        let bar2 = Bar::new(
4905            BarType::new(
4906                instrument.id(),
4907                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4908                AggregationSource::Internal,
4909            ),
4910            Price::from("1.00000"),
4911            Price::from("1.00035"),
4912            Price::from("0.99995"),
4913            Price::from("1.00030"), // 30 pip move up (exactly 3 bricks)
4914            Quantity::from(50),
4915            UnixNanos::from(60_000_000_000),
4916            UnixNanos::from(60_000_000_000),
4917        );
4918
4919        aggregator.handle_bar(bar1);
4920        aggregator.handle_bar(bar2);
4921
4922        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4923        assert_eq!(handler_guard.len(), 3);
4924
4925        let bar1 = &handler_guard[0];
4926        assert_eq!(bar1.open, Price::from("1.00000"));
4927        assert_eq!(bar1.close, Price::from("1.00010"));
4928
4929        let bar2 = &handler_guard[1];
4930        assert_eq!(bar2.open, Price::from("1.00010"));
4931        assert_eq!(bar2.close, Price::from("1.00020"));
4932
4933        let bar3 = &handler_guard[2];
4934        assert_eq!(bar3.open, Price::from("1.00020"));
4935        assert_eq!(bar3.close, Price::from("1.00030"));
4936    }
4937
4938    #[rstest]
4939    fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
4940        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4941        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4942        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4943        let handler = Arc::new(Mutex::new(Vec::new()));
4944        let handler_clone = Arc::clone(&handler);
4945
4946        let mut aggregator = RenkoBarAggregator::new(
4947            bar_type,
4948            instrument.price_precision(),
4949            instrument.size_precision(),
4950            instrument.price_increment(),
4951            move |bar: Bar| {
4952                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4953                handler_guard.push(bar);
4954            },
4955        );
4956
4957        // First bar to establish baseline
4958        let bar1 = Bar::new(
4959            BarType::new(
4960                instrument.id(),
4961                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4962                AggregationSource::Internal,
4963            ),
4964            Price::from("1.00020"),
4965            Price::from("1.00025"),
4966            Price::from("1.00015"),
4967            Price::from("1.00020"),
4968            Quantity::from(100),
4969            UnixNanos::default(),
4970            UnixNanos::default(),
4971        );
4972
4973        // Second bar with downward price movement (10 pips down)
4974        let bar2 = Bar::new(
4975            BarType::new(
4976                instrument.id(),
4977                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4978                AggregationSource::Internal,
4979            ),
4980            Price::from("1.00020"),
4981            Price::from("1.00025"),
4982            Price::from("1.00005"),
4983            Price::from("1.00010"), // 10 pip move down (exactly 1 brick)
4984            Quantity::from(50),
4985            UnixNanos::from(60_000_000_000),
4986            UnixNanos::from(60_000_000_000),
4987        );
4988
4989        aggregator.handle_bar(bar1);
4990        aggregator.handle_bar(bar2);
4991
4992        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4993        assert_eq!(handler_guard.len(), 1);
4994
4995        let bar = handler_guard.first().unwrap();
4996        assert_eq!(bar.open, Price::from("1.00020"));
4997        assert_eq!(bar.high, Price::from("1.00020"));
4998        assert_eq!(bar.low, Price::from("1.00010"));
4999        assert_eq!(bar.close, Price::from("1.00010"));
5000        assert_eq!(bar.volume, Quantity::from(150));
5001    }
5002
5003    #[rstest]
5004    fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
5005        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5006
5007        // Test different brick sizes
5008        let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); // 5 pip brick size
5009        let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
5010        let handler = Arc::new(Mutex::new(Vec::new()));
5011        let handler_clone = Arc::clone(&handler);
5012
5013        let aggregator_5 = RenkoBarAggregator::new(
5014            bar_type_5,
5015            instrument.price_precision(),
5016            instrument.size_precision(),
5017            instrument.price_increment(),
5018            move |_bar: Bar| {
5019                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5020                handler_guard.push(_bar);
5021            },
5022        );
5023
5024        // 5 pips * price_increment.raw (depends on precision mode)
5025        let expected_brick_size_5 = 5 * instrument.price_increment().raw;
5026        assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
5027
5028        let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); // 20 pip brick size
5029        let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
5030        let handler2 = Arc::new(Mutex::new(Vec::new()));
5031        let handler2_clone = Arc::clone(&handler2);
5032
5033        let aggregator_20 = RenkoBarAggregator::new(
5034            bar_type_20,
5035            instrument.price_precision(),
5036            instrument.size_precision(),
5037            instrument.price_increment(),
5038            move |_bar: Bar| {
5039                let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
5040                handler_guard.push(_bar);
5041            },
5042        );
5043
5044        // 20 pips * price_increment.raw (depends on precision mode)
5045        let expected_brick_size_20 = 20 * instrument.price_increment().raw;
5046        assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
5047    }
5048
5049    #[rstest]
5050    fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
5051        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5052        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
5053        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5054        let handler = Arc::new(Mutex::new(Vec::new()));
5055        let handler_clone = Arc::clone(&handler);
5056
5057        let mut aggregator = RenkoBarAggregator::new(
5058            bar_type,
5059            instrument.price_precision(),
5060            instrument.size_precision(),
5061            instrument.price_increment(),
5062            move |bar: Bar| {
5063                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5064                handler_guard.push(bar);
5065            },
5066        );
5067
5068        // Sequential updates creating multiple bars
5069        aggregator.update(
5070            Price::from("1.00000"),
5071            Quantity::from(1),
5072            UnixNanos::from(1000),
5073        );
5074        aggregator.update(
5075            Price::from("1.00010"),
5076            Quantity::from(1),
5077            UnixNanos::from(2000),
5078        ); // First brick
5079        aggregator.update(
5080            Price::from("1.00020"),
5081            Quantity::from(1),
5082            UnixNanos::from(3000),
5083        ); // Second brick
5084        aggregator.update(
5085            Price::from("1.00025"),
5086            Quantity::from(1),
5087            UnixNanos::from(4000),
5088        ); // Partial third brick
5089        aggregator.update(
5090            Price::from("1.00030"),
5091            Quantity::from(1),
5092            UnixNanos::from(5000),
5093        ); // Complete third brick
5094
5095        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5096        assert_eq!(handler_guard.len(), 3);
5097
5098        let bar1 = &handler_guard[0];
5099        assert_eq!(bar1.open, Price::from("1.00000"));
5100        assert_eq!(bar1.close, Price::from("1.00010"));
5101
5102        let bar2 = &handler_guard[1];
5103        assert_eq!(bar2.open, Price::from("1.00010"));
5104        assert_eq!(bar2.close, Price::from("1.00020"));
5105
5106        let bar3 = &handler_guard[2];
5107        assert_eq!(bar3.open, Price::from("1.00020"));
5108        assert_eq!(bar3.close, Price::from("1.00030"));
5109    }
5110
5111    #[rstest]
5112    fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
5113        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
5114        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
5115        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5116        let handler = Arc::new(Mutex::new(Vec::new()));
5117        let handler_clone = Arc::clone(&handler);
5118
5119        let mut aggregator = RenkoBarAggregator::new(
5120            bar_type,
5121            instrument.price_precision(),
5122            instrument.size_precision(),
5123            instrument.price_increment(),
5124            move |bar: Bar| {
5125                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5126                handler_guard.push(bar);
5127            },
5128        );
5129
5130        // Mixed direction movement: up then down
5131        aggregator.update(
5132            Price::from("1.00000"),
5133            Quantity::from(1),
5134            UnixNanos::from(1000),
5135        );
5136        aggregator.update(
5137            Price::from("1.00010"),
5138            Quantity::from(1),
5139            UnixNanos::from(2000),
5140        ); // Up brick
5141        aggregator.update(
5142            Price::from("0.99990"),
5143            Quantity::from(1),
5144            UnixNanos::from(3000),
5145        ); // Down 2 bricks (20 pips)
5146
5147        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5148        assert_eq!(handler_guard.len(), 3);
5149
5150        let bar1 = &handler_guard[0]; // Up brick
5151        assert_eq!(bar1.open, Price::from("1.00000"));
5152        assert_eq!(bar1.high, Price::from("1.00010"));
5153        assert_eq!(bar1.low, Price::from("1.00000"));
5154        assert_eq!(bar1.close, Price::from("1.00010"));
5155
5156        let bar2 = &handler_guard[1]; // First down brick
5157        assert_eq!(bar2.open, Price::from("1.00010"));
5158        assert_eq!(bar2.high, Price::from("1.00010"));
5159        assert_eq!(bar2.low, Price::from("1.00000"));
5160        assert_eq!(bar2.close, Price::from("1.00000"));
5161
5162        let bar3 = &handler_guard[2]; // Second down brick
5163        assert_eq!(bar3.open, Price::from("1.00000"));
5164        assert_eq!(bar3.high, Price::from("1.00000"));
5165        assert_eq!(bar3.low, Price::from("0.99990"));
5166        assert_eq!(bar3.close, Price::from("0.99990"));
5167    }
5168
5169    #[rstest]
5170    fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
5171        let instrument = InstrumentAny::Equity(equity_aapl);
5172        let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
5173        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5174        let handler = Arc::new(Mutex::new(Vec::new()));
5175        let handler_clone = Arc::clone(&handler);
5176
5177        let mut aggregator = TickImbalanceBarAggregator::new(
5178            bar_type,
5179            instrument.price_precision(),
5180            instrument.size_precision(),
5181            move |bar: Bar| {
5182                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5183                handler_guard.push(bar);
5184            },
5185        );
5186
5187        let buy = TradeTick {
5188            aggressor_side: AggressorSide::Buyer,
5189            ..TradeTick::default()
5190        };
5191        let sell = TradeTick {
5192            aggressor_side: AggressorSide::Seller,
5193            ..TradeTick::default()
5194        };
5195
5196        aggregator.handle_trade(buy);
5197        aggregator.handle_trade(sell);
5198        aggregator.handle_trade(buy);
5199
5200        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5201        assert_eq!(handler_guard.len(), 0);
5202    }
5203
5204    #[rstest]
5205    fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
5206        let instrument = InstrumentAny::Equity(equity_aapl);
5207        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
5208        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5209        let handler = Arc::new(Mutex::new(Vec::new()));
5210        let handler_clone = Arc::clone(&handler);
5211
5212        let mut aggregator = TickImbalanceBarAggregator::new(
5213            bar_type,
5214            instrument.price_precision(),
5215            instrument.size_precision(),
5216            move |bar: Bar| {
5217                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5218                handler_guard.push(bar);
5219            },
5220        );
5221
5222        let buy = TradeTick {
5223            aggressor_side: AggressorSide::Buyer,
5224            ..TradeTick::default()
5225        };
5226        let no_aggressor = TradeTick {
5227            aggressor_side: AggressorSide::NoAggressor,
5228            ..TradeTick::default()
5229        };
5230
5231        aggregator.handle_trade(buy);
5232        aggregator.handle_trade(no_aggressor);
5233        aggregator.handle_trade(buy);
5234
5235        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5236        assert_eq!(handler_guard.len(), 1);
5237    }
5238
5239    #[rstest]
5240    fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
5241        let instrument = InstrumentAny::Equity(equity_aapl);
5242        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
5243        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5244        let handler = Arc::new(Mutex::new(Vec::new()));
5245        let handler_clone = Arc::clone(&handler);
5246
5247        let mut aggregator = TickRunsBarAggregator::new(
5248            bar_type,
5249            instrument.price_precision(),
5250            instrument.size_precision(),
5251            move |bar: Bar| {
5252                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5253                handler_guard.push(bar);
5254            },
5255        );
5256
5257        let buy = TradeTick {
5258            aggressor_side: AggressorSide::Buyer,
5259            ..TradeTick::default()
5260        };
5261        let sell = TradeTick {
5262            aggressor_side: AggressorSide::Seller,
5263            ..TradeTick::default()
5264        };
5265
5266        aggregator.handle_trade(buy);
5267        aggregator.handle_trade(buy);
5268        aggregator.handle_trade(sell);
5269        aggregator.handle_trade(sell);
5270
5271        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5272        assert_eq!(handler_guard.len(), 2);
5273    }
5274
5275    #[rstest]
5276    fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5277        let instrument = InstrumentAny::Equity(equity_aapl);
5278        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
5279        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5280        let handler = Arc::new(Mutex::new(Vec::new()));
5281        let handler_clone = Arc::clone(&handler);
5282
5283        let mut aggregator = VolumeImbalanceBarAggregator::new(
5284            bar_type,
5285            instrument.price_precision(),
5286            instrument.size_precision(),
5287            move |bar: Bar| {
5288                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5289                handler_guard.push(bar);
5290            },
5291        );
5292
5293        let large_trade = TradeTick {
5294            size: Quantity::from(25),
5295            aggressor_side: AggressorSide::Buyer,
5296            ..TradeTick::default()
5297        };
5298
5299        aggregator.handle_trade(large_trade);
5300
5301        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5302        assert_eq!(handler_guard.len(), 2);
5303    }
5304
5305    #[rstest]
5306    fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
5307        equity_aapl: Equity,
5308    ) {
5309        let instrument = InstrumentAny::Equity(equity_aapl);
5310        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
5311        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5312        let handler = Arc::new(Mutex::new(Vec::new()));
5313        let handler_clone = Arc::clone(&handler);
5314
5315        let mut aggregator = VolumeImbalanceBarAggregator::new(
5316            bar_type,
5317            instrument.price_precision(),
5318            instrument.size_precision(),
5319            move |bar: Bar| {
5320                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5321                handler_guard.push(bar);
5322            },
5323        );
5324
5325        let buy = TradeTick {
5326            size: Quantity::from(5),
5327            aggressor_side: AggressorSide::Buyer,
5328            ..TradeTick::default()
5329        };
5330        let no_aggressor = TradeTick {
5331            size: Quantity::from(3),
5332            aggressor_side: AggressorSide::NoAggressor,
5333            ..TradeTick::default()
5334        };
5335
5336        aggregator.handle_trade(buy);
5337        aggregator.handle_trade(no_aggressor);
5338        aggregator.handle_trade(buy);
5339
5340        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5341        assert_eq!(handler_guard.len(), 1);
5342    }
5343
5344    #[rstest]
5345    fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5346        let instrument = InstrumentAny::Equity(equity_aapl);
5347        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
5348        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5349        let handler = Arc::new(Mutex::new(Vec::new()));
5350        let handler_clone = Arc::clone(&handler);
5351
5352        let mut aggregator = VolumeRunsBarAggregator::new(
5353            bar_type,
5354            instrument.price_precision(),
5355            instrument.size_precision(),
5356            move |bar: Bar| {
5357                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5358                handler_guard.push(bar);
5359            },
5360        );
5361
5362        let large_trade = TradeTick {
5363            size: Quantity::from(25),
5364            aggressor_side: AggressorSide::Buyer,
5365            ..TradeTick::default()
5366        };
5367
5368        aggregator.handle_trade(large_trade);
5369
5370        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5371        assert_eq!(handler_guard.len(), 2);
5372    }
5373
5374    #[rstest]
5375    fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5376        let instrument = InstrumentAny::Equity(equity_aapl);
5377        let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
5378        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5379        let handler = Arc::new(Mutex::new(Vec::new()));
5380        let handler_clone = Arc::clone(&handler);
5381
5382        let mut aggregator = ValueRunsBarAggregator::new(
5383            bar_type,
5384            instrument.price_precision(),
5385            instrument.size_precision(),
5386            move |bar: Bar| {
5387                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5388                handler_guard.push(bar);
5389            },
5390        );
5391
5392        let large_trade = TradeTick {
5393            price: Price::from("5.00"),
5394            size: Quantity::from(25),
5395            aggressor_side: AggressorSide::Buyer,
5396            ..TradeTick::default()
5397        };
5398
5399        aggregator.handle_trade(large_trade);
5400
5401        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5402        assert_eq!(handler_guard.len(), 2);
5403    }
5404
5405    #[rstest]
5406    fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5407        let instrument = InstrumentAny::Equity(equity_aapl);
5408        let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
5409        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5410        let handler = Arc::new(Mutex::new(Vec::new()));
5411        let handler_clone = Arc::clone(&handler);
5412
5413        let mut aggregator = ValueBarAggregator::new(
5414            bar_type,
5415            instrument.price_precision(),
5416            instrument.size_precision(),
5417            move |bar: Bar| {
5418                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5419                handler_guard.push(bar);
5420            },
5421        );
5422
5423        // price=1000, size=3, value=3000, step=100 → size_chunk=0.1 rounds to 0 at precision 0
5424        aggregator.update(
5425            Price::from("1000.00"),
5426            Quantity::from(3),
5427            UnixNanos::default(),
5428        );
5429
5430        // 3 bars (one per min-size unit), not 30 zero-volume bars
5431        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5432        assert_eq!(handler_guard.len(), 3);
5433        for bar in handler_guard.iter() {
5434            assert_eq!(bar.volume, Quantity::from(1));
5435        }
5436    }
5437
5438    #[rstest]
5439    fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5440        let instrument = InstrumentAny::Equity(equity_aapl);
5441        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5442        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5443        let handler = Arc::new(Mutex::new(Vec::new()));
5444        let handler_clone = Arc::clone(&handler);
5445
5446        let mut aggregator = ValueImbalanceBarAggregator::new(
5447            bar_type,
5448            instrument.price_precision(),
5449            instrument.size_precision(),
5450            move |bar: Bar| {
5451                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5452                handler_guard.push(bar);
5453            },
5454        );
5455
5456        let trade = TradeTick {
5457            price: Price::from("1000.00"),
5458            size: Quantity::from(3),
5459            aggressor_side: AggressorSide::Buyer,
5460            instrument_id: instrument.id(),
5461            ..TradeTick::default()
5462        };
5463
5464        aggregator.handle_trade(trade);
5465
5466        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5467        assert_eq!(handler_guard.len(), 3);
5468        for bar in handler_guard.iter() {
5469            assert_eq!(bar.volume, Quantity::from(1));
5470        }
5471    }
5472
5473    #[rstest]
5474    fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
5475        let instrument = InstrumentAny::Equity(equity_aapl);
5476        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5477        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5478        let handler = Arc::new(Mutex::new(Vec::new()));
5479        let handler_clone = Arc::clone(&handler);
5480
5481        let mut aggregator = ValueImbalanceBarAggregator::new(
5482            bar_type,
5483            instrument.price_precision(),
5484            instrument.size_precision(),
5485            move |bar: Bar| {
5486                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5487                handler_guard.push(bar);
5488            },
5489        );
5490
5491        // Build seller imbalance of -50 (below step=100, no bar yet)
5492        let sell_tick = TradeTick {
5493            price: Price::from("10.00"),
5494            size: Quantity::from(5),
5495            aggressor_side: AggressorSide::Seller,
5496            instrument_id: instrument.id(),
5497            ..TradeTick::default()
5498        };
5499
5500        // Opposite-side buyer: flatten amount 50/1000=0.05 < min_size (1),
5501        // clamp overshoots imbalance from -50 to +950, crossing threshold
5502        let buy_tick = TradeTick {
5503            price: Price::from("1000.00"),
5504            size: Quantity::from(1),
5505            aggressor_side: AggressorSide::Buyer,
5506            instrument_id: instrument.id(),
5507            ts_init: UnixNanos::from(1),
5508            ts_event: UnixNanos::from(1),
5509            ..TradeTick::default()
5510        };
5511
5512        aggregator.handle_trade(sell_tick);
5513        aggregator.handle_trade(buy_tick);
5514
5515        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5516        assert_eq!(handler_guard.len(), 1);
5517        assert_eq!(handler_guard[0].volume, Quantity::from(6));
5518    }
5519
5520    #[rstest]
5521    fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5522        let instrument = InstrumentAny::Equity(equity_aapl);
5523        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
5524        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5525        let handler = Arc::new(Mutex::new(Vec::new()));
5526        let handler_clone = Arc::clone(&handler);
5527
5528        let mut aggregator = ValueRunsBarAggregator::new(
5529            bar_type,
5530            instrument.price_precision(),
5531            instrument.size_precision(),
5532            move |bar: Bar| {
5533                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5534                handler_guard.push(bar);
5535            },
5536        );
5537
5538        let trade = TradeTick {
5539            price: Price::from("1000.00"),
5540            size: Quantity::from(3),
5541            aggressor_side: AggressorSide::Buyer,
5542            instrument_id: instrument.id(),
5543            ..TradeTick::default()
5544        };
5545
5546        aggregator.handle_trade(trade);
5547
5548        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5549        assert_eq!(handler_guard.len(), 3);
5550        for bar in handler_guard.iter() {
5551            assert_eq!(bar.volume, Quantity::from(1));
5552        }
5553    }
5554
5555    #[rstest]
5556    #[case(1000_u64)]
5557    #[case(1500_u64)]
5558    fn test_volume_imbalance_bar_aggregator_large_step_no_overflow(
5559        equity_aapl: Equity,
5560        #[case] step: u64,
5561    ) {
5562        let instrument = InstrumentAny::Equity(equity_aapl);
5563        let bar_spec = BarSpecification::new(
5564            step as usize,
5565            BarAggregation::VolumeImbalance,
5566            PriceType::Last,
5567        );
5568        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5569        let handler = Arc::new(Mutex::new(Vec::new()));
5570        let handler_clone = Arc::clone(&handler);
5571
5572        let mut aggregator = VolumeImbalanceBarAggregator::new(
5573            bar_type,
5574            instrument.price_precision(),
5575            instrument.size_precision(),
5576            move |bar: Bar| {
5577                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5578                handler_guard.push(bar);
5579            },
5580        );
5581
5582        let trade = TradeTick {
5583            size: Quantity::from(step * 2),
5584            aggressor_side: AggressorSide::Buyer,
5585            ..TradeTick::default()
5586        };
5587
5588        aggregator.handle_trade(trade);
5589
5590        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5591        assert_eq!(handler_guard.len(), 2);
5592        for bar in handler_guard.iter() {
5593            assert_eq!(bar.volume.as_f64(), step as f64);
5594        }
5595    }
5596
5597    #[rstest]
5598    fn test_volume_imbalance_bar_aggregator_different_large_steps_produce_different_bar_counts(
5599        equity_aapl: Equity,
5600    ) {
5601        let instrument = InstrumentAny::Equity(equity_aapl);
5602        let total_volume = 3000_u64;
5603        let mut results = Vec::new();
5604
5605        for step in [1000_usize, 1500] {
5606            let bar_spec =
5607                BarSpecification::new(step, BarAggregation::VolumeImbalance, PriceType::Last);
5608            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5609            let handler = Arc::new(Mutex::new(Vec::new()));
5610            let handler_clone = Arc::clone(&handler);
5611
5612            let mut aggregator = VolumeImbalanceBarAggregator::new(
5613                bar_type,
5614                instrument.price_precision(),
5615                instrument.size_precision(),
5616                move |bar: Bar| {
5617                    let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5618                    handler_guard.push(bar);
5619                },
5620            );
5621
5622            let trade = TradeTick {
5623                size: Quantity::from(total_volume),
5624                aggressor_side: AggressorSide::Buyer,
5625                ..TradeTick::default()
5626            };
5627
5628            aggregator.handle_trade(trade);
5629
5630            let handler_guard = handler.lock().expect(MUTEX_POISONED);
5631            results.push(handler_guard.len());
5632        }
5633
5634        assert_eq!(results[0], 3); // 3000 / 1000
5635        assert_eq!(results[1], 2); // 3000 / 1500
5636        assert_ne!(results[0], results[1]);
5637    }
5638
5639    #[rstest]
5640    #[case(1000_u64)]
5641    #[case(1500_u64)]
5642    fn test_volume_runs_bar_aggregator_large_step_no_overflow(
5643        equity_aapl: Equity,
5644        #[case] step: u64,
5645    ) {
5646        let instrument = InstrumentAny::Equity(equity_aapl);
5647        let bar_spec =
5648            BarSpecification::new(step as usize, BarAggregation::VolumeRuns, PriceType::Last);
5649        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5650        let handler = Arc::new(Mutex::new(Vec::new()));
5651        let handler_clone = Arc::clone(&handler);
5652
5653        let mut aggregator = VolumeRunsBarAggregator::new(
5654            bar_type,
5655            instrument.price_precision(),
5656            instrument.size_precision(),
5657            move |bar: Bar| {
5658                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5659                handler_guard.push(bar);
5660            },
5661        );
5662
5663        let trade = TradeTick {
5664            size: Quantity::from(step * 2),
5665            aggressor_side: AggressorSide::Buyer,
5666            ..TradeTick::default()
5667        };
5668
5669        aggregator.handle_trade(trade);
5670
5671        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5672        assert_eq!(handler_guard.len(), 2);
5673        for bar in handler_guard.iter() {
5674            assert_eq!(bar.volume.as_f64(), step as f64);
5675        }
5676    }
5677
5678    #[rstest]
5679    fn test_volume_runs_bar_aggregator_different_large_steps_produce_different_bar_counts(
5680        equity_aapl: Equity,
5681    ) {
5682        let instrument = InstrumentAny::Equity(equity_aapl);
5683        let total_volume = 3000_u64;
5684        let mut results = Vec::new();
5685
5686        for step in [1000_usize, 1500] {
5687            let bar_spec = BarSpecification::new(step, BarAggregation::VolumeRuns, PriceType::Last);
5688            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5689            let handler = Arc::new(Mutex::new(Vec::new()));
5690            let handler_clone = Arc::clone(&handler);
5691
5692            let mut aggregator = VolumeRunsBarAggregator::new(
5693                bar_type,
5694                instrument.price_precision(),
5695                instrument.size_precision(),
5696                move |bar: Bar| {
5697                    let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5698                    handler_guard.push(bar);
5699                },
5700            );
5701
5702            let trade = TradeTick {
5703                size: Quantity::from(total_volume),
5704                aggressor_side: AggressorSide::Buyer,
5705                ..TradeTick::default()
5706            };
5707
5708            aggregator.handle_trade(trade);
5709
5710            let handler_guard = handler.lock().expect(MUTEX_POISONED);
5711            results.push(handler_guard.len());
5712        }
5713
5714        assert_eq!(results[0], 3); // 3000 / 1000
5715        assert_eq!(results[1], 2); // 3000 / 1500
5716        assert_ne!(results[0], results[1]);
5717    }
5718
5719    /// Historical time-bar: event at `ts_init` is deferred until after the update (Cython parity).
5720    #[rstest]
5721    fn test_time_bar_historical_defers_event_at_ts_init_until_after_update(equity_aapl: Equity) {
5722        let instrument = InstrumentAny::Equity(equity_aapl);
5723        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
5724        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5725        let handler = Arc::new(Mutex::new(Vec::new()));
5726        let handler_clone = Arc::clone(&handler);
5727        let clock = Rc::new(RefCell::new(TestClock::new()));
5728
5729        let mut agg = TimeBarAggregator::new(
5730            bar_type,
5731            instrument.price_precision(),
5732            instrument.size_precision(),
5733            clock.clone(),
5734            move |bar: Bar| {
5735                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
5736                h.push(bar);
5737            },
5738            true,
5739            true,
5740            BarIntervalType::LeftOpen,
5741            None,
5742            0,
5743            false,
5744        );
5745        agg.historical_mode = true;
5746        agg.set_clock_internal(clock);
5747        let boxed: Box<dyn BarAggregator> = Box::new(agg);
5748        let rc = Rc::new(RefCell::new(boxed));
5749        rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
5750
5751        rc.borrow_mut().update(
5752            Price::from("100.00"),
5753            Quantity::from(1),
5754            UnixNanos::default(),
5755        );
5756        rc.borrow_mut().update(
5757            Price::from("100.00"),
5758            Quantity::from(1),
5759            UnixNanos::from(1_000_000_000),
5760        );
5761
5762        let bars = handler.lock().expect(MUTEX_POISONED);
5763        assert!(
5764            !bars.is_empty(),
5765            "deferred event at ts_init should produce a bar that includes the update"
5766        );
5767        let last_bar = bars.last().unwrap();
5768        assert_eq!(last_bar.close, Price::from("100.00"));
5769        assert!(
5770            last_bar.volume.as_f64() >= 1.0,
5771            "bar built after deferred event should include the update at ts_init"
5772        );
5773    }
5774
5775    #[rstest]
5776    fn test_spread_quote_quote_driven_emits_when_all_legs_received(equity_aapl: Equity) {
5777        let instrument = InstrumentAny::Equity(equity_aapl);
5778        let leg1 = instrument.id();
5779        let leg2 = InstrumentId::from("MSFT.XNAS");
5780        let spread_id = InstrumentId::from("SPREAD.XNAS");
5781        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5782        let handler = Arc::new(Mutex::new(Vec::new()));
5783        let handler_clone = Arc::clone(&handler);
5784        let clock = Rc::new(RefCell::new(TestClock::new()));
5785
5786        let mut agg = SpreadQuoteAggregator::new(
5787            spread_id,
5788            &legs,
5789            true,
5790            instrument.price_precision(),
5791            0,
5792            Box::new(move |q: QuoteTick| {
5793                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5794            }),
5795            clock,
5796            false,
5797            None,
5798            0,
5799            None,
5800            None,
5801        );
5802
5803        let ts = UnixNanos::from(1_000_000_000);
5804        agg.handle_quote_tick(QuoteTick::new(
5805            leg1,
5806            Price::from("100.00"),
5807            Price::from("100.10"),
5808            Quantity::from(10),
5809            Quantity::from(10),
5810            ts,
5811            ts,
5812        ));
5813        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5814
5815        agg.handle_quote_tick(QuoteTick::new(
5816            leg2,
5817            Price::from("99.00"),
5818            Price::from("99.10"),
5819            Quantity::from(10),
5820            Quantity::from(10),
5821            ts,
5822            ts,
5823        ));
5824        let quotes = handler.lock().expect(MUTEX_POISONED);
5825        assert_eq!(quotes.len(), 1);
5826        assert_eq!(quotes[0].instrument_id, spread_id);
5827        assert!(quotes[0].bid_price < quotes[0].ask_price);
5828    }
5829
5830    #[rstest]
5831    fn test_spread_quote_futures_pricing_signed_ratios(equity_aapl: Equity) {
5832        let instrument = InstrumentAny::Equity(equity_aapl);
5833        let leg1 = instrument.id();
5834        let leg2 = InstrumentId::from("MSFT.XNAS");
5835        let spread_id = InstrumentId::from("SPREAD.XNAS");
5836        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5837        let handler = Arc::new(Mutex::new(Vec::new()));
5838        let handler_clone = Arc::clone(&handler);
5839        let clock = Rc::new(RefCell::new(TestClock::new()));
5840
5841        let mut agg = SpreadQuoteAggregator::new(
5842            spread_id,
5843            &legs,
5844            true,
5845            instrument.price_precision(),
5846            0,
5847            Box::new(move |q: QuoteTick| {
5848                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5849            }),
5850            clock,
5851            false,
5852            None,
5853            0,
5854            None,
5855            None,
5856        );
5857
5858        let ts = UnixNanos::from(1_000_000_000);
5859        agg.handle_quote_tick(QuoteTick::new(
5860            leg1,
5861            Price::from("10.00"),
5862            Price::from("10.10"),
5863            Quantity::from(100),
5864            Quantity::from(100),
5865            ts,
5866            ts,
5867        ));
5868        agg.handle_quote_tick(QuoteTick::new(
5869            leg2,
5870            Price::from("20.00"),
5871            Price::from("20.10"),
5872            Quantity::from(100),
5873            Quantity::from(100),
5874            ts,
5875            ts,
5876        ));
5877        let quotes = handler.lock().expect(MUTEX_POISONED);
5878        assert_eq!(quotes.len(), 1);
5879        let q = &quotes[0];
5880        assert_eq!(q.instrument_id, spread_id);
5881        assert_eq!(q.bid_price, Price::from("-10.10"));
5882        assert_eq!(q.ask_price, Price::from("-9.90"));
5883    }
5884
5885    #[rstest]
5886    fn test_spread_quote_size_calculation_non_unit_ratios(equity_aapl: Equity) {
5887        let instrument = InstrumentAny::Equity(equity_aapl);
5888        let leg1 = instrument.id();
5889        let leg2 = InstrumentId::from("MSFT.XNAS");
5890        let spread_id = InstrumentId::from("SPREAD.XNAS");
5891        let legs = vec![(leg1, 2_i64), (leg2, -1_i64)];
5892        let handler = Arc::new(Mutex::new(Vec::new()));
5893        let handler_clone = Arc::clone(&handler);
5894        let clock = Rc::new(RefCell::new(TestClock::new()));
5895
5896        let mut agg = SpreadQuoteAggregator::new(
5897            spread_id,
5898            &legs,
5899            true,
5900            instrument.price_precision(),
5901            0,
5902            Box::new(move |q: QuoteTick| {
5903                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5904            }),
5905            clock,
5906            false,
5907            None,
5908            0,
5909            None,
5910            None,
5911        );
5912
5913        let ts = UnixNanos::from(1_000_000_000);
5914        agg.handle_quote_tick(QuoteTick::new(
5915            leg1,
5916            Price::from("10.00"),
5917            Price::from("10.10"),
5918            Quantity::from(100),
5919            Quantity::from(40),
5920            ts,
5921            ts,
5922        ));
5923        agg.handle_quote_tick(QuoteTick::new(
5924            leg2,
5925            Price::from("10.00"),
5926            Price::from("10.10"),
5927            Quantity::from(50),
5928            Quantity::from(30),
5929            ts,
5930            ts,
5931        ));
5932        let quotes = handler.lock().expect(MUTEX_POISONED);
5933        assert_eq!(quotes.len(), 1);
5934        let q = &quotes[0];
5935        assert_eq!(q.bid_size.as_f64(), 30.0);
5936        assert_eq!(q.ask_size.as_f64(), 20.0);
5937    }
5938
5939    #[rstest]
5940    fn test_spread_quote_timer_driven_emission_cadence(equity_aapl: Equity) {
5941        let instrument = InstrumentAny::Equity(equity_aapl);
5942        let leg1 = instrument.id();
5943        let leg2 = InstrumentId::from("MSFT.XNAS");
5944        let spread_id = InstrumentId::from("SPREAD.XNAS");
5945        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5946        let handler = Arc::new(Mutex::new(Vec::new()));
5947        let handler_clone = Arc::clone(&handler);
5948        let clock = Rc::new(RefCell::new(TestClock::new()));
5949        clock.borrow_mut().set_time(UnixNanos::from(0));
5950
5951        let agg = SpreadQuoteAggregator::new(
5952            spread_id,
5953            &legs,
5954            true,
5955            instrument.price_precision(),
5956            0,
5957            Box::new(move |q: QuoteTick| {
5958                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5959            }),
5960            clock.clone(),
5961            false,
5962            Some(1),
5963            0,
5964            None,
5965            None,
5966        );
5967        let rc = Rc::new(RefCell::new(agg));
5968        rc.borrow_mut().prepare_for_timer_mode(&rc);
5969        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
5970
5971        for event in clock.borrow_mut().advance_time(UnixNanos::from(0), true) {
5972            rc.borrow_mut().on_timer_fire(event.ts_event);
5973        }
5974        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5975
5976        let ts1 = UnixNanos::from(1_000_000_000);
5977        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5978            leg1,
5979            Price::from("100.00"),
5980            Price::from("100.10"),
5981            Quantity::from(10),
5982            Quantity::from(10),
5983            ts1,
5984            ts1,
5985        ));
5986        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5987            leg2,
5988            Price::from("99.00"),
5989            Price::from("99.10"),
5990            Quantity::from(10),
5991            Quantity::from(10),
5992            ts1,
5993            ts1,
5994        ));
5995
5996        for event in clock.borrow_mut().advance_time(ts1, true) {
5997            rc.borrow_mut().on_timer_fire(event.ts_event);
5998        }
5999
6000        {
6001            let quotes = handler.lock().expect(MUTEX_POISONED);
6002            assert_eq!(quotes.len(), 1);
6003            assert_eq!(quotes[0].ts_event, ts1);
6004            assert_eq!(quotes[0].ts_init, ts1);
6005        }
6006
6007        let ts2 = UnixNanos::from(2_000_000_000);
6008        for event in clock.borrow_mut().advance_time(ts2, true) {
6009            rc.borrow_mut().on_timer_fire(event.ts_event);
6010        }
6011
6012        let quotes = handler.lock().expect(MUTEX_POISONED);
6013        assert_eq!(quotes.len(), 1);
6014    }
6015
6016    #[rstest]
6017    fn test_spread_quote_historical_timer_waits_for_all_legs(equity_aapl: Equity) {
6018        let instrument = InstrumentAny::Equity(equity_aapl);
6019        let leg1 = instrument.id();
6020        let leg2 = InstrumentId::from("MSFT.XNAS");
6021        let spread_id = InstrumentId::from("SPREAD.XNAS");
6022        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6023        let handler = Arc::new(Mutex::new(Vec::new()));
6024        let handler_clone = Arc::clone(&handler);
6025        let clock = Rc::new(RefCell::new(TestClock::new()));
6026
6027        let agg = SpreadQuoteAggregator::new(
6028            spread_id,
6029            &legs,
6030            true,
6031            instrument.price_precision(),
6032            0,
6033            Box::new(move |q: QuoteTick| {
6034                handler_clone.lock().expect(MUTEX_POISONED).push(q);
6035            }),
6036            // need clock for set_clock after
6037            clock.clone(),
6038            true,
6039            Some(1),
6040            0,
6041            None,
6042            None,
6043        );
6044        let rc = Rc::new(RefCell::new(agg));
6045        rc.borrow_mut().prepare_for_timer_mode(&rc);
6046        rc.borrow_mut().set_clock(clock);
6047
6048        let ts1 = UnixNanos::from(1_000_000_000);
6049        let ts2 = UnixNanos::from(2_000_000_000);
6050        let ts3 = UnixNanos::from(3_000_000_000);
6051        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6052            leg1,
6053            Price::from("100.00"),
6054            Price::from("100.10"),
6055            Quantity::from(10),
6056            Quantity::from(10),
6057            ts1,
6058            ts1,
6059        ));
6060        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6061
6062        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6063            leg2,
6064            Price::from("99.00"),
6065            Price::from("99.10"),
6066            Quantity::from(10),
6067            Quantity::from(10),
6068            ts2,
6069            ts2,
6070        ));
6071        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6072
6073        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6074            leg1,
6075            Price::from("100.00"),
6076            Price::from("100.10"),
6077            Quantity::from(10),
6078            Quantity::from(10),
6079            ts3,
6080            ts3,
6081        ));
6082        let quotes = handler.lock().expect(MUTEX_POISONED);
6083        assert_eq!(
6084            quotes.len(),
6085            1,
6086            "deferred event at ts2 is processed when we have all legs and advance to ts3"
6087        );
6088    }
6089
6090    #[rstest]
6091    fn test_spread_quote_historical_flush_emits_pending_final_quote(equity_aapl: Equity) {
6092        let instrument = InstrumentAny::Equity(equity_aapl);
6093        let leg1 = instrument.id();
6094        let leg2 = InstrumentId::from("MSFT.XNAS");
6095        let spread_id = InstrumentId::from("SPREAD.XNAS");
6096        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6097        let handler = Arc::new(Mutex::new(Vec::new()));
6098        let handler_clone = Arc::clone(&handler);
6099        let clock = Rc::new(RefCell::new(TestClock::new()));
6100
6101        let agg = SpreadQuoteAggregator::new(
6102            spread_id,
6103            &legs,
6104            true,
6105            instrument.price_precision(),
6106            0,
6107            Box::new(move |q: QuoteTick| {
6108                handler_clone.lock().expect(MUTEX_POISONED).push(q);
6109            }),
6110            // need clock for set_clock after
6111            clock.clone(),
6112            true,
6113            Some(1),
6114            0,
6115            None,
6116            None,
6117        );
6118        let rc = Rc::new(RefCell::new(agg));
6119        rc.borrow_mut().prepare_for_timer_mode(&rc);
6120        rc.borrow_mut().set_clock(clock);
6121
6122        let ts1 = UnixNanos::from(1_000_000_000);
6123        let ts2 = UnixNanos::from(2_000_000_000);
6124        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6125            leg1,
6126            Price::from("100.00"),
6127            Price::from("100.10"),
6128            Quantity::from(10),
6129            Quantity::from(10),
6130            ts1,
6131            ts1,
6132        ));
6133        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
6134            leg2,
6135            Price::from("99.00"),
6136            Price::from("99.10"),
6137            Quantity::from(10),
6138            Quantity::from(10),
6139            ts2,
6140            ts2,
6141        ));
6142
6143        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
6144
6145        rc.borrow_mut().flush_pending_historical_quote();
6146
6147        let quotes = handler.lock().expect(MUTEX_POISONED);
6148        assert_eq!(
6149            quotes.len(),
6150            1,
6151            "final historical quote should be emitted when the deferred event is flushed",
6152        );
6153        assert_eq!(quotes[0].ts_event, ts2);
6154    }
6155
6156    #[rstest]
6157    fn test_spread_quote_option_vega_weighting(equity_aapl: Equity) {
6158        let instrument = InstrumentAny::Equity(equity_aapl);
6159        let leg1 = instrument.id();
6160        let leg2 = InstrumentId::from("MSFT.XNAS");
6161        let spread_id = InstrumentId::from("SPREAD.XNAS");
6162        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6163        let handler = Arc::new(Mutex::new(Vec::new()));
6164        let handler_clone = Arc::clone(&handler);
6165        let clock = Rc::new(RefCell::new(TestClock::new()));
6166
6167        let mut vega_provider = MapVegaProvider::new();
6168        vega_provider.insert(leg1, 0.15);
6169        vega_provider.insert(leg2, 0.12);
6170
6171        let mut agg = SpreadQuoteAggregator::new(
6172            spread_id,
6173            &legs,
6174            false,
6175            instrument.price_precision(),
6176            0,
6177            Box::new(move |q: QuoteTick| {
6178                handler_clone.lock().expect(MUTEX_POISONED).push(q);
6179            }),
6180            clock,
6181            false,
6182            None,
6183            0,
6184            Some(Box::new(vega_provider)),
6185            None,
6186        );
6187
6188        let ts = UnixNanos::from(1_000_000_000);
6189        agg.handle_quote_tick(QuoteTick::new(
6190            leg1,
6191            Price::from("10.00"),
6192            Price::from("10.20"),
6193            Quantity::from(100),
6194            Quantity::from(100),
6195            ts,
6196            ts,
6197        ));
6198        agg.handle_quote_tick(QuoteTick::new(
6199            leg2,
6200            Price::from("11.00"),
6201            Price::from("11.20"),
6202            Quantity::from(100),
6203            Quantity::from(100),
6204            ts,
6205            ts,
6206        ));
6207        let quotes = handler.lock().expect(MUTEX_POISONED);
6208        assert_eq!(quotes.len(), 1);
6209        let q = &quotes[0];
6210        assert!(q.bid_price < q.ask_price);
6211        assert!(q.ask_price.as_f64() - q.bid_price.as_f64() > 0.0);
6212    }
6213
6214    #[rstest]
6215    fn test_spread_quote_all_zero_vega_fallback(equity_aapl: Equity) {
6216        let instrument = InstrumentAny::Equity(equity_aapl);
6217        let leg1 = instrument.id();
6218        let leg2 = InstrumentId::from("MSFT.XNAS");
6219        let spread_id = InstrumentId::from("SPREAD.XNAS");
6220        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6221        let handler = Arc::new(Mutex::new(Vec::new()));
6222        let handler_clone = Arc::clone(&handler);
6223        let clock = Rc::new(RefCell::new(TestClock::new()));
6224
6225        let mut vega_provider = MapVegaProvider::new();
6226        vega_provider.insert(leg1, 0.0);
6227        vega_provider.insert(leg2, 0.0);
6228
6229        let mut agg = SpreadQuoteAggregator::new(
6230            spread_id,
6231            &legs,
6232            false,
6233            instrument.price_precision(),
6234            0,
6235            Box::new(move |q: QuoteTick| {
6236                handler_clone.lock().expect(MUTEX_POISONED).push(q);
6237            }),
6238            clock,
6239            false,
6240            None,
6241            0,
6242            Some(Box::new(vega_provider)),
6243            None,
6244        );
6245
6246        let ts = UnixNanos::from(1_000_000_000);
6247        agg.handle_quote_tick(QuoteTick::new(
6248            leg1,
6249            Price::from("10.00"),
6250            Price::from("10.10"),
6251            Quantity::from(100),
6252            Quantity::from(100),
6253            ts,
6254            ts,
6255        ));
6256        agg.handle_quote_tick(QuoteTick::new(
6257            leg2,
6258            Price::from("20.00"),
6259            Price::from("20.10"),
6260            Quantity::from(100),
6261            Quantity::from(100),
6262            ts,
6263            ts,
6264        ));
6265        let quotes = handler.lock().expect(MUTEX_POISONED);
6266        assert_eq!(quotes.len(), 1);
6267        let q = &quotes[0];
6268        assert_eq!(q.bid_price, Price::from("-10.10"));
6269        assert_eq!(q.ask_price, Price::from("-9.90"));
6270    }
6271
6272    #[rstest]
6273    fn test_spread_quote_negative_prices_tick_scheme(equity_aapl: Equity) {
6274        let instrument = InstrumentAny::Equity(equity_aapl);
6275        let leg1 = instrument.id();
6276        let leg2 = InstrumentId::from("MSFT.XNAS");
6277        let spread_id = InstrumentId::from("SPREAD.XNAS");
6278        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
6279        let handler = Arc::new(Mutex::new(Vec::new()));
6280        let handler_clone = Arc::clone(&handler);
6281        let clock = Rc::new(RefCell::new(TestClock::new()));
6282        let rounder = FixedTickSchemeRounder::new(0.01).unwrap();
6283
6284        let mut agg = SpreadQuoteAggregator::new(
6285            spread_id,
6286            &legs,
6287            true,
6288            2,
6289            0,
6290            Box::new(move |q: QuoteTick| {
6291                handler_clone.lock().expect(MUTEX_POISONED).push(q);
6292            }),
6293            clock,
6294            false,
6295            None,
6296            0,
6297            None,
6298            Some(Box::new(rounder)),
6299        );
6300
6301        let ts = UnixNanos::from(1_000_000_000);
6302        agg.handle_quote_tick(QuoteTick::new(
6303            leg1,
6304            Price::from("10.00"),
6305            Price::from("10.10"),
6306            Quantity::from(100),
6307            Quantity::from(100),
6308            ts,
6309            ts,
6310        ));
6311        agg.handle_quote_tick(QuoteTick::new(
6312            leg2,
6313            Price::from("20.00"),
6314            Price::from("20.10"),
6315            Quantity::from(100),
6316            Quantity::from(100),
6317            ts,
6318            ts,
6319        ));
6320        let quotes = handler.lock().expect(MUTEX_POISONED);
6321        assert_eq!(quotes.len(), 1);
6322        let q = &quotes[0];
6323        assert!(q.bid_price.as_f64() < 0.0);
6324        assert!(q.ask_price.as_f64() < 0.0);
6325        assert!(q.bid_price < q.ask_price);
6326    }
6327
6328    #[rstest]
6329    #[case(BarIntervalType::LeftOpen)]
6330    #[case(BarIntervalType::RightOpen)]
6331    fn test_time_bar_skip_first_non_full_bar_noop_on_boundary(
6332        equity_aapl: Equity,
6333        #[case] interval_type: BarIntervalType,
6334    ) {
6335        // When the clock sits on a bar boundary, fire_immediately=true and
6336        // first_close_ns equals that boundary. Every subsequent bar closes
6337        // strictly after first_close_ns, so skip_first_non_full_bar never
6338        // triggers and both bars emit.
6339        let instrument = InstrumentAny::Equity(equity_aapl);
6340        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6341        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6342        let handler = Arc::new(Mutex::new(Vec::new()));
6343        let handler_clone = Arc::clone(&handler);
6344        let clock = Rc::new(RefCell::new(TestClock::new()));
6345        clock.borrow_mut().set_time(UnixNanos::from(1_000_000_000));
6346        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6347
6348        let aggregator = TimeBarAggregator::new(
6349            bar_type,
6350            instrument.price_precision(),
6351            instrument.size_precision(),
6352            clock,
6353            move |bar: Bar| {
6354                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6355                h.push(bar);
6356            },
6357            false,
6358            false,
6359            interval_type,
6360            None,
6361            0,
6362            true, // skip_first_non_full_bar
6363        );
6364
6365        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6366        let rc = Rc::new(RefCell::new(boxed));
6367        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6368
6369        rc.borrow_mut().update(
6370            Price::from("100.00"),
6371            Quantity::from(1),
6372            UnixNanos::from(1_000_000_000),
6373        );
6374        rc.borrow_mut().build_bar(&TimeEvent::new(
6375            event_name,
6376            UUID4::new(),
6377            UnixNanos::from(2_000_000_000),
6378            UnixNanos::from(2_000_000_000),
6379        ));
6380        rc.borrow_mut().update(
6381            Price::from("101.00"),
6382            Quantity::from(1),
6383            UnixNanos::from(2_500_000_000),
6384        );
6385        rc.borrow_mut().build_bar(&TimeEvent::new(
6386            event_name,
6387            UUID4::new(),
6388            UnixNanos::from(3_000_000_000),
6389            UnixNanos::from(3_000_000_000),
6390        ));
6391
6392        let bars = handler.lock().expect(MUTEX_POISONED);
6393        assert_eq!(bars.len(), 2);
6394        assert_eq!(bars[0].close, Price::from("100.00"));
6395        assert_eq!(bars[1].close, Price::from("101.00"));
6396    }
6397
6398    #[rstest]
6399    #[case(BarIntervalType::LeftOpen)]
6400    #[case(BarIntervalType::RightOpen)]
6401    fn test_time_bar_skip_first_non_full_bar_drops_partial_bar(
6402        equity_aapl: Equity,
6403        #[case] interval_type: BarIntervalType,
6404    ) {
6405        // When the clock starts past a boundary (mid-interval), first_close_ns
6406        // is the upcoming boundary. The bar closing at first_close_ns is partial,
6407        // so skip_first_non_full_bar drops it; subsequent full bars emit.
6408        let instrument = InstrumentAny::Equity(equity_aapl);
6409        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6410        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6411        let handler = Arc::new(Mutex::new(Vec::new()));
6412        let handler_clone = Arc::clone(&handler);
6413        let clock = Rc::new(RefCell::new(TestClock::new()));
6414        clock.borrow_mut().set_time(UnixNanos::from(1_500_000_000));
6415        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6416
6417        let aggregator = TimeBarAggregator::new(
6418            bar_type,
6419            instrument.price_precision(),
6420            instrument.size_precision(),
6421            clock,
6422            move |bar: Bar| {
6423                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6424                h.push(bar);
6425            },
6426            false,
6427            false,
6428            interval_type,
6429            None,
6430            0,
6431            true, // skip_first_non_full_bar
6432        );
6433
6434        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6435        let rc = Rc::new(RefCell::new(boxed));
6436        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6437
6438        rc.borrow_mut().update(
6439            Price::from("100.00"),
6440            Quantity::from(1),
6441            UnixNanos::from(1_500_000_000),
6442        );
6443        rc.borrow_mut().build_bar(&TimeEvent::new(
6444            event_name,
6445            UUID4::new(),
6446            UnixNanos::from(2_000_000_000),
6447            UnixNanos::from(2_000_000_000),
6448        ));
6449        rc.borrow_mut().update(
6450            Price::from("101.00"),
6451            Quantity::from(1),
6452            UnixNanos::from(2_500_000_000),
6453        );
6454        rc.borrow_mut().build_bar(&TimeEvent::new(
6455            event_name,
6456            UUID4::new(),
6457            UnixNanos::from(3_000_000_000),
6458            UnixNanos::from(3_000_000_000),
6459        ));
6460
6461        let bars = handler.lock().expect(MUTEX_POISONED);
6462        assert_eq!(bars.len(), 1);
6463        assert_eq!(bars[0].close, Price::from("101.00"));
6464    }
6465
6466    #[rstest]
6467    fn test_time_bar_skip_first_non_full_bar_skips_every_call_before_first_close(
6468        equity_aapl: Equity,
6469    ) {
6470        // The flag must remain set across every build_and_send call whose
6471        // ts_init <= first_close_ns, and only flip once a bar actually emits.
6472        // Catches a mutation that flips skip_first_non_full_bar early.
6473        let instrument = InstrumentAny::Equity(equity_aapl);
6474        let bar_spec = BarSpecification::new(10, BarAggregation::Second, PriceType::Last);
6475        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6476        let handler = Arc::new(Mutex::new(Vec::new()));
6477        let handler_clone = Arc::clone(&handler);
6478        let clock = Rc::new(RefCell::new(TestClock::new()));
6479        clock.borrow_mut().set_time(UnixNanos::from(5_000_000_000));
6480        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6481
6482        let aggregator = TimeBarAggregator::new(
6483            bar_type,
6484            instrument.price_precision(),
6485            instrument.size_precision(),
6486            clock,
6487            move |bar: Bar| {
6488                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6489                h.push(bar);
6490            },
6491            false,
6492            false,
6493            BarIntervalType::LeftOpen,
6494            None,
6495            0,
6496            true, // skip_first_non_full_bar
6497        );
6498
6499        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6500        let rc = Rc::new(RefCell::new(boxed));
6501        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6502
6503        // first_close_ns is 10_000_000_000 (first 10s boundary after start).
6504        // Drive three build_bar calls at ts <= first_close_ns, each preceded by a
6505        // distinct update. Every one of them must be skipped.
6506        for (price, update_ts, event_ts) in [
6507            ("100.00", 5_500_000_000_u64, 7_000_000_000_u64),
6508            ("101.00", 7_500_000_000_u64, 8_000_000_000_u64),
6509            ("102.00", 9_000_000_000_u64, 10_000_000_000_u64),
6510        ] {
6511            rc.borrow_mut().update(
6512                Price::from(price),
6513                Quantity::from(1),
6514                UnixNanos::from(update_ts),
6515            );
6516            rc.borrow_mut().build_bar(&TimeEvent::new(
6517                event_name,
6518                UUID4::new(),
6519                UnixNanos::from(event_ts),
6520                UnixNanos::from(event_ts),
6521            ));
6522        }
6523
6524        // Final update + build past first_close_ns emits for the first time.
6525        rc.borrow_mut().update(
6526            Price::from("103.00"),
6527            Quantity::from(1),
6528            UnixNanos::from(10_500_000_000),
6529        );
6530        rc.borrow_mut().build_bar(&TimeEvent::new(
6531            event_name,
6532            UUID4::new(),
6533            UnixNanos::from(11_000_000_000),
6534            UnixNanos::from(11_000_000_000),
6535        ));
6536
6537        let bars = handler.lock().expect(MUTEX_POISONED);
6538        assert_eq!(bars.len(), 1);
6539        assert_eq!(bars[0].close, Price::from("103.00"));
6540    }
6541
6542    #[rstest]
6543    fn test_time_bar_skip_first_non_full_bar_skips_when_build_delay_shifts_start(
6544        equity_aapl: Equity,
6545    ) {
6546        // Cython parity: when bar_build_delay > 0 pushes start_time past a
6547        // boundary (even if `now` is on a boundary), first_close_ns is set and
6548        // the first bar is skipped. The previous Rust `now > start_time` guard
6549        // incorrectly kept this first bar.
6550        let instrument = InstrumentAny::Equity(equity_aapl);
6551        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6552        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6553        let handler = Arc::new(Mutex::new(Vec::new()));
6554        let handler_clone = Arc::clone(&handler);
6555        let clock = Rc::new(RefCell::new(TestClock::new()));
6556        clock.borrow_mut().set_time(UnixNanos::from(2_000_000_000));
6557        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6558
6559        let aggregator = TimeBarAggregator::new(
6560            bar_type,
6561            instrument.price_precision(),
6562            instrument.size_precision(),
6563            clock,
6564            move |bar: Bar| {
6565                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6566                h.push(bar);
6567            },
6568            false,
6569            false,
6570            BarIntervalType::LeftOpen,
6571            None,
6572            100,  // bar_build_delay (microseconds)
6573            true, // skip_first_non_full_bar
6574        );
6575
6576        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6577        let rc = Rc::new(RefCell::new(boxed));
6578        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6579
6580        // start_time = 2s + 100us = 2_000_100_000 ns; first_close_ns = 3_000_100_000 ns.
6581        rc.borrow_mut().update(
6582            Price::from("100.00"),
6583            Quantity::from(1),
6584            UnixNanos::from(2_500_000_000),
6585        );
6586        rc.borrow_mut().build_bar(&TimeEvent::new(
6587            event_name,
6588            UUID4::new(),
6589            UnixNanos::from(3_000_100_000),
6590            UnixNanos::from(3_000_100_000),
6591        ));
6592        rc.borrow_mut().update(
6593            Price::from("101.00"),
6594            Quantity::from(1),
6595            UnixNanos::from(3_500_000_000),
6596        );
6597        rc.borrow_mut().build_bar(&TimeEvent::new(
6598            event_name,
6599            UUID4::new(),
6600            UnixNanos::from(4_000_100_000),
6601            UnixNanos::from(4_000_100_000),
6602        ));
6603
6604        let bars = handler.lock().expect(MUTEX_POISONED);
6605        assert_eq!(bars.len(), 1);
6606        assert_eq!(bars[0].close, Price::from("101.00"));
6607    }
6608
6609    #[rstest]
6610    #[case(
6611        BarAggregation::Month,
6612        1_735_689_600_000_000_000_u64,
6613        1_733_011_200_000_000_000_u64
6614    )]
6615    #[case(
6616        BarAggregation::Year,
6617        1_735_689_600_000_000_000_u64,
6618        1_704_067_200_000_000_000_u64
6619    )]
6620    fn test_time_bar_fire_immediately_month_year_stored_open_points_to_previous_period(
6621        equity_aapl: Equity,
6622        #[case] aggregation: BarAggregation,
6623        #[case] start_ns: u64,
6624        #[case] expected_stored_open_ns: u64,
6625    ) {
6626        // When the clock is exactly on a month/year boundary, fire_immediately=true.
6627        // stored_open_ns must resolve to one step before start_time (mirrors Cython
6628        // close_time - step arithmetic) so the first bar's open timestamp marks
6629        // the true start of the in-progress interval.
6630        let instrument = InstrumentAny::Equity(equity_aapl);
6631        let bar_spec = BarSpecification::new(1, aggregation, PriceType::Last);
6632        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6633        let handler = Arc::new(Mutex::new(Vec::new()));
6634        let handler_clone = Arc::clone(&handler);
6635        let clock = Rc::new(RefCell::new(TestClock::new()));
6636        clock.borrow_mut().set_time(UnixNanos::from(start_ns));
6637        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6638
6639        let aggregator = TimeBarAggregator::new(
6640            bar_type,
6641            instrument.price_precision(),
6642            instrument.size_precision(),
6643            clock,
6644            move |bar: Bar| {
6645                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6646                h.push(bar);
6647            },
6648            false,
6649            false,
6650            BarIntervalType::RightOpen, // ts_event = stored_open_ns
6651            None,
6652            0,
6653            false, // skip_first_non_full_bar
6654        );
6655
6656        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6657        let rc = Rc::new(RefCell::new(boxed));
6658        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6659
6660        rc.borrow_mut().update(
6661            Price::from("100.00"),
6662            Quantity::from(1),
6663            UnixNanos::from(start_ns),
6664        );
6665        rc.borrow_mut().build_bar(&TimeEvent::new(
6666            event_name,
6667            UUID4::new(),
6668            UnixNanos::from(start_ns),
6669            UnixNanos::from(start_ns),
6670        ));
6671
6672        let bars = handler.lock().expect(MUTEX_POISONED);
6673        assert_eq!(bars.len(), 1);
6674        assert_eq!(bars[0].ts_event, UnixNanos::from(expected_stored_open_ns));
6675        assert_eq!(bars[0].ts_init, UnixNanos::from(start_ns));
6676    }
6677
6678    #[rstest]
6679    fn test_time_bar_historical_prevents_bars_for_timer_before_last_data(equity_aapl: Equity) {
6680        let instrument = InstrumentAny::Equity(equity_aapl);
6681        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6682        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6683        let handler = Arc::new(Mutex::new(Vec::new()));
6684        let handler_clone = Arc::clone(&handler);
6685        let clock = Rc::new(RefCell::new(TestClock::new()));
6686
6687        let mut agg = TimeBarAggregator::new(
6688            bar_type,
6689            instrument.price_precision(),
6690            instrument.size_precision(),
6691            clock.clone(),
6692            move |bar: Bar| {
6693                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6694                h.push(bar);
6695            },
6696            true,
6697            true,
6698            BarIntervalType::LeftOpen,
6699            None,
6700            0,
6701            false,
6702        );
6703        agg.historical_mode = true;
6704        agg.set_clock_internal(clock);
6705        let boxed: Box<dyn BarAggregator> = Box::new(agg);
6706        let rc = Rc::new(RefCell::new(boxed));
6707        rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
6708
6709        let ts1 = UnixNanos::from(2_000_000_000);
6710        rc.borrow_mut()
6711            .update(Price::from("100.00"), Quantity::from(1), ts1);
6712
6713        let ts2 = UnixNanos::from(3_000_000_000);
6714        rc.borrow_mut()
6715            .update(Price::from("101.00"), Quantity::from(1), ts2);
6716
6717        let bars = handler.lock().expect(MUTEX_POISONED);
6718        assert!(
6719            !bars.is_empty(),
6720            "advancing time from ts1 to ts2 should produce at least one bar"
6721        );
6722        assert_eq!(bars[0].close, Price::from("100.00"));
6723    }
6724}
6725
6726#[cfg(test)]
6727mod property_tests {
6728    use std::{
6729        cell::RefCell,
6730        rc::Rc,
6731        sync::{Arc, Mutex},
6732    };
6733
6734    use nautilus_common::{clock::TestClock, timer::TimeEvent};
6735    use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
6736    use nautilus_model::{
6737        data::{Bar, BarSpecification, BarType, bar::get_bar_interval_ns},
6738        enums::{AggregationSource, BarAggregation, BarIntervalType, PriceType},
6739        instruments::{Instrument, InstrumentAny, stubs::equity_aapl},
6740        types::{Price, Quantity},
6741    };
6742    use proptest::prelude::*;
6743    use rstest::rstest;
6744    use ustr::Ustr;
6745
6746    use super::*;
6747
6748    fn time_bar_spec_strategy() -> impl Strategy<Value = (BarAggregation, usize)> {
6749        prop_oneof![
6750            (Just(BarAggregation::Second), 1usize..=5),
6751            (Just(BarAggregation::Minute), 1usize..=5),
6752            (Just(BarAggregation::Hour), 1usize..=4),
6753        ]
6754    }
6755
6756    fn interval_type_strategy() -> impl Strategy<Value = BarIntervalType> {
6757        prop_oneof![
6758            Just(BarIntervalType::LeftOpen),
6759            Just(BarIntervalType::RightOpen),
6760        ]
6761    }
6762
6763    proptest! {
6764        #[rstest]
6765        fn prop_skip_first_drops_partial_then_emits(
6766            (aggregation, step) in time_bar_spec_strategy(),
6767            interval_type in interval_type_strategy(),
6768            skip_first in any::<bool>(),
6769        ) {
6770            let instrument = InstrumentAny::Equity(equity_aapl());
6771            let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6772            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6773            let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6774
6775            // Anchor the clock one full interval past epoch plus a half-interval offset
6776            // so start_time lands mid-interval and fire_immediately is false.
6777            let now_ns = interval_ns + interval_ns / 2;
6778
6779            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6780            let handler_clone = Arc::clone(&handler);
6781            let clock = Rc::new(RefCell::new(TestClock::new()));
6782            clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6783            let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6784
6785            let aggregator = TimeBarAggregator::new(
6786                bar_type,
6787                instrument.price_precision(),
6788                instrument.size_precision(),
6789                clock,
6790                move |bar: Bar| {
6791                    let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6792                    h.push(bar);
6793                },
6794                false,
6795                false,
6796                interval_type,
6797                None,
6798                0,
6799                skip_first,
6800            );
6801
6802            let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6803            let rc = Rc::new(RefCell::new(boxed));
6804            rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6805
6806            // First tick + first close event. start_time = 1 * interval, first_close
6807            // = 2 * interval. ts_init == first_close_ns: partial bar.
6808            rc.borrow_mut().update(
6809                Price::from("100.00"),
6810                Quantity::from(1),
6811                UnixNanos::from(now_ns),
6812            );
6813            let first_close = 2 * interval_ns;
6814            rc.borrow_mut().build_bar(&TimeEvent::new(
6815                event_name,
6816                UUID4::new(),
6817                UnixNanos::from(first_close),
6818                UnixNanos::from(first_close),
6819            ));
6820
6821            // Second tick + later close; emits unconditionally.
6822            rc.borrow_mut().update(
6823                Price::from("101.00"),
6824                Quantity::from(1),
6825                UnixNanos::from(first_close + interval_ns / 2),
6826            );
6827            let second_close = first_close + interval_ns;
6828            rc.borrow_mut().build_bar(&TimeEvent::new(
6829                event_name,
6830                UUID4::new(),
6831                UnixNanos::from(second_close),
6832                UnixNanos::from(second_close),
6833            ));
6834
6835            let bars = handler.lock().expect(MUTEX_POISONED);
6836            let expected = if skip_first { 1 } else { 2 };
6837            prop_assert_eq!(bars.len(), expected);
6838            prop_assert_eq!(bars.last().unwrap().close, Price::from("101.00"));
6839            for bar in bars.iter() {
6840                prop_assert!(bar.high >= bar.open);
6841                prop_assert!(bar.high >= bar.close);
6842                prop_assert!(bar.low <= bar.open);
6843                prop_assert!(bar.low <= bar.close);
6844            }
6845        }
6846
6847        #[rstest]
6848        fn prop_skip_first_noop_on_exact_boundary(
6849            (aggregation, step) in time_bar_spec_strategy(),
6850            interval_type in interval_type_strategy(),
6851        ) {
6852            let instrument = InstrumentAny::Equity(equity_aapl());
6853            let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6854            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6855            let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6856
6857            // Clock exactly on a bar boundary: fire_immediately=true, so the first
6858            // bar that reaches build_and_send must emit regardless of skip_first.
6859            let now_ns = interval_ns;
6860            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6861            let handler_clone = Arc::clone(&handler);
6862            let clock = Rc::new(RefCell::new(TestClock::new()));
6863            clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6864            let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6865
6866            let aggregator = TimeBarAggregator::new(
6867                bar_type,
6868                instrument.price_precision(),
6869                instrument.size_precision(),
6870                clock,
6871                move |bar: Bar| {
6872                    let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6873                    h.push(bar);
6874                },
6875                false,
6876                false,
6877                interval_type,
6878                None,
6879                0,
6880                true, // skip_first_non_full_bar
6881            );
6882
6883            let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6884            let rc = Rc::new(RefCell::new(boxed));
6885            rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6886
6887            rc.borrow_mut().update(
6888                Price::from("100.00"),
6889                Quantity::from(1),
6890                UnixNanos::from(now_ns),
6891            );
6892            let next_close = now_ns + interval_ns;
6893            rc.borrow_mut().build_bar(&TimeEvent::new(
6894                event_name,
6895                UUID4::new(),
6896                UnixNanos::from(next_close),
6897                UnixNanos::from(next_close),
6898            ));
6899
6900            let bars = handler.lock().expect(MUTEX_POISONED);
6901            prop_assert_eq!(bars.len(), 1);
6902            prop_assert_eq!(bars[0].close, Price::from("100.00"));
6903        }
6904
6905        #[rstest]
6906        fn prop_bar_builder_ohlc_invariants(
6907            updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=50),
6908        ) {
6909            let instrument = InstrumentAny::Equity(equity_aapl());
6910            let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
6911            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6912            let mut builder = BarBuilder::new(bar_type, 2, 0);
6913
6914            let mut total_volume: u64 = 0;
6915
6916            for (i, (price_cents, size)) in updates.iter().enumerate() {
6917                let price = Price::new((*price_cents as f64) / 100.0, 2);
6918                let qty = Quantity::new(*size as f64, 0);
6919                let ts = UnixNanos::from((i as u64 + 1) * 1_000);
6920                total_volume += *size;
6921                builder.update(price, qty, ts);
6922            }
6923
6924            let bar = builder.build_now();
6925            prop_assert!(bar.low <= bar.open);
6926            prop_assert!(bar.low <= bar.close);
6927            prop_assert!(bar.high >= bar.open);
6928            prop_assert!(bar.high >= bar.close);
6929            prop_assert!(bar.low <= bar.high);
6930            prop_assert_eq!(bar.volume.as_f64(), total_volume as f64);
6931        }
6932
6933        #[rstest]
6934        fn prop_tick_bar_aggregator_volume_conservation(
6935            ticks in prop::collection::vec((1i64..=1_000i64, 1u64..=100u64), 3..=60),
6936            step in 1usize..=5,
6937        ) {
6938            let instrument = InstrumentAny::Equity(equity_aapl());
6939            let bar_spec = BarSpecification::new(step, BarAggregation::Tick, PriceType::Last);
6940            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6941            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6942            let handler_clone = Arc::clone(&handler);
6943
6944            let mut aggregator = TickBarAggregator::new(
6945                bar_type,
6946                instrument.price_precision(),
6947                instrument.size_precision(),
6948                move |bar: Bar| {
6949                    handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6950                },
6951            );
6952
6953            let mut total_input: u64 = 0;
6954
6955            for (i, (price_cents, size)) in ticks.iter().enumerate() {
6956                let price = Price::new((*price_cents as f64) / 100.0, 2);
6957                let qty = Quantity::new(*size as f64, 0);
6958                aggregator.update(price, qty, UnixNanos::from((i as u64 + 1) * 1_000));
6959                total_input += *size;
6960            }
6961
6962            let bars = handler.lock().expect(MUTEX_POISONED);
6963            let emitted_count = bars.len();
6964            prop_assert_eq!(emitted_count, ticks.len() / step);
6965
6966            let mut sum_emitted: f64 = 0.0;
6967
6968            for bar in bars.iter() {
6969                prop_assert!(bar.low <= bar.open);
6970                prop_assert!(bar.low <= bar.close);
6971                prop_assert!(bar.high >= bar.open);
6972                prop_assert!(bar.high >= bar.close);
6973                sum_emitted += bar.volume.as_f64();
6974            }
6975
6976            // Unemitted pending size remains in the builder for the remainder `ticks.len() % step` ticks.
6977            let pending_size: u64 = ticks.iter()
6978                .skip(emitted_count * step)
6979                .map(|(_, s)| *s)
6980                .sum();
6981            prop_assert!((sum_emitted + pending_size as f64 - total_input as f64).abs() < 1e-6);
6982        }
6983
6984        #[rstest]
6985        fn prop_volume_bar_aggregator_conservation(
6986            sizes in prop::collection::vec(1u64..=50u64, 3..=40),
6987            step in 2u64..=10u64,
6988        ) {
6989            let instrument = InstrumentAny::Equity(equity_aapl());
6990            let bar_spec = BarSpecification::new(step as usize, BarAggregation::Volume, PriceType::Last);
6991            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6992            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6993            let handler_clone = Arc::clone(&handler);
6994
6995            let mut aggregator = VolumeBarAggregator::new(
6996                bar_type,
6997                instrument.price_precision(),
6998                instrument.size_precision(),
6999                move |bar: Bar| {
7000                    handler_clone.lock().expect(MUTEX_POISONED).push(bar);
7001                },
7002            );
7003
7004            let mut total_input: u64 = 0;
7005
7006            for (i, size) in sizes.iter().enumerate() {
7007                aggregator.update(
7008                    Price::from("100.00"),
7009                    Quantity::new(*size as f64, 0),
7010                    UnixNanos::from((i as u64 + 1) * 1_000),
7011                );
7012                total_input += *size;
7013            }
7014
7015            let bars = handler.lock().expect(MUTEX_POISONED);
7016
7017            // Every emitted bar has exactly `step` volume and OHLC ordering holds.
7018            for bar in bars.iter() {
7019                prop_assert_eq!(bar.volume, Quantity::from(step));
7020                prop_assert!(bar.low <= bar.open);
7021                prop_assert!(bar.low <= bar.close);
7022                prop_assert!(bar.high >= bar.open);
7023                prop_assert!(bar.high >= bar.close);
7024            }
7025
7026            // Conservation: total emitted + pending builder volume equals total input.
7027            let emitted_total: u64 = bars.len() as u64 * step;
7028            let pending = aggregator.core.builder.volume.as_f64();
7029            prop_assert!((emitted_total as f64 + pending - total_input as f64).abs() < 1e-6);
7030        }
7031
7032        #[rstest]
7033        fn prop_bar_builder_spread_adjustment_is_additive(
7034            updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7035            spread_cents in -10_000i64..=10_000i64,
7036            backward in any::<bool>(),
7037        ) {
7038            let instrument = InstrumentAny::Equity(equity_aapl());
7039            let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7040            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7041            let mut builder = BarBuilder::new(bar_type, 2, 0);
7042
7043            let spread = Decimal::new(spread_cents, 2);
7044            let mode = if backward {
7045                ContinuousFutureAdjustmentType::BackwardSpread
7046            } else {
7047                ContinuousFutureAdjustmentType::ForwardSpread
7048            };
7049            builder.set_adjustment(spread, mode);
7050
7051            let mut min_cents = i64::MAX;
7052            let mut max_cents = i64::MIN;
7053
7054            for (i, (price_cents, size)) in updates.iter().enumerate() {
7055                if *price_cents < min_cents {
7056                    min_cents = *price_cents;
7057                }
7058
7059                if *price_cents > max_cents {
7060                    max_cents = *price_cents;
7061                }
7062
7063                builder.update(
7064                    Price::new((*price_cents as f64) / 100.0, 2),
7065                    Quantity::new(*size as f64, 0),
7066                    UnixNanos::from((i as u64 + 1) * 1_000),
7067                );
7068            }
7069
7070            let bar = builder.build_now();
7071            let first_decimal = Decimal::new(updates.first().unwrap().0, 2);
7072            let last_decimal = Decimal::new(updates.last().unwrap().0, 2);
7073            let min_decimal = Decimal::new(min_cents, 2);
7074            let max_decimal = Decimal::new(max_cents, 2);
7075
7076            prop_assert_eq!(bar.open.as_decimal(), first_decimal + spread);
7077            prop_assert_eq!(bar.close.as_decimal(), last_decimal + spread);
7078            prop_assert_eq!(bar.low.as_decimal(), min_decimal + spread);
7079            prop_assert_eq!(bar.high.as_decimal(), max_decimal + spread);
7080        }
7081
7082        #[rstest]
7083        fn prop_bar_builder_inactive_adjustment_is_identity(
7084            updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=20),
7085            use_ratio in any::<bool>(),
7086        ) {
7087            let instrument = InstrumentAny::Equity(equity_aapl());
7088            let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7089            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7090
7091            let mut adjusted = BarBuilder::new(bar_type, 2, 0);
7092            let mut baseline = BarBuilder::new(bar_type, 2, 0);
7093
7094            // Inactive in either mode: ZERO spread or ONE ratio.
7095            let (input, mode) = if use_ratio {
7096                (Decimal::ONE, ContinuousFutureAdjustmentType::BackwardRatio)
7097            } else {
7098                (Decimal::ZERO, ContinuousFutureAdjustmentType::BackwardSpread)
7099            };
7100            adjusted.set_adjustment(input, mode);
7101
7102            for (i, (price_cents, size)) in updates.iter().enumerate() {
7103                let price = Price::new((*price_cents as f64) / 100.0, 2);
7104                let qty = Quantity::new(*size as f64, 0);
7105                let ts = UnixNanos::from((i as u64 + 1) * 1_000);
7106                adjusted.update(price, qty, ts);
7107                baseline.update(price, qty, ts);
7108            }
7109
7110            let bar_adjusted = adjusted.build_now();
7111            let bar_baseline = baseline.build_now();
7112            prop_assert_eq!(bar_adjusted.open, bar_baseline.open);
7113            prop_assert_eq!(bar_adjusted.high, bar_baseline.high);
7114            prop_assert_eq!(bar_adjusted.low, bar_baseline.low);
7115            prop_assert_eq!(bar_adjusted.close, bar_baseline.close);
7116            prop_assert_eq!(bar_adjusted.volume, bar_baseline.volume);
7117        }
7118
7119        #[rstest]
7120        fn prop_bar_builder_spread_preserves_raw_arithmetic(
7121            updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7122            // Sub-precision spread: scale 4 versus price precision 2. Locks in that
7123            // spread mode performs raw addition without rounding to price precision.
7124            spread_micro in -10_000i64..=10_000i64,
7125        ) {
7126            let instrument = InstrumentAny::Equity(equity_aapl());
7127            let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7128            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7129            let mut builder = BarBuilder::new(bar_type, 2, 0);
7130
7131            let spread = Decimal::new(spread_micro, 4);
7132            builder.set_adjustment(spread, ContinuousFutureAdjustmentType::BackwardSpread);
7133
7134            let adjustment_raw_i128 = mantissa_exponent_to_fixed_i128(
7135                spread.mantissa(),
7136                -(spread.scale() as i8),
7137                FIXED_PRECISION,
7138            )
7139            .expect("scale within range");
7140            #[allow(
7141                clippy::useless_conversion,
7142                reason = "i128 to PriceRaw is real when not high-precision"
7143            )]
7144            let expected_adjustment_raw: PriceRaw =
7145                adjustment_raw_i128.try_into().expect("within PriceRaw range");
7146
7147            let mut min_cents = i64::MAX;
7148            let mut max_cents = i64::MIN;
7149            let mut last_price = Price::new(0.0, 2);
7150            let mut first_price = Price::new(0.0, 2);
7151
7152            for (i, (price_cents, size)) in updates.iter().enumerate() {
7153                if *price_cents < min_cents {
7154                    min_cents = *price_cents;
7155                }
7156
7157                if *price_cents > max_cents {
7158                    max_cents = *price_cents;
7159                }
7160
7161                let price = Price::new((*price_cents as f64) / 100.0, 2);
7162
7163                if i == 0 {
7164                    first_price = price;
7165                }
7166
7167                last_price = price;
7168                builder.update(
7169                    price,
7170                    Quantity::new(*size as f64, 0),
7171                    UnixNanos::from((i as u64 + 1) * 1_000),
7172                );
7173            }
7174
7175            let bar = builder.build_now();
7176            let min_price = Price::new((min_cents as f64) / 100.0, 2);
7177            let max_price = Price::new((max_cents as f64) / 100.0, 2);
7178            prop_assert_eq!(bar.open.raw, first_price.raw + expected_adjustment_raw);
7179            prop_assert_eq!(bar.close.raw, last_price.raw + expected_adjustment_raw);
7180            prop_assert_eq!(bar.low.raw, min_price.raw + expected_adjustment_raw);
7181            prop_assert_eq!(bar.high.raw, max_price.raw + expected_adjustment_raw);
7182            prop_assert_eq!(bar.open.precision, 2);
7183            prop_assert_eq!(bar.high.precision, 2);
7184            prop_assert_eq!(bar.low.precision, 2);
7185            prop_assert_eq!(bar.close.precision, 2);
7186        }
7187
7188        #[rstest]
7189        fn prop_bar_builder_active_ratio_scales_each_ohlc(
7190            updates in prop::collection::vec((1_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7191            // Ratio in [0.50, 2.00] excluding exactly 1.00 to stay on the active path.
7192            ratio_centi in prop_oneof![50i64..=99i64, 101i64..=200i64],
7193            backward in any::<bool>(),
7194        ) {
7195            let instrument = InstrumentAny::Equity(equity_aapl());
7196            let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7197            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7198            let mut builder = BarBuilder::new(bar_type, 2, 0);
7199
7200            let ratio_decimal = Decimal::new(ratio_centi, 2);
7201            let ratio_f64 = (ratio_centi as f64) / 100.0;
7202            let mode = if backward {
7203                ContinuousFutureAdjustmentType::BackwardRatio
7204            } else {
7205                ContinuousFutureAdjustmentType::ForwardRatio
7206            };
7207            builder.set_adjustment(ratio_decimal, mode);
7208
7209            let mut min_cents = i64::MAX;
7210            let mut max_cents = i64::MIN;
7211            let mut first_cents = 0i64;
7212            let mut last_cents = 0i64;
7213
7214            for (i, (price_cents, size)) in updates.iter().enumerate() {
7215                if *price_cents < min_cents {
7216                    min_cents = *price_cents;
7217                }
7218
7219                if *price_cents > max_cents {
7220                    max_cents = *price_cents;
7221                }
7222
7223                if i == 0 {
7224                    first_cents = *price_cents;
7225                }
7226
7227                last_cents = *price_cents;
7228                builder.update(
7229                    Price::new((*price_cents as f64) / 100.0, 2),
7230                    Quantity::new(*size as f64, 0),
7231                    UnixNanos::from((i as u64 + 1) * 1_000),
7232                );
7233            }
7234
7235            let bar = builder.build_now();
7236            // Recompute via the same float math as the hot path so equality is exact.
7237            let expect = |cents: i64| Price::new((cents as f64) / 100.0 * ratio_f64, 2);
7238            prop_assert_eq!(bar.open, expect(first_cents));
7239            prop_assert_eq!(bar.close, expect(last_cents));
7240            // Ratio with positive ratio_f64 preserves ordering, so min/max map directly.
7241            prop_assert_eq!(bar.low, expect(min_cents));
7242            prop_assert_eq!(bar.high, expect(max_cents));
7243        }
7244
7245        #[rstest]
7246        fn prop_bar_builder_spread_mode_direction_is_metadata_only(
7247            updates in prop::collection::vec((10_000i64..=100_000i64, 1u64..=100u64), 1..=20),
7248            spread_cents in -10_000i64..=10_000i64,
7249        ) {
7250            let instrument = InstrumentAny::Equity(equity_aapl());
7251            let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
7252            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7253
7254            let spread = Decimal::new(spread_cents, 2);
7255            let mut backward = BarBuilder::new(bar_type, 2, 0);
7256            let mut forward = BarBuilder::new(bar_type, 2, 0);
7257            backward.set_adjustment(spread, ContinuousFutureAdjustmentType::BackwardSpread);
7258            forward.set_adjustment(spread, ContinuousFutureAdjustmentType::ForwardSpread);
7259
7260            for (i, (price_cents, size)) in updates.iter().enumerate() {
7261                let price = Price::new((*price_cents as f64) / 100.0, 2);
7262                let qty = Quantity::new(*size as f64, 0);
7263                let ts = UnixNanos::from((i as u64 + 1) * 1_000);
7264                backward.update(price, qty, ts);
7265                forward.update(price, qty, ts);
7266            }
7267
7268            let bar_backward = backward.build_now();
7269            let bar_forward = forward.build_now();
7270            prop_assert_eq!(bar_backward.open, bar_forward.open);
7271            prop_assert_eq!(bar_backward.high, bar_forward.high);
7272            prop_assert_eq!(bar_backward.low, bar_forward.low);
7273            prop_assert_eq!(bar_backward.close, bar_forward.close);
7274        }
7275
7276        #[rstest]
7277        fn prop_value_bar_aggregator_ohlc_invariants(
7278            ticks in prop::collection::vec((50i64..=500i64, 1u64..=20u64), 2..=30),
7279            step in 100u64..=2_000u64,
7280        ) {
7281            let instrument = InstrumentAny::Equity(equity_aapl());
7282            let bar_spec = BarSpecification::new(step as usize, BarAggregation::Value, PriceType::Last);
7283            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
7284            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
7285            let handler_clone = Arc::clone(&handler);
7286
7287            let mut aggregator = ValueBarAggregator::new(
7288                bar_type,
7289                instrument.price_precision(),
7290                instrument.size_precision(),
7291                move |bar: Bar| {
7292                    handler_clone.lock().expect(MUTEX_POISONED).push(bar);
7293                },
7294            );
7295
7296            for (i, (price_cents, size)) in ticks.iter().enumerate() {
7297                aggregator.update(
7298                    Price::new((*price_cents as f64) / 100.0, 2),
7299                    Quantity::new(*size as f64, 0),
7300                    UnixNanos::from((i as u64 + 1) * 1_000),
7301                );
7302            }
7303
7304            let bars = handler.lock().expect(MUTEX_POISONED);
7305            for bar in bars.iter() {
7306                prop_assert!(bar.low <= bar.open);
7307                prop_assert!(bar.low <= bar.close);
7308                prop_assert!(bar.high >= bar.open);
7309                prop_assert!(bar.high >= bar.close);
7310                prop_assert!(bar.volume.as_f64() > 0.0);
7311            }
7312        }
7313    }
7314}