use crate::errors::PersistError;
use crate::liquidations::Liquidation;
use std::path::Path;
/// Serializes `liquidations` into a single Snappy-compressed Parquet file at `path`.
///
/// The file contains one record batch with six non-nullable columns, in this
/// order: `liquidation_ts` (UInt64), `symbol` (Utf8), `side` (Utf8),
/// `price` (Float64), `amount` (Float64), `exchange` (Utf8).
///
/// # Errors
///
/// Returns a [`PersistError`] if the record batch cannot be assembled, the
/// file cannot be created, or the Parquet writer fails while writing or
/// closing.
#[cfg(feature = "parquet")]
pub fn write_liquidations_parquet(
    liquidations: &[Liquidation],
    path: &Path,
) -> Result<(), PersistError> {
    use arrow::{
        array::{Float64Array, StringArray, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    let layout = Arc::new(Schema::new(vec![
        Field::new("liquidation_ts", DataType::UInt64, false),
        Field::new("symbol", DataType::Utf8, false),
        Field::new("side", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("amount", DataType::Float64, false),
        Field::new("exchange", DataType::Utf8, false),
    ]));

    // Each column is built directly from the slice; the order here must
    // mirror the field order declared in `layout`.
    let batch = RecordBatch::try_new(
        layout,
        vec![
            Arc::new(UInt64Array::from_iter_values(
                liquidations.iter().map(|rec| rec.liquidation_ts),
            )),
            Arc::new(StringArray::from_iter_values(
                liquidations.iter().map(|rec| rec.symbol.as_str()),
            )),
            Arc::new(StringArray::from_iter_values(
                liquidations.iter().map(|rec| rec.side.as_str()),
            )),
            Arc::new(Float64Array::from_iter_values(
                liquidations.iter().map(|rec| rec.price),
            )),
            Arc::new(Float64Array::from_iter_values(
                liquidations.iter().map(|rec| rec.amount),
            )),
            Arc::new(StringArray::from_iter_values(
                liquidations.iter().map(|rec| rec.exchange.as_str()),
            )),
        ],
    )?;

    // The file is only created once the batch has been assembled successfully.
    let sink = File::create(path)?;
    let writer_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    let mut parquet_writer = ArrowWriter::try_new(sink, batch.schema(), Some(writer_props))?;
    parquet_writer.write(&batch)?;
    // `close` finalizes the file; its error is propagated rather than dropped.
    parquet_writer.close()?;
    Ok(())
}
/// Fallback used when the crate is built without the `parquet` feature.
///
/// Ignores both arguments and always returns
/// [`PersistError::UnsupportedFormat`].
#[cfg(not(feature = "parquet"))]
pub fn write_liquidations_parquet(
    _liquidations: &[Liquidation],
    _path: &Path,
) -> Result<(), PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Loads liquidation records from the Parquet file at `path`.
///
/// Columns are read by position and must appear in this order with these
/// Arrow types: `liquidation_ts` (UInt64), `symbol` (Utf8), `side` (Utf8),
/// `price` (Float64), `amount` (Float64), `exchange` (Utf8). Column names in
/// the file are not checked; the names above are used only in error messages.
/// Null slots are not rejected.
///
/// # Errors
///
/// Returns a [`PersistError`] if the file cannot be opened, is not readable
/// as Parquet, a batch fails to decode, or the file's columns do not match
/// the layout above (too few columns or an unexpected column type). A layout
/// mismatch is reported as an Arrow schema error instead of panicking.
#[cfg(feature = "parquet")]
pub fn read_liquidations_parquet(path: &Path) -> Result<Vec<Liquidation>, PersistError> {
    use arrow::{
        array::{Array, ArrayRef, AsArray, PrimitiveArray, StringArray},
        datatypes::{ArrowPrimitiveType, Float64Type, UInt64Type},
        error::ArrowError,
        record_batch::RecordBatch,
    };
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;

    // Checked positional lookup: `RecordBatch::column` would panic when the
    // file has fewer columns than expected.
    fn column<'a>(
        batch: &'a RecordBatch,
        idx: usize,
        name: &str,
    ) -> Result<&'a ArrayRef, ArrowError> {
        batch.columns().get(idx).ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "expected column {} ('{}') but the file has only {} column(s)",
                idx,
                name,
                batch.num_columns()
            ))
        })
    }

    // Checked downcast to a primitive array of type `T`.
    fn primitive<'a, T: ArrowPrimitiveType>(
        batch: &'a RecordBatch,
        idx: usize,
        name: &str,
    ) -> Result<&'a PrimitiveArray<T>, ArrowError> {
        let col = column(batch, idx, name)?;
        col.as_primitive_opt::<T>().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "column {} ('{}') has type {:?}, expected {:?}",
                idx,
                name,
                col.data_type(),
                T::DATA_TYPE
            ))
        })
    }

    // Checked downcast to a 32-bit-offset UTF-8 string array.
    fn utf8<'a>(
        batch: &'a RecordBatch,
        idx: usize,
        name: &str,
    ) -> Result<&'a StringArray, ArrowError> {
        let col = column(batch, idx, name)?;
        col.as_string_opt::<i32>().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "column {} ('{}') has type {:?}, expected Utf8",
                idx,
                name,
                col.data_type()
            ))
        })
    }

    let file = File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;

    let mut liquidations = Vec::new();
    for batch_result in reader {
        let batch = batch_result?;

        let timestamps = primitive::<UInt64Type>(&batch, 0, "liquidation_ts")?;
        let symbols = utf8(&batch, 1, "symbol")?;
        let sides = utf8(&batch, 2, "side")?;
        let prices = primitive::<Float64Type>(&batch, 3, "price")?;
        let amounts = primitive::<Float64Type>(&batch, 4, "amount")?;
        let exchanges = utf8(&batch, 5, "exchange")?;

        // Every column of a record batch has `num_rows` entries, so the
        // per-row `value(i)` calls below stay in bounds.
        let rows = batch.num_rows();
        liquidations.reserve(rows);
        liquidations.extend((0..rows).map(|i| Liquidation {
            liquidation_ts: timestamps.value(i),
            symbol: symbols.value(i).to_string(),
            side: sides.value(i).to_string(),
            price: prices.value(i),
            amount: amounts.value(i),
            exchange: exchanges.value(i).to_string(),
        }));
    }
    Ok(liquidations)
}
/// Fallback used when the crate is built without the `parquet` feature.
///
/// Ignores `_path` and always returns [`PersistError::UnsupportedFormat`].
#[cfg(not(feature = "parquet"))]
pub fn read_liquidations_parquet(_path: &Path) -> Result<Vec<Liquidation>, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Writes `liquidations` to a freshly named Parquet file inside `output_dir`
/// and returns the full path of that file.
///
/// The file name has the form
/// `{exchange}_{symbol}_liquidations_{mode}_{timestamp}.parquet`, where:
///
/// * `exchange` and `symbol` come from the first record, or are `"unknown"`
///   when the slice is empty;
/// * every `/` in the symbol is replaced by `-`;
/// * `timestamp` is the current UTC time formatted with
///   `%Y%m%d_%H%M%S%.3f`.
///
/// # Errors
///
/// Propagates any [`PersistError`] returned by `write_liquidations_parquet`.
#[cfg(feature = "parquet")]
pub fn write_liquidations_parquet_timestamped(
    liquidations: &[Liquidation],
    output_dir: &Path,
    mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    let stamp = chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f");

    // Only the first record is consulted for the naming fields.
    let (venue, pair) = match liquidations.first() {
        Some(head) => (head.exchange.as_str(), head.symbol.as_str()),
        None => ("unknown", "unknown"),
    };
    // A `/` in the symbol would otherwise be read as a path separator.
    let pair = pair.replace('/', "-");

    let target = output_dir.join(format!(
        "{}_{}_liquidations_{}_{}.parquet",
        venue, pair, mode, stamp
    ));
    write_liquidations_parquet(liquidations, &target)?;
    Ok(target)
}
/// Fallback used when the crate is built without the `parquet` feature.
///
/// Ignores all arguments and always returns
/// [`PersistError::UnsupportedFormat`].
#[cfg(not(feature = "parquet"))]
pub fn write_liquidations_parquet_timestamped(
    _liquidations: &[Liquidation],
    _output_dir: &Path,
    _mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}