qubit_thread_pool/thread_pool/pool_job.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9use qubit_executor::{
10 TaskCompletion,
11 TaskRunner,
12};
13use qubit_function::Callable;
14
/// Type-erased pool job with a cancellation path for queued work.
///
/// `PoolJob` is a low-level extension point for building custom services on
/// top of [`super::thread_pool::ThreadPool`]. The pool calls the run callback after a worker takes
/// the job, or the cancel callback if the job is still queued during immediate
/// shutdown.
///
/// Both callbacks are stored as boxed `FnOnce` closures, so each of them can
/// be invoked at most once and the job can be moved across threads.
pub struct PoolJob {
    /// Callback executed once a worker starts the job.
    run: Box<dyn FnOnce() + Send + 'static>,
    /// Callback executed if the job is cancelled before a worker starts it.
    cancel: Box<dyn FnOnce() + Send + 'static>,
}

impl std::fmt::Debug for PoolJob {
    /// Formats the job as an opaque `PoolJob { .. }`.
    ///
    /// The stored callbacks are type-erased closures that cannot be inspected,
    /// so `Debug` cannot be derived; only the type name is printed. Providing
    /// this implementation lets types that hold a `PoolJob` derive `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PoolJob").finish_non_exhaustive()
    }
}
27
28impl PoolJob {
29 /// Creates a pool job from run and cancel callbacks.
30 ///
31 /// # Parameters
32 ///
33 /// * `run` - Callback executed once a worker starts this job.
34 /// * `cancel` - Callback executed if this job is cancelled while queued.
35 ///
36 /// # Returns
37 ///
38 /// A type-erased job accepted by [`super::thread_pool::ThreadPool::submit_job`].
39 pub fn new(
40 run: Box<dyn FnOnce() + Send + 'static>,
41 cancel: Box<dyn FnOnce() + Send + 'static>,
42 ) -> Self {
43 Self { run, cancel }
44 }
45
46 /// Creates a pool job from a typed callable task and completion endpoint.
47 ///
48 /// # Parameters
49 ///
50 /// * `task` - Callable task to execute when a worker starts this job.
51 /// * `completion` - Completion endpoint used to publish the typed result or
52 /// cancellation.
53 ///
54 /// # Returns
55 ///
56 /// A type-erased job that runs the task on worker start and cancels the
57 /// completion endpoint if the job is abandoned while queued.
58 pub fn from_task<C, R, E>(task: C, completion: TaskCompletion<R, E>) -> Self
59 where
60 C: Callable<R, E> + Send + 'static,
61 R: Send + 'static,
62 E: Send + 'static,
63 {
64 let cancel_completion = completion.clone();
65 Self::new(
66 Box::new(move || {
67 TaskRunner::new(task).run(completion);
68 }),
69 Box::new(move || {
70 cancel_completion.cancel();
71 }),
72 )
73 }
74
75 /// Runs this job if it has not been cancelled first.
76 ///
77 /// Consumes the job and invokes the run callback at most once.
78 pub(crate) fn run(self) {
79 (self.run)();
80 }
81
82 /// Cancels this queued job if it has not been run first.
83 ///
84 /// Consumes the job and invokes the cancellation callback at most once.
85 pub(crate) fn cancel(self) {
86 (self.cancel)();
87 }
88}