use std::collections::HashMap;
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::ttl::Record;
use crate::{Error, Result};
/// Number of independently locked shards. Must remain a power of two so that
/// `SHARD_MASK` below can select a shard with a single bitwise AND.
pub(crate) const SHARD_COUNT: usize = 32;
/// Bitmask equivalent of `hash % SHARD_COUNT` (valid only while `SHARD_COUNT`
/// is a power of two).
const SHARD_MASK: u64 = (SHARD_COUNT as u64) - 1;
/// 64-bit FNV-1a offset basis (14695981039346656037).
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
/// 64-bit FNV-1a prime (1099511628211).
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
/// One shard of the key -> record map; each shard is guarded by its own lock.
pub(crate) type Shard = HashMap<Vec<u8>, Record>;
/// Key/record map split across `SHARD_COUNT` independently locked shards, so
/// that operations on different shards can proceed concurrently.
#[derive(Debug)]
pub(crate) struct Index {
    // Boxed so the whole shard array lives on the heap rather than inline in
    // every `Index` value.
    shards: Box<[RwLock<Shard>; SHARD_COUNT]>,
}
impl Index {
#[must_use]
pub(crate) fn new() -> Self {
let shards = std::array::from_fn::<_, SHARD_COUNT, _>(|_| RwLock::new(Shard::new()));
Self {
shards: Box::new(shards),
}
}
pub(crate) fn from_records<I>(records: I) -> Self
where
I: IntoIterator<Item = (Vec<u8>, Record)>,
{
let index = Self::new();
for (key, record) in records {
let shard_idx = shard_for(&key);
if let Ok(mut shard) = index.shards[shard_idx].write() {
let _previous = shard.insert(key, record);
}
}
index
}
#[inline]
#[must_use]
pub(crate) fn shard_for_key(key: &[u8]) -> usize {
shard_for(key)
}
pub(crate) fn read(&self, shard_idx: usize) -> Result<RwLockReadGuard<'_, Shard>> {
self.shards[shard_idx]
.read()
.map_err(|_poisoned| Error::LockPoisoned)
}
pub(crate) fn write(&self, shard_idx: usize) -> Result<RwLockWriteGuard<'_, Shard>> {
self.shards[shard_idx]
.write()
.map_err(|_poisoned| Error::LockPoisoned)
}
pub(crate) fn read_all(&self) -> Result<Vec<RwLockReadGuard<'_, Shard>>> {
let mut guards = Vec::with_capacity(SHARD_COUNT);
for shard in self.shards.iter() {
guards.push(shard.read().map_err(|_poisoned| Error::LockPoisoned)?);
}
Ok(guards)
}
pub(crate) fn write_all(&self) -> Result<Vec<RwLockWriteGuard<'_, Shard>>> {
let mut guards = Vec::with_capacity(SHARD_COUNT);
for shard in self.shards.iter() {
guards.push(shard.write().map_err(|_poisoned| Error::LockPoisoned)?);
}
Ok(guards)
}
}
impl Default for Index {
fn default() -> Self {
Self::new()
}
}
/// Maps `bytes` to a shard index by hashing with 64-bit FNV-1a and masking
/// the result down to `0..SHARD_COUNT` (valid because `SHARD_COUNT` is a
/// power of two).
#[inline]
fn shard_for(bytes: &[u8]) -> usize {
    let hash = bytes.iter().fold(FNV_OFFSET, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    (hash & SHARD_MASK) as usize
}
#[cfg(test)]
mod tests {
    use super::{shard_for, Index, SHARD_COUNT};
    use crate::ttl::record_new;

    #[test]
    fn shard_for_distributes_uniformly_across_a_simple_key_space() {
        let mut counts = [0_usize; SHARD_COUNT];
        for i in 0..10_000_u32 {
            let key = format!("key-{i}").into_bytes();
            counts[shard_for(&key)] += 1;
        }
        // 10_000 keys over 32 shards: every bucket should receive some keys.
        for count in counts {
            assert!(count > 0, "shard distribution missed a bucket");
        }
    }

    #[test]
    fn shard_for_is_deterministic() {
        let key = b"deterministic-key";
        assert_eq!(shard_for(key), shard_for(key));
    }

    #[test]
    fn index_reads_back_what_it_writes() {
        let index = Index::new();
        let key = b"hello".to_vec();
        let shard_idx = Index::shard_for_key(&key);
        // The `match` below already panics on `Err` (and includes the error
        // message), so the former `assert!(guard.is_ok())` before each match
        // was redundant and has been dropped.
        let mut shard = match index.write(shard_idx) {
            Ok(guard) => guard,
            Err(err) => panic!("write guard should succeed: {err}"),
        };
        let _previous = shard.insert(key.clone(), record_new(b"world".to_vec(), None));
        drop(shard);
        let shard = match index.read(shard_idx) {
            Ok(guard) => guard,
            Err(err) => panic!("read guard should succeed: {err}"),
        };
        let record = match shard.get(key.as_slice()) {
            Some(record) => record,
            None => panic!("inserted record should be present"),
        };
        assert_eq!(crate::ttl::record_value(record), b"world");
    }

    #[test]
    fn read_all_and_write_all_return_one_guard_per_shard() {
        let index = Index::new();
        let reads = match index.read_all() {
            Ok(guards) => guards,
            Err(err) => panic!("read_all should succeed: {err}"),
        };
        assert_eq!(reads.len(), SHARD_COUNT);
        // Release every read lock before attempting to take the write locks,
        // otherwise `write_all` would deadlock against our own guards.
        drop(reads);
        let writes = match index.write_all() {
            Ok(guards) => guards,
            Err(err) => panic!("write_all should succeed: {err}"),
        };
        assert_eq!(writes.len(), SHARD_COUNT);
    }
}