use arcly_stream::prelude::*;
use arcly_stream::testing::{video_frame as video, CountingObserver};
use std::sync::Arc;
/// Publishes two video frames on `live/s1` and checks that a subscriber
/// receives them in order, that the engine's stream count goes 1 -> 0 across
/// `end_publish`, and that the observer's counters match the activity.
#[tokio::test]
async fn publish_subscribe_roundtrip() {
    // A clone of the observer goes into the engine; the original is kept so
    // its counters can be read at the end of the test.
    let observer = CountingObserver::new();
    let engine = Engine::builder()
        .application(AppSpec::new("live").broadcast_capacity(16))
        .observer(observer.clone())
        .build();

    let stream_key = StreamKey::new("live", "s1");
    let publisher = engine
        .start_publish(&stream_key)
        .await
        .expect("start publish");

    // The subscription is taken before any frame is published.
    let mut subscriber = engine
        .get_stream(&stream_key)
        .expect("live stream")
        .subscribe_resilient();

    // Two frames: per the assertions below, the first is expected to be a
    // keyframe and the second a non-keyframe carrying pts 33.
    publisher.publish_frame(video(0, true)).expect("publish");
    publisher.publish_frame(video(33, false)).expect("publish");

    let first = subscriber.recv().await.expect("frame 0");
    let second = subscriber.recv().await.expect("frame 1");
    assert!(first.is_keyframe());
    assert!(!second.is_keyframe());
    assert_eq!(second.pts, 33);

    // The stream is counted while publishing and gone once publishing ends.
    assert_eq!(engine.total_stream_count(), 1);
    engine.end_publish(&stream_key).await.expect("end publish");
    assert_eq!(engine.total_stream_count(), 0);

    // One start, one end, and one count per published frame.
    assert_eq!(observer.publish_started(), 1);
    assert_eq!(observer.publish_ended(), 1);
    assert_eq!(observer.frames(), 2);
}
/// Starting a second publish on a key that is already being published must
/// fail with `StreamError::StreamAlreadyPublishing`.
#[tokio::test]
async fn duplicate_publish_is_rejected() {
    let engine = Engine::builder().application(AppSpec::new("live")).build();
    let stream_key = StreamKey::new("live", "dup");

    // Bound to a named `_`-prefixed variable (not a bare `_`) so the handle
    // stays alive until the end of the test.
    // NOTE(review): presumably dropping it would release the key — confirm.
    let _first_handle = engine
        .start_publish(&stream_key)
        .await
        .expect("first publish");

    let rejection = engine.start_publish(&stream_key).await.unwrap_err();
    assert!(matches!(
        rejection,
        StreamError::StreamAlreadyPublishing { .. }
    ));
}
/// With `max_publishers(1)` configured, a second publish on a different
/// stream name must fail with `StreamError::PublisherLimitReached { limit: 1 }`.
#[tokio::test]
async fn publisher_limit_is_enforced() {
    let engine = Engine::builder()
        .max_publishers(1)
        .application(AppSpec::new("live"))
        .build();

    // First publisher; the handle is held in a named binding so it lives
    // until the end of the test.
    let first_key = StreamKey::new("live", "a");
    let _first_handle = engine.start_publish(&first_key).await.expect("first");

    // A second publisher on another stream name must be refused.
    let second_key = StreamKey::new("live", "b");
    let rejection = engine.start_publish(&second_key).await.unwrap_err();
    assert!(matches!(
        rejection,
        StreamError::PublisherLimitReached { limit: 1 }
    ));
}
/// Publishing to an application name that was never registered ("ghost")
/// must fail with `StreamError::AppNotFound` and leave the stream count at 0.
#[tokio::test]
async fn unknown_app_is_an_error() {
    // Only "live" is registered with the engine.
    let engine = Engine::builder().application(AppSpec::new("live")).build();

    let ghost_key = StreamKey::new("ghost", "x");
    let failure = engine.start_publish(&ghost_key).await.unwrap_err();

    assert!(matches!(failure, StreamError::AppNotFound(_)));
    assert_eq!(engine.total_stream_count(), 0);
}
/// After a frame flagged `FrameFlags::CONFIG` is published, the stream's
/// `cached_configs()` must report a video config and nothing in the second
/// slot.
#[tokio::test]
async fn late_joiner_gets_cached_config() {
    let engine = Engine::builder().application(AppSpec::new("live")).build();
    let stream_key = StreamKey::new("live", "cfg");
    let publisher = engine.start_publish(&stream_key).await.expect("start");

    // Build a video frame and mark it as a config frame before publishing.
    let mut config_frame = video(0, true);
    config_frame.flags |= FrameFlags::CONFIG;
    publisher
        .publish_frame(config_frame)
        .expect("publish config");

    // Second tuple element is presumably the audio config — confirm against
    // `cached_configs`; no such frame was published here.
    let (video_cfg, audio_cfg) = engine.get_stream(&stream_key).unwrap().cached_configs();
    assert!(
        video_cfg.is_some(),
        "video config should be cached for late joiners"
    );
    assert!(audio_cfg.is_none());

    // Compile-time check only: this binding type-checks when `build()`
    // produced an `Arc<Engine>`.
    let _: Arc<Engine> = engine;
}