use std::collections::HashMap;
use std::hint::spin_loop;
use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
use parking_lot::{Mutex, RwLock};
use crate::Result;
/// Number of shards an `Index` is split into.
const SHARDS: usize = 64;
/// Mask that extracts a shard number from a key hash (`hash & SHARD_MASK`).
const SHARD_MASK: u64 = (SHARDS as u64) - 1;
/// Slot count of the table a new shard starts with.
const INITIAL_SHARD_CAPACITY: usize = 1024;
/// Numerator of the load-factor threshold tested by `ShardInner::over_load_factor`.
const GROWTH_NUM: usize = 3;
/// Denominator of the load-factor threshold tested by `ShardInner::over_load_factor`.
const GROWTH_DENOM: usize = 4;
/// Slot state: unused; lookups stop probing when they reach such a slot.
const STATE_EMPTY: u8 = 0;
/// Slot state: holds one `(hash, offset)` entry.
const STATE_OCCUPIED: u8 = 1;
/// Slot state: a removed entry; lookups probe past it and inserts may reuse it.
const STATE_TOMBSTONE: u8 = 2;
/// Slot state: marks a hash whose entries are stored in the shard's overflow map.
const STATE_OVERFLOW: u8 = 3;
/// Hash value of a key, as returned by `Index::hash_key`.
pub(crate) type KeyHash = u64;
/// Plain copy of the three data fields of an `AtomicSlot`, as returned by
/// `AtomicSlot::read`.
#[derive(Debug, Clone, Copy)]
struct SlotSnapshot {
// One of the `STATE_*` constants.
state: u8,
// Key hash stored in the slot.
hash: u64,
// Offset stored in the slot.
offset: u64,
}
/// One table slot whose fields are updated together under a per-slot
/// sequence lock (see `impl AtomicSlot`).
#[repr(C)]
struct AtomicSlot {
// Sequence counter: writers set the low bit while they modify the slot and
// store the old even value plus two when they are done.
seq: AtomicU64,
// One of the `STATE_*` constants.
state: AtomicU8,
// Key hash of the entry.
hash: AtomicU64,
// Offset of the entry.
offset: AtomicU64,
}
impl AtomicSlot {
    /// Creates a slot in `STATE_EMPTY` with zeroed hash, offset and sequence.
    const fn empty() -> Self {
        Self {
            seq: AtomicU64::new(0),
            state: AtomicU8::new(STATE_EMPTY),
            hash: AtomicU64::new(0),
            offset: AtomicU64::new(0),
        }
    }

    /// Returns a snapshot whose `state`, `hash` and `offset` all stem from the
    /// same write.
    ///
    /// Spins while a writer holds the slot (odd sequence value) and retries
    /// whenever the sequence changed while the fields were being read.
    #[inline]
    fn read(&self) -> SlotSnapshot {
        loop {
            let s0 = self.seq.load(Ordering::Acquire);
            if s0 & 1 == 1 {
                spin_loop();
                continue;
            }
            let state = self.state.load(Ordering::Relaxed);
            let hash = self.hash.load(Ordering::Relaxed);
            let offset = self.offset.load(Ordering::Relaxed);
            // A real acquire fence is required here: a compiler-only fence
            // lets weakly ordered CPUs satisfy the relaxed data loads *after*
            // the validating sequence load, which would allow a torn snapshot
            // to pass the check below.
            std::sync::atomic::fence(Ordering::Acquire);
            let s1 = self.seq.load(Ordering::Acquire);
            if s0 == s1 {
                return SlotSnapshot {
                    state,
                    hash,
                    offset,
                };
            }
            spin_loop();
        }
    }

    /// Overwrites all three fields regardless of the slot's current contents.
    #[inline]
    fn write_unconditional(&self, state: u8, hash: u64, offset: u64) {
        let original = self.acquire_seqlock();
        self.state.store(state, Ordering::Relaxed);
        self.hash.store(hash, Ordering::Relaxed);
        self.offset.store(offset, Ordering::Relaxed);
        self.release_seqlock(original);
    }

    /// Takes the writer side of the sequence lock by setting the low bit.
    ///
    /// Returns the even sequence value seen before locking; it must be handed
    /// to `release_seqlock`.
    #[inline]
    fn acquire_seqlock(&self) -> u64 {
        loop {
            let current = self.seq.load(Ordering::Acquire);
            let locked = current & 1 == 0
                && self
                    .seq
                    .compare_exchange_weak(
                        current,
                        current | 1,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    )
                    .is_ok();
            if locked {
                // Order the odd sequence value before the relaxed data stores
                // that follow, so a reader that observes any of those stores
                // is guaranteed to see the sequence change as well.
                std::sync::atomic::fence(Ordering::Release);
                return current;
            }
            spin_loop();
        }
    }

    /// Releases the sequence lock by publishing `original_even + 2`.
    #[inline]
    fn release_seqlock(&self, original_even: u64) {
        self.seq
            .store(original_even.wrapping_add(2), Ordering::Release);
    }

    /// Reports whether the slot is `STATE_OCCUPIED` with `expected_hash`.
    ///
    /// Must only be called while the caller holds the sequence lock.
    #[inline]
    fn is_occupied_by(&self, expected_hash: u64) -> bool {
        self.state.load(Ordering::Relaxed) == STATE_OCCUPIED
            && self.hash.load(Ordering::Relaxed) == expected_hash
    }

    /// Rewrites the slot to `(new_state, hash, offset)` if its state is still
    /// `expected_state`; returns whether the slot was rewritten.
    #[inline]
    fn try_claim(&self, expected_state: u8, new_state: u8, hash: u64, offset: u64) -> bool {
        let original = self.acquire_seqlock();
        let claimed = self.state.load(Ordering::Relaxed) == expected_state;
        if claimed {
            self.state.store(new_state, Ordering::Relaxed);
            self.hash.store(hash, Ordering::Relaxed);
            self.offset.store(offset, Ordering::Relaxed);
        }
        self.release_seqlock(original);
        claimed
    }

    /// Replaces the offset of an occupied slot holding `expected_hash`.
    ///
    /// Returns the previous offset, or `None` if the slot no longer matches.
    #[inline]
    fn try_update(&self, expected_hash: u64, new_offset: u64) -> Option<u64> {
        let original = self.acquire_seqlock();
        let previous = if self.is_occupied_by(expected_hash) {
            let prev = self.offset.load(Ordering::Relaxed);
            self.offset.store(new_offset, Ordering::Relaxed);
            Some(prev)
        } else {
            None
        };
        self.release_seqlock(original);
        previous
    }

    /// Turns an occupied slot holding `expected_hash` into a tombstone with
    /// zeroed hash and offset.
    ///
    /// Returns the removed offset, or `None` if the slot no longer matches.
    #[inline]
    fn try_tombstone(&self, expected_hash: u64) -> Option<u64> {
        let original = self.acquire_seqlock();
        let previous = if self.is_occupied_by(expected_hash) {
            let prev = self.offset.load(Ordering::Relaxed);
            self.state.store(STATE_TOMBSTONE, Ordering::Relaxed);
            self.hash.store(0, Ordering::Relaxed);
            self.offset.store(0, Ordering::Relaxed);
            Some(prev)
        } else {
            None
        };
        self.release_seqlock(original);
        previous
    }

    /// Switches an occupied slot holding `expected_hash` to `STATE_OVERFLOW`,
    /// keeping the hash and zeroing the offset; returns whether it switched.
    #[inline]
    fn try_promote_to_overflow(&self, expected_hash: u64) -> bool {
        let original = self.acquire_seqlock();
        let promoted = self.is_occupied_by(expected_hash);
        if promoted {
            self.state.store(STATE_OVERFLOW, Ordering::Relaxed);
            self.offset.store(0, Ordering::Relaxed);
        }
        self.release_seqlock(original);
        promoted
    }
}
/// Result of one `Shard::replace_attempt` pass.
enum ReplaceOutcome {
// The attempt finished; carries the value `Shard::replace` returns to its caller.
Done(Option<u64>),
// The caller must call `Shard::grow` and then try again.
NeedGrow,
// A slot changed underneath the attempt; the caller must try again.
Retry,
}
/// Open-addressing table of one shard together with its usage counters.
struct ShardInner {
// The slots; its length equals `capacity`.
table: Box<[AtomicSlot]>,
// Number of slots (a power of two, asserted in debug builds by `new`).
capacity: usize,
// `capacity - 1`; used to wrap probe indices.
mask: usize,
// Count of slots claimed for entries (occupied and overflow slots).
occupied: AtomicUsize,
// Count of slots currently in the tombstone state.
tombstones: AtomicUsize,
}
impl ShardInner {
    /// Builds a table of `capacity` empty slots with both counters at zero.
    ///
    /// `capacity` must be a power of two (checked in debug builds) because
    /// probe indices are wrapped with `capacity - 1` as a bit mask.
    fn new(capacity: usize) -> Self {
        debug_assert!(capacity.is_power_of_two());
        let table = (0..capacity)
            .map(|_| AtomicSlot::empty())
            .collect::<Vec<_>>()
            .into_boxed_slice();
        Self {
            table,
            capacity,
            mask: capacity - 1,
            occupied: AtomicUsize::new(0),
            tombstones: AtomicUsize::new(0),
        }
    }

    /// Returns the table index visited at probe `step` for `hash`
    /// (linear probing, wrapping at the end of the table).
    #[inline]
    fn probe_index(&self, hash: u64, step: usize) -> usize {
        // The lowest hash bits also select the shard, so they are identical
        // for every key stored in this table. Masking the raw hash would
        // therefore leave only a small fraction of the slots usable as home
        // positions; folding the upper half in restores the spread.
        let folded = hash ^ (hash >> 32);
        (folded as usize).wrapping_add(step) & self.mask
    }

    /// Reports whether occupied slots plus tombstones exceed
    /// `GROWTH_NUM / GROWTH_DENOM` of the capacity.
    fn over_load_factor(&self) -> bool {
        let live = self.occupied.load(Ordering::Acquire);
        let dead = self.tombstones.load(Ordering::Acquire);
        (live + dead) * GROWTH_DENOM > self.capacity * GROWTH_NUM
    }
}
/// One shard of the index: a probed slot table plus a side map for hashes
/// shared by several keys.
struct Shard {
// Slot table; read-locked for normal operations, write-locked by `clear` and `grow`.
inner: RwLock<ShardInner>,
// Entries of hashes whose slot is in `STATE_OVERFLOW`.
overflow: Mutex<OverflowMap>,
}
/// Maps a key hash to the `(key bytes, offset)` pairs of all keys stored under it.
type OverflowMap = HashMap<u64, Vec<(Vec<u8>, u64)>>;
impl Shard {
/// Creates a shard with an empty table of `INITIAL_SHARD_CAPACITY` slots and
/// an empty overflow map.
fn new() -> Self {
    let table = ShardInner::new(INITIAL_SHARD_CAPACITY);
    let spill = OverflowMap::new();
    Self {
        inner: RwLock::new(table),
        overflow: Mutex::new(spill),
    }
}
fn get(&self, hash: u64, key: &[u8]) -> Option<u64> {
let inner = self.inner.read();
for step in 0..inner.capacity {
let idx = inner.probe_index(hash, step);
let snap = inner.table[idx].read();
match snap.state {
STATE_EMPTY => return None,
STATE_OCCUPIED if snap.hash == hash => return Some(snap.offset),
STATE_OVERFLOW if snap.hash == hash => {
drop(inner);
let overflow = self.overflow.lock();
if let Some(entries) = overflow.get(&hash) {
for (k, off) in entries {
if k.as_slice() == key {
return Some(*off);
}
}
}
return None;
}
_ => continue,
}
}
None
}
/// Inserts or updates the entry for `key`, repeating `replace_attempt` until
/// it completes and growing the table whenever an attempt asks for it.
///
/// Returns whatever the finishing attempt reports as the previous offset;
/// errors from `resolve_existing` are passed through.
fn replace<F>(
    &self,
    hash: u64,
    key: &[u8],
    offset: u64,
    mut resolve_existing: F,
) -> Result<Option<u64>>
where
    F: FnMut(u64) -> Result<Option<Vec<u8>>>,
{
    loop {
        let outcome = self.replace_attempt(hash, key, offset, &mut resolve_existing)?;
        match outcome {
            ReplaceOutcome::Done(previous) => break Ok(previous),
            ReplaceOutcome::NeedGrow => self.grow(),
            ReplaceOutcome::Retry => {}
        }
    }
}
/// Performs one pass of an insert-or-update for `key`.
///
/// Walks the probe sequence of `hash` and
/// * claims the first tombstone seen (or the terminating empty slot) when no
///   slot carries `hash`,
/// * updates the offset in place when the matching slot belongs to `key`
///   (or `resolve_existing` cannot tell),
/// * moves the existing entry and the new one into the overflow map when the
///   matching slot belongs to a different key,
/// * updates or appends inside the overflow map when the hash is already
///   stored there.
///
/// Returns `NeedGrow` when the table should be enlarged first and `Retry`
/// when a concurrent writer changed a slot this pass relied on.
///
/// `resolve_existing` maps a stored offset to the key stored there; its
/// errors are returned unchanged.
fn replace_attempt<F>(
    &self,
    hash: u64,
    key: &[u8],
    offset: u64,
    resolve_existing: &mut F,
) -> Result<ReplaceOutcome>
where
    F: FnMut(u64) -> Result<Option<Vec<u8>>>,
{
    let inner = self.inner.read();
    let cap = inner.capacity;
    let mut first_reusable: Option<usize> = None;
    for step in 0..cap {
        let idx = inner.probe_index(hash, step);
        let snap = inner.table[idx].read();
        match snap.state {
            STATE_EMPTY => {
                let (target, expected_state) = match first_reusable {
                    Some(t) => (t, STATE_TOMBSTONE),
                    None => (idx, STATE_EMPTY),
                };
                // Taking a never-used slot raises the load, so the threshold
                // has to be enforced here; otherwise the table would only
                // grow once a probe found no empty slot at all. Reusing a
                // tombstone leaves the load unchanged and needs no check.
                if expected_state == STATE_EMPTY && inner.over_load_factor() {
                    return Ok(ReplaceOutcome::NeedGrow);
                }
                if !inner.table[target].try_claim(expected_state, STATE_OCCUPIED, hash, offset) {
                    return Ok(ReplaceOutcome::Retry);
                }
                let _ = inner.occupied.fetch_add(1, Ordering::AcqRel);
                if expected_state == STATE_TOMBSTONE {
                    let _ = inner.tombstones.fetch_sub(1, Ordering::AcqRel);
                }
                return Ok(ReplaceOutcome::Done(None));
            }
            STATE_TOMBSTONE => {
                if first_reusable.is_none() {
                    first_reusable = Some(idx);
                }
                continue;
            }
            STATE_OCCUPIED if snap.hash == hash => {
                let existing_key = match resolve_existing(snap.offset)? {
                    Some(existing_key) if existing_key.as_slice() != key => existing_key,
                    // Same key, or the resolver cannot tell: update in place.
                    _ => {
                        return Ok(match inner.table[idx].try_update(hash, offset) {
                            Some(prev) => ReplaceOutcome::Done(Some(prev)),
                            None => ReplaceOutcome::Retry,
                        });
                    }
                };
                // A different key owns this hash. Hold the overflow lock
                // while flipping the slot so that anyone who observes
                // `STATE_OVERFLOW` finds both entries once they get the lock.
                // The table lock is always taken before the overflow lock.
                let mut overflow = self.overflow.lock();
                // Re-validate the slot: the resolver may have taken a while.
                // This narrows (but cannot fully close) the window in which
                // another writer changes the stored offset.
                let current = inner.table[idx].read();
                let unchanged = current.state == STATE_OCCUPIED
                    && current.hash == hash
                    && current.offset == snap.offset;
                if !unchanged || !inner.table[idx].try_promote_to_overflow(hash) {
                    // Nothing has been written to the overflow map yet, so a
                    // clean retry is safe.
                    return Ok(ReplaceOutcome::Retry);
                }
                let entries = overflow.entry(hash).or_default();
                entries.push((existing_key, snap.offset));
                entries.push((key.to_vec(), offset));
                return Ok(ReplaceOutcome::Done(None));
            }
            STATE_OCCUPIED => continue,
            STATE_OVERFLOW if snap.hash == hash => {
                drop(inner);
                let mut overflow = self.overflow.lock();
                let entries = overflow.entry(hash).or_default();
                for entry in entries.iter_mut() {
                    if entry.0.as_slice() == key {
                        let prev = entry.1;
                        entry.1 = offset;
                        return Ok(ReplaceOutcome::Done(Some(prev)));
                    }
                }
                entries.push((key.to_vec(), offset));
                return Ok(ReplaceOutcome::Done(None));
            }
            STATE_OVERFLOW => continue,
            _ => continue,
        }
    }
    // Every slot was visited without reaching an empty one.
    match first_reusable {
        Some(target) if !inner.over_load_factor() => {
            if !inner.table[target].try_claim(STATE_TOMBSTONE, STATE_OCCUPIED, hash, offset) {
                return Ok(ReplaceOutcome::Retry);
            }
            let _ = inner.occupied.fetch_add(1, Ordering::AcqRel);
            let _ = inner.tombstones.fetch_sub(1, Ordering::AcqRel);
            Ok(ReplaceOutcome::Done(None))
        }
        _ => Ok(ReplaceOutcome::NeedGrow),
    }
}
fn remove(&self, hash: u64, key: &[u8]) -> Option<u64> {
loop {
let inner = self.inner.read();
let mut raced = false;
for step in 0..inner.capacity {
let idx = inner.probe_index(hash, step);
let snap = inner.table[idx].read();
match snap.state {
STATE_EMPTY => return None,
STATE_OCCUPIED if snap.hash == hash => {
match inner.table[idx].try_tombstone(hash) {
Some(prev) => {
let _ = inner.occupied.fetch_sub(1, Ordering::AcqRel);
let _ = inner.tombstones.fetch_add(1, Ordering::AcqRel);
return Some(prev);
}
None => {
raced = true;
break;
}
}
}
STATE_OVERFLOW if snap.hash == hash => {
drop(inner);
let mut overflow = self.overflow.lock();
let mut matched: Option<u64> = None;
if let Some(entries) = overflow.get_mut(&hash) {
let mut take = None;
for (i, (k, off)) in entries.iter().enumerate() {
if k.as_slice() == key {
take = Some((i, *off));
break;
}
}
if let Some((i, off)) = take {
let _ = entries.remove(i);
matched = Some(off);
}
}
return matched;
}
_ => continue,
}
}
if !raced {
return None;
}
}
}
/// Returns the number of entries stored in this shard.
///
/// Every hash kept in the overflow map also owns one table slot that is
/// included in `occupied`, so that slot is subtracted once per overflow hash
/// and the hash contributes exactly its number of overflow entries.
fn len(&self) -> usize {
    let primary = self.inner.read().occupied.load(Ordering::Acquire);
    let overflow = self.overflow.lock();
    let overflow_total: usize = overflow.values().map(Vec::len).sum();
    let overflow_hashes = overflow.len();
    // Subtract from the combined total: saturating on `overflow_total` alone
    // would keep counting the marker slot of a hash whose overflow entries
    // have all been removed.
    (primary + overflow_total).saturating_sub(overflow_hashes)
}
/// Resets every slot to empty, zeroes both counters and empties the
/// overflow map.
fn clear(&self) {
    {
        // Exclusive access to the table while it is wiped.
        let table = self.inner.write();
        table
            .table
            .iter()
            .for_each(|slot| slot.write_unconditional(STATE_EMPTY, 0, 0));
        table.occupied.store(0, Ordering::Release);
        table.tombstones.store(0, Ordering::Release);
    }
    self.overflow.lock().clear();
}
fn collect_offsets(&self, out: &mut Vec<u64>) {
let inner = self.inner.read();
for slot in inner.table.iter() {
let snap = slot.read();
if snap.state == STATE_OCCUPIED {
out.push(snap.offset);
}
}
drop(inner);
for entries in self.overflow.lock().values() {
for (_, off) in entries {
out.push(*off);
}
}
}
fn grow(&self) {
let mut inner = self.inner.write();
if !inner.over_load_factor() {
return;
}
let new_capacity = inner.capacity * 2;
let new = ShardInner::new(new_capacity);
for slot in inner.table.iter() {
let snap = slot.read();
match snap.state {
STATE_OCCUPIED | STATE_OVERFLOW => {
insert_into_fresh_table(&new, snap.state, snap.hash, snap.offset);
}
_ => {}
}
}
*inner = new;
}
}
/// Writes `(state, hash, offset)` into the first empty slot on the probe
/// sequence of `hash` and bumps the table's `occupied` counter.
///
/// If the whole table is scanned without finding an empty slot, nothing is
/// written and a debug assertion fires.
fn insert_into_fresh_table(inner: &ShardInner, state: u8, hash: u64, offset: u64) {
    let vacant = (0..inner.capacity)
        .map(|step| &inner.table[inner.probe_index(hash, step)])
        .find(|slot| slot.read().state == STATE_EMPTY);
    match vacant {
        Some(slot) => {
            slot.write_unconditional(state, hash, offset);
            let _ = inner.occupied.fetch_add(1, Ordering::AcqRel);
        }
        None => debug_assert!(false, "insert_into_fresh_table: probe overflowed"),
    }
}
/// Sharded index from key hashes to `u64` offsets.
pub(crate) struct Index {
// The `SHARDS` shards; a key's shard is `hash & SHARD_MASK`.
shards: Box<[Shard; SHARDS]>,
}
impl std::fmt::Debug for Index {
    /// Prints only the shard count; the shard contents are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Index");
        builder.field("shards", &SHARDS);
        builder.finish()
    }
}
impl Default for Index {
fn default() -> Self {
Self::new()
}
}
impl Index {
/// Creates an index with `SHARDS` empty shards.
#[must_use]
pub(crate) fn new() -> Self {
    let shards: Vec<Shard> = (0..SHARDS).map(|_| Shard::new()).collect();
    // The standard library converts a boxed slice of the right length into a
    // boxed array in place, so no raw-pointer cast is needed.
    let converted = <Box<[Shard; SHARDS]> as std::convert::TryFrom<Box<[Shard]>>>::try_from(
        shards.into_boxed_slice(),
    );
    let shards = match converted {
        Ok(array) => array,
        // The conversion only fails on a length mismatch.
        Err(_) => unreachable!("exactly SHARDS shards were constructed"),
    };
    Self { shards }
}
/// Hashes `key` to a 64-bit value.
///
/// The state is seeded from the key length, then the key is consumed in
/// 16-byte blocks, followed by at most one 8-byte word, at most one 4-byte
/// word and the remaining single bytes (all little-endian). A final
/// shift-xor / multiply sequence scrambles the result.
#[inline]
#[must_use]
pub(crate) fn hash_key(key: &[u8]) -> KeyHash {
    const PRIME_1: u64 = 0xa076_1d64_78bd_642f;
    const PRIME_2: u64 = 0xe703_7ed1_a0b4_28db;

    /// Reads the first eight bytes as a little-endian word.
    fn le_u64(bytes: &[u8]) -> u64 {
        let mut buf = [0_u8; 8];
        buf.copy_from_slice(&bytes[..8]);
        u64::from_le_bytes(buf)
    }

    /// Reads the first four bytes as a little-endian word, widened to 64 bits.
    fn le_u32(bytes: &[u8]) -> u64 {
        let mut buf = [0_u8; 4];
        buf.copy_from_slice(&bytes[..4]);
        u64::from(u32::from_le_bytes(buf))
    }

    /// Mixes one tail word into the running state.
    fn absorb(state: u64, word: u64) -> u64 {
        (state ^ word.wrapping_mul(PRIME_2))
            .rotate_left(31)
            .wrapping_mul(PRIME_1)
    }

    let mut state = (key.len() as u64).wrapping_mul(PRIME_1);

    let mut blocks = key.chunks_exact(16);
    for block in blocks.by_ref() {
        let lane_a = le_u64(&block[..8]).wrapping_mul(PRIME_1);
        let lane_b = le_u64(&block[8..]).wrapping_mul(PRIME_2);
        let mixed = state.rotate_left(31) ^ lane_a.wrapping_add(lane_b.rotate_left(27));
        state = mixed.wrapping_mul(PRIME_1);
    }

    let mut tail = blocks.remainder();
    if tail.len() >= 8 {
        state = absorb(state, le_u64(tail));
        tail = &tail[8..];
    }
    if tail.len() >= 4 {
        state = absorb(state, le_u32(tail));
        tail = &tail[4..];
    }
    state = tail
        .iter()
        .fold(state, |acc, &byte| absorb(acc, u64::from(byte)));

    // Final scramble.
    state ^= state >> 33;
    state = state.wrapping_mul(0xff51_afd7_ed55_8ccd);
    state ^= state >> 33;
    state = state.wrapping_mul(0xc4ce_b9fe_1a85_ec53);
    state ^ (state >> 33)
}
/// Looks up `hash` / `key` in the shard selected by `hash & SHARD_MASK`.
///
/// Always returns `Ok`.
pub(crate) fn get(&self, hash: KeyHash, key: &[u8]) -> Result<Option<u64>> {
    let shard_idx = (hash & SHARD_MASK) as usize;
    let found = self.shards[shard_idx].get(hash, key);
    Ok(found)
}
/// Inserts or updates the entry for `key` in the shard selected by
/// `hash & SHARD_MASK`.
///
/// `resolve_existing` maps a stored offset to the key stored there; the
/// result (including any error it produces) comes from `Shard::replace`.
pub(crate) fn replace<F>(
    &self,
    hash: KeyHash,
    key: &[u8],
    offset: u64,
    resolve_existing: F,
) -> Result<Option<u64>>
where
    F: FnMut(u64) -> Result<Option<Vec<u8>>>,
{
    let shard_idx = (hash & SHARD_MASK) as usize;
    self.shards[shard_idx].replace(hash, key, offset, resolve_existing)
}
/// Removes the entry for `hash` / `key` from the shard selected by
/// `hash & SHARD_MASK` and returns its offset, if any.
///
/// Always returns `Ok`.
pub(crate) fn remove(&self, hash: KeyHash, key: &[u8]) -> Result<Option<u64>> {
    let shard_idx = (hash & SHARD_MASK) as usize;
    let removed = self.shards[shard_idx].remove(hash, key);
    Ok(removed)
}
pub(crate) fn len(&self) -> Result<usize> {
let mut total = 0;
for shard in self.shards.iter() {
total += shard.len();
}
Ok(total)
}
pub(crate) fn clear(&self) -> Result<()> {
for shard in self.shards.iter() {
shard.clear();
}
Ok(())
}
/// Gathers the offsets reported by every shard, in shard order.
///
/// Always returns `Ok`.
pub(crate) fn collect_offsets(&self) -> Result<Vec<u64>> {
    let mut offsets = Vec::new();
    self.shards
        .iter()
        .for_each(|shard| shard.collect_offsets(&mut offsets));
    Ok(offsets)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolver for tests in which no stored key ever needs to be compared.
    fn no_resolver(_offset: u64) -> Result<Option<Vec<u8>>> {
        Ok(None)
    }

    #[test]
    fn insert_and_get_round_trip() {
        let idx = Index::new();
        let h = Index::hash_key(b"alpha");
        assert!(idx
            .replace(h, b"alpha", 100, no_resolver)
            .unwrap()
            .is_none());
        assert_eq!(idx.get(h, b"alpha").unwrap(), Some(100));
    }

    #[test]
    fn replace_returns_previous_offset() {
        let idx = Index::new();
        let h = Index::hash_key(b"alpha");
        let _ = idx.replace(h, b"alpha", 100, no_resolver).unwrap();
        let resolver = |_off: u64| Ok(Some(b"alpha".to_vec()));
        let prev = idx.replace(h, b"alpha", 200, resolver).unwrap();
        assert_eq!(prev, Some(100));
        assert_eq!(idx.get(h, b"alpha").unwrap(), Some(200));
    }

    #[test]
    fn remove_drops_entry() {
        let idx = Index::new();
        let h = Index::hash_key(b"alpha");
        let _ = idx.replace(h, b"alpha", 100, no_resolver).unwrap();
        let prev = idx.remove(h, b"alpha").unwrap();
        assert_eq!(prev, Some(100));
        assert_eq!(idx.get(h, b"alpha").unwrap(), None);
    }

    #[test]
    fn hash_collision_disambiguates_by_key() {
        let idx = Index::new();
        let _ = idx.replace(42, b"first", 100, no_resolver).unwrap();
        let resolver = |_off: u64| Ok(Some(b"first".to_vec()));
        let _ = idx.replace(42, b"second", 200, resolver).unwrap();
        assert_eq!(idx.get(42, b"first").unwrap(), Some(100));
        assert_eq!(idx.get(42, b"second").unwrap(), Some(200));
        assert_eq!(idx.get(42, b"third").unwrap(), None);
    }

    #[test]
    fn len_reflects_total_entries_across_shards() {
        let idx = Index::new();
        for i in 0_u32..200 {
            let key = format!("k{i:04}");
            let h = Index::hash_key(key.as_bytes());
            let _ = idx
                .replace(h, key.as_bytes(), i as u64, no_resolver)
                .unwrap();
        }
        assert_eq!(idx.len().unwrap(), 200);
    }

    #[test]
    fn clear_empties_every_shard() {
        let idx = Index::new();
        for i in 0_u32..50 {
            let key = format!("k{i}");
            let h = Index::hash_key(key.as_bytes());
            let _ = idx
                .replace(h, key.as_bytes(), i as u64, no_resolver)
                .unwrap();
        }
        idx.clear().unwrap();
        assert_eq!(idx.len().unwrap(), 0);
    }

    #[test]
    fn hash_key_is_deterministic() {
        let h1 = Index::hash_key(b"deterministic");
        let h2 = Index::hash_key(b"deterministic");
        assert_eq!(h1, h2);
        let h3 = Index::hash_key(b"different");
        assert_ne!(h1, h3);
    }

    #[test]
    fn growth_triggers_and_preserves_entries() {
        let idx = Index::new();
        // Every hash below has zero shard bits, so all entries land in shard 0.
        // Inserting more entries than the initial table has slots guarantees
        // that the shard has to grow at least once.
        let count = INITIAL_SHARD_CAPACITY + 32;
        for i in 0_u64..count as u64 {
            let hash = (i << 6) & !SHARD_MASK;
            let key = format!("k{i:06}");
            let _ = idx
                .replace(hash, key.as_bytes(), i, |_| {
                    Ok(Some(key.as_bytes().to_vec()))
                })
                .unwrap();
        }
        assert_eq!(idx.len().unwrap(), count);
        for i in 0_u64..count as u64 {
            let hash = (i << 6) & !SHARD_MASK;
            let key = format!("k{i:06}");
            assert_eq!(
                idx.get(hash, key.as_bytes()).unwrap(),
                Some(i),
                "lost entry {i} after growth"
            );
        }
    }

    #[test]
    fn tombstone_is_reused_on_subsequent_insert() {
        let idx = Index::new();
        let h = Index::hash_key(b"alpha");
        let _ = idx.replace(h, b"alpha", 100, no_resolver).unwrap();
        let _ = idx.remove(h, b"alpha").unwrap();
        assert!(idx
            .replace(h, b"alpha", 200, no_resolver)
            .unwrap()
            .is_none());
        assert_eq!(idx.get(h, b"alpha").unwrap(), Some(200));
    }
}