Skip to main content

oximedia_codec/
packet_queue.rs

1#![allow(dead_code)]
2//! Codec packet queuing and reordering.
3//!
4//! Video codecs often produce packets out of display order (B-frames cause
5//! decode order != presentation order). This module provides a priority queue
6//! that reorders coded packets by PTS (presentation timestamp) or DTS (decode
7//! timestamp) before they are fed to a muxer or consumer.
8
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11
12/// A coded packet with timestamps for queue ordering.
13#[derive(Clone, Debug)]
14pub struct QueuedPacket {
15    /// Presentation timestamp in timebase units.
16    pub pts: i64,
17    /// Decode timestamp in timebase units.
18    pub dts: i64,
19    /// Duration in timebase units.
20    pub duration: u64,
21    /// Stream index this packet belongs to.
22    pub stream_index: u32,
23    /// Whether this is a keyframe.
24    pub is_keyframe: bool,
25    /// Raw payload data.
26    pub data: Vec<u8>,
27    /// Sequence counter for stable sorting when timestamps tie.
28    sequence: u64,
29}
30
31impl QueuedPacket {
32    /// Create a new queued packet.
33    pub fn new(pts: i64, dts: i64, data: Vec<u8>) -> Self {
34        Self {
35            pts,
36            dts,
37            duration: 0,
38            stream_index: 0,
39            is_keyframe: false,
40            data,
41            sequence: 0,
42        }
43    }
44
45    /// Set the duration.
46    pub fn with_duration(mut self, duration: u64) -> Self {
47        self.duration = duration;
48        self
49    }
50
51    /// Set the stream index.
52    pub fn with_stream_index(mut self, index: u32) -> Self {
53        self.stream_index = index;
54        self
55    }
56
57    /// Set the keyframe flag.
58    pub fn with_keyframe(mut self, is_keyframe: bool) -> Self {
59        self.is_keyframe = is_keyframe;
60        self
61    }
62
63    /// Packet size in bytes.
64    pub fn size(&self) -> usize {
65        self.data.len()
66    }
67}
68
69impl PartialEq for QueuedPacket {
70    fn eq(&self, other: &Self) -> bool {
71        self.pts == other.pts && self.sequence == other.sequence
72    }
73}
74
75impl Eq for QueuedPacket {}
76
77/// Ordering strategy for packet queue.
78#[derive(Clone, Copy, Debug, PartialEq, Eq)]
79pub enum QueueOrder {
80    /// Order by PTS (presentation timestamp) ascending.
81    Pts,
82    /// Order by DTS (decode timestamp) ascending.
83    Dts,
84}
85
86impl Default for QueueOrder {
87    fn default() -> Self {
88        Self::Pts
89    }
90}
91
92/// Wrapper for min-heap ordering (BinaryHeap is max-heap by default).
93struct MinPacket {
94    packet: QueuedPacket,
95    order: QueueOrder,
96}
97
98impl PartialEq for MinPacket {
99    fn eq(&self, other: &Self) -> bool {
100        self.packet.eq(&other.packet)
101    }
102}
103
104impl Eq for MinPacket {}
105
106impl PartialOrd for MinPacket {
107    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
108        Some(self.cmp(other))
109    }
110}
111
112impl Ord for MinPacket {
113    fn cmp(&self, other: &Self) -> Ordering {
114        let self_ts = match self.order {
115            QueueOrder::Pts => self.packet.pts,
116            QueueOrder::Dts => self.packet.dts,
117        };
118        let other_ts = match self.order {
119            QueueOrder::Pts => other.packet.pts,
120            QueueOrder::Dts => other.packet.dts,
121        };
122        // Reverse for min-heap behavior
123        other_ts
124            .cmp(&self_ts)
125            .then_with(|| other.packet.sequence.cmp(&self.packet.sequence))
126    }
127}
128
129/// Configuration for the packet queue.
130#[derive(Clone, Debug)]
131pub struct PacketQueueConfig {
132    /// Maximum number of packets to buffer.
133    pub max_packets: usize,
134    /// Maximum total byte size to buffer.
135    pub max_bytes: usize,
136    /// Ordering strategy.
137    pub order: QueueOrder,
138}
139
140impl Default for PacketQueueConfig {
141    fn default() -> Self {
142        Self {
143            max_packets: 256,
144            max_bytes: 64 * 1024 * 1024,
145            order: QueueOrder::Pts,
146        }
147    }
148}
149
150/// Statistics for the packet queue.
151#[derive(Clone, Debug, Default)]
152pub struct QueueStats {
153    /// Total packets enqueued.
154    pub total_enqueued: u64,
155    /// Total packets dequeued.
156    pub total_dequeued: u64,
157    /// Total packets dropped due to overflow.
158    pub total_dropped: u64,
159    /// Total bytes enqueued.
160    pub total_bytes_in: u64,
161    /// Total bytes dequeued.
162    pub total_bytes_out: u64,
163}
164
165impl QueueStats {
166    /// Number of packets currently in the queue.
167    pub fn pending(&self) -> u64 {
168        self.total_enqueued - self.total_dequeued - self.total_dropped
169    }
170}
171
172/// A reordering packet queue.
173///
174/// Packets are inserted in arbitrary order and extracted in PTS or DTS order.
175pub struct PacketQueue {
176    heap: BinaryHeap<MinPacket>,
177    config: PacketQueueConfig,
178    total_bytes: usize,
179    sequence_counter: u64,
180    stats: QueueStats,
181}
182
183impl PacketQueue {
184    /// Create a new packet queue with default configuration.
185    pub fn new() -> Self {
186        Self::with_config(PacketQueueConfig::default())
187    }
188
189    /// Create a new packet queue with custom configuration.
190    pub fn with_config(config: PacketQueueConfig) -> Self {
191        Self {
192            heap: BinaryHeap::new(),
193            config,
194            total_bytes: 0,
195            sequence_counter: 0,
196            stats: QueueStats::default(),
197        }
198    }
199
200    /// Push a packet into the queue. Returns true if accepted, false if dropped.
201    pub fn push(&mut self, mut packet: QueuedPacket) -> bool {
202        let pkt_size = packet.size();
203        if self.heap.len() >= self.config.max_packets
204            || self.total_bytes + pkt_size > self.config.max_bytes
205        {
206            self.stats.total_dropped += 1;
207            return false;
208        }
209        packet.sequence = self.sequence_counter;
210        self.sequence_counter += 1;
211        self.total_bytes += pkt_size;
212        self.stats.total_enqueued += 1;
213        self.stats.total_bytes_in += pkt_size as u64;
214        self.heap.push(MinPacket {
215            packet,
216            order: self.config.order,
217        });
218        true
219    }
220
221    /// Pop the next packet in timestamp order.
222    pub fn pop(&mut self) -> Option<QueuedPacket> {
223        let min_pkt = self.heap.pop()?;
224        let pkt = min_pkt.packet;
225        self.total_bytes -= pkt.size();
226        self.stats.total_dequeued += 1;
227        self.stats.total_bytes_out += pkt.size() as u64;
228        Some(pkt)
229    }
230
231    /// Peek at the next packet without removing it.
232    pub fn peek_pts(&self) -> Option<i64> {
233        self.heap.peek().map(|p| p.packet.pts)
234    }
235
236    /// Number of packets in the queue.
237    pub fn len(&self) -> usize {
238        self.heap.len()
239    }
240
241    /// Whether the queue is empty.
242    pub fn is_empty(&self) -> bool {
243        self.heap.is_empty()
244    }
245
246    /// Total bytes buffered.
247    pub fn total_bytes(&self) -> usize {
248        self.total_bytes
249    }
250
251    /// Queue statistics.
252    pub fn stats(&self) -> &QueueStats {
253        &self.stats
254    }
255
256    /// Drain all packets in timestamp order.
257    pub fn drain(&mut self) -> Vec<QueuedPacket> {
258        let mut out = Vec::with_capacity(self.heap.len());
259        while let Some(pkt) = self.pop() {
260            out.push(pkt);
261        }
262        out
263    }
264
265    /// Clear the queue.
266    pub fn clear(&mut self) {
267        self.heap.clear();
268        self.total_bytes = 0;
269    }
270}
271
272impl Default for PacketQueue {
273    fn default() -> Self {
274        Self::new()
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281
282    #[test]
283    fn test_queued_packet_new() {
284        let pkt = QueuedPacket::new(100, 90, vec![1, 2, 3]);
285        assert_eq!(pkt.pts, 100);
286        assert_eq!(pkt.dts, 90);
287        assert_eq!(pkt.size(), 3);
288        assert!(!pkt.is_keyframe);
289    }
290
291    #[test]
292    fn test_queued_packet_builder() {
293        let pkt = QueuedPacket::new(10, 5, vec![0; 10])
294            .with_duration(33)
295            .with_stream_index(1)
296            .with_keyframe(true);
297        assert_eq!(pkt.duration, 33);
298        assert_eq!(pkt.stream_index, 1);
299        assert!(pkt.is_keyframe);
300    }
301
302    #[test]
303    fn test_queue_order_default() {
304        assert_eq!(QueueOrder::default(), QueueOrder::Pts);
305    }
306
307    #[test]
308    fn test_empty_queue() {
309        let queue = PacketQueue::new();
310        assert!(queue.is_empty());
311        assert_eq!(queue.len(), 0);
312        assert_eq!(queue.total_bytes(), 0);
313    }
314
315    #[test]
316    fn test_push_and_pop_single() {
317        let mut queue = PacketQueue::new();
318        let pkt = QueuedPacket::new(100, 100, vec![42]);
319        assert!(queue.push(pkt));
320        assert_eq!(queue.len(), 1);
321
322        let out = queue.pop().expect("pop should return item");
323        assert_eq!(out.pts, 100);
324        assert_eq!(out.data, vec![42]);
325        assert!(queue.is_empty());
326    }
327
328    #[test]
329    fn test_pts_ordering() {
330        let mut queue = PacketQueue::new();
331        queue.push(QueuedPacket::new(300, 300, vec![3]));
332        queue.push(QueuedPacket::new(100, 100, vec![1]));
333        queue.push(QueuedPacket::new(200, 200, vec![2]));
334
335        assert_eq!(queue.pop().expect("pop should return item").pts, 100);
336        assert_eq!(queue.pop().expect("pop should return item").pts, 200);
337        assert_eq!(queue.pop().expect("pop should return item").pts, 300);
338    }
339
340    #[test]
341    fn test_dts_ordering() {
342        let config = PacketQueueConfig {
343            order: QueueOrder::Dts,
344            ..Default::default()
345        };
346        let mut queue = PacketQueue::with_config(config);
347        queue.push(QueuedPacket::new(300, 200, vec![2]));
348        queue.push(QueuedPacket::new(100, 100, vec![1]));
349        queue.push(QueuedPacket::new(200, 300, vec![3]));
350
351        assert_eq!(queue.pop().expect("pop should return item").dts, 100);
352        assert_eq!(queue.pop().expect("pop should return item").dts, 200);
353        assert_eq!(queue.pop().expect("pop should return item").dts, 300);
354    }
355
356    #[test]
357    fn test_max_packets_overflow() {
358        let config = PacketQueueConfig {
359            max_packets: 2,
360            ..Default::default()
361        };
362        let mut queue = PacketQueue::with_config(config);
363        assert!(queue.push(QueuedPacket::new(1, 1, vec![1])));
364        assert!(queue.push(QueuedPacket::new(2, 2, vec![2])));
365        assert!(!queue.push(QueuedPacket::new(3, 3, vec![3])));
366        assert_eq!(queue.stats().total_dropped, 1);
367    }
368
369    #[test]
370    fn test_max_bytes_overflow() {
371        let config = PacketQueueConfig {
372            max_bytes: 5,
373            ..Default::default()
374        };
375        let mut queue = PacketQueue::with_config(config);
376        assert!(queue.push(QueuedPacket::new(1, 1, vec![0; 3])));
377        assert!(!queue.push(QueuedPacket::new(2, 2, vec![0; 3])));
378        assert_eq!(queue.total_bytes(), 3);
379    }
380
381    #[test]
382    fn test_drain() {
383        let mut queue = PacketQueue::new();
384        queue.push(QueuedPacket::new(30, 30, vec![3]));
385        queue.push(QueuedPacket::new(10, 10, vec![1]));
386        queue.push(QueuedPacket::new(20, 20, vec![2]));
387
388        let drained = queue.drain();
389        assert_eq!(drained.len(), 3);
390        assert_eq!(drained[0].pts, 10);
391        assert_eq!(drained[1].pts, 20);
392        assert_eq!(drained[2].pts, 30);
393        assert!(queue.is_empty());
394    }
395
396    #[test]
397    fn test_peek_pts() {
398        let mut queue = PacketQueue::new();
399        assert!(queue.peek_pts().is_none());
400        queue.push(QueuedPacket::new(50, 50, vec![]));
401        queue.push(QueuedPacket::new(10, 10, vec![]));
402        assert_eq!(queue.peek_pts(), Some(10));
403    }
404
405    #[test]
406    fn test_stats() {
407        let mut queue = PacketQueue::new();
408        queue.push(QueuedPacket::new(1, 1, vec![0; 10]));
409        queue.push(QueuedPacket::new(2, 2, vec![0; 20]));
410        let _ = queue.pop();
411        let stats = queue.stats();
412        assert_eq!(stats.total_enqueued, 2);
413        assert_eq!(stats.total_dequeued, 1);
414        assert_eq!(stats.total_bytes_in, 30);
415        assert_eq!(stats.total_bytes_out, 10);
416        assert_eq!(stats.pending(), 1);
417    }
418
419    #[test]
420    fn test_clear() {
421        let mut queue = PacketQueue::new();
422        queue.push(QueuedPacket::new(1, 1, vec![0; 100]));
423        queue.push(QueuedPacket::new(2, 2, vec![0; 200]));
424        queue.clear();
425        assert!(queue.is_empty());
426        assert_eq!(queue.total_bytes(), 0);
427    }
428}