use anyhow::{Context, Result};
use tokio::time::{timeout, Duration};
use moq_native_ietf::quic;
use moq_transport::{coding::TrackNamespace, serve::Tracks, session::Session};
use crate::Args;
/// Overall deadline applied to each test body via `timeout(TEST_TIMEOUT, ...)`.
const TEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Namespace path the tests announce and subscribe under.
const TEST_NAMESPACE: &str = "moq-test/interop";
/// Name of the track the tests create inside their `Tracks` collections.
const TEST_TRACK: &str = "test-track";
/// Opens a new client connection to the relay described by `args`.
///
/// Loads the TLS configuration from `args.tls`, builds a QUIC endpoint bound
/// to `args.bind`, and connects its client half to `args.relay`.
///
/// # Returns
/// The connected `web_transport::Session` together with the string the
/// client hands back alongside it (used by callers as the connection ID).
///
/// # Errors
/// Propagates any failure from loading TLS, creating the endpoint, or
/// connecting.
async fn connect(args: &Args) -> Result<(web_transport::Session, String)> {
    let tls_config = args.tls.load()?;
    let endpoint_config = quic::Config::new(args.bind, None, tls_config);
    let endpoint = quic::Endpoint::new(endpoint_config)?;

    let (transport, connection_id) = endpoint.client.connect(&args.relay, None).await?;
    Ok((transport, connection_id))
}
/// Connection IDs gathered while a test runs.
///
/// Every test in this file returns one of these, filled with the ID string
/// obtained from `connect` for each connection the test opened.
#[derive(Debug, Default)]
pub struct TestConnectionIds {
/// Collected connection IDs, in the order they were added.
pub cids: Vec<String>,
}
impl TestConnectionIds {
/// Appends `cid` to the end of the collected connection IDs.
pub fn add(&mut self, cid: String) {
self.cids.push(cid);
}
}
/// Connects to the relay and performs nothing but the session SETUP exchange.
///
/// The session is dropped as soon as `Session::connect` succeeds.
///
/// # Returns
/// The connection ID of the single connection that was opened.
///
/// # Errors
/// Fails if connecting fails, if the SETUP exchange fails, or if the whole
/// scenario does not finish within `TEST_TIMEOUT`.
pub async fn test_setup_only(args: &Args) -> Result<TestConnectionIds> {
    let scenario = async {
        let (transport, connection_id) = connect(args)
            .await
            .context("failed to connect to relay")?;
        let mut ids = TestConnectionIds::default();
        ids.add(connection_id);

        let (moq_session, _publisher, _subscriber) = Session::connect(transport, None)
            .await
            .context("SETUP exchange failed")?;
        log::info!("SETUP exchange completed successfully");

        // Nothing further to exercise: release the session right away.
        drop(moq_session);
        Ok(ids)
    };

    // The outer `Result` is the deadline, the inner one is the scenario's own outcome.
    timeout(TEST_TIMEOUT, scenario)
        .await
        .context("test timed out")?
}
/// Connects to the relay, completes SETUP, and announces `TEST_NAMESPACE`.
///
/// The announce is raced against the session driver and a two second timer.
/// The test passes when the announce returns `Ok` or when the timer fires
/// first (no error was observed in that window).
///
/// # Returns
/// The connection ID of the single connection that was opened.
///
/// # Errors
/// Fails if connecting or SETUP fails, if the announce returns an error, if
/// the session driver finishes (with or without error) before the announce,
/// or if the scenario exceeds `TEST_TIMEOUT`.
pub async fn test_announce_only(args: &Args) -> Result<TestConnectionIds> {
    let scenario = async {
        let (transport, connection_id) = connect(args)
            .await
            .context("failed to connect to relay")?;
        let mut ids = TestConnectionIds::default();
        ids.add(connection_id);

        let (moq_session, mut publisher, _subscriber) = Session::connect(transport, None)
            .await
            .context("SETUP exchange failed")?;

        // Only the reader half is needed for announcing; the other halves are discarded.
        let (_, _, tracks_reader) =
            Tracks::new(TrackNamespace::from_utf8_path(TEST_NAMESPACE)).produce();

        log::info!("Announcing namespace: {}", TEST_NAMESPACE);
        let outcome = tokio::select! {
            announced = publisher.announce(tracks_reader) => announced.context("announce failed"),
            ended = moq_session.run() => {
                // A session error is reported as such; a clean end is still a failure here.
                ended.context("session error")?;
                Err(anyhow::anyhow!("session ended before announce completed"))
            }
            _ = tokio::time::sleep(Duration::from_secs(2)) => {
                log::info!("Announce succeeded (no error received, waiting for subscriptions timed out)");
                Ok(())
            }
        };
        outcome?;

        Ok(ids)
    };

    timeout(TEST_TIMEOUT, scenario)
        .await
        .context("test timed out")?
}
pub async fn test_subscribe_error(args: &Args) -> Result<TestConnectionIds> {
timeout(TEST_TIMEOUT, async {
let (session, cid) = connect(args).await.context("failed to connect to relay")?;
let mut cids = TestConnectionIds::default();
cids.add(cid);
let (session, _publisher, mut subscriber) = Session::connect(session, None)
.await
.context("SETUP exchange failed")?;
let namespace = TrackNamespace::from_utf8_path("nonexistent/namespace");
let (mut writer, _, _reader) = Tracks::new(namespace.clone()).produce();
let track = writer
.create(TEST_TRACK)
.ok_or_else(|| anyhow::anyhow!("failed to create track (already exists?)"))?;
log::info!(
"Subscribing to non-existent track: {}/{}",
"nonexistent/namespace",
TEST_TRACK
);
let subscribe_result = tokio::select! {
res = subscriber.subscribe(track) => res,
res = session.run() => {
res.context("session error")?;
anyhow::bail!("session ended before subscribe completed");
}
};
match subscribe_result {
Ok(()) => {
anyhow::bail!("subscribe succeeded but should have failed (track doesn't exist)");
}
Err(e) => {
let err_str = e.to_string().to_lowercase();
let is_expected_error = err_str.contains("not found")
|| err_str.contains("notfound")
|| err_str.contains("no such")
|| err_str.contains("doesn't exist")
|| err_str.contains("does not exist")
|| err_str.contains("unknown");
if is_expected_error {
log::info!("Got expected 'not found' error: {}", e);
} else {
log::warn!(
"Got error but not clearly 'not found': {}. \
This may indicate a different error type than expected.",
e
);
}
Ok(cids)
}
}
})
.await
.context("test timed out")?
}
/// Opens a publisher connection and a subscriber connection, then announces
/// `TEST_NAMESPACE` on one while subscribing to `TEST_TRACK` on the other.
///
/// Announce, subscribe, both session drivers and a three second timer are
/// raced; the first one to finish ends the test. A subscribe result is only
/// logged (success and error both pass), and the timer firing also passes.
///
/// # Returns
/// The publisher's connection ID followed by the subscriber's.
///
/// # Errors
/// Fails if either connection or SETUP fails, if the subscriber track cannot
/// be created, if the announce returns an error, if either session driver
/// returns an error, or if the scenario exceeds `TEST_TIMEOUT`.
pub async fn test_announce_subscribe(args: &Args) -> Result<TestConnectionIds> {
    let scenario = async {
        let mut ids = TestConnectionIds::default();

        // Publisher side: connect and complete SETUP.
        let (publisher_transport, publisher_cid) = connect(args)
            .await
            .context("publisher failed to connect")?;
        ids.add(publisher_cid);
        let (publisher_session, mut publisher, _) = Session::connect(publisher_transport, None)
            .await
            .context("publisher SETUP failed")?;

        // Subscriber side: a second, independent connection.
        let (subscriber_transport, subscriber_cid) = connect(args)
            .await
            .context("subscriber failed to connect")?;
        ids.add(subscriber_cid);
        let (subscriber_session, _, mut subscriber) =
            Session::connect(subscriber_transport, None)
                .await
                .context("subscriber SETUP failed")?;

        let namespace = TrackNamespace::from_utf8_path(TEST_NAMESPACE);

        let (mut published_tracks, _, announce_reader) =
            Tracks::new(namespace.clone()).produce();
        // Held until the end of the scenario so the created track stays alive.
        let _published_track = published_tracks.create(TEST_TRACK);
        log::info!("Publisher announcing namespace: {}", TEST_NAMESPACE);

        let (mut wanted_tracks, _, _wanted_reader) = Tracks::new(namespace).produce();
        let wanted_track = match wanted_tracks.create(TEST_TRACK) {
            Some(track) => track,
            None => anyhow::bail!("failed to create subscriber track"),
        };
        log::info!(
            "Subscriber subscribing to track: {}/{}",
            TEST_NAMESPACE,
            TEST_TRACK
        );

        tokio::select! {
            announced = publisher.announce(announce_reader) => {
                announced.context("publisher announce failed")?;
                log::info!("Publisher announce completed");
            }
            subscribed = subscriber.subscribe(wanted_track) => {
                if let Err(e) = subscribed {
                    log::info!("Subscriber got error: {} - subscription was processed", e);
                } else {
                    log::info!("Subscriber got SUBSCRIBE_OK - relay routed subscription correctly");
                }
            }
            ended = publisher_session.run() => {
                ended.context("publisher session error")?;
            }
            ended = subscriber_session.run() => {
                ended.context("subscriber session error")?;
            }
            _ = tokio::time::sleep(Duration::from_secs(3)) => {
                log::info!("Test timeout reached - subscription routing may still be in progress");
            }
        };

        Ok(ids)
    };

    timeout(TEST_TIMEOUT, scenario)
        .await
        .context("test timed out")?
}
pub async fn test_publish_namespace_done(args: &Args) -> Result<TestConnectionIds> {
timeout(TEST_TIMEOUT, async {
let (session, cid) = connect(args).await.context("failed to connect to relay")?;
let mut cids = TestConnectionIds::default();
cids.add(cid);
let (session, mut publisher, _subscriber) = Session::connect(session, None)
.await
.context("SETUP exchange failed")?;
let namespace = TrackNamespace::from_utf8_path(TEST_NAMESPACE);
let (_, _, reader) = Tracks::new(namespace.clone()).produce();
log::info!("Announcing namespace: {}", TEST_NAMESPACE);
let result = tokio::select! {
res = publisher.announce(reader) => res,
res = session.run() => {
res.context("session error")?;
anyhow::bail!("session ended before announce completed");
}
_ = tokio::time::sleep(Duration::from_secs(2)) => {
log::info!("Announce active, now sending PUBLISH_NAMESPACE_DONE");
Ok(())
}
};
result.context("announce failed")?;
tokio::time::sleep(Duration::from_millis(100)).await;
log::info!("PUBLISH_NAMESPACE_DONE sent successfully");
Ok(cids)
})
.await
.context("test timed out")?
}
/// Subscribes to `TEST_TRACK` before any publisher has announced
/// `TEST_NAMESPACE`, then connects a publisher and announces it.
///
/// The subscriber runs in a spawned task (subscribe raced against its session
/// driver). After a 500 ms head start the publisher connects and announces;
/// the announce is raced against the publisher's session driver and a three
/// second timer. Finally the subscriber task is given one more second to
/// finish. Its outcome is only logged and never fails the test.
///
/// The spawned task is wrapped in an abort-on-drop guard. A bare `JoinHandle`
/// detaches its task when dropped, so without the guard the subscriber (and
/// the session it owns) would keep running after this function returns — on
/// any `?` early return, when the final one second wait elapses, or when the
/// outer timeout fires.
///
/// # Returns
/// The subscriber's connection ID followed by the publisher's.
///
/// # Errors
/// Fails if either connection or SETUP fails, if the subscriber track cannot
/// be created, if the announce returns an error, if the publisher session
/// driver returns an error, or if the scenario exceeds `TEST_TIMEOUT`.
pub async fn test_subscribe_before_announce(args: &Args) -> Result<TestConnectionIds> {
    /// Owns a spawned task and aborts it when the guard goes out of scope.
    struct AbortOnDrop<T>(tokio::task::JoinHandle<T>);

    impl<T> Drop for AbortOnDrop<T> {
        fn drop(&mut self) {
            // No effect if the task has already finished.
            self.0.abort();
        }
    }

    timeout(TEST_TIMEOUT, async {
        let mut cids = TestConnectionIds::default();

        let (sub_session, sub_cid) = connect(args)
            .await
            .context("subscriber failed to connect")?;
        cids.add(sub_cid);
        let (sub_session, _, mut subscriber) = Session::connect(sub_session, None)
            .await
            .context("subscriber SETUP failed")?;

        let namespace = TrackNamespace::from_utf8_path(TEST_NAMESPACE);
        let (mut sub_writer, _, _sub_reader) = Tracks::new(namespace.clone()).produce();
        let sub_track = sub_writer
            .create(TEST_TRACK)
            .ok_or_else(|| anyhow::anyhow!("failed to create subscriber track"))?;

        log::info!(
            "Subscriber subscribing BEFORE announce: {}/{}",
            TEST_NAMESPACE,
            TEST_TRACK
        );
        // Runs in the background so the publisher can be set up while the
        // subscribe is pending; the guard cancels it on every exit path.
        let mut sub_task = AbortOnDrop(tokio::spawn(async move {
            let result = tokio::select! {
                res = subscriber.subscribe(sub_track) => res,
                res = sub_session.run() => {
                    res.map_err(|e| moq_transport::serve::ServeError::Internal(e.to_string()))?;
                    Err(moq_transport::serve::ServeError::Done)
                }
            };
            result
        }));

        // Head start so the subscribe is issued before the publisher connects.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let (pub_session, pub_cid) = connect(args)
            .await
            .context("publisher failed to connect")?;
        cids.add(pub_cid);
        let (pub_session, mut publisher, _) = Session::connect(pub_session, None)
            .await
            .context("publisher SETUP failed")?;

        let (mut pub_writer, _, pub_reader) = Tracks::new(namespace.clone()).produce();
        // Held until the end of the scenario so the created track stays alive.
        let _track_writer = pub_writer.create(TEST_TRACK);

        log::info!(
            "Publisher announcing namespace (after subscriber): {}",
            TEST_NAMESPACE
        );
        tokio::select! {
            res = publisher.announce(pub_reader) => {
                res.context("publisher announce failed")?;
            }
            res = pub_session.run() => {
                res.context("publisher session error")?;
            }
            _ = tokio::time::sleep(Duration::from_secs(3)) => {
                log::info!("Publisher announce timeout (expected)");
            }
        };

        // Poll the handle by reference so the guard keeps ownership and can
        // still abort the task if the one second wait wins.
        tokio::select! {
            res = &mut sub_task.0 => {
                match res {
                    Ok(Ok(())) => log::info!("Subscriber completed successfully"),
                    Ok(Err(e)) => log::info!("Subscriber got error: {} (may be expected)", e),
                    Err(e) => log::warn!("Subscriber task panicked: {}", e),
                }
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                log::info!("Subscriber still waiting (test complete)");
            }
        };

        Ok(cids)
    })
    .await
    .context("test timed out")?
}