Skip to main content

nautilus_model/data/
bar.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregate structures, data types and functionality.
17
18use std::{
19    collections::HashMap,
20    fmt::{Debug, Display},
21    hash::Hash,
22    num::{NonZero, NonZeroUsize},
23    str::FromStr,
24};
25
26use chrono::{DateTime, Datelike, Duration, SubsecRound, TimeDelta, Timelike, Utc};
27use derive_builder::Builder;
28use indexmap::IndexMap;
29use nautilus_core::{
30    UnixNanos,
31    correctness::{FAILED, check_predicate_true},
32    datetime::{add_n_months, subtract_n_months},
33    serialization::Serializable,
34};
35use serde::{Deserialize, Deserializer, Serialize, Serializer};
36
37use super::HasTsInit;
38use crate::{
39    enums::{AggregationSource, BarAggregation, PriceType},
40    identifiers::InstrumentId,
41    types::{Price, Quantity, fixed::FIXED_SIZE_BINARY},
42};
43
44pub const BAR_SPEC_1_SECOND_LAST: BarSpecification = BarSpecification {
45    step: NonZero::new(1).unwrap(),
46    aggregation: BarAggregation::Second,
47    price_type: PriceType::Last,
48};
49pub const BAR_SPEC_1_MINUTE_LAST: BarSpecification = BarSpecification {
50    step: NonZero::new(1).unwrap(),
51    aggregation: BarAggregation::Minute,
52    price_type: PriceType::Last,
53};
54pub const BAR_SPEC_3_MINUTE_LAST: BarSpecification = BarSpecification {
55    step: NonZero::new(3).unwrap(),
56    aggregation: BarAggregation::Minute,
57    price_type: PriceType::Last,
58};
59pub const BAR_SPEC_5_MINUTE_LAST: BarSpecification = BarSpecification {
60    step: NonZero::new(5).unwrap(),
61    aggregation: BarAggregation::Minute,
62    price_type: PriceType::Last,
63};
64pub const BAR_SPEC_15_MINUTE_LAST: BarSpecification = BarSpecification {
65    step: NonZero::new(15).unwrap(),
66    aggregation: BarAggregation::Minute,
67    price_type: PriceType::Last,
68};
69pub const BAR_SPEC_30_MINUTE_LAST: BarSpecification = BarSpecification {
70    step: NonZero::new(30).unwrap(),
71    aggregation: BarAggregation::Minute,
72    price_type: PriceType::Last,
73};
74pub const BAR_SPEC_1_HOUR_LAST: BarSpecification = BarSpecification {
75    step: NonZero::new(1).unwrap(),
76    aggregation: BarAggregation::Hour,
77    price_type: PriceType::Last,
78};
79pub const BAR_SPEC_2_HOUR_LAST: BarSpecification = BarSpecification {
80    step: NonZero::new(2).unwrap(),
81    aggregation: BarAggregation::Hour,
82    price_type: PriceType::Last,
83};
84pub const BAR_SPEC_4_HOUR_LAST: BarSpecification = BarSpecification {
85    step: NonZero::new(4).unwrap(),
86    aggregation: BarAggregation::Hour,
87    price_type: PriceType::Last,
88};
89pub const BAR_SPEC_6_HOUR_LAST: BarSpecification = BarSpecification {
90    step: NonZero::new(6).unwrap(),
91    aggregation: BarAggregation::Hour,
92    price_type: PriceType::Last,
93};
94pub const BAR_SPEC_12_HOUR_LAST: BarSpecification = BarSpecification {
95    step: NonZero::new(12).unwrap(),
96    aggregation: BarAggregation::Hour,
97    price_type: PriceType::Last,
98};
99pub const BAR_SPEC_1_DAY_LAST: BarSpecification = BarSpecification {
100    step: NonZero::new(1).unwrap(),
101    aggregation: BarAggregation::Day,
102    price_type: PriceType::Last,
103};
104pub const BAR_SPEC_2_DAY_LAST: BarSpecification = BarSpecification {
105    step: NonZero::new(2).unwrap(),
106    aggregation: BarAggregation::Day,
107    price_type: PriceType::Last,
108};
109pub const BAR_SPEC_3_DAY_LAST: BarSpecification = BarSpecification {
110    step: NonZero::new(3).unwrap(),
111    aggregation: BarAggregation::Day,
112    price_type: PriceType::Last,
113};
114pub const BAR_SPEC_5_DAY_LAST: BarSpecification = BarSpecification {
115    step: NonZero::new(5).unwrap(),
116    aggregation: BarAggregation::Day,
117    price_type: PriceType::Last,
118};
119pub const BAR_SPEC_1_WEEK_LAST: BarSpecification = BarSpecification {
120    step: NonZero::new(1).unwrap(),
121    aggregation: BarAggregation::Week,
122    price_type: PriceType::Last,
123};
124pub const BAR_SPEC_1_MONTH_LAST: BarSpecification = BarSpecification {
125    step: NonZero::new(1).unwrap(),
126    aggregation: BarAggregation::Month,
127    price_type: PriceType::Last,
128};
129pub const BAR_SPEC_3_MONTH_LAST: BarSpecification = BarSpecification {
130    step: NonZero::new(3).unwrap(),
131    aggregation: BarAggregation::Month,
132    price_type: PriceType::Last,
133};
134pub const BAR_SPEC_6_MONTH_LAST: BarSpecification = BarSpecification {
135    step: NonZero::new(6).unwrap(),
136    aggregation: BarAggregation::Month,
137    price_type: PriceType::Last,
138};
139pub const BAR_SPEC_12_MONTH_LAST: BarSpecification = BarSpecification {
140    step: NonZero::new(12).unwrap(),
141    aggregation: BarAggregation::Month,
142    price_type: PriceType::Last,
143};
144
145/// Returns the bar interval as a `TimeDelta`.
146///
147/// # Panics
148///
149/// Panics if the aggregation method of the given `bar_type` is not time based.
150#[must_use]
151pub fn get_bar_interval(bar_type: &BarType) -> TimeDelta {
152    let spec = bar_type.spec();
153
154    match spec.aggregation {
155        BarAggregation::Millisecond => TimeDelta::milliseconds(spec.step.get() as i64),
156        BarAggregation::Second => TimeDelta::seconds(spec.step.get() as i64),
157        BarAggregation::Minute => TimeDelta::minutes(spec.step.get() as i64),
158        BarAggregation::Hour => TimeDelta::hours(spec.step.get() as i64),
159        BarAggregation::Day => TimeDelta::days(spec.step.get() as i64),
160        BarAggregation::Week => TimeDelta::days(7 * spec.step.get() as i64),
161        BarAggregation::Month => TimeDelta::days(30 * spec.step.get() as i64), // Proxy for comparing bar lengths
162        BarAggregation::Year => TimeDelta::days(365 * spec.step.get() as i64), // Proxy for comparing bar lengths
163        _ => panic!("Aggregation not time based"),
164    }
165}
166
167/// Returns the bar interval as `UnixNanos`.
168///
169/// # Panics
170///
171/// Panics if the aggregation method of the given `bar_type` is not time based.
172#[must_use]
173pub fn get_bar_interval_ns(bar_type: &BarType) -> UnixNanos {
174    let interval_ns = get_bar_interval(bar_type)
175        .num_nanoseconds()
176        .expect("Invalid bar interval") as u64;
177    UnixNanos::from(interval_ns)
178}
179
180/// Returns the time bar start as a timezone-aware `DateTime<Utc>`.
181///
182/// # Panics
183///
184/// Panics if computing the base `NaiveDate` or `DateTime` from `now` fails,
185/// or if the aggregation type is unsupported.
186pub fn get_time_bar_start(
187    now: DateTime<Utc>,
188    bar_type: &BarType,
189    time_bars_origin: Option<TimeDelta>,
190) -> DateTime<Utc> {
191    let spec = bar_type.spec();
192    let step = spec.step.get() as i64;
193    let origin_offset: TimeDelta = time_bars_origin.unwrap_or_else(TimeDelta::zero);
194
195    match spec.aggregation {
196        BarAggregation::Millisecond => {
197            find_closest_smaller_time(now, origin_offset, Duration::milliseconds(step))
198        }
199        BarAggregation::Second => {
200            find_closest_smaller_time(now, origin_offset, Duration::seconds(step))
201        }
202        BarAggregation::Minute => {
203            find_closest_smaller_time(now, origin_offset, Duration::minutes(step))
204        }
205        BarAggregation::Hour => {
206            find_closest_smaller_time(now, origin_offset, Duration::hours(step))
207        }
208        BarAggregation::Day => find_closest_smaller_time(now, origin_offset, Duration::days(step)),
209        BarAggregation::Week => {
210            let mut start_time = now.trunc_subsecs(0)
211                - Duration::seconds(i64::from(now.second()))
212                - Duration::minutes(i64::from(now.minute()))
213                - Duration::hours(i64::from(now.hour()))
214                - TimeDelta::days(i64::from(now.weekday().num_days_from_monday()));
215            start_time += origin_offset;
216
217            if now < start_time {
218                start_time -= Duration::weeks(step);
219            }
220
221            start_time
222        }
223        BarAggregation::Month => {
224            // Set to the first day of the year
225            let mut start_time = DateTime::from_naive_utc_and_offset(
226                chrono::NaiveDate::from_ymd_opt(now.year(), 1, 1)
227                    .expect("valid date")
228                    .and_hms_opt(0, 0, 0)
229                    .expect("valid time"),
230                Utc,
231            );
232            start_time += origin_offset;
233
234            if now < start_time {
235                start_time =
236                    subtract_n_months(start_time, 12).expect("Failed to subtract 12 months");
237            }
238
239            let months_step = step as u32;
240
241            while start_time <= now {
242                start_time =
243                    add_n_months(start_time, months_step).expect("Failed to add months in loop");
244            }
245
246            start_time =
247                subtract_n_months(start_time, months_step).expect("Failed to subtract months_step");
248            start_time
249        }
250        BarAggregation::Year => {
251            let step_i32 = step as i32;
252
253            // Reconstruct from Jan 1 + origin each time to avoid leap-day drift
254            let year_start = |y: i32| {
255                DateTime::from_naive_utc_and_offset(
256                    chrono::NaiveDate::from_ymd_opt(y, 1, 1)
257                        .expect("valid date")
258                        .and_hms_opt(0, 0, 0)
259                        .expect("valid time"),
260                    Utc,
261                ) + origin_offset
262            };
263
264            let mut year = now.year();
265            if year_start(year) > now {
266                year -= step_i32;
267            }
268
269            while year_start(year + step_i32) <= now {
270                year += step_i32;
271            }
272
273            year_start(year)
274        }
275        _ => panic!(
276            "Aggregation type {} not supported for time bars",
277            spec.aggregation
278        ),
279    }
280}
281
282/// Finds the closest smaller time based on a daily time origin and period.
283///
284/// This function calculates the most recent time that is aligned with the given period
285/// and is less than or equal to the current time.
286fn find_closest_smaller_time(
287    now: DateTime<Utc>,
288    daily_time_origin: TimeDelta,
289    period: TimeDelta,
290) -> DateTime<Utc> {
291    // Floor to start of day
292    let day_start = now.trunc_subsecs(0)
293        - Duration::seconds(i64::from(now.second()))
294        - Duration::minutes(i64::from(now.minute()))
295        - Duration::hours(i64::from(now.hour()));
296    let base_time = day_start + daily_time_origin;
297
298    let time_difference = now - base_time;
299    let period_ns = period.num_nanoseconds().unwrap_or(1);
300
301    // Use div_euclid for floor division (rounds toward -inf, not zero)
302    // so negative deltas (now before origin) yield the previous period boundary
303    let num_periods = time_difference
304        .num_nanoseconds()
305        .unwrap_or(0)
306        .div_euclid(period_ns);
307
308    base_time + TimeDelta::nanoseconds(num_periods * period_ns)
309}
310
/// Represents a bar aggregation specification including a step, aggregation
/// method/rule and price type.
///
/// Displayed as `{step}-{aggregation}-{price_type}`.
///
/// The fields are public, so a value can be assembled directly without the step
/// validation performed by `new_checked`.
#[repr(C)]
#[derive(
    Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Builder,
)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
pub struct BarSpecification {
    /// The step for binning samples for bar aggregation (always non-zero).
    pub step: NonZeroUsize,
    /// The type of bar aggregation.
    pub aggregation: BarAggregation,
    /// The price type to use for aggregation.
    pub price_type: PriceType,
}
333
334impl BarSpecification {
335    /// Creates a new [`BarSpecification`] instance with correctness checking.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if `step` is not positive (> 0), or if `step` is not
340    /// valid for a fixed-subunit time aggregation.
341    ///
342    /// # Notes
343    ///
344    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
345    pub fn new_checked(
346        step: usize,
347        aggregation: BarAggregation,
348        price_type: PriceType,
349    ) -> anyhow::Result<Self> {
350        let step = NonZeroUsize::new(step)
351            .ok_or(anyhow::anyhow!("Invalid step: {step} (must be non-zero)"))?;
352        Self::validate_step(step.get(), aggregation)?;
353
354        Ok(Self {
355            step,
356            aggregation,
357            price_type,
358        })
359    }
360
361    fn validate_step(step: usize, aggregation: BarAggregation) -> anyhow::Result<()> {
362        match aggregation {
363            BarAggregation::Millisecond => {
364                Self::validate_periodic_step(step, aggregation, 1000, false)
365            }
366            BarAggregation::Second | BarAggregation::Minute => {
367                Self::validate_periodic_step(step, aggregation, 60, false)
368            }
369            BarAggregation::Hour => Self::validate_periodic_step(step, aggregation, 24, false),
370            BarAggregation::Month => Self::validate_periodic_step(step, aggregation, 12, false),
371            _ => Ok(()),
372        }
373    }
374
375    fn validate_periodic_step(
376        step: usize,
377        aggregation: BarAggregation,
378        subunits: usize,
379        allow_equal: bool,
380    ) -> anyhow::Result<()> {
381        if !subunits.is_multiple_of(step) {
382            anyhow::bail!(
383                "Invalid step in bar_type.spec.step: {step} for aggregation={}. \
384                 step must evenly divide {subunits} (so it is periodic).",
385                aggregation as u8
386            );
387        }
388
389        if !allow_equal && subunits == step {
390            anyhow::bail!(
391                "Invalid step in bar_type.spec.step: {step} for aggregation={}. \
392                 step must not be {subunits}. Use higher aggregation unit instead.",
393                aggregation as u8
394            );
395        }
396
397        Ok(())
398    }
399
400    /// Creates a new [`BarSpecification`] instance.
401    ///
402    /// # Panics
403    ///
404    /// Panics if `step` is not positive (> 0), or if `step` is not valid for
405    /// a fixed-subunit time aggregation.
406    #[must_use]
407    pub fn new(step: usize, aggregation: BarAggregation, price_type: PriceType) -> Self {
408        Self::new_checked(step, aggregation, price_type).expect(FAILED)
409    }
410
411    /// Returns the `TimeDelta` interval for this bar specification.
412    ///
413    /// # Notes
414    ///
415    /// For [`BarAggregation::Month`] and [`BarAggregation::Year`], proxy values are used
416    /// (30 days for months, 365 days for years) to estimate their respective durations,
417    /// since months and years have variable lengths.
418    ///
419    /// # Panics
420    ///
421    /// Panics if the aggregation method is not time-based.
422    #[must_use]
423    pub fn timedelta(&self) -> TimeDelta {
424        match self.aggregation {
425            BarAggregation::Millisecond => Duration::milliseconds(self.step.get() as i64),
426            BarAggregation::Second => Duration::seconds(self.step.get() as i64),
427            BarAggregation::Minute => Duration::minutes(self.step.get() as i64),
428            BarAggregation::Hour => Duration::hours(self.step.get() as i64),
429            BarAggregation::Day => Duration::days(self.step.get() as i64),
430            BarAggregation::Week => Duration::days(self.step.get() as i64 * 7),
431            BarAggregation::Month => Duration::days(self.step.get() as i64 * 30), // Proxy for comparing bar lengths
432            BarAggregation::Year => Duration::days(self.step.get() as i64 * 365), // Proxy for comparing bar lengths
433            _ => panic!(
434                "Timedelta not supported for aggregation type: {:?}",
435                self.aggregation
436            ),
437        }
438    }
439
440    /// Return a value indicating whether the aggregation method is time-driven:
441    ///  - [`BarAggregation::Millisecond`]
442    ///  - [`BarAggregation::Second`]
443    ///  - [`BarAggregation::Minute`]
444    ///  - [`BarAggregation::Hour`]
445    ///  - [`BarAggregation::Day`]
446    ///  - [`BarAggregation::Week`]
447    ///  - [`BarAggregation::Month`]
448    ///  - [`BarAggregation::Year`]
449    #[must_use]
450    pub fn is_time_aggregated(&self) -> bool {
451        matches!(
452            self.aggregation,
453            BarAggregation::Millisecond
454                | BarAggregation::Second
455                | BarAggregation::Minute
456                | BarAggregation::Hour
457                | BarAggregation::Day
458                | BarAggregation::Week
459                | BarAggregation::Month
460                | BarAggregation::Year
461        )
462    }
463
464    /// Return a value indicating whether the aggregation method is threshold-driven:
465    ///  - [`BarAggregation::Tick`]
466    ///  - [`BarAggregation::TickImbalance`]
467    ///  - [`BarAggregation::Volume`]
468    ///  - [`BarAggregation::VolumeImbalance`]
469    ///  - [`BarAggregation::Value`]
470    ///  - [`BarAggregation::ValueImbalance`]
471    #[must_use]
472    pub fn is_threshold_aggregated(&self) -> bool {
473        matches!(
474            self.aggregation,
475            BarAggregation::Tick
476                | BarAggregation::TickImbalance
477                | BarAggregation::Volume
478                | BarAggregation::VolumeImbalance
479                | BarAggregation::Value
480                | BarAggregation::ValueImbalance
481        )
482    }
483
484    /// Return a value indicating whether the aggregation method is information-driven:
485    ///  - [`BarAggregation::TickRuns`]
486    ///  - [`BarAggregation::VolumeRuns`]
487    ///  - [`BarAggregation::ValueRuns`]
488    #[must_use]
489    pub fn is_information_aggregated(&self) -> bool {
490        matches!(
491            self.aggregation,
492            BarAggregation::TickRuns | BarAggregation::VolumeRuns | BarAggregation::ValueRuns
493        )
494    }
495}
496
497impl Display for BarSpecification {
498    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499        write!(f, "{}-{}-{}", self.step, self.aggregation, self.price_type)
500    }
501}
502
/// Represents a bar type including the instrument ID, bar specification and
/// aggregation source.
///
/// A bar type is either `Standard`, or `Composite`, which additionally carries the
/// step, aggregation and aggregation source of a second, "composite" bar type for the
/// same instrument.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass_enum(module = "nautilus_trader.model")
)]
pub enum BarType {
    /// A bar type described by a single specification.
    Standard {
        /// The bar type's instrument ID.
        instrument_id: InstrumentId,
        /// The bar type's specification.
        spec: BarSpecification,
        /// The bar type's aggregation source.
        aggregation_source: AggregationSource,
    },
    /// A bar type that also references a composite bar type.
    ///
    /// The composite component reuses `instrument_id` and the price type of `spec`.
    Composite {
        /// The bar type's instrument ID.
        instrument_id: InstrumentId,
        /// The bar type's specification.
        spec: BarSpecification,
        /// The bar type's aggregation source.
        aggregation_source: AggregationSource,

        /// The composite step for binning samples for bar aggregation.
        composite_step: usize,
        /// The composite type of bar aggregation.
        composite_aggregation: BarAggregation,
        /// The composite bar type's aggregation source.
        composite_aggregation_source: AggregationSource,
    },
}
540
541impl BarType {
542    /// Creates a new [`BarType`] instance.
543    #[must_use]
544    pub fn new(
545        instrument_id: InstrumentId,
546        spec: BarSpecification,
547        aggregation_source: AggregationSource,
548    ) -> Self {
549        Self::Standard {
550            instrument_id,
551            spec,
552            aggregation_source,
553        }
554    }
555
556    /// Creates a new composite [`BarType`] instance.
557    #[must_use]
558    pub fn new_composite(
559        instrument_id: InstrumentId,
560        spec: BarSpecification,
561        aggregation_source: AggregationSource,
562
563        composite_step: usize,
564        composite_aggregation: BarAggregation,
565        composite_aggregation_source: AggregationSource,
566    ) -> Self {
567        Self::Composite {
568            instrument_id,
569            spec,
570            aggregation_source,
571
572            composite_step,
573            composite_aggregation,
574            composite_aggregation_source,
575        }
576    }
577
578    /// Returns whether this instance is a standard bar type.
579    #[must_use]
580    pub fn is_standard(&self) -> bool {
581        match &self {
582            Self::Standard { .. } => true,
583            Self::Composite { .. } => false,
584        }
585    }
586
587    /// Returns whether this instance is a composite bar type.
588    #[must_use]
589    pub fn is_composite(&self) -> bool {
590        match &self {
591            Self::Standard { .. } => false,
592            Self::Composite { .. } => true,
593        }
594    }
595
596    /// Returns whether the bar aggregation source is `EXTERNAL`.
597    #[must_use]
598    pub fn is_externally_aggregated(&self) -> bool {
599        self.aggregation_source() == AggregationSource::External
600    }
601
602    /// Returns whether the bar aggregation source is `INTERNAL`.
603    #[must_use]
604    pub fn is_internally_aggregated(&self) -> bool {
605        self.aggregation_source() == AggregationSource::Internal
606    }
607
608    /// Returns the standard bar type component.
609    #[must_use]
610    pub fn standard(&self) -> Self {
611        match self {
612            &b @ Self::Standard { .. } => b,
613            Self::Composite {
614                instrument_id,
615                spec,
616                aggregation_source,
617                ..
618            } => Self::new(*instrument_id, *spec, *aggregation_source),
619        }
620    }
621
622    /// Returns any composite bar type component.
623    #[must_use]
624    pub fn composite(&self) -> Self {
625        match self {
626            &b @ Self::Standard { .. } => b, // case shouldn't be used if is_composite is called before
627            Self::Composite {
628                instrument_id,
629                spec,
630                aggregation_source: _,
631
632                composite_step,
633                composite_aggregation,
634                composite_aggregation_source,
635            } => Self::new(
636                *instrument_id,
637                BarSpecification::new(*composite_step, *composite_aggregation, spec.price_type),
638                *composite_aggregation_source,
639            ),
640        }
641    }
642
643    /// Returns the [`InstrumentId`] for this bar type.
644    #[must_use]
645    pub fn instrument_id(&self) -> InstrumentId {
646        match &self {
647            Self::Standard { instrument_id, .. } | Self::Composite { instrument_id, .. } => {
648                *instrument_id
649            }
650        }
651    }
652
653    /// Returns the [`BarSpecification`] for this bar type.
654    #[must_use]
655    pub fn spec(&self) -> BarSpecification {
656        match &self {
657            Self::Standard { spec, .. } | Self::Composite { spec, .. } => *spec,
658        }
659    }
660
661    /// Returns the [`AggregationSource`] for this bar type.
662    #[must_use]
663    pub fn aggregation_source(&self) -> AggregationSource {
664        match &self {
665            Self::Standard {
666                aggregation_source, ..
667            }
668            | Self::Composite {
669                aggregation_source, ..
670            } => *aggregation_source,
671        }
672    }
673
674    /// Returns the instrument ID and bar specification as a tuple key.
675    ///
676    /// Useful as a hashmap key when aggregation source should be ignored,
677    /// such as for indicator registration where INTERNAL and EXTERNAL bars
678    /// should trigger the same indicators.
679    #[must_use]
680    pub fn id_spec_key(&self) -> (InstrumentId, BarSpecification) {
681        (self.instrument_id(), self.spec())
682    }
683}
684
/// Error returned when a string cannot be parsed into a [`BarType`].
#[derive(thiserror::Error, Debug)]
#[error("Error parsing `BarType` from '{input}', invalid token: '{token}' at position {position}")]
pub struct BarTypeParseError {
    /// The complete string that was being parsed.
    input: String,
    /// The token that failed to parse (empty when the number of tokens is wrong).
    token: String,
    /// Zero-based index of the offending token: 0-4 for the instrument ID, step,
    /// aggregation, price type and aggregation source; 5-7 for the composite step,
    /// aggregation and aggregation source.
    position: usize,
}
692
693impl FromStr for BarType {
694    type Err = BarTypeParseError;
695
696    #[expect(clippy::needless_collect)] // Collect needed for .rev() and indexing
697    fn from_str(s: &str) -> Result<Self, Self::Err> {
698        let parts: Vec<&str> = s.split('@').collect();
699        let standard = parts[0];
700        let composite_str = parts.get(1);
701
702        let pieces: Vec<&str> = standard.rsplitn(5, '-').collect();
703        let rev_pieces: Vec<&str> = pieces.into_iter().rev().collect();
704        if rev_pieces.len() != 5 {
705            return Err(BarTypeParseError {
706                input: s.to_string(),
707                token: String::new(),
708                position: 0,
709            });
710        }
711
712        let instrument_id =
713            InstrumentId::from_str(rev_pieces[0]).map_err(|_| BarTypeParseError {
714                input: s.to_string(),
715                token: rev_pieces[0].to_string(),
716                position: 0,
717            })?;
718
719        let step = rev_pieces[1].parse().map_err(|_| BarTypeParseError {
720            input: s.to_string(),
721            token: rev_pieces[1].to_string(),
722            position: 1,
723        })?;
724        let aggregation =
725            BarAggregation::from_str(rev_pieces[2]).map_err(|_| BarTypeParseError {
726                input: s.to_string(),
727                token: rev_pieces[2].to_string(),
728                position: 2,
729            })?;
730        let price_type = PriceType::from_str(rev_pieces[3]).map_err(|_| BarTypeParseError {
731            input: s.to_string(),
732            token: rev_pieces[3].to_string(),
733            position: 3,
734        })?;
735        let aggregation_source =
736            AggregationSource::from_str(rev_pieces[4]).map_err(|_| BarTypeParseError {
737                input: s.to_string(),
738                token: rev_pieces[4].to_string(),
739                position: 4,
740            })?;
741        let spec = BarSpecification::new_checked(step, aggregation, price_type).map_err(|_| {
742            BarTypeParseError {
743                input: s.to_string(),
744                token: rev_pieces[1].to_string(),
745                position: 1,
746            }
747        })?;
748
749        if let Some(composite_str) = composite_str {
750            let composite_pieces: Vec<&str> = composite_str.rsplitn(3, '-').collect();
751            let rev_composite_pieces: Vec<&str> = composite_pieces.into_iter().rev().collect();
752            if rev_composite_pieces.len() != 3 {
753                return Err(BarTypeParseError {
754                    input: s.to_string(),
755                    token: String::new(),
756                    position: 5,
757                });
758            }
759
760            let composite_step =
761                rev_composite_pieces[0]
762                    .parse()
763                    .map_err(|_| BarTypeParseError {
764                        input: s.to_string(),
765                        token: rev_composite_pieces[0].to_string(),
766                        position: 5,
767                    })?;
768            let composite_aggregation =
769                BarAggregation::from_str(rev_composite_pieces[1]).map_err(|_| {
770                    BarTypeParseError {
771                        input: s.to_string(),
772                        token: rev_composite_pieces[1].to_string(),
773                        position: 6,
774                    }
775                })?;
776            let composite_aggregation_source = AggregationSource::from_str(rev_composite_pieces[2])
777                .map_err(|_| BarTypeParseError {
778                    input: s.to_string(),
779                    token: rev_composite_pieces[2].to_string(),
780                    position: 7,
781                })?;
782            BarSpecification::new_checked(composite_step, composite_aggregation, price_type)
783                .map_err(|_| BarTypeParseError {
784                    input: s.to_string(),
785                    token: rev_composite_pieces[0].to_string(),
786                    position: 5,
787                })?;
788
789            Ok(Self::new_composite(
790                instrument_id,
791                spec,
792                aggregation_source,
793                composite_step,
794                composite_aggregation,
795                composite_aggregation_source,
796            ))
797        } else {
798            Ok(Self::Standard {
799                instrument_id,
800                spec,
801                aggregation_source,
802            })
803        }
804    }
805}
806
807impl<T: AsRef<str>> From<T> for BarType {
808    fn from(value: T) -> Self {
809        Self::from_str(value.as_ref()).expect(FAILED)
810    }
811}
812
813impl Display for BarType {
814    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
815        match &self {
816            Self::Standard {
817                instrument_id,
818                spec,
819                aggregation_source,
820            } => {
821                write!(f, "{instrument_id}-{spec}-{aggregation_source}")
822            }
823            Self::Composite {
824                instrument_id,
825                spec,
826                aggregation_source,
827
828                composite_step,
829                composite_aggregation,
830                composite_aggregation_source,
831            } => {
832                write!(
833                    f,
834                    "{}-{}-{}@{}-{}-{}",
835                    instrument_id,
836                    spec,
837                    aggregation_source,
838                    *composite_step,
839                    *composite_aggregation,
840                    *composite_aggregation_source
841                )
842            }
843        }
844    }
845}
846
847impl Serialize for BarType {
848    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
849    where
850        S: Serializer,
851    {
852        serializer.serialize_str(&self.to_string())
853    }
854}
855
856impl<'de> Deserialize<'de> for BarType {
857    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
858    where
859        D: Deserializer<'de>,
860    {
861        let s: std::borrow::Cow<'de, str> = Deserialize::deserialize(deserializer)?;
862        Self::from_str(s.as_ref()).map_err(serde::de::Error::custom)
863    }
864}
865
/// Represents an aggregated bar.
///
/// Holds the open/high/low/close prices and volume for a given [`BarType`],
/// together with the event and initialization timestamps.
#[repr(C)]
#[derive(Clone, Copy, Hash, PartialEq, Eq, Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
pub struct Bar {
    /// The bar type for this bar.
    pub bar_type: BarType,
    /// The bar's open price.
    pub open: Price,
    /// The bar's high price.
    pub high: Price,
    /// The bar's low price.
    pub low: Price,
    /// The bar's close price.
    pub close: Price,
    /// The bar's volume.
    pub volume: Quantity,
    /// UNIX timestamp (nanoseconds) when the data event occurred.
    pub ts_event: UnixNanos,
    /// UNIX timestamp (nanoseconds) when the instance was created.
    pub ts_init: UnixNanos,
}
896
897impl Bar {
898    /// Creates a new [`Bar`] instance with correctness checking.
899    ///
900    /// # Errors
901    ///
902    /// Returns an error if:
903    /// - `high` is not >= `low`.
904    /// - `high` is not >= `close`.
905    /// - `low` is not <= `close`.
906    ///
907    /// # Notes
908    ///
909    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
910    #[expect(clippy::too_many_arguments)]
911    pub fn new_checked(
912        bar_type: BarType,
913        open: Price,
914        high: Price,
915        low: Price,
916        close: Price,
917        volume: Quantity,
918        ts_event: UnixNanos,
919        ts_init: UnixNanos,
920    ) -> anyhow::Result<Self> {
921        check_predicate_true(high >= open, "high >= open")?;
922        check_predicate_true(high >= low, "high >= low")?;
923        check_predicate_true(high >= close, "high >= close")?;
924        check_predicate_true(low <= close, "low <= close")?;
925        check_predicate_true(low <= open, "low <= open")?;
926
927        Ok(Self {
928            bar_type,
929            open,
930            high,
931            low,
932            close,
933            volume,
934            ts_event,
935            ts_init,
936        })
937    }
938
939    /// Creates a new [`Bar`] instance.
940    ///
941    /// # Panics
942    ///
943    /// This function panics if:
944    /// - `high` is not >= `low`.
945    /// - `high` is not >= `close`.
946    /// - `low` is not <= `close`.
947    #[expect(clippy::too_many_arguments)]
948    #[must_use]
949    pub fn new(
950        bar_type: BarType,
951        open: Price,
952        high: Price,
953        low: Price,
954        close: Price,
955        volume: Quantity,
956        ts_event: UnixNanos,
957        ts_init: UnixNanos,
958    ) -> Self {
959        Self::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
960            .expect(FAILED)
961    }
962
963    #[must_use]
964    pub fn instrument_id(&self) -> InstrumentId {
965        self.bar_type.instrument_id()
966    }
967
968    /// Returns the metadata for the type, for use with serialization formats.
969    #[must_use]
970    pub fn get_metadata(
971        bar_type: &BarType,
972        price_precision: u8,
973        size_precision: u8,
974    ) -> HashMap<String, String> {
975        let mut metadata = HashMap::new();
976        let instrument_id = bar_type.instrument_id();
977        metadata.insert("bar_type".to_string(), bar_type.to_string());
978        metadata.insert("instrument_id".to_string(), instrument_id.to_string());
979        metadata.insert("price_precision".to_string(), price_precision.to_string());
980        metadata.insert("size_precision".to_string(), size_precision.to_string());
981        metadata
982    }
983
984    /// Returns the field map for the type, for use with Arrow schemas.
985    #[must_use]
986    pub fn get_fields() -> IndexMap<String, String> {
987        let mut metadata = IndexMap::new();
988        metadata.insert("open".to_string(), FIXED_SIZE_BINARY.to_string());
989        metadata.insert("high".to_string(), FIXED_SIZE_BINARY.to_string());
990        metadata.insert("low".to_string(), FIXED_SIZE_BINARY.to_string());
991        metadata.insert("close".to_string(), FIXED_SIZE_BINARY.to_string());
992        metadata.insert("volume".to_string(), FIXED_SIZE_BINARY.to_string());
993        metadata.insert("ts_event".to_string(), "UInt64".to_string());
994        metadata.insert("ts_init".to_string(), "UInt64".to_string());
995        metadata
996    }
997}
998
999impl Display for Bar {
1000    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1001        write!(
1002            f,
1003            "{},{},{},{},{},{},{}",
1004            self.bar_type, self.open, self.high, self.low, self.close, self.volume, self.ts_event
1005        )
1006    }
1007}
1008
// Empty impl: `Bar` implements `Serializable` without overriding any of the trait's items.
impl Serializable for Bar {}
1010
1011impl HasTsInit for Bar {
1012    fn ts_init(&self) -> UnixNanos {
1013        self.ts_init
1014    }
1015}
1016
1017#[cfg(test)]
1018mod tests {
1019    use std::str::FromStr;
1020
1021    use chrono::TimeZone;
1022    use nautilus_core::serialization::msgpack::{FromMsgPack, ToMsgPack};
1023    use rstest::rstest;
1024
1025    use super::*;
1026    use crate::identifiers::{Symbol, Venue};
1027
1028    #[rstest]
1029    fn test_bar_specification_new_invalid() {
1030        let result = BarSpecification::new_checked(0, BarAggregation::Tick, PriceType::Last);
1031        assert!(result.is_err());
1032    }
1033
1034    #[rstest]
1035    #[should_panic(expected = "Invalid step: 0 (must be non-zero)")]
1036    fn test_bar_specification_new_checked_with_invalid_step_panics() {
1037        let aggregation = BarAggregation::Tick;
1038        let price_type = PriceType::Last;
1039
1040        let _ = BarSpecification::new(0, aggregation, price_type);
1041    }
1042
1043    #[rstest]
1044    #[should_panic(expected = "Invalid step in bar_type.spec.step: 7")]
1045    fn test_bar_specification_new_with_invalid_periodic_step_panics() {
1046        let _ = BarSpecification::new(7, BarAggregation::Minute, PriceType::Last);
1047    }
1048
1049    #[rstest]
1050    #[case(
1051        BarAggregation::Millisecond,
1052        12,
1053        "Invalid step in bar_type.spec.step: 12 for aggregation=10. step must evenly divide 1000"
1054    )]
1055    #[case(
1056        BarAggregation::Millisecond,
1057        1000,
1058        "Invalid step in bar_type.spec.step: 1000 for aggregation=10. step must not be 1000"
1059    )]
1060    #[case(
1061        BarAggregation::Second,
1062        50,
1063        "Invalid step in bar_type.spec.step: 50 for aggregation=11. step must evenly divide 60"
1064    )]
1065    #[case(
1066        BarAggregation::Second,
1067        60,
1068        "Invalid step in bar_type.spec.step: 60 for aggregation=11. step must not be 60"
1069    )]
1070    #[case(
1071        BarAggregation::Minute,
1072        40,
1073        "Invalid step in bar_type.spec.step: 40 for aggregation=12. step must evenly divide 60"
1074    )]
1075    #[case(
1076        BarAggregation::Minute,
1077        60,
1078        "Invalid step in bar_type.spec.step: 60 for aggregation=12. step must not be 60"
1079    )]
1080    #[case(
1081        BarAggregation::Hour,
1082        5,
1083        "Invalid step in bar_type.spec.step: 5 for aggregation=13. step must evenly divide 24"
1084    )]
1085    #[case(
1086        BarAggregation::Hour,
1087        13,
1088        "Invalid step in bar_type.spec.step: 13 for aggregation=13. step must evenly divide 24"
1089    )]
1090    #[case(
1091        BarAggregation::Hour,
1092        24,
1093        "Invalid step in bar_type.spec.step: 24 for aggregation=13. step must not be 24"
1094    )]
1095    #[case(
1096        BarAggregation::Month,
1097        5,
1098        "Invalid step in bar_type.spec.step: 5 for aggregation=16. step must evenly divide 12"
1099    )]
1100    #[case(
1101        BarAggregation::Month,
1102        12,
1103        "Invalid step in bar_type.spec.step: 12 for aggregation=16. step must not be 12"
1104    )]
1105    fn test_bar_specification_new_checked_invalid_periodic_step(
1106        #[case] aggregation: BarAggregation,
1107        #[case] step: usize,
1108        #[case] expected: &str,
1109    ) {
1110        let result = BarSpecification::new_checked(step, aggregation, PriceType::Last);
1111
1112        assert!(result.unwrap_err().to_string().starts_with(expected));
1113    }
1114
1115    #[rstest]
1116    #[case(BarAggregation::Day)]
1117    #[case(BarAggregation::Week)]
1118    #[case(BarAggregation::Year)]
1119    #[case(BarAggregation::Tick)]
1120    #[case(BarAggregation::TickImbalance)]
1121    #[case(BarAggregation::TickRuns)]
1122    #[case(BarAggregation::Volume)]
1123    #[case(BarAggregation::VolumeImbalance)]
1124    #[case(BarAggregation::VolumeRuns)]
1125    #[case(BarAggregation::Value)]
1126    #[case(BarAggregation::ValueImbalance)]
1127    #[case(BarAggregation::ValueRuns)]
1128    #[case(BarAggregation::Renko)]
1129    fn test_bar_specification_new_checked_allows_non_periodic_steps(
1130        #[case] aggregation: BarAggregation,
1131    ) {
1132        let result = BarSpecification::new_checked(7, aggregation, PriceType::Last);
1133
1134        assert!(result.is_ok());
1135    }
1136
1137    #[rstest]
1138    #[case(BarAggregation::Millisecond, 1, TimeDelta::milliseconds(1))]
1139    #[case(BarAggregation::Millisecond, 10, TimeDelta::milliseconds(10))]
1140    #[case(BarAggregation::Second, 1, TimeDelta::seconds(1))]
1141    #[case(BarAggregation::Second, 15, TimeDelta::seconds(15))]
1142    #[case(BarAggregation::Minute, 1, TimeDelta::minutes(1))]
1143    #[case(BarAggregation::Minute, 30, TimeDelta::minutes(30))]
1144    #[case(BarAggregation::Hour, 1, TimeDelta::hours(1))]
1145    #[case(BarAggregation::Hour, 4, TimeDelta::hours(4))]
1146    #[case(BarAggregation::Day, 1, TimeDelta::days(1))]
1147    #[case(BarAggregation::Day, 2, TimeDelta::days(2))]
1148    #[case(BarAggregation::Week, 1, TimeDelta::days(7))]
1149    #[case(BarAggregation::Week, 2, TimeDelta::days(14))]
1150    #[case(BarAggregation::Month, 1, TimeDelta::days(30))]
1151    #[case(BarAggregation::Month, 3, TimeDelta::days(90))]
1152    #[case(BarAggregation::Year, 1, TimeDelta::days(365))]
1153    #[case(BarAggregation::Year, 2, TimeDelta::days(730))]
1154    #[should_panic(expected = "Aggregation not time based")]
1155    #[case(BarAggregation::Tick, 1, TimeDelta::zero())]
1156    fn test_get_bar_interval(
1157        #[case] aggregation: BarAggregation,
1158        #[case] step: usize,
1159        #[case] expected: TimeDelta,
1160    ) {
1161        let bar_type = BarType::Standard {
1162            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
1163            spec: BarSpecification::new(step, aggregation, PriceType::Last),
1164            aggregation_source: AggregationSource::Internal,
1165        };
1166
1167        let interval = get_bar_interval(&bar_type);
1168        assert_eq!(interval, expected);
1169    }
1170
1171    #[rstest]
1172    #[case(BarAggregation::Millisecond, 1, UnixNanos::from(1_000_000))]
1173    #[case(BarAggregation::Millisecond, 10, UnixNanos::from(10_000_000))]
1174    #[case(BarAggregation::Second, 1, UnixNanos::from(1_000_000_000))]
1175    #[case(BarAggregation::Second, 10, UnixNanos::from(10_000_000_000))]
1176    #[case(BarAggregation::Minute, 1, UnixNanos::from(60_000_000_000))]
1177    #[case(BarAggregation::Minute, 30, UnixNanos::from(1_800_000_000_000))]
1178    #[case(BarAggregation::Hour, 1, UnixNanos::from(3_600_000_000_000))]
1179    #[case(BarAggregation::Hour, 4, UnixNanos::from(14_400_000_000_000))]
1180    #[case(BarAggregation::Day, 1, UnixNanos::from(86_400_000_000_000))]
1181    #[case(BarAggregation::Day, 2, UnixNanos::from(172_800_000_000_000))]
1182    #[case(BarAggregation::Week, 1, UnixNanos::from(604_800_000_000_000))]
1183    #[case(BarAggregation::Week, 2, UnixNanos::from(1_209_600_000_000_000))]
1184    #[case(BarAggregation::Month, 1, UnixNanos::from(2_592_000_000_000_000))]
1185    #[case(BarAggregation::Month, 3, UnixNanos::from(7_776_000_000_000_000))]
1186    #[case(BarAggregation::Year, 1, UnixNanos::from(31_536_000_000_000_000))]
1187    #[case(BarAggregation::Year, 2, UnixNanos::from(63_072_000_000_000_000))]
1188    #[should_panic(expected = "Aggregation not time based")]
1189    #[case(BarAggregation::Tick, 1, UnixNanos::from(0))]
1190    fn test_get_bar_interval_ns(
1191        #[case] aggregation: BarAggregation,
1192        #[case] step: usize,
1193        #[case] expected: UnixNanos,
1194    ) {
1195        let bar_type = BarType::Standard {
1196            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
1197            spec: BarSpecification::new(step, aggregation, PriceType::Last),
1198            aggregation_source: AggregationSource::Internal,
1199        };
1200
1201        let interval_ns = get_bar_interval_ns(&bar_type);
1202        assert_eq!(interval_ns, expected);
1203    }
1204
1205    #[rstest]
1206    #[case::millisecond(
1207    Utc.timestamp_opt(1_658_349_296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
1208    BarAggregation::Millisecond,
1209    1,
1210    Utc.timestamp_opt(1_658_349_296, 123_000_000).unwrap(),  // 2024-07-21 12:34:56.123 UTC
1211    )]
1212    #[rstest]
1213    #[case::millisecond(
1214    Utc.timestamp_opt(1_658_349_296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
1215    BarAggregation::Millisecond,
1216    10,
1217    Utc.timestamp_opt(1_658_349_296, 120_000_000).unwrap(),  // 2024-07-21 12:34:56.120 UTC
1218    )]
1219    #[case::second(
1220    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1221    BarAggregation::Second,
1222    1,
1223    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap()
1224    )]
1225    #[case::second(
1226    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1227    BarAggregation::Second,
1228    5,
1229    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 55).unwrap()
1230    )]
1231    #[case::minute(
1232    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1233    BarAggregation::Minute,
1234    1,
1235    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 0).unwrap()
1236    )]
1237    #[case::minute(
1238    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1239    BarAggregation::Minute,
1240    5,
1241    Utc.with_ymd_and_hms(2024, 7, 21, 12, 30, 0).unwrap()
1242    )]
1243    #[case::hour(
1244    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1245    BarAggregation::Hour,
1246    1,
1247    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
1248    )]
1249    #[case::hour(
1250    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1251    BarAggregation::Hour,
1252    2,
1253    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
1254    )]
1255    #[case::day(
1256    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
1257    BarAggregation::Day,
1258    1,
1259    Utc.with_ymd_and_hms(2024, 7, 21, 0, 0, 0).unwrap()
1260    )]
1261    fn test_get_time_bar_start(
1262        #[case] now: DateTime<Utc>,
1263        #[case] aggregation: BarAggregation,
1264        #[case] step: usize,
1265        #[case] expected: DateTime<Utc>,
1266    ) {
1267        let bar_type = BarType::Standard {
1268            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
1269            spec: BarSpecification::new(step, aggregation, PriceType::Last),
1270            aggregation_source: AggregationSource::Internal,
1271        };
1272
1273        let start_time = get_time_bar_start(now, &bar_type, None);
1274        assert_eq!(start_time, expected);
1275    }
1276
1277    #[rstest]
1278    fn test_bar_spec_string_reprs() {
1279        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1280        assert_eq!(bar_spec.to_string(), "1-MINUTE-BID");
1281        assert_eq!(format!("{bar_spec}"), "1-MINUTE-BID");
1282    }
1283
1284    #[rstest]
1285    fn test_bar_type_parse_valid() {
1286        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
1287        let bar_type = BarType::from(input);
1288
1289        assert_eq!(
1290            bar_type.instrument_id(),
1291            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1292        );
1293        assert_eq!(
1294            bar_type.spec(),
1295            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1296        );
1297        assert_eq!(bar_type.aggregation_source(), AggregationSource::External);
1298        assert_eq!(bar_type, BarType::from(input));
1299    }
1300
1301    #[rstest]
1302    #[case("BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL", true, false)]
1303    #[case("BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-INTERNAL", false, true)]
1304    #[case(
1305        "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL",
1306        false,
1307        true
1308    )]
1309    #[case(
1310        "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-EXTERNAL@1-MINUTE-INTERNAL",
1311        true,
1312        false
1313    )]
1314    fn test_bar_type_aggregation_source_predicates(
1315        #[case] input: &str,
1316        #[case] expected_external: bool,
1317        #[case] expected_internal: bool,
1318    ) {
1319        let bar_type = BarType::from(input);
1320        assert_eq!(bar_type.is_externally_aggregated(), expected_external);
1321        assert_eq!(bar_type.is_internally_aggregated(), expected_internal);
1322    }
1323
1324    #[rstest]
1325    fn test_bar_type_composite_aggregation_source_predicates_track_inner() {
1326        let bar_type =
1327            BarType::from("BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL");
1328
1329        assert!(bar_type.is_internally_aggregated());
1330        assert!(!bar_type.is_externally_aggregated());
1331
1332        let composite = bar_type.composite();
1333        assert!(composite.is_externally_aggregated());
1334        assert!(!composite.is_internally_aggregated());
1335    }
1336
1337    #[rstest]
1338    fn test_bar_type_from_str_with_utf8_symbol() {
1339        let non_ascii_instrument = "TËST-PÉRP.BINANCE";
1340        let non_ascii_bar_type = "TËST-PÉRP.BINANCE-1-MINUTE-LAST-EXTERNAL";
1341
1342        let bar_type = BarType::from_str(non_ascii_bar_type).unwrap();
1343
1344        assert_eq!(
1345            bar_type.instrument_id(),
1346            InstrumentId::from_str(non_ascii_instrument).unwrap()
1347        );
1348        assert_eq!(
1349            bar_type.spec(),
1350            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1351        );
1352        assert_eq!(bar_type.aggregation_source(), AggregationSource::External);
1353        assert_eq!(bar_type.to_string(), non_ascii_bar_type);
1354    }
1355
1356    #[rstest]
1357    fn test_bar_type_composite_parse_valid() {
1358        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL";
1359        let bar_type = BarType::from(input);
1360        let standard = bar_type.standard();
1361
1362        assert_eq!(
1363            bar_type.instrument_id(),
1364            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1365        );
1366        assert_eq!(
1367            bar_type.spec(),
1368            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
1369        );
1370        assert_eq!(bar_type.aggregation_source(), AggregationSource::Internal);
1371        assert_eq!(bar_type, BarType::from(input));
1372        assert!(bar_type.is_composite());
1373
1374        assert_eq!(
1375            standard.instrument_id(),
1376            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1377        );
1378        assert_eq!(
1379            standard.spec(),
1380            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
1381        );
1382        assert_eq!(standard.aggregation_source(), AggregationSource::Internal);
1383        assert!(standard.is_standard());
1384
1385        let composite = bar_type.composite();
1386        let composite_input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
1387
1388        assert_eq!(
1389            composite.instrument_id(),
1390            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1391        );
1392        assert_eq!(
1393            composite.spec(),
1394            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last,)
1395        );
1396        assert_eq!(composite.aggregation_source(), AggregationSource::External);
1397        assert_eq!(composite, BarType::from(composite_input));
1398        assert!(composite.is_standard());
1399    }
1400
1401    #[rstest]
1402    fn test_bar_type_parse_invalid_token_pos_0() {
1403        let input = "BTCUSDT-PERP-1-MINUTE-LAST-INTERNAL";
1404        let result = BarType::from_str(input);
1405
1406        assert_eq!(
1407            result.unwrap_err().to_string(),
1408            format!(
1409                "Error parsing `BarType` from '{input}', invalid token: 'BTCUSDT-PERP' at position 0"
1410            )
1411        );
1412    }
1413
1414    #[rstest]
1415    fn test_bar_type_parse_invalid_token_pos_1() {
1416        let input = "BTCUSDT-PERP.BINANCE-INVALID-MINUTE-LAST-INTERNAL";
1417        let result = BarType::from_str(input);
1418
1419        assert_eq!(
1420            result.unwrap_err().to_string(),
1421            format!(
1422                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 1"
1423            )
1424        );
1425    }
1426
1427    #[rstest]
1428    fn test_bar_type_parse_invalid_spec_step() {
1429        let input = "BTCUSDT-PERP.BINANCE-60-MINUTE-LAST-INTERNAL";
1430        let result = BarType::from_str(input);
1431
1432        assert_eq!(
1433            result.unwrap_err().to_string(),
1434            format!("Error parsing `BarType` from '{input}', invalid token: '60' at position 1")
1435        );
1436    }
1437
1438    #[rstest]
1439    fn test_bar_type_parse_invalid_token_pos_2() {
1440        let input = "BTCUSDT-PERP.BINANCE-1-INVALID-LAST-INTERNAL";
1441        let result = BarType::from_str(input);
1442
1443        assert_eq!(
1444            result.unwrap_err().to_string(),
1445            format!(
1446                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 2"
1447            )
1448        );
1449    }
1450
1451    #[rstest]
1452    fn test_bar_type_parse_invalid_token_pos_3() {
1453        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-INVALID-INTERNAL";
1454        let result = BarType::from_str(input);
1455
1456        assert_eq!(
1457            result.unwrap_err().to_string(),
1458            format!(
1459                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 3"
1460            )
1461        );
1462    }
1463
1464    #[rstest]
1465    fn test_bar_type_parse_invalid_token_pos_4() {
1466        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-BID-INVALID";
1467        let result = BarType::from_str(input);
1468
1469        assert!(result.is_err());
1470        assert_eq!(
1471            result.unwrap_err().to_string(),
1472            format!(
1473                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 4"
1474            )
1475        );
1476    }
1477
1478    #[rstest]
1479    fn test_bar_type_parse_invalid_token_pos_5() {
1480        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@INVALID-MINUTE-EXTERNAL";
1481        let result = BarType::from_str(input);
1482
1483        assert!(result.is_err());
1484        assert_eq!(
1485            result.unwrap_err().to_string(),
1486            format!(
1487                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 5"
1488            )
1489        );
1490    }
1491
1492    #[rstest]
1493    fn test_bar_type_parse_invalid_composite_spec_step() {
1494        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@60-MINUTE-EXTERNAL";
1495        let result = BarType::from_str(input);
1496
1497        assert!(result.is_err());
1498        assert_eq!(
1499            result.unwrap_err().to_string(),
1500            format!("Error parsing `BarType` from '{input}', invalid token: '60' at position 5")
1501        );
1502    }
1503
1504    #[rstest]
1505    fn test_bar_type_parse_invalid_token_pos_6() {
1506        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-INVALID-EXTERNAL";
1507        let result = BarType::from_str(input);
1508
1509        assert!(result.is_err());
1510        assert_eq!(
1511            result.unwrap_err().to_string(),
1512            format!(
1513                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 6"
1514            )
1515        );
1516    }
1517
1518    #[rstest]
1519    fn test_bar_type_parse_invalid_token_pos_7() {
1520        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-INVALID";
1521        let result = BarType::from_str(input);
1522
1523        assert!(result.is_err());
1524        assert_eq!(
1525            result.unwrap_err().to_string(),
1526            format!(
1527                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 7"
1528            )
1529        );
1530    }
1531
1532    #[rstest]
1533    fn test_bar_type_equality() {
1534        let instrument_id1 = InstrumentId {
1535            symbol: Symbol::new("AUD/USD"),
1536            venue: Venue::new("SIM"),
1537        };
1538        let instrument_id2 = InstrumentId {
1539            symbol: Symbol::new("GBP/USD"),
1540            venue: Venue::new("SIM"),
1541        };
1542        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1543        let bar_type1 = BarType::Standard {
1544            instrument_id: instrument_id1,
1545            spec: bar_spec,
1546            aggregation_source: AggregationSource::External,
1547        };
1548        let bar_type2 = BarType::Standard {
1549            instrument_id: instrument_id1,
1550            spec: bar_spec,
1551            aggregation_source: AggregationSource::External,
1552        };
1553        let bar_type3 = BarType::Standard {
1554            instrument_id: instrument_id2,
1555            spec: bar_spec,
1556            aggregation_source: AggregationSource::External,
1557        };
1558        assert_eq!(bar_type1, bar_type1);
1559        assert_eq!(bar_type1, bar_type2);
1560        assert_ne!(bar_type1, bar_type3);
1561    }
1562
1563    #[rstest]
1564    fn test_bar_type_id_spec_key_ignores_aggregation_source() {
1565        let bar_type_external = BarType::from_str("ESM4.XCME-1-MINUTE-LAST-EXTERNAL").unwrap();
1566        let bar_type_internal = BarType::from_str("ESM4.XCME-1-MINUTE-LAST-INTERNAL").unwrap();
1567
1568        // Full equality should differ
1569        assert_ne!(bar_type_external, bar_type_internal);
1570
1571        // id_spec_key should be the same
1572        assert_eq!(
1573            bar_type_external.id_spec_key(),
1574            bar_type_internal.id_spec_key()
1575        );
1576
1577        // Verify key components
1578        let (instrument_id, spec) = bar_type_external.id_spec_key();
1579        assert_eq!(instrument_id, bar_type_external.instrument_id());
1580        assert_eq!(spec, bar_type_external.spec());
1581    }
1582
1583    #[rstest]
1584    fn test_bar_type_comparison() {
1585        let instrument_id1 = InstrumentId {
1586            symbol: Symbol::new("AUD/USD"),
1587            venue: Venue::new("SIM"),
1588        };
1589
1590        let instrument_id2 = InstrumentId {
1591            symbol: Symbol::new("GBP/USD"),
1592            venue: Venue::new("SIM"),
1593        };
1594        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1595        let bar_spec2 = BarSpecification::new(2, BarAggregation::Minute, PriceType::Bid);
1596        let bar_type1 = BarType::Standard {
1597            instrument_id: instrument_id1,
1598            spec: bar_spec,
1599            aggregation_source: AggregationSource::External,
1600        };
1601        let bar_type2 = BarType::Standard {
1602            instrument_id: instrument_id1,
1603            spec: bar_spec,
1604            aggregation_source: AggregationSource::External,
1605        };
1606        let bar_type3 = BarType::Standard {
1607            instrument_id: instrument_id2,
1608            spec: bar_spec,
1609            aggregation_source: AggregationSource::External,
1610        };
1611        let bar_type4 = BarType::Composite {
1612            instrument_id: instrument_id2,
1613            spec: bar_spec2,
1614            aggregation_source: AggregationSource::Internal,
1615
1616            composite_step: 1,
1617            composite_aggregation: BarAggregation::Minute,
1618            composite_aggregation_source: AggregationSource::External,
1619        };
1620
1621        assert!(bar_type1 <= bar_type2);
1622        assert!(bar_type1 < bar_type3);
1623        assert!(bar_type3 > bar_type1);
1624        assert!(bar_type3 >= bar_type1);
1625        assert!(bar_type4 >= bar_type1);
1626    }
1627
1628    #[rstest]
1629    fn test_bar_new() {
1630        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1631        let open = Price::from("100.0");
1632        let high = Price::from("105.0");
1633        let low = Price::from("95.0");
1634        let close = Price::from("102.0");
1635        let volume = Quantity::from("1000");
1636        let ts_event = UnixNanos::from(1_000_000);
1637        let ts_init = UnixNanos::from(2_000_000);
1638
1639        let bar = Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init);
1640
1641        assert_eq!(bar.bar_type, bar_type);
1642        assert_eq!(bar.open, open);
1643        assert_eq!(bar.high, high);
1644        assert_eq!(bar.low, low);
1645        assert_eq!(bar.close, close);
1646        assert_eq!(bar.volume, volume);
1647        assert_eq!(bar.ts_event, ts_event);
1648        assert_eq!(bar.ts_init, ts_init);
1649    }
1650
1651    #[rstest]
1652    #[case("100.0", "90.0", "95.0", "92.0")] // high < open
1653    #[case("100.0", "105.0", "110.0", "102.0")] // high < low
1654    #[case("100.0", "105.0", "95.0", "110.0")] // high < close
1655    #[case("100.0", "105.0", "95.0", "90.0")] // low > close
1656    #[case("100.0", "110.0", "105.0", "108.0")] // low > open
1657    #[case("100.0", "90.0", "110.0", "120.0")] // high < open, high < close, low > close
1658    fn test_bar_new_checked_conditions(
1659        #[case] open: &str,
1660        #[case] high: &str,
1661        #[case] low: &str,
1662        #[case] close: &str,
1663    ) {
1664        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1665        let open = Price::from(open);
1666        let high = Price::from(high);
1667        let low = Price::from(low);
1668        let close = Price::from(close);
1669        let volume = Quantity::from("1000");
1670        let ts_event = UnixNanos::from(1_000_000);
1671        let ts_init = UnixNanos::from(2_000_000);
1672
1673        let result = Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init);
1674
1675        assert!(result.is_err());
1676    }
1677
1678    #[rstest]
1679    fn test_bar_equality() {
1680        let instrument_id = InstrumentId {
1681            symbol: Symbol::new("AUDUSD"),
1682            venue: Venue::new("SIM"),
1683        };
1684        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1685        let bar_type = BarType::Standard {
1686            instrument_id,
1687            spec: bar_spec,
1688            aggregation_source: AggregationSource::External,
1689        };
1690        let bar1 = Bar {
1691            bar_type,
1692            open: Price::from("1.00001"),
1693            high: Price::from("1.00004"),
1694            low: Price::from("1.00002"),
1695            close: Price::from("1.00003"),
1696            volume: Quantity::from("100000"),
1697            ts_event: UnixNanos::default(),
1698            ts_init: UnixNanos::from(1),
1699        };
1700
1701        let bar2 = Bar {
1702            bar_type,
1703            open: Price::from("1.00000"),
1704            high: Price::from("1.00004"),
1705            low: Price::from("1.00002"),
1706            close: Price::from("1.00003"),
1707            volume: Quantity::from("100000"),
1708            ts_event: UnixNanos::default(),
1709            ts_init: UnixNanos::from(1),
1710        };
1711        assert_eq!(bar1, bar1);
1712        assert_ne!(bar1, bar2);
1713    }
1714
1715    #[rstest]
1716    fn test_json_serialization() {
1717        let bar = Bar::default();
1718        let serialized = bar.to_json_bytes().unwrap();
1719        let deserialized = Bar::from_json_bytes(serialized.as_ref()).unwrap();
1720        assert_eq!(deserialized, bar);
1721    }
1722
1723    #[rstest]
1724    fn test_msgpack_serialization() {
1725        let bar = Bar::default();
1726        let serialized = bar.to_msgpack_bytes().unwrap();
1727        let deserialized = Bar::from_msgpack_bytes(serialized.as_ref()).unwrap();
1728        assert_eq!(deserialized, bar);
1729    }
1730}