use crate::container::{Container, Frame};
/// Writes [`Frame`]s to a `moq_lite` track through a [`Container`].
///
/// A new group is started on every keyframe. Frames are either written as
/// soon as they arrive (zero latency) or collected in `buffer` and written in
/// batches once the buffered timestamps span at least `latency`.
pub struct Producer<C: Container> {
/// The track that groups are appended to. [`Producer::finish`] calls `finish` on it.
pub track: moq_lite::TrackProducer,
/// Writes slices of frames into the currently open group.
container: C,
/// The group currently being written; `None` until the first keyframe and
/// again after [`Producer::finish`].
group: Option<moq_lite::GroupProducer>,
/// Frames held back while `latency` is non-zero, waiting to be written together.
buffer: Vec<Frame>,
/// Timestamp span to accumulate before buffered frames are written.
/// Zero (the default set by [`Producer::new`]) means every frame is written immediately.
latency: std::time::Duration,
}
impl<C: Container> Producer<C> {
    /// Creates a producer that writes frames to `track` through `container`.
    ///
    /// No group is open yet and the latency is zero, so every frame passed to
    /// [`Self::write`] is written immediately.
    pub fn new(track: moq_lite::TrackProducer, container: C) -> Self {
        Self {
            track,
            container,
            group: None,
            buffer: Vec::new(),
            latency: std::time::Duration::ZERO,
        }
    }

    /// Sets how much media time is buffered before frames are written as a batch.
    ///
    /// With a non-zero `latency`, frames are held back until the difference
    /// between the last and first buffered timestamps reaches `latency`.
    /// A zero `latency` disables buffering.
    pub fn with_latency(mut self, latency: std::time::Duration) -> Self {
        self.latency = latency;
        self
    }

    /// Writes one frame, opening a new group first if it is a keyframe.
    ///
    /// On a keyframe, any buffered frames are flushed into the current group,
    /// that group is finished, and a fresh group is appended to the track.
    /// The frame is then either written straight away (zero latency) or
    /// buffered until the buffered timestamps span at least the configured
    /// latency.
    ///
    /// # Errors
    ///
    /// Returns `moq_lite::Error::ProtocolViolation` (converted into
    /// `C::Error`) when a non-keyframe arrives while no group is open.
    /// Errors from the container, the group, or the track are propagated.
    pub fn write(&mut self, frame: Frame) -> Result<(), C::Error> {
        if frame.keyframe {
            // Frames still buffered belong to the previous group, so they must
            // be written out before that group is finished.
            self.close_group()?;
            self.group = Some(self.track.append_group()?);
        }

        let Some(group) = self.group.as_mut() else {
            // No keyframe has opened a group yet; there is nowhere to put this frame.
            return Err(moq_lite::Error::ProtocolViolation.into());
        };

        if self.latency.is_zero() {
            self.container.write(group, &[frame])?;
            return Ok(());
        }

        self.buffer.push(frame);

        // The span is only meaningful with at least two buffered frames; the
        // slice pattern matches exactly that case. `saturating_sub` keeps
        // out-of-order timestamps from underflowing.
        let due = match self.buffer.as_slice() {
            [first, .., last] => {
                let first_ts: std::time::Duration = first.timestamp.into();
                let last_ts: std::time::Duration = last.timestamp.into();
                last_ts.saturating_sub(first_ts) >= self.latency
            }
            _ => false,
        };
        if due {
            self.flush()?;
        }

        Ok(())
    }

    /// Writes all buffered frames into the open group and empties the buffer.
    ///
    /// Does nothing when the buffer is empty. When no group is open the
    /// buffered frames are left untouched. If the container write fails, the
    /// buffer is not cleared.
    fn flush(&mut self) -> Result<(), C::Error> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        let Some(group) = self.group.as_mut() else {
            return Ok(());
        };

        self.container.write(group, &self.buffer)?;
        self.buffer.clear();
        Ok(())
    }

    /// Flushes buffered frames into the open group and then finishes that group.
    ///
    /// Leaves `self.group` as `None`; only the flush happens when no group is open.
    fn close_group(&mut self) -> Result<(), C::Error> {
        self.flush()?;
        if let Some(mut group) = self.group.take() {
            group.finish()?;
        }
        Ok(())
    }

    /// Flushes buffered frames, finishes the open group (if any), and then
    /// calls `finish` on the track.
    ///
    /// # Errors
    ///
    /// Propagates the first error from the container, the group, or the
    /// track; later steps are skipped when an earlier one fails.
    pub fn finish(&mut self) -> Result<(), C::Error> {
        self.close_group()?;
        self.track.finish()?;
        Ok(())
    }

    /// Returns a consumer for the underlying track.
    pub fn consume(&self) -> moq_lite::TrackConsumer {
        self.track.consume()
    }
}