1use std::cmp::Reverse;
22use std::collections::BinaryHeap;
23
24use crate::error::{CodecError, CodecResult};
25
26#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct QueuedFrame {
29 pub pts: i64,
31 pub frame_type: QueueFrameType,
33 pub data: Vec<u8>,
35}
36
/// Classification of a frame handled by the queue and reorder buffer.
///
/// `BFrameReorderBuffer::push` handles `Intra` and `Inter` identically (both
/// are emitted immediately after any pending frames) and buffers
/// `BiPredicted` frames.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueueFrameType {
    /// Intra-coded frame.
    Intra,
    /// Inter-coded frame.
    Inter,
    /// Bi-predicted frame; the only kind the reorder buffer holds back.
    BiPredicted,
}
47
48#[derive(Debug, Clone)]
51pub struct ReadyFrame {
52 pub pts: i64,
54 pub dts: i64,
56 pub frame_type: QueueFrameType,
58 pub data: Vec<u8>,
60}
61
62#[derive(Debug, Default)]
69pub struct FrameQueue {
70 heap: BinaryHeap<Reverse<PtsOrdFrame>>,
72 pts_set: std::collections::BTreeSet<i64>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq)]
78struct PtsOrdFrame(QueuedFrame);
79
80impl PartialOrd for PtsOrdFrame {
81 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
82 Some(self.cmp(other))
83 }
84}
85
86impl Ord for PtsOrdFrame {
87 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
88 self.0.pts.cmp(&other.0.pts)
89 }
90}
91
92impl FrameQueue {
93 pub fn new() -> Self {
95 Self::default()
96 }
97
98 pub fn push(&mut self, frame: QueuedFrame) -> CodecResult<()> {
100 if self.pts_set.contains(&frame.pts) {
101 return Err(CodecError::InvalidParameter(format!(
102 "duplicate PTS {} in frame queue",
103 frame.pts
104 )));
105 }
106 self.pts_set.insert(frame.pts);
107 self.heap.push(Reverse(PtsOrdFrame(frame)));
108 Ok(())
109 }
110
111 pub fn pop(&mut self) -> Option<QueuedFrame> {
113 let Reverse(PtsOrdFrame(frame)) = self.heap.pop()?;
114 self.pts_set.remove(&frame.pts);
115 Some(frame)
116 }
117
118 pub fn peek_pts(&self) -> Option<i64> {
120 self.heap.peek().map(|Reverse(PtsOrdFrame(f))| f.pts)
121 }
122
123 pub fn len(&self) -> usize {
125 self.heap.len()
126 }
127
128 pub fn is_empty(&self) -> bool {
130 self.heap.is_empty()
131 }
132
133 pub fn drain_ordered(&mut self) -> Vec<QueuedFrame> {
135 let mut result = Vec::with_capacity(self.heap.len());
136 while let Some(f) = self.pop() {
137 result.push(f);
138 }
139 result
140 }
141}
142
/// Settings for `BFrameReorderBuffer`.
#[derive(Clone, Debug)]
pub struct ReorderConfig {
    /// Number of buffered bi-predicted frames that triggers a flush. Also
    /// scales the initial DTS offset (`max_b_frames * min_dts_delta` below
    /// the first PTS). A value of 0 disables buffering.
    pub max_b_frames: usize,
    /// Timebase numerator. Not read by the reorder buffer itself.
    pub timebase_num: u32,
    /// Timebase denominator. Not read by the reorder buffer itself.
    pub timebase_den: u32,
    /// Amount the DTS counter advances for every emitted frame.
    pub min_dts_delta: i64,
}
160
161impl Default for ReorderConfig {
162 fn default() -> Self {
163 Self {
164 max_b_frames: 2,
165 timebase_num: 1,
166 timebase_den: 90_000, min_dts_delta: 3000, }
169 }
170}
171
172#[derive(Debug)]
185pub struct BFrameReorderBuffer {
186 config: ReorderConfig,
187 pending: Vec<QueuedFrame>,
189 next_dts: Option<i64>,
191 dts_counter: i64,
193 output: std::collections::VecDeque<ReadyFrame>,
195}
196
197impl BFrameReorderBuffer {
198 pub fn new(config: ReorderConfig) -> Self {
200 Self {
201 config,
202 pending: Vec::new(),
203 next_dts: None,
204 dts_counter: 0,
205 output: std::collections::VecDeque::new(),
206 }
207 }
208
209 pub fn default_config() -> Self {
211 Self::new(ReorderConfig::default())
212 }
213
214 pub fn push(&mut self, frame: QueuedFrame) {
218 if self.next_dts.is_none() {
221 let offset = (self.config.max_b_frames as i64) * self.config.min_dts_delta;
222 let initial_dts = frame.pts - offset;
223 self.next_dts = Some(initial_dts);
224 self.dts_counter = initial_dts;
225 }
226
227 match frame.frame_type {
228 QueueFrameType::Intra | QueueFrameType::Inter => {
229 self.flush_pending_b_frames();
233 self.emit_frame(frame);
235 }
236 QueueFrameType::BiPredicted => {
237 if self.config.max_b_frames == 0 {
238 self.emit_frame(frame);
240 } else {
241 self.pending.push(frame);
242 if self.pending.len() >= self.config.max_b_frames {
244 self.flush_pending_b_frames();
245 }
246 }
247 }
248 }
249 }
250
251 pub fn flush(&mut self) {
253 self.flush_pending_b_frames();
254 }
255
256 pub fn pop(&mut self) -> Option<ReadyFrame> {
258 self.output.pop_front()
259 }
260
261 pub fn ready_len(&self) -> usize {
263 self.output.len()
264 }
265
266 pub fn pending_len(&self) -> usize {
268 self.pending.len()
269 }
270
271 fn flush_pending_b_frames(&mut self) {
274 self.pending.sort_by_key(|f| f.pts);
276 let frames: Vec<_> = self.pending.drain(..).collect();
277 for f in frames {
278 self.emit_frame(f);
279 }
280 }
281
282 fn emit_frame(&mut self, frame: QueuedFrame) {
283 let dts = self.dts_counter;
284 self.dts_counter += self.config.min_dts_delta;
285 self.output.push_back(ReadyFrame {
286 pts: frame.pts,
287 dts,
288 frame_type: frame.frame_type,
289 data: frame.data,
290 });
291 }
292}
293
/// Generates an evenly spaced DTS sequence anchored on the first PTS it sees.
#[derive(Debug)]
pub struct DtsCalculator {
    /// Step between consecutive DTS values; validated as positive by `new`.
    min_delta: i64,
    /// Scales the initial offset: the first DTS is
    /// `pts - max_b_frames * min_delta`.
    max_b_frames: usize,
    /// DTS to hand out on the next call; `None` until the first call or
    /// after `reset`.
    next_dts: Option<i64>,
}
314
315impl DtsCalculator {
316 pub fn new(min_delta: i64, max_b_frames: usize) -> CodecResult<Self> {
320 if min_delta <= 0 {
321 return Err(CodecError::InvalidParameter(
322 "DtsCalculator: min_delta must be positive".into(),
323 ));
324 }
325 Ok(Self {
326 min_delta,
327 max_b_frames,
328 next_dts: None,
329 })
330 }
331
332 pub fn next(&mut self, pts: i64, _is_keyframe: bool) -> i64 {
337 let dts = match self.next_dts {
338 None => {
339 let offset = (self.max_b_frames as i64) * self.min_delta;
340 let initial = pts - offset;
341 self.next_dts = Some(initial + self.min_delta);
342 initial
343 }
344 Some(ref mut counter) => {
345 let dts = *counter;
346 *counter += self.min_delta;
347 dts
348 }
349 };
350 dts
351 }
352
353 pub fn compute_batch(&mut self, frames: &[(i64, bool)]) -> Vec<i64> {
357 frames
358 .iter()
359 .map(|&(pts, is_key)| self.next(pts, is_key))
360 .collect()
361 }
362
363 pub fn reset(&mut self) {
365 self.next_dts = None;
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a frame whose one-byte payload is the PTS truncated to `u8`.
    fn make_frame(pts: i64, ft: QueueFrameType) -> QueuedFrame {
        let data = vec![pts as u8];
        QueuedFrame {
            pts,
            frame_type: ft,
            data,
        }
    }

    /// Pops every ready frame from `buf`, preserving output order.
    fn collect_ready(buf: &mut BFrameReorderBuffer) -> Vec<ReadyFrame> {
        let mut frames = Vec::with_capacity(buf.ready_len());
        while let Some(frame) = buf.pop() {
            frames.push(frame);
        }
        frames
    }

    /// Frames pushed out of order come back in ascending PTS order.
    #[test]
    fn test_frame_queue_push_pop_ordered() {
        let mut queue = FrameQueue::new();
        let inputs = [
            (200, QueueFrameType::Inter),
            (0, QueueFrameType::Intra),
            (100, QueueFrameType::BiPredicted),
        ];
        for (pts, ft) in inputs {
            queue.push(make_frame(pts, ft)).unwrap();
        }

        for expected in [0i64, 100, 200] {
            assert_eq!(queue.pop().unwrap().pts, expected);
        }
        assert!(queue.pop().is_none());
    }

    /// Pushing a second frame with an already-queued PTS is an error.
    #[test]
    fn test_frame_queue_duplicate_pts_error() {
        let mut queue = FrameQueue::new();
        queue.push(make_frame(100, QueueFrameType::Intra)).unwrap();
        assert!(queue.push(make_frame(100, QueueFrameType::Inter)).is_err());
    }

    /// `drain_ordered` returns all frames sorted by PTS.
    #[test]
    fn test_frame_queue_drain_ordered() {
        let mut queue = FrameQueue::new();
        for pts in [500i64, 100, 300, 0, 200] {
            queue.push(make_frame(pts, QueueFrameType::Inter)).unwrap();
        }
        let pts_seq: Vec<i64> = queue
            .drain_ordered()
            .into_iter()
            .map(|frame| frame.pts)
            .collect();
        assert_eq!(pts_seq, vec![0, 100, 200, 300, 500]);
    }

    /// `peek_pts` reports the smallest queued PTS, or `None` when empty.
    #[test]
    fn test_frame_queue_peek_pts() {
        let mut queue = FrameQueue::new();
        assert_eq!(queue.peek_pts(), None);
        queue.push(make_frame(50, QueueFrameType::Intra)).unwrap();
        queue.push(make_frame(10, QueueFrameType::Inter)).unwrap();
        assert_eq!(queue.peek_pts(), Some(10));
    }

    /// Output DTS values never decrease across an I-B-B-P sequence.
    #[test]
    fn test_b_frame_reorder_anchor_before_b() {
        let mut buf = BFrameReorderBuffer::new(ReorderConfig {
            max_b_frames: 2,
            min_dts_delta: 1,
            ..Default::default()
        });
        let sequence = [
            (0, QueueFrameType::Intra),
            (1, QueueFrameType::BiPredicted),
            (2, QueueFrameType::BiPredicted),
            (3, QueueFrameType::Inter),
        ];
        for (pts, ft) in sequence {
            buf.push(make_frame(pts, ft));
        }
        buf.flush();

        let out = collect_ready(&mut buf);
        assert!(!out.is_empty());
        for pair in out.windows(2) {
            assert!(pair[1].dts >= pair[0].dts, "DTS must be non-decreasing");
        }
    }

    /// Every emitted frame has a DTS no later than its PTS.
    #[test]
    fn test_b_frame_reorder_dts_leq_pts() {
        let mut buf = BFrameReorderBuffer::new(ReorderConfig {
            max_b_frames: 2,
            min_dts_delta: 3000,
            ..Default::default()
        });
        // PTS values 0, 3000, ..., 12000 with frame types cycling I, B, P.
        for index in 0..5usize {
            let pts = index as i64 * 3000;
            let ft = match index % 3 {
                0 => QueueFrameType::Intra,
                1 => QueueFrameType::BiPredicted,
                _ => QueueFrameType::Inter,
            };
            buf.push(make_frame(pts, ft));
        }
        buf.flush();

        for f in collect_ready(&mut buf) {
            assert!(f.dts <= f.pts, "DTS ({}) must be <= PTS ({})", f.dts, f.pts);
        }
    }

    /// The first DTS is offset by `max_b_frames * min_delta`; later values
    /// advance by exactly `min_delta`.
    #[test]
    fn test_dts_calculator_basic() {
        let mut calc = DtsCalculator::new(3000, 2).unwrap();
        let frames = [
            (6000i64, true),
            (9000, false),
            (12000, false),
            (15000, false),
        ];
        let dts = calc.compute_batch(&frames);
        assert_eq!(dts[0], 0);
        for pair in dts.windows(2) {
            assert_eq!(pair[1] - pair[0], 3000);
        }
    }

    /// Zero and negative deltas are rejected by the constructor.
    #[test]
    fn test_dts_calculator_invalid_delta() {
        for bad_delta in [0i64, -1] {
            assert!(DtsCalculator::new(bad_delta, 2).is_err());
        }
    }

    /// After `reset`, the calculator re-anchors exactly as a fresh one would.
    #[test]
    fn test_dts_calculator_reset() {
        let mut calc = DtsCalculator::new(1000, 1).unwrap();
        let before_reset = calc.next(5000, true);
        calc.reset();
        let after_reset = calc.next(5000, true);
        assert_eq!(before_reset, after_reset);
    }

    /// With `max_b_frames == 0` every pushed frame is emitted.
    #[test]
    fn test_no_b_frames_passthrough() {
        let mut buf = BFrameReorderBuffer::new(ReorderConfig {
            max_b_frames: 0,
            min_dts_delta: 1,
            ..Default::default()
        });
        for pts in 0i64..4 {
            buf.push(make_frame(pts, QueueFrameType::BiPredicted));
        }
        buf.flush();

        let pts_out: Vec<i64> = collect_ready(&mut buf)
            .into_iter()
            .map(|f| f.pts)
            .collect();
        assert_eq!(pts_out.len(), 4);
    }
}