safina-executor 0.3.3

Safe async runtime
Documentation
//! [![crates.io version](https://img.shields.io/crates/v/safina-executor.svg)](https://crates.io/crates/safina-executor)
//! [![license: Apache 2.0](https://gitlab.com/leonhard-llc/safina-rs/-/raw/main/license-apache-2.0.svg)](http://www.apache.org/licenses/LICENSE-2.0)
//! [![unsafe forbidden](https://gitlab.com/leonhard-llc/safina-rs/-/raw/main/unsafe-forbidden-success.svg)](https://github.com/rust-secure-code/safety-dance/)
//! [![pipeline status](https://gitlab.com/leonhard-llc/safina-rs/badges/main/pipeline.svg)](https://gitlab.com/leonhard-llc/safina-rs/-/pipelines)
//!
//! An async executor.
//!
//! It is part of [`safina`](https://crates.io/crates/safina), a safe async runtime.
//!
//! # Features
//! - Spawn async tasks to execute on a threadpool.
//! - Schedule closures or `FnOnce` to run on a separate thread pool
//!   for blocking jobs.
//! - Supports multiple executors.
//! - `forbid(unsafe_code)`
//! - Depends only on `std`
//! - Good test coverage (100%)
//!
//! # Limitations
//! - Allocates memory
//! - Not optimized
//!
//! # Documentation
//! <https://docs.rs/safina-executor>
//!
//! # Examples
//! ```rust
//! let executor = safina_executor::Executor::default();
//! let (sender, receiver) = std::sync::mpsc::channel();
//! executor.spawn(async move {
//!     sender.send(()).unwrap();
//! });
//! receiver.recv().unwrap();
//! ```
//!
//! ```rust
//! # async fn prepare_request() -> Result<(), std::io::Error> { Ok(()) }
//! # async fn execute_request() -> Result<(), std::io::Error> { Ok(()) }
//! # fn f() -> Result<(), std::io::Error> {
//! let result = safina_executor::block_on(async {
//!     prepare_request().await?;
//!     execute_request().await
//! })?;
//! # Ok(())
//! # }
//! # f().unwrap()
//! ```
//!
//! ```rust
//! # fn read_file1() -> Result<(), std::io::Error> { Ok(()) }
//! # fn read_file2() -> Result<(), std::io::Error> { Ok(()) }
//! # async fn f() -> Result<(), std::io::Error> {
//! let result = safina_executor::schedule_blocking(|| {
//!     read_file1()?;
//!     read_file2()
//! }).async_recv().await.unwrap()?;
//! # Ok(())
//! # }
//! # let executor = safina_executor::Executor::default();
//! # executor.block_on(f()).unwrap();
//! ```
//!
//! # Alternatives
//! - [`async-executor`](https://crates.io/crates/async-executor)
//!   - Popular
//!   - Dependencies have some `unsafe` code
//! - [`futures-executor`](https://crates.io/crates/futures-executor)
//!   - Very popular
//!   - Full of `unsafe`
//! - [`tokio-executor`](https://crates.io/crates/tokio-executor)
//!   - Very popular
//!   - Fast
//!   - Internally complicated
//!   - Full of `unsafe`
//! - [`executors`](https://crates.io/crates/executors)
//!   - Dependencies have lots of `unsafe` code
//! - [`bastion-executor`](https://crates.io/crates/bastion-executor)
//!   - Lots of `unsafe` code
//! - [`rayon_core`](https://crates.io/crates/rayon-core)
//!   - Generous amounts of `unsafe` code
//! - [`pollster`](https://crates.io/crates/pollster)
//!   - Minimal
//!   - Has a little `unsafe` code
//! - [`lelet`](https://crates.io/crates/lelet)
//!   - Automatically scales worker thread pool
//!   - Lots of `unsafe` code
//! - [`fibers`](https://crates.io/crates/fibers)
//!   - Dependencies are full of unsafe
//! - [`nostd_async`](https://crates.io/crates/nostd_async)
//!   - Has some `unsafe` code
//! - [`embedded-executor`](https://crates.io/crates/embedded-executor)
//!   - Generous amounts of `unsafe` code
//! - [`spin_on`](https://crates.io/crates/spin_on)
//!   - Minimal
//! - [`pasts`](https://crates.io/crates/pasts)
//! - [`switchyard`](https://crates.io/crates/switchyard)
//!   - Lots of `unsafe` code
//! - [`sealrs`](https://crates.io/crates/sealrs)
//! - [`rusty_pool`](https://crates.io/crates/rusty_pool)
//!   - Automatically adjusts thread pool
//!   - Dependencies are full of unsafe
//!
//! # Changelog
//! - v0.3.3 - Eliminate spurious "resumed after completion" worker thread panics.
//! - v0.3.2 - Re-export `safina_sync::Receiver` and `safina_threadpool::NewThreadPoolError`.
//! - v0.3.1 - Use `safina-async` v0.2.1.
//! - v0.3.0 - `schedule_blocking` to return new `safina_sync::Receiver`.
//! - v0.2.1 - Update docs.
//! - v0.2.0
//!   - `Executor::new` and `Executor::with_name` to return `Result`.
//!   - Upgrade to `safina-threadpool` v0.2.0.
//! - v0.1.7 - `block_on` functions to take futures that are not `Send`.
//! - v0.1.6 - Fix deadlock in `block_on` and `block_on_unpin` when task is awakened a second time.
//! - v0.1.5 - Support stable Rust!  Needs 1.51+.
//! - v0.1.4 - Add
//!   [`schedule_blocking`](https://docs.rs/safina-executor/latest/safina_executor/fn.schedule_blocking.html)
//!   and
//!   [`Executor::schedule_blocking`](https://docs.rs/safina-executor/latest/safina_executor/struct.Executor.html#method.schedule_blocking)
//! - v0.1.3
//!   - Removed global executor.  Users must explicitly create executor.
//!   - Removed dependency on unstable
//!     [`OnceCell`](https://doc.rust-lang.org/std/lazy/struct.OnceCell.html).
//!   - Uses [`safina_threadpool`](https://crates.io/crates/safina-threadpool)
//!     internally.
//! - v0.1.2 - Let callers pass futures to `spawn` and `block_on` without
//!   using `Box::pin`.
//!   Add `spawn_unpin` and `block_on_unpin` for folks who need to avoid allocating.
//!   so callers don't have to.
//! - v0.1.1 - Fix badge and update readme
//! - v0.1.0 - Renamed from `safina`
//!
//! # TO DO
//! - Add a stress test
//! - Add a benchmark.  See benchmarks in <https://crates.io/crates/executors>
//! - Add an `#[async_main]` macro
//! - Look into using [`flume`](https://crates.io/crates/flume)
//!   to eliminate the receiver mutex and reduce contention.
//!
//! # Release Process
//! 1. Edit `Cargo.toml` and bump version number.
//! 1. Run `./release.sh`
#![forbid(unsafe_code)]

use core::cell::Cell;
use core::future::Future;
use core::pin::Pin;
use core::task::Poll;
pub use safina_sync::Receiver;
pub use safina_threadpool::NewThreadPoolError;
use safina_threadpool::ThreadPool;
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Mutex, Weak};

thread_local! {
    static EXECUTOR: Cell<Weak<Executor>> = Cell::new(Weak::new());
}

/// Gets the `Executor` from thread-local storage.
///
/// This is a low-level function.
/// You probably want to call [`safina_executor::spawn`](fn.spawn.html)
/// and other functions which use this function internally.
#[must_use]
pub fn get_thread_executor() -> Option<Arc<Executor>> {
    EXECUTOR.with(|cell| {
        let weak = cell.take();
        let result = weak.upgrade();
        cell.set(weak);
        result
    })
}

/// Guard returned by [`set_thread_executor`](fn.set_thread_executor.html).
///
/// On drop, it removes the thread-local reference to the executor.
pub struct ThreadExecutorGuard;
impl Drop for ThreadExecutorGuard {
    fn drop(&mut self) {
        EXECUTOR.with(std::cell::Cell::take);
    }
}

/// Sets `executor` as the `Executor` for the current thread,
/// saving it to thread-local storage.
///
/// Calls to [`safina_executor::spawn`](fn.spawn.html) and other crate-level
/// functions will use this executor.
///
/// Returns a guard struct.
/// When the guard drops, it removes `executor` from thread-local storage.
///
/// This is a low-level function.  You probably don't need to use this.
#[must_use]
pub fn set_thread_executor(executor: Weak<Executor>) -> ThreadExecutorGuard {
    EXECUTOR.with(|cell| cell.set(executor));
    ThreadExecutorGuard {}
}

/// A collection of threads for executing tasks and blocking jobs.
///
/// The methods of this struct are defined on `&Arc<Executor>`.
pub struct Executor {
    async_pool: ThreadPool,
    blocking_pool: ThreadPool,
}

impl Executor {
    /// Creates a new executor with 4 async threads and 4 blocking threads.
    ///
    /// # Panics
    /// Panics when it fails to start the threads.
    #[must_use]
    pub fn default() -> Arc<Self> {
        Self::new(4, 4).unwrap()
    }

    /// Creates a new executor.
    ///
    /// `num_async_threads` is the number of threads to use for executing async
    /// tasks.
    ///
    /// `num_blocking_threads` is the number of threads to use for executing
    /// blocking jobs like connecting TCP sockets and reading files.
    ///
    /// You probably want to use [`default`](#method.default) instead of this.
    ///
    /// # Errors
    /// Returns an error when a parameter is invalid or it fails to start threads.
    pub fn new(
        num_async_threads: usize,
        num_blocking_threads: usize,
    ) -> Result<Arc<Self>, NewThreadPoolError> {
        Self::with_name("async", num_async_threads, "blocking", num_blocking_threads)
    }

    /// Creates a new executor with thread names prefixed with `name`.
    ///
    /// For example, `with_name("api_async", 4, "api_blocking", 100)`
    /// creates 4 threads named `"api_async0"` through `"api_async3"`
    /// and 100 threads named `"api_blocking0"` through `"api_blocking99"`.
    ///
    /// # Errors
    /// Returns an error when a parameter is invalid or it fails to start threads.
    pub fn with_name(
        async_threads_name: &'static str,
        num_async_threads: usize,
        blocking_threads_name: &'static str,
        num_blocking_threads: usize,
    ) -> Result<Arc<Self>, NewThreadPoolError> {
        Ok(Arc::new(Self {
            async_pool: ThreadPool::new(async_threads_name, num_async_threads)?,
            blocking_pool: ThreadPool::new(blocking_threads_name, num_blocking_threads)?,
        }))
    }

    /// Schedules `func` to run on any available thread in the blocking thread pool.
    ///
    /// Returns immediately.
    ///
    /// Use the returned receiver to get the result of `func`.
    /// If `func` panics, the receiver returns `RecvError`.
    ///
    /// Puts `func` in a
    /// [`Box`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html)
    /// before adding it to the thread pool queue.
    pub fn schedule_blocking<T, F>(self: &Arc<Self>, func: F) -> Receiver<T>
    where
        T: Send + 'static,
        F: (FnOnce() -> T) + Send + 'static,
    {
        let (sender, receiver) = safina_sync::oneshot();
        let weak_self = Arc::downgrade(self);
        self.blocking_pool.schedule(move || {
            let _guard = set_thread_executor(weak_self);
            let _result = sender.send(func());
        });
        receiver
    }

    /// Adds a task that will execute `fut`.
    ///
    /// The task runs on any available worker thread.
    /// The task runs until `fut` completes or the Executor is dropped.
    ///
    /// Returns immediately.
    ///
    /// Uses
    /// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
    /// to make the future
    /// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
    /// You can use [`spawn_unpin`](#method.spawn_unpin) to avoid this allocation.
    ///
    /// Example:
    /// ```rust
    /// # async fn an_async_fn() -> Result<(), std::io::Error> { Ok(()) }
    /// # fn f() {
    /// let executor = safina_executor::Executor::default();
    /// executor.spawn(async move {
    ///     an_async_fn().await.unwrap();
    /// });
    /// # }
    /// ```
    pub fn spawn(self: &Arc<Self>, fut: impl (Future<Output = ()>) + Send + 'static) {
        self.spawn_unpin(Box::pin(fut));
    }

    /// Adds a task that will execute `fut`.
    ///
    /// The task runs on any available worker thread.
    /// The task runs until `fut` completes or the Executor is dropped.
    ///
    /// Returns immediately.
    ///
    /// Note that `fut` must be
    /// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
    /// You can use
    /// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
    /// to make it Unpin.  The [`spawn`](#method.spawn) function does this for you.
    /// Or use [`pin_utils::pin_mut`](https://docs.rs/pin-utils/latest/pin_utils/macro.pin_mut.html)
    /// to do it with unsafe code that does not allocate memory.
    pub fn spawn_unpin(self: &Arc<Self>, fut: impl (Future<Output = ()>) + Send + Unpin + 'static) {
        let task: Arc<Mutex<Option<Box<dyn Future<Output = ()> + Send + Unpin>>>> =
            Arc::new(Mutex::new(Some(Box::new(fut))));
        let weak_self = Arc::downgrade(self);
        self.async_pool.schedule(move || poll_task(task, weak_self));
    }

    /// Executes the future on the current thread and returns its result.
    ///
    /// `fut` can call [`spawn`] to create tasks.
    /// Those tasks run on the executor and will continue even after
    /// `fut` completes and this call returns.
    ///
    /// Uses
    /// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
    /// to make the future
    /// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
    /// You can use [`block_on_unpin`](#method.block_on_unpin) to avoid this allocation.
    ///
    /// # Panics
    /// Panics if the future panics.
    ///
    /// # Example
    /// ```rust
    /// # async fn prepare_request() -> Result<(), std::io::Error> { Ok(()) }
    /// # async fn execute_request() -> Result<(), std::io::Error> { Ok(()) }
    /// # fn f() -> Result<(), std::io::Error> {
    /// let executor = safina_executor::Executor::default();
    /// let result = executor.block_on(async {
    ///     prepare_request().await?;
    ///     execute_request().await
    /// })?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn block_on<R>(self: &Arc<Self>, fut: impl (Future<Output = R>) + 'static) -> R {
        self.block_on_unpin(Box::pin(fut))
    }

    /// Executes the future on the current thread and returns its result.
    ///
    /// `fut` can call [`spawn`] to create tasks.
    /// Those tasks run on the executor and will continue even after
    /// `fut` completes and this call returns.
    ///
    /// Note that `fut` must be
    /// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
    /// You can use
    /// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
    /// to make it Unpin.  The [`block_on`](#method.block_on) function does this for you.
    /// Or use [`pin_utils::pin_mut`](https://docs.rs/pin-utils/latest/pin_utils/macro.pin_mut.html)
    /// to do it with unsafe code that does not allocate memory.
    ///
    /// # Panics
    /// Panics if the future panics.
    pub fn block_on_unpin<R>(
        self: &Arc<Self>,
        fut: impl (Future<Output = R>) + Unpin + 'static,
    ) -> R {
        let _guard = set_thread_executor(Arc::downgrade(self));
        block_on_unpin(fut)
    }
}

impl Default for Executor {
    fn default() -> Self {
        Arc::try_unwrap(Executor::default()).unwrap_or_else(|_| unreachable!())
    }
}

/// Schedules `func` to run on any available thread in the blocking thread pool.
///
/// Returns immediately.
///
/// Use the returned receiver to get the result of `func`.
/// If `func` panics, the receiver returns `RecvError`.
///
/// Puts `func` in a
/// [`Box`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html)
/// before adding it to the thread pool queue.
///
/// # Panics
/// Panics if the caller is not running on an [`Executor`](struct.Executor.html).
pub fn schedule_blocking<T, F>(func: F) -> Receiver<T>
where
    T: Send + 'static,
    F: (FnOnce() -> T) + Send + 'static,
{
    if let Some(executor) = get_thread_executor() {
        executor.schedule_blocking(func)
    } else {
        // See explanation in `spawn_unpin`.
        panic!(
            "called from outside a task; check for duplicate safina-executor crate: cargo tree -d"
        );
    }
}

#[allow(clippy::needless_pass_by_value)]
fn poll_task(
    task: Arc<Mutex<Option<Box<dyn Future<Output = ()> + Send + Unpin>>>>,
    executor: Weak<Executor>,
) {
    if executor.strong_count() > 0 {
        let waker =
            std::task::Waker::from(Arc::new(TaskWaker::new(task.clone(), executor.clone())));
        let mut cx = std::task::Context::from_waker(&waker);
        let mut opt_fut_guard = task.lock().unwrap();
        if let Some(fut) = opt_fut_guard.as_mut() {
            let _guard = set_thread_executor(executor);
            match Pin::new(&mut *fut).poll(&mut cx) {
                Poll::Ready(()) => {
                    opt_fut_guard.take();
                }
                Poll::Pending => {}
            }
        }
    }
}

struct TaskWaker {
    task: Arc<Mutex<Option<Box<dyn Future<Output = ()> + Send + Unpin>>>>,
    executor: Weak<Executor>,
}
impl TaskWaker {
    pub fn new(
        task: Arc<Mutex<Option<Box<dyn Future<Output = ()> + Send + Unpin>>>>,
        executor: Weak<Executor>,
    ) -> Self {
        Self { task, executor }
    }
}
impl std::task::Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        if let Some(ref executor) = self.executor.upgrade() {
            let task_clone = self.task.clone();
            let executor_weak = Arc::downgrade(executor);
            executor
                .async_pool
                .schedule(move || poll_task(task_clone, executor_weak));
        }
    }
}

/// Creates a new task to execute `fut` and schedules it for immediate execution.
///
/// Returns immediately.
///
/// Uses
/// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
/// to make the future
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// You can use [`spawn_unpin`](#method.spawn_unpin) to avoid this allocation.
///
/// # Panics
/// Panics if the caller is not running on an [`Executor`](struct.Executor.html).
///
/// # Example
/// ```rust
/// # async fn an_async_fn() -> Result<(), std::io::Error> { Ok(()) }
/// # fn f() {
/// let executor = safina_executor::Executor::default();
/// executor.spawn(async move {
///     safina_executor::spawn(async move {
///         an_async_fn().await.unwrap();
///     });
/// });
/// # }
/// ```
pub fn spawn(fut: impl (Future<Output = ()>) + Send + 'static) {
    spawn_unpin(Box::pin(fut));
}

/// Creates a new task to execute `fut` and schedules it for immediate execution.
///
/// Returns immediately.
///
/// Note that `fut` must be
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// You can use
/// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
/// to make it Unpin.  The [`spawn`](#method.spawn) function does this for you.
/// Or use [`pin_utils::pin_mut`](https://docs.rs/pin-utils/latest/pin_utils/macro.pin_mut.html)
/// to do it with unsafe code that does not allocate memory.
///
/// # Panics
/// Panics if the caller is not running on an [`Executor`](struct.Executor.html).
pub fn spawn_unpin(fut: impl (Future<Output = ()>) + Send + Unpin + 'static) {
    if let Some(executor) = get_thread_executor() {
        let task: Arc<Mutex<Option<Box<dyn Future<Output = ()> + Send + Unpin>>>> =
            Arc::new(Mutex::new(Some(Box::new(fut))));
        let executor_weak = Arc::downgrade(&executor);
        executor
            .async_pool
            .schedule(move || poll_task(task, executor_weak));
    } else {
        // Note: This panic can happen when there are two versions of the safina-executor crate.
        // Example:  You use the `async_test` macro.  It creates an executor with the first
        // version of the crate.  The executor is stored in the first crate's thread-local storage.
        // The second crate's thread-local storage is empty.  If the test code calls the second
        // crate's spawn or schedule functions, it hits this error.
        panic!(
            "called from outside a task; check for duplicate safina-executor crate: cargo tree -d"
        );
    }
}

/// Executes the future on the current thread and returns its result.
///
/// Does not create an executor.
/// Use [`Executor::block_on`](struct.Executor.html#method.block_on)
/// if you need `fut` to spawn new tasks.
///
/// Uses
/// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
/// to make the future
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// You can use [`block_on_unpin`](#method.block_on_unpin) to avoid this allocation.
///
/// # Panics
/// Panics if the future panics.
///
/// Panics if `fut` calls [`spawn`](fn.spawn.html) to create a new task.
///
/// # Example
/// ```rust
/// # async fn prepare_request() -> Result<(), std::io::Error> { Ok(()) }
/// # async fn execute_request() -> Result<(), std::io::Error> { Ok(()) }
/// # fn f() -> Result<(), std::io::Error> {
/// let result = safina_executor::block_on(async {
///     prepare_request().await?;
///     execute_request().await
/// })?;
/// # Ok(())
/// # }
/// ```
pub fn block_on<R>(fut: impl (Future<Output = R>) + 'static) -> R {
    block_on_unpin(Box::pin(fut))
}

/// Executes the future on the current thread and returns its result.
///
/// Does not create an executor.
/// Use [`Executor::block_on`](struct.Executor.html#method.block_on)
/// if you need `fut` to spawn new tasks.
///
/// Note that `fut` must be
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// You can use
/// [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
/// to make it Unpin.  The [`block_on`](#method.block_on) function does this for you.
/// Or use [`pin_utils::pin_mut`](https://docs.rs/pin-utils/latest/pin_utils/macro.pin_mut.html)
/// to do it with unsafe code that does not allocate memory.
///
/// # Panics
/// Panics if the future panics.
///
/// Panics if `fut` calls [`spawn`](fn.spawn.html) to create a new task.
pub fn block_on_unpin<R>(mut fut: impl (Future<Output = R>) + Unpin + 'static) -> R {
    struct BlockOnTaskWaker(Mutex<Option<SyncSender<()>>>);
    impl std::task::Wake for BlockOnTaskWaker {
        fn wake(self: Arc<Self>) {
            if let Some(sender) = self.0.lock().unwrap().take() {
                let _ = sender.send(());
            }
        }
    }
    loop {
        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
        let waker = std::task::Waker::from(Arc::new(BlockOnTaskWaker(Mutex::new(Some(sender)))));
        let mut cx = std::task::Context::from_waker(&waker);
        if let Poll::Ready(result) = Pin::new(&mut fut).poll(&mut cx) {
            return result;
        }
        receiver.recv().unwrap();
    }
}