use std::{
fs::{self, File, OpenOptions},
io::{BufWriter, Write},
path::Path,
};
use kimberlite_types::Offset;
use crate::StorageError;
// ---- On-disk layout -------------------------------------------------------
// header = magic | version | reserved | entry count, followed by the
// positions and a trailing CRC (see `save` / `load`).

/// Size in bytes of the magic marker at the start of an index file.
const MAGIC_SIZE: usize = 4;
/// Size in bytes of the format version field.
const VERSION_SIZE: usize = 1;
/// Size in bytes of the zero padding that follows the version field.
const RESERVED_SIZE: usize = 3;
/// Size in bytes of the little-endian entry count field.
const COUNT_SIZE: usize = 8;
/// Size in bytes of one stored position (a little-endian `u64`).
const POSITION_SIZE: usize = 8;
/// Size in bytes of the trailing checksum.
const CRC_SIZE: usize = 4;
/// Total size in bytes of the fixed header that precedes the positions.
const HEADER_SIZE: usize = MAGIC_SIZE + VERSION_SIZE + RESERVED_SIZE + COUNT_SIZE;

/// Marker bytes written first by `save` and verified by `load`.
const MAGIC: &[u8; MAGIC_SIZE] = b"VDXI";
/// Format version written by `save`; `load` rejects any other value.
const VERSION: u8 = 0x01;
/// Zero padding written between the version byte and the entry count.
const RESERVED: [u8; RESERVED_SIZE] = [0u8; RESERVED_SIZE];

/// 256 MiB. The tests in this file pass it as the `compact_threshold_bytes`
/// argument of `save_incremental`.
// NOTE(review): presumably the default WAL compaction threshold for callers — confirm.
pub const MAX_WAL_BYTES: u64 = 256 * 1024 * 1024;
/// In-memory table of byte positions, indexed by offset.
///
/// Entry `i` is the byte position returned by `lookup` for the offset whose
/// `as_usize()` value is `i`. `append` and `from_positions` assert (in debug
/// builds only) that positions are strictly increasing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OffsetIndex {
    /// Byte positions in insertion order; the vector index is the offset.
    positions: Vec<u64>,
}
impl OffsetIndex {
pub fn new() -> Self {
Self::default()
}
/// Records `byte_position` as the position of the next offset.
///
/// Debug builds assert that `byte_position` is strictly greater than the
/// most recently appended position and that exactly one entry was added.
/// Release builds perform no validation.
pub fn append(&mut self, byte_position: u64) {
    debug_assert!(
        match self.positions.last() {
            Some(&last) => last < byte_position,
            None => true,
        },
        "byte_position {} must be greater than last position {:?}",
        byte_position,
        self.positions.last()
    );

    let expected_len = self.positions.len() + 1;
    self.positions.push(byte_position);
    debug_assert_eq!(self.positions.len(), expected_len);
}
/// Returns the byte position stored for `offset`.
///
/// The entry is found by indexing with `offset.as_usize()`; `None` is
/// returned when that index is past the end of the index.
#[must_use]
pub fn lookup(&self, offset: Offset) -> Option<u64> {
    let slot = offset.as_usize();
    match self.positions.get(slot) {
        Some(&byte_position) => Some(byte_position),
        None => None,
    }
}
/// Returns the number of positions held by the index.
#[must_use]
pub fn len(&self) -> usize {
    self.positions.as_slice().len()
}
/// Returns `true` when the index holds no positions.
#[must_use]
pub fn is_empty(&self) -> bool {
    matches!(self.positions.as_slice(), [])
}
/// Wraps an existing list of positions in an index without copying it.
///
/// Debug builds assert that every position is strictly greater than the
/// one before it. Release builds accept the vector as given.
pub fn from_positions(positions: Vec<u64>) -> Self {
    debug_assert!(
        positions
            .iter()
            .zip(positions.iter().skip(1))
            .all(|(earlier, later)| earlier < later),
        "positions must be monotonically increasing"
    );
    Self { positions }
}
/// Borrows all stored positions, in offset order.
#[must_use]
pub fn positions(&self) -> &[u64] {
    self.positions.as_slice()
}
/// Appends recently added entries to the WAL sidecar file and compacts the
/// WAL into the main index file once it has grown large enough.
///
/// The positions at indices `new_entries_start..` are appended to
/// `<path>.wal` as raw little-endian `u64` values. If the WAL's size after
/// the append is at least `compact_threshold_bytes`, the complete index is
/// rewritten to `path` via [`Self::save`] and the WAL file is deleted.
///
/// When there are no entries at or after `new_entries_start`, nothing is
/// written and no file is created.
///
/// # Errors
///
/// Returns an error if the WAL cannot be opened, written, or inspected, if
/// the full save fails, or if the WAL cannot be deleted after compaction.
/// The last case is reported rather than ignored because a WAL that
/// survives compaction still contains entries that are now also in the main
/// index file.
///
/// # Panics
///
/// Panics if `new_entries_start` is greater than the number of entries in
/// the index.
pub fn save_incremental(
    &self,
    path: &Path,
    new_entries_start: usize,
    compact_threshold_bytes: u64,
) -> Result<(), StorageError> {
    let new_entries = &self.positions[new_entries_start..];
    if new_entries.is_empty() {
        return Ok(());
    }
    let wal_path = wal_path_for(path);

    // The WAL handle lives only inside this scope so that it is closed
    // before the file is (possibly) removed below.
    let wal_size_bytes = {
        let mut wal = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&wal_path)?;

        // Serialize everything first so the append is a single write.
        let mut buf = Vec::with_capacity(new_entries.len() * POSITION_SIZE);
        for pos in new_entries {
            buf.extend_from_slice(&pos.to_le_bytes());
        }
        wal.write_all(&buf)?;
        wal.flush()?;
        wal.metadata()?.len()
    };

    if wal_size_bytes >= compact_threshold_bytes {
        self.save(path)?;
        match fs::remove_file(&wal_path) {
            Ok(()) => {}
            // Already gone: the goal (no WAL on disk) is met.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err.into()),
        }
    }
    Ok(())
}
/// Loads the index file at `path` and then replays the entries in its WAL
/// sidecar (`<path>.wal`), if one exists.
///
/// The WAL is read as a sequence of little-endian `u64` positions. A
/// trailing fragment shorter than one entry is ignored.
///
/// Replay is idempotent: a WAL entry is appended only if it is strictly
/// greater than the last position already in the index. Positions are
/// strictly increasing, so an entry that is not greater than the current
/// last position is one the main index file already contains (for example
/// a WAL that was left on disk after a compaction) and is skipped instead
/// of being duplicated.
///
/// # Errors
///
/// Returns any error produced by [`Self::load`] for the main index file,
/// or an I/O error if the WAL exists but cannot be read.
pub fn load_with_wal(path: &Path) -> Result<Self, StorageError> {
    let mut index = Self::load(path)?;
    let wal_path = wal_path_for(path);
    if !wal_path.exists() {
        return Ok(index);
    }

    let wal_data = fs::read(&wal_path)?;
    index.positions.reserve(wal_data.len() / POSITION_SIZE);

    let mut last = index.positions.last().copied();
    for chunk in wal_data.chunks_exact(POSITION_SIZE) {
        let pos_bytes: [u8; POSITION_SIZE] = chunk
            .try_into()
            .expect("chunks_exact yields chunks of exactly POSITION_SIZE bytes");
        let byte_position = u64::from_le_bytes(pos_bytes);

        // Already covered by the loaded index (stale WAL entry): skip it.
        if last.is_some_and(|prev| byte_position <= prev) {
            continue;
        }
        index.positions.push(byte_position);
        last = Some(byte_position);
    }
    Ok(index)
}
/// Returns how many entries the index file at `path` contains.
///
/// Only the main index file is read (via [`Self::load`]); entries that
/// exist solely in the WAL sidecar are not counted.
///
/// # Errors
///
/// Returns any error produced by [`Self::load`].
pub fn flushed_count(path: &Path) -> Result<usize, StorageError> {
    Self::load(path).map(|index| index.len())
}
pub fn save(&self, path: &Path) -> Result<(), StorageError> {
let positions_size = self.positions.len() * POSITION_SIZE;
let total_size = HEADER_SIZE + positions_size + CRC_SIZE;
let mut buf: Vec<u8> = Vec::with_capacity(total_size);
buf.extend_from_slice(MAGIC);
buf.extend_from_slice(&[VERSION]);
buf.extend_from_slice(&RESERVED);
buf.extend_from_slice(&(self.positions.len() as u64).to_le_bytes());
for pos in &self.positions {
buf.extend_from_slice(&pos.to_le_bytes());
}
let checksum = kimberlite_crypto::crc32(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
debug_assert_eq!(buf.len(), total_size, "buffer size mismatch");
let file = File::create(path)?;
let mut writer = BufWriter::new(file);
writer.write_all(&buf)?;
writer.flush()?;
Ok(())
}
/// Reads and validates the index file at `path`.
///
/// Validation happens in this order: minimum length, magic bytes, format
/// version, declared entry count versus file length, and finally the
/// CRC-32 stored after the positions (computed over the header and the
/// positions). Bytes beyond the declared end of the file content are
/// ignored.
///
/// The entry count comes from the file and is treated as untrusted: it is
/// converted and multiplied with overflow checks, so an absurd count is
/// reported as a truncated file rather than overflowing.
///
/// # Errors
///
/// * An I/O error if the file cannot be read.
/// * `StorageError::IndexTruncated` if the file is shorter than the header
///   plus checksum, or shorter than its declared entry count requires
///   (including counts too large to represent).
/// * `StorageError::InvalidIndexMagic` if the magic bytes do not match.
/// * `StorageError::UnsupportedIndexVersion` if the version byte is not
///   `VERSION`.
/// * `StorageError::IndexChecksumMismatch` if the stored and computed
///   checksums differ.
pub fn load(path: &Path) -> Result<Self, StorageError> {
    let data = fs::read(path)?;

    let min_size = HEADER_SIZE + CRC_SIZE;
    if data.len() < min_size {
        return Err(StorageError::IndexTruncated {
            expected: min_size,
            actual: data.len(),
        });
    }

    let magic: [u8; MAGIC_SIZE] = data[0..MAGIC_SIZE]
        .try_into()
        .expect("slice length equals MAGIC_SIZE after bounds check");
    if &magic != MAGIC {
        return Err(StorageError::InvalidIndexMagic);
    }

    let version = data[MAGIC_SIZE];
    if version != VERSION {
        return Err(StorageError::UnsupportedIndexVersion(version));
    }

    let count_start = MAGIC_SIZE + VERSION_SIZE + RESERVED_SIZE;
    let count_bytes: [u8; COUNT_SIZE] = data[count_start..count_start + COUNT_SIZE]
        .try_into()
        .expect("slice length equals COUNT_SIZE after bounds check");
    let declared_count = u64::from_le_bytes(count_bytes);

    // The count is read from disk before the checksum can be verified, so
    // every step of the size computation is overflow-checked.
    let sizes = usize::try_from(declared_count).ok().and_then(|count| {
        let positions_size = count.checked_mul(POSITION_SIZE)?;
        let expected_size = positions_size.checked_add(HEADER_SIZE + CRC_SIZE)?;
        Some((count, positions_size, expected_size))
    });
    let Some((count, positions_size, expected_size)) = sizes else {
        // A size that cannot even be represented is necessarily larger
        // than the file we actually read.
        return Err(StorageError::IndexTruncated {
            expected: usize::MAX,
            actual: data.len(),
        });
    };

    if data.len() < expected_size {
        return Err(StorageError::IndexTruncated {
            expected: expected_size,
            actual: data.len(),
        });
    }

    let crc_start = HEADER_SIZE + positions_size;
    let stored_crc_bytes: [u8; CRC_SIZE] = data[crc_start..crc_start + CRC_SIZE]
        .try_into()
        .expect("slice length equals CRC_SIZE after bounds check");
    let stored_crc = u32::from_le_bytes(stored_crc_bytes);
    let computed_crc = kimberlite_crypto::crc32(&data[0..crc_start]);
    if stored_crc != computed_crc {
        return Err(StorageError::IndexChecksumMismatch {
            expected: stored_crc,
            actual: computed_crc,
        });
    }

    let positions: Vec<u64> = data[HEADER_SIZE..crc_start]
        .chunks_exact(POSITION_SIZE)
        .map(|chunk| {
            u64::from_le_bytes(
                chunk
                    .try_into()
                    .expect("chunks_exact yields chunks of exactly POSITION_SIZE bytes"),
            )
        })
        .collect();
    debug_assert_eq!(positions.len(), count, "position count mismatch");
    Ok(Self { positions })
}
}
/// Returns the WAL sidecar path for `index_path`: the same path with
/// `.wal` appended to it (the existing extension is kept, not replaced).
fn wal_path_for(index_path: &Path) -> std::path::PathBuf {
    let mut name = std::ffi::OsString::from(index_path.as_os_str());
    name.push(".wal");
    name.into()
}
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use tempfile::TempDir;

    /// Builds an index with `entry_count` entries at positions 0, 1000, 2000, ...
    fn index_with_entries(entry_count: usize) -> OffsetIndex {
        let mut index = OffsetIndex::new();
        for i in 0..entry_count {
            index.append((i * 1000) as u64);
        }
        index
    }

    /// Creates a fresh temporary directory and returns it together with the
    /// path of a not-yet-created index file inside it. The returned `TempDir`
    /// must stay alive for as long as the path is used.
    fn scratch_index_path() -> (TempDir, std::path::PathBuf) {
        let temp_dir = TempDir::new().unwrap();
        let index_path = temp_dir.path().join("test.idx");
        (temp_dir, index_path)
    }

    /// `MAX_WAL_BYTES` is exactly 256 MiB.
    #[test]
    fn test_max_wal_bytes_constant() {
        assert_eq!(MAX_WAL_BYTES, 256 * 1024 * 1024);
        assert_eq!(MAX_WAL_BYTES, 268_435_456);
    }

    /// Writing more WAL bytes than the threshold produces the main index
    /// file and leaves no (non-empty) WAL behind.
    #[test]
    fn test_wal_compaction_triggers_at_byte_limit() {
        let (_temp_dir, index_path) = scratch_index_path();
        let test_threshold = 1024u64;
        // One entry more than fits under the threshold.
        let entries_needed = (test_threshold / POSITION_SIZE as u64) as usize + 1;
        let index = index_with_entries(entries_needed);

        index
            .save_incremental(&index_path, 0, test_threshold)
            .unwrap();

        assert!(index_path.exists());
        let wal_path = wal_path_for(&index_path);
        if wal_path.exists() {
            assert!(fs::metadata(&wal_path).unwrap().len() == 0);
        }
    }

    /// Below the threshold only the WAL is written; the main index file is
    /// not created.
    #[test]
    fn test_wal_not_compacted_below_threshold() {
        let (_temp_dir, index_path) = scratch_index_path();
        let index = index_with_entries(10);
        let large_threshold = 1024 * 1024 * 1024u64;

        index
            .save_incremental(&index_path, 0, large_threshold)
            .unwrap();

        let wal_path = wal_path_for(&index_path);
        assert!(wal_path.exists());
        assert!(fs::metadata(&wal_path).unwrap().len() > 0);
        assert!(!index_path.exists());
    }

    /// Saving an empty index incrementally succeeds and creates no files.
    #[test]
    fn test_incremental_save_empty_entries() {
        let (_temp_dir, index_path) = scratch_index_path();
        let index = OffsetIndex::new();

        let result = index.save_incremental(&index_path, 0, MAX_WAL_BYTES);

        assert!(result.is_ok());
        assert!(!index_path.exists());
        assert!(!wal_path_for(&index_path).exists());
    }

    /// The WAL grows by exactly `POSITION_SIZE` bytes per entry.
    #[test]
    fn test_wal_byte_tracking_accuracy() {
        let (_temp_dir, index_path) = scratch_index_path();
        let entry_count = 100;
        let index = index_with_entries(entry_count);

        index.save_incremental(&index_path, 0, u64::MAX).unwrap();

        let wal_size = fs::metadata(wal_path_for(&index_path)).unwrap().len();
        let expected_size = (entry_count * POSITION_SIZE) as u64;
        assert_eq!(wal_size, expected_size);
    }

    /// Compaction happens exactly when the written WAL bytes reach the
    /// threshold; otherwise the WAL remains on disk.
    #[test]
    fn prop_wal_compaction_at_threshold() {
        proptest!(|(entry_count in 10usize..1000, threshold_kb in 1u64..100)| {
            let (_temp_dir, index_path) = scratch_index_path();
            let index = index_with_entries(entry_count);
            let threshold_bytes = threshold_kb * 1024;

            index.save_incremental(&index_path, 0, threshold_bytes).unwrap();

            let expected_wal_bytes = (entry_count * POSITION_SIZE) as u64;
            if expected_wal_bytes >= threshold_bytes {
                prop_assert!(index_path.exists());
            } else {
                prop_assert!(wal_path_for(&index_path).exists());
            }
        });
    }

    /// The WAL size is always a whole number of entries.
    #[test]
    fn prop_wal_size_alignment() {
        proptest!(|(entry_count in 1usize..500)| {
            let (_temp_dir, index_path) = scratch_index_path();
            let index = index_with_entries(entry_count);

            index.save_incremental(&index_path, 0, u64::MAX).unwrap();

            let wal_path = wal_path_for(&index_path);
            if wal_path.exists() {
                let wal_size = fs::metadata(&wal_path).unwrap().len() as usize;
                prop_assert_eq!(wal_size % POSITION_SIZE, 0);
            }
        });
    }

    /// A compacted index file round-trips every entry unchanged.
    #[test]
    fn prop_compaction_correctness() {
        proptest!(|(entry_count in 50usize..200)| {
            let (_temp_dir, index_path) = scratch_index_path();
            let index = index_with_entries(entry_count);

            // A threshold of one byte forces compaction on the first save.
            index.save_incremental(&index_path, 0, 1).unwrap();

            let loaded = OffsetIndex::load(&index_path).unwrap();
            prop_assert_eq!(loaded.len(), entry_count);
            for (i, &pos) in loaded.positions.iter().enumerate() {
                prop_assert_eq!(pos, (i * 1000) as u64);
            }
        });
    }

    /// A WAL that survives a save with the `MAX_WAL_BYTES` threshold is
    /// smaller than `MAX_WAL_BYTES`.
    #[test]
    fn prop_max_wal_bytes_enforcement() {
        proptest!(|(entry_count in 100usize..10000)| {
            let (_temp_dir, index_path) = scratch_index_path();
            let index = index_with_entries(entry_count);

            index.save_incremental(&index_path, 0, MAX_WAL_BYTES).unwrap();

            let wal_path = wal_path_for(&index_path);
            if wal_path.exists() {
                let wal_size = fs::metadata(&wal_path).unwrap().len();
                prop_assert!(wal_size < MAX_WAL_BYTES);
            }
        });
    }
}