use thiserror::Error;
/// Error type for messaging operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute
/// (generated by `thiserror`). Every variant except [`MessagingError::Io`] carries a
/// `String` holding the failure detail, which is interpolated into that text.
#[derive(Error, Debug)]
pub enum MessagingError {
/// A connection could not be established or used.
/// Reported as retryable and as a connection error by the helper methods.
#[error("Connection failed: {0}")]
Connection(String),
/// Publishing a message failed.
#[error("Publish failed: {0}")]
Publish(String),
/// Subscribing failed.
#[error("Subscribe failed: {0}")]
Subscribe(String),
/// Acknowledging a message failed.
#[error("Acknowledge failed: {0}")]
Acknowledge(String),
/// Serialization failed.
#[error("Serialization failed: {0}")]
Serialization(String),
/// Deserialization failed.
#[error("Deserialization failed: {0}")]
Deserialization(String),
/// An operation timed out. Reported as retryable.
#[error("Operation timed out: {0}")]
Timeout(String),
/// Authentication failed.
/// Reported as a connection error, but not as retryable.
#[error("Authentication failed: {0}")]
Authentication(String),
/// Authorization failed.
#[error("Authorization failed: {0}")]
Authorization(String),
/// The supplied configuration is invalid.
#[error("Invalid configuration: {0}")]
Configuration(String),
/// A channel was closed.
/// Reported as retryable and as a connection error.
#[error("Channel closed: {0}")]
ChannelClosed(String),
/// The named queue or topic does not exist.
#[error("Queue/topic not found: {0}")]
NotFound(String),
/// The named queue or topic already exists.
#[error("Queue/topic already exists: {0}")]
AlreadyExists(String),
/// A resource limit was hit. Reported as retryable.
#[error("Resource exhausted: {0}")]
ResourceExhausted(String),
/// A message was rejected.
#[error("Message rejected: {0}")]
Rejected(String),
/// A message expired.
#[error("Message expired: {0}")]
Expired(String),
/// An error reported by the broker; also the fallback category used by the
/// backend error conversions in this file. Reported as retryable.
#[error("Broker error: {0}")]
BrokerError(String),
/// Wraps a `std::io::Error`. The `#[from]` attribute makes `thiserror` generate
/// `From<std::io::Error>`, so `?` converts I/O errors into this variant.
/// Not reported as retryable or as a connection error.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Any other failure; the display text is the payload verbatim.
#[error("{0}")]
Other(String),
}
impl MessagingError {
pub fn is_retryable(&self) -> bool {
matches!(
self,
MessagingError::Connection(_)
| MessagingError::Timeout(_)
| MessagingError::ChannelClosed(_)
| MessagingError::ResourceExhausted(_)
| MessagingError::BrokerError(_)
)
}
pub fn is_connection_error(&self) -> bool {
matches!(
self,
MessagingError::Connection(_)
| MessagingError::ChannelClosed(_)
| MessagingError::Authentication(_)
)
}
}
/// Converts a `lapin` (RabbitMQ) error into a [`MessagingError`] category.
///
/// Mapping:
/// - `IOError`, `InvalidConnectionState` → `Connection`
/// - `ChannelsLimitReached` → `ResourceExhausted`
/// - `InvalidChannelState` → `ChannelClosed`
/// - anything else → `BrokerError`
///
/// The payload is always the source error's `Display` text.
#[cfg(feature = "rabbitmq")]
impl From<lapin::Error> for MessagingError {
    fn from(err: lapin::Error) -> Self {
        use lapin::Error as AmqpError;

        // Render the message once up front; every arm carries the same text.
        let detail = err.to_string();
        match err {
            AmqpError::IOError(_) | AmqpError::InvalidConnectionState(_) => {
                Self::Connection(detail)
            }
            AmqpError::ChannelsLimitReached => Self::ResourceExhausted(detail),
            AmqpError::InvalidChannelState(_) => Self::ChannelClosed(detail),
            _ => Self::BrokerError(detail),
        }
    }
}
/// Converts an `rdkafka` error into a [`MessagingError`] category.
///
/// Mapping:
/// - `MessageProduction` → `Publish`
/// - `MessageConsumption`, `Subscription` → `Subscribe`
/// - `ClientCreation` → `Connection`
/// - `ClientConfig` → `Configuration`
/// - anything else → `BrokerError`
///
/// The payload is always the source error's `Display` text.
#[cfg(feature = "kafka")]
impl From<rdkafka::error::KafkaError> for MessagingError {
    fn from(err: rdkafka::error::KafkaError) -> Self {
        use rdkafka::error::KafkaError;

        // Render the message once up front; every arm carries the same text.
        let detail = err.to_string();
        match err {
            KafkaError::MessageProduction(_) => Self::Publish(detail),
            // Failing to create a subscription is a subscribe failure, not a
            // generic broker fault.
            KafkaError::MessageConsumption(_) | KafkaError::Subscription(_) => {
                Self::Subscribe(detail)
            }
            KafkaError::ClientCreation(_) => Self::Connection(detail),
            // A rejected client configuration cannot succeed on retry, so it must
            // not land in `BrokerError`, which `is_retryable` reports as retryable.
            KafkaError::ClientConfig(..) => Self::Configuration(detail),
            _ => Self::BrokerError(detail),
        }
    }
}
/// Maps a NATS connect failure to [`MessagingError::Connection`], keeping the
/// source error's `Display` text as the payload.
#[cfg(feature = "nats")]
impl From<async_nats::ConnectError> for MessagingError {
    fn from(err: async_nats::ConnectError) -> Self {
        let detail = err.to_string();
        Self::Connection(detail)
    }
}
/// Maps a NATS publish failure to [`MessagingError::Publish`], keeping the
/// source error's `Display` text as the payload.
#[cfg(feature = "nats")]
impl From<async_nats::PublishError> for MessagingError {
    fn from(err: async_nats::PublishError) -> Self {
        let detail = err.to_string();
        Self::Publish(detail)
    }
}
/// Maps a NATS subscribe failure to [`MessagingError::Subscribe`], keeping the
/// source error's `Display` text as the payload.
#[cfg(feature = "nats")]
impl From<async_nats::SubscribeError> for MessagingError {
    fn from(err: async_nats::SubscribeError) -> Self {
        let detail = err.to_string();
        Self::Subscribe(detail)
    }
}