1#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// Manifest format version understood by this build.
///
/// `RunManifest::validate_self_consistency` rejects any manifest whose `manifest_version`
/// differs from this value.
pub const MANIFEST_VERSION: u32 = 1;

/// File name used for the run manifest.
// NOTE(review): presumably the serialized `RunManifest`; the writer is not visible here.
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// File name of the success marker.
// NOTE(review): presumably the file whose body `success_marker_body` produces and
// `parse_success_marker` reads — confirm against the writer.
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Prefix associated with quarantined output.
// NOTE(review): presumably the path prefix for parts marked `PartStatus::Quarantined` — confirm.
pub const QUARANTINE_PREFIX: &str = "_quarantine";

/// File name of the "doctor" probe file.
// NOTE(review): usage is not visible in this file; presumably a scratch file written by a
// diagnostic command — confirm.
pub const DOCTOR_PROBE_FILENAME: &str = ".rivet_doctor_probe";
52
53pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
66 use xxhash_rust::xxh3::xxh3_64;
67 format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
68}
69
/// Parses the body of a success marker and returns its fingerprint.
///
/// Trailing ASCII whitespace (e.g. `\n` or `\r\n`) is ignored. The remainder must be exactly
/// `xxh3:` followed by 16 lowercase hexadecimal digits; on success the returned slice is that
/// remainder (prefix included), borrowed from `body`.
///
/// Returns `None` for anything else: empty input, a different or missing prefix, the wrong
/// number of digits, uppercase hex, or non-hex characters. The function never panics, even when
/// `body` contains multi-byte UTF-8 characters.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    const PREFIX: &str = "xxh3:";
    const HEX_DIGITS: usize = 16;

    let trimmed = body.trim_end_matches(|c: char| c.is_ascii_whitespace());
    // `strip_prefix` cannot split inside a multi-byte character, unlike a byte-offset
    // `split_at`, which panics when the offset is not on a char boundary.
    let hex = trimmed.strip_prefix(PREFIX)?;
    // Byte-wise check: any non-ASCII byte fails the pattern, so 16 matching bytes are
    // exactly 16 lowercase hex characters.
    let is_lower_hex = hex.len() == HEX_DIGITS
        && hex
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
    if is_lower_hex {
        Some(trimmed)
    } else {
        None
    }
}
96
/// Top-level record describing one export run, serialized and deserialized with serde.
///
/// No `deny_unknown_fields` attribute is set, so unrecognized keys are ignored when
/// deserializing. Agreement between `row_count`, `part_count` and `parts` is NOT enforced by
/// deserialization; call `validate_self_consistency` to check it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RunManifest {
    /// Manifest format version; `validate_self_consistency` requires it to equal
    /// `MANIFEST_VERSION`.
    pub manifest_version: u32,
    /// Identifier of the run. The format is not enforced by this type.
    pub run_id: String,
    /// Name of the export this run belongs to.
    pub export_name: String,
    /// Start time, stored as an opaque string.
    // NOTE(review): tests in this file use RFC 3339 UTC values — presumably the intended
    // format; confirm against the writer.
    pub started_at: String,
    /// Finish time, stored as an opaque string (same caveat as `started_at`).
    pub finished_at: String,
    /// Outcome of the run.
    pub status: ManifestStatus,
    /// Where the data was read from.
    pub source: ManifestSource,
    /// Where the data was written to.
    pub destination: ManifestDestination,
    /// Output file format name (tests in this file use `"parquet"`).
    pub format: String,
    /// Compression codec name (tests in this file use `"zstd"`).
    pub compression: String,
    /// Fingerprint of the schema, stored as an opaque string.
    // NOTE(review): tests use the `xxh3:<16 hex>` shape — confirm this is required.
    pub schema_fingerprint: String,
    /// Declared total rows; must equal the sum of `rows` over committed parts to pass
    /// `validate_self_consistency`.
    pub row_count: i64,
    /// Declared number of committed parts; must equal the count of parts whose status is
    /// `PartStatus::Committed` to pass `validate_self_consistency`.
    pub part_count: u32,
    /// All parts recorded for the run, committed and quarantined alike.
    pub parts: Vec<ManifestPart>,
}
119
/// Outcome of an export run as recorded in the manifest.
///
/// Serialized in snake_case: `"success"`, `"failed"`, `"interrupted"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestStatus {
    /// The run succeeded.
    Success,
    /// The run failed.
    Failed,
    /// The run was interrupted.
    // NOTE(review): the exact conditions distinguishing `Failed` from `Interrupted` are not
    // visible in this file.
    Interrupted,
}
133
/// Describes where the exported data was read from.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestSource {
    /// Source engine name (tests in this file use `"postgres"`).
    pub engine: String,
    /// Source schema, if any. May be absent from the JSON, in which case it is `None`.
    pub schema: Option<String>,
    /// Source table, if any. May be absent from the JSON, in which case it is `None`.
    pub table: Option<String>,
}
140
/// Describes where the exported data was written to.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestDestination {
    /// Destination kind (tests in this file use `"gcs"` and `"local"`).
    pub kind: String,
    /// Destination URI (tests in this file use `gs://…` and `file:///…` values).
    pub uri: String,
}
146
/// One output part recorded in a `RunManifest`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestPart {
    /// Identifier of the part; `RunManifest::validate_self_consistency` rejects manifests in
    /// which two parts (of any status) share an id.
    pub part_id: u32,
    /// Path of the part file.
    // NOTE(review): presumably relative to the run's destination — confirm.
    pub path: String,
    /// Number of rows in the part; summed over committed parts by
    /// `RunManifest::committed_rows`.
    pub rows: i64,
    /// Size of the part in bytes.
    pub size_bytes: u64,
    /// Fingerprint of the part's content, stored as an opaque string.
    // NOTE(review): tests use the `xxh3:<16 hex>` shape — confirm this is required.
    pub content_fingerprint: String,
    /// Whether the part counts toward the manifest totals (see `PartStatus`).
    pub status: PartStatus,
}
163
/// Status of a single part within a manifest.
///
/// Serialized in snake_case: `"committed"`, `"quarantined"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartStatus {
    /// The part counts toward `RunManifest::committed_rows` and
    /// `RunManifest::committed_part_count`.
    Committed,
    /// The part is listed in the manifest but excluded from the committed row and part
    /// totals.
    Quarantined,
}
172
173impl RunManifest {
174 pub fn committed_rows(&self) -> i64 {
177 self.parts
178 .iter()
179 .filter(|p| p.status == PartStatus::Committed)
180 .map(|p| p.rows)
181 .sum()
182 }
183
184 pub fn committed_part_count(&self) -> usize {
186 self.parts
187 .iter()
188 .filter(|p| p.status == PartStatus::Committed)
189 .count()
190 }
191
192 pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
196 if self.manifest_version != MANIFEST_VERSION {
197 return Err(ManifestInconsistency::UnsupportedVersion {
198 found: self.manifest_version,
199 supported: MANIFEST_VERSION,
200 });
201 }
202 let actual_parts = self.committed_part_count();
203 if actual_parts != self.part_count as usize {
204 return Err(ManifestInconsistency::PartCountMismatch {
205 declared: self.part_count,
206 actual: actual_parts,
207 });
208 }
209 let actual_rows = self.committed_rows();
210 if actual_rows != self.row_count {
211 return Err(ManifestInconsistency::RowCountMismatch {
212 declared: self.row_count,
213 actual: actual_rows,
214 });
215 }
216 let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
218 ids.sort_unstable();
219 for w in ids.windows(2) {
220 if w[0] == w[1] {
221 return Err(ManifestInconsistency::DuplicatePartId(w[0]));
222 }
223 }
224 Ok(())
225 }
226}
227
/// Reasons `RunManifest::validate_self_consistency` can reject a manifest.
#[derive(Debug, PartialEq)]
pub enum ManifestInconsistency {
    /// `manifest_version` (`found`) differs from the version this build accepts (`supported`).
    UnsupportedVersion { found: u32, supported: u32 },
    /// `part_count` (`declared`) differs from the number of committed parts (`actual`).
    PartCountMismatch { declared: u32, actual: usize },
    /// `row_count` (`declared`) differs from the row total of committed parts (`actual`).
    RowCountMismatch { declared: i64, actual: i64 },
    /// The contained `part_id` appears on more than one part.
    DuplicatePartId(u32),
}
239
240impl std::fmt::Display for ManifestInconsistency {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 match self {
243 Self::UnsupportedVersion { found, supported } => write!(
244 f,
245 "manifest_version {found} is not supported by this build (expected {supported})"
246 ),
247 Self::PartCountMismatch { declared, actual } => write!(
248 f,
249 "part_count declares {declared} parts but {actual} committed parts found"
250 ),
251 Self::RowCountMismatch { declared, actual } => write!(
252 f,
253 "row_count declares {declared} rows but committed parts sum to {actual}"
254 ),
255 Self::DuplicatePartId(id) => {
256 write!(f, "duplicate part_id {id} in manifest.parts")
257 }
258 }
259 }
260}
261
// Lets `ManifestInconsistency` be used wherever a `std::error::Error` is expected; the
// trait's default methods are used, relying on the `Debug` and `Display` impls above.
impl std::error::Error for ManifestInconsistency {}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a committed part with a deterministic path and fingerprint derived from `id`.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        let path = format!("part-{id:06}.parquet");
        let content_fingerprint = format!("xxh3:{:016x}", u64::from(id));
        ManifestPart {
            part_id: id,
            path,
            rows,
            size_bytes: size,
            content_fingerprint,
            status: PartStatus::Committed,
        }
    }

    /// Builds a manifest whose `row_count` / `part_count` agree with the committed `parts`.
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let (row_count, part_count) = {
            let committed: Vec<&ManifestPart> = parts
                .iter()
                .filter(|p| p.status == PartStatus::Committed)
                .collect();
            let rows: i64 = committed.iter().map(|p| p.rows).sum();
            (rows, committed.len() as u32)
        };
        let source = ManifestSource {
            engine: "postgres".into(),
            schema: Some("public".into()),
            table: Some("orders".into()),
        };
        let destination = ManifestDestination {
            kind: "gcs".into(),
            uri: "gs://rivet-exports/public.orders/run/".into(),
        };
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source,
            destination,
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    // Pins the on-disk format version; bumping it must be a deliberate change.
    #[test]
    fn manifest_version_is_one() {
        assert_eq!(MANIFEST_VERSION, 1);
    }

    // Pins the well-known file names.
    #[test]
    fn filenames_are_stable() {
        let expectations = [
            (MANIFEST_FILENAME, "manifest.json"),
            (SUCCESS_FILENAME, "_SUCCESS"),
            (QUARANTINE_PREFIX, "_quarantine"),
        ];
        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }

    // A manifest built by the helper agrees with itself.
    #[test]
    fn self_consistent_manifest_validates() {
        let manifest = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(manifest.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let manifest = RunManifest {
            part_count: 5,
            ..manifest_with_parts(vec![part(1, 100, 4096)])
        };
        assert_eq!(
            manifest.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1,
            })
        );
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let manifest = RunManifest {
            row_count: 999,
            ..manifest_with_parts(vec![part(1, 100, 4096)])
        };
        assert_eq!(
            manifest.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100,
            })
        );
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let manifest = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        assert_eq!(
            manifest.validate_self_consistency(),
            Err(ManifestInconsistency::DuplicatePartId(1))
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        let manifest = RunManifest {
            manifest_version: 999,
            part_count: 0,
            row_count: 0,
            ..manifest_with_parts(Vec::new())
        };
        assert_eq!(
            manifest.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: 1,
            })
        );
    }

    // Quarantined parts are listed but excluded from the committed totals.
    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let quarantined = ManifestPart {
            status: PartStatus::Quarantined,
            ..part(2, 999, 8192)
        };
        let manifest = manifest_with_parts(vec![part(1, 100, 4096), quarantined]);

        assert_eq!(manifest.validate_self_consistency(), Ok(()));
        assert_eq!(manifest.committed_rows(), 100);
        assert_eq!(manifest.committed_part_count(), 1);
    }

    // Serializing and deserializing yields an equal manifest.
    #[test]
    fn json_roundtrip_preserves_fields() {
        let original = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: RunManifest = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        let mut manifest = RunManifest {
            part_count: 0,
            row_count: 0,
            ..manifest_with_parts(vec![])
        };
        let encoded = serde_json::to_string(&manifest).unwrap();
        assert!(encoded.contains("\"status\":\"success\""));

        manifest.status = ManifestStatus::Interrupted;
        let encoded = serde_json::to_string(&manifest).unwrap();
        assert!(encoded.contains("\"status\":\"interrupted\""));
    }

    // The writer emits `xxh3:` + 16 lowercase hex digits + `\n`.
    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        assert!(body.starts_with("xxh3:"), "body = {body:?}");
        assert!(body.ends_with('\n'), "body = {body:?}");
        let (_, hex) = body.trim_end().split_at("xxh3:".len());
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(hex.chars().all(|c| matches!(c, '0'..='9' | 'a'..='f')));
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let first = success_marker_body(b"hello");
        let second = success_marker_body(b"hello");
        assert_eq!(first, second);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let one = success_marker_body(b"manifest one");
        let two = success_marker_body(b"manifest two");
        assert_ne!(one, two);
    }

    // Whatever the writer produces, the parser accepts.
    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fingerprint = parse_success_marker(&body).expect("must parse");
        assert!(fingerprint.starts_with("xxh3:"));
        assert_eq!(fingerprint.len(), "xxh3:".len() + 16);
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        let malformed = [
            // Empty / whitespace-only.
            "",
            "\n",
            // Wrong algorithm prefix.
            "sha256:0123456789abcdef",
            // Too few digits.
            "xxh3:0123\n",
            // Uppercase hex.
            "xxh3:0123456789ABCDEF\n",
            // Non-hex characters.
            "xxh3:zzzzzzzzzzzzzzzz\n",
            // Missing prefix.
            "0123456789abcdef\n",
        ];
        for body in malformed {
            assert_eq!(parse_success_marker(body), None, "body = {body:?}");
        }
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        for body in ["xxh3:0123456789abcdef\n", "xxh3:0123456789abcdef\r\n"] {
            assert_eq!(parse_success_marker(body), Some("xxh3:0123456789abcdef"));
        }
    }

    // A reader must tolerate keys it does not know about.
    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let manifest: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(manifest.run_id, "r1");
        assert_eq!(manifest.validate_self_consistency(), Ok(()));
    }
}