1use chrono::{Duration, NaiveDateTime, Utc};
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::hash::Hash;
4use std::sync::{Arc, Mutex};
5
6pub struct PQueue<T>
11where
12 T: Eq + Hash + Clone,
13{
14 queue: Arc<Mutex<PriorityQueue<T>>>,
15}
16
17impl<T> Default for PQueue<T>
18where
19 T: Eq + Hash + Clone,
20{
21 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl<T> Clone for PQueue<T>
28where
29 T: Eq + Hash + Clone,
30{
31 fn clone(&self) -> Self {
34 Self {
35 queue: self.queue.clone(),
36 }
37 }
38}
39
40impl<T> PQueue<T>
41where
42 T: Eq + Hash + Clone,
43{
44 pub fn new() -> Self {
46 Self {
47 queue: Arc::new(Mutex::new(PriorityQueue {
48 scores: BTreeMap::new(),
49 items: HashMap::new(),
50 stats: PQueueStatsTracker {
51 start_time: Utc::now().naive_utc(),
52 updates: 0,
53 items: 0,
54 pools: 0,
55 },
56 })),
57 }
58 }
59
60 pub fn update(&self, item: T, new_score: i64) -> (Option<i64>, i64) {
66 let mut queue = self.queue.lock().unwrap();
67
68 queue.update(Arc::new(item), new_score)
69 }
70
71 pub fn peek(&self) -> Option<T> {
76 let queue = self.queue.lock().unwrap();
77
78 queue.peek().map(|arc_item| (*arc_item).clone())
79 }
80
81 pub fn next(&self) -> Option<T> {
86 let mut queue = self.queue.lock().unwrap();
87
88 queue
89 .next()
90 .map(|arc_item| Arc::try_unwrap(arc_item).unwrap_or_else(|arc| (*arc).clone()))
92 }
93
94 pub fn score(&self, item: &T) -> Option<i64> {
99 let queue = self.queue.lock().unwrap();
100
101 queue.score(&Arc::new(item.clone()))
103 }
104
105 pub fn stats(&self) -> PQueueStats {
109 let queue = self.queue.lock().unwrap();
110
111 queue.stats.clone().into()
112 }
113}
114
115#[derive(Clone, Debug)]
117pub struct PQueueStats {
118 pub uptime: Duration,
120 pub version: String,
122 pub updates: i64,
124 pub items: i64,
126 pub pools: i64,
129}
130
131impl From<PQueueStatsTracker> for PQueueStats {
132 fn from(value: PQueueStatsTracker) -> Self {
134 Self {
135 uptime: Utc::now().naive_utc() - value.start_time,
136 version: env!("CARGO_PKG_VERSION").to_string(),
137 updates: value.updates,
138 items: value.items,
139 pools: value.pools,
140 }
141 }
142}
143
144#[derive(Clone, Debug)]
146struct PQueueStatsTracker {
147 start_time: NaiveDateTime,
149 updates: i64,
151 items: i64,
153 pools: i64,
156}
157
158struct PriorityQueue<T>
164where
165 T: Eq + Hash,
166{
167 scores: BTreeMap<i64, VecDeque<Arc<T>>>,
169 items: HashMap<Arc<T>, i64>,
171 stats: PQueueStatsTracker,
173}
174
175impl<T> PriorityQueue<T>
176where
177 T: Eq + Hash + Clone,
178{
179 pub fn update(&mut self, item: Arc<T>, new_score: i64) -> (Option<i64>, i64) {
185 let mut old_score = None;
186 let mut new_score = new_score;
187
188 self.stats.updates += 1;
189 if let Some(¤t_score) = self.items.get(&item) {
190 old_score = Some(current_score);
191
192 self.remove_item(&item, current_score);
193 new_score += current_score;
195 } else {
196 self.stats.items += 1;
197 }
198
199 self.items.insert(item.clone(), new_score);
200 if !self.scores.contains_key(&new_score) {
202 self.stats.pools += 1;
203 }
204 self.scores.entry(new_score).or_default().push_back(item);
205
206 (old_score, new_score)
207 }
208
209 pub fn peek(&self) -> Option<Arc<T>> {
214 self.scores
215 .iter()
216 .next_back()
217 .and_then(|(_, items)| items.iter().next().cloned())
218 }
219
220 pub fn next(&mut self) -> Option<Arc<T>> {
225 if let Some((&score, items)) = self.scores.iter_mut().next_back() {
226 let item = items.pop_front();
227 if let Some(item) = item {
228 if items.is_empty() {
230 self.scores.remove(&score);
231 self.stats.pools -= 1;
232 }
233 self.items.remove(&item);
234 self.stats.items -= 1;
235 Some(item)
236 } else {
237 self.scores.remove(&score);
239 self.stats.pools -= 1;
240 None
241 }
242 } else {
243 None
244 }
245 }
246
247 pub fn score(&self, item: &Arc<T>) -> Option<i64> {
252 self.items.get(item).cloned()
253 }
254
255 fn remove_item(&mut self, item: &Arc<T>, score: i64) {
258 if let Some(items) = self.scores.get_mut(&score) {
259 items.retain(|i| i != item);
260 if items.is_empty() {
262 self.scores.remove(&score);
263 self.stats.pools -= 1;
264 }
265 }
266 }
267}
268
269pub trait PQueueOperations<T> {
272 fn new() -> Self;
274 fn update(&self, item: T, new_score: i64);
276 fn peek(&self) -> Option<T>;
278 fn next(&self) -> Option<T>;
280 fn score(&self, item: &T) -> Option<i64>;
282 fn stats(&self) -> PQueueStats;
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289
290 #[test]
291 fn test_update_and_peek() {
292 let queue = PQueue::<String>::new();
293 queue.update("item1".to_string(), 10);
294 queue.update("item2".to_string(), 20);
295 assert_eq!(queue.peek(), Some("item2".to_string()));
296 }
297
298 #[test]
299 fn test_next() {
300 let queue = PQueue::<String>::new();
301 queue.update("item1".to_string(), 10);
302 queue.update("item2".to_string(), 20);
303 assert_eq!(queue.next(), Some("item2".to_string()));
304 assert_eq!(queue.peek(), Some("item1".to_string()));
305 }
306
307 #[test]
308 fn test_update_existing_item() {
309 let queue = PQueue::<String>::new();
310 let (old_score, new_score) = queue.update("item1".to_string(), 10);
311 assert_eq!(old_score, None);
312 assert_eq!(new_score, 10);
313
314 let (old_score, new_score) = queue.update("item1".to_string(), 20);
315 assert_eq!(old_score, Some(10));
316 assert_eq!(new_score, 30);
317
318 assert_eq!(queue.score(&"item1".to_string()), Some(new_score));
319 }
320
321 #[test]
322 fn test_next_on_empty() {
323 let queue = PQueue::<String>::new();
324 assert_eq!(queue.next(), None);
325 }
326
327 #[test]
328 fn test_score_retrieval() {
329 let queue = PQueue::<String>::new();
330 queue.update("item1".to_string(), 10);
331 queue.update("item2".to_string(), 20);
332 assert_eq!(queue.score(&"item1".to_string()), Some(10));
333 assert_eq!(queue.score(&"item2".to_string()), Some(20));
334 }
335
336 #[test]
337 fn test_score_after_update() {
338 let queue = PQueue::<String>::new();
339 queue.update("item1".to_string(), 10);
340 queue.update("item1".to_string(), 20); assert_eq!(queue.score(&"item1".to_string()), Some(30)); }
343
344 #[test]
345 fn test_stats_after_operations() {
346 let queue = PQueue::<String>::new();
347 queue.update("item1".to_string(), 10);
348 queue.update("item2".to_string(), 20);
349 queue.next();
350 let stats = queue.stats();
351 assert_eq!(stats.updates, 2);
352 assert_eq!(stats.items, 1); assert_eq!(stats.pools, 1); }
355
356 #[test]
357 fn test_removal_of_items() {
358 let queue = PQueue::<String>::new();
359 queue.update("item1".to_string(), 10);
360 queue.update("item2".to_string(), 20);
361 queue.next(); assert_eq!(queue.score(&"item2".to_string()), None); }
364
365 #[test]
366 fn test_complex_scenario() {
367 let queue = PQueue::<String>::new();
368 let queue2 = queue.clone();
369 queue.update("item1".to_string(), 10);
370 queue.update("item2".to_string(), 15);
371 queue2.update("item3".to_string(), 22);
374 queue2.update("item4".to_string(), 15);
375 queue.update("item1".to_string(), 6); assert_eq!(queue.peek(), Some("item3".to_string())); queue.next(); assert_eq!(queue.peek(), Some("item1".to_string())); queue.next(); assert_eq!(queue.peek(), Some("item2".to_string())); queue.next(); assert_eq!(queue.peek(), Some("item4".to_string())); }
384}