#![allow(clippy::all)]
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::super::BlockingPool;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::time::{Duration, Instant};
/// Drives a `BlockingPool::new(1, 1)` through a "cancel a queued task" scenario.
///
/// Three tasks are submitted in order:
/// 1. a blocker that parks on a condition variable until the test releases it,
/// 2. a task that is cancelled `cancel_repeats` times while it is still queued,
/// 3. a follower that is never cancelled.
///
/// Returns `(execution_order, cancelled_executed, follower_executed)`: the ids
/// pushed by the task bodies that ran (in the order they ran), whether the body
/// of task 2 ran, and whether the body of task 3 ran.
///
/// # Panics
///
/// Panics if fewer than two tasks are reported pending within one second, if
/// task 2 does not report itself cancelled, or if any handle / the pool
/// shutdown does not finish within five seconds.
fn run_queued_cancellation_scenario(cancel_repeats: usize) -> (Vec<u8>, bool, bool) {
    let pool = BlockingPool::new(1, 1);

    // Shared state observed by the task bodies and read back at the end.
    let order_log = Arc::new(Mutex::new(Vec::new()));
    let blocker_started = Arc::new(Barrier::new(2));
    let release_gate = Arc::new((Mutex::new(false), Condvar::new()));
    let cancelled_ran = Arc::new(AtomicBool::new(false));
    let follower_ran = Arc::new(AtomicBool::new(false));

    // Task 1: rendezvous with the test thread, then park until released.
    let blocker = {
        let started = Arc::clone(&blocker_started);
        let gate = Arc::clone(&release_gate);
        let log = Arc::clone(&order_log);
        pool.spawn(move || {
            started.wait();
            let (released, signal) = &*gate;
            // The guard stays alive until the closure ends, so the id is
            // recorded while the gate mutex is still held.
            let _released_guard = signal
                .wait_while(released.lock().unwrap(), |is_released| !*is_released)
                .unwrap();
            log.lock().unwrap().push(1);
        })
    };

    // Do not submit anything else until the blocker body is actually running.
    blocker_started.wait();

    // Task 2: the one that gets cancelled while queued.
    let cancelled = {
        let ran = Arc::clone(&cancelled_ran);
        let log = Arc::clone(&order_log);
        pool.spawn(move || {
            ran.store(true, Ordering::SeqCst);
            log.lock().unwrap().push(2);
        })
    };

    // Task 3: the follower that must still run afterwards.
    let follower = {
        let ran = Arc::clone(&follower_ran);
        let log = Arc::clone(&order_log);
        pool.spawn(move || {
            ran.store(true, Ordering::SeqCst);
            log.lock().unwrap().push(3);
        })
    };

    // Poll (for at most one second) until both later tasks are reported pending.
    let deadline = Instant::now() + Duration::from_secs(1);
    loop {
        if pool.pending_count() >= 2 || Instant::now() >= deadline {
            break;
        }
        std::thread::sleep(Duration::from_millis(1));
    }
    assert!(
        pool.pending_count() >= 2,
        "blocked worker should leave the cancelled task and follower queued"
    );

    (0..cancel_repeats).for_each(|_| {
        cancelled.cancel();
    });
    assert!(
        cancelled.is_cancelled(),
        "queued task should report cancelled"
    );

    // Open the gate and wake the blocker while still holding the gate mutex.
    {
        let (released, signal) = &*release_gate;
        let mut flag = released.lock().unwrap();
        *flag = true;
        signal.notify_all();
    }

    let join_timeout = Duration::from_secs(5);
    assert!(blocker.wait_timeout(join_timeout));
    assert!(cancelled.wait_timeout(join_timeout));
    assert!(follower.wait_timeout(join_timeout));
    assert!(pool.shutdown_and_wait(join_timeout));

    let order = order_log.lock().unwrap().clone();
    (
        order,
        cancelled_ran.load(Ordering::SeqCst),
        follower_ran.load(Ordering::SeqCst),
    )
}
/// Spawns one task per entry of `task_ids` on `pool`, waits for all of them,
/// and returns the ids recorded by the task bodies in ascending order.
///
/// Each task sleeps for `delay` and then appends its id to a shared list, so
/// the result contains only the ids whose bodies actually ran.
fn sorted_completed_task_ids(
    pool: &BlockingPool,
    task_ids: &[usize],
    delay: Duration,
) -> Vec<usize> {
    let finished = Arc::new(Mutex::new(Vec::<usize>::new()));

    // Submit everything first; `collect` drives the whole iterator before any
    // handle is waited on.
    let pending: Vec<_> = task_ids
        .iter()
        .map(|&id| {
            let sink = Arc::clone(&finished);
            pool.spawn(move || {
                std::thread::sleep(delay);
                sink.lock().unwrap().push(id);
            })
        })
        .collect();

    pending.into_iter().for_each(|handle| {
        handle.wait();
    });

    let mut ids = finished.lock().unwrap().clone();
    ids.sort_unstable();
    ids
}
/// With a pool created as `BlockingPool::new(1, 1)`, tasks submitted as
/// `0..TASK_COUNT` must record their ids in exactly that order.
#[test]
fn mr_fifo_ordering_preservation() {
    const TASK_COUNT: usize = 10;

    let pool = BlockingPool::new(1, 1);
    let observed = Arc::new(Mutex::new(Vec::new()));

    let pending: Vec<_> = (0..TASK_COUNT)
        .map(|task_id| {
            let observed = Arc::clone(&observed);
            pool.spawn(move || {
                std::thread::sleep(Duration::from_millis(10));
                observed.lock().unwrap().push(task_id);
            })
        })
        .collect();
    for handle in pending {
        handle.wait();
    }

    let expected: Vec<usize> = (0..TASK_COUNT).collect();
    let actual = observed.lock().unwrap().clone();
    assert_eq!(
        actual, expected,
        "Tasks should complete in FIFO order with single thread. Expected: {:?}, Got: {:?}",
        expected, actual
    );
}
/// Submitting the same ids in forward and in reversed order on a
/// `BlockingPool::new(2, 4)` must yield the same sorted set of completed ids,
/// and that set must equal the submitted ids.
#[test]
fn mr_permutation_invariance_multiple_threads() {
    let pool = BlockingPool::new(2, 4);
    let delay = Duration::from_millis(50);

    let forward_ids = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let backward_ids: Vec<usize> = forward_ids.iter().rev().copied().collect();

    let forward_done = sorted_completed_task_ids(&pool, &forward_ids, delay);
    let backward_done = sorted_completed_task_ids(&pool, &backward_ids, delay);

    assert_eq!(
        forward_done, backward_done,
        "Same set of tasks should complete regardless of submission order"
    );
    assert_eq!(
        forward_done, forward_ids,
        "All submitted tasks should complete exactly once"
    );
}
/// The same task ids must complete on a `BlockingPool::new(1, 1)` and on a
/// pool sized to the number of tasks; only the thread arguments differ.
#[test]
fn mr_thread_scaling_consistency() {
    let tasks = vec![10, 20, 30, 40, 50];
    let delay = Duration::from_millis(30);

    let narrow_pool = BlockingPool::new(1, 1);
    let narrow_done = sorted_completed_task_ids(&narrow_pool, &tasks, delay);

    let width = tasks.len();
    let wide_pool = BlockingPool::new(width, width);
    let wide_done = sorted_completed_task_ids(&wide_pool, &tasks, delay);

    assert_eq!(
        narrow_done, wide_done,
        "Thread count should not affect which tasks complete"
    );
    assert_eq!(
        narrow_done, tasks,
        "All tasks should complete regardless of thread count"
    );
}
/// Cancelling some tasks must never make more task bodies run than an
/// otherwise identical batch with no cancellation.
///
/// The batch is run twice on the same `BlockingPool::new(2, 2)`: once
/// untouched (baseline) and once with every even-indexed task cancelled right
/// after it is spawned. The baseline is additionally required to complete all
/// of its tasks; without that check a pool that silently dropped work would
/// satisfy the monotonicity assertion vacuously.
#[test]
fn mr_cancellation_monotonicity() {
    const TASK_COUNT: usize = 6;

    let pool = BlockingPool::new(2, 2);

    // Spawns TASK_COUNT sleeping tasks, optionally cancels the even-indexed
    // ones immediately after spawning, waits on every handle, and returns how
    // many task bodies ran to the end.
    let run_batch = |cancel_even_tasks: bool| -> usize {
        let completed = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..TASK_COUNT)
            .map(|task_id| {
                let completed = Arc::clone(&completed);
                let handle = pool.spawn(move || {
                    std::thread::sleep(Duration::from_millis(100));
                    completed.fetch_add(1, Ordering::Relaxed);
                });
                if cancel_even_tasks && task_id % 2 == 0 {
                    handle.cancel();
                }
                handle
            })
            .collect();
        for handle in handles {
            handle.wait();
        }
        completed.load(Ordering::Relaxed)
    };

    let all_completed = run_batch(false);
    assert_eq!(
        all_completed, TASK_COUNT,
        "Baseline run without cancellation should complete every task"
    );

    let partial_completed = run_batch(true);
    assert!(
        partial_completed <= all_completed,
        "Cancelling tasks should not increase completion count. All: {}, Partial: {}",
        all_completed,
        partial_completed
    );
}
/// Twelve sleeping tasks on a `BlockingPool::new(3, 3)` are tallied by the id
/// of the thread that ran them. The busiest thread may exceed the even share
/// by at most the threshold, and the spread between the busiest and the
/// least-busy *observed* thread may not exceed the threshold either.
///
/// Threads that never ran a task do not appear in the tally.
#[test]
fn mr_load_distribution_fairness() {
    const WORKER_THREADS: usize = 3;

    let pool = BlockingPool::new(WORKER_THREADS, WORKER_THREADS);
    let task_count: usize = 12;
    let per_thread = Arc::new(Mutex::new(
        HashMap::<std::thread::ThreadId, usize>::new(),
    ));

    let pending: Vec<_> = (0..task_count)
        .map(|_| {
            let per_thread = Arc::clone(&per_thread);
            pool.spawn(move || {
                // Capture the id before sleeping, then tally after the sleep.
                let worker = std::thread::current().id();
                std::thread::sleep(Duration::from_millis(50));
                *per_thread.lock().unwrap().entry(worker).or_insert(0) += 1;
            })
        })
        .collect();
    for handle in pending {
        handle.wait();
    }

    let counts: Vec<usize> = per_thread.lock().unwrap().values().copied().collect();
    assert!(
        !counts.is_empty(),
        "At least one thread should have processed tasks"
    );

    let busiest = counts.iter().copied().max().unwrap();
    let idlest = counts.iter().copied().min().unwrap();
    let even_share = task_count / WORKER_THREADS;
    let fairness_threshold = 2;

    assert!(
        busiest <= even_share + fairness_threshold,
        "Load distribution too uneven. Max: {}, Expected: {}, Threshold: {}",
        busiest,
        even_share,
        fairness_threshold
    );
    assert!(
        busiest - idlest <= fairness_threshold,
        "Thread load variance too high. Max: {}, Min: {}, Threshold: {}",
        busiest,
        idlest,
        fairness_threshold
    );
}
/// Five tasks spawned with differing priorities must complete the same set of
/// indices as five tasks spawned with one shared priority, and that set must
/// be `0..=4`.
#[test]
fn mr_priority_invariance() {
    let pool = BlockingPool::new(2, 2);

    // Batch A: every task gets a different priority value.
    let mixed_done = {
        let done = Arc::new(Mutex::new(Vec::new()));
        let pending: Vec<_> = [1, 255, 128, 50, 200]
            .iter()
            .enumerate()
            .map(|(index, &priority)| {
                let done = Arc::clone(&done);
                pool.spawn_with_priority(
                    move || {
                        std::thread::sleep(Duration::from_millis(50));
                        done.lock().unwrap().push(index);
                    },
                    priority,
                )
            })
            .collect();
        for handle in pending {
            handle.wait();
        }
        let mut indices = done.lock().unwrap().clone();
        indices.sort_unstable();
        indices
    };

    // Batch B: every task gets the same priority value.
    let uniform_done = {
        let done = Arc::new(Mutex::new(Vec::new()));
        let pending: Vec<_> = (0..5)
            .map(|index| {
                let done = Arc::clone(&done);
                pool.spawn_with_priority(
                    move || {
                        std::thread::sleep(Duration::from_millis(50));
                        done.lock().unwrap().push(index);
                    },
                    128,
                )
            })
            .collect();
        for handle in pending {
            handle.wait();
        }
        let mut indices = done.lock().unwrap().clone();
        indices.sort_unstable();
        indices
    };

    assert_eq!(
        mixed_done, uniform_done,
        "Priority should not affect completion in current implementation"
    );
    assert_eq!(
        mixed_done,
        vec![0, 1, 2, 3, 4],
        "All tasks should complete regardless of priority"
    );
}
/// Cancelling a queued task once or four times must give the same outcome:
/// the cancelled body never runs, the follower body runs, and the recorded
/// execution order is blocker (1) then follower (3).
#[test]
fn mr_repeated_cancellation_preserves_follower_progress() {
    let (once_order, once_cancelled_ran, once_follower_ran) =
        run_queued_cancellation_scenario(1);
    let (many_order, many_cancelled_ran, many_follower_ran) =
        run_queued_cancellation_scenario(4);

    let cancelled_body_ran = once_cancelled_ran || many_cancelled_ran;
    assert!(
        !cancelled_body_ran,
        "queued cancelled task must stay skipped regardless of repeated cancel calls"
    );

    let follower_always_ran = once_follower_ran && many_follower_ran;
    assert!(
        follower_always_ran,
        "follower task must still execute after the blocked worker is released"
    );

    let blocker_then_follower = vec![1, 3];
    assert_eq!(
        once_order, blocker_then_follower,
        "single cancellation should preserve blocker-then-follower execution order"
    );
    assert_eq!(
        many_order, blocker_then_follower,
        "repeated cancellation should preserve blocker-then-follower execution order"
    );
    assert_eq!(
        many_order, once_order,
        "repeating cancellation must not perturb survivor execution order"
    );
}
/// Checks what happens when a `spawn_blocking` future is dropped in two
/// different states, using a runtime built with one worker thread and
/// `blocking_threads(1, 1)`:
///
/// * dropped after one poll while another blocking closure is still running
///   -> its closure must never run (the assertion calls this "hard
///   cancellation");
/// * dropped after its closure has started -> the closure must still run to
///   the end (the assertion calls this "soft cancellation").
#[test]
fn mr_spawn_blocking_cancellation_states() {
use crate::runtime::{RuntimeBuilder, spawn_blocking};
use futures_lite::future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
// NOTE(review): presumably `blocking_threads(1, 1)` limits the pool to a single
// blocking thread, which is what keeps the second closure queued behind the
// blocker - confirm against `RuntimeBuilder`.
let rt = RuntimeBuilder::new()
.worker_threads(1)
.blocking_threads(1, 1)
.build()
.unwrap();
// Flags set by the two closures under test; the `c_*` clones move into them.
let executed_queued = Arc::new(AtomicBool::new(false));
let executed_running = Arc::new(AtomicBool::new(false));
let c_queued = Arc::clone(&executed_queued);
let c_running = Arc::clone(&executed_running);
// Handshake with the blocker closure: it raises `blocker_running` when it
// starts and spins until `blocker_release` is raised.
let blocker_running = Arc::new(AtomicBool::new(false));
let blocker_release = Arc::new(AtomicBool::new(false));
let b_running = Arc::clone(&blocker_running);
let b_release = Arc::clone(&blocker_release);
// Raised by the second closure as soon as it begins executing.
let running_task_started = Arc::new(AtomicBool::new(false));
let r_started = Arc::clone(&running_task_started);
let request_cx = rt.request_cx_with_budget(crate::types::Budget::INFINITE);
rt.block_on_with_cx(request_cx, async move {
// Spawn an async task that awaits the blocker closure.
let _blocker_task = crate::runtime::builder::Runtime::current_handle()
.unwrap()
.spawn(async move {
spawn_blocking(move || {
b_running.store(true, Ordering::SeqCst);
while !b_release.load(Ordering::SeqCst) {
std::thread::sleep(Duration::from_millis(1));
}
})
.await;
});
// Yield until the blocker closure has actually started. There is no deadline
// here, so this loops forever if the blocker never starts.
while !blocker_running.load(Ordering::SeqCst) {
crate::runtime::yield_now::yield_now().await;
}
// Phase 1: create a second blocking future while the blocker is still running.
let queued_fut = spawn_blocking(move || {
c_queued.store(true, Ordering::SeqCst);
});
let mut pin_queued = Box::pin(queued_fut);
// Obtain a waker for manual polling by cloning the one a nested
// `futures_lite::future::block_on` hands to `poll_fn`.
// NOTE(review): this is the nested block_on's waker, not the surrounding
// runtime task's - confirm that is intended.
let waker = futures_lite::future::block_on(future::poll_fn(|cx| {
std::task::Poll::Ready(cx.waker().clone())
}));
let mut ctx = std::task::Context::from_waker(&waker);
// Poll exactly once (result ignored), then drop the future before releasing
// the blocker.
let _ = std::future::Future::poll(pin_queued.as_mut(), &mut ctx);
drop(pin_queued);
blocker_release.store(true, Ordering::SeqCst);
// Fixed 100 ms pause after releasing the blocker; presumably this gives the
// pool time to finish the blocker and pass over the dropped entry - TODO confirm.
crate::time::sleep(crate::types::Time::ZERO, Duration::from_millis(100)).await;
// Phase 2: a closure that signals its start, sleeps 50 ms, then sets its flag.
let running_fut = spawn_blocking(move || {
r_started.store(true, Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(50));
c_running.store(true, Ordering::SeqCst);
});
let mut pin_running = Box::pin(running_fut);
// Poll once (result ignored), wait until the closure reports it has started,
// and only then drop the future. This wait has no deadline either.
let _ = std::future::Future::poll(pin_running.as_mut(), &mut ctx);
while !running_task_started.load(Ordering::SeqCst) {
crate::runtime::yield_now::yield_now().await;
}
drop(pin_running);
});
// The running closure may still be inside its 50 ms sleep when `block_on`
// returns, so wait up to one second for its completion flag.
let deadline = Instant::now() + Duration::from_secs(1);
while !executed_running.load(Ordering::SeqCst) && Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(1));
}
assert!(
!executed_queued.load(Ordering::SeqCst),
"Queued spawn_blocking must not execute if dropped before starting (hard cancellation)"
);
assert!(
executed_running.load(Ordering::SeqCst),
"Running spawn_blocking must run to completion even if dropped (soft cancellation)"
);
}
}