rs2_stream/media/
priority_queue.rs1use crate::queue::{Queue, QueueError};
6use super::types::{MediaChunk, MediaPriority};
7use async_stream::stream;
8use futures_core::Stream;
9use futures_util::StreamExt;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15#[derive(Debug, Clone, PartialEq, Eq)]
16struct PriorityItem {
17 chunk: MediaChunk,
18 priority: MediaPriority,
19 sequence: u64,
20}
21
22impl PartialOrd for PriorityItem {
23 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
24 Some(self.cmp(other))
25 }
26}
27
28impl Ord for PriorityItem {
29 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
30 self.priority.cmp(&other.priority)
32 .then_with(|| Reverse(self.sequence).cmp(&Reverse(other.sequence)))
33 }
34}
35
36pub struct MediaPriorityQueue {
39 internal_queue: Queue<PriorityItem>,
40 priority_buffer: Arc<Mutex<BinaryHeap<PriorityItem>>>,
41 buffer_size: usize,
42}
43
44impl MediaPriorityQueue {
45 pub fn new(capacity: usize, priority_buffer_size: usize) -> Self {
46 Self {
47 internal_queue: Queue::bounded(capacity),
48 priority_buffer: Arc::new(Mutex::new(BinaryHeap::new())),
49 buffer_size: priority_buffer_size,
50 }
51 }
52
53 pub async fn enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
54 let priority = chunk.priority;
55 let sequence = chunk.sequence_number;
56
57 let item = PriorityItem {
58 chunk,
59 priority,
60 sequence,
61 };
62
63 {
65 let mut buffer = self.priority_buffer.lock().await;
66 if buffer.len() < self.buffer_size {
67 buffer.push(item);
68 return Ok(());
69 }
70 }
71
72 self.internal_queue.enqueue(item).await
74 }
75
76 pub fn dequeue(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
77 let priority_buffer = Arc::clone(&self.priority_buffer);
78 let queue_stream = self.internal_queue.dequeue();
79
80 stream! {
81 let mut queue_stream = std::pin::pin!(queue_stream);
82
83 loop {
84 let high_priority_item = {
86 let mut buffer = priority_buffer.lock().await;
87 buffer.pop()
88 };
89
90 if let Some(item) = high_priority_item {
91 yield item.chunk;
92 } else {
93 match queue_stream.next().await {
95 Some(item) => yield item.chunk,
96 None => break,
97 }
98 }
99 }
100 }
101 }
102
103 pub async fn try_enqueue(&self, chunk: MediaChunk) -> Result<(), QueueError> {
105 let priority = chunk.priority;
106 let sequence = chunk.sequence_number;
107
108 let item = PriorityItem {
109 chunk,
110 priority,
111 sequence,
112 };
113
114 {
116 let mut buffer = self.priority_buffer.lock().await;
117 if buffer.len() < self.buffer_size {
118 buffer.push(item);
119 return Ok(());
120 }
121 }
122
123 self.internal_queue.try_enqueue(item).await
125 }
126
127
128 pub async fn close(&self) {
129 self.internal_queue.close().await;
130 }
131
132 pub async fn len(&self) -> usize {
133 let buffer_len = {
134 let buffer = self.priority_buffer.lock().await;
135 buffer.len()
136 };
137 buffer_len + self.internal_queue.len().await
138 }
139}