use super::compaction::{punch_hole, CompactionContext};
use super::sharded_index::ShardedIndex;
use super::traits::VectorStorage;
use super::MmapStorage;
use memmap2::MmapMut;
use parking_lot::RwLock;
use std::fs::OpenOptions;
use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use tempfile::tempdir;
/// Punching a 512-byte hole at offset 512 of a 2048-byte file must zero
/// exactly that window and leave the bytes on either side intact.
#[test]
fn test_punch_hole_zeros_target_region() {
    let dir = tempdir().expect("Failed to create temp dir");
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(dir.path().join("punch.dat"))
        .expect("Failed to open file");

    // Fill the file with a recognisable non-zero pattern.
    let mut sink = file.try_clone().expect("clone failed");
    sink.write_all(&[0xABu8; 2048]).expect("write failed");
    sink.flush().expect("flush failed");

    let outcome = punch_hole(&file, 512, 512);
    assert!(outcome.is_ok(), "punch_hole should succeed");

    // Read the whole file back from the beginning.
    let mut contents = vec![0u8; 2048];
    let mut source = file.try_clone().expect("clone failed");
    source.seek(SeekFrom::Start(0)).expect("seek failed");
    source.read_exact(&mut contents).expect("read failed");

    // Split into the three regions: [0, 512), [512, 1024), [1024, 2048).
    let (before, rest) = contents.split_at(512);
    let (hole, after) = rest.split_at(512);
    assert!(
        before.iter().all(|byte| *byte == 0xAB),
        "Bytes before hole should be untouched"
    );
    assert!(
        hole.iter().all(|byte| *byte == 0),
        "Hole region should be zeroed"
    );
    assert!(
        after.iter().all(|byte| *byte == 0xAB),
        "Bytes after hole should be untouched"
    );
}
/// A hole starting at offset 0 zeroes the first 256 bytes and leaves the
/// rest of the 1024-byte file untouched.
#[test]
fn test_punch_hole_at_file_start() {
    let dir = tempdir().expect("Failed to create temp dir");
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(true);
    let file = options
        .open(dir.path().join("punch_start.dat"))
        .expect("open failed");

    // Seed the file with 0xFF so zeroed bytes are easy to tell apart.
    let mut sink = file.try_clone().expect("clone failed");
    sink.write_all(&[0xFF; 1024]).expect("write failed");
    sink.flush().expect("flush failed");

    let result = punch_hole(&file, 0, 256);
    assert!(result.is_ok());

    let mut contents = vec![0u8; 1024];
    let mut source = file.try_clone().expect("clone failed");
    source.seek(SeekFrom::Start(0)).expect("seek failed");
    source.read_exact(&mut contents).expect("read failed");

    let (punched, remainder) = contents.split_at(256);
    assert!(punched.iter().all(|byte| *byte == 0), "Start should be zeroed");
    assert!(
        remainder.iter().all(|byte| *byte == 0xFF),
        "Remainder should be untouched"
    );
}
/// A zero-length punch must succeed and leave every byte of the file as it
/// was written.
#[test]
fn test_punch_hole_zero_length_is_noop() {
    let dir = tempdir().expect("Failed to create temp dir");
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(true);
    let file = options
        .open(dir.path().join("punch_zero.dat"))
        .expect("open failed");

    let mut sink = file.try_clone().expect("clone failed");
    sink.write_all(&[0xCC; 512]).expect("write failed");
    sink.flush().expect("flush failed");

    // Length 0 at a non-zero offset: nothing should change.
    let result = punch_hole(&file, 128, 0);
    assert!(result.is_ok());

    let mut contents = vec![0u8; 512];
    let mut source = file.try_clone().expect("clone failed");
    source.seek(SeekFrom::Start(0)).expect("seek failed");
    source.read_exact(&mut contents).expect("read failed");

    let any_changed = contents.iter().any(|byte| *byte != 0xCC);
    assert!(!any_changed, "Zero-length punch should not modify data");
}
/// Creates an `MmapStorage` in `dir`, stores one vector per entry of `ids`,
/// flushes, and returns the storage.
///
/// Component `d` of the vector stored for `id` is `id as f32 + d as f32`, so
/// tests can recompute the expected contents from the id alone.
///
/// Panics (via `expect`) if creating, storing, or flushing fails.
#[allow(clippy::cast_precision_loss)]
fn storage_with_vectors(dir: &std::path::Path, dimension: usize, ids: &[u64]) -> MmapStorage {
    let mut storage = MmapStorage::new(dir, dimension).expect("create storage");
    for id in ids.iter().copied() {
        let mut vector: Vec<f32> = Vec::with_capacity(dimension);
        vector.extend((0..dimension).map(|d| id as f32 + d as f32));
        storage.store(id, &vector).expect("store vector");
    }
    storage.flush().expect("flush");
    storage
}
/// Deleting three of five vectors and compacting must report three vectors'
/// worth of reclaimed bytes, keep the survivors' data, and drop the rest.
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_compact_reclaims_deleted_vectors() {
    let dim = 4;
    let bytes_per_vector = dim * std::mem::size_of::<f32>();
    let dir = tempdir().expect("tempdir");
    let mut storage = storage_with_vectors(dir.path(), dim, &[1, 2, 3, 4, 5]);

    for doomed in [1u64, 3, 5] {
        storage.delete(doomed).expect("delete");
    }

    let reclaimed = storage.compact().expect("compact");
    assert_eq!(reclaimed, 3 * bytes_per_vector);

    // Survivors must still hold the values written by `storage_with_vectors`.
    for id in [2u64, 4] {
        let expected: Vec<f32> = (0..dim).map(|d| id as f32 + d as f32).collect();
        let v = storage.retrieve(id).expect("retrieve").expect("exists");
        assert_eq!(v, expected, "Vector {id} data mismatch after compaction");
    }

    // Deleted ids must stay gone.
    for id in [1u64, 3, 5] {
        let lookup = storage.retrieve(id).expect("retrieve");
        assert!(lookup.is_none(), "Vector {id} should be absent");
    }
}
/// When every stored vector has been deleted, `compact` is expected to
/// report zero reclaimed bytes.
#[test]
fn test_compact_all_deleted_returns_zero() {
    let dir = tempdir().expect("tempdir");
    let mut storage = storage_with_vectors(dir.path(), 4, &[10, 20]);
    for id in [10, 20] {
        storage.delete(id).expect("delete");
    }
    assert_eq!(storage.compact().expect("compact"), 0);
}
/// Compacting a storage that has had nothing deleted reclaims nothing.
#[test]
fn test_compact_no_deletions_returns_zero() {
    let dir = tempdir().expect("tempdir");
    let mut storage = storage_with_vectors(dir.path(), 4, &[1, 2, 3]);
    assert_eq!(
        storage.compact().expect("compact"),
        0,
        "No fragmentation means nothing to reclaim"
    );
}
/// Data that survives a compaction must still be readable, with the right
/// count, after the storage is dropped and reopened from the same directory.
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_compact_preserves_data_after_reopen() {
    let dir = tempdir().expect("tempdir");
    let dim = 3;

    let mut first = storage_with_vectors(dir.path(), dim, &[1, 2, 3, 4]);
    for id in [2, 4] {
        first.delete(id).expect("delete");
    }
    first.compact().expect("compact");
    first.flush().expect("flush");
    // Drop the first handle before reopening the same directory.
    drop(first);

    let storage = MmapStorage::new(dir.path(), dim).expect("reopen");
    for id in [1u64, 3] {
        let expected: Vec<f32> = (0..dim).map(|d| id as f32 + d as f32).collect();
        let v = storage.retrieve(id).expect("retrieve").expect("exists");
        assert_eq!(v, expected, "Vector {id} mismatch after reopen");
    }
    assert_eq!(storage.len(), 2);
}
/// After compaction the storage must still accept new vectors and count
/// them alongside the survivors.
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_compact_then_insert_works() {
    let dim = 4;
    let dir = tempdir().expect("tempdir");
    let mut storage = storage_with_vectors(dir.path(), dim, &[1, 2, 3]);

    storage.delete(2).expect("delete");
    storage.compact().expect("compact");

    let new_vec: Vec<f32> = std::iter::repeat(99.0).take(dim).collect();
    storage.store(100, &new_vec).expect("store after compact");

    let retrieved = storage.retrieve(100).expect("retrieve").expect("exists");
    assert_eq!(retrieved, new_vec);
    // Two survivors (ids 1 and 3) plus the newly stored id 100.
    assert_eq!(storage.len(), 3);
}
/// A freshly created storage with no vectors reports (effectively) zero
/// fragmentation.
#[test]
fn test_fragmentation_ratio_empty_storage() {
    let dir = tempdir().expect("tempdir");
    let storage = MmapStorage::new(dir.path(), 4).expect("create");
    let ratio = storage.fragmentation_ratio();
    let is_effectively_zero = f64::EPSILON > ratio;
    assert!(
        is_effectively_zero,
        "Empty storage should have zero fragmentation, got {ratio}"
    );
}
/// Checks `fragmentation_ratio` against known values as vectors are deleted
/// one by one from a four-vector storage: ~0%, ~25%, ~50%, then ~75%.
///
/// The previous `#[allow(clippy::cast_precision_loss)]` was dropped because
/// this body performs no numeric casts.
#[test]
fn test_fragmentation_ratio_known_values() {
    let dir = tempdir().expect("tempdir");
    let dim = 4;
    let mut storage = storage_with_vectors(dir.path(), dim, &[1, 2, 3, 4]);

    // Nothing deleted yet.
    let ratio = storage.fragmentation_ratio();
    assert!(ratio < 0.01, "No deletions: expected ~0%, got {ratio}");

    // One of four deleted.
    storage.delete(1).expect("delete");
    let ratio = storage.fragmentation_ratio();
    assert!(
        (0.20..0.30).contains(&ratio),
        "Delete 1/4: expected ~25%, got {ratio}"
    );

    // Two of four deleted.
    storage.delete(2).expect("delete");
    let ratio = storage.fragmentation_ratio();
    assert!(
        (0.45..0.55).contains(&ratio),
        "Delete 2/4: expected ~50%, got {ratio}"
    );

    // Three of four deleted.
    storage.delete(3).expect("delete");
    let ratio = storage.fragmentation_ratio();
    assert!(
        (0.70..0.80).contains(&ratio),
        "Delete 3/4: expected ~75%, got {ratio}"
    );
}
/// Compaction must bring the fragmentation ratio back to approximately zero.
#[test]
fn test_fragmentation_ratio_zero_after_compact() {
    let dim = 4;
    let dir = tempdir().expect("tempdir");
    let mut storage = storage_with_vectors(dir.path(), dim, &[1, 2, 3, 4]);

    for id in [1, 3] {
        storage.delete(id).expect("delete");
    }
    storage.compact().expect("compact");

    let ratio = storage.fragmentation_ratio();
    let is_near_zero = ratio < 0.01;
    assert!(
        is_near_zero,
        "After compaction, fragmentation should be ~0%, got {ratio}"
    );
}
/// After a successful compaction `vectors.dat` must still exist,
/// `vectors.dat.tmp` must not, and the surviving vectors must be readable.
#[test]
fn test_original_file_survives_if_temp_file_is_removed() {
    let dir = tempdir().expect("tempdir");
    let data_path = dir.path().join("vectors.dat");
    let temp_path = dir.path().join("vectors.dat.tmp");
    let dim = 4;

    let mut storage = storage_with_vectors(dir.path(), dim, &[1, 2, 3]);
    storage.delete(1).expect("delete");
    storage.flush().expect("flush");
    assert!(data_path.exists(), "Original data file should exist");

    storage.compact().expect("compact");

    assert!(data_path.exists(), "Data file must exist after compaction");
    let temp_left_behind = temp_path.exists();
    assert!(
        !temp_left_behind,
        "Temp file should be removed after successful compaction"
    );

    assert_eq!(storage.len(), 2);
    assert!(storage.retrieve(2).expect("retrieve").is_some());
    assert!(storage.retrieve(3).expect("retrieve").is_some());
}
/// A leftover `vectors.dat.tmp` must neither prevent compaction nor remain
/// on disk once compaction has finished.
#[test]
fn test_stale_temp_file_does_not_block_compaction() {
    let dim = 4;
    let dir = tempdir().expect("tempdir");
    let mut storage = storage_with_vectors(dir.path(), dim, &[1, 2, 3, 4]);

    // Plant a junk temp file where compaction would write its own.
    let temp_path = dir.path().join("vectors.dat.tmp");
    std::fs::write(&temp_path, b"stale leftover").expect("write stale");

    storage.delete(1).expect("delete");
    let reclaimed = storage.compact().expect("compact should succeed");

    assert!(reclaimed > 0, "Should have reclaimed space");
    let stale_remains = temp_path.exists();
    assert!(!stale_remains, "Stale temp file should be cleaned up");
}
/// A successful compaction must not leave a `vectors.dat.bak` file in the
/// storage directory.
///
/// The previous `#[allow(clippy::cast_precision_loss)]` was dropped because
/// this body performs no numeric casts.
#[test]
fn test_backup_file_cleaned_after_successful_compaction() {
    let dir = tempdir().expect("tempdir");
    let dim = 4;
    let mut storage = storage_with_vectors(dir.path(), dim, &[1, 2, 3, 4]);
    storage.delete(1).expect("delete");
    storage.delete(2).expect("delete");
    storage.compact().expect("compact");

    let backup_path = dir.path().join("vectors.dat.bak");
    assert!(
        !backup_path.exists(),
        "Backup file should be removed after successful compaction"
    );
}
/// The pieces `build_context_parts` returns, in order: the id-to-offset
/// index, the lock-wrapped mutable memory map of `vectors.dat`, the next
/// write offset, and the lock-wrapped buffered writer for `vectors.wal`.
type ContextParts = (
ShardedIndex,
RwLock<MmapMut>,
AtomicUsize,
RwLock<BufWriter<std::fs::File>>,
);
/// Lays out `entries` back to back in a fresh `vectors.dat` under `dir` and
/// returns the parts needed to build a `CompactionContext` by hand.
///
/// * `dir` - directory to create (if missing) and populate.
/// * `dimension` - number of `f32` components per vector; each entry's
///   vector must encode to exactly `dimension * 4` bytes or the copy panics.
/// * `entries` - `(id, vector)` pairs, written in order starting at offset 0.
///
/// The data file is sized to the payload, with a 4096-byte minimum. The
/// returned atomic holds the offset just past the last vector written.
///
/// # Errors
/// Propagates any I/O error from creating the directory, opening or sizing
/// the files, mapping, or flushing.
fn build_context_parts(
    dir: &std::path::Path,
    dimension: usize,
    entries: &[(u64, Vec<f32>)],
) -> io::Result<ContextParts> {
    std::fs::create_dir_all(dir)?;

    let vector_size = dimension * std::mem::size_of::<f32>();
    let payload_len = entries.len() * vector_size;
    let file_len = u64::try_from(payload_len.max(4096)).unwrap_or(4096);

    let data_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(dir.join("vectors.dat"))?;
    data_file.set_len(file_len)?;
    // SAFETY: the file was just created/truncated and sized by this function.
    // Soundness relies on nothing else truncating or modifying it while the
    // mapping is alive, which is assumed here for a test fixture directory.
    let mut mmap = unsafe { MmapMut::map_mut(&data_file)? };

    let index = ShardedIndex::new();
    let mut cursor = 0usize;
    for (id, vector) in entries {
        let end = cursor + vector_size;
        let bytes = crate::storage::vector_bytes::vector_to_bytes(vector);
        mmap[cursor..end].copy_from_slice(bytes);
        index.insert(*id, cursor);
        cursor = end;
    }
    mmap.flush()?;

    let wal_file = OpenOptions::new()
        .append(true)
        .create(true)
        .open(dir.join("vectors.wal"))?;

    let parts: ContextParts = (
        index,
        RwLock::new(mmap),
        AtomicUsize::new(cursor),
        RwLock::new(BufWriter::new(wal_file)),
    );
    Ok(parts)
}
/// Drives `CompactionContext::compact` directly: after removing two of four
/// index entries, compaction must reclaim two vectors' worth of bytes, keep
/// distinct offsets for the survivors, and set `next_offset` to the end of
/// the two remaining vectors.
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_context_compact_updates_index_offsets() {
    let dir = tempdir().expect("tempdir");
    let dim = 3;
    let vector_size = dim * std::mem::size_of::<f32>();

    let mut entries: Vec<(u64, Vec<f32>)> = Vec::with_capacity(4);
    for id in 1..=4u64 {
        let components: Vec<f32> = (0..dim).map(|d| id as f32 + d as f32).collect();
        entries.push((id, components));
    }

    let (index, mmap, next_offset, wal) =
        build_context_parts(dir.path(), dim, &entries).expect("build");

    // Dropping ids from the index is how this test marks them as deleted.
    index.remove(1);
    index.remove(3);

    let reclaimed = CompactionContext {
        path: dir.path(),
        dimension: dim,
        index: &index,
        mmap: &mmap,
        next_offset: &next_offset,
        wal: &wal,
        initial_size: 4096,
    }
    .compact()
    .expect("compact");
    assert_eq!(reclaimed, 2 * vector_size);

    let offset_2 = index.get(2).expect("id 2 should exist");
    let offset_4 = index.get(4).expect("id 4 should exist");
    assert_ne!(offset_2, offset_4, "Offsets should be distinct");

    let expected_next = vector_size * 2;
    assert_eq!(next_offset.load(Ordering::Relaxed), expected_next);
}
/// With 3 of 10 equally sized vectors removed from the index, the context's
/// fragmentation ratio must equal `1 - 7/10` to within `1e-10`.
#[test]
fn test_context_fragmentation_ratio_precise() {
    let dir = tempdir().expect("tempdir");
    let dim = 4;
    let vector_size = dim * std::mem::size_of::<f32>();

    let mut entries: Vec<(u64, Vec<f32>)> = Vec::with_capacity(10);
    for id in 1..=10 {
        entries.push((id, vec![0.0f32; dim]));
    }

    let (index, mmap, next_offset, wal) =
        build_context_parts(dir.path(), dim, &entries).expect("build");
    index.remove(2);
    index.remove(5);
    index.remove(8);

    let ratio = CompactionContext {
        path: dir.path(),
        dimension: dim,
        index: &index,
        mmap: &mmap,
        next_offset: &next_offset,
        wal: &wal,
        initial_size: 4096,
    }
    .fragmentation_ratio();

    // Seven live vectors out of ten written.
    let live_bytes = 7 * vector_size;
    let written_bytes = 10 * vector_size;
    #[allow(clippy::cast_precision_loss)]
    let expected = 1.0 - (live_bytes as f64 / written_bytes as f64);

    let deviation = (ratio - expected).abs();
    assert!(deviation < 1e-10, "Expected ratio {expected}, got {ratio}");
}
/// Four reader threads repeatedly retrieve the surviving (odd) ids while the
/// main thread compacts away the deleted (even) ids.
///
/// All access goes through one `parking_lot::Mutex`, so readers only ever see
/// the storage fully before or fully after compaction. Odd ids are never
/// deleted, so every reader lookup must succeed. Readers therefore assert on
/// the result; previously `Err` and `None` were silently skipped, which let
/// a failed or missing read go unnoticed.
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_concurrent_reads_during_compaction() {
    use std::sync::Arc;
    use std::thread;

    let dir = tempdir().expect("tempdir");
    let dim = 4;
    let mut storage = storage_with_vectors(dir.path(), dim, &(1..=20).collect::<Vec<_>>());
    for id in (1..=20).filter(|x| x % 2 == 0) {
        storage.delete(id).expect("delete");
    }

    let storage = Arc::new(parking_lot::Mutex::new(storage));
    // 4 readers + the compacting main thread all start together.
    let barrier = Arc::new(std::sync::Barrier::new(5));
    let mut handles = Vec::new();
    for _ in 0..4 {
        let s = Arc::clone(&storage);
        let b = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            b.wait();
            for _ in 0..50 {
                let guard = s.lock();
                for id in (1..=20).filter(|x| x % 2 != 0) {
                    let v = guard
                        .retrieve(id)
                        .expect("retrieve must not fail while the lock is held")
                        .unwrap_or_else(|| panic!("vector {id} missing in reader thread"));
                    assert_eq!(v.len(), dim, "vector dimension mismatch for id={id}");
                }
                // Release the lock before yielding so compaction can get in.
                drop(guard);
                thread::yield_now();
            }
        }));
    }

    barrier.wait();
    {
        let mut guard = storage.lock();
        let reclaimed = guard.compact().expect("compact should succeed");
        assert!(reclaimed > 0, "should reclaim space from deleted vectors");
    }

    for h in handles {
        h.join().expect("reader thread panicked");
    }

    // Final state: every surviving vector still holds its original data.
    let guard = storage.lock();
    for id in (1..=20).filter(|x| x % 2 != 0) {
        let v = guard.retrieve(id).expect("retrieve").expect("exists");
        let expected: Vec<f32> = (0..dim).map(|d| id as f32 + d as f32).collect();
        assert_eq!(
            v, expected,
            "vector {id} corrupted after concurrent compaction"
        );
    }
}
/// Reader threads polling `len()` while the main thread calls `replace_all`
/// must never see an empty index. Afterwards the index must hold exactly the
/// 50 replacement entries with their new offsets.
#[test]
#[allow(clippy::cast_possible_truncation)]
fn test_replace_all_readers_see_consistent_state() {
    use std::sync::Arc;
    use std::thread;

    let index = Arc::new(ShardedIndex::new());
    for id in 0..100u64 {
        index.insert(id, id as usize * 16);
    }

    // 4 readers + the main thread performing the replacement.
    let barrier = Arc::new(std::sync::Barrier::new(5));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let idx = Arc::clone(&index);
            let gate = Arc::clone(&barrier);
            thread::spawn(move || {
                gate.wait();
                for _ in 0..200 {
                    let len = idx.len();
                    assert!(len > 0, "reader observed empty index during replace_all");
                    thread::yield_now();
                }
            })
        })
        .collect();

    barrier.wait();
    let replacement: rustc_hash::FxHashMap<_, _> =
        (0..50u64).map(|id| (id, id as usize * 32)).collect();
    index.replace_all(replacement);

    for reader in handles {
        reader.join().expect("reader thread panicked");
    }

    assert_eq!(index.len(), 50);
    for id in 0..100u64 {
        if id < 50 {
            assert_eq!(
                index.get(id),
                Some(id as usize * 32),
                "entry {id} has wrong offset after replace_all"
            );
        } else {
            assert!(
                index.get(id).is_none(),
                "entry {id} should have been removed by replace_all"
            );
        }
    }
}
/// Stores 50 vectors, deletes every id divisible by 3, compacts, and checks
/// that each survivor's data is intact, each deleted id is gone, and the
/// stored count matches the number of survivors.
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_compact_roundtrip_data_integrity() {
    let dir = tempdir().expect("tempdir");
    let dim = 8;
    let ids: Vec<u64> = (1..=50).collect();
    let mut storage = storage_with_vectors(dir.path(), dim, &ids);

    let (doomed, survivors): (Vec<u64>, Vec<u64>) =
        ids.iter().copied().partition(|x| x % 3 == 0);

    // Snapshot what each survivor should contain before touching the storage.
    let mut expected: Vec<(u64, Vec<f32>)> = Vec::with_capacity(survivors.len());
    for &id in &survivors {
        expected.push((id, (0..dim).map(|d| id as f32 + d as f32).collect()));
    }

    for &id in &doomed {
        storage.delete(id).expect("delete");
    }

    let reclaimed = storage.compact().expect("compact");
    assert!(reclaimed > 0, "should reclaim deleted vector space");

    for (id, exp_vec) in &expected {
        let got = storage.retrieve(*id).expect("retrieve").expect("exists");
        assert_eq!(
            &got, exp_vec,
            "vector {id} data mismatch after compact roundtrip"
        );
    }

    for &id in &doomed {
        assert!(
            storage.retrieve(id).expect("retrieve").is_none(),
            "deleted vector {id} should not exist after compaction"
        );
    }

    assert_eq!(storage.len(), survivors.len());
}