use crate::errors::PersistError;
use crate::trades::Trade;
use std::path::Path;
/// Writes `trades` to a Snappy-compressed Parquet file at `path`.
///
/// Every trade goes into a single record batch with seven non-nullable
/// columns, in this order: `trade_ts` (UInt64), `symbol` (Utf8), `side`
/// (Utf8), `price` (Float64), `amount` (Float64), `exchange` (Utf8) and
/// `id` (Utf8).
///
/// # Errors
///
/// Propagates, via `?`, any failure from assembling the record batch,
/// creating the file, or creating, writing to, or closing the Parquet writer.
#[cfg(feature = "parquet")]
pub fn write_trades_parquet(trades: &[Trade], path: &Path) -> Result<(), PersistError> {
    use arrow::{
        array::{Float64Array, StringArray, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    // Column layout of the output file; the arrays below follow the same order.
    let schema = Schema::new(vec![
        Field::new("trade_ts", DataType::UInt64, false),
        Field::new("symbol", DataType::Utf8, false),
        Field::new("side", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("amount", DataType::Float64, false),
        Field::new("exchange", DataType::Utf8, false),
        Field::new("id", DataType::Utf8, false),
    ]);

    // Pivot the row-oriented trades into one vector per column. String
    // columns borrow from `trades`, so no text is copied at this stage.
    let ts_col: Vec<_> = trades.iter().map(|t| t.trade_ts).collect();
    let symbol_col: Vec<&str> = trades.iter().map(|t| t.symbol.as_str()).collect();
    let side_col: Vec<&str> = trades.iter().map(|t| t.side.as_str()).collect();
    let price_col: Vec<_> = trades.iter().map(|t| t.price).collect();
    let amount_col: Vec<_> = trades.iter().map(|t| t.amount).collect();
    let exchange_col: Vec<&str> = trades.iter().map(|t| t.exchange.as_str()).collect();
    let id_col: Vec<&str> = trades.iter().map(|t| t.id.as_str()).collect();

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(UInt64Array::from(ts_col)),
            Arc::new(StringArray::from(symbol_col)),
            Arc::new(StringArray::from(side_col)),
            Arc::new(Float64Array::from(price_col)),
            Arc::new(Float64Array::from(amount_col)),
            Arc::new(StringArray::from(exchange_col)),
            Arc::new(StringArray::from(id_col)),
        ],
    )?;

    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    // The file is created only after the batch has been built successfully.
    let mut writer = ArrowWriter::try_new(File::create(path)?, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    // `close` is called explicitly so that a failure while finishing the file
    // is returned to the caller.
    writer.close()?;
    Ok(())
}
/// Stand-in for `write_trades_parquet` in builds without the `parquet` feature.
///
/// Both arguments are ignored.
///
/// # Errors
///
/// Always returns [`PersistError::UnsupportedFormat`] with a message telling
/// the caller to enable the `parquet` feature.
#[cfg(not(feature = "parquet"))]
pub fn write_trades_parquet(_trades: &[Trade], _path: &Path) -> Result<(), PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Reads every trade stored in the Parquet file at `path`.
///
/// Columns are read by position, not by name. The first seven columns of
/// each record batch must be, in order: `trade_ts` (UInt64), `symbol` (Utf8),
/// `side` (Utf8), `price` (Float64), `amount` (Float64), `exchange` (Utf8)
/// and `id` (Utf8). Any further columns are ignored.
///
/// # Errors
///
/// * Propagates, via `?`, failures from opening the file, building the
///   Parquet reader, or decoding a record batch.
/// * Returns [`PersistError::UnsupportedFormat`] when a batch has fewer than
///   seven columns or a column does not have the expected Arrow type. These
///   cases used to panic.
#[cfg(feature = "parquet")]
pub fn read_trades_parquet(path: &Path) -> Result<Vec<Trade>, PersistError> {
    use arrow::array::AsArray;
    use arrow::datatypes::{Float64Type, UInt64Type};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;

    // Number of leading columns this reader consumes from each batch.
    const EXPECTED_COLUMNS: usize = 7;

    // Builds the error returned when a column has an unexpected Arrow type.
    let type_mismatch = |column: &str, expected: &str| {
        PersistError::UnsupportedFormat(format!(
            "parquet column '{}' does not have the expected type {}",
            column, expected
        ))
    };

    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let reader = builder.build()?;

    let mut trades = Vec::new();
    for batch_result in reader {
        let batch = batch_result?;

        // `RecordBatch::column` panics on an out-of-range index, so reject
        // files that are too narrow before touching any column.
        if batch.num_columns() < EXPECTED_COLUMNS {
            return Err(PersistError::UnsupportedFormat(format!(
                "expected at least {} columns in trades parquet file, found {}",
                EXPECTED_COLUMNS,
                batch.num_columns()
            )));
        }

        // The `_opt` downcasts return `None` on a type mismatch, where the
        // plain `as_primitive` / `as_string` variants would panic.
        let timestamps = batch
            .column(0)
            .as_primitive_opt::<UInt64Type>()
            .ok_or_else(|| type_mismatch("trade_ts", "UInt64"))?;
        let symbols = batch
            .column(1)
            .as_string_opt::<i32>()
            .ok_or_else(|| type_mismatch("symbol", "Utf8"))?;
        let sides = batch
            .column(2)
            .as_string_opt::<i32>()
            .ok_or_else(|| type_mismatch("side", "Utf8"))?;
        let prices = batch
            .column(3)
            .as_primitive_opt::<Float64Type>()
            .ok_or_else(|| type_mismatch("price", "Float64"))?;
        let amounts = batch
            .column(4)
            .as_primitive_opt::<Float64Type>()
            .ok_or_else(|| type_mismatch("amount", "Float64"))?;
        let exchanges = batch
            .column(5)
            .as_string_opt::<i32>()
            .ok_or_else(|| type_mismatch("exchange", "Utf8"))?;
        let ids = batch
            .column(6)
            .as_string_opt::<i32>()
            .ok_or_else(|| type_mismatch("id", "Utf8"))?;

        // Grow once per batch instead of repeatedly while pushing rows.
        trades.reserve(batch.num_rows());
        for i in 0..batch.num_rows() {
            trades.push(Trade {
                trade_ts: timestamps.value(i),
                symbol: symbols.value(i).to_string(),
                side: sides.value(i).to_string(),
                price: prices.value(i),
                amount: amounts.value(i),
                exchange: exchanges.value(i).to_string(),
                id: ids.value(i).to_string(),
            });
        }
    }
    Ok(trades)
}
/// Stand-in for `read_trades_parquet` in builds without the `parquet` feature.
///
/// The path argument is ignored and no file is opened.
///
/// # Errors
///
/// Always returns [`PersistError::UnsupportedFormat`] with a message telling
/// the caller to enable the `parquet` feature.
#[cfg(not(feature = "parquet"))]
pub fn read_trades_parquet(_path: &Path) -> Result<Vec<Trade>, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Writes `trades` to a new, timestamp-named Parquet file inside `output_dir`
/// and returns the path of that file.
///
/// The file name has the form
/// `{exchange}_{symbol}_trades_{mode}_{timestamp}.parquet`, where:
///
/// * `exchange` and `symbol` come from the first trade, with every `/` in the
///   symbol replaced by `-`. Both fall back to `"unknown"` when `trades` is
///   empty.
/// * `mode` is inserted verbatim.
/// * `timestamp` is the current UTC time formatted with
///   `%Y%m%d_%H%M%S%.3f`.
///
/// # Errors
///
/// Propagates any error returned by `write_trades_parquet`.
#[cfg(feature = "parquet")]
pub fn write_trades_parquet_timestamped(
    trades: &[Trade],
    output_dir: &Path,
    mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    // Shared placeholder for both name components when there is no first
    // trade. The exchange fallback was previously misspelled "unknonw".
    const UNKNOWN: &str = "unknown";

    let file_ts = chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f");

    // A single lookup of the first trade supplies both name components.
    let (exchange, raw_symbol) = trades
        .first()
        .map_or((UNKNOWN, UNKNOWN), |t| (t.exchange.as_str(), t.symbol.as_str()));

    // A '/' in the symbol would otherwise be treated as a path separator by
    // `join`.
    let symbol = raw_symbol.replace('/', "-");

    let filename = format!(
        "{}_{}_trades_{}_{}.parquet",
        exchange, symbol, mode, file_ts
    );
    let path = output_dir.join(filename);
    write_trades_parquet(trades, &path)?;
    Ok(path)
}
/// Stand-in for `write_trades_parquet_timestamped` in builds without the
/// `parquet` feature.
///
/// All arguments are ignored and nothing is written.
///
/// # Errors
///
/// Always returns [`PersistError::UnsupportedFormat`] with a message telling
/// the caller to enable the `parquet` feature.
#[cfg(not(feature = "parquet"))]
pub fn write_trades_parquet_timestamped(
    _trades: &[Trade],
    _output_dir: &Path,
    _mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}