use super::{
base_cache::{BaseCache, HouseKeeperArc},
value_initializer::{InitResult, ValueInitializer},
CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector,
};
use crate::{
common::{
concurrent::{
constants::WRITE_RETRY_INTERVAL_MICROS, housekeeper::InnerSync, Weigher, WriteOp,
},
iter::ScanningGet,
time::{Clock, Instant},
HousekeeperConfig,
},
notification::EvictionListener,
ops::compute::{self, CompResult},
policy::{EvictionPolicy, ExpirationPolicy},
sync::{Iter, PredicateId},
Entry, Policy, PredicateError,
};
use crossbeam_channel::{Sender, TrySendError};
use equivalent::Equivalent;
use std::{
collections::hash_map::RandomState,
fmt,
hash::{BuildHasher, Hash},
sync::Arc,
time::Duration,
};
pub struct Cache<K, V, S = RandomState> {
pub(crate) base: BaseCache<K, V, S>,
value_initializer: Arc<ValueInitializer<K, V, S>>,
}
unsafe impl<K, V, S> Send for Cache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Send,
{
}
unsafe impl<K, V, S> Sync for Cache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Sync,
{
}
impl<K, V, S> Clone for Cache<K, V, S> {
fn clone(&self) -> Self {
Self {
base: self.base.clone(),
value_initializer: Arc::clone(&self.value_initializer),
}
}
}
impl<K, V, S> fmt::Debug for Cache<K, V, S>
where
K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
V: fmt::Debug + Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d_map = f.debug_map();
for (k, v) in self {
d_map.entry(&k, &v);
}
d_map.finish()
}
}
impl<K, V, S> Cache<K, V, S> {
pub fn name(&self) -> Option<&str> {
self.base.name()
}
pub fn policy(&self) -> Policy {
self.base.policy()
}
pub fn entry_count(&self) -> u64 {
self.base.entry_count()
}
pub fn weighted_size(&self) -> u64 {
self.base.weighted_size()
}
}
impl<K, V> Cache<K, V, RandomState>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
pub fn new(max_capacity: u64) -> Self {
let build_hasher = RandomState::default();
Self::with_everything(
None,
Some(max_capacity),
None,
build_hasher,
None,
EvictionPolicy::default(),
None,
ExpirationPolicy::default(),
HousekeeperConfig::default(),
false,
Clock::default(),
)
}
pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
CacheBuilder::default()
}
}
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn with_everything(
name: Option<String>,
max_capacity: Option<u64>,
initial_capacity: Option<usize>,
build_hasher: S,
weigher: Option<Weigher<K, V>>,
eviction_policy: EvictionPolicy,
eviction_listener: Option<EvictionListener<K, V>>,
expiration_policy: ExpirationPolicy<K, V>,
housekeeper_config: HousekeeperConfig,
invalidator_enabled: bool,
clock: Clock,
) -> Self {
Self {
base: BaseCache::new(
name,
max_capacity,
initial_capacity,
build_hasher.clone(),
weigher,
eviction_policy,
eviction_listener,
expiration_policy,
housekeeper_config,
invalidator_enabled,
clock,
),
value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
}
}
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.base.contains_key_with_hash(key, self.base.hash(key))
}
pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.base.contains_key_with_hash(key, hash)
}
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.base
.get_with_hash(key, self.base.hash(key), false)
.map(Entry::into_value)
}
pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.base.get_with_hash(key, hash, need_key)
}
pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
where
K: Hash + Eq,
{
let hash = self.base.hash(&key);
OwnedKeyEntrySelector::new(key, hash, self)
}
pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
RefKeyEntrySelector::new(key, hash, self)
}
pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
let hash = self.base.hash(&key);
let key = Arc::new(key);
let replace_if = None as Option<fn(&V) -> bool>;
self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
.into_value()
}
pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
let replace_if = None as Option<fn(&V) -> bool>;
self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
.into_value()
}
#[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
pub fn get_with_if(
&self,
key: K,
init: impl FnOnce() -> V,
replace_if: impl FnMut(&V) -> bool,
) -> V {
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
.into_value()
}
pub(crate) fn get_or_insert_with_hash_and_fun(
&self,
key: Arc<K>,
hash: u64,
init: impl FnOnce() -> V,
mut replace_if: Option<impl FnMut(&V) -> bool>,
need_key: bool,
) -> Entry<K, V> {
self.base
.get_with_hash_and_ignore_if(&*key, hash, replace_if.as_mut(), need_key)
.unwrap_or_else(|| self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key))
}
pub(crate) fn get_or_insert_with_hash_by_ref_and_fun<Q>(
&self,
key: &Q,
hash: u64,
init: impl FnOnce() -> V,
mut replace_if: Option<impl FnMut(&V) -> bool>,
need_key: bool,
) -> Entry<K, V>
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
self.base
.get_with_hash_and_ignore_if(key, hash, replace_if.as_mut(), need_key)
.unwrap_or_else(|| {
let key = Arc::new(key.to_owned());
self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
})
}
pub(crate) fn insert_with_hash_and_fun(
&self,
key: Arc<K>,
hash: u64,
init: impl FnOnce() -> V,
mut replace_if: Option<impl FnMut(&V) -> bool>,
need_key: bool,
) -> Entry<K, V> {
let get = || {
self.base
.get_with_hash_without_recording(&*key, hash, replace_if.as_mut())
};
let insert = |v| self.insert_with_hash(key.clone(), hash, v);
let k = if need_key {
Some(Arc::clone(&key))
} else {
None
};
let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
match self
.value_initializer
.try_init_or_read(&key, type_id, get, init, insert, post_init)
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Entry::new(k, v, true, false)
}
InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
InitResult::InitErr(_) => unreachable!(),
}
}
pub(crate) fn get_or_insert_with_hash(
&self,
key: Arc<K>,
hash: u64,
init: impl FnOnce() -> V,
) -> Entry<K, V> {
match self.base.get_with_hash(&*key, hash, true) {
Some(entry) => entry,
None => {
let value = init();
self.insert_with_hash(Arc::clone(&key), hash, value.clone());
Entry::new(Some(key), value, true, false)
}
}
}
pub(crate) fn get_or_insert_with_hash_by_ref<Q>(
&self,
key: &Q,
hash: u64,
init: impl FnOnce() -> V,
) -> Entry<K, V>
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
match self.base.get_with_hash(key, hash, true) {
Some(entry) => entry,
None => {
let key = Arc::new(key.to_owned());
let value = init();
self.insert_with_hash(Arc::clone(&key), hash, value.clone());
Entry::new(Some(key), value, true, false)
}
}
}
pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
where
F: FnOnce() -> Option<V>,
{
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
.map(Entry::into_value)
}
pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
where
F: FnOnce() -> Option<V>,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
.map(Entry::into_value)
}
pub(super) fn get_or_optionally_insert_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
init: F,
need_key: bool,
) -> Option<Entry<K, V>>
where
F: FnOnce() -> Option<V>,
{
let entry = self.get_with_hash(&*key, hash, need_key);
if entry.is_some() {
return entry;
}
self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
}
pub(super) fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
&self,
key: &Q,
hash: u64,
init: F,
need_key: bool,
) -> Option<Entry<K, V>>
where
F: FnOnce() -> Option<V>,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let entry = self.get_with_hash(key, hash, need_key);
if entry.is_some() {
return entry;
}
let key = Arc::new(key.to_owned());
self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
}
pub(super) fn optionally_insert_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
init: F,
need_key: bool,
) -> Option<Entry<K, V>>
where
F: FnOnce() -> Option<V>,
{
let get = || {
let ignore_if = None as Option<&mut fn(&V) -> bool>;
self.base
.get_with_hash_without_recording(&*key, hash, ignore_if)
};
let insert = |v| self.insert_with_hash(key.clone(), hash, v);
let k = if need_key {
Some(Arc::clone(&key))
} else {
None
};
let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
match self
.value_initializer
.try_init_or_read(&key, type_id, get, init, insert, post_init)
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Some(Entry::new(k, v, true, false))
}
InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
InitResult::InitErr(_) => {
crossbeam_epoch::pin().flush();
None
}
}
}
pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
F: FnOnce() -> Result<V, E>,
E: Send + Sync + 'static,
{
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
.map(Entry::into_value)
}
pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
where
F: FnOnce() -> Result<V, E>,
E: Send + Sync + 'static,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
.map(Entry::into_value)
}
pub(crate) fn get_or_try_insert_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
init: F,
need_key: bool,
) -> Result<Entry<K, V>, Arc<E>>
where
F: FnOnce() -> Result<V, E>,
E: Send + Sync + 'static,
{
if let Some(entry) = self.get_with_hash(&*key, hash, need_key) {
return Ok(entry);
}
self.try_insert_with_hash_and_fun(key, hash, init, need_key)
}
pub(crate) fn get_or_try_insert_with_hash_by_ref_and_fun<F, Q, E>(
&self,
key: &Q,
hash: u64,
init: F,
need_key: bool,
) -> Result<Entry<K, V>, Arc<E>>
where
F: FnOnce() -> Result<V, E>,
E: Send + Sync + 'static,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
if let Some(entry) = self.get_with_hash(key, hash, false) {
return Ok(entry);
}
let key = Arc::new(key.to_owned());
self.try_insert_with_hash_and_fun(key, hash, init, need_key)
}
pub(crate) fn try_insert_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
init: F,
need_key: bool,
) -> Result<Entry<K, V>, Arc<E>>
where
F: FnOnce() -> Result<V, E>,
E: Send + Sync + 'static,
{
let get = || {
let ignore_if = None as Option<&mut fn(&V) -> bool>;
self.base
.get_with_hash_without_recording(&*key, hash, ignore_if)
};
let insert = |v| self.insert_with_hash(key.clone(), hash, v);
let k = if need_key {
Some(Arc::clone(&key))
} else {
None
};
let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
match self
.value_initializer
.try_init_or_read(&key, type_id, get, init, insert, post_init)
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Ok(Entry::new(k, v, true, false))
}
InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
InitResult::InitErr(e) => {
crossbeam_epoch::pin().flush();
Err(e)
}
}
}
pub fn insert(&self, key: K, value: V) {
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.insert_with_hash(key, hash, value);
}
pub(crate) fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
if self.base.is_map_disabled() {
return;
}
let (op, now) = self.base.do_insert_with_hash(key, hash, value);
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.expect("Failed to insert");
}
pub(crate) fn compute_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
f: F,
) -> compute::CompResult<K, V>
where
F: FnOnce(Option<Entry<K, V>>) -> compute::Op<V>,
{
let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
match self
.value_initializer
.try_compute(key, hash, self, f, post_init, true)
{
Ok(result) => result,
Err(_) => unreachable!(),
}
}
pub(crate) fn try_compute_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
f: F,
) -> Result<compute::CompResult<K, V>, E>
where
F: FnOnce(Option<Entry<K, V>>) -> Result<compute::Op<V>, E>,
E: Send + Sync + 'static,
{
let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
self.value_initializer
.try_compute(key, hash, self, f, post_init, true)
}
pub(crate) fn upsert_with_hash_and_fun<F>(&self, key: Arc<K>, hash: u64, f: F) -> Entry<K, V>
where
F: FnOnce(Option<Entry<K, V>>) -> V,
{
let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
match self
.value_initializer
.try_compute(key, hash, self, f, post_init, false)
{
Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
_ => unreachable!(),
}
}
pub fn invalidate<Q>(&self, key: &Q)
where
Q: Equivalent<K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
self.invalidate_with_hash(key, hash, false);
}
pub fn remove<Q>(&self, key: &Q) -> Option<V>
where
Q: Equivalent<K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
self.invalidate_with_hash(key, hash, true)
}
pub(crate) fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
where
Q: Equivalent<K> + Hash + ?Sized,
{
let mut kl = None;
let mut klg = None;
if self.base.is_removal_notifier_enabled() {
if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
kl = self.base.maybe_key_lock(&arc_key);
klg = kl.as_ref().map(|kl| kl.lock());
}
}
match self.base.remove_entry(key, hash) {
None => None,
Some(kv) => {
let now = self.base.current_time();
let info = kv.entry.entry_info();
let entry_gen = info.incr_entry_gen();
if self.base.is_removal_notifier_enabled() {
self.base.notify_invalidate(&kv.key, &kv.entry);
}
std::mem::drop(klg);
std::mem::drop(kl);
let maybe_v = if need_value {
Some(kv.entry.value.clone())
} else {
None
};
let op = WriteOp::Remove {
kv_entry: kv,
entry_gen,
};
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.expect("Failed to remove");
crossbeam_epoch::pin().flush();
maybe_v
}
}
}
pub fn invalidate_all(&self) {
self.base.invalidate_all();
}
pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
where
F: Fn(&K, &V) -> bool + Send + Sync + 'static,
{
self.base.invalidate_entries_if(Arc::new(predicate))
}
pub(crate) fn invalidate_entries_with_arc_fun<F>(
&self,
predicate: Arc<F>,
) -> Result<PredicateId, PredicateError>
where
F: Fn(&K, &V) -> bool + Send + Sync + 'static,
{
self.base.invalidate_entries_if(predicate)
}
pub fn iter(&self) -> Iter<'_, K, V> {
Iter::with_single_cache_segment(&self.base, self.num_cht_segments())
}
pub fn run_pending_tasks(&self) {
if let Some(hk) = &self.base.housekeeper {
hk.run_pending_tasks(&*self.base.inner);
}
}
}
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
type Item = (Arc<K>, V);
type IntoIter = Iter<'a, K, V>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<K, V, S> ScanningGet<K, V> for Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
fn num_cht_segments(&self) -> usize {
self.base.num_cht_segments()
}
fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
self.base.scanning_get(key)
}
fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
self.base.keys(cht_segment)
}
}
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
#[inline]
fn schedule_write_op(
inner: &impl InnerSync,
ch: &Sender<WriteOp<K, V>>,
op: WriteOp<K, V>,
now: Instant,
housekeeper: Option<&HouseKeeperArc>,
) -> Result<(), TrySendError<WriteOp<K, V>>> {
let mut op = op;
loop {
BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
match ch.try_send(op) {
Ok(()) => break,
Err(TrySendError::Full(op1)) => {
op = op1;
std::thread::sleep(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS));
}
Err(e @ TrySendError::Disconnected(_)) => return Err(e),
}
}
Ok(())
}
}
#[cfg(test)]
impl<K, V, S> Cache<K, V, S> {
pub(crate) fn is_table_empty(&self) -> bool {
self.entry_count() == 0
}
pub(crate) fn is_waiter_map_empty(&self) -> bool {
self.value_initializer.waiter_count() == 0
}
}
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
pub(crate) fn invalidation_predicate_count(&self) -> usize {
self.base.invalidation_predicate_count()
}
pub(crate) fn reconfigure_for_testing(&mut self) {
self.base.reconfigure_for_testing();
}
pub(crate) fn key_locks_map_is_empty(&self) -> bool {
self.base.key_locks_map_is_empty()
}
}
#[cfg(test)]
mod tests {
use super::Cache;
use crate::{
common::{time::Clock, HousekeeperConfig},
notification::RemovalCause,
policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
Expiry,
};
use parking_lot::Mutex;
use std::{
convert::Infallible,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::{Duration, Instant as StdInstant},
};
#[test]
fn max_capacity_zero() {
let mut cache = Cache::new(0);
cache.reconfigure_for_testing();
let cache = cache;
cache.insert(0, ());
assert!(!cache.contains_key(&0));
assert!(cache.get(&0).is_none());
cache.run_pending_tasks();
assert!(!cache.contains_key(&0));
assert!(cache.get(&0).is_none());
assert_eq!(cache.entry_count(), 0)
}
#[test]
fn basic_single_thread() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", "alice");
cache.insert("b", "bob");
assert_eq!(cache.get(&"a"), Some("alice"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.get(&"b"), Some("bob"));
cache.run_pending_tasks();
cache.insert("c", "cindy");
assert_eq!(cache.get(&"c"), Some("cindy"));
assert!(cache.contains_key(&"c"));
cache.run_pending_tasks();
assert!(cache.contains_key(&"a"));
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks();
cache.insert("d", "david"); expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));
cache.insert("d", "david");
expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.run_pending_tasks();
assert!(!cache.contains_key(&"d"));
assert_eq!(cache.get(&"d"), None);
cache.insert("d", "dennis");
expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert_eq!(cache.get(&"c"), None);
assert_eq!(cache.get(&"d"), Some("dennis"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
cache.invalidate(&"b");
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
cache.run_pending_tasks();
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"b"));
assert!(cache.remove(&"b").is_none());
assert_eq!(cache.remove(&"d"), Some("dennis"));
expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
cache.run_pending_tasks();
assert_eq!(cache.get(&"d"), None);
assert!(!cache.contains_key(&"d"));
verify_notification_vec(&cache, actual, &expected);
assert!(cache.key_locks_map_is_empty());
}
#[test]
fn basic_lru_single_thread() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_policy(EvictionPolicy::lru())
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", "alice");
cache.insert("b", "bob");
assert_eq!(cache.get(&"a"), Some("alice"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.get(&"b"), Some("bob"));
cache.run_pending_tasks();
cache.insert("c", "cindy");
assert_eq!(cache.get(&"c"), Some("cindy"));
assert!(cache.contains_key(&"c"));
cache.run_pending_tasks();
assert!(cache.contains_key(&"a"));
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks();
cache.insert("d", "david");
expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert_eq!(cache.get(&"c"), None);
assert_eq!(cache.get(&"d"), Some("david"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
cache.run_pending_tasks();
cache.invalidate(&"b");
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
cache.run_pending_tasks();
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"b"));
assert!(cache.remove(&"b").is_none());
assert_eq!(cache.remove(&"d"), Some("david"));
expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
cache.run_pending_tasks();
assert_eq!(cache.get(&"d"), None);
assert!(!cache.contains_key(&"d"));
cache.insert("e", "emily");
cache.insert("f", "frank");
cache.insert("g", "gina");
expected.push((Arc::new("a"), "alice", RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"e"), Some("emily"));
assert_eq!(cache.get(&"f"), Some("frank"));
assert_eq!(cache.get(&"g"), Some("gina"));
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"e"));
assert!(cache.contains_key(&"f"));
assert!(cache.contains_key(&"g"));
verify_notification_vec(&cache, actual, &expected);
assert!(cache.key_locks_map_is_empty());
}
#[test]
fn size_aware_eviction() {
let weigher = |_k: &&str, v: &(&str, u32)| v.1;
let alice = ("alice", 10);
let bob = ("bob", 15);
let bill = ("bill", 20);
let cindy = ("cindy", 5);
let david = ("david", 15);
let dennis = ("dennis", 15);
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let mut cache = Cache::builder()
.max_capacity(31)
.weigher(weigher)
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", alice);
cache.insert("b", bob);
assert_eq!(cache.get(&"a"), Some(alice));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.get(&"b"), Some(bob));
cache.run_pending_tasks();
cache.insert("c", cindy);
assert_eq!(cache.get(&"c"), Some(cindy));
assert!(cache.contains_key(&"c"));
cache.run_pending_tasks();
assert!(cache.contains_key(&"a"));
assert_eq!(cache.get(&"a"), Some(alice));
assert_eq!(cache.get(&"b"), Some(bob));
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks();
cache.insert("d", david); expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.run_pending_tasks();
assert!(!cache.contains_key(&"d"));
assert_eq!(cache.get(&"d"), None);
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.run_pending_tasks();
assert!(!cache.contains_key(&"d"));
assert_eq!(cache.get(&"d"), None);
cache.insert("d", dennis);
expected.push((Arc::new("c"), cindy, RemovalCause::Size));
expected.push((Arc::new("a"), alice, RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), Some(bob));
assert_eq!(cache.get(&"c"), None);
assert_eq!(cache.get(&"d"), Some(dennis));
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
cache.insert("b", bill);
expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
expected.push((Arc::new("d"), dennis, RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.get(&"b"), Some(bill));
assert_eq!(cache.get(&"d"), None);
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"d"));
cache.insert("a", alice);
cache.insert("b", bob);
expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), Some(alice));
assert_eq!(cache.get(&"b"), Some(bob));
assert_eq!(cache.get(&"d"), None);
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"d"));
assert_eq!(cache.entry_count(), 2);
assert_eq!(cache.weighted_size(), 25);
verify_notification_vec(&cache, actual, &expected);
assert!(cache.key_locks_map_is_empty());
}
#[test]
fn basic_multi_threads() {
let num_threads = 4;
let cache = Cache::new(100);
#[allow(clippy::needless_collect)]
let handles = (0..num_threads)
.map(|id| {
let cache = cache.clone();
std::thread::spawn(move || {
cache.insert(10, format!("{id}-100"));
cache.get(&10);
cache.insert(20, format!("{id}-200"));
cache.invalidate(&10);
})
})
.collect::<Vec<_>>();
handles.into_iter().for_each(|h| h.join().expect("Failed"));
assert!(cache.get(&10).is_none());
assert!(cache.get(&20).is_some());
assert!(!cache.contains_key(&10));
assert!(cache.contains_key(&20));
}
#[test]
fn invalidate_all() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let mut cache = Cache::builder()
.max_capacity(100)
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", "alice");
cache.insert("b", "bob");
cache.insert("c", "cindy");
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert_eq!(cache.get(&"c"), Some("cindy"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(cache.contains_key(&"c"));
cache.invalidate_all();
expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
cache.run_pending_tasks();
cache.insert("d", "david");
cache.run_pending_tasks();
assert!(cache.get(&"a").is_none());
assert!(cache.get(&"b").is_none());
assert!(cache.get(&"c").is_none());
assert_eq!(cache.get(&"d"), Some("david"));
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
use std::collections::HashSet;
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.support_invalidation_closures()
.eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert(0, "alice");
cache.insert(1, "bob");
cache.insert(2, "alex");
cache.run_pending_tasks();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&0), Some("alice"));
assert_eq!(cache.get(&1), Some("bob"));
assert_eq!(cache.get(&2), Some("alex"));
assert!(cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
assert_eq!(cache.base.invalidation_predicate_count(), 1);
expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
mock.increment(Duration::from_secs(5));
cache.insert(3, "alice");
cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
assert!(cache.get(&0).is_none());
assert!(cache.get(&2).is_none());
assert_eq!(cache.get(&1), Some("bob"));
assert_eq!(cache.get(&3), Some("alice"));
assert!(!cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(!cache.contains_key(&2));
assert!(cache.contains_key(&3));
assert_eq!(cache.entry_count(), 2);
assert_eq!(cache.invalidation_predicate_count(), 0);
mock.increment(Duration::from_secs(5));
cache.invalidate_entries_if(|_k, &v| v == "alice")?;
cache.invalidate_entries_if(|_k, &v| v == "bob")?;
assert_eq!(cache.invalidation_predicate_count(), 2);
expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
assert!(cache.get(&1).is_none());
assert!(cache.get(&3).is_none());
assert!(!cache.contains_key(&1));
assert!(!cache.contains_key(&3));
assert_eq!(cache.entry_count(), 0);
assert_eq!(cache.invalidation_predicate_count(), 0);
verify_notification_vec(&cache, actual, &expected);
Ok(())
}
#[test]
fn time_to_live() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_live(Duration::from_secs(10))
.eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", "alice");
cache.run_pending_tasks();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), Some("alice"));
assert!(cache.contains_key(&"a"));
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert!(!cache.contains_key(&"a"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks();
assert!(cache.is_table_empty());
cache.insert("b", "bob");
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
cache.insert("b", "bill");
expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
cache.run_pending_tasks();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"b"), Some("bill"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks();
assert!(cache.is_table_empty());
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn time_to_idle() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", "alice");
cache.run_pending_tasks();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), Some("alice"));
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
cache.insert("b", "bob");
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(2)); cache.run_pending_tasks();
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(3)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 1);
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(10)); expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks();
assert!(cache.is_table_empty());
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn ensure_access_time_is_updated_immediately_after_read() {
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(10)
.time_to_idle(Duration::from_secs(5))
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert(1, 1);
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&1), Some(1));
mock.increment(Duration::from_secs(2));
assert_eq!(cache.get(&1), Some(1));
cache.run_pending_tasks();
assert_eq!(cache.get(&1), Some(1));
}
#[test]
fn time_to_live_by_expiry_type() {
struct MyExpiry {
counters: Arc<ExpiryCallCounters>,
}
impl MyExpiry {
fn new(counters: Arc<ExpiryCallCounters>) -> Self {
Self { counters }
}
}
impl Expiry<&str, &str> for MyExpiry {
fn expire_after_create(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_creations();
Some(Duration::from_secs(10))
}
fn expire_after_update(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
) -> Option<Duration> {
self.counters.incl_actual_updates();
Some(Duration::from_secs(10))
}
}
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let expiry_counters = Arc::new(ExpiryCallCounters::default());
let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", "alice");
expiry_counters.incl_expected_creations();
cache.run_pending_tasks();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), Some("alice"));
assert!(cache.contains_key(&"a"));
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert!(!cache.contains_key(&"a"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks();
assert!(cache.is_table_empty());
cache.insert("b", "bob");
expiry_counters.incl_expected_creations();
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
cache.insert("b", "bill");
expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
expiry_counters.incl_expected_updates();
cache.run_pending_tasks();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"b"), Some("bill"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks();
assert!(cache.is_table_empty());
expiry_counters.verify();
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn time_to_idle_by_expiry_type() {
struct MyExpiry {
counters: Arc<ExpiryCallCounters>,
}
impl MyExpiry {
fn new(counters: Arc<ExpiryCallCounters>) -> Self {
Self { counters }
}
}
impl Expiry<&str, &str> for MyExpiry {
fn expire_after_read(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
_last_modified_at: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_reads();
Some(Duration::from_secs(10))
}
}
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let expiry_counters = Arc::new(ExpiryCallCounters::default());
let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("a", "alice");
cache.run_pending_tasks();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
assert_eq!(cache.get(&"a"), Some("alice"));
expiry_counters.incl_expected_reads();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
cache.insert("b", "bob");
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(2)); cache.run_pending_tasks();
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(3)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), Some("bob"));
expiry_counters.incl_expected_reads();
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 1);
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(10)); expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks();
assert!(cache.is_table_empty());
expiry_counters.verify();
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn test_expiry_using_get_with() {
struct NoExpiry {
counters: Arc<ExpiryCallCounters>,
}
impl NoExpiry {
fn new(counters: Arc<ExpiryCallCounters>) -> Self {
Self { counters }
}
}
impl Expiry<&str, &str> for NoExpiry {
fn expire_after_create(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_creations();
None
}
fn expire_after_read(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
_last_modified_at: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_reads();
None
}
fn expire_after_update(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
) -> Option<Duration> {
unreachable!("The `expire_after_update()` method should not be called.");
}
}
let expiry_counters = Arc::new(ExpiryCallCounters::default());
let expiry = NoExpiry::new(Arc::clone(&expiry_counters));
let mut cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.get_with("a", || "alice");
expiry_counters.incl_expected_creations();
cache.run_pending_tasks();
cache.get_with("a", || "alex");
expiry_counters.incl_expected_reads();
cache.run_pending_tasks();
cache.invalidate("a");
cache.get_with("a", || "amanda");
expiry_counters.incl_expected_creations();
cache.run_pending_tasks();
expiry_counters.verify();
}
#[test]
fn expire_after_update_none_on_expired_entry() {
use std::sync::atomic::{AtomicBool, Ordering};
let should_expire = Arc::new(AtomicBool::new(true));
struct TestExpiry {
should_expire: Arc<AtomicBool>,
}
impl crate::Expiry<String, String> for TestExpiry {
fn expire_after_create(
&self,
_key: &String,
_value: &String,
_current_time: StdInstant,
) -> Option<Duration> {
if self.should_expire.load(Ordering::SeqCst) {
Some(Duration::ZERO)
} else {
None
}
}
fn expire_after_update(
&self,
_key: &String,
_value: &String,
_current_time: StdInstant,
_duration_until_expiry: Option<Duration>,
) -> Option<Duration> {
if self.should_expire.load(Ordering::SeqCst) {
Some(Duration::ZERO)
} else {
None
}
}
}
let expiry = TestExpiry {
should_expire: Arc::clone(&should_expire),
};
let cache: Cache<String, String> = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.build();
let key = "test_key".to_string();
cache.insert(key.clone(), "first_value".to_string());
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");
assert_eq!(
cache.get(&key),
None,
"Entry should not be accessible (expired)"
);
cache.run_pending_tasks();
should_expire.store(false, Ordering::SeqCst);
cache.insert(key.clone(), "second_value".to_string());
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");
let result = cache.get(&key);
assert_eq!(
result,
Some("second_value".to_string()),
"Entry should be accessible after clearing expiration"
);
}
#[test]
fn test_expire_after_create_only_no_stale_entries() {
use std::sync::atomic::{AtomicBool, Ordering};
struct TestExpiry {
should_expire: Arc<AtomicBool>,
}
impl crate::Expiry<String, String> for TestExpiry {
fn expire_after_create(
&self,
_key: &String,
_value: &String,
_current_time: StdInstant,
) -> Option<Duration> {
if self.should_expire.load(Ordering::SeqCst) {
Some(Duration::from_secs(1))
} else {
None
}
}
}
let (clock, mock) = Clock::mock();
let should_expire = Arc::new(AtomicBool::new(true));
let mut cache: Cache<String, String> = Cache::builder()
.max_capacity(100)
.expire_after(TestExpiry {
should_expire: Arc::clone(&should_expire),
})
.clock(clock)
.build();
cache.reconfigure_for_testing();
let key = "key1".to_string();
cache.insert(key.clone(), "value1".to_string());
cache.run_pending_tasks();
assert_eq!(cache.get(&key), Some("value1".to_string()));
mock.increment(Duration::from_secs(2));
assert_eq!(cache.get(&key), None, "Entry should be expired");
should_expire.store(false, Ordering::SeqCst);
cache.insert(key.clone(), "value2".to_string());
cache.run_pending_tasks();
assert_eq!(
cache.get(&key),
Some("value2".to_string()),
"Re-inserted entry with no expiry should be accessible"
);
should_expire.store(true, Ordering::SeqCst);
let key2 = "key2".to_string();
cache.insert(key2.clone(), "v1".to_string());
cache.run_pending_tasks();
assert_eq!(cache.get(&key2), Some("v1".to_string()));
mock.increment(Duration::from_secs(2));
assert_eq!(cache.get(&key2), None, "key2 should be expired");
cache.insert(key2.clone(), "v2".to_string());
cache.run_pending_tasks();
assert_eq!(
cache.get(&key2),
Some("v2".to_string()),
"Re-inserted key2 should be accessible with fresh TTL"
);
mock.increment(Duration::from_secs(2));
assert_eq!(
cache.get(&key2),
None,
"key2 should expire with fresh TTL from expire_after_create"
);
}
#[test]
fn test_race_between_updating_entry_and_processing_its_write_ops() {
let (clock, mock) = Clock::mock();
let cache = Cache::builder()
.max_capacity(2)
.time_to_idle(Duration::from_secs(1))
.clock(clock)
.build();
cache.insert("a", "alice");
cache.insert("b", "bob");
cache.insert("c", "cathy"); mock.increment(Duration::from_secs(2));
cache.insert("c", "cindy");
assert_eq!(cache.remove(&"c"), Some("cindy"));
mock.increment(Duration::from_secs(2));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 0);
}
#[test]
fn test_race_between_recreating_entry_and_processing_its_write_ops() {
let cache = Cache::builder().max_capacity(2).build();
cache.insert('a', "a");
cache.insert('b', "b");
cache.run_pending_tasks();
cache.insert('c', "c1"); assert!(cache.remove(&'a').is_some()); assert!(cache.remove(&'b').is_some()); assert!(cache.remove(&'c').is_some()); cache.insert('c', "c2");
cache.run_pending_tasks();
assert_eq!(cache.get(&'c'), Some("c2"));
}
#[test]
fn test_iter() {
const NUM_KEYS: usize = 50;
fn make_value(key: usize) -> String {
format!("val: {key}")
}
let cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.build();
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}
let mut key_set = std::collections::HashSet::new();
for (key, value) in &cache {
assert_eq!(value, make_value(*key));
key_set.insert(*key);
}
assert_eq!(key_set.len(), NUM_KEYS);
}
#[test]
fn test_iter_multi_threads() {
use std::collections::HashSet;
const NUM_KEYS: usize = 1024;
const NUM_THREADS: usize = 16;
fn make_value(key: usize) -> String {
format!("val: {key}")
}
let cache = Cache::builder()
.max_capacity(2048)
.time_to_idle(Duration::from_secs(10))
.build();
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}
let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
let write_lock = rw_lock.write().unwrap();
#[allow(clippy::needless_collect)]
let handles = (0..NUM_THREADS)
.map(|n| {
let cache = cache.clone();
let rw_lock = Arc::clone(&rw_lock);
if n % 2 == 0 {
std::thread::spawn(move || {
let read_lock = rw_lock.read().unwrap();
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}
std::mem::drop(read_lock);
})
} else {
std::thread::spawn(move || {
let read_lock = rw_lock.read().unwrap();
let mut key_set = HashSet::new();
for (key, value) in &cache {
assert_eq!(value, make_value(*key));
key_set.insert(*key);
}
assert_eq!(key_set.len(), NUM_KEYS);
std::mem::drop(read_lock);
})
}
})
.collect::<Vec<_>>();
std::mem::drop(write_lock);
handles.into_iter().for_each(|h| h.join().expect("Failed"));
let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
assert_eq!(key_set.len(), NUM_KEYS);
}
#[test]
fn get_with() {
use std::thread::{sleep, spawn};
let cache = Cache::new(100);
const KEY: u32 = 0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let v = cache1.get_with(KEY, || {
sleep(Duration::from_millis(300));
"thread1"
});
assert_eq!(v, "thread1");
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let v = cache2.get_with(KEY, || unreachable!());
assert_eq!(v, "thread1");
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let v = cache3.get_with(KEY, || unreachable!());
assert_eq!(v, "thread1");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache4.get(&KEY);
assert!(maybe_v.is_none());
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let maybe_v = cache5.get(&KEY);
assert_eq!(maybe_v, Some("thread1"));
})
};
for t in [thread1, thread2, thread3, thread4, thread5] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn get_with_by_ref() {
use std::thread::{sleep, spawn};
let cache = Cache::new(100);
const KEY: &u32 = &0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let v = cache1.get_with_by_ref(KEY, || {
sleep(Duration::from_millis(300));
"thread1"
});
assert_eq!(v, "thread1");
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let v = cache2.get_with_by_ref(KEY, || unreachable!());
assert_eq!(v, "thread1");
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let v = cache3.get_with_by_ref(KEY, || unreachable!());
assert_eq!(v, "thread1");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache4.get(KEY);
assert!(maybe_v.is_none());
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let maybe_v = cache5.get(KEY);
assert_eq!(maybe_v, Some("thread1"));
})
};
for t in [thread1, thread2, thread3, thread4, thread5] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn entry_or_insert_with_if() {
use std::thread::{sleep, spawn};
let cache = Cache::new(100);
const KEY: u32 = 0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let entry = cache1.entry(KEY).or_insert_with_if(
|| {
sleep(Duration::from_millis(300));
"thread1"
},
|_v| unreachable!(),
);
assert!(entry.is_fresh());
assert_eq!(entry.into_value(), "thread1");
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let entry = cache2
.entry(KEY)
.or_insert_with_if(|| unreachable!(), |_v| unreachable!());
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), "thread1");
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(350));
let entry = cache3.entry(KEY).or_insert_with_if(
|| unreachable!(),
|v| {
assert_eq!(v, &"thread1");
false
},
);
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), "thread1");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let entry = cache4.entry(KEY).or_insert_with_if(
|| "thread4",
|v| {
assert_eq!(v, &"thread1");
true
},
);
assert!(entry.is_fresh());
assert_eq!(entry.into_value(), "thread4");
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache5.get(&KEY);
assert!(maybe_v.is_none());
})
};
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(350));
let maybe_v = cache6.get(&KEY);
assert_eq!(maybe_v, Some("thread1"));
})
};
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(450));
let maybe_v = cache7.get(&KEY);
assert_eq!(maybe_v, Some("thread4"));
})
};
for t in [
thread1, thread2, thread3, thread4, thread5, thread6, thread7,
] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn entry_by_ref_or_insert_with_if() {
use std::thread::{sleep, spawn};
let cache: Cache<u32, &str> = Cache::new(100);
const KEY: &u32 = &0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let v = cache1
.entry_by_ref(KEY)
.or_insert_with_if(
|| {
sleep(Duration::from_millis(300));
"thread1"
},
|_v| unreachable!(),
)
.into_value();
assert_eq!(v, "thread1");
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let v = cache2
.entry_by_ref(KEY)
.or_insert_with_if(|| unreachable!(), |_v| unreachable!())
.into_value();
assert_eq!(v, "thread1");
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(350));
let v = cache3
.entry_by_ref(KEY)
.or_insert_with_if(
|| unreachable!(),
|v| {
assert_eq!(v, &"thread1");
false
},
)
.into_value();
assert_eq!(v, "thread1");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let v = cache4
.entry_by_ref(KEY)
.or_insert_with_if(
|| "thread4",
|v| {
assert_eq!(v, &"thread1");
true
},
)
.into_value();
assert_eq!(v, "thread4");
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache5.get(KEY);
assert!(maybe_v.is_none());
})
};
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(350));
let maybe_v = cache6.get(KEY);
assert_eq!(maybe_v, Some("thread1"));
})
};
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(450));
let maybe_v = cache7.get(KEY);
assert_eq!(maybe_v, Some("thread4"));
})
};
for t in [
thread1, thread2, thread3, thread4, thread5, thread6, thread7,
] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn try_get_with() {
use std::{
sync::Arc,
thread::{sleep, spawn},
};
#[derive(Debug)]
pub struct MyError(#[allow(dead_code)] String);
type MyResult<T> = Result<T, Arc<MyError>>;
let cache = Cache::new(100);
const KEY: u32 = 0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let v = cache1.try_get_with(KEY, || {
sleep(Duration::from_millis(300));
Err(MyError("thread1 error".into()))
});
assert!(v.is_err());
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
assert!(v.is_err());
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let v: MyResult<_> = cache3.try_get_with(KEY, || {
sleep(Duration::from_millis(300));
Ok("thread3")
});
assert_eq!(v.unwrap(), "thread3");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(500));
let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache6.get(&KEY);
assert!(maybe_v.is_none());
})
};
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let maybe_v = cache7.get(&KEY);
assert!(maybe_v.is_none());
})
};
let thread8 = {
let cache8 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let maybe_v = cache8.get(&KEY);
assert_eq!(maybe_v, Some("thread3"));
})
};
for t in [
thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn try_get_with_by_ref() {
use std::{
sync::Arc,
thread::{sleep, spawn},
};
#[derive(Debug)]
pub struct MyError(#[allow(dead_code)] String);
type MyResult<T> = Result<T, Arc<MyError>>;
let cache = Cache::new(100);
const KEY: &u32 = &0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let v = cache1.try_get_with_by_ref(KEY, || {
sleep(Duration::from_millis(300));
Err(MyError("thread1 error".into()))
});
assert!(v.is_err());
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let v: MyResult<_> = cache2.try_get_with_by_ref(KEY, || unreachable!());
assert!(v.is_err());
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let v: MyResult<_> = cache3.try_get_with_by_ref(KEY, || {
sleep(Duration::from_millis(300));
Ok("thread3")
});
assert_eq!(v.unwrap(), "thread3");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(500));
let v: MyResult<_> = cache4.try_get_with_by_ref(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let v: MyResult<_> = cache5.try_get_with_by_ref(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache6.get(KEY);
assert!(maybe_v.is_none());
})
};
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let maybe_v = cache7.get(KEY);
assert!(maybe_v.is_none());
})
};
let thread8 = {
let cache8 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let maybe_v = cache8.get(KEY);
assert_eq!(maybe_v, Some("thread3"));
})
};
for t in [
thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn optionally_get_with() {
use std::thread::{sleep, spawn};
let cache = Cache::new(100);
const KEY: u32 = 0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let v = cache1.optionally_get_with(KEY, || {
sleep(Duration::from_millis(300));
None
});
assert!(v.is_none());
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let v = cache2.optionally_get_with(KEY, || unreachable!());
assert!(v.is_none());
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let v = cache3.optionally_get_with(KEY, || {
sleep(Duration::from_millis(300));
Some("thread3")
});
assert_eq!(v.unwrap(), "thread3");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(500));
let v = cache4.optionally_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let v = cache5.optionally_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache6.get(&KEY);
assert!(maybe_v.is_none());
})
};
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let maybe_v = cache7.get(&KEY);
assert!(maybe_v.is_none());
})
};
let thread8 = {
let cache8 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let maybe_v = cache8.get(&KEY);
assert_eq!(maybe_v, Some("thread3"));
})
};
for t in [
thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn optionally_get_with_by_ref() {
use std::thread::{sleep, spawn};
let cache = Cache::new(100);
const KEY: &u32 = &0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
let v = cache1.optionally_get_with_by_ref(KEY, || {
sleep(Duration::from_millis(300));
None
});
assert!(v.is_none());
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
let v = cache2.optionally_get_with_by_ref(KEY, || unreachable!());
assert!(v.is_none());
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let v = cache3.optionally_get_with_by_ref(KEY, || {
sleep(Duration::from_millis(300));
Some("thread3")
});
assert_eq!(v.unwrap(), "thread3");
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(500));
let v = cache4.optionally_get_with_by_ref(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let v = cache5.optionally_get_with_by_ref(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(200));
let maybe_v = cache6.get(KEY);
assert!(maybe_v.is_none());
})
};
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(400));
let maybe_v = cache7.get(KEY);
assert!(maybe_v.is_none());
})
};
let thread8 = {
let cache8 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
let maybe_v = cache8.get(KEY);
assert_eq!(maybe_v, Some("thread3"));
})
};
for t in [
thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
] {
t.join().expect("Failed to join");
}
assert!(cache.is_waiter_map_empty());
}
#[test]
fn upsert_with() {
use std::thread::{sleep, spawn};
let cache = Cache::new(100);
const KEY: u32 = 0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
cache1.entry(KEY).and_upsert_with(|maybe_entry| {
sleep(Duration::from_millis(200));
assert!(maybe_entry.is_none());
1
})
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
cache2.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
sleep(Duration::from_millis(200));
let entry = maybe_entry.expect("The entry should exist");
entry.into_value() + 1
})
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(300));
cache3.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
sleep(Duration::from_millis(100));
let entry = maybe_entry.expect("The entry should exist");
entry.into_value() + 1
})
})
};
let ent1 = thread1.join().expect("Thread 1 should finish");
let ent2 = thread2.join().expect("Thread 2 should finish");
let ent3 = thread3.join().expect("Thread 3 should finish");
assert_eq!(ent1.into_value(), 1);
assert_eq!(ent2.into_value(), 2);
assert_eq!(ent3.into_value(), 3);
assert_eq!(cache.get(&KEY), Some(3));
assert!(cache.is_waiter_map_empty());
}
#[test]
fn compute_with() {
use crate::ops::compute;
use std::{
sync::RwLock,
thread::{sleep, spawn},
};
let cache = Cache::new(100);
const KEY: u32 = 0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
cache1.entry(KEY).and_compute_with(|maybe_entry| {
sleep(Duration::from_millis(200));
assert!(maybe_entry.is_none());
compute::Op::Put(Arc::new(RwLock::new(vec![1])))
})
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
cache2.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().unwrap(), vec![1]);
sleep(Duration::from_millis(200));
value.write().unwrap().push(2);
compute::Op::Put(value)
})
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(300));
cache3.entry(KEY).and_compute_with(|maybe_entry| {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().unwrap(), vec![1, 2]);
sleep(Duration::from_millis(200));
compute::Op::Remove
})
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(500));
cache4.entry(KEY).and_compute_with(|maybe_entry| {
assert!(maybe_entry.is_none());
sleep(Duration::from_millis(200));
compute::Op::Nop
})
})
};
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(700));
cache5.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
assert!(maybe_entry.is_none());
sleep(Duration::from_millis(200));
compute::Op::Put(Arc::new(RwLock::new(vec![5])))
})
})
};
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(900));
cache6.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().unwrap(), vec![5]);
sleep(Duration::from_millis(100));
compute::Op::Nop
})
})
};
let res1 = thread1.join().expect("Thread 1 should finish");
let res2 = thread2.join().expect("Thread 2 should finish");
let res3 = thread3.join().expect("Thread 3 should finish");
let res4 = thread4.join().expect("Thread 4 should finish");
let res5 = thread5.join().expect("Thread 5 should finish");
let res6 = thread6.join().expect("Thread 6 should finish");
let compute::CompResult::Inserted(entry) = res1 else {
panic!("Expected `Inserted`. Got {res1:?}")
};
assert_eq!(
*entry.into_value().read().unwrap(),
vec![1, 2] );
let compute::CompResult::ReplacedWith(entry) = res2 else {
panic!("Expected `ReplacedWith`. Got {res2:?}")
};
assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
let compute::CompResult::Removed(entry) = res3 else {
panic!("Expected `Removed`. Got {res3:?}")
};
assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
let compute::CompResult::StillNone(key) = res4 else {
panic!("Expected `StillNone`. Got {res4:?}")
};
assert_eq!(*key, KEY);
let compute::CompResult::Inserted(entry) = res5 else {
panic!("Expected `Inserted`. Got {res5:?}")
};
assert_eq!(*entry.into_value().read().unwrap(), vec![5]);
let compute::CompResult::Unchanged(entry) = res6 else {
panic!("Expected `Unchanged`. Got {res6:?}")
};
assert_eq!(*entry.into_value().read().unwrap(), vec![5]);
assert!(cache.is_waiter_map_empty());
}
#[test]
fn try_compute_with() {
use crate::ops::compute;
use std::{
sync::RwLock,
thread::{sleep, spawn},
};
let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
const KEY: u32 = 0;
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
cache1.entry(KEY).and_try_compute_with(|maybe_entry| {
sleep(Duration::from_millis(200));
assert!(maybe_entry.is_none());
Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
})
})
};
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(100));
cache2
.entry_by_ref(&KEY)
.and_try_compute_with(|maybe_entry| {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().unwrap(), vec![1]);
sleep(Duration::from_millis(200));
value.write().unwrap().push(2);
Ok(compute::Op::Put(value)) as Result<_, ()>
})
})
};
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(300));
cache3.entry(KEY).and_try_compute_with(|maybe_entry| {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().unwrap(), vec![1, 2]);
sleep(Duration::from_millis(200));
Err(())
})
})
};
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(500));
cache4.entry(KEY).and_try_compute_with(|maybe_entry| {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().unwrap(), vec![1, 2]);
sleep(Duration::from_millis(100));
Ok(compute::Op::Remove) as Result<_, ()>
})
})
};
let res1 = thread1.join().expect("Thread 1 should finish");
let res2 = thread2.join().expect("Thread 2 should finish");
let res3 = thread3.join().expect("Thread 3 should finish");
let res4 = thread4.join().expect("Thread 4 should finish");
let Ok(compute::CompResult::Inserted(entry)) = res1 else {
panic!("Expected `Inserted`. Got {res1:?}")
};
assert_eq!(
*entry.into_value().read().unwrap(),
vec![1, 2] );
let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
panic!("Expected `ReplacedWith`. Got {res2:?}")
};
assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
assert!(res3.is_err());
let Ok(compute::CompResult::Removed(entry)) = res4 else {
panic!("Expected `Removed`. Got {res4:?}")
};
assert_eq!(
*entry.into_value().read().unwrap(),
vec![1, 2] );
assert!(cache.is_waiter_map_empty());
}
#[test]
fn handle_panic_in_get_with() {
use std::{sync::Barrier, thread};
let cache = Cache::new(16);
let barrier = Arc::new(Barrier::new(2));
{
let cache_ref = cache.clone();
let barrier_ref = barrier.clone();
thread::spawn(move || {
let _ = cache_ref.get_with(1, || {
barrier_ref.wait();
thread::sleep(Duration::from_millis(50));
panic!("Panic during get_with");
});
});
}
barrier.wait();
assert_eq!(cache.get_with(1, || 5), 5);
assert!(cache.is_waiter_map_empty());
}
#[test]
fn handle_panic_in_try_get_with() {
use std::{sync::Barrier, thread};
let cache = Cache::new(16);
let barrier = Arc::new(Barrier::new(2));
{
let cache_ref = cache.clone();
let barrier_ref = barrier.clone();
thread::spawn(move || {
let _ = cache_ref.try_get_with(1, || {
barrier_ref.wait();
thread::sleep(Duration::from_millis(50));
panic!("Panic during try_get_with");
}) as Result<_, Arc<Infallible>>;
});
}
barrier.wait();
assert_eq!(
cache.try_get_with(1, || Ok(5)) as Result<_, Arc<Infallible>>,
Ok(5)
);
assert!(cache.is_waiter_map_empty());
}
#[test]
fn test_removal_notifications() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert('a', "alice");
cache.invalidate(&'a');
expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 0);
cache.insert('b', "bob");
cache.insert('c', "cathy");
cache.insert('d', "david");
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 3);
cache.insert('e', "emily");
expected.push((Arc::new('e'), "emily", RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 3);
cache.get(&'e');
cache.run_pending_tasks();
cache.insert('e', "eliza");
expected.push((Arc::new('b'), "bob", RemovalCause::Size));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 3);
cache.insert('d', "dennis");
expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 3);
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn test_immediate_removal_notifications_with_updates() {
let actual = Arc::new(Mutex::new(Vec::new()));
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.eviction_listener(listener)
.time_to_live(Duration::from_secs(7))
.time_to_idle(Duration::from_secs(5))
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("alice", "a0");
cache.run_pending_tasks();
mock.increment(Duration::from_secs(6));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
cache.insert("alice", "a1");
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a0", RemovalCause::Expired));
a.clear();
}
cache.run_pending_tasks();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), Some("a1"));
cache.run_pending_tasks();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
cache.insert("alice", "a2");
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a1", RemovalCause::Expired));
a.clear();
}
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(6));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
cache.invalidate(&"alice");
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 0);
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a2", RemovalCause::Expired));
a.clear();
}
cache.insert("alice", "a3");
cache.run_pending_tasks();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), Some("a3"));
cache.run_pending_tasks();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
cache.invalidate(&"alice");
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), 0);
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a3", RemovalCause::Expired));
a.clear();
}
assert!(cache.key_locks_map_is_empty());
}
#[test]
#[cfg_attr(not(run_flaky_tests), ignore)]
fn test_key_lock_used_by_immediate_removal_notifications() {
use std::thread::{sleep, spawn};
const KEY: &str = "alice";
type Val = &'static str;
#[derive(PartialEq, Eq, Debug)]
enum Event {
Insert(Val),
Invalidate(Val),
BeginNotify(Val, RemovalCause),
EndNotify(Val, RemovalCause),
}
let actual = Arc::new(Mutex::new(Vec::new()));
let a0 = Arc::clone(&actual);
let listener = move |_k, v, cause| {
a0.lock().push(Event::BeginNotify(v, cause));
sleep(Duration::from_millis(300));
a0.lock().push(Event::EndNotify(v, cause));
};
let mut cache = Cache::builder()
.eviction_listener(listener)
.time_to_live(Duration::from_millis(500))
.build();
cache.reconfigure_for_testing();
let cache = cache;
let expected = vec![
Event::Insert("a0"),
Event::Insert("a1"),
Event::BeginNotify("a0", RemovalCause::Expired),
Event::Insert("a2"),
Event::EndNotify("a0", RemovalCause::Expired),
Event::BeginNotify("a1", RemovalCause::Replaced),
Event::Invalidate("a2"),
Event::EndNotify("a1", RemovalCause::Replaced),
Event::BeginNotify("a2", RemovalCause::Explicit),
Event::EndNotify("a2", RemovalCause::Explicit),
];
actual.lock().push(Event::Insert("a0"));
cache.insert(KEY, "a0");
cache.run_pending_tasks();
let thread1 = {
let a1 = Arc::clone(&actual);
let c1 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(600));
a1.lock().push(Event::Insert("a1"));
c1.insert(KEY, "a1");
})
};
let thread2 = {
let a2 = Arc::clone(&actual);
let c2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(800));
a2.lock().push(Event::Insert("a2"));
c2.insert(KEY, "a2");
})
};
let thread3 = {
let a3 = Arc::clone(&actual);
let c3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(1100));
a3.lock().push(Event::Invalidate("a2"));
c3.invalidate(&KEY);
})
};
for t in [thread1, thread2, thread3] {
t.join().expect("Failed to join");
}
let actual = actual.lock();
assert_eq!(actual.len(), expected.len());
for (i, (actual, expected)) in actual.iter().zip(&expected).enumerate() {
assert_eq!(actual, expected, "expected[{i}]");
}
assert!(cache.key_locks_map_is_empty());
}
#[test]
fn no_batch_size_limit_on_eviction() {
const MAX_CAPACITY: u64 = 20;
const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
const MAX_LOG_SYNC_REPEATS: u32 = 1;
const EVICTION_BATCH_SIZE: u32 = 1;
let hk_conf = HousekeeperConfig::new(
Some(EVICTION_TIMEOUT),
Some(MAX_LOG_SYNC_REPEATS),
Some(EVICTION_BATCH_SIZE),
);
let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY)
.eviction_policy(EvictionPolicy::lru())
.housekeeper_config(hk_conf)
.build();
cache.reconfigure_for_testing();
let cache = cache;
for i in 0..MAX_CAPACITY {
let v = format!("v{i}");
cache.insert(i, v)
}
assert_eq!(cache.entry_count(), 0);
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), MAX_CAPACITY);
for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
let v = format!("v{i}");
cache.insert(i, v)
}
assert_eq!(cache.entry_count(), MAX_CAPACITY);
assert!(cache.contains_key(&0)); assert!(cache.contains_key(&(MAX_CAPACITY - 1))); assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
cache.run_pending_tasks();
assert_eq!(cache.entry_count(), MAX_CAPACITY);
assert!(!cache.contains_key(&0));
assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
}
#[test]
fn slow_eviction_listener() {
const MAX_CAPACITY: u64 = 20;
const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
const LISTENER_DELAY: Duration = Duration::from_millis(11);
const MAX_LOG_SYNC_REPEATS: u32 = 1;
const EVICTION_BATCH_SIZE: u32 = 1;
let hk_conf = HousekeeperConfig::new(
Some(EVICTION_TIMEOUT),
Some(MAX_LOG_SYNC_REPEATS),
Some(EVICTION_BATCH_SIZE),
);
let (clock, mock) = Clock::mock();
let listener_call_count = Arc::new(AtomicU8::new(0));
let lcc = Arc::clone(&listener_call_count);
let listener = move |_k, _v, _cause| {
mock.increment(LISTENER_DELAY);
lcc.fetch_add(1, Ordering::AcqRel);
};
let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY)
.eviction_policy(EvictionPolicy::lru())
.eviction_listener(listener)
.housekeeper_config(hk_conf)
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
for i in 0..MAX_CAPACITY {
let v = format!("v{i}");
cache.insert(i, v)
}
assert_eq!(cache.entry_count(), 0);
cache.run_pending_tasks();
assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
assert_eq!(cache.entry_count(), MAX_CAPACITY);
for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
let v = format!("v{i}");
cache.insert(i, v);
}
assert_eq!(cache.entry_count(), MAX_CAPACITY);
cache.run_pending_tasks();
let mut expected_call_count = 3;
assert_eq!(
listener_call_count.load(Ordering::Acquire) as u64,
expected_call_count
);
assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
loop {
cache.run_pending_tasks();
expected_call_count += 3;
if expected_call_count > MAX_CAPACITY {
expected_call_count = MAX_CAPACITY;
}
let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
assert_eq!(actual_count, expected_call_count);
let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
assert_eq!(cache.entry_count(), expected_entry_count);
if expected_call_count >= MAX_CAPACITY {
break;
}
}
assert_eq!(cache.entry_count(), MAX_CAPACITY);
}
#[test]
fn recover_from_panicking_eviction_listener() {
#[cfg(feature = "logging")]
let _ = env_logger::builder().is_test(true).try_init();
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| {
if v == "panic now!" {
panic!("Panic now!");
}
a1.lock().push((k, v, cause))
};
let mut cache = Cache::builder()
.name("My Sync Cache")
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing();
let cache = cache;
cache.insert("alice", "a0");
cache.run_pending_tasks();
cache.insert("alice", "panic now!");
expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
cache.run_pending_tasks();
cache.insert("alice", "a2");
cache.run_pending_tasks();
cache.invalidate(&"alice");
cache.run_pending_tasks();
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn borrowed_forms_of_key() {
let cache: Cache<Vec<u8>, ()> = Cache::new(1);
let key = vec![1_u8];
cache.insert(key.clone(), ());
let key_v: &Vec<u8> = &key;
assert!(cache.contains_key(key_v));
assert_eq!(cache.get(key_v), Some(()));
cache.invalidate(key_v);
cache.insert(key, ());
let key_s: &[u8] = &[1_u8];
assert!(cache.contains_key(key_s));
assert_eq!(cache.get(key_s), Some(()));
cache.invalidate(key_s);
}
#[test]
#[cfg_attr(not(run_flaky_tests), ignore)]
fn drop_value_immediately_after_eviction() {
use crate::common::test_utils::{Counters, Value};
const MAX_CAPACITY: u32 = 500;
const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
let counters = Arc::new(Counters::default());
let counters1 = Arc::clone(&counters);
let listener = move |_k, _v, cause| match cause {
RemovalCause::Size => counters1.incl_evicted(),
RemovalCause::Explicit => counters1.incl_invalidated(),
_ => (),
};
let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY as u64)
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing();
let cache = cache;
for key in 0..KEYS {
let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
cache.insert(key, value);
counters.incl_inserted();
cache.run_pending_tasks();
}
let eviction_count = KEYS - MAX_CAPACITY;
cache.run_pending_tasks();
assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), 0, "invalidated");
assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
for key in 0..KEYS {
cache.invalidate(&key);
cache.run_pending_tasks();
}
cache.run_pending_tasks();
assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
std::mem::drop(cache);
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
#[test]
#[cfg_attr(not(run_flaky_tests), ignore)]
fn ensure_gc_runs_when_dropping_cache() {
let cache = Cache::builder().build();
let val = Arc::new(0);
{
let val = Arc::clone(&val);
cache.get_with(1, move || val);
}
drop(cache);
assert_eq!(Arc::strong_count(&val), 1);
}
#[test]
fn test_debug_format() {
let cache = Cache::new(10);
cache.insert('a', "alice");
cache.insert('b', "bob");
cache.insert('c', "cindy");
let debug_str = format!("{cache:?}");
assert!(debug_str.starts_with('{'));
assert!(debug_str.contains(r#"'a': "alice""#));
assert!(debug_str.contains(r#"'b': "bob""#));
assert!(debug_str.contains(r#"'c': "cindy""#));
assert!(debug_str.ends_with('}'));
}
type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
fn verify_notification_vec<K, V, S>(
cache: &Cache<K, V, S>,
actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
expected: &[NotificationTuple<K, V>],
) where
K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
{
const MAX_RETRIES: usize = 5;
let mut retries = 0;
loop {
cache.run_pending_tasks();
std::thread::sleep(Duration::from_millis(500));
let actual = &*actual.lock();
if actual.len() != expected.len() {
if retries <= MAX_RETRIES {
retries += 1;
continue;
} else {
assert_eq!(actual.len(), expected.len(), "Retries exhausted");
}
}
for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
assert_eq!(actual, expected, "expected[{i}]");
}
break;
}
}
#[test]
fn test_optionally_get_with_expired_entry_bug() {
use std::sync::atomic::{AtomicU32, Ordering};
struct CreateOnlyExpiry;
impl Expiry<&str, u32> for CreateOnlyExpiry {
fn expire_after_create(
&self,
_key: &&str,
_value: &u32,
_current_time: StdInstant,
) -> Option<Duration> {
Some(Duration::from_secs(2))
}
}
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.expire_after(CreateOnlyExpiry)
.clock(clock)
.build();
cache.reconfigure_for_testing();
let cache = cache;
let counter = AtomicU32::new(0);
let next_value = || {
let v = counter.fetch_add(1, Ordering::SeqCst) + 1;
Some(v)
};
let value = cache.optionally_get_with("key", next_value);
assert_eq!(value, Some(1), "First access should return 1");
cache.run_pending_tasks();
mock.increment(Duration::from_millis(1980));
let value = cache.optionally_get_with("key", next_value);
assert_eq!(value, Some(1), "Access at 1.98s should hit and return 1");
mock.increment(Duration::from_millis(30)); let value = cache.optionally_get_with("key", next_value);
assert_eq!(value, Some(2), "Access at 2.01s should miss and return 2");
cache.run_pending_tasks();
mock.increment(Duration::from_secs(10)); cache.run_pending_tasks();
let value = cache.optionally_get_with("key", next_value);
assert_ne!(value, Some(2), "Access at 12.01s should not still be 2");
}
}