use super::{
value_initializer::{InitResult, ValueInitializer},
CacheBuilder, ConcurrentCacheExt,
};
use crate::{
common::{
concurrent::{
constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
housekeeper::{self, InnerSync},
Weigher, WriteOp,
},
time::Instant,
},
notification::{self, EvictionListener},
sync::{Iter, PredicateId},
sync_base::{
base_cache::{BaseCache, HouseKeeperArc},
iter::ScanningGet,
},
Policy, PredicateError,
};
use crossbeam_channel::{Sender, TrySendError};
use std::{
borrow::Borrow,
collections::hash_map::RandomState,
fmt,
hash::{BuildHasher, Hash},
sync::Arc,
time::Duration,
};
/// A cache handle made of a `BaseCache` plus a shared `ValueInitializer`.
///
/// The hasher type parameter `S` defaults to `RandomState`.
pub struct Cache<K, V, S = RandomState> {
// Underlying cache that the methods on this type delegate to.
base: BaseCache<K, V, S>,
// Helper shared through an `Arc`; the `get_with` family of methods uses it
// to run their `init` closures.
value_initializer: Arc<ValueInitializer<K, V, S>>,
}
// Manual `Send` impl: requires `K` and `V` to be `Send + Sync` and `S` to be `Send`.
// NOTE(review): soundness rests on the internals of `BaseCache` and
// `ValueInitializer`, which are not visible in this file — confirm.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<K, V, S> Send for Cache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Send,
{
}
// Manual `Sync` impl: requires `K` and `V` to be `Send + Sync` and `S` to be `Sync`.
// NOTE(review): soundness rests on the internals of `BaseCache` and
// `ValueInitializer`, which are not visible in this file — confirm.
unsafe impl<K, V, S> Sync for Cache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Sync,
{
}
impl<K, V, S> Clone for Cache<K, V, S> {
    /// Produces another handle: the base cache is cloned and the
    /// `ValueInitializer` is shared by bumping its `Arc` reference count.
    fn clone(&self) -> Self {
        let base = self.base.clone();
        let value_initializer = Arc::clone(&self.value_initializer);
        Self {
            base,
            value_initializer,
        }
    }
}
impl<K, V, S> fmt::Debug for Cache<K, V, S>
where
    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
    V: fmt::Debug + Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Formats the cache as a debug map of the key/value pairs yielded by
    /// `self.iter()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_map().entries(self.iter()).finish()
    }
}
// Read-only accessors; each one forwards to the underlying `BaseCache`.
impl<K, V, S> Cache<K, V, S> {
/// Returns the cache's name as reported by the base cache, if it has one.
pub fn name(&self) -> Option<&str> {
self.base.name()
}
/// Returns the `Policy` reported by the base cache.
pub fn policy(&self) -> Policy {
self.base.policy()
}
/// Returns the entry count reported by the base cache.
pub fn entry_count(&self) -> u64 {
self.base.entry_count()
}
/// Returns the weighted size reported by the base cache.
pub fn weighted_size(&self) -> u64 {
self.base.weighted_size()
}
}
impl<K, V> Cache<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Builds a cache bounded by `max_capacity` that hashes with a default
    /// `RandomState`.
    ///
    /// Every other optional setting accepted by `with_everything` is left as
    /// `None`, the invalidator is disabled, and the housekeeper is configured
    /// with `housekeeper::Configuration::new_thread_pool(true)`.
    pub fn new(max_capacity: u64) -> Self {
        Self::with_everything(
            None,                   // name
            Some(max_capacity),     // max_capacity
            None,                   // initial_capacity
            RandomState::default(), // build_hasher
            None,                   // weigher
            None,                   // eviction_listener
            None,                   // eviction_listener_conf
            None,                   // time_to_live
            None,                   // time_to_idle
            false,                  // invalidator_enabled
            housekeeper::Configuration::new_thread_pool(true),
        )
    }

    /// Returns a default `CacheBuilder` that produces `Cache<K, V, RandomState>`.
    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
        CacheBuilder::default()
    }
}
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
/// Assembles a cache from the complete set of configuration values.
///
/// All arguments are forwarded to `BaseCache::new`. A clone of
/// `build_hasher` goes to the base cache and the original is handed to the
/// `ValueInitializer`.
#[allow(clippy::too_many_arguments)]
pub(crate) fn with_everything(
    name: Option<String>,
    max_capacity: Option<u64>,
    initial_capacity: Option<usize>,
    build_hasher: S,
    weigher: Option<Weigher<K, V>>,
    eviction_listener: Option<EvictionListener<K, V>>,
    eviction_listener_conf: Option<notification::Configuration>,
    time_to_live: Option<Duration>,
    time_to_idle: Option<Duration>,
    invalidator_enabled: bool,
    housekeeper_conf: housekeeper::Configuration,
) -> Self {
    let base = BaseCache::new(
        name,
        max_capacity,
        initial_capacity,
        build_hasher.clone(),
        weigher,
        eviction_listener,
        eviction_listener_conf,
        time_to_live,
        time_to_idle,
        invalidator_enabled,
        housekeeper_conf,
    );
    let value_initializer = Arc::new(ValueInitializer::with_hasher(build_hasher));
    Self {
        base,
        value_initializer,
    }
}
/// Returns `true` when the base cache reports an entry for `key`.
///
/// `key` may be any borrowed form of `K`; it is hashed through
/// `self.base.hash` before the lookup.
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    let hash = self.base.hash(key);
    self.base.contains_key_with_hash(key, hash)
}
/// Same as `contains_key`, but takes a hash the caller has already computed
/// and forwards both to the base cache.
pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.base.contains_key_with_hash(key, hash)
}
/// Looks `key` up in the base cache and returns its value, or `None` when
/// the base cache has nothing to return for it.
///
/// `key` may be any borrowed form of `K`; it is hashed through
/// `self.base.hash` before the lookup.
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    let hash = self.base.hash(key);
    self.base.get_with_hash(key, hash)
}
/// Same as `get`, but takes a hash the caller has already computed and
/// forwards both to the base cache.
pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.base.get_with_hash(key, hash)
}
/// Deprecated alias kept for compatibility; simply calls `get_with`.
#[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
self.get_with(key, init)
}
/// Deprecated alias kept for compatibility; simply calls `try_get_with`.
#[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
F: FnOnce() -> Result<V, E>,
E: Send + Sync + 'static,
{
self.try_get_with(key, init)
}
/// Returns the value for `key`, running `init` to produce and insert one
/// when the lookup yields nothing.
///
/// No `replace_if` predicate is supplied, so an existing value is always
/// accepted as-is.
pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
    let hash = self.base.hash(&key);
    let no_predicate: Option<fn(&V) -> bool> = None;
    self.get_or_insert_with_hash_and_fun(Arc::new(key), hash, init, no_predicate)
}
/// Variant of `get_with` that takes the key by reference.
///
/// The key is only converted into an owned `K` (through `ToOwned`) further
/// down the call chain, when a value actually has to be inserted.
pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
where
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    let no_predicate: Option<fn(&V) -> bool> = None;
    let hash = self.base.hash(key);
    self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, no_predicate)
}
/// Like `get_with`, but also passes a `replace_if` predicate down to the
/// lookup, which is given the chance to ignore an existing value so that
/// `init` runs instead.
pub fn get_with_if(
    &self,
    key: K,
    init: impl FnOnce() -> V,
    replace_if: impl FnMut(&V) -> bool,
) -> V {
    let hash = self.base.hash(&key);
    self.get_or_insert_with_hash_and_fun(Arc::new(key), hash, init, Some(replace_if))
}
/// Fast path for the `get_with` family: tries a lookup first (passing the
/// optional `replace_if` predicate to `get_with_hash_but_ignore_if`) and
/// only falls back to `insert_with_hash_and_fun` when that returns `None`.
pub(crate) fn get_or_insert_with_hash_and_fun(
    &self,
    key: Arc<K>,
    hash: u64,
    init: impl FnOnce() -> V,
    mut replace_if: Option<impl FnMut(&V) -> bool>,
) -> V {
    let cached = self
        .base
        .get_with_hash_but_ignore_if(&key, hash, replace_if.as_mut());
    match cached {
        Some(value) => value,
        None => self.insert_with_hash_and_fun(key, hash, init, replace_if),
    }
}
/// By-reference counterpart of `get_or_insert_with_hash_and_fun`.
///
/// The borrowed key is cloned into an owned `Arc<K>` only when the lookup
/// misses and an insert is actually required.
pub(crate) fn get_or_insert_with_hash_by_ref_and_fun<Q>(
    &self,
    key: &Q,
    hash: u64,
    init: impl FnOnce() -> V,
    mut replace_if: Option<impl FnMut(&V) -> bool>,
) -> V
where
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    let cached = self
        .base
        .get_with_hash_but_ignore_if(key, hash, replace_if.as_mut());
    match cached {
        Some(value) => value,
        None => {
            let owned_key = Arc::new(key.to_owned());
            self.insert_with_hash_and_fun(owned_key, hash, init, replace_if)
        }
    }
}
/// Slow path for the `get_with` family: asks the `ValueInitializer` to
/// either read an existing value or run `init` and insert its result.
///
/// The initializer is handed two callbacks: one that re-reads the entry
/// through `get_with_hash_but_no_recording` (honouring `replace_if`), and
/// one that stores a freshly produced value with `insert_with_hash`.
pub(crate) fn insert_with_hash_and_fun(
    &self,
    key: Arc<K>,
    hash: u64,
    init: impl FnOnce() -> V,
    mut replace_if: Option<impl FnMut(&V) -> bool>,
) -> V {
    let read_existing = || {
        self.base
            .get_with_hash_but_no_recording(&key, hash, replace_if.as_mut())
    };
    let store = |value| self.insert_with_hash(Arc::clone(&key), hash, value);

    let outcome =
        self.value_initializer
            .init_or_read(Arc::clone(&key), read_existing, init, store);
    match outcome {
        InitResult::ReadExisting(value) => value,
        InitResult::Initialized(value) => {
            // Flush the epoch pin once `init` has produced and stored a value.
            crossbeam_epoch::pin().flush();
            value
        }
        // `init` here returns a plain `V`, so this path has no error to report.
        InitResult::InitErr(_) => unreachable!(),
    }
}
/// Returns the value for `key`, or runs `init` when the lookup misses.
///
/// `init` may decline to produce a value by returning `None`, in which case
/// this method returns `None` as well.
pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
where
    F: FnOnce() -> Option<V>,
{
    let hash = self.base.hash(&key);
    self.get_or_optionally_insert_with_hash_and_fun(Arc::new(key), hash, init)
}
/// Variant of `optionally_get_with` that takes the key by reference.
pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
where
    F: FnOnce() -> Option<V>,
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, self.base.hash(key), init)
}
/// Fast path for `optionally_get_with`: returns the cached value when the
/// lookup hits, otherwise defers to `optionally_insert_with_hash_and_fun`.
pub(super) fn get_or_optionally_insert_with_hash_and_fun<F>(
    &self,
    key: Arc<K>,
    hash: u64,
    init: F,
) -> Option<V>
where
    F: FnOnce() -> Option<V>,
{
    self.get_with_hash(&key, hash)
        .or_else(|| self.optionally_insert_with_hash_and_fun(key, hash, init))
}
/// By-reference counterpart of `get_or_optionally_insert_with_hash_and_fun`.
///
/// The borrowed key is turned into an owned `Arc<K>` only after the lookup
/// has missed.
pub(super) fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
    &self,
    key: &Q,
    hash: u64,
    init: F,
) -> Option<V>
where
    F: FnOnce() -> Option<V>,
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    self.get_with_hash(key, hash).or_else(|| {
        let owned_key = Arc::new(key.to_owned());
        self.optionally_insert_with_hash_and_fun(owned_key, hash, init)
    })
}
/// Slow path for `optionally_get_with`: asks the `ValueInitializer` to read
/// an existing value or to run `init` and insert whatever it yields.
///
/// Returns `Some(value)` when a value was read or initialized, and `None`
/// when the initializer reports `InitErr`.
pub(super) fn optionally_insert_with_hash_and_fun<F>(
    &self,
    key: Arc<K>,
    hash: u64,
    init: F,
) -> Option<V>
where
    F: FnOnce() -> Option<V>,
{
    let read_existing = || {
        // No predicate: an existing value is never ignored on this path.
        let ignore_if: Option<&mut fn(&V) -> bool> = None;
        self.base
            .get_with_hash_but_no_recording(&key, hash, ignore_if)
    };
    let store = |value| self.insert_with_hash(Arc::clone(&key), hash, value);

    let outcome = self.value_initializer.optionally_init_or_read(
        Arc::clone(&key),
        read_existing,
        init,
        store,
    );
    match outcome {
        InitResult::ReadExisting(value) => Some(value),
        InitResult::Initialized(value) => {
            crossbeam_epoch::pin().flush();
            Some(value)
        }
        InitResult::InitErr(_) => {
            crossbeam_epoch::pin().flush();
            None
        }
    }
}
/// Returns the value for `key`, or runs the fallible `init` when the lookup
/// misses.
///
/// # Errors
///
/// When initialization fails, the error is returned wrapped in an `Arc`.
pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
    F: FnOnce() -> Result<V, E>,
    E: Send + Sync + 'static,
{
    let hash = self.base.hash(&key);
    self.get_or_try_insert_with_hash_and_fun(Arc::new(key), hash, init)
}
/// Variant of `try_get_with` that takes the key by reference.
///
/// # Errors
///
/// When initialization fails, the error is returned wrapped in an `Arc`.
pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
where
    F: FnOnce() -> Result<V, E>,
    E: Send + Sync + 'static,
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    self.get_or_try_insert_with_hash_by_ref_and_fun(key, self.base.hash(key), init)
}
/// Fast path for `try_get_with`: returns `Ok` with the cached value when the
/// lookup hits, otherwise defers to `try_insert_with_hash_and_fun`.
pub(crate) fn get_or_try_insert_with_hash_and_fun<F, E>(
    &self,
    key: Arc<K>,
    hash: u64,
    init: F,
) -> Result<V, Arc<E>>
where
    F: FnOnce() -> Result<V, E>,
    E: Send + Sync + 'static,
{
    let cached = self.get_with_hash(&key, hash);
    match cached {
        Some(value) => Ok(value),
        None => self.try_insert_with_hash_and_fun(key, hash, init),
    }
}
/// By-reference counterpart of `get_or_try_insert_with_hash_and_fun`.
///
/// The borrowed key is turned into an owned `Arc<K>` only after the lookup
/// has missed.
pub(crate) fn get_or_try_insert_with_hash_by_ref_and_fun<F, Q, E>(
    &self,
    key: &Q,
    hash: u64,
    init: F,
) -> Result<V, Arc<E>>
where
    F: FnOnce() -> Result<V, E>,
    E: Send + Sync + 'static,
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    let cached = self.get_with_hash(key, hash);
    match cached {
        Some(value) => Ok(value),
        None => {
            let owned_key = Arc::new(key.to_owned());
            self.try_insert_with_hash_and_fun(owned_key, hash, init)
        }
    }
}
/// Slow path for `try_get_with`: asks the `ValueInitializer` to read an
/// existing value or to run the fallible `init` and insert its result.
///
/// # Errors
///
/// Returns the `Arc`-wrapped error carried by `InitResult::InitErr`.
pub(crate) fn try_insert_with_hash_and_fun<F, E>(
    &self,
    key: Arc<K>,
    hash: u64,
    init: F,
) -> Result<V, Arc<E>>
where
    F: FnOnce() -> Result<V, E>,
    E: Send + Sync + 'static,
{
    let read_existing = || {
        // No predicate: an existing value is never ignored on this path.
        let ignore_if: Option<&mut fn(&V) -> bool> = None;
        self.base
            .get_with_hash_but_no_recording(&key, hash, ignore_if)
    };
    let store = |value| self.insert_with_hash(Arc::clone(&key), hash, value);

    let outcome =
        self.value_initializer
            .try_init_or_read(Arc::clone(&key), read_existing, init, store);
    match outcome {
        InitResult::ReadExisting(value) => Ok(value),
        InitResult::Initialized(value) => {
            crossbeam_epoch::pin().flush();
            Ok(value)
        }
        InitResult::InitErr(err) => {
            crossbeam_epoch::pin().flush();
            Err(err)
        }
    }
}
/// Inserts `value` under `key`.
///
/// The key is hashed through `self.base.hash`, wrapped in an `Arc`, and
/// passed on to `insert_with_hash`.
///
/// # Panics
///
/// Panics if `insert_with_hash` fails to schedule the write operation.
pub fn insert(&self, key: K, value: V) {
    let hash = self.base.hash(&key);
    self.insert_with_hash(Arc::new(key), hash, value)
}
/// Inserts `value` through the base cache, then schedules the write
/// operation it returns on the write-op channel.
///
/// # Panics
///
/// Panics with "Failed to insert" if `schedule_write_op` returns an error.
pub(crate) fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
    let (op, now) = self.base.do_insert_with_hash(key, hash, value);
    let scheduled = Self::schedule_write_op(
        self.base.inner.as_ref(),
        &self.base.write_op_ch,
        op,
        now,
        self.base.housekeeper.as_ref(),
    );
    scheduled.expect("Failed to insert");
}
/// Removes the entry for `key`, if there is one.
///
/// `key` may be any borrowed form of `K`; it is hashed through
/// `self.base.hash` and the work is done by `invalidate_with_hash`.
pub fn invalidate<Q>(&self, key: &Q)
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    self.invalidate_with_hash(key, self.base.hash(key));
}
/// Removes the entry for `key` (with a pre-computed `hash`), notifies the
/// removal listener when one is enabled, and schedules a `WriteOp::Remove`.
///
/// Does nothing beyond the optional key-lock attempt when the base cache has
/// no entry to remove.
///
/// # Panics
///
/// Panics with "Failed to remove" if `schedule_write_op` returns an error.
pub(crate) fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64)
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
// `kl` holds the optional per-key lock and `klg` its guard; both stay `None`
// unless blocking removal notification is in effect and the key is found.
let mut kl = None;
let mut klg = None;
if self.base.is_removal_notifier_enabled() && self.base.is_blocking_removal_notification() {
// The lock is looked up by `Arc<K>`, so the stored key has to be fetched first.
// NOTE(review): if the key is absent here but inserted by another thread
// before `remove_entry` below, the removal proceeds without the key lock.
if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
kl = self.base.maybe_key_lock(&arc_key);
klg = kl.as_ref().map(|kl| kl.lock());
}
}
if let Some(kv) = self.base.remove_entry(key, hash) {
if self.base.is_removal_notifier_enabled() {
self.base.notify_invalidate(&kv.key, &kv.entry)
}
// Release the guard and then the lock before scheduling the write op;
// presumably so the key lock is not held while `schedule_write_op`
// sleeps and retries on a full channel — confirm.
std::mem::drop(klg);
std::mem::drop(kl);
let op = WriteOp::Remove(kv);
let now = self.base.current_time_from_expiration_clock();
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.expect("Failed to remove");
crossbeam_epoch::pin().flush();
}
}
/// Asks the base cache to invalidate all entries.
pub fn invalidate_all(&self) {
self.base.invalidate_all();
}
/// Registers `predicate` with the base cache so that entries it matches get
/// invalidated.
///
/// On success the `PredicateId` handed back by the base cache is returned.
///
/// # Errors
///
/// Propagates the `PredicateError` reported by the base cache.
pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
where
    F: Fn(&K, &V) -> bool + Send + Sync + 'static,
{
    let shared_predicate = Arc::new(predicate);
    self.base.invalidate_entries_if(shared_predicate)
}
/// Same as `invalidate_entries_if`, for callers that already hold the
/// predicate in an `Arc`; it is forwarded to the base cache unchanged.
pub(crate) fn invalidate_entries_with_arc_fun<F>(
&self,
predicate: Arc<F>,
) -> Result<PredicateId, PredicateError>
where
F: Fn(&K, &V) -> bool + Send + Sync + 'static,
{
self.base.invalidate_entries_if(predicate)
}
/// Creates an iterator over the cache, built from the base cache and its
/// number of hash-table segments.
///
/// The `IntoIterator` impl for `&Cache` yields `(Arc<K>, V)` pairs from this
/// iterator.
pub fn iter(&self) -> Iter<'_, K, V> {
    let num_segments = self.num_cht_segments();
    Iter::with_single_cache_segment(&self.base, num_segments)
}
}
// Lets `for (key, value) in &cache` work by delegating to `Cache::iter`.
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
// Each item is a shared key together with a value.
type Item = (Arc<K>, V);
type IntoIter = Iter<'a, K, V>;
/// Returns the same iterator as `Cache::iter`.
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<K, V, S> ConcurrentCacheExt<K, V> for Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
/// Runs the inner cache's `sync`, passing `MAX_SYNC_REPEATS` as its argument.
fn sync(&self) {
self.base.inner.sync(MAX_SYNC_REPEATS);
}
}
// Scanning support used when building iterators (see `Cache::iter`); every
// method forwards to the base cache.
impl<K, V, S> ScanningGet<K, V> for Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
/// Returns the number of hash-table segments reported by the base cache.
fn num_cht_segments(&self) -> usize {
self.base.num_cht_segments()
}
/// Returns the base cache's `scanning_get` result for `key`.
fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
self.base.scanning_get(key)
}
/// Returns the base cache's `keys` result for the segment index `cht_segment`.
fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
self.base.keys(cht_segment)
}
}
impl<K, V, S> Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Pushes `op` onto the write-op channel `ch`, retrying while the channel
    /// is full.
    ///
    /// Before every attempt `BaseCache::apply_reads_writes_if_needed` is
    /// called. When the channel is full, the thread sleeps for
    /// `WRITE_RETRY_INTERVAL_MICROS` microseconds and tries again with the
    /// operation handed back by `try_send`.
    ///
    /// # Errors
    ///
    /// Returns the `TrySendError::Disconnected` error if the channel is
    /// disconnected.
    #[inline]
    fn schedule_write_op(
        inner: &impl InnerSync,
        ch: &Sender<WriteOp<K, V>>,
        op: WriteOp<K, V>,
        now: Instant,
        housekeeper: Option<&HouseKeeperArc<K, V, S>>,
    ) -> Result<(), TrySendError<WriteOp<K, V>>> {
        let mut pending = op;
        loop {
            BaseCache::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
            pending = match ch.try_send(pending) {
                Ok(()) => return Ok(()),
                // The channel gives the operation back; keep it for the retry.
                Err(TrySendError::Full(returned)) => returned,
                Err(e @ TrySendError::Disconnected(_)) => return Err(e),
            };
            std::thread::sleep(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS));
        }
    }
}
// Helpers compiled only for tests.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
/// Returns `true` when `entry_count()` is zero.
pub(crate) fn is_table_empty(&self) -> bool {
self.entry_count() == 0
}
/// Returns the invalidation predicate count reported by the base cache.
pub(crate) fn invalidation_predicate_count(&self) -> usize {
self.base.invalidation_predicate_count()
}
/// Forwards to the base cache's `reconfigure_for_testing`.
pub(crate) fn reconfigure_for_testing(&mut self) {
self.base.reconfigure_for_testing();
}
/// Installs (or clears, with `None`) the clock the base cache uses for expiration.
pub(crate) fn set_expiration_clock(&self, clock: Option<crate::common::time::Clock>) {
self.base.set_expiration_clock(clock);
}
}
#[cfg(test)]
mod tests {
use super::{Cache, ConcurrentCacheExt};
use crate::{
common::time::Clock,
notification::{
self,
macros::{assert_eq_with_mode, assert_with_mode},
DeliveryMode, RemovalCause,
},
};
use parking_lot::Mutex;
use std::{convert::Infallible, sync::Arc, time::Duration};
// Exercises insert/get/contains_key/invalidate on a cache with max capacity 3
// and checks the eviction-listener notifications, once per delivery mode.
#[test]
fn basic_single_thread() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// `actual` collects what the listener receives; `expected` is built up
// alongside the operations below.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Rebind as immutable now that reconfiguration is done.
let cache = cache;
cache.insert("a", "alice");
cache.insert("b", "bob");
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
cache.sync();
cache.insert("c", "cindy");
assert_eq_with_mode!(cache.get(&"c"), Some("cindy"), delivery_mode);
assert_with_mode!(cache.contains_key(&"c"), delivery_mode);
cache.sync();
// "a" and "b" are read a second time here, "c" only once so far.
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
cache.sync();
// The cache is full. The first two inserts of "d" are expected to be
// rejected (a `Size` notification for "d" itself); presumably because
// "d" has been accessed less often than the resident entries.
cache.insert("d", "david"); expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
cache.insert("d", "david");
expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.sync();
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode);
// The third insert of "d" is expected to be admitted, evicting "c".
cache.insert("d", "dennis");
expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_eq_with_mode!(cache.get(&"c"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), Some("dennis"), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"c"), delivery_mode);
assert_with_mode!(cache.contains_key(&"d"), delivery_mode);
// Explicit invalidation is expected to be reported as `Explicit`.
cache.invalidate(&"b");
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
cache.sync();
assert_eq_with_mode!(cache.get(&"b"), None, delivery_mode);
assert_with_mode!(!cache.contains_key(&"b"), delivery_mode);
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// Exercises weight-based eviction: the weigher uses the `u32` in each value
// and the cache's max capacity is 31. Runs once per delivery mode.
#[test]
fn size_aware_eviction() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// An entry's weight is the second element of its value tuple.
let weigher = |_k: &&str, v: &(&str, u32)| v.1;
let alice = ("alice", 10);
let bob = ("bob", 15);
let bill = ("bill", 20);
let cindy = ("cindy", 5);
let david = ("david", 15);
let dennis = ("dennis", 15);
// `actual` collects what the listener receives; `expected` is built up
// alongside the operations below.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
let mut cache = Cache::builder()
.max_capacity(31)
.weigher(weigher)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Rebind as immutable now that reconfiguration is done.
let cache = cache;
cache.insert("a", alice);
cache.insert("b", bob);
assert_eq_with_mode!(cache.get(&"a"), Some(alice), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
cache.sync();
cache.insert("c", cindy);
assert_eq_with_mode!(cache.get(&"c"), Some(cindy), delivery_mode);
assert_with_mode!(cache.contains_key(&"c"), delivery_mode);
cache.sync();
// Total weight is now 10 + 15 + 5 = 30.
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_eq_with_mode!(cache.get(&"a"), Some(alice), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
cache.sync();
// "d" (weight 15) does not fit. Four consecutive inserts are expected to
// be rejected, each producing a `Size` notification for "d" itself.
cache.insert("d", david); expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode);
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode);
// The fifth insert is expected to be admitted, evicting "c" and "a"
// (weights 5 + 10) to make room for weight 15.
cache.insert("d", dennis);
expected.push((Arc::new("c"), cindy, RemovalCause::Size));
expected.push((Arc::new("a"), alice, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
assert_eq_with_mode!(cache.get(&"c"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), Some(dennis), delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"c"), delivery_mode);
assert_with_mode!(cache.contains_key(&"d"), delivery_mode);
// Replacing "b" with a heavier value (15 -> 20) is expected to push the
// total over capacity and evict "d".
cache.insert("b", bill);
expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
expected.push((Arc::new("d"), dennis, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"b"), Some(bill), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
// Re-insert "a" and shrink "b" back to weight 15; only the replacement of
// "b" is expected to notify.
cache.insert("a", alice);
cache.insert("b", bob);
expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
cache.sync();
assert_eq_with_mode!(cache.get(&"a"), Some(alice), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
// Two entries remain, weighing 10 + 15 = 25.
assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode);
assert_eq_with_mode!(cache.weighted_size(), 25, delivery_mode);
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// Four threads share clones of one cache; each inserts keys 10 and 20 and then
// invalidates key 10. After joining, only key 20 must remain.
#[test]
fn basic_multi_threads() {
let num_threads = 4;
let cache = Cache::new(100);
// The handles are collected first so that every thread is spawned before
// any of them is joined.
#[allow(clippy::needless_collect)]
let handles = (0..num_threads)
.map(|id| {
let cache = cache.clone();
std::thread::spawn(move || {
cache.insert(10, format!("{}-100", id));
cache.get(&10);
cache.insert(20, format!("{}-200", id));
cache.invalidate(&10);
})
})
.collect::<Vec<_>>();
handles.into_iter().for_each(|h| h.join().expect("Failed"));
assert!(cache.get(&10).is_none());
assert!(cache.get(&20).is_some());
assert!(!cache.contains_key(&10));
assert!(cache.contains_key(&20));
}
// Checks that `invalidate_all` removes every entry present at the time of the
// call, reports each as `Explicit`, and leaves later inserts untouched.
#[test]
fn invalidate_all() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// `actual` collects what the listener receives; `expected` is built up
// alongside the operations below.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
let mut cache = Cache::builder()
.max_capacity(100)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Rebind as immutable now that reconfiguration is done.
let cache = cache;
cache.insert("a", "alice");
cache.insert("b", "bob");
cache.insert("c", "cindy");
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_eq_with_mode!(cache.get(&"c"), Some("cindy"), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(cache.contains_key(&"c"), delivery_mode);
cache.invalidate_all();
expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
cache.sync();
// "d" is inserted after the invalidation and must survive it.
cache.insert("d", "david");
cache.sync();
assert_with_mode!(cache.get(&"a").is_none(), delivery_mode);
assert_with_mode!(cache.get(&"b").is_none(), delivery_mode);
assert_with_mode!(cache.get(&"c").is_none(), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), Some("david"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"c"), delivery_mode);
assert_with_mode!(cache.contains_key(&"d"), delivery_mode);
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// Checks predicate-based invalidation with a mock expiration clock: matching
// entries inserted before the predicate was registered are removed, entries
// inserted afterwards are kept, and processed predicates are dropped.
#[test]
fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
run_test(DeliveryMode::Immediate)?;
run_test(DeliveryMode::Queued)?;
fn run_test(delivery_mode: DeliveryMode) -> Result<(), Box<dyn std::error::Error>> {
use std::collections::HashSet;
// `actual` collects what the listener receives; `expected` is built up
// alongside the operations below.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
let mut cache = Cache::builder()
.max_capacity(100)
.support_invalidation_closures()
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Drive time manually through a mock clock.
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Rebind as immutable now that reconfiguration is done.
let cache = cache;
cache.insert(0, "alice");
cache.insert(1, "bob");
cache.insert(2, "alex");
cache.sync();
// Mock time: 5 secs from the start.
mock.increment(Duration::from_secs(5)); cache.sync();
assert_eq_with_mode!(cache.get(&0), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&1), Some("bob"), delivery_mode);
assert_eq_with_mode!(cache.get(&2), Some("alex"), delivery_mode);
assert_with_mode!(cache.contains_key(&0), delivery_mode);
assert_with_mode!(cache.contains_key(&1), delivery_mode);
assert_with_mode!(cache.contains_key(&2), delivery_mode);
// Register a predicate matching the values "alice" and "alex".
let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
assert_eq_with_mode!(cache.base.invalidation_predicate_count(), 1, delivery_mode);
expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
// Mock time: 10 secs from the start.
mock.increment(Duration::from_secs(5));
// This "alice" is inserted after the predicate was registered and is
// expected to survive (see the assertions on key 3 below).
cache.insert(3, "alice");
// Sync twice with real sleeps in between; presumably to give the
// invalidation work time to finish — confirm.
cache.sync(); std::thread::sleep(Duration::from_millis(200));
cache.sync(); std::thread::sleep(Duration::from_millis(200));
assert_with_mode!(cache.get(&0).is_none(), delivery_mode);
assert_with_mode!(cache.get(&2).is_none(), delivery_mode);
assert_eq_with_mode!(cache.get(&1), Some("bob"), delivery_mode);
assert_eq_with_mode!(cache.get(&3), Some("alice"), delivery_mode);
assert_with_mode!(!cache.contains_key(&0), delivery_mode);
assert_with_mode!(cache.contains_key(&1), delivery_mode);
assert_with_mode!(!cache.contains_key(&2), delivery_mode);
assert_with_mode!(cache.contains_key(&3), delivery_mode);
assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode);
// The processed predicate is expected to have been removed.
assert_eq_with_mode!(cache.invalidation_predicate_count(), 0, delivery_mode);
// Mock time: 15 secs from the start.
mock.increment(Duration::from_secs(5));
// Register two more predicates covering the remaining entries.
cache.invalidate_entries_if(|_k, &v| v == "alice")?;
cache.invalidate_entries_if(|_k, &v| v == "bob")?;
assert_eq_with_mode!(cache.invalidation_predicate_count(), 2, delivery_mode);
expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
cache.sync(); std::thread::sleep(Duration::from_millis(200));
cache.sync(); std::thread::sleep(Duration::from_millis(200));
assert_with_mode!(cache.get(&1).is_none(), delivery_mode);
assert_with_mode!(cache.get(&3).is_none(), delivery_mode);
assert_with_mode!(!cache.contains_key(&1), delivery_mode);
assert_with_mode!(!cache.contains_key(&3), delivery_mode);
assert_eq_with_mode!(cache.entry_count(), 0, delivery_mode);
assert_eq_with_mode!(cache.invalidation_predicate_count(), 0, delivery_mode);
verify_notification_vec(&cache, actual, &expected, delivery_mode);
Ok(())
}
Ok(())
}
// Checks time-to-live expiration (10 secs) using a mock clock, once per
// delivery mode.
#[test]
fn time_to_live() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// `actual` collects what the listener receives; `expected` is built up
// alongside the operations below.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_live(Duration::from_secs(10))
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Drive time manually through a mock clock.
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Rebind as immutable now that reconfiguration is done.
let cache = cache;
// "a" is inserted at mock time 0.
cache.insert("a", "alice");
cache.sync();
// Mock time: 5 secs. "a" is still alive.
mock.increment(Duration::from_secs(5)); cache.sync();
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
// Mock time: 10 secs. "a" has reached its time-to-live.
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
// The expired entry is invisible to reads even before `sync` runs.
assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_eq_with_mode!(cache.iter().count(), 0, delivery_mode);
cache.sync();
assert_with_mode!(cache.is_table_empty(), delivery_mode);
// "b" is inserted at mock time 10 secs.
cache.insert("b", "bob");
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode);
// Mock time: 15 secs.
mock.increment(Duration::from_secs(5)); cache.sync();
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode);
// Replacing "b" at 15 secs; the assertions below show the new value
// living for a further 10 secs from this point.
cache.insert("b", "bill");
expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
cache.sync();
// Mock time: 20 secs.
mock.increment(Duration::from_secs(5)); cache.sync();
assert_eq_with_mode!(cache.get(&"b"), Some("bill"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode);
// Mock time: 25 secs. "b" (replaced at 15 secs) has expired.
mock.increment(Duration::from_secs(5)); expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), None, delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.iter().count(), 0, delivery_mode);
cache.sync();
assert_with_mode!(cache.is_table_empty(), delivery_mode);
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// Checks time-to-idle expiration (10 secs) using a mock clock, once per
// delivery mode.
#[test]
fn time_to_idle() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// `actual` collects what the listener receives; `expected` is built up
// alongside the operations below.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Drive time manually through a mock clock.
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Rebind as immutable now that reconfiguration is done.
let cache = cache;
// "a" is inserted at mock time 0.
cache.insert("a", "alice");
cache.sync();
// Mock time: 5 secs. "a" is read here.
mock.increment(Duration::from_secs(5)); cache.sync();
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
// Mock time: 10 secs. "b" is inserted.
mock.increment(Duration::from_secs(5)); cache.sync();
cache.insert("b", "bob");
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode);
// Mock time: 12 secs. Only `contains_key` is called on both keys. "a" is
// expected to expire at 15 secs (10 secs after its `get` at 5 secs), so
// these `contains_key` calls evidently do not reset the idle timer.
mock.increment(Duration::from_secs(2)); cache.sync();
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode);
// Mock time: 15 secs. "a" has expired; "b" is read here.
mock.increment(Duration::from_secs(3)); expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.iter().count(), 1, delivery_mode);
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode);
// Mock time: 25 secs. "b" has expired too.
mock.increment(Duration::from_secs(10)); expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), None, delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.iter().count(), 0, delivery_mode);
cache.sync();
assert_with_mode!(cache.is_table_empty(), delivery_mode);
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// Inserts 50 keys and checks that iterating over `&cache` yields every key
// exactly once, each with its matching value.
#[test]
fn test_iter() {
const NUM_KEYS: usize = 50;
fn make_value(key: usize) -> String {
format!("val: {}", key)
}
let cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.build();
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}
// Collect the keys into a set so duplicates would show up as a short count.
let mut key_set = std::collections::HashSet::new();
for (key, value) in &cache {
assert_eq!(value, make_value(*key));
key_set.insert(*key);
}
assert_eq!(key_set.len(), NUM_KEYS);
}
#[test]
fn test_iter_multi_threads() {
    use std::collections::HashSet;

    const NUM_KEYS: usize = 1024;
    const NUM_THREADS: usize = 16;

    fn make_value(key: usize) -> String {
        format!("val: {}", key)
    }

    let cache = Cache::builder()
        .max_capacity(2048)
        .time_to_idle(Duration::from_secs(10))
        .build();

    // Seed every key before any worker thread starts.
    for k in 0..NUM_KEYS {
        cache.insert(k, make_value(k));
    }

    // Start gate: the main thread holds the write lock while the workers are
    // being spawned; each worker blocks on a read lock until it is released.
    let gate = Arc::new(std::sync::RwLock::<()>::default());
    let gate_guard = gate.write().unwrap();

    // The handles are collected eagerly so that all threads exist before the
    // gate is opened below.
    #[allow(clippy::needless_collect)]
    let workers = (0..NUM_THREADS)
        .map(|n| {
            let cache = cache.clone();
            let gate = Arc::clone(&gate);
            // Even-numbered threads rewrite all keys, odd-numbered ones iterate.
            let is_writer = n % 2 == 0;
            std::thread::spawn(move || {
                // Held until the end of the closure.
                let _pass = gate.read().unwrap();
                if is_writer {
                    for k in 0..NUM_KEYS {
                        cache.insert(k, make_value(k));
                    }
                } else {
                    let mut seen = HashSet::new();
                    for (k, v) in &cache {
                        assert_eq!(v, make_value(*k));
                        seen.insert(*k);
                    }
                    assert_eq!(seen.len(), NUM_KEYS);
                }
            })
        })
        .collect::<Vec<_>>();

    // Open the gate and wait for every worker.
    drop(gate_guard);
    for worker in workers {
        worker.join().expect("Failed");
    }

    // After all the concurrent activity, the cache must still hold every key.
    let remaining: HashSet<_> = cache.iter().map(|(k, _)| *k).collect();
    assert_eq!(remaining.len(), NUM_KEYS);
}
#[test]
fn get_with() {
    use std::thread::{sleep, spawn};

    const KEY: u32 = 0;
    let cache = Cache::new(100);

    // Five workers operate on the same key. Delays are relative to the moment
    // the workers are spawned.
    let workers = vec![
        // Worker 1 (0 ms): first caller of `get_with`. Its init closure takes
        // 300 ms and yields "thread1".
        {
            let c = cache.clone();
            spawn(move || {
                let got = c.get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    "thread1"
                });
                assert_eq!(got, "thread1");
            })
        },
        // Worker 2 (100 ms): calls `get_with` while worker 1's closure is
        // still running. Its own closure must never be evaluated, and it is
        // expected to receive worker 1's value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let got = c.get_with(KEY, || unreachable!());
                assert_eq!(got, "thread1");
            })
        },
        // Worker 3 (400 ms): calls `get_with` after worker 1 has finished.
        // Its closure must not be evaluated either.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let got = c.get_with(KEY, || unreachable!());
                assert_eq!(got, "thread1");
            })
        },
        // Worker 4 (200 ms): a plain `get` while worker 1's closure is still
        // running must see no value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                assert!(c.get(&KEY).is_none());
            })
        },
        // Worker 5 (400 ms): a plain `get` after worker 1 has finished must
        // see the inserted value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                assert_eq!(c.get(&KEY), Some("thread1"));
            })
        },
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }
}
#[test]
fn get_with_by_ref() {
    use std::thread::{sleep, spawn};

    const KEY: &u32 = &0;
    let cache = Cache::new(100);

    // Same scenario as the by-value `get_with` test, but the key is passed by
    // reference. Delays are relative to the moment the workers are spawned.
    let workers = vec![
        // Worker 1 (0 ms): first caller. Its init closure takes 300 ms and
        // yields "thread1".
        {
            let c = cache.clone();
            spawn(move || {
                let got = c.get_with_by_ref(KEY, || {
                    sleep(Duration::from_millis(300));
                    "thread1"
                });
                assert_eq!(got, "thread1");
            })
        },
        // Worker 2 (100 ms): arrives while worker 1's closure is running. Its
        // own closure must never be evaluated.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let got = c.get_with_by_ref(KEY, || unreachable!());
                assert_eq!(got, "thread1");
            })
        },
        // Worker 3 (400 ms): arrives after worker 1 has finished. Its closure
        // must not be evaluated.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let got = c.get_with_by_ref(KEY, || unreachable!());
                assert_eq!(got, "thread1");
            })
        },
        // Worker 4 (200 ms): a plain `get` during initialization sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                assert!(c.get(KEY).is_none());
            })
        },
        // Worker 5 (400 ms): a plain `get` after initialization sees the value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                assert_eq!(c.get(KEY), Some("thread1"));
            })
        },
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }
}
#[test]
fn get_with_if() {
    use std::thread::{sleep, spawn};

    const KEY: u32 = 0;
    let cache = Cache::new(100);

    // Seven workers operate on the same key. Delays are relative to the
    // moment the workers are spawned.
    let workers = vec![
        // Worker 1 (0 ms): first caller. There is no cached value yet, so the
        // `replace_if` closure must not be called; the init closure takes
        // 300 ms and yields "thread1".
        {
            let c = cache.clone();
            spawn(move || {
                let got = c.get_with_if(
                    KEY,
                    || {
                        sleep(Duration::from_millis(300));
                        "thread1"
                    },
                    |_v| unreachable!(),
                );
                assert_eq!(got, "thread1");
            })
        },
        // Worker 2 (100 ms): arrives while worker 1's init closure is running.
        // Neither of its closures may be evaluated, and it must receive
        // worker 1's value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let got = c.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
                assert_eq!(got, "thread1");
            })
        },
        // Worker 3 (350 ms): the value is cached by now. Its `replace_if`
        // closure sees "thread1" and answers `false`, so the init closure
        // must not run and the cached value is returned.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(350));
                let got = c.get_with_if(
                    KEY,
                    || unreachable!(),
                    |current| {
                        assert_eq!(current, &"thread1");
                        false
                    },
                );
                assert_eq!(got, "thread1");
            })
        },
        // Worker 4 (400 ms): its `replace_if` closure sees "thread1" and
        // answers `true`, so its init closure runs and "thread4" is returned.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let got = c.get_with_if(
                    KEY,
                    || "thread4",
                    |current| {
                        assert_eq!(current, &"thread1");
                        true
                    },
                );
                assert_eq!(got, "thread4");
            })
        },
        // Worker 5 (200 ms): a plain `get` during worker 1's initialization
        // sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                assert!(c.get(&KEY).is_none());
            })
        },
        // Worker 6 (350 ms): a plain `get` after worker 1 finished sees
        // "thread1".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(350));
                assert_eq!(c.get(&KEY), Some("thread1"));
            })
        },
        // Worker 7 (450 ms): a plain `get` after worker 4 replaced the value
        // sees "thread4".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(450));
                assert_eq!(c.get(&KEY), Some("thread4"));
            })
        },
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }
}
#[test]
fn try_get_with() {
    use std::{
        sync::Arc,
        thread::{sleep, spawn},
    };

    // Note that MyError does not implement std::error::Error.
    #[derive(Debug)]
    pub struct MyError(String);

    type MyResult<T> = Result<T, Arc<MyError>>;

    const KEY: u32 = 0;
    let cache = Cache::new(100);

    // Eight workers operate on the same key. Delays are relative to the
    // moment the workers are spawned.
    let workers = vec![
        // Worker 1 (0 ms): first caller. Its init closure takes 300 ms and
        // fails, so nothing is expected to be cached.
        {
            let c = cache.clone();
            spawn(move || {
                let outcome = c.try_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    Err(MyError("thread1 error".into()))
                });
                assert!(outcome.is_err());
            })
        },
        // Worker 2 (100 ms): arrives while worker 1's closure is running. Its
        // own closure must never be evaluated, and it must observe an error
        // as well.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let outcome: MyResult<_> = c.try_get_with(KEY, || unreachable!());
                assert!(outcome.is_err());
            })
        },
        // Worker 3 (400 ms): arrives after worker 1 failed. Its init closure
        // runs for 300 ms and succeeds with "thread3".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let outcome: MyResult<_> = c.try_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    Ok("thread3")
                });
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 4 (500 ms): arrives while worker 3's closure is running. Its
        // own closure must never be evaluated, and it must receive worker 3's
        // value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                let outcome: MyResult<_> = c.try_get_with(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 5 (800 ms): arrives after worker 3 finished. Its closure
        // must not be evaluated.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let outcome: MyResult<_> = c.try_get_with(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 6 (200 ms): a plain `get` while worker 1's closure is
        // running sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                assert!(c.get(&KEY).is_none());
            })
        },
        // Worker 7 (400 ms): a plain `get` after worker 1 failed still sees
        // nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                assert!(c.get(&KEY).is_none());
            })
        },
        // Worker 8 (800 ms): a plain `get` after worker 3 succeeded sees
        // "thread3".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                assert_eq!(c.get(&KEY), Some("thread3"));
            })
        },
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }
}
#[test]
fn try_get_with_by_ref() {
    use std::{
        sync::Arc,
        thread::{sleep, spawn},
    };

    // Note that MyError does not implement std::error::Error.
    #[derive(Debug)]
    pub struct MyError(String);

    type MyResult<T> = Result<T, Arc<MyError>>;

    const KEY: &u32 = &0;
    let cache = Cache::new(100);

    // Same scenario as the by-value `try_get_with` test, but the key is
    // passed by reference. Delays are relative to the moment the workers are
    // spawned.
    let workers = vec![
        // Worker 1 (0 ms): first caller. Its init closure takes 300 ms and
        // fails.
        {
            let c = cache.clone();
            spawn(move || {
                let outcome = c.try_get_with_by_ref(KEY, || {
                    sleep(Duration::from_millis(300));
                    Err(MyError("thread1 error".into()))
                });
                assert!(outcome.is_err());
            })
        },
        // Worker 2 (100 ms): arrives during worker 1's initialization. Its
        // closure must never run, and it must observe an error too.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let outcome: MyResult<_> = c.try_get_with_by_ref(KEY, || unreachable!());
                assert!(outcome.is_err());
            })
        },
        // Worker 3 (400 ms): arrives after worker 1 failed. Its init closure
        // runs for 300 ms and succeeds with "thread3".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let outcome: MyResult<_> = c.try_get_with_by_ref(KEY, || {
                    sleep(Duration::from_millis(300));
                    Ok("thread3")
                });
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 4 (500 ms): arrives during worker 3's initialization. Its
        // closure must never run, and it must receive worker 3's value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                let outcome: MyResult<_> = c.try_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 5 (800 ms): arrives after worker 3 finished. Its closure
        // must not run.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let outcome: MyResult<_> = c.try_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 6 (200 ms): a plain `get` during worker 1's initialization
        // sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                assert!(c.get(KEY).is_none());
            })
        },
        // Worker 7 (400 ms): a plain `get` after worker 1 failed still sees
        // nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                assert!(c.get(KEY).is_none());
            })
        },
        // Worker 8 (800 ms): a plain `get` after worker 3 succeeded sees
        // "thread3".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                assert_eq!(c.get(KEY), Some("thread3"));
            })
        },
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }
}
#[test]
fn optionally_get_with() {
    use std::thread::{sleep, spawn};

    const KEY: u32 = 0;
    let cache = Cache::new(100);

    // Eight workers operate on the same key. Delays are relative to the
    // moment the workers are spawned.
    let workers = vec![
        // Worker 1 (0 ms): first caller. Its init closure takes 300 ms and
        // yields `None`, so nothing is expected to be cached.
        {
            let c = cache.clone();
            spawn(move || {
                let outcome = c.optionally_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    None
                });
                assert!(outcome.is_none());
            })
        },
        // Worker 2 (100 ms): arrives while worker 1's closure is running. Its
        // own closure must never be evaluated, and it must get `None` too.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let outcome = c.optionally_get_with(KEY, || unreachable!());
                assert!(outcome.is_none());
            })
        },
        // Worker 3 (400 ms): arrives after worker 1 returned `None`. Its init
        // closure runs for 300 ms and yields `Some("thread3")`.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let outcome = c.optionally_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    Some("thread3")
                });
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 4 (500 ms): arrives while worker 3's closure is running. Its
        // own closure must never be evaluated, and it must receive worker 3's
        // value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                let outcome = c.optionally_get_with(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 5 (800 ms): arrives after worker 3 finished. Its closure
        // must not be evaluated.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let outcome = c.optionally_get_with(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 6 (200 ms): a plain `get` while worker 1's closure is
        // running sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                assert!(c.get(&KEY).is_none());
            })
        },
        // Worker 7 (400 ms): a plain `get` after worker 1 returned `None`
        // still sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                assert!(c.get(&KEY).is_none());
            })
        },
        // Worker 8 (800 ms): a plain `get` after worker 3 succeeded sees
        // "thread3".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                assert_eq!(c.get(&KEY), Some("thread3"));
            })
        },
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }
}
#[test]
fn optionally_get_with_by_ref() {
    use std::thread::{sleep, spawn};

    const KEY: &u32 = &0;
    let cache = Cache::new(100);

    // Same scenario as the by-value `optionally_get_with` test, but the key
    // is passed by reference. Delays are relative to the moment the workers
    // are spawned.
    let workers = vec![
        // Worker 1 (0 ms): first caller. Its init closure takes 300 ms and
        // yields `None`.
        {
            let c = cache.clone();
            spawn(move || {
                let outcome = c.optionally_get_with_by_ref(KEY, || {
                    sleep(Duration::from_millis(300));
                    None
                });
                assert!(outcome.is_none());
            })
        },
        // Worker 2 (100 ms): arrives during worker 1's initialization. Its
        // closure must never run, and it must get `None` too.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let outcome = c.optionally_get_with_by_ref(KEY, || unreachable!());
                assert!(outcome.is_none());
            })
        },
        // Worker 3 (400 ms): arrives after worker 1 returned `None`. Its init
        // closure runs for 300 ms and yields `Some("thread3")`.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let outcome = c.optionally_get_with_by_ref(KEY, || {
                    sleep(Duration::from_millis(300));
                    Some("thread3")
                });
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 4 (500 ms): arrives during worker 3's initialization. Its
        // closure must never run, and it must receive worker 3's value.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                let outcome = c.optionally_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 5 (800 ms): arrives after worker 3 finished. Its closure
        // must not run.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let outcome = c.optionally_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(outcome.unwrap(), "thread3");
            })
        },
        // Worker 6 (200 ms): a plain `get` during worker 1's initialization
        // sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                assert!(c.get(KEY).is_none());
            })
        },
        // Worker 7 (400 ms): a plain `get` after worker 1 returned `None`
        // still sees nothing.
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                assert!(c.get(KEY).is_none());
            })
        },
        // Worker 8 (800 ms): a plain `get` after worker 3 succeeded sees
        // "thread3".
        {
            let c = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                assert_eq!(c.get(KEY), Some("thread3"));
            })
        },
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }
}
#[test]
fn handle_panic_in_get_with() {
    use std::{sync::Barrier, thread};

    let cache = Cache::new(16);
    // Two parties: the panicking thread and the main thread.
    let barrier = Arc::new(Barrier::new(2));

    let panicking_cache = cache.clone();
    let panicking_barrier = Arc::clone(&barrier);
    // The handle is intentionally not joined; the thread is expected to panic.
    thread::spawn(move || {
        let _ = panicking_cache.get_with(1, || {
            // Let the main thread know that initialization has started.
            panicking_barrier.wait();
            thread::sleep(Duration::from_millis(50));
            panic!("Panic during get_with");
        });
    });

    // Proceed only once the other thread is inside its init closure.
    barrier.wait();

    // The panic in the other thread must not prevent this call from
    // computing and returning its own value.
    let value = cache.get_with(1, || 5);
    assert_eq!(value, 5);
}
#[test]
fn handle_panic_in_try_get_with() {
    use std::{sync::Barrier, thread};

    let cache = Cache::new(16);
    // Two parties: the panicking thread and the main thread.
    let barrier = Arc::new(Barrier::new(2));

    let panicking_cache = cache.clone();
    let panicking_barrier = Arc::clone(&barrier);
    // The handle is intentionally not joined; the thread is expected to panic.
    thread::spawn(move || {
        let _: Result<_, Arc<Infallible>> = panicking_cache.try_get_with(1, || {
            // Let the main thread know that initialization has started.
            panicking_barrier.wait();
            thread::sleep(Duration::from_millis(50));
            panic!("Panic during try_get_with");
        });
    });

    // Proceed only once the other thread is inside its init closure.
    barrier.wait();

    // The panic in the other thread must not prevent this call from
    // computing and returning its own value.
    let outcome: Result<_, Arc<Infallible>> = cache.try_get_with(1, || Ok(5));
    assert_eq!(outcome, Ok(5));
}
// Checks which removal notifications the eviction listener receives for
// explicit invalidation, size-based eviction and value replacement. The same
// scenario is run once for each delivery mode.
#[test]
fn test_removal_notifications() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// `actual` is filled by the listener; `expected` is built up step by step
// below and compared against `actual` at the end.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// The listener records every notification it receives.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
// A cache that holds up to three entries.
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_listener_with_conf(listener, listener_conf)
.build();
// NOTE(review): presumably switches the cache into a deterministic mode where
// maintenance only runs on `sync()` — confirm against its definition.
cache.reconfigure_for_testing();
// Rebind without `mut`: nothing below needs mutable access.
let cache = cache;
// Explicit invalidation is expected to be reported with cause `Explicit`.
cache.insert('a', "alice");
cache.invalidate(&'a');
expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 0, delivery_mode);
// Fill the cache up to its capacity.
cache.insert('b', "bob");
cache.insert('c', "cathy");
cache.insert('d', "david");
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 3, delivery_mode);
// Inserting a fourth entry: the test expects the new entry itself ('e') to be
// the one reported as removed with cause `Size`, leaving three entries.
// NOTE(review): presumably the admission policy rejects it — confirm.
cache.insert('e', "emily");
expected.push((Arc::new('e'), "emily", RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 3, delivery_mode);
// After a read of 'e', inserting 'e' again is expected to succeed and to
// evict 'b' instead (cause `Size`).
// NOTE(review): presumably the read raises the key's popularity — confirm.
cache.get(&'e');
cache.sync();
cache.insert('e', "eliza");
expected.push((Arc::new('b'), "bob", RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 3, delivery_mode);
// Overwriting an existing key is expected to report the old value with cause
// `Replaced`.
cache.insert('d', "dennis");
expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
cache.sync();
assert_eq_with_mode!(cache.entry_count(), 3, delivery_mode);
// Compare the recorded notifications with the expected ones, in order.
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// With immediate delivery, checks that an entry that has already expired (by
// time-to-idle or time-to-live) is reported with cause `Expired` when it is
// subsequently overwritten or invalidated. Time is driven by a mock clock.
#[test]
fn test_immediate_removal_notifications_with_updates() {
// The listener records every notification it receives into `actual`.
let actual = Arc::new(Mutex::new(Vec::new()));
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(DeliveryMode::Immediate)
.build();
// Time-to-live is 7 seconds and time-to-idle is 5 seconds.
let mut cache = Cache::builder()
.eviction_listener_with_conf(listener, listener_conf)
.time_to_live(Duration::from_secs(7))
.time_to_idle(Duration::from_secs(5))
.build();
// NOTE(review): presumably switches the cache into a deterministic mode where
// maintenance only runs on `sync()` — confirm against its definition.
cache.reconfigure_for_testing();
// Replace the expiration clock with a mock that the test advances manually.
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Rebind without `mut`: nothing below needs mutable access.
let cache = cache;
cache.insert("alice", "a0");
cache.sync();
// 6 seconds without access exceeds the 5-second time-to-idle: the entry is
// no longer readable, but it is still counted because `sync()` has not been
// called since it expired.
mock.increment(Duration::from_secs(6));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
// Overwriting the expired entry: the listener must already have been called
// (no `sync()` in between) with the old value and cause `Expired`.
cache.insert("alice", "a1");
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a0", RemovalCause::Expired));
a.clear();
}
cache.sync();
// 4 seconds after the insert: still within both limits.
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), Some("a1"));
cache.sync();
// 8 seconds after the insert exceeds the 7-second time-to-live, although the
// entry was read 4 seconds ago.
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
// Overwriting again reports the old value as `Expired`.
cache.insert("alice", "a2");
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a1", RemovalCause::Expired));
a.clear();
}
cache.sync();
assert_eq!(cache.entry_count(), 1);
// 6 idle seconds: expired by time-to-idle.
mock.increment(Duration::from_secs(6));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
// Invalidating the expired entry removes it; the cause reported is `Expired`
// rather than `Explicit`.
cache.invalidate(&"alice");
cache.sync();
assert_eq!(cache.entry_count(), 0);
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a2", RemovalCause::Expired));
a.clear();
}
// Same as above, but this time the entry expires by time-to-live
// (4 + 4 = 8 seconds after the insert, with a read in between).
cache.insert("alice", "a3");
cache.sync();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), Some("a3"));
cache.sync();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
cache.invalidate(&"alice");
cache.sync();
assert_eq!(cache.entry_count(), 0);
{
let mut a = actual.lock();
assert_eq!(a.len(), 1);
assert_eq!(a[0], (Arc::new("alice"), "a3", RemovalCause::Expired));
a.clear();
}
}
// With immediate delivery and a slow listener, checks the relative order of
// write operations and listener calls for a single key. The expected event
// order below requires that a listener call for the key finishes before the
// next listener call for the same key begins.
// NOTE(review): per the test name this ordering is presumably enforced by a
// per-key lock inside the cache — confirm against the implementation.
#[test]
fn test_key_lock_used_by_immediate_removal_notifications() {
use std::thread::{sleep, spawn};
const KEY: &str = "alice";
type Val = &'static str;
// Events recorded by the test threads (`Insert`, `Invalidate`) and by the
// listener (`BeginNotify`, `EndNotify`).
#[derive(PartialEq, Eq, Debug)]
enum Event {
Insert(Val),
Invalidate(Val),
BeginNotify(Val, RemovalCause),
EndNotify(Val, RemovalCause),
}
let actual = Arc::new(Mutex::new(Vec::new()));
// The listener takes 100 ms per call and records when it starts and ends.
let a0 = Arc::clone(&actual);
let listener = move |_k, v, cause| {
a0.lock().push(Event::BeginNotify(v, cause));
sleep(Duration::from_millis(100));
a0.lock().push(Event::EndNotify(v, cause));
};
let listener_conf = notification::Configuration::builder()
.delivery_mode(DeliveryMode::Immediate)
.build();
// Entries expire 200 ms after being written; this test uses real time.
let mut cache = Cache::builder()
.eviction_listener_with_conf(listener, listener_conf)
.time_to_live(Duration::from_millis(200))
.build();
// NOTE(review): presumably switches the cache into a deterministic mode where
// maintenance only runs on `sync()` — confirm against its definition.
cache.reconfigure_for_testing();
// Rebind without `mut`: nothing below needs mutable access.
let cache = cache;
// The exact event order the test requires. Each write event is recorded just
// before the corresponding cache call, so it may precede the notification
// that an earlier write is still delivering.
let expected = vec![
Event::Insert("a0"),
Event::Insert("a1"),
Event::BeginNotify("a0", RemovalCause::Expired),
Event::Insert("a2"),
Event::EndNotify("a0", RemovalCause::Expired),
Event::BeginNotify("a1", RemovalCause::Replaced),
Event::Invalidate("a2"),
Event::EndNotify("a1", RemovalCause::Replaced),
Event::BeginNotify("a2", RemovalCause::Explicit),
Event::EndNotify("a2", RemovalCause::Explicit),
];
// 0 ms: insert the initial value.
actual.lock().push(Event::Insert("a0"));
cache.insert(KEY, "a0");
cache.sync();
// 210 ms: "a0" is past its 200 ms time-to-live. Inserting "a1" is expected to
// trigger the `Expired` notification for "a0".
let thread1 = {
let a1 = Arc::clone(&actual);
let c1 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(210));
a1.lock().push(Event::Insert("a1"));
c1.insert(KEY, "a1");
})
};
// 220 ms: inserts "a2" while the listener call for "a0" is still in progress.
// The `Replaced` notification for "a1" is expected to begin only after that
// call has ended.
let thread2 = {
let a2 = Arc::clone(&actual);
let c2 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(220));
a2.lock().push(Event::Insert("a2"));
c2.insert(KEY, "a2");
})
};
// 320 ms: invalidates the key while the listener call for "a1" is expected to
// be in progress. The `Explicit` notification for "a2" is expected to begin
// only after that call has ended.
let thread3 = {
let a3 = Arc::clone(&actual);
let c3 = cache.clone();
spawn(move || {
sleep(Duration::from_millis(320));
a3.lock().push(Event::Invalidate("a2"));
c3.invalidate(&KEY);
})
};
for t in vec![thread1, thread2, thread3] {
t.join().expect("Failed to join");
}
// Compare the recorded events with the expected ones, one by one.
let actual = actual.lock();
assert_eq!(actual.len(), expected.len());
for (i, (actual, expected)) in actual.iter().zip(&expected).enumerate() {
assert_eq!(actual, expected, "expected[{}]", i);
}
}
#[test]
fn recover_from_panicking_eviction_listener() {
    /// Runs the scenario for one delivery mode: the listener panics on its
    /// second call, and the test expects that only the first notification
    /// is ever recorded while the cache itself keeps accepting operations.
    fn run_test(delivery_mode: DeliveryMode) {
        let actual = Arc::new(Mutex::new(Vec::new()));

        // The listener panics when it is handed the value "panic now!";
        // otherwise it records the notification.
        let sink = Arc::clone(&actual);
        let listener = move |k, v, cause| {
            if v == "panic now!" {
                panic!("Panic now!");
            }
            sink.lock().push((k, v, cause))
        };
        let listener_conf = notification::Configuration::builder()
            .delivery_mode(delivery_mode)
            .build();

        let mut cache = Cache::builder()
            .name("My Sync Cache")
            .eviction_listener_with_conf(listener, listener_conf)
            .build();
        cache.reconfigure_for_testing();

        // Rebind without `mut`: nothing below needs mutable access.
        let cache = cache;

        // Insert the initial value.
        cache.insert("alice", "a0");
        cache.sync();

        // Replacing "a0" notifies the listener with the old value "a0".
        cache.insert("alice", "panic now!");
        cache.sync();

        // Replacing "panic now!" hands that value to the listener, which
        // panics.
        cache.insert("alice", "a2");
        cache.sync();

        // This invalidation is not expected to produce a recorded
        // notification.
        cache.invalidate(&"alice");
        cache.sync();

        let expected = vec![(Arc::new("alice"), "a0", RemovalCause::Replaced)];
        verify_notification_vec(&cache, actual, &expected, delivery_mode);
    }

    #[cfg(feature = "logging")]
    let _ = env_logger::builder().is_test(true).try_init();

    run_test(DeliveryMode::Immediate);
    run_test(DeliveryMode::Queued);
}
#[test]
fn borrowed_forms_of_key() {
    // The key type is Vec<u8>; lookups must accept any borrowed form of it.
    let cache: Cache<Vec<u8>, ()> = Cache::new(1);

    let owned = vec![1_u8];
    cache.insert(owned.clone(), ());

    // Borrowed as `&Vec<u8>`.
    let as_vec_ref: &Vec<u8> = &owned;
    assert!(cache.contains_key(as_vec_ref));
    assert_eq!(cache.get(as_vec_ref), Some(()));
    cache.invalidate(as_vec_ref);

    cache.insert(owned, ());

    // Borrowed as a slice, `&[u8]`.
    let as_slice: &[u8] = &[1_u8];
    assert!(cache.contains_key(as_slice));
    assert_eq!(cache.get(as_slice), Some(()));
    cache.invalidate(as_slice);
}
#[test]
fn drop_value_immediately_after_eviction() {
    use crate::common::test_utils::{Counters, Value};

    const MAX_CAPACITY: u32 = 500;
    // 20% more keys than the cache can hold.
    const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;

    let counters = Arc::new(Counters::default());

    // The listener counts size-based evictions and explicit invalidations;
    // every other removal cause is ignored.
    let listener = {
        let tally = Arc::clone(&counters);
        move |_k, _v, cause| match cause {
            RemovalCause::Size => tally.incl_evicted(),
            RemovalCause::Explicit => tally.incl_invalidated(),
            _ => (),
        }
    };

    let mut cache = Cache::builder()
        .max_capacity(MAX_CAPACITY as u64)
        .eviction_listener(listener)
        .build();
    cache.reconfigure_for_testing();

    // Rebind without `mut`: nothing below needs mutable access.
    let cache = cache;

    // Phase 1: insert more values than fit, syncing after every insert.
    for k in 0..KEYS {
        cache.insert(k, Arc::new(Value::new(vec![0u8; 1024], &counters)));
        counters.incl_inserted();
        cache.sync();
    }

    let eviction_count = KEYS - MAX_CAPACITY;

    cache.sync();
    assert_eq!(counters.inserted(), KEYS, "inserted");
    assert_eq!(counters.value_created(), KEYS, "value_created");
    assert_eq!(counters.evicted(), eviction_count, "evicted");
    assert_eq!(counters.invalidated(), 0, "invalidated");
    // Every evicted value must already have been dropped.
    assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");

    // Phase 2: invalidate every key, syncing after every call.
    for k in 0..KEYS {
        cache.invalidate(&k);
        cache.sync();
    }

    cache.sync();
    assert_eq!(counters.inserted(), KEYS, "inserted");
    assert_eq!(counters.value_created(), KEYS, "value_created");
    assert_eq!(counters.evicted(), eviction_count, "evicted");
    assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
    // By now every value, evicted or invalidated, must have been dropped.
    assert_eq!(counters.value_dropped(), KEYS, "value_dropped");

    // Dropping the cache must not change the number of dropped values.
    drop(cache);
    assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
// Checks which thread pools are reported as enabled for various cache
// configurations. Each cache lives in its own scope so that it is dropped
// before the next configuration is examined.
#[test]
#[ignore]
fn enabling_and_disabling_thread_pools() {
    use crate::common::concurrent::thread_pool::{PoolName::*, ThreadPoolRegistry};

    // Thread pool enabled, nothing else: only the housekeeper pool.
    {
        let cache = Cache::builder().thread_pool_enabled(true).build();
        cache.insert('a', "a");
        assert_eq!(ThreadPoolRegistry::enabled_pools(), &[Housekeeper]);
    }

    // With invalidation closures: the invalidator pool is enabled as well.
    {
        let cache = Cache::builder()
            .thread_pool_enabled(true)
            .support_invalidation_closures()
            .build();
        cache.insert('a', "a");
        assert_eq!(
            ThreadPoolRegistry::enabled_pools(),
            &[Housekeeper, Invalidator]
        );
    }

    // With a listener in queued delivery mode: the removal notifier pool is
    // enabled as well.
    {
        let noop_listener = |_k, _v, _cause| {};
        let queued_conf = notification::Configuration::builder()
            .delivery_mode(DeliveryMode::Queued)
            .build();
        let cache = Cache::builder()
            .thread_pool_enabled(true)
            .eviction_listener_with_conf(noop_listener, queued_conf)
            .build();
        cache.insert('a', "a");
        assert_eq!(
            ThreadPoolRegistry::enabled_pools(),
            &[Housekeeper, RemovalNotifier]
        );
    }

    // With a listener in immediate delivery mode: only the housekeeper pool.
    {
        let noop_listener = |_k, _v, _cause| {};
        let immediate_conf = notification::Configuration::builder()
            .delivery_mode(DeliveryMode::Immediate)
            .build();
        let cache = Cache::builder()
            .thread_pool_enabled(true)
            .eviction_listener_with_conf(noop_listener, immediate_conf)
            .build();
        cache.insert('a', "a");
        assert_eq!(ThreadPoolRegistry::enabled_pools(), &[Housekeeper]);
    }

    // Thread pool disabled: no pool is enabled.
    {
        let cache = Cache::builder().thread_pool_enabled(false).build();
        cache.insert('a', "a");
        assert!(ThreadPoolRegistry::enabled_pools().is_empty());
    }
}
#[test]
fn test_debug_format() {
    let cache = Cache::new(10);
    cache.insert('a', "alice");
    cache.insert('b', "bob");
    cache.insert('c', "cindy");

    let rendered = format!("{:?}", cache);

    // The output is a map: braces around `key: value` entries. The entries
    // are checked individually, so their order in the output does not matter.
    assert!(rendered.starts_with('{'));
    let entries = [r#"'a': "alice""#, r#"'b': "bob""#, r#"'c': "cindy""#];
    for entry in entries.iter() {
        assert!(rendered.contains(*entry));
    }
    assert!(rendered.ends_with('}'));
}
/// A single removal notification as recorded by the test listeners:
/// the key, the removed value and the removal cause.
type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);

/// Asserts that the notifications recorded in `actual` equal `expected`,
/// element by element and in the same order.
///
/// Notifications may still be in flight when this function is called, so the
/// check is repeated: each attempt calls `cache.sync()`, waits 500 ms and then
/// compares the lengths. While the lengths differ, the check is retried up to
/// `MAX_RETRIES` times; after that the length assertion fails.
///
/// `delivery_mode` is only used to label assertion failure messages.
///
/// # Panics
///
/// Panics if the lengths still differ once the retries are exhausted, or if
/// any recorded notification differs from the expected one at its position.
fn verify_notification_vec<K, V, S>(
    cache: &Cache<K, V, S>,
    actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
    expected: &[NotificationTuple<K, V>],
    delivery_mode: DeliveryMode,
) where
    K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
    V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
    S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
{
    // Retries allowed after the initial attempt.
    const MAX_RETRIES: usize = 5;
    let mut retries = 0;

    loop {
        // Give scheduled notifications a chance to be processed. The wait is
        // unconditional so that surplus notifications arriving late are
        // noticed too.
        cache.sync();
        std::thread::sleep(Duration::from_millis(500));

        // The lock guard lives until the end of this loop iteration.
        let actual = &*actual.lock();
        if actual.len() != expected.len() {
            // Strictly less than: exactly MAX_RETRIES retries are allowed.
            if retries < MAX_RETRIES {
                retries += 1;
                continue;
            }
            // The lengths differ here, so this assertion always fails and
            // reports both lengths.
            assert_eq!(
                actual.len(),
                expected.len(),
                "Retries exhausted (delivery mode: {:?})",
                delivery_mode
            );
        }

        for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
            assert_eq!(
                actual, expected,
                "expected[{}] (delivery mode: {:?})",
                i, delivery_mode
            );
        }

        break;
    }
}
}