gosh_dl/
priority_queue.rs

1//! Priority Queue for Download Scheduling
2//!
3//! Implements a priority-based queue for managing concurrent downloads.
4//! Downloads are scheduled based on priority (Critical > High > Normal > Low),
5//! with FIFO ordering within the same priority level.
6
7use crate::protocol::DownloadId;
8use parking_lot::Mutex;
9use std::collections::{BinaryHeap, HashMap};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
13
14// Re-export DownloadPriority for backward compatibility
15pub use crate::protocol::DownloadPriority;
16
17/// Entry in the priority queue
18#[derive(Debug, Clone, Eq, PartialEq)]
19struct QueueEntry {
20    id: DownloadId,
21    priority: DownloadPriority,
22    /// Sequence number for FIFO ordering within same priority
23    sequence: u64,
24}
25
26impl Ord for QueueEntry {
27    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
28        // Higher priority first, then lower sequence (earlier) first
29        match self.priority.cmp(&other.priority) {
30            std::cmp::Ordering::Equal => other.sequence.cmp(&self.sequence), // Lower sequence = higher priority
31            other => other,
32        }
33    }
34}
35
36impl PartialOrd for QueueEntry {
37    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38        Some(self.cmp(other))
39    }
40}
41
42/// A permit that allows a download to proceed
43/// When dropped, releases the slot back to the queue
44pub struct PriorityPermit {
45    _permit: OwnedSemaphorePermit,
46    id: DownloadId,
47    queue: Arc<PriorityQueue>,
48}
49
50impl Drop for PriorityPermit {
51    fn drop(&mut self) {
52        // Remove from active set
53        self.queue.inner.lock().active.remove(&self.id);
54        // Notify all waiting downloads so the highest priority can acquire
55        self.queue.notify.notify_waiters();
56    }
57}
58
59/// Internal state of the priority queue
60struct PriorityQueueInner {
61    /// Downloads waiting for a slot
62    waiting: BinaryHeap<QueueEntry>,
63    /// Currently active downloads
64    active: HashMap<DownloadId, DownloadPriority>,
65    /// Priority of each waiting download (for quick lookup)
66    waiting_priorities: HashMap<DownloadId, DownloadPriority>,
67}
68
69/// Priority-based download queue
70///
71/// Manages concurrent download slots with priority ordering.
72/// Higher priority downloads are started before lower priority ones,
73/// with FIFO ordering within the same priority level.
74pub struct PriorityQueue {
75    /// Semaphore for limiting concurrent downloads
76    semaphore: Arc<Semaphore>,
77    /// Internal queue state
78    inner: Mutex<PriorityQueueInner>,
79    /// Sequence counter for FIFO ordering
80    sequence: AtomicU64,
81    /// Notification for waiting downloads
82    notify: Notify,
83}
84
85impl PriorityQueue {
86    /// Create a new priority queue with the given concurrency limit
87    pub fn new(max_concurrent: usize) -> Arc<Self> {
88        Arc::new(Self {
89            semaphore: Arc::new(Semaphore::new(max_concurrent)),
90            inner: Mutex::new(PriorityQueueInner {
91                waiting: BinaryHeap::new(),
92                active: HashMap::new(),
93                waiting_priorities: HashMap::new(),
94            }),
95            sequence: AtomicU64::new(0),
96            notify: Notify::new(),
97        })
98    }
99
100    /// Acquire a permit for the download to proceed (blocking).
101    ///
102    /// This method **adds the download to the waiting queue** and blocks until:
103    /// 1. A slot becomes available, AND
104    /// 2. This download is the highest priority in the waiting queue
105    ///
106    /// The download remains in the queue until a permit is granted, ensuring fair
107    /// ordering based on priority and arrival time (FIFO within same priority).
108    ///
109    /// # Difference from `try_acquire`
110    /// - `acquire`: Adds to queue, waits for turn, guarantees eventual permit
111    /// - `try_acquire`: Does NOT add to queue, immediate success or failure
112    ///
113    /// Use `acquire` for downloads that should wait their turn.
114    /// Use `try_acquire` for opportunistic slot acquisition (e.g., resuming paused downloads).
115    pub async fn acquire(
116        self: &Arc<Self>,
117        id: DownloadId,
118        priority: DownloadPriority,
119    ) -> PriorityPermit {
120        // Add to waiting queue
121        let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
122        {
123            let mut inner = self.inner.lock();
124            inner.waiting.push(QueueEntry {
125                id,
126                priority,
127                sequence,
128            });
129            inner.waiting_priorities.insert(id, priority);
130        }
131
132        loop {
133            // Check if we're next in line
134            {
135                let inner = self.inner.lock();
136                if let Some(next) = inner.waiting.peek() {
137                    if next.id == id {
138                        // We're next, try to acquire semaphore
139                        drop(inner); // Release lock before async operation
140
141                        // Try to acquire permit
142                        if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
143                            // Got permit, remove from waiting and add to active
144                            let mut inner = self.inner.lock();
145                            inner.waiting.pop();
146                            inner.waiting_priorities.remove(&id);
147                            inner.active.insert(id, priority);
148
149                            return PriorityPermit {
150                                _permit: permit,
151                                id,
152                                queue: Arc::clone(self),
153                            };
154                        }
155                    }
156                }
157            }
158
159            // Wait for notification (either slot freed or priority changed)
160            self.notify.notified().await;
161        }
162    }
163
164    /// Try to acquire a permit immediately without waiting (non-blocking).
165    ///
166    /// This method does **NOT** add the download to the waiting queue. It either
167    /// succeeds immediately or returns `None`.
168    ///
169    /// Returns `None` if:
170    /// - No slot is currently available, OR
171    /// - Higher priority downloads are already waiting in the queue
172    ///
173    /// # Difference from `acquire`
174    /// - `try_acquire`: Does NOT add to queue, immediate success or failure
175    /// - `acquire`: Adds to queue, waits for turn, guarantees eventual permit
176    ///
177    /// # Use Cases
178    /// - Opportunistic slot acquisition (e.g., checking if a paused download can resume)
179    /// - Avoiding queue position for downloads that shouldn't block others
180    /// - Non-async contexts where blocking is not possible
181    ///
182    /// # Warning
183    /// If you call `try_acquire` and it fails, the download is NOT queued.
184    /// You must call `acquire` if you want the download to wait for a slot.
185    pub fn try_acquire(
186        self: &Arc<Self>,
187        id: DownloadId,
188        priority: DownloadPriority,
189    ) -> Option<PriorityPermit> {
190        let mut inner = self.inner.lock();
191
192        // Check if there are higher priority downloads waiting
193        if let Some(next) = inner.waiting.peek() {
194            if next.priority > priority {
195                return None; // Higher priority download is waiting
196            }
197        }
198
199        // Try to acquire permit
200        match self.semaphore.clone().try_acquire_owned() {
201            Ok(permit) => {
202                inner.active.insert(id, priority);
203                Some(PriorityPermit {
204                    _permit: permit,
205                    id,
206                    queue: Arc::clone(self),
207                })
208            }
209            Err(_) => None,
210        }
211    }
212
213    /// Update the priority of a waiting download
214    ///
215    /// If the download is already active, this has no effect on scheduling.
216    /// Returns true if the priority was updated.
217    pub fn set_priority(&self, id: DownloadId, new_priority: DownloadPriority) -> bool {
218        let mut inner = self.inner.lock();
219
220        // Check if download is waiting
221        if inner.waiting_priorities.contains_key(&id) {
222            // Remove and re-add with new priority
223            let entries: Vec<_> = inner.waiting.drain().collect();
224            for entry in entries {
225                if entry.id == id {
226                    inner.waiting.push(QueueEntry {
227                        id: entry.id,
228                        priority: new_priority,
229                        sequence: entry.sequence,
230                    });
231                } else {
232                    inner.waiting.push(entry);
233                }
234            }
235            inner.waiting_priorities.insert(id, new_priority);
236            drop(inner);
237
238            // Notify waiting downloads to re-check their position
239            self.notify.notify_waiters();
240            return true;
241        }
242
243        // Check if download is active (update tracking but doesn't affect scheduling)
244        if let Some(priority) = inner.active.get_mut(&id) {
245            *priority = new_priority;
246            return true;
247        }
248
249        false
250    }
251
252    /// Remove a download from the waiting queue
253    ///
254    /// Call this if a download is cancelled before acquiring a permit.
255    pub fn remove(&self, id: DownloadId) {
256        let mut inner = self.inner.lock();
257        inner.waiting_priorities.remove(&id);
258        // Rebuild heap without the removed entry
259        let entries: Vec<_> = inner.waiting.drain().filter(|e| e.id != id).collect();
260        for entry in entries {
261            inner.waiting.push(entry);
262        }
263    }
264
265    /// Get the priority of a download (waiting or active)
266    pub fn get_priority(&self, id: DownloadId) -> Option<DownloadPriority> {
267        let inner = self.inner.lock();
268        inner
269            .waiting_priorities
270            .get(&id)
271            .or_else(|| inner.active.get(&id))
272            .copied()
273    }
274
275    /// Get the number of active downloads
276    pub fn active_count(&self) -> usize {
277        self.inner.lock().active.len()
278    }
279
280    /// Get the number of waiting downloads
281    pub fn waiting_count(&self) -> usize {
282        self.inner.lock().waiting.len()
283    }
284
285    /// Get the position in queue for a waiting download (1-indexed, None if not waiting)
286    pub fn queue_position(&self, id: DownloadId) -> Option<usize> {
287        let inner = self.inner.lock();
288        if !inner.waiting_priorities.contains_key(&id) {
289            return None;
290        }
291        // Count entries with higher priority or same priority but lower sequence
292        let mut sorted: Vec<_> = inner.waiting.iter().cloned().collect();
293        sorted.sort_by(|a, b| b.cmp(a)); // Reverse to get descending order
294        sorted.iter().position(|e| e.id == id).map(|p| p + 1)
295    }
296
297    /// Get statistics about the queue
298    pub fn stats(&self) -> PriorityQueueStats {
299        let inner = self.inner.lock();
300        let mut by_priority = HashMap::new();
301        for priority in inner.waiting_priorities.values() {
302            *by_priority.entry(*priority).or_insert(0) += 1;
303        }
304        PriorityQueueStats {
305            active: inner.active.len(),
306            waiting: inner.waiting.len(),
307            waiting_by_priority: by_priority,
308        }
309    }
310}
311
312/// Statistics about the priority queue
313#[derive(Debug, Clone)]
314pub struct PriorityQueueStats {
315    /// Number of active downloads
316    pub active: usize,
317    /// Total number of waiting downloads
318    pub waiting: usize,
319    /// Waiting downloads by priority level
320    pub waiting_by_priority: HashMap<DownloadPriority, usize>,
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Places an entry directly into the waiting structures, bypassing
    /// `acquire`, so synchronous tests can set up queue state.
    fn enqueue_waiting(
        queue: &PriorityQueue,
        id: DownloadId,
        priority: DownloadPriority,
        sequence: u64,
    ) {
        let mut state = queue.inner.lock();
        state.waiting.push(QueueEntry {
            id,
            priority,
            sequence,
        });
        state.waiting_priorities.insert(id, priority);
    }

    /// Each priority level must compare greater than the one below it.
    #[test]
    fn test_priority_ordering() {
        let descending = [
            DownloadPriority::Critical,
            DownloadPriority::High,
            DownloadPriority::Normal,
            DownloadPriority::Low,
        ];
        for pair in descending.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    /// Lower-case names parse into the matching priority level.
    #[test]
    fn test_priority_from_str() {
        let cases = [
            ("low", DownloadPriority::Low),
            ("normal", DownloadPriority::Normal),
            ("high", DownloadPriority::High),
            ("critical", DownloadPriority::Critical),
        ];
        for (text, expected) in cases {
            assert_eq!(text.parse::<DownloadPriority>().unwrap(), expected);
        }
    }

    /// Heap ordering: priority first, then earlier sequence.
    #[test]
    fn test_queue_entry_ordering() {
        let entry = |priority, sequence| QueueEntry {
            id: DownloadId::new(),
            priority,
            sequence,
        };
        let normal_late = entry(DownloadPriority::Normal, 1);
        let high = entry(DownloadPriority::High, 2);
        let normal_early = entry(DownloadPriority::Normal, 0);

        // A higher priority wins regardless of arrival order.
        assert!(high > normal_late);

        // Within one priority the earlier arrival wins.
        assert!(normal_early > normal_late);
    }

    /// Two slots can be taken and are given back when the permits drop.
    #[tokio::test]
    async fn test_priority_queue_basic() {
        let queue = PriorityQueue::new(2);
        let first_id = DownloadId::new();
        let second_id = DownloadId::new();

        let first = queue.acquire(first_id, DownloadPriority::Normal).await;
        let second = queue.acquire(second_id, DownloadPriority::Normal).await;
        assert_eq!(queue.active_count(), 2);

        drop(first);
        drop(second);
        assert_eq!(queue.active_count(), 0);
    }

    /// With a single slot, a later high-priority waiter is served before an
    /// earlier low-priority one.
    #[tokio::test]
    async fn test_priority_queue_priority_ordering() {
        let settle = Duration::from_millis(10);
        let deadline = Duration::from_millis(100);

        let queue = PriorityQueue::new(1);
        let id_low = DownloadId::new();
        let id_high = DownloadId::new();

        // Occupy the only slot.
        let blocker = queue
            .acquire(DownloadId::new(), DownloadPriority::Normal)
            .await;

        // The low-priority download queues up first...
        let low_task = {
            let queue = Arc::clone(&queue);
            tokio::spawn(async move { queue.acquire(id_low, DownloadPriority::Low).await })
        };
        tokio::time::sleep(settle).await;

        // ...followed by the high-priority one.
        let high_task = {
            let queue = Arc::clone(&queue);
            tokio::spawn(async move { queue.acquire(id_high, DownloadPriority::High).await })
        };
        tokio::time::sleep(settle).await;

        assert_eq!(queue.waiting_count(), 2);

        // Freeing the slot must serve the high-priority download.
        drop(blocker);
        let high_permit = tokio::time::timeout(deadline, high_task)
            .await
            .expect("timeout")
            .expect("join error");

        assert_eq!(queue.active_count(), 1);
        assert_eq!(queue.waiting_count(), 1);

        // Freeing it again lets the low-priority download through.
        drop(high_permit);
        let _low_permit = tokio::time::timeout(deadline, low_task)
            .await
            .expect("timeout")
            .expect("join error");

        assert_eq!(queue.active_count(), 1);
        assert_eq!(queue.waiting_count(), 0);
    }

    /// Changing the priority of a waiting download is visible via `get_priority`.
    #[test]
    fn test_set_priority() {
        let queue = PriorityQueue::new(1);
        let id = DownloadId::new();

        // Seed the waiting queue by hand; `acquire` needs an async context.
        enqueue_waiting(&queue, id, DownloadPriority::Low, 0);
        assert_eq!(queue.get_priority(id), Some(DownloadPriority::Low));

        assert!(queue.set_priority(id, DownloadPriority::High));
        assert_eq!(queue.get_priority(id), Some(DownloadPriority::High));
    }

    /// `remove` takes a waiting download out of the heap and the lookup table.
    #[test]
    fn test_remove() {
        let queue = PriorityQueue::new(1);
        let id = DownloadId::new();

        enqueue_waiting(&queue, id, DownloadPriority::Normal, 0);
        assert_eq!(queue.waiting_count(), 1);

        queue.remove(id);

        assert_eq!(queue.waiting_count(), 0);
        assert_eq!(queue.get_priority(id), None);
    }

    /// `stats` reports the waiting and active totals.
    #[test]
    fn test_stats() {
        let queue = PriorityQueue::new(2);

        // One waiting entry per level, with sequence numbers 0, 1, 2.
        let levels = [
            DownloadPriority::Low,
            DownloadPriority::Normal,
            DownloadPriority::High,
        ];
        for (priority, sequence) in levels.iter().copied().zip(0u64..) {
            enqueue_waiting(&queue, DownloadId::new(), priority, sequence);
        }

        let stats = queue.stats();
        assert_eq!(stats.waiting, 3);
        assert_eq!(stats.active, 0);
    }
}