use std::{
env, fs,
path::{Path, PathBuf},
};
use ahash::AHashMap;
use anyhow::Context;
use databento::dbn::{self, InstrumentDefMsg};
use dbn::{
Publisher,
decode::{DbnMetadata, DecodeStream, dbn::Decoder},
};
use fallible_streaming_iterator::FallibleStreamingIterator;
use indexmap::IndexMap;
use nautilus_model::{
data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
identifiers::{InstrumentId, Symbol, Venue},
instruments::{Instrument, InstrumentAny},
};
use super::{
decode::{
decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg,
is_supported_stat_type,
},
symbology::decode_nautilus_instrument_id,
types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
};
use crate::{decode::decode_instrument_def_msg, symbology::MetadataCache};
/// Loader that decodes Zstandard-compressed DBN files into Nautilus domain objects.
///
/// The lookup tables are filled by `load_publishers` (invoked from `new`); the price
/// precision cache is filled by `load_instruments` and `set_price_precision`.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataLoader {
/// Publisher records keyed by their `publisher_id`.
publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
/// Dataset associated with each venue (publisher file entries plus built-in defaults).
venue_dataset_map: IndexMap<Venue, Dataset>,
/// Venue associated with each publisher ID.
publisher_venue_map: IndexMap<PublisherId, Venue>,
/// Venue recorded per symbol by `read_definition_records` when the exchange is used as venue.
symbol_venue_map: AHashMap<Symbol, Venue>,
/// Price precision cached per symbol, used when no explicit precision is passed.
price_precisions: AHashMap<Symbol, u8>,
}
impl DatabentoDataLoader {
/// Creates a new [`DatabentoDataLoader`] and loads its publishers file.
///
/// When `publishers_filepath` is `None`, a `publishers.json` located next to the
/// current executable is used.
///
/// # Errors
///
/// Returns an error if the current executable path cannot be determined or if
/// loading the publishers file fails.
pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
    let mut loader = Self {
        publishers_map: IndexMap::new(),
        venue_dataset_map: IndexMap::new(),
        publisher_venue_map: IndexMap::new(),
        symbol_venue_map: AHashMap::new(),
        price_precisions: AHashMap::new(),
    };

    let resolved_path = match publishers_filepath {
        Some(explicit) => explicit,
        None => {
            // Replace the executable's file name with `publishers.json`
            let mut sibling = env::current_exe()?;
            sibling.pop();
            sibling.push("publishers.json");
            sibling
        }
    };

    loader
        .load_publishers(resolved_path)
        .context("error loading publishers.json")?;

    Ok(loader)
}
/// Loads the publishers JSON file at `filepath` and rebuilds the lookup tables.
///
/// Replaces `publishers_map`, `venue_dataset_map` and `publisher_venue_map`. For the
/// venue-to-dataset table the first publisher entry seen for a venue wins, after which
/// the built-in default mappings are applied on top.
///
/// # Errors
///
/// Returns an error if the file cannot be read or is not a valid JSON array of publishers.
pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
    let file_content = fs::read_to_string(filepath)?;
    let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;

    // Build both venue tables in one borrowed pass so the publisher list never needs cloning
    let mut venue_dataset_map = IndexMap::new();
    let mut publisher_venue_map = IndexMap::with_capacity(publishers.len());
    for publisher in &publishers {
        let venue = Venue::from(publisher.venue.as_str());
        let dataset = Dataset::from(publisher.dataset.as_str());
        venue_dataset_map.entry(venue).or_insert(dataset);
        publisher_venue_map.insert(publisher.publisher_id, venue);
    }
    apply_default_venue_dataset_mappings(&mut venue_dataset_map);

    self.venue_dataset_map = venue_dataset_map;
    self.publisher_venue_map = publisher_venue_map;
    // Consume the parsed list last, moving each publisher into the map
    self.publishers_map = publishers
        .into_iter()
        .map(|p| (p.publisher_id, p))
        .collect();

    Ok(())
}
/// Returns the publisher records currently held by the loader, keyed by publisher ID.
#[must_use]
pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
&self.publishers_map
}
/// Maps `venue` to `dataset`; any previous mapping for that venue is discarded.
pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
_ = self.venue_dataset_map.insert(venue, dataset);
}
/// Returns the dataset mapped to `venue`, or `None` when no mapping exists.
#[must_use]
pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
self.venue_dataset_map.get(venue)
}
/// Returns the venue mapped to `publisher_id`, or `None` when the publisher is unknown.
#[must_use]
pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
self.publisher_venue_map.get(&publisher_id)
}
/// Caches `price_precision` for `symbol`, replacing any previously cached value.
///
/// The cached value is used by the record readers when no explicit precision is passed.
pub fn set_price_precision(&mut self, symbol: Symbol, price_precision: u8) {
self.price_precisions.insert(symbol, price_precision);
}
/// Returns the cached price precisions, keyed by symbol.
#[must_use]
pub const fn get_price_precisions(&self) -> &AHashMap<Symbol, u8> {
&self.price_precisions
}
/// Resolves the price precision to use for `instrument_id`.
///
/// An explicit `price_precision` always wins; otherwise the value cached for the
/// instrument's symbol is used.
///
/// # Errors
///
/// Returns an error if no explicit precision was given and nothing is cached for the symbol.
fn resolve_price_precision(
    &self,
    instrument_id: &InstrumentId,
    price_precision: Option<u8>,
) -> anyhow::Result<u8> {
    let resolved =
        price_precision.or_else(|| self.price_precisions.get(&instrument_id.symbol).copied());

    match resolved {
        Some(precision) => Ok(precision),
        None => Err(anyhow::anyhow!(
            "Could not resolve `price_precision` for {instrument_id}: \
             pass `price_precision` explicitly, call `set_price_precision`, \
             or load the instrument definitions first via `load_instruments`"
        )),
    }
}
/// Returns the schema recorded in the metadata of the DBN file at `filepath`, as a string.
///
/// Yields `Ok(None)` when the metadata carries no schema.
///
/// # Errors
///
/// Returns an error if the decoder cannot be created for the file.
pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
    let schema = Decoder::from_zstd_file(filepath)?.metadata().schema;
    Ok(schema.map(|s| s.to_string()))
}
/// Streams the instrument definitions decoded from the DBN file at `filepath`.
///
/// Every `InstrumentDefMsg` is passed to `decode_instrument_def_msg`; records for which
/// that decoder returns `Ok(None)` are skipped without producing an item.
///
/// When `use_exchange_as_venue` is `true` and the record's publisher is
/// `Publisher::GlbxMdp3Glbx`, the venue is derived from the record's `exchange` and the
/// symbol/venue pair is stored in `symbol_venue_map`. In every other case the venue is
/// looked up by publisher ID in `publisher_venue_map`.
///
/// # Errors
///
/// Returns an error if the decoder cannot be created for the file. The returned iterator
/// yields an `Err` item when the stream fails to advance or when a record cannot be
/// decoded; the iterator does not fuse itself after yielding such an item.
pub fn read_definition_records(
&mut self,
filepath: &Path,
use_exchange_as_venue: bool,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
let decoder = Decoder::from_zstd_file(filepath)?;
let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
Ok(std::iter::from_fn(move || {
// Loop so that records producing no instrument are skipped within a single `next()` call
loop {
let advance = dbn_stream
.advance()
.map_err(|e| anyhow::anyhow!("Stream advance error: {e}"));
if let Err(e) = advance {
return Some(Err(e));
}
// `get` returning `None` ends the iteration through `?`
let rec = dbn_stream.get()?;
// Per-record work runs in a closure so `?` can be used for each fallible step
let result: anyhow::Result<Option<InstrumentAny>> = (|| {
let record = dbn::RecordRef::from(rec);
let msg = record
.get::<InstrumentDefMsg>()
.ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
let raw_symbol = rec
.raw_symbol()
.map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
let symbol = Symbol::from(raw_symbol);
let publisher = rec
.hd
.publisher()
.map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
let venue = match publisher {
Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
let exchange = rec.exchange().map_err(|e| {
anyhow::anyhow!("Missing `exchange` for record: {e}")
})?;
let venue = Venue::from_code(exchange).map_err(|e| {
anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
})?;
// Remember the exchange-derived venue for this symbol
self.symbol_venue_map.insert(symbol, venue);
venue
}
// All other publishers resolve their venue from the publishers table
_ => *self
.publisher_venue_map
.get(&msg.hd.publisher_id)
.ok_or_else(|| {
anyhow::anyhow!(
"Venue not found for publisher_id {}",
msg.hd.publisher_id
)
})?,
};
let instrument_id = InstrumentId::new(symbol, venue);
let ts_init = msg.ts_recv.into();
decode_instrument_def_msg(rec, instrument_id, Some(ts_init))
})();
match result {
Ok(Some(item)) => return Some(Ok(item)),
// Nothing was produced for this record: move on to the next one
Ok(None) => {}
Err(e) => return Some(Err(e)),
}
}
}))
}
pub fn read_records<T>(
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
include_trades: bool,
bars_timestamp_on_close: Option<bool>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
where
T: dbn::Record + dbn::HasRType + 'static,
{
let decoder = Decoder::from_zstd_file(filepath)?;
let metadata = decoder.metadata().clone();
let mut metadata_cache = MetadataCache::new(metadata);
let mut dbn_stream = decoder.decode_stream::<T>();
Ok(std::iter::from_fn(move || {
let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
dbn_stream
.advance()
.map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
if let Some(rec) = dbn_stream.get() {
let record = dbn::RecordRef::from(rec);
let instrument_id = if let Some(id) = &instrument_id {
*id
} else {
decode_nautilus_instrument_id(
&record,
&mut metadata_cache,
&self.publisher_venue_map,
&self.symbol_venue_map,
)
.context("failed to decode instrument id")?
};
let resolved_precision =
self.resolve_price_precision(&instrument_id, price_precision)?;
let (item1, item2) = decode_record(
&record,
instrument_id,
resolved_precision,
None,
include_trades,
bars_timestamp_on_close.unwrap_or(true),
)?;
Ok(Some((item1, item2)))
} else {
Ok(None)
}
})();
match result {
Ok(Some(v)) => Some(Ok(v)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}))
}
/// Loads all instrument definitions from the DBN file at `filepath`.
///
/// The price precision of every loaded instrument is cached under its symbol so later
/// reads can resolve it without an explicit argument.
///
/// With `skip_on_error` set, records that fail to decode are logged at warn level and
/// skipped; otherwise the first failure aborts the load.
///
/// # Errors
///
/// Returns an error if the decoder cannot be created for the file, or — when
/// `skip_on_error` is `false` — if any record fails to decode.
pub fn load_instruments(
    &mut self,
    filepath: &Path,
    use_exchange_as_venue: bool,
    skip_on_error: bool,
) -> anyhow::Result<Vec<InstrumentAny>> {
    let instruments: Vec<InstrumentAny> = {
        let records = self.read_definition_records(filepath, use_exchange_as_venue)?;

        if skip_on_error {
            records
                .filter_map(|outcome| match outcome {
                    Ok(instrument) => Some(instrument),
                    Err(e) => {
                        log::warn!("Skipping instrument: {e}");
                        None
                    }
                })
                .collect()
        } else {
            records.collect::<Result<Vec<_>, _>>()?
        }
    };

    for loaded in &instruments {
        let symbol = loaded.id().symbol;
        let precision = loaded.price_precision();
        self.price_precisions.insert(symbol, precision);
    }

    Ok(instruments)
}
/// Loads every order book delta from the DBN file at `filepath`, reading records as `dbn::MboMsg`.
///
/// Records whose first decoded item is not a `Data::Delta` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_order_book_deltas(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<OrderBookDelta>> {
    let records =
        self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false, None)?;

    let mut deltas = Vec::new();
    for record in records {
        if let (Some(Data::Delta(delta)), _) = record? {
            deltas.push(delta);
        }
    }

    Ok(deltas)
}
/// Loads every depth-10 snapshot from the DBN file at `filepath`, reading records as `dbn::Mbp10Msg`.
///
/// Records whose first decoded item is not a `Data::Depth10` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_order_book_depth10(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<OrderBookDepth10>> {
    let records = self.read_records::<dbn::Mbp10Msg>(
        filepath,
        instrument_id,
        price_precision,
        false,
        None,
    )?;

    let mut depths = Vec::new();
    for record in records {
        if let (Some(Data::Depth10(depth)), _) = record? {
            // Move the snapshot out of its wrapper into the output vector
            depths.push(*depth);
        }
    }

    Ok(depths)
}
/// Loads every quote from the DBN file at `filepath`, reading records as `dbn::Mbp1Msg`.
///
/// Records whose first decoded item is not a `Data::Quote` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_quotes(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<QuoteTick>> {
    let records =
        self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?;

    let mut quotes = Vec::new();
    for record in records {
        if let (Some(Data::Quote(quote)), _) = record? {
            quotes.push(quote);
        }
    }

    Ok(quotes)
}
/// Loads every quote from the DBN file at `filepath`, reading records as `dbn::BboMsg`.
///
/// Records whose first decoded item is not a `Data::Quote` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_bbo_quotes(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<QuoteTick>> {
    let records =
        self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?;

    let mut quotes = Vec::new();
    for record in records {
        if let (Some(Data::Quote(quote)), _) = record? {
            quotes.push(quote);
        }
    }

    Ok(quotes)
}
/// Loads every quote from the DBN file at `filepath`, reading records as `dbn::Cmbp1Msg`.
///
/// Records whose first decoded item is not a `Data::Quote` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_cmbp_quotes(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<QuoteTick>> {
    let records = self.read_records::<dbn::Cmbp1Msg>(
        filepath,
        instrument_id,
        price_precision,
        false,
        None,
    )?;

    let mut quotes = Vec::new();
    for record in records {
        if let (Some(Data::Quote(quote)), _) = record? {
            quotes.push(quote);
        }
    }

    Ok(quotes)
}
/// Loads every quote from the DBN file at `filepath`, reading records as `dbn::CbboMsg`.
///
/// Records whose first decoded item is not a `Data::Quote` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_cbbo_quotes(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<QuoteTick>> {
    let records =
        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?;

    let mut quotes = Vec::new();
    for record in records {
        if let (Some(Data::Quote(quote)), _) = record? {
            quotes.push(quote);
        }
    }

    Ok(quotes)
}
/// Loads every trade from the DBN file at `filepath`, reading records as `dbn::TbboMsg`.
///
/// Only the second decoded item of each record is inspected; it is kept when it is a
/// `Data::Trade` and dropped otherwise.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_tbbo_trades(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<TradeTick>> {
    // NOTE(review): `include_trades` is passed as `false` although only the trade slot is
    // consumed below — confirm `decode_record` still emits trades for this record type.
    let records =
        self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false, None)?;

    let mut trades = Vec::new();
    for record in records {
        if let (_, Some(Data::Trade(trade))) = record? {
            trades.push(trade);
        }
    }

    Ok(trades)
}
/// Loads every trade from the DBN file at `filepath`, reading records as `dbn::CbboMsg`.
///
/// Only the second decoded item of each record is inspected; it is kept when it is a
/// `Data::Trade` and dropped otherwise.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_tcbbo_trades(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<TradeTick>> {
    // NOTE(review): `include_trades` is passed as `false` although only the trade slot is
    // consumed below — confirm this matches how `decode_record` handles this record type.
    let records =
        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?;

    let mut trades = Vec::new();
    for record in records {
        if let (_, Some(Data::Trade(trade))) = record? {
            trades.push(trade);
        }
    }

    Ok(trades)
}
/// Loads every trade from the DBN file at `filepath`, reading records as `dbn::TradeMsg`.
///
/// Records whose first decoded item is not a `Data::Trade` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_trades(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<Vec<TradeTick>> {
    let records = self.read_records::<dbn::TradeMsg>(
        filepath,
        instrument_id,
        price_precision,
        false,
        None,
    )?;

    let mut trades = Vec::new();
    for record in records {
        if let (Some(Data::Trade(trade)), _) = record? {
            trades.push(trade);
        }
    }

    Ok(trades)
}
/// Loads every bar from the DBN file at `filepath`, reading records as `dbn::OhlcvMsg`.
///
/// `timestamp_on_close` is forwarded to `read_records`, which treats `None` as `true`.
/// Records whose first decoded item is not a `Data::Bar` are dropped.
///
/// # Errors
///
/// Returns the first error raised while opening the file or decoding a record.
pub fn load_bars(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
    timestamp_on_close: Option<bool>,
) -> anyhow::Result<Vec<Bar>> {
    let records = self.read_records::<dbn::OhlcvMsg>(
        filepath,
        instrument_id,
        price_precision,
        false,
        timestamp_on_close,
    )?;

    let mut bars = Vec::new();
    for record in records {
        if let (Some(Data::Bar(bar)), _) = record? {
            bars.push(bar);
        }
    }

    Ok(bars)
}
/// Streams instrument status updates from the DBN file at `filepath`.
///
/// The stream is decoded as records of type `T`, and each record is then read as a
/// `dbn::StatusMsg`. When `instrument_id` is `None` the ID is decoded per record from
/// the file metadata and the loader's venue tables.
///
/// # Errors
///
/// Returns an error if the decoder cannot be created for the file. The returned iterator
/// yields an `Err` item when the stream fails to advance, when the instrument ID cannot
/// be decoded, when the record is not a `StatusMsg`, or when `decode_status_msg` fails.
pub fn load_status_records<T>(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
where
    T: dbn::Record + dbn::HasRType + 'static,
{
    let decoder = Decoder::from_zstd_file(filepath)?;
    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut dbn_stream = decoder.decode_stream::<T>();

    Ok(std::iter::from_fn(move || {
        if let Err(e) = dbn_stream.advance() {
            return Some(Err(e.into()));
        }

        // No current record means the stream is exhausted
        let rec = dbn_stream.get()?;
        let record = dbn::RecordRef::from(rec);

        let decoded: anyhow::Result<InstrumentStatus> = (|| {
            let instrument_id = match &instrument_id {
                Some(id) => *id,
                None => decode_nautilus_instrument_id(
                    &record,
                    &mut metadata_cache,
                    &self.publisher_venue_map,
                    &self.symbol_venue_map,
                )?,
            };

            let msg = record
                .get::<dbn::StatusMsg>()
                .ok_or_else(|| anyhow::anyhow!("Invalid `StatusMsg`"))?;

            let ts_init = msg.ts_recv.into();
            decode_status_msg(msg, instrument_id, Some(ts_init))
        })();

        Some(decoded)
    }))
}
/// Streams imbalance records from the DBN file at `filepath`.
///
/// The stream is decoded as records of type `T`, and each record is then read as a
/// `dbn::ImbalanceMsg`. When `instrument_id` is `None` the ID is decoded per record; the
/// price precision is resolved per record from `price_precision` or the loader's cache.
///
/// # Errors
///
/// Returns an error if the decoder cannot be created for the file. The returned iterator
/// yields an `Err` item when the stream fails to advance, when the instrument ID or
/// price precision cannot be resolved, when the record is not an `ImbalanceMsg`, or when
/// `decode_imbalance_msg` fails.
pub fn read_imbalance_records<T>(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
where
    T: dbn::Record + dbn::HasRType + 'static,
{
    let decoder = Decoder::from_zstd_file(filepath)?;
    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut dbn_stream = decoder.decode_stream::<T>();

    Ok(std::iter::from_fn(move || {
        if let Err(e) = dbn_stream.advance() {
            return Some(Err(e.into()));
        }

        // No current record means the stream is exhausted
        let rec = dbn_stream.get()?;
        let record = dbn::RecordRef::from(rec);

        let decoded: anyhow::Result<DatabentoImbalance> = (|| {
            let instrument_id = match &instrument_id {
                Some(id) => *id,
                None => decode_nautilus_instrument_id(
                    &record,
                    &mut metadata_cache,
                    &self.publisher_venue_map,
                    &self.symbol_venue_map,
                )?,
            };

            // Precision is resolved before the message type is checked, as in the original order
            let precision = self.resolve_price_precision(&instrument_id, price_precision)?;

            let msg = record
                .get::<dbn::ImbalanceMsg>()
                .ok_or_else(|| anyhow::anyhow!("Invalid `ImbalanceMsg`"))?;

            let ts_init = msg.ts_recv.into();
            decode_imbalance_msg(msg, instrument_id, precision, Some(ts_init))
        })();

        Some(decoded)
    }))
}
/// Streams statistics records from the DBN file at `filepath`.
///
/// The stream is decoded as records of type `T`, and each record is then read as a
/// `dbn::StatMsg`. Records whose `stat_type` is rejected by `is_supported_stat_type` are
/// logged at warn level and skipped, as are records for which `decode_statistics_msg`
/// returns `Ok(None)`.
///
/// # Errors
///
/// Returns an error if the decoder cannot be created for the file. The returned iterator
/// yields an `Err` item when the stream fails to advance, when the record is not a
/// `StatMsg`, when the instrument ID or price precision cannot be resolved, or when
/// `decode_statistics_msg` fails.
pub fn read_statistics_records<T>(
    &self,
    filepath: &Path,
    instrument_id: Option<InstrumentId>,
    price_precision: Option<u8>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
where
    T: dbn::Record + dbn::HasRType + 'static,
{
    let decoder = Decoder::from_zstd_file(filepath)?;
    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut dbn_stream = decoder.decode_stream::<T>();

    Ok(std::iter::from_fn(move || {
        // Keep pulling records until one yields an item, an error, or the stream ends
        loop {
            if let Err(e) = dbn_stream.advance() {
                return Some(Err(e.into()));
            }

            let rec = dbn_stream.get()?;
            let record = dbn::RecordRef::from(rec);

            let decoded: anyhow::Result<Option<DatabentoStatistics>> = (|| {
                let msg = record
                    .get::<dbn::StatMsg>()
                    .ok_or_else(|| anyhow::anyhow!("Invalid `StatMsg`"))?;

                if !is_supported_stat_type(msg.stat_type) {
                    log::warn!("Skipping unsupported `stat_type` {}", msg.stat_type);
                    return Ok(None);
                }

                let instrument_id = match &instrument_id {
                    Some(id) => *id,
                    None => decode_nautilus_instrument_id(
                        &record,
                        &mut metadata_cache,
                        &self.publisher_venue_map,
                        &self.symbol_venue_map,
                    )?,
                };

                let precision = self.resolve_price_precision(&instrument_id, price_precision)?;

                let ts_init = msg.ts_recv.into();
                decode_statistics_msg(msg, instrument_id, precision, Some(ts_init))
            })();

            // `Ok(None)` transposes to `None`, which means "skip this record"
            if let Some(item) = decoded.transpose() {
                return Some(item);
            }
        }
    }))
}
}
/// Inserts the built-in venue-to-dataset mappings into `venue_dataset_map`.
///
/// A fixed set of venues is mapped to `GLBX.MDP3` and a fixed set of venue codes to
/// `OPRA.PILLAR`. Existing entries for those venues are overwritten.
fn apply_default_venue_dataset_mappings(venue_dataset_map: &mut IndexMap<Venue, Dataset>) {
    const OPRA_VENUE_CODES: &[&str] = &[
        "AMXO", "XBOX", "XCBO", "EMLD", "EDGO", "GMNI", "XISX", "MCRY", "XMIO", "ARCO", "OPRA",
        "MPRL", "XNDQ", "XBXO", "C2OX", "XPHL", "BATO", "MXOP", "SPHR",
    ];

    let glbx = Dataset::from("GLBX.MDP3");
    let glbx_venues = [
        Venue::CBCM(),
        Venue::GLBX(),
        Venue::NYUM(),
        Venue::XCBT(),
        Venue::XCEC(),
        Venue::XCME(),
        Venue::XFXS(),
        Venue::XNYM(),
    ];
    for glbx_venue in glbx_venues {
        venue_dataset_map.insert(glbx_venue, glbx);
    }

    let opra = Dataset::from("OPRA.PILLAR");
    for code in OPRA_VENUE_CODES.iter().copied() {
        venue_dataset_map.insert(Venue::from(code), opra);
    }
}
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};
    use nautilus_model::types::{Price, Quantity};
    use rstest::{fixture, rstest};
    use ustr::Ustr;
    use super::*;

    /// Directory holding the DBN fixture files used by these tests.
    fn test_data_path() -> PathBuf {
        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
    }

    /// Loader seeded with a price precision for `ESM4`, so reads need no explicit precision.
    #[fixture]
    fn loader() -> DatabentoDataLoader {
        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
        let mut loader = DatabentoDataLoader::new(Some(publishers_filepath)).unwrap();
        loader.set_price_precision(Symbol::from("ESM4"), 2);
        loader
    }

    /// Loader with an empty price precision cache.
    #[fixture]
    fn loader_without_seed() -> DatabentoDataLoader {
        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
    }

    #[rstest]
    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
        let dataset = Ustr::from("EQUS.PLUS");
        let venue = Venue::from("XNAS");
        loader.set_dataset_for_venue(dataset, venue);
        let result = loader.get_dataset_for_venue(&venue).unwrap();
        assert_eq!(*result, dataset);
    }

    #[rstest]
    fn test_default_venue_dataset_mappings(loader: DatabentoDataLoader) {
        let xcme = Venue::XCME();
        let result = loader.get_dataset_for_venue(&xcme).unwrap();
        assert_eq!(*result, Ustr::from("GLBX.MDP3"));
        let xcbo = Venue::from("XCBO");
        let result = loader.get_dataset_for_venue(&xcbo).unwrap();
        assert_eq!(*result, Ustr::from("OPRA.PILLAR"));
    }

    #[rstest]
    #[case(test_data_path().join("test_data.definition.dbn.zst"))]
    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instruments = loader.load_instruments(&path, false, false).unwrap();
        assert_eq!(instruments.len(), 2);
        assert_eq!(
            loader.get_price_precisions().get(&Symbol::from("ESM4")),
            Some(&2)
        );
    }

    #[rstest]
    fn test_load_instruments_populates_price_precisions_cache(
        mut loader_without_seed: DatabentoDataLoader,
    ) {
        let path = test_data_path().join("test_data.definition.dbn.zst");
        assert!(loader_without_seed.get_price_precisions().is_empty());
        let instruments = loader_without_seed
            .load_instruments(&path, false, false)
            .unwrap();
        assert_eq!(instruments.len(), 2);
        for instrument in &instruments {
            let symbol = instrument.id().symbol;
            assert_eq!(
                loader_without_seed.get_price_precisions().get(&symbol),
                Some(&instrument.price_precision()),
                "cache missing or mismatched entry for {symbol}",
            );
        }
    }

    #[rstest]
    fn test_read_records_errors_when_precision_unresolvable(
        loader_without_seed: DatabentoDataLoader,
    ) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let result = loader_without_seed.load_order_book_deltas(&path, Some(instrument_id), None);
        let err = result.expect_err("expected precision-resolution error");
        let err_msg = format!("{err}");
        assert!(
            err_msg.contains("Could not resolve `price_precision`"),
            "unexpected error message: {err_msg}",
        );
        assert!(
            err_msg.contains("ESM4.GLBX"),
            "error should name the instrument: {err_msg}",
        );
    }

    #[rstest]
    fn test_set_price_precision_unblocks_reads(mut loader_without_seed: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        assert!(
            loader_without_seed
                .load_order_book_deltas(&path, Some(instrument_id), None)
                .is_err()
        );
        loader_without_seed.set_price_precision(Symbol::from("ESM4"), 2);
        let deltas = loader_without_seed
            .load_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_resolve_price_precision_explicit_arg_overrides_cache(
        mut loader_without_seed: DatabentoDataLoader,
    ) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        loader_without_seed.set_price_precision(Symbol::from("ESM4"), 9);
        let explicit = loader_without_seed
            .resolve_price_precision(&instrument_id, Some(2))
            .unwrap();
        assert_eq!(explicit, 2);
        let cached = loader_without_seed
            .resolve_price_precision(&instrument_id, None)
            .unwrap();
        assert_eq!(cached, 9);
    }

    #[rstest]
    fn test_resolve_price_precision_cache_miss_errors(loader_without_seed: DatabentoDataLoader) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let err = loader_without_seed
            .resolve_price_precision(&instrument_id, None)
            .expect_err("expected cache-miss error");
        assert!(format!("{err}").contains("Could not resolve `price_precision`"));
    }

    #[rstest]
    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let deltas = loader
            .load_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let depths = loader
            .load_order_book_depth10(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(depths.len(), 2);
    }

    #[rstest]
    fn test_load_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quotes = loader
            .load_quotes(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(quotes.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quotes = loader
            .load_bbo_quotes(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(quotes.len(), 4);
    }

    #[rstest]
    fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quotes = loader
            .load_cmbp_quotes(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(quotes.len(), 2);
        // Spot-check every field of the first decoded quote
        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quotes = loader
            .load_cbbo_quotes(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(quotes.len(), 2);
        // Spot-check every field of the first decoded quote
        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.tbbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let trades = loader
            .load_tbbo_trades(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(trades.len(), 0);
    }

    #[rstest]
    fn test_load_tcbbo_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let result = loader.load_tcbbo_trades(&path, Some(instrument_id), None);
        assert!(result.is_ok());
        let trades = result.unwrap();
        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    fn test_load_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.trades.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let trades = loader
            .load_trades(&path, Some(instrument_id), None)
            .unwrap();
        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, None)
            .unwrap();
        assert_eq!(bars.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();
        assert_eq!(bars.len(), 2);
        for bar in &bars {
            assert_eq!(
                bar.ts_event, bar.ts_init,
                "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
            );
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();
        assert_eq!(bars.len(), 2);
        for bar in &bars {
            assert_ne!(
                bar.ts_event, bar.ts_init,
                "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
            );
            assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + 1_000_000_000);
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
    fn test_load_bars_timestamp_comparison(
        loader: DatabentoDataLoader,
        #[case] path: PathBuf,
        #[case] bar_index: usize,
    ) {
        const ONE_SECOND_NS: u64 = 1_000_000_000;
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars_close = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();
        let bars_open = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();
        assert_eq!(bars_close.len(), bars_open.len());
        assert_eq!(bars_close.len(), 2);
        let bar_close = &bars_close[bar_index];
        let bar_open = &bars_open[bar_index];
        // Prices and volume must not depend on the timestamping mode
        assert_eq!(bar_close.open, bar_open.open);
        assert_eq!(bar_close.high, bar_open.high);
        assert_eq!(bar_close.low, bar_open.low);
        assert_eq!(bar_close.close, bar_open.close);
        assert_eq!(bar_close.volume, bar_open.volume);
        assert!(
            bar_close.ts_event > bar_open.ts_event,
            "Close-timestamped bar should have later timestamp than open-timestamped bar"
        );
        assert_eq!(
            bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
            ONE_SECOND_NS,
            "Timestamp difference should be exactly 1 second for 1s bars"
        );
    }

    #[rstest]
    fn test_load_status_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.status.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let statuses = loader
            .load_status_records::<dbn::StatusMsg>(&path, Some(instrument_id))
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();
        assert_eq!(statuses.len(), 4, "Should load exactly 4 status records");
        let first = &statuses[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert_eq!(first.ts_event.as_u64(), 1609110000000000000);
        assert_eq!(first.ts_init.as_u64(), 1609113600000000000);
    }

    #[rstest]
    fn test_read_imbalance_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.imbalance.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let imbalances = loader
            .read_imbalance_records::<dbn::ImbalanceMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();
        assert_eq!(
            imbalances.len(),
            2,
            "Should load exactly 2 imbalance records"
        );
        let first = &imbalances[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(
            first.ref_price.as_f64() > 0.0,
            "ref_price should be positive"
        );
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
    }

    #[rstest]
    fn test_read_statistics_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.statistics.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let statistics = loader
            .read_statistics_records::<dbn::StatMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();
        assert_eq!(
            statistics.len(),
            2,
            "Should load exactly 2 statistics records"
        );
        let first = &statistics[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
        assert!(first.sequence > 0, "sequence should be positive");
    }
}