arcly-stream 0.1.6

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! The run loop — `arcly-http`'s `App::launch` analogue for protocol handlers.

use super::Engine;
use crate::inbound::{InboundProtocol, IngestContext};
use crate::traits::MediaSource;
use crate::{PublishRegistry, Result, StreamKey};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

impl Engine {
    /// Drive the engine: run every protocol handler concurrently until
    /// `shutdown` fires, then let them observe the cancellation and wind down.
    ///
    /// **Coordinated teardown:** when *any* handler returns — cleanly, with an
    /// error, or by panicking — the shared `shutdown` token is cancelled so the
    /// remaining handlers wind down too. A single listener dying never leaves
    /// its siblings orphaned.
    ///
    /// Returns the first fatal handler error, if any; otherwise `Ok(())` once
    /// all handlers have returned.
    ///
    /// Accepts any [`InboundProtocol`] worker — the bundled `RtmpHandler`, a
    /// legacy [`ProtocolHandler`](crate::ProtocolHandler) (bridged automatically),
    /// or your own custom protocol. Prefer
    /// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) +
    /// [`serve_registered`](Self::serve_registered) for the builder-driven path.
    ///
    /// ```no_run
    /// # use arcly_stream::prelude::*;
    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn InboundProtocol>) -> arcly_stream::Result<()> {
    /// engine.serve(vec![h], CancellationToken::new()).await
    /// # }
    /// ```
    pub async fn serve(
        self: &Arc<Self>,
        handlers: Vec<Box<dyn InboundProtocol>>,
        shutdown: CancellationToken,
    ) -> Result<()> {
        // Every worker reaches the bus through the same ergonomic context.
        let ctx = IngestContext::new(self.clone());
        let mut tasks = Vec::with_capacity(handlers.len());

        for handler in handlers {
            let ctx = ctx.clone();
            let shutdown = shutdown.clone();
            let name = handler.name();
            info!(protocol = name, "Protocol handler starting");
            tasks.push(tokio::spawn(async move {
                // Cancel the shared token however this task leaves — so a peer's
                // exit (or panic, via the drop guard) winds the whole set down.
                let _guard = shutdown.clone().drop_guard();
                let result = handler.serve(ctx, shutdown).await;
                if let Err(e) = &result {
                    error!(protocol = name, error = %e, "Protocol handler exited with error");
                }
                (name, result)
            }));
        }

        let mut first_err = None;
        for task in tasks {
            match task.await {
                Ok((name, Ok(()))) => info!(protocol = name, "Protocol handler stopped"),
                Ok((_, Err(e))) => {
                    if first_err.is_none() {
                        first_err = Some(e);
                    }
                }
                Err(join_err) => {
                    error!(error = %join_err, "Protocol handler task panicked");
                }
            }
        }

        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Like [`serve`](Self::serve), but owns the shutdown trigger: it runs until
    /// `Ctrl-C` (SIGINT) or, on Unix, `SIGTERM` — the two signals an
    /// orchestrator (systemd, Kubernetes) uses to stop a process — then cancels
    /// the handlers and waits for them to drain.
    ///
    /// This is the batteries-included entry point for a binary; use
    /// [`serve`](Self::serve) when the host already owns a cancellation token
    /// (e.g. to compose the engine into a larger shutdown graph).
    ///
    /// ```no_run
    /// # use arcly_stream::prelude::*;
    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn InboundProtocol>) -> arcly_stream::Result<()> {
    /// engine.serve_until_signal(vec![h]).await
    /// # }
    /// ```
    pub async fn serve_until_signal(
        self: &Arc<Self>,
        handlers: Vec<Box<dyn InboundProtocol>>,
    ) -> Result<()> {
        let shutdown = CancellationToken::new();
        let signal_token = shutdown.clone();
        tokio::spawn(async move {
            wait_for_shutdown_signal().await;
            info!("Shutdown signal received; cancelling protocol handlers");
            signal_token.cancel();
        });
        self.serve(handlers, shutdown).await
    }

    /// Run the protocol workers registered on the [`EngineBuilder`](crate::EngineBuilder) via
    /// [`protocol`](crate::EngineBuilder::protocol), until `shutdown` fires.
    ///
    /// This is the builder-driven counterpart to [`serve`](Self::serve): register
    /// workers fluently, keep the `Arc<Engine>` for your own use (subscribing,
    /// packaging), then drive them. The registered workers are consumed on the
    /// first call; a second call serves an empty set.
    ///
    /// ```no_run
    /// # use arcly_stream::prelude::*;
    /// # use arcly_stream::protocol::rtmp::RtmpHandler;
    /// # async fn run() -> arcly_stream::Result<()> {
    /// let engine = Engine::builder()
    ///     .application(AppSpec::new("live"))
    ///     .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap()))
    ///     .build();
    /// engine.serve_registered(CancellationToken::new()).await
    /// # }
    /// ```
    pub async fn serve_registered(self: &Arc<Self>, shutdown: CancellationToken) -> Result<()> {
        let handlers = {
            let mut guard = self
                .pending_protocols
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            std::mem::take(&mut *guard)
        };
        self.serve(handlers, shutdown).await
    }

    /// Like [`serve_registered`](Self::serve_registered) but owns the shutdown
    /// trigger, running the builder-registered workers until `Ctrl-C`/`SIGTERM`.
    pub async fn serve_registered_until_signal(self: &Arc<Self>) -> Result<()> {
        let handlers = {
            let mut guard = self
                .pending_protocols
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            std::mem::take(&mut *guard)
        };
        self.serve_until_signal(handlers).await
    }

    /// Drive a [`MediaSource`] into the bus: claim `key`, pump every frame the
    /// source yields, and end the publish session when the source is exhausted,
    /// errors, or `shutdown` fires.
    ///
    /// This is the first-class runner for the [`MediaSource`] trait — the
    /// in-process counterpart to a socket-driven
    /// [`ProtocolHandler`](crate::ProtocolHandler). Useful for
    /// file/loopback ingest, test fixtures, and generated streams.
    pub async fn pump_source<S: MediaSource>(
        self: &Arc<Self>,
        key: &StreamKey,
        mut source: S,
        shutdown: CancellationToken,
    ) -> Result<()> {
        let handle = self.start_publish(key).await?;
        let result = loop {
            tokio::select! {
                _ = shutdown.cancelled() => break Ok(()),
                next = source.next_frame() => match next {
                    Ok(Some(frame)) => {
                        let _ = handle.publish_frame(frame);
                    }
                    Ok(None) => break Ok(()),
                    Err(e) => {
                        warn!(stream = %key, error = %e, "MediaSource errored; ending publish");
                        break Err(e);
                    }
                },
            }
        };
        // Always release the publish slot, even on a source error.
        self.end_publish(key).await?;
        result
    }

    /// Reap every publishing stream that has not received a frame within the
    /// configured [`idle_timeout`](super::EngineConfig::idle_timeout). Returns
    /// the number of streams reaped. A no-op when no timeout is configured.
    pub async fn reap_idle(self: &Arc<Self>) -> usize {
        let Some(timeout) = self.config.idle_timeout else {
            return 0;
        };
        let timeout_ms = timeout.as_millis() as u64;
        // Monotonic — matches `StreamHandle::last_frame_ms` and is immune to wall
        // clock steps that could otherwise reap (or fail to reap) a live stream.
        let now = crate::bus::mono_ms();
        let mut reaped = 0;

        // Collect victims first; don't mutate the maps while iterating them.
        let mut victims = Vec::new();
        for app in self.apps.iter() {
            for handle in app.value().active_handles() {
                if now.saturating_sub(handle.last_frame_ms()) >= timeout_ms {
                    victims.push(handle.key().clone());
                }
            }
        }

        for key in victims {
            self.observer.on_stream_reaped(&key);
            info!(stream = %key, "Reaping idle stream");
            if self.end_publish(&key).await.is_ok() {
                reaped += 1;
            }
        }
        reaped
    }

    /// Spawn a background task that calls [`reap_idle`](Self::reap_idle) every
    /// `interval` until `shutdown` fires. No-op (returns immediately) when no
    /// idle timeout is configured.
    pub fn spawn_idle_reaper(
        self: &Arc<Self>,
        interval: std::time::Duration,
        shutdown: CancellationToken,
    ) {
        if self.config.idle_timeout.is_none() {
            return;
        }
        let engine = Arc::clone(self);
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(interval);
            loop {
                tokio::select! {
                    _ = shutdown.cancelled() => break,
                    _ = tick.tick() => {
                        let n = engine.reap_idle().await;
                        if n > 0 {
                            info!(reaped = n, "Idle reaper swept streams");
                        }
                    }
                }
            }
        });
    }
}

/// Resolves on the first OS shutdown signal: SIGINT everywhere, plus SIGTERM on
/// Unix. A failure to install a handler is logged and that signal is simply
/// never observed, rather than aborting the process.
async fn wait_for_shutdown_signal() {
    let ctrl_c = async {
        if let Err(e) = tokio::signal::ctrl_c().await {
            error!(error = %e, "failed to listen for Ctrl-C");
            // Park forever so this branch never wins the select.
            std::future::pending::<()>().await;
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut sig) => {
                sig.recv().await;
            }
            Err(e) => {
                error!(error = %e, "failed to install SIGTERM handler");
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {}
        _ = terminate => {}
    }
}