/// Classes of process exit status reported for a failed run.
///
/// The enum is `#[repr(i32)]` and every variant has an explicit discriminant,
/// so a variant's numeric value is exactly the status [`ExitClass::code`]
/// returns.
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExitClass {
    /// Exit status `1`: the fallback used by [`classify_exit`] when no more
    /// specific class applies.
    Generic = 1,
    /// Exit status `2`: chosen by [`classify_exit`] when the retry classifier
    /// reports the error as transient.
    Retryable = 2,
    /// Exit status `3`: chosen by [`classify_exit`] for data-integrity
    /// marker errors.
    DataIntegrity = 3,
    /// Exit status `4`: chosen by [`classify_exit`] for schema-drift marker
    /// errors.
    SchemaDrift = 4,
}

impl ExitClass {
    /// Returns the numeric exit status of this class.
    ///
    /// The value is the variant's declared discriminant, so the enum
    /// definition stays the single source of truth for the numbers.
    pub fn code(self) -> i32 {
        self as i32
    }
}
/// Typed marker error for a data-integrity failure.
///
/// It only wraps a human-readable message. [`classify_exit`] maps any error
/// that downcasts to this type to [`ExitClass::DataIntegrity`].
#[derive(Debug)]
pub struct DataIntegrityError(String);

impl DataIntegrityError {
    /// Builds the marker from anything convertible into an owned `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let text: String = message.into();
        Self(text)
    }
}

impl std::fmt::Display for DataIntegrityError {
    /// Writes the stored message verbatim; formatter width and padding
    /// options are not applied because the text goes straight to `write_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(text) = self;
        f.write_str(text)
    }
}

// No `source()` override: the marker is a leaf error.
impl std::error::Error for DataIntegrityError {}
/// Typed marker error for a schema-drift failure.
///
/// It only wraps a human-readable message. [`classify_exit`] maps any error
/// that downcasts to this type to [`ExitClass::SchemaDrift`].
#[derive(Debug)]
pub struct SchemaDriftError(String);

impl SchemaDriftError {
    /// Builds the marker from anything convertible into an owned `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let text: String = message.into();
        Self(text)
    }
}

impl std::fmt::Display for SchemaDriftError {
    /// Writes the stored message verbatim; formatter width and padding
    /// options are not applied because the text goes straight to `write_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(text) = self;
        f.write_str(text)
    }
}

// No `source()` override: the marker is a leaf error.
impl std::error::Error for SchemaDriftError {}
/// Error that carries an exit status which has already been decided.
///
/// [`classify_exit`] returns the wrapped value unchanged and checks for this
/// type before any other classification. The display text describes the value
/// as a child's exit status.
#[derive(Debug)]
pub struct PreclassifiedExit(pub i32);

impl std::fmt::Display for PreclassifiedExit {
    /// Renders as `child exited with status <n>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(status) = self;
        f.write_fmt(format_args!("child exited with status {}", status))
    }
}

// No `source()` override: there is no underlying error to expose.
impl std::error::Error for PreclassifiedExit {}
/// Error that pairs a static, machine-readable code with a human-readable
/// message.
///
/// `Display` shows only the message; the code is read back through
/// [`CodedError::code`] (or [`error_code`] when the error is held as an
/// `anyhow::Error`).
#[derive(Debug)]
pub struct CodedError {
    /// Machine-readable identifier attached to the error.
    code: &'static str,
    /// Text rendered by `Display`.
    message: String,
}

impl CodedError {
    /// Creates a coded error from a static code and any value convertible
    /// into an owned `String`.
    pub fn new(code: &'static str, message: impl Into<String>) -> Self {
        let message: String = message.into();
        Self { code, message }
    }

    /// Returns the code supplied at construction time.
    pub fn code(&self) -> &'static str {
        self.code
    }
}

impl std::fmt::Display for CodedError {
    /// Writes the message verbatim; the code is deliberately not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message.as_str())
    }
}

// No `source()` override: a coded error is a leaf error.
impl std::error::Error for CodedError {}
pub fn error_code(err: &anyhow::Error) -> Option<&'static str> {
if let Some(c) = err.downcast_ref::<CodedError>() {
return Some(c.code());
}
if err
.downcast_ref::<crate::source::StatementDurationTimeout>()
.is_some()
{
return Some(codes::SOURCE_STATEMENT_TIMEOUT);
}
None
}
pub fn classify_exit(err: &anyhow::Error) -> i32 {
if let Some(p) = err.downcast_ref::<PreclassifiedExit>() {
return p.0;
}
if err.downcast_ref::<SchemaDriftError>().is_some() {
return ExitClass::SchemaDrift.code();
}
if err.downcast_ref::<DataIntegrityError>().is_some()
|| err
.downcast_ref::<crate::manifest::ManifestInconsistency>()
.is_some()
{
return ExitClass::DataIntegrity.code();
}
if crate::pipeline::retry::classify_error(err).is_transient() {
return ExitClass::Retryable.code();
}
ExitClass::Generic.code()
}
/// Machine-readable error-code strings.
///
/// Every value is listed in [`ALL`](self::ALL) (test builds only) so the
/// test suite can check the set for duplicates and for the expected
/// `RIVET_CONFIG_` / `RIVET_SOURCE_` prefixes. Remember to add new codes there.
pub mod codes {
    /// Configuration defines no exports.
    pub const CONFIG_NO_EXPORTS: &str = "RIVET_CONFIG_NO_EXPORTS";

    /// Invalid chunk count in the configuration (per the name; the emitting
    /// site is not in this module).
    pub const CONFIG_CHUNK_COUNT_INVALID: &str = "RIVET_CONFIG_CHUNK_COUNT_INVALID";

    /// Invalid chunk-by-days setting in the configuration (per the name; the
    /// emitting site is not in this module).
    pub const CONFIG_CHUNK_BY_DAYS_INVALID: &str = "RIVET_CONFIG_CHUNK_BY_DAYS_INVALID";

    /// Duplicate export in the configuration (per the name; the emitting
    /// site is not in this module).
    pub const CONFIG_DUPLICATE_EXPORT: &str = "RIVET_CONFIG_DUPLICATE_EXPORT";

    /// Code reported for a `crate::source::StatementDurationTimeout` error.
    pub const SOURCE_STATEMENT_TIMEOUT: &str = "RIVET_SOURCE_STATEMENT_TIMEOUT";

    /// Every code above, in declaration order. Compiled for tests only.
    #[cfg(test)]
    pub(crate) const ALL: &[&str] = &[
        CONFIG_NO_EXPORTS,
        CONFIG_CHUNK_COUNT_INVALID,
        CONFIG_CHUNK_BY_DAYS_INVALID,
        CONFIG_DUPLICATE_EXPORT,
        SOURCE_STATEMENT_TIMEOUT,
    ];
}
/// Returns early from the enclosing function with a [`CodedError`] wrapped in
/// an `anyhow::Error`.
///
/// The first argument is the static error code; the remaining tokens are
/// passed to `format!` to build the message. The enclosing function must
/// return a `Result` whose error type is `anyhow::Error`.
///
/// Every path in the expansion is absolute (`::core`, `::std`, `::anyhow`,
/// `$crate`). Macro names are resolved at the invocation site, so an
/// unqualified `format!` could be shadowed by a caller-local macro of the
/// same name; `::std::format!` rules that out.
///
/// The `$crate::error::CodedError` path requires this module to be reachable
/// as `crate::error`.
#[macro_export]
macro_rules! config_bail {
    ($code:expr, $($arg:tt)*) => {
        return ::core::result::Result::Err(::anyhow::Error::new(
            $crate::error::CodedError::new($code, ::std::format!($($arg)*)),
        ))
    };
}
pub type Result<T> = anyhow::Result<T>;
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare schema-drift marker maps to exit status 4.
    #[test]
    fn schema_drift_marker_classifies_to_4() {
        let drift = anyhow::Error::new(SchemaDriftError::new("schema changed"));
        assert_eq!(classify_exit(&drift), 4);
        assert_eq!(ExitClass::SchemaDrift.code(), 4);
    }

    /// A bare data-integrity marker maps to exit status 3.
    #[test]
    fn data_integrity_marker_classifies_to_3() {
        let mismatch = anyhow::Error::new(DataIntegrityError::new("reconcile mismatch"));
        assert_eq!(classify_exit(&mismatch), 3);
        assert_eq!(ExitClass::DataIntegrity.code(), 3);
    }

    /// A manifest inconsistency shares the data-integrity exit status.
    #[test]
    fn manifest_inconsistency_classifies_to_3() {
        let inconsistency = crate::manifest::ManifestInconsistency::DuplicatePartId(1);
        let err = anyhow::Error::from(inconsistency);
        let status = classify_exit(&err);
        assert_eq!(
            status, 3,
            "manifest self-consistency failure is a data-integrity stop"
        );
    }

    /// Untyped errors fall through to the retry classifier: a connection
    /// reset is retryable (2), a syntax error is generic (1).
    #[test]
    fn transient_error_classifies_to_2_syntax_error_to_1() {
        let reset = anyhow::anyhow!("connection reset by peer");
        let reset_status = classify_exit(&reset);
        assert_eq!(reset_status, 2, "connection reset is retryable");

        let bad_sql = anyhow::anyhow!("syntax error at or near \"SELET\"");
        let bad_sql_status = classify_exit(&bad_sql);
        assert_eq!(bad_sql_status, 1, "a syntax error is not retryable");
    }

    /// Adding `anyhow` context must not hide the typed marker underneath.
    #[test]
    fn typed_markers_survive_anyhow_context_wrapping() {
        let drift_in_context = anyhow::Error::new(SchemaDriftError::new("drift"))
            .context("export 'orders' failed");
        assert_eq!(classify_exit(&drift_in_context), 4);

        let dup_in_context = anyhow::Error::new(DataIntegrityError::new("dup"))
            .context("export 'orders' failed");
        assert_eq!(classify_exit(&dup_in_context), 3);
    }

    /// The marker also survives the longer multi-failure context string.
    #[test]
    fn run_carries_typed_marker_through_multi_failure_context() {
        let marker =
            DataIntegrityError::new("export 'orders': cannot safely retry (would duplicate rows)");
        let aggregated = anyhow::Error::new(marker).context("2 export(s) failed; representative error follows (also: export 'events': connection reset)");
        let status = classify_exit(&aggregated);
        assert_eq!(
            status, 3,
            "the carried data-integrity marker must survive run's multi-failure context wrapping"
        );
    }

    /// Classification is type-driven: message text alone never selects the
    /// data-integrity class.
    #[test]
    fn untyped_flattened_string_is_generic_not_string_matched() {
        let untyped = anyhow::anyhow!("export 'orders': 1 quality check(s) failed: row_count low");
        let status = classify_exit(&untyped);
        assert_eq!(
            status, 1,
            "an un-typed string must NOT be string-matched into data-integrity"
        );
    }

    /// Both marker types display exactly the message they were given.
    #[test]
    fn data_integrity_marker_display_is_verbatim() {
        let msg = "export 'orders': 1 quality check(s) failed";
        assert_eq!(DataIntegrityError::new(msg).to_string(), msg);
        assert_eq!(SchemaDriftError::new(msg).to_string(), msg);
    }

    /// Every registered code is unique and carries one of the two prefixes.
    #[test]
    fn coded_error_codes_are_distinct_and_prefixed() {
        let mut seen = std::collections::HashSet::new();
        for c in codes::ALL.iter().copied() {
            let newly_inserted = seen.insert(c);
            assert!(newly_inserted, "duplicate code: {c}");

            let has_known_prefix = ["RIVET_CONFIG_", "RIVET_SOURCE_"]
                .iter()
                .any(|prefix| c.starts_with(*prefix));
            assert!(
                has_known_prefix,
                "code {c} must share the RIVET_CONFIG_ / RIVET_SOURCE_ prefix",
            );
        }
    }

    /// A coded error keeps its code under added context, classifies as
    /// generic, and its message still shows in the alternate rendering.
    #[test]
    fn coded_error_surfaces_code_through_anyhow_context() {
        let coded = CodedError::new(
            codes::CONFIG_NO_EXPORTS,
            "exports: at least one export must be defined",
        );
        let e = anyhow::Error::new(coded).context("while loading config");

        assert_eq!(error_code(&e), Some(codes::CONFIG_NO_EXPORTS));
        assert_eq!(classify_exit(&e), ExitClass::Generic.code());

        let rendered = format!("{e:#}");
        assert!(rendered.contains("at least one export must be defined"));
    }
}