rs-singleflight 0.1.1

Async single-flight request coalescing for Rust.
Documentation
  • Coverage
  • 51.85%
    14 out of 27 items documented1 out of 19 items with examples
  • Size
  • Source code size: 20.67 kB This is the summed size of all the files inside the crates.io package for this release.
  • Documentation size: 650.26 kB This is the summed size of all files generated by rustdoc for all configured targets
  • Ø build duration
  • this release: 6s Average build duration of successful builds.
  • all releases: 6s Average build duration of successful builds in releases after 2024-10-23.
  • Links
  • wilbyang/rs-singleflight
    0 0 0
  • crates.io
  • Dependencies
  • Versions
  • Owners
  • wilbyang

rs-singleflight

rs-singleflight provides async single-flight request coalescing for Rust.

For each key, exactly one leader computes an expensive resource while duplicate callers subscribe to the same in-flight result. If the leader is dropped before completion, subscribers receive Outcome::Canceled instead of hanging.

Example

use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

use rs_singleflight::{Group, Outcome};

#[tokio::main]
async fn main() {
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_for_op = Arc::clone(&calls);
    let group = Group::new(move |key: String| {
        let calls = Arc::clone(&calls_for_op);
        async move {
            assert_eq!(key, "resource");
            calls.fetch_add(1, Ordering::SeqCst);
            Ok::<String, ()>("computed value".to_owned())
        }
    });

    let mut tasks = Vec::new();
    for _ in 0..8 {
        let group = group.clone();
        tasks.push(tokio::spawn(async move {
            group.run("resource".to_owned()).await
        }));
    }

    for task in tasks {
        match task.await.unwrap().as_ref() {
            Outcome::Complete { result, shared } => {
                assert_eq!(result.as_ref().unwrap(), "computed value");
                assert!(*shared);
            }
            Outcome::Canceled => panic!("leader was canceled"),
        }
    }

    assert_eq!(calls.load(Ordering::SeqCst), 1);
}

Custom scheduling

Use Group::entry when you want to decide where the leader computation runs. It returns either a Leader, which owns the single computation, or a Subscriber, which waits on the leader's broadcast result.

use rs_singleflight::{Entry, Group};

#[tokio::main]
async fn main() {
    let group = Group::new(|key: &'static str| async move {
        assert_eq!(key, "key");
        Ok::<usize, ()>(42)
    });

    match group.entry("key") {
        Entry::Leader(leader) => {
            tokio::spawn(async move {
                leader.complete(Ok(42));
            });
        }
        Entry::Subscriber(subscriber) => {
            let outcome = subscriber.recv().await.unwrap();
            assert!(outcome.is_shared());
        }
    }
}

License

BSD-3-Clause.