qubit_tokio_executor/tokio_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use qubit_executor::TaskRunner;
15
16use crate::TokioTaskHandle;
17use crate::tokio_executor_service_state::TokioExecutorServiceState;
18use crate::tokio_service_task_guard::TokioServiceTaskGuard;
19use qubit_executor::service::{
20 ExecutorService,
21 RejectedExecution,
22 ShutdownReport,
23};
24
25/// Tokio-backed service for submitted blocking tasks.
26///
27/// The service accepts fallible [`Runnable`](qubit_function::Runnable) and
28/// [`Callable`] tasks, runs them through Tokio, and
29/// returns awaitable handles for their final results.
30#[derive(Default, Clone)]
31pub struct TokioExecutorService {
32 /// Shared service state used by all clones of this service.
33 state: Arc<TokioExecutorServiceState>,
34}
35
/// Alias of [`TokioExecutorService`] that spells out its blocking nature.
///
/// Tasks submitted through the service are executed with
/// `tokio::task::spawn_blocking`, so both names refer to the same type.
pub type TokioBlockingExecutorService = TokioExecutorService;
38
39impl TokioExecutorService {
40 /// Creates a new service instance.
41 ///
42 /// # Returns
43 ///
44 /// A Tokio-backed executor service.
45 #[inline]
46 pub fn new() -> Self {
47 Self::default()
48 }
49}
50
51impl ExecutorService for TokioExecutorService {
52 type Handle<R, E>
53 = TokioTaskHandle<R, E>
54 where
55 R: Send + 'static,
56 E: Send + 'static;
57
58 type Termination<'a>
59 = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>>
60 where
61 Self: 'a;
62
63 /// Accepts a callable and runs it through Tokio.
64 ///
65 /// # Parameters
66 ///
67 /// * `task` - Callable to execute on Tokio's blocking task pool.
68 ///
69 /// # Returns
70 ///
71 /// A [`TokioTaskHandle`] for the accepted task.
72 ///
73 /// # Errors
74 ///
75 /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
76 /// requested before the task is accepted.
77 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
78 where
79 C: Callable<R, E> + Send + 'static,
80 R: Send + 'static,
81 E: Send + 'static,
82 {
83 let submission_guard = self.state.lock_submission();
84 if self.state.shutdown.load() {
85 return Err(RejectedExecution::Shutdown);
86 }
87 self.state.active_tasks.inc();
88
89 let marker = Arc::new(());
90 let guard = TokioServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
91 let handle = tokio::task::spawn_blocking(move || {
92 let _guard = guard;
93 TaskRunner::new(task).call()
94 });
95 self.state
96 .register_abort_handle(marker, handle.abort_handle());
97 drop(submission_guard);
98 Ok(TokioTaskHandle::new(handle))
99 }
100
101 /// Stops accepting new tasks.
102 ///
103 /// Already accepted tasks are allowed to finish unless they are cancelled
104 /// before their blocking closure starts.
105 fn shutdown(&self) {
106 let _guard = self.state.lock_submission();
107 self.state.shutdown.store(true);
108 self.state.notify_if_terminated();
109 }
110
111 /// Stops accepting new tasks and requests abort for tracked Tokio tasks.
112 ///
113 /// Tokio cannot abort blocking tasks that have already started. Such tasks
114 /// continue running and keep the service active until their closure returns.
115 ///
116 /// # Returns
117 ///
118 /// A report with zero queued tasks, the observed active task count, and
119 /// the number of Tokio abort handles signalled.
120 fn shutdown_now(&self) -> ShutdownReport {
121 let _guard = self.state.lock_submission();
122 self.state.shutdown.store(true);
123 let running = self.state.active_tasks.get();
124 let cancellation_count = self.state.abort_tracked_tasks();
125 self.state.notify_if_terminated();
126 ShutdownReport::new(0, running, cancellation_count)
127 }
128
129 /// Returns whether shutdown has been requested.
130 fn is_shutdown(&self) -> bool {
131 self.state.shutdown.load()
132 }
133
134 /// Returns whether shutdown was requested and all tasks are finished.
135 fn is_terminated(&self) -> bool {
136 self.is_shutdown() && self.state.active_tasks.is_zero()
137 }
138
139 /// Waits until the service has terminated.
140 ///
141 /// # Returns
142 ///
143 /// A future that resolves after shutdown has been requested and all
144 /// accepted Tokio blocking tasks have finished or were cancelled before
145 /// their closures started.
146 fn await_termination(&self) -> Self::Termination<'_> {
147 Box::pin(async move {
148 let notified = self.state.terminated_notify.notified();
149 tokio::pin!(notified);
150 loop {
151 notified.as_mut().enable();
152 if self.is_terminated() {
153 return;
154 }
155 notified.as_mut().await;
156 notified.set(self.state.terminated_notify.notified());
157 }
158 })
159 }
160}