Skip to main content

qubit_thread_pool/
pool_job.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    panic::{
12        AssertUnwindSafe,
13        catch_unwind,
14    },
15    sync::Mutex,
16};
17
18use qubit_executor::task::spi::{
19    TaskRunner,
20    TaskSlot,
21};
22use qubit_function::{
23    Callable,
24    Runnable,
25};
26
27/// Type-erased callable owned by a pool queue.
28trait PoolTask: Send + 'static {
29    /// Marks this task as accepted by an executor service.
30    fn accept(&self);
31
32    /// Runs this task and publishes its result if it was not cancelled first.
33    fn run(self: Box<Self>);
34
35    /// Cancels this task before it starts.
36    fn cancel(self: Box<Self>);
37}
38
39/// Callable task paired with its runner-side completion endpoint.
40struct CompletablePoolTask<C, R, E> {
41    /// Callable task to execute once a worker starts this job.
42    task: C,
43    /// Completion endpoint used to publish the task result.
44    completion: TaskSlot<R, E>,
45}
46
47impl<C, R, E> PoolTask for CompletablePoolTask<C, R, E>
48where
49    C: Callable<R, E> + Send + 'static,
50    R: Send + 'static,
51    E: Send + 'static,
52{
53    /// Marks this task as accepted by an executor service.
54    fn accept(&self) {
55        self.completion.accept();
56    }
57
58    /// Runs this task and publishes its result if it was not cancelled first.
59    fn run(self: Box<Self>) {
60        let Self { task, completion } = *self;
61        TaskRunner::new(task).run(completion);
62    }
63
64    /// Publishes cancellation for an unstarted accepted task.
65    fn cancel(self: Box<Self>) {
66        let Self { completion, .. } = *self;
67        let _cancelled = completion.cancel_unstarted();
68    }
69}
70
71/// Custom job callbacks supplied by higher-level services.
72struct CustomPoolTask {
73    /// Callback invoked once the pool accepts the job.
74    accept: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
75    /// Callback executed once a worker starts this job.
76    run: Box<dyn FnOnce() + Send + 'static>,
77    /// Callback executed if the job is cancelled before it starts.
78    cancel: Box<dyn FnOnce() + Send + 'static>,
79}
80
81impl PoolTask for CustomPoolTask {
82    /// Runs the acceptance callback once.
83    fn accept(&self) {
84        if let Some(accept) = self
85            .accept
86            .lock()
87            .expect("custom pool job accept lock should not be poisoned")
88            .take()
89        {
90            accept();
91        }
92    }
93
94    /// Runs this custom job.
95    fn run(self: Box<Self>) {
96        (self.run)();
97    }
98
99    /// Cancels this custom job before it starts.
100    fn cancel(self: Box<Self>) {
101        (self.cancel)();
102    }
103}
104
105/// Type-erased pool job with separate detached and cancellable forms.
106pub struct PoolJob {
107    /// Internal job representation hidden behind method-only access.
108    inner: PoolJobInner,
109}
110
111/// Private type-erased pool job representation.
112enum PoolJobInner {
113    /// Fire-and-forget job submitted without a completion endpoint.
114    Detached {
115        /// Callback executed once a worker starts the job.
116        run: Box<dyn FnOnce() + Send + 'static>,
117    },
118    /// Job whose queued cancellation must complete a result endpoint.
119    Completable(Box<dyn PoolTask>),
120}
121
122impl PoolJob {
123    /// Creates a custom cancellable job with no acceptance callback.
124    ///
125    /// Higher-level services that maintain their own task state usually want
126    /// [`Self::with_accept`] instead, so they can publish acceptance only after
127    /// the backing pool has accepted the job.
128    ///
129    /// # Parameters
130    ///
131    /// * `run` - Callback executed when a worker starts this job.
132    /// * `cancel` - Callback executed if the accepted job is cancelled before
133    ///   it starts.
134    ///
135    /// # Returns
136    ///
137    /// A custom type-erased job accepted by thread pools.
138    pub fn new(
139        run: Box<dyn FnOnce() + Send + 'static>,
140        cancel: Box<dyn FnOnce() + Send + 'static>,
141    ) -> Self {
142        Self::with_accept(Box::new(|| {}), run, cancel)
143    }
144
145    /// Creates a custom cancellable job with an acceptance callback.
146    ///
147    /// The pool invokes `accept` exactly once after the submission crosses the
148    /// acceptance boundary. If submission is rejected before acceptance, neither
149    /// `accept`, `run`, nor `cancel` is invoked.
150    ///
151    /// # Parameters
152    ///
153    /// * `accept` - Callback invoked once the pool accepts the job.
154    /// * `run` - Callback executed when a worker starts this job.
155    /// * `cancel` - Callback executed if the accepted job is cancelled before
156    ///   it starts.
157    ///
158    /// # Returns
159    ///
160    /// A custom type-erased job accepted by thread pools.
161    pub fn with_accept(
162        accept: Box<dyn FnOnce() + Send + 'static>,
163        run: Box<dyn FnOnce() + Send + 'static>,
164        cancel: Box<dyn FnOnce() + Send + 'static>,
165    ) -> Self {
166        Self {
167            inner: PoolJobInner::Completable(Box::new(CustomPoolTask {
168                accept: Mutex::new(Some(accept)),
169                run,
170                cancel,
171            })),
172        }
173    }
174
175    /// Creates a pool job from a typed callable task and completion endpoint.
176    ///
177    /// # Parameters
178    ///
179    /// * `task` - Callable task to execute when a worker starts this job.
180    /// * `completion` - Completion endpoint used to publish the typed result or
181    ///   cancellation.
182    ///
183    /// # Returns
184    ///
185    /// A type-erased job that runs the task on worker start and cancels the
186    /// completion endpoint if the job is cancelled while queued.
187    pub(crate) fn from_task<C, R, E>(task: C, completion: TaskSlot<R, E>) -> Self
188    where
189        C: Callable<R, E> + Send + 'static,
190        R: Send + 'static,
191        E: Send + 'static,
192    {
193        Self {
194            inner: PoolJobInner::Completable(Box::new(CompletablePoolTask { task, completion })),
195        }
196    }
197
198    /// Creates a pool job from a runnable task without retaining a result handle.
199    ///
200    /// # Parameters
201    ///
202    /// * `task` - Runnable task to execute when a worker starts this job.
203    ///
204    /// # Returns
205    ///
206    /// A type-erased job that runs the task and discards its final result. If
207    /// the job is abandoned while queued, cancellation has no result endpoint to
208    /// notify.
209    pub(crate) fn detached<T, E>(task: T) -> Self
210    where
211        T: Runnable<E> + Send + 'static,
212        E: Send + 'static,
213    {
214        Self {
215            inner: PoolJobInner::Detached {
216                run: Box::new(move || {
217                    let mut task = task;
218                    let _ignored = catch_unwind(AssertUnwindSafe(|| task.run()));
219                }),
220            },
221        }
222    }
223
224    /// Marks this job as accepted by an executor service.
225    ///
226    /// Detached jobs do not have a completion endpoint, so this is a no-op for
227    /// fire-and-forget submissions.
228    pub(crate) fn accept(&self) {
229        if let PoolJobInner::Completable(task) = &self.inner {
230            task.accept();
231        }
232    }
233
234    /// Runs this job if it has not been cancelled first.
235    ///
236    /// Consumes the job and invokes the run callback at most once.
237    pub(crate) fn run(self) {
238        match self.inner {
239            PoolJobInner::Detached { run } => run(),
240            PoolJobInner::Completable(task) => task.run(),
241        }
242    }
243
244    /// Cancels this queued job if it has not been run first.
245    ///
246    /// Consumes the job and invokes the cancellation callback at most once.
247    pub(crate) fn cancel(self) {
248        if let PoolJobInner::Completable(task) = self.inner {
249            task.cancel();
250        }
251    }
252}