Skip to main content

slipstream/
artifact.rs

1//! Artifact codec and staging for snapshot export/import (replica bootstrap).
2//!
3//! An **artifact** is a transferable copy of a durable fold: a directory holding
4//! the backend's files under `data/` plus a [`MANIFEST_FILE`] recording the
5//! backend's identity, its on-disk format generation, per-file checksums, and —
6//! the load-bearing part — the **watch cursor the files are consistent with**.
7//! Export produces one; import verifies and installs one; the consumer resumes
8//! its KV watch from the embedded cursor and replays only the log tail.
9//!
10//! This module owns what is common across backends: the manifest wire format,
11//! checksum verification, and the stage-then-atomic-rename discipline that keeps
12//! half-written artifacts and half-imported folds from ever being observable at
13//! their final paths. The backend-specific parts (how fjall/RocksDB/the append
14//! log get a consistent copy of their files) live with each backend.
15//!
16//! ## Crash safety
17//!
18//! - **Export**: payload and manifest are assembled in a hidden temp directory
19//!   beside `dest`, fsynced, and renamed into place. A crash mid-export leaves a
20//!   `.slipstream-artifact-*` temp dir (cleaned up by the next tempdir reaper or
21//!   operator) and **no** artifact at `dest`. An artifact that exists is complete.
22//! - **Import**: the payload is copied-and-verified into a temp directory beside
23//!   the destination, then renamed. A crash mid-import leaves no fold at the
24//!   destination; a crash after the rename leaves a fully valid fold (a retried
//!   import then refuses the occupied destination — a non-empty directory or an
//!   existing file — and the caller should simply open it).
27//!
28//! ## Blocking I/O
29//!
30//! Everything here is synchronous file I/O, same discipline as
31//! [`crate::snapshot`]: async callers must offload to `spawn_blocking` (the
32//! [`watch_applied`](crate::watch_applied) export path does).
33
34use std::collections::BTreeSet;
35use std::fs::{self, File};
36use std::io::{Read, Write};
37use std::path::{Component, Path, PathBuf};
38use std::time::{SystemTime, UNIX_EPOCH};
39
40use serde::{Deserialize, Serialize};
41
42use crate::kv::{VersionToken, WatchCursor};
43use crate::snapshot::SnapshotError;
44
/// Layout version of the artifact as a whole: the `MANIFEST.json` schema plus
/// the `data/` payload convention. Only a change to the artifact's shape bumps
/// it; the backend's own *payload* format is tracked independently by
/// [`ExportManifest::backend_version`].
pub const ARTIFACT_SCHEMA_VERSION: u32 = 1;

/// Name of the manifest at the artifact root. It is the last thing written and
/// is fsynced, so once the stage has been renamed into place its presence
/// signals a complete artifact.
pub const MANIFEST_FILE: &str = "MANIFEST.json";

/// Subdirectory of the artifact root that holds the backend's payload files.
pub(crate) const PAYLOAD_DIR: &str = "data";

/// Read-buffer size (1 MiB) used when streaming payload files through BLAKE3.
const HASH_BUF: usize = 1024 * 1024;
59
/// Manifest of an exported artifact: what is in it, which backend wrote it, and
/// the cursor its payload is consistent with.
///
/// This is the in-memory form. On disk it is stored as `MANIFEST.json` through a
/// separate private wire type, which carries the cursor as a lowercase hex
/// string (`cursor_hex`).
#[derive(Debug, Clone)]
pub struct ExportManifest {
    /// Artifact layout version ([`ARTIFACT_SCHEMA_VERSION`]). A manifest with
    /// any other value is rejected when read.
    pub schema_version: u32,
    /// Backend identity: `"append-log"`, `"fjall"`, or `"rocksdb"`. Import
    /// requires an exact match with the importing backend's identity.
    pub backend: String,
    /// The backend's **on-disk format generation** (not a crate semver): the
    /// append log's `FORMAT_VERSION` (`"2"`), fjall's format marker (`"3"`),
    /// the rust-rocksdb binding version (`"0.50"`, informational — RocksDB
    /// reads older formats and its own open is the arbiter). Compatibility is
    /// decided by the caller-supplied policy at import time.
    pub backend_version: String,
    /// The resume cursor the payload is exactly consistent with. Resuming the
    /// watch from here replays only the post-export tail.
    pub cursor: WatchCursor,
    /// Export wall-clock time, seconds since the Unix epoch. Informational;
    /// recorded as `0` if the system clock reads earlier than the epoch.
    pub created_at_unix: u64,
    /// Every payload file, with size and BLAKE3 digest, in sorted path order
    /// as produced at export. Import verifies all of them and rejects
    /// undeclared extras.
    pub files: Vec<ArtifactFile>,
}
82
/// One payload file in an [`ExportManifest`].
///
/// Compares and hashes by value, so manifest entries can be checked against
/// each other directly (e.g. when diffing two manifests or in tests) instead of
/// field by field.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ArtifactFile {
    /// Path relative to the artifact root, `/`-separated, always under `data/`.
    pub path: String,
    /// Size in bytes.
    pub size: u64,
    /// Lowercase-hex BLAKE3 digest of the file contents. (BLAKE3 over SHA-256:
    /// there is no interop constraint — slipstream writes and reads its own
    /// manifests — and artifacts reach GBs, where BLAKE3's SIMD hashing is
    /// several times faster.)
    pub blake3: String,
}
96
97// ---------------------------------------------------------------------------
98// Wire format (serde) — kept separate from the public types so the public
99// surface can hold a real WatchCursor while the JSON stays a stable hex string.
100// ---------------------------------------------------------------------------
101
// `deny_unknown_fields` on both: the manifest is the trust boundary for a
// remote-supplied artifact, and an unrecognized field is far more likely a
// corrupted/hostile manifest or a schema mismatch than benign noise — reject
// loudly rather than skip silently. Forward compatibility is governed by
// `schema_version` (validated in `manifest_from_slice`); a future schema that
// adds fields must bump it.
/// On-disk JSON shape of an [`ExportManifest`]. The field names below are the
/// JSON keys of `MANIFEST.json`, so renaming one changes the wire format.
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ManifestWire {
    /// Artifact layout version; compared against `ARTIFACT_SCHEMA_VERSION`.
    schema_version: u32,
    /// Backend identity string (see `ExportManifest::backend`).
    backend: String,
    /// Backend on-disk format generation (see `ExportManifest::backend_version`).
    backend_version: String,
    /// Raw cursor bytes as lowercase hex; empty string = no cursor
    /// ([`WatchCursor::none`]).
    cursor_hex: String,
    /// Export time, seconds since the Unix epoch.
    created_at_unix: u64,
    /// One entry per payload file.
    files: Vec<FileWire>,
}
120
/// On-disk JSON shape of one [`ArtifactFile`] entry in `MANIFEST.json`. Field
/// names are the JSON keys; unknown keys are rejected (`deny_unknown_fields`,
/// for the same trust-boundary reason as the enclosing manifest).
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct FileWire {
    /// Payload path relative to the artifact root (e.g. `data/fold.snap`).
    path: String,
    /// File size in bytes.
    size: u64,
    /// Lowercase-hex BLAKE3 digest of the file contents.
    blake3: String,
}
128
129fn invalid(msg: impl Into<String>) -> SnapshotError {
130    SnapshotError::ArtifactInvalid(msg.into())
131}
132
133// ---------------------------------------------------------------------------
134// Hex (cursor encoding) — two tiny helpers instead of a dependency.
135// ---------------------------------------------------------------------------
136
/// Lowercase hex encoding: two digits per byte, high nibble first.
pub(crate) fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
145
/// Decode a hex string (either letter case) into bytes; the inverse of
/// [`hex_encode`].
///
/// Returns `None` for odd-length input or any character outside `[0-9a-fA-F]`.
/// Each character is checked individually instead of parsing pairs with
/// `u8::from_str_radix`, because that function also accepts a leading `+`
/// sign: it would decode `"+f"` to `[0x0f]` and so admit non-canonical
/// encodings into the manifest's `cursor_hex` field.
pub(crate) fn hex_decode(s: &str) -> Option<Vec<u8>> {
    fn nibble(c: u8) -> Option<u8> {
        match c {
            b'0'..=b'9' => Some(c - b'0'),
            b'a'..=b'f' => Some(c - b'a' + 10),
            b'A'..=b'F' => Some(c - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
155
156fn cursor_to_hex(cursor: &WatchCursor) -> String {
157    hex_encode(cursor.version().as_bytes())
158}
159
160fn cursor_from_hex(s: &str) -> Result<WatchCursor, SnapshotError> {
161    let bytes = hex_decode(s).ok_or_else(|| invalid(format!("malformed cursor_hex: {s:?}")))?;
162    let token = VersionToken::from_raw(&bytes).ok_or_else(|| {
163        invalid(format!(
164            "cursor_hex decodes to {} bytes, exceeds version token capacity",
165            bytes.len()
166        ))
167    })?;
168    Ok(WatchCursor::from_version(token))
169}
170
171// ---------------------------------------------------------------------------
172// Manifest read/write
173// ---------------------------------------------------------------------------
174
175/// Serialize and write `MANIFEST.json` at the artifact root: tempfile in the
176/// same directory, fsync, atomic rename — the manifest is the artifact's
177/// completeness marker, so it must never be observable half-written.
178pub(crate) fn write_manifest(
179    artifact_root: &Path,
180    manifest: &ExportManifest,
181) -> Result<(), SnapshotError> {
182    let wire = ManifestWire {
183        schema_version: manifest.schema_version,
184        backend: manifest.backend.clone(),
185        backend_version: manifest.backend_version.clone(),
186        cursor_hex: cursor_to_hex(&manifest.cursor),
187        created_at_unix: manifest.created_at_unix,
188        files: manifest
189            .files
190            .iter()
191            .map(|f| FileWire {
192                path: f.path.clone(),
193                size: f.size,
194                blake3: f.blake3.clone(),
195            })
196            .collect(),
197    };
198    let json = serde_json::to_vec_pretty(&wire)
199        .map_err(|e| SnapshotError::Backend(format!("manifest serialization failed: {e}")))?;
200
201    let mut tmp = tempfile::NamedTempFile::new_in(artifact_root)?;
202    tmp.write_all(&json)?;
203    tmp.as_file().sync_all()?;
204    tmp.persist(artifact_root.join(MANIFEST_FILE))
205        .map_err(|e| SnapshotError::Io(e.error))?;
206    Ok(())
207}
208
209/// Read and validate `MANIFEST.json` from an artifact directory.
210pub(crate) fn read_manifest(artifact_dir: &Path) -> Result<ExportManifest, SnapshotError> {
211    let path = artifact_dir.join(MANIFEST_FILE);
212    let data = fs::read(&path).map_err(|e| {
213        if e.kind() == std::io::ErrorKind::NotFound {
214            invalid(format!("no {MANIFEST_FILE} in {}", artifact_dir.display()))
215        } else {
216            SnapshotError::Io(e)
217        }
218    })?;
219    manifest_from_slice(&data)
220}
221
222/// Parse and validate manifest JSON bytes.
223///
224/// Validates the schema version and every file path (relative, `/`-separated,
225/// no `..`, no `\`, under `data/`) so a hostile or corrupted manifest can never
226/// direct a copy outside the staging area (zip-slip).
227pub(crate) fn manifest_from_slice(data: &[u8]) -> Result<ExportManifest, SnapshotError> {
228    let wire: ManifestWire =
229        serde_json::from_slice(data).map_err(|e| invalid(format!("malformed manifest: {e}")))?;
230
231    if wire.schema_version != ARTIFACT_SCHEMA_VERSION {
232        return Err(invalid(format!(
233            "unsupported artifact schema_version {} (this build supports {})",
234            wire.schema_version, ARTIFACT_SCHEMA_VERSION
235        )));
236    }
237    for f in &wire.files {
238        validate_payload_path(&f.path)?;
239    }
240
241    Ok(ExportManifest {
242        schema_version: wire.schema_version,
243        backend: wire.backend,
244        backend_version: wire.backend_version,
245        cursor: cursor_from_hex(&wire.cursor_hex)?,
246        created_at_unix: wire.created_at_unix,
247        files: wire
248            .files
249            .into_iter()
250            .map(|f| ArtifactFile {
251                path: f.path,
252                size: f.size,
253                blake3: f.blake3,
254            })
255            .collect(),
256    })
257}
258
259/// Reject any manifest path that could escape the artifact when joined: it must
260/// be relative, `/`-separated, contain no `..`/`.` components, and live under
261/// `data/`.
262fn validate_payload_path(p: &str) -> Result<(), SnapshotError> {
263    let prefix = format!("{PAYLOAD_DIR}/");
264    if !p.starts_with(&prefix) || p.len() == prefix.len() {
265        return Err(invalid(format!(
266            "manifest path {p:?} is not under {PAYLOAD_DIR}/"
267        )));
268    }
269    if p.contains('\\') {
270        return Err(invalid(format!("manifest path {p:?} contains a backslash")));
271    }
272    let path = Path::new(p);
273    for comp in path.components() {
274        match comp {
275            Component::Normal(_) => {}
276            _ => {
277                return Err(invalid(format!(
278                    "manifest path {p:?} contains a non-normal component"
279                )));
280            }
281        }
282    }
283    Ok(())
284}
285
286// ---------------------------------------------------------------------------
287// Payload hashing
288// ---------------------------------------------------------------------------
289
290/// Streaming BLAKE3 of one file, returning `(size, hex_digest)` plus the open
291/// handle so the caller can `sync_all` without a second `open(2)`.
292fn hash_file(path: &Path, buf: &mut [u8]) -> Result<(File, u64, String), SnapshotError> {
293    let mut file = File::open(path)?;
294    let mut hasher = blake3::Hasher::new();
295    let mut size = 0u64;
296    loop {
297        let n = file.read(buf)?;
298        if n == 0 {
299            break;
300        }
301        size += n as u64;
302        hasher.update(&buf[..n]);
303    }
304    Ok((file, size, hasher.finalize().to_hex().to_string()))
305}
306
307/// Every regular file under `root/data/`, relative `/`-separated paths, sorted.
308fn list_payload_files(root: &Path) -> Result<Vec<PathBuf>, SnapshotError> {
309    let payload = root.join(PAYLOAD_DIR);
310    let mut out = Vec::new();
311    let mut stack = vec![payload.clone()];
312    while let Some(dir) = stack.pop() {
313        for entry in fs::read_dir(&dir)? {
314            let entry = entry?;
315            let ty = entry.file_type()?;
316            if ty.is_dir() {
317                stack.push(entry.path());
318            } else if ty.is_file() {
319                out.push(entry.path());
320            } else {
321                // Symlinks etc. have no place in an artifact: a symlink would
322                // hash as its target's bytes but restore as a link (or escape
323                // the payload entirely). Refuse at export so import never has
324                // to trust one.
325                return Err(invalid(format!(
326                    "payload contains a non-regular file: {}",
327                    entry.path().display()
328                )));
329            }
330        }
331    }
332    out.sort();
333    Ok(out)
334}
335
336/// Hash every payload file under `root/data/` and fsync the payload tree
337/// (files + directories), returning the manifest file list in sorted order.
338pub(crate) fn hash_payload(root: &Path) -> Result<Vec<ArtifactFile>, SnapshotError> {
339    let mut files = Vec::new();
340    let mut buf = vec![0u8; HASH_BUF];
341    for abs in list_payload_files(root)? {
342        let (file, size, blake3) = hash_file(&abs, &mut buf)?;
343        // Durability before the rename: the artifact's completeness contract is
344        // "exists ⇒ verifiable", which only holds if the hashed bytes are the
345        // on-disk bytes.
346        file.sync_all()?;
347        let rel = abs
348            .strip_prefix(root)
349            .map_err(|_| SnapshotError::Backend("payload path escaped artifact root".into()))?;
350        let rel = rel
351            .to_str()
352            .ok_or_else(|| invalid(format!("non-UTF-8 payload path: {}", rel.display())))?;
353        files.push(ArtifactFile {
354            path: rel.to_string(),
355            size,
356            blake3,
357        });
358    }
359    fsync_dir_tree(&root.join(PAYLOAD_DIR))?;
360    Ok(files)
361}
362
363fn fsync_dir(path: &Path) -> Result<(), SnapshotError> {
364    File::open(path)?.sync_all()?;
365    Ok(())
366}
367
368fn fsync_dir_tree(root: &Path) -> Result<(), SnapshotError> {
369    let mut stack = vec![root.to_path_buf()];
370    while let Some(dir) = stack.pop() {
371        fsync_dir(&dir)?;
372        for entry in fs::read_dir(&dir)? {
373            let entry = entry?;
374            if entry.file_type()?.is_dir() {
375                stack.push(entry.path());
376            }
377        }
378    }
379    Ok(())
380}
381
382// ---------------------------------------------------------------------------
383// Destination preconditions
384// ---------------------------------------------------------------------------
385
386/// A destination is available when it does not exist, or is an empty directory
387/// (removed just before the final rename). Anything else is refused — never
388/// overwrite a fold or an artifact in place.
389pub(crate) fn check_dest_available(dest: &Path) -> Result<(), SnapshotError> {
390    match fs::metadata(dest) {
391        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
392        Err(e) => Err(SnapshotError::Io(e)),
393        Ok(meta) if meta.is_dir() => {
394            if fs::read_dir(dest)?.next().is_some() {
395                Err(invalid(format!(
396                    "destination {} exists and is not empty",
397                    dest.display()
398                )))
399            } else {
400                Ok(())
401            }
402        }
403        Ok(_) => Err(invalid(format!(
404            "destination {} already exists",
405            dest.display()
406        ))),
407    }
408}
409
/// Remove `dest` if it is an (already-verified-empty) directory so a rename can
/// land on its path, then rename `from` onto it and fsync the parent.
///
/// The is_dir → remove_dir → rename sequence has a TOCTOU window: a concurrent
/// writer can recreate `dest` between the remove and the rename. That race
/// fails closed — `remove_dir` errors on a non-empty dir and `rename` errors
/// when `dest` reappears as a file or non-empty dir — so the worst case is an
/// error return, never a silent overwrite. Don't "fix" this with
/// remove-then-retry; refusing the round is the intended behavior.
///
/// NOTE(review): the fail-closed argument above holds when `from` is a
/// *directory* (`ExportStage`, `ImportStage::finalize_dir`). When `from` is a
/// regular *file* (`ImportStage::finalize_file`), POSIX `rename(2)` atomically
/// replaces an existing file at `dest`. A file created at `dest` inside this
/// window would then be overwritten silently. Confirm whether that race is
/// acceptable for the file-shaped (append-log) path.
///
/// If the rename succeeds but the parent fsync fails, the error is returned
/// even though `from` has already moved to `dest`.
pub(crate) fn rename_into_place(from: &Path, dest: &Path) -> Result<(), SnapshotError> {
    // Only a directory is removed; `check_dest_available` ran first, so it
    // should be empty (remove_dir refuses otherwise).
    if dest.is_dir() {
        fs::remove_dir(dest)?;
    }
    fs::rename(from, dest)?;
    // Make the new directory entry durable.
    if let Some(parent) = dest.parent() {
        fsync_dir(parent)?;
    }
    Ok(())
}
429
430fn stage_dir_in(parent: &Path) -> Result<tempfile::TempDir, SnapshotError> {
431    // Same directory as the destination so the final rename is a same-filesystem
432    // atomic rename (mirrors `compact_to_file`'s tempfile discipline). The
433    // hidden prefix keeps half-built stages out of an operator's way.
434    Ok(tempfile::Builder::new()
435        .prefix(".slipstream-artifact-")
436        .tempdir_in(parent)?)
437}
438
439fn dest_parent(dest: &Path) -> Result<&Path, SnapshotError> {
440    dest.parent()
441        .filter(|p| !p.as_os_str().is_empty())
442        .ok_or_else(|| {
443            invalid(format!(
444                "destination {} has no parent directory",
445                dest.display()
446            ))
447        })
448}
449
450// ---------------------------------------------------------------------------
451// Export staging
452// ---------------------------------------------------------------------------
453
454/// Assembles an artifact in a temp directory beside `dest`, then seals it
455/// (hash → manifest → fsync) and atomically renames it into place.
456///
457/// Backends write their payload into [`payload`](Self::payload), then call
458/// [`seal_and_finalize`](Self::seal_and_finalize).
459pub(crate) struct ExportStage {
460    dir: tempfile::TempDir,
461    dest: PathBuf,
462}
463
464impl ExportStage {
465    /// Create a stage for an artifact that will land at `dest_dir`. Fails fast
466    /// if `dest_dir` is unavailable (exists non-empty).
467    pub(crate) fn new(dest_dir: &Path) -> Result<Self, SnapshotError> {
468        check_dest_available(dest_dir)?;
469        let parent = dest_parent(dest_dir)?;
470        let dir = stage_dir_in(parent)?;
471        Ok(Self {
472            dir,
473            dest: dest_dir.to_path_buf(),
474        })
475    }
476
477    /// The payload directory the backend writes its files into.
478    ///
479    /// Deliberately NOT pre-created: RocksDB's checkpoint API requires its
480    /// target to not exist, so each backend creates (or lets the engine create)
481    /// this path itself.
482    pub(crate) fn payload(&self) -> PathBuf {
483        self.dir.path().join(PAYLOAD_DIR)
484    }
485
486    /// Hash the payload, write the manifest, fsync, and atomically rename the
487    /// stage to the destination. Returns the sealed manifest.
488    pub(crate) fn seal_and_finalize(
489        self,
490        backend: &str,
491        backend_version: &str,
492        cursor: &WatchCursor,
493    ) -> Result<ExportManifest, SnapshotError> {
494        let root = self.dir.path();
495        let files = hash_payload(root)?;
496        let manifest = ExportManifest {
497            schema_version: ARTIFACT_SCHEMA_VERSION,
498            backend: backend.to_string(),
499            backend_version: backend_version.to_string(),
500            cursor: cursor.clone(),
501            created_at_unix: SystemTime::now()
502                .duration_since(UNIX_EPOCH)
503                .map(|d| d.as_secs())
504                .unwrap_or(0),
505            files,
506        };
507        write_manifest(root, &manifest)?;
508        fsync_dir(root)?;
509
510        // Re-check the destination (it may have appeared since `new`), then
511        // rename. `keep()` disarms the TempDir destructor — the path has been
512        // renamed away, so there is nothing left for it to delete.
513        check_dest_available(&self.dest)?;
514        let dest = self.dest.clone();
515        let root = self.dir.keep();
516        rename_into_place(&root, &dest)?;
517        Ok(manifest)
518    }
519}
520
521// ---------------------------------------------------------------------------
522// Import staging
523// ---------------------------------------------------------------------------
524
525/// A verified copy of an artifact's payload, staged beside the destination and
526/// ready to be renamed into place.
527pub(crate) struct ImportStage {
528    dir: tempfile::TempDir,
529    dest: PathBuf,
530}
531
532impl ImportStage {
533    /// The staged payload root (mirrors the artifact's `data/` layout).
534    pub(crate) fn payload(&self) -> PathBuf {
535        self.dir.path().join(PAYLOAD_DIR)
536    }
537
538    /// Rename the whole staged payload directory onto `dest` (directory-shaped
539    /// backends: fjall, RocksDB).
540    #[cfg(any(feature = "fjall", feature = "rocksdb"))]
541    pub(crate) fn finalize_dir(self) -> Result<(), SnapshotError> {
542        check_dest_available(&self.dest)?;
543        rename_into_place(&self.payload(), &self.dest)
544        // TempDir drop removes the now-payload-less stage directory.
545    }
546
547    /// Rename a single staged payload file onto `dest` (file-shaped backends:
548    /// the append log).
549    pub(crate) fn finalize_file(self, rel: &str) -> Result<(), SnapshotError> {
550        check_dest_available(&self.dest)?;
551        rename_into_place(&self.payload().join(rel), &self.dest)
552    }
553}
554
555/// Validate an artifact against its manifest and stage a verified copy of its
556/// payload beside `dest`.
557///
558/// Checks, in order: manifest well-formedness ([`read_manifest`]), backend
559/// identity, backend version (via the caller's policy closure), destination
560/// availability, then every payload file (copied while hashing — size and
561/// BLAKE3 digest must match the manifest, and the payload must contain no undeclared
562/// extra files). The transport that delivered the artifact is untrusted; this
563/// re-verification is the trust boundary.
564pub(crate) fn verify_and_stage_import(
565    artifact_dir: &Path,
566    dest: &Path,
567    expected_backend: &str,
568    check_backend_version: impl Fn(&str) -> Result<(), SnapshotError>,
569) -> Result<(ExportManifest, ImportStage), SnapshotError> {
570    let manifest = read_manifest(artifact_dir)?;
571
572    if manifest.backend != expected_backend {
573        return Err(invalid(format!(
574            "artifact backend is {:?}, expected {:?}",
575            manifest.backend, expected_backend
576        )));
577    }
578    check_backend_version(&manifest.backend_version)?;
579    check_dest_available(dest)?;
580
581    // Undeclared extras: a file in the payload that the manifest doesn't list
582    // was never hashed at export — it cannot be trusted.
583    let declared: BTreeSet<&str> = manifest.files.iter().map(|f| f.path.as_str()).collect();
584    for abs in list_payload_files(artifact_dir)? {
585        let rel = abs
586            .strip_prefix(artifact_dir)
587            .map_err(|_| SnapshotError::Backend("payload path escaped artifact dir".into()))?;
588        let rel = rel
589            .to_str()
590            .ok_or_else(|| invalid(format!("non-UTF-8 payload path: {}", rel.display())))?;
591        if !declared.contains(rel) {
592            return Err(invalid(format!("payload contains undeclared file: {rel}")));
593        }
594    }
595
596    let parent = dest_parent(dest)?;
597    let dir = stage_dir_in(parent)?;
598    let stage = ImportStage {
599        dir,
600        dest: dest.to_path_buf(),
601    };
602
603    // Copy-while-hashing: one read pass per file serves both the copy and the
604    // verification. One buffer for the whole loop — a fresh 1 MiB zero-init per
605    // file would be O(files) wasted work on multi-hundred-file artifacts.
606    let mut buf = vec![0u8; HASH_BUF];
607    for f in &manifest.files {
608        let src_path = artifact_dir.join(&f.path);
609        let dst_path = stage.dir.path().join(&f.path);
610        if let Some(p) = dst_path.parent() {
611            fs::create_dir_all(p)?;
612        }
613        let mut src = File::open(&src_path).map_err(|e| {
614            if e.kind() == std::io::ErrorKind::NotFound {
615                invalid(format!("payload file missing: {}", f.path))
616            } else {
617                SnapshotError::Io(e)
618            }
619        })?;
620        let mut dst = File::create(&dst_path)?;
621        let mut hasher = blake3::Hasher::new();
622        let mut size = 0u64;
623        loop {
624            let n = src.read(&mut buf)?;
625            if n == 0 {
626                break;
627            }
628            size += n as u64;
629            hasher.update(&buf[..n]);
630            dst.write_all(&buf[..n])?;
631        }
632        dst.sync_all()?;
633        if size != f.size {
634            return Err(invalid(format!(
635                "payload file {} is {size} bytes, manifest says {}",
636                f.path, f.size
637            )));
638        }
639        let digest = hasher.finalize().to_hex().to_string();
640        if digest != f.blake3 {
641            return Err(invalid(format!(
642                "payload file {} checksum mismatch (got {digest}, manifest says {})",
643                f.path, f.blake3
644            )));
645        }
646    }
647    fsync_dir_tree(&stage.payload())?;
648
649    Ok((manifest, stage))
650}
651
652// ---------------------------------------------------------------------------
653// Tests
654// ---------------------------------------------------------------------------
655
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build an append-log manifest (format `"2"`, fixed timestamp) with the
    /// given files and cursor.
    fn manifest_with(files: Vec<ArtifactFile>, cursor: WatchCursor) -> ExportManifest {
        ExportManifest {
            schema_version: ARTIFACT_SCHEMA_VERSION,
            backend: "append-log".into(),
            backend_version: "2".into(),
            cursor,
            created_at_unix: 1_765_400_000,
            files,
        }
    }

    /// write_manifest → read_manifest preserves every field of a populated
    /// manifest.
    #[test]
    fn manifest_round_trips() {
        let dir = TempDir::new().unwrap();
        let m = manifest_with(
            vec![ArtifactFile {
                path: "data/fold.snap".into(),
                size: 42,
                blake3: "ab".repeat(32),
            }],
            WatchCursor::from_u64(184_467),
        );
        write_manifest(dir.path(), &m).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert_eq!(got.schema_version, m.schema_version);
        assert_eq!(got.backend, m.backend);
        assert_eq!(got.backend_version, m.backend_version);
        assert_eq!(got.cursor, m.cursor);
        assert_eq!(got.created_at_unix, m.created_at_unix);
        assert_eq!(got.files.len(), 1);
        assert_eq!(got.files[0].path, "data/fold.snap");
        assert_eq!(got.files[0].size, 42);
    }

    /// The "no cursor" value survives a round trip.
    #[test]
    fn manifest_round_trips_none_cursor() {
        let dir = TempDir::new().unwrap();
        let m = manifest_with(vec![], WatchCursor::none());
        write_manifest(dir.path(), &m).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert!(got.cursor.is_none(), "none cursor survives the round trip");
    }

    /// A 10-byte version token round-trips through the hex encoding intact.
    #[test]
    fn manifest_round_trips_fdb_width_cursor() {
        // 10-byte tokens have no u64 form; the hex path must carry them intact.
        let raw = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let cursor = WatchCursor::from_version(VersionToken::from_raw(&raw).unwrap());
        let dir = TempDir::new().unwrap();
        write_manifest(dir.path(), &manifest_with(vec![], cursor.clone())).unwrap();
        let got = read_manifest(dir.path()).unwrap();
        assert_eq!(got.cursor, cursor);
    }

    /// Write `json` verbatim as the manifest in `dir`, bypassing `write_manifest`.
    fn write_raw_manifest(dir: &Path, json: &str) {
        fs::write(dir.join(MANIFEST_FILE), json).unwrap();
    }

    /// Hand-built manifest JSON with the given cursor hex, `files` array
    /// literal, and schema version.
    fn wire_json(cursor_hex: &str, files: &str, schema: u32) -> String {
        format!(
            r#"{{"schema_version":{schema},"backend":"append-log","backend_version":"2",
                 "cursor_hex":"{cursor_hex}","created_at_unix":0,"files":{files}}}"#
        )
    }

    /// Malformed or oversized `cursor_hex` values are ArtifactInvalid.
    #[test]
    fn rejects_bad_cursor_hex() {
        let dir = TempDir::new().unwrap();
        for bad in ["zz", "abc", "0102030405060708090a0b"] {
            // non-hex, odd length, 11 bytes (> token capacity)
            write_raw_manifest(dir.path(), &wire_json(bad, "[]", ARTIFACT_SCHEMA_VERSION));
            match read_manifest(dir.path()) {
                Err(SnapshotError::ArtifactInvalid(_)) => {}
                other => panic!("cursor_hex {bad:?}: expected ArtifactInvalid, got {other:?}"),
            }
        }
    }

    /// A schema version other than ARTIFACT_SCHEMA_VERSION is rejected with a
    /// message naming `schema_version`.
    #[test]
    fn rejects_wrong_schema_version() {
        let dir = TempDir::new().unwrap();
        write_raw_manifest(
            dir.path(),
            &wire_json("", "[]", ARTIFACT_SCHEMA_VERSION + 1),
        );
        match read_manifest(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("schema_version"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }

    /// Manifest paths that escape `data/`, are absolute, use `..` or `\`, or do
    /// not name a file under `data/` are all rejected (zip-slip defense).
    #[test]
    fn rejects_path_traversal() {
        let dir = TempDir::new().unwrap();
        for bad in [
            "../escape",
            "/abs/path",
            "data/../escape",
            "data/a\\b",
            "nondata/x",
            "data/",
            "data",
        ] {
            let files = format!(
                r#"[{{"path":"{}","size":0,"blake3":""}}]"#,
                bad.replace('\\', "\\\\")
            );
            write_raw_manifest(dir.path(), &wire_json("", &files, ARTIFACT_SCHEMA_VERSION));
            match read_manifest(dir.path()) {
                Err(SnapshotError::ArtifactInvalid(_)) => {}
                other => panic!("path {bad:?}: expected ArtifactInvalid, got {other:?}"),
            }
        }
    }

    /// Non-JSON manifest bytes map to ArtifactInvalid("malformed ...").
    #[test]
    fn rejects_malformed_manifest_json() {
        // Truly unparseable bytes (not merely wrong field values) must surface
        // as ArtifactInvalid, never an Io/Backend error or a panic.
        let dir = TempDir::new().unwrap();
        write_raw_manifest(dir.path(), "not json at all {{{");
        match read_manifest(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("malformed"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }

    /// A symlink in the payload is refused at export: it would hash as its
    /// target's bytes but restore as a link (or escape the payload entirely),
    /// so `hash_payload` must reject it before a manifest is ever written.
    #[cfg(unix)]
    #[test]
    fn hash_payload_rejects_symlink() {
        let dir = TempDir::new().unwrap();
        let payload = dir.path().join(PAYLOAD_DIR);
        fs::create_dir(&payload).unwrap();
        fs::write(payload.join("real"), b"data").unwrap();
        let target = dir.path().join("outside");
        fs::write(&target, b"outside the payload").unwrap();
        std::os::unix::fs::symlink(&target, payload.join("link")).unwrap();

        match hash_payload(dir.path()) {
            Err(SnapshotError::ArtifactInvalid(msg)) => {
                assert!(msg.contains("non-regular"), "{msg}");
            }
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }
    }

    /// The TOCTOU window documented on `rename_into_place`: the destination
    /// appears (non-empty) between `ExportStage::new` and `seal_and_finalize`.
    /// The race must fail closed — an error return, never a silent overwrite.
    #[test]
    fn export_stage_fails_closed_when_dest_appears_before_seal() {
        let dir = TempDir::new().unwrap();
        let dest = dir.path().join("artifact");
        let stage = ExportStage::new(&dest).unwrap();
        fs::create_dir(stage.payload()).unwrap();
        fs::write(stage.payload().join("fold.snap"), b"data").unwrap();

        // A concurrent writer lands a non-empty directory at the destination.
        fs::create_dir(&dest).unwrap();
        fs::write(dest.join("stray"), b"x").unwrap();

        let err = stage
            .seal_and_finalize("append-log", "2", &WatchCursor::from_u64(1))
            .unwrap_err();
        assert!(matches!(err, SnapshotError::ArtifactInvalid(_)));
        assert!(
            dest.join("stray").exists(),
            "occupied destination is untouched"
        );
    }

    /// hex_encode/hex_decode are inverses; invalid digits and odd lengths fail.
    #[test]
    fn hex_round_trips() {
        for bytes in [&[][..], &[0u8][..], &[0xde, 0xad, 0xbe, 0xef][..]] {
            assert_eq!(hex_decode(&hex_encode(bytes)).unwrap(), bytes);
        }
        assert!(hex_decode("0g").is_none());
        assert!(hex_decode("a").is_none());
    }

    /// check_dest_available accepts absent paths and empty directories, and
    /// refuses non-empty directories and existing files.
    #[test]
    fn dest_preconditions() {
        let dir = TempDir::new().unwrap();
        // Absent: fine.
        check_dest_available(&dir.path().join("absent")).unwrap();
        // Empty dir: fine.
        let empty = dir.path().join("empty");
        fs::create_dir(&empty).unwrap();
        check_dest_available(&empty).unwrap();
        // Non-empty dir: refused.
        let full = dir.path().join("full");
        fs::create_dir(&full).unwrap();
        fs::write(full.join("x"), b"x").unwrap();
        assert!(matches!(
            check_dest_available(&full),
            Err(SnapshotError::ArtifactInvalid(_))
        ));
        // Existing file: refused.
        let file = dir.path().join("file");
        fs::write(&file, b"x").unwrap();
        assert!(matches!(
            check_dest_available(&file),
            Err(SnapshotError::ArtifactInvalid(_))
        ));
    }
}
869            check_dest_available(&file),
870            Err(SnapshotError::ArtifactInvalid(_))
871        ));
872    }
873}