Skip to main content

rivet/
error.rs

1//! **Layer: Cross-cutting**
2//!
3//! Error type alias plus the **exit-code taxonomy**: a small, stable set of
4//! process exit codes so an *unattended scheduler* can branch on the failure
5//! *class* instead of grepping stderr. Before this, `main` exited `1` for every
6//! error, forcing operators to regex the error text to decide retry-vs-stop.
7
8/// Machine-actionable exit-code taxonomy.
9///
10/// A scheduler keys its retry / alert policy off the numeric exit code:
11///
12/// | code | class | scheduler action |
13/// |------|-------|------------------|
14/// | `0`  | success | — (handled separately, not in this enum) |
15/// | `1`  | [`Generic`](ExitClass::Generic): config / usage / unclassified error | fix the config; do **not** retry blindly |
16/// | `2`  | [`Retryable`](ExitClass::Retryable): transient (connection reset, lock-wait timeout, capacity) | safe to retry the *same* command |
17/// | `3`  | [`DataIntegrity`](ExitClass::DataIntegrity): quality gate / reconcile mismatch / `validate` verification failure / duplicate-guard / manifest inconsistency | **STOP** — data may be wrong, do **not** blindly retry |
18/// | `4`  | [`SchemaDrift`](ExitClass::SchemaDrift): `on_schema_drift: fail` tripped | the source shape changed — needs human review |
19///
20/// ## Overlap with clap's usage exit (also `2`)
21///
22/// clap exits `2` on an argument-parse error (bad flag, missing required arg).
23/// That collides numerically with [`Retryable`](ExitClass::Retryable) `= 2`, but
24/// the two are distinguishable: clap's exit happens **pre-dispatch**, before any
25/// `rivet` work runs, so it prints *only* a clap usage block and **no** `Error:`
26/// line. A retryable rivet failure always prints an `Error: …` line (or a JSON
27/// object with `"exit_class": 2`). We deliberately do not fight clap by remapping
28/// our retryable code — `2 = retryable` matches the spec, and the usage overlap
29/// is documented and detectable by the absence of a rivet error line.
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31#[repr(i32)]
32pub enum ExitClass {
33    /// `1` — config / usage / unclassified error. Fix the input; retrying the
34    /// identical command will fail the same way.
35    Generic = 1,
36    /// `2` — transient failure (connection reset, lock-wait timeout, capacity).
37    /// Safe to retry the same command after a backoff.
38    Retryable = 2,
39    /// `3` — data-integrity failure (quality gate, reconcile mismatch, `validate`
40    /// verification failure, duplicate-guard, manifest inconsistency). The
41    /// exported data may be wrong; **stop** and investigate rather than retry.
42    DataIntegrity = 3,
43    /// `4` — schema-drift failure (`on_schema_drift: fail` tripped). The source
44    /// shape changed; a human must review before re-running.
45    SchemaDrift = 4,
46}
47
48impl ExitClass {
49    /// The process exit code for this class.
50    pub fn code(self) -> i32 {
51        self as i32
52    }
53}
54
55/// Typed marker for a **data-integrity** failure (exit `3`).
56///
57/// Mirrors [`crate::source::StatementDurationTimeout`]: the *type*, not the
58/// wording, carries the classification. [`classify_exit`] downcasts it through
59/// the anyhow chain, so a reworded human message never silently flips the exit
60/// code. Constructed at the data-integrity bail sites (quality-gate failure,
61/// duplicate-guard) wrapping the existing message verbatim — `Display`
62/// reproduces the original text unchanged, so operator-facing output is
63/// identical.
64#[derive(Debug)]
65pub struct DataIntegrityError(String);
66
67impl DataIntegrityError {
68    /// Wrap an existing human-facing message as a data-integrity failure.
69    /// The message text is preserved verbatim for `Display`.
70    pub fn new(message: impl Into<String>) -> Self {
71        Self(message.into())
72    }
73}
74
75impl std::fmt::Display for DataIntegrityError {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.write_str(&self.0)
78    }
79}
80
81impl std::error::Error for DataIntegrityError {}
82
83/// Typed marker for a **schema-drift** failure (exit `4`).
84///
85/// Same contract as [`DataIntegrityError`]: classification rides on the type via
86/// downcast, `Display` reproduces the original message verbatim. Constructed
87/// where `on_schema_drift: fail` aborts the run.
88#[derive(Debug)]
89pub struct SchemaDriftError(String);
90
91impl SchemaDriftError {
92    /// Wrap an existing human-facing message as a schema-drift failure.
93    /// The message text is preserved verbatim for `Display`.
94    pub fn new(message: impl Into<String>) -> Self {
95        Self(message.into())
96    }
97}
98
99impl std::fmt::Display for SchemaDriftError {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.write_str(&self.0)
102    }
103}
104
105impl std::error::Error for SchemaDriftError {}
106
107/// Typed marker carrying an **already-decided** process exit code.
108///
109/// A parallel-export child runs in its own process, classifies its own failure,
110/// and exits with that code; the typed marker itself cannot cross the process
111/// boundary — only the integer code does. The parent wraps the aggregate failure
112/// in this marker so [`classify_exit`] re-derives the SAME class instead of
113/// stringifying `"exited with status 3"` and collapsing it to a generic `1`.
114#[derive(Debug)]
115pub struct PreclassifiedExit(pub i32);
116
117impl std::fmt::Display for PreclassifiedExit {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        write!(f, "child exited with status {}", self.0)
120    }
121}
122
123impl std::error::Error for PreclassifiedExit {}
124
125/// Typed marker carrying a **stable error code** (`RIVET_CONFIG_*` /
126/// `RIVET_SOURCE_*`), for config / source failures that an operator's tooling
127/// greps by code rather than by wording.
128///
129/// Same contract as [`DataIntegrityError`]: the code rides on the type via
130/// downcast (so a reworded message never moves the code), and `Display`
131/// reproduces the wrapped message verbatim — the console line is unchanged except
132/// for the `[CODE]` prefix `main` adds. [`error_code`] reads `code` for the JSON
133/// `code` field + the text prefix.
134///
135/// A `CodedError` is always exit class `Generic` (config / usage — fix it, don't
136/// retry), which is already [`classify_exit`]'s default, so it carries no class
137/// and needs no downcast arm there. The first coded error that needs a
138/// non-`Generic` class (e.g. a retryable source failure) is where a class field
139/// would be reintroduced — until then it is dead weight.
140#[derive(Debug)]
141pub struct CodedError {
142    code: &'static str,
143    message: String,
144}
145
146impl CodedError {
147    /// Wrap a human-facing message with a stable `RIVET_*` code.
148    pub fn new(code: &'static str, message: impl Into<String>) -> Self {
149        Self {
150            code,
151            message: message.into(),
152        }
153    }
154
155    /// The stable `RIVET_*` code.
156    pub fn code(&self) -> &'static str {
157        self.code
158    }
159}
160
161impl std::fmt::Display for CodedError {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.write_str(&self.message)
164    }
165}
166
167impl std::error::Error for CodedError {}
168
169/// The stable `RIVET_*` error code for a failure, if one was tagged via
170/// [`CodedError`] anywhere in the anyhow context chain. `main` surfaces it as the
171/// JSON `code` field and a `[CODE]` prefix on the text error line.
172pub fn error_code(err: &anyhow::Error) -> Option<&'static str> {
173    if let Some(c) = err.downcast_ref::<CodedError>() {
174        return Some(c.code());
175    }
176    // The existing source-side statement-timeout marker also gets a stable code,
177    // so the long-query failure an operator's `statement_timeout` tooling watches
178    // for is greppable without re-tagging its construction site.
179    if err
180        .downcast_ref::<crate::source::StatementDurationTimeout>()
181        .is_some()
182    {
183        return Some(codes::SOURCE_STATEMENT_TIMEOUT);
184    }
185    None
186}
187
188/// Map an error to its process exit code per the [`ExitClass`] taxonomy.
189///
190/// Precedence (first match wins):
191/// 1. [`SchemaDriftError`] downcast → `4`.
192/// 2. [`DataIntegrityError`] **or** [`crate::manifest::ManifestInconsistency`]
193///    downcast → `3`.
194/// 3. otherwise, if [`crate::pipeline::retry::classify_error`] says the error is
195///    transient → `2`.
196/// 4. otherwise → `1` (generic).
197///
198/// ## Why a string bridge for the aggregated `run` path
199///
200/// The single-export `apply` path returns the typed marker straight to `main`,
201/// so the downcasts below fire directly. The multi-export `run` path used to
202/// flatten per-export failures into a `Vec<String>` and re-raise a fresh
203/// `anyhow!`, erasing the concrete type — which once forced a substring bridge
204/// here. `pipeline::run` now carries a **representative typed failure** instead
205/// (the most stop-worthy class among the failures), so the marker survives and
206/// the downcasts work for `rivet run` too. Classification is therefore purely
207/// type-driven: an un-typed data-integrity / drift failure classifies as
208/// `Generic` on purpose — a *visible* signal that a marker was dropped upstream,
209/// rather than being silently rescued by string matching.
210pub fn classify_exit(err: &anyhow::Error) -> i32 {
211    // Each check downcasts through anyhow's context chain.
212    // A child process already classified itself and exited with that code; honor
213    // it verbatim (parallel-export path) so the parent surfaces the same class.
214    if let Some(p) = err.downcast_ref::<PreclassifiedExit>() {
215        return p.0;
216    }
217    // A `CodedError` (config validation) is always exit class `Generic`, which is
218    // this function's default below — so it needs no arm of its own here.
219    if err.downcast_ref::<SchemaDriftError>().is_some() {
220        return ExitClass::SchemaDrift.code();
221    }
222    if err.downcast_ref::<DataIntegrityError>().is_some()
223        || err
224            .downcast_ref::<crate::manifest::ManifestInconsistency>()
225            .is_some()
226    {
227        return ExitClass::DataIntegrity.code();
228    }
229    if crate::pipeline::retry::classify_error(err).is_transient() {
230        return ExitClass::Retryable.code();
231    }
232    ExitClass::Generic.code()
233}
234
235/// Stable, greppable error codes carried by [`CodedError`]. A scheduler / CI step
236/// matches on these (the JSON `code` field or the `[CODE]` text prefix) instead
237/// of the human wording, which is free to change. Every code shares the
238/// `RIVET_CONFIG_` or `RIVET_SOURCE_` prefix; the `codes_*` guard tests assert
239/// distinctness + the prefix, mirroring the verify-layer `RIVET_VERIFY_*` guard.
240pub mod codes {
241    // Config validation — always exit class Generic (`1`): fix the file, no retry.
242    pub const CONFIG_NO_EXPORTS: &str = "RIVET_CONFIG_NO_EXPORTS";
243    pub const CONFIG_CHUNK_COUNT_INVALID: &str = "RIVET_CONFIG_CHUNK_COUNT_INVALID";
244    pub const CONFIG_CHUNK_BY_DAYS_INVALID: &str = "RIVET_CONFIG_CHUNK_BY_DAYS_INVALID";
245    pub const CONFIG_DUPLICATE_EXPORT: &str = "RIVET_CONFIG_DUPLICATE_EXPORT";
246
247    // Source — a statement that ran past the configured duration cap. Carried by
248    // the existing `source::StatementDurationTimeout` marker (recognised in
249    // [`super::error_code`]), so the long-query failure an operator's
250    // `statement_timeout` tooling watches for has a stable code without
251    // re-tagging its construction site. (Connect / auth codes are a deliberate
252    // follow-up: tagging them at `create_source` must preserve the retry path's
253    // transient classification — wrapping the driver error there can blind
254    // `classify_error` and regress retries, so it needs its own careful change.)
255    pub const SOURCE_STATEMENT_TIMEOUT: &str = "RIVET_SOURCE_STATEMENT_TIMEOUT";
256
257    /// Every code, for the stability/uniqueness guard test.
258    #[cfg(test)]
259    pub(crate) const ALL: &[&str] = &[
260        CONFIG_NO_EXPORTS,
261        CONFIG_CHUNK_COUNT_INVALID,
262        CONFIG_CHUNK_BY_DAYS_INVALID,
263        CONFIG_DUPLICATE_EXPORT,
264        SOURCE_STATEMENT_TIMEOUT,
265    ];
266}
267
268/// `return Err`-style bail with a stable `RIVET_CONFIG_*` code (exit class
269/// Generic). Drop-in for `anyhow::bail!` at a config-validation site — the
270/// message text is unchanged; only a typed code rides alongside it.
271#[macro_export]
272macro_rules! config_bail {
273    ($code:expr, $($arg:tt)*) => {
274        return ::core::result::Result::Err(::anyhow::Error::new(
275            $crate::error::CodedError::new($code, format!($($arg)*))))
276    };
277}
278
279pub type Result<T> = anyhow::Result<T>;
280
281#[cfg(test)]
282mod tests {
283    use super::*;
284
285    #[test]
286    fn schema_drift_marker_classifies_to_4() {
287        let err: anyhow::Error = SchemaDriftError::new("schema changed").into();
288        assert_eq!(classify_exit(&err), 4);
289        assert_eq!(ExitClass::SchemaDrift.code(), 4);
290    }
291
292    #[test]
293    fn data_integrity_marker_classifies_to_3() {
294        let err: anyhow::Error = DataIntegrityError::new("reconcile mismatch").into();
295        assert_eq!(classify_exit(&err), 3);
296        assert_eq!(ExitClass::DataIntegrity.code(), 3);
297    }
298
299    #[test]
300    fn manifest_inconsistency_classifies_to_3() {
301        let err: anyhow::Error = crate::manifest::ManifestInconsistency::DuplicatePartId(1).into();
302        assert_eq!(
303            classify_exit(&err),
304            3,
305            "manifest self-consistency failure is a data-integrity stop"
306        );
307    }
308
309    #[test]
310    fn transient_error_classifies_to_2_syntax_error_to_1() {
311        // Transient (string fallback in retry::classify_error) → retryable.
312        let transient = anyhow::anyhow!("connection reset by peer");
313        assert_eq!(
314            classify_exit(&transient),
315            2,
316            "connection reset is retryable"
317        );
318
319        // Permanent / generic → 1.
320        let syntax = anyhow::anyhow!("syntax error at or near \"SELET\"");
321        assert_eq!(classify_exit(&syntax), 1, "a syntax error is not retryable");
322    }
323
324    #[test]
325    fn typed_markers_survive_anyhow_context_wrapping() {
326        // The downcast walks the chain, so a context-wrapped marker still
327        // classifies by type (the `apply` path wraps with context on the way up).
328        let drift: anyhow::Error = SchemaDriftError::new("drift").into();
329        let wrapped = drift.context("export 'orders' failed");
330        assert_eq!(classify_exit(&wrapped), 4);
331
332        let dup: anyhow::Error = DataIntegrityError::new("dup").into();
333        let wrapped = dup.context("export 'orders' failed");
334        assert_eq!(classify_exit(&wrapped), 3);
335    }
336
337    #[test]
338    fn run_carries_typed_marker_through_multi_failure_context() {
339        // `pipeline::run`'s multi-failure path returns the representative typed
340        // failure wrapped in a context string listing the others. The marker
341        // must still downcast through that context so the exit class is right.
342        let dup: anyhow::Error =
343            DataIntegrityError::new("export 'orders': cannot safely retry (would duplicate rows)")
344                .into();
345        let aggregated = dup.context("2 export(s) failed; representative error follows (also: export 'events': connection reset)");
346        assert_eq!(
347            classify_exit(&aggregated),
348            3,
349            "the carried data-integrity marker must survive run's multi-failure context wrapping"
350        );
351    }
352
353    #[test]
354    fn untyped_flattened_string_is_generic_not_string_matched() {
355        // Deliberate behavior change: classification is type-driven only. A bare
356        // string that merely *reads* like a quality-gate failure (no marker) is
357        // Generic — a visible signal a marker was dropped, not a silent rescue.
358        let bare = anyhow::anyhow!("export 'orders': 1 quality check(s) failed: row_count low");
359        assert_eq!(
360            classify_exit(&bare),
361            1,
362            "an un-typed string must NOT be string-matched into data-integrity"
363        );
364    }
365
366    #[test]
367    fn data_integrity_marker_display_is_verbatim() {
368        // The marker must reproduce the wrapped message byte-for-byte so the
369        // operator-facing error line is unchanged from before the type existed.
370        let msg = "export 'orders': 1 quality check(s) failed";
371        assert_eq!(format!("{}", DataIntegrityError::new(msg)), msg);
372        assert_eq!(format!("{}", SchemaDriftError::new(msg)), msg);
373    }
374
375    #[test]
376    fn coded_error_codes_are_distinct_and_prefixed() {
377        use std::collections::HashSet;
378        let mut seen = HashSet::new();
379        for &c in codes::ALL {
380            assert!(seen.insert(c), "duplicate code: {c}");
381            assert!(
382                c.starts_with("RIVET_CONFIG_") || c.starts_with("RIVET_SOURCE_"),
383                "code {c} must share the RIVET_CONFIG_ / RIVET_SOURCE_ prefix",
384            );
385        }
386    }
387
388    #[test]
389    fn coded_error_surfaces_code_through_anyhow_context() {
390        // The code rides on the type through `.context()`; `Display` is the
391        // verbatim message (operator output unchanged but for the `[CODE]` prefix).
392        // `classify_exit` returns `Generic` via its default (no `CodedError` arm),
393        // proving the dropped `class` field changed nothing.
394        let e = anyhow::Error::new(CodedError::new(
395            codes::CONFIG_NO_EXPORTS,
396            "exports: at least one export must be defined",
397        ))
398        .context("while loading config");
399        assert_eq!(error_code(&e), Some(codes::CONFIG_NO_EXPORTS));
400        assert_eq!(classify_exit(&e), ExitClass::Generic.code());
401        assert!(format!("{e:#}").contains("at least one export must be defined"));
402    }
403}