Skip to main content

arcly_stream/
error.rs

1//! The crate-wide error type.
2
3use thiserror::Error;
4
/// Programmatic classification for [`StreamError::Protocol`].
///
/// Lets callers branch on *what kind* of protocol failure occurred without
/// string-matching the detail message (e.g. retry on `Timeout`, drop the
/// connection on `Malformed`).
///
/// The type is a fieldless `Copy` tag. Besides the equality traits it also
/// derives `Hash`, so a kind can be used directly as a `HashMap`/`HashSet`
/// key (for example to keep per-kind failure counters).
///
/// The enum is `#[non_exhaustive]`: code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ProtocolErrorKind {
    /// The peer sent bytes that violate the wire format.
    Malformed,
    /// A required feature/codec/version is not supported.
    Unsupported,
    /// The operation exceeded its deadline.
    Timeout,
    /// Authentication or stream-key authorization failed.
    Unauthorized,
    /// Anything not covered above.
    Other,
}
24
25impl ProtocolErrorKind {
26    fn as_str(self) -> &'static str {
27        match self {
28            ProtocolErrorKind::Malformed => "malformed",
29            ProtocolErrorKind::Unsupported => "unsupported",
30            ProtocolErrorKind::Timeout => "timeout",
31            ProtocolErrorKind::Unauthorized => "unauthorized",
32            ProtocolErrorKind::Other => "other",
33        }
34    }
35}
36
37#[derive(Debug, Error)]
38#[non_exhaustive]
39pub enum StreamError {
40    // Protocol errors
41    #[error("Protocol error ({kind}): {detail}", kind = kind.as_str())]
42    Protocol {
43        kind: ProtocolErrorKind,
44        detail: String,
45    },
46
47    #[error("Handshake failed: {0}")]
48    Handshake(String),
49
50    #[error("Connection closed unexpectedly")]
51    ConnectionClosed,
52
53    // Stream lifecycle errors
54    #[error("Stream '{stream_id}' not found in application '{app}'")]
55    StreamNotFound { app: String, stream_id: String },
56
57    #[error("Application '{0}' not found")]
58    AppNotFound(String),
59
60    #[error("Application '{0}' is already registered")]
61    AppAlreadyRegistered(String),
62
63    #[error("Unauthorized: {0}")]
64    Unauthorized(String),
65
66    #[error("Stream '{stream_id}' is already publishing in application '{app}'")]
67    StreamAlreadyPublishing { app: String, stream_id: String },
68
69    #[error("Publisher limit reached ({limit} active streams); rejecting new publish")]
70    PublisherLimitReached { limit: usize },
71
72    // Codec errors
73    #[error("Unsupported codec: {0:?}")]
74    UnsupportedCodec(String),
75
76    #[error("Codec error: {0}")]
77    Codec(String),
78
79    // Transcode errors
80    #[error("Transcoding error: {0}")]
81    Transcode(String),
82
83    #[error("Hardware acceleration unavailable: {0}")]
84    HwAccelUnavailable(String),
85
86    #[error("Pipeline error: {0}")]
87    Pipeline(String),
88
89    // Storage errors
90    #[error("Storage error: {0}")]
91    Storage(String),
92
93    #[error("Object not found: {0}")]
94    StorageNotFound(String),
95
96    // Cluster errors
97    #[error("Cluster error: {0}")]
98    Cluster(String),
99
100    #[error("Node not found: {0}")]
101    NodeNotFound(String),
102
103    // Configuration errors
104    #[error("Configuration error: {0}")]
105    Config(String),
106
107    // I/O errors
108    #[error("I/O error: {0}")]
109    Io(#[from] std::io::Error),
110
111    // Generic
112    #[error("{0}")]
113    Other(String),
114}
115
116impl StreamError {
117    /// A protocol error of unspecified [`ProtocolErrorKind::Other`] kind.
118    pub fn protocol(msg: impl Into<String>) -> Self {
119        Self::Protocol {
120            kind: ProtocolErrorKind::Other,
121            detail: msg.into(),
122        }
123    }
124
125    /// A protocol error classified with an explicit [`ProtocolErrorKind`].
126    pub fn protocol_kind(kind: ProtocolErrorKind, msg: impl Into<String>) -> Self {
127        Self::Protocol {
128            kind,
129            detail: msg.into(),
130        }
131    }
132
133    pub fn codec(msg: impl Into<String>) -> Self {
134        Self::Codec(msg.into())
135    }
136
137    pub fn transcode(msg: impl Into<String>) -> Self {
138        Self::Transcode(msg.into())
139    }
140
141    pub fn storage(msg: impl Into<String>) -> Self {
142        Self::Storage(msg.into())
143    }
144
145    pub fn cluster(msg: impl Into<String>) -> Self {
146        Self::Cluster(msg.into())
147    }
148
149    pub fn config(msg: impl Into<String>) -> Self {
150        Self::Config(msg.into())
151    }
152
153    pub fn other(msg: impl Into<String>) -> Self {
154        Self::Other(msg.into())
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// `protocol` defaults the kind to `Other`, `protocol_kind` keeps the
    /// supplied kind, and both render the kind label in the message.
    #[test]
    fn protocol_helpers_set_kind_and_render_it() {
        let untyped = StreamError::protocol("boom");
        let defaults_to_other = matches!(
            untyped,
            StreamError::Protocol {
                kind: ProtocolErrorKind::Other,
                ..
            }
        );
        assert!(defaults_to_other);
        assert_eq!(format!("{}", untyped), "Protocol error (other): boom");

        let timed_out = StreamError::protocol_kind(ProtocolErrorKind::Timeout, "slow peer");
        assert_eq!(
            format!("{}", timed_out),
            "Protocol error (timeout): slow peer"
        );
    }

    /// A `std::io::Error` converts into `StreamError::Io` through `From`.
    #[test]
    fn io_errors_convert_via_from() {
        let broken_pipe = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe");
        let converted = StreamError::from(broken_pipe);
        assert!(matches!(converted, StreamError::Io(_)));
    }

    /// Struct-like variants interpolate their named fields into the message.
    #[test]
    fn structured_variants_carry_context() {
        let app = String::from("live");
        let stream_id = String::from("cam");
        let missing = StreamError::StreamNotFound { app, stream_id };
        let rendered = missing.to_string();
        assert_eq!(rendered, "Stream 'cam' not found in application 'live'");
    }
}