1use std::collections::VecDeque;
2
3use crate::{Frame, Result, Timestamp};
4use moq_transfork::coding::Decode;
5
6#[derive(Debug)]
7pub struct GroupConsumer {
8 group: moq_transfork::GroupConsumer,
10
11 index: usize,
13
14 buffered: VecDeque<Frame>,
16
17 max_timestamp: Option<Timestamp>,
19}
20
21impl GroupConsumer {
22 pub fn new(group: moq_transfork::GroupConsumer) -> Self {
23 Self {
24 group,
25 index: 0,
26 buffered: VecDeque::new(),
27 max_timestamp: None,
28 }
29 }
30
31 pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
32 if let Some(frame) = self.buffered.pop_front() {
33 Ok(Some(frame))
34 } else {
35 self.read_frame_unbuffered().await
36 }
37 }
38
39 async fn read_frame_unbuffered(&mut self) -> Result<Option<Frame>> {
40 let mut payload = match self.group.read_frame().await? {
41 Some(payload) => payload,
42 None => return Ok(None),
43 };
44
45 let micros = u64::decode(&mut payload)?;
46 let timestamp = Timestamp::from_micros(micros);
47
48 let frame = Frame {
49 keyframe: (self.index == 0),
50 timestamp,
51 payload,
52 };
53
54 if frame.keyframe {
55 tracing::debug!(?frame, group = ?self.group, "decoded keyframe");
56 } else {
57 tracing::trace!(?frame, group = ?self.group, index = self.index, "decoded frame");
58 }
59
60 self.index += 1;
61 self.max_timestamp = Some(self.max_timestamp.unwrap_or_default().max(timestamp));
62
63 Ok(Some(frame))
64 }
65
66 pub(super) async fn buffer_frames_until(&mut self, cutoff: Timestamp) -> Timestamp {
70 loop {
71 match self.max_timestamp {
72 Some(timestamp) if timestamp >= cutoff => return timestamp,
73 _ => (),
74 }
75
76 match self.read_frame().await {
77 Ok(Some(frame)) => self.buffered.push_back(frame),
78 _ => std::future::pending().await,
80 }
81 }
82 }
83
84 pub fn max_timestamp(&self) -> Option<Timestamp> {
85 self.max_timestamp
86 }
87}
88
89impl std::ops::Deref for GroupConsumer {
90 type Target = moq_transfork::GroupConsumer;
91
92 fn deref(&self) -> &Self::Target {
93 &self.group
94 }
95}