komora_sync/
priority_queue.rs

1use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd};
2use std::collections::BinaryHeap;
3use std::sync::{Condvar, Mutex};
4
5pub struct PriorityQueue<T> {
6    q: Mutex<BinaryHeap<Prioritized<T>>>,
7    cv: Condvar,
8}
9
10struct Prioritized<T> {
11    priority: u64,
12    t: T,
13}
14
15impl<T> Ord for Prioritized<T> {
16    fn cmp(&self, rhs: &Prioritized<T>) -> Ordering {
17        self.priority.cmp(&rhs.priority)
18    }
19}
20
21impl<T> PartialOrd for Prioritized<T> {
22    fn partial_cmp(&self, rhs: &Prioritized<T>) -> Option<Ordering> {
23        Some(self.cmp(rhs))
24    }
25}
26
27impl<T> PartialEq for Prioritized<T> {
28    fn eq(&self, rhs: &Prioritized<T>) -> bool {
29        self.priority == rhs.priority
30    }
31}
32
33impl<T> Eq for Prioritized<T> {}
34
35impl<T> PriorityQueue<T> {
36    pub fn new() -> PriorityQueue<T> {
37        PriorityQueue {
38            q: Mutex::default(),
39            cv: Condvar::new(),
40        }
41    }
42
43    /// Higher priority gets popped first.
44    ///
45    /// # Examples
46    /// ```
47    /// let pq = komora_sync::PriorityQueue::new();
48    /// pq.push(2, 2);
49    /// pq.push(1, 1);
50    /// pq.push(4, 4);
51    /// pq.push(1, 1);
52    /// assert_eq!(pq.pop(), 4);
53    /// assert_eq!(pq.pop(), 2);
54    /// assert_eq!(pq.pop(), 1);
55    /// assert_eq!(pq.pop(), 1);
56    /// ```
57    pub fn push(&self, t: T, priority: u64) {
58        let mut q = self.q.lock().unwrap();
59        q.push(Prioritized { t, priority });
60        drop(q);
61        self.cv.notify_one();
62    }
63
64    pub fn pop(&self) -> T {
65        let mut q = self.q.lock().unwrap();
66
67        while q.is_empty() {
68            q = self.cv.wait(q).unwrap();
69        }
70
71        q.pop().unwrap().t
72    }
73}