use anyhow::Context;
use bytes::Buf;
use crate::container::jitter::MinFrameDuration;
use super::FrameHeader;
/// Imports VP9 frames: writes each frame to a track and keeps the matching
/// video rendition in the catalog up to date.
pub struct Import {
// Source of output tracks; `create()` is called whenever a (new) track is needed.
tracks: crate::track_provider::TrackProvider,
// Catalog whose `video.renditions` map is updated as tracks start, change and end.
catalog: crate::catalog::Producer,
// Track currently being written to; `None` until a key frame has configured one.
track: Option<crate::container::Producer<crate::catalog::hang::Container>>,
// Configuration most recently published to the catalog, used to detect changes.
config: Option<hang::catalog::VideoConfig>,
// Instant of the first frame that arrived without a timestamp hint; frames
// without a hint are timestamped relative to it.
zero: Option<tokio::time::Instant>,
// Fed with every written frame timestamp; its output is stored as the
// rendition's `jitter` value.
jitter: MinFrameDuration,
}
impl Import {
    /// Creates an importer that obtains its tracks from `broadcast`.
    ///
    /// The track provider is built with `TrackProvider::unique(broadcast, ".vp09")`.
    /// No track is created and nothing is published to `catalog` until the
    /// first key frame has been decoded.
    pub fn new(broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::Producer) -> Self {
        Self {
            tracks: crate::track_provider::TrackProvider::unique(broadcast, ".vp09"),
            catalog,
            track: None,
            config: None,
            zero: None,
            jitter: MinFrameDuration::new(),
        }
    }

    /// Creates an importer bound to a caller-supplied `track`.
    ///
    /// The track provider is built with `TrackProvider::fixed(track)`. Once
    /// that track is in use, a key frame that would require a different
    /// configuration is rejected instead of starting a new track.
    pub fn new_with_track(track: moq_net::TrackProducer, catalog: crate::catalog::Producer) -> Self {
        Self {
            tracks: crate::track_provider::TrackProvider::fixed(track),
            catalog,
            track: None,
            config: None,
            zero: None,
            jitter: MinFrameDuration::new(),
        }
    }

    /// Feeds optional initialization data to the importer.
    ///
    /// If `buf` has bytes remaining they are decoded as a single frame without
    /// a timestamp hint; an empty `buf` is a no-op.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Import::decode_frame`].
    pub fn initialize<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> {
        if buf.has_remaining() {
            self.decode_frame(buf, None)?;
        }
        Ok(())
    }

    /// Ensures a track exists whose catalog entry matches the given codec
    /// parameters and coded dimensions.
    ///
    /// Does nothing when the configuration is unchanged. Otherwise the current
    /// track (if any) is removed from the catalog, a new track is created and
    /// the new configuration is published under the new track's name.
    ///
    /// # Errors
    ///
    /// Fails if a fixed track would have to be reconfigured, or if the track
    /// provider cannot create a track.
    fn init(&mut self, vp9: hang::catalog::VP9, width: u16, height: u16) -> anyhow::Result<()> {
        let mut config = hang::catalog::VideoConfig::new(vp9);
        config.coded_width = Some(u32::from(width));
        config.coded_height = Some(u32::from(height));
        config.container = hang::catalog::Container::Legacy;

        if self.config.as_ref() == Some(&config) {
            return Ok(());
        }

        if self.track.is_some() && self.tracks.is_fixed() {
            anyhow::bail!("fixed track cannot be reconfigured");
        }

        if let Some(track) = self.track.take() {
            tracing::debug!(name = ?track.name, "reinitializing track");
            self.catalog.lock().video.renditions.remove(&track.name);
            // Forget the old configuration together with the old track. If
            // creating the replacement below fails, a later key frame carrying
            // the old configuration must not hit the "unchanged" early return
            // while no track exists.
            self.config = None;
            // NOTE(review): `self.jitter` is not reset here, so the new
            // rendition starts without a jitter value until the tracker
            // reports one again — confirm this is intended.
        }

        let track = self.tracks.create()?;
        tracing::debug!(name = ?track.name, ?config, "starting track");
        self.catalog
            .lock()
            .video
            .renditions
            .insert(track.name.clone(), config.clone());
        self.config = Some(config);
        self.track = Some(crate::container::Producer::new(
            track,
            crate::catalog::hang::Container::Legacy,
        ));
        Ok(())
    }

    /// Decodes one VP9 frame and writes it to the current track.
    ///
    /// All remaining bytes of `buf` are consumed as the frame payload. When the
    /// parsed header carries key-frame information the track is (re)configured
    /// first. `pts` is used as the frame timestamp when given; otherwise the
    /// timestamp is derived from the time elapsed since the first frame that
    /// had no hint.
    ///
    /// # Errors
    ///
    /// Fails if the payload is empty, the frame header cannot be parsed, the
    /// track cannot be (re)configured, no key frame has configured a track
    /// yet, the timestamp cannot be represented, or the write fails.
    pub fn decode_frame<T: Buf + AsRef<[u8]>>(
        &mut self,
        buf: &mut T,
        pts: Option<crate::container::Timestamp>,
    ) -> anyhow::Result<()> {
        let payload = buf.copy_to_bytes(buf.remaining());
        anyhow::ensure!(!payload.is_empty(), "empty VP9 frame");

        let header = FrameHeader::parse(&payload)?;
        if let Some(key) = header.key {
            self.init(key.to_catalog(), key.width, key.height)?;
        }

        let pts = self.pts(pts)?;
        let track = self
            .track
            .as_mut()
            .context("expected a VP9 key frame before any interframe")?;
        track.write(crate::container::Frame {
            timestamp: pts,
            payload,
            keyframe: header.keyframe,
        })?;

        // Publish the jitter estimate on the rendition whenever the tracker
        // yields a value for this timestamp.
        if let Some(jitter) = self.jitter.observe(pts)
            && let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name)
        {
            c.jitter = Some(jitter);
        }
        Ok(())
    }

    /// Returns the underlying track producer.
    ///
    /// # Errors
    ///
    /// Fails with "not initialized" if no track has been configured yet.
    pub fn track(&self) -> anyhow::Result<&moq_net::TrackProducer> {
        Ok(self.track.as_ref().context("not initialized")?.track())
    }

    /// Finishes the current track.
    ///
    /// # Errors
    ///
    /// Fails with "not initialized" if no track has been configured yet, or if
    /// the track itself reports an error.
    pub fn finish(&mut self) -> anyhow::Result<()> {
        let track = self.track.as_mut().context("not initialized")?;
        track.finish()?;
        Ok(())
    }

    /// Forwards `sequence` to the current track's `seek`.
    ///
    /// # Errors
    ///
    /// Fails with "not initialized" if no track has been configured yet, or if
    /// the track itself reports an error.
    pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
        let track = self.track.as_mut().context("not initialized")?;
        track.seek(sequence)?;
        Ok(())
    }

    /// Returns `true` once a track has been configured.
    pub fn is_initialized(&self) -> bool {
        self.track.is_some()
    }

    /// Resolves the timestamp for a frame.
    ///
    /// Returns `hint` unchanged when present. Otherwise returns the time
    /// elapsed, in microseconds, since the first call that had no hint.
    fn pts(
        &mut self,
        hint: Option<crate::container::Timestamp>,
    ) -> anyhow::Result<crate::container::Timestamp> {
        if let Some(pts) = hint {
            return Ok(pts);
        }

        let zero = self.zero.get_or_insert_with(tokio::time::Instant::now);
        // `as_micros` yields a u128; convert with a check rather than a
        // silently truncating cast.
        let micros = u64::try_from(zero.elapsed().as_micros())
            .context("elapsed time does not fit in u64 microseconds")?;
        Ok(crate::container::Timestamp::from_micros(micros)?)
    }
}
impl Drop for Import {
    /// Removes the active track's rendition from the catalog when the importer
    /// goes away; does nothing if no track was ever configured.
    fn drop(&mut self) {
        let Some(active) = self.track.take() else {
            return;
        };

        tracing::debug!(name = ?active.name, "ending track");
        self.catalog.lock().video.renditions.remove(&active.name);
    }
}
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::container::Timestamp;

    /// Key frame payload; the test below expects it to describe a 320x240 stream.
    const KEYFRAME: &[u8] = &[0x82, 0x49, 0x83, 0x42, 0x20, 0x13, 0xf0, 0x0e, 0xf0, 0x00];

    /// Payload that carries no key-frame information.
    const INTERFRAME: &[u8] = &[0x84, 0x00, 0x00];

    /// Builds a timestamp from a microsecond count, panicking if it is rejected.
    fn at_micros(micros: u64) -> Timestamp {
        Timestamp::from_micros(micros).unwrap()
    }

    /// A key frame configures the track and catalog; a following interframe is accepted.
    #[tokio::test(start_paused = true)]
    async fn imports_keyframe_then_interframe() {
        let mut broadcast = moq_net::Broadcast::new().produce();
        let mut catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
        let mut import = super::Import::new(broadcast.clone(), catalog.clone());

        // Empty initialization data must leave the importer untouched.
        import.initialize(&mut Bytes::new()).unwrap();
        assert!(!import.is_initialized());

        let mut keyframe = Bytes::from_static(KEYFRAME);
        import.decode_frame(&mut keyframe, Some(at_micros(0))).unwrap();
        assert!(import.is_initialized());

        let name = import.track().unwrap().name.clone();
        let config = catalog.lock().video.renditions.get(&name).cloned().unwrap();
        assert!(matches!(config.codec, hang::catalog::VideoCodec::VP9(_)));
        assert_eq!(config.coded_width, Some(320));
        assert_eq!(config.coded_height, Some(240));

        let mut interframe = Bytes::from_static(INTERFRAME);
        import
            .decode_frame(&mut interframe, Some(at_micros(33_000)))
            .unwrap();
        import.finish().unwrap();
    }

    /// An interframe arriving before any key frame is rejected.
    #[tokio::test(start_paused = true)]
    async fn rejects_interframe_first() {
        let mut broadcast = moq_net::Broadcast::new().produce();
        let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
        let mut import = super::Import::new(broadcast.clone(), catalog);

        let mut interframe = Bytes::from_static(INTERFRAME);
        let outcome = import.decode_frame(&mut interframe, Some(at_micros(0)));
        assert!(outcome.is_err());
    }
}