Skip to main content

nautilus_databento/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    env,
18    path::{Path, PathBuf},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use databento::dbn::{self, InstrumentDefMsg};
24use dbn::{
25    Publisher,
26    decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32    identifiers::{InstrumentId, Symbol, Venue},
33    instruments::{Instrument, InstrumentAny},
34};
35
36use super::{
37    decode::{
38        decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg,
39        is_supported_stat_type,
40    },
41    symbology::decode_nautilus_instrument_id,
42    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
43};
44use crate::{
45    common::{build_publisher_venue_map, load_publishers},
46    decode::{DatabentoDecodeConfig, decode_instrument_def_msg},
47    symbology::MetadataCache,
48};
49
50/// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
51///
52/// # Supported Schemas
53///  - `MBO` -> `OrderBookDelta`
54///  - `MBP_1` -> `(QuoteTick, Option<TradeTick>)`
55///  - `MBP_10` -> `OrderBookDepth10`
56///  - `BBO_1S` -> `QuoteTick`
57///  - `BBO_1M` -> `QuoteTick`
58///  - `CMBP_1` -> `(QuoteTick, Option<TradeTick>)`
59///  - `CBBO_1S` -> `QuoteTick`
60///  - `CBBO_1M` -> `QuoteTick`
61///  - `TCBBO` -> `(QuoteTick, TradeTick)`
62///  - `TBBO` -> `(QuoteTick, TradeTick)`
63///  - `TRADES` -> `TradeTick`
64///  - `OHLCV_1S` -> `Bar`
65///  - `OHLCV_1M` -> `Bar`
66///  - `OHLCV_1H` -> `Bar`
67///  - `OHLCV_1D` -> `Bar`
68///  - `OHLCV_EOD` -> `Bar`
69///  - `DEFINITION` -> `Instrument`
70///  - `IMBALANCE` -> `DatabentoImbalance`
71///  - `STATISTICS` -> `DatabentoStatistics`
72///  - `STATUS` -> `InstrumentStatus`
73///
74/// # References
75///
76/// <https://databento.com/docs/schemas-and-data-formats>
77#[cfg_attr(
78    feature = "python",
79    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
80)]
81#[cfg_attr(
82    feature = "python",
83    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.adapters.databento")
84)]
85#[derive(Debug)]
86pub struct DatabentoDataLoader {
87    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
88    venue_dataset_map: IndexMap<Venue, Dataset>,
89    publisher_venue_map: IndexMap<PublisherId, Venue>,
90    symbol_venue_map: AHashMap<Symbol, Venue>,
91    price_precisions: AHashMap<Symbol, u8>,
92}
93
94impl DatabentoDataLoader {
95    /// Creates a new [`DatabentoDataLoader`] instance.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if locating or loading publishers data fails.
100    pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
101        let mut loader = Self {
102            publishers_map: IndexMap::new(),
103            venue_dataset_map: IndexMap::new(),
104            publisher_venue_map: IndexMap::new(),
105            symbol_venue_map: AHashMap::new(),
106            price_precisions: AHashMap::new(),
107        };
108
109        // Load publishers
110        let publishers_filepath = if let Some(p) = publishers_filepath {
111            p
112        } else {
113            // Use built-in publishers path
114            let mut exe_path = env::current_exe()?;
115            exe_path.pop();
116            exe_path.push("publishers.json");
117            exe_path
118        };
119
120        loader
121            .load_publishers(publishers_filepath)
122            .context("error loading publishers.json")?;
123
124        Ok(loader)
125    }
126
127    /// Load the publishers data from the file at the given `filepath`.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the file cannot be read or parsed as JSON.
132    pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
133        let publishers = load_publishers(filepath)?;
134
135        self.publishers_map = publishers
136            .iter()
137            .cloned()
138            .map(|p| (p.publisher_id, p))
139            .collect();
140
141        let mut venue_dataset_map = IndexMap::new();
142
143        // Only insert a dataset if the venue key is not already in the map
144        for publisher in &publishers {
145            let venue = Venue::from(publisher.venue.as_str());
146            let dataset = Dataset::from(publisher.dataset.as_str());
147            venue_dataset_map.entry(venue).or_insert(dataset);
148        }
149
150        self.venue_dataset_map = venue_dataset_map;
151        apply_default_venue_dataset_mappings(&mut self.venue_dataset_map);
152
153        self.publisher_venue_map = build_publisher_venue_map(&publishers);
154
155        Ok(())
156    }
157
158    /// Returns the internal Databento publishers currently held by the loader.
159    #[must_use]
160    pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
161        &self.publishers_map
162    }
163
164    /// Sets the `venue` to map to the given `dataset`.
165    pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
166        _ = self.venue_dataset_map.insert(venue, dataset);
167    }
168
169    /// Returns the dataset which matches the given `venue` (if found).
170    #[must_use]
171    pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
172        self.venue_dataset_map.get(venue)
173    }
174
175    /// Returns the venue which matches the given `publisher_id` (if found).
176    #[must_use]
177    pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
178        self.publisher_venue_map.get(&publisher_id)
179    }
180
181    /// Caches a `price_precision` for the given `symbol`.
182    ///
183    /// When market data is read without an explicit `price_precision` argument,
184    /// the loader resolves precision per record from this cache. Definitions
185    /// loaded via [`Self::load_instruments`] are inserted automatically.
186    pub fn set_price_precision(&mut self, symbol: Symbol, price_precision: u8) {
187        self.price_precisions.insert(symbol, price_precision);
188    }
189
190    /// Returns the cached price precisions keyed by symbol.
191    #[must_use]
192    pub const fn get_price_precisions(&self) -> &AHashMap<Symbol, u8> {
193        &self.price_precisions
194    }
195
196    /// Resolves a price precision for the given `instrument_id`.
197    ///
198    /// Resolution order:
199    /// 1. The explicit `price_precision` argument (if `Some`).
200    /// 2. The cached precision for the instrument's symbol.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error when no precision is available.
205    fn resolve_price_precision(
206        &self,
207        instrument_id: &InstrumentId,
208        price_precision: Option<u8>,
209    ) -> anyhow::Result<u8> {
210        if let Some(precision) = price_precision {
211            return Ok(precision);
212        }
213
214        self.price_precisions
215            .get(&instrument_id.symbol)
216            .copied()
217            .ok_or_else(|| {
218                anyhow::anyhow!(
219                    "Could not resolve `price_precision` for {instrument_id}: \
220                     pass `price_precision` explicitly, call `set_price_precision`, \
221                     or load the instrument definitions first via `load_instruments`"
222                )
223            })
224    }
225
226    /// Returns the schema for the given `filepath`.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the file cannot be decoded or metadata retrieval fails.
231    pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
232        let decoder = Decoder::from_zstd_file(filepath)?;
233        let metadata = decoder.metadata();
234        Ok(metadata.schema.map(|schema| schema.to_string()))
235    }
236
237    /// Reads instrument definition records from a DBN file.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if decoding the definition records fails.
242    pub fn read_definition_records<'a>(
243        &'a mut self,
244        filepath: &Path,
245        use_exchange_as_venue: bool,
246        decode_config: Option<&'a DatabentoDecodeConfig>,
247    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + 'a> {
248        let decoder = Decoder::from_zstd_file(filepath)?;
249        let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
250
251        // Loop over skipped records (Ok(None)) so one unsupported class does not
252        // terminate the stream
253        Ok(std::iter::from_fn(move || {
254            loop {
255                let advance = dbn_stream
256                    .advance()
257                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"));
258                if let Err(e) = advance {
259                    return Some(Err(e));
260                }
261
262                let rec = dbn_stream.get()?;
263
264                let result: anyhow::Result<Option<InstrumentAny>> = (|| {
265                    let record = dbn::RecordRef::from(rec);
266                    let msg = record
267                        .get::<InstrumentDefMsg>()
268                        .ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
269
270                    let raw_symbol = rec
271                        .raw_symbol()
272                        .map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
273                    let symbol = Symbol::from(raw_symbol);
274
275                    let publisher = rec
276                        .hd
277                        .publisher()
278                        .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
279                    let venue = match publisher {
280                        Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
281                            let exchange = rec.exchange().map_err(|e| {
282                                anyhow::anyhow!("Missing `exchange` for record: {e}")
283                            })?;
284                            let venue = Venue::from_code(exchange).map_err(|e| {
285                                anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
286                            })?;
287                            self.symbol_venue_map.insert(symbol, venue);
288                            venue
289                        }
290                        _ => *self
291                            .publisher_venue_map
292                            .get(&msg.hd.publisher_id)
293                            .ok_or_else(|| {
294                                anyhow::anyhow!(
295                                    "Venue not found for publisher_id {}",
296                                    msg.hd.publisher_id
297                                )
298                            })?,
299                    };
300                    let instrument_id = InstrumentId::new(symbol, venue);
301                    let ts_init = msg.ts_recv.into();
302
303                    decode_instrument_def_msg(rec, instrument_id, Some(ts_init), decode_config)
304                })();
305
306                match result {
307                    Ok(Some(item)) => return Some(Ok(item)),
308                    Ok(None) => {}
309                    Err(e) => return Some(Err(e)),
310                }
311            }
312        }))
313    }
314
315    /// Reads and decodes market data records from a DBN file.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if reading records fails.
320    pub fn read_records<T>(
321        &self,
322        filepath: &Path,
323        instrument_id: Option<InstrumentId>,
324        price_precision: Option<u8>,
325        include_trades: bool,
326        bars_timestamp_on_close: Option<bool>,
327    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
328    where
329        T: dbn::Record + dbn::HasRType + 'static,
330    {
331        let decoder = Decoder::from_zstd_file(filepath)?;
332        let mut metadata_cache = if instrument_id.is_none() {
333            Some(MetadataCache::new(decoder.metadata().clone()))
334        } else {
335            None
336        };
337        let mut dbn_stream = decoder.decode_stream::<T>();
338        let fixed_instrument_id = instrument_id.is_some();
339        let mut fixed_price_precision = price_precision;
340
341        Ok(std::iter::from_fn(move || {
342            let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
343                dbn_stream
344                    .advance()
345                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
346
347                if let Some(rec) = dbn_stream.get() {
348                    let record = dbn::RecordRef::from(rec);
349                    let instrument_id = self
350                        .resolve_record_instrument_id(&record, instrument_id, &mut metadata_cache)
351                        .context("failed to decode instrument id")?;
352                    let resolved_precision = self.resolve_stream_price_precision(
353                        &instrument_id,
354                        fixed_instrument_id,
355                        &mut fixed_price_precision,
356                    )?;
357                    let (item1, item2) = decode_record(
358                        &record,
359                        instrument_id,
360                        resolved_precision,
361                        None,
362                        include_trades,
363                        bars_timestamp_on_close.unwrap_or(true),
364                    )?;
365                    Ok(Some((item1, item2)))
366                } else {
367                    Ok(None)
368                }
369            })();
370
371            match result {
372                Ok(Some(v)) => Some(Ok(v)),
373                Ok(None) => None,
374                Err(e) => Some(Err(e)),
375            }
376        }))
377    }
378
379    /// Loads all instrument definitions from a DBN file.
380    ///
381    /// When `skip_on_error` is true, instruments that fail to decode are logged
382    /// as warnings and skipped. When false (default), any decode error is propagated.
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if loading instruments fails.
387    pub fn load_instruments(
388        &mut self,
389        filepath: &Path,
390        use_exchange_as_venue: bool,
391        skip_on_error: bool,
392        decode_config: Option<&DatabentoDecodeConfig>,
393    ) -> anyhow::Result<Vec<InstrumentAny>> {
394        let instruments = if skip_on_error {
395            let mut collected = Vec::new();
396
397            for result in
398                self.read_definition_records(filepath, use_exchange_as_venue, decode_config)?
399            {
400                match result {
401                    Ok(instrument) => collected.push(instrument),
402                    Err(e) => log::warn!("Skipping instrument: {e}"),
403                }
404            }
405            collected
406        } else {
407            self.read_definition_records(filepath, use_exchange_as_venue, decode_config)?
408                .collect::<Result<Vec<_>, _>>()?
409        };
410
411        for instrument in &instruments {
412            self.price_precisions
413                .insert(instrument.id().symbol, instrument.price_precision());
414        }
415
416        Ok(instruments)
417    }
418
419    /// Loads order book delta messages from a DBN MBO schema file.
420    ///
421    /// Cannot include trades.
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if loading order book deltas fails.
426    pub fn load_order_book_deltas(
427        &self,
428        filepath: &Path,
429        instrument_id: Option<InstrumentId>,
430        price_precision: Option<u8>,
431    ) -> anyhow::Result<Vec<OrderBookDelta>> {
432        self.read_order_book_deltas(filepath, instrument_id, price_precision)?
433            .collect()
434    }
435
436    /// Reads order book delta messages from a DBN MBO schema file without collecting them.
437    ///
438    /// Cannot include trades.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if opening or decoding order book deltas fails.
443    pub fn read_order_book_deltas(
444        &self,
445        filepath: &Path,
446        instrument_id: Option<InstrumentId>,
447        price_precision: Option<u8>,
448    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<OrderBookDelta>> + '_> {
449        let records = self.read_records::<dbn::MboMsg>(
450            filepath,
451            instrument_id,
452            price_precision,
453            false,
454            None,
455        )?;
456
457        Ok(records.filter_map(|result| match result {
458            Ok((Some(item1), _)) => {
459                if let Data::Delta(delta) = item1 {
460                    Some(Ok(delta))
461                } else {
462                    None
463                }
464            }
465            Ok((None, _)) => None,
466            Err(e) => Some(Err(e)),
467        }))
468    }
469
470    /// Loads order book depth10 snapshots from a DBN MBP-10 schema file.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if loading order book depth10 fails.
475    pub fn load_order_book_depth10(
476        &self,
477        filepath: &Path,
478        instrument_id: Option<InstrumentId>,
479        price_precision: Option<u8>,
480    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
481        self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
482            .filter_map(|result| match result {
483                Ok((Some(item1), _)) => {
484                    if let Data::Depth10(depth) = item1 {
485                        Some(Ok(*depth))
486                    } else {
487                        None
488                    }
489                }
490                Ok((None, _)) => None,
491                Err(e) => Some(Err(e)),
492            })
493            .collect()
494    }
495
496    /// Loads quote tick messages from a DBN MBP-1 or TBBO schema file.
497    ///
498    /// # Errors
499    ///
500    /// Returns an error if loading quotes fails.
501    pub fn load_quotes(
502        &self,
503        filepath: &Path,
504        instrument_id: Option<InstrumentId>,
505        price_precision: Option<u8>,
506    ) -> anyhow::Result<Vec<QuoteTick>> {
507        self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
508            .filter_map(|result| match result {
509                Ok((Some(item1), _)) => {
510                    if let Data::Quote(quote) = item1 {
511                        Some(Ok(quote))
512                    } else {
513                        None
514                    }
515                }
516                Ok((None, _)) => None,
517                Err(e) => Some(Err(e)),
518            })
519            .collect()
520    }
521
522    /// Loads best bid/offer quote messages from a DBN BBO schema file.
523    ///
524    /// # Errors
525    ///
526    /// Returns an error if loading BBO quotes fails.
527    pub fn load_bbo_quotes(
528        &self,
529        filepath: &Path,
530        instrument_id: Option<InstrumentId>,
531        price_precision: Option<u8>,
532    ) -> anyhow::Result<Vec<QuoteTick>> {
533        self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
534            .filter_map(|result| match result {
535                Ok((Some(item1), _)) => {
536                    if let Data::Quote(quote) = item1 {
537                        Some(Ok(quote))
538                    } else {
539                        None
540                    }
541                }
542                Ok((None, _)) => None,
543                Err(e) => Some(Err(e)),
544            })
545            .collect()
546    }
547
548    /// Loads consolidated MBP-1 quote messages from a DBN CMBP-1 schema file.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if loading consolidated MBP-1 quotes fails.
553    pub fn load_cmbp_quotes(
554        &self,
555        filepath: &Path,
556        instrument_id: Option<InstrumentId>,
557        price_precision: Option<u8>,
558    ) -> anyhow::Result<Vec<QuoteTick>> {
559        self.read_records::<dbn::Cmbp1Msg>(filepath, instrument_id, price_precision, false, None)?
560            .filter_map(|result| match result {
561                Ok((Some(item1), _)) => {
562                    if let Data::Quote(quote) = item1 {
563                        Some(Ok(quote))
564                    } else {
565                        None
566                    }
567                }
568                Ok((None, _)) => None,
569                Err(e) => Some(Err(e)),
570            })
571            .collect()
572    }
573
574    /// Loads consolidated best bid/offer quote messages from a DBN CBBO schema file.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if loading consolidated BBO quotes fails.
579    pub fn load_cbbo_quotes(
580        &self,
581        filepath: &Path,
582        instrument_id: Option<InstrumentId>,
583        price_precision: Option<u8>,
584    ) -> anyhow::Result<Vec<QuoteTick>> {
585        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
586            .filter_map(|result| match result {
587                Ok((Some(item1), _)) => {
588                    if let Data::Quote(quote) = item1 {
589                        Some(Ok(quote))
590                    } else {
591                        None
592                    }
593                }
594                Ok((None, _)) => None,
595                Err(e) => Some(Err(e)),
596            })
597            .collect()
598    }
599
600    /// Loads trade messages from a DBN TBBO schema file.
601    ///
602    /// # Errors
603    ///
604    /// Returns an error if loading TBBO trades fails.
605    pub fn load_tbbo_trades(
606        &self,
607        filepath: &Path,
608        instrument_id: Option<InstrumentId>,
609        price_precision: Option<u8>,
610    ) -> anyhow::Result<Vec<TradeTick>> {
611        self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, true, None)?
612            .filter_map(|result| match result {
613                Ok((_, maybe_item2)) => {
614                    if let Some(Data::Trade(trade)) = maybe_item2 {
615                        Some(Ok(trade))
616                    } else {
617                        None
618                    }
619                }
620                Err(e) => Some(Err(e)),
621            })
622            .collect()
623    }
624
625    /// Loads trade messages from a DBN TCBBO schema file.
626    ///
627    /// # Errors
628    ///
629    /// Returns an error if loading TCBBO trades fails.
630    pub fn load_tcbbo_trades(
631        &self,
632        filepath: &Path,
633        instrument_id: Option<InstrumentId>,
634        price_precision: Option<u8>,
635    ) -> anyhow::Result<Vec<TradeTick>> {
636        self.read_records::<dbn::TcbboMsg>(filepath, instrument_id, price_precision, true, None)?
637            .filter_map(|result| match result {
638                Ok((_, maybe_item2)) => {
639                    if let Some(Data::Trade(trade)) = maybe_item2 {
640                        Some(Ok(trade))
641                    } else {
642                        None
643                    }
644                }
645                Err(e) => Some(Err(e)),
646            })
647            .collect()
648    }
649
650    /// Loads trade messages from a DBN TRADES schema file.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if loading trades fails.
655    pub fn load_trades(
656        &self,
657        filepath: &Path,
658        instrument_id: Option<InstrumentId>,
659        price_precision: Option<u8>,
660    ) -> anyhow::Result<Vec<TradeTick>> {
661        self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
662            .filter_map(|result| match result {
663                Ok((Some(item1), _)) => {
664                    if let Data::Trade(trade) = item1 {
665                        Some(Ok(trade))
666                    } else {
667                        None
668                    }
669                }
670                Ok((None, _)) => None,
671                Err(e) => Some(Err(e)),
672            })
673            .collect()
674    }
675
676    /// Loads OHLCV bar messages from a DBN OHLCV schema file.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if loading bars fails.
681    pub fn load_bars(
682        &self,
683        filepath: &Path,
684        instrument_id: Option<InstrumentId>,
685        price_precision: Option<u8>,
686        timestamp_on_close: Option<bool>,
687    ) -> anyhow::Result<Vec<Bar>> {
688        self.read_records::<dbn::OhlcvMsg>(
689            filepath,
690            instrument_id,
691            price_precision,
692            false,
693            timestamp_on_close,
694        )?
695        .filter_map(|result| match result {
696            Ok((Some(item1), _)) => {
697                if let Data::Bar(bar) = item1 {
698                    Some(Ok(bar))
699                } else {
700                    None
701                }
702            }
703            Ok((None, _)) => None,
704            Err(e) => Some(Err(e)),
705        })
706        .collect()
707    }
708
709    /// Loads instrument status messages from a DBN STATUS schema file.
710    ///
711    /// # Errors
712    ///
713    /// Returns an error if loading status records fails.
714    pub fn load_status_records<T>(
715        &self,
716        filepath: &Path,
717        instrument_id: Option<InstrumentId>,
718    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
719    where
720        T: dbn::Record + dbn::HasRType + 'static,
721    {
722        let decoder = Decoder::from_zstd_file(filepath)?;
723        let mut metadata_cache = if instrument_id.is_none() {
724            Some(MetadataCache::new(decoder.metadata().clone()))
725        } else {
726            None
727        };
728        let mut dbn_stream = decoder.decode_stream::<T>();
729
730        Ok(std::iter::from_fn(move || {
731            if let Err(e) = dbn_stream.advance() {
732                return Some(Err(e.into()));
733            }
734
735            match dbn_stream.get() {
736                Some(rec) => {
737                    let record = dbn::RecordRef::from(rec);
738                    let instrument_id = match self.resolve_record_instrument_id(
739                        &record,
740                        instrument_id,
741                        &mut metadata_cache,
742                    ) {
743                        Ok(id) => id,
744                        Err(e) => return Some(Err(e)),
745                    };
746
747                    let msg = match record.get::<dbn::StatusMsg>() {
748                        Some(m) => m,
749                        None => return Some(Err(anyhow::anyhow!("Invalid `StatusMsg`"))),
750                    };
751                    let ts_init = msg.ts_recv.into();
752
753                    match decode_status_msg(msg, instrument_id, Some(ts_init)) {
754                        Ok(data) => Some(Ok(data)),
755                        Err(e) => Some(Err(e)),
756                    }
757                }
758                None => None,
759            }
760        }))
761    }
762
763    /// Reads imbalance messages from a DBN IMBALANCE schema file.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if reading imbalance records fails.
768    pub fn read_imbalance_records<T>(
769        &self,
770        filepath: &Path,
771        instrument_id: Option<InstrumentId>,
772        price_precision: Option<u8>,
773    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
774    where
775        T: dbn::Record + dbn::HasRType + 'static,
776    {
777        let decoder = Decoder::from_zstd_file(filepath)?;
778        let mut metadata_cache = if instrument_id.is_none() {
779            Some(MetadataCache::new(decoder.metadata().clone()))
780        } else {
781            None
782        };
783        let mut dbn_stream = decoder.decode_stream::<T>();
784        let fixed_instrument_id = instrument_id.is_some();
785        let mut fixed_price_precision = price_precision;
786
787        Ok(std::iter::from_fn(move || {
788            if let Err(e) = dbn_stream.advance() {
789                return Some(Err(e.into()));
790            }
791
792            match dbn_stream.get() {
793                Some(rec) => {
794                    let record = dbn::RecordRef::from(rec);
795                    let instrument_id = match self.resolve_record_instrument_id(
796                        &record,
797                        instrument_id,
798                        &mut metadata_cache,
799                    ) {
800                        Ok(id) => id,
801                        Err(e) => return Some(Err(e)),
802                    };
803                    let resolved_precision = match self.resolve_stream_price_precision(
804                        &instrument_id,
805                        fixed_instrument_id,
806                        &mut fixed_price_precision,
807                    ) {
808                        Ok(p) => p,
809                        Err(e) => return Some(Err(e)),
810                    };
811
812                    let msg = match record.get::<dbn::ImbalanceMsg>() {
813                        Some(m) => m,
814                        None => return Some(Err(anyhow::anyhow!("Invalid `ImbalanceMsg`"))),
815                    };
816                    let ts_init = msg.ts_recv.into();
817
818                    match decode_imbalance_msg(
819                        msg,
820                        instrument_id,
821                        resolved_precision,
822                        Some(ts_init),
823                    ) {
824                        Ok(data) => Some(Ok(data)),
825                        Err(e) => Some(Err(e)),
826                    }
827                }
828                None => None,
829            }
830        }))
831    }
832
833    /// Reads statistics messages from a DBN STATISTICS schema file.
834    ///
835    /// # Errors
836    ///
837    /// Returns an error if reading statistics records fails.
838    pub fn read_statistics_records<T>(
839        &self,
840        filepath: &Path,
841        instrument_id: Option<InstrumentId>,
842        price_precision: Option<u8>,
843    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
844    where
845        T: dbn::Record + dbn::HasRType + 'static,
846    {
847        let decoder = Decoder::from_zstd_file(filepath)?;
848        let mut metadata_cache = if instrument_id.is_none() {
849            Some(MetadataCache::new(decoder.metadata().clone()))
850        } else {
851            None
852        };
853        let mut dbn_stream = decoder.decode_stream::<T>();
854        let fixed_instrument_id = instrument_id.is_some();
855        let mut fixed_price_precision = price_precision;
856
857        // Loop over skipped records so one unsupported stat_type does not terminate
858        // the stream; precheck before precision resolution.
859        Ok(std::iter::from_fn(move || {
860            loop {
861                if let Err(e) = dbn_stream.advance() {
862                    return Some(Err(e.into()));
863                }
864
865                let rec = dbn_stream.get()?;
866                let record = dbn::RecordRef::from(rec);
867                let msg = match record.get::<dbn::StatMsg>() {
868                    Some(m) => m,
869                    None => return Some(Err(anyhow::anyhow!("Invalid `StatMsg`"))),
870                };
871
872                if !is_supported_stat_type(msg.stat_type) {
873                    log::warn!("Skipping unsupported `stat_type` {}", msg.stat_type);
874                    continue;
875                }
876
877                let instrument_id = match self.resolve_record_instrument_id(
878                    &record,
879                    instrument_id,
880                    &mut metadata_cache,
881                ) {
882                    Ok(id) => id,
883                    Err(e) => return Some(Err(e)),
884                };
885                let resolved_precision = match self.resolve_stream_price_precision(
886                    &instrument_id,
887                    fixed_instrument_id,
888                    &mut fixed_price_precision,
889                ) {
890                    Ok(p) => p,
891                    Err(e) => return Some(Err(e)),
892                };
893                let ts_init = msg.ts_recv.into();
894
895                match decode_statistics_msg(msg, instrument_id, resolved_precision, Some(ts_init)) {
896                    Ok(Some(data)) => return Some(Ok(data)),
897                    Ok(None) => {}
898                    Err(e) => return Some(Err(e)),
899                }
900            }
901        }))
902    }
903
904    fn resolve_record_instrument_id(
905        &self,
906        record: &dbn::RecordRef,
907        instrument_id: Option<InstrumentId>,
908        metadata_cache: &mut Option<MetadataCache>,
909    ) -> anyhow::Result<InstrumentId> {
910        if let Some(instrument_id) = instrument_id {
911            return Ok(instrument_id);
912        }
913
914        let metadata_cache = metadata_cache
915            .as_mut()
916            .ok_or_else(|| anyhow::anyhow!("missing metadata cache for dynamic instrument id"))?;
917
918        decode_nautilus_instrument_id(
919            record,
920            metadata_cache,
921            &self.publisher_venue_map,
922            &self.symbol_venue_map,
923        )
924    }
925
926    fn resolve_stream_price_precision(
927        &self,
928        instrument_id: &InstrumentId,
929        fixed_instrument_id: bool,
930        fixed_price_precision: &mut Option<u8>,
931    ) -> anyhow::Result<u8> {
932        if let Some(precision) = *fixed_price_precision {
933            return Ok(precision);
934        }
935
936        let precision = self.resolve_price_precision(instrument_id, None)?;
937        if fixed_instrument_id {
938            *fixed_price_precision = Some(precision);
939        }
940
941        Ok(precision)
942    }
943}
944
945/// Applies default venue-to-dataset mappings for consolidated Databento feeds.
946/// GLBX.MDP3 covers CME Globex exchange MICs; OPRA.PILLAR covers OPRA option venues;
947/// EQUS.MINI is the consolidated US equities default.
948fn apply_default_venue_dataset_mappings(venue_dataset_map: &mut IndexMap<Venue, Dataset>) {
949    let glbx = Dataset::from("GLBX.MDP3");
950
951    for venue in [
952        Venue::CBCM(),
953        Venue::GLBX(),
954        Venue::NYUM(),
955        Venue::XCBT(),
956        Venue::XCEC(),
957        Venue::XCME(),
958        Venue::XFXS(),
959        Venue::XNYM(),
960    ] {
961        _ = venue_dataset_map.insert(venue, glbx);
962    }
963
964    // publishers.json seeds the consolidated EQUS venue with the unreleased EQUS.PLUS,
965    // so pin it to EQUS.MINI, the cheapest released US equities feed.
966    _ = venue_dataset_map.insert(Venue::from("EQUS"), Dataset::from("EQUS.MINI"));
967
968    let opra = Dataset::from("OPRA.PILLAR");
969    for venue_code in [
970        "AMXO", "XBOX", "XCBO", "EMLD", "EDGO", "GMNI", "XISX", "MCRY", "XMIO", "ARCO", "OPRA",
971        "MPRL", "XNDQ", "XBXO", "C2OX", "XPHL", "BATO", "MXOP", "SPHR",
972    ] {
973        _ = venue_dataset_map.insert(Venue::from(venue_code), opra);
974    }
975}
976
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use nautilus_model::types::{Price, Quantity};
    use rstest::{fixture, rstest};
    use ustr::Ustr;

    use super::*;

    fn test_data_path() -> PathBuf {
        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
    }

    /// Loader built from the crate's `publishers.json`, with ESM4 pre-seeded at precision 2.
    #[fixture]
    fn loader() -> DatabentoDataLoader {
        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
        let mut loader = DatabentoDataLoader::new(Some(publishers_filepath)).unwrap();
        // ES futures test data uses precision 2 (USD cents)
        loader.set_price_precision(Symbol::from("ESM4"), 2);
        loader
    }

    /// Loader built from the crate's `publishers.json`, with an empty precision cache.
    #[fixture]
    fn loader_without_seed() -> DatabentoDataLoader {
        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
    }

    // TODO: Improve the below assertions that we've actually read the records we expected

    #[rstest]
    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
        let dataset = Ustr::from("EQUS.PLUS");
        let venue = Venue::from("XNAS");
        loader.set_dataset_for_venue(dataset, venue);

        let result = loader.get_dataset_for_venue(&venue).unwrap();
        assert_eq!(*result, dataset);
    }

    #[rstest]
    fn test_default_venue_dataset_mappings(loader: DatabentoDataLoader) {
        let xcme = Venue::XCME();
        let result = loader.get_dataset_for_venue(&xcme).unwrap();
        assert_eq!(*result, Ustr::from("GLBX.MDP3"));

        let xcbo = Venue::from("XCBO");
        let result = loader.get_dataset_for_venue(&xcbo).unwrap();
        assert_eq!(*result, Ustr::from("OPRA.PILLAR"));

        let equs = Venue::from("EQUS");
        let result = loader.get_dataset_for_venue(&equs).unwrap();
        assert_eq!(*result, Ustr::from("EQUS.MINI"));
    }

    #[rstest]
    #[case(test_data_path().join("test_data.definition.equity.dbn.zst"))]
    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instruments = loader.load_instruments(&path, false, false, None).unwrap();

        assert_eq!(instruments.len(), 2);
        // Definition records auto-populate the precision cache. The `loader` fixture already
        // seeds ESM4, so asserting on that key alone would pass even if no definition had
        // been decoded. Check each loaded instrument's own entry instead.
        for instrument in &instruments {
            let symbol = instrument.id().symbol;
            assert_eq!(
                loader.get_price_precisions().get(&symbol),
                Some(&instrument.price_precision()),
                "cache missing or mismatched entry for {symbol}",
            );
        }
    }

    #[rstest]
    fn test_load_instruments_populates_price_precisions_cache(
        mut loader_without_seed: DatabentoDataLoader,
    ) {
        let path = test_data_path().join("test_data.definition.equity.dbn.zst");
        assert!(loader_without_seed.get_price_precisions().is_empty());

        let instruments = loader_without_seed
            .load_instruments(&path, false, false, None)
            .unwrap();

        assert_eq!(instruments.len(), 2);
        for instrument in &instruments {
            let symbol = instrument.id().symbol;
            assert_eq!(
                loader_without_seed.get_price_precisions().get(&symbol),
                Some(&instrument.price_precision()),
                "cache missing or mismatched entry for {symbol}",
            );
        }
    }

    #[rstest]
    fn test_read_records_errors_when_precision_unresolvable(
        loader_without_seed: DatabentoDataLoader,
    ) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let result = loader_without_seed.load_order_book_deltas(&path, Some(instrument_id), None);

        let err = result.expect_err("expected precision-resolution error");
        let err_msg = format!("{err}");
        assert!(
            err_msg.contains("Could not resolve `price_precision`"),
            "unexpected error message: {err_msg}",
        );
        assert!(
            err_msg.contains("ESM4.GLBX"),
            "error should name the instrument: {err_msg}",
        );
    }

    #[rstest]
    fn test_set_price_precision_unblocks_reads(mut loader_without_seed: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        // Without a seeded precision the read errors
        assert!(
            loader_without_seed
                .load_order_book_deltas(&path, Some(instrument_id), None)
                .is_err()
        );

        loader_without_seed.set_price_precision(Symbol::from("ESM4"), 2);

        let deltas = loader_without_seed
            .load_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_resolve_price_precision_explicit_arg_overrides_cache(
        mut loader_without_seed: DatabentoDataLoader,
    ) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        // Seed a deliberately wrong cache value so we can detect which path is taken
        loader_without_seed.set_price_precision(Symbol::from("ESM4"), 9);

        let explicit = loader_without_seed
            .resolve_price_precision(&instrument_id, Some(2))
            .unwrap();
        assert_eq!(explicit, 2);

        let cached = loader_without_seed
            .resolve_price_precision(&instrument_id, None)
            .unwrap();
        assert_eq!(cached, 9);
    }

    #[rstest]
    fn test_resolve_price_precision_cache_miss_errors(loader_without_seed: DatabentoDataLoader) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let err = loader_without_seed
            .resolve_price_precision(&instrument_id, None)
            .expect_err("expected cache-miss error");
        assert!(format!("{err}").contains("Could not resolve `price_precision`"));
    }

    #[rstest]
    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let deltas = loader
            .load_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_read_order_book_deltas_streams_without_collecting(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let count = loader
            .read_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap()
            .map(|result| result.map(|_| 1usize))
            .sum::<anyhow::Result<usize>>()
            .unwrap();

        assert_eq!(count, 2);
    }

    #[rstest]
    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let depths = loader
            .load_order_book_depth10(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(depths.len(), 2);
    }

    #[rstest]
    fn test_load_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_bbo_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 4);
    }

    #[rstest]
    fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_cmbp_quotes(&path, Some(instrument_id), None)
            .unwrap();

        // Verify exact data count
        assert_eq!(quotes.len(), 2);

        // Verify first quote fields
        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_cbbo_quotes(&path, Some(instrument_id), None)
            .unwrap();

        // Verify exact data count
        assert_eq!(quotes.len(), 2);

        // Verify first quote fields
        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.tbbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let trades = loader
            .load_tbbo_trades(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(trades.len(), 2);
        assert_eq!(trades[0].instrument_id, instrument_id);
        assert_eq!(trades[0].price, Price::from("3720.25"));
        assert_eq!(trades[0].size, Quantity::from("5"));
    }

    #[rstest]
    fn test_load_tcbbo_trades_rejects_cbbo_fixture(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let result = loader.load_tcbbo_trades(&path, Some(instrument_id), None);

        assert!(result.is_err());
    }

    #[rstest]
    fn test_load_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.trades.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let trades = loader
            .load_trades(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    // #[case(test_data_path().join("test_data.ohlcv-1d.dbn.zst"))]  // TODO: Empty file (0 records)
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, None)
            .unwrap();

        assert_eq!(bars.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();

        assert_eq!(bars.len(), 2);

        // When bars_timestamp_on_close is true, both ts_event and ts_init should be close time
        for bar in &bars {
            assert_eq!(
                bar.ts_event, bar.ts_init,
                "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
            );
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();

        assert_eq!(bars.len(), 2);

        // When bars_timestamp_on_close is false, ts_event is open time and ts_init is close time
        for bar in &bars {
            assert_ne!(
                bar.ts_event, bar.ts_init,
                "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
            );
            // For 1-second bars, ts_init (close) should be 1 second after ts_event (open)
            assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + 1_000_000_000);
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
    fn test_load_bars_timestamp_comparison(
        loader: DatabentoDataLoader,
        #[case] path: PathBuf,
        #[case] bar_index: usize,
    ) {
        const ONE_SECOND_NS: u64 = 1_000_000_000;

        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let bars_close = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();

        let bars_open = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();

        assert_eq!(bars_close.len(), bars_open.len());
        assert_eq!(bars_close.len(), 2);

        let bar_close = &bars_close[bar_index];
        let bar_open = &bars_open[bar_index];

        // Bars should have the same OHLCV data
        assert_eq!(bar_close.open, bar_open.open);
        assert_eq!(bar_close.high, bar_open.high);
        assert_eq!(bar_close.low, bar_open.low);
        assert_eq!(bar_close.close, bar_open.close);
        assert_eq!(bar_close.volume, bar_open.volume);

        // The close-timestamped bar should have later timestamp than open-timestamped bar
        // For 1-second bars, this should be exactly 1 second difference
        assert!(
            bar_close.ts_event > bar_open.ts_event,
            "Close-timestamped bar should have later timestamp than open-timestamped bar"
        );

        // The difference should be exactly 1 second (1_000_000_000 nanoseconds) for 1s bars
        assert_eq!(
            bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
            ONE_SECOND_NS,
            "Timestamp difference should be exactly 1 second for 1s bars"
        );
    }

    #[rstest]
    fn test_load_status_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.status.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let statuses = loader
            .load_status_records::<dbn::StatusMsg>(&path, Some(instrument_id))
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        // Assert total count matches Python test expectations
        assert_eq!(statuses.len(), 4, "Should load exactly 4 status records");

        // Assert first record fields match Python test expectations
        let first = &statuses[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert_eq!(first.ts_event.as_u64(), 1609110000000000000);
        assert_eq!(first.ts_init.as_u64(), 1609113600000000000);
    }

    #[rstest]
    fn test_read_imbalance_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.imbalance.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let imbalances = loader
            .read_imbalance_records::<dbn::ImbalanceMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        // Assert total count
        assert_eq!(
            imbalances.len(),
            2,
            "Should load exactly 2 imbalance records"
        );

        // Assert first record has required fields
        let first = &imbalances[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(
            first.ref_price.as_f64() > 0.0,
            "ref_price should be positive"
        );
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
    }

    #[rstest]
    fn test_read_statistics_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.statistics.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let statistics = loader
            .read_statistics_records::<dbn::StatMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        // Assert total count
        assert_eq!(
            statistics.len(),
            2,
            "Should load exactly 2 statistics records"
        );

        // Assert first record has required fields
        let first = &statistics[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
        assert!(first.sequence > 0, "sequence should be positive");
    }
}