qubit_tokio_executor/tokio_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use qubit_executor::TaskRunner;
15
16use crate::TokioTaskHandle;
17use crate::tokio_executor_service_state::TokioExecutorServiceState;
18use crate::tokio_service_task_guard::TokioServiceTaskGuard;
19use qubit_executor::service::{
20 ExecutorService,
21 RejectedExecution,
22 ShutdownReport,
23};
24
25/// Tokio-backed service for submitted blocking tasks.
26///
27/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
28/// [`Callable`] tasks, runs them through Tokio, and
29/// returns awaitable handles for their final results.
30#[derive(Default, Clone)]
31pub struct TokioExecutorService {
32 /// Shared service state used by all clones of this service.
33 state: Arc<TokioExecutorServiceState>,
34}
35
36impl TokioExecutorService {
37 /// Creates a new service instance.
38 ///
39 /// # Returns
40 ///
41 /// A Tokio-backed executor service.
42 #[inline]
43 pub fn new() -> Self {
44 Self::default()
45 }
46}
47
48impl ExecutorService for TokioExecutorService {
49 type Handle<R, E>
50 = TokioTaskHandle<R, E>
51 where
52 R: Send + 'static,
53 E: Send + 'static;
54
55 type Termination<'a>
56 = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>>
57 where
58 Self: 'a;
59
60 /// Accepts a callable and runs it through Tokio.
61 ///
62 /// # Parameters
63 ///
64 /// * `task` - Callable to execute on Tokio's blocking task pool.
65 ///
66 /// # Returns
67 ///
68 /// A [`TokioTaskHandle`] for the accepted task.
69 ///
70 /// # Errors
71 ///
72 /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
73 /// requested before the task is accepted.
74 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
75 where
76 C: Callable<R, E> + Send + 'static,
77 R: Send + 'static,
78 E: Send + 'static,
79 {
80 let submission_guard = self.state.lock_submission();
81 if self.state.shutdown.load() {
82 return Err(RejectedExecution::Shutdown);
83 }
84 self.state.active_tasks.inc();
85
86 let marker = Arc::new(());
87 let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
88 let handle = tokio::task::spawn_blocking(move || {
89 let _guard = guard;
90 TaskRunner::new(task).call()
91 });
92 self.state
93 .register_abort_handle(marker, handle.abort_handle());
94 drop(submission_guard);
95 Ok(TokioTaskHandle::new(handle))
96 }
97
98 /// Stops accepting new tasks.
99 ///
100 /// Already accepted tasks are allowed to finish unless they are cancelled
101 /// before their blocking closure starts.
102 fn shutdown(&self) {
103 let _guard = self.state.lock_submission();
104 self.state.shutdown.store(true);
105 self.state.notify_if_terminated();
106 }
107
108 /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
109 ///
110 /// Tokio cannot abort blocking tasks that have already started. Such tasks
111 /// continue running and keep the service active until their closure returns.
112 ///
113 /// # Returns
114 ///
115 /// A report with zero queued tasks, the observed active task count, and
116 /// the number of Tokio abort handles signalled.
117 fn shutdown_now(&self) -> ShutdownReport {
118 let _guard = self.state.lock_submission();
119 self.state.shutdown.store(true);
120 let running = self.state.active_tasks.get();
121 let cancellation_count = self.state.abort_tracked_tasks();
122 self.state.notify_if_terminated();
123 ShutdownReport::new(0, running, cancellation_count)
124 }
125
126 /// Returns whether shutdown has been requested.
127 fn is_shutdown(&self) -> bool {
128 self.state.shutdown.load()
129 }
130
131 /// Returns whether shutdown was requested and all tasks are finished.
132 fn is_terminated(&self) -> bool {
133 self.is_shutdown() && self.state.active_tasks.is_zero()
134 }
135
136 /// Waits until the service has terminated.
137 ///
138 /// # Returns
139 ///
140 /// A future that resolves after shutdown has been requested and all
141 /// accepted Tokio blocking tasks have finished or were cancelled before
142 /// their closures started.
143 fn await_termination(&self) -> Self::Termination<'_> {
144 Box::pin(async move {
145 let notified = self.state.terminated_notify.notified();
146 tokio::pin!(notified);
147 loop {
148 notified.as_mut().enable();
149 if self.is_terminated() {
150 return;
151 }
152 notified.as_mut().await;
153 notified.set(self.state.terminated_notify.notified());
154 }
155 })
156 }
157}