delay_queue/
delay_queue.rs

1use std::collections::BinaryHeap;
2use std::sync::{Arc, Condvar, Mutex, MutexGuard};
3use std::time::{Duration, Instant};
4use std::cmp::Ordering;
5use delayed::Delayed;
6
7/// A concurrent unbounded blocking queue where each item can only be removed when its delay
8/// expires.
9///
10/// The queue supports multiple producers and multiple consumers.
11///
12/// Items of the queue must implement the `Delayed` trait. In most situations you can just use
13/// the helper struct `Delay` to wrap the values to be used by the queue.
14///
15/// If you implement the `Delayed` trait for your types, keep in mind that the `DelayQueue` assumes
16/// that the `Instant` until which each item is delayed does not change while that item is
17/// in the queue.
18///
19/// # Examples
20///
21/// Basic usage:
22///
23/// ```no_run
24/// use delay_queue::{Delay, DelayQueue};
25/// use std::time::{Duration, Instant};
26///
27/// let mut queue = DelayQueue::new();
28/// queue.push(Delay::for_duration("2nd", Duration::from_secs(5)));
29/// queue.push(Delay::until_instant("1st", Instant::now()));
30///
31/// println!("First pop: {}", queue.pop().value);
32/// println!("Second pop: {}", queue.pop().value);
33/// assert!(queue.is_empty());
34/// ```
35#[derive(Debug)]
36pub struct DelayQueue<T: Delayed> {
37    /// Points to the data that is shared between instances of the same queue (created by
38    /// cloning a queue). Usually the different instances of a queue will live in different
39    /// threads.
40    shared_data: Arc<DelayQueueSharedData<T>>,
41}
42
43/// The underlying data of a queue.
44///
45/// When a `DelayQueue` is cloned, it's clone will point to the same `DelayQueueSharedData`.
46/// This is done so a queue be used by different threads.
47#[derive(Debug)]
48struct DelayQueueSharedData<T: Delayed> {
49    /// Mutex protected `BinaryHeap` that holds the items of the queue in the order that they
50    /// should be popped.
51    queue: Mutex<BinaryHeap<Entry<T>>>,
52
53    /// Condition variable that signals when there is a new item at the head of the queue.
54    condvar_new_head: Condvar,
55}
56
57impl<T: Delayed> DelayQueue<T> {
58    /// Creates an empty `DelayQueue<T>`.
59    ///
60    /// # Examples
61    ///
62    /// Basic usage:
63    ///
64    /// ```
65    /// use delay_queue::{Delay, DelayQueue};
66    ///
67    /// let mut queue : DelayQueue<Delay<i32>>  = DelayQueue::new();
68    /// ```
69    pub fn new() -> DelayQueue<T> {
70        DelayQueue {
71            shared_data: Arc::new(DelayQueueSharedData {
72                queue: Mutex::new(BinaryHeap::new()),
73                condvar_new_head: Condvar::new(),
74            }),
75        }
76    }
77
78    /// Creates an empty `DelayQueue<T>` with a specific capacity.
79    /// This preallocates enough memory for `capacity` elements,
80    /// so that the `DelayQueue` does not have to be reallocated
81    /// until it contains at least that many values.
82    ///
83    /// # Examples
84    ///
85    /// Basic usage:
86    ///
87    /// ```
88    /// use delay_queue::{Delay, DelayQueue};
89    ///
90    /// let mut queue : DelayQueue<Delay<&str>>  = DelayQueue::with_capacity(10);
91    /// ```
92    pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
93        DelayQueue {
94            shared_data: Arc::new(DelayQueueSharedData {
95                queue: Mutex::new(BinaryHeap::with_capacity(capacity)),
96                condvar_new_head: Condvar::new(),
97            }),
98        }
99    }
100
101    /// Pushes an item onto the queue.
102    ///
103    /// # Examples
104    ///
105    /// Basic usage:
106    ///
107    /// ```
108    /// use delay_queue::{Delay, DelayQueue};
109    /// use std::time::Duration;
110    ///
111    /// let mut queue = DelayQueue::new();
112    /// queue.push(Delay::for_duration("2nd", Duration::from_secs(5)));
113    /// ```
114    pub fn push(&mut self, item: T) {
115        let mut queue = self.shared_data.queue.lock().unwrap();
116
117        {
118            // If the new item goes to the head of the queue then notify consumers
119            let cur_head = queue.peek();
120            if (cur_head == None)
121                || (item.delayed_until() < cur_head.unwrap().delayed.delayed_until())
122            {
123                self.shared_data.condvar_new_head.notify_one();
124            }
125        }
126
127        queue.push(Entry::new(item));
128    }
129
130    /// Pops the next item from the queue, blocking if necessary until an item is available and its
131    /// delay has expired.
132    ///
133    /// # Examples
134    ///
135    /// Basic usage:
136    ///
137    /// ```no_run
138    /// use delay_queue::{Delay, DelayQueue};
139    /// use std::time::{Duration, Instant};
140    ///
141    /// let mut queue = DelayQueue::new();
142    ///
143    /// queue.push(Delay::until_instant("1st", Instant::now()));
144    ///
145    /// // The pop will not block, since the delay has expired.
146    /// println!("First pop: {}", queue.pop().value);
147    ///
148    /// queue.push(Delay::for_duration("2nd", Duration::from_secs(5)));
149    ///
150    /// // The pop will block for approximately 5 seconds before returning the item.
151    /// println!("Second pop: {}", queue.pop().value);
152    /// ```
153    pub fn pop(&mut self) -> T {
154        let mut queue = self.shared_data.queue.lock().unwrap();
155
156        // Loop until an element can be popped, waiting if necessary
157        loop {
158            let wait_duration = match queue.peek() {
159                Some(elem) => {
160                    let now = Instant::now();
161                    // If there is an element and its delay is expired
162                    // break out of the loop to pop it
163                    if elem.delayed.delayed_until() <= now {
164                        break;
165                    }
166                    // Otherwise, calculate the Duration until the element expires
167                    elem.delayed.delayed_until() - now
168                }
169
170                // Signal that there is no element with a duration of zero
171                None => Duration::from_secs(0),
172            };
173
174            // Wait until there is a new head of the queue
175            // or the time to pop the current head expires
176            queue = if wait_duration > Duration::from_secs(0) {
177                self.shared_data
178                    .condvar_new_head
179                    .wait_timeout(queue, wait_duration)
180                    .unwrap()
181                    .0
182            } else {
183                self.shared_data.condvar_new_head.wait(queue).unwrap()
184            };
185        }
186
187        self.force_pop(queue)
188    }
189
190    /// Pops the next item from the queue, blocking if necessary until an item is available and its
191    /// delay has expired or until the given timeout expires.
192    ///
193    /// Returns `None` if the given timeout expires and no item became available to be popped.
194    ///
195    /// # Examples
196    ///
197    /// Basic usage:
198    ///
199    /// ```no_run
200    /// use delay_queue::{Delay, DelayQueue};
201    /// use std::time::Duration;
202    ///
203    /// let mut queue = DelayQueue::new();
204    ///
205    /// queue.push(Delay::for_duration("1st", Duration::from_secs(5)));
206    ///
207    /// // The pop will block for approximately 2 seconds before returning None.
208    /// println!("First pop: {:?}",
209    ///          queue.try_pop_for(Duration::from_secs(2))); // Prints "None"
210    ///
211    /// // The pop will block for approximately 3 seconds before returning the item.
212    /// println!("Second pop: {}",
213    ///          queue.try_pop_for(Duration::from_secs(5)).unwrap().value); // Prints "1st"
214    /// ```
215    pub fn try_pop_for(&mut self, timeout: Duration) -> Option<T> {
216        self.try_pop_until(Instant::now() + timeout)
217    }
218
219    /// Pops the next item from the queue, blocking if necessary until an item is available and its
220    /// delay has expired or until the given `Instant` is reached.
221    ///
222    /// Returns `None` if the given `Instant` is reached and no item became available to be popped.
223    ///
224    /// # Examples
225    ///
226    /// Basic usage:
227    ///
228    /// ```no_run
229    /// use delay_queue::{Delay, DelayQueue};
230    /// use std::time::{Duration, Instant};
231    ///
232    /// let mut queue = DelayQueue::new();
233    ///
234    /// queue.push(Delay::for_duration("1st", Duration::from_secs(5)));
235    ///
236    /// // The pop will block for approximately 2 seconds before returning None.
237    /// println!("First pop: {:?}",
238    ///          queue.try_pop_until(Instant::now() + Duration::from_secs(2))); // Prints "None"
239    ///
240    /// // The pop will block for approximately 3 seconds before returning the item.
241    /// println!("Second pop: {}",
242    ///          queue.try_pop_until(Instant::now() + Duration::from_secs(5))
243    ///               .unwrap().value); // Prints "1st"
244    /// ```
245    pub fn try_pop_until(&mut self, try_until: Instant) -> Option<T> {
246        let mut queue = self.shared_data.queue.lock().unwrap();
247
248        // Loop until an element can be popped or the timeout expires, waiting if necessary
249        loop {
250            let now = Instant::now();
251
252            let next_elem_duration = match queue.peek() {
253                // If there is an element and its delay is expired, break out of the loop to pop it
254                Some(elem) if elem.delayed.delayed_until() <= now => break,
255
256                // Calculate the Duration until the element expires
257                Some(elem) => elem.delayed.delayed_until() - now,
258
259                // Signal that there is no element with a duration of zero
260                None => Duration::from_secs(0),
261            };
262
263            if now >= try_until {
264                return None;
265            }
266
267            let time_left = try_until - now;
268
269            let wait_duration = if next_elem_duration > Duration::from_secs(0) {
270                // We'll wait until the time to pop the next element is reached
271                // or our timeout expires, whichever comes first
272                next_elem_duration.min(time_left)
273            } else {
274                // There is no element in the queue, we'll wait for one until our timeout expires
275                time_left
276            };
277
278            // Wait until there is a new head of the queue,
279            // the time to pop the current head expires,
280            // or the timeout expires
281            queue = self.shared_data
282                .condvar_new_head
283                .wait_timeout(queue, wait_duration)
284                .unwrap()
285                .0
286        }
287
288        Some(self.force_pop(queue))
289    }
290
291    /// Checks if the queue is empty.
292    ///
293    /// # Examples
294    ///
295    /// Basic usage:
296    ///
297    /// ```
298    /// use delay_queue::{Delay, DelayQueue};
299    /// use std::time::Instant;
300    ///
301    /// let mut queue = DelayQueue::new();
302    /// queue.push(Delay::until_instant("val", Instant::now()));
303    ///
304    /// assert!(!queue.is_empty());
305    ///
306    /// println!("First pop: {}", queue.pop().value);
307    ///
308    /// assert!(queue.is_empty());
309    /// ```
310    pub fn is_empty(&self) -> bool {
311        let queue = self.shared_data.queue.lock().unwrap();
312        queue.is_empty()
313    }
314
315    /// Pops an element from the queue, notifying `condvar_new_head` if there are elements still
316    /// left in the queue.
317    ///
318    /// # Panics
319    ///
320    /// Panics if `queue` is empty.
321    fn force_pop(&self, mut queue: MutexGuard<BinaryHeap<Entry<T>>>) -> T {
322        if queue.len() > 1 {
323            self.shared_data.condvar_new_head.notify_one();
324        }
325
326        queue.pop().unwrap().delayed
327    }
328}
329
330impl<T: Delayed> Default for DelayQueue<T> {
331    /// Creates an empty `DelayQueue<T>`.
332    fn default() -> DelayQueue<T> {
333        DelayQueue::new()
334    }
335}
336
337impl<T: Delayed> Clone for DelayQueue<T> {
338    /// Returns a new `DelayQueue` that points to the same underlying data.
339    ///
340    /// This method can be used to share a queue between different threads.
341    ///
342    /// # Examples
343    ///
344    /// Basic usage:
345    ///
346    /// ```no_run
347    /// use delay_queue::{Delay, DelayQueue};
348    /// use std::time::Duration;
349    /// use std::thread;
350    ///
351    /// let mut queue = DelayQueue::new();
352    ///
353    /// queue.push(Delay::for_duration("1st", Duration::from_secs(1)));
354    ///
355    /// let mut cloned_queue = queue.clone();
356    ///
357    /// let handle = thread::spawn(move || {
358    ///     println!("First pop: {}", cloned_queue.pop().value);
359    ///     println!("Second pop: {}", cloned_queue.pop().value);
360    /// });
361    ///
362    /// queue.push(Delay::for_duration("2nd", Duration::from_secs(2)));
363    ///
364    /// handle.join().unwrap();
365    /// ```
366    fn clone(&self) -> DelayQueue<T> {
367        DelayQueue {
368            shared_data: self.shared_data.clone(),
369        }
370    }
371}
372
373
374/// An entry in the `DelayQueue`.
375///
376/// Holds a `Delayed` item and implements an ordering based on delay `Instant`s of the items.
377#[derive(Debug)]
378struct Entry<T: Delayed> {
379    delayed: T,
380}
381
382impl<T: Delayed> Entry<T> {
383    fn new(delayed: T) -> Entry<T> {
384        Entry { delayed }
385    }
386}
387
388/// Implements ordering for `Entry`, so it can be used to correctly order elements in the
389/// `BinaryHeap` of the `DelayQueue`.
390///
391/// Earlier entries have higher priority (should be popped first), so they are Greater that later
392/// entries.
393impl<T: Delayed> Ord for Entry<T> {
394    fn cmp(&self, other: &Entry<T>) -> Ordering {
395        other
396            .delayed
397            .delayed_until()
398            .cmp(&self.delayed.delayed_until())
399    }
400}
401
402impl<T: Delayed> PartialOrd for Entry<T> {
403    fn partial_cmp(&self, other: &Entry<T>) -> Option<Ordering> {
404        Some(self.cmp(other))
405    }
406}
407
408impl<T: Delayed> PartialEq for Entry<T> {
409    fn eq(&self, other: &Entry<T>) -> bool {
410        self.cmp(other) == Ordering::Equal
411    }
412}
413
414impl<T: Delayed> Eq for Entry<T> {}
415
416
#[cfg(test)]
mod tests {
    extern crate timebomb;

    use self::timebomb::timeout_ms;
    use super::{DelayQueue, Entry};
    use delayed::Delay;
    use std::thread;
    use std::time::{Duration, Instant};

    /// An entry that expires sooner must compare as greater than one that expires later.
    #[test]
    fn entry_comparisons() {
        let in_an_hour = Entry::new(Delay::for_duration("abc", Duration::from_secs(3600)));
        let right_away = Entry::new(Delay::for_duration("def", Duration::from_secs(0)));

        assert_eq!(right_away, right_away);
        assert_ne!(right_away, in_an_hour);

        assert!(right_away > in_an_hour);
        assert!(in_an_hour < right_away);
        assert!(in_an_hour <= in_an_hour);
    }

    /// `is_empty` follows the queue through a push and the matching pop.
    #[test]
    fn is_empty() {
        let scenario = || {
            let mut queue = DelayQueue::new();
            assert!(queue.is_empty());

            queue.push(Delay::until_instant("1st", Instant::now()));
            assert!(!queue.is_empty());

            assert_eq!(queue.pop().value, "1st");
            assert!(queue.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// Items come out in deadline order, whatever order they were pushed in.
    #[test]
    fn push_pop_single_thread() {
        let scenario = || {
            let mut queue = DelayQueue::new();

            let first = Delay::until_instant("1st", Instant::now());
            let second = Delay::for_duration("2nd", Duration::from_millis(20));
            let third = Delay::for_duration("3rd", Duration::from_millis(30));
            let fourth = Delay::for_duration("4th", Duration::from_millis(40));

            queue.push(second);
            queue.push(fourth);
            queue.push(first);

            for expected in &["1st", "2nd"] {
                assert_eq!(queue.pop().value, *expected);
            }

            queue.push(third);

            for expected in &["3rd", "4th"] {
                assert_eq!(queue.pop().value, *expected);
            }

            assert!(queue.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// A cloned handle in another thread pops everything pushed through the original handle.
    #[test]
    fn push_pop_different_thread() {
        let scenario = || {
            let mut producer = DelayQueue::new();

            let first = Delay::until_instant("1st", Instant::now());
            let second = Delay::for_duration("2nd", Duration::from_millis(20));
            let third = Delay::for_duration("3rd", Duration::from_millis(30));
            let fourth = Delay::for_duration("4th", Duration::from_millis(40));

            producer.push(second);
            producer.push(third);
            producer.push(first);

            let mut consumer = producer.clone();

            let worker = thread::spawn(move || {
                for expected in &["1st", "2nd", "3rd", "4th"] {
                    assert_eq!(consumer.pop().value, *expected);
                }
                assert!(consumer.is_empty());
            });

            producer.push(fourth);

            worker.join().unwrap();

            assert!(producer.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// A consumer already blocked on an empty queue is woken up by a later push.
    #[test]
    fn pop_before_push() {
        let scenario = || {
            let mut producer: DelayQueue<Delay<&str>> = DelayQueue::new();
            let mut consumer = producer.clone();

            let worker = thread::spawn(move || {
                let popped = consumer.pop();
                assert_eq!(popped.value, "1st");
                assert!(consumer.is_empty());
            });

            thread::sleep(Duration::from_millis(100));
            producer.push(Delay::for_duration("1st", Duration::from_millis(10)));

            worker.join().unwrap();

            assert!(producer.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// Several consumers blocked on an empty queue each end up with one of the pushed items.
    #[test]
    fn pop_two_before_push() {
        let scenario = || {
            let mut producer: DelayQueue<Delay<&str>> = DelayQueue::new();

            let workers: Vec<_> = (0..3)
                .map(|_| {
                    let mut consumer = producer.clone();
                    thread::spawn(move || {
                        let popped = consumer.pop().value;
                        if popped == "3rd" {
                            assert!(consumer.is_empty());
                        }
                    })
                })
                .collect();

            thread::sleep(Duration::from_millis(100));
            producer.push(Delay::for_duration("1st", Duration::from_millis(10)));
            producer.push(Delay::for_duration("2nd", Duration::from_millis(20)));
            producer.push(Delay::for_duration("3rd", Duration::from_millis(30)));

            for worker in workers {
                worker.join().unwrap();
            }

            assert!(producer.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// An item pushed later but expiring earlier overtakes the one a consumer is waiting on.
    #[test]
    fn push_higher_priority_while_waiting_to_pop() {
        let scenario = || {
            let mut producer: DelayQueue<Delay<&str>> = DelayQueue::new();

            let urgent = Delay::until_instant("1st", Instant::now());
            let relaxed = Delay::for_duration("2nd", Duration::from_millis(100));

            let mut consumer = producer.clone();

            let worker = thread::spawn(move || {
                for expected in &["1st", "2nd"] {
                    assert_eq!(consumer.pop().value, *expected);
                }
                assert!(consumer.is_empty());
            });

            thread::sleep(Duration::from_millis(10));
            producer.push(relaxed);
            thread::sleep(Duration::from_millis(10));
            producer.push(urgent);

            worker.join().unwrap();

            assert!(producer.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// With a deadline of "now", only items that are already expired are returned.
    #[test]
    fn try_pop_until_now() {
        let scenario = || {
            let mut queue = DelayQueue::new();

            let expired = Delay::until_instant("1st", Instant::now());
            let pending = Delay::for_duration("2nd", Duration::from_millis(500));

            queue.push(expired);
            queue.push(pending);

            let popped = queue.try_pop_until(Instant::now()).unwrap();
            assert_eq!(popped.value, "1st");
            assert_eq!(queue.try_pop_until(Instant::now()), None);

            assert!(!queue.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// With a zero timeout, only items that are already expired are returned.
    #[test]
    fn try_pop_for_zero_duration() {
        let scenario = || {
            let mut queue = DelayQueue::new();

            let expired = Delay::until_instant("1st", Instant::now());
            let pending = Delay::for_duration("2nd", Duration::from_millis(500));

            queue.push(expired);
            queue.push(pending);

            let popped = queue.try_pop_for(Duration::from_millis(0)).unwrap();
            assert_eq!(popped.value, "1st");
            assert_eq!(queue.try_pop_for(Duration::from_millis(0)), None);

            assert!(!queue.is_empty());
        };

        timeout_ms(scenario, 1000);
    }

    /// `try_pop_until` gives up when the deadline precedes the item's expiry (or the queue is
    /// empty) and succeeds when the deadline is late enough.
    #[test]
    fn try_pop_until() {
        let scenario = || {
            let mut queue = DelayQueue::new();

            queue.push(Delay::for_duration("1st", Duration::from_millis(100)));

            let too_early = Instant::now() + Duration::from_millis(10);
            assert_eq!(queue.try_pop_until(too_early), None);

            let late_enough = Instant::now() + Duration::from_millis(200);
            let popped = queue.try_pop_until(late_enough).unwrap();
            assert_eq!(popped.value, "1st");

            assert!(queue.is_empty());

            let on_empty_queue = Instant::now() + Duration::from_millis(10);
            assert_eq!(queue.try_pop_until(on_empty_queue), None);
        };

        timeout_ms(scenario, 1000);
    }

    /// `try_pop_for` gives up when the timeout is shorter than the item's delay (or the queue
    /// is empty) and succeeds when the timeout is long enough.
    #[test]
    fn try_pop_for() {
        let scenario = || {
            let mut queue = DelayQueue::new();

            queue.push(Delay::for_duration("1st", Duration::from_millis(100)));

            assert_eq!(queue.try_pop_for(Duration::from_millis(10)), None);

            let popped = queue.try_pop_for(Duration::from_millis(200)).unwrap();
            assert_eq!(popped.value, "1st");

            assert!(queue.is_empty());

            assert_eq!(queue.try_pop_for(Duration::from_millis(10)), None);
        };

        timeout_ms(scenario, 1000);
    }

    /// A timed pop waiting on a far-away head returns an earlier item pushed in the meantime.
    #[test]
    fn push_higher_priority_while_waiting_to_try_pop() {
        let scenario = || {
            let mut producer = DelayQueue::new();

            let urgent = Delay::until_instant("1st", Instant::now());
            let relaxed = Delay::for_duration("2nd", Duration::from_millis(1000));

            producer.push(relaxed);

            let mut consumer = producer.clone();

            let worker = thread::spawn(move || {
                let popped = consumer.try_pop_for(Duration::from_millis(100)).unwrap();
                assert_eq!(popped.value, "1st");
                assert!(!consumer.is_empty());
            });

            thread::sleep(Duration::from_millis(20));
            producer.push(urgent);

            worker.join().unwrap();
        };

        timeout_ms(scenario, 1000);
    }
}