use std::collections::HashMap;
use nautilus_core::{
ffi::cvec::CVec,
python::{IntoPyObjectNautilusExt, to_pyruntime_err},
};
use nautilus_model::data::{
Bar, Data, DataFFI, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use nautilus_serialization::arrow::custom::CustomDataDecoder;
use pyo3::{prelude::*, types::PyCapsule};
use crate::backend::session::{DataBackendSession, DataQueryResult};
fn data_to_pyobject(py: Python<'_>, item: Data) -> PyResult<Py<PyAny>> {
match item {
Data::Quote(quote) => Py::new(py, quote).map(|x| x.into_any()),
Data::Trade(trade) => Py::new(py, trade).map(|x| x.into_any()),
Data::Bar(bar) => Py::new(py, bar).map(|x| x.into_any()),
Data::Delta(delta) => Py::new(py, delta).map(|x| x.into_any()),
Data::Deltas(deltas) => Py::new(py, (*deltas).clone()).map(|x| x.into_any()),
Data::Depth10(depth) => Py::new(py, *depth).map(|x| x.into_any()),
Data::IndexPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
Data::MarkPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
Data::InstrumentClose(close) => Py::new(py, close).map(|x| x.into_any()),
Data::Custom(custom) => Py::new(py, custom).map(|x| x.into_any()),
}
}
#[repr(C)]
#[pyclass(frozen, eq, eq_int, from_py_object)]
#[pyo3_stub_gen::derive::gen_stub_pyclass_enum(module = "nautilus_trader.persistence")]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum NautilusDataType {
OrderBookDelta = 1,
OrderBookDepth10 = 2,
QuoteTick = 3,
TradeTick = 4,
Bar = 5,
MarkPriceUpdate = 6,
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl NautilusDataType {
const fn __hash__(&self) -> isize {
*self as isize
}
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl DataBackendSession {
#[new]
#[pyo3(signature=(chunk_size=10_000))]
fn new_session(chunk_size: usize) -> Self {
Self::new(chunk_size)
}
#[pyo3(name = "add_file")]
#[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
fn py_add_file(
mut slf: PyRefMut<'_, Self>,
data_type: NautilusDataType,
table_name: &str,
file_path: &str,
sql_query: Option<&str>,
) -> PyResult<()> {
let _guard = slf.runtime.enter();
match data_type {
NautilusDataType::OrderBookDelta => slf
.add_file::<OrderBookDelta>(table_name, file_path, sql_query, None)
.map_err(to_pyruntime_err),
NautilusDataType::OrderBookDepth10 => slf
.add_file::<OrderBookDepth10>(table_name, file_path, sql_query, None)
.map_err(to_pyruntime_err),
NautilusDataType::QuoteTick => slf
.add_file::<QuoteTick>(table_name, file_path, sql_query, None)
.map_err(to_pyruntime_err),
NautilusDataType::TradeTick => slf
.add_file::<TradeTick>(table_name, file_path, sql_query, None)
.map_err(to_pyruntime_err),
NautilusDataType::Bar => slf
.add_file::<Bar>(table_name, file_path, sql_query, None)
.map_err(to_pyruntime_err),
NautilusDataType::MarkPriceUpdate => slf
.add_file::<MarkPriceUpdate>(table_name, file_path, sql_query, None)
.map_err(to_pyruntime_err),
}
}
#[pyo3(name = "add_custom_file")]
#[pyo3(signature = (type_name, table_name, file_path, sql_query=None))]
fn py_add_custom_file(
mut slf: PyRefMut<'_, Self>,
type_name: &str,
table_name: &str,
file_path: &str,
sql_query: Option<&str>,
) -> PyResult<()> {
let _guard = slf.runtime.enter();
slf.add_file::<CustomDataDecoder>(table_name, file_path, sql_query, Some(type_name))
.map_err(to_pyruntime_err)
}
fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
let query_result = slf.get_query_result();
DataQueryResult::new(query_result, slf.chunk_size)
}
#[pyo3(name = "register_object_store_from_uri")]
#[pyo3(signature = (uri, storage_options=None))]
fn py_register_object_store_from_uri(
mut slf: PyRefMut<'_, Self>,
uri: &str,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<()> {
let storage_options = storage_options.map(|m| m.into_iter().collect());
slf.register_object_store_from_uri(uri, storage_options)
.map_err(to_pyruntime_err)
}
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl DataQueryResult {
const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<Py<PyAny>>> {
match slf.next() {
Some(acc) if !acc.is_empty() => {
let has_custom = acc.iter().any(|d| matches!(d, Data::Custom(_)));
if has_custom {
Python::attach(|py| {
let objects: Vec<Py<PyAny>> = acc
.into_iter()
.map(|item| data_to_pyobject(py, item))
.collect::<PyResult<_>>()?;
Ok(Some(objects.into_py_any_unwrap(py)))
})
} else {
let ffi_data: Vec<DataFFI> = acc
.into_iter()
.map(DataFFI::try_from)
.collect::<Result<Vec<_>, _>>()
.map_err(to_pyruntime_err)?;
let cvec: CVec = ffi_data.into();
Python::attach(|py| {
match PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {}) {
Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
Err(e) => Err(to_pyruntime_err(e)),
}
})
}
}
_ => Ok(None),
}
}
}