Skip to main content

rivven_client/
error.rs

1use thiserror::Error;
2
/// Error type for the client.
///
/// Each variant's `#[error(...)]` attribute defines its `Display` output.
/// Use [`Error::is_retriable`] to decide whether an operation that failed
/// with a given error is worth retrying.
#[derive(Error, Debug)]
pub enum Error {
    /// Connection failure, described by the contained message.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// Configuration problem, described by the contained message.
    #[error("Configuration error: {0}")]
    ConfigError(String),

    /// I/O failure, stored as the `std::io::ErrorKind` plus the error's
    /// message text. Produced by the `From<std::io::Error>` impl.
    #[error("IO error ({0:?}): {1}")]
    IoError(std::io::ErrorKind, String),

    /// Serialization failure; converts automatically from `postcard::Error`.
    #[error("Serialization error: {0}")]
    SerializationError(#[from] postcard::Error),

    /// Protocol failure; converts automatically from
    /// `rivven_protocol::ProtocolError`.
    #[error("Protocol error: {0}")]
    ProtocolError(#[from] rivven_protocol::ProtocolError),

    /// Error reported by the server. `is_retriable` inspects the start of the
    /// message for known error-code prefixes.
    #[error("Server error: {0}")]
    ServerError(String),

    /// Authentication was rejected; the message carries the reason.
    #[error("Authentication failed: {0}")]
    AuthenticationFailed(String),

    /// A response could not be interpreted.
    #[error("Invalid response")]
    InvalidResponse,

    /// A response exceeded the size limit: `(size in bytes, maximum)`.
    #[error("Response too large: {0} bytes (max: {1})")]
    ResponseTooLarge(usize, usize),

    /// A request exceeded the size limit: `(size in bytes, maximum)`.
    #[error("Request too large: {0} bytes (max: {1})")]
    RequestTooLarge(usize, usize),

    /// The circuit breaker is open for the named server.
    #[error("Circuit breaker open for server: {0}")]
    CircuitBreakerOpen(String),

    /// The connection pool is exhausted; the message carries details.
    #[error("Connection pool exhausted: {0}")]
    PoolExhausted(String),

    /// No server is available.
    #[error("All servers unavailable")]
    AllServersUnavailable,

    /// A request timed out (no further detail).
    #[error("Request timeout")]
    Timeout,

    /// A timeout with a message describing what timed out.
    #[error("Timeout: {0}")]
    TimeoutWithMessage(String),

    /// Any other error; the message is displayed verbatim.
    #[error("{0}")]
    Other(String),
}
53
54impl Error {
55    /// Classify whether this error is retriable.
56    ///
57    /// **Permanent errors** (not retriable) include authentication failures,
58    /// configuration errors, and server-side rejections for invalid topics,
59    /// producer fencing, authorization failures, etc.
60    ///
61    /// **Transient errors** (retriable) include I/O errors, connection resets,
62    /// timeouts, and generic server errors that may resolve after reconnect.
63    pub fn is_retriable(&self) -> bool {
64        match self {
65            // Permanent — retrying won't help
66            Error::ConfigError(_)
67            | Error::AuthenticationFailed(_)
68            | Error::RequestTooLarge(_, _)
69            | Error::SerializationError(_) => false,
70
71            // ServerError — classify by known error prefixes.
72            // Permanent server-side rejections are not retriable.
73            Error::ServerError(msg) => {
74                let upper = msg.to_uppercase();
75                // Fencing / epoch errors — producer must re-init
76                if upper.starts_with("PRODUCER_FENCED")
77                    || upper.starts_with("INVALID_PRODUCER_EPOCH")
78                    // Authorization failures — credentials won't change on retry
79                    || upper.starts_with("TRANSACTIONAL_ID_AUTHORIZATION_FAILED")
80                    || upper.starts_with("CLUSTER_AUTHORIZATION_FAILED")
81                    || upper.starts_with("TOPIC_AUTHORIZATION_FAILED")
82                    || upper.starts_with("GROUP_AUTHORIZATION_FAILED")
83                    // Topic/partition validation errors — won't resolve without admin action
84                    || upper.starts_with("INVALID_TOPIC")
85                    || upper.starts_with("INVALID_PARTITIONS")
86                    || upper.starts_with("INVALID_REPLICATION_FACTOR")
87                    || upper.starts_with("INVALID_REQUIRED_ACKS")
88                    // Version/feature mismatch — won't change on retry
89                    || upper.starts_with("UNSUPPORTED_VERSION")
90                    || upper.starts_with("UNSUPPORTED_COMPRESSION")
91                    || upper.starts_with("UNSUPPORTED_FOR_MESSAGE_FORMAT")
92                    // Security errors
93                    || upper.starts_with("SECURITY_DISABLED")
94                    || upper.starts_with("ILLEGAL_SASL_STATE")
95                    // Consumer group state errors — need rejoin, not retry
96                    || upper.starts_with("ILLEGAL_GENERATION")
97                    || upper.starts_with("UNKNOWN_MEMBER_ID")
98                    || upper.starts_with("INVALID_SESSION_TIMEOUT")
99                    // Payload errors — won't change on retry
100                    || upper.starts_with("RECORD_TOO_LARGE")
101                    || upper.starts_with("MESSAGE_TOO_LARGE")
102                    // Group / transaction state errors
103                    || upper.starts_with("INVALID_GROUP_ID")
104                    || upper.starts_with("INVALID_TXN_STATE")
105                    || upper.starts_with("INVALID_PRODUCER_ID_MAPPING")
106                {
107                    false
108                } else {
109                    true
110                }
111            }
112
113            // Transient — I/O, connection, pool, timeout errors
114            Error::ConnectionError(_)
115            | Error::IoError(_, _)
116            | Error::ProtocolError(_)
117            | Error::InvalidResponse
118            | Error::ResponseTooLarge(_, _)
119            | Error::CircuitBreakerOpen(_)
120            | Error::PoolExhausted(_)
121            | Error::AllServersUnavailable
122            | Error::Timeout
123            | Error::TimeoutWithMessage(_)
124            | Error::Other(_) => true,
125        }
126    }
127}
128
129impl From<std::io::Error> for Error {
130    fn from(err: std::io::Error) -> Self {
131        Error::IoError(err.kind(), err.to_string())
132    }
133}
134
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Every server error-code prefix that `is_retriable` treats as
    /// permanent, without any trailing detail text.
    const PERMANENT_SERVER_CODES: &[&str] = &[
        "PRODUCER_FENCED",
        "INVALID_PRODUCER_EPOCH",
        "TRANSACTIONAL_ID_AUTHORIZATION_FAILED",
        "CLUSTER_AUTHORIZATION_FAILED",
        "TOPIC_AUTHORIZATION_FAILED",
        "GROUP_AUTHORIZATION_FAILED",
        "INVALID_TOPIC",
        "INVALID_PARTITIONS",
        "INVALID_REPLICATION_FACTOR",
        "INVALID_REQUIRED_ACKS",
        "UNSUPPORTED_VERSION",
        "UNSUPPORTED_COMPRESSION",
        "UNSUPPORTED_FOR_MESSAGE_FORMAT",
        "SECURITY_DISABLED",
        "ILLEGAL_SASL_STATE",
        "ILLEGAL_GENERATION",
        "UNKNOWN_MEMBER_ID",
        "INVALID_SESSION_TIMEOUT",
        "RECORD_TOO_LARGE",
        "MESSAGE_TOO_LARGE",
        "INVALID_GROUP_ID",
        "INVALID_TXN_STATE",
        "INVALID_PRODUCER_ID_MAPPING",
    ];

    #[test]
    fn test_error_display() {
        assert_eq!(
            Error::ConnectionError("refused".to_string()).to_string(),
            "Connection error: refused"
        );
        assert_eq!(
            Error::ConfigError("bad config".to_string()).to_string(),
            "Configuration error: bad config"
        );
        // The I/O kind is rendered with its Debug form, followed by the message.
        assert_eq!(
            Error::IoError(std::io::ErrorKind::NotFound, "missing".to_string()).to_string(),
            "IO error (NotFound): missing"
        );
        assert_eq!(
            Error::ServerError("internal error".to_string()).to_string(),
            "Server error: internal error"
        );
        assert_eq!(
            Error::AuthenticationFailed("bad password".to_string()).to_string(),
            "Authentication failed: bad password"
        );
        assert_eq!(Error::InvalidResponse.to_string(), "Invalid response");
        assert_eq!(
            Error::ResponseTooLarge(1000, 500).to_string(),
            "Response too large: 1000 bytes (max: 500)"
        );
        assert_eq!(
            Error::RequestTooLarge(2000, 1000).to_string(),
            "Request too large: 2000 bytes (max: 1000)"
        );
        assert_eq!(
            Error::CircuitBreakerOpen("server1".to_string()).to_string(),
            "Circuit breaker open for server: server1"
        );
        assert_eq!(
            Error::PoolExhausted("max connections".to_string()).to_string(),
            "Connection pool exhausted: max connections"
        );
        assert_eq!(
            Error::AllServersUnavailable.to_string(),
            "All servers unavailable"
        );
        assert_eq!(Error::Timeout.to_string(), "Request timeout");
        assert_eq!(
            Error::TimeoutWithMessage("connect".to_string()).to_string(),
            "Timeout: connect"
        );
        assert_eq!(
            Error::Other("custom error".to_string()).to_string(),
            "custom error"
        );
    }

    #[test]
    fn test_error_debug() {
        let err = Error::ConnectionError("test".to_string());
        let debug = format!("{:?}", err);
        assert!(debug.contains("ConnectionError"));
        assert!(debug.contains("test"));
    }

    #[test]
    fn test_io_error_from() {
        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file not found");
        let err: Error = io_err.into();
        match &err {
            Error::IoError(kind, msg) => {
                assert_eq!(*kind, std::io::ErrorKind::NotFound);
                assert!(msg.contains("file not found"));
            }
            _ => panic!("Expected IoError"),
        }
    }

    #[test]
    fn test_postcard_error_from() {
        // Create a postcard deserialization error by trying to deserialize invalid data
        let invalid_data: &[u8] = &[];
        let result: std::result::Result<String, postcard::Error> =
            postcard::from_bytes(invalid_data);
        assert!(result.is_err());
        let postcard_err = result.unwrap_err();
        let err: Error = postcard_err.into();
        // Serialization errors are permanent — the same bytes fail the same way
        assert!(!err.is_retriable());
        assert!(matches!(err, Error::SerializationError(_)));
    }

    #[test]
    fn test_result_type() {
        fn returns_ok() -> Result<i32> {
            Ok(42)
        }

        fn returns_err() -> Result<i32> {
            Err(Error::InvalidResponse)
        }

        assert!(returns_ok().is_ok());
        assert_eq!(returns_ok().unwrap(), 42);
        assert!(returns_err().is_err());
    }

    #[test]
    fn test_is_retriable_permanent_errors() {
        // Config errors — never retriable
        assert!(!Error::ConfigError("bad config".into()).is_retriable());

        // Auth failures — never retriable
        assert!(!Error::AuthenticationFailed("bad password".into()).is_retriable());

        // Request too large — never retriable (payload won't shrink)
        assert!(!Error::RequestTooLarge(1_000_000, 65_536).is_retriable());

        // Server-side permanent rejections
        assert!(!Error::ServerError("PRODUCER_FENCED: epoch mismatch".into()).is_retriable());
        assert!(!Error::ServerError("INVALID_TOPIC: bad name".into()).is_retriable());
        assert!(!Error::ServerError("TOPIC_AUTHORIZATION_FAILED".into()).is_retriable());
        assert!(!Error::ServerError("CLUSTER_AUTHORIZATION_FAILED".into()).is_retriable());
        assert!(
            !Error::ServerError("TRANSACTIONAL_ID_AUTHORIZATION_FAILED".into()).is_retriable()
        );
        assert!(!Error::ServerError("INVALID_PRODUCER_EPOCH: stale epoch".into()).is_retriable());
        assert!(!Error::ServerError("UNSUPPORTED_VERSION".into()).is_retriable());
        assert!(!Error::ServerError("UNSUPPORTED_COMPRESSION_TYPE".into()).is_retriable());
        assert!(!Error::ServerError("UNSUPPORTED_FOR_MESSAGE_FORMAT".into()).is_retriable());

        // Consumer group errors — need rejoin, not retry
        assert!(
            !Error::ServerError("ILLEGAL_GENERATION: expected 5, got 3".into()).is_retriable()
        );
        assert!(!Error::ServerError("UNKNOWN_MEMBER_ID: consumer-abc".into()).is_retriable());
        assert!(!Error::ServerError("INVALID_SESSION_TIMEOUT: too large".into()).is_retriable());

        // Payload too large — won't change on retry
        assert!(!Error::ServerError("RECORD_TOO_LARGE: 10485760 bytes".into()).is_retriable());
        assert!(!Error::ServerError("MESSAGE_TOO_LARGE".into()).is_retriable());
        assert!(!Error::ServerError("INVALID_REQUIRED_ACKS".into()).is_retriable());
    }

    #[test]
    fn test_is_retriable_all_permanent_server_codes() {
        // Each known permanent code must be rejected both bare and with detail text
        for code in PERMANENT_SERVER_CODES.iter() {
            assert!(
                !Error::ServerError(code.to_string()).is_retriable(),
                "{} should be permanent",
                code
            );
            assert!(
                !Error::ServerError(format!("{}: details", code)).is_retriable(),
                "{} with details should be permanent",
                code
            );
        }
    }

    #[test]
    fn test_is_retriable_server_prefix_matching() {
        // Prefix matching ignores letter case
        assert!(!Error::ServerError("producer_fenced: epoch mismatch".into()).is_retriable());
        assert!(!Error::ServerError("Invalid_Topic: bad name".into()).is_retriable());

        // The code must be at the very start of the message to count
        assert!(Error::ServerError("broker said INVALID_TOPIC".into()).is_retriable());
        assert!(Error::ServerError(" INVALID_TOPIC".into()).is_retriable());

        // An empty message matches no permanent code
        assert!(Error::ServerError(String::new()).is_retriable());
    }

    #[test]
    fn test_is_retriable_transient_errors() {
        // I/O errors — retriable after reconnect
        assert!(
            Error::IoError(std::io::ErrorKind::ConnectionReset, "reset".into()).is_retriable()
        );
        assert!(Error::ConnectionError("refused".into()).is_retriable());

        // Timeouts — retriable
        assert!(Error::Timeout.is_retriable());
        assert!(Error::TimeoutWithMessage("connect".into()).is_retriable());

        // Pool/circuit — retriable (transient capacity)
        assert!(Error::CircuitBreakerOpen("server1".into()).is_retriable());
        assert!(Error::PoolExhausted("max conns".into()).is_retriable());
        assert!(Error::AllServersUnavailable.is_retriable());

        // Generic server errors — retriable (might be transient load)
        assert!(Error::ServerError("internal error".into()).is_retriable());
        assert!(Error::ServerError("NOT_LEADER_FOR_PARTITION".into()).is_retriable());

        // UNKNOWN_TOPIC is retriable to support auto-topic-creation
        assert!(Error::ServerError("UNKNOWN_TOPIC: topic-1".into()).is_retriable());

        // Invalid response — retriable (might be corruption)
        assert!(Error::InvalidResponse.is_retriable());

        // Oversized responses and uncategorized errors are classified as transient
        assert!(Error::ResponseTooLarge(1000, 500).is_retriable());
        assert!(Error::Other("custom error".into()).is_retriable());
    }
}