use std::collections::BinaryHeap;
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use std::cmp::Ordering;
use delayed::Delayed;
/// A queue of [`Delayed`] items.
///
/// The queue itself only holds an `Arc` to the shared state, so every clone
/// of a `DelayQueue` operates on the same underlying heap.
#[derive(Debug)]
pub struct DelayQueue<T: Delayed> {
/// Heap and condition variable shared by all handles to this queue.
shared_data: Arc<DelayQueueSharedData<T>>,
}
/// State shared (through an `Arc`) between all handles to one `DelayQueue`.
#[derive(Debug)]
struct DelayQueueSharedData<T: Delayed> {
/// Pending entries. `Entry` orders itself in reverse of `delayed_until()`,
/// so the top of this max-heap is the entry with the earliest deadline.
queue: Mutex<BinaryHeap<Entry<T>>>,
/// Signalled by `push` when the pushed item becomes the new head, and by
/// `force_pop` when items remain after a pop.
condvar_new_head: Condvar,
}
impl<T: Delayed> DelayQueue<T> {
pub fn new() -> DelayQueue<T> {
DelayQueue {
shared_data: Arc::new(DelayQueueSharedData {
queue: Mutex::new(BinaryHeap::new()),
condvar_new_head: Condvar::new(),
}),
}
}
pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
DelayQueue {
shared_data: Arc::new(DelayQueueSharedData {
queue: Mutex::new(BinaryHeap::with_capacity(capacity)),
condvar_new_head: Condvar::new(),
}),
}
}
pub fn push(&mut self, item: T) {
let mut queue = self.shared_data.queue.lock().unwrap();
{
let cur_head = queue.peek();
if (cur_head == None)
|| (item.delayed_until() < cur_head.unwrap().delayed.delayed_until())
{
self.shared_data.condvar_new_head.notify_one();
}
}
queue.push(Entry::new(item));
}
pub fn pop(&mut self) -> T {
let mut queue = self.shared_data.queue.lock().unwrap();
loop {
let wait_duration = match queue.peek() {
Some(elem) => {
let now = Instant::now();
if elem.delayed.delayed_until() <= now {
break;
}
elem.delayed.delayed_until() - now
}
None => Duration::from_secs(0),
};
queue = if wait_duration > Duration::from_secs(0) {
self.shared_data
.condvar_new_head
.wait_timeout(queue, wait_duration)
.unwrap()
.0
} else {
self.shared_data.condvar_new_head.wait(queue).unwrap()
};
}
self.force_pop(queue)
}
pub fn try_pop_for(&mut self, timeout: Duration) -> Option<T> {
self.try_pop_until(Instant::now() + timeout)
}
pub fn try_pop_until(&mut self, try_until: Instant) -> Option<T> {
let mut queue = self.shared_data.queue.lock().unwrap();
loop {
let now = Instant::now();
let next_elem_duration = match queue.peek() {
Some(elem) if elem.delayed.delayed_until() <= now => break,
Some(elem) => elem.delayed.delayed_until() - now,
None => Duration::from_secs(0),
};
if now >= try_until {
return None;
}
let time_left = try_until - now;
let wait_duration = if next_elem_duration > Duration::from_secs(0) {
next_elem_duration.min(time_left)
} else {
time_left
};
queue = self.shared_data
.condvar_new_head
.wait_timeout(queue, wait_duration)
.unwrap()
.0
}
Some(self.force_pop(queue))
}
pub fn is_empty(&self) -> bool {
let queue = self.shared_data.queue.lock().unwrap();
queue.is_empty()
}
fn force_pop(&self, mut queue: MutexGuard<BinaryHeap<Entry<T>>>) -> T {
if queue.len() > 1 {
self.shared_data.condvar_new_head.notify_one();
}
queue.pop().unwrap().delayed
}
}
impl<T: Delayed> Default for DelayQueue<T> {
    /// Returns an empty queue by delegating to `DelayQueue::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl<T: Delayed> Clone for DelayQueue<T> {
fn clone(&self) -> DelayQueue<T> {
DelayQueue {
shared_data: self.shared_data.clone(),
}
}
}
/// Wrapper around a queued item.
///
/// The comparison traits implemented for `Entry` look only at
/// `delayed.delayed_until()`.
#[derive(Debug)]
struct Entry<T: Delayed> {
/// The wrapped item.
delayed: T,
}
impl<T: Delayed> Entry<T> {
    /// Wraps `delayed` in a new `Entry`.
    fn new(delayed: T) -> Self {
        Self { delayed }
    }
}
impl<T: Delayed> Ord for Entry<T> {
    /// Compares two entries in *reverse* order of their `delayed_until()`
    /// instants: the entry that expires earlier is the greater one.
    fn cmp(&self, other: &Self) -> Ordering {
        let theirs = other.delayed.delayed_until();
        let ours = self.delayed.delayed_until();
        // Comparing "theirs" against "ours" is what reverses the ordering.
        theirs.cmp(&ours)
    }
}
impl<T: Delayed> PartialOrd for Entry<T> {
    /// Always returns `Some`, carrying the result of the `Ord` implementation.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl<T: Delayed> PartialEq for Entry<T> {
    /// Two entries are equal exactly when the `Ord` implementation reports
    /// `Ordering::Equal` for them; the wrapped values are not compared.
    fn eq(&self, other: &Self) -> bool {
        match self.cmp(other) {
            Ordering::Equal => true,
            Ordering::Less | Ordering::Greater => false,
        }
    }
}
// Marker impl: `Ord` has `Eq` as a supertrait, and `BinaryHeap` needs `Ord` entries.
impl<T: Delayed> Eq for Entry<T> {}
// Unit tests for `DelayQueue` and `Entry`.
//
// Every queue test runs inside `timeout_ms` from the external `timebomb`
// crate; presumably this fails the test if the closure takes longer than the
// given number of milliseconds (confirm against the crate's documentation).
#[cfg(test)]
mod tests {
extern crate timebomb;
use self::timebomb::timeout_ms;
use std::time::{Duration, Instant};
use std::thread;
use delayed::Delay;
use super::{DelayQueue, Entry};
// `Entry` ordering is reversed: the entry that expires sooner is the greater one.
#[test]
fn entry_comparisons() {
let delayed_one_hour = Entry::new(Delay::for_duration("abc", Duration::from_secs(3600)));
let delayed_now = Entry::new(Delay::for_duration("def", Duration::from_secs(0)));
assert_eq!(delayed_now, delayed_now);
assert_ne!(delayed_now, delayed_one_hour);
assert!(delayed_now > delayed_one_hour);
assert!(delayed_one_hour < delayed_now);
assert!(delayed_one_hour <= delayed_one_hour);
}
// `is_empty` reflects pushes and pops.
#[test]
fn is_empty() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
assert!(queue.is_empty());
queue.push(Delay::until_instant("1st", Instant::now()));
assert!(!queue.is_empty());
assert_eq!(queue.pop().value, "1st");
assert!(queue.is_empty());
},
1000,
);
}
// Items come out in deadline order regardless of the order they were pushed in,
// including an item pushed between two pops.
#[test]
fn push_pop_single_thread() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
let delay1 = Delay::until_instant("1st", Instant::now());
let delay2 = Delay::for_duration("2nd", Duration::from_millis(20));
let delay3 = Delay::for_duration("3rd", Duration::from_millis(30));
let delay4 = Delay::for_duration("4th", Duration::from_millis(40));
queue.push(delay2);
queue.push(delay4);
queue.push(delay1);
assert_eq!(queue.pop().value, "1st");
assert_eq!(queue.pop().value, "2nd");
queue.push(delay3);
assert_eq!(queue.pop().value, "3rd");
assert_eq!(queue.pop().value, "4th");
assert!(queue.is_empty());
},
1000,
);
}
// A consumer thread pops through a clone of the queue while the main thread
// pushes the last item after the consumer has been spawned.
#[test]
fn push_pop_different_thread() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
let delay1 = Delay::until_instant("1st", Instant::now());
let delay2 = Delay::for_duration("2nd", Duration::from_millis(20));
let delay3 = Delay::for_duration("3rd", Duration::from_millis(30));
let delay4 = Delay::for_duration("4th", Duration::from_millis(40));
queue.push(delay2);
queue.push(delay3);
queue.push(delay1);
let mut cloned_queue = queue.clone();
let handle = thread::spawn(move || {
assert_eq!(cloned_queue.pop().value, "1st");
assert_eq!(cloned_queue.pop().value, "2nd");
assert_eq!(cloned_queue.pop().value, "3rd");
assert_eq!(cloned_queue.pop().value, "4th");
assert!(cloned_queue.is_empty());
});
queue.push(delay4);
handle.join().unwrap();
assert!(queue.is_empty());
},
1000,
);
}
// The consumer starts popping while the queue is still empty; the item is
// pushed only after the main thread has slept for 100 ms.
#[test]
fn pop_before_push() {
timeout_ms(
|| {
let mut queue: DelayQueue<Delay<&str>> = DelayQueue::new();
let mut cloned_queue = queue.clone();
let handle = thread::spawn(move || {
assert_eq!(cloned_queue.pop().value, "1st");
assert!(cloned_queue.is_empty());
});
thread::sleep(Duration::from_millis(100));
queue.push(Delay::for_duration("1st", Duration::from_millis(10)));
handle.join().unwrap();
assert!(queue.is_empty());
},
1000,
);
}
// Despite the name, three consumer threads are started on the empty queue;
// three items are pushed afterwards and every consumer must receive one.
#[test]
fn pop_two_before_push() {
timeout_ms(
|| {
let mut queue: DelayQueue<Delay<&str>> = DelayQueue::new();
let mut handles = vec![];
for _ in 0..3 {
let mut queue = queue.clone();
let handle = thread::spawn(move || {
let val = queue.pop().value;
// Only the consumer that received the last item can rely on the queue being empty.
if val == "3rd" {
assert!(queue.is_empty());
}
});
handles.push(handle);
}
thread::sleep(Duration::from_millis(100));
queue.push(Delay::for_duration("1st", Duration::from_millis(10)));
queue.push(Delay::for_duration("2nd", Duration::from_millis(20)));
queue.push(Delay::for_duration("3rd", Duration::from_millis(30)));
for handle in handles {
handle.join().unwrap();
}
assert!(queue.is_empty());
},
1000,
);
}
// While the consumer is waiting for "2nd" (100 ms delay), an already-expired
// item is pushed; the consumer must receive that item first.
#[test]
fn push_higher_priority_while_waiting_to_pop() {
timeout_ms(
|| {
let mut queue: DelayQueue<Delay<&str>> = DelayQueue::new();
let delay1 = Delay::until_instant("1st", Instant::now());
let delay2 = Delay::for_duration("2nd", Duration::from_millis(100));
let mut cloned_queue = queue.clone();
let handle = thread::spawn(move || {
assert_eq!(cloned_queue.pop().value, "1st");
assert_eq!(cloned_queue.pop().value, "2nd");
assert!(cloned_queue.is_empty());
});
thread::sleep(Duration::from_millis(10));
queue.push(delay2);
thread::sleep(Duration::from_millis(10));
queue.push(delay1);
handle.join().unwrap();
assert!(queue.is_empty());
},
1000,
);
}
// `try_pop_until(now)` returns an expired item but does not wait for a pending one.
#[test]
fn try_pop_until_now() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
let delay1 = Delay::until_instant("1st", Instant::now());
let delay2 = Delay::for_duration("2nd", Duration::from_millis(500));
queue.push(delay1);
queue.push(delay2);
assert_eq!(queue.try_pop_until(Instant::now()).unwrap().value, "1st");
assert_eq!(queue.try_pop_until(Instant::now()), None);
assert!(!queue.is_empty());
},
1000,
);
}
// Same as above, using `try_pop_for` with a zero timeout.
#[test]
fn try_pop_for_zero_duration() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
let delay1 = Delay::until_instant("1st", Instant::now());
let delay2 = Delay::for_duration("2nd", Duration::from_millis(500));
queue.push(delay1);
queue.push(delay2);
assert_eq!(
queue.try_pop_for(Duration::from_millis(0)).unwrap().value,
"1st"
);
assert_eq!(queue.try_pop_for(Duration::from_millis(0)), None);
assert!(!queue.is_empty());
},
1000,
);
}
// `try_pop_until` returns `None` when the deadline is before the item's expiry,
// returns the item when the deadline is after it, and returns `None` on an empty queue.
#[test]
fn try_pop_until() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
let delay1 = Delay::for_duration("1st", Duration::from_millis(100));
queue.push(delay1);
assert_eq!(
queue.try_pop_until(Instant::now() + Duration::from_millis(10)),
None
);
assert_eq!(
queue
.try_pop_until(Instant::now() + Duration::from_millis(200))
.unwrap()
.value,
"1st"
);
assert!(queue.is_empty());
assert_eq!(
queue.try_pop_until(Instant::now() + Duration::from_millis(10)),
None
);
},
1000,
);
}
// Same scenario as `try_pop_until`, expressed with relative timeouts.
#[test]
fn try_pop_for() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
let delay1 = Delay::for_duration("1st", Duration::from_millis(100));
queue.push(delay1);
assert_eq!(queue.try_pop_for(Duration::from_millis(10)), None);
assert_eq!(
queue.try_pop_for(Duration::from_millis(200)).unwrap().value,
"1st"
);
assert!(queue.is_empty());
assert_eq!(queue.try_pop_for(Duration::from_millis(10)), None);
},
1000,
);
}
// A consumer in `try_pop_for(100 ms)` is waiting behind an item that expires
// in 1000 ms; an already-expired item pushed 20 ms later must be returned
// to it, leaving the slow item in the queue.
#[test]
fn push_higher_priority_while_waiting_to_try_pop() {
timeout_ms(
|| {
let mut queue = DelayQueue::new();
let delay1 = Delay::until_instant("1st", Instant::now());
let delay2 = Delay::for_duration("2nd", Duration::from_millis(1000));
queue.push(delay2);
let mut cloned_queue = queue.clone();
let handle = thread::spawn(move || {
assert_eq!(
cloned_queue
.try_pop_for(Duration::from_millis(100))
.unwrap()
.value,
"1st"
);
assert!(!cloned_queue.is_empty());
});
thread::sleep(Duration::from_millis(20));
queue.push(delay1);
handle.join().unwrap();
},
1000,
);
}
}