use std::io::Cursor;
use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
use nautilus_core::python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err};
use nautilus_model::{
data::{
Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
TradeTick, close::InstrumentClose,
},
python::data::{
pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
pyobjects_to_instrument_closes, pyobjects_to_mark_prices, pyobjects_to_quotes,
pyobjects_to_trades,
},
};
use pyo3::{
conversion::IntoPyObjectExt,
prelude::*,
types::{PyBytes, PyType},
};
use crate::arrow::{
ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
};
pub fn arrow_record_batch_to_pybytes(py: Python, batch: &RecordBatch) -> PyResult<Py<PyBytes>> {
let mut cursor = Cursor::new(Vec::new());
{
let mut writer =
StreamWriter::try_new(&mut cursor, &batch.schema()).map_err(to_pyruntime_err)?;
writer.write(batch).map_err(to_pyruntime_err)?;
writer.finish().map_err(to_pyruntime_err)?;
}
let buffer = cursor.into_inner();
let pybytes = PyBytes::new(py, &buffer);
Ok(pybytes.into())
}
/// Returns the Arrow schema map for the data class `cls` as a Python object.
///
/// The class is identified purely by its `__name__` attribute, which must match the name
/// of one of the Rust data types handled below.
///
/// # Errors
///
/// Returns an error if:
/// - The `__name__` attribute cannot be read or extracted as a string.
/// - No schema is implemented for the class name (mapped through `to_pytype_err`).
/// - Converting the schema map into a Python object fails.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
    let cls_str = cls.getattr("__name__")?.extract::<String>()?;

    // Resolve the schema first; the unsupported case is reported in one place below
    let lookup = match cls_str.as_str() {
        stringify!(OrderBookDelta) => Some(OrderBookDelta::get_schema_map()),
        stringify!(OrderBookDepth10) => Some(OrderBookDepth10::get_schema_map()),
        stringify!(QuoteTick) => Some(QuoteTick::get_schema_map()),
        stringify!(TradeTick) => Some(TradeTick::get_schema_map()),
        stringify!(Bar) => Some(Bar::get_schema_map()),
        stringify!(MarkPriceUpdate) => Some(MarkPriceUpdate::get_schema_map()),
        stringify!(IndexPriceUpdate) => Some(IndexPriceUpdate::get_schema_map()),
        stringify!(InstrumentClose) => Some(InstrumentClose::get_schema_map()),
        _ => None,
    };

    match lookup {
        Some(schema_map) => schema_map.into_py_any(py),
        None => Err(to_pytype_err(format!(
            "Arrow schema for `{cls_str}` is not currently implemented in Rust."
        ))),
    }
}
/// Encodes a list of Python data objects as an Arrow record batch and returns it as
/// Python `bytes`.
///
/// The data type is determined from the `__class__.__name__` of the first element only;
/// the whole list is then handed to the converter for that type.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty.
/// - The class name of the first element cannot be read or extracted as a string.
/// - The data type is not one of the supported types.
/// - Converting the Python objects, or encoding the record batch, fails.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
pub fn pyobjects_to_arrow_record_batch_bytes(
    py: Python,
    data: Vec<Bound<'_, PyAny>>,
) -> PyResult<Py<PyBytes>> {
    // Binding the first element doubles as the emptiness check, so no `unwrap` is needed
    let Some(first) = data.first() else {
        return Err(to_pyvalue_err("Empty data"));
    };
    let data_type: String = first.getattr("__class__")?.getattr("__name__")?.extract()?;

    match data_type.as_str() {
        stringify!(OrderBookDelta) => {
            let deltas = pyobjects_to_book_deltas(data)?;
            py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
        }
        stringify!(OrderBookDepth10) => {
            // Depth snapshots are extracted element by element in this function
            let depth_snapshots: Vec<OrderBookDepth10> = data
                .into_iter()
                .map(|obj| obj.extract::<OrderBookDepth10>().map_err(Into::into))
                .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
            py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
        }
        stringify!(QuoteTick) => {
            let quotes = pyobjects_to_quotes(data)?;
            py_quotes_to_arrow_record_batch_bytes(py, quotes)
        }
        stringify!(TradeTick) => {
            let trades = pyobjects_to_trades(data)?;
            py_trades_to_arrow_record_batch_bytes(py, trades)
        }
        stringify!(Bar) => {
            let bars = pyobjects_to_bars(data)?;
            py_bars_to_arrow_record_batch_bytes(py, bars)
        }
        stringify!(MarkPriceUpdate) => {
            let updates = pyobjects_to_mark_prices(data)?;
            py_mark_prices_to_arrow_record_batch_bytes(py, updates)
        }
        stringify!(IndexPriceUpdate) => {
            let index_prices = pyobjects_to_index_prices(data)?;
            py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
        }
        stringify!(InstrumentClose) => {
            let closes = pyobjects_to_instrument_closes(data)?;
            py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
        }
        _ => Err(to_pyvalue_err(format!(
            "unsupported data type: {data_type}"
        ))),
    }
}
#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_book_deltas_to_arrow_record_batch_bytes(
py: Python,
data: Vec<OrderBookDelta>,
) -> PyResult<Py<PyBytes>> {
match book_deltas_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}
#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_book_depth10_to_arrow_record_batch_bytes(
py: Python,
data: Vec<OrderBookDepth10>,
) -> PyResult<Py<PyBytes>> {
match book_depth10_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}
#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_quotes_to_arrow_record_batch_bytes(
py: Python,
data: Vec<QuoteTick>,
) -> PyResult<Py<PyBytes>> {
match quotes_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}
#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_trades_to_arrow_record_batch_bytes(
py: Python,
data: Vec<TradeTick>,
) -> PyResult<Py<PyBytes>> {
match trades_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}
#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
match bars_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}
#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_mark_prices_to_arrow_record_batch_bytes(
py: Python,
data: Vec<MarkPriceUpdate>,
) -> PyResult<Py<PyBytes>> {
match mark_prices_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}
#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_index_prices_to_arrow_record_batch_bytes(
py: Python,
data: Vec<IndexPriceUpdate>,
) -> PyResult<Py<PyBytes>> {
match index_prices_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}
#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
#[allow(clippy::needless_pass_by_value)]
pub fn py_instrument_closes_to_arrow_record_batch_bytes(
py: Python,
data: Vec<InstrumentClose>,
) -> PyResult<Py<PyBytes>> {
match instrument_closes_to_arrow_record_batch_bytes(&data) {
Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
Err(e) => Err(to_pyvalue_err(e)),
}
}