use super::origin::*;
use super::producer::*;
use super::server::MoqServer;
use super::session::MoqClient;
use crate::error::MoqError;
use std::time::Duration;
const TIMEOUT: Duration = Duration::from_secs(10);
fn opus_head() -> Vec<u8> {
let mut head = Vec::with_capacity(19);
head.extend_from_slice(b"OpusHead");
head.push(1); head.push(2); head.extend_from_slice(&0u16.to_le_bytes()); head.extend_from_slice(&48000u32.to_le_bytes()); head.extend_from_slice(&0u16.to_le_bytes()); head.push(0); head
}
fn h264_init() -> Vec<u8> {
let mut init = Vec::new();
init.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); init.extend_from_slice(&[
0x67, 0x64, 0x00, 0x1f, 0xac, 0x24, 0x84, 0x01, 0x40, 0x16, 0xec, 0x04, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40,
0x00, 0x00, 0x0c, 0x23, 0xc6, 0x0c, 0x92,
]);
init.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); init.extend_from_slice(&[0x68, 0xee, 0x32, 0xc8, 0xb0]);
init
}
#[test]
fn origin_lifecycle() {
let origin = MoqOriginProducer::new();
let _consumer = origin.consume();
}
#[test]
fn publish_media_lifecycle() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let media = broadcast.publish_media("opus".into(), init).unwrap();
media.write_frame(b"opus frame".to_vec(), 1000).unwrap();
media.finish().unwrap();
broadcast.finish().unwrap();
}
#[tokio::test]
async fn raw_track_activity() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let track = broadcast.publish_track("status".into()).unwrap();
assert_eq!(track.name().unwrap(), "status");
let consumer = track.consume().unwrap();
tokio::time::timeout(TIMEOUT, track.used())
.await
.expect("timed out waiting for raw track to become used")
.unwrap();
drop(consumer);
tokio::time::timeout(TIMEOUT, track.unused())
.await
.expect("timed out waiting for raw track to become unused")
.unwrap();
}
#[tokio::test]
async fn dynamic_track_request() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let dynamic = broadcast.dynamic().unwrap();
let consumer = broadcast.consume().unwrap();
let track_consumer = consumer.subscribe_track("events".into()).unwrap();
let track = tokio::time::timeout(TIMEOUT, dynamic.requested_track())
.await
.expect("timed out waiting for requested track")
.unwrap();
assert_eq!(track.name().unwrap(), "events");
let payload = b"hello dynamic track".to_vec();
track.write_frame(payload.clone()).unwrap();
let frame = tokio::time::timeout(TIMEOUT, track_consumer.read_frame())
.await
.expect("timed out waiting for dynamic track frame")
.unwrap()
.expect("expected a frame");
assert_eq!(frame, payload);
track.finish().unwrap();
}
#[tokio::test]
async fn dynamic_track_request_can_abort() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let dynamic = broadcast.dynamic().unwrap();
let consumer = broadcast.consume().unwrap();
let _track_consumer = consumer.subscribe_track("unknown".into()).unwrap();
let track = tokio::time::timeout(TIMEOUT, dynamic.requested_track())
.await
.expect("timed out waiting for requested track")
.unwrap();
track.abort(404).unwrap();
assert!(matches!(track.name(), Err(MoqError::Closed)));
}
#[tokio::test]
async fn dynamic_track_request_can_publish_media() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let dynamic = broadcast.dynamic().unwrap();
let consumer = broadcast.consume().unwrap();
let catalog_consumer = consumer.subscribe_catalog().unwrap();
let media_consumer = consumer
.subscribe_media("requested-audio".into(), crate::media::Container::Legacy, 10_000)
.unwrap();
let track = tokio::time::timeout(TIMEOUT, dynamic.requested_track())
.await
.expect("timed out waiting for requested track")
.unwrap();
assert_eq!(track.name().unwrap(), "requested-audio");
let media = broadcast
.publish_media_on_track(&track, "opus".into(), opus_head())
.unwrap();
assert_eq!(media.name().unwrap(), "requested-audio");
assert!(matches!(track.name(), Err(MoqError::Closed)));
let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.expect("timed out waiting for catalog")
.unwrap()
.expect("expected a catalog");
let audio = catalog
.audio
.get("requested-audio")
.expect("requested track should be in catalog");
assert_eq!(audio.codec, "opus");
assert_eq!(audio.sample_rate, 48000);
assert_eq!(audio.channel_count, 2);
let payload = b"dynamic opus frame".to_vec();
media.write_frame(payload.clone(), 20_000).unwrap();
let frame = tokio::time::timeout(TIMEOUT, media_consumer.next())
.await
.expect("timed out waiting for media frame")
.unwrap()
.expect("expected a frame");
assert_eq!(frame.payload, payload);
assert_eq!(frame.timestamp_us, 20_000);
media.finish().unwrap();
}
#[tokio::test]
async fn media_track_activity_and_name() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let media = broadcast.publish_media("opus".into(), init).unwrap();
let track_name = media.name().unwrap();
assert_eq!(track_name, "0.opus");
let broadcast_consumer = broadcast.consume().unwrap();
let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap();
let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.expect("timed out waiting for catalog")
.unwrap()
.expect("expected a catalog");
assert!(catalog.audio.contains_key(&track_name));
let track_consumer = broadcast_consumer.subscribe_track(track_name).unwrap();
tokio::time::timeout(TIMEOUT, media.used())
.await
.expect("timed out waiting for media track to become used")
.unwrap();
drop(track_consumer);
tokio::time::timeout(TIMEOUT, media.unused())
.await
.expect("timed out waiting for media track to become unused")
.unwrap();
}
#[test]
fn unknown_format() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let err = broadcast
.publish_media("nope".into(), vec![])
.err()
.expect("unknown format should fail");
assert!(
matches!(err, crate::error::MoqError::Codec(_)),
"expected Codec error, got {err}"
);
}
#[tokio::test]
async fn local_publish_consume_audio() {
let origin = MoqOriginProducer::new();
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let media = broadcast.publish_media("opus".into(), init).unwrap();
origin.publish("live".into(), &broadcast).unwrap();
let consumer = origin.consume();
let announced = consumer.announced("".into()).unwrap();
let announcement = tokio::time::timeout(TIMEOUT, announced.next())
.await
.expect("timed out waiting for announcement")
.unwrap()
.expect("expected an announcement");
assert_eq!(announcement.path(), "live");
let broadcast_consumer = announcement.broadcast();
let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap();
let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.expect("timed out waiting for catalog")
.unwrap()
.expect("expected a catalog");
assert_eq!(catalog.audio.len(), 1);
let (track_name, audio) = catalog.audio.iter().next().unwrap();
assert_eq!(audio.codec, "opus");
assert_eq!(audio.sample_rate, 48000);
assert_eq!(audio.channel_count, 2);
assert!(catalog.video.is_empty());
let media_consumer = broadcast_consumer
.subscribe_media(track_name.clone(), audio.container.clone(), 10_000)
.unwrap();
let payload = b"opus audio payload data".to_vec();
media.write_frame(payload.clone(), 1_000_000).unwrap();
let frame = tokio::time::timeout(TIMEOUT, media_consumer.next())
.await
.expect("timed out waiting for frame")
.unwrap()
.expect("expected a frame");
assert_eq!(frame.payload, payload);
assert_eq!(frame.timestamp_us, 1_000_000);
}
#[tokio::test]
async fn video_publish_consume() {
let origin = MoqOriginProducer::new();
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = h264_init();
let media = broadcast.publish_media("avc3".into(), init).unwrap();
origin.publish("video-test".into(), &broadcast).unwrap();
let consumer = origin.consume();
let announced = consumer.announced("".into()).unwrap();
let announcement = tokio::time::timeout(TIMEOUT, announced.next())
.await
.expect("timed out")
.unwrap()
.expect("expected announcement");
let broadcast_consumer = announcement.broadcast();
let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap();
let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.expect("timed out")
.unwrap()
.expect("expected catalog");
assert_eq!(catalog.video.len(), 1);
let (track_name, video) = catalog.video.iter().next().unwrap();
assert!(
video.codec.starts_with("avc1.") || video.codec.starts_with("avc3."),
"codec should be avc1/avc3, got {}",
video.codec
);
let coded = video.coded.as_ref().expect("coded dimensions should be set");
assert_eq!(coded.width, 1280);
assert_eq!(coded.height, 720);
assert!(catalog.audio.is_empty());
let media_consumer = broadcast_consumer
.subscribe_media(track_name.clone(), video.container.clone(), 10_000)
.unwrap();
let keyframe = vec![0x00, 0x00, 0x00, 0x01, 0x65, 0xAA, 0xBB, 0xCC];
media.write_frame(keyframe, 0).unwrap();
let frame = tokio::time::timeout(TIMEOUT, media_consumer.next())
.await
.expect("timed out")
.unwrap()
.expect("expected frame");
assert_eq!(frame.timestamp_us, 0);
assert!(!frame.payload.is_empty(), "frame should have payload data");
}
#[tokio::test]
async fn multiple_frames_ordering() {
let origin = MoqOriginProducer::new();
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let media = broadcast.publish_media("opus".into(), init).unwrap();
origin.publish("ordering-test".into(), &broadcast).unwrap();
let consumer = origin.consume();
let announced = consumer.announced("".into()).unwrap();
let announcement = tokio::time::timeout(TIMEOUT, announced.next())
.await
.unwrap()
.unwrap()
.unwrap();
let broadcast_consumer = announcement.broadcast();
let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap();
let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.unwrap()
.unwrap()
.unwrap();
let (track_name, audio) = catalog.audio.iter().next().unwrap();
let media_consumer = broadcast_consumer
.subscribe_media(track_name.clone(), audio.container.clone(), 10_000)
.unwrap();
let timestamps: [u64; 5] = [0, 20_000, 40_000, 60_000, 80_000];
for (i, &ts) in timestamps.iter().enumerate() {
let payload = format!("frame-{i}");
media.write_frame(payload.into_bytes(), ts).unwrap();
}
for (i, &expected_ts) in timestamps.iter().enumerate() {
let frame = tokio::time::timeout(TIMEOUT, media_consumer.next())
.await
.unwrap_or_else(|_| panic!("timed out waiting for frame {i}"))
.unwrap()
.unwrap_or_else(|| panic!("expected frame {i}"));
assert_eq!(frame.timestamp_us, expected_ts, "frame {i} has wrong timestamp");
let expected = format!("frame-{i}");
assert_eq!(frame.payload, expected.as_bytes(), "frame {i} has wrong payload");
}
}
#[tokio::test]
async fn catalog_update_on_new_track() {
let origin = MoqOriginProducer::new();
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let _media1 = broadcast.publish_media("opus".into(), init.clone()).unwrap();
origin.publish("catalog-update".into(), &broadcast).unwrap();
let consumer = origin.consume();
let announced = consumer.announced("".into()).unwrap();
let announcement = tokio::time::timeout(TIMEOUT, announced.next())
.await
.unwrap()
.unwrap()
.unwrap();
let broadcast_consumer = announcement.broadcast();
let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap();
let catalog1 = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(catalog1.audio.len(), 1);
let _media2 = broadcast.publish_media("opus".into(), init).unwrap();
let catalog2 = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(catalog2.audio.len(), 2);
}
#[test]
fn finish_closes_producer() {
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let _media = broadcast.publish_media("opus".into(), init).unwrap();
broadcast.finish().unwrap();
let err = broadcast.finish().unwrap_err();
assert!(
matches!(err, crate::error::MoqError::Closed),
"expected Closed error, got {err}"
);
}
#[tokio::test]
async fn announced_broadcast() {
let origin = MoqOriginProducer::new();
let broadcast = MoqBroadcastProducer::new().unwrap();
origin.publish("test/broadcast".into(), &broadcast).unwrap();
let consumer = origin.consume();
let announced = consumer.announced("".into()).unwrap();
let announcement = tokio::time::timeout(TIMEOUT, announced.next())
.await
.expect("timed out")
.unwrap()
.expect("expected announcement");
assert_eq!(announcement.path(), "test/broadcast");
let _catalog = announcement.broadcast().subscribe_catalog().unwrap();
}
#[test]
fn without_runtime() {
std::thread::spawn(|| {
let origin = MoqOriginProducer::new();
let consumer = origin.consume();
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let media = broadcast.publish_media("opus".into(), init).unwrap();
media.write_frame(b"hello".to_vec(), 1000).unwrap();
origin.publish("test".into(), &broadcast).unwrap();
let announced = consumer.announced("".into()).unwrap();
let announcement = pollster::block_on(announced.next()).unwrap().unwrap();
assert_eq!(announcement.path(), "test");
let _bc = announcement.broadcast();
let client = MoqClient::new();
client.set_tls_disable_verify(true);
client.set_consume(Some(origin));
announced.cancel();
client.cancel();
media.finish().unwrap();
broadcast.finish().unwrap();
drop(client);
drop(consumer);
drop(announcement);
drop(announced);
})
.join()
.expect("client thread panicked, FFI method missing runtime guard");
}
#[tokio::test]
async fn server_client_roundtrip() {
let server_origin = MoqOriginProducer::new();
let server = MoqServer::new();
server.set_bind("127.0.0.1:0".into()).unwrap();
server.set_tls_generate(vec!["localhost".into()]);
server.set_publish(Some(server_origin.clone()));
let addr = tokio::time::timeout(TIMEOUT, server.listen())
.await
.expect("listen timed out")
.expect("listen failed");
let url = format!("https://{addr}");
let accept_server = server.clone();
let accept = tokio::spawn(async move {
let request = accept_server
.accept()
.await
.expect("accept errored")
.expect("accept returned None");
request.ok().await.expect("handshake failed")
});
let client_origin = MoqOriginProducer::new();
let client = MoqClient::new();
client.set_tls_disable_verify(true);
client.set_bind("127.0.0.1:0".into()).unwrap();
client.set_consume(Some(client_origin.clone()));
let session = tokio::time::timeout(TIMEOUT, client.connect(url))
.await
.expect("connect timed out")
.expect("connect failed");
let server_session = tokio::time::timeout(TIMEOUT, accept)
.await
.expect("server accept timed out")
.expect("server accept task panicked");
let broadcast = MoqBroadcastProducer::new().unwrap();
let init = opus_head();
let media = broadcast.publish_media("opus".into(), init).unwrap();
server_origin.publish("hello".into(), &broadcast).unwrap();
let consumer = client_origin.consume();
let announced = consumer.announced("".into()).unwrap();
let announcement = tokio::time::timeout(TIMEOUT, announced.next())
.await
.expect("timed out waiting for announcement over the wire")
.unwrap()
.expect("expected an announcement");
assert_eq!(announcement.path(), "hello");
let bc = announcement.broadcast();
let catalog_consumer = bc.subscribe_catalog().unwrap();
let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next())
.await
.expect("timed out waiting for catalog")
.unwrap()
.expect("expected a catalog");
let (track_name, audio) = catalog.audio.iter().next().unwrap();
let media_consumer = bc
.subscribe_media(track_name.clone(), audio.container.clone(), 10_000)
.unwrap();
let payload = b"hello over the wire".to_vec();
media.write_frame(payload.clone(), 1_000_000).unwrap();
let frame = tokio::time::timeout(TIMEOUT, media_consumer.next())
.await
.expect("timed out waiting for frame")
.unwrap()
.expect("expected a frame");
assert_eq!(frame.payload, payload);
assert_eq!(frame.timestamp_us, 1_000_000);
media.finish().unwrap();
broadcast.finish().unwrap();
session.shutdown();
server_session.cancel(0);
server.cancel();
}
#[tokio::test]
async fn server_set_bind_validates() {
let server = MoqServer::new();
assert!(server.set_bind("127.0.0.1:0".into()).is_ok());
assert!(server.set_bind("[::]:443".into()).is_ok());
assert!(server.set_bind("localhost:4443".into()).is_ok());
assert!(matches!(
server.set_bind("not-an-address".into()),
Err(crate::error::MoqError::Bind(_))
));
}
#[tokio::test]
async fn server_cert_fingerprints_available_after_listen() {
let server = MoqServer::new();
server.set_bind("127.0.0.1:0".into()).unwrap();
server.set_tls_generate(vec!["localhost".into()]);
assert!(matches!(
server.cert_fingerprints(),
Err(crate::error::MoqError::Bind(_))
));
tokio::time::timeout(TIMEOUT, server.listen())
.await
.expect("listen timed out")
.expect("listen failed");
let fps = server.cert_fingerprints().expect("fingerprints available");
assert_eq!(fps.len(), 1, "one generated cert => one fingerprint");
assert_eq!(fps[0].len(), 64, "fingerprint should be hex SHA-256");
assert!(fps[0].chars().all(|c| c.is_ascii_hexdigit()));
}
#[tokio::test]
async fn request_double_respond_returns_already_responded() {
use crate::error::MoqError;
let server = MoqServer::new();
server.set_bind("127.0.0.1:0".into()).unwrap();
server.set_tls_generate(vec!["localhost".into()]);
let addr = server.listen().await.expect("listen failed");
let url = format!("https://{addr}");
let accept_server = server.clone();
let accept = tokio::spawn(async move {
let request = accept_server
.accept()
.await
.expect("accept errored")
.expect("accept returned None");
let session = request.ok().await.expect("first ok succeeds");
let second_ok = request.ok().await;
assert!(
matches!(second_ok, Err(MoqError::AlreadyResponded)),
"second ok() must fail"
);
let second_close = request.close(403).await;
assert!(
matches!(second_close, Err(MoqError::AlreadyResponded)),
"close after ok must fail"
);
session
});
let client = MoqClient::new();
client.set_tls_disable_verify(true);
client.set_bind("127.0.0.1:0".into()).unwrap();
let _session = tokio::time::timeout(TIMEOUT, client.connect(url))
.await
.expect("connect timed out")
.expect("connect failed");
let server_session = tokio::time::timeout(TIMEOUT, accept)
.await
.expect("accept timed out")
.expect("accept task panicked");
server_session.cancel(0);
server.cancel();
}
#[tokio::test]
async fn request_per_session_publish_override() {
let server = MoqServer::new();
server.set_bind("127.0.0.1:0".into()).unwrap();
server.set_tls_generate(vec!["localhost".into()]);
let addr = server.listen().await.expect("listen failed");
let url = format!("https://{addr}");
let override_origin = MoqOriginProducer::new();
let override_for_task = override_origin.clone();
let accept_server = server.clone();
let accept = tokio::spawn(async move {
let request = accept_server
.accept()
.await
.expect("accept errored")
.expect("accept returned None");
request.set_publish(Some(override_for_task));
request.ok().await.expect("ok succeeds")
});
let client_origin = MoqOriginProducer::new();
let client = MoqClient::new();
client.set_tls_disable_verify(true);
client.set_bind("127.0.0.1:0".into()).unwrap();
client.set_consume(Some(client_origin.clone()));
let session = tokio::time::timeout(TIMEOUT, client.connect(url))
.await
.expect("connect timed out")
.expect("connect failed");
let server_session = tokio::time::timeout(TIMEOUT, accept)
.await
.expect("accept timed out")
.expect("accept task panicked");
let broadcast = MoqBroadcastProducer::new().unwrap();
override_origin.publish("override-only".into(), &broadcast).unwrap();
let consumer = client_origin.consume();
let announced = consumer.announced("".into()).unwrap();
let announcement = tokio::time::timeout(TIMEOUT, announced.next())
.await
.expect("timed out waiting for override announcement")
.unwrap()
.expect("expected an announcement");
assert_eq!(announcement.path(), "override-only");
broadcast.finish().unwrap();
session.cancel(0);
server_session.cancel(0);
server.cancel();
}