use crate::errors::ProcessingError;
use crate::fixed_point::FixedPoint;
use crate::trade::Tick;
use crate::types::OpenDeviationBar;
#[derive(Debug, Clone)]
pub(crate) struct InternalOpenDeviationBar {
open_time: i64,
close_time: i64,
open: FixedPoint,
high: FixedPoint,
low: FixedPoint,
close: FixedPoint,
volume: i128,
turnover: i128,
individual_trade_count: i64,
agg_record_count: u32,
first_sub_id: i64,
last_sub_id: i64,
first_ref_id: i64,
last_ref_id: i64,
buy_volume: i128,
sell_volume: i128,
buy_trade_count: i64,
sell_trade_count: i64,
vwap: FixedPoint,
buy_turnover: i128,
sell_turnover: i128,
}
pub struct ExportOpenDeviationBarProcessor {
threshold_decimal_bps: u32,
current_bar: Option<InternalOpenDeviationBar>,
completed_bars: Vec<OpenDeviationBar>,
completed_bars_pool: Option<Vec<OpenDeviationBar>>,
prevent_same_timestamp_close: bool,
defer_open: bool,
}
impl ExportOpenDeviationBarProcessor {
pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
Self::with_options(threshold_decimal_bps, true)
}
pub fn with_options(
threshold_decimal_bps: u32,
prevent_same_timestamp_close: bool,
) -> Result<Self, ProcessingError> {
if threshold_decimal_bps < 1 {
return Err(ProcessingError::InvalidThreshold {
threshold_decimal_bps,
});
}
if threshold_decimal_bps > 100_000 {
return Err(ProcessingError::InvalidThreshold {
threshold_decimal_bps,
});
}
Ok(Self {
threshold_decimal_bps,
current_bar: None,
completed_bars: Vec::new(),
completed_bars_pool: None,
prevent_same_timestamp_close,
defer_open: false,
})
}
pub fn process_trades_continuously(&mut self, trades: &[Tick]) {
for trade in trades {
self.process_single_trade_fixed_point(trade);
}
}
fn process_single_trade_fixed_point(&mut self, trade: &Tick) {
if self.defer_open {
self.defer_open = false;
self.current_bar = None; }
if self.current_bar.is_none() {
let trade_turnover = trade.turnover();
let vol = trade.volume.0 as i128;
let (buy_vol, sell_vol, buy_count, sell_count, buy_turn, sell_turn) =
if trade.is_buyer_maker {
(0i128, vol, 0i64, 1i64, 0i128, trade_turnover)
} else {
(vol, 0i128, 1i64, 0i64, trade_turnover, 0i128)
};
self.current_bar = Some(InternalOpenDeviationBar {
open_time: trade.timestamp,
close_time: trade.timestamp,
open: trade.price,
high: trade.price,
low: trade.price,
close: trade.price,
volume: vol,
turnover: trade_turnover,
individual_trade_count: 1,
agg_record_count: 1,
first_sub_id: trade.first_sub_id,
last_sub_id: trade.last_sub_id,
first_ref_id: trade.ref_id,
last_ref_id: trade.ref_id,
buy_volume: buy_vol,
sell_volume: sell_vol,
buy_trade_count: buy_count,
sell_trade_count: sell_count,
vwap: trade.price,
buy_turnover: buy_turn,
sell_turnover: sell_turn,
});
return;
}
let bar = self.current_bar.as_mut().unwrap();
let trade_turnover = trade.turnover();
let price_val = trade.price.0;
let bar_open_val = bar.open.0;
let threshold_decimal_bps = self.threshold_decimal_bps as i64;
let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
if trade.timestamp > bar.close_time {
bar.close_time = trade.timestamp;
}
bar.close = trade.price;
bar.volume += trade.volume.0 as i128; bar.turnover += trade_turnover;
bar.individual_trade_count += 1;
bar.agg_record_count += 1;
if trade.last_sub_id > bar.last_sub_id {
bar.last_sub_id = trade.last_sub_id;
}
if trade.ref_id > bar.last_ref_id {
bar.last_ref_id = trade.ref_id; }
if price_val > bar.high.0 {
bar.high = trade.price;
}
if price_val < bar.low.0 {
bar.low = trade.price;
}
if trade.is_buyer_maker {
bar.sell_volume += trade.volume.0 as i128; bar.sell_turnover += trade_turnover;
bar.sell_trade_count += 1;
} else {
bar.buy_volume += trade.volume.0 as i128; bar.buy_turnover += trade_turnover;
bar.buy_trade_count += 1;
}
let price_breaches = price_val >= upper_threshold || price_val <= lower_threshold;
let timestamp_allows_close =
!self.prevent_same_timestamp_close || trade.timestamp != bar.open_time;
if price_breaches && timestamp_allows_close {
let completed_bar = self.current_bar.take().unwrap();
let mut export_bar = OpenDeviationBar {
open_time: completed_bar.open_time,
close_time: completed_bar.close_time,
open: completed_bar.open,
high: completed_bar.high,
low: completed_bar.low,
close: completed_bar.close,
volume: completed_bar.volume,
turnover: completed_bar.turnover,
individual_trade_count: completed_bar.individual_trade_count as u32,
agg_record_count: completed_bar.agg_record_count,
first_trade_id: completed_bar.first_sub_id,
last_trade_id: completed_bar.last_sub_id,
first_agg_trade_id: completed_bar.first_ref_id, last_agg_trade_id: completed_bar.last_ref_id,
buy_volume: completed_bar.buy_volume,
sell_volume: completed_bar.sell_volume,
buy_trade_count: completed_bar.buy_trade_count as u32,
sell_trade_count: completed_bar.sell_trade_count as u32,
vwap: completed_bar.vwap,
buy_turnover: completed_bar.buy_turnover,
sell_turnover: completed_bar.sell_turnover,
..Default::default() };
export_bar.compute_microstructure_features();
self.completed_bars.push(export_bar);
self.current_bar = None;
self.defer_open = true;
}
}
pub fn get_all_completed_bars(&mut self) -> Vec<OpenDeviationBar> {
let mut result = if let Some(mut pool_vec) = self.completed_bars_pool.take() {
pool_vec.clear();
pool_vec
} else {
Vec::new()
};
std::mem::swap(&mut result, &mut self.completed_bars);
self.completed_bars_pool = Some(std::mem::take(&mut self.completed_bars));
result
}
pub fn get_incomplete_bar(&mut self) -> Option<OpenDeviationBar> {
self.current_bar.as_ref().map(|incomplete| {
let mut bar = OpenDeviationBar {
open_time: incomplete.open_time,
close_time: incomplete.close_time,
open: incomplete.open,
high: incomplete.high,
low: incomplete.low,
close: incomplete.close,
volume: incomplete.volume,
turnover: incomplete.turnover,
individual_trade_count: incomplete.individual_trade_count as u32,
agg_record_count: incomplete.agg_record_count,
first_trade_id: incomplete.first_sub_id,
last_trade_id: incomplete.last_sub_id,
first_agg_trade_id: incomplete.first_ref_id,
last_agg_trade_id: incomplete.last_ref_id,
data_source: crate::types::DataSource::default(),
buy_volume: incomplete.buy_volume,
sell_volume: incomplete.sell_volume,
buy_trade_count: incomplete.buy_trade_count as u32,
sell_trade_count: incomplete.sell_trade_count as u32,
vwap: incomplete.vwap,
buy_turnover: incomplete.buy_turnover,
sell_turnover: incomplete.sell_turnover,
..Default::default()
};
bar.compute_microstructure_features();
bar
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::create_test_agg_trade_with_range;
fn buy_trade(id: i64, price: &str, vol: &str, ts: i64) -> Tick {
create_test_agg_trade_with_range(id, price, vol, ts, id * 10, id * 10, false)
}
fn sell_trade(id: i64, price: &str, vol: &str, ts: i64) -> Tick {
create_test_agg_trade_with_range(id, price, vol, ts, id * 10, id * 10, true)
}
#[test]
fn test_new_valid_threshold() {
let proc = ExportOpenDeviationBarProcessor::new(250);
assert!(proc.is_ok());
}
#[test]
fn test_new_invalid_threshold_zero() {
match ExportOpenDeviationBarProcessor::new(0) {
Err(ProcessingError::InvalidThreshold {
threshold_decimal_bps: 0,
}) => {}
Err(e) => panic!("Expected InvalidThreshold(0), got error: {e}"),
Ok(_) => panic!("Expected error for threshold 0"),
}
}
#[test]
fn test_new_invalid_threshold_too_high() {
let proc = ExportOpenDeviationBarProcessor::new(100_001);
assert!(proc.is_err());
}
#[test]
fn test_new_boundary_thresholds() {
assert!(ExportOpenDeviationBarProcessor::new(1).is_ok());
assert!(ExportOpenDeviationBarProcessor::new(100_000).is_ok());
}
#[test]
fn test_with_options_timestamp_gating() {
let proc = ExportOpenDeviationBarProcessor::with_options(250, false);
assert!(proc.is_ok());
}
#[test]
fn test_single_trade_no_bar_completion() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![buy_trade(1, "100.0", "1.0", 1000)];
proc.process_trades_continuously(&trades);
let completed = proc.get_all_completed_bars();
assert_eq!(completed.len(), 0, "Single trade should not complete a bar");
let incomplete = proc.get_incomplete_bar();
assert!(incomplete.is_some(), "Should have an incomplete bar");
let bar = incomplete.unwrap();
assert_eq!(bar.open, FixedPoint::from_str("100.0").unwrap());
assert_eq!(bar.close, FixedPoint::from_str("100.0").unwrap());
}
#[test]
fn test_breach_completes_bar() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
buy_trade(1, "100.0", "1.0", 1000),
buy_trade(2, "100.10", "1.0", 2000),
buy_trade(3, "100.25", "1.0", 3000), ];
proc.process_trades_continuously(&trades);
let completed = proc.get_all_completed_bars();
assert_eq!(completed.len(), 1, "Breach should complete one bar");
let bar = &completed[0];
assert_eq!(bar.open, FixedPoint::from_str("100.0").unwrap());
assert_eq!(bar.close, FixedPoint::from_str("100.25").unwrap());
assert_eq!(bar.high, FixedPoint::from_str("100.25").unwrap());
assert_eq!(bar.low, FixedPoint::from_str("100.0").unwrap());
}
#[test]
fn test_defer_open_semantics() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
buy_trade(1, "100.0", "1.0", 1000),
buy_trade(2, "100.25", "1.0", 2000), buy_trade(3, "100.50", "1.0", 3000), ];
proc.process_trades_continuously(&trades);
let completed = proc.get_all_completed_bars();
assert_eq!(completed.len(), 1);
assert_eq!(completed[0].open, FixedPoint::from_str("100.0").unwrap());
assert_eq!(completed[0].close, FixedPoint::from_str("100.25").unwrap());
let incomplete = proc.get_incomplete_bar();
assert!(incomplete.is_some());
let bar2 = incomplete.unwrap();
assert_eq!(
bar2.open,
FixedPoint::from_str("100.50").unwrap(),
"Bar 2 should open at trade 3's price, not the breaching trade"
);
}
#[test]
fn test_timestamp_gate_prevents_same_ts_close() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
buy_trade(1, "100.0", "1.0", 1000),
buy_trade(2, "100.30", "1.0", 1000), ];
proc.process_trades_continuously(&trades);
let completed = proc.get_all_completed_bars();
assert_eq!(
completed.len(),
0,
"Timestamp gate should prevent close on same ms"
);
}
#[test]
fn test_timestamp_gate_disabled() {
let mut proc = ExportOpenDeviationBarProcessor::with_options(250, false).unwrap();
let trades = vec![
buy_trade(1, "100.0", "1.0", 1000),
buy_trade(2, "100.30", "1.0", 1000), ];
proc.process_trades_continuously(&trades);
let completed = proc.get_all_completed_bars();
assert_eq!(
completed.len(),
1,
"With gating disabled, same-ts breach should close"
);
}
#[test]
fn test_get_all_completed_bars_drains() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
buy_trade(1, "100.0", "1.0", 1000),
buy_trade(2, "100.25", "1.0", 2000), ];
proc.process_trades_continuously(&trades);
let bars1 = proc.get_all_completed_bars();
assert_eq!(bars1.len(), 1);
let bars2 = proc.get_all_completed_bars();
assert_eq!(bars2.len(), 0, "get_all_completed_bars should drain buffer");
}
#[test]
fn test_vec_reuse_pool() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
proc.process_trades_continuously(&[
buy_trade(1, "100.0", "1.0", 1000),
buy_trade(2, "100.25", "1.0", 2000),
]);
let _bars1 = proc.get_all_completed_bars();
proc.process_trades_continuously(&[
sell_trade(3, "100.50", "1.0", 3000),
sell_trade(4, "100.75", "1.0", 4000),
sell_trade(5, "100.24", "1.0", 5000), ]);
let bars2 = proc.get_all_completed_bars();
assert_eq!(bars2.len(), 1);
}
#[test]
fn test_buy_sell_volume_segregation() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
buy_trade(1, "100.0", "2.0", 1000), sell_trade(2, "100.05", "3.0", 2000), buy_trade(3, "100.25", "1.0", 3000), ];
proc.process_trades_continuously(&trades);
let bars = proc.get_all_completed_bars();
assert_eq!(bars.len(), 1);
let bar = &bars[0];
let buy_vol = bar.buy_volume;
let sell_vol = bar.sell_volume;
assert_eq!(
buy_vol, 300_000_000,
"Buy volume should be 3.0 in FixedPoint i128"
);
assert_eq!(
sell_vol, 300_000_000,
"Sell volume should be 3.0 in FixedPoint i128"
);
}
#[test]
fn test_trade_id_tracking() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
create_test_agg_trade_with_range(100, "100.0", "1.0", 1000, 1000, 1005, false),
create_test_agg_trade_with_range(101, "100.10", "1.0", 2000, 1006, 1010, true),
create_test_agg_trade_with_range(102, "100.25", "1.0", 3000, 1011, 1015, false), ];
proc.process_trades_continuously(&trades);
let bars = proc.get_all_completed_bars();
assert_eq!(bars.len(), 1);
let bar = &bars[0];
assert_eq!(bar.first_agg_trade_id, 100);
assert_eq!(bar.last_agg_trade_id, 102);
assert_eq!(bar.first_trade_id, 1000);
assert_eq!(bar.last_trade_id, 1015);
}
#[test]
fn test_microstructure_features_computed() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
buy_trade(1, "100.0", "5.0", 1000),
sell_trade(2, "100.10", "3.0", 2000),
buy_trade(3, "100.25", "2.0", 3000), ];
proc.process_trades_continuously(&trades);
let bars = proc.get_all_completed_bars();
let bar = &bars[0];
assert!(bar.ofi != 0.0, "OFI should be computed");
assert!(bar.trade_intensity > 0.0, "Trade intensity should be > 0");
assert!(bar.volume_per_trade > 0.0, "Volume per trade should be > 0");
}
#[test]
fn test_incomplete_bar_has_microstructure() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
proc.process_trades_continuously(&[
buy_trade(1, "100.0", "5.0", 1000),
sell_trade(2, "100.10", "3.0", 2000),
]);
let incomplete = proc.get_incomplete_bar().unwrap();
assert!(
incomplete.volume_per_trade > 0.0,
"Incomplete bar should have microstructure features"
);
}
#[test]
fn test_multiple_bars_sequence() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
buy_trade(1, "100.0", "1.0", 1000),
buy_trade(2, "100.25", "1.0", 2000), buy_trade(3, "100.50", "1.0", 3000), buy_trade(4, "100.76", "1.0", 4000), ];
proc.process_trades_continuously(&trades);
let bars = proc.get_all_completed_bars();
assert_eq!(bars.len(), 2, "Should produce 2 complete bars");
assert_eq!(bars[0].open, FixedPoint::from_str("100.0").unwrap());
assert_eq!(bars[1].open, FixedPoint::from_str("100.50").unwrap());
}
#[test]
fn test_downward_breach() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
let trades = vec![
sell_trade(1, "100.0", "1.0", 1000),
sell_trade(2, "99.75", "1.0", 2000), ];
proc.process_trades_continuously(&trades);
let bars = proc.get_all_completed_bars();
assert_eq!(bars.len(), 1);
assert_eq!(bars[0].close, FixedPoint::from_str("99.75").unwrap());
}
#[test]
fn test_empty_trades_no_op() {
let mut proc = ExportOpenDeviationBarProcessor::new(250).unwrap();
proc.process_trades_continuously(&[]);
assert_eq!(proc.get_all_completed_bars().len(), 0);
assert!(proc.get_incomplete_bar().is_none());
}
#[test]
fn test_parity_with_open_deviation_bar_processor() {
use crate::processor::OpenDeviationBarProcessor;
for threshold in [250, 500, 1000] {
let trades: Vec<Tick> = (0..20)
.map(|i| {
let price = format!("{:.8}", 100.0 + (i as f64 * 0.15));
buy_trade(i + 1, &price, "1.0", 1000 + i * 1000)
})
.collect();
let mut main_proc = OpenDeviationBarProcessor::new(threshold).unwrap();
let main_bars = main_proc.process_agg_trade_records(&trades).unwrap();
let mut export_proc = ExportOpenDeviationBarProcessor::new(threshold).unwrap();
export_proc.process_trades_continuously(&trades);
let export_bars = export_proc.get_all_completed_bars();
assert_eq!(
main_bars.len(),
export_bars.len(),
"threshold={threshold}: bar count mismatch: main={} export={}",
main_bars.len(),
export_bars.len()
);
for (i, (m, e)) in main_bars.iter().zip(export_bars.iter()).enumerate() {
assert_eq!(m.open, e.open, "t={threshold} bar={i}: open mismatch");
assert_eq!(m.high, e.high, "t={threshold} bar={i}: high mismatch");
assert_eq!(m.low, e.low, "t={threshold} bar={i}: low mismatch");
assert_eq!(m.close, e.close, "t={threshold} bar={i}: close mismatch");
assert_eq!(m.volume, e.volume, "t={threshold} bar={i}: volume mismatch");
assert_eq!(
m.open_time, e.open_time,
"t={threshold} bar={i}: open_time mismatch"
);
assert_eq!(
m.close_time, e.close_time,
"t={threshold} bar={i}: close_time mismatch"
);
assert_eq!(
m.individual_trade_count, e.individual_trade_count,
"t={threshold} bar={i}: trade_count mismatch"
);
assert_eq!(
m.first_agg_trade_id, e.first_agg_trade_id,
"t={threshold} bar={i}: first_agg_trade_id mismatch"
);
assert_eq!(
m.last_agg_trade_id, e.last_agg_trade_id,
"t={threshold} bar={i}: last_agg_trade_id mismatch"
);
}
}
}
}