use crate::errors::PersistError;
use crate::funding::FundingRate;
use std::path::Path;
/// Writes `rates` to a Snappy-compressed Parquet file at `path`.
///
/// All rows are placed in a single record batch with five non-nullable
/// columns, in this order: `ts` (UInt64), `symbol` (Utf8), `funding_rate`
/// (Float64), `next_funding_ts` (UInt64) and `exchange` (Utf8).
///
/// # Errors
///
/// Propagates, via `?`, any failure from assembling the record batch,
/// creating the output file, or writing/closing the Parquet writer.
#[cfg(feature = "parquet")]
pub fn write_funding_parquet(
    rates: &[FundingRate],
    path: &Path,
) -> Result<(), PersistError> {
    use arrow::{
        array::{Float64Array, StringArray, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    // Pivot the row-oriented slice into one vector per output column.
    let ts_col: Vec<_> = rates.iter().map(|r| r.funding_rate_ts).collect();
    let symbol_col: Vec<&str> = rates.iter().map(|r| r.symbol.as_str()).collect();
    let rate_col: Vec<_> = rates.iter().map(|r| r.funding_rate).collect();
    let next_ts_col: Vec<_> = rates.iter().map(|r| r.next_funding_ts).collect();
    let exchange_col: Vec<&str> = rates.iter().map(|r| r.exchange.as_str()).collect();

    // Field order here must line up with the array order passed to the batch.
    let fields = vec![
        Field::new("ts", DataType::UInt64, false),
        Field::new("symbol", DataType::Utf8, false),
        Field::new("funding_rate", DataType::Float64, false),
        Field::new("next_funding_ts", DataType::UInt64, false),
        Field::new("exchange", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(ts_col)),
            Arc::new(StringArray::from(symbol_col)),
            Arc::new(Float64Array::from(rate_col)),
            Arc::new(UInt64Array::from(next_ts_col)),
            Arc::new(StringArray::from(exchange_col)),
        ],
    )?;

    // The file is only created once the batch has been built successfully.
    let sink = File::create(path)?;
    let writer_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    let mut parquet_writer = ArrowWriter::try_new(sink, batch.schema(), Some(writer_props))?;
    parquet_writer.write(&batch)?;
    // `close` finalises the file; its returned metadata is not needed here.
    parquet_writer.close()?;
    Ok(())
}
/// Fallback used when the crate is built without the `parquet` feature.
///
/// Ignores both arguments and always returns
/// `PersistError::UnsupportedFormat`.
#[cfg(not(feature = "parquet"))]
pub fn write_funding_parquet(
    _rates: &[FundingRate],
    _path: &Path,
) -> Result<(), PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Reads funding-rate records from the Parquet file at `path`.
///
/// Columns are read by position, matching the layout that
/// `write_funding_parquet` emits: 0 = funding timestamp (UInt64),
/// 1 = symbol (Utf8), 2 = funding rate (Float64), 3 = next funding
/// timestamp (UInt64), 4 = exchange (Utf8). Extra trailing columns are
/// ignored.
///
/// # Errors
///
/// * `PersistError::UnsupportedFormat` if a record batch has fewer than five
///   columns or one of the five columns does not have the expected Arrow
///   type (previously these cases panicked).
/// * Any error from opening the file or decoding Parquet data, propagated
///   via `?`.
#[cfg(feature = "parquet")]
pub fn read_funding_parquet(path: &Path) -> Result<Vec<FundingRate>, PersistError> {
    use arrow::array::AsArray;
    use arrow::datatypes::{Float64Type, UInt64Type};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;

    const EXPECTED_COLUMNS: usize = 5;

    // Error for a column whose Arrow type is not the one this reader expects.
    let schema_mismatch = |index: usize, expected: &str| {
        PersistError::UnsupportedFormat(format!(
            "funding parquet column {} is not of type {}",
            index, expected
        ))
    };

    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let reader = builder.build()?;

    let mut rates = Vec::new();
    for batch_result in reader {
        let batch = batch_result?;

        // `RecordBatch::column` panics on an out-of-range index, so reject
        // files that are too narrow before touching any column.
        if batch.num_columns() < EXPECTED_COLUMNS {
            return Err(PersistError::UnsupportedFormat(format!(
                "funding parquet file has {} columns, expected at least {}",
                batch.num_columns(),
                EXPECTED_COLUMNS
            )));
        }

        // The `_opt` accessors return `None` on a type mismatch instead of
        // panicking like `as_primitive` / `as_string` do.
        let timestamps = batch
            .column(0)
            .as_primitive_opt::<UInt64Type>()
            .ok_or_else(|| schema_mismatch(0, "UInt64"))?;
        let symbols = batch
            .column(1)
            .as_string_opt::<i32>()
            .ok_or_else(|| schema_mismatch(1, "Utf8"))?;
        let funding_rates = batch
            .column(2)
            .as_primitive_opt::<Float64Type>()
            .ok_or_else(|| schema_mismatch(2, "Float64"))?;
        let next_funding_timestamps = batch
            .column(3)
            .as_primitive_opt::<UInt64Type>()
            .ok_or_else(|| schema_mismatch(3, "UInt64"))?;
        let exchanges = batch
            .column(4)
            .as_string_opt::<i32>()
            .ok_or_else(|| schema_mismatch(4, "Utf8"))?;

        // NOTE(review): null slots are not checked; this assumes the columns
        // are non-nullable, as `write_funding_parquet` declares them.
        let row_count = batch.num_rows();
        rates.reserve(row_count);
        for i in 0..row_count {
            rates.push(FundingRate {
                funding_rate_ts: timestamps.value(i),
                symbol: symbols.value(i).to_string(),
                funding_rate: funding_rates.value(i),
                next_funding_ts: next_funding_timestamps.value(i),
                exchange: exchanges.value(i).to_string(),
            });
        }
    }
    Ok(rates)
}
/// Fallback used when the crate is built without the `parquet` feature.
///
/// Ignores `_path` and always returns `PersistError::UnsupportedFormat`.
#[cfg(not(feature = "parquet"))]
pub fn read_funding_parquet(_path: &Path) -> Result<Vec<FundingRate>, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}
/// Writes `rates` into `output_dir` under a generated, timestamped file name
/// and returns the full path of the file that was written.
///
/// The name has the form `{exchange}_{symbol}_funding_{mode}_{timestamp}.parquet`:
/// * `exchange` and `symbol` come from the first element of `rates`, or are
///   both `"unknown"` when `rates` is empty;
/// * every `/` in the symbol is replaced with `-`;
/// * the timestamp is the current UTC time formatted as `%Y%m%d_%H%M%S%.3f`.
///
/// # Errors
///
/// Returns whatever error `write_funding_parquet` reports.
#[cfg(feature = "parquet")]
pub fn write_funding_parquet_timestamped(
    rates: &[FundingRate],
    output_dir: &Path,
    mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    let stamp = chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f");

    // The first record, if any, supplies the naming components.
    let (exchange, raw_symbol) = match rates.first() {
        Some(head) => (head.exchange.as_str(), head.symbol.as_str()),
        None => ("unknown", "unknown"),
    };

    let file_name = format!(
        "{}_{}_funding_{}_{}.parquet",
        exchange,
        raw_symbol.replace('/', "-"),
        mode,
        stamp
    );

    let target = output_dir.join(file_name);
    write_funding_parquet(rates, &target)?;
    Ok(target)
}
/// Fallback used when the crate is built without the `parquet` feature.
///
/// Ignores all arguments and always returns
/// `PersistError::UnsupportedFormat`.
#[cfg(not(feature = "parquet"))]
pub fn write_funding_parquet_timestamped(
    _rates: &[FundingRate],
    _output_dir: &Path,
    _mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    let reason = String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}