Skip to main content

qubit_thread_pool/
pool_job.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    panic::{
12        AssertUnwindSafe,
13        catch_unwind,
14    },
15    sync::Mutex,
16};
17
18use qubit_executor::task::spi::{
19    TaskRunner,
20    TaskSlot,
21};
22use qubit_function::{
23    Callable,
24    Runnable,
25};
26
/// Type-erased unit of work that a pool queue owns until it is run or cancelled.
///
/// Implementors are stored as `Box<dyn PoolTask>`, which is why the consuming
/// methods take `self: Box<Self>`.
trait PoolTask: Send + 'static {
    /// Signals that an executor service has accepted this task.
    ///
    /// # Returns
    ///
    /// `true` if the acceptance step finished normally, or `false` if a custom
    /// acceptance callback panicked and the panic was contained.
    fn accept(&self) -> bool;

    /// Consumes the task, executes it, and publishes its result unless the
    /// task was cancelled first.
    fn run(self: Box<Self>);

    /// Consumes the task and cancels it before it has started.
    fn cancel(self: Box<Self>);
}
43
44/// Callable task paired with its runner-side completion endpoint.
45struct CompletablePoolTask<C, R, E> {
46    /// Callable task to execute once a worker starts this job.
47    task: C,
48    /// Completion endpoint used to publish the task result.
49    completion: TaskSlot<R, E>,
50}
51
52impl<C, R, E> PoolTask for CompletablePoolTask<C, R, E>
53where
54    C: Callable<R, E> + Send + 'static,
55    R: Send + 'static,
56    E: Send + 'static,
57{
58    /// Marks this task as accepted by an executor service.
59    fn accept(&self) -> bool {
60        self.completion.accept();
61        true
62    }
63
64    /// Runs this task and publishes its result if it was not cancelled first.
65    fn run(self: Box<Self>) {
66        let Self { task, completion } = *self;
67        TaskRunner::new(task).run(completion);
68    }
69
70    /// Publishes cancellation for an unstarted accepted task.
71    fn cancel(self: Box<Self>) {
72        let Self { completion, .. } = *self;
73        let _cancelled = completion.cancel_unstarted();
74    }
75}
76
/// Lifecycle callbacks that a higher-level service attaches to a job.
///
/// Each callback runs synchronously on whichever pool path reaches the matching
/// lifecycle event, so callbacks should be short and must not block. A panic
/// raised by a callback is contained so that it cannot escape into pool worker
/// or shutdown accounting.
struct CustomPoolTask {
    /// Acceptance callback; the `Option` is emptied once it has been taken.
    accept: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
    /// Callback a worker executes when it starts this job.
    run: Box<dyn FnOnce() + Send + 'static>,
    /// Callback executed when the job is cancelled before it starts.
    cancel: Box<dyn FnOnce() + Send + 'static>,
}
91
92impl PoolTask for CustomPoolTask {
93    /// Runs the acceptance callback once.
94    fn accept(&self) -> bool {
95        if let Some(accept) = self
96            .accept
97            .lock()
98            .unwrap_or_else(std::sync::PoisonError::into_inner)
99            .take()
100        {
101            return catch_unwind(AssertUnwindSafe(accept)).is_ok();
102        }
103        true
104    }
105
106    /// Runs this custom job.
107    fn run(self: Box<Self>) {
108        let Self { run, .. } = *self;
109        let _ignored = catch_unwind(AssertUnwindSafe(run));
110    }
111
112    /// Cancels this custom job before it starts.
113    fn cancel(self: Box<Self>) {
114        let Self { cancel, .. } = *self;
115        let _ignored = catch_unwind(AssertUnwindSafe(cancel));
116    }
117}
118
119/// Type-erased pool job with separate detached and cancellable forms.
120pub struct PoolJob {
121    /// Internal job representation hidden behind method-only access.
122    inner: PoolJobInner,
123}
124
125/// Private type-erased pool job representation.
126enum PoolJobInner {
127    /// Fire-and-forget job submitted without a completion endpoint.
128    Detached {
129        /// Callback executed once a worker starts the job.
130        run: Box<dyn FnOnce() + Send + 'static>,
131    },
132    /// Job whose queued cancellation must complete a result endpoint.
133    Completable(Box<dyn PoolTask>),
134}
135
136impl PoolJob {
137    /// Creates a custom cancellable job with no acceptance callback.
138    ///
139    /// Higher-level services that maintain their own task state usually want
140    /// [`Self::with_accept`] instead, so they can publish acceptance only after
141    /// the backing pool has accepted the job.
142    /// Custom callbacks run synchronously and should not block. Panics raised
143    /// by `run` or `cancel` are caught and ignored by the pool job wrapper.
144    ///
145    /// # Parameters
146    ///
147    /// * `run` - Callback executed when a worker starts this job.
148    /// * `cancel` - Callback executed if the accepted job is cancelled before
149    ///   it starts.
150    ///
151    /// # Returns
152    ///
153    /// A custom type-erased job accepted by thread pools.
154    pub fn new(
155        run: Box<dyn FnOnce() + Send + 'static>,
156        cancel: Box<dyn FnOnce() + Send + 'static>,
157    ) -> Self {
158        Self::with_accept(Box::new(|| {}), run, cancel)
159    }
160
161    /// Creates a custom cancellable job with an acceptance callback.
162    ///
163    /// The pool invokes `accept` exactly once after the submission crosses the
164    /// acceptance boundary. If submission is rejected before acceptance, neither
165    /// `accept`, `run`, nor `cancel` is invoked. Custom callbacks run
166    /// synchronously and should not block. Panics raised by these callbacks are
167    /// caught and ignored by the pool job wrapper; an `accept` panic is reported
168    /// to the pool as a failed acceptance callback.
169    ///
170    /// # Parameters
171    ///
172    /// * `accept` - Callback invoked once the pool accepts the job.
173    /// * `run` - Callback executed when a worker starts this job.
174    /// * `cancel` - Callback executed if the accepted job is cancelled before
175    ///   it starts.
176    ///
177    /// # Returns
178    ///
179    /// A custom type-erased job accepted by thread pools.
180    pub fn with_accept(
181        accept: Box<dyn FnOnce() + Send + 'static>,
182        run: Box<dyn FnOnce() + Send + 'static>,
183        cancel: Box<dyn FnOnce() + Send + 'static>,
184    ) -> Self {
185        Self {
186            inner: PoolJobInner::Completable(Box::new(CustomPoolTask {
187                accept: Mutex::new(Some(accept)),
188                run,
189                cancel,
190            })),
191        }
192    }
193
194    /// Creates a pool job from a typed callable task and completion endpoint.
195    ///
196    /// # Parameters
197    ///
198    /// * `task` - Callable task to execute when a worker starts this job.
199    /// * `completion` - Completion endpoint used to publish the typed result or
200    ///   cancellation.
201    ///
202    /// # Returns
203    ///
204    /// A type-erased job that runs the task on worker start and cancels the
205    /// completion endpoint if the job is cancelled while queued.
206    pub(crate) fn from_task<C, R, E>(task: C, completion: TaskSlot<R, E>) -> Self
207    where
208        C: Callable<R, E> + Send + 'static,
209        R: Send + 'static,
210        E: Send + 'static,
211    {
212        Self {
213            inner: PoolJobInner::Completable(Box::new(CompletablePoolTask { task, completion })),
214        }
215    }
216
217    /// Creates a pool job from a runnable task without retaining a result handle.
218    ///
219    /// # Parameters
220    ///
221    /// * `task` - Runnable task to execute when a worker starts this job.
222    ///
223    /// # Returns
224    ///
225    /// A type-erased job that runs the task and discards its final result. If
226    /// the job is abandoned while queued, cancellation has no result endpoint to
227    /// notify.
228    pub(crate) fn detached<T, E>(task: T) -> Self
229    where
230        T: Runnable<E> + Send + 'static,
231        E: Send + 'static,
232    {
233        Self {
234            inner: PoolJobInner::Detached {
235                run: Box::new(move || {
236                    let mut task = task;
237                    let _ignored = catch_unwind(AssertUnwindSafe(|| task.run()));
238                }),
239            },
240        }
241    }
242
243    /// Marks this job as accepted by an executor service.
244    ///
245    /// Detached jobs do not have a completion endpoint, so this is a no-op for
246    /// fire-and-forget submissions.
247    ///
248    /// # Returns
249    ///
250    /// `true` when the job can continue to execution or queueing, or `false`
251    /// when a custom acceptance callback panicked and was contained.
252    pub(crate) fn accept(&self) -> bool {
253        if let PoolJobInner::Completable(task) = &self.inner {
254            return task.accept();
255        }
256        true
257    }
258
259    /// Runs this job if it has not been cancelled first.
260    ///
261    /// Consumes the job and invokes the run callback at most once.
262    pub(crate) fn run(self) {
263        match self.inner {
264            PoolJobInner::Detached { run } => run(),
265            PoolJobInner::Completable(task) => task.run(),
266        }
267    }
268
269    /// Cancels this queued job if it has not been run first.
270    ///
271    /// Consumes the job and invokes the cancellation callback at most once.
272    pub(crate) fn cancel(self) {
273        if let PoolJobInner::Completable(task) = self.inner {
274            task.cancel();
275        }
276    }
277}