rs2_stream/media/
priority_queue.rs

1//! Priority queue implementation for media streaming
2//!
3//! Extends the existing Queue with priority-based ordering
4
5use crate::queue::{Queue, QueueError};
6use super::types::{MediaChunk, MediaPriority};
7use async_stream::stream;
8use futures_core::Stream;
9use futures_util::StreamExt;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15#[derive(Debug, Clone, PartialEq, Eq)]
16struct PriorityItem {
17    chunk: MediaChunk,
18    priority: MediaPriority,
19    sequence: u64,
20}
21
22impl PartialOrd for PriorityItem {
23    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
24        Some(self.cmp(other))
25    }
26}
27
28impl Ord for PriorityItem {
29    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
30        // Higher priority first, then lower sequence number (older chunks first)
31        self.priority.cmp(&other.priority)
32            .then_with(|| Reverse(self.sequence).cmp(&Reverse(other.sequence)))
33    }
34}
35
36/// Priority queue for media chunks
37/// Uses your existing Queue internally but adds priority ordering
38pub struct MediaPriorityQueue {
39    internal_queue: Queue<PriorityItem>,
40    priority_buffer: Arc<Mutex<BinaryHeap<PriorityItem>>>,
41    buffer_size: usize,
42}
43
44impl MediaPriorityQueue {
45    pub fn new(capacity: usize, priority_buffer_size: usize) -> Self {
46        Self {
47            internal_queue: Queue::bounded(capacity),
48            priority_buffer: Arc::new(Mutex::new(BinaryHeap::new())),
49            buffer_size: priority_buffer_size,
50        }
51    }
52
53    pub async fn enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
54        let priority = chunk.priority;
55        let sequence = chunk.sequence_number;
56
57        let item = PriorityItem {
58            chunk,
59            priority,
60            sequence,
61        };
62
63        // Try to add to priority buffer first
64        {
65            let mut buffer = self.priority_buffer.lock().await;
66            if buffer.len() < self.buffer_size {
67                buffer.push(item);
68                return Ok(());
69            }
70        }
71
72        // Buffer full, push to main queue
73        self.internal_queue.enqueue(item).await
74    }
75
76    pub fn dequeue(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
77        let priority_buffer = Arc::clone(&self.priority_buffer);
78        let queue_stream = self.internal_queue.dequeue();
79
80        stream! {
81            let mut queue_stream = std::pin::pin!(queue_stream);
82            
83            loop {
84                // First check priority buffer
85                let high_priority_item = {
86                    let mut buffer = priority_buffer.lock().await;
87                    buffer.pop()
88                };
89
90                if let Some(item) = high_priority_item {
91                    yield item.chunk;
92                } else {
93                    // No high priority items, get from main queue
94                    match queue_stream.next().await {
95                        Some(item) => yield item.chunk,
96                        None => break,
97                    }
98                }
99            }
100        }
101    }
102
103    /// Try to enqueue without blocking - useful for live streaming
104    pub async fn try_enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
105        let priority = chunk.priority;
106        let sequence = chunk.sequence_number;
107
108        let item = PriorityItem {
109            chunk,
110            priority,
111            sequence,
112        };
113
114        // Try to add to priority buffer first
115        {
116            let mut buffer = self.priority_buffer.lock().await;
117            if buffer.len() < self.buffer_size {
118                buffer.push(item);
119                return Ok(());
120            }
121        }
122
123        // Buffer full, try to push to main queue without blocking
124        self.internal_queue.try_enqueue(item).await
125    }
126
127
128    pub async fn close(&self) {
129        self.internal_queue.close().await;
130    }
131
132    pub async fn len(&self) -> usize {
133        let buffer_len = {
134            let buffer = self.priority_buffer.lock().await;
135            buffer.len()
136        };
137        buffer_len + self.internal_queue.len().await
138    }
139}