arcly-stream 0.1.0

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime, config, and metrics free.
Documentation
//! The crate-wide error type.

use thiserror::Error;

/// Programmatic classification for [`StreamError::Protocol`].
///
/// Lets callers branch on *what kind* of protocol failure occurred without
/// string-matching the detail message (e.g. retry on `Timeout`, drop the
/// connection on `Malformed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProtocolErrorKind {
    /// The peer sent bytes that violate the wire format.
    Malformed,
    /// A required feature/codec/version is not supported.
    Unsupported,
    /// The operation exceeded its deadline.
    Timeout,
    /// Authentication or stream-key authorization failed.
    Unauthorized,
    /// Anything not covered above.
    Other,
}

impl ProtocolErrorKind {
    fn as_str(self) -> &'static str {
        match self {
            ProtocolErrorKind::Malformed => "malformed",
            ProtocolErrorKind::Unsupported => "unsupported",
            ProtocolErrorKind::Timeout => "timeout",
            ProtocolErrorKind::Unauthorized => "unauthorized",
            ProtocolErrorKind::Other => "other",
        }
    }
}

#[derive(Debug, Error)]
#[non_exhaustive]
pub enum StreamError {
    // Protocol errors
    #[error("Protocol error ({kind}): {detail}", kind = kind.as_str())]
    Protocol {
        kind: ProtocolErrorKind,
        detail: String,
    },

    #[error("Handshake failed: {0}")]
    Handshake(String),

    #[error("Connection closed unexpectedly")]
    ConnectionClosed,

    // Stream lifecycle errors
    #[error("Stream '{stream_id}' not found in application '{app}'")]
    StreamNotFound { app: String, stream_id: String },

    #[error("Application '{0}' not found")]
    AppNotFound(String),

    #[error("Application '{0}' is already registered")]
    AppAlreadyRegistered(String),

    #[error("Unauthorized: {0}")]
    Unauthorized(String),

    #[error("Stream '{stream_id}' is already publishing in application '{app}'")]
    StreamAlreadyPublishing { app: String, stream_id: String },

    #[error("Publisher limit reached ({limit} active streams); rejecting new publish")]
    PublisherLimitReached { limit: usize },

    // Codec errors
    #[error("Unsupported codec: {0:?}")]
    UnsupportedCodec(String),

    #[error("Codec error: {0}")]
    Codec(String),

    // Transcode errors
    #[error("Transcoding error: {0}")]
    Transcode(String),

    #[error("Hardware acceleration unavailable: {0}")]
    HwAccelUnavailable(String),

    #[error("Pipeline error: {0}")]
    Pipeline(String),

    // Storage errors
    #[error("Storage error: {0}")]
    Storage(String),

    #[error("Object not found: {0}")]
    StorageNotFound(String),

    // Cluster errors
    #[error("Cluster error: {0}")]
    Cluster(String),

    #[error("Node not found: {0}")]
    NodeNotFound(String),

    // Configuration errors
    #[error("Configuration error: {0}")]
    Config(String),

    // I/O errors
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    // Generic
    #[error("{0}")]
    Other(String),
}

impl StreamError {
    /// A protocol error of unspecified [`ProtocolErrorKind::Other`] kind.
    pub fn protocol(msg: impl Into<String>) -> Self {
        Self::Protocol {
            kind: ProtocolErrorKind::Other,
            detail: msg.into(),
        }
    }

    /// A protocol error classified with an explicit [`ProtocolErrorKind`].
    pub fn protocol_kind(kind: ProtocolErrorKind, msg: impl Into<String>) -> Self {
        Self::Protocol {
            kind,
            detail: msg.into(),
        }
    }

    pub fn codec(msg: impl Into<String>) -> Self {
        Self::Codec(msg.into())
    }

    pub fn transcode(msg: impl Into<String>) -> Self {
        Self::Transcode(msg.into())
    }

    pub fn storage(msg: impl Into<String>) -> Self {
        Self::Storage(msg.into())
    }

    pub fn cluster(msg: impl Into<String>) -> Self {
        Self::Cluster(msg.into())
    }

    pub fn config(msg: impl Into<String>) -> Self {
        Self::Config(msg.into())
    }

    pub fn other(msg: impl Into<String>) -> Self {
        Self::Other(msg.into())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn protocol_helpers_set_kind_and_render_it() {
        let e = StreamError::protocol("boom");
        assert!(matches!(
            e,
            StreamError::Protocol {
                kind: ProtocolErrorKind::Other,
                ..
            }
        ));
        assert_eq!(e.to_string(), "Protocol error (other): boom");

        let e = StreamError::protocol_kind(ProtocolErrorKind::Timeout, "slow peer");
        assert_eq!(e.to_string(), "Protocol error (timeout): slow peer");
    }

    #[test]
    fn io_errors_convert_via_from() {
        let io = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe");
        let e: StreamError = io.into();
        assert!(matches!(e, StreamError::Io(_)));
    }

    #[test]
    fn structured_variants_carry_context() {
        let e = StreamError::StreamNotFound {
            app: "live".into(),
            stream_id: "cam".into(),
        };
        assert_eq!(
            e.to_string(),
            "Stream 'cam' not found in application 'live'"
        );
    }
}