use std::collections::HashSet;
use crate::{Audio, Catalog, Error, Result, Track, TrackConsumer, TrackProducer, Video};
use moq_transfork::{Announced, AnnouncedConsumer, Path, Session};
use derive_more::Debug;
/// Publishes a broadcast: a catalog track plus the video/audio tracks registered in it.
///
/// The `Debug` representation prints only `path`.
#[derive(Debug)]
#[debug("{:?}", path)]
pub struct BroadcastProducer {
/// Session on which the catalog track and every media track are published.
session: Session,
/// Path of this broadcast; media track paths are built by pushing the track name onto it.
path: Path,
/// Catalog listing the video and audio tracks added so far.
catalog: Catalog,
/// Producer half of the catalog track; a new group is appended each time the catalog changes.
catalog_producer: moq_transfork::TrackProducer, }
impl BroadcastProducer {
    /// Creates a producer for a new broadcast beneath `path`.
    ///
    /// The number of milliseconds between now and the UNIX epoch is pushed onto
    /// `path` to form the broadcast path, and a catalog track is published at
    /// that path on `session`.
    ///
    /// # Errors
    ///
    /// Returns an error if the session refuses to publish the catalog track.
    pub fn new(mut session: Session, path: Path) -> Result<Self> {
        // A system clock set before the UNIX epoch makes `duration_since` fail.
        // The error still carries the (positive) distance to the epoch, so use
        // that instead of panicking inside a fallible constructor.
        let id = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_else(|err| err.duration())
            .as_millis();
        let path = path.push(id);

        let (catalog_producer, catalog_consumer) = moq_transfork::Track {
            path: path.clone(),
            priority: -1,
            group_order: moq_transfork::GroupOrder::Desc,
            group_expires: std::time::Duration::ZERO,
        }
        .produce();
        session.publish(catalog_consumer)?;

        Ok(Self {
            session,
            path,
            catalog: Catalog::default(),
            catalog_producer,
        })
    }

    /// Publishes a video track described by `info` and adds it to the catalog.
    ///
    /// The track path is the broadcast path with `info.track.name` pushed onto
    /// it. The updated catalog is written to the catalog track before returning.
    ///
    /// # Errors
    ///
    /// Returns an error if the session refuses to publish the track or if the
    /// catalog cannot be serialized.
    pub fn video(&mut self, info: Video) -> Result<TrackProducer> {
        let (producer, consumer) = moq_transfork::Track {
            path: self.path.clone().push(&info.track.name),
            priority: info.track.priority,
            group_order: moq_transfork::GroupOrder::Desc,
            group_expires: std::time::Duration::ZERO,
        }
        .produce();
        self.session.publish(consumer)?;

        self.catalog.video.push(info);
        self.update()?;

        Ok(TrackProducer::new(producer))
    }

    /// Publishes an audio track described by `info` and adds it to the catalog.
    ///
    /// The track path is the broadcast path with `info.track.name` pushed onto
    /// it. The updated catalog is written to the catalog track before returning.
    ///
    /// # Errors
    ///
    /// Returns an error if the session refuses to publish the track or if the
    /// catalog cannot be serialized.
    pub fn audio(&mut self, info: Audio) -> Result<TrackProducer> {
        let (producer, consumer) = moq_transfork::Track {
            path: self.path.clone().push(&info.track.name),
            priority: info.track.priority,
            group_order: moq_transfork::GroupOrder::Desc,
            group_expires: std::time::Duration::ZERO,
        }
        .produce();
        self.session.publish(consumer)?;

        self.catalog.audio.push(info);
        self.update()?;

        Ok(TrackProducer::new(producer))
    }

    /// Returns the catalog as it currently stands.
    pub fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    /// Serializes the catalog and writes it as one frame in a freshly appended
    /// group on the catalog track.
    fn update(&mut self) -> Result<()> {
        let frame = self.catalog.to_string()?;
        // The group is dropped at the end of this function, after its single frame.
        let mut group = self.catalog_producer.append_group();
        group.write_frame(frame);
        Ok(())
    }
}
/// Discovers broadcasts announced beneath a path prefix on a session.
///
/// The `Debug` representation prints only the announced prefix.
#[derive(Debug)]
#[debug("{:?}", announced.prefix())]
pub struct BroadcastAnnounced {
/// Session that is cloned into every `BroadcastConsumer` handed out.
session: Session,
/// Source of announcements for the prefix.
announced: AnnouncedConsumer,
/// Names (first path component of the announced suffix) of broadcasts already handed out
/// and not yet removed by an `Ended` announcement.
active: HashSet<String>,
}
impl BroadcastAnnounced {
    /// Starts listening for announcements beneath `path` on `session`.
    pub fn new(session: Session, path: Path) -> Self {
        let announced = session.announced_prefix(path);
        Self {
            session,
            announced,
            active: HashSet::new(),
        }
    }

    /// Waits for the next newly announced broadcast.
    ///
    /// Returns `None` only once the underlying announcement stream has ended.
    /// Announcements for a broadcast that is already active, and announcements
    /// that fail to load, are skipped; the method keeps waiting for the next one.
    pub async fn broadcast(&mut self) -> Option<BroadcastConsumer> {
        while let Some(announced) = self.announced.next().await {
            match announced {
                Announced::Active(suffix) => match self.try_load(suffix).await {
                    Ok(Some(consumer)) => return Some(consumer),
                    // Already handed out: returning `None` here would look like
                    // end-of-stream to the caller, so keep waiting instead.
                    Ok(None) => {}
                    Err(err) => tracing::warn!(?err, "failed to load broadcast"),
                },
                Announced::Ended(suffix) => self.unload(suffix),
            }
        }

        None
    }

    /// Builds a consumer for the broadcast named by the first component of `suffix`.
    ///
    /// Returns `Ok(None)` when that broadcast is already active.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidSession` when `suffix` has no first component.
    async fn try_load(&mut self, suffix: Path) -> Result<Option<BroadcastConsumer>> {
        let name = suffix.first().ok_or(Error::InvalidSession)?;
        tracing::info!(?name, "loading broadcast");

        if self.active.contains(name.as_str()) {
            return Ok(None);
        }

        let path = self.announced.prefix().clone().push(name);
        tracing::info!(prefix = ?self.announced.prefix(), ?path, "loading broadcast");

        let broadcast = BroadcastConsumer::new(self.session.clone(), path);
        self.active.insert(name.to_string());

        Ok(Some(broadcast))
    }

    /// Forgets the broadcast named by the first component of `suffix`, so a later
    /// announcement of the same name is handed out again.
    fn unload(&mut self, suffix: Path) {
        // The suffix comes from the announcement stream. An empty one names no
        // broadcast, so there is nothing to forget; `try_load` treats the same
        // condition as a recoverable error rather than a panic.
        if let Some(name) = suffix.first() {
            self.active.remove(name.as_str());
        }
    }
}
/// Consumes a single broadcast: reads its catalog and subscribes to its tracks.
///
/// The `Debug` representation prints only `path`.
#[derive(Debug)]
#[debug("{:?}", path)]
pub struct BroadcastConsumer {
/// Session used to subscribe to the catalog track and to media tracks.
session: Session,
/// Path of the broadcast; media track paths are built by pushing the track name onto it.
path: Path,
/// Subscription to the catalog track at `path`.
catalog_track: moq_transfork::TrackConsumer,
/// Most recently received catalog group whose frame has not been consumed yet, if any.
catalog_group: Option<moq_transfork::GroupConsumer>,
}
impl BroadcastConsumer {
    /// Subscribes to the catalog track of the broadcast at `path` on `session`.
    pub fn new(session: Session, path: Path) -> Self {
        let catalog = moq_transfork::Track {
            path: path.clone(),
            priority: -1,
            group_order: moq_transfork::GroupOrder::Desc,
            group_expires: std::time::Duration::ZERO,
        };
        let catalog_track = session.subscribe(catalog);

        tracing::info!(?path, "fetching catalog");

        Self {
            session,
            path,
            catalog_track,
            catalog_group: None,
        }
    }

    /// Waits for the next catalog update.
    ///
    /// Returns `Ok(Some(catalog))` with the catalog parsed from a frame of the
    /// current catalog group, or `Ok(None)` once neither the current group nor
    /// the catalog track can yield anything further.
    ///
    /// # Errors
    ///
    /// Returns an error if reading a group or frame fails, or if the frame
    /// cannot be parsed as a catalog.
    pub async fn catalog(&mut self) -> Result<Option<Catalog>> {
        loop {
            tokio::select! {
                // Poll in declaration order: a frame from the group we already
                // hold is checked before looking for a newer group.
                biased;
                Some(frame) = async { self.catalog_group.as_mut()?.read_frame().await.transpose() } => {
                    let catalog = Catalog::from_slice(&frame?)?;
                    // The group has delivered its catalog; drop it so the next
                    // call waits for a new group.
                    self.catalog_group.take();
                    return Ok(Some(catalog));
                },
                Some(group) = async { self.catalog_track.next_group().await.transpose() } => {
                    // Library code should not write to stdout; log through
                    // `tracing` like the rest of this module.
                    tracing::debug!(?group, "received catalog group");
                    self.catalog_group.replace(group?);
                },
                else => return Ok(None),
            }
        }
    }

    /// Subscribes to the media track described by `track`.
    ///
    /// The track path is the broadcast path with `track.name` pushed onto it.
    pub fn track(&self, track: &Track) -> TrackConsumer {
        let request = moq_transfork::Track {
            path: self.path.clone().push(&track.name),
            priority: track.priority,
            group_order: moq_transfork::GroupOrder::Desc,
            group_expires: std::time::Duration::ZERO,
        };
        let subscription = self.session.subscribe(request);

        TrackConsumer::new(subscription)
    }
}