#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
/// Error type whose variants cover locking, channel, stream-processing,
/// subscription, state, timeout, resource-limit and user-supplied failures.
///
/// The `Display` text of every variant comes from its `#[error("...")]`
/// attribute (thiserror derive); `Debug` is derived.
#[derive(Debug, thiserror::Error)]
pub enum FluxionError {
/// A lock could not be acquired.
#[error("Failed to acquire lock: {context}")]
LockError {
/// Free-form text interpolated into the message.
context: String,
},
/// Sending on a channel failed; the message states that the receiver was dropped.
#[error("Channel send failed: receiver dropped")]
ChannelSendError,
/// Receiving from a channel failed.
#[error("Channel receive failed: {reason}")]
ChannelReceiveError {
/// Free-form reason interpolated into the message.
reason: String,
},
/// An error occurred while processing a stream.
#[error("Stream processing error: {context}")]
StreamProcessingError {
/// Free-form text interpolated into the message.
context: String,
},
/// A user-provided callback panicked.
#[error("User callback panicked: {context}")]
CallbackPanic {
/// Free-form text interpolated into the message.
context: String,
},
/// A subscription-related failure.
#[error("Subscription error: {context}")]
SubscriptionError {
/// Free-form text interpolated into the message.
context: String,
},
/// Something was found in an invalid state.
#[error("Invalid state: {message}")]
InvalidState {
/// Free-form text interpolated into the message.
message: String,
},
/// An operation timed out.
#[error("Operation timed out after {duration:?}: {operation}")]
Timeout {
/// Name or description of the operation, shown after the duration.
operation: String,
/// Duration reported in the message, rendered with its `Debug` formatting (`{duration:?}`).
duration: std::time::Duration,
},
/// A stream ended before the expected amount was seen.
#[error("Stream ended unexpectedly: expected {expected}, got {actual}")]
UnexpectedStreamEnd {
/// The expected amount reported in the message (presumably an item count — confirm against callers).
expected: usize,
/// The amount actually observed, as reported in the message.
actual: usize,
},
/// A resource limit was exceeded.
#[error("Resource limit exceeded: {resource} (limit: {limit})")]
ResourceLimitExceeded {
/// Name or description of the resource.
resource: String,
/// The limit value reported in the message.
limit: usize,
},
/// Wraps an arbitrary boxed error.
///
/// The boxed error's `Display` output is embedded in the message (`{0}`), and
/// the `#[source]` attribute marks it as this error's source.
#[error("User error: {0}")]
UserError(#[source] Box<dyn std::error::Error + Send + Sync>),
/// Aggregates several errors into one.
///
/// Only `count` appears in the `Display` message; the individual errors are
/// carried in `errors` but not printed.
#[error("Multiple errors occurred: {count} errors")]
MultipleErrors {
/// Number of errors shown in the message. `from_user_errors` sets it to the
/// length of `errors`; nothing in the type itself enforces that the two agree.
count: usize,
/// The aggregated errors.
errors: Vec<FluxionError>,
},
}
/// Constructors and classification helpers for [`FluxionError`].
impl FluxionError {
    /// Creates a [`FluxionError::LockError`] from anything convertible to `String`.
    pub fn lock_error(context: impl Into<String>) -> Self {
        let context: String = context.into();
        Self::LockError { context }
    }

    /// Creates a [`FluxionError::StreamProcessingError`] from anything convertible to `String`.
    pub fn stream_error(context: impl Into<String>) -> Self {
        let context: String = context.into();
        Self::StreamProcessingError { context }
    }

    /// Creates a [`FluxionError::InvalidState`] from anything convertible to `String`.
    pub fn invalid_state(message: impl Into<String>) -> Self {
        let message: String = message.into();
        Self::InvalidState { message }
    }

    /// Creates a [`FluxionError::SubscriptionError`] from anything convertible to `String`.
    pub fn subscription_error(context: impl Into<String>) -> Self {
        let context: String = context.into();
        Self::SubscriptionError { context }
    }

    /// Creates a [`FluxionError::ChannelReceiveError`] from anything convertible to `String`.
    pub fn channel_receive_error(reason: impl Into<String>) -> Self {
        let reason: String = reason.into();
        Self::ChannelReceiveError { reason }
    }

    /// Creates a [`FluxionError::Timeout`] for `operation` with the given `duration`.
    pub fn timeout(operation: impl Into<String>, duration: std::time::Duration) -> Self {
        let operation: String = operation.into();
        Self::Timeout {
            duration,
            operation,
        }
    }

    /// Creates a [`FluxionError::UnexpectedStreamEnd`] from the expected and actual amounts.
    #[must_use]
    pub const fn unexpected_end(expected: usize, actual: usize) -> Self {
        Self::UnexpectedStreamEnd { expected, actual }
    }

    /// Creates a [`FluxionError::ResourceLimitExceeded`] for `resource` with the given `limit`.
    pub fn resource_limit(resource: impl Into<String>, limit: usize) -> Self {
        let resource: String = resource.into();
        Self::ResourceLimitExceeded { limit, resource }
    }

    /// Boxes `error` and wraps it in [`FluxionError::UserError`].
    pub fn user_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(error);
        Self::UserError(boxed)
    }

    /// Wraps every element of `errors` in [`FluxionError::UserError`] and bundles
    /// them into a [`FluxionError::MultipleErrors`] whose `count` is the number
    /// of input errors (zero for an empty vector).
    pub fn from_user_errors<E>(errors: Vec<E>) -> Self
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        let wrapped: Vec<Self> = errors.into_iter().map(Self::user_error).collect();
        Self::MultipleErrors {
            count: wrapped.len(),
            errors: wrapped,
        }
    }

    /// Returns `true` only for `LockError`, `Timeout` and `ResourceLimitExceeded`.
    #[must_use]
    pub const fn is_recoverable(&self) -> bool {
        matches!(
            self,
            Self::ResourceLimitExceeded { .. } | Self::Timeout { .. } | Self::LockError { .. }
        )
    }

    /// Returns `true` only for `ChannelSendError`, `ChannelReceiveError` and `InvalidState`.
    ///
    /// Variants listed by neither this method nor [`Self::is_recoverable`]
    /// make both methods return `false`.
    #[must_use]
    pub const fn is_permanent(&self) -> bool {
        matches!(
            self,
            Self::InvalidState { .. } | Self::ChannelReceiveError { .. } | Self::ChannelSendError
        )
    }
}
/// Shorthand for `std::result::Result` with [`FluxionError`] as the error type.
///
/// Note that within this module the alias shadows the prelude `Result`, which
/// is why the standard type is spelled out as `std::result::Result` elsewhere.
pub type Result<T> = std::result::Result<T, FluxionError>;
/// Conversion of a value into a [`FluxionError`].
pub trait IntoFluxionError {
    /// Consumes `self` and produces a [`FluxionError`], given a `context` string.
    ///
    /// Whether and how `context` ends up in the resulting error is decided by
    /// the implementor.
    fn into_fluxion_error(self, context: &str) -> FluxionError;

    /// Consumes `self` and produces a [`FluxionError`] by calling
    /// [`into_fluxion_error`](Self::into_fluxion_error) with an empty context.
    fn into_fluxion(self) -> FluxionError
    where
        Self: Sized,
    {
        const NO_CONTEXT: &str = "";
        self.into_fluxion_error(NO_CONTEXT)
    }
}
/// Blanket implementation: every `Send + Sync + 'static` error type converts
/// into [`FluxionError::UserError`].
impl<E> IntoFluxionError for E
where
    E: std::error::Error + Send + Sync + 'static,
{
    /// Boxes `self` into [`FluxionError::UserError`].
    ///
    /// The `_context` argument is accepted to satisfy the trait signature but
    /// is not used by this implementation.
    fn into_fluxion_error(self, _context: &str) -> FluxionError {
        FluxionError::UserError(Box::new(self))
    }
}
/// Extension methods that turn a value into this module's [`Result`] while
/// supplying a context string.
pub trait ResultExt<T> {
    /// Eager form: the context value is passed in directly and converted to a
    /// `String` by the implementation.
    fn context(self, context: impl Into<String>) -> Result<T>;

    /// Lazy form: the context string is produced by calling the closure `f`.
    fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T>;
}
/// Implements [`ResultExt`] for every `Result` whose error type converts into
/// [`FluxionError`].
impl<T, E> ResultExt<T> for std::result::Result<T, E>
where
    E: Into<FluxionError>,
{
    /// Attaches `context` to a failed result.
    ///
    /// Forwards to [`ResultExt::with_context`] with a closure that converts
    /// `context` into a `String`, so both methods share one mapping rule.
    fn context(self, context: impl Into<String>) -> Result<T> {
        <Self as ResultExt<T>>::with_context(self, || context.into())
    }

    /// Attaches the string produced by `f` to a failed result.
    ///
    /// `Ok` values pass through and `f` is not called. On `Err`, `f` is called
    /// first, then the error is converted into a [`FluxionError`]:
    ///
    /// * a `UserError` becomes a `StreamProcessingError` whose context is
    ///   `"<context>: <inner error>"`;
    /// * any other variant is returned unchanged, i.e. the produced context
    ///   string is discarded.
    fn with_context<F>(self, f: F) -> Result<T>
    where
        F: FnOnce() -> String,
    {
        self.map_err(|source| {
            // The context is built before the conversion, for every error.
            let prefix = f();
            let converted: FluxionError = source.into();
            match converted {
                FluxionError::UserError(inner) => {
                    let context = format!("{prefix}: {inner}");
                    FluxionError::StreamProcessingError { context }
                }
                untouched => untouched,
            }
        })
    }
}