nautilus_serialization/python/
arrow.rs1use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err};
20use nautilus_model::{
21 data::{
22 Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick, close::InstrumentClose,
24 },
25 python::data::{
26 pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27 pyobjects_to_instrument_closes, pyobjects_to_mark_prices, pyobjects_to_quotes,
28 pyobjects_to_trades,
29 },
30};
31use pyo3::{
32 conversion::IntoPyObjectExt,
33 prelude::*,
34 types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38 ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39 book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40 instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41 quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44pub fn arrow_record_batch_to_pybytes(py: Python, batch: &RecordBatch) -> PyResult<Py<PyBytes>> {
50 let mut cursor = Cursor::new(Vec::new());
52 {
53 let mut writer =
54 StreamWriter::try_new(&mut cursor, &batch.schema()).map_err(to_pyruntime_err)?;
55
56 writer.write(batch).map_err(to_pyruntime_err)?;
57
58 writer.finish().map_err(to_pyruntime_err)?;
59 }
60
61 let buffer = cursor.into_inner();
62 let pybytes = PyBytes::new(py, &buffer);
63
64 Ok(pybytes.into())
65}
66
67#[pyfunction]
73#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
74pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
75 let cls_str: String = cls.getattr("__name__")?.extract()?;
76 let result_map = match cls_str.as_str() {
77 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
78 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
79 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
80 stringify!(TradeTick) => TradeTick::get_schema_map(),
81 stringify!(Bar) => Bar::get_schema_map(),
82 stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
83 stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
84 stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
85 _ => {
86 return Err(to_pytype_err(format!(
87 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
88 )));
89 }
90 };
91
92 result_map.into_py_any(py)
93}
94
95#[pyfunction]
97#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
98#[allow(clippy::missing_panics_doc)] pub fn pyobjects_to_arrow_record_batch_bytes(
100 py: Python,
101 data: Vec<Bound<'_, PyAny>>,
102) -> PyResult<Py<PyBytes>> {
103 if data.is_empty() {
104 return Err(to_pyvalue_err("Empty data"));
105 }
106
107 let data_type: String = data
108 .first()
109 .unwrap() .getattr("__class__")?
111 .getattr("__name__")?
112 .extract()?;
113
114 match data_type.as_str() {
115 stringify!(OrderBookDelta) => {
116 let deltas = pyobjects_to_book_deltas(data)?;
117 py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
118 }
119 stringify!(OrderBookDepth10) => {
120 let depth_snapshots: Vec<OrderBookDepth10> = data
121 .into_iter()
122 .map(|obj| obj.extract::<OrderBookDepth10>().map_err(Into::into))
123 .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
124 py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
125 }
126 stringify!(QuoteTick) => {
127 let quotes = pyobjects_to_quotes(data)?;
128 py_quotes_to_arrow_record_batch_bytes(py, quotes)
129 }
130 stringify!(TradeTick) => {
131 let trades = pyobjects_to_trades(data)?;
132 py_trades_to_arrow_record_batch_bytes(py, trades)
133 }
134 stringify!(Bar) => {
135 let bars = pyobjects_to_bars(data)?;
136 py_bars_to_arrow_record_batch_bytes(py, bars)
137 }
138 stringify!(MarkPriceUpdate) => {
139 let updates = pyobjects_to_mark_prices(data)?;
140 py_mark_prices_to_arrow_record_batch_bytes(py, updates)
141 }
142 stringify!(IndexPriceUpdate) => {
143 let index_prices = pyobjects_to_index_prices(data)?;
144 py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
145 }
146 stringify!(InstrumentClose) => {
147 let closes = pyobjects_to_instrument_closes(data)?;
148 py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
149 }
150 _ => Err(to_pyvalue_err(format!(
151 "unsupported data type: {data_type}"
152 ))),
153 }
154}
155
156#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
162#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
163#[allow(clippy::needless_pass_by_value)]
164pub fn py_book_deltas_to_arrow_record_batch_bytes(
165 py: Python,
166 data: Vec<OrderBookDelta>,
167) -> PyResult<Py<PyBytes>> {
168 match book_deltas_to_arrow_record_batch_bytes(&data) {
169 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
170 Err(e) => Err(to_pyvalue_err(e)),
171 }
172}
173
174#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
180#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
181#[allow(clippy::needless_pass_by_value)]
182pub fn py_book_depth10_to_arrow_record_batch_bytes(
183 py: Python,
184 data: Vec<OrderBookDepth10>,
185) -> PyResult<Py<PyBytes>> {
186 match book_depth10_to_arrow_record_batch_bytes(&data) {
187 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
188 Err(e) => Err(to_pyvalue_err(e)),
189 }
190}
191
192#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
198#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
199#[allow(clippy::needless_pass_by_value)]
200pub fn py_quotes_to_arrow_record_batch_bytes(
201 py: Python,
202 data: Vec<QuoteTick>,
203) -> PyResult<Py<PyBytes>> {
204 match quotes_to_arrow_record_batch_bytes(&data) {
205 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
206 Err(e) => Err(to_pyvalue_err(e)),
207 }
208}
209
210#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
216#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
217#[allow(clippy::needless_pass_by_value)]
218pub fn py_trades_to_arrow_record_batch_bytes(
219 py: Python,
220 data: Vec<TradeTick>,
221) -> PyResult<Py<PyBytes>> {
222 match trades_to_arrow_record_batch_bytes(&data) {
223 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
224 Err(e) => Err(to_pyvalue_err(e)),
225 }
226}
227
228#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
234#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
235#[allow(clippy::needless_pass_by_value)]
236pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
237 match bars_to_arrow_record_batch_bytes(&data) {
238 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
239 Err(e) => Err(to_pyvalue_err(e)),
240 }
241}
242
243#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
249#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
250#[allow(clippy::needless_pass_by_value)]
251pub fn py_mark_prices_to_arrow_record_batch_bytes(
252 py: Python,
253 data: Vec<MarkPriceUpdate>,
254) -> PyResult<Py<PyBytes>> {
255 match mark_prices_to_arrow_record_batch_bytes(&data) {
256 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
257 Err(e) => Err(to_pyvalue_err(e)),
258 }
259}
260
261#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
267#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
268#[allow(clippy::needless_pass_by_value)]
269pub fn py_index_prices_to_arrow_record_batch_bytes(
270 py: Python,
271 data: Vec<IndexPriceUpdate>,
272) -> PyResult<Py<PyBytes>> {
273 match index_prices_to_arrow_record_batch_bytes(&data) {
274 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
275 Err(e) => Err(to_pyvalue_err(e)),
276 }
277}
278
279#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
285#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
286#[allow(clippy::needless_pass_by_value)]
287pub fn py_instrument_closes_to_arrow_record_batch_bytes(
288 py: Python,
289 data: Vec<InstrumentClose>,
290) -> PyResult<Py<PyBytes>> {
291 match instrument_closes_to_arrow_record_batch_bytes(&data) {
292 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
293 Err(e) => Err(to_pyvalue_err(e)),
294 }
295}