extern crate task_queue;
use std::sync::mpsc;
use std::sync::{ Arc, Barrier, Condvar, Mutex };
use std::sync::atomic::{ AtomicUsize, Ordering };
use std::thread;
use std::time::Duration;
use task_queue::TaskQueue;
use task_queue::spawn_policy::DynamicSpawnPolicy;
use task_queue::spawn_policy::ManualSpawnPolicy;
#[test]
fn test_work() {
let (sender, reciver) = mpsc::channel::<()>();
let mut queue = TaskQueue::new();
for _ in 0..20 {
let sender_clone = sender.clone();
queue.enqueue(move || {
sender_clone.send(()).unwrap();
}).unwrap();
}
for _ in 0..20 {
reciver.recv().unwrap();
}
}
#[test]
fn test_stop_and_wait() {
let mut queue = TaskQueue::new();
for _ in 0..10 {
queue.enqueue(|| { }).unwrap();
}
queue.stop_wait();
}
#[test]
fn test_stop() {
let data = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(Barrier::new(11));
let mut queue = TaskQueue::new();
for i in 0..20 {
let barrier_clone = barrier.clone();
let data_clone = data.clone();
queue.enqueue(move || {
if i < 10 {
barrier_clone.wait();
}
data_clone.fetch_add(1, Ordering::AcqRel);
}).unwrap();
}
let handles = queue.stop();
barrier.wait();
for handle in handles {
handle.join().unwrap();
}
let result = data.load(Ordering::Relaxed);
assert_eq!(result, 20);
}
#[test]
fn test_stop_immediately() {
let data = Arc::new(AtomicUsize::new(0));
let mut queue = TaskQueue::new();
for _ in 0..20 {
let clone = data.clone();
queue.enqueue(move || {
clone.fetch_add(1, Ordering::SeqCst);
}).unwrap();
}
let not_executed_tasks = queue.stop_immediately();
for t in ¬_executed_tasks {
t.run();
}
let num = data.load(Ordering::Relaxed);
assert_eq!(num, 20);
}
#[test]
#[ignore]
fn test_dynamic_policy_up() {
let queue = test_dynamic_policy(100, 10);
assert_eq!(queue.get_threads_count(), 10);
queue.stop_wait();
}
#[test]
#[ignore]
fn test_dynamic_policy_down() {
let queue = test_dynamic_policy(10, 100);
assert_eq!(queue.get_threads_count(), 5);
queue.stop_wait();
}
fn test_dynamic_policy(task_delay: u64, enqueue_delay: u64) -> TaskQueue {
const TASKS_COUNT : usize = 1000;
const POLICY_DELTA : u64 = 1000;
let mut queue = TaskQueue::with_threads(5, 10).expect("params is ok");
let condvar = Arc::new(Condvar::new());
let countdown = Arc::new(Mutex::new(TASKS_COUNT));
let policy = Box::new(DynamicSpawnPolicy::with_delta(Duration::from_millis(POLICY_DELTA)));
queue.set_spawn_policy(policy);
for _ in 0..TASKS_COUNT {
let countdown_clone = countdown.clone();
let condvar_clone = condvar.clone();
queue.enqueue(move || {
thread::sleep(Duration::from_millis(task_delay));
let mut guard = countdown_clone.lock().unwrap();
*guard -= 1;
if *guard == 0 {
condvar_clone.notify_one();
}
}).unwrap();
thread::sleep(Duration::from_millis(enqueue_delay));
}
{
let mut guard = countdown.lock().unwrap();
while *guard != 0 {
guard = condvar.wait(guard).unwrap();
}
}
queue
}
#[test]
fn test_dynamic_close_thread() {
let mut queue = TaskQueue::with_threads(1, 2).expect("params is ok");
let mut policy = ManualSpawnPolicy::with_threads(1);
let mut controller = policy.get_controller();
let barrier = Arc::new(Barrier::new(3));
queue.set_spawn_policy(Box::new(policy));
controller.add_thread();
for _ in 0..2 {
let barrier_clone = barrier.clone();
queue.enqueue(move || {
barrier_clone.wait();
barrier_clone.wait();
}).unwrap();
}
barrier.wait();
controller.remove_thread();
queue.enqueue(|| {}).unwrap();
let threads_count = queue.get_threads_count();
let threads = queue.stop();
assert_eq!(threads_count, 1);
assert_eq!(threads.len(), 2);
barrier.wait();
for handle in threads {
handle.join().unwrap();
}
}