rivet-cli 0.16.3

Rivet: PostgreSQL/MySQL/SQL Server → Parquet/CSV (local, S3, GCS, Azure). Crate name rivet-cli; binary rivet.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! **Layer: Cross-cutting**
//!
//! Error type alias plus the **exit-code taxonomy**: a small, stable set of
//! process exit codes so an *unattended scheduler* can branch on the failure
//! *class* instead of grepping stderr. Before this, `main` exited `1` for every
//! error, forcing operators to regex the error text to decide retry-vs-stop.

/// Machine-actionable exit-code taxonomy.
///
/// A scheduler keys its retry / alert policy off the numeric exit code:
///
/// | code | class | scheduler action |
/// |------|-------|------------------|
/// | `0`  | success | — (handled separately, not in this enum) |
/// | `1`  | [`Generic`](ExitClass::Generic): config / usage / unclassified error | fix the config; do **not** retry blindly |
/// | `2`  | [`Retryable`](ExitClass::Retryable): transient (connection reset, lock-wait timeout, capacity) | safe to retry the *same* command |
/// | `3`  | [`DataIntegrity`](ExitClass::DataIntegrity): quality gate / reconcile mismatch / `validate` verification failure / duplicate-guard / manifest inconsistency | **STOP** — data may be wrong, do **not** blindly retry |
/// | `4`  | [`SchemaDrift`](ExitClass::SchemaDrift): `on_schema_drift: fail` tripped | the source shape changed — needs human review |
///
/// ## Overlap with clap's usage exit (also `2`)
///
/// clap exits `2` on an argument-parse error (bad flag, missing required arg).
/// That collides numerically with [`Retryable`](ExitClass::Retryable) `= 2`, but
/// the two are distinguishable: clap's exit happens **pre-dispatch**, before any
/// `rivet` work runs, so it prints *only* a clap usage block and **no** `Error:`
/// line. A retryable rivet failure always prints an `Error: …` line (or a JSON
/// object with `"exit_class": 2`). We deliberately do not fight clap by remapping
/// our retryable code — `2 = retryable` matches the spec, and the usage overlap
/// is documented and detectable by the absence of a rivet error line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum ExitClass {
    /// `1` — config / usage / unclassified error. Fix the input; retrying the
    /// identical command will fail the same way.
    Generic = 1,
    /// `2` — transient failure (connection reset, lock-wait timeout, capacity).
    /// Safe to retry the same command after a backoff.
    Retryable = 2,
    /// `3` — data-integrity failure (quality gate, reconcile mismatch, `validate`
    /// verification failure, duplicate-guard, manifest inconsistency). The
    /// exported data may be wrong; **stop** and investigate rather than retry.
    DataIntegrity = 3,
    /// `4` — schema-drift failure (`on_schema_drift: fail` tripped). The source
    /// shape changed; a human must review before re-running.
    SchemaDrift = 4,
}

impl ExitClass {
    /// The process exit code for this class.
    ///
    /// This is a `const fn`, so the numeric value can seed `const` items. It is
    /// `#[must_use]` because computing the code and then discarding it is
    /// always a mistake.
    #[must_use]
    pub const fn code(self) -> i32 {
        // `#[repr(i32)]` pins the discriminant type, so this cast is lossless.
        self as i32
    }
}

/// Typed marker for a **data-integrity** failure (exit `3`).
///
/// The classification lives in the *type*, not the message text. This is the
/// same approach as [`crate::source::StatementDurationTimeout`]. Because
/// [`classify_exit`] finds this type by downcasting through the anyhow chain,
/// rewording the human message can never silently change the exit code.
///
/// Built at the data-integrity bail sites (quality-gate failure,
/// duplicate-guard) around the message those sites already produced. Its
/// `Display` output is exactly that message, so operator-facing text is the
/// same as it was before the marker existed.
#[derive(Debug)]
pub struct DataIntegrityError(String);

impl DataIntegrityError {
    /// Tag a human-facing message as a data-integrity failure.
    ///
    /// `Display` later emits `message` exactly as given.
    pub fn new(message: impl Into<String>) -> Self {
        let text: String = message.into();
        DataIntegrityError(text)
    }
}

impl std::fmt::Display for DataIntegrityError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Verbatim on purpose: the marker must not alter operator-facing text.
        let DataIntegrityError(text) = self;
        f.write_str(text)
    }
}

impl std::error::Error for DataIntegrityError {}

/// Typed marker for a **schema-drift** failure (exit `4`).
///
/// Follows the same contract as [`DataIntegrityError`]:
/// - [`classify_exit`] recognises the failure by downcasting to this type.
/// - `Display` emits the wrapped message unchanged.
///
/// Raised where `on_schema_drift: fail` aborts the run.
#[derive(Debug)]
pub struct SchemaDriftError(String);

impl SchemaDriftError {
    /// Tag a human-facing message as a schema-drift failure. `Display` later
    /// emits it exactly as given.
    pub fn new(message: impl Into<String>) -> Self {
        Self(Into::<String>::into(message))
    }
}

impl std::fmt::Display for SchemaDriftError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // No decoration: the operator sees the original drift message as-is.
        f.write_str(self.0.as_str())
    }
}

impl std::error::Error for SchemaDriftError {}

/// Typed marker carrying an exit code that was **already decided** elsewhere.
///
/// In the parallel-export path, each child runs as its own process, classifies
/// its own failure, and exits with the resulting code. Only that integer — not
/// the child's typed marker — survives the process boundary. The parent wraps
/// its aggregate failure in this marker so that [`classify_exit`] re-derives
/// the child's class. Without it, the failure would be the opaque string
/// `"exited with status 3"` and collapse to a generic `1`.
#[derive(Debug)]
pub struct PreclassifiedExit(pub i32);

impl std::fmt::Display for PreclassifiedExit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let PreclassifiedExit(status) = self;
        write!(f, "child exited with status {}", status)
    }
}

impl std::error::Error for PreclassifiedExit {}

/// Typed marker carrying a **stable error code** (`RIVET_CONFIG_*` /
/// `RIVET_SOURCE_*`). It is used for config / source failures that an
/// operator's tooling greps by code rather than by wording.
///
/// It follows the same contract as [`DataIntegrityError`]:
/// - The code rides on the type and is recovered by downcast, so rewording the
///   message never moves the code.
/// - `Display` reproduces the wrapped message verbatim. The console line is
///   unchanged apart from the `[CODE]` prefix that `main` adds.
///
/// [`error_code`] reads `code` for the JSON `code` field and the text prefix.
///
/// Every `CodedError` is meant to be exit class `Generic` (config / usage: fix
/// it, don't retry), so the type carries no class field. A class field should
/// be reintroduced with the first coded error that needs a non-`Generic` class
/// (e.g. a retryable source failure). Until then it would be dead weight.
#[derive(Debug)]
pub struct CodedError {
    code: &'static str,
    message: String,
}

impl CodedError {
    /// Attach the stable `RIVET_*` `code` to a human-facing `message`.
    pub fn new(code: &'static str, message: impl Into<String>) -> Self {
        let message = message.into();
        CodedError { code, message }
    }

    /// The stable `RIVET_*` code this error was tagged with.
    pub fn code(&self) -> &'static str {
        let tag = self.code;
        tag
    }
}

impl std::fmt::Display for CodedError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the message: the `[CODE]` prefix is the caller's job.
        f.write_str(self.message.as_str())
    }
}

impl std::error::Error for CodedError {}

/// The stable `RIVET_*` error code for a failure, if one was tagged via
/// [`CodedError`] anywhere in the anyhow context chain. `main` surfaces it as the
/// JSON `code` field and a `[CODE]` prefix on the text error line.
pub fn error_code(err: &anyhow::Error) -> Option<&'static str> {
    if let Some(c) = err.downcast_ref::<CodedError>() {
        return Some(c.code());
    }
    // The existing source-side statement-timeout marker also gets a stable code,
    // so the long-query failure an operator's `statement_timeout` tooling watches
    // for is greppable without re-tagging its construction site.
    if err
        .downcast_ref::<crate::source::StatementDurationTimeout>()
        .is_some()
    {
        return Some(codes::SOURCE_STATEMENT_TIMEOUT);
    }
    None
}

/// Map an error to its process exit code per the [`ExitClass`] taxonomy.
///
/// Precedence (first match wins):
/// 1. [`SchemaDriftError`] downcast → `4`.
/// 2. [`DataIntegrityError`] **or** [`crate::manifest::ManifestInconsistency`]
///    downcast → `3`.
/// 3. otherwise, if [`crate::pipeline::retry::classify_error`] says the error is
///    transient → `2`.
/// 4. otherwise → `1` (generic).
///
/// ## Why a string bridge for the aggregated `run` path
///
/// The single-export `apply` path returns the typed marker straight to `main`,
/// so the downcasts below fire directly. The multi-export `run` path used to
/// flatten per-export failures into a `Vec<String>` and re-raise a fresh
/// `anyhow!`, erasing the concrete type — which once forced a substring bridge
/// here. `pipeline::run` now carries a **representative typed failure** instead
/// (the most stop-worthy class among the failures), so the marker survives and
/// the downcasts work for `rivet run` too. Classification is therefore purely
/// type-driven: an un-typed data-integrity / drift failure classifies as
/// `Generic` on purpose — a *visible* signal that a marker was dropped upstream,
/// rather than being silently rescued by string matching.
pub fn classify_exit(err: &anyhow::Error) -> i32 {
    // Each check downcasts through anyhow's context chain.
    // A child process already classified itself and exited with that code; honor
    // it verbatim (parallel-export path) so the parent surfaces the same class.
    if let Some(p) = err.downcast_ref::<PreclassifiedExit>() {
        return p.0;
    }
    // A `CodedError` (config validation) is always exit class `Generic`, which is
    // this function's default below — so it needs no arm of its own here.
    if err.downcast_ref::<SchemaDriftError>().is_some() {
        return ExitClass::SchemaDrift.code();
    }
    if err.downcast_ref::<DataIntegrityError>().is_some()
        || err
            .downcast_ref::<crate::manifest::ManifestInconsistency>()
            .is_some()
    {
        return ExitClass::DataIntegrity.code();
    }
    if crate::pipeline::retry::classify_error(err).is_transient() {
        return ExitClass::Retryable.code();
    }
    ExitClass::Generic.code()
}

/// Stable, greppable error codes carried by [`CodedError`].
///
/// Schedulers and CI steps match on these codes, via the JSON `code` field or
/// the `[CODE]` text prefix, rather than on the human wording, which may change
/// freely. Each code begins with `RIVET_CONFIG_` or `RIVET_SOURCE_`.
///
/// The `coded_error_codes_are_distinct_and_prefixed` guard test checks every
/// entry of `ALL` for distinctness and for that prefix. This mirrors the
/// verify-layer `RIVET_VERIFY_*` guard.
pub mod codes {
    // Config validation: always exit class Generic (`1`). Fix the file; do not
    // retry.
    pub const CONFIG_NO_EXPORTS: &str = "RIVET_CONFIG_NO_EXPORTS";
    pub const CONFIG_CHUNK_COUNT_INVALID: &str = "RIVET_CONFIG_CHUNK_COUNT_INVALID";
    pub const CONFIG_CHUNK_BY_DAYS_INVALID: &str = "RIVET_CONFIG_CHUNK_BY_DAYS_INVALID";
    pub const CONFIG_DUPLICATE_EXPORT: &str = "RIVET_CONFIG_DUPLICATE_EXPORT";

    // Source: a statement that ran past the configured duration cap. This code
    // is carried by the existing `source::StatementDurationTimeout` marker
    // (recognised in `super::error_code`). The long-query failure that an
    // operator's `statement_timeout` tooling watches for therefore has a stable
    // code without re-tagging its construction site.
    //
    // Connect / auth codes are a deliberate follow-up. Tagging them at
    // `create_source` must preserve the retry path's transient classification:
    // wrapping the driver error there can blind `classify_error` and regress
    // retries, so it needs its own careful change.
    pub const SOURCE_STATEMENT_TIMEOUT: &str = "RIVET_SOURCE_STATEMENT_TIMEOUT";

    /// Every code, for the stability/uniqueness guard test.
    #[cfg(test)]
    pub(crate) const ALL: &[&str] = &[
        CONFIG_NO_EXPORTS,
        CONFIG_CHUNK_COUNT_INVALID,
        CONFIG_CHUNK_BY_DAYS_INVALID,
        CONFIG_DUPLICATE_EXPORT,
        SOURCE_STATEMENT_TIMEOUT,
    ];
}

/// `return Err`-style bail with a stable `RIVET_CONFIG_*` code (exit class
/// Generic).
///
/// Drop-in for `anyhow::bail!` at a config-validation site. The message is
/// built with `format!` from the trailing arguments, exactly as `bail!` would
/// build it; the only addition is a typed `CodedError` carrying `$code`.
///
/// Every path in the expansion is fully qualified, including `::std::format!`.
/// A `#[macro_export]` macro expands in the caller's scope, where a local
/// `format!` macro could otherwise shadow the standard one.
#[macro_export]
macro_rules! config_bail {
    ($code:expr, $($arg:tt)*) => {
        return ::core::result::Result::Err(::anyhow::Error::new(
            $crate::error::CodedError::new($code, ::std::format!($($arg)*))))
    };
}

pub type Result<T> = anyhow::Result<T>;

// Unit tests for the exit-code taxonomy: marker → exit-class mapping, survival
// of typed markers through anyhow context wrapping, verbatim `Display`, and the
// stable `RIVET_*` code guard.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn schema_drift_marker_classifies_to_4() {
        let err: anyhow::Error = SchemaDriftError::new("schema changed").into();
        assert_eq!(classify_exit(&err), 4);
        assert_eq!(ExitClass::SchemaDrift.code(), 4);
    }

    #[test]
    fn data_integrity_marker_classifies_to_3() {
        let err: anyhow::Error = DataIntegrityError::new("reconcile mismatch").into();
        assert_eq!(classify_exit(&err), 3);
        assert_eq!(ExitClass::DataIntegrity.code(), 3);
    }

    #[test]
    fn manifest_inconsistency_classifies_to_3() {
        let err: anyhow::Error = crate::manifest::ManifestInconsistency::DuplicatePartId(1).into();
        assert_eq!(
            classify_exit(&err),
            3,
            "manifest self-consistency failure is a data-integrity stop"
        );
    }

    #[test]
    fn transient_error_classifies_to_2_syntax_error_to_1() {
        // Transient (string fallback in retry::classify_error) → retryable.
        let transient = anyhow::anyhow!("connection reset by peer");
        assert_eq!(
            classify_exit(&transient),
            2,
            "connection reset is retryable"
        );

        // Permanent / generic → 1.
        let syntax = anyhow::anyhow!("syntax error at or near \"SELET\"");
        assert_eq!(classify_exit(&syntax), 1, "a syntax error is not retryable");
    }

    #[test]
    fn typed_markers_survive_anyhow_context_wrapping() {
        // The downcast walks the chain, so a context-wrapped marker still
        // classifies by type (the `apply` path wraps with context on the way up).
        let drift: anyhow::Error = SchemaDriftError::new("drift").into();
        let wrapped = drift.context("export 'orders' failed");
        assert_eq!(classify_exit(&wrapped), 4);

        let dup: anyhow::Error = DataIntegrityError::new("dup").into();
        let wrapped = dup.context("export 'orders' failed");
        assert_eq!(classify_exit(&wrapped), 3);
    }

    #[test]
    fn run_carries_typed_marker_through_multi_failure_context() {
        // `pipeline::run`'s multi-failure path returns the representative typed
        // failure wrapped in a context string listing the others. The marker
        // must still downcast through that context so the exit class is right.
        let dup: anyhow::Error =
            DataIntegrityError::new("export 'orders': cannot safely retry (would duplicate rows)")
                .into();
        let aggregated = dup.context("2 export(s) failed; representative error follows (also: export 'events': connection reset)");
        assert_eq!(
            classify_exit(&aggregated),
            3,
            "the carried data-integrity marker must survive run's multi-failure context wrapping"
        );
    }

    #[test]
    fn untyped_flattened_string_is_generic_not_string_matched() {
        // Deliberate behavior change: classification is type-driven only. A bare
        // string that merely *reads* like a quality-gate failure (no marker) is
        // Generic — a visible signal a marker was dropped, not a silent rescue.
        let bare = anyhow::anyhow!("export 'orders': 1 quality check(s) failed: row_count low");
        assert_eq!(
            classify_exit(&bare),
            1,
            "an un-typed string must NOT be string-matched into data-integrity"
        );
    }

    #[test]
    fn data_integrity_marker_display_is_verbatim() {
        // The marker must reproduce the wrapped message byte-for-byte so the
        // operator-facing error line is unchanged from before the type existed.
        let msg = "export 'orders': 1 quality check(s) failed";
        assert_eq!(format!("{}", DataIntegrityError::new(msg)), msg);
        assert_eq!(format!("{}", SchemaDriftError::new(msg)), msg);
    }

    #[test]
    fn coded_error_codes_are_distinct_and_prefixed() {
        // Guard for `codes::ALL`: every code is unique and carries one of the
        // two documented prefixes.
        use std::collections::HashSet;
        let mut seen = HashSet::new();
        for &c in codes::ALL {
            assert!(seen.insert(c), "duplicate code: {c}");
            assert!(
                c.starts_with("RIVET_CONFIG_") || c.starts_with("RIVET_SOURCE_"),
                "code {c} must share the RIVET_CONFIG_ / RIVET_SOURCE_ prefix",
            );
        }
    }

    #[test]
    fn coded_error_surfaces_code_through_anyhow_context() {
        // The code rides on the type through `.context()`; `Display` is the
        // verbatim message (operator output unchanged but for the `[CODE]` prefix).
        // `classify_exit` reports this coded config error as `Generic`, showing
        // that dropping the old `class` field changed nothing.
        let e = anyhow::Error::new(CodedError::new(
            codes::CONFIG_NO_EXPORTS,
            "exports: at least one export must be defined",
        ))
        .context("while loading config");
        assert_eq!(error_code(&e), Some(codes::CONFIG_NO_EXPORTS));
        assert_eq!(classify_exit(&e), ExitClass::Generic.code());
        assert!(format!("{e:#}").contains("at least one export must be defined"));
    }
}