arcly-stream 0.1.3

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! One registered application and its live streams.

use super::events::{StreamEvent, StreamEventKind};
use super::handle::{StreamHandle, StreamState};
use crate::observe::Observer;
use crate::{AppName, Result, StreamError, StreamId};
use dashmap::DashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;

/// One registered application (e.g. "live", "vod").
///
/// Decoupled from `stream-center`'s `sc-config::AppConfig`: it takes a plain
/// `broadcast_capacity` and an injected [`Observer`] instead of reaching for a
/// metrics singleton.
pub struct Application {
    /// Application name (e.g. `"live"`).
    pub name: AppName,
    /// Per-stream broadcast channel capacity.
    pub broadcast_capacity: usize,
    /// Keyframe-anchored GOP replay buffer size, in frames (0 disables it).
    pub gop_capacity: usize,
    streams: DashMap<StreamId, StreamHandle>,
    /// Broadcast channel for stream lifecycle events within this application.
    event_tx: broadcast::Sender<StreamEvent>,
    /// Serializes concurrent start_publish calls to eliminate the check-then-insert
    /// race condition (TOCTOU) that could allow two publishers to claim the same stream.
    pub_lock: tokio::sync::Mutex<()>,
    /// Lock-free count of active streams; updated on start/end publish.
    stream_count: AtomicUsize,
    /// Injected telemetry hook (no-op by default).
    observer: Arc<dyn Observer>,
}

impl Application {
    /// Create an application with the given per-stream capacities and observer.
    pub fn new(
        name: AppName,
        broadcast_capacity: usize,
        gop_capacity: usize,
        observer: Arc<dyn Observer>,
    ) -> Arc<Self> {
        let (event_tx, _) = broadcast::channel(64);
        Arc::new(Self {
            name,
            broadcast_capacity,
            gop_capacity,
            streams: DashMap::new(),
            event_tx,
            pub_lock: tokio::sync::Mutex::new(()),
            stream_count: AtomicUsize::new(0),
            observer,
        })
    }

    /// Register a new publishing stream.  Fails if a stream with the same ID
    /// is already in the `Publishing` or `Transcoding` state.
    pub async fn start_publish(&self, stream_id: StreamId) -> Result<StreamHandle> {
        // Hold this mutex for the duration of the check+insert to prevent two
        // concurrent callers from both seeing the stream as free and both registering.
        let _guard = self.pub_lock.lock().await;

        if let Some(existing) = self.streams.get(&stream_id) {
            let state = existing.current_state().await;
            if matches!(state, StreamState::Publishing | StreamState::Transcoding) {
                return Err(StreamError::StreamAlreadyPublishing {
                    app: self.name.to_string(),
                    stream_id: stream_id.to_string(),
                });
            }
        }

        let handle = StreamHandle::with_observer(
            self.name.clone(),
            stream_id.clone(),
            self.broadcast_capacity,
            self.gop_capacity,
            Arc::clone(&self.observer),
        );
        handle.set_state(StreamState::Publishing).await;
        let started_at_ms = super::handle::now_ms();
        handle
            .update_metadata(|m| m.started_at_ms = started_at_ms)
            .await;
        self.streams.insert(stream_id.clone(), handle.clone());
        self.stream_count.fetch_add(1, Ordering::Relaxed);
        // Release the lock before notifying the observer to avoid holding it
        // across a potentially-blocking host callback.
        drop(_guard);
        self.observer.on_publish_started(self.name.as_str());

        info!(app = %self.name, stream = %stream_id, "Stream publish started");
        self.emit(stream_id.clone(), StreamEventKind::PublishStarted);

        Ok(handle)
    }

    /// Mark a stream as ended and remove it from the active registry.
    /// Returns `true` if the stream was present and removed, `false` if it
    /// was already gone (idempotent).
    pub async fn end_publish(&self, stream_id: &StreamId) -> Result<bool> {
        if let Some((_, handle)) = self.streams.remove(stream_id) {
            self.stream_count.fetch_sub(1, Ordering::Relaxed);
            self.observer.on_publish_ended(self.name.as_str());
            handle.set_state(StreamState::Ended).await;
            info!(app = %self.name, stream = %stream_id, "Stream publish ended");
            self.emit(stream_id.clone(), StreamEventKind::PublishEnded);
            return Ok(true);
        }
        Ok(false)
    }

    /// Look up a live stream handle for playback.
    pub fn get_stream(&self, stream_id: &StreamId) -> Option<StreamHandle> {
        self.streams.get(stream_id).map(|r| r.clone())
    }

    /// List all active stream IDs.
    pub fn active_streams(&self) -> Vec<StreamId> {
        self.streams.iter().map(|r| r.key().clone()).collect()
    }

    /// Snapshot of all live stream handles (used by the engine's idle reaper).
    pub fn active_handles(&self) -> Vec<StreamHandle> {
        self.streams.iter().map(|r| r.value().clone()).collect()
    }

    /// Subscribe to stream lifecycle events for this application.
    pub fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent> {
        self.event_tx.subscribe()
    }

    /// Number of streams currently active in this application.
    pub fn stream_count(&self) -> usize {
        self.stream_count.load(Ordering::Relaxed)
    }

    /// Emit a lifecycle event to both the broadcast channel and the observer.
    fn emit(&self, stream_id: StreamId, kind: StreamEventKind) {
        let event = StreamEvent {
            app: self.name.clone(),
            stream_id,
            kind,
        };
        self.observer.on_event(&event);
        let _ = self.event_tx.send(event);
    }
}

impl std::fmt::Debug for Application {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Application")
            .field("name", &self.name)
            .field("stream_count", &self.streams.len())
            .finish()
    }
}