use std::{
fmt::Debug,
path::PathBuf,
sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
},
};
use ahash::AHashMap;
use databento::live::Subscription;
use indexmap::IndexMap;
use nautilus_common::{
clients::DataClient,
live::{runner::get_data_event_sender, runtime::get_runtime},
messages::{
DataEvent,
data::{
RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
SubscribeInstrument, SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades,
UnsubscribeBookDeltas, UnsubscribeInstrumentStatus, UnsubscribeQuotes,
UnsubscribeTrades,
},
},
};
use nautilus_core::{AtomicMap, MUTEX_POISONED, string::secret::REDACTED, time::AtomicTime};
use nautilus_model::{
enums::BarAggregation,
identifiers::{ClientId, Symbol, Venue},
instruments::Instrument,
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::{
common::Credential,
historical::{DatabentoHistoricalClient, RangeQueryParams},
live::{DatabentoFeedHandler, DatabentoMessage, HandlerCommand},
loader::DatabentoDataLoader,
symbology::instrument_id_to_symbol_string,
types::PublisherId,
};
/// Configuration for a [`DatabentoDataClient`].
#[derive(Clone)]
pub struct DatabentoDataClientConfig {
/// Databento API credential (crate-private; the `Debug` impl prints a redaction placeholder instead).
pub(crate) credential: Credential,
/// Path to the publishers JSON file; the client reads it to build its publisher-to-venue map.
pub publishers_filepath: PathBuf,
/// Forwarded unchanged to the historical client and to every live feed handler.
pub use_exchange_as_venue: bool,
/// Forwarded unchanged to every live feed handler.
pub bars_timestamp_on_close: bool,
/// Forwarded unchanged to every live feed handler.
/// NOTE(review): presumably a reconnect timeout in minutes, with `None` meaning no timeout - confirm against the feed handler.
pub reconnect_timeout_mins: Option<u64>,
}
impl Debug for DatabentoDataClientConfig {
    /// Writes a struct-style debug representation in which the credential is
    /// replaced by the redaction placeholder, so the API key is never printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces this
        // impl to be revisited instead of silently omitting the new field.
        let Self {
            credential: _,
            publishers_filepath,
            use_exchange_as_venue,
            bars_timestamp_on_close,
            reconnect_timeout_mins,
        } = self;

        let mut out = f.debug_struct(stringify!(DatabentoDataClientConfig));
        out.field("credential", &REDACTED);
        out.field("publishers_filepath", publishers_filepath);
        out.field("use_exchange_as_venue", use_exchange_as_venue);
        out.field("bars_timestamp_on_close", bars_timestamp_on_close);
        out.field("reconnect_timeout_mins", reconnect_timeout_mins);
        out.finish()
    }
}
impl DatabentoDataClientConfig {
    /// Creates a new configuration from an API key and feed options.
    ///
    /// The API key is wrapped in a [`Credential`]; `reconnect_timeout_mins`
    /// is initialised to `Some(10)`.
    #[must_use]
    pub fn new(
        api_key: impl Into<String>,
        publishers_filepath: PathBuf,
        use_exchange_as_venue: bool,
        bars_timestamp_on_close: bool,
    ) -> Self {
        let credential = Credential::new(api_key);
        let reconnect_timeout_mins = Some(10);
        Self {
            credential,
            publishers_filepath,
            use_exchange_as_venue,
            bars_timestamp_on_close,
            reconnect_timeout_mins,
        }
    }

    /// Returns the API key held by the credential.
    #[must_use]
    pub fn api_key(&self) -> &str {
        let credential = &self.credential;
        credential.api_key()
    }

    /// Returns the masked form of the API key produced by the credential.
    #[must_use]
    pub fn api_key_masked(&self) -> String {
        let credential = &self.credential;
        credential.api_key_masked()
    }
}
/// Data client combining Databento live feeds (one feed handler per dataset)
/// with a historical client used for range requests.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataClient {
/// Identifier returned by `client_id()`.
client_id: ClientId,
/// Configuration supplied at construction.
config: DatabentoDataClientConfig,
/// Connection flag: set by `connect`, cleared by `stop`, `reset` and `disconnect`.
is_connected: AtomicBool,
/// Historical client used by the `request_*` methods.
historical: DatabentoHistoricalClient,
/// Loader used to resolve the dataset for a venue.
loader: DatabentoDataLoader,
/// Command senders for the live feed handlers, keyed by dataset.
cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>>,
/// Join handles of the spawned feed and message-processing tasks; awaited in `disconnect`.
task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
/// Token cloned into every spawned task; cancelled by `stop` and `disconnect`.
cancellation_token: CancellationToken,
/// Publisher ID to venue map built from the publishers file at construction.
publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
/// Symbol to venue map, updated on each subscription and shared with the feed handlers.
symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
/// Sender on which data and instrument events are emitted.
data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
impl DatabentoDataClient {
/// Creates a new [`DatabentoDataClient`].
///
/// Builds the historical client and the data loader from the configured
/// publishers file, then parses that same file into the publisher-to-venue
/// map handed to every live feed handler.
///
/// # Errors
///
/// Returns an error if the historical client or loader cannot be created,
/// or if the publishers file cannot be read or deserialized from JSON.
pub fn new(
    client_id: ClientId,
    config: DatabentoDataClientConfig,
    clock: &'static AtomicTime,
) -> anyhow::Result<Self> {
    let publishers_path = &config.publishers_filepath;

    let historical = DatabentoHistoricalClient::new(
        config.credential.clone(),
        publishers_path.clone(),
        clock,
        config.use_exchange_as_venue,
    )?;
    let loader = DatabentoDataLoader::new(Some(publishers_path.clone()))?;

    let raw_publishers = std::fs::read_to_string(publishers_path)?;
    let publishers: Vec<crate::types::DatabentoPublisher> =
        serde_json::from_str(&raw_publishers)?;
    let publisher_venue_map: IndexMap<u16, Venue> = publishers
        .into_iter()
        .map(|publisher| {
            let venue = Venue::from(publisher.venue.as_str());
            (publisher.publisher_id, venue)
        })
        .collect();

    let data_sender = get_data_event_sender();

    let client = Self {
        client_id,
        config,
        is_connected: AtomicBool::new(false),
        historical,
        loader,
        cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
        task_handles: Arc::new(Mutex::new(Vec::new())),
        cancellation_token: CancellationToken::new(),
        publisher_venue_map: Arc::new(publisher_venue_map),
        symbol_venue_map: Arc::new(AtomicMap::new()),
        data_sender,
    };
    Ok(client)
}
/// Resolves the dataset name for `venue` through the loader.
///
/// # Errors
///
/// Returns an error if the loader has no dataset for the venue.
fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
    match self.loader.get_dataset_for_venue(&venue) {
        Some(dataset) => Ok(dataset.to_string()),
        None => Err(anyhow::anyhow!("No dataset found for venue: {venue}")),
    }
}
/// Ensures a live feed handler exists for `dataset`.
///
/// If no command channel is registered for the dataset, a live feed is
/// initialized and its command sender stored; otherwise nothing happens.
/// The channel map stays locked for the whole call, so the existence check
/// and the insertion happen under a single lock acquisition.
fn get_or_create_feed_handler(&self, dataset: &str) {
    let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
    if channels.contains_key(dataset) {
        return;
    }

    log::info!("Creating new feed handler for dataset: {dataset}");
    let sender = self.initialize_live_feed(dataset.to_string());
    channels.insert(dataset.to_string(), sender);
    log::debug!("Feed handler created for dataset: {dataset}, channel stored");
}
fn send_command_to_dataset(&self, dataset: &str, cmd: HandlerCommand) -> anyhow::Result<()> {
let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
if let Some(tx) = channels.get(dataset) {
tx.send(cmd)
.map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
} else {
anyhow::bail!("No feed handler found for dataset: {dataset}");
}
Ok(())
}
/// Spawns the background tasks that service a live feed for `dataset` and
/// returns the sender used to issue commands to its feed handler.
///
/// Two tasks are spawned on the shared runtime: one drives
/// `DatabentoFeedHandler::run`, the other drains the handler's message channel
/// and forwards data and instrument messages to `data_sender`. Both tasks also
/// stop when the client's cancellation token is cancelled, and both join
/// handles are recorded in `task_handles`.
fn initialize_live_feed(
&self,
dataset: String,
) -> tokio::sync::mpsc::UnboundedSender<HandlerCommand> {
// Commands to the handler use an unbounded channel; messages coming back
// from the handler use a bounded channel with capacity 1000.
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
// The handler receives its own copy of the publisher map and a shared
// handle to the symbol-to-venue map.
let mut feed_handler = DatabentoFeedHandler::new(
self.config.credential.clone(),
dataset,
cmd_rx,
msg_tx,
(*self.publisher_venue_map).clone(),
self.symbol_venue_map.clone(),
self.config.use_exchange_as_venue,
self.config.bars_timestamp_on_close,
self.config.reconnect_timeout_mins,
);
let cancellation_token = self.cancellation_token.clone();
// Task 1: run the feed handler until it returns or the token is cancelled.
let feed_handle = get_runtime().spawn(async move {
tokio::select! {
result = feed_handler.run() => {
if let Err(e) = result {
log::error!("Feed handler error: {e}");
}
}
() = cancellation_token.cancelled() => {
log::debug!("Feed handler cancelled");
}
}
});
let cancellation_token = self.cancellation_token.clone();
let data_sender = self.data_sender.clone();
// Task 2: process handler messages until the handler closes, the channel
// closes, or the token is cancelled.
let msg_handle = get_runtime().spawn(async move {
let mut msg_rx = msg_rx;
loop {
tokio::select! {
msg = msg_rx.recv() => {
match msg {
// Data and instrument messages are forwarded as data events;
// a failed send is logged and processing continues.
Some(DatabentoMessage::Data(data)) => {
log::debug!("Received data: {data:?}");
if let Err(e) = data_sender.send(DataEvent::Data(data)) {
log::error!("Failed to send data event: {e}");
}
}
Some(DatabentoMessage::Instrument(instrument)) => {
log::info!("Received instrument definition: {}", instrument.id());
if let Err(e) = data_sender.send(DataEvent::Instrument(*instrument)) {
log::error!("Failed to send instrument: {e}");
}
}
// Status, imbalance, statistics and subscription acks are only
// logged here; they are not forwarded to `data_sender`.
Some(DatabentoMessage::Status(status)) => {
log::debug!("Received status: {status:?}");
}
Some(DatabentoMessage::Imbalance(imbalance)) => {
log::debug!("Received imbalance: {imbalance:?}");
}
Some(DatabentoMessage::Statistics(statistics)) => {
log::debug!("Received statistics: {statistics:?}");
}
Some(DatabentoMessage::SubscriptionAck(ack)) => {
log::debug!("Received subscription ack: {}", ack.message);
}
// Handler-reported errors are logged without ending the loop.
Some(DatabentoMessage::Error(error)) => {
log::error!("Feed handler error: {error}");
}
Some(DatabentoMessage::Close) => {
log::info!("Feed handler closed");
break;
}
// `None` means every sender for the message channel was dropped.
None => {
log::debug!("Message channel closed");
break;
}
}
}
() = cancellation_token.cancelled() => {
log::debug!("Message processing cancelled");
break;
}
}
}
});
// Record both handles so `disconnect` can await them; the lock is scoped
// to this block.
{
let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
handles.push(feed_handle);
handles.push(msg_handle);
}
cmd_tx
}
}
#[async_trait::async_trait(?Send)]
impl DataClient for DatabentoDataClient {
/// Returns the identifier this client was constructed with.
fn client_id(&self) -> ClientId {
self.client_id
}
/// Always returns `None`.
/// NOTE(review): presumably because this client serves many venues across datasets - confirm.
fn venue(&self) -> Option<Venue> {
None
}
/// Logs the start request; no other work is done here.
fn start(&mut self) -> anyhow::Result<()> {
log::debug!("Starting");
Ok(())
}
/// Signals every live feed to shut down and marks the client disconnected.
///
/// Cancels the shared cancellation token and sends `HandlerCommand::Close`
/// on every registered command channel, logging (not returning) any send
/// failure. This method does not await the spawned tasks or clear the
/// channel map; `disconnect` performs that cleanup.
fn stop(&mut self) -> anyhow::Result<()> {
    log::debug!("Stopping");
    self.cancellation_token.cancel();

    let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
    channels.iter().for_each(|(dataset, sender)| {
        if let Err(e) = sender.send(HandlerCommand::Close) {
            log::error!("Failed to send close command to dataset {dataset}: {e}");
        }
    });

    self.is_connected.store(false, Ordering::Relaxed);
    Ok(())
}
/// Clears the connection flag; channels, tasks and the cancellation token are left untouched.
fn reset(&mut self) -> anyhow::Result<()> {
log::debug!("Resetting");
self.is_connected.store(false, Ordering::Relaxed);
Ok(())
}
/// Disposes of the client by delegating to `stop`.
fn dispose(&mut self) -> anyhow::Result<()> {
log::debug!("Disposing");
self.stop()
}
/// Marks the client as connected.
///
/// No feed is opened here: live feed handlers are created on demand by the
/// `subscribe_*` methods.
async fn connect(&mut self) -> anyhow::Result<()> {
log::debug!("Connecting...");
self.is_connected.store(true, Ordering::Relaxed);
log::info!("Connected");
Ok(())
}
async fn disconnect(&mut self) -> anyhow::Result<()> {
log::debug!("Disconnecting...");
self.cancellation_token.cancel();
{
let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
for (dataset, tx) in channels.iter() {
if let Err(e) = tx.send(HandlerCommand::Close) {
log::error!("Failed to send close command to dataset {dataset}: {e}");
}
}
}
let handles = {
let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
std::mem::take(&mut *task_handles)
};
for handle in handles {
if let Err(e) = handle.await
&& !e.is_cancelled()
{
log::error!("Task join error: {e}");
}
}
self.is_connected.store(false, Ordering::Relaxed);
{
let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
channels.clear();
}
log::info!("Disconnected");
Ok(())
}
/// Returns the current value of the connection flag.
fn is_connected(&self) -> bool {
self.is_connected.load(Ordering::Relaxed)
}
/// Returns the negation of `is_connected()`.
fn is_disconnected(&self) -> bool {
!self.is_connected()
}
/// Subscribes to instrument definitions (`Definition` schema) for the instrument.
///
/// Resolves the dataset from the instrument's venue, creates and starts a
/// feed handler for that dataset if none exists yet, records the
/// symbol-to-venue mapping, and sends the subscription to the handler.
///
/// # Errors
///
/// Returns an error if no dataset is known for the venue or if a command
/// cannot be sent to the feed handler.
fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
    log::debug!("Subscribe instrument: {cmd:?}");
    let venue = cmd.instrument_id.venue;
    let symbol = cmd.instrument_id.symbol;
    let dataset = self.get_dataset_for_venue(venue)?;

    // The guard is a temporary dropped at the end of this statement, before
    // `get_or_create_feed_handler` takes the same lock.
    let handler_missing = !self
        .cmd_channels
        .lock()
        .expect(MUTEX_POISONED)
        .contains_key(&dataset);
    self.get_or_create_feed_handler(&dataset);
    if handler_missing {
        self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
    }

    self.symbol_venue_map.insert(symbol, venue);

    let subscription = Subscription::builder()
        .schema(databento::dbn::Schema::Definition)
        .symbols(symbol.to_string())
        .build();
    self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))
}
/// Subscribes to quotes for the instrument using the `Mbp1` schema.
///
/// Resolves the dataset from the instrument's venue, creates and starts a
/// feed handler for that dataset if none exists yet, records the
/// symbol-to-venue mapping, and sends the subscription to the handler.
///
/// # Errors
///
/// Returns an error if no dataset is known for the venue or if a command
/// cannot be sent to the feed handler.
fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
    log::debug!("Subscribe quotes: {cmd:?}");
    let venue = cmd.instrument_id.venue;
    let symbol = cmd.instrument_id.symbol;
    let dataset = self.get_dataset_for_venue(venue)?;

    // The guard is a temporary dropped at the end of this statement, before
    // `get_or_create_feed_handler` takes the same lock.
    let handler_missing = !self
        .cmd_channels
        .lock()
        .expect(MUTEX_POISONED)
        .contains_key(&dataset);
    self.get_or_create_feed_handler(&dataset);
    if handler_missing {
        self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
    }

    self.symbol_venue_map.insert(symbol, venue);

    let subscription = Subscription::builder()
        .schema(databento::dbn::Schema::Mbp1)
        .symbols(symbol.to_string())
        .build();
    self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))
}
/// Subscribes to trades for the instrument using the `Trades` schema.
///
/// Resolves the dataset from the instrument's venue, creates and starts a
/// feed handler for that dataset if none exists yet, records the
/// symbol-to-venue mapping, and sends the subscription to the handler.
///
/// # Errors
///
/// Returns an error if no dataset is known for the venue or if a command
/// cannot be sent to the feed handler.
fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
    log::debug!("Subscribe trades: {cmd:?}");
    let venue = cmd.instrument_id.venue;
    let symbol = cmd.instrument_id.symbol;
    let dataset = self.get_dataset_for_venue(venue)?;

    // The guard is a temporary dropped at the end of this statement, before
    // `get_or_create_feed_handler` takes the same lock.
    let handler_missing = !self
        .cmd_channels
        .lock()
        .expect(MUTEX_POISONED)
        .contains_key(&dataset);
    self.get_or_create_feed_handler(&dataset);
    if handler_missing {
        self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
    }

    self.symbol_venue_map.insert(symbol, venue);

    let subscription = Subscription::builder()
        .schema(databento::dbn::Schema::Trades)
        .symbols(symbol.to_string())
        .build();
    self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))
}
/// Subscribes to order book deltas for the instrument using the `Mbo` schema.
///
/// Resolves the dataset from the instrument's venue, creates and starts a
/// feed handler for that dataset if none exists yet, records the
/// symbol-to-venue mapping, and sends the subscription to the handler.
///
/// # Errors
///
/// Returns an error if no dataset is known for the venue or if a command
/// cannot be sent to the feed handler.
fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
    log::debug!("Subscribe book deltas: {cmd:?}");
    let venue = cmd.instrument_id.venue;
    let symbol = cmd.instrument_id.symbol;
    let dataset = self.get_dataset_for_venue(venue)?;

    // The guard is a temporary dropped at the end of this statement, before
    // `get_or_create_feed_handler` takes the same lock.
    let handler_missing = !self
        .cmd_channels
        .lock()
        .expect(MUTEX_POISONED)
        .contains_key(&dataset);
    self.get_or_create_feed_handler(&dataset);
    if handler_missing {
        self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
    }

    self.symbol_venue_map.insert(symbol, venue);

    let subscription = Subscription::builder()
        .schema(databento::dbn::Schema::Mbo)
        .symbols(symbol.to_string())
        .build();
    self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))
}
/// Subscribes to instrument status updates using the `Status` schema.
///
/// Resolves the dataset from the instrument's venue, creates and starts a
/// feed handler for that dataset if none exists yet, records the
/// symbol-to-venue mapping, and sends the subscription to the handler.
///
/// # Errors
///
/// Returns an error if no dataset is known for the venue or if a command
/// cannot be sent to the feed handler.
fn subscribe_instrument_status(
    &mut self,
    cmd: SubscribeInstrumentStatus,
) -> anyhow::Result<()> {
    log::debug!("Subscribe instrument status: {cmd:?}");
    let venue = cmd.instrument_id.venue;
    let symbol = cmd.instrument_id.symbol;
    let dataset = self.get_dataset_for_venue(venue)?;

    // The guard is a temporary dropped at the end of this statement, before
    // `get_or_create_feed_handler` takes the same lock.
    let handler_missing = !self
        .cmd_channels
        .lock()
        .expect(MUTEX_POISONED)
        .contains_key(&dataset);
    self.get_or_create_feed_handler(&dataset);
    if handler_missing {
        self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
    }

    self.symbol_venue_map.insert(symbol, venue);

    let subscription = Subscription::builder()
        .schema(databento::dbn::Schema::Status)
        .symbols(symbol.to_string())
        .build();
    self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))
}
/// Logs the request and a warning that it is ignored; no command is sent
/// to any feed handler. Always returns `Ok(())`.
fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
    log::debug!("Unsubscribe quotes: {cmd:?}");
    let instrument_id = &cmd.instrument_id;
    log::warn!(
        "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
        instrument_id
    );
    Ok(())
}
/// Logs the request and a warning that it is ignored; no command is sent
/// to any feed handler. Always returns `Ok(())`.
fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
    log::debug!("Unsubscribe trades: {cmd:?}");
    let instrument_id = &cmd.instrument_id;
    log::warn!(
        "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
        instrument_id
    );
    Ok(())
}
/// Logs the request and a warning that it is ignored; no command is sent
/// to any feed handler. Always returns `Ok(())`.
fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
    log::debug!("Unsubscribe book deltas: {cmd:?}");
    let instrument_id = &cmd.instrument_id;
    log::warn!(
        "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
        instrument_id
    );
    Ok(())
}
/// Logs the request and a warning that it is ignored; no command is sent
/// to any feed handler. Always returns `Ok(())`.
fn unsubscribe_instrument_status(
    &mut self,
    cmd: &UnsubscribeInstrumentStatus,
) -> anyhow::Result<()> {
    log::debug!("Unsubscribe instrument status: {cmd:?}");
    let instrument_id = &cmd.instrument_id;
    log::warn!(
        "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
        instrument_id
    );
    Ok(())
}
/// Requests instrument definitions from the historical client on a spawned
/// task and emits each one as a `DataEvent::Instrument`.
///
/// The query targets the `GLBX.MDP3` dataset with the `ALL_SYMBOLS` symbol.
/// A missing start becomes `0`; a missing end stays `None`. This method
/// returns `Ok(())` as soon as the task is spawned, so request and send
/// failures are only logged.
fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
    log::debug!("Request instruments: {request:?}");
    let data_sender = self.data_sender.clone();
    let historical_client = self.historical.clone();

    get_runtime().spawn(async move {
        let start_ns = request
            .start
            .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
        let end_ns = request
            .end
            .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);

        let params = RangeQueryParams {
            dataset: "GLBX.MDP3".to_string(),
            symbols: vec!["ALL_SYMBOLS".to_string()],
            start: start_ns.into(),
            end: end_ns.map(Into::into),
            limit: None,
            price_precision: None,
        };

        let instruments = match historical_client.get_range_instruments(params).await {
            Ok(instruments) => instruments,
            Err(e) => {
                log::error!("Failed to request instruments: {e}");
                return;
            }
        };

        log::info!("Retrieved {} instruments", instruments.len());
        for instrument in instruments {
            if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
                log::error!("Failed to send instrument: {e}");
            }
        }
    });

    Ok(())
}
/// Requests historical quotes for the instrument on a spawned task.
///
/// The dataset is resolved from the instrument's venue, as the `subscribe_*`
/// methods do. If no dataset is known for the venue, a warning is logged and
/// the previously hard-coded `GLBX.MDP3` dataset is used.
///
/// A missing start becomes `0`; a missing end stays `None`. This method
/// returns `Ok(())` as soon as the task is spawned, so a failed request is
/// only logged. The retrieved quotes are currently only counted in the log
/// and are not forwarded anywhere.
fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
    log::debug!("Request quotes: {request:?}");

    // Resolve the dataset up front because the spawned task cannot borrow `self`.
    let dataset = self
        .get_dataset_for_venue(request.instrument_id.venue)
        .unwrap_or_else(|e| {
            log::warn!("{e}; falling back to dataset GLBX.MDP3");
            "GLBX.MDP3".to_string()
        });
    let historical_client = self.historical.clone();

    get_runtime().spawn(async move {
        let symbols = vec![instrument_id_to_symbol_string(
            request.instrument_id,
            &mut AHashMap::new(),
        )];
        let params = RangeQueryParams {
            dataset,
            symbols,
            start: request
                .start
                .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                .into(),
            end: request
                .end
                .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                .map(Into::into),
            limit: request.limit.map(|l| l.get() as u64),
            price_precision: None,
        };

        match historical_client.get_range_quotes(params, None).await {
            Ok(quotes) => {
                log::info!("Retrieved {} quotes", quotes.len());
            }
            Err(e) => {
                log::error!("Failed to request quotes: {e}");
            }
        }
    });

    Ok(())
}
/// Requests historical trades for the instrument on a spawned task.
///
/// The dataset is resolved from the instrument's venue, as the `subscribe_*`
/// methods do. If no dataset is known for the venue, a warning is logged and
/// the previously hard-coded `GLBX.MDP3` dataset is used.
///
/// A missing start becomes `0`; a missing end stays `None`. This method
/// returns `Ok(())` as soon as the task is spawned, so a failed request is
/// only logged. The retrieved trades are currently only counted in the log
/// and are not forwarded anywhere.
fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
    log::debug!("Request trades: {request:?}");

    // Resolve the dataset up front because the spawned task cannot borrow `self`.
    let dataset = self
        .get_dataset_for_venue(request.instrument_id.venue)
        .unwrap_or_else(|e| {
            log::warn!("{e}; falling back to dataset GLBX.MDP3");
            "GLBX.MDP3".to_string()
        });
    let historical_client = self.historical.clone();

    get_runtime().spawn(async move {
        let symbols = vec![instrument_id_to_symbol_string(
            request.instrument_id,
            &mut AHashMap::new(),
        )];
        let params = RangeQueryParams {
            dataset,
            symbols,
            start: request
                .start
                .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                .into(),
            end: request
                .end
                .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                .map(Into::into),
            limit: request.limit.map(|l| l.get() as u64),
            price_precision: None,
        };

        match historical_client.get_range_trades(params).await {
            Ok(trades) => {
                log::info!("Retrieved {} trades", trades.len());
            }
            Err(e) => {
                log::error!("Failed to request trades: {e}");
            }
        }
    });

    Ok(())
}
/// Requests historical bars for the bar type's instrument on a spawned task.
///
/// The dataset is resolved from the instrument's venue, as the `subscribe_*`
/// methods do. If no dataset is known for the venue, a warning is logged and
/// the previously hard-coded `GLBX.MDP3` dataset is used.
///
/// Only `Second`, `Minute`, `Hour` and `Day` aggregations are accepted; any
/// other aggregation is logged as an error and the task ends without
/// querying. A missing start becomes `0`; a missing end stays `None`. This
/// method returns `Ok(())` as soon as the task is spawned, so a failed
/// request is only logged. The retrieved bars are currently only counted in
/// the log and are not forwarded anywhere.
fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
    log::debug!("Request bars: {request:?}");

    // Resolve the dataset up front because the spawned task cannot borrow `self`.
    let instrument_id = request.bar_type.instrument_id();
    let dataset = self
        .get_dataset_for_venue(instrument_id.venue)
        .unwrap_or_else(|e| {
            log::warn!("{e}; falling back to dataset GLBX.MDP3");
            "GLBX.MDP3".to_string()
        });
    let historical_client = self.historical.clone();

    get_runtime().spawn(async move {
        let symbols = vec![instrument_id_to_symbol_string(
            instrument_id,
            &mut AHashMap::new(),
        )];
        let params = RangeQueryParams {
            dataset,
            symbols,
            start: request
                .start
                .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                .into(),
            end: request
                .end
                .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                .map(Into::into),
            limit: request.limit.map(|l| l.get() as u64),
            price_precision: None,
        };

        // Pass supported aggregations through unchanged; reject the rest.
        let aggregation = match request.bar_type.spec().aggregation {
            BarAggregation::Second => BarAggregation::Second,
            BarAggregation::Minute => BarAggregation::Minute,
            BarAggregation::Hour => BarAggregation::Hour,
            BarAggregation::Day => BarAggregation::Day,
            _ => {
                log::error!(
                    "Unsupported bar aggregation: {:?}",
                    request.bar_type.spec().aggregation
                );
                return;
            }
        };

        // NOTE(review): the literal `true` looks like a timestamp-on-close flag;
        // confirm whether `config.bars_timestamp_on_close` should be used instead.
        match historical_client
            .get_range_bars(params, aggregation, true)
            .await
        {
            Ok(bars) => {
                log::info!("Retrieved {} bars", bars.len());
            }
            Err(e) => {
                log::error!("Failed to request bars: {e}");
            }
        }
    });

    Ok(())
}
}