use std::{path::PathBuf, time::Duration};
use nautilus_common::{
actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
enums::{Environment, LogColor},
log_info, nautilus_actor,
timer::TimeEvent,
};
use nautilus_core::env::get_env_var;
use nautilus_databento::factories::{DatabentoDataClientFactory, DatabentoLiveClientConfig};
use nautilus_live::node::LiveNode;
use nautilus_model::{
data::{QuoteTick, TradeTick},
identifiers::{ClientId, InstrumentId, TraderId},
stubs::TestDefault,
};
/// Entry point: builds a live node with a Databento data client, attaches a
/// [`DatabentoSubscriberActor`] and runs the node until it returns.
///
/// # Errors
///
/// Propagates any error raised while building the node, registering the data
/// client, adding the actor, or running the node.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Pick up variables from a `.env` file when one exists; the result is
    // deliberately discarded so a missing file is not an error.
    dotenvy::dotenv().ok();

    // Fall back to a placeholder key (with a warning) so the example can
    // still start without credentials.
    let api_key = match get_env_var("DATABENTO_API_KEY") {
        Ok(key) => key,
        Err(_) => {
            println!("WARNING: DATABENTO_API_KEY not found, using placeholder");
            String::from("db-placeholder-key")
        }
    };

    // The publishers file is looked up next to the crate manifest. A missing
    // file only produces a warning here; the path is still handed to the config.
    let publishers_filepath = {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("publishers.json");
        path
    };
    if !publishers_filepath.exists() {
        println!(
            "WARNING: Publishers file not found at: {}",
            publishers_filepath.display()
        );
    }

    // NOTE(review): the meaning of the two trailing boolean flags is not
    // visible in this file — confirm against `DatabentoLiveClientConfig::new`.
    let live_config = DatabentoLiveClientConfig::new(api_key, publishers_filepath, true, true);

    let subscriber_client = ClientId::new("DATABENTO");
    let subscribed_instruments = vec![InstrumentId::from("ESM6.XCME")];

    let mut node = LiveNode::builder(TraderId::test_default(), Environment::Live)?
        .with_name(String::from("DATABENTO-TESTER-001"))
        .with_load_state(false)
        .with_save_state(false)
        .with_delay_post_stop_secs(2)
        .add_data_client(
            None,
            Box::new(DatabentoDataClientFactory::new()),
            Box::new(live_config),
        )?
        .build()?;

    let subscriber = DatabentoSubscriberActor::new(DatabentoSubscriberActorConfig::new(
        subscriber_client,
        subscribed_instruments,
    ));
    node.add_actor(subscriber)?;

    node.run().await?;
    Ok(())
}
/// Configuration for [`DatabentoSubscriberActor`].
#[derive(Debug, Clone)]
pub struct DatabentoSubscriberActorConfig {
/// Base data actor configuration; a clone of it is used to build the actor's `DataActorCore`.
pub base: DataActorConfig,
/// Client ID passed along with every quote and trade subscription request.
pub client_id: ClientId,
/// Instruments the actor subscribes to (quotes and trades) when it starts.
pub instrument_ids: Vec<InstrumentId>,
}
impl DatabentoSubscriberActorConfig {
    /// Creates a configuration for the given client and instruments, using
    /// `DataActorConfig::default()` for the base actor settings.
    #[must_use]
    pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
        let base = DataActorConfig::default();
        Self {
            base,
            client_id,
            instrument_ids,
        }
    }
}
/// Actor that subscribes to quotes and trades for the configured instruments
/// and records every tick it receives.
#[derive(Debug)]
pub struct DatabentoSubscriberActor {
// Core actor state, built from `config.base` in `new`.
core: DataActorCore,
// Client ID and instrument list used when subscribing in `on_start`.
config: DatabentoSubscriberActorConfig,
/// Every quote delivered to `on_quote`, in arrival order. Nothing in this file
/// ever truncates it, so it grows for as long as the actor keeps receiving data.
pub received_quotes: Vec<QuoteTick>,
/// Every trade delivered to `on_trade`, in arrival order. Nothing in this file
/// ever truncates it, so it grows for as long as the actor keeps receiving data.
pub received_trades: Vec<TradeTick>,
}
// NOTE(review): the macro's expansion is not visible here; presumably it generates
// the boilerplate tying this type to its `core` field — confirm against `nautilus_actor!`.
nautilus_actor!(DatabentoSubscriberActor);
impl DataActor for DatabentoSubscriberActor {
    /// Subscribes to quotes and trades for every configured instrument, then
    /// registers two test timers with 1-second and 2-second intervals.
    ///
    /// # Errors
    ///
    /// Returns the error from `set_timer` if registering either timer fails.
    fn on_start(&mut self) -> anyhow::Result<()> {
        let client = Some(self.config.client_id);

        // Iterate over a clone: the subscribe calls borrow `self` mutably, so the
        // configured list cannot stay borrowed for the duration of the loop.
        for instrument_id in self.config.instrument_ids.clone() {
            self.subscribe_quotes(instrument_id, client, None);
            self.subscribe_trades(instrument_id, client, None);
        }

        // Both timers are registered with identical options and differ only in
        // name and interval.
        // NOTE(review): the meaning of the three `None`s and the two trailing
        // flags is not visible in this file — confirm against `set_timer`.
        for (timer_name, interval_secs) in
            [("TEST-TIMER-1-SECOND", 1), ("TEST-TIMER-2-SECOND", 2)]
        {
            self.clock().set_timer(
                timer_name,
                Duration::from_secs(interval_secs),
                None,
                None,
                None,
                Some(true),
                Some(false),
            )?;
        }

        Ok(())
    }

    /// Stop hook; performs no work and always succeeds.
    fn on_stop(&mut self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Logs each time event in blue.
    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
        log_info!("{event:?}", color = LogColor::Blue);
        Ok(())
    }

    /// Logs the quote in cyan and appends a copy to `received_quotes`.
    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
        log_info!("{quote:?}", color = LogColor::Cyan);
        let tick = *quote;
        self.received_quotes.push(tick);
        Ok(())
    }

    /// Logs the trade in cyan and appends a copy to `received_trades`.
    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
        log_info!("{trade:?}", color = LogColor::Cyan);
        let tick = *trade;
        self.received_trades.push(tick);
        Ok(())
    }
}
impl DatabentoSubscriberActor {
    /// Creates an actor from `config` with empty quote and trade buffers.
    ///
    /// The actor core is built from a clone of `config.base`; the configuration
    /// itself is then stored on the actor.
    #[must_use]
    pub fn new(config: DatabentoSubscriberActorConfig) -> Self {
        // Build the core first: `config` is moved into the struct below.
        let core = DataActorCore::new(config.base.clone());
        Self {
            core,
            config,
            received_quotes: Vec::new(),
            received_trades: Vec::new(),
        }
    }

    /// Returns the number of quotes recorded so far.
    #[must_use]
    pub const fn quote_count(&self) -> usize {
        let quotes = &self.received_quotes;
        quotes.len()
    }

    /// Returns the number of trades recorded so far.
    #[must_use]
    pub const fn trade_count(&self) -> usize {
        let trades = &self.received_trades;
        trades.len()
    }
}