use arcly_stream::prelude::*;
use arcly_stream::testing::CountingObserver;
use arcly_stream::PublishRegistry;
use bytes::Bytes;
use std::sync::Arc;
/// Test-only protocol handler that publishes a burst of synthetic H.264 video
/// frames on a single stream and then ends the publish.
struct BurstHandler {
    /// Application the burst is published under.
    app: AppName,
    /// Stream identifier used together with `app` to build the stream key.
    stream_id: StreamId,
    /// Upper bound on the number of frames to publish (cancellation may stop
    /// the burst earlier).
    n: u32,
}

#[async_trait]
impl ProtocolHandler for BurstHandler {
    fn name(&self) -> &'static str {
        "burst"
    }

    /// Starts a publish on `app`/`stream_id`, pushes up to `n` one-byte video
    /// frames (every 30th frame flagged as a keyframe) and ends the publish.
    ///
    /// The loop stops early when `shutdown` is cancelled or a frame fails to
    /// publish. In every case `end_publish` is still invoked so the stream is
    /// not left registered; a publish error takes precedence over an
    /// `end_publish` error in the returned result.
    ///
    /// # Errors
    ///
    /// Returns the error from `start_publish`, the first `publish_frame`
    /// failure, or the `end_publish` failure.
    async fn run(
        &self,
        registry: Arc<dyn PublishRegistry>,
        shutdown: CancellationToken,
    ) -> Result<()> {
        let key = StreamKey::new(self.app.clone(), self.stream_id.clone());
        let handle = registry.start_publish(&key).await?;

        let mut published: Result<()> = Ok(());
        for i in 0..self.n {
            if shutdown.is_cancelled() {
                break;
            }
            // The frame index doubles as both timestamps; u32 -> i64 is lossless.
            let ts = i64::from(i);
            let frame = MediaFrame::new_video(
                ts,
                ts,
                Bytes::from_static(b"x"),
                CodecId::H264,
                i % 30 == 0,
            );
            if let Err(err) = handle.publish_frame(frame) {
                // Remember the failure but fall through to `end_publish`
                // instead of returning with the stream still registered.
                published = Err(err.into());
                break;
            }
        }

        let ended = registry.end_publish(&key).await;
        published?;
        ended?;
        Ok(())
    }
}
/// `Engine::serve` runs a custom protocol handler to completion; the observer
/// counts every published frame and exactly one publish start/end pair, and no
/// stream is left registered afterwards.
#[tokio::test]
async fn serve_drives_a_custom_protocol_handler() {
    // Handler that publishes 50 frames on `live/cam`.
    let burst = BurstHandler {
        app: AppName::from("live"),
        stream_id: StreamId::from("cam"),
        n: 50,
    };

    let counter = CountingObserver::new();
    let engine = Engine::builder()
        .application(AppSpec::new("live"))
        .observer(counter.clone())
        .build();

    // The token is never cancelled: `serve` returns because the handler ends.
    let shutdown = CancellationToken::new();
    let outcome = engine.serve(vec![Box::new(burst)], shutdown).await;
    outcome.expect("serve completes");

    assert_eq!(counter.frames(), 50);
    assert_eq!(counter.publish_started(), 1);
    assert_eq!(counter.publish_ended(), 1);
    assert_eq!(engine.total_stream_count(), 0);
}
/// A subscriber on the application's event bus receives `PublishStarted`
/// followed by `PublishEnded` for a stream that is started and then ended.
#[tokio::test]
async fn event_bus_reports_lifecycle() {
    let engine = Engine::builder().application(AppSpec::new("live")).build();

    // Subscribe before publishing so neither lifecycle event is missed.
    let app = AppName::from("live");
    let mut bus = engine.subscribe_events(&app).expect("subscribe events");

    let stream = StreamKey::new("live", "evt");

    engine.start_publish(&stream).await.expect("start");
    assert_eq!(
        bus.recv().await.expect("started event").kind,
        StreamEventKind::PublishStarted
    );

    engine.end_publish(&stream).await.expect("end");
    assert_eq!(
        bus.recv().await.expect("ended event").kind,
        StreamEventKind::PublishEnded
    );
}
/// A configured authenticator rejects a publish with the wrong token (no
/// stream is created) and admits one with the right token.
#[tokio::test]
async fn authenticator_gates_publish() {
    use arcly_stream::auth::{Credentials, StreamAuthenticator};

    /// Authenticator that accepts only the literal token `"good"`.
    struct KeyAuth;

    #[async_trait]
    impl StreamAuthenticator for KeyAuth {
        async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> Result<()> {
            match creds.token.as_deref() {
                Some("good") => Ok(()),
                _ => Err(StreamError::Unauthorized("bad key".into())),
            }
        }
    }

    let engine = Engine::builder()
        .application(AppSpec::new("live"))
        .authenticator(KeyAuth)
        .build();
    let key = StreamKey::new("live", "cam");

    // Wrong token: the publish is refused and nothing is registered.
    let rejected = Credentials::token("nope");
    let err = engine
        .start_publish_authorized(&key, &rejected)
        .await
        .unwrap_err();
    assert!(matches!(err, StreamError::Unauthorized(_)));
    assert_eq!(engine.total_stream_count(), 0);

    // Right token: the publish goes through. The result stays bound until the
    // end of the test, exactly as the stream-count assertion expects.
    let accepted = Credentials::token("good");
    let ok = engine.start_publish_authorized(&key, &accepted).await;
    assert!(ok.is_ok());
    assert_eq!(engine.total_stream_count(), 1);
}
/// `Engine::pump_source` drains a finite `MediaSource`: every frame reaches
/// the observer, the publish is started and ended once, and no stream remains.
#[tokio::test]
async fn pump_source_drives_a_media_source() {
    /// Source that yields `remaining` identical keyframes and then `None`.
    struct FiniteSource {
        remaining: u32,
    }

    #[async_trait]
    impl MediaSource for FiniteSource {
        async fn next_frame(&mut self) -> Result<Option<MediaFrame>> {
            match self.remaining.checked_sub(1) {
                // Counter already at zero: the source is exhausted.
                None => Ok(None),
                Some(left) => {
                    self.remaining = left;
                    let frame = MediaFrame::new_video(
                        0,
                        0,
                        Bytes::from_static(b"x"),
                        CodecId::H264,
                        true,
                    );
                    Ok(Some(frame))
                }
            }
        }
    }

    let counter = CountingObserver::new();
    let engine = Engine::builder()
        .application(AppSpec::new("live"))
        .observer(counter.clone())
        .build();

    let key = StreamKey::new("live", "src");
    let source = FiniteSource { remaining: 7 };
    let pumped = engine
        .pump_source(&key, source, CancellationToken::new())
        .await;
    pumped.expect("pump completes");

    assert_eq!(counter.frames(), 7);
    assert_eq!(counter.publish_started(), 1);
    assert_eq!(counter.publish_ended(), 1);
    assert_eq!(engine.total_stream_count(), 0);
}
/// For each supported video codec, a config keyframe followed by a delta frame
/// is retained in the replay buffer and counted by the QoS statistics.
#[tokio::test]
async fn engine_is_codec_agnostic_for_gop_and_eviction() {
    let codecs = [
        CodecId::H264,
        CodecId::H265,
        CodecId::AV1,
        CodecId::VP9,
        CodecId::VVC,
    ];

    for codec in codecs {
        // A fresh engine per codec keeps the iterations independent.
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(32))
            .build();
        let key = StreamKey::new("live", "c");
        let handle = engine.start_publish(&key).await.expect("start");

        // Keyframe that also carries the CONFIG flag.
        let mut config_key = MediaFrame::new_video(0, 0, Bytes::from_static(b"k"), codec, true);
        config_key.flags |= FrameFlags::CONFIG;
        handle.publish_frame(config_key).expect("keyframe");

        let delta = MediaFrame::new_video(1, 1, Bytes::from_static(b"d"), codec, false);
        handle.publish_frame(delta).expect("delta");

        let replay = handle.replay_buffer();
        assert!(
            replay.iter().any(|f| f.is_keyframe()),
            "{codec:?}: replay must contain the keyframe"
        );
        assert!(
            replay.len() >= 2,
            "{codec:?}: GOP must replay config + frames"
        );
        assert_eq!(handle.qos().total_frames, 2, "{codec:?}");

        engine.end_publish(&key).await.expect("end");
    }
}
/// With a GOP cache enabled, the replay buffer starts with the config frame
/// and contains the keyframe published at pts 1, followed by later frames.
#[tokio::test]
async fn gop_cache_replays_for_instant_start() {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(64))
        .build();
    let key = StreamKey::new("live", "cam");
    let handle = engine.start_publish(&key).await.expect("start");

    // 1) Config frame (keyframe flagged CONFIG) at pts 0.
    let mut config = MediaFrame::new_video(0, 0, Bytes::from_static(b"sps"), CodecId::H264, true);
    config.flags |= FrameFlags::CONFIG;
    handle.publish_frame(config).expect("config");

    // 2) Keyframe at pts 1.
    let idr = MediaFrame::new_video(1, 1, Bytes::from_static(b"idr"), CodecId::H264, true);
    handle.publish_frame(idr).expect("keyframe");

    // 3) Delta frame at pts 2.
    let delta = MediaFrame::new_video(2, 2, Bytes::from_static(b"p"), CodecId::H264, false);
    handle.publish_frame(delta).expect("delta");

    let replay = handle.replay_buffer();
    assert!(replay.len() >= 3);
    assert!(replay[0].flags.contains(FrameFlags::CONFIG));
    assert!(replay.iter().any(|f| f.is_keyframe() && f.pts == 1));

    engine.end_publish(&key).await.expect("end");
}
/// A stream that publishes nothing for longer than the configured idle timeout
/// is removed by `reap_idle`, which reports how many streams it ended.
#[tokio::test]
async fn idle_reaper_ends_silent_streams() {
    use std::time::Duration;

    let engine = Engine::builder()
        .application(AppSpec::new("live"))
        .idle_timeout(Duration::from_millis(10))
        .build();

    let key = StreamKey::new("live", "ghost");
    // Keep the handle bound for the whole test; no frame is ever published.
    let _publisher = engine.start_publish(&key).await.expect("start");
    assert_eq!(engine.total_stream_count(), 1);

    // Stay silent for well over the 10 ms idle timeout.
    tokio::time::sleep(Duration::from_millis(40)).await;

    let reaped = engine.reap_idle().await;
    assert_eq!(reaped, 1);
    assert_eq!(engine.total_stream_count(), 0);
}
/// A resilient subscription configured with `max_lag(3)` is closed (its
/// `recv` yields `None`) after the publisher overruns a capacity-4 broadcast
/// buffer, and it reports more than three dropped frames.
#[tokio::test]
async fn subscription_evicts_after_max_lag() {
    let obs = CountingObserver::new();
    let engine = Engine::builder()
        .application(AppSpec::new("live").broadcast_capacity(4))
        .observer(obs.clone())
        .build();

    let key = StreamKey::new("live", "slow");
    let handle = engine.start_publish(&key).await.expect("start");

    // Subscribe first, then never read while the publisher floods the stream.
    let mut sub = engine
        .get_stream(&key)
        .unwrap()
        .subscribe_resilient()
        .max_lag(3);

    for pts in 0..50 {
        let frame = MediaFrame::new_video(pts, pts, Bytes::from_static(b"x"), CodecId::H264, false);
        handle.publish_frame(frame).expect("publish");
    }

    assert!(sub.recv().await.is_none());
    assert!(sub.dropped() > 3);

    engine.end_publish(&key).await.expect("end");
}
/// Registering a new application succeeds, while registering a name that is
/// already present fails with `AppAlreadyRegistered` and changes nothing.
#[tokio::test]
async fn register_app_rejects_duplicates() {
    // The engine starts out with a single application, "live".
    let engine = Engine::builder().application(AppSpec::new("live")).build();

    let fresh = engine.register_app(AppSpec::new("vod"));
    fresh.expect("fresh app registers");
    assert_eq!(engine.list_apps().len(), 2);

    // A second "live" must be refused and leave the app list untouched.
    let err = engine.register_app(AppSpec::new("live")).unwrap_err();
    assert!(matches!(err, StreamError::AppAlreadyRegistered(name) if name == "live"));
    assert_eq!(engine.list_apps().len(), 2);
}
/// Stream metadata has a non-zero start timestamp as soon as the publish
/// starts, and changes made through `update_metadata` show up in the next
/// snapshot.
#[tokio::test]
async fn metadata_is_live_after_publish() {
    let engine = Engine::builder().application(AppSpec::new("live")).build();
    let key = StreamKey::new("live", "cam");
    let handle = engine.start_publish(&key).await.expect("start");

    assert!(handle.metadata_snapshot().await.started_at_ms > 0);

    // Values written into the metadata and expected back from the snapshot.
    let (width, height) = (1920, 1080);
    let fps = 30.0;

    handle
        .update_metadata(move |meta| {
            meta.width = width;
            meta.height = height;
            meta.fps = fps;
        })
        .await;

    let current = handle.metadata_snapshot().await;
    assert_eq!((current.width, current.height), (width, height));
    assert_eq!(current.fps, fps);

    engine.end_publish(&key).await.expect("end");
}
/// When one handler returns, the token handed to the remaining handlers must
/// be cancelled so they drain and `Engine::serve` itself returns.
///
/// The `serve` call is bounded by a timeout: the regression this test guards
/// against (siblings never being cancelled) would otherwise surface as a test
/// that hangs forever rather than one that fails.
#[tokio::test]
async fn serve_cancels_siblings_when_one_handler_exits() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    /// Handler that returns immediately.
    struct QuickHandler;

    #[async_trait]
    impl ProtocolHandler for QuickHandler {
        fn name(&self) -> &'static str {
            "quick"
        }

        async fn run(&self, _r: Arc<dyn PublishRegistry>, _s: CancellationToken) -> Result<()> {
            Ok(())
        }
    }

    /// Handler that waits for cancellation and then raises the shared flag.
    struct LongHandler(Arc<AtomicBool>);

    #[async_trait]
    impl ProtocolHandler for LongHandler {
        fn name(&self) -> &'static str {
            "long"
        }

        async fn run(&self, _r: Arc<dyn PublishRegistry>, s: CancellationToken) -> Result<()> {
            s.cancelled().await;
            self.0.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    let observed_cancel = Arc::new(AtomicBool::new(false));
    let engine = Engine::builder().application(AppSpec::new("live")).build();

    // The caller's token is never cancelled by the test, so the only way
    // `LongHandler` can finish is through cancellation triggered by `serve`.
    let served = tokio::time::timeout(
        Duration::from_secs(5),
        engine.serve(
            vec![
                Box::new(QuickHandler),
                Box::new(LongHandler(observed_cancel.clone())),
            ],
            CancellationToken::new(),
        ),
    )
    .await;

    served
        .expect("serve must return instead of hanging once a handler exits")
        .expect("serve returns once all handlers drain");

    assert!(
        observed_cancel.load(Ordering::SeqCst),
        "the long-running handler must observe coordinated cancellation"
    );
}
/// A resilient subscription that falls behind a capacity-4 broadcast buffer
/// resumes at a later frame instead of failing, and the lag is reported to
/// the observer.
#[tokio::test]
async fn resilient_subscription_resyncs_and_reports_lag() {
    let counter = CountingObserver::new();
    let engine = Engine::builder()
        .application(AppSpec::new("live").broadcast_capacity(4))
        .observer(counter.clone())
        .build();

    let key = StreamKey::new("live", "slow");
    let handle = engine.start_publish(&key).await.expect("start");
    let mut sub = engine.get_stream(&key).unwrap().subscribe_resilient();

    // Publish ten frames (pts 0..=9) without reading any of them.
    for pts in 0..10 {
        let frame = MediaFrame::new_video(pts, pts, Bytes::from_static(b"x"), CodecId::H264, false);
        handle.publish_frame(frame).expect("publish");
    }

    let resumed = sub.recv().await.expect("resynced frame after lag");
    assert!(resumed.pts >= 6, "expected to resync near the buffer tail");
    assert!(counter.lagged() >= 6, "lag must be reported to the observer");

    // End the publish and release the handle, then drain until the
    // subscription reports the end of the stream.
    engine.end_publish(&key).await.expect("end");
    drop(handle);
    loop {
        if sub.recv().await.is_none() {
            break;
        }
    }
}
/// `KeyframeGate` refuses delta frames until a keyframe has been admitted,
/// while audio is admitted even before that.
#[cfg(feature = "ingest")]
#[test]
fn keyframe_gate_holds_until_idr() {
    use arcly_stream::protocol::KeyframeGate;

    let mut gate = KeyframeGate::new();

    // Before any keyframe: delta is held back, audio passes.
    assert!(!gate.admit(FrameType::Delta));
    assert!(gate.admit(FrameType::Audio));

    // The keyframe is admitted, and deltas are admitted after it.
    assert!(gate.admit(FrameType::Key));
    assert!(gate.admit(FrameType::Delta));
}
/// Exercises `FsStorage` end to end in a per-process temp directory: put,
/// exists, get, list, rejection of a `../` key, and delete.
#[cfg(feature = "storage-fs")]
#[tokio::test]
async fn fs_storage_roundtrip() {
    use arcly_stream::storage::FsStorage;
    use arcly_stream::StorageBackend;

    // The directory name embeds the process id.
    let pid = std::process::id();
    let dir = std::env::temp_dir().join(format!("arcly-stream-test-{pid}"));
    let store = FsStorage::new(&dir);

    let payload = Bytes::from_static(b"hello");
    store.put("seg/0.m4s", payload).await.expect("put");

    assert!(store.exists("seg/0.m4s").await.expect("exists"));
    assert_eq!(&store.get("seg/0.m4s").await.expect("get")[..], b"hello");

    let listed = store.list("seg/").await.expect("list");
    assert_eq!(listed, vec![String::from("seg/0.m4s")]);

    // A key containing a parent-directory component must be refused.
    assert!(store.get("../escape").await.is_err());

    store.delete("seg/0.m4s").await.expect("delete");
    assert!(!store
        .exists("seg/0.m4s")
        .await
        .expect("exists after delete"));

    // Best-effort cleanup; a failure here is deliberately ignored.
    tokio::fs::remove_dir_all(&dir).await.ok();
}
/// A struct deriving `MediaSinkDerive` forwards `send_frame` and `flush` to
/// the field marked `#[sink]`.
#[cfg(feature = "macros")]
#[tokio::test]
async fn media_sink_derive_delegates() {
    use arcly_stream::prelude::MediaSinkDerive;
    use arcly_stream::testing::CollectingSink;

    /// Wrapper carrying an extra field next to the delegated sink.
    #[derive(MediaSinkDerive)]
    struct LabeledSink {
        #[allow(dead_code)]
        label: &'static str,
        #[sink]
        inner: CollectingSink,
    }

    // `recorder` is cloned into the wrapper and inspected afterwards.
    let recorder = CollectingSink::new();
    let mut wrapper = LabeledSink {
        label: "recorder",
        inner: recorder.clone(),
    };

    let frame = MediaFrame::new_video(0, 0, Bytes::from_static(b"x"), CodecId::H264, true);
    wrapper.send_frame(frame).await.expect("delegated send");
    wrapper.flush().await.expect("delegated flush");

    assert_eq!(recorder.len(), 1);
}
/// With a `PrometheusObserver` attached, publishing a frame leaves an
/// `arcly_frames_total` metric family in the observer's registry.
#[cfg(feature = "metrics")]
#[tokio::test]
async fn prometheus_observer_records() {
    use arcly_stream::observability::PrometheusObserver;

    let prom = PrometheusObserver::new().expect("metrics");
    // Clone the registry first: the observer itself is moved into the engine.
    let registry = prom.registry().clone();

    let engine = Engine::builder()
        .application(AppSpec::new("live"))
        .observer(prom)
        .build();

    let key = StreamKey::new("live", "m");
    let handle = engine.start_publish(&key).await.expect("start");

    let frame = MediaFrame::new_video(0, 0, Bytes::from_static(b"x"), CodecId::H264, true);
    handle.publish_frame(frame).expect("publish");

    let gathered = registry.gather();
    let has_frames = gathered
        .iter()
        .any(|family| family.get_name() == "arcly_frames_total");
    assert!(has_frames, "expected arcly_frames_total to be registered");

    engine.end_publish(&key).await.expect("end");
}