1#![allow(dead_code)]
30
31use serde::{Deserialize, Serialize};
32
/// Manifest format version produced by this build.
///
/// `RunManifest::validate_self_consistency` rejects any manifest whose
/// `manifest_version` differs from this value.
pub const MANIFEST_VERSION: u32 = 1;

/// File name of the run manifest.
/// NOTE(review): presumably written next to the run's data parts — confirm with the writer.
pub const MANIFEST_FILENAME: &str = "manifest.json";

/// File name of the `_SUCCESS` marker.
/// NOTE(review): its body is presumably produced by `success_marker_body` and read back with
/// `parse_success_marker` — confirm with the writer/reader call sites.
pub const SUCCESS_FILENAME: &str = "_SUCCESS";

/// Prefix used for quarantined output.
/// NOTE(review): whether this is a directory name or a file-name prefix is not visible here —
/// TODO confirm.
pub const QUARANTINE_PREFIX: &str = "_quarantine";
46
47pub fn success_marker_body(manifest_bytes: &[u8]) -> String {
60 use xxhash_rust::xxh3::xxh3_64;
61 format!("xxh3:{:016x}\n", xxh3_64(manifest_bytes))
62}
63
/// Parses the body of a `_SUCCESS` marker.
///
/// Trailing ASCII whitespace (e.g. `\n` or `\r\n`) is ignored. What remains
/// must be exactly `xxh3:` followed by 16 lowercase hexadecimal digits.
///
/// Returns the trimmed fingerprint (including the `xxh3:` prefix) when the
/// body is well formed, and `None` otherwise. The body typically comes from
/// storage, so this never panics, even on arbitrary non-ASCII input.
pub fn parse_success_marker(body: &str) -> Option<&str> {
    const PREFIX: &str = "xxh3:";
    const HEX_DIGITS: usize = 16;

    let trimmed = body.trim_end_matches(|c: char| c.is_ascii_whitespace());
    // `strip_prefix` compares bytes and never slices mid-character. The previous
    // `split_at(5)` panicked whenever byte 5 fell inside a multi-byte UTF-8
    // sequence (e.g. a 21-byte body such as "abcdé0123456789abcde").
    let hex = trimmed.strip_prefix(PREFIX)?;
    // Byte-level check: any non-ASCII byte is rejected. Uppercase hex is rejected
    // too, because the writer always emits lowercase.
    let well_formed =
        hex.len() == HEX_DIGITS && hex.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
    if well_formed {
        Some(trimmed)
    } else {
        None
    }
}
90
/// Description of a single export run, serialized with serde.
///
/// `validate_self_consistency` checks the declared totals (`row_count`,
/// `part_count`) against `parts`. The struct is not marked
/// `deny_unknown_fields`, so unknown JSON keys are ignored when deserializing.
/// serde derive emits fields in declaration order, so reordering them changes
/// the serialized key order.
///
/// NOTE(review): presumably this is the document stored under `MANIFEST_FILENAME` — confirm.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RunManifest {
    /// Format version; must equal `MANIFEST_VERSION` to validate.
    pub manifest_version: u32,
    /// Identifier of the run (tests use values like `orders_20260521T120000.000`).
    pub run_id: String,
    /// Name of the export (tests use `public.orders`).
    pub export_name: String,
    /// Run start time as a string; tests use RFC 3339 UTC (`2026-05-21T12:00:00Z`),
    /// but the format is not enforced here.
    pub started_at: String,
    /// Run finish time as a string; same caveat as `started_at`.
    pub finished_at: String,
    /// Overall outcome of the run.
    pub status: ManifestStatus,
    /// Where the exported data came from.
    pub source: ManifestSource,
    /// Where the exported data was written.
    pub destination: ManifestDestination,
    /// Output file format (tests use `parquet`); free-form, not validated here.
    pub format: String,
    /// Compression codec (tests use `zstd`); free-form, not validated here.
    pub compression: String,
    /// Fingerprint of the output schema; tests use the `xxh3:<16 hex>` form,
    /// but the format is not validated here.
    pub schema_fingerprint: String,
    /// Declared total rows; must equal the sum of `rows` over committed parts.
    pub row_count: i64,
    /// Declared number of committed parts; must equal the count of parts
    /// whose status is `PartStatus::Committed`.
    pub part_count: u32,
    /// Every part of the run, committed and quarantined; `part_id`s must be unique.
    pub parts: Vec<ManifestPart>,
}
113
/// Overall outcome of a run.
///
/// Serialized in snake_case: `"success"`, `"failed"`, `"interrupted"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestStatus {
    Success,
    Failed,
    Interrupted,
}
127
/// Origin of the exported data.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestSource {
    /// Source engine name (tests use `postgres`).
    pub engine: String,
    /// Optional schema name; a manifest that omits it deserializes to `None`.
    pub schema: Option<String>,
    /// Optional table name; a manifest that omits it deserializes to `None`.
    pub table: Option<String>,
}
134
/// Target the run wrote to.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestDestination {
    /// Destination kind (tests use `gcs` and `local`); free-form, not validated here.
    pub kind: String,
    /// Destination URI (tests use `gs://…/` and `file:///…/` forms).
    pub uri: String,
}
140
/// One output file ("part") produced by a run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ManifestPart {
    /// Part identifier; must be unique across `RunManifest::parts`, including
    /// quarantined parts.
    pub part_id: u32,
    /// Location of the part (tests use names like `part-000001.parquet`).
    /// NOTE(review): presumably relative to the destination URI — confirm.
    pub path: String,
    /// Rows in this part; summed over committed parts to check `row_count`.
    pub rows: i64,
    /// Presumably the part's size in bytes; not validated here.
    pub size_bytes: u64,
    /// Fingerprint of the part's contents; tests use the `xxh3:<16 hex>` form,
    /// but the format is not validated here.
    pub content_fingerprint: String,
    /// Whether this part counts toward the run's totals.
    pub status: PartStatus,
}
157
/// State of an individual part.
///
/// Only `Committed` parts are counted by `RunManifest::committed_rows` and
/// `RunManifest::committed_part_count`. Serialized in snake_case:
/// `"committed"`, `"quarantined"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartStatus {
    Committed,
    Quarantined,
}
166
167impl RunManifest {
168 pub fn committed_rows(&self) -> i64 {
171 self.parts
172 .iter()
173 .filter(|p| p.status == PartStatus::Committed)
174 .map(|p| p.rows)
175 .sum()
176 }
177
178 pub fn committed_part_count(&self) -> usize {
180 self.parts
181 .iter()
182 .filter(|p| p.status == PartStatus::Committed)
183 .count()
184 }
185
186 pub fn validate_self_consistency(&self) -> std::result::Result<(), ManifestInconsistency> {
190 if self.manifest_version != MANIFEST_VERSION {
191 return Err(ManifestInconsistency::UnsupportedVersion {
192 found: self.manifest_version,
193 supported: MANIFEST_VERSION,
194 });
195 }
196 let actual_parts = self.committed_part_count();
197 if actual_parts != self.part_count as usize {
198 return Err(ManifestInconsistency::PartCountMismatch {
199 declared: self.part_count,
200 actual: actual_parts,
201 });
202 }
203 let actual_rows = self.committed_rows();
204 if actual_rows != self.row_count {
205 return Err(ManifestInconsistency::RowCountMismatch {
206 declared: self.row_count,
207 actual: actual_rows,
208 });
209 }
210 let mut ids: Vec<u32> = self.parts.iter().map(|p| p.part_id).collect();
212 ids.sort_unstable();
213 for w in ids.windows(2) {
214 if w[0] == w[1] {
215 return Err(ManifestInconsistency::DuplicatePartId(w[0]));
216 }
217 }
218 Ok(())
219 }
220}
221
/// Reason a [`RunManifest`] failed [`RunManifest::validate_self_consistency`].
///
/// Every field is a plain integer, so the type is cheap to clone and has total
/// equality. Deriving `Clone` and `Eq` lets callers keep or compare reported
/// errors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ManifestInconsistency {
    /// `manifest_version` differs from the version this build supports.
    UnsupportedVersion { found: u32, supported: u32 },
    /// `part_count` differs from the number of committed parts.
    PartCountMismatch { declared: u32, actual: usize },
    /// `row_count` differs from the sum of `rows` over committed parts.
    RowCountMismatch { declared: i64, actual: i64 },
    /// The same `part_id` appears more than once in `parts`.
    DuplicatePartId(u32),
}
233
234impl std::fmt::Display for ManifestInconsistency {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 match self {
237 Self::UnsupportedVersion { found, supported } => write!(
238 f,
239 "manifest_version {found} is not supported by this build (expected {supported})"
240 ),
241 Self::PartCountMismatch { declared, actual } => write!(
242 f,
243 "part_count declares {declared} parts but {actual} committed parts found"
244 ),
245 Self::RowCountMismatch { declared, actual } => write!(
246 f,
247 "row_count declares {declared} rows but committed parts sum to {actual}"
248 ),
249 Self::DuplicatePartId(id) => {
250 write!(f, "duplicate part_id {id} in manifest.parts")
251 }
252 }
253 }
254}
255
256impl std::error::Error for ManifestInconsistency {}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a committed part whose path and fingerprint are derived from `id`.
    fn part(id: u32, rows: i64, size: u64) -> ManifestPart {
        ManifestPart {
            part_id: id,
            path: format!("part-{id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: format!("xxh3:{:016x}", u64::from(id)),
            status: PartStatus::Committed,
        }
    }

    /// Builds a fixed manifest around `parts`. `row_count` and `part_count` are
    /// derived from the committed parts, so the result is self-consistent until
    /// a test tampers with it. The totals are computed here rather than via the
    /// `RunManifest` helpers, so the tests do not check the code against itself.
    fn manifest_with_parts(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = u32::try_from(
            parts
                .iter()
                .filter(|p| p.status == PartStatus::Committed)
                .count(),
        )
        .expect("test fixtures never hold more than u32::MAX parts");
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "orders_20260521T120000.000".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:14:33Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "gcs".into(),
                uri: "gs://rivet-exports/public.orders/run/".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    /// Bumping the version is a wire-format change; this test forces it to be deliberate.
    #[test]
    fn manifest_version_is_one() {
        assert_eq!(MANIFEST_VERSION, 1);
    }

    #[test]
    fn filenames_are_stable() {
        assert_eq!(MANIFEST_FILENAME, "manifest.json");
        assert_eq!(SUCCESS_FILENAME, "_SUCCESS");
        assert_eq!(QUARANTINE_PREFIX, "_quarantine");
    }

    #[test]
    fn self_consistent_manifest_validates() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        assert_eq!(m.validate_self_consistency(), Ok(()));
    }

    #[test]
    fn rejects_part_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.part_count = 5;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::PartCountMismatch {
                declared: 5,
                actual: 1
            })
        );
    }

    #[test]
    fn rejects_row_count_mismatch() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.row_count = 999;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::RowCountMismatch {
                declared: 999,
                actual: 100
            })
        );
    }

    #[test]
    fn rejects_duplicate_part_id() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(1, 200, 8192)]);
        let err = m.validate_self_consistency().unwrap_err();
        assert_eq!(err, ManifestInconsistency::DuplicatePartId(1));
    }

    /// Duplicate detection covers every part, not just committed ones.
    #[test]
    fn rejects_duplicate_part_id_across_quarantined_parts() {
        let mut dup = part(1, 50, 2048);
        dup.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), dup]);
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::DuplicatePartId(1))
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        let mut m = manifest_with_parts(vec![]);
        m.manifest_version = 999;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 999,
                supported: MANIFEST_VERSION
            })
        );
    }

    /// The version check runs first, so a newer manifest is reported as
    /// unsupported rather than as a confusing count mismatch.
    #[test]
    fn version_is_checked_before_counts() {
        let mut m = manifest_with_parts(vec![part(1, 100, 4096)]);
        m.manifest_version = 2;
        m.part_count = 7;
        m.row_count = -1;
        assert_eq!(
            m.validate_self_consistency(),
            Err(ManifestInconsistency::UnsupportedVersion {
                found: 2,
                supported: MANIFEST_VERSION
            })
        );
    }

    #[test]
    fn quarantined_parts_do_not_count_toward_row_or_part_totals() {
        let mut p_q = part(2, 999, 8192);
        p_q.status = PartStatus::Quarantined;
        let m = manifest_with_parts(vec![part(1, 100, 4096), p_q]);

        assert_eq!(m.validate_self_consistency(), Ok(()));
        assert_eq!(m.committed_rows(), 100);
        assert_eq!(m.committed_part_count(), 1);
    }

    #[test]
    fn json_roundtrip_preserves_fields() {
        let m = manifest_with_parts(vec![part(1, 100, 4096), part(2, 200, 8192)]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let parsed: RunManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(m, parsed);
    }

    #[test]
    fn status_serializes_as_snake_case() {
        // No parts, so the only "status" key in the JSON is the run-level one.
        let mut m = manifest_with_parts(vec![]);
        for (status, expected) in [
            (ManifestStatus::Success, "\"status\":\"success\""),
            (ManifestStatus::Failed, "\"status\":\"failed\""),
            (ManifestStatus::Interrupted, "\"status\":\"interrupted\""),
        ] {
            m.status = status;
            let json = serde_json::to_string(&m).unwrap();
            assert!(json.contains(expected), "json = {json}");
        }
    }

    #[test]
    fn part_status_serializes_as_snake_case() {
        assert_eq!(
            serde_json::to_string(&PartStatus::Committed).unwrap(),
            "\"committed\""
        );
        assert_eq!(
            serde_json::to_string(&PartStatus::Quarantined).unwrap(),
            "\"quarantined\""
        );
        let back: PartStatus = serde_json::from_str("\"quarantined\"").unwrap();
        assert_eq!(back, PartStatus::Quarantined);
    }

    /// Pins the exact wire shape: `xxh3:` + 16 lowercase hex digits + exactly one `\n`.
    #[test]
    fn success_marker_body_is_xxh3_prefix_plus_16_hex_plus_newline() {
        let body = success_marker_body(b"some manifest bytes");
        let hex = body
            .strip_prefix("xxh3:")
            .and_then(|rest| rest.strip_suffix('\n'))
            .unwrap_or_else(|| panic!("unexpected marker body {body:?}"));
        assert_eq!(hex.len(), 16, "body = {body:?}");
        assert!(
            hex.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f')),
            "body = {body:?}"
        );
    }

    #[test]
    fn success_marker_body_is_deterministic_for_same_input() {
        let a = success_marker_body(b"hello");
        let b = success_marker_body(b"hello");
        assert_eq!(a, b);
    }

    #[test]
    fn success_marker_body_differs_for_different_manifest_bytes() {
        let a = success_marker_body(b"manifest one");
        let b = success_marker_body(b"manifest two");
        assert_ne!(a, b);
    }

    #[test]
    fn parse_success_marker_roundtrips_with_writer() {
        let body = success_marker_body(b"some manifest bytes");
        let fp = parse_success_marker(&body).expect("writer output must parse");
        assert_eq!(fp, body.trim_end());
    }

    #[test]
    fn parse_success_marker_rejects_malformed_bodies() {
        assert_eq!(parse_success_marker(""), None);
        assert_eq!(parse_success_marker("\n"), None);
        assert_eq!(parse_success_marker("sha256:0123456789abcdef"), None);
        assert_eq!(parse_success_marker("xxh3:0123\n"), None);
        // The writer only emits lowercase hex.
        assert_eq!(parse_success_marker("xxh3:0123456789ABCDEF\n"), None);
        // Right length, but not hex.
        assert_eq!(parse_success_marker("xxh3:zzzzzzzzzzzzzzzz\n"), None);
        // Missing the algorithm prefix.
        assert_eq!(parse_success_marker("0123456789abcdef\n"), None);
    }

    #[test]
    fn parse_success_marker_tolerates_trailing_whitespace() {
        let body = "xxh3:0123456789abcdef\n";
        assert_eq!(parse_success_marker(body), Some("xxh3:0123456789abcdef"));
        // CRLF line endings, e.g. a marker edited on Windows.
        let body = "xxh3:0123456789abcdef\r\n";
        assert_eq!(parse_success_marker(body), Some("xxh3:0123456789abcdef"));
    }

    /// Forward compatibility: a reader built for v1 ignores unknown keys, and
    /// optional source fields may be omitted entirely.
    #[test]
    fn unknown_fields_are_ignored_by_reader() {
        let json = r#"{
            "manifest_version": 1,
            "run_id": "r1",
            "export_name": "t",
            "started_at": "2026-01-01T00:00:00Z",
            "finished_at": "2026-01-01T00:01:00Z",
            "status": "success",
            "source": {"engine": "postgres"},
            "destination": {"kind": "local", "uri": "file:///tmp/out/"},
            "format": "parquet",
            "compression": "zstd",
            "schema_fingerprint": "xxh3:0000000000000000",
            "row_count": 0,
            "part_count": 0,
            "parts": [],
            "future_field_added_in_v2": {"nested": true}
        }"#;
        let parsed: RunManifest = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.run_id, "r1");
        assert_eq!(parsed.source.schema, None);
        assert_eq!(parsed.source.table, None);
        assert_eq!(parsed.validate_self_consistency(), Ok(()));
    }
}