use rawdb::Database;
use sha2::{Digest, Sha256};
use std::fs;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use tempfile::TempDir;
use vecdb::{
AnyStoredVec, ImportOptions, ImportableVec, RawStrategy, ReadWriteRawVec, ReadableVec, Reader,
Result, Stamp, StoredVec, Version, WritableVec,
};
pub trait IntegrityVec: StoredVec<I = usize, T = u32> + DerefMut + ImportableVec + Sized
where
Self::Target: IntegrityOps,
{
fn import_with_changes<'a>(
db: &'a Database,
name: &'a str,
changes: u16,
) -> Result<(Self, ImportOptions<'a>)>;
}
pub trait IntegrityOps {
fn update(&mut self, index: usize, value: u32) -> Result<()>;
fn take(&mut self, index: usize) -> Result<Option<u32>>;
fn stamped_write_with_changes(&mut self, stamp: Stamp) -> Result<()>;
fn stamp(&self) -> Stamp;
fn collect(&self) -> Vec<u32>;
fn collect_holed(&self) -> Result<Vec<Option<u32>>>;
fn get_any_or_read(&self, index: usize, reader: &Reader) -> Result<Option<u32>>;
fn create_reader(&self) -> Reader;
}
impl<S> IntegrityOps for ReadWriteRawVec<usize, u32, S>
where
S: RawStrategy<u32>,
{
fn update(&mut self, index: usize, value: u32) -> Result<()> {
ReadWriteRawVec::update(self, index, value)
}
fn take(&mut self, index: usize) -> Result<Option<u32>> {
let reader = self.create_reader();
let result = ReadWriteRawVec::take(self, index, &reader);
drop(reader);
result
}
fn stamped_write_with_changes(&mut self, stamp: Stamp) -> Result<()> {
WritableVec::stamped_write_with_changes(self, stamp)
}
fn stamp(&self) -> Stamp {
AnyStoredVec::stamp(self)
}
fn collect(&self) -> Vec<u32> {
ReadableVec::collect(self)
}
fn collect_holed(&self) -> Result<Vec<Option<u32>>> {
ReadWriteRawVec::collect_holed(self)
}
fn get_any_or_read(&self, index: usize, reader: &Reader) -> Result<Option<u32>> {
ReadWriteRawVec::get_any_or_read(self, index, reader)
}
fn create_reader(&self) -> Reader {
ReadWriteRawVec::create_reader(self)
}
}
pub fn setup_test_db() -> Result<(Database, TempDir)> {
let temp_dir = TempDir::new()?;
let db = Database::open(temp_dir.path())?;
Ok((db, temp_dir))
}
fn compute_directory_hash(dir: &Path) -> Result<String, Box<dyn std::error::Error>> {
let mut hasher = Sha256::new();
let mut files: Vec<PathBuf> = Vec::new();
let data_file = dir.join("data");
if data_file.exists() && data_file.is_file() {
files.push(data_file);
}
let regions_dir = dir.join("regions");
if regions_dir.exists() {
fn collect_files(dir: &Path, files: &mut Vec<PathBuf>) {
let Ok(entries) = fs::read_dir(dir) else {
return;
};
for entry in entries.filter_map(|e| e.ok()) {
let path = entry.path();
if path.components().any(|c| c.as_os_str() == "changes") {
continue;
}
if path.is_dir() {
collect_files(&path, files);
} else if path.is_file() {
files.push(path);
}
}
}
collect_files(®ions_dir, &mut files);
}
files.sort();
for file_path in &files {
if let Ok(rel_path) = file_path.strip_prefix(dir) {
hasher.update(rel_path.to_string_lossy().as_bytes());
}
let contents = fs::read(file_path)?;
hasher.update(&contents);
}
let hash = hasher.finalize();
Ok(hash.iter().map(|b| format!("{:02x}", b)).collect())
}
fn run_data_integrity_rollback_flush_reopen<V>() -> Result<(), Box<dyn std::error::Error>>
where
V: IntegrityVec + WritableVec<usize, u32>,
<V as Deref>::Target: IntegrityOps,
{
println!("=== Data Integrity Test: Rollback + Flush + Reopen ===\n");
println!("This test verifies:");
println!(" • Rollback + flush + reopen preserves data correctly");
println!(" • Data readable via both gets and iterators");
println!(" • File hashes track physical layout differences\n");
let (database, temp) = setup_test_db()?;
let test_path = temp.path();
let (mut vec, _options) = V::import_with_changes(&database, "vec", 10)?;
println!("✓ Created vecdb");
println!("\n--- Phase 1: Initial work ---");
for i in 0..5 {
vec.push(i);
}
vec.deref_mut().stamped_write_with_changes(Stamp::new(1))?;
println!("✓ Added values [0, 1, 2, 3, 4] and flushed (stamp 1)");
println!(" Current data: {:?}", vec.deref_mut().collect());
println!("\n--- Phase 2: More work ---");
for i in 5..10 {
vec.push(i);
}
vec.deref_mut().stamped_write_with_changes(Stamp::new(2))?;
println!("✓ Added values [5, 6, 7, 8, 9] and flushed (stamp 2)");
println!(" Current data: {:?}", vec.deref_mut().collect());
println!("\n--- Checkpoint 1 ---");
let checkpoint1_data = vec.deref_mut().collect_holed()?;
let checkpoint1_stamp = vec.deref_mut().stamp();
let checkpoint1_hash = compute_directory_hash(test_path)?;
println!("✓ Saved checkpoint1 at stamp {:?}", checkpoint1_stamp);
println!(" Data: {:?}", checkpoint1_data);
println!(" Length: {}", vec.len());
println!(" File hash: {}", checkpoint1_hash);
println!("\n--- Phase 3: Three more operations ---");
vec.deref_mut().update(2, 100)?;
vec.deref_mut().update(7, 200)?;
vec.deref_mut().stamped_write_with_changes(Stamp::new(3))?;
println!("✓ Operation 1: Updated index 2→100, 7→200 (stamp 3)");
println!(" Current data: {:?}", vec.deref_mut().collect());
vec.push(20);
vec.push(21);
vec.deref_mut().stamped_write_with_changes(Stamp::new(4))?;
println!("✓ Operation 2: Added values [20, 21] (stamp 4)");
println!(" Current data: {:?}", vec.deref_mut().collect());
vec.deref_mut().take(5)?;
vec.push(30);
vec.deref_mut().stamped_write_with_changes(Stamp::new(5))?;
println!("✓ Operation 3: Removed index 5, added value 30 (stamp 5)");
println!(" Current data: {:?}", vec.deref_mut().collect());
println!(" Data with holes: {:?}", vec.deref_mut().collect_holed()?);
println!("\n--- Checkpoint 2 ---");
let checkpoint2_data = vec.deref_mut().collect_holed()?;
let checkpoint2_stamp = vec.deref_mut().stamp();
let checkpoint2_hash = compute_directory_hash(test_path)?;
println!("✓ Saved checkpoint2 at stamp {:?}", checkpoint2_stamp);
println!(" Data: {:?}", checkpoint2_data);
println!(" Length: {}", vec.len());
println!(" File hash: {}", checkpoint2_hash);
println!("\n--- Phase 4: Undo last 3 operations ---");
vec.rollback()?;
println!("✓ Rollback 1: Now at stamp {:?}", vec.deref_mut().stamp());
println!(" Current data: {:?}", vec.deref_mut().collect());
vec.rollback()?;
println!("✓ Rollback 2: Now at stamp {:?}", vec.deref_mut().stamp());
println!(" Current data: {:?}", vec.deref_mut().collect());
vec.rollback()?;
println!("✓ Rollback 3: Now at stamp {:?}", vec.deref_mut().stamp());
println!(" Current data: {:?}", vec.deref_mut().collect());
println!("\n--- Verification 1: After undo (in-memory) ---");
let after_undo_data = vec.deref_mut().collect_holed()?;
let after_undo_stamp = vec.deref_mut().stamp();
println!("In-memory state after rollback:");
println!(
" Stamp: {:?} (expected: {:?})",
after_undo_stamp, checkpoint1_stamp
);
println!(" Data: {:?}", after_undo_data);
assert_eq!(
after_undo_stamp, checkpoint1_stamp,
"Stamp mismatch after undo!"
);
assert_eq!(
after_undo_data, checkpoint1_data,
"In-memory data mismatch after undo!"
);
println!("✓ PASS: In-memory data matches checkpoint1 after undo");
println!("\n--- Step 10: Flush, close, and reopen ---");
vec.deref_mut()
.stamped_write_with_changes(checkpoint1_stamp)?;
let after_flush_hash = compute_directory_hash(test_path)?;
println!("✓ Flushed to disk");
println!(" File hash: {}", after_flush_hash);
if after_flush_hash != checkpoint1_hash {
println!(
" Note: Hash differs from checkpoint1 ({}) due to region allocation bug",
checkpoint1_hash
);
}
drop(vec);
println!("✓ Closed vecdb");
let (mut vec, _options) = V::import_with_changes(&database, "vec", 10)?;
println!("✓ Reopened vecdb");
println!(" Stamp after reopen: {:?}", vec.deref_mut().stamp());
println!(" Length after reopen: {}", vec.len());
println!("\n--- Verification 2: After reopen (using gets) ---");
let reader = vec.deref_mut().create_reader();
let mut data_via_gets = Vec::new();
for i in 0..vec.len() {
let value = vec.deref_mut().get_any_or_read(i, &reader)?;
data_via_gets.push(value);
}
drop(reader);
println!("Data read via gets: {:?}", data_via_gets);
assert_eq!(
data_via_gets, checkpoint1_data,
"Data mismatch reading via gets after reopen!"
);
println!("✓ PASS: Data correct when reading via gets");
println!("\n--- Verification 3: After reopen (using iterator) ---");
let data_via_iter = vec.deref_mut().collect_holed()?;
println!("Data read via iterator: {:?}", data_via_iter);
assert_eq!(
data_via_iter, checkpoint1_data,
"Data mismatch reading via iterator after reopen!"
);
println!("✓ PASS: Data correct when reading via iterator");
let data_via_clean_iter: Vec<u32> = vec.deref_mut().collect();
let expected_clean: Vec<u32> = checkpoint1_data.iter().filter_map(|x| *x).collect();
println!("Data via clean iterator: {:?}", data_via_clean_iter);
assert_eq!(
data_via_clean_iter, expected_clean,
"Data mismatch via clean iterator!"
);
println!("✓ PASS: Data correct when reading via clean iterator");
println!("\n✓ ALL VERIFICATION PASSED: Rollback + flush + reopen preserved data correctly!");
println!("\n--- Phase 5: Redo same 3 operations ---");
vec.deref_mut().update(2, 100)?;
vec.deref_mut().update(7, 200)?;
vec.deref_mut().stamped_write_with_changes(Stamp::new(3))?;
println!("✓ Redo operation 1: Updated index 2→100, 7→200 (stamp 3)");
println!(" Current data: {:?}", vec.deref_mut().collect());
vec.push(20);
vec.push(21);
vec.deref_mut().stamped_write_with_changes(Stamp::new(4))?;
println!("✓ Redo operation 2: Added values [20, 21] (stamp 4)");
println!(" Current data: {:?}", vec.deref_mut().collect());
vec.deref_mut().take(5)?;
vec.push(30);
vec.deref_mut().stamped_write_with_changes(Stamp::new(5))?;
println!("✓ Redo operation 3: Removed index 5, added value 30 (stamp 5)");
println!(" Current data: {:?}", vec.deref_mut().collect());
println!(" Data with holes: {:?}", vec.deref_mut().collect_holed()?);
println!("\n--- Verification 4: After redo (in-memory) ---");
let after_redo_data = vec.deref_mut().collect_holed()?;
let after_redo_stamp = vec.deref_mut().stamp();
println!("In-memory state after redo:");
println!(
" Stamp: {:?} (expected: {:?})",
after_redo_stamp, checkpoint2_stamp
);
println!(" Data: {:?}", after_redo_data);
assert_eq!(
after_redo_stamp, checkpoint2_stamp,
"Stamp mismatch after redo!"
);
assert_eq!(
after_redo_data, checkpoint2_data,
"In-memory data mismatch after redo!"
);
println!("✓ PASS: In-memory data matches checkpoint2 after redo");
println!("\n--- Step 12: Flush, close, and reopen (after redo) ---");
vec.deref_mut()
.stamped_write_with_changes(checkpoint2_stamp)?;
let after_redo_flush_hash = compute_directory_hash(test_path)?;
println!("✓ Flushed to disk");
println!(" File hash: {}", after_redo_flush_hash);
if after_redo_flush_hash == checkpoint2_hash {
println!(" ✓ Hash matches checkpoint2 (operations were deterministic)");
} else {
println!(
" Note: Hash differs from checkpoint2 ({})",
checkpoint2_hash
);
}
drop(vec);
println!("✓ Closed vecdb");
let (mut vec, _options) = V::import_with_changes(&database, "vec", 10)?;
println!("✓ Reopened vecdb");
println!(" Stamp after reopen: {:?}", vec.deref_mut().stamp());
println!(" Length after reopen: {}", vec.len());
println!("\n--- Verification 5: After reopen (using gets) ---");
let reader = vec.deref_mut().create_reader();
let mut data_via_gets = Vec::new();
for i in 0..vec.len() {
let value = vec.deref_mut().get_any_or_read(i, &reader)?;
data_via_gets.push(value);
}
drop(reader);
println!("Data read via gets: {:?}", data_via_gets);
assert_eq!(
data_via_gets, checkpoint2_data,
"Data mismatch reading via gets after reopen!"
);
println!("✓ PASS: Data correct when reading via gets");
println!("\n--- Verification 6: After reopen (using iterator) ---");
let data_via_iter = vec.deref_mut().collect_holed()?;
println!("Data read via iterator: {:?}", data_via_iter);
assert_eq!(
data_via_iter, checkpoint2_data,
"Data mismatch reading via iterator after reopen!"
);
println!("✓ PASS: Data correct when reading via iterator");
let data_via_clean_iter: Vec<u32> = vec.deref_mut().collect();
let expected_clean: Vec<u32> = checkpoint2_data.iter().filter_map(|x| *x).collect();
println!("Data via clean iterator: {:?}", data_via_clean_iter);
assert_eq!(
data_via_clean_iter, expected_clean,
"Data mismatch via clean iterator!"
);
println!("✓ PASS: Data correct when reading via clean iterator");
println!("\n✓ ALL VERIFICATION PASSED: Redo + flush + reopen preserved data correctly!");
println!("\n=== Test Results ===");
println!("\nData integrity:");
println!("✓ Rollback + flush + reopen: Data correctly preserved");
println!("✓ Redo + flush + reopen: Data correctly preserved");
println!("✓ Gets work correctly after reopen");
println!("✓ Iterators (holed and clean) work correctly after reopen");
println!("\nFile layout:");
println!(" Checkpoint1 hash: {}", checkpoint1_hash);
println!(" After rollback: {}", after_flush_hash);
if after_flush_hash != checkpoint1_hash {
println!(" ⚠ Hashes differ due to region allocation bug (regions not rolled back)");
}
println!(" Checkpoint2 hash: {}", checkpoint2_hash);
println!(" After redo: {}", after_redo_flush_hash);
if after_redo_flush_hash == checkpoint2_hash {
println!(" ✓ Hashes match (redo is deterministic)");
}
println!("\n✓ ALL TESTS PASSED - Data integrity maintained across rollback/redo cycle!");
Ok(())
}
mod bytes {
use super::*;
use vecdb::BytesVec;
impl IntegrityVec for BytesVec<usize, u32> {
fn import_with_changes<'a>(
db: &'a Database,
name: &'a str,
changes: u16,
) -> Result<(Self, ImportOptions<'a>)> {
let mut options: ImportOptions = (db, name, Version::TWO).into();
options = options.with_saved_stamped_changes(changes);
let vec = Self::forced_import_with(options)?;
Ok((vec, options))
}
}
#[test]
fn data_integrity_rollback_flush_reopen() -> Result<(), Box<dyn std::error::Error>> {
run_data_integrity_rollback_flush_reopen::<BytesVec<usize, u32>>()
}
}
#[cfg(feature = "zerocopy")]
mod zerocopy {
use super::*;
use vecdb::ZeroCopyVec;
impl IntegrityVec for ZeroCopyVec<usize, u32> {
fn import_with_changes<'a>(
db: &'a Database,
name: &'a str,
changes: u16,
) -> Result<(Self, ImportOptions<'a>)> {
let mut options: ImportOptions = (db, name, Version::TWO).into();
options = options.with_saved_stamped_changes(changes);
let vec = Self::forced_import_with(options)?;
Ok((vec, options))
}
}
#[test]
fn data_integrity_rollback_flush_reopen() -> Result<(), Box<dyn std::error::Error>> {
run_data_integrity_rollback_flush_reopen::<ZeroCopyVec<usize, u32>>()
}
}