nautilus_serialization/arrow/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Nautilus types.
17
18pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28    collections::HashMap,
29    io::{self, Write},
30};
31
32use arrow::{
33    array::{Array, ArrayRef},
34    datatypes::{DataType, Schema},
35    error::ArrowError,
36    ipc::writer::StreamWriter,
37    record_batch::RecordBatch,
38};
39use nautilus_model::{
40    data::{
41        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43    },
44    types::{price::PriceRaw, quantity::QuantityRaw},
45};
46#[cfg(feature = "python")]
47use pyo3::prelude::*;
48
// Define metadata key constants

/// Metadata key for the bar type.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key for the instrument ID (the only key exported from this module).
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key for the price precision.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key for the size precision.
const KEY_SIZE_PRECISION: &str = "size_precision";
54
/// Errors raised while streaming record batches (see [`WriteStream::write`]).
///
/// Each variant wraps an underlying error and is constructible from it via `From`.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying [`io::Error`].
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An error reported by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// A Python exception; only compiled in when the `python` feature is enabled.
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
65
/// Errors raised while encoding data to, or decoding data from, Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// No data was supplied, so there is nothing to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent; the payload is the key name.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column was absent; the payload is the column name and its expected index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value could not be parsed; the payload is the name of the value and a description
    /// of the failure.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column had an unexpected Arrow type; the payload is the column name, its index,
    /// the expected [`DataType`] and the [`DataType`] actually found.
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An error reported by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
81
82#[inline]
83fn get_raw_price(bytes: &[u8]) -> PriceRaw {
84    PriceRaw::from_le_bytes(bytes.try_into().unwrap())
85}
86
87#[inline]
88fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
89    QuantityRaw::from_le_bytes(bytes.try_into().unwrap())
90}
91
92pub trait ArrowSchemaProvider {
93    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
94
95    #[must_use]
96    fn get_schema_map() -> HashMap<String, String> {
97        let schema = Self::get_schema(None);
98        let mut map = HashMap::new();
99        for field in schema.fields() {
100            let name = field.name().to_string();
101            let data_type = format!("{:?}", field.data_type());
102            map.insert(name, data_type);
103        }
104        map
105    }
106}
107
108pub trait EncodeToRecordBatch
109where
110    Self: Sized + ArrowSchemaProvider,
111{
112    fn encode_batch(
113        metadata: &HashMap<String, String>,
114        data: &[Self],
115    ) -> Result<RecordBatch, ArrowError>;
116
117    fn metadata(&self) -> HashMap<String, String>;
118    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
119        chunk
120            .first()
121            .map(|elem| elem.metadata())
122            .expect("Chunk must have atleast one element to encode")
123    }
124}
125
/// Decodes an Arrow [`RecordBatch`] into a vector of the implementing type.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a `Vec<Self>`, using `metadata` supplied by the caller.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if the implementor cannot decode the batch.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
135
/// Decodes an Arrow [`RecordBatch`] into a vector of [`Data`] values.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a `Vec<Data>`, using `metadata` supplied by the caller.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if the implementor cannot decode the batch.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
145
/// A sink that Arrow record batches can be written to.
pub trait WriteStream {
    /// Writes `record_batch` to this sink.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if the write fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
149
150impl<T: EncodeToRecordBatch + Write> WriteStream for T {
151    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
152        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
153        writer.write(record_batch)?;
154        writer.finish()?;
155        Ok(())
156    }
157}
158
159pub fn extract_column<'a, T: Array + 'static>(
160    cols: &'a [ArrayRef],
161    column_key: &'static str,
162    column_index: usize,
163    expected_type: DataType,
164) -> Result<&'a T, EncodingError> {
165    let column_values = cols
166        .get(column_index)
167        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
168    let downcasted_values =
169        column_values
170            .as_any()
171            .downcast_ref::<T>()
172            .ok_or(EncodingError::InvalidColumnType(
173                column_key,
174                column_index,
175                expected_type,
176                column_values.data_type().clone(),
177            ))?;
178    Ok(downcasted_values)
179}
180
181pub fn book_deltas_to_arrow_record_batch_bytes(
182    data: Vec<OrderBookDelta>,
183) -> Result<RecordBatch, EncodingError> {
184    if data.is_empty() {
185        return Err(EncodingError::EmptyData);
186    }
187
188    // Extract metadata from chunk
189    let metadata = OrderBookDelta::chunk_metadata(&data);
190    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
191}
192
193pub fn book_depth10_to_arrow_record_batch_bytes(
194    data: Vec<OrderBookDepth10>,
195) -> Result<RecordBatch, EncodingError> {
196    if data.is_empty() {
197        return Err(EncodingError::EmptyData);
198    }
199
200    // Take first element and extract metadata
201    // SAFETY: Unwrap safe as already checked that `data` not empty
202    let first = data.first().unwrap();
203    let metadata = first.metadata();
204    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
205}
206
207pub fn quotes_to_arrow_record_batch_bytes(
208    data: Vec<QuoteTick>,
209) -> Result<RecordBatch, EncodingError> {
210    if data.is_empty() {
211        return Err(EncodingError::EmptyData);
212    }
213
214    // Take first element and extract metadata
215    // SAFETY: Unwrap safe as already checked that `data` not empty
216    let first = data.first().unwrap();
217    let metadata = first.metadata();
218    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
219}
220
221pub fn trades_to_arrow_record_batch_bytes(
222    data: Vec<TradeTick>,
223) -> Result<RecordBatch, EncodingError> {
224    if data.is_empty() {
225        return Err(EncodingError::EmptyData);
226    }
227
228    // Take first element and extract metadata
229    // SAFETY: Unwrap safe as already checked that `data` not empty
230    let first = data.first().unwrap();
231    let metadata = first.metadata();
232    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
233}
234
235pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
236    if data.is_empty() {
237        return Err(EncodingError::EmptyData);
238    }
239
240    // Take first element and extract metadata
241    // SAFETY: Unwrap safe as already checked that `data` not empty
242    let first = data.first().unwrap();
243    let metadata = first.metadata();
244    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
245}
246
247pub fn mark_prices_to_arrow_record_batch_bytes(
248    data: Vec<MarkPriceUpdate>,
249) -> Result<RecordBatch, EncodingError> {
250    if data.is_empty() {
251        return Err(EncodingError::EmptyData);
252    }
253
254    // Take first element and extract metadata
255    // SAFETY: Unwrap safe as already checked that `data` not empty
256    let first = data.first().unwrap();
257    let metadata = first.metadata();
258    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
259}
260
261pub fn index_prices_to_arrow_record_batch_bytes(
262    data: Vec<IndexPriceUpdate>,
263) -> Result<RecordBatch, EncodingError> {
264    if data.is_empty() {
265        return Err(EncodingError::EmptyData);
266    }
267
268    // Take first element and extract metadata
269    // SAFETY: Unwrap safe as already checked that `data` not empty
270    let first = data.first().unwrap();
271    let metadata = first.metadata();
272    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
273}
274
275pub fn instrument_closes_to_arrow_record_batch_bytes(
276    data: Vec<InstrumentClose>,
277) -> Result<RecordBatch, EncodingError> {
278    if data.is_empty() {
279        return Err(EncodingError::EmptyData);
280    }
281
282    // Take first element and extract metadata
283    // SAFETY: Unwrap safe as already checked that `data` not empty
284    let first = data.first().unwrap();
285    let metadata = first.metadata();
286    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
287}