Skip to main content

nautilus_serialization/arrow/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Nautilus types.
17
18pub mod bar;
19pub mod close;
20pub mod custom;
21pub mod delta;
22pub mod depth;
23pub mod index_price;
24pub mod instrument;
25pub mod mark_price;
26pub mod quote;
27pub mod trade;
28
29use std::{
30    collections::HashMap,
31    io::{self, Write},
32};
33
34use arrow::{
35    array::{Array, ArrayRef, FixedSizeBinaryArray, StringArray, StringViewArray},
36    datatypes::{DataType, Schema},
37    error::ArrowError,
38    ipc::writer::StreamWriter,
39    record_batch::RecordBatch,
40};
41use nautilus_model::{
42    data::{
43        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
44        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
45    },
46    types::{
47        PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
48        fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
49        price::PriceRaw,
50        quantity::QuantityRaw,
51    },
52};
53#[cfg(feature = "python")]
54use pyo3::prelude::*;
55
// Define metadata key constants
// Crate-internal key for the bar type in record batch metadata
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key for the instrument ID.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key for the price precision.
pub const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key for the size precision.
pub const KEY_SIZE_PRECISION: &str = "size_precision";
61
/// Errors that can occur when streaming record batch data to an output.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O operation failed.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An Arrow write or encoding operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// A Python interop call failed (only present with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
72
/// Errors that can occur when encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input data slice was empty.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// The named column was not found at the expected index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value for the named field could not be parsed; carries details.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// The column at the given index had an unexpected Arrow data type
    /// (expected type first, actual type second).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// The stored fixed-size binary width does not match this build's precision mode.
    #[error(
        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
         but this build expects {expected_bytes} bytes. The catalog was created with a different \
         precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
         build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
    )]
    PrecisionMismatch {
        /// Field (column) name where the mismatch was detected.
        field: &'static str,
        /// Byte width expected by this build.
        expected_bytes: i32,
        /// Byte width found in the stored data.
        actual_bytes: i32,
    },
    /// An underlying Arrow operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
99
100#[inline]
101fn get_raw_price(bytes: &[u8]) -> PriceRaw {
102    PriceRaw::from_le_bytes(
103        bytes
104            .try_into()
105            .expect("Price raw bytes must be exactly the size of PriceRaw"),
106    )
107}
108
109#[inline]
110fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
111    QuantityRaw::from_le_bytes(
112        bytes
113            .try_into()
114            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
115    )
116}
117
118/// Gets raw price bytes and corrects for floating-point precision errors in stored data.
119///
120/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
121/// introduces floating-point errors. This corrects the raw value to the nearest valid
122/// multiple of the scale factor for the given precision.
123///
124/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
125#[inline]
126fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
127    let raw = get_raw_price(bytes);
128
129    // Preserve sentinel values unchanged
130    if raw == PRICE_UNDEF || raw == PRICE_ERROR {
131        return raw;
132    }
133
134    correct_price_raw(raw, precision)
135}
136
137/// Gets raw quantity bytes and corrects for floating-point precision errors in stored data.
138///
139/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
140/// introduces floating-point errors. This corrects the raw value to the nearest valid
141/// multiple of the scale factor for the given precision.
142///
143/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
144#[inline]
145fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
146    let raw = get_raw_quantity(bytes);
147
148    // Preserve sentinel values unchanged
149    if raw == QUANTITY_UNDEF {
150        return raw;
151    }
152
153    correct_quantity_raw(raw, precision)
154}
155
156/// Decodes a [`Price`] from raw bytes with bounds validation.
157///
158/// Uses corrected raw values to handle floating-point precision errors in stored data.
159/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
160///
161/// # Errors
162///
163/// Returns an [`EncodingError::ParseError`] if the price value is out of bounds.
164pub fn decode_price(
165    bytes: &[u8],
166    precision: u8,
167    field: &'static str,
168    row: usize,
169) -> Result<Price, EncodingError> {
170    let raw = get_corrected_raw_price(bytes, precision);
171    Price::from_raw_checked(raw, precision)
172        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
173}
174
175/// Decodes a [`Quantity`] from raw bytes with bounds validation.
176///
177/// Uses corrected raw values to handle floating-point precision errors in stored data.
178/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
179///
180/// # Errors
181///
182/// Returns an [`EncodingError::ParseError`] if the quantity value is out of bounds.
183pub fn decode_quantity(
184    bytes: &[u8],
185    precision: u8,
186    field: &'static str,
187    row: usize,
188) -> Result<Quantity, EncodingError> {
189    let raw = get_corrected_raw_quantity(bytes, precision);
190    Quantity::from_raw_checked(raw, precision)
191        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
192}
193
194/// Decodes a [`Price`] from raw bytes, using precision 0 for sentinel values.
195///
196/// For order book data where sentinel values indicate empty levels.
197///
198/// # Errors
199///
200/// Returns an [`EncodingError::ParseError`] if the price value is out of bounds.
201pub fn decode_price_with_sentinel(
202    bytes: &[u8],
203    precision: u8,
204    field: &'static str,
205    row: usize,
206) -> Result<Price, EncodingError> {
207    let raw = get_raw_price(bytes);
208    let (final_raw, final_precision) = if raw == PRICE_UNDEF {
209        (raw, 0)
210    } else {
211        (get_corrected_raw_price(bytes, precision), precision)
212    };
213    Price::from_raw_checked(final_raw, final_precision)
214        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
215}
216
217/// Decodes a [`Quantity`] from raw bytes, using precision 0 for sentinel values.
218///
219/// For order book data where sentinel values indicate empty levels.
220///
221/// # Errors
222///
223/// Returns an [`EncodingError::ParseError`] if the quantity value is out of bounds.
224pub fn decode_quantity_with_sentinel(
225    bytes: &[u8],
226    precision: u8,
227    field: &'static str,
228    row: usize,
229) -> Result<Quantity, EncodingError> {
230    let raw = get_raw_quantity(bytes);
231    let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
232        (raw, 0)
233    } else {
234        (get_corrected_raw_quantity(bytes, precision), precision)
235    };
236    Quantity::from_raw_checked(final_raw, final_precision)
237        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
238}
239
240/// Provides Apache Arrow schema definitions for data types.
241pub trait ArrowSchemaProvider {
242    /// Returns the Arrow schema for this type with optional metadata.
243    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
244
245    /// Returns a map of field names to their Arrow data types.
246    #[must_use]
247    fn get_schema_map() -> HashMap<String, String> {
248        let schema = Self::get_schema(None);
249        let mut map = HashMap::new();
250        for field in schema.fields() {
251            let name = field.name().clone();
252            let data_type = format!("{:?}", field.data_type());
253            map.insert(name, data_type);
254        }
255        map
256    }
257}
258
259/// Encodes data types to Apache Arrow RecordBatch format.
260pub trait EncodeToRecordBatch
261where
262    Self: Sized + ArrowSchemaProvider,
263{
264    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
265    ///
266    /// # Errors
267    ///
268    /// Returns an `ArrowError` if the encoding fails.
269    fn encode_batch(
270        metadata: &HashMap<String, String>,
271        data: &[Self],
272    ) -> Result<RecordBatch, ArrowError>;
273
274    /// Returns the metadata for this data element.
275    fn metadata(&self) -> HashMap<String, String>;
276
277    /// Returns the metadata for the first element in a chunk.
278    ///
279    /// # Panics
280    ///
281    /// Panics if `chunk` is empty.
282    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
283        chunk
284            .first()
285            .map(|elem| elem.metadata())
286            .expect("Chunk must have at least one element to encode")
287    }
288}
289
/// Decodes data types from Apache Arrow RecordBatch format.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
    ///
    /// The `Into<Data>` bound allows decoded values to be converted into the
    /// unified `Data` enum by callers.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
305
/// Decodes raw Data objects from Apache Arrow RecordBatch format.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
    ///
    /// Unlike [`DecodeFromRecordBatch`], the output is the type-erased `Data`
    /// enum rather than the concrete implementing type.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
321
/// Writes RecordBatch data to output streams.
///
/// A blanket implementation is provided for every `std::io::Write` type below.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
331
332impl<T: Write> WriteStream for T {
333    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
334        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
335        writer.write(record_batch)?;
336        writer.finish()?;
337        Ok(())
338    }
339}
340
341/// Extracts a string column, accepting both Utf8 (`StringArray`) and Utf8View (`StringViewArray`).
342/// Parquet may return Utf8View when reading, so this handles both formats.
343///
344/// # Errors
345///
346/// Returns an error if:
347/// - `column_index` is out of range: `EncodingError::MissingColumn`.
348/// - The column type is neither Utf8 nor Utf8View: `EncodingError::InvalidColumnType`.
349pub fn extract_column_string<'a>(
350    cols: &'a [ArrayRef],
351    column_key: &'static str,
352    column_index: usize,
353) -> Result<StringColumnRef<'a>, EncodingError> {
354    let column_values = cols
355        .get(column_index)
356        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
357    let dt = column_values.data_type();
358    if let Some(arr) = column_values.as_any().downcast_ref::<StringArray>() {
359        Ok(StringColumnRef::Utf8(arr))
360    } else if let Some(arr) = column_values.as_any().downcast_ref::<StringViewArray>() {
361        Ok(StringColumnRef::Utf8View(arr))
362    } else {
363        Err(EncodingError::InvalidColumnType(
364            column_key,
365            column_index,
366            DataType::Utf8,
367            dt.clone(),
368        ))
369    }
370}
371
/// Reference to a string column, either Utf8 or Utf8View.
#[derive(Debug)]
pub enum StringColumnRef<'a> {
    /// Borrowed Utf8 (`StringArray`) column.
    Utf8(&'a StringArray),
    /// Borrowed Utf8View (`StringViewArray`) column.
    Utf8View(&'a StringViewArray),
}
378
379impl StringColumnRef<'_> {
380    /// Returns the string value at row `i`.
381    #[inline]
382    pub fn value(&self, i: usize) -> &str {
383        match self {
384            Self::Utf8(arr) => arr.value(i),
385            Self::Utf8View(arr) => arr.value(i),
386        }
387    }
388}
389
390/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
391///
392/// # Errors
393///
394/// Returns an error if:
395/// - `column_index` is out of range: `EncodingError::MissingColumn`.
396/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
397pub fn extract_column<'a, T: Array + 'static>(
398    cols: &'a [ArrayRef],
399    column_key: &'static str,
400    column_index: usize,
401    expected_type: DataType,
402) -> Result<&'a T, EncodingError> {
403    let column_values = cols
404        .get(column_index)
405        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
406    let downcasted_values =
407        column_values
408            .as_any()
409            .downcast_ref::<T>()
410            .ok_or(EncodingError::InvalidColumnType(
411                column_key,
412                column_index,
413                expected_type,
414                column_values.data_type().clone(),
415            ))?;
416    Ok(downcasted_values)
417}
418
419/// Validates that a [`FixedSizeBinaryArray`] has the expected precision byte width.
420///
421/// This detects precision mode mismatches that occur when catalog data was encoded
422/// with a different precision mode (64-bit standard vs 128-bit high-precision).
423///
424/// # Errors
425///
426/// Returns [`EncodingError::PrecisionMismatch`] if the actual byte width doesn't
427/// match [`PRECISION_BYTES`].
428pub fn validate_precision_bytes(
429    array: &FixedSizeBinaryArray,
430    field: &'static str,
431) -> Result<(), EncodingError> {
432    let actual = array.value_length();
433    if actual != PRECISION_BYTES {
434        return Err(EncodingError::PrecisionMismatch {
435            field,
436            expected_bytes: PRECISION_BYTES,
437            actual_bytes: actual,
438        });
439    }
440    Ok(())
441}
442
443/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
444///
445/// # Errors
446///
447/// Returns an error if:
448/// - `data` is empty: `EncodingError::EmptyData`.
449/// - Encoding fails: `EncodingError::ArrowError`.
450pub fn book_deltas_to_arrow_record_batch_bytes(
451    data: &[OrderBookDelta],
452) -> Result<RecordBatch, EncodingError> {
453    if data.is_empty() {
454        return Err(EncodingError::EmptyData);
455    }
456
457    // Extract metadata from chunk
458    let metadata = OrderBookDelta::chunk_metadata(data);
459    OrderBookDelta::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
460}
461
462/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
463///
464/// # Errors
465///
466/// Returns an error if:
467/// - `data` is empty: `EncodingError::EmptyData`.
468/// - Encoding fails: `EncodingError::ArrowError`.
469#[allow(clippy::missing_panics_doc)] // Guarded by empty check
470pub fn book_depth10_to_arrow_record_batch_bytes(
471    data: &[OrderBookDepth10],
472) -> Result<RecordBatch, EncodingError> {
473    if data.is_empty() {
474        return Err(EncodingError::EmptyData);
475    }
476
477    // Take first element and extract metadata
478    let first = data.first().unwrap();
479    let metadata = first.metadata();
480    OrderBookDepth10::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
481}
482
483/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
484///
485/// # Errors
486///
487/// Returns an error if:
488/// - `data` is empty: `EncodingError::EmptyData`.
489/// - Encoding fails: `EncodingError::ArrowError`.
490#[allow(clippy::missing_panics_doc)] // Guarded by empty check
491pub fn quotes_to_arrow_record_batch_bytes(
492    data: &[QuoteTick],
493) -> Result<RecordBatch, EncodingError> {
494    if data.is_empty() {
495        return Err(EncodingError::EmptyData);
496    }
497
498    // Take first element and extract metadata
499    let first = data.first().unwrap();
500    let metadata = first.metadata();
501    QuoteTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
502}
503
504/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
505///
506/// # Errors
507///
508/// Returns an error if:
509/// - `data` is empty: `EncodingError::EmptyData`.
510/// - Encoding fails: `EncodingError::ArrowError`.
511#[allow(clippy::missing_panics_doc)] // Guarded by empty check
512pub fn trades_to_arrow_record_batch_bytes(
513    data: &[TradeTick],
514) -> Result<RecordBatch, EncodingError> {
515    if data.is_empty() {
516        return Err(EncodingError::EmptyData);
517    }
518
519    // Take first element and extract metadata
520    let first = data.first().unwrap();
521    let metadata = first.metadata();
522    TradeTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
523}
524
525/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
526///
527/// # Errors
528///
529/// Returns an error if:
530/// - `data` is empty: `EncodingError::EmptyData`.
531/// - Encoding fails: `EncodingError::ArrowError`.
532#[allow(clippy::missing_panics_doc)] // Guarded by empty check
533pub fn bars_to_arrow_record_batch_bytes(data: &[Bar]) -> Result<RecordBatch, EncodingError> {
534    if data.is_empty() {
535        return Err(EncodingError::EmptyData);
536    }
537
538    // Take first element and extract metadata
539    let first = data.first().unwrap();
540    let metadata = first.metadata();
541    Bar::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
542}
543
544/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
545///
546/// # Errors
547///
548/// Returns an error if:
549/// - `data` is empty: `EncodingError::EmptyData`.
550/// - Encoding fails: `EncodingError::ArrowError`.
551#[allow(clippy::missing_panics_doc)] // Guarded by empty check
552pub fn mark_prices_to_arrow_record_batch_bytes(
553    data: &[MarkPriceUpdate],
554) -> Result<RecordBatch, EncodingError> {
555    if data.is_empty() {
556        return Err(EncodingError::EmptyData);
557    }
558
559    // Take first element and extract metadata
560    let first = data.first().unwrap();
561    let metadata = first.metadata();
562    MarkPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
563}
564
565/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
566///
567/// # Errors
568///
569/// Returns an error if:
570/// - `data` is empty: `EncodingError::EmptyData`.
571/// - Encoding fails: `EncodingError::ArrowError`.
572#[allow(clippy::missing_panics_doc)] // Guarded by empty check
573pub fn index_prices_to_arrow_record_batch_bytes(
574    data: &[IndexPriceUpdate],
575) -> Result<RecordBatch, EncodingError> {
576    if data.is_empty() {
577        return Err(EncodingError::EmptyData);
578    }
579
580    // Take first element and extract metadata
581    let first = data.first().unwrap();
582    let metadata = first.metadata();
583    IndexPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
584}
585
586/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
587///
588/// # Errors
589///
590/// Returns an error if:
591/// - `data` is empty: `EncodingError::EmptyData`.
592/// - Encoding fails: `EncodingError::ArrowError`.
593#[allow(clippy::missing_panics_doc)] // Guarded by empty check
594pub fn instrument_closes_to_arrow_record_batch_bytes(
595    data: &[InstrumentClose],
596) -> Result<RecordBatch, EncodingError> {
597    if data.is_empty() {
598        return Err(EncodingError::EmptyData);
599    }
600
601    // Take first element and extract metadata
602    let first = data.first().unwrap();
603    let metadata = first.metadata();
604    InstrumentClose::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
605}