rs2_stream/media/
priority_queue.rs

1//! Priority queue implementation for media streaming
2//!
3//! Extends the existing Queue with priority-based ordering
4
5use super::types::{MediaChunk, MediaPriority};
6use crate::queue::{Queue, QueueError};
7use async_stream::stream;
8use futures_core::Stream;
9use futures_util::StreamExt;
10use std::cmp::Reverse;
11use std::collections::BinaryHeap;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14use crate::resource_manager::get_global_resource_manager;
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17struct PriorityItem {
18    chunk: MediaChunk,
19    priority: MediaPriority,
20    sequence: u64,
21}
22
23impl PartialOrd for PriorityItem {
24    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
25        Some(self.cmp(other))
26    }
27}
28
29impl Ord for PriorityItem {
30    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
31        // Higher priority first, then lower sequence number (older chunks first)
32        self.priority
33            .cmp(&other.priority)
34            .then_with(|| Reverse(self.sequence).cmp(&Reverse(other.sequence)))
35    }
36}
37
38/// Priority queue for media chunks
39/// Uses your existing Queue internally but adds priority ordering
40pub struct MediaPriorityQueue {
41    internal_queue: Queue<PriorityItem>,
42    priority_buffer: Arc<Mutex<BinaryHeap<PriorityItem>>>,
43    buffer_size: usize,
44}
45
46impl MediaPriorityQueue {
47    pub fn new(capacity: usize, priority_buffer_size: usize) -> Self {
48        Self {
49            internal_queue: Queue::bounded(capacity),
50            priority_buffer: Arc::new(Mutex::new(BinaryHeap::new())),
51            buffer_size: priority_buffer_size,
52        }
53    }
54
55    pub async fn enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
56        let resource_manager = get_global_resource_manager();
57        let priority = chunk.priority;
58        let sequence = chunk.sequence_number;
59        let item = PriorityItem {
60            chunk,
61            priority,
62            sequence,
63        };
64        // Try to add to priority buffer first
65        {
66            let mut buffer = self.priority_buffer.lock().await;
67            if buffer.len() < self.buffer_size {
68                buffer.push(item);
69                resource_manager.track_memory_allocation(1).await.ok();
70                return Ok(());
71            } else {
72                resource_manager.track_buffer_overflow().await.ok();
73            }
74        }
75        // Buffer full, push to main queue
76        self.internal_queue.enqueue(item).await
77    }
78
79    pub fn dequeue(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
80        let priority_buffer = Arc::clone(&self.priority_buffer);
81        let queue_stream = self.internal_queue.dequeue();
82        let resource_manager = get_global_resource_manager();
83        stream! {
84            let mut queue_stream = std::pin::pin!(queue_stream);
85            loop {
86                // First check priority buffer
87                let high_priority_item = {
88                    let mut buffer = priority_buffer.lock().await;
89                    let popped = buffer.pop();
90                    if popped.is_some() {
91                        resource_manager.track_memory_deallocation(1).await;
92                    }
93                    popped
94                };
95                if let Some(item) = high_priority_item {
96                    yield item.chunk;
97                } else {
98                    // No high priority items, get from main queue
99                    match queue_stream.next().await {
100                        Some(item) => {
101                            resource_manager.track_memory_deallocation(1).await;
102                            yield item.chunk
103                        },
104                        None => break,
105                    }
106                }
107            }
108        }
109    }
110
111    /// Try to enqueue without blocking - useful for live streaming
112    pub async fn try_enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
113        let resource_manager = get_global_resource_manager();
114        let priority = chunk.priority;
115        let sequence = chunk.sequence_number;
116        let item = PriorityItem {
117            chunk,
118            priority,
119            sequence,
120        };
121        // Try to add to priority buffer first
122        {
123            let mut buffer = self.priority_buffer.lock().await;
124            if buffer.len() < self.buffer_size {
125                buffer.push(item);
126                resource_manager.track_memory_allocation(1).await.ok();
127                return Ok(());
128            } else {
129                resource_manager.track_buffer_overflow().await.ok();
130            }
131        }
132        // Buffer full, try to push to main queue without blocking
133        self.internal_queue.try_enqueue(item).await
134    }
135
136    pub async fn close(&self) {
137        self.internal_queue.close().await;
138    }
139
140    pub async fn len(&self) -> usize {
141        let buffer_len = {
142            let buffer = self.priority_buffer.lock().await;
143            buffer.len()
144        };
145        buffer_len + self.internal_queue.len().await
146    }
147}