use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::storage::fxhash;
use crate::storage::page::rid::Rid;
use crate::{Error, Result};
/// Hasher whose entire state is a single `u64`, starting at zero.
///
/// With the `Hasher` implementation in this file, a key fed through
/// `write_u64` becomes the hash value verbatim, so keys that are already
/// hashes are not hashed a second time.
#[derive(Default)]
pub(crate) struct IdentityHasher(u64);
impl Hasher for IdentityHasher {
#[inline]
fn finish(&self) -> u64 {
self.0
}
#[inline]
fn write_u64(&mut self, value: u64) {
self.0 = value;
}
#[inline]
fn write(&mut self, bytes: &[u8]) {
let mut state = self.0;
for &byte in bytes {
state = state.rotate_left(5) ^ u64::from(byte);
state = state.wrapping_mul(0x100_0000_01b3);
}
self.0 = state;
}
}
/// `BuildHasher` that hands out a default-constructed [`IdentityHasher`].
pub(crate) type IdentityBuildHasher = BuildHasherDefault<IdentityHasher>;
/// One shard: a map from a 64-bit hash to its [`Slot`], hashed with
/// [`IdentityBuildHasher`] so the key itself is used as the hash.
// NOTE(review): every key stored in one shard shares its low `SHARD_BITS`
// bits (see `Keymap::shard_for`), and the map sees the raw key as its hash.
// Worth confirming this does not cluster buckets in the std `HashMap`
// implementation — TODO verify with a benchmark.
type Shard = HashMap<u64, Slot, IdentityBuildHasher>;
/// Number of hash bits used to pick a shard.
pub(crate) const SHARD_BITS: u32 = 5;
/// Total number of shards: `1 << SHARD_BITS`, i.e. 32.
pub(crate) const SHARD_COUNT: usize = 1 << SHARD_BITS;
/// Mask selecting the low `SHARD_BITS` bits of a hash (`SHARD_COUNT - 1`).
const SHARD_MASK: u64 = (SHARD_COUNT as u64) - 1;
/// The `Rid`s stored under one hash value in a shard.
///
/// `Keymap::insert` and `Keymap::remove` switch between the two variants as
/// the number of `Rid`s for a hash grows past one or shrinks back to one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum Slot {
/// Exactly one `Rid`, stored inline.
Single(Rid),
/// A list of `Rid`s that share the same hash.
Multi(Vec<Rid>),
}
impl Slot {
    /// Returns an iterator over references to every `Rid` in this slot.
    ///
    /// A `Single` slot yields its one `Rid`; a `Multi` slot yields the
    /// contents of its vector in order.
    pub(crate) fn iter(&self) -> SlotIter<'_> {
        match *self {
            Self::Single(ref only) => SlotIter::Single(Some(only)),
            Self::Multi(ref many) => SlotIter::Multi(many.iter()),
        }
    }

    /// Returns `true` only for a `Multi` slot whose vector is empty.
    ///
    /// A `Single` slot always holds one `Rid` and is therefore never empty.
    #[must_use]
    pub(crate) fn is_empty(&self) -> bool {
        match self {
            Self::Single(_) => false,
            Self::Multi(many) => many.is_empty(),
        }
    }
}
/// Borrowing iterator over the `Rid`s of a [`Slot`], created by `Slot::iter`.
pub(crate) enum SlotIter<'a> {
/// Iterator for `Slot::Single`: holds the one `Rid` until it has been
/// yielded, then `None`.
Single(Option<&'a Rid>),
/// Iterator for `Slot::Multi`: the slice iterator over the stored vector.
Multi(core::slice::Iter<'a, Rid>),
}
impl<'a> Iterator for SlotIter<'a> {
    type Item = &'a Rid;

    /// Yields the next `Rid` reference, or `None` once the slot is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            // `take` leaves `None` behind, so the single item is yielded once.
            Self::Single(slot) => slot.take(),
            Self::Multi(iter) => iter.next(),
        }
    }

    /// Reports the exact number of remaining items.
    ///
    /// Without this override the default `(0, None)` hint is used, which
    /// prevents `collect()` and `extend()` from reserving capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            Self::Single(slot) => {
                let remaining = usize::from(slot.is_some());
                (remaining, Some(remaining))
            }
            Self::Multi(iter) => iter.size_hint(),
        }
    }
}

/// Both variants know their remaining length exactly, so `len()` is available.
impl ExactSizeIterator for SlotIter<'_> {}
/// Index from 64-bit hash to [`Slot`], split across `SHARD_COUNT` shards that
/// are each guarded by their own `RwLock`.
#[derive(Debug)]
pub(crate) struct Keymap {
/// One lock-protected map per shard, stored as a boxed fixed-size array.
shards: Box<[RwLock<Shard>; SHARD_COUNT]>,
}
impl Default for Keymap {
/// Returns an empty keymap by delegating to `Keymap::new`.
fn default() -> Self {
Self::new()
}
}
impl Keymap {
#[must_use]
pub(crate) fn new() -> Self {
let shards = std::array::from_fn::<_, SHARD_COUNT, _>(|_| RwLock::new(Shard::default()));
Self {
shards: Box::new(shards),
}
}
#[inline]
#[must_use]
pub(crate) fn hash_key(key: &[u8]) -> u64 {
fxhash::hash(key)
}
#[inline]
#[must_use]
pub(crate) const fn shard_for(hash: u64) -> usize {
(hash & SHARD_MASK) as usize
}
pub(crate) fn read_shard(&self, shard_idx: usize) -> Result<RwLockReadGuard<'_, Shard>> {
self.shards[shard_idx]
.read()
.map_err(|_poisoned| Error::LockPoisoned)
}
pub(crate) fn write_shard(&self, shard_idx: usize) -> Result<RwLockWriteGuard<'_, Shard>> {
self.shards[shard_idx]
.write()
.map_err(|_poisoned| Error::LockPoisoned)
}
pub(crate) fn lookup(&self, hash: u64) -> Result<Option<Slot>> {
let shard = self.read_shard(Self::shard_for(hash))?;
Ok(shard.get(&hash).cloned())
}
pub(crate) fn insert(&self, hash: u64, rid: Rid) -> Result<()> {
let mut shard = self.write_shard(Self::shard_for(hash))?;
match shard.get_mut(&hash) {
None => {
let _previous = shard.insert(hash, Slot::Single(rid));
}
Some(Slot::Single(existing)) => {
if *existing != rid {
let old = *existing;
let _replaced = shard.insert(hash, Slot::Multi(vec![old, rid]));
}
}
Some(Slot::Multi(rids)) => {
if !rids.contains(&rid) {
rids.push(rid);
}
}
}
Ok(())
}
pub(crate) fn replace_single(&self, hash: u64, rid: Rid) -> Result<()> {
let mut shard = self.write_shard(Self::shard_for(hash))?;
let _previous = shard.insert(hash, Slot::Single(rid));
Ok(())
}
pub(crate) fn remove(&self, hash: u64, rid: Rid) -> Result<bool> {
let mut shard = self.write_shard(Self::shard_for(hash))?;
let Some(slot) = shard.get_mut(&hash) else {
return Ok(false);
};
match slot {
Slot::Single(existing) => {
if *existing != rid {
return Ok(false);
}
let _removed = shard.remove(&hash);
Ok(true)
}
Slot::Multi(rids) => {
let Some(pos) = rids.iter().position(|r| *r == rid) else {
return Ok(false);
};
let _removed = rids.remove(pos);
if rids.len() == 1 {
let last = rids[0];
let _replaced = shard.insert(hash, Slot::Single(last));
} else if rids.is_empty() {
let _removed = shard.remove(&hash);
}
Ok(true)
}
}
}
pub(crate) fn clear(&self) -> Result<()> {
for shard_idx in 0..SHARD_COUNT {
let mut shard = self.write_shard(shard_idx)?;
shard.clear();
}
Ok(())
}
pub(crate) fn len(&self) -> Result<usize> {
let mut total = 0_usize;
for shard_idx in 0..SHARD_COUNT {
let shard = self.read_shard(shard_idx)?;
total = total.saturating_add(shard.len());
}
Ok(total)
}
pub(crate) fn write_all(&self) -> Result<Vec<RwLockWriteGuard<'_, Shard>>> {
let mut guards = Vec::with_capacity(SHARD_COUNT);
for shard_idx in 0..SHARD_COUNT {
guards.push(self.write_shard(shard_idx)?);
}
Ok(guards)
}
pub(crate) fn read_all(&self) -> Result<Vec<RwLockReadGuard<'_, Shard>>> {
let mut guards = Vec::with_capacity(SHARD_COUNT);
for shard_idx in 0..SHARD_COUNT {
guards.push(self.read_shard(shard_idx)?);
}
Ok(guards)
}
}
/// A [`Keymap`] together with the numeric id and name it is registered under.
#[derive(Debug)]
pub(crate) struct Namespace {
/// Numeric identifier; `Namespace::default_ns` uses `0`.
pub(crate) id: u32,
/// Namespace name; `Namespace::default_ns` uses the empty string.
pub(crate) name: Box<str>,
/// The key index belonging to this namespace.
pub(crate) keymap: Keymap,
}
impl Namespace {
    /// Builds a namespace with the given id and name and a fresh, empty keymap.
    #[must_use]
    pub(crate) fn new(id: u32, name: impl Into<Box<str>>) -> Self {
        let name: Box<str> = name.into();
        let keymap = Keymap::new();
        Self { id, name, keymap }
    }

    /// Builds the default namespace: id `0`, empty name, empty keymap.
    #[must_use]
    pub(crate) fn default_ns() -> Self {
        Self {
            id: 0,
            name: Box::from(""),
            keymap: Keymap::new(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{Keymap, Namespace, Slot, SHARD_COUNT};
    use crate::storage::page::rid::Rid;

    /// Hash of the key most tests share.
    fn alpha_hash() -> u64 {
        Keymap::hash_key(b"alpha")
    }

    /// A brand-new keymap holds no entries.
    #[test]
    fn fresh_keymap_reports_zero_length() {
        let keymap = Keymap::new();
        assert!(matches!(keymap.len(), Ok(0)));
    }

    /// Looking up a hash that was never inserted yields `None`.
    #[test]
    fn lookup_returns_none_for_unknown_hash() {
        let keymap = Keymap::new();
        let hash = Keymap::hash_key(b"missing");
        assert!(matches!(keymap.lookup(hash), Ok(None)));
    }

    /// The first insert under a hash creates a `Single` slot.
    #[test]
    fn insert_then_lookup_returns_single_slot() {
        let keymap = Keymap::new();
        let hash = alpha_hash();
        let expected = Rid::new(7, 3);
        assert!(keymap.insert(hash, expected).is_ok());
        assert!(matches!(
            keymap.lookup(hash),
            Ok(Some(Slot::Single(found))) if found == expected
        ));
    }

    /// Re-inserting the same `Rid` does not promote the slot.
    #[test]
    fn inserting_same_rid_twice_remains_single() {
        let keymap = Keymap::new();
        let hash = alpha_hash();
        let expected = Rid::new(1, 1);
        for _ in 0..2 {
            assert!(keymap.insert(hash, expected).is_ok());
        }
        assert!(matches!(
            keymap.lookup(hash),
            Ok(Some(Slot::Single(found))) if found == expected
        ));
    }

    /// A second, different `Rid` under the same hash promotes to `Multi`.
    #[test]
    fn collision_promotes_single_to_multi() {
        let keymap = Keymap::new();
        let hash = alpha_hash();
        let first = Rid::new(1, 0);
        let second = Rid::new(2, 0);
        assert!(keymap.insert(hash, first).is_ok());
        assert!(keymap.insert(hash, second).is_ok());
        assert!(matches!(
            keymap.lookup(hash),
            Ok(Some(Slot::Multi(ref rids))) if rids.as_slice() == [first, second]
        ));
    }

    /// Removing down to one `Rid` collapses to `Single`; removing the last
    /// one deletes the entry.
    #[test]
    fn remove_collapses_multi_back_to_single_with_one_left() {
        let keymap = Keymap::new();
        let hash = alpha_hash();
        let first = Rid::new(1, 0);
        let second = Rid::new(2, 0);
        let third = Rid::new(3, 0);
        for rid in [first, second, third] {
            assert!(keymap.insert(hash, rid).is_ok());
        }

        assert!(matches!(keymap.remove(hash, second), Ok(true)));
        assert!(matches!(
            keymap.lookup(hash),
            Ok(Some(Slot::Multi(ref rids))) if rids.as_slice() == [first, third]
        ));

        assert!(matches!(keymap.remove(hash, first), Ok(true)));
        assert!(matches!(
            keymap.lookup(hash),
            Ok(Some(Slot::Single(found))) if found == third
        ));

        assert!(matches!(keymap.remove(hash, third), Ok(true)));
        assert!(matches!(keymap.lookup(hash), Ok(None)));
    }

    /// Removing a `Rid` that is not stored reports `false` and changes nothing.
    #[test]
    fn remove_unknown_rid_reports_false() {
        let keymap = Keymap::new();
        let hash = alpha_hash();
        let stored = Rid::new(1, 0);
        assert!(keymap.insert(hash, stored).is_ok());
        assert!(matches!(keymap.remove(hash, Rid::new(99, 99)), Ok(false)));
        assert!(matches!(
            keymap.lookup(hash),
            Ok(Some(Slot::Single(found))) if found == stored
        ));
    }

    /// `replace_single` overwrites even a `Multi` slot.
    #[test]
    fn replace_single_overwrites_existing_slot() {
        let keymap = Keymap::new();
        let hash = alpha_hash();
        assert!(keymap.insert(hash, Rid::new(1, 0)).is_ok());
        assert!(keymap.insert(hash, Rid::new(2, 0)).is_ok());
        assert!(keymap.replace_single(hash, Rid::new(99, 99)).is_ok());
        assert!(matches!(
            keymap.lookup(hash),
            Ok(Some(Slot::Single(found))) if found == Rid::new(99, 99)
        ));
    }

    /// `Slot::iter` visits every stored `Rid` in order for both variants.
    #[test]
    fn slot_iter_yields_every_rid() {
        let single = Slot::Single(Rid::new(7, 1));
        let from_single: Vec<_> = single.iter().copied().collect();
        assert_eq!(from_single, vec![Rid::new(7, 1)]);

        let expected = vec![Rid::new(1, 0), Rid::new(2, 0), Rid::new(3, 0)];
        let multi = Slot::Multi(expected.clone());
        let from_multi: Vec<_> = multi.iter().copied().collect();
        assert_eq!(from_multi, expected);
    }

    /// `clear` empties every shard.
    #[test]
    fn clear_drops_every_entry() {
        let keymap = Keymap::new();
        for i in 0_u32..1000 {
            let key = format!("k{i}");
            let hash = Keymap::hash_key(key.as_bytes());
            let _ = keymap.insert(hash, Rid::new(u64::from(i), 0));
        }
        assert!(matches!(keymap.len(), Ok(1000)));
        assert!(keymap.clear().is_ok());
        assert!(matches!(keymap.len(), Ok(0)));
    }

    /// Both bulk-lock helpers return exactly one guard per shard.
    #[test]
    fn write_all_and_read_all_return_one_guard_per_shard() {
        let keymap = Keymap::new();
        {
            // Scoped so the write guards are released before the read locks are taken.
            let write_guards = keymap.write_all();
            assert!(matches!(write_guards, Ok(ref guards) if guards.len() == SHARD_COUNT));
        }
        let read_guards = keymap.read_all();
        assert!(matches!(read_guards, Ok(ref guards) if guards.len() == SHARD_COUNT));
    }

    /// Every shard receives at least one of 10 000 sequential keys.
    #[test]
    fn shard_for_distributes_uniformly_across_a_simple_key_space() {
        let mut per_shard = [0_usize; SHARD_COUNT];
        for i in 0..10_000_u32 {
            let key = format!("key-{i}");
            let shard = Keymap::shard_for(Keymap::hash_key(key.as_bytes()));
            per_shard[shard] += 1;
        }
        assert!(
            per_shard.iter().all(|&hits| hits > 0),
            "shard distribution missed a bucket"
        );
    }

    /// The default namespace has id 0, an empty name and an empty keymap.
    #[test]
    fn namespace_default_has_id_zero_and_empty_name() {
        let namespace = Namespace::default_ns();
        assert_eq!(namespace.id, 0);
        assert_eq!(&*namespace.name, "");
        assert!(matches!(namespace.keymap.len(), Ok(0)));
    }

    /// `Namespace::new` stores the id and name it was given.
    #[test]
    fn namespace_named_constructor_records_name() {
        let namespace = Namespace::new(7, "users");
        assert_eq!(namespace.id, 7);
        assert_eq!(&*namespace.name, "users");
    }
}