qubit_thread_pool/thread_pool/pool_job.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use qubit_executor::{
11 TaskCompletion,
12 TaskRunner,
13};
14use qubit_function::Callable;
15
16/// Type-erased pool job with a cancellation path for queued work.
17///
18/// `PoolJob` is a low-level extension point for building custom services on
19/// top of [`super::thread_pool::ThreadPool`]. The pool calls the run callback after a worker takes
20/// the job, or the cancel callback if the job is still queued during immediate
21/// shutdown.
22pub struct PoolJob {
23 /// Callback executed once a worker starts the job.
24 run: Box<dyn FnOnce() + Send + 'static>,
25 /// Callback executed if the job is cancelled before a worker starts it.
26 cancel: Box<dyn FnOnce() + Send + 'static>,
27}
28
29impl PoolJob {
30 /// Creates a pool job from run and cancel callbacks.
31 ///
32 /// # Parameters
33 ///
34 /// * `run` - Callback executed once a worker starts this job.
35 /// * `cancel` - Callback executed if this job is cancelled while queued.
36 ///
37 /// # Returns
38 ///
39 /// A type-erased job accepted by [`super::thread_pool::ThreadPool::submit_job`].
40 pub fn new(
41 run: Box<dyn FnOnce() + Send + 'static>,
42 cancel: Box<dyn FnOnce() + Send + 'static>,
43 ) -> Self {
44 Self { run, cancel }
45 }
46
47 /// Creates a pool job from a typed callable task and completion endpoint.
48 ///
49 /// # Parameters
50 ///
51 /// * `task` - Callable task to execute when a worker starts this job.
52 /// * `completion` - Completion endpoint used to publish the typed result or
53 /// cancellation.
54 ///
55 /// # Returns
56 ///
57 /// A type-erased job that runs the task on worker start and cancels the
58 /// completion endpoint if the job is abandoned while queued.
59 pub fn from_task<C, R, E>(task: C, completion: TaskCompletion<R, E>) -> Self
60 where
61 C: Callable<R, E> + Send + 'static,
62 R: Send + 'static,
63 E: Send + 'static,
64 {
65 let cancel_completion = completion.clone();
66 Self::new(
67 Box::new(move || {
68 TaskRunner::new(task).run(completion);
69 }),
70 Box::new(move || {
71 cancel_completion.cancel();
72 }),
73 )
74 }
75
76 /// Runs this job if it has not been cancelled first.
77 ///
78 /// Consumes the job and invokes the run callback at most once.
79 pub(crate) fn run(self) {
80 (self.run)();
81 }
82
83 /// Cancels this queued job if it has not been run first.
84 ///
85 /// Consumes the job and invokes the cancellation callback at most once.
86 pub(crate) fn cancel(self) {
87 (self.cancel)();
88 }
89}