arcly-stream 0.1.1

A high-performance live-media streaming kernel: lock-free, zero-copy frame fan-out, an instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime-, config-, and metrics-free.
Documentation
//! The composition root — the analogue of `arcly-http`'s `App`.
//!
//! [`Engine`] replaces `stream-center`'s `ApplicationRegistry::from_config`.
//! It is built by a plain-Rust [`EngineBuilder`] (no TOML, no
//! `StreamCenterConfig`) and implements all three bus contracts
//! ([`PublishRegistry`], [`PlaybackRegistry`], [`EventBus`]), so it drops
//! straight into protocol code written against those traits.

mod builder;
mod driver;

pub use builder::EngineBuilder;

use crate::auth::{Credentials, StreamAuthenticator};
use crate::bus::{
    Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
};
use crate::observe::Observer;
use crate::{AppName, Result, StreamError, StreamId, StreamKey};
use async_trait::async_trait;
use dashmap::DashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;

/// Describes one application in plain Rust — the stand-in for
/// `sc-config::AppConfig`.
///
/// Construct with [`AppSpec::new`] and tune with the chained setters.
#[derive(Debug, Clone)]
pub struct AppSpec {
    /// Name of the application (for instance `"live"`).
    pub name: AppName,
    /// Capacity of each stream's broadcast channel (frames buffered for slow
    /// joiners).
    pub broadcast_capacity: usize,
    /// Size, in frames, of the keyframe-anchored GOP replay buffer; `0` turns
    /// it off. A non-zero value enables sub-second playback start for late
    /// joiners.
    pub gop_capacity: usize,
}

impl AppSpec {
    /// Create a spec for `name` with a broadcast capacity of 4096 and the GOP
    /// cache turned off (`gop_capacity == 0`).
    pub fn new(name: impl Into<AppName>) -> Self {
        let name = name.into();
        Self {
            name,
            broadcast_capacity: 4096,
            gop_capacity: 0,
        }
    }

    /// Replace the per-stream broadcast channel capacity with `n`, leaving
    /// every other setting untouched.
    pub fn broadcast_capacity(self, n: usize) -> Self {
        Self {
            broadcast_capacity: n,
            ..self
        }
    }

    /// Turn on the keyframe-anchored GOP replay buffer, bounded to `frames`
    /// (e.g. `fps × gop_seconds`). Late joiners receive the cached configs plus
    /// the current GOP and start decoding immediately.
    pub fn gop_cache(self, frames: usize) -> Self {
        Self {
            gop_capacity: frames,
            ..self
        }
    }
}

/// Settings that apply to the whole engine — the subset of
/// `sc-config::ServerConfig` knobs the engine actually needs.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Upper bound on simultaneously publishing streams, counted across every
    /// application.
    pub max_publishers: usize,
    /// When `Some`, a stream that has not received a frame within this window
    /// is reaped by [`Engine::reap_idle`] / the background idle reaper.
    /// `None` leaves no idle window configured.
    pub idle_timeout: Option<std::time::Duration>,
}

impl Default for EngineConfig {
    /// Defaults: at most 10 000 concurrent publishers and no idle timeout.
    fn default() -> Self {
        const DEFAULT_MAX_PUBLISHERS: usize = 10_000;

        EngineConfig {
            idle_timeout: None,
            max_publishers: DEFAULT_MAX_PUBLISHERS,
        }
    }
}

/// The engine. Many instances may coexist in one process (ideal for tests).
/// Cheap to `Arc`-share.
///
/// Implements [`PublishRegistry`], [`PlaybackRegistry`] and [`EventBus`] by
/// looking up the target [`Application`] by name and delegating to it.
pub struct Engine {
    /// Registered applications, keyed by application name.
    apps: DashMap<AppName, Arc<Application>>,
    /// Engine-wide limits; `max_publishers` is checked on every publish start.
    config: EngineConfig,
    /// Running count of active publishers across all apps.
    active_publishers: AtomicUsize,
    /// Observer handed to every [`Application`] the engine creates.
    observer: Arc<dyn Observer>,
    /// Policy consulted by the `*_authorized` entry points before publishing
    /// or playing a stream.
    authenticator: Arc<dyn StreamAuthenticator>,
    /// Protocol workers registered on the builder via
    /// [`EngineBuilder::protocol`], consumed once by
    /// [`serve_registered`](Engine::serve_registered).
    pending_protocols: std::sync::Mutex<Vec<Box<dyn crate::inbound::InboundProtocol>>>,
}

impl Engine {
    /// Start building an engine.
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }

    /// Internal constructor used by [`EngineBuilder::build`].
    pub(crate) fn from_parts(
        config: EngineConfig,
        specs: Vec<AppSpec>,
        observer: Arc<dyn Observer>,
        authenticator: Arc<dyn StreamAuthenticator>,
        protocols: Vec<Box<dyn crate::inbound::InboundProtocol>>,
    ) -> Arc<Self> {
        let apps = DashMap::new();
        for spec in specs {
            let app = Application::new(
                spec.name.clone(),
                spec.broadcast_capacity,
                spec.gop_capacity,
                Arc::clone(&observer),
            );
            apps.insert(spec.name.clone(), app);
            info!(app = %spec.name, "Application registered");
        }
        Arc::new(Self {
            apps,
            config,
            active_publishers: AtomicUsize::new(0),
            observer,
            authenticator,
            pending_protocols: std::sync::Mutex::new(protocols),
        })
    }

    /// Register an application after construction (e.g. on config reload).
    ///
    /// Rejects a name that is already registered with
    /// [`StreamError::AppAlreadyRegistered`] rather than silently replacing the
    /// live [`Application`] — an overwrite would orphan that app's active
    /// streams and leak the engine-wide publisher count that gates
    /// [`StreamError::PublisherLimitReached`]. To change an app's settings,
    /// drain and remove it first (a future `remove_app`), then re-register.
    pub fn register_app(&self, spec: AppSpec) -> Result<()> {
        use dashmap::mapref::entry::Entry;
        match self.apps.entry(spec.name.clone()) {
            Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
            Entry::Vacant(slot) => {
                let app = Application::new(
                    spec.name.clone(),
                    spec.broadcast_capacity,
                    spec.gop_capacity,
                    Arc::clone(&self.observer),
                );
                info!(app = %spec.name, "Application registered");
                slot.insert(app);
                Ok(())
            }
        }
    }

    /// List registered application names.
    pub fn list_apps(&self) -> Vec<AppName> {
        self.apps.iter().map(|r| r.key().clone()).collect()
    }

    /// Total active publishers across all applications (single atomic load).
    pub fn total_stream_count(&self) -> usize {
        self.active_publishers.load(Ordering::Acquire)
    }

    fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
        self.apps
            .get(app)
            .map(|r| r.clone())
            .ok_or_else(|| StreamError::AppNotFound(app.to_string()))
    }

    /// Claim a publish slot only if the injected [`StreamAuthenticator`] permits
    /// `creds` to publish `key`. Protocol handlers should call this rather than
    /// [`start_publish`](PublishRegistry::start_publish) directly so the auth
    /// policy is enforced uniformly across every transport.
    pub async fn start_publish_authorized(
        &self,
        key: &StreamKey,
        creds: &Credentials,
    ) -> Result<StreamHandle> {
        self.authenticator.authorize_publish(key, creds).await?;
        self.start_publish(key).await
    }

    /// Resolve a live stream for playback only if the authenticator permits
    /// `creds` to play `key`.
    pub async fn open_playback_authorized(
        &self,
        key: &StreamKey,
        creds: &Credentials,
    ) -> Result<StreamHandle> {
        self.authenticator.authorize_play(key, creds).await?;
        self.get_stream(key)
    }
}

#[async_trait]
impl PublishRegistry for Engine {
    /// Reserve an engine-wide publisher slot and start publishing `key` in its
    /// application.
    ///
    /// # Errors
    ///
    /// - [`StreamError::PublisherLimitReached`] when the counter had already
    ///   reached `max_publishers`.
    /// - [`StreamError::AppNotFound`] when `key.app` is not registered.
    /// - Whatever error the application's own `start_publish` reports.
    ///
    /// Each of these paths gives the reserved slot back before returning.
    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
    async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
        let limit = self.config.max_publishers;
        let slots = &self.active_publishers;

        // Reserve first, validate afterwards: `fetch_add` returns the count
        // *before* our increment, so a value at or above the limit means there
        // was no room and the reservation must be undone.
        if slots.fetch_add(1, Ordering::AcqRel) >= limit {
            slots.fetch_sub(1, Ordering::AcqRel);
            return Err(StreamError::PublisherLimitReached { limit });
        }

        // Unknown application: release the slot and report the lookup error.
        let application = self.get_app(&key.app).map_err(|err| {
            slots.fetch_sub(1, Ordering::AcqRel);
            err
        })?;

        // NOTE(review): the slot is released only on the error paths that run
        // after this await resolves. If the future is dropped while the call
        // is pending, no release happens here — confirm callers never cancel
        // mid-publish, or that the application side compensates.
        application
            .start_publish(key.stream_id.clone())
            .await
            .map_err(|err| {
                slots.fetch_sub(1, Ordering::AcqRel);
                err
            })
    }

    /// Stop publishing `key` and, when the application reports `true`, give
    /// its slot back to the engine-wide counter.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when `key.app` is not registered, or any
    /// error the application's own `end_publish` reports.
    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
    async fn end_publish(&self, key: &StreamKey) -> Result<()> {
        let application = self.get_app(&key.app)?;
        // Presumably `true` means a live stream was actually removed; the
        // counter is left alone otherwise so it cannot be decremented twice.
        let released = application.end_publish(&key.stream_id).await?;
        if released {
            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
        }
        Ok(())
    }
}

impl PlaybackRegistry for Engine {
    /// Resolve the stream identified by `key`.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when `key.app` is not registered, and
    /// [`StreamError::StreamNotFound`] when the application has no stream with
    /// that id.
    fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
        match self.get_app(&key.app)?.get_stream(&key.stream_id) {
            Some(handle) => Ok(handle),
            None => Err(StreamError::StreamNotFound {
                app: key.app.to_string(),
                stream_id: key.stream_id.to_string(),
            }),
        }
    }

    /// Ids of the streams the application `app` reports as active.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when `app` is not registered.
    fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
        self.get_app(app)
            .map(|application| application.active_streams())
    }
}

impl EventBus for Engine {
    /// Subscribe to the event channel of the application `app`.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when `app` is not registered.
    fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
        let application = self.get_app(app)?;
        let receiver = application.subscribe_events();
        Ok(receiver)
    }
}

impl std::fmt::Debug for Engine {
    /// Compact summary: how many applications are registered and how many
    /// publishers are active. The remaining fields (trait objects, pending
    /// protocols) are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let app_count = self.apps.len();
        let active_publishers = self.active_publishers.load(Ordering::Acquire);

        let mut summary = f.debug_struct("Engine");
        summary.field("app_count", &app_count);
        summary.field("active_publishers", &active_publishers);
        summary.finish()
    }
}