use super::error::{DatabentoErrorKind, DatabentoResultExt};
use super::transformer::{dbn_mbp1_to_orderbook_l1, dbn_trade_to_public_trade};
use crate::error::DataError;
use crate::event::{DataKind, MarketEvent};
use chrono::Utc;
use databento::LiveClient;
use databento::dbn::{Mbp1Msg, PitSymbolMap, RecordRef, Schema, TradeMsg};
use databento::live::Subscription;
use futures::Stream;
use rustrade_instrument::exchange::ExchangeId;
use std::collections::HashMap;
use tracing::{debug, trace, warn};
/// Live Databento feed wrapper that pairs a [`LiveClient`] with the lookup
/// data needed to turn incoming records into [`MarketEvent`]s.
///
/// `K` is the caller's instrument key type; it is cloned into each emitted event.
#[derive(Debug)]
pub struct DatabentoLive<K> {
/// Underlying Databento live client used to subscribe and receive records.
client: LiveClient,
/// Maps a symbol string (as resolved from the record's instrument id) to the
/// caller's instrument key. Records for symbols not present here are skipped.
instruments: HashMap<String, K>,
/// Exchange identifier copied onto every emitted [`MarketEvent`].
exchange: ExchangeId,
}
impl<K> DatabentoLive<K> {
    /// Creates a live feed whose API key is taken from the environment.
    ///
    /// `dataset` is handed to the client builder; `exchange` and `instruments`
    /// are stored unchanged for later use when records are converted to events.
    ///
    /// # Errors
    ///
    /// Returns a [`DataError`] if the key cannot be read or the client cannot be built.
    pub async fn from_env(
        dataset: &str,
        exchange: ExchangeId,
        instruments: HashMap<String, K>,
    ) -> Result<Self, DataError> {
        debug!(dataset, "Creating Databento live client from env");

        // Resolve the key first so a missing key is reported with its own context.
        let keyed_builder = LiveClient::builder()
            .key_from_env()
            .with_context("reading API key from env")?;

        let client = keyed_builder
            .dataset(dataset)
            .build()
            .await
            .with_context("building live client")?;

        Ok(Self {
            client,
            instruments,
            exchange,
        })
    }

    /// Creates a live feed using an explicitly supplied API key.
    ///
    /// `dataset` is handed to the client builder; `exchange` and `instruments`
    /// are stored unchanged for later use when records are converted to events.
    ///
    /// # Errors
    ///
    /// Returns a [`DataError`] if the key is rejected by the builder or the
    /// client cannot be built.
    pub async fn new(
        api_key: &str,
        dataset: &str,
        exchange: ExchangeId,
        instruments: HashMap<String, K>,
    ) -> Result<Self, DataError> {
        debug!(dataset, "Creating Databento live client");

        let keyed_builder = LiveClient::builder()
            .key(api_key)
            .with_context("setting API key")?;

        let client = keyed_builder
            .dataset(dataset)
            .build()
            .await
            .with_context("building live client")?;

        Ok(Self {
            client,
            instruments,
            exchange,
        })
    }

    /// Subscribes the underlying client to `symbols` for the given `schema`.
    ///
    /// # Errors
    ///
    /// Returns a [`DataError`] if the subscription request fails.
    pub async fn subscribe(&mut self, symbols: &[&str], schema: Schema) -> Result<(), DataError> {
        debug!(?symbols, ?schema, "Subscribing to Databento live feed");

        let request = Subscription::builder()
            .symbols(symbols)
            .schema(schema)
            .build();

        let outcome = self.client.subscribe(request).await;
        outcome.with_context("subscribing to live feed")?;
        Ok(())
    }

    /// Shared access to the wrapped [`LiveClient`].
    pub fn client(&self) -> &LiveClient {
        &self.client
    }

    /// Exclusive access to the wrapped [`LiveClient`].
    pub fn client_mut(&mut self) -> &mut LiveClient {
        &mut self.client
    }
}
impl<K: Clone + Send + 'static> DatabentoLive<K> {
pub async fn start(
mut self,
) -> Result<impl Stream<Item = Result<MarketEvent<K, DataKind>, DataError>>, DataError> {
debug!("Starting Databento live stream");
self.client
.start()
.await
.with_context("starting live stream")?;
let stream = futures::stream::unfold(
StreamState {
client: self.client,
symbol_map: PitSymbolMap::new(),
instruments: self.instruments,
exchange: self.exchange,
},
|mut state| async move {
loop {
let record = match state.client.next_record().await {
Ok(Some(rec)) => rec,
Ok(None) => {
debug!("Databento live stream ended");
return None;
}
Err(e) => {
let err = DataError::Databento {
kind: DatabentoErrorKind::Network,
context: "receiving record".to_string(),
message: e.to_string(),
};
return Some((Err(err), state));
}
};
if let Err(e) = state.symbol_map.on_record(record) {
warn!(error = %e, "Failed to update symbol map");
}
if let Some(result) = process_record(
record,
&state.symbol_map,
&state.instruments,
state.exchange,
) {
return Some((result, state));
}
}
},
);
Ok(stream)
}
}
/// State threaded through the `unfold` stream built by `DatabentoLive::start`.
struct StreamState<K> {
/// Client the stream pulls records from.
client: LiveClient,
/// Instrument-id to symbol mapping, updated from every received record.
symbol_map: PitSymbolMap,
/// Symbol to instrument-key lookup supplied by the caller.
instruments: HashMap<String, K>,
/// Exchange identifier copied onto every emitted event.
exchange: ExchangeId,
}
/// Converts one DBN record into a [`MarketEvent`], if the record is usable.
///
/// Two record types are handled:
///
/// * `TradeMsg`, converted via `dbn_trade_to_public_trade` into `DataKind::Trade`;
/// * `Mbp1Msg`, converted via `dbn_mbp1_to_orderbook_l1` into `DataKind::OrderBookL1`.
///
/// `time_received` is stamped with the current UTC time at conversion.
///
/// Returns `None` (the record is skipped) when:
///
/// * the record is neither of the two handled types;
/// * its instrument id has no entry in `symbol_map` (logged at `trace`);
/// * the resolved symbol is not a key of `instruments` (logged at `trace`);
/// * the transformer rejects the record (logged at `debug`).
///
/// As written, every `Some` value is `Ok`; conversion failures are skipped
/// rather than surfaced as `Err`.
fn process_record<K: Clone>(
    record: RecordRef<'_>,
    symbol_map: &PitSymbolMap,
    instruments: &HashMap<String, K>,
    exchange: ExchangeId,
) -> Option<Result<MarketEvent<K, DataKind>, DataError>> {
    if let Some(trade) = record.get::<TradeMsg>() {
        let symbol = match symbol_map.get(trade.hd.instrument_id) {
            Some(symbol) => symbol,
            None => {
                // Previously dropped silently; log it like the other skip paths.
                trace!(
                    instrument_id = trade.hd.instrument_id,
                    "Skipping trade for unmapped instrument id"
                );
                return None;
            }
        };
        let instrument = match instruments.get(symbol) {
            Some(key) => key.clone(),
            None => {
                trace!(%symbol, "Skipping trade for unknown symbol");
                return None;
            }
        };
        return match dbn_trade_to_public_trade(trade) {
            Ok((time_exchange, public_trade)) => Some(Ok(MarketEvent {
                time_exchange,
                time_received: Utc::now(),
                exchange,
                instrument,
                kind: DataKind::Trade(public_trade),
            })),
            Err(e) => {
                debug!(error = %e, %symbol, "Skipping invalid trade record");
                None
            }
        };
    }

    if let Some(mbp1) = record.get::<Mbp1Msg>() {
        let symbol = match symbol_map.get(mbp1.hd.instrument_id) {
            Some(symbol) => symbol,
            None => {
                // Previously dropped silently; log it like the other skip paths.
                trace!(
                    instrument_id = mbp1.hd.instrument_id,
                    "Skipping quote for unmapped instrument id"
                );
                return None;
            }
        };
        let instrument = match instruments.get(symbol) {
            Some(key) => key.clone(),
            None => {
                trace!(%symbol, "Skipping quote for unknown symbol");
                return None;
            }
        };
        return match dbn_mbp1_to_orderbook_l1(mbp1) {
            Ok((time_exchange, l1)) => Some(Ok(MarketEvent {
                time_exchange,
                time_received: Utc::now(),
                exchange,
                instrument,
                kind: DataKind::OrderBookL1(l1),
            })),
            Err(e) => {
                debug!(error = %e, %symbol, "Skipping invalid quote record");
                None
            }
        };
    }

    // Any other record type is not converted into a market event.
    None
}