Skip to main content

rivet/
manifest.rs

//! **Layer: Trust contract**
//!
//! Public JSON manifest written next to every cloud-or-local-file run's
//! output.  Defines the wire schema and the in-memory builder; the actual
//! writer (atomic rename / atomic PUT) lives next to the destination
//! implementations.
//!
//! Invariants are documented in `docs/adr/0012-cloud-manifest-contract.md`.
//! This module owns *only* the data types and a tiny set of pure helpers —
//! ordering, atomicity, and `_SUCCESS` semantics belong to the writer.
//!
//! The manifest is read by:
//! - `--resume` (decision matrix M8: skip / rewrite / quarantine)
//! - `--validate` (M5: every listed part exists at recorded size)
//! - `--reconcile` (manifest row counts vs source `COUNT(*)`)
//! - the run report (informational; not a verdict source)
//!
//! Forward compatibility: callers MUST ignore unknown fields when reading.
//! Field additions are non-breaking; field removals or type changes require
//! a [`MANIFEST_VERSION`] bump.
21
22// The wire types (RunManifest, ManifestPart, ...) and the writer-side
23// helpers (success_marker_body) are already wired into the pipeline; the
24// reader-side helpers (validate_self_consistency, committed_rows,
25// committed_part_count, parse_success_marker, ManifestInconsistency) ship
26// next when `--validate` / `--reconcile` learn to inspect the manifest.
27// Mark the whole module as dead-code-tolerant until then so the bin crate
28// (which doesn't compile tests) stays clean.
29#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// Schema version stamped into every manifest written by this build.
/// See ADR-0012 §Manifest schema.
pub const MANIFEST_VERSION: u32 = 1;

/// Name of the manifest file at the destination prefix.
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// Name of the success marker.  Per M2 it is written *after* the manifest;
/// its presence implies M5 (every listed part exists at recorded size).
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Prefix under which untracked / corrupt parts are moved on resume (M9).
/// Layout: `<prefix>/_quarantine/<run_id>/<original-name>`.
pub const QUARANTINE_PREFIX: &str = "_quarantine";
46
47/// Compute the body of the `_SUCCESS` marker for a given serialized manifest.
48///
49/// Format: a single line `"xxh3:<16-hex>\n"`.  ADR-0012 M2 — `_SUCCESS`
50/// carries the manifest fingerprint so an orchestrator can detect manifest
51/// changes (rerun, resume that completed, repair) with a cheap `GET _SUCCESS`
52/// instead of refetching the full manifest body.
53///
54/// `manifest_bytes` must be the exact bytes that were written to `manifest.json`
55/// — usually the result of `serde_json::to_vec_pretty(&RunManifest)`.  The
56/// caller is responsible for using the same bytes for both writes; computing
57/// the fingerprint from a re-serialized struct would risk encoding drift
58/// (key ordering, whitespace) producing a different hash.
59pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
60    use xxhash_rust::xxh3::xxh3_64;
61    format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
62}
63
/// Parse the fingerprint out of a `_SUCCESS` marker body.
///
/// Returns `Some("xxh3:<hex>")` on a well-formed marker, `None` on anything
/// else (empty file, missing prefix, wrong length, uppercase or non-hex body,
/// non-ASCII content).  Trailing ASCII whitespace and newlines are tolerated
/// to match the on-wire shape produced by [`success_marker_body`]; leading
/// whitespace is not.
///
/// The returned slice borrows from `body` and excludes the trailing
/// whitespace.  This function never panics, whatever the input.
///
/// Used by `--validate` and by external polling consumers (Airflow sensors,
/// CI checks) to decide whether a cached manifest is still current.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    const PREFIX: &str = "xxh3:";
    const HEX_LEN: usize = 16;

    let marker = body.trim_end_matches(|c: char| c.is_ascii_whitespace());

    // `strip_prefix` compares whole prefixes, so it can never slice through a
    // multi-byte character the way a fixed-offset `split_at` can.
    let hex = marker.strip_prefix(PREFIX)?;

    // Strict lowercase hex: the writer only ever emits `{:016x}`.  Checking
    // bytes also rejects any non-ASCII character (all its bytes are >= 0x80).
    let is_lower_hex = |b: u8| matches!(b, b'0'..=b'9' | b'a'..=b'f');
    if hex.len() != HEX_LEN || !hex.bytes().all(is_lower_hex) {
        return None;
    }
    Some(marker)
}
90
91/// Public, stable JSON shape for the run manifest.
92///
93/// One manifest is written per `run_id` per export.  See ADR-0012 M4
94/// (Append-Only Per Run) for the resume-across-interruption story.
95#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
96pub struct RunManifest {
97    pub manifest_version: u32,
98    pub run_id: String,
99    pub export_name: String,
100    pub started_at: String,
101    pub finished_at: String,
102    pub status: ManifestStatus,
103    pub source: ManifestSource,
104    pub destination: ManifestDestination,
105    pub format: String,
106    pub compression: String,
107    /// xxh3 fingerprint of the column schema; see [`crate::state::schema_fingerprint`].
108    pub schema_fingerprint: String,
109    pub row_count: i64,
110    pub part_count: u32,
111    pub parts: Vec<ManifestPart>,
112}
113
114/// Terminal status of the run *as recorded by the writer*.
115///
116/// `success` is only written when M2 (Manifest Before SUCCESS) is satisfied
117/// — i.e. when the writer is about to drop the `_SUCCESS` marker.
118/// `failed` and `interrupted` manifests serve as audit trails and as input
119/// to resume; they do NOT trigger `_SUCCESS`.
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
121#[serde(rename_all = "snake_case")]
122pub enum ManifestStatus {
123    Success,
124    Failed,
125    Interrupted,
126}
127
128#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
129pub struct ManifestSource {
130    pub engine: String,
131    pub schema: Option<String>,
132    pub table: Option<String>,
133}
134
135#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
136pub struct ManifestDestination {
137    pub kind: String,
138    pub uri: String,
139}
140
141/// One committed (or quarantined) output part.
142///
143/// `path` is **relative to the destination prefix** (ADR-0012 §Manifest
144/// schema) so the manifest is portable across copies of the dataset.
145#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
146pub struct ManifestPart {
147    pub part_id: u32,
148    pub path: String,
149    pub rows: i64,
150    pub size_bytes: u64,
151    /// xxh3 fingerprint of the part body.  Format mirrors [`crate::state::schema_fingerprint`]:
152    /// `"xxh3:<16-hex>"`.  Algorithm prefix MUST be checked before interpreting
153    /// the hex body (sha256/blake3 reserved for future hashers).
154    pub content_fingerprint: String,
155    pub status: PartStatus,
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
159#[serde(rename_all = "snake_case")]
160pub enum PartStatus {
161    /// Listed in the active manifest at the destination.
162    Committed,
163    /// Found in a prior manifest but rejected on resume (M9); retained for audit.
164    Quarantined,
165}
166
167impl RunManifest {
168    /// Sum of `rows` across `Committed` parts.  Used by M5 sanity checks and
169    /// by `--reconcile` to compare against source `COUNT(*)`.
170    pub fn committed_rows(&self) -> i64 {
171        self.parts
172            .iter()
173            .filter(|p| p.status == PartStatus::Committed)
174            .map(|p| p.rows)
175            .sum()
176    }
177
178    /// Number of `Committed` parts.
179    pub fn committed_part_count(&self) -> usize {
180        self.parts
181            .iter()
182            .filter(|p| p.status == PartStatus::Committed)
183            .count()
184    }
185
186    /// Verify that the recorded aggregates (`row_count`, `part_count`) match
187    /// the actual `Committed` parts in `parts`.  A mismatch is a writer bug;
188    /// callers should refuse to act on the manifest until investigated.
189    pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
190        if self.manifest_version != MANIFEST_VERSION {
191            return Err(ManifestInconsistency::UnsupportedVersion {
192                found: self.manifest_version,
193                supported: MANIFEST_VERSION,
194            });
195        }
196        let actual_parts = self.committed_part_count();
197        if actual_parts != self.part_count as usize {
198            return Err(ManifestInconsistency::PartCountMismatch {
199                declared: self.part_count,
200                actual: actual_parts,
201            });
202        }
203        let actual_rows = self.committed_rows();
204        if actual_rows != self.row_count {
205            return Err(ManifestInconsistency::RowCountMismatch {
206                declared: self.row_count,
207                actual: actual_rows,
208            });
209        }
210        // Part IDs must be unique within a manifest.
211        let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
212        ids.sort_unstable();
213        for w in ids.windows(2) {
214            if w[0] == w[1] {
215                return Err(ManifestInconsistency::DuplicatePartId(w[0]));
216            }
217        }
218        Ok(())
219    }
220}
221
/// Self-consistency failures detected by [`RunManifest::validate_self_consistency`].
///
/// These represent writer bugs, not destination drift; M5 destination-state
/// checks live in the validate command path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ManifestInconsistency {
    /// `manifest_version` is not the version this build understands.
    UnsupportedVersion { found: u32, supported: u32 },
    /// `part_count` disagrees with the number of committed parts.
    PartCountMismatch { declared: u32, actual: usize },
    /// `row_count` disagrees with the row total of committed parts.
    RowCountMismatch { declared: i64, actual: i64 },
    /// The given `part_id` appears more than once in `parts`.
    DuplicatePartId(u32),
}

/// Human-readable, single-line description of each inconsistency.
impl std::fmt::Display for ManifestInconsistency {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedVersion { found, supported } => write!(
                f,
                "manifest_version {found} is not supported by this build (expected {supported})"
            ),
            Self::PartCountMismatch { declared, actual } => write!(
                f,
                "part_count declares {declared} parts but {actual} committed parts found"
            ),
            Self::RowCountMismatch { declared, actual } => write!(
                f,
                "row_count declares {declared} rows but committed parts sum to {actual}"
            ),
            Self::DuplicatePartId(id) => write!(f, "duplicate part_id {id} in manifest.parts"),
        }
    }
}

impl std::error::Error for ManifestInconsistency {}
257
/// Unit tests for the manifest wire types, the self-consistency check and the
/// `_SUCCESS` marker helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Committed` part with a deterministic path and fingerprint.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        ManifestPart {
            part_id: id,
            path: format!("part-{id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: format!("xxh3:{:016x}", u64::from(id)),
            status: PartStatus::Committed,
        }
    }

    /// Build a manifest whose `row_count` / `part_count` are derived from the
    /// `Committed` entries of `parts`, so it is self-consistent unless the
    /// part ids collide or a test mutates it afterwards.
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "gcs".into(),
                uri: "gs://rivet-exports/public.orders/run/".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    // ── constants ───────────────────────────────────────────────────────────

    #[test]
    fn manifest_version_is_one() {
        assert_eq!(MANIFEST_VERSION, 1);
    }

    #[test]
    fn filenames_are_stable() {
        assert_eq!(MANIFEST_FILENAME, "manifest.json");
        assert_eq!(SUCCESS_FILENAME, "_SUCCESS");
        assert_eq!(QUARANTINE_PREFIX, "_quarantine");
    }

    // ── self-consistency ────────────────────────────────────────────────────

    #[test]
    fn self_consistent_manifest_validates() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(m.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.part_count = 5;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1
            })
        );
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.row_count = 999;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100
            })
        );
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        let err = m.validate_self_consistency().unwrap_err();
        assert_eq!(err, ManifestInconsistency::DuplicatePartId(1));
    }

    #[test]
    fn rejects_unsupported_version() {
        // An empty manifest already has zero totals, so the version is the
        // only thing wrong with it.
        let mut m = manifest_with_parts(vec![]);
        m.manifest_version = 999;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: 1
            })
        );
    }

    // ── quarantined parts ──────────────────────────────────────────────────

    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let mut p_q = part(2, 999, 8192);
        p_q.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), p_q]);

        // The factory only counts committed; manifest must validate.
        assert_eq!(m.validate_self_consistency(), Ok(()));
        assert_eq!(m.committed_rows(), 100);
        assert_eq!(m.committed_part_count(), 1);
    }

    // ── serde roundtrip ────────────────────────────────────────────────────

    #[test]
    fn json_roundtrip_preserves_fields() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        // This test cares about the enum encoding, not the totals.
        let mut m = manifest_with_parts(vec![]);
        let json = serde_json::to_string(&m).unwrap();
        assert!(json.contains("\"status\":\"success\""));

        m.status = ManifestStatus::Interrupted;
        let json = serde_json::to_string(&m).unwrap();
        assert!(json.contains("\"status\":\"interrupted\""));
    }

    // ── success marker ─────────────────────────────────────────────────────

    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        assert!(body.starts_with("xxh3:"), "body = {body:?}");
        assert!(body.ends_with('\n'), "body = {body:?}");
        let trimmed = body.trim_end();
        let hex = &trimmed["xxh3:".len()..];
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(
            hex.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
        );
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let a = success_marker_body(b"hello");
        let b = success_marker_body(b"hello");
        assert_eq!(a, b);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let a = success_marker_body(b"manifest one");
        let b = success_marker_body(b"manifest two");
        assert_ne!(a, b);
    }

    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fp = parse_success_marker(&body).expect("must parse");
        assert!(fp.starts_with("xxh3:"));
        assert_eq!(fp.len(), "xxh3:".len() + 16);
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        assert_eq!(parse_success_marker(""), None);
        assert_eq!(parse_success_marker("\n"), None);
        assert_eq!(parse_success_marker("sha256:0123456789abcdef"), None);
        // Wrong hex length:
        assert_eq!(parse_success_marker("xxh3:0123\n"), None);
        // Uppercase hex (we emit lowercase; reject to keep the format strict):
        assert_eq!(parse_success_marker("xxh3:0123456789ABCDEF\n"), None);
        // Non-hex body:
        assert_eq!(parse_success_marker("xxh3:zzzzzzzzzzzzzzzz\n"), None);
        // Missing prefix:
        assert_eq!(parse_success_marker("0123456789abcdef\n"), None);
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        let expected = Some("xxh3:0123456789abcdef");
        // LF, CRLF on Windows, double newline, trailing spaces, and no
        // terminator at all — all fine.
        for body in [
            "xxh3:0123456789abcdef\n",
            "xxh3:0123456789abcdef\r\n",
            "xxh3:0123456789abcdef\n\n",
            "xxh3:0123456789abcdef  \n",
            "xxh3:0123456789abcdef",
        ] {
            assert_eq!(parse_success_marker(body), expected, "body = {body:?}");
        }
    }

    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        // ADR-0012 forward-compatibility contract: a reader compiled against
        // v1 must tolerate v2-style fields that it doesn't recognise.
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let parsed: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.run_id, "r1");
        assert_eq!(parsed.validate_self_consistency(), Ok(()));
    }
}