1#[cfg(feature = "std")]
21pub use std_impl::{DispatchError, NativePrioritySetter, ThreadpoolRuntime};
22
23#[cfg(feature = "std")]
24#[allow(clippy::expect_used)]
25mod std_impl {
26 use alloc::boxed::Box;
27 use alloc::collections::VecDeque;
28 use alloc::sync::Arc;
29 use alloc::vec::Vec;
30 use core::fmt;
31 use core::time::Duration;
32 use std::sync::{Condvar, Mutex};
33 use std::thread::JoinHandle;
34
35 use crate::policy::Threadpool;
36 use crate::priority::{Priority, PriorityMapping};
37
    /// How long a dynamic worker waits on the lane condition variable for new work.
    ///
    /// A dynamic worker whose wait times out while the queue is still empty (and the
    /// lane is not shutting down) retires itself.
    const DYNAMIC_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
40
    /// A boxed, sendable unit of work that is queued on a lane and run once by a worker.
    type Job = Box<dyn FnOnce() + Send + 'static>;
42
    /// Hook used by worker threads to apply a native (OS-level) priority to themselves.
    ///
    /// Implementations must be `Send + Sync` because one shared instance is handed to
    /// every worker thread of the runtime.
    pub trait NativePrioritySetter: Send + Sync {
        /// Applies `native_priority` to the calling thread.
        ///
        /// Each worker calls this once, on its own thread, before it starts taking jobs.
        fn set_current_thread_priority(&self, native_priority: i32);
    }
53
54 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
56 pub enum DispatchError {
57 NoLane,
59 Rejected,
61 }
62
63 impl fmt::Display for DispatchError {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 Self::NoLane => f.write_str("threadpool has no lane"),
67 Self::Rejected => f.write_str("request rejected (no worker, buffering off/full)"),
68 }
69 }
70 }
71
72 impl std::error::Error for DispatchError {}
73
    /// Mutable bookkeeping of one lane; always accessed under the lane mutex.
    struct LaneState {
        /// Jobs accepted by `dispatch` that no worker has picked up yet.
        queue: VecDeque<Job>,
        /// Workers currently accounted to the lane: starts at the static thread count,
        /// grows when a dynamic worker is spawned and shrinks when a worker exits.
        workers: u32,
        /// Workers that have popped a job and have not finished running it.
        busy: u32,
        /// Dynamic workers that are currently accounted as alive.
        dynamic_alive: u32,
        /// Set when the runtime is dropped; workers exit once the queue is empty.
        shutdown: bool,
    }
84
    /// One priority lane of the pool: its configuration, shared state and worker handles.
    struct Lane {
        /// Priority of the lane as given in the pool configuration.
        priority: Priority,
        /// Native priority handed to the hook by every worker of this lane.
        native_priority: i32,
        /// Upper bound on the number of dynamic workers alive at the same time.
        dynamic_threads: u32,
        /// Lane state plus the condition variable workers wait on for new jobs.
        sync: Arc<(Mutex<LaneState>, Condvar)>,
        /// Join handles of every worker spawned for this lane; joined when the runtime drops.
        handles: Mutex<Vec<JoinHandle<()>>>,
    }
92
93 pub struct ThreadpoolRuntime {
95 lanes: Vec<Lane>,
96 allow_buffering: bool,
97 max_buffered: u32,
98 hook: Option<Arc<dyn NativePrioritySetter>>,
99 }
100
101 impl ThreadpoolRuntime {
102 #[must_use]
107 pub fn start<M: PriorityMapping>(
108 pool: &Threadpool,
109 mapping: &M,
110 hook: Option<Arc<dyn NativePrioritySetter>>,
111 ) -> Self {
112 let mut lanes = Vec::with_capacity(pool.lanes.len());
113 for lane_cfg in &pool.lanes {
114 let native_priority = mapping.to_native(lane_cfg.priority).unwrap_or(0);
115 let lane = Lane {
116 priority: lane_cfg.priority,
117 native_priority,
118 dynamic_threads: lane_cfg.dynamic_threads,
119 sync: Arc::new((
120 Mutex::new(LaneState {
121 queue: VecDeque::new(),
122 workers: lane_cfg.static_threads,
123 busy: 0,
124 dynamic_alive: 0,
125 shutdown: false,
126 }),
127 Condvar::new(),
128 )),
129 handles: Mutex::new(Vec::new()),
130 };
131 let stacksize = pool.stacksize;
132 let mut handles = lane.handles.lock().expect("lane handles poisoned");
133 for _ in 0..lane_cfg.static_threads {
134 handles.push(spawn_worker(
135 Arc::clone(&lane.sync),
136 hook.clone(),
137 native_priority,
138 stacksize,
139 false,
140 ));
141 }
142 drop(handles);
143 lanes.push(lane);
144 }
145 Self {
146 lanes,
147 allow_buffering: pool.allow_request_buffering,
148 max_buffered: pool.max_buffered_requests,
149 hook,
150 }
151 }
152
153 fn select_lane(&self, priority: Priority) -> Option<usize> {
157 let covering = self
158 .lanes
159 .iter()
160 .enumerate()
161 .filter(|(_, l)| l.priority <= priority)
162 .max_by_key(|(_, l)| l.priority)
163 .map(|(i, _)| i);
164 covering.or_else(|| {
165 self.lanes
166 .iter()
167 .enumerate()
168 .min_by_key(|(_, l)| l.priority)
169 .map(|(i, _)| i)
170 })
171 }
172
173 #[allow(clippy::missing_panics_doc)]
181 pub fn dispatch<F>(&self, priority: Priority, job: F) -> Result<(), DispatchError>
182 where
183 F: FnOnce() + Send + 'static,
184 {
185 let idx = self.select_lane(priority).ok_or(DispatchError::NoLane)?;
186 let lane = &self.lanes[idx];
187 let (lock, cv) = &*lane.sync;
188
189 let need_spawn;
190 {
191 let mut st = lock.lock().expect("lane state poisoned");
192 let pending = st.busy + st.queue.len() as u32;
196 let free = st.workers.saturating_sub(pending);
197 let can_grow = st.dynamic_alive < lane.dynamic_threads;
198 if !self.allow_buffering && free == 0 && !can_grow {
199 return Err(DispatchError::Rejected);
200 }
201 if self.max_buffered > 0 && st.queue.len() as u32 >= self.max_buffered {
202 return Err(DispatchError::Rejected);
203 }
204 st.queue.push_back(Box::new(job));
205 need_spawn = free == 0 && can_grow;
206 if need_spawn {
207 st.workers += 1;
208 st.dynamic_alive += 1;
209 }
210 }
211
212 if need_spawn {
213 let handle = spawn_worker(
214 Arc::clone(&lane.sync),
215 self.hook.clone(),
216 lane.native_priority,
217 0,
218 true,
219 );
220 lane.handles
221 .lock()
222 .expect("lane handles poisoned")
223 .push(handle);
224 } else {
225 cv.notify_one();
226 }
227 Ok(())
228 }
229
230 #[must_use]
232 pub fn lane_count(&self) -> usize {
233 self.lanes.len()
234 }
235
236 #[must_use]
242 pub fn spawned_workers(&self, lane_index: usize) -> usize {
243 self.lanes
244 .get(lane_index)
245 .map(|l| l.handles.lock().expect("lane handles poisoned").len())
246 .unwrap_or(0)
247 }
248 }
249
250 impl Drop for ThreadpoolRuntime {
251 fn drop(&mut self) {
252 for lane in &self.lanes {
253 let (lock, cv) = &*lane.sync;
254 {
255 let mut st = lock.lock().expect("lane state poisoned");
256 st.shutdown = true;
257 }
258 cv.notify_all();
259 }
260 for lane in &self.lanes {
261 let handles =
262 core::mem::take(&mut *lane.handles.lock().expect("lane handles poisoned"));
263 for h in handles {
264 let _ = h.join();
265 }
266 }
267 }
268 }
269
270 fn spawn_worker(
271 sync: Arc<(Mutex<LaneState>, Condvar)>,
272 hook: Option<Arc<dyn NativePrioritySetter>>,
273 native_priority: i32,
274 stacksize: usize,
275 dynamic: bool,
276 ) -> JoinHandle<()> {
277 let mut builder =
278 std::thread::Builder::new().name(alloc::format!("rtcorba-lane-{native_priority}"));
279 if stacksize > 0 {
280 builder = builder.stack_size(stacksize);
281 }
282 builder
283 .spawn(move || worker_loop(&sync, hook.as_deref(), native_priority, dynamic))
284 .expect("spawn rt-corba worker")
285 }
286
287 fn worker_loop(
288 sync: &(Mutex<LaneState>, Condvar),
289 hook: Option<&dyn NativePrioritySetter>,
290 native_priority: i32,
291 dynamic: bool,
292 ) {
293 if let Some(h) = hook {
294 h.set_current_thread_priority(native_priority);
295 }
296 let (lock, cv) = sync;
297 loop {
298 let job = {
299 let mut st = lock.lock().expect("lane state poisoned");
300 loop {
301 if let Some(job) = st.queue.pop_front() {
302 st.busy += 1;
303 break job;
304 }
305 if st.shutdown {
306 st.workers = st.workers.saturating_sub(1);
307 if dynamic {
308 st.dynamic_alive = st.dynamic_alive.saturating_sub(1);
309 }
310 return;
311 }
312 if dynamic {
313 let (guard, timeout) = cv
314 .wait_timeout(st, DYNAMIC_IDLE_TIMEOUT)
315 .expect("lane state poisoned");
316 st = guard;
317 if timeout.timed_out() && st.queue.is_empty() && !st.shutdown {
318 st.workers = st.workers.saturating_sub(1);
319 st.dynamic_alive = st.dynamic_alive.saturating_sub(1);
320 return;
321 }
322 } else {
323 st = cv.wait(st).expect("lane state poisoned");
324 }
325 }
326 };
327 job();
328 let mut st = lock.lock().expect("lane state poisoned");
329 st.busy = st.busy.saturating_sub(1);
330 }
331 }
332}
333
334#[cfg(all(test, feature = "std"))]
335#[allow(clippy::unwrap_used, clippy::panic)]
336mod tests {
337 use super::*;
338 use crate::policy::{Lane, Threadpool};
339 use crate::priority::{LinearPriorityMapping, Priority};
340 use alloc::sync::Arc;
341 use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
342
343 fn p(v: i16) -> Priority {
344 Priority::new(v).unwrap()
345 }
346
347 fn pool() -> Threadpool {
348 Threadpool {
349 lanes: alloc::vec![
350 Lane {
351 priority: p(0),
352 static_threads: 1,
353 dynamic_threads: 0,
354 },
355 Lane {
356 priority: p(50),
357 static_threads: 2,
358 dynamic_threads: 2,
359 },
360 ],
361 stacksize: 0,
362 allow_request_buffering: true,
363 max_buffered_requests: 0,
364 }
365 }
366
367 #[test]
368 fn dispatches_and_runs_all_jobs() {
369 let counter = Arc::new(AtomicU32::new(0));
370 let rt = ThreadpoolRuntime::start(&pool(), &LinearPriorityMapping::new(1, 99), None);
371 for _ in 0..20 {
372 let c = Arc::clone(&counter);
373 rt.dispatch(p(60), move || {
374 c.fetch_add(1, Ordering::SeqCst);
375 })
376 .unwrap();
377 }
378 drop(rt); assert_eq!(counter.load(Ordering::SeqCst), 20);
380 }
381
382 #[test]
383 fn routes_to_lane_by_priority() {
384 let seen = Arc::new(AtomicI32::new(-1));
385 let hook_pool = pool();
386 let rt = ThreadpoolRuntime::start(&hook_pool, &LinearPriorityMapping::new(1, 99), None);
387 let s = Arc::clone(&seen);
389 let done = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
390 let d2 = Arc::clone(&done);
391 rt.dispatch(p(10), move || {
392 s.store(1, Ordering::SeqCst);
394 let (m, cv) = &*d2;
395 *m.lock().unwrap() = true;
396 cv.notify_all();
397 })
398 .unwrap();
399 let (m, cv) = &*done;
400 let mut g = m.lock().unwrap();
401 while !*g {
402 g = cv.wait(g).unwrap();
403 }
404 assert_eq!(seen.load(Ordering::SeqCst), 1);
405 }
406
407 #[test]
408 fn native_priority_hook_invoked_per_worker() {
409 struct RecordHook(Arc<std::sync::Mutex<alloc::vec::Vec<i32>>>);
410 impl NativePrioritySetter for RecordHook {
411 fn set_current_thread_priority(&self, native_priority: i32) {
412 self.0.lock().unwrap().push(native_priority);
413 }
414 }
415 let log = Arc::new(std::sync::Mutex::new(alloc::vec::Vec::new()));
416 let hook = Arc::new(RecordHook(Arc::clone(&log)));
417 let rt = ThreadpoolRuntime::start(
418 &pool(),
419 &LinearPriorityMapping::new(1, 99),
420 Some(hook as Arc<dyn NativePrioritySetter>),
421 );
422 drop(rt);
423 let mut got = log.lock().unwrap().clone();
424 got.sort_unstable();
425 assert_eq!(got.len(), 3);
427 assert_eq!(got[0], 1);
428 }
429
430 #[test]
431 fn rejects_when_buffering_off_and_no_worker() {
432 let mut tp = Threadpool {
433 lanes: alloc::vec![Lane {
434 priority: p(0),
435 static_threads: 1,
436 dynamic_threads: 0,
437 }],
438 stacksize: 0,
439 allow_request_buffering: false,
440 max_buffered_requests: 0,
441 };
442 tp.allow_request_buffering = false;
443 let rt = ThreadpoolRuntime::start(&tp, &LinearPriorityMapping::new(1, 99), None);
444 let gate = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
446 let g2 = Arc::clone(&gate);
447 rt.dispatch(p(0), move || {
448 let (m, cv) = &*g2;
449 let mut held = m.lock().unwrap();
450 while !*held {
451 held = cv.wait(held).unwrap();
452 }
453 })
454 .unwrap();
455 std::thread::sleep(std::time::Duration::from_millis(50));
457 let r = rt.dispatch(p(0), || {});
459 assert_eq!(r, Err(DispatchError::Rejected));
460 let (m, cv) = &*gate;
462 *m.lock().unwrap() = true;
463 cv.notify_all();
464 }
465
466 #[test]
467 fn dynamic_worker_spawns_under_saturation() {
468 let rt = ThreadpoolRuntime::start(&pool(), &LinearPriorityMapping::new(1, 99), None);
471 let running = Arc::new(AtomicU32::new(0));
472 let gate = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
473 for _ in 0..4 {
474 let r = Arc::clone(&running);
475 let g = Arc::clone(&gate);
476 rt.dispatch(p(50), move || {
477 r.fetch_add(1, Ordering::SeqCst);
478 let (m, cv) = &*g;
479 let mut held = m.lock().unwrap();
480 while !*held {
481 held = cv.wait(held).unwrap();
482 }
483 })
484 .unwrap();
485 }
486 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
488 while running.load(Ordering::SeqCst) < 4 {
489 assert!(
490 std::time::Instant::now() < deadline,
491 "dynamic workers spawned too few"
492 );
493 std::thread::yield_now();
494 }
495 assert_eq!(running.load(Ordering::SeqCst), 4);
496 assert_eq!(rt.spawned_workers(1), 4);
498 let (m, cv) = &*gate;
499 *m.lock().unwrap() = true;
500 cv.notify_all();
501 }
502}