use crate::{Audio, Catalog, Error, Result, Track, TrackConsumer, TrackProducer, Video};
use moq_async::{spawn, Lock};
use moq_transfork::{Announced, AnnouncedConsumer, AnnouncedMatch, Session};
use derive_more::Debug;
/// Publishing side of a broadcast: one catalog track plus the media tracks
/// registered through `publish_video` / `publish_audio`.
///
/// Every track path has the form `{path}/{id}/<name>`; the `Debug` output is
/// just the `path`.
#[derive(Debug, Clone)]
#[debug("{:?}", path)]
pub struct BroadcastProducer {
/// Session on which the catalog and media tracks are published.
pub session: Session,
/// Path prefix of the broadcast.
pub path: String,
// Path segment placed between `path` and the track name; set at construction
// to the number of milliseconds since the UNIX epoch.
id: u64,
// Current catalog together with the track it is written to.
// NOTE(review): `Lock` is assumed to be a shared handle, since it is cloned
// into background tasks that mutate the catalog — confirm in moq_async.
catalog: Lock<CatalogProducer>,
}
impl BroadcastProducer {
pub fn new(mut session: Session, path: String) -> Result<Self> {
let id = web_time::SystemTime::now()
.duration_since(web_time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let full = format!("{}/{}/catalog.json", path, id);
let catalog = moq_transfork::Track {
path: full,
priority: -1,
order: moq_transfork::GroupOrder::Desc,
}
.produce();
session.publish(catalog.1)?;
let catalog = Lock::new(CatalogProducer::new(catalog.0)?);
Ok(Self {
session,
path,
id,
catalog,
})
}
pub fn catalog(&self) -> Catalog {
self.catalog.lock().current.clone()
}
pub fn publish_video(&mut self, info: Video) -> Result<TrackProducer> {
let path = format!("{}/{}/{}.karp", self.path, self.id, &info.track.name);
let (producer, consumer) = moq_transfork::Track {
path,
priority: info.track.priority,
order: moq_transfork::GroupOrder::Desc,
}
.produce();
self.session.publish(consumer)?;
let mut catalog = self.catalog.lock();
catalog.current.video.push(info.clone());
catalog.publish()?;
let producer = TrackProducer::new(producer);
let consumer = producer.subscribe();
let catalog = self.catalog.clone();
spawn(async move {
consumer.closed().await.ok();
let mut catalog = catalog.lock();
catalog.current.video.retain(|v| v.track != info.track);
catalog.publish().unwrap();
});
Ok(producer)
}
pub fn publish_audio(&mut self, info: Audio) -> Result<TrackProducer> {
let path = format!("{}/{}/{}.karp", self.path, self.id, &info.track.name);
let (producer, consumer) = moq_transfork::Track {
path,
priority: info.track.priority,
order: moq_transfork::GroupOrder::Desc,
}
.produce();
self.session.publish(consumer)?;
let mut catalog = self.catalog.lock();
catalog.current.audio.push(info.clone());
catalog.publish()?;
let producer = TrackProducer::new(producer);
let consumer = producer.subscribe();
let catalog = self.catalog.clone();
spawn(async move {
consumer.closed().await.ok();
let mut catalog = catalog.lock();
catalog.current.audio.retain(|v| v.track != info.track);
catalog.publish().unwrap();
});
Ok(producer)
}
}
// Pairs the catalog contents with the track they are published on.
struct CatalogProducer {
// Catalog as of the latest mutation; callers edit it and then call `publish`.
current: Catalog,
// Track that receives one new group per published catalog.
track: moq_transfork::TrackProducer,
}
impl CatalogProducer {
    // Wraps `track` and immediately publishes a default catalog on it.
    // Fails if that first catalog cannot be serialized.
    fn new(track: moq_transfork::TrackProducer) -> Result<Self> {
        let current = Catalog::default();
        let mut producer = Self { current, track };
        producer.publish()?;
        Ok(producer)
    }

    // Serializes the current catalog and writes it as the single frame of a
    // freshly appended group. Serialization happens first, so no group is
    // appended when it fails.
    fn publish(&mut self) -> Result<()> {
        let payload = self.current.to_string()?;
        self.track.append_group().write_frame(payload);
        Ok(())
    }
}
/// Consuming side of a broadcast: follows announcements of
/// `{path}/<id>/catalog.json` and exposes the catalog of the loaded broadcast.
///
/// The `Debug` output is just the `path`.
#[derive(Debug)]
#[debug("{:?}", path)]
pub struct BroadcastConsumer {
/// Session used to watch announcements and to subscribe to tracks.
pub session: Session,
/// Path prefix of the broadcast.
pub path: String,
// Announcements matching the wildcard `{path}/*/catalog.json`.
announced: AnnouncedConsumer,
// Id (the wildcard capture) of the broadcast currently loaded, if any.
current: Option<String>,
// Set when the loaded broadcast is unloaded; makes the next `next_catalog`
// call return `Ok(None)` exactly once.
ended: bool,
// Most recently parsed catalog.
catalog_latest: Option<Catalog>,
// Subscription to the catalog track of the loaded broadcast.
catalog_track: Option<moq_transfork::TrackConsumer>,
// Group received from `catalog_track` whose frame has not been read yet.
catalog_group: Option<moq_transfork::GroupConsumer>,
}
impl BroadcastConsumer {
    /// Starts watching `session` for announcements of `{path}/<id>/catalog.json`.
    ///
    /// No catalog is loaded until [`BroadcastConsumer::next_catalog`] is polled.
    pub fn new(session: Session, path: String) -> Self {
        let filter = moq_transfork::Filter::Wildcard {
            prefix: format!("{}/", path),
            suffix: "/catalog.json".to_string(),
        };
        let announced = session.announced(filter);

        Self {
            session,
            path,
            announced,
            current: None,
            ended: false,
            catalog_latest: None,
            catalog_track: None,
            catalog_group: None,
        }
    }

    /// Returns the most recently parsed catalog, if any.
    pub fn catalog(&self) -> Option<&Catalog> {
        self.catalog_latest.as_ref()
    }

    /// Waits for the next catalog change.
    ///
    /// Returns `Ok(Some(catalog))` when a new catalog frame has been parsed.
    /// Returns `Ok(None)` once after the loaded broadcast has ended, and also
    /// when an `Announced::Live` event arrives while no broadcast is loaded.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the catalog track or parsing a catalog
    /// frame fails. When no source can make progress any more, waits for the
    /// session to close and returns that error.
    pub async fn next_catalog(&mut self) -> Result<Option<&Catalog>> {
        loop {
            if self.ended {
                // Report the end of the broadcast only once.
                self.ended = false;
                return Ok(None);
            }

            tokio::select! {
                // Branch order is significant: announcements are handled first.
                biased;
                Some(announced) = self.announced.next() => {
                    match announced {
                        Announced::Active(am) => self.load(am),
                        Announced::Ended(am) => self.unload(am),
                        Announced::Live => {
                            if self.current.is_none() {
                                return Ok(None);
                            }
                        },
                    }
                },
                // Yields `None` (disabling the branch) when there is no catalog track.
                Some(group) = async { self.catalog_track.as_mut()?.next_group().await.transpose() } => {
                    self.catalog_group.replace(group?);
                },
                // Yields `None` (disabling the branch) when there is no pending group.
                Some(frame) = async { self.catalog_group.as_mut()?.read_frame().await.transpose() } => {
                    self.catalog_latest = Some(Catalog::from_slice(&frame?)?);
                    // Only the first frame of each group is consumed.
                    self.catalog_group.take();
                    return Ok(self.catalog_latest.as_ref());
                },
                // Every branch above is disabled.
                else => return Err(self.session.closed().await.into()),
            }
        }
    }

    // Switches to the announced broadcast unless it is not newer than the one
    // already loaded.
    fn load(&mut self, am: AnnouncedMatch) {
        let id = am.capture();

        if let Some(current) = &self.current {
            // Ids are decimal millisecond timestamps, so compare them as numbers:
            // a plain string comparison would rank "10" below "9". Fall back to
            // string ordering when either id is not a valid u64.
            let stale = match (id.parse::<u64>(), current.parse::<u64>()) {
                (Ok(new), Ok(old)) => new <= old,
                _ => id <= current.as_str(),
            };
            if stale {
                tracing::warn!(?id, ?current, "ignoring old broadcast");
                return;
            }
        }

        let id = id.to_string();
        let path = am.to_full();
        tracing::info!(?path, "loading catalog");

        let track = moq_transfork::Track {
            path,
            priority: -1,
            order: moq_transfork::GroupOrder::Desc,
        };

        self.catalog_track = Some(self.session.subscribe(track));
        // Drop any pending group from the previous broadcast so its catalog
        // cannot be mistaken for the new broadcast's catalog.
        self.catalog_group = None;
        self.current = Some(id);
    }

    // Clears the loaded broadcast if the ended announcement refers to it;
    // announcements for any other id are ignored.
    fn unload(&mut self, am: AnnouncedMatch) {
        if let Some(current) = self.current.as_ref() {
            if current.as_str() == am.capture() {
                self.current = None;
                self.catalog_track = None;
                self.catalog_group = None;
                // Picked up by the next `next_catalog` call.
                self.ended = true;
            }
        }
    }

    /// Subscribes to `track` of the loaded broadcast at `{path}/{id}/{name}.karp`.
    ///
    /// # Errors
    ///
    /// Returns `Error::MissingTrack` when no broadcast is currently loaded.
    pub fn track(&self, track: &Track) -> Result<TrackConsumer> {
        let id = self.current.as_ref().ok_or(Error::MissingTrack)?;

        let track = moq_transfork::Track {
            path: format!("{}/{}/{}.karp", self.path, id, &track.name),
            priority: track.priority,
            order: moq_transfork::GroupOrder::Desc,
        };
        let track = self.session.subscribe(track);

        Ok(TrackConsumer::new(track))
    }
}