qubit_tokio_executor/tokio_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::{
14 Arc,
15 MutexGuard,
16 },
17};
18
19use qubit_function::{
20 Callable,
21 Runnable,
22};
23
24use qubit_executor::TaskHandle;
25use qubit_executor::task::spi::{
26 TaskEndpointPair,
27 TaskRunner,
28};
29
30use crate::TokioBlockingTaskHandle;
31use crate::tokio_executor_service_state::TokioExecutorServiceState;
32use crate::tokio_runtime::ensure_tokio_runtime_entered;
33use crate::tokio_service_task_guard::TokioServiceTaskGuard;
34use crate::tokio_task_slot_cancellation::{
35 cancel_unstarted_task_slot_if_queued,
36 share_task_slot,
37 take_task_slot,
38};
39use qubit_executor::service::{
40 ExecutorService,
41 ExecutorServiceLifecycle,
42 StopReport,
43 SubmissionError,
44};
45use tokio::task::AbortHandle;
46
47/// Tokio-backed service for submitted blocking tasks.
48///
49/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
50/// [`Callable`] tasks and runs them through Tokio's blocking task pool.
51#[derive(Default, Clone)]
52pub struct TokioExecutorService {
53 /// Shared service state used by all clones of this service.
54 state: Arc<TokioExecutorServiceState>,
55}
56
57/// Tokio-backed blocking executor service routed through `spawn_blocking`.
58pub type TokioBlockingExecutorService = TokioExecutorService;
59
60impl TokioExecutorService {
61 /// Creates a new service instance.
62 ///
63 /// # Returns
64 ///
65 /// A Tokio-backed executor service.
66 #[inline]
67 pub fn new() -> Self {
68 Self::default()
69 }
70
71 /// Prepares a blocking-task submission under the service submission lock.
72 ///
73 /// # Returns
74 ///
75 /// The held submission lock, service-local task marker, and lifecycle
76 /// guard for the accepted queued task.
77 ///
78 /// # Errors
79 ///
80 /// Returns [`SubmissionError::Shutdown`] if the service is not running, or
81 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
82 /// entered into a Tokio runtime.
83 fn prepare_blocking_submission(
84 &self,
85 ) -> Result<(MutexGuard<'_, ()>, Arc<()>, TokioServiceTaskGuard), SubmissionError> {
86 let submission_guard = self.state.lock_submission();
87 if self.state.is_not_running() {
88 return Err(SubmissionError::Shutdown);
89 }
90 ensure_tokio_runtime_entered()?;
91 self.state.accept_task();
92
93 let marker = Arc::new(());
94 let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
95 Ok((submission_guard, marker, guard))
96 }
97
98 /// Spawns an already accepted blocking task and registers its abort hook.
99 ///
100 /// # Parameters
101 ///
102 /// * `submission_guard` - Submission lock held since task acceptance.
103 /// * `marker` - Service-local marker associated with this task.
104 /// * `guard` - Lifecycle guard that owns service-side task accounting.
105 /// * `task` - Work to run after the blocking closure starts.
106 /// * `cancel` - Hook invoked by service stop when Tokio aborts queued
107 /// work. It returns `true` only if queued service accounting was
108 /// actually cancelled by that hook.
109 ///
110 /// # Returns
111 ///
112 /// Tokio abort handle for the spawned blocking task.
113 ///
114 /// # Panics
115 ///
116 /// Panics only if called without an entered Tokio runtime. Callers must use
117 /// [`Self::prepare_blocking_submission`] first.
118 fn spawn_accepted_blocking_task<F, C>(
119 &self,
120 submission_guard: MutexGuard<'_, ()>,
121 marker: Arc<()>,
122 guard: TokioServiceTaskGuard,
123 task: F,
124 cancel: C,
125 ) -> AbortHandle
126 where
127 F: FnOnce() + Send + 'static,
128 C: FnOnce() -> bool + Send + 'static,
129 {
130 let join_handle = tokio::task::spawn_blocking(move || {
131 let guard = guard;
132 if !guard.mark_started() {
133 return;
134 }
135 task();
136 });
137 let abort_handle = join_handle.abort_handle();
138 self.state.register_abort_handle(marker, abort_handle.clone(), cancel);
139 drop(submission_guard);
140 abort_handle
141 }
142}
143
144impl ExecutorService for TokioExecutorService {
145 type ResultHandle<R, E>
146 = TaskHandle<R, E>
147 where
148 R: Send + 'static,
149 E: Send + 'static;
150
151 type TrackedHandle<R, E>
152 = TokioBlockingTaskHandle<R, E>
153 where
154 R: Send + 'static,
155 E: Send + 'static;
156
157 /// Accepts a runnable and runs it through Tokio.
158 ///
159 /// # Parameters
160 ///
161 /// * `task` - Runnable to execute on Tokio's blocking task pool.
162 ///
163 /// # Returns
164 ///
165 /// `Ok(())` if the task was accepted.
166 ///
167 /// # Errors
168 ///
169 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
170 /// requested before the task is accepted. Returns
171 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
172 /// entered into a Tokio runtime.
173 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
174 where
175 T: Runnable<E> + Send + 'static,
176 E: Send + 'static,
177 {
178 let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
179 let abort_queued_task = guard.finish_queued_once_callback();
180 self.spawn_accepted_blocking_task(
181 submission_guard,
182 marker,
183 guard,
184 move || {
185 let mut task = task;
186 let runner = TaskRunner::new(move || task.run());
187 let _ = runner.call::<(), E>();
188 },
189 abort_queued_task,
190 );
191 Ok(())
192 }
193
194 /// Accepts a callable and runs it through Tokio.
195 ///
196 /// # Parameters
197 ///
198 /// * `task` - Callable to execute on Tokio's blocking task pool.
199 ///
200 /// # Returns
201 ///
202 /// A [`TaskHandle`] for the accepted task.
203 ///
204 /// # Errors
205 ///
206 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
207 /// requested before the task is accepted. Returns
208 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
209 /// entered into a Tokio runtime.
210 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
211 where
212 C: Callable<R, E> + Send + 'static,
213 R: Send + 'static,
214 E: Send + 'static,
215 {
216 let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
217 let (handle, completion) = TaskEndpointPair::new().into_parts();
218 completion.accept();
219 let completion = share_task_slot(completion);
220 let abort_completion = Arc::clone(&completion);
221 let abort_queued_task = guard.finish_queued_once_callback();
222 self.spawn_accepted_blocking_task(
223 submission_guard,
224 marker,
225 guard,
226 move || {
227 if let Some(completion) = take_task_slot(&completion) {
228 TaskRunner::new(task).run(completion);
229 }
230 },
231 move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
232 );
233 Ok(handle)
234 }
235
236 /// Accepts a callable and returns an actively tracked handle.
237 ///
238 /// # Parameters
239 ///
240 /// * `task` - Callable to execute on Tokio's blocking task pool.
241 ///
242 /// # Returns
243 ///
244 /// A [`TokioBlockingTaskHandle`] for the accepted task.
245 ///
246 /// # Errors
247 ///
248 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
249 /// requested before the task is accepted. Returns
250 /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
251 /// entered into a Tokio runtime.
252 fn submit_tracked_callable<C, R, E>(&self, task: C) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
253 where
254 C: Callable<R, E> + Send + 'static,
255 R: Send + 'static,
256 E: Send + 'static,
257 {
258 let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
259 let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
260 completion.accept();
261 let completion = share_task_slot(completion);
262 let abort_completion = Arc::clone(&completion);
263 let abort_queued_task = guard.finish_queued_once_callback();
264 let cancel_queued_task = guard.cancel_queued_callback();
265 let abort_handle = self.spawn_accepted_blocking_task(
266 submission_guard,
267 marker,
268 guard,
269 move || {
270 if let Some(completion) = take_task_slot(&completion) {
271 TaskRunner::new(task).run(completion);
272 }
273 },
274 move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
275 );
276 Ok(TokioBlockingTaskHandle::new(handle, abort_handle, cancel_queued_task))
277 }
278
279 /// Stops accepting new tasks.
280 ///
281 /// Already accepted tasks are allowed to finish unless they are cancelled
282 /// before their blocking closure starts.
283 fn shutdown(&self) {
284 let _guard = self.state.lock_submission();
285 self.state.shutdown();
286 self.state.notify_if_terminated();
287 }
288
289 /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
290 ///
291 /// Tokio cannot abort blocking tasks that have already started. Such tasks
292 /// continue running and keep the service active until their closure returns.
293 ///
294 /// # Returns
295 ///
296 /// A report with queued and running blocking task counts observed when
297 /// stop was requested, plus the number of queued blocking tasks that were
298 /// actually cancelled before their blocking closures started.
299 fn stop(&self) -> StopReport {
300 let _guard = self.state.lock_submission();
301 self.state.stop();
302 let (queued_count, running_count) = self.state.task_count_snapshot();
303 let cancellation_count = self.state.abort_tracked_tasks();
304 self.state.notify_if_terminated();
305 StopReport::new(queued_count, running_count, cancellation_count)
306 }
307
308 /// Returns the current lifecycle state.
309 fn lifecycle(&self) -> ExecutorServiceLifecycle {
310 self.state.lifecycle()
311 }
312
313 /// Returns whether shutdown has been requested.
314 fn is_not_running(&self) -> bool {
315 self.state.is_not_running()
316 }
317
318 /// Returns whether shutdown was requested and all tasks are finished.
319 fn is_terminated(&self) -> bool {
320 self.lifecycle() == ExecutorServiceLifecycle::Terminated
321 }
322
323 /// Blocks until the service has terminated.
324 fn wait_termination(&self) {
325 self.state.wait_termination();
326 }
327}
328
329impl TokioExecutorService {
330 /// Waits asynchronously until the service has terminated.
331 ///
332 /// # Returns
333 ///
334 /// A future that resolves after shutdown or stop has been requested and all
335 /// accepted blocking tasks have finished or been aborted before start.
336 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
337 Box::pin(async move {
338 let notified = self.state.terminated_notify.notified();
339 tokio::pin!(notified);
340 loop {
341 notified.as_mut().enable();
342 if self.is_terminated() {
343 return;
344 }
345 notified.as_mut().await;
346 notified.set(self.state.terminated_notify.notified());
347 }
348 })
349 }
350}