moq-mux 0.3.7

Media muxers and demuxers for MoQ
Documentation
use anyhow::Context;
use buf_list::BufList;
use bytes::Buf;

// Make a new audio group every 100ms.
// NOTE: We could do this per-frame, but there's not much benefit to it.
const MAX_GROUP_DURATION: hang::container::Timestamp = hang::container::Timestamp::from_millis_unchecked(100);

/// Typed AAC configuration for initialization without binary blobs.
pub struct AacConfig {
	pub profile: u8,
	pub sample_rate: u32,
	pub channel_count: u32,
}

impl AacConfig {
	/// Parse an AudioSpecificConfig buffer into an AacConfig.
	pub fn parse<T: Buf>(buf: &mut T) -> anyhow::Result<Self> {
		anyhow::ensure!(buf.remaining() >= 2, "AudioSpecificConfig must be at least 2 bytes");

		// Parse AudioSpecificConfig (ISO 14496-3)
		// This parser handles variable-length configurations including:
		// - Basic formats (object_type < 31)
		// - Extended formats (object_type == 31)
		// - SBR/PS extensions
		// - Explicit sample rates (freq_index == 15)

		// Read first byte
		let b0 = buf.get_u8();
		let mut object_type = b0 >> 3;
		let mut freq_index;

		// Handle extended audioObjectType (object_type == 31)
		let (profile, sample_rate, channel_count) = if object_type == 31 {
			anyhow::ensure!(
				buf.remaining() >= 2,
				"extended audioObjectType requires 2 additional bytes"
			);
			// Extended format: next 6 bits are the extended object_type (32-63)
			// Bits 5-7 of b0 are the first 3 bits of extended object_type
			let b_ext = buf.get_u8();
			// Bits 0-2 of b_ext are the last 3 bits of extended object_type
			let audio_object_type_ext = ((b0 & 0x07) << 3) | ((b_ext >> 5) & 0x07);
			object_type = 32 + audio_object_type_ext;
			// Bits 3-6 of b_ext are samplingFrequencyIndex (4 bits)
			freq_index = (b_ext >> 1) & 0x0F;
			// Bit 0 of b_ext is the first bit of channelConfiguration
			let channel_config_high = b_ext & 0x01;

			// Read next byte for rest of channelConfiguration
			anyhow::ensure!(buf.remaining() >= 1, "AudioSpecificConfig incomplete");
			let b1 = buf.get_u8();
			// Bits 5-7 of b1 are the remaining 3 bits of channelConfiguration
			let channel_config = (channel_config_high << 3) | ((b1 >> 5) & 0x07);

			let sample_rate = sample_rate_from_index(freq_index, buf)?;
			let channel_count = channel_count_from_config(channel_config);

			// Consume any remaining extension data
			if buf.remaining() > 0 {
				buf.advance(buf.remaining());
			}

			(object_type, sample_rate, channel_count)
		} else {
			// Standard format: bits 5-7 of b0 are first 3 bits of freq_index
			freq_index = (b0 & 0x07) << 1;

			// Read second byte
			anyhow::ensure!(buf.remaining() >= 1, "AudioSpecificConfig incomplete");
			let b1 = buf.get_u8();

			// Complete frequency index (bit 7 of b1 is bit 0 of freq_index)
			freq_index |= (b1 >> 7) & 0x01;

			// Channel configuration
			let channel_config = (b1 >> 3) & 0x0F;

			let sample_rate = sample_rate_from_index(freq_index, buf)?;
			let channel_count = channel_count_from_config(channel_config);

			// Consume any remaining extension data (SBR, PS, etc.)
			// AudioSpecificConfig can have variable-length extensions that we don't need to parse.
			// Since we've already extracted the essential info (object_type, sample_rate, channels),
			// we'll consume any remaining bytes to ensure the buffer is properly advanced.
			// This makes the parser robust to different AAC variants from OBS and other sources.
			if buf.remaining() > 0 {
				buf.advance(buf.remaining());
			}

			(object_type, sample_rate, channel_count)
		};

		Ok(Self {
			profile,
			sample_rate,
			channel_count,
		})
	}
}

/// AAC decoder, initialized via AudioSpecificConfig (variable length from ESDS box).
pub struct Aac {
	catalog: crate::CatalogProducer,
	track: hang::container::OrderedProducer,
	zero: Option<tokio::time::Instant>,
}

impl Aac {
	pub fn new(
		mut broadcast: moq_lite::BroadcastProducer,
		mut catalog: crate::CatalogProducer,
		config: AacConfig,
	) -> anyhow::Result<Self> {
		let track = broadcast.unique_track(".aac")?;

		// TODO parse the actual frame size from the profile (e.g. 960 for HE-AAC, 480/512 for AAC-LD).
		// 1024 samples is correct for LC-AAC which covers the vast majority of real-world streams.
		let frame_duration_us = 1024u64 * 1_000_000 / config.sample_rate as u64;
		let jitter = moq_lite::Time::from_micros(frame_duration_us).ok();

		let audio_config = hang::catalog::AudioConfig {
			codec: hang::catalog::AAC {
				profile: config.profile,
			}
			.into(),
			sample_rate: config.sample_rate,
			channel_count: config.channel_count,
			bitrate: None,
			description: None,
			container: hang::catalog::Container::Legacy,
			jitter,
		};

		catalog.lock().audio.insert(&track.info.name, audio_config.clone())?;
		tracing::debug!(name = ?track.info.name, config = ?audio_config, "starting track");

		Ok(Self {
			catalog,
			track: hang::container::OrderedProducer::new(track).with_max_group_duration(MAX_GROUP_DURATION),
			zero: None,
		})
	}

	/// Returns a reference to the underlying track producer.
	pub fn track(&self) -> &moq_lite::TrackProducer {
		&self.track
	}

	/// Finish the track, flushing the current group.
	pub fn finish(&mut self) -> anyhow::Result<()> {
		self.track.finish()?;
		Ok(())
	}

	pub fn decode<T: Buf>(&mut self, buf: &mut T, pts: Option<hang::container::Timestamp>) -> anyhow::Result<()> {
		let pts = self.pts(pts)?;

		// Create a BufList at chunk boundaries, potentially avoiding allocations.
		let mut payload = BufList::new();
		while !buf.chunk().is_empty() {
			payload.push_chunk(buf.copy_to_bytes(buf.chunk().len()));
		}

		let frame = hang::container::Frame {
			timestamp: pts,
			payload,
		};

		self.track.write(frame)?;

		Ok(())
	}

	fn pts(&mut self, hint: Option<hang::container::Timestamp>) -> anyhow::Result<hang::container::Timestamp> {
		if let Some(pts) = hint {
			return Ok(pts);
		}

		let zero = self.zero.get_or_insert_with(tokio::time::Instant::now);
		Ok(hang::container::Timestamp::from_micros(
			zero.elapsed().as_micros() as u64
		)?)
	}
}

impl Drop for Aac {
	fn drop(&mut self) {
		tracing::debug!(name = ?self.track.info.name, "ending track");
		self.catalog.lock().audio.remove(&self.track.info.name);
	}
}

fn sample_rate_from_index<T: Buf>(freq_index: u8, buf: &mut T) -> anyhow::Result<u32> {
	const SAMPLE_RATES: [u32; 13] = [
		96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350,
	];

	if freq_index == 15 {
		anyhow::ensure!(buf.remaining() >= 3, "explicit sample rate requires 3 additional bytes");
		let rate_bytes = [buf.get_u8(), buf.get_u8(), buf.get_u8()];
		return Ok(((rate_bytes[0] as u32) << 16) | ((rate_bytes[1] as u32) << 8) | (rate_bytes[2] as u32));
	}

	SAMPLE_RATES
		.get(freq_index as usize)
		.copied()
		.context("unsupported sample rate index")
}

fn channel_count_from_config(channel_config: u8) -> u32 {
	if channel_config == 0 {
		2
	} else if channel_config <= 7 {
		channel_config as u32
	} else {
		tracing::warn!(channel_config, "unsupported channel config, defaulting to stereo");
		2
	}
}