use super::{cache::Cache, CacheBuilder, ConcurrentCacheExt};
use crate::{
common::concurrent::{housekeeper, Weigher},
notification::{self, EvictionListener},
sync_base::iter::{Iter, ScanningGet},
Policy, PredicateError,
};
use std::{
borrow::Borrow,
collections::hash_map::RandomState,
fmt,
hash::{BuildHasher, Hash, Hasher},
sync::Arc,
time::Duration,
};
/// A thread-safe concurrent in-memory cache, with multiple internal segments.
///
/// `SegmentedCache` has multiple internal [`Cache`][cache-struct] instances for
/// increased concurrent update performance. However, it has a little overhead on
/// retrievals and updates for managing these segments.
///
/// For usage examples, see the documentation of the [`Cache`][cache-struct].
///
/// [cache-struct]: ./struct.Cache.html
///
pub struct SegmentedCache<K, V, S = RandomState> {
// Shared internal state; cloning the cache only clones this `Arc`.
inner: Arc<Inner<K, V, S>>,
}
// TODO: https://github.com/moka-rs/moka/issues/54
//
// Manually asserts that `SegmentedCache` is `Send` when `K` and `V` are
// `Send + Sync` and `S` is `Send`.
// NOTE(review): no `SAFETY` justification is recorded here and the clippy lint
// below is suppressed rather than resolved; confirm soundness of these bounds
// against the issue linked above.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Send,
{
}
// Manually asserts that `SegmentedCache` is `Sync` when `K` and `V` are
// `Send + Sync` and `S` is `Sync`.
// NOTE(review): no `SAFETY` justification is recorded here; confirm that these
// bounds are sufficient for the types stored inside `Inner`.
unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Sync,
{
}
impl<K, V, S> Clone for SegmentedCache<K, V, S> {
    /// Creates another handle to the same shared cache.
    ///
    /// Cloning is cheap: it only clones the thread-safe reference-counted pointer
    /// to the shared internal data structures.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
where
    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
    V: fmt::Debug + Clone + Send + Sync + 'static,
    // TODO: Remove these bounds from S.
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Formats the cache as a map built from the entries yielded by `iter()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_map().entries(self.iter()).finish()
    }
}
impl<K, V> SegmentedCache<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Constructs a new `SegmentedCache<K, V>` that has multiple internal
    /// segments and will store up to `max_capacity`.
    ///
    /// To adjust various configuration knobs such as `initial_capacity` or
    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
    ///
    /// [builder-struct]: ./struct.CacheBuilder.html
    ///
    /// # Panics
    ///
    /// Panics if `num_segments` is 0.
    pub fn new(max_capacity: u64, num_segments: usize) -> Self {
        // Only the capacity and the number of segments are set by the caller;
        // every other option is left unset.
        Self::with_everything(
            None,               // name
            Some(max_capacity), // max_capacity
            None,               // initial_capacity
            num_segments,
            RandomState::default(),
            None,  // weigher
            None,  // eviction_listener
            None,  // eviction_listener_conf
            None,  // time_to_live
            None,  // time_to_idle
            false, // invalidator_enabled
            housekeeper::Configuration::new_thread_pool(true),
        )
    }

    /// Returns a [`CacheBuilder`][builder-struct], which can build a
    /// `SegmentedCache` with various configuration knobs.
    ///
    /// [builder-struct]: ./struct.CacheBuilder.html
    pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
        let builder = CacheBuilder::default();
        builder.segments(num_segments)
    }
}
impl<K, V, S> SegmentedCache<K, V, S> {
/// Returns the cache's name.
pub fn name(&self) -> Option<&str> {
    // All segments are created with the same name, so ask the first one.
    let first_segment = &self.inner.segments[0];
    first_segment.name()
}
/// Returns a read-only cache policy of this cache.
///
/// At this time, the cache policy cannot be modified after cache creation.
/// A future version may support modifying it.
pub fn policy(&self) -> Policy {
    let inner = &self.inner;
    // Start from the first segment's policy, then overwrite the values that
    // describe the segmented cache as a whole.
    let mut policy = inner.segments[0].policy();
    policy.set_max_capacity(inner.desired_capacity);
    policy.set_num_segments(inner.segments.len());
    policy
}
/// Returns an approximate number of entries in this cache.
///
/// The value returned is _an estimate_; the actual count may differ if there are
/// concurrent insertions or removals, or if some entries are pending removal due
/// to expiration. This inaccuracy can be mitigated by performing a `sync()`
/// first.
///
/// # Example
///
/// ```rust
/// use moka::sync::SegmentedCache;
///
/// let cache = SegmentedCache::new(10, 4);
/// cache.insert('n', "Netherland Dwarf");
/// cache.insert('l', "Lop Eared");
/// cache.insert('d', "Dutch");
///
/// // Ensure an entry exists.
/// assert!(cache.contains_key(&'n'));
///
/// // However, the following may print stale zeros instead of threes.
/// println!("{}", cache.entry_count()); // -> 0
/// println!("{}", cache.weighted_size()); // -> 0
///
/// // To mitigate the inaccuracy, bring the `ConcurrentCacheExt` trait into
/// // scope so we can use the `sync` method.
/// use moka::sync::ConcurrentCacheExt;
/// // Call `sync` to run pending internal tasks.
/// cache.sync();
///
/// // The following will print the actual numbers.
/// println!("{}", cache.entry_count()); // -> 3
/// println!("{}", cache.weighted_size()); // -> 3
/// ```
///
pub fn entry_count(&self) -> u64 {
    // Add up the per-segment estimates.
    let mut total = 0;
    for seg in self.inner.segments.iter() {
        total += seg.entry_count();
    }
    total
}
/// Returns an approximate total weighted size of entries in this cache.
///
/// The value returned is _an estimate_; the actual size may differ if there are
/// concurrent insertions or removals, or if some entries are pending removal due
/// to expiration. This inaccuracy can be mitigated by performing a `sync()`
/// first. See [`entry_count`](#method.entry_count) for sample code.
pub fn weighted_size(&self) -> u64 {
    // Add up the per-segment estimates.
    let mut total = 0;
    for seg in self.inner.segments.iter() {
        total += seg.weighted_size();
    }
    total
}
}
impl<K, V, S> SegmentedCache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
/// Builds a `SegmentedCache` from a full set of configuration values, which are
/// handed unchanged to `Inner::new`.
///
/// # Panics
///
/// Panics if `num_segments` is 0.
#[allow(clippy::too_many_arguments)]
pub(crate) fn with_everything(
    name: Option<String>,
    max_capacity: Option<u64>,
    initial_capacity: Option<usize>,
    num_segments: usize,
    build_hasher: S,
    weigher: Option<Weigher<K, V>>,
    eviction_listener: Option<EvictionListener<K, V>>,
    eviction_listener_conf: Option<notification::Configuration>,
    time_to_live: Option<Duration>,
    time_to_idle: Option<Duration>,
    invalidator_enabled: bool,
    housekeeper_conf: housekeeper::Configuration,
) -> Self {
    let inner = Inner::new(
        name,
        max_capacity,
        initial_capacity,
        num_segments,
        build_hasher,
        weigher,
        eviction_listener,
        eviction_listener_conf,
        time_to_live,
        time_to_idle,
        invalidator_enabled,
        housekeeper_conf,
    );
    Self {
        inner: Arc::new(inner),
    }
}
/// Returns `true` if the cache contains a value for the key.
///
/// Unlike the `get` method, this method is not considered a cache read operation,
/// so it does not update the historic popularity estimator or reset the idle
/// timer for the key.
///
/// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
/// on the borrowed form _must_ match those for the key type.
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    let hash = self.inner.hash(key);
    // The hash picks the segment and is reused for the lookup inside it.
    let segment = self.inner.select(hash);
    segment.contains_key_with_hash(key, hash)
}
/// Returns a _clone_ of the value corresponding to the key.
///
/// If you want to store values that will be expensive to clone, wrap them in
/// `std::sync::Arc` before storing them in a cache. [`Arc`][rustdoc-std-arc] is a
/// thread-safe reference-counted pointer and its `clone()` method is cheap.
///
/// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
/// on the borrowed form _must_ match those for the key type.
///
/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    let hash = self.inner.hash(key);
    // The hash picks the segment and is reused for the lookup inside it.
    let segment = self.inner.select(hash);
    segment.get_with_hash(key, hash)
}
/// Deprecated, replaced with [`get_with`](#method.get_with)
///
/// This method only forwards `key` and `init` to `get_with`.
#[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
self.get_with(key, init)
}
/// Deprecated, replaced with [`try_get_with`](#method.try_get_with)
///
/// This method only forwards `key` and `init` to `try_get_with`.
#[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
F: FnOnce() -> Result<V, E>,
E: Send + Sync + 'static,
{
self.try_get_with(key, init)
}
/// Returns a _clone_ of the value corresponding to the key. If the value does
/// not exist, evaluates the `init` closure and inserts the output.
///
/// # Concurrent calls on the same key
///
/// This method guarantees that concurrent calls on the same not-existing key are
/// coalesced into one evaluation of the `init` closure. Only one of the calls
/// evaluates its closure, and other calls wait for that closure to complete. See
/// [`Cache::get_with`][get-with-method] for more details.
///
/// [get-with-method]: ./struct.Cache.html#method.get_with
pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
    let hash = self.inner.hash(&key);
    // `get_with` has no replacement predicate; the annotation fixes the
    // otherwise unconstrained closure type.
    let replace_if: Option<fn(&V) -> bool> = None;
    let segment = self.inner.select(hash);
    segment.get_or_insert_with_hash_and_fun(Arc::new(key), hash, init, replace_if)
}
/// Similar to [`get_with`](#method.get_with), but takes a reference to the key
/// instead of an owned key. If the key does not exist in the cache, the key
/// will be cloned to create a new entry in the cache.
pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
where
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    let hash = self.inner.hash(key);
    // No replacement predicate; the annotation fixes the otherwise
    // unconstrained closure type.
    let replace_if: Option<fn(&V) -> bool> = None;
    let segment = self.inner.select(hash);
    segment.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if)
}
/// Works like [`get_with`](#method.get_with), but takes an additional
/// `replace_if` closure.
///
/// This method will evaluate the `init` closure and insert the output into the
/// cache when:
///
/// - The key does not exist.
/// - Or, the `replace_if` closure returns `true`.
pub fn get_with_if(
    &self,
    key: K,
    init: impl FnOnce() -> V,
    replace_if: impl FnMut(&V) -> bool,
) -> V {
    let hash = self.inner.hash(&key);
    let segment = self.inner.select(hash);
    segment.get_or_insert_with_hash_and_fun(Arc::new(key), hash, init, Some(replace_if))
}
// We will provide this API under the new `entry` API.
//
// /// Similar to [`get_with_if`](#method.get_with_if), but instead of passing an
// /// owned key, you can pass a reference to the key. If the key does not exist in
// /// the cache, the key will be cloned to create new entry in the cache.
// pub fn get_with_if_by_ref<Q>(
// &self,
// key: &Q,
// init: impl FnOnce() -> V,
// replace_if: impl FnMut(&V) -> bool,
// ) -> V
// where
// K: Borrow<Q>,
// Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
// {
// let hash = self.inner.hash(key);
// self.inner
// .select(hash)
// .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, Some(replace_if))
// }
/// Returns a _clone_ of the value corresponding to the key. If the value does
/// not exist, evaluates the `init` closure, and inserts the value if
/// `Some(value)` was returned. If the closure returned `None`, this method
/// does not insert a value and returns `None`.
///
/// # Concurrent calls on the same key
///
/// This method guarantees that concurrent calls on the same not-existing key are
/// coalesced into one evaluation of the `init` closure. Only one of the calls
/// evaluates its closure, and other calls wait for that closure to complete.
/// See [`Cache::optionally_get_with`][opt-get-with-method] for more details.
///
/// [opt-get-with-method]: ./struct.Cache.html#method.optionally_get_with
pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
where
    F: FnOnce() -> Option<V>,
{
    let hash = self.inner.hash(&key);
    let segment = self.inner.select(hash);
    segment.get_or_optionally_insert_with_hash_and_fun(Arc::new(key), hash, init)
}
/// Similar to [`optionally_get_with`](#method.optionally_get_with), but takes a
/// reference to the key instead of an owned key. If the key does not exist in
/// the cache, the key will be cloned to create a new entry in the cache.
pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
where
    F: FnOnce() -> Option<V>,
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    let hash = self.inner.hash(key);
    let segment = self.inner.select(hash);
    segment.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init)
}
/// Returns a _clone_ of the value corresponding to the key. If the value does
/// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
/// was returned. If the closure returned `Err(_)`, this method does not insert
/// a value and returns the `Err` wrapped in [`std::sync::Arc`][std-arc].
///
/// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
///
/// # Concurrent calls on the same key
///
/// This method guarantees that concurrent calls on the same not-existing key are
/// coalesced into one evaluation of the `init` closure (as long as these
/// closures return the same error type). Only one of the calls evaluates its
/// closure, and other calls wait for that closure to complete. See
/// [`Cache::try_get_with`][try-get-with-method] for more details.
///
/// [try-get-with-method]: ./struct.Cache.html#method.try_get_with
pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
    F: FnOnce() -> Result<V, E>,
    E: Send + Sync + 'static,
{
    let hash = self.inner.hash(&key);
    let segment = self.inner.select(hash);
    segment.get_or_try_insert_with_hash_and_fun(Arc::new(key), hash, init)
}
/// Similar to [`try_get_with`](#method.try_get_with), but takes a reference to
/// the key instead of an owned key. If the key does not exist in the cache, the
/// key will be cloned to create a new entry in the cache.
pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
where
    F: FnOnce() -> Result<V, E>,
    E: Send + Sync + 'static,
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
    let hash = self.inner.hash(key);
    let segment = self.inner.select(hash);
    segment.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init)
}
/// Inserts a key-value pair into the cache.
///
/// If the key is already present in the cache, its value is updated.
pub fn insert(&self, key: K, value: V) {
    // Hash the key first; it is moved into an `Arc` for the segment below.
    let hash = self.inner.hash(&key);
    let segment = self.inner.select(hash);
    segment.insert_with_hash(Arc::new(key), hash, value);
}
/// Discards any cached value for the key.
///
/// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
/// on the borrowed form _must_ match those for the key type.
pub fn invalidate<Q>(&self, key: &Q)
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    let hash = self.inner.hash(key);
    let segment = self.inner.select(hash);
    segment.invalidate_with_hash(key, hash);
}
/// Discards all cached values.
///
/// This method returns immediately and a background thread will evict all the
/// cached values inserted before the time when this method was called. It is
/// guaranteed that the `get` method must not return these invalidated values
/// even if they have not been evicted.
///
/// Like the `invalidate` method, this method does not clear the historic
/// popularity estimator of keys so that it retains the client activities of
/// trying to retrieve an item.
pub fn invalidate_all(&self) {
    // Forward the request to every segment.
    self.inner.segments.iter().for_each(|segment| {
        segment.invalidate_all();
    });
}
/// Discards cached values that satisfy a predicate.
///
/// `invalidate_entries_if` takes a closure that returns `true` or `false`. This
/// method returns immediately and a background thread will apply the closure to
/// each cached value inserted before the time when `invalidate_entries_if` was
/// called. If the closure returns `true` on a value, that value will be evicted
/// from the cache.
///
/// Also the `get` method will apply the closure to a value to determine if it
/// should have been invalidated. Therefore, it is guaranteed that the `get`
/// method must not return invalidated values.
///
/// Note that you must call
/// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
/// at the cache creation time as the cache needs to maintain additional internal
/// data structures to support this method. Otherwise, calling this method will
/// fail with a
/// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
///
/// Like the `invalidate` method, this method does not clear the historic
/// popularity estimator of keys so that it retains the client activities of
/// trying to retrieve an item.
///
/// [support-invalidation-closures]: ./struct.CacheBuilder.html#method.support_invalidation_closures
/// [invalidation-disabled-error]: ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
where
    F: Fn(&K, &V) -> bool + Send + Sync + 'static,
{
    // One shared predicate, handed to each segment as a cloned `Arc`.
    let shared_predicate = Arc::new(predicate);
    // Stop at the first segment that reports an error.
    for segment in &self.inner.segments[..] {
        segment.invalidate_entries_with_arc_fun(Arc::clone(&shared_predicate))?;
    }
    Ok(())
}
/// Creates an iterator visiting all key-value pairs in arbitrary order. The
/// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
/// value.
///
/// Iterators do not block concurrent reads and writes on the cache. An entry can
/// be inserted to, invalidated or evicted from a cache while iterators are alive
/// on the same cache.
///
/// Unlike the `get` method, visiting entries via an iterator do not update the
/// historic popularity estimator or reset idle timers for keys.
///
/// # Guarantees
///
/// In order to allow concurrent access to the cache, iterator's `next` method
/// does _not_ guarantee the following:
///
/// - It does not guarantee to return a key-value pair (an entry) if its key has
/// been inserted to the cache _after_ the iterator was created.
/// - Such an entry may or may not be returned depending on key's hash and
/// timing.
///
/// and the `next` method guarantees the followings:
///
/// - It guarantees not to return the same entry more than once.
/// - It guarantees not to return an entry if it has been removed from the cache
/// after the iterator was created.
/// - Note: An entry can be removed by following reasons:
/// - Manually invalidated.
/// - Expired (e.g. time-to-live).
/// - Evicted as the cache capacity exceeded.
///
/// # Examples
///
/// ```rust
/// use moka::sync::SegmentedCache;
///
/// let cache = SegmentedCache::new(100, 4);
/// cache.insert("Julia", 14);
///
/// let mut iter = cache.iter();
/// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
/// assert_eq!(*k, "Julia");
/// assert_eq!(v, 14);
///
/// assert!(iter.next().is_none());
/// ```
///
pub fn iter(&self) -> Iter<'_, K, V> {
    let caches = &self.inner.segments;
    let num_cht_segments = caches[0].num_cht_segments();
    // Hand every segment to `Iter` as a `ScanningGet` trait object.
    let scanners: Vec<_> = caches
        .iter()
        .map(|c| c as &dyn ScanningGet<_, _>)
        .collect();
    Iter::with_multiple_cache_segments(scanners.into_boxed_slice(), num_cht_segments)
}
// /// This is used by unit tests to get consistent result.
// #[cfg(test)]
// pub(crate) fn reconfigure_for_testing(&mut self) {
// // Stop the housekeeping job that may cause sync() method to return earlier.
// for segment in self.inner.segments.iter_mut() {
// segment.reconfigure_for_testing()
// }
// }
}
/// Allows a `&SegmentedCache` to be used directly in a `for` loop. The yielded
/// items are the same `(Arc<K>, V)` pairs that `SegmentedCache::iter` produces.
impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
type Item = (Arc<K>, V);
type IntoIter = Iter<'a, K, V>;
// Delegates to `SegmentedCache::iter`.
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<K, V, S> ConcurrentCacheExt<K, V> for SegmentedCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Calls `sync()` on every segment, in segment order.
    fn sync(&self) {
        self.inner.segments.iter().for_each(|segment| {
            segment.sync();
        });
    }
}
// For unit tests.
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Sums `invalidation_predicate_count()` over all segments.
    fn invalidation_predicate_count(&self) -> usize {
        let mut total = 0;
        for seg in self.inner.segments.iter() {
            total += seg.invalidation_predicate_count();
        }
        total
    }

    /// Calls `reconfigure_for_testing()` on every segment.
    ///
    /// # Panics
    ///
    /// Panics if `self.inner` is shared with another `SegmentedCache` clone,
    /// because exclusive access to the segments is required.
    fn reconfigure_for_testing(&mut self) {
        let inner = Arc::get_mut(&mut self.inner)
            .expect("There are other strong reference to self.inner Arc");
        inner.segments.iter_mut().for_each(|segment| {
            segment.reconfigure_for_testing();
        });
    }

    /// Installs a separate mock expiration clock on every segment and returns a
    /// `MockExpirationClock` holding the mocks.
    fn create_mock_expiration_clock(&self) -> MockExpirationClock {
        let mocks = self
            .inner
            .segments
            .iter()
            .map(|segment| {
                let (clock, mock) = crate::common::time::Clock::mock();
                segment.set_expiration_clock(Some(clock));
                mock
            })
            .collect();
        MockExpirationClock { mocks }
    }
}
// For unit tests.
//
// Holds the mock clocks created by `create_mock_expiration_clock`, one per
// segment, so that they can be advanced together.
#[cfg(test)]
#[derive(Default)]
struct MockExpirationClock {
// One mock per cache segment, in segment order.
mocks: Vec<Arc<crate::common::time::Mock>>,
}
#[cfg(test)]
impl MockExpirationClock {
    /// Advances every held mock clock by `duration`.
    fn increment(&mut self, duration: Duration) {
        self.mocks.iter_mut().for_each(|mock| {
            mock.increment(duration);
        });
    }
}
/// Shared internal state of a `SegmentedCache`.
struct Inner<K, V, S> {
/// The `max_capacity` requested for the whole cache (not per segment); this is
/// the value reported by `SegmentedCache::policy`.
desired_capacity: Option<u64>,
/// The underlying caches. `Inner::new` creates a power-of-two number of them.
segments: Box<[Cache<K, V, S>]>,
/// Hasher factory used by `hash()`; each segment also receives a clone of it.
build_hasher: S,
/// Number of bits a 64-bit hash is shifted right to obtain a segment index.
/// It is 64 when there is only one segment.
segment_shift: u32,
}
impl<K, V, S> Inner<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
/// Creates the segments and the state needed to route a hash to a segment.
///
/// `num_segments` is rounded up to the next power of two. `max_capacity` and
/// `initial_capacity` are divided evenly among the segments, rounding up, so
/// that the segments together never provide less than the requested amount.
/// The original `max_capacity` is kept as `desired_capacity`.
///
/// # Panics
///
/// Panics if `num_segments` is 0.
#[allow(clippy::too_many_arguments)]
fn new(
    name: Option<String>,
    max_capacity: Option<u64>,
    initial_capacity: Option<usize>,
    num_segments: usize,
    build_hasher: S,
    weigher: Option<Weigher<K, V>>,
    eviction_listener: Option<EvictionListener<K, V>>,
    eviction_listener_conf: Option<notification::Configuration>,
    time_to_live: Option<Duration>,
    time_to_idle: Option<Duration>,
    invalidator_enabled: bool,
    housekeeper_conf: housekeeper::Configuration,
) -> Self {
    assert!(num_segments > 0);
    let actual_num_segments = num_segments.next_power_of_two();
    // With 2^n segments the top n bits of a hash select the segment, so the
    // hash is shifted right by 64 - n bits (64 when there is one segment).
    let segment_shift = 64 - actual_num_segments.trailing_zeros();
    // Ceiling division, written so that it cannot overflow. Truncating here
    // would make the segments' total capacity smaller than requested (and 0
    // per segment whenever the capacity is below the number of segments).
    let seg_max_capacity = max_capacity.map(|n| {
        let divisor = actual_num_segments as u64;
        n / divisor + u64::from(n % divisor != 0)
    });
    let seg_init_capacity = initial_capacity.map(|cap| {
        cap / actual_num_segments + usize::from(cap % actual_num_segments != 0)
    });
    // NOTE: We cannot initialize the segments as `vec![cache; actual_num_segments]`
    // because Cache::clone() does not clone its inner but shares the same inner.
    let segments = (0..actual_num_segments)
        .map(|_| {
            Cache::with_everything(
                name.clone(),
                seg_max_capacity,
                seg_init_capacity,
                build_hasher.clone(),
                weigher.as_ref().map(Arc::clone),
                eviction_listener.as_ref().map(Arc::clone),
                eviction_listener_conf.clone(),
                time_to_live,
                time_to_idle,
                invalidator_enabled,
                housekeeper_conf.clone(),
            )
        })
        .collect::<Vec<_>>();
    Self {
        desired_capacity: max_capacity,
        segments: segments.into_boxed_slice(),
        build_hasher,
        segment_shift,
    }
}
/// Hashes `key` with a fresh hasher obtained from `build_hasher`.
#[inline]
fn hash<Q>(&self, key: &Q) -> u64
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    let mut state = self.build_hasher.build_hasher();
    Hash::hash(key, &mut state);
    Hasher::finish(&state)
}
/// Returns the segment responsible for the given hash.
#[inline]
fn select(&self, hash: u64) -> &Cache<K, V, S> {
    &self.segments[self.segment_index_from_hash(hash)]
}
/// Maps a hash to a segment index by shifting it right by `segment_shift` bits.
#[inline]
fn segment_index_from_hash(&self, hash: u64) -> usize {
    match self.segment_shift {
        // A shift by the full 64-bit width is not a valid shift amount; it
        // occurs only when there is a single segment, whose index is 0.
        64 => 0,
        shift => (hash >> shift) as usize,
    }
}
}
#[cfg(test)]
mod tests {
use super::{ConcurrentCacheExt, SegmentedCache};
use crate::notification::{
self,
macros::{assert_eq_with_mode, assert_with_mode},
DeliveryMode, RemovalCause,
};
use parking_lot::Mutex;
use std::{sync::Arc, time::Duration};
// Exercises insert / get / contains_key / invalidate on a single-segment cache
// with a max capacity of 3, and checks the eviction notifications. The
// scenario is run once for each delivery mode.
#[test]
fn basic_single_thread() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
// Create a cache with the eviction listener.
let mut cache = SegmentedCache::builder(1)
.max_capacity(3)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", "alice");
cache.insert("b", "bob");
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
cache.sync();
// counts: a -> 1, b -> 1
cache.insert("c", "cindy");
assert_eq_with_mode!(cache.get(&"c"), Some("cindy"), delivery_mode);
assert_with_mode!(cache.contains_key(&"c"), delivery_mode);
// counts: a -> 1, b -> 1, c -> 1
cache.sync();
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
cache.sync();
// counts: a -> 2, b -> 2, c -> 1
// "d" should not be admitted because its frequency is too low.
cache.insert("d", "david"); // count: d -> 0
expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); // d -> 1
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
// Second attempt: "d" is rejected again, so another `Size` notification
// is expected.
cache.insert("d", "david");
expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.sync();
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); // d -> 2
// "d" should be admitted and "c" should be evicted
// because d's frequency is higher than c's.
cache.insert("d", "dennis");
expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_eq_with_mode!(cache.get(&"c"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), Some("dennis"), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"c"), delivery_mode);
assert_with_mode!(cache.contains_key(&"d"), delivery_mode);
// Explicit invalidation is expected to be reported with the `Explicit` cause.
cache.invalidate(&"b");
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
cache.sync();
assert_eq_with_mode!(cache.get(&"b"), None, delivery_mode);
assert_with_mode!(!cache.contains_key(&"b"), delivery_mode);
// Compare the notifications recorded by the listener with the expected ones.
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// Requests a segment count (5) that is not a power of two and checks that
// entries can still be inserted and iterated.
#[test]
fn non_power_of_two_segments() {
    // Configure the cache inside a block so that the outer binding is immutable.
    let cache = {
        let mut c = SegmentedCache::new(100, 5);
        c.reconfigure_for_testing();
        c
    };

    assert_eq!(cache.iter().count(), 0);

    cache.insert("a", "alice");
    cache.insert("b", "bob");
    cache.insert("c", "cindy");

    // All three entries must be visible both before and after `sync()`.
    assert_eq!(cache.iter().count(), 3);
    cache.sync();
    assert_eq!(cache.iter().count(), 3);
}
// Exercises weight-based eviction on a single-segment cache with a max
// capacity of 31, where the weight of an entry is the `u32` in its value
// tuple. The scenario is run once for each delivery mode.
#[test]
fn size_aware_eviction() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
// The weigher returns the second element of the value tuple.
let weigher = |_k: &&str, v: &(&str, u32)| v.1;
let alice = ("alice", 10);
let bob = ("bob", 15);
let bill = ("bill", 20);
let cindy = ("cindy", 5);
let david = ("david", 15);
let dennis = ("dennis", 15);
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
// Create a cache with the eviction listener.
let mut cache = SegmentedCache::builder(1)
.max_capacity(31)
.weigher(weigher)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", alice);
cache.insert("b", bob);
assert_eq_with_mode!(cache.get(&"a"), Some(alice), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
cache.sync();
// order (LRU -> MRU) and counts: a -> 1, b -> 1
cache.insert("c", cindy);
assert_eq_with_mode!(cache.get(&"c"), Some(cindy), delivery_mode);
assert_with_mode!(cache.contains_key(&"c"), delivery_mode);
// order and counts: a -> 1, b -> 1, c -> 1
cache.sync();
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_eq_with_mode!(cache.get(&"a"), Some(alice), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
cache.sync();
// order and counts: c -> 1, a -> 2, b -> 2
// To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
// "d" must have higher count than 3, which is the aggregated count
// of "a" and "c".
cache.insert("d", david); // count: d -> 0
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); // d -> 1
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); // d -> 2
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); // d -> 3
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
cache.insert("d", david);
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode); // d -> 4
// Finally "d" should be admitted by evicting "c" and "a".
cache.insert("d", dennis);
expected.push((Arc::new("c"), cindy, RemovalCause::Size));
expected.push((Arc::new("a"), alice, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
assert_eq_with_mode!(cache.get(&"c"), None, delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), Some(dennis), delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"c"), delivery_mode);
assert_with_mode!(cache.contains_key(&"d"), delivery_mode);
// Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
cache.insert("b", bill);
expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
expected.push((Arc::new("d"), dennis, RemovalCause::Size));
cache.sync();
assert_eq_with_mode!(cache.get(&"b"), Some(bill), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
// Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
cache.insert("a", alice);
cache.insert("b", bob);
expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
cache.sync();
assert_eq_with_mode!(cache.get(&"a"), Some(alice), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some(bob), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), None, delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"d"), delivery_mode);
// Verify the sizes.
// "a" (w: 10) and "b" (w: 15) remain, hence 2 entries with a total weight of 25.
assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode);
assert_eq_with_mode!(cache.weighted_size(), 25, delivery_mode);
// Compare the notifications recorded by the listener with the expected ones.
verify_notification_vec(&cache, actual, &expected, delivery_mode);
}
}
// Runs the same insert / get / invalidate sequence from several threads that
// share one cache, then checks the final state of keys 10 and 20.
#[test]
fn basic_multi_threads() {
    let num_threads = 4;
    // Configure the cache inside a block so that the outer binding is immutable.
    let cache = {
        let mut c = SegmentedCache::new(100, num_threads);
        c.reconfigure_for_testing();
        c
    };

    // Collect the handles first so that every thread is spawned before any of
    // them is joined.
    // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
    #[allow(clippy::needless_collect)]
    let handles = (0..num_threads)
        .map(|id| {
            let cache = cache.clone();
            std::thread::spawn(move || {
                cache.insert(10, format!("{}-100", id));
                cache.get(&10);
                cache.sync();
                cache.insert(20, format!("{}-200", id));
                cache.invalidate(&10);
            })
        })
        .collect::<Vec<_>>();

    for handle in handles {
        handle.join().expect("Failed");
    }

    cache.sync();

    // Every thread invalidated key 10 last and left key 20 in place.
    assert!(cache.get(&10).is_none());
    assert!(cache.get(&20).is_some());
    assert!(!cache.contains_key(&10));
    assert!(cache.contains_key(&20));
}
/// Checks `invalidate_all` under both notification delivery modes.
///
/// Entries present when `invalidate_all` is called must disappear and must be
/// reported to the eviction listener with `RemovalCause::Explicit`, while an entry
/// inserted afterwards must remain readable.
#[test]
fn invalidate_all() {
run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);
fn run_test(delivery_mode: DeliveryMode) {
use std::collections::HashMap;
// The following `HashMap`s will hold actual and expected notifications.
// Note: We use `HashMap` here as the order of invalidations is non-deterministic.
let actual = Arc::new(Mutex::new(HashMap::new()));
let mut expected = HashMap::new();
// Create an eviction listener that records every notification, keyed by the
// entry's key.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| {
a1.lock().insert(k, (v, cause));
};
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
// Create a cache with the eviction listener.
let mut cache = SegmentedCache::builder(4)
.max_capacity(100)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", "alice");
cache.insert("b", "bob");
cache.insert("c", "cindy");
// All three entries must be readable before the invalidation.
assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode);
assert_eq_with_mode!(cache.get(&"c"), Some("cindy"), delivery_mode);
assert_with_mode!(cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(cache.contains_key(&"c"), delivery_mode);
// `cache.sync()` is no longer needed here before invalidating. The last
// modified timestamps of the entries were updated when they were inserted.
// https://github.com/moka-rs/moka/issues/155
cache.invalidate_all();
expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
cache.sync();
// "d" is inserted after `invalidate_all`, so it is expected to survive.
cache.insert("d", "david");
cache.sync();
assert_with_mode!(cache.get(&"a").is_none(), delivery_mode);
assert_with_mode!(cache.get(&"b").is_none(), delivery_mode);
assert_with_mode!(cache.get(&"c").is_none(), delivery_mode);
assert_eq_with_mode!(cache.get(&"d"), Some("david"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"a"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"b"), delivery_mode);
assert_with_mode!(!cache.contains_key(&"c"), delivery_mode);
assert_with_mode!(cache.contains_key(&"d"), delivery_mode);
// Finally, compare the notifications recorded by the listener with `expected`.
verify_notification_map(&cache, actual, &expected, delivery_mode);
}
}
/// Checks `invalidate_entries_if` under both notification delivery modes.
///
/// Entries matching a registered predicate must be removed and reported with
/// `RemovalCause::Explicit`, an entry inserted after the predicate was registered
/// must survive, and the number of registered predicates must drop back to zero
/// once the invalidation has been processed.
#[test]
fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
run_test(DeliveryMode::Immediate)?;
run_test(DeliveryMode::Queued)?;
fn run_test(delivery_mode: DeliveryMode) -> Result<(), Box<dyn std::error::Error>> {
use std::collections::{HashMap, HashSet};
const SEGMENTS: usize = 4;
// The following `HashMap`s will hold actual and expected notifications.
// Note: We use `HashMap` here as the order of invalidations is non-deterministic.
let actual = Arc::new(Mutex::new(HashMap::new()));
let mut expected = HashMap::new();
// Create an eviction listener that records every notification, keyed by the
// entry's key.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| {
a1.lock().insert(k, (v, cause));
};
let listener_conf = notification::Configuration::builder()
.delivery_mode(delivery_mode)
.build();
// Create a cache with the eviction listener.
let mut cache = SegmentedCache::builder(SEGMENTS)
.max_capacity(100)
.support_invalidation_closures()
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.reconfigure_for_testing();
// The mock clock lets the test advance the cache's notion of time explicitly.
let mut mock = cache.create_mock_expiration_clock();
// Make the cache exterior immutable.
let cache = cache;
cache.insert(0, "alice");
cache.insert(1, "bob");
cache.insert(2, "alex");
cache.sync();
mock.increment(Duration::from_secs(5)); // 5 secs from the start.
cache.sync();
assert_eq_with_mode!(cache.get(&0), Some("alice"), delivery_mode);
assert_eq_with_mode!(cache.get(&1), Some("bob"), delivery_mode);
assert_eq_with_mode!(cache.get(&2), Some("alex"), delivery_mode);
assert_with_mode!(cache.contains_key(&0), delivery_mode);
assert_with_mode!(cache.contains_key(&1), delivery_mode);
assert_with_mode!(cache.contains_key(&2), delivery_mode);
// Register a predicate that matches the values "alice" and "alex".
let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
// A single call is expected to be counted once per segment.
assert_eq_with_mode!(
cache.invalidation_predicate_count(),
SEGMENTS,
delivery_mode
);
expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));
mock.increment(Duration::from_secs(5)); // 10 secs from the start.
// Inserted after the predicate was registered, so it must not be invalidated
// even though its value matches.
cache.insert(3, "alice");
// Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
cache.sync(); // To submit the invalidation task.
std::thread::sleep(Duration::from_millis(200));
cache.sync(); // To process the task result.
std::thread::sleep(Duration::from_millis(200));
assert_with_mode!(cache.get(&0).is_none(), delivery_mode);
assert_with_mode!(cache.get(&2).is_none(), delivery_mode);
assert_eq_with_mode!(cache.get(&1), Some("bob"), delivery_mode);
// This should survive as it was inserted after calling invalidate_entries_if.
assert_eq_with_mode!(cache.get(&3), Some("alice"), delivery_mode);
assert_with_mode!(!cache.contains_key(&0), delivery_mode);
assert_with_mode!(cache.contains_key(&1), delivery_mode);
assert_with_mode!(!cache.contains_key(&2), delivery_mode);
assert_with_mode!(cache.contains_key(&3), delivery_mode);
assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode);
// The predicate must have been retired once the invalidation completed.
assert_eq_with_mode!(cache.invalidation_predicate_count(), 0, delivery_mode);
mock.increment(Duration::from_secs(5)); // 15 secs from the start.
// Register two more predicates that together match both remaining entries.
cache.invalidate_entries_if(|_k, &v| v == "alice")?;
cache.invalidate_entries_if(|_k, &v| v == "bob")?;
assert_eq_with_mode!(
cache.invalidation_predicate_count(),
SEGMENTS * 2,
delivery_mode
);
expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));
// Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
cache.sync(); // To submit the invalidation task.
std::thread::sleep(Duration::from_millis(200));
cache.sync(); // To process the task result.
std::thread::sleep(Duration::from_millis(200));
assert_with_mode!(cache.get(&1).is_none(), delivery_mode);
assert_with_mode!(cache.get(&3).is_none(), delivery_mode);
assert_with_mode!(!cache.contains_key(&1), delivery_mode);
assert_with_mode!(!cache.contains_key(&3), delivery_mode);
assert_eq_with_mode!(cache.entry_count(), 0, delivery_mode);
assert_eq_with_mode!(cache.invalidation_predicate_count(), 0, delivery_mode);
// Finally, compare the notifications recorded by the listener with `expected`.
verify_notification_map(&cache, actual, &expected, delivery_mode);
Ok(())
}
Ok(())
}
/// Fills a four-segment cache with `NUM_KEYS` entries and checks that iterating
/// over `&cache` yields every key with its matching value.
#[test]
fn test_iter() {
    const NUM_KEYS: usize = 50;

    fn make_value(key: usize) -> String {
        format!("val: {}", key)
    }

    let cache = SegmentedCache::builder(4)
        .max_capacity(100)
        .time_to_idle(Duration::from_secs(10))
        .build();

    (0..NUM_KEYS).for_each(|key| {
        cache.insert(key, make_value(key));
    });

    // Validate each yielded pair while gathering the distinct keys.
    let seen_keys = (&cache)
        .into_iter()
        .map(|(key, value)| {
            assert_eq!(value, make_value(*key));
            *key
        })
        .collect::<std::collections::HashSet<_>>();

    // Ensure there are no missing or duplicate keys in the iteration.
    assert_eq!(seen_keys.len(), NUM_KEYS);
}
/// Starts 16 threads at the same moment and ensures that no deadlock occurs.
///
/// - The eight even-numbered threads rewrite every key-value pair in the cache.
/// - The eight odd-numbered threads iterate over the whole cache.
///
#[test]
fn test_iter_multi_threads() {
    use std::collections::HashSet;

    const NUM_KEYS: usize = 1024;
    const NUM_THREADS: usize = 16;

    fn make_value(key: usize) -> String {
        format!("val: {}", key)
    }

    let cache = SegmentedCache::builder(4)
        .max_capacity(2048)
        .time_to_idle(Duration::from_secs(10))
        .build();

    // Populate the cache before any worker starts.
    for key in 0..NUM_KEYS {
        cache.insert(key, make_value(key));
    }

    // The write guard acts as a start gate: each worker begins by taking a read
    // guard, so none can proceed until the write guard is released.
    let start_gate = Arc::new(std::sync::RwLock::<()>::default());
    let gate_keeper = start_gate.write().unwrap();

    let mut workers = Vec::with_capacity(NUM_THREADS);
    for n in 0..NUM_THREADS {
        let cache = cache.clone();
        let start_gate = Arc::clone(&start_gate);
        let is_writer = n % 2 == 0;
        workers.push(std::thread::spawn(move || {
            // Held until the end of the thread body.
            let _pass = start_gate.read().unwrap();
            if is_writer {
                // This thread will update the cache.
                // TODO: Update keys in a random order?
                for key in 0..NUM_KEYS {
                    cache.insert(key, make_value(key));
                }
            } else {
                // This thread will iterate the cache.
                let mut seen_keys = HashSet::new();
                for (key, value) in &cache {
                    assert_eq!(value, make_value(*key));
                    seen_keys.insert(*key);
                }
                // Ensure there are no missing or duplicate keys in the iteration.
                assert_eq!(seen_keys.len(), NUM_KEYS);
            }
        }));
    }

    // Open the gate so that all workers start running.
    drop(gate_keeper);
    for worker in workers {
        worker.join().expect("Failed");
    }

    // Ensure there are no missing or duplicate keys in the iteration.
    let final_keys: HashSet<_> = cache.iter().map(|(k, _v)| *k).collect();
    assert_eq!(final_keys.len(), NUM_KEYS);
}
/// Checks that concurrent `get_with` calls for the same key evaluate only the
/// first caller's init closure, and that a plain `get` does not see the value
/// while that closure is still running.
///
/// The threads are ordered purely by the sleep durations below.
#[test]
fn get_with() {
use std::thread::{sleep, spawn};
let cache = SegmentedCache::new(100, 4);
const KEY: u32 = 0;
// This test will run five threads:
//
// Thread1 will be the first thread to call `get_with` for a key, so its init
// closure will be evaluated and then a &str value "thread1" will be inserted
// to the cache.
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
// Call `get_with` immediately.
let v = cache1.get_with(KEY, || {
// Wait for 300 ms and return a &str value.
sleep(Duration::from_millis(300));
"thread1"
});
assert_eq!(v, "thread1");
})
};
// Thread2 will be the second thread to call `get_with` for the same key, so
// its init closure will not be evaluated. Once thread1's init closure
// finishes, it will get the value inserted by thread1's init closure.
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
// Wait for 100 ms before calling `get_with`.
sleep(Duration::from_millis(100));
let v = cache2.get_with(KEY, || unreachable!());
assert_eq!(v, "thread1");
})
};
// Thread3 will be the third thread to call `get_with` for the same key. By
// the time it calls, thread1's init closure should have finished already and
// the value should be already inserted to the cache. So its init closure
// will not be evaluated and it will get the value inserted by thread1's init
// closure immediately.
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
// Wait for 400 ms before calling `get_with`.
sleep(Duration::from_millis(400));
let v = cache3.get_with(KEY, || unreachable!());
assert_eq!(v, "thread1");
})
};
// Thread4 will call `get` for the same key. It will call when thread1's init
// closure is still running, so it will get none for the key.
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
// Wait for 200 ms before calling `get`.
sleep(Duration::from_millis(200));
let maybe_v = cache4.get(&KEY);
assert!(maybe_v.is_none());
})
};
// Thread5 will call `get` for the same key. It will call after thread1's init
// closure finished, so it will get the value inserted by thread1's init closure.
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
// Wait for 400 ms before calling `get`.
sleep(Duration::from_millis(400));
let maybe_v = cache5.get(&KEY);
assert_eq!(maybe_v, Some("thread1"));
})
};
// Join every thread so that an assertion failure in any of them fails the test.
for t in vec![thread1, thread2, thread3, thread4, thread5] {
t.join().expect("Failed to join");
}
}
/// Checks `get_with_if` when several threads race on the same key: only the first
/// caller's init closure runs while the key is absent, a `replace_if` closure
/// returning `false` keeps the cached value, and one returning `true` replaces it.
///
/// The threads are ordered purely by the sleep durations below.
#[test]
fn get_with_if() {
use std::thread::{sleep, spawn};
let cache = SegmentedCache::new(100, 4);
const KEY: u32 = 0;
// This test will run seven threads:
//
// Thread1 will be the first thread to call `get_with_if` for a key, so its
// init closure will be evaluated and then a &str value "thread1" will be
// inserted to the cache.
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
// Call `get_with_if` immediately.
let v = cache1.get_with_if(
KEY,
|| {
// Wait for 300 ms and return a &str value.
sleep(Duration::from_millis(300));
"thread1"
},
|_v| unreachable!(),
);
assert_eq!(v, "thread1");
})
};
// Thread2 will be the second thread to call `get_with_if` for the same key,
// so its init closure will not be evaluated. Once thread1's init closure
// finishes, it will get the value inserted by thread1's init closure.
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
// Wait for 100 ms before calling `get_with_if`.
sleep(Duration::from_millis(100));
let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
assert_eq!(v, "thread1");
})
};
// Thread3 will be the third thread to call `get_with_if` for the same
// key. By the time it calls, thread1's init closure should have finished
// already and the value should be already inserted to the cache. Also
// thread3's `replace_if` closure returns `false`. So its init closure will
// not be evaluated and it will get the value inserted by thread1's init
// closure immediately.
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
// Wait for 350 ms before calling `get_with_if`.
sleep(Duration::from_millis(350));
let v = cache3.get_with_if(
KEY,
|| unreachable!(),
|v| {
assert_eq!(v, &"thread1");
false
},
);
assert_eq!(v, "thread1");
})
};
// Thread4 will be the fourth thread to call `get_with_if` for the same
// key. The value should have been already inserted to the cache by
// thread1. However thread4's `replace_if` closure returns `true`. So its
// init closure will be evaluated to replace the current value.
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
// Wait for 400 ms before calling `get_with_if`.
sleep(Duration::from_millis(400));
let v = cache4.get_with_if(
KEY,
|| "thread4",
|v| {
assert_eq!(v, &"thread1");
true
},
);
assert_eq!(v, "thread4");
})
};
// Thread5 will call `get` for the same key. It will call when thread1's init
// closure is still running, so it will get none for the key.
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
// Wait for 200 ms before calling `get`.
sleep(Duration::from_millis(200));
let maybe_v = cache5.get(&KEY);
assert!(maybe_v.is_none());
})
};
// Thread6 will call `get` for the same key. It will call after thread1's init
// closure finished but before thread4 replaces the value, so it will get the
// value inserted by thread1's init closure.
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
// Wait for 350 ms before calling `get`.
sleep(Duration::from_millis(350));
let maybe_v = cache6.get(&KEY);
assert_eq!(maybe_v, Some("thread1"));
})
};
// Thread7 will call `get` for the same key. It will call after thread4 has
// replaced the value, so it will get the value inserted by thread4's init
// closure.
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
// Wait for 450 ms before calling `get`.
sleep(Duration::from_millis(450));
let maybe_v = cache7.get(&KEY);
assert_eq!(maybe_v, Some("thread4"));
})
};
// Join every thread so that an assertion failure in any of them fails the test.
for t in vec![
thread1, thread2, thread3, thread4, thread5, thread6, thread7,
] {
t.join().expect("Failed to join");
}
}
/// Checks `try_get_with` when several threads race on the same key: an `Err` from
/// the init closure is handed to the waiting caller and nothing is cached, while
/// a later `Ok` value is cached and returned to subsequent callers.
///
/// The threads are ordered purely by the sleep durations below.
#[test]
fn try_get_with() {
use std::{
sync::Arc,
thread::{sleep, spawn},
};
// Error type returned by the failing init closure.
#[derive(thiserror::Error, Debug)]
#[error("{}", _0)]
pub struct MyError(String);
// Result type used to annotate the values returned by `try_get_with` below.
type MyResult<T> = Result<T, Arc<MyError>>;
let cache = SegmentedCache::new(100, 4);
const KEY: u32 = 0;
// This test will run eight threads:
//
// Thread1 will be the first thread to call `try_get_with` for a key, so its
// init closure will be evaluated and then an error will be returned. Nothing
// will be inserted to the cache.
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
// Call `try_get_with` immediately.
let v = cache1.try_get_with(KEY, || {
// Wait for 300 ms and return an error.
sleep(Duration::from_millis(300));
Err(MyError("thread1 error".into()))
});
assert!(v.is_err());
})
};
// Thread2 will be the second thread to call `try_get_with` for the same key,
// so its init closure will not be evaluated. Once thread1's init closure
// finishes, it will get the same error value returned by thread1's init
// closure.
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
// Wait for 100 ms before calling `try_get_with`.
sleep(Duration::from_millis(100));
let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
assert!(v.is_err());
})
};
// Thread3 will be the third thread to call `try_get_with` for the same key. By
// the time it calls, thread1's init closure should have finished already,
// but the key still does not exist in the cache. So its init closure will be
// evaluated and then an okay &str value will be returned. That value will be
// inserted to the cache.
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
// Wait for 400 ms before calling `try_get_with`.
sleep(Duration::from_millis(400));
let v: MyResult<_> = cache3.try_get_with(KEY, || {
// Wait for 300 ms and return an Ok(&str) value.
sleep(Duration::from_millis(300));
Ok("thread3")
});
assert_eq!(v.unwrap(), "thread3");
})
};
// Thread4 will be the fourth thread to call `try_get_with` for the same
// key. So its init closure will not be evaluated. Once thread3's init
// closure finishes, it will get the same okay &str value.
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
// Wait for 500 ms before calling `try_get_with`.
sleep(Duration::from_millis(500));
let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
// Thread5 will be the fifth thread to call `try_get_with` for the same
// key. By the time it calls, thread3's init closure should have finished
// already, so its init closure will not be evaluated and it will get the
// value inserted by thread3's init closure immediately.
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
// Wait for 800 ms before calling `try_get_with`.
sleep(Duration::from_millis(800));
let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
// Thread6 will call `get` for the same key. It will call when thread1's init
// closure is still running, so it will get none for the key.
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
// Wait for 200 ms before calling `get`.
sleep(Duration::from_millis(200));
let maybe_v = cache6.get(&KEY);
assert!(maybe_v.is_none());
})
};
// Thread7 will call `get` for the same key. It will call after thread1's init
// closure finished with an error. So it will get none for the key.
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
// Wait for 400 ms before calling `get`.
sleep(Duration::from_millis(400));
let maybe_v = cache7.get(&KEY);
assert!(maybe_v.is_none());
})
};
// Thread8 will call `get` for the same key. It will call after thread3's init
// closure finished, so it will get the value inserted by thread3's init closure.
let thread8 = {
let cache8 = cache.clone();
spawn(move || {
// Wait for 800 ms before calling `get`.
sleep(Duration::from_millis(800));
let maybe_v = cache8.get(&KEY);
assert_eq!(maybe_v, Some("thread3"));
})
};
// Join every thread so that an assertion failure in any of them fails the test.
for t in vec![
thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
] {
t.join().expect("Failed to join");
}
}
/// Checks `optionally_get_with` when several threads race on the same key: a
/// `None` from the init closure is handed to the waiting caller and nothing is
/// cached, while a later `Some` value is cached and returned to subsequent callers.
///
/// The threads are ordered purely by the sleep durations below.
#[test]
fn optionally_get_with() {
use std::thread::{sleep, spawn};
let cache = SegmentedCache::new(100, 4);
const KEY: u32 = 0;
// This test will run eight threads:
//
// Thread1 will be the first thread to call `optionally_get_with` for a key, so its
// init closure will be evaluated and then `None` will be returned. Nothing
// will be inserted to the cache.
let thread1 = {
let cache1 = cache.clone();
spawn(move || {
// Call `optionally_get_with` immediately.
let v = cache1.optionally_get_with(KEY, || {
// Wait for 300 ms and return `None`.
sleep(Duration::from_millis(300));
None
});
assert!(v.is_none());
})
};
// Thread2 will be the second thread to call `optionally_get_with` for the same key,
// so its init closure will not be evaluated. Once thread1's init closure
// finishes, it will get the same `None` returned by thread1's init
// closure.
let thread2 = {
let cache2 = cache.clone();
spawn(move || {
// Wait for 100 ms before calling `optionally_get_with`.
sleep(Duration::from_millis(100));
let v = cache2.optionally_get_with(KEY, || unreachable!());
assert!(v.is_none());
})
};
// Thread3 will be the third thread to call `optionally_get_with` for the same
// key. By the time it calls, thread1's init closure should have finished
// already, but the key still does not exist in the cache. So its init closure
// will be evaluated and then a `Some(&str)` value will be returned. That value
// will be inserted to the cache.
let thread3 = {
let cache3 = cache.clone();
spawn(move || {
// Wait for 400 ms before calling `optionally_get_with`.
sleep(Duration::from_millis(400));
let v = cache3.optionally_get_with(KEY, || {
// Wait for 300 ms and return a Some(&str) value.
sleep(Duration::from_millis(300));
Some("thread3")
});
assert_eq!(v.unwrap(), "thread3");
})
};
// Thread4 will be the fourth thread to call `optionally_get_with` for the same
// key. So its init closure will not be evaluated. Once thread3's init
// closure finishes, it will get the same &str value.
let thread4 = {
let cache4 = cache.clone();
spawn(move || {
// Wait for 500 ms before calling `optionally_get_with`.
sleep(Duration::from_millis(500));
let v = cache4.optionally_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
// Thread5 will be the fifth thread to call `optionally_get_with` for the same
// key. By the time it calls, thread3's init closure should have finished
// already, so its init closure will not be evaluated and it will get the
// value inserted by thread3's init closure immediately.
let thread5 = {
let cache5 = cache.clone();
spawn(move || {
// Wait for 800 ms before calling `optionally_get_with`.
sleep(Duration::from_millis(800));
let v = cache5.optionally_get_with(KEY, || unreachable!());
assert_eq!(v.unwrap(), "thread3");
})
};
// Thread6 will call `get` for the same key. It will call when thread1's init
// closure is still running, so it will get none for the key.
let thread6 = {
let cache6 = cache.clone();
spawn(move || {
// Wait for 200 ms before calling `get`.
sleep(Duration::from_millis(200));
let maybe_v = cache6.get(&KEY);
assert!(maybe_v.is_none());
})
};
// Thread7 will call `get` for the same key. It will call after thread1's init
// closure returned `None`. So it will get none for the key.
let thread7 = {
let cache7 = cache.clone();
spawn(move || {
// Wait for 400 ms before calling `get`.
sleep(Duration::from_millis(400));
let maybe_v = cache7.get(&KEY);
assert!(maybe_v.is_none());
})
};
// Thread8 will call `get` for the same key. It will call after thread3's init
// closure finished, so it will get the value inserted by thread3's init closure.
let thread8 = {
let cache8 = cache.clone();
spawn(move || {
// Wait for 800 ms before calling `get`.
sleep(Duration::from_millis(800));
let maybe_v = cache8.get(&KEY);
assert_eq!(maybe_v, Some("thread3"));
})
};
// Join every thread so that an assertion failure in any of them fails the test.
for t in vec![
thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
] {
t.join().expect("Failed to join");
}
}
// Regression test for https://github.com/moka-rs/moka/issues/166
//
// With a `Vec<u8>` key type, `contains_key`, `get` and `invalidate` must accept
// the key both as `&Vec<u8>` and in its borrowed slice form `&[u8]`.
#[test]
fn borrowed_forms_of_key() {
    let cache = SegmentedCache::<Vec<u8>, ()>::new(1, 2);
    let owned_key = vec![1_u8];

    // Phase 1: look the entry up through a `&Vec<u8>`.
    cache.insert(owned_key.clone(), ());
    {
        let as_vec: &Vec<u8> = &owned_key;
        assert!(cache.contains_key(as_vec));
        assert_eq!(cache.get(as_vec), Some(()));
        cache.invalidate(as_vec);
    }

    // Phase 2: re-insert and look the entry up through a `&[u8]`.
    cache.insert(owned_key, ());
    {
        let as_slice: &[u8] = &[1_u8];
        assert!(cache.contains_key(as_slice));
        assert_eq!(cache.get(as_slice), Some(()));
        cache.invalidate(as_slice);
    }
}
/// Inserts 20% more entries than the cache can hold and checks, via shared
/// counters, that evicted and invalidated values are dropped as soon as they
/// leave the cache, not only when the cache itself is dropped.
#[test]
fn drop_value_immediately_after_eviction() {
    use crate::common::test_utils::{Counters, Value};

    const NUM_SEGMENTS: usize = 1;
    const MAX_CAPACITY: u32 = 500;
    const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
    // Number of entries that do not fit into the cache.
    const EVICTION_COUNT: u32 = KEYS - MAX_CAPACITY;

    let counters = Arc::new(Counters::default());

    // The listener counts size evictions and explicit invalidations; other
    // causes are ignored.
    let listener = {
        let listener_counters = Arc::clone(&counters);
        move |_k, _v, cause| match cause {
            RemovalCause::Size => listener_counters.incl_evicted(),
            RemovalCause::Explicit => listener_counters.incl_invalidated(),
            _ => (),
        }
    };

    // Build and reconfigure inside a block so that only an immutable binding
    // escapes.
    let cache = {
        let mut seg_cache = SegmentedCache::builder(NUM_SEGMENTS)
            .max_capacity(u64::from(MAX_CAPACITY))
            .eviction_listener(listener)
            .build();
        seg_cache.reconfigure_for_testing();
        seg_cache
    };

    // Checks all five counters; the inserted and created counts are always `KEYS`.
    let assert_counters = |evicted: u32, invalidated: u32, dropped: u32| {
        assert_eq!(counters.inserted(), KEYS, "inserted");
        assert_eq!(counters.value_created(), KEYS, "value_created");
        assert_eq!(counters.evicted(), evicted, "evicted");
        assert_eq!(counters.invalidated(), invalidated, "invalidated");
        assert_eq!(counters.value_dropped(), dropped, "value_dropped");
    };

    // Phase 1: insert more entries than the capacity allows.
    for key in 0..KEYS {
        cache.insert(key, Arc::new(Value::new(vec![0u8; 1024], &counters)));
        counters.incl_inserted();
        cache.sync();
    }
    cache.sync();
    assert_counters(EVICTION_COUNT, 0, EVICTION_COUNT);

    // Phase 2: invalidate every key, including those already evicted.
    for key in 0..KEYS {
        cache.invalidate(&key);
        cache.sync();
    }
    cache.sync();
    assert_counters(EVICTION_COUNT, MAX_CAPACITY, KEYS);

    // Phase 3: dropping the cache must not drop any further values.
    drop(cache);
    assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
// Ignored by default. This test cannot run in parallel with other tests.
// NOTE(review): presumably because `ThreadPoolRegistry::enabled_pools()` reports
// process-wide state that other tests would also affect. Confirm.
//
// Each scenario builds a cache inside its own block, so the cache is dropped
// before the next scenario inspects the registry.
#[test]
#[ignore]
fn enabling_and_disabling_thread_pools() {
use crate::common::concurrent::thread_pool::{PoolName::*, ThreadPoolRegistry};
const NUM_SEGMENTS: usize = 4;
// Enable the housekeeper pool.
{
let cache = SegmentedCache::builder(NUM_SEGMENTS)
.thread_pool_enabled(true)
.build();
cache.insert('a', "a");
let enabled_pools = ThreadPoolRegistry::enabled_pools();
assert_eq!(enabled_pools, &[Housekeeper]);
}
// Enable the housekeeper and invalidator pools.
{
let cache = SegmentedCache::builder(NUM_SEGMENTS)
.thread_pool_enabled(true)
.support_invalidation_closures()
.build();
cache.insert('a', "a");
let enabled_pools = ThreadPoolRegistry::enabled_pools();
assert_eq!(enabled_pools, &[Housekeeper, Invalidator]);
}
// Queued delivery mode: Enable the housekeeper and removal notifier pools.
{
let listener = |_k, _v, _cause| {};
let listener_conf = notification::Configuration::builder()
.delivery_mode(DeliveryMode::Queued)
.build();
let cache = SegmentedCache::builder(NUM_SEGMENTS)
.thread_pool_enabled(true)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.insert('a', "a");
let enabled_pools = ThreadPoolRegistry::enabled_pools();
assert_eq!(enabled_pools, &[Housekeeper, RemovalNotifier]);
}
// Immediate delivery mode: Enable only the housekeeper pool.
{
let listener = |_k, _v, _cause| {};
let listener_conf = notification::Configuration::builder()
.delivery_mode(DeliveryMode::Immediate)
.build();
let cache = SegmentedCache::builder(NUM_SEGMENTS)
.thread_pool_enabled(true)
.eviction_listener_with_conf(listener, listener_conf)
.build();
cache.insert('a', "a");
let enabled_pools = ThreadPoolRegistry::enabled_pools();
assert_eq!(enabled_pools, &[Housekeeper]);
}
// Disable all pools.
{
let cache = SegmentedCache::builder(NUM_SEGMENTS)
.thread_pool_enabled(false)
.build();
cache.insert('a', "a");
let enabled_pools = ThreadPoolRegistry::enabled_pools();
assert!(enabled_pools.is_empty());
}
}
/// Checks that the `Debug` output of a cache is rendered as a map: wrapped in
/// braces and containing one `key: value` item per entry, in any order.
#[test]
fn test_debug_format() {
    let cache = SegmentedCache::new(10, 4);
    for (key, name) in [('a', "alice"), ('b', "bob"), ('c', "cindy")]
        .iter()
        .copied()
    {
        cache.insert(key, name);
    }

    let rendered = format!("{:?}", cache);

    assert!(rendered.starts_with('{'));
    // The iteration order is not checked, only the presence of each item.
    let expected_items = [r#"'a': "alice""#, r#"'b': "bob""#, r#"'c': "cindy""#];
    for item in expected_items.iter().copied() {
        assert!(rendered.contains(item));
    }
    assert!(rendered.ends_with('}'));
}
/// A `(value, cause)` pair, used as the map value when notifications are recorded
/// in a `HashMap` keyed by the entry's key.
type NotificationPair<V> = (V, RemovalCause);
/// A `(key, value, cause)` triple, used when notifications are recorded in order
/// in a `Vec`.
type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
/// Asserts that the notifications recorded in `actual` equal `expected`,
/// element by element and in the same order.
///
/// The recorded list is polled every 500 ms. While its length differs from
/// `expected.len()`, `cache.sync()` is called and the check is retried; once
/// the retry budget is used up, the length mismatch is reported as an assertion
/// failure.
///
/// The lock on `actual` is released before `cache.sync()` is called. The
/// eviction listeners in this module lock the same mutex, so holding the guard
/// across `sync()` would deadlock if a notification were delivered on the
/// calling thread.
///
/// # Panics
///
/// Panics if the lengths still differ after the retries are exhausted, or if
/// any recorded notification differs from the expected one at the same index.
fn verify_notification_vec<K, V, S>(
    cache: &SegmentedCache<K, V, S>,
    actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
    expected: &[NotificationTriple<K, V>],
    delivery_mode: DeliveryMode,
) where
    K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
    V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
    S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
{
    // Retries will be needed when testing in a QEMU VM.
    const MAX_RETRIES: usize = 5;
    let mut retries = 0;
    loop {
        // Ensure all scheduled notifications have been processed.
        std::thread::sleep(Duration::from_millis(500));

        // Keep the guard inside this block so that it is dropped before
        // `cache.sync()` runs below.
        let actual_len = {
            let recorded = actual.lock();
            if recorded.len() == expected.len() {
                for (i, (got, want)) in recorded.iter().zip(expected).enumerate() {
                    assert_eq!(
                        got, want,
                        "expected[{}] (delivery mode: {:?})",
                        i, delivery_mode
                    );
                }
                return;
            }
            recorded.len()
        };

        if retries > MAX_RETRIES {
            // The lengths are known to differ here, so this always fails and
            // reports both of them.
            assert_eq!(
                actual_len,
                expected.len(),
                "Retries exhausted (delivery mode: {:?})",
                delivery_mode
            );
        }
        retries += 1;
        cache.sync();
    }
}
/// Asserts that the notifications recorded in `actual` equal `expected`,
/// ignoring the order in which they arrived.
///
/// The recorded map is polled every 500 ms. While its length differs from
/// `expected.len()`, `cache.sync()` is called and the check is retried; once
/// the retry budget is used up, the length mismatch is reported as an assertion
/// failure. When the lengths agree, every recorded key must map to the same
/// `(value, cause)` pair in `expected`.
///
/// The lock on `actual` is released before `cache.sync()` is called. The
/// eviction listeners in this module lock the same mutex, so holding the guard
/// across `sync()` would deadlock if a notification were delivered on the
/// calling thread.
///
/// # Panics
///
/// Panics if the lengths still differ after the retries are exhausted, or if
/// any recorded entry is missing from `expected` or differs from it.
fn verify_notification_map<K, V, S>(
    cache: &SegmentedCache<K, V, S>,
    actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
    expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
    delivery_mode: DeliveryMode,
) where
    K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
    V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
    S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
{
    // Retries will be needed when testing in a QEMU VM.
    const MAX_RETRIES: usize = 5;
    let mut retries = 0;
    loop {
        // Ensure all scheduled notifications have been processed.
        std::thread::sleep(Duration::from_millis(500));

        // Keep the guard inside this block so that it is dropped before
        // `cache.sync()` runs below.
        let actual_len = {
            let recorded = actual.lock();
            if recorded.len() == expected.len() {
                for actual_key in recorded.keys() {
                    assert_eq!(
                        recorded.get(actual_key),
                        expected.get(actual_key),
                        "expected[{}] (delivery mode: {:?})",
                        actual_key,
                        delivery_mode
                    );
                }
                return;
            }
            recorded.len()
        };

        if retries > MAX_RETRIES {
            // The lengths are known to differ here, so this always fails and
            // reports both of them.
            assert_eq!(
                actual_len,
                expected.len(),
                "Retries exhausted (delivery mode: {:?})",
                delivery_mode
            );
        }
        retries += 1;
        cache.sync();
    }
}
}