use crate::orderbooks::{Orderbook, OrderbookTarget, OrderbookTargetData};
use crate::{errors::PersistError, orderbooks::OrderbookDelta};
use std::path::Path;
/// Writes the current contents of an `OrderbookDelta` to a
/// Snappy-compressed parquet file at `path`.
///
/// One row is emitted per (side, level): all bids first, then all asks,
/// each at full depth. Columns: `timestamp_ms` (time of the write, from
/// `current_timestamp_ms`), `symbol`, `exchange`, `side` ("bid"/"ask"),
/// `level` (0-based depth index), `price`, `size`.
///
/// # Errors
/// Returns `PersistError` on record-batch construction, file creation,
/// or parquet write failures.
#[cfg(feature = "parquet")]
pub fn write_ob_delta_parquet(
    ob: &OrderbookDelta,
    path: &Path,
) -> Result<(), PersistError> {
    use crate::utils::{current_timestamp_ms, decimal_to_f64};
    use arrow::{
        array::{Float64Array, StringArray, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    // Single write timestamp stamped onto every row of this file.
    let timestamp = current_timestamp_ms();
    let symbol = ob.symbol();
    let exchange = ob.exchange();

    let mut timestamps: Vec<u64> = Vec::new();
    let mut symbols: Vec<&str> = Vec::new();
    let mut exchanges: Vec<&str> = Vec::new();
    let mut sides: Vec<&str> = Vec::new();
    let mut levels: Vec<u64> = Vec::new();
    let mut prices: Vec<f64> = Vec::new();
    let mut sizes: Vec<f64> = Vec::new();

    for (level, (price, size)) in ob.top_bids(ob.bid_depth()).iter().enumerate() {
        timestamps.push(timestamp);
        symbols.push(symbol);
        exchanges.push(exchange);
        sides.push("bid");
        levels.push(level as u64);
        prices.push(decimal_to_f64(*price));
        sizes.push(decimal_to_f64(*size));
    }
    for (level, (price, size)) in ob.top_asks(ob.ask_depth()).iter().enumerate() {
        timestamps.push(timestamp);
        symbols.push(symbol);
        exchanges.push(exchange);
        sides.push("ask");
        levels.push(level as u64);
        prices.push(decimal_to_f64(*price));
        sizes.push(decimal_to_f64(*size));
    }

    // BUG FIX: the schema previously declared `side` before `exchange`,
    // but the column arrays below are supplied exchange-first. Both
    // columns are Utf8, so RecordBatch::try_new accepted the batch and
    // silently wrote exchange values under the "side" column name and
    // vice versa. Field order now matches the array order.
    let schema = Schema::new(vec![
        Field::new("timestamp_ms", DataType::UInt64, false),
        Field::new("symbol", DataType::Utf8, false),
        Field::new("exchange", DataType::Utf8, false),
        Field::new("side", DataType::Utf8, false),
        Field::new("level", DataType::UInt64, false),
        Field::new("price", DataType::Float64, false),
        Field::new("size", DataType::Float64, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(UInt64Array::from(timestamps)),
            Arc::new(StringArray::from(symbols)),
            Arc::new(StringArray::from(exchanges)),
            Arc::new(StringArray::from(sides)),
            Arc::new(UInt64Array::from(levels)),
            Arc::new(Float64Array::from(prices)),
            Arc::new(Float64Array::from(sizes)),
        ],
    )?;

    let file = File::create(path)?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    // close() finalizes the parquet footer; dropping without it would
    // leave an unreadable file.
    writer.close()?;
    Ok(())
}
/// Stub compiled when the `parquet` feature is disabled.
///
/// Keeps the delta-writer symbol available so callers compile under any
/// feature set; always fails with `UnsupportedFormat`.
#[cfg(not(feature = "parquet"))]
pub fn write_ob_delta_parquet(
    _ob: &OrderbookDelta,
    _path: &Path,
) -> Result<(), PersistError> {
    let reason = "parquet support not compiled in (enable 'parquet' feature)";
    Err(PersistError::UnsupportedFormat(reason.to_string()))
}
/// Stub compiled when the `parquet` feature is disabled.
///
/// Mirrors the feature-gated loader's signature; always fails with
/// `UnsupportedFormat`.
#[cfg(not(feature = "parquet"))]
pub fn load_parquet_to_delta(_path: &Path) -> Result<OrderbookDelta, PersistError> {
    let reason = "parquet support not compiled in (enable 'parquet' feature)";
    Err(PersistError::UnsupportedFormat(reason.to_string()))
}
/// Stub compiled when the `parquet` feature is disabled.
///
/// Mirrors the feature-gated snapshot loader's signature; always fails
/// with `UnsupportedFormat`.
#[cfg(not(feature = "parquet"))]
pub fn load_parquet_to_ob(_path: &Path) -> Result<Vec<Orderbook>, PersistError> {
    let reason = "parquet support not compiled in (enable 'parquet' feature)";
    Err(PersistError::UnsupportedFormat(reason.to_string()))
}
/// Stub compiled when the `parquet` feature is disabled.
///
/// Mirrors the feature-gated reader's signature; always fails with
/// `UnsupportedFormat`.
#[cfg(not(feature = "parquet"))]
pub fn read_ob_parquet(
    _path: &Path,
    _target: OrderbookTarget,
) -> Result<OrderbookTargetData, PersistError> {
    let reason = "parquet support not compiled in (enable 'parquet' feature)";
    Err(PersistError::UnsupportedFormat(reason.to_string()))
}
/// Reads a parquet orderbook file, dispatching on the requested target
/// representation: a single merged `OrderbookDelta`, or the list of
/// per-timestamp `Orderbook` snapshots.
///
/// # Errors
/// Propagates any `PersistError` from the underlying loader.
#[cfg(feature = "parquet")]
pub fn read_ob_parquet(
    path: &Path,
    target: OrderbookTarget,
) -> Result<OrderbookTargetData, PersistError> {
    let data = match target {
        OrderbookTarget::Delta => {
            OrderbookTargetData::Delta(load_parquet_to_delta(path)?)
        }
        OrderbookTarget::Snapshot => {
            OrderbookTargetData::Snapshot(load_parquet_to_ob(path)?)
        }
    };
    Ok(data)
}
/// Reconstructs an `OrderbookDelta` from a parquet file previously
/// produced by `write_ob_delta_parquet`.
///
/// Columns are addressed by position (1 = symbol, 2 = exchange,
/// 3 = side, 5 = price, 6 = size). The symbol/exchange are taken from
/// the first row encountered; rows whose side tag is neither "bid" nor
/// "ask" are skipped.
///
/// # Errors
/// Returns `PersistError` on I/O or parquet decoding failures, or when
/// a price/size value cannot be converted to `Decimal`.
#[cfg(feature = "parquet")]
pub fn load_parquet_to_delta(path: &Path) -> Result<OrderbookDelta, PersistError> {
    use arrow::array::AsArray;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use rust_decimal::Decimal;
    use std::{collections::BTreeMap, fs::File};

    let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(path)?)?.build()?;

    let mut bid_map: BTreeMap<Decimal, Decimal> = BTreeMap::new();
    let mut ask_map: BTreeMap<Decimal, Decimal> = BTreeMap::new();
    let mut symbol = String::new();
    let mut exchange = String::new();

    for maybe_batch in reader {
        let batch = maybe_batch?;
        let symbol_col = batch.column(1).as_string::<i32>();
        let exchange_col = batch.column(2).as_string::<i32>();
        let side_col = batch.column(3).as_string::<i32>();
        let price_col = batch
            .column(5)
            .as_primitive::<arrow::datatypes::Float64Type>();
        let size_col = batch
            .column(6)
            .as_primitive::<arrow::datatypes::Float64Type>();

        for row in 0..batch.num_rows() {
            // Latch symbol/exchange from the first row that provides them.
            if symbol.is_empty() {
                symbol = symbol_col.value(row).to_string();
            }
            if exchange.is_empty() {
                exchange = exchange_col.value(row).to_string();
            }
            let price = Decimal::from_f64_retain(price_col.value(row))
                .ok_or_else(|| PersistError::Parse("price conversion failed".into()))?;
            let size = Decimal::from_f64_retain(size_col.value(row))
                .ok_or_else(|| PersistError::Parse("size conversion failed".into()))?;
            match side_col.value(row) {
                "bid" => {
                    bid_map.insert(price, size);
                }
                "ask" => {
                    ask_map.insert(price, size);
                }
                _ => {}
            }
        }
    }
    Ok(OrderbookDelta::from_maps(symbol, exchange, bid_map, ask_map))
}
/// Reconstructs full `Orderbook` snapshots from a parquet file written by
/// `write_ob_parquet`.
///
/// Rows are grouped by their timestamp column; each distinct timestamp
/// yields one `Orderbook`. Within a group, rows are split into bid/ask
/// level lists, sorted by their stored level index, and rebuilt into
/// `Level`s. Columns are addressed by position, matching the writer's
/// layout: 0 = timestamp, 1 = symbol, 2 = exchange, 3 = side, 4 = level,
/// 5 = price, 6 = size.
///
/// # Errors
/// Propagates I/O and parquet decoding failures as `PersistError`.
#[cfg(feature = "parquet")]
pub fn load_parquet_to_ob(path: &Path) -> Result<Vec<Orderbook>, PersistError> {
    use crate::{Level, OrderSide};
    use arrow::array::AsArray;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::{collections::BTreeMap, fs::File};
    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let reader = builder.build()?;
    // One accumulated snapshot: (symbol, exchange, bid rows, ask rows),
    // where each row is (level index, price, size).
    type LevelRow = (u64, f64, f64);
    type SnapshotOrderbook = (String, String, Vec<LevelRow>, Vec<LevelRow>);
    // BTreeMap keys the groups by timestamp, so the final Vec comes out
    // in ascending timestamp order.
    let mut groups: BTreeMap<u64, SnapshotOrderbook> = BTreeMap::new();
    for batch_result in reader {
        let batch = batch_result?;
        let timestamps = batch
            .column(0)
            .as_primitive::<arrow::datatypes::UInt64Type>();
        let symbols = batch.column(1).as_string::<i32>();
        let exchanges = batch.column(2).as_string::<i32>();
        let sides = batch.column(3).as_string::<i32>();
        let levels = batch
            .column(4)
            .as_primitive::<arrow::datatypes::UInt64Type>();
        let prices = batch
            .column(5)
            .as_primitive::<arrow::datatypes::Float64Type>();
        let sizes = batch
            .column(6)
            .as_primitive::<arrow::datatypes::Float64Type>();
        for i in 0..batch.num_rows() {
            let ts = timestamps.value(i);
            let symbol = symbols.value(i).to_string();
            let exchange = exchanges.value(i).to_string();
            let side = sides.value(i);
            let level = levels.value(i);
            let price = prices.value(i);
            let size = sizes.value(i);
            // First row seen for a timestamp fixes that snapshot's
            // symbol/exchange; later rows only contribute levels.
            let entry = groups
                .entry(ts)
                .or_insert_with(|| (symbol, exchange, Vec::new(), Vec::new()));
            match side {
                "bid" => entry.2.push((level, price, size)),
                "ask" => entry.3.push((level, price, size)),
                // Rows with an unrecognized side tag are silently skipped.
                _ => {}
            }
        }
    }
    let orderbooks: Vec<Orderbook> = groups
        .into_iter()
        .map(|(ts, (symbol, exchange, mut bid_levels, mut ask_levels))| {
            // Restore the writer's depth ordering before rebuilding levels.
            bid_levels.sort_by_key(|(l, _, _)| *l);
            let bids: Vec<Level> = bid_levels
                .into_iter()
                .map(|(idx, price, volume)| {
                    Level::new(idx as u32, OrderSide::Bids, price, volume, vec![])
                })
                .collect();
            ask_levels.sort_by_key(|(l, _, _)| *l);
            let asks: Vec<Level> = ask_levels
                .into_iter()
                .map(|(idx, price, volume)| {
                    Level::new(idx as u32, OrderSide::Asks, price, volume, vec![])
                })
                .collect();
            // NOTE(review): the first argument to Orderbook::new is a
            // hard-coded 0 — presumably a sequence/update id that is not
            // preserved by the parquet layout; confirm against Orderbook::new.
            Orderbook::new(0, ts, symbol, exchange, bids, asks)
        })
        .collect();
    Ok(orderbooks)
}
/// Stub compiled when the `parquet` feature is disabled; always fails.
///
/// NOTE(review): this stub's return type (`Result<OrderbookTargetData,
/// PersistError>`) does not match the feature-enabled implementation,
/// which returns `anyhow::Result<std::path::PathBuf>` — so the public
/// API of `write_ob_parquet` changes with the feature flag and callers
/// written against one variant will not compile against the other.
/// Align the two signatures (the parquet variant looks canonical).
#[cfg(not(feature = "parquet"))]
pub fn write_ob_parquet(
    _snapshots: &[Orderbook],
    _output_dir: &Path,
    _mode: &str,
) -> Result<OrderbookTargetData, PersistError> {
    let reason = "parquet support not compiled in (enable 'parquet' feature)";
    Err(PersistError::UnsupportedFormat(reason.to_string()))
}
/// Serializes a sequence of `Orderbook` snapshots into a single
/// Snappy-compressed parquet file inside `output_dir`.
///
/// One row is written per (snapshot, side, level). The file name is
/// `{exchange}_{symbol}_ob_{mode}_{utc-time}.parquet`, with any `/` in
/// the symbol replaced by `-` so it is path-safe. Returns the path of
/// the written file.
///
/// NOTE(review): the timestamp column is named `timestamp_ns` but is
/// filled from `orderbook_ts`, whose unit is not visible here — the
/// delta writer's column is `timestamp_ms`; confirm the actual unit.
///
/// # Errors
/// Fails if `snapshots` is empty, or on arrow/parquet/file-system errors.
#[cfg(feature = "parquet")]
pub fn write_ob_parquet(
    snapshots: &[Orderbook],
    output_dir: &Path,
    mode: &str,
) -> anyhow::Result<std::path::PathBuf> {
    use arrow::{
        array::{Float64Array, StringArray, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    // BUG FIX: the previous `snapshots[0]` panicked on an empty slice;
    // fail with a descriptive error instead.
    let first = snapshots
        .first()
        .ok_or_else(|| anyhow::anyhow!("write_ob_parquet: no snapshots to write"))?;

    // Preallocate exactly: one row per level across all snapshots.
    let total_rows: usize = snapshots
        .iter()
        .map(|ob| ob.bids.len() + ob.asks.len())
        .sum();
    // '/' is replaced so symbols like "BTC/USD" stay path-safe.
    let file_symbol = first.symbol.replace('/', "-");
    let file_exchange = first.exchange.clone();

    let mut timestamps = Vec::with_capacity(total_rows);
    let mut symbols: Vec<&str> = Vec::with_capacity(total_rows);
    let mut exchanges: Vec<&str> = Vec::with_capacity(total_rows);
    let mut sides: Vec<&str> = Vec::with_capacity(total_rows);
    let mut levels: Vec<u64> = Vec::with_capacity(total_rows);
    let mut prices = Vec::with_capacity(total_rows);
    let mut sizes = Vec::with_capacity(total_rows);
    for ob in snapshots {
        for (idx, bid) in ob.bids.iter().enumerate() {
            timestamps.push(ob.orderbook_ts);
            symbols.push(&ob.symbol);
            exchanges.push(&ob.exchange);
            sides.push("bid");
            levels.push(idx as u64);
            prices.push(bid.price);
            sizes.push(bid.volume);
        }
        for (idx, ask) in ob.asks.iter().enumerate() {
            timestamps.push(ob.orderbook_ts);
            symbols.push(&ob.symbol);
            exchanges.push(&ob.exchange);
            sides.push("ask");
            levels.push(idx as u64);
            prices.push(ask.price);
            sizes.push(ask.volume);
        }
    }

    // Field order matches the array order passed to RecordBatch below.
    let schema = Schema::new(vec![
        Field::new("timestamp_ns", DataType::UInt64, false),
        Field::new("symbol", DataType::Utf8, false),
        Field::new("exchange", DataType::Utf8, false),
        Field::new("side", DataType::Utf8, false),
        Field::new("level", DataType::UInt64, false),
        Field::new("price", DataType::Float64, false),
        Field::new("size", DataType::Float64, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(UInt64Array::from(timestamps)),
            Arc::new(StringArray::from(symbols)),
            Arc::new(StringArray::from(exchanges)),
            Arc::new(StringArray::from(sides)),
            Arc::new(UInt64Array::from(levels)),
            Arc::new(Float64Array::from(prices)),
            Arc::new(Float64Array::from(sizes)),
        ],
    )?;

    // Millisecond-resolution UTC timestamp keeps successive files unique.
    let file_ts = chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f");
    let filename = format!(
        "{}_{}_ob_{}_{}.parquet",
        file_exchange, file_symbol, mode, file_ts
    );
    let path = output_dir.join(filename);
    let file = File::create(&path)?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    // close() finalizes the parquet footer; required for a readable file.
    writer.close()?;
    Ok(path)
}