1use std::{
17 env,
18 path::{Path, PathBuf},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use databento::dbn::{self, InstrumentDefMsg};
24use dbn::{
25 Publisher,
26 decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32 identifiers::{InstrumentId, Symbol, Venue},
33 instruments::{Instrument, InstrumentAny},
34};
35
36use super::{
37 decode::{
38 decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg,
39 is_supported_stat_type,
40 },
41 symbology::decode_nautilus_instrument_id,
42 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
43};
44use crate::{
45 common::{build_publisher_venue_map, load_publishers},
46 decode::{DatabentoDecodeConfig, decode_instrument_def_msg},
47 symbology::MetadataCache,
48};
49
/// Loader that decodes Zstandard-compressed DBN files into Nautilus data types.
///
/// Holds the publisher, venue and dataset lookup tables built from a publishers
/// file, together with a per-symbol price precision cache that is consulted when
/// a read is requested without an explicit `price_precision`.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.adapters.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataLoader {
    /// Publishers keyed by publisher ID, rebuilt by `load_publishers`.
    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
    /// Dataset to use for each venue (publisher-derived entries plus built-in defaults).
    venue_dataset_map: IndexMap<Venue, Dataset>,
    /// Venue for each publisher ID, rebuilt by `load_publishers`.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Venue recorded per symbol while reading definitions with `use_exchange_as_venue`.
    symbol_venue_map: AHashMap<Symbol, Venue>,
    /// Price precision per symbol, filled by `set_price_precision` and `load_instruments`.
    price_precisions: AHashMap<Symbol, u8>,
}
93
94impl DatabentoDataLoader {
95 pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
101 let mut loader = Self {
102 publishers_map: IndexMap::new(),
103 venue_dataset_map: IndexMap::new(),
104 publisher_venue_map: IndexMap::new(),
105 symbol_venue_map: AHashMap::new(),
106 price_precisions: AHashMap::new(),
107 };
108
109 let publishers_filepath = if let Some(p) = publishers_filepath {
111 p
112 } else {
113 let mut exe_path = env::current_exe()?;
115 exe_path.pop();
116 exe_path.push("publishers.json");
117 exe_path
118 };
119
120 loader
121 .load_publishers(publishers_filepath)
122 .context("error loading publishers.json")?;
123
124 Ok(loader)
125 }
126
127 pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
133 let publishers = load_publishers(filepath)?;
134
135 self.publishers_map = publishers
136 .iter()
137 .cloned()
138 .map(|p| (p.publisher_id, p))
139 .collect();
140
141 let mut venue_dataset_map = IndexMap::new();
142
143 for publisher in &publishers {
145 let venue = Venue::from(publisher.venue.as_str());
146 let dataset = Dataset::from(publisher.dataset.as_str());
147 venue_dataset_map.entry(venue).or_insert(dataset);
148 }
149
150 self.venue_dataset_map = venue_dataset_map;
151 apply_default_venue_dataset_mappings(&mut self.venue_dataset_map);
152
153 self.publisher_venue_map = build_publisher_venue_map(&publishers);
154
155 Ok(())
156 }
157
/// Returns the publishers keyed by publisher ID.
#[must_use]
pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
    &self.publishers_map
}
163
/// Sets the `dataset` to use for `venue`, replacing any existing mapping.
pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
    // The previous mapping (if any) is intentionally discarded
    _ = self.venue_dataset_map.insert(venue, dataset);
}
168
/// Returns the dataset mapped to `venue`, or `None` if the venue has no mapping.
#[must_use]
pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
    self.venue_dataset_map.get(venue)
}
174
/// Returns the venue mapped to `publisher_id`, or `None` if the publisher is unknown.
#[must_use]
pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
    self.publisher_venue_map.get(&publisher_id)
}
180
/// Caches `price_precision` for `symbol`, replacing any previously cached value.
///
/// The cache is consulted by `resolve_price_precision` when a read is requested
/// without an explicit precision.
pub fn set_price_precision(&mut self, symbol: Symbol, price_precision: u8) {
    self.price_precisions.insert(symbol, price_precision);
}
189
/// Returns the cached price precisions keyed by symbol.
#[must_use]
pub const fn get_price_precisions(&self) -> &AHashMap<Symbol, u8> {
    &self.price_precisions
}
195
196 fn resolve_price_precision(
206 &self,
207 instrument_id: &InstrumentId,
208 price_precision: Option<u8>,
209 ) -> anyhow::Result<u8> {
210 if let Some(precision) = price_precision {
211 return Ok(precision);
212 }
213
214 self.price_precisions
215 .get(&instrument_id.symbol)
216 .copied()
217 .ok_or_else(|| {
218 anyhow::anyhow!(
219 "Could not resolve `price_precision` for {instrument_id}: \
220 pass `price_precision` explicitly, call `set_price_precision`, \
221 or load the instrument definitions first via `load_instruments`"
222 )
223 })
224 }
225
226 pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
232 let decoder = Decoder::from_zstd_file(filepath)?;
233 let metadata = decoder.metadata();
234 Ok(metadata.schema.map(|schema| schema.to_string()))
235 }
236
237 pub fn read_definition_records<'a>(
243 &'a mut self,
244 filepath: &Path,
245 use_exchange_as_venue: bool,
246 decode_config: Option<&'a DatabentoDecodeConfig>,
247 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + 'a> {
248 let decoder = Decoder::from_zstd_file(filepath)?;
249 let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
250
251 Ok(std::iter::from_fn(move || {
254 loop {
255 let advance = dbn_stream
256 .advance()
257 .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"));
258 if let Err(e) = advance {
259 return Some(Err(e));
260 }
261
262 let rec = dbn_stream.get()?;
263
264 let result: anyhow::Result<Option<InstrumentAny>> = (|| {
265 let record = dbn::RecordRef::from(rec);
266 let msg = record
267 .get::<InstrumentDefMsg>()
268 .ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
269
270 let raw_symbol = rec
271 .raw_symbol()
272 .map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
273 let symbol = Symbol::from(raw_symbol);
274
275 let publisher = rec
276 .hd
277 .publisher()
278 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
279 let venue = match publisher {
280 Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
281 let exchange = rec.exchange().map_err(|e| {
282 anyhow::anyhow!("Missing `exchange` for record: {e}")
283 })?;
284 let venue = Venue::from_code(exchange).map_err(|e| {
285 anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
286 })?;
287 self.symbol_venue_map.insert(symbol, venue);
288 venue
289 }
290 _ => *self
291 .publisher_venue_map
292 .get(&msg.hd.publisher_id)
293 .ok_or_else(|| {
294 anyhow::anyhow!(
295 "Venue not found for publisher_id {}",
296 msg.hd.publisher_id
297 )
298 })?,
299 };
300 let instrument_id = InstrumentId::new(symbol, venue);
301 let ts_init = msg.ts_recv.into();
302
303 decode_instrument_def_msg(rec, instrument_id, Some(ts_init), decode_config)
304 })();
305
306 match result {
307 Ok(Some(item)) => return Some(Ok(item)),
308 Ok(None) => {}
309 Err(e) => return Some(Err(e)),
310 }
311 }
312 }))
313 }
314
315 pub fn read_records<T>(
321 &self,
322 filepath: &Path,
323 instrument_id: Option<InstrumentId>,
324 price_precision: Option<u8>,
325 include_trades: bool,
326 bars_timestamp_on_close: Option<bool>,
327 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
328 where
329 T: dbn::Record + dbn::HasRType + 'static,
330 {
331 let decoder = Decoder::from_zstd_file(filepath)?;
332 let mut metadata_cache = if instrument_id.is_none() {
333 Some(MetadataCache::new(decoder.metadata().clone()))
334 } else {
335 None
336 };
337 let mut dbn_stream = decoder.decode_stream::<T>();
338 let fixed_instrument_id = instrument_id.is_some();
339 let mut fixed_price_precision = price_precision;
340
341 Ok(std::iter::from_fn(move || {
342 let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
343 dbn_stream
344 .advance()
345 .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
346
347 if let Some(rec) = dbn_stream.get() {
348 let record = dbn::RecordRef::from(rec);
349 let instrument_id = self
350 .resolve_record_instrument_id(&record, instrument_id, &mut metadata_cache)
351 .context("failed to decode instrument id")?;
352 let resolved_precision = self.resolve_stream_price_precision(
353 &instrument_id,
354 fixed_instrument_id,
355 &mut fixed_price_precision,
356 )?;
357 let (item1, item2) = decode_record(
358 &record,
359 instrument_id,
360 resolved_precision,
361 None,
362 include_trades,
363 bars_timestamp_on_close.unwrap_or(true),
364 )?;
365 Ok(Some((item1, item2)))
366 } else {
367 Ok(None)
368 }
369 })();
370
371 match result {
372 Ok(Some(v)) => Some(Ok(v)),
373 Ok(None) => None,
374 Err(e) => Some(Err(e)),
375 }
376 }))
377 }
378
379 pub fn load_instruments(
388 &mut self,
389 filepath: &Path,
390 use_exchange_as_venue: bool,
391 skip_on_error: bool,
392 decode_config: Option<&DatabentoDecodeConfig>,
393 ) -> anyhow::Result<Vec<InstrumentAny>> {
394 let instruments = if skip_on_error {
395 let mut collected = Vec::new();
396
397 for result in
398 self.read_definition_records(filepath, use_exchange_as_venue, decode_config)?
399 {
400 match result {
401 Ok(instrument) => collected.push(instrument),
402 Err(e) => log::warn!("Skipping instrument: {e}"),
403 }
404 }
405 collected
406 } else {
407 self.read_definition_records(filepath, use_exchange_as_venue, decode_config)?
408 .collect::<Result<Vec<_>, _>>()?
409 };
410
411 for instrument in &instruments {
412 self.price_precisions
413 .insert(instrument.id().symbol, instrument.price_precision());
414 }
415
416 Ok(instruments)
417 }
418
419 pub fn load_order_book_deltas(
427 &self,
428 filepath: &Path,
429 instrument_id: Option<InstrumentId>,
430 price_precision: Option<u8>,
431 ) -> anyhow::Result<Vec<OrderBookDelta>> {
432 self.read_order_book_deltas(filepath, instrument_id, price_precision)?
433 .collect()
434 }
435
436 pub fn read_order_book_deltas(
444 &self,
445 filepath: &Path,
446 instrument_id: Option<InstrumentId>,
447 price_precision: Option<u8>,
448 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<OrderBookDelta>> + '_> {
449 let records = self.read_records::<dbn::MboMsg>(
450 filepath,
451 instrument_id,
452 price_precision,
453 false,
454 None,
455 )?;
456
457 Ok(records.filter_map(|result| match result {
458 Ok((Some(item1), _)) => {
459 if let Data::Delta(delta) = item1 {
460 Some(Ok(delta))
461 } else {
462 None
463 }
464 }
465 Ok((None, _)) => None,
466 Err(e) => Some(Err(e)),
467 }))
468 }
469
470 pub fn load_order_book_depth10(
476 &self,
477 filepath: &Path,
478 instrument_id: Option<InstrumentId>,
479 price_precision: Option<u8>,
480 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
481 self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
482 .filter_map(|result| match result {
483 Ok((Some(item1), _)) => {
484 if let Data::Depth10(depth) = item1 {
485 Some(Ok(*depth))
486 } else {
487 None
488 }
489 }
490 Ok((None, _)) => None,
491 Err(e) => Some(Err(e)),
492 })
493 .collect()
494 }
495
496 pub fn load_quotes(
502 &self,
503 filepath: &Path,
504 instrument_id: Option<InstrumentId>,
505 price_precision: Option<u8>,
506 ) -> anyhow::Result<Vec<QuoteTick>> {
507 self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
508 .filter_map(|result| match result {
509 Ok((Some(item1), _)) => {
510 if let Data::Quote(quote) = item1 {
511 Some(Ok(quote))
512 } else {
513 None
514 }
515 }
516 Ok((None, _)) => None,
517 Err(e) => Some(Err(e)),
518 })
519 .collect()
520 }
521
522 pub fn load_bbo_quotes(
528 &self,
529 filepath: &Path,
530 instrument_id: Option<InstrumentId>,
531 price_precision: Option<u8>,
532 ) -> anyhow::Result<Vec<QuoteTick>> {
533 self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
534 .filter_map(|result| match result {
535 Ok((Some(item1), _)) => {
536 if let Data::Quote(quote) = item1 {
537 Some(Ok(quote))
538 } else {
539 None
540 }
541 }
542 Ok((None, _)) => None,
543 Err(e) => Some(Err(e)),
544 })
545 .collect()
546 }
547
548 pub fn load_cmbp_quotes(
554 &self,
555 filepath: &Path,
556 instrument_id: Option<InstrumentId>,
557 price_precision: Option<u8>,
558 ) -> anyhow::Result<Vec<QuoteTick>> {
559 self.read_records::<dbn::Cmbp1Msg>(filepath, instrument_id, price_precision, false, None)?
560 .filter_map(|result| match result {
561 Ok((Some(item1), _)) => {
562 if let Data::Quote(quote) = item1 {
563 Some(Ok(quote))
564 } else {
565 None
566 }
567 }
568 Ok((None, _)) => None,
569 Err(e) => Some(Err(e)),
570 })
571 .collect()
572 }
573
574 pub fn load_cbbo_quotes(
580 &self,
581 filepath: &Path,
582 instrument_id: Option<InstrumentId>,
583 price_precision: Option<u8>,
584 ) -> anyhow::Result<Vec<QuoteTick>> {
585 self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
586 .filter_map(|result| match result {
587 Ok((Some(item1), _)) => {
588 if let Data::Quote(quote) = item1 {
589 Some(Ok(quote))
590 } else {
591 None
592 }
593 }
594 Ok((None, _)) => None,
595 Err(e) => Some(Err(e)),
596 })
597 .collect()
598 }
599
600 pub fn load_tbbo_trades(
606 &self,
607 filepath: &Path,
608 instrument_id: Option<InstrumentId>,
609 price_precision: Option<u8>,
610 ) -> anyhow::Result<Vec<TradeTick>> {
611 self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, true, None)?
612 .filter_map(|result| match result {
613 Ok((_, maybe_item2)) => {
614 if let Some(Data::Trade(trade)) = maybe_item2 {
615 Some(Ok(trade))
616 } else {
617 None
618 }
619 }
620 Err(e) => Some(Err(e)),
621 })
622 .collect()
623 }
624
625 pub fn load_tcbbo_trades(
631 &self,
632 filepath: &Path,
633 instrument_id: Option<InstrumentId>,
634 price_precision: Option<u8>,
635 ) -> anyhow::Result<Vec<TradeTick>> {
636 self.read_records::<dbn::TcbboMsg>(filepath, instrument_id, price_precision, true, None)?
637 .filter_map(|result| match result {
638 Ok((_, maybe_item2)) => {
639 if let Some(Data::Trade(trade)) = maybe_item2 {
640 Some(Ok(trade))
641 } else {
642 None
643 }
644 }
645 Err(e) => Some(Err(e)),
646 })
647 .collect()
648 }
649
650 pub fn load_trades(
656 &self,
657 filepath: &Path,
658 instrument_id: Option<InstrumentId>,
659 price_precision: Option<u8>,
660 ) -> anyhow::Result<Vec<TradeTick>> {
661 self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
662 .filter_map(|result| match result {
663 Ok((Some(item1), _)) => {
664 if let Data::Trade(trade) = item1 {
665 Some(Ok(trade))
666 } else {
667 None
668 }
669 }
670 Ok((None, _)) => None,
671 Err(e) => Some(Err(e)),
672 })
673 .collect()
674 }
675
676 pub fn load_bars(
682 &self,
683 filepath: &Path,
684 instrument_id: Option<InstrumentId>,
685 price_precision: Option<u8>,
686 timestamp_on_close: Option<bool>,
687 ) -> anyhow::Result<Vec<Bar>> {
688 self.read_records::<dbn::OhlcvMsg>(
689 filepath,
690 instrument_id,
691 price_precision,
692 false,
693 timestamp_on_close,
694 )?
695 .filter_map(|result| match result {
696 Ok((Some(item1), _)) => {
697 if let Data::Bar(bar) = item1 {
698 Some(Ok(bar))
699 } else {
700 None
701 }
702 }
703 Ok((None, _)) => None,
704 Err(e) => Some(Err(e)),
705 })
706 .collect()
707 }
708
709 pub fn load_status_records<T>(
715 &self,
716 filepath: &Path,
717 instrument_id: Option<InstrumentId>,
718 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
719 where
720 T: dbn::Record + dbn::HasRType + 'static,
721 {
722 let decoder = Decoder::from_zstd_file(filepath)?;
723 let mut metadata_cache = if instrument_id.is_none() {
724 Some(MetadataCache::new(decoder.metadata().clone()))
725 } else {
726 None
727 };
728 let mut dbn_stream = decoder.decode_stream::<T>();
729
730 Ok(std::iter::from_fn(move || {
731 if let Err(e) = dbn_stream.advance() {
732 return Some(Err(e.into()));
733 }
734
735 match dbn_stream.get() {
736 Some(rec) => {
737 let record = dbn::RecordRef::from(rec);
738 let instrument_id = match self.resolve_record_instrument_id(
739 &record,
740 instrument_id,
741 &mut metadata_cache,
742 ) {
743 Ok(id) => id,
744 Err(e) => return Some(Err(e)),
745 };
746
747 let msg = match record.get::<dbn::StatusMsg>() {
748 Some(m) => m,
749 None => return Some(Err(anyhow::anyhow!("Invalid `StatusMsg`"))),
750 };
751 let ts_init = msg.ts_recv.into();
752
753 match decode_status_msg(msg, instrument_id, Some(ts_init)) {
754 Ok(data) => Some(Ok(data)),
755 Err(e) => Some(Err(e)),
756 }
757 }
758 None => None,
759 }
760 }))
761 }
762
763 pub fn read_imbalance_records<T>(
769 &self,
770 filepath: &Path,
771 instrument_id: Option<InstrumentId>,
772 price_precision: Option<u8>,
773 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
774 where
775 T: dbn::Record + dbn::HasRType + 'static,
776 {
777 let decoder = Decoder::from_zstd_file(filepath)?;
778 let mut metadata_cache = if instrument_id.is_none() {
779 Some(MetadataCache::new(decoder.metadata().clone()))
780 } else {
781 None
782 };
783 let mut dbn_stream = decoder.decode_stream::<T>();
784 let fixed_instrument_id = instrument_id.is_some();
785 let mut fixed_price_precision = price_precision;
786
787 Ok(std::iter::from_fn(move || {
788 if let Err(e) = dbn_stream.advance() {
789 return Some(Err(e.into()));
790 }
791
792 match dbn_stream.get() {
793 Some(rec) => {
794 let record = dbn::RecordRef::from(rec);
795 let instrument_id = match self.resolve_record_instrument_id(
796 &record,
797 instrument_id,
798 &mut metadata_cache,
799 ) {
800 Ok(id) => id,
801 Err(e) => return Some(Err(e)),
802 };
803 let resolved_precision = match self.resolve_stream_price_precision(
804 &instrument_id,
805 fixed_instrument_id,
806 &mut fixed_price_precision,
807 ) {
808 Ok(p) => p,
809 Err(e) => return Some(Err(e)),
810 };
811
812 let msg = match record.get::<dbn::ImbalanceMsg>() {
813 Some(m) => m,
814 None => return Some(Err(anyhow::anyhow!("Invalid `ImbalanceMsg`"))),
815 };
816 let ts_init = msg.ts_recv.into();
817
818 match decode_imbalance_msg(
819 msg,
820 instrument_id,
821 resolved_precision,
822 Some(ts_init),
823 ) {
824 Ok(data) => Some(Ok(data)),
825 Err(e) => Some(Err(e)),
826 }
827 }
828 None => None,
829 }
830 }))
831 }
832
833 pub fn read_statistics_records<T>(
839 &self,
840 filepath: &Path,
841 instrument_id: Option<InstrumentId>,
842 price_precision: Option<u8>,
843 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
844 where
845 T: dbn::Record + dbn::HasRType + 'static,
846 {
847 let decoder = Decoder::from_zstd_file(filepath)?;
848 let mut metadata_cache = if instrument_id.is_none() {
849 Some(MetadataCache::new(decoder.metadata().clone()))
850 } else {
851 None
852 };
853 let mut dbn_stream = decoder.decode_stream::<T>();
854 let fixed_instrument_id = instrument_id.is_some();
855 let mut fixed_price_precision = price_precision;
856
857 Ok(std::iter::from_fn(move || {
860 loop {
861 if let Err(e) = dbn_stream.advance() {
862 return Some(Err(e.into()));
863 }
864
865 let rec = dbn_stream.get()?;
866 let record = dbn::RecordRef::from(rec);
867 let msg = match record.get::<dbn::StatMsg>() {
868 Some(m) => m,
869 None => return Some(Err(anyhow::anyhow!("Invalid `StatMsg`"))),
870 };
871
872 if !is_supported_stat_type(msg.stat_type) {
873 log::warn!("Skipping unsupported `stat_type` {}", msg.stat_type);
874 continue;
875 }
876
877 let instrument_id = match self.resolve_record_instrument_id(
878 &record,
879 instrument_id,
880 &mut metadata_cache,
881 ) {
882 Ok(id) => id,
883 Err(e) => return Some(Err(e)),
884 };
885 let resolved_precision = match self.resolve_stream_price_precision(
886 &instrument_id,
887 fixed_instrument_id,
888 &mut fixed_price_precision,
889 ) {
890 Ok(p) => p,
891 Err(e) => return Some(Err(e)),
892 };
893 let ts_init = msg.ts_recv.into();
894
895 match decode_statistics_msg(msg, instrument_id, resolved_precision, Some(ts_init)) {
896 Ok(Some(data)) => return Some(Ok(data)),
897 Ok(None) => {}
898 Err(e) => return Some(Err(e)),
899 }
900 }
901 }))
902 }
903
904 fn resolve_record_instrument_id(
905 &self,
906 record: &dbn::RecordRef,
907 instrument_id: Option<InstrumentId>,
908 metadata_cache: &mut Option<MetadataCache>,
909 ) -> anyhow::Result<InstrumentId> {
910 if let Some(instrument_id) = instrument_id {
911 return Ok(instrument_id);
912 }
913
914 let metadata_cache = metadata_cache
915 .as_mut()
916 .ok_or_else(|| anyhow::anyhow!("missing metadata cache for dynamic instrument id"))?;
917
918 decode_nautilus_instrument_id(
919 record,
920 metadata_cache,
921 &self.publisher_venue_map,
922 &self.symbol_venue_map,
923 )
924 }
925
926 fn resolve_stream_price_precision(
927 &self,
928 instrument_id: &InstrumentId,
929 fixed_instrument_id: bool,
930 fixed_price_precision: &mut Option<u8>,
931 ) -> anyhow::Result<u8> {
932 if let Some(precision) = *fixed_price_precision {
933 return Ok(precision);
934 }
935
936 let precision = self.resolve_price_precision(instrument_id, None)?;
937 if fixed_instrument_id {
938 *fixed_price_precision = Some(precision);
939 }
940
941 Ok(precision)
942 }
943}
944
945fn apply_default_venue_dataset_mappings(venue_dataset_map: &mut IndexMap<Venue, Dataset>) {
949 let glbx = Dataset::from("GLBX.MDP3");
950
951 for venue in [
952 Venue::CBCM(),
953 Venue::GLBX(),
954 Venue::NYUM(),
955 Venue::XCBT(),
956 Venue::XCEC(),
957 Venue::XCME(),
958 Venue::XFXS(),
959 Venue::XNYM(),
960 ] {
961 _ = venue_dataset_map.insert(venue, glbx);
962 }
963
964 _ = venue_dataset_map.insert(Venue::from("EQUS"), Dataset::from("EQUS.MINI"));
967
968 let opra = Dataset::from("OPRA.PILLAR");
969 for venue_code in [
970 "AMXO", "XBOX", "XCBO", "EMLD", "EDGO", "GMNI", "XISX", "MCRY", "XMIO", "ARCO", "OPRA",
971 "MPRL", "XNDQ", "XBXO", "C2OX", "XPHL", "BATO", "MXOP", "SPHR",
972 ] {
973 _ = venue_dataset_map.insert(Venue::from(venue_code), opra);
974 }
975}
976
977#[cfg(test)]
978mod tests {
979 use std::path::{Path, PathBuf};
980
981 use nautilus_model::types::{Price, Quantity};
982 use rstest::{fixture, rstest};
983 use ustr::Ustr;
984
985 use super::*;
986
987 fn test_data_path() -> PathBuf {
988 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
989 }
990
991 #[fixture]
992 fn loader() -> DatabentoDataLoader {
993 let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
994 let mut loader = DatabentoDataLoader::new(Some(publishers_filepath)).unwrap();
995 loader.set_price_precision(Symbol::from("ESM4"), 2);
997 loader
998 }
999
1000 #[fixture]
1001 fn loader_without_seed() -> DatabentoDataLoader {
1002 let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
1003 DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
1004 }
1005
1006 #[rstest]
1009 fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
1010 let dataset = Ustr::from("EQUS.PLUS");
1011 let venue = Venue::from("XNAS");
1012 loader.set_dataset_for_venue(dataset, venue);
1013
1014 let result = loader.get_dataset_for_venue(&venue).unwrap();
1015 assert_eq!(*result, dataset);
1016 }
1017
1018 #[rstest]
1019 fn test_default_venue_dataset_mappings(loader: DatabentoDataLoader) {
1020 let xcme = Venue::XCME();
1021 let result = loader.get_dataset_for_venue(&xcme).unwrap();
1022 assert_eq!(*result, Ustr::from("GLBX.MDP3"));
1023
1024 let xcbo = Venue::from("XCBO");
1025 let result = loader.get_dataset_for_venue(&xcbo).unwrap();
1026 assert_eq!(*result, Ustr::from("OPRA.PILLAR"));
1027
1028 let equs = Venue::from("EQUS");
1029 let result = loader.get_dataset_for_venue(&equs).unwrap();
1030 assert_eq!(*result, Ustr::from("EQUS.MINI"));
1031 }
1032
1033 #[rstest]
1034 #[case(test_data_path().join("test_data.definition.equity.dbn.zst"))]
1035 fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
1036 let instruments = loader.load_instruments(&path, false, false, None).unwrap();
1037
1038 assert_eq!(instruments.len(), 2);
1039 assert_eq!(
1041 loader.get_price_precisions().get(&Symbol::from("ESM4")),
1042 Some(&2)
1043 );
1044 }
1045
1046 #[rstest]
1047 fn test_load_instruments_populates_price_precisions_cache(
1048 mut loader_without_seed: DatabentoDataLoader,
1049 ) {
1050 let path = test_data_path().join("test_data.definition.equity.dbn.zst");
1051 assert!(loader_without_seed.get_price_precisions().is_empty());
1052
1053 let instruments = loader_without_seed
1054 .load_instruments(&path, false, false, None)
1055 .unwrap();
1056
1057 assert_eq!(instruments.len(), 2);
1058 for instrument in &instruments {
1059 let symbol = instrument.id().symbol;
1060 assert_eq!(
1061 loader_without_seed.get_price_precisions().get(&symbol),
1062 Some(&instrument.price_precision()),
1063 "cache missing or mismatched entry for {symbol}",
1064 );
1065 }
1066 }
1067
1068 #[rstest]
1069 fn test_read_records_errors_when_precision_unresolvable(
1070 loader_without_seed: DatabentoDataLoader,
1071 ) {
1072 let path = test_data_path().join("test_data.mbo.dbn.zst");
1073 let instrument_id = InstrumentId::from("ESM4.GLBX");
1074
1075 let result = loader_without_seed.load_order_book_deltas(&path, Some(instrument_id), None);
1076
1077 let err = result.expect_err("expected precision-resolution error");
1078 let err_msg = format!("{err}");
1079 assert!(
1080 err_msg.contains("Could not resolve `price_precision`"),
1081 "unexpected error message: {err_msg}",
1082 );
1083 assert!(
1084 err_msg.contains("ESM4.GLBX"),
1085 "error should name the instrument: {err_msg}",
1086 );
1087 }
1088
1089 #[rstest]
1090 fn test_set_price_precision_unblocks_reads(mut loader_without_seed: DatabentoDataLoader) {
1091 let path = test_data_path().join("test_data.mbo.dbn.zst");
1092 let instrument_id = InstrumentId::from("ESM4.GLBX");
1093
1094 assert!(
1096 loader_without_seed
1097 .load_order_book_deltas(&path, Some(instrument_id), None)
1098 .is_err()
1099 );
1100
1101 loader_without_seed.set_price_precision(Symbol::from("ESM4"), 2);
1102
1103 let deltas = loader_without_seed
1104 .load_order_book_deltas(&path, Some(instrument_id), None)
1105 .unwrap();
1106 assert_eq!(deltas.len(), 2);
1107 }
1108
1109 #[rstest]
1110 fn test_resolve_price_precision_explicit_arg_overrides_cache(
1111 mut loader_without_seed: DatabentoDataLoader,
1112 ) {
1113 let instrument_id = InstrumentId::from("ESM4.GLBX");
1114 loader_without_seed.set_price_precision(Symbol::from("ESM4"), 9);
1116
1117 let explicit = loader_without_seed
1118 .resolve_price_precision(&instrument_id, Some(2))
1119 .unwrap();
1120 assert_eq!(explicit, 2);
1121
1122 let cached = loader_without_seed
1123 .resolve_price_precision(&instrument_id, None)
1124 .unwrap();
1125 assert_eq!(cached, 9);
1126 }
1127
1128 #[rstest]
1129 fn test_resolve_price_precision_cache_miss_errors(loader_without_seed: DatabentoDataLoader) {
1130 let instrument_id = InstrumentId::from("ESM4.GLBX");
1131
1132 let err = loader_without_seed
1133 .resolve_price_precision(&instrument_id, None)
1134 .expect_err("expected cache-miss error");
1135 assert!(format!("{err}").contains("Could not resolve `price_precision`"));
1136 }
1137
1138 #[rstest]
1139 fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
1140 let path = test_data_path().join("test_data.mbo.dbn.zst");
1141 let instrument_id = InstrumentId::from("ESM4.GLBX");
1142
1143 let deltas = loader
1144 .load_order_book_deltas(&path, Some(instrument_id), None)
1145 .unwrap();
1146
1147 assert_eq!(deltas.len(), 2);
1148 }
1149
1150 #[rstest]
1151 fn test_read_order_book_deltas_streams_without_collecting(loader: DatabentoDataLoader) {
1152 let path = test_data_path().join("test_data.mbo.dbn.zst");
1153 let instrument_id = InstrumentId::from("ESM4.GLBX");
1154
1155 let count = loader
1156 .read_order_book_deltas(&path, Some(instrument_id), None)
1157 .unwrap()
1158 .map(|result| result.map(|_| 1usize))
1159 .sum::<anyhow::Result<usize>>()
1160 .unwrap();
1161
1162 assert_eq!(count, 2);
1163 }
1164
1165 #[rstest]
1166 fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
1167 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1168 let instrument_id = InstrumentId::from("ESM4.GLBX");
1169
1170 let depths = loader
1171 .load_order_book_depth10(&path, Some(instrument_id), None)
1172 .unwrap();
1173
1174 assert_eq!(depths.len(), 2);
1175 }
1176
1177 #[rstest]
1178 fn test_load_quotes(loader: DatabentoDataLoader) {
1179 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1180 let instrument_id = InstrumentId::from("ESM4.GLBX");
1181
1182 let quotes = loader
1183 .load_quotes(&path, Some(instrument_id), None)
1184 .unwrap();
1185
1186 assert_eq!(quotes.len(), 2);
1187 }
1188
1189 #[rstest]
1190 #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
1191 #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
1192 fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
1193 let instrument_id = InstrumentId::from("ESM4.GLBX");
1194
1195 let quotes = loader
1196 .load_bbo_quotes(&path, Some(instrument_id), None)
1197 .unwrap();
1198
1199 assert_eq!(quotes.len(), 4);
1200 }
1201
1202 #[rstest]
1203 fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
1204 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1205 let instrument_id = InstrumentId::from("ESM4.GLBX");
1206
1207 let quotes = loader
1208 .load_cmbp_quotes(&path, Some(instrument_id), None)
1209 .unwrap();
1210
1211 assert_eq!(quotes.len(), 2);
1213
1214 let first_quote = "es[0];
1216 assert_eq!(first_quote.instrument_id, instrument_id);
1217 assert_eq!(first_quote.bid_price, Price::from("3720.25"));
1218 assert_eq!(first_quote.ask_price, Price::from("3720.50"));
1219 assert_eq!(first_quote.bid_size, Quantity::from(24));
1220 assert_eq!(first_quote.ask_size, Quantity::from(11));
1221 assert_eq!(first_quote.ts_event, 1609160400006136329);
1222 assert_eq!(first_quote.ts_init, 1609160400006136329);
1223 }
1224
1225 #[rstest]
1226 fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
1227 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1228 let instrument_id = InstrumentId::from("ESM4.GLBX");
1229
1230 let quotes = loader
1231 .load_cbbo_quotes(&path, Some(instrument_id), None)
1232 .unwrap();
1233
1234 assert_eq!(quotes.len(), 2);
1236
1237 let first_quote = "es[0];
1239 assert_eq!(first_quote.instrument_id, instrument_id);
1240 assert_eq!(first_quote.bid_price, Price::from("3720.25"));
1241 assert_eq!(first_quote.ask_price, Price::from("3720.50"));
1242 assert_eq!(first_quote.bid_size, Quantity::from(24));
1243 assert_eq!(first_quote.ask_size, Quantity::from(11));
1244 assert_eq!(first_quote.ts_event, 1609160400006136329);
1245 assert_eq!(first_quote.ts_init, 1609160400006136329);
1246 }
1247
1248 #[rstest]
1249 fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
1250 let path = test_data_path().join("test_data.tbbo.dbn.zst");
1251 let instrument_id = InstrumentId::from("ESM4.GLBX");
1252
1253 let trades = loader
1254 .load_tbbo_trades(&path, Some(instrument_id), None)
1255 .unwrap();
1256
1257 assert_eq!(trades.len(), 2);
1258 assert_eq!(trades[0].instrument_id, instrument_id);
1259 assert_eq!(trades[0].price, Price::from("3720.25"));
1260 assert_eq!(trades[0].size, Quantity::from("5"));
1261 }
1262
1263 #[rstest]
1264 fn test_load_tcbbo_trades_rejects_cbbo_fixture(loader: DatabentoDataLoader) {
1265 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1266 let instrument_id = InstrumentId::from("ESM4.GLBX");
1267
1268 let result = loader.load_tcbbo_trades(&path, Some(instrument_id), None);
1269
1270 assert!(result.is_err());
1271 }
1272
1273 #[rstest]
1274 fn test_load_trades(loader: DatabentoDataLoader) {
1275 let path = test_data_path().join("test_data.trades.dbn.zst");
1276 let instrument_id = InstrumentId::from("ESM4.GLBX");
1277 let trades = loader
1278 .load_trades(&path, Some(instrument_id), None)
1279 .unwrap();
1280
1281 assert_eq!(trades.len(), 2);
1282 }
1283
1284 #[rstest]
1285 #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
1287 #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
1288 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
1289 fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
1290 let instrument_id = InstrumentId::from("ESM4.GLBX");
1291 let bars = loader
1292 .load_bars(&path, Some(instrument_id), None, None)
1293 .unwrap();
1294
1295 assert_eq!(bars.len(), 2);
1296 }
1297
1298 #[rstest]
1299 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
1300 fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
1301 let instrument_id = InstrumentId::from("ESM4.GLBX");
1302 let bars = loader
1303 .load_bars(&path, Some(instrument_id), None, Some(true))
1304 .unwrap();
1305
1306 assert_eq!(bars.len(), 2);
1307
1308 for bar in &bars {
1310 assert_eq!(
1311 bar.ts_event, bar.ts_init,
1312 "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
1313 );
1314 }
1315 }
1316
1317 #[rstest]
1318 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
1319 fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
1320 let instrument_id = InstrumentId::from("ESM4.GLBX");
1321 let bars = loader
1322 .load_bars(&path, Some(instrument_id), None, Some(false))
1323 .unwrap();
1324
1325 assert_eq!(bars.len(), 2);
1326
1327 for bar in &bars {
1329 assert_ne!(
1330 bar.ts_event, bar.ts_init,
1331 "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
1332 );
1333 assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + 1_000_000_000);
1335 }
1336 }
1337
1338 #[rstest]
1339 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
1340 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
1341 fn test_load_bars_timestamp_comparison(
1342 loader: DatabentoDataLoader,
1343 #[case] path: PathBuf,
1344 #[case] bar_index: usize,
1345 ) {
1346 const ONE_SECOND_NS: u64 = 1_000_000_000;
1347
1348 let instrument_id = InstrumentId::from("ESM4.GLBX");
1349
1350 let bars_close = loader
1351 .load_bars(&path, Some(instrument_id), None, Some(true))
1352 .unwrap();
1353
1354 let bars_open = loader
1355 .load_bars(&path, Some(instrument_id), None, Some(false))
1356 .unwrap();
1357
1358 assert_eq!(bars_close.len(), bars_open.len());
1359 assert_eq!(bars_close.len(), 2);
1360
1361 let bar_close = &bars_close[bar_index];
1362 let bar_open = &bars_open[bar_index];
1363
1364 assert_eq!(bar_close.open, bar_open.open);
1366 assert_eq!(bar_close.high, bar_open.high);
1367 assert_eq!(bar_close.low, bar_open.low);
1368 assert_eq!(bar_close.close, bar_open.close);
1369 assert_eq!(bar_close.volume, bar_open.volume);
1370
1371 assert!(
1374 bar_close.ts_event > bar_open.ts_event,
1375 "Close-timestamped bar should have later timestamp than open-timestamped bar"
1376 );
1377
1378 assert_eq!(
1380 bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
1381 ONE_SECOND_NS,
1382 "Timestamp difference should be exactly 1 second for 1s bars"
1383 );
1384 }
1385
1386 #[rstest]
1387 fn test_load_status_records(loader: DatabentoDataLoader) {
1388 let path = test_data_path().join("test_data.status.dbn.zst");
1389 let instrument_id = InstrumentId::from("ESM4.GLBX");
1390
1391 let statuses = loader
1392 .load_status_records::<dbn::StatusMsg>(&path, Some(instrument_id))
1393 .unwrap()
1394 .collect::<anyhow::Result<Vec<_>>>()
1395 .unwrap();
1396
1397 assert_eq!(statuses.len(), 4, "Should load exactly 4 status records");
1399
1400 let first = &statuses[0];
1402 assert_eq!(first.instrument_id, instrument_id);
1403 assert_eq!(first.ts_event.as_u64(), 1609110000000000000);
1404 assert_eq!(first.ts_init.as_u64(), 1609113600000000000);
1405 }
1406
1407 #[rstest]
1408 fn test_read_imbalance_records(loader: DatabentoDataLoader) {
1409 let path = test_data_path().join("test_data.imbalance.dbn.zst");
1410 let instrument_id = InstrumentId::from("ESM4.GLBX");
1411
1412 let imbalances = loader
1413 .read_imbalance_records::<dbn::ImbalanceMsg>(&path, Some(instrument_id), None)
1414 .unwrap()
1415 .collect::<anyhow::Result<Vec<_>>>()
1416 .unwrap();
1417
1418 assert_eq!(
1420 imbalances.len(),
1421 2,
1422 "Should load exactly 2 imbalance records"
1423 );
1424
1425 let first = &imbalances[0];
1427 assert_eq!(first.instrument_id, instrument_id);
1428 assert!(
1429 first.ref_price.as_f64() > 0.0,
1430 "ref_price should be positive"
1431 );
1432 assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
1433 assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
1434 assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
1435 }
1436
1437 #[rstest]
1438 fn test_read_statistics_records(loader: DatabentoDataLoader) {
1439 let path = test_data_path().join("test_data.statistics.dbn.zst");
1440 let instrument_id = InstrumentId::from("ESM4.GLBX");
1441
1442 let statistics = loader
1443 .read_statistics_records::<dbn::StatMsg>(&path, Some(instrument_id), None)
1444 .unwrap()
1445 .collect::<anyhow::Result<Vec<_>>>()
1446 .unwrap();
1447
1448 assert_eq!(
1450 statistics.len(),
1451 2,
1452 "Should load exactly 2 statistics records"
1453 );
1454
1455 let first = &statistics[0];
1457 assert_eq!(first.instrument_id, instrument_id);
1458 assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
1459 assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
1460 assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
1461 assert!(first.sequence > 0, "sequence should be positive");
1462 }
1463}