rs2_stream/media/
priority_queue.rs1use super::types::{MediaChunk, MediaPriority};
6use crate::queue::{Queue, QueueError};
7use async_stream::stream;
8use futures_core::Stream;
9use futures_util::StreamExt;
10use std::cmp::Reverse;
11use std::collections::BinaryHeap;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14use crate::resource_manager::get_global_resource_manager;
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17struct PriorityItem {
18 chunk: MediaChunk,
19 priority: MediaPriority,
20 sequence: u64,
21}
22
23impl PartialOrd for PriorityItem {
24 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
25 Some(self.cmp(other))
26 }
27}
28
29impl Ord for PriorityItem {
30 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
31 self.priority
33 .cmp(&other.priority)
34 .then_with(|| Reverse(self.sequence).cmp(&Reverse(other.sequence)))
35 }
36}
37
38pub struct MediaPriorityQueue {
41 internal_queue: Queue<PriorityItem>,
42 priority_buffer: Arc<Mutex<BinaryHeap<PriorityItem>>>,
43 buffer_size: usize,
44}
45
46impl MediaPriorityQueue {
47 pub fn new(capacity: usize, priority_buffer_size: usize) -> Self {
48 Self {
49 internal_queue: Queue::bounded(capacity),
50 priority_buffer: Arc::new(Mutex::new(BinaryHeap::new())),
51 buffer_size: priority_buffer_size,
52 }
53 }
54
55 pub async fn enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
56 let resource_manager = get_global_resource_manager();
57 let priority = chunk.priority;
58 let sequence = chunk.sequence_number;
59 let item = PriorityItem {
60 chunk,
61 priority,
62 sequence,
63 };
64 {
66 let mut buffer = self.priority_buffer.lock().await;
67 if buffer.len() < self.buffer_size {
68 buffer.push(item);
69 resource_manager.track_memory_allocation(1).await.ok();
70 return Ok(());
71 } else {
72 resource_manager.track_buffer_overflow().await.ok();
73 }
74 }
75 self.internal_queue.enqueue(item).await
77 }
78
79 pub fn dequeue(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
80 let priority_buffer = Arc::clone(&self.priority_buffer);
81 let queue_stream = self.internal_queue.dequeue();
82 let resource_manager = get_global_resource_manager();
83 stream! {
84 let mut queue_stream = std::pin::pin!(queue_stream);
85 loop {
86 let high_priority_item = {
88 let mut buffer = priority_buffer.lock().await;
89 let popped = buffer.pop();
90 if popped.is_some() {
91 resource_manager.track_memory_deallocation(1).await;
92 }
93 popped
94 };
95 if let Some(item) = high_priority_item {
96 yield item.chunk;
97 } else {
98 match queue_stream.next().await {
100 Some(item) => {
101 resource_manager.track_memory_deallocation(1).await;
102 yield item.chunk
103 },
104 None => break,
105 }
106 }
107 }
108 }
109 }
110
111 pub async fn try_enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
113 let resource_manager = get_global_resource_manager();
114 let priority = chunk.priority;
115 let sequence = chunk.sequence_number;
116 let item = PriorityItem {
117 chunk,
118 priority,
119 sequence,
120 };
121 {
123 let mut buffer = self.priority_buffer.lock().await;
124 if buffer.len() < self.buffer_size {
125 buffer.push(item);
126 resource_manager.track_memory_allocation(1).await.ok();
127 return Ok(());
128 } else {
129 resource_manager.track_buffer_overflow().await.ok();
130 }
131 }
132 self.internal_queue.try_enqueue(item).await
134 }
135
136 pub async fn close(&self) {
137 self.internal_queue.close().await;
138 }
139
140 pub async fn len(&self) -> usize {
141 let buffer_len = {
142 let buffer = self.priority_buffer.lock().await;
143 buffer.len()
144 };
145 buffer_len + self.internal_queue.len().await
146 }
147}