Skip to main content

qubit_thread_pool/
thread_pool_hooks.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    fmt,
12    panic::{
13        AssertUnwindSafe,
14        catch_unwind,
15    },
16    sync::Arc,
17};
18
19/// Shared callback type used by worker and task hooks.
20type HookCallback = Arc<dyn Fn(usize) + Send + Sync + 'static>;
21
22/// Worker and task lifecycle hooks shared by thread-pool implementations.
23///
24/// Hooks are intentionally observational. They run on worker threads and
25/// receive the stable worker index that triggered the event. Panics raised by a
26/// hook are caught and ignored so instrumentation cannot kill a worker thread
27/// or corrupt executor accounting.
28#[derive(Clone, Default)]
29pub struct ThreadPoolHooks {
30    /// Callback invoked when a worker thread starts.
31    before_worker_start: Option<HookCallback>,
32    /// Callback invoked before a worker thread exits.
33    after_worker_stop: Option<HookCallback>,
34    /// Callback invoked immediately before a worker runs a job.
35    before_task: Option<HookCallback>,
36    /// Callback invoked immediately after a worker runs a job.
37    after_task: Option<HookCallback>,
38}
39
40impl ThreadPoolHooks {
41    /// Creates an empty hook set.
42    ///
43    /// # Returns
44    ///
45    /// A hook set with no callbacks installed.
46    #[inline]
47    pub fn new() -> Self {
48        Self::default()
49    }
50
51    /// Installs a callback invoked when a worker thread starts.
52    ///
53    /// # Parameters
54    ///
55    /// * `hook` - Callback receiving the stable worker index.
56    ///
57    /// # Returns
58    ///
59    /// This hook set for fluent configuration.
60    #[inline]
61    pub fn before_worker_start<F>(mut self, hook: F) -> Self
62    where
63        F: Fn(usize) + Send + Sync + 'static,
64    {
65        self.before_worker_start = Some(Arc::new(hook));
66        self
67    }
68
69    /// Installs a callback invoked before a worker thread exits.
70    ///
71    /// # Parameters
72    ///
73    /// * `hook` - Callback receiving the stable worker index.
74    ///
75    /// # Returns
76    ///
77    /// This hook set for fluent configuration.
78    #[inline]
79    pub fn after_worker_stop<F>(mut self, hook: F) -> Self
80    where
81        F: Fn(usize) + Send + Sync + 'static,
82    {
83        self.after_worker_stop = Some(Arc::new(hook));
84        self
85    }
86
87    /// Installs a callback invoked before each job is run.
88    ///
89    /// # Parameters
90    ///
91    /// * `hook` - Callback receiving the stable worker index.
92    ///
93    /// # Returns
94    ///
95    /// This hook set for fluent configuration.
96    #[inline]
97    pub fn before_task<F>(mut self, hook: F) -> Self
98    where
99        F: Fn(usize) + Send + Sync + 'static,
100    {
101        self.before_task = Some(Arc::new(hook));
102        self
103    }
104
105    /// Installs a callback invoked after each job is run.
106    ///
107    /// # Parameters
108    ///
109    /// * `hook` - Callback receiving the stable worker index.
110    ///
111    /// # Returns
112    ///
113    /// This hook set for fluent configuration.
114    #[inline]
115    pub fn after_task<F>(mut self, hook: F) -> Self
116    where
117        F: Fn(usize) + Send + Sync + 'static,
118    {
119        self.after_task = Some(Arc::new(hook));
120        self
121    }
122
123    /// Runs the worker-start hook if one is installed.
124    ///
125    /// # Parameters
126    ///
127    /// * `worker_index` - Stable index of the worker that started.
128    #[inline]
129    pub(crate) fn run_before_worker_start(&self, worker_index: usize) {
130        Self::run_hook(&self.before_worker_start, worker_index);
131    }
132
133    /// Runs the worker-stop hook if one is installed.
134    ///
135    /// # Parameters
136    ///
137    /// * `worker_index` - Stable index of the worker that is stopping.
138    #[inline]
139    pub(crate) fn run_after_worker_stop(&self, worker_index: usize) {
140        Self::run_hook(&self.after_worker_stop, worker_index);
141    }
142
143    /// Runs the before-task hook if one is installed.
144    ///
145    /// # Parameters
146    ///
147    /// * `worker_index` - Stable index of the worker that claimed a job.
148    #[inline]
149    pub(crate) fn run_before_task(&self, worker_index: usize) {
150        Self::run_hook(&self.before_task, worker_index);
151    }
152
153    /// Runs the after-task hook if one is installed.
154    ///
155    /// # Parameters
156    ///
157    /// * `worker_index` - Stable index of the worker that completed a job.
158    #[inline]
159    pub(crate) fn run_after_task(&self, worker_index: usize) {
160        Self::run_hook(&self.after_task, worker_index);
161    }
162
163    /// Returns whether any per-task hook is configured.
164    ///
165    /// # Returns
166    ///
167    /// `true` when worker loops must invoke task hooks around each job.
168    #[inline]
169    pub(crate) fn has_task_hooks(&self) -> bool {
170        self.before_task.is_some() || self.after_task.is_some()
171    }
172
173    /// Runs one hook callback while isolating hook panics.
174    ///
175    /// # Parameters
176    ///
177    /// * `hook` - Optional callback to invoke.
178    /// * `worker_index` - Stable worker index passed to the callback.
179    #[inline]
180    fn run_hook(hook: &Option<HookCallback>, worker_index: usize) {
181        if let Some(hook) = hook {
182            let _ = catch_unwind(AssertUnwindSafe(|| hook(worker_index)));
183        }
184    }
185}
186
187impl fmt::Debug for ThreadPoolHooks {
188    /// Formats hook presence without exposing callback internals.
189    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
190        formatter
191            .debug_struct("ThreadPoolHooks")
192            .field("before_worker_start", &self.before_worker_start.is_some())
193            .field("after_worker_stop", &self.after_worker_stop.is_some())
194            .field("before_task", &self.before_task.is_some())
195            .field("after_task", &self.after_task.is_some())
196            .finish()
197    }
198}