arcly-stream 0.1.5

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! The foundational extension traits a downstream user implements.
//!
//! These are codec- and runtime-agnostic. Implement the ones you need and the
//! engine drives them; the engine never assumes a particular protocol,
//! container, or storage backend.

use crate::bus::PublishRegistry;
use crate::frame::CodecId;
use crate::{MediaFrame, Result, StreamError};
use async_trait::async_trait;
use bytes::Bytes;
use std::path::Path;
use std::sync::Arc;

/// A source that produces [`MediaFrame`]s (e.g. a protocol ingest connection).
///
/// Implementors must be `Send + Sync`. Frames are pulled one at a time through
/// [`next_frame`](Self::next_frame), which borrows the source mutably, so a
/// given source is driven by a single caller at a time.
#[async_trait]
pub trait MediaSource: Send + Sync {
    /// Returns the next frame, or `Ok(None)` when the source is exhausted.
    /// An `Err` result indicates a fatal I/O or protocol error.
    ///
    /// Using `Result<Option<T>>` rather than `Option<Result<T>>` follows the
    /// standard Rust I/O convention: `Ok(Some(frame))` = got a frame,
    /// `Ok(None)` = end of stream, `Err(e)` = unrecoverable error.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the source hits an error it cannot recover from;
    /// the concrete error is chosen by the implementation.
    async fn next_frame(&mut self) -> Result<Option<MediaFrame>>;
}

/// A sink that consumes [`MediaFrame`]s (e.g. a protocol playback connection,
/// a transcoding pipeline node, or a recorder).
///
/// Implementors must be `Send + Sync`. Only [`send_frame`](Self::send_frame)
/// is required; [`flush`](Self::flush) has a no-op default.
#[async_trait]
pub trait MediaSink: Send + Sync {
    /// Send a frame to this sink.
    ///
    /// The frame is passed by value, so the sink takes ownership of it.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the sink cannot accept the frame; the concrete
    /// error is chosen by the implementation.
    async fn send_frame(&mut self, frame: MediaFrame) -> Result<()>;

    /// Flush any internal buffers and signal end of stream.
    ///
    /// The default implementation does nothing and returns `Ok(())`; sinks
    /// that buffer frames internally should override it.
    async fn flush(&mut self) -> Result<()> {
        Ok(())
    }
}

/// Low-level protocol handler that accepts connections and maps them to
/// stream identities.  Each concrete protocol implementation provides this.
///
/// Unlike `stream-center`'s original `ProtocolHandler`, `run` receives a
/// [`PublishRegistry`] trait object rather than reaching for a concrete
/// `ApplicationRegistry`, so a handler is reusable against any engine — or a
/// host's own bus implementation.
///
/// Both methods take `&self`, and implementors must be `Send + Sync`.
#[async_trait]
pub trait ProtocolHandler: Send + Sync {
    /// Human-readable name (e.g. `"rtmp"`, `"hls"`).
    ///
    /// The returned string is `'static`, so it does not borrow from the
    /// handler.
    fn name(&self) -> &'static str;

    /// Start accepting connections.  Implementations should run until the
    /// provided `shutdown` token is cancelled, resolving incoming connections
    /// to a [`StreamKey`](crate::StreamKey) and forwarding frames through
    /// `registry`.
    ///
    /// `registry` is a shared, type-erased [`PublishRegistry`]; `shutdown` is
    /// the cancellation token the implementation is expected to observe.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the handler fails; which failures are reported is
    /// up to the implementation.
    async fn run(
        &self,
        registry: Arc<dyn PublishRegistry>,
        shutdown: tokio_util::sync::CancellationToken,
    ) -> Result<()>;
}

/// Key/value object store used by recording and HLS/DASH packagers.
///
/// The four primitives ([`put`](Self::put), [`get`](Self::get),
/// [`delete`](Self::delete), [`list`](Self::list)) are required.
/// [`exists`](Self::exists) and [`put_file`](Self::put_file) are built on top
/// of them and may be overridden when a backend has a cheaper way to do the
/// same thing.
#[async_trait]
pub trait StorageBackend: Send + Sync + 'static {
    /// Store `data` under `key`, replacing any object already stored there.
    async fn put(&self, key: &str, data: Bytes) -> Result<()>;

    /// Fetch the object stored under `key`.
    async fn get(&self, key: &str) -> Result<Bytes>;

    /// Remove the object stored under `key`.
    async fn delete(&self, key: &str) -> Result<()>;

    /// Enumerate every key that starts with `prefix`.
    async fn list(&self, prefix: &str) -> Result<Vec<String>>;

    /// Reports whether an object is stored under `key`.
    ///
    /// The default implementation issues a full [`get`](Self::get) and
    /// discards the payload: a successful read yields `Ok(true)`, a
    /// [`StreamError::StorageNotFound`] yields `Ok(false)`, and any other
    /// error is passed through unchanged.
    async fn exists(&self, key: &str) -> Result<bool> {
        match self.get(key).await {
            // "Not found" is an answer, not a failure.
            Err(StreamError::StorageNotFound(_)) => Ok(false),
            // Success becomes `true`; every other error propagates as-is.
            outcome => outcome.map(|_| true),
        }
    }

    /// Upload the file at `path` under `key`, then delete the local file.
    ///
    /// The default implementation loads the whole file into memory and hands
    /// it to [`put`](Self::put). Backends that share a filesystem with the
    /// temp directory (e.g. disk) should override this with a rename to avoid
    /// the memory round-trip.
    ///
    /// A failure to read the file or to store it is returned to the caller.
    /// A failure to delete the local file afterwards is ignored.
    async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
        let contents = match tokio::fs::read(path).await {
            Ok(buf) => Bytes::from(buf),
            Err(e) => {
                return Err(StreamError::storage(format!("put_file read failed: {e}")));
            }
        };

        self.put(key, contents).await?;

        // Best-effort cleanup: the object is already stored, so a leftover
        // source file is not reported as an error.
        let _ = tokio::fs::remove_file(path).await;
        Ok(())
    }
}

/// `Arc<B>` is itself a [`StorageBackend`], so a single backend can be shared
/// across many packagers/recorders without wrapping boilerplate.
///
/// `B` is allowed to be unsized (`?Sized`), so a type-erased
/// `Arc<dyn StorageBackend>` is a backend as well, not only `Arc<Concrete>`.
///
/// Every method is forwarded to the inner backend — including the defaulted
/// `exists` and `put_file` — so any override provided by `B` is used instead
/// of the trait's generic fallback.
#[async_trait]
impl<B: StorageBackend + ?Sized> StorageBackend for Arc<B> {
    /// Forwards to the inner backend's `put`.
    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
        (**self).put(key, data).await
    }

    /// Forwards to the inner backend's `get`.
    async fn get(&self, key: &str) -> Result<Bytes> {
        (**self).get(key).await
    }

    /// Forwards to the inner backend's `delete`.
    async fn delete(&self, key: &str) -> Result<()> {
        (**self).delete(key).await
    }

    /// Forwards to the inner backend's `list`.
    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        (**self).list(prefix).await
    }

    /// Forwards to the inner backend's `exists` rather than re-deriving it
    /// from `get`, so a backend-specific override is preserved.
    async fn exists(&self, key: &str) -> Result<bool> {
        (**self).exists(key).await
    }

    /// Forwards to the inner backend's `put_file` rather than using the
    /// read-into-memory default, so a backend-specific override is preserved.
    async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
        (**self).put_file(key, path).await
    }
}

/// Hardware-acceleration backend (NVENC, AMF, QSV, NPU, …).
///
/// Implementors are registered at startup based on available hardware.
///
/// All methods are synchronous and take `&self`; implementors must be
/// `Send + Sync`.
pub trait HwAccelBackend: Send + Sync {
    /// Unique backend identifier (e.g. `"nvenc"`, `"qsv"`).
    ///
    /// The returned string is `'static`, so it does not borrow from the
    /// backend.
    fn id(&self) -> &'static str;

    /// Returns whether the backend is usable on this machine.
    fn is_available(&self) -> bool;

    /// List codec IDs this backend can encode.
    ///
    /// The slice is borrowed from the backend.
    fn supported_encoders(&self) -> &[CodecId];

    /// List codec IDs this backend can decode.
    ///
    /// The slice is borrowed from the backend.
    fn supported_decoders(&self) -> &[CodecId];
}