1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
//! **Layer: Observability**
//!
//! Manifest-aware verification for `--validate` (ADR-0012 §M5 / §M6,
//! constrained by the ADR-0013 trust-flag contract that says: no new flags;
//! manifest-aware checks live under the existing `--validate`).
//!
//! Although this module reads from a `Destination` (technically L2 surface),
//! it makes **no execution decisions**: it does not write data, advance
//! cursors, mutate state, or change the pipeline path. Its only output is
//! a structured `ManifestVerification` verdict the run report renders.
//! Per ADR-0003, that places it firmly in L4 Observability — the
//! destination read surface is just the carrier.
//!
//! Inputs (read-only):
//! - the destination's `manifest.json` body
//! - the destination's `_SUCCESS` body, if present
//! - the listing of every object under the destination prefix
//!
//! Outputs:
//! - [`ManifestVerification`] — a structured verdict the run report renders
//! into the operator-facing "Verdicts" section.
//!
//! Out of scope here:
//! - per-file row-count check (that runs *during* the export, against the
//! local temp file before upload — see `pipeline::validate::validate_output`).
//! - source-side reconciliation (lives in [`pipeline::reconcile_cmd`] and
//! is what `--reconcile` adds on top of this).
//! - re-fingerprinting parts (`--validate --deep`, future).
//!
//! Failure modes are explicit: each check produces a `Failure` enum variant
//! that is rendered verbatim in `summary.json` so an Airflow / CI consumer
//! can branch on the kind, not parse strings.
use serde::{Deserialize, Serialize};
use crate::destination::Destination;
use crate::error::Result;
use crate::manifest::{
MANIFEST_FILENAME, RunManifest, SUCCESS_FILENAME, join_key, parse_success_marker,
success_marker_body,
};
use crate::pipeline::manifest_reconcile::{PartPresence, reconcile_manifest_against_listing};
/// Outcome of a single `--validate` pass over a destination prefix.
///
/// Stable enough to be embedded in `summary.json` directly (see
/// `pipeline::report::ValidationOutcome`). Forward-compat: consumers MUST
/// ignore unknown fields (no `deny_unknown_fields`).
///
/// `passed` is derived from `manifest_found` and `failures`; the remaining
/// fields are facts recorded while the checks ran.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestVerification {
/// True iff the `manifest.json` body was read from the destination. This
/// is also `true` when the body was read but failed to parse (reported as
/// [`Failure::ManifestSelfInconsistent`]), and `false` when the read itself
/// failed ([`Failure::ManifestReadError`]). `false` with no failures is the
/// ADR-0012 M6 fallback (legacy run); higher-level check results below are
/// then "skipped" rather than "passed".
pub manifest_found: bool,
/// Mirrors ADR-0012 M6's required `legacy_run` operator-facing label.
/// Only [`ManifestVerification::legacy`] sets this to `true`.
pub legacy_run: bool,
/// Manifest parts whose presence and recorded `size_bytes` were
/// confirmed at the destination. 0 when no manifest was found.
pub parts_verified: usize,
/// Subset of `parts_verified` whose **content** was confirmed via an MD5
/// the store surfaced in its listing (no download) — the rest are size-only.
/// Lets `passed: true` say how much of the dataset was content-checked
/// rather than implying all of it was. `#[serde(default)]` for back-compat.
#[serde(default)]
pub parts_md5_verified: usize,
/// Manifest parts that were not confirmed: missing, present at a
/// different size, or present with a mismatching content checksum. Each
/// one also has a matching entry in `failures`.
pub parts_failed: usize,
/// True iff `_SUCCESS` exists at the destination AND its body matches
/// the fingerprint of the bytes we read for `manifest.json`. An
/// existing `_SUCCESS` whose body diverges from the manifest is itself
/// an integrity failure — surfaced via `failures`. An absent `_SUCCESS`
/// leaves this `false` without recording a failure ("no signal").
pub success_marker_consistent: bool,
/// Self-consistency of the manifest (`row_count`, `part_count`,
/// duplicate `part_id`s). Skipped when `manifest_found = false`.
pub manifest_self_consistent: bool,
/// Final verdict, **derived** (not hand-maintained) — `manifest_found` and
/// no *fatal* failure ([`Failure::is_fatal`]). Stored so it stays in the
/// `summary.json` contract, but computed in one place
/// ([`ManifestVerification::recompute_passed`]) so a new failure variant is
/// fatal by default rather than relying on every site to flip a bool.
pub passed: bool,
/// Per-failure detail. May be non-empty with `passed = true` for advisory
/// (non-fatal) failures like [`Failure::UntrackedObject`]. Stable variant
/// set; new variants land under a new manifest version per ADR-0012.
pub failures: Vec<Failure>,
}
/// One concrete reason recorded by the manifest verification.
///
/// Serialized with an internal `kind` tag in `snake_case` (see the serde
/// attributes below), so consumers can branch on `failures[].kind` instead
/// of parsing the human-readable `Display` text. Every variant is fatal
/// except [`Failure::UntrackedObject`] (see [`Failure::is_fatal`]).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Failure {
/// Manifest declared a part that does not exist at the destination.
PartMissing { part_id: u32, path: String },
/// Manifest declared a part whose actual size differs from `size_bytes`.
PartSizeMismatch {
part_id: u32,
path: String,
expected: u64,
actual: u64,
},
/// Part present at the recorded size but its content MD5 (from the store's
/// listing metadata) differs from the manifest's — transit / at-rest
/// corruption, caught with no download.
PartChecksumMismatch {
part_id: u32,
path: String,
expected: String,
actual: String,
},
/// `_SUCCESS` exists but its body is malformed (not `xxh3:<16-hex>` after
/// trim, or not valid UTF-8 at all). ADR-0012 M2 — orchestrators rely on
/// this format being strict. `body_preview` is a truncated copy of the
/// body, or a `(non-utf8, N bytes)` placeholder.
SuccessMarkerMalformed { body_preview: String },
/// `_SUCCESS` body parsed but does not match `xxh3(manifest.json bytes)`.
/// Two legitimate sources: (a) someone overwrote `_SUCCESS` after the
/// manifest was rewritten — orchestrator bug; (b) the manifest was
/// edited in place after the run — operator bug. Either way the
/// manifest is no longer trustworthy.
SuccessMarkerStale {
marker_fingerprint: String,
manifest_fingerprint: String,
},
/// `RunManifest::validate_self_consistency` rejected the manifest, or
/// `manifest.json` could not be parsed at all.
/// Usually a writer bug (declared row_count != sum of committed parts'
/// rows). A parse failure ends the verification immediately; a failed
/// self-consistency check does not — part and marker checks still run so
/// the operator sees every class of problem in one pass.
ManifestSelfInconsistent { detail: String },
/// `head` or `read` of `manifest.json` returned an I/O error other than
/// "absent".
ManifestReadError { detail: String },
/// `head` or `read` of `_SUCCESS` returned an I/O error other than
/// "absent".
SuccessMarkerReadError { detail: String },
/// Listing the destination prefix returned an I/O error. Part presence
/// and the untracked-object check (M5 surplus) are both derived from that
/// listing, so neither runs and no part is counted as verified.
ListPrefixError { detail: String },
/// A file is present at the destination prefix but no manifest entry
/// references it. M9-adjacent: `--validate` only flags it; quarantine
/// belongs to `--resume`. The only advisory (non-fatal) variant.
UntrackedObject { key: String, size_bytes: u64 },
/// The export declared `verify: content` but some parts could only be
/// size-verified (no comparable content checksum from the store) — the
/// declared integrity contract was not met. `size_only` counts the
/// size-only parts; `total` is the number of verified parts.
ContentVerificationUnmet { size_only: usize, total: usize },
}
impl Failure {
    /// Reports whether this failure should flip the verdict's `passed` to false.
    ///
    /// [`Failure::UntrackedObject`] is the single advisory variant: a surplus
    /// object is an audit signal (its cleanup belongs to `--resume`, ADR-0012
    /// M9) and says nothing about the parts the manifest lists. Everything
    /// else is fatal. The check is phrased as "is it the advisory variant?"
    /// so that a variant added later is fatal unless it is opted out here —
    /// a forgotten case fails closed instead of silently passing.
    pub fn is_fatal(&self) -> bool {
        let advisory = matches!(self, Failure::UntrackedObject { .. });
        !advisory
    }
}
impl std::fmt::Display for Failure {
/// One operator-facing line per failure variant. Used by:
/// - `pipeline::report::render_markdown` (summary.md "failure:" lines)
/// - `pipeline::validate_cmd::render_pretty` (`rivet validate` stdout)
/// - any future consumer that wants a human-readable failure label
///
/// The wire format (`failures[].kind` + per-variant fields) lives in
/// the `Serialize` derive above and is the contract Airflow / CI
/// consumers branch on. This `Display` impl is for humans only.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Failure::PartMissing { part_id, path } => {
write!(f, "part {} missing at {}", part_id, path)
}
Failure::PartSizeMismatch {
part_id,
path,
expected,
actual,
} => write!(
f,
"part {} size mismatch at {}: manifest {}, dest {}",
part_id, path, expected, actual
),
Failure::PartChecksumMismatch {
part_id,
path,
expected,
actual,
} => write!(
f,
"part {} content mismatch at {}: manifest md5 {}, dest {}",
part_id, path, expected, actual
),
Failure::SuccessMarkerMalformed { body_preview } => {
write!(f, "_SUCCESS body malformed: {body_preview:?}")
}
Failure::SuccessMarkerStale {
marker_fingerprint,
manifest_fingerprint,
} => write!(
f,
"_SUCCESS body {} != manifest fingerprint {} (stale marker)",
marker_fingerprint, manifest_fingerprint
),
Failure::ManifestSelfInconsistent { detail } => {
write!(f, "manifest self-consistency: {detail}")
}
Failure::ManifestReadError { detail } => {
write!(f, "manifest read error: {detail}")
}
Failure::SuccessMarkerReadError { detail } => {
write!(f, "_SUCCESS read error: {detail}")
}
Failure::ListPrefixError { detail } => {
write!(f, "destination listing error: {detail}")
}
Failure::UntrackedObject { key, size_bytes } => {
write!(f, "untracked object: {} ({} bytes)", key, size_bytes)
}
Failure::ContentVerificationUnmet { size_only, total } => write!(
f,
"verify: content not met — {size_only} of {total} part(s) only \
size-verified (no store checksum); lower max_file_size so parts \
upload as a single PUT, or the backend exposes no checksum"
),
}
}
}
impl ManifestVerification {
    /// Base verdict: nothing checked yet (no manifest, all counts zero, all
    /// sub-checks false, `passed = false`, no failures). Every constructor
    /// builds on this and overrides only what differs, so a new field lands
    /// in **one** place rather than several near-identical literals.
    fn empty() -> Self {
        Self {
            manifest_found: false,
            legacy_run: false,
            parts_verified: 0,
            parts_md5_verified: 0,
            parts_failed: 0,
            success_marker_consistent: false,
            manifest_self_consistent: false,
            passed: false,
            failures: Vec::new(),
        }
    }

    /// Recompute `passed` from the verdict's facts: a manifest was found and
    /// no **fatal** failure was recorded (advisory failures like
    /// `UntrackedObject` don't count). The single source of truth — callers
    /// record failures and call this once, rather than flipping `passed` by
    /// hand at every site.
    fn recompute_passed(&mut self) {
        self.passed = self.manifest_found && !self.failures.iter().any(Failure::is_fatal);
    }

    /// Apply the export's `verify` policy (ADR-0013 / review D).
    ///
    /// When `require_content` is set, a manifest was found, and at least one
    /// verified part was only size-verified (`parts_verified` minus
    /// `parts_md5_verified`), this records a fatal
    /// [`Failure::ContentVerificationUnmet`] and re-derives `passed`.
    /// Otherwise the verdict is left untouched.
    ///
    /// The call is idempotent: if the failure is already recorded (for
    /// example because more than one composer applied the policy to the
    /// same verdict) it is not pushed a second time.
    ///
    /// Policy lives here (one place); the composers — run finalize and the
    /// `rivet validate` command — just call it with their export's intent.
    pub fn enforce_content_policy(&mut self, require_content: bool) {
        if !require_content || !self.manifest_found {
            return;
        }
        // `saturating_sub` keeps a malformed (e.g. hand-edited, deserialized)
        // verdict from underflowing.
        let size_only = self.parts_verified.saturating_sub(self.parts_md5_verified);
        if size_only == 0 {
            return;
        }
        let already_recorded = self
            .failures
            .iter()
            .any(|f| matches!(f, Failure::ContentVerificationUnmet { .. }));
        if !already_recorded {
            self.failures.push(Failure::ContentVerificationUnmet {
                size_only,
                total: self.parts_verified,
            });
        }
        self.recompute_passed();
    }

    /// Construct the M6 (legacy run) verdict for a destination that has no
    /// manifest at all. Caller composes this with the existing per-file
    /// row-count check; together they form the legacy `--validate` result.
    pub fn legacy() -> Self {
        // `passed = false` is intentional — not "validation failed" but "this
        // verifier cannot certify"; the caller layers per-file row counts on
        // top and composes the final verdict.
        Self {
            legacy_run: true,
            ..Self::empty()
        }
    }

    /// True iff at least one [`Failure`] was recorded — fatal **or**
    /// advisory. An advisory-only verdict (e.g. just
    /// [`Failure::UntrackedObject`]) therefore returns `true` here while
    /// `passed` is still `true`; use `passed` / [`Failure::is_fatal`] to tell
    /// the two apart. Distinct from `!passed`, which can also mean
    /// "legacy / not applicable" with no failure recorded at all.
    pub fn has_failures(&self) -> bool {
        !self.failures.is_empty()
    }
}
/// Run the manifest-aware verification at `manifest_dir` (the destination-
/// relative directory containing `manifest.json` and `_SUCCESS`).
///
/// `manifest_dir` is the same key shape `Destination::write` was called with
/// for the manifest itself — typically empty (`""`) for prefix-rooted runs,
/// or the per-export sub-directory. Trailing `/` is optional.
///
/// Steps, in order:
/// 1. read `manifest.json` (absent ⇒ [`ManifestVerification::legacy`]);
/// 2. check the manifest's self-consistency;
/// 3. reconcile the manifest's parts against one prefix listing;
/// 4. check the `_SUCCESS` marker against the manifest bytes;
/// 5. report untracked objects found by the step-3 listing.
///
/// Once the manifest has parsed, **every** later step runs regardless of
/// what an earlier one found, so the recorded failures are always ordered
/// parts → marker → untracked and an unreadable or malformed `_SUCCESS`
/// never hides the untracked-object audit.
///
/// # Errors
///
/// Every I/O outcome handled in this function is turned into a `Failure`
/// inside an `Ok` verdict, so the caller can render a useful message instead
/// of bailing. The function body itself never constructs an `Err`; the
/// `Result` return type leaves room for programmer errors (e.g. a malformed
/// `manifest_dir`).
pub fn verify_at_destination(
    dest: &dyn Destination,
    manifest_dir: &str,
) -> Result<ManifestVerification> {
    let manifest_key = join_key(manifest_dir, MANIFEST_FILENAME);
    let success_key = join_key(manifest_dir, SUCCESS_FILENAME);

    // ── 1. Manifest read ───────────────────────────────────────────────
    //
    // Error-consistency contract: every I/O outcome here surfaces as a
    // structured `Failure` variant rather than as `Err`. An operator gets
    // one verdict shape regardless of whether the destination is missing,
    // permission-denied, or temporarily unreachable.
    //
    // `head` and `read` failures share one kind (`ManifestReadError`) so
    // consumers don't have to branch on which method tripped. The verdict is
    // NOT a legacy run (`legacy_run = false`): "manifest absent" (`Ok(None)`)
    // is the only outcome that legitimately means "legacy prefix".
    let manifest_unreadable = |detail: String| ManifestVerification {
        failures: vec![Failure::ManifestReadError { detail }],
        ..ManifestVerification::empty()
    };
    let manifest_bytes = match dest.head(&manifest_key) {
        Ok(None) => return Ok(ManifestVerification::legacy()),
        Ok(Some(_)) => match dest.read(&manifest_key) {
            Ok(bytes) => bytes,
            Err(e) => return Ok(manifest_unreadable(format!("{e:#}"))),
        },
        Err(e) => {
            return Ok(manifest_unreadable(format!("manifest head failed: {e:#}")));
        }
    };

    let manifest: RunManifest = match serde_json::from_slice(&manifest_bytes) {
        Ok(m) => m,
        Err(e) => {
            // A malformed manifest is reported as a self-inconsistency: for
            // the operator the meaning is the same (the manifest can't be
            // trusted). Nothing else can be checked without a parsed
            // manifest, so this is the one post-read early return.
            return Ok(ManifestVerification {
                manifest_found: true,
                failures: vec![Failure::ManifestSelfInconsistent {
                    detail: format!("manifest.json parse failed: {e}"),
                }],
                ..ManifestVerification::empty()
            });
        }
    };

    // Base verdict for a found, parsed manifest. `passed` is not set here:
    // it is derived exactly once, by `recompute_passed`, after every check
    // below has recorded its failures.
    let mut out = ManifestVerification {
        manifest_found: true,
        manifest_self_consistent: true,
        ..ManifestVerification::empty()
    };

    // ── 2. Self-consistency ─────────────────────────────────────────────
    if let Err(e) = manifest.validate_self_consistency() {
        out.manifest_self_consistent = false;
        out.failures.push(Failure::ManifestSelfInconsistent {
            detail: e.to_string(),
        });
        // Don't short-circuit — we still want to surface part-presence
        // failures because the operator may want to know both classes at
        // once rather than fix-then-rerun.
    }

    // ── 3. Reconcile parts + surplus against ONE prefix listing ────────
    //
    // Presence and untracked-surplus both fall out of a single
    // `reconcile_manifest_against_listing` over one `list_prefix` — the same
    // pure walk chunked resume uses (`build_resume_plan`), instead of one
    // `HEAD` per part plus a separate listing. Per-part failures are emitted
    // here; untracked objects are emitted at step 5 so the failure ordering
    // an operator reads is stable.
    //
    // Trade-off: presence rides the listing. If the listing cannot be read,
    // no part is counted as verified and the recorded `ListPrefixError` is
    // fatal, so the final `recompute_passed` yields `passed = false`.
    let reconciliation = match dest.list_prefix(manifest_dir) {
        Ok(listing) => Some(reconcile_manifest_against_listing(
            &manifest,
            &listing,
            manifest_dir,
        )),
        Err(e) => {
            out.failures.push(Failure::ListPrefixError {
                detail: format!("{e:#}"),
            });
            None
        }
    };
    if let Some(rec) = &reconciliation {
        for check in &rec.per_part {
            match &check.presence {
                PartPresence::Present { md5_verified } => {
                    out.parts_verified += 1;
                    if *md5_verified {
                        out.parts_md5_verified += 1;
                    }
                }
                PartPresence::SizeMismatch { expected, actual } => {
                    out.parts_failed += 1;
                    out.failures.push(Failure::PartSizeMismatch {
                        part_id: check.part_id,
                        path: check.path.clone(),
                        expected: *expected,
                        actual: *actual,
                    });
                }
                PartPresence::Missing => {
                    out.parts_failed += 1;
                    out.failures.push(Failure::PartMissing {
                        part_id: check.part_id,
                        path: check.path.clone(),
                    });
                }
                PartPresence::ChecksumMismatch { expected, actual } => {
                    out.parts_failed += 1;
                    out.failures.push(Failure::PartChecksumMismatch {
                        part_id: check.part_id,
                        path: check.path.clone(),
                        expected: expected.clone(),
                        actual: actual.clone(),
                    });
                }
            }
        }
    }

    // ── 4. _SUCCESS marker consistency ─────────────────────────────────
    //
    // Same error-consistency contract as step 1: head/read failures become
    // `Failure::SuccessMarkerReadError`, not a bubbled `Err`. No arm returns
    // early — every outcome falls through to step 5 so a broken marker
    // cannot suppress the untracked-object report.
    match dest.head(&success_key) {
        Err(e) => {
            out.failures.push(Failure::SuccessMarkerReadError {
                detail: format!("_SUCCESS head failed: {e:#}"),
            });
        }
        Ok(None) => {
            // Absent _SUCCESS is informational, not a failure: per ADR-0012
            // M2, only successful runs land it. A failed-then-rewritten
            // manifest legitimately lacks _SUCCESS. Leave
            // `success_marker_consistent = false` (this is a "no signal"
            // bool, not a "broken" bool) and let the caller decide.
        }
        Ok(Some(_)) => match dest.read(&success_key) {
            Err(e) => {
                out.failures.push(Failure::SuccessMarkerReadError {
                    detail: format!("{e:#}"),
                });
            }
            Ok(body) => match std::str::from_utf8(&body) {
                Err(_) => {
                    // The body can't be previewed as text; report its size.
                    out.failures.push(Failure::SuccessMarkerMalformed {
                        body_preview: format!("(non-utf8, {} bytes)", body.len()),
                    });
                }
                Ok(body_str) => match parse_success_marker(body_str) {
                    None => {
                        out.failures.push(Failure::SuccessMarkerMalformed {
                            body_preview: preview(body_str),
                        });
                    }
                    Some(marker_fp) => {
                        let manifest_fp = success_marker_body(&manifest_bytes);
                        // success_marker_body returns the trailing `\n`
                        // form; trim before comparing to the parsed marker
                        // (which already trims).
                        let manifest_fp_trimmed = manifest_fp.trim_end_matches('\n');
                        if marker_fp == manifest_fp_trimmed {
                            out.success_marker_consistent = true;
                        } else {
                            out.failures.push(Failure::SuccessMarkerStale {
                                marker_fingerprint: marker_fp.to_string(),
                                manifest_fingerprint: manifest_fp_trimmed.to_string(),
                            });
                        }
                    }
                },
            },
        },
    }

    // ── 5. Untracked surplus ───────────────────────────────────────────
    //
    // Already computed by the step-3 reconciliation. Emitted last so the
    // failure ordering stays parts → marker → untracked. After a list
    // failure `reconciliation` is `None`, so there is nothing to report
    // here; the `ListPrefixError` recorded at step 3 covers it.
    if let Some(rec) = reconciliation {
        out.failures
            .extend(rec.untracked.into_iter().map(|obj| Failure::UntrackedObject {
                key: obj.key,
                size_bytes: obj.size_bytes,
            }));
    }

    out.recompute_passed();
    Ok(out)
}
/// Truncate `s` to a short preview for error messages.
///
/// Returns `s` unchanged when it has at most 40 characters; otherwise the
/// first 40 characters followed by `…`. Counting is in `char`s, so the cut
/// never lands inside a multi-byte UTF-8 sequence.
///
/// Only the first 41 characters are ever inspected, so the cost is bounded
/// no matter how large the (untrusted) `_SUCCESS` body is.
fn preview(s: &str) -> String {
    const MAX_CHARS: usize = 40;
    match s.char_indices().nth(MAX_CHARS) {
        // A 41st character exists: its byte offset is a valid char boundary
        // that ends exactly after the first `MAX_CHARS` characters.
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_owned(),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{DestinationConfig, DestinationType};
use crate::destination::local::LocalDestination;
use crate::manifest::{
MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
PartStatus, RunManifest,
};
use std::path::Path;
/// Test helper: a `LocalDestination` configured with `base` as its path.
/// Panics if the destination cannot be constructed.
fn local_dest(base: &Path) -> LocalDestination {
    let config = DestinationConfig {
        destination_type: DestinationType::Local,
        path: Some(base.to_string_lossy().into_owned()),
        ..Default::default()
    };
    LocalDestination::new(&config).unwrap()
}
/// Test helper: a committed `ManifestPart` named `part-<id, zero-padded to 6>.parquet`
/// with the given row count, size and content fingerprint, and an empty
/// `content_md5`.
fn part(part_id: u32, rows: i64, size: u64, fp: &str) -> ManifestPart {
    let file_name = format!("part-{part_id:06}.parquet");
    ManifestPart {
        part_id,
        path: file_name,
        rows,
        size_bytes: size,
        content_fingerprint: fp.into(),
        content_md5: String::new(),
        status: PartStatus::Committed,
    }
}
/// Test helper: a `RunManifest` with fixed metadata around `parts`.
///
/// `row_count` and `part_count` are derived from the committed parts only,
/// so the manifest produced here always agrees with its own part list on
/// those two totals.
fn build_manifest(parts: Vec<ManifestPart>, status: ManifestStatus) -> RunManifest {
    // One pass over the parts, tallying only the committed ones.
    let mut row_count: i64 = 0;
    let mut part_count: u32 = 0;
    for p in &parts {
        if p.status == PartStatus::Committed {
            row_count += p.rows;
            part_count += 1;
        }
    }

    let source = ManifestSource {
        engine: "postgres".into(),
        schema: Some("public".into()),
        table: Some("orders".into()),
    };
    let destination = ManifestDestination {
        kind: "local".into(),
        uri: "file:///tmp/out".into(),
    };

    RunManifest {
        manifest_version: MANIFEST_VERSION,
        run_id: "r".into(),
        export_name: "public.orders".into(),
        started_at: "2026-05-21T12:00:00Z".into(),
        finished_at: "2026-05-21T12:01:00Z".into(),
        status,
        source,
        destination,
        format: "parquet".into(),
        compression: "zstd".into(),
        schema_fingerprint: "xxh3:0123456789abcdef".into(),
        row_count,
        part_count,
        parts,
    }
}
/// Write a dataset into `dir`: each `(file name, bytes)` pair as a file, the
/// pretty-printed manifest as `manifest.json`, and — only when the manifest
/// status is `Success` — a `_SUCCESS` marker derived from those exact
/// manifest bytes.
fn write_dataset(dir: &Path, m: &RunManifest, parts_with_bytes: &[(&str, &[u8])]) {
    for (file_name, contents) in parts_with_bytes {
        std::fs::write(dir.join(file_name), contents).unwrap();
    }

    let manifest_json = serde_json::to_vec_pretty(m).unwrap();
    std::fs::write(dir.join(MANIFEST_FILENAME), &manifest_json).unwrap();

    if let ManifestStatus::Success = m.status {
        let marker = success_marker_body(&manifest_json);
        std::fs::write(dir.join(SUCCESS_FILENAME), marker).unwrap();
    }
}
// ── happy path ───────────────────────────────────────────────────────
#[test]
fn happy_path_verifies_all_parts_and_success_marker() {
    // Two committed parts whose on-disk sizes (4 and 5 bytes) match the
    // manifest, plus the `_SUCCESS` marker `write_dataset` derives from it.
    let tmp = tempfile::tempdir().unwrap();
    let parts = vec![
        part(1, 10, 4, "xxh3:1111111111111111"),
        part(2, 20, 5, "xxh3:2222222222222222"),
    ];
    let manifest = build_manifest(parts, ManifestStatus::Success);
    write_dataset(
        tmp.path(),
        &manifest,
        &[
            ("part-000001.parquet", b"AAAA"),
            ("part-000002.parquet", b"BBBBB"),
        ],
    );

    let dest = local_dest(tmp.path());
    let verdict = verify_at_destination(&dest, "").unwrap();

    // Overall verdict first, then the facts behind it.
    assert!(verdict.passed);
    assert!(verdict.failures.is_empty());
    assert!(verdict.manifest_found);
    assert!(!verdict.legacy_run);
    assert!(verdict.manifest_self_consistent);
    assert!(verdict.success_marker_consistent);
    assert_eq!(verdict.parts_verified, 2);
    assert_eq!(verdict.parts_failed, 0);
}
// ── M6 legacy run ───────────────────────────────────────────────────
#[test]
fn no_manifest_returns_legacy_run_label() {
    // A freshly created, empty directory: no manifest and no parts.
    let tmp = tempfile::tempdir().unwrap();
    let dest = local_dest(tmp.path());

    let verdict = verify_at_destination(&dest, "").unwrap();

    assert!(verdict.legacy_run);
    assert!(!verdict.manifest_found);
    assert!(!verdict.passed);
    assert_eq!(verdict.parts_verified, 0);
    assert!(
        verdict.failures.is_empty(),
        "no failures, just a legacy label"
    );
}
// ── M5 part-presence failures ───────────────────────────────────────
#[test]
fn missing_part_is_flagged_with_part_id_and_path() {
    let tmp = tempfile::tempdir().unwrap();
    let parts = vec![
        part(1, 10, 4, "xxh3:1111111111111111"),
        part(2, 20, 5, "xxh3:2222222222222222"),
    ];
    let manifest = build_manifest(parts, ManifestStatus::Success);
    // The manifest lists two parts, but only part 1 is written to disk.
    write_dataset(
        tmp.path(),
        &manifest,
        &[("part-000001.parquet", b"AAAA")],
    );

    let dest = local_dest(tmp.path());
    let verdict = verify_at_destination(&dest, "").unwrap();

    assert!(!verdict.passed);
    assert_eq!(verdict.parts_verified, 1);
    assert_eq!(verdict.parts_failed, 1);
    let part_two_missing = verdict
        .failures
        .iter()
        .any(|f| matches!(f, Failure::PartMissing { part_id: 2, .. }));
    assert!(part_two_missing);
}
#[test]
fn part_size_mismatch_is_flagged_with_expected_and_actual() {
    let tmp = tempfile::tempdir().unwrap();
    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    // The manifest records 4 bytes for part 1; the file on disk has 6.
    write_dataset(tmp.path(), &manifest, &[("part-000001.parquet", b"OOPSIE")]);

    let dest = local_dest(tmp.path());
    let verdict = verify_at_destination(&dest, "").unwrap();
    assert!(!verdict.passed);

    // Pull out the first size mismatch as (part_id, expected, actual).
    let first_mismatch = verdict
        .failures
        .iter()
        .filter_map(|f| {
            if let Failure::PartSizeMismatch {
                part_id,
                expected,
                actual,
                ..
            } = f
            {
                Some((*part_id, *expected, *actual))
            } else {
                None
            }
        })
        .next()
        .expect("must surface the size mismatch");
    assert_eq!(first_mismatch, (1, 4, 6));
}
// ── _SUCCESS marker integrity ───────────────────────────────────────
#[test]
fn stale_success_marker_is_flagged_as_inconsistent() {
    // Lay out a clean dataset, then replace `_SUCCESS` with the marker of
    // some *other* manifest body — as if an orchestrator mishandled a
    // re-run and left the old marker next to a new manifest.
    let tmp = tempfile::tempdir().unwrap();
    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    write_dataset(tmp.path(), &manifest, &[("part-000001.parquet", b"AAAA")]);

    let foreign_marker = success_marker_body(b"different manifest body");
    std::fs::write(tmp.path().join(SUCCESS_FILENAME), foreign_marker).unwrap();

    let dest = local_dest(tmp.path());
    let verdict = verify_at_destination(&dest, "").unwrap();

    assert!(!verdict.passed);
    assert!(!verdict.success_marker_consistent);
    let reported_stale = verdict
        .failures
        .iter()
        .any(|f| matches!(f, Failure::SuccessMarkerStale { .. }));
    assert!(reported_stale);
}
#[test]
fn malformed_success_marker_body_is_flagged() {
    let tmp = tempfile::tempdir().unwrap();
    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    write_dataset(tmp.path(), &manifest, &[("part-000001.parquet", b"AAAA")]);
    // Overwrite the valid marker with text that is not a fingerprint at all.
    let marker_path = tmp.path().join(SUCCESS_FILENAME);
    std::fs::write(marker_path, b"not even xxh3 shaped").unwrap();

    let dest = local_dest(tmp.path());
    let verdict = verify_at_destination(&dest, "").unwrap();

    assert!(!verdict.passed);
    let reported_malformed = verdict
        .failures
        .iter()
        .any(|f| matches!(f, Failure::SuccessMarkerMalformed { .. }));
    assert!(reported_malformed);
}
#[test]
fn absent_success_marker_does_not_fail_validation_alone() {
    // ADR-0012 M2: `_SUCCESS` is only written by successful runs, so a
    // manifest with a non-success status legitimately has no marker next to
    // it. Its absence alone must not flip `passed`.
    let tmp = tempfile::tempdir().unwrap();
    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Failed,
    );
    write_dataset(tmp.path(), &manifest, &[("part-000001.parquet", b"AAAA")]);
    // `write_dataset` skips the marker unless the status is `Success`;
    // confirm the precondition before verifying.
    let marker_path = tmp.path().join(SUCCESS_FILENAME);
    assert!(!marker_path.exists());

    let dest = local_dest(tmp.path());
    let verdict = verify_at_destination(&dest, "").unwrap();

    assert!(verdict.manifest_found);
    assert!(
        !verdict.success_marker_consistent,
        "no marker => false (no signal)"
    );
    // The part itself verified and nothing was recorded as a failure.
    assert!(verdict.failures.is_empty());
    assert!(verdict.passed);
}
// ── self-consistency ────────────────────────────────────────────────
#[test]
fn self_inconsistent_manifest_is_flagged_but_part_check_still_runs() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();

    // Start from a consistent manifest, then corrupt its declared total.
    let mut manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    manifest.row_count = 9999; // lie

    // Written by hand (rather than via `write_dataset`) so the marker is
    // derived from the corrupted manifest bytes and is therefore consistent.
    let manifest_json = serde_json::to_vec_pretty(&manifest).unwrap();
    std::fs::write(root.join("part-000001.parquet"), b"AAAA").unwrap();
    std::fs::write(root.join(MANIFEST_FILENAME), &manifest_json).unwrap();
    std::fs::write(
        root.join(SUCCESS_FILENAME),
        success_marker_body(&manifest_json),
    )
    .unwrap();

    let dest = local_dest(root);
    let verdict = verify_at_destination(&dest, "").unwrap();

    assert!(verdict.manifest_found);
    assert!(!verdict.manifest_self_consistent);
    assert!(!verdict.passed);
    // The part is physically present, so it is still counted as verified —
    // the two signals are reported independently.
    assert_eq!(verdict.parts_verified, 1);
    let reported_inconsistent = verdict
        .failures
        .iter()
        .any(|f| matches!(f, Failure::ManifestSelfInconsistent { .. }));
    assert!(reported_inconsistent);
}
// ── untracked objects ───────────────────────────────────────────────
#[test]
fn untracked_object_under_prefix_is_flagged() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    write_dataset(root, &manifest, &[("part-000001.parquet", b"AAAA")]);
    // Drop a file next to the dataset that the manifest does not list.
    std::fs::write(root.join("rogue.parquet"), b"XX").unwrap();

    let destination = local_dest(root);
    let report = verify_at_destination(&destination, "").unwrap();

    let rogue_reported = report.failures.iter().any(
        |failure| matches!(failure, Failure::UntrackedObject { key, .. } if key == "rogue.parquet"),
    );
    assert!(rogue_reported);
    // Untracked objects are reported but do NOT flip `passed`; acting on
    // them is the resume-side decision (M9). Parts and marker are fine,
    // so the verdict remains true.
    assert!(report.passed);
}
#[test]
fn quarantine_prefix_objects_are_silently_ignored() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    write_dataset(root, &manifest, &[("part-000001.parquet", b"AAAA")]);

    // Place a file under the quarantine prefix, which the manifest does not list.
    let quarantine_dir = root.join(crate::manifest::QUARANTINE_PREFIX);
    std::fs::create_dir_all(&quarantine_dir).unwrap();
    std::fs::write(quarantine_dir.join("old.parquet"), b"OO").unwrap();

    let destination = local_dest(root);
    let report = verify_at_destination(&destination, "").unwrap();

    assert!(report.passed);
    let none_untracked = report
        .failures
        .iter()
        .all(|failure| !matches!(failure, Failure::UntrackedObject { .. }));
    assert!(
        none_untracked,
        "quarantine_prefix is the legitimate home for these — must not flag"
    );
}
#[test]
fn doctor_probe_is_not_flagged_as_untracked() {
    // Regression: `rivet doctor` leaves `.rivet_doctor_probe` behind at the
    // destination prefix. When `rivet run --validate` later targets the
    // same prefix, the probe must count as a Rivet sidecar rather than
    // foreign data; otherwise `has_failures()` trips and the run's
    // `validated` flag is downgraded to FAIL.
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    write_dataset(root, &manifest, &[("part-000001.parquet", b"AAAA")]);

    let probe_path = root.join(crate::manifest::DOCTOR_PROBE_FILENAME);
    std::fs::write(probe_path, b"ok").unwrap();

    let destination = local_dest(root);
    let report = verify_at_destination(&destination, "").unwrap();

    assert!(
        !report.has_failures(),
        "doctor probe must not surface as a failure: {:?}",
        report.failures
    );
    assert!(report.passed);
}
// ── manifest_dir join semantics ─────────────────────────────────────
#[test]
fn verifies_in_subdirectory_when_manifest_dir_is_non_empty() {
    let outer = tempfile::tempdir().unwrap();
    // The dataset lives two levels below the destination root.
    let run_dir = outer.path().join("sub/run");
    std::fs::create_dir_all(&run_dir).unwrap();

    let manifest = build_manifest(
        vec![part(1, 10, 4, "xxh3:1111111111111111")],
        ManifestStatus::Success,
    );
    let manifest_bytes = serde_json::to_vec_pretty(&manifest).unwrap();
    std::fs::write(run_dir.join("part-000001.parquet"), b"AAAA").unwrap();
    std::fs::write(run_dir.join(MANIFEST_FILENAME), &manifest_bytes).unwrap();
    let marker_bytes = success_marker_body(&manifest_bytes);
    std::fs::write(run_dir.join(SUCCESS_FILENAME), marker_bytes).unwrap();

    // The destination is rooted at the outer directory; `manifest_dir`
    // selects the nested run.
    let destination = local_dest(outer.path());
    let without_slash = verify_at_destination(&destination, "sub/run").unwrap();
    assert!(without_slash.passed);
    assert_eq!(without_slash.parts_verified, 1);

    // A trailing slash is normalised, so the verdict is the same.
    let with_slash = verify_at_destination(&destination, "sub/run/").unwrap();
    assert!(with_slash.passed);
}
// ── list-failure semantics (presence now rides the listing) ──────────
/// Test double that wraps a real `LocalDestination` and fails every
/// `list_prefix` call with an error, while `write`, `capabilities`, `head`
/// and `read` are forwarded to the wrapped destination unchanged.
///
/// Used to pin the post-refactor contract: presence is derived from the
/// listing, so a listing we cannot read means the audit cannot certify the
/// parts.
struct ListFails(LocalDestination);
impl crate::destination::Destination for ListFails {
    /// The one overridden behaviour: listing always fails, whatever the prefix.
    fn list_prefix(&self, _prefix: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
        Err(anyhow::anyhow!("listing unavailable"))
    }

    // Everything below forwards to the wrapped `LocalDestination`.

    fn capabilities(&self) -> crate::destination::DestinationCapabilities {
        let inner = &self.0;
        inner.capabilities()
    }

    fn write(&self, local_path: &Path, key: &str) -> Result<crate::destination::WriteOutcome> {
        let inner = &self.0;
        inner.write(local_path, key)
    }

    fn head(&self, key: &str) -> Result<Option<crate::destination::ObjectMeta>> {
        let inner = &self.0;
        inner.head(key)
    }

    fn read(&self, key: &str) -> Result<Vec<u8>> {
        let inner = &self.0;
        inner.read(key)
    }
}
#[test]
fn list_failure_cannot_certify_parts_and_fails_the_audit() {
    let tmp = tempfile::tempdir().unwrap();
    let manifest = build_manifest(vec![part(0, 3, 3, "xxh3:0")], ManifestStatus::Success);
    write_dataset(tmp.path(), &manifest, &[("part-000000.parquet", b"abc")]);

    // Same on-disk dataset, but reached through a destination whose
    // `list_prefix` always errors.
    let unlistable = ListFails(local_dest(tmp.path()));
    let report = verify_at_destination(&unlistable, "").unwrap();

    // The manifest itself reads + parses fine (HEAD/read still work)…
    assert!(report.manifest_found);
    assert!(report.manifest_self_consistent);
    // …but with no listing we verify zero parts and refuse to pass.
    assert!(
        !report.passed,
        "an audit that cannot list the prefix must not pass"
    );
    assert_eq!(report.parts_verified, 0);
    let saw_list_error = report
        .failures
        .iter()
        .any(|failure| matches!(failure, Failure::ListPrefixError { .. }));
    assert!(
        saw_list_error,
        "expected a ListPrefixError, got: {:?}",
        report.failures
    );
}
#[test]
fn passed_is_derived_advisory_failures_do_not_fail() {
    // Start from an empty verification that did find its manifest.
    let mut report = ManifestVerification::empty();
    report.manifest_found = true;

    // An advisory failure (untracked surplus) keeps the verdict passing…
    let surplus = Failure::UntrackedObject {
        key: "stray.parquet".into(),
        size_bytes: 9,
    };
    report.failures.push(surplus);
    report.recompute_passed();
    assert!(report.passed, "untracked surplus is advisory, not fatal");

    // …while any fatal failure flips it.
    let missing = Failure::PartMissing {
        part_id: 1,
        path: "part-000001.parquet".into(),
    };
    report.failures.push(missing);
    report.recompute_passed();
    assert!(!report.passed, "a missing part is fatal");

    // With no manifest found the verdict never passes, even with zero failures.
    let mut no_manifest = ManifestVerification::empty();
    no_manifest.recompute_passed();
    assert!(!no_manifest.passed, "no manifest found → cannot certify");
}
#[test]
fn verify_content_policy_fails_only_size_only_parts() {
    // Baseline: 3 parts verified, 2 of them content-checked, 1 size-only.
    let mut baseline = ManifestVerification::empty();
    baseline.manifest_found = true;
    baseline.parts_verified = 3;
    baseline.parts_md5_verified = 2;

    // verify: size → a size-only part is acceptable, so the verdict passes.
    let mut size_policy = baseline.clone();
    size_policy.recompute_passed();
    size_policy.enforce_content_policy(false);
    assert!(size_policy.passed, "size-only OK under verify: size");

    // verify: content → the 1 size-only part is a fatal failure.
    let mut content_policy = baseline.clone();
    content_policy.recompute_passed();
    content_policy.enforce_content_policy(true);
    assert!(
        !content_policy.passed,
        "a size-only part fails verify: content"
    );
    let unmet_reported = content_policy.failures.iter().any(|failure| {
        matches!(
            failure,
            Failure::ContentVerificationUnmet {
                size_only: 1,
                total: 3
            }
        )
    });
    assert!(
        unmet_reported,
        "expected ContentVerificationUnmet, got: {:?}",
        content_policy.failures
    );

    // verify: content with every part md5-checked → satisfied.
    let mut fully_checked = baseline;
    fully_checked.parts_md5_verified = 3;
    fully_checked.recompute_passed();
    fully_checked.enforce_content_policy(true);
    assert!(
        fully_checked.passed && fully_checked.failures.is_empty(),
        "all md5 meets verify: content"
    );
}
}