// go 0.1.2
//
// A runtime-agnostic Go-style concurrency library for Rust.
// Documentation
/// Drives a future to completion on the async backend selected by Cargo feature.
///
/// * `rt-tokio` — builds a fresh `tokio::runtime::Runtime` and calls `block_on` on it.
/// * `rt-async-std` — calls `async_std::task::block_on`.
/// * `rt-smol` — calls `smol::block_on`.
///
/// Each arm is gated by its own `#[cfg(feature = ...)]`. With none of the three
/// features enabled the expansion is an empty block and the future is never run.
macro_rules! run_async {
    ($fut:expr) => {{
        #[cfg(feature = "rt-tokio")]
        {
            let runtime = tokio::runtime::Runtime::new().unwrap();
            runtime.block_on($fut)
        }
        #[cfg(feature = "rt-async-std")]
        {
            async_std::task::block_on($fut)
        }
        #[cfg(feature = "rt-smol")]
        {
            smol::block_on($fut)
        }
    }};
}

// ===========================================================================
// Spawn tests
// ===========================================================================

/// Awaiting the handle returned by `go::spawn` must unwrap to the task's output.
#[test]
fn spawn_returns_value() {
    run_async!(async {
        let answer = go::spawn(async { 42 }).await.unwrap();
        assert_eq!(answer, 42);
    });
}

/// A panic inside a spawned task must come back through the handle as
/// `go::JoinError::Panic` rather than a successful value.
#[test]
fn spawn_captures_panic() {
    run_async!(async {
        // The `as i32` cast only gives the diverging block a concrete output type.
        let outcome = go::spawn(async { panic!("oops") as i32 }).await;
        assert!(matches!(outcome.unwrap_err(), go::JoinError::Panic(_)));
    });
}

/// The `go::go!` macro must yield a handle that, once awaited, unwraps to the
/// value produced by the async block it was given.
#[test]
fn go_macro_works() {
    run_async!(async {
        let task = go::go!(async { "hello" });
        let greeting = task.await.unwrap();
        assert_eq!(greeting, "hello");
    });
}

// ===========================================================================
// Channel tests
// ===========================================================================

/// Two values sent into a capacity-4 bounded channel must be received in the
/// order they were sent.
#[test]
fn chan_bounded_send_recv() {
    run_async!(async {
        let (sender, receiver) = go::chan::bounded(4);

        for item in [10, 20] {
            sender.send(item).await.unwrap();
        }
        for expected in [10, 20] {
            assert_eq!(receiver.recv().await.unwrap(), expected);
        }
    });
}

/// A `String` sent through an unbounded channel must arrive unchanged.
#[test]
fn chan_unbounded_send_recv() {
    run_async!(async {
        let (sender, receiver) = go::chan::unbounded();
        sender.send(String::from("hello")).await.unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received, "hello");
    });
}

/// Once the sending half has been dropped, `recv` on the empty channel must
/// report an error.
#[test]
fn chan_recv_error_on_closed() {
    run_async!(async {
        let (sender, receiver) = go::chan::bounded::<i32>(1);
        drop(sender);

        let outcome = receiver.recv().await;
        assert!(outcome.is_err());
    });
}

/// Requesting a bounded channel with capacity 0 must panic with the documented
/// message (checked by `should_panic(expected = ...)`).
#[test]
#[should_panic(expected = "go::chan::bounded requires cap >= 1")]
fn chan_bounded_cap_zero_panics() {
    // The returned value is irrelevant; construction itself is expected to panic.
    drop(go::chan::bounded::<i32>(0));
}

// ===========================================================================
// WaitGroup tests
// ===========================================================================

/// Adds three to a `WaitGroup`, spawns three tasks that each call `done`, and
/// checks that `wait` then completes.
#[test]
fn waitgroup_basic() {
    run_async!(async {
        let group = go::WaitGroup::new();
        group.add(3);

        for _worker in 0..3 {
            // Each task gets its own clone of the group to signal on.
            let member = group.clone();
            go::go!(async move {
                member.done();
            });
        }

        group.wait().await;
    });
}

/// A task that only takes a `guard()` and lets it go out of scope must be
/// enough for `wait` to complete after `add(1)`.
#[test]
fn waitgroup_guard() {
    run_async!(async {
        let group = go::WaitGroup::new();
        group.add(1);

        let member = group.clone();
        go::go!(async move {
            // Held only until the end of the task body.
            let _guard = member.guard();
        });

        group.wait().await;
    });
}

/// Waiting on a freshly created `WaitGroup` (nothing added) must complete.
#[test]
fn waitgroup_zero_is_immediate() {
    run_async!(async {
        go::WaitGroup::new().wait().await;
    });
}

// ===========================================================================
// Time tests
// ===========================================================================

/// `go::sleep` for 50 ms must take at least 40 ms of wall-clock time; the 10 ms
/// slack is the tolerance the assertion allows.
#[test]
fn sleep_completes() {
    use std::time::{Duration, Instant};

    run_async!(async {
        let started = Instant::now();
        go::sleep(Duration::from_millis(50)).await;

        let waited = started.elapsed();
        assert!(waited >= Duration::from_millis(40));
    });
}

/// A future that is ready at once must pass through `go::timeout` with a
/// one-second limit and unwrap to its value.
#[test]
fn timeout_succeeds_when_fast() {
    use std::time::Duration;

    run_async!(async {
        let limit = Duration::from_secs(1);
        let value = go::timeout(limit, async { 42 }).await.unwrap();
        assert_eq!(value, 42);
    });
}

/// A future that sleeps for 10 s must make `go::timeout` with a 10 ms limit
/// return an error.
#[test]
fn timeout_fails_when_slow() {
    use std::time::Duration;

    run_async!(async {
        let slow = async {
            go::sleep(Duration::from_secs(10)).await;
            42
        };

        let outcome = go::timeout(Duration::from_millis(10), slow).await;
        assert!(outcome.is_err());
    });
}

// ===========================================================================
// select! tests
// ===========================================================================

/// With one channel holding a value and the other empty, `go::select!` must
/// evaluate the branch of the channel that has data.
#[test]
fn select_picks_ready_branch() {
    run_async!(async {
        let (ready_tx, ready_rx) = go::chan::bounded(1);
        // The idle sender is kept bound so its channel stays open but empty.
        let (_idle_tx, idle_rx) = go::chan::bounded::<i32>(1);
        ready_tx.send(42).await.unwrap();

        let winner = go::select! {
            got = ready_rx.recv() => got.unwrap(),
            got = idle_rx.recv() => got.unwrap(),
        };
        assert_eq!(winner, 42);
    });
}

/// When the only receive branch has nothing to deliver, `go::select!` must
/// fall through to its `default` arm.
#[test]
fn select_default_when_none_ready() {
    run_async!(async {
        // Sender stays bound so the channel is open yet empty.
        let (_idle_tx, idle_rx) = go::chan::bounded::<i32>(1);

        let chosen = go::select! {
            got = idle_rx.recv() => got.unwrap(),
            default => -1,
        };
        assert_eq!(chosen, -1);
    });
}

/// With three receive branches and only the middle channel holding a value,
/// `go::select!` must produce that value.
#[test]
fn select_three_branches() {
    run_async!(async {
        let (_first_tx, first_rx) = go::chan::bounded::<i32>(1);
        let (middle_tx, middle_rx) = go::chan::bounded(1);
        let (_last_tx, last_rx) = go::chan::bounded::<i32>(1);
        middle_tx.send(99).await.unwrap();

        let winner = go::select! {
            got = first_rx.recv() => got.unwrap(),
            got = middle_rx.recv() => got.unwrap(),
            got = last_rx.recv() => got.unwrap(),
        };
        assert_eq!(winner, 99);
    });
}

/// A `go::select!` with exactly one branch must resolve to that branch's value.
#[test]
fn select_single_branch_awaits() {
    run_async!(async {
        let (sender, receiver) = go::chan::bounded(1);
        sender.send(7).await.unwrap();

        let received = go::select! {
            got = receiver.recv() => got.unwrap(),
        };
        assert_eq!(received, 7);
    });
}

// ===========================================================================
// Singleflight tests
// ===========================================================================

use std::sync::atomic::{AtomicUsize, Ordering};

/// A single `do_` call must return the future's value together with a `false`
/// shared flag.
#[test]
fn singleflight_single_call() {
    run_async!(async {
        let group = go::singleflight::Group::<&str, i32>::new();

        let (value, was_shared) = group.do_("k", async { 5 }).await;
        assert_eq!(value, 5);
        assert!(!was_shared, "a lone call is never shared");
    });
}

/// Ten concurrent `do_` calls on the same key: the inner future must run exactly
/// once, every caller must receive its value, and at least one caller must see
/// the shared flag set.
#[test]
fn singleflight_dedups_concurrent_calls() {
    use std::{sync::Arc, time::Duration};

    run_async!(async {
        let group = go::singleflight::Group::<&str, i32>::new();
        let runs = Arc::new(AtomicUsize::new(0));

        // Spawn every caller before awaiting any of them, so the calls are
        // issued while the first one is still sleeping.
        let callers: Vec<_> = (0..10)
            .map(|_| {
                let group = group.clone();
                let runs = Arc::clone(&runs);
                go::spawn(async move {
                    let work = async move {
                        runs.fetch_add(1, Ordering::SeqCst);
                        go::sleep(Duration::from_millis(100)).await;
                        7
                    };
                    group.do_("key", work).await
                })
            })
            .collect();

        let mut shared_count = 0usize;
        for caller in callers {
            let (value, shared) = caller.await.unwrap();
            assert_eq!(value, 7);
            shared_count += usize::from(shared);
        }

        assert_eq!(
            runs.load(Ordering::SeqCst),
            1,
            "the underlying future must run exactly once"
        );
        assert!(shared_count >= 1, "followers must observe shared == true");
    });
}

/// Two overlapping `do_` calls on different keys must each run their own
/// future: both values come back and the run counter ends at 2.
#[test]
fn singleflight_distinct_keys_do_not_dedup() {
    use std::{sync::Arc, time::Duration};

    run_async!(async {
        let group = go::singleflight::Group::<&str, i32>::new();
        let runs = Arc::new(AtomicUsize::new(0));

        // Spawns one caller for `key` whose future bumps the counter, sleeps
        // 50 ms and then yields `value`.
        let launch = |key: &'static str, value: i32| {
            let group = group.clone();
            let runs = Arc::clone(&runs);
            go::spawn(async move {
                group
                    .do_(key, async move {
                        runs.fetch_add(1, Ordering::SeqCst);
                        go::sleep(Duration::from_millis(50)).await;
                        value
                    })
                    .await
            })
        };

        let first = launch("a", 1);
        let second = launch("b", 2);

        let (value_a, _) = first.await.unwrap();
        let (value_b, _) = second.await.unwrap();
        assert_eq!(value_a, 1);
        assert_eq!(value_b, 2);
        assert_eq!(runs.load(Ordering::SeqCst), 2);
    });
}

/// After `forget` is called on a key whose call is still in flight, a second
/// `do_` on that key must run its own future: both values come back and the
/// run counter ends at 2.
#[test]
fn singleflight_forget_lets_a_new_call_through() {
    use std::{sync::Arc, time::Duration};

    run_async!(async {
        let group = go::singleflight::Group::<&str, i32>::new();
        let runs = Arc::new(AtomicUsize::new(0));

        // First leader: starts and then sleeps for 100 ms.
        let first = {
            let group = group.clone();
            let runs = Arc::clone(&runs);
            go::spawn(async move {
                group
                    .do_("k", async move {
                        runs.fetch_add(1, Ordering::SeqCst);
                        go::sleep(Duration::from_millis(100)).await;
                        1
                    })
                    .await
            })
        };

        // Give the first leader time to register itself.
        go::sleep(Duration::from_millis(20)).await;

        // Drop the in-flight key so the next call is NOT merged with it.
        group.forget(&"k");

        let second = {
            let group = group.clone();
            let runs = Arc::clone(&runs);
            go::spawn(async move {
                group
                    .do_("k", async move {
                        runs.fetch_add(1, Ordering::SeqCst);
                        go::sleep(Duration::from_millis(100)).await;
                        2
                    })
                    .await
            })
        };

        let (first_value, _) = first.await.unwrap();
        let (second_value, _) = second.await.unwrap();
        assert_eq!(first_value, 1);
        assert_eq!(second_value, 2);
        assert_eq!(
            runs.load(Ordering::SeqCst),
            2,
            "forget must allow a second concurrent execution"
        );
    });
}

/// The receiver returned by `do_chan` must deliver a `(value, shared)` pair
/// carrying the future's value.
#[test]
fn singleflight_do_chan_yields_result() {
    run_async!(async {
        let group = go::singleflight::Group::<&str, i32>::new();
        let results = group.do_chan("k", async { 99 });

        let (value, _shared) = results.recv().await.unwrap();
        assert_eq!(value, 99);
    });
}

/// A `do_` call that is cut short by a timeout must not block a later `do_` on
/// the same key from completing with its own value.
#[test]
fn singleflight_abandoned_leader_does_not_hang() {
    use std::time::Duration;

    run_async!(async {
        let group = go::singleflight::Group::<&str, i32>::new();

        // The leader would need 500 ms, but the timeout fires after 20 ms and
        // the leader future is dropped with it.
        let stalled = group.do_("k", async {
            go::sleep(Duration::from_millis(500)).await;
            1
        });
        let outcome = go::timeout(Duration::from_millis(20), stalled).await;
        assert!(
            outcome.is_err(),
            "the leader future should have been cancelled"
        );

        // A new call on the same key must still finish (no stuck entry).
        let (value, _) = group.do_("k", async { 42 }).await;
        assert_eq!(value, 42);
    });
}