use std::{str::FromStr, sync::Arc};
use bytes::Buf;
use moq_mux::import;
use crate::{Error, Id, NonZeroSlab};
/// Publishing-side state: four independent slabs whose entries are addressed
/// by [`Id`] handles returned from the methods on this type.
///
/// `Default` yields a value with every slab in its default state.
#[derive(Default)]
pub struct Publish {
/// Broadcast producers, each stored together with the catalog producer that
/// was created from it.
broadcasts: NonZeroSlab<(moq_net::BroadcastProducer, moq_mux::catalog::Producer)>,
/// Framed media importers created by `media_ordered`.
media: NonZeroSlab<import::Framed>,
/// Track producers created by `track`.
tracks: NonZeroSlab<moq_net::TrackProducer>,
/// Group producers created by `track_group`.
groups: NonZeroSlab<moq_net::GroupProducer>,
}
impl Publish {
/// Creates a new broadcast and registers it, returning its handle.
///
/// A fresh `moq_net::Broadcast` producer is built, a catalog producer is
/// constructed from it, and the two are stored together as one entry.
///
/// # Errors
/// Propagates any failure from constructing the catalog producer or from
/// inserting the pair into the slab.
pub fn create(&mut self) -> Result<Id, Error> {
    let mut producer = moq_net::Broadcast::new().produce();
    let catalog = moq_mux::catalog::Producer::new(&mut producer)?;
    self.broadcasts.insert((producer, catalog))
}
pub fn get(&self, id: Id) -> Result<&moq_net::BroadcastProducer, Error> {
self.broadcasts
.get(id)
.ok_or(Error::BroadcastNotFound)
.map(|(broadcast, _)| broadcast)
}
pub fn pair_mut(
&mut self,
id: Id,
) -> Result<(&mut moq_net::BroadcastProducer, &mut moq_mux::catalog::Producer), Error> {
let (broadcast, catalog) = self.broadcasts.get_mut(id).ok_or(Error::BroadcastNotFound)?;
Ok((broadcast, catalog))
}
pub fn close(&mut self, broadcast: Id) -> Result<(), Error> {
self.broadcasts.remove(broadcast).ok_or(Error::BroadcastNotFound)?;
Ok(())
}
pub fn media_ordered(&mut self, broadcast: Id, format: &str, mut init: &[u8]) -> Result<Id, Error> {
let (broadcast, catalog) = self.broadcasts.get(broadcast).ok_or(Error::BroadcastNotFound)?;
let format = import::FramedFormat::from_str(format).map_err(|_| Error::UnknownFormat(format.to_string()))?;
let decoder = import::Framed::new(broadcast.clone(), catalog.clone(), format, &mut init)
.map_err(|err| Error::InitFailed(Arc::new(err)))?;
let id = self.media.insert(decoder)?;
Ok(id)
}
pub fn media_frame(
&mut self,
media: Id,
mut data: &[u8],
timestamp: hang::container::Timestamp,
) -> Result<(), Error> {
let media = self.media.get_mut(media).ok_or(Error::MediaNotFound)?;
media
.decode_frame(&mut data, Some(timestamp))
.map_err(|err| Error::DecodeFailed(Arc::new(err)))?;
if data.has_remaining() {
return Err(Error::DecodeFailed(Arc::new(anyhow::anyhow!(
"buffer was not fully consumed"
))));
}
Ok(())
}
pub fn media_close(&mut self, media: Id) -> Result<(), Error> {
let mut decoder = self.media.remove(media).ok_or(Error::MediaNotFound)?;
decoder.finish().map_err(|err| Error::DecodeFailed(Arc::new(err)))?;
Ok(())
}
pub fn video_config(&mut self, broadcast: Id, name: &str, config: hang::catalog::VideoConfig) -> Result<(), Error> {
let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
catalog.lock().video.insert(name, config).map_err(Error::Hang)?;
Ok(())
}
pub fn audio_config(&mut self, broadcast: Id, name: &str, config: hang::catalog::AudioConfig) -> Result<(), Error> {
let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
catalog.lock().audio.insert(name, config).map_err(Error::Hang)?;
Ok(())
}
pub fn video_remove(&mut self, broadcast: Id, name: &str) -> Result<(), Error> {
let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
catalog.lock().video.remove(name);
Ok(())
}
pub fn audio_remove(&mut self, broadcast: Id, name: &str) -> Result<(), Error> {
let (_, catalog) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
catalog.lock().audio.remove(name);
Ok(())
}
pub fn track(&mut self, broadcast: Id, name: &str) -> Result<Id, Error> {
let (broadcast, _) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
let track = broadcast.create_track(moq_net::Track {
name: name.to_string(),
priority: 0,
})?;
self.tracks.insert(track)
}
pub fn track_group(&mut self, track: Id) -> Result<Id, Error> {
let track = self.tracks.get_mut(track).ok_or(Error::TrackNotFound)?;
let group = track.append_group()?;
self.groups.insert(group)
}
pub fn track_frame(&mut self, track: Id, payload: &[u8]) -> Result<(), Error> {
let track = self.tracks.get_mut(track).ok_or(Error::TrackNotFound)?;
track.write_frame(bytes::Bytes::copy_from_slice(payload))?;
Ok(())
}
pub fn track_finish(&mut self, track: Id) -> Result<(), Error> {
let mut track = self.tracks.remove(track).ok_or(Error::TrackNotFound)?;
track.finish()?;
Ok(())
}
pub fn group_frame(&mut self, group: Id, payload: &[u8]) -> Result<(), Error> {
let group = self.groups.get_mut(group).ok_or(Error::GroupNotFound)?;
group.write_frame(bytes::Bytes::copy_from_slice(payload))?;
Ok(())
}
pub fn group_finish(&mut self, group: Id) -> Result<(), Error> {
let mut group = self.groups.remove(group).ok_or(Error::GroupNotFound)?;
group.finish()?;
Ok(())
}
}