use std::{fmt::Debug, fs, num::NonZeroU64, path::PathBuf, str::FromStr, sync::Arc};
use databento::{
dbn::{self, decode::DbnMetadata},
historical::timeseries::GetRangeParams,
};
use indexmap::IndexMap;
use nautilus_core::{AtomicMap, UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
use nautilus_model::{
data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
enums::BarAggregation,
identifiers::{InstrumentId, Symbol, Venue},
instruments::{Instrument, InstrumentAny},
};
use crate::{
common::{Credential, get_date_time_range},
decode::{
decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg,
decode_record, decode_statistics_msg, decode_status_msg, is_supported_stat_type,
},
symbology::{
MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
infer_symbology_type,
},
types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
};
/// Client for requesting historical market data from Databento and decoding it into
/// Nautilus domain types.
///
/// Clones share the wrapped Databento client, the venue maps and the price precision
/// cache through `Arc`.
#[derive(Clone)]
pub struct DatabentoHistoricalClient {
    /// Credential holding the Databento API key.
    credential: Credential,
    /// Clock used to supply the end of a range when a request does not specify one.
    clock: &'static AtomicTime,
    /// Wrapped Databento client, guarded by an async mutex.
    inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
    /// Publisher ID to venue mapping loaded from the publishers file at construction.
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    /// Symbol to venue mapping, filled by `prepare_symbols_from_instrument_ids`.
    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
    /// Price precision cache keyed by symbol, filled by `set_price_precision` and
    /// `get_range_instruments`.
    price_precisions: Arc<AtomicMap<Symbol, u8>>,
    /// When `true`, instrument definitions on the `GLBX` venue use the record's
    /// exchange as their venue instead.
    use_exchange_as_venue: bool,
}
/// Parameters shared by the `get_range_*` requests.
#[derive(Debug)]
pub struct RangeQueryParams {
    /// Databento dataset to query.
    pub dataset: String,
    /// Symbols to request; all must use a consistent symbology and at least one is required.
    pub symbols: Vec<String>,
    /// Start of the requested time range.
    pub start: UnixNanos,
    /// End of the requested time range; the client's current clock time is used when `None`.
    pub end: Option<UnixNanos>,
    /// Optional record limit forwarded to the request; `Some(0)` is forwarded as `None`.
    pub limit: Option<u64>,
    /// Explicit price precision; when `None` the client's per-symbol cache is consulted.
    pub price_precision: Option<u8>,
}
/// Available time range of a dataset, as produced by `get_dataset_range`.
#[derive(Debug, Clone)]
pub struct DatasetRange {
    /// String form of the range start reported by Databento.
    pub start: String,
    /// String form of the range end reported by Databento.
    pub end: String,
}
impl Debug for DatabentoHistoricalClient {
    /// Formats the client showing only its `credential` field; all other fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("DatabentoHistoricalClient");
        builder.field("credential", &self.credential);
        builder.finish()
    }
}
impl DatabentoHistoricalClient {
/// Returns the API key held by the client's credential.
#[must_use]
pub fn api_key(&self) -> &str {
    let credential = &self.credential;
    credential.api_key()
}
pub fn new(
credential: Credential,
publishers_filepath: PathBuf,
clock: &'static AtomicTime,
use_exchange_as_venue: bool,
) -> anyhow::Result<Self> {
let client = databento::HistoricalClient::builder()
.user_agent_extension(NAUTILUS_USER_AGENT.into())
.key(credential.api_key())
.map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
.build()
.map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
let file_content = fs::read_to_string(publishers_filepath)?;
let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
let publisher_venue_map = publishers_vec
.into_iter()
.map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
.collect::<IndexMap<u16, Venue>>();
Ok(Self {
clock,
inner: Arc::new(tokio::sync::Mutex::new(client)),
publisher_venue_map: Arc::new(publisher_venue_map),
symbol_venue_map: Arc::new(AtomicMap::new()),
price_precisions: Arc::new(AtomicMap::new()),
credential,
use_exchange_as_venue,
})
}
/// Stores `price_precision` for `symbol` in the client's price precision cache.
///
/// Later range requests that do not pass an explicit precision read from this cache.
pub fn set_price_precision(&self, symbol: Symbol, price_precision: u8) {
    let cache = &self.price_precisions;
    cache.insert(symbol, price_precision);
}
/// Determines the price precision to decode `instrument_id` with.
///
/// An explicit `price_precision` always wins; otherwise the cache is looked up by the
/// instrument's symbol.
///
/// # Errors
///
/// Returns an error if no explicit precision was given and the symbol is not cached.
fn resolve_price_precision(
    &self,
    instrument_id: &InstrumentId,
    price_precision: Option<u8>,
) -> anyhow::Result<u8> {
    match price_precision {
        Some(explicit) => Ok(explicit),
        None => {
            let cache = self.price_precisions.load();
            match cache.get(&instrument_id.symbol) {
                Some(cached) => Ok(*cached),
                None => Err(anyhow::anyhow!(
                    "Could not resolve `price_precision` for {instrument_id}: \
                     pass `price_precision` explicitly, call `set_price_precision`, \
                     or fetch the instrument definitions first via `get_range_instruments`"
                )),
            }
        }
    }
}
/// Requests the available time range for `dataset` through the wrapped Databento client.
///
/// The start and end of the response are returned in their string form.
///
/// # Errors
///
/// Returns an error if the metadata request fails.
pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
    let mut guard = self.inner.lock().await;
    let range = match guard.metadata().get_dataset_range(dataset).await {
        Ok(range) => range,
        Err(e) => return Err(anyhow::anyhow!("Failed to get dataset range: {e}")),
    };

    let start = range.start.to_string();
    let end = range.end.to_string();
    Ok(DatasetRange { start, end })
}
pub async fn get_range_instruments(
&self,
params: RangeQueryParams,
) -> anyhow::Result<Vec<InstrumentAny>> {
let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
check_consistent_symbology(&symbols)?;
let first_symbol = params
.symbols
.first()
.ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
let stype_in = infer_symbology_type(first_symbol);
let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
let time_range = get_date_time_range(params.start, end)?;
let range_params = GetRangeParams::builder()
.dataset(params.dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(stype_in)
.schema(dbn::Schema::Definition)
.maybe_limit(params.limit.and_then(NonZeroU64::new))
.build();
let mut client = self.inner.lock().await;
let mut decoder = client
.timeseries()
.get_range(&range_params)
.await
.map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
let metadata = decoder.metadata().clone();
let mut metadata_cache = MetadataCache::new(metadata);
let mut instruments = Vec::new();
while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
let record = dbn::RecordRef::from(msg);
let sym_map = self.symbol_venue_map.load();
let mut instrument_id = decode_nautilus_instrument_id(
&record,
&mut metadata_cache,
&self.publisher_venue_map,
&sym_map,
)?;
if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
let exchange = msg
.exchange()
.map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
let venue = Venue::from_code(exchange)
.map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
instrument_id.venue = venue;
}
match decode_instrument_def_msg(msg, instrument_id, None) {
Ok(Some(instrument)) => instruments.push(instrument),
Ok(None) => {} Err(e) => log::error!("Failed to decode instrument: {e:?}"),
}
}
for instrument in &instruments {
self.price_precisions
.insert(instrument.id().symbol, instrument.price_precision());
}
Ok(instruments)
}
/// Fetches quotes for the requested symbols and time range.
///
/// `schema` selects the Databento schema and defaults to `mbp-1`; it must be one of `mbp-1`,
/// `bbo-1s`, `bbo-1m`, `cmbp-1`, `cbbo-1s` or `cbbo-1m`. When `params.end` is `None`, the
/// client's current clock time is used as the end of the range. A `params.limit` of `Some(0)`
/// is forwarded as `None`.
///
/// Records that decode to no data are skipped.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - the time range or schema is invalid,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID or price precision cannot be resolved for a record,
/// - a record fails to decode or decodes to something other than a quote.
pub async fn get_range_quotes(
    &self,
    params: RangeQueryParams,
    schema: Option<String>,
) -> anyhow::Result<Vec<QuoteTick>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);
    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
    let dbn_schema = dbn::Schema::from_str(&schema)?;

    // Reject unsupported schemas before issuing the request
    match dbn_schema {
        dbn::Schema::Mbp1
        | dbn::Schema::Bbo1S
        | dbn::Schema::Bbo1M
        | dbn::Schema::Cmbp1
        | dbn::Schema::Cbbo1S
        | dbn::Schema::Cbbo1M => (),
        _ => anyhow::bail!(
            "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m, cmbp-1, cbbo-1s, cbbo-1m"
        ),
    }

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(dbn_schema)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();
    let price_precision_arg = params.price_precision;

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<QuoteTick> = Vec::new();

    let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;
        let price_precision =
            self.resolve_price_precision(&instrument_id, price_precision_arg)?;

        let (data, _) = decode_record(
            &record,
            instrument_id,
            price_precision,
            None,
            false,
            true,
        )?;

        match data {
            Some(Data::Quote(quote)) => result.push(quote),
            None => {}
            _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
        }
        Ok(())
    };

    // In every loop below a decode failure is propagated instead of being treated as
    // end-of-stream, so a truncated response is never returned as a complete result
    match dbn_schema {
        dbn::Schema::Mbp1 => {
            while let Some(msg) = decoder
                .decode_record::<dbn::Mbp1Msg>()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
            {
                process_record(dbn::RecordRef::from(msg))?;
            }
        }
        dbn::Schema::Cmbp1 => {
            while let Some(msg) = decoder
                .decode_record::<dbn::Cmbp1Msg>()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
            {
                process_record(dbn::RecordRef::from(msg))?;
            }
        }
        dbn::Schema::Bbo1M => {
            while let Some(msg) = decoder
                .decode_record::<dbn::Bbo1MMsg>()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
            {
                process_record(dbn::RecordRef::from(msg))?;
            }
        }
        dbn::Schema::Bbo1S => {
            while let Some(msg) = decoder
                .decode_record::<dbn::Bbo1SMsg>()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
            {
                process_record(dbn::RecordRef::from(msg))?;
            }
        }
        dbn::Schema::Cbbo1S | dbn::Schema::Cbbo1M => {
            while let Some(msg) = decoder
                .decode_record::<dbn::CbboMsg>()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
            {
                process_record(dbn::RecordRef::from(msg))?;
            }
        }
        _ => anyhow::bail!("Invalid schema {dbn_schema}"),
    }

    Ok(result)
}
/// Fetches ten-level order book depth snapshots for the requested symbols and time range.
///
/// Issues a timeseries `get_range` request with the `Mbp10` schema. `depth` defaults to `10`,
/// which is the only supported value. When `params.end` is `None`, the client's current clock
/// time is used as the end of the range. A `params.limit` of `Some(0)` is forwarded as `None`.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - the time range is invalid,
/// - `depth` is given and is not `10`,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID or price precision cannot be resolved for a record,
/// - a record fails to decode.
pub async fn get_range_order_book_depth10(
    &self,
    params: RangeQueryParams,
    depth: Option<usize>,
) -> anyhow::Result<Vec<OrderBookDepth10>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);
    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    if depth.unwrap_or(10) != 10 {
        anyhow::bail!("Only depth=10 is currently supported for order book depths");
    }

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(dbn::Schema::Mbp10)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();
    let price_precision_arg = params.price_precision;

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<OrderBookDepth10> = Vec::new();

    let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;
        let price_precision =
            self.resolve_price_precision(&instrument_id, price_precision_arg)?;

        if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
            let depth10 = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
            result.push(depth10);
        }
        Ok(())
    };

    // Propagate decode failures instead of treating them as end-of-stream, so an interrupted
    // or corrupt response is never returned as if it were a complete result
    while let Some(msg) = decoder
        .decode_record::<dbn::Mbp10Msg>()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
    {
        process_record(dbn::RecordRef::from(msg))?;
    }

    Ok(result)
}
/// Fetches order book deltas for the requested symbols and time range.
///
/// Issues a timeseries `get_range` request with the `Mbo` schema. When `params.end` is `None`,
/// the client's current clock time is used as the end of the range. A `params.limit` of
/// `Some(0)` is forwarded as `None`.
///
/// Only the delta part of each decoded message is kept; messages that yield no delta are
/// skipped.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - the time range is invalid,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID or price precision cannot be resolved for a record,
/// - a record fails to decode.
pub async fn get_range_order_book_deltas(
    &self,
    params: RangeQueryParams,
) -> anyhow::Result<Vec<OrderBookDelta>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);
    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(dbn::Schema::Mbo)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();
    let price_precision_arg = params.price_precision;

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<OrderBookDelta> = Vec::new();

    let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;
        let price_precision =
            self.resolve_price_precision(&instrument_id, price_precision_arg)?;

        if let Some(msg) = record.get::<dbn::MboMsg>() {
            let (delta, _trade) =
                decode_mbo_msg(msg, instrument_id, price_precision, None, false)?;
            if let Some(delta) = delta {
                result.push(delta);
            }
        }
        Ok(())
    };

    // Propagate decode failures instead of treating them as end-of-stream, so an interrupted
    // or corrupt response is never returned as if it were a complete result
    while let Some(msg) = decoder
        .decode_record::<dbn::MboMsg>()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
    {
        process_record(dbn::RecordRef::from(msg))?;
    }

    Ok(result)
}
/// Fetches trades for the requested symbols and time range.
///
/// Issues a timeseries `get_range` request with the `Trades` schema. When `params.end` is
/// `None`, the client's current clock time is used as the end of the range. A `params.limit`
/// of `Some(0)` is forwarded as `None`.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - the time range is invalid,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID or price precision cannot be resolved for a record,
/// - a record fails to decode or does not decode to a trade.
pub async fn get_range_trades(
    &self,
    params: RangeQueryParams,
) -> anyhow::Result<Vec<TradeTick>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);
    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(dbn::Schema::Trades)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();
    let price_precision_arg = params.price_precision;

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<TradeTick> = Vec::new();

    // Propagate decode failures instead of treating them as end-of-stream, so an interrupted
    // or corrupt response is never returned as if it were a complete result
    while let Some(msg) = decoder
        .decode_record::<dbn::TradeMsg>()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
    {
        let record = dbn::RecordRef::from(msg);
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;
        let price_precision =
            self.resolve_price_precision(&instrument_id, price_precision_arg)?;

        let (data, _) = decode_record(
            &record,
            instrument_id,
            price_precision,
            None,
            false,
            true,
        )?;

        match data {
            Some(Data::Trade(trade)) => {
                result.push(trade);
            }
            _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
        }
    }

    Ok(result)
}
/// Fetches OHLCV bars for the requested symbols and time range.
///
/// `aggregation` selects the schema: `Second`, `Minute`, `Hour` and `Day` map to the
/// one-second, one-minute, one-hour and one-day OHLCV schemas respectively. The
/// `timestamp_on_close` flag is passed through to the record decoder. When `params.end` is
/// `None`, the client's current clock time is used as the end of the range. A `params.limit`
/// of `Some(0)` is forwarded as `None`.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - `aggregation` is not one of the supported values,
/// - the time range is invalid,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID or price precision cannot be resolved for a record,
/// - a record fails to decode or does not decode to a bar.
pub async fn get_range_bars(
    &self,
    params: RangeQueryParams,
    aggregation: BarAggregation,
    timestamp_on_close: bool,
) -> anyhow::Result<Vec<Bar>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);

    let schema = match aggregation {
        BarAggregation::Second => dbn::Schema::Ohlcv1S,
        BarAggregation::Minute => dbn::Schema::Ohlcv1M,
        BarAggregation::Hour => dbn::Schema::Ohlcv1H,
        BarAggregation::Day => dbn::Schema::Ohlcv1D,
        _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
    };

    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(schema)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();
    let price_precision_arg = params.price_precision;

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<Bar> = Vec::new();

    // Propagate decode failures instead of treating them as end-of-stream, so an interrupted
    // or corrupt response is never returned as if it were a complete result
    while let Some(msg) = decoder
        .decode_record::<dbn::OhlcvMsg>()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
    {
        let record = dbn::RecordRef::from(msg);
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;
        let price_precision =
            self.resolve_price_precision(&instrument_id, price_precision_arg)?;

        let (data, _) = decode_record(
            &record,
            instrument_id,
            price_precision,
            None,
            false,
            timestamp_on_close,
        )?;

        match data {
            Some(Data::Bar(bar)) => {
                result.push(bar);
            }
            _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
        }
    }

    Ok(result)
}
/// Fetches imbalance records for the requested symbols and time range.
///
/// Issues a timeseries `get_range` request with the `Imbalance` schema. When `params.end` is
/// `None`, the client's current clock time is used as the end of the range. A `params.limit`
/// of `Some(0)` is forwarded as `None`.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - the time range is invalid,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID or price precision cannot be resolved for a record,
/// - a record fails to decode.
pub async fn get_range_imbalance(
    &self,
    params: RangeQueryParams,
) -> anyhow::Result<Vec<DatabentoImbalance>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);
    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(dbn::Schema::Imbalance)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();
    let price_precision_arg = params.price_precision;

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<DatabentoImbalance> = Vec::new();

    // Propagate decode failures instead of treating them as end-of-stream, so an interrupted
    // or corrupt response is never returned as if it were a complete result
    while let Some(msg) = decoder
        .decode_record::<dbn::ImbalanceMsg>()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
    {
        let record = dbn::RecordRef::from(msg);
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;
        let price_precision =
            self.resolve_price_precision(&instrument_id, price_precision_arg)?;

        let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
        result.push(imbalance);
    }

    Ok(result)
}
/// Fetches statistics records for the requested symbols and time range.
///
/// Issues a timeseries `get_range` request with the `Statistics` schema. When `params.end` is
/// `None`, the client's current clock time is used as the end of the range. A `params.limit`
/// of `Some(0)` is forwarded as `None`.
///
/// Records with an unsupported `stat_type` are logged at warn level and skipped, as are
/// records that decode to no statistics value.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - the time range is invalid,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID or price precision cannot be resolved for a record,
/// - a record fails to decode.
pub async fn get_range_statistics(
    &self,
    params: RangeQueryParams,
) -> anyhow::Result<Vec<DatabentoStatistics>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);
    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(dbn::Schema::Statistics)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();
    let price_precision_arg = params.price_precision;

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<DatabentoStatistics> = Vec::new();

    // Propagate decode failures instead of treating them as end-of-stream, so an interrupted
    // or corrupt response is never returned as if it were a complete result
    while let Some(msg) = decoder
        .decode_record::<dbn::StatMsg>()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
    {
        if !is_supported_stat_type(msg.stat_type) {
            log::warn!("Skipping unsupported `stat_type` {}", msg.stat_type);
            continue;
        }

        let record = dbn::RecordRef::from(msg);
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;
        let price_precision =
            self.resolve_price_precision(&instrument_id, price_precision_arg)?;

        if let Some(statistics) =
            decode_statistics_msg(msg, instrument_id, price_precision, None)?
        {
            result.push(statistics);
        }
    }

    Ok(result)
}
/// Fetches instrument status updates for the requested symbols and time range.
///
/// Issues a timeseries `get_range` request with the `Status` schema. When `params.end` is
/// `None`, the client's current clock time is used as the end of the range. A `params.limit`
/// of `Some(0)` is forwarded as `None`. `params.price_precision` is not used by this request.
///
/// # Errors
///
/// Returns an error if:
/// - the symbols are empty or use inconsistent symbology,
/// - the time range is invalid,
/// - the request fails,
/// - a record cannot be read from the response stream,
/// - an instrument ID cannot be resolved for a record,
/// - a record fails to decode.
pub async fn get_range_status(
    &self,
    params: RangeQueryParams,
) -> anyhow::Result<Vec<InstrumentStatus>> {
    let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
    check_consistent_symbology(&symbols)?;

    let first_symbol = params
        .symbols
        .first()
        .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
    let stype_in = infer_symbology_type(first_symbol);
    let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
    let time_range = get_date_time_range(params.start, end)?;

    let range_params = GetRangeParams::builder()
        .dataset(params.dataset)
        .date_time_range(time_range)
        .symbols(symbols)
        .stype_in(stype_in)
        .schema(dbn::Schema::Status)
        .maybe_limit(params.limit.and_then(NonZeroU64::new))
        .build();

    let mut client = self.inner.lock().await;
    let mut decoder = client
        .timeseries()
        .get_range(&range_params)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

    let mut metadata_cache = MetadataCache::new(decoder.metadata().clone());
    let mut result: Vec<InstrumentStatus> = Vec::new();

    // Propagate decode failures instead of treating them as end-of-stream, so an interrupted
    // or corrupt response is never returned as if it were a complete result
    while let Some(msg) = decoder
        .decode_record::<dbn::StatusMsg>()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to decode record: {e}"))?
    {
        let record = dbn::RecordRef::from(msg);
        let sym_map = self.symbol_venue_map.load();
        let instrument_id = decode_nautilus_instrument_id(
            &record,
            &mut metadata_cache,
            &self.publisher_venue_map,
            &sym_map,
        )?;

        let status = decode_status_msg(msg, instrument_id, None)?;
        result.push(status);
    }

    Ok(result)
}
/// Registers the venue of each instrument ID and returns the symbols as strings.
///
/// Every symbol not yet present in the symbol/venue map is inserted with the venue from its
/// instrument ID; existing entries are left untouched. The returned vector holds one symbol
/// string per input ID, in input order.
pub fn prepare_symbols_from_instrument_ids(
    &self,
    instrument_ids: &[InstrumentId],
) -> Vec<String> {
    self.symbol_venue_map.rcu(|venues| {
        for instrument_id in instrument_ids {
            venues
                .entry(instrument_id.symbol)
                .or_insert(instrument_id.venue);
        }
    });

    let mut symbols = Vec::with_capacity(instrument_ids.len());
    for instrument_id in instrument_ids {
        symbols.push(instrument_id.symbol.to_string());
    }
    symbols
}
}
#[cfg(test)]
mod tests {
    use nautilus_core::time::get_atomic_clock_realtime;
    use rstest::{fixture, rstest};

    use super::*;

    /// API key string used to construct clients in these tests.
    fn test_api_key() -> String {
        String::from("test-000000000000000000000000000")
    }

    /// Path of `publishers.json` inside the crate's manifest directory.
    fn publishers_path() -> PathBuf {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("publishers.json");
        path
    }

    /// Client fixture built with the test API key and `use_exchange_as_venue` disabled.
    #[fixture]
    fn historical_client() -> DatabentoHistoricalClient {
        let credential = Credential::new(test_api_key());
        let clock = get_atomic_clock_realtime();
        DatabentoHistoricalClient::new(credential, publishers_path(), clock, false).unwrap()
    }

    #[rstest]
    fn test_set_price_precision_inserts_into_cache(historical_client: DatabentoHistoricalClient) {
        let symbol = Symbol::from("ESM4");
        historical_client.set_price_precision(symbol, 2);

        let cached = historical_client
            .price_precisions
            .load()
            .get(&symbol)
            .copied();
        assert_eq!(cached, Some(2));
    }

    #[rstest]
    fn test_resolve_price_precision_explicit_arg(historical_client: DatabentoHistoricalClient) {
        // A cached value is present, but the explicit argument must take priority
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        historical_client.set_price_precision(Symbol::from("ESM4"), 9);

        let resolved = historical_client.resolve_price_precision(&instrument_id, Some(2));
        assert_eq!(resolved.unwrap(), 2);
    }

    #[rstest]
    fn test_resolve_price_precision_cache_hit(historical_client: DatabentoHistoricalClient) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        historical_client.set_price_precision(Symbol::from("ESM4"), 2);

        let resolved = historical_client.resolve_price_precision(&instrument_id, None);
        assert_eq!(resolved.unwrap(), 2);
    }

    #[rstest]
    fn test_resolve_price_precision_cache_miss_errors(
        historical_client: DatabentoHistoricalClient,
    ) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let err_msg = historical_client
            .resolve_price_precision(&instrument_id, None)
            .expect_err("expected cache-miss error")
            .to_string();

        assert!(
            err_msg.contains("Could not resolve `price_precision`"),
            "unexpected error message: {err_msg}",
        );
        assert!(
            err_msg.contains("ESM4.GLBX"),
            "error should name the instrument: {err_msg}",
        );
    }
}