1pub mod account_state;
19pub mod bar;
20pub mod close;
21pub mod custom;
22pub mod delta;
23pub mod depth;
24#[cfg(feature = "display")]
25pub mod display;
26pub mod funding;
27pub mod index_price;
28pub mod instrument;
29pub mod instrument_status;
30pub mod json;
31pub mod mark_price;
32pub mod option_greeks;
33pub mod order_event;
34pub mod position_event;
35pub mod quote;
36pub mod report;
37pub mod snapshot;
38pub mod trade;
39
40use std::{
41 collections::HashMap,
42 io::{self, Write},
43};
44
45use arrow::{
46 array::{Array, ArrayRef, FixedSizeBinaryArray, StringArray, StringViewArray},
47 datatypes::{DataType, Schema},
48 error::ArrowError,
49 ipc::writer::StreamWriter,
50 record_batch::RecordBatch,
51};
52use nautilus_model::{
53 data::{
54 Data, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, bar::Bar,
55 close::InstrumentClose, delta::OrderBookDelta, depth::OrderBookDepth10,
56 option_chain::OptionGreeks, quote::QuoteTick, trade::TradeTick,
57 },
58 types::{
59 PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
60 fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
61 price::PriceRaw,
62 quantity::QuantityRaw,
63 },
64};
65#[cfg(feature = "python")]
66use pyo3::prelude::*;
67use ustr::Ustr;
68
/// Metadata key for the bar type string (module-private).
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key for the instrument ID string.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key for the price precision value.
pub const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key for the size (quantity) precision value.
pub const KEY_SIZE_PRECISION: &str = "size_precision";
74
75#[derive(thiserror::Error, Debug)]
76pub enum DataStreamingError {
77 #[error("I/O error: {0}")]
78 IoError(#[from] io::Error),
79 #[error("Arrow error: {0}")]
80 ArrowError(#[from] arrow::error::ArrowError),
81 #[cfg(feature = "python")]
82 #[error("Python error: {0}")]
83 PythonError(#[from] PyErr),
84}
85
86#[derive(thiserror::Error, Debug)]
87pub enum EncodingError {
88 #[error("Empty data")]
89 EmptyData,
90 #[error("Missing metadata key: `{0}`")]
91 MissingMetadata(&'static str),
92 #[error("Missing data column: `{0}` at index {1}")]
93 MissingColumn(&'static str, usize),
94 #[error("Error parsing `{0}`: {1}")]
95 ParseError(&'static str, String),
96 #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
97 InvalidColumnType(&'static str, usize, DataType, DataType),
98 #[error(
99 "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
100 but this build expects {expected_bytes} bytes. The catalog was created with a different \
101 precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
102 build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
103 )]
104 PrecisionMismatch {
105 field: &'static str,
106 expected_bytes: i32,
107 actual_bytes: i32,
108 },
109 #[error("Arrow error: {0}")]
110 ArrowError(#[from] arrow::error::ArrowError),
111}
112
113#[inline]
114fn get_raw_price(bytes: &[u8]) -> PriceRaw {
115 PriceRaw::from_le_bytes(
116 bytes
117 .try_into()
118 .expect("Price raw bytes must be exactly the size of PriceRaw"),
119 )
120}
121
122#[inline]
123fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
124 QuantityRaw::from_le_bytes(
125 bytes
126 .try_into()
127 .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
128 )
129}
130
131#[inline]
139fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
140 let raw = get_raw_price(bytes);
141
142 if raw == PRICE_UNDEF || raw == PRICE_ERROR {
144 return raw;
145 }
146
147 correct_price_raw(raw, precision)
148}
149
150#[inline]
158fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
159 let raw = get_raw_quantity(bytes);
160
161 if raw == QUANTITY_UNDEF {
163 return raw;
164 }
165
166 correct_quantity_raw(raw, precision)
167}
168
169pub fn decode_price(
178 bytes: &[u8],
179 precision: u8,
180 field: &'static str,
181 row: usize,
182) -> Result<Price, EncodingError> {
183 let raw = get_corrected_raw_price(bytes, precision);
184 Price::from_raw_checked(raw, precision)
185 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
186}
187
188pub fn decode_quantity(
197 bytes: &[u8],
198 precision: u8,
199 field: &'static str,
200 row: usize,
201) -> Result<Quantity, EncodingError> {
202 let raw = get_corrected_raw_quantity(bytes, precision);
203 Quantity::from_raw_checked(raw, precision)
204 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
205}
206
207pub fn decode_price_with_sentinel(
215 bytes: &[u8],
216 precision: u8,
217 field: &'static str,
218 row: usize,
219) -> Result<Price, EncodingError> {
220 let raw = get_raw_price(bytes);
221 let (final_raw, final_precision) = if raw == PRICE_UNDEF {
222 (raw, 0)
223 } else {
224 (get_corrected_raw_price(bytes, precision), precision)
225 };
226 Price::from_raw_checked(final_raw, final_precision)
227 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
228}
229
230pub fn decode_quantity_with_sentinel(
238 bytes: &[u8],
239 precision: u8,
240 field: &'static str,
241 row: usize,
242) -> Result<Quantity, EncodingError> {
243 let raw = get_raw_quantity(bytes);
244 let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
245 (raw, 0)
246 } else {
247 (get_corrected_raw_quantity(bytes, precision), precision)
248 };
249 Quantity::from_raw_checked(final_raw, final_precision)
250 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
251}
252
253pub trait ArrowSchemaProvider {
255 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
257
258 #[must_use]
260 fn get_schema_map() -> HashMap<String, String> {
261 let schema = Self::get_schema(None);
262 let mut map = HashMap::new();
263
264 for field in schema.fields() {
265 let name = field.name().clone();
266 let data_type = format!("{:?}", field.data_type());
267 map.insert(name, data_type);
268 }
269 map
270 }
271}
272
273pub trait EncodeToRecordBatch
275where
276 Self: Sized + ArrowSchemaProvider,
277{
278 fn encode_batch(
284 metadata: &HashMap<String, String>,
285 data: &[Self],
286 ) -> Result<RecordBatch, ArrowError>;
287
288 fn metadata(&self) -> HashMap<String, String>;
290
291 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
297 chunk
298 .first()
299 .map(Self::metadata)
300 .expect("Chunk must have at least one element to encode")
301 }
302}
303
304pub trait DecodeFromRecordBatch
306where
307 Self: Sized + Into<Data> + ArrowSchemaProvider,
308{
309 fn decode_batch(
315 metadata: &HashMap<String, String>,
316 record_batch: RecordBatch,
317 ) -> Result<Vec<Self>, EncodingError>;
318}
319
320pub trait DecodeTypedFromRecordBatch
322where
323 Self: Sized + ArrowSchemaProvider,
324{
325 fn decode_typed_batch(
331 metadata: &HashMap<String, String>,
332 record_batch: RecordBatch,
333 ) -> Result<Vec<Self>, EncodingError>;
334}
335
336impl<T> DecodeTypedFromRecordBatch for T
337where
338 T: DecodeFromRecordBatch,
339{
340 fn decode_typed_batch(
341 metadata: &HashMap<String, String>,
342 record_batch: RecordBatch,
343 ) -> Result<Vec<Self>, EncodingError> {
344 Self::decode_batch(metadata, record_batch)
345 }
346}
347
348pub trait DecodeDataFromRecordBatch
350where
351 Self: Sized + ArrowSchemaProvider,
352{
353 fn decode_data_batch(
359 metadata: &HashMap<String, String>,
360 record_batch: RecordBatch,
361 ) -> Result<Vec<Data>, EncodingError>;
362}
363
364pub trait WriteStream {
366 fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
372}
373
374impl<T: Write> WriteStream for T {
375 fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
376 let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
377 writer.write(record_batch)?;
378 writer.finish()?;
379 Ok(())
380 }
381}
382
383pub fn extract_column_string<'a>(
392 cols: &'a [ArrayRef],
393 column_key: &'static str,
394 column_index: usize,
395) -> Result<StringColumnRef<'a>, EncodingError> {
396 let column_values = cols
397 .get(column_index)
398 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
399 let dt = column_values.data_type();
400 if let Some(arr) = column_values.as_any().downcast_ref::<StringArray>() {
401 Ok(StringColumnRef::Utf8(arr))
402 } else if let Some(arr) = column_values.as_any().downcast_ref::<StringViewArray>() {
403 Ok(StringColumnRef::Utf8View(arr))
404 } else {
405 Err(EncodingError::InvalidColumnType(
406 column_key,
407 column_index,
408 DataType::Utf8,
409 dt.clone(),
410 ))
411 }
412}
413
414#[derive(Debug)]
416pub enum StringColumnRef<'a> {
417 Utf8(&'a StringArray),
418 Utf8View(&'a StringViewArray),
419}
420
421impl StringColumnRef<'_> {
422 #[inline]
424 #[must_use]
425 pub fn value(&self, i: usize) -> &str {
426 match self {
427 Self::Utf8(arr) => arr.value(i),
428 Self::Utf8View(arr) => arr.value(i),
429 }
430 }
431}
432
433pub fn extract_column<'a, T: Array + 'static>(
441 cols: &'a [ArrayRef],
442 column_key: &'static str,
443 column_index: usize,
444 expected_type: DataType,
445) -> Result<&'a T, EncodingError> {
446 let column_values = cols
447 .get(column_index)
448 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
449 let downcasted_values =
450 column_values
451 .as_any()
452 .downcast_ref::<T>()
453 .ok_or(EncodingError::InvalidColumnType(
454 column_key,
455 column_index,
456 expected_type,
457 column_values.data_type().clone(),
458 ))?;
459 Ok(downcasted_values)
460}
461
462pub fn extract_column_by_name_or_index<'a, T: Array + 'static>(
468 record_batch: &'a RecordBatch,
469 column_key: &'static str,
470 fallback_index: usize,
471 expected_type: DataType,
472) -> Result<&'a T, EncodingError> {
473 let column_index = record_batch
474 .schema()
475 .index_of(column_key)
476 .unwrap_or(fallback_index);
477 extract_column::<T>(
478 record_batch.columns(),
479 column_key,
480 column_index,
481 expected_type,
482 )
483}
484
485pub fn extract_optional_string_column_by_name<'a>(
491 record_batch: &'a RecordBatch,
492 column_key: &'static str,
493) -> Result<Option<&'a StringArray>, EncodingError> {
494 let Ok(column_index) = record_batch.schema().index_of(column_key) else {
495 return Ok(None);
496 };
497 let column_values = record_batch
498 .columns()
499 .get(column_index)
500 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
501 let downcasted_values = column_values.as_any().downcast_ref::<StringArray>().ok_or(
502 EncodingError::InvalidColumnType(
503 column_key,
504 column_index,
505 DataType::Utf8,
506 column_values.data_type().clone(),
507 ),
508 )?;
509 Ok(Some(downcasted_values))
510}
511
512#[must_use]
514pub fn optional_ustr_value(values: Option<&StringArray>, row: usize) -> Option<Ustr> {
515 values.and_then(|column| (!column.is_null(row)).then(|| Ustr::from(column.value(row))))
516}
517
518pub fn validate_precision_bytes(
528 array: &FixedSizeBinaryArray,
529 field: &'static str,
530) -> Result<(), EncodingError> {
531 let actual = array.value_length();
532 if actual != PRECISION_BYTES {
533 return Err(EncodingError::PrecisionMismatch {
534 field,
535 expected_bytes: PRECISION_BYTES,
536 actual_bytes: actual,
537 });
538 }
539 Ok(())
540}
541
542pub fn book_deltas_to_arrow_record_batch_bytes(
550 data: &[OrderBookDelta],
551) -> Result<RecordBatch, EncodingError> {
552 if data.is_empty() {
553 return Err(EncodingError::EmptyData);
554 }
555
556 let metadata = OrderBookDelta::chunk_metadata(data);
558 OrderBookDelta::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
559}
560
561#[expect(clippy::missing_panics_doc)] pub fn book_depth10_to_arrow_record_batch_bytes(
570 data: &[OrderBookDepth10],
571) -> Result<RecordBatch, EncodingError> {
572 if data.is_empty() {
573 return Err(EncodingError::EmptyData);
574 }
575
576 let first = data.first().unwrap();
578 let metadata = first.metadata();
579 OrderBookDepth10::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
580}
581
582#[expect(clippy::missing_panics_doc)] pub fn quotes_to_arrow_record_batch_bytes(
591 data: &[QuoteTick],
592) -> Result<RecordBatch, EncodingError> {
593 if data.is_empty() {
594 return Err(EncodingError::EmptyData);
595 }
596
597 let first = data.first().unwrap();
599 let metadata = first.metadata();
600 QuoteTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
601}
602
603#[expect(clippy::missing_panics_doc)] pub fn trades_to_arrow_record_batch_bytes(
612 data: &[TradeTick],
613) -> Result<RecordBatch, EncodingError> {
614 if data.is_empty() {
615 return Err(EncodingError::EmptyData);
616 }
617
618 let first = data.first().unwrap();
620 let metadata = first.metadata();
621 TradeTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
622}
623
624#[expect(clippy::missing_panics_doc)] pub fn bars_to_arrow_record_batch_bytes(data: &[Bar]) -> Result<RecordBatch, EncodingError> {
633 if data.is_empty() {
634 return Err(EncodingError::EmptyData);
635 }
636
637 let first = data.first().unwrap();
639 let metadata = first.metadata();
640 Bar::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
641}
642
643#[expect(clippy::missing_panics_doc)] pub fn mark_prices_to_arrow_record_batch_bytes(
652 data: &[MarkPriceUpdate],
653) -> Result<RecordBatch, EncodingError> {
654 if data.is_empty() {
655 return Err(EncodingError::EmptyData);
656 }
657
658 let first = data.first().unwrap();
660 let metadata = first.metadata();
661 MarkPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
662}
663
664#[expect(clippy::missing_panics_doc)] pub fn index_prices_to_arrow_record_batch_bytes(
673 data: &[IndexPriceUpdate],
674) -> Result<RecordBatch, EncodingError> {
675 if data.is_empty() {
676 return Err(EncodingError::EmptyData);
677 }
678
679 let first = data.first().unwrap();
681 let metadata = first.metadata();
682 IndexPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
683}
684
685#[expect(clippy::missing_panics_doc)] pub fn instrument_status_to_arrow_record_batch_bytes(
694 data: &[InstrumentStatus],
695) -> Result<RecordBatch, EncodingError> {
696 if data.is_empty() {
697 return Err(EncodingError::EmptyData);
698 }
699
700 let first = data.first().unwrap();
701 let metadata = first.metadata();
702 InstrumentStatus::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
703}
704
705#[expect(clippy::missing_panics_doc)] pub fn option_greeks_to_arrow_record_batch_bytes(
714 data: &[OptionGreeks],
715) -> Result<RecordBatch, EncodingError> {
716 if data.is_empty() {
717 return Err(EncodingError::EmptyData);
718 }
719
720 let first = data.first().unwrap();
721 let metadata = first.metadata();
722 OptionGreeks::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
723}
724
725#[expect(clippy::missing_panics_doc)] pub fn instrument_closes_to_arrow_record_batch_bytes(
734 data: &[InstrumentClose],
735) -> Result<RecordBatch, EncodingError> {
736 if data.is_empty() {
737 return Err(EncodingError::EmptyData);
738 }
739
740 let first = data.first().unwrap();
742 let metadata = first.metadata();
743 InstrumentClose::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
744}