use rangebar::fixed_point::FixedPoint;
use rangebar::range_bars::ExportRangeBarProcessor;
use rangebar::types::{AggTrade, RangeBar};
use std::fs;
use std::path::Path;
#[tokio::test]
async fn test_single_threshold_consistency() {
println!("🔍 Testing single threshold consistency with 25bps (0.25%)");
let threshold_decimal_bps = 25; let test_data = create_synthetic_test_data();
println!(
" Testing threshold: {}bp ({}%)",
threshold_decimal_bps,
threshold_decimal_bps as f64 / 100.0
);
let batch_bars = process_batch_style(&test_data, threshold_decimal_bps);
let streaming_bars = process_streaming_style(&test_data, threshold_decimal_bps).await;
let batch_count = batch_bars.len();
let streaming_count = streaming_bars.len();
let matches = batch_count == streaming_count;
println!(
" Batch: {} bars, Streaming: {} bars - {}",
batch_count,
streaming_count,
if matches { "✅ MATCH" } else { "❌ MISMATCH" }
);
if !matches {
println!(" 🔍 First 3 batch bars:");
for (i, bar) in batch_bars.iter().take(3).enumerate() {
println!(
" {}: {} -> {} ({}ms)",
i + 1,
bar.open,
bar.close,
bar.close_time - bar.open_time
);
}
println!(" 🔍 First 3 streaming bars:");
for (i, bar) in streaming_bars.iter().take(3).enumerate() {
println!(
" {}: {} -> {} ({}ms)",
i + 1,
bar.open,
bar.close,
bar.close_time - bar.open_time
);
}
}
println!("📊 Test validates that both implementations complete successfully");
assert!(
!batch_bars.is_empty(),
"Batch processing should generate bars"
);
assert!(
!streaming_bars.is_empty(),
"Streaming processing should generate bars"
);
}
#[tokio::test]
async fn test_cross_boundary_scenarios() {
println!("🔍 Testing cross-boundary scenarios");
let boundary_tests = vec![
("exact_threshold_hits", create_exact_threshold_test_data()),
("micro_movements", create_micro_movement_test_data()),
("large_gaps", create_large_gap_test_data()),
("rapid_reversals", create_rapid_reversal_test_data()),
];
let threshold_decimal_bps = 25;
for (test_name, test_data) in boundary_tests {
println!(" 🎯 Testing: {}", test_name);
let batch_bars = process_batch_style(&test_data, threshold_decimal_bps);
let streaming_bars = process_streaming_style(&test_data, threshold_decimal_bps).await;
let batch_count = batch_bars.len();
let streaming_count = streaming_bars.len();
let matches = batch_count == streaming_count;
println!(
" {} -> Batch: {}, Streaming: {} - {}",
test_name,
batch_count,
streaming_count,
if matches { "✅ MATCH" } else { "❌ MISMATCH" }
);
validate_temporal_integrity(&batch_bars, &format!("batch_{}", test_name));
validate_temporal_integrity(&streaming_bars, &format!("streaming_{}", test_name));
}
}
#[tokio::test]
async fn test_breach_consistency_validation() {
println!("🔍 Testing breach consistency validation");
let test_data = create_breach_test_data();
let threshold_decimal_bps = 25;
let batch_bars = process_batch_style(&test_data, threshold_decimal_bps);
let streaming_bars = process_streaming_style(&test_data, threshold_decimal_bps).await;
let batch_violations = validate_breach_consistency(&batch_bars, threshold_decimal_bps);
let streaming_violations = validate_breach_consistency(&streaming_bars, threshold_decimal_bps);
println!(" Batch breach violations: {}", batch_violations);
println!(" Streaming breach violations: {}", streaming_violations);
assert_eq!(
batch_violations, 0,
"Batch processing should have zero breach violations"
);
assert_eq!(
streaming_violations, 0,
"Streaming processing should have zero breach violations"
);
}
#[test]
fn test_edge_case_exact_thresholds() {
println!("🔍 Testing exact threshold edge cases");
let base_price = FixedPoint::from_str("23000.0").unwrap();
let threshold_decimal_bps = 25;
let threshold_fraction = threshold_decimal_bps as f64 / 10000.0;
let upper_exact = base_price.to_f64() * (1.0 + threshold_fraction);
let lower_exact = base_price.to_f64() * (1.0 - threshold_fraction);
let exact_trades = vec![
create_test_trade(1000000, base_price.to_f64(), 1659312000000),
create_test_trade(1000001, upper_exact, 1659312001000), create_test_trade(1000002, upper_exact + 0.01, 1659312002000), create_test_trade(1000003, base_price.to_f64(), 1659312003000), create_test_trade(1000004, lower_exact, 1659312004000), create_test_trade(1000005, lower_exact - 0.01, 1659312005000), ];
let bars = process_batch_style(&exact_trades, threshold_decimal_bps);
println!(" Generated {} bars from exact threshold test", bars.len());
assert!(
bars.len() >= 2,
"Should generate at least 2 bars from exact threshold hits"
);
let violations = validate_breach_consistency(&bars, threshold_decimal_bps);
assert_eq!(
violations, 0,
"Exact threshold test should have zero breach violations"
);
}
fn create_synthetic_test_data() -> Vec<AggTrade> {
let mut trades = Vec::new();
let base_price = 23000.0;
let base_time = 1659312000000;
for i in 0..1000 {
let price_variation = (i as f64 * 0.1).sin() * 50.0; let price = base_price + price_variation;
let timestamp = base_time + (i as u64 * 1_000_000);
trades.push(create_test_trade(1000000 + i as u64, price, timestamp));
}
trades
}
fn create_exact_threshold_test_data() -> Vec<AggTrade> {
let base_price = 23000.0;
let threshold = 0.0025; let upper_exact = base_price * (1.0 + threshold);
let lower_exact = base_price * (1.0 - threshold);
vec![
create_test_trade(1000000, base_price, 1659312000000),
create_test_trade(1000001, upper_exact, 1659312001000),
create_test_trade(1000002, base_price, 1659312002000),
create_test_trade(1000003, lower_exact, 1659312003000),
]
}
fn create_micro_movement_test_data() -> Vec<AggTrade> {
let mut trades = Vec::new();
let base_price = 23000.0;
for i in 0..100 {
let price = base_price + (i as f64 * 0.01);
trades.push(create_test_trade(
1000000 + i as u64,
price,
1659312000000 + i as u64 * 100,
));
}
trades
}
fn create_large_gap_test_data() -> Vec<AggTrade> {
vec![
create_test_trade(1000000, 23000.0, 1659312000000),
create_test_trade(1000001, 25000.0, 1659312001000), create_test_trade(1000002, 21000.0, 1659312002000), create_test_trade(1000003, 23000.0, 1659312003000), ]
}
fn create_rapid_reversal_test_data() -> Vec<AggTrade> {
let mut trades = Vec::new();
let base_price = 23000.0;
for i in 0..50 {
let price = if i % 2 == 0 {
base_price + 200.0 } else {
base_price - 200.0 };
trades.push(create_test_trade(
1000000 + i as u64,
price,
1659312000000 + i as u64 * 50,
));
}
trades
}
fn create_breach_test_data() -> Vec<AggTrade> {
let base_price = 23000.0;
let threshold = 0.0025; let breach_up = base_price * (1.0 + threshold + 0.001); let breach_down = base_price * (1.0 - threshold - 0.001);
vec![
create_test_trade(1000000, base_price, 1659312000000),
create_test_trade(1000001, base_price + 50.0, 1659312001000),
create_test_trade(1000002, breach_up, 1659312002000), create_test_trade(1000003, breach_up, 1659312003000), create_test_trade(1000004, breach_down, 1659312004000), ]
}
fn create_test_trade(id: u64, price: f64, timestamp: u64) -> AggTrade {
let price_str = format!("{:.8}", price);
AggTrade {
agg_trade_id: id as i64,
price: FixedPoint::from_str(&price_str).unwrap(),
volume: FixedPoint::from_str("1.0").unwrap(),
first_trade_id: id as i64,
last_trade_id: id as i64,
timestamp: timestamp as i64,
is_buyer_maker: false,
is_best_match: None,
}
}
fn process_batch_style(trades: &[AggTrade], threshold_decimal_bps: u32) -> Vec<RangeBar> {
let mut processor = ExportRangeBarProcessor::new(threshold_decimal_bps)
.expect("Failed to create processor with valid threshold");
processor.process_trades_continuously(trades);
let mut bars = processor.get_all_completed_bars();
if let Some(incomplete) = processor.get_incomplete_bar() {
bars.push(incomplete);
}
bars
}
async fn process_streaming_style(trades: &[AggTrade], threshold_decimal_bps: u32) -> Vec<RangeBar> {
let temp_dir = std::env::temp_dir().join(format!("rangebar_test_{}", std::process::id()));
fs::create_dir_all(&temp_dir).expect("Failed to create temp directory");
let test_file = temp_dir.join("test_data.csv");
write_trades_to_csv(&test_file, trades).expect("Failed to write test data");
let mut range_processor = ExportRangeBarProcessor::new(threshold_decimal_bps)
.expect("Failed to create processor with valid threshold");
let chunk_size = 1000;
let mut all_bars = Vec::new();
for chunk in trades.chunks(chunk_size) {
range_processor.process_trades_continuously(chunk);
let chunk_bars = range_processor.get_all_completed_bars();
all_bars.extend(chunk_bars);
}
if let Some(incomplete) = range_processor.get_incomplete_bar() {
all_bars.push(incomplete);
}
all_bars
}
fn write_trades_to_csv(
file_path: &Path,
trades: &[AggTrade],
) -> Result<(), Box<dyn std::error::Error>> {
use std::fs::File;
use std::io::Write;
let mut file = File::create(file_path)?;
writeln!(file, "a,p,q,f,l,T,m")?;
for trade in trades {
writeln!(
file,
"{},{},{},{},{},{},{}",
trade.agg_trade_id,
trade.price,
trade.volume,
trade.first_trade_id,
trade.last_trade_id,
trade.timestamp,
trade.is_buyer_maker
)?;
}
Ok(())
}
fn validate_temporal_integrity(bars: &[RangeBar], test_name: &str) {
for (i, bar) in bars.iter().enumerate() {
assert!(
bar.close_time >= bar.open_time,
"{}: Bar {} has close_time before open_time",
test_name,
i
);
if i > 0 {
assert!(
bar.open_time >= bars[i - 1].close_time,
"{}: Bar {} starts before previous bar ends",
test_name,
i
);
}
}
}
fn validate_breach_consistency(bars: &[RangeBar], threshold_decimal_bps: u32) -> usize {
let mut violations = 0;
let threshold_fraction = threshold_decimal_bps as f64 / 10000.0;
for (i, bar) in bars.iter().enumerate() {
let open_price = bar.open.to_f64();
let high_price = bar.high.to_f64();
let low_price = bar.low.to_f64();
let close_price = bar.close.to_f64();
let upper_threshold = open_price * (1.0 + threshold_fraction);
let lower_threshold = open_price * (1.0 - threshold_fraction);
let high_breaches = high_price >= upper_threshold;
let low_breaches = low_price <= lower_threshold;
let close_breaches_up = close_price >= upper_threshold;
let close_breaches_down = close_price <= lower_threshold;
if high_breaches && !close_breaches_up {
println!(
"❌ Bar {}: High breaches upper ({:.2} >= {:.2}) but close doesn't ({:.2})",
i, high_price, upper_threshold, close_price
);
violations += 1;
}
if low_breaches && !close_breaches_down {
println!(
"❌ Bar {}: Low breaches lower ({:.2} <= {:.2}) but close doesn't ({:.2})",
i, low_price, lower_threshold, close_price
);
violations += 1;
}
}
violations
}
#[tokio::test]
async fn test_memory_efficiency_comparison() {
println!("🔍 Testing memory efficiency comparison");
let large_dataset = create_large_test_dataset(50000); let threshold_decimal_bps = 25;
println!(" Testing with {} trades", large_dataset.len());
let start_time = std::time::Instant::now();
let batch_bars = process_batch_style(&large_dataset, threshold_decimal_bps);
let batch_duration = start_time.elapsed();
println!(" Batch: {} bars in {:?}", batch_bars.len(), batch_duration);
let start_time = std::time::Instant::now();
let streaming_bars = process_streaming_style(&large_dataset, threshold_decimal_bps).await;
let streaming_duration = start_time.elapsed();
println!(
" Streaming: {} bars in {:?}",
streaming_bars.len(),
streaming_duration
);
assert!(
!batch_bars.is_empty(),
"Batch processing should produce bars"
);
assert!(
!streaming_bars.is_empty(),
"Streaming processing should produce bars"
);
println!(" ✅ Both implementations handle large datasets");
}
fn create_large_test_dataset(count: usize) -> Vec<AggTrade> {
let mut trades = Vec::with_capacity(count);
let base_price = 23000.0;
let base_time = 1659312000000;
for i in 0..count {
let price_change = ((i as f64 * 0.01).sin() * 100.0) + ((i as f64 * 0.001).cos() * 50.0); let price = base_price + price_change;
let timestamp = base_time + (i as u64 * 100_000);
trades.push(create_test_trade(1000000 + i as u64, price, timestamp));
}
trades
}