use opendeviationbar_core::checkpoint::{Checkpoint, CheckpointError, PositionVerification};
use opendeviationbar_core::fixed_point::FixedPoint;
use opendeviationbar_core::interbar::InterBarConfig;
use opendeviationbar_core::processor::{
ExportOpenDeviationBarProcessor, OpenDeviationBarProcessor, ProcessingError,
};
use opendeviationbar_core::test_utils::{self, scenarios};
use opendeviationbar_core::Tick;
use opendeviationbar_core::types::OpenDeviationBar;
/// A sequence that never breaches the threshold yields no completed bar in strict mode,
/// while the `_with_incomplete` variant reports exactly one (still open) bar.
#[test]
fn test_single_bar_no_breach() {
    let trades = scenarios::no_breach_sequence(250);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Strict mode: nothing is returned without a breach.
    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(
        completed.len(),
        0,
        "Strict algorithm should not create bars without breach"
    );

    // Analysis mode: the open bar is part of the result.
    let with_partial = processor
        .process_agg_trade_records_with_incomplete(&trades)
        .unwrap();
    assert_eq!(
        with_partial.len(),
        1,
        "Analysis mode should include incomplete bar"
    );

    let partial = &with_partial[0];
    assert_eq!(partial.open.to_string(), "50000.00000000");
    assert_eq!(partial.high.to_string(), "50100.00000000");
    assert_eq!(partial.low.to_string(), "49900.00000000");
    assert_eq!(partial.close.to_string(), "49900.00000000");
}
/// An upward breach closes one bar at 50125; analysis mode additionally reports a second,
/// incomplete bar opened at 50500.
#[test]
fn test_exact_breach_upward() {
    let trades = scenarios::exact_breach_upward(250);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(
        completed.len(),
        1,
        "Strict algorithm should only return completed bars"
    );

    let closed = &completed[0];
    assert_eq!(closed.open.to_string(), "50000.00000000");
    assert_eq!(closed.close.to_string(), "50125.00000000");
    assert_eq!(closed.high.to_string(), "50125.00000000");
    assert_eq!(closed.low.to_string(), "50000.00000000");

    let with_partial = processor
        .process_agg_trade_records_with_incomplete(&trades)
        .unwrap();
    assert_eq!(
        with_partial.len(),
        2,
        "Analysis mode should include incomplete bars"
    );

    // The second entry is the bar that is still open.
    let trailing = &with_partial[1];
    assert_eq!(trailing.open.to_string(), "50500.00000000");
    assert_eq!(trailing.close.to_string(), "50500.00000000");
}
/// A downward breach closes a single bar whose close and low are both 49875.
#[test]
fn test_exact_breach_downward() {
    let trades = scenarios::exact_breach_downward(250);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(completed.len(), 1);

    let closed = &completed[0];
    assert_eq!(closed.open.to_string(), "50000.00000000");
    assert_eq!(closed.close.to_string(), "49875.00000000");
    assert_eq!(closed.high.to_string(), "50000.00000000");
    assert_eq!(closed.low.to_string(), "49875.00000000");
}
/// The large-gap scenario produces exactly one bar spanning 50000 to 51000.
#[test]
fn test_large_gap_single_bar() {
    let trades = scenarios::large_gap_sequence();
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(completed.len(), 1);

    let gap_bar = &completed[0];
    assert_eq!(gap_bar.open.to_string(), "50000.00000000");
    assert_eq!(gap_bar.close.to_string(), "51000.00000000");
    assert_eq!(gap_bar.high.to_string(), "51000.00000000");
    assert_eq!(gap_bar.low.to_string(), "50000.00000000");
}
/// Unsorted input must be rejected with `UnsortedTrades` reported at index 1.
#[test]
fn test_unsorted_trades_error() {
    let trades = scenarios::unsorted_sequence();
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let outcome = processor.process_agg_trade_records(&trades);
    assert!(outcome.is_err());

    if let Err(ProcessingError::UnsortedTrades { index, .. }) = outcome {
        assert_eq!(index, 1);
    } else {
        panic!("Expected UnsortedTrades error");
    }
}
/// With a processor built from threshold 250, the cached range thresholds around 50000
/// are 50125 (upper) and 49875 (lower).
#[test]
fn test_threshold_calculation() {
    let processor = OpenDeviationBarProcessor::new(250).unwrap();
    let reference = FixedPoint::from_str("50000.0").unwrap();

    let bounds = reference.compute_range_thresholds_cached(processor.threshold_ratio);
    let (upper, lower) = bounds;

    assert_eq!(upper.to_string(), "50125.00000000");
    assert_eq!(lower.to_string(), "49875.00000000");
}
/// Processing the empty scenario succeeds and returns no bars.
#[test]
fn test_empty_trades() {
    let trades = scenarios::empty_sequence();
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&trades).unwrap();

    assert_eq!(completed.len(), 0);
}
/// Feeds three hand-written trades through a threshold-100 processor, prints the resulting
/// bars for debugging, and requires that at least one bar was produced.
#[test]
fn test_debug_streaming_data() {
    let first = test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083);
    let second =
        test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113);
    let third = test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770);
    let trades = vec![first, second, third];

    println!("Test data prices: 50014 -> 50163 -> 50032");
    println!("Expected price movements: +0.3% then -0.26%");

    let mut processor = OpenDeviationBarProcessor::new(100).unwrap();
    let bars = processor.process_agg_trade_records(&trades).unwrap();
    println!("Generated {} open deviation bars", bars.len());

    // Bars are printed with 1-based numbering.
    for (position, bar) in bars.iter().enumerate() {
        let ordinal = position + 1;
        println!(
            " Bar {}: O={} H={} L={} C={}",
            ordinal, bar.open, bar.high, bar.low, bar.close
        );
    }

    assert!(
        !bars.is_empty(),
        "Expected at least 1 open deviation bar with 0.3% price movement and 0.1% threshold"
    );
}
/// Constructor validation: 1, 250 and 100_000 are accepted; 0 and 150_000 are rejected with
/// `InvalidThreshold` carrying the offending value.
#[test]
fn test_threshold_validation() {
    let typical = OpenDeviationBarProcessor::new(250);
    assert!(typical.is_ok());

    let zero = OpenDeviationBarProcessor::new(0);
    assert!(matches!(
        zero,
        Err(ProcessingError::InvalidThreshold {
            threshold_decimal_bps: 0
        })
    ));

    let too_large = OpenDeviationBarProcessor::new(150_000);
    assert!(matches!(
        too_large,
        Err(ProcessingError::InvalidThreshold {
            threshold_decimal_bps: 150_000
        })
    ));

    // Both ends of the accepted range exercised by this test.
    let smallest = OpenDeviationBarProcessor::new(1);
    assert!(smallest.is_ok());
    let largest = OpenDeviationBarProcessor::new(100_000);
    assert!(largest.is_ok());
}
/// Runs the same three hand-written trades through `ExportOpenDeviationBarProcessor`,
/// prints the completed bars, and requires that at least one exists.
#[test]
fn test_export_processor_with_manual_trades() {
    println!("Testing ExportOpenDeviationBarProcessor with same trade data...");

    let first = test_utils::create_test_agg_trade(1, "50014.00859087", "0.12019569", 1756710002083);
    let second =
        test_utils::create_test_agg_trade(2, "50163.87750994", "1.01283708", 1756710005113);
    let third = test_utils::create_test_agg_trade(3, "50032.44128269", "0.69397094", 1756710008770);
    let trades = vec![first, second, third];

    let mut export_processor = ExportOpenDeviationBarProcessor::new(100).unwrap();
    println!(
        "Processing {} trades with ExportOpenDeviationBarProcessor...",
        trades.len()
    );
    export_processor.process_trades_continuously(&trades);

    let bars = export_processor.get_all_completed_bars();
    println!(
        "ExportOpenDeviationBarProcessor generated {} open deviation bars",
        bars.len()
    );

    // Bars are printed with 1-based numbering.
    for (position, bar) in bars.iter().enumerate() {
        let ordinal = position + 1;
        println!(
            " Bar {}: O={} H={} L={} C={}",
            ordinal, bar.open, bar.high, bar.low, bar.close
        );
    }

    assert!(
        !bars.is_empty(),
        "ExportOpenDeviationBarProcessor should generate same results as basic processor"
    );
}
/// A checkpoint taken after a no-breach sequence carries the symbol, the threshold, an
/// incomplete bar, its thresholds, and the last trade id.
#[test]
fn test_checkpoint_creation() {
    let trades = scenarios::no_breach_sequence(250);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let _bars = processor.process_agg_trade_records(&trades).unwrap();

    let snapshot = processor.create_checkpoint("BTCUSDT");

    assert_eq!(snapshot.symbol, "BTCUSDT");
    assert_eq!(snapshot.threshold_decimal_bps, 250);
    assert!(snapshot.has_incomplete_bar());
    assert!(snapshot.thresholds.is_some());
    assert!(snapshot.last_trade_id.is_some());
}
/// A checkpoint survives a JSON round trip with symbol, threshold, and incomplete-bar
/// presence unchanged.
#[test]
fn test_checkpoint_serialization_roundtrip() {
    let trades = scenarios::no_breach_sequence(250);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let _bars = processor.process_agg_trade_records(&trades).unwrap();
    let original = processor.create_checkpoint("BTCUSDT");

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    let decoded: Checkpoint =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");

    assert_eq!(decoded.symbol, original.symbol);
    assert_eq!(
        decoded.threshold_decimal_bps,
        original.threshold_decimal_bps
    );
    assert_eq!(
        decoded.incomplete_bar.is_some(),
        original.incomplete_bar.is_some()
    );
}
/// Processing 20 trades in one pass must give the same bars as processing the first 10,
/// checkpointing, restoring a new processor from the checkpoint, and processing the rest.
#[test]
fn test_cross_file_bar_continuation() {
    let base_timestamp = 1640995200000000i64;

    // Price offset grows with the index; the sign flips every two trades.
    let all_trades: Vec<_> = (0..20)
        .map(|i: i64| {
            let direction = if i % 4 < 2 { 1.0 } else { -1.0 };
            let price = 50000.0 + (i as f64 * 100.0) * direction;
            test_utils::create_test_agg_trade(
                i + 1,
                &format!("{:.8}", price),
                "1.0",
                base_timestamp + (i * 1000000),
            )
        })
        .collect();

    // Reference: everything in one pass.
    let mut processor_full = OpenDeviationBarProcessor::new(100).unwrap();
    let bars_full = processor_full
        .process_agg_trade_records(&all_trades)
        .unwrap();

    // Split run: first half, checkpoint, restore, second half.
    let split_point = 10;
    let (first_half, second_half) = (&all_trades[0..split_point], &all_trades[split_point..]);

    let mut processor_1 = OpenDeviationBarProcessor::new(100).unwrap();
    let bars_1 = processor_1.process_agg_trade_records(first_half).unwrap();
    let checkpoint = processor_1.create_checkpoint("TEST");

    let mut processor_2 = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
    let bars_2 = processor_2.process_agg_trade_records(second_half).unwrap();

    let split_total = bars_1.len() + bars_2.len();
    println!("Full processing: {} bars", bars_full.len());
    println!(
        "Split processing: {} + {} = {} bars",
        bars_1.len(),
        bars_2.len(),
        split_total
    );
    assert_eq!(
        split_total,
        bars_full.len(),
        "Split processing should produce same bar count as full processing"
    );

    // Compare open/close of each bar pairwise, in order.
    let split_bars = bars_1.iter().chain(bars_2.iter());
    for (i, (full, split)) in bars_full.iter().zip(split_bars).enumerate() {
        assert_eq!(full.open.0, split.open.0, "Bar {} open price mismatch", i);
        assert_eq!(
            full.close.0, split.close.0,
            "Bar {} close price mismatch",
            i
        );
    }
}
/// After trades 100 and 101, a trade with id 102 verifies as `Exact`.
#[test]
fn test_verify_position_exact() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let seen = [
        test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000),
        test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000),
    ];
    for trade in &seen {
        // Results are deliberately ignored; only the processor's position matters here.
        let _ = processor.process_single_trade(trade);
    }

    let candidate = test_utils::create_test_agg_trade(102, "50020.0", "1.0", 1640995202000000);
    let verification = processor.verify_position(&candidate);

    assert_eq!(verification, PositionVerification::Exact);
}
/// After trades 100 and 101, a trade with id 105 verifies as a `Gap` expecting 102 with
/// three ids missing.
#[test]
fn test_verify_position_gap() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let seen = [
        test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200000000),
        test_utils::create_test_agg_trade(101, "50010.0", "1.0", 1640995201000000),
    ];
    for trade in &seen {
        // Results are deliberately ignored; only the processor's position matters here.
        let _ = processor.process_single_trade(trade);
    }

    let candidate = test_utils::create_test_agg_trade(105, "50020.0", "1.0", 1640995202000000);
    let verification = processor.verify_position(&candidate);

    if let PositionVerification::Gap {
        expected_id,
        actual_id,
        missing_count,
    } = verification
    {
        assert_eq!(expected_id, 102);
        assert_eq!(actual_id, 105);
        assert_eq!(missing_count, 3);
    } else {
        panic!("Expected Gap verification, got {:?}", verification);
    }
}
/// A processor that has not processed any trade reports `TimestampOnly`; for a trade with
/// timestamp 5000000 the reported `gap_ms` is 5000.
#[test]
fn test_verify_position_timestamp_only() {
    let processor = OpenDeviationBarProcessor::new(250).unwrap();
    let candidate = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 5000000);

    let verification = processor.verify_position(&candidate);

    if let PositionVerification::TimestampOnly { gap_ms } = verification {
        assert_eq!(gap_ms, 5000, "gap_ms should be (timestamp - 0) / 1000");
    } else {
        panic!(
            "Expected TimestampOnly verification, got {:?}",
            verification
        );
    }
}
/// When the final trade closes the bar and nothing follows, the checkpoint holds no
/// incomplete bar.
#[test]
fn test_checkpoint_clean_completion() {
    let opening = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000);
    let breaching = test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000);
    let trades = vec![opening, breaching];

    let mut processor = OpenDeviationBarProcessor::new(100).unwrap();
    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(completed.len(), 1, "Should have exactly one completed bar");

    let snapshot = processor.create_checkpoint("TEST");
    assert!(
        !snapshot.has_incomplete_bar(),
        "No incomplete bar when last trade was a breach with no following trade"
    );
}
/// A trade arriving after the breach starts a new bar, which the checkpoint records as
/// incomplete with that trade's price as its open.
#[test]
fn test_checkpoint_with_remainder() {
    let opening = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200000000);
    let breaching = test_utils::create_test_agg_trade(2, "50100.0", "1.0", 1640995201000000);
    let leftover = test_utils::create_test_agg_trade(3, "50110.0", "1.0", 1640995202000000);
    let trades = vec![opening, breaching, leftover];

    let mut processor = OpenDeviationBarProcessor::new(100).unwrap();
    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert_eq!(completed.len(), 1, "Should have exactly one completed bar");

    let snapshot = processor.create_checkpoint("TEST");
    assert!(
        snapshot.has_incomplete_bar(),
        "Should have incomplete bar from trade 3"
    );

    let open_bar = snapshot.incomplete_bar.unwrap();
    assert_eq!(
        open_bar.open.to_string(),
        "50110.00000000",
        "Incomplete bar should open at trade 3 price"
    );
}
/// Batch processing and trade-by-trade streaming over the same seven trades must agree on
/// every completed bar and on the incomplete bar (if any).
#[test]
fn test_streaming_batch_parity() {
    let threshold = 250;
    let trades = test_utils::TickBuilder::new()
        .add_trade(1, 1.0, 0)
        .add_trade(2, 1.001, 1000)
        .add_trade(3, 1.003, 2000)
        .add_trade(4, 1.004, 3000)
        .add_trade(5, 1.005, 4000)
        .add_trade(6, 1.008, 5000)
        .add_trade(7, 1.009, 6000)
        .build();

    // Batch path.
    let mut batch_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();
    let batch_incomplete = batch_processor.get_incomplete_bar();

    // Streaming path: keep only the bars emitted by individual trades.
    let mut stream_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let stream_bars: Vec<OpenDeviationBar> = trades
        .iter()
        .filter_map(|trade| stream_processor.process_single_trade(trade).unwrap())
        .collect();
    let stream_incomplete = stream_processor.get_incomplete_bar();

    assert_eq!(
        batch_bars.len(),
        stream_bars.len(),
        "Batch and streaming should produce same number of completed bars"
    );

    let paired = batch_bars.iter().zip(stream_bars.iter());
    for (i, (batch_bar, stream_bar)) in paired.enumerate() {
        // Prices.
        assert_eq!(
            batch_bar.open, stream_bar.open,
            "Bar {i}: open price mismatch"
        );
        assert_eq!(
            batch_bar.close, stream_bar.close,
            "Bar {i}: close price mismatch"
        );
        assert_eq!(
            batch_bar.high, stream_bar.high,
            "Bar {i}: high price mismatch"
        );
        assert_eq!(batch_bar.low, stream_bar.low, "Bar {i}: low price mismatch");
        // Volume.
        assert_eq!(
            batch_bar.volume, stream_bar.volume,
            "Bar {i}: volume mismatch (double-counting?)"
        );
        // Timing and counts.
        assert_eq!(
            batch_bar.open_time, stream_bar.open_time,
            "Bar {i}: open_time mismatch"
        );
        assert_eq!(
            batch_bar.close_time, stream_bar.close_time,
            "Bar {i}: close_time mismatch"
        );
        assert_eq!(
            batch_bar.individual_trade_count, stream_bar.individual_trade_count,
            "Bar {i}: trade count mismatch"
        );
    }

    match (batch_incomplete, stream_incomplete) {
        (None, None) => {}
        (Some(from_batch), Some(from_stream)) => {
            assert_eq!(
                from_batch.open, from_stream.open,
                "Incomplete bar: open mismatch"
            );
            assert_eq!(
                from_batch.close, from_stream.close,
                "Incomplete bar: close mismatch"
            );
            assert_eq!(
                from_batch.volume, from_stream.volume,
                "Incomplete bar: volume mismatch"
            );
        }
        _ => panic!("Incomplete bar presence mismatch between batch and streaming"),
    }
}
/// Property tests checking that batch and streaming processing agree field by field.
mod proptest_batch_streaming_parity {
    use super::*;
    use proptest::prelude::*;

    /// Builds `n` deterministic trades starting from `base_price`.
    ///
    /// The price follows a sum of a sine and a cosine term scaled by `volatility`; whenever
    /// it drops below 100.0 it is reset to a value in `[100.0, 150.0]`.
    fn trade_sequence(n: usize, base_price: f64, volatility: f64) -> Vec<Tick> {
        let base_ts = 1640995200000i64;
        let mut price = base_price;

        (0..n)
            .map(|i| {
                let phase = i as f64;
                let step = ((phase * 0.3).sin() * volatility)
                    + ((phase * 0.07).cos() * volatility * 0.5);
                price += step;
                if price < 100.0 {
                    price = 100.0 + (phase * 0.01).sin().abs() * 50.0;
                }

                let ordinal = i as i64 + 1;
                test_utils::create_test_agg_trade_with_range(
                    ordinal,
                    &format!("{:.8}", price),
                    "1.50000000",
                    base_ts + (i as i64 * 500),
                    ordinal * 10,
                    ordinal * 10,
                    i % 3 != 0,
                )
            })
            .collect()
    }

    /// Asserts that two bars are equal on every compared field; `i` labels the bar in the
    /// failure message. Float-valued fields are compared through `to_bits()`.
    fn assert_bar_parity(i: usize, batch: &OpenDeviationBar, stream: &OpenDeviationBar) {
        // Timestamps and OHLC.
        assert_eq!(batch.open_time, stream.open_time, "Bar {i}: open_time");
        assert_eq!(batch.close_time, stream.close_time, "Bar {i}: close_time");
        assert_eq!(batch.open, stream.open, "Bar {i}: open");
        assert_eq!(batch.high, stream.high, "Bar {i}: high");
        assert_eq!(batch.low, stream.low, "Bar {i}: low");
        assert_eq!(batch.close, stream.close, "Bar {i}: close");

        // Volume and turnover, total and per side.
        assert_eq!(batch.volume, stream.volume, "Bar {i}: volume");
        assert_eq!(batch.turnover, stream.turnover, "Bar {i}: turnover");
        assert_eq!(batch.buy_volume, stream.buy_volume, "Bar {i}: buy_volume");
        assert_eq!(
            batch.sell_volume, stream.sell_volume,
            "Bar {i}: sell_volume"
        );
        assert_eq!(
            batch.buy_turnover, stream.buy_turnover,
            "Bar {i}: buy_turnover"
        );
        assert_eq!(
            batch.sell_turnover, stream.sell_turnover,
            "Bar {i}: sell_turnover"
        );

        // Counts and id ranges.
        assert_eq!(
            batch.individual_trade_count, stream.individual_trade_count,
            "Bar {i}: trade_count"
        );
        assert_eq!(
            batch.agg_record_count, stream.agg_record_count,
            "Bar {i}: agg_record_count"
        );
        assert_eq!(
            batch.first_trade_id, stream.first_trade_id,
            "Bar {i}: first_trade_id"
        );
        assert_eq!(
            batch.last_trade_id, stream.last_trade_id,
            "Bar {i}: last_trade_id"
        );
        assert_eq!(
            batch.first_agg_trade_id, stream.first_agg_trade_id,
            "Bar {i}: first_agg_trade_id"
        );
        assert_eq!(
            batch.last_agg_trade_id, stream.last_agg_trade_id,
            "Bar {i}: last_agg_trade_id"
        );
        assert_eq!(
            batch.buy_trade_count, stream.buy_trade_count,
            "Bar {i}: buy_trade_count"
        );
        assert_eq!(
            batch.sell_trade_count, stream.sell_trade_count,
            "Bar {i}: sell_trade_count"
        );
        assert_eq!(batch.vwap, stream.vwap, "Bar {i}: vwap");
        assert_eq!(
            batch.duration_us, stream.duration_us,
            "Bar {i}: duration_us"
        );

        // Derived features, compared bit for bit.
        assert_eq!(batch.ofi.to_bits(), stream.ofi.to_bits(), "Bar {i}: ofi");
        assert_eq!(
            batch.vwap_close_deviation.to_bits(),
            stream.vwap_close_deviation.to_bits(),
            "Bar {i}: vwap_close_dev"
        );
        assert_eq!(
            batch.price_impact.to_bits(),
            stream.price_impact.to_bits(),
            "Bar {i}: price_impact"
        );
        assert_eq!(
            batch.kyle_lambda_proxy.to_bits(),
            stream.kyle_lambda_proxy.to_bits(),
            "Bar {i}: kyle_lambda"
        );
        assert_eq!(
            batch.trade_intensity.to_bits(),
            stream.trade_intensity.to_bits(),
            "Bar {i}: trade_intensity"
        );
        assert_eq!(
            batch.volume_per_trade.to_bits(),
            stream.volume_per_trade.to_bits(),
            "Bar {i}: vol_per_trade"
        );
        assert_eq!(
            batch.aggression_ratio.to_bits(),
            stream.aggression_ratio.to_bits(),
            "Bar {i}: aggression_ratio"
        );
        assert_eq!(
            batch.aggregation_density_f64.to_bits(),
            stream.aggregation_density_f64.to_bits(),
            "Bar {i}: agg_density"
        );
        assert_eq!(
            batch.turnover_imbalance.to_bits(),
            stream.turnover_imbalance.to_bits(),
            "Bar {i}: turnover_imbalance"
        );
    }

    proptest! {
        /// Parity over varying sequence length and volatility at threshold 250, including
        /// the incomplete bar.
        #[test]
        fn batch_streaming_parity_random(
            n in 200usize..500,
            volatility in 10.0f64..200.0,
        ) {
            let trades = trade_sequence(n, 50000.0, volatility);

            let mut batch_proc = OpenDeviationBarProcessor::new(250).unwrap();
            let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();
            let batch_incomplete = batch_proc.get_incomplete_bar();

            let mut stream_proc = OpenDeviationBarProcessor::new(250).unwrap();
            let stream_bars: Vec<OpenDeviationBar> = trades
                .iter()
                .filter_map(|trade| stream_proc.process_single_trade(trade).unwrap())
                .collect();
            let stream_incomplete = stream_proc.get_incomplete_bar();

            prop_assert_eq!(batch_bars.len(), stream_bars.len(),
                "Completed bar count mismatch: batch={}, stream={} for n={}, vol={}",
                batch_bars.len(), stream_bars.len(), n, volatility);

            for (idx, (from_batch, from_stream)) in
                batch_bars.iter().zip(stream_bars.iter()).enumerate()
            {
                assert_bar_parity(idx, from_batch, from_stream);
            }

            match (&batch_incomplete, &stream_incomplete) {
                (None, None) => {}
                (Some(from_batch), Some(from_stream)) => {
                    assert_bar_parity(batch_bars.len(), from_batch, from_stream)
                }
                _ => prop_assert!(false,
                    "Incomplete bar presence mismatch: batch={}, stream={}",
                    batch_incomplete.is_some(), stream_incomplete.is_some()),
            }
        }

        /// Parity of completed bars over a fixed 300-trade sequence for varying thresholds.
        #[test]
        fn batch_streaming_parity_thresholds(
            threshold in 100u32..1000,
        ) {
            let trades = trade_sequence(300, 50000.0, 80.0);

            let mut batch_proc = OpenDeviationBarProcessor::new(threshold).unwrap();
            let batch_bars = batch_proc.process_agg_trade_records(&trades).unwrap();

            let mut stream_proc = OpenDeviationBarProcessor::new(threshold).unwrap();
            let stream_bars: Vec<OpenDeviationBar> = trades
                .iter()
                .filter_map(|trade| stream_proc.process_single_trade(trade).unwrap())
                .collect();

            prop_assert_eq!(batch_bars.len(), stream_bars.len(),
                "Bar count mismatch at threshold={}", threshold);

            for (idx, (from_batch, from_stream)) in
                batch_bars.iter().zip(stream_bars.iter()).enumerate()
            {
                assert_bar_parity(idx, from_batch, from_stream);
            }
        }
    }
}
/// After a breach closes a bar, no bar is open until the next trade arrives; that trade's
/// price becomes the new bar's open.
#[test]
fn test_defer_open_new_bar_opens_with_next_trade() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Trade 1 opens the bar.
    let opening = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let after_open = processor.process_single_trade(&opening).unwrap();
    assert!(after_open.is_none());

    // Trade 2 breaches and closes it.
    let breaching = test_utils::create_test_agg_trade(2, "50150.0", "2.0", 2000);
    let after_breach = processor.process_single_trade(&breaching).unwrap();
    assert!(after_breach.is_some(), "Should close bar on breach");
    let closed_bar = after_breach.unwrap();
    assert_eq!(closed_bar.open.to_string(), "50000.00000000");
    assert_eq!(closed_bar.close.to_string(), "50150.00000000");
    assert!(
        processor.get_incomplete_bar().is_none(),
        "No incomplete bar after breach - defer_open is true"
    );

    // Trade 3 opens the next bar at its own price.
    let following = test_utils::create_test_agg_trade(3, "50100.0", "3.0", 3000);
    let after_following = processor.process_single_trade(&following).unwrap();
    assert!(after_following.is_none());

    let open_bar = processor.get_incomplete_bar().unwrap();
    assert_eq!(
        open_bar.open.to_string(),
        "50100.00000000",
        "New bar should open at trade 3's price, not trade 2's"
    );
}
/// Streaming the single-breach scenario: every trade but the last returns `None`, the last
/// returns a consistent completed bar, and no incomplete bar remains afterwards.
#[test]
fn test_bar_close_take_single_trade() {
    let trades = scenarios::single_breach_sequence(250);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let last_index = trades.len() - 1;
    for trade in trades.iter().take(last_index) {
        let emitted = processor.process_single_trade(trade).unwrap();
        assert!(emitted.is_none());
    }

    let closed = processor
        .process_single_trade(&trades[last_index])
        .unwrap()
        .expect("Should produce completed bar");

    assert_eq!(closed.open.to_string(), "50000.00000000");
    assert!(closed.high >= closed.open.max(closed.close));
    assert!(closed.low <= closed.open.min(closed.close));
    assert!(closed.volume > 0);
    assert!(processor.get_incomplete_bar().is_none());
}
/// Batch-processing the large scenario yields at least one bar, and every bar satisfies the
/// OHLC ordering, positive-volume, and time-ordering invariants.
#[test]
fn test_bar_close_take_batch() {
    let trades = scenarios::large_sequence(500);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&trades).unwrap();
    assert!(
        !completed.is_empty(),
        "Should produce at least one completed bar"
    );

    for candidate in completed.iter() {
        assert!(candidate.high >= candidate.open.max(candidate.close));
        assert!(candidate.low <= candidate.open.min(candidate.close));
        assert!(candidate.volume > 0);
        assert!(candidate.close_time >= candidate.open_time);
    }
}
/// Whether or not the incomplete bar is included in the returned list, the processor keeps
/// the same incomplete bar internally (same OHLC).
#[test]
fn test_checkpoint_conditional_clone() {
    let trades = scenarios::no_breach_sequence(250);

    // Strict variant: nothing returned, bar still retained.
    let mut strict = OpenDeviationBarProcessor::new(250).unwrap();
    let strict_bars = strict.process_agg_trade_records(&trades).unwrap();
    assert_eq!(strict_bars.len(), 0);
    assert!(strict.get_incomplete_bar().is_some());

    // Analysis variant: bar returned and also retained.
    let mut analysis = OpenDeviationBarProcessor::new(250).unwrap();
    let analysis_bars = analysis
        .process_agg_trade_records_with_incomplete(&trades)
        .unwrap();
    assert_eq!(analysis_bars.len(), 1);
    assert!(analysis.get_incomplete_bar().is_some());

    let retained_strict = strict.get_incomplete_bar().unwrap();
    let retained_analysis = analysis.get_incomplete_bar().unwrap();
    assert_eq!(retained_strict.open, retained_analysis.open);
    assert_eq!(retained_strict.close, retained_analysis.close);
    assert_eq!(retained_strict.high, retained_analysis.high);
    assert_eq!(retained_strict.low, retained_analysis.low);
}
/// A checkpoint JSON without a `version` field deserializes as version 1, can seed a
/// processor that goes on to produce valid bars, and a checkpoint created from that
/// processor is version 2 and survives a JSON round trip.
#[test]
fn test_checkpoint_v1_to_v2_migration() {
    // Legacy payload: note the absence of a "version" key.
    let v1_json = r#"{
"symbol": "BTCUSDT",
"threshold_decimal_bps": 250,
"incomplete_bar": null,
"thresholds": null,
"last_timestamp_us": 1640995200000000,
"last_trade_id": 5000,
"price_hash": 0,
"anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0},
"prevent_same_timestamp_close": true,
"defer_open": false
}"#;
    let checkpoint: Checkpoint = serde_json::from_str(v1_json).unwrap();
    assert_eq!(
        checkpoint.version, 1,
        "Old checkpoints should default to v1"
    );
    assert_eq!(checkpoint.symbol, "BTCUSDT");
    assert_eq!(checkpoint.threshold_decimal_bps, 250);

    let mut processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
    // `is_none()` states the intent directly instead of negating `is_some()`.
    assert!(
        processor.get_incomplete_bar().is_none(),
        "No incomplete bar before processing"
    );

    // The restored processor must still be able to produce well-formed bars.
    let trades = scenarios::single_breach_sequence(250);
    let bars = processor.process_agg_trade_records(&trades).unwrap();
    assert!(
        !bars.is_empty(),
        "Should produce bars after v1→v2 migration"
    );
    assert!(bars[0].volume > 0, "Bar should have volume after migration");
    assert!(
        bars[0].close_time >= bars[0].open_time,
        "Bar times should be valid"
    );

    // Checkpoints written by the current code carry version 2.
    let new_checkpoint = processor.create_checkpoint("BTCUSDT");
    assert_eq!(new_checkpoint.version, 2, "New checkpoints should be v2");
    assert_eq!(new_checkpoint.symbol, "BTCUSDT");

    let json = serde_json::to_string(&new_checkpoint).unwrap();
    let restored: Checkpoint = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.version, 2);
    assert_eq!(restored.symbol, "BTCUSDT");
}
/// Restoring from a checkpoint whose threshold is 0 fails with `InvalidThreshold`.
#[test]
fn test_from_checkpoint_invalid_threshold_zero() {
    let symbol = "BTCUSDT".to_string();
    let checkpoint = Checkpoint::new(symbol, 0, None, None, 0, None, 0, true);

    let restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint);

    match restored {
        Err(CheckpointError::InvalidThreshold { threshold: 0, .. }) => {}
        other => panic!("Expected InvalidThreshold(0), got {:?}", other.err()),
    }
}
/// Restoring from a checkpoint whose threshold is 200_000 fails with `InvalidThreshold`.
#[test]
fn test_from_checkpoint_invalid_threshold_too_high() {
    let symbol = "BTCUSDT".to_string();
    let checkpoint = Checkpoint::new(symbol, 200_000, None, None, 0, None, 0, true);

    let restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint);

    match restored {
        Err(CheckpointError::InvalidThreshold {
            threshold: 200_000, ..
        }) => {}
        other => panic!("Expected InvalidThreshold(200000), got {:?}", other.err()),
    }
}
/// A checkpoint that carries an incomplete bar but no thresholds must be rejected with
/// `MissingThresholds`.
#[test]
fn test_from_checkpoint_missing_thresholds() {
    // The trade is only needed to build a bar; it is a real, used binding, so it gets a
    // plain name rather than an underscore-prefixed one.
    let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let bar = OpenDeviationBar::new(&trade);

    let mut checkpoint = Checkpoint::new("BTCUSDT".to_string(), 250, None, None, 0, None, 0, true);
    // Put the checkpoint into the inconsistent state under test: bar present, thresholds absent.
    checkpoint.incomplete_bar = Some(bar);
    checkpoint.thresholds = None;

    match OpenDeviationBarProcessor::from_checkpoint(checkpoint) {
        Err(CheckpointError::MissingThresholds) => {}
        other => panic!("Expected MissingThresholds, got {:?}", other.err()),
    }
}
/// A checkpoint with an unrecognised version number (99) is still accepted and the restored
/// processor reports the checkpoint's threshold.
#[test]
fn test_from_checkpoint_unknown_version_treated_as_v2() {
    let symbol = "BTCUSDT".to_string();
    let mut checkpoint = Checkpoint::new(symbol, 250, None, None, 0, None, 0, true);
    checkpoint.version = 99;

    let restored = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();

    assert_eq!(restored.threshold_decimal_bps(), 250);
}
/// A checkpoint holding both an incomplete bar and its thresholds restores successfully and
/// the processor exposes the incomplete bar.
#[test]
fn test_from_checkpoint_valid_with_incomplete_bar() {
    // `FixedPoint` is already imported at the top of the file, so no local `use` is needed.
    let trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let bar = OpenDeviationBar::new(&trade);

    let upper = FixedPoint::from_str("50125.0").unwrap();
    let lower = FixedPoint::from_str("49875.0").unwrap();

    let checkpoint = Checkpoint::new(
        "BTCUSDT".to_string(),
        250,
        Some(bar),
        Some((upper, lower)),
        0,
        None,
        0,
        true,
    );

    let processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should restore incomplete bar"
    );
}
/// Resetting while a bar is open returns that bar as an orphan and leaves no incomplete bar.
#[test]
fn test_reset_at_ouroboros_with_orphan() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // Two trades that stay inside the threshold, so the bar remains open.
    let first = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let second = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 2000);
    for trade in &[first, second] {
        assert!(processor.process_single_trade(trade).unwrap().is_none());
    }
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have incomplete bar"
    );

    let orphan = processor.reset_at_ouroboros();
    assert!(orphan.is_some(), "Should return orphaned bar");
    assert_eq!(orphan.unwrap().open.to_string(), "50000.00000000");

    assert!(
        processor.get_incomplete_bar().is_none(),
        "No bar after reset"
    );
}
/// Resetting a processor that has not seen any trade returns no orphan and leaves no bar.
#[test]
fn test_reset_at_ouroboros_clean_state() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let orphan = processor.reset_at_ouroboros();

    assert!(orphan.is_none(), "No orphan when state is clean");
    let remaining = processor.get_incomplete_bar();
    assert!(remaining.is_none());
}
/// After a breach followed by a reset, the next trade opens a new bar.
#[test]
fn test_reset_at_ouroboros_clears_defer_open() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();
    let opening = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let breaching = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000);

    processor.process_single_trade(&opening).unwrap();
    let closed = processor.process_single_trade(&breaching).unwrap();
    assert!(closed.is_some(), "Should breach");
    assert!(processor.get_incomplete_bar().is_none());

    processor.reset_at_ouroboros();

    let after_reset = test_utils::create_test_agg_trade(3, "50000.0", "1.0", 3000);
    processor.process_single_trade(&after_reset).unwrap();
    assert!(
        processor.get_incomplete_bar().is_some(),
        "Should have new bar after reset"
    );
}
/// The orphan returned by `reset_at_ouroboros` has `duration_us` equal to the difference of
/// the two trade timestamps and a positive `trade_intensity`.
#[test]
fn test_reset_at_ouroboros_computes_duration_us() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let first = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1_000_000);
    let second = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 2_000_000);
    for trade in &[first, second] {
        processor.process_single_trade(trade).unwrap();
    }

    let orphan = processor
        .reset_at_ouroboros()
        .expect("Should return orphaned bar");

    assert_eq!(
        orphan.duration_us, 1_000_000,
        "Orphan bar from reset_at_ouroboros must have computed duration_us (Issue #275)"
    );
    assert!(
        orphan.trade_intensity > 0.0,
        "Orphan bar must have non-zero trade_intensity when duration > 0"
    );
}
/// The bar returned by `get_incomplete_bar` has `duration_us` equal to the difference of the
/// two trade timestamps and a positive `trade_intensity`.
#[test]
fn test_get_incomplete_bar_computes_duration_us() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let first = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1_000_000);
    let second = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 2_000_000);
    for trade in &[first, second] {
        processor.process_single_trade(trade).unwrap();
    }

    let open_bar = processor
        .get_incomplete_bar()
        .expect("Should have incomplete bar");

    assert_eq!(
        open_bar.duration_us, 1_000_000,
        "Incomplete bar from get_incomplete_bar must have computed duration_us (Issue #275)"
    );
    assert!(
        open_bar.trade_intensity > 0.0,
        "Incomplete bar must have non-zero trade_intensity when duration > 0"
    );
}
/// One trade alone opens a bar but cannot complete it.
#[test]
fn test_single_trade_no_bar() {
    let only_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor.process_agg_trade_records(&[only_trade]).unwrap();

    assert_eq!(
        completed.len(),
        0,
        "Single trade should not produce a completed bar"
    );
    let open_bar = processor.get_incomplete_bar();
    assert!(open_bar.is_some(), "Should have incomplete bar");
}
/// A price move beyond the threshold does not close the bar when it carries the same
/// timestamp as the opening trade.
#[test]
fn test_identical_timestamps_no_close() {
    let opening = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let same_instant = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1000);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor
        .process_agg_trade_records(&[opening, same_instant])
        .unwrap();

    assert_eq!(
        completed.len(),
        0,
        "Bar should not close on same timestamp as open (Issue #36)"
    );
}
/// Two trades share the opening timestamp; a third trade at a later timestamp breaches and
/// closes exactly one bar.
#[test]
fn test_identical_timestamps_then_different_closes() {
    let opening = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let same_instant = test_utils::create_test_agg_trade(2, "50050.0", "1.0", 1000);
    let later_breach = test_utils::create_test_agg_trade(3, "50200.0", "1.0", 2000);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    let completed = processor
        .process_agg_trade_records(&[opening, same_instant, later_breach])
        .unwrap();

    assert_eq!(
        completed.len(),
        1,
        "Should close when breach at different timestamp"
    );
}
/// In streaming mode the trade following a breach opens a fresh bar at its own price rather
/// than being measured against the closed bar.
#[test]
fn test_streaming_defer_open_semantics() {
    let opening = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let breaching = test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000);
    let far_away = test_utils::create_test_agg_trade(3, "51000.0", "1.0", 3000);
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    processor.process_single_trade(&opening).unwrap();

    let first_result = processor.process_single_trade(&breaching).unwrap();
    assert!(first_result.is_some(), "Trade 2 should cause a breach");
    assert!(processor.get_incomplete_bar().is_none());

    let second_result = processor.process_single_trade(&far_away).unwrap();
    assert!(
        second_result.is_none(),
        "Trade 3 should open new bar, not breach"
    );

    let open_bar = processor.get_incomplete_bar().unwrap();
    assert_eq!(
        open_bar.open.to_f64(),
        51000.0,
        "New bar should open at t3 price"
    );
}
/// An empty batch leaves the processor without a bar; a following single-trade batch opens
/// one without completing it.
#[test]
fn test_process_empty_then_trades() {
    let mut processor = OpenDeviationBarProcessor::new(250).unwrap();

    // First call: nothing to process.
    let from_empty = processor.process_agg_trade_records(&[]).unwrap();
    assert_eq!(from_empty.len(), 0);
    assert!(processor.get_incomplete_bar().is_none());

    // Second call: one trade opens a bar.
    let only_trade = test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000);
    let from_single = processor.process_agg_trade_records(&[only_trade]).unwrap();
    assert_eq!(from_single.len(), 0);
    assert!(processor.get_incomplete_bar().is_some());
}
/// Runs a five-trade rising sequence (50000 → 51000) through one batch call
/// and expects exactly two completed bars.
#[test]
fn test_multiple_breaches_in_batch() {
    let rising_sequence = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
        test_utils::create_test_agg_trade(2, "50200.0", "1.0", 2000),
        test_utils::create_test_agg_trade(3, "50500.0", "1.0", 3000),
        test_utils::create_test_agg_trade(4, "50700.0", "1.0", 4000),
        test_utils::create_test_agg_trade(5, "51000.0", "1.0", 5000),
    ];

    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = engine.process_agg_trade_records(&rising_sequence).unwrap();

    assert_eq!(
        completed.len(),
        2,
        "Should produce 2 completed bars from 2 breaches"
    );
}
/// Generates 20 trades whose price is scaled by 1.002 / 0.998 / 1.0005
/// depending on the index, then runs them through a batch processor and a
/// trade-by-trade streaming processor built with the same threshold.
///
/// Asserts that at least 3 bars are produced, that both paths produce the same
/// number of bars with matching OHLC, volume, open/close times and trade
/// counts, and that the incomplete bars (if any) agree on open and volume.
#[test]
fn test_streaming_batch_parity_extended() {
    let threshold = 100;

    // Build the price path; `price` carries over between iterations.
    let mut price = 50000.0;
    let trades: Vec<_> = (0..20)
        .map(|i| {
            match i % 3 {
                0 if i > 0 => price *= 1.002,
                1 if i > 1 => price *= 0.998,
                _ => price *= 1.0005,
            }
            test_utils::create_test_agg_trade(
                (i + 1) as i64,
                &format!("{:.8}", price),
                "1.0",
                (i as i64 + 1) * 1000,
            )
        })
        .collect();

    // Batch path: everything in one call.
    let mut batch_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let batch_bars = batch_processor.process_agg_trade_records(&trades).unwrap();

    // Streaming path: one trade per call, keeping only the completed bars.
    let mut stream_processor = OpenDeviationBarProcessor::new(threshold).unwrap();
    let stream_bars: Vec<OpenDeviationBar> = trades
        .iter()
        .filter_map(|trade| stream_processor.process_single_trade(trade).unwrap())
        .collect();

    assert!(
        batch_bars.len() >= 3,
        "Should produce at least 3 bars from zigzag pattern"
    );
    assert_eq!(
        batch_bars.len(),
        stream_bars.len(),
        "Batch ({}) and streaming ({}) bar count mismatch",
        batch_bars.len(),
        stream_bars.len()
    );

    for (i, (from_batch, from_stream)) in batch_bars.iter().zip(&stream_bars).enumerate() {
        assert_eq!(from_batch.open, from_stream.open, "Bar {i}: open mismatch");
        assert_eq!(
            from_batch.close, from_stream.close,
            "Bar {i}: close mismatch"
        );
        assert_eq!(from_batch.high, from_stream.high, "Bar {i}: high mismatch");
        assert_eq!(from_batch.low, from_stream.low, "Bar {i}: low mismatch");
        assert_eq!(
            from_batch.volume, from_stream.volume,
            "Bar {i}: volume mismatch"
        );
        assert_eq!(
            from_batch.open_time, from_stream.open_time,
            "Bar {i}: open_time mismatch"
        );
        assert_eq!(
            from_batch.close_time, from_stream.close_time,
            "Bar {i}: close_time mismatch"
        );
        assert_eq!(
            from_batch.individual_trade_count, from_stream.individual_trade_count,
            "Bar {i}: trade_count mismatch"
        );
    }

    // Both processors must agree on whether a bar is still forming.
    let batch_forming = batch_processor.get_incomplete_bar();
    let stream_forming = stream_processor.get_incomplete_bar();
    match (&batch_forming, &stream_forming) {
        (None, None) => {}
        (Some(from_batch), Some(from_stream)) => {
            assert_eq!(
                from_batch.open, from_stream.open,
                "Incomplete: open mismatch"
            );
            assert_eq!(
                from_batch.volume, from_stream.volume,
                "Incomplete: volume mismatch"
            );
        }
        _ => panic!("Incomplete bar presence mismatch"),
    }
}
/// Feeds three consecutive three-trade batches (trade IDs 1-9) through one
/// processor and checks the accumulated bars: at least 3 bars, non-decreasing
/// `close_time`, and each bar's `first_agg_trade_id` directly following the
/// previous bar's `last_agg_trade_id`.
#[test]
fn test_multi_batch_sequential_state_continuity() {
    let batches = [
        [
            test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1000),
            test_utils::create_test_agg_trade(2, "50020.0", "1.0", 2000),
            test_utils::create_test_agg_trade(3, "50060.0", "1.0", 3000),
        ],
        [
            test_utils::create_test_agg_trade(4, "50100.0", "1.0", 4000),
            test_utils::create_test_agg_trade(5, "50120.0", "1.0", 5000),
            test_utils::create_test_agg_trade(6, "50170.0", "1.0", 6000),
        ],
        [
            test_utils::create_test_agg_trade(7, "50200.0", "1.0", 7000),
            test_utils::create_test_agg_trade(8, "50220.0", "1.0", 8000),
            test_utils::create_test_agg_trade(9, "50280.0", "1.0", 9000),
        ],
    ];

    let mut engine = OpenDeviationBarProcessor::new(100).unwrap();
    let mut all_bars = Vec::new();
    for batch in &batches {
        all_bars.extend(engine.process_agg_trade_records(batch).unwrap());
    }

    assert!(
        all_bars.len() >= 3,
        "Expected at least 3 bars from 3 batches, got {}",
        all_bars.len()
    );

    // Pass 1: close times never go backwards between neighbouring bars.
    for (prev_idx, pair) in all_bars.windows(2).enumerate() {
        let i = prev_idx + 1;
        assert!(
            pair[1].close_time >= pair[0].close_time,
            "Bar {i}: close_time {} < previous {}",
            pair[1].close_time,
            pair[0].close_time
        );
    }

    // Pass 2: trade IDs are contiguous across bar boundaries.
    for (prev_idx, pair) in all_bars.windows(2).enumerate() {
        let i = prev_idx + 1;
        assert_eq!(
            pair[1].first_agg_trade_id,
            pair[0].last_agg_trade_id + 1,
            "Bar {i}: trade ID gap (first={}, prev last={})",
            pair[1].first_agg_trade_id,
            pair[0].last_agg_trade_id
        );
    }
}
/// Builds five ticks that all carry timestamp 1000000 (50000.0 first, then
/// 50400.0, 50600.0, 50800.0, 51000.0) and expects the batch call to return
/// no completed bars (Issue #36).
#[test]
fn test_same_timestamp_prevents_bar_close() {
    let mut trades: Vec<Tick> = Vec::with_capacity(5);
    for i in 0..5 {
        let price_str = match i {
            0 => "50000.0".to_string(),
            _ => format!("{}.0", 50000 + (i + 1) * 200),
        };
        trades.push(Tick {
            ref_id: i as i64,
            price: FixedPoint::from_str(&price_str).unwrap(),
            volume: FixedPoint::from_str("1.0").unwrap(),
            first_sub_id: i as i64,
            last_sub_id: i as i64,
            // Identical for every tick in the batch.
            timestamp: 1000000,
            is_buyer_maker: false,
            is_best_match: None,
            best_bid: None,
            best_ask: None,
        });
    }

    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = engine.process_agg_trade_records(&trades).unwrap();

    assert_eq!(
        completed.len(),
        0,
        "Same timestamp should prevent bar close (Issue #36)"
    );
}
/// A lone trade yields no completed bar from the strict API, while the
/// `_with_incomplete` API on a fresh processor returns one bar whose open
/// equals its close and whose high equals its low.
#[test]
fn test_single_trade_incomplete_bar() {
    let lone_trade = Tick {
        ref_id: 1,
        price: FixedPoint::from_str("50000.0").unwrap(),
        volume: FixedPoint::from_str("10.0").unwrap(),
        first_sub_id: 1,
        last_sub_id: 1,
        timestamp: 1000000,
        is_buyer_maker: false,
        is_best_match: None,
        best_bid: None,
        best_ask: None,
    };

    let mut strict_engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = strict_engine
        .process_agg_trade_records(&[lone_trade.clone()])
        .unwrap();
    assert_eq!(completed.len(), 0, "Single trade cannot complete a bar");

    // Separate processor so the first call's state does not carry over.
    let mut inclusive_engine = OpenDeviationBarProcessor::new(250).unwrap();
    let with_partial = inclusive_engine
        .process_agg_trade_records_with_incomplete(&[lone_trade])
        .unwrap();
    assert_eq!(with_partial.len(), 1, "Should return 1 incomplete bar");

    let only_bar = &with_partial[0];
    assert_eq!(only_bar.open, only_bar.close);
    assert_eq!(only_bar.high, only_bar.low);
}
/// Constructs the processor via `with_options(250, false)`, confirms that
/// `prevent_same_timestamp_close()` reports false, and expects two ticks with
/// the same timestamp (50000.0 then 50200.0) to produce one completed bar.
#[test]
fn test_with_options_gate_disabled_same_timestamp_closes() {
    let mut engine = OpenDeviationBarProcessor::with_options(250, false).unwrap();
    assert!(!engine.prevent_same_timestamp_close());

    let opening = Tick {
        ref_id: 1,
        price: FixedPoint::from_str("50000.0").unwrap(),
        volume: FixedPoint::from_str("1.0").unwrap(),
        first_sub_id: 1,
        last_sub_id: 1,
        timestamp: 1000000,
        is_buyer_maker: false,
        is_best_match: None,
        best_bid: None,
        best_ask: None,
    };
    // Same timestamp as `opening`, higher price.
    let same_instant_jump = Tick {
        ref_id: 2,
        price: FixedPoint::from_str("50200.0").unwrap(),
        volume: FixedPoint::from_str("1.0").unwrap(),
        first_sub_id: 2,
        last_sub_id: 2,
        timestamp: 1000000,
        is_buyer_maker: false,
        is_best_match: None,
        best_bid: None,
        best_ask: None,
    };

    let completed = engine
        .process_agg_trade_records(&[opening, same_instant_jump])
        .unwrap();
    assert_eq!(
        completed.len(),
        1,
        "Gate disabled: same-timestamp breach should close bar"
    );
}
/// `inter_bar_enabled()` is false on a freshly constructed processor and true
/// after the builder-style `with_inter_bar_config` call.
#[test]
fn test_inter_bar_config_enables_features() {
    use opendeviationbar_core::interbar::LookbackMode;

    let baseline = OpenDeviationBarProcessor::new(250).unwrap();
    assert!(
        !baseline.inter_bar_enabled(),
        "Default: inter-bar disabled"
    );

    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: false,
        compute_tier3: false,
        ..Default::default()
    };
    let configured = baseline.with_inter_bar_config(config);
    assert!(
        configured.inter_bar_enabled(),
        "After config: inter-bar enabled"
    );
}
/// `intra_bar_enabled()` is false by default and true after
/// `with_intra_bar_features()`.
#[test]
fn test_intra_bar_feature_toggle() {
    let baseline = OpenDeviationBarProcessor::new(250).unwrap();
    assert!(
        !baseline.intra_bar_enabled(),
        "Default: intra-bar disabled"
    );

    let toggled = baseline.with_intra_bar_features();
    assert!(
        toggled.intra_bar_enabled(),
        "After toggle: intra-bar enabled"
    );
}
/// The in-place setter `set_inter_bar_config` flips `inter_bar_enabled()` from
/// false to true on an existing processor.
#[test]
fn test_set_inter_bar_config_after_construction() {
    use opendeviationbar_core::interbar::LookbackMode;

    let mut engine = OpenDeviationBarProcessor::new(500).unwrap();
    assert!(!engine.inter_bar_enabled());

    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(200),
        compute_tier2: true,
        compute_tier3: false,
        ..Default::default()
    };
    engine.set_inter_bar_config(config);

    assert!(
        engine.inter_bar_enabled(),
        "set_inter_bar_config should enable"
    );
}
/// Runs the same scenario through `process_agg_trade_records_with_options`
/// with the flag set to `false` and to `true` on separate processors, and
/// checks the `true` run never returns fewer bars than the `false` run.
#[test]
fn test_process_with_options_incomplete_false_vs_true() {
    let trades = scenarios::single_breach_sequence(250);

    let mut strict_engine = OpenDeviationBarProcessor::new(250).unwrap();
    let strict_count = strict_engine
        .process_agg_trade_records_with_options(&trades, false)
        .unwrap()
        .len();

    let mut inclusive_engine = OpenDeviationBarProcessor::new(250).unwrap();
    let inclusive_count = inclusive_engine
        .process_agg_trade_records_with_options(&trades, true)
        .unwrap()
        .len();

    assert!(
        inclusive_count >= strict_count,
        "inclusive ({}) must be >= strict ({})",
        inclusive_count,
        strict_count
    );
}
/// A new processor reports an all-zero anomaly summary: every counter is 0,
/// `has_anomalies()` is false and `total()` is 0.
#[test]
fn test_anomaly_summary_default_no_anomalies() {
    let engine = OpenDeviationBarProcessor::new(250).unwrap();
    let report = engine.anomaly_summary();

    // Individual counters first, then the aggregate views.
    assert_eq!(report.gaps_detected, 0);
    assert_eq!(report.overlaps_detected, 0);
    assert_eq!(report.timestamp_anomalies, 0);
    assert!(!report.has_anomalies());
    assert_eq!(report.total(), 0);
}
/// After processing a scenario, checkpointing and restoring, the restored
/// processor's anomaly summary total is still 0.
#[test]
fn test_anomaly_summary_preserved_through_checkpoint() {
    let trades = scenarios::single_breach_sequence(250);
    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    engine.process_agg_trade_records(&trades).unwrap();

    let snapshot = engine.create_checkpoint("TEST");
    let restored = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();

    assert_eq!(restored.anomaly_summary().total(), 0);
}
// Restores a processor from a hand-written JSON checkpoint whose
// `anomaly_summary` holds non-zero counters (3 gaps, 1 overlap, 2 timestamp
// anomalies) and checks that the restored processor reports the same numbers.
#[test]
fn test_anomaly_summary_from_checkpoint_with_anomalies() {
// NOTE(review): this fixture uses the key "current_bar", while the version-2
// fixture in test_last_completed_bar_tid_old_checkpoint_backward_compat uses
// "incomplete_bar" — confirm which key the Checkpoint schema actually reads.
let json = r#"{
"version": 3,
"symbol": "TESTUSDT",
"threshold_decimal_bps": 250,
"prevent_same_timestamp_close": true,
"defer_open": false,
"current_bar": null,
"thresholds": null,
"last_timestamp_us": 1000000,
"last_trade_id": 5,
"price_hash": 0,
"anomaly_summary": {"gaps_detected": 3, "overlaps_detected": 1, "timestamp_anomalies": 2}
}"#;
// Deserialize the fixture, then rebuild a processor from it.
let checkpoint: opendeviationbar_core::checkpoint::Checkpoint =
serde_json::from_str(json).unwrap();
let processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
let summary = processor.anomaly_summary();
// Each counter must match the value embedded in the JSON above.
assert_eq!(summary.gaps_detected, 3);
assert_eq!(summary.overlaps_detected, 1);
assert_eq!(summary.timestamp_anomalies, 2);
assert!(summary.has_anomalies());
// 3 + 1 + 2
assert_eq!(summary.total(), 6);
}
/// Two processors (thresholds 250 and 500) are configured with the same
/// `InterBarConfig` and handed clones of the global entropy cache; both must
/// report `inter_bar_enabled()`.
#[test]
fn test_with_inter_bar_config_and_cache_shared() {
    use opendeviationbar_core::entropy_cache_global::get_global_entropy_cache;
    use opendeviationbar_core::interbar::LookbackMode;

    let shared_cache = get_global_entropy_cache();
    let shared_config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: true,
        compute_tier3: true,
        ..Default::default()
    };

    let first = OpenDeviationBarProcessor::new(250)
        .unwrap()
        .with_inter_bar_config_and_cache(shared_config.clone(), Some(shared_cache.clone()));
    // The second processor takes ownership of the originals.
    let second = OpenDeviationBarProcessor::new(500)
        .unwrap()
        .with_inter_bar_config_and_cache(shared_config, Some(shared_cache));

    assert!(first.inter_bar_enabled());
    assert!(second.inter_bar_enabled());
}
/// A processor restored from a checkpoint reports inter-bar features as
/// disabled; calling `set_inter_bar_config_with_cache` with the global entropy
/// cache turns them back on.
#[test]
fn test_set_inter_bar_config_with_cache_after_checkpoint() {
    use opendeviationbar_core::entropy_cache_global::get_global_entropy_cache;
    use opendeviationbar_core::interbar::LookbackMode;

    let trades = scenarios::single_breach_sequence(250);
    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    engine.process_agg_trade_records(&trades).unwrap();

    let snapshot = engine.create_checkpoint("TEST");
    let mut restored = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();
    assert!(
        !restored.inter_bar_enabled(),
        "Checkpoint does not preserve inter-bar config"
    );

    let shared_cache = get_global_entropy_cache();
    let config = InterBarConfig {
        lookback_mode: LookbackMode::FixedCount(100),
        compute_tier2: false,
        compute_tier3: false,
        ..Default::default()
    };
    restored.set_inter_bar_config_with_cache(config, Some(shared_cache));

    assert!(
        restored.inter_bar_enabled(),
        "set_inter_bar_config_with_cache should re-enable"
    );
}
/// `threshold_decimal_bps()` returns the value the processor was built with,
/// checked for 250 and 1000.
#[test]
fn test_threshold_decimal_bps_getter() {
    let small = OpenDeviationBarProcessor::new(250).unwrap();
    assert_eq!(small.threshold_decimal_bps(), 250);

    let large = OpenDeviationBarProcessor::new(1000).unwrap();
    assert_eq!(large.threshold_decimal_bps(), 1000);
}
/// Checkpoints a processor that holds a forming bar, restores it, and then
/// feeds trades whose timestamps are far later than the checkpointed ones.
/// Expects no completed bars and exactly one gap in the anomaly summary.
#[test]
fn test_checkpoint_gap_discards_forming_bar() {
    let before_checkpoint = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000),
        test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640995202_000_000),
    ];
    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = engine.process_agg_trade_records(&before_checkpoint).unwrap();
    assert_eq!(completed.len(), 0, "No breach = no completed bars");

    let snapshot = engine.create_checkpoint("BTCUSDT");
    assert!(snapshot.has_incomplete_bar(), "Should have forming bar");

    let mut restored = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();
    // Trade 4's timestamp literal is 7200 * 1_000_000 larger than trade 3's.
    let after_gap = [
        test_utils::create_test_agg_trade(4, "50030.0", "1.0", 1641002402_000_000),
        test_utils::create_test_agg_trade(5, "50040.0", "1.0", 1641002403_000_000),
    ];
    let completed_after_gap = restored.process_agg_trade_records(&after_gap).unwrap();

    assert_eq!(
        completed_after_gap.len(),
        0,
        "No bars should complete — forming bar was discarded"
    );
    assert_eq!(
        restored.anomaly_summary().gaps_detected,
        1,
        "Gap should be recorded in anomaly summary"
    );
}
/// After a checkpoint/restore, trades arriving after a comparatively small
/// timestamp jump continue the forming bar: one bar completes, its
/// `open_time` is the first pre-checkpoint trade's timestamp, and no gap is
/// recorded.
#[test]
fn test_checkpoint_small_gap_continues_bar() {
    let before_checkpoint = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640995201_000_000),
    ];
    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let _ = engine.process_agg_trade_records(&before_checkpoint).unwrap();

    let snapshot = engine.create_checkpoint("BTCUSDT");
    let mut restored = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();

    // Trade 3's timestamp literal is 1800 * 1_000_000 larger than trade 2's.
    let after_small_gap = [
        test_utils::create_test_agg_trade(3, "50020.0", "1.0", 1640997001_000_000),
        test_utils::create_test_agg_trade(4, "50125.01", "1.0", 1640997002_000_000),
    ];
    let completed = restored
        .process_agg_trade_records(&after_small_gap)
        .unwrap();

    assert_eq!(
        completed.len(),
        1,
        "Bar should complete normally with small gap"
    );
    assert_eq!(completed[0].open_time, 1640995200_000_000);
    assert_eq!(
        restored.anomaly_summary().gaps_detected,
        0,
        "No gap anomaly for small gap"
    );
}
/// Restores a checkpoint with `with_max_gap(1_800_000_000)` and feeds a trade
/// whose timestamp literal is 2700 * 1_000_000 past the checkpointed trade;
/// expects one gap to be recorded.
#[test]
fn test_checkpoint_gap_custom_max_gap() {
    let seed_trade =
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000);
    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let _ = engine.process_agg_trade_records(&[seed_trade]).unwrap();

    let snapshot = engine.create_checkpoint("BTCUSDT");
    let mut restored = OpenDeviationBarProcessor::from_checkpoint(snapshot)
        .unwrap()
        .with_max_gap(1_800_000_000);

    let late_trade =
        test_utils::create_test_agg_trade(2, "50010.0", "1.0", 1640997900_000_000);
    let _ = restored.process_agg_trade_records(&[late_trade]).unwrap();

    assert_eq!(
        restored.anomaly_summary().gaps_detected,
        1,
        "45-min gap should be detected with 30-min threshold"
    );
}
/// Exercises `OpenDeviationBar::is_valid_range` with a multiplier of 2 and a
/// ratio derived from a 250 decimal-bps threshold. Three bars opening at
/// 50000 are checked: a high of 50250.01 must be rejected, while highs of
/// 50100 and 50125 must be accepted.
#[test]
fn test_is_valid_range_rejects_oversized() {
    use opendeviationbar_core::fixed_point::{FixedPoint, BASIS_POINTS_SCALE, SCALE};

    let threshold_decimal_bps: u32 = 250;
    let threshold_ratio =
        ((threshold_decimal_bps as i64) * SCALE) / (BASIS_POINTS_SCALE as i64);

    // Every bar under test opens and bottoms out at 50000; only the high varies.
    let bar_with_high = |high: &str| {
        let mut bar = OpenDeviationBar::default();
        bar.open = FixedPoint::from_str("50000.0").unwrap();
        bar.high = FixedPoint::from_str(high).unwrap();
        bar.low = FixedPoint::from_str("50000.0").unwrap();
        bar
    };

    let oversized = bar_with_high("50250.01");
    assert!(
        !oversized.is_valid_range(threshold_ratio, 2),
        "Bar exceeding 2x threshold should be invalid"
    );

    let valid = bar_with_high("50100.0");
    assert!(
        valid.is_valid_range(threshold_ratio, 2),
        "Bar within threshold should be valid"
    );

    let exact = bar_with_high("50125.0");
    assert!(
        exact.is_valid_range(threshold_ratio, 2),
        "Bar at exact threshold should be valid"
    );
}
/// When the checkpoint holds no incomplete bar, a trade arriving long after
/// the checkpointed timestamp must not be counted as a gap.
#[test]
fn test_checkpoint_no_incomplete_bar_gap_is_noop() {
    let before_checkpoint = [
        test_utils::create_test_agg_trade(1, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(2, "50200.0", "1.0", 1640995201_000_000),
    ];
    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = engine.process_agg_trade_records(&before_checkpoint).unwrap();
    assert_eq!(completed.len(), 1);

    let snapshot = engine.create_checkpoint("BTCUSDT");
    assert!(!snapshot.has_incomplete_bar());

    let mut restored = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();
    let late_trade =
        test_utils::create_test_agg_trade(3, "50010.0", "1.0", 1641081600_000_000);
    let _ = restored.process_agg_trade_records(&[late_trade]).unwrap();

    assert_eq!(
        restored.anomaly_summary().gaps_detected,
        0,
        "No gap anomaly when no forming bar exists"
    );
}
/// A processor that has seen no trades reports `None` from
/// `last_completed_bar_tid()`.
#[test]
fn test_last_completed_bar_tid_fresh_processor() {
    let untouched = OpenDeviationBarProcessor::new(1000).unwrap();

    assert_eq!(
        untouched.last_completed_bar_tid(),
        None,
        "Fresh processor should have last_completed_bar_tid == None"
    );
}
/// Trades 100-103 produce one completed bar ending at trade 102; afterwards
/// `last_completed_bar_tid()` is `Some(102)` while `last_agg_trade_id()` is
/// `Some(103)`.
#[test]
fn test_last_completed_bar_tid_after_bar_completion() {
    let sequence = [
        test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(101, "50050.0", "1.0", 1640995201_000_000),
        test_utils::create_test_agg_trade(102, "50125.0", "1.0", 1640995202_000_000),
        test_utils::create_test_agg_trade(103, "50500.0", "1.0", 1640995203_000_000),
    ];

    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = engine.process_agg_trade_records(&sequence).unwrap();

    assert_eq!(completed.len(), 1, "Should have exactly one completed bar");
    assert_eq!(completed[0].last_agg_trade_id, 102);
    assert_eq!(
        engine.last_completed_bar_tid(),
        Some(102),
        "last_completed_bar_tid should be set to completed bar's last_agg_trade_id"
    );
    assert_eq!(
        engine.last_agg_trade_id(),
        Some(103),
        "last_agg_trade_id should include the forming bar tail"
    );
}
/// With trades 100-102 and no completed bar, `last_completed_bar_tid()` stays
/// `None` while `last_agg_trade_id()` advances to `Some(102)`.
#[test]
fn test_last_completed_bar_tid_no_bar_completed() {
    let sequence = [
        test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(101, "50050.0", "1.0", 1640995201_000_000),
        test_utils::create_test_agg_trade(102, "50100.0", "1.0", 1640995202_000_000),
    ];

    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = engine.process_agg_trade_records(&sequence).unwrap();

    assert_eq!(completed.len(), 0, "No bars should complete");
    assert_eq!(
        engine.last_completed_bar_tid(),
        None,
        "last_completed_bar_tid should remain None when no bar completes"
    );
    assert_eq!(
        engine.last_agg_trade_id(),
        Some(102),
        "last_agg_trade_id should be updated by every trade"
    );
}
/// With a forming bar built from trades 200-201, `reset_at_ouroboros()` must
/// return an orphan bar ending at trade 201 and set
/// `last_completed_bar_tid()` to `Some(201)`.
#[test]
fn test_last_completed_bar_tid_after_orphan_emission() {
    let sequence = [
        test_utils::create_test_agg_trade(200, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(201, "50050.0", "1.0", 1640995201_000_000),
    ];

    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    engine.process_agg_trade_records(&sequence).unwrap();
    assert_eq!(engine.last_completed_bar_tid(), None);

    // Panics with the same message as the original assert when no orphan is emitted.
    let orphan_bar = engine
        .reset_at_ouroboros()
        .expect("Should emit orphan bar");
    assert_eq!(orphan_bar.last_agg_trade_id, 201);
    assert_eq!(
        engine.last_completed_bar_tid(),
        Some(201),
        "last_completed_bar_tid should be set to orphan bar's last_agg_trade_id"
    );
}
/// When the last trade of the batch completes a bar, `reset_at_ouroboros()`
/// returns `None` and `last_completed_bar_tid()` keeps its prior value of
/// `Some(102)`.
#[test]
fn test_last_completed_bar_tid_ouroboros_no_forming_bar() {
    let sequence = [
        test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(101, "50050.0", "1.0", 1640995201_000_000),
        test_utils::create_test_agg_trade(102, "50125.0", "1.0", 1640995202_000_000),
    ];

    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    let completed = engine.process_agg_trade_records(&sequence).unwrap();
    assert_eq!(completed.len(), 1);
    assert_eq!(engine.last_completed_bar_tid(), Some(102));

    let maybe_orphan = engine.reset_at_ouroboros();
    assert!(maybe_orphan.is_none(), "No forming bar to orphan");
    assert_eq!(
        engine.last_completed_bar_tid(),
        Some(102),
        "last_completed_bar_tid should persist across ouroboros reset when no orphan emitted"
    );
}
/// `last_completed_bar_tid()` is `Some(101)` after processing trades 100-102
/// and must still be `Some(101)` on a processor restored from a checkpoint.
#[test]
fn test_last_completed_bar_tid_checkpoint_round_trip() {
    let sequence = [
        test_utils::create_test_agg_trade(100, "50000.0", "1.0", 1640995200_000_000),
        test_utils::create_test_agg_trade(101, "50125.0", "1.0", 1640995201_000_000),
        test_utils::create_test_agg_trade(102, "50500.0", "1.0", 1640995202_000_000),
    ];

    let mut engine = OpenDeviationBarProcessor::new(250).unwrap();
    engine.process_agg_trade_records(&sequence).unwrap();
    assert_eq!(engine.last_completed_bar_tid(), Some(101));

    let snapshot = engine.create_checkpoint("BTCUSDT");
    let restored = OpenDeviationBarProcessor::from_checkpoint(snapshot).unwrap();

    assert_eq!(
        restored.last_completed_bar_tid(),
        Some(101),
        "last_completed_bar_tid should survive checkpoint round-trip"
    );
}
// Deserializes a hand-written version-2 checkpoint JSON that contains no
// `last_completed_bar_tid` key and checks that the field is `None` both on the
// deserialized checkpoint and on the processor restored from it.
#[test]
fn test_last_completed_bar_tid_old_checkpoint_backward_compat() {
// Fixture deliberately omits "last_completed_bar_tid".
let json = r#"{
"version": 2,
"symbol": "BTCUSDT",
"threshold_decimal_bps": 250,
"incomplete_bar": null,
"thresholds": null,
"last_timestamp_us": 1640995200000000,
"last_trade_id": 12345,
"price_hash": 0,
"anomaly_summary": {"gaps_detected": 0, "overlaps_detected": 0, "timestamp_anomalies": 0},
"prevent_same_timestamp_close": true,
"defer_open": false
}"#;
let checkpoint: opendeviationbar_core::checkpoint::Checkpoint =
serde_json::from_str(json).unwrap();
// The missing key must deserialize to None rather than fail.
assert_eq!(
checkpoint.last_completed_bar_tid, None,
"Missing field should default to None"
);
// The restored processor must expose the same None through its getter.
let processor = OpenDeviationBarProcessor::from_checkpoint(checkpoint).unwrap();
assert_eq!(
processor.last_completed_bar_tid(),
None,
"Processor restored from old checkpoint should have last_completed_bar_tid == None"
);
}