use revue::worker::{Priority, WorkerConfig, WorkerPool};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
/// Polls `condition` until it returns `true` or roughly `timeout_ms` milliseconds have elapsed.
///
/// The condition is evaluated immediately, then again after each short sleep (at most 2 ms),
/// and one last time once the timeout has been reached, so a condition that becomes true
/// during the final sleep is still observed. A `timeout_ms` of `0` therefore performs exactly
/// one check instead of returning `false` without ever looking at the condition.
///
/// Returns `true` as soon as `condition` holds and `false` if the timeout expires first.
fn poll_until<F>(mut condition: F, timeout_ms: u64) -> bool
where
    F: FnMut() -> bool,
{
    let start = Instant::now();
    let timeout = Duration::from_millis(timeout_ms);
    let poll_interval = Duration::from_millis(2);
    loop {
        if condition() {
            return true;
        }
        // `checked_sub` yields `None` once more than `timeout` has already elapsed.
        match timeout.checked_sub(start.elapsed()) {
            Some(remaining) if !remaining.is_zero() => {
                // Never sleep past the end of the timeout window.
                thread::sleep(remaining.min(poll_interval));
            }
            _ => return false,
        }
    }
}
/// A pool built with `WorkerPool::new(4)` must report a thread count of 4.
#[test]
fn test_pool_config_threads() {
    assert_eq!(WorkerPool::new(4).thread_count(), 4);
}
/// Builds a two-thread pool with `queue_capacity: 100` and checks that 100 consecutive
/// submissions are all accepted.
#[test]
fn test_pool_config_queue_capacity() {
    let config = WorkerConfig {
        threads: 2,
        queue_capacity: 100,
        default_timeout_ms: None,
    };
    // `config` is not used again, so it is moved into the pool instead of being cloned.
    let pool = WorkerPool::with_config(config);
    assert_eq!(pool.thread_count(), 2);
    for i in 0..100 {
        assert!(pool.submit(|| {}), "submission {} was rejected", i);
    }
}
/// A config with `default_timeout_ms: Some(1000)` and one thread must yield a pool
/// reporting a thread count of 1.
#[test]
fn test_pool_config_timeout() {
    let pool = WorkerPool::with_config(WorkerConfig {
        threads: 1,
        queue_capacity: 10,
        default_timeout_ms: Some(1000),
    });
    assert_eq!(pool.thread_count(), 1);
}
/// The `Default` pool must report at least one thread.
#[test]
fn test_pool_default() {
    let default_pool = WorkerPool::default();
    let threads = default_pool.thread_count();
    assert!(threads >= 1);
}
/// A pool built with `WorkerPool::new(8)` must report a thread count of 8.
#[test]
fn test_pool_with_threads() {
    let eight_thread_pool = WorkerPool::new(8);
    let reported = eight_thread_pool.thread_count();
    assert_eq!(reported, 8);
}
/// With one thread and `queue_capacity: 3`, checks that three submissions are accepted and a
/// fourth is rejected while the only worker is busy with a blocking task.
///
/// The test waits for the blocking task to signal that it is running (rather than sleeping
/// for a fixed time), so the queue is known to be empty before it is filled.
#[test]
fn test_pool_submit_full_queue() {
    let pool = WorkerPool::with_config(WorkerConfig {
        threads: 1,
        queue_capacity: 3,
        default_timeout_ms: Some(100),
    });

    // Handshake flags: `started` is raised by the blocking task once it is running,
    // `release` is raised by the test to let that task finish.
    let started = Arc::new(AtomicUsize::new(0));
    let release = Arc::new(AtomicUsize::new(0));
    let started_in_task = Arc::clone(&started);
    let release_in_task = Arc::clone(&release);
    assert!(pool.submit(move || {
        started_in_task.store(1, Ordering::SeqCst);
        while release_in_task.load(Ordering::SeqCst) == 0 {
            thread::sleep(Duration::from_millis(10));
        }
    }));
    if !poll_until(|| started.load(Ordering::SeqCst) == 1, 1000) {
        // Let the task finish if it ever starts, so it does not spin forever.
        release.store(1, Ordering::SeqCst);
        panic!("worker never started the blocking task");
    }

    let accepted: Vec<bool> = (0..3).map(|_| pool.submit(|| {})).collect();
    let overflow_accepted = pool.submit(|| {});

    // Release the worker before asserting so a failed assertion cannot leave it spinning.
    release.store(1, Ordering::SeqCst);

    assert_eq!(
        accepted,
        vec![true; 3],
        "submissions within capacity were rejected"
    );
    assert!(
        !overflow_accepted,
        "submission beyond capacity was accepted"
    );
}
/// Queues a Low, a Normal and a High priority task (in that order) behind a blocking task on
/// a one-thread pool and checks that they run as "high", "normal", "low".
///
/// The test waits for the blocking task to signal that it is running before queueing the
/// prioritised tasks, so all three are pending when the worker is released.
#[test]
fn test_pool_submit_priority_ordering() {
    let pool = WorkerPool::new(1);
    let order = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Handshake flags: `started` is raised by the blocking task once it is running,
    // `release` is raised by the test to let that task finish.
    let started = Arc::new(AtomicUsize::new(0));
    let release = Arc::new(AtomicUsize::new(0));
    let started_in_task = Arc::clone(&started);
    let release_in_task = Arc::clone(&release);
    assert!(pool.submit(move || {
        started_in_task.store(1, Ordering::SeqCst);
        while release_in_task.load(Ordering::SeqCst) == 0 {
            thread::sleep(Duration::from_millis(1));
        }
    }));
    if !poll_until(|| started.load(Ordering::SeqCst) == 1, 1000) {
        // Let the task finish if it ever starts, so it does not spin forever.
        release.store(1, Ordering::SeqCst);
        panic!("worker never started the blocking task");
    }

    let submissions = [
        ("low", Priority::Low),
        ("normal", Priority::Normal),
        ("high", Priority::High),
    ];
    for (label, priority) in submissions {
        let order = Arc::clone(&order);
        pool.submit_with_priority(
            move || {
                order.lock().unwrap().push(label);
            },
            priority,
        );
    }

    release.store(1, Ordering::SeqCst);
    // Best-effort wait for all three tasks; the assertion below decides the outcome.
    poll_until(|| order.lock().unwrap().len() == 3, 500);
    pool.shutdown();

    // Comparing the whole vector reports a missing task as a readable diff instead of an
    // index-out-of-bounds panic.
    let result = order.lock().unwrap();
    assert_eq!(*result, vec!["high", "normal", "low"]);
}
/// Submits 100 counter-incrementing tasks to a four-thread pool and expects the counter to
/// reach 100 within the polling window.
#[test]
fn test_pool_submit_many_tasks() {
    let pool = WorkerPool::new(4);
    let completed = Arc::new(AtomicUsize::new(0));

    (0..100).for_each(|_| {
        let completed = Arc::clone(&completed);
        pool.submit(move || {
            completed.fetch_add(1, Ordering::SeqCst);
        });
    });

    poll_until(|| completed.load(Ordering::SeqCst) == 100, 2000);
    let total = completed.load(Ordering::SeqCst);
    assert_eq!(total, 100);
}
/// Submits ten short tasks to a two-thread pool, shuts the pool down, and expects that at
/// least one task has incremented the counter within the polling window.
#[test]
fn test_pool_shutdown_graceful() {
    let pool = WorkerPool::new(2);
    let executed = Arc::new(AtomicUsize::new(0));

    (0..10).for_each(|_| {
        let executed = Arc::clone(&executed);
        pool.submit(move || {
            executed.fetch_add(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(10));
        });
    });

    pool.shutdown();

    poll_until(|| executed.load(Ordering::SeqCst) > 0, 500);
    let ran = executed.load(Ordering::SeqCst);
    assert!(ran > 0);
}
/// After `shutdown`, both `submit` and `submit_with_priority` must return `false`.
#[test]
fn test_pool_submit_after_shutdown() {
    let pool = WorkerPool::new(1);
    pool.shutdown();

    let plain_accepted = pool.submit(|| {});
    assert!(!plain_accepted);

    let prioritized_accepted = pool.submit_with_priority(|| {}, Priority::High);
    assert!(!prioritized_accepted);
}
/// Queues five `Priority::Normal` tasks behind a blocking task on a one-thread pool and
/// checks that they run in submission order (0 through 4).
///
/// The test waits for the blocking task to signal that it is running before queueing the
/// numbered tasks, so all five are pending when the worker is released.
#[test]
fn test_priority_fifo_same_priority() {
    let pool = WorkerPool::new(1);
    let order = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Handshake flags: `started` is raised by the blocking task once it is running,
    // `release` is raised by the test to let that task finish.
    let started = Arc::new(AtomicUsize::new(0));
    let release = Arc::new(AtomicUsize::new(0));
    let started_in_task = Arc::clone(&started);
    let release_in_task = Arc::clone(&release);
    assert!(pool.submit(move || {
        started_in_task.store(1, Ordering::SeqCst);
        while release_in_task.load(Ordering::SeqCst) == 0 {
            thread::sleep(Duration::from_millis(1));
        }
    }));
    if !poll_until(|| started.load(Ordering::SeqCst) == 1, 1000) {
        // Let the task finish if it ever starts, so it does not spin forever.
        release.store(1, Ordering::SeqCst);
        panic!("worker never started the blocking task");
    }

    for i in 0..5 {
        let order = Arc::clone(&order);
        pool.submit_with_priority(
            move || {
                order.lock().unwrap().push(i);
            },
            Priority::Normal,
        );
    }

    release.store(1, Ordering::SeqCst);
    // Best-effort wait for all five tasks; the assertion below decides the outcome.
    poll_until(|| order.lock().unwrap().len() == 5, 500);
    pool.shutdown();

    let result = order.lock().unwrap();
    assert_eq!(*result, vec![0, 1, 2, 3, 4]);
}
/// Queues "low1" (Low), "high" (High) and "low2" (Low) behind a blocking task on a one-thread
/// pool and checks that "high" is the first of the three to run.
///
/// The test waits for the blocking task to signal that it is running before queueing the
/// prioritised tasks, so all three are pending when the worker is released.
#[test]
fn test_priority_high_preempts_low() {
    let pool = WorkerPool::new(1);
    let order = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Handshake flags: `started` is raised by the blocking task once it is running,
    // `release` is raised by the test to let that task finish.
    let started = Arc::new(AtomicUsize::new(0));
    let release = Arc::new(AtomicUsize::new(0));
    let started_in_task = Arc::clone(&started);
    let release_in_task = Arc::clone(&release);
    assert!(pool.submit(move || {
        started_in_task.store(1, Ordering::SeqCst);
        while release_in_task.load(Ordering::SeqCst) == 0 {
            thread::sleep(Duration::from_millis(1));
        }
    }));
    if !poll_until(|| started.load(Ordering::SeqCst) == 1, 1000) {
        // Let the task finish if it ever starts, so it does not spin forever.
        release.store(1, Ordering::SeqCst);
        panic!("worker never started the blocking task");
    }

    let submissions = [
        ("low1", Priority::Low),
        ("high", Priority::High),
        ("low2", Priority::Low),
    ];
    for (label, priority) in submissions {
        let order = Arc::clone(&order);
        pool.submit_with_priority(
            move || {
                order.lock().unwrap().push(label);
            },
            priority,
        );
    }

    release.store(1, Ordering::SeqCst);
    // Best-effort wait for all three tasks; the assertion below decides the outcome.
    poll_until(|| order.lock().unwrap().len() == 3, 500);
    pool.shutdown();

    // `first()` turns an empty result into a readable assertion failure rather than an
    // index-out-of-bounds panic.
    let result = order.lock().unwrap();
    assert_eq!(result.first().copied(), Some("high"));
}
/// Smoke test: `active_workers` can be called on a freshly created four-thread pool.
/// The returned value is deliberately not asserted.
#[test]
fn test_pool_active_workers() {
    let four_thread_pool = WorkerPool::new(4);
    let _currently_active = four_thread_pool.active_workers();
}
/// Submits ten tasks while the only worker of a one-thread pool is busy with a blocking task
/// and checks that `queue_len` reports a non-empty queue.
///
/// The test waits for the blocking task to signal that it is running before submitting the
/// ten tasks, so they cannot be drained before the queue length is read.
#[test]
fn test_pool_queue_length() {
    let pool = WorkerPool::with_config(WorkerConfig {
        threads: 1,
        queue_capacity: 100,
        default_timeout_ms: None,
    });

    // Handshake flags: `started` is raised by the blocking task once it is running,
    // `release` is raised by the test to let that task finish.
    let started = Arc::new(AtomicUsize::new(0));
    let release = Arc::new(AtomicUsize::new(0));
    let started_in_task = Arc::clone(&started);
    let release_in_task = Arc::clone(&release);
    assert!(pool.submit(move || {
        started_in_task.store(1, Ordering::SeqCst);
        while release_in_task.load(Ordering::SeqCst) == 0 {
            thread::sleep(Duration::from_millis(1));
        }
    }));
    if !poll_until(|| started.load(Ordering::SeqCst) == 1, 1000) {
        // Let the task finish if it ever starts, so it does not spin forever.
        release.store(1, Ordering::SeqCst);
        panic!("worker never started the blocking task");
    }

    for _ in 0..10 {
        pool.submit(|| {});
    }
    let queue_len = pool.queue_len();

    // Release the worker before asserting so a failed assertion cannot leave it spinning.
    release.store(1, Ordering::SeqCst);

    assert!(queue_len > 0);
}
/// `is_shutdown` must be `false` on a fresh pool and `true` after `shutdown` is called.
#[test]
fn test_pool_is_shutdown() {
    let pool = WorkerPool::new(2);

    let before = pool.is_shutdown();
    assert!(!before);

    pool.shutdown();

    let after = pool.is_shutdown();
    assert!(after);
}
/// Calling `shutdown` twice in a row must leave the pool reporting `is_shutdown() == true`.
#[test]
fn test_pool_multiple_shutdowns() {
    let pool = WorkerPool::new(2);
    for _ in 0..2 {
        pool.shutdown();
    }
    let shut_down = pool.is_shutdown();
    assert!(shut_down);
}
/// Four threads each submit 25 counter-incrementing tasks to a shared four-thread pool; the
/// counter is expected to reach 100 within the polling window.
#[test]
fn test_pool_concurrent_submissions() {
    let pool = Arc::new(WorkerPool::new(4));
    let completed = Arc::new(AtomicUsize::new(0));

    // Spawn every submitter up front, then join them all.
    let submitters: Vec<_> = (0..4)
        .map(|_| {
            let pool = Arc::clone(&pool);
            let completed = Arc::clone(&completed);
            thread::spawn(move || {
                for _ in 0..25 {
                    let completed = Arc::clone(&completed);
                    pool.submit(move || {
                        completed.fetch_add(1, Ordering::SeqCst);
                    });
                }
            })
        })
        .collect();

    for submitter in submitters {
        submitter.join().unwrap();
    }

    poll_until(|| completed.load(Ordering::SeqCst) == 100, 2000);
    let total = completed.load(Ordering::SeqCst);
    assert_eq!(total, 100);
}