/// Shorthand for `std::result::Result` with the error type fixed to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Tag describing the category of a source-side failure.
///
/// The tag is `Copy` and hashable; [`SourceErrorKind::is_recoverable`] reports which
/// categories are treated as recoverable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum SourceErrorKind {
    /// Reported as recoverable by [`SourceErrorKind::is_recoverable`].
    NetworkTransient,
    /// Reported as not recoverable by [`SourceErrorKind::is_recoverable`].
    AuthFailed,
    /// Reported as not recoverable by [`SourceErrorKind::is_recoverable`].
    SchemaMismatch,
    /// Reported as not recoverable by [`SourceErrorKind::is_recoverable`].
    SlotNotFound,
    /// Reported as recoverable by [`SourceErrorKind::is_recoverable`].
    QuotaExceeded,
    /// Reported as not recoverable by [`SourceErrorKind::is_recoverable`].
    Unknown,
}

impl SourceErrorKind {
    /// Returns `true` only for [`SourceErrorKind::NetworkTransient`] and
    /// [`SourceErrorKind::QuotaExceeded`]; every other category yields `false`.
    pub fn is_recoverable(self) -> bool {
        // Spelled out arm by arm (no wildcard) so that adding a variant forces an
        // explicit decision here.
        match self {
            Self::NetworkTransient | Self::QuotaExceeded => true,
            Self::AuthFailed | Self::SchemaMismatch | Self::SlotNotFound | Self::Unknown => false,
        }
    }
}
/// Coarse classification of an [`Error`], as returned by `Error::kind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ErrorKind {
/// Assigned to `Error::SourceError` and `Error::TimeoutError`. This is the only kind
/// for which `Error::is_recoverable` returns `true`.
Transient,
/// Assigned to every `Error` variant that is neither `Transient` nor `Configuration`.
Terminal,
/// Assigned to `Error::ConfigError` and `Error::NotImplemented`.
Configuration,
}
/// Failures reported while fingerprinting an event.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
/// A `From<FingerprintError> for Error` impl in this file converts any of these into
/// `Error::ValidationError` carrying that text.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum FingerprintError {
/// The event's `source.source_name` was empty.
#[error("cannot fingerprint event with empty source.source_name")]
EmptySourceName,
/// The event's `source.offset` was empty.
#[error("cannot fingerprint event with empty source.offset")]
EmptyOffset,
/// Wraps a `serde_json::Error`; `#[from]` lets `?` convert a `serde_json::Error`
/// into this variant and exposes it as the error's source.
#[error("fingerprint serialisation failed: {0}")]
SerializationFailed(#[from] serde_json::Error),
}
impl From<FingerprintError> for Error {
fn from(err: FingerprintError) -> Self {
Self::ValidationError(vec![err.to_string()])
}
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
#[error("source error: {0}")]
SourceError(String),
#[error("checkpoint error: {0}")]
CheckpointError(String),
#[error("schema error: {0}")]
SchemaError(String),
#[error("validation error(s): {0:?}")]
ValidationError(Vec<String>),
#[error("configuration error: {0}")]
ConfigError(String),
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error("serialization error: {0}")]
SerializationError(String),
#[error("timeout error: {0}")]
TimeoutError(String),
#[error("unrecoverable error: {0}")]
Unrecoverable(String),
#[error("state error: {0}")]
StateError(String),
#[error("transform error: {0}")]
TransformError(String),
#[error("not implemented: {0}")]
NotImplemented(String),
#[error("post-commit confirm failed (checkpoint is safe — replay is safe): {detail}")]
PostCommitConfirmFailed {
checkpoint_safe: bool,
detail: String,
},
}
impl Error {
    /// Builds an [`Error::SourceError`] whose message is `message` prefixed with the
    /// `Debug` name of `kind` in square brackets, e.g. `[AuthFailed] denied`.
    ///
    /// NOTE(review): `kind` is kept only as text. [`Error::kind`] classifies every
    /// `SourceError` as transient, including kinds for which
    /// `SourceErrorKind::is_recoverable` returns `false` — confirm callers do not
    /// retry on those.
    pub fn source_error(kind: SourceErrorKind, message: impl std::fmt::Display) -> Self {
        let tagged = format!("[{kind:?}] {message}");
        Self::SourceError(tagged)
    }

    /// Maps this error onto its coarse [`ErrorKind`].
    ///
    /// Every variant is listed explicitly (no wildcard arm) so a newly added variant
    /// must be classified here before the crate compiles.
    pub fn kind(&self) -> ErrorKind {
        match self {
            Self::ConfigError(_) | Self::NotImplemented(_) => ErrorKind::Configuration,
            Self::TimeoutError(_) | Self::SourceError(_) => ErrorKind::Transient,
            Self::PostCommitConfirmFailed { .. }
            | Self::ValidationError(_)
            | Self::IoError(_)
            | Self::SerializationError(_)
            | Self::TransformError(_)
            | Self::StateError(_)
            | Self::SchemaError(_)
            | Self::CheckpointError(_)
            | Self::Unrecoverable(_) => ErrorKind::Terminal,
        }
    }

    /// Returns `true` exactly when [`Error::kind`] is [`ErrorKind::Transient`].
    pub fn is_recoverable(&self) -> bool {
        matches!(self.kind(), ErrorKind::Transient)
    }
}
impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Self::SerializationError(value.to_string())
}
}
#[cfg(test)]
mod tests {
    use super::{Error, ErrorKind, FingerprintError, SourceErrorKind};

    /// Fixture: an `Error::IoError` wrapping a simple `std::io::Error`.
    fn io_error() -> Error {
        Error::IoError(std::io::Error::from(std::io::ErrorKind::NotFound))
    }

    /// Fixture: a `PostCommitConfirmFailed` whose checkpoint is flagged safe.
    fn post_commit_error() -> Error {
        Error::PostCommitConfirmFailed {
            checkpoint_safe: true,
            detail: "slot advance failed".into(),
        }
    }

    /// Only the transient variants may report themselves as recoverable.
    #[test]
    fn recoverable_flag_matches_contract() {
        assert!(Error::SourceError("conn reset".into()).is_recoverable());
        assert!(Error::TimeoutError("deadline exceeded".into()).is_recoverable());
        assert!(!Error::ConfigError("invalid".into()).is_recoverable());
        assert!(!Error::NotImplemented("later".into()).is_recoverable());
        assert!(!Error::ValidationError(vec!["bad field".into()]).is_recoverable());
        assert!(!Error::CheckpointError("io".into()).is_recoverable());
        assert!(!Error::SchemaError("missing".into()).is_recoverable());
        assert!(!Error::StateError("illegal transition".into()).is_recoverable());
        assert!(!Error::TransformError("crash".into()).is_recoverable());
        assert!(!Error::SerializationError("bad json".into()).is_recoverable());
        assert!(!Error::Unrecoverable("boom".into()).is_recoverable());
        assert!(!io_error().is_recoverable());
    }

    /// The safe-checkpoint rendering must keep telling operators that replay is safe.
    #[test]
    fn post_commit_confirm_failed_is_terminal_and_not_recoverable() {
        let err = post_commit_error();
        assert_eq!(err.kind(), ErrorKind::Terminal);
        assert!(!err.is_recoverable());
        let display = err.to_string();
        assert!(display.contains("checkpoint is safe"));
        assert!(display.contains("replay is safe"));
        assert!(display.contains("slot advance failed"));
    }

    /// Every variant of `Error` is pinned to its expected `ErrorKind`.
    #[test]
    fn error_kind_classifies_all_variants() {
        assert_eq!(Error::SourceError("x".into()).kind(), ErrorKind::Transient);
        assert_eq!(Error::TimeoutError("x".into()).kind(), ErrorKind::Transient);
        assert_eq!(
            Error::ConfigError("x".into()).kind(),
            ErrorKind::Configuration
        );
        assert_eq!(
            Error::NotImplemented("x".into()).kind(),
            ErrorKind::Configuration
        );
        assert_eq!(
            Error::CheckpointError("x".into()).kind(),
            ErrorKind::Terminal
        );
        assert_eq!(Error::SchemaError("x".into()).kind(), ErrorKind::Terminal);
        assert_eq!(Error::StateError("x".into()).kind(), ErrorKind::Terminal);
        assert_eq!(
            Error::TransformError("x".into()).kind(),
            ErrorKind::Terminal
        );
        assert_eq!(Error::Unrecoverable("x".into()).kind(), ErrorKind::Terminal);
        assert_eq!(Error::ValidationError(vec![]).kind(), ErrorKind::Terminal);
        assert_eq!(
            Error::SerializationError("x".into()).kind(),
            ErrorKind::Terminal
        );
        // Previously missing from this "all variants" test.
        assert_eq!(io_error().kind(), ErrorKind::Terminal);
        assert_eq!(post_commit_error().kind(), ErrorKind::Terminal);
    }

    /// `is_recoverable` must agree with `kind` for every variant, not just a sample.
    #[test]
    fn is_recoverable_is_consistent_with_kind() {
        let errors = [
            Error::SourceError("x".into()),
            Error::CheckpointError("x".into()),
            Error::SchemaError("x".into()),
            Error::ValidationError(vec!["x".into()]),
            Error::ConfigError("x".into()),
            io_error(),
            Error::SerializationError("x".into()),
            Error::TimeoutError("x".into()),
            Error::Unrecoverable("x".into()),
            Error::StateError("x".into()),
            Error::TransformError("x".into()),
            Error::NotImplemented("x".into()),
            post_commit_error(),
        ];
        for err in &errors {
            assert_eq!(err.is_recoverable(), err.kind() == ErrorKind::Transient);
        }
    }

    #[test]
    fn serde_errors_map_to_serialization_errors() {
        let error = serde_json::from_str::<serde_json::Value>("{").unwrap_err();
        assert!(matches!(Error::from(error), Error::SerializationError(_)));
    }

    /// `source_error` embeds the kind's `Debug` name as a bracketed prefix.
    #[test]
    fn source_error_prefixes_kind_tag() {
        let err = Error::source_error(SourceErrorKind::AuthFailed, "denied");
        assert!(matches!(err, Error::SourceError(_)));
        assert_eq!(err.to_string(), "source error: [AuthFailed] denied");
    }

    /// Fingerprint failures surface as a single-entry validation error.
    #[test]
    fn fingerprint_errors_map_to_validation_errors() {
        match Error::from(FingerprintError::EmptyOffset) {
            Error::ValidationError(messages) => assert_eq!(
                messages,
                vec!["cannot fingerprint event with empty source.offset".to_string()]
            ),
            other => panic!("expected ValidationError, got {other:?}"),
        }
    }

    /// `std::io::Error` converts through the `#[from]` impl into `IoError`.
    #[test]
    fn io_errors_convert_via_from() {
        let io = std::io::Error::from(std::io::ErrorKind::NotFound);
        assert!(matches!(Error::from(io), Error::IoError(_)));
    }
}