use std::time::Duration;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
moq_native::Log::new(tracing::Level::DEBUG).init();
let origin = moq_lite::Origin::produce();
let consumer = origin.consume();
tokio::select! {
res = run_session(origin) => res,
res = run_subscribe(consumer) => res,
}
}
/// Connects to the example relay and keeps the connection alive until it closes.
///
/// A client is built from the default `moq_native::ClientConfig`, given
/// `origin` via `with_consume`, and asked to `reconnect` to the hard-coded
/// relay URL. The function then waits on `closed()` and returns its result.
///
/// # Errors
///
/// Returns an error if the client cannot be initialised, if the relay URL
/// fails to parse, or whatever error `closed()` resolves with.
async fn run_session(origin: moq_lite::OriginProducer) -> anyhow::Result<()> {
    let client = moq_native::ClientConfig::default().init()?;

    // Propagate a parse failure instead of panicking: this function already
    // returns `anyhow::Result`, so callers get a regular error to report.
    let url = url::Url::parse("https://cdn.moq.dev/anon/video-example")?;

    let reconnect = client.with_consume(origin).reconnect(url);
    reconnect.closed().await
}
async fn run_subscribe(mut consumer: moq_lite::OriginConsumer) -> anyhow::Result<()> {
let (path, broadcast) = consumer
.announced()
.await
.ok_or_else(|| anyhow::anyhow!("origin closed"))?;
let broadcast = broadcast.ok_or_else(|| anyhow::anyhow!("broadcast unannounced: {path}"))?;
tracing::info!(%path, "broadcast announced");
let catalog_track = broadcast.subscribe_track(&hang::Catalog::default_track())?;
let mut catalog = hang::CatalogConsumer::new(catalog_track);
let info = catalog.next().await?.ok_or_else(|| anyhow::anyhow!("no catalog"))?;
let (name, config) = info
.video
.renditions
.iter()
.next()
.ok_or_else(|| anyhow::anyhow!("no video renditions"))?;
tracing::info!(
%name,
codec = %config.codec,
width = ?config.coded_width,
height = ?config.coded_height,
"subscribing to video track"
);
let track = moq_lite::Track {
name: name.clone(),
priority: 1,
};
let track_consumer = broadcast.subscribe_track(&track)?;
let mut ordered = hang::container::OrderedConsumer::new(track_consumer, Duration::from_millis(500));
while let Some(frame) = ordered.read().await? {
tracing::info!(
timestamp = ?frame.timestamp,
keyframe = frame.is_keyframe(),
group = frame.group,
bytes = frame.payload.num_bytes(),
"received frame"
);
}
Ok(())
}