Skip to main content

qubit_tokio_executor/
tokio_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use qubit_executor::TaskRunner;
15
16use crate::TokioTaskHandle;
17use crate::tokio_executor_service_state::TokioExecutorServiceState;
18use crate::tokio_service_task_guard::TokioServiceTaskGuard;
19use qubit_executor::service::{
20    ExecutorService,
21    RejectedExecution,
22    ShutdownReport,
23};
24
25/// Tokio-backed service for submitted blocking tasks.
26///
27/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
28/// [`Callable`] tasks, runs them through Tokio, and
29/// returns awaitable handles for their final results.
30#[derive(Default, Clone)]
31pub struct TokioExecutorService {
32    /// Shared service state used by all clones of this service.
33    state: Arc<TokioExecutorServiceState>,
34}
35
36impl TokioExecutorService {
37    /// Creates a new service instance.
38    ///
39    /// # Returns
40    ///
41    /// A Tokio-backed executor service.
42    #[inline]
43    pub fn new() -> Self {
44        Self::default()
45    }
46}
47
48impl ExecutorService for TokioExecutorService {
49    type Handle<R, E>
50        = TokioTaskHandle<R, E>
51    where
52        R: Send + 'static,
53        E: Send + 'static;
54
55    type Termination<'a>
56        = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>>
57    where
58        Self: 'a;
59
60    /// Accepts a callable and runs it through Tokio.
61    ///
62    /// # Parameters
63    ///
64    /// * `task` - Callable to execute on Tokio's blocking task pool.
65    ///
66    /// # Returns
67    ///
68    /// A [`TokioTaskHandle`] for the accepted task.
69    ///
70    /// # Errors
71    ///
72    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
73    /// requested before the task is accepted.
74    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
75    where
76        C: Callable<R, E> + Send + 'static,
77        R: Send + 'static,
78        E: Send + 'static,
79    {
80        let submission_guard = self.state.lock_submission();
81        if self.state.shutdown.load() {
82            return Err(RejectedExecution::Shutdown);
83        }
84        self.state.active_tasks.inc();
85
86        let marker = Arc::new(());
87        let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
88        let handle = tokio::task::spawn_blocking(move || {
89            let _guard = guard;
90            TaskRunner::new(task).call()
91        });
92        self.state
93            .register_abort_handle(marker, handle.abort_handle());
94        drop(submission_guard);
95        Ok(TokioTaskHandle::new(handle))
96    }
97
98    /// Stops accepting new tasks.
99    ///
100    /// Already accepted tasks are allowed to finish unless they are cancelled
101    /// before their blocking closure starts.
102    fn shutdown(&self) {
103        let _guard = self.state.lock_submission();
104        self.state.shutdown.store(true);
105        self.state.notify_if_terminated();
106    }
107
108    /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
109    ///
110    /// Tokio cannot abort blocking tasks that have already started. Such tasks
111    /// continue running and keep the service active until their closure returns.
112    ///
113    /// # Returns
114    ///
115    /// A report with zero queued tasks, the observed active task count, and
116    /// the number of Tokio abort handles signalled.
117    fn shutdown_now(&self) -> ShutdownReport {
118        let _guard = self.state.lock_submission();
119        self.state.shutdown.store(true);
120        let running = self.state.active_tasks.get();
121        let cancellation_count = self.state.abort_tracked_tasks();
122        self.state.notify_if_terminated();
123        ShutdownReport::new(0, running, cancellation_count)
124    }
125
126    /// Returns whether shutdown has been requested.
127    fn is_shutdown(&self) -> bool {
128        self.state.shutdown.load()
129    }
130
131    /// Returns whether shutdown was requested and all tasks are finished.
132    fn is_terminated(&self) -> bool {
133        self.is_shutdown() && self.state.active_tasks.is_zero()
134    }
135
136    /// Waits until the service has terminated.
137    ///
138    /// # Returns
139    ///
140    /// A future that resolves after shutdown has been requested and all
141    /// accepted Tokio blocking tasks have finished or were cancelled before
142    /// their closures started.
143    fn await_termination(&self) -> Self::Termination<'_> {
144        Box::pin(async move {
145            let notified = self.state.terminated_notify.notified();
146            tokio::pin!(notified);
147            loop {
148                notified.as_mut().enable();
149                if self.is_terminated() {
150                    return;
151                }
152                notified.as_mut().await;
153                notified.set(self.state.terminated_notify.notified());
154            }
155        })
156    }
157}