Skip to main content

qubit_tokio_executor/
tokio_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::{
14        Arc,
15        MutexGuard,
16    },
17};
18
19use qubit_function::{
20    Callable,
21    Runnable,
22};
23
24use qubit_executor::TaskHandle;
25use qubit_executor::task::spi::{
26    TaskEndpointPair,
27    TaskRunner,
28};
29
30use crate::TokioBlockingTaskHandle;
31use crate::tokio_executor_service_state::TokioExecutorServiceState;
32use crate::tokio_runtime::ensure_tokio_runtime_entered;
33use crate::tokio_service_task_guard::TokioServiceTaskGuard;
34use crate::tokio_task_slot_cancellation::{
35    cancel_unstarted_task_slot_if_queued,
36    share_task_slot,
37    take_task_slot,
38};
39use qubit_executor::service::{
40    ExecutorService,
41    ExecutorServiceLifecycle,
42    StopReport,
43    SubmissionError,
44};
45use tokio::task::AbortHandle;
46
47/// Tokio-backed service for submitted blocking tasks.
48///
49/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
50/// [`Callable`] tasks and runs them through Tokio's blocking task pool.
51#[derive(Default, Clone)]
52pub struct TokioExecutorService {
53    /// Shared service state used by all clones of this service.
54    state: Arc<TokioExecutorServiceState>,
55}
56
57/// Tokio-backed blocking executor service routed through `spawn_blocking`.
58pub type TokioBlockingExecutorService = TokioExecutorService;
59
60impl TokioExecutorService {
61    /// Creates a new service instance.
62    ///
63    /// # Returns
64    ///
65    /// A Tokio-backed executor service.
66    #[inline]
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Prepares a blocking-task submission under the service submission lock.
72    ///
73    /// # Returns
74    ///
75    /// The held submission lock, service-local task marker, and lifecycle
76    /// guard for the accepted queued task.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`SubmissionError::Shutdown`] if the service is not running, or
81    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
82    /// entered into a Tokio runtime.
83    fn prepare_blocking_submission(
84        &self,
85    ) -> Result<(MutexGuard<'_, ()>, Arc<()>, TokioServiceTaskGuard), SubmissionError> {
86        let submission_guard = self.state.lock_submission();
87        if self.state.is_not_running() {
88            return Err(SubmissionError::Shutdown);
89        }
90        ensure_tokio_runtime_entered()?;
91        self.state.accept_task();
92
93        let marker = Arc::new(());
94        let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
95        Ok((submission_guard, marker, guard))
96    }
97
98    /// Spawns an already accepted blocking task and registers its abort hook.
99    ///
100    /// # Parameters
101    ///
102    /// * `submission_guard` - Submission lock held since task acceptance.
103    /// * `marker` - Service-local marker associated with this task.
104    /// * `guard` - Lifecycle guard that owns service-side task accounting.
105    /// * `task` - Work to run after the blocking closure starts.
106    /// * `cancel` - Hook invoked by service stop when Tokio aborts queued
107    ///   work. It returns `true` only if queued service accounting was
108    ///   actually cancelled by that hook.
109    ///
110    /// # Returns
111    ///
112    /// Tokio abort handle for the spawned blocking task.
113    ///
114    /// # Panics
115    ///
116    /// Panics only if called without an entered Tokio runtime. Callers must use
117    /// [`Self::prepare_blocking_submission`] first.
118    fn spawn_accepted_blocking_task<F, C>(
119        &self,
120        submission_guard: MutexGuard<'_, ()>,
121        marker: Arc<()>,
122        guard: TokioServiceTaskGuard,
123        task: F,
124        cancel: C,
125    ) -> AbortHandle
126    where
127        F: FnOnce() + Send + 'static,
128        C: FnOnce() -> bool + Send + 'static,
129    {
130        let join_handle = tokio::task::spawn_blocking(move || {
131            let guard = guard;
132            if !guard.mark_started() {
133                return;
134            }
135            task();
136        });
137        let abort_handle = join_handle.abort_handle();
138        self.state
139            .register_abort_handle(marker, abort_handle.clone(), cancel);
140        drop(submission_guard);
141        abort_handle
142    }
143}
144
145impl ExecutorService for TokioExecutorService {
146    type ResultHandle<R, E>
147        = TaskHandle<R, E>
148    where
149        R: Send + 'static,
150        E: Send + 'static;
151
152    type TrackedHandle<R, E>
153        = TokioBlockingTaskHandle<R, E>
154    where
155        R: Send + 'static,
156        E: Send + 'static;
157
158    /// Accepts a runnable and runs it through Tokio.
159    ///
160    /// # Parameters
161    ///
162    /// * `task` - Runnable to execute on Tokio's blocking task pool.
163    ///
164    /// # Returns
165    ///
166    /// `Ok(())` if the task was accepted.
167    ///
168    /// # Errors
169    ///
170    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
171    /// requested before the task is accepted. Returns
172    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
173    /// entered into a Tokio runtime.
174    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
175    where
176        T: Runnable<E> + Send + 'static,
177        E: Send + 'static,
178    {
179        let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
180        let abort_queued_task = guard.finish_queued_once_callback();
181        self.spawn_accepted_blocking_task(
182            submission_guard,
183            marker,
184            guard,
185            move || {
186                let mut task = task;
187                let runner = TaskRunner::new(move || task.run());
188                let _ = runner.call::<(), E>();
189            },
190            abort_queued_task,
191        );
192        Ok(())
193    }
194
195    /// Accepts a callable and runs it through Tokio.
196    ///
197    /// # Parameters
198    ///
199    /// * `task` - Callable to execute on Tokio's blocking task pool.
200    ///
201    /// # Returns
202    ///
203    /// A [`TaskHandle`] for the accepted task.
204    ///
205    /// # Errors
206    ///
207    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
208    /// requested before the task is accepted. Returns
209    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
210    /// entered into a Tokio runtime.
211    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
212    where
213        C: Callable<R, E> + Send + 'static,
214        R: Send + 'static,
215        E: Send + 'static,
216    {
217        let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
218        let (handle, completion) = TaskEndpointPair::new().into_parts();
219        completion.accept();
220        let completion = share_task_slot(completion);
221        let abort_completion = Arc::clone(&completion);
222        let abort_queued_task = guard.finish_queued_once_callback();
223        self.spawn_accepted_blocking_task(
224            submission_guard,
225            marker,
226            guard,
227            move || {
228                if let Some(completion) = take_task_slot(&completion) {
229                    TaskRunner::new(task).run(completion);
230                }
231            },
232            move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
233        );
234        Ok(handle)
235    }
236
237    /// Accepts a callable and returns an actively tracked handle.
238    ///
239    /// # Parameters
240    ///
241    /// * `task` - Callable to execute on Tokio's blocking task pool.
242    ///
243    /// # Returns
244    ///
245    /// A [`TokioBlockingTaskHandle`] for the accepted task.
246    ///
247    /// # Errors
248    ///
249    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
250    /// requested before the task is accepted. Returns
251    /// [`SubmissionError::WorkerSpawnFailed`] if the current thread is not
252    /// entered into a Tokio runtime.
253    fn submit_tracked_callable<C, R, E>(
254        &self,
255        task: C,
256    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
257    where
258        C: Callable<R, E> + Send + 'static,
259        R: Send + 'static,
260        E: Send + 'static,
261    {
262        let (submission_guard, marker, guard) = self.prepare_blocking_submission()?;
263        let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
264        completion.accept();
265        let completion = share_task_slot(completion);
266        let abort_completion = Arc::clone(&completion);
267        let abort_queued_task = guard.finish_queued_once_callback();
268        let cancel_queued_task = guard.cancel_queued_callback();
269        let abort_handle = self.spawn_accepted_blocking_task(
270            submission_guard,
271            marker,
272            guard,
273            move || {
274                if let Some(completion) = take_task_slot(&completion) {
275                    TaskRunner::new(task).run(completion);
276                }
277            },
278            move || cancel_unstarted_task_slot_if_queued(&abort_completion, abort_queued_task),
279        );
280        Ok(TokioBlockingTaskHandle::new(
281            handle,
282            abort_handle,
283            cancel_queued_task,
284        ))
285    }
286
287    /// Stops accepting new tasks.
288    ///
289    /// Already accepted tasks are allowed to finish unless they are cancelled
290    /// before their blocking closure starts.
291    fn shutdown(&self) {
292        let _guard = self.state.lock_submission();
293        self.state.shutdown();
294        self.state.notify_if_terminated();
295    }
296
297    /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
298    ///
299    /// Tokio cannot abort blocking tasks that have already started. Such tasks
300    /// continue running and keep the service active until their closure returns.
301    ///
302    /// # Returns
303    ///
304    /// A report with queued and running blocking task counts observed when
305    /// stop was requested, plus the number of queued blocking tasks that were
306    /// actually cancelled before their blocking closures started.
307    fn stop(&self) -> StopReport {
308        let _guard = self.state.lock_submission();
309        self.state.stop();
310        let (queued_count, running_count) = self.state.task_count_snapshot();
311        let cancellation_count = self.state.abort_tracked_tasks();
312        self.state.notify_if_terminated();
313        StopReport::new(queued_count, running_count, cancellation_count)
314    }
315
316    /// Returns the current lifecycle state.
317    fn lifecycle(&self) -> ExecutorServiceLifecycle {
318        self.state.lifecycle()
319    }
320
321    /// Returns whether shutdown has been requested.
322    fn is_not_running(&self) -> bool {
323        self.state.is_not_running()
324    }
325
326    /// Returns whether shutdown was requested and all tasks are finished.
327    fn is_terminated(&self) -> bool {
328        self.lifecycle() == ExecutorServiceLifecycle::Terminated
329    }
330
331    /// Blocks until the service has terminated.
332    fn wait_termination(&self) {
333        self.state.wait_termination();
334    }
335}
336
337impl TokioExecutorService {
338    /// Waits asynchronously until the service has terminated.
339    ///
340    /// # Returns
341    ///
342    /// A future that resolves after shutdown or stop has been requested and all
343    /// accepted blocking tasks have finished or been aborted before start.
344    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
345        Box::pin(async move {
346            let notified = self.state.terminated_notify.notified();
347            tokio::pin!(notified);
348            loop {
349                notified.as_mut().enable();
350                if self.is_terminated() {
351                    return;
352                }
353                notified.as_mut().await;
354                notified.set(self.state.terminated_notify.notified());
355            }
356        })
357    }
358}