use std::io;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::thread;
use crate::api::raft_log_writer::RaftLogWriter;
use crate::api::raft_log_writer::blocking_flush;
use crate::tests::context::TestContext;
/// Stress-tests concurrent range reads against a reopened raft log.
///
/// The test writes `NUM_ENTRIES` entries with log id `(1, i)` and payload
/// `payload_{i:04}`, flushes, drops the writer and reopens the log. It then
/// spawns `NUM_THREADS` threads that each perform `ITERATIONS_PER_THREAD`
/// reads of up to `READ_WINDOW` entries starting at a rotating index.
///
/// A read is considered correct only if it yields exactly the requested
/// indices `[start_idx, end_idx)` in order, each with the matching payload.
/// Read errors, data mismatches (including short or shifted reads) and panics
/// are tallied separately, and the test fails if any tally is non-zero.
///
/// # Errors
///
/// Returns the `io::Error` from creating the test context, opening the log,
/// appending, or flushing.
#[test]
fn test_concurrent_read_race_condition() -> Result<(), io::Error> {
    // Single source of truth for the workload size, shared by writer and readers.
    const NUM_ENTRIES: u64 = 50;
    const NUM_THREADS: usize = 8;
    const ITERATIONS_PER_THREAD: usize = 100;
    const READ_WINDOW: u64 = 5;

    let mut ctx = TestContext::new()?;
    let config = &mut ctx.config;
    // NOTE(review): small chunks plus a zero-capacity cache presumably force
    // reads to cross chunk boundaries and bypass the cache — confirm against
    // the config documentation.
    config.chunk_max_records = Some(5);
    config.log_cache_capacity = Some(0);

    // Populate the log, then reopen it so readers work on a freshly loaded instance.
    let raft_log = {
        let mut rl = ctx.new_raft_log()?;
        for i in 0..NUM_ENTRIES {
            let payload = format!("payload_{:04}", i);
            rl.append([((1, i), payload)])?;
        }
        blocking_flush(&mut rl)?;
        drop(rl);
        ctx.new_raft_log()?
    };
    let raft_log = Arc::new(raft_log);

    let error_count = Arc::new(AtomicUsize::new(0));
    let mismatch_count = Arc::new(AtomicUsize::new(0));
    let panic_count = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(NUM_THREADS);
    for thread_id in 0..NUM_THREADS {
        let rl = raft_log.clone();
        let errors = error_count.clone();
        let mismatches = mismatch_count.clone();
        let panics = panic_count.clone();

        let handle = thread::spawn(move || {
            for iter in 0..ITERATIONS_PER_THREAD {
                // Rotate the start index so threads hit overlapping ranges.
                let start_idx = ((thread_id + iter) % NUM_ENTRIES as usize) as u64;
                let end_idx = (start_idx + READ_WINDOW).min(NUM_ENTRIES);

                // Catch panics per read so one failure does not hide later ones.
                let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut seen: u64 = 0;
                    let mut saw_error = false;

                    for item in rl.read(start_idx, end_idx) {
                        match item {
                            Ok((log_id, payload)) => {
                                // Expect the index that was requested, not
                                // whatever index the read reports.
                                let expected_index = start_idx + seen;
                                seen += 1;
                                let expected_payload =
                                    format!("payload_{:04}", expected_index);
                                if log_id.1 != expected_index || payload != expected_payload {
                                    mismatches.fetch_add(1, Ordering::Relaxed);
                                }
                            }
                            Err(e) => {
                                saw_error = true;
                                eprintln!(
                                    "thread {} iter {}: read [{}, {}) failed: {:?}",
                                    thread_id, iter, start_idx, end_idx, e
                                );
                                errors.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                    }

                    // A short or over-long read is a mismatch. Skip this when an
                    // error was already counted, to avoid double counting.
                    if !saw_error && seen != end_idx - start_idx {
                        mismatches.fetch_add(1, Ordering::Relaxed);
                    }
                }));

                if outcome.is_err() {
                    panics.fetch_add(1, Ordering::Relaxed);
                }
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        // Count a panic that escaped `catch_unwind` instead of discarding it.
        if handle.join().is_err() {
            panic_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    // Relaxed loads are sufficient: `join` synchronizes with each thread's end.
    let total_errors = error_count.load(Ordering::Relaxed);
    let total_mismatches = mismatch_count.load(Ordering::Relaxed);
    let total_panics = panic_count.load(Ordering::Relaxed);

    println!(
        "Concurrent read test completed: {} errors, {} mismatches, {} panics",
        total_errors, total_mismatches, total_panics
    );

    assert_eq!(
        total_errors + total_mismatches + total_panics,
        0,
        "Race condition detected: {} errors, {} data mismatches, {} panics",
        total_errors,
        total_mismatches,
        total_panics
    );
    Ok(())
}