qubit_tokio_executor/tokio_executor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use qubit_function::Callable;
11
12use crate::TokioExecution;
13use qubit_executor::executor::{
14 Executor,
15 FutureExecutor,
16};
17
/// An executor that runs callables on Tokio's blocking task pool.
///
/// `TokioExecutor` is a [`FutureExecutor`]. Both [`Executor::call`] and
/// [`Executor::execute`] hand back a [`TokioExecution`], which **implements
/// [`Future`]** with [`Output`](std::future::Future::Output) `= Result<R, E>`.
/// The value returned from `call` is therefore **not** an already-resolved
/// [`Result`]: `.await` it (or poll it) to obtain the callable's outcome.
///
/// # Semantics
///
/// * **Work is submitted eagerly** — [`Executor::call`] invokes
///   [`tokio::task::spawn_blocking`] **synchronously**, before it returns. A
///   Tokio runtime must therefore **already be active** on the current thread
///   at the moment `call` runs, e.g. inside an `async` block driven by
///   [`Runtime::block_on`](tokio::runtime::Runtime::block_on) or
///   [`#[tokio::main]`](https://docs.rs/tokio/latest/tokio/attr.main.html).
///   Invoking `call` first and entering a runtime afterwards is incorrect,
///   because the blocking task would be submitted with **no** runtime present.
/// * **Every usual Tokio entry point is acceptable** — there is **no**
///   requirement to use
///   [`Builder::new_current_thread`](tokio::runtime::Builder::new_current_thread);
///   a multi-thread [`Runtime`](tokio::runtime::Runtime) or an async handler in
///   a server works just as well, provided `call` runs while that runtime is
///   running.
/// * **Drive the returned future on Tokio** — a [`TokioExecution`] polls a
///   [`JoinHandle`](tokio::task::JoinHandle); finish it with `.await` inside
///   the same kind of Tokio-driven async context.
/// * **Blocking pool, not async workers** — the closure executes on Tokio's
///   *blocking* thread pool rather than on the core async worker threads, so
///   heavy synchronous work does not starve other async tasks on the runtime.
/// * **Versus
///   [`ThreadPerTaskExecutor`](qubit_executor::executor::ThreadPerTaskExecutor)** —
///   this executor **reuses** Tokio-managed blocking threads (a bounded pool)
///   rather than starting one new [`std::thread`] per task, and the result is
///   **awaited** rather than fetched through a blocking
///   [`TaskHandle::get`](qubit_executor::TaskHandle::get).
///
/// # Examples
///
/// A single-thread [`Runtime`](tokio::runtime::Runtime) is used below purely so
/// that the snippet is self-contained;
/// [`#[tokio::main]`](https://docs.rs/tokio/latest/tokio/attr.main.html) or a
/// multi-thread runtime are equally valid.
///
/// ```rust
/// use std::io;
///
/// use qubit_tokio_executor::{
///     Executor,
///     TokioExecutor,
/// };
///
/// # fn main() -> io::Result<()> {
/// tokio::runtime::Builder::new_current_thread()
///     .enable_all()
///     .build()?
///     .block_on(async {
///         let executor = TokioExecutor;
///         let value = executor.call(|| Ok::<i32, io::Error>(40 + 2)).await?;
///         assert_eq!(value, 42);
///         Ok::<(), io::Error>(())
///     })?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Copy, Debug, Default)]
pub struct TokioExecutor;
82
83impl Executor for TokioExecutor {
84 type Execution<R, E>
85 = TokioExecution<R, E>
86 where
87 R: Send + 'static,
88 E: std::fmt::Display + Send + 'static;
89
90 /// Spawns the callable on Tokio's blocking task pool.
91 ///
92 /// This method invokes [`tokio::task::spawn_blocking`] **before** returning.
93 /// A Tokio runtime must be active when this method runs; see [`TokioExecutor`].
94 ///
95 /// # Parameters
96 ///
97 /// * `task` - Callable to run on Tokio's blocking task pool.
98 ///
99 /// # Returns
100 ///
101 /// A [`TokioExecution`] that implements [`Future`] with
102 /// [`Output`](std::future::Future::Output) `= Result<R, E>`. Await it to obtain the
103 /// callable's result.
104 fn call<C, R, E>(&self, mut task: C) -> Self::Execution<R, E>
105 where
106 C: Callable<R, E> + Send + 'static,
107 R: Send + 'static,
108 E: std::fmt::Display + Send + 'static,
109 {
110 // `spawn_blocking` runs now and requires `Handle::current()` — caller must
111 // already be inside a Tokio runtime (see struct-level documentation).
112 TokioExecution::new(tokio::task::spawn_blocking(move || task.call()))
113 }
114}
115
// Opts `TokioExecutor` into `FutureExecutor`. The impl body is intentionally
// empty: nothing is overridden here, so the type relies solely on what the
// trait itself supplies.
impl FutureExecutor for TokioExecutor {}