rivet/error.rs
1//! **Layer: Cross-cutting**
2//!
3//! Error type alias plus the **exit-code taxonomy**: a small, stable set of
4//! process exit codes so an *unattended scheduler* can branch on the failure
5//! *class* instead of grepping stderr. Before this, `main` exited `1` for every
6//! error, forcing operators to regex the error text to decide retry-vs-stop.
7
/// Machine-actionable exit-code taxonomy.
///
/// A scheduler keys its retry / alert policy off the numeric exit code rather
/// than off the wording of the error text:
///
/// | code | class | scheduler action |
/// |------|-------|------------------|
/// | `0` | success | — (handled separately, not in this enum) |
/// | `1` | [`Generic`](ExitClass::Generic): config / usage / unclassified error | fix the config; do **not** retry blindly |
/// | `2` | [`Retryable`](ExitClass::Retryable): transient (connection reset, lock-wait timeout, capacity) | safe to retry the *same* command |
/// | `3` | [`DataIntegrity`](ExitClass::DataIntegrity): quality gate / reconcile mismatch / `validate` verification failure / duplicate-guard / manifest inconsistency | **STOP** — data may be wrong, do **not** blindly retry |
/// | `4` | [`SchemaDrift`](ExitClass::SchemaDrift): `on_schema_drift: fail` tripped | the source shape changed — needs human review |
///
/// ## Overlap with clap's usage exit (also `2`)
///
/// clap exits `2` on an argument-parse error (bad flag, missing required arg).
/// That collides numerically with [`Retryable`](ExitClass::Retryable) `= 2`, but
/// the two are distinguishable: clap's exit happens **pre-dispatch**, before any
/// `rivet` work runs, so it prints *only* a clap usage block and **no** `Error:`
/// line. A retryable rivet failure always prints an `Error: …` line (or a JSON
/// object with `"exit_class": 2`). We deliberately do not fight clap by remapping
/// our retryable code — `2 = retryable` matches the spec, and the usage overlap
/// is documented and detectable by the absence of a rivet error line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum ExitClass {
    /// `1` — config / usage / unclassified error. Fix the input; retrying the
    /// identical command will fail the same way.
    Generic = 1,
    /// `2` — transient failure (connection reset, lock-wait timeout, capacity).
    /// Safe to retry the same command after a backoff.
    Retryable = 2,
    /// `3` — data-integrity failure (quality gate, reconcile mismatch, `validate`
    /// verification failure, duplicate-guard, manifest inconsistency). The
    /// exported data may be wrong; **stop** and investigate rather than retry.
    DataIntegrity = 3,
    /// `4` — schema-drift failure (`on_schema_drift: fail` tripped). The source
    /// shape changed; a human must review before re-running.
    SchemaDrift = 4,
}

impl ExitClass {
    /// The process exit code for this class.
    ///
    /// The value is the enum's `#[repr(i32)]` discriminant, so the numbers
    /// declared on the variants are the single source of truth. `const` so the
    /// code can also be used in constant contexts.
    #[must_use]
    pub const fn code(self) -> i32 {
        self as i32
    }
}
54
/// Typed marker for a **data-integrity** failure (exit `3`).
///
/// Mirrors [`crate::source::StatementDurationTimeout`]: the classification is
/// carried by the *type*, not by the wording. [`classify_exit`] downcasts to it
/// through the anyhow chain, so rewording the human message can never silently
/// flip the exit code. It is constructed at the data-integrity bail sites
/// (quality-gate failure, duplicate-guard) around the existing message, and
/// `Display` prints that message unchanged, so operator-facing output is
/// identical.
#[derive(Debug)]
pub struct DataIntegrityError(String);

impl DataIntegrityError {
    /// Build a data-integrity marker around an existing human-facing message.
    /// `Display` later emits the text exactly as given here.
    pub fn new(message: impl Into<String>) -> Self {
        let text: String = message.into();
        DataIntegrityError(text)
    }
}

impl std::fmt::Display for DataIntegrityError {
    /// Writes the wrapped message verbatim (no prefix, no padding).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(text) = self;
        f.write_str(text.as_str())
    }
}

impl std::error::Error for DataIntegrityError {}
82
/// Typed marker for a **schema-drift** failure (exit `4`).
///
/// Follows the same contract as [`DataIntegrityError`]: the classification is
/// found by downcasting to this type, and `Display` prints the wrapped message
/// unchanged. Constructed where `on_schema_drift: fail` aborts the run.
#[derive(Debug)]
pub struct SchemaDriftError(String);

impl SchemaDriftError {
    /// Build a schema-drift marker around an existing human-facing message.
    /// `Display` later emits the text exactly as given here.
    pub fn new(message: impl Into<String>) -> Self {
        let text: String = message.into();
        SchemaDriftError(text)
    }
}

impl std::fmt::Display for SchemaDriftError {
    /// Writes the wrapped message verbatim (no prefix, no padding).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(text) = self;
        f.write_str(text.as_str())
    }
}

impl std::error::Error for SchemaDriftError {}
106
/// Typed marker carrying an **already-decided** process exit code.
///
/// A parallel-export child runs in its own process, classifies its own failure,
/// and exits with that code. The typed marker cannot cross the process
/// boundary — only the integer code does. The parent therefore wraps the
/// aggregate failure in this marker so [`classify_exit`] re-derives the SAME
/// class instead of stringifying `"exited with status 3"` and collapsing it to
/// a generic `1`.
#[derive(Debug)]
pub struct PreclassifiedExit(pub i32);

impl std::fmt::Display for PreclassifiedExit {
    /// Renders as `child exited with status <code>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(status) = self;
        write!(f, "child exited with status {status}")
    }
}

impl std::error::Error for PreclassifiedExit {}
124
/// Typed marker carrying a **stable error code** (`RIVET_CONFIG_*` /
/// `RIVET_SOURCE_*`), for config / source failures that an operator's tooling
/// greps by code rather than by wording.
///
/// Follows the same contract as [`DataIntegrityError`]: the code is found by
/// downcasting to this type (so a reworded message never moves the code), and
/// `Display` prints the wrapped message unchanged — the console line differs
/// only by the `[CODE]` prefix `main` adds. [`error_code`] reads `code` for the
/// JSON `code` field and the text prefix.
///
/// A `CodedError` is always exit class `Generic` (config / usage — fix it, don't
/// retry). That is already [`classify_exit`]'s default, so this type carries no
/// class and needs no downcast arm there. A class field would be reintroduced
/// with the first coded error that needs a non-`Generic` class (e.g. a
/// retryable source failure) — until then it is dead weight.
#[derive(Debug)]
pub struct CodedError {
    code: &'static str,
    message: String,
}

impl CodedError {
    /// Pair a human-facing message with a stable `RIVET_*` code.
    pub fn new(code: &'static str, message: impl Into<String>) -> Self {
        let message = message.into();
        CodedError { code, message }
    }

    /// The stable `RIVET_*` code this error was tagged with.
    pub fn code(&self) -> &'static str {
        let CodedError { code, .. } = self;
        code
    }
}

impl std::fmt::Display for CodedError {
    /// Writes only the wrapped message; the code is not part of the text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message.as_str())
    }
}

impl std::error::Error for CodedError {}
168
169/// The stable `RIVET_*` error code for a failure, if one was tagged via
170/// [`CodedError`] anywhere in the anyhow context chain. `main` surfaces it as the
171/// JSON `code` field and a `[CODE]` prefix on the text error line.
172pub fn error_code(err: &anyhow::Error) -> Option<&'static str> {
173 if let Some(c) = err.downcast_ref::<CodedError>() {
174 return Some(c.code());
175 }
176 // The existing source-side statement-timeout marker also gets a stable code,
177 // so the long-query failure an operator's `statement_timeout` tooling watches
178 // for is greppable without re-tagging its construction site.
179 if err
180 .downcast_ref::<crate::source::StatementDurationTimeout>()
181 .is_some()
182 {
183 return Some(codes::SOURCE_STATEMENT_TIMEOUT);
184 }
185 None
186}
187
188/// Map an error to its process exit code per the [`ExitClass`] taxonomy.
189///
190/// Precedence (first match wins):
191/// 1. [`SchemaDriftError`] downcast → `4`.
192/// 2. [`DataIntegrityError`] **or** [`crate::manifest::ManifestInconsistency`]
193/// downcast → `3`.
194/// 3. otherwise, if [`crate::pipeline::retry::classify_error`] says the error is
195/// transient → `2`.
196/// 4. otherwise → `1` (generic).
197///
198/// ## Why a string bridge for the aggregated `run` path
199///
200/// The single-export `apply` path returns the typed marker straight to `main`,
201/// so the downcasts below fire directly. The multi-export `run` path used to
202/// flatten per-export failures into a `Vec<String>` and re-raise a fresh
203/// `anyhow!`, erasing the concrete type — which once forced a substring bridge
204/// here. `pipeline::run` now carries a **representative typed failure** instead
205/// (the most stop-worthy class among the failures), so the marker survives and
206/// the downcasts work for `rivet run` too. Classification is therefore purely
207/// type-driven: an un-typed data-integrity / drift failure classifies as
208/// `Generic` on purpose — a *visible* signal that a marker was dropped upstream,
209/// rather than being silently rescued by string matching.
210pub fn classify_exit(err: &anyhow::Error) -> i32 {
211 // Each check downcasts through anyhow's context chain.
212 // A child process already classified itself and exited with that code; honor
213 // it verbatim (parallel-export path) so the parent surfaces the same class.
214 if let Some(p) = err.downcast_ref::<PreclassifiedExit>() {
215 return p.0;
216 }
217 // A `CodedError` (config validation) is always exit class `Generic`, which is
218 // this function's default below — so it needs no arm of its own here.
219 if err.downcast_ref::<SchemaDriftError>().is_some() {
220 return ExitClass::SchemaDrift.code();
221 }
222 if err.downcast_ref::<DataIntegrityError>().is_some()
223 || err
224 .downcast_ref::<crate::manifest::ManifestInconsistency>()
225 .is_some()
226 {
227 return ExitClass::DataIntegrity.code();
228 }
229 if crate::pipeline::retry::classify_error(err).is_transient() {
230 return ExitClass::Retryable.code();
231 }
232 ExitClass::Generic.code()
233}
234
/// Stable, greppable error codes carried by [`CodedError`].
///
/// A scheduler / CI step matches on these (the JSON `code` field or the `[CODE]`
/// text prefix) instead of the human wording, which is free to change. Every
/// code starts with `RIVET_CONFIG_` or `RIVET_SOURCE_`; the `codes_*` guard tests
/// assert distinctness + the prefix, mirroring the verify-layer `RIVET_VERIFY_*`
/// guard.
pub mod codes {
    // ── Config validation ───────────────────────────────────────────────────
    // Always exit class Generic (`1`): fix the file, no retry.

    /// Config validation: see the `CONFIG_NO_EXPORTS` bail site for the rule.
    pub const CONFIG_NO_EXPORTS: &str = "RIVET_CONFIG_NO_EXPORTS";
    /// Config validation: invalid chunk count.
    pub const CONFIG_CHUNK_COUNT_INVALID: &str = "RIVET_CONFIG_CHUNK_COUNT_INVALID";
    /// Config validation: invalid chunk-by-days value.
    pub const CONFIG_CHUNK_BY_DAYS_INVALID: &str = "RIVET_CONFIG_CHUNK_BY_DAYS_INVALID";
    /// Config validation: duplicate export.
    pub const CONFIG_DUPLICATE_EXPORT: &str = "RIVET_CONFIG_DUPLICATE_EXPORT";

    // ── Source ──────────────────────────────────────────────────────────────

    /// A statement that ran past the configured duration cap.
    ///
    /// Carried by the existing `source::StatementDurationTimeout` marker
    /// (recognised in [`super::error_code`]), so the long-query failure an
    /// operator's `statement_timeout` tooling watches for has a stable code
    /// without re-tagging its construction site.
    ///
    /// Connect / auth codes are a deliberate follow-up: tagging them at
    /// `create_source` must preserve the retry path's transient
    /// classification — wrapping the driver error there can blind
    /// `classify_error` and regress retries, so it needs its own careful change.
    pub const SOURCE_STATEMENT_TIMEOUT: &str = "RIVET_SOURCE_STATEMENT_TIMEOUT";

    /// Every code, for the stability/uniqueness guard test. Add each new
    /// constant here as well, or the guard will not cover it.
    #[cfg(test)]
    pub(crate) const ALL: &[&str] = &[
        CONFIG_NO_EXPORTS,
        CONFIG_CHUNK_COUNT_INVALID,
        CONFIG_CHUNK_BY_DAYS_INVALID,
        CONFIG_DUPLICATE_EXPORT,
        SOURCE_STATEMENT_TIMEOUT,
    ];
}
267
/// `return Err`-style bail with a stable `RIVET_CONFIG_*` code (exit class
/// Generic). Drop-in for `anyhow::bail!` at a config-validation site — the
/// message text is unchanged; only a typed code rides alongside it.
///
/// The first argument is the code (normally a constant from the `codes`
/// module); the remaining arguments are passed to `format!` unchanged.
///
/// Every path in the expansion is absolute (`::core`, `::std`, `::anyhow`,
/// `$crate`), so the macro expands the same way regardless of what the call
/// site has imported or shadowed.
#[macro_export]
macro_rules! config_bail {
    ($code:expr, $($arg:tt)*) => {
        return ::core::result::Result::Err(::anyhow::Error::new(
            $crate::error::CodedError::new($code, ::std::format!($($arg)*)),
        ))
    };
}
278
279pub type Result<T> = anyhow::Result<T>;
280
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn schema_drift_marker_classifies_to_4() {
        let err: anyhow::Error = SchemaDriftError::new("schema changed").into();
        assert_eq!(classify_exit(&err), 4);
        assert_eq!(ExitClass::SchemaDrift.code(), 4);
    }

    #[test]
    fn data_integrity_marker_classifies_to_3() {
        let err: anyhow::Error = DataIntegrityError::new("reconcile mismatch").into();
        assert_eq!(classify_exit(&err), 3);
        assert_eq!(ExitClass::DataIntegrity.code(), 3);
    }

    #[test]
    fn manifest_inconsistency_classifies_to_3() {
        let err: anyhow::Error = crate::manifest::ManifestInconsistency::DuplicatePartId(1).into();
        assert_eq!(
            classify_exit(&err),
            3,
            "manifest self-consistency failure is a data-integrity stop"
        );
    }

    #[test]
    fn transient_error_classifies_to_2_syntax_error_to_1() {
        // Transient (string fallback in retry::classify_error) → retryable.
        let transient = anyhow::anyhow!("connection reset by peer");
        assert_eq!(
            classify_exit(&transient),
            2,
            "connection reset is retryable"
        );

        // Permanent / generic → 1.
        let syntax = anyhow::anyhow!("syntax error at or near \"SELET\"");
        assert_eq!(classify_exit(&syntax), 1, "a syntax error is not retryable");
    }

    #[test]
    fn typed_markers_survive_anyhow_context_wrapping() {
        // The downcast walks the chain, so a context-wrapped marker still
        // classifies by type (the `apply` path wraps with context on the way up).
        let drift: anyhow::Error = SchemaDriftError::new("drift").into();
        let wrapped = drift.context("export 'orders' failed");
        assert_eq!(classify_exit(&wrapped), 4);

        let dup: anyhow::Error = DataIntegrityError::new("dup").into();
        let wrapped = dup.context("export 'orders' failed");
        assert_eq!(classify_exit(&wrapped), 3);
    }

    #[test]
    fn preclassified_exit_code_is_honored_verbatim() {
        // A parallel-export child already classified itself; the parent must
        // surface the same non-zero code, with or without added context.
        for code in [1, 2, 3, 4] {
            let bare: anyhow::Error = PreclassifiedExit(code).into();
            assert_eq!(classify_exit(&bare), code);

            let wrapped =
                anyhow::Error::from(PreclassifiedExit(code)).context("parallel export failed");
            assert_eq!(classify_exit(&wrapped), code);
        }
    }

    #[test]
    fn run_carries_typed_marker_through_multi_failure_context() {
        // `pipeline::run`'s multi-failure path returns the representative typed
        // failure wrapped in a context string listing the others. The marker
        // must still downcast through that context so the exit class is right.
        let dup: anyhow::Error =
            DataIntegrityError::new("export 'orders': cannot safely retry (would duplicate rows)")
                .into();
        let aggregated = dup.context("2 export(s) failed; representative error follows (also: export 'events': connection reset)");
        assert_eq!(
            classify_exit(&aggregated),
            3,
            "the carried data-integrity marker must survive run's multi-failure context wrapping"
        );
    }

    #[test]
    fn untyped_flattened_string_is_generic_not_string_matched() {
        // Deliberate behavior change: classification is type-driven only. A bare
        // string that merely *reads* like a quality-gate failure (no marker) is
        // Generic — a visible signal a marker was dropped, not a silent rescue.
        let bare = anyhow::anyhow!("export 'orders': 1 quality check(s) failed: row_count low");
        assert_eq!(
            classify_exit(&bare),
            1,
            "an un-typed string must NOT be string-matched into data-integrity"
        );
    }

    #[test]
    fn data_integrity_marker_display_is_verbatim() {
        // The marker must reproduce the wrapped message byte-for-byte so the
        // operator-facing error line is unchanged from before the type existed.
        let msg = "export 'orders': 1 quality check(s) failed";
        assert_eq!(format!("{}", DataIntegrityError::new(msg)), msg);
        assert_eq!(format!("{}", SchemaDriftError::new(msg)), msg);
    }

    #[test]
    fn coded_error_codes_are_distinct_and_prefixed() {
        use std::collections::HashSet;
        let mut seen = HashSet::new();
        for &c in codes::ALL {
            assert!(seen.insert(c), "duplicate code: {c}");
            assert!(
                c.starts_with("RIVET_CONFIG_") || c.starts_with("RIVET_SOURCE_"),
                "code {c} must share the RIVET_CONFIG_ / RIVET_SOURCE_ prefix",
            );
        }
    }

    #[test]
    fn coded_error_surfaces_code_through_anyhow_context() {
        // The code rides on the type through `.context()`; `Display` is the
        // verbatim message (operator output unchanged but for the `[CODE]` prefix).
        // `classify_exit` returns `Generic` via its default (no `CodedError` arm),
        // proving the dropped `class` field changed nothing.
        let e = anyhow::Error::new(CodedError::new(
            codes::CONFIG_NO_EXPORTS,
            "exports: at least one export must be defined",
        ))
        .context("while loading config");
        assert_eq!(error_code(&e), Some(codes::CONFIG_NO_EXPORTS));
        assert_eq!(classify_exit(&e), ExitClass::Generic.code());
        assert!(format!("{e:#}").contains("at least one export must be defined"));
    }
}