use equivalent::Equivalent;
use super::{
base_cache::BaseCache,
value_initializer::{InitResult, ValueInitializer},
CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector,
WriteOp,
};
use crate::{
common::{concurrent::Weigher, time::Clock, HousekeeperConfig},
notification::AsyncEvictionListener,
ops::compute::{self, CompResult},
policy::{EvictionPolicy, ExpirationPolicy},
Entry, Policy, PredicateError,
};
#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters::CacheDebugStats;
use std::{
collections::hash_map::RandomState,
fmt,
future::Future,
hash::{BuildHasher, Hash},
pin::Pin,
sync::Arc,
};
#[cfg(test)]
use std::sync::atomic::{AtomicBool, Ordering};
/// A thread-safe, futures-aware concurrent in-memory cache.
///
/// `Cache` is a cheap-to-clone handle: `base` holds the shared cache
/// internals, and `value_initializer` coordinates concurrent
/// `get_with`-style calls so that the init future for a given key is not
/// run by more than one caller at a time.
pub struct Cache<K, V, S = RandomState> {
    // Shared cache internals (hash table, policies, write channel, housekeeper).
    pub(crate) base: BaseCache<K, V, S>,
    // Deduplicates concurrent value initializations per key.
    value_initializer: Arc<ValueInitializer<K, V, S>>,
    // Test-only: forwarded as the `should_block` argument to
    // `BaseCache::schedule_write_op` (see `insert_with_hash` /
    // `invalidate_with_hash`).
    #[cfg(test)]
    schedule_write_op_should_block: AtomicBool,
}

// SAFETY (review note): manual impl with the bounds an auto-derived one
// would require (`K`/`V`: `Send + Sync`, `S`: `Send`). Presumably some
// interior field (e.g. raw pointers in the concurrent hash table) inhibits
// the auto trait — TODO: confirm which field requires the manual impl.
unsafe impl<K, V, S> Send for Cache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Send,
{
}

// SAFETY (review note): same reasoning as the `Send` impl above; sharing a
// `&Cache` across threads only requires `S: Sync` in addition to the
// `Send + Sync` key/value bounds.
unsafe impl<K, V, S> Sync for Cache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Sync,
{
}
impl<K, V, S> Clone for Cache<K, V, S> {
    /// Makes a clone of this shared cache.
    ///
    /// This is a cheap operation: it only bumps the reference counts of the
    /// shared internals; no cached entries are copied.
    fn clone(&self) -> Self {
        let base = self.base.clone();
        let value_initializer = Arc::clone(&self.value_initializer);
        #[cfg(test)]
        let schedule_write_op_should_block =
            AtomicBool::new(self.schedule_write_op_should_block.load(Ordering::Acquire));
        Self {
            base,
            value_initializer,
            #[cfg(test)]
            schedule_write_op_should_block,
        }
    }
}
impl<K, V, S> fmt::Debug for Cache<K, V, S>
where
    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
    V: fmt::Debug + Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Formats the cache as a map of its current entries.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `entries` consumes the `(Arc<K>, V)` pairs yielded by `iter()`;
        // `Arc<K>`'s `Debug` impl delegates to `K`.
        f.debug_map().entries(self.iter()).finish()
    }
}
impl<K, V, S> Cache<K, V, S> {
    /// Returns the name of this cache, if one was set via the builder.
    pub fn name(&self) -> Option<&str> {
        self.base.name()
    }

    /// Returns a snapshot of this cache's policy configuration.
    pub fn policy(&self) -> Policy {
        self.base.policy()
    }

    /// Returns the number of entries in this cache.
    // NOTE(review): the count may lag behind recent writes until pending
    // tasks run — confirm against `BaseCache::entry_count`.
    pub fn entry_count(&self) -> u64 {
        self.base.entry_count()
    }

    /// Returns the total weighted size of the entries in this cache.
    // NOTE(review): same caveat as `entry_count` regarding pending tasks.
    pub fn weighted_size(&self) -> u64 {
        self.base.weighted_size()
    }

    /// Returns internal debug counters (only with the
    /// `unstable-debug-counters` feature).
    #[cfg(feature = "unstable-debug-counters")]
    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-debug-counters")))]
    pub async fn debug_stats(&self) -> CacheDebugStats {
        self.base.debug_stats().await
    }
}
impl<K, V> Cache<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Constructs a cache with the given `max_capacity` and defaults for
    /// everything else: no name, `RandomState` hasher, no weigher, default
    /// eviction/expiration policies, no eviction listener, invalidation
    /// closures disabled.
    pub fn new(max_capacity: u64) -> Self {
        let build_hasher = RandomState::default();
        Self::with_everything(
            None,               // name
            Some(max_capacity),
            None,               // initial_capacity
            build_hasher,
            None,               // weigher
            EvictionPolicy::default(),
            None,               // eviction_listener
            ExpirationPolicy::default(),
            HousekeeperConfig::default(),
            false,              // invalidator_enabled
            Clock::default(),
        )
    }

    /// Returns a builder for configuring and constructing a `Cache`.
    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
        CacheBuilder::default()
    }
}
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn with_everything(
name: Option<String>,
max_capacity: Option<u64>,
initial_capacity: Option<usize>,
build_hasher: S,
weigher: Option<Weigher<K, V>>,
eviction_policy: EvictionPolicy,
eviction_listener: Option<AsyncEvictionListener<K, V>>,
expiration_policy: ExpirationPolicy<K, V>,
housekeeper_config: HousekeeperConfig,
invalidator_enabled: bool,
clock: Clock,
) -> Self {
Self {
base: BaseCache::new(
name,
max_capacity,
initial_capacity,
build_hasher.clone(),
weigher,
eviction_policy,
eviction_listener,
expiration_policy,
housekeeper_config,
invalidator_enabled,
clock,
),
value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
#[cfg(test)]
schedule_write_op_should_block: Default::default(), }
}
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.base.contains_key_with_hash(key, self.base.hash(key))
}
pub async fn get<Q>(&self, key: &Q) -> Option<V>
where
Q: Equivalent<K> + Hash + ?Sized,
{
let ignore_if = None as Option<&mut fn(&V) -> bool>;
self.base
.get_with_hash(key, self.base.hash(key), ignore_if, false, true)
.await
.map(Entry::into_value)
}
pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
where
K: Hash + Eq,
{
let hash = self.base.hash(&key);
OwnedKeyEntrySelector::new(key, hash, self)
}
pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
RefKeyEntrySelector::new(key, hash, self)
}
pub async fn get_with(&self, key: K, init: impl Future<Output = V>) -> V {
futures_util::pin_mut!(init);
let hash = self.base.hash(&key);
let key = Arc::new(key);
let replace_if = None as Option<fn(&V) -> bool>;
self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
.await
.into_value()
}
pub async fn get_with_by_ref<Q>(&self, key: &Q, init: impl Future<Output = V>) -> V
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
futures_util::pin_mut!(init);
let hash = self.base.hash(key);
let replace_if = None as Option<fn(&V) -> bool>;
self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
.await
.into_value()
}
#[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
pub async fn get_with_if(
&self,
key: K,
init: impl Future<Output = V>,
replace_if: impl FnMut(&V) -> bool + Send,
) -> V {
futures_util::pin_mut!(init);
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
.await
.into_value()
}
pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
where
F: Future<Output = Option<V>>,
{
futures_util::pin_mut!(init);
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
.await
.map(Entry::into_value)
}
pub async fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
where
F: Future<Output = Option<V>>,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
futures_util::pin_mut!(init);
let hash = self.base.hash(key);
self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
.await
.map(Entry::into_value)
}
pub async fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
{
futures_util::pin_mut!(init);
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
.await
.map(Entry::into_value)
}
pub async fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
futures_util::pin_mut!(init);
let hash = self.base.hash(key);
self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
.await
.map(Entry::into_value)
}
pub async fn insert(&self, key: K, value: V) {
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.insert_with_hash(key, hash, value).await;
}
pub async fn invalidate<Q>(&self, key: &Q)
where
Q: Equivalent<K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
self.invalidate_with_hash(key, hash, false).await;
}
pub async fn remove<Q>(&self, key: &Q) -> Option<V>
where
Q: Equivalent<K> + Hash + ?Sized,
{
let hash = self.base.hash(key);
self.invalidate_with_hash(key, hash, true).await
}
    /// Discards all cached entries.
    // NOTE(review): delegation only — whether entries are dropped eagerly
    // or lazily is decided inside `BaseCache::invalidate_all`.
    pub fn invalidate_all(&self) {
        self.base.invalidate_all();
    }

    /// Registers a predicate; entries for which it returns `true` will be
    /// invalidated. Returns a `PredicateId` on success.
    // NOTE(review): presumably errors unless invalidation closures were
    // enabled on the builder (`invalidator_enabled`) — confirm.
    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
    where
        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
    {
        self.base.invalidate_entries_if(Arc::new(predicate))
    }

    /// Returns an iterator over the `(Arc<K>, V)` pairs currently cached.
    pub fn iter(&self) -> Iter<'_, K, V> {
        // `ScanningGet` must be in scope for the inner iterator to drive
        // segment scans on `BaseCache`.
        use crate::common::iter::{Iter as InnerIter, ScanningGet};
        let inner = InnerIter::with_single_cache_segment(&self.base, self.base.num_cht_segments());
        Iter::new(inner)
    }

    /// Performs any pending maintenance tasks now instead of waiting for
    /// the housekeeper.
    pub async fn run_pending_tasks(&self) {
        if let Some(hk) = &self.base.housekeeper {
            // Replay interrupted write ops first so their effects are
            // included in this maintenance pass.
            self.base.retry_interrupted_ops().await;
            hk.run_pending_tasks(Arc::clone(&self.base.inner)).await;
        }
    }
}
/// Allows `for (key, value) in &cache { ... }`; yields the same pairs as
/// [`Cache::iter`].
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    type Item = (Arc<K>, V);
    type IntoIter = Iter<'a, K, V>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
pub(crate) async fn get_or_insert_with_hash_and_fun(
&self,
key: Arc<K>,
hash: u64,
init: Pin<&mut impl Future<Output = V>>,
mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
need_key: bool,
) -> Entry<K, V> {
let maybe_entry = self
.base
.get_with_hash(&*key, hash, replace_if.as_mut(), need_key, true)
.await;
if let Some(entry) = maybe_entry {
entry
} else {
self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
.await
}
}
pub(crate) async fn get_or_insert_with_hash_by_ref_and_fun<Q>(
&self,
key: &Q,
hash: u64,
init: Pin<&mut impl Future<Output = V>>,
mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
need_key: bool,
) -> Entry<K, V>
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let maybe_entry = self
.base
.get_with_hash(key, hash, replace_if.as_mut(), need_key, true)
.await;
if let Some(entry) = maybe_entry {
entry
} else {
let key = Arc::new(key.to_owned());
self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
.await
}
}
async fn insert_with_hash_and_fun(
&self,
key: Arc<K>,
hash: u64,
init: Pin<&mut impl Future<Output = V>>,
replace_if: Option<impl FnMut(&V) -> bool + Send>,
need_key: bool,
) -> Entry<K, V> {
let k = if need_key {
Some(Arc::clone(&key))
} else {
None
};
let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
match self
.value_initializer
.try_init_or_read(&key, hash, type_id, self, replace_if, init, post_init)
.await
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Entry::new(k, v, true, false)
}
InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
InitResult::InitErr(_) => unreachable!(),
}
}
pub(crate) async fn get_or_insert_with_hash(
&self,
key: Arc<K>,
hash: u64,
init: impl FnOnce() -> V,
) -> Entry<K, V> {
match self
.base
.get_with_hash(&*key, hash, never_ignore(), true, true)
.await
{
Some(entry) => entry,
None => {
let value = init();
self.insert_with_hash(Arc::clone(&key), hash, value.clone())
.await;
Entry::new(Some(key), value, true, false)
}
}
}
pub(crate) async fn get_or_insert_with_hash_by_ref<Q>(
&self,
key: &Q,
hash: u64,
init: impl FnOnce() -> V,
) -> Entry<K, V>
where
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
match self
.base
.get_with_hash(key, hash, never_ignore(), true, true)
.await
{
Some(entry) => entry,
None => {
let key = Arc::new(key.to_owned());
let value = init();
self.insert_with_hash(Arc::clone(&key), hash, value.clone())
.await;
Entry::new(Some(key), value, true, false)
}
}
}
pub(crate) async fn get_or_optionally_insert_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
init: Pin<&mut F>,
need_key: bool,
) -> Option<Entry<K, V>>
where
F: Future<Output = Option<V>>,
{
let entry = self
.base
.get_with_hash(&*key, hash, never_ignore(), need_key, true)
.await;
if entry.is_some() {
return entry;
}
self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
.await
}
pub(crate) async fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
&self,
key: &Q,
hash: u64,
init: Pin<&mut F>,
need_key: bool,
) -> Option<Entry<K, V>>
where
F: Future<Output = Option<V>>,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
let entry = self
.base
.get_with_hash(key, hash, never_ignore(), need_key, true)
.await;
if entry.is_some() {
return entry;
}
let key = Arc::new(key.to_owned());
self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
.await
}
async fn optionally_insert_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
init: Pin<&mut F>,
need_key: bool,
) -> Option<Entry<K, V>>
where
F: Future<Output = Option<V>>,
{
let k = if need_key {
Some(Arc::clone(&key))
} else {
None
};
let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
match self
.value_initializer
.try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
.await
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Some(Entry::new(k, v, true, false))
}
InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
InitResult::InitErr(_) => None,
}
}
pub(super) async fn get_or_try_insert_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
init: Pin<&mut F>,
need_key: bool,
) -> Result<Entry<K, V>, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
{
if let Some(entry) = self
.base
.get_with_hash(&*key, hash, never_ignore(), need_key, true)
.await
{
return Ok(entry);
}
self.try_insert_with_hash_and_fun(key, hash, init, need_key)
.await
}
pub(super) async fn get_or_try_insert_with_hash_by_ref_and_fun<F, E, Q>(
&self,
key: &Q,
hash: u64,
init: Pin<&mut F>,
need_key: bool,
) -> Result<Entry<K, V>, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
{
if let Some(entry) = self
.base
.get_with_hash(key, hash, never_ignore(), need_key, true)
.await
{
return Ok(entry);
}
let key = Arc::new(key.to_owned());
self.try_insert_with_hash_and_fun(key, hash, init, need_key)
.await
}
async fn try_insert_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
init: Pin<&mut F>,
need_key: bool,
) -> Result<Entry<K, V>, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
{
let k = if need_key {
Some(Arc::clone(&key))
} else {
None
};
let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
match self
.value_initializer
.try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
.await
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Ok(Entry::new(k, v, true, false))
}
InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
InitResult::InitErr(e) => {
crossbeam_epoch::pin().flush();
Err(e)
}
}
}
    /// Inserts `value` under a pre-hashed key and schedules the resulting
    /// write op for the housekeeper.
    pub(crate) async fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
        // A disabled map (e.g. `max_capacity == 0`) stores nothing.
        if self.base.is_map_disabled() {
            return;
        }
        let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await;
        // If this future is dropped before `schedule_write_op` completes,
        // the guard hands the pending op to the interrupted-op channel so
        // `retry_interrupted_ops` can replay it later instead of losing it.
        let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts);
        cancel_guard.set_op(op.clone());
        let should_block;
        #[cfg(not(test))]
        {
            should_block = false;
        }
        #[cfg(test)]
        {
            // Tests can force `schedule_write_op` to block to exercise the
            // write-channel back-pressure path.
            should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
        }
        let hk = self.base.housekeeper.as_ref();
        let event = self.base.write_op_ch_ready_event();
        BaseCache::<K, V, S>::schedule_write_op(
            &self.base.inner,
            &self.base.write_op_ch,
            event,
            op,
            ts,
            hk,
            should_block,
        )
        .await
        .expect("Failed to schedule write op for insert");
        // The op was delivered; disarm the guard.
        cancel_guard.clear();
    }
pub(crate) async fn compute_with_hash_and_fun<F, Fut>(
&self,
key: Arc<K>,
hash: u64,
f: F,
) -> compute::CompResult<K, V>
where
F: FnOnce(Option<Entry<K, V>>) -> Fut,
Fut: Future<Output = compute::Op<V>>,
{
let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
match self
.value_initializer
.try_compute(key, hash, self, f, post_init, true)
.await
{
Ok(result) => result,
Err(_) => unreachable!(),
}
}
    /// `entry().and_try_compute_with()` backend: delegates to
    /// `ValueInitializer::try_compute` with the `try_compute_with`
    /// post-processing hook. Errors returned by `f` are passed through.
    pub(crate) async fn try_compute_with_hash_and_fun<F, Fut, E>(
        &self,
        key: Arc<K>,
        hash: u64,
        f: F,
    ) -> Result<compute::CompResult<K, V>, E>
    where
        F: FnOnce(Option<Entry<K, V>>) -> Fut,
        Fut: Future<Output = Result<compute::Op<V>, E>>,
        E: Send + Sync + 'static,
    {
        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
        self.value_initializer
            .try_compute(key, hash, self, f, post_init, true)
            .await
    }
    /// Like `try_compute_with_hash_and_fun`, but delegates to
    /// `try_compute_if_nobody_else`, which (per its name) skips running `f`
    /// when another caller is already computing for the same key.
    // NOTE(review): the exact skip semantics live in
    // `ValueInitializer::try_compute_if_nobody_else` — confirm there.
    pub(crate) async fn try_compute_if_nobody_else_with_hash_and_fun<F, Fut, E>(
        &self,
        key: Arc<K>,
        hash: u64,
        f: F,
    ) -> Result<compute::CompResult<K, V>, E>
    where
        F: FnOnce(Option<Entry<K, V>>) -> Fut,
        Fut: Future<Output = Result<compute::Op<V>, E>>,
        E: Send + Sync + 'static,
    {
        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with_if_nobody_else;
        self.value_initializer
            .try_compute_if_nobody_else(key, hash, self, f, post_init, true)
            .await
    }
pub(crate) async fn upsert_with_hash_and_fun<F, Fut>(
&self,
key: Arc<K>,
hash: u64,
f: F,
) -> Entry<K, V>
where
F: FnOnce(Option<Entry<K, V>>) -> Fut,
Fut: Future<Output = V>,
{
let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
match self
.value_initializer
.try_compute(key, hash, self, f, post_init, false)
.await
{
Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
_ => unreachable!(),
}
}
    /// Removes the entry for a pre-hashed key.
    ///
    /// With `need_value == true` (the `remove` API) a clone of the removed
    /// value is returned; `invalidate` passes `false` and always gets
    /// `None`.
    pub(crate) async fn invalidate_with_hash<Q>(
        &self,
        key: &Q,
        hash: u64,
        need_value: bool,
    ) -> Option<V>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        use futures_util::FutureExt;
        // Replay previously interrupted write ops first.
        self.base.retry_interrupted_ops().await;
        // With an eviction listener installed, take the per-key lock so
        // notifications for this key do not interleave with other removals.
        let mut kl = None;
        let mut klg = None;
        if self.base.is_removal_notifier_enabled() {
            if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
                kl = self.base.maybe_key_lock(&arc_key);
                klg = if let Some(lock) = &kl {
                    Some(lock.lock().await)
                } else {
                    None
                };
            }
        }
        match self.base.remove_entry(key, hash) {
            None => None,
            Some(kv) => {
                let now = self.base.current_time();
                let maybe_v = if need_value {
                    Some(kv.entry.value.clone())
                } else {
                    None
                };
                // Bump the entry generation so a stale pending op for the
                // removed entry can be recognized as outdated.
                let info = kv.entry.entry_info();
                let entry_gen = info.incr_entry_gen();
                let op: WriteOp<K, V> = WriteOp::Remove {
                    kv_entry: kv.clone(),
                    entry_gen,
                };
                // If this future is dropped mid-way, the guard forwards the
                // pending notification/op to the interrupted-op channel for
                // a later retry.
                let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);
                if self.base.is_removal_notifier_enabled() {
                    // `shared()` lets both the guard and this function hold
                    // handles to the same notification future.
                    let future = self
                        .base
                        .notify_invalidate(&kv.key, &kv.entry)
                        .boxed()
                        .shared();
                    cancel_guard.set_future_and_op(future.clone(), op.clone());
                    future.await;
                    // Notification delivered; only the op remains pending.
                    cancel_guard.unset_future();
                } else {
                    cancel_guard.set_op(op.clone());
                }
                // Release the per-key lock before scheduling the write op.
                std::mem::drop(klg);
                std::mem::drop(kl);
                let should_block;
                #[cfg(not(test))]
                {
                    should_block = false;
                }
                #[cfg(test)]
                {
                    // Tests can force `schedule_write_op` to block.
                    should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
                }
                let event = self.base.write_op_ch_ready_event();
                let hk = self.base.housekeeper.as_ref();
                BaseCache::<K, V, S>::schedule_write_op(
                    &self.base.inner,
                    &self.base.write_op_ch,
                    event,
                    op,
                    now,
                    hk,
                    should_block,
                )
                .await
                .expect("Failed to schedule write op for remove");
                // Op delivered; disarm the guard and flush epoch GC.
                cancel_guard.clear();
                crossbeam_epoch::pin().flush();
                maybe_v
            }
        }
    }
}
// Test-only helpers with no trait bounds, usable from any unit test.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S> {
    /// Returns `true` when the cache holds no entries.
    pub(crate) fn is_table_empty(&self) -> bool {
        self.entry_count() == 0
    }

    /// Returns `true` when no `get_with`-style waiters are registered.
    pub(crate) fn is_waiter_map_empty(&self) -> bool {
        self.value_initializer.waiter_count() == 0
    }
}
// Test-only introspection that needs the full cache trait bounds.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Number of registered `invalidate_entries_if` predicates.
    fn invalidation_predicate_count(&self) -> usize {
        self.base.invalidation_predicate_count()
    }

    /// Switches the base cache into its deterministic test configuration.
    async fn reconfigure_for_testing(&mut self) {
        self.base.reconfigure_for_testing().await;
    }

    /// Returns `true` when no per-key notification locks are held.
    fn key_locks_map_is_empty(&self) -> bool {
        self.base.key_locks_map_is_empty()
    }

    /// How many times `run_pending_tasks` was started.
    /// Panics if the housekeeper is not set.
    fn run_pending_tasks_initiation_count(&self) -> usize {
        self.base
            .housekeeper
            .as_ref()
            .map(|hk| hk.start_count.load(Ordering::Acquire))
            .expect("housekeeper is not set")
    }

    /// How many times `run_pending_tasks` ran to completion.
    /// Panics if the housekeeper is not set.
    fn run_pending_tasks_completion_count(&self) -> usize {
        self.base
            .housekeeper
            .as_ref()
            .map(|hk| hk.complete_count.load(Ordering::Acquire))
            .expect("housekeeper is not set")
    }
}
/// Returns the `None` "ignore" predicate used by callers of
/// `get_with_hash` that never want to skip an existing entry. The concrete
/// `fn`-pointer type exists only to pin down the generic parameter of the
/// `Option`.
#[inline]
fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
    None
}
#[cfg(test)]
mod tests {
use super::Cache;
use crate::{
common::{time::Clock, HousekeeperConfig},
future::FutureExt,
notification::{ListenerFuture, RemovalCause},
ops::compute,
policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
Expiry,
};
use async_lock::{Barrier, Mutex};
use std::{
convert::Infallible,
sync::{
atomic::{AtomicU32, AtomicU8, Ordering},
Arc,
},
time::{Duration, Instant as StdInstant},
vec,
};
use tokio::time::sleep;
#[test]
fn futures_are_send() {
let cache = Cache::new(0);
fn is_send(_: impl Send) {}
is_send(cache.get(&()));
is_send(cache.get_with((), async {}));
is_send(cache.get_with_by_ref(&(), async {}));
#[allow(deprecated)]
is_send(cache.get_with_if((), async {}, |_| false));
is_send(cache.insert((), ()));
is_send(cache.invalidate(&()));
is_send(cache.optionally_get_with((), async { None }));
is_send(cache.optionally_get_with_by_ref(&(), async { None }));
is_send(cache.remove(&()));
is_send(cache.run_pending_tasks());
is_send(cache.try_get_with((), async { Err(()) }));
is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));
is_send(
cache
.entry(())
.and_compute_with(|_| async { compute::Op::Nop }),
);
is_send(
cache
.entry(())
.and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
);
is_send(cache.entry(()).and_upsert_with(|_| async {}));
is_send(cache.entry(()).or_default());
is_send(cache.entry(()).or_insert(()));
is_send(cache.entry(()).or_insert_with(async {}));
is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
is_send(cache.entry(()).or_optionally_insert_with(async { None }));
is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));
is_send(
cache
.entry_by_ref(&())
.and_compute_with(|_| async { compute::Op::Nop }),
);
is_send(
cache
.entry_by_ref(&())
.and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
);
is_send(cache.entry_by_ref(&()).and_upsert_with(|_| async {}));
is_send(cache.entry_by_ref(&()).or_default());
is_send(cache.entry_by_ref(&()).or_insert(()));
is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
is_send(
cache
.entry_by_ref(&())
.or_insert_with_if(async {}, |_| false),
);
is_send(
cache
.entry_by_ref(&())
.or_optionally_insert_with(async { None }),
);
is_send(
cache
.entry_by_ref(&())
.or_try_insert_with(async { Err(()) }),
);
}
    // A cache with `max_capacity == 0` must never store anything, even
    // before pending tasks run.
    #[tokio::test]
    async fn max_capacity_zero() {
        let mut cache = Cache::new(0);
        cache.reconfigure_for_testing().await;
        // Make the cache immutable again.
        let cache = cache;

        cache.insert(0, ()).await;

        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).await.is_none());
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).await.is_none());
        assert_eq!(cache.entry_count(), 0)
    }
    // Exercises insert/get/invalidate/remove with the default (TinyLFU)
    // eviction policy and checks the eviction-listener notifications against
    // an expected log.
    #[tokio::test]
    async fn basic_single_async_task() {
        // `actual` collects listener notifications; `expected` is built up
        // alongside the operations that should trigger them.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        let mut cache = Cache::builder()
            .max_capacity(3)
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.insert("b", "bob").await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        cache.run_pending_tasks().await;

        cache.insert("c", "cindy").await;
        assert_eq!(cache.get(&"c").await, Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks().await;

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;

        // "d" has low frequency, so the admission policy rejects it (the
        // new entry itself is evicted with `RemovalCause::Size`).
        cache.insert("d", "david").await;
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None);
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", "david").await;
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d").await, None);

        // "d"'s frequency is now high enough to be admitted, evicting "c".
        cache.insert("d", "dennis").await;
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert_eq!(cache.get(&"c").await, None);
        assert_eq!(cache.get(&"d").await, Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        cache.invalidate(&"b").await;
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"b"));

        assert!(cache.remove(&"b").await.is_none());
        assert_eq!(cache.remove(&"d").await, Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None);
        assert!(!cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected).await;
        assert!(cache.key_locks_map_is_empty());
    }
    // Same shape as `basic_single_async_task`, but with the LRU eviction
    // policy: new entries are always admitted and the least-recently-used
    // entry is evicted instead.
    #[tokio::test]
    async fn basic_lru_single_thread() {
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_policy(EvictionPolicy::lru())
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.insert("b", "bob").await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        cache.run_pending_tasks().await;

        cache.insert("c", "cindy").await;
        assert_eq!(cache.get(&"c").await, Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks().await;

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;

        // Under LRU, inserting "d" evicts "c" (the least recently used).
        cache.insert("d", "david").await;
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert_eq!(cache.get(&"c").await, None);
        assert_eq!(cache.get(&"d").await, Some("david"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));
        cache.run_pending_tasks().await;

        cache.invalidate(&"b").await;
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"b"));

        assert!(cache.remove(&"b").await.is_none());
        assert_eq!(cache.remove(&"d").await, Some("david"));
        expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None);
        assert!(!cache.contains_key(&"d"));

        // Refill: with "a" now the LRU entry, the third insert evicts it.
        cache.insert("e", "emily").await;
        cache.insert("f", "frank").await;
        cache.insert("g", "gina").await;
        expected.push((Arc::new("a"), "alice", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"e").await, Some("emily"));
        assert_eq!(cache.get(&"f").await, Some("frank"));
        assert_eq!(cache.get(&"g").await, Some("gina"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"e"));
        assert!(cache.contains_key(&"f"));
        assert!(cache.contains_key(&"g"));

        verify_notification_vec(&cache, actual, &expected).await;
        assert!(cache.key_locks_map_is_empty());
    }
    // Weigher-based (size-aware) eviction: capacity counts weights, not
    // entries. Each value carries its weight in the second tuple field.
    #[tokio::test]
    async fn size_aware_eviction() {
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        let mut cache = Cache::builder()
            .max_capacity(31)
            .weigher(weigher)
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;
        let cache = cache;

        // Total weight 25/31.
        cache.insert("a", alice).await;
        cache.insert("b", bob).await;
        assert_eq!(cache.get(&"a").await, Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b").await, Some(bob));
        cache.run_pending_tasks().await;

        // Total weight 30/31.
        cache.insert("c", cindy).await;
        assert_eq!(cache.get(&"c").await, Some(cindy));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks().await;

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a").await, Some(alice));
        assert_eq!(cache.get(&"b").await, Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;

        // "d" (weight 15) is repeatedly rejected by the admission policy
        // until its access frequency is high enough.
        cache.insert("d", david).await;
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None);
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david).await;
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d").await, None);

        cache.insert("d", david).await;
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None);
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david).await;
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d").await, None);

        // Now admitted: evicts "c" and "a" to free enough weight.
        cache.insert("d", dennis).await;
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, Some(bob));
        assert_eq!(cache.get(&"c").await, None);
        assert_eq!(cache.get(&"d").await, Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Replacing "b" with a heavier value overflows capacity → "d" goes.
        cache.insert("b", bill).await;
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"b").await, Some(bill));
        assert_eq!(cache.get(&"d").await, None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-insert "a" and downgrade "b" back to the lighter value.
        cache.insert("a", alice).await;
        cache.insert("b", bob).await;
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, Some(alice));
        assert_eq!(cache.get(&"b").await, Some(bob));
        assert_eq!(cache.get(&"d").await, None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Final state: 2 entries, total weight 10 + 15 = 25.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        verify_notification_vec(&cache, actual, &expected).await;
        assert!(cache.key_locks_map_is_empty());
    }
    // Hammers one shared cache from both tokio tasks and OS threads (via
    // `Handle::block_on`), synchronized by a barrier, then checks the final
    // state is consistent.
    #[tokio::test]
    async fn basic_multi_async_tasks() {
        let num_tasks = 2;
        let num_threads = 2;

        let cache = Cache::new(100);
        let barrier = Arc::new(Barrier::new(num_tasks + num_threads as usize));

        let tasks = (0..num_tasks)
            .map(|id| {
                let cache = cache.clone();
                let barrier = Arc::clone(&barrier);

                tokio::spawn(async move {
                    barrier.wait().await;

                    cache.insert(10, format!("{id}-100")).await;
                    cache.get(&10).await;
                    cache.insert(20, format!("{id}-200")).await;
                    cache.invalidate(&10).await;
                })
            })
            .collect::<Vec<_>>();

        let threads = (0..num_threads)
            .map(|id| {
                let cache = cache.clone();
                let barrier = Arc::clone(&barrier);
                let rt = tokio::runtime::Handle::current();

                std::thread::spawn(move || {
                    rt.block_on(barrier.wait());

                    rt.block_on(cache.insert(10, format!("{id}-100")));
                    rt.block_on(cache.get(&10));
                    rt.block_on(cache.insert(20, format!("{id}-200")));
                    rt.block_on(cache.invalidate(&10));
                })
            })
            .collect::<Vec<_>>();

        let _ = futures_util::future::join_all(tasks).await;
        threads.into_iter().for_each(|t| t.join().unwrap());

        // Every worker invalidates key 10 last and leaves key 20 inserted.
        assert!(cache.get(&10).await.is_none());
        assert!(cache.get(&20).await.is_some());
        assert!(!cache.contains_key(&10));
        assert!(cache.contains_key(&20));
    }
#[tokio::test]
async fn invalidate_all() {
// `invalidate_all` must remove every entry present at the time of the call,
// reporting each to the eviction listener with `RemovalCause::Explicit`,
// while entries inserted afterwards are unaffected.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
// Listener records every (key, value, cause) notification for later
// comparison against `expected`.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let mut cache = Cache::builder()
.max_capacity(100)
.async_eviction_listener(listener)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert("a", "alice").await;
cache.insert("b", "bob").await;
cache.insert("c", "cindy").await;
assert_eq!(cache.get(&"a").await, Some("alice"));
assert_eq!(cache.get(&"b").await, Some("bob"));
assert_eq!(cache.get(&"c").await, Some("cindy"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(cache.contains_key(&"c"));
cache.invalidate_all();
expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
cache.run_pending_tasks().await;
// "d" is inserted after the invalidation, so it must survive it.
cache.insert("d", "david").await;
cache.run_pending_tasks().await;
assert!(cache.get(&"a").await.is_none());
assert!(cache.get(&"b").await.is_none());
assert!(cache.get(&"c").await.is_none());
assert_eq!(cache.get(&"d").await, Some("david"));
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
verify_notification_vec(&cache, actual, &expected).await;
}
#[tokio::test]
async fn invalidate_all_without_running_pending_tasks() {
    // `invalidate_all` must be visible to readers immediately, without an
    // intervening `run_pending_tasks` call.
    let cache = Cache::new(1024);
    let key = 0;
    assert_eq!(cache.get(&key).await, None, "cache starts empty");
    cache.insert(key, 1).await;
    assert_eq!(cache.get(&key).await, Some(1), "entry readable after insert");
    cache.invalidate_all();
    assert_eq!(cache.get(&key).await, None, "entry gone right after invalidate_all");
}
#[tokio::test]
async fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
// Closure-based invalidation: a registered predicate removes only the
// entries inserted *before* it was registered that match it; matching
// entries inserted later are kept. Removals are reported to the listener
// with `RemovalCause::Explicit`.
use std::collections::HashSet;
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
// Listener records every notification for later comparison.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.support_invalidation_closures()
.async_eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert(0, "alice").await;
cache.insert(1, "bob").await;
cache.insert(2, "alex").await;
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // advance the clock past the insert timestamps
assert_eq!(cache.get(&0).await, Some("alice"));
assert_eq!(cache.get(&1).await, Some("bob"));
assert_eq!(cache.get(&2).await, Some("alex"));
assert!(cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
// Predicate matches entries 0 ("alice") and 2 ("alex"), but not 1 ("bob").
let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
assert_eq!(cache.invalidation_predicate_count(), 1);
expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
mock.increment(Duration::from_secs(5));
// Key 3 matches the predicate by value but is inserted after registration,
// so it must NOT be invalidated.
cache.insert(3, "alice").await;
// NOTE(review): blocking `std::thread::sleep` inside an async test;
// presumably this gives the background invalidation scan time to finish —
// consider tokio's async sleep instead. TODO confirm intent.
cache.run_pending_tasks().await; std::thread::sleep(Duration::from_millis(200));
cache.run_pending_tasks().await; std::thread::sleep(Duration::from_millis(200));
assert!(cache.get(&0).await.is_none());
assert!(cache.get(&2).await.is_none());
assert_eq!(cache.get(&1).await, Some("bob"));
assert_eq!(cache.get(&3).await, Some("alice"));
assert!(!cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(!cache.contains_key(&2));
assert!(cache.contains_key(&3));
assert_eq!(cache.entry_count(), 2);
// A finished predicate is dropped from the registry.
assert_eq!(cache.invalidation_predicate_count(), 0);
mock.increment(Duration::from_secs(5));
// Two predicates at once: together they cover the two remaining entries.
cache.invalidate_entries_if(|_k, &v| v == "alice")?;
cache.invalidate_entries_if(|_k, &v| v == "bob")?;
assert_eq!(cache.invalidation_predicate_count(), 2);
expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
cache.run_pending_tasks().await; std::thread::sleep(Duration::from_millis(200));
cache.run_pending_tasks().await; std::thread::sleep(Duration::from_millis(200));
assert!(cache.get(&1).await.is_none());
assert!(cache.get(&3).await.is_none());
assert!(!cache.contains_key(&1));
assert!(!cache.contains_key(&3));
assert_eq!(cache.entry_count(), 0);
assert_eq!(cache.invalidation_predicate_count(), 0);
verify_notification_vec(&cache, actual, &expected).await;
Ok(())
}
#[tokio::test]
async fn time_to_live() {
// TTL = 10s with a mock clock: entries expire exactly 10s after their last
// *write* (insert or replace); expirations are reported as
// `RemovalCause::Expired` and replacements as `RemovalCause::Replaced`.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
// Listener records every notification for later comparison.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_live(Duration::from_secs(10))
.async_eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert("a", "alice").await;
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // 5s: half the TTL, still alive
assert_eq!(cache.get(&"a").await, Some("alice"));
assert!(cache.contains_key(&"a"));
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); // 10s since insert: expired
assert_eq!(cache.get(&"a").await, None);
assert!(!cache.contains_key(&"a"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks().await;
assert!(cache.is_table_empty());
cache.insert("b", "bob").await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // 5s after "b" was written
assert_eq!(cache.get(&"b").await, Some("bob"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
// Replacing the value resets the TTL clock for "b".
cache.insert("b", "bill").await;
expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // 5s after the replace: alive
assert_eq!(cache.get(&"b").await, Some("bill"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("b"), "bill", RemovalCause::Expired)); // 10s after the replace: expired
assert_eq!(cache.get(&"a").await, None);
assert_eq!(cache.get(&"b").await, None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks().await;
assert!(cache.is_table_empty());
verify_notification_vec(&cache, actual, &expected).await;
}
#[tokio::test]
async fn time_to_idle() {
// TTI = 10s with a mock clock: an entry expires 10s after its last read or
// write. A `get` resets the idle timer; `contains_key` does not (see the
// 2s-step below — "a" still expires 10s after its last `get`).
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
// Listener records every notification for later comparison.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.async_eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert("a", "alice").await; // t=0
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // t=5
assert_eq!(cache.get(&"a").await, Some("alice")); // read resets "a"'s idle timer
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // t=10
cache.insert("b", "bob").await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(2)); cache.run_pending_tasks().await; // t=12
assert!(cache.contains_key(&"a")); // no idle-timer reset here
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(3)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); // t=15: 10s since "a"'s t=5 read
assert_eq!(cache.get(&"a").await, None);
assert_eq!(cache.get(&"b").await, Some("bob")); // resets "b"'s idle timer at t=15
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 1);
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(10)); expected.push((Arc::new("b"), "bob", RemovalCause::Expired)); // t=25: 10s since "b"'s t=15 read
assert_eq!(cache.get(&"a").await, None);
assert_eq!(cache.get(&"b").await, None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks().await;
assert!(cache.is_table_empty());
verify_notification_vec(&cache, actual, &expected).await;
}
#[tokio::test]
async fn ensure_access_time_is_updated_immediately_after_read() {
    // A read must refresh the entry's access time right away: the second
    // `get` below happens 6s after the insert with TTI = 5s, so it only
    // survives if the first `get` (at 4s) already reset the idle timer.
    let (clock, clock_ctl) = Clock::mock();
    let mut cache = Cache::builder()
        .max_capacity(10)
        .time_to_idle(Duration::from_secs(5))
        .clock(clock)
        .build();
    cache.reconfigure_for_testing().await;
    let cache = cache;

    cache.insert(1, 1).await;

    // 4s in: within the TTI window; this read resets the idle timer.
    clock_ctl.increment(Duration::from_secs(4));
    assert_eq!(cache.get(&1).await, Some(1));

    // 6s after insert, but only 2s after the last read — still alive.
    clock_ctl.increment(Duration::from_secs(2));
    assert_eq!(cache.get(&1).await, Some(1));

    // Housekeeping must not expire it either.
    cache.run_pending_tasks().await;
    assert_eq!(cache.get(&1).await, Some(1));
}
#[tokio::test]
async fn time_to_live_by_expiry_type() {
// Same scenario as `time_to_live`, but the 10s TTL comes from an `Expiry`
// implementation instead of the builder's `time_to_live`. The counters
// verify that `expire_after_create` / `expire_after_update` are called
// exactly once per insert / replace.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Expiry that always answers "10 seconds", counting each callback.
struct MyExpiry {
counters: Arc<ExpiryCallCounters>,
}
impl MyExpiry {
fn new(counters: Arc<ExpiryCallCounters>) -> Self {
Self { counters }
}
}
impl Expiry<&str, &str> for MyExpiry {
fn expire_after_create(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_creations();
Some(Duration::from_secs(10))
}
fn expire_after_update(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
) -> Option<Duration> {
self.counters.incl_actual_updates();
Some(Duration::from_secs(10))
}
}
let a1 = Arc::clone(&actual);
// Listener records every notification for later comparison.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let expiry_counters = Arc::new(ExpiryCallCounters::default());
let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.async_eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert("a", "alice").await;
expiry_counters.incl_expected_creations(); // first insert -> one create callback
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // 5s: half the TTL, alive
assert_eq!(cache.get(&"a").await, Some("alice"));
assert!(cache.contains_key(&"a"));
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); // 10s since insert: expired
assert_eq!(cache.get(&"a").await, None);
assert!(!cache.contains_key(&"a"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks().await;
assert!(cache.is_table_empty());
cache.insert("b", "bob").await;
expiry_counters.incl_expected_creations();
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await;
assert_eq!(cache.get(&"b").await, Some("bob"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
// Replacing "b" triggers the update callback and restarts the 10s TTL.
cache.insert("b", "bill").await;
expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
expiry_counters.incl_expected_updates();
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // 5s after the replace: alive
assert_eq!(cache.get(&"b").await, Some("bill"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("b"), "bill", RemovalCause::Expired)); // 10s after the replace: expired
assert_eq!(cache.get(&"a").await, None);
assert_eq!(cache.get(&"b").await, None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks().await;
assert!(cache.is_table_empty());
expiry_counters.verify(); // expected vs actual callback counts must match
verify_notification_vec(&cache, actual, &expected).await;
}
#[tokio::test]
async fn time_to_idle_by_expiry_type() {
// Same scenario as `time_to_idle`, but the 10s idle timeout is supplied by
// an `Expiry` whose `expire_after_read` returns 10s. The counters verify
// the read callback fires once per successful `get` of a live entry.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Expiry that answers "10 seconds" on every read, counting the calls.
struct MyExpiry {
counters: Arc<ExpiryCallCounters>,
}
impl MyExpiry {
fn new(counters: Arc<ExpiryCallCounters>) -> Self {
Self { counters }
}
}
impl Expiry<&str, &str> for MyExpiry {
fn expire_after_read(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
_last_modified_at: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_reads();
Some(Duration::from_secs(10))
}
}
let a1 = Arc::clone(&actual);
// Listener records every notification for later comparison.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let expiry_counters = Arc::new(ExpiryCallCounters::default());
let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.async_eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert("a", "alice").await; // t=0
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // t=5
assert_eq!(cache.get(&"a").await, Some("alice")); // read -> 10s extension from t=5
expiry_counters.incl_expected_reads();
mock.increment(Duration::from_secs(5)); cache.run_pending_tasks().await; // t=10
cache.insert("b", "bob").await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(2)); cache.run_pending_tasks().await; // t=12
assert!(cache.contains_key(&"a")); // `contains_key` triggers no read callback
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(3)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); // t=15: 10s since "a"'s t=5 read
assert_eq!(cache.get(&"a").await, None); // expired read -> no callback expected
assert_eq!(cache.get(&"b").await, Some("bob"));
expiry_counters.incl_expected_reads(); // only the live "b" read counts
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 1);
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(10)); expected.push((Arc::new("b"), "bob", RemovalCause::Expired)); // t=25: 10s since "b"'s t=15 read
assert_eq!(cache.get(&"a").await, None);
assert_eq!(cache.get(&"b").await, None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.run_pending_tasks().await;
assert!(cache.is_table_empty());
expiry_counters.verify(); // expected vs actual read-callback counts must match
verify_notification_vec(&cache, actual, &expected).await;
}
#[tokio::test]
async fn test_expiry_using_get_with() {
// Verifies which `Expiry` callbacks `get_with` triggers: a miss that
// resolves the init future counts as a *creation*, a hit counts as a
// *read*, and `get_with` must never fire the *update* callback (it never
// replaces an existing value).
struct NoExpiry {
counters: Arc<ExpiryCallCounters>,
}
impl NoExpiry {
fn new(counters: Arc<ExpiryCallCounters>) -> Self {
Self { counters }
}
}
impl Expiry<&str, &str> for NoExpiry {
fn expire_after_create(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_creations();
None
}
fn expire_after_read(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
_last_modified_at: StdInstant,
) -> Option<Duration> {
self.counters.incl_actual_reads();
None
}
fn expire_after_update(
&self,
_key: &&str,
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
) -> Option<Duration> {
unreachable!("The `expire_after_update()` method should not be called.");
}
}
let expiry_counters = Arc::new(ExpiryCallCounters::default());
let expiry = NoExpiry::new(Arc::clone(&expiry_counters));
let mut cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
// Miss: the init future runs and inserts "alice" -> one create callback.
cache.get_with("a", async { "alice" }).await;
expiry_counters.incl_expected_creations();
cache.run_pending_tasks().await;
// Hit: "alex" is never inserted -> one read callback, no update.
cache.get_with("a", async { "alex" }).await;
expiry_counters.incl_expected_reads();
cache.run_pending_tasks().await;
cache.invalidate("a").await;
// Miss again after the invalidation -> another create callback.
cache.get_with("a", async { "amanda" }).await;
expiry_counters.incl_expected_creations();
cache.run_pending_tasks().await;
expiry_counters.verify();
}
#[tokio::test]
async fn test_race_between_updating_entry_and_processing_its_write_ops() {
// Regression-style test: update and remove an entry whose earlier write ops
// have not been processed yet (no `run_pending_tasks` in between), with the
// clock advanced past the 1s TTI mid-stream. After housekeeping, nothing
// may be left behind.
let (clock, mock) = Clock::mock();
let cache = Cache::builder()
.max_capacity(2)
.time_to_idle(Duration::from_secs(1))
.clock(clock)
.build();
cache.insert("a", "alice").await;
cache.insert("b", "bob").await;
cache.insert("c", "cathy").await; mock.increment(Duration::from_secs(2)); // 2s > TTI while "c"'s write op is still pending
cache.insert("c", "cindy").await; // update "c" before its first write op is processed
assert_eq!(cache.remove(&"c").await, Some("cindy")); // remove returns the latest value
mock.increment(Duration::from_secs(2));
cache.run_pending_tasks().await;
// "a"/"b" idle-expired, "c" removed: the cache must end up empty.
assert_eq!(cache.entry_count(), 0);
}
#[tokio::test]
async fn test_race_between_recreating_entry_and_processing_its_write_ops() {
    // Remove and re-create the same key before the write ops from the first
    // insert of 'c' have been processed; the final value of 'c' must be the
    // one from the last insert.
    let cache = Cache::builder().max_capacity(2).build();

    cache.insert('a', "a").await;
    cache.insert('b', "b").await;
    cache.run_pending_tasks().await;

    // Everything below happens before the next `run_pending_tasks` call, in
    // exactly this order.
    cache.insert('c', "c1").await;
    assert!(cache.remove(&'a').await.is_some());
    assert!(cache.remove(&'b').await.is_some());
    assert!(cache.remove(&'c').await.is_some());
    cache.insert('c', "c2").await;

    cache.run_pending_tasks().await;
    assert_eq!(cache.get(&'c').await, Some("c2"));
}
#[tokio::test]
async fn test_iter() {
    // Iterating the cache must visit every live entry exactly once, each
    // paired with its current value.
    const NUM_KEYS: usize = 50;

    fn make_value(key: usize) -> String {
        format!("val: {key}")
    }

    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_idle(Duration::from_secs(10))
        .build();

    for key in 0..NUM_KEYS {
        cache.insert(key, make_value(key)).await;
    }

    let mut seen = std::collections::HashSet::new();
    for (key, value) in &cache {
        assert_eq!(value, make_value(*key), "value must match its key");
        seen.insert(*key);
    }
    assert_eq!(seen.len(), NUM_KEYS, "every key visited exactly once");
}
#[tokio::test]
async fn test_iter_multi_async_tasks() {
// Iterates the cache while other tasks concurrently re-insert the same
// keys. A write lock holds every spawned task at its `read()` call until
// all tasks exist; dropping it releases them all at once.
use std::collections::HashSet;
const NUM_KEYS: usize = 1024;
const NUM_TASKS: usize = 16;
fn make_value(key: usize) -> String {
format!("val: {key}")
}
let cache = Cache::builder()
.max_capacity(2048)
.time_to_idle(Duration::from_secs(10))
.build();
// Pre-populate so iterating tasks always see NUM_KEYS entries.
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key)).await;
}
let rw_lock = Arc::new(tokio::sync::RwLock::<()>::default());
let write_lock = rw_lock.write().await;
let tasks = (0..NUM_TASKS)
.map(|n| {
let cache = cache.clone();
let rw_lock = Arc::clone(&rw_lock);
// Even tasks rewrite every key (with the same deterministic value);
// odd tasks iterate and validate concurrently.
if n % 2 == 0 {
tokio::spawn(async move {
let read_lock = rw_lock.read().await;
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key)).await;
}
std::mem::drop(read_lock);
})
} else {
tokio::spawn(async move {
let read_lock = rw_lock.read().await;
let mut key_set = HashSet::new();
for (key, value) in &cache {
assert_eq!(value, make_value(*key));
key_set.insert(*key);
}
assert_eq!(key_set.len(), NUM_KEYS);
std::mem::drop(read_lock);
})
}
})
.collect::<Vec<_>>();
// Release all tasks simultaneously and wait for them to finish.
std::mem::drop(write_lock);
let _ = futures_util::future::join_all(tasks).await;
// After the churn, the key set must still be exactly 0..NUM_KEYS.
let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
assert_eq!(key_set.len(), NUM_KEYS);
}
#[tokio::test]
async fn get_with() {
    // Five futures race on one key. Timeline (ms):
    //   0:   task1 starts `get_with`; its init future takes 300ms.
    //   100: task2 calls `get_with` -> joins task1's init; no second init runs.
    //   200: task4's plain `get` -> None (nothing inserted yet).
    //   300: task1's init resolves and inserts "task1".
    //   400: task3's `get_with` and task5's `get` both observe "task1".
    let cache = Cache::new(100);
    const KEY: u32 = 0;

    let task1 = {
        let c = cache.clone();
        async move {
            let v = c
                .get_with(KEY, async {
                    sleep(Duration::from_millis(300)).await;
                    "task1"
                })
                .await;
            assert_eq!(v, "task1");
        }
    };

    let task2 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(100)).await;
            // Must not evaluate its own init future.
            let v = c.get_with(KEY, async { unreachable!() }).await;
            assert_eq!(v, "task1");
        }
    };

    let task3 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(400)).await;
            let v = c.get_with(KEY, async { unreachable!() }).await;
            assert_eq!(v, "task1");
        }
    };

    let task4 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(200)).await;
            assert!(c.get(&KEY).await.is_none());
        }
    };

    let task5 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(400)).await;
            assert_eq!(c.get(&KEY).await, Some("task1"));
        }
    };

    futures_util::join!(task1, task2, task3, task4, task5);
    // No per-key waiter entries may be left behind.
    assert!(cache.is_waiter_map_empty());
}
#[tokio::test]
async fn get_with_by_ref() {
    // Same choreography as `get_with`, exercising the borrowed-key variant.
    // Timeline (ms):
    //   0:   task1 starts `get_with_by_ref`; its init future takes 300ms.
    //   100: task2 joins task1's in-flight init; no second init runs.
    //   200: task4's plain `get` -> None (nothing inserted yet).
    //   300: task1's init resolves and inserts "task1".
    //   400: task3's `get_with_by_ref` and task5's `get` observe "task1".
    let cache = Cache::new(100);
    const KEY: &u32 = &0;

    let task1 = {
        let c = cache.clone();
        async move {
            let v = c
                .get_with_by_ref(KEY, async {
                    sleep(Duration::from_millis(300)).await;
                    "task1"
                })
                .await;
            assert_eq!(v, "task1");
        }
    };

    let task2 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(100)).await;
            // Must not evaluate its own init future.
            let v = c.get_with_by_ref(KEY, async { unreachable!() }).await;
            assert_eq!(v, "task1");
        }
    };

    let task3 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(400)).await;
            let v = c.get_with_by_ref(KEY, async { unreachable!() }).await;
            assert_eq!(v, "task1");
        }
    };

    let task4 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(200)).await;
            assert!(c.get(KEY).await.is_none());
        }
    };

    let task5 = {
        let c = cache.clone();
        async move {
            sleep(Duration::from_millis(400)).await;
            assert_eq!(c.get(KEY).await, Some("task1"));
        }
    };

    futures_util::join!(task1, task2, task3, task4, task5);
    // No per-key waiter entries may be left behind.
    assert!(cache.is_waiter_map_empty());
}
#[tokio::test]
async fn entry_or_insert_with_if() {
// `or_insert_with_if(init, replace_if)` runs `init` when the key is absent
// OR when `replace_if` returns true for the current value. Timeline (ms):
//   0:   task1 starts the init future (300ms) and will insert "task1".
//   100: task2 joins task1's in-flight init; neither its init nor its
//        predicate may run.
//   200: task5's plain `get` -> None (not inserted yet).
//   300: task1's init resolves; entry "task1" inserted (fresh).
//   350: task3's predicate sees "task1", returns false -> keeps the entry
//        (not fresh); task6's `get` sees "task1".
//   400: task4's predicate returns true -> replaces with "task4" (fresh).
//   450: task7's `get` sees "task4".
let cache = Cache::new(100);
const KEY: u32 = 0;
let task1 = {
let cache1 = cache.clone();
async move {
let entry = cache1
.entry(KEY)
.or_insert_with_if(
async {
sleep(Duration::from_millis(300)).await;
"task1"
},
|_v| unreachable!(),
)
.await;
// Fresh: this call inserted the value.
assert!(entry.is_fresh());
assert_eq!(entry.into_value(), "task1");
}
};
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
let entry = cache2
.entry(KEY)
.or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
.await;
// Not fresh: the value came from task1's init.
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), "task1");
}
};
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(350)).await;
let entry = cache3
.entry(KEY)
.or_insert_with_if(async { unreachable!() }, |v| {
assert_eq!(v, &"task1");
false
})
.await;
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), "task1");
}
};
let task4 = {
let cache4 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
let entry = cache4
.entry(KEY)
.or_insert_with_if(async { "task4" }, |v| {
assert_eq!(v, &"task1");
true
})
.await;
// Fresh again: the predicate forced a replacement.
assert!(entry.is_fresh());
assert_eq!(entry.into_value(), "task4");
}
};
let task5 = {
let cache5 = cache.clone();
async move {
sleep(Duration::from_millis(200)).await;
let maybe_v = cache5.get(&KEY).await;
assert!(maybe_v.is_none());
}
};
let task6 = {
let cache6 = cache.clone();
async move {
sleep(Duration::from_millis(350)).await;
let maybe_v = cache6.get(&KEY).await;
assert_eq!(maybe_v, Some("task1"));
}
};
let task7 = {
let cache7 = cache.clone();
async move {
sleep(Duration::from_millis(450)).await;
let maybe_v = cache7.get(&KEY).await;
assert_eq!(maybe_v, Some("task4"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
// No per-key waiter entries may be left behind.
assert!(cache.is_waiter_map_empty());
}
#[tokio::test]
async fn entry_by_ref_or_insert_with_if() {
// Borrowed-key twin of `entry_or_insert_with_if`; same timeline (ms):
//   0:   task1 starts the init future (300ms) and will insert "task1".
//   100: task2 joins task1's in-flight init.
//   200: task5's plain `get` -> None.
//   300: task1's init resolves; "task1" inserted (fresh).
//   350: task3's predicate returns false -> entry kept (not fresh);
//        task6's `get` sees "task1".
//   400: task4's predicate returns true -> replaced with "task4" (fresh).
//   450: task7's `get` sees "task4".
let cache = Cache::new(100);
const KEY: &u32 = &0;
let task1 = {
let cache1 = cache.clone();
async move {
let entry = cache1
.entry_by_ref(KEY)
.or_insert_with_if(
async {
sleep(Duration::from_millis(300)).await;
"task1"
},
|_v| unreachable!(),
)
.await;
// Fresh: this call inserted the value.
assert!(entry.is_fresh());
assert_eq!(entry.into_value(), "task1");
}
};
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
let entry = cache2
.entry_by_ref(KEY)
.or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
.await;
// Not fresh: the value came from task1's init.
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), "task1");
}
};
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(350)).await;
let entry = cache3
.entry_by_ref(KEY)
.or_insert_with_if(async { unreachable!() }, |v| {
assert_eq!(v, &"task1");
false
})
.await;
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), "task1");
}
};
let task4 = {
let cache4 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
let entry = cache4
.entry_by_ref(KEY)
.or_insert_with_if(async { "task4" }, |v| {
assert_eq!(v, &"task1");
true
})
.await;
// Fresh again: the predicate forced a replacement.
assert!(entry.is_fresh());
assert_eq!(entry.into_value(), "task4");
}
};
let task5 = {
let cache5 = cache.clone();
async move {
sleep(Duration::from_millis(200)).await;
let maybe_v = cache5.get(KEY).await;
assert!(maybe_v.is_none());
}
};
let task6 = {
let cache6 = cache.clone();
async move {
sleep(Duration::from_millis(350)).await;
let maybe_v = cache6.get(KEY).await;
assert_eq!(maybe_v, Some("task1"));
}
};
let task7 = {
let cache7 = cache.clone();
async move {
sleep(Duration::from_millis(450)).await;
let maybe_v = cache7.get(KEY).await;
assert_eq!(maybe_v, Some("task4"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
// No per-key waiter entries may be left behind.
assert!(cache.is_waiter_map_empty());
}
#[tokio::test]
async fn try_get_with() {
// `try_get_with`: an init future that resolves to `Err` caches nothing,
// but concurrent callers joined to that init also receive the error.
// Timeline (ms):
//   0:   task1 starts an init (300ms) that resolves to Err.
//   100: task2 joins task1's init -> also gets Err.
//   200: task6's plain `get` -> None.
//   300: task1's init fails; nothing inserted.
//   400: task3 starts a new init (300ms) resolving to Ok("task3");
//        task7's `get` still sees None.
//   500: task4 joins task3's init -> gets "task3".
//   700: task3's init succeeds and inserts "task3".
//   800: task5's `try_get_with` and task8's `get` read "task3".
use std::sync::Arc;
#[derive(Debug)]
pub struct MyError(#[allow(dead_code)] String);
type MyResult<T> = Result<T, Arc<MyError>>;
let cache = Cache::new(100);
const KEY: u32 = 0;
let task1 = {
let cache1 = cache.clone();
async move {
let v = cache1
.try_get_with(KEY, async {
sleep(Duration::from_millis(300)).await;
Err(MyError("task1 error".into()))
})
.await;
assert!(v.is_err());
}
};
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
// Joins task1's in-flight init and shares its error.
let v: MyResult<_> = cache2.try_get_with(KEY, async { unreachable!() }).await;
assert!(v.is_err());
}
};
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
// The earlier error was not cached, so a new init runs.
let v: MyResult<_> = cache3
.try_get_with(KEY, async {
sleep(Duration::from_millis(300)).await;
Ok("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};
let task4 = {
let cache4 = cache.clone();
async move {
sleep(Duration::from_millis(500)).await;
let v: MyResult<_> = cache4.try_get_with(KEY, async { unreachable!() }).await;
assert_eq!(v.unwrap(), "task3");
}
};
let task5 = {
let cache5 = cache.clone();
async move {
sleep(Duration::from_millis(800)).await;
let v: MyResult<_> = cache5.try_get_with(KEY, async { unreachable!() }).await;
assert_eq!(v.unwrap(), "task3");
}
};
let task6 = {
let cache6 = cache.clone();
async move {
sleep(Duration::from_millis(200)).await;
let maybe_v = cache6.get(&KEY).await;
assert!(maybe_v.is_none());
}
};
let task7 = {
let cache7 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
let maybe_v = cache7.get(&KEY).await;
assert!(maybe_v.is_none());
}
};
let task8 = {
let cache8 = cache.clone();
async move {
sleep(Duration::from_millis(800)).await;
let maybe_v = cache8.get(&KEY).await;
assert_eq!(maybe_v, Some("task3"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
// No per-key waiter entries may be left behind.
assert!(cache.is_waiter_map_empty());
}
#[tokio::test]
async fn try_get_with_by_ref() {
// Borrowed-key twin of `try_get_with`; same timeline (ms):
//   0:   task1 starts an init (300ms) that resolves to Err.
//   100: task2 joins task1's init -> also gets Err.
//   200: task6's plain `get` -> None.
//   300: task1's init fails; nothing inserted.
//   400: task3 starts a new init (300ms) resolving to Ok("task3");
//        task7's `get` still sees None.
//   500: task4 joins task3's init -> gets "task3".
//   700: task3's init succeeds and inserts "task3".
//   800: task5's call and task8's `get` read "task3".
use std::sync::Arc;
#[derive(Debug)]
pub struct MyError(#[allow(dead_code)] String);
type MyResult<T> = Result<T, Arc<MyError>>;
let cache = Cache::new(100);
const KEY: &u32 = &0;
let task1 = {
let cache1 = cache.clone();
async move {
let v = cache1
.try_get_with_by_ref(KEY, async {
sleep(Duration::from_millis(300)).await;
Err(MyError("task1 error".into()))
})
.await;
assert!(v.is_err());
}
};
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
// Joins task1's in-flight init and shares its error.
let v: MyResult<_> = cache2
.try_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert!(v.is_err());
}
};
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
// The earlier error was not cached, so a new init runs.
let v: MyResult<_> = cache3
.try_get_with_by_ref(KEY, async {
sleep(Duration::from_millis(300)).await;
Ok("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};
let task4 = {
let cache4 = cache.clone();
async move {
sleep(Duration::from_millis(500)).await;
let v: MyResult<_> = cache4
.try_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
let task5 = {
let cache5 = cache.clone();
async move {
sleep(Duration::from_millis(800)).await;
let v: MyResult<_> = cache5
.try_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
let task6 = {
let cache6 = cache.clone();
async move {
sleep(Duration::from_millis(200)).await;
let maybe_v = cache6.get(KEY).await;
assert!(maybe_v.is_none());
}
};
let task7 = {
let cache7 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
let maybe_v = cache7.get(KEY).await;
assert!(maybe_v.is_none());
}
};
let task8 = {
let cache8 = cache.clone();
async move {
sleep(Duration::from_millis(800)).await;
let maybe_v = cache8.get(KEY).await;
assert_eq!(maybe_v, Some("task3"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
// No per-key waiter entries may be left behind.
assert!(cache.is_waiter_map_empty());
}
#[tokio::test]
async fn optionally_get_with() {
    // `optionally_get_with`: an init future resolving to `None` caches
    // nothing, but callers joined to that init also receive `None`.
    // Timeline (ms):
    //   0:   task1 starts an init (300ms) that resolves to None.
    //   100: task2 joins task1's init -> also gets None.
    //   200: task6's plain `get` -> None.
    //   300: task1's init yields None; nothing inserted.
    //   400: task3 starts a new init (300ms) resolving to Some("task3");
    //        task7's `get` still sees None.
    //   500: task4 joins task3's init -> gets "task3".
    //   700: task3's init succeeds and inserts "task3".
    //   800: task5's call and task8's `get` read "task3".
    let cache = Cache::new(100);
    const KEY: u32 = 0;
    let task1 = {
        let cache1 = cache.clone();
        async move {
            let v = cache1
                .optionally_get_with(KEY, async {
                    sleep(Duration::from_millis(300)).await;
                    None
                })
                .await;
            assert!(v.is_none());
        }
    };
    let task2 = {
        let cache2 = cache.clone();
        async move {
            sleep(Duration::from_millis(100)).await;
            // Joins task1's in-flight init; its own init must not run.
            let v = cache2
                .optionally_get_with(KEY, async { unreachable!() })
                .await;
            assert!(v.is_none());
        }
    };
    let task3 = {
        let cache3 = cache.clone();
        async move {
            sleep(Duration::from_millis(400)).await;
            // The earlier `None` was not cached, so a new init runs.
            let v = cache3
                .optionally_get_with(KEY, async {
                    sleep(Duration::from_millis(300)).await;
                    Some("task3")
                })
                .await;
            assert_eq!(v.unwrap(), "task3");
        }
    };
    let task4 = {
        let cache4 = cache.clone();
        async move {
            sleep(Duration::from_millis(500)).await;
            let v = cache4
                .optionally_get_with(KEY, async { unreachable!() })
                .await;
            assert_eq!(v.unwrap(), "task3");
        }
    };
    let task5 = {
        let cache5 = cache.clone();
        async move {
            sleep(Duration::from_millis(800)).await;
            let v = cache5
                .optionally_get_with(KEY, async { unreachable!() })
                .await;
            assert_eq!(v.unwrap(), "task3");
        }
    };
    let task6 = {
        let cache6 = cache.clone();
        async move {
            sleep(Duration::from_millis(200)).await;
            let maybe_v = cache6.get(&KEY).await;
            assert!(maybe_v.is_none());
        }
    };
    let task7 = {
        let cache7 = cache.clone();
        async move {
            sleep(Duration::from_millis(400)).await;
            let maybe_v = cache7.get(&KEY).await;
            assert!(maybe_v.is_none());
        }
    };
    let task8 = {
        let cache8 = cache.clone();
        async move {
            sleep(Duration::from_millis(800)).await;
            let maybe_v = cache8.get(&KEY).await;
            assert_eq!(maybe_v, Some("task3"));
        }
    };
    futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
    // Consistency fix: every sibling test (`get_with`, `try_get_with`,
    // `optionally_get_with_by_ref`, ...) verifies that no per-key waiter
    // entries leak after the race; this test was missing the same check.
    assert!(cache.is_waiter_map_empty());
}
#[tokio::test]
async fn optionally_get_with_by_ref() {
// Borrowed-key twin of `optionally_get_with`; same timeline (ms):
//   0:   task1 starts an init (300ms) that resolves to None.
//   100: task2 joins task1's init -> also gets None.
//   200: task6's plain `get` -> None.
//   300: task1's init yields None; nothing inserted.
//   400: task3 starts a new init (300ms) resolving to Some("task3");
//        task7's `get` still sees None.
//   500: task4 joins task3's init -> gets "task3".
//   700: task3's init succeeds and inserts "task3".
//   800: task5's call and task8's `get` read "task3".
let cache = Cache::new(100);
const KEY: &u32 = &0;
let task1 = {
let cache1 = cache.clone();
async move {
let v = cache1
.optionally_get_with_by_ref(KEY, async {
sleep(Duration::from_millis(300)).await;
None
})
.await;
assert!(v.is_none());
}
};
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
// Joins task1's in-flight init; its own init must not run.
let v = cache2
.optionally_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert!(v.is_none());
}
};
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
// The earlier `None` was not cached, so a new init runs.
let v = cache3
.optionally_get_with_by_ref(KEY, async {
sleep(Duration::from_millis(300)).await;
Some("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};
let task4 = {
let cache4 = cache.clone();
async move {
sleep(Duration::from_millis(500)).await;
let v = cache4
.optionally_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
let task5 = {
let cache5 = cache.clone();
async move {
sleep(Duration::from_millis(800)).await;
let v = cache5
.optionally_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
let task6 = {
let cache6 = cache.clone();
async move {
sleep(Duration::from_millis(200)).await;
let maybe_v = cache6.get(KEY).await;
assert!(maybe_v.is_none());
}
};
let task7 = {
let cache7 = cache.clone();
async move {
sleep(Duration::from_millis(400)).await;
let maybe_v = cache7.get(KEY).await;
assert!(maybe_v.is_none());
}
};
let task8 = {
let cache8 = cache.clone();
async move {
sleep(Duration::from_millis(800)).await;
let maybe_v = cache8.get(KEY).await;
assert_eq!(maybe_v, Some("task3"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
// No per-key waiter entries may be left behind.
assert!(cache.is_waiter_map_empty());
}
// Three tasks call `and_upsert_with` on the same key with overlapping
// closures. Upserts on one key are serialized, so each closure sees the
// value left by the previous one: task1 inserts 1 (entry absent), task2
// bumps it to 2, task3 bumps it to 3.
#[tokio::test]
async fn upsert_with() {
let cache = Cache::new(100);
const KEY: u32 = 0;
// t=0: entry does not exist yet; insert 1.
let task1 = {
let cache1 = cache.clone();
async move {
cache1
.entry(KEY)
.and_upsert_with(|maybe_entry| async move {
sleep(Duration::from_millis(200)).await;
assert!(maybe_entry.is_none());
1
})
.await
}
};
// t=100: starts while task1 is in-flight, but observes its result (1).
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
cache2
.entry_by_ref(&KEY)
.and_upsert_with(|maybe_entry| async move {
sleep(Duration::from_millis(200)).await;
let entry = maybe_entry.expect("The entry should exist");
entry.into_value() + 1
})
.await
}
};
// t=300: observes task2's result (2).
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(300)).await;
cache3
.entry_by_ref(&KEY)
.and_upsert_with(|maybe_entry| async move {
sleep(Duration::from_millis(100)).await;
let entry = maybe_entry.expect("The entry should exist");
entry.into_value() + 1
})
.await
}
};
let (ent1, ent2, ent3) = futures_util::join!(task1, task2, task3);
assert_eq!(ent1.into_value(), 1);
assert_eq!(ent2.into_value(), 2);
assert_eq!(ent3.into_value(), 3);
assert_eq!(cache.get(&KEY).await, Some(3));
// No leaked per-key waiters.
assert!(cache.is_waiter_map_empty());
}
// Six tasks call `and_compute_with` on the same key; the sleep offsets force
// them to run one after another, covering every `CompResult` variant:
// - task1: entry absent  -> `Op::Put([1])`  -> `Inserted`
// - task2: entry present -> `Op::Put`       -> `ReplacedWith` (it pushes 2
//   into the *shared* `Arc<RwLock<Vec<_>>>`, so task1's returned entry also
//   observes `[1, 2]` afterwards)
// - task3: entry present -> `Op::Remove`    -> `Removed`
// - task4: entry absent  -> `Op::Nop`       -> `StillNone`
// - task5: entry absent  -> `Op::Put([5])`  -> `Inserted`
// - task6: entry present -> `Op::Nop`       -> `Unchanged`
#[tokio::test]
async fn compute_with() {
use crate::ops::compute;
use tokio::sync::RwLock;
let cache = Cache::new(100);
const KEY: u32 = 0;
let task1 = {
let cache1 = cache.clone();
async move {
cache1
.entry(KEY)
.and_compute_with(|maybe_entry| async move {
sleep(Duration::from_millis(200)).await;
assert!(maybe_entry.is_none());
compute::Op::Put(Arc::new(RwLock::new(vec![1])))
})
.await
}
};
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
cache2
.entry_by_ref(&KEY)
.and_compute_with(|maybe_entry| async move {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().await, vec![1]);
sleep(Duration::from_millis(200)).await;
// Mutates the shared vec in place, then re-puts the same Arc.
value.write().await.push(2);
compute::Op::Put(value)
})
.await
}
};
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(300)).await;
cache3
.entry(KEY)
.and_compute_with(|maybe_entry| async move {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().await, vec![1, 2]);
sleep(Duration::from_millis(200)).await;
compute::Op::Remove
})
.await
}
};
let task4 = {
let cache4 = cache.clone();
async move {
sleep(Duration::from_millis(500)).await;
cache4
.entry(KEY)
.and_compute_with(|maybe_entry| async move {
// task3 removed the entry just before this runs.
assert!(maybe_entry.is_none());
sleep(Duration::from_millis(200)).await;
compute::Op::Nop
})
.await
}
};
let task5 = {
let cache5 = cache.clone();
async move {
sleep(Duration::from_millis(700)).await;
cache5
.entry_by_ref(&KEY)
.and_compute_with(|maybe_entry| async move {
assert!(maybe_entry.is_none());
sleep(Duration::from_millis(200)).await;
compute::Op::Put(Arc::new(RwLock::new(vec![5])))
})
.await
}
};
let task6 = {
let cache6 = cache.clone();
async move {
sleep(Duration::from_millis(900)).await;
cache6
.entry_by_ref(&KEY)
.and_compute_with(|maybe_entry| async move {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().await, vec![5]);
sleep(Duration::from_millis(100)).await;
compute::Op::Nop
})
.await
}
};
let (res1, res2, res3, res4, res5, res6) =
futures_util::join!(task1, task2, task3, task4, task5, task6);
// task1's entry shares the RwLock task2 pushed into, hence [1, 2].
let compute::CompResult::Inserted(entry) = res1 else {
panic!("Expected `Inserted`. Got {res1:?}")
};
assert_eq!(
*entry.into_value().read().await,
vec![1, 2] );
let compute::CompResult::ReplacedWith(entry) = res2 else {
panic!("Expected `ReplacedWith`. Got {res2:?}")
};
assert_eq!(*entry.into_value().read().await, vec![1, 2]);
let compute::CompResult::Removed(entry) = res3 else {
panic!("Expected `Removed`. Got {res3:?}")
};
assert_eq!(*entry.into_value().read().await, vec![1, 2]);
let compute::CompResult::StillNone(key) = res4 else {
panic!("Expected `StillNone`. Got {res4:?}")
};
assert_eq!(*key, KEY);
let compute::CompResult::Inserted(entry) = res5 else {
panic!("Expected `Inserted`. Got {res5:?}")
};
assert_eq!(*entry.into_value().read().await, vec![5]);
let compute::CompResult::Unchanged(entry) = res6 else {
panic!("Expected `Unchanged`. Got {res6:?}")
};
assert_eq!(*entry.into_value().read().await, vec![5]);
// No leaked per-key waiters.
assert!(cache.is_waiter_map_empty());
}
// Fallible counterpart of the `compute_with` test: four serialized tasks call
// `and_try_compute_with` on one key.
// - task1: entry absent  -> `Ok(Op::Put([1]))` -> `Ok(Inserted)`
// - task2: pushes 2 into the shared `Arc<RwLock<Vec<_>>>` and re-puts it
//   -> `Ok(ReplacedWith)`
// - task3: returns `Err(())`; the entry must be left untouched
// - task4: still sees `[1, 2]` (task3's error changed nothing), then removes
//   the entry -> `Ok(Removed)`
#[tokio::test]
async fn try_compute_with() {
use crate::ops::compute;
use tokio::sync::RwLock;
let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
const KEY: u32 = 0;
let task1 = {
let cache1 = cache.clone();
async move {
cache1
.entry(KEY)
.and_try_compute_with(|maybe_entry| async move {
sleep(Duration::from_millis(200)).await;
assert!(maybe_entry.is_none());
Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
})
.await
}
};
let task2 = {
let cache2 = cache.clone();
async move {
sleep(Duration::from_millis(100)).await;
cache2
.entry_by_ref(&KEY)
.and_try_compute_with(|maybe_entry| async move {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().await, vec![1]);
sleep(Duration::from_millis(200)).await;
value.write().await.push(2);
Ok(compute::Op::Put(value)) as Result<_, ()>
})
.await
}
};
// task3 fails; the cached entry must survive unchanged.
let task3 = {
let cache3 = cache.clone();
async move {
sleep(Duration::from_millis(300)).await;
cache3
.entry(KEY)
.and_try_compute_with(|maybe_entry| async move {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().await, vec![1, 2]);
sleep(Duration::from_millis(200)).await;
Err(())
})
.await
}
};
let task4 = {
let cache4 = cache.clone();
async move {
sleep(Duration::from_millis(500)).await;
cache4
.entry(KEY)
.and_try_compute_with(|maybe_entry| async move {
let entry = maybe_entry.expect("The entry should exist");
let value = entry.into_value();
assert_eq!(*value.read().await, vec![1, 2]);
sleep(Duration::from_millis(100)).await;
Ok(compute::Op::Remove) as Result<_, ()>
})
.await
}
};
let (res1, res2, res3, res4) = futures_util::join!(task1, task2, task3, task4);
// task1's entry shares the RwLock task2 mutated, hence [1, 2].
let Ok(compute::CompResult::Inserted(entry)) = res1 else {
panic!("Expected `Inserted`. Got {res1:?}")
};
assert_eq!(
*entry.into_value().read().await,
vec![1, 2] );
let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
panic!("Expected `ReplacedWith`. Got {res2:?}")
};
assert_eq!(*entry.into_value().read().await, vec![1, 2]);
assert!(res3.is_err());
let Ok(compute::CompResult::Removed(entry)) = res4 else {
panic!("Expected `Removed`. Got {res4:?}")
};
assert_eq!(
*entry.into_value().read().await,
vec![1, 2] );
// No leaked per-key waiters.
assert!(cache.is_waiter_map_empty());
}
// When the init future of a `get_with` call panics, the cache must recover:
// the panicked initialization is abandoned and a subsequent `get_with` for
// the same key runs its own init future and succeeds.
#[tokio::test]
async fn handle_panic_in_get_with() {
    use tokio::time::{sleep, Duration};

    let cache = Cache::new(16);
    let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
    {
        let cache_ref = cache.clone();
        let semaphore_ref = semaphore.clone();
        // Spawn a task whose init future first signals the main task (via the
        // semaphore) that initialization has started, then panics.
        tokio::task::spawn(async move {
            let _ = cache_ref
                .get_with(1, async move {
                    semaphore_ref.add_permits(1);
                    sleep(Duration::from_millis(50)).await;
                    // Fixed: this message previously said "try_get_with",
                    // copied from the sibling test below; this test exercises
                    // `get_with`.
                    panic!("Panic during get_with");
                })
                .await;
        });
    }
    // Wait until the spawned task is inside its init future, then issue a
    // second `get_with`. It waits for the panicking init and must then run
    // its own init future and return 5.
    let _ = semaphore.acquire().await.expect("semaphore acquire failed");
    assert_eq!(cache.get_with(1, async { 5 }).await, 5);
}
// When the init future of a `try_get_with` call panics, the cache must
// recover: a later `try_get_with` on the same key runs its own init future
// and succeeds, and no waiter entry is leaked.
#[tokio::test]
async fn handle_panic_in_try_get_with() {
use tokio::time::{sleep, Duration};
let cache = Cache::new(16);
let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
{
let cache_ref = cache.clone();
let semaphore_ref = semaphore.clone();
// The init future signals the main task via the semaphore, then panics.
tokio::task::spawn(async move {
let _ = cache_ref
.try_get_with(1, async move {
semaphore_ref.add_permits(1);
sleep(Duration::from_millis(50)).await;
panic!("Panic during try_get_with");
})
.await as Result<_, Arc<Infallible>>;
});
}
// Wait until the spawned task is inside its init future, then retry: this
// call waits out the panicking init and runs its own init successfully.
let _ = semaphore.acquire().await.expect("semaphore acquire failed");
assert_eq!(
cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
Ok(5)
);
assert!(cache.is_waiter_map_empty());
}
// Aborting a task that is inside a `get_with` init future must not wedge the
// cache: a later `get_with` on the same key runs its own init future (the
// aborted one never resumes, hence `unreachable!()`), and no waiter entry is
// leaked.
#[tokio::test]
async fn abort_get_with() {
use tokio::time::{sleep, Duration};
let cache = Cache::new(16);
let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
let handle;
{
let cache_ref = cache.clone();
let semaphore_ref = semaphore.clone();
// The init future signals the main task, then parks in `sleep`; the
// task is aborted before the sleep completes.
handle = tokio::task::spawn(async move {
let _ = cache_ref
.get_with(1, async move {
semaphore_ref.add_permits(1);
sleep(Duration::from_millis(50)).await;
unreachable!();
})
.await;
});
}
// Ensure the init future has started, then abort its task mid-flight.
let _ = semaphore.acquire().await.expect("semaphore acquire failed");
handle.abort();
assert_eq!(cache.get_with(1, async { 5 }).await, 5);
assert!(cache.is_waiter_map_empty());
}
// Same as `abort_get_with`, but for the fallible `try_get_with`: aborting a
// task mid-initialization must leave the cache usable and leak no waiters.
#[tokio::test]
async fn abort_try_get_with() {
use tokio::time::{sleep, Duration};
let cache = Cache::new(16);
let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
let handle;
{
let cache_ref = cache.clone();
let semaphore_ref = semaphore.clone();
// The init future signals the main task, then parks in `sleep`; the
// task is aborted before the sleep completes.
handle = tokio::task::spawn(async move {
let _ = cache_ref
.try_get_with(1, async move {
semaphore_ref.add_permits(1);
sleep(Duration::from_millis(50)).await;
unreachable!();
})
.await as Result<_, Arc<Infallible>>;
});
}
// Ensure the init future has started, then abort its task mid-flight.
let _ = semaphore.acquire().await.expect("semaphore acquire failed");
handle.abort();
assert_eq!(
cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
Ok(5)
);
assert!(cache.is_waiter_map_empty());
}
// Verifies that the async eviction listener receives one notification per
// removal with the correct `RemovalCause`: `Explicit` for `invalidate`,
// `Size` for capacity evictions, and `Replaced` for inserting over an
// existing key. Notifications are compared (in order) against `expected` by
// `verify_notification_vec` at the end.
#[tokio::test]
async fn test_removal_notifications() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
// Listener appends every (key, value, cause) triple to `actual`.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let mut cache = Cache::builder()
.max_capacity(3)
.async_eviction_listener(listener)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert('a', "alice").await;
cache.invalidate(&'a').await;
expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 0);
cache.insert('b', "bob").await;
cache.insert('c', "cathy").await;
cache.insert('d', "david").await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 3);
// The cache is full; inserting 'e' causes a `Size` eviction. 'e' itself is
// the victim here — presumably rejected by the admission policy (TODO
// confirm against the policy implementation).
cache.insert('e', "emily").await;
expected.push((Arc::new('e'), "emily", RemovalCause::Size));
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 3);
// Touch 'e' (a miss) so that the next insert of 'e' is admitted; 'b' is
// evicted for size instead.
cache.get(&'e').await;
cache.run_pending_tasks().await;
cache.insert('e', "eliza").await;
expected.push((Arc::new('b'), "bob", RemovalCause::Size));
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 3);
// Overwriting an existing key reports the old value as `Replaced`.
cache.insert('d', "dennis").await;
expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 3);
verify_notification_vec(&cache, actual, &expected).await;
}
// Verifies `Expired` notifications under TTL (7s) and TTI (5s), driven by a
// mock clock. Note an important detail this test pins: after an entry
// expires, `get` returns `None` but `entry_count()` still reports 1 until
// housekeeping physically removes it. Also, invalidating an entry that has
// already expired produces no additional `Explicit` notification.
#[tokio::test]
async fn test_removal_notifications_with_updates() {
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
// Listener appends every (key, value, cause) triple to `actual`.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let (clock, mock) = Clock::mock();
let mut cache = Cache::builder()
.async_eviction_listener(listener)
.time_to_live(Duration::from_secs(7))
.time_to_idle(Duration::from_secs(5))
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
// "a0": idle for 6s > TTI 5s -> expired.
cache.insert("alice", "a0").await;
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(6));
expected.push((Arc::new("alice"), "a0", RemovalCause::Expired));
assert_eq!(cache.get(&"alice").await, None);
assert_eq!(cache.entry_count(), 1);
// "a1": accessed at 4s (keeps TTI alive), but total age 8s > TTL 7s.
cache.insert("alice", "a1").await;
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice").await, Some("a1"));
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(4));
expected.push((Arc::new("alice"), "a1", RemovalCause::Expired));
assert_eq!(cache.get(&"alice").await, None);
assert_eq!(cache.entry_count(), 1);
// "a2": idle for 6s -> expired; the following `invalidate` emits no
// `Explicit` notification because the entry already expired.
cache.insert("alice", "a2").await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(6));
expected.push((Arc::new("alice"), "a2", RemovalCause::Expired));
assert_eq!(cache.get(&"alice").await, None);
assert_eq!(cache.entry_count(), 1);
cache.invalidate(&"alice").await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 0);
// "a3": same TTL-expiry pattern as "a1", again followed by an invalidate
// of the already-expired entry (no extra notification).
cache.insert("alice", "a3").await;
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice").await, Some("a3"));
cache.run_pending_tasks().await;
mock.increment(Duration::from_secs(4));
expected.push((Arc::new("alice"), "a3", RemovalCause::Expired));
assert_eq!(cache.get(&"alice").await, None);
assert_eq!(cache.entry_count(), 1);
cache.invalidate(&"alice").await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 0);
verify_notification_vec(&cache, actual, &expected).await;
}
// With no eviction listener installed, a single `run_pending_tasks` call
// evicts as many entries as needed, even though the housekeeper is
// configured with an eviction batch size of only 1.
#[tokio::test]
async fn no_batch_size_limit_on_eviction() {
    const MAX_CAPACITY: u64 = 20;
    const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
    const MAX_LOG_SYNC_REPEATS: u32 = 1;
    const EVICTION_BATCH_SIZE: u32 = 1;

    let hk_conf = HousekeeperConfig::new(
        Some(EVICTION_TIMEOUT),
        Some(MAX_LOG_SYNC_REPEATS),
        Some(EVICTION_BATCH_SIZE),
    );

    let mut cache = Cache::builder()
        .max_capacity(MAX_CAPACITY)
        .eviction_policy(EvictionPolicy::lru())
        .housekeeper_config(hk_conf)
        .build();
    cache.reconfigure_for_testing().await;
    let cache = cache;

    // Fill the cache exactly to capacity.
    for key in 0..MAX_CAPACITY {
        let value = format!("v{key}");
        cache.insert(key, value).await
    }
    // Inserts are still buffered until pending tasks run.
    assert_eq!(cache.entry_count(), 0);
    cache.run_pending_tasks().await;
    assert_eq!(cache.entry_count(), MAX_CAPACITY);

    // Insert a second full batch, pushing the cache over capacity.
    for key in MAX_CAPACITY..(MAX_CAPACITY * 2) {
        let value = format!("v{key}");
        cache.insert(key, value).await
    }
    // Before housekeeping, nothing has been evicted yet: old and new keys
    // are all still reachable.
    assert_eq!(cache.entry_count(), MAX_CAPACITY);
    assert!(cache.contains_key(&0));
    assert!(cache.contains_key(&(MAX_CAPACITY - 1)));
    assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));

    cache.run_pending_tasks().await;

    // One housekeeping pass evicted the entire first batch (LRU victims),
    // despite EVICTION_BATCH_SIZE being 1.
    assert_eq!(cache.entry_count(), MAX_CAPACITY);
    assert!(!cache.contains_key(&0));
    assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
    assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
}
// With a *slow* eviction listener, eviction IS batched: each listener call
// advances the mock clock by 11 ms, and the housekeeper's eviction timeout
// is 30 ms, so each `run_pending_tasks` call processes only 3 notifications
// (3 * 11 ms > 30 ms) before giving up. The loop below repeats until all 20
// over-capacity entries have been evicted, 3 at a time.
#[tokio::test]
async fn slow_eviction_listener() {
const MAX_CAPACITY: u64 = 20;
const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
const LISTENER_DELAY: Duration = Duration::from_millis(11);
const MAX_LOG_SYNC_REPEATS: u32 = 1;
const EVICTION_BATCH_SIZE: u32 = 1;
let hk_conf = HousekeeperConfig::new(
Some(EVICTION_TIMEOUT),
Some(MAX_LOG_SYNC_REPEATS),
Some(EVICTION_BATCH_SIZE),
);
let (clock, mock) = Clock::mock();
let listener_call_count = Arc::new(AtomicU8::new(0));
let lcc = Arc::clone(&listener_call_count);
// Simulate a slow listener by advancing the mock clock on every call.
let listener = move |_k, _v, _cause| {
mock.increment(LISTENER_DELAY);
lcc.fetch_add(1, Ordering::AcqRel);
};
let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY)
.eviction_policy(EvictionPolicy::lru())
.eviction_listener(listener)
.housekeeper_config(hk_conf)
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
// Fill to capacity; no evictions yet, so the listener is never called.
for i in 0..MAX_CAPACITY {
let v = format!("v{i}");
cache.insert(i, v).await
}
assert_eq!(cache.entry_count(), 0);
cache.run_pending_tasks().await;
assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
assert_eq!(cache.entry_count(), MAX_CAPACITY);
// Double the number of entries; 20 evictions become pending.
for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
let v = format!("v{i}");
cache.insert(i, v).await
}
assert_eq!(cache.entry_count(), MAX_CAPACITY);
cache.run_pending_tasks().await;
// First pass: only 3 evictions before the 30 ms timeout was hit.
let mut expected_call_count = 3;
assert_eq!(
listener_call_count.load(Ordering::Acquire) as u64,
expected_call_count
);
assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
// Keep running pending tasks; each pass evicts 3 more until all 20
// pending evictions are processed.
loop {
cache.run_pending_tasks().await;
expected_call_count += 3;
if expected_call_count > MAX_CAPACITY {
expected_call_count = MAX_CAPACITY;
}
let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
assert_eq!(actual_count, expected_call_count);
let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
assert_eq!(cache.entry_count(), expected_entry_count);
if expected_call_count >= MAX_CAPACITY {
break;
}
}
assert_eq!(cache.entry_count(), MAX_CAPACITY);
}
// A panicking eviction listener must not poison the cache: inserts and
// invalidations after the panic keep working.
// NOTE(review): `expected` contains only the ("alice", "a0", Replaced)
// notification. Neither the panicking ("panic now!") notification nor any
// later ones (e.g. the Replaced for "panic now!" -> "a2", or the Explicit
// for the final invalidate) are expected — this implies notifications stop
// being delivered after the listener panics; confirm against the notifier's
// panic-handling code.
#[tokio::test]
async fn recover_from_panicking_eviction_listener() {
#[cfg(feature = "logging")]
let _ = env_logger::builder().is_test(true).try_init();
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
// Listener panics when it sees the sentinel value, otherwise records the
// notification in `actual`.
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
if v == "panic now!" {
panic!("Panic now!");
}
a2.lock().await.push((k, v, cause));
}
.boxed()
};
let mut cache = Cache::builder()
.name("My Future Cache")
.async_eviction_listener(listener)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
// Normal replacement: notified with the old value "a0".
cache.insert("alice", "a0").await;
cache.run_pending_tasks().await;
cache.insert("alice", "panic now!").await;
expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
cache.run_pending_tasks().await;
// Replacing "panic now!" triggers the panicking notification; the cache
// must still accept subsequent operations.
cache.insert("alice", "a2").await;
cache.run_pending_tasks().await;
cache.invalidate(&"alice").await;
cache.run_pending_tasks().await;
verify_notification_vec(&cache, actual, &expected).await;
}
// Cancelling (dropping) a `run_pending_tasks` future mid-flight must not
// lose work. `poll_immediate` polls the future exactly once; the listener
// future yields on its first poll, so the `run_pending_tasks` future is
// still pending when it is dropped. The test checks the initiation vs.
// completion counters for both `run_pending_tasks` itself and the listener,
// and that the next (awaited) `run_pending_tasks` call finishes the
// interrupted work.
#[tokio::test]
async fn cancel_future_while_running_pending_tasks() {
use crate::future::FutureExt;
use futures_util::future::poll_immediate;
use tokio::task::yield_now;
let listener_initiation_count: Arc<AtomicU32> = Default::default();
let listener_completion_count: Arc<AtomicU32> = Default::default();
// Listener counts initiations synchronously, then yields once before
// counting the completion — guaranteeing it is pending after one poll.
let listener = {
let init_count = Arc::clone(&listener_initiation_count);
let comp_count = Arc::clone(&listener_completion_count);
move |_k, _v, _r| {
init_count.fetch_add(1, Ordering::AcqRel);
let comp_count1 = Arc::clone(&comp_count);
async move {
yield_now().await;
comp_count1.fetch_add(1, Ordering::AcqRel);
}
.boxed()
}
};
let (clock, mock) = Clock::mock();
let mut cache: Cache<u32, u32> = Cache::builder()
.time_to_live(Duration::from_millis(10))
.async_eviction_listener(listener)
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert(1, 1).await;
assert_eq!(cache.run_pending_tasks_initiation_count(), 0);
assert_eq!(cache.run_pending_tasks_completion_count(), 0);
// 7 ms < TTL 10 ms: nothing expires; this run completes without calling
// the listener.
mock.increment(Duration::from_millis(7));
cache.run_pending_tasks().await;
assert_eq!(cache.run_pending_tasks_initiation_count(), 1);
assert_eq!(cache.run_pending_tasks_completion_count(), 1);
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 0);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
// Now 14 ms > TTL: the entry is expired. Poll the run future once (the
// listener starts but yields), then drop it — cancelling mid-listener.
mock.increment(Duration::from_millis(7));
let fut = cache.run_pending_tasks();
assert!(poll_immediate(fut).await.is_none());
assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
assert_eq!(cache.run_pending_tasks_completion_count(), 1);
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
// A full (awaited) run resumes and completes the interrupted listener
// without re-initiating it.
cache.run_pending_tasks().await;
assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
assert_eq!(cache.run_pending_tasks_completion_count(), 2);
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
}
// Cancelling (dropping) a *write operation's* future while it is invoking
// the eviction listener must not lose the notification: the interrupted
// listener future is completed by the next cache operation of any kind —
// `run_pending_tasks`, `get`, `insert`, `invalidate`, or `remove` — each of
// which is exercised below.
#[tokio::test]
async fn cancel_future_while_calling_eviction_listener() {
use crate::future::FutureExt;
use futures_util::future::poll_immediate;
use tokio::task::yield_now;
let listener_initiation_count: Arc<AtomicU32> = Default::default();
let listener_completion_count: Arc<AtomicU32> = Default::default();
// Listener counts initiations synchronously, then yields once before
// counting the completion — guaranteeing it is pending after one poll.
let listener = {
let init_count = Arc::clone(&listener_initiation_count);
let comp_count = Arc::clone(&listener_completion_count);
move |_k, _v, _r| {
init_count.fetch_add(1, Ordering::AcqRel);
let comp_count1 = Arc::clone(&comp_count);
async move {
yield_now().await;
comp_count1.fetch_add(1, Ordering::AcqRel);
}
.boxed()
}
};
let mut cache: Cache<u32, u32> = Cache::builder()
.time_to_live(Duration::from_millis(10))
.async_eviction_listener(listener)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
cache.insert(1, 1).await;
// Replacing key 1 notifies the listener; poll the insert future once (the
// listener starts but yields), then drop it — cancelling mid-listener.
let fut = cache.insert(1, 2);
assert!(poll_immediate(fut).await.is_none());
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
// `run_pending_tasks` completes the interrupted listener.
cache.run_pending_tasks().await;
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
// Same, but interrupted during `invalidate` ...
let fut = cache.invalidate(&1);
assert!(poll_immediate(fut).await.is_none());
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
// ... and completed by a plain `get` (even for an unrelated, missing key).
cache.get(&99).await;
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 2);
// Helper: reset counters and leave one interrupted listener pending.
let prepare = || async {
cache.invalidate(&1).await;
listener_initiation_count.store(0, Ordering::Release);
listener_completion_count.store(0, Ordering::Release);
cache.insert(1, 1).await;
let fut = cache.insert(1, 2);
assert!(poll_immediate(fut).await.is_none());
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
};
// An unrelated `insert` completes the interrupted listener.
prepare().await;
cache.insert(99, 99).await;
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
// So does an unrelated `invalidate` ...
prepare().await;
cache.invalidate(&88).await;
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
// ... and an unrelated `remove`.
prepare().await;
cache.remove(&77).await;
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
}
// Cancelling a write future while its `WriteOp` is being scheduled must not
// lose the op. The test-only `schedule_write_op_should_block` flag (see the
// `#[cfg(test)]` field on `Cache`) forces scheduling to block so the future
// can be dropped at exactly that point; the interrupted op then lands in
// `interrupted_op_ch_snd` and is retried into `write_op_ch` by the next
// cache operation (here, a plain `get`).
#[tokio::test]
async fn cancel_future_while_scheduling_write_op() {
use futures_util::future::poll_immediate;
let mut cache: Cache<u32, u32> = Cache::builder().build();
cache.reconfigure_for_testing().await;
let cache = cache;
// Block scheduling, poll the insert once, then drop it mid-scheduling.
cache
.schedule_write_op_should_block
.store(true, Ordering::Release);
let fut = cache.insert(1, 1);
assert!(poll_immediate(fut).await.is_none());
// The op was parked in the interrupted-op channel, not the write channel.
assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
assert_eq!(cache.base.write_op_ch.len(), 0);
// Unblock; any subsequent operation (a `get` here) retries the parked op.
cache
.schedule_write_op_should_block
.store(false, Ordering::Release);
cache.get(&99).await;
assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
assert_eq!(cache.base.write_op_ch.len(), 1);
cache.run_pending_tasks().await;
assert_eq!(cache.base.write_op_ch.len(), 0);
// Repeat the same scenario for an `invalidate` write op.
cache
.schedule_write_op_should_block
.store(true, Ordering::Release);
let fut = cache.invalidate(&1);
assert!(poll_immediate(fut).await.is_none());
assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
assert_eq!(cache.base.write_op_ch.len(), 0);
cache
.schedule_write_op_should_block
.store(false, Ordering::Release);
cache.get(&99).await;
assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
assert_eq!(cache.base.write_op_ch.len(), 1);
cache.run_pending_tasks().await;
assert_eq!(cache.base.write_op_ch.len(), 0);
}
// `contains_key`, `get`, and `invalidate` accept borrowed forms of the key
// type (`Vec<u8>` here): both `&Vec<u8>` and the slice form `&[u8]`.
#[tokio::test]
async fn borrowed_forms_of_key() {
    let cache: Cache<Vec<u8>, ()> = Cache::new(1);
    let owned_key = vec![1_u8];
    cache.insert(owned_key.clone(), ()).await;

    // Look up / remove via `&Vec<u8>`.
    let vec_ref: &Vec<u8> = &owned_key;
    assert!(cache.contains_key(vec_ref));
    assert_eq!(cache.get(vec_ref).await, Some(()));
    cache.invalidate(vec_ref).await;

    // Re-insert, then look up / remove via a `&[u8]` slice.
    cache.insert(owned_key, ()).await;
    let slice_ref: &[u8] = &[1_u8];
    assert!(cache.contains_key(slice_ref));
    assert_eq!(cache.get(slice_ref).await, Some(()));
    cache.invalidate(slice_ref).await;
}
// Checks that evicted/invalidated values are dropped promptly: `Value`
// (from `test_utils`) increments counters on creation and drop. Inserting
// 20% over capacity must evict (and drop) exactly the overflow; invalidating
// everything must drop the rest. Marked flaky (`run_flaky_tests`) and uses
// retry loops because eviction timing is not fully deterministic.
// NOTE(review): `std::thread::sleep` blocks the async runtime thread here —
// presumably acceptable because nothing else runs concurrently in this test,
// but confirm this is intentional rather than copied from the sync tests.
#[tokio::test]
#[cfg_attr(not(run_flaky_tests), ignore)]
async fn drop_value_immediately_after_eviction() {
use crate::common::test_utils::{Counters, Value};
const MAX_CAPACITY: u32 = 500;
const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
let counters = Arc::new(Counters::default());
let counters1 = Arc::clone(&counters);
// Count size-based evictions and explicit invalidations separately.
let listener = move |_k, _v, cause| match cause {
RemovalCause::Size => counters1.incl_evicted(),
RemovalCause::Explicit => counters1.incl_invalidated(),
_ => (),
};
let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY as u64)
.eviction_listener(listener)
.build();
cache.reconfigure_for_testing().await;
let cache = cache;
// Insert 20% more entries than capacity, housekeeping after each insert.
for key in 0..KEYS {
let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
cache.insert(key, value).await;
counters.incl_inserted();
cache.run_pending_tasks().await;
}
let eviction_count = KEYS - MAX_CAPACITY;
const MAX_RETRIES: usize = 5;
let mut retries = 0;
// Retry until the overflow entries have been evicted AND their values
// dropped, or until retries are exhausted (then assert to fail loudly).
loop {
std::thread::sleep(Duration::from_millis(500));
if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
if retries <= MAX_RETRIES {
retries += 1;
cache.run_pending_tasks().await;
continue;
} else {
assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
assert_eq!(
counters.value_dropped(),
eviction_count,
"Retries exhausted"
);
}
}
assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), 0, "invalidated");
assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
break;
}
// Invalidate every key; only the MAX_CAPACITY entries still resident are
// counted as invalidated, and by then every created value must be dropped.
for key in 0..KEYS {
cache.invalidate(&key).await;
cache.run_pending_tasks().await;
}
let mut retries = 0;
loop {
std::thread::sleep(Duration::from_millis(500));
if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
if retries <= MAX_RETRIES {
retries += 1;
cache.run_pending_tasks().await;
continue;
} else {
assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
}
}
assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
break;
}
// Dropping the cache must not drop anything twice.
std::mem::drop(cache);
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
// When the cache is dropped, it must release its stored values: afterwards,
// the local `value` handle is the only remaining `Arc` reference.
#[tokio::test]
#[cfg_attr(not(run_flaky_tests), ignore)]
async fn ensure_gc_runs_when_dropping_cache() {
    let cache = Cache::builder().build();
    let value = Arc::new(0);

    // Store a clone of `value` via `get_with` (init future is already ready).
    let init = std::future::ready(Arc::clone(&value));
    cache.get_with(1, init).await;

    drop(cache);
    assert_eq!(Arc::strong_count(&value), 1);
}
// The `Debug` impl renders the cache like a map: `{'a': "alice", ...}`.
// Iteration order is unspecified, so only check that every entry appears
// somewhere between the braces.
#[tokio::test]
async fn test_debug_format() {
    let cache = Cache::new(10);
    for (key, value) in [('a', "alice"), ('b', "bob"), ('c', "cindy")] {
        cache.insert(key, value).await;
    }

    let rendered = format!("{cache:?}");
    assert!(rendered.starts_with('{'));
    assert!(rendered.ends_with('}'));
    for entry in [r#"'a': "alice""#, r#"'b': "bob""#, r#"'c': "cindy""#] {
        assert!(rendered.contains(entry));
    }
}
// (key, value, cause) triple recorded by the test eviction listeners.
type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
// Compares the notifications recorded in `actual` against `expected`,
// element by element and in order. Because notification delivery is
// asynchronous, it retries up to MAX_RETRIES times (sleeping 500 ms and
// running pending tasks between attempts) before failing on a length
// mismatch.
async fn verify_notification_vec<K, V, S>(
cache: &Cache<K, V, S>,
actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
expected: &[NotificationTuple<K, V>],
) where
K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
{
const MAX_RETRIES: usize = 5;
let mut retries = 0;
loop {
// Blocking sleep: gives housekeeping time to deliver notifications.
std::thread::sleep(Duration::from_millis(500));
let actual = &*actual.lock().await;
if actual.len() != expected.len() {
if retries <= MAX_RETRIES {
retries += 1;
cache.run_pending_tasks().await;
continue;
} else {
// Out of retries: fail with the length mismatch.
assert_eq!(actual.len(), expected.len(), "Retries exhausted");
}
}
// Lengths match; verify contents and order.
for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
assert_eq!(actual, expected, "expected[{i}]");
}
break;
}
}
// Regression test (per the test name, for the `expire_after_update` path):
// when an entry has already expired but is still physically present
// (`entry_count() == 1` while `get` returns `None`), re-inserting the key
// with an expiry that now returns `None` must clear the per-entry expiration
// so the new value stays accessible.
#[tokio::test]
async fn expire_after_update_none_on_expired_entry() {
use std::sync::atomic::{AtomicBool, Ordering};
let should_expire = Arc::new(AtomicBool::new(true));
// Expiry whose create/update hooks both return `Duration::ZERO`
// (immediate expiry) while the flag is set, and `None` (no expiry) after
// the flag is cleared.
struct TestExpiry {
should_expire: Arc<AtomicBool>,
}
impl Expiry<String, String> for TestExpiry {
fn expire_after_create(
&self,
_key: &String,
_value: &String,
_current_time: StdInstant,
) -> Option<Duration> {
if self.should_expire.load(Ordering::SeqCst) {
Some(Duration::ZERO)
} else {
None
}
}
fn expire_after_update(
&self,
_key: &String,
_value: &String,
_current_time: StdInstant,
_duration_until_expiry: Option<Duration>,
) -> Option<Duration> {
if self.should_expire.load(Ordering::SeqCst) {
Some(Duration::ZERO)
} else {
None
}
}
}
let expiry = TestExpiry {
should_expire: Arc::clone(&should_expire),
};
let mut cache: Cache<String, String> = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.build();
cache.reconfigure_for_testing().await;
let key = "test_key".to_string();
// First insert expires immediately (Duration::ZERO): the entry is still
// physically present but unreadable.
cache.insert(key.clone(), "first_value".to_string()).await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");
assert_eq!(
cache.get(&key).await,
None,
"Entry should not be accessible (expired)"
);
cache.run_pending_tasks().await;
// Re-insert over the expired entry with expiry disabled; the stale
// per-entry expiration must not linger.
should_expire.store(false, Ordering::SeqCst);
cache.insert(key.clone(), "second_value".to_string()).await;
cache.run_pending_tasks().await;
assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");
let result = cache.get(&key).await;
assert_eq!(
result,
Some("second_value".to_string()),
"Entry should be accessible after clearing expiration"
);
}
// Like the test above, but with an `Expiry` that only overrides
// `expire_after_create` (so `expire_after_update` keeps its trait default).
// Re-inserting over an expired entry must not leave a stale expiration:
// the new value is readable, and a re-insert made while the flag is on gets
// a fresh 1-second TTL from `expire_after_create`.
#[tokio::test]
async fn test_expire_after_create_only_no_stale_entries() {
use std::sync::atomic::AtomicBool;
// Returns a 1 s expiry while the flag is set, otherwise no expiry.
struct TestExpiry {
should_expire: Arc<AtomicBool>,
}
impl Expiry<String, String> for TestExpiry {
fn expire_after_create(
&self,
_key: &String,
_value: &String,
_current_time: StdInstant,
) -> Option<Duration> {
if self.should_expire.load(Ordering::SeqCst) {
Some(Duration::from_secs(1))
} else {
None
}
}
}
let (clock, mock) = Clock::mock();
let should_expire = Arc::new(AtomicBool::new(true));
let mut cache: Cache<String, String> = Cache::builder()
.max_capacity(100)
.expire_after(TestExpiry {
should_expire: Arc::clone(&should_expire),
})
.clock(clock)
.build();
cache.reconfigure_for_testing().await;
// key1: expires after 1 s; advancing the mock clock 2 s makes it stale.
let key = "key1".to_string();
cache.insert(key.clone(), "value1".to_string()).await;
cache.run_pending_tasks().await;
assert_eq!(cache.get(&key).await, Some("value1".to_string()));
mock.increment(Duration::from_secs(2));
assert_eq!(cache.get(&key).await, None, "Entry should be expired");
// Re-insert with expiry disabled: the stale expiration must not apply to
// the new value.
should_expire.store(false, Ordering::SeqCst);
cache.insert(key.clone(), "value2".to_string()).await;
cache.run_pending_tasks().await;
assert_eq!(
cache.get(&key).await,
Some("value2".to_string()),
"Re-inserted entry with no expiry should be accessible"
);
// key2: with expiry re-enabled, a re-insert over an expired entry must
// get a *fresh* 1 s TTL (not inherit the already-elapsed one).
should_expire.store(true, Ordering::SeqCst);
let key2 = "key2".to_string();
cache.insert(key2.clone(), "v1".to_string()).await;
cache.run_pending_tasks().await;
assert_eq!(cache.get(&key2).await, Some("v1".to_string()));
mock.increment(Duration::from_secs(2));
assert_eq!(cache.get(&key2).await, None, "key2 should be expired");
cache.insert(key2.clone(), "v2".to_string()).await;
cache.run_pending_tasks().await;
assert_eq!(
cache.get(&key2).await,
Some("v2".to_string()),
"Re-inserted key2 should be accessible with fresh TTL"
);
mock.increment(Duration::from_secs(2));
assert_eq!(
cache.get(&key2).await,
None,
"key2 should expire with fresh TTL from expire_after_create"
);
}
}