Skip to main content

rivet/
manifest.rs

1//! **Layer: Trust contract**
2//!
//! Public JSON manifest written next to the output of every cloud or
//! local-file run.  This module defines the wire schema and its in-memory
//! types; the actual writer (atomic rename / atomic PUT) lives next to the
//! destination implementations.
7//!
//! Invariants are documented in `docs/adr/0012-cloud-manifest-contract.md`.
//! This module owns *only* the data types and a small set of pure helpers —
//! ordering, atomicity, and `_SUCCESS` semantics belong to the writer.
11//!
12//! The manifest is read by:
13//! - `--resume` (decision matrix M8: skip / rewrite / quarantine)
14//! - `--validate` (M5: every listed part exists at recorded size)
15//! - `--reconcile` (manifest row counts vs source `COUNT(*)`)
16//! - the run report (informational; not a verdict source)
17//!
18//! Forward compatibility: callers MUST ignore unknown fields when reading.
19//! Field additions are non-breaking; field removals or type changes require
20//! a [`MANIFEST_VERSION`] bump.
21
22// The wire types (RunManifest, ManifestPart, ...) and the writer-side
23// helpers (success_marker_body) are already wired into the pipeline; the
24// reader-side helpers (validate_self_consistency, committed_rows,
25// committed_part_count, parse_success_marker, ManifestInconsistency) ship
26// next when `--validate` / `--reconcile` learn to inspect the manifest.
27// Mark the whole module as dead-code-tolerant until then so the bin crate
28// (which doesn't compile tests) stays clean.
29#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// Schema version stamped into every manifest's `manifest_version` field.
///
/// Removing a field or changing a field's type requires bumping this value;
/// adding fields does not.  See ADR-0012 §Manifest schema.
pub const MANIFEST_VERSION: u32 = 1;

/// Name of the manifest object/file, relative to the destination prefix.
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// Name of the completion marker.
///
/// Per M2 it is written only *after* the manifest; per M5 its presence means
/// every part listed in the manifest exists at its recorded size.
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Directory that receives untracked or corrupt parts on resume (M9).
///
/// Quarantined objects land at `<prefix>/_quarantine/<run_id>/<original-name>`.
pub const QUARANTINE_PREFIX: &str = "_quarantine";

/// Name of the writability probe that `rivet doctor` drops at the destination
/// prefix.
///
/// Like [`MANIFEST_FILENAME`] and [`SUCCESS_FILENAME`], this is a Rivet-owned
/// sidecar.  The manifest-aware `--validate` pass must therefore not report it
/// as an untracked foreign object when a run follows a `doctor` against the
/// same prefix.
pub const DOCTOR_PROBE_FILENAME: &str = ".rivet_doctor_probe";
52
/// Prefix a manifest-relative `key` (a part `path`, [`MANIFEST_FILENAME`], ...)
/// with the destination sub-directory `dir`.
///
/// Trailing slashes on `dir` are ignored, so `"out/"` and `"out"` produce the
/// same key.  When `dir` is empty, or consists only of slashes, `key` is
/// returned as-is.  That is the common case: production callers pass `""`
/// because the manifest sits at the prefix root.
///
/// Destination verification and resume reconciliation both go through this
/// helper, so they agree on the key namespace.
pub fn join_key(dir: &str, key: &str) -> String {
    match dir.trim_end_matches('/') {
        "" => key.to_owned(),
        base => format!("{base}/{key}"),
    }
}
66
67/// Compute the body of the `_SUCCESS` marker for a given serialized manifest.
68///
69/// Format: a single line `"xxh3:<16-hex>\n"`.  ADR-0012 M2 — `_SUCCESS`
70/// carries the manifest fingerprint so an orchestrator can detect manifest
71/// changes (rerun, resume that completed, repair) with a cheap `GET _SUCCESS`
72/// instead of refetching the full manifest body.
73///
74/// `manifest_bytes` must be the exact bytes that were written to `manifest.json`
75/// — usually the result of `serde_json::to_vec_pretty(&RunManifest)`.  The
76/// caller is responsible for using the same bytes for both writes; computing
77/// the fingerprint from a re-serialized struct would risk encoding drift
78/// (key ordering, whitespace) producing a different hash.
79pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
80    use xxhash_rust::xxh3::xxh3_64;
81    format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
82}
83
/// Extract the fingerprint from a `_SUCCESS` marker body.
///
/// Returns `Some("xxh3:<hex>")` when `body` matches the marker format, after
/// trailing ASCII whitespace is removed.  The format is exactly the `xxh3:`
/// prefix followed by 16 lowercase hex digits, as produced by
/// [`success_marker_body`].  The returned slice is borrowed from `body`.
///
/// Trailing whitespace and line endings (`\n`, `\r\n`, extra blank lines,
/// spaces, tabs) are tolerated.  Anything else yields `None`: an empty body,
/// a missing or different algorithm prefix, the wrong digit count, uppercase
/// or non-hex digits, or leading whitespace.  The function never panics,
/// whatever the body contains.
///
/// Used by `--validate` and by external polling consumers (Airflow sensors,
/// CI checks) to decide whether a cached manifest is still current.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    const PREFIX: &str = "xxh3:";
    const HEX_DIGITS: usize = 16;

    let trimmed = body.trim_end_matches(|c: char| c.is_ascii_whitespace());
    // `strip_prefix` compares bytes and cannot panic.  Slicing at a fixed byte
    // offset (`split_at`) panics when a multi-byte UTF-8 character straddles
    // the prefix boundary, e.g. "xxh3é" followed by 15 hex digits.
    let hex = trimmed.strip_prefix(PREFIX)?;
    // Byte-wise check: 16 ASCII lowercase-hex bytes are exactly 16 chars.
    let is_lower_hex = |b: u8| b.is_ascii_digit() || (b'a'..=b'f').contains(&b);
    if hex.len() == HEX_DIGITS && hex.bytes().all(is_lower_hex) {
        Some(trimmed)
    } else {
        None
    }
}
110
111/// Public, stable JSON shape for the run manifest.
112///
113/// One manifest is written per `run_id` per export.  See ADR-0012 M4
114/// (Append-Only Per Run) for the resume-across-interruption story.
115#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
116pub struct RunManifest {
117    pub manifest_version: u32,
118    pub run_id: String,
119    pub export_name: String,
120    pub started_at: String,
121    pub finished_at: String,
122    pub status: ManifestStatus,
123    pub source: ManifestSource,
124    pub destination: ManifestDestination,
125    pub format: String,
126    pub compression: String,
127    /// xxh3 fingerprint of the column schema; see [`crate::state::schema_fingerprint`].
128    pub schema_fingerprint: String,
129    pub row_count: i64,
130    pub part_count: u32,
131    pub parts: Vec<ManifestPart>,
132}
133
134/// Terminal status of the run *as recorded by the writer*.
135///
136/// `success` is only written when M2 (Manifest Before SUCCESS) is satisfied
137/// — i.e. when the writer is about to drop the `_SUCCESS` marker.
138/// `failed` and `interrupted` manifests serve as audit trails and as input
139/// to resume; they do NOT trigger `_SUCCESS`.
140#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
141#[serde(rename_all = "snake_case")]
142pub enum ManifestStatus {
143    Success,
144    Failed,
145    Interrupted,
146}
147
148#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
149pub struct ManifestSource {
150    pub engine: String,
151    pub schema: Option<String>,
152    pub table: Option<String>,
153}
154
155#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
156pub struct ManifestDestination {
157    pub kind: String,
158    pub uri: String,
159}
160
161/// One committed (or quarantined) output part.
162///
163/// `path` is **relative to the destination prefix** (ADR-0012 §Manifest
164/// schema) so the manifest is portable across copies of the dataset.
165#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
166pub struct ManifestPart {
167    pub part_id: u32,
168    pub path: String,
169    pub rows: i64,
170    pub size_bytes: u64,
171    /// xxh3 fingerprint of the part body.  Format mirrors [`crate::state::schema_fingerprint`]:
172    /// `"xxh3:<16-hex>"`.  Algorithm prefix MUST be checked before interpreting
173    /// the hex body (sha256/blake3 reserved for future hashers).
174    pub content_fingerprint: String,
175    /// Base64 MD5 of the part body, in GCS's `md5Hash` encoding — lets
176    /// destination verification compare against the object's listing metadata
177    /// with **no download** (GCS/S3/Azure surface this; the comparison rides
178    /// the listing `--validate` already does).  Empty for legacy manifests and
179    /// for parts whose MD5 could not be computed; the check then degrades to
180    /// size-only.  `#[serde(default)]` keeps pre-0.7.x manifests parseable.
181    #[serde(default)]
182    pub content_md5: String,
183    pub status: PartStatus,
184}
185
186#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
187#[serde(rename_all = "snake_case")]
188pub enum PartStatus {
189    /// Listed in the active manifest at the destination.
190    Committed,
191    /// Found in a prior manifest but rejected on resume (M9); retained for audit.
192    Quarantined,
193}
194
195impl RunManifest {
196    /// Sum of `rows` across `Committed` parts.  Used by M5 sanity checks and
197    /// by `--reconcile` to compare against source `COUNT(*)`.
198    pub fn committed_rows(&self) -> i64 {
199        self.parts
200            .iter()
201            .filter(|p| p.status == PartStatus::Committed)
202            .map(|p| p.rows)
203            .sum()
204    }
205
206    /// Number of `Committed` parts.
207    pub fn committed_part_count(&self) -> usize {
208        self.parts
209            .iter()
210            .filter(|p| p.status == PartStatus::Committed)
211            .count()
212    }
213
214    /// Verify that the recorded aggregates (`row_count`, `part_count`) match
215    /// the actual `Committed` parts in `parts`.  A mismatch is a writer bug;
216    /// callers should refuse to act on the manifest until investigated.
217    pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
218        if self.manifest_version != MANIFEST_VERSION {
219            return Err(ManifestInconsistency::UnsupportedVersion {
220                found: self.manifest_version,
221                supported: MANIFEST_VERSION,
222            });
223        }
224        let actual_parts = self.committed_part_count();
225        if actual_parts != self.part_count as usize {
226            return Err(ManifestInconsistency::PartCountMismatch {
227                declared: self.part_count,
228                actual: actual_parts,
229            });
230        }
231        let actual_rows = self.committed_rows();
232        if actual_rows != self.row_count {
233            return Err(ManifestInconsistency::RowCountMismatch {
234                declared: self.row_count,
235                actual: actual_rows,
236            });
237        }
238        // Part IDs must be unique within a manifest.
239        let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
240        ids.sort_unstable();
241        for w in ids.windows(2) {
242            if w[0] == w[1] {
243                return Err(ManifestInconsistency::DuplicatePartId(w[0]));
244            }
245        }
246        Ok(())
247    }
248}
249
/// Self-consistency failures reported by [`RunManifest::validate_self_consistency`].
///
/// These describe the manifest disagreeing with itself, or (for
/// `UnsupportedVersion`) a manifest this build cannot interpret.  They are not
/// destination drift; M5 destination-state checks live in the validate
/// command path.
///
/// Every payload is plain integers, so the type is `Clone` and `Eq`.  Callers
/// and tests can therefore compare or store errors directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManifestInconsistency {
    /// `manifest_version` differs from the version this build understands.
    UnsupportedVersion { found: u32, supported: u32 },
    /// `part_count` disagrees with the number of committed parts.
    PartCountMismatch { declared: u32, actual: usize },
    /// `row_count` disagrees with the summed rows of committed parts.
    RowCountMismatch { declared: i64, actual: i64 },
    /// The same `part_id` appears more than once in `parts`.
    DuplicatePartId(u32),
}

impl std::fmt::Display for ManifestInconsistency {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // All payload fields are `Copy`, so match on the value and bind by copy.
        match *self {
            Self::UnsupportedVersion { found, supported } => write!(
                f,
                "manifest_version {found} is not supported by this build (expected {supported})"
            ),
            Self::PartCountMismatch { declared, actual } => write!(
                f,
                "part_count declares {declared} parts but {actual} committed parts found"
            ),
            Self::RowCountMismatch { declared, actual } => write!(
                f,
                "row_count declares {declared} rows but committed parts sum to {actual}"
            ),
            Self::DuplicatePartId(id) => write!(f, "duplicate part_id {id} in manifest.parts"),
        }
    }
}

impl std::error::Error for ManifestInconsistency {}
285
#[cfg(test)]
mod tests {
    use super::*;

    /// A committed part whose path and fingerprint are derived from `id`.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        ManifestPart {
            part_id: id,
            path: format!("part-{id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: format!("xxh3:{:016x}", id as u64),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }
    }

    /// A `Success` manifest whose `row_count` / `part_count` are computed from
    /// the committed entries of `parts`, so it is self-consistent by
    /// construction (an empty `parts` yields zero totals).
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "gcs".into(),
                uri: "gs://rivet-exports/public.orders/run/".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    // ── constants ───────────────────────────────────────────────────────────

    #[test]
    fn manifest_version_is_one() {
        assert_eq!(MANIFEST_VERSION, 1);
    }

    #[test]
    fn filenames_are_stable() {
        assert_eq!(MANIFEST_FILENAME, "manifest.json");
        assert_eq!(SUCCESS_FILENAME, "_SUCCESS");
        assert_eq!(QUARANTINE_PREFIX, "_quarantine");
        assert_eq!(DOCTOR_PROBE_FILENAME, ".rivet_doctor_probe");
    }

    // ── key joining ─────────────────────────────────────────────────────────

    #[test]
    fn join_key_with_empty_dir_returns_key_unchanged() {
        assert_eq!(join_key("", MANIFEST_FILENAME), "manifest.json");
        assert_eq!(join_key("/", MANIFEST_FILENAME), "manifest.json");
    }

    #[test]
    fn join_key_inserts_exactly_one_separator() {
        assert_eq!(
            join_key("out", "part-000001.parquet"),
            "out/part-000001.parquet"
        );
        assert_eq!(
            join_key("out/", "part-000001.parquet"),
            "out/part-000001.parquet"
        );
        assert_eq!(join_key("a/b//", "c"), "a/b/c");
    }

    // ── self-consistency ────────────────────────────────────────────────────

    #[test]
    fn self_consistent_manifest_validates() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(m.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.part_count = 5;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1
            })
        ));
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.row_count = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100
            })
        ));
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        let err = m.validate_self_consistency().unwrap_err();
        assert_eq!(err, ManifestInconsistency::DuplicatePartId(1));
    }

    #[test]
    fn reports_smallest_duplicate_part_id() {
        // Totals are consistent, so only the duplicate check can fire.
        let m = manifest_with_parts(vec![
            part(5, 1, 1),
            part(5, 1, 1),
            part(2, 1, 1),
            part(2, 1, 1),
        ]);
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::DuplicatePartId(2))
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        // The factory already yields zero totals for an empty part list, so
        // only the version check can fire.
        let mut m = manifest_with_parts(vec![]);
        m.manifest_version = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: 1
            })
        ));
    }

    // ── quarantined parts ──────────────────────────────────────────────────

    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let mut p_q = part(2, 999, 8192);
        p_q.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), p_q]);

        // The factory only counts committed parts, so the manifest must validate.
        assert_eq!(m.validate_self_consistency(), Ok(()));
        assert_eq!(m.committed_rows(), 100);
        assert_eq!(m.committed_part_count(), 1);
    }

    // ── serde roundtrip ────────────────────────────────────────────────────

    #[test]
    fn json_roundtrip_preserves_fields() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        // Only the enum encoding matters here; an empty manifest is enough.
        let mut m = manifest_with_parts(vec![]);
        let json = serde_json::to_string(&m).unwrap();
        assert!(json.contains("\"status\":\"success\""));

        m.status = ManifestStatus::Failed;
        let json = serde_json::to_string(&m).unwrap();
        assert!(json.contains("\"status\":\"failed\""));

        m.status = ManifestStatus::Interrupted;
        let json = serde_json::to_string(&m).unwrap();
        assert!(json.contains("\"status\":\"interrupted\""));
    }

    #[test]
    fn part_status_serializes_as_snake_case() {
        assert_eq!(
            serde_json::to_string(&PartStatus::Committed).unwrap(),
            "\"committed\""
        );
        assert_eq!(
            serde_json::to_string(&PartStatus::Quarantined).unwrap(),
            "\"quarantined\""
        );
    }

    #[test]
    fn legacy_part_without_content_md5_defaults_to_empty() {
        // Pre-0.7.x manifests have no `content_md5` key; `#[serde(default)]`
        // must keep them parseable.
        let json = r#"{
            "part_id": 7,
            "path": "part-000007.parquet",
            "rows": 42,
            "size_bytes": 1024,
            "content_fingerprint": "xxh3:0000000000000007",
            "status": "committed"
        }"#;
        let parsed: ManifestPart = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.content_md5, "");
        assert_eq!(parsed.status, PartStatus::Committed);
        assert_eq!(parsed.rows, 42);
    }

    // ── success marker ─────────────────────────────────────────────────────

    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        assert!(body.starts_with("xxh3:"), "body = {body:?}");
        assert!(body.ends_with('\n'), "body = {body:?}");
        let trimmed = body.trim_end();
        let hex = &trimmed["xxh3:".len()..];
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(
            hex.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
        );
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let a = success_marker_body(b"hello");
        let b = success_marker_body(b"hello");
        assert_eq!(a, b);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let a = success_marker_body(b"manifest one");
        let b = success_marker_body(b"manifest two");
        assert_ne!(a, b);
    }

    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fp = parse_success_marker(&body).expect("must parse");
        assert!(fp.starts_with("xxh3:"));
        assert_eq!(fp.len(), "xxh3:".len() + 16);
        assert_eq!(fp, body.trim_end());
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        assert_eq!(parse_success_marker(""), None);
        assert_eq!(parse_success_marker("\n"), None);
        assert_eq!(parse_success_marker("sha256:0123456789abcdef"), None);
        // Wrong hex length:
        assert_eq!(parse_success_marker("xxh3:0123\n"), None);
        // Uppercase hex (we emit lowercase; reject to keep the format strict):
        assert_eq!(parse_success_marker("xxh3:0123456789ABCDEF\n"), None);
        // Non-hex body:
        assert_eq!(parse_success_marker("xxh3:zzzzzzzzzzzzzzzz\n"), None);
        // Missing prefix:
        assert_eq!(parse_success_marker("0123456789abcdef\n"), None);
        // Only trailing whitespace is tolerated, not leading:
        assert_eq!(parse_success_marker(" xxh3:0123456789abcdef\n"), None);
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        // LF, CRLF (Windows), double newline, trailing spaces/tabs, or none at all.
        for body in [
            "xxh3:0123456789abcdef\n",
            "xxh3:0123456789abcdef\r\n",
            "xxh3:0123456789abcdef\n\n",
            "xxh3:0123456789abcdef  \t\n",
            "xxh3:0123456789abcdef",
        ] {
            assert_eq!(
                parse_success_marker(body),
                Some("xxh3:0123456789abcdef"),
                "body = {body:?}"
            );
        }
    }

    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        // ADR-0012 forward-compatibility contract: a reader compiled against
        // v1 must tolerate v2-style fields that it doesn't recognise.
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let parsed: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.run_id, "r1");
        // Missing optional source keys read back as `None`.
        assert_eq!(parsed.source.schema, None);
        assert_eq!(parsed.source.table, None);
        assert_eq!(parsed.validate_self_consistency(), Ok(()));
    }
}