Skip to main content

Crate rs_singleflight

Crate rs_singleflight 

Source
Expand description

§rs-singleflight

rs-singleflight provides async single-flight request coalescing for Rust.

For each key, exactly one leader computes an expensive resource while duplicate callers subscribe to the same in-flight result. If the leader is dropped before completion, subscribers receive Outcome::Canceled instead of hanging.

§Example

use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

use rs_singleflight::{Group, Outcome};

#[tokio::main]
async fn main() {
    let group = Arc::new(Group::<String, String, ()>::new());
    let calls = Arc::new(AtomicUsize::new(0));

    let mut tasks = Vec::new();
    for _ in 0..8 {
        let group = Arc::clone(&group);
        let calls = Arc::clone(&calls);
        tasks.push(tokio::spawn(async move {
            group
                .run("resource".to_owned(), || async {
                    calls.fetch_add(1, Ordering::SeqCst);
                    Ok("computed value".to_owned())
                })
                .await
        }));
    }

    for task in tasks {
        match task.await.unwrap().as_ref() {
            Outcome::Complete { result, shared } => {
                assert_eq!(result.as_ref().unwrap(), "computed value");
                assert!(*shared);
            }
            Outcome::Canceled => panic!("leader was canceled"),
        }
    }

    assert_eq!(calls.load(Ordering::SeqCst), 1);
}

§Custom scheduling

Use Group::entry when you want to decide where the leader computation runs. It returns either a Leader, which owns the single computation, or a Subscriber, which waits on the leader’s broadcast result.

use rs_singleflight::{Entry, Group};

#[tokio::main]
async fn main() {
    let group = Group::<&'static str, usize, ()>::new();

    match group.entry("key") {
        Entry::Leader(leader) => {
            tokio::spawn(async move {
                leader.complete(Ok(42));
            });
        }
        Entry::Subscriber(subscriber) => {
            let outcome = subscriber.recv().await.unwrap();
            assert!(outcome.is_shared());
        }
    }
}

§License

BSD-3-Clause.

Structs§

Group
Namespace for duplicate suppression.
Leader
Owner of the single computation for a key.
Subscriber
Receiver for a duplicate caller.

Enums§

Entry
Returned by Group::entry.
Outcome
Result published by the single in-flight computation.
WaitError
Error returned when a subscriber cannot receive a leader result.