go 0.1.2

A runtime-agnostic Go-style concurrency library for Rust
Documentation
//! Deduplicate concurrent calls keyed by an arbitrary key.
//!
//! Equivalent to Go's `golang.org/x/sync/singleflight`: while a call for a
//! given key is in flight, concurrent callers for the same key wait and receive
//! a clone of the single in-flight result instead of running the work again.

use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::sync::{Arc, Mutex};

use event_listener::Event;

struct CallState<V> {
    dups: usize,
    result: Option<V>,
}

struct Call<V> {
    event: Event,
    state: Mutex<CallState<V>>,
}

struct GroupInner<K, V> {
    calls: Mutex<HashMap<K, Arc<Call<V>>>>,
}

/// Deduplicates concurrent calls so that only one execution is in flight for a
/// given key at a time; duplicate callers receive a clone of the shared result.
pub struct Group<K, V> {
    inner: Arc<GroupInner<K, V>>,
}

impl<K, V> Clone for Group<K, V> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl<K, V> Default for Group<K, V> {
    fn default() -> Self {
        Self::new()
    }
}

enum Role<V> {
    Leader(Arc<Call<V>>),
    Follower(Arc<Call<V>>),
}

// Removes the in-flight entry and wakes waiters if the leader's future is
// dropped (cancelled) or panics before publishing a result. Owns a clone of the
// key (rather than borrowing it) so the `do_` future stays `Send` without
// requiring `K: Sync`.
struct LeaderGuard<'a, K: Eq + Hash, V> {
    inner: &'a GroupInner<K, V>,
    key: K,
    call: &'a Arc<Call<V>>,
    done: bool,
}

impl<'a, K: Eq + Hash, V> Drop for LeaderGuard<'a, K, V> {
    fn drop(&mut self) {
        if !self.done {
            self.inner.calls.lock().unwrap().remove(&self.key);
            self.call.event.notify(usize::MAX);
        }
    }
}

impl<K, V> Group<K, V> {
    /// Create an empty group.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(GroupInner {
                calls: Mutex::new(HashMap::new()),
            }),
        }
    }
}

impl<K, V> Group<K, V>
where
    K: Eq + Hash + Clone,
    V: Clone,
{
    /// Execute `fut`, deduplicating against any concurrent call for `key`.
    ///
    /// The first caller for a key (the leader) runs `fut`; concurrent callers
    /// for the same key wait and receive a clone of the leader's result.
    /// Returns `(value, shared)`, where `shared` is `true` if the value was
    /// shared with at least one other concurrent caller.
    ///
    /// If the leader's future is cancelled (dropped) or panics before
    /// producing a value, the in-flight call is abandoned: waiting callers
    /// retry, and one of them becomes the new leader.
    ///
    /// # Examples
    ///
    /// ```
    /// use go::singleflight::Group;
    ///
    /// async fn dedup(group: &Group<&'static str, i32>) {
    ///     let (value, shared) = group.do_("key", async { 42 }).await;
    ///     assert_eq!(value, 42);
    ///     let _ = shared;
    /// }
    /// ```
    pub async fn do_<F>(&self, key: K, fut: F) -> (V, bool)
    where
        F: Future<Output = V>,
    {
        let mut fut = Some(fut);
        loop {
            let role = {
                let mut calls = self.inner.calls.lock().unwrap();
                match calls.get(&key) {
                    Some(call) => {
                        call.state.lock().unwrap().dups += 1;
                        Role::Follower(Arc::clone(call))
                    }
                    None => {
                        let call = Arc::new(Call {
                            event: Event::new(),
                            state: Mutex::new(CallState {
                                dups: 0,
                                result: None,
                            }),
                        });
                        calls.insert(key.clone(), Arc::clone(&call));
                        Role::Leader(call)
                    }
                }
            };

            match role {
                Role::Follower(call) => {
                    let listener = call.event.listen();
                    if let Some(v) = call.state.lock().unwrap().result.clone() {
                        return (v, true);
                    }
                    listener.await;
                    if let Some(v) = call.state.lock().unwrap().result.clone() {
                        return (v, true);
                    }
                    // Leader abandoned the call without a result; retry as a
                    // candidate leader.
                    continue;
                }
                Role::Leader(call) => {
                    let mut guard = LeaderGuard {
                        inner: &self.inner,
                        key: key.clone(),
                        call: &call,
                        done: false,
                    };
                    let value = fut.take().expect("leader future already consumed").await;
                    let shared = {
                        let mut calls = self.inner.calls.lock().unwrap();
                        calls.remove(&key);
                        let mut state = call.state.lock().unwrap();
                        state.result = Some(value.clone());
                        state.dups > 0
                    };
                    guard.done = true;
                    call.event.notify(usize::MAX);
                    return (value, shared);
                }
            }
        }
    }

    /// Like [`do_`](Self::do_) but runs the work on the active runtime and
    /// returns a receiver that yields `(value, shared)` exactly once.
    ///
    /// Requires `Send + 'static` because it spawns onto the runtime.
    pub fn do_chan<F>(&self, key: K, fut: F) -> crate::chan::Receiver<(V, bool)>
    where
        F: Future<Output = V> + Send + 'static,
        K: Send + 'static,
        V: Send + 'static,
    {
        let (tx, rx) = crate::chan::bounded::<(V, bool)>(1);
        let group = self.clone();
        crate::spawn(async move {
            let result = group.do_(key, fut).await;
            let _ = tx.send(result).await;
        });
        rx
    }

    /// Forget an in-flight key so the next `do_` call starts a fresh execution.
    ///
    /// Callers already waiting on the forgotten call still receive its result
    /// when the original leader finishes; callers arriving after `forget`
    /// create a new leader. Forgetting an absent key is a no-op.
    pub fn forget(&self, key: &K) {
        self.inner.calls.lock().unwrap().remove(key);
    }
}