use memmap2::MmapMut;
use parking_lot::RwLock;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use sochdb_core::{Result, SochDBError};
use super::{CompressionType, PayloadIndex, PayloadMeta};
/// Format identifier stored in the first 8 bytes of every index file.
const MAGIC: [u8; 8] = *b"SOCHIDX2";
/// On-disk format version written by this code and required when opening.
const VERSION: u32 = 1;
/// Bytes reserved for the header; slot 0 starts immediately after it.
const HEADER_SIZE: u64 = 64;
/// Bytes per slot (see the slot layout offsets below).
const SLOT_SIZE: u64 = 40;
/// Tag byte of a never-used slot (zero-filled file regions read as empty).
const TAG_EMPTY: u8 = 0x00;
/// Tag byte of a slot that holds a record.
const TAG_OCCUPIED: u8 = 0x01;
/// Slot count of a freshly created index (must be a power of two).
const INITIAL_CAPACITY: u64 = 1 << 12;
/// Growth is triggered once one more entry would push occupancy above this.
const MAX_LOAD_FACTOR: f64 = 0.75;

// Header layout, little-endian, offsets from the start of the file:
// magic[8] | version u32 + 4 pad | num_slots u64 | num_entries u64 | seed u64 | reserved
const HEADER_OFF_MAGIC: usize = 0;
const HEADER_OFF_VERSION: usize = HEADER_OFF_MAGIC + 8;
const HEADER_OFF_NUM_SLOTS: usize = HEADER_OFF_VERSION + 4 + 4;
const HEADER_OFF_NUM_ENTRIES: usize = HEADER_OFF_NUM_SLOTS + 8;
const HEADER_OFF_SEED: usize = HEADER_OFF_NUM_ENTRIES + 8;

// Slot layout, little-endian, offsets from the start of the slot:
// tag u8 | compression u8 | 2 pad | length u32 | uncompressed_len u32 | 4 pad | edge_id u128 | offset u64
const SLOT_OFF_TAG: usize = 0;
const SLOT_OFF_COMPRESSION: usize = SLOT_OFF_TAG + 1;
const SLOT_OFF_LENGTH: usize = SLOT_OFF_COMPRESSION + 1 + 2;
const SLOT_OFF_UNCOMPRESSED_LEN: usize = SLOT_OFF_LENGTH + 4;
const SLOT_OFF_EDGE_ID: usize = SLOT_OFF_UNCOMPRESSED_LEN + 4 + 4;
const SLOT_OFF_OFFSET: usize = SLOT_OFF_EDGE_ID + 16;
/// Hashes a 128-bit key into 64 bits under `seed`.
///
/// Each 64-bit half of the key goes through the SplitMix64 finalizer: the
/// low half XORed with `seed`, the high half with `seed` multiplied by the
/// golden-ratio constant. The two mixed halves are then XORed together.
///
/// With `seed == 0` both halves are mixed identically, so swapping the two
/// halves of a key yields the same hash; use a non-zero seed.
#[inline(always)]
fn hash_u128(key: u128, seed: u64) -> u64 {
    /// SplitMix64 output finalizer (xor-shift / multiply rounds).
    #[inline(always)]
    fn finalize(mut z: u64) -> u64 {
        z = (z ^ (z >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94d049bb133111eb);
        z ^ (z >> 31)
    }

    let low = finalize(key as u64 ^ seed);
    let high = finalize((key >> 64) as u64 ^ seed.wrapping_mul(0x9e3779b97f4a7c15));
    low ^ high
}
/// Open-addressing (linear probing) hash table mapping `u128` edge ids to
/// [`PayloadMeta`] records, stored in a memory-mapped file.
///
/// The file is a `HEADER_SIZE`-byte header followed by `num_slots` slots of
/// `SLOT_SIZE` bytes each.
pub(crate) struct DiskHashIndex {
    /// Writable mapping of the whole index file. Lookups take the read lock,
    /// inserts take the write lock, and growing the table replaces the mapping.
    mmap: RwLock<MmapMut>,
    /// File backing `mmap`; kept open alongside it and replaced when the table grows.
    file: RwLock<std::fs::File>,
    /// Location of the index file; also the rename target when the table grows.
    path: PathBuf,
    /// Number of occupied slots; mirrored into the header whenever a new key is inserted.
    num_entries: AtomicU64,
    /// Total slot count; always a power of two so `hash & (num_slots - 1)` picks a slot.
    num_slots: AtomicU64,
    /// Hash seed, persisted in the header so a reopened file hashes keys identically.
    seed: u64,
}
impl DiskHashIndex {
pub fn new(index_path: PathBuf) -> Result<Self> {
if let Some(parent) = index_path.parent() {
fs::create_dir_all(parent)?;
}
if index_path.exists() && fs::metadata(&index_path)?.len() >= HEADER_SIZE {
Self::open_existing(index_path)
} else {
Self::create_new(index_path, INITIAL_CAPACITY)
}
}
fn create_new(path: PathBuf, capacity: u64) -> Result<Self> {
debug_assert!(capacity.is_power_of_two(), "Capacity must be power of 2");
let file_size = HEADER_SIZE + capacity * SLOT_SIZE;
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)
.map_err(|e| SochDBError::Internal(format!("Failed to create index file: {}", e)))?;
file.set_len(file_size)
.map_err(|e| SochDBError::Internal(format!("Failed to set index file size: {}", e)))?;
let mut mmap = unsafe {
memmap2::MmapOptions::new()
.map_mut(&file)
.map_err(|e| SochDBError::Internal(format!("Failed to mmap index: {}", e)))?
};
let seed = Self::generate_seed();
mmap[HEADER_OFF_MAGIC..HEADER_OFF_MAGIC + 8].copy_from_slice(&MAGIC);
mmap[HEADER_OFF_VERSION..HEADER_OFF_VERSION + 4]
.copy_from_slice(&VERSION.to_le_bytes());
mmap[HEADER_OFF_NUM_SLOTS..HEADER_OFF_NUM_SLOTS + 8]
.copy_from_slice(&capacity.to_le_bytes());
mmap[HEADER_OFF_NUM_ENTRIES..HEADER_OFF_NUM_ENTRIES + 8]
.copy_from_slice(&0u64.to_le_bytes());
mmap[HEADER_OFF_SEED..HEADER_OFF_SEED + 8]
.copy_from_slice(&seed.to_le_bytes());
mmap.flush()
.map_err(|e| SochDBError::Internal(format!("Failed to flush new index: {}", e)))?;
tracing::info!(
capacity = capacity,
file_size_kb = file_size / 1024,
path = %path.display(),
"Created new DiskHashIndex"
);
Ok(Self {
mmap: RwLock::new(mmap),
file: RwLock::new(file),
path,
num_entries: AtomicU64::new(0),
num_slots: AtomicU64::new(capacity),
seed,
})
}
fn open_existing(path: PathBuf) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.map_err(|e| SochDBError::Internal(format!("Failed to open index file: {}", e)))?;
let mmap = unsafe {
memmap2::MmapOptions::new()
.map_mut(&file)
.map_err(|e| SochDBError::Internal(format!("Failed to mmap index: {}", e)))?
};
if mmap.len() < HEADER_SIZE as usize || mmap[0..8] != MAGIC {
return Err(SochDBError::Corruption(
"Invalid DiskHashIndex magic — file corrupt or wrong format. \
Delete the index file to trigger rebuild from payload data."
.into(),
));
}
let version = u32::from_le_bytes(
mmap[HEADER_OFF_VERSION..HEADER_OFF_VERSION + 4]
.try_into()
.unwrap(),
);
if version != VERSION {
return Err(SochDBError::Corruption(format!(
"Unsupported DiskHashIndex version {} (expected {})",
version, VERSION
)));
}
let num_slots = u64::from_le_bytes(
mmap[HEADER_OFF_NUM_SLOTS..HEADER_OFF_NUM_SLOTS + 8]
.try_into()
.unwrap(),
);
let num_entries = u64::from_le_bytes(
mmap[HEADER_OFF_NUM_ENTRIES..HEADER_OFF_NUM_ENTRIES + 8]
.try_into()
.unwrap(),
);
let seed = u64::from_le_bytes(
mmap[HEADER_OFF_SEED..HEADER_OFF_SEED + 8]
.try_into()
.unwrap(),
);
if !num_slots.is_power_of_two() {
return Err(SochDBError::Corruption(format!(
"num_slots {} is not a power of 2 — index corrupt",
num_slots
)));
}
let expected_file_size = HEADER_SIZE + num_slots * SLOT_SIZE;
if (mmap.len() as u64) < expected_file_size {
return Err(SochDBError::Corruption(format!(
"Index file truncated: expected {} bytes, got {}",
expected_file_size,
mmap.len()
)));
}
tracing::info!(
num_entries = num_entries,
num_slots = num_slots,
load_factor = format!("{:.2}", num_entries as f64 / num_slots as f64),
path = %path.display(),
"Opened existing DiskHashIndex"
);
Ok(Self {
mmap: RwLock::new(mmap),
file: RwLock::new(file),
path,
num_entries: AtomicU64::new(num_entries),
num_slots: AtomicU64::new(num_slots),
seed,
})
}
#[inline(always)]
fn slot_offset(slot_index: u64) -> usize {
(HEADER_SIZE + slot_index * SLOT_SIZE) as usize
}
#[inline(always)]
fn read_tag(mmap: &MmapMut, slot_index: u64) -> u8 {
mmap[Self::slot_offset(slot_index) + SLOT_OFF_TAG]
}
#[inline(always)]
fn read_edge_id(mmap: &MmapMut, slot_index: u64) -> u128 {
let base = Self::slot_offset(slot_index) + SLOT_OFF_EDGE_ID;
u128::from_le_bytes(mmap[base..base + 16].try_into().unwrap())
}
#[inline]
fn read_meta(mmap: &MmapMut, slot_index: u64) -> PayloadMeta {
let base = Self::slot_offset(slot_index);
let compression_byte = mmap[base + SLOT_OFF_COMPRESSION];
let length = u32::from_le_bytes(
mmap[base + SLOT_OFF_LENGTH..base + SLOT_OFF_LENGTH + 4]
.try_into()
.unwrap(),
);
let uncompressed_length = u32::from_le_bytes(
mmap[base + SLOT_OFF_UNCOMPRESSED_LEN..base + SLOT_OFF_UNCOMPRESSED_LEN + 4]
.try_into()
.unwrap(),
);
let edge_id = u128::from_le_bytes(
mmap[base + SLOT_OFF_EDGE_ID..base + SLOT_OFF_EDGE_ID + 16]
.try_into()
.unwrap(),
);
let offset = u64::from_le_bytes(
mmap[base + SLOT_OFF_OFFSET..base + SLOT_OFF_OFFSET + 8]
.try_into()
.unwrap(),
);
PayloadMeta {
edge_id,
offset,
length,
compression: CompressionType::from_u8(compression_byte).unwrap_or(CompressionType::None),
uncompressed_length,
}
}
#[inline]
fn write_slot(mmap: &mut MmapMut, slot_index: u64, meta: &PayloadMeta) {
let base = Self::slot_offset(slot_index);
mmap[base + SLOT_OFF_TAG] = TAG_OCCUPIED;
mmap[base + SLOT_OFF_COMPRESSION] = meta.compression as u8;
mmap[base + 2] = 0;
mmap[base + 3] = 0;
mmap[base + SLOT_OFF_LENGTH..base + SLOT_OFF_LENGTH + 4]
.copy_from_slice(&meta.length.to_le_bytes());
mmap[base + SLOT_OFF_UNCOMPRESSED_LEN..base + SLOT_OFF_UNCOMPRESSED_LEN + 4]
.copy_from_slice(&meta.uncompressed_length.to_le_bytes());
mmap[base + 12..base + 16].copy_from_slice(&[0u8; 4]);
mmap[base + SLOT_OFF_EDGE_ID..base + SLOT_OFF_EDGE_ID + 16]
.copy_from_slice(&meta.edge_id.to_le_bytes());
mmap[base + SLOT_OFF_OFFSET..base + SLOT_OFF_OFFSET + 8]
.copy_from_slice(&meta.offset.to_le_bytes());
}
fn write_header_entries(mmap: &mut MmapMut, count: u64) {
mmap[HEADER_OFF_NUM_ENTRIES..HEADER_OFF_NUM_ENTRIES + 8]
.copy_from_slice(&count.to_le_bytes());
}
fn probe(&self, mmap: &MmapMut, edge_id: u128) -> std::result::Result<u64, u64> {
let num_slots = self.num_slots.load(Ordering::Relaxed);
let mask = num_slots - 1; let mut slot = hash_u128(edge_id, self.seed) & mask;
loop {
let tag = Self::read_tag(mmap, slot);
if tag == TAG_EMPTY {
return Err(slot);
}
if tag == TAG_OCCUPIED && Self::read_edge_id(mmap, slot) == edge_id {
return Ok(slot);
}
slot = (slot + 1) & mask;
}
}
fn grow(&self) -> Result<()> {
let old_num_slots = self.num_slots.load(Ordering::Relaxed);
let new_num_slots = old_num_slots
.checked_mul(2)
.ok_or_else(|| SochDBError::Internal("Index capacity overflow".into()))?;
let new_file_size = HEADER_SIZE + new_num_slots * SLOT_SIZE;
tracing::info!(
old_slots = old_num_slots,
new_slots = new_num_slots,
new_file_size_mb = new_file_size / (1024 * 1024),
"DiskHashIndex: growing table"
);
let temp_path = self.path.with_extension("tmp");
let new_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&temp_path)
.map_err(|e| SochDBError::Internal(format!("Failed to create temp index: {}", e)))?;
new_file.set_len(new_file_size)
.map_err(|e| SochDBError::Internal(format!("Failed to set temp index size: {}", e)))?;
let mut new_mmap = unsafe {
memmap2::MmapOptions::new()
.map_mut(&new_file)
.map_err(|e| SochDBError::Internal(format!("Failed to mmap temp index: {}", e)))?
};
new_mmap[HEADER_OFF_MAGIC..HEADER_OFF_MAGIC + 8].copy_from_slice(&MAGIC);
new_mmap[HEADER_OFF_VERSION..HEADER_OFF_VERSION + 4]
.copy_from_slice(&VERSION.to_le_bytes());
new_mmap[HEADER_OFF_NUM_SLOTS..HEADER_OFF_NUM_SLOTS + 8]
.copy_from_slice(&new_num_slots.to_le_bytes());
new_mmap[HEADER_OFF_SEED..HEADER_OFF_SEED + 8]
.copy_from_slice(&self.seed.to_le_bytes());
let old_mmap = self.mmap.read();
let new_mask = new_num_slots - 1;
let mut rehashed = 0u64;
for old_slot in 0..old_num_slots {
if Self::read_tag(&old_mmap, old_slot) != TAG_OCCUPIED {
continue;
}
let meta = Self::read_meta(&old_mmap, old_slot);
let mut new_slot = hash_u128(meta.edge_id, self.seed) & new_mask;
loop {
if new_mmap[Self::slot_offset(new_slot) + SLOT_OFF_TAG] == TAG_EMPTY {
break;
}
new_slot = (new_slot + 1) & new_mask;
}
Self::write_slot(&mut new_mmap, new_slot, &meta);
rehashed += 1;
}
drop(old_mmap);
Self::write_header_entries(&mut new_mmap, rehashed);
new_mmap
.flush()
.map_err(|e| SochDBError::Internal(format!("Failed to flush grown index: {}", e)))?;
fs::rename(&temp_path, &self.path)
.map_err(|e| SochDBError::Internal(format!("Failed to rename grown index: {}", e)))?;
*self.mmap.write() = new_mmap;
*self.file.write() = new_file;
self.num_slots.store(new_num_slots, Ordering::Release);
tracing::info!(
rehashed = rehashed,
new_slots = new_num_slots,
load_factor = format!("{:.2}", rehashed as f64 / new_num_slots as f64),
"DiskHashIndex: grow complete"
);
Ok(())
}
#[inline]
fn needs_grow(&self) -> bool {
let entries = self.num_entries.load(Ordering::Relaxed);
let slots = self.num_slots.load(Ordering::Relaxed);
(entries + 1) as f64 / slots as f64 > MAX_LOAD_FACTOR
}
fn generate_seed() -> u64 {
let mut seed: u64 = 0;
#[cfg(unix)]
{
let mut ts = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
unsafe {
libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
}
seed ^= ts.tv_sec as u64;
seed ^= (ts.tv_nsec as u64).wrapping_mul(0x9e3779b97f4a7c15);
}
#[cfg(not(unix))]
{
use std::time::SystemTime;
if let Ok(dur) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
seed ^= dur.as_nanos() as u64;
}
}
seed ^= (std::process::id() as u64).wrapping_mul(0x517cc1b727220a95);
let stack_var: u8 = 0;
seed ^= ((&stack_var as *const u8 as u64) >> 12).wrapping_mul(0x6c62272e07bb0142);
seed ^= seed >> 30;
seed = seed.wrapping_mul(0xbf58476d1ce4e5b9);
seed ^= seed >> 27;
seed = seed.wrapping_mul(0x94d049bb133111eb);
seed ^= seed >> 31;
if seed == 0 {
seed = 0x1234567890abcdef;
}
seed
}
}
impl PayloadIndex for DiskHashIndex {
    /// Inserts `meta` under `edge_id`, overwriting any record already stored there.
    ///
    /// The table grows first when one more entry would exceed `MAX_LOAD_FACTOR`.
    /// The stored record's `edge_id` is forced to equal the key. Slots are located
    /// by hashing `edge_id` but matched against the id stored in the slot, so a
    /// record whose `meta.edge_id` differed from the key would be unreachable, and
    /// re-inserting it would create duplicates and inflate the entry count.
    ///
    /// The growth check runs before the write lock is taken, so concurrent
    /// inserters may briefly push the load slightly past the limit.
    fn insert(&self, edge_id: u128, mut meta: PayloadMeta) -> Result<()> {
        if self.needs_grow() {
            self.grow()?;
        }
        meta.edge_id = edge_id;
        let mut mmap = self.mmap.write();
        match self.probe(&mmap, edge_id) {
            Ok(existing_slot) => Self::write_slot(&mut mmap, existing_slot, &meta),
            Err(empty_slot) => {
                Self::write_slot(&mut mmap, empty_slot, &meta);
                // Keep the persisted header count in step with the in-memory counter.
                let new_count = self.num_entries.fetch_add(1, Ordering::AcqRel) + 1;
                Self::write_header_entries(&mut mmap, new_count);
            }
        }
        Ok(())
    }

    /// Returns a copy of the record stored under `edge_id`, if any.
    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
        let mmap = self.mmap.read();
        Ok(self
            .probe(&mmap, edge_id)
            .ok()
            .map(|slot| Self::read_meta(&mmap, slot)))
    }

    /// True when a record is stored under `edge_id`.
    fn contains_key(&self, edge_id: u128) -> bool {
        let mmap = self.mmap.read();
        self.probe(&mmap, edge_id).is_ok()
    }

    /// Number of stored records (the in-memory entry counter).
    fn len(&self) -> usize {
        self.num_entries.load(Ordering::Relaxed) as usize
    }

    /// True when no records are stored.
    fn is_empty(&self) -> bool {
        self.num_entries.load(Ordering::Relaxed) == 0
    }

    /// Returns every stored record, in slot order.
    ///
    /// The records are collected under the read lock, so the iterator is a
    /// snapshot and holds no lock while it is consumed.
    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
        let mmap = self.mmap.read();
        let num_slots = self.num_slots.load(Ordering::Relaxed);
        // Clamp the capacity hint so a bad header count cannot trigger a huge allocation.
        let hint = self.num_entries.load(Ordering::Relaxed).min(num_slots) as usize;
        let mut entries = Vec::with_capacity(hint);
        entries.extend(
            (0..num_slots)
                .filter(|&slot| Self::read_tag(&mmap, slot) == TAG_OCCUPIED)
                .map(|slot| Self::read_meta(&mmap, slot)),
        );
        Box::new(entries.into_iter())
    }

    /// Flushes modified pages of the mapping back to the index file.
    ///
    /// # Errors
    /// `SochDBError::Internal` when the flush fails.
    fn save(&self) -> Result<()> {
        let mmap = self.mmap.read();
        mmap.flush()
            .map_err(|e| SochDBError::Internal(format!("DiskHashIndex flush failed: {}", e)))?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds an uncompressed 100-byte record for `edge_id` at `offset`.
    fn make_meta(edge_id: u128, offset: u64) -> PayloadMeta {
        PayloadMeta {
            edge_id,
            offset,
            length: 100,
            compression: CompressionType::None,
            uncompressed_length: 100,
        }
    }

    #[test]
    fn test_basic_insert_and_get() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        idx.insert(1, make_meta(1, 0)).unwrap();
        idx.insert(2, make_meta(2, 100)).unwrap();
        let m1 = idx.get(1).unwrap().unwrap();
        assert_eq!(m1.edge_id, 1);
        assert_eq!(m1.offset, 0);
        let m2 = idx.get(2).unwrap().unwrap();
        assert_eq!(m2.edge_id, 2);
        assert_eq!(m2.offset, 100);
        assert!(idx.get(999).unwrap().is_none());
        assert_eq!(idx.len(), 2);
    }

    #[test]
    fn test_overwrite() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        idx.insert(1, make_meta(1, 0)).unwrap();
        assert_eq!(idx.get(1).unwrap().unwrap().offset, 0);
        idx.insert(1, make_meta(1, 999)).unwrap();
        assert_eq!(idx.get(1).unwrap().unwrap().offset, 999);
        assert_eq!(idx.len(), 1);
    }

    #[test]
    fn test_persistence() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        {
            let idx = DiskHashIndex::new(path.clone()).unwrap();
            idx.insert(1, make_meta(1, 0)).unwrap();
            idx.insert(2, make_meta(2, 100)).unwrap();
            idx.save().unwrap();
        }
        {
            let idx = DiskHashIndex::new(path).unwrap();
            assert_eq!(idx.len(), 2);
            assert_eq!(idx.get(1).unwrap().unwrap().offset, 0);
            assert_eq!(idx.get(2).unwrap().unwrap().offset, 100);
        }
    }

    #[test]
    fn test_grow() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        let n = 10_000u128;
        for i in 0..n {
            idx.insert(i, make_meta(i, i as u64 * 40)).unwrap();
        }
        assert_eq!(idx.len(), n as usize);
        for i in 0..n {
            let meta = idx.get(i).unwrap().unwrap();
            assert_eq!(meta.edge_id, i);
            assert_eq!(meta.offset, i as u64 * 40);
        }
    }

    #[test]
    fn test_contains_key() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        idx.insert(42, make_meta(42, 0)).unwrap();
        assert!(idx.contains_key(42));
        assert!(!idx.contains_key(43));
    }

    #[test]
    fn test_iter_values() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        for i in 0..100u128 {
            idx.insert(i, make_meta(i, i as u64)).unwrap();
        }
        let values: Vec<_> = idx.iter_values().collect();
        assert_eq!(values.len(), 100);
        let mut ids: Vec<u128> = values.iter().map(|m| m.edge_id).collect();
        ids.sort();
        let expected: Vec<u128> = (0..100).collect();
        assert_eq!(ids, expected);
    }

    #[test]
    fn test_compression_types() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        let mut meta = make_meta(1, 0);
        meta.compression = CompressionType::LZ4;
        meta.uncompressed_length = 500;
        meta.length = 200;
        idx.insert(1, meta).unwrap();
        let retrieved = idx.get(1).unwrap().unwrap();
        assert_eq!(retrieved.compression, CompressionType::LZ4);
        assert_eq!(retrieved.uncompressed_length, 500);
        assert_eq!(retrieved.length, 200);
    }

    #[test]
    fn test_large_edge_ids() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        let ids = [
            u128::MAX,
            u128::MAX - 1,
            1u128 << 64,
            (1u128 << 64) + 1,
            0u128,
            u128::MAX / 2,
        ];
        for (i, &id) in ids.iter().enumerate() {
            idx.insert(id, make_meta(id, i as u64 * 100)).unwrap();
        }
        for (i, &id) in ids.iter().enumerate() {
            let meta = idx.get(id).unwrap().unwrap();
            assert_eq!(meta.edge_id, id);
            assert_eq!(meta.offset, i as u64 * 100);
        }
    }

    #[test]
    fn test_is_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let idx = DiskHashIndex::new(path).unwrap();
        assert!(idx.is_empty());
        idx.insert(1, make_meta(1, 0)).unwrap();
        assert!(!idx.is_empty());
    }

    #[test]
    fn test_grow_preserves_persistence() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        {
            let idx = DiskHashIndex::new(path.clone()).unwrap();
            for i in 0..5000u128 {
                idx.insert(i, make_meta(i, i as u64)).unwrap();
            }
            idx.save().unwrap();
        }
        {
            let idx = DiskHashIndex::new(path).unwrap();
            assert_eq!(idx.len(), 5000);
            for i in 0..5000u128 {
                assert_eq!(idx.get(i).unwrap().unwrap().offset, i as u64);
            }
        }
    }

    /// A header-sized file without the magic bytes is rejected, not overwritten.
    #[test]
    fn test_rejects_bad_magic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        std::fs::write(&path, vec![0u8; HEADER_SIZE as usize]).unwrap();
        assert!(matches!(
            DiskHashIndex::new(path),
            Err(SochDBError::Corruption(_))
        ));
    }

    /// A file with valid magic but an unknown format version is rejected.
    #[test]
    fn test_rejects_unknown_version() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        let mut header = vec![0u8; HEADER_SIZE as usize];
        header[HEADER_OFF_MAGIC..HEADER_OFF_MAGIC + 8].copy_from_slice(&MAGIC);
        header[HEADER_OFF_VERSION..HEADER_OFF_VERSION + 4]
            .copy_from_slice(&(VERSION + 1).to_le_bytes());
        std::fs::write(&path, header).unwrap();
        assert!(matches!(
            DiskHashIndex::new(path),
            Err(SochDBError::Corruption(_))
        ));
    }

    /// A file shorter than its header's slot count implies is reported as truncated.
    #[test]
    fn test_rejects_truncated_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        drop(DiskHashIndex::new(path.clone()).unwrap());
        std::fs::OpenOptions::new()
            .write(true)
            .open(&path)
            .unwrap()
            .set_len(HEADER_SIZE + SLOT_SIZE)
            .unwrap();
        assert!(matches!(
            DiskHashIndex::new(path),
            Err(SochDBError::Corruption(_))
        ));
    }

    /// A file too short to hold a header is treated as absent and recreated empty.
    #[test]
    fn test_short_file_is_recreated() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.idx");
        std::fs::write(&path, b"junk").unwrap();
        let idx = DiskHashIndex::new(path).unwrap();
        assert!(idx.is_empty());
        idx.insert(7, make_meta(7, 70)).unwrap();
        assert_eq!(idx.get(7).unwrap().unwrap().offset, 70);
    }

    #[test]
    fn test_hash_distribution() {
        let seed = 0xdeadbeef_u64;
        let n = 10_000;
        let num_buckets = 1024u64;
        let mut counts = vec![0u32; num_buckets as usize];
        for i in 0..n {
            let h = hash_u128(i as u128, seed);
            counts[(h & (num_buckets - 1)) as usize] += 1;
        }
        let expected = n as f64 / num_buckets as f64;
        let max_count = *counts.iter().max().unwrap() as f64;
        let min_count = *counts.iter().min().unwrap() as f64;
        assert!(
            max_count / expected < 3.0,
            "Hash distribution too skewed: max={}, expected={}",
            max_count,
            expected
        );
        assert!(
            min_count > 0.0,
            "Hash has empty buckets — degenerate distribution"
        );
    }
}