use crate::{Audio, Catalog, Error, Result, Track, TrackConsumer, TrackProducer, Video};
use moq_transfork::{Announced, AnnouncedConsumer, Path, Session};
use derive_more::Debug;
#[derive(Debug)]
#[debug("{:?}", path)]
pub struct BroadcastProducer {
pub session: Session,
pub path: Path,
id: u128,
catalog: Catalog,
catalog_producer: moq_transfork::TrackProducer, }
impl BroadcastProducer {
pub fn new(mut session: Session, path: Path) -> Result<Self> {
let id = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
let full = path.clone().push(id);
let track = moq_transfork::Track {
path: full,
priority: -1,
group_order: moq_transfork::GroupOrder::Desc,
group_expires: std::time::Duration::ZERO,
}
.produce();
session.publish(track.1)?;
Ok(Self {
session,
path,
id,
catalog: Catalog::default(),
catalog_producer: track.0,
})
}
pub fn video(&mut self, info: Video) -> Result<TrackProducer> {
let path = self.path.clone().push(self.id).push(&info.track.name);
let (producer, consumer) = moq_transfork::Track {
path,
priority: info.track.priority,
group_order: moq_transfork::GroupOrder::Desc,
group_expires: std::time::Duration::ZERO,
}
.produce();
self.session.publish(consumer)?;
self.catalog.video.push(info);
self.update()?;
Ok(TrackProducer::new(producer))
}
pub fn audio(&mut self, info: Audio) -> Result<TrackProducer> {
let path = self.path.clone().push(self.id).push(&info.track.name);
let (producer, consumer) = moq_transfork::Track {
path,
priority: info.track.priority,
group_order: moq_transfork::GroupOrder::Desc,
group_expires: std::time::Duration::ZERO,
}
.produce();
self.session.publish(consumer)?;
self.catalog.audio.push(info);
self.update()?;
Ok(TrackProducer::new(producer))
}
pub fn catalog(&self) -> &Catalog {
&self.catalog
}
fn update(&mut self) -> Result<()> {
let frame = self.catalog.to_string()?;
let mut group = self.catalog_producer.append_group();
group.write_frame(frame);
Ok(())
}
}
#[derive(Debug)]
#[debug("{:?}", path)]
pub struct BroadcastConsumer {
pub session: Session,
pub path: Path,
announced: AnnouncedConsumer,
current: Option<String>,
catalog_track: Option<moq_transfork::TrackConsumer>,
catalog_group: Option<moq_transfork::GroupConsumer>,
}
impl BroadcastConsumer {
pub fn new(session: Session, path: Path) -> Self {
let announced = session.announced_prefix(path.clone());
Self {
session,
path,
announced,
current: None,
catalog_track: None,
catalog_group: None,
}
}
pub async fn catalog(&mut self) -> Result<Option<Catalog>> {
loop {
tokio::select! {
biased;
Some(announced) = self.announced.next() => {
match announced {
Announced::Active(suffix) => self.load(suffix),
Announced::Ended(suffix) => self.unload(suffix),
Announced::Live => {
if self.current.is_none() { return Ok(None) }
},
}
},
Some(group) = async { self.catalog_track.as_mut()?.next_group().await.transpose() } => {
self.catalog_group.replace(group?);
},
Some(frame) = async { self.catalog_group.as_mut()?.read_frame().await.transpose() } => {
let catalog = Catalog::from_slice(&frame?)?;
self.catalog_group.take(); return Ok(Some(catalog));
},
else => return Ok(None),
}
}
}
fn load(&mut self, suffix: Path) {
if suffix.len() != 1 {
return;
}
let id = &suffix[0];
if let Some(current) = &self.current {
if id <= current {
tracing::warn!(?id, ?current, "ignoring old broadcast");
return;
}
}
let path = self.announced.prefix().clone().push(id);
tracing::info!(?path, "loading catalog");
let track = moq_transfork::Track {
path,
priority: -1,
group_order: moq_transfork::GroupOrder::Desc,
group_expires: std::time::Duration::ZERO,
};
self.catalog_track = Some(self.session.subscribe(track));
self.current = Some(id.to_string());
}
fn unload(&mut self, suffix: Path) {
if suffix.len() != 1 {
return;
}
let id = &suffix[0];
if self.current.as_ref() == Some(id) {
self.current = None;
self.catalog_track = None;
self.catalog_group = None;
}
}
pub fn track(&self, track: &Track) -> Result<TrackConsumer> {
let path = self.catalog_track.as_ref().ok_or(Error::MissingTrack)?.path.clone();
let track = moq_transfork::Track {
path: path.push(&track.name),
priority: track.priority,
group_order: moq_transfork::GroupOrder::Desc,
group_expires: std::time::Duration::ZERO,
};
let track = self.session.subscribe(track);
Ok(TrackConsumer::new(track))
}
}