concurrent_pqueue/
lib.rs

1use chrono::{Duration, NaiveDateTime, Utc};
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::hash::Hash;
4use std::sync::{Arc, Mutex};
5
6/// Priority queue wrapper with internal synchronization using Arc and Mutex for thread safety.
7///
8/// You can clone this and pass it to multiple threads to share the same internal queue. Cloning
9/// will not copy the data, but instead, each cloned instance will point to the same internal queue.
10pub struct PQueue<T>
11where
12    T: Eq + Hash + Clone,
13{
14    queue: Arc<Mutex<PriorityQueue<T>>>,
15}
16
17impl<T> Default for PQueue<T>
18where
19    T: Eq + Hash + Clone,
20{
21    /// Creates a new empty priority queue using default initialization.
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl<T> Clone for PQueue<T>
28where
29    T: Eq + Hash + Clone,
30{
31    /// Creates a shallow clone that shares the same internal queue.
32    /// Multiple cloned instances will operate on the same underlying data.
33    fn clone(&self) -> Self {
34        Self {
35            queue: self.queue.clone(),
36        }
37    }
38}
39
40impl<T> PQueue<T>
41where
42    T: Eq + Hash + Clone,
43{
44    /// Creates a new empty priority queue with thread-safe `Arc<Mutex<T>>` wrapper.
45    pub fn new() -> Self {
46        Self {
47            queue: Arc::new(Mutex::new(PriorityQueue {
48                scores: BTreeMap::new(),
49                items: HashMap::new(),
50                stats: PQueueStatsTracker {
51                    start_time: Utc::now().naive_utc(),
52                    updates: 0,
53                    items: 0,
54                    pools: 0,
55                },
56            })),
57        }
58    }
59
60    /// Update the score of an item in the queue or adds it if it doesn't yet
61    /// exist.
62    ///
63    /// Returns a tuple of the old score (`None` if the item didn't yet exist)
64    /// and the new score.
65    pub fn update(&self, item: T, new_score: i64) -> (Option<i64>, i64) {
66        let mut queue = self.queue.lock().unwrap();
67
68        queue.update(Arc::new(item), new_score)
69    }
70
71    /// Peek at the highest scoring item in the queue.
72    ///
73    /// Returns the item with the highest score, or `None` if the queue is
74    /// empty.
75    pub fn peek(&self) -> Option<T> {
76        let queue = self.queue.lock().unwrap();
77
78        queue.peek().map(|arc_item| (*arc_item).clone())
79    }
80
81    /// Remove and return the highest scoring item from the queue.
82    ///
83    /// Returns the item with the highest score, or `None` if the queue is
84    /// empty.
85    pub fn next(&self) -> Option<T> {
86        let mut queue = self.queue.lock().unwrap();
87
88        queue
89            .next()
90            // Attempt to unwrap Arc, fallback to clone if other references exist
91            .map(|arc_item| Arc::try_unwrap(arc_item).unwrap_or_else(|arc| (*arc).clone()))
92    }
93
94    /// Get the current score of an item in the queue.
95    ///
96    /// Returns the score of the item, or `None` if the item doesn't exist in
97    /// the queue.
98    pub fn score(&self, item: &T) -> Option<i64> {
99        let queue = self.queue.lock().unwrap();
100
101        // Create Arc wrapper for lookup (HashMap key consistency)
102        queue.score(&Arc::new(item.clone()))
103    }
104
105    /// Get the statistics of the priority queue.
106    ///
107    /// Returns the statistics of the priority queue.
108    pub fn stats(&self) -> PQueueStats {
109        let queue = self.queue.lock().unwrap();
110
111        queue.stats.clone().into()
112    }
113}
114
115/// Statistics for the priority queue, returned by the `stats` method.
116#[derive(Clone, Debug)]
117pub struct PQueueStats {
118    /// The time since the priority queue was instantiated
119    pub uptime: Duration,
120    /// The version of the priority queue lib
121    pub version: String,
122    /// The count of update calls made to the queue since it was started
123    pub updates: i64,
124    /// The count of items currently in the queue
125    pub items: i64,
126    /// The count of separate score pools in the queue (a pool is just a set
127    /// of items with the same score)
128    pub pools: i64,
129}
130
131impl From<PQueueStatsTracker> for PQueueStats {
132    /// Converts internal stats tracker to public stats format with computed uptime.
133    fn from(value: PQueueStatsTracker) -> Self {
134        Self {
135            uptime: Utc::now().naive_utc() - value.start_time,
136            version: env!("CARGO_PKG_VERSION").to_string(),
137            updates: value.updates,
138            items: value.items,
139            pools: value.pools,
140        }
141    }
142}
143
144/// Statistics tracker for the priority queue
145#[derive(Clone, Debug)]
146struct PQueueStatsTracker {
147    /// The time the priority queue was instantiated
148    start_time: NaiveDateTime,
149    /// The count of update calls made to the queue since it was started
150    updates: i64,
151    /// The count of items currently in the queue
152    items: i64,
153    /// The count of separate score pools in the queue (a pool is just a set
154    /// of items with the same score)
155    pools: i64,
156}
157
158/// The core priority queue structure using a dual-index design:
159/// - BTreeMap for ordered access to scores (highest first)
160/// - HashMap for O(1) item-to-score lookups
161///
162/// Items with the same score are stored in a VecDeque for FIFO ordering.
163struct PriorityQueue<T>
164where
165    T: Eq + Hash,
166{
167    /// Maps scores to queues of items (BTreeMap keeps scores sorted)
168    scores: BTreeMap<i64, VecDeque<Arc<T>>>,
169    /// Maps items to their current scores for fast lookups
170    items: HashMap<Arc<T>, i64>,
171    /// Internal statistics tracking
172    stats: PQueueStatsTracker,
173}
174
175impl<T> PriorityQueue<T>
176where
177    T: Eq + Hash + Clone,
178{
179    /// Update the score of an item in the queue or adds it if it doesn't yet
180    /// exist.
181    ///
182    /// Returns a tuple of the old score (`None` if the item didn't yet exist)
183    /// and the new score.
184    pub fn update(&mut self, item: Arc<T>, new_score: i64) -> (Option<i64>, i64) {
185        let mut old_score = None;
186        let mut new_score = new_score;
187
188        self.stats.updates += 1;
189        if let Some(&current_score) = self.items.get(&item) {
190            old_score = Some(current_score);
191
192            self.remove_item(&item, current_score);
193            // Additive scoring: new score is added to existing score
194            new_score += current_score;
195        } else {
196            self.stats.items += 1;
197        }
198
199        self.items.insert(item.clone(), new_score);
200        // Track pool creation: a pool is a set of items with the same score
201        if !self.scores.contains_key(&new_score) {
202            self.stats.pools += 1;
203        }
204        self.scores.entry(new_score).or_default().push_back(item);
205
206        (old_score, new_score)
207    }
208
209    /// Peek at the highest scoring item in the queue.
210    ///
211    /// Returns the item with the highest score, or `None` if the queue is
212    /// empty.
213    pub fn peek(&self) -> Option<Arc<T>> {
214        self.scores
215            .iter()
216            .next_back()
217            .and_then(|(_, items)| items.iter().next().cloned())
218    }
219
220    /// Remove and return the highest scoring item from the queue.
221    ///
222    /// Returns the item with the highest score, or `None` if the queue is
223    /// empty.
224    pub fn next(&mut self) -> Option<Arc<T>> {
225        if let Some((&score, items)) = self.scores.iter_mut().next_back() {
226            let item = items.pop_front();
227            if let Some(item) = item {
228                // Clean up empty pools to maintain accurate pool count
229                if items.is_empty() {
230                    self.scores.remove(&score);
231                    self.stats.pools -= 1;
232                }
233                self.items.remove(&item);
234                self.stats.items -= 1;
235                Some(item)
236            } else {
237                // Edge case: empty pool cleanup
238                self.scores.remove(&score);
239                self.stats.pools -= 1;
240                None
241            }
242        } else {
243            None
244        }
245    }
246
247    /// Get the current score of an item in the queue.
248    ///
249    /// Returns the score of the item, or `None` if the item doesn't exist in
250    /// the queue.
251    pub fn score(&self, item: &Arc<T>) -> Option<i64> {
252        self.items.get(item).cloned()
253    }
254
255    /// Removes an item from a specific score pool and cleans up empty pools.
256    /// Used internally when updating existing items to new scores.
257    fn remove_item(&mut self, item: &Arc<T>, score: i64) {
258        if let Some(items) = self.scores.get_mut(&score) {
259            items.retain(|i| i != item);
260            // Clean up empty pools to prevent memory leaks and maintain accurate stats
261            if items.is_empty() {
262                self.scores.remove(&score);
263                self.stats.pools -= 1;
264            }
265        }
266    }
267}
268
269/// Trait defining the core operations for a priority queue.
270/// This abstraction allows for different implementations while maintaining a consistent API.
271pub trait PQueueOperations<T> {
272    /// Creates a new empty priority queue.
273    fn new() -> Self;
274    /// Updates an item's score (additive) or adds it if it doesn't exist.
275    fn update(&self, item: T, new_score: i64);
276    /// Returns the highest-scoring item without removing it.
277    fn peek(&self) -> Option<T>;
278    /// Removes and returns the highest-scoring item.
279    fn next(&self) -> Option<T>;
280    /// Gets the current score for a specific item.
281    fn score(&self, item: &T) -> Option<i64>;
282    /// Returns current queue statistics.
283    fn stats(&self) -> PQueueStats;
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the owned `String` keys used throughout these tests.
    fn key(name: &str) -> String {
        name.to_owned()
    }

    /// Builds a fresh queue and applies each `(item, score)` update in order.
    fn queue_of(updates: &[(&str, i64)]) -> PQueue<String> {
        let queue = PQueue::new();
        for &(name, score) in updates {
            queue.update(key(name), score);
        }
        queue
    }

    #[test]
    fn test_update_and_peek() {
        let queue = queue_of(&[("item1", 10), ("item2", 20)]);
        assert_eq!(queue.peek(), Some(key("item2")));
    }

    #[test]
    fn test_next() {
        let queue = queue_of(&[("item1", 10), ("item2", 20)]);
        assert_eq!(queue.next(), Some(key("item2")));
        assert_eq!(queue.peek(), Some(key("item1")));
    }

    #[test]
    fn test_update_existing_item() {
        let queue = PQueue::<String>::new();

        // First update inserts the item, so there is no previous score.
        let (previous, total) = queue.update(key("item1"), 10);
        assert_eq!(previous, None);
        assert_eq!(total, 10);

        // Second update is added on top of the existing score.
        let (previous, total) = queue.update(key("item1"), 20);
        assert_eq!(previous, Some(10));
        assert_eq!(total, 30);

        assert_eq!(queue.score(&key("item1")), Some(total));
    }

    #[test]
    fn test_next_on_empty() {
        let queue = PQueue::<String>::new();
        assert_eq!(queue.next(), None);
    }

    #[test]
    fn test_score_retrieval() {
        let queue = queue_of(&[("item1", 10), ("item2", 20)]);
        assert_eq!(queue.score(&key("item1")), Some(10));
        assert_eq!(queue.score(&key("item2")), Some(20));
    }

    #[test]
    fn test_score_after_update() {
        // Updating the same item twice accumulates its score.
        let queue = queue_of(&[("item1", 10), ("item1", 20)]);
        assert_eq!(queue.score(&key("item1")), Some(30));
    }

    #[test]
    fn test_stats_after_operations() {
        let queue = queue_of(&[("item1", 10), ("item2", 20)]);
        queue.next();

        let stats = queue.stats();
        assert_eq!(stats.updates, 2);
        assert_eq!(stats.items, 1); // one of the two items has been removed
        assert_eq!(stats.pools, 1); // and its pool went with it
    }

    #[test]
    fn test_removal_of_items() {
        let queue = queue_of(&[("item1", 10), ("item2", 20)]);
        queue.next(); // removes "item2", the highest-scoring item
        assert_eq!(queue.score(&key("item2")), None);
    }

    #[test]
    fn test_complex_scenario() {
        let queue = PQueue::<String>::new();
        let shared = queue.clone();

        queue.update(key("item1"), 10);
        queue.update(key("item2"), 15);
        // The queue and its clone must share one internal queue: items added through the
        // clone have to be visible when peeking through the original handle.
        shared.update(key("item3"), 22);
        shared.update(key("item4"), 15);
        queue.update(key("item1"), 6); // item1 now scores 16

        // item3 has the highest score. After each removal the next item surfaces: item1 (16),
        // then item2, which reached score 15 before item4 did, then item4.
        assert_eq!(queue.peek(), Some(key("item3")));
        for &expected in &["item1", "item2", "item4"] {
            queue.next();
            assert_eq!(queue.peek(), Some(key(expected)));
        }
    }
}