qubit_tokio_executor/tokio_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::{
14 Arc,
15 MutexGuard,
16 },
17};
18
19use qubit_function::{
20 Callable,
21 Runnable,
22};
23
24use qubit_executor::TaskHandle;
25use qubit_executor::task::spi::{
26 TaskEndpointPair,
27 TaskRunner,
28};
29
30use crate::TokioBlockingTaskHandle;
31use crate::tokio_executor_service_state::TokioExecutorServiceState;
32use crate::tokio_runtime::ensure_tokio_runtime_entered;
33use crate::tokio_service_task_guard::TokioServiceTaskGuard;
34use crate::tokio_task_slot_cancellation::{
35 cancel_unstarted_task_slot_if_queued,
36 share_task_slot,
37 take_task_slot,
38};
39use qubit_executor::service::{
40 ExecutorService,
41 ExecutorServiceLifecycle,
42 StopReport,
43 SubmissionError,
44};
45use tokio::task::AbortHandle;
46
47/// Tokio-backed service for submitted blocking tasks.
48///
49/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
50/// [`Callable`] tasks and runs them through Tokio's blocking task pool.
51#[derive(Default, Clone)]
52pub struct TokioExecutorService {
53 /// Shared service state used by all clones of this service.
54 state: Arc<TokioExecutorServiceState>,
55}
56
57/// Tokio-backed blocking executor service routed through `spawn_blocking`.
58pub type TokioBlockingExecutorService = TokioExecutorService;
59
60impl TokioExecutorService {
61 /// Creates a new service instance.
62 ///
63 /// # Returns
64 ///
65 /// A Tokio-backed executor service.
66 #[inline]
67 pub fn new() -> Self {
68 Self::default()
69 }
70
71 /// Prepares a blocking-task submission under the service submission lock.
72 ///
73 /// # Returns
74 ///
75 /// The held submission lock, service-local task marker, and lifecycle
76 /// guard for the accepted queued task.
77 ///
78 /// # Errors
79 ///
80 /// Returns [`SubmissionError::Shutdown`] if the service is not running, or
81 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
82 /// entered into a Tokio runtime.
83 fn prepare_blocking_submission(
84 &self,
85 ) -> Result<(MutexGuard<'_, ()>, Arc<()>, TokioServiceTaskGuard), SubmissionError> {
86 let submission_guard = self.state.lock_submission();
87 if self.state.is_not_running() {
88 return Err(SubmissionError::Shutdown);
89 }
90 ensure_tokio_runtime_entered()?;
91 self.state.accept_task();
92
93 let marker = Arc::new(());
94 let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
95 Ok((submission_guard, marker, guard))
96 }
97
98 /// Spawns an already accepted blocking task and registers its abort hook.
99 ///
100 /// # Parameters
101 ///
102 /// * `submission_guard` - Submission lock held since task acceptance.
103 /// * `marker` - Service-local marker associated with this task.
104 /// * `guard` - Lifecycle guard that owns service-side task accounting.
105 /// * `task` - Work to run after the blocking closure starts.
106 /// * `cancel` - Hook invoked by service stop when Tokio aborts queued
107 /// work. It returns `true` only if queued service accounting was
108 /// actually cancelled by that hook.
109 ///
110 /// # Returns
111 ///
112 /// Tokio abort handle for the spawned blocking task.
113 ///
114 /// # Panics
115 ///
116 /// Panics only if called without an entered Tokio runtime. Callers must use
117 /// [`Self::prepare_blocking_submission`] first.
118 fn spawn_accepted_blocking_task<F, C>(
119 &self,
120 submission_guard: MutexGuard<'_, ()>,
121 marker: Arc<()>,
122 guard: TokioServiceTaskGuard,
123 task: F,
124 cancel: C,
125 ) -> AbortHandle
126 where
127 F: FnOnce() + Send + 'static,
128 C: FnOnce() -> bool + Send + 'static,
129 {
130 let join_handle = tokio::task::spawn_blocking(move || {
131 let guard = guard;
132 if !guard.mark_started() {
133 return;
134 }
135 task();
136 });
137 let abort_handle = join_handle.abort_handle();
138 self.state
139 .register_abort_handle(marker, abort_handle.clone(), cancel);
140 drop(submission_guard);
141 abort_handle
142 }
143}
144
145impl ExecutorService for TokioExecutorService {
146 type ResultHandle<R, E>
147 = TaskHandle<R, E>
148 where
149 R: Send + 'static,
150 E: Send + 'static;
151
152 type TrackedHandle<R, E>
153 = TokioBlockingTaskHandle<R, E>
154 where
155 R: Send + 'static,
156 E: Send + 'static;
157
158 /// Accepts a runnable and runs it through Tokio.
159 ///
160 /// # Parameters
161 ///
162 /// * `task` - Runnable to execute on Tokio's blocking task pool.
163 ///
164 /// # Returns
165 ///
166 /// `Ok(())` if the task was accepted.
167 ///
168 /// # Errors
169 ///
170 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
171 /// requested before the task is accepted. Returns
172 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
173 /// entered into a Tokio runtime.
174 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
175 where
176 T: Runnable<E> + Send + 'static,
177 E: Send + 'static,
178 {
179 let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
180 let abort_queued_task = guard.finish_queued_once_callback();
181 self.spawn_accepted_blocking_task(
182 submission_guard,
183 marker,
184 guard,
185 move || {
186 let mut task = task;
187 let runner = TaskRunner::new(move || task.run());
188 let _ = runner.call::<(), E>();
189 },
190 abort_queued_task,
191 );
192 Ok(())
193 }
194
195 /// Accepts a callable and runs it through Tokio.
196 ///
197 /// # Parameters
198 ///
199 /// * `task` - Callable to execute on Tokio's blocking task pool.
200 ///
201 /// # Returns
202 ///
203 /// A [`TaskHandle`] for the accepted task.
204 ///
205 /// # Errors
206 ///
207 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
208 /// requested before the task is accepted. Returns
209 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
210 /// entered into a Tokio runtime.
211 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
212 where
213 C: Callable<R, E> + Send + 'static,
214 R: Send + 'static,
215 E: Send + 'static,
216 {
217 let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
218 let (handle, completion) = TaskEndpointPair::new().into_parts();
219 completion.accept();
220 let completion = share_task_slot(completion);
221 let abort_completion = Arc::clone(&completion);
222 let abort_queued_task = guard.finish_queued_once_callback();
223 self.spawn_accepted_blocking_task(
224 submission_guard,
225 marker,
226 guard,
227 move || {
228 if let Some(completion) = take_task_slot(&completion) {
229 TaskRunner::new(task).run(completion);
230 }
231 },
232 move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
233 );
234 Ok(handle)
235 }
236
237 /// Accepts a callable and returns an actively tracked handle.
238 ///
239 /// # Parameters
240 ///
241 /// * `task` - Callable to execute on Tokio's blocking task pool.
242 ///
243 /// # Returns
244 ///
245 /// A [`TokioBlockingTaskHandle`] for the accepted task.
246 ///
247 /// # Errors
248 ///
249 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
250 /// requested before the task is accepted. Returns
251 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
252 /// entered into a Tokio runtime.
253 fn submit_tracked_callable<C, R, E>(
254 &self,
255 task: C,
256 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
257 where
258 C: Callable<R, E> + Send + 'static,
259 R: Send + 'static,
260 E: Send + 'static,
261 {
262 let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
263 let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
264 completion.accept();
265 let completion = share_task_slot(completion);
266 let abort_completion = Arc::clone(&completion);
267 let abort_queued_task = guard.finish_queued_once_callback();
268 let cancel_queued_task = guard.cancel_queued_callback();
269 let abort_handle = self.spawn_accepted_blocking_task(
270 submission_guard,
271 marker,
272 guard,
273 move || {
274 if let Some(completion) = take_task_slot(&completion) {
275 TaskRunner::new(task).run(completion);
276 }
277 },
278 move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
279 );
280 Ok(TokioBlockingTaskHandle::new(
281 handle,
282 abort_handle,
283 cancel_queued_task,
284 ))
285 }
286
287 /// Stops accepting new tasks.
288 ///
289 /// Already accepted tasks are allowed to finish unless they are cancelled
290 /// before their blocking closure starts.
291 fn shutdown(&self) {
292 let _guard = self.state.lock_submission();
293 self.state.shutdown();
294 self.state.notify_if_terminated();
295 }
296
297 /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
298 ///
299 /// Tokio cannot abort blocking tasks that have already started. Such tasks
300 /// continue running and keep the service active until their closure returns.
301 ///
302 /// # Returns
303 ///
304 /// A report with queued and running blocking task counts observed when
305 /// stop was requested, plus the number of queued blocking tasks that were
306 /// actually cancelled before their blocking closures started.
307 fn stop(&self) -> StopReport {
308 let _guard = self.state.lock_submission();
309 self.state.stop();
310 let (queued_count, running_count) = self.state.task_count_snapshot();
311 let cancellation_count = self.state.abort_tracked_tasks();
312 self.state.notify_if_terminated();
313 StopReport::new(queued_count, running_count, cancellation_count)
314 }
315
316 /// Returns the current lifecycle state.
317 fn lifecycle(&self) -> ExecutorServiceLifecycle {
318 self.state.lifecycle()
319 }
320
321 /// Returns whether shutdown has been requested.
322 fn is_not_running(&self) -> bool {
323 self.state.is_not_running()
324 }
325
326 /// Returns whether shutdown was requested and all tasks are finished.
327 fn is_terminated(&self) -> bool {
328 self.lifecycle() == ExecutorServiceLifecycle::Terminated
329 }
330
331 /// Blocks until the service has terminated.
332 fn wait_termination(&self) {
333 self.state.wait_termination();
334 }
335}
336
337impl TokioExecutorService {
338 /// Waits asynchronously until the service has terminated.
339 ///
340 /// # Returns
341 ///
342 /// A future that resolves after shutdown or stop has been requested and all
343 /// accepted blocking tasks have finished or been aborted before start.
344 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
345 Box::pin(async move {
346 let notified = self.state.terminated_notify.notified();
347 tokio::pin!(notified);
348 loop {
349 notified.as_mut().enable();
350 if self.is_terminated() {
351 return;
352 }
353 notified.as_mut().await;
354 notified.set(self.state.terminated_notify.notified());
355 }
356 })
357 }
358}