use parking_lot::{Condvar, Mutex};
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::ptr;
use std::time::Instant;
struct Node {
cvar: Condvar,
next: *mut Node,
prev: *mut Node,
}
struct State<T> {
head: *mut Node,
jobs: VecDeque<T>,
}
unsafe impl<T> Sync for State<T> where T: Sync {}
unsafe impl<T> Send for State<T> where T: Send {}
pub struct JobQueue<T> {
state: Mutex<State<T>>,
}
impl<T> JobQueue<T> {
pub fn new() -> Self {
JobQueue {
state: Mutex::new(State {
head: ptr::null_mut(),
jobs: VecDeque::new(),
}),
}
}
pub fn len(&self) -> usize {
self.state.lock().jobs.len()
}
pub fn push(&self, job: T) {
let mut state = self.state.lock();
state.jobs.push_back(job);
unsafe {
if !state.head.is_null() {
let woken = state.head;
state.head = (*woken).next;
if !state.head.is_null() {
(*state.head).prev = ptr::null_mut();
}
(*woken).next = ptr::null_mut();
(*woken).cvar.notify_one();
}
}
}
pub fn try_pop(&self) -> Option<T> {
self.state.lock().jobs.pop_front()
}
pub fn pop_until(&self, timeout: Instant) -> Option<T> {
let mut state = self.state.lock();
loop {
if let Some(job) = state.jobs.pop_front() {
return Some(job);
}
let node = UnsafeCell::new(Node {
cvar: Condvar::new(),
next: state.head,
prev: ptr::null_mut(),
});
unsafe {
if !state.head.is_null() {
(*state.head).prev = node.get();
}
state.head = node.get();
let result = (*node.get()).cvar.wait_until(&mut state, timeout);
if !(*node.get()).next.is_null() {
(*(*node.get()).next).prev = (*node.get()).prev;
}
if !(*node.get()).prev.is_null() {
(*(*node.get()).prev).next = (*node.get()).next;
} else if state.head == node.get() {
state.head = (*node.get()).next;
}
if result.timed_out() {
return None;
}
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[test]
fn single_threaded() {
let queue = JobQueue::new();
queue.push(0);
assert_eq!(queue.try_pop(), Some(0));
assert_eq!(queue.try_pop(), None);
queue.push(1);
let start = Instant::now();
assert_eq!(queue.pop_until(start - Duration::from_millis(10)), Some(1));
let elapsed = start.elapsed();
assert!(elapsed < Duration::from_millis(10));
let start = Instant::now();
assert_eq!(queue.pop_until(start + Duration::from_millis(10)), None);
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(10) && elapsed < Duration::from_millis(20));
}
#[test]
fn wake_lifo() {
let queue = Arc::new(JobQueue::<i32>::new());
let handle1 = thread::spawn({
let queue = queue.clone();
move || queue.pop_until(Instant::now() + Duration::from_millis(1500))
});
let handle2 = thread::spawn({
let queue = queue.clone();
move || {
thread::sleep(Duration::from_millis(100));
queue.pop_until(Instant::now() + Duration::from_millis(1500))
}
});
thread::sleep(Duration::from_secs(1));
queue.push(0);
assert_eq!(handle1.join().unwrap(), None);
assert_eq!(handle2.join().unwrap(), Some(0));
}
}