use std::{
sync::{
Arc, Barrier,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
thread,
time::Duration,
};
use rawdb::Database;
use tempfile::TempDir;
use vecdb::{
AnyStoredVec, AnyVec, BytesVec, ImportableVec, ReadableVec, Result, StoredVec, Version,
WritableVec,
};
#[cfg(feature = "pco")]
use vecdb::PcoVec;
/// Creates a fresh `Database` backed by a temporary directory.
///
/// The `TempDir` guard is returned alongside the handle so the
/// directory outlives the database for the duration of each test.
fn setup_test_db() -> Result<(Database, TempDir)> {
    let dir = TempDir::new()?;
    let database = Database::open(dir.path())?;
    Ok((database, dir))
}
#[test]
fn test_reader_sees_written_data_without_flush() -> Result<()> {
    // A reader snapshot taken after `write()` must observe the data
    // even though the database itself has not been flushed to disk.
    let (db, _temp) = setup_test_db()?;
    let mut writer: BytesVec<usize, u64> =
        BytesVec::forced_import(&db, "test_vec", Version::ONE)?;
    (0..100u64).for_each(|v| writer.push(v));
    writer.write()?;
    let snapshot = writer.reader();
    assert_eq!(snapshot.len(), 100);
    // Probe the start, middle, and end of the committed range.
    for idx in [0usize, 50, 99] {
        assert_eq!(snapshot.get(idx), idx as u64);
    }
    Ok(())
}
#[test]
fn test_reader_sees_new_data_after_write() -> Result<()> {
    // A read-only clone tracks the committed length: pushes alone are
    // invisible to it, but a subsequent `write()` makes them visible.
    let (db, _temp) = setup_test_db()?;
    let mut writer: BytesVec<usize, u64> =
        BytesVec::forced_import(&db, "test_vec", Version::ONE)?;
    (0..50u64).for_each(|v| writer.push(v));
    writer.write()?;
    let reader = writer.read_only_clone();
    assert_eq!(reader.len(), 50);
    // Pushed-but-unwritten values must not leak to the clone.
    (50..100u64).for_each(|v| writer.push(v));
    assert_eq!(reader.len(), 50);
    // After the commit the clone sees the full range.
    writer.write()?;
    assert_eq!(reader.len(), 100);
    let snapshot = reader.reader();
    assert_eq!(snapshot.get(99), 99);
    assert_eq!(snapshot.get(75), 75);
    Ok(())
}
#[test]
fn test_concurrent_read_during_write() -> Result<()> {
    // A reader thread repeatedly validates the stable 100-element
    // prefix while the writer commits ten batches of 100 values.
    let (db, _temp) = setup_test_db()?;
    let mut writer: BytesVec<usize, u64> =
        BytesVec::forced_import(&db, "test_vec", Version::ONE)?;
    (0..1000u64).for_each(|v| writer.push(v));
    writer.write()?;
    let reader = writer.read_only_clone();
    // Two-party barrier so reads and writes genuinely overlap.
    let start_gate = Arc::new(Barrier::new(2));
    let gate = start_gate.clone();
    let reader_thread = thread::spawn(move || -> Result<()> {
        gate.wait();
        for _ in 0..100 {
            let len = reader.len();
            if len > 0 {
                let snap = reader.reader();
                // Only the first 100 slots are checked; they were
                // committed before the race began and never change.
                for idx in 0..len.min(100) {
                    let value = snap.try_get(idx);
                    assert!(value.is_some(), "Expected value at index {}", idx);
                    assert_eq!(value.unwrap(), idx as u64);
                }
            }
            thread::sleep(Duration::from_micros(100));
        }
        Ok(())
    });
    start_gate.wait();
    for batch in 0..10 {
        for offset in 0..100u64 {
            writer.push(1000 + batch * 100 + offset);
        }
        writer.write()?;
        thread::sleep(Duration::from_micros(50));
    }
    reader_thread.join().unwrap()?;
    // 1000 initial values + 10 batches of 100.
    assert_eq!(writer.len(), 2000);
    Ok(())
}
#[test]
fn test_batched_writes_single_flush() -> Result<()> {
    // Three vecs written independently and flushed once at the end;
    // readers must see correct data both before and after the flush.
    let (db, _temp) = setup_test_db()?;
    let version = Version::ONE;
    let mut vec1: BytesVec<usize, u64> = BytesVec::forced_import(&db, "vec1", version)?;
    let mut vec2: BytesVec<usize, u64> = BytesVec::forced_import(&db, "vec2", version)?;
    let mut vec3: BytesVec<usize, u64> = BytesVec::forced_import(&db, "vec3", version)?;
    for v in 0..100u64 {
        vec1.push(v);
        vec2.push(v * 2);
        vec3.push(v * 3);
    }
    vec1.write()?;
    vec2.write()?;
    vec3.write()?;
    let r1 = vec1.reader();
    let r2 = vec2.reader();
    let r3 = vec3.reader();
    assert_eq!((r1.len(), r2.len(), r3.len()), (100, 100, 100));
    // Each vec stores a different multiple of the index.
    assert_eq!((r1.get(50), r2.get(50), r3.get(50)), (50, 100, 150));
    // Flush while the readers are still alive, then release them.
    db.flush()?;
    drop((r1, r2, r3));
    // A fresh reader after the flush still sees everything.
    let r1 = vec1.reader();
    assert_eq!(r1.get(99), 99);
    Ok(())
}
#[test]
#[cfg(feature = "pco")]
fn test_pco_concurrent_read_write() -> Result<()> {
    // Same visibility contract as BytesVec, but through the
    // pco-compressed vec: a clone sees data committed after cloning.
    let (db, _temp) = setup_test_db()?;
    let mut writer: PcoVec<usize, u64> =
        PcoVec::forced_import(&db, "pco_vec", Version::ONE)?;
    (0..500u64).for_each(|v| writer.push(v));
    writer.write()?;
    let reader = writer.read_only_clone();
    (500..1000u64).for_each(|v| writer.push(v));
    writer.write()?;
    assert_eq!(reader.len(), 1000);
    // Probe the start, the old/new boundary, and the last element.
    for probe in [0usize, 500, 999] {
        assert_eq!(reader.collect_range(probe, probe + 1), vec![probe as u64]);
    }
    Ok(())
}
#[test]
fn test_reader_isolation_from_pushed() -> Result<()> {
    // Pending pushes count toward the writer's own length but must
    // never be visible through a read-only clone until committed.
    let (db, _temp) = setup_test_db()?;
    let mut writer: BytesVec<usize, u64> =
        BytesVec::forced_import(&db, "test_vec", Version::ONE)?;
    (0..50u64).for_each(|v| writer.push(v));
    writer.write()?;
    let reader = writer.read_only_clone();
    (50..100u64).for_each(|v| writer.push(v));
    // Writer: 50 committed + 50 pending; clone: committed only.
    assert_eq!(writer.len(), 100);
    assert_eq!(writer.pushed_len(), 50);
    assert_eq!(reader.len(), 50);
    let snapshot = reader.reader();
    assert_eq!(snapshot.get(49), 49);
    // The first uncommitted slot is out of range for the snapshot.
    assert_eq!(snapshot.try_get(50), None);
    Ok(())
}
#[test]
fn test_memory_ordering_len_vs_data() -> Result<()> {
    // Stress test for the ordering between the published length and the
    // data it covers: whenever a reader observes `len`, the element at
    // `len - 1` must already be fully visible. A racing reader thread
    // repeatedly reads the last committed element while the writer
    // commits 100 small batches.
    let (db, _temp) = setup_test_db()?;
    let version = Version::ONE;
    let mut writer: BytesVec<usize, u64> = BytesVec::forced_import(&db, "test_vec", version)?;
    for i in 0..100u64 {
        writer.push(i);
    }
    writer.write()?;
    let reader = writer.read_only_clone();
    // Two-party barrier so the reader and writer start racing together.
    let barrier = Arc::new(Barrier::new(2));
    let stop = Arc::new(AtomicBool::new(false));
    let errors = Arc::new(AtomicUsize::new(0));
    let reads = Arc::new(AtomicUsize::new(0));
    let reader_barrier = barrier.clone();
    let stop_clone = stop.clone();
    let errors_clone = errors.clone();
    let reads_clone = reads.clone();
    let reader_handle = thread::spawn(move || {
        reader_barrier.wait();
        while !stop_clone.load(Ordering::Relaxed) {
            let len = reader.len();
            if len > 0 {
                // Values are pushed as 0,1,2,..., so slot i must hold i.
                // Reading the last published slot checks that the data
                // write became visible no later than the length update.
                let last_idx = len - 1;
                let r = reader.reader();
                let val = r.get(last_idx);
                if val != last_idx as u64 {
                    eprintln!(
                        "ERROR: Read wrong value at {}: expected {}, got {}",
                        last_idx, last_idx, val
                    );
                    errors_clone.fetch_add(1, Ordering::Relaxed);
                }
                reads_clone.fetch_add(1, Ordering::Relaxed);
            }
        }
    });
    barrier.wait();
    // 100 commits of 10 values each, yielding between commits to give
    // the reader thread a chance to interleave with every batch.
    for batch in 0..100 {
        for i in 0..10u64 {
            let val = 100 + batch * 10 + i;
            writer.push(val);
        }
        writer.write()?;
        thread::yield_now();
    }
    // Brief grace period so the reader can observe the final state.
    thread::sleep(Duration::from_millis(1));
    stop.store(true, Ordering::Relaxed);
    reader_handle.join().unwrap();
    let error_count = errors.load(Ordering::Relaxed);
    let read_count = reads.load(Ordering::Relaxed);
    println!("Completed {} reads with {} errors", read_count, error_count);
    assert_eq!(error_count, 0, "Memory ordering violation detected!");
    assert!(read_count > 0, "Should have completed at least some reads");
    // 100 initial values + 100 batches * 10 values.
    assert_eq!(writer.len(), 1100);
    Ok(())
}
#[test]
fn test_length_data_consistency_stress() -> Result<()> {
    // The read-only clone is taken while the vec is still empty; a
    // reader thread then polls concurrently with incremental commits.
    // Whatever length the reader observes must be fully backed by
    // correct data (values are pushed as 0,1,2,..., so slot i == i).
    let (db, _temp) = setup_test_db()?;
    let version = Version::ONE;
    let mut writer: BytesVec<usize, u64> = BytesVec::forced_import(&db, "test_vec", version)?;
    let reader = writer.read_only_clone();
    let stop = Arc::new(AtomicBool::new(false));
    let max_len_seen = Arc::new(AtomicUsize::new(0));
    let errors = Arc::new(AtomicUsize::new(0));
    let stop_clone = stop.clone();
    let max_len_clone = max_len_seen.clone();
    let errors_clone = errors.clone();
    let reader_handle = thread::spawn(move || {
        // Bounded iteration count so the thread terminates even if the
        // stop flag is set late.
        for _ in 0..1000 {
            if stop_clone.load(Ordering::Relaxed) {
                break;
            }
            let len = reader.len();
            // Record the largest length observed so the test can assert
            // the reader actually saw data appear while racing.
            max_len_clone.fetch_max(len, Ordering::Relaxed);
            if len > 0 {
                let r = reader.reader();
                // Probe the first, last, and middle visible slots.
                let indices_to_check = [0, len.saturating_sub(1), len / 2];
                for &i in &indices_to_check {
                    if i >= len {
                        continue;
                    }
                    let val = r.get(i);
                    if val != i as u64 {
                        errors_clone.fetch_add(1, Ordering::Relaxed);
                    }
                }
            }
            thread::sleep(Duration::from_micros(10));
        }
    });
    // Commit every 10th push so the visible length grows in small
    // steps while the reader is polling.
    for i in 0..500u64 {
        writer.push(i);
        if i % 10 == 0 {
            writer.write()?;
        }
    }
    writer.write()?;
    // Grace period so the reader can observe the final committed state.
    thread::sleep(Duration::from_millis(10));
    stop.store(true, Ordering::Relaxed);
    reader_handle.join().unwrap();
    let error_count = errors.load(Ordering::Relaxed);
    assert_eq!(error_count, 0, "Consistency violation detected!");
    let max_len = max_len_seen.load(Ordering::Relaxed);
    println!("Reader saw max len: {}", max_len);
    assert!(max_len > 0, "Reader should have seen some data");
    Ok(())
}
#[test]
fn test_many_readers_one_writer() -> Result<()> {
    // Eight reader threads validate the initial 100-element prefix
    // while a single writer commits 20 more batches concurrently.
    let (db, _temp) = setup_test_db()?;
    let mut writer: BytesVec<usize, u64> =
        BytesVec::forced_import(&db, "test_vec", Version::ONE)?;
    (0..100u64).for_each(|v| writer.push(v));
    writer.write()?;
    let num_readers = 8;
    // One extra slot so the writer participates in the start barrier.
    let start_gate = Arc::new(Barrier::new(num_readers + 1));
    let handles: Vec<_> = (0..num_readers)
        .map(|_| {
            let reader = writer.read_only_clone();
            let gate = start_gate.clone();
            thread::spawn(move || -> Result<()> {
                gate.wait();
                for _ in 0..50 {
                    let snap = reader.reader();
                    let len = snap.len();
                    // Only the pre-committed prefix is ever checked.
                    for idx in 0..len.min(100) {
                        assert_eq!(snap.get(idx), idx as u64);
                    }
                    thread::sleep(Duration::from_micros(10));
                }
                Ok(())
            })
        })
        .collect();
    start_gate.wait();
    for batch in 0..20 {
        for offset in 0..50u64 {
            writer.push(100 + batch * 50 + offset);
        }
        writer.write()?;
        thread::sleep(Duration::from_micros(100));
    }
    for handle in handles {
        handle.join().unwrap()?;
    }
    // 100 initial values + 20 batches of 50.
    assert_eq!(writer.len(), 1100);
    Ok(())
}
#[test]
#[ignore]
fn test_realworld_stress() -> Result<()> {
    // Long-running (hence #[ignore]) end-to-end scenario: three vecs
    // ("metrics") grow in lockstep through block-style commits with a
    // flush per block, while four reader threads cross-check values and
    // relative lengths the whole time. vec_a holds i, vec_b holds 2i,
    // vec_c holds 3i.
    use std::time::Instant;
    let (db, _temp) = setup_test_db()?;
    let version = Version::ONE;
    let mut vec_a: BytesVec<usize, u64> = BytesVec::forced_import(&db, "metric_a", version)?;
    let mut vec_b: BytesVec<usize, u64> = BytesVec::forced_import(&db, "metric_b", version)?;
    let mut vec_c: BytesVec<usize, u64> = BytesVec::forced_import(&db, "metric_c", version)?;
    for i in 0..1000u64 {
        vec_a.push(i);
        vec_b.push(i * 2);
        vec_c.push(i * 3);
    }
    vec_a.write()?;
    vec_b.write()?;
    vec_c.write()?;
    db.flush()?;
    let reader_a = vec_a.read_only_clone();
    let reader_b = vec_b.read_only_clone();
    let reader_c = vec_c.read_only_clone();
    let stop = Arc::new(AtomicBool::new(false));
    let total_reads = Arc::new(AtomicUsize::new(0));
    let errors = Arc::new(AtomicUsize::new(0));
    let num_readers = 4;
    let reader_handles: Vec<_> = (0..num_readers)
        .map(|reader_id| {
            let r_a = reader_a.clone();
            let r_b = reader_b.clone();
            let r_c = reader_c.clone();
            let stop = stop.clone();
            let reads = total_reads.clone();
            let errs = errors.clone();
            thread::spawn(move || {
                // Error/read counts are accumulated locally and folded
                // into the shared atomics once, when the thread exits.
                let mut local_reads = 0u64;
                let mut local_errors = 0u64;
                while !stop.load(Ordering::Relaxed) {
                    let len_a = r_a.len();
                    let len_b = r_b.len();
                    let len_c = r_c.len();
                    let max_len = len_a.max(len_b).max(len_c);
                    let min_len = len_a.min(len_b).min(len_c);
                    // The three vecs are committed one after another, so
                    // small transient skew is expected; more than one
                    // block's worth (> 100) counts as an error.
                    if max_len - min_len > 100 {
                        eprintln!(
                            "Reader {}: Large length discrepancy: a={}, b={}, c={}",
                            reader_id, len_a, len_b, len_c
                        );
                        local_errors += 1;
                    }
                    if min_len > 0 {
                        let ra = r_a.reader();
                        let rb = r_b.reader();
                        let rc = r_c.reader();
                        if ra.get(0) != 0 {
                            local_errors += 1;
                        }
                        // Check the newest slot visible in ALL three
                        // vecs against the i / 2i / 3i pattern.
                        let safe_idx = min_len.saturating_sub(1);
                        let va = ra.get(safe_idx);
                        if va != safe_idx as u64 {
                            eprintln!(
                                "Reader {}: vec_a[{}] = {} (expected {})",
                                reader_id, safe_idx, va, safe_idx
                            );
                            local_errors += 1;
                        }
                        if rb.get(safe_idx) != (safe_idx as u64) * 2 {
                            local_errors += 1;
                        }
                        if rc.get(safe_idx) != (safe_idx as u64) * 3 {
                            local_errors += 1;
                        }
                        // One interior probe as well.
                        let mid_idx = min_len / 2;
                        if ra.get(mid_idx) != mid_idx as u64 {
                            local_errors += 1;
                        }
                        local_reads += 1;
                    }
                    thread::sleep(Duration::from_micros(100));
                }
                reads.fetch_add(local_reads as usize, Ordering::Relaxed);
                errs.fetch_add(local_errors as usize, Ordering::Relaxed);
            })
        })
        .collect();
    let start = Instant::now();
    let num_blocks = 100;
    let values_per_block = 50;
    let mut current_idx = 1000u64;
    println!(
        "Starting {} blocks of {} values each...",
        num_blocks, values_per_block
    );
    // Writer side: one write-per-vec plus a db flush per "block",
    // mimicking a real ingest loop.
    for block in 0..num_blocks {
        for _ in 0..values_per_block {
            vec_a.push(current_idx);
            vec_b.push(current_idx * 2);
            vec_c.push(current_idx * 3);
            current_idx += 1;
        }
        vec_a.write()?;
        vec_b.write()?;
        vec_c.write()?;
        db.flush()?;
        if block % 10 == 0 {
            println!("Processed block {}/{}", block + 1, num_blocks);
        }
        thread::sleep(Duration::from_millis(10));
    }
    let write_duration = start.elapsed();
    stop.store(true, Ordering::Relaxed);
    // Grace period so readers notice the stop flag before joining.
    thread::sleep(Duration::from_millis(50));
    for handle in reader_handles {
        handle.join().unwrap();
    }
    let final_reads = total_reads.load(Ordering::Relaxed);
    let final_errors = errors.load(Ordering::Relaxed);
    println!("\n=== Results ===");
    println!("Write duration: {:?}", write_duration);
    println!(
        "Final vec lengths: a={}, b={}, c={}",
        vec_a.len(),
        vec_b.len(),
        vec_c.len()
    );
    println!("Total reader verifications: {}", final_reads);
    println!("Errors detected: {}", final_errors);
    let expected_len = 1000 + (num_blocks * values_per_block) as usize;
    assert_eq!(vec_a.len(), expected_len);
    assert_eq!(vec_b.len(), expected_len);
    assert_eq!(vec_c.len(), expected_len);
    assert_eq!(final_errors, 0, "Data integrity errors detected!");
    assert!(
        final_reads > 100,
        "Should have completed many read verifications"
    );
    println!("Verifying final data integrity...");
    // Exhaustive post-run verification of every slot in all three vecs.
    let reader_a_ref = vec_a.create_reader();
    let reader_b_ref = vec_b.create_reader();
    let reader_c_ref = vec_c.create_reader();
    for i in 0..expected_len {
        let a = vec_a.read_at(i, &reader_a_ref)?;
        let b = vec_b.read_at(i, &reader_b_ref)?;
        let c = vec_c.read_at(i, &reader_c_ref)?;
        assert_eq!(a, i as u64, "vec_a[{}] incorrect", i);
        assert_eq!(b, (i as u64) * 2, "vec_b[{}] incorrect", i);
        assert_eq!(c, (i as u64) * 3, "vec_c[{}] incorrect", i);
    }
    println!("All {} values verified correctly!", expected_len);
    Ok(())
}
#[test]
#[ignore]
fn test_extended_stress() -> Result<()> {
    // Long-running (hence #[ignore]) five-second soak test: a reader
    // thread continuously verifies the newest committed element plus a
    // deterministic interior probe while the writer commits 100-value
    // batches, flushing every 10th batch. Uses PcoVec when the "pco"
    // feature is enabled, BytesVec otherwise.
    use std::time::Instant;
    let (db, _temp) = setup_test_db()?;
    let version = Version::ONE;
    #[cfg(feature = "pco")]
    let mut writer: PcoVec<usize, u64> = PcoVec::forced_import(&db, "stress_vec", version)?;
    #[cfg(not(feature = "pco"))]
    let mut writer: BytesVec<usize, u64> = BytesVec::forced_import(&db, "stress_vec", version)?;
    let stop = Arc::new(AtomicBool::new(false));
    let writes_completed = Arc::new(AtomicUsize::new(0));
    let reads_completed = Arc::new(AtomicUsize::new(0));
    let read_errors = Arc::new(AtomicUsize::new(0));
    let num_readers = 1;
    let reader_handles: Vec<_> = (0..num_readers)
        .map(|_| {
            let reader = writer.read_only_clone();
            let stop = stop.clone();
            let reads = reads_completed.clone();
            let errs = read_errors.clone();
            thread::spawn(move || {
                // Counters are accumulated locally and folded into the
                // shared atomics once, when the thread exits.
                let mut local_reads = 0usize;
                let mut local_errors = 0usize;
                while !stop.load(Ordering::Relaxed) {
                    let len = reader.len();
                    if len > 0 {
                        // Values are pushed as 0,1,2,..., so slot i must
                        // hold i. Check the newest committed element; a
                        // visible length with unreadable data is an error.
                        let idx = len - 1;
                        if let Some(v) = reader.collect_one(idx) {
                            if v != idx as u64 {
                                local_errors += 1;
                            }
                            local_reads += 1;
                        } else {
                            local_errors += 1;
                        }
                        // Deterministic interior probe. floor(len*7/11)
                        // is always < len, so no bounds guard is needed.
                        // A None here is tolerated (not counted as error).
                        let probe_idx = (len * 7) / 11;
                        if let Some(v) = reader.collect_one(probe_idx) {
                            if v != probe_idx as u64 {
                                local_errors += 1;
                            }
                            local_reads += 1;
                        }
                    }
                    thread::sleep(Duration::from_micros(100));
                }
                reads.fetch_add(local_reads, Ordering::Relaxed);
                errs.fetch_add(local_errors, Ordering::Relaxed);
            })
        })
        .collect();
    let start = Instant::now();
    let target_duration = Duration::from_secs(5);
    let mut current_idx = 0u64;
    let mut batches = 0usize;
    println!("Running stress test for {:?}...", target_duration);
    // Writer loop: fixed-size batches until the time budget runs out,
    // flushing the database every 10th batch.
    while start.elapsed() < target_duration {
        let batch_size = 100;
        for _ in 0..batch_size {
            writer.push(current_idx);
            current_idx += 1;
        }
        writer.write()?;
        writes_completed.fetch_add(1, Ordering::Relaxed);
        batches += 1;
        if batches.is_multiple_of(10) {
            println!("Written {} batches, {} values", batches, current_idx);
        }
        thread::sleep(Duration::from_millis(100));
        if batches.is_multiple_of(10) {
            db.flush()?;
        }
    }
    db.flush()?;
    let write_duration = start.elapsed();
    stop.store(true, Ordering::Relaxed);
    // Grace period so the reader notices the stop flag before joining.
    thread::sleep(Duration::from_millis(100));
    for handle in reader_handles {
        handle.join().unwrap();
    }
    let final_writes = writes_completed.load(Ordering::Relaxed);
    let final_reads = reads_completed.load(Ordering::Relaxed);
    let final_errors = read_errors.load(Ordering::Relaxed);
    println!("\n=== Extended Stress Test Results ===");
    println!("Duration: {:?}", write_duration);
    println!("Total values written: {}", current_idx);
    println!("Write batches: {}", final_writes);
    println!("Read verifications: {}", final_reads);
    println!("Errors: {}", final_errors);
    println!(
        "Reads per second: {:.0}",
        final_reads as f64 / write_duration.as_secs_f64()
    );
    assert_eq!(writer.len(), current_idx as usize);
    assert_eq!(final_errors, 0, "Data integrity errors detected!");
    assert!(final_reads > 1000, "Should have many read verifications");
    println!("Spot-checking final data...");
    // The 5-second loop always completes at least one batch, so
    // current_idx >= 100 and the `- 1` below cannot underflow.
    for i in [
        0,
        100,
        1000,
        current_idx as usize / 2,
        current_idx as usize - 1,
    ] {
        if i < writer.len() {
            let v = writer.collect_one(i).unwrap();
            assert_eq!(v, i as u64, "Value at {} incorrect", i);
        }
    }
    println!("Spot-check passed!");
    Ok(())
}
#[test]
#[ignore]
fn test_extended_stress_bytes() -> Result<()> {
    // Long-running (hence #[ignore]) BytesVec-specific soak test: eight
    // reader threads spin WITHOUT sleeping (maximizing read pressure)
    // while the writer commits variable-size batches for five seconds,
    // flushing every 10th batch.
    use std::time::Instant;
    let (db, _temp) = setup_test_db()?;
    let version = Version::ONE;
    let mut writer: BytesVec<usize, u64> =
        BytesVec::forced_import(&db, "stress_vec_bytes", version)?;
    let stop = Arc::new(AtomicBool::new(false));
    let writes_completed = Arc::new(AtomicUsize::new(0));
    let reads_completed = Arc::new(AtomicUsize::new(0));
    let read_errors = Arc::new(AtomicUsize::new(0));
    let num_readers = 8;
    let reader_handles: Vec<_> = (0..num_readers)
        .map(|_| {
            let reader = writer.read_only_clone();
            let stop = stop.clone();
            let reads = reads_completed.clone();
            let errs = read_errors.clone();
            thread::spawn(move || {
                // Counters are accumulated locally and folded into the
                // shared atomics once, when the thread exits.
                let mut local_reads = 0usize;
                let mut local_errors = 0usize;
                // Tight loop on purpose — no sleep between iterations.
                while !stop.load(Ordering::Relaxed) {
                    let r = reader.reader();
                    let len = r.len();
                    if len > 0 {
                        // Values are pushed as 0,1,2,..., so slot i must
                        // hold i. Check the newest visible element...
                        let idx = len - 1;
                        let v = r.get(idx);
                        if v != idx as u64 {
                            local_errors += 1;
                        }
                        local_reads += 1;
                        // ...and a deterministic interior probe.
                        let random_idx = (len * 7) / 11;
                        if random_idx < len {
                            let v = r.get(random_idx);
                            if v != random_idx as u64 {
                                local_errors += 1;
                            }
                            local_reads += 1;
                        }
                    }
                }
                reads.fetch_add(local_reads, Ordering::Relaxed);
                errs.fetch_add(local_errors, Ordering::Relaxed);
            })
        })
        .collect();
    let start = Instant::now();
    let target_duration = Duration::from_secs(5);
    let mut current_idx = 0u64;
    let mut batches = 0usize;
    println!("Running BytesVec stress test for {:?}...", target_duration);
    // Writer loop: batch size varies (10..60) per batch to vary the
    // commit granularity; database flush every 10th batch.
    while start.elapsed() < target_duration {
        let batch_size = 10 + (batches % 50);
        for _ in 0..batch_size {
            writer.push(current_idx);
            current_idx += 1;
        }
        writer.write()?;
        writes_completed.fetch_add(1, Ordering::Relaxed);
        batches += 1;
        if batches.is_multiple_of(10) {
            db.flush()?;
        }
    }
    db.flush()?;
    let write_duration = start.elapsed();
    stop.store(true, Ordering::Relaxed);
    // Grace period so readers notice the stop flag before joining.
    thread::sleep(Duration::from_millis(100));
    for handle in reader_handles {
        handle.join().unwrap();
    }
    let final_writes = writes_completed.load(Ordering::Relaxed);
    let final_reads = reads_completed.load(Ordering::Relaxed);
    let final_errors = read_errors.load(Ordering::Relaxed);
    println!("\n=== BytesVec Extended Stress Test Results ===");
    println!("Duration: {:?}", write_duration);
    println!("Total values written: {}", current_idx);
    println!("Write batches: {}", final_writes);
    println!("Read verifications: {}", final_reads);
    println!("Errors: {}", final_errors);
    println!(
        "Reads per second: {:.0}",
        final_reads as f64 / write_duration.as_secs_f64()
    );
    assert_eq!(writer.len(), current_idx as usize);
    assert_eq!(final_errors, 0, "Data integrity errors detected!");
    assert!(
        final_reads > 10000,
        "Should have many read verifications with tight loop"
    );
    println!("Spot-checking final data...");
    // NOTE(review): assumes at least one batch completed in 5s so
    // `current_idx as usize - 1` cannot underflow — holds in practice.
    let reader_ref = writer.create_reader();
    for i in [
        0,
        100,
        1000,
        current_idx as usize / 2,
        current_idx as usize - 1,
    ] {
        if i < writer.len() {
            let v = writer.read_at(i, &reader_ref)?;
            assert_eq!(v, i as u64, "Value at {} incorrect", i);
        }
    }
    println!("Spot-check passed!");
    Ok(())
}