use std::time::Duration;
use anyhow::Result;
use bytes::Bytes;
use moq_native::moq_lite::{self, Broadcast, Origin, Track};
use url::Url;
/// Configuration for connecting to a MoQ relay.
///
/// Values are set through the chained setter methods and consumed by one of
/// the `connect_*` methods.
#[derive(Clone)]
pub struct MoqBuilder {
    /// Base URL of the relay; the path is appended to it after a `/`.
    relay_url: String,
    /// Optional token sent as the `token` query parameter of the connection URL.
    token: Option<String>,
    /// Path appended to `relay_url` when building the connection URL.
    path: String,
    /// When `true`, the client is configured with TLS verification disabled.
    disable_tls_verify: bool,
}
impl MoqBuilder {
    /// Longest time any `connect_*` method waits for the relay connection to come up.
    const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);

    /// Creates a builder targeting `https://cdn.1ms.ai` at path `anon/xoq`,
    /// with no token and TLS verification not disabled.
    pub fn new() -> Self {
        Self {
            relay_url: "https://cdn.1ms.ai".to_string(),
            token: None,
            path: "anon/xoq".to_string(),
            disable_tls_verify: false,
        }
    }

    /// Sets the relay base URL that the path is appended to.
    pub fn relay(mut self, url: &str) -> Self {
        self.relay_url = url.to_string();
        self
    }

    /// Sets the path appended to the relay URL.
    pub fn path(mut self, path: &str) -> Self {
        self.path = path.to_string();
        self
    }

    /// Sets the token sent as the `token` query parameter.
    pub fn token(mut self, token: &str) -> Self {
        self.token = Some(token.to_string());
        self
    }

    /// Configures the client to skip TLS certificate verification.
    pub fn disable_tls_verify(mut self) -> Self {
        self.disable_tls_verify = true;
        self
    }

    /// Builds the connection URL for the builder's configured path.
    fn build_url(&self) -> Result<Url> {
        self.build_url_for_path(&self.path)
    }

    /// Builds `<relay_url>/<path>`, adding the token (if any) as a `token`
    /// query parameter.
    ///
    /// The token goes through the URL's query serializer, so characters such
    /// as `&`, `#`, `+`, `=` or `%` are percent-encoded rather than spliced raw
    /// into the URL, where they would terminate or corrupt the parameter.
    ///
    /// # Errors
    /// Returns an error when `<relay_url>/<path>` does not parse as a URL.
    pub fn build_url_for_path(&self, path: &str) -> Result<Url> {
        let mut url = Url::parse(&format!("{}/{}", self.relay_url, path))?;
        if let Some(token) = &self.token {
            url.query_pairs_mut().append_pair("token", token);
        }
        Ok(url)
    }

    /// Initializes a `moq_native` client from the default client config,
    /// disabling TLS verification when the builder asks for it.
    fn create_client(&self) -> Result<moq_native::Client> {
        let mut config = moq_native::ClientConfig::default();
        if self.disable_tls_verify {
            config.tls.disable_verify = Some(true);
        }
        config.init()
    }

    /// Public entry point to the same client construction the `connect_*`
    /// methods use.
    pub fn create_client_public(&self) -> Result<moq_native::Client> {
        self.create_client()
    }

    /// Awaits a pending relay connection, failing once
    /// [`Self::CONNECT_TIMEOUT`] has elapsed.
    async fn await_session<T, E>(
        connecting: impl std::future::Future<Output = std::result::Result<T, E>>,
    ) -> Result<T>
    where
        E: Into<anyhow::Error>,
    {
        tokio::time::timeout(Self::CONNECT_TIMEOUT, connecting)
            .await
            .map_err(|_| {
                anyhow::anyhow!(
                    "MoQ connect timed out ({}s)",
                    Self::CONNECT_TIMEOUT.as_secs()
                )
            })?
            .map_err(Into::into)
    }

    /// Connects to the relay for both publishing and consuming.
    ///
    /// A new broadcast is published under the empty name on a fresh origin;
    /// the origin's consumer is handed to the client as the publish side and
    /// its producer as the consume side.
    ///
    /// # Errors
    /// Fails when the URL cannot be built, the client cannot be initialized,
    /// the connection attempt fails, or it takes longer than 15 seconds.
    pub async fn connect_duplex(self) -> Result<MoqConnection> {
        let url = self.build_url()?;
        let client = self.create_client()?;
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();
        // NOTE(review): the same origin backs both the publish and consume
        // sides, so the broadcast published here may also be announced to
        // `origin_consumer` — confirm against moq_lite's announce semantics.
        origin
            .producer
            .publish_broadcast("", broadcast.consumer.clone());
        let session = Self::await_session(
            client
                .with_publish(origin.consumer.clone())
                .with_consume(origin.producer.clone())
                .connect(url),
        )
        .await?;
        Ok(MoqConnection {
            broadcast: broadcast.producer,
            origin_consumer: origin.consumer,
            cached_broadcast: None,
            _session: session,
        })
    }

    /// Connects to the relay as a publisher with an initially track-less
    /// broadcast published under the empty name.
    ///
    /// # Errors
    /// Fails when the URL cannot be built, the client cannot be initialized,
    /// the connection attempt fails, or it takes longer than 15 seconds.
    pub async fn connect_publisher(self) -> Result<MoqPublisher> {
        let url = self.build_url()?;
        let client = self.create_client()?;
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();
        origin
            .producer
            .publish_broadcast("", broadcast.consumer.clone());
        let session =
            Self::await_session(client.with_publish(origin.consumer).connect(url)).await?;
        Ok(MoqPublisher {
            broadcast: broadcast.producer,
            session,
        })
    }

    /// Connects to the relay as a publisher and returns a writer for
    /// `track_name`.
    ///
    /// The track is created on the broadcast before the broadcast is
    /// published and before the connection is opened.
    ///
    /// # Errors
    /// Fails when the URL cannot be built, the client cannot be initialized,
    /// the connection attempt fails, or it takes longer than 15 seconds.
    pub async fn connect_publisher_with_track(
        self,
        track_name: &str,
    ) -> Result<(MoqPublisher, MoqTrackWriter)> {
        let url = self.build_url()?;
        let client = self.create_client()?;
        let origin = Origin::produce();
        let mut broadcast = Broadcast::produce();
        let track_producer = broadcast.producer.create_track(Track::new(track_name));
        let writer = MoqTrackWriter {
            track: track_producer,
            group: None,
        };
        origin
            .producer
            .publish_broadcast("", broadcast.consumer.clone());
        let session =
            Self::await_session(client.with_publish(origin.consumer).connect(url)).await?;
        Ok((
            MoqPublisher {
                broadcast: broadcast.producer,
                session,
            },
            writer,
        ))
    }

    /// Connects to the relay as a subscriber, consuming announcements through
    /// a fresh origin.
    ///
    /// # Errors
    /// Fails when the URL cannot be built, the client cannot be initialized,
    /// the connection attempt fails, or it takes longer than 15 seconds.
    pub async fn connect_subscriber(self) -> Result<MoqSubscriber> {
        let url = self.build_url()?;
        let client = self.create_client()?;
        let origin = Origin::produce();
        let session =
            Self::await_session(client.with_consume(origin.producer).connect(url)).await?;
        Ok(MoqSubscriber {
            origin_consumer: origin.consumer,
            cached_broadcast: None,
            _session: session,
        })
    }
}
impl Default for MoqBuilder {
fn default() -> Self {
Self::new()
}
}
/// A relay connection that can both publish tracks and subscribe to them.
pub struct MoqConnection {
    /// Producer of the locally published broadcast; new tracks are created on it.
    broadcast: moq_lite::BroadcastProducer,
    /// Consumer polled for broadcast announcements when subscribing.
    origin_consumer: moq_lite::OriginConsumer,
    /// First announced broadcast, kept so later subscriptions skip the wait.
    cached_broadcast: Option<moq_lite::BroadcastConsumer>,
    /// Never read in this file; presumably held so the session is not dropped
    /// while the connection is in use — confirm against moq_lite.
    _session: moq_lite::Session,
}
impl MoqConnection {
pub fn create_track(&mut self, name: &str) -> MoqTrackWriter {
let track_producer = self.broadcast.create_track(Track::new(name));
MoqTrackWriter {
track: track_producer,
group: None,
}
}
pub async fn subscribe_track(&mut self, track_name: &str) -> Result<Option<MoqTrackReader>> {
let broadcast = if let Some(ref bc) = self.cached_broadcast {
bc.clone()
} else {
let bc = match tokio::time::timeout(
Duration::from_secs(10),
wait_for_broadcast(&mut self.origin_consumer),
)
.await
{
Ok(Some(broadcast)) => broadcast,
Ok(None) => return Ok(None),
Err(_) => anyhow::bail!("Timed out waiting for broadcast announcement (10s)"),
};
self.cached_broadcast = Some(bc.clone());
bc
};
let track_consumer = broadcast.subscribe_track(&Track::new(track_name));
Ok(Some(MoqTrackReader {
track: track_consumer,
group: None,
}))
}
}
/// A publish-only relay connection.
pub struct MoqPublisher {
    /// Producer of the published broadcast; new tracks are created on it.
    broadcast: moq_lite::BroadcastProducer,
    /// Relay session; awaited by `closed` to observe the session closing.
    session: moq_lite::Session,
}
impl MoqPublisher {
pub fn create_track(&mut self, name: &str) -> MoqTrackWriter {
let track_producer = self.broadcast.create_track(Track::new(name));
MoqTrackWriter {
track: track_producer,
group: None,
}
}
pub async fn closed(&self) -> Result<(), moq_lite::Error> {
self.session.closed().await
}
}
/// A subscribe-only relay connection.
pub struct MoqSubscriber {
    /// Consumer polled for broadcast announcements when subscribing.
    origin_consumer: moq_lite::OriginConsumer,
    /// First announced broadcast, kept so later subscriptions skip the wait.
    cached_broadcast: Option<moq_lite::BroadcastConsumer>,
    /// Never read in this file; presumably held so the session is not dropped
    /// while the subscriber is in use — confirm against moq_lite.
    _session: moq_lite::Session,
}
impl MoqSubscriber {
pub async fn subscribe_track(&mut self, track_name: &str) -> Result<Option<MoqTrackReader>> {
let broadcast = if let Some(ref bc) = self.cached_broadcast {
bc.clone()
} else {
let bc = match tokio::time::timeout(
Duration::from_secs(10),
wait_for_broadcast(&mut self.origin_consumer),
)
.await
{
Ok(Some(broadcast)) => broadcast,
Ok(None) => return Ok(None),
Err(_) => anyhow::bail!("Timed out waiting for broadcast announcement (10s)"),
};
self.cached_broadcast = Some(bc.clone());
bc
};
let track_consumer = broadcast.subscribe_track(&Track::new(track_name));
Ok(Some(MoqTrackReader {
track: track_consumer,
group: None,
}))
}
}
/// Polls `origin_consumer` until an announcement carries a broadcast.
///
/// Announcements whose broadcast slot is `None` are skipped (presumably these
/// are un-announcements — confirm against moq_lite). Returns `None` once
/// `announced()` itself yields `None`.
async fn wait_for_broadcast(
    origin_consumer: &mut moq_lite::OriginConsumer,
) -> Option<moq_lite::BroadcastConsumer> {
    while let Some((_path, announcement)) = origin_consumer.announced().await {
        if announcement.is_some() {
            return announcement;
        }
    }
    None
}
/// Writes frames to a single published track.
pub struct MoqTrackWriter {
    /// Producer for the track; used directly by `write`.
    track: moq_lite::TrackProducer,
    /// Group opened by the first `write_stream` call and reused by later ones.
    group: Option<moq_lite::GroupProducer>,
}
impl MoqTrackWriter {
pub fn from_producer(track: moq_lite::TrackProducer) -> Self {
Self { track, group: None }
}
pub fn write(&mut self, data: impl Into<Bytes>) {
self.track.write_frame(data.into());
}
pub fn write_stream(&mut self, data: impl Into<Bytes>) {
let group = self.group.get_or_insert_with(|| self.track.append_group());
group.write_frame(data.into());
}
pub fn write_str(&mut self, data: &str) {
self.write(Bytes::from(data.to_string()));
}
}
/// Reads frames from a single subscribed track.
pub struct MoqTrackReader {
    /// Consumer that yields the track's groups.
    track: moq_lite::TrackConsumer,
    /// Group currently being drained; `None` until a group has been fetched
    /// or after the current one ends.
    group: Option<moq_lite::GroupConsumer>,
}
impl MoqTrackReader {
pub fn from_track(track: moq_lite::TrackConsumer) -> Self {
Self { track, group: None }
}
pub async fn read(&mut self) -> Result<Option<Bytes>> {
loop {
if let Some(group) = &mut self.group {
match group.read_frame().await {
Ok(Some(data)) => return Ok(Some(data)),
Ok(None) => {
self.group = None;
}
Err(_) => {
self.group = None;
}
}
}
match self.track.next_group().await {
Ok(Some(group)) => {
self.group = Some(group);
}
Ok(None) => return Ok(None),
Err(e) => return Err(anyhow::anyhow!("Group error: {}", e)),
}
}
}
pub async fn read_string(&mut self) -> Result<Option<String>> {
if let Some(bytes) = self.read().await? {
return Ok(Some(String::from_utf8_lossy(&bytes).to_string()));
}
Ok(None)
}
}
/// A bidirectional byte-frame stream built from one publisher connection and
/// one subscriber connection.
pub struct MoqStream {
    /// Writer for the outgoing `data` track.
    writer: MoqTrackWriter,
    /// Reader for the incoming `data` track.
    reader: MoqTrackReader,
    /// Never read in this file; presumably held so the publishing connection
    /// stays open — confirm.
    _publisher: MoqPublisher,
    /// Never read in this file; presumably held so the subscribing connection
    /// stays open — confirm.
    _subscriber: MoqSubscriber,
}
impl MoqStream {
    /// Opens the connecting side of a stream: publishes on `<path>/c2s` and
    /// subscribes to `<path>/s2c`.
    pub async fn connect_to(relay: &str, path: &str) -> Result<Self> {
        Self::connect_internal(relay, path, false).await
    }

    /// Same as [`Self::connect_to`], with TLS verification disabled.
    pub async fn connect_to_insecure(relay: &str, path: &str) -> Result<Self> {
        Self::connect_internal(relay, path, true).await
    }

    /// Connecting side: sends on the `c2s` leg, receives on the `s2c` leg.
    async fn connect_internal(relay: &str, path: &str, insecure: bool) -> Result<Self> {
        Self::open(relay, path, insecure, "c2s", "s2c").await
    }

    /// Opens both legs of a stream concurrently.
    ///
    /// The publisher is created on `<path>/<send_leg>` and the subscriber on
    /// `<path>/<recv_leg>`, each using the `data` track.
    ///
    /// `try_join!` is used so that a failure on either leg is returned
    /// immediately. The subscribe leg retries for as long as no broadcast
    /// shows up, so waiting for both legs to finish would hide a publisher
    /// error behind a wait that may never end.
    async fn open(
        relay: &str,
        path: &str,
        insecure: bool,
        send_leg: &str,
        recv_leg: &str,
    ) -> Result<Self> {
        let mut builder = MoqBuilder::new().relay(relay);
        if insecure {
            builder = builder.disable_tls_verify();
        }
        let pub_builder = builder.clone().path(&format!("{}/{}", path, send_leg));
        let sub_builder = builder.path(&format!("{}/{}", path, recv_leg));
        let ((publisher, writer), (subscriber, reader)) = tokio::try_join!(
            pub_builder.connect_publisher_with_track("data"),
            Self::connect_and_subscribe_retry(sub_builder, "data")
        )?;
        Ok(Self {
            writer,
            reader,
            _publisher: publisher,
            _subscriber: subscriber,
        })
    }

    /// Connects a subscriber and subscribes to `track_name`, retrying until a
    /// reader is obtained.
    ///
    /// Each attempt opens a new subscriber connection and waits up to 2
    /// seconds for the subscription. If that yields no reader (broadcast
    /// ended, subscribe error, or timeout) the attempt is logged at debug
    /// level and retried after 500 ms.
    ///
    /// # Errors
    /// Fails only when opening the subscriber connection itself fails.
    async fn connect_and_subscribe_retry(
        builder: MoqBuilder,
        track_name: &str,
    ) -> Result<(MoqSubscriber, MoqTrackReader)> {
        loop {
            let mut subscriber = builder.clone().connect_subscriber().await?;
            match tokio::time::timeout(
                Duration::from_secs(2),
                subscriber.subscribe_track(track_name),
            )
            .await
            {
                Ok(Ok(Some(reader))) => return Ok((subscriber, reader)),
                Ok(Ok(None)) => {
                    tracing::debug!("Broadcast ended, reconnecting...");
                }
                Ok(Err(e)) => {
                    tracing::debug!("Subscribe error: {}, reconnecting...", e);
                }
                Err(_) => {
                    tracing::debug!("No broadcast yet, reconnecting...");
                }
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    }

    /// Opens the accepting side of a stream: publishes on `<path>/s2c` and
    /// subscribes to `<path>/c2s`.
    pub async fn accept_at(relay: &str, path: &str) -> Result<Self> {
        Self::accept_internal(relay, path, false).await
    }

    /// Same as [`Self::accept_at`], with TLS verification disabled.
    pub async fn accept_at_insecure(relay: &str, path: &str) -> Result<Self> {
        Self::accept_internal(relay, path, true).await
    }

    /// Accepting side: sends on the `s2c` leg, receives on the `c2s` leg.
    async fn accept_internal(relay: &str, path: &str, insecure: bool) -> Result<Self> {
        Self::open(relay, path, insecure, "s2c", "c2s").await
    }

    /// Writes `data` as one frame on the outgoing track.
    pub fn write(&mut self, data: impl Into<Bytes>) {
        self.writer.write(data);
    }

    /// Reads the next frame from the incoming track; `Ok(None)` when the
    /// reader reports the track has no further groups.
    pub async fn read(&mut self) -> Result<Option<Bytes>> {
        self.reader.read().await
    }
}