1#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// The only manifest format version accepted by [`RunManifest::validate_self_consistency`];
/// a manifest carrying any other value is rejected as `UnsupportedVersion`.
pub const MANIFEST_VERSION: u32 = 1;

/// File name of the run manifest (the JSON form of [`RunManifest`]).
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// File name of the run's success marker.
///
/// NOTE(review): presumably its body is produced by [`success_marker_body`] and read back with
/// [`parse_success_marker`] — confirm against the writer/reader call sites.
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Name prefix used for quarantined output.
///
/// NOTE(review): whether this is a directory name or a key prefix is not visible here — confirm.
pub const QUARANTINE_PREFIX: &str = "_quarantine";

/// File name used by the "doctor" probe.
///
/// NOTE(review): presumably a scratch object written to check destination access — confirm.
pub const DOCTOR_PROBE_FILENAME: &str = ".rivet_doctor_probe";
52
/// Joins a directory-like prefix and a key with exactly one `/` between them.
///
/// Any trailing slashes on `dir` are dropped first. If nothing is left (an empty `dir`, or one
/// made only of slashes), `key` is returned unchanged. `key` itself is never modified.
pub fn join_key(dir: &str, key: &str) -> String {
    match dir.trim_end_matches('/') {
        "" => key.to_owned(),
        base => format!("{base}/{key}"),
    }
}
66
67pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
80 use xxhash_rust::xxh3::xxh3_64;
81 format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
82}
83
/// Validates a success-marker body and returns its fingerprint.
///
/// A valid body is exactly `xxh3:` followed by 16 lowercase hex digits (`0-9a-f`). It may be
/// followed by trailing ASCII whitespace, such as `\n` or `\r\n`.
///
/// On success, returns the fingerprint with the trailing whitespace removed. The result is a
/// sub-slice of `body` that includes the `xxh3:` prefix. Any other input returns `None`,
/// including non-ASCII text; the function never panics.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    const PREFIX: &str = "xxh3:";
    let trimmed = body.trim_end_matches(|c: char| c.is_ascii_whitespace());
    // `strip_prefix` compares whole characters. The previous `split_at(5)` panicked on a
    // corrupt body whose 5th byte falls inside a multi-byte character.
    let hex = trimmed.strip_prefix(PREFIX)?;
    // Compare bytes, not chars: any non-ASCII byte fails the match. So `len() == 16` really
    // means 16 digits, and uppercase is rejected because the writer only emits lowercase.
    let is_fingerprint =
        hex.len() == 16 && hex.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
    if is_fingerprint {
        Some(trimmed)
    } else {
        None
    }
}
110
/// Record of one export run, stored as JSON.
///
/// serde emits JSON keys in field declaration order, so reordering fields changes the
/// serialized bytes. Unknown JSON keys are ignored when reading, because the struct does not
/// use `deny_unknown_fields`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RunManifest {
    /// Format version; [`RunManifest::validate_self_consistency`] rejects any value other
    /// than [`MANIFEST_VERSION`].
    pub manifest_version: u32,
    pub run_id: String,
    pub export_name: String,
    /// NOTE(review): tests use RFC 3339 UTC strings (e.g. `2026-05-21T12:00:00Z`); the type
    /// does not enforce a format.
    pub started_at: String,
    /// Same representation as `started_at` (not enforced).
    pub finished_at: String,
    pub status: ManifestStatus,
    pub source: ManifestSource,
    pub destination: ManifestDestination,
    /// Output format name (tests use `"parquet"`); free-form string.
    pub format: String,
    /// Compression codec name (tests use `"zstd"`); free-form string.
    pub compression: String,
    /// Fingerprint string for the exported schema (tests use the `xxh3:<16 hex>` form).
    pub schema_fingerprint: String,
    /// Declared total rows; must equal the summed `rows` of committed parts.
    pub row_count: i64,
    /// Declared number of committed parts; quarantined parts are not counted.
    pub part_count: u32,
    /// Every part of the run, quarantined ones included.
    pub parts: Vec<ManifestPart>,
}
133
/// Final outcome of a run.
///
/// Serialized in snake_case: `"success"`, `"failed"`, `"interrupted"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestStatus {
    Success,
    Failed,
    Interrupted,
}
147
/// Where the exported data was read from.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestSource {
    /// Source engine identifier (tests use `"postgres"`).
    pub engine: String,
    /// Optional; a missing key or JSON `null` deserializes to `None`.
    pub schema: Option<String>,
    /// Optional; a missing key or JSON `null` deserializes to `None`.
    pub table: Option<String>,
}
154
/// Where the run's output was written.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestDestination {
    /// Destination kind (tests use `"gcs"` and `"local"`); free-form string.
    pub kind: String,
    /// Destination URI (tests use `gs://…/` and `file:///…/` forms).
    pub uri: String,
}
160
/// One output file listed in a [`RunManifest`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestPart {
    /// Must be unique across all of `RunManifest::parts`, quarantined parts included.
    pub part_id: u32,
    /// NOTE(review): tests use a bare file name such as `part-000001.parquet`; confirm whether
    /// this is relative to the destination URI.
    pub path: String,
    /// Row count; added to the manifest totals only when `status` is `Committed`.
    pub rows: i64,
    pub size_bytes: u64,
    /// Content fingerprint string (tests use the `xxh3:<16 hex>` form; not enforced here).
    pub content_fingerprint: String,
    /// Defaults to an empty string when the key is absent from the JSON.
    ///
    /// NOTE(review): presumably an MD5 digest of the part content, with empty meaning
    /// "not recorded" — confirm with the writer.
    #[serde(default)]
    pub content_md5: String,
    pub status: PartStatus,
}
185
/// Per-part state, serialized in snake_case: `"committed"`, `"quarantined"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartStatus {
    /// Counted toward `RunManifest::row_count` and `RunManifest::part_count`.
    Committed,
    /// Excluded from the manifest totals. It is still listed in `parts` and still takes part
    /// in the duplicate-`part_id` check.
    Quarantined,
}
194
195impl RunManifest {
196 pub fn committed_rows(&self) -> i64 {
199 self.parts
200 .iter()
201 .filter(|p| p.status == PartStatus::Committed)
202 .map(|p| p.rows)
203 .sum()
204 }
205
206 pub fn committed_part_count(&self) -> usize {
208 self.parts
209 .iter()
210 .filter(|p| p.status == PartStatus::Committed)
211 .count()
212 }
213
214 pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
218 if self.manifest_version != MANIFEST_VERSION {
219 return Err(ManifestInconsistency::UnsupportedVersion {
220 found: self.manifest_version,
221 supported: MANIFEST_VERSION,
222 });
223 }
224 let actual_parts = self.committed_part_count();
225 if actual_parts != self.part_count as usize {
226 return Err(ManifestInconsistency::PartCountMismatch {
227 declared: self.part_count,
228 actual: actual_parts,
229 });
230 }
231 let actual_rows = self.committed_rows();
232 if actual_rows != self.row_count {
233 return Err(ManifestInconsistency::RowCountMismatch {
234 declared: self.row_count,
235 actual: actual_rows,
236 });
237 }
238 let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
240 ids.sort_unstable();
241 for w in ids.windows(2) {
242 if w[0] == w[1] {
243 return Err(ManifestInconsistency::DuplicatePartId(w[0]));
244 }
245 }
246 Ok(())
247 }
248}
249
/// Reasons [`RunManifest::validate_self_consistency`] can reject a manifest.
#[derive(Debug, PartialEq)]
pub enum ManifestInconsistency {
    /// `manifest_version` is not the version this build accepts.
    UnsupportedVersion { found: u32, supported: u32 },
    /// `part_count` disagrees with the number of committed parts.
    PartCountMismatch { declared: u32, actual: usize },
    /// `row_count` disagrees with the summed rows of committed parts.
    RowCountMismatch { declared: i64, actual: i64 },
    /// The given `part_id` appears more than once in `parts`.
    DuplicatePartId(u32),
}

impl std::fmt::Display for ManifestInconsistency {
    /// Renders a one-line, human-readable description of the inconsistency.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use ManifestInconsistency::{
            DuplicatePartId, PartCountMismatch, RowCountMismatch, UnsupportedVersion,
        };

        // Every payload field is `Copy`, so matching on `*self` binds plain values.
        match *self {
            UnsupportedVersion { found, supported } => write!(
                f,
                "manifest_version {found} is not supported by this build (expected {supported})"
            ),
            PartCountMismatch { declared, actual } => write!(
                f,
                "part_count declares {declared} parts but {actual} committed parts found"
            ),
            RowCountMismatch { declared, actual } => write!(
                f,
                "row_count declares {declared} rows but committed parts sum to {actual}"
            ),
            DuplicatePartId(id) => write!(f, "duplicate part_id {id} in manifest.parts"),
        }
    }
}

impl std::error::Error for ManifestInconsistency {}
285
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a committed part whose path and fingerprint are derived from `id`.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        ManifestPart {
            part_id: id,
            path: format!("part-{id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: format!("xxh3:{:016x}", u64::from(id)),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }
    }

    // Builds a `Success` manifest whose declared totals match its committed parts. It passes
    // `validate_self_consistency` unless a test tampers with it afterwards.
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "gcs".into(),
                uri: "gs://rivet-exports/public.orders/run/".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    #[test]
    fn manifest_version_is_one() {
        // Pins the version so that changing it has to be a deliberate edit.
        assert_eq!(MANIFEST_VERSION, 1);
    }

    #[test]
    fn filenames_are_stable() {
        assert_eq!(MANIFEST_FILENAME, "manifest.json");
        assert_eq!(SUCCESS_FILENAME, "_SUCCESS");
        assert_eq!(QUARANTINE_PREFIX, "_quarantine");
        assert_eq!(DOCTOR_PROBE_FILENAME, ".rivet_doctor_probe");
    }

    #[test]
    fn join_key_inserts_exactly_one_separator() {
        assert_eq!(join_key("a/b", "c"), "a/b/c");
        assert_eq!(join_key("a/b/", "c"), "a/b/c");
        assert_eq!(join_key("a/b///", "c"), "a/b/c");
        // An empty (or all-slash) directory yields the bare key.
        assert_eq!(join_key("", "c"), "c");
        assert_eq!(join_key("/", "c"), "c");
    }

    #[test]
    fn self_consistent_manifest_validates() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(m.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.part_count = 5;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1
            })
        ));
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.row_count = 999;
        assert!(matches!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100
            })
        ));
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        let err = m.validate_self_consistency().unwrap_err();
        assert_eq!(err, ManifestInconsistency::DuplicatePartId(1));
    }

    #[test]
    fn rejects_unsupported_version() {
        let mut m = manifest_with_parts(vec![]);
        m.manifest_version = 999;
        // Also break the part count: the version check runs first and must take precedence.
        m.part_count = 5;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: MANIFEST_VERSION
            })
        );
    }

    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let mut p_q = part(2, 999, 8192);
        p_q.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), p_q]);

        assert_eq!(m.validate_self_consistency(), Ok(()));
        assert_eq!(m.committed_rows(), 100);
        assert_eq!(m.committed_part_count(), 1);
    }

    #[test]
    fn json_roundtrip_preserves_fields() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        let mut m = manifest_with_parts(vec![]);
        for (status, expected) in [
            (ManifestStatus::Success, "\"status\":\"success\""),
            (ManifestStatus::Failed, "\"status\":\"failed\""),
            (ManifestStatus::Interrupted, "\"status\":\"interrupted\""),
        ] {
            m.status = status;
            let json = serde_json::to_string(&m).unwrap();
            assert!(json.contains(expected), "json = {json}");
        }
    }

    #[test]
    fn part_status_serializes_as_snake_case() {
        let mut p = part(1, 10, 64);
        let json = serde_json::to_string(&p).unwrap();
        assert!(json.contains("\"status\":\"committed\""), "json = {json}");

        p.status = PartStatus::Quarantined;
        let json = serde_json::to_string(&p).unwrap();
        assert!(json.contains("\"status\":\"quarantined\""), "json = {json}");
    }

    #[test]
    fn content_md5_defaults_to_empty_when_absent() {
        let json = r#"{
            "part_id": 1,
            "path": "part-000001.parquet",
            "rows": 10,
            "size_bytes": 64,
            "content_fingerprint": "xxh3:0000000000000001",
            "status": "committed"
        }"#;
        let parsed: ManifestPart = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.content_md5, "");
        assert_eq!(parsed.status, PartStatus::Committed);
    }

    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        assert!(body.starts_with("xxh3:"), "body = {body:?}");
        assert!(body.ends_with('\n'), "body = {body:?}");
        let hex = body
            .trim_end()
            .strip_prefix("xxh3:")
            .expect("prefix asserted above");
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(
            hex.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
        );
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let a = success_marker_body(b"hello");
        let b = success_marker_body(b"hello");
        assert_eq!(a, b);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let a = success_marker_body(b"manifest one");
        let b = success_marker_body(b"manifest two");
        assert_ne!(a, b);
    }

    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fp = parse_success_marker(&body).expect("must parse");
        assert!(fp.starts_with("xxh3:"));
        assert_eq!(fp.len(), "xxh3:".len() + 16);
        assert_eq!(fp, body.trim_end());
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        assert_eq!(parse_success_marker(""), None);
        assert_eq!(parse_success_marker("\n"), None);
        // Wrong algorithm prefix.
        assert_eq!(parse_success_marker("sha256:0123456789abcdef"), None);
        // Too few digits.
        assert_eq!(parse_success_marker("xxh3:0123\n"), None);
        // Uppercase hex is rejected; the writer formats with `{:016x}` (lowercase).
        assert_eq!(parse_success_marker("xxh3:0123456789ABCDEF\n"), None);
        // Right length, but not hex.
        assert_eq!(parse_success_marker("xxh3:zzzzzzzzzzzzzzzz\n"), None);
        // Missing prefix.
        assert_eq!(parse_success_marker("0123456789abcdef\n"), None);
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        let body = "xxh3:0123456789abcdef\n";
        assert_eq!(parse_success_marker(body), Some("xxh3:0123456789abcdef"));
        // CRLF line endings are accepted as well.
        let body = "xxh3:0123456789abcdef\r\n";
        assert_eq!(parse_success_marker(body), Some("xxh3:0123456789abcdef"));
    }

    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        // `future_field_added_in_v2` is unknown to this build; `source` omits its optional keys.
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let parsed: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.run_id, "r1");
        assert_eq!(parsed.source.schema, None);
        assert_eq!(parsed.source.table, None);
        assert_eq!(parsed.validate_self_consistency(), Ok(()));
    }
}