pub struct Cache<K, V, S = RandomState> { /* private fields */ }
Available on crate feature future only.

A thread-safe, futures-aware concurrent in-memory cache.

Cache supports full concurrency of retrievals and a high expected concurrency for updates. It can be accessed inside and outside of asynchronous contexts.

Cache utilizes a lock-free concurrent hash table as the central key-value storage. Cache performs a best-effort bounding of the map using an entry replacement algorithm to determine which entries to evict when the capacity is exceeded.

To use this cache, enable a crate feature called “future”.

Examples

Cache entries are manually added using an insert method, and are stored in the cache until either evicted or manually invalidated.

Here’s an example of reading and updating a cache by using multiple asynchronous tasks with the Tokio runtime:

 // Cargo.toml
 //
 // [dependencies]
 // moka = { version = "0.8", features = ["future"] }
 // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
 // futures-util = "0.3"

 use moka::future::Cache;

 #[tokio::main]
 async fn main() {
     const NUM_TASKS: usize = 16;
     const NUM_KEYS_PER_TASK: usize = 64;

     fn value(n: usize) -> String {
         format!("value {}", n)
     }

     // Create a cache that can store up to 10,000 entries.
     let cache = Cache::new(10_000);

     // Spawn async tasks and write to and read from the cache.
     let tasks: Vec<_> = (0..NUM_TASKS)
         .map(|i| {
             // To share the same cache across the async tasks, clone it.
             // This is a cheap operation.
             let my_cache = cache.clone();
             let start = i * NUM_KEYS_PER_TASK;
             let end = (i + 1) * NUM_KEYS_PER_TASK;

             tokio::spawn(async move {
                 // Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
                 for key in start..end {
                     // insert() is an async method, so await it.
                     my_cache.insert(key, value(key)).await;
                     // get() returns Option<String>, a clone of the stored value.
                     assert_eq!(my_cache.get(&key), Some(value(key)));
                 }

                 // Invalidate every fourth entry (key % 4 == 0) of the inserted entries.
                 for key in (start..end).step_by(4) {
                     // invalidate() is an async method, so await it.
                     my_cache.invalidate(&key).await;
                 }
             })
         })
         .collect();

     // Wait for all tasks to complete.
     futures_util::future::join_all(tasks).await;

     // Verify the result.
     for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
         if key % 4 == 0 {
             assert_eq!(cache.get(&key), None);
         } else {
             assert_eq!(cache.get(&key), Some(value(key)));
         }
     }
 }

If you want to atomically initialize and insert a value when the key is not present, you might want to check the other insertion methods, get_with and try_get_with.

Avoiding cloning the value at get

The return type of the get method is Option<V> instead of Option<&V>. Every time get is called for an existing key, it creates a clone of the stored value V and returns it. This is because the Cache allows concurrent updates from threads, so a value stored in the cache can be dropped or replaced at any time by any other thread. get cannot return a reference &V, as it is impossible to guarantee that the value outlives the reference.

If you want to store values that will be expensive to clone, wrap them in std::sync::Arc before storing them in a cache. Arc is a thread-safe reference-counted pointer and its clone() method is cheap.
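The effect can be seen with the standard library alone. The sketch below uses no moka types: it stores a hypothetical 10 MiB buffer behind Arc and shows that handing out a clone, as get does, copies a pointer and bumps a reference count rather than duplicating the buffer.

```rust
use std::sync::Arc;

/// Hypothetical expensive-to-clone value: a 10 MiB buffer behind an Arc.
fn make_big_value() -> Arc<Vec<u8>> {
    Arc::new(vec![0u8; 10 * 1024 * 1024])
}

/// Mimics a cache read that returns a clone of the stored value.
/// For Arc<Vec<u8>> this copies a pointer and increments a reference
/// count; the 10 MiB buffer itself is not copied.
fn read_clone(stored: &Arc<Vec<u8>>) -> Arc<Vec<u8>> {
    Arc::clone(stored)
}
```

With moka, this means using Arc<T> as the value type, i.e. Cache<K, Arc<T>>.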

Size-based Eviction

// Cargo.toml
//
// [dependencies]
// moka = { version = "0.8", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures-util = "0.3"

use std::convert::TryInto;
use moka::future::Cache;

#[tokio::main]
async fn main() {
    // Evict based on the number of entries in the cache.
    let cache = Cache::builder()
        // Up to 10,000 entries.
        .max_capacity(10_000)
        // Create the cache.
        .build();
    cache.insert(1, "one".to_string()).await;

    // Evict based on the byte length of strings in the cache.
    let cache = Cache::builder()
        // A weigher closure takes &K and &V and returns a u32
        // representing the relative size of the entry.
        .weigher(|_key, value: &String| -> u32 {
            value.len().try_into().unwrap_or(u32::MAX)
        })
        // This cache will hold up to 32MiB of values.
        .max_capacity(32 * 1024 * 1024)
        .build();
    cache.insert(2, "two".to_string()).await;
}

If your cache should not grow beyond a certain size, use the max_capacity method of the CacheBuilder to set the upper bound. The cache will try to evict entries that have not been used recently or very often.

At the cache creation time, a weigher closure can be set by the weigher method of the CacheBuilder. A weigher closure takes &K and &V as the arguments and returns a u32 representing the relative size of the entry:

  • If the weigher is not set, the cache will treat each entry as having the same size of 1. This means the cache will be bounded by the number of entries.
  • If the weigher is set, the cache will call the weigher to calculate the weighted size (relative size) of an entry. This means the cache will be bounded by the total weighted size of entries.

Note that weighted sizes are not used when making eviction selections.
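A rough sketch of the bound itself, using only std: apply the weigher to every entry and compare the running total with max_capacity. The helpers total_weight and fits are hypothetical and deliberately ignore which entry would be evicted; this is not moka's implementation.

```rust
use std::convert::TryInto;

/// A weigher with the same shape as the one in the example above:
/// it takes &K and &V and returns a u32 relative size (here, byte length).
fn weigh(_key: &u32, value: &String) -> u32 {
    value.len().try_into().unwrap_or(u32::MAX)
}

/// Hypothetical helper: sums the weighted sizes of all entries.
/// The sum is u64 so that many u32 weights cannot overflow it.
fn total_weight(entries: &[(u32, String)]) -> u64 {
    entries.iter().map(|(k, v)| weigh(k, v) as u64).sum()
}

/// Hypothetical helper: true if the entries stay within max_capacity,
/// i.e. no eviction would be needed.
fn fits(entries: &[(u32, String)], max_capacity: u64) -> bool {
    total_weight(entries) <= max_capacity
}
```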

Time-based Expirations

Cache supports the following expiration policies:

  • Time to live: A cached entry will expire once the specified duration has passed since its insert.
  • Time to idle: A cached entry will expire once the specified duration has passed since its last get or insert.

// Cargo.toml
//
// [dependencies]
// moka = { version = "0.8", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures-util = "0.3"

use moka::future::Cache;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let cache = Cache::builder()
        // Time to live (TTL): 30 minutes
        .time_to_live(Duration::from_secs(30 * 60))
        // Time to idle (TTI):  5 minutes
        .time_to_idle(Duration::from_secs( 5 * 60))
        // Create the cache.
        .build();

    // This entry will expire after 5 minutes (TTI) if there is no get().
    cache.insert(0, "zero").await;

    // This get() will extend the entry life for another 5 minutes.
    cache.get(&0);

    // Even though we keep calling get(), the entry will expire
    // after 30 minutes (TTL) from the insert().
}
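The combined rule can be written as a small predicate. This std-only sketch assumes a hypothetical per-entry record holding the insert time and the last access time (updated by both get and insert); the entry counts as expired as soon as either limit is reached. It restates the policies above and does not describe how moka tracks time internally.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-entry timestamps.
struct EntryTimes {
    inserted_at: Instant,
    last_accessed: Instant, // refreshed by both get() and insert()
}

/// Returns true if the entry is expired at `now` under either policy.
fn is_expired(e: &EntryTimes, now: Instant, ttl: Duration, tti: Duration) -> bool {
    // Time to live: measured from the insert; reads do not extend it.
    let ttl_expired = now.saturating_duration_since(e.inserted_at) >= ttl;
    // Time to idle: measured from the most recent get or insert.
    let tti_expired = now.saturating_duration_since(e.last_accessed) >= tti;
    ttl_expired || tti_expired
}
```

Because reads refresh the idle timer but not the insert time, the TTL acts as a hard upper bound on an entry's lifetime, which matches the closing comment of the example above.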

Thread Safety

All methods provided by the Cache are considered thread-safe, and can be safely accessed by multiple concurrent threads.

  • Cache<K, V, S> requires trait bounds Send, Sync and 'static for K (key), V (value) and S (hasher state).
  • Cache<K, V, S> will implement Send and Sync.
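Bounds like these are often checked at compile time with an empty generic function whose trait bounds do the work. In the sketch below, assert_send_sync is a hypothetical helper and SharedMap a hypothetical std-only stand-in; with the future feature enabled, the same helper could be called as assert_send_sync::<moka::future::Cache<u32, String>>().

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Compiles only if T is Send + Sync + 'static; it does nothing at run time.
fn assert_send_sync<T: Send + Sync + 'static>() {}

/// Hypothetical thread-safe stand-in for a shared cache.
type SharedMap<K, V> = Arc<RwLock<HashMap<K, V>>>;

fn check_bounds() {
    // Accepted: String keys and values are Send + Sync + 'static.
    assert_send_sync::<SharedMap<String, String>>();
    // Rejected at compile time if uncommented: std::rc::Rc is neither Send nor Sync.
    // assert_send_sync::<SharedMap<String, std::rc::Rc<String>>>();
}
```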

Sharing a cache across asynchronous tasks

To share a cache across async tasks (or OS threads), do one of the following:

  • Create a clone of the cache by calling its clone method and pass it to the other task.
  • Wrap the cache in a sync::OnceCell or sync::Lazy from the once_cell crate, and set it to a static variable.

Cloning is a cheap operation for Cache as it only creates thread-safe reference-counted pointers to the internal data structures.
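To illustrate why cloning is cheap, the std-only sketch below models a cache handle as a struct around an Arc. ToyCache is hypothetical and much simpler than moka's Cache (a single Mutex instead of a lock-free hash table), but every clone points at the same shared entries, so work done through one clone is visible through the others.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical toy cache handle: cloning copies only the Arc pointer.
#[derive(Clone)]
struct ToyCache {
    inner: Arc<Mutex<HashMap<u64, String>>>,
}

impl ToyCache {
    fn new() -> Self {
        ToyCache { inner: Arc::new(Mutex::new(HashMap::new())) }
    }
    fn insert(&self, key: u64, value: String) {
        self.inner.lock().unwrap().insert(key, value);
    }
    /// Returns a clone of the stored value, like Cache::get.
    fn get(&self, key: &u64) -> Option<String> {
        self.inner.lock().unwrap().get(key).cloned()
    }
}

/// Spawns one thread per task; each thread owns its own clone of the handle.
fn fill_concurrently(cache: &ToyCache, tasks: u64) {
    let handles: Vec<_> = (0..tasks)
        .map(|i| {
            let my_cache = cache.clone(); // cheap: increments the Arc count
            thread::spawn(move || my_cache.insert(i, format!("value {}", i)))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```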

Hashing Algorithm

By default, Cache uses a hashing algorithm selected to provide resistance against HashDoS attacks. It will be the same one used by std::collections::HashMap, which is currently SipHash 1-3.

While SipHash’s performance is very competitive for medium-sized keys, other hashing algorithms will outperform it for small keys such as integers, as well as for large keys such as long strings. However, those algorithms will typically not protect against attacks such as HashDoS.

The hashing algorithm can be replaced on a per-Cache basis using the build_with_hasher method of the CacheBuilder. Many alternative algorithms are available on crates.io, such as the aHash crate.
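A replacement hasher is supplied as a std::hash::BuildHasher. As a std-only illustration of that plumbing, the sketch below implements the 64-bit FNV-1a hash (chosen only because it is short, not as a recommendation), wraps it in BuildHasherDefault, and uses it with a std HashMap. FNV-1a is fast on small keys but offers no HashDoS resistance. A BuildHasher value of this kind, or one from a crate such as aHash, is what you would pass to build_with_hasher.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// 64-bit FNV-1a. Fast on short keys, but NOT resistant to HashDoS.
struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        Fnv1a(0xcbf2_9ce4_8422_2325) // FNV 64-bit offset basis
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 ^= b as u64; // FNV-1a: xor first...
            self.0 = self.0.wrapping_mul(0x0000_0100_0000_01b3); // ...then multiply by the FNV prime
        }
    }
    fn finish(&self) -> u64 {
        self.0
    }
}

/// A BuildHasher that creates a fresh Fnv1a for every key.
type FnvBuildHasher = BuildHasherDefault<Fnv1a>;
```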

Implementations

Constructs a new Cache<K, V> that will store up to the max_capacity.

To adjust various configuration knobs such as initial_capacity or time_to_live, use the CacheBuilder.

Returns a CacheBuilder, which can build a Cache with various configuration knobs.

Returns true if the cache contains a value for the key.

Unlike the get method, this method is not considered a cache read operation, so it does not update the historic popularity estimator or reset the idle timer for the key.

The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.
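This is the same rule std::collections::HashMap states for its lookup methods. A std-only illustration: a map with String keys can be queried with a &str because String implements Borrow<str>, and a String hashes and compares exactly like the str it contains. The lookup function is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Looks up a String-keyed map with a borrowed &str, without allocating.
/// Valid because String: Borrow<str> and both forms hash and compare alike.
fn lookup<'a>(map: &'a HashMap<String, u32>, key: &str) -> Option<&'a u32> {
    map.get(key)
}
```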

Returns a clone of the value corresponding to the key.

If you want to store values that will be expensive to clone, wrap them in std::sync::Arc before storing them in a cache. Arc is a thread-safe reference-counted pointer and its clone() method is cheap.

The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.

👎 Deprecated since 0.8.0:

Replaced with get_with

Deprecated, replaced with get_with

👎 Deprecated since 0.8.0:

Replaced with try_get_with

Deprecated, replaced with try_get_with

Ensures the value of the key exists by inserting the output of the init future if it does not exist, and returns a clone of the value.

This method prevents the init future from being resolved multiple times on the same key, even if the method is called concurrently by many async tasks; only one of the calls resolves its future, and the other calls wait for that future to complete.

Example
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.8", features = ["future"] }
// futures-util = "0.3"
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
    let cache = Cache::new(100);

    // Spawn four async tasks.
    let tasks: Vec<_> = (0..4_u8)
        .map(|task_id| {
            let my_cache = cache.clone();
            tokio::spawn(async move {
                println!("Task {} started.", task_id);

                // Insert and get the value for key1. Although all four async tasks
                // will call `get_with` at the same time, the `init` async
                // block must be resolved only once.
                let value = my_cache
                    .get_with("key1", async move {
                        println!("Task {} inserting a value.", task_id);
                        Arc::new(vec![0u8; TEN_MIB])
                    })
                    .await;

                // Ensure the value exists now.
                assert_eq!(value.len(), TEN_MIB);
                assert!(my_cache.get(&"key1").is_some());

                println!("Task {} got the value. (len: {})", task_id, value.len());
            })
        })
        .collect();

    // Run all tasks concurrently and wait for them to complete.
    futures_util::future::join_all(tasks).await;
}

A Sample Result

  • The init future (async block) was resolved exactly once by task 3.
  • Other tasks were blocked until task 3 inserted the value.

Task 0 started.
Task 3 started.
Task 1 started.
Task 2 started.
Task 3 inserting a value.
Task 3 got the value. (len: 10485760)
Task 0 got the value. (len: 10485760)
Task 1 got the value. (len: 10485760)
Task 2 got the value. (len: 10485760)

Panics

This method panics when the init future panics. When that happens, only the caller whose init future panicked will get the panic (e.g. only task 3 in the sample above). If there are other calls in progress (e.g. tasks 0, 1 and 2 above), this method will restart and resolve one of the remaining init futures.
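The once-per-key behavior of get_with resembles a per-key OnceLock. The sketch below is a hypothetical, std-only analogue built with OS threads and a closure instead of async tasks and a future; it is not how moka implements get_with. Each key maps to a shared Arc<OnceLock<V>>, and OnceLock::get_or_init runs the initializer for one caller while the others block until the value is set. It needs Rust 1.70 or later for OnceLock.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical map where each key's value is computed at most once.
struct OnceMap<K, V> {
    cells: Mutex<HashMap<K, Arc<OnceLock<V>>>>,
}

impl<K: Hash + Eq, V: Clone> OnceMap<K, V> {
    fn new() -> Self {
        OnceMap { cells: Mutex::new(HashMap::new()) }
    }

    /// Returns a clone of the value for `key`, running `init` only if no
    /// other caller has initialized (or is initializing) it.
    fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
        // Hold the map lock only long enough to find or create the cell,
        // so a slow `init` does not block callers working on other keys.
        let cell = {
            let mut cells = self.cells.lock().unwrap();
            Arc::clone(cells.entry(key).or_insert_with(|| Arc::new(OnceLock::new())))
        };
        // One caller runs `init`; concurrent callers wait for its result.
        cell.get_or_init(init).clone()
    }
}
```

On the panic path, std documents that if the initializer passed to get_or_init panics, the panic propagates to that caller and the cell remains uninitialized, so a later caller can run its own initializer.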

Works like get_with, but takes an additional replace_if closure.

This method will resolve the init future and insert the output into the cache when:

  • The key does not exist.
  • Or, the replace_if closure returns true.
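The decision rule alone can be modeled with a single-threaded std HashMap. The function get_with_if_sketch below is hypothetical: it shows only when init runs and what gets stored, and makes no attempt at the concurrent coalescing that get_with_if provides.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical single-threaded model of the get_with_if decision rule.
/// Runs `init` and stores its output if the key is absent, or if
/// `replace_if` returns true for the stored value; otherwise returns a
/// clone of the stored value without running `init`.
fn get_with_if_sketch<K, V, F, P>(map: &mut HashMap<K, V>, key: K, init: F, replace_if: P) -> V
where
    K: Hash + Eq,
    V: Clone,
    F: FnOnce() -> V,
    P: FnOnce(&V) -> bool,
{
    if let Some(v) = map.get(&key) {
        if !replace_if(v) {
            return v.clone(); // keep the stored value
        }
    }
    let v = init(); // key absent, or replace_if returned true
    map.insert(key, v.clone());
    v
}
```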

Tries to ensure the value of the key exists by inserting an Ok output of the init future if it does not exist, and returns a clone of the value or the Err produced by the future.

This method prevents the init future from being resolved multiple times on the same key, even if the method is called concurrently by many async tasks; only one of the calls resolves its future (as long as these futures return the same error type), and the other calls wait for that future to complete.

Example
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.8", features = ["future"] }
// futures-util = "0.3"
// reqwest = "0.11"
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;

// This async function tries to get HTML from the given URI.
async fn get_html(task_id: u8, uri: &str) -> Result<String, reqwest::Error> {
    println!("get_html() called by task {}.", task_id);
    Ok(reqwest::get(uri).await?.text().await?)
}

#[tokio::main]
async fn main() {
    let cache = Cache::new(100);

    // Spawn four async tasks.
    let tasks: Vec<_> = (0..4_u8)
        .map(|task_id| {
            let my_cache = cache.clone();
            tokio::spawn(async move {
                println!("Task {} started.", task_id);

                // Try to insert and get the value for key1. Although
                // all four async tasks will call `try_get_with`
                // at the same time, get_html() must be called only once.
                let value = my_cache
                    .try_get_with(
                        "key1",
                        get_html(task_id, "https://www.rust-lang.org"),
                    ).await;

                // Ensure the value exists now.
                assert!(value.is_ok());
                assert!(my_cache.get(&"key1").is_some());

                println!(
                    "Task {} got the value. (len: {})",
                    task_id,
                    value.unwrap().len()
                );
            })
        })
        .collect();

    // Run all tasks concurrently and wait for them to complete.
    futures_util::future::join_all(tasks).await;
}

A Sample Result

  • get_html() was called exactly once by task 2.
  • Other tasks were blocked until task 2 inserted the value.

Task 1 started.
Task 0 started.
Task 2 started.
Task 3 started.
get_html() called by task 2.
Task 2 got the value. (len: 19419)
Task 1 got the value. (len: 19419)
Task 0 got the value. (len: 19419)
Task 3 got the value. (len: 19419)

Panics

This method panics when the init future panics. When that happens, only the caller whose init future panicked will get the panic (e.g. only task 2 in the sample above). If there are other calls in progress (e.g. tasks 0, 1 and 3 above), this method will restart and resolve one of the remaining init futures.
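What distinguishes try_get_with from get_with is the failure path: an Err goes back to the caller and nothing is inserted, so a later call will resolve its own init future again. A single-threaded, std-only sketch of that rule follows; try_get_with_sketch is a hypothetical name, and concurrent callers are not modeled.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical single-threaded model of the try_get_with rule.
/// Returns the cached value if present; otherwise runs `init`.
/// On Ok(v), stores v and returns a clone; on Err(e), stores nothing
/// and returns the error, so the next call runs its `init` again.
fn try_get_with_sketch<K, V, E, F>(map: &mut HashMap<K, V>, key: K, init: F) -> Result<V, E>
where
    K: Hash + Eq,
    V: Clone,
    F: FnOnce() -> Result<V, E>,
{
    if let Some(v) = map.get(&key) {
        return Ok(v.clone());
    }
    let v = init()?; // an Err returns early and leaves the map untouched
    map.insert(key, v.clone());
    Ok(v)
}
```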

Inserts a key-value pair into the cache.

If the cache has this key present, the value is updated.

Discards any cached value for the key.

The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.

Discards all cached values.

This method returns immediately, and a background thread will evict all the cached values inserted before the time when this method was called. It is guaranteed that the get method will not return these invalidated values, even if they have not been evicted yet.

Like the invalidate method, this method does not clear the historic popularity estimator of keys, so the estimator retains the history of clients' attempts to retrieve each item.

Discards cached values that satisfy a predicate.

invalidate_entries_if takes a closure that returns true or false. This method returns immediately and a background thread will apply the closure to each cached value inserted before the time when invalidate_entries_if was called. If the closure returns true on a value, that value will be evicted from the cache.

Also, the get method will apply the closure to a value to determine if it should have been invalidated. Therefore, it is guaranteed that the get method will not return invalidated values.

Note that you must call CacheBuilder::support_invalidation_closures at cache creation time, as the cache needs to maintain additional internal data structures to support this method. Otherwise, calling this method will fail with a PredicateError::InvalidationClosuresDisabled.

Like the invalidate method, this method does not clear the historic popularity estimator of keys, so the estimator retains the history of clients' attempts to retrieve each item.
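One way to give that read-side guarantee while eviction happens later is to record when each entry was inserted and when each predicate was registered, and have reads consult only the predicates registered after an entry's insertion. The std-only sketch below does this with a logical clock in place of real time. PredicateCache is hypothetical and single-threaded; in it, the extra state (per-entry timestamps plus the predicate list) is the cost of supporting closures. It illustrates the documented behavior, not moka's data structures.

```rust
use std::collections::HashMap;

/// A registered invalidation predicate and the logical time it was registered.
type Predicate = (u64, Box<dyn Fn(&u32, &String) -> bool>);

/// Hypothetical cache that applies invalidation predicates lazily on read.
struct PredicateCache {
    clock: u64,                           // logical time, bumped on each write
    entries: HashMap<u32, (u64, String)>, // key -> (insert time, value)
    predicates: Vec<Predicate>,
}

impl PredicateCache {
    fn new() -> Self {
        PredicateCache { clock: 0, entries: HashMap::new(), predicates: Vec::new() }
    }
    fn insert(&mut self, key: u32, value: String) {
        self.clock += 1;
        self.entries.insert(key, (self.clock, value));
    }
    /// Returns immediately; the predicate only affects entries inserted earlier.
    fn invalidate_entries_if(&mut self, p: impl Fn(&u32, &String) -> bool + 'static) {
        self.clock += 1;
        let boxed: Box<dyn Fn(&u32, &String) -> bool> = Box::new(p);
        self.predicates.push((self.clock, boxed));
    }
    /// Hides an entry if any predicate registered after its insert matches it.
    fn get(&self, key: &u32) -> Option<String> {
        let (inserted_at, value) = self.entries.get(key)?;
        let invalidated = self
            .predicates
            .iter()
            .any(|(registered_at, p)| registered_at > inserted_at && p(key, value));
        if invalidated { None } else { Some(value.clone()) }
    }
}
```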

Creates an iterator visiting all key-value pairs in arbitrary order. The iterator element type is (Arc<K>, V), where V is a clone of a stored value.

Iterators do not block concurrent reads and writes on the cache. An entry can be inserted into, invalidated in, or evicted from a cache while iterators are alive on the same cache.

Unlike the get method, visiting entries via an iterator does not update the historic popularity estimator or reset idle timers for keys.

Guarantees

In order to allow concurrent access to the cache, iterator’s next method does not guarantee the following:

  • It does not guarantee to return a key-value pair (an entry) if its key has been inserted into the cache after the iterator was created.
    • Such an entry may or may not be returned depending on key’s hash and timing.

and the next method guarantees the following:

  • It guarantees not to return the same entry more than once.
  • It guarantees not to return an entry if it has been removed from the cache after the iterator was created.
    • Note: An entry can be removed for the following reasons:
      • Manually invalidated.
      • Expired (e.g. time-to-live).
      • Evicted because the cache capacity was exceeded.

Examples
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.8.2", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache = Cache::new(100);
    cache.insert("Julia", 14).await;

    let mut iter = cache.iter();
    let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
    assert_eq!(*k, "Julia");
    assert_eq!(v, 14);

    assert!(iter.next().is_none());
}

Returns a BlockingOp for this cache. It provides blocking insert and invalidate methods, which can be called outside of asynchronous contexts.

Returns a read-only cache policy of this cache.

At this time, the cache policy cannot be modified after cache creation. A future version may support modifying it.

Trait Implementations

Returns a copy of the value.

Performs copy-assignment from source.

Performs any pending maintenance operations needed by the cache.

The type of the elements being iterated over.

Which kind of iterator are we turning this into?

Creates an iterator from a value.

Auto Trait Implementations

Blanket Implementations

Gets the TypeId of self.

Immutably borrows from an owned value.

Mutably borrows from an owned value.

Returns the argument unchanged.

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The resulting type after obtaining ownership.

Creates owned data from borrowed data, usually by cloning.

🔬 This is a nightly-only experimental API. (toowned_clone_into)

Uses borrowed data to replace owned data, usually by cloning.

The type returned in the event of a conversion error.

Performs the conversion.

The type returned in the event of a conversion error.

Performs the conversion.