pub mod bar;
pub mod close;
pub mod custom;
pub mod delta;
pub mod depth;
pub mod index_price;
pub mod instrument;
pub mod mark_price;
pub mod quote;
pub mod trade;
use std::{
collections::HashMap,
io::{self, Write},
};
use arrow::{
array::{Array, ArrayRef, FixedSizeBinaryArray, StringArray, StringViewArray},
datatypes::{DataType, Schema},
error::ArrowError,
ipc::writer::StreamWriter,
record_batch::RecordBatch,
};
use nautilus_model::{
data::{
Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
},
types::{
PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
price::PriceRaw,
quantity::QuantityRaw,
},
};
#[cfg(feature = "python")]
use pyo3::prelude::*;
/// Metadata key under which a batch's bar type is stored.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key under which a batch's instrument ID is stored.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key under which a batch's price decimal precision is stored.
pub const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key under which a batch's size (quantity) decimal precision is stored.
pub const KEY_SIZE_PRECISION: &str = "size_precision";
/// Errors that can occur while streaming Arrow record batches to a writer.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An I/O failure from the underlying writer.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An error raised by the Arrow IPC stream machinery.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error propagated from Python (only compiled with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
/// Errors that can occur while encoding or decoding data to/from Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice to encode was empty.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent from the batch metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column was absent at the expected index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value failed to parse; the `String` carries row context and the cause.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column's Arrow data type did not match the expected type.
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// A fixed-size binary column's byte width did not match this build's
    /// precision mode (standard = 8 bytes, high = 16 bytes).
    #[error(
        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
        but this build expects {expected_bytes} bytes. The catalog was created with a different \
        precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
        build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
    )]
    PrecisionMismatch {
        /// The field (column) on which the mismatch was detected.
        field: &'static str,
        /// Byte width this build expects (`PRECISION_BYTES`).
        expected_bytes: i32,
        /// Byte width found in the catalog data.
        actual_bytes: i32,
    },
    /// An error raised by the Arrow library during encoding.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
/// Reinterprets a little-endian byte slice as a raw price value.
///
/// # Panics
///
/// Panics if `bytes` is not exactly `size_of::<PriceRaw>()` long.
#[inline]
fn get_raw_price(bytes: &[u8]) -> PriceRaw {
    let le_bytes = bytes
        .try_into()
        .expect("Price raw bytes must be exactly the size of PriceRaw");
    PriceRaw::from_le_bytes(le_bytes)
}
/// Reinterprets a little-endian byte slice as a raw quantity value.
///
/// # Panics
///
/// Panics if `bytes` is not exactly `size_of::<QuantityRaw>()` long.
#[inline]
fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
    let le_bytes = bytes
        .try_into()
        .expect("Quantity raw bytes must be exactly the size of QuantityRaw");
    QuantityRaw::from_le_bytes(le_bytes)
}
#[inline]
fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
let raw = get_raw_price(bytes);
if raw == PRICE_UNDEF || raw == PRICE_ERROR {
return raw;
}
correct_price_raw(raw, precision)
}
/// Decodes a raw quantity from `bytes` and applies the precision correction,
/// passing the `QUANTITY_UNDEF` sentinel value through untouched.
#[inline]
fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
    match get_raw_quantity(bytes) {
        // The sentinel must never be precision-corrected.
        QUANTITY_UNDEF => QUANTITY_UNDEF,
        raw => correct_quantity_raw(raw, precision),
    }
}
/// Decodes a [`Price`] from little-endian raw bytes at the given precision.
///
/// # Errors
///
/// Returns [`EncodingError::ParseError`] (tagged with `field` and `row`) if the
/// corrected raw value is rejected by [`Price::from_raw_checked`].
pub fn decode_price(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Price, EncodingError> {
    Price::from_raw_checked(get_corrected_raw_price(bytes, precision), precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}
/// Decodes a [`Quantity`] from little-endian raw bytes at the given precision.
///
/// # Errors
///
/// Returns [`EncodingError::ParseError`] (tagged with `field` and `row`) if the
/// corrected raw value is rejected by [`Quantity::from_raw_checked`].
pub fn decode_quantity(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Quantity, EncodingError> {
    Quantity::from_raw_checked(get_corrected_raw_quantity(bytes, precision), precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}
pub fn decode_price_with_sentinel(
bytes: &[u8],
precision: u8,
field: &'static str,
row: usize,
) -> Result<Price, EncodingError> {
let raw = get_raw_price(bytes);
let (final_raw, final_precision) = if raw == PRICE_UNDEF {
(raw, 0)
} else {
(get_corrected_raw_price(bytes, precision), precision)
};
Price::from_raw_checked(final_raw, final_precision)
.map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}
pub fn decode_quantity_with_sentinel(
bytes: &[u8],
precision: u8,
field: &'static str,
row: usize,
) -> Result<Quantity, EncodingError> {
let raw = get_raw_quantity(bytes);
let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
(raw, 0)
} else {
(get_corrected_raw_quantity(bytes, precision), precision)
};
Quantity::from_raw_checked(final_raw, final_precision)
.map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}
/// Provides the Arrow [`Schema`] for a type, optionally parameterized by metadata.
pub trait ArrowSchemaProvider {
    /// Returns the Arrow schema, using `metadata` when provided.
    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;

    /// Returns a map of field name to the `Debug` rendering of its data type,
    /// built from the metadata-free schema.
    #[must_use]
    fn get_schema_map() -> HashMap<String, String> {
        Self::get_schema(None)
            .fields()
            .iter()
            .map(|field| (field.name().clone(), format!("{:?}", field.data_type())))
            .collect()
    }
}
/// Encodes homogeneous slices of data into Arrow [`RecordBatch`]es.
pub trait EncodeToRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Encodes `data` into a single record batch using the given `metadata`.
    fn encode_batch(
        metadata: &HashMap<String, String>,
        data: &[Self],
    ) -> Result<RecordBatch, ArrowError>;

    /// Returns the batch metadata for this element.
    fn metadata(&self) -> HashMap<String, String>;

    /// Derives batch metadata from the first element of `chunk`.
    ///
    /// # Panics
    ///
    /// Panics if `chunk` is empty.
    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
        let first = chunk
            .first()
            .expect("Chunk must have at least one element to encode");
        first.metadata()
    }
}
/// Decodes typed elements from an Arrow [`RecordBatch`].
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes all rows of `record_batch` into a `Vec<Self>`, using `metadata`
    /// for batch-level context (e.g. instrument ID and precisions).
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
/// Decodes an Arrow [`RecordBatch`] directly into type-erased [`Data`] values.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Decodes all rows of `record_batch` into a `Vec<Data>`, using `metadata`
    /// for batch-level context.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
/// A sink that record batches can be written to as an Arrow IPC stream.
pub trait WriteStream {
    /// Writes `record_batch` to this stream.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
/// Any [`Write`] implementor can serve as a record-batch stream sink.
impl<T: Write> WriteStream for T {
    /// Writes `record_batch` as a complete Arrow IPC stream (header, batch,
    /// and end-of-stream marker) to this writer.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        let schema = record_batch.schema();
        let mut stream_writer = StreamWriter::try_new(self, &schema)?;
        stream_writer.write(record_batch)?;
        stream_writer.finish()?;
        Ok(())
    }
}
/// Extracts a string column (either `Utf8` or `Utf8View` encoding) from `cols`.
///
/// # Errors
///
/// Returns [`EncodingError::MissingColumn`] if `column_index` is out of bounds,
/// or [`EncodingError::InvalidColumnType`] if the column is not a string array.
pub fn extract_column_string<'a>(
    cols: &'a [ArrayRef],
    column_key: &'static str,
    column_index: usize,
) -> Result<StringColumnRef<'a>, EncodingError> {
    let column_values = cols
        .get(column_index)
        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
    let any = column_values.as_any();
    if let Some(utf8) = any.downcast_ref::<StringArray>() {
        return Ok(StringColumnRef::Utf8(utf8));
    }
    if let Some(utf8_view) = any.downcast_ref::<StringViewArray>() {
        return Ok(StringColumnRef::Utf8View(utf8_view));
    }
    Err(EncodingError::InvalidColumnType(
        column_key,
        column_index,
        DataType::Utf8,
        column_values.data_type().clone(),
    ))
}
/// A borrowed string column in either of Arrow's string encodings.
#[derive(Debug)]
pub enum StringColumnRef<'a> {
    /// Standard `Utf8` (offset-based) string array.
    Utf8(&'a StringArray),
    /// `Utf8View` (German-style / view) string array.
    Utf8View(&'a StringViewArray),
}
impl StringColumnRef<'_> {
    /// Returns the string value at row index `i`, regardless of the
    /// underlying encoding.
    #[inline]
    pub fn value(&self, i: usize) -> &str {
        match *self {
            Self::Utf8(array) => array.value(i),
            Self::Utf8View(array) => array.value(i),
        }
    }
}
/// Extracts and downcasts the column at `column_index` to the concrete array
/// type `T`.
///
/// # Errors
///
/// Returns [`EncodingError::MissingColumn`] if `column_index` is out of bounds,
/// or [`EncodingError::InvalidColumnType`] if the column is not a `T`.
pub fn extract_column<'a, T: Array + 'static>(
    cols: &'a [ArrayRef],
    column_key: &'static str,
    column_index: usize,
    expected_type: DataType,
) -> Result<&'a T, EncodingError> {
    let column_values = cols
        .get(column_index)
        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
    // `ok_or_else` defers constructing the error (which clones the column's
    // `DataType`, potentially allocating for nested types) to the failure path;
    // the previous `ok_or` built it eagerly on every call.
    let downcasted_values = column_values.as_any().downcast_ref::<T>().ok_or_else(|| {
        EncodingError::InvalidColumnType(
            column_key,
            column_index,
            expected_type,
            column_values.data_type().clone(),
        )
    })?;
    Ok(downcasted_values)
}
/// Validates that `array`'s fixed-size binary width matches this build's
/// `PRECISION_BYTES`.
///
/// # Errors
///
/// Returns [`EncodingError::PrecisionMismatch`] if the widths differ.
pub fn validate_precision_bytes(
    array: &FixedSizeBinaryArray,
    field: &'static str,
) -> Result<(), EncodingError> {
    let actual_bytes = array.value_length();
    if actual_bytes == PRECISION_BYTES {
        Ok(())
    } else {
        Err(EncodingError::PrecisionMismatch {
            field,
            expected_bytes: PRECISION_BYTES,
            actual_bytes,
        })
    }
}
/// Encodes a slice of [`OrderBookDelta`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn book_deltas_to_arrow_record_batch_bytes(
    data: &[OrderBookDelta],
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    OrderBookDelta::encode_batch(&OrderBookDelta::chunk_metadata(data), data)
        .map_err(EncodingError::ArrowError)
}
/// Encodes a slice of [`OrderBookDepth10`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn book_depth10_to_arrow_record_batch_bytes(
    data: &[OrderBookDepth10],
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    // `chunk_metadata` is guarded by the emptiness check above; using it keeps
    // this consistent with `book_deltas_to_arrow_record_batch_bytes` and drops
    // the direct `unwrap` (and its clippy allow).
    let metadata = OrderBookDepth10::chunk_metadata(data);
    OrderBookDepth10::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
}
/// Encodes a slice of [`QuoteTick`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn quotes_to_arrow_record_batch_bytes(
    data: &[QuoteTick],
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    // `chunk_metadata` is guarded by the emptiness check above; using it keeps
    // this consistent with `book_deltas_to_arrow_record_batch_bytes` and drops
    // the direct `unwrap` (and its clippy allow).
    let metadata = QuoteTick::chunk_metadata(data);
    QuoteTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
}
/// Encodes a slice of [`TradeTick`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn trades_to_arrow_record_batch_bytes(
    data: &[TradeTick],
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    // `chunk_metadata` is guarded by the emptiness check above; using it keeps
    // this consistent with `book_deltas_to_arrow_record_batch_bytes` and drops
    // the direct `unwrap` (and its clippy allow).
    let metadata = TradeTick::chunk_metadata(data);
    TradeTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
}
/// Encodes a slice of [`Bar`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn bars_to_arrow_record_batch_bytes(data: &[Bar]) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    // `chunk_metadata` is guarded by the emptiness check above; using it keeps
    // this consistent with `book_deltas_to_arrow_record_batch_bytes` and drops
    // the direct `unwrap` (and its clippy allow).
    let metadata = Bar::chunk_metadata(data);
    Bar::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
}
/// Encodes a slice of [`MarkPriceUpdate`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn mark_prices_to_arrow_record_batch_bytes(
    data: &[MarkPriceUpdate],
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    // `chunk_metadata` is guarded by the emptiness check above; using it keeps
    // this consistent with `book_deltas_to_arrow_record_batch_bytes` and drops
    // the direct `unwrap` (and its clippy allow).
    let metadata = MarkPriceUpdate::chunk_metadata(data);
    MarkPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
}
/// Encodes a slice of [`IndexPriceUpdate`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn index_prices_to_arrow_record_batch_bytes(
    data: &[IndexPriceUpdate],
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    // `chunk_metadata` is guarded by the emptiness check above; using it keeps
    // this consistent with `book_deltas_to_arrow_record_batch_bytes` and drops
    // the direct `unwrap` (and its clippy allow).
    let metadata = IndexPriceUpdate::chunk_metadata(data);
    IndexPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
}
/// Encodes a slice of [`InstrumentClose`]s into an Arrow [`RecordBatch`].
///
/// # Errors
///
/// Returns [`EncodingError::EmptyData`] if `data` is empty, or
/// [`EncodingError::ArrowError`] if encoding fails.
pub fn instrument_closes_to_arrow_record_batch_bytes(
    data: &[InstrumentClose],
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }
    // `chunk_metadata` is guarded by the emptiness check above; using it keeps
    // this consistent with `book_deltas_to_arrow_record_batch_bytes` and drops
    // the direct `unwrap` (and its clippy allow).
    let metadata = InstrumentClose::chunk_metadata(data);
    InstrumentClose::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
}