libmoq 0.3.0

Media over QUIC, C bindings
Documentation
use std::{str::FromStr, sync::Arc};

use bytes::Buf;
use moq_mux::import;

use crate::{Error, Id, NonZeroSlab};

#[derive(Default)]
pub struct Publish {
	/// Active broadcast producers for publishing.
	broadcasts: NonZeroSlab<(moq_net::BroadcastProducer, moq_mux::catalog::hang::Producer)>,

	/// Active media encoders/decoders for publishing.
	media: NonZeroSlab<import::Framed>,

	/// Raw track producers (no media/container/catalog framing).
	tracks: NonZeroSlab<moq_net::TrackProducer>,

	/// Raw group producers, created from a raw track producer.
	groups: NonZeroSlab<moq_net::GroupProducer>,
}

impl Publish {
	pub fn create(&mut self) -> Result<Id, Error> {
		let mut broadcast = moq_net::Broadcast::new().produce();
		let catalog = moq_mux::catalog::hang::Producer::new(&mut broadcast)?;

		let id = self.broadcasts.insert((broadcast, catalog))?;
		Ok(id)
	}

	pub fn get(&self, id: Id) -> Result<&moq_net::BroadcastProducer, Error> {
		self.broadcasts
			.get(id)
			.ok_or(Error::BroadcastNotFound)
			.map(|(broadcast, _)| broadcast)
	}

	/// Mutable access to both the broadcast and its catalog producer.
	/// Used by sibling modules (e.g. `audio`) that need to attach a new
	/// track to an existing publish.
	pub fn pair_mut(
		&mut self,
		id: Id,
	) -> Result<(&mut moq_net::BroadcastProducer, &mut moq_mux::catalog::hang::Producer), Error> {
		let (broadcast, catalog) = self.broadcasts.get_mut(id).ok_or(Error::BroadcastNotFound)?;
		Ok((broadcast, catalog))
	}

	pub fn close(&mut self, broadcast: Id) -> Result<(), Error> {
		self.broadcasts.remove(broadcast).ok_or(Error::BroadcastNotFound)?;
		Ok(())
	}

	pub fn media_ordered(&mut self, broadcast: Id, format: &str, mut init: &[u8]) -> Result<Id, Error> {
		let (broadcast, catalog) = self.broadcasts.get(broadcast).ok_or(Error::BroadcastNotFound)?;

		let format = import::FramedFormat::from_str(format).map_err(|_| Error::UnknownFormat(format.to_string()))?;
		let decoder = import::Framed::new(broadcast.clone(), catalog.clone(), format, &mut init)
			.map_err(|err| Error::InitFailed(Arc::new(err)))?;

		let id = self.media.insert(decoder)?;
		Ok(id)
	}

	pub fn media_frame(
		&mut self,
		media: Id,
		mut data: &[u8],
		timestamp: hang::container::Timestamp,
	) -> Result<(), Error> {
		let media = self.media.get_mut(media).ok_or(Error::MediaNotFound)?;

		media
			.decode_frame(&mut data, Some(timestamp))
			.map_err(|err| Error::DecodeFailed(Arc::new(err)))?;

		if data.has_remaining() {
			return Err(Error::DecodeFailed(Arc::new(anyhow::anyhow!(
				"buffer was not fully consumed"
			))));
		}

		Ok(())
	}

	pub fn media_close(&mut self, media: Id) -> Result<(), Error> {
		let mut decoder = self.media.remove(media).ok_or(Error::MediaNotFound)?;
		decoder.finish().map_err(|err| Error::DecodeFailed(Arc::new(err)))?;
		Ok(())
	}

	/// Insert or replace a video rendition in the broadcast's catalog.
	///
	/// The catalog is republished automatically.
	pub fn video_config(&mut self, broadcast: Id, name: &str, config: hang::catalog::VideoConfig) -> Result<(), Error> {
		let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
		catalog.lock().video.insert(name, config).map_err(Error::Hang)?;
		Ok(())
	}

	/// Insert or replace an audio rendition in the broadcast's catalog.
	///
	/// The catalog is republished automatically.
	pub fn audio_config(&mut self, broadcast: Id, name: &str, config: hang::catalog::AudioConfig) -> Result<(), Error> {
		let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
		catalog.lock().audio.insert(name, config).map_err(Error::Hang)?;
		Ok(())
	}

	/// Remove a video rendition from the broadcast's catalog by name.
	///
	/// The catalog is republished automatically.
	pub fn video_remove(&mut self, broadcast: Id, name: &str) -> Result<(), Error> {
		let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
		catalog.lock().video.remove(name);
		Ok(())
	}

	/// Remove an audio rendition from the broadcast's catalog by name.
	///
	/// The catalog is republished automatically.
	pub fn audio_remove(&mut self, broadcast: Id, name: &str) -> Result<(), Error> {
		let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
		catalog.lock().audio.remove(name);
		Ok(())
	}

	/// Create a raw track on a broadcast for arbitrary byte payloads.
	///
	/// No codec, container, or catalog framing. This is the moq-net primitive
	/// for non-media tracks. Pair it with [`Self::video_config`] / [`Self::audio_config`]
	/// if you want to describe the track in the catalog as well.
	pub fn track(&mut self, broadcast: Id, name: &str) -> Result<Id, Error> {
		let (broadcast, _) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
		let track = broadcast.create_track(moq_net::Track {
			name: name.to_string(),
			priority: 0,
		})?;
		self.tracks.insert(track)
	}

	/// Append a new group to a raw track, returning a group producer.
	pub fn track_group(&mut self, track: Id) -> Result<Id, Error> {
		let track = self.tracks.get_mut(track).ok_or(Error::TrackNotFound)?;
		let group = track.append_group()?;
		self.groups.insert(group)
	}

	/// Write a single-frame group to a raw track. Convenience for the common
	/// one-frame-per-group pattern (e.g. status/command tracks).
	pub fn track_frame(&mut self, track: Id, payload: &[u8]) -> Result<(), Error> {
		let track = self.tracks.get_mut(track).ok_or(Error::TrackNotFound)?;
		track.write_frame(bytes::Bytes::copy_from_slice(payload))?;
		Ok(())
	}

	/// Finish a raw track. No more groups or frames can be written.
	pub fn track_finish(&mut self, track: Id) -> Result<(), Error> {
		let mut track = self.tracks.remove(track).ok_or(Error::TrackNotFound)?;
		track.finish()?;
		Ok(())
	}

	/// Write a frame into a raw group.
	pub fn group_frame(&mut self, group: Id, payload: &[u8]) -> Result<(), Error> {
		let group = self.groups.get_mut(group).ok_or(Error::GroupNotFound)?;
		group.write_frame(bytes::Bytes::copy_from_slice(payload))?;
		Ok(())
	}

	/// Finish a raw group. No more frames can be written.
	pub fn group_finish(&mut self, group: Id) -> Result<(), Error> {
		let mut group = self.groups.remove(group).ok_or(Error::GroupNotFound)?;
		group.finish()?;
		Ok(())
	}
}