use crate::errors::PersistError;
use crate::open_interest::OpenInterest;
use std::path::Path;
/// Writes `records` to `path` as a single-batch, Snappy-compressed Parquet file.
///
/// The file has five non-nullable columns, in this order:
/// `ts` (UInt64), `symbol` (Utf8), `open_interest` (Float64),
/// `open_interest_value` (Float64) and `exchange` (Utf8).
///
/// # Errors
///
/// Propagates any failure from building the record batch, creating the file,
/// or writing/closing the Parquet writer, converted into [`PersistError`].
#[cfg(feature = "parquet")]
pub fn write_oi_parquet(records: &[OpenInterest], path: &Path) -> Result<(), PersistError> {
    use arrow::{
        array::{Float64Array, StringArray, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    // Pivot the row-oriented records into one vector per column.
    let ts_col: Vec<_> = records.iter().map(|r| r.open_interest_ts).collect();
    let symbol_col: Vec<&str> = records.iter().map(|r| r.symbol.as_str()).collect();
    let oi_col: Vec<_> = records.iter().map(|r| r.open_interest).collect();
    let oi_usd_col: Vec<_> = records.iter().map(|r| r.open_interest_value).collect();
    let exchange_col: Vec<&str> = records.iter().map(|r| r.exchange.as_str()).collect();

    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::UInt64, false),
        Field::new("symbol", DataType::Utf8, false),
        Field::new("open_interest", DataType::Float64, false),
        Field::new("open_interest_value", DataType::Float64, false),
        Field::new("exchange", DataType::Utf8, false),
    ]));

    // The batch is assembled before the file is created, so a batch
    // construction error never leaves an empty file behind.
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(UInt64Array::from(ts_col)),
            Arc::new(StringArray::from(symbol_col)),
            Arc::new(Float64Array::from(oi_col)),
            Arc::new(Float64Array::from(oi_usd_col)),
            Arc::new(StringArray::from(exchange_col)),
        ],
    )?;

    let sink = File::create(path)?;
    let writer_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    let mut parquet_writer = ArrowWriter::try_new(sink, schema, Some(writer_props))?;
    parquet_writer.write(&batch)?;
    // `close` finalises the file; its returned metadata is not needed here.
    parquet_writer.close()?;
    Ok(())
}
/// Stand-in for `write_oi_parquet` used when the `parquet` feature is disabled.
///
/// # Errors
///
/// Always returns [`PersistError::UnsupportedFormat`]; nothing is written.
#[cfg(not(feature = "parquet"))]
pub fn write_oi_parquet(_records: &[OpenInterest], _path: &Path) -> Result<(), PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Reads every open-interest record stored in the Parquet file at `path`.
///
/// Columns are addressed by position and must have these Arrow types:
/// 0 = UInt64 (timestamp), 1 = Utf8 (symbol), 2 = Float64 (open interest),
/// 3 = Float64 (open interest value), 4 = Utf8 (exchange). Extra trailing
/// columns are ignored.
///
/// Null slots are not detected: each row's value is read regardless of the
/// column's validity bitmap.
///
/// # Errors
///
/// * Propagates I/O, Parquet and Arrow errors converted into [`PersistError`].
/// * Returns [`PersistError::UnsupportedFormat`] when a batch has fewer than
///   five columns or a column has an unexpected type, instead of panicking.
#[cfg(feature = "parquet")]
pub fn read_oi_parquet(path: &Path) -> Result<Vec<OpenInterest>, PersistError> {
    use arrow::array::AsArray;
    use arrow::datatypes::{Float64Type, UInt64Type};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;

    const EXPECTED_COLUMNS: usize = 5;

    // Builds the error returned for any layout that does not match the
    // expected column layout described above.
    let schema_mismatch = |detail: &str| {
        PersistError::UnsupportedFormat(format!("unexpected parquet schema: {}", detail))
    };

    let file = File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;

    let mut records = Vec::new();
    for batch_result in reader {
        let batch = batch_result?;

        // `RecordBatch::column` panics on an out-of-range index, so check first.
        if batch.num_columns() < EXPECTED_COLUMNS {
            return Err(schema_mismatch("expected at least 5 columns"));
        }

        // The `_opt` accessors return `None` on a type mismatch, whereas the
        // plain `as_primitive` / `as_string` accessors would panic.
        let timestamps = batch
            .column(0)
            .as_primitive_opt::<UInt64Type>()
            .ok_or_else(|| schema_mismatch("column 0 is not UInt64"))?;
        let symbols = batch
            .column(1)
            .as_string_opt::<i32>()
            .ok_or_else(|| schema_mismatch("column 1 is not Utf8"))?;
        let oi_values = batch
            .column(2)
            .as_primitive_opt::<Float64Type>()
            .ok_or_else(|| schema_mismatch("column 2 is not Float64"))?;
        let oi_usd_values = batch
            .column(3)
            .as_primitive_opt::<Float64Type>()
            .ok_or_else(|| schema_mismatch("column 3 is not Float64"))?;
        let exchanges = batch
            .column(4)
            .as_string_opt::<i32>()
            .ok_or_else(|| schema_mismatch("column 4 is not Utf8"))?;

        let rows = batch.num_rows();
        records.reserve(rows);
        for i in 0..rows {
            records.push(OpenInterest {
                open_interest_ts: timestamps.value(i),
                symbol: symbols.value(i).to_string(),
                open_interest: oi_values.value(i),
                open_interest_value: oi_usd_values.value(i),
                exchange: exchanges.value(i).to_string(),
            });
        }
    }
    Ok(records)
}
/// Stand-in for `read_oi_parquet` used when the `parquet` feature is disabled.
///
/// # Errors
///
/// Always returns [`PersistError::UnsupportedFormat`]; the path is never opened.
#[cfg(not(feature = "parquet"))]
pub fn read_oi_parquet(_path: &Path) -> Result<Vec<OpenInterest>, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Writes `records` to a freshly named Parquet file inside `output_dir` and
/// returns the path that was written.
///
/// The file name is `{exchange}_{symbol}_oi_{mode}_{timestamp}.parquet`:
/// * `exchange` and `symbol` come from the first record, or `"unknown"` when
///   `records` is empty; any `/` in the symbol is replaced with `-`.
/// * `timestamp` is the current UTC time formatted as `%Y%m%d_%H%M%S%.3f`.
///
/// # Errors
///
/// Propagates any error returned by `write_oi_parquet`.
#[cfg(feature = "parquet")]
pub fn write_oi_parquet_timestamped(
    records: &[OpenInterest],
    output_dir: &Path,
    mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    // Both name components are taken from the first record in one lookup.
    let (exchange, raw_symbol) = match records.first() {
        Some(head) => (head.exchange.as_str(), head.symbol.as_str()),
        None => ("unknown", "unknown"),
    };

    // A '/' in the symbol would otherwise be treated as a path separator.
    let safe_symbol = raw_symbol.replace('/', "-");
    let stamp = chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f");

    let path = output_dir.join(format!(
        "{}_{}_oi_{}_{}.parquet",
        exchange, safe_symbol, mode, stamp
    ));

    write_oi_parquet(records, &path)?;
    Ok(path)
}
/// Stand-in for `write_oi_parquet_timestamped` used when the `parquet`
/// feature is disabled.
///
/// # Errors
///
/// Always returns [`PersistError::UnsupportedFormat`]; no file is created.
#[cfg(not(feature = "parquet"))]
pub fn write_oi_parquet_timestamped(
    _records: &[OpenInterest],
    _output_dir: &Path,
    _mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}