1#![allow(dead_code)]
2use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11
12#[derive(Clone, Debug)]
14pub struct QueuedPacket {
15 pub pts: i64,
17 pub dts: i64,
19 pub duration: u64,
21 pub stream_index: u32,
23 pub is_keyframe: bool,
25 pub data: Vec<u8>,
27 sequence: u64,
29}
30
31impl QueuedPacket {
32 pub fn new(pts: i64, dts: i64, data: Vec<u8>) -> Self {
34 Self {
35 pts,
36 dts,
37 duration: 0,
38 stream_index: 0,
39 is_keyframe: false,
40 data,
41 sequence: 0,
42 }
43 }
44
45 pub fn with_duration(mut self, duration: u64) -> Self {
47 self.duration = duration;
48 self
49 }
50
51 pub fn with_stream_index(mut self, index: u32) -> Self {
53 self.stream_index = index;
54 self
55 }
56
57 pub fn with_keyframe(mut self, is_keyframe: bool) -> Self {
59 self.is_keyframe = is_keyframe;
60 self
61 }
62
63 pub fn size(&self) -> usize {
65 self.data.len()
66 }
67}
68
69impl PartialEq for QueuedPacket {
70 fn eq(&self, other: &Self) -> bool {
71 self.pts == other.pts && self.sequence == other.sequence
72 }
73}
74
75impl Eq for QueuedPacket {}
76
77#[derive(Clone, Copy, Debug, PartialEq, Eq)]
79pub enum QueueOrder {
80 Pts,
82 Dts,
84}
85
86impl Default for QueueOrder {
87 fn default() -> Self {
88 Self::Pts
89 }
90}
91
92struct MinPacket {
94 packet: QueuedPacket,
95 order: QueueOrder,
96}
97
98impl PartialEq for MinPacket {
99 fn eq(&self, other: &Self) -> bool {
100 self.packet.eq(&other.packet)
101 }
102}
103
104impl Eq for MinPacket {}
105
106impl PartialOrd for MinPacket {
107 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
108 Some(self.cmp(other))
109 }
110}
111
112impl Ord for MinPacket {
113 fn cmp(&self, other: &Self) -> Ordering {
114 let self_ts = match self.order {
115 QueueOrder::Pts => self.packet.pts,
116 QueueOrder::Dts => self.packet.dts,
117 };
118 let other_ts = match self.order {
119 QueueOrder::Pts => other.packet.pts,
120 QueueOrder::Dts => other.packet.dts,
121 };
122 other_ts
124 .cmp(&self_ts)
125 .then_with(|| other.packet.sequence.cmp(&self.packet.sequence))
126 }
127}
128
129#[derive(Clone, Debug)]
131pub struct PacketQueueConfig {
132 pub max_packets: usize,
134 pub max_bytes: usize,
136 pub order: QueueOrder,
138}
139
140impl Default for PacketQueueConfig {
141 fn default() -> Self {
142 Self {
143 max_packets: 256,
144 max_bytes: 64 * 1024 * 1024,
145 order: QueueOrder::Pts,
146 }
147 }
148}
149
150#[derive(Clone, Debug, Default)]
152pub struct QueueStats {
153 pub total_enqueued: u64,
155 pub total_dequeued: u64,
157 pub total_dropped: u64,
159 pub total_bytes_in: u64,
161 pub total_bytes_out: u64,
163}
164
165impl QueueStats {
166 pub fn pending(&self) -> u64 {
168 self.total_enqueued - self.total_dequeued - self.total_dropped
169 }
170}
171
172pub struct PacketQueue {
176 heap: BinaryHeap<MinPacket>,
177 config: PacketQueueConfig,
178 total_bytes: usize,
179 sequence_counter: u64,
180 stats: QueueStats,
181}
182
183impl PacketQueue {
184 pub fn new() -> Self {
186 Self::with_config(PacketQueueConfig::default())
187 }
188
189 pub fn with_config(config: PacketQueueConfig) -> Self {
191 Self {
192 heap: BinaryHeap::new(),
193 config,
194 total_bytes: 0,
195 sequence_counter: 0,
196 stats: QueueStats::default(),
197 }
198 }
199
200 pub fn push(&mut self, mut packet: QueuedPacket) -> bool {
202 let pkt_size = packet.size();
203 if self.heap.len() >= self.config.max_packets
204 || self.total_bytes + pkt_size > self.config.max_bytes
205 {
206 self.stats.total_dropped += 1;
207 return false;
208 }
209 packet.sequence = self.sequence_counter;
210 self.sequence_counter += 1;
211 self.total_bytes += pkt_size;
212 self.stats.total_enqueued += 1;
213 self.stats.total_bytes_in += pkt_size as u64;
214 self.heap.push(MinPacket {
215 packet,
216 order: self.config.order,
217 });
218 true
219 }
220
221 pub fn pop(&mut self) -> Option<QueuedPacket> {
223 let min_pkt = self.heap.pop()?;
224 let pkt = min_pkt.packet;
225 self.total_bytes -= pkt.size();
226 self.stats.total_dequeued += 1;
227 self.stats.total_bytes_out += pkt.size() as u64;
228 Some(pkt)
229 }
230
231 pub fn peek_pts(&self) -> Option<i64> {
233 self.heap.peek().map(|p| p.packet.pts)
234 }
235
236 pub fn len(&self) -> usize {
238 self.heap.len()
239 }
240
241 pub fn is_empty(&self) -> bool {
243 self.heap.is_empty()
244 }
245
246 pub fn total_bytes(&self) -> usize {
248 self.total_bytes
249 }
250
251 pub fn stats(&self) -> &QueueStats {
253 &self.stats
254 }
255
256 pub fn drain(&mut self) -> Vec<QueuedPacket> {
258 let mut out = Vec::with_capacity(self.heap.len());
259 while let Some(pkt) = self.pop() {
260 out.push(pkt);
261 }
262 out
263 }
264
265 pub fn clear(&mut self) {
267 self.heap.clear();
268 self.total_bytes = 0;
269 }
270}
271
272impl Default for PacketQueue {
273 fn default() -> Self {
274 Self::new()
275 }
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281
282 #[test]
283 fn test_queued_packet_new() {
284 let pkt = QueuedPacket::new(100, 90, vec![1, 2, 3]);
285 assert_eq!(pkt.pts, 100);
286 assert_eq!(pkt.dts, 90);
287 assert_eq!(pkt.size(), 3);
288 assert!(!pkt.is_keyframe);
289 }
290
291 #[test]
292 fn test_queued_packet_builder() {
293 let pkt = QueuedPacket::new(10, 5, vec![0; 10])
294 .with_duration(33)
295 .with_stream_index(1)
296 .with_keyframe(true);
297 assert_eq!(pkt.duration, 33);
298 assert_eq!(pkt.stream_index, 1);
299 assert!(pkt.is_keyframe);
300 }
301
302 #[test]
303 fn test_queue_order_default() {
304 assert_eq!(QueueOrder::default(), QueueOrder::Pts);
305 }
306
307 #[test]
308 fn test_empty_queue() {
309 let queue = PacketQueue::new();
310 assert!(queue.is_empty());
311 assert_eq!(queue.len(), 0);
312 assert_eq!(queue.total_bytes(), 0);
313 }
314
315 #[test]
316 fn test_push_and_pop_single() {
317 let mut queue = PacketQueue::new();
318 let pkt = QueuedPacket::new(100, 100, vec![42]);
319 assert!(queue.push(pkt));
320 assert_eq!(queue.len(), 1);
321
322 let out = queue.pop().expect("pop should return item");
323 assert_eq!(out.pts, 100);
324 assert_eq!(out.data, vec![42]);
325 assert!(queue.is_empty());
326 }
327
328 #[test]
329 fn test_pts_ordering() {
330 let mut queue = PacketQueue::new();
331 queue.push(QueuedPacket::new(300, 300, vec![3]));
332 queue.push(QueuedPacket::new(100, 100, vec![1]));
333 queue.push(QueuedPacket::new(200, 200, vec![2]));
334
335 assert_eq!(queue.pop().expect("pop should return item").pts, 100);
336 assert_eq!(queue.pop().expect("pop should return item").pts, 200);
337 assert_eq!(queue.pop().expect("pop should return item").pts, 300);
338 }
339
340 #[test]
341 fn test_dts_ordering() {
342 let config = PacketQueueConfig {
343 order: QueueOrder::Dts,
344 ..Default::default()
345 };
346 let mut queue = PacketQueue::with_config(config);
347 queue.push(QueuedPacket::new(300, 200, vec![2]));
348 queue.push(QueuedPacket::new(100, 100, vec![1]));
349 queue.push(QueuedPacket::new(200, 300, vec![3]));
350
351 assert_eq!(queue.pop().expect("pop should return item").dts, 100);
352 assert_eq!(queue.pop().expect("pop should return item").dts, 200);
353 assert_eq!(queue.pop().expect("pop should return item").dts, 300);
354 }
355
356 #[test]
357 fn test_max_packets_overflow() {
358 let config = PacketQueueConfig {
359 max_packets: 2,
360 ..Default::default()
361 };
362 let mut queue = PacketQueue::with_config(config);
363 assert!(queue.push(QueuedPacket::new(1, 1, vec![1])));
364 assert!(queue.push(QueuedPacket::new(2, 2, vec![2])));
365 assert!(!queue.push(QueuedPacket::new(3, 3, vec![3])));
366 assert_eq!(queue.stats().total_dropped, 1);
367 }
368
369 #[test]
370 fn test_max_bytes_overflow() {
371 let config = PacketQueueConfig {
372 max_bytes: 5,
373 ..Default::default()
374 };
375 let mut queue = PacketQueue::with_config(config);
376 assert!(queue.push(QueuedPacket::new(1, 1, vec![0; 3])));
377 assert!(!queue.push(QueuedPacket::new(2, 2, vec![0; 3])));
378 assert_eq!(queue.total_bytes(), 3);
379 }
380
381 #[test]
382 fn test_drain() {
383 let mut queue = PacketQueue::new();
384 queue.push(QueuedPacket::new(30, 30, vec![3]));
385 queue.push(QueuedPacket::new(10, 10, vec![1]));
386 queue.push(QueuedPacket::new(20, 20, vec![2]));
387
388 let drained = queue.drain();
389 assert_eq!(drained.len(), 3);
390 assert_eq!(drained[0].pts, 10);
391 assert_eq!(drained[1].pts, 20);
392 assert_eq!(drained[2].pts, 30);
393 assert!(queue.is_empty());
394 }
395
396 #[test]
397 fn test_peek_pts() {
398 let mut queue = PacketQueue::new();
399 assert!(queue.peek_pts().is_none());
400 queue.push(QueuedPacket::new(50, 50, vec![]));
401 queue.push(QueuedPacket::new(10, 10, vec![]));
402 assert_eq!(queue.peek_pts(), Some(10));
403 }
404
405 #[test]
406 fn test_stats() {
407 let mut queue = PacketQueue::new();
408 queue.push(QueuedPacket::new(1, 1, vec![0; 10]));
409 queue.push(QueuedPacket::new(2, 2, vec![0; 20]));
410 let _ = queue.pop();
411 let stats = queue.stats();
412 assert_eq!(stats.total_enqueued, 2);
413 assert_eq!(stats.total_dequeued, 1);
414 assert_eq!(stats.total_bytes_in, 30);
415 assert_eq!(stats.total_bytes_out, 10);
416 assert_eq!(stats.pending(), 1);
417 }
418
419 #[test]
420 fn test_clear() {
421 let mut queue = PacketQueue::new();
422 queue.push(QueuedPacket::new(1, 1, vec![0; 100]));
423 queue.push(QueuedPacket::new(2, 2, vec![0; 200]));
424 queue.clear();
425 assert!(queue.is_empty());
426 assert_eq!(queue.total_bytes(), 0);
427 }
428}