#![cfg(feature = "databento")]
#![allow(clippy::unwrap_used, clippy::expect_used)]
use databento::dbn::Schema;
use databento::historical::timeseries::GetRangeParams;
use futures_util::StreamExt;
use rust_decimal::Decimal;
use rustrade_data::exchange::databento::{DatabentoHistorical, DatabentoLive};
use rustrade_instrument::exchange::ExchangeId;
use std::collections::HashMap;
use std::pin::pin;
use time::{Duration, OffsetDateTime};
use tracing_subscriber::{EnvFilter, fmt};
fn init_logging() {
let _ = fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(tracing::Level::DEBUG.into())
.from_env_lossy(),
)
.try_init();
}
#[tokio::test]
#[ignore]
async fn test_historical_client_creation() {
init_logging();
let client = DatabentoHistorical::from_env();
assert!(
client.is_ok(),
"Failed to create historical client: {:?}",
client.err()
);
tracing::info!("Historical client created successfully");
}
#[tokio::test]
#[ignore]
async fn test_historical_fetch_trades() {
init_logging();
let mut client = DatabentoHistorical::from_env().expect("Failed to create historical client");
let end = OffsetDateTime::now_utc() - Duration::hours(25);
let start = end - Duration::minutes(5);
let params = GetRangeParams::builder()
.dataset("GLBX.MDP3")
.symbols(vec!["ES.FUT".to_string()])
.schema(Schema::Trades)
.date_time_range(start..end)
.build();
tracing::info!(%start, %end, "Fetching ES futures trades");
let trades = client
.fetch_trades(¶ms, ExchangeId::DatabentoGlbx, "ES")
.await;
assert!(trades.is_ok(), "Failed to fetch trades: {:?}", trades.err());
let trades = trades.unwrap();
tracing::info!(count = trades.len(), "Fetched trades");
if !trades.is_empty() {
let first = &trades[0];
assert_eq!(first.exchange, ExchangeId::DatabentoGlbx);
assert!(
first.kind.price > Decimal::ZERO,
"Trade price should be positive"
);
assert!(
first.kind.amount > Decimal::ZERO,
"Trade amount should be positive"
);
tracing::info!(
price = %first.kind.price,
amount = %first.kind.amount,
"First trade"
);
}
}
#[tokio::test]
#[ignore]
async fn test_historical_fetch_quotes() {
init_logging();
let mut client = DatabentoHistorical::from_env().expect("Failed to create historical client");
let end = OffsetDateTime::now_utc() - Duration::hours(25);
let start = end - Duration::minutes(5);
let params = GetRangeParams::builder()
.dataset("GLBX.MDP3")
.symbols(vec!["ES.FUT".to_string()])
.schema(Schema::Mbp1)
.date_time_range(start..end)
.build();
tracing::info!(%start, %end, "Fetching ES futures quotes");
let quotes = client
.fetch_quotes(¶ms, ExchangeId::DatabentoGlbx, "ES")
.await;
assert!(quotes.is_ok(), "Failed to fetch quotes: {:?}", quotes.err());
let quotes = quotes.unwrap();
tracing::info!(count = quotes.len(), "Fetched quotes");
if !quotes.is_empty() {
let first = "es[0];
assert_eq!(first.exchange, ExchangeId::DatabentoGlbx);
assert!(
first.kind.bid_price > Decimal::ZERO,
"Bid price should be positive"
);
assert!(
first.kind.ask_price > Decimal::ZERO,
"Ask price should be positive"
);
assert!(
first.kind.ask_price >= first.kind.bid_price,
"Ask should be >= bid"
);
tracing::info!(
bid = %first.kind.bid_price,
ask = %first.kind.ask_price,
"First quote"
);
}
}
#[tokio::test]
#[ignore]
async fn test_live_client_connection() {
init_logging();
let instruments: HashMap<String, String> = [("ESM5".to_string(), "ES-front".to_string())]
.into_iter()
.collect();
let client = DatabentoLive::from_env("GLBX.MDP3", ExchangeId::DatabentoGlbx, instruments).await;
assert!(
client.is_ok(),
"Failed to create live client: {:?}",
client.err()
);
tracing::info!("Live client connected successfully");
}
#[tokio::test]
#[ignore]
async fn test_live_subscribe() {
init_logging();
let instruments: HashMap<String, String> = [("ESM5".to_string(), "ES-front".to_string())]
.into_iter()
.collect();
let mut client = DatabentoLive::from_env("GLBX.MDP3", ExchangeId::DatabentoGlbx, instruments)
.await
.expect("Failed to create live client");
let result = client.subscribe(&["ESM5"], Schema::Trades).await;
assert!(result.is_ok(), "Failed to subscribe: {:?}", result.err());
tracing::info!("Subscription successful");
}
#[tokio::test]
#[ignore]
async fn test_live_stream_receives_data() {
init_logging();
let instruments: HashMap<String, String> = [("ESM5".to_string(), "ES-front".to_string())]
.into_iter()
.collect();
let mut client = DatabentoLive::from_env("GLBX.MDP3", ExchangeId::DatabentoGlbx, instruments)
.await
.expect("Failed to create live client");
client
.subscribe(&["ESM5"], Schema::Trades)
.await
.expect("Failed to subscribe");
tracing::info!("Starting live stream, waiting for data...");
let stream = client.start().await.expect("Failed to start stream");
let mut stream = pin!(stream);
let timeout = tokio::time::timeout(std::time::Duration::from_secs(60), async {
while let Some(event) = stream.next().await {
match event {
Ok(market_event) => {
tracing::info!(?market_event, "Received market event");
return Ok(market_event);
}
Err(e) => {
tracing::warn!(error = %e, "Stream error");
}
}
}
Err("Stream ended without data")
})
.await;
assert!(
timeout.is_ok(),
"Timeout waiting for live data. If outside CME Globex hours, this is expected."
);
let event = timeout.unwrap();
assert!(event.is_ok(), "No valid market event received");
tracing::info!("Live stream test passed");
}