arcly-stream 0.1.6

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! End-to-end smoke test: build an engine, publish, subscribe, end.

use arcly_stream::prelude::*;
use arcly_stream::testing::{video_frame as video, CountingObserver};
use std::sync::Arc;

/// Happy path: a subscriber attached before the first frame is published
/// receives both frames, the stream count tracks the publish lifecycle, and
/// the observer counts one start, one end and two frames.
#[tokio::test]
async fn publish_subscribe_roundtrip() {
    let observer = CountingObserver::new();
    let app = AppSpec::new("live").broadcast_capacity(16);
    let engine = Engine::builder()
        .application(app)
        .observer(observer.clone())
        .build();

    let stream_key = StreamKey::new("live", "s1");
    let publisher = engine
        .start_publish(&stream_key)
        .await
        .expect("start publish");

    // Attach the subscriber *before* publishing so that it should see every
    // frame. The stream value itself is released as soon as the subscription
    // has been created.
    let mut subscriber = {
        let stream = engine.get_stream(&stream_key).expect("live stream");
        stream.subscribe_resilient()
    };

    // A keyframe first, then a non-keyframe (checked below to carry pts 33).
    for (pts, keyframe) in [(0, true), (33, false)] {
        publisher
            .publish_frame(video(pts, keyframe))
            .expect("publish");
    }

    let first = subscriber.recv().await.expect("frame 0");
    let second = subscriber.recv().await.expect("frame 1");
    assert!(first.is_keyframe());
    assert!(!second.is_keyframe());
    assert_eq!(second.pts, 33);

    // The stream is counted while publishing and gone once publishing ends.
    assert_eq!(engine.total_stream_count(), 1);
    engine
        .end_publish(&stream_key)
        .await
        .expect("end publish");
    assert_eq!(engine.total_stream_count(), 0);

    // The observer must have recorded the full lifecycle and both frames.
    assert_eq!(observer.publish_started(), 1);
    assert_eq!(observer.publish_ended(), 1);
    assert_eq!(observer.frames(), 2);
}

/// Starting a second publish on a key that is already being published must
/// fail with `StreamError::StreamAlreadyPublishing`.
#[tokio::test]
async fn duplicate_publish_is_rejected() {
    let app = AppSpec::new("live");
    let engine = Engine::builder().application(app).build();
    let stream_key = StreamKey::new("live", "dup");

    // Keep the first handle bound for the rest of the test so the first
    // publish is still active when the second attempt is made.
    let _first_publisher = engine
        .start_publish(&stream_key)
        .await
        .expect("first publish");

    let rejection = engine.start_publish(&stream_key).await.unwrap_err();
    assert!(matches!(
        rejection,
        StreamError::StreamAlreadyPublishing { .. }
    ));
}

/// With `max_publishers(1)`, a second concurrent publish (on a different
/// stream key) must fail with `StreamError::PublisherLimitReached`, and the
/// error must report the configured limit.
#[tokio::test]
async fn publisher_limit_is_enforced() {
    let app = AppSpec::new("live");
    let engine = Engine::builder().max_publishers(1).application(app).build();

    let first_key = StreamKey::new("live", "a");
    let second_key = StreamKey::new("live", "b");

    // The first publisher takes the only slot; its handle stays bound so the
    // publish is still active during the second attempt.
    let _first_publisher = engine.start_publish(&first_key).await.expect("first");

    let rejection = engine.start_publish(&second_key).await.unwrap_err();
    assert!(matches!(
        rejection,
        StreamError::PublisherLimitReached { limit: 1 }
    ));
}

/// Publishing to an application that was never registered must fail with
/// `StreamError::AppNotFound`, must not leave a stream registered, and must
/// not prevent a later valid publish from succeeding.
#[tokio::test]
async fn unknown_app_is_an_error() {
    // A limit of one publisher makes a leaked slot observable: if the failed
    // attempt below kept its slot, the valid publish afterwards would
    // presumably be rejected with `PublisherLimitReached` instead of
    // succeeding.
    let engine = Engine::builder()
        .max_publishers(1)
        .application(AppSpec::new("live"))
        .build();

    let err = engine
        .start_publish(&StreamKey::new("ghost", "x"))
        .await
        .unwrap_err();
    assert!(matches!(err, StreamError::AppNotFound(_)));
    // The failed attempt must not leave a stream behind.
    assert_eq!(engine.total_stream_count(), 0);

    // The rolled-back slot means a later valid publish still succeeds.
    let _h = engine
        .start_publish(&StreamKey::new("live", "x"))
        .await
        .expect("valid publish after failed attempt");
    assert_eq!(engine.total_stream_count(), 1);
}

/// A video frame flagged `CONFIG` that is published before anyone subscribes
/// must afterwards be available through the stream's cached configs, while
/// the audio config stays empty because no audio was published.
#[tokio::test]
async fn late_joiner_gets_cached_config() {
    let app = AppSpec::new("live");
    let engine = Engine::builder().application(app).build();
    let stream_key = StreamKey::new("live", "cfg");
    let publisher = engine.start_publish(&stream_key).await.expect("start");

    // Build a keyframe, mark it as CONFIG, and publish it while the stream
    // has no subscribers yet.
    let mut config_frame = video(0, true);
    config_frame.flags |= FrameFlags::CONFIG;
    publisher
        .publish_frame(config_frame)
        .expect("publish config");

    let (video_cfg, audio_cfg) = engine
        .get_stream(&stream_key)
        .unwrap()
        .cached_configs();
    assert!(
        video_cfg.is_some(),
        "video config should be cached for late joiners"
    );
    assert!(audio_cfg.is_none());

    // Compile-time check only: the builder hands back an `Arc<Engine>`.
    let _: Arc<Engine> = engine;
}