delay_queue/delay_queue.rs
1use std::collections::BinaryHeap;
2use std::sync::{Arc, Condvar, Mutex, MutexGuard};
3use std::time::{Duration, Instant};
4use std::cmp::Ordering;
5use delayed::Delayed;
6
7/// A concurrent unbounded blocking queue where each item can only be removed when its delay
8/// expires.
9///
10/// The queue supports multiple producers and multiple consumers.
11///
12/// Items of the queue must implement the `Delayed` trait. In most situations you can just use
13/// the helper struct `Delay` to wrap the values to be used by the queue.
14///
15/// If you implement the `Delayed` trait for your types, keep in mind that the `DelayQueue` assumes
16/// that the `Instant` until which each item is delayed does not change while that item is
17/// in the queue.
18///
19/// # Examples
20///
21/// Basic usage:
22///
23/// ```no_run
24/// use delay_queue::{Delay, DelayQueue};
25/// use std::time::{Duration, Instant};
26///
27/// let mut queue = DelayQueue::new();
28/// queue.push(Delay::for_duration("2nd", Duration::from_secs(5)));
29/// queue.push(Delay::until_instant("1st", Instant::now()));
30///
31/// println!("First pop: {}", queue.pop().value);
32/// println!("Second pop: {}", queue.pop().value);
33/// assert!(queue.is_empty());
34/// ```
35#[derive(Debug)]
36pub struct DelayQueue<T: Delayed> {
37 /// Points to the data that is shared between instances of the same queue (created by
38 /// cloning a queue). Usually the different instances of a queue will live in different
39 /// threads.
40 shared_data: Arc<DelayQueueSharedData<T>>,
41}
42
43/// The underlying data of a queue.
44///
45/// When a `DelayQueue` is cloned, it's clone will point to the same `DelayQueueSharedData`.
46/// This is done so a queue be used by different threads.
47#[derive(Debug)]
48struct DelayQueueSharedData<T: Delayed> {
49 /// Mutex protected `BinaryHeap` that holds the items of the queue in the order that they
50 /// should be popped.
51 queue: Mutex<BinaryHeap<Entry<T>>>,
52
53 /// Condition variable that signals when there is a new item at the head of the queue.
54 condvar_new_head: Condvar,
55}
56
57impl<T: Delayed> DelayQueue<T> {
58 /// Creates an empty `DelayQueue<T>`.
59 ///
60 /// # Examples
61 ///
62 /// Basic usage:
63 ///
64 /// ```
65 /// use delay_queue::{Delay, DelayQueue};
66 ///
67 /// let mut queue : DelayQueue<Delay<i32>> = DelayQueue::new();
68 /// ```
69 pub fn new() -> DelayQueue<T> {
70 DelayQueue {
71 shared_data: Arc::new(DelayQueueSharedData {
72 queue: Mutex::new(BinaryHeap::new()),
73 condvar_new_head: Condvar::new(),
74 }),
75 }
76 }
77
78 /// Creates an empty `DelayQueue<T>` with a specific capacity.
79 /// This preallocates enough memory for `capacity` elements,
80 /// so that the `DelayQueue` does not have to be reallocated
81 /// until it contains at least that many values.
82 ///
83 /// # Examples
84 ///
85 /// Basic usage:
86 ///
87 /// ```
88 /// use delay_queue::{Delay, DelayQueue};
89 ///
90 /// let mut queue : DelayQueue<Delay<&str>> = DelayQueue::with_capacity(10);
91 /// ```
92 pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
93 DelayQueue {
94 shared_data: Arc::new(DelayQueueSharedData {
95 queue: Mutex::new(BinaryHeap::with_capacity(capacity)),
96 condvar_new_head: Condvar::new(),
97 }),
98 }
99 }
100
101 /// Pushes an item onto the queue.
102 ///
103 /// # Examples
104 ///
105 /// Basic usage:
106 ///
107 /// ```
108 /// use delay_queue::{Delay, DelayQueue};
109 /// use std::time::Duration;
110 ///
111 /// let mut queue = DelayQueue::new();
112 /// queue.push(Delay::for_duration("2nd", Duration::from_secs(5)));
113 /// ```
114 pub fn push(&mut self, item: T) {
115 let mut queue = self.shared_data.queue.lock().unwrap();
116
117 {
118 // If the new item goes to the head of the queue then notify consumers
119 let cur_head = queue.peek();
120 if (cur_head == None)
121 || (item.delayed_until() < cur_head.unwrap().delayed.delayed_until())
122 {
123 self.shared_data.condvar_new_head.notify_one();
124 }
125 }
126
127 queue.push(Entry::new(item));
128 }
129
130 /// Pops the next item from the queue, blocking if necessary until an item is available and its
131 /// delay has expired.
132 ///
133 /// # Examples
134 ///
135 /// Basic usage:
136 ///
137 /// ```no_run
138 /// use delay_queue::{Delay, DelayQueue};
139 /// use std::time::{Duration, Instant};
140 ///
141 /// let mut queue = DelayQueue::new();
142 ///
143 /// queue.push(Delay::until_instant("1st", Instant::now()));
144 ///
145 /// // The pop will not block, since the delay has expired.
146 /// println!("First pop: {}", queue.pop().value);
147 ///
148 /// queue.push(Delay::for_duration("2nd", Duration::from_secs(5)));
149 ///
150 /// // The pop will block for approximately 5 seconds before returning the item.
151 /// println!("Second pop: {}", queue.pop().value);
152 /// ```
153 pub fn pop(&mut self) -> T {
154 let mut queue = self.shared_data.queue.lock().unwrap();
155
156 // Loop until an element can be popped, waiting if necessary
157 loop {
158 let wait_duration = match queue.peek() {
159 Some(elem) => {
160 let now = Instant::now();
161 // If there is an element and its delay is expired
162 // break out of the loop to pop it
163 if elem.delayed.delayed_until() <= now {
164 break;
165 }
166 // Otherwise, calculate the Duration until the element expires
167 elem.delayed.delayed_until() - now
168 }
169
170 // Signal that there is no element with a duration of zero
171 None => Duration::from_secs(0),
172 };
173
174 // Wait until there is a new head of the queue
175 // or the time to pop the current head expires
176 queue = if wait_duration > Duration::from_secs(0) {
177 self.shared_data
178 .condvar_new_head
179 .wait_timeout(queue, wait_duration)
180 .unwrap()
181 .0
182 } else {
183 self.shared_data.condvar_new_head.wait(queue).unwrap()
184 };
185 }
186
187 self.force_pop(queue)
188 }
189
190 /// Pops the next item from the queue, blocking if necessary until an item is available and its
191 /// delay has expired or until the given timeout expires.
192 ///
193 /// Returns `None` if the given timeout expires and no item became available to be popped.
194 ///
195 /// # Examples
196 ///
197 /// Basic usage:
198 ///
199 /// ```no_run
200 /// use delay_queue::{Delay, DelayQueue};
201 /// use std::time::Duration;
202 ///
203 /// let mut queue = DelayQueue::new();
204 ///
205 /// queue.push(Delay::for_duration("1st", Duration::from_secs(5)));
206 ///
207 /// // The pop will block for approximately 2 seconds before returning None.
208 /// println!("First pop: {:?}",
209 /// queue.try_pop_for(Duration::from_secs(2))); // Prints "None"
210 ///
211 /// // The pop will block for approximately 3 seconds before returning the item.
212 /// println!("Second pop: {}",
213 /// queue.try_pop_for(Duration::from_secs(5)).unwrap().value); // Prints "1st"
214 /// ```
215 pub fn try_pop_for(&mut self, timeout: Duration) -> Option<T> {
216 self.try_pop_until(Instant::now() + timeout)
217 }
218
219 /// Pops the next item from the queue, blocking if necessary until an item is available and its
220 /// delay has expired or until the given `Instant` is reached.
221 ///
222 /// Returns `None` if the given `Instant` is reached and no item became available to be popped.
223 ///
224 /// # Examples
225 ///
226 /// Basic usage:
227 ///
228 /// ```no_run
229 /// use delay_queue::{Delay, DelayQueue};
230 /// use std::time::{Duration, Instant};
231 ///
232 /// let mut queue = DelayQueue::new();
233 ///
234 /// queue.push(Delay::for_duration("1st", Duration::from_secs(5)));
235 ///
236 /// // The pop will block for approximately 2 seconds before returning None.
237 /// println!("First pop: {:?}",
238 /// queue.try_pop_until(Instant::now() + Duration::from_secs(2))); // Prints "None"
239 ///
240 /// // The pop will block for approximately 3 seconds before returning the item.
241 /// println!("Second pop: {}",
242 /// queue.try_pop_until(Instant::now() + Duration::from_secs(5))
243 /// .unwrap().value); // Prints "1st"
244 /// ```
245 pub fn try_pop_until(&mut self, try_until: Instant) -> Option<T> {
246 let mut queue = self.shared_data.queue.lock().unwrap();
247
248 // Loop until an element can be popped or the timeout expires, waiting if necessary
249 loop {
250 let now = Instant::now();
251
252 let next_elem_duration = match queue.peek() {
253 // If there is an element and its delay is expired, break out of the loop to pop it
254 Some(elem) if elem.delayed.delayed_until() <= now => break,
255
256 // Calculate the Duration until the element expires
257 Some(elem) => elem.delayed.delayed_until() - now,
258
259 // Signal that there is no element with a duration of zero
260 None => Duration::from_secs(0),
261 };
262
263 if now >= try_until {
264 return None;
265 }
266
267 let time_left = try_until - now;
268
269 let wait_duration = if next_elem_duration > Duration::from_secs(0) {
270 // We'll wait until the time to pop the next element is reached
271 // or our timeout expires, whichever comes first
272 next_elem_duration.min(time_left)
273 } else {
274 // There is no element in the queue, we'll wait for one until our timeout expires
275 time_left
276 };
277
278 // Wait until there is a new head of the queue,
279 // the time to pop the current head expires,
280 // or the timeout expires
281 queue = self.shared_data
282 .condvar_new_head
283 .wait_timeout(queue, wait_duration)
284 .unwrap()
285 .0
286 }
287
288 Some(self.force_pop(queue))
289 }
290
291 /// Checks if the queue is empty.
292 ///
293 /// # Examples
294 ///
295 /// Basic usage:
296 ///
297 /// ```
298 /// use delay_queue::{Delay, DelayQueue};
299 /// use std::time::Instant;
300 ///
301 /// let mut queue = DelayQueue::new();
302 /// queue.push(Delay::until_instant("val", Instant::now()));
303 ///
304 /// assert!(!queue.is_empty());
305 ///
306 /// println!("First pop: {}", queue.pop().value);
307 ///
308 /// assert!(queue.is_empty());
309 /// ```
310 pub fn is_empty(&self) -> bool {
311 let queue = self.shared_data.queue.lock().unwrap();
312 queue.is_empty()
313 }
314
315 /// Pops an element from the queue, notifying `condvar_new_head` if there are elements still
316 /// left in the queue.
317 ///
318 /// # Panics
319 ///
320 /// Panics if `queue` is empty.
321 fn force_pop(&self, mut queue: MutexGuard<BinaryHeap<Entry<T>>>) -> T {
322 if queue.len() > 1 {
323 self.shared_data.condvar_new_head.notify_one();
324 }
325
326 queue.pop().unwrap().delayed
327 }
328}
329
330impl<T: Delayed> Default for DelayQueue<T> {
331 /// Creates an empty `DelayQueue<T>`.
332 fn default() -> DelayQueue<T> {
333 DelayQueue::new()
334 }
335}
336
337impl<T: Delayed> Clone for DelayQueue<T> {
338 /// Returns a new `DelayQueue` that points to the same underlying data.
339 ///
340 /// This method can be used to share a queue between different threads.
341 ///
342 /// # Examples
343 ///
344 /// Basic usage:
345 ///
346 /// ```no_run
347 /// use delay_queue::{Delay, DelayQueue};
348 /// use std::time::Duration;
349 /// use std::thread;
350 ///
351 /// let mut queue = DelayQueue::new();
352 ///
353 /// queue.push(Delay::for_duration("1st", Duration::from_secs(1)));
354 ///
355 /// let mut cloned_queue = queue.clone();
356 ///
357 /// let handle = thread::spawn(move || {
358 /// println!("First pop: {}", cloned_queue.pop().value);
359 /// println!("Second pop: {}", cloned_queue.pop().value);
360 /// });
361 ///
362 /// queue.push(Delay::for_duration("2nd", Duration::from_secs(2)));
363 ///
364 /// handle.join().unwrap();
365 /// ```
366 fn clone(&self) -> DelayQueue<T> {
367 DelayQueue {
368 shared_data: self.shared_data.clone(),
369 }
370 }
371}
372
373
374/// An entry in the `DelayQueue`.
375///
376/// Holds a `Delayed` item and implements an ordering based on delay `Instant`s of the items.
377#[derive(Debug)]
378struct Entry<T: Delayed> {
379 delayed: T,
380}
381
382impl<T: Delayed> Entry<T> {
383 fn new(delayed: T) -> Entry<T> {
384 Entry { delayed }
385 }
386}
387
388/// Implements ordering for `Entry`, so it can be used to correctly order elements in the
389/// `BinaryHeap` of the `DelayQueue`.
390///
391/// Earlier entries have higher priority (should be popped first), so they are Greater that later
392/// entries.
393impl<T: Delayed> Ord for Entry<T> {
394 fn cmp(&self, other: &Entry<T>) -> Ordering {
395 other
396 .delayed
397 .delayed_until()
398 .cmp(&self.delayed.delayed_until())
399 }
400}
401
402impl<T: Delayed> PartialOrd for Entry<T> {
403 fn partial_cmp(&self, other: &Entry<T>) -> Option<Ordering> {
404 Some(self.cmp(other))
405 }
406}
407
408impl<T: Delayed> PartialEq for Entry<T> {
409 fn eq(&self, other: &Entry<T>) -> bool {
410 self.cmp(other) == Ordering::Equal
411 }
412}
413
414impl<T: Delayed> Eq for Entry<T> {}
415
416
#[cfg(test)]
mod tests {
    extern crate timebomb;

    use self::timebomb::timeout_ms;
    use std::time::{Duration, Instant};
    use std::thread;
    use delayed::Delay;
    use super::{DelayQueue, Entry};

    // Every queue test runs inside `timeout_ms(.., 1000)` so that a consumer that blocks
    // forever makes the test fail instead of hanging the whole test run.

    /// The ordering of `Entry` is inverted: the sooner an entry expires, the greater it is.
    #[test]
    fn entry_comparisons() {
        let in_one_hour = Entry::new(Delay::for_duration("abc", Duration::from_secs(3600)));
        let immediate = Entry::new(Delay::for_duration("def", Duration::from_secs(0)));

        assert_eq!(immediate, immediate);
        assert_ne!(immediate, in_one_hour);

        assert!(immediate > in_one_hour);
        assert!(in_one_hour < immediate);
        assert!(in_one_hour <= in_one_hour);
    }

    /// `is_empty` follows pushes and pops.
    #[test]
    fn is_empty() {
        timeout_ms(
            || {
                let mut q = DelayQueue::new();

                assert!(q.is_empty());

                q.push(Delay::until_instant("1st", Instant::now()));

                assert!(!q.is_empty());
                assert_eq!(q.pop().value, "1st");
                assert!(q.is_empty());
            },
            1000,
        );
    }

    /// Items come out in expiry order, not insertion order, on a single thread.
    #[test]
    fn push_pop_single_thread() {
        timeout_ms(
            || {
                let mut q = DelayQueue::new();

                let first = Delay::until_instant("1st", Instant::now());
                let second = Delay::for_duration("2nd", Duration::from_millis(20));
                let third = Delay::for_duration("3rd", Duration::from_millis(30));
                let fourth = Delay::for_duration("4th", Duration::from_millis(40));

                q.push(second);
                q.push(fourth);
                q.push(first);

                assert_eq!(q.pop().value, "1st");
                assert_eq!(q.pop().value, "2nd");

                // Pushed late, but still expires before "4th".
                q.push(third);

                assert_eq!(q.pop().value, "3rd");
                assert_eq!(q.pop().value, "4th");

                assert!(q.is_empty());
            },
            1000,
        );
    }

    /// A clone of the queue used from another thread sees the items pushed by this thread.
    #[test]
    fn push_pop_different_thread() {
        timeout_ms(
            || {
                let mut producer = DelayQueue::new();

                let first = Delay::until_instant("1st", Instant::now());
                let second = Delay::for_duration("2nd", Duration::from_millis(20));
                let third = Delay::for_duration("3rd", Duration::from_millis(30));
                let fourth = Delay::for_duration("4th", Duration::from_millis(40));

                producer.push(second);
                producer.push(third);
                producer.push(first);

                let mut consumer = producer.clone();

                let worker = thread::spawn(move || {
                    assert_eq!(consumer.pop().value, "1st");
                    assert_eq!(consumer.pop().value, "2nd");
                    assert_eq!(consumer.pop().value, "3rd");
                    assert_eq!(consumer.pop().value, "4th");
                    assert!(consumer.is_empty());
                });

                producer.push(fourth);

                worker.join().unwrap();

                assert!(producer.is_empty());
            },
            1000,
        );
    }

    /// A consumer that starts waiting on an empty queue is woken by a later push.
    #[test]
    fn pop_before_push() {
        timeout_ms(
            || {
                let mut producer: DelayQueue<Delay<&str>> = DelayQueue::new();

                let mut consumer = producer.clone();

                let worker = thread::spawn(move || {
                    assert_eq!(consumer.pop().value, "1st");
                    assert!(consumer.is_empty());
                });

                // Give the consumer time to block on the empty queue.
                thread::sleep(Duration::from_millis(100));
                producer.push(Delay::for_duration("1st", Duration::from_millis(10)));

                worker.join().unwrap();

                assert!(producer.is_empty());
            },
            1000,
        );
    }

    /// Several consumers (three of them) blocked on an empty queue each get one item.
    #[test]
    fn pop_two_before_push() {
        timeout_ms(
            || {
                let mut producer: DelayQueue<Delay<&str>> = DelayQueue::new();

                let workers: Vec<_> = (0..3)
                    .map(|_| {
                        let mut consumer = producer.clone();
                        thread::spawn(move || {
                            let popped = consumer.pop().value;
                            // Whoever gets the last item must see an empty queue afterwards.
                            if popped == "3rd" {
                                assert!(consumer.is_empty());
                            }
                        })
                    })
                    .collect();

                // Give the consumers time to block on the empty queue.
                thread::sleep(Duration::from_millis(100));
                producer.push(Delay::for_duration("1st", Duration::from_millis(10)));
                producer.push(Delay::for_duration("2nd", Duration::from_millis(20)));
                producer.push(Delay::for_duration("3rd", Duration::from_millis(30)));

                for worker in workers {
                    worker.join().unwrap();
                }

                assert!(producer.is_empty());
            },
            1000,
        );
    }

    /// A consumer waiting for the head in `pop` switches to an earlier item pushed meanwhile.
    #[test]
    fn push_higher_priority_while_waiting_to_pop() {
        timeout_ms(
            || {
                let mut producer: DelayQueue<Delay<&str>> = DelayQueue::new();

                let first = Delay::until_instant("1st", Instant::now());
                let second = Delay::for_duration("2nd", Duration::from_millis(100));

                let mut consumer = producer.clone();

                let worker = thread::spawn(move || {
                    assert_eq!(consumer.pop().value, "1st");
                    assert_eq!(consumer.pop().value, "2nd");
                    assert!(consumer.is_empty());
                });

                thread::sleep(Duration::from_millis(10));
                producer.push(second);
                thread::sleep(Duration::from_millis(10));
                producer.push(first);

                worker.join().unwrap();

                assert!(producer.is_empty());
            },
            1000,
        );
    }

    /// With a deadline of "now", only an already expired item is returned.
    #[test]
    fn try_pop_until_now() {
        timeout_ms(
            || {
                let mut q = DelayQueue::new();

                let expired = Delay::until_instant("1st", Instant::now());
                let pending = Delay::for_duration("2nd", Duration::from_millis(500));

                q.push(expired);
                q.push(pending);

                assert_eq!(q.try_pop_until(Instant::now()).unwrap().value, "1st");
                assert_eq!(q.try_pop_until(Instant::now()), None);

                assert!(!q.is_empty());
            },
            1000,
        );
    }

    /// With a zero timeout, only an already expired item is returned.
    #[test]
    fn try_pop_for_zero_duration() {
        timeout_ms(
            || {
                let mut q = DelayQueue::new();

                let expired = Delay::until_instant("1st", Instant::now());
                let pending = Delay::for_duration("2nd", Duration::from_millis(500));

                q.push(expired);
                q.push(pending);

                assert_eq!(
                    q.try_pop_for(Duration::from_millis(0)).unwrap().value,
                    "1st"
                );
                assert_eq!(q.try_pop_for(Duration::from_millis(0)), None);

                assert!(!q.is_empty());
            },
            1000,
        );
    }

    /// `try_pop_until` gives up at its deadline and succeeds when the deadline is late enough.
    #[test]
    fn try_pop_until() {
        timeout_ms(
            || {
                let mut q = DelayQueue::new();

                let only = Delay::for_duration("1st", Duration::from_millis(100));

                q.push(only);

                // Deadline before the item expires.
                assert_eq!(
                    q.try_pop_until(Instant::now() + Duration::from_millis(10)),
                    None
                );
                // Deadline after the item expires.
                assert_eq!(
                    q.try_pop_until(Instant::now() + Duration::from_millis(200))
                        .unwrap()
                        .value,
                    "1st"
                );

                assert!(q.is_empty());

                // Empty queue: nothing to return when the deadline is reached.
                assert_eq!(
                    q.try_pop_until(Instant::now() + Duration::from_millis(10)),
                    None
                );
            },
            1000,
        );
    }

    /// `try_pop_for` gives up after its timeout and succeeds when the timeout is long enough.
    #[test]
    fn try_pop_for() {
        timeout_ms(
            || {
                let mut q = DelayQueue::new();

                let only = Delay::for_duration("1st", Duration::from_millis(100));

                q.push(only);

                // Timeout shorter than the delay of the item.
                assert_eq!(q.try_pop_for(Duration::from_millis(10)), None);
                // Timeout longer than the delay of the item.
                assert_eq!(
                    q.try_pop_for(Duration::from_millis(200)).unwrap().value,
                    "1st"
                );

                assert!(q.is_empty());

                // Empty queue: nothing to return when the timeout expires.
                assert_eq!(q.try_pop_for(Duration::from_millis(10)), None);
            },
            1000,
        );
    }

    /// A consumer waiting in `try_pop_for` gets an earlier item that is pushed meanwhile.
    #[test]
    fn push_higher_priority_while_waiting_to_try_pop() {
        timeout_ms(
            || {
                let mut producer = DelayQueue::new();

                let first = Delay::until_instant("1st", Instant::now());
                let second = Delay::for_duration("2nd", Duration::from_millis(1000));

                producer.push(second);

                let mut consumer = producer.clone();

                let worker = thread::spawn(move || {
                    assert_eq!(
                        consumer
                            .try_pop_for(Duration::from_millis(100))
                            .unwrap()
                            .value,
                        "1st"
                    );
                    // "2nd" has not expired yet, so it is still in the queue.
                    assert!(!consumer.is_empty());
                });

                thread::sleep(Duration::from_millis(20));
                producer.push(first);

                worker.join().unwrap();
            },
            1000,
        );
    }
}