use arrow_array::{
Array, BooleanArray, Float64Array, Int64Array, RecordBatch,
builder::{BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, UInt32Builder},
};
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
use crate::fixed_point::SCALE;
use crate::processor::OpenDeviationBarProcessor;
use crate::timestamp::TimestampUnit;
use crate::trade::Tick;
use crate::types::{DataSource, OpenDeviationBar};
/// Errors returned by the Arrow import functions in this module.
#[derive(Debug, thiserror::Error)]
pub enum ArrowImportError {
/// A column that must be present in the batch was not found.
#[error("Missing required column '{column}'")]
MissingColumn { column: &'static str },
/// A column was found but could not be downcast to the expected Arrow array type.
/// `actual` carries the `Debug` rendering of the column's real data type.
#[error("Column '{column}': expected {expected}, got {actual}")]
TypeMismatch {
column: &'static str,
expected: &'static str,
actual: String,
},
/// The bar processor rejected a trade; the payload is the processor error
/// rendered with `to_string()`.
#[error("Processing error: {0}")]
ProcessingError(String),
}
pub fn record_batch_to_ticks(
batch: &RecordBatch,
timestamp_unit: TimestampUnit,
) -> Result<Vec<Tick>, ArrowImportError> {
let num_rows = batch.num_rows();
if num_rows == 0 {
return Ok(Vec::new());
}
let timestamp_col = get_int64_column(batch, "timestamp")?;
let price_col = get_float64_column(batch, "price")?;
let volume_col = match batch.column_by_name("quantity") {
Some(col) => col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
ArrowImportError::TypeMismatch {
column: "quantity",
expected: "Float64",
actual: format!("{:?}", col.data_type()),
}
})?,
None => match batch.column_by_name("volume") {
Some(col) => col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
ArrowImportError::TypeMismatch {
column: "volume",
expected: "Float64",
actual: format!("{:?}", col.data_type()),
}
})?,
None => return Err(ArrowImportError::MissingColumn { column: "quantity" }),
},
};
let agg_trade_id_col = get_optional_int64_column(batch, "ref_id")
.or_else(|| get_optional_int64_column(batch, "agg_trade_id"));
let first_trade_id_col = get_optional_int64_column(batch, "first_sub_id")
.or_else(|| get_optional_int64_column(batch, "first_trade_id"));
let last_trade_id_col = get_optional_int64_column(batch, "last_sub_id")
.or_else(|| get_optional_int64_column(batch, "last_trade_id"));
let is_buyer_maker_col = get_optional_boolean_column(batch, "is_buyer_maker");
let is_best_match_col = get_optional_boolean_column(batch, "is_best_match");
let best_bid_col = get_optional_float64_column(batch, "best_bid");
let best_ask_col = get_optional_float64_column(batch, "best_ask");
let mut trades = Vec::with_capacity(num_rows);
let timestamp_iter = timestamp_col.iter();
let price_iter = price_col.iter();
let volume_iter = volume_col.iter();
for (i, ((timestamp_ms, price), volume)) in
timestamp_iter.zip(price_iter).zip(volume_iter).enumerate()
{
let timestamp_ms = timestamp_ms.expect("timestamp column has non-null rows");
let price = price.expect("price column has non-null rows");
let volume = volume.expect("volume column has non-null rows");
let agg_trade_id = agg_trade_id_col.map(|col| col.value(i)).unwrap_or(i as i64);
let first_trade_id = first_trade_id_col
.map(|col| col.value(i))
.unwrap_or(agg_trade_id);
let last_trade_id = last_trade_id_col
.map(|col| col.value(i))
.unwrap_or(agg_trade_id);
let is_buyer_maker = is_buyer_maker_col.map(|col| col.value(i)).unwrap_or(false);
let is_best_match = is_best_match_col.and_then(|col| {
if col.is_null(i) {
None
} else {
Some(col.value(i))
}
});
let timestamp_us = timestamp_unit.to_microseconds(timestamp_ms);
trades.push(Tick {
ref_id: agg_trade_id,
price: f64_to_fixed_point(price),
volume: f64_to_fixed_point(volume),
first_sub_id: first_trade_id,
last_sub_id: last_trade_id,
timestamp: timestamp_us,
is_buyer_maker,
is_best_match,
best_bid: best_bid_col.and_then(|col| {
if col.is_null(i) { None } else { Some(f64_to_fixed_point(col.value(i))) }
}),
best_ask: best_ask_col.and_then(|col| {
if col.is_null(i) { None } else { Some(f64_to_fixed_point(col.value(i))) }
}),
});
}
Ok(trades)
}
pub fn process_from_arrow_columns(
processor: &mut OpenDeviationBarProcessor,
batch: &RecordBatch,
timestamp_unit: TimestampUnit,
) -> Result<Vec<OpenDeviationBar>, ArrowImportError> {
let num_rows = batch.num_rows();
if num_rows == 0 {
return Ok(Vec::new());
}
let timestamp_col = get_int64_column(batch, "timestamp")?;
let price_col = get_float64_column(batch, "price")?;
let volume_col = match batch.column_by_name("quantity") {
Some(col) => col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
ArrowImportError::TypeMismatch {
column: "quantity",
expected: "Float64",
actual: format!("{:?}", col.data_type()),
}
})?,
None => match batch.column_by_name("volume") {
Some(col) => col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
ArrowImportError::TypeMismatch {
column: "volume",
expected: "Float64",
actual: format!("{:?}", col.data_type()),
}
})?,
None => return Err(ArrowImportError::MissingColumn { column: "quantity" }),
},
};
let agg_trade_id_col = get_optional_int64_column(batch, "ref_id")
.or_else(|| get_optional_int64_column(batch, "agg_trade_id"));
let first_trade_id_col = get_optional_int64_column(batch, "first_sub_id")
.or_else(|| get_optional_int64_column(batch, "first_trade_id"));
let last_trade_id_col = get_optional_int64_column(batch, "last_sub_id")
.or_else(|| get_optional_int64_column(batch, "last_trade_id"));
let is_buyer_maker_col = get_optional_boolean_column(batch, "is_buyer_maker");
let is_best_match_col = get_optional_boolean_column(batch, "is_best_match");
let best_bid_col = get_optional_float64_column(batch, "best_bid");
let best_ask_col = get_optional_float64_column(batch, "best_ask");
let ts_multiplier: i64 = if num_rows > 0 {
let first_ts = timestamp_col.value(0);
if first_ts > 1_500_000_000_000_000 {
1 } else {
1000 }
} else if timestamp_unit == TimestampUnit::Microsecond {
1
} else {
1000
};
let mut bars = Vec::with_capacity(num_rows / 100);
let timestamp_iter = timestamp_col.iter();
let price_iter = price_col.iter();
let volume_iter = volume_col.iter();
for (i, ((timestamp_raw, price), volume)) in
timestamp_iter.zip(price_iter).zip(volume_iter).enumerate()
{
let timestamp_raw = timestamp_raw.expect("timestamp column has non-null rows");
let price = price.expect("price column has non-null rows");
let volume = volume.expect("volume column has non-null rows");
let agg_trade_id = agg_trade_id_col.map(|col| col.value(i)).unwrap_or(i as i64);
let first_trade_id = first_trade_id_col
.map(|col| col.value(i))
.unwrap_or(agg_trade_id);
let last_trade_id = last_trade_id_col
.map(|col| col.value(i))
.unwrap_or(agg_trade_id);
let is_buyer_maker = is_buyer_maker_col.map(|col| col.value(i)).unwrap_or(false);
let is_best_match = is_best_match_col.and_then(|col| {
if col.is_null(i) {
None
} else {
Some(col.value(i))
}
});
let trade = Tick {
ref_id: agg_trade_id,
price: f64_to_fixed_point(price),
volume: f64_to_fixed_point(volume),
first_sub_id: first_trade_id,
last_sub_id: last_trade_id,
timestamp: timestamp_raw * ts_multiplier,
is_buyer_maker,
is_best_match,
best_bid: best_bid_col.and_then(|col| {
if col.is_null(i) { None } else { Some(f64_to_fixed_point(col.value(i))) }
}),
best_ask: best_ask_col.and_then(|col| {
if col.is_null(i) { None } else { Some(f64_to_fixed_point(col.value(i))) }
}),
};
match processor.process_single_trade(&trade) {
Ok(Some(bar)) => bars.push(bar),
Ok(None) => {}
Err(e) => return Err(ArrowImportError::ProcessingError(e.to_string())),
}
}
Ok(bars)
}
/// Converts a floating-point quantity into the crate's fixed-point representation.
///
/// The value is multiplied by `SCALE`, rounded to the nearest integer with
/// `f64::round`, and cast to `i64` before being wrapped in `FixedPoint`.
#[inline]
fn f64_to_fixed_point(value: f64) -> crate::fixed_point::FixedPoint {
    let scaled = value * SCALE as f64;
    let raw = scaled.round() as i64;
    crate::fixed_point::FixedPoint(raw)
}
/// Looks up the required column `name` and returns it as an `Int64Array`.
///
/// # Errors
///
/// * [`ArrowImportError::MissingColumn`] when the batch has no column called `name`.
/// * [`ArrowImportError::TypeMismatch`] when the column is not an `Int64Array`.
fn get_int64_column<'a>(
    batch: &'a RecordBatch,
    name: &'static str,
) -> Result<&'a Int64Array, ArrowImportError> {
    let array = match batch.column_by_name(name) {
        Some(array) => array,
        None => return Err(ArrowImportError::MissingColumn { column: name }),
    };
    match array.as_any().downcast_ref::<Int64Array>() {
        Some(typed) => Ok(typed),
        None => Err(ArrowImportError::TypeMismatch {
            column: name,
            expected: "Int64",
            actual: format!("{:?}", array.data_type()),
        }),
    }
}
/// Looks up the required column `name` and returns it as a `Float64Array`.
///
/// # Errors
///
/// * [`ArrowImportError::MissingColumn`] when the batch has no column called `name`.
/// * [`ArrowImportError::TypeMismatch`] when the column is not a `Float64Array`.
fn get_float64_column<'a>(
    batch: &'a RecordBatch,
    name: &'static str,
) -> Result<&'a Float64Array, ArrowImportError> {
    let array = match batch.column_by_name(name) {
        Some(array) => array,
        None => return Err(ArrowImportError::MissingColumn { column: name }),
    };
    match array.as_any().downcast_ref::<Float64Array>() {
        Some(typed) => Ok(typed),
        None => Err(ArrowImportError::TypeMismatch {
            column: name,
            expected: "Float64",
            actual: format!("{:?}", array.data_type()),
        }),
    }
}
/// Returns column `name` as an `Int64Array`, or `None` when the column is absent
/// or is not an `Int64Array`.
fn get_optional_int64_column<'a>(batch: &'a RecordBatch, name: &str) -> Option<&'a Int64Array> {
    let column = batch.column_by_name(name)?;
    column.as_any().downcast_ref::<Int64Array>()
}
/// Returns column `name` as a `BooleanArray`, or `None` when the column is absent
/// or is not a `BooleanArray`.
fn get_optional_boolean_column<'a>(batch: &'a RecordBatch, name: &str) -> Option<&'a BooleanArray> {
    let column = batch.column_by_name(name)?;
    column.as_any().downcast_ref::<BooleanArray>()
}
/// Returns column `name` as a `Float64Array`, or `None` when the column is absent
/// or is not a `Float64Array`.
fn get_optional_float64_column<'a>(batch: &'a RecordBatch, name: &str) -> Option<&'a Float64Array> {
    let column = batch.column_by_name(name)?;
    column.as_any().downcast_ref::<Float64Array>()
}
pub fn opendeviationbar_schema() -> Schema {
Schema::new(vec![
Field::new("open_time_us", DataType::Int64, false),
Field::new("close_time_us", DataType::Int64, false),
Field::new("open", DataType::Float64, false),
Field::new("high", DataType::Float64, false),
Field::new("low", DataType::Float64, false),
Field::new("close", DataType::Float64, false),
Field::new("volume", DataType::Float64, false),
Field::new("turnover", DataType::Float64, false),
Field::new("individual_trade_count", DataType::UInt32, false),
Field::new("agg_record_count", DataType::UInt32, false),
Field::new("first_sub_id", DataType::Int64, false),
Field::new("last_sub_id", DataType::Int64, false),
Field::new("first_ref_id", DataType::Int64, false),
Field::new("last_ref_id", DataType::Int64, false),
Field::new("data_source", DataType::Utf8, false),
Field::new("buy_volume", DataType::Float64, false),
Field::new("sell_volume", DataType::Float64, false),
Field::new("buy_trade_count", DataType::UInt32, false),
Field::new("sell_trade_count", DataType::UInt32, false),
Field::new("vwap", DataType::Float64, false),
Field::new("buy_turnover", DataType::Float64, false),
Field::new("sell_turnover", DataType::Float64, false),
Field::new("duration_us", DataType::Int64, false),
Field::new("ofi", DataType::Float64, false),
Field::new("vwap_close_deviation", DataType::Float64, false),
Field::new("price_impact", DataType::Float64, false),
Field::new("kyle_lambda_proxy", DataType::Float64, false),
Field::new("trade_intensity", DataType::Float64, false),
Field::new("volume_per_trade", DataType::Float64, false),
Field::new("aggression_ratio", DataType::Float64, false),
Field::new("aggregation_density", DataType::Float64, false),
Field::new("turnover_imbalance", DataType::Float64, false),
Field::new("has_gap", DataType::Boolean, false),
Field::new("gap_trade_count", DataType::Int64, false),
Field::new("max_gap_duration_us", DataType::Int64, false),
Field::new("is_exchange_gap", DataType::Boolean, false),
Field::new("spread_open", DataType::Float64, true),
Field::new("spread_high", DataType::Float64, true),
Field::new("spread_low", DataType::Float64, true),
Field::new("spread_close", DataType::Float64, true),
Field::new("twas", DataType::Float64, true),
Field::new("spread_mean", DataType::Float64, true),
Field::new("spread_variance", DataType::Float64, true),
Field::new("spread_skewness", DataType::Float64, true),
Field::new("spread_kurtosis", DataType::Float64, true),
Field::new("quote_asymmetry", DataType::Float64, true),
Field::new("bid_ask_imbalance", DataType::Float64, true),
Field::new("time_at_wide_ratio", DataType::Float64, true),
Field::new("time_at_tight_ratio", DataType::Float64, true),
Field::new("bid_open", DataType::Float64, true),
Field::new("bid_high", DataType::Float64, true),
Field::new("bid_low", DataType::Float64, true),
Field::new("bid_close", DataType::Float64, true),
Field::new("ask_open", DataType::Float64, true),
Field::new("ask_high", DataType::Float64, true),
Field::new("ask_low", DataType::Float64, true),
Field::new("ask_close", DataType::Float64, true),
Field::new("spread_kaufman_er", DataType::Float64, true),
Field::new("spread_range", DataType::Float64, true),
Field::new("spread_open_close_ratio", DataType::Float64, true),
Field::new("tick_count_with_quotes", DataType::Float64, true),
Field::new("total_quote_duration_us", DataType::Float64, true),
Field::new("spread_at_breach", DataType::Float64, true),
Field::new("quote_side_imbalance", DataType::Float64, true),
Field::new("quote_staleness_ratio", DataType::Float64, true),
Field::new("signed_tick_ratio", DataType::Float64, true),
Field::new("spread_price_correlation", DataType::Float64, true),
Field::new("tick_arrival_cv", DataType::Float64, true),
Field::new("spread_trajectory_shape", DataType::Float64, true),
Field::new("executable_spread_ratio", DataType::Float64, true),
Field::new("spread_autocorrelation", DataType::Float64, true),
Field::new("mid_momentum_ratio", DataType::Float64, true),
Field::new("worst_spread", DataType::Float64, true),
])
}
/// Converts a slice of `OpenDeviationBar`s into an Arrow `RecordBatch` whose layout
/// is given by `opendeviationbar_schema()`.
///
/// One builder per column is filled in a single pass over `bars`; the finished
/// arrays are then handed to `RecordBatch::try_new` in the same positional order as
/// the schema's fields. An empty slice produces a batch with zero rows.
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` rejects the assembled columns, e.g. when the
/// column list and the schema no longer agree.
pub fn opendeviationbar_vec_to_record_batch(bars: &[OpenDeviationBar]) -> RecordBatch {
let schema = Arc::new(opendeviationbar_schema());
let n = bars.len();
// Non-nullable columns: one pre-sized builder each.
let mut open_time = Int64Builder::with_capacity(n);
let mut close_time = Int64Builder::with_capacity(n);
let mut open = Float64Builder::with_capacity(n);
let mut high = Float64Builder::with_capacity(n);
let mut low = Float64Builder::with_capacity(n);
let mut close = Float64Builder::with_capacity(n);
let mut volume = Float64Builder::with_capacity(n);
let mut turnover = Float64Builder::with_capacity(n);
let mut individual_trade_count = UInt32Builder::with_capacity(n);
let mut agg_record_count = UInt32Builder::with_capacity(n);
let mut first_trade_id = Int64Builder::with_capacity(n);
let mut last_trade_id = Int64Builder::with_capacity(n);
let mut first_agg_trade_id = Int64Builder::with_capacity(n);
let mut last_agg_trade_id = Int64Builder::with_capacity(n);
// Reserves 16 bytes of string data per row for the data-source label.
let mut data_source = StringBuilder::with_capacity(n, n * 16);
let mut buy_volume = Float64Builder::with_capacity(n);
let mut sell_volume = Float64Builder::with_capacity(n);
let mut buy_trade_count = UInt32Builder::with_capacity(n);
let mut sell_trade_count = UInt32Builder::with_capacity(n);
let mut vwap = Float64Builder::with_capacity(n);
let mut buy_turnover = Float64Builder::with_capacity(n);
let mut sell_turnover = Float64Builder::with_capacity(n);
let mut duration_us = Int64Builder::with_capacity(n);
let mut ofi = Float64Builder::with_capacity(n);
let mut vwap_close_deviation = Float64Builder::with_capacity(n);
let mut price_impact = Float64Builder::with_capacity(n);
let mut kyle_lambda_proxy = Float64Builder::with_capacity(n);
let mut trade_intensity = Float64Builder::with_capacity(n);
let mut volume_per_trade = Float64Builder::with_capacity(n);
let mut aggression_ratio = Float64Builder::with_capacity(n);
let mut aggregation_density_f64 = Float64Builder::with_capacity(n);
let mut turnover_imbalance = Float64Builder::with_capacity(n);
let mut has_gap = BooleanBuilder::with_capacity(n);
let mut gap_trade_count = Int64Builder::with_capacity(n);
let mut max_gap_duration_us = Int64Builder::with_capacity(n);
let mut is_exchange_gap = BooleanBuilder::with_capacity(n);
// Nullable Float64 columns: filled with `append_option`, so a `None` field becomes
// an Arrow null.
let mut spread_open = Float64Builder::with_capacity(n);
let mut spread_high = Float64Builder::with_capacity(n);
let mut spread_low = Float64Builder::with_capacity(n);
let mut spread_close = Float64Builder::with_capacity(n);
let mut twas = Float64Builder::with_capacity(n);
let mut spread_mean = Float64Builder::with_capacity(n);
let mut spread_variance = Float64Builder::with_capacity(n);
let mut spread_skewness = Float64Builder::with_capacity(n);
let mut spread_kurtosis = Float64Builder::with_capacity(n);
let mut quote_asymmetry = Float64Builder::with_capacity(n);
let mut bid_ask_imbalance = Float64Builder::with_capacity(n);
let mut time_at_wide_ratio = Float64Builder::with_capacity(n);
let mut time_at_tight_ratio = Float64Builder::with_capacity(n);
let mut bid_open = Float64Builder::with_capacity(n);
let mut bid_high = Float64Builder::with_capacity(n);
let mut bid_low = Float64Builder::with_capacity(n);
let mut bid_close = Float64Builder::with_capacity(n);
let mut ask_open = Float64Builder::with_capacity(n);
let mut ask_high = Float64Builder::with_capacity(n);
let mut ask_low = Float64Builder::with_capacity(n);
let mut ask_close = Float64Builder::with_capacity(n);
let mut spread_kaufman_er = Float64Builder::with_capacity(n);
let mut spread_range = Float64Builder::with_capacity(n);
let mut spread_open_close_ratio = Float64Builder::with_capacity(n);
let mut tick_count_with_quotes = Float64Builder::with_capacity(n);
let mut total_quote_duration_us = Float64Builder::with_capacity(n);
let mut spread_at_breach = Float64Builder::with_capacity(n);
let mut quote_side_imbalance = Float64Builder::with_capacity(n);
let mut quote_staleness_ratio = Float64Builder::with_capacity(n);
let mut signed_tick_ratio = Float64Builder::with_capacity(n);
let mut spread_price_correlation = Float64Builder::with_capacity(n);
let mut tick_arrival_cv = Float64Builder::with_capacity(n);
let mut spread_trajectory_shape = Float64Builder::with_capacity(n);
let mut executable_spread_ratio = Float64Builder::with_capacity(n);
let mut spread_autocorrelation = Float64Builder::with_capacity(n);
let mut mid_momentum_ratio = Float64Builder::with_capacity(n);
let mut worst_spread = Float64Builder::with_capacity(n);
// Single pass: append one value per column for every bar.
for bar in bars {
open_time.append_value(bar.open_time);
close_time.append_value(bar.close_time);
open.append_value(bar.open.to_f64());
high.append_value(bar.high.to_f64());
low.append_value(bar.low.to_f64());
close.append_value(bar.close.to_f64());
// Volume is divided by SCALE while turnover is only cast to f64.
// NOTE(review): presumably volume is stored as a SCALE-scaled integer and turnover
// is not — confirm against the OpenDeviationBar definition.
volume.append_value(bar.volume as f64 / SCALE as f64); turnover.append_value(bar.turnover as f64);
individual_trade_count.append_value(bar.individual_trade_count);
agg_record_count.append_value(bar.agg_record_count);
first_trade_id.append_value(bar.first_trade_id);
last_trade_id.append_value(bar.last_trade_id);
first_agg_trade_id.append_value(bar.first_agg_trade_id);
last_agg_trade_id.append_value(bar.last_agg_trade_id);
// The data source is exported as its variant name.
data_source.append_value(match bar.data_source {
DataSource::BinanceSpot => "BinanceSpot",
DataSource::BinanceFuturesUM => "BinanceFuturesUM",
DataSource::BinanceFuturesCM => "BinanceFuturesCM",
});
// Buy/sell volumes get the same SCALE division as `volume` above.
buy_volume.append_value(bar.buy_volume as f64 / SCALE as f64); sell_volume.append_value(bar.sell_volume as f64 / SCALE as f64); buy_trade_count.append_value(bar.buy_trade_count);
sell_trade_count.append_value(bar.sell_trade_count);
vwap.append_value(bar.vwap.to_f64());
buy_turnover.append_value(bar.buy_turnover as f64);
sell_turnover.append_value(bar.sell_turnover as f64);
duration_us.append_value(bar.duration_us);
ofi.append_value(bar.ofi);
vwap_close_deviation.append_value(bar.vwap_close_deviation);
price_impact.append_value(bar.price_impact);
kyle_lambda_proxy.append_value(bar.kyle_lambda_proxy);
trade_intensity.append_value(bar.trade_intensity);
volume_per_trade.append_value(bar.volume_per_trade);
aggression_ratio.append_value(bar.aggression_ratio);
aggregation_density_f64.append_value(bar.aggregation_density_f64);
turnover_imbalance.append_value(bar.turnover_imbalance);
has_gap.append_value(bar.has_gap);
gap_trade_count.append_value(bar.gap_trade_count);
max_gap_duration_us.append_value(bar.max_gap_duration_us);
is_exchange_gap.append_value(bar.is_exchange_gap);
// Optional quote-derived fields: `None` is written as a null.
spread_open.append_option(bar.spread_open);
spread_high.append_option(bar.spread_high);
spread_low.append_option(bar.spread_low);
spread_close.append_option(bar.spread_close);
twas.append_option(bar.twas);
spread_mean.append_option(bar.spread_mean);
spread_variance.append_option(bar.spread_variance);
spread_skewness.append_option(bar.spread_skewness);
spread_kurtosis.append_option(bar.spread_kurtosis);
quote_asymmetry.append_option(bar.quote_asymmetry);
bid_ask_imbalance.append_option(bar.bid_ask_imbalance);
time_at_wide_ratio.append_option(bar.time_at_wide_ratio);
time_at_tight_ratio.append_option(bar.time_at_tight_ratio);
bid_open.append_option(bar.bid_open);
bid_high.append_option(bar.bid_high);
bid_low.append_option(bar.bid_low);
bid_close.append_option(bar.bid_close);
ask_open.append_option(bar.ask_open);
ask_high.append_option(bar.ask_high);
ask_low.append_option(bar.ask_low);
ask_close.append_option(bar.ask_close);
spread_kaufman_er.append_option(bar.spread_kaufman_er);
spread_range.append_option(bar.spread_range);
spread_open_close_ratio.append_option(bar.spread_open_close_ratio);
tick_count_with_quotes.append_option(bar.tick_count_with_quotes);
total_quote_duration_us.append_option(bar.total_quote_duration_us);
spread_at_breach.append_option(bar.spread_at_breach);
quote_side_imbalance.append_option(bar.quote_side_imbalance);
quote_staleness_ratio.append_option(bar.quote_staleness_ratio);
signed_tick_ratio.append_option(bar.signed_tick_ratio);
spread_price_correlation.append_option(bar.spread_price_correlation);
tick_arrival_cv.append_option(bar.tick_arrival_cv);
spread_trajectory_shape.append_option(bar.spread_trajectory_shape);
executable_spread_ratio.append_option(bar.executable_spread_ratio);
spread_autocorrelation.append_option(bar.spread_autocorrelation);
mid_momentum_ratio.append_option(bar.mid_momentum_ratio);
worst_spread.append_option(bar.worst_spread);
}
// Columns are matched to schema fields by position, so this list must stay in the
// same order as the fields returned by `opendeviationbar_schema()`.
RecordBatch::try_new(
schema,
vec![
Arc::new(open_time.finish()),
Arc::new(close_time.finish()),
Arc::new(open.finish()),
Arc::new(high.finish()),
Arc::new(low.finish()),
Arc::new(close.finish()),
Arc::new(volume.finish()),
Arc::new(turnover.finish()),
Arc::new(individual_trade_count.finish()),
Arc::new(agg_record_count.finish()),
Arc::new(first_trade_id.finish()),
Arc::new(last_trade_id.finish()),
Arc::new(first_agg_trade_id.finish()),
Arc::new(last_agg_trade_id.finish()),
Arc::new(data_source.finish()),
Arc::new(buy_volume.finish()),
Arc::new(sell_volume.finish()),
Arc::new(buy_trade_count.finish()),
Arc::new(sell_trade_count.finish()),
Arc::new(vwap.finish()),
Arc::new(buy_turnover.finish()),
Arc::new(sell_turnover.finish()),
Arc::new(duration_us.finish()),
Arc::new(ofi.finish()),
Arc::new(vwap_close_deviation.finish()),
Arc::new(price_impact.finish()),
Arc::new(kyle_lambda_proxy.finish()),
Arc::new(trade_intensity.finish()),
Arc::new(volume_per_trade.finish()),
Arc::new(aggression_ratio.finish()),
Arc::new(aggregation_density_f64.finish()),
Arc::new(turnover_imbalance.finish()),
Arc::new(has_gap.finish()),
Arc::new(gap_trade_count.finish()),
Arc::new(max_gap_duration_us.finish()),
Arc::new(is_exchange_gap.finish()),
Arc::new(spread_open.finish()),
Arc::new(spread_high.finish()),
Arc::new(spread_low.finish()),
Arc::new(spread_close.finish()),
Arc::new(twas.finish()),
Arc::new(spread_mean.finish()),
Arc::new(spread_variance.finish()),
Arc::new(spread_skewness.finish()),
Arc::new(spread_kurtosis.finish()),
Arc::new(quote_asymmetry.finish()),
Arc::new(bid_ask_imbalance.finish()),
Arc::new(time_at_wide_ratio.finish()),
Arc::new(time_at_tight_ratio.finish()),
Arc::new(bid_open.finish()),
Arc::new(bid_high.finish()),
Arc::new(bid_low.finish()),
Arc::new(bid_close.finish()),
Arc::new(ask_open.finish()),
Arc::new(ask_high.finish()),
Arc::new(ask_low.finish()),
Arc::new(ask_close.finish()),
Arc::new(spread_kaufman_er.finish()),
Arc::new(spread_range.finish()),
Arc::new(spread_open_close_ratio.finish()),
Arc::new(tick_count_with_quotes.finish()),
Arc::new(total_quote_duration_us.finish()),
Arc::new(spread_at_breach.finish()),
Arc::new(quote_side_imbalance.finish()),
Arc::new(quote_staleness_ratio.finish()),
Arc::new(signed_tick_ratio.finish()),
Arc::new(spread_price_correlation.finish()),
Arc::new(tick_arrival_cv.finish()),
Arc::new(spread_trajectory_shape.finish()),
Arc::new(executable_spread_ratio.finish()),
Arc::new(spread_autocorrelation.finish()),
Arc::new(mid_momentum_ratio.finish()),
Arc::new(worst_spread.finish()),
],
)
.expect("Failed to create RecordBatch from OpenDeviationBars")
}
pub fn tick_schema() -> Schema {
Schema::new(vec![
Field::new("ref_id", DataType::Int64, false),
Field::new("price", DataType::Float64, false),
Field::new("quantity", DataType::Float64, false),
Field::new("first_sub_id", DataType::Int64, false),
Field::new("last_sub_id", DataType::Int64, false),
Field::new("timestamp", DataType::Int64, false),
Field::new("is_buyer_maker", DataType::Boolean, false),
Field::new("is_best_match", DataType::Boolean, true), ])
}
/// Converts a slice of [`Tick`]s into an Arrow `RecordBatch` laid out according to
/// [`tick_schema`].
///
/// Prices and volumes are exported through `to_f64()`; `is_best_match` becomes a
/// null when it is `None`. The `best_bid` / `best_ask` fields of a tick are not
/// written because the schema has no columns for them.
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` rejects the assembled columns.
pub fn ticks_to_record_batch(trades: &[Tick]) -> RecordBatch {
    let row_count = trades.len();
    let mut ref_ids = Int64Builder::with_capacity(row_count);
    let mut prices = Float64Builder::with_capacity(row_count);
    let mut quantities = Float64Builder::with_capacity(row_count);
    let mut first_sub_ids = Int64Builder::with_capacity(row_count);
    let mut last_sub_ids = Int64Builder::with_capacity(row_count);
    let mut timestamps = Int64Builder::with_capacity(row_count);
    let mut buyer_maker_flags = BooleanBuilder::with_capacity(row_count);
    let mut best_match_flags = BooleanBuilder::with_capacity(row_count);

    for tick in trades {
        ref_ids.append_value(tick.ref_id);
        prices.append_value(tick.price.to_f64());
        quantities.append_value(tick.volume.to_f64());
        first_sub_ids.append_value(tick.first_sub_id);
        last_sub_ids.append_value(tick.last_sub_id);
        timestamps.append_value(tick.timestamp);
        buyer_maker_flags.append_value(tick.is_buyer_maker);
        // `None` is recorded as an Arrow null.
        best_match_flags.append_option(tick.is_best_match);
    }

    // Column order must mirror the field order of `tick_schema()`.
    RecordBatch::try_new(
        Arc::new(tick_schema()),
        vec![
            Arc::new(ref_ids.finish()),
            Arc::new(prices.finish()),
            Arc::new(quantities.finish()),
            Arc::new(first_sub_ids.finish()),
            Arc::new(last_sub_ids.finish()),
            Arc::new(timestamps.finish()),
            Arc::new(buyer_maker_flags.finish()),
            Arc::new(best_match_flags.finish()),
        ],
    )
    .expect("Failed to create RecordBatch from Ticks")
}
// Tests covering the optional `best_bid` / `best_ask` columns on both import paths:
// the streaming `process_from_arrow_columns` and the materialising
// `record_batch_to_ticks`.
#[cfg(test)]
mod tests_phase57 {
use super::*;
use arrow_schema::{Field, Schema};
// With bid/ask columns present, at least one produced bar must carry spread data.
#[test]
fn test_process_from_arrow_with_best_bid_ask() {
use crate::trade::BreachMode;
let n = 200;
// The base timestamp is above the 1.5e15 cut-off used by
// `process_from_arrow_columns`, so the values are passed through unscaled.
let base_ts: i64 = 1_704_067_200_000_000; let base_price = 1.10000_f64;
let threshold = 250_u32;
// Consecutive rows are 1_000_000 timestamp units apart.
let timestamps: Vec<i64> = (0..n).map(|i| base_ts + i as i64 * 1_000_000).collect();
let mut prices = Vec::with_capacity(n);
let mut bids = Vec::with_capacity(n);
let mut asks = Vec::with_capacity(n);
// Price ramps up by 0.00004 per row for the first 100 rows, then sits at
// `base_price - 0.005`; bid and ask straddle the price with a constant spread.
for i in 0..n {
let p = if i < 100 {
base_price + (i as f64) * 0.00004
} else {
base_price - 0.005
};
let spread = 0.00010;
prices.push(p);
bids.push(p - spread / 2.0);
asks.push(p + spread / 2.0);
}
let quantities: Vec<f64> = (0..n).map(|_| 1.0).collect();
let ref_ids: Vec<i64> = (1..=n as i64).collect();
let schema = Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Int64, false),
Field::new("price", DataType::Float64, false),
Field::new("quantity", DataType::Float64, false),
Field::new("ref_id", DataType::Int64, false),
Field::new("best_bid", DataType::Float64, false),
Field::new("best_ask", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int64Array::from(timestamps)),
Arc::new(Float64Array::from(prices)),
Arc::new(Float64Array::from(quantities)),
Arc::new(Int64Array::from(ref_ids)),
Arc::new(Float64Array::from(bids)),
Arc::new(Float64Array::from(asks)),
],
)
.unwrap();
let mut processor =
OpenDeviationBarProcessor::new_with_breach_mode(threshold, BreachMode::Portcullis)
.unwrap();
let bars =
process_from_arrow_columns(&mut processor, &batch, TimestampUnit::Microsecond)
.unwrap();
assert!(
!bars.is_empty(),
"Expected at least 1 bar from 200 trades with Portcullis mode"
);
let bar_with_spread = bars.iter().find(|b| b.spread_open.is_some());
assert!(
bar_with_spread.is_some(),
"At least one bar should have spread_open populated when bid/ask columns present"
);
}
// Without bid/ask columns, bars are still produced but carry no spread data.
#[test]
fn test_process_from_arrow_without_best_bid_ask() {
let schema = Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Int64, false),
Field::new("price", DataType::Float64, false),
Field::new("quantity", DataType::Float64, false),
Field::new("ref_id", DataType::Int64, false),
]));
let n = 50;
// Below the 1.5e15 cut-off, so `process_from_arrow_columns` multiplies by 1000.
let base_ts: i64 = 1_704_067_200_000;
let timestamps: Vec<i64> = (0..n).map(|i| base_ts + i as i64 * 100).collect();
// Price climbs by 10.0 per row for 20 rows, then stays at 49800.0.
let prices: Vec<f64> = (0..n)
.map(|i| {
if i < 20 {
50000.0 + (i as f64) * 10.0
} else {
49800.0
}
})
.collect();
let quantities: Vec<f64> = (0..n).map(|_| 0.1).collect();
let ref_ids: Vec<i64> = (1000..1000 + n as i64).collect();
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int64Array::from(timestamps)),
Arc::new(Float64Array::from(prices)),
Arc::new(Float64Array::from(quantities)),
Arc::new(Int64Array::from(ref_ids)),
],
)
.unwrap();
let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
let bars =
process_from_arrow_columns(&mut processor, &batch, TimestampUnit::Millisecond)
.unwrap();
assert!(
!bars.is_empty(),
"Expected at least 1 bar from 50 trades without bid/ask columns"
);
for bar in &bars {
assert!(
bar.spread_open.is_none(),
"spread_open should be None without bid/ask columns"
);
}
}
// A single row with bid/ask columns round-trips the quotes through fixed point.
#[test]
fn test_record_batch_to_ticks_with_best_bid_ask() {
let schema = Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Int64, false),
Field::new("price", DataType::Float64, false),
Field::new("quantity", DataType::Float64, false),
Field::new("best_bid", DataType::Float64, false),
Field::new("best_ask", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int64Array::from(vec![1_704_067_200_000_000_i64])),
Arc::new(Float64Array::from(vec![1.10050])),
Arc::new(Float64Array::from(vec![1.0])),
Arc::new(Float64Array::from(vec![1.10000])),
Arc::new(Float64Array::from(vec![1.10100])),
],
)
.unwrap();
let ticks = record_batch_to_ticks(&batch, TimestampUnit::Microsecond).unwrap();
assert_eq!(ticks.len(), 1);
assert!(ticks[0].best_bid.is_some(), "best_bid should be Some");
assert!(ticks[0].best_ask.is_some(), "best_ask should be Some");
let bid_f64 = ticks[0].best_bid.unwrap().to_f64();
let ask_f64 = ticks[0].best_ask.unwrap().to_f64();
// Compare with a tolerance because the values pass through fixed point and back.
assert!(
(bid_f64 - 1.10000).abs() < 1e-8,
"bid should be 1.10000, got {bid_f64}"
);
assert!(
(ask_f64 - 1.10100).abs() < 1e-8,
"ask should be 1.10100, got {ask_f64}"
);
}
// A single row without bid/ask columns yields a tick whose quotes are `None`.
#[test]
fn test_record_batch_to_ticks_without_best_bid_ask() {
let schema = Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Int64, false),
Field::new("price", DataType::Float64, false),
Field::new("quantity", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int64Array::from(vec![1_704_067_200_000_i64])),
Arc::new(Float64Array::from(vec![50000.0])),
Arc::new(Float64Array::from(vec![1.0])),
],
)
.unwrap();
let ticks = record_batch_to_ticks(&batch, TimestampUnit::Millisecond).unwrap();
assert_eq!(ticks.len(), 1);
assert!(
ticks[0].best_bid.is_none(),
"best_bid should be None without column"
);
assert!(
ticks[0].best_ask.is_none(),
"best_ask should be None without column"
);
}
}