qubit_thread_pool/pool_job.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 panic::{
12 AssertUnwindSafe,
13 catch_unwind,
14 },
15 sync::Mutex,
16};
17
18use qubit_executor::task::spi::{
19 TaskRunner,
20 TaskSlot,
21};
22use qubit_function::{
23 Callable,
24 Runnable,
25};
26
/// Object-safe interface for a queued job that the pool stores behind a `Box`.
///
/// The `Send + 'static` supertraits allow a boxed task to be moved to another
/// thread.
trait PoolTask: Send + 'static {
    /// Notifies the task that an executor service has accepted it.
    ///
    /// # Returns
    ///
    /// `true` if the acceptance step finished normally, or `false` if a custom
    /// acceptance callback panicked and the panic was contained.
    fn accept(&self) -> bool;

    /// Consumes the boxed task and executes it, publishing its outcome unless
    /// the task was cancelled beforehand.
    fn run(self: Box<Self>);

    /// Consumes the boxed task and cancels it instead of starting it.
    fn cancel(self: Box<Self>);
}
43
44/// Callable task paired with its runner-side completion endpoint.
45struct CompletablePoolTask<C, R, E> {
46 /// Callable task to execute once a worker starts this job.
47 task: C,
48 /// Completion endpoint used to publish the task result.
49 completion: TaskSlot<R, E>,
50}
51
52impl<C, R, E> PoolTask for CompletablePoolTask<C, R, E>
53where
54 C: Callable<R, E> + Send + 'static,
55 R: Send + 'static,
56 E: Send + 'static,
57{
58 /// Marks this task as accepted by an executor service.
59 fn accept(&self) -> bool {
60 self.completion.accept();
61 true
62 }
63
64 /// Runs this task and publishes its result if it was not cancelled first.
65 fn run(self: Box<Self>) {
66 let Self { task, completion } = *self;
67 TaskRunner::new(task).run(completion);
68 }
69
70 /// Publishes cancellation for an unstarted accepted task.
71 fn cancel(self: Box<Self>) {
72 let Self { completion, .. } = *self;
73 let _cancelled = completion.cancel_unstarted();
74 }
75}
76
/// Boxed one-shot callback stored by [`CustomPoolTask`].
type CustomCallback = Box<dyn FnOnce() + Send + 'static>;

/// Lifecycle hooks provided by a higher-level service for one pool job.
///
/// Each hook runs synchronously on whichever pool path reaches the matching
/// lifecycle event, so hooks should be short and must not block. A panic
/// raised by a hook is contained so that it cannot escape into pool worker or
/// shutdown accounting.
struct CustomPoolTask {
    /// Hook invoked once the pool accepts the job. It is kept in a
    /// `Mutex<Option<_>>` so it can be taken through a shared reference and
    /// run at most once.
    accept: Mutex<Option<CustomCallback>>,
    /// Hook executed when a worker starts the job.
    run: CustomCallback,
    /// Hook executed when the job is cancelled before it starts.
    cancel: CustomCallback,
}
91
92impl PoolTask for CustomPoolTask {
93 /// Runs the acceptance callback once.
94 fn accept(&self) -> bool {
95 if let Some(accept) = self
96 .accept
97 .lock()
98 .unwrap_or_else(std::sync::PoisonError::into_inner)
99 .take()
100 {
101 return catch_unwind(AssertUnwindSafe(accept)).is_ok();
102 }
103 true
104 }
105
106 /// Runs this custom job.
107 fn run(self: Box<Self>) {
108 let Self { run, .. } = *self;
109 let _ignored = catch_unwind(AssertUnwindSafe(run));
110 }
111
112 /// Cancels this custom job before it starts.
113 fn cancel(self: Box<Self>) {
114 let Self { cancel, .. } = *self;
115 let _ignored = catch_unwind(AssertUnwindSafe(cancel));
116 }
117}
118
119/// Type-erased pool job with separate detached and cancellable forms.
120pub struct PoolJob {
121 /// Internal job representation hidden behind method-only access.
122 inner: PoolJobInner,
123}
124
125/// Private type-erased pool job representation.
126enum PoolJobInner {
127 /// Fire-and-forget job submitted without a completion endpoint.
128 Detached {
129 /// Callback executed once a worker starts the job.
130 run: Box<dyn FnOnce() + Send + 'static>,
131 },
132 /// Job whose queued cancellation must complete a result endpoint.
133 Completable(Box<dyn PoolTask>),
134}
135
136impl PoolJob {
137 /// Creates a custom cancellable job with no acceptance callback.
138 ///
139 /// Higher-level services that maintain their own task state usually want
140 /// [`Self::with_accept`] instead, so they can publish acceptance only after
141 /// the backing pool has accepted the job.
142 /// Custom callbacks run synchronously and should not block. Panics raised
143 /// by `run` or `cancel` are caught and ignored by the pool job wrapper.
144 ///
145 /// # Parameters
146 ///
147 /// * `run` - Callback executed when a worker starts this job.
148 /// * `cancel` - Callback executed if the accepted job is cancelled before
149 /// it starts.
150 ///
151 /// # Returns
152 ///
153 /// A custom type-erased job accepted by thread pools.
154 pub fn new(
155 run: Box<dyn FnOnce() + Send + 'static>,
156 cancel: Box<dyn FnOnce() + Send + 'static>,
157 ) -> Self {
158 Self::with_accept(Box::new(|| {}), run, cancel)
159 }
160
161 /// Creates a custom cancellable job with an acceptance callback.
162 ///
163 /// The pool invokes `accept` exactly once after the submission crosses the
164 /// acceptance boundary. If submission is rejected before acceptance, neither
165 /// `accept`, `run`, nor `cancel` is invoked. Custom callbacks run
166 /// synchronously and should not block. Panics raised by these callbacks are
167 /// caught and ignored by the pool job wrapper; an `accept` panic is reported
168 /// to the pool as a failed acceptance callback.
169 ///
170 /// # Parameters
171 ///
172 /// * `accept` - Callback invoked once the pool accepts the job.
173 /// * `run` - Callback executed when a worker starts this job.
174 /// * `cancel` - Callback executed if the accepted job is cancelled before
175 /// it starts.
176 ///
177 /// # Returns
178 ///
179 /// A custom type-erased job accepted by thread pools.
180 pub fn with_accept(
181 accept: Box<dyn FnOnce() + Send + 'static>,
182 run: Box<dyn FnOnce() + Send + 'static>,
183 cancel: Box<dyn FnOnce() + Send + 'static>,
184 ) -> Self {
185 Self {
186 inner: PoolJobInner::Completable(Box::new(CustomPoolTask {
187 accept: Mutex::new(Some(accept)),
188 run,
189 cancel,
190 })),
191 }
192 }
193
194 /// Creates a pool job from a typed callable task and completion endpoint.
195 ///
196 /// # Parameters
197 ///
198 /// * `task` - Callable task to execute when a worker starts this job.
199 /// * `completion` - Completion endpoint used to publish the typed result or
200 /// cancellation.
201 ///
202 /// # Returns
203 ///
204 /// A type-erased job that runs the task on worker start and cancels the
205 /// completion endpoint if the job is cancelled while queued.
206 pub(crate) fn from_task<C, R, E>(task: C, completion: TaskSlot<R, E>) -> Self
207 where
208 C: Callable<R, E> + Send + 'static,
209 R: Send + 'static,
210 E: Send + 'static,
211 {
212 Self {
213 inner: PoolJobInner::Completable(Box::new(CompletablePoolTask { task, completion })),
214 }
215 }
216
217 /// Creates a pool job from a runnable task without retaining a result handle.
218 ///
219 /// # Parameters
220 ///
221 /// * `task` - Runnable task to execute when a worker starts this job.
222 ///
223 /// # Returns
224 ///
225 /// A type-erased job that runs the task and discards its final result. If
226 /// the job is abandoned while queued, cancellation has no result endpoint to
227 /// notify.
228 pub(crate) fn detached<T, E>(task: T) -> Self
229 where
230 T: Runnable<E> + Send + 'static,
231 E: Send + 'static,
232 {
233 Self {
234 inner: PoolJobInner::Detached {
235 run: Box::new(move || {
236 let mut task = task;
237 let _ignored = catch_unwind(AssertUnwindSafe(|| task.run()));
238 }),
239 },
240 }
241 }
242
243 /// Marks this job as accepted by an executor service.
244 ///
245 /// Detached jobs do not have a completion endpoint, so this is a no-op for
246 /// fire-and-forget submissions.
247 ///
248 /// # Returns
249 ///
250 /// `true` when the job can continue to execution or queueing, or `false`
251 /// when a custom acceptance callback panicked and was contained.
252 pub(crate) fn accept(&self) -> bool {
253 if let PoolJobInner::Completable(task) = &self.inner {
254 return task.accept();
255 }
256 true
257 }
258
259 /// Runs this job if it has not been cancelled first.
260 ///
261 /// Consumes the job and invokes the run callback at most once.
262 pub(crate) fn run(self) {
263 match self.inner {
264 PoolJobInner::Detached { run } => run(),
265 PoolJobInner::Completable(task) => task.run(),
266 }
267 }
268
269 /// Cancels this queued job if it has not been run first.
270 ///
271 /// Consumes the job and invokes the cancellation callback at most once.
272 pub(crate) fn cancel(self) {
273 if let PoolJobInner::Completable(task) = self.inner {
274 task.cancel();
275 }
276 }
277}