moq_karp/
group.rs

1use std::collections::VecDeque;
2
3use crate::{Frame, Result, Timestamp};
4use moq_transfork::coding::Decode;
5
6#[derive(Debug)]
7pub struct GroupConsumer {
8	// The MoqTransfork group (no timestamp information)
9	group: moq_transfork::GroupConsumer,
10
11	// The current frame index
12	index: usize,
13
14	// The any buffered frames in the group.
15	buffered: VecDeque<Frame>,
16
17	// The max timestamp in the group
18	max_timestamp: Option<Timestamp>,
19}
20
21impl GroupConsumer {
22	pub fn new(group: moq_transfork::GroupConsumer) -> Self {
23		Self {
24			group,
25			index: 0,
26			buffered: VecDeque::new(),
27			max_timestamp: None,
28		}
29	}
30
31	pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
32		if let Some(frame) = self.buffered.pop_front() {
33			Ok(Some(frame))
34		} else {
35			self.read_frame_unbuffered().await
36		}
37	}
38
39	async fn read_frame_unbuffered(&mut self) -> Result<Option<Frame>> {
40		let mut payload = match self.group.read_frame().await? {
41			Some(payload) => payload,
42			None => return Ok(None),
43		};
44
45		let micros = u64::decode(&mut payload)?;
46		let timestamp = Timestamp::from_micros(micros);
47
48		let frame = Frame {
49			keyframe: (self.index == 0),
50			timestamp,
51			payload,
52		};
53
54		if frame.keyframe {
55			tracing::debug!(?frame, group = ?self.group, "decoded keyframe");
56		} else {
57			tracing::trace!(?frame, group = ?self.group, index = self.index, "decoded frame");
58		}
59
60		self.index += 1;
61		self.max_timestamp = Some(self.max_timestamp.unwrap_or_default().max(timestamp));
62
63		Ok(Some(frame))
64	}
65
66	// Keep reading and buffering new frames, returning when `max` is larger than or equal to the cutoff.
67	// Not publish because the API is super weird.
68	// This will BLOCK FOREVER if the group has ended early; it's intended to be used within select!
69	pub(super) async fn buffer_frames_until(&mut self, cutoff: Timestamp) -> Timestamp {
70		loop {
71			match self.max_timestamp {
72				Some(timestamp) if timestamp >= cutoff => return timestamp,
73				_ => (),
74			}
75
76			match self.read_frame().await {
77				Ok(Some(frame)) => self.buffered.push_back(frame),
78				// Otherwise block forever so we don't return from FuturesUnordered
79				_ => std::future::pending().await,
80			}
81		}
82	}
83
84	pub fn max_timestamp(&self) -> Option<Timestamp> {
85		self.max_timestamp
86	}
87}
88
89impl std::ops::Deref for GroupConsumer {
90	type Target = moq_transfork::GroupConsumer;
91
92	fn deref(&self) -> &Self::Target {
93		&self.group
94	}
95}