#[cfg(feature = "std")]
pub use std_impl::{DispatchError, NativePrioritySetter, ThreadpoolRuntime};
#[cfg(feature = "std")]
#[allow(clippy::expect_used)]
mod std_impl {
use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt;
use core::time::Duration;
use std::sync::{Condvar, Mutex};
use std::thread::JoinHandle;
use crate::policy::Threadpool;
use crate::priority::{Priority, PriorityMapping};
const DYNAMIC_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
type Job = Box<dyn FnOnce() + Send + 'static>;
pub trait NativePrioritySetter: Send + Sync {
fn set_current_thread_priority(&self, native_priority: i32);
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchError {
NoLane,
Rejected,
}
impl fmt::Display for DispatchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NoLane => f.write_str("threadpool has no lane"),
Self::Rejected => f.write_str("request rejected (no worker, buffering off/full)"),
}
}
}
impl std::error::Error for DispatchError {}
struct LaneState {
queue: VecDeque<Job>,
workers: u32,
busy: u32,
dynamic_alive: u32,
shutdown: bool,
}
struct Lane {
priority: Priority,
native_priority: i32,
dynamic_threads: u32,
sync: Arc<(Mutex<LaneState>, Condvar)>,
handles: Mutex<Vec<JoinHandle<()>>>,
}
pub struct ThreadpoolRuntime {
lanes: Vec<Lane>,
allow_buffering: bool,
max_buffered: u32,
hook: Option<Arc<dyn NativePrioritySetter>>,
}
impl ThreadpoolRuntime {
#[must_use]
pub fn start<M: PriorityMapping>(
pool: &Threadpool,
mapping: &M,
hook: Option<Arc<dyn NativePrioritySetter>>,
) -> Self {
let mut lanes = Vec::with_capacity(pool.lanes.len());
for lane_cfg in &pool.lanes {
let native_priority = mapping.to_native(lane_cfg.priority).unwrap_or(0);
let lane = Lane {
priority: lane_cfg.priority,
native_priority,
dynamic_threads: lane_cfg.dynamic_threads,
sync: Arc::new((
Mutex::new(LaneState {
queue: VecDeque::new(),
workers: lane_cfg.static_threads,
busy: 0,
dynamic_alive: 0,
shutdown: false,
}),
Condvar::new(),
)),
handles: Mutex::new(Vec::new()),
};
let stacksize = pool.stacksize;
let mut handles = lane.handles.lock().expect("lane handles poisoned");
for _ in 0..lane_cfg.static_threads {
handles.push(spawn_worker(
Arc::clone(&lane.sync),
hook.clone(),
native_priority,
stacksize,
false,
));
}
drop(handles);
lanes.push(lane);
}
Self {
lanes,
allow_buffering: pool.allow_request_buffering,
max_buffered: pool.max_buffered_requests,
hook,
}
}
fn select_lane(&self, priority: Priority) -> Option<usize> {
let covering = self
.lanes
.iter()
.enumerate()
.filter(|(_, l)| l.priority <= priority)
.max_by_key(|(_, l)| l.priority)
.map(|(i, _)| i);
covering.or_else(|| {
self.lanes
.iter()
.enumerate()
.min_by_key(|(_, l)| l.priority)
.map(|(i, _)| i)
})
}
#[allow(clippy::missing_panics_doc)]
pub fn dispatch<F>(&self, priority: Priority, job: F) -> Result<(), DispatchError>
where
F: FnOnce() + Send + 'static,
{
let idx = self.select_lane(priority).ok_or(DispatchError::NoLane)?;
let lane = &self.lanes[idx];
let (lock, cv) = &*lane.sync;
let need_spawn;
{
let mut st = lock.lock().expect("lane state poisoned");
let pending = st.busy + st.queue.len() as u32;
let free = st.workers.saturating_sub(pending);
let can_grow = st.dynamic_alive < lane.dynamic_threads;
if !self.allow_buffering && free == 0 && !can_grow {
return Err(DispatchError::Rejected);
}
if self.max_buffered > 0 && st.queue.len() as u32 >= self.max_buffered {
return Err(DispatchError::Rejected);
}
st.queue.push_back(Box::new(job));
need_spawn = free == 0 && can_grow;
if need_spawn {
st.workers += 1;
st.dynamic_alive += 1;
}
}
if need_spawn {
let handle = spawn_worker(
Arc::clone(&lane.sync),
self.hook.clone(),
lane.native_priority,
0,
true,
);
lane.handles
.lock()
.expect("lane handles poisoned")
.push(handle);
} else {
cv.notify_one();
}
Ok(())
}
#[must_use]
pub fn lane_count(&self) -> usize {
self.lanes.len()
}
#[must_use]
pub fn spawned_workers(&self, lane_index: usize) -> usize {
self.lanes
.get(lane_index)
.map(|l| l.handles.lock().expect("lane handles poisoned").len())
.unwrap_or(0)
}
}
impl Drop for ThreadpoolRuntime {
fn drop(&mut self) {
for lane in &self.lanes {
let (lock, cv) = &*lane.sync;
{
let mut st = lock.lock().expect("lane state poisoned");
st.shutdown = true;
}
cv.notify_all();
}
for lane in &self.lanes {
let handles =
core::mem::take(&mut *lane.handles.lock().expect("lane handles poisoned"));
for h in handles {
let _ = h.join();
}
}
}
}
fn spawn_worker(
sync: Arc<(Mutex<LaneState>, Condvar)>,
hook: Option<Arc<dyn NativePrioritySetter>>,
native_priority: i32,
stacksize: usize,
dynamic: bool,
) -> JoinHandle<()> {
let mut builder =
std::thread::Builder::new().name(alloc::format!("rtcorba-lane-{native_priority}"));
if stacksize > 0 {
builder = builder.stack_size(stacksize);
}
builder
.spawn(move || worker_loop(&sync, hook.as_deref(), native_priority, dynamic))
.expect("spawn rt-corba worker")
}
fn worker_loop(
sync: &(Mutex<LaneState>, Condvar),
hook: Option<&dyn NativePrioritySetter>,
native_priority: i32,
dynamic: bool,
) {
if let Some(h) = hook {
h.set_current_thread_priority(native_priority);
}
let (lock, cv) = sync;
loop {
let job = {
let mut st = lock.lock().expect("lane state poisoned");
loop {
if let Some(job) = st.queue.pop_front() {
st.busy += 1;
break job;
}
if st.shutdown {
st.workers = st.workers.saturating_sub(1);
if dynamic {
st.dynamic_alive = st.dynamic_alive.saturating_sub(1);
}
return;
}
if dynamic {
let (guard, timeout) = cv
.wait_timeout(st, DYNAMIC_IDLE_TIMEOUT)
.expect("lane state poisoned");
st = guard;
if timeout.timed_out() && st.queue.is_empty() && !st.shutdown {
st.workers = st.workers.saturating_sub(1);
st.dynamic_alive = st.dynamic_alive.saturating_sub(1);
return;
}
} else {
st = cv.wait(st).expect("lane state poisoned");
}
}
};
job();
let mut st = lock.lock().expect("lane state poisoned");
st.busy = st.busy.saturating_sub(1);
}
}
}
#[cfg(all(test, feature = "std"))]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use crate::policy::{Lane, Threadpool};
use crate::priority::{LinearPriorityMapping, Priority};
use alloc::sync::Arc;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
fn p(v: i16) -> Priority {
Priority::new(v).unwrap()
}
fn pool() -> Threadpool {
Threadpool {
lanes: alloc::vec![
Lane {
priority: p(0),
static_threads: 1,
dynamic_threads: 0,
},
Lane {
priority: p(50),
static_threads: 2,
dynamic_threads: 2,
},
],
stacksize: 0,
allow_request_buffering: true,
max_buffered_requests: 0,
}
}
#[test]
fn dispatches_and_runs_all_jobs() {
let counter = Arc::new(AtomicU32::new(0));
let rt = ThreadpoolRuntime::start(&pool(), &LinearPriorityMapping::new(1, 99), None);
for _ in 0..20 {
let c = Arc::clone(&counter);
rt.dispatch(p(60), move || {
c.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
}
drop(rt); assert_eq!(counter.load(Ordering::SeqCst), 20);
}
#[test]
fn routes_to_lane_by_priority() {
let seen = Arc::new(AtomicI32::new(-1));
let hook_pool = pool();
let rt = ThreadpoolRuntime::start(&hook_pool, &LinearPriorityMapping::new(1, 99), None);
let s = Arc::clone(&seen);
let done = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
let d2 = Arc::clone(&done);
rt.dispatch(p(10), move || {
s.store(1, Ordering::SeqCst);
let (m, cv) = &*d2;
*m.lock().unwrap() = true;
cv.notify_all();
})
.unwrap();
let (m, cv) = &*done;
let mut g = m.lock().unwrap();
while !*g {
g = cv.wait(g).unwrap();
}
assert_eq!(seen.load(Ordering::SeqCst), 1);
}
#[test]
fn native_priority_hook_invoked_per_worker() {
struct RecordHook(Arc<std::sync::Mutex<alloc::vec::Vec<i32>>>);
impl NativePrioritySetter for RecordHook {
fn set_current_thread_priority(&self, native_priority: i32) {
self.0.lock().unwrap().push(native_priority);
}
}
let log = Arc::new(std::sync::Mutex::new(alloc::vec::Vec::new()));
let hook = Arc::new(RecordHook(Arc::clone(&log)));
let rt = ThreadpoolRuntime::start(
&pool(),
&LinearPriorityMapping::new(1, 99),
Some(hook as Arc<dyn NativePrioritySetter>),
);
drop(rt);
let mut got = log.lock().unwrap().clone();
got.sort_unstable();
assert_eq!(got.len(), 3);
assert_eq!(got[0], 1);
}
#[test]
fn rejects_when_buffering_off_and_no_worker() {
let mut tp = Threadpool {
lanes: alloc::vec![Lane {
priority: p(0),
static_threads: 1,
dynamic_threads: 0,
}],
stacksize: 0,
allow_request_buffering: false,
max_buffered_requests: 0,
};
tp.allow_request_buffering = false;
let rt = ThreadpoolRuntime::start(&tp, &LinearPriorityMapping::new(1, 99), None);
let gate = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
let g2 = Arc::clone(&gate);
rt.dispatch(p(0), move || {
let (m, cv) = &*g2;
let mut held = m.lock().unwrap();
while !*held {
held = cv.wait(held).unwrap();
}
})
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));
let r = rt.dispatch(p(0), || {});
assert_eq!(r, Err(DispatchError::Rejected));
let (m, cv) = &*gate;
*m.lock().unwrap() = true;
cv.notify_all();
}
#[test]
fn dynamic_worker_spawns_under_saturation() {
let rt = ThreadpoolRuntime::start(&pool(), &LinearPriorityMapping::new(1, 99), None);
let running = Arc::new(AtomicU32::new(0));
let gate = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
for _ in 0..4 {
let r = Arc::clone(&running);
let g = Arc::clone(&gate);
rt.dispatch(p(50), move || {
r.fetch_add(1, Ordering::SeqCst);
let (m, cv) = &*g;
let mut held = m.lock().unwrap();
while !*held {
held = cv.wait(held).unwrap();
}
})
.unwrap();
}
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
while running.load(Ordering::SeqCst) < 4 {
assert!(
std::time::Instant::now() < deadline,
"dynamic workers spawned too few"
);
std::thread::yield_now();
}
assert_eq!(running.load(Ordering::SeqCst), 4);
assert_eq!(rt.spawned_workers(1), 4);
let (m, cv) = &*gate;
*m.lock().unwrap() = true;
cv.notify_all();
}
}