// nautilus_serialization/arrow/mod.rs
pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28 collections::HashMap,
29 io::{self, Write},
30};
31
32use arrow::{
33 array::{Array, ArrayRef},
34 datatypes::{DataType, Schema},
35 error::ArrowError,
36 ipc::writer::StreamWriter,
37 record_batch::RecordBatch,
38};
39use nautilus_model::{
40 data::{
41 Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42 delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43 },
44 types::{
45 PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
46 fixed::{correct_price_raw, correct_quantity_raw},
47 price::PriceRaw,
48 quantity::QuantityRaw,
49 },
50};
51#[cfg(feature = "python")]
52use pyo3::prelude::*;
53
/// Metadata key for the bar type of encoded bar data.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key for the instrument ID of the encoded data.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key for the price precision (decimal places) of the encoded data.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key for the size precision (decimal places) of the encoded data.
const KEY_SIZE_PRECISION: &str = "size_precision";
59
/// Errors that can occur while streaming Arrow record batches to a writer.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An I/O error from the underlying writer.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An error raised by the Arrow library while encoding or writing.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error raised from Python (only available with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
70
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input data slice was empty; nothing to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent (key name in the payload).
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column was absent (column name and expected index).
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value could not be parsed (field name and error detail).
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had the wrong Arrow data type
    /// (column name, index, expected type, actual type).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An error raised by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
86
87#[inline]
88fn get_raw_price(bytes: &[u8]) -> PriceRaw {
89 PriceRaw::from_le_bytes(
90 bytes
91 .try_into()
92 .expect("Price raw bytes must be exactly the size of PriceRaw"),
93 )
94}
95
96#[inline]
97fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
98 QuantityRaw::from_le_bytes(
99 bytes
100 .try_into()
101 .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
102 )
103}
104
105#[inline]
113fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
114 let raw = get_raw_price(bytes);
115
116 if raw == PRICE_UNDEF || raw == PRICE_ERROR {
118 return raw;
119 }
120
121 correct_price_raw(raw, precision)
122}
123
124#[inline]
132fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
133 let raw = get_raw_quantity(bytes);
134
135 if raw == QUANTITY_UNDEF {
137 return raw;
138 }
139
140 correct_quantity_raw(raw, precision)
141}
142
143fn decode_price(
148 bytes: &[u8],
149 precision: u8,
150 field: &'static str,
151 row: usize,
152) -> Result<Price, EncodingError> {
153 let raw = get_corrected_raw_price(bytes, precision);
154 Price::from_raw_checked(raw, precision)
155 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
156}
157
158fn decode_quantity(
163 bytes: &[u8],
164 precision: u8,
165 field: &'static str,
166 row: usize,
167) -> Result<Quantity, EncodingError> {
168 let raw = get_corrected_raw_quantity(bytes, precision);
169 Quantity::from_raw_checked(raw, precision)
170 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
171}
172
173fn decode_price_with_sentinel(
177 bytes: &[u8],
178 precision: u8,
179 field: &'static str,
180 row: usize,
181) -> Result<Price, EncodingError> {
182 let raw = get_raw_price(bytes);
183 let (final_raw, final_precision) = if raw == PRICE_UNDEF {
184 (raw, 0)
185 } else {
186 (get_corrected_raw_price(bytes, precision), precision)
187 };
188 Price::from_raw_checked(final_raw, final_precision)
189 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
190}
191
192fn decode_quantity_with_sentinel(
196 bytes: &[u8],
197 precision: u8,
198 field: &'static str,
199 row: usize,
200) -> Result<Quantity, EncodingError> {
201 let raw = get_raw_quantity(bytes);
202 let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
203 (raw, 0)
204 } else {
205 (get_corrected_raw_quantity(bytes, precision), precision)
206 };
207 Quantity::from_raw_checked(final_raw, final_precision)
208 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
209}
210
211pub trait ArrowSchemaProvider {
213 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
215
216 #[must_use]
218 fn get_schema_map() -> HashMap<String, String> {
219 let schema = Self::get_schema(None);
220 let mut map = HashMap::new();
221 for field in schema.fields() {
222 let name = field.name().clone();
223 let data_type = format!("{:?}", field.data_type());
224 map.insert(name, data_type);
225 }
226 map
227 }
228}
229
230pub trait EncodeToRecordBatch
232where
233 Self: Sized + ArrowSchemaProvider,
234{
235 fn encode_batch(
241 metadata: &HashMap<String, String>,
242 data: &[Self],
243 ) -> Result<RecordBatch, ArrowError>;
244
245 fn metadata(&self) -> HashMap<String, String>;
247
248 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
254 chunk
255 .first()
256 .map(|elem| elem.metadata())
257 .expect("Chunk must have at least one element to encode")
258 }
259}
260
/// Decodes typed data elements from Arrow record batches.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a `Vec<Self>`, using `metadata`
    /// (e.g. instrument ID and precision keys) to interpret raw columns.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if the batch or metadata is malformed.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
276
/// Decodes record batches directly into the type-erased [`Data`] enum.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a `Vec<Data>`, using `metadata`
    /// to interpret raw columns.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if the batch or metadata is malformed.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
292
/// A destination that Arrow record batches can be written to.
pub trait WriteStream {
    /// Writes `record_batch` to this stream.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if serialization or I/O fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
302
303impl<T: Write> WriteStream for T {
304 fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
305 let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
306 writer.write(record_batch)?;
307 writer.finish()?;
308 Ok(())
309 }
310}
311
312pub fn extract_column<'a, T: Array + 'static>(
320 cols: &'a [ArrayRef],
321 column_key: &'static str,
322 column_index: usize,
323 expected_type: DataType,
324) -> Result<&'a T, EncodingError> {
325 let column_values = cols
326 .get(column_index)
327 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
328 let downcasted_values =
329 column_values
330 .as_any()
331 .downcast_ref::<T>()
332 .ok_or(EncodingError::InvalidColumnType(
333 column_key,
334 column_index,
335 expected_type,
336 column_values.data_type().clone(),
337 ))?;
338 Ok(downcasted_values)
339}
340
341pub fn book_deltas_to_arrow_record_batch_bytes(
349 data: Vec<OrderBookDelta>,
350) -> Result<RecordBatch, EncodingError> {
351 if data.is_empty() {
352 return Err(EncodingError::EmptyData);
353 }
354
355 let metadata = OrderBookDelta::chunk_metadata(&data);
357 OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
358}
359
360pub fn book_depth10_to_arrow_record_batch_bytes(
372 data: Vec<OrderBookDepth10>,
373) -> Result<RecordBatch, EncodingError> {
374 if data.is_empty() {
375 return Err(EncodingError::EmptyData);
376 }
377
378 let first = data.first().unwrap();
381 let metadata = first.metadata();
382 OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
383}
384
385pub fn quotes_to_arrow_record_batch_bytes(
397 data: Vec<QuoteTick>,
398) -> Result<RecordBatch, EncodingError> {
399 if data.is_empty() {
400 return Err(EncodingError::EmptyData);
401 }
402
403 let first = data.first().unwrap();
406 let metadata = first.metadata();
407 QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
408}
409
410pub fn trades_to_arrow_record_batch_bytes(
422 data: Vec<TradeTick>,
423) -> Result<RecordBatch, EncodingError> {
424 if data.is_empty() {
425 return Err(EncodingError::EmptyData);
426 }
427
428 let first = data.first().unwrap();
431 let metadata = first.metadata();
432 TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
433}
434
435pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
447 if data.is_empty() {
448 return Err(EncodingError::EmptyData);
449 }
450
451 let first = data.first().unwrap();
454 let metadata = first.metadata();
455 Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
456}
457
458pub fn mark_prices_to_arrow_record_batch_bytes(
470 data: Vec<MarkPriceUpdate>,
471) -> Result<RecordBatch, EncodingError> {
472 if data.is_empty() {
473 return Err(EncodingError::EmptyData);
474 }
475
476 let first = data.first().unwrap();
479 let metadata = first.metadata();
480 MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
481}
482
483pub fn index_prices_to_arrow_record_batch_bytes(
495 data: Vec<IndexPriceUpdate>,
496) -> Result<RecordBatch, EncodingError> {
497 if data.is_empty() {
498 return Err(EncodingError::EmptyData);
499 }
500
501 let first = data.first().unwrap();
504 let metadata = first.metadata();
505 IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
506}
507
508pub fn instrument_closes_to_arrow_record_batch_bytes(
520 data: Vec<InstrumentClose>,
521) -> Result<RecordBatch, EncodingError> {
522 if data.is_empty() {
523 return Err(EncodingError::EmptyData);
524 }
525
526 let first = data.first().unwrap();
529 let metadata = first.metadata();
530 InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
531}