use crate::snapshots::MarketSnapshot;
/// One flattened, fixed-width row of market state derived from a single
/// `MarketSnapshot` (see `MarketAggregate::from_snapshot`).
///
/// Every field falls back to zero when its source section of the snapshot is
/// absent or empty (no order book, no trades, no liquidations, no funding or
/// open-interest entries).
#[derive(Debug, Clone, PartialEq)]
pub struct MarketAggregate {
/// Snapshot timestamp in nanoseconds, copied from `MarketSnapshot::ts_ns`.
pub ts_ns: u64,
/// Order-book mid price, `(best_bid + best_ask) / 2`; 0.0 if either side is empty.
pub ob_mid_price: f64,
/// Order-book spread, `best_ask - best_bid`; 0.0 if either side is empty.
pub ob_spread: f64,
/// Depth imbalance over all visible levels: `(bid_vol - ask_vol) / (bid_vol + ask_vol)`,
/// in [-1.0, 1.0]; 0.0 when the book has no volume.
pub ob_imbalance: f64,
/// Sum of trade amounts in the snapshot window.
pub trade_volume: f64,
/// Volume-weighted average trade price; 0.0 when `trade_volume` is zero.
pub trade_vwap: f64,
/// Number of trades in the snapshot window.
pub trade_count: u64,
/// Total liquidation notional (`price * amount`), both sides combined.
pub liq_notional: f64,
/// Number of liquidation events in the snapshot window.
pub liq_count: u64,
/// Liquidation-side imbalance: `(buy - sell notional) / total`, in [-1.0, 1.0];
/// 0.0 when there were no liquidations.
pub liq_imbalance: f64,
/// Latest funding rate from the snapshot; 0.0 if none present.
pub fr_rate: f64,
/// `fr_rate * 3.0 * 365.0` — annualization assumes 3 settlements/day
/// (an 8-hour funding interval).
pub fr_annualized: f64,
/// `next_funding_ts - funding_rate_ts` of the latest funding entry; 0 when
/// `next_funding_ts` is unset. Units follow the feed's timestamps —
/// presumably milliseconds; TODO confirm against the snapshot producer.
pub fr_next_settlement_delta: i64,
/// Latest open interest in contracts; 0.0 if no entry present.
pub oi_contracts: f64,
/// Latest open-interest notional value; 0.0 if no entry present.
pub oi_value: f64,
/// `oi_contracts` minus the previous aggregate's open interest
/// (the `prev_oi` argument to `from_snapshot`).
pub oi_change: f64,
}
impl MarketAggregate {
    /// Build one aggregate row from a `MarketSnapshot`.
    ///
    /// `prev_oi` is the open interest (in contracts) of the previous row and
    /// is only used to compute `oi_change`. Any missing snapshot section
    /// (empty book, no trades, no liquidations, no funding / open-interest
    /// entries) yields zeroed fields rather than an error.
    pub fn from_snapshot(snap: &MarketSnapshot, prev_oi: f64) -> Self {
        // ---- order book ----------------------------------------------------
        let mut ob_mid_price = 0.0;
        let mut ob_spread = 0.0;
        let mut ob_imbalance = 0.0;
        if let Some(book) = &snap.orderbook {
            let best_bid = book.bids.first().map_or(0.0, |lvl| lvl.price);
            let best_ask = book.asks.first().map_or(0.0, |lvl| lvl.price);
            // Mid/spread are only meaningful with a two-sided book.
            if best_bid > 0.0 && best_ask > 0.0 {
                ob_mid_price = (best_bid + best_ask) / 2.0;
                ob_spread = best_ask - best_bid;
            }
            let bid_depth: f64 = book.bids.iter().map(|lvl| lvl.volume).sum();
            let ask_depth: f64 = book.asks.iter().map(|lvl| lvl.volume).sum();
            let depth = bid_depth + ask_depth;
            if depth > 0.0 {
                ob_imbalance = (bid_depth - ask_depth) / depth;
            }
        }

        // ---- trades --------------------------------------------------------
        let trade_count = snap.trades.len() as u64;
        let trade_volume: f64 = snap.trades.iter().map(|t| t.amount).sum();
        let notional: f64 = snap.trades.iter().map(|t| t.price * t.amount).sum();
        let trade_vwap = if trade_volume > 0.0 {
            notional / trade_volume
        } else {
            0.0
        };

        // ---- liquidations --------------------------------------------------
        let liq_count = snap.liquidations.len() as u64;
        // Split notional by side; anything that is not "Buy" counts as sell.
        let (buy_liq, sell_liq) =
            snap.liquidations
                .iter()
                .fold((0.0_f64, 0.0_f64), |(buy, sell), liq| {
                    let liq_notional = liq.price * liq.amount;
                    if liq.side.as_str() == "Buy" {
                        (buy + liq_notional, sell)
                    } else {
                        (buy, sell + liq_notional)
                    }
                });
        let liq_notional = buy_liq + sell_liq;
        let liq_imbalance = if liq_notional > 0.0 {
            (buy_liq - sell_liq) / liq_notional
        } else {
            0.0
        };

        // ---- funding rate (latest entry wins) ------------------------------
        let (fr_rate, fr_annualized, fr_next_settlement_delta) =
            snap.funding_rate.last().map_or((0.0, 0.0, 0), |fr| {
                let delta = if fr.next_funding_ts > 0 {
                    fr.next_funding_ts as i64 - fr.funding_rate_ts as i64
                } else {
                    0
                };
                // 3 settlements/day * 365 days, i.e. an 8h funding interval.
                (fr.funding_rate, fr.funding_rate * 3.0 * 365.0, delta)
            });

        // ---- open interest (latest entry wins) -----------------------------
        let (oi_contracts, oi_value) = snap
            .open_interest
            .last()
            .map_or((0.0, 0.0), |oi| (oi.open_interest, oi.open_interest_value));

        Self {
            ts_ns: snap.ts_ns,
            ob_mid_price,
            ob_spread,
            ob_imbalance,
            trade_volume,
            trade_vwap,
            trade_count,
            liq_notional,
            liq_count,
            liq_imbalance,
            fr_rate,
            fr_annualized,
            fr_next_settlement_delta,
            oi_contracts,
            oi_value,
            oi_change: oi_contracts - prev_oi,
        }
    }
}
#[cfg(feature = "parquet")]
/// Write `aggregates` to a timestamped, SNAPPY-compressed parquet file in
/// `output_dir` and return the path of the file written.
///
/// Column order and names must stay in sync with
/// `read_market_aggregate_parquet`, which addresses columns by position.
pub fn write_market_aggregate_parquet(
    aggregates: &[MarketAggregate],
    output_dir: &std::path::Path,
) -> Result<std::path::PathBuf, crate::errors::PersistError> {
    use arrow::{
        array::{Float64Array, Int64Array, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    // Column extractors: one pass over `aggregates` per column. The iterator's
    // size hint pre-sizes each Vec, so no manual with_capacity is needed.
    let f64s = |get: fn(&MarketAggregate) -> f64| -> Vec<f64> {
        aggregates.iter().map(get).collect()
    };
    let u64s = |get: fn(&MarketAggregate) -> u64| -> Vec<u64> {
        aggregates.iter().map(get).collect()
    };
    let i64s = |get: fn(&MarketAggregate) -> i64| -> Vec<i64> {
        aggregates.iter().map(get).collect()
    };

    let schema = Schema::new(vec![
        Field::new("ts_ns", DataType::UInt64, false),
        Field::new("ob_mid_price", DataType::Float64, false),
        Field::new("ob_spread", DataType::Float64, false),
        Field::new("ob_imbalance", DataType::Float64, false),
        Field::new("trade_volume", DataType::Float64, false),
        Field::new("trade_vwap", DataType::Float64, false),
        Field::new("trade_count", DataType::UInt64, false),
        Field::new("liq_notional", DataType::Float64, false),
        Field::new("liq_count", DataType::UInt64, false),
        Field::new("liq_imbalance", DataType::Float64, false),
        Field::new("fr_rate", DataType::Float64, false),
        Field::new("fr_annualized", DataType::Float64, false),
        Field::new("fr_next_settlement_delta", DataType::Int64, false),
        Field::new("oi_contracts", DataType::Float64, false),
        Field::new("oi_value", DataType::Float64, false),
        Field::new("oi_change", DataType::Float64, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(UInt64Array::from(u64s(|a| a.ts_ns))),
            Arc::new(Float64Array::from(f64s(|a| a.ob_mid_price))),
            Arc::new(Float64Array::from(f64s(|a| a.ob_spread))),
            Arc::new(Float64Array::from(f64s(|a| a.ob_imbalance))),
            Arc::new(Float64Array::from(f64s(|a| a.trade_volume))),
            Arc::new(Float64Array::from(f64s(|a| a.trade_vwap))),
            Arc::new(UInt64Array::from(u64s(|a| a.trade_count))),
            Arc::new(Float64Array::from(f64s(|a| a.liq_notional))),
            Arc::new(UInt64Array::from(u64s(|a| a.liq_count))),
            Arc::new(Float64Array::from(f64s(|a| a.liq_imbalance))),
            Arc::new(Float64Array::from(f64s(|a| a.fr_rate))),
            Arc::new(Float64Array::from(f64s(|a| a.fr_annualized))),
            Arc::new(Int64Array::from(i64s(|a| a.fr_next_settlement_delta))),
            Arc::new(Float64Array::from(f64s(|a| a.oi_contracts))),
            Arc::new(Float64Array::from(f64s(|a| a.oi_value))),
            Arc::new(Float64Array::from(f64s(|a| a.oi_change))),
        ],
    )?;

    // Millisecond-resolution timestamp keeps successive writes from colliding.
    let file_ts = chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f");
    let filename = format!("market_aggregate_{}.parquet", file_ts);
    let path = output_dir.join(filename);

    let file = File::create(&path)?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(path)
}
#[cfg(not(feature = "parquet"))]
/// Stub compiled when the `parquet` feature is disabled; always fails with
/// `PersistError::UnsupportedFormat`.
pub fn write_market_aggregate_parquet(
    _aggregates: &[MarketAggregate],
    _output_dir: &std::path::Path,
) -> Result<std::path::PathBuf, crate::errors::PersistError> {
    let reason =
        String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(crate::errors::PersistError::UnsupportedFormat(reason))
}
#[cfg(feature = "parquet")]
/// Read a parquet file previously produced by `write_market_aggregate_parquet`
/// back into a `Vec<MarketAggregate>`.
///
/// Columns are addressed by position, so the file's column order must match
/// the schema used by the writer. A type mismatch in any column will panic
/// inside arrow's `as_primitive` downcast.
pub fn read_market_aggregate_parquet(
    path: &std::path::Path,
) -> Result<Vec<MarketAggregate>, crate::errors::PersistError> {
    use arrow::array::AsArray;
    use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;

    let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(path)?)?.build()?;
    let mut rows = Vec::new();
    for batch in reader {
        let batch = batch?;
        // Downcast every column once per batch, then iterate rows.
        let ts_ns = batch.column(0).as_primitive::<UInt64Type>();
        let ob_mid_price = batch.column(1).as_primitive::<Float64Type>();
        let ob_spread = batch.column(2).as_primitive::<Float64Type>();
        let ob_imbalance = batch.column(3).as_primitive::<Float64Type>();
        let trade_volume = batch.column(4).as_primitive::<Float64Type>();
        let trade_vwap = batch.column(5).as_primitive::<Float64Type>();
        let trade_count = batch.column(6).as_primitive::<UInt64Type>();
        let liq_notional = batch.column(7).as_primitive::<Float64Type>();
        let liq_count = batch.column(8).as_primitive::<UInt64Type>();
        let liq_imbalance = batch.column(9).as_primitive::<Float64Type>();
        let fr_rate = batch.column(10).as_primitive::<Float64Type>();
        let fr_annualized = batch.column(11).as_primitive::<Float64Type>();
        let fr_delta = batch.column(12).as_primitive::<Int64Type>();
        let oi_contracts = batch.column(13).as_primitive::<Float64Type>();
        let oi_value = batch.column(14).as_primitive::<Float64Type>();
        let oi_change = batch.column(15).as_primitive::<Float64Type>();
        for row in 0..batch.num_rows() {
            rows.push(MarketAggregate {
                ts_ns: ts_ns.value(row),
                ob_mid_price: ob_mid_price.value(row),
                ob_spread: ob_spread.value(row),
                ob_imbalance: ob_imbalance.value(row),
                trade_volume: trade_volume.value(row),
                trade_vwap: trade_vwap.value(row),
                trade_count: trade_count.value(row),
                liq_notional: liq_notional.value(row),
                liq_count: liq_count.value(row),
                liq_imbalance: liq_imbalance.value(row),
                fr_rate: fr_rate.value(row),
                fr_annualized: fr_annualized.value(row),
                fr_next_settlement_delta: fr_delta.value(row),
                oi_contracts: oi_contracts.value(row),
                oi_value: oi_value.value(row),
                oi_change: oi_change.value(row),
            });
        }
    }
    Ok(rows)
}
#[cfg(not(feature = "parquet"))]
/// Stub compiled when the `parquet` feature is disabled; always fails with
/// `PersistError::UnsupportedFormat`.
pub fn read_market_aggregate_parquet(
    _path: &std::path::Path,
) -> Result<Vec<MarketAggregate>, crate::errors::PersistError> {
    let reason =
        String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(crate::errors::PersistError::UnsupportedFormat(reason))
}