use moq_native::moq_net::{Origin, Track};
use std::time::Duration;
/// Upper bound passed to `tokio::time::timeout` around the awaited steps in
/// these tests, so a stalled step fails the test instead of blocking it.
const TIMEOUT: Duration = Duration::from_secs(10);
/// Runs a publish/subscribe round trip between a server and a client that both
/// use `backend`, with the client dialing `{scheme}://localhost:<port>`.
///
/// The server publishes a `test` broadcast containing a `video` track with a
/// single finished group holding the frame `hello`. The client waits for the
/// announcement, subscribes to the track and checks that the frame arrives
/// unchanged.
///
/// # Panics
///
/// Panics if any step fails, or if a timeout-wrapped await (including the
/// final join of the server task) takes longer than `TIMEOUT`.
#[cfg(any(feature = "quinn", feature = "quiche", feature = "noq"))]
async fn backend_test(scheme: &str, backend: moq_native::QuicBackend) {
    // Publisher side: one broadcast, one track, one finished group, one frame.
    let pub_origin = Origin::random().produce();
    let mut broadcast = pub_origin
        .create_broadcast("test")
        .expect("failed to create broadcast");
    let mut track = broadcast
        .create_track(Track::new("video"))
        .expect("failed to create track");
    let mut group = track.append_group().expect("failed to append group");
    group
        .write_frame(b"hello".as_ref())
        .expect("failed to write frame");
    group.finish().expect("failed to finish group");

    // Server bound to port 0; the actual port is read back from `local_addr`.
    let mut server_config = moq_native::ServerConfig::default();
    server_config.bind = Some("[::]:0".to_string());
    server_config.tls.generate = vec!["localhost".into()];
    server_config.backend = Some(backend.clone());
    let mut server = server_config.init().expect("failed to init server");
    let addr = server.local_addr().expect("failed to get local addr");

    // Subscriber side: take the announcement consumer before the origin is
    // handed to the client below.
    let sub_origin = Origin::random().produce();
    let mut announcements = sub_origin.consume();

    let mut client_config = moq_native::ClientConfig::default();
    client_config.tls.disable_verify = Some(true);
    client_config.backend = Some(backend);
    let client = client_config.init().expect("failed to init client");

    let url: url::Url = format!("{scheme}://localhost:{}", addr.port())
        .parse()
        .expect("failed to parse url");

    let server_handle = tokio::spawn(async move {
        let request = server.accept().await.expect("no incoming connection");
        let session = request.with_publish(pub_origin.consume()).ok().await?;
        // The producers were moved into this task; binding them here keeps
        // them from being dropped until the session has closed.
        let _broadcast = broadcast;
        let _track = track;
        let _ = session.closed().await;
        Ok::<_, anyhow::Error>(())
    });

    let client = client.with_consume(sub_origin);
    let session = tokio::time::timeout(TIMEOUT, client.connect(url))
        .await
        .expect("client connect timed out")
        .expect("client connect failed");

    let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced())
        .await
        .expect("announce timed out")
        .expect("origin closed");
    assert_eq!(path.as_str(), "test");
    let bc = bc.expect("expected announce, got unannounce");

    let mut track_sub = bc
        .subscribe_track(&Track::new("video"))
        .expect("subscribe_track failed");
    let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group())
        .await
        .expect("recv_group timed out")
        .expect("recv_group failed")
        .expect("track closed prematurely");
    let frame = tokio::time::timeout(TIMEOUT, group_sub.read_frame())
        .await
        .expect("read_frame timed out")
        .expect("read_frame failed")
        .expect("group closed prematurely");
    assert_eq!(&*frame, b"hello");

    // Dropping the client session is what lets the server task's
    // `session.closed()` resolve. The join is bounded like every other await
    // so a server that never observes the close fails the test instead of
    // hanging it.
    drop(session);
    tokio::time::timeout(TIMEOUT, server_handle)
        .await
        .expect("server task timed out")
        .expect("server task panicked")
        .expect("server task failed");
}
/// Generates a CA plus one server and one client certificate signed by it and
/// writes all PEM artifacts into a freshly created temporary directory.
///
/// The server certificate is issued for `localhost`; the CA and client
/// certificates are built from an empty name list. Returns the directory
/// handle together with the paths of the five written files.
///
/// # Panics
///
/// Panics if key/certificate generation or any file operation fails.
#[cfg(any(feature = "quinn", feature = "noq"))]
fn generate_mtls_certs() -> (tempfile::TempDir, MtlsPaths) {
    use rcgen::{BasicConstraints, CertificateParams, IsCa, Issuer, KeyPair};
    use std::io::Write;

    let dir = tempfile::tempdir().expect("failed to create tempdir");

    // Self-signed CA; `issuer` is what signs the two leaf certificates below.
    let ca_key = KeyPair::generate().expect("ca key");
    let mut ca_params = CertificateParams::new(Vec::new()).expect("ca params");
    ca_params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
    let ca_cert = ca_params.self_signed(&ca_key).expect("ca cert");
    let issuer = Issuer::from_params(&ca_params, &ca_key);

    // Server leaf certificate for `localhost`.
    let server_key = KeyPair::generate().expect("server key");
    let server_cert = CertificateParams::new(vec!["localhost".to_string()])
        .expect("server params")
        .signed_by(&server_key, &issuer)
        .expect("server cert");

    // Client leaf certificate.
    let client_key = KeyPair::generate().expect("client key");
    let client_cert = CertificateParams::new(Vec::new())
        .expect("client params")
        .signed_by(&client_key, &issuer)
        .expect("client cert");

    // Writes `pem` to `<dir>/<file_name>` and hands back the full path.
    let write_pem = |file_name: &str, pem: String| -> std::path::PathBuf {
        let target = dir.path().join(file_name);
        std::fs::File::create(&target)
            .expect("create pem file")
            .write_all(pem.as_bytes())
            .expect("write pem file");
        target
    };

    let ca = write_pem("ca.pem", ca_cert.pem());
    let server_cert = write_pem("server.pem", server_cert.pem());
    let server_key = write_pem("server.key", server_key.serialize_pem());
    let client_cert = write_pem("client.pem", client_cert.pem());
    let client_key = write_pem("client.key", client_key.serialize_pem());

    (
        dir,
        MtlsPaths {
            ca,
            server_cert,
            server_key,
            client_cert,
            client_key,
        },
    )
}
/// Paths of the PEM files written by `generate_mtls_certs`.
#[cfg(any(feature = "quinn", feature = "noq"))]
struct MtlsPaths {
// CA certificate (`ca.pem`).
ca: std::path::PathBuf,
// Server certificate signed by the CA (`server.pem`).
server_cert: std::path::PathBuf,
// Private key for the server certificate (`server.key`).
server_key: std::path::PathBuf,
// Client certificate signed by the CA (`client.pem`).
client_cert: std::path::PathBuf,
// Private key for the client certificate (`client.key`).
client_key: std::path::PathBuf,
}
/// Checks that a client certificate configured on the client is visible to the
/// server: the accepted request must report `peer_identity()` as `Some`.
///
/// Both sides use `backend` and the certificates from `generate_mtls_certs`;
/// the client dials `{scheme}://localhost:<port>`.
///
/// # Panics
///
/// Panics if any step fails, if a timeout-wrapped await (including the final
/// join of the server task) takes longer than `TIMEOUT`, or if the server saw
/// no peer identity.
#[cfg(any(feature = "quinn", feature = "noq"))]
async fn mtls_test(scheme: &str, backend: moq_native::QuicBackend) {
    // `_dir` stays bound until the end of the function so the temporary
    // directory holding the PEM files is not dropped while they are in use.
    let (_dir, paths) = generate_mtls_certs();
    let pub_origin = Origin::random().produce();

    // Server: its own cert/key, plus the CA as root.
    // Only `ca` is needed twice; every other path is moved, not cloned.
    let mut server_config = moq_native::ServerConfig::default();
    server_config.bind = Some("[::]:0".to_string());
    server_config.tls.cert = vec![paths.server_cert];
    server_config.tls.key = vec![paths.server_key];
    server_config.tls.root = vec![paths.ca.clone()];
    server_config.backend = Some(backend.clone());
    let mut server = server_config.init().expect("failed to init server");
    let addr = server.local_addr().expect("failed to get local addr");

    // Client: the test CA is the only configured root (system roots are
    // switched off), and the client cert/key pair is supplied.
    let mut client_config = moq_native::ClientConfig::default();
    client_config.tls.root = vec![paths.ca];
    client_config.tls.system_roots = Some(false);
    client_config.tls.cert = Some(paths.client_cert);
    client_config.tls.key = Some(paths.client_key);
    client_config.backend = Some(backend);
    let client = client_config.init().expect("failed to init client");

    let url: url::Url = format!("{scheme}://localhost:{}", addr.port())
        .parse()
        .expect("failed to parse url");

    let server_handle = tokio::spawn(async move {
        let request = server.accept().await.expect("no incoming connection");
        // Read the peer identity before `request` is consumed below.
        let has_cert = request.peer_identity().is_some();
        let session = request.with_publish(pub_origin.consume()).ok().await?;
        let _ = session.closed().await;
        Ok::<_, anyhow::Error>(has_cert)
    });

    let session = tokio::time::timeout(TIMEOUT, client.connect(url))
        .await
        .expect("client connect timed out")
        .expect("client connect failed");

    // Dropping the client session is what lets the server task finish. The
    // join is bounded so a server that never observes the close fails the
    // test instead of hanging it.
    drop(session);
    let has_cert = tokio::time::timeout(TIMEOUT, server_handle)
        .await
        .expect("server task timed out")
        .expect("server task panicked")
        .expect("server task failed");
    assert!(has_cert, "server did not observe the client certificate");
}
// Round trip over the Quinn backend using the `moqt` URL scheme.
#[cfg(feature = "quinn")]
#[tracing_test::traced_test]
#[tokio::test]
async fn quinn_raw_quic() {
    let backend = moq_native::QuicBackend::Quinn;
    backend_test("moqt", backend).await;
}
// Client-certificate check over the Quinn backend using the `https` URL scheme.
#[cfg(feature = "quinn")]
#[tracing_test::traced_test]
#[tokio::test]
async fn quinn_mtls() {
    let backend = moq_native::QuicBackend::Quinn;
    mtls_test("https", backend).await;
}
// Round trip over the Quinn backend using the `https` URL scheme.
#[cfg(feature = "quinn")]
#[tracing_test::traced_test]
#[tokio::test]
async fn quinn_webtransport() {
    let backend = moq_native::QuicBackend::Quinn;
    backend_test("https", backend).await;
}
// Round trip over the Quiche backend using the `moqt` URL scheme.
// Ignored by default; the reason is recorded in the `#[ignore]` attribute.
#[cfg(feature = "quiche")]
#[tracing_test::traced_test]
#[tokio::test]
#[ignore = "quiche raw QUIC (moqt://) fails; likely a web-transport-quiche bug"]
async fn quiche_raw_quic() {
    let backend = moq_native::QuicBackend::Quiche;
    backend_test("moqt", backend).await;
}
// Round trip over the Quiche backend using the `https` URL scheme.
#[cfg(feature = "quiche")]
#[tracing_test::traced_test]
#[tokio::test]
async fn quiche_webtransport() {
    let backend = moq_native::QuicBackend::Quiche;
    backend_test("https", backend).await;
}
// Publish/subscribe round trip where the client reaches the server through
// iroh: both sides bind their own iroh endpoint, the client is given the
// server endpoint's IP addresses and dials `iroh://<server endpoint id>`.
// The published content and the checks mirror the QUIC backend tests: a
// `test` broadcast with a `video` track carrying the single frame `hello`.
#[cfg(feature = "iroh")]
#[tracing_test::traced_test]
#[tokio::test]
async fn iroh_connect() {
    use moq_native::iroh::EndpointConfig;

    // Publisher side: one broadcast, one track, one finished group, one frame.
    let pub_origin = Origin::random().produce();
    let mut broadcast = pub_origin
        .create_broadcast("test")
        .expect("failed to create broadcast");
    let mut track = broadcast
        .create_track(Track::new("video"))
        .expect("failed to create track");
    let mut group = track.append_group().expect("failed to append group");
    group
        .write_frame(b"hello".as_ref())
        .expect("failed to write frame");
    group.finish().expect("failed to finish group");

    // Server iroh endpoint. Its id and IP addresses are captured before the
    // endpoint is handed to the server below.
    let mut server_iroh_config = EndpointConfig::default();
    server_iroh_config.enabled = Some(true);
    let server_endpoint = server_iroh_config
        .bind()
        .await
        .expect("failed to bind server iroh endpoint")
        .expect("server iroh endpoint not enabled");
    let server_addr = server_endpoint.addr();
    let server_addrs: Vec<std::net::SocketAddr> = server_addr.ip_addrs().copied().collect();
    let server_endpoint_id = server_endpoint.id();

    let mut server_config = moq_native::ServerConfig::default();
    server_config.bind = Some("[::]:0".to_string());
    server_config.tls.generate = vec!["localhost".into()];
    let mut server = server_config
        .init()
        .expect("failed to init server")
        .with_iroh(Some(server_endpoint));

    // Subscriber side: take the announcement consumer before the origin is
    // handed to the client below.
    let sub_origin = Origin::random().produce();
    let mut announcements = sub_origin.consume();

    let mut client_iroh_config = EndpointConfig::default();
    client_iroh_config.enabled = Some(true);
    let client_endpoint = client_iroh_config
        .bind()
        .await
        .expect("failed to bind client iroh endpoint")
        .expect("client iroh endpoint not enabled");

    let mut client_config = moq_native::ClientConfig::default();
    client_config.tls.disable_verify = Some(true);
    let client = client_config
        .init()
        .expect("failed to init client")
        .with_iroh(Some(client_endpoint))
        .with_iroh_addrs(server_addrs);

    let url: url::Url = format!("iroh://{server_endpoint_id}")
        .parse()
        .expect("failed to parse url");

    let server_handle = tokio::spawn(async move {
        let request = server.accept().await.expect("no incoming connection");
        let session = request.with_publish(pub_origin.consume()).ok().await?;
        // The producers were moved into this task; binding them here keeps
        // them from being dropped until the session has closed.
        let _broadcast = broadcast;
        let _track = track;
        let _ = session.closed().await;
        Ok::<_, anyhow::Error>(())
    });

    let client = client.with_consume(sub_origin);
    let session = tokio::time::timeout(TIMEOUT, client.connect(url))
        .await
        .expect("client connect timed out")
        .expect("client connect failed");

    let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced())
        .await
        .expect("announce timed out")
        .expect("origin closed");
    assert_eq!(path.as_str(), "test");
    let bc = bc.expect("expected announce, got unannounce");

    let mut track_sub = bc
        .subscribe_track(&Track::new("video"))
        .expect("subscribe_track failed");
    let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group())
        .await
        .expect("recv_group timed out")
        .expect("recv_group failed")
        .expect("track closed prematurely");
    let frame = tokio::time::timeout(TIMEOUT, group_sub.read_frame())
        .await
        .expect("read_frame timed out")
        .expect("read_frame failed")
        .expect("group closed prematurely");
    assert_eq!(&*frame, b"hello");

    // Dropping the client session is what lets the server task's
    // `session.closed()` resolve. The join is bounded like every other await
    // so a server that never observes the close fails the test instead of
    // hanging it.
    drop(session);
    tokio::time::timeout(TIMEOUT, server_handle)
        .await
        .expect("server task timed out")
        .expect("server task panicked")
        .expect("server task failed");
}
// Round trip over the Noq backend using the `moqt` URL scheme.
#[cfg(feature = "noq")]
#[tracing_test::traced_test]
#[tokio::test]
async fn noq_raw_quic() {
    let backend = moq_native::QuicBackend::Noq;
    backend_test("moqt", backend).await;
}
// Round trip over the Noq backend using the `https` URL scheme.
#[cfg(feature = "noq")]
#[tracing_test::traced_test]
#[tokio::test]
async fn noq_webtransport() {
    let backend = moq_native::QuicBackend::Noq;
    backend_test("https", backend).await;
}
// Client-certificate check over the Noq backend using the `https` URL scheme.
#[cfg(feature = "noq")]
#[tracing_test::traced_test]
#[tokio::test]
async fn noq_mtls() {
    let backend = moq_native::QuicBackend::Noq;
    mtls_test("https", backend).await;
}