Skip to main content

snapdir_stores/
file_store.rs

1//! `FileStore`: the `file://` storage backend.
2//!
3//! A [`FileStore`] is rooted at a local directory and holds the frozen
4//! content-addressable `.objects`/`.manifests` sharded layout, so a store
5//! directory written by any conforming implementation is interchangeable:
6//!
7//! ```text
8//! <root>/.objects/<sharded checksum>     raw file bytes
9//! <root>/.manifests/<sharded snapshot id> manifest text
10//! ```
11//!
12//! Sharding and the on-disk paths come straight from [`snapdir_core::store`]
13//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
14//!
15//! # Oracle parity
16//!
17//! - **`new` / URL parsing** mirrors `_snapdir_file_store_get_store_dir`:
18//!   strips a leading `file://`, `file:///`, `file://localhost/` (etc.) prefix
19//!   down to an absolute path and drops a trailing slash.
20//! - **`push`** mirrors `snapdir_file_store_get_push_command` +
21//!   `_snapdir_file_store_persit`: it is a no-op if the manifest already exists
22//!   (skip-if-present); otherwise it writes every referenced object that is
23//!   absent (skip-if-present per object) *before* writing the manifest, so a
24//!   present manifest always implies all of its objects are present.
25//! - **`fetch_files` / `get_manifest`** mirror the fetch side of
26//!   `_snapdir_file_store_persit`: copy to a temp path, verify the content
27//!   BLAKE3 against the expected checksum, retry up to five times, then
28//!   atomically rename into place.
29//!
30//! All I/O is native in-process filesystem I/O; nothing shells out.
31
32use std::fs;
33use std::io;
34use std::path::{Path, PathBuf};
35
36use std::sync::Arc;
37
38use snapdir_core::manifest::{Manifest, PathType};
39use snapdir_core::merkle::{Blake3Hasher, Hasher};
40use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
41use snapdir_core::Meter;
42
43use crate::stream::StreamStore;
44use crate::transfer::TransferConfig;
45use crate::util::{file_present_and_verified, hash_file};
46
/// Attempt budget for a persist whose copied bytes fail their checksum while
/// the source still verifies (the oracle's `_SNAPDIR_FILE_STORE_RETRIES`).
///
/// [`persist`] starts its countdown at this value and gives up when it
/// reaches zero, so it performs at most this many copy attempts in total.
const MAX_PERSIST_RETRIES: u32 = 5;
50
/// A content-addressable store backed by a local directory (the `file://`
/// backend).
///
/// Construct one with [`FileStore::new`] (parsing a `file://` URL or a bare
/// path) or [`FileStore::from_root`] (an already-resolved directory).
#[derive(Debug, Clone)]
pub struct FileStore {
    /// Directory that the sharded object / manifest paths are joined onto.
    root: PathBuf,
    /// Transfer settings; its `concurrency` sizes the copy thread pool.
    config: TransferConfig,
    /// Optional progress meter; recorded into during transfers. `None` (the
    /// default from every constructor) means zero recording and byte-identical
    /// behavior. Set by the CLI via [`FileStore::with_meter`].
    meter: Option<Arc<Meter>>,
}
65
66impl FileStore {
67    /// Builds a store from a `store` URL or path, matching the oracle's
68    /// `_snapdir_file_store_get_store_dir`.
69    ///
70    /// Accepts `file:///abs/path`, `file://localhost/abs/path`, `file://`
71    /// followed by an absolute path, or a bare absolute path. A leading
72    /// `file:` scheme (with any number of slashes, optionally `localhost`) is
73    /// rewritten to a single leading `/`, and a trailing slash is dropped.
74    #[must_use]
75    pub fn new(store: &str) -> Self {
76        Self::from_root(parse_store_dir(store))
77    }
78
79    /// Like [`new`](Self::new), but carries a [`TransferConfig`] for
80    /// concurrency / bandwidth control.
81    #[must_use]
82    pub fn new_with_config(store: &str, config: TransferConfig) -> Self {
83        Self::from_root_with_config(parse_store_dir(store), config)
84    }
85
86    /// Builds a store rooted at an already-resolved directory.
87    #[must_use]
88    pub fn from_root(root: impl Into<PathBuf>) -> Self {
89        Self::from_root_with_config(root, TransferConfig::default())
90    }
91
92    /// Like [`from_root`](Self::from_root), but carries a [`TransferConfig`] for
93    /// concurrency / bandwidth control. [`from_root`](Self::from_root) and
94    /// [`new`](Self::new) delegate here with [`TransferConfig::default`].
95    #[must_use]
96    pub fn from_root_with_config(root: impl Into<PathBuf>, config: TransferConfig) -> Self {
97        Self {
98            root: root.into(),
99            config,
100            meter: None,
101        }
102    }
103
104    /// Attaches (or clears) an optional progress [`Meter`], rides alongside
105    /// [`config`](Self::transfer_config). The copy paths record bytes-in /
106    /// bytes-out + per-object progress into it; `None` (the constructor default)
107    /// means zero recording and byte-identical behavior. The CLI sets this after
108    /// construction.
109    #[must_use]
110    pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
111        self.meter = meter;
112        self
113    }
114
115    /// Returns the store's root directory.
116    #[must_use]
117    pub fn root(&self) -> &Path {
118        &self.root
119    }
120
121    /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
122    /// with. Consumed by the transfer loops in later gates.
123    #[must_use]
124    pub fn transfer_config(&self) -> &TransferConfig {
125        &self.config
126    }
127
128    /// Absolute on-disk path of an object given its checksum.
129    fn object_disk_path(&self, checksum: &str) -> PathBuf {
130        self.root.join(object_path(checksum))
131    }
132
133    /// Absolute on-disk path of a manifest given its snapshot id.
134    fn manifest_disk_path(&self, id: &str) -> PathBuf {
135        self.root.join(manifest_path(id))
136    }
137
138    /// Copies a batch of `(source, target, expected_checksum)` jobs through
139    /// [`persist`] across a thread pool bounded by `self.config.concurrency`.
140    ///
141    /// Local copies have no network bandwidth concern, so the async
142    /// rate-limited transfer driver does not apply here — only the concurrency
143    /// cap. The first [`StoreError`] is propagated and stops scheduling further
144    /// work (`try_for_each`). A `concurrency` of 1 yields a single-threaded
145    /// sequential copy. Each task uses a fresh, cheap, stateless
146    /// [`Blake3Hasher`] to sidestep any `Sync` concern.
147    fn parallel_copy(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
148        use rayon::prelude::*;
149
150        if jobs.is_empty() {
151            return Ok(());
152        }
153
154        // `&Meter` is `Sync`, so it is shared across the rayon closures. `None`
155        // means zero recording and byte-identical behavior.
156        let meter = self.meter.as_deref();
157
158        let pool = rayon::ThreadPoolBuilder::new()
159            .num_threads(self.config.concurrency.get())
160            .build()
161            .map_err(|err| StoreError::Backend {
162                message: "failed to build copy thread pool".to_owned(),
163                source: Some(Box::new(err)),
164            })?;
165
166        pool.install(|| {
167            jobs.par_iter().try_for_each(|(source, target, expected)| {
168                if let Some(m) = meter {
169                    m.object_started();
170                }
171                // `persist` reads `source` and writes `target`. Record the
172                // source size as both bytes-in (read) and bytes-out (written);
173                // a missing source surfaces as the persist error below.
174                let len = std::fs::metadata(source).map_or(0, |md| md.len());
175                persist(source, target, expected, &Blake3Hasher::new())?;
176                if let Some(m) = meter {
177                    m.add_in(len);
178                    m.add_out(len);
179                    m.object_finished();
180                }
181                Ok(())
182            })
183        })
184    }
185}
186
187impl Store for FileStore {
188    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
189        let path = self.manifest_disk_path(id);
190        let bytes = match fs::read(&path) {
191            Ok(bytes) => bytes,
192            Err(err) if err.kind() == io::ErrorKind::NotFound => {
193                return Err(StoreError::ManifestNotFound { id: id.to_owned() });
194            }
195            Err(err) => return Err(StoreError::Io(err)),
196        };
197
198        // The snapshot id is BLAKE3 of the comment-stripped manifest text with
199        // the oracle's trailing `echo` newline. Verify the stored bytes hash
200        // back to `id` before trusting them (oracle: the manifest id check on
201        // fetch). `snapshot_id` in core re-renders + re-hashes the parsed
202        // manifest, so parse first, then verify against the parsed form.
203        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
204            message: format!("manifest {id} is not valid UTF-8"),
205            source: Some(Box::new(err)),
206        })?;
207        let manifest = Manifest::parse(&text)?;
208
209        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
210        if actual != id {
211            return Err(StoreError::Integrity {
212                address: manifest_path(id),
213                expected: id.to_owned(),
214                actual,
215            });
216        }
217
218        Ok(manifest)
219    }
220
221    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
222        let hasher = Blake3Hasher::new();
223
224        // First, SEQUENTIAL pass: materialize every directory and pre-create
225        // each file's parent (so the parallel copies below never race on
226        // `create_dir_all` of the same ancestor), short-circuit files that are
227        // already present-and-verified (skip-if-present-and-verified — no object
228        // read at all, so a populated dest succeeds even if the store object is
229        // gone), and confirm the source object exists for the rest (preserving
230        // the `ObjectNotFound` error when a needed source is missing). The file
231        // entries that actually need copying are collected as `(source, target,
232        // checksum)` jobs for the parallel phase.
233        let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
234        for entry in manifest.entries() {
235            let rel = strip_leading_dot_slash(&entry.path);
236            let target = dest.join(rel);
237            match entry.path_type {
238                PathType::Directory => {
239                    fs::create_dir_all(&target)?;
240                }
241                PathType::File => {
242                    // A destination file that already exists and whose content
243                    // hashes to the manifest's checksum needs no copy. A
244                    // mismatching/corrupt local file falls through and is
245                    // repaired by the persist below.
246                    if file_present_and_verified(&target, &entry.checksum, &hasher) {
247                        // Skip-present: record it as skipped (advisory only).
248                        if let Some(m) = self.meter.as_deref() {
249                            m.add_skipped(1);
250                        }
251                        continue;
252                    }
253                    if let Some(parent) = target.parent() {
254                        fs::create_dir_all(parent)?;
255                    }
256                    let source = self.object_disk_path(&entry.checksum);
257                    if !source.exists() {
258                        return Err(StoreError::ObjectNotFound {
259                            checksum: entry.checksum.clone(),
260                        });
261                    }
262                    jobs.push((source, target, entry.checksum.clone()));
263                }
264            }
265        }
266
267        // Total to copy (bytes over the to-copy set), recorded so the bar can
268        // track bytes. Advisory: no effect on what is copied. No-op w/o meter.
269        if let Some(m) = self.meter.as_deref() {
270            let total: u64 = jobs
271                .iter()
272                .map(|(source, _, _)| fs::metadata(source).map_or(0, |md| md.len()))
273                .sum();
274            m.set_total(total);
275        }
276
277        // Parallel copy phase, bounded by `config.concurrency`. `try_for_each`
278        // propagates the first `StoreError` and stops scheduling new work. Each
279        // task uses a fresh, cheap, stateless `Blake3Hasher`.
280        self.parallel_copy(&jobs)
281    }
282
283    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
284        // Compute the snapshot id of the manifest we are about to push so we
285        // can locate (and skip-if-present) its manifest file.
286        let hasher = Blake3Hasher::new();
287        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
288        let manifest_target = self.manifest_disk_path(&id);
289
290        // Skip-if-present: nothing to do when the manifest already exists. A
291        // present manifest implies all its objects are present (we maintain
292        // that invariant by writing the manifest last).
293        if manifest_target.exists() {
294            return Ok(());
295        }
296
297        // Collect every referenced object that is absent (skip-if-present per
298        // object: an object already filed under its content address is trusted,
299        // it is content-addressable). These are copied BEFORE the manifest.
300        let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
301        for entry in manifest.entries() {
302            if entry.path_type != PathType::File {
303                continue;
304            }
305            let object_target = self.object_disk_path(&entry.checksum);
306            if object_target.exists() {
307                // Skip-present per object: record it as skipped (advisory only).
308                if let Some(m) = self.meter.as_deref() {
309                    m.add_skipped(1);
310                }
311                continue;
312            }
313            let rel = strip_leading_dot_slash(&entry.path);
314            let object_source = source.join(rel);
315            jobs.push((object_source, object_target, entry.checksum.clone()));
316        }
317
318        // Total to push (bytes over the to-push set), recorded so the bar can
319        // track bytes. Advisory: no effect on what is pushed. No-op w/o meter.
320        if let Some(m) = self.meter.as_deref() {
321            let total: u64 = jobs
322                .iter()
323                .map(|(src, _, _)| fs::metadata(src).map_or(0, |md| md.len()))
324                .sum();
325            m.set_total(total);
326        }
327
328        // Parallel copy phase, bounded by `config.concurrency`. ALL-OR-NOTHING:
329        // any error returns immediately and NO manifest is written; the
330        // manifest is written only after every object copy succeeds, preserving
331        // the invariant that a present manifest implies present objects.
332        self.parallel_copy(&jobs)?;
333
334        // Write the manifest last, via the same verify/retry/atomic-rename
335        // path, so a present manifest always implies present objects.
336        write_manifest(manifest, &manifest_target, &id, &hasher)?;
337        Ok(())
338    }
339}
340
341impl StreamStore for FileStore {
342    fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
343        Ok(self.object_disk_path(checksum).exists())
344    }
345
346    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
347        let path = self.object_disk_path(checksum);
348        let bytes = match fs::read(&path) {
349            Ok(bytes) => bytes,
350            Err(err) if err.kind() == io::ErrorKind::NotFound => {
351                return Err(StoreError::ObjectNotFound {
352                    checksum: checksum.to_owned(),
353                });
354            }
355            Err(err) => return Err(StoreError::Io(err)),
356        };
357
358        // Verify the stored blob hashes back to its content-address before
359        // returning it — corruption must surface as `Integrity`, never as bad
360        // bytes handed to a store-to-store copy.
361        let actual = Blake3Hasher::new().hash_hex(&bytes);
362        if actual != checksum {
363            return Err(StoreError::Integrity {
364                address: path.display().to_string(),
365                expected: checksum.to_owned(),
366                actual,
367            });
368        }
369        Ok(bytes)
370    }
371
372    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
373        // Verify BEFORE writing: a blob whose bytes do not hash to `checksum`
374        // must never land at that content-address (nothing is stored).
375        let actual = Blake3Hasher::new().hash_hex(&bytes);
376        if actual != checksum {
377            return Err(StoreError::Integrity {
378                address: object_path(checksum),
379                expected: checksum.to_owned(),
380                actual,
381            });
382        }
383
384        let target = self.object_disk_path(checksum);
385        if let Some(parent) = target.parent() {
386            fs::create_dir_all(parent)?;
387        }
388        // Temp sibling + atomic rename, the same write discipline `persist`
389        // uses, so a partially-written object is never visible at its address.
390        let tmp = temp_sibling(&target);
391        fs::write(&tmp, &bytes)?;
392        fs::rename(&tmp, &target)?;
393        Ok(())
394    }
395
396    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
397        write_manifest(
398            manifest,
399            &self.manifest_disk_path(id),
400            id,
401            &Blake3Hasher::new(),
402        )
403    }
404}
405
406/// Copies `source` to `target`, verifying the content BLAKE3 against
407/// `expected`, retrying up to [`MAX_PERSIST_RETRIES`] times, then atomically
408/// renaming into place. Mirrors `_snapdir_file_store_persit`.
409fn persist(
410    source: &Path,
411    target: &Path,
412    expected: &str,
413    hasher: &impl Hasher,
414) -> Result<(), StoreError> {
415    if let Some(parent) = target.parent() {
416        fs::create_dir_all(parent)?;
417    }
418
419    let mut attempts_left = MAX_PERSIST_RETRIES;
420    loop {
421        // Copy to a unique temp path beside the target so the final rename is
422        // an atomic, same-filesystem move (the oracle's `.tmp` discipline).
423        let tmp = temp_sibling(target);
424        copy_file(source, &tmp)?;
425
426        let actual = hash_file(&tmp, hasher)?;
427        if actual == expected {
428            // Atomic rename into the final content-addressed location.
429            fs::rename(&tmp, target)?;
430            return Ok(());
431        }
432
433        // Copied bytes did not verify. Clean up the temp file and decide
434        // whether to retry: the oracle only retries when the *source* still
435        // hashes to the expected value, otherwise the source itself is bad.
436        let _ = fs::remove_file(&tmp);
437        let source_actual = hash_file(source, hasher)?;
438        if source_actual != expected {
439            return Err(StoreError::Integrity {
440                address: source.display().to_string(),
441                expected: expected.to_owned(),
442                actual: source_actual,
443            });
444        }
445
446        attempts_left = attempts_left.saturating_sub(1);
447        if attempts_left == 0 {
448            return Err(StoreError::Integrity {
449                address: target.display().to_string(),
450                expected: expected.to_owned(),
451                actual,
452            });
453        }
454    }
455}
456
457/// Writes a manifest's text to `target`, verifying it hashes to `id`, then
458/// atomically renaming into place. The manifest's "content" is the
459/// snapshot-id-bearing text (`Display` + trailing newline), so we verify with
460/// [`snapdir_core::merkle::snapshot_id`] rather than a raw byte hash.
461fn write_manifest(
462    manifest: &Manifest,
463    target: &Path,
464    id: &str,
465    hasher: &impl Hasher,
466) -> Result<(), StoreError> {
467    if let Some(parent) = target.parent() {
468        fs::create_dir_all(parent)?;
469    }
470
471    // The on-disk manifest must hash (snapshot_id) back to `id`. Render once
472    // and confirm before writing.
473    let actual = snapdir_core::merkle::snapshot_id(manifest, hasher);
474    if actual != id {
475        return Err(StoreError::Integrity {
476            address: target.display().to_string(),
477            expected: id.to_owned(),
478            actual,
479        });
480    }
481
482    // Oracle stores `echo "${manifest}"` — the manifest text plus a single
483    // trailing newline (the same bytes snapshot_id hashes).
484    let mut text = manifest.to_string();
485    text.push('\n');
486
487    let tmp = temp_sibling(target);
488    fs::write(&tmp, text.as_bytes())?;
489    fs::rename(&tmp, target)?;
490    Ok(())
491}
492
493/// Copies a regular file's bytes from `source` to `target` (mirrors the
494/// oracle's `cp -RL -n`: dereference, do not clobber — `target` is a fresh
495/// temp path so the no-clobber aspect is implicit).
496fn copy_file(source: &Path, target: &Path) -> Result<(), StoreError> {
497    fs::copy(source, target)?;
498    Ok(())
499}
500
/// Returns a unique temp path next to `target` (same directory, so the final
/// rename never crosses filesystems). The name is
/// `<file name>.<pid>.<counter>.tmp`, where the counter is process-wide and
/// increases on every call, so concurrent persists in one process never pick
/// the same path. A `target` without a parent yields a bare file name.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    let base = match target.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };
    let tmp_name = format!("{base}.{}.{sequence}.tmp", std::process::id());

    target
        .parent()
        .map_or_else(|| PathBuf::from(&tmp_name), |dir| dir.join(&tmp_name))
}
519
/// Removes one leading `./` (relative-mode manifest paths) and then one
/// trailing `/` (directory entries), leaving a remainder that can be joined
/// onto a destination root. Anything else is returned unchanged.
fn strip_leading_dot_slash(path: &str) -> &str {
    let without_prefix = match path.strip_prefix("./") {
        Some(rest) => rest,
        None => path,
    };
    match without_prefix.strip_suffix('/') {
        Some(rest) => rest,
        None => without_prefix,
    }
}
526
/// Turns a `store` URL/path into its on-disk directory, following the oracle's
/// `_snapdir_file_store_get_store_dir`:
///
/// ```sh
/// store_dir="$(echo "$store" | sed -E 's|^file:/*(localhost/?)?|/|')"
/// echo "${store_dir%/}"
/// ```
///
/// A leading `file:` plus any run of `/` (optionally followed by `localhost`
/// and one optional `/`) becomes a single `/`; input without the scheme is
/// taken as-is. Finally one trailing slash is removed, except that a result
/// of just `/` is kept.
fn parse_store_dir(store: &str) -> PathBuf {
    let resolved = match store.strip_prefix("file:") {
        // No scheme: the input already is the path.
        None => store.to_owned(),
        Some(after_scheme) => {
            // `/*` in the regex: swallow every slash following the scheme.
            let without_slashes = after_scheme.trim_start_matches('/');
            // `(localhost/?)?`: swallow the host and at most one slash.
            let path_part = match without_slashes.strip_prefix("localhost") {
                Some(after_host) => after_host.strip_prefix('/').unwrap_or(after_host),
                None => without_slashes,
            };
            // The replacement text is always a single leading `/`.
            let mut absolute = String::with_capacity(path_part.len() + 1);
            absolute.push('/');
            absolute.push_str(path_part);
            absolute
        }
    };

    // `${store_dir%/}`: drop one trailing slash, but never reduce "/" itself.
    let kept = match resolved.strip_suffix('/') {
        Some(shorter) if resolved.len() > 1 => shorter,
        _ => resolved.as_str(),
    };
    PathBuf::from(kept)
}
562
563#[cfg(test)]
564mod tests {
565    use super::*;
566    use snapdir_core::manifest::ManifestEntry;
567    use std::fs;
568    use std::path::Path;
569
570    // A tiny temp-dir helper so tests don't pull in a dev-dependency. Creates a
571    // unique directory under the system temp dir and removes it on drop.
572    struct TempDir {
573        path: PathBuf,
574    }
575
576    impl TempDir {
577        fn new(tag: &str) -> Self {
578            use std::sync::atomic::{AtomicU64, Ordering};
579            static COUNTER: AtomicU64 = AtomicU64::new(0);
580            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
581            let path = std::env::temp_dir().join(format!(
582                "snapdir-filestore-test-{}-{tag}-{n}",
583                std::process::id()
584            ));
585            fs::create_dir_all(&path).expect("create temp dir");
586            Self { path }
587        }
588
589        fn path(&self) -> &Path {
590            &self.path
591        }
592    }
593
594    impl Drop for TempDir {
595        fn drop(&mut self) {
596            let _ = fs::remove_dir_all(&self.path);
597        }
598    }
599
600    /// Builds a manifest for a source tree containing `foo` ("foo\n") and
601    /// `bar` ("bar\n") and writes those files into `source`. Returns the
602    /// manifest and its snapshot id. Checksums are the real BLAKE3 of the
603    /// file bytes so the store's verification passes.
604    fn make_foo_bar_source(source: &Path) -> (Manifest, String) {
605        let hasher = Blake3Hasher::new();
606        fs::write(source.join("foo"), b"foo\n").unwrap();
607        fs::write(source.join("bar"), b"bar\n").unwrap();
608        let foo_sum = hasher.hash_hex(b"foo\n");
609        let bar_sum = hasher.hash_hex(b"bar\n");
610
611        let root_sum =
612            snapdir_core::merkle::directory_checksum([foo_sum.as_str(), bar_sum.as_str()], &hasher);
613
614        let mut manifest = Manifest::new();
615        manifest.push(ManifestEntry::new(
616            PathType::Directory,
617            "700",
618            root_sum,
619            8,
620            "./",
621        ));
622        manifest.push(ManifestEntry::new(
623            PathType::File,
624            "600",
625            bar_sum,
626            4,
627            "./bar",
628        ));
629        manifest.push(ManifestEntry::new(
630            PathType::File,
631            "600",
632            foo_sum,
633            4,
634            "./foo",
635        ));
636        let manifest = Manifest::from_entries(manifest.entries().to_vec());
637        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
638        (manifest, id)
639    }
640
641    #[test]
642    fn file_store_parse_store_dir_matches_oracle_sed() {
643        // file:// + abs path -> abs path; trailing slash stripped.
644        assert_eq!(
645            parse_store_dir("file:///tmp/store"),
646            PathBuf::from("/tmp/store")
647        );
648        assert_eq!(
649            parse_store_dir("file:///tmp/store/"),
650            PathBuf::from("/tmp/store")
651        );
652        // localhost host segment dropped.
653        assert_eq!(
654            parse_store_dir("file://localhost/tmp/store"),
655            PathBuf::from("/tmp/store")
656        );
657        // file:// + abs path with two slashes.
658        assert_eq!(
659            parse_store_dir("file://tmp/store"),
660            PathBuf::from("/tmp/store")
661        );
662        // bare absolute path left intact.
663        assert_eq!(parse_store_dir("/tmp/store"), PathBuf::from("/tmp/store"));
664        // bare root preserved.
665        assert_eq!(parse_store_dir("file:///"), PathBuf::from("/"));
666    }
667
668    #[test]
669    fn file_store_push_lands_objects_at_sharded_keys_and_manifest_last() {
670        let store_dir = TempDir::new("store");
671        let src_dir = TempDir::new("src");
672        let (manifest, id) = make_foo_bar_source(src_dir.path());
673
674        let store = FileStore::from_root(store_dir.path());
675        store.push(&manifest, src_dir.path()).expect("push ok");
676
677        // Objects land at the exact sharded keys.
678        for entry in manifest.entries() {
679            if entry.path_type == PathType::File {
680                let obj = store_dir.path().join(object_path(&entry.checksum));
681                assert!(obj.exists(), "expected object at {}", obj.display());
682                // Content matches.
683                let bytes = fs::read(&obj).unwrap();
684                assert_eq!(
685                    Blake3Hasher::new().hash_hex(&bytes),
686                    entry.checksum,
687                    "object content must hash to its address"
688                );
689            }
690        }
691
692        // Manifest written at its sharded key, and hashes back to the id.
693        let man_path = store_dir.path().join(manifest_path(&id));
694        assert!(man_path.exists(), "manifest must exist after push");
695        let read_back = store.get_manifest(&id).expect("manifest reads back");
696        assert_eq!(read_back, manifest);
697    }
698
699    #[test]
700    fn file_store_push_skips_when_manifest_present() {
701        let store_dir = TempDir::new("store");
702        let src_dir = TempDir::new("src");
703        let (manifest, id) = make_foo_bar_source(src_dir.path());
704        let store = FileStore::from_root(store_dir.path());
705        store.push(&manifest, src_dir.path()).expect("first push");
706
707        // Remove an object but keep the manifest: a second push must skip
708        // entirely (manifest-present short-circuit), leaving the object gone.
709        let foo_entry = manifest
710            .entries()
711            .iter()
712            .find(|e| e.path == "./foo")
713            .unwrap();
714        let obj = store_dir.path().join(object_path(&foo_entry.checksum));
715        fs::remove_file(&obj).unwrap();
716
717        let _ = id;
718        store
719            .push(&manifest, src_dir.path())
720            .expect("second push skips");
721        assert!(
722            !obj.exists(),
723            "manifest-present push must be a full no-op (object stays removed)"
724        );
725    }
726
727    #[test]
728    fn file_store_push_skips_present_objects_but_adds_missing() {
729        let store_dir = TempDir::new("store");
730        let src_dir = TempDir::new("src");
731        let (manifest, id) = make_foo_bar_source(src_dir.path());
732        let store = FileStore::from_root(store_dir.path());
733        store.push(&manifest, src_dir.path()).expect("first push");
734
735        // Delete the manifest and one object; re-push must re-create the
736        // missing object (and the manifest) without erroring on the present one.
737        let man_path = store_dir.path().join(manifest_path(&id));
738        fs::remove_file(&man_path).unwrap();
739        let foo_entry = manifest
740            .entries()
741            .iter()
742            .find(|e| e.path == "./foo")
743            .unwrap();
744        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
745        fs::remove_file(&foo_obj).unwrap();
746
747        store.push(&manifest, src_dir.path()).expect("re-push");
748        assert!(foo_obj.exists(), "missing object must be re-added");
749        assert!(man_path.exists(), "manifest must be re-written");
750    }
751
752    #[test]
753    fn file_store_fetch_round_trips_and_verifies() {
754        let store_dir = TempDir::new("store");
755        let src_dir = TempDir::new("src");
756        let dest_dir = TempDir::new("dest");
757        let (manifest, id) = make_foo_bar_source(src_dir.path());
758        let store = FileStore::from_root(store_dir.path());
759        store.push(&manifest, src_dir.path()).expect("push");
760
761        let fetched = store.get_manifest(&id).expect("get manifest");
762        store
763            .fetch_files(&fetched, dest_dir.path())
764            .expect("fetch files");
765
766        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
767        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
768    }
769
770    #[test]
771    fn file_store_get_manifest_missing_is_not_found() {
772        let store_dir = TempDir::new("store");
773        let store = FileStore::from_root(store_dir.path());
774        let missing = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
775        match store.get_manifest(missing) {
776            Err(StoreError::ManifestNotFound { id }) => assert_eq!(id, missing),
777            other => panic!("expected ManifestNotFound, got {other:?}"),
778        }
779    }
780
781    #[test]
782    fn file_store_get_manifest_tampered_fails_integrity() {
783        let store_dir = TempDir::new("store");
784        let src_dir = TempDir::new("src");
785        let (manifest, id) = make_foo_bar_source(src_dir.path());
786        let store = FileStore::from_root(store_dir.path());
787        store.push(&manifest, src_dir.path()).expect("push");
788
789        // Tamper with the stored manifest bytes.
790        let man_path = store_dir.path().join(manifest_path(&id));
791        fs::write(&man_path, b"D 700 deadbeef 0 ./\n").unwrap();
792
793        match store.get_manifest(&id) {
794            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, id),
795            other => panic!("expected Integrity, got {other:?}"),
796        }
797    }
798
799    #[test]
800    fn file_store_fetch_missing_object_is_not_found() {
801        let store_dir = TempDir::new("store");
802        let dest_dir = TempDir::new("dest");
803        let hasher = Blake3Hasher::new();
804        let foo_sum = hasher.hash_hex(b"foo\n");
805
806        let mut manifest = Manifest::new();
807        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 4, "./"));
808        manifest.push(ManifestEntry::new(
809            PathType::File,
810            "600",
811            foo_sum.clone(),
812            4,
813            "./foo",
814        ));
815
816        let store = FileStore::from_root(store_dir.path());
817        match store.fetch_files(&manifest, dest_dir.path()) {
818            Err(StoreError::ObjectNotFound { checksum }) => assert_eq!(checksum, foo_sum),
819            other => panic!("expected ObjectNotFound, got {other:?}"),
820        }
821    }
822
823    #[test]
824    fn file_store_persist_rejects_corrupt_source() {
825        // A "source" object whose bytes do not match the claimed checksum must
826        // fail integrity (the oracle's "Invalid source checksum" path), not
827        // silently store corrupt data.
828        let store_dir = TempDir::new("store");
829        let src_dir = TempDir::new("src");
830        let dest_dir = TempDir::new("dest");
831        let hasher = Blake3Hasher::new();
832
833        // Real foo source/manifest, then corrupt the stored object so fetch's
834        // verify-on-copy trips and the source (the corrupt store object) fails.
835        let (manifest, id) = make_foo_bar_source(src_dir.path());
836        let store = FileStore::from_root(store_dir.path());
837        store.push(&manifest, src_dir.path()).expect("push");
838
839        let foo_entry = manifest
840            .entries()
841            .iter()
842            .find(|e| e.path == "./foo")
843            .unwrap();
844        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
845        fs::write(&foo_obj, b"corrupted not foo\n").unwrap();
846        // Sanity: the corrupted bytes really differ from the expected sum.
847        assert_ne!(hasher.hash_hex(b"corrupted not foo\n"), foo_entry.checksum);
848
849        let fetched = store.get_manifest(&id).expect("manifest still valid");
850        match store.fetch_files(&fetched, dest_dir.path()) {
851            Err(StoreError::Integrity { expected, .. }) => {
852                assert_eq!(expected, foo_entry.checksum);
853            }
854            other => panic!("expected Integrity from corrupt object, got {other:?}"),
855        }
856        // The corrupt object must NOT have been materialized at the dest.
857        assert!(!dest_dir.path().join("foo").exists());
858    }
859
860    #[test]
861    fn fetch_skip_present_verified() {
862        // Push a tree, fetch it (populating dest), then DELETE the store's whole
863        // `.objects` tree so any object read would now fail with ObjectNotFound.
864        // A second fetch into the SAME dest must still return Ok — proving every
865        // file was skipped via local checksum match (ZERO object reads).
866        let store_dir = TempDir::new("store");
867        let src_dir = TempDir::new("src");
868        let dest_dir = TempDir::new("dest");
869        let (manifest, id) = make_foo_bar_source(src_dir.path());
870
871        let store = FileStore::from_root(store_dir.path());
872        store.push(&manifest, src_dir.path()).expect("push");
873
874        let fetched = store.get_manifest(&id).expect("get manifest");
875        store
876            .fetch_files(&fetched, dest_dir.path())
877            .expect("first fetch populates dest");
878        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
879        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
880
881        // Nuke every object in the store. Any read of an object now fails.
882        let objects = store_dir.path().join(".objects");
883        fs::remove_dir_all(&objects).expect("remove .objects tree");
884        assert!(!objects.exists());
885
886        // Second fetch into the populated dest must succeed without reading a
887        // single (now-missing) object.
888        store
889            .fetch_files(&fetched, dest_dir.path())
890            .expect("second fetch skips every present+verified file (no object reads)");
891
892        // Dest contents intact.
893        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
894        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
895    }
896
897    #[test]
898    fn file_store_fetch_repairs_corrupt_dest_and_skips_intact() {
899        // With store objects present: corrupt one dest file. The corrupted file
900        // is re-fetched (repaired) to match its checksum again, while an
901        // unrelated already-correct dest file is still skipped.
902        let store_dir = TempDir::new("store");
903        let src_dir = TempDir::new("src");
904        let dest_dir = TempDir::new("dest");
905        let (manifest, id) = make_foo_bar_source(src_dir.path());
906
907        let store = FileStore::from_root(store_dir.path());
908        store.push(&manifest, src_dir.path()).expect("push");
909        let fetched = store.get_manifest(&id).expect("get manifest");
910        store
911            .fetch_files(&fetched, dest_dir.path())
912            .expect("first fetch populates dest");
913
914        // Corrupt `foo` in the dest; leave `bar` correct.
915        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
916        // Remove `bar`'s store object so it CANNOT be re-fetched; the only way a
917        // second fetch can succeed is if `bar` is skipped (present + verified).
918        let bar_entry = manifest
919            .entries()
920            .iter()
921            .find(|e| e.path == "./bar")
922            .unwrap();
923        let bar_obj = store_dir.path().join(object_path(&bar_entry.checksum));
924        fs::remove_file(&bar_obj).unwrap();
925
926        store
927            .fetch_files(&fetched, dest_dir.path())
928            .expect("repair corrupt foo, skip intact bar");
929
930        // foo repaired back to its checksummed content; bar untouched.
931        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
932        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
933    }
934
935    #[test]
936    fn file_store_fetch_mismatch_then_missing_object_errors() {
937        // Confirms the skip is checksum-gated, not mere existence: corrupt a
938        // dest file AND remove its store object → fetch cannot repair and errors
939        // ObjectNotFound (it did not blindly skip the present-but-wrong file).
940        let store_dir = TempDir::new("store");
941        let src_dir = TempDir::new("src");
942        let dest_dir = TempDir::new("dest");
943        let (manifest, id) = make_foo_bar_source(src_dir.path());
944
945        let store = FileStore::from_root(store_dir.path());
946        store.push(&manifest, src_dir.path()).expect("push");
947        let fetched = store.get_manifest(&id).expect("get manifest");
948        store
949            .fetch_files(&fetched, dest_dir.path())
950            .expect("first fetch populates dest");
951
952        let foo_entry = manifest
953            .entries()
954            .iter()
955            .find(|e| e.path == "./foo")
956            .unwrap();
957        // Corrupt the dest file so the skip gate fails for it...
958        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
959        // ...and remove its store object so it cannot be repaired.
960        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
961        fs::remove_file(&foo_obj).unwrap();
962
963        match store.fetch_files(&fetched, dest_dir.path()) {
964            Err(StoreError::ObjectNotFound { checksum }) => {
965                assert_eq!(checksum, foo_entry.checksum);
966            }
967            other => panic!("expected ObjectNotFound (cannot repair), got {other:?}"),
968        }
969    }
970
971    /// Builds a small nested tree under `source` (several files across nested
972    /// directories) and returns its manifest + snapshot id, with real BLAKE3
973    /// checksums so store verification passes. Layout:
974    ///
975    /// ```text
976    /// ./a.txt            "a contents\n"
977    /// ./b.txt            "b contents\n"
978    /// ./sub/             (dir)
979    /// ./sub/c.txt        "c contents\n"
980    /// ./sub/deep/        (dir)
981    /// ./sub/deep/d.txt   "d contents\n"
982    /// ```
983    fn make_nested_source(source: &Path) -> (Manifest, String) {
984        let hasher = Blake3Hasher::new();
985        let files: &[(&str, &[u8])] = &[
986            ("a.txt", b"a contents\n"),
987            ("b.txt", b"b contents\n"),
988            ("sub/c.txt", b"c contents\n"),
989            ("sub/deep/d.txt", b"d contents\n"),
990        ];
991
992        fs::create_dir_all(source.join("sub/deep")).unwrap();
993        for (rel, bytes) in files {
994            fs::write(source.join(rel), bytes).unwrap();
995        }
996
997        let mut manifest = Manifest::new();
998        // Directory entries first; their checksums/sizes are not verified on
999        // fetch (only files are content-addressed), so placeholder values are
1000        // fine for re-materialization. The snapshot id derivation in core hashes
1001        // the rendered text regardless, and we round-trip through it below.
1002        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
1003        manifest.push(ManifestEntry::new(
1004            PathType::Directory,
1005            "700",
1006            "x",
1007            0,
1008            "./sub/",
1009        ));
1010        manifest.push(ManifestEntry::new(
1011            PathType::Directory,
1012            "700",
1013            "x",
1014            0,
1015            "./sub/deep/",
1016        ));
1017        for (rel, bytes) in files {
1018            let sum = hasher.hash_hex(bytes);
1019            #[allow(clippy::cast_possible_truncation)]
1020            manifest.push(ManifestEntry::new(
1021                PathType::File,
1022                "600",
1023                sum,
1024                bytes.len() as u64,
1025                format!("./{rel}"),
1026            ));
1027        }
1028
1029        let manifest = Manifest::from_entries(manifest.entries().to_vec());
1030        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
1031        (manifest, id)
1032    }
1033
1034    /// Asserts the four nested files re-materialized byte-identically at `dest`.
1035    fn assert_nested_dest(dest: &Path) {
1036        assert_eq!(fs::read(dest.join("a.txt")).unwrap(), b"a contents\n");
1037        assert_eq!(fs::read(dest.join("b.txt")).unwrap(), b"b contents\n");
1038        assert_eq!(fs::read(dest.join("sub/c.txt")).unwrap(), b"c contents\n");
1039        assert_eq!(
1040            fs::read(dest.join("sub/deep/d.txt")).unwrap(),
1041            b"d contents\n"
1042        );
1043    }
1044
1045    #[test]
1046    fn filestore_parallel_roundtrip_byte_identical() {
1047        // A multi-threaded (concurrency=4) push+fetch round-trip of a nested
1048        // tree must re-materialize byte-identically, and a sequential
1049        // (concurrency=1) run must produce the identical store + dest.
1050        let src_dir = TempDir::new("src");
1051        let (manifest, id) = make_nested_source(src_dir.path());
1052
1053        // Parallel run.
1054        let par_store_dir = TempDir::new("store-par");
1055        let par_dest_dir = TempDir::new("dest-par");
1056        let par_store =
1057            FileStore::from_root_with_config(par_store_dir.path(), TransferConfig::new(4, None));
1058        par_store.push(&manifest, src_dir.path()).expect("par push");
1059        let par_manifest = par_store.get_manifest(&id).expect("par get manifest");
1060        assert_eq!(par_manifest, manifest, "round-tripped manifest matches");
1061        par_store
1062            .fetch_files(&par_manifest, par_dest_dir.path())
1063            .expect("par fetch");
1064        assert_nested_dest(par_dest_dir.path());
1065
1066        // Sequential run into a fresh store/dest.
1067        let seq_store_dir = TempDir::new("store-seq");
1068        let seq_dest_dir = TempDir::new("dest-seq");
1069        let seq_store =
1070            FileStore::from_root_with_config(seq_store_dir.path(), TransferConfig::new(1, None));
1071        seq_store.push(&manifest, src_dir.path()).expect("seq push");
1072        let seq_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
1073        assert_eq!(seq_id, id, "snapshot id is concurrency-independent");
1074        seq_store
1075            .fetch_files(&manifest, seq_dest_dir.path())
1076            .expect("seq fetch");
1077        assert_nested_dest(seq_dest_dir.path());
1078
1079        // Both stores landed every object at the identical sharded key with
1080        // identical bytes.
1081        for entry in manifest.entries() {
1082            if entry.path_type != PathType::File {
1083                continue;
1084            }
1085            let key = object_path(&entry.checksum);
1086            let par_obj = par_store_dir.path().join(&key);
1087            let seq_obj = seq_store_dir.path().join(&key);
1088            assert!(par_obj.exists(), "par object {key} present");
1089            assert!(seq_obj.exists(), "seq object {key} present");
1090            assert_eq!(
1091                fs::read(&par_obj).unwrap(),
1092                fs::read(&seq_obj).unwrap(),
1093                "par and seq object bytes identical"
1094            );
1095        }
1096    }
1097
1098    #[test]
1099    fn filestore_parallel_concurrency_one_sequential() {
1100        // The concurrency=1 (single-thread pool) path is a correct sequential
1101        // copy: round-trips byte-identically.
1102        let store_dir = TempDir::new("store");
1103        let src_dir = TempDir::new("src");
1104        let dest_dir = TempDir::new("dest");
1105        let (manifest, id) = make_nested_source(src_dir.path());
1106
1107        let store =
1108            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(1, None));
1109        store.push(&manifest, src_dir.path()).expect("push");
1110        let fetched = store.get_manifest(&id).expect("get manifest");
1111        store.fetch_files(&fetched, dest_dir.path()).expect("fetch");
1112        assert_nested_dest(dest_dir.path());
1113    }
1114
1115    #[test]
1116    fn filestore_parallel_all_or_nothing_bad_object() {
1117        // A source file whose bytes do not match its manifest checksum must make
1118        // push fail with `Integrity` AND write NO manifest (all-or-nothing:
1119        // manifest is written only after every parallel object copy succeeds).
1120        let store_dir = TempDir::new("store");
1121        let src_dir = TempDir::new("src");
1122        let (manifest, id) = make_nested_source(src_dir.path());
1123
1124        // Corrupt one source file so its bytes no longer hash to the manifest
1125        // checksum; persist's source-verify trips and push returns Integrity.
1126        fs::write(src_dir.path().join("sub/c.txt"), b"TAMPERED\n").unwrap();
1127
1128        let store =
1129            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
1130        match store.push(&manifest, src_dir.path()) {
1131            Err(StoreError::Integrity { .. }) => {}
1132            other => panic!("expected Integrity from bad source object, got {other:?}"),
1133        }
1134
1135        // ALL-OR-NOTHING: the manifest must NOT have been written.
1136        let man_path = store.manifest_disk_path(&id);
1137        assert!(
1138            !man_path.exists(),
1139            "manifest must not be written when an object copy fails"
1140        );
1141    }
1142
1143    #[test]
1144    fn filestore_parallel_large_n_round_trips() {
1145        // Exercise the concurrency bound with N >> concurrency files.
1146        let store_dir = TempDir::new("store");
1147        let src_dir = TempDir::new("src");
1148        let dest_dir = TempDir::new("dest");
1149        let hasher = Blake3Hasher::new();
1150
1151        let mut manifest = Manifest::new();
1152        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
1153        let n = 50usize;
1154        for i in 0..n {
1155            let name = format!("file-{i:03}.txt");
1156            let contents = format!("contents of file {i}\n");
1157            fs::write(src_dir.path().join(&name), contents.as_bytes()).unwrap();
1158            let sum = hasher.hash_hex(contents.as_bytes());
1159            #[allow(clippy::cast_possible_truncation)]
1160            manifest.push(ManifestEntry::new(
1161                PathType::File,
1162                "600",
1163                sum,
1164                contents.len() as u64,
1165                format!("./{name}"),
1166            ));
1167        }
1168        let manifest = Manifest::from_entries(manifest.entries().to_vec());
1169        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
1170
1171        let store =
1172            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
1173        store.push(&manifest, src_dir.path()).expect("push N files");
1174        let fetched = store.get_manifest(&id).expect("get manifest");
1175        store
1176            .fetch_files(&fetched, dest_dir.path())
1177            .expect("fetch N files");
1178
1179        for i in 0..n {
1180            let name = format!("file-{i:03}.txt");
1181            let expected = format!("contents of file {i}\n");
1182            assert_eq!(
1183                fs::read(dest_dir.path().join(&name)).unwrap(),
1184                expected.as_bytes()
1185            );
1186        }
1187    }
1188
1189    #[test]
1190    fn meter_records_filestore_push_fetch() {
1191        // A FileStore wired `with_meter` doing push(manifest, src) then
1192        // fetch_files(manifest, dest) records add_in/add_out + objects_done
1193        // matching the object set. Push and fetch each touch every object once,
1194        // so over the two operations bytes_in/out == 2 * total bytes and
1195        // objects_done == 2 * N.
1196        let src_dir = TempDir::new("src");
1197        let (manifest, id) = make_nested_source(src_dir.path());
1198
1199        let n = manifest
1200            .entries()
1201            .iter()
1202            .filter(|e| e.path_type == PathType::File)
1203            .count() as u64;
1204        let total_bytes: u64 = manifest
1205            .entries()
1206            .iter()
1207            .filter(|e| e.path_type == PathType::File)
1208            .map(|e| e.size)
1209            .sum();
1210
1211        let store_dir = TempDir::new("store");
1212        let dest_dir = TempDir::new("dest");
1213        let meter = Arc::new(Meter::new());
1214        let store = FileStore::from_root(store_dir.path()).with_meter(Some(Arc::clone(&meter)));
1215
1216        store.push(&manifest, src_dir.path()).expect("push");
1217        let after_push = meter.snapshot();
1218        assert_eq!(after_push.bytes_in, total_bytes, "push read every object");
1219        assert_eq!(after_push.bytes_out, total_bytes, "push wrote every object");
1220        assert_eq!(after_push.objects_done, n, "push finished N objects");
1221        assert_eq!(after_push.objects_skipped, 0, "fresh store skips nothing");
1222        assert_eq!(after_push.objects_total, total_bytes, "push set byte total");
1223        assert_eq!(after_push.in_flight, 0, "nothing left in flight");
1224
1225        let fetched = store.get_manifest(&id).expect("get manifest");
1226        store
1227            .fetch_files(&fetched, dest_dir.path())
1228            .expect("fetch_files");
1229        let after_fetch = meter.snapshot();
1230        assert_eq!(
1231            after_fetch.bytes_in,
1232            2 * total_bytes,
1233            "fetch read every object again"
1234        );
1235        assert_eq!(
1236            after_fetch.bytes_out,
1237            2 * total_bytes,
1238            "fetch wrote every object again"
1239        );
1240        assert_eq!(after_fetch.objects_done, 2 * n, "push + fetch = 2N objects");
1241        assert_eq!(after_fetch.in_flight, 0, "nothing left in flight");
1242
1243        // Dest materialized correctly.
1244        assert_nested_dest(dest_dir.path());
1245    }
1246
1247    #[test]
1248    fn meter_records_none_is_identical() {
1249        // The same push+fetch with NO meter produces byte-identical store/dest
1250        // contents and the same snapshot id as a metered run — recording changes
1251        // nothing.
1252        let src_dir = TempDir::new("src");
1253        let (manifest, id) = make_nested_source(src_dir.path());
1254
1255        // Metered run.
1256        let metered_store_dir = TempDir::new("store-metered");
1257        let metered_dest_dir = TempDir::new("dest-metered");
1258        let meter = Arc::new(Meter::new());
1259        let metered =
1260            FileStore::from_root(metered_store_dir.path()).with_meter(Some(Arc::clone(&meter)));
1261        metered
1262            .push(&manifest, src_dir.path())
1263            .expect("metered push");
1264        let metered_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
1265        let metered_manifest = metered.get_manifest(&id).expect("metered manifest");
1266        metered
1267            .fetch_files(&metered_manifest, metered_dest_dir.path())
1268            .expect("metered fetch");
1269
1270        // Unmetered run (meter is None — the constructor default).
1271        let plain_store_dir = TempDir::new("store-plain");
1272        let plain_dest_dir = TempDir::new("dest-plain");
1273        let plain = FileStore::from_root(plain_store_dir.path());
1274        plain.push(&manifest, src_dir.path()).expect("plain push");
1275        let plain_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
1276        let plain_manifest = plain.get_manifest(&id).expect("plain manifest");
1277        plain
1278            .fetch_files(&plain_manifest, plain_dest_dir.path())
1279            .expect("plain fetch");
1280
1281        // Same snapshot id.
1282        assert_eq!(metered_id, plain_id, "snapshot id unaffected by the meter");
1283        assert_eq!(metered_id, id);
1284
1285        // Byte-identical objects at identical sharded keys.
1286        for entry in manifest.entries() {
1287            if entry.path_type != PathType::File {
1288                continue;
1289            }
1290            let key = object_path(&entry.checksum);
1291            let metered_obj = metered_store_dir.path().join(&key);
1292            let plain_obj = plain_store_dir.path().join(&key);
1293            assert!(metered_obj.exists(), "metered object {key} present");
1294            assert!(plain_obj.exists(), "plain object {key} present");
1295            assert_eq!(
1296                fs::read(&metered_obj).unwrap(),
1297                fs::read(&plain_obj).unwrap(),
1298                "metered and unmetered object bytes identical"
1299            );
1300        }
1301
1302        // Byte-identical dest trees.
1303        assert_nested_dest(metered_dest_dir.path());
1304        assert_nested_dest(plain_dest_dir.path());
1305    }
1306
1307    #[test]
1308    fn file_store_strip_leading_dot_slash() {
1309        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
1310        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
1311        assert_eq!(strip_leading_dot_slash("./a/"), "a");
1312        assert_eq!(strip_leading_dot_slash("./"), "");
1313        assert_eq!(strip_leading_dot_slash("/abs/path"), "/abs/path");
1314    }
1315
1316    // --- StreamStore (object/manifest-level streaming) -------------------
1317    //
1318    // Hermetic: FileStore is local, so these exercise the verify discipline
1319    // (BLAKE3 round-trip + corruption rejection) without any cloud creds.
1320
1321    #[test]
1322    fn stream_store_filestore_object_roundtrip() {
1323        let store_dir = TempDir::new("stream-roundtrip");
1324        let store = FileStore::from_root(store_dir.path());
1325
1326        let bytes = b"hello stream store\n".to_vec();
1327        let checksum = Blake3Hasher::new().hash_hex(&bytes);
1328
1329        // Absent before the write.
1330        assert!(!store.has_object(&checksum).unwrap());
1331
1332        store.put_object(&checksum, bytes.clone()).expect("put ok");
1333
1334        // Present after, and the round-tripped bytes are identical.
1335        assert!(store.has_object(&checksum).unwrap());
1336        assert_eq!(store.get_object(&checksum).unwrap(), bytes);
1337
1338        // It landed at the exact sharded content-address.
1339        assert!(store_dir.path().join(object_path(&checksum)).exists());
1340    }
1341
1342    #[test]
1343    fn stream_store_get_object_rejects_corruption() {
1344        let store_dir = TempDir::new("stream-corrupt");
1345        let store = FileStore::from_root(store_dir.path());
1346
1347        // Address a blob under `checksum` but write DIFFERENT bytes directly
1348        // to its on-disk path, simulating a corrupt/tampered object.
1349        let good = b"the real object bytes\n".to_vec();
1350        let checksum = Blake3Hasher::new().hash_hex(&good);
1351        let target = store_dir.path().join(object_path(&checksum));
1352        fs::create_dir_all(target.parent().unwrap()).unwrap();
1353        fs::write(&target, b"TAMPERED bytes that do not hash to the address\n").unwrap();
1354
1355        match store.get_object(&checksum) {
1356            Err(StoreError::Integrity {
1357                expected, actual, ..
1358            }) => {
1359                assert_eq!(expected, checksum);
1360                assert_ne!(actual, checksum, "actual must differ from the address");
1361            }
1362            other => panic!("expected Integrity, got {other:?}"),
1363        }
1364    }
1365
1366    #[test]
1367    fn stream_store_put_object_rejects_wrong_checksum() {
1368        let store_dir = TempDir::new("stream-wrong-checksum");
1369        let store = FileStore::from_root(store_dir.path());
1370
1371        let bytes = b"some payload\n".to_vec();
1372        // A syntactically-valid but WRONG content-address.
1373        let wrong = "dead".repeat(16); // 64 hex chars, not the real hash.
1374        assert_ne!(wrong, Blake3Hasher::new().hash_hex(&bytes));
1375
1376        match store.put_object(&wrong, bytes) {
1377            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, wrong),
1378            other => panic!("expected Integrity, got {other:?}"),
1379        }
1380
1381        // Nothing was stored at the bogus address.
1382        assert!(!store.has_object(&wrong).unwrap());
1383        assert!(!store_dir.path().join(object_path(&wrong)).exists());
1384    }
1385
1386    #[test]
1387    fn stream_store_put_manifest_roundtrips() {
1388        let store_dir = TempDir::new("stream-manifest");
1389        let src_dir = TempDir::new("stream-manifest-src");
1390        let store = FileStore::from_root(store_dir.path());
1391
1392        let (manifest, id) = make_foo_bar_source(src_dir.path());
1393
1394        store.put_manifest(&id, &manifest).expect("put_manifest ok");
1395
1396        // get_manifest reads it back, re-verifies the id, and yields an equal
1397        // manifest.
1398        let back = store.get_manifest(&id).expect("get_manifest ok");
1399        assert_eq!(back.entries(), manifest.entries());
1400        assert_eq!(
1401            snapdir_core::merkle::snapshot_id(&back, &Blake3Hasher::new()),
1402            id
1403        );
1404    }
1405}