Skip to main content

nautilus_serialization/arrow/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Nautilus types.
17
18pub mod account_state;
19pub mod bar;
20pub mod close;
21pub mod custom;
22pub mod delta;
23pub mod depth;
24#[cfg(feature = "display")]
25pub mod display;
26pub mod funding;
27pub mod index_price;
28pub mod instrument;
29pub mod instrument_status;
30pub mod json;
31pub mod mark_price;
32pub mod option_greeks;
33pub mod order_event;
34pub mod position_event;
35pub mod quote;
36pub mod report;
37pub mod snapshot;
38pub mod trade;
39
40use std::{
41    collections::HashMap,
42    io::{self, Write},
43};
44
45use arrow::{
46    array::{Array, ArrayRef, FixedSizeBinaryArray, StringArray, StringViewArray},
47    datatypes::{DataType, Schema},
48    error::ArrowError,
49    ipc::writer::StreamWriter,
50    record_batch::RecordBatch,
51};
52use nautilus_model::{
53    data::{
54        Data, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, bar::Bar,
55        close::InstrumentClose, delta::OrderBookDelta, depth::OrderBookDepth10,
56        option_chain::OptionGreeks, quote::QuoteTick, trade::TradeTick,
57    },
58    types::{
59        PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
60        fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
61        price::PriceRaw,
62        quantity::QuantityRaw,
63    },
64};
65#[cfg(feature = "python")]
66use pyo3::prelude::*;
67use ustr::Ustr;
68
69// Define metadata key constants constants
70const KEY_BAR_TYPE: &str = "bar_type";
71pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
72pub const KEY_PRICE_PRECISION: &str = "price_precision";
73pub const KEY_SIZE_PRECISION: &str = "size_precision";
74
75#[derive(thiserror::Error, Debug)]
76pub enum DataStreamingError {
77    #[error("I/O error: {0}")]
78    IoError(#[from] io::Error),
79    #[error("Arrow error: {0}")]
80    ArrowError(#[from] arrow::error::ArrowError),
81    #[cfg(feature = "python")]
82    #[error("Python error: {0}")]
83    PythonError(#[from] PyErr),
84}
85
86#[derive(thiserror::Error, Debug)]
87pub enum EncodingError {
88    #[error("Empty data")]
89    EmptyData,
90    #[error("Missing metadata key: `{0}`")]
91    MissingMetadata(&'static str),
92    #[error("Missing data column: `{0}` at index {1}")]
93    MissingColumn(&'static str, usize),
94    #[error("Error parsing `{0}`: {1}")]
95    ParseError(&'static str, String),
96    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
97    InvalidColumnType(&'static str, usize, DataType, DataType),
98    #[error(
99        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
100         but this build expects {expected_bytes} bytes. The catalog was created with a different \
101         precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
102         build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
103    )]
104    PrecisionMismatch {
105        field: &'static str,
106        expected_bytes: i32,
107        actual_bytes: i32,
108    },
109    #[error("Arrow error: {0}")]
110    ArrowError(#[from] arrow::error::ArrowError),
111}
112
113#[inline]
114fn get_raw_price(bytes: &[u8]) -> PriceRaw {
115    PriceRaw::from_le_bytes(
116        bytes
117            .try_into()
118            .expect("Price raw bytes must be exactly the size of PriceRaw"),
119    )
120}
121
122#[inline]
123fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
124    QuantityRaw::from_le_bytes(
125        bytes
126            .try_into()
127            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
128    )
129}
130
131/// Gets raw price bytes and corrects for floating-point precision errors in stored data.
132///
133/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
134/// introduces floating-point errors. This corrects the raw value to the nearest valid
135/// multiple of the scale factor for the given precision.
136///
137/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
138#[inline]
139fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
140    let raw = get_raw_price(bytes);
141
142    // Preserve sentinel values unchanged
143    if raw == PRICE_UNDEF || raw == PRICE_ERROR {
144        return raw;
145    }
146
147    correct_price_raw(raw, precision)
148}
149
150/// Gets raw quantity bytes and corrects for floating-point precision errors in stored data.
151///
152/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
153/// introduces floating-point errors. This corrects the raw value to the nearest valid
154/// multiple of the scale factor for the given precision.
155///
156/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
157#[inline]
158fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
159    let raw = get_raw_quantity(bytes);
160
161    // Preserve sentinel values unchanged
162    if raw == QUANTITY_UNDEF {
163        return raw;
164    }
165
166    correct_quantity_raw(raw, precision)
167}
168
169/// Decodes a [`Price`] from raw bytes with bounds validation.
170///
171/// Uses corrected raw values to handle floating-point precision errors in stored data.
172/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
173///
174/// # Errors
175///
176/// Returns an [`EncodingError::ParseError`] if the price value is out of bounds.
177pub fn decode_price(
178    bytes: &[u8],
179    precision: u8,
180    field: &'static str,
181    row: usize,
182) -> Result<Price, EncodingError> {
183    let raw = get_corrected_raw_price(bytes, precision);
184    Price::from_raw_checked(raw, precision)
185        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
186}
187
188/// Decodes a [`Quantity`] from raw bytes with bounds validation.
189///
190/// Uses corrected raw values to handle floating-point precision errors in stored data.
191/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
192///
193/// # Errors
194///
195/// Returns an [`EncodingError::ParseError`] if the quantity value is out of bounds.
196pub fn decode_quantity(
197    bytes: &[u8],
198    precision: u8,
199    field: &'static str,
200    row: usize,
201) -> Result<Quantity, EncodingError> {
202    let raw = get_corrected_raw_quantity(bytes, precision);
203    Quantity::from_raw_checked(raw, precision)
204        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
205}
206
207/// Decodes a [`Price`] from raw bytes, using precision 0 for sentinel values.
208///
209/// For order book data where sentinel values indicate empty levels.
210///
211/// # Errors
212///
213/// Returns an [`EncodingError::ParseError`] if the price value is out of bounds.
214pub fn decode_price_with_sentinel(
215    bytes: &[u8],
216    precision: u8,
217    field: &'static str,
218    row: usize,
219) -> Result<Price, EncodingError> {
220    let raw = get_raw_price(bytes);
221    let (final_raw, final_precision) = if raw == PRICE_UNDEF {
222        (raw, 0)
223    } else {
224        (get_corrected_raw_price(bytes, precision), precision)
225    };
226    Price::from_raw_checked(final_raw, final_precision)
227        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
228}
229
230/// Decodes a [`Quantity`] from raw bytes, using precision 0 for sentinel values.
231///
232/// For order book data where sentinel values indicate empty levels.
233///
234/// # Errors
235///
236/// Returns an [`EncodingError::ParseError`] if the quantity value is out of bounds.
237pub fn decode_quantity_with_sentinel(
238    bytes: &[u8],
239    precision: u8,
240    field: &'static str,
241    row: usize,
242) -> Result<Quantity, EncodingError> {
243    let raw = get_raw_quantity(bytes);
244    let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
245        (raw, 0)
246    } else {
247        (get_corrected_raw_quantity(bytes, precision), precision)
248    };
249    Quantity::from_raw_checked(final_raw, final_precision)
250        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
251}
252
253/// Provides Apache Arrow schema definitions for data types.
254pub trait ArrowSchemaProvider {
255    /// Returns the Arrow schema for this type with optional metadata.
256    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
257
258    /// Returns a map of field names to their Arrow data types.
259    #[must_use]
260    fn get_schema_map() -> HashMap<String, String> {
261        let schema = Self::get_schema(None);
262        let mut map = HashMap::new();
263
264        for field in schema.fields() {
265            let name = field.name().clone();
266            let data_type = format!("{:?}", field.data_type());
267            map.insert(name, data_type);
268        }
269        map
270    }
271}
272
273/// Encodes data types to Apache Arrow RecordBatch format.
274pub trait EncodeToRecordBatch
275where
276    Self: Sized + ArrowSchemaProvider,
277{
278    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
279    ///
280    /// # Errors
281    ///
282    /// Returns an `ArrowError` if the encoding fails.
283    fn encode_batch(
284        metadata: &HashMap<String, String>,
285        data: &[Self],
286    ) -> Result<RecordBatch, ArrowError>;
287
288    /// Returns the metadata for this data element.
289    fn metadata(&self) -> HashMap<String, String>;
290
291    /// Returns the metadata for the first element in a chunk.
292    ///
293    /// # Panics
294    ///
295    /// Panics if `chunk` is empty.
296    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
297        chunk
298            .first()
299            .map(Self::metadata)
300            .expect("Chunk must have at least one element to encode")
301    }
302}
303
304/// Decodes data types from Apache Arrow RecordBatch format.
305pub trait DecodeFromRecordBatch
306where
307    Self: Sized + Into<Data> + ArrowSchemaProvider,
308{
309    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
310    ///
311    /// # Errors
312    ///
313    /// Returns an `EncodingError` if the decoding fails.
314    fn decode_batch(
315        metadata: &HashMap<String, String>,
316        record_batch: RecordBatch,
317    ) -> Result<Vec<Self>, EncodingError>;
318}
319
320/// Decodes strongly typed values from Apache Arrow RecordBatch format.
321pub trait DecodeTypedFromRecordBatch
322where
323    Self: Sized + ArrowSchemaProvider,
324{
325    /// Decodes a `RecordBatch` into a vector of values of the implementing type.
326    ///
327    /// # Errors
328    ///
329    /// Returns an `EncodingError` if the decoding fails.
330    fn decode_typed_batch(
331        metadata: &HashMap<String, String>,
332        record_batch: RecordBatch,
333    ) -> Result<Vec<Self>, EncodingError>;
334}
335
336impl<T> DecodeTypedFromRecordBatch for T
337where
338    T: DecodeFromRecordBatch,
339{
340    fn decode_typed_batch(
341        metadata: &HashMap<String, String>,
342        record_batch: RecordBatch,
343    ) -> Result<Vec<Self>, EncodingError> {
344        Self::decode_batch(metadata, record_batch)
345    }
346}
347
348/// Decodes raw Data objects from Apache Arrow RecordBatch format.
349pub trait DecodeDataFromRecordBatch
350where
351    Self: Sized + ArrowSchemaProvider,
352{
353    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
354    ///
355    /// # Errors
356    ///
357    /// Returns an `EncodingError` if the decoding fails.
358    fn decode_data_batch(
359        metadata: &HashMap<String, String>,
360        record_batch: RecordBatch,
361    ) -> Result<Vec<Data>, EncodingError>;
362}
363
364/// Writes RecordBatch data to output streams.
365pub trait WriteStream {
366    /// Writes a `RecordBatch` to the implementing output stream.
367    ///
368    /// # Errors
369    ///
370    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
371    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
372}
373
374impl<T: Write> WriteStream for T {
375    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
376        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
377        writer.write(record_batch)?;
378        writer.finish()?;
379        Ok(())
380    }
381}
382
383/// Extracts a string column, accepting both Utf8 (`StringArray`) and Utf8View (`StringViewArray`).
384/// Parquet may return Utf8View when reading, so this handles both formats.
385///
386/// # Errors
387///
388/// Returns an error if:
389/// - `column_index` is out of range: `EncodingError::MissingColumn`.
390/// - The column type is neither Utf8 nor Utf8View: `EncodingError::InvalidColumnType`.
391pub fn extract_column_string<'a>(
392    cols: &'a [ArrayRef],
393    column_key: &'static str,
394    column_index: usize,
395) -> Result<StringColumnRef<'a>, EncodingError> {
396    let column_values = cols
397        .get(column_index)
398        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
399    let dt = column_values.data_type();
400    if let Some(arr) = column_values.as_any().downcast_ref::<StringArray>() {
401        Ok(StringColumnRef::Utf8(arr))
402    } else if let Some(arr) = column_values.as_any().downcast_ref::<StringViewArray>() {
403        Ok(StringColumnRef::Utf8View(arr))
404    } else {
405        Err(EncodingError::InvalidColumnType(
406            column_key,
407            column_index,
408            DataType::Utf8,
409            dt.clone(),
410        ))
411    }
412}
413
414/// Reference to a string column, either Utf8 or Utf8View.
415#[derive(Debug)]
416pub enum StringColumnRef<'a> {
417    Utf8(&'a StringArray),
418    Utf8View(&'a StringViewArray),
419}
420
421impl StringColumnRef<'_> {
422    /// Returns the string value at row `i`.
423    #[inline]
424    #[must_use]
425    pub fn value(&self, i: usize) -> &str {
426        match self {
427            Self::Utf8(arr) => arr.value(i),
428            Self::Utf8View(arr) => arr.value(i),
429        }
430    }
431}
432
433/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
434///
435/// # Errors
436///
437/// Returns an error if:
438/// - `column_index` is out of range: `EncodingError::MissingColumn`.
439/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
440pub fn extract_column<'a, T: Array + 'static>(
441    cols: &'a [ArrayRef],
442    column_key: &'static str,
443    column_index: usize,
444    expected_type: DataType,
445) -> Result<&'a T, EncodingError> {
446    let column_values = cols
447        .get(column_index)
448        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
449    let downcasted_values =
450        column_values
451            .as_any()
452            .downcast_ref::<T>()
453            .ok_or(EncodingError::InvalidColumnType(
454                column_key,
455                column_index,
456                expected_type,
457                column_values.data_type().clone(),
458            ))?;
459    Ok(downcasted_values)
460}
461
462/// Extracts a column by name when present, falling back to an index for older schemas.
463///
464/// # Errors
465///
466/// Returns an error if the resolved column is missing or has the wrong type.
467pub fn extract_column_by_name_or_index<'a, T: Array + 'static>(
468    record_batch: &'a RecordBatch,
469    column_key: &'static str,
470    fallback_index: usize,
471    expected_type: DataType,
472) -> Result<&'a T, EncodingError> {
473    let column_index = record_batch
474        .schema()
475        .index_of(column_key)
476        .unwrap_or(fallback_index);
477    extract_column::<T>(
478        record_batch.columns(),
479        column_key,
480        column_index,
481        expected_type,
482    )
483}
484
485/// Extracts an optional UTF-8 column by name.
486///
487/// # Errors
488///
489/// Returns an error if the column exists but is not UTF-8.
490pub fn extract_optional_string_column_by_name<'a>(
491    record_batch: &'a RecordBatch,
492    column_key: &'static str,
493) -> Result<Option<&'a StringArray>, EncodingError> {
494    let Ok(column_index) = record_batch.schema().index_of(column_key) else {
495        return Ok(None);
496    };
497    let column_values = record_batch
498        .columns()
499        .get(column_index)
500        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
501    let downcasted_values = column_values.as_any().downcast_ref::<StringArray>().ok_or(
502        EncodingError::InvalidColumnType(
503            column_key,
504            column_index,
505            DataType::Utf8,
506            column_values.data_type().clone(),
507        ),
508    )?;
509    Ok(Some(downcasted_values))
510}
511
512/// Returns an optional [`Ustr`] value from an optional string column.
513#[must_use]
514pub fn optional_ustr_value(values: Option<&StringArray>, row: usize) -> Option<Ustr> {
515    values.and_then(|column| (!column.is_null(row)).then(|| Ustr::from(column.value(row))))
516}
517
518/// Validates that a [`FixedSizeBinaryArray`] has the expected precision byte width.
519///
520/// This detects precision mode mismatches that occur when catalog data was encoded
521/// with a different precision mode (64-bit standard vs 128-bit high-precision).
522///
523/// # Errors
524///
525/// Returns [`EncodingError::PrecisionMismatch`] if the actual byte width doesn't
526/// match [`PRECISION_BYTES`].
527pub fn validate_precision_bytes(
528    array: &FixedSizeBinaryArray,
529    field: &'static str,
530) -> Result<(), EncodingError> {
531    let actual = array.value_length();
532    if actual != PRECISION_BYTES {
533        return Err(EncodingError::PrecisionMismatch {
534            field,
535            expected_bytes: PRECISION_BYTES,
536            actual_bytes: actual,
537        });
538    }
539    Ok(())
540}
541
542/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
543///
544/// # Errors
545///
546/// Returns an error if:
547/// - `data` is empty: `EncodingError::EmptyData`.
548/// - Encoding fails: `EncodingError::ArrowError`.
549pub fn book_deltas_to_arrow_record_batch_bytes(
550    data: &[OrderBookDelta],
551) -> Result<RecordBatch, EncodingError> {
552    if data.is_empty() {
553        return Err(EncodingError::EmptyData);
554    }
555
556    // Extract metadata from chunk
557    let metadata = OrderBookDelta::chunk_metadata(data);
558    OrderBookDelta::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
559}
560
561/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
562///
563/// # Errors
564///
565/// Returns an error if:
566/// - `data` is empty: `EncodingError::EmptyData`.
567/// - Encoding fails: `EncodingError::ArrowError`.
568#[expect(clippy::missing_panics_doc)] // Guarded by empty check
569pub fn book_depth10_to_arrow_record_batch_bytes(
570    data: &[OrderBookDepth10],
571) -> Result<RecordBatch, EncodingError> {
572    if data.is_empty() {
573        return Err(EncodingError::EmptyData);
574    }
575
576    // Take first element and extract metadata
577    let first = data.first().unwrap();
578    let metadata = first.metadata();
579    OrderBookDepth10::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
580}
581
582/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
583///
584/// # Errors
585///
586/// Returns an error if:
587/// - `data` is empty: `EncodingError::EmptyData`.
588/// - Encoding fails: `EncodingError::ArrowError`.
589#[expect(clippy::missing_panics_doc)] // Guarded by empty check
590pub fn quotes_to_arrow_record_batch_bytes(
591    data: &[QuoteTick],
592) -> Result<RecordBatch, EncodingError> {
593    if data.is_empty() {
594        return Err(EncodingError::EmptyData);
595    }
596
597    // Take first element and extract metadata
598    let first = data.first().unwrap();
599    let metadata = first.metadata();
600    QuoteTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
601}
602
603/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
604///
605/// # Errors
606///
607/// Returns an error if:
608/// - `data` is empty: `EncodingError::EmptyData`.
609/// - Encoding fails: `EncodingError::ArrowError`.
610#[expect(clippy::missing_panics_doc)] // Guarded by empty check
611pub fn trades_to_arrow_record_batch_bytes(
612    data: &[TradeTick],
613) -> Result<RecordBatch, EncodingError> {
614    if data.is_empty() {
615        return Err(EncodingError::EmptyData);
616    }
617
618    // Take first element and extract metadata
619    let first = data.first().unwrap();
620    let metadata = first.metadata();
621    TradeTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
622}
623
624/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
625///
626/// # Errors
627///
628/// Returns an error if:
629/// - `data` is empty: `EncodingError::EmptyData`.
630/// - Encoding fails: `EncodingError::ArrowError`.
631#[expect(clippy::missing_panics_doc)] // Guarded by empty check
632pub fn bars_to_arrow_record_batch_bytes(data: &[Bar]) -> Result<RecordBatch, EncodingError> {
633    if data.is_empty() {
634        return Err(EncodingError::EmptyData);
635    }
636
637    // Take first element and extract metadata
638    let first = data.first().unwrap();
639    let metadata = first.metadata();
640    Bar::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
641}
642
643/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
644///
645/// # Errors
646///
647/// Returns an error if:
648/// - `data` is empty: `EncodingError::EmptyData`.
649/// - Encoding fails: `EncodingError::ArrowError`.
650#[expect(clippy::missing_panics_doc)] // Guarded by empty check
651pub fn mark_prices_to_arrow_record_batch_bytes(
652    data: &[MarkPriceUpdate],
653) -> Result<RecordBatch, EncodingError> {
654    if data.is_empty() {
655        return Err(EncodingError::EmptyData);
656    }
657
658    // Take first element and extract metadata
659    let first = data.first().unwrap();
660    let metadata = first.metadata();
661    MarkPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
662}
663
664/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
665///
666/// # Errors
667///
668/// Returns an error if:
669/// - `data` is empty: `EncodingError::EmptyData`.
670/// - Encoding fails: `EncodingError::ArrowError`.
671#[expect(clippy::missing_panics_doc)] // Guarded by empty check
672pub fn index_prices_to_arrow_record_batch_bytes(
673    data: &[IndexPriceUpdate],
674) -> Result<RecordBatch, EncodingError> {
675    if data.is_empty() {
676        return Err(EncodingError::EmptyData);
677    }
678
679    // Take first element and extract metadata
680    let first = data.first().unwrap();
681    let metadata = first.metadata();
682    IndexPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
683}
684
685/// Converts a vector of `InstrumentStatus` into an Arrow `RecordBatch`.
686///
687/// # Errors
688///
689/// Returns an error if:
690/// - `data` is empty: `EncodingError::EmptyData`.
691/// - Encoding fails: `EncodingError::ArrowError`.
692#[expect(clippy::missing_panics_doc)] // Guarded by empty check
693pub fn instrument_status_to_arrow_record_batch_bytes(
694    data: &[InstrumentStatus],
695) -> Result<RecordBatch, EncodingError> {
696    if data.is_empty() {
697        return Err(EncodingError::EmptyData);
698    }
699
700    let first = data.first().unwrap();
701    let metadata = first.metadata();
702    InstrumentStatus::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
703}
704
705/// Converts a vector of `OptionGreeks` into an Arrow `RecordBatch`.
706///
707/// # Errors
708///
709/// Returns an error if:
710/// - `data` is empty: `EncodingError::EmptyData`.
711/// - Encoding fails: `EncodingError::ArrowError`.
712#[expect(clippy::missing_panics_doc)] // Guarded by empty check
713pub fn option_greeks_to_arrow_record_batch_bytes(
714    data: &[OptionGreeks],
715) -> Result<RecordBatch, EncodingError> {
716    if data.is_empty() {
717        return Err(EncodingError::EmptyData);
718    }
719
720    let first = data.first().unwrap();
721    let metadata = first.metadata();
722    OptionGreeks::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
723}
724
725/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
726///
727/// # Errors
728///
729/// Returns an error if:
730/// - `data` is empty: `EncodingError::EmptyData`.
731/// - Encoding fails: `EncodingError::ArrowError`.
732#[expect(clippy::missing_panics_doc)] // Guarded by empty check
733pub fn instrument_closes_to_arrow_record_batch_bytes(
734    data: &[InstrumentClose],
735) -> Result<RecordBatch, EncodingError> {
736    if data.is_empty() {
737        return Err(EncodingError::EmptyData);
738    }
739
740    // Take first element and extract metadata
741    let first = data.first().unwrap();
742    let metadata = first.metadata();
743    InstrumentClose::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
744}