Skip to main content

nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err};
20use nautilus_model::{
21    data::{
22        Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick, close::InstrumentClose,
24    },
25    python::data::{
26        pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27        pyobjects_to_instrument_closes, pyobjects_to_mark_prices, pyobjects_to_quotes,
28        pyobjects_to_trades,
29    },
30};
31use pyo3::{
32    conversion::IntoPyObjectExt,
33    prelude::*,
34    types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38    ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39    book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40    instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41    quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44/// Transforms the given record `batch` into Python `bytes`.
45///
46/// # Errors
47///
48/// Returns a `PyErr` if writing the Arrow IPC stream fails.
49pub fn arrow_record_batch_to_pybytes(py: Python, batch: &RecordBatch) -> PyResult<Py<PyBytes>> {
50    // Create a cursor to write to a byte array in memory
51    let mut cursor = Cursor::new(Vec::new());
52    {
53        let mut writer =
54            StreamWriter::try_new(&mut cursor, &batch.schema()).map_err(to_pyruntime_err)?;
55
56        writer.write(batch).map_err(to_pyruntime_err)?;
57
58        writer.finish().map_err(to_pyruntime_err)?;
59    }
60
61    let buffer = cursor.into_inner();
62    let pybytes = PyBytes::new(py, &buffer);
63
64    Ok(pybytes.into())
65}
66
67/// Returns a mapping from field names to Arrow data types for the given Rust data class.
68///
69/// # Errors
70///
71/// Returns a `PyErr` if the class name is not recognized or schema extraction fails.
72#[pyfunction]
73#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
74pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
75    let cls_str: String = cls.getattr("__name__")?.extract()?;
76    let result_map = match cls_str.as_str() {
77        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
78        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
79        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
80        stringify!(TradeTick) => TradeTick::get_schema_map(),
81        stringify!(Bar) => Bar::get_schema_map(),
82        stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
83        stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
84        stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
85        _ => {
86            return Err(to_pytype_err(format!(
87                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
88            )));
89        }
90    };
91
92    result_map.into_py_any(py)
93}
94
95/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
96#[pyfunction]
97#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
98#[allow(clippy::missing_panics_doc)] // Guarded by empty check
99pub fn pyobjects_to_arrow_record_batch_bytes(
100    py: Python,
101    data: Vec<Bound<'_, PyAny>>,
102) -> PyResult<Py<PyBytes>> {
103    if data.is_empty() {
104        return Err(to_pyvalue_err("Empty data"));
105    }
106
107    let data_type: String = data
108        .first()
109        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
110        .getattr("__class__")?
111        .getattr("__name__")?
112        .extract()?;
113
114    match data_type.as_str() {
115        stringify!(OrderBookDelta) => {
116            let deltas = pyobjects_to_book_deltas(data)?;
117            py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
118        }
119        stringify!(OrderBookDepth10) => {
120            let depth_snapshots: Vec<OrderBookDepth10> = data
121                .into_iter()
122                .map(|obj| obj.extract::<OrderBookDepth10>().map_err(Into::into))
123                .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
124            py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
125        }
126        stringify!(QuoteTick) => {
127            let quotes = pyobjects_to_quotes(data)?;
128            py_quotes_to_arrow_record_batch_bytes(py, quotes)
129        }
130        stringify!(TradeTick) => {
131            let trades = pyobjects_to_trades(data)?;
132            py_trades_to_arrow_record_batch_bytes(py, trades)
133        }
134        stringify!(Bar) => {
135            let bars = pyobjects_to_bars(data)?;
136            py_bars_to_arrow_record_batch_bytes(py, bars)
137        }
138        stringify!(MarkPriceUpdate) => {
139            let updates = pyobjects_to_mark_prices(data)?;
140            py_mark_prices_to_arrow_record_batch_bytes(py, updates)
141        }
142        stringify!(IndexPriceUpdate) => {
143            let index_prices = pyobjects_to_index_prices(data)?;
144            py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
145        }
146        stringify!(InstrumentClose) => {
147            let closes = pyobjects_to_instrument_closes(data)?;
148            py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
149        }
150        _ => Err(to_pyvalue_err(format!(
151            "unsupported data type: {data_type}"
152        ))),
153    }
154}
155
156/// Converts a list of `OrderBookDelta` into Arrow IPC bytes for Python.
157///
158/// # Errors
159///
160/// Returns a `PyErr` if encoding fails.
161#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
162#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
163#[allow(clippy::needless_pass_by_value)]
164pub fn py_book_deltas_to_arrow_record_batch_bytes(
165    py: Python,
166    data: Vec<OrderBookDelta>,
167) -> PyResult<Py<PyBytes>> {
168    match book_deltas_to_arrow_record_batch_bytes(&data) {
169        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
170        Err(e) => Err(to_pyvalue_err(e)),
171    }
172}
173
174/// Converts a list of `OrderBookDepth10` into Arrow IPC bytes for Python.
175///
176/// # Errors
177///
178/// Returns a `PyErr` if encoding fails.
179#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
180#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
181#[allow(clippy::needless_pass_by_value)]
182pub fn py_book_depth10_to_arrow_record_batch_bytes(
183    py: Python,
184    data: Vec<OrderBookDepth10>,
185) -> PyResult<Py<PyBytes>> {
186    match book_depth10_to_arrow_record_batch_bytes(&data) {
187        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
188        Err(e) => Err(to_pyvalue_err(e)),
189    }
190}
191
192/// Converts a list of `QuoteTick` into Arrow IPC bytes for Python.
193///
194/// # Errors
195///
196/// Returns a `PyErr` if encoding fails.
197#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
198#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
199#[allow(clippy::needless_pass_by_value)]
200pub fn py_quotes_to_arrow_record_batch_bytes(
201    py: Python,
202    data: Vec<QuoteTick>,
203) -> PyResult<Py<PyBytes>> {
204    match quotes_to_arrow_record_batch_bytes(&data) {
205        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
206        Err(e) => Err(to_pyvalue_err(e)),
207    }
208}
209
210/// Converts a list of `TradeTick` into Arrow IPC bytes for Python.
211///
212/// # Errors
213///
214/// Returns a `PyErr` if encoding fails.
215#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
216#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
217#[allow(clippy::needless_pass_by_value)]
218pub fn py_trades_to_arrow_record_batch_bytes(
219    py: Python,
220    data: Vec<TradeTick>,
221) -> PyResult<Py<PyBytes>> {
222    match trades_to_arrow_record_batch_bytes(&data) {
223        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
224        Err(e) => Err(to_pyvalue_err(e)),
225    }
226}
227
228/// Converts a list of `Bar` into Arrow IPC bytes for Python.
229///
230/// # Errors
231///
232/// Returns a `PyErr` if encoding fails.
233#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
234#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
235#[allow(clippy::needless_pass_by_value)]
236pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
237    match bars_to_arrow_record_batch_bytes(&data) {
238        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
239        Err(e) => Err(to_pyvalue_err(e)),
240    }
241}
242
243/// Converts a list of `MarkPriceUpdate` into Arrow IPC bytes for Python.
244///
245/// # Errors
246///
247/// Returns a `PyErr` if encoding fails.
248#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
249#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
250#[allow(clippy::needless_pass_by_value)]
251pub fn py_mark_prices_to_arrow_record_batch_bytes(
252    py: Python,
253    data: Vec<MarkPriceUpdate>,
254) -> PyResult<Py<PyBytes>> {
255    match mark_prices_to_arrow_record_batch_bytes(&data) {
256        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
257        Err(e) => Err(to_pyvalue_err(e)),
258    }
259}
260
261/// Converts a list of `IndexPriceUpdate` into Arrow IPC bytes for Python.
262///
263/// # Errors
264///
265/// Returns a `PyErr` if encoding fails.
266#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
267#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
268#[allow(clippy::needless_pass_by_value)]
269pub fn py_index_prices_to_arrow_record_batch_bytes(
270    py: Python,
271    data: Vec<IndexPriceUpdate>,
272) -> PyResult<Py<PyBytes>> {
273    match index_prices_to_arrow_record_batch_bytes(&data) {
274        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
275        Err(e) => Err(to_pyvalue_err(e)),
276    }
277}
278
279/// Converts a list of `InstrumentClose` into Arrow IPC bytes for Python.
280///
281/// # Errors
282///
283/// Returns a `PyErr` if encoding fails.
284#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
285#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
286#[allow(clippy::needless_pass_by_value)]
287pub fn py_instrument_closes_to_arrow_record_batch_bytes(
288    py: Python,
289    data: Vec<InstrumentClose>,
290) -> PyResult<Py<PyBytes>> {
291    match instrument_closes_to_arrow_record_batch_bytes(&data) {
292        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
293        Err(e) => Err(to_pyvalue_err(e)),
294    }
295}