1use std::collections::BTreeSet;
35use std::fs::{self, File};
36use std::io::{Read, Write};
37use std::path::{Component, Path, PathBuf};
38use std::time::{SystemTime, UNIX_EPOCH};
39
40use serde::{Deserialize, Serialize};
41
42use crate::kv::{VersionToken, WatchCursor};
43use crate::snapshot::SnapshotError;
44
/// Version of the artifact layout recorded in every manifest.
///
/// `manifest_from_slice` rejects any manifest whose `schema_version` differs
/// from this value.
pub const ARTIFACT_SCHEMA_VERSION: u32 = 1;

/// File name of the manifest, stored directly in the artifact root.
pub const MANIFEST_FILE: &str = "MANIFEST.json";

/// Sub-directory of the artifact root that holds the payload files; every
/// manifest path must start with `data/`.
pub(crate) const PAYLOAD_DIR: &str = "data";

/// Size in bytes (1 MiB) of the scratch buffer used while hashing and copying
/// payload files.
const HASH_BUF: usize = 1 << 20;
59
/// In-memory form of an artifact's `MANIFEST.json`.
///
/// Built by `ExportStage::seal_and_finalize` on export and returned by
/// `read_manifest` / `manifest_from_slice` when an artifact is loaded.
#[derive(Debug, Clone)]
pub struct ExportManifest {
    /// Artifact layout version; a manifest is only accepted when this equals
    /// `ARTIFACT_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Name of the backend that produced the artifact; import compares it
    /// against the backend the caller expects.
    pub backend: String,
    /// Backend-specific version string, handed to the caller-supplied version
    /// check on import.
    pub backend_version: String,
    /// Watch cursor recorded with the export; persisted in the manifest as the
    /// hex encoding of its version token bytes.
    pub cursor: WatchCursor,
    /// Seal time in whole seconds since the Unix epoch (0 if the system clock
    /// reads earlier than the epoch).
    pub created_at_unix: u64,
    /// One entry per payload file.
    pub files: Vec<ArtifactFile>,
}
82
/// A single payload file as described by the manifest.
///
/// Implements `PartialEq`/`Eq` so whole entries (including the digest) can be
/// compared directly instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArtifactFile {
    /// Path relative to the artifact root, `/`-separated and starting with
    /// `data/`.
    pub path: String,
    /// File length in bytes.
    pub size: u64,
    /// BLAKE3 digest of the file contents as a hex string.
    pub blake3: String,
}
96
/// Serde representation of the manifest exactly as it is stored in JSON.
///
/// Unknown fields are rejected on deserialization (`deny_unknown_fields`).
/// Differs from `ExportManifest` only in carrying the cursor as a hex string.
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ManifestWire {
    schema_version: u32,
    backend: String,
    backend_version: String,
    // Hex encoding of the cursor's version token bytes; an empty string
    // decodes to zero bytes.
    cursor_hex: String,
    created_at_unix: u64,
    files: Vec<FileWire>,
}
120
/// Serde representation of one manifest file entry; mirrors `ArtifactFile`
/// field for field. Unknown fields are rejected on deserialization.
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct FileWire {
    // Artifact-relative path; validated by `validate_payload_path` on load.
    path: String,
    // File length in bytes.
    size: u64,
    // Hex BLAKE3 digest of the file contents.
    blake3: String,
}
128
129fn invalid(msg: impl Into<String>) -> SnapshotError {
130 SnapshotError::ArtifactInvalid(msg.into())
131}
132
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
pub(crate) fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
145
/// Decodes a hexadecimal string into bytes.
///
/// Accepts upper- and lowercase digits. Returns `None` when the input has an
/// odd length or contains any character that is not a hex digit. Unlike
/// `u8::from_str_radix`, a sign character such as `+` is never accepted, so
/// `"+f"` is rejected rather than decoded as `0x0f`.
pub(crate) fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(c: u8) -> Option<u8> {
        match c {
            b'0'..=b'9' => Some(c - b'0'),
            b'a'..=b'f' => Some(c - b'a' + 10),
            b'A'..=b'F' => Some(c - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    // A leftover byte means the input had an odd length.
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
155
156fn cursor_to_hex(cursor: &WatchCursor) -> String {
157 hex_encode(cursor.version().as_bytes())
158}
159
160fn cursor_from_hex(s: &str) -> Result<WatchCursor, SnapshotError> {
161 let bytes = hex_decode(s).ok_or_else(|| invalid(format!("malformed cursor_hex: {s:?}")))?;
162 let token = VersionToken::from_raw(&bytes).ok_or_else(|| {
163 invalid(format!(
164 "cursor_hex decodes to {} bytes, exceeds version token capacity",
165 bytes.len()
166 ))
167 })?;
168 Ok(WatchCursor::from_version(token))
169}
170
171pub(crate) fn write_manifest(
179 artifact_root: &Path,
180 manifest: &ExportManifest,
181) -> Result<(), SnapshotError> {
182 let wire = ManifestWire {
183 schema_version: manifest.schema_version,
184 backend: manifest.backend.clone(),
185 backend_version: manifest.backend_version.clone(),
186 cursor_hex: cursor_to_hex(&manifest.cursor),
187 created_at_unix: manifest.created_at_unix,
188 files: manifest
189 .files
190 .iter()
191 .map(|f| FileWire {
192 path: f.path.clone(),
193 size: f.size,
194 blake3: f.blake3.clone(),
195 })
196 .collect(),
197 };
198 let json = serde_json::to_vec_pretty(&wire)
199 .map_err(|e| SnapshotError::Backend(format!("manifest serialization failed: {e}")))?;
200
201 let mut tmp = tempfile::NamedTempFile::new_in(artifact_root)?;
202 tmp.write_all(&json)?;
203 tmp.as_file().sync_all()?;
204 tmp.persist(artifact_root.join(MANIFEST_FILE))
205 .map_err(|e| SnapshotError::Io(e.error))?;
206 Ok(())
207}
208
209pub(crate) fn read_manifest(artifact_dir: &Path) -> Result<ExportManifest, SnapshotError> {
211 let path = artifact_dir.join(MANIFEST_FILE);
212 let data = fs::read(&path).map_err(|e| {
213 if e.kind() == std::io::ErrorKind::NotFound {
214 invalid(format!("no {MANIFEST_FILE} in {}", artifact_dir.display()))
215 } else {
216 SnapshotError::Io(e)
217 }
218 })?;
219 manifest_from_slice(&data)
220}
221
222pub(crate) fn manifest_from_slice(data: &[u8]) -> Result<ExportManifest, SnapshotError> {
228 let wire: ManifestWire =
229 serde_json::from_slice(data).map_err(|e| invalid(format!("malformed manifest: {e}")))?;
230
231 if wire.schema_version != ARTIFACT_SCHEMA_VERSION {
232 return Err(invalid(format!(
233 "unsupported artifact schema_version {} (this build supports {})",
234 wire.schema_version, ARTIFACT_SCHEMA_VERSION
235 )));
236 }
237 for f in &wire.files {
238 validate_payload_path(&f.path)?;
239 }
240
241 Ok(ExportManifest {
242 schema_version: wire.schema_version,
243 backend: wire.backend,
244 backend_version: wire.backend_version,
245 cursor: cursor_from_hex(&wire.cursor_hex)?,
246 created_at_unix: wire.created_at_unix,
247 files: wire
248 .files
249 .into_iter()
250 .map(|f| ArtifactFile {
251 path: f.path,
252 size: f.size,
253 blake3: f.blake3,
254 })
255 .collect(),
256 })
257}
258
259fn validate_payload_path(p: &str) -> Result<(), SnapshotError> {
263 let prefix = format!("{PAYLOAD_DIR}/");
264 if !p.starts_with(&prefix) || p.len() == prefix.len() {
265 return Err(invalid(format!(
266 "manifest path {p:?} is not under {PAYLOAD_DIR}/"
267 )));
268 }
269 if p.contains('\\') {
270 return Err(invalid(format!("manifest path {p:?} contains a backslash")));
271 }
272 let path = Path::new(p);
273 for comp in path.components() {
274 match comp {
275 Component::Normal(_) => {}
276 _ => {
277 return Err(invalid(format!(
278 "manifest path {p:?} contains a non-normal component"
279 )));
280 }
281 }
282 }
283 Ok(())
284}
285
286fn hash_file(path: &Path, buf: &mut [u8]) -> Result<(File, u64, String), SnapshotError> {
293 let mut file = File::open(path)?;
294 let mut hasher = blake3::Hasher::new();
295 let mut size = 0u64;
296 loop {
297 let n = file.read(buf)?;
298 if n == 0 {
299 break;
300 }
301 size += n as u64;
302 hasher.update(&buf[..n]);
303 }
304 Ok((file, size, hasher.finalize().to_hex().to_string()))
305}
306
307fn list_payload_files(root: &Path) -> Result<Vec<PathBuf>, SnapshotError> {
309 let payload = root.join(PAYLOAD_DIR);
310 let mut out = Vec::new();
311 let mut stack = vec![payload.clone()];
312 while let Some(dir) = stack.pop() {
313 for entry in fs::read_dir(&dir)? {
314 let entry = entry?;
315 let ty = entry.file_type()?;
316 if ty.is_dir() {
317 stack.push(entry.path());
318 } else if ty.is_file() {
319 out.push(entry.path());
320 } else {
321 return Err(invalid(format!(
326 "payload contains a non-regular file: {}",
327 entry.path().display()
328 )));
329 }
330 }
331 }
332 out.sort();
333 Ok(out)
334}
335
336pub(crate) fn hash_payload(root: &Path) -> Result<Vec<ArtifactFile>, SnapshotError> {
339 let mut files = Vec::new();
340 let mut buf = vec![0u8; HASH_BUF];
341 for abs in list_payload_files(root)? {
342 let (file, size, blake3) = hash_file(&abs, &mut buf)?;
343 file.sync_all()?;
347 let rel = abs
348 .strip_prefix(root)
349 .map_err(|_| SnapshotError::Backend("payload path escaped artifact root".into()))?;
350 let rel = rel
351 .to_str()
352 .ok_or_else(|| invalid(format!("non-UTF-8 payload path: {}", rel.display())))?;
353 files.push(ArtifactFile {
354 path: rel.to_string(),
355 size,
356 blake3,
357 });
358 }
359 fsync_dir_tree(&root.join(PAYLOAD_DIR))?;
360 Ok(files)
361}
362
363fn fsync_dir(path: &Path) -> Result<(), SnapshotError> {
364 File::open(path)?.sync_all()?;
365 Ok(())
366}
367
368fn fsync_dir_tree(root: &Path) -> Result<(), SnapshotError> {
369 let mut stack = vec![root.to_path_buf()];
370 while let Some(dir) = stack.pop() {
371 fsync_dir(&dir)?;
372 for entry in fs::read_dir(&dir)? {
373 let entry = entry?;
374 if entry.file_type()?.is_dir() {
375 stack.push(entry.path());
376 }
377 }
378 }
379 Ok(())
380}
381
382pub(crate) fn check_dest_available(dest: &Path) -> Result<(), SnapshotError> {
390 match fs::metadata(dest) {
391 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
392 Err(e) => Err(SnapshotError::Io(e)),
393 Ok(meta) if meta.is_dir() => {
394 if fs::read_dir(dest)?.next().is_some() {
395 Err(invalid(format!(
396 "destination {} exists and is not empty",
397 dest.display()
398 )))
399 } else {
400 Ok(())
401 }
402 }
403 Ok(_) => Err(invalid(format!(
404 "destination {} already exists",
405 dest.display()
406 ))),
407 }
408}
409
410pub(crate) fn rename_into_place(from: &Path, dest: &Path) -> Result<(), SnapshotError> {
420 if dest.is_dir() {
421 fs::remove_dir(dest)?;
422 }
423 fs::rename(from, dest)?;
424 if let Some(parent) = dest.parent() {
425 fsync_dir(parent)?;
426 }
427 Ok(())
428}
429
430fn stage_dir_in(parent: &Path) -> Result<tempfile::TempDir, SnapshotError> {
431 Ok(tempfile::Builder::new()
435 .prefix(".slipstream-artifact-")
436 .tempdir_in(parent)?)
437}
438
439fn dest_parent(dest: &Path) -> Result<&Path, SnapshotError> {
440 dest.parent()
441 .filter(|p| !p.as_os_str().is_empty())
442 .ok_or_else(|| {
443 invalid(format!(
444 "destination {} has no parent directory",
445 dest.display()
446 ))
447 })
448}
449
450pub(crate) struct ExportStage {
460 dir: tempfile::TempDir,
461 dest: PathBuf,
462}
463
464impl ExportStage {
465 pub(crate) fn new(dest_dir: &Path) -> Result<Self, SnapshotError> {
468 check_dest_available(dest_dir)?;
469 let parent = dest_parent(dest_dir)?;
470 let dir = stage_dir_in(parent)?;
471 Ok(Self {
472 dir,
473 dest: dest_dir.to_path_buf(),
474 })
475 }
476
477 pub(crate) fn payload(&self) -> PathBuf {
483 self.dir.path().join(PAYLOAD_DIR)
484 }
485
486 pub(crate) fn seal_and_finalize(
489 self,
490 backend: &str,
491 backend_version: &str,
492 cursor: &WatchCursor,
493 ) -> Result<ExportManifest, SnapshotError> {
494 let root = self.dir.path();
495 let files = hash_payload(root)?;
496 let manifest = ExportManifest {
497 schema_version: ARTIFACT_SCHEMA_VERSION,
498 backend: backend.to_string(),
499 backend_version: backend_version.to_string(),
500 cursor: cursor.clone(),
501 created_at_unix: SystemTime::now()
502 .duration_since(UNIX_EPOCH)
503 .map(|d| d.as_secs())
504 .unwrap_or(0),
505 files,
506 };
507 write_manifest(root, &manifest)?;
508 fsync_dir(root)?;
509
510 check_dest_available(&self.dest)?;
514 let dest = self.dest.clone();
515 let root = self.dir.keep();
516 rename_into_place(&root, &dest)?;
517 Ok(manifest)
518 }
519}
520
521pub(crate) struct ImportStage {
528 dir: tempfile::TempDir,
529 dest: PathBuf,
530}
531
532impl ImportStage {
533 pub(crate) fn payload(&self) -> PathBuf {
535 self.dir.path().join(PAYLOAD_DIR)
536 }
537
538 #[cfg(any(feature = "fjall", feature = "rocksdb"))]
541 pub(crate) fn finalize_dir(self) -> Result<(), SnapshotError> {
542 check_dest_available(&self.dest)?;
543 rename_into_place(&self.payload(), &self.dest)
544 }
546
547 pub(crate) fn finalize_file(self, rel: &str) -> Result<(), SnapshotError> {
550 check_dest_available(&self.dest)?;
551 rename_into_place(&self.payload().join(rel), &self.dest)
552 }
553}
554
555pub(crate) fn verify_and_stage_import(
565 artifact_dir: &Path,
566 dest: &Path,
567 expected_backend: &str,
568 check_backend_version: impl Fn(&str) -> Result<(), SnapshotError>,
569) -> Result<(ExportManifest, ImportStage), SnapshotError> {
570 let manifest = read_manifest(artifact_dir)?;
571
572 if manifest.backend != expected_backend {
573 return Err(invalid(format!(
574 "artifact backend is {:?}, expected {:?}",
575 manifest.backend, expected_backend
576 )));
577 }
578 check_backend_version(&manifest.backend_version)?;
579 check_dest_available(dest)?;
580
581 let declared: BTreeSet<&str> = manifest.files.iter().map(|f| f.path.as_str()).collect();
584 for abs in list_payload_files(artifact_dir)? {
585 let rel = abs
586 .strip_prefix(artifact_dir)
587 .map_err(|_| SnapshotError::Backend("payload path escaped artifact dir".into()))?;
588 let rel = rel
589 .to_str()
590 .ok_or_else(|| invalid(format!("non-UTF-8 payload path: {}", rel.display())))?;
591 if !declared.contains(rel) {
592 return Err(invalid(format!("payload contains undeclared file: {rel}")));
593 }
594 }
595
596 let parent = dest_parent(dest)?;
597 let dir = stage_dir_in(parent)?;
598 let stage = ImportStage {
599 dir,
600 dest: dest.to_path_buf(),
601 };
602
603 let mut buf = vec![0u8; HASH_BUF];
607 for f in &manifest.files {
608 let src_path = artifact_dir.join(&f.path);
609 let dst_path = stage.dir.path().join(&f.path);
610 if let Some(p) = dst_path.parent() {
611 fs::create_dir_all(p)?;
612 }
613 let mut src = File::open(&src_path).map_err(|e| {
614 if e.kind() == std::io::ErrorKind::NotFound {
615 invalid(format!("payload file missing: {}", f.path))
616 } else {
617 SnapshotError::Io(e)
618 }
619 })?;
620 let mut dst = File::create(&dst_path)?;
621 let mut hasher = blake3::Hasher::new();
622 let mut size = 0u64;
623 loop {
624 let n = src.read(&mut buf)?;
625 if n == 0 {
626 break;
627 }
628 size += n as u64;
629 hasher.update(&buf[..n]);
630 dst.write_all(&buf[..n])?;
631 }
632 dst.sync_all()?;
633 if size != f.size {
634 return Err(invalid(format!(
635 "payload file {} is {size} bytes, manifest says {}",
636 f.path, f.size
637 )));
638 }
639 let digest = hasher.finalize().to_hex().to_string();
640 if digest != f.blake3 {
641 return Err(invalid(format!(
642 "payload file {} checksum mismatch (got {digest}, manifest says {})",
643 f.path, f.blake3
644 )));
645 }
646 }
647 fsync_dir_tree(&stage.payload())?;
648
649 Ok((manifest, stage))
650}
651
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manifest with fixed backend metadata and the given files and
    /// cursor.
    fn manifest_with(files: Vec<ArtifactFile>, cursor: WatchCursor) -> ExportManifest {
        ExportManifest {
            schema_version: ARTIFACT_SCHEMA_VERSION,
            backend: "append-log".into(),
            backend_version: "2".into(),
            cursor,
            created_at_unix: 1_765_400_000,
            files,
        }
    }

    /// Every manifest field, including the file digest, survives a write/read
    /// round trip.
    #[test]
    fn manifest_round_trips() {
        let dir = TempDir::new().unwrap();
        let m = manifest_with(
            vec![ArtifactFile {
                path: "data/fold.snap".into(),
                size: 42,
                blake3: "ab".repeat(32),
            }],
            WatchCursor::from_u64(184_467),
        );
        write_manifest(dir.path(), &m).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert_eq!(got.schema_version, m.schema_version);
        assert_eq!(got.backend, m.backend);
        assert_eq!(got.backend_version, m.backend_version);
        assert_eq!(got.cursor, m.cursor);
        assert_eq!(got.created_at_unix, m.created_at_unix);
        assert_eq!(got.files.len(), 1);
        assert_eq!(got.files[0].path, "data/fold.snap");
        assert_eq!(got.files[0].size, 42);
        assert_eq!(got.files[0].blake3, m.files[0].blake3);
    }

    /// A `none` cursor is still `none` after a round trip.
    #[test]
    fn manifest_round_trips_none_cursor() {
        let dir = TempDir::new().unwrap();
        let m = manifest_with(vec![], WatchCursor::none());
        write_manifest(dir.path(), &m).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert!(got.cursor.is_none(), "none cursor survives the round trip");
    }

    /// A 10-byte version token round-trips unchanged.
    #[test]
    fn manifest_round_trips_fdb_width_cursor() {
        let raw = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let cursor = WatchCursor::from_version(VersionToken::from_raw(&raw).unwrap());
        let dir = TempDir::new().unwrap();
        write_manifest(dir.path(), &manifest_with(vec![], cursor.clone())).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert_eq!(got.cursor, cursor);
    }

    /// Writes `json` verbatim as the manifest file in `dir`.
    fn write_raw_manifest(dir: &Path, json: &str) {
        fs::write(dir.join(MANIFEST_FILE), json).unwrap();
    }

    /// Renders a manifest JSON document with the given cursor, file list
    /// (already JSON) and schema version.
    fn wire_json(cursor_hex: &str, files: &str, schema: u32) -> String {
        format!(
            r#"{{"schema_version":{schema},"backend":"append-log","backend_version":"2",
            "cursor_hex":"{cursor_hex}","created_at_unix":0,"files":{files}}}"#
        )
    }

    /// Non-hex, odd-length and over-long cursor encodings are all rejected.
    #[test]
    fn rejects_bad_cursor_hex() {
        let dir = TempDir::new().unwrap();
        for bad in ["zz", "abc", "0102030405060708090a0b"] {
            write_raw_manifest(dir.path(), &wire_json(bad, "[]", ARTIFACT_SCHEMA_VERSION));
            match read_manifest(dir.path()) {
                Err(SnapshotError::ArtifactInvalid(_)) => {}
                other => panic!("cursor_hex {bad:?}: expected ArtifactInvalid, got {other:?}"),
            }
        }
    }

    /// A manifest from a different schema version is refused.
    #[test]
    fn rejects_wrong_schema_version() {
        let dir = TempDir::new().unwrap();
        write_raw_manifest(
            dir.path(),
            &wire_json("", "[]", ARTIFACT_SCHEMA_VERSION + 1),
        );
        match read_manifest(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("schema_version"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }

    /// File paths that escape, or are not strictly below, `data/` are refused.
    #[test]
    fn rejects_path_traversal() {
        let dir = TempDir::new().unwrap();
        for bad in [
            "../escape",
            "/abs/path",
            "data/../escape",
            "data/a\\b",
            "nondata/x",
            "data/",
            "data",
        ] {
            // Backslashes must be escaped to stay valid inside a JSON string.
            let files = format!(
                r#"[{{"path":"{}","size":0,"blake3":""}}]"#,
                bad.replace('\\', "\\\\")
            );
            write_raw_manifest(dir.path(), &wire_json("", &files, ARTIFACT_SCHEMA_VERSION));
            match read_manifest(dir.path()) {
                Err(SnapshotError::ArtifactInvalid(_)) => {}
                other => panic!("path {bad:?}: expected ArtifactInvalid, got {other:?}"),
            }
        }
    }

    /// Unparseable manifest content is reported as a malformed artifact.
    #[test]
    fn rejects_malformed_manifest_json() {
        let dir = TempDir::new().unwrap();
        write_raw_manifest(dir.path(), "not json at all {{{");
        match read_manifest(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("malformed"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }

    /// A symlink inside the payload aborts hashing instead of being followed.
    #[cfg(unix)]
    #[test]
    fn hash_payload_rejects_symlink() {
        let dir = TempDir::new().unwrap();
        let payload = dir.path().join(PAYLOAD_DIR);
        fs::create_dir(&payload).unwrap();
        fs::write(payload.join("real"), b"data").unwrap();
        let target = dir.path().join("outside");
        fs::write(&target, b"outside the payload").unwrap();
        std::os::unix::fs::symlink(&target, payload.join("link")).unwrap();

        match hash_payload(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("non-regular"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }

    /// If the destination becomes occupied after the stage was created,
    /// sealing fails and the destination's contents are left alone.
    #[test]
    fn export_stage_fails_closed_when_dest_appears_before_seal() {
        let dir = TempDir::new().unwrap();
        let dest = dir.path().join("artifact");
        let stage = ExportStage::new(&dest).unwrap();
        fs::create_dir(stage.payload()).unwrap();
        fs::write(stage.payload().join("fold.snap"), b"data").unwrap();

        // Occupy the destination between `new` and `seal_and_finalize`.
        fs::create_dir(&dest).unwrap();
        fs::write(dest.join("stray"), b"x").unwrap();

        let err = stage
            .seal_and_finalize("append-log", "2", &WatchCursor::from_u64(1))
            .unwrap_err();
        assert!(matches!(err, SnapshotError::ArtifactInvalid(_)));
        assert!(
            dest.join("stray").exists(),
            "occupied destination is untouched"
        );
    }

    /// Hex encoding and decoding are inverse; bad digits and odd lengths are
    /// rejected.
    #[test]
    fn hex_round_trips() {
        for bytes in [&[][..], &[0u8][..], &[0xde, 0xad, 0xbe, 0xef][..]] {
            assert_eq!(hex_decode(&hex_encode(bytes)).unwrap(), bytes);
        }
        assert!(hex_decode("0g").is_none());
        assert!(hex_decode("a").is_none());
    }

    /// Absent paths and empty directories are available; non-empty
    /// directories and plain files are not.
    #[test]
    fn dest_preconditions() {
        let dir = TempDir::new().unwrap();
        check_dest_available(&dir.path().join("absent")).unwrap();
        let empty = dir.path().join("empty");
        fs::create_dir(&empty).unwrap();
        check_dest_available(&empty).unwrap();
        let full = dir.path().join("full");
        fs::create_dir(&full).unwrap();
        fs::write(full.join("x"), b"x").unwrap();
        assert!(matches!(
            check_dest_available(&full),
            Err(SnapshotError::ArtifactInvalid(_))
        ));
        let file = dir.path().join("file");
        fs::write(&file, b"x").unwrap();
        assert!(matches!(
            check_dest_available(&file),
            Err(SnapshotError::ArtifactInvalid(_))
        ));
    }
}