nautilus_serialization/python/
arrow.rs1use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err};
20use nautilus_model::{
21 data::{
22 Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick, close::InstrumentClose,
24 },
25 python::data::{
26 pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27 pyobjects_to_instrument_closes, pyobjects_to_mark_prices, pyobjects_to_quotes,
28 pyobjects_to_trades,
29 },
30};
31use pyo3::{
32 conversion::IntoPyObjectExt,
33 prelude::*,
34 types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38 ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39 book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40 instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41 quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
46 let mut cursor = Cursor::new(Vec::new());
48 {
49 let mut writer =
50 StreamWriter::try_new(&mut cursor, &batch.schema()).map_err(to_pyruntime_err)?;
51
52 writer.write(&batch).map_err(to_pyruntime_err)?;
53
54 writer.finish().map_err(to_pyruntime_err)?;
55 }
56
57 let buffer = cursor.into_inner();
58 let pybytes = PyBytes::new(py, &buffer);
59
60 Ok(pybytes.into())
61}
62
63#[pyfunction]
69pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
70 let cls_str: String = cls.getattr("__name__")?.extract()?;
71 let result_map = match cls_str.as_str() {
72 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
73 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
74 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
75 stringify!(TradeTick) => TradeTick::get_schema_map(),
76 stringify!(Bar) => Bar::get_schema_map(),
77 stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
78 stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
79 stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
80 _ => {
81 return Err(to_pytype_err(format!(
82 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
83 )));
84 }
85 };
86
87 result_map.into_py_any(py)
88}
89
90#[pyfunction]
100#[allow(clippy::missing_panics_doc)] pub fn pyobjects_to_arrow_record_batch_bytes(
102 py: Python,
103 data: Vec<Bound<'_, PyAny>>,
104) -> PyResult<Py<PyBytes>> {
105 if data.is_empty() {
106 return Err(to_pyvalue_err("Empty data"));
107 }
108
109 let data_type: String = data
110 .first()
111 .unwrap() .getattr("__class__")?
113 .getattr("__name__")?
114 .extract()?;
115
116 match data_type.as_str() {
117 stringify!(OrderBookDelta) => {
118 let deltas = pyobjects_to_book_deltas(data)?;
119 py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
120 }
121 stringify!(OrderBookDepth10) => {
122 let depth_snapshots: Vec<OrderBookDepth10> = data
123 .into_iter()
124 .map(|obj| obj.extract::<OrderBookDepth10>().map_err(Into::into))
125 .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
126 py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
127 }
128 stringify!(QuoteTick) => {
129 let quotes = pyobjects_to_quotes(data)?;
130 py_quotes_to_arrow_record_batch_bytes(py, quotes)
131 }
132 stringify!(TradeTick) => {
133 let trades = pyobjects_to_trades(data)?;
134 py_trades_to_arrow_record_batch_bytes(py, trades)
135 }
136 stringify!(Bar) => {
137 let bars = pyobjects_to_bars(data)?;
138 py_bars_to_arrow_record_batch_bytes(py, bars)
139 }
140 stringify!(MarkPriceUpdate) => {
141 let updates = pyobjects_to_mark_prices(data)?;
142 py_mark_prices_to_arrow_record_batch_bytes(py, updates)
143 }
144 stringify!(IndexPriceUpdate) => {
145 let index_prices = pyobjects_to_index_prices(data)?;
146 py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
147 }
148 stringify!(InstrumentClose) => {
149 let closes = pyobjects_to_instrument_closes(data)?;
150 py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
151 }
152 _ => Err(to_pyvalue_err(format!(
153 "unsupported data type: {data_type}"
154 ))),
155 }
156}
157
158#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
164pub fn py_book_deltas_to_arrow_record_batch_bytes(
165 py: Python,
166 data: Vec<OrderBookDelta>,
167) -> PyResult<Py<PyBytes>> {
168 match book_deltas_to_arrow_record_batch_bytes(data) {
169 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
170 Err(e) => Err(to_pyvalue_err(e)),
171 }
172}
173
174#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
180pub fn py_book_depth10_to_arrow_record_batch_bytes(
181 py: Python,
182 data: Vec<OrderBookDepth10>,
183) -> PyResult<Py<PyBytes>> {
184 match book_depth10_to_arrow_record_batch_bytes(data) {
185 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
186 Err(e) => Err(to_pyvalue_err(e)),
187 }
188}
189
190#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
196pub fn py_quotes_to_arrow_record_batch_bytes(
197 py: Python,
198 data: Vec<QuoteTick>,
199) -> PyResult<Py<PyBytes>> {
200 match quotes_to_arrow_record_batch_bytes(data) {
201 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
202 Err(e) => Err(to_pyvalue_err(e)),
203 }
204}
205
206#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
212pub fn py_trades_to_arrow_record_batch_bytes(
213 py: Python,
214 data: Vec<TradeTick>,
215) -> PyResult<Py<PyBytes>> {
216 match trades_to_arrow_record_batch_bytes(data) {
217 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
218 Err(e) => Err(to_pyvalue_err(e)),
219 }
220}
221
222#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
228pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
229 match bars_to_arrow_record_batch_bytes(data) {
230 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
231 Err(e) => Err(to_pyvalue_err(e)),
232 }
233}
234
235#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
241pub fn py_mark_prices_to_arrow_record_batch_bytes(
242 py: Python,
243 data: Vec<MarkPriceUpdate>,
244) -> PyResult<Py<PyBytes>> {
245 match mark_prices_to_arrow_record_batch_bytes(data) {
246 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
247 Err(e) => Err(to_pyvalue_err(e)),
248 }
249}
250
251#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
257pub fn py_index_prices_to_arrow_record_batch_bytes(
258 py: Python,
259 data: Vec<IndexPriceUpdate>,
260) -> PyResult<Py<PyBytes>> {
261 match index_prices_to_arrow_record_batch_bytes(data) {
262 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
263 Err(e) => Err(to_pyvalue_err(e)),
264 }
265}
266
267#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
273pub fn py_instrument_closes_to_arrow_record_batch_bytes(
274 py: Python,
275 data: Vec<InstrumentClose>,
276) -> PyResult<Py<PyBytes>> {
277 match instrument_closes_to_arrow_record_batch_bytes(data) {
278 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
279 Err(e) => Err(to_pyvalue_err(e)),
280 }
281}