Skip to main content

arcly_stream/
error.rs

1//! The crate-wide error type.
2
3use thiserror::Error;
4
/// Programmatic classification for [`StreamError::Protocol`].
///
/// Lets callers branch on *what kind* of protocol failure occurred without
/// string-matching the detail message (e.g. retry on `Timeout`, drop the
/// connection on `Malformed`).
///
/// The type is `Copy` and `Hash`, so it can be used directly as a map or set
/// key (for example to keep a per-kind failure counter), and it implements
/// [`Display`](std::fmt::Display), rendering the same lowercase label that
/// appears inside the [`StreamError::Protocol`] message.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ProtocolErrorKind {
    /// The peer sent bytes that violate the wire format.
    Malformed,
    /// A required feature/codec/version is not supported.
    Unsupported,
    /// The operation exceeded its deadline.
    Timeout,
    /// Authentication or stream-key authorization failed.
    Unauthorized,
    /// Anything not covered above.
    Other,
}

impl ProtocolErrorKind {
    /// Returns the lowercase label for this kind.
    ///
    /// This is the text interpolated into the rendered
    /// [`StreamError::Protocol`] message, so changing a label here changes
    /// that message too.
    const fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must fail to compile here
        // until it is given a label.
        match self {
            Self::Malformed => "malformed",
            Self::Unsupported => "unsupported",
            Self::Timeout => "timeout",
            Self::Unauthorized => "unauthorized",
            Self::Other => "other",
        }
    }
}

impl std::fmt::Display for ProtocolErrorKind {
    /// Writes the lowercase label (see `as_str`).
    ///
    /// Uses [`Formatter::pad`](std::fmt::Formatter::pad) so width, fill and
    /// alignment flags such as `{:>12}` are honoured.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}
36
/// The crate-wide error type returned by every fallible operation (see the
/// [`Result`](crate::Result) alias).
///
/// Each variant's `#[error("...")]` attribute is its `Display` text. The enum
/// is `#[non_exhaustive]`, so `match` expressions written outside this crate
/// must include a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum StreamError {
    /// A wire-protocol failure, classified by [`ProtocolErrorKind`].
    ///
    /// The rendered message embeds the kind's lowercase label followed by
    /// `detail`, e.g. `Protocol error (timeout): slow peer`.
    #[error("Protocol error ({kind}): {detail}", kind = kind.as_str())]
    Protocol {
        /// What kind of protocol failure occurred.
        kind: ProtocolErrorKind,
        /// Human-readable detail for logs.
        detail: String,
    },

    /// A protocol handshake did not complete successfully.
    ///
    /// The payload is a free-form description appended to the message.
    #[error("Handshake failed: {0}")]
    Handshake(String),

    /// The peer closed the connection before the operation finished.
    #[error("Connection closed unexpectedly")]
    ConnectionClosed,

    /// No live stream matches the requested key in the given application.
    #[error("Stream '{stream_id}' not found in application '{app}'")]
    StreamNotFound {
        /// Application name searched.
        app: String,
        /// Stream id searched.
        stream_id: String,
    },

    /// No application is registered under this name.
    ///
    /// The payload is the application name that was looked up.
    #[error("Application '{0}' not found")]
    AppNotFound(String),

    /// An application with this name is already registered.
    ///
    /// The payload is the conflicting application name.
    #[error("Application '{0}' is already registered")]
    AppAlreadyRegistered(String),

    /// Authorization denied for the attempted publish or play.
    ///
    /// The payload is a free-form description appended to the message.
    #[error("Unauthorized: {0}")]
    Unauthorized(String),

    /// A live publisher already holds this stream key.
    #[error("Stream '{stream_id}' is already publishing in application '{app}'")]
    StreamAlreadyPublishing {
        /// Application name.
        app: String,
        /// Stream id already publishing.
        stream_id: String,
    },

    /// The engine-wide concurrent-publisher cap was reached.
    #[error("Publisher limit reached ({limit} active streams); rejecting new publish")]
    PublisherLimitReached {
        /// The configured cap that was hit.
        limit: usize,
    },

    /// The codec is recognized but not supported by this build.
    ///
    /// Note the `{0:?}` specifier: the payload is rendered with `Debug`
    /// formatting, so it appears double-quoted and escaped in the message
    /// (unlike the other `String` variants, which use plain `Display`).
    #[error("Unsupported codec: {0:?}")]
    UnsupportedCodec(String),

    /// A codec parse/bitstream error.
    #[error("Codec error: {0}")]
    Codec(String),

    /// A transcoding pipeline error.
    #[error("Transcoding error: {0}")]
    Transcode(String),

    /// A requested hardware-acceleration backend is unavailable.
    #[error("Hardware acceleration unavailable: {0}")]
    HwAccelUnavailable(String),

    /// A generic media-pipeline error.
    #[error("Pipeline error: {0}")]
    Pipeline(String),

    /// A storage-backend operation failed.
    #[error("Storage error: {0}")]
    Storage(String),

    /// The requested object does not exist in the storage backend.
    #[error("Object not found: {0}")]
    StorageNotFound(String),

    /// A cluster/federation operation failed.
    #[error("Cluster error: {0}")]
    Cluster(String),

    /// No cluster node matches the requested address.
    #[error("Node not found: {0}")]
    NodeNotFound(String),

    /// A configuration value was missing or invalid.
    #[error("Configuration error: {0}")]
    Config(String),

    /// An underlying I/O error (auto-converted via `?`).
    ///
    /// `#[from]` provides the `From<std::io::Error>` conversion that makes
    /// `?` and `.into()` work; the wrapped error's own message is appended
    /// to the rendered text.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Anything not covered by a more specific variant.
    ///
    /// The payload is rendered verbatim, with no prefix.
    #[error("{0}")]
    Other(String),
}
145
146impl StreamError {
147    /// A protocol error of unspecified [`ProtocolErrorKind::Other`] kind.
148    pub fn protocol(msg: impl Into<String>) -> Self {
149        Self::Protocol {
150            kind: ProtocolErrorKind::Other,
151            detail: msg.into(),
152        }
153    }
154
155    /// A protocol error classified with an explicit [`ProtocolErrorKind`].
156    pub fn protocol_kind(kind: ProtocolErrorKind, msg: impl Into<String>) -> Self {
157        Self::Protocol {
158            kind,
159            detail: msg.into(),
160        }
161    }
162
163    /// A [`Codec`](Self::Codec) error.
164    pub fn codec(msg: impl Into<String>) -> Self {
165        Self::Codec(msg.into())
166    }
167
168    /// A [`Transcode`](Self::Transcode) error.
169    pub fn transcode(msg: impl Into<String>) -> Self {
170        Self::Transcode(msg.into())
171    }
172
173    /// A [`Storage`](Self::Storage) error.
174    pub fn storage(msg: impl Into<String>) -> Self {
175        Self::Storage(msg.into())
176    }
177
178    /// A [`Cluster`](Self::Cluster) error.
179    pub fn cluster(msg: impl Into<String>) -> Self {
180        Self::Cluster(msg.into())
181    }
182
183    /// A [`Config`](Self::Config) error.
184    pub fn config(msg: impl Into<String>) -> Self {
185        Self::Config(msg.into())
186    }
187
188    /// An [`Other`](Self::Other) error.
189    pub fn other(msg: impl Into<String>) -> Self {
190        Self::Other(msg.into())
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// Both protocol constructors must record the kind and surface its label
    /// in the rendered message.
    #[test]
    fn protocol_helpers_set_kind_and_render_it() {
        let unclassified = StreamError::protocol("boom");
        assert!(matches!(
            unclassified,
            StreamError::Protocol {
                kind: ProtocolErrorKind::Other,
                ..
            }
        ));
        let rendered = unclassified.to_string();
        assert_eq!(rendered, "Protocol error (other): boom");

        let timed_out = StreamError::protocol_kind(ProtocolErrorKind::Timeout, "slow peer");
        let rendered = timed_out.to_string();
        assert_eq!(rendered, "Protocol error (timeout): slow peer");
    }

    /// A `std::io::Error` must convert into the `Io` variant.
    #[test]
    fn io_errors_convert_via_from() {
        let broken_pipe = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe");
        let converted = StreamError::from(broken_pipe);
        assert!(matches!(converted, StreamError::Io(_)));
    }

    /// Struct-style variants must interpolate their named fields into the
    /// message.
    #[test]
    fn structured_variants_carry_context() {
        let missing = StreamError::StreamNotFound {
            app: String::from("live"),
            stream_id: String::from("cam"),
        };
        let rendered = missing.to_string();
        assert_eq!(rendered, "Stream 'cam' not found in application 'live'");
    }
}