Skip to main content

zerodds_corba_rt/
threadpool.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Runnable threadpool with priority **lanes** (RT-CORBA §5.7) — the
5//! runtime realization of the structural model from [`crate::policy`].
6//!
7//! A [`ThreadpoolRuntime`] maps each [`Lane`](crate::policy::Lane) onto real
8//! OS threads: per lane, `static_threads` workers are created at startup and up
9//! to `dynamic_threads` more are added dynamically under saturation (with
10//! idle-timeout teardown). [`dispatch`](ThreadpoolRuntime::dispatch) routes a
11//! job via [`Threadpool::lane_for`](crate::policy::Threadpool::lane_for) into the
12//! lane of its priority. All workers wait **event-driven** on a condvar
13//! (no busy-poll).
14//!
15//! The **OS scheduler priority** (e.g. `pthread_setschedparam`/`SCHED_FIFO`)
16//! is set via the injectable [`NativePrioritySetter`] hook — the
17//! platform-specific, possibly `unsafe` code needed there deliberately lives in
18//! the caller, so that this crate stays `forbid(unsafe_code)`.
19
20#[cfg(feature = "std")]
21pub use std_impl::{DispatchError, NativePrioritySetter, ThreadpoolRuntime};
22
23#[cfg(feature = "std")]
24#[allow(clippy::expect_used)]
25mod std_impl {
26    use alloc::boxed::Box;
27    use alloc::collections::VecDeque;
28    use alloc::sync::Arc;
29    use alloc::vec::Vec;
30    use core::fmt;
31    use core::time::Duration;
32    use std::sync::{Condvar, Mutex};
33    use std::thread::JoinHandle;
34
35    use crate::policy::Threadpool;
36    use crate::priority::{Priority, PriorityMapping};
37
    /// Idle time after which a *dynamic* worker tears itself down.
    ///
    /// Used as the bound of the condvar `wait_timeout` in the worker loop: a
    /// dynamic worker whose wait timed out, and that then finds the queue empty
    /// with no shutdown requested, exits.
    const DYNAMIC_IDLE_TIMEOUT: Duration = Duration::from_millis(100);

    /// A queued unit of work: a boxed one-shot closure that may be moved to and
    /// executed on a worker thread.
    type Job = Box<dyn FnOnce() + Send + 'static>;
42
    /// Hook for setting the native OS scheduler priority of a worker thread.
    ///
    /// Called once per worker at thread start, with the lane's native priority
    /// computed via the [`PriorityMapping`]. The concrete
    /// (platform-specific, often `unsafe`) implementation — e.g.
    /// `pthread_setschedparam` with `SCHED_FIFO` — is provided by the caller.
    ///
    /// The hook is shared between all workers behind an `Arc` and is invoked
    /// *on* the worker thread itself (before it takes its first job), hence the
    /// `Send + Sync` bound.
    pub trait NativePrioritySetter: Send + Sync {
        /// Sets the OS priority of the current thread.
        ///
        /// The method returns nothing, so an implementation has no way to
        /// report a failure back to the pool; it must deal with errors itself.
        fn set_current_thread_priority(&self, native_priority: i32);
    }
53
54    /// Error from [`dispatch`](ThreadpoolRuntime::dispatch).
55    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
56    pub enum DispatchError {
57        /// The pool has no lane (empty threadpool).
58        NoLane,
59        /// No worker free and buffering off/buffer full — request rejected.
60        Rejected,
61    }
62
63    impl fmt::Display for DispatchError {
64        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65            match self {
66                Self::NoLane => f.write_str("threadpool has no lane"),
67                Self::Rejected => f.write_str("request rejected (no worker, buffering off/full)"),
68            }
69        }
70    }
71
72    impl std::error::Error for DispatchError {}
73
    /// Mutable bookkeeping of one lane. Lives inside the mutex of `Lane::sync`
    /// and is only read or written while that mutex is held.
    struct LaneState {
        /// Accepted jobs that no worker has taken yet; filled at the back by
        /// `dispatch`, drained from the front by the workers (FIFO).
        queue: VecDeque<Job>,
        /// Currently alive workers (static + dynamic).
        workers: u32,
        /// Workers currently executing a job.
        busy: u32,
        /// Alive workers that were created dynamically. This is a subset of
        /// `workers` (not of `busy`) and is compared against the lane's
        /// `dynamic_threads` growth budget.
        dynamic_alive: u32,
        /// Set when the runtime is dropped; a worker exits once it sees this
        /// flag with an empty queue.
        shutdown: bool,
    }

    /// Runtime side of one priority lane: its routing/priority configuration,
    /// the state shared with its workers, and the workers' join handles.
    struct Lane {
        /// Lane priority, used to route dispatched jobs to this lane.
        priority: Priority,
        /// Native priority handed to the priority hook of each worker; also
        /// part of the worker thread name.
        native_priority: i32,
        /// Upper bound on simultaneously alive dynamic workers.
        dynamic_threads: u32,
        /// Lane state plus the condvar the workers wait on for new jobs.
        sync: Arc<(Mutex<LaneState>, Condvar)>,
        /// Join handles of workers spawned for this lane; the handles stored
        /// here are joined when the runtime is dropped.
        handles: Mutex<Vec<JoinHandle<()>>>,
    }
92
93    /// A running threadpool with priority lanes (RT-CORBA §5.7).
94    pub struct ThreadpoolRuntime {
95        lanes: Vec<Lane>,
96        allow_buffering: bool,
97        max_buffered: u32,
98        hook: Option<Arc<dyn NativePrioritySetter>>,
99    }
100
101    impl ThreadpoolRuntime {
102        /// Starts a threadpool: creates `static_threads` workers per lane.
103        ///
104        /// `mapping` maps the lane priority to the native priority that is
105        /// passed to the `hook` (if set) per worker.
106        #[must_use]
107        pub fn start<M: PriorityMapping>(
108            pool: &Threadpool,
109            mapping: &M,
110            hook: Option<Arc<dyn NativePrioritySetter>>,
111        ) -> Self {
112            let mut lanes = Vec::with_capacity(pool.lanes.len());
113            for lane_cfg in &pool.lanes {
114                let native_priority = mapping.to_native(lane_cfg.priority).unwrap_or(0);
115                let lane = Lane {
116                    priority: lane_cfg.priority,
117                    native_priority,
118                    dynamic_threads: lane_cfg.dynamic_threads,
119                    sync: Arc::new((
120                        Mutex::new(LaneState {
121                            queue: VecDeque::new(),
122                            workers: lane_cfg.static_threads,
123                            busy: 0,
124                            dynamic_alive: 0,
125                            shutdown: false,
126                        }),
127                        Condvar::new(),
128                    )),
129                    handles: Mutex::new(Vec::new()),
130                };
131                let stacksize = pool.stacksize;
132                let mut handles = lane.handles.lock().expect("lane handles poisoned");
133                for _ in 0..lane_cfg.static_threads {
134                    handles.push(spawn_worker(
135                        Arc::clone(&lane.sync),
136                        hook.clone(),
137                        native_priority,
138                        stacksize,
139                        false,
140                    ));
141                }
142                drop(handles);
143                lanes.push(lane);
144            }
145            Self {
146                lanes,
147                allow_buffering: pool.allow_request_buffering,
148                max_buffered: pool.max_buffered_requests,
149                hook,
150            }
151        }
152
153        /// Selects the lane index for a priority — same rule as
154        /// [`Threadpool::lane_for`](crate::policy::Threadpool::lane_for):
155        /// highest lane priority ≤ `priority`, otherwise the lowest lane.
156        fn select_lane(&self, priority: Priority) -> Option<usize> {
157            let covering = self
158                .lanes
159                .iter()
160                .enumerate()
161                .filter(|(_, l)| l.priority <= priority)
162                .max_by_key(|(_, l)| l.priority)
163                .map(|(i, _)| i);
164            covering.or_else(|| {
165                self.lanes
166                    .iter()
167                    .enumerate()
168                    .min_by_key(|(_, l)| l.priority)
169                    .map(|(i, _)| i)
170            })
171        }
172
173        /// Hands a job to the lane of its priority. Wakes a waiting worker or
174        /// creates — under saturation with free dynamic budget — a dynamic
175        /// worker.
176        ///
177        /// # Errors
178        /// [`DispatchError::NoLane`] on an empty pool; [`DispatchError::Rejected`]
179        /// if no worker is free and buffering is off or the buffer is full.
180        #[allow(clippy::missing_panics_doc)]
181        pub fn dispatch<F>(&self, priority: Priority, job: F) -> Result<(), DispatchError>
182        where
183            F: FnOnce() + Send + 'static,
184        {
185            let idx = self.select_lane(priority).ok_or(DispatchError::NoLane)?;
186            let lane = &self.lanes[idx];
187            let (lock, cv) = &*lane.sync;
188
189            let need_spawn;
190            {
191                let mut st = lock.lock().expect("lane state poisoned");
192                // Free capacity = alive workers minus already-scheduled work
193                // (running + in the queue). A worker that is just starting up
194                // counts as free; a waiting job occupies a worker.
195                let pending = st.busy + st.queue.len() as u32;
196                let free = st.workers.saturating_sub(pending);
197                let can_grow = st.dynamic_alive < lane.dynamic_threads;
198                if !self.allow_buffering && free == 0 && !can_grow {
199                    return Err(DispatchError::Rejected);
200                }
201                if self.max_buffered > 0 && st.queue.len() as u32 >= self.max_buffered {
202                    return Err(DispatchError::Rejected);
203                }
204                st.queue.push_back(Box::new(job));
205                need_spawn = free == 0 && can_grow;
206                if need_spawn {
207                    st.workers += 1;
208                    st.dynamic_alive += 1;
209                }
210            }
211
212            if need_spawn {
213                let handle = spawn_worker(
214                    Arc::clone(&lane.sync),
215                    self.hook.clone(),
216                    lane.native_priority,
217                    0,
218                    true,
219                );
220                lane.handles
221                    .lock()
222                    .expect("lane handles poisoned")
223                    .push(handle);
224            } else {
225                cv.notify_one();
226            }
227            Ok(())
228        }
229
230        /// Number of lanes.
231        #[must_use]
232        pub fn lane_count(&self) -> usize {
233            self.lanes.len()
234        }
235
236        /// Number of currently alive workers of a lane (static + dynamic),
237        /// derived from the stored handles. Primarily for tests/telemetry.
238        ///
239        /// # Panics
240        /// On a poisoned internal lock.
241        #[must_use]
242        pub fn spawned_workers(&self, lane_index: usize) -> usize {
243            self.lanes
244                .get(lane_index)
245                .map(|l| l.handles.lock().expect("lane handles poisoned").len())
246                .unwrap_or(0)
247        }
248    }
249
250    impl Drop for ThreadpoolRuntime {
251        fn drop(&mut self) {
252            for lane in &self.lanes {
253                let (lock, cv) = &*lane.sync;
254                {
255                    let mut st = lock.lock().expect("lane state poisoned");
256                    st.shutdown = true;
257                }
258                cv.notify_all();
259            }
260            for lane in &self.lanes {
261                let handles =
262                    core::mem::take(&mut *lane.handles.lock().expect("lane handles poisoned"));
263                for h in handles {
264                    let _ = h.join();
265                }
266            }
267        }
268    }
269
270    fn spawn_worker(
271        sync: Arc<(Mutex<LaneState>, Condvar)>,
272        hook: Option<Arc<dyn NativePrioritySetter>>,
273        native_priority: i32,
274        stacksize: usize,
275        dynamic: bool,
276    ) -> JoinHandle<()> {
277        let mut builder =
278            std::thread::Builder::new().name(alloc::format!("rtcorba-lane-{native_priority}"));
279        if stacksize > 0 {
280            builder = builder.stack_size(stacksize);
281        }
282        builder
283            .spawn(move || worker_loop(&sync, hook.as_deref(), native_priority, dynamic))
284            .expect("spawn rt-corba worker")
285    }
286
287    fn worker_loop(
288        sync: &(Mutex<LaneState>, Condvar),
289        hook: Option<&dyn NativePrioritySetter>,
290        native_priority: i32,
291        dynamic: bool,
292    ) {
293        if let Some(h) = hook {
294            h.set_current_thread_priority(native_priority);
295        }
296        let (lock, cv) = sync;
297        loop {
298            let job = {
299                let mut st = lock.lock().expect("lane state poisoned");
300                loop {
301                    if let Some(job) = st.queue.pop_front() {
302                        st.busy += 1;
303                        break job;
304                    }
305                    if st.shutdown {
306                        st.workers = st.workers.saturating_sub(1);
307                        if dynamic {
308                            st.dynamic_alive = st.dynamic_alive.saturating_sub(1);
309                        }
310                        return;
311                    }
312                    if dynamic {
313                        let (guard, timeout) = cv
314                            .wait_timeout(st, DYNAMIC_IDLE_TIMEOUT)
315                            .expect("lane state poisoned");
316                        st = guard;
317                        if timeout.timed_out() && st.queue.is_empty() && !st.shutdown {
318                            st.workers = st.workers.saturating_sub(1);
319                            st.dynamic_alive = st.dynamic_alive.saturating_sub(1);
320                            return;
321                        }
322                    } else {
323                        st = cv.wait(st).expect("lane state poisoned");
324                    }
325                }
326            };
327            job();
328            let mut st = lock.lock().expect("lane state poisoned");
329            st.busy = st.busy.saturating_sub(1);
330        }
331    }
332}
333
334#[cfg(all(test, feature = "std"))]
335#[allow(clippy::unwrap_used, clippy::panic)]
336mod tests {
337    use super::*;
338    use crate::policy::{Lane, Threadpool};
339    use crate::priority::{LinearPriorityMapping, Priority};
340    use alloc::sync::Arc;
341    use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
342
    /// Test shorthand: builds a [`Priority`] from a raw value and panics if
    /// `Priority::new` rejects it.
    fn p(v: i16) -> Priority {
        Priority::new(v).unwrap()
    }
346
347    fn pool() -> Threadpool {
348        Threadpool {
349            lanes: alloc::vec![
350                Lane {
351                    priority: p(0),
352                    static_threads: 1,
353                    dynamic_threads: 0,
354                },
355                Lane {
356                    priority: p(50),
357                    static_threads: 2,
358                    dynamic_threads: 2,
359                },
360            ],
361            stacksize: 0,
362            allow_request_buffering: true,
363            max_buffered_requests: 0,
364        }
365    }
366
367    #[test]
368    fn dispatches_and_runs_all_jobs() {
369        let counter = Arc::new(AtomicU32::new(0));
370        let rt = ThreadpoolRuntime::start(&pool(), &LinearPriorityMapping::new(1, 99), None);
371        for _ in 0..20 {
372            let c = Arc::clone(&counter);
373            rt.dispatch(p(60), move || {
374                c.fetch_add(1, Ordering::SeqCst);
375            })
376            .unwrap();
377        }
378        drop(rt); // joins all workers → all jobs have been processed
379        assert_eq!(counter.load(Ordering::SeqCst), 20);
380    }
381
382    #[test]
383    fn routes_to_lane_by_priority() {
384        let seen = Arc::new(AtomicI32::new(-1));
385        let hook_pool = pool();
386        let rt = ThreadpoolRuntime::start(&hook_pool, &LinearPriorityMapping::new(1, 99), None);
387        // Priority 10 → covers only lane 0 (prio 0).
388        let s = Arc::clone(&seen);
389        let done = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
390        let d2 = Arc::clone(&done);
391        rt.dispatch(p(10), move || {
392            // Lane-0 workers run at the native prio of Priority(0) = 1.
393            s.store(1, Ordering::SeqCst);
394            let (m, cv) = &*d2;
395            *m.lock().unwrap() = true;
396            cv.notify_all();
397        })
398        .unwrap();
399        let (m, cv) = &*done;
400        let mut g = m.lock().unwrap();
401        while !*g {
402            g = cv.wait(g).unwrap();
403        }
404        assert_eq!(seen.load(Ordering::SeqCst), 1);
405    }
406
407    #[test]
408    fn native_priority_hook_invoked_per_worker() {
409        struct RecordHook(Arc<std::sync::Mutex<alloc::vec::Vec<i32>>>);
410        impl NativePrioritySetter for RecordHook {
411            fn set_current_thread_priority(&self, native_priority: i32) {
412                self.0.lock().unwrap().push(native_priority);
413            }
414        }
415        let log = Arc::new(std::sync::Mutex::new(alloc::vec::Vec::new()));
416        let hook = Arc::new(RecordHook(Arc::clone(&log)));
417        let rt = ThreadpoolRuntime::start(
418            &pool(),
419            &LinearPriorityMapping::new(1, 99),
420            Some(hook as Arc<dyn NativePrioritySetter>),
421        );
422        drop(rt);
423        let mut got = log.lock().unwrap().clone();
424        got.sort_unstable();
425        // Lane 0 (prio 0 → native 1) ×1 worker, lane 50 (→ ~50) ×2 workers.
426        assert_eq!(got.len(), 3);
427        assert_eq!(got[0], 1);
428    }
429
430    #[test]
431    fn rejects_when_buffering_off_and_no_worker() {
432        let mut tp = Threadpool {
433            lanes: alloc::vec![Lane {
434                priority: p(0),
435                static_threads: 1,
436                dynamic_threads: 0,
437            }],
438            stacksize: 0,
439            allow_request_buffering: false,
440            max_buffered_requests: 0,
441        };
442        tp.allow_request_buffering = false;
443        let rt = ThreadpoolRuntime::start(&tp, &LinearPriorityMapping::new(1, 99), None);
444        // Occupy the single worker with a blocking job.
445        let gate = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
446        let g2 = Arc::clone(&gate);
447        rt.dispatch(p(0), move || {
448            let (m, cv) = &*g2;
449            let mut held = m.lock().unwrap();
450            while !*held {
451                held = cv.wait(held).unwrap();
452            }
453        })
454        .unwrap();
455        // Wait until the worker has picked up the job (idle == 0).
456        std::thread::sleep(std::time::Duration::from_millis(50));
457        // Now no worker is free, no dynamic budget, buffering off → reject.
458        let r = rt.dispatch(p(0), || {});
459        assert_eq!(r, Err(DispatchError::Rejected));
460        // Open the gate so the worker ends cleanly.
461        let (m, cv) = &*gate;
462        *m.lock().unwrap() = true;
463        cv.notify_all();
464    }
465
466    #[test]
467    fn dynamic_worker_spawns_under_saturation() {
468        // Lane 50: 2 static + 2 dynamic. 4 blocking jobs must all be able to
469        // run at the same time → 2 dynamic workers are created.
470        let rt = ThreadpoolRuntime::start(&pool(), &LinearPriorityMapping::new(1, 99), None);
471        let running = Arc::new(AtomicU32::new(0));
472        let gate = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
473        for _ in 0..4 {
474            let r = Arc::clone(&running);
475            let g = Arc::clone(&gate);
476            rt.dispatch(p(50), move || {
477                r.fetch_add(1, Ordering::SeqCst);
478                let (m, cv) = &*g;
479                let mut held = m.lock().unwrap();
480                while !*held {
481                    held = cv.wait(held).unwrap();
482                }
483            })
484            .unwrap();
485        }
486        // Until all 4 run at the same time (otherwise there would be only 2 workers).
487        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
488        while running.load(Ordering::SeqCst) < 4 {
489            assert!(
490                std::time::Instant::now() < deadline,
491                "dynamic workers spawned too few"
492            );
493            std::thread::yield_now();
494        }
495        assert_eq!(running.load(Ordering::SeqCst), 4);
496        // Lane 50 is index 1: 2 static + 2 dynamic = 4 handles.
497        assert_eq!(rt.spawned_workers(1), 4);
498        let (m, cv) = &*gate;
499        *m.lock().unwrap() = true;
500        cv.notify_all();
501    }
502}