fluxion_error/
lib.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
//! Error types for the Fluxion reactive streaming library
7//!
8//! This crate provides a comprehensive error handling system for all Fluxion operations.
9//! It defines a root [`FluxionError`] type with specific variants for different failure modes,
10//! allowing library users to handle errors appropriately.
11//!
12//! # Examples
13//!
14//! ```
15//! use fluxion_error::{FluxionError, Result};
16//!
17//! fn process_data() -> Result<()> {
18//!     // Operation that might fail
19//!     Err(FluxionError::InvalidState {
20//!         message: "Stream not ready".to_string(),
21//!     })
22//! }
23//! ```
24
/// Root error type for all Fluxion operations
///
/// This enum groups the error conditions reported during stream processing,
/// subscription, and channel operations. `Display` and `std::error::Error`
/// are derived through `thiserror`: the `#[error(...)]` attribute on each
/// variant is the message rendered by `Display`, with `{field}` placeholders
/// filled from that variant's fields.
#[derive(Debug, thiserror::Error)]
pub enum FluxionError {
    /// Error acquiring a lock on shared state
    ///
    /// This typically indicates contention or a poisoned mutex.
    /// `context` is appended verbatim to the rendered message.
    #[error("Failed to acquire lock: {context}")]
    LockError {
        /// Description of the lock that failed
        context: String,
    },

    /// Channel send operation failed
    ///
    /// Intended for sends to a channel whose receiver has been dropped;
    /// the variant carries no data and always renders the same message.
    #[error("Channel send failed: receiver dropped")]
    ChannelSendError,

    /// Channel receive operation failed
    ///
    /// This can occur when the channel is closed, empty, or in an invalid state;
    /// `reason` says which.
    #[error("Channel receive failed: {reason}")]
    ChannelReceiveError {
        /// Specific reason for the receive failure
        reason: String,
    },

    /// Stream processing encountered an error
    ///
    /// This is a general error for stream operations that don't fit
    /// other specific categories. It is also the variant that
    /// `ResultExt::context` / `ResultExt::with_context` produce when they
    /// attach context to a `UserError`.
    #[error("Stream processing error: {context}")]
    StreamProcessingError {
        /// Description of what went wrong during stream processing
        context: String,
    },

    /// User-provided callback function panicked
    ///
    /// Meant for panics raised by a user-supplied closure or function during
    /// stream processing that were caught and converted into an error value.
    #[error("User callback panicked: {context}")]
    CallbackPanic {
        /// Information about the panic location and cause
        context: String,
    },

    /// Subscription operation failed
    ///
    /// This encompasses errors during `subscribe_async` or `subscribe_latest_async`
    /// operations, including user callback errors when no error handler is provided.
    #[error("Subscription error: {context}")]
    SubscriptionError {
        /// Details about the subscription failure
        context: String,
    },

    /// Invalid state encountered
    ///
    /// This indicates that an operation was attempted when the stream or channel
    /// was in an inappropriate state.
    #[error("Invalid state: {message}")]
    InvalidState {
        /// Description of the invalid state
        message: String,
    },

    /// Timeout occurred while waiting for an operation
    ///
    /// Used when operations have time limits and they expire. The duration is
    /// rendered with its `Debug` formatting (`{duration:?}`) in the message.
    #[error("Operation timed out after {duration:?}: {operation}")]
    Timeout {
        /// The operation that timed out
        operation: String,
        /// How long we waited
        duration: std::time::Duration,
    },

    /// Stream unexpectedly ended
    ///
    /// This occurs when more items were expected but the stream terminated.
    #[error("Stream ended unexpectedly: expected {expected}, got {actual}")]
    UnexpectedStreamEnd {
        /// Number of items expected
        expected: usize,
        /// Number of items actually received
        actual: usize,
    },

    /// Resource limit exceeded
    ///
    /// This indicates that a buffer, queue, or other bounded resource is full.
    #[error("Resource limit exceeded: {resource} (limit: {limit})")]
    ResourceLimitExceeded {
        /// Name of the resource that hit its limit
        resource: String,
        /// The limit that was exceeded
        limit: usize,
    },

    /// Custom error from user code
    ///
    /// This wraps errors produced by user-provided functions and callbacks,
    /// allowing them to be propagated through the Fluxion error system.
    /// The boxed error is both interpolated into the message (`{0}`) and,
    /// because of `#[source]`, exposed through `std::error::Error::source`.
    #[error("User error: {0}")]
    UserError(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// Multiple errors occurred
    ///
    /// When processing multiple items in parallel, multiple failures can occur.
    /// This variant aggregates them. Only `count` appears in the rendered
    /// message; the individual errors are not printed by `Display`.
    #[error("Multiple errors occurred: {count} errors")]
    MultipleErrors {
        /// Number of errors that occurred
        count: usize,
        /// The individual errors.
        ///
        /// NOTE(review): earlier docs described this list as "limited to
        /// prevent unbounded growth", but nothing in this file enforces a cap
        /// (`from_user_errors` stores every error it is given) — confirm
        /// whether callers are expected to truncate.
        errors: Vec<FluxionError>,
    },
}
149
150impl FluxionError {
151    /// Create a lock error with the given context
152    pub fn lock_error(context: impl Into<String>) -> Self {
153        Self::LockError {
154            context: context.into(),
155        }
156    }
157
158    /// Create a stream processing error with the given context
159    pub fn stream_error(context: impl Into<String>) -> Self {
160        Self::StreamProcessingError {
161            context: context.into(),
162        }
163    }
164
165    /// Create an invalid state error with the given message
166    pub fn invalid_state(message: impl Into<String>) -> Self {
167        Self::InvalidState {
168            message: message.into(),
169        }
170    }
171
172    /// Create a subscription error with the given context
173    pub fn subscription_error(context: impl Into<String>) -> Self {
174        Self::SubscriptionError {
175            context: context.into(),
176        }
177    }
178
179    /// Create a channel receive error with the given reason
180    pub fn channel_receive_error(reason: impl Into<String>) -> Self {
181        Self::ChannelReceiveError {
182            reason: reason.into(),
183        }
184    }
185
186    /// Create a timeout error
187    pub fn timeout(operation: impl Into<String>, duration: std::time::Duration) -> Self {
188        Self::Timeout {
189            operation: operation.into(),
190            duration,
191        }
192    }
193
194    /// Create an unexpected stream end error
195    #[must_use]
196    pub const fn unexpected_end(expected: usize, actual: usize) -> Self {
197        Self::UnexpectedStreamEnd { expected, actual }
198    }
199
200    /// Create a resource limit exceeded error
201    pub fn resource_limit(resource: impl Into<String>, limit: usize) -> Self {
202        Self::ResourceLimitExceeded {
203            resource: resource.into(),
204            limit,
205        }
206    }
207
208    /// Wrap a user error
209    pub fn user_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
210        Self::UserError(Box::new(error))
211    }
212
213    /// Aggregate multiple user errors into a `MultipleErrors` variant
214    ///
215    /// This is useful for collecting errors from stream subscribers that don't have
216    /// error callbacks, allowing them to be propagated as a single error.
217    ///
218    /// # Examples
219    ///
220    /// ```
221    /// use fluxion_error::FluxionError;
222    ///
223    /// #[derive(Debug, thiserror::Error)]
224    /// #[error("Custom error: {msg}")]
225    /// struct CustomError {
226    ///     msg: String,
227    /// }
228    ///
229    /// let errors = vec![
230    ///     CustomError { msg: "first".to_string() },
231    ///     CustomError { msg: "second".to_string() },
232    /// ];
233    ///
234    /// let result = FluxionError::from_user_errors(errors);
235    /// assert!(matches!(result, FluxionError::MultipleErrors { count: 2, .. }));
236    /// ```
237    pub fn from_user_errors<E>(errors: Vec<E>) -> Self
238    where
239        E: std::error::Error + Send + Sync + 'static,
240    {
241        let count = errors.len();
242        let fluxion_errors = errors
243            .into_iter()
244            .map(|e| Self::UserError(Box::new(e)))
245            .collect();
246
247        Self::MultipleErrors {
248            count,
249            errors: fluxion_errors,
250        }
251    }
252
253    /// Check if this is a recoverable error
254    ///
255    /// Some errors indicate transient failures that could succeed on retry.
256    #[must_use]
257    pub const fn is_recoverable(&self) -> bool {
258        matches!(
259            self,
260            Self::LockError { .. } | Self::Timeout { .. } | Self::ResourceLimitExceeded { .. }
261        )
262    }
263
264    /// Check if this error indicates a permanent failure
265    #[must_use]
266    pub const fn is_permanent(&self) -> bool {
267        matches!(
268            self,
269            Self::ChannelSendError | Self::ChannelReceiveError { .. } | Self::InvalidState { .. }
270        )
271    }
272}
273
/// Specialized `Result` type for Fluxion operations
///
/// This is a type alias for `std::result::Result<T, FluxionError>`, so a
/// function returning Fluxion errors only has to name its success type.
/// Because the alias shares its name with the prelude's `Result`, code that
/// imports it and still needs a different error type must spell out
/// `std::result::Result<T, E>`.
///
/// # Examples
///
/// ```
/// use fluxion_error::Result;
///
/// fn process() -> Result<String> {
///     Ok("processed".to_string())
/// }
/// ```
pub type Result<T> = std::result::Result<T, FluxionError>;
289
290/// Extension trait for converting errors into `FluxionError`
291///
292/// This trait is automatically implemented for all types that implement
293/// `std::error::Error + Send + Sync + 'static`, allowing easy conversion
294/// to `FluxionError`.
295pub trait IntoFluxionError {
296    /// Convert this error into a `FluxionError` with additional context
297    fn into_fluxion_error(self, context: &str) -> FluxionError;
298
299    /// Convert this error into a `FluxionError` without additional context
300    fn into_fluxion(self) -> FluxionError
301    where
302        Self: Sized,
303    {
304        self.into_fluxion_error("")
305    }
306}
307
308impl<E: std::error::Error + Send + Sync + 'static> IntoFluxionError for E {
309    fn into_fluxion_error(self, _context: &str) -> FluxionError {
310        FluxionError::user_error(self)
311    }
312}
313
/// Helper trait for adding context to `Result`s
///
/// This allows chaining context information onto errors in a fluent style.
/// The implementation provided in this file for `std::result::Result<T, E>`
/// (with `E: Into<FluxionError>`) converts the error to `FluxionError` and
/// attaches the context only when that error is a `FluxionError::UserError`;
/// it is then replaced by a `StreamProcessingError` reading
/// `"{context}: {error}"`. Any other variant is returned unchanged and the
/// supplied context is discarded.
pub trait ResultExt<T> {
    /// Add context to an error
    ///
    /// `Ok` values pass through untouched. The context value is converted to
    /// a `String` only on the error path.
    ///
    /// # Errors
    /// Returns `Err(FluxionError)` if the underlying result is `Err`.
    fn context(self, context: impl Into<String>) -> Result<T>;

    /// Add context to an error using a closure (lazy evaluation)
    ///
    /// `f` is not called for `Ok` values, so building an expensive message
    /// costs nothing on the success path.
    ///
    /// # Errors
    /// Returns `Err(FluxionError)` if the underlying result is `Err`.
    fn with_context<F>(self, f: F) -> Result<T>
    where
        F: FnOnce() -> String;
}
332
333impl<T, E> ResultExt<T> for std::result::Result<T, E>
334where
335    E: Into<FluxionError>,
336{
337    fn context(self, context: impl Into<String>) -> Result<T> {
338        self.map_err(|e| {
339            let context = context.into();
340            match e.into() {
341                FluxionError::UserError(inner) => FluxionError::StreamProcessingError {
342                    context: format!("{context}: {inner}"),
343                },
344                other => other,
345            }
346        })
347    }
348
349    fn with_context<F>(self, f: F) -> Result<T>
350    where
351        F: FnOnce() -> String,
352    {
353        self.map_err(|e| {
354            let context = f();
355            match e.into() {
356                FluxionError::UserError(inner) => FluxionError::StreamProcessingError {
357                    context: format!("{context}: {inner}"),
358                },
359                other => other,
360            }
361        })
362    }
363}