fluxion_error/lib.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
//! Error types for the Fluxion reactive streaming library
7//!
8//! This crate provides a comprehensive error handling system for all Fluxion operations.
9//! It defines a root [`FluxionError`] type with specific variants for different failure modes,
10//! allowing library users to handle errors appropriately.
11//!
12//! # Examples
13//!
14//! ```
15//! use fluxion_error::{FluxionError, Result};
16//!
17//! fn process_data() -> Result<()> {
18//! // Operation that might fail
19//! Err(FluxionError::InvalidState {
20//! message: "Stream not ready".to_string(),
21//! })
22//! }
23//! ```
24
/// Root error type for all Fluxion operations
///
/// This enum encompasses the error conditions that can occur during stream
/// processing, subscription, and channel operations. `Display` and
/// `std::error::Error` are derived through `thiserror`; the `#[error("...")]`
/// attribute on each variant is the message that `Display` produces.
#[derive(Debug, thiserror::Error)]
pub enum FluxionError {
    /// Error acquiring a lock on shared state
    ///
    /// This typically indicates contention or a poisoned mutex.
    /// The context provides details about which lock failed.
    /// `is_recoverable` reports `true` for this variant.
    #[error("Failed to acquire lock: {context}")]
    LockError {
        /// Description of the lock that failed
        context: String,
    },

    /// Channel send operation failed
    ///
    /// This occurs when attempting to send to a channel whose receiver
    /// has been dropped. The variant carries no payload.
    #[error("Channel send failed: receiver dropped")]
    ChannelSendError,

    /// Channel receive operation failed
    ///
    /// This can occur when the channel is closed, empty, or in an invalid state.
    #[error("Channel receive failed: {reason}")]
    ChannelReceiveError {
        /// Specific reason for the receive failure
        reason: String,
    },

    /// Stream processing encountered an error
    ///
    /// This is a general error for stream operations that don't fit
    /// other specific categories. `ResultExt::context` and
    /// `ResultExt::with_context` also produce this variant when they attach
    /// context to a `UserError`.
    #[error("Stream processing error: {context}")]
    StreamProcessingError {
        /// Description of what went wrong during stream processing
        context: String,
    },

    /// User-provided callback function panicked
    ///
    /// When a user-supplied closure or function panics during stream processing,
    /// it's caught and converted to this error variant.
    #[error("User callback panicked: {context}")]
    CallbackPanic {
        /// Information about the panic location and cause
        context: String,
    },

    /// Subscription operation failed
    ///
    /// This encompasses errors during `subscribe_async` or `subscribe_latest_async`
    /// operations, including user callback errors when no error handler is provided.
    #[error("Subscription error: {context}")]
    SubscriptionError {
        /// Details about the subscription failure
        context: String,
    },

    /// Invalid state encountered
    ///
    /// This indicates that an operation was attempted when the stream or channel
    /// was in an inappropriate state.
    #[error("Invalid state: {message}")]
    InvalidState {
        /// Description of the invalid state
        message: String,
    },

    /// Timeout occurred while waiting for an operation
    ///
    /// Used when operations have time limits and they expire. The message
    /// renders `duration` with its `Debug` formatting (`{duration:?}`).
    #[error("Operation timed out after {duration:?}: {operation}")]
    Timeout {
        /// The operation that timed out
        operation: String,
        /// How long we waited
        duration: std::time::Duration,
    },

    /// Stream unexpectedly ended
    ///
    /// This occurs when more items were expected but the stream terminated.
    #[error("Stream ended unexpectedly: expected {expected}, got {actual}")]
    UnexpectedStreamEnd {
        /// Number of items expected
        expected: usize,
        /// Number of items actually received
        actual: usize,
    },

    /// Resource limit exceeded
    ///
    /// This indicates that a buffer, queue, or other bounded resource is full.
    #[error("Resource limit exceeded: {resource} (limit: {limit})")]
    ResourceLimitExceeded {
        /// Name of the resource that hit its limit
        resource: String,
        /// The limit that was exceeded
        limit: usize,
    },

    /// Custom error from user code
    ///
    /// This wraps errors produced by user-provided functions and callbacks,
    /// allowing them to be propagated through the Fluxion error system.
    /// The boxed error is both interpolated into the message (`{0}`) and
    /// exposed as the `source()` of this error (`#[source]`).
    #[error("User error: {0}")]
    UserError(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// Multiple errors occurred
    ///
    /// When processing multiple items in parallel, multiple failures can occur.
    /// This variant aggregates them. The message reports only `count`; the
    /// individual errors are not rendered by `Display`.
    #[error("Multiple errors occurred: {count} errors")]
    MultipleErrors {
        /// Number of errors that occurred
        count: usize,
        /// The individual errors
        ///
        /// NOTE(review): intended to be limited to prevent unbounded growth, but
        /// no cap is enforced by the type itself; `from_user_errors` stores every
        /// error it is given.
        errors: Vec<FluxionError>,
    },
}
149
150impl FluxionError {
151 /// Create a lock error with the given context
152 pub fn lock_error(context: impl Into<String>) -> Self {
153 Self::LockError {
154 context: context.into(),
155 }
156 }
157
158 /// Create a stream processing error with the given context
159 pub fn stream_error(context: impl Into<String>) -> Self {
160 Self::StreamProcessingError {
161 context: context.into(),
162 }
163 }
164
165 /// Create an invalid state error with the given message
166 pub fn invalid_state(message: impl Into<String>) -> Self {
167 Self::InvalidState {
168 message: message.into(),
169 }
170 }
171
172 /// Create a subscription error with the given context
173 pub fn subscription_error(context: impl Into<String>) -> Self {
174 Self::SubscriptionError {
175 context: context.into(),
176 }
177 }
178
179 /// Create a channel receive error with the given reason
180 pub fn channel_receive_error(reason: impl Into<String>) -> Self {
181 Self::ChannelReceiveError {
182 reason: reason.into(),
183 }
184 }
185
186 /// Create a timeout error
187 pub fn timeout(operation: impl Into<String>, duration: std::time::Duration) -> Self {
188 Self::Timeout {
189 operation: operation.into(),
190 duration,
191 }
192 }
193
194 /// Create an unexpected stream end error
195 #[must_use]
196 pub const fn unexpected_end(expected: usize, actual: usize) -> Self {
197 Self::UnexpectedStreamEnd { expected, actual }
198 }
199
200 /// Create a resource limit exceeded error
201 pub fn resource_limit(resource: impl Into<String>, limit: usize) -> Self {
202 Self::ResourceLimitExceeded {
203 resource: resource.into(),
204 limit,
205 }
206 }
207
208 /// Wrap a user error
209 pub fn user_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
210 Self::UserError(Box::new(error))
211 }
212
213 /// Aggregate multiple user errors into a `MultipleErrors` variant
214 ///
215 /// This is useful for collecting errors from stream subscribers that don't have
216 /// error callbacks, allowing them to be propagated as a single error.
217 ///
218 /// # Examples
219 ///
220 /// ```
221 /// use fluxion_error::FluxionError;
222 ///
223 /// #[derive(Debug, thiserror::Error)]
224 /// #[error("Custom error: {msg}")]
225 /// struct CustomError {
226 /// msg: String,
227 /// }
228 ///
229 /// let errors = vec![
230 /// CustomError { msg: "first".to_string() },
231 /// CustomError { msg: "second".to_string() },
232 /// ];
233 ///
234 /// let result = FluxionError::from_user_errors(errors);
235 /// assert!(matches!(result, FluxionError::MultipleErrors { count: 2, .. }));
236 /// ```
237 pub fn from_user_errors<E>(errors: Vec<E>) -> Self
238 where
239 E: std::error::Error + Send + Sync + 'static,
240 {
241 let count = errors.len();
242 let fluxion_errors = errors
243 .into_iter()
244 .map(|e| Self::UserError(Box::new(e)))
245 .collect();
246
247 Self::MultipleErrors {
248 count,
249 errors: fluxion_errors,
250 }
251 }
252
253 /// Check if this is a recoverable error
254 ///
255 /// Some errors indicate transient failures that could succeed on retry.
256 #[must_use]
257 pub const fn is_recoverable(&self) -> bool {
258 matches!(
259 self,
260 Self::LockError { .. } | Self::Timeout { .. } | Self::ResourceLimitExceeded { .. }
261 )
262 }
263
264 /// Check if this error indicates a permanent failure
265 #[must_use]
266 pub const fn is_permanent(&self) -> bool {
267 matches!(
268 self,
269 Self::ChannelSendError | Self::ChannelReceiveError { .. } | Self::InvalidState { .. }
270 )
271 }
272}
273
/// Specialized Result type for Fluxion operations
///
/// This is a type alias for `std::result::Result<T, FluxionError>`, providing
/// a convenient shorthand for functions that return Fluxion errors.
/// Importing it shadows the prelude `Result` in that scope, so code that
/// needs a different error type there must name `std::result::Result` in full.
///
/// # Examples
///
/// ```
/// use fluxion_error::Result;
///
/// fn process() -> Result<String> {
///     Ok("processed".to_string())
/// }
/// ```
pub type Result<T> = std::result::Result<T, FluxionError>;
289
290/// Extension trait for converting errors into `FluxionError`
291///
292/// This trait is automatically implemented for all types that implement
293/// `std::error::Error + Send + Sync + 'static`, allowing easy conversion
294/// to `FluxionError`.
295pub trait IntoFluxionError {
296 /// Convert this error into a `FluxionError` with additional context
297 fn into_fluxion_error(self, context: &str) -> FluxionError;
298
299 /// Convert this error into a `FluxionError` without additional context
300 fn into_fluxion(self) -> FluxionError
301 where
302 Self: Sized,
303 {
304 self.into_fluxion_error("")
305 }
306}
307
308impl<E: std::error::Error + Send + Sync + 'static> IntoFluxionError for E {
309 fn into_fluxion_error(self, _context: &str) -> FluxionError {
310 FluxionError::user_error(self)
311 }
312}
313
/// Helper trait for adding context to `Result`s
///
/// This allows chaining context information onto errors in a fluent style.
/// The implementation in this file applies to any `Result<T, E>` whose error
/// converts into `FluxionError`. It attaches the context only when the
/// converted error is a `UserError`, which is turned into a
/// `StreamProcessingError` reading `"<context>: <original error>"`. Every
/// other variant is returned unchanged.
pub trait ResultExt<T> {
    /// Add context to an error
    ///
    /// Use this form when the context value is already at hand.
    ///
    /// # Errors
    /// Returns `Err(FluxionError)` if the underlying result is `Err`.
    fn context(self, context: impl Into<String>) -> Result<T>;

    /// Add context to an error using a closure (lazy evaluation)
    ///
    /// Use this form when building the context string is costly; the
    /// implementation in this file calls `f` only on the `Err` path.
    ///
    /// # Errors
    /// Returns `Err(FluxionError)` if the underlying result is `Err`.
    fn with_context<F>(self, f: F) -> Result<T>
    where
        F: FnOnce() -> String;
}
332
333impl<T, E> ResultExt<T> for std::result::Result<T, E>
334where
335 E: Into<FluxionError>,
336{
337 fn context(self, context: impl Into<String>) -> Result<T> {
338 self.map_err(|e| {
339 let context = context.into();
340 match e.into() {
341 FluxionError::UserError(inner) => FluxionError::StreamProcessingError {
342 context: format!("{context}: {inner}"),
343 },
344 other => other,
345 }
346 })
347 }
348
349 fn with_context<F>(self, f: F) -> Result<T>
350 where
351 F: FnOnce() -> String,
352 {
353 self.map_err(|e| {
354 let context = f();
355 match e.into() {
356 FluxionError::UserError(inner) => FluxionError::StreamProcessingError {
357 context: format!("{context}: {inner}"),
358 },
359 other => other,
360 }
361 })
362 }
363}