rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Cross-runtime signaler tests.
//!
//! Compiled under either `--features tokio` or `--features smol
//! --no-default-features`. The active runtime drives the test executor;
//! when both features are enabled, tokio takes precedence.

use super::{AsyncSignaler, RuntimeSignaler};
use std::sync::Arc;

/// Blocks the current thread on `fut` using whichever runtime the enabled
/// Cargo feature selects, and returns the future's output.
///
/// * `tokio` feature: a fresh current-thread runtime with all drivers enabled
///   is built for the call; building it panics with `"rt"` on failure.
/// * `smol` feature without `tokio`: the future is driven by
///   `async_io::block_on`.
///
/// With neither feature enabled no branch is compiled in, so the function
/// does not type-check.
fn run<F>(fut: F) -> F::Output
where
    F: std::future::Future,
{
    #[cfg(feature = "tokio")]
    {
        let runtime = ::tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("rt");
        runtime.block_on(fut)
    }
    #[cfg(all(feature = "smol", not(feature = "tokio")))]
    {
        ::async_io::block_on(fut)
    }
}

/// A signal raised before anyone is waiting must still be seen by the next
/// await.
#[test]
fn signal_then_await_returns_immediately() {
    let scenario = async {
        let signaler = Arc::new(RuntimeSignaler::new().expect("signaler"));

        // Raise first, await second: the await is expected to complete
        // without parking. If the signal were lost, this would never finish.
        signaler.signal();
        signaler.signaled().await;
    };
    run(scenario);
}

/// A task that is already awaiting the signaler must be woken by a signal
/// raised afterwards.
#[test]
fn await_then_signal_wakes() {
    run(async {
        let signaler = Arc::new(RuntimeSignaler::new().expect("signaler"));

        // The waiter owns its own handle so the firer can take the original.
        let waiter = {
            let handle = Arc::clone(&signaler);
            async move { handle.signaled().await }
        };

        let firer = async move {
            // Give up the executor several times before signaling, so the
            // waiter has a chance to be polled and park first.
            for _round in 0..16 {
                #[cfg(feature = "tokio")]
                ::tokio::task::yield_now().await;
                #[cfg(all(feature = "smol", not(feature = "tokio")))]
                ::smol::future::yield_now().await;
            }
            signaler.signal();
        };

        // Both halves run concurrently on the same task; the test finishes
        // only once the waiter has been woken.
        ::futures::future::join(waiter, firer).await;
    });
}

/// Many signals raised back-to-back are consumed by a single await, and the
/// signaler keeps working for a later signal/await round.
///
/// NOTE(review): the second phase passes as long as the second await
/// eventually completes. On its own it does not distinguish a waiter that
/// parked from one satisfied by a leftover signal from the burst.
#[test]
fn coalesces_multiple_signals_into_one_wake() {
    run(async {
        let signaler = Arc::new(RuntimeSignaler::new().expect("signaler"));

        // Phase 1: a burst of signals with nobody waiting, then one await
        // that is expected to drain the whole burst.
        (0..1000).for_each(|_| signaler.signal());
        signaler.signaled().await;

        // Phase 2: await again and pair it with a fresh signal that is
        // raised only after a few yields.
        let waiter = {
            let handle = Arc::clone(&signaler);
            async move { handle.signaled().await }
        };
        let firer = async move {
            for _round in 0..16 {
                #[cfg(feature = "tokio")]
                ::tokio::task::yield_now().await;
                #[cfg(all(feature = "smol", not(feature = "tokio")))]
                ::smol::future::yield_now().await;
            }
            signaler.signal();
        };
        ::futures::future::join(waiter, firer).await;
    });
}

/// A signal raised from a plain OS thread must wake a task that is awaiting
/// on the async runtime.
#[test]
fn cross_thread_signal_wakes_runtime_task() {
    run(async {
        let signaler = Arc::new(RuntimeSignaler::new().expect("signaler"));
        let remote = Arc::clone(&signaler);

        // The handle is bound but never joined; the thread sleeps briefly
        // before signaling so that it fires 10 ms after being spawned.
        let _handle = std::thread::spawn(move || {
            let delay = std::time::Duration::from_millis(10);
            std::thread::sleep(delay);
            remote.signal();
        });

        // Completes only once the other thread's signal has been observed.
        signaler.signaled().await;
    });
}