arcly-stream 0.1.0

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime, config, and metrics free.
Documentation
//! The run loop — `arcly-http`'s `App::launch` analogue for protocol handlers.

use super::Engine;
use crate::traits::{MediaSource, ProtocolHandler};
use crate::{PublishRegistry, Result, StreamKey};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

impl Engine {
    /// Drive the engine: run every protocol handler concurrently until
    /// `shutdown` fires, then let them observe the cancellation and wind down.
    ///
    /// **Coordinated teardown:** when *any* handler returns — cleanly, with an
    /// error, or by panicking — the shared `shutdown` token is cancelled so the
    /// remaining handlers wind down too. A single listener dying never leaves
    /// its siblings orphaned.
    ///
    /// Returns the first fatal handler error, if any; otherwise `Ok(())` once
    /// all handlers have returned.
    ///
    /// ```no_run
    /// # use arcly_stream::prelude::*;
    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn ProtocolHandler>) -> arcly_stream::Result<()> {
    /// engine.serve(vec![h], CancellationToken::new()).await
    /// # }
    /// ```
    pub async fn serve(
        self: &Arc<Self>,
        handlers: Vec<Box<dyn ProtocolHandler>>,
        shutdown: CancellationToken,
    ) -> Result<()> {
        let registry: Arc<dyn crate::bus::PublishRegistry> = self.clone();
        let mut tasks = Vec::with_capacity(handlers.len());

        for handler in handlers {
            let registry = Arc::clone(&registry);
            let shutdown = shutdown.clone();
            let name = handler.name();
            info!(protocol = name, "Protocol handler starting");
            tasks.push(tokio::spawn(async move {
                // Cancel the shared token however this task leaves — so a peer's
                // exit (or panic, via the drop guard) winds the whole set down.
                let _guard = shutdown.clone().drop_guard();
                let result = handler.run(registry, shutdown).await;
                if let Err(e) = &result {
                    error!(protocol = name, error = %e, "Protocol handler exited with error");
                }
                (name, result)
            }));
        }

        let mut first_err = None;
        for task in tasks {
            match task.await {
                Ok((name, Ok(()))) => info!(protocol = name, "Protocol handler stopped"),
                Ok((_, Err(e))) => {
                    if first_err.is_none() {
                        first_err = Some(e);
                    }
                }
                Err(join_err) => {
                    error!(error = %join_err, "Protocol handler task panicked");
                }
            }
        }

        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Like [`serve`](Self::serve), but owns the shutdown trigger: it runs until
    /// `Ctrl-C` (SIGINT) or, on Unix, `SIGTERM` — the two signals an
    /// orchestrator (systemd, Kubernetes) uses to stop a process — then cancels
    /// the handlers and waits for them to drain.
    ///
    /// This is the batteries-included entry point for a binary; use
    /// [`serve`](Self::serve) when the host already owns a cancellation token
    /// (e.g. to compose the engine into a larger shutdown graph).
    ///
    /// ```no_run
    /// # use arcly_stream::prelude::*;
    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn ProtocolHandler>) -> arcly_stream::Result<()> {
    /// engine.serve_until_signal(vec![h]).await
    /// # }
    /// ```
    pub async fn serve_until_signal(
        self: &Arc<Self>,
        handlers: Vec<Box<dyn ProtocolHandler>>,
    ) -> Result<()> {
        let shutdown = CancellationToken::new();
        let signal_token = shutdown.clone();
        tokio::spawn(async move {
            wait_for_shutdown_signal().await;
            info!("Shutdown signal received; cancelling protocol handlers");
            signal_token.cancel();
        });
        self.serve(handlers, shutdown).await
    }

    /// Drive a [`MediaSource`] into the bus: claim `key`, pump every frame the
    /// source yields, and end the publish session when the source is exhausted,
    /// errors, or `shutdown` fires.
    ///
    /// This is the first-class runner for the [`MediaSource`] trait — the
    /// in-process counterpart to a socket-driven [`ProtocolHandler`]. Useful for
    /// file/loopback ingest, test fixtures, and generated streams.
    pub async fn pump_source<S: MediaSource>(
        self: &Arc<Self>,
        key: &StreamKey,
        mut source: S,
        shutdown: CancellationToken,
    ) -> Result<()> {
        let handle = self.start_publish(key).await?;
        let result = loop {
            tokio::select! {
                _ = shutdown.cancelled() => break Ok(()),
                next = source.next_frame() => match next {
                    Ok(Some(frame)) => {
                        let _ = handle.publish_frame(frame);
                    }
                    Ok(None) => break Ok(()),
                    Err(e) => {
                        warn!(stream = %key, error = %e, "MediaSource errored; ending publish");
                        break Err(e);
                    }
                },
            }
        };
        // Always release the publish slot, even on a source error.
        self.end_publish(key).await?;
        result
    }

    /// Reap every publishing stream that has not received a frame within the
    /// configured [`idle_timeout`](super::EngineConfig::idle_timeout). Returns
    /// the number of streams reaped. A no-op when no timeout is configured.
    pub async fn reap_idle(self: &Arc<Self>) -> usize {
        let Some(timeout) = self.config.idle_timeout else {
            return 0;
        };
        let timeout_ms = timeout.as_millis() as u64;
        let now = crate::bus::now_ms();
        let mut reaped = 0;

        // Collect victims first; don't mutate the maps while iterating them.
        let mut victims = Vec::new();
        for app in self.apps.iter() {
            for handle in app.value().active_handles() {
                if now.saturating_sub(handle.last_frame_ms()) >= timeout_ms {
                    victims.push(handle.key().clone());
                }
            }
        }

        for key in victims {
            self.observer.on_stream_reaped(&key);
            info!(stream = %key, "Reaping idle stream");
            if self.end_publish(&key).await.is_ok() {
                reaped += 1;
            }
        }
        reaped
    }

    /// Spawn a background task that calls [`reap_idle`](Self::reap_idle) every
    /// `interval` until `shutdown` fires. No-op (returns immediately) when no
    /// idle timeout is configured.
    pub fn spawn_idle_reaper(
        self: &Arc<Self>,
        interval: std::time::Duration,
        shutdown: CancellationToken,
    ) {
        if self.config.idle_timeout.is_none() {
            return;
        }
        let engine = Arc::clone(self);
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(interval);
            loop {
                tokio::select! {
                    _ = shutdown.cancelled() => break,
                    _ = tick.tick() => {
                        let n = engine.reap_idle().await;
                        if n > 0 {
                            info!(reaped = n, "Idle reaper swept streams");
                        }
                    }
                }
            }
        });
    }
}

/// Resolves on the first OS shutdown signal: SIGINT everywhere, plus SIGTERM on
/// Unix. A failure to install a handler is logged and that signal is simply
/// never observed, rather than aborting the process.
async fn wait_for_shutdown_signal() {
    let ctrl_c = async {
        if let Err(e) = tokio::signal::ctrl_c().await {
            error!(error = %e, "failed to listen for Ctrl-C");
            // Park forever so this branch never wins the select.
            std::future::pending::<()>().await;
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut sig) => {
                sig.recv().await;
            }
            Err(e) => {
                error!(error = %e, "failed to install SIGTERM handler");
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {}
        _ = terminate => {}
    }
}