Skip to main content

rivet/
manifest.rs

1//! **Layer: Trust contract**
2//!
3//! Public JSON manifest written next to every cloud-or-local-file run's
4//! output.  Defines the wire schema and the in-memory builder; the actual
5//! writer (atomic rename / atomic PUT) lives next to the destination
6//! implementations.
7//!
8//! Invariants are documented in [`docs/adr/0012-cloud-manifest-contract.md`].
9//! This module owns *only* the data types and a tiny set of pure helpers —
10//! ordering, atomicity, and `_SUCCESS` semantics belong to the writer.
11//!
12//! The manifest is read by:
13//! - `--resume` (decision matrix M8: skip / rewrite / quarantine)
14//! - `--validate` (M5: every listed part exists at recorded size)
15//! - `--reconcile` (manifest row counts vs source `COUNT(*)`)
16//! - the run report (informational; not a verdict source)
17//!
18//! Forward compatibility: callers MUST ignore unknown fields when reading.
19//! Field additions are non-breaking; field removals or type changes require
20//! a [`MANIFEST_VERSION`] bump.
21
22// The wire types (RunManifest, ManifestPart, ...) and the writer-side
23// helpers (success_marker_body) are already wired into the pipeline; the
24// reader-side helpers (validate_self_consistency, committed_rows,
25// committed_part_count, parse_success_marker, ManifestInconsistency) ship
26// next when `--validate` / `--reconcile` learn to inspect the manifest.
27// Mark the whole module as dead-code-tolerant until then so the bin crate
28// (which doesn't compile tests) stays clean.
29#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// Current manifest schema version.  See ADR-0012 §Manifest schema.
///
/// [`RunManifest::validate_self_consistency`] rejects any manifest whose
/// `manifest_version` differs from this value.
pub const MANIFEST_VERSION: u32 = 1;

/// File name of the manifest at the destination prefix.
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// File name of the success marker.  Written *after* the manifest per M2;
/// its presence implies M5 (every listed part exists at recorded size).
/// Its body is produced by [`success_marker_body`].
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Prefix under which untracked / corrupt parts are moved on resume (M9).
/// Layout: `<prefix>/_quarantine/<run_id>/<original-name>`.
pub const QUARANTINE_PREFIX: &str = "_quarantine";

/// Writability probe `rivet doctor` drops at the destination prefix.  It is a
/// Rivet-internal sidecar (like [`MANIFEST_FILENAME`] / [`SUCCESS_FILENAME`]),
/// so the manifest-aware `--validate` pass must not flag it as an untracked
/// foreign object when a run follows a `doctor` against the same prefix.
pub const DOCTOR_PROBE_FILENAME: &str = ".rivet_doctor_probe";
52
/// Prefix a manifest-relative `key` (a part `path`, [`MANIFEST_FILENAME`], …)
/// with the destination sub-directory `dir`.
///
/// Any trailing `/` characters on `dir` are dropped first, so exactly one `/`
/// separates the two halves.  If nothing is left of `dir` — the usual case,
/// since production callers pass `""` because the manifest lives at the prefix
/// root — `key` is returned unchanged.  Destination verification and resume
/// reconciliation both go through this helper so they address objects in the
/// same key namespace.
pub fn join_key(dir: &str, key: &str) -> String {
    match dir.trim_end_matches('/') {
        "" => key.to_owned(),
        base => format!("{base}/{key}"),
    }
}
66
67/// Compute the body of the `_SUCCESS` marker for a given serialized manifest.
68///
69/// Format: a single line `"xxh3:<16-hex>\n"`.  ADR-0012 M2 — `_SUCCESS`
70/// carries the manifest fingerprint so an orchestrator can detect manifest
71/// changes (rerun, resume that completed, repair) with a cheap `GET _SUCCESS`
72/// instead of refetching the full manifest body.
73///
74/// `manifest_bytes` must be the exact bytes that were written to `manifest.json`
75/// — usually the result of `serde_json::to_vec_pretty(&RunManifest)`.  The
76/// caller is responsible for using the same bytes for both writes; computing
77/// the fingerprint from a re-serialized struct would risk encoding drift
78/// (key ordering, whitespace) producing a different hash.
79pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
80    use xxhash_rust::xxh3::xxh3_64;
81    format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
82}
83
/// Parse the fingerprint out of a `_SUCCESS` marker body.
///
/// Returns `Some("xxh3:<hex>")`, borrowed from `body`, on a well-formed marker.
/// Returns `None` on anything else: an empty file, a missing or different
/// algorithm prefix, the wrong length, or a non-hex or uppercase-hex body.
/// Trailing ASCII whitespace and newlines (`\n`, `\r\n`, spaces) are
/// tolerated to match the on-wire shape produced by [`success_marker_body`];
/// leading whitespace is not.
///
/// The marker is read back from storage and may hold arbitrary text, so this
/// function never panics.  In particular, non-ASCII input is rejected rather
/// than split at a byte offset that could land inside a multi-byte character.
///
/// Used by `--validate` and by external polling consumers (Airflow sensors,
/// CI checks) to decide whether a cached manifest is still current.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    const ALGO_PREFIX: &str = "xxh3:";
    const HEX_DIGITS: usize = 16;

    let trimmed = body.trim_end_matches(|c: char| c.is_ascii_whitespace());
    // `strip_prefix` only matches on a char boundary, so unlike slicing at a
    // fixed byte offset it cannot panic on multi-byte UTF-8 input.
    let hex = trimmed.strip_prefix(ALGO_PREFIX)?;
    // Lowercase only: the writer emits `{:016x}`, and the format is kept strict.
    // A 16-byte length plus all-ASCII digits means exactly 16 characters.
    let is_lower_hex = |b: u8| b.is_ascii_digit() || (b'a'..=b'f').contains(&b);
    if hex.len() == HEX_DIGITS && hex.bytes().all(is_lower_hex) {
        Some(trimmed)
    } else {
        None
    }
}
110
/// One per-column Form B checksum, keyed by column **name** (not position) so a
/// column reorder between export and validate can never silently misalign the
/// comparison (the positional `Vec<String>` it replaced could). `checksum` is the
/// per-column xxh3, XOR-combined over the whole export, as a decimal string
/// (JSON-stable).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnChecksum {
    /// Name of the column this checksum covers.
    pub name: String,
    /// XOR-combined per-column xxh3, rendered as a decimal string.
    pub checksum: String,
}
121
/// Public, stable JSON shape for the run manifest.
///
/// One manifest is written per `run_id` per export.  See ADR-0012 M4
/// (Append-Only Per Run) for the resume-across-interruption story.
///
/// `row_count` and `part_count` restate totals that can be derived from the
/// `Committed` entries of `parts`; [`RunManifest::validate_self_consistency`]
/// cross-checks them.  Field order here is the serialized key order.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RunManifest {
    /// Schema version; must equal [`MANIFEST_VERSION`] for
    /// `validate_self_consistency` to accept the manifest.
    pub manifest_version: u32,
    /// Identifier of the run that wrote this manifest.
    pub run_id: String,
    /// Name of the export (e.g. `public.orders` in this module's tests).
    pub export_name: String,
    /// Run start time as a string.  NOTE(review): tests use RFC 3339 UTC
    /// (`2026-05-21T12:00:00Z`); the type enforces no format — confirm
    /// against the writer.
    pub started_at: String,
    /// Run end time; same (unenforced) format as `started_at`.
    pub finished_at: String,
    /// Terminal status as recorded by the writer.
    pub status: ManifestStatus,
    /// Where the data was read from.
    pub source: ManifestSource,
    /// Where the parts were written.
    pub destination: ManifestDestination,
    /// Output file format as a free-form string (e.g. `parquet` in tests).
    pub format: String,
    /// Compression codec as a free-form string (e.g. `zstd` in tests).
    pub compression: String,
    /// xxh3 fingerprint of the column schema; see [`crate::state::schema_fingerprint`].
    pub schema_fingerprint: String,
    /// Declared total rows; expected to equal the sum of `rows` over
    /// `Committed` parts.
    pub row_count: i64,
    /// Declared number of `Committed` parts (quarantined parts excluded).
    pub part_count: u32,
    /// Every part known to this run, committed and quarantined alike.
    pub parts: Vec<ManifestPart>,
    /// Per-column value checksum over the whole export (Form B), keyed by column
    /// name — the per-column xxh3 XOR-combined over the run. `rivet validate`
    /// re-reads the parts and recomputes this to catch an `Arrow→Parquet` encode
    /// fault or post-write corruption — the step the in-process Form A check
    /// cannot see. Optional for back-compat: older manifests omit it (no
    /// `MANIFEST_VERSION` bump), newer readers tolerate its absence.
    /// When `None` the key is omitted from the serialized JSON entirely.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub column_checksums: Option<Vec<ColumnChecksum>>,
    /// The column the Form B checksum is keyed to (`xxh3(key ‖ value)`, the
    /// export's cursor/key column) so `validate` re-keys identically. `None` ⇒
    /// un-keyed (a full export with no cursor). See [`ColumnChecksum`].
    /// Omitted from the serialized JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checksum_key_column: Option<String>,
}
157
/// Terminal status of the run *as recorded by the writer*.
///
/// `success` is only written when M2 (Manifest Before SUCCESS) is satisfied
/// — i.e. when the writer is about to drop the `_SUCCESS` marker.
/// `failed` and `interrupted` manifests serve as audit trails and as input
/// to resume; they do NOT trigger `_SUCCESS`.
///
/// Serialized in snake_case: `"success"`, `"failed"`, `"interrupted"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestStatus {
    /// Run completed; accompanied by a `_SUCCESS` marker.
    Success,
    /// Run failed; audit trail only, no `_SUCCESS`.
    Failed,
    /// Run was interrupted; input to `--resume`, no `_SUCCESS`.
    Interrupted,
}
171
/// The source side of the export.
///
/// `schema` and `table` are `Option`s; a manifest that omits them (e.g.
/// `"source": {"engine": "postgres"}`) still deserializes, with both `None`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestSource {
    /// Source database engine as a free-form string (e.g. `postgres` in tests).
    pub engine: String,
    /// Source schema / namespace, when applicable.
    pub schema: Option<String>,
    /// Source table, when applicable.
    pub table: Option<String>,
}
/// The destination the parts were written to.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestDestination {
    /// Destination kind as a free-form string (tests use `gcs` and `local`).
    pub kind: String,
    /// Destination prefix URI (e.g. `gs://…/` or `file:///…/` in tests).
    pub uri: String,
}
184
/// One committed (or quarantined) output part.
///
/// `path` is **relative to the destination prefix** (ADR-0012 §Manifest
/// schema) so the manifest is portable across copies of the dataset.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestPart {
    /// Part identifier; must be unique across *all* entries of
    /// `RunManifest::parts` (checked by `validate_self_consistency`).
    pub part_id: u32,
    /// Object key relative to the destination prefix.
    pub path: String,
    /// Rows in this part; counted toward `row_count` only when `Committed`.
    pub rows: i64,
    /// Recorded object size, checked against the destination for M5.
    pub size_bytes: u64,
    /// xxh3 fingerprint of the part body.  Format mirrors [`crate::state::schema_fingerprint`]:
    /// `"xxh3:<16-hex>"`.  Algorithm prefix MUST be checked before interpreting
    /// the hex body (sha256/blake3 reserved for future hashers).
    pub content_fingerprint: String,
    /// Base64 MD5 of the part body, in GCS's `md5Hash` encoding — lets
    /// destination verification compare against the object's listing metadata
    /// with **no download** (GCS/S3/Azure surface this; the comparison rides
    /// the listing `--validate` already does).  Empty for legacy manifests and
    /// for parts whose MD5 could not be computed; the check then degrades to
    /// size-only.  `#[serde(default)]` keeps pre-0.7.x manifests parseable.
    #[serde(default)]
    pub content_md5: String,
    /// Whether the part is live (`Committed`) or kept only for audit.
    pub status: PartStatus,
}
209
/// Lifecycle state of a single part.  Serialized in snake_case:
/// `"committed"`, `"quarantined"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartStatus {
    /// Listed in the active manifest at the destination.
    Committed,
    /// Found in a prior manifest but rejected on resume (M9); retained for audit.
    Quarantined,
}
218
219impl RunManifest {
220    /// Sum of `rows` across `Committed` parts.  Used by M5 sanity checks and
221    /// by `--reconcile` to compare against source `COUNT(*)`.
222    pub fn committed_rows(&self) -> i64 {
223        self.parts
224            .iter()
225            .filter(|p| p.status == PartStatus::Committed)
226            .map(|p| p.rows)
227            .sum()
228    }
229
230    /// Number of `Committed` parts.
231    pub fn committed_part_count(&self) -> usize {
232        self.parts
233            .iter()
234            .filter(|p| p.status == PartStatus::Committed)
235            .count()
236    }
237
238    /// Verify that the recorded aggregates (`row_count`, `part_count`) match
239    /// the actual `Committed` parts in `parts`.  A mismatch is a writer bug;
240    /// callers should refuse to act on the manifest until investigated.
241    pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
242        if self.manifest_version != MANIFEST_VERSION {
243            return Err(ManifestInconsistency::UnsupportedVersion {
244                found: self.manifest_version,
245                supported: MANIFEST_VERSION,
246            });
247        }
248        let actual_parts = self.committed_part_count();
249        if actual_parts != self.part_count as usize {
250            return Err(ManifestInconsistency::PartCountMismatch {
251                declared: self.part_count,
252                actual: actual_parts,
253            });
254        }
255        let actual_rows = self.committed_rows();
256        if actual_rows != self.row_count {
257            return Err(ManifestInconsistency::RowCountMismatch {
258                declared: self.row_count,
259                actual: actual_rows,
260            });
261        }
262        // Part IDs must be unique within a manifest.
263        let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
264        ids.sort_unstable();
265        for w in ids.windows(2) {
266            if w[0] == w[1] {
267                return Err(ManifestInconsistency::DuplicatePartId(w[0]));
268            }
269        }
270        Ok(())
271    }
272}
273
/// Self-consistency failures detected by [`RunManifest::validate_self_consistency`].
///
/// These represent writer bugs, not destination drift; M5 destination-state
/// checks live in the validate command path.
#[derive(Debug, PartialEq)]
pub enum ManifestInconsistency {
    /// `manifest_version` is not the [`MANIFEST_VERSION`] this build supports.
    UnsupportedVersion { found: u32, supported: u32 },
    /// `part_count` disagrees with the number of `Committed` parts.
    PartCountMismatch { declared: u32, actual: usize },
    /// `row_count` disagrees with the summed `rows` of `Committed` parts.
    RowCountMismatch { declared: i64, actual: i64 },
    /// The given `part_id` occurs more than once in `parts` (any status).
    DuplicatePartId(u32),
}
285
286impl std::fmt::Display for ManifestInconsistency {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        match self {
289            Self::UnsupportedVersion { found, supported } => write!(
290                f,
291                "manifest_version {found} is not supported by this build (expected {supported})"
292            ),
293            Self::PartCountMismatch { declared, actual } => write!(
294                f,
295                "part_count declares {declared} parts but {actual} committed parts found"
296            ),
297            Self::RowCountMismatch { declared, actual } => write!(
298                f,
299                "row_count declares {declared} rows but committed parts sum to {actual}"
300            ),
301            Self::DuplicatePartId(id) => {
302                write!(f, "duplicate part_id {id} in manifest.parts")
303            }
304        }
305    }
306}
307
308impl std::error::Error for ManifestInconsistency {}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Committed` part whose path and fingerprint are derived from `id`.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        ManifestPart {
            part_id: id,
            path: format!("part-{id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: format!("xxh3:{:016x}", id as u64),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }
    }

    /// A `Success` manifest whose `row_count` / `part_count` are computed from
    /// the `Committed` entries of `parts`, so it is self-consistent by
    /// construction (including the empty-parts case: both totals are 0).
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "gcs".into(),
                uri: "gs://rivet-exports/public.orders/run/".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
            column_checksums: None,
            checksum_key_column: None,
        }
    }

    // ── constants ───────────────────────────────────────────────────────────

    #[test]
    fn manifest_version_is_one() {
        assert_eq!(MANIFEST_VERSION, 1);
    }

    #[test]
    fn filenames_are_stable() {
        assert_eq!(MANIFEST_FILENAME, "manifest.json");
        assert_eq!(SUCCESS_FILENAME, "_SUCCESS");
        assert_eq!(QUARANTINE_PREFIX, "_quarantine");
        assert_eq!(DOCTOR_PROBE_FILENAME, ".rivet_doctor_probe");
    }

    // ── key joining ─────────────────────────────────────────────────────────

    #[test]
    fn join_key_handles_empty_and_trailing_slash_dirs() {
        assert_eq!(join_key("", MANIFEST_FILENAME), "manifest.json");
        assert_eq!(join_key("sub", "part-000001.parquet"), "sub/part-000001.parquet");
        assert_eq!(join_key("sub/", SUCCESS_FILENAME), "sub/_SUCCESS");
        assert_eq!(join_key("sub//", SUCCESS_FILENAME), "sub/_SUCCESS");
    }

    // ── self-consistency ────────────────────────────────────────────────────

    #[test]
    fn self_consistent_manifest_validates() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(m.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.part_count = 5;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1
            })
        ));
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.row_count = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100
            })
        ));
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        let err = m.validate_self_consistency().unwrap_err();
        assert_eq!(err, ManifestInconsistency::DuplicatePartId(1));
    }

    #[test]
    fn rejects_unsupported_version() {
        // Empty parts ⇒ the factory already records 0 rows / 0 parts, so only
        // the version check can fail here.
        let mut m = manifest_with_parts(vec![]);
        m.manifest_version = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: 1
            })
        ));
    }

    // ── quarantined parts ──────────────────────────────────────────────────

    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let mut p_q = part(2, 999, 8192);
        p_q.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), p_q]);

        // The factory only counts committed; manifest must validate.
        assert_eq!(m.validate_self_consistency(), Ok(()));
        assert_eq!(m.committed_rows(), 100);
        assert_eq!(m.committed_part_count(), 1);
    }

    // ── serde roundtrip ────────────────────────────────────────────────────

    #[test]
    fn json_roundtrip_preserves_fields() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn optional_checksum_fields_roundtrip_and_are_omitted_when_none() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        let json = serde_json::to_string(&m).unwrap();
        assert!(!json.contains("column_checksums"), "json = {json}");
        assert!(!json.contains("checksum_key_column"), "json = {json}");

        m.column_checksums = Some(vec![ColumnChecksum {
            name: "id".into(),
            checksum: "12345".into(),
        }]);
        m.checksum_key_column = Some("id".into());
        let json = serde_json::to_string(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        // Only the manifest-level enum encoding matters here; empty parts keep
        // the part-level `status` out of the JSON.
        let mut m = manifest_with_parts(vec![]);
        for (status, wire) in [
            (ManifestStatus::Success, "success"),
            (ManifestStatus::Failed, "failed"),
            (ManifestStatus::Interrupted, "interrupted"),
        ] {
            m.status = status;
            let json = serde_json::to_string(&m).unwrap();
            assert!(
                json.contains(&format!("\"status\":\"{wire}\"")),
                "json = {json}"
            );
        }
    }

    #[test]
    fn part_status_serializes_as_snake_case() {
        assert_eq!(
            serde_json::to_string(&PartStatus::Committed).unwrap(),
            "\"committed\""
        );
        assert_eq!(
            serde_json::to_string(&PartStatus::Quarantined).unwrap(),
            "\"quarantined\""
        );
    }

    #[test]
    fn legacy_part_without_content_md5_parses() {
        let json = r#"{
            "part_id": 1,
            "path": "part-000001.parquet",
            "rows": 10,
            "size_bytes": 42,
            "content_fingerprint": "xxh3:0000000000000001",
            "status": "committed"
        }"#;
        let parsed: ManifestPart = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.content_md5, "");
        assert_eq!(parsed.status, PartStatus::Committed);
    }

    // ── success marker ─────────────────────────────────────────────────────

    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        assert!(body.starts_with("xxh3:"), "body = {body:?}");
        assert!(body.ends_with('\n'), "body = {body:?}");
        let trimmed = body.trim_end();
        let hex = &trimmed["xxh3:".len()..];
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(
            hex.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
        );
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let a = success_marker_body(b"hello");
        let b = success_marker_body(b"hello");
        assert_eq!(a, b);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let a = success_marker_body(b"manifest one");
        let b = success_marker_body(b"manifest two");
        assert_ne!(a, b);
    }

    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fp = parse_success_marker(&body).expect("must parse");
        assert!(fp.starts_with("xxh3:"));
        assert_eq!(fp.len(), "xxh3:".len() + 16);
        assert_eq!(fp, body.trim_end());
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        assert_eq!(parse_success_marker(""), None);
        assert_eq!(parse_success_marker("\n"), None);
        assert_eq!(parse_success_marker("sha256:0123456789abcdef"), None);
        // Wrong hex length:
        assert_eq!(parse_success_marker("xxh3:0123\n"), None);
        // Uppercase hex (we emit lowercase; reject to keep the format strict):
        assert_eq!(parse_success_marker("xxh3:0123456789ABCDEF\n"), None);
        // Non-hex body:
        assert_eq!(parse_success_marker("xxh3:zzzzzzzzzzzzzzzz\n"), None);
        // Missing prefix:
        assert_eq!(parse_success_marker("0123456789abcdef\n"), None);
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        let expected = Some("xxh3:0123456789abcdef");
        // Plain LF, CRLF (Windows), double newline, trailing spaces — all fine.
        assert_eq!(parse_success_marker("xxh3:0123456789abcdef\n"), expected);
        assert_eq!(parse_success_marker("xxh3:0123456789abcdef\r\n"), expected);
        assert_eq!(parse_success_marker("xxh3:0123456789abcdef\n\n"), expected);
        assert_eq!(parse_success_marker("xxh3:0123456789abcdef   \n"), expected);
    }

    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        // ADR-0012 forward-compatibility contract: a reader compiled against
        // v1 must tolerate v2-style fields that it doesn't recognise.
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let parsed: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.run_id, "r1");
        // Omitted optional fields fall back to `None`.
        assert_eq!(parsed.source.schema, None);
        assert_eq!(parsed.source.table, None);
        assert_eq!(parsed.column_checksums, None);
        assert_eq!(parsed.checksum_key_column, None);
        assert_eq!(parsed.validate_self_consistency(), Ok(()));
    }
}