Skip to main content

mq_bridge/
errors.rs

1use thiserror::Error;
2
3/// Errors that can occur during message processing (handling or publishing).
4#[derive(Error, Debug)]
5pub enum ProcessingError {
6    /// A transient error occurred. The operation should be retried.
7    #[error("retryable error: {0}")]
8    Retryable(#[source] anyhow::Error),
9    /// A permanent error occurred. The operation should not be retried.
10    #[error("non-retryable error: {0}")]
11    NonRetryable(#[source] anyhow::Error),
12    /// A connection-level error occurred. Used to signal broker disconnects, etc.
13    #[error("connection error: {0}")]
14    Connection(#[source] anyhow::Error),
15}
16impl ProcessingError {
17    pub fn is_connection_error(&self) -> bool {
18        matches!(self, ProcessingError::Connection(_))
19    }
20}
21
22pub type HandlerError = ProcessingError;
23pub type PublisherError = ProcessingError;
24
25/// Errors that can occur when consuming messages.
26#[derive(Error, Debug)]
27pub enum ConsumerError {
28    /// A transport-level or other error occurred that should trigger a reconnect.
29    #[error("consumer connection error: {0}")]
30    Connection(#[source] anyhow::Error),
31
32    /// A consumer gap was detected: the requested events were already garbage-collected.
33    #[error("consumer gap: requested offset {requested} but earliest available is {base}")]
34    Gap { requested: u64, base: u64 },
35
36    /// The consumer has reached the end of the stream and has shut down gracefully.
37    #[error("consumer reached end of stream")]
38    EndOfStream,
39}
40
41impl From<anyhow::Error> for ConsumerError {
42    fn from(err: anyhow::Error) -> Self {
43        // By default, we'll treat any generic error as a connection-level, retryable error.
44        ConsumerError::Connection(err)
45    }
46}
47
48impl From<anyhow::Error> for ProcessingError {
49    fn from(err: anyhow::Error) -> Self {
50        // Default to Retryable for generic errors. Callers should use
51        // ProcessingError::NonRetryable directly for known permanent failures.
52        ProcessingError::Retryable(err)
53    }
54}
55
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_connection_error` is true for `Connection` and false for the other variants.
    #[test]
    fn test_is_connection_error_only_matches_connection_variant() {
        let connection = ProcessingError::Connection(anyhow::anyhow!("offline"));
        let retryable = ProcessingError::Retryable(anyhow::anyhow!("retry"));
        let non_retryable = ProcessingError::NonRetryable(anyhow::anyhow!("stop"));

        assert!(connection.is_connection_error());
        assert!(!retryable.is_connection_error());
        assert!(!non_retryable.is_connection_error());
    }

    /// Converting a generic `anyhow::Error` picks each enum's default variant.
    #[test]
    fn test_anyhow_error_conversions_use_default_variants() {
        let from_consumer: ConsumerError = anyhow::anyhow!("consumer failure").into();
        assert!(matches!(from_consumer, ConsumerError::Connection(_)));

        let from_processing: ProcessingError = anyhow::anyhow!("processing failure").into();
        assert!(matches!(from_processing, ProcessingError::Retryable(_)));
    }

    /// The field-less and struct-like variants render their fixed display text.
    #[test]
    fn test_consumer_error_display_messages() {
        let gap = ConsumerError::Gap {
            requested: 42,
            base: 9,
        };
        assert_eq!(
            gap.to_string(),
            "consumer gap: requested offset 42 but earliest available is 9"
        );

        let end_of_stream = ConsumerError::EndOfStream;
        assert_eq!(end_of_stream.to_string(), "consumer reached end of stream");
    }
}