moq-mux 0.5.5

Media muxers and demuxers for MoQ
Documentation
use bytes::{Buf, BytesMut};

use super::Config;
use crate::catalog::hang::CatalogExt;

/// AAC importer.
///
/// Initialized from an AudioSpecificConfig blob (variable-length, typically extracted from
/// an MP4 ESDS atom). Each input buffer passed to [`decode`](Self::decode) is published as
/// one hang frame in its own group, so the relay can forward each frame without waiting for
/// a group boundary. The codec's packet loss concealment handles drops.
pub struct Import<E: CatalogExt = ()> {
	catalog: crate::catalog::Producer<E>,
	track: crate::container::Producer<crate::catalog::hang::Container>,
	zero: Option<tokio::time::Instant>,
}

impl<E: CatalogExt> Import<E> {
	pub fn new(
		broadcast: moq_net::BroadcastProducer,
		catalog: crate::catalog::Producer<E>,
		config: Config,
	) -> anyhow::Result<Self> {
		Self::new_with_source(
			crate::track_provider::TrackProvider::unique(broadcast, ".aac"),
			catalog,
			config,
		)
	}

	pub fn new_with_track(
		track: moq_net::TrackProducer,
		catalog: crate::catalog::Producer<E>,
		config: Config,
	) -> anyhow::Result<Self> {
		Self::new_with_source(crate::track_provider::TrackProvider::fixed(track), catalog, config)
	}

	fn new_with_source(
		mut tracks: crate::track_provider::TrackProvider,
		mut catalog: crate::catalog::Producer<E>,
		config: Config,
	) -> anyhow::Result<Self> {
		let track = tracks.create()?;

		let mut audio_config = hang::catalog::AudioConfig::new(
			hang::catalog::AAC {
				profile: config.profile,
			},
			config.sample_rate,
			config.channel_count,
		);
		audio_config.container = hang::catalog::Container::Legacy;

		tracing::debug!(name = ?track.name, config = ?audio_config, "starting track");
		catalog.lock().audio.renditions.insert(track.name.clone(), audio_config);

		Ok(Self {
			catalog,
			track: crate::container::Producer::new(track, crate::catalog::hang::Container::Legacy),
			zero: None,
		})
	}

	/// Returns a reference to the underlying track producer.
	pub fn track(&self) -> &moq_net::TrackProducer {
		self.track.track()
	}

	/// Finish the track, flushing the current group.
	pub fn finish(&mut self) -> anyhow::Result<()> {
		self.track.finish()?;
		Ok(())
	}

	/// Close the current group and open the next one at `sequence`.
	pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
		self.track.seek(sequence)?;
		Ok(())
	}

	pub fn decode<T: Buf>(&mut self, buf: &mut T, pts: Option<crate::container::Timestamp>) -> anyhow::Result<()> {
		let pts = self.pts(pts)?;

		// Collect the input into a contiguous Bytes payload.
		let mut payload = BytesMut::with_capacity(buf.remaining());
		while buf.has_remaining() {
			let chunk = buf.chunk();
			payload.extend_from_slice(chunk);
			let len = chunk.len();
			buf.advance(len);
		}

		// Each frame is its own group so the relay can forward it immediately.
		// The codec's packet loss concealment handles drops.
		let frame = crate::container::Frame {
			timestamp: pts,
			payload: payload.freeze(),
			keyframe: true,
		};

		self.track.write(frame)?;
		self.track.finish_group()?;

		Ok(())
	}

	fn pts(&mut self, hint: Option<crate::container::Timestamp>) -> anyhow::Result<crate::container::Timestamp> {
		if let Some(pts) = hint {
			return Ok(pts);
		}

		let zero = self.zero.get_or_insert_with(tokio::time::Instant::now);
		Ok(crate::container::Timestamp::from_micros(
			zero.elapsed().as_micros() as u64
		)?)
	}
}

impl<E: CatalogExt> Drop for Import<E> {
	fn drop(&mut self) {
		tracing::debug!(name = ?self.track.name, "ending track");
		self.catalog.lock().audio.renditions.remove(&self.track.name);
	}
}