#![allow(clippy::must_use_candidate)]
use crossbeam_queue::SegQueue;
use parking_lot::{Condvar, Mutex};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::thread::{self, JoinHandle as ThreadJoinHandle};
use std::time::{Duration, Instant};
/// How long an excess worker may sit idle before retiring.
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(10);

/// Injectable clock, so tests can script the passage of time.
pub type TimeGetter = fn() -> Instant;

/// Injectable sleep, so tests can avoid real wall-clock waits.
pub type SleepFn = fn(Duration);

/// Default [`TimeGetter`]: the real monotonic clock.
fn wall_clock_now() -> Instant {
    Instant::now()
}

/// Default [`SleepFn`]: a real blocking sleep on the current thread.
fn blocking_thread_sleep(duration: Duration) {
    thread::sleep(duration);
}

/// Absolute deadline for a timeout that starts "now" per `time_getter`.
fn timeout_deadline(timeout: Duration, time_getter: TimeGetter) -> Instant {
    let now = time_getter();
    now + timeout
}

/// Time left until `deadline`, clamped to zero once it has passed.
fn timeout_remaining(deadline: Instant, time_getter: TimeGetter) -> Duration {
    let now = time_getter();
    deadline.saturating_duration_since(now)
}
/// A cloneable submission handle to a [`BlockingPool`].
///
/// Holds a shared reference to the pool internals, so tasks can be
/// spawned (including from other threads) while the pool is alive.
#[derive(Clone)]
pub struct BlockingPoolHandle {
    // Shared state with the owning pool; kept alive by this Arc.
    inner: Arc<BlockingPoolInner>,
}
impl fmt::Debug for BlockingPoolHandle {
    /// Renders a snapshot of the pool counters (relaxed loads; values
    /// may already be stale by the time they are printed).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let active = self.inner.active_threads.load(Ordering::Relaxed);
        let pending = self.inner.pending_count.load(Ordering::Relaxed);
        f.debug_struct("BlockingPoolHandle")
            .field("active_threads", &active)
            .field("pending_tasks", &pending)
            .finish()
    }
}
/// A dynamically sized pool of OS threads for running blocking work.
///
/// Dropping the pool performs a best-effort graceful shutdown (see the
/// `Drop` impl).
pub struct BlockingPool {
    // Shared with `BlockingPoolHandle` clones and worker threads.
    inner: Arc<BlockingPoolInner>,
}
impl fmt::Debug for BlockingPool {
    /// Renders configuration plus a relaxed snapshot of the counters;
    /// `thread_handles` is summarized by its length only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let active = self.inner.active_threads.load(Ordering::Relaxed);
        let pending = self.inner.pending_count.load(Ordering::Relaxed);
        let handle_count = self.inner.thread_handles.lock().len();
        f.debug_struct("BlockingPool")
            .field("min_threads", &self.inner.min_threads)
            .field("max_threads", &self.inner.max_threads)
            .field("active_threads", &active)
            .field("pending_tasks", &pending)
            .field("thread_handles", &handle_count)
            .finish()
    }
}
/// State shared between the pool, its handles, and its worker threads.
struct BlockingPoolInner {
    // Workers are never retired below this floor.
    min_threads: usize,
    // Hard ceiling on concurrently active worker threads.
    max_threads: usize,
    // Number of live worker threads (slot reserved at spawn, released at exit).
    active_threads: AtomicUsize,
    // Workers currently holding a popped task.
    busy_threads: AtomicUsize,
    // Tasks pushed to `queue` but not yet popped by a worker.
    pending_count: AtomicUsize,
    // Monotonic id source for task handles.
    next_task_id: AtomicU64,
    // Monotonic id source for worker thread names.
    next_thread_id: AtomicU64,
    // Lock-free MPMC task queue.
    queue: SegQueue<BlockingTask>,
    // Set once under `mutex`; workers drain remaining work and then exit.
    shutdown: AtomicBool,
    // Paired with `mutex` to park idle workers and wake them on submission.
    condvar: Condvar,
    mutex: Mutex<()>,
    // Idle time after which an above-floor worker retires.
    idle_timeout: Duration,
    // Injectable clock (see `TimeGetter`).
    time_getter: TimeGetter,
    // Injectable sleep used by `shutdown_and_wait` polling.
    sleep_fn: SleepFn,
    // Prefix for spawned worker thread names.
    thread_name_prefix: String,
    // Optional callbacks run on each worker at start/stop.
    on_thread_start: Option<Arc<dyn Fn() + Send + Sync>>,
    on_thread_stop: Option<Arc<dyn Fn() + Send + Sync>>,
    // Join handles for spawned workers; reaped opportunistically.
    thread_handles: Mutex<Vec<ThreadJoinHandle<()>>>,
}
/// A unit of work queued for execution on a worker thread.
struct BlockingTask {
    // The user closure; executed at most once, under `catch_unwind`.
    work: Box<dyn FnOnce() + Send + 'static>,
    // Accepted by the API but not currently used for ordering.
    #[allow(dead_code)]
    priority: u8,
    // Shared with the task handle; checked right before execution.
    cancelled: Arc<AtomicBool>,
    // Signalled when the task finishes, is cancelled, or is rejected.
    completion: Arc<BlockingTaskCompletion>,
}
/// One-shot completion latch signalled when a task finishes (or is
/// rejected/cancelled), built from a flag plus a mutex/condvar pair.
struct BlockingTaskCompletion {
    // Set exactly once by `signal_done`; read lock-free on fast paths.
    done: AtomicBool,
    // Parking spot for `wait`/`wait_timeout` callers.
    condvar: Condvar,
    mutex: Mutex<()>,
    // Clock used for `wait_timeout` deadlines (injectable for tests).
    time_getter: TimeGetter,
}
impl BlockingTaskCompletion {
    /// Creates a not-yet-completed latch using the given clock.
    fn new(time_getter: TimeGetter) -> Self {
        Self {
            done: AtomicBool::new(false),
            condvar: Condvar::new(),
            mutex: Mutex::new(()),
            time_getter,
        }
    }

    /// Marks the task finished and wakes every waiter.
    ///
    /// The flag is published before taking the lock; notifying while
    /// holding the mutex closes the race with a waiter that checked the
    /// flag but has not yet parked on the condvar.
    fn signal_done(&self) {
        self.done.store(true, Ordering::Release);
        let _guard = self.mutex.lock();
        self.condvar.notify_all();
    }

    /// Blocks until the task is signalled done.
    fn wait(&self) {
        // Fast path: already signalled, no lock needed.
        if self.done.load(Ordering::Acquire) {
            return;
        }
        let mut guard = self.mutex.lock();
        // Re-check under the lock; condvar waits may wake spuriously.
        while !self.done.load(Ordering::Acquire) {
            self.condvar.wait(&mut guard);
        }
    }

    /// Blocks until done or until `timeout` elapses; returns whether the
    /// task completed within the timeout.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        // Fast path: already signalled.
        if self.done.load(Ordering::Acquire) {
            return true;
        }
        let deadline = timeout_deadline(timeout, self.time_getter);
        let mut guard = self.mutex.lock();
        loop {
            if self.done.load(Ordering::Acquire) {
                return true;
            }
            let remaining = timeout_remaining(deadline, self.time_getter);
            if remaining.is_zero() {
                return false;
            }
            self.condvar.wait_for(&mut guard, remaining);
        }
    }

    /// Whether `signal_done` has been called.
    fn is_done(&self) -> bool {
        self.done.load(Ordering::Acquire)
    }
}
/// Caller-side handle to a spawned task: supports cancellation,
/// completion polling, and blocking waits.
pub struct BlockingTaskHandle {
    // Currently only surfaced through the `Debug` impl.
    #[allow(dead_code)]
    task_id: u64,
    // Shared cancellation flag, checked by the worker before running.
    cancelled: Arc<AtomicBool>,
    // Latch signalled when the task completes in any way.
    completion: Arc<BlockingTaskCompletion>,
}
impl BlockingTaskHandle {
    /// Requests cancellation. Only effective if the task has not yet
    /// started; the worker checks this flag before running the closure,
    /// and a task that is already running is not interrupted.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    /// Whether cancellation has been requested.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Whether the task has completed (ran, panicked, was cancelled, or
    /// was rejected at submission).
    #[must_use]
    pub fn is_done(&self) -> bool {
        self.completion.is_done()
    }

    /// Blocks until the task completes.
    pub fn wait(&self) {
        self.completion.wait();
    }

    /// Blocks up to `timeout`; returns `true` if the task completed.
    #[must_use]
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        self.completion.wait_timeout(timeout)
    }
}
impl fmt::Debug for BlockingTaskHandle {
    /// Debug snapshot of the handle's state.
    ///
    /// Previously printed both a `done` and a `completion` field sourced
    /// from the same `completion.is_done()` value (`is_done()` merely
    /// delegates to it); the duplicate field is dropped.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BlockingTaskHandle")
            .field("task_id", &self.task_id)
            .field("cancelled", &self.is_cancelled())
            .field("done", &self.is_done())
            .finish()
    }
}
impl BlockingPool {
    /// Creates a pool with default [`BlockingPoolOptions`].
    #[must_use]
    pub fn new(min_threads: usize, max_threads: usize) -> Self {
        Self::with_config(min_threads, max_threads, BlockingPoolOptions::default())
    }

    /// Creates a pool and eagerly spawns `min_threads` workers.
    ///
    /// `max_threads` is raised to `min_threads` when the caller passes an
    /// inverted pair, so the ceiling is never below the floor.
    ///
    /// # Panics
    ///
    /// Panics if `max_threads` is zero.
    #[must_use]
    pub fn with_config(
        min_threads: usize,
        max_threads: usize,
        options: BlockingPoolOptions,
    ) -> Self {
        assert!(max_threads > 0, "max_threads must be at least 1");
        let max_threads = max_threads.max(min_threads);
        let inner = Arc::new(BlockingPoolInner {
            min_threads,
            max_threads,
            active_threads: AtomicUsize::new(0),
            busy_threads: AtomicUsize::new(0),
            pending_count: AtomicUsize::new(0),
            next_task_id: AtomicU64::new(1),
            next_thread_id: AtomicU64::new(1),
            queue: SegQueue::new(),
            shutdown: AtomicBool::new(false),
            condvar: Condvar::new(),
            mutex: Mutex::new(()),
            idle_timeout: options.idle_timeout,
            time_getter: options.time_getter,
            sleep_fn: options.sleep_fn,
            thread_name_prefix: options.thread_name_prefix,
            on_thread_start: options.on_thread_start,
            on_thread_stop: options.on_thread_stop,
            thread_handles: Mutex::new(Vec::with_capacity(max_threads)),
        });
        let pool = Self { inner };
        for _ in 0..min_threads {
            pool.spawn_thread();
        }
        pool
    }

    /// Returns a cloneable submission handle to this pool.
    #[must_use]
    pub fn handle(&self) -> BlockingPoolHandle {
        BlockingPoolHandle {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Spawns a task at the default (middle) priority.
    pub fn spawn<F>(&self, f: F) -> BlockingTaskHandle
    where
        F: FnOnce() + Send + 'static,
    {
        self.spawn_with_priority(f, 128)
    }

    /// Spawns a task, returning a handle for cancellation and waiting.
    ///
    /// If the pool is shut down (or shuts down concurrently with the
    /// submission), the task is rejected: the returned handle reports
    /// cancelled and done, and the closure never runs.
    ///
    /// Delegates to [`BlockingPoolHandle::spawn_with_priority`] so the
    /// submission sequence (enqueue under the lock, scale up, notify)
    /// exists in exactly one place instead of being duplicated between
    /// the pool and its handle.
    pub fn spawn_with_priority<F>(&self, f: F, priority: u8) -> BlockingTaskHandle
    where
        F: FnOnce() + Send + 'static,
    {
        self.handle().spawn_with_priority(f, priority)
    }

    /// Number of queued tasks not yet picked up by a worker.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.inner.pending_count.load(Ordering::Relaxed)
    }

    /// Number of live worker threads.
    #[must_use]
    pub fn active_threads(&self) -> usize {
        self.inner.active_threads.load(Ordering::Relaxed)
    }

    /// Number of workers currently executing a task.
    #[must_use]
    pub fn busy_threads(&self) -> usize {
        self.inner.busy_threads.load(Ordering::Relaxed)
    }

    /// Whether shutdown has been requested.
    #[must_use]
    pub fn is_shutdown(&self) -> bool {
        self.inner.shutdown.load(Ordering::Acquire)
    }

    /// Flags shutdown and wakes all parked workers. Idempotent.
    ///
    /// The flag is flipped under the submission mutex so it is atomic
    /// with respect to `try_enqueue_task`, which checks it under the
    /// same lock.
    pub fn shutdown(&self) {
        let _guard = self.inner.mutex.lock();
        self.inner.shutdown.store(true, Ordering::Release);
        self.inner.condvar.notify_all();
    }

    /// Shuts down and waits up to `timeout` for all workers to exit.
    ///
    /// Returns `false` if workers were still active at the deadline;
    /// join handles are only reaped on the successful path.
    pub fn shutdown_and_wait(&self, timeout: Duration) -> bool {
        self.shutdown();
        let deadline = timeout_deadline(timeout, self.inner.time_getter);
        while self.inner.active_threads.load(Ordering::Acquire) > 0 {
            let remaining = timeout_remaining(deadline, self.inner.time_getter);
            if remaining.is_zero() {
                return false;
            }
            // Re-notify each iteration in case a worker parked after the
            // initial broadcast, then poll via the injectable sleep.
            self.notify_all();
            (self.inner.sleep_fn)(Duration::from_millis(10).min(remaining));
        }
        let mut handles = self.inner.thread_handles.lock();
        for handle in handles.drain(..) {
            let _ = handle.join();
        }
        drop(handles);
        true
    }

    /// Unconditionally attempts to start one worker (bounded by `max_threads`).
    fn spawn_thread(&self) {
        spawn_thread_on_inner(&self.inner);
    }

    /// Starts a worker only if queued work exceeds idle capacity.
    // Unused here after spawn delegation; retained in case callers exist
    // outside the visible portion of this file (e.g. tests).
    #[allow(dead_code)]
    fn maybe_spawn_thread(&self) {
        maybe_spawn_thread_on_inner(&self.inner);
    }

    /// Wakes one parked worker.
    // Unused here after spawn delegation; retained in case callers exist
    // outside the visible portion of this file (e.g. tests).
    #[allow(dead_code)]
    fn notify_one(&self) {
        let _guard = self.inner.mutex.lock();
        self.inner.condvar.notify_one();
    }

    /// Wakes every parked worker.
    fn notify_all(&self) {
        let _guard = self.inner.mutex.lock();
        self.inner.condvar.notify_all();
    }
}
impl Drop for BlockingPool {
    /// Best-effort graceful shutdown with a bounded wait.
    ///
    /// `shutdown_and_wait` already flags shutdown (idempotently) before
    /// waiting, so the previous explicit `shutdown()` call here was
    /// redundant and has been removed.
    fn drop(&mut self) {
        let _ = self.shutdown_and_wait(Duration::from_secs(5));
    }
}
impl BlockingPoolHandle {
    /// Spawns a task at the default (middle) priority.
    pub fn spawn<F>(&self, f: F) -> BlockingTaskHandle
    where
        F: FnOnce() + Send + 'static,
    {
        self.spawn_with_priority(f, 128)
    }

    /// Spawns a task, returning a handle for cancellation and waiting.
    ///
    /// A pool that is already shut down (or shuts down while the task is
    /// being enqueued) rejects the task: the handle comes back cancelled
    /// and completed, and the closure never runs.
    pub fn spawn_with_priority<F>(&self, f: F, priority: u8) -> BlockingTaskHandle
    where
        F: FnOnce() + Send + 'static,
    {
        let inner = &self.inner;
        let task_id = inner.next_task_id.fetch_add(1, Ordering::Relaxed);
        let cancelled = Arc::new(AtomicBool::new(false));
        let completion = Arc::new(BlockingTaskCompletion::new(inner.time_getter));
        let handle = BlockingTaskHandle {
            task_id,
            cancelled: Arc::clone(&cancelled),
            completion: Arc::clone(&completion),
        };
        // Cheap pre-check before boxing the closure into a task.
        if inner.shutdown.load(Ordering::Acquire) {
            cancelled.store(true, Ordering::Release);
            completion.signal_done();
            return handle;
        }
        let task = BlockingTask {
            work: Box::new(f),
            priority,
            cancelled: Arc::clone(&cancelled),
            completion: Arc::clone(&completion),
        };
        // `try_enqueue_task` re-checks shutdown under the submission mutex.
        if try_enqueue_task(inner, task) {
            // Scale up if the backlog warrants it, then wake a worker.
            maybe_spawn_thread_on_inner(inner);
            let _guard = inner.mutex.lock();
            inner.condvar.notify_one();
        } else {
            // Lost the race with shutdown: reject the task.
            cancelled.store(true, Ordering::Release);
            completion.signal_done();
        }
        handle
    }

    /// Number of queued tasks not yet picked up by a worker.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.inner.pending_count.load(Ordering::Relaxed)
    }

    /// Number of live worker threads.
    #[must_use]
    pub fn active_threads(&self) -> usize {
        self.inner.active_threads.load(Ordering::Relaxed)
    }

    /// Whether shutdown has been requested.
    #[must_use]
    pub fn is_shutdown(&self) -> bool {
        self.inner.shutdown.load(Ordering::Acquire)
    }
}
/// Pushes `task` onto the shared queue unless the pool is shutting down.
///
/// Holding the submission mutex while checking the shutdown flag makes
/// the check-then-push atomic with respect to `shutdown()`, which flips
/// the flag under the same lock — so no task can slip into the queue
/// after workers have been told to exit.
fn try_enqueue_task(inner: &Arc<BlockingPoolInner>, task: BlockingTask) -> bool {
    let _guard = inner.mutex.lock();
    if inner.shutdown.load(Ordering::Acquire) {
        false
    } else {
        inner.queue.push(task);
        inner.pending_count.fetch_add(1, Ordering::Relaxed);
        true
    }
}
/// Configuration for [`BlockingPool::with_config`].
#[derive(Clone)]
pub struct BlockingPoolOptions {
    /// Idle time after which an above-minimum worker retires.
    pub idle_timeout: Duration,
    /// Clock used for all deadline computations (overridable in tests).
    pub time_getter: TimeGetter,
    /// Sleep used by `shutdown_and_wait` polling (overridable in tests).
    pub sleep_fn: SleepFn,
    /// Prefix for worker thread names (`{prefix}-blocking-{id}`).
    pub thread_name_prefix: String,
    /// Invoked on each worker thread right after it starts.
    pub on_thread_start: Option<Arc<dyn Fn() + Send + Sync>>,
    /// Invoked on each worker thread just before it exits.
    pub on_thread_stop: Option<Arc<dyn Fn() + Send + Sync>>,
}
impl Default for BlockingPoolOptions {
    /// Real clock, real sleep, the default ten-second idle timeout, the
    /// "asupersync" thread-name prefix, and no lifecycle callbacks.
    fn default() -> Self {
        Self {
            idle_timeout: DEFAULT_IDLE_TIMEOUT,
            time_getter: wall_clock_now,
            sleep_fn: blocking_thread_sleep,
            thread_name_prefix: String::from("asupersync"),
            on_thread_start: None,
            on_thread_stop: None,
        }
    }
}
impl fmt::Debug for BlockingPoolOptions {
    /// Function pointers and boxed callbacks are not `Debug`, so this
    /// reports whether each hook deviates from the default instead of
    /// trying to print it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let custom_time =
            !std::ptr::fn_addr_eq(self.time_getter, wall_clock_now as TimeGetter);
        let custom_sleep =
            !std::ptr::fn_addr_eq(self.sleep_fn, blocking_thread_sleep as SleepFn);
        f.debug_struct("BlockingPoolOptions")
            .field("idle_timeout", &self.idle_timeout)
            .field("custom_time_getter", &custom_time)
            .field("custom_sleep_fn", &custom_sleep)
            .field("thread_name_prefix", &self.thread_name_prefix)
            .field("on_thread_start", &self.on_thread_start.is_some())
            .field("on_thread_stop", &self.on_thread_stop.is_some())
            .finish()
    }
}
/// Attempts to start one worker thread, respecting `max_threads`.
///
/// Reserves an `active_threads` slot via CAS *before* spawning so that
/// concurrent callers cannot overshoot the ceiling; the reservation is
/// rolled back if the OS-level spawn fails.
fn spawn_thread_on_inner(inner: &Arc<BlockingPoolInner>) {
    // Claim a thread slot, or bail out if the pool is already at capacity.
    loop {
        let current = inner.active_threads.load(Ordering::Relaxed);
        if current >= inner.max_threads {
            return;
        }
        if inner
            .active_threads
            .compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
        {
            break;
        }
    }
    let inner_clone = Arc::clone(inner);
    let thread_id = inner.next_thread_id.fetch_add(1, Ordering::Relaxed);
    let name = format!("{}-blocking-{}", inner.thread_name_prefix, thread_id);
    match thread::Builder::new().name(name).spawn(move || {
        // Drop guard: runs the stop callback and releases this thread's
        // `active_threads` slot on every exit path, including a panic in
        // `on_thread_start` or in the worker loop itself.
        struct ThreadExitGuard<'a> {
            inner: &'a Arc<BlockingPoolInner>,
            // True when the worker already gave up its slot through
            // `try_claim_idle_retirement`, so the guard must not
            // decrement a second time.
            retired_with_claim: bool,
        }
        impl Drop for ThreadExitGuard<'_> {
            fn drop(&mut self) {
                if let Some(ref callback) = self.inner.on_thread_stop {
                    // Shield the pool from a panicking user callback.
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        callback();
                    }));
                }
                if !self.retired_with_claim {
                    self.inner.active_threads.fetch_sub(1, Ordering::Relaxed);
                    // If work arrived while this thread was exiting, make
                    // sure another worker will pick it up.
                    if self.inner.pending_count.load(Ordering::Acquire) > 0
                        && !self.inner.queue.is_empty()
                    {
                        maybe_spawn_thread_on_inner(self.inner);
                        let _guard = self.inner.mutex.lock();
                        self.inner.condvar.notify_one();
                    }
                }
            }
        }
        let mut guard = ThreadExitGuard {
            inner: &inner_clone,
            retired_with_claim: false,
        };
        if let Some(ref callback) = inner_clone.on_thread_start {
            callback();
        }
        // The loop returns true only when the thread retired via an idle
        // claim (its slot is already released); record that for the guard.
        guard.retired_with_claim = blocking_worker_loop(&inner_clone);
        let _ = guard.retired_with_claim;
    }) {
        Ok(handle) => {
            let mut handles = inner.thread_handles.lock();
            handles.push(handle);
            // Opportunistically reap handles of threads that have already
            // finished so the vector does not grow without bound.
            let mut i = 0;
            while i < handles.len() {
                if handles[i].is_finished() {
                    let _ = handles.swap_remove(i).join();
                } else {
                    i += 1;
                }
            }
            drop(handles);
        }
        Err(_) => {
            // Spawn failed: release the slot reserved above.
            inner.active_threads.fetch_sub(1, Ordering::Relaxed);
        }
    }
}
/// Starts an extra worker only when the backlog exceeds idle capacity.
///
/// All counters are read with relaxed ordering: this is a heuristic,
/// and a slightly stale view at worst spawns (or skips) one thread,
/// which idle retirement in the worker loop corrects later.
fn maybe_spawn_thread_on_inner(inner: &Arc<BlockingPoolInner>) {
    let active = inner.active_threads.load(Ordering::Relaxed);
    let pending = inner.pending_count.load(Ordering::Relaxed);
    let idle = active.saturating_sub(inner.busy_threads.load(Ordering::Relaxed));
    let has_room = active < inner.max_threads;
    if has_room && pending > idle {
        spawn_thread_on_inner(inner);
    }
}
/// Tries to retire the calling worker by decrementing `active_threads`,
/// never dropping below `min_threads`.
///
/// A CAS loop guarantees that when several idle workers race near the
/// floor, no more claims succeed than there are slots above the minimum.
/// Returns `true` when the caller won a retirement slot.
fn try_claim_idle_retirement(inner: &BlockingPoolInner) -> bool {
    let mut observed = inner.active_threads.load(Ordering::Relaxed);
    while observed > inner.min_threads {
        match inner.active_threads.compare_exchange_weak(
            observed,
            observed - 1,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => return true,
            // Lost the race (or spurious failure): retry with fresh value.
            Err(actual) => observed = actual,
        }
    }
    false
}
/// Main loop for a worker thread.
///
/// Returns `true` if the thread exits because it claimed an idle
/// retirement slot (its `active_threads` count is already released),
/// `false` on shutdown (the caller's exit guard releases the slot).
#[allow(clippy::significant_drop_tightening)] fn blocking_worker_loop(inner: &BlockingPoolInner) -> bool {
    // Set when this thread first observes an empty queue; cleared on work.
    let mut idle_since: Option<Instant> = None;
    loop {
        // Drain available work before considering shutdown or retirement.
        if let Some(task) = inner.queue.pop() {
            idle_since = None;
            inner.busy_threads.fetch_add(1, Ordering::Relaxed);
            inner.pending_count.fetch_sub(1, Ordering::Relaxed);
            if task.cancelled.load(Ordering::Acquire) {
                // Cancelled before execution: skip the closure but still
                // signal completion so waiters are released.
                inner.busy_threads.fetch_sub(1, Ordering::Relaxed);
                task.completion.signal_done();
                continue;
            }
            // Run the user closure; a panic must not kill the worker.
            let _result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(task.work));
            inner.busy_threads.fetch_sub(1, Ordering::Relaxed);
            task.completion.signal_done();
            continue;
        }
        // Queue is empty: exit if shutdown has been requested.
        if inner.shutdown.load(Ordering::Acquire) {
            break;
        }
        let active = inner.active_threads.load(Ordering::Relaxed);
        if active > inner.min_threads {
            // Above the floor: this thread is a candidate for retirement.
            let now = (inner.time_getter)();
            let start = *idle_since.get_or_insert(now);
            let elapsed = now.saturating_duration_since(start);
            if elapsed >= inner.idle_timeout {
                if inner.queue.is_empty() && try_claim_idle_retirement(inner) {
                    // Re-check after claiming: a task may have been pushed
                    // between the emptiness check and the claim.
                    if inner.queue.is_empty() {
                        return true;
                    }
                    // Work appeared: try to take the slot back ("unretire")
                    // rather than strand a queued task.
                    {
                        let mut current = inner.active_threads.load(Ordering::Relaxed);
                        let mut unretired = false;
                        loop {
                            if current >= inner.max_threads {
                                break;
                            }
                            match inner.active_threads.compare_exchange_weak(
                                current,
                                current + 1,
                                Ordering::Relaxed,
                                Ordering::Relaxed,
                            ) {
                                Ok(_) => {
                                    unretired = true;
                                    break;
                                }
                                Err(next) => current = next,
                            }
                        }
                        if !unretired {
                            // CAS saw the ceiling; reclaim unconditionally so
                            // the exit guard's later decrement stays balanced
                            // while this thread keeps running.
                            inner.active_threads.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
                // Restart the idle clock after a failed/abandoned claim.
                idle_since = None;
                continue;
            }
            // Not idle long enough yet: park with a deadline.
            let remaining = inner.idle_timeout.saturating_sub(elapsed);
            let mut guard = inner.mutex.lock();
            // Re-check queue/shutdown under the lock so a notification
            // sent before parking is not missed.
            if !inner.queue.is_empty() {
                drop(guard);
                continue;
            }
            if inner.shutdown.load(Ordering::Acquire) {
                drop(guard);
                break;
            }
            let _wait_result = inner.condvar.wait_for(&mut guard, remaining);
            drop(guard);
        } else {
            // At or below the floor: park indefinitely until notified.
            idle_since = None;
            let mut guard = inner.mutex.lock();
            if !inner.queue.is_empty() {
                drop(guard);
                continue;
            }
            if inner.shutdown.load(Ordering::Acquire) {
                drop(guard);
                break;
            }
            inner.condvar.wait(&mut guard);
            drop(guard);
        }
    }
    false
}
#[cfg(test)]
#[allow(dead_code)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize};
use std::sync::{Condvar as StdCondvar, Mutex as StdMutex, OnceLock};
static DETERMINISTIC_HOOK_TEST_LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
static SCRIPTED_TIME_BASE: OnceLock<Instant> = OnceLock::new();
static SCRIPTED_TIME_CALLS: AtomicUsize = AtomicUsize::new(0);
static SCRIPTED_TIME_OFFSET_MS: AtomicU64 = AtomicU64::new(0);
static SCRIPTED_SLEEP_CALLS: AtomicUsize = AtomicUsize::new(0);
fn deterministic_hook_test_guard() -> std::sync::MutexGuard<'static, ()> {
DETERMINISTIC_HOOK_TEST_LOCK
.get_or_init(|| StdMutex::new(()))
.lock()
.expect("deterministic hook test lock poisoned")
}
fn reset_scripted_time_state() {
SCRIPTED_TIME_CALLS.store(0, Ordering::Relaxed);
SCRIPTED_TIME_OFFSET_MS.store(0, Ordering::Relaxed);
SCRIPTED_SLEEP_CALLS.store(0, Ordering::Relaxed);
}
fn scripted_time_base() -> Instant {
*SCRIPTED_TIME_BASE.get_or_init(Instant::now)
}
fn stepped_timeout_time() -> Instant {
let base = scripted_time_base();
if SCRIPTED_TIME_CALLS.fetch_add(1, Ordering::Relaxed) == 0 {
base
} else {
base + Duration::from_millis(25)
}
}
fn advancing_timeout_time() -> Instant {
scripted_time_base()
+ Duration::from_millis(SCRIPTED_TIME_OFFSET_MS.load(Ordering::Relaxed))
}
fn advancing_timeout_sleep(duration: Duration) {
SCRIPTED_SLEEP_CALLS.fetch_add(1, Ordering::Relaxed);
let millis = duration.as_millis().min(u128::from(u64::MAX)) as u64;
SCRIPTED_TIME_OFFSET_MS.fetch_add(millis, Ordering::Relaxed);
}
#[test]
fn basic_spawn_and_wait() {
let pool = BlockingPool::new(1, 4);
let counter = Arc::new(AtomicI32::new(0));
let counter_clone = Arc::clone(&counter);
let handle = pool.spawn(move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
});
handle.wait();
assert!(handle.is_done());
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[test]
fn multiple_tasks() {
let pool = BlockingPool::new(2, 8);
let counter = Arc::new(AtomicI32::new(0));
let mut handles = Vec::new();
for _ in 0..100 {
let counter_clone = Arc::clone(&counter);
handles.push(pool.spawn(move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
}));
}
for handle in handles {
handle.wait();
}
assert_eq!(counter.load(Ordering::Relaxed), 100);
}
#[test]
fn test_spawn_from_handle() {
let pool = BlockingPool::new(1, 4);
let handle = pool.handle();
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
let task = handle.spawn(move || {
c.fetch_add(1, Ordering::Relaxed);
});
task.wait();
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[test]
fn test_active_threads_starts_at_min() {
let pool = BlockingPool::new(3, 8);
thread::sleep(Duration::from_millis(50));
assert_eq!(pool.active_threads(), 3);
}
#[test]
fn cancellation_before_execution() {
let pool = BlockingPool::new(0, 1); let counter = Arc::new(AtomicI32::new(0));
let counter_clone = Arc::clone(&counter);
let handle = pool.spawn(move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
});
handle.cancel();
assert!(handle.is_cancelled());
let _ = handle.wait_timeout(Duration::from_secs(2));
thread::sleep(Duration::from_millis(50));
}
#[test]
fn test_shutdown_and_wait_empty_pool() {
let pool = BlockingPool::new(2, 4);
thread::sleep(Duration::from_millis(20));
let start = std::time::Instant::now();
let result = pool.shutdown_and_wait(Duration::from_secs(2));
let elapsed = start.elapsed();
assert!(result, "Shutdown should succeed");
assert!(elapsed < Duration::from_secs(1));
assert_eq!(pool.active_threads(), 0);
}
#[test]
fn test_shutdown_and_wait_timeout_respected() {
let pool = BlockingPool::new(1, 1);
pool.spawn(|| {
thread::sleep(Duration::from_millis(200));
});
thread::sleep(Duration::from_millis(20));
let start = std::time::Instant::now();
let result = pool.shutdown_and_wait(Duration::from_millis(50));
let elapsed = start.elapsed();
assert!(!result, "Expected timeout to return false");
assert!(elapsed >= Duration::from_millis(50));
assert!(elapsed < Duration::from_secs(1));
}
#[test]
fn test_shutdown_idempotent() {
let pool = BlockingPool::new(1, 2);
pool.spawn(|| {});
pool.shutdown();
assert!(pool.is_shutdown());
pool.shutdown();
assert!(pool.is_shutdown());
assert!(pool.shutdown_and_wait(Duration::from_secs(2)));
}
#[test]
fn spawn_after_shutdown_is_rejected() {
let pool = BlockingPool::new(1, 2);
pool.shutdown();
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
let handle = pool.spawn(move || {
c.fetch_add(1, Ordering::Relaxed);
});
assert!(handle.is_cancelled());
assert!(handle.wait_timeout(Duration::from_millis(100)));
assert_eq!(counter.load(Ordering::Relaxed), 0);
}
#[test]
fn handle_spawn_after_shutdown_is_rejected() {
let pool = BlockingPool::new(1, 2);
let handle_api = pool.handle();
pool.shutdown();
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
let handle = handle_api.spawn(move || {
c.fetch_add(1, Ordering::Relaxed);
});
assert!(handle.is_cancelled());
assert!(handle.wait_timeout(Duration::from_millis(100)));
assert_eq!(counter.load(Ordering::Relaxed), 0);
}
#[test]
fn spawn_rechecks_shutdown_before_queueing_under_submission_lock() {
let pool = BlockingPool::new(0, 1);
let handle_api = pool.handle();
let executed = Arc::new(AtomicBool::new(false));
let gate = Arc::new(std::sync::Barrier::new(2));
let submission_guard = pool.inner.mutex.lock();
let executed_clone = Arc::clone(&executed);
let gate_clone = Arc::clone(&gate);
let join = thread::spawn(move || {
gate_clone.wait();
handle_api.spawn(move || {
executed_clone.store(true, Ordering::Release);
})
});
gate.wait();
pool.inner.shutdown.store(true, Ordering::Release);
drop(submission_guard);
let handle = join.join().expect("spawn thread should return a handle");
assert!(handle.is_cancelled());
assert!(handle.wait_timeout(Duration::from_millis(100)));
assert_eq!(pool.pending_count(), 0);
assert_eq!(pool.active_threads(), 0);
assert!(!executed.load(Ordering::Acquire));
}
#[test]
fn wait_timeout() {
let pool = BlockingPool::new(1, 1);
let handle = pool.spawn(|| {
thread::sleep(Duration::from_millis(500));
});
assert!(!handle.wait_timeout(Duration::from_millis(10)));
assert!(handle.wait_timeout(Duration::from_secs(2)));
assert!(handle.is_done());
}
#[test]
fn test_worker_parks_on_empty() {
let pool = BlockingPool::new(2, 4);
thread::sleep(Duration::from_millis(50));
assert_eq!(pool.busy_threads(), 0);
}
#[test]
fn test_worker_wakes_on_task() {
let pool = BlockingPool::new(1, 2);
thread::sleep(Duration::from_millis(50));
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
let handle = pool.spawn(move || {
c.fetch_add(1, Ordering::Relaxed);
});
assert!(handle.wait_timeout(Duration::from_secs(2)));
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[test]
fn test_worker_idle_timeout_excess_threads_exit() {
let options = BlockingPoolOptions {
idle_timeout: Duration::from_millis(50),
..Default::default()
};
let pool = BlockingPool::with_config(0, 3, options);
let barrier = Arc::new(std::sync::Barrier::new(4));
let mut handles = Vec::new();
for _ in 0..3 {
let b = Arc::clone(&barrier);
handles.push(pool.spawn(move || {
b.wait();
}));
}
thread::sleep(Duration::from_millis(50));
let active_before = pool.active_threads();
assert!(active_before >= 1);
barrier.wait();
for h in handles {
h.wait();
}
thread::sleep(Duration::from_millis(300));
let active_after = pool.active_threads();
assert!(
active_after <= 1,
"Expected excess threads to retire, active_after={active_after}"
);
}
#[test]
fn thread_scaling() {
let pool = BlockingPool::new(1, 4);
assert_eq!(pool.active_threads(), 1);
let counter = Arc::new(AtomicI32::new(0));
let mut handles = Vec::new();
for _ in 0..4 {
let counter_clone = Arc::clone(&counter);
handles.push(pool.spawn(move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
thread::sleep(Duration::from_millis(10));
}));
}
for handle in handles {
handle.wait();
}
assert_eq!(counter.load(Ordering::Relaxed), 4);
assert!(pool.active_threads() >= 1);
}
#[test]
fn test_task_panic_caught() {
let pool = BlockingPool::new(2, 4);
let _ = pool.spawn(|| unreachable!("intentional panic"));
thread::sleep(Duration::from_millis(50));
let counter = Arc::new(AtomicI32::new(0));
let c = Arc::clone(&counter);
let handle = pool.spawn(move || {
c.fetch_add(1, Ordering::Relaxed);
});
handle.wait();
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[test]
fn shutdown_graceful() {
let pool = BlockingPool::new(2, 4);
let counter = Arc::new(AtomicI32::new(0));
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
pool.spawn(move || {
counter_clone.fetch_add(1, Ordering::Relaxed);
});
}
assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
assert_eq!(counter.load(Ordering::Relaxed), 10);
}
#[test]
fn handle_cloning() {
let pool = BlockingPool::new(1, 4);
let handle = pool.handle();
let handle2 = handle.clone();
let counter = Arc::new(AtomicI32::new(0));
let c1 = Arc::clone(&counter);
let t1 = handle.spawn(move || {
c1.fetch_add(1, Ordering::Relaxed);
});
let c2 = Arc::clone(&counter);
let t2 = handle2.spawn(move || {
c2.fetch_add(1, Ordering::Relaxed);
});
t1.wait();
t2.wait();
assert_eq!(counter.load(Ordering::Relaxed), 2);
}
#[test]
fn test_queue_concurrent_push() {
let pool = BlockingPool::new(2, 8);
let counter = Arc::new(AtomicU64::new(0));
let mut spawners = Vec::new();
let spawner_count: u64 = 4;
let tasks_per_spawner: u64 = 50;
for _ in 0..spawner_count {
let pool_handle = pool.handle();
let c = Arc::clone(&counter);
spawners.push(thread::spawn(move || {
for _ in 0..tasks_per_spawner {
let c_inner = Arc::clone(&c);
pool_handle.spawn(move || {
c_inner.fetch_add(1, Ordering::Relaxed);
});
}
}));
}
for spawner in spawners {
spawner.join().expect("spawner panicked");
}
assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
assert_eq!(
counter.load(Ordering::Relaxed),
spawner_count * tasks_per_spawner
);
}
#[test]
fn pool_metrics() {
let pool = BlockingPool::new(1, 4);
assert_eq!(pool.active_threads(), 1);
assert_eq!(pool.pending_count(), 0);
assert_eq!(pool.busy_threads(), 0);
let barrier = Arc::new(std::sync::Barrier::new(2));
let barrier_clone = Arc::clone(&barrier);
let _handle = pool.spawn(move || {
barrier_clone.wait();
});
thread::sleep(Duration::from_millis(10));
assert_eq!(pool.busy_threads(), 1);
barrier.wait();
}
#[test]
fn min_max_normalization() {
let pool = BlockingPool::new(4, 2);
assert!(pool.active_threads() >= 4);
}
#[test]
fn thread_callbacks() {
let started = Arc::new(AtomicI32::new(0));
let stopped = Arc::new(AtomicI32::new(0));
let started_clone = Arc::clone(&started);
let stopped_clone = Arc::clone(&stopped);
let options = BlockingPoolOptions {
on_thread_start: Some(Arc::new(move || {
started_clone.fetch_add(1, Ordering::Relaxed);
})),
on_thread_stop: Some(Arc::new(move || {
stopped_clone.fetch_add(1, Ordering::Relaxed);
})),
..Default::default()
};
let pool = BlockingPool::with_config(2, 4, options);
thread::sleep(Duration::from_millis(50));
assert_eq!(started.load(Ordering::Relaxed), 2);
pool.shutdown_and_wait(Duration::from_secs(5));
assert_eq!(stopped.load(Ordering::Relaxed), 2);
}
#[test]
fn test_thread_name_unique() {
let options = BlockingPoolOptions {
thread_name_prefix: "unique-pool".to_string(),
..Default::default()
};
let pool = BlockingPool::with_config(2, 2, options);
let barrier = Arc::new(std::sync::Barrier::new(3));
let names = Arc::new(Mutex::new(Vec::new()));
let mut handles = Vec::new();
for _ in 0..2 {
let b = Arc::clone(&barrier);
let n = Arc::clone(&names);
handles.push(pool.spawn(move || {
if let Some(name) = thread::current().name() {
n.lock().push(name.to_string());
}
b.wait();
}));
}
barrier.wait();
for h in handles {
h.wait();
}
let recorded = names.lock().clone();
let unique: HashSet<_> = recorded.into_iter().collect();
assert_eq!(unique.len(), 2, "Expected two unique thread names");
}
#[test]
fn panicking_task_does_not_hang_waiters() {
let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(|_| {}));
let pool = BlockingPool::new(1, 1);
let panic_handle = pool.spawn(|| {
unreachable!("intentional test panic");
});
let survived = Arc::new(AtomicBool::new(false));
let survived_clone = Arc::clone(&survived);
let follow_up = pool.spawn(move || {
survived_clone.store(true, Ordering::Release);
});
assert!(
panic_handle.wait_timeout(Duration::from_secs(5)),
"panicking task should signal completion, not hang"
);
assert!(
follow_up.wait_timeout(Duration::from_secs(5)),
"follow-up task should complete on the surviving worker"
);
assert!(
survived.load(Ordering::Acquire),
"worker thread should survive a task panic"
);
std::panic::set_hook(prev_hook);
}
#[test]
fn idle_retirement_claim_allows_only_one_thread_at_floor() {
let inner = Arc::new(BlockingPoolInner {
min_threads: 1,
max_threads: 2,
active_threads: AtomicUsize::new(2),
busy_threads: AtomicUsize::new(0),
pending_count: AtomicUsize::new(0),
next_task_id: AtomicU64::new(1),
next_thread_id: AtomicU64::new(1),
queue: SegQueue::new(),
shutdown: AtomicBool::new(false),
condvar: Condvar::new(),
mutex: Mutex::new(()),
idle_timeout: Duration::from_millis(1),
time_getter: wall_clock_now,
sleep_fn: blocking_thread_sleep,
thread_name_prefix: "retire-test".to_string(),
on_thread_start: None,
on_thread_stop: None,
thread_handles: Mutex::new(Vec::new()),
});
let barrier = Arc::new(std::sync::Barrier::new(3));
let claims = Arc::new(AtomicUsize::new(0));
let mut joiners = Vec::new();
for _ in 0..2 {
let inner_clone = Arc::clone(&inner);
let barrier_clone = Arc::clone(&barrier);
let claims_clone = Arc::clone(&claims);
joiners.push(thread::spawn(move || {
barrier_clone.wait();
if try_claim_idle_retirement(&inner_clone) {
claims_clone.fetch_add(1, Ordering::Relaxed);
}
}));
}
barrier.wait();
for joiner in joiners {
joiner.join().expect("retirement claimant panicked");
}
assert_eq!(
claims.load(Ordering::Relaxed),
1,
"exactly one worker should claim the retirement slot at the floor"
);
assert_eq!(
inner.active_threads.load(Ordering::Relaxed),
inner.min_threads,
"retirement claims must not drop below min_threads"
);
}
#[test]
fn cancelled_task_signals_completion() {
let pool = BlockingPool::new(1, 2);
let executed = Arc::new(AtomicBool::new(false));
let exec = Arc::clone(&executed);
let handle = pool.spawn(move || {
thread::sleep(Duration::from_millis(200));
exec.store(true, Ordering::Release);
});
handle.cancel();
assert!(
handle.wait_timeout(Duration::from_secs(5)),
"cancelled task must signal completion"
);
assert!(handle.is_done());
}
#[test]
fn busy_threads_balanced_through_panic() {
let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(|_| {}));
let pool = BlockingPool::new(2, 4);
let h1 = pool.spawn(|| unreachable!("audit panic"));
h1.wait();
thread::sleep(Duration::from_millis(50));
assert_eq!(
pool.busy_threads(),
0,
"busy_threads must be decremented even after panic"
);
std::panic::set_hook(prev_hook);
}
#[test]
fn spawn_thread_on_inner_respects_max_threads() {
let inner = Arc::new(BlockingPoolInner {
min_threads: 0,
max_threads: 2,
active_threads: AtomicUsize::new(2),
busy_threads: AtomicUsize::new(0),
pending_count: AtomicUsize::new(0),
next_task_id: AtomicU64::new(1),
next_thread_id: AtomicU64::new(1),
queue: SegQueue::new(),
shutdown: AtomicBool::new(false),
condvar: Condvar::new(),
mutex: Mutex::new(()),
idle_timeout: Duration::from_millis(10),
time_getter: wall_clock_now,
sleep_fn: blocking_thread_sleep,
thread_name_prefix: "max-test".to_string(),
on_thread_start: None,
on_thread_stop: None,
thread_handles: Mutex::new(Vec::new()),
});
spawn_thread_on_inner(&inner);
assert_eq!(
inner.active_threads.load(Ordering::Relaxed),
2,
"spawn must not exceed max_threads"
);
}
#[test]
fn spawn_thread_on_inner_rollback_on_overflow() {
let inner = Arc::new(BlockingPoolInner {
min_threads: 0,
max_threads: 1,
active_threads: AtomicUsize::new(1),
busy_threads: AtomicUsize::new(0),
pending_count: AtomicUsize::new(0),
next_task_id: AtomicU64::new(1),
next_thread_id: AtomicU64::new(1),
queue: SegQueue::new(),
shutdown: AtomicBool::new(false),
condvar: Condvar::new(),
mutex: Mutex::new(()),
idle_timeout: Duration::from_millis(10),
time_getter: wall_clock_now,
sleep_fn: blocking_thread_sleep,
thread_name_prefix: "overflow".to_string(),
on_thread_start: None,
on_thread_stop: None,
thread_handles: Mutex::new(Vec::new()),
});
spawn_thread_on_inner(&inner);
assert_eq!(inner.active_threads.load(Ordering::Relaxed), 1);
assert_eq!(inner.thread_handles.lock().len(), 0);
}
#[test]
fn completion_wait_after_signal_returns_immediately() {
    // Once a completion has been signalled, even a zero-duration wait must
    // report success without blocking.
    let completion = BlockingTaskCompletion::new(wall_clock_now);
    completion.signal_done();
    assert!(completion.wait_timeout(Duration::from_millis(0)));
}
#[test]
fn completion_wait_timeout_uses_custom_time_getter() {
    let _guard = deterministic_hook_test_guard();
    reset_scripted_time_state();
    // Drive the completion's deadline arithmetic from the scripted clock:
    // the wait should expire without any real wall-clock sleeping.
    let comp = BlockingTaskCompletion::new(stepped_timeout_time);
    let timed_out = !comp.wait_timeout(Duration::from_millis(10));
    assert!(
        timed_out,
        "custom time getter should let wait_timeout observe elapsed time without wall sleep"
    );
    let clock_reads = SCRIPTED_TIME_CALLS.load(Ordering::Relaxed);
    assert_eq!(
        clock_reads, 2,
        "timeout path should only consult the synthetic clock for deadline and remaining time"
    );
}
#[test]
fn worker_idle_retirement_uses_custom_time_getter() {
    // Serialise against other tests that mutate the shared scripted-clock state.
    let _guard = deterministic_hook_test_guard();
    reset_scripted_time_state();
    // (flag, condvar) pair flipped by the worker's stop callback once it retires.
    let retired = Arc::new((StdMutex::new(false), StdCondvar::new()));
    let retired_signal = Arc::clone(&retired);
    // min_threads == 0 with a tiny idle_timeout: the lone worker should retire
    // as soon as the synthetic clock reports the idle timeout has elapsed.
    let pool = BlockingPool::with_config(
        0,
        1,
        BlockingPoolOptions {
            idle_timeout: Duration::from_millis(5),
            time_getter: stepped_timeout_time,
            on_thread_stop: Some(Arc::new(move || {
                let (lock, condvar) = &*retired_signal;
                {
                    // Set the flag inside its own scope so the lock is released
                    // before notifying.
                    let mut retired = lock.lock().expect("retirement flag poisoned");
                    *retired = true;
                }
                condvar.notify_all();
            })),
            ..Default::default()
        },
    );
    // Run one trivial task so a worker exists, then let it go idle.
    pool.spawn(|| {}).wait();
    let (lock, condvar) = &*retired;
    // Block (bounded at 1s) until the stop callback reports retirement.
    let retired = {
        let retired = lock.lock().expect("retirement flag poisoned");
        let (retired, _timeout) = condvar
            .wait_timeout_while(retired, Duration::from_secs(1), |retired| !*retired)
            .expect("retirement wait poisoned");
        *retired
    };
    assert!(
        retired,
        "synthetic time getter should retire the idle worker without long wall sleeps"
    );
    assert_eq!(
        pool.active_threads(),
        0,
        "idle retirement should decrement active thread count to zero"
    );
    // The worker's idle loop should have consulted the scripted clock at least
    // twice (deadline computation plus at least one remaining-time check).
    assert!(
        SCRIPTED_TIME_CALLS.load(Ordering::Relaxed) >= 2,
        "idle retirement path should consult the scripted clock across multiple loop turns"
    );
}
#[test]
fn shutdown_and_wait_uses_custom_time_and_sleep_hooks() {
    // Serialise against other tests that mutate the shared scripted-clock state.
    let _guard = deterministic_hook_test_guard();
    reset_scripted_time_state();
    // min_threads == 0 so no real worker thread ever runs in this test.
    let pool = BlockingPool::with_config(
        0,
        1,
        BlockingPoolOptions {
            time_getter: advancing_timeout_time,
            sleep_fn: advancing_timeout_sleep,
            ..Default::default()
        },
    );
    // Fake a worker that never exits so shutdown_and_wait must exhaust its
    // entire timeout budget.
    pool.inner.active_threads.store(1, Ordering::Release);
    assert!(
        !pool.shutdown_and_wait(Duration::from_millis(25)),
        "synthetic time should drive shutdown timeout accounting without wall sleep"
    );
    assert!(
        pool.is_shutdown(),
        "shutdown flag should be set before waiting"
    );
    assert!(
        SCRIPTED_SLEEP_CALLS.load(Ordering::Relaxed) > 0,
        "shutdown wait loop should use the configured sleep hook"
    );
    // Each scripted sleep advances the synthetic clock; by timeout the offset
    // must have consumed the full 25ms budget.
    assert_eq!(
        SCRIPTED_TIME_OFFSET_MS.load(Ordering::Relaxed),
        25,
        "sleep hook should advance the synthetic clock through the full timeout budget"
    );
    // Undo the faked counter so teardown observes a quiescent pool.
    pool.inner.active_threads.store(0, Ordering::Release);
}
#[test]
fn shutdown_drains_pending_tasks() {
    let pool = BlockingPool::new(1, 1);
    // Occupy the single worker until the test has queued its backlog.
    let gate = Arc::new(std::sync::Barrier::new(2));
    let worker_gate = Arc::clone(&gate);
    pool.spawn(move || {
        worker_gate.wait();
    });
    let executed = Arc::new(AtomicUsize::new(0));
    for _ in 0..5 {
        let tally = Arc::clone(&executed);
        let _handle = pool.spawn(move || {
            tally.fetch_add(1, Ordering::Relaxed);
        });
    }
    // Release the worker, then shut down: every queued task must drain before
    // shutdown_and_wait reports success.
    gate.wait();
    assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    assert_eq!(
        executed.load(Ordering::Relaxed),
        5,
        "all queued tasks must execute before shutdown completes"
    );
}
#[test]
fn handle_spawn_accepted_before_shutdown_still_runs() {
    // `exiting` is signalled when the last worker enters its stop callback;
    // `exit_gate` then holds that worker inside the callback so the test can
    // enqueue work while the worker count is about to drop to zero.
    let exiting = Arc::new((StdMutex::new(false), StdCondvar::new()));
    let exit_gate = Arc::new((StdMutex::new(false), StdCondvar::new()));
    let exiting_signal = Arc::clone(&exiting);
    let exit_gate_signal = Arc::clone(&exit_gate);
    let pool = BlockingPool::with_config(
        0,
        1,
        BlockingPoolOptions {
            on_thread_stop: Some(Arc::new(move || {
                let (lock, condvar) = &*exiting_signal;
                {
                    let mut exiting = lock.lock().expect("exit signal poisoned");
                    *exiting = true;
                }
                condvar.notify_all();
                // Park inside the stop callback until the test releases us.
                let (gate_lock, gate_condvar) = &*exit_gate_signal;
                let mut release = gate_lock.lock().expect("exit gate poisoned");
                while !*release {
                    release = gate_condvar.wait(release).expect("exit gate poisoned");
                }
                drop(release);
            })),
            ..Default::default()
        },
    );
    // Run one task so a worker exists, then trigger shutdown so it retires.
    pool.spawn(|| {}).wait();
    pool.shutdown();
    // Wait (bounded at 1s) for the worker to reach the stop callback.
    let (exiting_lock, exiting_condvar) = &*exiting;
    let (exiting, _timeout) = exiting_condvar
        .wait_timeout_while(
            exiting_lock.lock().expect("exit signal poisoned"),
            Duration::from_secs(1),
            |exiting| !*exiting,
        )
        .expect("exit signal wait poisoned");
    assert!(
        *exiting,
        "worker should enter the stop callback before the late task is enqueued"
    );
    drop(exiting);
    // Hand-build a task + handle (bypassing spawn, which would refuse work
    // after shutdown) to model work that was *accepted* before shutdown but
    // enqueued while the last worker is mid-exit.
    let ran = Arc::new(AtomicUsize::new(0));
    let ran_clone = Arc::clone(&ran);
    let task_id = pool.inner.next_task_id.fetch_add(1, Ordering::Relaxed);
    let cancelled = Arc::new(AtomicBool::new(false));
    let completion = Arc::new(BlockingTaskCompletion::new(pool.inner.time_getter));
    let handle = BlockingTaskHandle {
        task_id,
        cancelled: Arc::clone(&cancelled),
        completion: Arc::clone(&completion),
    };
    let task = BlockingTask {
        work: Box::new(move || {
            ran_clone.fetch_add(1, Ordering::Relaxed);
        }),
        priority: 128,
        cancelled: Arc::clone(&cancelled),
        completion: Arc::clone(&completion),
    };
    // Mirror spawn's bookkeeping: push, bump pending count, maybe grow the pool.
    pool.inner.queue.push(task);
    pool.inner.pending_count.fetch_add(1, Ordering::Relaxed);
    maybe_spawn_thread_on_inner(&pool.inner);
    {
        // Notify under the pool mutex so a waiting worker cannot miss the wakeup.
        let _guard = pool.inner.mutex.lock();
        pool.inner.condvar.notify_one();
    }
    // Release the parked worker so thread exit can complete.
    let (gate_lock, gate_condvar) = &*exit_gate;
    {
        let mut release = gate_lock.lock().expect("exit gate poisoned");
        *release = true;
    }
    gate_condvar.notify_all();
    assert!(
        handle.wait_timeout(Duration::from_secs(5)),
        "accepted work must still complete even if shutdown starts while the last worker exits"
    );
    assert_eq!(
        ran.load(Ordering::Relaxed),
        1,
        "late accepted task should run exactly once"
    );
    assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
}
// Conformance suite for the behavioural contract expected of a
// `spawn_blocking`-style dedicated pool: dedicated threads, cancellation,
// panic isolation, completion signalling, timeouts, priorities, handles,
// task lifecycle, and shutdown.
mod spawn_blocking_conformance {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};
    // Shared bookkeeping for multi-task tests: which threads ran tasks, how
    // many ran, and a barrier sized for the expected tasks plus the test
    // thread itself.
    struct ConformanceTestData {
        thread_ids: Arc<Mutex<Vec<thread::ThreadId>>>,
        execution_count: Arc<AtomicU32>,
        barrier: Arc<Barrier>,
    }
    impl ConformanceTestData {
        // `expected_threads + 1` lets the test thread rendezvous with every
        // spawned task.
        fn new(expected_threads: usize) -> Self {
            Self {
                thread_ids: Arc::new(Mutex::new(Vec::new())),
                execution_count: Arc::new(AtomicU32::new(0)),
                barrier: Arc::new(Barrier::new(expected_threads + 1)),
            }
        }
        // Record the calling thread's id and bump the execution counter.
        // NOTE(review): unused — tests push into `thread_ids` directly, so
        // `execution_count` is never read. Consider routing tasks through
        // this helper or removing both.
        fn record_execution(&self) {
            let current_thread = thread::current().id();
            self.thread_ids.lock().push(current_thread);
            self.execution_count.fetch_add(1, Ordering::SeqCst);
        }
        // Count distinct thread ids (linear scan; fine for these tiny vecs).
        fn get_unique_thread_count(&self) -> usize {
            let ids = self.thread_ids.lock();
            let mut unique_ids = Vec::new();
            for id in ids.iter() {
                if !unique_ids.contains(id) {
                    unique_ids.push(*id);
                }
            }
            unique_ids.len()
        }
    }
    // Tasks must run concurrently, off the main thread, on pool workers.
    #[test]
    fn blocking_task_scheduled_on_dedicated_thread_pool_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(2, 4);
        let test_data = ConformanceTestData::new(3);
        let main_thread_id = thread::current().id();
        let mut handles = Vec::new();
        for _ in 0..3 {
            let test_data_clone = test_data.thread_ids.clone();
            let barrier_clone = test_data.barrier.clone();
            let handle = pool.spawn(move || {
                let current_thread = thread::current().id();
                test_data_clone.lock().push(current_thread);
                barrier_clone.wait();
                thread::sleep(Duration::from_millis(10));
            });
            handles.push(handle);
        }
        // Rendezvous: all three tasks are simultaneously inside workers here,
        // which forces the pool to use more than one thread.
        test_data.barrier.wait();
        for handle in handles {
            assert!(handle.wait_timeout(Duration::from_secs(5)));
        }
        {
            let thread_ids = test_data.thread_ids.lock();
            assert_eq!(thread_ids.len(), 3, "All three tasks should have executed");
            for thread_id in thread_ids.iter() {
                assert_ne!(
                    *thread_id, main_thread_id,
                    "Blocking tasks should not run on main thread"
                );
            }
        }
        let unique_count = test_data.get_unique_thread_count();
        assert!(
            unique_count >= 2,
            "Should use at least 2 different threads, got {}",
            unique_count
        );
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // A cancelled, still-queued task must be skipped while the rest of the
    // pool drains normally. Pool is (1, 1) so handle2 is guaranteed to be
    // queued behind handle1, never running.
    #[test]
    fn cancellation_drains_pool_correctly_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 1);
        let start_barrier = Arc::new(Barrier::new(2));
        let finish_gate = Arc::new((Mutex::new(false), Condvar::new()));
        let start_barrier_clone = start_barrier.clone();
        let finish_gate_clone = finish_gate.clone();
        // Blocks the lone worker until the gate below is opened.
        let handle1 = pool.spawn(move || {
            start_barrier_clone.wait();
            let (lock, cvar) = &*finish_gate_clone;
            let mut finish = lock.lock();
            while !*finish {
                cvar.wait(&mut finish);
            }
        });
        start_barrier.wait();
        let executed = Arc::new(AtomicBool::new(false));
        let executed_clone = executed.clone();
        let handle2 = pool.spawn(move || {
            executed_clone.store(true, Ordering::SeqCst);
        });
        // Brief pause so the second task is reliably sitting in the queue.
        thread::sleep(Duration::from_millis(50));
        handle2.cancel();
        {
            let (lock, cvar) = &*finish_gate;
            let mut finish = lock.lock();
            *finish = true;
            cvar.notify_all();
        }
        assert!(handle1.wait_timeout(Duration::from_secs(5)));
        assert!(!handle1.is_cancelled());
        // A cancelled task still completes (is signalled) — it just never runs.
        assert!(handle2.wait_timeout(Duration::from_secs(1)));
        assert!(handle2.is_cancelled());
        assert!(
            !executed.load(Ordering::SeqCst),
            "Cancelled task should not execute"
        );
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // A panicking task must not poison the pool: later tasks still run.
    #[test]
    fn panic_in_blocking_task_isolated_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 2);
        let task_executed = Arc::new(AtomicBool::new(false));
        let handle_panic = pool.spawn(|| {
            panic!("Test panic - should be isolated");
        });
        let task_executed_clone = task_executed.clone();
        let handle_normal = pool.spawn(move || {
            task_executed_clone.store(true, Ordering::SeqCst);
        });
        assert!(handle_panic.wait_timeout(Duration::from_secs(5)));
        assert!(handle_normal.wait_timeout(Duration::from_secs(5)));
        assert!(
            task_executed.load(Ordering::SeqCst),
            "Normal task should execute after panic"
        );
        // The pool must keep accepting and running work after the panic.
        let handle_after_panic = pool.spawn(|| {
            let _ = "still working";
        });
        assert!(handle_after_panic.wait_timeout(Duration::from_secs(5)));
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // Completion signalling: wait blocks until the task's end, then is_done
    // flips, including for instantly-finishing tasks.
    #[test]
    fn result_returned_via_completion_mechanism_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 2);
        let completion_time = Arc::new(Mutex::new(None::<Instant>));
        let completion_time_clone = completion_time.clone();
        let handle = pool.spawn(move || {
            thread::sleep(Duration::from_millis(100));
            *completion_time_clone.lock() = Some(Instant::now());
        });
        let start_time = Instant::now();
        // NOTE(review): slightly racy — relies on reaching this line within
        // the task's 100ms sleep; could flake on a heavily loaded machine.
        assert!(!handle.is_done());
        assert!(handle.wait_timeout(Duration::from_secs(5)));
        assert!(handle.is_done());
        let end_time = Instant::now();
        let elapsed = end_time.duration_since(start_time);
        assert!(
            elapsed >= Duration::from_millis(100),
            "Should wait at least 100ms"
        );
        let recorded_completion = completion_time.lock();
        assert!(
            recorded_completion.is_some(),
            "Completion time should be recorded"
        );
        let instant_handle = pool.spawn(|| {});
        assert!(instant_handle.wait_timeout(Duration::from_secs(5)));
        assert!(instant_handle.is_done());
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // wait_timeout must return false near the requested deadline while the
    // task is still gated, then succeed once the task is released.
    #[test]
    fn completion_mechanism_timeout_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 2);
        let gate = Arc::new((Mutex::new(false), Condvar::new()));
        let gate_clone = gate.clone();
        let handle = pool.spawn(move || {
            let (lock, cvar) = &*gate_clone;
            let mut release = lock.lock();
            while !*release {
                cvar.wait(&mut release);
            }
        });
        let start_time = Instant::now();
        assert!(!handle.wait_timeout(Duration::from_millis(100)));
        let elapsed = start_time.elapsed();
        // Loose bounds (90–200ms) tolerate scheduler jitter around the 100ms
        // requested timeout.
        assert!(
            elapsed >= Duration::from_millis(90),
            "Should wait at least 90ms"
        );
        assert!(
            elapsed <= Duration::from_millis(200),
            "Should timeout within 200ms"
        );
        assert!(!handle.is_done(), "Task should not be done after timeout");
        {
            let (lock, cvar) = &*gate;
            let mut release = lock.lock();
            *release = true;
            cvar.notify_all();
        }
        assert!(handle.wait_timeout(Duration::from_secs(5)));
        assert!(handle.is_done());
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // Start/end/concurrency accounting must balance across tasks and never
    // exceed the pool's thread limit.
    #[test]
    fn budget_accounting_across_poll_boundaries_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 4);
        struct ResourceTracker {
            task_starts: AtomicU32,
            task_ends: AtomicU32,
            max_concurrent: AtomicU32,
            current_concurrent: AtomicU32,
        }
        let tracker = Arc::new(ResourceTracker {
            task_starts: AtomicU32::new(0),
            task_ends: AtomicU32::new(0),
            max_concurrent: AtomicU32::new(0),
            current_concurrent: AtomicU32::new(0),
        });
        // 3 tasks + the test thread rendezvous on this barrier.
        let barrier = Arc::new(Barrier::new(4));
        let mut handles = Vec::new();
        for _i in 0..3 {
            let tracker_clone = tracker.clone();
            let barrier_clone = barrier.clone();
            let handle = pool.spawn(move || {
                tracker_clone.task_starts.fetch_add(1, Ordering::SeqCst);
                let current = tracker_clone
                    .current_concurrent
                    .fetch_add(1, Ordering::SeqCst)
                    + 1;
                // CAS loop: raise max_concurrent to `current` if it is higher.
                let mut max = tracker_clone.max_concurrent.load(Ordering::SeqCst);
                while current > max {
                    match tracker_clone.max_concurrent.compare_exchange_weak(
                        max,
                        current,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => break,
                        Err(new_max) => max = new_max,
                    }
                }
                barrier_clone.wait();
                thread::sleep(Duration::from_millis(50));
                tracker_clone
                    .current_concurrent
                    .fetch_sub(1, Ordering::SeqCst);
                tracker_clone.task_ends.fetch_add(1, Ordering::SeqCst);
            });
            handles.push(handle);
        }
        barrier.wait();
        for handle in handles {
            assert!(handle.wait_timeout(Duration::from_secs(5)));
        }
        assert_eq!(
            tracker.task_starts.load(Ordering::SeqCst),
            3,
            "All tasks should start"
        );
        assert_eq!(
            tracker.task_ends.load(Ordering::SeqCst),
            3,
            "All tasks should end"
        );
        assert_eq!(
            tracker.current_concurrent.load(Ordering::SeqCst),
            0,
            "No tasks should be running"
        );
        let max_concurrent = tracker.max_concurrent.load(Ordering::SeqCst);
        assert!(max_concurrent <= 4, "Should not exceed pool thread limit");
        assert!(max_concurrent >= 1, "At least one task should run");
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // Priority spawning: all three priorities execute. (Only presence is
    // asserted, not ordering — the worker is blocked while they queue, so
    // order is up to the queue's priority policy.)
    #[test]
    fn spawn_blocking_priority_scheduling_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 1);
        let execution_order = Arc::new(Mutex::new(Vec::new()));
        let start_gate = Arc::new((Mutex::new(false), Condvar::new()));
        let start_gate_clone = start_gate.clone();
        // Hold the lone worker so the priority tasks pile up in the queue.
        let handle_blocker = pool.spawn(move || {
            let (lock, cvar) = &*start_gate_clone;
            let mut start = lock.lock();
            while !*start {
                cvar.wait(&mut start);
            }
        });
        let mut priority_handles = Vec::new();
        for (priority, task_id) in [(0, "high"), (128, "medium"), (255, "low")] {
            let execution_order_clone = execution_order.clone();
            let handle = pool.spawn_with_priority(
                move || {
                    execution_order_clone.lock().push(task_id);
                },
                priority,
            );
            priority_handles.push(handle);
        }
        // Give the spawns time to land in the queue before releasing the worker.
        thread::sleep(Duration::from_millis(50));
        {
            let (lock, cvar) = &*start_gate;
            let mut start = lock.lock();
            *start = true;
            cvar.notify_all();
        }
        assert!(handle_blocker.wait_timeout(Duration::from_secs(5)));
        for handle in priority_handles {
            assert!(handle.wait_timeout(Duration::from_secs(5)));
        }
        let order = execution_order.lock();
        assert_eq!(order.len(), 3, "All priority tasks should execute");
        assert!(order.contains(&"high"));
        assert!(order.contains(&"medium"));
        assert!(order.contains(&"low"));
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // A cloned BlockingPoolHandle must support both spawn variants.
    #[test]
    fn blocking_pool_handle_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 2);
        let handle = pool.handle();
        let executed = Arc::new(AtomicBool::new(false));
        let executed_clone = executed.clone();
        let task_handle = handle.spawn(move || {
            executed_clone.store(true, Ordering::SeqCst);
        });
        assert!(task_handle.wait_timeout(Duration::from_secs(5)));
        assert!(
            executed.load(Ordering::SeqCst),
            "Handle-spawned task should execute"
        );
        let priority_executed = Arc::new(AtomicBool::new(false));
        let priority_executed_clone = priority_executed.clone();
        let priority_handle = handle.spawn_with_priority(
            move || {
                priority_executed_clone.store(true, Ordering::SeqCst);
            },
            64,
        );
        assert!(priority_handle.wait_timeout(Duration::from_secs(5)));
        assert!(
            priority_executed.load(Ordering::SeqCst),
            "Priority handle task should execute"
        );
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // Lifecycle: pending -> cancelled (not yet done) -> done + cancelled.
    // Cancelling a task that already started does not stop it mid-run.
    #[test]
    fn blocking_task_lifecycle_state_transitions_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 2);
        let gate = Arc::new((Mutex::new(false), Condvar::new()));
        let gate_clone = gate.clone();
        let handle = pool.spawn(move || {
            let (lock, cvar) = &*gate_clone;
            let mut release = lock.lock();
            while !*release {
                cvar.wait(&mut release);
            }
        });
        assert!(!handle.is_done());
        assert!(!handle.is_cancelled());
        handle.cancel();
        // Cancellation flag flips immediately; completion only when the
        // (already-running) task actually finishes.
        assert!(handle.is_cancelled());
        assert!(!handle.is_done());
        {
            let (lock, cvar) = &*gate;
            let mut release = lock.lock();
            *release = true;
            cvar.notify_all();
        }
        assert!(handle.wait_timeout(Duration::from_secs(5)));
        assert!(handle.is_done());
        assert!(handle.is_cancelled());
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
    // Work accepted before shutdown runs to completion; work submitted after
    // shutdown is rejected (completed as cancelled, never executed).
    #[test]
    fn blocking_pool_shutdown_lifecycle_conformance() {
        let _guard = deterministic_hook_test_guard();
        let pool = BlockingPool::new(1, 2);
        let pre_shutdown_executed = Arc::new(AtomicBool::new(false));
        let pre_shutdown_executed_clone = pre_shutdown_executed.clone();
        let handle_pre = pool.spawn(move || {
            thread::sleep(Duration::from_millis(100));
            pre_shutdown_executed_clone.store(true, Ordering::SeqCst);
        });
        pool.shutdown();
        let post_shutdown_executed = Arc::new(AtomicBool::new(false));
        let post_shutdown_executed_clone = post_shutdown_executed.clone();
        let handle_post = pool.spawn(move || {
            post_shutdown_executed_clone.store(true, Ordering::SeqCst);
        });
        assert!(handle_pre.wait_timeout(Duration::from_secs(5)));
        assert!(
            pre_shutdown_executed.load(Ordering::SeqCst),
            "Pre-shutdown task should execute"
        );
        // The rejected handle still completes (so waiters don't hang) but is
        // marked cancelled.
        assert!(handle_post.wait_timeout(Duration::from_secs(1)));
        assert!(handle_post.is_cancelled());
        assert!(
            !post_shutdown_executed.load(Ordering::SeqCst),
            "Post-shutdown task should not execute"
        );
        assert!(pool.shutdown_and_wait(Duration::from_secs(5)));
    }
}
}
mod metamorphic;