use thiserror::Error;
#[derive(Error, Debug)]
pub enum ProcessingError {
#[error("retryable error: {0}")]
Retryable(#[source] anyhow::Error),
#[error("non-retryable error: {0}")]
NonRetryable(#[source] anyhow::Error),
#[error("connection error: {0}")]
Connection(#[source] anyhow::Error),
}
impl ProcessingError {
pub fn is_connection_error(&self) -> bool {
matches!(self, ProcessingError::Connection(_))
}
}
/// Alias of [`ProcessingError`]; both names refer to the same type.
// NOTE(review): presumably the error type returned by handlers — confirm against callers.
pub type HandlerError = ProcessingError;
/// Alias of [`ProcessingError`]; both names refer to the same type.
// NOTE(review): presumably the error type returned by publishers — confirm against callers.
pub type PublisherError = ProcessingError;
#[derive(Error, Debug)]
pub enum ConsumerError {
#[error("consumer connection error: {0}")]
Connection(#[source] anyhow::Error),
#[error("consumer gap: requested offset {requested} but earliest available is {base}")]
Gap { requested: u64, base: u64 },
#[error("consumer reached end of stream")]
EndOfStream,
}
impl From<anyhow::Error> for ConsumerError {
fn from(err: anyhow::Error) -> Self {
ConsumerError::Connection(err)
}
}
impl From<anyhow::Error> for ProcessingError {
fn from(err: anyhow::Error) -> Self {
ProcessingError::Retryable(err)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_connection_error` is true for `Connection` and false for the rest.
    #[test]
    fn test_is_connection_error_only_matches_connection_variant() {
        let connection = ProcessingError::Connection(anyhow::anyhow!("offline"));
        let retryable = ProcessingError::Retryable(anyhow::anyhow!("retry"));
        let non_retryable = ProcessingError::NonRetryable(anyhow::anyhow!("stop"));

        assert!(connection.is_connection_error());
        assert!(!retryable.is_connection_error());
        assert!(!non_retryable.is_connection_error());
    }

    /// `From<anyhow::Error>` picks `Connection` for consumers and `Retryable`
    /// for processing errors.
    #[test]
    fn test_anyhow_error_conversions_use_default_variants() {
        let consumer_error: ConsumerError = anyhow::anyhow!("consumer failure").into();
        assert!(matches!(consumer_error, ConsumerError::Connection(_)));

        let processing_error: ProcessingError = anyhow::anyhow!("processing failure").into();
        assert!(matches!(processing_error, ProcessingError::Retryable(_)));
    }

    /// Display output of the field-carrying and unit variants of `ConsumerError`.
    #[test]
    fn test_consumer_error_display_messages() {
        let gap = ConsumerError::Gap {
            requested: 42,
            base: 9,
        };
        assert_eq!(
            gap.to_string(),
            "consumer gap: requested offset 42 but earliest available is 9"
        );

        let end_of_stream = ConsumerError::EndOfStream;
        assert_eq!(end_of_stream.to_string(), "consumer reached end of stream");
    }
}