armature_messaging/
error.rs

//! Error types for messaging operations.
2
3use thiserror::Error;
4
5/// Errors that can occur during messaging operations
6#[derive(Error, Debug)]
7pub enum MessagingError {
8    /// Failed to connect to the broker
9    #[error("Connection failed: {0}")]
10    Connection(String),
11
12    /// Failed to publish a message
13    #[error("Publish failed: {0}")]
14    Publish(String),
15
16    /// Failed to subscribe to a topic/queue
17    #[error("Subscribe failed: {0}")]
18    Subscribe(String),
19
20    /// Failed to acknowledge a message
21    #[error("Acknowledge failed: {0}")]
22    Acknowledge(String),
23
24    /// Failed to serialize a message
25    #[error("Serialization failed: {0}")]
26    Serialization(String),
27
28    /// Failed to deserialize a message
29    #[error("Deserialization failed: {0}")]
30    Deserialization(String),
31
32    /// Operation timed out
33    #[error("Operation timed out: {0}")]
34    Timeout(String),
35
36    /// Authentication failed
37    #[error("Authentication failed: {0}")]
38    Authentication(String),
39
40    /// Authorization failed
41    #[error("Authorization failed: {0}")]
42    Authorization(String),
43
44    /// Invalid configuration
45    #[error("Invalid configuration: {0}")]
46    Configuration(String),
47
48    /// Channel/connection is closed
49    #[error("Channel closed: {0}")]
50    ChannelClosed(String),
51
52    /// Queue/topic not found
53    #[error("Queue/topic not found: {0}")]
54    NotFound(String),
55
56    /// Queue/topic already exists
57    #[error("Queue/topic already exists: {0}")]
58    AlreadyExists(String),
59
60    /// Resource exhausted (e.g., too many connections)
61    #[error("Resource exhausted: {0}")]
62    ResourceExhausted(String),
63
64    /// Message was rejected by the broker
65    #[error("Message rejected: {0}")]
66    Rejected(String),
67
68    /// Message expired (TTL exceeded)
69    #[error("Message expired: {0}")]
70    Expired(String),
71
72    /// Internal broker error
73    #[error("Broker error: {0}")]
74    BrokerError(String),
75
76    /// IO error
77    #[error("IO error: {0}")]
78    Io(#[from] std::io::Error),
79
80    /// Other errors
81    #[error("{0}")]
82    Other(String),
83}
84
85impl MessagingError {
86    /// Check if this error is retryable
87    pub fn is_retryable(&self) -> bool {
88        matches!(
89            self,
90            MessagingError::Connection(_)
91                | MessagingError::Timeout(_)
92                | MessagingError::ChannelClosed(_)
93                | MessagingError::ResourceExhausted(_)
94                | MessagingError::BrokerError(_)
95        )
96    }
97
98    /// Check if this error indicates a connection issue
99    pub fn is_connection_error(&self) -> bool {
100        matches!(
101            self,
102            MessagingError::Connection(_)
103                | MessagingError::ChannelClosed(_)
104                | MessagingError::Authentication(_)
105        )
106    }
107}
108
/// Converts a `lapin::Error` into a [`MessagingError`].
///
/// `IOError` and `InvalidConnectionState` become `Connection`,
/// `ChannelsLimitReached` becomes `ResourceExhausted`, `InvalidChannelState`
/// becomes `ChannelClosed`, and anything else becomes `BrokerError`. The
/// stored message is always the source error's `Display` text.
#[cfg(feature = "rabbitmq")]
impl From<lapin::Error> for MessagingError {
    fn from(err: lapin::Error) -> Self {
        // Format once; every arm stores the same text.
        let message = err.to_string();
        match &err {
            lapin::Error::IOError(_) | lapin::Error::InvalidConnectionState(_) => {
                Self::Connection(message)
            }
            lapin::Error::ChannelsLimitReached => Self::ResourceExhausted(message),
            lapin::Error::InvalidChannelState(_) => Self::ChannelClosed(message),
            _ => Self::BrokerError(message),
        }
    }
}
123
/// Converts an `rdkafka` `KafkaError` into a [`MessagingError`].
///
/// Mapping (the stored message is always the source error's `Display` text):
/// - `MessageProduction` -> `Publish`
/// - `MessageConsumption`, `Subscription` -> `Subscribe`
/// - `ClientCreation` -> `Connection`
/// - `ClientConfig` -> `Configuration`
/// - anything else -> `BrokerError`
///
/// `ClientConfig` and `Subscription` are mapped explicitly so they are not
/// reported as a `BrokerError`, which `is_retryable` treats as retryable.
#[cfg(feature = "kafka")]
impl From<rdkafka::error::KafkaError> for MessagingError {
    fn from(err: rdkafka::error::KafkaError) -> Self {
        use rdkafka::error::KafkaError;

        // Format once; every arm stores the same text.
        let message = err.to_string();
        match &err {
            KafkaError::MessageProduction(_) => Self::Publish(message),
            KafkaError::MessageConsumption(_) | KafkaError::Subscription(_) => {
                Self::Subscribe(message)
            }
            KafkaError::ClientCreation(_) => Self::Connection(message),
            // `..` keeps the pattern independent of the variant's field count.
            KafkaError::ClientConfig(..) => Self::Configuration(message),
            _ => Self::BrokerError(message),
        }
    }
}
141
/// Maps an `async_nats::ConnectError` to [`MessagingError::Connection`],
/// storing the source error's `Display` text as the message.
#[cfg(feature = "nats")]
impl From<async_nats::ConnectError> for MessagingError {
    fn from(err: async_nats::ConnectError) -> Self {
        Self::Connection(err.to_string())
    }
}
148
/// Maps an `async_nats::PublishError` to [`MessagingError::Publish`],
/// storing the source error's `Display` text as the message.
#[cfg(feature = "nats")]
impl From<async_nats::PublishError> for MessagingError {
    fn from(err: async_nats::PublishError) -> Self {
        Self::Publish(err.to_string())
    }
}
155
/// Maps an `async_nats::SubscribeError` to [`MessagingError::Subscribe`],
/// storing the source error's `Display` text as the message.
#[cfg(feature = "nats")]
impl From<async_nats::SubscribeError> for MessagingError {
    fn from(err: async_nats::SubscribeError) -> Self {
        Self::Subscribe(err.to_string())
    }
}