use url::Url;
use anyhow::Context;
use clap::Parser;
mod clock;
use moq_lite::*;
#[derive(Parser, Clone)]
pub struct Config {
#[arg(long)]
pub url: Url,
#[arg(long)]
pub broadcast: String,
#[command(flatten)]
pub client: moq_native::ClientConfig,
#[arg(long, default_value = "seconds")]
pub track: String,
#[command(flatten)]
pub log: moq_native::Log,
#[command(subcommand)]
pub role: Command,
}
#[derive(Parser, Clone)]
pub enum Command {
Publish,
Subscribe,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = Config::parse();
config.log.init();
let client = config.client.init()?;
tracing::info!(url = ?config.url, "connecting to server");
let track = Track {
name: config.track,
priority: 0,
};
let origin = moq_lite::Origin::produce();
match config.role {
Command::Publish => {
let mut broadcast = moq_lite::Broadcast::produce();
let track = broadcast.create_track(track)?;
let clock = clock::Publisher::new(track);
origin.publish_broadcast(&config.broadcast, broadcast.consume());
let session = client.with_publish(origin.consume()).connect(config.url).await?;
tokio::select! {
res = session.closed() => res.context("session closed"),
_ = clock.run() => Ok(()),
}
}
Command::Subscribe => {
let session = client.with_consume(origin.clone()).connect(config.url).await?;
tracing::info!(broadcast = %config.broadcast, "waiting for broadcast to be online");
let path: moq_lite::Path<'_> = config.broadcast.into();
let mut origin = origin
.consume_only(&[path])
.context("not allowed to consume broadcast")?;
let mut clock: Option<clock::Subscriber> = None;
loop {
tokio::select! {
Some(announce) = origin.announced() => match announce {
(path, Some(broadcast)) => {
tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
let track = broadcast.subscribe_track(&track)?;
clock = Some(clock::Subscriber::new(track));
}
(path, None) => {
tracing::warn!(broadcast = %path, "broadcast is offline, waiting...");
}
},
res = session.closed() => return res.context("session closed"),
Some(res) = async { Some(clock.take()?.run().await) } => res.context("clock error")?,
}
}
}
}
}