use super::{
value_initializer::{InitResult, ValueInitializer},
CacheBuilder, ConcurrentCacheExt, Iter, PredicateId,
};
use crate::{
common::{
concurrent::{
constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
housekeeper::{self, InnerSync},
Weigher, WriteOp,
},
time::Instant,
},
notification::{self, EvictionListener},
sync_base::base_cache::{BaseCache, HouseKeeperArc},
Policy, PredicateError,
};
#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters::CacheDebugStats;
use crossbeam_channel::{Sender, TrySendError};
use std::{
borrow::Borrow,
collections::hash_map::RandomState,
fmt,
future::Future,
hash::{BuildHasher, Hash},
sync::Arc,
time::Duration,
};
/// A thread-safe, futures-aware concurrent in-memory cache.
///
/// `Cache` supports full concurrency of retrievals and a high expected concurrency
/// for updates. It can be accessed inside and outside of asynchronous contexts.
///
/// `Cache` utilizes a lock-free concurrent hash table as the central key-value
/// storage. `Cache` performs a best-effort bounding of the map using an entry
/// replacement algorithm to determine which entries to evict when the capacity is
/// exceeded.
///
/// To use this cache, enable a crate feature called "future".
///
/// # Table of Contents
///
/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
/// - [Example: Size-based Eviction](#example-size-based-eviction)
/// - [Example: Time-based Expirations](#example-time-based-expirations)
/// - [Example: Eviction Listener](#example-eviction-listener)
/// - [You should avoid eviction listener to panic](#you-should-avoid-eviction-listener-to-panic)
/// - [Delivery Modes for Eviction Listener](#delivery-modes-for-eviction-listener)
/// - [Thread Safety](#thread-safety)
/// - [Sharing a cache across threads](#sharing-a-cache-across-threads)
/// - [Hashing Algorithm](#hashing-algorithm)
///
/// # Example: `insert`, `get` and `invalidate`
///
/// Cache entries are manually added using an insert method, and are stored in the
/// cache until either evicted or manually invalidated:
///
/// - Inside an async context (`async fn` or `async` block), use
/// [`insert`](#method.insert), [`get_with`](#method.get_with) or
/// [`invalidate`](#method.invalidate) methods for updating the cache and `await`
/// them.
/// - Outside any async context, use [`blocking`](#method.blocking) method to access
/// blocking version of [`insert`](./struct.BlockingOp.html#method.insert) or
/// [`invalidate`](struct.BlockingOp.html#method.invalidate) methods.
///
/// Here's an example of reading and updating a cache by using multiple asynchronous
/// tasks with [Tokio][tokio-crate] runtime:
///
/// [tokio-crate]: https://crates.io/crates/tokio
///
///```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// // futures-util = "0.3"
///
/// use moka::future::Cache;
///
/// #[tokio::main]
/// async fn main() {
/// const NUM_TASKS: usize = 16;
/// const NUM_KEYS_PER_TASK: usize = 64;
///
/// fn value(n: usize) -> String {
/// format!("value {}", n)
/// }
///
/// // Create a cache that can store up to 10,000 entries.
/// let cache = Cache::new(10_000);
///
/// // Spawn async tasks and write to and read from the cache.
/// let tasks: Vec<_> = (0..NUM_TASKS)
/// .map(|i| {
/// // To share the same cache across the async tasks, clone it.
/// // This is a cheap operation.
/// let my_cache = cache.clone();
/// let start = i * NUM_KEYS_PER_TASK;
/// let end = (i + 1) * NUM_KEYS_PER_TASK;
///
/// tokio::spawn(async move {
/// // Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
/// for key in start..end {
/// // insert() is an async method, so await it.
/// my_cache.insert(key, value(key)).await;
/// // get() returns Option<String>, a clone of the stored value.
/// assert_eq!(my_cache.get(&key), Some(value(key)));
/// }
///
/// // Invalidate every 4 element of the inserted entries.
/// for key in (start..end).step_by(4) {
/// // invalidate() is an async method, so await it.
/// my_cache.invalidate(&key).await;
/// }
/// })
/// })
/// .collect();
///
/// // Wait for all tasks to complete.
/// futures_util::future::join_all(tasks).await;
///
/// // Verify the result.
/// for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
/// if key % 4 == 0 {
/// assert_eq!(cache.get(&key), None);
/// } else {
/// assert_eq!(cache.get(&key), Some(value(key)));
/// }
/// }
/// }
/// ```
///
/// If you want to atomically initialize and insert a value when the key is not
/// present, you might want to check other insertion methods
/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
///
/// # Avoiding to clone the value at `get`
///
/// The return type of `get` method is `Option<V>` instead of `Option<&V>`. Every
/// time `get` is called for an existing key, it creates a clone of the stored value
/// `V` and returns it. This is because the `Cache` allows concurrent updates from
/// threads so a value stored in the cache can be dropped or replaced at any time by
/// any other thread. `get` cannot return a reference `&V` as it is impossible to
/// guarantee the value outlives the reference.
///
/// If you want to store values that will be expensive to clone, wrap them by
/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
/// thread-safe reference-counted pointer and its `clone()` method is cheap.
///
/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
///
/// # Example: Size-based Eviction
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// // futures-util = "0.3"
///
/// use std::convert::TryInto;
/// use moka::future::Cache;
///
/// #[tokio::main]
/// async fn main() {
/// // Evict based on the number of entries in the cache.
/// let cache = Cache::builder()
/// // Up to 10,000 entries.
/// .max_capacity(10_000)
/// // Create the cache.
/// .build();
/// cache.insert(1, "one".to_string()).await;
///
/// // Evict based on the byte length of strings in the cache.
/// let cache = Cache::builder()
/// // A weigher closure takes &K and &V and returns a u32
/// // representing the relative size of the entry.
/// .weigher(|_key, value: &String| -> u32 {
/// value.len().try_into().unwrap_or(u32::MAX)
/// })
/// // This cache will hold up to 32MiB of values.
/// .max_capacity(32 * 1024 * 1024)
/// .build();
/// cache.insert(2, "two".to_string()).await;
/// }
/// ```
///
/// If your cache should not grow beyond a certain size, use the `max_capacity`
/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
/// will try to evict entries that have not been used recently or very often.
///
/// At the cache creation time, a weigher closure can be set by the `weigher` method
/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
/// returns a `u32` representing the relative size of the entry:
///
/// - If the `weigher` is _not_ set, the cache will treat each entry has the same
/// size of `1`. This means the cache will be bounded by the number of entries.
/// - If the `weigher` is set, the cache will call the weigher to calculate the
/// weighted size (relative size) on an entry. This means the cache will be bounded
/// by the total weighted size of entries.
///
/// Note that weighted sizes are not used when making eviction selections.
///
/// [builder-struct]: ./struct.CacheBuilder.html
///
/// # Example: Time-based Expirations
///
/// `Cache` supports the following expiration policies:
///
/// - **Time to live**: A cached entry will be expired after the specified duration
/// past from `insert`.
/// - **Time to idle**: A cached entry will be expired after the specified duration
/// past from `get` or `insert`.
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// // futures-util = "0.3"
///
/// use moka::future::Cache;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
/// let cache = Cache::builder()
/// // Time to live (TTL): 30 minutes
/// .time_to_live(Duration::from_secs(30 * 60))
/// // Time to idle (TTI): 5 minutes
/// .time_to_idle(Duration::from_secs( 5 * 60))
/// // Create the cache.
/// .build();
///
/// // This entry will expire after 5 minutes (TTI) if there is no get().
/// cache.insert(0, "zero").await;
///
/// // This get() will extend the entry life for another 5 minutes.
/// cache.get(&0);
///
/// // Even though we keep calling get(), the entry will expire
/// // after 30 minutes (TTL) from the insert().
/// }
/// ```
///
/// # Example: Eviction Listener
///
/// A `Cache` can be configured with an eviction listener, a closure that is called
/// every time there is a cache eviction. The listener takes three parameters: the
/// key and value of the evicted entry, and the
/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
/// entry was evicted.
///
/// An eviction listener can be used to keep other data structures in sync with the
/// cache, for example.
///
/// The following example demonstrates how to use an eviction listener with
/// time-to-live expiration to manage the lifecycle of temporary files on a
/// filesystem. The cache stores the paths of the files, and when one of them has
/// expired, the eviction lister will be called with the path, so it can remove the
/// file from the filesystem.
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // anyhow = "1.0"
/// // uuid = { version = "1.1", features = ["v4"] }
/// // tokio = { version = "1.18", features = ["fs", "macros", "rt-multi-thread", "sync", "time"] }
///
/// use moka::future::Cache;
///
/// use anyhow::{anyhow, Context};
/// use std::{
/// io,
/// path::{Path, PathBuf},
/// sync::Arc,
/// time::Duration,
/// };
/// use tokio::{fs, sync::RwLock};
/// use uuid::Uuid;
///
/// /// The DataFileManager writes, reads and removes data files.
/// struct DataFileManager {
/// base_dir: PathBuf,
/// file_count: usize,
/// }
///
/// impl DataFileManager {
/// fn new(base_dir: PathBuf) -> Self {
/// Self {
/// base_dir,
/// file_count: 0,
/// }
/// }
///
/// async fn write_data_file(&mut self, contents: String) -> io::Result<PathBuf> {
/// loop {
/// // Generate a unique file path.
/// let mut path = self.base_dir.to_path_buf();
/// path.push(Uuid::new_v4().as_hyphenated().to_string());
///
/// if path.exists() {
/// continue; // This path is already taken by others. Retry.
/// }
///
/// // We have got a unique file path, so create the file at
/// // the path and write the contents to the file.
/// fs::write(&path, contents).await?;
/// self.file_count += 1;
/// println!(
/// "Created a data file at {:?} (file count: {})",
/// path, self.file_count
/// );
///
/// // Return the path.
/// return Ok(path);
/// }
/// }
///
/// async fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
/// // Reads the contents of the file at the path, and return the contents.
/// fs::read_to_string(path).await
/// }
///
/// async fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
/// // Remove the file at the path.
/// fs::remove_file(path.as_ref()).await?;
/// self.file_count -= 1;
/// println!(
/// "Removed a data file at {:?} (file count: {})",
/// path.as_ref(),
/// self.file_count
/// );
///
/// Ok(())
/// }
/// }
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
/// // Create an instance of the DataFileManager and wrap it with
/// // Arc<RwLock<_>> so it can be shared across threads.
/// let file_mgr = DataFileManager::new(std::env::temp_dir());
/// let file_mgr = Arc::new(RwLock::new(file_mgr));
///
/// let file_mgr1 = Arc::clone(&file_mgr);
/// let rt = tokio::runtime::Handle::current();
///
/// // Create an eviction lister closure.
/// let listener = move |k, v: PathBuf, cause| {
/// // Try to remove the data file at the path `v`.
/// println!(
/// "\n== An entry has been evicted. k: {:?}, v: {:?}, cause: {:?}",
/// k, v, cause
/// );
/// rt.block_on(async {
/// // Acquire the write lock of the DataFileManager.
/// let mut mgr = file_mgr1.write().await;
/// // Remove the data file. We must handle error cases here to
/// // prevent the listener from panicking.
/// if let Err(_e) = mgr.remove_data_file(v.as_path()).await {
/// eprintln!("Failed to remove a data file at {:?}", v);
/// }
/// });
/// };
///
/// // Create the cache. Set time to live for two seconds and set the
/// // eviction listener.
/// let cache = Cache::builder()
/// .max_capacity(100)
/// .time_to_live(Duration::from_secs(2))
/// .eviction_listener_with_queued_delivery_mode(listener)
/// .build();
///
/// // Insert an entry to the cache.
/// // This will create and write a data file for the key "user1", store the
/// // path of the file to the cache, and return it.
/// println!("== try_get_with()");
/// let path = cache
/// .try_get_with("user1", async {
/// let mut mgr = file_mgr.write().await;
/// let path = mgr
/// .write_data_file("user data".into())
/// .await
/// .with_context(|| format!("Failed to create a data file"))?;
/// Ok(path) as anyhow::Result<_>
/// })
/// .await
/// .map_err(|e| anyhow!("{}", e))?;
///
/// // Read the data file at the path and print the contents.
/// println!("\n== read_data_file()");
/// {
/// let mgr = file_mgr.read().await;
/// let contents = mgr
/// .read_data_file(path.as_path())
/// .await
/// .with_context(|| format!("Failed to read data from {:?}", path))?;
/// println!("contents: {}", contents);
/// }
///
/// // Sleep for five seconds. While sleeping, the cache entry for key "user1"
/// // will be expired and evicted, so the eviction lister will be called to
/// // remove the file.
/// tokio::time::sleep(Duration::from_secs(5)).await;
///
/// Ok(())
/// }
/// ```
///
/// ## You should avoid eviction listener to panic
///
/// It is very important to make an eviction listener closure not to panic.
/// Otherwise, the cache will stop calling the listener after a panic. This is an
/// intended behavior because the cache cannot know whether it is memory safe or not
/// to call the panicked lister again.
///
/// When a listener panics, the cache will swallow the panic and disable the
/// listener. If you want to know when a listener panics and the reason of the panic,
/// you can enable an optional `logging` feature of Moka and check error-level logs.
///
/// To enable the `logging`, do the followings:
///
/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
/// `info`, ...):
/// - If you are using the `env_logger` crate, you can achieve this by setting
/// `RUST_LOG` environment variable to `moka=error`.
/// 3. If you have more than one caches, you may want to set a distinct name for each
/// cache by using cache builder's [`name`][builder-name-method] method. The name
/// will appear in the log.
///
/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
///
/// ## Delivery Modes for Eviction Listener
///
/// The [`DeliveryMode`][delivery-mode] specifies how and when an eviction
/// notification should be delivered to an eviction listener. Currently, the
/// `future::Cache` supports only one delivery mode: `Queued` mode.
///
/// A future version of `future::Cache` will support `Immediate` mode, which will be
/// easier to use in many use cases than queued mode. Unlike the `future::Cache`,
/// the `sync::Cache` already supports it.
///
/// Once `future::Cache` supports the immediate mode, the `eviction_listener` and
/// `eviction_listener_with_conf` methods will be added to the
/// `future::CacheBuilder`. The former will use the immediate mode, and the latter
/// will take a custom configurations to specify the queued mode. The current method
/// `eviction_listener_with_queued_delivery_mode` will be deprecated.
///
/// For more details about the delivery modes, see [this section][sync-delivery-modes]
/// of `sync::Cache` documentation.
///
/// [delivery-mode]: ../notification/enum.DeliveryMode.html
/// [sync-delivery-modes]: ../sync/struct.Cache.html#delivery-modes-for-eviction-listener
///
/// # Thread Safety
///
/// All methods provided by the `Cache` are considered thread-safe, and can be safely
/// accessed by multiple concurrent threads.
///
/// - `Cache<K, V, S>` requires trait bounds `Send`, `Sync` and `'static` for `K`
/// (key), `V` (value) and `S` (hasher state).
/// - `Cache<K, V, S>` will implement `Send` and `Sync`.
///
/// # Sharing a cache across asynchronous tasks
///
/// To share a cache across async tasks (or OS threads), do one of the followings:
///
/// - Create a clone of the cache by calling its `clone` method and pass it to other
/// task.
/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from
/// [once_cell][once-cell-crate] create, and set it to a `static` variable.
///
/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
/// reference-counted pointers to the internal data structures.
///
/// [once-cell-crate]: https://crates.io/crates/once_cell
///
/// # Hashing Algorithm
///
/// By default, `Cache` uses a hashing algorithm selected to provide resistance
/// against HashDoS attacks. It will be the same one used by
/// `std::collections::HashMap`, which is currently SipHash 1-3.
///
/// While SipHash's performance is very competitive for medium sized keys, other
/// hashing algorithms will outperform it for small keys such as integers as well as
/// large keys such as long strings. However those algorithms will typically not
/// protect against attacks such as HashDoS.
///
/// The hashing algorithm can be replaced on a per-`Cache` basis using the
/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
/// Many alternative algorithms are available on crates.io, such as the
/// [aHash][ahash-crate] crate.
///
/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
/// [ahash-crate]: https://crates.io/crates/ahash
///
pub struct Cache<K, V, S = RandomState> {
base: BaseCache<K, V, S>,
value_initializer: Arc<ValueInitializer<K, V, S>>,
}
// TODO: https://github.com/moka-rs/moka/issues/54
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<K, V, S> Send for Cache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Send,
{
}
unsafe impl<K, V, S> Sync for Cache<K, V, S>
where
K: Send + Sync,
V: Send + Sync,
S: Sync,
{
}
// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
impl<K, V, S> Clone for Cache<K, V, S> {
/// Makes a clone of this shared cache.
///
/// This operation is cheap as it only creates thread-safe reference counted
/// pointers to the shared internal data structures.
fn clone(&self) -> Self {
Self {
base: self.base.clone(),
value_initializer: Arc::clone(&self.value_initializer),
}
}
}
impl<K, V, S> fmt::Debug for Cache<K, V, S>
where
K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
V: fmt::Debug + Clone + Send + Sync + 'static,
// TODO: Remove these bounds from S.
S: BuildHasher + Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d_map = f.debug_map();
for (k, v) in self.iter() {
d_map.entry(&k, &v);
}
d_map.finish()
}
}
impl<K, V, S> Cache<K, V, S> {
/// Returns cache’s name.
pub fn name(&self) -> Option<&str> {
self.base.name()
}
/// Returns a read-only cache policy of this cache.
///
/// At this time, cache policy cannot be modified after cache creation.
/// A future version may support to modify it.
pub fn policy(&self) -> Policy {
self.base.policy()
}
/// Returns an approximate number of entries in this cache.
///
/// The value returned is _an estimate_; the actual count may differ if there are
/// concurrent insertions or removals, or if some entries are pending removal due
/// to expiration. This inaccuracy can be mitigated by performing a `sync()`
/// first.
///
/// # Example
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
///
/// #[tokio::main]
/// async fn main() {
/// let cache = Cache::new(10);
/// cache.insert('n', "Netherland Dwarf").await;
/// cache.insert('l', "Lop Eared").await;
/// cache.insert('d', "Dutch").await;
///
/// // Ensure an entry exists.
/// assert!(cache.contains_key(&'n'));
///
/// // However, followings may print stale number zeros instead of threes.
/// println!("{}", cache.entry_count()); // -> 0
/// println!("{}", cache.weighted_size()); // -> 0
///
/// // To mitigate the inaccuracy, bring `ConcurrentCacheExt` trait to
/// // the scope so we can use `sync` method.
/// use moka::future::ConcurrentCacheExt;
/// // Call `sync` to run pending internal tasks.
/// cache.sync();
///
/// // Followings will print the actual numbers.
/// println!("{}", cache.entry_count()); // -> 3
/// println!("{}", cache.weighted_size()); // -> 3
/// }
/// ```
///
pub fn entry_count(&self) -> u64 {
self.base.entry_count()
}
/// Returns an approximate total weighted size of entries in this cache.
///
/// The value returned is _an estimate_; the actual size may differ if there are
/// concurrent insertions or removals, or if some entries are pending removal due
/// to expiration. This inaccuracy can be mitigated by performing a `sync()`
/// first. See [`entry_count`](#method.entry_count) for a sample code.
pub fn weighted_size(&self) -> u64 {
self.base.weighted_size()
}
#[cfg(feature = "unstable-debug-counters")]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-debug-counters")))]
pub fn debug_stats(&self) -> CacheDebugStats {
self.base.debug_stats()
}
}
impl<K, V> Cache<K, V, RandomState>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Constructs a new `Cache<K, V>` that will store up to the `max_capacity`.
///
/// To adjust various configuration knobs such as `initial_capacity` or
/// `time_to_live`, use the [`CacheBuilder`][builder-struct].
///
/// [builder-struct]: ./struct.CacheBuilder.html
pub fn new(max_capacity: u64) -> Self {
let build_hasher = RandomState::default();
Self::with_everything(
None,
Some(max_capacity),
None,
build_hasher,
None,
None,
None,
None,
None,
false,
housekeeper::Configuration::new_thread_pool(true),
)
}
/// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` with
/// various configuration knobs.
///
/// [builder-struct]: ./struct.CacheBuilder.html
pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
CacheBuilder::default()
}
}
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
// https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
#[allow(clippy::too_many_arguments)]
pub(crate) fn with_everything(
name: Option<String>,
max_capacity: Option<u64>,
initial_capacity: Option<usize>,
build_hasher: S,
weigher: Option<Weigher<K, V>>,
eviction_listener: Option<EvictionListener<K, V>>,
eviction_listener_conf: Option<notification::Configuration>,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
invalidator_enabled: bool,
housekeeper_conf: housekeeper::Configuration,
) -> Self {
Self {
base: BaseCache::new(
name,
max_capacity,
initial_capacity,
build_hasher.clone(),
weigher,
eviction_listener,
eviction_listener_conf,
time_to_live,
time_to_idle,
invalidator_enabled,
housekeeper_conf,
),
value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
}
}
/// Returns `true` if the cache contains a value for the key.
///
/// Unlike the `get` method, this method is not considered a cache read operation,
/// so it does not update the historic popularity estimator or reset the idle
/// timer for the key.
///
/// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
/// on the borrowed form _must_ match those for the key type.
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.base.contains_key_with_hash(key, self.base.hash(key))
}
/// Returns a _clone_ of the value corresponding to the key.
///
/// If you want to store values that will be expensive to clone, wrap them by
/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
/// thread-safe reference-counted pointer and its `clone()` method is cheap.
///
/// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
/// on the borrowed form _must_ match those for the key type.
///
/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.base.get_with_hash(key, self.base.hash(key))
}
/// Deprecated, replaced with [`get_with`](#method.get_with)
#[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
pub async fn get_or_insert_with(&self, key: K, init: impl Future<Output = V>) -> V {
self.get_with(key, init).await
}
/// Deprecated, replaced with [`try_get_with`](#method.try_get_with)
#[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
pub async fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
{
self.try_get_with(key, init).await
}
/// Returns a _clone_ of the value corresponding to the key. If the value does
/// not exist, resolve the `init` future and inserts the output.
///
/// # Concurrent calls on the same key
///
/// This method guarantees that concurrent calls on the same not-existing key are
/// coalesced into one evaluation of the `init` future. Only one of the calls
/// evaluates its future, and other calls wait for that future to resolve.
///
/// The following code snippet demonstrates this behavior:
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // futures-util = "0.3"
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
/// let cache = Cache::new(100);
///
/// // Spawn four async tasks.
/// let tasks: Vec<_> = (0..4_u8)
/// .map(|task_id| {
/// let my_cache = cache.clone();
/// tokio::spawn(async move {
/// println!("Task {} started.", task_id);
///
/// // Insert and get the value for key1. Although all four async tasks
/// // will call `get_with` at the same time, the `init` async
/// // block must be resolved only once.
/// let value = my_cache
/// .get_with("key1", async move {
/// println!("Task {} inserting a value.", task_id);
/// Arc::new(vec![0u8; TEN_MIB])
/// })
/// .await;
///
/// // Ensure the value exists now.
/// assert_eq!(value.len(), TEN_MIB);
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!("Task {} got the value. (len: {})", task_id, value.len());
/// })
/// })
/// .collect();
///
/// // Run all tasks concurrently and wait for them to complete.
/// futures_util::future::join_all(tasks).await;
/// }
/// ```
///
/// **A Sample Result**
///
/// - The `init` future (async black) was resolved exactly once by task 3.
/// - Other tasks were blocked until task 3 inserted the value.
///
/// ```console
/// Task 0 started.
/// Task 3 started.
/// Task 1 started.
/// Task 2 started.
/// Task 3 inserting a value.
/// Task 3 got the value. (len: 10485760)
/// Task 0 got the value. (len: 10485760)
/// Task 1 got the value. (len: 10485760)
/// Task 2 got the value. (len: 10485760)
/// ```
///
/// # Panics
///
/// This method panics when the `init` future has panicked. When it happens, only
/// the caller whose `init` future panicked will get the panic (e.g. only task 3
/// in the above sample). If there are other calls in progress (e.g. task 0, 1
/// and 2 above), this method will restart and resolve one of the remaining
/// `init` futures.
///
pub async fn get_with(&self, key: K, init: impl Future<Output = V>) -> V {
let hash = self.base.hash(&key);
let key = Arc::new(key);
let replace_if = None as Option<fn(&V) -> bool>;
self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if)
.await
}
/// Similar to [`get_with`](#method.get_with), but instead of passing an owned
/// key, you can pass a reference to the key. If the key does not exist in the
/// cache, the key will be cloned to create new entry in the cache.
pub async fn get_with_by_ref<Q>(&self, key: &Q, init: impl Future<Output = V>) -> V
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
let hash = self.base.hash(key);
let replace_if = None as Option<fn(&V) -> bool>;
self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if)
.await
}
/// Works like [`get_with`](#method.get_with), but takes an additional
/// `replace_if` closure.
///
/// This method will resolve the `init` future and insert the output to the
/// cache when:
///
/// - The key does not exist.
/// - Or, `replace_if` closure returns `true`.
pub async fn get_with_if(
&self,
key: K,
init: impl Future<Output = V>,
replace_if: impl FnMut(&V) -> bool,
) -> V {
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if))
.await
}
// We will provide this API under the new `entry` API.
//
// /// Similar to [`get_with_if`](#method.get_with_if), but instead of passing an
// /// owned key, you can pass a reference to the key. If the key does not exist in
// /// the cache, the key will be cloned to create new entry in the cache.
// pub async fn get_with_if_by_ref<Q>(
// &self,
// key: &Q,
// init: impl Future<Output = V>,
// replace_if: impl FnMut(&V) -> bool,
// ) -> V
// where
// K: Borrow<Q>,
// Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
// {
// let hash = self.base.hash(key);
// self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, Some(replace_if))
// .await
// }
/// Returns a _clone_ of the value corresponding to the key. If the value does
/// not exist, resolves the `init` future, and inserts the value if `Some(value)`
/// was returned. If `None` was returned from the future, this method does not
/// insert a value and returns `None`.
///
/// # Concurrent calls on the same key
///
/// This method guarantees that concurrent calls on the same not-existing key are
/// coalesced into one evaluation of the `init` future. Only one of the calls
/// evaluates its future, and other calls wait for that future to resolve.
///
/// The following code snippet demonstrates this behavior:
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // futures-util = "0.3"
/// // reqwest = "0.11"
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
///
/// // This async function tries to get HTML from the given URI.
/// async fn get_html(task_id: u8, uri: &str) -> Option<String> {
/// println!("get_html() called by task {}.", task_id);
/// reqwest::get(uri).await.ok()?.text().await.ok()
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let cache = Cache::new(100);
///
/// // Spawn four async tasks.
/// let tasks: Vec<_> = (0..4_u8)
/// .map(|task_id| {
/// let my_cache = cache.clone();
/// tokio::spawn(async move {
/// println!("Task {} started.", task_id);
///
/// // Try to insert and get the value for key1. Although
/// // all four async tasks will call `try_get_with`
/// // at the same time, get_html() must be called only once.
/// let value = my_cache
/// .optionally_get_with(
/// "key1",
/// get_html(task_id, "https://www.rust-lang.org"),
/// ).await;
///
/// // Ensure the value exists now.
/// assert!(value.is_some());
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!(
/// "Task {} got the value. (len: {})",
/// task_id,
/// value.unwrap().len()
/// );
/// })
/// })
/// .collect();
///
/// // Run all tasks concurrently and wait for them to complete.
/// futures_util::future::join_all(tasks).await;
/// }
/// ```
///
/// **A Sample Result**
///
/// - `get_html()` was called exactly once by task 2.
/// - Other tasks were blocked until task 2 inserted the value.
///
/// ```console
/// Task 1 started.
/// Task 0 started.
/// Task 2 started.
/// Task 3 started.
/// get_html() called by task 2.
/// Task 2 got the value. (len: 19419)
/// Task 1 got the value. (len: 19419)
/// Task 0 got the value. (len: 19419)
/// Task 3 got the value. (len: 19419)
/// ```
///
/// # Panics
///
/// This method panics when the `init` future has panicked. When it happens, only
/// the caller whose `init` future panicked will get the panic (e.g. only task 2
/// in the above sample). If there are other calls in progress (e.g. task 0, 1
/// and 3 above), this method will restart and resolve one of the remaining
/// `init` futures.
///
pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
where
F: Future<Output = Option<V>>,
{
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_optionally_insert_with_hash_and_fun(key, hash, init)
.await
}
/// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
/// of passing an owned key, you can pass a reference to the key. If the key does
/// not exist in the cache, the key will be cloned to create new entry in the
/// cache.
pub async fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
where
F: Future<Output = Option<V>>,
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
let hash = self.base.hash(key);
self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init)
.await
}
/// Returns a _clone_ of the value corresponding to the key. If the value does
/// not exist, resolves the `init` future, and inserts the value if `Ok(value)`
/// was returned. If `Err(_)` was returned from the future, this method does not
/// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
///
/// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
///
/// # Concurrent calls on the same key
///
/// This method guarantees that concurrent calls on the same not-existing key are
/// coalesced into one evaluation of the `init` future (as long as these
/// futures return the same error type). Only one of the calls evaluates its
/// future, and other calls wait for that future to resolve.
///
/// The following code snippet demonstrates this behavior:
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // futures-util = "0.3"
/// // reqwest = "0.11"
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
///
/// // This async function tries to get HTML from the given URI.
/// async fn get_html(task_id: u8, uri: &str) -> Result<String, reqwest::Error> {
/// println!("get_html() called by task {}.", task_id);
/// reqwest::get(uri).await?.text().await
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let cache = Cache::new(100);
///
/// // Spawn four async tasks.
/// let tasks: Vec<_> = (0..4_u8)
/// .map(|task_id| {
/// let my_cache = cache.clone();
/// tokio::spawn(async move {
/// println!("Task {} started.", task_id);
///
/// // Try to insert and get the value for key1. Although
/// // all four async tasks will call `try_get_with`
/// // at the same time, get_html() must be called only once.
/// let value = my_cache
/// .try_get_with(
/// "key1",
/// get_html(task_id, "https://www.rust-lang.org"),
/// ).await;
///
/// // Ensure the value exists now.
/// assert!(value.is_ok());
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!(
/// "Task {} got the value. (len: {})",
/// task_id,
/// value.unwrap().len()
/// );
/// })
/// })
/// .collect();
///
/// // Run all tasks concurrently and wait for them to complete.
/// futures_util::future::join_all(tasks).await;
/// }
/// ```
///
/// **A Sample Result**
///
/// - `get_html()` was called exactly once by task 2.
/// - Other tasks were blocked until task 2 inserted the value.
///
/// ```console
/// Task 1 started.
/// Task 0 started.
/// Task 2 started.
/// Task 3 started.
/// get_html() called by task 2.
/// Task 2 got the value. (len: 19419)
/// Task 1 got the value. (len: 19419)
/// Task 0 got the value. (len: 19419)
/// Task 3 got the value. (len: 19419)
/// ```
///
/// # Panics
///
/// This method panics when the `init` future has panicked. When it happens, only
/// the caller whose `init` future panicked will get the panic (e.g. only task 2
/// in the above sample). If there are other calls in progress (e.g. task 0, 1
/// and 3 above), this method will restart and resolve one of the remaining
/// `init` futures.
///
pub async fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
{
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.get_or_try_insert_with_hash_and_fun(key, hash, init)
.await
}
/// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
/// owned key, you can pass a reference to the key. If the key does not exist in
/// the cache, the key will be cloned to create new entry in the cache.
pub async fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
let hash = self.base.hash(key);
self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init)
.await
}
/// Inserts a key-value pair into the cache.
///
/// If the cache has this key present, the value is updated.
pub async fn insert(&self, key: K, value: V) {
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.insert_with_hash(key, hash, value).await
}
fn do_blocking_insert(&self, key: K, value: V) {
let hash = self.base.hash(&key);
let key = Arc::new(key);
let (op, now) = self.base.do_insert_with_hash(key, hash, value);
let hk = self.base.housekeeper.as_ref();
Self::blocking_schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.expect("Failed to insert");
}
/// Discards any cached value for the key.
///
/// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
/// on the borrowed form _must_ match those for the key type.
pub async fn invalidate<Q>(&self, key: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = self.base.hash(key);
if let Some(kv) = self.base.remove_entry(key, hash) {
if self.base.is_removal_notifier_enabled() {
self.base.notify_invalidate(&kv.key, &kv.entry)
}
let op = WriteOp::Remove(kv);
let now = self.base.current_time_from_expiration_clock();
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.await
.expect("Failed to remove");
crossbeam_epoch::pin().flush();
}
}
fn do_blocking_invalidate<Q>(&self, key: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = self.base.hash(key);
if let Some(kv) = self.base.remove_entry(key, hash) {
let op = WriteOp::Remove(kv);
let now = self.base.current_time_from_expiration_clock();
let hk = self.base.housekeeper.as_ref();
Self::blocking_schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.expect("Failed to remove");
}
}
/// Discards all cached values.
///
/// This method returns immediately and a background thread will evict all the
/// cached values inserted before the time when this method was called. It is
/// guaranteed that the `get` method must not return these invalidated values
/// even if they have not been evicted.
///
/// Like the `invalidate` method, this method does not clear the historic
/// popularity estimator of keys so that it retains the client activities of
/// trying to retrieve an item.
pub fn invalidate_all(&self) {
self.base.invalidate_all();
}
/// Discards cached values that satisfy a predicate.
///
/// `invalidate_entries_if` takes a closure that returns `true` or `false`. This
/// method returns immediately and a background thread will apply the closure to
/// each cached value inserted before the time when `invalidate_entries_if` was
/// called. If the closure returns `true` on a value, that value will be evicted
/// from the cache.
///
/// Also the `get` method will apply the closure to a value to determine if it
/// should have been invalidated. Therefore, it is guaranteed that the `get`
/// method must not return invalidated values.
///
/// Note that you must call
/// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
/// at the cache creation time as the cache needs to maintain additional internal
/// data structures to support this method. Otherwise, calling this method will
/// fail with a
/// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
///
/// Like the `invalidate` method, this method does not clear the historic
/// popularity estimator of keys so that it retains the client activities of
/// trying to retrieve an item.
///
/// [support-invalidation-closures]: ./struct.CacheBuilder.html#method.support_invalidation_closures
/// [invalidation-disabled-error]: ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
where
F: Fn(&K, &V) -> bool + Send + Sync + 'static,
{
self.base.invalidate_entries_if(Arc::new(predicate))
}
/// Creates an iterator visiting all key-value pairs in arbitrary order. The
/// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
/// value.
///
/// Iterators do not block concurrent reads and writes on the cache. An entry can
/// be inserted to, invalidated or evicted from a cache while iterators are alive
/// on the same cache.
///
/// Unlike the `get` method, visiting entries via an iterator do not update the
/// historic popularity estimator or reset idle timers for keys.
///
/// # Guarantees
///
/// In order to allow concurrent access to the cache, iterator's `next` method
/// does _not_ guarantee the following:
///
/// - It does not guarantee to return a key-value pair (an entry) if its key has
/// been inserted to the cache _after_ the iterator was created.
/// - Such an entry may or may not be returned depending on key's hash and
/// timing.
///
/// and the `next` method guarantees the followings:
///
/// - It guarantees not to return the same entry more than once.
/// - It guarantees not to return an entry if it has been removed from the cache
/// after the iterator was created.
/// - Note: An entry can be removed by following reasons:
/// - Manually invalidated.
/// - Expired (e.g. time-to-live).
/// - Evicted as the cache capacity exceeded.
///
/// # Examples
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
///
/// #[tokio::main]
/// async fn main() {
/// let cache = Cache::new(100);
/// cache.insert("Julia", 14).await;
///
/// let mut iter = cache.iter();
/// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
/// assert_eq!(*k, "Julia");
/// assert_eq!(v, 14);
///
/// assert!(iter.next().is_none());
/// }
/// ```
///
pub fn iter(&self) -> Iter<'_, K, V> {
use crate::sync_base::iter::{Iter as InnerIter, ScanningGet};
let inner = InnerIter::with_single_cache_segment(&self.base, self.base.num_cht_segments());
Iter::new(inner)
}
/// Returns a `BlockingOp` for this cache. It provides blocking
/// [`insert`](./struct.BlockingOp.html#method.insert) and
/// [`invalidate`](struct.BlockingOp.html#method.invalidate) methods, which
/// can be called outside of asynchronous contexts.
pub fn blocking(&self) -> BlockingOp<'_, K, V, S> {
BlockingOp(self)
}
}
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
type Item = (Arc<K>, V);
type IntoIter = Iter<'a, K, V>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<K, V, S> ConcurrentCacheExt<K, V> for Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
fn sync(&self) {
self.base.inner.sync(MAX_SYNC_REPEATS);
}
}
//
// private methods
//
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
async fn get_or_insert_with_hash_and_fun(
&self,
key: Arc<K>,
hash: u64,
init: impl Future<Output = V>,
mut replace_if: Option<impl FnMut(&V) -> bool>,
) -> V {
let maybe_v = self
.base
.get_with_hash_but_ignore_if(&key, hash, replace_if.as_mut());
if let Some(v) = maybe_v {
v
} else {
self.insert_with_hash_and_fun(key, hash, init, replace_if)
.await
}
}
async fn get_or_insert_with_hash_by_ref_and_fun<Q>(
&self,
key: &Q,
hash: u64,
init: impl Future<Output = V>,
mut replace_if: Option<impl FnMut(&V) -> bool>,
) -> V
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
let maybe_v = self
.base
.get_with_hash_but_ignore_if(key, hash, replace_if.as_mut());
if let Some(v) = maybe_v {
v
} else {
let key = Arc::new(key.to_owned());
self.insert_with_hash_and_fun(key, hash, init, replace_if)
.await
}
}
async fn insert_with_hash_and_fun(
&self,
key: Arc<K>,
hash: u64,
init: impl Future<Output = V>,
mut replace_if: Option<impl FnMut(&V) -> bool>,
) -> V {
use futures_util::FutureExt;
let get = || {
self.base
.get_with_hash_but_no_recording(&key, hash, replace_if.as_mut())
};
let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed();
match self
.value_initializer
.init_or_read(Arc::clone(&key), get, init, insert)
.await
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
v
}
InitResult::ReadExisting(v) => v,
InitResult::InitErr(_) => unreachable!(),
}
}
async fn get_or_try_insert_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
init: F,
) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
{
if let Some(v) = self.base.get_with_hash(&key, hash) {
return Ok(v);
}
self.try_insert_with_hash_and_fun(key, hash, init).await
}
async fn get_or_try_insert_with_hash_by_ref_and_fun<F, E, Q>(
&self,
key: &Q,
hash: u64,
init: F,
) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
if let Some(v) = self.base.get_with_hash(key, hash) {
return Ok(v);
}
let key = Arc::new(key.to_owned());
self.try_insert_with_hash_and_fun(key, hash, init).await
}
async fn try_insert_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
init: F,
) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, E>>,
E: Send + Sync + 'static,
{
use futures_util::FutureExt;
let get = || {
let ignore_if = None as Option<&mut fn(&V) -> bool>;
self.base
.get_with_hash_but_no_recording(&key, hash, ignore_if)
};
let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed();
match self
.value_initializer
.try_init_or_read(Arc::clone(&key), get, init, insert)
.await
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Ok(v)
}
InitResult::ReadExisting(v) => Ok(v),
InitResult::InitErr(e) => {
crossbeam_epoch::pin().flush();
Err(e)
}
}
}
async fn get_or_optionally_insert_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
init: F,
) -> Option<V>
where
F: Future<Output = Option<V>>,
{
let res = self.base.get_with_hash(&key, hash);
if res.is_some() {
return res;
}
self.optionally_insert_with_hash_and_fun(key, hash, init)
.await
}
async fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
&self,
key: &Q,
hash: u64,
init: F,
) -> Option<V>
where
F: Future<Output = Option<V>>,
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
let res = self.base.get_with_hash(key, hash);
if res.is_some() {
return res;
}
let key = Arc::new(key.to_owned());
self.optionally_insert_with_hash_and_fun(key, hash, init)
.await
}
async fn optionally_insert_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
init: F,
) -> Option<V>
where
F: Future<Output = Option<V>>,
{
use futures_util::FutureExt;
let get = || {
let ignore_if = None as Option<&mut fn(&V) -> bool>;
self.base
.get_with_hash_but_no_recording(&key, hash, ignore_if)
};
let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed();
match self
.value_initializer
.optionally_init_or_read(Arc::clone(&key), get, init, insert)
.await
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Some(v)
}
InitResult::ReadExisting(v) => Some(v),
InitResult::InitErr(_) => None,
}
}
async fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
let (op, now) = self.base.do_insert_with_hash(key, hash, value);
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.await
.expect("Failed to insert");
}
#[inline]
async fn schedule_write_op(
inner: &impl InnerSync,
ch: &Sender<WriteOp<K, V>>,
op: WriteOp<K, V>,
now: Instant,
housekeeper: Option<&HouseKeeperArc<K, V, S>>,
) -> Result<(), TrySendError<WriteOp<K, V>>> {
let mut op = op;
// TODO: Try to replace the timer with an async event listener to see if it
// can provide better performance.
loop {
BaseCache::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
match ch.try_send(op) {
Ok(()) => break,
Err(TrySendError::Full(op1)) => {
op = op1;
async_io::Timer::after(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS))
.await;
}
Err(e @ TrySendError::Disconnected(_)) => return Err(e),
}
}
Ok(())
}
#[inline]
fn blocking_schedule_write_op(
inner: &impl InnerSync,
ch: &Sender<WriteOp<K, V>>,
op: WriteOp<K, V>,
now: Instant,
housekeeper: Option<&HouseKeeperArc<K, V, S>>,
) -> Result<(), TrySendError<WriteOp<K, V>>> {
let mut op = op;
loop {
BaseCache::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
match ch.try_send(op) {
Ok(()) => break,
Err(TrySendError::Full(op1)) => {
op = op1;
std::thread::sleep(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS));
}
Err(e @ TrySendError::Disconnected(_)) => return Err(e),
}
}
Ok(())
}
}
// For unit tests.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
fn is_table_empty(&self) -> bool {
self.entry_count() == 0
}
fn invalidation_predicate_count(&self) -> usize {
self.base.invalidation_predicate_count()
}
fn reconfigure_for_testing(&mut self) {
self.base.reconfigure_for_testing();
}
fn set_expiration_clock(&self, clock: Option<crate::common::time::Clock>) {
self.base.set_expiration_clock(clock);
}
}
pub struct BlockingOp<'a, K, V, S>(&'a Cache<K, V, S>);
impl<'a, K, V, S> BlockingOp<'a, K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
/// Inserts a key-value pair into the cache. If the cache has this key present,
/// the value is updated.
///
/// This method is intended for use cases where you are inserting from
/// synchronous code.
pub fn insert(&self, key: K, value: V) {
self.0.do_blocking_insert(key, value)
}
/// Discards any cached value for the key.
///
/// This method is intended for use cases where you are invalidating from
/// synchronous code.
///
/// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
/// on the borrowed form _must_ match those for the key type.
pub fn invalidate<Q>(&self, key: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.0.do_blocking_invalidate(key)
}
}
// To see the debug prints, run test as `cargo test -- --nocapture`
#[cfg(test)]
mod tests {
use super::{Cache, ConcurrentCacheExt};
use crate::{common::time::Clock, notification::RemovalCause};
use async_io::Timer;
use parking_lot::Mutex;
use std::{convert::Infallible, sync::Arc, time::Duration};
#[tokio::test]
async fn basic_single_async_task() {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
// We use non-async mutex in the eviction listener (because the listener
// is a regular closure).
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", "alice").await;
cache.insert("b", "bob").await;
assert_eq!(cache.get(&"a"), Some("alice"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.get(&"b"), Some("bob"));
cache.sync();
// counts: a -> 1, b -> 1
cache.insert("c", "cindy").await;
assert_eq!(cache.get(&"c"), Some("cindy"));
assert!(cache.contains_key(&"c"));
// counts: a -> 1, b -> 1, c -> 1
cache.sync();
assert!(cache.contains_key(&"a"));
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(cache.contains_key(&"b"));
cache.sync();
// counts: a -> 2, b -> 2, c -> 1
// "d" should not be admitted because its frequency is too low.
cache.insert("d", "david").await; // count: d -> 0
expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.sync();
assert_eq!(cache.get(&"d"), None); // d -> 1
assert!(!cache.contains_key(&"d"));
cache.insert("d", "david").await;
expected.push((Arc::new("d"), "david", RemovalCause::Size));
cache.sync();
assert!(!cache.contains_key(&"d"));
assert_eq!(cache.get(&"d"), None); // d -> 2
// "d" should be admitted and "c" should be evicted
// because d's frequency is higher than c's.
cache.insert("d", "dennis").await;
expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
cache.sync();
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert_eq!(cache.get(&"c"), None);
assert_eq!(cache.get(&"d"), Some("dennis"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
cache.invalidate(&"b").await;
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
cache.sync();
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"b"));
verify_notification_vec(&cache, actual, &expected);
}
#[test]
fn basic_single_blocking_api() {
let mut cache = Cache::new(3);
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.blocking().insert("a", "alice");
cache.blocking().insert("b", "bob");
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
cache.sync();
// counts: a -> 1, b -> 1
cache.blocking().insert("c", "cindy");
assert_eq!(cache.get(&"c"), Some("cindy"));
// counts: a -> 1, b -> 1, c -> 1
cache.sync();
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
cache.sync();
// counts: a -> 2, b -> 2, c -> 1
// "d" should not be admitted because its frequency is too low.
cache.blocking().insert("d", "david"); // count: d -> 0
cache.sync();
assert_eq!(cache.get(&"d"), None); // d -> 1
cache.blocking().insert("d", "david");
cache.sync();
assert_eq!(cache.get(&"d"), None); // d -> 2
// "d" should be admitted and "c" should be evicted
// because d's frequency is higher than c's.
cache.blocking().insert("d", "dennis");
cache.sync();
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert_eq!(cache.get(&"c"), None);
assert_eq!(cache.get(&"d"), Some("dennis"));
cache.blocking().invalidate(&"b");
assert_eq!(cache.get(&"b"), None);
}
#[tokio::test]
async fn size_aware_eviction() {
let weigher = |_k: &&str, v: &(&str, u32)| v.1;
let alice = ("alice", 10);
let bob = ("bob", 15);
let bill = ("bill", 20);
let cindy = ("cindy", 5);
let david = ("david", 15);
let dennis = ("dennis", 15);
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(31)
.weigher(weigher)
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", alice).await;
cache.insert("b", bob).await;
assert_eq!(cache.get(&"a"), Some(alice));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.get(&"b"), Some(bob));
cache.sync();
// order (LRU -> MRU) and counts: a -> 1, b -> 1
cache.insert("c", cindy).await;
assert_eq!(cache.get(&"c"), Some(cindy));
assert!(cache.contains_key(&"c"));
// order and counts: a -> 1, b -> 1, c -> 1
cache.sync();
assert!(cache.contains_key(&"a"));
assert_eq!(cache.get(&"a"), Some(alice));
assert_eq!(cache.get(&"b"), Some(bob));
assert!(cache.contains_key(&"b"));
cache.sync();
// order and counts: c -> 1, a -> 2, b -> 2
// To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
// "d" must have higher count than 3, which is the aggregated count
// of "a" and "c".
cache.insert("d", david).await; // count: d -> 0
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_eq!(cache.get(&"d"), None); // d -> 1
assert!(!cache.contains_key(&"d"));
cache.insert("d", david).await;
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert!(!cache.contains_key(&"d"));
assert_eq!(cache.get(&"d"), None); // d -> 2
cache.insert("d", david).await;
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert_eq!(cache.get(&"d"), None); // d -> 3
assert!(!cache.contains_key(&"d"));
cache.insert("d", david).await;
expected.push((Arc::new("d"), david, RemovalCause::Size));
cache.sync();
assert!(!cache.contains_key(&"d"));
assert_eq!(cache.get(&"d"), None); // d -> 4
// Finally "d" should be admitted by evicting "c" and "a".
cache.insert("d", dennis).await;
expected.push((Arc::new("c"), cindy, RemovalCause::Size));
expected.push((Arc::new("a"), alice, RemovalCause::Size));
cache.sync();
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), Some(bob));
assert_eq!(cache.get(&"c"), None);
assert_eq!(cache.get(&"d"), Some(dennis));
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
// Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
cache.insert("b", bill).await;
expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
expected.push((Arc::new("d"), dennis, RemovalCause::Size));
cache.sync();
assert_eq!(cache.get(&"b"), Some(bill));
assert_eq!(cache.get(&"d"), None);
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"d"));
// Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
cache.insert("a", alice).await;
cache.insert("b", bob).await;
expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
cache.sync();
assert_eq!(cache.get(&"a"), Some(alice));
assert_eq!(cache.get(&"b"), Some(bob));
assert_eq!(cache.get(&"d"), None);
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"d"));
// Verify the sizes.
assert_eq!(cache.entry_count(), 2);
assert_eq!(cache.weighted_size(), 25);
verify_notification_vec(&cache, actual, &expected);
}
#[tokio::test]
async fn basic_multi_async_tasks() {
let num_tasks = 4;
let cache = Cache::new(100);
let tasks = (0..num_tasks)
.map(|id| {
let cache = cache.clone();
if id == 0 {
tokio::spawn(async move {
cache.blocking().insert(10, format!("{}-100", id));
cache.get(&10);
cache.blocking().insert(20, format!("{}-200", id));
cache.blocking().invalidate(&10);
})
} else {
tokio::spawn(async move {
cache.insert(10, format!("{}-100", id)).await;
cache.get(&10);
cache.insert(20, format!("{}-200", id)).await;
cache.invalidate(&10).await;
})
}
})
.collect::<Vec<_>>();
let _ = futures_util::future::join_all(tasks).await;
assert!(cache.get(&10).is_none());
assert!(cache.get(&20).is_some());
assert!(!cache.contains_key(&10));
assert!(cache.contains_key(&20));
}
#[tokio::test]
async fn invalidate_all() {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(100)
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", "alice").await;
cache.insert("b", "bob").await;
cache.insert("c", "cindy").await;
assert_eq!(cache.get(&"a"), Some("alice"));
assert_eq!(cache.get(&"b"), Some("bob"));
assert_eq!(cache.get(&"c"), Some("cindy"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(cache.contains_key(&"c"));
// `cache.sync()` is no longer needed here before invalidating. The last
// modified timestamp of the entries were updated when they were inserted.
// https://github.com/moka-rs/moka/issues/155
cache.invalidate_all();
expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
cache.sync();
cache.insert("d", "david").await;
cache.sync();
assert!(cache.get(&"a").is_none());
assert!(cache.get(&"b").is_none());
assert!(cache.get(&"c").is_none());
assert_eq!(cache.get(&"d"), Some("david"));
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
verify_notification_vec(&cache, actual, &expected);
}
// This test is for https://github.com/moka-rs/moka/issues/155
#[tokio::test]
async fn invalidate_all_without_sync() {
let cache = Cache::new(1024);
assert_eq!(cache.get(&0), None);
cache.insert(0, 1).await;
assert_eq!(cache.get(&0), Some(1));
cache.invalidate_all();
assert_eq!(cache.get(&0), None);
}
#[tokio::test]
async fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
use std::collections::HashSet;
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(100)
.support_invalidation_closures()
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Make the cache exterior immutable.
let cache = cache;
cache.insert(0, "alice").await;
cache.insert(1, "bob").await;
cache.insert(2, "alex").await;
cache.sync();
mock.increment(Duration::from_secs(5)); // 5 secs from the start.
cache.sync();
assert_eq!(cache.get(&0), Some("alice"));
assert_eq!(cache.get(&1), Some("bob"));
assert_eq!(cache.get(&2), Some("alex"));
assert!(cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
assert_eq!(cache.invalidation_predicate_count(), 1);
expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
mock.increment(Duration::from_secs(5)); // 10 secs from the start.
cache.insert(3, "alice").await;
// Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
cache.sync(); // To submit the invalidation task.
std::thread::sleep(Duration::from_millis(200));
cache.sync(); // To process the task result.
std::thread::sleep(Duration::from_millis(200));
assert!(cache.get(&0).is_none());
assert!(cache.get(&2).is_none());
assert_eq!(cache.get(&1), Some("bob"));
// This should survive as it was inserted after calling invalidate_entries_if.
assert_eq!(cache.get(&3), Some("alice"));
assert!(!cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(!cache.contains_key(&2));
assert!(cache.contains_key(&3));
assert_eq!(cache.entry_count(), 2);
assert_eq!(cache.invalidation_predicate_count(), 0);
mock.increment(Duration::from_secs(5)); // 15 secs from the start.
cache.invalidate_entries_if(|_k, &v| v == "alice")?;
cache.invalidate_entries_if(|_k, &v| v == "bob")?;
assert_eq!(cache.invalidation_predicate_count(), 2);
// key 1 was inserted before key 3.
expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
// Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
cache.sync(); // To submit the invalidation task.
std::thread::sleep(Duration::from_millis(200));
cache.sync(); // To process the task result.
std::thread::sleep(Duration::from_millis(200));
assert!(cache.get(&1).is_none());
assert!(cache.get(&3).is_none());
assert!(!cache.contains_key(&1));
assert!(!cache.contains_key(&3));
assert_eq!(cache.entry_count(), 0);
assert_eq!(cache.invalidation_predicate_count(), 0);
verify_notification_vec(&cache, actual, &expected);
Ok(())
}
#[tokio::test]
async fn time_to_live() {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_live(Duration::from_secs(10))
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", "alice").await;
cache.sync();
mock.increment(Duration::from_secs(5)); // 5 secs from the start.
cache.sync();
assert_eq!(cache.get(&"a"), Some("alice"));
assert!(cache.contains_key(&"a"));
mock.increment(Duration::from_secs(5)); // 10 secs.
expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert!(!cache.contains_key(&"a"));
assert_eq!(cache.iter().count(), 0);
cache.sync();
assert!(cache.is_table_empty());
cache.insert("b", "bob").await;
cache.sync();
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); // 15 secs.
cache.sync();
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
cache.insert("b", "bill").await;
expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
cache.sync();
mock.increment(Duration::from_secs(5)); // 20 secs
cache.sync();
assert_eq!(cache.get(&"b"), Some("bill"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(5)); // 25 secs
expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.sync();
assert!(cache.is_table_empty());
verify_notification_vec(&cache, actual, &expected);
}
#[tokio::test]
async fn time_to_idle() {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Make the cache exterior immutable.
let cache = cache;
cache.insert("a", "alice").await;
cache.sync();
mock.increment(Duration::from_secs(5)); // 5 secs from the start.
cache.sync();
assert_eq!(cache.get(&"a"), Some("alice"));
mock.increment(Duration::from_secs(5)); // 10 secs.
cache.sync();
cache.insert("b", "bob").await;
cache.sync();
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(2)); // 12 secs.
cache.sync();
// contains_key does not reset the idle timer for the key.
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
cache.sync();
assert_eq!(cache.entry_count(), 2);
mock.increment(Duration::from_secs(3)); // 15 secs.
expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), Some("bob"));
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 1);
cache.sync();
assert_eq!(cache.entry_count(), 1);
mock.increment(Duration::from_secs(10)); // 25 secs
expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
assert_eq!(cache.get(&"a"), None);
assert_eq!(cache.get(&"b"), None);
assert!(!cache.contains_key(&"a"));
assert!(!cache.contains_key(&"b"));
assert_eq!(cache.iter().count(), 0);
cache.sync();
assert!(cache.is_table_empty());
verify_notification_vec(&cache, actual, &expected);
}
#[tokio::test]
async fn test_iter() {
const NUM_KEYS: usize = 50;
fn make_value(key: usize) -> String {
format!("val: {}", key)
}
let cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.build();
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key)).await;
}
let mut key_set = std::collections::HashSet::new();
for (key, value) in &cache {
assert_eq!(value, make_value(*key));
key_set.insert(*key);
}
// Ensure there are no missing or duplicate keys in the iteration.
assert_eq!(key_set.len(), NUM_KEYS);
}
/// Runs 16 async tasks at the same time and ensures no deadlock occurs.
///
/// - Eight of the task will update key-values in the cache.
/// - Eight others will iterate the cache.
///
#[tokio::test]
async fn test_iter_multi_async_tasks() {
use std::collections::HashSet;
const NUM_KEYS: usize = 1024;
const NUM_TASKS: usize = 16;
fn make_value(key: usize) -> String {
format!("val: {}", key)
}
let cache = Cache::builder()
.max_capacity(2048)
.time_to_idle(Duration::from_secs(10))
.build();
// Initialize the cache.
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key)).await;
}
let rw_lock = Arc::new(tokio::sync::RwLock::<()>::default());
let write_lock = rw_lock.write().await;
let tasks = (0..NUM_TASKS)
.map(|n| {
let cache = cache.clone();
let rw_lock = Arc::clone(&rw_lock);
if n % 2 == 0 {
// This thread will update the cache.
tokio::spawn(async move {
let read_lock = rw_lock.read().await;
for key in 0..NUM_KEYS {
// TODO: Update keys in a random order?
cache.insert(key, make_value(key)).await;
}
std::mem::drop(read_lock);
})
} else {
// This thread will iterate the cache.
tokio::spawn(async move {
let read_lock = rw_lock.read().await;
let mut key_set = HashSet::new();
// let mut key_count = 0usize;
for (key, value) in &cache {
assert_eq!(value, make_value(*key));
key_set.insert(*key);
// key_count += 1;
}
// Ensure there are no missing or duplicate keys in the iteration.
assert_eq!(key_set.len(), NUM_KEYS);
std::mem::drop(read_lock);
})
}
})
.collect::<Vec<_>>();
// Let these threads to run by releasing the write lock.
std::mem::drop(write_lock);
let _ = futures_util::future::join_all(tasks).await;
// Ensure there are no missing or duplicate keys in the iteration.
let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
assert_eq!(key_set.len(), NUM_KEYS);
}
#[tokio::test]
async fn get_with() {
let cache = Cache::new(100);
const KEY: u32 = 0;
// This test will run five async tasks:
//
// Task1 will be the first task to call `get_with` for a key, so its async
// block will be evaluated and then a &str value "task1" will be inserted to
// the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `get_with` immediately.
let v = cache1
.get_with(KEY, async {
// Wait for 300 ms and return a &str value.
Timer::after(Duration::from_millis(300)).await;
"task1"
})
.await;
assert_eq!(v, "task1");
}
};
// Task2 will be the second task to call `get_with` for the same key, so its
// async block will not be evaluated. Once task1's async block finishes, it
// will get the value inserted by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `get_with`.
Timer::after(Duration::from_millis(100)).await;
let v = cache2.get_with(KEY, async { unreachable!() }).await;
assert_eq!(v, "task1");
}
};
// Task3 will be the third task to call `get_with` for the same key. By the
// time it calls, task1's async block should have finished already and the
// value should be already inserted to the cache. So its async block will not
// be evaluated and will get the value inserted by task1's async block
// immediately.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 400 ms before calling `get_with`.
Timer::after(Duration::from_millis(400)).await;
let v = cache3.get_with(KEY, async { unreachable!() }).await;
assert_eq!(v, "task1");
}
};
// Task4 will call `get` for the same key. It will call when task1's async
// block is still running, so it will get none for the key.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache4.get(&KEY);
assert!(maybe_v.is_none());
}
};
// Task5 will call `get` for the same key. It will call after task1's async
// block finished, so it will get the value insert by task1's async block.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 400 ms before calling `get`.
Timer::after(Duration::from_millis(400)).await;
let maybe_v = cache5.get(&KEY);
assert_eq!(maybe_v, Some("task1"));
}
};
futures_util::join!(task1, task2, task3, task4, task5);
}
#[tokio::test]
async fn get_with_by_ref() {
let cache = Cache::new(100);
const KEY: &u32 = &0;
// This test will run five async tasks:
//
// Task1 will be the first task to call `get_with_by_ref` for a key, so its async
// block will be evaluated and then a &str value "task1" will be inserted to
// the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `get_with_by_ref` immediately.
let v = cache1
.get_with_by_ref(KEY, async {
// Wait for 300 ms and return a &str value.
Timer::after(Duration::from_millis(300)).await;
"task1"
})
.await;
assert_eq!(v, "task1");
}
};
// Task2 will be the second task to call `get_with_by_ref` for the same key, so its
// async block will not be evaluated. Once task1's async block finishes, it
// will get the value inserted by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `get_with_by_ref`.
Timer::after(Duration::from_millis(100)).await;
let v = cache2.get_with_by_ref(KEY, async { unreachable!() }).await;
assert_eq!(v, "task1");
}
};
// Task3 will be the third task to call `get_with_by_ref` for the same key. By the
// time it calls, task1's async block should have finished already and the
// value should be already inserted to the cache. So its async block will not
// be evaluated and will get the value inserted by task1's async block
// immediately.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 400 ms before calling `get_with_by_ref`.
Timer::after(Duration::from_millis(400)).await;
let v = cache3.get_with_by_ref(KEY, async { unreachable!() }).await;
assert_eq!(v, "task1");
}
};
// Task4 will call `get` for the same key. It will call when task1's async
// block is still running, so it will get none for the key.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache4.get(KEY);
assert!(maybe_v.is_none());
}
};
// Task5 will call `get` for the same key. It will call after task1's async
// block finished, so it will get the value insert by task1's async block.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 400 ms before calling `get`.
Timer::after(Duration::from_millis(400)).await;
let maybe_v = cache5.get(KEY);
assert_eq!(maybe_v, Some("task1"));
}
};
futures_util::join!(task1, task2, task3, task4, task5);
}
#[tokio::test]
async fn get_with_if() {
let cache = Cache::new(100);
const KEY: u32 = 0;
// This test will run seven async tasks:
//
// Task1 will be the first task to call `get_with_if` for a key, so its async
// block will be evaluated and then a &str value "task1" will be inserted to
// the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `get_with_if` immediately.
let v = cache1
.get_with_if(
KEY,
async {
// Wait for 300 ms and return a &str value.
Timer::after(Duration::from_millis(300)).await;
"task1"
},
|_v| unreachable!(),
)
.await;
assert_eq!(v, "task1");
}
};
// Task2 will be the second task to call `get_with_if` for the same key, so
// its async block will not be evaluated. Once task1's async block finishes,
// it will get the value inserted by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `get_with_if`.
Timer::after(Duration::from_millis(100)).await;
let v = cache2
.get_with_if(KEY, async { unreachable!() }, |_v| unreachable!())
.await;
assert_eq!(v, "task1");
}
};
// Task3 will be the third task to call `get_with_if` for the same key. By
// the time it calls, task1's async block should have finished already and
// the value should be already inserted to the cache. Also task3's
// `replace_if` closure returns `false`. So its async block will not be
// evaluated and will get the value inserted by task1's async block
// immediately.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 350 ms before calling `get_with_if`.
Timer::after(Duration::from_millis(350)).await;
let v = cache3
.get_with_if(KEY, async { unreachable!() }, |v| {
assert_eq!(v, &"task1");
false
})
.await;
assert_eq!(v, "task1");
}
};
// Task4 will be the fourth task to call `get_with_if` for the same key. The
// value should have been already inserted to the cache by task1. However
// task4's `replace_if` closure returns `true`. So its async block will be
// evaluated to replace the current value.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 400 ms before calling `get_with_if`.
Timer::after(Duration::from_millis(400)).await;
let v = cache4
.get_with_if(KEY, async { "task4" }, |v| {
assert_eq!(v, &"task1");
true
})
.await;
assert_eq!(v, "task4");
}
};
// Task5 will call `get` for the same key. It will call when task1's async
// block is still running, so it will get none for the key.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache5.get(&KEY);
assert!(maybe_v.is_none());
}
};
// Task6 will call `get` for the same key. It will call after task1's async
// block finished, so it will get the value insert by task1's async block.
let task6 = {
let cache6 = cache.clone();
async move {
// Wait for 350 ms before calling `get`.
Timer::after(Duration::from_millis(350)).await;
let maybe_v = cache6.get(&KEY);
assert_eq!(maybe_v, Some("task1"));
}
};
// Task7 will call `get` for the same key. It will call after task4's async
// block finished, so it will get the value insert by task4's async block.
let task7 = {
let cache7 = cache.clone();
async move {
// Wait for 450 ms before calling `get`.
Timer::after(Duration::from_millis(450)).await;
let maybe_v = cache7.get(&KEY);
assert_eq!(maybe_v, Some("task4"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
}
// #[tokio::test]
// async fn get_with_if_by_ref() {
// let cache = Cache::new(100);
// const KEY: &u32 = &0;
// // This test will run seven async tasks:
// //
// // Task1 will be the first task to call `get_with_if_by_ref` for a key, so its async
// // block will be evaluated and then a &str value "task1" will be inserted to
// // the cache.
// let task1 = {
// let cache1 = cache.clone();
// async move {
// // Call `get_with_if_by_ref` immediately.
// let v = cache1
// .get_with_if_by_ref(
// KEY,
// async {
// // Wait for 300 ms and return a &str value.
// Timer::after(Duration::from_millis(300)).await;
// "task1"
// },
// |_v| unreachable!(),
// )
// .await;
// assert_eq!(v, "task1");
// }
// };
// // Task2 will be the second task to call `get_with_if_by_ref` for the same key, so
// // its async block will not be evaluated. Once task1's async block finishes,
// // it will get the value inserted by task1's async block.
// let task2 = {
// let cache2 = cache.clone();
// async move {
// // Wait for 100 ms before calling `get_with_if_by_ref`.
// Timer::after(Duration::from_millis(100)).await;
// let v = cache2
// .get_with_if_by_ref(KEY, async { unreachable!() }, |_v| unreachable!())
// .await;
// assert_eq!(v, "task1");
// }
// };
// // Task3 will be the third task to call `get_with_if_by_ref` for the same key. By
// // the time it calls, task1's async block should have finished already and
// // the value should be already inserted to the cache. Also task3's
// // `replace_if` closure returns `false`. So its async block will not be
// // evaluated and will get the value inserted by task1's async block
// // immediately.
// let task3 = {
// let cache3 = cache.clone();
// async move {
// // Wait for 350 ms before calling `get_with_if_by_ref`.
// Timer::after(Duration::from_millis(350)).await;
// let v = cache3
// .get_with_if_by_ref(KEY, async { unreachable!() }, |v| {
// assert_eq!(v, &"task1");
// false
// })
// .await;
// assert_eq!(v, "task1");
// }
// };
// // Task4 will be the fourth task to call `get_with_if_by_ref` for the same key. The
// // value should have been already inserted to the cache by task1. However
// // task4's `replace_if` closure returns `true`. So its async block will be
// // evaluated to replace the current value.
// let task4 = {
// let cache4 = cache.clone();
// async move {
// // Wait for 400 ms before calling `get_with_if_by_ref`.
// Timer::after(Duration::from_millis(400)).await;
// let v = cache4
// .get_with_if_by_ref(KEY, async { "task4" }, |v| {
// assert_eq!(v, &"task1");
// true
// })
// .await;
// assert_eq!(v, "task4");
// }
// };
// // Task5 will call `get` for the same key. It will call when task1's async
// // block is still running, so it will get none for the key.
// let task5 = {
// let cache5 = cache.clone();
// async move {
// // Wait for 200 ms before calling `get`.
// Timer::after(Duration::from_millis(200)).await;
// let maybe_v = cache5.get(KEY);
// assert!(maybe_v.is_none());
// }
// };
// // Task6 will call `get` for the same key. It will call after task1's async
// // block finished, so it will get the value insert by task1's async block.
// let task6 = {
// let cache6 = cache.clone();
// async move {
// // Wait for 350 ms before calling `get`.
// Timer::after(Duration::from_millis(350)).await;
// let maybe_v = cache6.get(KEY);
// assert_eq!(maybe_v, Some("task1"));
// }
// };
// // Task7 will call `get` for the same key. It will call after task4's async
// // block finished, so it will get the value insert by task4's async block.
// let task7 = {
// let cache7 = cache.clone();
// async move {
// // Wait for 450 ms before calling `get`.
// Timer::after(Duration::from_millis(450)).await;
// let maybe_v = cache7.get(KEY);
// assert_eq!(maybe_v, Some("task4"));
// }
// };
// futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
// }
#[tokio::test]
async fn try_get_with() {
use std::sync::Arc;
// Note that MyError does not implement std::error::Error trait
// like anyhow::Error.
#[derive(Debug)]
pub struct MyError(String);
type MyResult<T> = Result<T, Arc<MyError>>;
let cache = Cache::new(100);
const KEY: u32 = 0;
// This test will run eight async tasks:
//
// Task1 will be the first task to call `get_with` for a key, so its async
// block will be evaluated and then an error will be returned. Nothing will
// be inserted to the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `try_get_with` immediately.
let v = cache1
.try_get_with(KEY, async {
// Wait for 300 ms and return an error.
Timer::after(Duration::from_millis(300)).await;
Err(MyError("task1 error".into()))
})
.await;
assert!(v.is_err());
}
};
// Task2 will be the second task to call `get_with` for the same key, so its
// async block will not be evaluated. Once task1's async block finishes, it
// will get the same error value returned by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `try_get_with`.
Timer::after(Duration::from_millis(100)).await;
let v: MyResult<_> = cache2.try_get_with(KEY, async { unreachable!() }).await;
assert!(v.is_err());
}
};
// Task3 will be the third task to call `get_with` for the same key. By the
// time it calls, task1's async block should have finished already, but the
// key still does not exist in the cache. So its async block will be
// evaluated and then an okay &str value will be returned. That value will be
// inserted to the cache.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 400 ms before calling `try_get_with`.
Timer::after(Duration::from_millis(400)).await;
let v: MyResult<_> = cache3
.try_get_with(KEY, async {
// Wait for 300 ms and return an Ok(&str) value.
Timer::after(Duration::from_millis(300)).await;
Ok("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task4 will be the fourth task to call `get_with` for the same key. So its
// async block will not be evaluated. Once task3's async block finishes, it
// will get the same okay &str value.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 500 ms before calling `try_get_with`.
Timer::after(Duration::from_millis(500)).await;
let v: MyResult<_> = cache4.try_get_with(KEY, async { unreachable!() }).await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task5 will be the fifth task to call `get_with` for the same key. So its
// async block will not be evaluated. By the time it calls, task3's async
// block should have finished already, so its async block will not be
// evaluated and will get the value insert by task3's async block
// immediately.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 800 ms before calling `try_get_with`.
Timer::after(Duration::from_millis(800)).await;
let v: MyResult<_> = cache5.try_get_with(KEY, async { unreachable!() }).await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task6 will call `get` for the same key. It will call when task1's async
// block is still running, so it will get none for the key.
let task6 = {
let cache6 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache6.get(&KEY);
assert!(maybe_v.is_none());
}
};
// Task7 will call `get` for the same key. It will call after task1's async
// block finished with an error. So it will get none for the key.
let task7 = {
let cache7 = cache.clone();
async move {
// Wait for 400 ms before calling `get`.
Timer::after(Duration::from_millis(400)).await;
let maybe_v = cache7.get(&KEY);
assert!(maybe_v.is_none());
}
};
// Task8 will call `get` for the same key. It will call after task3's async
// block finished, so it will get the value insert by task3's async block.
let task8 = {
let cache8 = cache.clone();
async move {
// Wait for 800 ms before calling `get`.
Timer::after(Duration::from_millis(800)).await;
let maybe_v = cache8.get(&KEY);
assert_eq!(maybe_v, Some("task3"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
}
#[tokio::test]
async fn try_get_with_by_ref() {
use std::sync::Arc;
// Note that MyError does not implement std::error::Error trait
// like anyhow::Error.
#[derive(Debug)]
pub struct MyError(String);
type MyResult<T> = Result<T, Arc<MyError>>;
let cache = Cache::new(100);
const KEY: &u32 = &0;
// This test will run eight async tasks:
//
// Task1 will be the first task to call `try_get_with_by_ref` for a key, so its async
// block will be evaluated and then an error will be returned. Nothing will
// be inserted to the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `try_get_with_by_ref` immediately.
let v = cache1
.try_get_with_by_ref(KEY, async {
// Wait for 300 ms and return an error.
Timer::after(Duration::from_millis(300)).await;
Err(MyError("task1 error".into()))
})
.await;
assert!(v.is_err());
}
};
// Task2 will be the second task to call `get_with` for the same key, so its
// async block will not be evaluated. Once task1's async block finishes, it
// will get the same error value returned by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `try_get_with_by_ref`.
Timer::after(Duration::from_millis(100)).await;
let v: MyResult<_> = cache2
.try_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert!(v.is_err());
}
};
// Task3 will be the third task to call `get_with` for the same key. By the
// time it calls, task1's async block should have finished already, but the
// key still does not exist in the cache. So its async block will be
// evaluated and then an okay &str value will be returned. That value will be
// inserted to the cache.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 400 ms before calling `try_get_with_by_ref`.
Timer::after(Duration::from_millis(400)).await;
let v: MyResult<_> = cache3
.try_get_with_by_ref(KEY, async {
// Wait for 300 ms and return an Ok(&str) value.
Timer::after(Duration::from_millis(300)).await;
Ok("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task4 will be the fourth task to call `get_with` for the same key. So its
// async block will not be evaluated. Once task3's async block finishes, it
// will get the same okay &str value.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 500 ms before calling `try_get_with_by_ref`.
Timer::after(Duration::from_millis(500)).await;
let v: MyResult<_> = cache4
.try_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task5 will be the fifth task to call `get_with` for the same key. So its
// async block will not be evaluated. By the time it calls, task3's async
// block should have finished already, so its async block will not be
// evaluated and will get the value insert by task3's async block
// immediately.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 800 ms before calling `try_get_with_by_ref`.
Timer::after(Duration::from_millis(800)).await;
let v: MyResult<_> = cache5
.try_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task6 will call `get` for the same key. It will call when task1's async
// block is still running, so it will get none for the key.
let task6 = {
let cache6 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache6.get(KEY);
assert!(maybe_v.is_none());
}
};
// Task7 will call `get` for the same key. It will call after task1's async
// block finished with an error. So it will get none for the key.
let task7 = {
let cache7 = cache.clone();
async move {
// Wait for 400 ms before calling `get`.
Timer::after(Duration::from_millis(400)).await;
let maybe_v = cache7.get(KEY);
assert!(maybe_v.is_none());
}
};
// Task8 will call `get` for the same key. It will call after task3's async
// block finished, so it will get the value insert by task3's async block.
let task8 = {
let cache8 = cache.clone();
async move {
// Wait for 800 ms before calling `get`.
Timer::after(Duration::from_millis(800)).await;
let maybe_v = cache8.get(KEY);
assert_eq!(maybe_v, Some("task3"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
}
#[tokio::test]
async fn optionally_get_with() {
let cache = Cache::new(100);
const KEY: u32 = 0;
// This test will run eight async tasks:
//
// Task1 will be the first task to call `optionally_get_with` for a key,
// so its async block will be evaluated and then an None will be
// returned. Nothing will be inserted to the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `try_get_with` immediately.
let v = cache1
.optionally_get_with(KEY, async {
// Wait for 300 ms and return an None.
Timer::after(Duration::from_millis(300)).await;
None
})
.await;
assert!(v.is_none());
}
};
// Task2 will be the second task to call `optionally_get_with` for the same key, so its
// async block will not be evaluated. Once task1's async block finishes, it
// will get the same error value returned by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `optionally_get_with`.
Timer::after(Duration::from_millis(100)).await;
let v = cache2
.optionally_get_with(KEY, async { unreachable!() })
.await;
assert!(v.is_none());
}
};
// Task3 will be the third task to call `optionally_get_with` for the
// same key. By the time it calls, task1's async block should have
// finished already, but the key still does not exist in the cache. So
// its async block will be evaluated and then an okay &str value will be
// returned. That value will be inserted to the cache.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 400 ms before calling `optionally_get_with`.
Timer::after(Duration::from_millis(400)).await;
let v = cache3
.optionally_get_with(KEY, async {
// Wait for 300 ms and return an Some(&str) value.
Timer::after(Duration::from_millis(300)).await;
Some("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task4 will be the fourth task to call `optionally_get_with` for the
// same key. So its async block will not be evaluated. Once task3's
// async block finishes, it will get the same okay &str value.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 500 ms before calling `try_get_with`.
Timer::after(Duration::from_millis(500)).await;
let v = cache4
.optionally_get_with(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task5 will be the fifth task to call `optionally_get_with` for the
// same key. So its async block will not be evaluated. By the time it
// calls, task3's async block should have finished already, so its async
// block will not be evaluated and will get the value insert by task3's
// async block immediately.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 800 ms before calling `optionally_get_with`.
Timer::after(Duration::from_millis(800)).await;
let v = cache5
.optionally_get_with(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task6 will call `get` for the same key. It will call when task1's async
// block is still running, so it will get none for the key.
let task6 = {
let cache6 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache6.get(&KEY);
assert!(maybe_v.is_none());
}
};
// Task7 will call `get` for the same key. It will call after task1's async
// block finished with an error. So it will get none for the key.
let task7 = {
let cache7 = cache.clone();
async move {
// Wait for 400 ms before calling `get`.
Timer::after(Duration::from_millis(400)).await;
let maybe_v = cache7.get(&KEY);
assert!(maybe_v.is_none());
}
};
// Task8 will call `get` for the same key. It will call after task3's async
// block finished, so it will get the value insert by task3's async block.
let task8 = {
let cache8 = cache.clone();
async move {
// Wait for 800 ms before calling `get`.
Timer::after(Duration::from_millis(800)).await;
let maybe_v = cache8.get(&KEY);
assert_eq!(maybe_v, Some("task3"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
}
#[tokio::test]
async fn optionally_get_with_by_ref() {
let cache = Cache::new(100);
const KEY: &u32 = &0;
// This test will run eight async tasks:
//
// Task1 will be the first task to call `optionally_get_with_by_ref` for a key,
// so its async block will be evaluated and then an None will be
// returned. Nothing will be inserted to the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `try_get_with` immediately.
let v = cache1
.optionally_get_with_by_ref(KEY, async {
// Wait for 300 ms and return an None.
Timer::after(Duration::from_millis(300)).await;
None
})
.await;
assert!(v.is_none());
}
};
// Task2 will be the second task to call `optionally_get_with_by_ref` for the same key, so its
// async block will not be evaluated. Once task1's async block finishes, it
// will get the same error value returned by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `optionally_get_with_by_ref`.
Timer::after(Duration::from_millis(100)).await;
let v = cache2
.optionally_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert!(v.is_none());
}
};
// Task3 will be the third task to call `optionally_get_with_by_ref` for the
// same key. By the time it calls, task1's async block should have
// finished already, but the key still does not exist in the cache. So
// its async block will be evaluated and then an okay &str value will be
// returned. That value will be inserted to the cache.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 400 ms before calling `optionally_get_with_by_ref`.
Timer::after(Duration::from_millis(400)).await;
let v = cache3
.optionally_get_with_by_ref(KEY, async {
// Wait for 300 ms and return an Some(&str) value.
Timer::after(Duration::from_millis(300)).await;
Some("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task4 will be the fourth task to call `optionally_get_with_by_ref` for the
// same key. So its async block will not be evaluated. Once task3's
// async block finishes, it will get the same okay &str value.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 500 ms before calling `try_get_with`.
Timer::after(Duration::from_millis(500)).await;
let v = cache4
.optionally_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task5 will be the fifth task to call `optionally_get_with_by_ref` for the
// same key. So its async block will not be evaluated. By the time it
// calls, task3's async block should have finished already, so its async
// block will not be evaluated and will get the value insert by task3's
// async block immediately.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 800 ms before calling `optionally_get_with_by_ref`.
Timer::after(Duration::from_millis(800)).await;
let v = cache5
.optionally_get_with_by_ref(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};
// Task6 will call `get` for the same key. It will call when task1's async
// block is still running, so it will get none for the key.
let task6 = {
let cache6 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache6.get(KEY);
assert!(maybe_v.is_none());
}
};
// Task7 will call `get` for the same key. It will call after task1's async
// block finished with an error. So it will get none for the key.
let task7 = {
let cache7 = cache.clone();
async move {
// Wait for 400 ms before calling `get`.
Timer::after(Duration::from_millis(400)).await;
let maybe_v = cache7.get(KEY);
assert!(maybe_v.is_none());
}
};
// Task8 will call `get` for the same key. It will call after task3's async
// block finished, so it will get the value insert by task3's async block.
let task8 = {
let cache8 = cache.clone();
async move {
// Wait for 800 ms before calling `get`.
Timer::after(Duration::from_millis(800)).await;
let maybe_v = cache8.get(KEY);
assert_eq!(maybe_v, Some("task3"));
}
};
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
}
#[tokio::test]
// https://github.com/moka-rs/moka/issues/43
async fn handle_panic_in_get_with() {
use tokio::time::{sleep, Duration};
let cache = Cache::new(16);
let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
{
let cache_ref = cache.clone();
let semaphore_ref = semaphore.clone();
tokio::task::spawn(async move {
let _ = cache_ref
.get_with(1, async move {
semaphore_ref.add_permits(1);
sleep(Duration::from_millis(50)).await;
panic!("Panic during try_get_with");
})
.await;
});
}
let _ = semaphore.acquire().await.expect("semaphore acquire failed");
assert_eq!(cache.get_with(1, async { 5 }).await, 5);
}
#[tokio::test]
// https://github.com/moka-rs/moka/issues/43
async fn handle_panic_in_try_get_with() {
use tokio::time::{sleep, Duration};
let cache = Cache::new(16);
let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
{
let cache_ref = cache.clone();
let semaphore_ref = semaphore.clone();
tokio::task::spawn(async move {
let _ = cache_ref
.try_get_with(1, async move {
semaphore_ref.add_permits(1);
sleep(Duration::from_millis(50)).await;
panic!("Panic during try_get_with");
})
.await as Result<_, Arc<Infallible>>;
});
}
let _ = semaphore.acquire().await.expect("semaphore acquire failed");
assert_eq!(
cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
Ok(5)
);
}
#[tokio::test]
// https://github.com/moka-rs/moka/issues/59
async fn abort_get_with() {
use tokio::time::{sleep, Duration};
let cache = Cache::new(16);
let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
let handle;
{
let cache_ref = cache.clone();
let semaphore_ref = semaphore.clone();
handle = tokio::task::spawn(async move {
let _ = cache_ref
.get_with(1, async move {
semaphore_ref.add_permits(1);
sleep(Duration::from_millis(50)).await;
unreachable!();
})
.await;
});
}
let _ = semaphore.acquire().await.expect("semaphore acquire failed");
handle.abort();
assert_eq!(cache.get_with(1, async { 5 }).await, 5);
}
#[tokio::test]
// https://github.com/moka-rs/moka/issues/59
async fn abort_try_get_with() {
use tokio::time::{sleep, Duration};
let cache = Cache::new(16);
let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
let handle;
{
let cache_ref = cache.clone();
let semaphore_ref = semaphore.clone();
handle = tokio::task::spawn(async move {
let _ = cache_ref
.try_get_with(1, async move {
semaphore_ref.add_permits(1);
sleep(Duration::from_millis(50)).await;
unreachable!();
})
.await as Result<_, Arc<Infallible>>;
});
}
let _ = semaphore.acquire().await.expect("semaphore acquire failed");
handle.abort();
assert_eq!(
cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
Ok(5)
);
}
#[tokio::test]
async fn test_removal_notifications() {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
cache.insert('a', "alice").await;
cache.invalidate(&'a').await;
expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
cache.sync();
assert_eq!(cache.entry_count(), 0);
cache.insert('b', "bob").await;
cache.insert('c', "cathy").await;
cache.insert('d', "david").await;
cache.sync();
assert_eq!(cache.entry_count(), 3);
// This will be rejected due to the size constraint.
cache.insert('e', "emily").await;
expected.push((Arc::new('e'), "emily", RemovalCause::Size));
cache.sync();
assert_eq!(cache.entry_count(), 3);
// Raise the popularity of 'e' so it will be accepted next time.
cache.get(&'e');
cache.sync();
// Retry.
cache.insert('e', "eliza").await;
// and the LRU entry will be evicted.
expected.push((Arc::new('b'), "bob", RemovalCause::Size));
cache.sync();
assert_eq!(cache.entry_count(), 3);
// Replace an existing entry.
cache.insert('d', "dennis").await;
expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
cache.sync();
assert_eq!(cache.entry_count(), 3);
verify_notification_vec(&cache, actual, &expected);
}
#[tokio::test]
async fn test_removal_notifications_with_updates() {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| a1.lock().push((k, v, cause));
// Create a cache with the eviction listener and also TTL and TTI.
let mut cache = Cache::builder()
.eviction_listener_with_queued_delivery_mode(listener)
.time_to_live(Duration::from_secs(7))
.time_to_idle(Duration::from_secs(5))
.build();
cache.reconfigure_for_testing();
let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
// Make the cache exterior immutable.
let cache = cache;
cache.insert("alice", "a0").await;
cache.sync();
// Now alice (a0) has been expired by the idle timeout (TTI).
mock.increment(Duration::from_secs(6));
expected.push((Arc::new("alice"), "a0", RemovalCause::Expired));
assert_eq!(cache.get(&"alice"), None);
// We have not ran sync after the expiration of alice (a0), so it is
// still in the cache.
assert_eq!(cache.entry_count(), 1);
// Re-insert alice with a different value. Since alice (a0) is still
// in the cache, this is actually a replace operation rather than an
// insert operation. We want to verify that the RemovalCause of a0 is
// Expired, not Replaced.
cache.insert("alice", "a1").await;
cache.sync();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), Some("a1"));
cache.sync();
// Now alice has been expired by time-to-live (TTL).
mock.increment(Duration::from_secs(4));
expected.push((Arc::new("alice"), "a1", RemovalCause::Expired));
assert_eq!(cache.get(&"alice"), None);
// But, again, it is still in the cache.
assert_eq!(cache.entry_count(), 1);
// Re-insert alice with a different value and verify that the
// RemovalCause of a1 is Expired (not Replaced).
cache.insert("alice", "a2").await;
cache.sync();
assert_eq!(cache.entry_count(), 1);
// Now alice (a2) has been expired by the idle timeout.
mock.increment(Duration::from_secs(6));
expected.push((Arc::new("alice"), "a2", RemovalCause::Expired));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
// This invalidate will internally remove alice (a2).
cache.invalidate(&"alice").await;
cache.sync();
assert_eq!(cache.entry_count(), 0);
// Re-insert, and this time, make it expired by the TTL.
cache.insert("alice", "a3").await;
cache.sync();
mock.increment(Duration::from_secs(4));
assert_eq!(cache.get(&"alice"), Some("a3"));
cache.sync();
mock.increment(Duration::from_secs(4));
expected.push((Arc::new("alice"), "a3", RemovalCause::Expired));
assert_eq!(cache.get(&"alice"), None);
assert_eq!(cache.entry_count(), 1);
// This invalidate will internally remove alice (a2).
cache.invalidate(&"alice").await;
cache.sync();
assert_eq!(cache.entry_count(), 0);
verify_notification_vec(&cache, actual, &expected);
}
// NOTE: To enable the panic logging, run the following command:
//
// RUST_LOG=moka=info cargo test --features 'future, logging' -- \
// future::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
//
#[tokio::test]
async fn recover_from_panicking_eviction_listener() {
#[cfg(feature = "logging")]
let _ = env_logger::builder().is_test(true).try_init();
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
// Create an eviction listener that panics when it see
// a value "panic now!".
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| {
if v == "panic now!" {
panic!("Panic now!");
}
a1.lock().push((k, v, cause))
};
// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.name("My Future Cache")
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
// Insert an okay value.
cache.insert("alice", "a0").await;
cache.sync();
// Insert a value that will cause the eviction listener to panic.
cache.insert("alice", "panic now!").await;
expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
cache.sync();
// Insert an okay value. This will replace the previous
// value "panic now!" so the eviction listener will panic.
cache.insert("alice", "a2").await;
cache.sync();
// No more removal notification should be sent.
// Invalidate the okay value.
cache.invalidate(&"alice").await;
cache.sync();
verify_notification_vec(&cache, actual, &expected);
}
// This test ensures that the `contains_key`, `get` and `invalidate` can use
// borrowed form `&[u8]` for key with type `Vec<u8>`.
// https://github.com/moka-rs/moka/issues/166
#[tokio::test]
async fn borrowed_forms_of_key() {
let cache: Cache<Vec<u8>, ()> = Cache::new(1);
let key = vec![1_u8];
cache.insert(key.clone(), ()).await;
// key as &Vec<u8>
let key_v: &Vec<u8> = &key;
assert!(cache.contains_key(key_v));
assert_eq!(cache.get(key_v), Some(()));
cache.invalidate(key_v).await;
cache.insert(key, ()).await;
// key as &[u8]
let key_s: &[u8] = &[1_u8];
assert!(cache.contains_key(key_s));
assert_eq!(cache.get(key_s), Some(()));
cache.invalidate(key_s).await;
}
#[tokio::test]
async fn drop_value_immediately_after_eviction() {
use crate::common::test_utils::{Counters, Value};
const MAX_CAPACITY: u32 = 500;
const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
let counters = Arc::new(Counters::default());
let counters1 = Arc::clone(&counters);
let listener = move |_k, _v, cause| match cause {
RemovalCause::Size => counters1.incl_evicted(),
RemovalCause::Explicit => counters1.incl_invalidated(),
_ => (),
};
let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY as u64)
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();
// Make the cache exterior immutable.
let cache = cache;
for key in 0..KEYS {
let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
cache.insert(key, value).await;
counters.incl_inserted();
cache.sync();
}
let eviction_count = KEYS - MAX_CAPACITY;
// Retries will be needed when testing in a QEMU VM.
const MAX_RETRIES: usize = 5;
let mut retries = 0;
loop {
// Ensure all scheduled notifications have been processed.
std::thread::sleep(Duration::from_millis(500));
if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
if retries <= MAX_RETRIES {
retries += 1;
cache.sync();
continue;
} else {
assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
assert_eq!(
counters.value_dropped(),
eviction_count,
"Retries exhausted"
);
}
}
assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), 0, "invalidated");
assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
break;
}
for key in 0..KEYS {
cache.invalidate(&key).await;
cache.sync();
}
let mut retries = 0;
loop {
// Ensure all scheduled notifications have been processed.
std::thread::sleep(Duration::from_millis(500));
if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
if retries <= MAX_RETRIES {
retries += 1;
cache.sync();
continue;
} else {
assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
}
}
assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
break;
}
std::mem::drop(cache);
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
#[tokio::test]
async fn test_debug_format() {
let cache = Cache::new(10);
cache.insert('a', "alice").await;
cache.insert('b', "bob").await;
cache.insert('c', "cindy").await;
let debug_str = format!("{:?}", cache);
assert!(debug_str.starts_with('{'));
assert!(debug_str.contains(r#"'a': "alice""#));
assert!(debug_str.contains(r#"'b': "bob""#));
assert!(debug_str.contains(r#"'c': "cindy""#));
assert!(debug_str.ends_with('}'));
}
type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
fn verify_notification_vec<K, V, S>(
cache: &Cache<K, V, S>,
actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
expected: &[NotificationTuple<K, V>],
) where
K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
{
// Retries will be needed when testing in a QEMU VM.
const MAX_RETRIES: usize = 5;
let mut retries = 0;
loop {
// Ensure all scheduled notifications have been processed.
std::thread::sleep(Duration::from_millis(500));
let actual = &*actual.lock();
if actual.len() != expected.len() {
if retries <= MAX_RETRIES {
retries += 1;
cache.sync();
continue;
} else {
assert_eq!(actual.len(), expected.len(), "Retries exhausted");
}
}
for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
assert_eq!(actual, expected, "expected[{}]", i);
}
break;
}
}
}