use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "async_core")]
use crate::ConcurrentCachedAsync;
use crate::{
CacheMetrics, CachedIter, CachedPeek, ConcurrentCacheBase, ConcurrentCached,
ConcurrentCloneCached, Expires,
};
use super::{
CachePadded, DefaultShardHasher, Shard, ShardHasher, checked_shard_count, shard_index,
};
use crate::Cached;
use crate::ConcurrentCacheEvict;
use crate::stores::{BuildError, LruCache};
/// Shared, thread-safe callback that receives a reference to the key and the value of an
/// entry that has been removed from the cache.
type OnEvict<K, V> = Arc<dyn Fn(&K, &V) + Send + Sync>;
/// State shared by every handle of one sharded expiring LRU cache.
// The nested `Box<[CachePadded<Shard<LruCache<..>>>]>` type is what trips `type_complexity`.
#[allow(clippy::type_complexity)]
struct ExpiringLruInner<K, V, H> {
// One LRU store per shard; each `Shard` carries its own lock plus hit/miss counters.
shards: Box<[CachePadded<Shard<LruCache<K, V>>>]>,
// Mask handed to `shard_index`; the builder sets it to `shard count - 1`.
shard_mask: usize,
// Produces the hash used to pick a shard for a key.
hasher: H,
// Optional callback invoked by this wrapper when it removes an entry.
on_evict: Option<OnEvict<K, V>>,
// Removals counted by the wrapper itself (expired reads, `evict` sweeps); reported
// together with the per-shard store counters.
evictions: AtomicU64,
// Shard count multiplied by the per-shard capacity, as computed by the builder.
total_capacity: usize,
}
/// [`ShardedExpiringLruCacheBase`] specialised to the [`DefaultShardHasher`].
pub type ShardedExpiringLruCache<K, V> = ShardedExpiringLruCacheBase<K, V, DefaultShardHasher>;
/// Handle to a sharded LRU cache whose values report their own expiry through [`Expires`].
///
/// The handle only holds an [`Arc`] to the shared state, so `clone()` yields another handle
/// to the *same* cache; use `deep_clone` for an independent copy.
pub struct ShardedExpiringLruCacheBase<K, V, H = DefaultShardHasher> {
// Shared state: shards, hasher, eviction callback and counters.
inner: Arc<ExpiringLruInner<K, V, H>>,
}
/// Cloning is shallow: the new handle points at the same shared state as `self`.
impl<K, V, H> Clone for ShardedExpiringLruCacheBase<K, V, H> {
    fn clone(&self) -> Self {
        // Only the reference count is bumped; no entries are copied.
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
/// Prints a summary (shard count, capacity, wrapper eviction counter) without touching
/// any shard lock or requiring `Debug` on keys and values.
impl<K, V, H> std::fmt::Debug for ShardedExpiringLruCacheBase<K, V, H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let state = &*self.inner;
        let shard_count = state.shards.len();
        // Only the wrapper-level counter is shown here; per-shard store counters are not read.
        let evictions = state.evictions.load(Ordering::Relaxed);

        let mut out = f.debug_struct("ShardedExpiringLruCache");
        out.field("shards", &shard_count);
        out.field("capacity", &state.total_capacity);
        out.field("evictions", &evictions);
        out.finish_non_exhaustive()
    }
}
impl<K, V> ShardedExpiringLruCacheBase<K, V, DefaultShardHasher>
where
    K: Hash + Eq + Clone,
    V: Expires,
{
    /// Creates a cache with the given total `max_size` and otherwise default settings.
    ///
    /// # Panics
    ///
    /// Panics when the builder rejects the configuration, for example when `max_size`
    /// is zero.
    #[must_use]
    pub fn new(max_size: usize) -> ShardedExpiringLruCache<K, V> {
        let built = Self::builder().max_size(max_size).build();
        built.expect(
            "ShardedExpiringLruCache::new requires a non-zero max_size with a valid allocation",
        )
    }

    /// Returns a builder pre-configured with the default shard hasher.
    #[must_use]
    pub fn builder() -> ShardedExpiringLruCacheBuilder<K, V, DefaultShardHasher> {
        Default::default()
    }
}
impl<K, V, H> ShardedExpiringLruCacheBase<K, V, H>
where
    K: Hash + Eq + Clone,
    V: Expires,
    H: ShardHasher<K>,
{
    /// Returns the shard responsible for `k`, selected from the key's shard hash and the
    /// shard mask.
    #[inline]
    fn shard_of(&self, k: &K) -> &CachePadded<Shard<LruCache<K, V>>> {
        let state = &*self.inner;
        let slot = shard_index(state.hasher.shard_hash(k), state.shard_mask);
        &state.shards[slot]
    }
}
impl<K: Clone + Hash + Eq, V: Clone + Expires, H: ShardHasher<K>>
    ShardedExpiringLruCacheBase<K, V, H>
{
    /// Builds an independent cache holding a copy of every shard's store and counters.
    ///
    /// Unlike `clone()`, the result does not share state with `self`. Shards are copied
    /// one at a time, each under its own read lock, so the copy is not a single atomic
    /// snapshot of the whole cache.
    #[must_use]
    pub fn deep_clone(&self) -> Self {
        let source = &*self.inner;
        let shards: Box<[_]> = source
            .shards
            .iter()
            .map(|shard| {
                // Hold the read lock only for as long as the store is being cloned.
                let store_copy = {
                    let guard = shard.lock.read();
                    guard.clone()
                };
                let hits = shard.hits.load(Ordering::Relaxed);
                let misses = shard.misses.load(Ordering::Relaxed);
                CachePadded(Shard {
                    lock: parking_lot::RwLock::new(store_copy),
                    hits: AtomicU64::new(hits),
                    misses: AtomicU64::new(misses),
                })
            })
            .collect();

        let copied = ExpiringLruInner {
            shards,
            shard_mask: source.shard_mask,
            hasher: source.hasher.clone(),
            on_evict: source.on_evict.clone(),
            evictions: AtomicU64::new(source.evictions.load(Ordering::Relaxed)),
            total_capacity: source.total_capacity,
        };
        Self {
            inner: Arc::new(copied),
        }
    }
}
impl<K, V, H: ShardHasher<K>> ShardedExpiringLruCacheBase<K, V, H>
where
K: Hash + Eq + Clone,
V: Clone + Expires,
{
#[must_use]
pub fn get(&self, k: &K) -> Option<V> {
ConcurrentCached::cache_get(self, k).unwrap()
}
pub fn set(&self, k: K, v: V) -> Option<V> {
ConcurrentCached::cache_set(self, k, v).unwrap()
}
pub fn remove(&self, k: &K) -> Option<V> {
ConcurrentCached::cache_remove(self, k).unwrap()
}
pub fn remove_entry(&self, k: &K) -> Option<(K, V)> {
ConcurrentCached::cache_remove_entry(self, k).unwrap()
}
pub fn delete(&self, k: &K) -> bool {
ConcurrentCached::cache_delete(self, k).unwrap()
}
pub fn reset(&self) {
ConcurrentCached::cache_reset(self).unwrap()
}
}
impl<K, V, H: ShardHasher<K>> ShardedExpiringLruCacheBase<K, V, H>
where
    K: Hash + Eq + Clone,
    V: Expires,
{
    /// Collects hit, miss, eviction and size figures across all shards.
    ///
    /// The eviction figure is the sum of every shard store's own counter and the
    /// wrapper-level counter. Shards are read one after another, so the result is not an
    /// atomic snapshot.
    #[must_use]
    pub fn metrics(&self) -> CacheMetrics {
        let state = &*self.inner;
        let (mut hit_total, mut miss_total, mut store_evictions) = (0u64, 0u64, 0u64);
        let mut entries = 0usize;

        for shard in state.shards.iter() {
            hit_total += shard.hits.load(Ordering::Relaxed);
            miss_total += shard.misses.load(Ordering::Relaxed);
            let store = shard.lock.read();
            store_evictions += store.cache_evictions().unwrap_or(0);
            entries += store.cache_size();
        }

        let evictions = store_evictions + state.evictions.load(Ordering::Relaxed);
        CacheMetrics {
            hits: Some(hit_total),
            misses: Some(miss_total),
            evictions: Some(evictions),
            entry_count: entries,
            capacity: Some(state.total_capacity),
        }
    }

    /// Number of shards.
    #[must_use]
    pub fn shards(&self) -> usize {
        let shard_slice = &self.inner.shards;
        shard_slice.len()
    }

    /// Current entry count of each shard, in shard order.
    #[must_use]
    pub fn shard_sizes(&self) -> Vec<usize> {
        let mut sizes = Vec::with_capacity(self.inner.shards.len());
        for shard in self.inner.shards.iter() {
            sizes.push(shard.lock.read().cache_size());
        }
        sizes
    }

    /// Total number of stored entries (expired entries that are still stored are included).
    #[must_use]
    pub fn len(&self) -> usize {
        let mut entries = 0usize;
        for shard in self.inner.shards.iter() {
            entries += shard.lock.read().cache_size();
        }
        entries
    }

    /// `true` when no shard holds any entry.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // Stops at the first non-empty shard.
        !self
            .inner
            .shards
            .iter()
            .any(|shard| shard.lock.read().cache_size() != 0)
    }

    /// Clears every shard through the store's `cache_clear`.
    pub fn clear(&self) {
        self.inner
            .shards
            .iter()
            .for_each(|shard| shard.lock.write().cache_clear());
    }

    /// Removes every entry, counts each removal as an eviction and passes each removed
    /// pair to the `on_evict` callback.
    ///
    /// Without a configured callback this is exactly [`clear`](Self::clear). Callbacks run
    /// after the shard's write lock has been released.
    pub fn cache_clear_with_on_evict(&self) {
        let Some(on_evict) = self.inner.on_evict.as_ref() else {
            return self.clear();
        };

        for shard in self.inner.shards.iter() {
            let drained: Vec<(K, V)> = {
                let mut store = shard.lock.write();
                // Snapshot the keys first so the store can be mutated while draining.
                let keys: Vec<K> = store.iter().map(|(key, _)| key.clone()).collect();
                let mut drained = Vec::with_capacity(keys.len());
                drained.extend(keys.into_iter().filter_map(|key| store.pop_raw(&key)));
                if !drained.is_empty() {
                    store
                        .evictions
                        .fetch_add(drained.len() as u64, Ordering::Relaxed);
                }
                drained
            };

            for (key, value) in &drained {
                on_evict(key, value);
            }
        }
    }

    /// Effective total capacity (shard count times per-shard capacity).
    #[must_use]
    pub fn capacity(&self) -> usize {
        let state = &*self.inner;
        state.total_capacity
    }
}
/// Size and metric accessors; none of them can fail (`Error` is `Infallible`).
impl<K, V, H> ConcurrentCacheBase for ShardedExpiringLruCacheBase<K, V, H>
where
    K: Hash + Eq + Clone,
    V: Clone + Expires,
    H: ShardHasher<K>,
{
    type Error = std::convert::Infallible;

    /// Total number of stored entries across all shards.
    fn cache_size(&self) -> Result<Option<usize>, Self::Error> {
        let entries = self.len();
        Ok(Some(entries))
    }

    /// Sum of the per-shard hit counters.
    fn cache_hits(&self) -> Option<u64> {
        let total = self
            .inner
            .shards
            .iter()
            .fold(0u64, |acc, shard| acc + shard.hits.load(Ordering::Relaxed));
        Some(total)
    }

    /// Sum of the per-shard miss counters.
    fn cache_misses(&self) -> Option<u64> {
        let total = self
            .inner
            .shards
            .iter()
            .fold(0u64, |acc, shard| acc + shard.misses.load(Ordering::Relaxed));
        Some(total)
    }

    /// Effective total capacity.
    fn cache_capacity(&self) -> Option<usize> {
        let state = &*self.inner;
        Some(state.total_capacity)
    }

    /// Evictions counted by the shard stores plus those counted by the wrapper itself.
    fn cache_evictions(&self) -> Option<u64> {
        let state = &*self.inner;
        let store_evictions: u64 = state
            .shards
            .iter()
            .map(|shard| {
                let store = shard.lock.read();
                Cached::cache_evictions(&*store).unwrap_or(0)
            })
            .sum();
        Some(store_evictions + state.evictions.load(Ordering::Relaxed))
    }
}
impl<K, V, H> ConcurrentCached<K, V> for ShardedExpiringLruCacheBase<K, V, H>
where
    K: Hash + Eq + Clone,
    V: Clone + Expires,
    H: ShardHasher<K>,
{
    /// Returns a clone of the live value stored under `k`.
    ///
    /// An absent key counts as a miss. An expired entry is removed, counted as an eviction
    /// and a miss, and handed to `on_evict` (after the shard lock is released); the call
    /// then returns `None`. A live entry counts as a hit.
    fn cache_get(&self, k: &K) -> Result<Option<V>, Self::Error> {
        let shard = self.shard_of(k);
        let mut store = shard.lock.write();
        // Peek first so an expired entry is never promoted in the LRU order.
        let is_stale = store.cache_peek(k).map(|value| value.is_expired());

        match is_stale {
            None => {
                shard.misses.fetch_add(1, Ordering::Relaxed);
                Ok(None)
            }
            Some(false) => {
                let hit = store.cache_get(k).cloned();
                shard.hits.fetch_add(1, Ordering::Relaxed);
                Ok(hit)
            }
            Some(true) => {
                let popped = store.pop_raw(k);
                drop(store);
                if let Some((key, value)) = &popped {
                    self.inner.evictions.fetch_add(1, Ordering::Relaxed);
                    if let Some(on_evict) = self.inner.on_evict.as_ref() {
                        on_evict(key, value);
                    }
                }
                shard.misses.fetch_add(1, Ordering::Relaxed);
                Ok(None)
            }
        }
    }

    /// Stores `v` under `k` in the key's shard and returns whatever the shard store's
    /// `cache_set` returns.
    fn cache_set(&self, k: K, v: V) -> Result<Option<V>, Self::Error> {
        let mut store = self.shard_of(&k).lock.write();
        let previous = store.cache_set(k, v);
        Ok(previous)
    }

    /// Removes `k`, counting the removal as an eviction and invoking `on_evict`.
    ///
    /// The removed value is returned only when it has not expired; an expired value yields
    /// `None` even though it was removed.
    fn cache_remove(&self, k: &K) -> Result<Option<V>, Self::Error> {
        let popped = {
            let mut store = self.shard_of(k).lock.write();
            let popped = store.pop_raw(k);
            if popped.is_some() {
                store.evictions.fetch_add(1, Ordering::Relaxed);
            }
            popped
        };

        match popped {
            None => Ok(None),
            Some((key, value)) => {
                // The callback runs outside the shard lock.
                if let Some(on_evict) = self.inner.on_evict.as_ref() {
                    on_evict(&key, &value);
                }
                Ok(if value.is_expired() { None } else { Some(value) })
            }
        }
    }

    /// Removes `k` and returns the stored pair, whether or not the value has expired.
    ///
    /// The removal is counted as an eviction and reported to `on_evict`.
    fn cache_remove_entry(&self, k: &K) -> Result<Option<(K, V)>, Self::Error> {
        let popped = {
            let mut store = self.shard_of(k).lock.write();
            let popped = store.pop_raw(k);
            if popped.is_some() {
                store.evictions.fetch_add(1, Ordering::Relaxed);
            }
            popped
        };

        match popped {
            None => Ok(None),
            Some((key, value)) => {
                if let Some(on_evict) = self.inner.on_evict.as_ref() {
                    on_evict(&key, &value);
                }
                Ok(Some((key, value)))
            }
        }
    }

    /// Clears every shard; see the inherent `clear`.
    fn cache_clear(&self) -> Result<(), Self::Error> {
        self.clear();
        Ok(())
    }

    /// Clears every shard and then resets all metrics.
    fn cache_reset(&self) -> Result<(), Self::Error> {
        self.clear();
        <Self as ConcurrentCached<K, V>>::cache_reset_metrics(self)
    }

    /// Zeroes the per-shard hit/miss counters, each shard store's own metrics and the
    /// wrapper-level eviction counter.
    fn cache_reset_metrics(&self) -> Result<(), Self::Error> {
        let state = &*self.inner;
        state.shards.iter().for_each(|shard| {
            shard.hits.store(0, Ordering::Relaxed);
            shard.misses.store(0, Ordering::Relaxed);
            shard.lock.write().cache_reset_metrics();
        });
        state.evictions.store(0, Ordering::Relaxed);
        Ok(())
    }
}
/// Async facade: every method runs the corresponding synchronous [`ConcurrentCached`]
/// method directly and contains no await points of its own.
#[cfg(feature = "async_core")]
impl<K, V, H> ConcurrentCachedAsync<K, V> for ShardedExpiringLruCacheBase<K, V, H>
where
    K: Hash + Eq + Clone + Send + Sync,
    V: Clone + Expires + Send + Sync,
    H: ShardHasher<K>,
{
    async fn async_cache_get(&self, k: &K) -> Result<Option<V>, Self::Error> {
        <Self as ConcurrentCached<K, V>>::cache_get(self, k)
    }

    async fn async_cache_set(&self, k: K, v: V) -> Result<Option<V>, Self::Error> {
        <Self as ConcurrentCached<K, V>>::cache_set(self, k, v)
    }

    async fn async_cache_remove(&self, k: &K) -> Result<Option<V>, Self::Error> {
        <Self as ConcurrentCached<K, V>>::cache_remove(self, k)
    }

    async fn async_cache_remove_entry(&self, k: &K) -> Result<Option<(K, V)>, Self::Error> {
        <Self as ConcurrentCached<K, V>>::cache_remove_entry(self, k)
    }

    async fn async_cache_clear(&self) -> Result<(), Self::Error> {
        <Self as ConcurrentCached<K, V>>::cache_clear(self)
    }

    async fn async_cache_reset(&self) -> Result<(), Self::Error> {
        <Self as ConcurrentCached<K, V>>::cache_reset(self)
    }

    async fn async_cache_reset_metrics(&self) -> Result<(), Self::Error> {
        <Self as ConcurrentCached<K, V>>::cache_reset_metrics(self)
    }
}
impl<K, V, H> ShardedExpiringLruCacheBase<K, V, H>
where
    K: Clone + Hash + Eq,
    V: Expires,
    H: ShardHasher<K>,
{
    /// Sweeps every shard, removing all entries whose value reports itself expired.
    ///
    /// Each removal is added to the wrapper-level eviction counter and reported to
    /// `on_evict` once the shard's write lock has been released. Returns the number of
    /// entries removed.
    #[must_use]
    pub fn evict(&self) -> usize {
        let state = &*self.inner;
        let mut swept = 0;

        for shard in state.shards.iter() {
            let expired: Vec<(K, V)> = {
                let mut store = shard.lock.write();
                // Collect the stale keys first; the store cannot be mutated while iterated.
                let stale_keys: Vec<K> = store
                    .iter()
                    .filter_map(|(key, value)| value.is_expired().then(|| key.clone()))
                    .collect();
                let mut expired = Vec::new();
                expired.extend(stale_keys.into_iter().filter_map(|key| store.pop_raw(&key)));
                expired
            };

            swept += expired.len();
            if expired.is_empty() {
                continue;
            }

            state
                .evictions
                .fetch_add(expired.len() as u64, Ordering::Relaxed);
            if let Some(on_evict) = state.on_evict.as_ref() {
                for (key, value) in &expired {
                    on_evict(key, value);
                }
            }
        }

        swept
    }
}
/// Exposes the inherent expired-entry sweep through the [`ConcurrentCacheEvict`] trait.
impl<K, V, H> ConcurrentCacheEvict for ShardedExpiringLruCacheBase<K, V, H>
where
    K: Clone + Hash + Eq,
    V: Expires,
    H: ShardHasher<K>,
{
    fn evict(&self) -> usize {
        // Type-qualified path: resolves to the inherent `evict`, not back to this method.
        ShardedExpiringLruCacheBase::<K, V, H>::evict(self)
    }
}
/// Builder for [`ShardedExpiringLruCacheBase`].
///
/// Exactly one of `max_size` and `per_shard_max_size` must be set before `build`.
#[doc(alias = "ttl")]
pub struct ShardedExpiringLruCacheBuilder<K, V: Expires, H = DefaultShardHasher> {
// Requested shard count; validated by `checked_shard_count` at build time.
shards: Option<usize>,
// Total capacity to divide across the shards.
max_size: Option<usize>,
// Capacity of each individual shard; mutually exclusive with `max_size`.
per_shard_max_size: Option<usize>,
// Shard hasher; `build` expects this to be populated.
hasher: Option<H>,
// Optional callback for removed entries.
on_evict: Option<OnEvict<K, V>>,
_k: std::marker::PhantomData<K>,
_v: std::marker::PhantomData<V>,
}
/// The default builder has no sizes, shard count or callback configured and uses a
/// default-constructed [`DefaultShardHasher`].
impl<K, V: Expires> Default for ShardedExpiringLruCacheBuilder<K, V, DefaultShardHasher> {
    fn default() -> Self {
        use std::marker::PhantomData;

        let default_hasher = DefaultShardHasher::default();
        Self {
            shards: None,
            max_size: None,
            per_shard_max_size: None,
            hasher: Some(default_hasher),
            on_evict: None,
            _k: PhantomData,
            _v: PhantomData,
        }
    }
}
impl<K, V: Expires, H> ShardedExpiringLruCacheBuilder<K, V, H> {
#[doc(alias = "size")]
#[doc(alias = "capacity")]
#[must_use]
pub fn max_size(mut self, max_size: usize) -> Self {
self.max_size = Some(max_size);
self
}
#[must_use]
pub fn per_shard_max_size(mut self, per_shard_max_size: usize) -> Self {
self.per_shard_max_size = Some(per_shard_max_size);
self
}
#[must_use]
pub fn shards(mut self, shards: usize) -> Self {
self.shards = Some(shards);
self
}
#[doc(alias = "with_hasher")]
#[must_use]
pub fn hasher<H2: ShardHasher<K>>(
self,
hasher: H2,
) -> ShardedExpiringLruCacheBuilder<K, V, H2> {
ShardedExpiringLruCacheBuilder {
shards: self.shards,
max_size: self.max_size,
per_shard_max_size: self.per_shard_max_size,
hasher: Some(hasher),
on_evict: self.on_evict,
_k: std::marker::PhantomData,
_v: std::marker::PhantomData,
}
}
#[must_use]
pub fn on_evict(mut self, on_evict: impl Fn(&K, &V) + Send + Sync + 'static) -> Self {
self.on_evict = Some(Arc::new(on_evict));
self
}
fn resolve_per_shard_cap(&self, n_shards: usize) -> Result<usize, BuildError> {
match (self.max_size, self.per_shard_max_size) {
(Some(_), Some(_)) => Err(BuildError::InvalidValue {
field: "max_size / per_shard_max_size",
reason: "`max_size` and `per_shard_max_size` are mutually exclusive",
}),
(None, None) => Err(BuildError::MissingRequired("max_size")),
(Some(total), None) => {
if total == 0 {
return Err(BuildError::InvalidValue {
field: "max_size",
reason: "must be greater than zero",
});
}
let mut cap = total.div_ceil(n_shards);
if n_shards > 1 {
cap = std::cmp::max(cap, 16);
}
Ok(cap)
}
(None, Some(per)) => {
if per == 0 {
return Err(BuildError::InvalidValue {
field: "per_shard_max_size",
reason: "must be greater than zero",
});
}
Ok(per)
}
}
}
fn total_capacity(&self, n_shards: usize, per_shard_cap: usize) -> Result<usize, BuildError> {
let field = if self.per_shard_max_size.is_some() {
"per_shard_max_size"
} else {
"max_size"
};
n_shards
.checked_mul(per_shard_cap)
.ok_or(BuildError::InvalidValue {
field,
reason: "effective sharded capacity overflows usize",
})
}
#[must_use]
pub fn copy_from<H2: ShardHasher<K>>(
self,
existing: &ShardedExpiringLruCacheBase<K, V, H2>,
) -> ShardedExpiringLruCacheBase<K, V, H>
where
K: Clone + Hash + Eq,
V: Clone,
H: ShardHasher<K>,
{
let new_cache = self
.build()
.unwrap_or_else(|e| panic!("ShardedExpiringLruCache build failed: {e}"));
for shard in existing.inner.shards.iter() {
let entries: Vec<(K, V)> = {
let guard = shard.lock.read();
guard.iter_order()
};
for (k, v) in entries.into_iter().rev() {
if !v.is_expired() {
let _ = ConcurrentCached::cache_set(&new_cache, k, v);
}
}
}
new_cache
}
#[must_use = "the Result from build() must be used"]
pub fn build(self) -> Result<ShardedExpiringLruCacheBase<K, V, H>, BuildError>
where
K: Hash + Eq + Clone,
H: ShardHasher<K>,
{
let n = checked_shard_count(self.shards)?;
let mask = n - 1;
let per_shard_cap = self.resolve_per_shard_cap(n)?;
let total_cap = self.total_capacity(n, per_shard_cap)?;
let on_evict = self.on_evict.clone();
let shards = (0..n)
.map(|_| {
let mut lru = LruCache::builder().max_size(per_shard_cap).build()?;
lru.on_evict = on_evict.clone();
lru.disable_hit_miss_tracking();
Ok(CachePadded(Shard::new(lru)))
})
.collect::<Result<Vec<_>, BuildError>>()?
.into_boxed_slice();
Ok(ShardedExpiringLruCacheBase {
inner: Arc::new(ExpiringLruInner {
shards,
shard_mask: mask,
hasher: self
.hasher
.expect("hasher is always initialized via Default or .hasher()"),
on_evict: self.on_evict,
evictions: AtomicU64::new(0),
total_capacity: total_cap,
}),
})
}
}
impl<K, V, H> ConcurrentCloneCached<K, V> for ShardedExpiringLruCacheBase<K, V, H>
where
    K: Hash + Eq + Clone,
    V: Clone + Expires,
    H: ShardHasher<K>,
{
    /// Looks up `k` and reports whether the stored value has expired.
    ///
    /// Returns `(None, false)` for an absent key (counted as a miss), `(Some(stale), true)`
    /// for an expired entry (counted as a miss; the entry is left in place and no eviction
    /// is recorded), and `(Some(value), false)` for a live entry (counted as a hit and
    /// fetched through the store's `cache_get`).
    fn cache_get_with_expiry_status(&self, k: &K) -> (Option<V>, bool) {
        let shard = self.shard_of(k);
        let mut guard = shard.lock.write();
        // Clone the peeked value only when it is stale. A live value is cloned exactly
        // once below, by `cache_get`, so the hit path no longer pays for two clones while
        // holding the shard's write lock.
        let stale = match guard.cache_peek(k) {
            None => {
                drop(guard);
                shard.misses.fetch_add(1, Ordering::Relaxed);
                return (None, false);
            }
            Some(v) => v.is_expired().then(|| v.clone()),
        };

        if let Some(stale) = stale {
            drop(guard);
            shard.misses.fetch_add(1, Ordering::Relaxed);
            (Some(stale), true)
        } else {
            let value = guard.cache_get(k).cloned();
            drop(guard);
            shard.hits.fetch_add(1, Ordering::Relaxed);
            (value, false)
        }
    }

    /// Side-effect-free variant of the lookup: takes only the shard's read lock, uses the
    /// store's `cache_peek`, and updates no counters.
    ///
    /// Returns a clone of the stored value (if any) together with its expiry flag; an
    /// absent key yields `(None, false)`.
    fn cache_peek_with_expiry_status(&self, k: &K) -> (Option<V>, bool) {
        let shard = self.shard_of(k);
        let guard = shard.lock.read();
        match guard.cache_peek(k) {
            None => (None, false),
            Some(v) => {
                let expired = v.is_expired();
                (Some(v.clone()), expired)
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ConcurrentCached;
use crate::ConcurrentCached as SyncConcurrentCached;
use crate::ConcurrentCloneCached;
/// Test value whose expiry is controlled directly by the `expired` flag.
#[derive(Clone)]
struct Val {
// Payload used by assertions to identify the value.
v: u32,
// Returned verbatim from `is_expired`.
expired: bool,
}
impl crate::Expires for Val {
fn is_expired(&self) -> bool {
self.expired
}
}
#[test]
fn new_returns_ready_cache_respecting_max_size() {
    let live = |v: u32| Val { v, expired: false };

    // A single shard makes the two-entry bound apply to the cache as a whole.
    let bounded = ShardedExpiringLruCache::<u32, Val>::builder()
        .shards(1)
        .max_size(2)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&bounded, 1, live(10)).unwrap();
    let first = ConcurrentCached::cache_get(&bounded, &1).unwrap();
    assert_eq!(first.map(|v| v.v), Some(10));
    ConcurrentCached::cache_set(&bounded, 2, live(20)).unwrap();
    ConcurrentCached::cache_set(&bounded, 3, live(30)).unwrap();
    assert_eq!(bounded.len(), 2);
    assert!(ConcurrentCached::cache_get(&bounded, &1).unwrap().is_none());

    // `new` must hand back a cache that is immediately usable.
    let via_new = ShardedExpiringLruCache::<u32, Val>::new(64);
    ConcurrentCached::cache_set(&via_new, 1, live(1)).unwrap();
    let fetched = ConcurrentCached::cache_get(&via_new, &1).unwrap();
    assert_eq!(fetched.map(|v| v.v), Some(1));

    // `new` and the builder must agree on the effective capacity.
    let capacity_from_new = ShardedExpiringLruCache::<u32, Val>::new(1024).capacity();
    let capacity_from_builder = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(1024)
        .build()
        .unwrap()
        .capacity();
    assert_eq!(capacity_from_new, capacity_from_builder);
}
#[test]
#[should_panic(expected = "non-zero max_size")]
fn new_zero_max_size_panics() {
    // A zero capacity is rejected by the builder, which `new` turns into a panic.
    let _never_built = ShardedExpiringLruCache::<u32, Val>::new(0);
}
#[test]
fn copy_from_skips_expired() {
    let source = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    // Every entry in the source is already expired.
    (0..10u32).for_each(|i| {
        ConcurrentCached::cache_set(&source, i, Val { v: i, expired: true })
            .expect("insert must succeed");
    });

    let rebuilt = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(64)
        .copy_from(&source);
    assert_eq!(rebuilt.len(), 0);
}
#[test]
fn copy_from_preserves_live_entries() {
    let source = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    (0..20u32).for_each(|i| {
        let value = Val {
            v: i * 10,
            expired: false,
        };
        ConcurrentCached::cache_set(&source, i, value).expect("insert must succeed");
    });

    let rebuilt = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(64)
        .copy_from(&source);
    assert_eq!(rebuilt.len(), 20);
    // Every key must map to the value it had in the source cache.
    for key in 0..20u32 {
        let fetched = ConcurrentCached::cache_get(&rebuilt, &key).expect("key was just inserted");
        assert_eq!(fetched.map(|v| v.v), Some(key * 10));
    }
}
#[test]
fn copy_from_respects_capacity() {
    let source = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    (0..40u32).for_each(|i| {
        ConcurrentCached::cache_set(&source, i, Val { v: i, expired: false })
            .expect("insert must succeed");
    });

    // The destination is far smaller than the number of live source entries.
    let rebuilt = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .shards(1)
        .max_size(8)
        .copy_from(&source);
    assert!(
        rebuilt.len() <= 8,
        "new cache should not exceed capacity; got {}",
        rebuilt.len()
    );
    assert!(!rebuilt.is_empty(), "new cache should not be empty");
}
#[test]
fn cache_remove_fires_on_evict_and_updates_metrics() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering as AtomicOrd};

    let fired = Arc::new(AtomicU64::new(0));
    let fired_in_callback = Arc::clone(&fired);
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .shards(1)
        .max_size(8)
        .on_evict(move |_, _| {
            fired_in_callback.fetch_add(1, AtomicOrd::Relaxed);
        })
        .build()
        .unwrap();
    let evictions = || {
        cache
            .metrics()
            .evictions
            .expect("eviction-tracking stores report an evictions count")
    };

    ConcurrentCached::cache_set(&cache, 1, Val { v: 1, expired: false })
        .expect("insert must succeed");
    ConcurrentCached::cache_set(&cache, 2, Val { v: 2, expired: true })
        .expect("insert must succeed");

    // Removing a live entry returns it, fires the callback and bumps the counter.
    let before_live = evictions();
    let live = ConcurrentCached::cache_remove(&cache, &1).expect("key must be present");
    assert_eq!(live.map(|v| v.v), Some(1));
    assert_eq!(fired.load(AtomicOrd::Relaxed), 1, "on_evict must fire");
    assert_eq!(
        evictions() - before_live,
        1,
        "evictions metric must increment on successful remove"
    );

    // Removing an expired entry hides the value but still fires and counts.
    let before_stale = evictions();
    let stale = ConcurrentCached::cache_remove(&cache, &2).expect("key must be present");
    assert_eq!(
        stale.map(|v| v.v),
        None,
        "expired entry must return None from cache_remove"
    );
    assert_eq!(
        fired.load(AtomicOrd::Relaxed),
        2,
        "on_evict must fire even for expired entries"
    );
    assert_eq!(
        evictions() - before_stale,
        1,
        "evictions metric increments even for expired removes"
    );
}
#[test]
fn cache_clear_with_on_evict_fires_for_all_entries() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering as AtomicOrd};

    let fired = Arc::new(AtomicU64::new(0));
    let fired_in_callback = Arc::clone(&fired);
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .shards(1)
        .max_size(64)
        .on_evict(move |_, _| {
            fired_in_callback.fetch_add(1, AtomicOrd::Relaxed);
        })
        .build()
        .unwrap();
    (0..20u32).for_each(|i| {
        ConcurrentCached::cache_set(&cache, i, Val { v: i, expired: false })
            .expect("insert must succeed");
    });

    let evictions_before = cache
        .metrics()
        .evictions
        .expect("eviction-tracking stores report an evictions count");
    cache.cache_clear_with_on_evict();
    let evictions_after = cache
        .metrics()
        .evictions
        .expect("eviction-tracking stores report an evictions count");

    assert_eq!(
        cache.len(),
        0,
        "cache must be empty after cache_clear_with_on_evict"
    );
    assert_eq!(
        fired.load(AtomicOrd::Relaxed),
        20,
        "on_evict must fire for every entry"
    );
    assert_eq!(
        evictions_after - evictions_before,
        20,
        "evictions counter must increment for each entry"
    );
}
#[test]
fn clear_does_not_fire_on_evict() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering as AtomicOrd};

    let fired = Arc::new(AtomicU64::new(0));
    let fired_in_callback = Arc::clone(&fired);
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(64)
        .on_evict(move |_, _| {
            fired_in_callback.fetch_add(1, AtomicOrd::Relaxed);
        })
        .build()
        .unwrap();
    (0..10u32).for_each(|i| {
        ConcurrentCached::cache_set(&cache, i, Val { v: i, expired: false })
            .expect("insert must succeed");
    });

    // A plain clear drops the entries silently.
    cache.clear();
    assert_eq!(
        fired.load(AtomicOrd::Relaxed),
        0,
        "clear must not fire on_evict"
    );
}
#[test]
fn cache_remove_entry_returns_some_for_live_entry() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 1, expired: false })
        .expect("insert must succeed");

    // An unknown key yields nothing.
    let absent = ConcurrentCached::cache_remove_entry(&cache, &999u32)
        .expect("cache_remove_entry must succeed");
    assert!(absent.is_none());

    // A known key yields its pair and is gone afterwards.
    let taken = ConcurrentCached::cache_remove_entry(&cache, &1u32).expect("key must be present");
    assert!(taken.is_some());
    assert_eq!(taken.expect("must be Some").0, 1u32);
    let after = ConcurrentCached::cache_get(&cache, &1u32).expect("cache_get must succeed");
    assert!(after.is_none());
}
#[test]
fn cache_remove_entry_returns_some_for_expired_entry() {
    let stale = |v: u32| Val { v, expired: true };
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, stale(1)).expect("insert must succeed");
    ConcurrentCached::cache_set(&cache, 2u32, stale(2)).expect("insert must succeed");

    // `cache_remove` hides an expired value...
    let hidden = ConcurrentCached::cache_remove(&cache, &1u32).expect("cache_remove must succeed");
    assert!(hidden.is_none());

    // ...whereas `cache_remove_entry` still returns the pair.
    let taken = ConcurrentCached::cache_remove_entry(&cache, &2u32).expect("key must be present");
    assert!(
        taken.is_some(),
        "cache_remove_entry must return Some for expired entry"
    );
    assert_eq!(taken.expect("must be Some").0, 2u32);
}
#[test]
fn cache_delete_returns_true_for_expired_entry() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 1, expired: true })
        .expect("insert must succeed");

    let first_delete =
        ConcurrentCached::cache_delete(&cache, &1u32).expect("cache_delete must succeed");
    assert!(first_delete, "cache_delete must be true for expired entry");

    // The entry is gone now, so a second delete reports nothing removed.
    let second_delete =
        ConcurrentCached::cache_delete(&cache, &1u32).expect("cache_delete must succeed");
    assert!(!second_delete);
}
#[test]
fn cache_remove_entry_fires_on_evict_for_expired() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering as AtomicOrd};

    let fired = Arc::new(AtomicU64::new(0));
    let fired_in_callback = Arc::clone(&fired);
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .shards(1)
        .max_size(64)
        .on_evict(move |_, _| {
            fired_in_callback.fetch_add(1, AtomicOrd::Relaxed);
        })
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 1, expired: true })
        .expect("insert must succeed");

    ConcurrentCached::cache_remove_entry(&cache, &1u32).expect("key must be present");
    assert_eq!(
        fired.load(AtomicOrd::Relaxed),
        1,
        "on_evict fires for expired entries"
    );

    // Removing a key that was never stored must leave the callback count alone.
    ConcurrentCached::cache_remove_entry(&cache, &999u32)
        .expect("cache_remove_entry must succeed");
    assert_eq!(fired.load(AtomicOrd::Relaxed), 1, "no fire for absent key");
}
#[test]
fn cache_remove_entry_increments_eviction_counter() {
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(64)
        .shards(1)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 1, expired: true })
        .expect("insert must succeed");

    let evictions_before = cache
        .metrics()
        .evictions
        .expect("evictions are always tracked");
    // One removal of a present key, one of an absent key.
    ConcurrentCached::cache_remove_entry(&cache, &1u32).expect("key must be present");
    ConcurrentCached::cache_remove_entry(&cache, &999u32)
        .expect("cache_remove_entry must succeed");
    let evictions_after = cache
        .metrics()
        .evictions
        .expect("evictions are always tracked");

    assert_eq!(
        evictions_after - evictions_before,
        1,
        "cache_remove_entry must increment evictions for present key only"
    );
}
#[test]
fn concurrent_clone_cached_absent_is_none_false() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();

    let lookup = ConcurrentCloneCached::cache_get_with_expiry_status(&cache, &1u32);
    assert!(lookup.0.is_none(), "absent key must return None");
    assert!(!lookup.1, "absent key must return expired=false");

    let misses = cache.metrics().misses;
    assert_eq!(misses, Some(1), "absent lookup must increment misses");
}
#[test]
fn concurrent_clone_cached_live_entry_is_some_false() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 7, expired: false })
        .expect("insert must succeed");

    let (value, expired) = ConcurrentCloneCached::cache_get_with_expiry_status(&cache, &1u32);
    assert_eq!(
        value.map(|v| v.v),
        Some(7),
        "live entry must return the value"
    );
    assert!(!expired, "live entry must not set the expired flag");

    // A live lookup is a hit and never an eviction.
    assert_eq!(
        cache.metrics().hits,
        Some(1),
        "live lookup must increment hits"
    );
    assert_eq!(
        cache.metrics().evictions,
        Some(0),
        "live lookup must not increment evictions"
    );
}
#[test]
fn concurrent_clone_cached_expired_returns_stale_no_eviction() {
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(64)
        .shards(1)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 55, expired: true })
        .expect("insert must succeed");

    let (first_value, first_expired) =
        ConcurrentCloneCached::cache_get_with_expiry_status(&cache, &1u32);
    assert_eq!(
        first_value.map(|v| v.v),
        Some(55),
        "expired entry must return the stale value"
    );
    assert!(first_expired, "expired entry must set the expired flag");
    assert_eq!(
        cache.metrics().misses,
        Some(1),
        "expired lookup must increment misses"
    );
    assert_eq!(
        cache.metrics().evictions,
        Some(0),
        "expired lookup must NOT increment evictions"
    );

    // The stale entry must survive the first lookup untouched.
    let (second_value, second_expired) =
        ConcurrentCloneCached::cache_get_with_expiry_status(&cache, &1u32);
    assert_eq!(
        second_value.map(|v| v.v),
        Some(55),
        "entry must still be present after expiry-status lookup"
    );
    assert!(
        second_expired,
        "entry must still be expired on second expiry-status call"
    );
}
#[test]
fn peek_with_expiry_status_no_side_effects() {
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(64)
        .shards(1)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 42, expired: false })
        .expect("insert must succeed");
    let metrics_before = cache.metrics();

    let (live_value, live_expired) =
        ConcurrentCloneCached::cache_peek_with_expiry_status(&cache, &1u32);
    assert_eq!(
        live_value.map(|x| x.v),
        Some(42),
        "live peek must return the value"
    );
    assert!(!live_expired, "live peek must report expired=false");

    let (absent_value, absent_expired) =
        ConcurrentCloneCached::cache_peek_with_expiry_status(&cache, &999u32);
    assert!(absent_value.is_none(), "absent peek must return None");
    assert!(!absent_expired, "absent peek must report expired=false");

    // Neither peek may have moved any counter.
    let metrics_after = cache.metrics();
    assert_eq!(
        metrics_after.hits, metrics_before.hits,
        "peek must not increment hits"
    );
    assert_eq!(
        metrics_after.misses, metrics_before.misses,
        "peek must not increment misses"
    );
    assert_eq!(
        metrics_after.evictions, metrics_before.evictions,
        "peek must not increment evictions"
    );

    let still_there = ConcurrentCached::cache_get(&cache, &1u32).expect("cache_get must succeed");
    assert!(
        still_there.is_some(),
        "entry must still be present after peek"
    );
}
#[test]
fn peek_with_expiry_status_does_not_promote_lru() {
    let live = |v: u32| Val { v, expired: false };
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(2)
        .shards(1)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, live(10)).expect("insert must succeed");
    ConcurrentCached::cache_set(&cache, 2u32, live(20)).expect("insert must succeed");

    // Peek at the oldest key; this must leave the recency order unchanged.
    let (peeked, expired) = ConcurrentCloneCached::cache_peek_with_expiry_status(&cache, &1u32);
    assert_eq!(peeked.map(|x| x.v), Some(10), "peek must return the value");
    assert!(!expired, "peek must report expired=false");
    let metrics = cache.metrics();
    assert_eq!(metrics.hits, Some(0), "peek must not increment hits");
    assert_eq!(metrics.misses, Some(0), "peek must not increment misses");

    // A third insert overflows the two-slot cache and pushes out the LRU entry.
    ConcurrentCached::cache_set(&cache, 3u32, live(30)).expect("insert must succeed");

    let key_one = ConcurrentCached::cache_get(&cache, &1u32).expect("cache_get must succeed");
    assert!(
        key_one.is_none(),
        "key 1 must be evicted as LRU (peek must not have promoted it)"
    );
    let key_two = ConcurrentCached::cache_get(&cache, &2u32).expect("cache_get must succeed");
    assert!(key_two.is_some(), "key 2 must survive");
    let key_three = ConcurrentCached::cache_get(&cache, &3u32).expect("cache_get must succeed");
    assert!(key_three.is_some(), "key 3 must survive");
}
#[test]
fn peek_with_expiry_status_stale_entry_no_side_effects() {
    let cache = ShardedExpiringLruCacheBase::<u32, Val>::builder()
        .max_size(64)
        .shards(1)
        .build()
        .unwrap();
    ConcurrentCached::cache_set(&cache, 1u32, Val { v: 77, expired: true })
        .expect("insert must succeed");
    let metrics_before = cache.metrics();

    let (first_value, first_expired) =
        ConcurrentCloneCached::cache_peek_with_expiry_status(&cache, &1u32);
    assert_eq!(
        first_value.map(|x| x.v),
        Some(77),
        "expired peek must return the stale value"
    );
    assert!(first_expired, "expired peek must report expired=true");

    // Peeking at a stale entry must not move any counter.
    let metrics_after = cache.metrics();
    assert_eq!(
        metrics_after.hits, metrics_before.hits,
        "expired peek must not increment hits"
    );
    assert_eq!(
        metrics_after.misses, metrics_before.misses,
        "expired peek must not increment misses"
    );
    assert_eq!(
        metrics_after.evictions, metrics_before.evictions,
        "expired peek must not increment evictions"
    );

    // The stale entry must still be stored.
    let (second_value, second_expired) =
        ConcurrentCloneCached::cache_peek_with_expiry_status(&cache, &1u32);
    assert_eq!(
        second_value.map(|x| x.v),
        Some(77),
        "entry must still be present after expired peek"
    );
    assert!(second_expired, "entry must still be expired after peek");
}
#[test]
fn inherent_get_returns_option_not_result() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();

    // The explicit annotations pin the inherent method's return type.
    let before_insert: Option<Val> = cache.get(&1);
    assert!(before_insert.is_none());

    cache.set(1, Val { v: 42, expired: false });
    let after_insert: Option<Val> = cache.get(&1);
    assert_eq!(after_insert.map(|x| x.v), Some(42));
}
#[test]
fn inherent_get_returns_none_for_expired() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    cache.set(1, Val { v: 99, expired: true });

    let fetched: Option<Val> = cache.get(&1);
    assert!(
        fetched.is_none(),
        "expired entry must return None from inherent get"
    );
}
#[test]
fn inherent_set_returns_previous_value() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();

    // First insert: nothing to replace.
    let replaced: Option<Val> = cache.set(1, Val { v: 10, expired: false });
    assert!(replaced.is_none());

    // Second insert under the same key hands back the old value.
    let replaced: Option<Val> = cache.set(1, Val { v: 20, expired: false });
    assert_eq!(replaced.map(|x| x.v), Some(10));
    assert_eq!(cache.get(&1).map(|x| x.v), Some(20));
}
#[test]
fn inherent_remove_returns_prior_live_value() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    cache.set(1, Val { v: 99, expired: false });

    let removed: Option<Val> = cache.remove(&1);
    assert_eq!(removed.map(|x| x.v), Some(99));
    // Nothing is left to remove the second time.
    assert!(cache.remove(&1).is_none());
}
#[test]
fn inherent_remove_entry_returns_key_and_value() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .shards(1)
        .max_size(64)
        .build()
        .unwrap();
    cache.set(7, Val { v: 77, expired: false });

    let removed: Option<(u32, Val)> = cache.remove_entry(&7);
    assert_eq!(removed.map(|(k, v)| (k, v.v)), Some((7, 77)));
    // Nothing is left to remove the second time.
    assert!(cache.remove_entry(&7).is_none());
}
#[test]
fn inherent_delete_returns_bool() {
    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    cache.set(1, Val { v: 10, expired: false });

    let first: bool = cache.delete(&1);
    assert!(first);
    // The key is gone, so the second delete reports `false`.
    let second: bool = cache.delete(&1);
    assert!(!second);
}
#[test]
fn inherent_and_trait_methods_coexist_via_fully_qualified_path() {
    // Generic helper that can only reach the cache through the trait, proving the trait
    // methods stay callable alongside the inherent ones.
    fn use_trait<C>(cache: &C, k: u32, v: Val)
    where
        C: ConcurrentCached<u32, Val>,
    {
        let _: Result<Option<Val>, _> = SyncConcurrentCached::cache_set(cache, k, v);
        let _: Result<Option<Val>, _> = SyncConcurrentCached::cache_get(cache, &k);
        let _: Result<Option<Val>, _> = SyncConcurrentCached::cache_remove(cache, &k);
    }

    let cache = ShardedExpiringLruCache::<u32, Val>::builder()
        .max_size(64)
        .build()
        .unwrap();
    let value = Val {
        v: 42,
        expired: false,
    };
    use_trait(&cache, 1, value);
}
}