use std::{any::Any, cell::RefCell, fmt::Debug, path::PathBuf, rc::Rc};
use nautilus_common::{
cache::CacheView,
clients::DataClient,
clock::Clock,
factories::{ClientConfig, DataClientFactory},
};
use nautilus_core::{
string::secret::REDACTED,
time::{AtomicTime, get_atomic_clock_realtime},
};
use nautilus_model::identifiers::ClientId;
use crate::{
common::{Credential, DATABENTO},
data::{DatabentoDataClient, DatabentoDataClientConfig},
historical::DatabentoHistoricalClient,
};
/// Configuration consumed by `DatabentoDataClientFactory` when it builds a live data client.
///
/// The API key is held in a private [`Credential`]; read it through
/// [`DatabentoLiveClientConfig::api_key`] or [`DatabentoLiveClientConfig::api_key_masked`].
/// The hand-written `Debug` implementation for this type prints a redaction marker in place
/// of the credential.
#[derive(Clone)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(
module = "nautilus_trader.core.nautilus_pyo3.databento",
from_py_object
)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
)]
pub struct DatabentoLiveClientConfig {
/// Wrapped API key; private so it can only be read through the accessor methods.
credential: Credential,
/// Path of the publishers file, forwarded to the data client configuration
/// (the tests in this file use a file named `publishers.json`).
pub publishers_filepath: PathBuf,
/// Forwarded unchanged to the data client configuration.
/// NOTE(review): presumably selects the exchange as the instrument venue — confirm.
pub use_exchange_as_venue: bool,
/// Forwarded unchanged to the data client configuration.
/// NOTE(review): presumably stamps bars with their close time rather than open — confirm.
pub bars_timestamp_on_close: bool,
}
impl Debug for DatabentoLiveClientConfig {
    /// Formats the config in struct style, printing the redaction marker instead of the
    /// credential so the API key never reaches debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces a decision here
        // about whether (and how) it is shown.
        let Self {
            credential: _,
            publishers_filepath,
            use_exchange_as_venue,
            bars_timestamp_on_close,
        } = self;

        let mut out = f.debug_struct(stringify!(DatabentoLiveClientConfig));
        out.field("credential", &REDACTED);
        out.field("publishers_filepath", publishers_filepath);
        out.field("use_exchange_as_venue", use_exchange_as_venue);
        out.field("bars_timestamp_on_close", bars_timestamp_on_close);
        out.finish()
    }
}
impl DatabentoLiveClientConfig {
    /// Creates a new live client configuration.
    ///
    /// `api_key` is wrapped in a [`Credential`]; the remaining arguments are stored as given.
    #[must_use]
    pub fn new(
        api_key: impl Into<String>,
        publishers_filepath: PathBuf,
        use_exchange_as_venue: bool,
        bars_timestamp_on_close: bool,
    ) -> Self {
        let credential = Credential::new(api_key);

        Self {
            credential,
            publishers_filepath,
            use_exchange_as_venue,
            bars_timestamp_on_close,
        }
    }

    /// Returns the API key as exposed by the underlying [`Credential`].
    #[must_use]
    pub fn api_key(&self) -> &str {
        let Self { credential, .. } = self;
        credential.api_key()
    }

    /// Returns the masked form of the API key produced by the underlying [`Credential`].
    #[must_use]
    pub fn api_key_masked(&self) -> String {
        let Self { credential, .. } = self;
        credential.api_key_masked()
    }
}
impl ClientConfig for DatabentoLiveClientConfig {
    /// Exposes the config as [`Any`] so a factory holding a `&dyn ClientConfig` can downcast
    /// it back to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}
/// Stateless factory for Databento live data clients.
///
/// The type has no fields; it exists to carry the associated constructor functions and the
/// `DataClientFactory` trait implementation defined in this file.
#[derive(Debug, Clone)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(
module = "nautilus_trader.core.nautilus_pyo3.databento",
from_py_object
)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
)]
pub struct DatabentoDataClientFactory;
impl DatabentoDataClientFactory {
    /// Creates a new factory instance.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }

    /// Creates a live data client from individual settings.
    ///
    /// The settings are bundled into a [`DatabentoDataClientConfig`] and handed to
    /// [`Self::create_live_data_client_with_config`].
    ///
    /// # Errors
    ///
    /// Returns an error if [`DatabentoDataClient::new`] fails.
    pub fn create_live_data_client(
        client_id: ClientId,
        api_key: impl Into<String>,
        publishers_filepath: PathBuf,
        use_exchange_as_venue: bool,
        bars_timestamp_on_close: bool,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<DatabentoDataClient> {
        Self::create_live_data_client_with_config(
            client_id,
            DatabentoDataClientConfig::new(
                api_key,
                publishers_filepath,
                use_exchange_as_venue,
                bars_timestamp_on_close,
            ),
            clock,
        )
    }

    /// Creates a live data client from a ready-made configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if [`DatabentoDataClient::new`] fails.
    pub fn create_live_data_client_with_config(
        client_id: ClientId,
        config: DatabentoDataClientConfig,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<DatabentoDataClient> {
        DatabentoDataClient::new(client_id, config, clock)
    }
}
impl Default for DatabentoDataClientFactory {
fn default() -> Self {
Self::new()
}
}
impl DataClientFactory for DatabentoDataClientFactory {
    /// Builds a boxed Databento data client named `name` from a type-erased `config`.
    ///
    /// `_cache` and `_clock` are accepted to satisfy the trait signature but are not used;
    /// the client is given the clock returned by `get_atomic_clock_realtime()`.
    ///
    /// # Errors
    ///
    /// Returns an error if `config` is not a [`DatabentoLiveClientConfig`], or if
    /// [`DatabentoDataClient::new`] fails.
    fn create(
        &self,
        name: &str,
        config: &dyn ClientConfig,
        _cache: CacheView,
        _clock: Rc<RefCell<dyn Clock>>,
    ) -> anyhow::Result<Box<dyn DataClient>> {
        // The trait hands us an erased config; recover the concrete type or bail out.
        let Some(live_config) = config
            .as_any()
            .downcast_ref::<DatabentoLiveClientConfig>()
        else {
            anyhow::bail!(
                "Invalid config type for DatabentoDataClientFactory. Expected DatabentoLiveClientConfig, was {config:?}"
            );
        };

        let client_id = ClientId::from(name);

        // Translate the factory-facing config into the client's own config type.
        let client_config = DatabentoDataClientConfig::new(
            live_config.api_key(),
            live_config.publishers_filepath.clone(),
            live_config.use_exchange_as_venue,
            live_config.bars_timestamp_on_close,
        );

        let client =
            DatabentoDataClient::new(client_id, client_config, get_atomic_clock_realtime())?;

        Ok(Box::new(client))
    }

    /// Returns the factory name, the `DATABENTO` constant.
    fn name(&self) -> &'static str {
        DATABENTO
    }

    /// Returns the name of the config type this factory expects in `create`.
    fn config_type(&self) -> &'static str {
        "DatabentoLiveClientConfig"
    }
}
/// Stateless factory for Databento historical clients.
///
/// The type has no fields; it only carries the associated `create` function.
#[derive(Debug)]
pub struct DatabentoHistoricalClientFactory;
impl DatabentoHistoricalClientFactory {
    /// Creates a historical client.
    ///
    /// `api_key` is wrapped in a [`Credential`] before being passed on. Note that
    /// [`DatabentoHistoricalClient::new`] takes `clock` before `use_exchange_as_venue`,
    /// the reverse of this function's parameter order.
    ///
    /// # Errors
    ///
    /// Returns an error if [`DatabentoHistoricalClient::new`] fails.
    pub fn create(
        api_key: String,
        publishers_filepath: PathBuf,
        use_exchange_as_venue: bool,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<DatabentoHistoricalClient> {
        let credential = Credential::new(api_key);

        DatabentoHistoricalClient::new(
            credential,
            publishers_filepath,
            clock,
            use_exchange_as_venue,
        )
    }
}
/// Builder that collects the settings for a `DatabentoDataClientConfig`.
///
/// All fields start at their `Default` values (`None` / `false`). `Debug` is implemented by
/// hand rather than derived so that the API key is never written to debug output.
#[derive(Default)]
pub struct DatabentoDataClientConfigBuilder {
    /// API key; required by `build`.
    api_key: Option<String>,
    /// Dataset identifier recorded by the `dataset` setter. The `build` method in this file
    /// does not forward it to the resulting config.
    dataset: Option<String>,
    /// Path of the publishers file; required by `build`.
    publishers_filepath: Option<PathBuf>,
    /// Passed through to the resulting config; defaults to `false`.
    use_exchange_as_venue: bool,
    /// Passed through to the resulting config; defaults to `false`.
    bars_timestamp_on_close: bool,
}

impl Debug for DatabentoDataClientConfigBuilder {
    /// Formats the builder like the derived implementation would, except that a present API
    /// key is shown as `Some("<redacted>")` instead of its value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction (useful when debugging a failed `build`) while
        // hiding the secret itself.
        let api_key = self.api_key.as_ref().map(|_| "<redacted>");

        f.debug_struct("DatabentoDataClientConfigBuilder")
            .field("api_key", &api_key)
            .field("dataset", &self.dataset)
            .field("publishers_filepath", &self.publishers_filepath)
            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
            .finish()
    }
}
impl DatabentoDataClientConfigBuilder {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn api_key(mut self, api_key: String) -> Self {
self.api_key = Some(api_key);
self
}
#[must_use]
pub fn dataset(mut self, dataset: String) -> Self {
self.dataset = Some(dataset);
self
}
#[must_use]
pub fn publishers_filepath(mut self, filepath: PathBuf) -> Self {
self.publishers_filepath = Some(filepath);
self
}
#[must_use]
pub const fn use_exchange_as_venue(mut self, use_exchange: bool) -> Self {
self.use_exchange_as_venue = use_exchange;
self
}
#[must_use]
pub const fn bars_timestamp_on_close(mut self, timestamp_on_close: bool) -> Self {
self.bars_timestamp_on_close = timestamp_on_close;
self
}
pub fn build(self) -> anyhow::Result<DatabentoDataClientConfig> {
let api_key = self
.api_key
.ok_or_else(|| anyhow::anyhow!("API key is required"))?;
let publishers_filepath = self
.publishers_filepath
.ok_or_else(|| anyhow::anyhow!("Publishers filepath is required"))?;
Ok(DatabentoDataClientConfig::new(
api_key,
publishers_filepath,
self.use_exchange_as_venue,
self.bars_timestamp_on_close,
))
}
}
#[cfg(test)]
mod tests {
    use nautilus_core::time::get_atomic_clock_realtime;
    use rstest::rstest;

    use super::*;

    /// A fully populated builder yields a config carrying the supplied values.
    #[rstest]
    fn test_config_builder() {
        let builder = DatabentoDataClientConfigBuilder::new()
            .api_key("test_key".to_string())
            .dataset("GLBX.MDP3".to_string())
            .publishers_filepath(PathBuf::from("test_publishers.json"))
            .use_exchange_as_venue(true)
            .bars_timestamp_on_close(false);

        let built = builder.build();
        assert!(built.is_ok());

        let config = built.unwrap();
        assert_eq!(config.api_key(), "test_key");
        assert!(config.use_exchange_as_venue);
        assert!(!config.bars_timestamp_on_close);
    }

    /// Building without a publishers file path is rejected.
    #[rstest]
    fn test_config_builder_missing_required_fields() {
        let builder = DatabentoDataClientConfigBuilder::new().api_key("test_key".to_string());

        let built = builder.build();

        assert!(built.is_err());
    }

    /// The historical factory succeeds with the crate's own `publishers.json`.
    #[rstest]
    fn test_historical_client_factory() {
        let key = "test-000000000000000000000000000".to_string();
        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let publishers = manifest_dir.join("publishers.json");

        let outcome = DatabentoHistoricalClientFactory::create(
            key,
            publishers,
            false,
            get_atomic_clock_realtime(),
        );

        assert!(outcome.is_ok());
    }

    /// Creating a live client with a nonexistent publishers file reports an error.
    #[rstest]
    fn test_live_data_client_factory_missing_publishers() {
        let id = ClientId::from("DATABENTO-001");
        let key = "test_key".to_string();
        let publishers = PathBuf::from("nonexistent_publishers.json");
        let realtime_clock = get_atomic_clock_realtime();

        let outcome = DatabentoDataClientFactory::create_live_data_client(
            id,
            key,
            publishers,
            false,
            true,
            realtime_clock,
        );

        assert!(outcome.is_err());
    }
}