use std::{collections::HashMap, path::PathBuf};
use databento::dbn;
use nautilus_core::{
ffi::cvec::CVec,
python::{IntoPyObjectNautilusExt, to_pyvalue_err},
};
use nautilus_model::{
data::{
Bar, Data, DataFFI, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick,
TradeTick,
},
identifiers::{InstrumentId, Symbol, Venue},
python::instruments::instrument_any_to_pyobject,
};
use pyo3::{
prelude::*,
types::{PyCapsule, PyList},
};
use ustr::Ustr;
use crate::{
loader::DatabentoDataLoader,
types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
};
#[expect(clippy::needless_pass_by_value)]
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl DatabentoDataLoader {
#[new]
#[pyo3(signature = (publishers_filepath=None))]
fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
Self::new(publishers_filepath).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_publishers")]
fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
self.load_publishers(publishers_filepath)
.map_err(to_pyvalue_err)
}
#[must_use]
#[pyo3(name = "get_publishers")]
fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
self.get_publishers()
.iter()
.map(|(&key, value)| (key, value.clone()))
.collect::<HashMap<u16, DatabentoPublisher>>()
}
#[pyo3(name = "set_dataset_for_venue")]
fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
self.set_dataset_for_venue(Ustr::from(&dataset), venue);
}
#[must_use]
#[pyo3(name = "get_dataset_for_venue")]
fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
self.get_dataset_for_venue(venue).map(ToString::to_string)
}
#[must_use]
#[pyo3(name = "get_venue_for_publisher")]
fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
self.get_venue_for_publisher(publisher_id)
.map(ToString::to_string)
}
#[pyo3(name = "set_price_precision")]
fn py_set_price_precision(&mut self, symbol: &str, price_precision: u8) {
self.set_price_precision(Symbol::from(symbol), price_precision);
}
#[must_use]
#[pyo3(name = "get_price_precisions")]
fn py_get_price_precisions(&self) -> HashMap<String, u8> {
self.get_price_precisions()
.iter()
.map(|(symbol, precision)| (symbol.to_string(), *precision))
.collect()
}
#[pyo3(name = "schema_for_file")]
fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
self.schema_from_file(&filepath).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_instruments")]
#[pyo3(signature = (filepath, use_exchange_as_venue, skip_on_error=false))]
fn py_load_instruments(
&mut self,
py: Python,
filepath: PathBuf,
use_exchange_as_venue: bool,
skip_on_error: bool,
) -> PyResult<Py<PyAny>> {
let iter = self
.load_instruments(&filepath, use_exchange_as_venue, skip_on_error)
.map_err(to_pyvalue_err)?;
let mut data = Vec::new();
for instrument in iter {
let py_object = instrument_any_to_pyobject(py, instrument)?;
data.push(py_object);
}
let list = PyList::new(py, &data).expect("Invalid `ExactSizeIterator`");
Ok(list.into_py_any_unwrap(py))
}
#[pyo3(name = "load_order_book_deltas")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_order_book_deltas(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<OrderBookDelta>> {
self.load_order_book_deltas(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_order_book_deltas_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
fn py_load_order_book_deltas_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
include_trades: Option<bool>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::MboMsg>(
&filepath,
instrument_id,
price_precision,
include_trades.unwrap_or(false),
None,
)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_order_book_depth10")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_order_book_depth10(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<OrderBookDepth10>> {
self.load_order_book_depth10(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_order_book_depth10_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_order_book_depth10_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false, None)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_quotes")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_quotes(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<QuoteTick>> {
self.load_quotes(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_quotes_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
fn py_load_quotes_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
include_trades: Option<bool>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::Mbp1Msg>(
&filepath,
instrument_id,
price_precision,
include_trades.unwrap_or(false),
None,
)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_bbo_quotes")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_bbo_quotes(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<QuoteTick>> {
self.load_bbo_quotes(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_bbo_quotes_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_bbo_quotes_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false, None)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_cmbp_quotes")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_cmbp_quotes(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<QuoteTick>> {
self.load_cmbp_quotes(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_cmbp_quotes_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
fn py_load_cmbp_quotes_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
include_trades: Option<bool>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::Cmbp1Msg>(
&filepath,
instrument_id,
price_precision,
include_trades.unwrap_or(false),
None,
)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_cbbo_quotes")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_cbbo_quotes(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<QuoteTick>> {
self.load_cbbo_quotes(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_cbbo_quotes_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_cbbo_quotes_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_tbbo_trades")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_tbbo_trades(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<TradeTick>> {
self.load_tbbo_trades(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_tbbo_trades_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_tbbo_trades_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false, None)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_tcbbo_trades")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_tcbbo_trades(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<TradeTick>> {
self.load_tcbbo_trades(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_tcbbo_trades_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_tcbbo_trades_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_trades")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_trades(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<TradeTick>> {
self.load_trades(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_trades_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_trades_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false, None)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_bars")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
fn py_load_bars(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
timestamp_on_close: bool,
) -> PyResult<Vec<Bar>> {
self.load_bars(
&filepath,
instrument_id,
price_precision,
Some(timestamp_on_close),
)
.map_err(to_pyvalue_err)
}
#[pyo3(name = "load_bars_as_pycapsule")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
fn py_load_bars_as_pycapsule(
&self,
py: Python,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
timestamp_on_close: bool,
) -> PyResult<Py<PyAny>> {
let iter = self
.read_records::<dbn::OhlcvMsg>(
&filepath,
instrument_id,
price_precision,
false,
Some(timestamp_on_close),
)
.map_err(to_pyvalue_err)?;
exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
}
#[pyo3(name = "load_status")]
#[pyo3(signature = (filepath, instrument_id=None))]
fn py_load_status(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
) -> PyResult<Vec<InstrumentStatus>> {
let iter = self
.load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
.map_err(to_pyvalue_err)?;
let mut data = Vec::new();
for result in iter {
match result {
Ok(item) => data.push(item),
Err(e) => return Err(to_pyvalue_err(e)),
}
}
Ok(data)
}
#[pyo3(name = "load_imbalance")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_imbalance(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<DatabentoImbalance>> {
let iter = self
.read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)?;
let mut data = Vec::new();
for result in iter {
match result {
Ok(item) => data.push(item),
Err(e) => return Err(to_pyvalue_err(e)),
}
}
Ok(data)
}
#[pyo3(name = "load_statistics")]
#[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
fn py_load_statistics(
&self,
filepath: PathBuf,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> PyResult<Vec<DatabentoStatistics>> {
let iter = self
.read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
.map_err(to_pyvalue_err)?;
let mut data = Vec::new();
for result in iter {
match result {
Ok(item) => data.push(item),
Err(e) => return Err(to_pyvalue_err(e)),
}
}
Ok(data)
}
}
fn exhaust_data_iter_to_pycapsule(
py: Python,
iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
) -> anyhow::Result<Py<PyAny>> {
let mut data = Vec::new();
for result in iter {
match result {
Ok((Some(item1), None)) => data.push(item1),
Ok((None, Some(item2))) => data.push(item2),
Ok((Some(item1), Some(item2))) => {
data.push(item1);
data.push(item2);
}
Ok((None, None)) => {}
Err(e) => return Err(e),
}
}
let ffi_data: Vec<DataFFI> = data
.into_iter()
.map(DataFFI::try_from)
.collect::<Result<Vec<_>, _>>()
.map_err(to_pyvalue_err)?;
let cvec: CVec = ffi_data.into();
let capsule = PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {})?;
Ok(capsule.into_py_any_unwrap(py))
}