1pub mod bar;
19pub mod close;
20pub mod custom;
21pub mod delta;
22pub mod depth;
23pub mod index_price;
24pub mod instrument;
25pub mod mark_price;
26pub mod quote;
27pub mod trade;
28
29use std::{
30 collections::HashMap,
31 io::{self, Write},
32};
33
34use arrow::{
35 array::{Array, ArrayRef, FixedSizeBinaryArray, StringArray, StringViewArray},
36 datatypes::{DataType, Schema},
37 error::ArrowError,
38 ipc::writer::StreamWriter,
39 record_batch::RecordBatch,
40};
41use nautilus_model::{
42 data::{
43 Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
44 delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
45 },
46 types::{
47 PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
48 fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
49 price::PriceRaw,
50 quantity::QuantityRaw,
51 },
52};
53#[cfg(feature = "python")]
54use pyo3::prelude::*;
55
/// Metadata key for the bar type of encoded bar data.
// Made `pub` for consistency with the sibling keys below (was private,
// which looked unintentional given the identical role of all four keys).
pub const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key for the instrument ID of encoded data.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key for the price precision of encoded data.
pub const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key for the size precision of encoded data.
pub const KEY_SIZE_PRECISION: &str = "size_precision";
61
/// Errors that can occur while streaming encoded data to an output.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An I/O error from the underlying writer.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An error raised by the Arrow library during encoding or writing.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error raised from Python (only present with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
72
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice contained no elements to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent from the batch metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column (name, index) was absent from the record batch.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value in the named field failed to parse; payload carries detail text.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had an unexpected Arrow `DataType`.
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// Fixed-size binary values in the catalog do not match this build's
    /// compiled-in precision width (standard = 8 bytes, high = 16 bytes).
    #[error(
        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
        but this build expects {expected_bytes} bytes. The catalog was created with a different \
        precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
        build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
    )]
    PrecisionMismatch {
        /// Name of the field whose byte width mismatched.
        field: &'static str,
        /// Byte width this build expects (`PRECISION_BYTES`).
        expected_bytes: i32,
        /// Byte width actually found in the data.
        actual_bytes: i32,
    },
    /// An error raised by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
99
100#[inline]
101fn get_raw_price(bytes: &[u8]) -> PriceRaw {
102 PriceRaw::from_le_bytes(
103 bytes
104 .try_into()
105 .expect("Price raw bytes must be exactly the size of PriceRaw"),
106 )
107}
108
109#[inline]
110fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
111 QuantityRaw::from_le_bytes(
112 bytes
113 .try_into()
114 .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
115 )
116}
117
118#[inline]
126fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
127 let raw = get_raw_price(bytes);
128
129 if raw == PRICE_UNDEF || raw == PRICE_ERROR {
131 return raw;
132 }
133
134 correct_price_raw(raw, precision)
135}
136
137#[inline]
145fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
146 let raw = get_raw_quantity(bytes);
147
148 if raw == QUANTITY_UNDEF {
150 return raw;
151 }
152
153 correct_quantity_raw(raw, precision)
154}
155
156pub fn decode_price(
165 bytes: &[u8],
166 precision: u8,
167 field: &'static str,
168 row: usize,
169) -> Result<Price, EncodingError> {
170 let raw = get_corrected_raw_price(bytes, precision);
171 Price::from_raw_checked(raw, precision)
172 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
173}
174
175pub fn decode_quantity(
184 bytes: &[u8],
185 precision: u8,
186 field: &'static str,
187 row: usize,
188) -> Result<Quantity, EncodingError> {
189 let raw = get_corrected_raw_quantity(bytes, precision);
190 Quantity::from_raw_checked(raw, precision)
191 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
192}
193
194pub fn decode_price_with_sentinel(
202 bytes: &[u8],
203 precision: u8,
204 field: &'static str,
205 row: usize,
206) -> Result<Price, EncodingError> {
207 let raw = get_raw_price(bytes);
208 let (final_raw, final_precision) = if raw == PRICE_UNDEF {
209 (raw, 0)
210 } else {
211 (get_corrected_raw_price(bytes, precision), precision)
212 };
213 Price::from_raw_checked(final_raw, final_precision)
214 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
215}
216
217pub fn decode_quantity_with_sentinel(
225 bytes: &[u8],
226 precision: u8,
227 field: &'static str,
228 row: usize,
229) -> Result<Quantity, EncodingError> {
230 let raw = get_raw_quantity(bytes);
231 let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
232 (raw, 0)
233 } else {
234 (get_corrected_raw_quantity(bytes, precision), precision)
235 };
236 Quantity::from_raw_checked(final_raw, final_precision)
237 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
238}
239
240pub trait ArrowSchemaProvider {
242 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
244
245 #[must_use]
247 fn get_schema_map() -> HashMap<String, String> {
248 let schema = Self::get_schema(None);
249 let mut map = HashMap::new();
250 for field in schema.fields() {
251 let name = field.name().clone();
252 let data_type = format!("{:?}", field.data_type());
253 map.insert(name, data_type);
254 }
255 map
256 }
257}
258
/// Encodes a slice of elements into an Arrow [`RecordBatch`].
pub trait EncodeToRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Encodes `data` into a single record batch using the supplied `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an [`ArrowError`] if the batch cannot be constructed.
    fn encode_batch(
        metadata: &HashMap<String, String>,
        data: &[Self],
    ) -> Result<RecordBatch, ArrowError>;

    /// Returns the batch metadata derived from this single element.
    fn metadata(&self) -> HashMap<String, String>;

    /// Returns the batch metadata for a chunk, taken from its first element.
    ///
    /// # Panics
    ///
    /// Panics if `chunk` is empty.
    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
        chunk
            .first()
            .map(|elem| elem.metadata())
            .expect("Chunk must have at least one element to encode")
    }
}
289
/// Decodes a typed vector of elements from an Arrow [`RecordBatch`].
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a `Vec<Self>` using the supplied `metadata`
    /// (e.g. instrument ID and precisions).
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if metadata is missing or a column is
    /// absent, mistyped, or fails to parse.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
305
/// Decodes a vector of type-erased [`Data`] values from an Arrow [`RecordBatch`].
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into `Vec<Data>` using the supplied `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if metadata is missing or a column is
    /// absent, mistyped, or fails to parse.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
321
/// A sink to which Arrow record batches can be written.
pub trait WriteStream {
    /// Writes `record_batch` to the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if writing fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
331
// Blanket impl: any `io::Write` can receive record batches as Arrow IPC.
impl<T: Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        // NOTE(review): a fresh `StreamWriter` is created and finished per call,
        // so each batch is emitted as a complete IPC stream (schema header
        // included every time). Fine for one-shot writes; confirm callers do
        // not expect a single continuous stream across multiple batches.
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
340
341pub fn extract_column_string<'a>(
350 cols: &'a [ArrayRef],
351 column_key: &'static str,
352 column_index: usize,
353) -> Result<StringColumnRef<'a>, EncodingError> {
354 let column_values = cols
355 .get(column_index)
356 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
357 let dt = column_values.data_type();
358 if let Some(arr) = column_values.as_any().downcast_ref::<StringArray>() {
359 Ok(StringColumnRef::Utf8(arr))
360 } else if let Some(arr) = column_values.as_any().downcast_ref::<StringViewArray>() {
361 Ok(StringColumnRef::Utf8View(arr))
362 } else {
363 Err(EncodingError::InvalidColumnType(
364 column_key,
365 column_index,
366 DataType::Utf8,
367 dt.clone(),
368 ))
369 }
370}
371
/// A borrowed reference to a string column in either Arrow string encoding.
#[derive(Debug)]
pub enum StringColumnRef<'a> {
    /// Standard UTF-8 string array (`DataType::Utf8`).
    Utf8(&'a StringArray),
    /// German-style string view array (`DataType::Utf8View`).
    Utf8View(&'a StringViewArray),
}
378
impl StringColumnRef<'_> {
    /// Returns the string value at row `i`, regardless of the underlying
    /// Arrow string encoding.
    ///
    /// `i` must be a valid row index for the underlying array; the Arrow
    /// `value` accessors are index-checked by the array implementation.
    #[inline]
    pub fn value(&self, i: usize) -> &str {
        match self {
            Self::Utf8(arr) => arr.value(i),
            Self::Utf8View(arr) => arr.value(i),
        }
    }
}
389
390pub fn extract_column<'a, T: Array + 'static>(
398 cols: &'a [ArrayRef],
399 column_key: &'static str,
400 column_index: usize,
401 expected_type: DataType,
402) -> Result<&'a T, EncodingError> {
403 let column_values = cols
404 .get(column_index)
405 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
406 let downcasted_values =
407 column_values
408 .as_any()
409 .downcast_ref::<T>()
410 .ok_or(EncodingError::InvalidColumnType(
411 column_key,
412 column_index,
413 expected_type,
414 column_values.data_type().clone(),
415 ))?;
416 Ok(downcasted_values)
417}
418
419pub fn validate_precision_bytes(
429 array: &FixedSizeBinaryArray,
430 field: &'static str,
431) -> Result<(), EncodingError> {
432 let actual = array.value_length();
433 if actual != PRECISION_BYTES {
434 return Err(EncodingError::PrecisionMismatch {
435 field,
436 expected_bytes: PRECISION_BYTES,
437 actual_bytes: actual,
438 });
439 }
440 Ok(())
441}
442
443pub fn book_deltas_to_arrow_record_batch_bytes(
451 data: &[OrderBookDelta],
452) -> Result<RecordBatch, EncodingError> {
453 if data.is_empty() {
454 return Err(EncodingError::EmptyData);
455 }
456
457 let metadata = OrderBookDelta::chunk_metadata(data);
459 OrderBookDelta::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
460}
461
462#[allow(clippy::missing_panics_doc)] pub fn book_depth10_to_arrow_record_batch_bytes(
471 data: &[OrderBookDepth10],
472) -> Result<RecordBatch, EncodingError> {
473 if data.is_empty() {
474 return Err(EncodingError::EmptyData);
475 }
476
477 let first = data.first().unwrap();
479 let metadata = first.metadata();
480 OrderBookDepth10::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
481}
482
483#[allow(clippy::missing_panics_doc)] pub fn quotes_to_arrow_record_batch_bytes(
492 data: &[QuoteTick],
493) -> Result<RecordBatch, EncodingError> {
494 if data.is_empty() {
495 return Err(EncodingError::EmptyData);
496 }
497
498 let first = data.first().unwrap();
500 let metadata = first.metadata();
501 QuoteTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
502}
503
504#[allow(clippy::missing_panics_doc)] pub fn trades_to_arrow_record_batch_bytes(
513 data: &[TradeTick],
514) -> Result<RecordBatch, EncodingError> {
515 if data.is_empty() {
516 return Err(EncodingError::EmptyData);
517 }
518
519 let first = data.first().unwrap();
521 let metadata = first.metadata();
522 TradeTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
523}
524
525#[allow(clippy::missing_panics_doc)] pub fn bars_to_arrow_record_batch_bytes(data: &[Bar]) -> Result<RecordBatch, EncodingError> {
534 if data.is_empty() {
535 return Err(EncodingError::EmptyData);
536 }
537
538 let first = data.first().unwrap();
540 let metadata = first.metadata();
541 Bar::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
542}
543
544#[allow(clippy::missing_panics_doc)] pub fn mark_prices_to_arrow_record_batch_bytes(
553 data: &[MarkPriceUpdate],
554) -> Result<RecordBatch, EncodingError> {
555 if data.is_empty() {
556 return Err(EncodingError::EmptyData);
557 }
558
559 let first = data.first().unwrap();
561 let metadata = first.metadata();
562 MarkPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
563}
564
565#[allow(clippy::missing_panics_doc)] pub fn index_prices_to_arrow_record_batch_bytes(
574 data: &[IndexPriceUpdate],
575) -> Result<RecordBatch, EncodingError> {
576 if data.is_empty() {
577 return Err(EncodingError::EmptyData);
578 }
579
580 let first = data.first().unwrap();
582 let metadata = first.metadata();
583 IndexPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
584}
585
586#[allow(clippy::missing_panics_doc)] pub fn instrument_closes_to_arrow_record_batch_bytes(
595 data: &[InstrumentClose],
596) -> Result<RecordBatch, EncodingError> {
597 if data.is_empty() {
598 return Err(EncodingError::EmptyData);
599 }
600
601 let first = data.first().unwrap();
603 let metadata = first.metadata();
604 InstrumentClose::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
605}