safina-executor 0.3.3

Safe async runtime
Documentation
#![forbid(unsafe_code)]

use core::fmt::Debug;
use core::ops::Range;
use core::time::Duration;
use safina_executor::{get_thread_executor, Executor};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

/// Checks that `value` lies inside the half-open `range`.
///
/// # Panics
/// Panics if `range` is empty, or if `value` is not contained in `range`.
fn assert_in_range<T>(range: Range<T>, value: &T)
where
    T: PartialOrd + Debug,
{
    // A range is non-empty exactly when its start is strictly below its end.
    assert!(range.start < range.end, "invalid range {:?}", range);
    let is_inside = range.contains(value);
    assert!(
        is_inside,
        "measured concurrency value {:?} out of range {:?}",
        value, range,
    );
}

/// Verifies that the time passed since `before` falls inside `range_ms`, a half-open range
/// given in milliseconds.
///
/// # Panics
/// Panics if `range_ms` is empty, or if the elapsed time since `before` is outside of
/// `range_ms`.
pub fn assert_elapsed(before: Instant, range_ms: Range<u64>) {
    // A range is non-empty exactly when its start is strictly below its end.
    assert!(range_ms.start < range_ms.end, "invalid range {:?}", range_ms);
    let elapsed = before.elapsed();
    let lower = Duration::from_millis(range_ms.start);
    let upper = Duration::from_millis(range_ms.end);
    let duration_range = lower..upper;
    let is_inside = duration_range.contains(&elapsed);
    assert!(
        is_inside,
        "{:?} elapsed, out of range {:?}",
        elapsed, duration_range
    );
}

/// Spawns `num_jobs` tasks on `executor` that each block their thread for 100 ms, waits for
/// every task to report completion, and returns the total wall-clock time expressed in
/// multiples of that 100 ms wait.
///
/// # Panics
/// Panics if waiting for a completion message fails; each wait is limited to 500 ms.
fn measure_async_concurrency(executor: &Arc<Executor>, num_jobs: usize) -> f32 {
    const WAIT_DURATION: Duration = Duration::from_millis(100);
    let started = Instant::now();
    let (done_tx, done_rx) = std::sync::mpsc::channel();
    for _ in 0..num_jobs {
        let job_tx = done_tx.clone();
        executor.spawn(async move {
            // Blocks the thread on purpose so the task occupies it for the whole wait.
            std::thread::sleep(WAIT_DURATION);
            job_tx.send(()).unwrap();
        });
    }
    // Drop the original sender so that only the tasks' clones keep the channel open.
    drop(done_tx);
    (0..num_jobs).for_each(|_| done_rx.recv_timeout(Duration::from_millis(500)).unwrap());
    started.elapsed().as_secs_f32() / WAIT_DURATION.as_secs_f32()
}

/// Asserts that `num_tasks` thread-blocking tasks on `executor` take at least 1.0 and less
/// than 1.9 wait periods, and that `num_tasks + 1` of them take at least 2.0 and less than
/// 2.9 wait periods.
fn expect_async_concurrency(executor: &Arc<Executor>, num_tasks: usize) {
    let at_capacity = measure_async_concurrency(executor, num_tasks);
    assert_in_range(1.0_f32..1.90, &at_capacity);

    let over_capacity = measure_async_concurrency(executor, num_tasks + 1);
    assert_in_range(2.0_f32..2.90, &over_capacity);
}

/// Schedules `num_jobs` blocking jobs on `executor` that each sleep for 100 ms, awaits all of
/// their results, and returns the total wall-clock time expressed in multiples of that
/// 100 ms wait.
///
/// # Panics
/// Panics if a result is not received before the shared deadline (three wait periods after
/// scheduling finished), or if receiving a result fails.
async fn measure_blocking_concurrency(executor: &Arc<Executor>, num_jobs: usize) -> f32 {
    const WAIT_DURATION: Duration = Duration::from_millis(100);
    let started = Instant::now();
    // Schedule every job up front, keeping one result receiver per job.
    let receivers: Vec<_> = (0..num_jobs)
        .map(|_| executor.schedule_blocking(|| std::thread::sleep(WAIT_DURATION)))
        .collect();
    let deadline = Instant::now() + WAIT_DURATION * 3;
    for mut receiver in receivers {
        let outcome =
            safina_timer::with_deadline(async move { receiver.async_recv().await }, deadline)
                .await;
        // The outer result comes from the deadline wrapper, the inner one from the receiver.
        outcome.unwrap().unwrap();
    }
    started.elapsed().as_secs_f32() / WAIT_DURATION.as_secs_f32()
}

/// Asserts that `num_tasks` blocking jobs on `executor` take at least 1.0 and less than 1.9
/// wait periods, and that `num_tasks + 1` of them take at least 2.0 and less than 2.9 wait
/// periods.
async fn expect_blocking_concurrency(executor: &Arc<Executor>, num_tasks: usize) {
    let at_capacity = measure_blocking_concurrency(executor, num_tasks).await;
    assert_in_range(1.0_f32..1.90, &at_capacity);

    let over_capacity = measure_blocking_concurrency(executor, num_tasks + 1).await;
    assert_in_range(2.0_f32..2.90, &over_capacity);
}

/// Turns a caught panic payload into a message string.
trait UnwindAnyToString {
    /// Returns the payload's text when it is a `&'static str` or a `String`; otherwise returns
    /// the payload's `Debug` representation.
    fn any_to_string(&self) -> String;
}
impl UnwindAnyToString for Box<dyn std::any::Any + Send> {
    fn any_to_string(&self) -> String {
        if let Some(text) = self.downcast_ref::<&'static str>() {
            return String::from(*text);
        }
        match self.downcast_ref::<String>() {
            Some(owned) => owned.to_owned(),
            None => format!("{:?}", self),
        }
    }
}

/// Checks how much time has passed since `before` against `range_ms`, a half-open range given
/// in milliseconds.
///
/// # Panics
/// Panics if `range_ms` is empty, or if the time elapsed since `before` is not in `range_ms`.
pub fn expect_elapsed(before: Instant, range_ms: Range<u64>) {
    if range_ms.is_empty() {
        panic!("invalid range {:?}", range_ms);
    }
    let elapsed = before.elapsed();
    let bounds = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end);
    if !bounds.contains(&elapsed) {
        panic!("{:?} elapsed, out of range {:?}", elapsed, bounds);
    }
}

/// Spawns a task on `executor` that reports the name of the thread it runs on, and returns
/// that name.
///
/// # Panics
/// Panics if the name does not arrive within 500 ms. The spawned task panics if its thread
/// has no name.
fn get_async_thread_name(executor: &Arc<Executor>) -> String {
    let (name_tx, name_rx) = std::sync::mpsc::channel();
    executor.spawn(async move {
        let current = std::thread::current();
        let name = current.name().unwrap().to_string();
        name_tx.send(name).unwrap();
    });
    let wait_limit = Duration::from_millis(500);
    name_rx.recv_timeout(wait_limit).unwrap()
}

/// Schedules a blocking job on `executor` that reports the name of the thread it runs on, and
/// returns that name.
///
/// # Panics
/// Panics if receiving the job's result fails. The job itself panics if its thread has no
/// name.
async fn get_blocking_thread_name(executor: &Arc<Executor>) -> String {
    let mut name_receiver = executor.schedule_blocking(|| {
        let current = std::thread::current();
        current.name().unwrap().to_string()
    });
    name_receiver.async_recv().await.unwrap()
}

/// Checks that `Executor::default()` runs 4 async tasks and 4 blocking jobs at a time, and
/// that its threads are named `async<digit>` and `blocking<digit>` with a digit in `0..=3`.
#[test]
fn test_default() {
    let executor = Executor::default();
    expect_async_concurrency(&executor, 4);
    let async_name = get_async_thread_name(&executor);
    assert_eq!(
        "async",
        async_name
            .strip_suffix(|c: char| "0123".contains(c))
            .unwrap()
    );
    executor.block_on(async {
        // Inside `block_on`, the executor is looked up from the current thread.
        let executor = get_thread_executor().unwrap();
        expect_blocking_concurrency(&executor, 4).await;
        let blocking_name = get_blocking_thread_name(&executor).await;
        assert_eq!(
            "blocking",
            blocking_name
                .strip_suffix(|c: char| "0123".contains(c))
                .unwrap()
        );
    });
}

/// Same checks as for `Executor::default()`, but the executor is built through the `Default`
/// trait and wrapped in an `Arc` by the test.
#[test]
#[allow(clippy::default_trait_access)]
fn test_default_trait() {
    let executor: Arc<Executor> = Arc::new(Default::default());
    expect_async_concurrency(&executor, 4);
    let async_name = get_async_thread_name(&executor);
    assert_eq!(
        "async",
        async_name
            .strip_suffix(|c: char| "0123".contains(c))
            .unwrap()
    );
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        expect_blocking_concurrency(&executor_clone, 4).await;
        let blocking_name = get_blocking_thread_name(&executor_clone).await;
        assert_eq!(
            "blocking",
            blocking_name
                .strip_suffix(|c: char| "0123".contains(c))
                .unwrap()
        );
    });
}

/// Checks that `Executor::new(2, 2)` runs 2 async tasks and 2 blocking jobs at a time and
/// uses the default `async` / `blocking` thread-name prefixes.
#[test]
fn test_new() {
    let executor = Executor::new(2, 2).unwrap();
    expect_async_concurrency(&executor, 2);
    let async_name = get_async_thread_name(&executor);
    assert_eq!(
        "async",
        async_name.strip_suffix(|c: char| "01".contains(c)).unwrap()
    );
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        expect_blocking_concurrency(&executor_clone, 2).await;
        let blocking_name = get_blocking_thread_name(&executor_clone).await;
        assert_eq!(
            "blocking",
            blocking_name
                .strip_suffix(|c: char| "0123".contains(c))
                .unwrap()
        );
    });
}

/// Checks that `Executor::with_name("exec", 2, "io", 2)` runs 2 async tasks and 2 blocking
/// jobs at a time and names its threads with the `exec` and `io` prefixes.
#[test]
fn test_with_name() {
    let executor = Executor::with_name("exec", 2, "io", 2).unwrap();
    expect_async_concurrency(&executor, 2);
    let async_name = get_async_thread_name(&executor);
    assert_eq!(
        "exec",
        async_name.strip_suffix(|c: char| "01".contains(c)).unwrap()
    );
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        expect_blocking_concurrency(&executor_clone, 2).await;
        let blocking_name = get_blocking_thread_name(&executor_clone).await;
        assert_eq!(
            "io",
            blocking_name
                .strip_suffix(|c: char| "0123".contains(c))
                .unwrap()
        );
    });
}

/// Checks that a boxed, pinned future submitted with `Executor::spawn_unpin` gets executed:
/// the task signals completion over a channel and the test waits for that signal.
#[test]
fn test_spawn_unpin() {
    let executor = safina_executor::Executor::default();
    let (sender, receiver) = std::sync::mpsc::channel();
    executor.spawn_unpin(Box::pin(async move {
        sender.send(()).unwrap();
    }));
    receiver.recv().unwrap();
}

/// Checks that a future submitted with `Executor::spawn` gets executed: the task signals
/// completion over a channel and the test waits for that signal.
#[test]
fn test_spawn() {
    let executor = safina_executor::Executor::default();
    let (sender, receiver) = std::sync::mpsc::channel();
    executor.spawn(async move {
        sender.send(()).unwrap();
    });
    receiver.recv().unwrap();
}

/// Checks that a task which awaits a 100 ms timer sleep is resumed afterwards: its completion
/// signal must arrive at least 100 ms and less than 200 ms after the task was spawned.
#[test]
fn test_waking() {
    safina_timer::start_timer_thread();
    let executor = safina_executor::Executor::default();
    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let started = Instant::now();
    executor.spawn(async move {
        let nap = Duration::from_millis(100);
        safina_timer::sleep_for(nap).await;
        done_tx.send(()).unwrap();
    });
    done_rx.recv().unwrap();
    expect_elapsed(started, 100..200);
}

/// Checks that waking a task after it has already returned `Poll::Ready` does not cause it to
/// be polled again.
///
/// The future sends its waker to the test on every poll and panics on any poll after the
/// first. After waking, the test expects the channel to report an error instead of delivering
/// a second waker.
#[test]
fn test_wake_after_ready() {
    /// A future whose `poll` is delegated to a boxed closure.
    struct FnFuture(Box<dyn (FnMut(&mut Context<'_>) -> Poll<()>) + Send>);
    impl Future for FnFuture {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.get_mut();
            (this.0)(cx)
        }
    }

    safina_timer::start_timer_thread();
    let executor = safina_executor::Executor::default();
    let (waker_tx, waker_rx) = std::sync::mpsc::channel();
    let first_poll = Arc::new(AtomicBool::new(true));
    let future = FnFuture(Box::new(move |cx| {
        let waker = cx.waker().clone();
        waker_tx.send(waker).unwrap();
        let is_first_poll = first_poll.swap(false, Ordering::AcqRel);
        if !is_first_poll {
            panic!("resumed after completion");
        }
        Poll::Ready(())
    }));
    executor.spawn(future);
    let waker = waker_rx.recv().unwrap();
    waker.wake();
    // A second waker must never arrive; the receive is expected to fail instead.
    waker_rx.recv().unwrap_err();
}

/// Checks that an executor created with `Executor::new(1, 1)` keeps running tasks after one
/// of its tasks has panicked.
#[test]
fn test_async_panic() {
    let executor = safina_executor::Executor::new(1, 1).unwrap();

    // Submit a task that panics, then give it time to run.
    executor.spawn(async { panic!("ignore this panic") });
    let settle_time = Duration::from_millis(100);
    std::thread::sleep(settle_time);

    // A task submitted afterwards must still run to completion.
    let (done_tx, done_rx) = std::sync::mpsc::channel();
    executor.spawn(async move {
        done_tx.send(()).unwrap();
    });
    done_rx.recv().unwrap();
}

/// Exercises `Executor::block_on`: returned values, awaiting a timer, panic propagation, and
/// spawning a task from inside the blocked-on future.
#[test]
fn test_executor_block_on() {
    safina_timer::start_timer_thread();
    let executor = safina_executor::Executor::new(1, 1).unwrap();

    // The future's output is handed back to the caller.
    let literal = executor.block_on(async { 3_u8 });
    assert_eq!(3_u8, literal);
    let value = 5_u8;
    let moved = executor.block_on(async move { value });
    assert_eq!(5_u8, moved);

    // Awaiting a 100 ms timer sleep takes at least 100 ms and less than 200 ms.
    let started = Instant::now();
    executor.block_on(async {
        safina_timer::sleep_for(Duration::from_millis(100)).await;
    });
    expect_elapsed(started, 100..200);

    // A panic inside the future reaches the caller with its original message.
    let payload =
        std::panic::catch_unwind(|| executor.block_on(async { panic!("ignore this panic") }))
            .unwrap_err();
    assert_eq!("ignore this panic", payload.any_to_string().as_str());

    // The free function `spawn` works from inside the blocked-on future.
    executor.block_on(async {
        let (done_tx, done_rx) = std::sync::mpsc::channel();
        safina_executor::spawn(async move {
            done_tx.send(()).unwrap();
        });
        done_rx.recv().unwrap();
    });
}

/// Exercises the free function `safina_executor::block_on`: returned values, awaiting a timer,
/// and panic propagation.
#[test]
fn test_block_on() {
    safina_timer::start_timer_thread();

    // The future's output is handed back to the caller.
    let literal = safina_executor::block_on(async { 3_u8 });
    assert_eq!(3_u8, literal);
    let value = 5_u8;
    let moved = safina_executor::block_on(async move { value });
    assert_eq!(5_u8, moved);

    // Awaiting a 100 ms timer sleep takes at least 100 ms and less than 200 ms.
    let started = Instant::now();
    safina_executor::block_on(async {
        safina_timer::sleep_for(Duration::from_millis(100)).await;
    });
    expect_elapsed(started, 100..200);

    // A panic inside the future reaches the caller with its original message.
    let payload = std::panic::catch_unwind(|| {
        safina_executor::block_on(async {
            panic!("ignore this panic");
        })
    })
    .unwrap_err();
    assert_eq!("ignore this panic", payload.any_to_string().as_str());
}

/// Checks that `safina_executor::block_on_unpin` returns the output of a boxed, pinned future.
#[test]
fn test_block_on_unpin() {
    let boxed_future = Box::pin(async { 3_u8 });
    let output = safina_executor::block_on_unpin(boxed_future);
    assert_eq!(3_u8, output);
}

/// Checks that the free function `schedule_blocking`, called inside `block_on`, delivers the
/// closure's return value.
#[test]
fn test_schedule_blocking_simple() {
    let executor = safina_executor::Executor::new(1, 1).unwrap();
    executor.block_on(async {
        let mut result_receiver = safina_executor::schedule_blocking(|| 3_u8);
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(3_u8, result);
    });
}
/// Checks that `Executor::schedule_blocking` delivers the closure's return value.
#[test]
fn test_executor_schedule_blocking_simple() {
    let executor = safina_executor::Executor::new(2, 2).unwrap();
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        let mut result_receiver = executor_clone.schedule_blocking(|| 3_u8);
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(3_u8, result);
    });
}

/// Checks that the free function `schedule_blocking` accepts a `move` closure and delivers
/// the captured value.
#[test]
fn test_schedule_blocking_move() {
    let executor = safina_executor::Executor::new(1, 1).unwrap();
    executor.block_on(async {
        let value = 5_u8;
        let mut result_receiver = safina_executor::schedule_blocking(move || value);
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(5_u8, result);
    });
}
/// Checks that `Executor::schedule_blocking` accepts a `move` closure and delivers the
/// captured value.
#[test]
fn test_executor_schedule_blocking_move() {
    let executor = safina_executor::Executor::new(2, 2).unwrap();
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        let value = 5_u8;
        let mut result_receiver = executor_clone.schedule_blocking(move || value);
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(5_u8, result);
    });
}

/// Checks that awaiting a job scheduled with the free function `schedule_blocking` waits for
/// the job to finish: a job that sleeps 100 ms yields its value after at least 100 ms and
/// less than 200 ms.
#[test]
fn test_schedule_blocking_sleep() {
    let executor = safina_executor::Executor::new(1, 1).unwrap();
    executor.block_on(async {
        let started = Instant::now();
        let mut result_receiver = safina_executor::schedule_blocking(|| {
            std::thread::sleep(Duration::from_millis(100));
            7_u8
        });
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(7_u8, result);
        expect_elapsed(started, 100..200);
    });
}
/// Checks that awaiting a job scheduled with `Executor::schedule_blocking` waits for the job
/// to finish: a job that sleeps 100 ms yields its value after at least 100 ms and less than
/// 200 ms.
#[test]
fn test_executor_schedule_blocking_sleep() {
    let executor = safina_executor::Executor::new(2, 2).unwrap();
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        let started = Instant::now();
        let mut result_receiver = executor_clone.schedule_blocking(|| {
            std::thread::sleep(Duration::from_millis(100));
            7_u8
        });
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(7_u8, result);
        expect_elapsed(started, 100..200);
    });
}

/// Checks that when a job scheduled with the free function `schedule_blocking` panics,
/// awaiting its result yields an error.
#[test]
fn test_schedule_blocking_panic() {
    let executor = safina_executor::Executor::new(1, 1).unwrap();
    executor.block_on(async {
        let outcome = safina_executor::schedule_blocking(|| panic!("ignore this panic"))
            .async_recv()
            .await;
        outcome.unwrap_err();
    });
}
/// Checks that when a job scheduled with `Executor::schedule_blocking` panics, awaiting its
/// result yields an error.
#[test]
fn test_executor_schedule_blocking_panic() {
    let executor = safina_executor::Executor::new(2, 2).unwrap();
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        let outcome = executor_clone
            .schedule_blocking(|| panic!("ignore this panic"))
            .async_recv()
            .await;
        outcome.unwrap_err();
    });
}

/// Checks that a job scheduled with the free function `schedule_blocking` can block on a
/// channel while the scheduling task sleeps on a timer, and that the job's result is
/// delivered once the task sends the value it is waiting for.
#[test]
fn test_schedule_blocking_recv() {
    safina_timer::start_timer_thread();
    let executor = safina_executor::Executor::new(1, 1).unwrap();
    executor.block_on(async {
        let started = Instant::now();
        let (value_tx, value_rx) = std::sync::mpsc::channel();
        // The job blocks until a value is sent below.
        let mut result_receiver =
            safina_executor::schedule_blocking(move || value_rx.recv().unwrap());
        safina_timer::sleep_for(Duration::from_millis(100)).await;
        value_tx.send(9_u8).unwrap();
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(9_u8, result);
        expect_elapsed(started, 100..200);
    });
}
/// Checks that a job scheduled with `Executor::schedule_blocking` can block on a channel
/// while the scheduling task sleeps on a timer, and that the job's result is delivered once
/// the task sends the value it is waiting for.
#[test]
fn test_executor_schedule_blocking_recv() {
    safina_timer::start_timer_thread();
    let executor = safina_executor::Executor::new(2, 2).unwrap();
    let executor_clone = Arc::clone(&executor);
    executor.block_on(async move {
        let started = Instant::now();
        let (value_tx, value_rx) = std::sync::mpsc::channel();
        // The job blocks until a value is sent below.
        let mut result_receiver =
            executor_clone.schedule_blocking(move || value_rx.recv().unwrap());
        safina_timer::sleep_for(Duration::from_millis(100)).await;
        value_tx.send(9_u8).unwrap();
        let result = result_receiver.async_recv().await.unwrap();
        assert_eq!(9_u8, result);
        expect_elapsed(started, 100..200);
    });
}

/// Checks that the free functions `schedule_blocking` and `spawn` panic with a descriptive
/// message when called under the free function `block_on`, where no executor is set up by the
/// test.
#[test]
fn no_executor_running() {
    const EXPECTED_MESSAGE: &str =
        "called from outside a task; check for duplicate safina-executor crate: cargo tree -d";

    let schedule_payload = std::panic::catch_unwind(|| {
        safina_executor::block_on(safina_executor::schedule_blocking(|| 3_u8))
    })
    .unwrap_err();
    assert_eq!(EXPECTED_MESSAGE, schedule_payload.any_to_string().as_str());

    let spawn_payload = std::panic::catch_unwind(|| {
        safina_executor::block_on(async {
            safina_executor::spawn(async {});
        })
    })
    .unwrap_err();
    assert_eq!(EXPECTED_MESSAGE, spawn_payload.any_to_string().as_str());
}