Skip to main content

qubit_tokio_executor/
tokio_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::sync::Arc;
10
11use qubit_function::Callable;
12
13use qubit_executor::TaskRunner;
14
15use crate::TokioTaskHandle;
16use crate::tokio_executor_service_state::TokioExecutorServiceState;
17use crate::tokio_service_task_guard::TokioServiceTaskGuard;
18use qubit_executor::service::{
19    ExecutorService,
20    RejectedExecution,
21    ShutdownReport,
22};
23
24/// Tokio-backed service for submitted blocking tasks.
25///
26/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
27/// [`Callable`] tasks, runs them through Tokio, and
28/// returns awaitable handles for their final results.
29#[derive(Default, Clone)]
30pub struct TokioExecutorService {
31    /// Shared service state used by all clones of this service.
32    state: Arc<TokioExecutorServiceState>,
33}
34
35impl TokioExecutorService {
36    /// Creates a new service instance.
37    ///
38    /// # Returns
39    ///
40    /// A Tokio-backed executor service.
41    #[inline]
42    pub fn new() -> Self {
43        Self::default()
44    }
45}
46
47impl ExecutorService for TokioExecutorService {
48    type Handle<R, E>
49        = TokioTaskHandle<R, E>
50    where
51        R: Send + 'static,
52        E: Send + 'static;
53
54    type Termination<'a>
55        = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>>
56    where
57        Self: 'a;
58
59    /// Accepts a callable and runs it through Tokio.
60    ///
61    /// # Parameters
62    ///
63    /// * `task` - Callable to execute on Tokio's blocking task pool.
64    ///
65    /// # Returns
66    ///
67    /// A [`TokioTaskHandle`] for the accepted task.
68    ///
69    /// # Errors
70    ///
71    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
72    /// requested before the task is accepted.
73    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
74    where
75        C: Callable<R, E> + Send + 'static,
76        R: Send + 'static,
77        E: Send + 'static,
78    {
79        let submission_guard = self.state.lock_submission();
80        if self.state.shutdown.load() {
81            return Err(RejectedExecution::Shutdown);
82        }
83        self.state.active_tasks.inc();
84
85        let marker = Arc::new(());
86        let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
87        let handle = tokio::task::spawn_blocking(move || {
88            let _guard = guard;
89            TaskRunner::new(task).call()
90        });
91        self.state
92            .register_abort_handle(marker, handle.abort_handle());
93        drop(submission_guard);
94        Ok(TokioTaskHandle::new(handle))
95    }
96
97    /// Stops accepting new tasks.
98    ///
99    /// Already accepted tasks are allowed to finish unless they are cancelled
100    /// before their blocking closure starts.
101    fn shutdown(&self) {
102        let _guard = self.state.lock_submission();
103        self.state.shutdown.store(true);
104        self.state.notify_if_terminated();
105    }
106
107    /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
108    ///
109    /// Tokio cannot abort blocking tasks that have already started. Such tasks
110    /// continue running and keep the service active until their closure returns.
111    ///
112    /// # Returns
113    ///
114    /// A report with zero queued tasks, the observed active task count, and
115    /// the number of Tokio abort handles signalled.
116    fn shutdown_now(&self) -> ShutdownReport {
117        let _guard = self.state.lock_submission();
118        self.state.shutdown.store(true);
119        let running = self.state.active_tasks.get();
120        let cancellation_count = self.state.abort_tracked_tasks();
121        self.state.notify_if_terminated();
122        ShutdownReport::new(0, running, cancellation_count)
123    }
124
125    /// Returns whether shutdown has been requested.
126    fn is_shutdown(&self) -> bool {
127        self.state.shutdown.load()
128    }
129
130    /// Returns whether shutdown was requested and all tasks are finished.
131    fn is_terminated(&self) -> bool {
132        self.is_shutdown() && self.state.active_tasks.is_zero()
133    }
134
135    /// Waits until the service has terminated.
136    ///
137    /// # Returns
138    ///
139    /// A future that resolves after shutdown has been requested and all
140    /// accepted Tokio blocking tasks have finished or were cancelled before
141    /// their closures started.
142    fn await_termination(&self) -> Self::Termination<'_> {
143        Box::pin(async move {
144            let notified = self.state.terminated_notify.notified();
145            tokio::pin!(notified);
146            loop {
147                notified.as_mut().enable();
148                if self.is_terminated() {
149                    return;
150                }
151                notified.as_mut().await;
152                notified.set(self.state.terminated_notify.notified());
153            }
154        })
155    }
156}