1#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// Manifest schema version understood by this build.
///
/// `RunManifest::validate_self_consistency` rejects any manifest whose
/// `manifest_version` differs from this value.
pub const MANIFEST_VERSION: u32 = 1;

/// File name of the run manifest document.
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// File name of the success marker.
///
/// NOTE(review): presumably the file whose body is produced by
/// `success_marker_body` — confirm against the writer.
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Name prefix used for quarantined data.
///
/// NOTE(review): exact layout under this prefix is not visible in this file.
pub const QUARANTINE_PREFIX: &str = "_quarantine";

/// File name of the doctor probe object.
///
/// NOTE(review): presumably a scratch file written by a connectivity check —
/// confirm against the caller.
pub const DOCTOR_PROBE_FILENAME: &str = ".rivet_doctor_probe";
52
/// Joins a directory-like prefix and a key with exactly one `/` between them.
///
/// Every trailing `/` on `dir` is dropped before joining. When nothing is left
/// of `dir` after trimming (it was empty or consisted only of slashes), `key`
/// is returned on its own with no leading separator.
///
/// `key` is appended verbatim; it is not trimmed or normalised.
pub fn join_key(dir: &str, key: &str) -> String {
    match dir.trim_end_matches('/') {
        // No usable prefix: the key stands alone.
        "" => key.to_owned(),
        dir => format!("{dir}/{key}"),
    }
}
66
67pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
80 use xxhash_rust::xxh3::xxh3_64;
81 format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
82}
83
/// Parses the body of a success marker and returns its fingerprint.
///
/// Trailing ASCII whitespace (for example `\n` or `\r\n`) is ignored. What
/// remains must be exactly `xxh3:` followed by 16 lowercase hexadecimal
/// digits; in that case the trimmed marker, prefix included, is returned as a
/// slice borrowed from `body`.
///
/// Returns `None` for anything else: wrong or missing prefix, wrong digest
/// length, uppercase or non-hex digits, leading whitespace, or non-ASCII
/// content. This function never panics, whatever bytes `body` contains.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    const PREFIX: &str = "xxh3:";
    const HEX_DIGITS: usize = 16;

    let marker = body.trim_end_matches(|c: char| c.is_ascii_whitespace());

    // `strip_prefix` only ever cuts on a char boundary. Slicing at a fixed
    // byte offset would panic on input such as "xxh3é…", where byte 5 falls
    // in the middle of a multi-byte character.
    let hex = marker.strip_prefix(PREFIX)?;

    // Bytes of multi-byte characters are all >= 0x80, so a byte-wise check is
    // enough to reject them along with uppercase and non-hex ASCII.
    let is_canonical = hex.len() == HEX_DIGITS
        && hex.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));

    if is_canonical {
        Some(marker)
    } else {
        None
    }
}
110
111#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
117pub struct ColumnChecksum {
118 pub name: String,
119 pub checksum: String,
120}
121
122#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
127pub struct RunManifest {
128 pub manifest_version: u32,
129 pub run_id: String,
130 pub export_name: String,
131 pub started_at: String,
132 pub finished_at: String,
133 pub status: ManifestStatus,
134 pub source: ManifestSource,
135 pub destination: ManifestDestination,
136 pub format: String,
137 pub compression: String,
138 pub schema_fingerprint: String,
140 pub row_count: i64,
141 pub part_count: u32,
142 pub parts: Vec<ManifestPart>,
143 #[serde(default, skip_serializing_if = "Option::is_none")]
150 pub column_checksums: Option<Vec<ColumnChecksum>>,
151 #[serde(default, skip_serializing_if = "Option::is_none")]
155 pub checksum_key_column: Option<String>,
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
165#[serde(rename_all = "snake_case")]
166pub enum ManifestStatus {
167 Success,
168 Failed,
169 Interrupted,
170}
171
172#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
173pub struct ManifestSource {
174 pub engine: String,
175 pub schema: Option<String>,
176 pub table: Option<String>,
177}
178
179#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
180pub struct ManifestDestination {
181 pub kind: String,
182 pub uri: String,
183}
184
185#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
190pub struct ManifestPart {
191 pub part_id: u32,
192 pub path: String,
193 pub rows: i64,
194 pub size_bytes: u64,
195 pub content_fingerprint: String,
199 #[serde(default)]
206 pub content_md5: String,
207 pub status: PartStatus,
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
211#[serde(rename_all = "snake_case")]
212pub enum PartStatus {
213 Committed,
215 Quarantined,
217}
218
219impl RunManifest {
220 pub fn committed_rows(&self) -> i64 {
223 self.parts
224 .iter()
225 .filter(|p| p.status == PartStatus::Committed)
226 .map(|p| p.rows)
227 .sum()
228 }
229
230 pub fn committed_part_count(&self) -> usize {
232 self.parts
233 .iter()
234 .filter(|p| p.status == PartStatus::Committed)
235 .count()
236 }
237
238 pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
242 if self.manifest_version != MANIFEST_VERSION {
243 return Err(ManifestInconsistency::UnsupportedVersion {
244 found: self.manifest_version,
245 supported: MANIFEST_VERSION,
246 });
247 }
248 let actual_parts = self.committed_part_count();
249 if actual_parts != self.part_count as usize {
250 return Err(ManifestInconsistency::PartCountMismatch {
251 declared: self.part_count,
252 actual: actual_parts,
253 });
254 }
255 let actual_rows = self.committed_rows();
256 if actual_rows != self.row_count {
257 return Err(ManifestInconsistency::RowCountMismatch {
258 declared: self.row_count,
259 actual: actual_rows,
260 });
261 }
262 let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
264 ids.sort_unstable();
265 for w in ids.windows(2) {
266 if w[0] == w[1] {
267 return Err(ManifestInconsistency::DuplicatePartId(w[0]));
268 }
269 }
270 Ok(())
271 }
272}
273
/// Reasons a manifest fails `RunManifest::validate_self_consistency`.
///
/// The [`Display`](std::fmt::Display) output is a one-line, human-readable
/// description of the mismatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ManifestInconsistency {
    /// The manifest declares a version other than the one this build supports.
    UnsupportedVersion {
        /// Version found in the manifest.
        found: u32,
        /// Version this build supports.
        supported: u32,
    },
    /// The declared part count differs from the number of committed parts.
    PartCountMismatch {
        /// Value of the manifest's `part_count` field.
        declared: u32,
        /// Number of committed parts actually listed.
        actual: usize,
    },
    /// The declared row count differs from the rows summed over committed
    /// parts.
    RowCountMismatch {
        /// Value of the manifest's `row_count` field.
        declared: i64,
        /// Rows summed over committed parts.
        actual: i64,
    },
    /// The same part id appears more than once in the part list.
    DuplicatePartId(u32),
}

impl std::fmt::Display for ManifestInconsistency {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedVersion { found, supported } => write!(
                f,
                "manifest_version {found} is not supported by this build (expected {supported})"
            ),
            Self::PartCountMismatch { declared, actual } => write!(
                f,
                "part_count declares {declared} parts but {actual} committed parts found"
            ),
            Self::RowCountMismatch { declared, actual } => write!(
                f,
                "row_count declares {declared} rows but committed parts sum to {actual}"
            ),
            Self::DuplicatePartId(id) => {
                write!(f, "duplicate part_id {id} in manifest.parts")
            }
        }
    }
}

impl std::error::Error for ManifestInconsistency {}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a committed part whose path and fingerprint are derived from
    /// `id`.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        ManifestPart {
            part_id: id,
            path: format!("part-{id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: format!("xxh3:{:016x}", u64::from(id)),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }
    }

    /// Builds a manifest around `parts` whose declared `row_count` and
    /// `part_count` already agree with the committed parts, so it validates
    /// unless a test deliberately breaks it.
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count: i64 = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "gcs".into(),
                uri: "gs://rivet-exports/public.orders/run/".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
            column_checksums: None,
            checksum_key_column: None,
        }
    }

    // Pins the on-disk contract: changing the version must be a deliberate act.
    #[test]
    fn manifest_version_is_one() {
        assert_eq!(MANIFEST_VERSION, 1);
    }

    #[test]
    fn filenames_are_stable() {
        assert_eq!(MANIFEST_FILENAME, "manifest.json");
        assert_eq!(SUCCESS_FILENAME, "_SUCCESS");
        assert_eq!(QUARANTINE_PREFIX, "_quarantine");
        assert_eq!(DOCTOR_PROBE_FILENAME, ".rivet_doctor_probe");
    }

    #[test]
    fn join_key_inserts_exactly_one_separator() {
        assert_eq!(
            join_key("exports/run", "manifest.json"),
            "exports/run/manifest.json"
        );
        assert_eq!(
            join_key("exports/run/", "manifest.json"),
            "exports/run/manifest.json"
        );
        assert_eq!(
            join_key("exports/run//", "manifest.json"),
            "exports/run/manifest.json"
        );
    }

    #[test]
    fn join_key_with_empty_dir_returns_bare_key() {
        assert_eq!(join_key("", "_SUCCESS"), "_SUCCESS");
        assert_eq!(join_key("/", "_SUCCESS"), "_SUCCESS");
    }

    #[test]
    fn self_consistent_manifest_validates() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(m.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.part_count = 5;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1
            })
        ));
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.row_count = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100
            })
        ));
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        let err = m.validate_self_consistency().unwrap_err();
        assert_eq!(err, ManifestInconsistency::DuplicatePartId(1));
    }

    #[test]
    fn rejects_unsupported_version() {
        // An empty part list already yields zero declared rows and parts, so
        // the version is the only thing wrong with this manifest.
        let mut m = manifest_with_parts(vec![]);
        m.manifest_version = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: 1
            })
        ));
    }

    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let mut p_q = part(2, 999, 8192);
        p_q.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), p_q]);

        assert_eq!(m.validate_self_consistency(), Ok(()));
        assert_eq!(m.committed_rows(), 100);
        assert_eq!(m.committed_part_count(), 1);
    }

    #[test]
    fn json_roundtrip_preserves_fields() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        let mut m = manifest_with_parts(vec![]);
        let json = serde_json::to_string(&m).unwrap();
        assert!(json.contains("\"status\":\"success\""));

        m.status = ManifestStatus::Interrupted;
        let json = serde_json::to_string(&m).unwrap();
        assert!(json.contains("\"status\":\"interrupted\""));
    }

    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        assert!(body.starts_with("xxh3:"), "body = {body:?}");
        assert!(body.ends_with('\n'), "body = {body:?}");
        let trimmed = body.trim_end();
        let hex = &trimmed["xxh3:".len()..];
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(
            hex.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
        );
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let a = success_marker_body(b"hello");
        let b = success_marker_body(b"hello");
        assert_eq!(a, b);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let a = success_marker_body(b"manifest one");
        let b = success_marker_body(b"manifest two");
        assert_ne!(a, b);
    }

    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fp = parse_success_marker(&body).expect("must parse");
        assert!(fp.starts_with("xxh3:"));
        assert_eq!(fp.len(), "xxh3:".len() + 16);
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        assert_eq!(parse_success_marker(""), None);
        assert_eq!(parse_success_marker("\n"), None);
        assert_eq!(parse_success_marker("sha256:0123456789abcdef"), None);
        // Digest too short.
        assert_eq!(parse_success_marker("xxh3:0123\n"), None);
        // Uppercase hex is not the canonical form.
        assert_eq!(parse_success_marker("xxh3:0123456789ABCDEF\n"), None);
        // Right length, but not hex at all.
        assert_eq!(parse_success_marker("xxh3:zzzzzzzzzzzzzzzz\n"), None);
        // Missing the algorithm prefix.
        assert_eq!(parse_success_marker("0123456789abcdef\n"), None);
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        let body = "xxh3:0123456789abcdef\n";
        assert_eq!(parse_success_marker(body), Some("xxh3:0123456789abcdef"));
        // CRLF line ending.
        let body = "xxh3:0123456789abcdef\r\n";
        assert_eq!(parse_success_marker(body), Some("xxh3:0123456789abcdef"));
    }

    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let parsed: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.run_id, "r1");
        assert_eq!(parsed.validate_self_consistency(), Ok(()));
    }
}