ragc_core/
priority_queue.rs

1// Bounded priority queue for streaming compression pipeline
2// Matches C++ AGC's CBoundedPQueue (queue.h:153-346)
3
4use std::collections::BinaryHeap;
5use std::sync::{Arc, Condvar, Mutex};
6
/// Status code returned together with the optional item by a pop operation.
///
/// Matches C++ AGC's result_t enum (queue.h:171):
/// ```cpp
/// enum class result_t { empty, completed, normal };
/// ```
///
/// The status is paired with an `Option<T>`: only `Normal` is accompanied by
/// `Some(item)`; `Empty` and `Completed` are accompanied by `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PopResult {
    /// Queue is empty but producers are still active (wait and retry)
    Empty,
    /// Queue is empty AND no producers remain (exit worker loop)
    Completed,
    /// Successfully popped an item
    Normal,
}
22
/// Priority queue entry
///
/// Matches C++ AGC's multimap key: `pair<size_t, size_t>` (queue.h:155)
/// - First: priority (higher = processed first)
/// - Second: cost (for capacity limiting)
///
/// Equality and ordering are both defined on the `(priority, cost)` key only;
/// the payload never takes part in a comparison. Keeping `PartialEq`/`Eq`
/// consistent with `Ord` is required by the `Ord` contract, and it also means
/// the payload type needs no comparison traits of its own.
#[derive(Debug, Clone)]
struct QueueEntry<T> {
    priority: usize,
    cost: usize,
    data: T,
}

impl<T> QueueEntry<T> {
    /// The `(priority, cost)` pair that every comparison is based on.
    fn key(&self) -> (usize, usize) {
        (self.priority, self.cost)
    }
}

impl<T> PartialEq for QueueEntry<T> {
    fn eq(&self, other: &Self) -> bool {
        // Must agree with `Ord::cmp`: equal keys <=> `Ordering::Equal`.
        self.key() == other.key()
    }
}

impl<T> Eq for QueueEntry<T> {}

impl<T> PartialOrd for QueueEntry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for QueueEntry<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Compare by (priority, cost) - matches C++ multimap ordering
        // Note: BinaryHeap is max-heap, which matches PopLarge() behavior
        self.key().cmp(&other.key())
    }
}
54
/// Internal queue state
///
/// All fields are read and written only while holding the mutex that wraps
/// this struct inside `BoundedPriorityQueue`.
struct QueueState<T> {
    /// Priority queue (max-heap by priority, then cost)
    queue: BinaryHeap<QueueEntry<T>>,
    /// Number of active producers; decremented by `mark_completed`
    n_producers: usize,
    /// Current total cost in queue (grows on emplace, shrinks on pop)
    current_cost: usize,
    /// Maximum allowed cost; `emplace` blocks while `current_cost >= max_cost`
    max_cost: usize,
}
66
/// Bounded priority queue with capacity limiting
///
/// This matches C++ AGC's CBoundedPQueue (queue.h:153-346):
/// ```cpp
/// template<typename T> class CBoundedPQueue {
///     typedef multimap<pair<size_t, size_t>, T> queue_t;
///     // ...
///     void Emplace(T&& data, size_t priority, size_t cost);
///     result_t PopLarge(T& data);
///     void MarkCompleted();
/// }
/// ```
///
/// Key features:
/// - **Priority ordering**: Higher priority items processed first
/// - **Capacity limiting**: `emplace` blocks while the summed cost is at or
///   above `max_cost`; the check happens before insertion, so the total may
///   exceed `max_cost` by the cost of the most recently added item
/// - **Thread-safe**: Multiple producers and consumers
/// - **Completion signaling**: MarkCompleted() when producer done
///
/// Cloning the queue yields another handle to the same shared state.
///
/// Example:
/// ```no_run
/// use ragc_core::priority_queue::{BoundedPriorityQueue, PopResult};
///
/// let queue = BoundedPriorityQueue::new(2, 1000); // 2 producers, 1000 max cost
///
/// // Producer thread
/// queue.emplace("task1".to_string(), 100, 50);  // priority 100, cost 50
/// queue.mark_completed();  // This producer is done
///
/// // Consumer thread
/// loop {
///     match queue.pop_large() {
///         (PopResult::Normal, Some(data)) => {
///             // Process data
///         }
///         (PopResult::Empty, None) => {
///             continue;  // Wait for more
///         }
///         (PopResult::Completed, None) => {
///             break;  // All done
///         }
///         _ => {
///             // Handle other cases
///         }
///     }
/// }
/// ```
pub struct BoundedPriorityQueue<T> {
    // Shared tuple: (locked state, "queue empty" condvar that consumers wait
    // on, "queue full" condvar that producers wait on).
    state: Arc<(Mutex<QueueState<T>>, Condvar, Condvar)>,
}
117
118impl<T> BoundedPriorityQueue<T>
119where
120    T: Clone + Eq,
121{
122    /// Create a new bounded priority queue
123    ///
124    /// Matches C++ AGC's constructor (queue.h:174-180):
125    /// ```cpp
126    /// CBoundedPQueue(const int _n_producers, const size_t _max_cost);
127    /// ```
128    ///
129    /// # Arguments
130    /// * `n_producers` - Number of producer threads
131    /// * `max_cost` - Maximum total cost allowed in queue
132    pub fn new(n_producers: usize, max_cost: usize) -> Self {
133        BoundedPriorityQueue {
134            state: Arc::new((
135                Mutex::new(QueueState {
136                    queue: BinaryHeap::new(),
137                    n_producers,
138                    current_cost: 0,
139                    max_cost,
140                }),
141                Condvar::new(), // cv_queue_empty
142                Condvar::new(), // cv_queue_full
143            )),
144        }
145    }
146
147    /// Add an item to the queue (blocks if at capacity)
148    ///
149    /// Matches C++ AGC's Emplace (queue.h:238-251):
150    /// ```cpp
151    /// void Emplace(T&& data, const size_t priority, const size_t cost);
152    /// ```
153    ///
154    /// # Arguments
155    /// * `data` - Item to enqueue
156    /// * `priority` - Priority (higher = processed first)
157    /// * `cost` - Memory cost (for capacity limiting)
158    pub fn emplace(&self, data: T, priority: usize, cost: usize) {
159        let (mutex, cv_empty, cv_full) = &*self.state;
160        let mut state = mutex.lock().unwrap();
161
162        // Wait until there's space (current_cost < max_cost)
163        while state.current_cost >= state.max_cost {
164            state = cv_full.wait(state).unwrap();
165        }
166
167        let was_empty = state.queue.is_empty();
168        state.queue.push(QueueEntry {
169            priority,
170            cost,
171            data,
172        });
173        state.current_cost += cost;
174
175        if was_empty {
176            cv_empty.notify_all();
177        }
178    }
179
180    /// Add multiple copies of an item with zero cost (for sync barriers)
181    ///
182    /// Matches C++ AGC's EmplaceManyNoCost (queue.h:270-280):
183    /// ```cpp
184    /// void EmplaceManyNoCost(T&& data, size_t priority, size_t n_items);
185    /// ```
186    ///
187    /// This is used to send synchronization tokens to all workers.
188    ///
189    /// # Arguments
190    /// * `data` - Item to enqueue (will be cloned n_items times)
191    /// * `priority` - Priority
192    /// * `n_items` - Number of copies to enqueue
193    pub fn emplace_many_no_cost(&self, data: T, priority: usize, n_items: usize) {
194        let (mutex, cv_empty, _) = &*self.state;
195        let mut state = mutex.lock().unwrap();
196
197        for _ in 0..n_items {
198            state.queue.push(QueueEntry {
199                priority,
200                cost: 0,
201                data: data.clone(),
202            });
203        }
204
205        cv_empty.notify_all();
206    }
207
208    /// Pop the highest priority item (blocks if empty)
209    ///
210    /// Matches C++ AGC's PopLarge (queue.h:284-313):
211    /// ```cpp
212    /// result_t PopLarge(T& data);
213    /// ```
214    ///
215    /// Returns:
216    /// - (Normal, Some(data)): Successfully popped highest priority item
217    /// - (Empty, None): Queue empty but producers still active
218    /// - (Completed, None): Queue empty and no producers (exit)
219    pub fn pop_large(&self) -> (PopResult, Option<T>) {
220        let (mutex, cv_empty, cv_full) = &*self.state;
221        let mut state = mutex.lock().unwrap();
222
223        // Wait until queue has items or all producers are done
224        while state.queue.is_empty() && state.n_producers > 0 {
225            state = cv_empty.wait(state).unwrap();
226        }
227
228        if state.queue.is_empty() {
229            // No items and no producers left
230            return if state.n_producers > 0 {
231                (PopResult::Empty, None)
232            } else {
233                (PopResult::Completed, None)
234            };
235        }
236
237        // Pop highest priority item (BinaryHeap is max-heap)
238        let entry = state.queue.pop().unwrap();
239        state.current_cost -= entry.cost;
240
241        if state.queue.is_empty() {
242            cv_empty.notify_all();
243        }
244
245        cv_full.notify_all();
246
247        (PopResult::Normal, Some(entry.data))
248    }
249
250    /// Signal that a producer is done
251    ///
252    /// Matches C++ AGC's MarkCompleted (queue.h:212-219):
253    /// ```cpp
254    /// void MarkCompleted();
255    /// ```
256    ///
257    /// When all producers call this, consumers will receive Completed.
258    pub fn mark_completed(&self) {
259        let (mutex, cv_empty, _) = &*self.state;
260        let mut state = mutex.lock().unwrap();
261
262        state.n_producers -= 1;
263
264        if state.n_producers == 0 {
265            cv_empty.notify_all();
266        }
267    }
268
269    /// Check if queue is empty
270    ///
271    /// Matches C++ AGC's IsEmpty (queue.h:197-201)
272    pub fn is_empty(&self) -> bool {
273        let (mutex, _, _) = &*self.state;
274        let state = mutex.lock().unwrap();
275        state.queue.is_empty()
276    }
277
278    /// Check if queue is completed (empty and no producers)
279    ///
280    /// Matches C++ AGC's IsCompleted (queue.h:204-209)
281    pub fn is_completed(&self) -> bool {
282        let (mutex, _, _) = &*self.state;
283        let state = mutex.lock().unwrap();
284        state.queue.is_empty() && state.n_producers == 0
285    }
286
287    /// Get current queue size (items, total_cost)
288    ///
289    /// Matches C++ AGC's GetSize (queue.h:340-345)
290    pub fn get_size(&self) -> (usize, usize) {
291        let (mutex, _, _) = &*self.state;
292        let state = mutex.lock().unwrap();
293        (state.queue.len(), state.current_cost)
294    }
295}
296
297// Make queue cloneable (shares Arc internally)
298impl<T> Clone for BoundedPriorityQueue<T> {
299    fn clone(&self) -> Self {
300        BoundedPriorityQueue {
301            state: Arc::clone(&self.state),
302        }
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use super::*;
309    use std::thread;
310    use std::time::Duration;
311
312    #[test]
313    fn test_basic_operations() {
314        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);
315
316        queue.emplace("task1".to_string(), 100, 50);
317        queue.emplace("task2".to_string(), 200, 50);
318        queue.emplace("task3".to_string(), 150, 50);
319
320        // Should pop in priority order: 200, 150, 100
321        let (result, data) = queue.pop_large();
322        assert_eq!(result, PopResult::Normal);
323        assert_eq!(data, Some("task2".to_string()));
324
325        let (result, data) = queue.pop_large();
326        assert_eq!(result, PopResult::Normal);
327        assert_eq!(data, Some("task3".to_string()));
328
329        let (result, data) = queue.pop_large();
330        assert_eq!(result, PopResult::Normal);
331        assert_eq!(data, Some("task1".to_string()));
332    }
333
334    #[test]
335    fn test_completion_signaling() {
336        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);
337
338        queue.mark_completed();
339
340        // Queue is empty and producer is done
341        let (result, data) = queue.pop_large();
342        assert_eq!(result, PopResult::Completed);
343        assert_eq!(data, None);
344    }
345
346    #[test]
347    fn test_emplace_many_no_cost() {
348        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);
349
350        queue.emplace_many_no_cost("sync".to_string(), 500, 3);
351
352        for _ in 0..3 {
353            let (result, data) = queue.pop_large();
354            assert_eq!(result, PopResult::Normal);
355            assert_eq!(data, Some("sync".to_string()));
356        }
357    }
358
359    #[test]
360    fn test_multi_threaded() {
361        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(2, 1000);
362        let q1 = queue.clone();
363        let q2 = queue.clone();
364
365        // Producer 1
366        let p1 = thread::spawn(move || {
367            for i in 0..5 {
368                q1.emplace(format!("p1-{}", i), 100 + i, 10);
369                thread::sleep(Duration::from_millis(1));
370            }
371            q1.mark_completed();
372        });
373
374        // Producer 2
375        let p2 = thread::spawn(move || {
376            for i in 0..5 {
377                q2.emplace(format!("p2-{}", i), 200 + i, 10);
378                thread::sleep(Duration::from_millis(1));
379            }
380            q2.mark_completed();
381        });
382
383        // Consumer
384        let mut count = 0;
385        loop {
386            match queue.pop_large() {
387                (PopResult::Normal, Some(_)) => {
388                    count += 1;
389                }
390                (PopResult::Empty, None) => {
391                    thread::sleep(Duration::from_millis(1));
392                    continue;
393                }
394                (PopResult::Completed, None) => {
395                    break;
396                }
397                _ => panic!("Unexpected queue state"),
398            }
399        }
400
401        p1.join().unwrap();
402        p2.join().unwrap();
403
404        assert_eq!(count, 10);
405    }
406
407    #[test]
408    fn test_capacity_limiting() {
409        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 100);
410        let q = queue.clone();
411
412        // This should fill the queue (50 + 50 = 100)
413        queue.emplace("task1".to_string(), 100, 50);
414        queue.emplace("task2".to_string(), 100, 50);
415
416        // Try to add one more in a separate thread (should block)
417        let producer = thread::spawn(move || {
418            q.emplace("task3".to_string(), 100, 50);
419        });
420
421        // Give producer time to try to enqueue
422        thread::sleep(Duration::from_millis(10));
423
424        // Should still have only 2 items
425        assert_eq!(queue.get_size(), (2, 100));
426
427        // Pop one to make space
428        queue.pop_large();
429
430        // Now producer should be able to proceed
431        producer.join().unwrap();
432
433        // Should now have 2 items again
434        assert_eq!(queue.get_size(), (2, 100));
435    }
436}