1use thiserror::Error;
2
3#[derive(Error, Debug)]
5pub enum ProcessingError {
6 #[error("retryable error: {0}")]
8 Retryable(#[source] anyhow::Error),
9 #[error("non-retryable error: {0}")]
11 NonRetryable(#[source] anyhow::Error),
12 #[error("connection error: {0}")]
14 Connection(#[source] anyhow::Error),
15}
16impl ProcessingError {
17 pub fn is_connection_error(&self) -> bool {
18 matches!(self, ProcessingError::Connection(_))
19 }
20}
21
22pub type HandlerError = ProcessingError;
23pub type PublisherError = ProcessingError;
24
25#[derive(Error, Debug)]
27pub enum ConsumerError {
28 #[error("consumer connection error: {0}")]
30 Connection(#[source] anyhow::Error),
31
32 #[error("consumer gap: requested offset {requested} but earliest available is {base}")]
34 Gap { requested: u64, base: u64 },
35
36 #[error("consumer reached end of stream")]
38 EndOfStream,
39}
40
41impl From<anyhow::Error> for ConsumerError {
42 fn from(err: anyhow::Error) -> Self {
43 ConsumerError::Connection(err)
45 }
46}
47
48impl From<anyhow::Error> for ProcessingError {
49 fn from(err: anyhow::Error) -> Self {
50 ProcessingError::Retryable(err)
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 use super::*;
59
60 #[test]
61 fn test_is_connection_error_only_matches_connection_variant() {
62 assert!(ProcessingError::Connection(anyhow::anyhow!("offline")).is_connection_error());
63 assert!(!ProcessingError::Retryable(anyhow::anyhow!("retry")).is_connection_error());
64 assert!(!ProcessingError::NonRetryable(anyhow::anyhow!("stop")).is_connection_error());
65 }
66
67 #[test]
68 fn test_anyhow_error_conversions_use_default_variants() {
69 let consumer_error = ConsumerError::from(anyhow::anyhow!("consumer failure"));
70 assert!(matches!(consumer_error, ConsumerError::Connection(_)));
71
72 let processing_error = ProcessingError::from(anyhow::anyhow!("processing failure"));
73 assert!(matches!(processing_error, ProcessingError::Retryable(_)));
74 }
75
76 #[test]
77 fn test_consumer_error_display_messages() {
78 assert_eq!(
79 ConsumerError::Gap {
80 requested: 42,
81 base: 9
82 }
83 .to_string(),
84 "consumer gap: requested offset 42 but earliest available is 9"
85 );
86 assert_eq!(
87 ConsumerError::EndOfStream.to_string(),
88 "consumer reached end of stream"
89 );
90 }
91}