Skip to main content

rivet/
manifest.rs

1//! **Layer: Trust contract**
2//!
3//! Public JSON manifest written next to every cloud-or-local-file run's
4//! output.  Defines the wire schema and the in-memory builder; the actual
5//! writer (atomic rename / atomic PUT) lives next to the destination
6//! implementations.
7//!
8//! Invariants are documented in [`docs/adr/0012-cloud-manifest-contract.md`].
9//! This module owns *only* the data types and a tiny set of pure helpers —
10//! ordering, atomicity, and `_SUCCESS` semantics belong to the writer.
11//!
12//! The manifest is read by:
13//! - `--resume` (decision matrix M8: skip / rewrite / quarantine)
14//! - `--validate` (M5: every listed part exists at recorded size)
15//! - `--reconcile` (manifest row counts vs source `COUNT(*)`)
16//! - the run report (informational; not a verdict source)
17//!
18//! Forward compatibility: callers MUST ignore unknown fields when reading.
19//! Field additions are non-breaking; field removals or type changes require
20//! a [`MANIFEST_VERSION`] bump.
21
22// The wire types (RunManifest, ManifestPart, ...) and the writer-side
23// helpers (success_marker_body) are already wired into the pipeline; the
24// reader-side helpers (validate_self_consistency, committed_rows,
25// committed_part_count, parse_success_marker, ManifestInconsistency) ship
26// next when `--validate` / `--reconcile` learn to inspect the manifest.
27// Mark the whole module as dead-code-tolerant until then so the bin crate
28// (which doesn't compile tests) stays clean.
29#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// Schema version stamped into every manifest (ADR-0012 §Manifest schema).
///
/// Only breaking wire changes (field removals or type changes) bump this;
/// additive fields keep the current version.
pub const MANIFEST_VERSION: u32 = 1;

/// Object name of the manifest at the destination prefix.
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// Object name of the completion marker.
///
/// Per M2 it is written only *after* the manifest, so its presence implies
/// M5: every part listed in the manifest exists at its recorded size.
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Directory under the destination prefix that receives untracked or corrupt
/// parts during resume (M9), laid out as
/// `<prefix>/_quarantine/<run_id>/<original-name>`.
pub const QUARANTINE_PREFIX: &str = "_quarantine";

/// Name of the writability probe that `rivet doctor` leaves at the
/// destination prefix.
///
/// Like [`MANIFEST_FILENAME`] and [`SUCCESS_FILENAME`], this is a Rivet-owned
/// sidecar. The manifest-aware `--validate` pass must therefore not report it
/// as an untracked foreign object when a run follows `doctor` on the same prefix.
pub const DOCTOR_PROBE_FILENAME: &str = ".rivet_doctor_probe";
52
53/// Compute the body of the `_SUCCESS` marker for a given serialized manifest.
54///
55/// Format: a single line `"xxh3:<16-hex>\n"`.  ADR-0012 M2 — `_SUCCESS`
56/// carries the manifest fingerprint so an orchestrator can detect manifest
57/// changes (rerun, resume that completed, repair) with a cheap `GET _SUCCESS`
58/// instead of refetching the full manifest body.
59///
60/// `manifest_bytes` must be the exact bytes that were written to `manifest.json`
61/// — usually the result of `serde_json::to_vec_pretty(&RunManifest)`.  The
62/// caller is responsible for using the same bytes for both writes; computing
63/// the fingerprint from a re-serialized struct would risk encoding drift
64/// (key ordering, whitespace) producing a different hash.
65pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
66    use xxhash_rust::xxh3::xxh3_64;
67    format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
68}
69
/// Parse the fingerprint out of a `_SUCCESS` marker body.
///
/// Returns `Some("xxh3:<hex>")` on a well-formed marker and `None` on anything
/// else: empty file, missing or different algorithm prefix, wrong length, or
/// a non-hex or uppercase-hex body. Trailing ASCII whitespace (`\n`, `\r\n`,
/// repeated newlines, spaces, tabs) is tolerated to match the on-wire shape
/// produced by [`success_marker_body`]. Leading whitespace is not tolerated.
///
/// The body is read from the destination and must be treated as untrusted, so
/// this function never panics, whatever text it is given. In particular it
/// never slices at a fixed byte offset, which could land inside a multi-byte
/// UTF-8 character.
///
/// Used by `--validate` and by external polling consumers (Airflow sensors,
/// CI checks) to decide whether a cached manifest is still current.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    // Algorithm tag emitted by `success_marker_body`.
    const PREFIX: &str = "xxh3:";
    // `{:016x}` of a `u64` always yields exactly 16 digits.
    const HEX_DIGITS: usize = 16;

    let trimmed = body.trim_end_matches(|c: char| c.is_ascii_whitespace());
    // `strip_prefix` compares whole prefixes and cannot split a code point.
    let hex = trimmed.strip_prefix(PREFIX)?;
    // Lowercase only: we emit lowercase and reject uppercase to keep the
    // format strict. Any non-ASCII byte also fails this check.
    let is_lower_hex = |b: u8| matches!(b, b'0'..=b'9' | b'a'..=b'f');
    if hex.len() == HEX_DIGITS && hex.bytes().all(is_lower_hex) {
        Some(trimmed)
    } else {
        None
    }
}
96
/// Public, stable JSON shape for the run manifest.
///
/// One manifest is written per `run_id` per export.  See ADR-0012 M4
/// (Append-Only Per Run) for the resume-across-interruption story.
///
/// Wire-format notes:
/// - serde's derived `Serialize` emits fields in declaration order, and those
///   serialized bytes are what [`success_marker_body`] fingerprints.  Do not
///   reorder fields casually.
/// - There is no `#[serde(deny_unknown_fields)]`, so readers ignore fields
///   they do not recognise, as the module's forward-compatibility contract
///   requires.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RunManifest {
    /// Schema version; [`RunManifest::validate_self_consistency`] rejects any
    /// value other than [`MANIFEST_VERSION`].
    pub manifest_version: u32,
    /// Identifier of the run that produced this manifest.
    pub run_id: String,
    /// Name of the export this run belongs to.
    pub export_name: String,
    /// Run start time.  NOTE(review): tests use RFC 3339 UTC strings such as
    /// `2026-05-21T12:00:00Z`; confirm the writer always emits that form.
    pub started_at: String,
    /// Run end time; same format caveat as `started_at`.
    pub finished_at: String,
    /// Terminal status as recorded by the writer.
    pub status: ManifestStatus,
    /// Where the exported rows came from.
    pub source: ManifestSource,
    /// Where the output parts were written.
    pub destination: ManifestDestination,
    /// Output file format (tests use `"parquet"`).
    pub format: String,
    /// Compression codec (tests use `"zstd"`).
    pub compression: String,
    /// xxh3 fingerprint of the column schema; see [`crate::state::schema_fingerprint`].
    pub schema_fingerprint: String,
    /// Declared total rows; must equal the sum of `rows` over `Committed` parts.
    pub row_count: i64,
    /// Declared number of `Committed` parts (`Quarantined` parts are excluded).
    pub part_count: u32,
    /// All recorded parts, `Committed` and `Quarantined` alike.
    pub parts: Vec<ManifestPart>,
}
119
/// Terminal status of the run *as recorded by the writer*.
///
/// `success` is only written when M2 (Manifest Before SUCCESS) is satisfied
/// — i.e. when the writer is about to drop the `_SUCCESS` marker.
/// `failed` and `interrupted` manifests serve as audit trails and as input
/// to resume; they do NOT trigger `_SUCCESS`.
///
/// On the wire each variant is its snake_case name (`"success"`, `"failed"`,
/// `"interrupted"`) via `rename_all`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestStatus {
    /// Written only when the writer is about to drop `_SUCCESS` (M2).
    Success,
    /// Audit trail / resume input; never accompanied by `_SUCCESS`.
    Failed,
    /// Audit trail / resume input; never accompanied by `_SUCCESS`.
    Interrupted,
}
133
/// Where the exported rows came from.
///
/// `schema` and `table` are `Option`s.  A reader therefore accepts a `source`
/// object that omits them, because serde treats a missing `Option` field as
/// `None`.  They serialize as `null` when absent.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestSource {
    /// Source engine identifier (tests use `"postgres"`).
    pub engine: String,
    /// Source schema name, if recorded.
    pub schema: Option<String>,
    /// Source table name, if recorded.  NOTE(review): presumably `None` for
    /// query-based exports — confirm against the writer.
    pub table: Option<String>,
}
140
/// Where the run's output was written.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestDestination {
    /// Destination backend kind (tests use `"gcs"` and `"local"`).
    pub kind: String,
    /// Destination URI (tests use `gs://…/` and `file:///…/` prefixes).
    /// NOTE(review): presumably the prefix that `ManifestPart::path` is
    /// relative to — confirm.
    pub uri: String,
}
146
/// One committed (or quarantined) output part.
///
/// `path` is **relative to the destination prefix** (ADR-0012 §Manifest
/// schema) so the manifest is portable across copies of the dataset.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestPart {
    /// Identifier that must be unique within the manifest across all statuses
    /// (checked by [`RunManifest::validate_self_consistency`]).
    pub part_id: u32,
    /// Object path relative to the destination prefix.
    pub path: String,
    /// Rows in this part; counted toward `RunManifest::row_count` only when
    /// `status` is `Committed`.
    pub rows: i64,
    /// Recorded object size; `--validate` (M5) expects the part to exist at
    /// this size.
    pub size_bytes: u64,
    /// xxh3 fingerprint of the part body.  Format mirrors [`crate::state::schema_fingerprint`]:
    /// `"xxh3:<16-hex>"`.  Algorithm prefix MUST be checked before interpreting
    /// the hex body (sha256/blake3 reserved for future hashers).
    pub content_fingerprint: String,
    /// Whether this part is live (`Committed`) or kept for audit (`Quarantined`).
    pub status: PartStatus,
}
163
/// Lifecycle state of a [`ManifestPart`], serialized as `"committed"` /
/// `"quarantined"` via `rename_all`.
///
/// Only `Committed` parts count toward the manifest's `row_count` and
/// `part_count`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartStatus {
    /// Listed in the active manifest at the destination.
    Committed,
    /// Found in a prior manifest but rejected on resume (M9); retained for audit.
    Quarantined,
}
172
173impl RunManifest {
174    /// Sum of `rows` across `Committed` parts.  Used by M5 sanity checks and
175    /// by `--reconcile` to compare against source `COUNT(*)`.
176    pub fn committed_rows(&self) -> i64 {
177        self.parts
178            .iter()
179            .filter(|p| p.status == PartStatus::Committed)
180            .map(|p| p.rows)
181            .sum()
182    }
183
184    /// Number of `Committed` parts.
185    pub fn committed_part_count(&self) -> usize {
186        self.parts
187            .iter()
188            .filter(|p| p.status == PartStatus::Committed)
189            .count()
190    }
191
192    /// Verify that the recorded aggregates (`row_count`, `part_count`) match
193    /// the actual `Committed` parts in `parts`.  A mismatch is a writer bug;
194    /// callers should refuse to act on the manifest until investigated.
195    pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
196        if self.manifest_version != MANIFEST_VERSION {
197            return Err(ManifestInconsistency::UnsupportedVersion {
198                found: self.manifest_version,
199                supported: MANIFEST_VERSION,
200            });
201        }
202        let actual_parts = self.committed_part_count();
203        if actual_parts != self.part_count as usize {
204            return Err(ManifestInconsistency::PartCountMismatch {
205                declared: self.part_count,
206                actual: actual_parts,
207            });
208        }
209        let actual_rows = self.committed_rows();
210        if actual_rows != self.row_count {
211            return Err(ManifestInconsistency::RowCountMismatch {
212                declared: self.row_count,
213                actual: actual_rows,
214            });
215        }
216        // Part IDs must be unique within a manifest.
217        let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
218        ids.sort_unstable();
219        for w in ids.windows(2) {
220            if w[0] == w[1] {
221                return Err(ManifestInconsistency::DuplicatePartId(w[0]));
222            }
223        }
224        Ok(())
225    }
226}
227
/// Self-consistency failures detected by [`RunManifest::validate_self_consistency`].
///
/// These represent writer bugs, not destination drift; M5 destination-state
/// checks live in the validate command path.
///
/// Every payload is plain `Copy` data, so the type is `Clone + Eq`.  Callers
/// can copy it into reports and compare it exactly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ManifestInconsistency {
    /// `manifest_version` differs from the version this build understands.
    UnsupportedVersion { found: u32, supported: u32 },
    /// `part_count` does not match the number of `Committed` parts.
    PartCountMismatch { declared: u32, actual: usize },
    /// `row_count` does not match the summed `rows` of `Committed` parts.
    RowCountMismatch { declared: i64, actual: i64 },
    /// The same `part_id` appears more than once in `parts`.
    DuplicatePartId(u32),
}

impl std::fmt::Display for ManifestInconsistency {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedVersion { found, supported } => write!(
                f,
                "manifest_version {found} is not supported by this build (expected {supported})"
            ),
            Self::PartCountMismatch { declared, actual } => write!(
                f,
                "part_count declares {declared} parts but {actual} committed parts found"
            ),
            Self::RowCountMismatch { declared, actual } => write!(
                f,
                "row_count declares {declared} rows but committed parts sum to {actual}"
            ),
            Self::DuplicatePartId(id) => {
                write!(f, "duplicate part_id {id} in manifest.parts")
            }
        }
    }
}

/// Leaf error: no variant wraps an underlying cause, so `source()` keeps its
/// default `None`.
impl std::error::Error for ManifestInconsistency {}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Committed` part whose path and fingerprint are derived from `id`.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        ManifestPart {
            part_id: id,
            path: format!("part-{id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: format!("xxh3:{:016x}", u64::from(id)),
            status: PartStatus::Committed,
        }
    }

    /// A manifest over `parts` whose `row_count` / `part_count` are computed
    /// from the `Committed` parts only, so it is self-consistent by
    /// construction.  Each test then perturbs the field it cares about.
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "gcs".into(),
                uri: "gs://rivet-exports/public.orders/run/".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    // ── constants ───────────────────────────────────────────────────────────

    #[test]
    fn manifest_version_is_one() {
        assert_eq!(MANIFEST_VERSION, 1);
    }

    #[test]
    fn filenames_are_stable() {
        assert_eq!(MANIFEST_FILENAME, "manifest.json");
        assert_eq!(SUCCESS_FILENAME, "_SUCCESS");
        assert_eq!(QUARANTINE_PREFIX, "_quarantine");
        assert_eq!(DOCTOR_PROBE_FILENAME, ".rivet_doctor_probe");
    }

    // ── self-consistency ────────────────────────────────────────────────────

    #[test]
    fn self_consistent_manifest_validates() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(m.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.part_count = 5;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1
            })
        ));
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.row_count = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100
            })
        ));
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        let err = m.validate_self_consistency().unwrap_err();
        assert_eq!(err, ManifestInconsistency::DuplicatePartId(1));
    }

    #[test]
    fn rejects_duplicate_part_id_across_committed_and_quarantined() {
        // Uniqueness is checked over every part, not just committed ones.
        let mut quarantined = part(1, 200, 8192);
        quarantined.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), quarantined]);
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::DuplicatePartId(1))
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        // An empty-parts manifest already declares 0 rows / 0 parts, so only
        // the version is wrong.
        let mut m = manifest_with_parts(vec![]);
        m.manifest_version = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: 1
            })
        ));
    }

    // ── quarantined parts ──────────────────────────────────────────────────

    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let mut p_q = part(2, 999, 8192);
        p_q.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), p_q]);

        // The factory only counts committed; manifest must validate.
        assert_eq!(m.validate_self_consistency(), Ok(()));
        assert_eq!(m.committed_rows(), 100);
        assert_eq!(m.committed_part_count(), 1);
    }

    // ── serde roundtrip ────────────────────────────────────────────────────

    #[test]
    fn json_roundtrip_preserves_fields() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        // Only the enum encoding matters here.  With no parts, the only
        // `"status"` key in the document is the run-level one.
        let mut m = manifest_with_parts(vec![]);
        for (status, wire) in [
            (ManifestStatus::Success, "success"),
            (ManifestStatus::Failed, "failed"),
            (ManifestStatus::Interrupted, "interrupted"),
        ] {
            m.status = status;
            let json = serde_json::to_string(&m).unwrap();
            assert!(
                json.contains(&format!("\"status\":\"{wire}\"")),
                "json = {json}"
            );
        }
    }

    #[test]
    fn part_status_serializes_as_snake_case() {
        let mut p = part(1, 10, 64);
        let json = serde_json::to_string(&p).unwrap();
        assert!(json.contains("\"status\":\"committed\""), "json = {json}");

        p.status = PartStatus::Quarantined;
        let json = serde_json::to_string(&p).unwrap();
        assert!(json.contains("\"status\":\"quarantined\""), "json = {json}");
    }

    // ── success marker ─────────────────────────────────────────────────────

    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        assert!(body.starts_with("xxh3:"), "body = {body:?}");
        assert!(body.ends_with('\n'), "body = {body:?}");
        let trimmed = body.trim_end();
        let hex = &trimmed["xxh3:".len()..];
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(
            hex.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
        );
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let a = success_marker_body(b"hello");
        let b = success_marker_body(b"hello");
        assert_eq!(a, b);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let a = success_marker_body(b"manifest one");
        let b = success_marker_body(b"manifest two");
        assert_ne!(a, b);
    }

    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fp = parse_success_marker(&body).expect("must parse");
        assert!(fp.starts_with("xxh3:"));
        assert_eq!(fp.len(), "xxh3:".len() + 16);
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        assert_eq!(parse_success_marker(""), None);
        assert_eq!(parse_success_marker("\n"), None);
        assert_eq!(parse_success_marker("sha256:0123456789abcdef"), None);
        // Wrong hex length:
        assert_eq!(parse_success_marker("xxh3:0123\n"), None);
        // Uppercase hex (we emit lowercase; reject to keep the format strict):
        assert_eq!(parse_success_marker("xxh3:0123456789ABCDEF\n"), None);
        // Non-hex body:
        assert_eq!(parse_success_marker("xxh3:zzzzzzzzzzzzzzzz\n"), None);
        // Missing prefix:
        assert_eq!(parse_success_marker("0123456789abcdef\n"), None);
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        let expected = Some("xxh3:0123456789abcdef");
        // LF (what the writer emits), CRLF, double newline, trailing spaces/tabs.
        for body in [
            "xxh3:0123456789abcdef\n",
            "xxh3:0123456789abcdef\r\n",
            "xxh3:0123456789abcdef\n\n",
            "xxh3:0123456789abcdef  \t",
        ] {
            assert_eq!(parse_success_marker(body), expected, "body = {body:?}");
        }
    }

    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        // ADR-0012 forward-compatibility contract: a reader compiled against
        // v1 must tolerate v2-style fields that it doesn't recognise.
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let parsed: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.run_id, "r1");
        assert_eq!(parsed.validate_self_consistency(), Ok(()));
    }
}