Skip to main content

snapdir_stores/
file_store.rs

1//! `FileStore`: the `file://` storage backend.
2//!
3//! A [`FileStore`] is rooted at a local directory and holds the frozen
4//! content-addressable `.objects`/`.manifests` sharded layout, so a store
5//! directory written by any conforming implementation is interchangeable:
6//!
7//! ```text
8//! <root>/.objects/<sharded checksum>     raw file bytes
9//! <root>/.manifests/<sharded snapshot id> manifest text
10//! ```
11//!
12//! Sharding and the on-disk paths come straight from [`snapdir_core::store`]
13//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
14//!
15//! # Oracle parity
16//!
17//! - **`new` / URL parsing** mirrors `_snapdir_file_store_get_store_dir`:
18//!   strips a leading `file://`, `file:///`, `file://localhost/` (etc.) prefix
19//!   down to an absolute path and drops a trailing slash.
20//! - **`push`** mirrors `snapdir_file_store_get_push_command` +
21//!   `_snapdir_file_store_persit`: it is a no-op if the manifest already exists
22//!   (skip-if-present); otherwise it writes every referenced object that is
23//!   absent (skip-if-present per object) *before* writing the manifest, so a
24//!   present manifest always implies all of its objects are present.
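//! - **`fetch_files`** mirrors the fetch side of
//!   `_snapdir_file_store_persit`: copy to a temp path, verify the content
//!   BLAKE3 against the expected checksum, make up to five attempts, then
//!   atomically rename into place. **`get_manifest`** reads the stored
//!   manifest text, parses it, and verifies that it hashes back to the
//!   requested snapshot id before returning it.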
29//!
30//! All I/O is native in-process filesystem I/O; nothing shells out.
31
32use std::fs;
33use std::io;
34use std::path::{Path, PathBuf};
35
36use snapdir_core::manifest::{Manifest, PathType};
37use snapdir_core::merkle::{Blake3Hasher, Hasher};
38use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
39
40use crate::transfer::TransferConfig;
41use crate::util::{file_present_and_verified, hash_file};
42
/// Attempt budget for a single [`persist`] call, named after the oracle's
/// `_SNAPDIR_FILE_STORE_RETRIES`.
///
/// [`persist`] starts a countdown at this value and decrements it after every
/// copy whose bytes fail their checksum while the source still verifies; it
/// gives up with an integrity error once the countdown reaches zero.
const MAX_PERSIST_RETRIES: u32 = 5;
46
47/// A content-addressable store backed by a local directory (the `file://`
48/// backend).
49///
50/// Construct one with [`FileStore::new`] (parsing a `file://` URL or a bare
51/// path) or [`FileStore::from_root`] (an already-resolved directory).
52#[derive(Debug, Clone)]
53pub struct FileStore {
54    root: PathBuf,
55    config: TransferConfig,
56}
57
58impl FileStore {
59    /// Builds a store from a `store` URL or path, matching the oracle's
60    /// `_snapdir_file_store_get_store_dir`.
61    ///
62    /// Accepts `file:///abs/path`, `file://localhost/abs/path`, `file://`
63    /// followed by an absolute path, or a bare absolute path. A leading
64    /// `file:` scheme (with any number of slashes, optionally `localhost`) is
65    /// rewritten to a single leading `/`, and a trailing slash is dropped.
66    #[must_use]
67    pub fn new(store: &str) -> Self {
68        Self::from_root(parse_store_dir(store))
69    }
70
71    /// Like [`new`](Self::new), but carries a [`TransferConfig`] for
72    /// concurrency / bandwidth control.
73    #[must_use]
74    pub fn new_with_config(store: &str, config: TransferConfig) -> Self {
75        Self::from_root_with_config(parse_store_dir(store), config)
76    }
77
78    /// Builds a store rooted at an already-resolved directory.
79    #[must_use]
80    pub fn from_root(root: impl Into<PathBuf>) -> Self {
81        Self::from_root_with_config(root, TransferConfig::default())
82    }
83
84    /// Like [`from_root`](Self::from_root), but carries a [`TransferConfig`] for
85    /// concurrency / bandwidth control. [`from_root`](Self::from_root) and
86    /// [`new`](Self::new) delegate here with [`TransferConfig::default`].
87    #[must_use]
88    pub fn from_root_with_config(root: impl Into<PathBuf>, config: TransferConfig) -> Self {
89        Self {
90            root: root.into(),
91            config,
92        }
93    }
94
95    /// Returns the store's root directory.
96    #[must_use]
97    pub fn root(&self) -> &Path {
98        &self.root
99    }
100
101    /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
102    /// with. Consumed by the transfer loops in later gates.
103    #[must_use]
104    pub fn transfer_config(&self) -> &TransferConfig {
105        &self.config
106    }
107
108    /// Absolute on-disk path of an object given its checksum.
109    fn object_disk_path(&self, checksum: &str) -> PathBuf {
110        self.root.join(object_path(checksum))
111    }
112
113    /// Absolute on-disk path of a manifest given its snapshot id.
114    fn manifest_disk_path(&self, id: &str) -> PathBuf {
115        self.root.join(manifest_path(id))
116    }
117
118    /// Copies a batch of `(source, target, expected_checksum)` jobs through
119    /// [`persist`] across a thread pool bounded by `self.config.concurrency`.
120    ///
121    /// Local copies have no network bandwidth concern, so the async
122    /// rate-limited transfer driver does not apply here — only the concurrency
123    /// cap. The first [`StoreError`] is propagated and stops scheduling further
124    /// work (`try_for_each`). A `concurrency` of 1 yields a single-threaded
125    /// sequential copy. Each task uses a fresh, cheap, stateless
126    /// [`Blake3Hasher`] to sidestep any `Sync` concern.
127    fn parallel_copy(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
128        use rayon::prelude::*;
129
130        if jobs.is_empty() {
131            return Ok(());
132        }
133
134        let pool = rayon::ThreadPoolBuilder::new()
135            .num_threads(self.config.concurrency.get())
136            .build()
137            .map_err(|err| StoreError::Backend {
138                message: "failed to build copy thread pool".to_owned(),
139                source: Some(Box::new(err)),
140            })?;
141
142        pool.install(|| {
143            jobs.par_iter().try_for_each(|(source, target, expected)| {
144                persist(source, target, expected, &Blake3Hasher::new())
145            })
146        })
147    }
148}
149
150impl Store for FileStore {
151    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
152        let path = self.manifest_disk_path(id);
153        let bytes = match fs::read(&path) {
154            Ok(bytes) => bytes,
155            Err(err) if err.kind() == io::ErrorKind::NotFound => {
156                return Err(StoreError::ManifestNotFound { id: id.to_owned() });
157            }
158            Err(err) => return Err(StoreError::Io(err)),
159        };
160
161        // The snapshot id is BLAKE3 of the comment-stripped manifest text with
162        // the oracle's trailing `echo` newline. Verify the stored bytes hash
163        // back to `id` before trusting them (oracle: the manifest id check on
164        // fetch). `snapshot_id` in core re-renders + re-hashes the parsed
165        // manifest, so parse first, then verify against the parsed form.
166        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
167            message: format!("manifest {id} is not valid UTF-8"),
168            source: Some(Box::new(err)),
169        })?;
170        let manifest = Manifest::parse(&text)?;
171
172        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
173        if actual != id {
174            return Err(StoreError::Integrity {
175                address: manifest_path(id),
176                expected: id.to_owned(),
177                actual,
178            });
179        }
180
181        Ok(manifest)
182    }
183
184    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
185        let hasher = Blake3Hasher::new();
186
187        // First, SEQUENTIAL pass: materialize every directory and pre-create
188        // each file's parent (so the parallel copies below never race on
189        // `create_dir_all` of the same ancestor), short-circuit files that are
190        // already present-and-verified (skip-if-present-and-verified — no object
191        // read at all, so a populated dest succeeds even if the store object is
192        // gone), and confirm the source object exists for the rest (preserving
193        // the `ObjectNotFound` error when a needed source is missing). The file
194        // entries that actually need copying are collected as `(source, target,
195        // checksum)` jobs for the parallel phase.
196        let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
197        for entry in manifest.entries() {
198            let rel = strip_leading_dot_slash(&entry.path);
199            let target = dest.join(rel);
200            match entry.path_type {
201                PathType::Directory => {
202                    fs::create_dir_all(&target)?;
203                }
204                PathType::File => {
205                    // A destination file that already exists and whose content
206                    // hashes to the manifest's checksum needs no copy. A
207                    // mismatching/corrupt local file falls through and is
208                    // repaired by the persist below.
209                    if file_present_and_verified(&target, &entry.checksum, &hasher) {
210                        continue;
211                    }
212                    if let Some(parent) = target.parent() {
213                        fs::create_dir_all(parent)?;
214                    }
215                    let source = self.object_disk_path(&entry.checksum);
216                    if !source.exists() {
217                        return Err(StoreError::ObjectNotFound {
218                            checksum: entry.checksum.clone(),
219                        });
220                    }
221                    jobs.push((source, target, entry.checksum.clone()));
222                }
223            }
224        }
225
226        // Parallel copy phase, bounded by `config.concurrency`. `try_for_each`
227        // propagates the first `StoreError` and stops scheduling new work. Each
228        // task uses a fresh, cheap, stateless `Blake3Hasher`.
229        self.parallel_copy(&jobs)
230    }
231
232    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
233        // Compute the snapshot id of the manifest we are about to push so we
234        // can locate (and skip-if-present) its manifest file.
235        let hasher = Blake3Hasher::new();
236        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
237        let manifest_target = self.manifest_disk_path(&id);
238
239        // Skip-if-present: nothing to do when the manifest already exists. A
240        // present manifest implies all its objects are present (we maintain
241        // that invariant by writing the manifest last).
242        if manifest_target.exists() {
243            return Ok(());
244        }
245
246        // Collect every referenced object that is absent (skip-if-present per
247        // object: an object already filed under its content address is trusted,
248        // it is content-addressable). These are copied BEFORE the manifest.
249        let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
250        for entry in manifest.entries() {
251            if entry.path_type != PathType::File {
252                continue;
253            }
254            let object_target = self.object_disk_path(&entry.checksum);
255            if object_target.exists() {
256                continue;
257            }
258            let rel = strip_leading_dot_slash(&entry.path);
259            let object_source = source.join(rel);
260            jobs.push((object_source, object_target, entry.checksum.clone()));
261        }
262
263        // Parallel copy phase, bounded by `config.concurrency`. ALL-OR-NOTHING:
264        // any error returns immediately and NO manifest is written; the
265        // manifest is written only after every object copy succeeds, preserving
266        // the invariant that a present manifest implies present objects.
267        self.parallel_copy(&jobs)?;
268
269        // Write the manifest last, via the same verify/retry/atomic-rename
270        // path, so a present manifest always implies present objects.
271        write_manifest(manifest, &manifest_target, &id, &hasher)?;
272        Ok(())
273    }
274}
275
276/// Copies `source` to `target`, verifying the content BLAKE3 against
277/// `expected`, retrying up to [`MAX_PERSIST_RETRIES`] times, then atomically
278/// renaming into place. Mirrors `_snapdir_file_store_persit`.
279fn persist(
280    source: &Path,
281    target: &Path,
282    expected: &str,
283    hasher: &impl Hasher,
284) -> Result<(), StoreError> {
285    if let Some(parent) = target.parent() {
286        fs::create_dir_all(parent)?;
287    }
288
289    let mut attempts_left = MAX_PERSIST_RETRIES;
290    loop {
291        // Copy to a unique temp path beside the target so the final rename is
292        // an atomic, same-filesystem move (the oracle's `.tmp` discipline).
293        let tmp = temp_sibling(target);
294        copy_file(source, &tmp)?;
295
296        let actual = hash_file(&tmp, hasher)?;
297        if actual == expected {
298            // Atomic rename into the final content-addressed location.
299            fs::rename(&tmp, target)?;
300            return Ok(());
301        }
302
303        // Copied bytes did not verify. Clean up the temp file and decide
304        // whether to retry: the oracle only retries when the *source* still
305        // hashes to the expected value, otherwise the source itself is bad.
306        let _ = fs::remove_file(&tmp);
307        let source_actual = hash_file(source, hasher)?;
308        if source_actual != expected {
309            return Err(StoreError::Integrity {
310                address: source.display().to_string(),
311                expected: expected.to_owned(),
312                actual: source_actual,
313            });
314        }
315
316        attempts_left = attempts_left.saturating_sub(1);
317        if attempts_left == 0 {
318            return Err(StoreError::Integrity {
319                address: target.display().to_string(),
320                expected: expected.to_owned(),
321                actual,
322            });
323        }
324    }
325}
326
327/// Writes a manifest's text to `target`, verifying it hashes to `id`, then
328/// atomically renaming into place. The manifest's "content" is the
329/// snapshot-id-bearing text (`Display` + trailing newline), so we verify with
330/// [`snapdir_core::merkle::snapshot_id`] rather than a raw byte hash.
331fn write_manifest(
332    manifest: &Manifest,
333    target: &Path,
334    id: &str,
335    hasher: &impl Hasher,
336) -> Result<(), StoreError> {
337    if let Some(parent) = target.parent() {
338        fs::create_dir_all(parent)?;
339    }
340
341    // The on-disk manifest must hash (snapshot_id) back to `id`. Render once
342    // and confirm before writing.
343    let actual = snapdir_core::merkle::snapshot_id(manifest, hasher);
344    if actual != id {
345        return Err(StoreError::Integrity {
346            address: target.display().to_string(),
347            expected: id.to_owned(),
348            actual,
349        });
350    }
351
352    // Oracle stores `echo "${manifest}"` — the manifest text plus a single
353    // trailing newline (the same bytes snapshot_id hashes).
354    let mut text = manifest.to_string();
355    text.push('\n');
356
357    let tmp = temp_sibling(target);
358    fs::write(&tmp, text.as_bytes())?;
359    fs::rename(&tmp, target)?;
360    Ok(())
361}
362
363/// Copies a regular file's bytes from `source` to `target` (mirrors the
364/// oracle's `cp -RL -n`: dereference, do not clobber — `target` is a fresh
365/// temp path so the no-clobber aspect is implicit).
366fn copy_file(source: &Path, target: &Path) -> Result<(), StoreError> {
367    fs::copy(source, target)?;
368    Ok(())
369}
370
/// Returns a temp path in the same directory as `target`, named
/// `<file name>.<pid>.<n>.tmp`.
///
/// `<n>` comes from a process-wide atomic counter, so successive calls in one
/// process never return the same path; staying in `target`'s directory keeps
/// a later rename on a single filesystem. A `target` without a file name
/// contributes an empty name, and one without a parent yields a bare relative
/// file name.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    let base = match target.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };
    let tmp_name = format!("{base}.{}.{sequence}.tmp", std::process::id());

    target
        .parent()
        .map_or_else(|| PathBuf::from(&tmp_name), |dir| dir.join(&tmp_name))
}
389
/// Removes at most one leading `./` and then at most one trailing `/` from a
/// manifest path, so the result can be joined onto a destination root.
///
/// The prefix shows up on relative-mode manifest paths, the suffix on
/// directory entries.
fn strip_leading_dot_slash(path: &str) -> &str {
    let without_prefix = match path.strip_prefix("./") {
        Some(rest) => rest,
        None => path,
    };
    match without_prefix.strip_suffix('/') {
        Some(head) => head,
        None => without_prefix,
    }
}
396
/// Turns a `store` URL or path into the directory it designates, following the
/// oracle's `_snapdir_file_store_get_store_dir`:
///
/// ```sh
/// store_dir="$(echo "$store" | sed -E 's|^file:/*(localhost/?)?|/|')"
/// echo "${store_dir%/}"
/// ```
///
/// A leading `file:` together with every slash after it, plus an optional
/// `localhost` and one optional slash after that, is replaced by a single `/`.
/// Input without the `file:` prefix is taken verbatim. Finally one trailing
/// slash is removed, except that a result of just `/` is kept as is.
fn parse_store_dir(store: &str) -> PathBuf {
    let mut dir = match store.strip_prefix("file:") {
        // No scheme: the string already is the path.
        None => store.to_owned(),
        Some(after_scheme) => {
            // `file:/*` — swallow the whole run of slashes after the scheme.
            let path = after_scheme.trim_start_matches('/');
            // `(localhost/?)?` — optional host segment and its optional slash.
            let path = path
                .strip_prefix("localhost")
                .map_or(path, |tail| tail.strip_prefix('/').unwrap_or(tail));
            // The substitution always leaves exactly one leading slash.
            let mut absolute = String::with_capacity(path.len() + 1);
            absolute.push('/');
            absolute.push_str(path);
            absolute
        }
    };

    // `${store_dir%/}` — remove one trailing slash, but never reduce "/" to "".
    if dir.len() > 1 && dir.ends_with('/') {
        dir.pop();
    }
    PathBuf::from(dir)
}
432
433#[cfg(test)]
434mod tests {
435    use super::*;
436    use snapdir_core::manifest::ManifestEntry;
437    use std::fs;
438    use std::path::Path;
439
440    // A tiny temp-dir helper so tests don't pull in a dev-dependency. Creates a
441    // unique directory under the system temp dir and removes it on drop.
442    struct TempDir {
443        path: PathBuf,
444    }
445
446    impl TempDir {
447        fn new(tag: &str) -> Self {
448            use std::sync::atomic::{AtomicU64, Ordering};
449            static COUNTER: AtomicU64 = AtomicU64::new(0);
450            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
451            let path = std::env::temp_dir().join(format!(
452                "snapdir-filestore-test-{}-{tag}-{n}",
453                std::process::id()
454            ));
455            fs::create_dir_all(&path).expect("create temp dir");
456            Self { path }
457        }
458
459        fn path(&self) -> &Path {
460            &self.path
461        }
462    }
463
464    impl Drop for TempDir {
465        fn drop(&mut self) {
466            let _ = fs::remove_dir_all(&self.path);
467        }
468    }
469
470    /// Builds a manifest for a source tree containing `foo` ("foo\n") and
471    /// `bar` ("bar\n") and writes those files into `source`. Returns the
472    /// manifest and its snapshot id. Checksums are the real BLAKE3 of the
473    /// file bytes so the store's verification passes.
474    fn make_foo_bar_source(source: &Path) -> (Manifest, String) {
475        let hasher = Blake3Hasher::new();
476        fs::write(source.join("foo"), b"foo\n").unwrap();
477        fs::write(source.join("bar"), b"bar\n").unwrap();
478        let foo_sum = hasher.hash_hex(b"foo\n");
479        let bar_sum = hasher.hash_hex(b"bar\n");
480
481        let root_sum =
482            snapdir_core::merkle::directory_checksum([foo_sum.as_str(), bar_sum.as_str()], &hasher);
483
484        let mut manifest = Manifest::new();
485        manifest.push(ManifestEntry::new(
486            PathType::Directory,
487            "700",
488            root_sum,
489            8,
490            "./",
491        ));
492        manifest.push(ManifestEntry::new(
493            PathType::File,
494            "600",
495            bar_sum,
496            4,
497            "./bar",
498        ));
499        manifest.push(ManifestEntry::new(
500            PathType::File,
501            "600",
502            foo_sum,
503            4,
504            "./foo",
505        ));
506        let manifest = Manifest::from_entries(manifest.entries().to_vec());
507        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
508        (manifest, id)
509    }
510
511    #[test]
512    fn file_store_parse_store_dir_matches_oracle_sed() {
513        // file:// + abs path -> abs path; trailing slash stripped.
514        assert_eq!(
515            parse_store_dir("file:///tmp/store"),
516            PathBuf::from("/tmp/store")
517        );
518        assert_eq!(
519            parse_store_dir("file:///tmp/store/"),
520            PathBuf::from("/tmp/store")
521        );
522        // localhost host segment dropped.
523        assert_eq!(
524            parse_store_dir("file://localhost/tmp/store"),
525            PathBuf::from("/tmp/store")
526        );
527        // file:// + abs path with two slashes.
528        assert_eq!(
529            parse_store_dir("file://tmp/store"),
530            PathBuf::from("/tmp/store")
531        );
532        // bare absolute path left intact.
533        assert_eq!(parse_store_dir("/tmp/store"), PathBuf::from("/tmp/store"));
534        // bare root preserved.
535        assert_eq!(parse_store_dir("file:///"), PathBuf::from("/"));
536    }
537
538    #[test]
539    fn file_store_push_lands_objects_at_sharded_keys_and_manifest_last() {
540        let store_dir = TempDir::new("store");
541        let src_dir = TempDir::new("src");
542        let (manifest, id) = make_foo_bar_source(src_dir.path());
543
544        let store = FileStore::from_root(store_dir.path());
545        store.push(&manifest, src_dir.path()).expect("push ok");
546
547        // Objects land at the exact sharded keys.
548        for entry in manifest.entries() {
549            if entry.path_type == PathType::File {
550                let obj = store_dir.path().join(object_path(&entry.checksum));
551                assert!(obj.exists(), "expected object at {}", obj.display());
552                // Content matches.
553                let bytes = fs::read(&obj).unwrap();
554                assert_eq!(
555                    Blake3Hasher::new().hash_hex(&bytes),
556                    entry.checksum,
557                    "object content must hash to its address"
558                );
559            }
560        }
561
562        // Manifest written at its sharded key, and hashes back to the id.
563        let man_path = store_dir.path().join(manifest_path(&id));
564        assert!(man_path.exists(), "manifest must exist after push");
565        let read_back = store.get_manifest(&id).expect("manifest reads back");
566        assert_eq!(read_back, manifest);
567    }
568
569    #[test]
570    fn file_store_push_skips_when_manifest_present() {
571        let store_dir = TempDir::new("store");
572        let src_dir = TempDir::new("src");
573        let (manifest, id) = make_foo_bar_source(src_dir.path());
574        let store = FileStore::from_root(store_dir.path());
575        store.push(&manifest, src_dir.path()).expect("first push");
576
577        // Remove an object but keep the manifest: a second push must skip
578        // entirely (manifest-present short-circuit), leaving the object gone.
579        let foo_entry = manifest
580            .entries()
581            .iter()
582            .find(|e| e.path == "./foo")
583            .unwrap();
584        let obj = store_dir.path().join(object_path(&foo_entry.checksum));
585        fs::remove_file(&obj).unwrap();
586
587        let _ = id;
588        store
589            .push(&manifest, src_dir.path())
590            .expect("second push skips");
591        assert!(
592            !obj.exists(),
593            "manifest-present push must be a full no-op (object stays removed)"
594        );
595    }
596
597    #[test]
598    fn file_store_push_skips_present_objects_but_adds_missing() {
599        let store_dir = TempDir::new("store");
600        let src_dir = TempDir::new("src");
601        let (manifest, id) = make_foo_bar_source(src_dir.path());
602        let store = FileStore::from_root(store_dir.path());
603        store.push(&manifest, src_dir.path()).expect("first push");
604
605        // Delete the manifest and one object; re-push must re-create the
606        // missing object (and the manifest) without erroring on the present one.
607        let man_path = store_dir.path().join(manifest_path(&id));
608        fs::remove_file(&man_path).unwrap();
609        let foo_entry = manifest
610            .entries()
611            .iter()
612            .find(|e| e.path == "./foo")
613            .unwrap();
614        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
615        fs::remove_file(&foo_obj).unwrap();
616
617        store.push(&manifest, src_dir.path()).expect("re-push");
618        assert!(foo_obj.exists(), "missing object must be re-added");
619        assert!(man_path.exists(), "manifest must be re-written");
620    }
621
622    #[test]
623    fn file_store_fetch_round_trips_and_verifies() {
624        let store_dir = TempDir::new("store");
625        let src_dir = TempDir::new("src");
626        let dest_dir = TempDir::new("dest");
627        let (manifest, id) = make_foo_bar_source(src_dir.path());
628        let store = FileStore::from_root(store_dir.path());
629        store.push(&manifest, src_dir.path()).expect("push");
630
631        let fetched = store.get_manifest(&id).expect("get manifest");
632        store
633            .fetch_files(&fetched, dest_dir.path())
634            .expect("fetch files");
635
636        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
637        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
638    }
639
640    #[test]
641    fn file_store_get_manifest_missing_is_not_found() {
642        let store_dir = TempDir::new("store");
643        let store = FileStore::from_root(store_dir.path());
644        let missing = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
645        match store.get_manifest(missing) {
646            Err(StoreError::ManifestNotFound { id }) => assert_eq!(id, missing),
647            other => panic!("expected ManifestNotFound, got {other:?}"),
648        }
649    }
650
651    #[test]
652    fn file_store_get_manifest_tampered_fails_integrity() {
653        let store_dir = TempDir::new("store");
654        let src_dir = TempDir::new("src");
655        let (manifest, id) = make_foo_bar_source(src_dir.path());
656        let store = FileStore::from_root(store_dir.path());
657        store.push(&manifest, src_dir.path()).expect("push");
658
659        // Tamper with the stored manifest bytes.
660        let man_path = store_dir.path().join(manifest_path(&id));
661        fs::write(&man_path, b"D 700 deadbeef 0 ./\n").unwrap();
662
663        match store.get_manifest(&id) {
664            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, id),
665            other => panic!("expected Integrity, got {other:?}"),
666        }
667    }
668
669    #[test]
670    fn file_store_fetch_missing_object_is_not_found() {
671        let store_dir = TempDir::new("store");
672        let dest_dir = TempDir::new("dest");
673        let hasher = Blake3Hasher::new();
674        let foo_sum = hasher.hash_hex(b"foo\n");
675
676        let mut manifest = Manifest::new();
677        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 4, "./"));
678        manifest.push(ManifestEntry::new(
679            PathType::File,
680            "600",
681            foo_sum.clone(),
682            4,
683            "./foo",
684        ));
685
686        let store = FileStore::from_root(store_dir.path());
687        match store.fetch_files(&manifest, dest_dir.path()) {
688            Err(StoreError::ObjectNotFound { checksum }) => assert_eq!(checksum, foo_sum),
689            other => panic!("expected ObjectNotFound, got {other:?}"),
690        }
691    }
692
693    #[test]
694    fn file_store_persist_rejects_corrupt_source() {
695        // A "source" object whose bytes do not match the claimed checksum must
696        // fail integrity (the oracle's "Invalid source checksum" path), not
697        // silently store corrupt data.
698        let store_dir = TempDir::new("store");
699        let src_dir = TempDir::new("src");
700        let dest_dir = TempDir::new("dest");
701        let hasher = Blake3Hasher::new();
702
703        // Real foo source/manifest, then corrupt the stored object so fetch's
704        // verify-on-copy trips and the source (the corrupt store object) fails.
705        let (manifest, id) = make_foo_bar_source(src_dir.path());
706        let store = FileStore::from_root(store_dir.path());
707        store.push(&manifest, src_dir.path()).expect("push");
708
709        let foo_entry = manifest
710            .entries()
711            .iter()
712            .find(|e| e.path == "./foo")
713            .unwrap();
714        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
715        fs::write(&foo_obj, b"corrupted not foo\n").unwrap();
716        // Sanity: the corrupted bytes really differ from the expected sum.
717        assert_ne!(hasher.hash_hex(b"corrupted not foo\n"), foo_entry.checksum);
718
719        let fetched = store.get_manifest(&id).expect("manifest still valid");
720        match store.fetch_files(&fetched, dest_dir.path()) {
721            Err(StoreError::Integrity { expected, .. }) => {
722                assert_eq!(expected, foo_entry.checksum);
723            }
724            other => panic!("expected Integrity from corrupt object, got {other:?}"),
725        }
726        // The corrupt object must NOT have been materialized at the dest.
727        assert!(!dest_dir.path().join("foo").exists());
728    }
729
730    #[test]
731    fn fetch_skip_present_verified() {
732        // Push a tree, fetch it (populating dest), then DELETE the store's whole
733        // `.objects` tree so any object read would now fail with ObjectNotFound.
734        // A second fetch into the SAME dest must still return Ok — proving every
735        // file was skipped via local checksum match (ZERO object reads).
736        let store_dir = TempDir::new("store");
737        let src_dir = TempDir::new("src");
738        let dest_dir = TempDir::new("dest");
739        let (manifest, id) = make_foo_bar_source(src_dir.path());
740
741        let store = FileStore::from_root(store_dir.path());
742        store.push(&manifest, src_dir.path()).expect("push");
743
744        let fetched = store.get_manifest(&id).expect("get manifest");
745        store
746            .fetch_files(&fetched, dest_dir.path())
747            .expect("first fetch populates dest");
748        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
749        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
750
751        // Nuke every object in the store. Any read of an object now fails.
752        let objects = store_dir.path().join(".objects");
753        fs::remove_dir_all(&objects).expect("remove .objects tree");
754        assert!(!objects.exists());
755
756        // Second fetch into the populated dest must succeed without reading a
757        // single (now-missing) object.
758        store
759            .fetch_files(&fetched, dest_dir.path())
760            .expect("second fetch skips every present+verified file (no object reads)");
761
762        // Dest contents intact.
763        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
764        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
765    }
766
767    #[test]
768    fn file_store_fetch_repairs_corrupt_dest_and_skips_intact() {
769        // With store objects present: corrupt one dest file. The corrupted file
770        // is re-fetched (repaired) to match its checksum again, while an
771        // unrelated already-correct dest file is still skipped.
772        let store_dir = TempDir::new("store");
773        let src_dir = TempDir::new("src");
774        let dest_dir = TempDir::new("dest");
775        let (manifest, id) = make_foo_bar_source(src_dir.path());
776
777        let store = FileStore::from_root(store_dir.path());
778        store.push(&manifest, src_dir.path()).expect("push");
779        let fetched = store.get_manifest(&id).expect("get manifest");
780        store
781            .fetch_files(&fetched, dest_dir.path())
782            .expect("first fetch populates dest");
783
784        // Corrupt `foo` in the dest; leave `bar` correct.
785        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
786        // Remove `bar`'s store object so it CANNOT be re-fetched; the only way a
787        // second fetch can succeed is if `bar` is skipped (present + verified).
788        let bar_entry = manifest
789            .entries()
790            .iter()
791            .find(|e| e.path == "./bar")
792            .unwrap();
793        let bar_obj = store_dir.path().join(object_path(&bar_entry.checksum));
794        fs::remove_file(&bar_obj).unwrap();
795
796        store
797            .fetch_files(&fetched, dest_dir.path())
798            .expect("repair corrupt foo, skip intact bar");
799
800        // foo repaired back to its checksummed content; bar untouched.
801        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
802        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
803    }
804
805    #[test]
806    fn file_store_fetch_mismatch_then_missing_object_errors() {
807        // Confirms the skip is checksum-gated, not mere existence: corrupt a
808        // dest file AND remove its store object → fetch cannot repair and errors
809        // ObjectNotFound (it did not blindly skip the present-but-wrong file).
810        let store_dir = TempDir::new("store");
811        let src_dir = TempDir::new("src");
812        let dest_dir = TempDir::new("dest");
813        let (manifest, id) = make_foo_bar_source(src_dir.path());
814
815        let store = FileStore::from_root(store_dir.path());
816        store.push(&manifest, src_dir.path()).expect("push");
817        let fetched = store.get_manifest(&id).expect("get manifest");
818        store
819            .fetch_files(&fetched, dest_dir.path())
820            .expect("first fetch populates dest");
821
822        let foo_entry = manifest
823            .entries()
824            .iter()
825            .find(|e| e.path == "./foo")
826            .unwrap();
827        // Corrupt the dest file so the skip gate fails for it...
828        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
829        // ...and remove its store object so it cannot be repaired.
830        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
831        fs::remove_file(&foo_obj).unwrap();
832
833        match store.fetch_files(&fetched, dest_dir.path()) {
834            Err(StoreError::ObjectNotFound { checksum }) => {
835                assert_eq!(checksum, foo_entry.checksum);
836            }
837            other => panic!("expected ObjectNotFound (cannot repair), got {other:?}"),
838        }
839    }
840
841    /// Builds a small nested tree under `source` (several files across nested
842    /// directories) and returns its manifest + snapshot id, with real BLAKE3
843    /// checksums so store verification passes. Layout:
844    ///
845    /// ```text
846    /// ./a.txt            "a contents\n"
847    /// ./b.txt            "b contents\n"
848    /// ./sub/             (dir)
849    /// ./sub/c.txt        "c contents\n"
850    /// ./sub/deep/        (dir)
851    /// ./sub/deep/d.txt   "d contents\n"
852    /// ```
853    fn make_nested_source(source: &Path) -> (Manifest, String) {
854        let hasher = Blake3Hasher::new();
855        let files: &[(&str, &[u8])] = &[
856            ("a.txt", b"a contents\n"),
857            ("b.txt", b"b contents\n"),
858            ("sub/c.txt", b"c contents\n"),
859            ("sub/deep/d.txt", b"d contents\n"),
860        ];
861
862        fs::create_dir_all(source.join("sub/deep")).unwrap();
863        for (rel, bytes) in files {
864            fs::write(source.join(rel), bytes).unwrap();
865        }
866
867        let mut manifest = Manifest::new();
868        // Directory entries first; their checksums/sizes are not verified on
869        // fetch (only files are content-addressed), so placeholder values are
870        // fine for re-materialization. The snapshot id derivation in core hashes
871        // the rendered text regardless, and we round-trip through it below.
872        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
873        manifest.push(ManifestEntry::new(
874            PathType::Directory,
875            "700",
876            "x",
877            0,
878            "./sub/",
879        ));
880        manifest.push(ManifestEntry::new(
881            PathType::Directory,
882            "700",
883            "x",
884            0,
885            "./sub/deep/",
886        ));
887        for (rel, bytes) in files {
888            let sum = hasher.hash_hex(bytes);
889            #[allow(clippy::cast_possible_truncation)]
890            manifest.push(ManifestEntry::new(
891                PathType::File,
892                "600",
893                sum,
894                bytes.len() as u64,
895                format!("./{rel}"),
896            ));
897        }
898
899        let manifest = Manifest::from_entries(manifest.entries().to_vec());
900        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
901        (manifest, id)
902    }
903
904    /// Asserts the four nested files re-materialized byte-identically at `dest`.
905    fn assert_nested_dest(dest: &Path) {
906        assert_eq!(fs::read(dest.join("a.txt")).unwrap(), b"a contents\n");
907        assert_eq!(fs::read(dest.join("b.txt")).unwrap(), b"b contents\n");
908        assert_eq!(fs::read(dest.join("sub/c.txt")).unwrap(), b"c contents\n");
909        assert_eq!(
910            fs::read(dest.join("sub/deep/d.txt")).unwrap(),
911            b"d contents\n"
912        );
913    }
914
915    #[test]
916    fn filestore_parallel_roundtrip_byte_identical() {
917        // A multi-threaded (concurrency=4) push+fetch round-trip of a nested
918        // tree must re-materialize byte-identically, and a sequential
919        // (concurrency=1) run must produce the identical store + dest.
920        let src_dir = TempDir::new("src");
921        let (manifest, id) = make_nested_source(src_dir.path());
922
923        // Parallel run.
924        let par_store_dir = TempDir::new("store-par");
925        let par_dest_dir = TempDir::new("dest-par");
926        let par_store =
927            FileStore::from_root_with_config(par_store_dir.path(), TransferConfig::new(4, None));
928        par_store.push(&manifest, src_dir.path()).expect("par push");
929        let par_manifest = par_store.get_manifest(&id).expect("par get manifest");
930        assert_eq!(par_manifest, manifest, "round-tripped manifest matches");
931        par_store
932            .fetch_files(&par_manifest, par_dest_dir.path())
933            .expect("par fetch");
934        assert_nested_dest(par_dest_dir.path());
935
936        // Sequential run into a fresh store/dest.
937        let seq_store_dir = TempDir::new("store-seq");
938        let seq_dest_dir = TempDir::new("dest-seq");
939        let seq_store =
940            FileStore::from_root_with_config(seq_store_dir.path(), TransferConfig::new(1, None));
941        seq_store.push(&manifest, src_dir.path()).expect("seq push");
942        let seq_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
943        assert_eq!(seq_id, id, "snapshot id is concurrency-independent");
944        seq_store
945            .fetch_files(&manifest, seq_dest_dir.path())
946            .expect("seq fetch");
947        assert_nested_dest(seq_dest_dir.path());
948
949        // Both stores landed every object at the identical sharded key with
950        // identical bytes.
951        for entry in manifest.entries() {
952            if entry.path_type != PathType::File {
953                continue;
954            }
955            let key = object_path(&entry.checksum);
956            let par_obj = par_store_dir.path().join(&key);
957            let seq_obj = seq_store_dir.path().join(&key);
958            assert!(par_obj.exists(), "par object {key} present");
959            assert!(seq_obj.exists(), "seq object {key} present");
960            assert_eq!(
961                fs::read(&par_obj).unwrap(),
962                fs::read(&seq_obj).unwrap(),
963                "par and seq object bytes identical"
964            );
965        }
966    }
967
968    #[test]
969    fn filestore_parallel_concurrency_one_sequential() {
970        // The concurrency=1 (single-thread pool) path is a correct sequential
971        // copy: round-trips byte-identically.
972        let store_dir = TempDir::new("store");
973        let src_dir = TempDir::new("src");
974        let dest_dir = TempDir::new("dest");
975        let (manifest, id) = make_nested_source(src_dir.path());
976
977        let store =
978            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(1, None));
979        store.push(&manifest, src_dir.path()).expect("push");
980        let fetched = store.get_manifest(&id).expect("get manifest");
981        store.fetch_files(&fetched, dest_dir.path()).expect("fetch");
982        assert_nested_dest(dest_dir.path());
983    }
984
985    #[test]
986    fn filestore_parallel_all_or_nothing_bad_object() {
987        // A source file whose bytes do not match its manifest checksum must make
988        // push fail with `Integrity` AND write NO manifest (all-or-nothing:
989        // manifest is written only after every parallel object copy succeeds).
990        let store_dir = TempDir::new("store");
991        let src_dir = TempDir::new("src");
992        let (manifest, id) = make_nested_source(src_dir.path());
993
994        // Corrupt one source file so its bytes no longer hash to the manifest
995        // checksum; persist's source-verify trips and push returns Integrity.
996        fs::write(src_dir.path().join("sub/c.txt"), b"TAMPERED\n").unwrap();
997
998        let store =
999            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
1000        match store.push(&manifest, src_dir.path()) {
1001            Err(StoreError::Integrity { .. }) => {}
1002            other => panic!("expected Integrity from bad source object, got {other:?}"),
1003        }
1004
1005        // ALL-OR-NOTHING: the manifest must NOT have been written.
1006        let man_path = store.manifest_disk_path(&id);
1007        assert!(
1008            !man_path.exists(),
1009            "manifest must not be written when an object copy fails"
1010        );
1011    }
1012
1013    #[test]
1014    fn filestore_parallel_large_n_round_trips() {
1015        // Exercise the concurrency bound with N >> concurrency files.
1016        let store_dir = TempDir::new("store");
1017        let src_dir = TempDir::new("src");
1018        let dest_dir = TempDir::new("dest");
1019        let hasher = Blake3Hasher::new();
1020
1021        let mut manifest = Manifest::new();
1022        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
1023        let n = 50usize;
1024        for i in 0..n {
1025            let name = format!("file-{i:03}.txt");
1026            let contents = format!("contents of file {i}\n");
1027            fs::write(src_dir.path().join(&name), contents.as_bytes()).unwrap();
1028            let sum = hasher.hash_hex(contents.as_bytes());
1029            #[allow(clippy::cast_possible_truncation)]
1030            manifest.push(ManifestEntry::new(
1031                PathType::File,
1032                "600",
1033                sum,
1034                contents.len() as u64,
1035                format!("./{name}"),
1036            ));
1037        }
1038        let manifest = Manifest::from_entries(manifest.entries().to_vec());
1039        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
1040
1041        let store =
1042            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
1043        store.push(&manifest, src_dir.path()).expect("push N files");
1044        let fetched = store.get_manifest(&id).expect("get manifest");
1045        store
1046            .fetch_files(&fetched, dest_dir.path())
1047            .expect("fetch N files");
1048
1049        for i in 0..n {
1050            let name = format!("file-{i:03}.txt");
1051            let expected = format!("contents of file {i}\n");
1052            assert_eq!(
1053                fs::read(dest_dir.path().join(&name)).unwrap(),
1054                expected.as_bytes()
1055            );
1056        }
1057    }
1058
1059    #[test]
1060    fn file_store_strip_leading_dot_slash() {
1061        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
1062        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
1063        assert_eq!(strip_leading_dot_slash("./a/"), "a");
1064        assert_eq!(strip_leading_dot_slash("./"), "");
1065        assert_eq!(strip_leading_dot_slash("/abs/path"), "/abs/path");
1066    }
1067}