// qubit_tokio_executor/tokio_executor.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use qubit_function::Callable;
11
12use crate::TokioExecution;
13use qubit_executor::executor::{
14    Executor,
15    FutureExecutor,
16};
17
/// Executes callable tasks on Tokio's blocking task pool.
///
/// `TokioExecutor` is a [`FutureExecutor`]: its [`Executor::call`] and
/// [`Executor::execute`] methods return a [`TokioExecution`] value that
/// **implements [`Future`]** with
/// [`Output`](std::future::Future::Output) `= Result<R, E>`. You obtain the
/// callable's result by **`.await`ing** (or polling) that future; it is
/// **not** a resolved [`Result`] at return time.
///
/// The type is a stateless unit struct. It is [`Copy`], has a [`Default`]
/// value, and every value compares equal to (and hashes the same as) every
/// other value.
///
/// # Semantics
///
/// * **`call` schedules work immediately** — [`Executor::call`] runs
///   [`tokio::task::spawn_blocking`] **synchronously** before it returns. A
///   Tokio runtime must **already be active** on the current thread when `call`
///   runs (for example inside an `async` block executed under
///   [`Runtime::block_on`](tokio::runtime::Runtime::block_on) or
///   [`#[tokio::main]`](https://docs.rs/tokio/latest/tokio/attr.main.html)).
///   Calling `call` first and only then entering a runtime does not work:
///   the runtime is looked up at `call` time, and the lookup panics when no
///   runtime is active (see [Panics](#panics)).
/// * **Any normal Tokio entry point works** — you are **not** restricted to
///   [`Builder::new_current_thread`](tokio::runtime::Builder::new_current_thread);
///   a multi-thread [`Runtime`](tokio::runtime::Runtime) or an async handler in
///   a server is fine, as long as `call` happens while that runtime is running.
/// * **Await the returned future on Tokio** — the [`TokioExecution`] polls a
///   [`JoinHandle`](tokio::task::JoinHandle); complete it with `.await` inside
///   the same kind of Tokio-driven async context.
/// * **Blocking pool** — the closure runs on Tokio's *blocking* thread pool, not
///   on the core async worker threads, so heavy synchronous work does not
///   starve other async tasks on the runtime.
/// * **Compared to
///   [`ThreadPerTaskExecutor`](qubit_executor::executor::ThreadPerTaskExecutor)** —
///   this type **reuses** Tokio-managed blocking threads (bounded pool) instead
///   of one new [`std::thread`] per task, and you **await** the result instead
///   of calling a blocking [`TaskHandle::get`](qubit_executor::TaskHandle::get).
///
/// # Panics
///
/// [`Executor::call`] panics when it is invoked outside the context of a
/// Tokio runtime: [`tokio::task::spawn_blocking`] resolves the ambient runtime
/// through [`Handle::current`](tokio::runtime::Handle::current), which panics
/// if no runtime has been entered on the calling thread.
///
/// # Examples
///
/// The following uses a single-thread [`Runtime`](tokio::runtime::Runtime)
/// only to keep the snippet self-contained;
/// [`#[tokio::main]`](https://docs.rs/tokio/latest/tokio/attr.main.html)
/// or a multi-thread runtime are equally valid.
///
/// ```rust
/// use std::io;
///
/// use qubit_tokio_executor::{
///     Executor,
///     TokioExecutor,
/// };
///
/// # fn main() -> io::Result<()> {
/// tokio::runtime::Builder::new_current_thread()
///     .enable_all()
///     .build()?
///     .block_on(async {
///         let executor = TokioExecutor;
///         let value = executor.call(|| Ok::<i32, io::Error>(40 + 2)).await?;
///         assert_eq!(value, 42);
///         Ok::<(), io::Error>(())
///     })?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TokioExecutor;
82
83impl Executor for TokioExecutor {
84    type Execution<R, E>
85        = TokioExecution<R, E>
86    where
87        R: Send + 'static,
88        E: std::fmt::Display + Send + 'static;
89
90    /// Spawns the callable on Tokio's blocking task pool.
91    ///
92    /// This method invokes [`tokio::task::spawn_blocking`] **before** returning.
93    /// A Tokio runtime must be active when this method runs; see [`TokioExecutor`].
94    ///
95    /// # Parameters
96    ///
97    /// * `task` - Callable to run on Tokio's blocking task pool.
98    ///
99    /// # Returns
100    ///
101    /// A [`TokioExecution`] that implements [`Future`] with
102    /// [`Output`](std::future::Future::Output) `= Result<R, E>`. Await it to obtain the
103    /// callable's result.
104    fn call<C, R, E>(&self, mut task: C) -> Self::Execution<R, E>
105    where
106        C: Callable<R, E> + Send + 'static,
107        R: Send + 'static,
108        E: std::fmt::Display + Send + 'static,
109    {
110        // `spawn_blocking` runs now and requires `Handle::current()` — caller must
111        // already be inside a Tokio runtime (see struct-level documentation).
112        TokioExecution::new(tokio::task::spawn_blocking(move || task.call()))
113    }
114}
115
116impl FutureExecutor for TokioExecutor {}