arcly-stream 0.1.6

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! The crate-wide error type.

use thiserror::Error;

/// Programmatic classification for [`StreamError::Protocol`].
///
/// Lets callers branch on *what kind* of protocol failure occurred without
/// string-matching the detail message (e.g. retry on `Timeout`, drop the
/// connection on `Malformed`).
///
/// The [`Display`](std::fmt::Display) form is the lowercase label that also
/// appears inside the rendered `StreamError::Protocol` message (`"malformed"`,
/// `"unsupported"`, `"timeout"`, `"unauthorized"`, `"other"`), so the kind can
/// be logged or used as a label without going through `Debug`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProtocolErrorKind {
    /// The peer sent bytes that violate the wire format.
    Malformed,
    /// A required feature/codec/version is not supported.
    Unsupported,
    /// The operation exceeded its deadline.
    Timeout,
    /// Authentication or stream-key authorization failed.
    Unauthorized,
    /// Anything not covered above.
    Other,
}

impl ProtocolErrorKind {
    /// Returns the lowercase label for this kind.
    ///
    /// This is the single source of truth for the kind's textual form: both
    /// the `StreamError::Protocol` message and the `Display` impl use it.
    fn as_str(self) -> &'static str {
        match self {
            ProtocolErrorKind::Malformed => "malformed",
            ProtocolErrorKind::Unsupported => "unsupported",
            ProtocolErrorKind::Timeout => "timeout",
            ProtocolErrorKind::Unauthorized => "unauthorized",
            ProtocolErrorKind::Other => "other",
        }
    }
}

impl std::fmt::Display for ProtocolErrorKind {
    /// Writes the lowercase label returned by `as_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (rather than `write_str`) honours width/alignment flags such
        // as `{:>12}`, which matters when the label is used in aligned logs.
        f.pad(self.as_str())
    }
}

/// The crate-wide error type returned by every fallible operation (see the
/// [`Result`](crate::Result) alias).
///
/// The enum is `#[non_exhaustive]`, so code matching on it from outside this
/// crate must include a wildcard arm; new variants may be added without that
/// being a breaking change.
///
/// Each variant's `Display` text comes from its `#[error(...)]` attribute.
/// Most variants carry a free-form `String` detail; the struct-like variants
/// carry named context fields instead.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum StreamError {
    /// A wire-protocol failure, classified by [`ProtocolErrorKind`].
    ///
    /// Rendered as `Protocol error (<kind>): <detail>`, where `<kind>` is the
    /// lowercase label produced by `ProtocolErrorKind::as_str`.
    #[error("Protocol error ({kind}): {detail}", kind = kind.as_str())]
    Protocol {
        /// What kind of protocol failure occurred.
        kind: ProtocolErrorKind,
        /// Human-readable detail for logs.
        detail: String,
    },

    /// A protocol handshake did not complete successfully.
    #[error("Handshake failed: {0}")]
    Handshake(String),

    /// The peer closed the connection before the operation finished.
    #[error("Connection closed unexpectedly")]
    ConnectionClosed,

    /// No live stream matches the requested key in the given application.
    #[error("Stream '{stream_id}' not found in application '{app}'")]
    StreamNotFound {
        /// Application name searched.
        app: String,
        /// Stream id searched.
        stream_id: String,
    },

    /// No application is registered under this name.
    #[error("Application '{0}' not found")]
    AppNotFound(String),

    /// An application with this name is already registered.
    #[error("Application '{0}' is already registered")]
    AppAlreadyRegistered(String),

    /// Authorization denied for the attempted publish or play.
    #[error("Unauthorized: {0}")]
    Unauthorized(String),

    /// A live publisher already holds this stream key.
    #[error("Stream '{stream_id}' is already publishing in application '{app}'")]
    StreamAlreadyPublishing {
        /// Application name.
        app: String,
        /// Stream id already publishing.
        stream_id: String,
    },

    /// The engine-wide concurrent-publisher cap was reached.
    #[error("Publisher limit reached ({limit} active streams); rejecting new publish")]
    PublisherLimitReached {
        /// The configured cap that was hit.
        limit: usize,
    },

    /// The codec is recognized but not supported by this build.
    // NOTE(review): `{0:?}` Debug-formats the `String`, so the codec name is
    // rendered in double quotes with escapes, unlike the `{0}` Display
    // formatting used by the other variants — confirm this is intentional.
    #[error("Unsupported codec: {0:?}")]
    UnsupportedCodec(String),

    /// A codec parse/bitstream error.
    #[error("Codec error: {0}")]
    Codec(String),

    /// A transcoding pipeline error.
    #[error("Transcoding error: {0}")]
    Transcode(String),

    /// A requested hardware-acceleration backend is unavailable.
    #[error("Hardware acceleration unavailable: {0}")]
    HwAccelUnavailable(String),

    /// A generic media-pipeline error.
    #[error("Pipeline error: {0}")]
    Pipeline(String),

    /// A storage-backend operation failed.
    #[error("Storage error: {0}")]
    Storage(String),

    /// The requested object does not exist in the storage backend.
    #[error("Object not found: {0}")]
    StorageNotFound(String),

    /// A cluster/federation operation failed.
    #[error("Cluster error: {0}")]
    Cluster(String),

    /// No cluster node matches the requested address.
    #[error("Node not found: {0}")]
    NodeNotFound(String),

    /// A configuration value was missing or invalid.
    #[error("Configuration error: {0}")]
    Config(String),

    /// An underlying I/O error (auto-converted via `?`).
    ///
    /// The `#[from]` attribute generates `From<std::io::Error>` for this
    /// type, which is what lets `?` perform the conversion. The wrapped
    /// error's own message is embedded in this variant's `Display` output.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Anything not covered by a more specific variant.
    ///
    /// Rendered as the contained message with no prefix.
    #[error("{0}")]
    Other(String),
}

impl StreamError {
    /// A protocol error of unspecified [`ProtocolErrorKind::Other`] kind.
    pub fn protocol(msg: impl Into<String>) -> Self {
        Self::Protocol {
            kind: ProtocolErrorKind::Other,
            detail: msg.into(),
        }
    }

    /// A protocol error classified with an explicit [`ProtocolErrorKind`].
    pub fn protocol_kind(kind: ProtocolErrorKind, msg: impl Into<String>) -> Self {
        Self::Protocol {
            kind,
            detail: msg.into(),
        }
    }

    /// A [`Codec`](Self::Codec) error.
    pub fn codec(msg: impl Into<String>) -> Self {
        Self::Codec(msg.into())
    }

    /// A [`Transcode`](Self::Transcode) error.
    pub fn transcode(msg: impl Into<String>) -> Self {
        Self::Transcode(msg.into())
    }

    /// A [`Storage`](Self::Storage) error.
    pub fn storage(msg: impl Into<String>) -> Self {
        Self::Storage(msg.into())
    }

    /// A [`Cluster`](Self::Cluster) error.
    pub fn cluster(msg: impl Into<String>) -> Self {
        Self::Cluster(msg.into())
    }

    /// A [`Config`](Self::Config) error.
    pub fn config(msg: impl Into<String>) -> Self {
        Self::Config(msg.into())
    }

    /// An [`Other`](Self::Other) error.
    pub fn other(msg: impl Into<String>) -> Self {
        Self::Other(msg.into())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `protocol` defaults the kind to `Other`, `protocol_kind` keeps the
    /// caller's kind, and both render the lowercase kind label in the message.
    #[test]
    fn protocol_helpers_set_kind_and_render_it() {
        let unclassified = StreamError::protocol("boom");
        let rendered = unclassified.to_string();
        let has_default_kind = matches!(
            unclassified,
            StreamError::Protocol {
                kind: ProtocolErrorKind::Other,
                ..
            }
        );
        assert!(has_default_kind);
        assert_eq!(rendered, "Protocol error (other): boom");

        let timed_out = StreamError::protocol_kind(ProtocolErrorKind::Timeout, "slow peer");
        let rendered = timed_out.to_string();
        assert_eq!(rendered, "Protocol error (timeout): slow peer");
    }

    /// A `std::io::Error` converts into the `Io` variant through `From`.
    #[test]
    fn io_errors_convert_via_from() {
        let broken_pipe = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe");
        let converted = StreamError::from(broken_pipe);
        assert!(matches!(converted, StreamError::Io(_)));
    }

    /// Struct-like variants interpolate their named fields into the message.
    #[test]
    fn structured_variants_carry_context() {
        let missing = StreamError::StreamNotFound {
            app: String::from("live"),
            stream_id: String::from("cam"),
        };
        let rendered = missing.to_string();
        assert_eq!(rendered, "Stream 'cam' not found in application 'live'");
    }
}