use std::collections::VecDeque;
use crate::{Frame, Result, Timestamp};
use moq_transfork::coding::Decode;
/// Reads timestamped frames from a single `moq_transfork` group.
///
/// Wraps the transport-level group consumer, decoding the timestamp that
/// prefixes each payload and optionally buffering frames that were read ahead.
#[derive(Debug)]
pub struct GroupConsumer {
// The underlying group that raw frame payloads are read from.
group: moq_transfork::GroupConsumer,
// Number of frames decoded from `group` so far; the frame decoded at index 0
// is flagged as the keyframe.
index: usize,
// Frames that were read ahead of the caller; `read_frame` drains these
// (front first) before reading from `group` again.
buffered: VecDeque<Frame>,
// Largest timestamp decoded so far, or `None` until a frame has been decoded.
max_timestamp: Option<Timestamp>,
}
impl GroupConsumer {
    /// Wraps `group` with an empty read-ahead buffer and no frames decoded yet.
    pub fn new(group: moq_transfork::GroupConsumer) -> Self {
        Self {
            group,
            index: 0,
            buffered: VecDeque::new(),
            max_timestamp: None,
        }
    }

    /// Returns the next frame of this group.
    ///
    /// Frames that were previously read ahead by `buffer_frames_until` are
    /// returned first, oldest first. Once that buffer is empty the next frame
    /// is read from the underlying group. `Ok(None)` is returned when the
    /// underlying group yields no further frame.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the underlying group or from decoding
    /// the timestamp at the start of the payload.
    pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
        match self.buffered.pop_front() {
            Some(frame) => Ok(Some(frame)),
            None => self.read_frame_unbuffered().await,
        }
    }

    /// Reads and decodes the next frame directly from the underlying group,
    /// ignoring the read-ahead buffer.
    ///
    /// Updates `index` and `max_timestamp` for every frame that is decoded.
    async fn read_frame_unbuffered(&mut self) -> Result<Option<Frame>> {
        let mut payload = match self.group.read_frame().await? {
            Some(payload) => payload,
            None => return Ok(None),
        };

        // Each payload starts with a `u64` that is interpreted as microseconds.
        // NOTE(review): presumably `decode` advances `payload` past the
        // timestamp so only the media bytes remain -- confirm against
        // `moq_transfork::coding::Decode`.
        let micros = u64::decode(&mut payload)?;
        let timestamp = Timestamp::from_micros(micros);

        // The first frame decoded from a group is treated as the keyframe.
        let keyframe = self.index == 0;
        let frame = Frame {
            keyframe,
            timestamp,
            payload,
        };

        if frame.keyframe {
            tracing::debug!(?frame, group = ?self.group, "decoded keyframe");
        } else {
            tracing::trace!(?frame, group = ?self.group, index = self.index, "decoded frame");
        }

        self.index += 1;
        self.max_timestamp = Some(self.max_timestamp.unwrap_or_default().max(timestamp));

        Ok(Some(frame))
    }

    /// Reads frames ahead into the buffer until the largest decoded timestamp
    /// is at least `cutoff`, then returns that timestamp.
    ///
    /// Buffered frames stay queued in arrival order and are later handed out
    /// by `read_frame`.
    ///
    /// If the underlying group ends or fails before `cutoff` is reached, the
    /// returned future never resolves.
    /// NOTE(review): presumably callers race this against other futures
    /// (e.g. in a `select!`) -- confirm at the call site.
    pub(super) async fn buffer_frames_until(&mut self, cutoff: Timestamp) -> Timestamp {
        loop {
            if let Some(timestamp) = self.max_timestamp {
                if timestamp >= cutoff {
                    return timestamp;
                }
            }

            // Read from the underlying group directly. Going through
            // `read_frame` would pop the frame we just buffered and push it
            // straight back, spinning forever without ever making progress.
            match self.read_frame_unbuffered().await {
                Ok(Some(frame)) => self.buffered.push_back(frame),
                // End of group or read error: park forever instead of returning.
                _ => std::future::pending::<()>().await,
            }
        }
    }

    /// Returns the largest timestamp decoded so far, or `None` if no frame has
    /// been decoded yet.
    pub fn max_timestamp(&self) -> Option<Timestamp> {
        self.max_timestamp
    }
}
/// Exposes the wrapped `moq_transfork::GroupConsumer` by shared reference so
/// its read-only API can be called directly on this type.
impl std::ops::Deref for GroupConsumer {
    type Target = moq_transfork::GroupConsumer;

    /// Borrows the underlying transport-level group.
    fn deref(&self) -> &moq_transfork::GroupConsumer {
        let Self { group: inner, .. } = self;
        inner
    }
}