Skip to main content

nautilus_model/data/
bar.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregate structures, data types and functionality.
17
18use std::{
19    collections::HashMap,
20    fmt::{Debug, Display},
21    hash::Hash,
22    num::{NonZero, NonZeroUsize},
23    str::FromStr,
24};
25
26use chrono::{DateTime, Datelike, Duration, SubsecRound, TimeDelta, Timelike, Utc};
27use derive_builder::Builder;
28use indexmap::IndexMap;
29use nautilus_core::{
30    UnixNanos,
31    correctness::{FAILED, check_predicate_true},
32    datetime::{add_n_months, subtract_n_months},
33    serialization::Serializable,
34};
35use serde::{Deserialize, Deserializer, Serialize, Serializer};
36
37use super::HasTsInit;
38use crate::{
39    enums::{AggregationSource, BarAggregation, PriceType},
40    identifiers::InstrumentId,
41    types::{Price, Quantity, fixed::FIXED_SIZE_BINARY},
42};
43
44pub const BAR_SPEC_1_SECOND_LAST: BarSpecification = BarSpecification {
45    step: NonZero::new(1).unwrap(),
46    aggregation: BarAggregation::Second,
47    price_type: PriceType::Last,
48};
49pub const BAR_SPEC_1_MINUTE_LAST: BarSpecification = BarSpecification {
50    step: NonZero::new(1).unwrap(),
51    aggregation: BarAggregation::Minute,
52    price_type: PriceType::Last,
53};
54pub const BAR_SPEC_3_MINUTE_LAST: BarSpecification = BarSpecification {
55    step: NonZero::new(3).unwrap(),
56    aggregation: BarAggregation::Minute,
57    price_type: PriceType::Last,
58};
59pub const BAR_SPEC_5_MINUTE_LAST: BarSpecification = BarSpecification {
60    step: NonZero::new(5).unwrap(),
61    aggregation: BarAggregation::Minute,
62    price_type: PriceType::Last,
63};
64pub const BAR_SPEC_15_MINUTE_LAST: BarSpecification = BarSpecification {
65    step: NonZero::new(15).unwrap(),
66    aggregation: BarAggregation::Minute,
67    price_type: PriceType::Last,
68};
69pub const BAR_SPEC_30_MINUTE_LAST: BarSpecification = BarSpecification {
70    step: NonZero::new(30).unwrap(),
71    aggregation: BarAggregation::Minute,
72    price_type: PriceType::Last,
73};
74pub const BAR_SPEC_1_HOUR_LAST: BarSpecification = BarSpecification {
75    step: NonZero::new(1).unwrap(),
76    aggregation: BarAggregation::Hour,
77    price_type: PriceType::Last,
78};
79pub const BAR_SPEC_2_HOUR_LAST: BarSpecification = BarSpecification {
80    step: NonZero::new(2).unwrap(),
81    aggregation: BarAggregation::Hour,
82    price_type: PriceType::Last,
83};
84pub const BAR_SPEC_4_HOUR_LAST: BarSpecification = BarSpecification {
85    step: NonZero::new(4).unwrap(),
86    aggregation: BarAggregation::Hour,
87    price_type: PriceType::Last,
88};
89pub const BAR_SPEC_6_HOUR_LAST: BarSpecification = BarSpecification {
90    step: NonZero::new(6).unwrap(),
91    aggregation: BarAggregation::Hour,
92    price_type: PriceType::Last,
93};
94pub const BAR_SPEC_12_HOUR_LAST: BarSpecification = BarSpecification {
95    step: NonZero::new(12).unwrap(),
96    aggregation: BarAggregation::Hour,
97    price_type: PriceType::Last,
98};
99pub const BAR_SPEC_1_DAY_LAST: BarSpecification = BarSpecification {
100    step: NonZero::new(1).unwrap(),
101    aggregation: BarAggregation::Day,
102    price_type: PriceType::Last,
103};
104pub const BAR_SPEC_2_DAY_LAST: BarSpecification = BarSpecification {
105    step: NonZero::new(2).unwrap(),
106    aggregation: BarAggregation::Day,
107    price_type: PriceType::Last,
108};
109pub const BAR_SPEC_3_DAY_LAST: BarSpecification = BarSpecification {
110    step: NonZero::new(3).unwrap(),
111    aggregation: BarAggregation::Day,
112    price_type: PriceType::Last,
113};
114pub const BAR_SPEC_5_DAY_LAST: BarSpecification = BarSpecification {
115    step: NonZero::new(5).unwrap(),
116    aggregation: BarAggregation::Day,
117    price_type: PriceType::Last,
118};
119pub const BAR_SPEC_1_WEEK_LAST: BarSpecification = BarSpecification {
120    step: NonZero::new(1).unwrap(),
121    aggregation: BarAggregation::Week,
122    price_type: PriceType::Last,
123};
124pub const BAR_SPEC_1_MONTH_LAST: BarSpecification = BarSpecification {
125    step: NonZero::new(1).unwrap(),
126    aggregation: BarAggregation::Month,
127    price_type: PriceType::Last,
128};
129pub const BAR_SPEC_3_MONTH_LAST: BarSpecification = BarSpecification {
130    step: NonZero::new(3).unwrap(),
131    aggregation: BarAggregation::Month,
132    price_type: PriceType::Last,
133};
134pub const BAR_SPEC_6_MONTH_LAST: BarSpecification = BarSpecification {
135    step: NonZero::new(6).unwrap(),
136    aggregation: BarAggregation::Month,
137    price_type: PriceType::Last,
138};
139pub const BAR_SPEC_12_MONTH_LAST: BarSpecification = BarSpecification {
140    step: NonZero::new(12).unwrap(),
141    aggregation: BarAggregation::Month,
142    price_type: PriceType::Last,
143};
144
145/// Returns the bar interval as a `TimeDelta`.
146///
147/// # Panics
148///
149/// Panics if the aggregation method of the given `bar_type` is not time based,
150/// or if `step` is too large for the interval arithmetic.
151#[must_use]
152pub fn get_bar_interval(bar_type: &BarType) -> TimeDelta {
153    let spec = bar_type.spec();
154    let step = step_to_i64(spec.step);
155
156    match spec.aggregation {
157        BarAggregation::Millisecond => TimeDelta::milliseconds(step),
158        BarAggregation::Second => TimeDelta::seconds(step),
159        BarAggregation::Minute => TimeDelta::minutes(step),
160        BarAggregation::Hour => TimeDelta::hours(step),
161        BarAggregation::Day => TimeDelta::days(step),
162        BarAggregation::Week => {
163            TimeDelta::days(step.checked_mul(7).expect("`step` overflows i64 days"))
164        }
165        BarAggregation::Month => {
166            // Proxy for comparing bar lengths
167            TimeDelta::days(step.checked_mul(30).expect("`step` overflows i64 days"))
168        }
169        BarAggregation::Year => {
170            // Proxy for comparing bar lengths
171            TimeDelta::days(step.checked_mul(365).expect("`step` overflows i64 days"))
172        }
173        _ => panic!("Aggregation not time based"),
174    }
175}
176
177/// Returns the bar interval as `UnixNanos`.
178///
179/// # Panics
180///
181/// Panics if the aggregation method of the given `bar_type` is not time based.
182#[must_use]
183pub fn get_bar_interval_ns(bar_type: &BarType) -> UnixNanos {
184    let interval_ns = get_bar_interval(bar_type)
185        .num_nanoseconds()
186        .expect("Invalid bar interval")
187        .cast_unsigned();
188    UnixNanos::from(interval_ns)
189}
190
191/// Returns the time bar start as a timezone-aware `DateTime<Utc>`.
192///
193/// # Panics
194///
195/// Panics if computing the base `NaiveDate` or `DateTime` from `now` fails,
196/// if `step` cannot be represented for the calendar arithmetic,
197/// or if the aggregation type is unsupported.
198pub fn get_time_bar_start(
199    now: DateTime<Utc>,
200    bar_type: &BarType,
201    time_bars_origin: Option<TimeDelta>,
202) -> DateTime<Utc> {
203    let spec = bar_type.spec();
204    let step = step_to_i64(spec.step);
205    let origin_offset: TimeDelta = time_bars_origin.unwrap_or_else(TimeDelta::zero);
206
207    match spec.aggregation {
208        BarAggregation::Millisecond => {
209            find_closest_smaller_time(now, origin_offset, Duration::milliseconds(step))
210        }
211        BarAggregation::Second => {
212            find_closest_smaller_time(now, origin_offset, Duration::seconds(step))
213        }
214        BarAggregation::Minute => {
215            find_closest_smaller_time(now, origin_offset, Duration::minutes(step))
216        }
217        BarAggregation::Hour => {
218            find_closest_smaller_time(now, origin_offset, Duration::hours(step))
219        }
220        BarAggregation::Day => find_closest_smaller_time(now, origin_offset, Duration::days(step)),
221        BarAggregation::Week => {
222            let mut start_time = now.trunc_subsecs(0)
223                - Duration::seconds(i64::from(now.second()))
224                - Duration::minutes(i64::from(now.minute()))
225                - Duration::hours(i64::from(now.hour()))
226                - TimeDelta::days(i64::from(now.weekday().num_days_from_monday()));
227            start_time += origin_offset;
228
229            if now < start_time {
230                start_time -= Duration::weeks(step);
231            }
232
233            start_time
234        }
235        BarAggregation::Month => {
236            // Set to the first day of the year
237            let mut start_time = DateTime::from_naive_utc_and_offset(
238                chrono::NaiveDate::from_ymd_opt(now.year(), 1, 1)
239                    .expect("valid date")
240                    .and_hms_opt(0, 0, 0)
241                    .expect("valid time"),
242                Utc,
243            );
244            start_time += origin_offset;
245
246            if now < start_time {
247                start_time =
248                    subtract_n_months(start_time, 12).expect("Failed to subtract 12 months");
249            }
250
251            let months_step =
252                u32::try_from(step).expect("`step` exceeds u32 range for month arithmetic");
253
254            while start_time <= now {
255                start_time =
256                    add_n_months(start_time, months_step).expect("Failed to add months in loop");
257            }
258
259            start_time =
260                subtract_n_months(start_time, months_step).expect("Failed to subtract months_step");
261            start_time
262        }
263        BarAggregation::Year => {
264            let step_i32 =
265                i32::try_from(step).expect("`step` exceeds i32 range for year arithmetic");
266
267            // Reconstruct from Jan 1 + origin each time to avoid leap-day drift
268            let year_start = |y: i32| {
269                DateTime::from_naive_utc_and_offset(
270                    chrono::NaiveDate::from_ymd_opt(y, 1, 1)
271                        .expect("valid date")
272                        .and_hms_opt(0, 0, 0)
273                        .expect("valid time"),
274                    Utc,
275                ) + origin_offset
276            };
277
278            let mut year = now.year();
279            if year_start(year) > now {
280                year -= step_i32;
281            }
282
283            while year_start(year + step_i32) <= now {
284                year += step_i32;
285            }
286
287            year_start(year)
288        }
289        _ => panic!(
290            "Aggregation type {} not supported for time bars",
291            spec.aggregation
292        ),
293    }
294}
295
296/// Finds the closest smaller time based on a daily time origin and period.
297///
298/// This function calculates the most recent time that is aligned with the given period
299/// and is less than or equal to the current time.
300fn find_closest_smaller_time(
301    now: DateTime<Utc>,
302    daily_time_origin: TimeDelta,
303    period: TimeDelta,
304) -> DateTime<Utc> {
305    // Floor to start of day
306    let day_start = now.trunc_subsecs(0)
307        - Duration::seconds(i64::from(now.second()))
308        - Duration::minutes(i64::from(now.minute()))
309        - Duration::hours(i64::from(now.hour()));
310    let base_time = day_start + daily_time_origin;
311
312    let time_difference = now - base_time;
313    let period_ns = period.num_nanoseconds().unwrap_or(1);
314
315    // Use div_euclid for floor division (rounds toward -inf, not zero)
316    // so negative deltas (now before origin) yield the previous period boundary
317    let num_periods = time_difference
318        .num_nanoseconds()
319        .unwrap_or(0)
320        .div_euclid(period_ns);
321
322    base_time + TimeDelta::nanoseconds(num_periods * period_ns)
323}
324
/// Widens a bar specification step into the `i64` used by the time arithmetic.
///
/// # Panics
///
/// Panics if `step` exceeds the `i64` range.
fn step_to_i64(step: NonZeroUsize) -> i64 {
    let raw = step.get();
    let converted = i64::try_from(raw);
    converted.expect("`step` exceeds i64 range")
}
333
334/// Represents a bar aggregation specification including a step, aggregation
335/// method/rule and price type.
336#[repr(C)]
337#[derive(
338    Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Builder,
339)]
340#[cfg_attr(
341    feature = "python",
342    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
343)]
344#[cfg_attr(
345    feature = "python",
346    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
347)]
348pub struct BarSpecification {
349    /// The step for binning samples for bar aggregation.
350    pub step: NonZeroUsize,
351    /// The type of bar aggregation.
352    pub aggregation: BarAggregation,
353    /// The price type to use for aggregation.
354    pub price_type: PriceType,
355}
356
357impl BarSpecification {
358    /// Creates a new [`BarSpecification`] instance with correctness checking.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if `step` is not positive (> 0), or if `step` is not
363    /// valid for a fixed-subunit time aggregation.
364    ///
365    /// # Notes
366    ///
367    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
368    pub fn new_checked(
369        step: usize,
370        aggregation: BarAggregation,
371        price_type: PriceType,
372    ) -> anyhow::Result<Self> {
373        let step = NonZeroUsize::new(step)
374            .ok_or(anyhow::anyhow!("Invalid step: {step} (must be non-zero)"))?;
375        Self::validate_step(step.get(), aggregation)?;
376
377        Ok(Self {
378            step,
379            aggregation,
380            price_type,
381        })
382    }
383
384    fn validate_step(step: usize, aggregation: BarAggregation) -> anyhow::Result<()> {
385        match aggregation {
386            BarAggregation::Millisecond => {
387                Self::validate_periodic_step(step, aggregation, 1000, false)
388            }
389            BarAggregation::Second | BarAggregation::Minute => {
390                Self::validate_periodic_step(step, aggregation, 60, false)
391            }
392            BarAggregation::Hour => Self::validate_periodic_step(step, aggregation, 24, false),
393            BarAggregation::Month => Self::validate_periodic_step(step, aggregation, 12, false),
394            _ => Ok(()),
395        }
396    }
397
398    fn validate_periodic_step(
399        step: usize,
400        aggregation: BarAggregation,
401        subunits: usize,
402        allow_equal: bool,
403    ) -> anyhow::Result<()> {
404        if !subunits.is_multiple_of(step) {
405            anyhow::bail!(
406                "Invalid step in bar_type.spec.step: {step} for aggregation={}. \
407                 step must evenly divide {subunits} (so it is periodic).",
408                aggregation as u8
409            );
410        }
411
412        if !allow_equal && subunits == step {
413            anyhow::bail!(
414                "Invalid step in bar_type.spec.step: {step} for aggregation={}. \
415                 step must not be {subunits}. Use higher aggregation unit instead.",
416                aggregation as u8
417            );
418        }
419
420        Ok(())
421    }
422
423    /// Creates a new [`BarSpecification`] instance.
424    ///
425    /// # Panics
426    ///
427    /// Panics if `step` is not positive (> 0), or if `step` is not valid for
428    /// a fixed-subunit time aggregation.
429    #[must_use]
430    pub fn new(step: usize, aggregation: BarAggregation, price_type: PriceType) -> Self {
431        Self::new_checked(step, aggregation, price_type).expect(FAILED)
432    }
433
434    /// Returns the `TimeDelta` interval for this bar specification.
435    ///
436    /// # Notes
437    ///
438    /// For [`BarAggregation::Month`] and [`BarAggregation::Year`], proxy values are used
439    /// (30 days for months, 365 days for years) to estimate their respective durations,
440    /// since months and years have variable lengths.
441    ///
442    /// # Panics
443    ///
444    /// Panics if the aggregation method is not time-based, or if `step` is too
445    /// large for the interval arithmetic.
446    #[must_use]
447    pub fn timedelta(&self) -> TimeDelta {
448        let step = step_to_i64(self.step);
449
450        match self.aggregation {
451            BarAggregation::Millisecond => Duration::milliseconds(step),
452            BarAggregation::Second => Duration::seconds(step),
453            BarAggregation::Minute => Duration::minutes(step),
454            BarAggregation::Hour => Duration::hours(step),
455            BarAggregation::Day => Duration::days(step),
456            BarAggregation::Week => {
457                Duration::days(step.checked_mul(7).expect("`step` overflows i64 days"))
458            }
459            BarAggregation::Month => {
460                // Proxy for comparing bar lengths
461                Duration::days(step.checked_mul(30).expect("`step` overflows i64 days"))
462            }
463            BarAggregation::Year => {
464                // Proxy for comparing bar lengths
465                Duration::days(step.checked_mul(365).expect("`step` overflows i64 days"))
466            }
467            _ => panic!(
468                "Timedelta not supported for aggregation type: {:?}",
469                self.aggregation
470            ),
471        }
472    }
473
474    /// Return a value indicating whether the aggregation method is time-driven:
475    ///  - [`BarAggregation::Millisecond`]
476    ///  - [`BarAggregation::Second`]
477    ///  - [`BarAggregation::Minute`]
478    ///  - [`BarAggregation::Hour`]
479    ///  - [`BarAggregation::Day`]
480    ///  - [`BarAggregation::Week`]
481    ///  - [`BarAggregation::Month`]
482    ///  - [`BarAggregation::Year`]
483    #[must_use]
484    pub fn is_time_aggregated(&self) -> bool {
485        matches!(
486            self.aggregation,
487            BarAggregation::Millisecond
488                | BarAggregation::Second
489                | BarAggregation::Minute
490                | BarAggregation::Hour
491                | BarAggregation::Day
492                | BarAggregation::Week
493                | BarAggregation::Month
494                | BarAggregation::Year
495        )
496    }
497
498    /// Return a value indicating whether the aggregation method is threshold-driven:
499    ///  - [`BarAggregation::Tick`]
500    ///  - [`BarAggregation::TickImbalance`]
501    ///  - [`BarAggregation::Volume`]
502    ///  - [`BarAggregation::VolumeImbalance`]
503    ///  - [`BarAggregation::Value`]
504    ///  - [`BarAggregation::ValueImbalance`]
505    #[must_use]
506    pub fn is_threshold_aggregated(&self) -> bool {
507        matches!(
508            self.aggregation,
509            BarAggregation::Tick
510                | BarAggregation::TickImbalance
511                | BarAggregation::Volume
512                | BarAggregation::VolumeImbalance
513                | BarAggregation::Value
514                | BarAggregation::ValueImbalance
515        )
516    }
517
518    /// Return a value indicating whether the aggregation method is information-driven:
519    ///  - [`BarAggregation::TickRuns`]
520    ///  - [`BarAggregation::VolumeRuns`]
521    ///  - [`BarAggregation::ValueRuns`]
522    #[must_use]
523    pub fn is_information_aggregated(&self) -> bool {
524        matches!(
525            self.aggregation,
526            BarAggregation::TickRuns | BarAggregation::VolumeRuns | BarAggregation::ValueRuns
527        )
528    }
529}
530
531impl Display for BarSpecification {
532    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
533        write!(f, "{}-{}-{}", self.step, self.aggregation, self.price_type)
534    }
535}
536
537/// Represents a bar type including the instrument ID, bar specification and
538/// aggregation source.
539#[repr(C)]
540#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
541#[cfg_attr(
542    feature = "python",
543    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
544)]
545#[cfg_attr(
546    feature = "python",
547    pyo3_stub_gen::derive::gen_stub_pyclass_enum(module = "nautilus_trader.model")
548)]
549pub enum BarType {
550    Standard {
551        /// The bar type's instrument ID.
552        instrument_id: InstrumentId,
553        /// The bar type's specification.
554        spec: BarSpecification,
555        /// The bar type's aggregation source.
556        aggregation_source: AggregationSource,
557    },
558    Composite {
559        /// The bar type's instrument ID.
560        instrument_id: InstrumentId,
561        /// The bar type's specification.
562        spec: BarSpecification,
563        /// The bar type's aggregation source.
564        aggregation_source: AggregationSource,
565
566        /// The composite step for binning samples for bar aggregation.
567        composite_step: usize,
568        /// The composite type of bar aggregation.
569        composite_aggregation: BarAggregation,
570        /// The composite bar type's aggregation source.
571        composite_aggregation_source: AggregationSource,
572    },
573}
574
575impl BarType {
576    /// Creates a new [`BarType`] instance.
577    #[must_use]
578    pub fn new(
579        instrument_id: InstrumentId,
580        spec: BarSpecification,
581        aggregation_source: AggregationSource,
582    ) -> Self {
583        Self::Standard {
584            instrument_id,
585            spec,
586            aggregation_source,
587        }
588    }
589
590    /// Creates a new composite [`BarType`] instance.
591    #[must_use]
592    pub fn new_composite(
593        instrument_id: InstrumentId,
594        spec: BarSpecification,
595        aggregation_source: AggregationSource,
596
597        composite_step: usize,
598        composite_aggregation: BarAggregation,
599        composite_aggregation_source: AggregationSource,
600    ) -> Self {
601        Self::Composite {
602            instrument_id,
603            spec,
604            aggregation_source,
605
606            composite_step,
607            composite_aggregation,
608            composite_aggregation_source,
609        }
610    }
611
612    /// Returns whether this instance is a standard bar type.
613    #[must_use]
614    pub fn is_standard(&self) -> bool {
615        match &self {
616            Self::Standard { .. } => true,
617            Self::Composite { .. } => false,
618        }
619    }
620
621    /// Returns whether this instance is a composite bar type.
622    #[must_use]
623    pub fn is_composite(&self) -> bool {
624        match &self {
625            Self::Standard { .. } => false,
626            Self::Composite { .. } => true,
627        }
628    }
629
630    /// Returns whether the bar aggregation source is `EXTERNAL`.
631    #[must_use]
632    pub fn is_externally_aggregated(&self) -> bool {
633        self.aggregation_source() == AggregationSource::External
634    }
635
636    /// Returns whether the bar aggregation source is `INTERNAL`.
637    #[must_use]
638    pub fn is_internally_aggregated(&self) -> bool {
639        self.aggregation_source() == AggregationSource::Internal
640    }
641
642    /// Returns the standard bar type component.
643    #[must_use]
644    pub fn standard(&self) -> Self {
645        match self {
646            &b @ Self::Standard { .. } => b,
647            Self::Composite {
648                instrument_id,
649                spec,
650                aggregation_source,
651                ..
652            } => Self::new(*instrument_id, *spec, *aggregation_source),
653        }
654    }
655
656    /// Returns any composite bar type component.
657    #[must_use]
658    pub fn composite(&self) -> Self {
659        match self {
660            &b @ Self::Standard { .. } => b, // case shouldn't be used if is_composite is called before
661            Self::Composite {
662                instrument_id,
663                spec,
664                aggregation_source: _,
665
666                composite_step,
667                composite_aggregation,
668                composite_aggregation_source,
669            } => Self::new(
670                *instrument_id,
671                BarSpecification::new(*composite_step, *composite_aggregation, spec.price_type),
672                *composite_aggregation_source,
673            ),
674        }
675    }
676
677    /// Returns the [`InstrumentId`] for this bar type.
678    #[must_use]
679    pub fn instrument_id(&self) -> InstrumentId {
680        match &self {
681            Self::Standard { instrument_id, .. } | Self::Composite { instrument_id, .. } => {
682                *instrument_id
683            }
684        }
685    }
686
687    /// Returns the [`BarSpecification`] for this bar type.
688    #[must_use]
689    pub fn spec(&self) -> BarSpecification {
690        match &self {
691            Self::Standard { spec, .. } | Self::Composite { spec, .. } => *spec,
692        }
693    }
694
695    /// Returns the [`AggregationSource`] for this bar type.
696    #[must_use]
697    pub fn aggregation_source(&self) -> AggregationSource {
698        match &self {
699            Self::Standard {
700                aggregation_source, ..
701            }
702            | Self::Composite {
703                aggregation_source, ..
704            } => *aggregation_source,
705        }
706    }
707
708    /// Returns the instrument ID and bar specification as a tuple key.
709    ///
710    /// Useful as a hashmap key when aggregation source should be ignored,
711    /// such as for indicator registration where INTERNAL and EXTERNAL bars
712    /// should trigger the same indicators.
713    #[must_use]
714    pub fn id_spec_key(&self) -> (InstrumentId, BarSpecification) {
715        (self.instrument_id(), self.spec())
716    }
717}
718
719#[derive(thiserror::Error, Debug)]
720#[error("Error parsing `BarType` from '{input}', invalid token: '{token}' at position {position}")]
721pub struct BarTypeParseError {
722    input: String,
723    token: String,
724    position: usize,
725}
726
727impl FromStr for BarType {
728    type Err = BarTypeParseError;
729
730    #[expect(clippy::needless_collect)] // Collect needed for .rev() and indexing
731    fn from_str(s: &str) -> Result<Self, Self::Err> {
732        let parts: Vec<&str> = s.split('@').collect();
733        if parts.len() > 2 {
734            return Err(BarTypeParseError {
735                input: s.to_string(),
736                token: parts[2].to_string(),
737                position: 5,
738            });
739        }
740        let standard = parts[0];
741        let composite_str = parts.get(1);
742
743        let pieces: Vec<&str> = standard.rsplitn(5, '-').collect();
744        let rev_pieces: Vec<&str> = pieces.into_iter().rev().collect();
745        if rev_pieces.len() != 5 {
746            return Err(BarTypeParseError {
747                input: s.to_string(),
748                token: String::new(),
749                position: 0,
750            });
751        }
752
753        let instrument_id =
754            InstrumentId::from_str(rev_pieces[0]).map_err(|_| BarTypeParseError {
755                input: s.to_string(),
756                token: rev_pieces[0].to_string(),
757                position: 0,
758            })?;
759
760        let step = rev_pieces[1].parse().map_err(|_| BarTypeParseError {
761            input: s.to_string(),
762            token: rev_pieces[1].to_string(),
763            position: 1,
764        })?;
765        let aggregation =
766            BarAggregation::from_str(rev_pieces[2]).map_err(|_| BarTypeParseError {
767                input: s.to_string(),
768                token: rev_pieces[2].to_string(),
769                position: 2,
770            })?;
771        let price_type = PriceType::from_str(rev_pieces[3]).map_err(|_| BarTypeParseError {
772            input: s.to_string(),
773            token: rev_pieces[3].to_string(),
774            position: 3,
775        })?;
776        let aggregation_source =
777            AggregationSource::from_str(rev_pieces[4]).map_err(|_| BarTypeParseError {
778                input: s.to_string(),
779                token: rev_pieces[4].to_string(),
780                position: 4,
781            })?;
782        let spec = BarSpecification::new_checked(step, aggregation, price_type).map_err(|_| {
783            BarTypeParseError {
784                input: s.to_string(),
785                token: rev_pieces[1].to_string(),
786                position: 1,
787            }
788        })?;
789
790        if let Some(composite_str) = composite_str {
791            let composite_pieces: Vec<&str> = composite_str.rsplitn(3, '-').collect();
792            let rev_composite_pieces: Vec<&str> = composite_pieces.into_iter().rev().collect();
793            if rev_composite_pieces.len() != 3 {
794                return Err(BarTypeParseError {
795                    input: s.to_string(),
796                    token: String::new(),
797                    position: 5,
798                });
799            }
800
801            let composite_step =
802                rev_composite_pieces[0]
803                    .parse()
804                    .map_err(|_| BarTypeParseError {
805                        input: s.to_string(),
806                        token: rev_composite_pieces[0].to_string(),
807                        position: 5,
808                    })?;
809            let composite_aggregation =
810                BarAggregation::from_str(rev_composite_pieces[1]).map_err(|_| {
811                    BarTypeParseError {
812                        input: s.to_string(),
813                        token: rev_composite_pieces[1].to_string(),
814                        position: 6,
815                    }
816                })?;
817            let composite_aggregation_source = AggregationSource::from_str(rev_composite_pieces[2])
818                .map_err(|_| BarTypeParseError {
819                    input: s.to_string(),
820                    token: rev_composite_pieces[2].to_string(),
821                    position: 7,
822                })?;
823            BarSpecification::new_checked(composite_step, composite_aggregation, price_type)
824                .map_err(|_| BarTypeParseError {
825                    input: s.to_string(),
826                    token: rev_composite_pieces[0].to_string(),
827                    position: 5,
828                })?;
829
830            Ok(Self::new_composite(
831                instrument_id,
832                spec,
833                aggregation_source,
834                composite_step,
835                composite_aggregation,
836                composite_aggregation_source,
837            ))
838        } else {
839            Ok(Self::Standard {
840                instrument_id,
841                spec,
842                aggregation_source,
843            })
844        }
845    }
846}
847
848impl<T: AsRef<str>> From<T> for BarType {
849    fn from(value: T) -> Self {
850        Self::from_str(value.as_ref()).expect(FAILED)
851    }
852}
853
854impl Display for BarType {
855    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
856        match &self {
857            Self::Standard {
858                instrument_id,
859                spec,
860                aggregation_source,
861            } => {
862                write!(f, "{instrument_id}-{spec}-{aggregation_source}")
863            }
864            Self::Composite {
865                instrument_id,
866                spec,
867                aggregation_source,
868
869                composite_step,
870                composite_aggregation,
871                composite_aggregation_source,
872            } => {
873                write!(
874                    f,
875                    "{}-{}-{}@{}-{}-{}",
876                    instrument_id,
877                    spec,
878                    aggregation_source,
879                    *composite_step,
880                    *composite_aggregation,
881                    *composite_aggregation_source
882                )
883            }
884        }
885    }
886}
887
888impl Serialize for BarType {
889    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
890    where
891        S: Serializer,
892    {
893        serializer.serialize_str(&self.to_string())
894    }
895}
896
897impl<'de> Deserialize<'de> for BarType {
898    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
899    where
900        D: Deserializer<'de>,
901    {
902        let s: std::borrow::Cow<'de, str> = Deserialize::deserialize(deserializer)?;
903        Self::from_str(s.as_ref()).map_err(serde::de::Error::custom)
904    }
905}
906
907/// Represents an aggregated bar.
908#[repr(C)]
909#[derive(Clone, Copy, Hash, PartialEq, Eq, Debug, Serialize, Deserialize)]
910#[serde(tag = "type")]
911#[cfg_attr(
912    feature = "python",
913    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
914)]
915#[cfg_attr(
916    feature = "python",
917    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
918)]
919pub struct Bar {
920    /// The bar type for this bar.
921    pub bar_type: BarType,
922    /// The bars open price.
923    pub open: Price,
924    /// The bars high price.
925    pub high: Price,
926    /// The bars low price.
927    pub low: Price,
928    /// The bars close price.
929    pub close: Price,
930    /// The bars volume.
931    pub volume: Quantity,
932    /// UNIX timestamp (nanoseconds) when the data event occurred.
933    pub ts_event: UnixNanos,
934    /// UNIX timestamp (nanoseconds) when the instance was created.
935    pub ts_init: UnixNanos,
936}
937
938impl Bar {
939    /// Creates a new [`Bar`] instance with correctness checking.
940    ///
941    /// # Errors
942    ///
943    /// Returns an error if:
944    /// - `high` is not >= `low`.
945    /// - `high` is not >= `close`.
946    /// - `low` is not <= `close`.
947    ///
948    /// # Notes
949    ///
950    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
951    #[expect(clippy::too_many_arguments)]
952    pub fn new_checked(
953        bar_type: BarType,
954        open: Price,
955        high: Price,
956        low: Price,
957        close: Price,
958        volume: Quantity,
959        ts_event: UnixNanos,
960        ts_init: UnixNanos,
961    ) -> anyhow::Result<Self> {
962        check_predicate_true(high >= open, "high >= open")?;
963        check_predicate_true(high >= low, "high >= low")?;
964        check_predicate_true(high >= close, "high >= close")?;
965        check_predicate_true(low <= close, "low <= close")?;
966        check_predicate_true(low <= open, "low <= open")?;
967
968        Ok(Self {
969            bar_type,
970            open,
971            high,
972            low,
973            close,
974            volume,
975            ts_event,
976            ts_init,
977        })
978    }
979
980    /// Creates a new [`Bar`] instance.
981    ///
982    /// # Panics
983    ///
984    /// This function panics if:
985    /// - `high` is not >= `low`.
986    /// - `high` is not >= `close`.
987    /// - `low` is not <= `close`.
988    #[expect(clippy::too_many_arguments)]
989    #[must_use]
990    pub fn new(
991        bar_type: BarType,
992        open: Price,
993        high: Price,
994        low: Price,
995        close: Price,
996        volume: Quantity,
997        ts_event: UnixNanos,
998        ts_init: UnixNanos,
999    ) -> Self {
1000        Self::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
1001            .expect(FAILED)
1002    }
1003
1004    #[must_use]
1005    pub fn instrument_id(&self) -> InstrumentId {
1006        self.bar_type.instrument_id()
1007    }
1008
1009    /// Returns the metadata for the type, for use with serialization formats.
1010    #[must_use]
1011    pub fn get_metadata(
1012        bar_type: &BarType,
1013        price_precision: u8,
1014        size_precision: u8,
1015    ) -> HashMap<String, String> {
1016        let mut metadata = HashMap::new();
1017        let instrument_id = bar_type.instrument_id();
1018        metadata.insert("bar_type".to_string(), bar_type.to_string());
1019        metadata.insert("instrument_id".to_string(), instrument_id.to_string());
1020        metadata.insert("price_precision".to_string(), price_precision.to_string());
1021        metadata.insert("size_precision".to_string(), size_precision.to_string());
1022        metadata
1023    }
1024
1025    /// Returns the field map for the type, for use with Arrow schemas.
1026    #[must_use]
1027    pub fn get_fields() -> IndexMap<String, String> {
1028        let mut metadata = IndexMap::new();
1029        metadata.insert("open".to_string(), FIXED_SIZE_BINARY.to_string());
1030        metadata.insert("high".to_string(), FIXED_SIZE_BINARY.to_string());
1031        metadata.insert("low".to_string(), FIXED_SIZE_BINARY.to_string());
1032        metadata.insert("close".to_string(), FIXED_SIZE_BINARY.to_string());
1033        metadata.insert("volume".to_string(), FIXED_SIZE_BINARY.to_string());
1034        metadata.insert("ts_event".to_string(), "UInt64".to_string());
1035        metadata.insert("ts_init".to_string(), "UInt64".to_string());
1036        metadata
1037    }
1038}
1039
1040impl Display for Bar {
1041    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1042        write!(
1043            f,
1044            "{},{},{},{},{},{},{}",
1045            self.bar_type, self.open, self.high, self.low, self.close, self.volume, self.ts_event
1046        )
1047    }
1048}
1049
1050impl Serializable for Bar {}
1051
1052impl HasTsInit for Bar {
1053    fn ts_init(&self) -> UnixNanos {
1054        self.ts_init
1055    }
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060    use std::str::FromStr;
1061
1062    use chrono::TimeZone;
1063    use nautilus_core::serialization::msgpack::{FromMsgPack, ToMsgPack};
1064    use rstest::rstest;
1065
1066    use super::*;
1067    use crate::identifiers::{Symbol, Venue};
1068
1069    #[rstest]
1070    fn test_bar_specification_new_invalid() {
1071        let result = BarSpecification::new_checked(0, BarAggregation::Tick, PriceType::Last);
1072        assert!(
1073            result
1074                .unwrap_err()
1075                .to_string()
1076                .contains("Invalid step: 0 (must be non-zero)")
1077        );
1078    }
1079
1080    #[rstest]
1081    #[should_panic(expected = "Invalid step: 0 (must be non-zero)")]
1082    fn test_bar_specification_new_checked_with_invalid_step_panics() {
1083        let aggregation = BarAggregation::Tick;
1084        let price_type = PriceType::Last;
1085
1086        let _ = BarSpecification::new(0, aggregation, price_type);
1087    }
1088
1089    #[rstest]
1090    #[should_panic(expected = "Invalid step in bar_type.spec.step: 7")]
1091    fn test_bar_specification_new_with_invalid_periodic_step_panics() {
1092        let _ = BarSpecification::new(7, BarAggregation::Minute, PriceType::Last);
1093    }
1094
1095    #[rstest]
1096    #[case(
1097        BarAggregation::Millisecond,
1098        12,
1099        "Invalid step in bar_type.spec.step: 12 for aggregation=10. step must evenly divide 1000"
1100    )]
1101    #[case(
1102        BarAggregation::Millisecond,
1103        1000,
1104        "Invalid step in bar_type.spec.step: 1000 for aggregation=10. step must not be 1000"
1105    )]
1106    #[case(
1107        BarAggregation::Second,
1108        50,
1109        "Invalid step in bar_type.spec.step: 50 for aggregation=11. step must evenly divide 60"
1110    )]
1111    #[case(
1112        BarAggregation::Second,
1113        60,
1114        "Invalid step in bar_type.spec.step: 60 for aggregation=11. step must not be 60"
1115    )]
1116    #[case(
1117        BarAggregation::Minute,
1118        40,
1119        "Invalid step in bar_type.spec.step: 40 for aggregation=12. step must evenly divide 60"
1120    )]
1121    #[case(
1122        BarAggregation::Minute,
1123        60,
1124        "Invalid step in bar_type.spec.step: 60 for aggregation=12. step must not be 60"
1125    )]
1126    #[case(
1127        BarAggregation::Hour,
1128        5,
1129        "Invalid step in bar_type.spec.step: 5 for aggregation=13. step must evenly divide 24"
1130    )]
1131    #[case(
1132        BarAggregation::Hour,
1133        13,
1134        "Invalid step in bar_type.spec.step: 13 for aggregation=13. step must evenly divide 24"
1135    )]
1136    #[case(
1137        BarAggregation::Hour,
1138        24,
1139        "Invalid step in bar_type.spec.step: 24 for aggregation=13. step must not be 24"
1140    )]
1141    #[case(
1142        BarAggregation::Month,
1143        5,
1144        "Invalid step in bar_type.spec.step: 5 for aggregation=16. step must evenly divide 12"
1145    )]
1146    #[case(
1147        BarAggregation::Month,
1148        12,
1149        "Invalid step in bar_type.spec.step: 12 for aggregation=16. step must not be 12"
1150    )]
1151    fn test_bar_specification_new_checked_invalid_periodic_step(
1152        #[case] aggregation: BarAggregation,
1153        #[case] step: usize,
1154        #[case] expected: &str,
1155    ) {
1156        let result = BarSpecification::new_checked(step, aggregation, PriceType::Last);
1157
1158        assert!(result.unwrap_err().to_string().starts_with(expected));
1159    }
1160
1161    #[rstest]
1162    #[case(BarAggregation::Day)]
1163    #[case(BarAggregation::Week)]
1164    #[case(BarAggregation::Year)]
1165    #[case(BarAggregation::Tick)]
1166    #[case(BarAggregation::TickImbalance)]
1167    #[case(BarAggregation::TickRuns)]
1168    #[case(BarAggregation::Volume)]
1169    #[case(BarAggregation::VolumeImbalance)]
1170    #[case(BarAggregation::VolumeRuns)]
1171    #[case(BarAggregation::Value)]
1172    #[case(BarAggregation::ValueImbalance)]
1173    #[case(BarAggregation::ValueRuns)]
1174    #[case(BarAggregation::Renko)]
1175    fn test_bar_specification_new_checked_allows_non_periodic_steps(
1176        #[case] aggregation: BarAggregation,
1177    ) {
1178        let result = BarSpecification::new_checked(7, aggregation, PriceType::Last);
1179
1180        assert!(result.is_ok());
1181    }
1182
1183    #[rstest]
1184    #[case(BarAggregation::Millisecond, 1, TimeDelta::milliseconds(1))]
1185    #[case(BarAggregation::Millisecond, 10, TimeDelta::milliseconds(10))]
1186    #[case(BarAggregation::Second, 1, TimeDelta::seconds(1))]
1187    #[case(BarAggregation::Second, 15, TimeDelta::seconds(15))]
1188    #[case(BarAggregation::Minute, 1, TimeDelta::minutes(1))]
1189    #[case(BarAggregation::Minute, 30, TimeDelta::minutes(30))]
1190    #[case(BarAggregation::Hour, 1, TimeDelta::hours(1))]
1191    #[case(BarAggregation::Hour, 4, TimeDelta::hours(4))]
1192    #[case(BarAggregation::Day, 1, TimeDelta::days(1))]
1193    #[case(BarAggregation::Day, 2, TimeDelta::days(2))]
1194    #[case(BarAggregation::Week, 1, TimeDelta::days(7))]
1195    #[case(BarAggregation::Week, 2, TimeDelta::days(14))]
1196    #[case(BarAggregation::Month, 1, TimeDelta::days(30))]
1197    #[case(BarAggregation::Month, 3, TimeDelta::days(90))]
1198    #[case(BarAggregation::Year, 1, TimeDelta::days(365))]
1199    #[case(BarAggregation::Year, 2, TimeDelta::days(730))]
1200    #[should_panic(expected = "Aggregation not time based")]
1201    #[case(BarAggregation::Tick, 1, TimeDelta::zero())]
1202    fn test_get_bar_interval(
1203        #[case] aggregation: BarAggregation,
1204        #[case] step: usize,
1205        #[case] expected: TimeDelta,
1206    ) {
1207        let bar_type = BarType::Standard {
1208            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
1209            spec: BarSpecification::new(step, aggregation, PriceType::Last),
1210            aggregation_source: AggregationSource::Internal,
1211        };
1212
1213        let interval = get_bar_interval(&bar_type);
1214        assert_eq!(interval, expected);
1215    }
1216
1217    #[rstest]
1218    #[case(BarAggregation::Millisecond, 1, UnixNanos::from(1_000_000))]
1219    #[case(BarAggregation::Millisecond, 10, UnixNanos::from(10_000_000))]
1220    #[case(BarAggregation::Second, 1, UnixNanos::from(1_000_000_000))]
1221    #[case(BarAggregation::Second, 10, UnixNanos::from(10_000_000_000))]
1222    #[case(BarAggregation::Minute, 1, UnixNanos::from(60_000_000_000))]
1223    #[case(BarAggregation::Minute, 30, UnixNanos::from(1_800_000_000_000))]
1224    #[case(BarAggregation::Hour, 1, UnixNanos::from(3_600_000_000_000))]
1225    #[case(BarAggregation::Hour, 4, UnixNanos::from(14_400_000_000_000))]
1226    #[case(BarAggregation::Day, 1, UnixNanos::from(86_400_000_000_000))]
1227    #[case(BarAggregation::Day, 2, UnixNanos::from(172_800_000_000_000))]
1228    #[case(BarAggregation::Week, 1, UnixNanos::from(604_800_000_000_000))]
1229    #[case(BarAggregation::Week, 2, UnixNanos::from(1_209_600_000_000_000))]
1230    #[case(BarAggregation::Month, 1, UnixNanos::from(2_592_000_000_000_000))]
1231    #[case(BarAggregation::Month, 3, UnixNanos::from(7_776_000_000_000_000))]
1232    #[case(BarAggregation::Year, 1, UnixNanos::from(31_536_000_000_000_000))]
1233    #[case(BarAggregation::Year, 2, UnixNanos::from(63_072_000_000_000_000))]
1234    #[should_panic(expected = "Aggregation not time based")]
1235    #[case(BarAggregation::Tick, 1, UnixNanos::from(0))]
1236    fn test_get_bar_interval_ns(
1237        #[case] aggregation: BarAggregation,
1238        #[case] step: usize,
1239        #[case] expected: UnixNanos,
1240    ) {
1241        let bar_type = BarType::Standard {
1242            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
1243            spec: BarSpecification::new(step, aggregation, PriceType::Last),
1244            aggregation_source: AggregationSource::Internal,
1245        };
1246
1247        let interval_ns = get_bar_interval_ns(&bar_type);
1248        assert_eq!(interval_ns, expected);
1249    }
1250
1251    fn bar_type_with_raw_step(step: usize, aggregation: BarAggregation) -> BarType {
1252        // Bypasses `BarSpecification::new_checked` to exercise the conversion guards
1253        let spec = BarSpecification {
1254            step: NonZeroUsize::new(step).unwrap(),
1255            aggregation,
1256            price_type: PriceType::Last,
1257        };
1258        BarType::new(
1259            InstrumentId::from("BTCUSDT-PERP.BINANCE"),
1260            spec,
1261            AggregationSource::Internal,
1262        )
1263    }
1264
1265    #[rstest]
1266    #[should_panic(expected = "`step` exceeds i64 range")]
1267    fn test_get_bar_interval_step_exceeds_i64_panics() {
1268        let bar_type = bar_type_with_raw_step(usize::MAX, BarAggregation::Second);
1269        let _ = get_bar_interval(&bar_type);
1270    }
1271
1272    #[rstest]
1273    #[should_panic(expected = "`step` overflows i64 days")]
1274    fn test_get_bar_interval_week_step_overflow_panics() {
1275        let step = usize::try_from(i64::MAX).unwrap();
1276        let bar_type = bar_type_with_raw_step(step, BarAggregation::Week);
1277        let _ = get_bar_interval(&bar_type);
1278    }
1279
1280    #[rstest]
1281    #[should_panic(expected = "`step` overflows i64 days")]
1282    fn test_timedelta_year_step_overflow_panics() {
1283        let step = usize::try_from(i64::MAX).unwrap();
1284        let bar_type = bar_type_with_raw_step(step, BarAggregation::Year);
1285        let _ = bar_type.spec().timedelta();
1286    }
1287
1288    #[rstest]
1289    #[should_panic(expected = "`step` exceeds u32 range for month arithmetic")]
1290    fn test_get_time_bar_start_month_step_exceeds_u32_panics() {
1291        let bar_type = bar_type_with_raw_step(1_usize << 40, BarAggregation::Month);
1292        let now = Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap();
1293        let _ = get_time_bar_start(now, &bar_type, None);
1294    }
1295
1296    #[rstest]
1297    #[should_panic(expected = "`step` exceeds i32 range for year arithmetic")]
1298    fn test_get_time_bar_start_year_step_exceeds_i32_panics() {
1299        let bar_type = bar_type_with_raw_step(1_usize << 40, BarAggregation::Year);
1300        let now = Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap();
1301        let _ = get_time_bar_start(now, &bar_type, None);
1302    }
1303
1304    #[rstest]
1305    #[case::millisecond(
1306    Utc.timestamp_opt(1_658_349_296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
1307    BarAggregation::Millisecond,
1308    1,
1309    Utc.timestamp_opt(1_658_349_296, 123_000_000).unwrap(),  // 2024-07-21 12:34:56.123 UTC
1310    )]
1311    #[rstest]
1312    #[case::millisecond(
1313    Utc.timestamp_opt(1_658_349_296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
1314    BarAggregation::Millisecond,
1315    10,
1316    Utc.timestamp_opt(1_658_349_296, 120_000_000).unwrap(),  // 2024-07-21 12:34:56.120 UTC
1317    )]
1318    #[case::second(
1319    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1320    BarAggregation::Second,
1321    1,
1322    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap()
1323    )]
1324    #[case::second(
1325    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1326    BarAggregation::Second,
1327    5,
1328    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 55).unwrap()
1329    )]
1330    #[case::minute(
1331    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1332    BarAggregation::Minute,
1333    1,
1334    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 0).unwrap()
1335    )]
1336    #[case::minute(
1337    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1338    BarAggregation::Minute,
1339    5,
1340    Utc.with_ymd_and_hms(2024, 7, 21, 12, 30, 0).unwrap()
1341    )]
1342    #[case::hour(
1343    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1344    BarAggregation::Hour,
1345    1,
1346    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
1347    )]
1348    #[case::hour(
1349    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1350    BarAggregation::Hour,
1351    2,
1352    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
1353    )]
1354    #[case::day(
1355    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1356    BarAggregation::Day,
1357    1,
1358    Utc.with_ymd_and_hms(2024, 7, 21, 0, 0, 0).unwrap()
1359    )]
1360    fn test_get_time_bar_start(
1361        #[case] now: DateTime<Utc>,
1362        #[case] aggregation: BarAggregation,
1363        #[case] step: usize,
1364        #[case] expected: DateTime<Utc>,
1365    ) {
1366        let bar_type = BarType::Standard {
1367            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
1368            spec: BarSpecification::new(step, aggregation, PriceType::Last),
1369            aggregation_source: AggregationSource::Internal,
1370        };
1371
1372        let start_time = get_time_bar_start(now, &bar_type, None);
1373        assert_eq!(start_time, expected);
1374    }
1375
1376    #[rstest]
1377    fn test_bar_spec_string_reprs() {
1378        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1379        assert_eq!(bar_spec.to_string(), "1-MINUTE-BID");
1380        assert_eq!(format!("{bar_spec}"), "1-MINUTE-BID");
1381    }
1382
1383    #[rstest]
1384    fn test_bar_type_parse_valid() {
1385        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
1386        let bar_type = BarType::from(input);
1387
1388        assert_eq!(
1389            bar_type.instrument_id(),
1390            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1391        );
1392        assert_eq!(
1393            bar_type.spec(),
1394            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1395        );
1396        assert_eq!(bar_type.aggregation_source(), AggregationSource::External);
1397        assert_eq!(bar_type, BarType::from(input));
1398    }
1399
1400    #[rstest]
1401    #[case("BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL", true, false)]
1402    #[case("BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-INTERNAL", false, true)]
1403    #[case(
1404        "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL",
1405        false,
1406        true
1407    )]
1408    #[case(
1409        "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-EXTERNAL@1-MINUTE-INTERNAL",
1410        true,
1411        false
1412    )]
1413    fn test_bar_type_aggregation_source_predicates(
1414        #[case] input: &str,
1415        #[case] expected_external: bool,
1416        #[case] expected_internal: bool,
1417    ) {
1418        let bar_type = BarType::from(input);
1419        assert_eq!(bar_type.is_externally_aggregated(), expected_external);
1420        assert_eq!(bar_type.is_internally_aggregated(), expected_internal);
1421    }
1422
1423    #[rstest]
1424    fn test_bar_type_composite_aggregation_source_predicates_track_inner() {
1425        let bar_type =
1426            BarType::from("BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL");
1427
1428        assert!(bar_type.is_internally_aggregated());
1429        assert!(!bar_type.is_externally_aggregated());
1430
1431        let composite = bar_type.composite();
1432        assert!(composite.is_externally_aggregated());
1433        assert!(!composite.is_internally_aggregated());
1434    }
1435
1436    #[rstest]
1437    fn test_bar_type_from_str_with_utf8_symbol() {
1438        let non_ascii_instrument = "TËST-PÉRP.BINANCE";
1439        let non_ascii_bar_type = "TËST-PÉRP.BINANCE-1-MINUTE-LAST-EXTERNAL";
1440
1441        let bar_type = BarType::from_str(non_ascii_bar_type).unwrap();
1442
1443        assert_eq!(
1444            bar_type.instrument_id(),
1445            InstrumentId::from_str(non_ascii_instrument).unwrap()
1446        );
1447        assert_eq!(
1448            bar_type.spec(),
1449            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1450        );
1451        assert_eq!(bar_type.aggregation_source(), AggregationSource::External);
1452        assert_eq!(bar_type.to_string(), non_ascii_bar_type);
1453    }
1454
1455    #[rstest]
1456    fn test_bar_type_composite_parse_valid() {
1457        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL";
1458        let bar_type = BarType::from(input);
1459        let standard = bar_type.standard();
1460
1461        assert_eq!(
1462            bar_type.instrument_id(),
1463            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1464        );
1465        assert_eq!(
1466            bar_type.spec(),
1467            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
1468        );
1469        assert_eq!(bar_type.aggregation_source(), AggregationSource::Internal);
1470        assert_eq!(bar_type, BarType::from(input));
1471        assert!(bar_type.is_composite());
1472
1473        assert_eq!(
1474            standard.instrument_id(),
1475            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1476        );
1477        assert_eq!(
1478            standard.spec(),
1479            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
1480        );
1481        assert_eq!(standard.aggregation_source(), AggregationSource::Internal);
1482        assert!(standard.is_standard());
1483
1484        let composite = bar_type.composite();
1485        let composite_input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
1486
1487        assert_eq!(
1488            composite.instrument_id(),
1489            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1490        );
1491        assert_eq!(
1492            composite.spec(),
1493            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last,)
1494        );
1495        assert_eq!(composite.aggregation_source(), AggregationSource::External);
1496        assert_eq!(composite, BarType::from(composite_input));
1497        assert!(composite.is_standard());
1498    }
1499
1500    #[rstest]
1501    fn test_bar_type_parse_invalid_token_pos_0() {
1502        let input = "BTCUSDT-PERP-1-MINUTE-LAST-INTERNAL";
1503        let result = BarType::from_str(input);
1504
1505        assert_eq!(
1506            result.unwrap_err().to_string(),
1507            format!(
1508                "Error parsing `BarType` from '{input}', invalid token: 'BTCUSDT-PERP' at position 0"
1509            )
1510        );
1511    }
1512
1513    #[rstest]
1514    fn test_bar_type_parse_invalid_token_pos_1() {
1515        let input = "BTCUSDT-PERP.BINANCE-INVALID-MINUTE-LAST-INTERNAL";
1516        let result = BarType::from_str(input);
1517
1518        assert_eq!(
1519            result.unwrap_err().to_string(),
1520            format!(
1521                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 1"
1522            )
1523        );
1524    }
1525
1526    #[rstest]
1527    fn test_bar_type_parse_invalid_spec_step() {
1528        let input = "BTCUSDT-PERP.BINANCE-60-MINUTE-LAST-INTERNAL";
1529        let result = BarType::from_str(input);
1530
1531        assert_eq!(
1532            result.unwrap_err().to_string(),
1533            format!("Error parsing `BarType` from '{input}', invalid token: '60' at position 1")
1534        );
1535    }
1536
1537    #[rstest]
1538    fn test_bar_type_parse_invalid_token_pos_2() {
1539        let input = "BTCUSDT-PERP.BINANCE-1-INVALID-LAST-INTERNAL";
1540        let result = BarType::from_str(input);
1541
1542        assert_eq!(
1543            result.unwrap_err().to_string(),
1544            format!(
1545                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 2"
1546            )
1547        );
1548    }
1549
1550    #[rstest]
1551    fn test_bar_type_parse_invalid_token_pos_3() {
1552        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-INVALID-INTERNAL";
1553        let result = BarType::from_str(input);
1554
1555        assert_eq!(
1556            result.unwrap_err().to_string(),
1557            format!(
1558                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 3"
1559            )
1560        );
1561    }
1562
1563    #[rstest]
1564    fn test_bar_type_parse_invalid_token_pos_4() {
1565        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-BID-INVALID";
1566        let result = BarType::from_str(input);
1567
1568        assert!(result.is_err());
1569        assert_eq!(
1570            result.unwrap_err().to_string(),
1571            format!(
1572                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 4"
1573            )
1574        );
1575    }
1576
1577    #[rstest]
1578    fn test_bar_type_parse_invalid_token_pos_5() {
1579        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@INVALID-MINUTE-EXTERNAL";
1580        let result = BarType::from_str(input);
1581
1582        assert!(result.is_err());
1583        assert_eq!(
1584            result.unwrap_err().to_string(),
1585            format!(
1586                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 5"
1587            )
1588        );
1589    }
1590
1591    #[rstest]
1592    fn test_bar_type_parse_invalid_composite_spec_step() {
1593        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@60-MINUTE-EXTERNAL";
1594        let result = BarType::from_str(input);
1595
1596        assert!(result.is_err());
1597        assert_eq!(
1598            result.unwrap_err().to_string(),
1599            format!("Error parsing `BarType` from '{input}', invalid token: '60' at position 5")
1600        );
1601    }
1602
1603    #[rstest]
1604    fn test_bar_type_parse_invalid_token_pos_6() {
1605        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-INVALID-EXTERNAL";
1606        let result = BarType::from_str(input);
1607
1608        assert!(result.is_err());
1609        assert_eq!(
1610            result.unwrap_err().to_string(),
1611            format!(
1612                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 6"
1613            )
1614        );
1615    }
1616
1617    #[rstest]
1618    fn test_bar_type_parse_invalid_token_pos_7() {
1619        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-INVALID";
1620        let result = BarType::from_str(input);
1621
1622        assert!(result.is_err());
1623        assert_eq!(
1624            result.unwrap_err().to_string(),
1625            format!(
1626                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 7"
1627            )
1628        );
1629    }
1630
1631    #[rstest]
1632    fn test_bar_type_parse_rejects_extra_composite_segment() {
1633        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL@1-HOUR-EXTERNAL";
1634        let result = BarType::from_str(input);
1635
1636        assert_eq!(
1637            result.unwrap_err().to_string(),
1638            format!(
1639                "Error parsing `BarType` from '{input}', invalid token: '1-HOUR-EXTERNAL' at position 5"
1640            )
1641        );
1642    }
1643
1644    #[rstest]
1645    fn test_bar_type_equality() {
1646        let instrument_id1 = InstrumentId {
1647            symbol: Symbol::new("AUD/USD"),
1648            venue: Venue::new("SIM"),
1649        };
1650        let instrument_id2 = InstrumentId {
1651            symbol: Symbol::new("GBP/USD"),
1652            venue: Venue::new("SIM"),
1653        };
1654        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1655        let bar_type1 = BarType::Standard {
1656            instrument_id: instrument_id1,
1657            spec: bar_spec,
1658            aggregation_source: AggregationSource::External,
1659        };
1660        let bar_type2 = BarType::Standard {
1661            instrument_id: instrument_id1,
1662            spec: bar_spec,
1663            aggregation_source: AggregationSource::External,
1664        };
1665        let bar_type3 = BarType::Standard {
1666            instrument_id: instrument_id2,
1667            spec: bar_spec,
1668            aggregation_source: AggregationSource::External,
1669        };
1670        assert_eq!(bar_type1, bar_type1);
1671        assert_eq!(bar_type1, bar_type2);
1672        assert_ne!(bar_type1, bar_type3);
1673    }
1674
1675    #[rstest]
1676    fn test_bar_type_id_spec_key_ignores_aggregation_source() {
1677        let bar_type_external = BarType::from_str("ESM4.XCME-1-MINUTE-LAST-EXTERNAL").unwrap();
1678        let bar_type_internal = BarType::from_str("ESM4.XCME-1-MINUTE-LAST-INTERNAL").unwrap();
1679
1680        // Full equality should differ
1681        assert_ne!(bar_type_external, bar_type_internal);
1682
1683        // id_spec_key should be the same
1684        assert_eq!(
1685            bar_type_external.id_spec_key(),
1686            bar_type_internal.id_spec_key()
1687        );
1688
1689        // Verify key components
1690        let (instrument_id, spec) = bar_type_external.id_spec_key();
1691        assert_eq!(instrument_id, bar_type_external.instrument_id());
1692        assert_eq!(spec, bar_type_external.spec());
1693    }
1694
1695    #[rstest]
1696    fn test_bar_type_comparison() {
1697        let instrument_id1 = InstrumentId {
1698            symbol: Symbol::new("AUD/USD"),
1699            venue: Venue::new("SIM"),
1700        };
1701
1702        let instrument_id2 = InstrumentId {
1703            symbol: Symbol::new("GBP/USD"),
1704            venue: Venue::new("SIM"),
1705        };
1706        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1707        let bar_spec2 = BarSpecification::new(2, BarAggregation::Minute, PriceType::Bid);
1708        let bar_type1 = BarType::Standard {
1709            instrument_id: instrument_id1,
1710            spec: bar_spec,
1711            aggregation_source: AggregationSource::External,
1712        };
1713        let bar_type2 = BarType::Standard {
1714            instrument_id: instrument_id1,
1715            spec: bar_spec,
1716            aggregation_source: AggregationSource::External,
1717        };
1718        let bar_type3 = BarType::Standard {
1719            instrument_id: instrument_id2,
1720            spec: bar_spec,
1721            aggregation_source: AggregationSource::External,
1722        };
1723        let bar_type4 = BarType::Composite {
1724            instrument_id: instrument_id2,
1725            spec: bar_spec2,
1726            aggregation_source: AggregationSource::Internal,
1727
1728            composite_step: 1,
1729            composite_aggregation: BarAggregation::Minute,
1730            composite_aggregation_source: AggregationSource::External,
1731        };
1732
1733        assert!(bar_type1 <= bar_type2);
1734        assert!(bar_type1 < bar_type3);
1735        assert!(bar_type3 > bar_type1);
1736        assert!(bar_type3 >= bar_type1);
1737        assert!(bar_type4 >= bar_type1);
1738    }
1739
1740    #[rstest]
1741    fn test_bar_new() {
1742        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1743        let open = Price::from("100.0");
1744        let high = Price::from("105.0");
1745        let low = Price::from("95.0");
1746        let close = Price::from("102.0");
1747        let volume = Quantity::from("1000");
1748        let ts_event = UnixNanos::from(1_000_000);
1749        let ts_init = UnixNanos::from(2_000_000);
1750
1751        let bar = Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init);
1752
1753        assert_eq!(bar.bar_type, bar_type);
1754        assert_eq!(bar.open, open);
1755        assert_eq!(bar.high, high);
1756        assert_eq!(bar.low, low);
1757        assert_eq!(bar.close, close);
1758        assert_eq!(bar.volume, volume);
1759        assert_eq!(bar.ts_event, ts_event);
1760        assert_eq!(bar.ts_init, ts_init);
1761    }
1762
1763    #[rstest]
1764    #[case("100.0", "90.0", "95.0", "92.0", "high >= open")]
1765    #[case("100.0", "105.0", "110.0", "102.0", "high >= low")]
1766    #[case("100.0", "105.0", "95.0", "110.0", "high >= close")]
1767    #[case("100.0", "105.0", "95.0", "90.0", "low <= close")]
1768    #[case("100.0", "110.0", "105.0", "108.0", "low <= open")]
1769    #[case("100.0", "90.0", "110.0", "120.0", "high >= open")] // First failing predicate reported
1770    fn test_bar_new_checked_conditions(
1771        #[case] open: &str,
1772        #[case] high: &str,
1773        #[case] low: &str,
1774        #[case] close: &str,
1775        #[case] expected: &str,
1776    ) {
1777        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1778        let open = Price::from(open);
1779        let high = Price::from(high);
1780        let low = Price::from(low);
1781        let close = Price::from(close);
1782        let volume = Quantity::from("1000");
1783        let ts_event = UnixNanos::from(1_000_000);
1784        let ts_init = UnixNanos::from(2_000_000);
1785
1786        let result = Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init);
1787
1788        let error = result.unwrap_err();
1789        assert!(
1790            error.to_string().contains(expected),
1791            "unexpected message: {error}"
1792        );
1793    }
1794
1795    #[rstest]
1796    fn test_bar_equality() {
1797        let instrument_id = InstrumentId {
1798            symbol: Symbol::new("AUDUSD"),
1799            venue: Venue::new("SIM"),
1800        };
1801        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1802        let bar_type = BarType::Standard {
1803            instrument_id,
1804            spec: bar_spec,
1805            aggregation_source: AggregationSource::External,
1806        };
1807        let bar1 = Bar {
1808            bar_type,
1809            open: Price::from("1.00001"),
1810            high: Price::from("1.00004"),
1811            low: Price::from("1.00002"),
1812            close: Price::from("1.00003"),
1813            volume: Quantity::from("100000"),
1814            ts_event: UnixNanos::default(),
1815            ts_init: UnixNanos::from(1),
1816        };
1817
1818        let bar2 = Bar {
1819            bar_type,
1820            open: Price::from("1.00000"),
1821            high: Price::from("1.00004"),
1822            low: Price::from("1.00002"),
1823            close: Price::from("1.00003"),
1824            volume: Quantity::from("100000"),
1825            ts_event: UnixNanos::default(),
1826            ts_init: UnixNanos::from(1),
1827        };
1828        assert_eq!(bar1, bar1);
1829        assert_ne!(bar1, bar2);
1830    }
1831
1832    #[rstest]
1833    fn test_json_serialization() {
1834        let bar = Bar::default();
1835        let serialized = bar.to_json_bytes().unwrap();
1836        let deserialized = Bar::from_json_bytes(serialized.as_ref()).unwrap();
1837        assert_eq!(deserialized, bar);
1838    }
1839
1840    #[rstest]
1841    fn test_msgpack_serialization() {
1842        let bar = Bar::default();
1843        let serialized = bar.to_msgpack_bytes().unwrap();
1844        let deserialized = Bar::from_msgpack_bytes(serialized.as_ref()).unwrap();
1845        assert_eq!(deserialized, bar);
1846    }
1847}