arcly-stream 0.1.5

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! The **multi-protocol ingestion architecture** — the public seam for teaching
//! the engine new inbound wire protocols (RTSP, SRT, WebRTC WHIP/WHEP, …)
//! without touching the kernel.
//!
//! # The three pieces
//!
//! | Type | Role |
//! |------|------|
//! | [`InboundProtocol`] | The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. |
//! | [`IngestContext`] | The ergonomic, cloneable handle a worker uses to reach the engine bus — hands out publish sessions. |
//! | [`PublishSession`] | An RAII token for one live stream: every frame lands in the GOP cache + live QoS; releasing it frees the publish slot. |
//!
//! ```text
//!   your crate                          arcly-stream kernel
//!  ┌────────────────────────┐          ┌───────────────────────────────┐
//!  │ struct MyRtspHandler    │          │ Engine (lock-free bus)        │
//!  │ impl InboundProtocol {  │  serve   │  • broadcast fan-out          │
//!  │   async fn serve(ctx) { │◀─────────│  • GOP cache (instant start)  │
//!  │     ctx.open_publish()──┼────────▶ │  • live QoS counters          │
//!  │       .publish_frame()  │ frames   │  PublishRegistry              │
//!  │ } }                     │          │                               │
//!  └────────────────────────┘          └───────────────────────────────┘
//! ```
//!
//! # Design pattern: a worker owns its transport
//!
//! [`InboundProtocol::serve`] is intentionally a *single, long-lived call* that
//! owns the listener for the protocol's whole lifetime, rather than a set of
//! per-connection lifecycle callbacks. This keeps the contract minimal and lets
//! each protocol model its own connection state machine (RTMP chunk streams,
//! RTSP sessions, SRT handshakes) however it needs — the kernel never assumes a
//! shape. The reusable [`run_tcp_ingest_server`](crate::protocol::run_tcp_ingest_server)
//! accept-loop (feature `ingest`) covers the common TCP case so most workers are
//! a thin per-connection handler over it.
//!
//! # Thread-safety & runtime requirements
//!
//! - A worker is `Send + Sync + 'static`: the engine shares one instance across
//!   tasks and may run it for the entire process lifetime.
//! - [`serve`](InboundProtocol::serve) must return promptly once `shutdown` is
//!   cancelled — the engine's coordinated teardown waits on every worker.
//! - [`PublishSession`] is **not** `Clone`: it models exclusive ownership of one
//!   publish slot. Frames may be published from any task that holds it (or holds
//!   its [`StreamHandle`](PublishSession::handle)).
//!
//! # Graceful teardown
//!
//! When an ingest connection drops, ending its [`PublishSession`] (via
//! [`finish`](PublishSession::finish), or best-effort on `Drop`) frees the
//! publish slot *and* closes the stream's lock-free broadcast bus. Active
//! playback subscribers are therefore **notified seamlessly** — a resilient
//! subscriber's `recv` yields `None` — with no handler reaching across to each
//! one. A host that drives a recorder ([`RecordingSink`](crate::record::RecordingSink))
//! off the same subscription flushes it on that signal, so recordings land in
//! the [`StorageBackend`](crate::traits::StorageBackend) on disconnect. The
//! engine's [idle reaper](crate::Engine::reap_idle) provides the same teardown
//! for connections that wedge without closing cleanly.
//!
//! # Minimal worker
//!
//! ```no_run
//! use arcly_stream::prelude::*;
//! use arcly_stream::inbound::{InboundProtocol, IngestContext};
//! use arcly_stream::bytes::Bytes;
//!
//! struct LoopbackProtocol;
//!
//! #[async_trait]
//! impl InboundProtocol for LoopbackProtocol {
//!     fn name(&self) -> &'static str { "loopback" }
//!
//!     async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
//!         // Claim a stream; the session releases it on drop.
//!         let session = ctx.open_publish(StreamKey::new("live", "demo")).await?;
//!         let kf = MediaFrame::new_video(0, 0, Bytes::from_static(b"idr"), CodecId::H264, true);
//!         session.publish_frame(kf)?;
//!         shutdown.cancelled().await; // run until told to stop
//!         session.finish().await
//!     }
//! }
//! ```

use crate::bus::{PublishRegistry, StreamHandle};
use crate::{MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

/// A pluggable inbound wire-protocol worker — the unit the engine runs to ingest
/// a transport (RTMP, RTSP, SRT, WHIP, …).
///
/// Implement this in your own crate and register it with
/// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) (or pass it to
/// [`Engine::serve`](crate::Engine::serve)); the engine never needs to know the
/// concrete type. A worker:
///
/// 1. **binds** its listener(s) inside [`serve`](Self::serve),
/// 2. **accepts** connections and performs each protocol **handshake**,
/// 3. resolves a [`StreamKey`] and opens a [`PublishSession`] via the
///    [`IngestContext`],
/// 4. **bridges** decoded access units to [`MediaFrame`]s and publishes them,
/// 5. **tears down** cleanly when a connection closes or `shutdown` fires.
///
/// Any type implementing the legacy [`ProtocolHandler`](crate::ProtocolHandler)
/// is automatically an `InboundProtocol` via a blanket bridge, so existing
/// handlers keep working unchanged.
#[async_trait]
pub trait InboundProtocol: Send + Sync + 'static {
    /// Stable, human-readable protocol name (`"rtmp"`, `"rtsp"`, …). Used in logs
    /// and the engine's per-worker lifecycle tracing.
    fn name(&self) -> &'static str;

    /// Run the protocol's listener until `shutdown` is cancelled.
    ///
    /// Return `Ok(())` on a clean shutdown. Returning `Err` signals a fatal fault
    /// (e.g. the listener could not bind) and trips the engine's coordinated
    /// teardown, winding down sibling workers too.
    ///
    /// Implementations **must** observe `shutdown` and return promptly once it is
    /// cancelled; the engine awaits every worker during drain.
    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()>;
}

/// Blanket bridge: every legacy [`ProtocolHandler`](crate::ProtocolHandler) is an
/// [`InboundProtocol`]. New protocols should implement `InboundProtocol` directly
/// for the ergonomic [`IngestContext`]; this keeps pre-existing handlers working.
#[async_trait]
impl<T: crate::traits::ProtocolHandler + 'static> InboundProtocol for T {
    fn name(&self) -> &'static str {
        crate::traits::ProtocolHandler::name(self)
    }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
        crate::traits::ProtocolHandler::run(self, Arc::clone(ctx.registry()), shutdown).await
    }
}

/// The ergonomic, cloneable handle a protocol worker uses to reach the engine
/// bus, handed to every [`InboundProtocol::serve`] call.
///
/// It hides the [`PublishRegistry`] trait object behind a small, stable surface:
/// claim a stream for publishing ([`open_publish`](Self::open_publish)), or reach
/// the underlying registry for advanced flows ([`registry`](Self::registry)).
/// Cloning is cheap (an `Arc` bump) — share it freely across per-connection tasks.
#[derive(Clone)]
pub struct IngestContext {
    registry: Arc<dyn PublishRegistry>,
}

impl IngestContext {
    /// Wrap a publish registry. The engine constructs this for you; tests and
    /// embedders can build one directly from any [`PublishRegistry`] (e.g. an
    /// `Arc<Engine>`).
    pub fn new(registry: Arc<dyn PublishRegistry>) -> Self {
        Self { registry }
    }

    /// Claim `key` for publishing, returning an RAII [`PublishSession`].
    ///
    /// Frames published through the returned session flow into the lock-free
    /// broadcast fan-out, the keyframe-anchored GOP cache (for instant-start
    /// replay), and the live QoS counters — exactly as a native handler's frames
    /// do. Fails with [`StreamAlreadyPublishing`] on a live duplicate or
    /// [`PublisherLimitReached`] at capacity.
    ///
    /// [`StreamAlreadyPublishing`]: crate::StreamError::StreamAlreadyPublishing
    /// [`PublisherLimitReached`]: crate::StreamError::PublisherLimitReached
    pub async fn open_publish(&self, key: StreamKey) -> Result<PublishSession> {
        let handle = self.registry.start_publish(&key).await?;
        Ok(PublishSession {
            handle,
            registry: Arc::clone(&self.registry),
            key,
            released: false,
        })
    }

    /// The underlying publish registry, for flows not covered by
    /// [`open_publish`](Self::open_publish).
    pub fn registry(&self) -> &Arc<dyn PublishRegistry> {
        &self.registry
    }
}

/// An RAII publish session — a protocol worker's exclusive token for one live
/// stream.
///
/// Wraps the engine's [`StreamHandle`]: every [`publish_frame`](Self::publish_frame)
/// lands in the broadcast fan-out, GOP cache, and live QoS counters. Dropping the
/// session releases the publish slot (best-effort, on the current Tokio runtime),
/// so a worker that returns early or panics never leaks a stream. Prefer
/// [`finish`](Self::finish) for deterministic, awaited teardown.
pub struct PublishSession {
    handle: StreamHandle,
    registry: Arc<dyn PublishRegistry>,
    key: StreamKey,
    released: bool,
}

impl PublishSession {
    /// The stream key this session publishes to.
    pub fn key(&self) -> &StreamKey {
        &self.key
    }

    /// The underlying engine [`StreamHandle`] — GOP cache, QoS, subscriber count,
    /// metadata, and direct `subscribe`/replay access.
    pub fn handle(&self) -> &StreamHandle {
        &self.handle
    }

    /// Publish one decoded frame to all subscribers; returns the live subscriber
    /// count (`0` when nobody is watching yet).
    pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize> {
        self.handle.publish_frame(frame)
    }

    /// Release the publish slot deterministically. Prefer this over relying on
    /// [`Drop`] whenever you can `await` — it surfaces the teardown error and
    /// completes before the next stream can reuse the key.
    pub async fn finish(mut self) -> Result<()> {
        self.released = true;
        self.registry.end_publish(&self.key).await
    }
}

impl Drop for PublishSession {
    fn drop(&mut self) {
        if self.released {
            return;
        }
        // Best-effort async release. `end_publish` is async; if a Tokio runtime is
        // available (the normal case inside a worker), spawn the teardown so the
        // slot is freed even on an early return or panic.
        if let Ok(rt) = tokio::runtime::Handle::try_current() {
            let registry = Arc::clone(&self.registry);
            let key = self.key.clone();
            rt.spawn(async move {
                let _ = registry.end_publish(&key).await;
            });
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use crate::{AppSpec, CodecId, Engine, FrameFlags};
    use bytes::Bytes;

    /// A tiny custom protocol that publishes a config + keyframe then idles until
    /// shutdown — exercising the full `InboundProtocol` → `IngestContext` →
    /// `PublishSession` path against a real engine.
    struct DemoProtocol {
        key: StreamKey,
    }

    #[async_trait]
    impl InboundProtocol for DemoProtocol {
        fn name(&self) -> &'static str {
            "demo"
        }

        async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
            let session = ctx.open_publish(self.key.clone()).await?;
            let mut cfg =
                MediaFrame::new_video(0, 0, Bytes::from_static(b"sps"), CodecId::H264, false);
            cfg.flags |= FrameFlags::CONFIG;
            session.publish_frame(cfg)?;
            session.publish_frame(MediaFrame::new_video(
                0,
                0,
                Bytes::from_static(b"idr"),
                CodecId::H264,
                true,
            ))?;
            shutdown.cancelled().await;
            session.finish().await
        }
    }

    #[tokio::test]
    async fn custom_protocol_publishes_through_ingest_context() {
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "cam");
        let ctx = IngestContext::new(engine.clone());

        let proto = DemoProtocol { key: key.clone() };
        let shutdown = CancellationToken::new();

        // Run the worker; cancel once it has had a chance to publish.
        let worker = {
            let shutdown = shutdown.clone();
            tokio::spawn(async move { proto.serve(ctx, shutdown).await })
        };

        // Wait until the stream is live and its GOP cache holds the keyframe.
        let handle = loop {
            if let Ok(h) = engine.get_stream(&key) {
                if h.replay_buffer().iter().any(|f| f.is_keyframe()) {
                    break h;
                }
            }
            tokio::task::yield_now().await;
        };
        let (vcfg, _) = handle.cached_configs();
        assert!(vcfg.is_some(), "config frame cached via PublishSession");

        shutdown.cancel();
        worker.await.unwrap().unwrap();
        // After finish(), the publish slot is released.
        assert!(
            engine.get_stream(&key).is_err(),
            "session released on finish"
        );
    }

    #[tokio::test]
    async fn dropping_session_releases_the_slot() {
        let engine = Engine::builder().application(AppSpec::new("live")).build();
        let key = StreamKey::new("live", "drop-test");
        let ctx = IngestContext::new(engine.clone());

        {
            let _session = ctx.open_publish(key.clone()).await.unwrap();
            assert!(engine.get_stream(&key).is_ok(), "stream live while held");
        } // dropped here → best-effort async end_publish spawned

        // Yield so the spawned teardown runs.
        for _ in 0..16 {
            if engine.get_stream(&key).is_err() {
                break;
            }
            tokio::task::yield_now().await;
        }
        assert!(engine.get_stream(&key).is_err(), "slot released on drop");
    }

    #[tokio::test]
    async fn finishing_a_session_notifies_subscribers_via_the_bus() {
        // Graceful teardown: ending a publish closes the lock-free bus, so an
        // active playback subscriber is seamlessly notified (its stream ends)
        // without the protocol handler reaching across to each subscriber.
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(4))
            .build();
        let key = StreamKey::new("live", "cam");
        let ctx = IngestContext::new(engine.clone());

        let session = ctx.open_publish(key.clone()).await.unwrap();
        let handle = engine.get_stream(&key).unwrap();
        let mut sub = handle.subscribe_resilient();

        // The subscriber sees the live frame.
        session
            .publish_frame(MediaFrame::new_video(
                0,
                0,
                Bytes::from_static(b"idr"),
                CodecId::H264,
                true,
            ))
            .unwrap();
        assert!(sub.recv().await.is_some(), "subscriber receives live frame");

        // Release every handle clone, then tear the session down.
        drop(handle);
        session.finish().await.unwrap();

        // The subscriber is notified of teardown: the closed bus yields `None`.
        assert!(
            sub.recv().await.is_none(),
            "subscriber notified that the stream ended"
        );
        assert!(engine.get_stream(&key).is_err(), "slot released on finish");
    }
}