use desync::scheduler::*;
use super::timeout::*;
use std::thread;
use std::time::*;
use std::sync::*;
use std::sync::mpsc::*;
/// Checks that a job queued behind a suspension only observes state written just before the
/// queue is resumed.
///
/// A first job reports the shared position (0) over a channel, the queue is suspended, and a
/// second job is queued afterwards. The position is changed to 1 immediately before `resume()`
/// is called, and the second report is required to be 1.
///
/// The scenario is repeated 1000 times, each run wrapped in `timeout(.., 500)`.
// NOTE(review): presumably excluded under miri because the 1000 repetitions are slow - confirm.
#[test]
#[cfg(not(miri))]
fn suspend_queue_with_background_thread() {
    for _round in 0..1000 {
        timeout(|| {
            use futures::executor;

            let job_queue = queue();
            let sched = scheduler();

            // Channel the queued jobs use to report the position they observed
            let (tx, rx) = channel();
            let pos = Arc::new(Mutex::new(0));

            // Builds a job that sends the position it sees at the moment it runs
            let make_reporter = || {
                let shared_pos = Arc::clone(&pos);
                let sender = tx.clone();
                move || {
                    sender.send(*shared_pos.lock().unwrap()).unwrap();
                }
            };

            // First report is queued ahead of the suspension
            desync(&job_queue, make_reporter());

            // Suspend the queue and block until the suspension has resolved to a resumer
            let suspension = sched.suspend(&job_queue);
            let resumer = executor::block_on(suspension).unwrap();

            // Second report is queued behind the suspension
            desync(&job_queue, make_reporter());

            // The job queued before the suspension saw the initial position
            assert!(rx.recv().unwrap() == 0);

            // Give other threads a chance to run before the position is changed
            thread::yield_now();

            // Change the position, then let the queue continue
            *pos.lock().unwrap() = 1;
            resumer.resume();

            // The job queued behind the suspension must see the updated position
            assert!(rx.recv().unwrap() == 1);
        }, 500);
    }
}
/// Checks that `sync` on a suspended queue returns its result once another thread resumes the
/// queue.
///
/// The scheduler is created with its maximum thread count set to 0, the queue is suspended, and
/// a helper thread resumes it after 100ms while the test thread is inside `sync`.
#[test]
fn suspend_queue_with_local_drain() {
    timeout(|| {
        use futures::executor;

        let scheduler = Arc::new(Scheduler::new());

        // NOTE(review): presumably this leaves the scheduler with no background worker threads,
        // so jobs have to be drained by whichever thread waits on them - confirm against the
        // Scheduler documentation.
        scheduler.set_max_threads(0);
        scheduler.despawn_threads_if_overloaded();

        let queue = scheduler.create_job_queue();

        // Queue an empty job, then suspend the queue and wait for the resumer
        scheduler.desync(&queue, || {});
        let suspension = scheduler.suspend(&queue);
        let resumer = executor::block_on(suspension).unwrap();

        // Resume from a separate thread after a short delay, then raise the thread limit to 1
        let resume_delay = Duration::from_millis(100);
        let background_scheduler = Arc::clone(&scheduler);
        thread::spawn(move || {
            thread::sleep(resume_delay);
            resumer.resume();
            background_scheduler.set_max_threads(1);
        });

        // The synchronous job must still produce its value
        assert!(scheduler.sync(&queue, || 42) == 42);
    }, 500);
}
/// Checks that the resumer of a suspended queue can be discarded without calling `resume()`.
///
/// One increment job is queued before the suspension and one after it. The test waits for the
/// counter to leave 0, requires it to be exactly 1, then waits for the suspension to resolve and
/// drops the resulting resumer unused.
#[test]
fn safe_to_drop_suspended_queue() {
    timeout(|| {
        use futures::executor;

        let job_queue = queue();
        let sched = scheduler();

        let pos = Arc::new(Mutex::new(0));

        // Builds a job that adds one to the shared counter
        let make_increment = || {
            let counter = Arc::clone(&pos);
            move || {
                *counter.lock().unwrap() += 1;
            }
        };

        // One increment ahead of the suspension, one behind it
        desync(&job_queue, make_increment());
        let suspension = sched.suspend(&job_queue);
        desync(&job_queue, make_increment());

        // Poll until the first increment has been applied; the lock is released before sleeping
        loop {
            if *pos.lock().unwrap() != 0 {
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }

        // Only the increment queued ahead of the suspension is expected to have run
        assert!(*pos.lock().unwrap() == 1);

        // Wait for the suspension to resolve; the resumer it yields is dropped immediately
        executor::block_on(suspension).unwrap();
    }, 500);
}