arcly-stream 0.1.5

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! The engine builder — replaces `ApplicationRegistry::from_config`.

use super::{AppSpec, Engine, EngineConfig};
use crate::auth::{AllowAll, StreamAuthenticator};
use crate::inbound::InboundProtocol;
use crate::observe::{NoopObserver, Observer};
use std::sync::Arc;

/// Fluent builder for an [`Engine`]. Takes plain owned specs instead of a
/// deserialized config schema, so consumers never adopt `stream-center`'s TOML.
///
/// ```
/// use arcly_stream::prelude::*;
///
/// let engine = Engine::builder()
///     .max_publishers(1_000)
///     .application(AppSpec::new("live").broadcast_capacity(8192))
///     .build();
/// assert_eq!(engine.list_apps().len(), 1);
/// ```
pub struct EngineBuilder {
    config: EngineConfig,
    apps: Vec<AppSpec>,
    observer: Option<Arc<dyn Observer>>,
    authenticator: Option<Arc<dyn StreamAuthenticator>>,
    protocols: Vec<Box<dyn InboundProtocol>>,
}

impl EngineBuilder {
    /// A fresh builder with default config, no apps, and permit-all auth.
    pub fn new() -> Self {
        Self {
            config: EngineConfig::default(),
            apps: Vec::new(),
            observer: None,
            authenticator: None,
            protocols: Vec::new(),
        }
    }

    /// Set the hard cap on concurrent publishing streams.
    pub fn max_publishers(mut self, n: usize) -> Self {
        self.config.max_publishers = n;
        self
    }

    /// Replace the whole engine config.
    pub fn config(mut self, config: EngineConfig) -> Self {
        self.config = config;
        self
    }

    /// Register an application.
    pub fn application(mut self, app: AppSpec) -> Self {
        self.apps.push(app);
        self
    }

    /// Register many applications at once.
    pub fn applications(mut self, apps: impl IntoIterator<Item = AppSpec>) -> Self {
        self.apps.extend(apps);
        self
    }

    /// Install a telemetry observer. Defaults to [`NoopObserver`] if unset.
    pub fn observer<O: Observer>(mut self, observer: O) -> Self {
        self.observer = Some(Arc::new(observer));
        self
    }

    /// Install a pre-shared observer (when the host already holds an `Arc`).
    pub fn observer_arc(mut self, observer: Arc<dyn Observer>) -> Self {
        self.observer = Some(observer);
        self
    }

    /// Install a stream authenticator. Defaults to [`AllowAll`] if unset.
    pub fn authenticator<A: StreamAuthenticator>(mut self, auth: A) -> Self {
        self.authenticator = Some(Arc::new(auth));
        self
    }

    /// Install a pre-shared authenticator (when the host already holds an `Arc`).
    pub fn authenticator_arc(mut self, auth: Arc<dyn StreamAuthenticator>) -> Self {
        self.authenticator = Some(auth);
        self
    }

    /// Set the idle timeout after which frame-less streams are reaped.
    pub fn idle_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.config.idle_timeout = Some(timeout);
        self
    }

    /// Register an inbound protocol worker (RTMP, or any custom
    /// [`InboundProtocol`] from your own crate). Workers registered here are run
    /// by [`Engine::serve_registered`](Engine::serve_registered).
    ///
    /// ```no_run
    /// # use arcly_stream::prelude::*;
    /// # use arcly_stream::protocol::rtmp::RtmpHandler;
    /// # async fn run() -> arcly_stream::Result<()> {
    /// let engine = Engine::builder()
    ///     .application(AppSpec::new("live").gop_cache(120))
    ///     .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap()))
    ///     // .protocol(MyCustomProtocol::new())   // your own InboundProtocol
    ///     .build();
    /// engine.serve_registered(CancellationToken::new()).await
    /// # }
    /// ```
    pub fn protocol<P: InboundProtocol>(mut self, protocol: P) -> Self {
        self.protocols.push(Box::new(protocol));
        self
    }

    /// Register a pre-boxed protocol worker (when the host already holds a
    /// `Box<dyn InboundProtocol>`, e.g. built dynamically from config).
    pub fn protocol_boxed(mut self, protocol: Box<dyn InboundProtocol>) -> Self {
        self.protocols.push(protocol);
        self
    }

    /// Build the engine. Any protocol workers registered via
    /// [`protocol`](Self::protocol) are carried into the engine and run by
    /// [`Engine::serve_registered`](Engine::serve_registered).
    pub fn build(self) -> Arc<Engine> {
        let observer = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
        let authenticator = self.authenticator.unwrap_or_else(|| Arc::new(AllowAll));
        Engine::from_parts(
            self.config,
            self.apps,
            observer,
            authenticator,
            self.protocols,
        )
    }
}

impl Default for EngineBuilder {
    fn default() -> Self {
        Self::new()
    }
}