Skip to main content

nautilus_binance/python/
arrow.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::ipc::reader::StreamReader;
19use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
20use nautilus_serialization::{
21    arrow::ArrowSchemaProvider, python::arrow::arrow_record_batch_to_pybytes,
22};
23use pyo3::{
24    conversion::IntoPyObjectExt,
25    prelude::*,
26    types::{PyBytes, PyType},
27};
28
29use crate::{
30    arrow::bar::{binance_bar_to_arrow_record_batch, decode_binance_bar_batch},
31    common::bar::BinanceBar,
32};
33
34/// Returns a mapping from field names to Arrow data types for the `BinanceBar` class.
35///
36/// # Errors
37///
38/// Returns a `PyErr` if the class name is not recognized.
39#[pyfunction]
40#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.binance")]
41pub fn get_binance_arrow_schema_map(
42    py: Python<'_>,
43    cls: &Bound<'_, PyType>,
44) -> PyResult<Py<PyAny>> {
45    let cls_str: String = cls.getattr("__name__")?.extract()?;
46    let result_map = match cls_str.as_str() {
47        stringify!(BinanceBar) => BinanceBar::get_schema_map(),
48        _ => {
49            return Err(to_pyvalue_err(format!(
50                "Arrow schema for `{cls_str}` is not currently implemented"
51            )));
52        }
53    };
54
55    result_map.into_py_any(py)
56}
57
58/// Encodes a list of `BinanceBar` into Arrow IPC bytes.
59///
60/// # Errors
61///
62/// Returns a `PyErr` if encoding fails.
63#[pyfunction(name = "binance_bar_to_arrow_record_batch_bytes")]
64#[allow(clippy::needless_pass_by_value)]
65pub fn py_binance_bar_to_arrow_record_batch_bytes(
66    py: Python,
67    data: Vec<BinanceBar>,
68) -> PyResult<Py<PyBytes>> {
69    match binance_bar_to_arrow_record_batch(&data) {
70        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
71        Err(e) => Err(to_pyvalue_err(e)),
72    }
73}
74
75/// Decodes Arrow IPC bytes into a list of `BinanceBar`.
76///
77/// # Errors
78///
79/// Returns a `PyErr` if decoding fails.
80#[pyfunction(name = "binance_bar_from_arrow_record_batch_bytes")]
81pub fn py_binance_bar_from_arrow_record_batch_bytes(
82    _py: Python,
83    data: Vec<u8>,
84) -> PyResult<Vec<BinanceBar>> {
85    let cursor = Cursor::new(data);
86    let reader = StreamReader::try_new(cursor, None).map_err(to_pyruntime_err)?;
87
88    let mut results = Vec::new();
89    for batch_result in reader {
90        let batch = batch_result.map_err(to_pyruntime_err)?;
91        let metadata = batch.schema().metadata().clone();
92        let decoded = decode_binance_bar_batch(&metadata, &batch).map_err(to_pyvalue_err)?;
93        results.extend(decoded);
94    }
95
96    Ok(results)
97}