qubit_tokio_executor/tokio_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9use std::sync::Arc;
10
11use qubit_function::Callable;
12
13use qubit_executor::TaskRunner;
14
15use crate::TokioTaskHandle;
16use crate::tokio_executor_service_state::TokioExecutorServiceState;
17use crate::tokio_service_task_guard::TokioServiceTaskGuard;
18use qubit_executor::service::{
19 ExecutorService,
20 RejectedExecution,
21 ShutdownReport,
22};
23
24/// Tokio-backed service for submitted blocking tasks.
25///
26/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
27/// [`Callable`] tasks, runs them through Tokio, and
28/// returns awaitable handles for their final results.
29#[derive(Default, Clone)]
30pub struct TokioExecutorService {
31 /// Shared service state used by all clones of this service.
32 state: Arc<TokioExecutorServiceState>,
33}
34
35impl TokioExecutorService {
36 /// Creates a new service instance.
37 ///
38 /// # Returns
39 ///
40 /// A Tokio-backed executor service.
41 #[inline]
42 pub fn new() -> Self {
43 Self::default()
44 }
45}
46
47impl ExecutorService for TokioExecutorService {
48 type Handle<R, E>
49 = TokioTaskHandle<R, E>
50 where
51 R: Send + 'static,
52 E: Send + 'static;
53
54 type Termination<'a>
55 = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>>
56 where
57 Self: 'a;
58
59 /// Accepts a callable and runs it through Tokio.
60 ///
61 /// # Parameters
62 ///
63 /// * `task` - Callable to execute on Tokio's blocking task pool.
64 ///
65 /// # Returns
66 ///
67 /// A [`TokioTaskHandle`] for the accepted task.
68 ///
69 /// # Errors
70 ///
71 /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
72 /// requested before the task is accepted.
73 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
74 where
75 C: Callable<R, E> + Send + 'static,
76 R: Send + 'static,
77 E: Send + 'static,
78 {
79 let submission_guard = self.state.lock_submission();
80 if self.state.shutdown.load() {
81 return Err(RejectedExecution::Shutdown);
82 }
83 self.state.active_tasks.inc();
84
85 let marker = Arc::new(());
86 let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
87 let handle = tokio::task::spawn_blocking(move || {
88 let _guard = guard;
89 TaskRunner::new(task).call()
90 });
91 self.state
92 .register_abort_handle(marker, handle.abort_handle());
93 drop(submission_guard);
94 Ok(TokioTaskHandle::new(handle))
95 }
96
97 /// Stops accepting new tasks.
98 ///
99 /// Already accepted tasks are allowed to finish unless they are cancelled
100 /// before their blocking closure starts.
101 fn shutdown(&self) {
102 let _guard = self.state.lock_submission();
103 self.state.shutdown.store(true);
104 self.state.notify_if_terminated();
105 }
106
107 /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
108 ///
109 /// Tokio cannot abort blocking tasks that have already started. Such tasks
110 /// continue running and keep the service active until their closure returns.
111 ///
112 /// # Returns
113 ///
114 /// A report with zero queued tasks, the observed active task count, and
115 /// the number of Tokio abort handles signalled.
116 fn shutdown_now(&self) -> ShutdownReport {
117 let _guard = self.state.lock_submission();
118 self.state.shutdown.store(true);
119 let running = self.state.active_tasks.get();
120 let cancellation_count = self.state.abort_tracked_tasks();
121 self.state.notify_if_terminated();
122 ShutdownReport::new(0, running, cancellation_count)
123 }
124
125 /// Returns whether shutdown has been requested.
126 fn is_shutdown(&self) -> bool {
127 self.state.shutdown.load()
128 }
129
130 /// Returns whether shutdown was requested and all tasks are finished.
131 fn is_terminated(&self) -> bool {
132 self.is_shutdown() && self.state.active_tasks.is_zero()
133 }
134
135 /// Waits until the service has terminated.
136 ///
137 /// # Returns
138 ///
139 /// A future that resolves after shutdown has been requested and all
140 /// accepted Tokio blocking tasks have finished or were cancelled before
141 /// their closures started.
142 fn await_termination(&self) -> Self::Termination<'_> {
143 Box::pin(async move {
144 let notified = self.state.terminated_notify.notified();
145 tokio::pin!(notified);
146 loop {
147 notified.as_mut().enable();
148 if self.is_terminated() {
149 return;
150 }
151 notified.as_mut().await;
152 notified.set(self.state.terminated_notify.notified());
153 }
154 })
155 }
156}