ragc_core/priority_queue.rs
1// Bounded priority queue for streaming compression pipeline
2// Matches C++ AGC's CBoundedPQueue (queue.h:153-346)
3
4use std::collections::BinaryHeap;
5use std::sync::{Arc, Condvar, Mutex};
6
/// Outcome reported by a pop operation.
///
/// Mirrors C++ AGC's `result_t` enum (queue.h:171):
/// ```cpp
/// enum class result_t { empty, completed, normal };
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PopResult {
    /// Nothing to hand out yet, but at least one producer is still running,
    /// so the caller may wait and try again.
    Empty,
    /// Nothing to hand out and every producer has finished: the consumer
    /// should leave its work loop.
    Completed,
    /// An item was removed from the queue and is returned with this result.
    Normal,
}
22
/// Priority queue entry.
///
/// Matches C++ AGC's multimap key: `pair<size_t, size_t>` (queue.h:155)
/// - First: priority (higher = processed first)
/// - Second: cost (used for capacity limiting; among equal priorities the
///   higher cost is processed first)
///
/// Equality and ordering both look only at `(priority, cost)`. The `data`
/// payload never participates, which keeps `PartialEq`/`Eq` consistent with
/// `Ord` (as the `PartialOrd` contract requires) and means `T` needs no
/// comparison traits.
#[derive(Debug, Clone)]
struct QueueEntry<T> {
    priority: usize,
    cost: usize,
    data: T,
}

impl<T> QueueEntry<T> {
    /// Sort key shared by the equality and ordering impls so they cannot drift apart.
    fn key(&self) -> (usize, usize) {
        (self.priority, self.cost)
    }
}

impl<T> PartialEq for QueueEntry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.key() == other.key()
    }
}

impl<T> Eq for QueueEntry<T> {}

impl<T> PartialOrd for QueueEntry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for QueueEntry<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Compare by (priority, cost) - matches C++ multimap ordering.
        // BinaryHeap is a max-heap, which matches PopLarge() behavior.
        self.key().cmp(&other.key())
    }
}
54
55/// Internal queue state
56struct QueueState<T> {
57 /// Priority queue (max-heap by priority, then cost)
58 queue: BinaryHeap<QueueEntry<T>>,
59 /// Number of active producers
60 n_producers: usize,
61 /// Current total cost in queue
62 current_cost: usize,
63 /// Maximum allowed cost
64 max_cost: usize,
65}
66
67/// Bounded priority queue with capacity limiting
68///
69/// This matches C++ AGC's CBoundedPQueue (queue.h:153-346):
70/// ```cpp
71/// template<typename T> class CBoundedPQueue {
72/// typedef multimap<pair<size_t, size_t>, T> queue_t;
73/// // ...
74/// void Emplace(T&& data, size_t priority, size_t cost);
75/// result_t PopLarge(T& data);
76/// void MarkCompleted();
77/// }
78/// ```
79///
80/// Key features:
81/// - **Priority ordering**: Higher priority items processed first
82/// - **Capacity limiting**: Sum of costs must stay below max_cost
83/// - **Thread-safe**: Multiple producers and consumers
84/// - **Completion signaling**: MarkCompleted() when producer done
85///
86/// Example:
87/// ```no_run
88/// use ragc_core::priority_queue::{BoundedPriorityQueue, PopResult};
89///
90/// let queue = BoundedPriorityQueue::new(2, 1000); // 2 producers, 1000 max cost
91///
92/// // Producer thread
93/// queue.emplace("task1".to_string(), 100, 50); // priority 100, cost 50
94/// queue.mark_completed(); // This producer is done
95///
96/// // Consumer thread
97/// loop {
98/// match queue.pop_large() {
99/// (PopResult::Normal, Some(data)) => {
100/// // Process data
101/// }
102/// (PopResult::Empty, None) => {
103/// continue; // Wait for more
104/// }
105/// (PopResult::Completed, None) => {
106/// break; // All done
107/// }
108/// _ => {
109/// // Handle other cases
110/// }
111/// }
112/// }
113/// ```
114pub struct BoundedPriorityQueue<T> {
115 state: Arc<(Mutex<QueueState<T>>, Condvar, Condvar)>,
116}
117
118impl<T> BoundedPriorityQueue<T>
119where
120 T: Clone + Eq,
121{
122 /// Create a new bounded priority queue
123 ///
124 /// Matches C++ AGC's constructor (queue.h:174-180):
125 /// ```cpp
126 /// CBoundedPQueue(const int _n_producers, const size_t _max_cost);
127 /// ```
128 ///
129 /// # Arguments
130 /// * `n_producers` - Number of producer threads
131 /// * `max_cost` - Maximum total cost allowed in queue
132 pub fn new(n_producers: usize, max_cost: usize) -> Self {
133 BoundedPriorityQueue {
134 state: Arc::new((
135 Mutex::new(QueueState {
136 queue: BinaryHeap::new(),
137 n_producers,
138 current_cost: 0,
139 max_cost,
140 }),
141 Condvar::new(), // cv_queue_empty
142 Condvar::new(), // cv_queue_full
143 )),
144 }
145 }
146
147 /// Add an item to the queue (blocks if at capacity)
148 ///
149 /// Matches C++ AGC's Emplace (queue.h:238-251):
150 /// ```cpp
151 /// void Emplace(T&& data, const size_t priority, const size_t cost);
152 /// ```
153 ///
154 /// # Arguments
155 /// * `data` - Item to enqueue
156 /// * `priority` - Priority (higher = processed first)
157 /// * `cost` - Memory cost (for capacity limiting)
158 pub fn emplace(&self, data: T, priority: usize, cost: usize) {
159 let (mutex, cv_empty, cv_full) = &*self.state;
160 let mut state = mutex.lock().unwrap();
161
162 // Wait until there's space (current_cost < max_cost)
163 while state.current_cost >= state.max_cost {
164 state = cv_full.wait(state).unwrap();
165 }
166
167 let was_empty = state.queue.is_empty();
168 state.queue.push(QueueEntry {
169 priority,
170 cost,
171 data,
172 });
173 state.current_cost += cost;
174
175 if was_empty {
176 cv_empty.notify_all();
177 }
178 }
179
180 /// Add multiple copies of an item with zero cost (for sync barriers)
181 ///
182 /// Matches C++ AGC's EmplaceManyNoCost (queue.h:270-280):
183 /// ```cpp
184 /// void EmplaceManyNoCost(T&& data, size_t priority, size_t n_items);
185 /// ```
186 ///
187 /// This is used to send synchronization tokens to all workers.
188 ///
189 /// # Arguments
190 /// * `data` - Item to enqueue (will be cloned n_items times)
191 /// * `priority` - Priority
192 /// * `n_items` - Number of copies to enqueue
193 pub fn emplace_many_no_cost(&self, data: T, priority: usize, n_items: usize) {
194 let (mutex, cv_empty, _) = &*self.state;
195 let mut state = mutex.lock().unwrap();
196
197 for _ in 0..n_items {
198 state.queue.push(QueueEntry {
199 priority,
200 cost: 0,
201 data: data.clone(),
202 });
203 }
204
205 cv_empty.notify_all();
206 }
207
208 /// Pop the highest priority item (blocks if empty)
209 ///
210 /// Matches C++ AGC's PopLarge (queue.h:284-313):
211 /// ```cpp
212 /// result_t PopLarge(T& data);
213 /// ```
214 ///
215 /// Returns:
216 /// - (Normal, Some(data)): Successfully popped highest priority item
217 /// - (Empty, None): Queue empty but producers still active
218 /// - (Completed, None): Queue empty and no producers (exit)
219 pub fn pop_large(&self) -> (PopResult, Option<T>) {
220 let (mutex, cv_empty, cv_full) = &*self.state;
221 let mut state = mutex.lock().unwrap();
222
223 // Wait until queue has items or all producers are done
224 while state.queue.is_empty() && state.n_producers > 0 {
225 state = cv_empty.wait(state).unwrap();
226 }
227
228 if state.queue.is_empty() {
229 // No items and no producers left
230 return if state.n_producers > 0 {
231 (PopResult::Empty, None)
232 } else {
233 (PopResult::Completed, None)
234 };
235 }
236
237 // Pop highest priority item (BinaryHeap is max-heap)
238 let entry = state.queue.pop().unwrap();
239 state.current_cost -= entry.cost;
240
241 if state.queue.is_empty() {
242 cv_empty.notify_all();
243 }
244
245 cv_full.notify_all();
246
247 (PopResult::Normal, Some(entry.data))
248 }
249
250 /// Signal that a producer is done
251 ///
252 /// Matches C++ AGC's MarkCompleted (queue.h:212-219):
253 /// ```cpp
254 /// void MarkCompleted();
255 /// ```
256 ///
257 /// When all producers call this, consumers will receive Completed.
258 pub fn mark_completed(&self) {
259 let (mutex, cv_empty, _) = &*self.state;
260 let mut state = mutex.lock().unwrap();
261
262 state.n_producers -= 1;
263
264 if state.n_producers == 0 {
265 cv_empty.notify_all();
266 }
267 }
268
269 /// Check if queue is empty
270 ///
271 /// Matches C++ AGC's IsEmpty (queue.h:197-201)
272 pub fn is_empty(&self) -> bool {
273 let (mutex, _, _) = &*self.state;
274 let state = mutex.lock().unwrap();
275 state.queue.is_empty()
276 }
277
278 /// Check if queue is completed (empty and no producers)
279 ///
280 /// Matches C++ AGC's IsCompleted (queue.h:204-209)
281 pub fn is_completed(&self) -> bool {
282 let (mutex, _, _) = &*self.state;
283 let state = mutex.lock().unwrap();
284 state.queue.is_empty() && state.n_producers == 0
285 }
286
287 /// Get current queue size (items, total_cost)
288 ///
289 /// Matches C++ AGC's GetSize (queue.h:340-345)
290 pub fn get_size(&self) -> (usize, usize) {
291 let (mutex, _, _) = &*self.state;
292 let state = mutex.lock().unwrap();
293 (state.queue.len(), state.current_cost)
294 }
295}
296
297// Make queue cloneable (shares Arc internally)
298impl<T> Clone for BoundedPriorityQueue<T> {
299 fn clone(&self) -> Self {
300 BoundedPriorityQueue {
301 state: Arc::clone(&self.state),
302 }
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_basic_operations() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);

        queue.emplace("task1".to_string(), 100, 50);
        queue.emplace("task2".to_string(), 200, 50);
        queue.emplace("task3".to_string(), 150, 50);
        assert_eq!(queue.get_size(), (3, 150));

        // Should pop in priority order: 200, 150, 100
        for expected in ["task2", "task3", "task1"] {
            let (result, data) = queue.pop_large();
            assert_eq!(result, PopResult::Normal);
            assert_eq!(data.as_deref(), Some(expected));
        }

        // Cost accounting returns to zero once everything is drained.
        assert_eq!(queue.get_size(), (0, 0));
        assert!(queue.is_empty());
        // The producer has not finished, so the queue is not completed yet.
        assert!(!queue.is_completed());
    }

    #[test]
    fn test_equal_priority_prefers_higher_cost() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);

        queue.emplace("cheap".to_string(), 100, 10);
        queue.emplace("expensive".to_string(), 100, 20);

        assert_eq!(
            queue.pop_large(),
            (PopResult::Normal, Some("expensive".to_string()))
        );
        assert_eq!(
            queue.pop_large(),
            (PopResult::Normal, Some("cheap".to_string()))
        );
    }

    #[test]
    fn test_completion_signaling() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);
        assert!(!queue.is_completed());

        queue.mark_completed();
        assert!(queue.is_completed());

        // Queue is empty and producer is done
        let (result, data) = queue.pop_large();
        assert_eq!(result, PopResult::Completed);
        assert_eq!(data, None);
    }

    #[test]
    fn test_items_drained_before_completion() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);

        queue.emplace("last".to_string(), 1, 1);
        queue.mark_completed();

        // Items enqueued before completion must still be delivered.
        assert!(!queue.is_completed());
        assert_eq!(
            queue.pop_large(),
            (PopResult::Normal, Some("last".to_string()))
        );
        assert_eq!(queue.pop_large(), (PopResult::Completed, None));
    }

    #[test]
    fn test_emplace_many_no_cost() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 1000);

        queue.emplace_many_no_cost("sync".to_string(), 500, 3);
        assert_eq!(queue.get_size(), (3, 0));

        for _ in 0..3 {
            let (result, data) = queue.pop_large();
            assert_eq!(result, PopResult::Normal);
            assert_eq!(data, Some("sync".to_string()));
        }
        assert_eq!(queue.get_size(), (0, 0));
    }

    #[test]
    fn test_emplace_many_no_cost_ignores_capacity() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 10);

        queue.emplace("big".to_string(), 1, 10); // queue is now at capacity

        // Sync tokens are cost-free and must not block even when the queue is full.
        queue.emplace_many_no_cost("sync".to_string(), 500, 2);
        assert_eq!(queue.get_size(), (3, 10));

        // Higher-priority tokens come out first.
        assert_eq!(queue.pop_large().1.as_deref(), Some("sync"));
    }

    #[test]
    fn test_multi_threaded() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(2, 1000);
        let q1 = queue.clone();
        let q2 = queue.clone();

        // Producer 1
        let p1 = thread::spawn(move || {
            for i in 0..5 {
                q1.emplace(format!("p1-{}", i), 100 + i, 10);
                thread::sleep(Duration::from_millis(1));
            }
            q1.mark_completed();
        });

        // Producer 2
        let p2 = thread::spawn(move || {
            for i in 0..5 {
                q2.emplace(format!("p2-{}", i), 200 + i, 10);
                thread::sleep(Duration::from_millis(1));
            }
            q2.mark_completed();
        });

        // Consumer
        let mut count = 0;
        loop {
            match queue.pop_large() {
                (PopResult::Normal, Some(_)) => {
                    count += 1;
                }
                (PopResult::Empty, None) => {
                    thread::sleep(Duration::from_millis(1));
                    continue;
                }
                (PopResult::Completed, None) => {
                    break;
                }
                _ => panic!("Unexpected queue state"),
            }
        }

        p1.join().unwrap();
        p2.join().unwrap();

        assert_eq!(count, 10);
        assert!(queue.is_completed());
    }

    #[test]
    fn test_capacity_limiting() {
        let queue: BoundedPriorityQueue<String> = BoundedPriorityQueue::new(1, 100);
        let q = queue.clone();

        // This should fill the queue (50 + 50 = 100)
        queue.emplace("task1".to_string(), 100, 50);
        queue.emplace("task2".to_string(), 100, 50);

        // Try to add one more in a separate thread (should block)
        let producer = thread::spawn(move || {
            q.emplace("task3".to_string(), 100, 50);
        });

        // Give producer time to try to enqueue
        thread::sleep(Duration::from_millis(10));

        // Should still have only 2 items
        assert_eq!(queue.get_size(), (2, 100));

        // Pop one to make space
        let (result, _) = queue.pop_large();
        assert_eq!(result, PopResult::Normal);

        // Now producer should be able to proceed
        producer.join().unwrap();

        // Should now have 2 items again
        assert_eq!(queue.get_size(), (2, 100));
    }
}