qubit_thread_pool/pool_job.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 panic::{
12 AssertUnwindSafe,
13 catch_unwind,
14 },
15 sync::Mutex,
16};
17
18use qubit_executor::task::spi::{
19 TaskRunner,
20 TaskSlot,
21};
22use qubit_function::{
23 Callable,
24 Runnable,
25};
26
/// Type-erased callable owned by a pool queue.
///
/// Implementors must be `Send + 'static`. `accept` works through a shared
/// reference, whereas `run` and `cancel` take the boxed task by value, so a
/// given boxed task can be handed to only one of those two methods.
trait PoolTask: Send + 'static {
    /// Marks this task as accepted by an executor service.
    fn accept(&self);

    /// Runs this task and publishes its result if it was not cancelled first.
    ///
    /// Consumes the boxed task.
    fn run(self: Box<Self>);

    /// Cancels this task before it starts.
    ///
    /// Consumes the boxed task.
    fn cancel(self: Box<Self>);
}
38
39/// Callable task paired with its runner-side completion endpoint.
40struct CompletablePoolTask<C, R, E> {
41 /// Callable task to execute once a worker starts this job.
42 task: C,
43 /// Completion endpoint used to publish the task result.
44 completion: TaskSlot<R, E>,
45}
46
47impl<C, R, E> PoolTask for CompletablePoolTask<C, R, E>
48where
49 C: Callable<R, E> + Send + 'static,
50 R: Send + 'static,
51 E: Send + 'static,
52{
53 /// Marks this task as accepted by an executor service.
54 fn accept(&self) {
55 self.completion.accept();
56 }
57
58 /// Runs this task and publishes its result if it was not cancelled first.
59 fn run(self: Box<Self>) {
60 let Self { task, completion } = *self;
61 TaskRunner::new(task).run(completion);
62 }
63
64 /// Publishes cancellation for an unstarted accepted task.
65 fn cancel(self: Box<Self>) {
66 let Self { completion, .. } = *self;
67 let _cancelled = completion.cancel_unstarted();
68 }
69}
70
71/// Custom job callbacks supplied by higher-level services.
72struct CustomPoolTask {
73 /// Callback invoked once the pool accepts the job.
74 accept: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
75 /// Callback executed once a worker starts this job.
76 run: Box<dyn FnOnce() + Send + 'static>,
77 /// Callback executed if the job is cancelled before it starts.
78 cancel: Box<dyn FnOnce() + Send + 'static>,
79}
80
81impl PoolTask for CustomPoolTask {
82 /// Runs the acceptance callback once.
83 fn accept(&self) {
84 if let Some(accept) = self
85 .accept
86 .lock()
87 .expect("custom pool job accept lock should not be poisoned")
88 .take()
89 {
90 accept();
91 }
92 }
93
94 /// Runs this custom job.
95 fn run(self: Box<Self>) {
96 (self.run)();
97 }
98
99 /// Cancels this custom job before it starts.
100 fn cancel(self: Box<Self>) {
101 (self.cancel)();
102 }
103}
104
/// Type-erased pool job with separate detached and cancellable forms.
///
/// The representation is private; a job is driven only through the
/// `accept`, `run`, and `cancel` methods, of which `run` and `cancel`
/// consume the job.
pub struct PoolJob {
    /// Internal job representation hidden behind method-only access.
    inner: PoolJobInner,
}
110
/// Private type-erased pool job representation.
///
/// Distinguishes jobs that only carry a run callback from jobs that also
/// react to acceptance and to cancellation.
enum PoolJobInner {
    /// Fire-and-forget job submitted without a completion endpoint.
    Detached {
        /// Callback executed once a worker starts the job.
        run: Box<dyn FnOnce() + Send + 'static>,
    },
    /// Job whose queued cancellation must complete a result endpoint.
    ///
    /// Holds a boxed `PoolTask` that receives the accept, run, and cancel
    /// calls.
    Completable(Box<dyn PoolTask>),
}
121
122impl PoolJob {
123 /// Creates a custom cancellable job with no acceptance callback.
124 ///
125 /// Higher-level services that maintain their own task state usually want
126 /// [`Self::with_accept`] instead, so they can publish acceptance only after
127 /// the backing pool has accepted the job.
128 ///
129 /// # Parameters
130 ///
131 /// * `run` - Callback executed when a worker starts this job.
132 /// * `cancel` - Callback executed if the accepted job is cancelled before
133 /// it starts.
134 ///
135 /// # Returns
136 ///
137 /// A custom type-erased job accepted by thread pools.
138 pub fn new(
139 run: Box<dyn FnOnce() + Send + 'static>,
140 cancel: Box<dyn FnOnce() + Send + 'static>,
141 ) -> Self {
142 Self::with_accept(Box::new(|| {}), run, cancel)
143 }
144
145 /// Creates a custom cancellable job with an acceptance callback.
146 ///
147 /// The pool invokes `accept` exactly once after the submission crosses the
148 /// acceptance boundary. If submission is rejected before acceptance, neither
149 /// `accept`, `run`, nor `cancel` is invoked.
150 ///
151 /// # Parameters
152 ///
153 /// * `accept` - Callback invoked once the pool accepts the job.
154 /// * `run` - Callback executed when a worker starts this job.
155 /// * `cancel` - Callback executed if the accepted job is cancelled before
156 /// it starts.
157 ///
158 /// # Returns
159 ///
160 /// A custom type-erased job accepted by thread pools.
161 pub fn with_accept(
162 accept: Box<dyn FnOnce() + Send + 'static>,
163 run: Box<dyn FnOnce() + Send + 'static>,
164 cancel: Box<dyn FnOnce() + Send + 'static>,
165 ) -> Self {
166 Self {
167 inner: PoolJobInner::Completable(Box::new(CustomPoolTask {
168 accept: Mutex::new(Some(accept)),
169 run,
170 cancel,
171 })),
172 }
173 }
174
175 /// Creates a pool job from a typed callable task and completion endpoint.
176 ///
177 /// # Parameters
178 ///
179 /// * `task` - Callable task to execute when a worker starts this job.
180 /// * `completion` - Completion endpoint used to publish the typed result or
181 /// cancellation.
182 ///
183 /// # Returns
184 ///
185 /// A type-erased job that runs the task on worker start and cancels the
186 /// completion endpoint if the job is cancelled while queued.
187 pub(crate) fn from_task<C, R, E>(task: C, completion: TaskSlot<R, E>) -> Self
188 where
189 C: Callable<R, E> + Send + 'static,
190 R: Send + 'static,
191 E: Send + 'static,
192 {
193 Self {
194 inner: PoolJobInner::Completable(Box::new(CompletablePoolTask { task, completion })),
195 }
196 }
197
198 /// Creates a pool job from a runnable task without retaining a result handle.
199 ///
200 /// # Parameters
201 ///
202 /// * `task` - Runnable task to execute when a worker starts this job.
203 ///
204 /// # Returns
205 ///
206 /// A type-erased job that runs the task and discards its final result. If
207 /// the job is abandoned while queued, cancellation has no result endpoint to
208 /// notify.
209 pub(crate) fn detached<T, E>(task: T) -> Self
210 where
211 T: Runnable<E> + Send + 'static,
212 E: Send + 'static,
213 {
214 Self {
215 inner: PoolJobInner::Detached {
216 run: Box::new(move || {
217 let mut task = task;
218 let _ignored = catch_unwind(AssertUnwindSafe(|| task.run()));
219 }),
220 },
221 }
222 }
223
224 /// Marks this job as accepted by an executor service.
225 ///
226 /// Detached jobs do not have a completion endpoint, so this is a no-op for
227 /// fire-and-forget submissions.
228 pub(crate) fn accept(&self) {
229 if let PoolJobInner::Completable(task) = &self.inner {
230 task.accept();
231 }
232 }
233
234 /// Runs this job if it has not been cancelled first.
235 ///
236 /// Consumes the job and invokes the run callback at most once.
237 pub(crate) fn run(self) {
238 match self.inner {
239 PoolJobInner::Detached { run } => run(),
240 PoolJobInner::Completable(task) => task.run(),
241 }
242 }
243
244 /// Cancels this queued job if it has not been run first.
245 ///
246 /// Consumes the job and invokes the cancellation callback at most once.
247 pub(crate) fn cancel(self) {
248 if let PoolJobInner::Completable(task) = self.inner {
249 task.cancel();
250 }
251 }
252}