use super::error::{DatabentoResultExt, decode_error};
use super::transformer::{dbn_mbp1_to_quote, dbn_trade_to_public_trade};
use crate::error::DataError;
use crate::event::MarketEvent;
use crate::subscription::{quote::Quote, trade::PublicTrade};
use chrono::Utc;
use databento::HistoricalClient;
use databento::dbn::decode::DynDecoder;
use databento::dbn::enums::VersionUpgradePolicy;
use databento::dbn::{self, decode::DecodeRecord};
use databento::historical::timeseries::GetRangeParams;
use futures::Stream;
use rustrade_instrument::exchange::ExchangeId;
use std::path::Path;
use tracing::{debug, info};
#[derive(Debug)]
pub struct DatabentoHistorical {
client: HistoricalClient,
}
impl DatabentoHistorical {
pub fn from_env() -> Result<Self, DataError> {
debug!("Creating Databento historical client from env");
let client = HistoricalClient::builder()
.key_from_env()
.with_context("reading API key from env")?
.build()
.with_context("building historical client")?;
Ok(Self { client })
}
pub fn new(api_key: &str) -> Result<Self, DataError> {
debug!("Creating Databento historical client");
let client = HistoricalClient::builder()
.key(api_key)
.with_context("setting API key")?
.build()
.with_context("building historical client")?;
Ok(Self { client })
}
pub async fn fetch_trades<K: Clone>(
&mut self,
params: &GetRangeParams,
exchange: ExchangeId,
instrument: K,
) -> Result<Vec<MarketEvent<K, PublicTrade>>, DataError> {
debug!(?params, "Fetching historical trades from Databento");
let mut decoder = self
.client
.timeseries()
.get_range(params)
.await
.with_context("fetching trades")?;
let mut trades = Vec::with_capacity(4096);
while let Some(record) = decoder
.decode_record::<dbn::TradeMsg>()
.await
.with_context("decoding trade record")?
{
match dbn_trade_to_public_trade(record) {
Ok((time_exchange, trade)) => {
trades.push(MarketEvent {
time_exchange,
time_received: Utc::now(),
exchange,
instrument: instrument.clone(),
kind: trade,
});
}
Err(e) => {
debug!(error = %e, "Skipping invalid trade record");
}
}
}
info!(count = trades.len(), "Fetched historical trades");
Ok(trades)
}
pub async fn fetch_quotes<K: Clone>(
&mut self,
params: &GetRangeParams,
exchange: ExchangeId,
instrument: K,
) -> Result<Vec<MarketEvent<K, Quote>>, DataError> {
debug!(?params, "Fetching historical quotes from Databento");
let mut decoder = self
.client
.timeseries()
.get_range(params)
.await
.with_context("fetching quotes")?;
let mut quotes = Vec::with_capacity(4096);
while let Some(record) = decoder
.decode_record::<dbn::Mbp1Msg>()
.await
.with_context("decoding quote record")?
{
match dbn_mbp1_to_quote(record) {
Ok((time_exchange, quote)) => {
quotes.push(MarketEvent {
time_exchange,
time_received: Utc::now(),
exchange,
instrument: instrument.clone(),
kind: quote,
});
}
Err(e) => {
debug!(error = %e, "Skipping invalid quote record");
}
}
}
info!(count = quotes.len(), "Fetched historical quotes");
Ok(quotes)
}
pub async fn fetch_trades_stream<K: Clone + Send + 'static>(
&mut self,
params: &GetRangeParams,
exchange: ExchangeId,
instrument: K,
) -> Result<impl Stream<Item = Result<MarketEvent<K, PublicTrade>, DataError>>, DataError> {
debug!(?params, "Streaming historical trades from Databento");
let decoder = self
.client
.timeseries()
.get_range(params)
.await
.with_context("fetching trades")?;
Ok(futures::stream::unfold(
TradeStreamState {
decoder,
exchange,
instrument,
},
|mut state| async move {
loop {
match state.decoder.decode_record::<dbn::TradeMsg>().await {
Ok(Some(record)) => match dbn_trade_to_public_trade(record) {
Ok((time_exchange, trade)) => {
let event = MarketEvent {
time_exchange,
time_received: Utc::now(),
exchange: state.exchange,
instrument: state.instrument.clone(),
kind: trade,
};
return Some((Ok(event), state));
}
Err(e) => {
debug!(error = %e, "Skipping invalid trade record");
continue;
}
},
Ok(None) => return None,
Err(e) => {
return Some((Err(decode_error(e.to_string())), state));
}
}
}
},
))
}
pub async fn fetch_quotes_stream<K: Clone + Send + 'static>(
&mut self,
params: &GetRangeParams,
exchange: ExchangeId,
instrument: K,
) -> Result<impl Stream<Item = Result<MarketEvent<K, Quote>, DataError>>, DataError> {
debug!(?params, "Streaming historical quotes from Databento");
let decoder = self
.client
.timeseries()
.get_range(params)
.await
.with_context("fetching quotes")?;
Ok(futures::stream::unfold(
QuoteStreamState {
decoder,
exchange,
instrument,
},
|mut state| async move {
loop {
match state.decoder.decode_record::<dbn::Mbp1Msg>().await {
Ok(Some(record)) => match dbn_mbp1_to_quote(record) {
Ok((time_exchange, quote)) => {
let event = MarketEvent {
time_exchange,
time_received: Utc::now(),
exchange: state.exchange,
instrument: state.instrument.clone(),
kind: quote,
};
return Some((Ok(event), state));
}
Err(e) => {
debug!(error = %e, "Skipping invalid quote record");
continue;
}
},
Ok(None) => return None,
Err(e) => {
return Some((Err(decode_error(e.to_string())), state));
}
}
}
},
))
}
pub fn client(&self) -> &HistoricalClient {
&self.client
}
pub fn client_mut(&mut self) -> &mut HistoricalClient {
&mut self.client
}
}
pub fn load_trades_from_dbn<K: Clone>(
path: &Path,
exchange: ExchangeId,
instrument: K,
) -> Result<impl Iterator<Item = Result<MarketEvent<K, PublicTrade>, DataError>>, DataError> {
let decoder =
DynDecoder::from_file(path, VersionUpgradePolicy::AsIs).with_context("opening DBN file")?;
Ok(DbnTradeIterator {
decoder,
exchange,
instrument,
})
}
pub fn load_quotes_from_dbn<K: Clone>(
path: &Path,
exchange: ExchangeId,
instrument: K,
) -> Result<impl Iterator<Item = Result<MarketEvent<K, Quote>, DataError>>, DataError> {
let decoder =
DynDecoder::from_file(path, VersionUpgradePolicy::AsIs).with_context("opening DBN file")?;
Ok(DbnQuoteIterator {
decoder,
exchange,
instrument,
})
}
struct TradeStreamState<K, D> {
decoder: D,
exchange: ExchangeId,
instrument: K,
}
struct QuoteStreamState<K, D> {
decoder: D,
exchange: ExchangeId,
instrument: K,
}
struct DbnTradeIterator<K> {
decoder: DynDecoder<'static, std::io::BufReader<std::fs::File>>,
exchange: ExchangeId,
instrument: K,
}
impl<K: Clone> Iterator for DbnTradeIterator<K> {
type Item = Result<MarketEvent<K, PublicTrade>, DataError>;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.decoder.decode_record::<dbn::TradeMsg>() {
Ok(Some(record)) => match dbn_trade_to_public_trade(record) {
Ok((time_exchange, trade)) => {
return Some(Ok(MarketEvent {
time_exchange,
time_received: time_exchange,
exchange: self.exchange,
instrument: self.instrument.clone(),
kind: trade,
}));
}
Err(e) => {
debug!(error = %e, "Skipping invalid trade record");
continue;
}
},
Ok(None) => return None,
Err(e) => {
return Some(Err(decode_error(e.to_string())));
}
}
}
}
}
struct DbnQuoteIterator<K> {
decoder: DynDecoder<'static, std::io::BufReader<std::fs::File>>,
exchange: ExchangeId,
instrument: K,
}
impl<K: Clone> Iterator for DbnQuoteIterator<K> {
type Item = Result<MarketEvent<K, Quote>, DataError>;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.decoder.decode_record::<dbn::Mbp1Msg>() {
Ok(Some(record)) => match dbn_mbp1_to_quote(record) {
Ok((time_exchange, quote)) => {
return Some(Ok(MarketEvent {
time_exchange,
time_received: time_exchange,
exchange: self.exchange,
instrument: self.instrument.clone(),
kind: quote,
}));
}
Err(e) => {
debug!(error = %e, "Skipping invalid quote record");
continue;
}
},
Ok(None) => return None,
Err(e) => {
return Some(Err(decode_error(e.to_string())));
}
}
}
}
}