Skip to main content

qubit_tokio_executor/
tokio_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::{
14        Arc,
15        MutexGuard,
16    },
17};
18
19use qubit_function::{
20    Callable,
21    Runnable,
22};
23
24use qubit_executor::TaskHandle;
25use qubit_executor::task::spi::{
26    TaskEndpointPair,
27    TaskRunner,
28};
29
30use crate::TokioBlockingTaskHandle;
31use crate::tokio_executor_service_state::TokioExecutorServiceState;
32use crate::tokio_runtime::ensure_tokio_runtime_entered;
33use crate::tokio_service_task_guard::TokioServiceTaskGuard;
34use crate::tokio_task_slot_cancellation::{
35    cancel_unstarted_task_slot_if_queued,
36    share_task_slot,
37    take_task_slot,
38};
39use qubit_executor::service::{
40    ExecutorService,
41    ExecutorServiceLifecycle,
42    StopReport,
43    SubmissionError,
44};
45use tokio::task::AbortHandle;
46
47/// Tokio-backed service for submitted blocking tasks.
48///
49/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
50/// [`Callable`] tasks and runs them through Tokio's blocking task pool.
51#[derive(Default, Clone)]
52pub struct TokioExecutorService {
53    /// Shared service state used by all clones of this service.
54    state: Arc<TokioExecutorServiceState>,
55}
56
/// Alias for [`TokioExecutorService`] that names its execution strategy:
/// submitted tasks are routed through Tokio's `spawn_blocking`.
pub type TokioBlockingExecutorService = TokioExecutorService;
59
60impl TokioExecutorService {
61    /// Creates a new service instance.
62    ///
63    /// # Returns
64    ///
65    /// A Tokio-backed executor service.
66    #[inline]
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Prepares a blocking-task submission under the service submission lock.
72    ///
73    /// # Returns
74    ///
75    /// The held submission lock, service-local task marker, and lifecycle
76    /// guard for the accepted queued task.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`SubmissionError::Shutdown`] if the service is not running, or
81    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
82    /// entered into a Tokio runtime.
83    fn prepare_blocking_submission(
84        &self,
85    ) -> Result<(MutexGuard<'_, ()>, Arc<()>, TokioServiceTaskGuard), SubmissionError> {
86        let submission_guard = self.state.lock_submission();
87        if self.state.is_not_running() {
88            return Err(SubmissionError::Shutdown);
89        }
90        ensure_tokio_runtime_entered()?;
91        self.state.accept_task();
92
93        let marker = Arc::new(());
94        let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
95        Ok((submission_guard, marker, guard))
96    }
97
98    /// Spawns an already accepted blocking task and registers its abort hook.
99    ///
100    /// # Parameters
101    ///
102    /// * `submission_guard` - Submission lock held since task acceptance.
103    /// * `marker` - Service-local marker associated with this task.
104    /// * `guard` - Lifecycle guard that owns service-side task accounting.
105    /// * `task` - Work to run after the blocking closure starts.
106    /// * `cancel` - Hook invoked by service stop when Tokio aborts queued
107    ///   work. It returns `true` only if queued service accounting was
108    ///   actually cancelled by that hook.
109    ///
110    /// # Returns
111    ///
112    /// Tokio abort handle for the spawned blocking task.
113    ///
114    /// # Panics
115    ///
116    /// Panics only if called without an entered Tokio runtime. Callers must use
117    /// [`Self::prepare_blocking_submission`] first.
118    fn spawn_accepted_blocking_task<F, C>(
119        &self,
120        submission_guard: MutexGuard<'_, ()>,
121        marker: Arc<()>,
122        guard: TokioServiceTaskGuard,
123        task: F,
124        cancel: C,
125    ) -> AbortHandle
126    where
127        F: FnOnce() + Send + 'static,
128        C: FnOnce() -> bool + Send + 'static,
129    {
130        let join_handle = tokio::task::spawn_blocking(move || {
131            let guard = guard;
132            if !guard.mark_started() {
133                return;
134            }
135            task();
136        });
137        let abort_handle = join_handle.abort_handle();
138        self.state.register_abort_handle(marker, abort_handle.clone(), cancel);
139        drop(submission_guard);
140        abort_handle
141    }
142}
143
144impl ExecutorService for TokioExecutorService {
145    type ResultHandle<R, E>
146        = TaskHandle<R, E>
147    where
148        R: Send + 'static,
149        E: Send + 'static;
150
151    type TrackedHandle<R, E>
152        = TokioBlockingTaskHandle<R, E>
153    where
154        R: Send + 'static,
155        E: Send + 'static;
156
157    /// Accepts a runnable and runs it through Tokio.
158    ///
159    /// # Parameters
160    ///
161    /// * `task` - Runnable to execute on Tokio's blocking task pool.
162    ///
163    /// # Returns
164    ///
165    /// `Ok(())` if the task was accepted.
166    ///
167    /// # Errors
168    ///
169    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
170    /// requested before the task is accepted. Returns
171    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
172    /// entered into a Tokio runtime.
173    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
174    where
175        T: Runnable<E> + Send + 'static,
176        E: Send + 'static,
177    {
178        let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
179        let abort_queued_task = guard.finish_queued_once_callback();
180        self.spawn_accepted_blocking_task(
181            submission_guard,
182            marker,
183            guard,
184            move || {
185                let mut task = task;
186                let runner = TaskRunner::new(move || task.run());
187                let _ = runner.call::<(), E>();
188            },
189            abort_queued_task,
190        );
191        Ok(())
192    }
193
194    /// Accepts a callable and runs it through Tokio.
195    ///
196    /// # Parameters
197    ///
198    /// * `task` - Callable to execute on Tokio's blocking task pool.
199    ///
200    /// # Returns
201    ///
202    /// A [`TaskHandle`] for the accepted task.
203    ///
204    /// # Errors
205    ///
206    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
207    /// requested before the task is accepted. Returns
208    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
209    /// entered into a Tokio runtime.
210    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
211    where
212        C: Callable<R, E> + Send + 'static,
213        R: Send + 'static,
214        E: Send + 'static,
215    {
216        let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
217        let (handle, completion) = TaskEndpointPair::new().into_parts();
218        completion.accept();
219        let completion = share_task_slot(completion);
220        let abort_completion = Arc::clone(&completion);
221        let abort_queued_task = guard.finish_queued_once_callback();
222        self.spawn_accepted_blocking_task(
223            submission_guard,
224            marker,
225            guard,
226            move || {
227                if let Some(completion) = take_task_slot(&completion) {
228                    TaskRunner::new(task).run(completion);
229                }
230            },
231            move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
232        );
233        Ok(handle)
234    }
235
236    /// Accepts a callable and returns an actively tracked handle.
237    ///
238    /// # Parameters
239    ///
240    /// * `task` - Callable to execute on Tokio's blocking task pool.
241    ///
242    /// # Returns
243    ///
244    /// A [`TokioBlockingTaskHandle`] for the accepted task.
245    ///
246    /// # Errors
247    ///
248    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
249    /// requested before the task is accepted. Returns
250    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
251    /// entered into a Tokio runtime.
252    fn submit_tracked_callable<C, R, E>(&self, task: C) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
253    where
254        C: Callable<R, E> + Send + 'static,
255        R: Send + 'static,
256        E: Send + 'static,
257    {
258        let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
259        let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
260        completion.accept();
261        let completion = share_task_slot(completion);
262        let abort_completion = Arc::clone(&completion);
263        let abort_queued_task = guard.finish_queued_once_callback();
264        let cancel_queued_task = guard.cancel_queued_callback();
265        let abort_handle = self.spawn_accepted_blocking_task(
266            submission_guard,
267            marker,
268            guard,
269            move || {
270                if let Some(completion) = take_task_slot(&completion) {
271                    TaskRunner::new(task).run(completion);
272                }
273            },
274            move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
275        );
276        Ok(TokioBlockingTaskHandle::new(handle, abort_handle, cancel_queued_task))
277    }
278
279    /// Stops accepting new tasks.
280    ///
281    /// Already accepted tasks are allowed to finish unless they are cancelled
282    /// before their blocking closure starts.
283    fn shutdown(&self) {
284        let _guard = self.state.lock_submission();
285        self.state.shutdown();
286        self.state.notify_if_terminated();
287    }
288
289    /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
290    ///
291    /// Tokio cannot abort blocking tasks that have already started. Such tasks
292    /// continue running and keep the service active until their closure returns.
293    ///
294    /// # Returns
295    ///
296    /// A report with queued and running blocking task counts observed when
297    /// stop was requested, plus the number of queued blocking tasks that were
298    /// actually cancelled before their blocking closures started.
299    fn stop(&self) -> StopReport {
300        let _guard = self.state.lock_submission();
301        self.state.stop();
302        let (queued_count, running_count) = self.state.task_count_snapshot();
303        let cancellation_count = self.state.abort_tracked_tasks();
304        self.state.notify_if_terminated();
305        StopReport::new(queued_count, running_count, cancellation_count)
306    }
307
308    /// Returns the current lifecycle state.
309    fn lifecycle(&self) -> ExecutorServiceLifecycle {
310        self.state.lifecycle()
311    }
312
313    /// Returns whether shutdown has been requested.
314    fn is_not_running(&self) -> bool {
315        self.state.is_not_running()
316    }
317
318    /// Returns whether shutdown was requested and all tasks are finished.
319    fn is_terminated(&self) -> bool {
320        self.lifecycle() == ExecutorServiceLifecycle::Terminated
321    }
322
323    /// Blocks until the service has terminated.
324    fn wait_termination(&self) {
325        self.state.wait_termination();
326    }
327}
328
329impl TokioExecutorService {
330    /// Waits asynchronously until the service has terminated.
331    ///
332    /// # Returns
333    ///
334    /// A future that resolves after shutdown or stop has been requested and all
335    /// accepted blocking tasks have finished or been aborted before start.
336    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
337        Box::pin(async move {
338            let notified = self.state.terminated_notify.notified();
339            tokio::pin!(notified);
340            loop {
341                notified.as_mut().enable();
342                if self.is_terminated() {
343                    return;
344                }
345                notified.as_mut().await;
346                notified.set(self.state.terminated_notify.notified());
347            }
348        })
349    }
350}