memberlist_plumtree/
error.rs

1//! Error types for the Plumtree protocol.
2//!
3//! Errors are classified as either **transient** or **permanent**:
4//!
5//! - **Transient errors** may succeed if retried (e.g., network timeouts, temporary unavailability)
6//! - **Permanent errors** will not succeed with retries (e.g., invalid configuration, message too large)
7//!
8//! Use [`Error::is_transient()`] to check if an operation should be retried.
9
10use crate::message::MessageId;
11use std::fmt;
12
13/// Result type alias for Plumtree operations.
14pub type Result<T> = std::result::Result<T, Error>;
15
/// Retry classification attached to every [`Error`].
///
/// The kind answers one question for callers: is it worth trying the
/// operation again? `Hash` is derived alongside the equality traits so a kind
/// can be used directly as a key in `HashMap`/`HashSet` (for example, to keep
/// per-kind error counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorKind {
    /// The failure is temporary; the same operation may succeed if retried.
    ///
    /// Examples: network timeout, temporary peer unavailability, channel full.
    Transient,

    /// The failure is final; retrying the same operation will not help.
    ///
    /// Examples: message too large, invalid configuration, shutdown.
    Permanent,
}
29
30/// Errors that can occur during Plumtree operations.
31#[derive(Debug)]
32pub enum Error {
33    /// Message payload exceeds the configured maximum size.
34    MessageTooLarge {
35        /// Size of the message in bytes.
36        size: usize,
37        /// Maximum allowed size.
38        max_size: usize,
39    },
40
41    /// Message was not found in the cache.
42    MessageNotFound(MessageId),
43
44    /// Failed to encode a message.
45    Encode(String),
46
47    /// Failed to decode a message.
48    Decode(String),
49
50    /// Network send operation failed.
51    Send {
52        /// Target node that we failed to send to.
53        target: String,
54        /// Underlying error message.
55        reason: String,
56    },
57
58    /// The Plumtree instance has been shut down.
59    Shutdown,
60
61    /// No peers available for broadcast.
62    NoPeers,
63
64    /// Internal channel error.
65    Channel(String),
66
67    /// Outgoing message queue is full (backpressure).
68    ///
69    /// This error indicates the system is under load and the caller
70    /// should back off and retry later.
71    QueueFull {
72        /// Number of messages that failed to queue.
73        dropped: usize,
74        /// Current queue capacity.
75        capacity: usize,
76    },
77
78    /// Memberlist operation failed.
79    Memberlist(String),
80
81    /// Configuration error.
82    Config(String),
83
84    /// Generic IO error.
85    Io(std::io::Error),
86
87    /// Custom error with message.
88    Custom(String),
89}
90
91impl fmt::Display for Error {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        match self {
94            Error::MessageTooLarge { size, max_size } => {
95                write!(
96                    f,
97                    "message size ({} bytes) exceeds maximum ({} bytes)",
98                    size, max_size
99                )
100            }
101            Error::MessageNotFound(id) => {
102                write!(f, "message not found in cache: {}", id)
103            }
104            Error::Encode(msg) => {
105                write!(f, "failed to encode message: {}", msg)
106            }
107            Error::Decode(msg) => {
108                write!(f, "failed to decode message: {}", msg)
109            }
110            Error::Send { target, reason } => {
111                write!(f, "failed to send to {}: {}", target, reason)
112            }
113            Error::Shutdown => {
114                write!(f, "plumtree instance has been shut down")
115            }
116            Error::NoPeers => {
117                write!(f, "no peers available for broadcast")
118            }
119            Error::Channel(msg) => {
120                write!(f, "channel error: {}", msg)
121            }
122            Error::QueueFull { dropped, capacity } => {
123                write!(
124                    f,
125                    "outgoing message queue is full ({} messages dropped, capacity {})",
126                    dropped, capacity
127                )
128            }
129            Error::Memberlist(msg) => {
130                write!(f, "memberlist error: {}", msg)
131            }
132            Error::Config(msg) => {
133                write!(f, "configuration error: {}", msg)
134            }
135            Error::Io(err) => {
136                write!(f, "IO error: {}", err)
137            }
138            Error::Custom(msg) => {
139                write!(f, "{}", msg)
140            }
141        }
142    }
143}
144
145impl std::error::Error for Error {
146    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
147        match self {
148            Error::Io(err) => Some(err),
149            _ => None,
150        }
151    }
152}
153
154impl Error {
155    /// Get the classification of this error.
156    ///
157    /// Returns [`ErrorKind::Transient`] for errors that may succeed if retried,
158    /// or [`ErrorKind::Permanent`] for errors that will not succeed with retries.
159    pub const fn kind(&self) -> ErrorKind {
160        match self {
161            // Permanent errors - these will never succeed with retries
162            Error::MessageTooLarge { .. } => ErrorKind::Permanent,
163            Error::Config(_) => ErrorKind::Permanent,
164            Error::Shutdown => ErrorKind::Permanent,
165            Error::Encode(_) => ErrorKind::Permanent,
166            Error::Decode(_) => ErrorKind::Permanent,
167
168            // Transient errors - these may succeed if retried
169            Error::MessageNotFound(_) => ErrorKind::Transient,
170            Error::Send { .. } => ErrorKind::Transient,
171            Error::NoPeers => ErrorKind::Transient,
172            Error::Channel(_) => ErrorKind::Transient,
173            Error::QueueFull { .. } => ErrorKind::Transient,
174            Error::Memberlist(_) => ErrorKind::Transient,
175            Error::Io(_) => ErrorKind::Transient,
176
177            // Custom errors default to transient (conservative approach)
178            Error::Custom(_) => ErrorKind::Transient,
179        }
180    }
181
182    /// Check if this error is transient (may succeed if retried).
183    ///
184    /// # Example
185    ///
186    /// ```ignore
187    /// match plumtree.broadcast(payload).await {
188    ///     Ok(_) => {}
189    ///     Err(e) if e.is_transient() => {
190    ///         // Retry after a delay
191    ///         tokio::time::sleep(Duration::from_millis(100)).await;
192    ///         plumtree.broadcast(payload).await?;
193    ///     }
194    ///     Err(e) => return Err(e), // Permanent error, don't retry
195    /// }
196    /// ```
197    pub const fn is_transient(&self) -> bool {
198        matches!(self.kind(), ErrorKind::Transient)
199    }
200
201    /// Check if this error is permanent (will not succeed with retries).
202    pub const fn is_permanent(&self) -> bool {
203        matches!(self.kind(), ErrorKind::Permanent)
204    }
205
206    /// Check if this error indicates the system is shutting down.
207    pub const fn is_shutdown(&self) -> bool {
208        matches!(self, Error::Shutdown)
209    }
210
211    /// Check if this is a rate limiting or resource exhaustion error.
212    pub fn is_resource_exhausted(&self) -> bool {
213        match self {
214            Error::Channel(msg) => msg.contains("full") || msg.contains("capacity"),
215            Error::QueueFull { .. } => true,
216            Error::NoPeers => true,
217            _ => false,
218        }
219    }
220
221    /// Check if this is a queue full error (backpressure).
222    pub const fn is_queue_full(&self) -> bool {
223        matches!(self, Error::QueueFull { .. })
224    }
225}
226
227impl From<std::io::Error> for Error {
228    fn from(err: std::io::Error) -> Self {
229        Error::Io(err)
230    }
231}
232
233impl<T> From<async_channel::SendError<T>> for Error {
234    fn from(err: async_channel::SendError<T>) -> Self {
235        Error::Channel(err.to_string())
236    }
237}
238
239impl From<async_channel::RecvError> for Error {
240    fn from(err: async_channel::RecvError) -> Self {
241        Error::Channel(err.to_string())
242    }
243}
244
#[cfg(test)]
mod tests {
    //! Unit tests for error formatting, conversions, and retry classification.

    use super::*;

    /// Builds an `Error::Io` through the `From<std::io::Error>` conversion.
    fn io_error(kind: std::io::ErrorKind, msg: &str) -> Error {
        std::io::Error::new(kind, msg).into()
    }

    #[test]
    fn test_error_display() {
        let err = Error::MessageTooLarge {
            size: 100000,
            max_size: 65536,
        };
        assert!(err.to_string().contains("100000"));
        assert!(err.to_string().contains("65536"));
    }

    /// Pins the exact message of the variants whose output does not depend
    /// on how `MessageId` is rendered.
    #[test]
    fn test_error_display_messages() {
        assert_eq!(
            Error::Encode("bad".to_string()).to_string(),
            "failed to encode message: bad"
        );
        assert_eq!(
            Error::Decode("bad".to_string()).to_string(),
            "failed to decode message: bad"
        );
        assert_eq!(
            Error::Send {
                target: "node1".to_string(),
                reason: "connection refused".to_string(),
            }
            .to_string(),
            "failed to send to node1: connection refused"
        );
        assert_eq!(
            Error::Shutdown.to_string(),
            "plumtree instance has been shut down"
        );
        assert_eq!(
            Error::NoPeers.to_string(),
            "no peers available for broadcast"
        );
        assert_eq!(
            Error::Channel("closed".to_string()).to_string(),
            "channel error: closed"
        );
        assert_eq!(
            Error::Memberlist("join failed".to_string()).to_string(),
            "memberlist error: join failed"
        );
        assert_eq!(
            Error::Config("bad config".to_string()).to_string(),
            "configuration error: bad config"
        );
        assert_eq!(
            io_error(std::io::ErrorKind::Other, "disk").to_string(),
            "IO error: disk"
        );
        // Custom prints its message verbatim, without any prefix.
        assert_eq!(Error::Custom("boom".to_string()).to_string(), "boom");
    }

    #[test]
    fn test_error_from_io() {
        let io_err = std::io::Error::new(std::io::ErrorKind::Other, "test error");
        let err: Error = io_err.into();
        assert!(matches!(err, Error::Io(_)));
    }

    /// Only the `Io` variant exposes an underlying source error.
    #[test]
    fn test_error_source() {
        let err = io_error(std::io::ErrorKind::Other, "disk");
        let source = std::error::Error::source(&err).expect("Io should expose its source");
        assert_eq!(source.to_string(), "disk");

        assert!(std::error::Error::source(&Error::Shutdown).is_none());
        assert!(std::error::Error::source(&Error::Custom("x".to_string())).is_none());
    }

    /// Both async-channel error types convert into `Error::Channel`.
    #[test]
    fn test_error_from_channel_errors() {
        let err: Error = async_channel::SendError(42u8).into();
        assert!(matches!(err, Error::Channel(_)));
        assert!(err.is_transient());

        let err: Error = async_channel::RecvError.into();
        assert!(matches!(err, Error::Channel(_)));
        assert!(err.is_transient());
    }

    #[test]
    fn test_permanent_errors() {
        // MessageTooLarge is permanent
        let err = Error::MessageTooLarge {
            size: 100,
            max_size: 50,
        };
        assert!(err.is_permanent());
        assert!(!err.is_transient());
        assert_eq!(err.kind(), ErrorKind::Permanent);

        // Config errors are permanent
        let err = Error::Config("bad config".to_string());
        assert!(err.is_permanent());

        // Shutdown is permanent
        let err = Error::Shutdown;
        assert!(err.is_permanent());
        assert!(err.is_shutdown());

        // Encode/Decode errors are permanent
        let err = Error::Encode("bad format".to_string());
        assert!(err.is_permanent());

        let err = Error::Decode("invalid data".to_string());
        assert!(err.is_permanent());
    }

    #[test]
    fn test_transient_errors() {
        // Send errors are transient (network issues)
        let err = Error::Send {
            target: "node1".to_string(),
            reason: "connection refused".to_string(),
        };
        assert!(err.is_transient());
        assert!(!err.is_permanent());
        assert_eq!(err.kind(), ErrorKind::Transient);

        // NoPeers is transient (peers may join later)
        let err = Error::NoPeers;
        assert!(err.is_transient());

        // Channel errors are transient
        let err = Error::Channel("channel closed".to_string());
        assert!(err.is_transient());

        // MessageNotFound is transient (message may be cached later)
        let err = Error::MessageNotFound(MessageId::new());
        assert!(err.is_transient());

        // IO errors are transient
        let err = io_error(std::io::ErrorKind::TimedOut, "timeout");
        assert!(err.is_transient());

        // Memberlist errors are transient
        let err = Error::Memberlist("join failed".to_string());
        assert!(err.is_transient());
        assert!(!err.is_permanent());

        // Custom errors default to transient
        let err = Error::Custom("anything".to_string());
        assert!(err.is_transient());
        assert_eq!(err.kind(), ErrorKind::Transient);
    }

    /// Only `Error::Shutdown` reports `is_shutdown`.
    #[test]
    fn test_is_shutdown_only_for_shutdown() {
        assert!(Error::Shutdown.is_shutdown());
        assert!(!Error::NoPeers.is_shutdown());
        assert!(!Error::Custom("shutdown".to_string()).is_shutdown());
    }

    #[test]
    fn test_resource_exhausted() {
        let err = Error::Channel("channel full".to_string());
        assert!(err.is_resource_exhausted());

        // The "capacity" keyword is recognised as well.
        let err = Error::Channel("at capacity".to_string());
        assert!(err.is_resource_exhausted());

        // A channel error without either keyword is not resource exhaustion.
        let err = Error::Channel("channel closed".to_string());
        assert!(!err.is_resource_exhausted());

        let err = Error::NoPeers;
        assert!(err.is_resource_exhausted());

        let err = Error::Shutdown;
        assert!(!err.is_resource_exhausted());

        // The keyword check applies to Channel errors only.
        let err = Error::Custom("queue full".to_string());
        assert!(!err.is_resource_exhausted());
    }

    #[test]
    fn test_queue_full_error() {
        let err = Error::QueueFull {
            dropped: 5,
            capacity: 1024,
        };

        // QueueFull is transient
        assert!(err.is_transient());
        assert!(!err.is_permanent());
        assert_eq!(err.kind(), ErrorKind::Transient);

        // QueueFull indicates resource exhaustion
        assert!(err.is_resource_exhausted());
        assert!(err.is_queue_full());

        // Check display format
        let msg = err.to_string();
        assert!(msg.contains("5"));
        assert!(msg.contains("1024"));
        assert!(msg.contains("queue"));
        assert_eq!(
            msg,
            "outgoing message queue is full (5 messages dropped, capacity 1024)"
        );

        // Other resource-exhaustion errors are not reported as queue-full.
        assert!(!Error::NoPeers.is_queue_full());
        assert!(!Error::Channel("channel full".to_string()).is_queue_full());
    }
}