Skip to main content

qubit_tokio_executor/
tokio_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use qubit_executor::TaskRunner;
15
16use crate::TokioTaskHandle;
17use crate::tokio_executor_service_state::TokioExecutorServiceState;
18use crate::tokio_service_task_guard::TokioServiceTaskGuard;
19use qubit_executor::service::{
20    ExecutorService,
21    RejectedExecution,
22    ShutdownReport,
23};
24
25/// Tokio-backed service for submitted blocking tasks.
26///
27/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
28/// [`Callable`] tasks, runs them through Tokio, and
29/// returns awaitable handles for their final results.
30#[derive(Default, Clone)]
31pub struct TokioExecutorService {
32    /// Shared service state used by all clones of this service.
33    state: Arc<TokioExecutorServiceState>,
34}
35
36/// Tokio-backed blocking executor service routed through `spawn_blocking`.
37pub type TokioBlockingExecutorService = TokioExecutorService;
38
39impl TokioExecutorService {
40    /// Creates a new service instance.
41    ///
42    /// # Returns
43    ///
44    /// A Tokio-backed executor service.
45    #[inline]
46    pub fn new() -> Self {
47        Self::default()
48    }
49}
50
51impl ExecutorService for TokioExecutorService {
52    type Handle<R, E>
53        = TokioTaskHandle<R, E>
54    where
55        R: Send + 'static,
56        E: Send + 'static;
57
58    type Termination<'a>
59        = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>>
60    where
61        Self: 'a;
62
63    /// Accepts a callable and runs it through Tokio.
64    ///
65    /// # Parameters
66    ///
67    /// * `task` - Callable to execute on Tokio's blocking task pool.
68    ///
69    /// # Returns
70    ///
71    /// A [`TokioTaskHandle`] for the accepted task.
72    ///
73    /// # Errors
74    ///
75    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
76    /// requested before the task is accepted.
77    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
78    where
79        C: Callable<R, E> + Send + 'static,
80        R: Send + 'static,
81        E: Send + 'static,
82    {
83        let submission_guard = self.state.lock_submission();
84        if self.state.shutdown.load() {
85            return Err(RejectedExecution::Shutdown);
86        }
87        self.state.active_tasks.inc();
88
89        let marker = Arc::new(());
90        let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
91        let handle = tokio::task::spawn_blocking(move || {
92            let _guard = guard;
93            TaskRunner::new(task).call()
94        });
95        self.state
96            .register_abort_handle(marker, handle.abort_handle());
97        drop(submission_guard);
98        Ok(TokioTaskHandle::new(handle))
99    }
100
101    /// Stops accepting new tasks.
102    ///
103    /// Already accepted tasks are allowed to finish unless they are cancelled
104    /// before their blocking closure starts.
105    fn shutdown(&self) {
106        let _guard = self.state.lock_submission();
107        self.state.shutdown.store(true);
108        self.state.notify_if_terminated();
109    }
110
111    /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
112    ///
113    /// Tokio cannot abort blocking tasks that have already started. Such tasks
114    /// continue running and keep the service active until their closure returns.
115    ///
116    /// # Returns
117    ///
118    /// A report with zero queued tasks, the observed active task count, and
119    /// the number of Tokio abort handles signalled.
120    fn shutdown_now(&self) -> ShutdownReport {
121        let _guard = self.state.lock_submission();
122        self.state.shutdown.store(true);
123        let running = self.state.active_tasks.get();
124        let cancellation_count = self.state.abort_tracked_tasks();
125        self.state.notify_if_terminated();
126        ShutdownReport::new(0, running, cancellation_count)
127    }
128
129    /// Returns whether shutdown has been requested.
130    fn is_shutdown(&self) -> bool {
131        self.state.shutdown.load()
132    }
133
134    /// Returns whether shutdown was requested and all tasks are finished.
135    fn is_terminated(&self) -> bool {
136        self.is_shutdown() && self.state.active_tasks.is_zero()
137    }
138
139    /// Waits until the service has terminated.
140    ///
141    /// # Returns
142    ///
143    /// A future that resolves after shutdown has been requested and all
144    /// accepted Tokio blocking tasks have finished or were cancelled before
145    /// their closures started.
146    fn await_termination(&self) -> Self::Termination<'_> {
147        Box::pin(async move {
148            let notified = self.state.terminated_notify.notified();
149            tokio::pin!(notified);
150            loop {
151                notified.as_mut().enable();
152                if self.is_terminated() {
153                    return;
154                }
155                notified.as_mut().await;
156                notified.set(self.state.terminated_notify.notified());
157            }
158        })
159    }
160}