1use thiserror::Error;
2
/// Error type used by this module's [`Result`] alias.
///
/// The `Display` text of every variant is generated by `thiserror` from the
/// `#[error("...")]` template attached to it.
#[derive(Error, Debug)]
pub enum Error {
    /// Connection failure; the payload is the detail rendered after `Connection error: `.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// Configuration problem; the payload is the detail rendered after `Configuration error: `.
    #[error("Configuration error: {0}")]
    ConfigError(String),

    /// I/O failure, stored as the `std::io::ErrorKind` plus a message string.
    ///
    /// The `From<std::io::Error>` impl in this file fills these from
    /// `err.kind()` and `err.to_string()`; the kind is rendered with `{:?}`.
    #[error("IO error ({0:?}): {1}")]
    IoError(std::io::ErrorKind, String),

    /// Wraps a `postcard::Error`; `#[from]` generates the `From` conversion so
    /// `?` and `.into()` work on postcard results.
    #[error("Serialization error: {0}")]
    SerializationError(#[from] postcard::Error),

    /// Wraps a `rivven_protocol::ProtocolError`; `#[from]` generates the `From`
    /// conversion.
    #[error("Protocol error: {0}")]
    ProtocolError(#[from] rivven_protocol::ProtocolError),

    /// Server-side error; the payload is the message text.
    ///
    /// `Error::is_retriable` inspects the start of this message to decide
    /// whether the failure is permanent.
    #[error("Server error: {0}")]
    ServerError(String),

    /// Authentication was rejected; the payload is the detail rendered after
    /// `Authentication failed: `.
    #[error("Authentication failed: {0}")]
    AuthenticationFailed(String),

    /// A response could not be accepted; carries no further detail.
    #[error("Invalid response")]
    InvalidResponse,

    /// A response exceeded the allowed size.
    ///
    /// Per the message template, field `0` is the size in bytes and field `1`
    /// is the maximum.
    #[error("Response too large: {0} bytes (max: {1})")]
    ResponseTooLarge(usize, usize),

    /// A request exceeded the allowed size.
    ///
    /// Per the message template, field `0` is the size in bytes and field `1`
    /// is the maximum.
    #[error("Request too large: {0} bytes (max: {1})")]
    RequestTooLarge(usize, usize),

    /// The circuit breaker is open; the payload is rendered as the server it
    /// applies to.
    #[error("Circuit breaker open for server: {0}")]
    CircuitBreakerOpen(String),

    /// The connection pool is exhausted; the payload is the detail rendered
    /// after `Connection pool exhausted: `.
    #[error("Connection pool exhausted: {0}")]
    PoolExhausted(String),

    /// No server is available; carries no further detail.
    #[error("All servers unavailable")]
    AllServersUnavailable,

    /// A request timed out; carries no further detail.
    #[error("Request timeout")]
    Timeout,

    /// A timeout with a description; the payload is rendered after `Timeout: `.
    #[error("Timeout: {0}")]
    TimeoutWithMessage(String),

    /// Catch-all variant; the payload is rendered verbatim with no prefix.
    #[error("{0}")]
    Other(String),
}
53
54impl Error {
55 pub fn is_retriable(&self) -> bool {
64 match self {
65 Error::ConfigError(_)
67 | Error::AuthenticationFailed(_)
68 | Error::RequestTooLarge(_, _)
69 | Error::SerializationError(_) => false,
70
71 Error::ServerError(msg) => {
74 let upper = msg.to_uppercase();
75 if upper.starts_with("PRODUCER_FENCED")
77 || upper.starts_with("INVALID_PRODUCER_EPOCH")
78 || upper.starts_with("TRANSACTIONAL_ID_AUTHORIZATION_FAILED")
80 || upper.starts_with("CLUSTER_AUTHORIZATION_FAILED")
81 || upper.starts_with("TOPIC_AUTHORIZATION_FAILED")
82 || upper.starts_with("GROUP_AUTHORIZATION_FAILED")
83 || upper.starts_with("INVALID_TOPIC")
85 || upper.starts_with("INVALID_PARTITIONS")
86 || upper.starts_with("INVALID_REPLICATION_FACTOR")
87 || upper.starts_with("INVALID_REQUIRED_ACKS")
88 || upper.starts_with("UNSUPPORTED_VERSION")
90 || upper.starts_with("UNSUPPORTED_COMPRESSION")
91 || upper.starts_with("UNSUPPORTED_FOR_MESSAGE_FORMAT")
92 || upper.starts_with("SECURITY_DISABLED")
94 || upper.starts_with("ILLEGAL_SASL_STATE")
95 || upper.starts_with("ILLEGAL_GENERATION")
97 || upper.starts_with("UNKNOWN_MEMBER_ID")
98 || upper.starts_with("INVALID_SESSION_TIMEOUT")
99 || upper.starts_with("RECORD_TOO_LARGE")
101 || upper.starts_with("MESSAGE_TOO_LARGE")
102 || upper.starts_with("INVALID_GROUP_ID")
104 || upper.starts_with("INVALID_TXN_STATE")
105 || upper.starts_with("INVALID_PRODUCER_ID_MAPPING")
106 {
107 false
108 } else {
109 true
110 }
111 }
112
113 Error::ConnectionError(_)
115 | Error::IoError(_, _)
116 | Error::ProtocolError(_)
117 | Error::InvalidResponse
118 | Error::ResponseTooLarge(_, _)
119 | Error::CircuitBreakerOpen(_)
120 | Error::PoolExhausted(_)
121 | Error::AllServersUnavailable
122 | Error::Timeout
123 | Error::TimeoutWithMessage(_)
124 | Error::Other(_) => true,
125 }
126 }
127}
128
129impl From<std::io::Error> for Error {
130 fn from(err: std::io::Error) -> Self {
131 Error::IoError(err.kind(), err.to_string())
132 }
133}
134
/// Shorthand for `std::result::Result` with this module's [`Error`] as the
/// error type.
pub type Result<T> = std::result::Result<T, Error>;
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a real `postcard::Error` by decoding a `String` from empty input.
    fn postcard_decode_error() -> postcard::Error {
        let invalid_data: &[u8] = &[];
        let result: std::result::Result<String, postcard::Error> =
            postcard::from_bytes(invalid_data);
        result.expect_err("decoding a String from empty input must fail")
    }

    /// Each variant's `Display` output matches its `#[error(...)]` template.
    #[test]
    fn test_error_display() {
        let cases = [
            (
                Error::ConnectionError("refused".to_string()),
                "Connection error: refused",
            ),
            (
                Error::ConfigError("missing host".to_string()),
                "Configuration error: missing host",
            ),
            (
                Error::IoError(std::io::ErrorKind::NotFound, "no such file".to_string()),
                "IO error (NotFound): no such file",
            ),
            (
                Error::ServerError("internal error".to_string()),
                "Server error: internal error",
            ),
            (
                Error::AuthenticationFailed("bad password".to_string()),
                "Authentication failed: bad password",
            ),
            (Error::InvalidResponse, "Invalid response"),
            (
                Error::ResponseTooLarge(1000, 500),
                "Response too large: 1000 bytes (max: 500)",
            ),
            (
                Error::RequestTooLarge(2000, 1000),
                "Request too large: 2000 bytes (max: 1000)",
            ),
            (
                Error::CircuitBreakerOpen("server1".to_string()),
                "Circuit breaker open for server: server1",
            ),
            (
                Error::PoolExhausted("max connections".to_string()),
                "Connection pool exhausted: max connections",
            ),
            (Error::AllServersUnavailable, "All servers unavailable"),
            (Error::Timeout, "Request timeout"),
            (
                Error::TimeoutWithMessage("connect".to_string()),
                "Timeout: connect",
            ),
            (Error::Other("custom error".to_string()), "custom error"),
        ];

        for (err, expected) in &cases {
            assert_eq!(err.to_string(), *expected);
        }
    }

    /// The derived `Debug` output names the variant and includes its payload.
    #[test]
    fn test_error_debug() {
        let err = Error::ConnectionError("test".to_string());
        let debug = format!("{:?}", err);
        assert!(debug.contains("ConnectionError"));
        assert!(debug.contains("test"));
    }

    /// `From<std::io::Error>` keeps both the kind and the message.
    #[test]
    fn test_io_error_from() {
        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file not found");
        match Error::from(io_err) {
            Error::IoError(kind, msg) => {
                assert_eq!(kind, std::io::ErrorKind::NotFound);
                assert!(msg.contains("file not found"));
            }
            other => panic!("Expected IoError, got {:?}", other),
        }
    }

    /// `#[from]` on `SerializationError` provides the postcard conversion.
    #[test]
    fn test_postcard_error_from() {
        let err: Error = postcard_decode_error().into();
        assert!(matches!(err, Error::SerializationError(_)));
    }

    /// The `Result<T>` alias works for both `Ok` and `Err` returns.
    #[test]
    fn test_result_type() {
        fn returns_ok() -> Result<i32> {
            Ok(42)
        }

        fn returns_err() -> Result<i32> {
            Err(Error::InvalidResponse)
        }

        assert!(returns_ok().is_ok());
        assert_eq!(returns_ok().unwrap(), 42);
        assert!(returns_err().is_err());
    }

    /// Permanent failures must never be reported as retriable.
    #[test]
    fn test_is_retriable_permanent_errors() {
        let permanent_variants = [
            Error::ConfigError("bad config".into()),
            Error::AuthenticationFailed("bad password".into()),
            Error::RequestTooLarge(1_000_000, 65_536),
        ];
        for err in &permanent_variants {
            assert!(!err.is_retriable(), "{:?} must not be retriable", err);
        }

        // One message per permanent prefix recognised by `is_retriable`.
        let permanent_server_messages = [
            "PRODUCER_FENCED: epoch mismatch",
            "INVALID_PRODUCER_EPOCH: stale epoch",
            "TRANSACTIONAL_ID_AUTHORIZATION_FAILED",
            "CLUSTER_AUTHORIZATION_FAILED",
            "TOPIC_AUTHORIZATION_FAILED",
            "GROUP_AUTHORIZATION_FAILED",
            "INVALID_TOPIC: bad name",
            "INVALID_PARTITIONS",
            "INVALID_REPLICATION_FACTOR",
            "INVALID_REQUIRED_ACKS",
            "UNSUPPORTED_VERSION",
            "UNSUPPORTED_COMPRESSION_TYPE",
            "UNSUPPORTED_FOR_MESSAGE_FORMAT",
            "SECURITY_DISABLED",
            "ILLEGAL_SASL_STATE",
            "ILLEGAL_GENERATION: expected 5, got 3",
            "UNKNOWN_MEMBER_ID: consumer-abc",
            "INVALID_SESSION_TIMEOUT: too large",
            "RECORD_TOO_LARGE: 10485760 bytes",
            "MESSAGE_TOO_LARGE",
            "INVALID_GROUP_ID",
            "INVALID_TXN_STATE",
            "INVALID_PRODUCER_ID_MAPPING",
        ];
        for msg in &permanent_server_messages {
            assert!(
                !Error::ServerError((*msg).to_string()).is_retriable(),
                "ServerError({:?}) must not be retriable",
                msg
            );
        }
    }

    /// A serialization failure is classified as non-retriable.
    #[test]
    fn test_is_retriable_serialization_error() {
        let err = Error::from(postcard_decode_error());
        assert!(!err.is_retriable());
    }

    /// Server codes match case-insensitively, and only at the start of the
    /// message.
    #[test]
    fn test_is_retriable_server_code_matching() {
        assert!(!Error::ServerError("producer_fenced: epoch mismatch".into()).is_retriable());
        assert!(!Error::ServerError("Invalid_Topic: bad name".into()).is_retriable());

        // A permanent code that is not a prefix does not make the error permanent.
        assert!(Error::ServerError("retry after PRODUCER_FENCED cleared".into()).is_retriable());
        assert!(Error::ServerError(" PRODUCER_FENCED".into()).is_retriable());
        assert!(Error::ServerError(String::new()).is_retriable());
    }

    /// Transient failures are reported as retriable.
    #[test]
    fn test_is_retriable_transient_errors() {
        let transient = [
            Error::IoError(std::io::ErrorKind::ConnectionReset, "reset".into()),
            Error::ConnectionError("refused".into()),
            Error::Timeout,
            Error::TimeoutWithMessage("connect".into()),
            Error::CircuitBreakerOpen("server1".into()),
            Error::PoolExhausted("max conns".into()),
            Error::AllServersUnavailable,
            Error::ServerError("internal error".into()),
            Error::ServerError("NOT_LEADER_FOR_PARTITION".into()),
            Error::ServerError("UNKNOWN_TOPIC: topic-1".into()),
            Error::InvalidResponse,
            Error::ResponseTooLarge(1000, 500),
            Error::Other("custom error".into()),
        ];
        for err in &transient {
            assert!(err.is_retriable(), "{:?} must be retriable", err);
        }
    }
}