armature_messaging/
error.rs1use thiserror::Error;
4
5#[derive(Error, Debug)]
7pub enum MessagingError {
8 #[error("Connection failed: {0}")]
10 Connection(String),
11
12 #[error("Publish failed: {0}")]
14 Publish(String),
15
16 #[error("Subscribe failed: {0}")]
18 Subscribe(String),
19
20 #[error("Acknowledge failed: {0}")]
22 Acknowledge(String),
23
24 #[error("Serialization failed: {0}")]
26 Serialization(String),
27
28 #[error("Deserialization failed: {0}")]
30 Deserialization(String),
31
32 #[error("Operation timed out: {0}")]
34 Timeout(String),
35
36 #[error("Authentication failed: {0}")]
38 Authentication(String),
39
40 #[error("Authorization failed: {0}")]
42 Authorization(String),
43
44 #[error("Invalid configuration: {0}")]
46 Configuration(String),
47
48 #[error("Channel closed: {0}")]
50 ChannelClosed(String),
51
52 #[error("Queue/topic not found: {0}")]
54 NotFound(String),
55
56 #[error("Queue/topic already exists: {0}")]
58 AlreadyExists(String),
59
60 #[error("Resource exhausted: {0}")]
62 ResourceExhausted(String),
63
64 #[error("Message rejected: {0}")]
66 Rejected(String),
67
68 #[error("Message expired: {0}")]
70 Expired(String),
71
72 #[error("Broker error: {0}")]
74 BrokerError(String),
75
76 #[error("IO error: {0}")]
78 Io(#[from] std::io::Error),
79
80 #[error("{0}")]
82 Other(String),
83}
84
85impl MessagingError {
86 pub fn is_retryable(&self) -> bool {
88 matches!(
89 self,
90 MessagingError::Connection(_)
91 | MessagingError::Timeout(_)
92 | MessagingError::ChannelClosed(_)
93 | MessagingError::ResourceExhausted(_)
94 | MessagingError::BrokerError(_)
95 )
96 }
97
98 pub fn is_connection_error(&self) -> bool {
100 matches!(
101 self,
102 MessagingError::Connection(_)
103 | MessagingError::ChannelClosed(_)
104 | MessagingError::Authentication(_)
105 )
106 }
107}
108
109#[cfg(feature = "rabbitmq")]
110impl From<lapin::Error> for MessagingError {
111 fn from(err: lapin::Error) -> Self {
112 match &err {
113 lapin::Error::IOError(_) => MessagingError::Connection(err.to_string()),
114 lapin::Error::ChannelsLimitReached => {
115 MessagingError::ResourceExhausted(err.to_string())
116 }
117 lapin::Error::InvalidChannelState(_) => MessagingError::ChannelClosed(err.to_string()),
118 lapin::Error::InvalidConnectionState(_) => MessagingError::Connection(err.to_string()),
119 _ => MessagingError::BrokerError(err.to_string()),
120 }
121 }
122}
123
124#[cfg(feature = "kafka")]
125impl From<rdkafka::error::KafkaError> for MessagingError {
126 fn from(err: rdkafka::error::KafkaError) -> Self {
127 match &err {
128 rdkafka::error::KafkaError::MessageProduction(_) => {
129 MessagingError::Publish(err.to_string())
130 }
131 rdkafka::error::KafkaError::MessageConsumption(_) => {
132 MessagingError::Subscribe(err.to_string())
133 }
134 rdkafka::error::KafkaError::ClientCreation(_) => {
135 MessagingError::Connection(err.to_string())
136 }
137 _ => MessagingError::BrokerError(err.to_string()),
138 }
139 }
140}
141
142#[cfg(feature = "nats")]
143impl From<async_nats::ConnectError> for MessagingError {
144 fn from(err: async_nats::ConnectError) -> Self {
145 MessagingError::Connection(err.to_string())
146 }
147}
148
149#[cfg(feature = "nats")]
150impl From<async_nats::PublishError> for MessagingError {
151 fn from(err: async_nats::PublishError) -> Self {
152 MessagingError::Publish(err.to_string())
153 }
154}
155
156#[cfg(feature = "nats")]
157impl From<async_nats::SubscribeError> for MessagingError {
158 fn from(err: async_nats::SubscribeError) -> Self {
159 MessagingError::Subscribe(err.to_string())
160 }
161}