Skip to main content

oximedia_codec/
frame_queue.rs

1//! Encoder frame queue with PTS-ordered input, B-frame reorder buffer, and DTS calculation.
2//!
3//! ## Overview
4//!
5//! Video encoders with B-frame support cannot emit packets in display order.  They
6//! need to:
7//! 1. **Receive** frames in display/presentation order (PTS-ordered).
8//! 2. **Reorder** them so reference frames (I/P) are encoded before the B-frames
9//!    that depend on them.
10//! 3. **Emit** encoded packets with a *decode timestamp* (DTS) that is ≤ the PTS
11//!    and strictly non-decreasing.
12//!
13//! This module provides:
14//! - [`FrameQueue`] — PTS-sorted input staging area that enforces ordering and
15//!   detects duplicate / out-of-order submissions.
16//! - [`BFrameReorderBuffer`] — Delayed-output buffer with configurable lookahead
17//!   depth that reorders frames for B-frame encoding and computes DTS offsets.
18//! - [`DtsCalculator`] — Standalone utility to turn a sequence of (PTS, is_key)
19//!   pairs into correct DTS values.
20
21use std::cmp::Reverse;
22use std::collections::BinaryHeap;
23
24use crate::error::{CodecError, CodecResult};
25
26/// A single frame entry held in the queue.
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct QueuedFrame {
29    /// Presentation timestamp in encoder timebase units.
30    pub pts: i64,
31    /// Frame type hint used for reorder decisions.
32    pub frame_type: QueueFrameType,
33    /// Opaque payload (e.g. raw pixel data or a frame index).
34    pub data: Vec<u8>,
35}
36
37/// Frame type hint supplied by the caller.
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum QueueFrameType {
40    /// Intra-coded (keyframe) — never depends on other frames.
41    Intra,
42    /// Inter-coded, predicted from a past reference.
43    Inter,
44    /// Bi-directionally predicted — depends on both past and future references.
45    BiPredicted,
46}
47
48/// A [`QueuedFrame`] augmented with its computed DTS, ready to be handed to the
49/// codec for actual encoding.
50#[derive(Debug, Clone)]
51pub struct ReadyFrame {
52    /// Original presentation timestamp.
53    pub pts: i64,
54    /// Decode timestamp (≤ pts, strictly non-decreasing per output packet).
55    pub dts: i64,
56    /// Frame type.
57    pub frame_type: QueueFrameType,
58    /// Opaque payload forwarded from [`QueuedFrame`].
59    pub data: Vec<u8>,
60}
61
62// ── FrameQueue ───────────────────────────────────────────────────────────────
63
64/// PTS-ordered staging queue for incoming frames.
65///
66/// Frames are pushed in any order and popped in strictly ascending PTS order.
67/// Submitting a frame with a PTS that already exists in the queue is an error.
68#[derive(Debug, Default)]
69pub struct FrameQueue {
70    /// Min-heap: `Reverse` so `BinaryHeap` (max-heap) becomes a min-heap.
71    heap: BinaryHeap<Reverse<PtsOrdFrame>>,
72    /// Set of PTS values currently in the queue (for duplicate detection).
73    pts_set: std::collections::BTreeSet<i64>,
74}
75
76/// Wrapper that orders [`QueuedFrame`] by PTS.
77#[derive(Debug, Clone, PartialEq, Eq)]
78struct PtsOrdFrame(QueuedFrame);
79
80impl PartialOrd for PtsOrdFrame {
81    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
82        Some(self.cmp(other))
83    }
84}
85
86impl Ord for PtsOrdFrame {
87    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
88        self.0.pts.cmp(&other.0.pts)
89    }
90}
91
92impl FrameQueue {
93    /// Create an empty queue.
94    pub fn new() -> Self {
95        Self::default()
96    }
97
98    /// Push a frame.  Returns an error if a frame with the same PTS is already queued.
99    pub fn push(&mut self, frame: QueuedFrame) -> CodecResult<()> {
100        if self.pts_set.contains(&frame.pts) {
101            return Err(CodecError::InvalidParameter(format!(
102                "duplicate PTS {} in frame queue",
103                frame.pts
104            )));
105        }
106        self.pts_set.insert(frame.pts);
107        self.heap.push(Reverse(PtsOrdFrame(frame)));
108        Ok(())
109    }
110
111    /// Pop the frame with the lowest PTS, or `None` if the queue is empty.
112    pub fn pop(&mut self) -> Option<QueuedFrame> {
113        let Reverse(PtsOrdFrame(frame)) = self.heap.pop()?;
114        self.pts_set.remove(&frame.pts);
115        Some(frame)
116    }
117
118    /// Peek at the PTS of the next frame without removing it.
119    pub fn peek_pts(&self) -> Option<i64> {
120        self.heap.peek().map(|Reverse(PtsOrdFrame(f))| f.pts)
121    }
122
123    /// Number of frames currently queued.
124    pub fn len(&self) -> usize {
125        self.heap.len()
126    }
127
128    /// Returns `true` if no frames are queued.
129    pub fn is_empty(&self) -> bool {
130        self.heap.is_empty()
131    }
132
133    /// Drain all frames in ascending PTS order.
134    pub fn drain_ordered(&mut self) -> Vec<QueuedFrame> {
135        let mut result = Vec::with_capacity(self.heap.len());
136        while let Some(f) = self.pop() {
137            result.push(f);
138        }
139        result
140    }
141}
142
143// ── BFrameReorderBuffer ──────────────────────────────────────────────────────
144
145/// Configuration for the B-frame reorder buffer.
146#[derive(Debug, Clone)]
147pub struct ReorderConfig {
148    /// Maximum number of consecutive B-frames between reference frames
149    /// (lookahead depth).  Set to `0` to disable B-frame reordering.
150    pub max_b_frames: usize,
151    /// Timebase numerator (for DTS offset calculation).
152    /// DTS is expressed in the same timebase as PTS.
153    pub timebase_num: u32,
154    /// Timebase denominator (for DTS offset calculation).
155    pub timebase_den: u32,
156    /// Minimum delta between adjacent DTS values.  Typically
157    /// `timebase_den / (framerate_num * timebase_num)`.
158    pub min_dts_delta: i64,
159}
160
161impl Default for ReorderConfig {
162    fn default() -> Self {
163        Self {
164            max_b_frames: 2,
165            timebase_num: 1,
166            timebase_den: 90_000, // MPEG-90 kHz timebase
167            min_dts_delta: 3000,  // 1 frame @ 30 fps in 90 kHz units
168        }
169    }
170}
171
172/// B-frame reorder buffer.
173///
174/// Accepts frames in PTS order and emits [`ReadyFrame`]s in *decode order* with
175/// computed DTS values.
176///
177/// ### Reorder algorithm (simplified)
178///
179/// Frames are collected into *groups of pictures* (GOPs).  Within each GOP a
180/// run of `B` frames is deferred until its anchor `P/I` frame has been output.
181/// DTS is assigned as a monotonically increasing counter starting from the
182/// smallest PTS seen, offset back by `max_b_frames * min_dts_delta` to ensure
183/// DTS ≤ PTS for every frame.
184#[derive(Debug)]
185pub struct BFrameReorderBuffer {
186    config: ReorderConfig,
187    /// Frames waiting to be emitted after the anchor is found.
188    pending: Vec<QueuedFrame>,
189    /// Next DTS value to assign.
190    next_dts: Option<i64>,
191    /// Monotonically increasing DTS counter (once initialised).
192    dts_counter: i64,
193    /// Output queue: frames ready for the encoder in decode order.
194    output: std::collections::VecDeque<ReadyFrame>,
195}
196
197impl BFrameReorderBuffer {
198    /// Create a new reorder buffer with the given configuration.
199    pub fn new(config: ReorderConfig) -> Self {
200        Self {
201            config,
202            pending: Vec::new(),
203            next_dts: None,
204            dts_counter: 0,
205            output: std::collections::VecDeque::new(),
206        }
207    }
208
209    /// Create with default configuration.
210    pub fn default_config() -> Self {
211        Self::new(ReorderConfig::default())
212    }
213
214    /// Push a frame in PTS / display order.
215    ///
216    /// The buffer will internally decide when to emit frames in decode order.
217    pub fn push(&mut self, frame: QueuedFrame) {
218        // Initialise DTS on first frame.  We set it `max_b_frames` steps before
219        // the first PTS so that B-frame packets always have DTS ≤ PTS.
220        if self.next_dts.is_none() {
221            let offset = (self.config.max_b_frames as i64) * self.config.min_dts_delta;
222            let initial_dts = frame.pts - offset;
223            self.next_dts = Some(initial_dts);
224            self.dts_counter = initial_dts;
225        }
226
227        match frame.frame_type {
228            QueueFrameType::Intra | QueueFrameType::Inter => {
229                // Anchor frame: flush any pending B-frames first (they have
230                // smaller PTS but were held so the anchor could be encoded
231                // first).
232                self.flush_pending_b_frames();
233                // Then emit the anchor itself.
234                self.emit_frame(frame);
235            }
236            QueueFrameType::BiPredicted => {
237                if self.config.max_b_frames == 0 {
238                    // B-frames disabled: treat as inter.
239                    self.emit_frame(frame);
240                } else {
241                    self.pending.push(frame);
242                    // Flush when the buffer is full.
243                    if self.pending.len() >= self.config.max_b_frames {
244                        self.flush_pending_b_frames();
245                    }
246                }
247            }
248        }
249    }
250
251    /// Flush all remaining frames (call at end of stream).
252    pub fn flush(&mut self) {
253        self.flush_pending_b_frames();
254    }
255
256    /// Pop the next [`ReadyFrame`] in decode order, or `None` if none are ready.
257    pub fn pop(&mut self) -> Option<ReadyFrame> {
258        self.output.pop_front()
259    }
260
261    /// Number of frames ready to be consumed.
262    pub fn ready_len(&self) -> usize {
263        self.output.len()
264    }
265
266    /// Number of frames still buffered (not yet emitted).
267    pub fn pending_len(&self) -> usize {
268        self.pending.len()
269    }
270
271    // ── private helpers ───────────────────────────────────────────────────
272
273    fn flush_pending_b_frames(&mut self) {
274        // Sort pending B-frames by PTS before emitting them.
275        self.pending.sort_by_key(|f| f.pts);
276        let frames: Vec<_> = self.pending.drain(..).collect();
277        for f in frames {
278            self.emit_frame(f);
279        }
280    }
281
282    fn emit_frame(&mut self, frame: QueuedFrame) {
283        let dts = self.dts_counter;
284        self.dts_counter += self.config.min_dts_delta;
285        self.output.push_back(ReadyFrame {
286            pts: frame.pts,
287            dts,
288            frame_type: frame.frame_type,
289            data: frame.data,
290        });
291    }
292}
293
294// ── DtsCalculator ────────────────────────────────────────────────────────────
295
296/// Standalone DTS calculator.
297///
298/// Given a sequence of `(pts, is_keyframe)` pairs, produces DTS values that are:
299/// - Strictly non-decreasing.
300/// - Always ≤ the corresponding PTS.
301/// - Separated by at least `min_delta` timebase units.
302///
303/// Useful when you already have the full frame sequence and want to annotate
304/// DTS in batch (e.g. when writing container mux metadata).
305#[derive(Debug)]
306pub struct DtsCalculator {
307    /// Minimum gap between adjacent DTS values.
308    min_delta: i64,
309    /// B-frame lookahead depth (used for initial DTS pre-roll offset).
310    max_b_frames: usize,
311    /// Running DTS counter.
312    next_dts: Option<i64>,
313}
314
315impl DtsCalculator {
316    /// Create a new calculator.
317    ///
318    /// `min_delta` must be > 0.
319    pub fn new(min_delta: i64, max_b_frames: usize) -> CodecResult<Self> {
320        if min_delta <= 0 {
321            return Err(CodecError::InvalidParameter(
322                "DtsCalculator: min_delta must be positive".into(),
323            ));
324        }
325        Ok(Self {
326            min_delta,
327            max_b_frames,
328            next_dts: None,
329        })
330    }
331
332    /// Compute DTS for a single frame in sequence order.
333    ///
334    /// The first call initialises the internal counter from `pts` minus the
335    /// B-frame pre-roll offset.
336    pub fn next(&mut self, pts: i64, _is_keyframe: bool) -> i64 {
337        let dts = match self.next_dts {
338            None => {
339                let offset = (self.max_b_frames as i64) * self.min_delta;
340                let initial = pts - offset;
341                self.next_dts = Some(initial + self.min_delta);
342                initial
343            }
344            Some(ref mut counter) => {
345                let dts = *counter;
346                *counter += self.min_delta;
347                dts
348            }
349        };
350        dts
351    }
352
353    /// Compute DTS for a batch of `(pts, is_keyframe)` pairs.
354    ///
355    /// Returns a `Vec<i64>` of the same length.
356    pub fn compute_batch(&mut self, frames: &[(i64, bool)]) -> Vec<i64> {
357        frames
358            .iter()
359            .map(|&(pts, is_key)| self.next(pts, is_key))
360            .collect()
361    }
362
363    /// Reset the internal counter so a new sequence can be processed.
364    pub fn reset(&mut self) {
365        self.next_dts = None;
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372
373    fn make_frame(pts: i64, ft: QueueFrameType) -> QueuedFrame {
374        QueuedFrame {
375            pts,
376            frame_type: ft,
377            data: vec![pts as u8],
378        }
379    }
380
381    #[test]
382    fn test_frame_queue_push_pop_ordered() {
383        let mut q = FrameQueue::new();
384        q.push(make_frame(200, QueueFrameType::Inter)).unwrap();
385        q.push(make_frame(0, QueueFrameType::Intra)).unwrap();
386        q.push(make_frame(100, QueueFrameType::BiPredicted))
387            .unwrap();
388
389        assert_eq!(q.pop().unwrap().pts, 0);
390        assert_eq!(q.pop().unwrap().pts, 100);
391        assert_eq!(q.pop().unwrap().pts, 200);
392        assert!(q.pop().is_none());
393    }
394
395    #[test]
396    fn test_frame_queue_duplicate_pts_error() {
397        let mut q = FrameQueue::new();
398        q.push(make_frame(100, QueueFrameType::Intra)).unwrap();
399        let result = q.push(make_frame(100, QueueFrameType::Inter));
400        assert!(result.is_err());
401    }
402
403    #[test]
404    fn test_frame_queue_drain_ordered() {
405        let mut q = FrameQueue::new();
406        for pts in [500i64, 100, 300, 0, 200] {
407            q.push(make_frame(pts, QueueFrameType::Inter)).unwrap();
408        }
409        let drained = q.drain_ordered();
410        let pts_seq: Vec<i64> = drained.iter().map(|f| f.pts).collect();
411        assert_eq!(pts_seq, vec![0, 100, 200, 300, 500]);
412    }
413
414    #[test]
415    fn test_frame_queue_peek_pts() {
416        let mut q = FrameQueue::new();
417        assert_eq!(q.peek_pts(), None);
418        q.push(make_frame(50, QueueFrameType::Intra)).unwrap();
419        q.push(make_frame(10, QueueFrameType::Inter)).unwrap();
420        assert_eq!(q.peek_pts(), Some(10));
421    }
422
423    #[test]
424    fn test_b_frame_reorder_anchor_before_b() {
425        let cfg = ReorderConfig {
426            max_b_frames: 2,
427            min_dts_delta: 1,
428            ..Default::default()
429        };
430        let mut buf = BFrameReorderBuffer::new(cfg);
431        // Display order: I B B P
432        buf.push(make_frame(0, QueueFrameType::Intra));
433        buf.push(make_frame(1, QueueFrameType::BiPredicted));
434        buf.push(make_frame(2, QueueFrameType::BiPredicted));
435        buf.push(make_frame(3, QueueFrameType::Inter));
436        buf.flush();
437
438        // Decode order should be: I P B B  (or I then pending B-frames after P anchor)
439        let mut out = Vec::new();
440        while let Some(f) = buf.pop() {
441            out.push(f);
442        }
443        assert!(!out.is_empty());
444        // DTS must be non-decreasing
445        for w in out.windows(2) {
446            assert!(w[1].dts >= w[0].dts, "DTS must be non-decreasing");
447        }
448    }
449
450    #[test]
451    fn test_b_frame_reorder_dts_leq_pts() {
452        let cfg = ReorderConfig {
453            max_b_frames: 2,
454            min_dts_delta: 3000,
455            ..Default::default()
456        };
457        let mut buf = BFrameReorderBuffer::new(cfg);
458        let pts_sequence = [0i64, 3000, 6000, 9000, 12000];
459        for (i, &pts) in pts_sequence.iter().enumerate() {
460            let ft = if i % 3 == 0 {
461                QueueFrameType::Intra
462            } else if i % 3 == 1 {
463                QueueFrameType::BiPredicted
464            } else {
465                QueueFrameType::Inter
466            };
467            buf.push(make_frame(pts, ft));
468        }
469        buf.flush();
470
471        while let Some(f) = buf.pop() {
472            assert!(f.dts <= f.pts, "DTS ({}) must be <= PTS ({})", f.dts, f.pts);
473        }
474    }
475
476    #[test]
477    fn test_dts_calculator_basic() {
478        let mut calc = DtsCalculator::new(3000, 2).unwrap();
479        let pts_vals = [6000i64, 9000, 12000, 15000];
480        let frames: Vec<(i64, bool)> = pts_vals
481            .iter()
482            .enumerate()
483            .map(|(i, &p)| (p, i == 0))
484            .collect();
485        let dts = calc.compute_batch(&frames);
486        // First DTS = 6000 - 2*3000 = 0
487        assert_eq!(dts[0], 0);
488        // Each subsequent DTS increments by min_delta
489        for w in dts.windows(2) {
490            assert_eq!(w[1] - w[0], 3000);
491        }
492    }
493
494    #[test]
495    fn test_dts_calculator_invalid_delta() {
496        let result = DtsCalculator::new(0, 2);
497        assert!(result.is_err());
498        let result2 = DtsCalculator::new(-1, 2);
499        assert!(result2.is_err());
500    }
501
502    #[test]
503    fn test_dts_calculator_reset() {
504        let mut calc = DtsCalculator::new(1000, 1).unwrap();
505        let dts1 = calc.next(5000, true);
506        calc.reset();
507        let dts2 = calc.next(5000, true);
508        // After reset the initial DTS should be the same as first call
509        assert_eq!(dts1, dts2);
510    }
511
512    #[test]
513    fn test_no_b_frames_passthrough() {
514        let cfg = ReorderConfig {
515            max_b_frames: 0,
516            min_dts_delta: 1,
517            ..Default::default()
518        };
519        let mut buf = BFrameReorderBuffer::new(cfg);
520        for pts in [0i64, 1, 2, 3] {
521            buf.push(make_frame(pts, QueueFrameType::BiPredicted));
522        }
523        buf.flush();
524        let mut pts_out = Vec::new();
525        while let Some(f) = buf.pop() {
526            pts_out.push(f.pts);
527        }
528        // All frames emitted immediately in push order (no reordering)
529        assert_eq!(pts_out.len(), 4);
530    }
531}