use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use crate::{Error, Result};
/// Number of independently locked shards the cache is split into.
///
/// Must remain a power of two: shard selection masks the hash with
/// `VALUE_SHARD_COUNT - 1`, which only reaches every shard for power-of-two
/// counts.
pub(crate) const VALUE_SHARD_COUNT: usize = 32;

// Compile-time guard for the requirement above. With any other count the mask
// would silently route every key to a subset of the shards.
const _: () = assert!(
    VALUE_SHARD_COUNT.is_power_of_two(),
    "VALUE_SHARD_COUNT must be a power of two"
);

/// Bit mask applied to a key hash to obtain its shard index.
const VALUE_SHARD_MASK: u64 = (VALUE_SHARD_COUNT as u64) - 1;

/// Total byte budget (64 MiB) used when the cache is built with its default
/// capacity.
pub(crate) const DEFAULT_TOTAL_BYTES: usize = 64 * 1024 * 1024;
/// Owned snapshot of a cache hit: the value bytes together with the expiry
/// stamp that was stored alongside them.
#[derive(Debug, Clone)]
pub(crate) struct CachedValue {
    /// Value bytes. Cloning only bumps the `Arc` reference count; the payload
    /// itself is not copied.
    pub(crate) value: Arc<[u8]>,
    /// Expiry stamp recorded at insertion time.
    // NOTE(review): nothing in this file interprets the stamp; its unit and
    // epoch are defined by the callers — confirm there.
    pub(crate) expires_at: u64,
}
/// One resident cache record, stored in a shard under its `(namespace, hash)`
/// slot.
#[derive(Debug)]
struct CacheEntry {
    /// Full key bytes, kept so a lookup can tell a real hit from a hash
    /// collision.
    key: Box<[u8]>,
    /// Cached payload, shared with readers through the `Arc`.
    value: Arc<[u8]>,
    /// Expiry stamp supplied by the inserter and returned verbatim on a hit.
    expires_at: u64,
    /// "Recently used" bit for eviction. It is atomic so it can be set through
    /// a shared reference while only a read lock is held.
    referenced: AtomicBool,
}
/// One independently locked slice of the cache, with its own byte budget.
#[derive(Debug)]
struct CacheShard {
    /// Resident entries keyed by `(namespace, key hash)`.
    entries: HashMap<(u32, u64), CacheEntry>,
    /// Eviction queue of slots, scanned from the front when room is needed.
    clock: VecDeque<(u32, u64)>,
    /// Sum of `key.len() + value.len()` over all resident entries.
    bytes_used: usize,
    /// Upper bound for `bytes_used`.
    bytes_capacity: usize,
}
impl CacheShard {
    /// Extra clock slots tolerated on top of twice the live entry count before
    /// the queue is rebuilt. Keeps small shards from compacting on every call.
    const CLOCK_SLACK: usize = 16;

    /// Creates an empty shard that holds at most `bytes_capacity` bytes of
    /// key + value payload.
    fn new(bytes_capacity: usize) -> Self {
        Self {
            entries: HashMap::new(),
            clock: VecDeque::new(),
            bytes_used: 0,
            bytes_capacity,
        }
    }

    /// Looks up the entry stored under `(ns, hash)`.
    ///
    /// Returns `None` when the slot is empty or when the stored key differs
    /// from `key` (a hash collision). A hit marks the entry as recently used
    /// and returns a cheap clone of the value together with its expiry stamp.
    /// Expiry is not checked here.
    fn get(&self, ns: u32, hash: u64, key: &[u8]) -> Option<CachedValue> {
        let entry = self.entries.get(&(ns, hash))?;
        if entry.key.as_ref() != key {
            return None;
        }
        // Relaxed is enough: the flag is only a hint and carries no data
        // dependency of its own.
        entry.referenced.store(true, Ordering::Relaxed);
        Some(CachedValue {
            value: Arc::clone(&entry.value),
            expires_at: entry.expires_at,
        })
    }

    /// Stores `value` under `(ns, hash)`, replacing whatever occupied the slot.
    ///
    /// * An entry larger than the whole shard is not cached. Any previous
    ///   occupant of the slot is dropped as well, so later lookups cannot
    ///   return a value that this call was meant to supersede.
    /// * A replacement that fits next to the other residents is updated in
    ///   place and keeps its position in the eviction queue.
    /// * Otherwise older entries are evicted until the new one fits, so
    ///   `bytes_used` never exceeds `bytes_capacity`.
    fn insert(&mut self, ns: u32, hash: u64, key: Box<[u8]>, value: Arc<[u8]>, expires_at: u64) {
        let slot = (ns, hash);
        let entry_bytes = key.len().saturating_add(value.len());
        if entry_bytes > self.bytes_capacity {
            self.invalidate(ns, hash);
            return;
        }

        if let Some(existing) = self.entries.get_mut(&slot) {
            let old_bytes = existing.key.len().saturating_add(existing.value.len());
            let resized = self
                .bytes_used
                .saturating_sub(old_bytes)
                .saturating_add(entry_bytes);
            if resized <= self.bytes_capacity {
                existing.key = key;
                existing.value = value;
                existing.expires_at = expires_at;
                existing.referenced.store(true, Ordering::Relaxed);
                self.bytes_used = resized;
                return;
            }
            // The grown entry does not fit next to the other residents. Retire
            // the old version first so the eviction sweep below frees room
            // without ever picking the incoming entry as its victim.
            self.invalidate(ns, hash);
        }

        while self.bytes_used.saturating_add(entry_bytes) > self.bytes_capacity {
            if !self.evict_one() {
                return;
            }
        }

        self.entries.insert(
            slot,
            CacheEntry {
                key,
                value,
                expires_at,
                referenced: AtomicBool::new(true),
            },
        );
        self.clock.push_back(slot);
        self.bytes_used = self.bytes_used.saturating_add(entry_bytes);
        self.compact_clock();
    }

    /// Removes the entry stored under `(ns, hash)`, if any.
    ///
    /// Returns `true` when an entry was removed. Its clock slot is left behind
    /// and reclaimed lazily, either by `evict_one` or by `compact_clock`.
    fn invalidate(&mut self, ns: u32, hash: u64) -> bool {
        let Some(entry) = self.entries.remove(&(ns, hash)) else {
            return false;
        };
        let bytes = entry.key.len().saturating_add(entry.value.len());
        self.bytes_used = self.bytes_used.saturating_sub(bytes);
        self.compact_clock();
        true
    }

    /// Drops every entry and resets the byte accounting.
    fn clear(&mut self) {
        self.entries.clear();
        self.clock.clear();
        self.bytes_used = 0;
    }

    /// Evicts one entry using the CLOCK (second-chance) policy.
    ///
    /// Slots are taken from the front of the queue. A slot whose entry no
    /// longer exists is discarded. A recently used entry has its flag cleared
    /// and is re-queued. The first entry found with a clear flag is removed.
    /// Returns `false` when nothing could be evicted.
    fn evict_one(&mut self) -> bool {
        // Two laps are enough: the first clears every flag, the second finds a
        // victim. The budget guarantees termination.
        let mut budget = self.clock.len().saturating_mul(2).max(1);
        while budget > 0 {
            budget -= 1;
            let Some(candidate) = self.clock.pop_front() else {
                return false;
            };
            let Some(entry) = self.entries.get(&candidate) else {
                // Slot left behind by `invalidate`, or a duplicate of an entry
                // that was already evicted.
                continue;
            };
            if entry.referenced.swap(false, Ordering::Relaxed) {
                self.clock.push_back(candidate);
                continue;
            }
            if let Some(victim) = self.entries.remove(&candidate) {
                let bytes = victim.key.len().saturating_add(victim.value.len());
                self.bytes_used = self.bytes_used.saturating_sub(bytes);
                return true;
            }
        }
        false
    }

    /// Rebuilds the eviction queue once dead or duplicate slots dominate it.
    ///
    /// `invalidate` leaves its slot in the queue, and re-inserting the same
    /// slot queues it again. Without this step, repeated invalidate/insert
    /// cycles that never trigger eviction would grow the queue without bound.
    /// The rebuild keeps the first occurrence of every live slot, so relative
    /// order is preserved. It only runs after the queue has more than doubled
    /// relative to the live entries, which amortises the cost over many calls.
    fn compact_clock(&mut self) {
        let live = self.entries.len();
        let tolerated = live.saturating_mul(2).saturating_add(Self::CLOCK_SLACK);
        if self.clock.len() <= tolerated {
            return;
        }
        let entries = &self.entries;
        let mut seen = std::collections::HashSet::with_capacity(live);
        self.clock
            .retain(|slot| entries.contains_key(slot) && seen.insert(*slot));
    }
}
/// Byte-bounded value cache split into `VALUE_SHARD_COUNT` shards, each behind
/// its own `RwLock`.
#[derive(Debug)]
pub(crate) struct ValueCache {
    /// The shard array, boxed so the cache handle itself stays pointer-sized.
    shards: Box<[RwLock<CacheShard>; VALUE_SHARD_COUNT]>,
}
impl ValueCache {
#[must_use]
pub(crate) fn new(total_bytes: usize) -> Self {
let bytes_per_shard = total_bytes.div_ceil(VALUE_SHARD_COUNT).max(1);
let shards = std::array::from_fn::<_, VALUE_SHARD_COUNT, _>(|_| {
RwLock::new(CacheShard::new(bytes_per_shard))
});
Self {
shards: Box::new(shards),
}
}
#[must_use]
pub(crate) fn with_default_capacity() -> Self {
Self::new(DEFAULT_TOTAL_BYTES)
}
#[inline]
#[must_use]
const fn shard_for(hash: u64) -> usize {
(hash & VALUE_SHARD_MASK) as usize
}
pub(crate) fn get(&self, ns: u32, hash: u64, key: &[u8]) -> Result<Option<CachedValue>> {
let shard = self.shards[Self::shard_for(hash)]
.read()
.map_err(|_poisoned| Error::LockPoisoned)?;
Ok(shard.get(ns, hash, key))
}
pub(crate) fn insert(
&self,
ns: u32,
hash: u64,
key: Box<[u8]>,
value: Arc<[u8]>,
expires_at: u64,
) -> Result<()> {
let mut shard = self.shards[Self::shard_for(hash)]
.write()
.map_err(|_poisoned| Error::LockPoisoned)?;
shard.insert(ns, hash, key, value, expires_at);
Ok(())
}
pub(crate) fn invalidate(&self, ns: u32, hash: u64) -> Result<bool> {
let mut shard = self.shards[Self::shard_for(hash)]
.write()
.map_err(|_poisoned| Error::LockPoisoned)?;
Ok(shard.invalidate(ns, hash))
}
pub(crate) fn clear(&self) -> Result<()> {
for idx in 0..VALUE_SHARD_COUNT {
let mut shard = self.shards[idx]
.write()
.map_err(|_poisoned| Error::LockPoisoned)?;
shard.clear();
}
Ok(())
}
pub(crate) fn bytes_used(&self) -> Result<usize> {
let mut total = 0_usize;
for idx in 0..VALUE_SHARD_COUNT {
let shard = self.shards[idx]
.read()
.map_err(|_poisoned| Error::LockPoisoned)?;
total = total.saturating_add(shard.bytes_used);
}
Ok(total)
}
}
#[cfg(test)]
mod tests {
    use super::{CachedValue, ValueCache, DEFAULT_TOTAL_BYTES, VALUE_SHARD_COUNT};
    use std::sync::Arc;

    /// Copies `payload` into a shared byte slice, as used for cached values.
    fn arc_bytes(payload: &[u8]) -> Arc<[u8]> {
        Arc::from(payload)
    }

    /// Copies `payload` into an owned byte slice, as used for cache keys.
    fn boxed_bytes(payload: &[u8]) -> Box<[u8]> {
        Box::from(payload)
    }

    /// Returns the cache-wide byte count, failing the test on a lock error.
    fn used_bytes(cache: &ValueCache) -> usize {
        match cache.bytes_used() {
            Ok(used) => used,
            Err(err) => panic!("bytes_used should succeed: {err}"),
        }
    }

    /// Performs a lookup, failing the test on a lock error.
    fn lookup(cache: &ValueCache, ns: u32, hash: u64, key: &[u8]) -> Option<CachedValue> {
        match cache.get(ns, hash, key) {
            Ok(found) => found,
            Err(err) => panic!("get should succeed: {err}"),
        }
    }

    #[test]
    fn fresh_cache_starts_empty() {
        let cache = ValueCache::new(1024);
        assert!(matches!(cache.bytes_used(), Ok(0)));
    }

    #[test]
    fn default_capacity_is_documented_constant() {
        let cache = ValueCache::with_default_capacity();
        assert!(matches!(cache.bytes_used(), Ok(0)));

        // Each shard receives an equal share of DEFAULT_TOTAL_BYTES. An entry
        // of exactly that size is cached; one byte more is refused.
        let per_shard = DEFAULT_TOTAL_BYTES / VALUE_SHARD_COUNT;
        let exact = arc_bytes(&vec![b'x'; per_shard - 1]);
        let _ = cache.insert(0, 0, boxed_bytes(b"k"), exact, 0);
        assert_eq!(used_bytes(&cache), per_shard);

        let too_big = arc_bytes(&vec![b'x'; per_shard]);
        let _ = cache.insert(0, 1, boxed_bytes(b"k"), too_big, 0);
        assert!(matches!(cache.get(0, 1, b"k"), Ok(None)));
        assert_eq!(used_bytes(&cache), per_shard);
    }

    #[test]
    fn insert_then_get_returns_value_and_expires_at() {
        let cache = ValueCache::new(1024);
        let inserted = cache.insert(0, 42, boxed_bytes(b"alpha"), arc_bytes(b"one"), 100);
        assert!(inserted.is_ok());
        let Some(CachedValue { value, expires_at }) = lookup(&cache, 0, 42, b"alpha") else {
            panic!("entry should be cached");
        };
        assert_eq!(value.as_ref(), b"one");
        assert_eq!(expires_at, 100);
    }

    #[test]
    fn miss_returns_none_for_unknown_hash() {
        let cache = ValueCache::new(1024);
        assert!(matches!(cache.get(0, 99, b"nope"), Ok(None)));
    }

    #[test]
    fn collision_with_different_key_returns_none() {
        let cache = ValueCache::new(1024);
        let _ = cache.insert(0, 42, boxed_bytes(b"alpha"), arc_bytes(b"one"), 0);
        assert!(matches!(cache.get(0, 42, b"beta"), Ok(None)));
    }

    #[test]
    fn different_namespaces_are_isolated() {
        let cache = ValueCache::new(1024);
        let _ = cache.insert(0, 42, boxed_bytes(b"alpha"), arc_bytes(b"ns0"), 0);
        let _ = cache.insert(1, 42, boxed_bytes(b"alpha"), arc_bytes(b"ns1"), 0);

        let Some(from_ns0) = lookup(&cache, 0, 42, b"alpha") else {
            panic!("ns0 entry should be cached");
        };
        assert_eq!(from_ns0.value.as_ref(), b"ns0");

        let Some(from_ns1) = lookup(&cache, 1, 42, b"alpha") else {
            panic!("ns1 entry should be cached");
        };
        assert_eq!(from_ns1.value.as_ref(), b"ns1");
    }

    #[test]
    fn replacing_entry_updates_byte_accounting() {
        let cache = ValueCache::new(VALUE_SHARD_COUNT * 256);
        let _ = cache.insert(0, 42, boxed_bytes(b"k"), arc_bytes(b"short"), 0);
        let used_after_first = used_bytes(&cache);
        let _ = cache.insert(0, 42, boxed_bytes(b"k"), arc_bytes(&[b'x'; 64]), 0);
        let used_after_second = used_bytes(&cache);
        assert!(used_after_second > used_after_first);
    }

    #[test]
    fn invalidate_drops_entry_and_recovers_bytes() {
        let cache = ValueCache::new(1024);
        let _ = cache.insert(0, 42, boxed_bytes(b"k"), arc_bytes(b"value"), 0);
        let used_before = used_bytes(&cache);
        assert!(used_before > 0);

        assert!(matches!(cache.invalidate(0, 42), Ok(true)));
        assert!(matches!(cache.get(0, 42, b"k"), Ok(None)));
        assert!(used_bytes(&cache) < used_before);
    }

    #[test]
    fn invalidate_unknown_entry_reports_false() {
        let cache = ValueCache::new(1024);
        assert!(matches!(cache.invalidate(0, 99), Ok(false)));
    }

    #[test]
    fn clear_drops_every_entry() {
        let cache = ValueCache::new(64 * 1024);
        for i in 0_u64..64 {
            let key = format!("k{i}").into_bytes().into_boxed_slice();
            let value = arc_bytes(format!("v{i}").as_bytes());
            let _ = cache.insert(0, i, key, value, 0);
        }
        assert!(cache.clear().is_ok());
        assert!(matches!(cache.bytes_used(), Ok(0)));
    }

    #[test]
    fn entries_larger_than_shard_capacity_are_dropped_silently() {
        let cache = ValueCache::new(VALUE_SHARD_COUNT * 16);
        let huge = arc_bytes(&vec![b'x'; 4096]);
        let inserted = cache.insert(0, 1, boxed_bytes(b"huge"), huge, 0);
        assert!(inserted.is_ok());
        assert!(matches!(cache.get(0, 1, b"huge"), Ok(None)));
    }

    #[test]
    fn evicts_under_pressure_to_stay_within_capacity() {
        let per_shard = 64;
        let cache = ValueCache::new(VALUE_SHARD_COUNT * per_shard);
        // Hashes that are multiples of the shard count all land in shard 0.
        let stride = VALUE_SHARD_COUNT as u64;
        for i in 0_u64..6 {
            let key = format!("k{i}").into_bytes().into_boxed_slice();
            let value = arc_bytes(&[b'a' + i as u8; 24]);
            let _ = cache.insert(0, i * stride, key, value, 0);
        }
        assert!(used_bytes(&cache) <= VALUE_SHARD_COUNT * per_shard);
        assert!(matches!(cache.get(0, 5 * stride, b"k5"), Ok(Some(_))));
    }

    #[test]
    fn clock_keeps_recently_touched_entry_after_a_full_sweep() {
        let per_shard = 96;
        let cache = ValueCache::new(VALUE_SHARD_COUNT * per_shard);
        // Hashes that are multiples of the shard count all land in shard 0.
        let stride = VALUE_SHARD_COUNT as u64;
        for i in 0_u64..3 {
            let key = format!("k{i}").into_bytes().into_boxed_slice();
            let value = arc_bytes(&[b'a' + i as u8; 24]);
            let _ = cache.insert(0, i * stride, key, value, 0);
        }
        let _ = cache.insert(0, 3 * stride, boxed_bytes(b"k3"), arc_bytes(&[b'd'; 24]), 0);
        assert!(used_bytes(&cache) <= VALUE_SHARD_COUNT * per_shard);

        let _ = cache.get(0, 3 * stride, b"k3");
        let _ = cache.insert(0, 4 * stride, boxed_bytes(b"k4"), arc_bytes(&[b'e'; 24]), 0);
        assert!(matches!(cache.get(0, 3 * stride, b"k3"), Ok(Some(_))));
    }

    #[test]
    fn small_total_budget_still_yields_one_byte_per_shard() {
        let cache = ValueCache::new(1);
        assert!(matches!(cache.bytes_used(), Ok(0)));

        // A one-byte entry (empty key, one-byte value) fits the one-byte shard.
        let _ = cache.insert(0, 0, boxed_bytes(b""), arc_bytes(b"x"), 0);
        assert!(matches!(cache.get(0, 0, b""), Ok(Some(_))));
        assert_eq!(used_bytes(&cache), 1);

        // A two-byte entry does not fit and is refused.
        let _ = cache.insert(0, 1, boxed_bytes(b"k"), arc_bytes(b"x"), 0);
        assert!(matches!(cache.get(0, 1, b"k"), Ok(None)));
        assert_eq!(used_bytes(&cache), 1);
    }
}