qubit-tokio-executor 0.3.0

Tokio-backed executor and executor service implementations for Qubit Rust libraries
Documentation
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
use qubit_function::Callable;

use qubit_executor::{
    TrackedTask,
    executor::Executor,
    service::SubmissionError,
    task::spi::{
        TaskEndpointPair,
        TaskRunner,
    },
};

/// Executes callable tasks on Tokio's blocking task pool.
///
/// `TokioExecutor` implements [`Executor`] by submitting work to Tokio's
/// blocking task pool and returning the standard tracked task handle.
///
/// # Semantics
///
/// * **`call` schedules work immediately** — [`Executor::call`] runs
///   [`tokio::task::spawn_blocking`] **synchronously** before it returns. A
///   Tokio runtime must **already be active** on the current thread when `call`
///   runs (for example inside an `async` block executed under
///   [`Runtime::block_on`](tokio::runtime::Runtime::block_on) or
///   [`#[tokio::main]`](https://docs.rs/tokio/latest/tokio/attr.main.html)).
///   Calling `call` first and only then entering a runtime is wrong: the
///   blocking task was submitted with **no** runtime at `call` time.
/// * **Any normal Tokio entry point works** — you are **not** restricted to
///   [`Builder::new_current_thread`](tokio::runtime::Builder::new_current_thread);
///   a multi-thread [`Runtime`](tokio::runtime::Runtime) or an async handler in
///   a server is fine, as long as `call` happens while that runtime is running.
/// * **Await the returned tracked task on Tokio** — the returned
///   [`TrackedTask`] implements [`IntoFuture`](std::future::IntoFuture), so it
///   can be awaited inside a Tokio-driven async context after submission
///   succeeds.
/// * **Blocking pool** — the closure runs on Tokio's *blocking* thread pool, not
///   on the core async worker threads, so heavy synchronous work does not
///   starve other async tasks on the runtime.
/// * **Compared to
///   [`ThreadPerTaskExecutor`](qubit_executor::executor::ThreadPerTaskExecutor)** —
///   this type **reuses** Tokio-managed blocking threads (bounded pool) instead
///   of one new [`std::thread`] per task, and can return a handle that is either
///   awaited or read with blocking `get`.
///
/// # Examples
///
/// The following uses a single-thread [`Runtime`](tokio::runtime::Runtime) only to keep the snippet
/// self-contained; [`#[tokio::main]`](https://docs.rs/tokio/latest/tokio/attr.main.html)
/// or a multi-thread runtime are equally valid.
///
/// ```rust
/// use std::io;
///
/// use qubit_tokio_executor::{
///     Executor,
///     TokioExecutor,
/// };
///
/// # fn main() -> io::Result<()> {
/// tokio::runtime::Builder::new_current_thread()
///     .enable_all()
///     .build()?
///     .block_on(async {
///         let executor = TokioExecutor;
///         let value = executor
///             .call(|| Ok::<i32, io::Error>(40 + 2))
///             .expect("executor should accept callable")
///             .await
///             .expect("callable should complete successfully");
///         assert_eq!(value, 42);
///         Ok::<(), io::Error>(())
///     })?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default, Clone, Copy)]
pub struct TokioExecutor;

impl Executor for TokioExecutor {
    /// Spawns the callable on Tokio's blocking task pool.
    ///
    /// This method invokes [`tokio::task::spawn_blocking`] **before** returning.
    /// A Tokio runtime must be active when this method runs; see [`TokioExecutor`].
    ///
    /// # Parameters
    ///
    /// * `task` - Callable to run on Tokio's blocking task pool.
    ///
    /// # Returns
    ///
    /// A tracked task handle for the accepted callable.
    fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        // `spawn_blocking` runs now and requires `Handle::current()` — caller must
        // already be inside a Tokio runtime (see struct-level documentation).
        let (handle, slot) = TaskEndpointPair::new().into_tracked_parts();
        tokio::task::spawn_blocking(move || {
            TaskRunner::new(task).run(slot);
        });
        Ok(handle)
    }
}