Skip to main content

snapdir_stores/
file_store.rs

1//! `FileStore`: the `file://` storage backend.
2//!
3//! A [`FileStore`] is rooted at a local directory and holds the frozen
4//! content-addressable `.objects`/`.manifests` sharded layout, so a store
5//! directory written by any conforming implementation is interchangeable:
6//!
7//! ```text
8//! <root>/.objects/<sharded checksum>     raw file bytes
9//! <root>/.manifests/<sharded snapshot id> manifest text
10//! ```
11//!
12//! Sharding and the on-disk paths come straight from [`snapdir_core::store`]
13//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
14//!
15//! # Oracle parity
16//!
17//! - **`new` / URL parsing** mirrors `_snapdir_file_store_get_store_dir`:
18//!   strips a leading `file://`, `file:///`, `file://localhost/` (etc.) prefix
19//!   down to an absolute path and drops a trailing slash.
20//! - **`push`** mirrors `snapdir_file_store_get_push_command` +
21//!   `_snapdir_file_store_persit`: it is a no-op if the manifest already exists
22//!   (skip-if-present); otherwise it writes every referenced object that is
23//!   absent (skip-if-present per object) *before* writing the manifest, so a
24//!   present manifest always implies all of its objects are present.
25//! - **`fetch_files` / `get_manifest`** mirror the fetch side of
26//!   `_snapdir_file_store_persit`: copy to a temp path, verify the content
27//!   BLAKE3 against the expected checksum, retry up to five times, then
28//!   atomically rename into place.
29//!
30//! All I/O is native in-process filesystem I/O; nothing shells out.
31
32use std::fs;
33use std::io;
34use std::path::{Path, PathBuf};
35
36use std::sync::Arc;
37
38use snapdir_core::manifest::{Manifest, PathType};
39use snapdir_core::merkle::{Blake3Hasher, Hasher};
40use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
41use snapdir_core::Meter;
42
43use crate::adaptive::{
44    p95_object_size, AdaptiveGate, AdaptivePolicy as ControllerPolicy, ControllerDriver, OpResult,
45    OpSample,
46};
47use crate::stream::StreamStore;
48use crate::transfer::{classify_error, AdaptivePolicy, TransferConfig};
49use crate::util::{file_present_and_verified, hash_file};
50
/// Number of times the oracle retries a persist whose copied bytes fail their
/// checksum but whose source still verifies (`_SNAPDIR_FILE_STORE_RETRIES`).
///
/// `persist` starts its attempt budget at this value and spends one unit per
/// copy whose bytes fail verification, giving up with an integrity error once
/// the budget reaches zero.
const MAX_PERSIST_RETRIES: u32 = 5;
54
/// A content-addressable store backed by a local directory (the `file://`
/// backend).
///
/// Construct one with [`FileStore::new`] (parsing a `file://` URL or a bare
/// path) or [`FileStore::from_root`] (an already-resolved directory).
#[derive(Debug, Clone)]
pub struct FileStore {
    /// Directory the store is rooted at; object and manifest paths are joined
    /// onto it.
    root: PathBuf,
    /// Transfer settings; the copy paths read `concurrency` and `adaptive`
    /// from it.
    config: TransferConfig,
    /// Optional progress meter; recorded into during transfers. `None` (the
    /// default from every constructor) means zero recording and byte-identical
    /// behavior. Set by the CLI via [`FileStore::with_meter`].
    meter: Option<Arc<Meter>>,
}
69
70impl FileStore {
71    /// Builds a store from a `store` URL or path, matching the oracle's
72    /// `_snapdir_file_store_get_store_dir`.
73    ///
74    /// Accepts `file:///abs/path`, `file://localhost/abs/path`, `file://`
75    /// followed by an absolute path, or a bare absolute path. A leading
76    /// `file:` scheme (with any number of slashes, optionally `localhost`) is
77    /// rewritten to a single leading `/`, and a trailing slash is dropped.
78    #[must_use]
79    pub fn new(store: &str) -> Self {
80        Self::from_root(parse_store_dir(store))
81    }
82
83    /// Like [`new`](Self::new), but carries a [`TransferConfig`] for
84    /// concurrency / bandwidth control.
85    #[must_use]
86    pub fn new_with_config(store: &str, config: TransferConfig) -> Self {
87        Self::from_root_with_config(parse_store_dir(store), config)
88    }
89
90    /// Builds a store rooted at an already-resolved directory.
91    #[must_use]
92    pub fn from_root(root: impl Into<PathBuf>) -> Self {
93        Self::from_root_with_config(root, TransferConfig::default())
94    }
95
96    /// Like [`from_root`](Self::from_root), but carries a [`TransferConfig`] for
97    /// concurrency / bandwidth control. [`from_root`](Self::from_root) and
98    /// [`new`](Self::new) delegate here with [`TransferConfig::default`].
99    #[must_use]
100    pub fn from_root_with_config(root: impl Into<PathBuf>, config: TransferConfig) -> Self {
101        Self {
102            root: root.into(),
103            config,
104            meter: None,
105        }
106    }
107
108    /// Attaches (or clears) an optional progress [`Meter`], rides alongside
109    /// [`config`](Self::transfer_config). The copy paths record bytes-in /
110    /// bytes-out + per-object progress into it; `None` (the constructor default)
111    /// means zero recording and byte-identical behavior. The CLI sets this after
112    /// construction.
113    #[must_use]
114    pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
115        self.meter = meter;
116        self
117    }
118
119    /// Returns the store's root directory.
120    #[must_use]
121    pub fn root(&self) -> &Path {
122        &self.root
123    }
124
125    /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
126    /// with. Consumed by the transfer loops in later gates.
127    #[must_use]
128    pub fn transfer_config(&self) -> &TransferConfig {
129        &self.config
130    }
131
132    /// Absolute on-disk path of an object given its checksum.
133    fn object_disk_path(&self, checksum: &str) -> PathBuf {
134        self.root.join(object_path(checksum))
135    }
136
137    /// Absolute on-disk path of a manifest given its snapshot id.
138    fn manifest_disk_path(&self, id: &str) -> PathBuf {
139        self.root.join(manifest_path(id))
140    }
141
142    /// Copies a batch of `(source, target, expected_checksum)` jobs through
143    /// [`persist`] across a thread pool bounded by `self.config.concurrency`.
144    ///
145    /// Local copies have no network bandwidth concern, so the async
146    /// rate-limited transfer driver does not apply here — only the concurrency
147    /// cap. The first [`StoreError`] is propagated and stops scheduling further
148    /// work (`try_for_each`). A `concurrency` of 1 yields a single-threaded
149    /// sequential copy. Each task uses a fresh, cheap, stateless
150    /// [`Blake3Hasher`] to sidestep any `Sync` concern.
151    fn parallel_copy(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
152        if jobs.is_empty() {
153            return Ok(());
154        }
155        match self.config.adaptive {
156            AdaptivePolicy::Off => self.parallel_copy_fixed(jobs),
157            AdaptivePolicy::On { fraction, ceiling } => {
158                self.parallel_copy_adaptive(jobs, fraction, ceiling)
159            }
160        }
161    }
162
163    /// Fixed-concurrency rayon copy: pool sized to `config.concurrency`, no gate
164    /// or driver. The historical (byte-identical) local-copy path.
165    fn parallel_copy_fixed(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
166        use rayon::prelude::*;
167
168        // `&Meter` is `Sync`, so it is shared across the rayon closures. `None`
169        // means zero recording and byte-identical behavior.
170        let meter = self.meter.as_deref();
171
172        let pool = rayon::ThreadPoolBuilder::new()
173            .num_threads(self.config.concurrency.get())
174            .build()
175            .map_err(|err| StoreError::Backend {
176                message: "failed to build copy thread pool".to_owned(),
177                source: Some(Box::new(err)),
178            })?;
179
180        pool.install(|| {
181            jobs.par_iter().try_for_each(|(source, target, expected)| {
182                if let Some(m) = meter {
183                    m.object_started();
184                }
185                // `persist` reads `source` and writes `target`. Record the
186                // source size as both bytes-in (read) and bytes-out (written);
187                // a missing source surfaces as the persist error below.
188                let len = std::fs::metadata(source).map_or(0, |md| md.len());
189                persist(source, target, expected, &Blake3Hasher::new())?;
190                if let Some(m) = meter {
191                    m.add_in(len);
192                    m.add_out(len);
193                    m.object_finished();
194                }
195                Ok(())
196            })
197        })
198    }
199
200    /// Adaptive rayon copy: pool sized to the policy `ceiling`, each job gated to
201    /// the controller's live limit (effective concurrency ≤ ceiling), every copy
202    /// timed + classified + recorded, with a background `std::thread` ticking the
203    /// controller (~250ms) to resize the gate. Local FS has no network rate, so
204    /// the controller drives concurrency only (no rate applier).
205    ///
206    /// The exact objects copied and the first-error-wins (`try_for_each`)
207    /// semantics are identical to [`parallel_copy_fixed`]; only scheduling
208    /// differs.
209    fn parallel_copy_adaptive(
210        &self,
211        jobs: &[(PathBuf, PathBuf, String)],
212        fraction: f64,
213        ceiling: usize,
214    ) -> Result<(), StoreError> {
215        use rayon::prelude::*;
216
217        let meter = self.meter.as_deref();
218
219        let sizes: Vec<u64> = jobs
220            .iter()
221            .map(|(source, _, _)| std::fs::metadata(source).map_or(0, |md| md.len()))
222            .collect();
223        let p95 = p95_object_size(&sizes);
224        let total_ram = snapdir_core::resources::total_ram_bytes().unwrap_or(0);
225        let policy = ControllerPolicy::new(fraction, ceiling, total_ram, None);
226
227        let gate = AdaptiveGate::new(self.config.concurrency.get(), ceiling);
228        // No rate limiter for local copies (concurrency-only control).
229        let driver = ControllerDriver::new(policy, gate.clone(), p95, None, self.meter.clone());
230
231        // Background tick thread: stop it on the shared flag once the copy ends.
232        let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
233        let tick_driver = driver.clone();
234        let tick_stop = Arc::clone(&stop);
235        let ticker = std::thread::spawn(move || {
236            while !tick_stop.load(std::sync::atomic::Ordering::Relaxed) {
237                std::thread::sleep(std::time::Duration::from_millis(250));
238                if tick_stop.load(std::sync::atomic::Ordering::Relaxed) {
239                    break;
240                }
241                tick_driver.tick();
242            }
243        });
244
245        let pool = rayon::ThreadPoolBuilder::new()
246            .num_threads(ceiling.max(1))
247            .build()
248            .map_err(|err| StoreError::Backend {
249                message: "failed to build copy thread pool".to_owned(),
250                source: Some(Box::new(err)),
251            })?;
252
253        let result = pool.install(|| {
254            jobs.par_iter().try_for_each(|(source, target, expected)| {
255                // Gate to the controller's live limit (effective concurrency).
256                let _permit = gate.acquire_blocking();
257                if let Some(m) = meter {
258                    m.object_started();
259                }
260                let len = std::fs::metadata(source).map_or(0, |md| md.len());
261                let started = std::time::Instant::now();
262                let outcome = persist(source, target, expected, &Blake3Hasher::new());
263                let latency = started.elapsed();
264                let (bytes, op_result) = match &outcome {
265                    Ok(()) => (len, OpResult::Ok),
266                    Err(err) => (0, classify_error(err)),
267                };
268                driver.record_op(OpSample {
269                    bytes,
270                    latency,
271                    result: op_result,
272                });
273                outcome?;
274                if let Some(m) = meter {
275                    m.add_in(len);
276                    m.add_out(len);
277                    m.object_finished();
278                }
279                Ok(())
280            })
281        });
282
283        // Stop the tick thread and join it before returning.
284        stop.store(true, std::sync::atomic::Ordering::Relaxed);
285        let _ = ticker.join();
286        result
287    }
288}
289
290impl Store for FileStore {
291    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
292        let path = self.manifest_disk_path(id);
293        let bytes = match fs::read(&path) {
294            Ok(bytes) => bytes,
295            Err(err) if err.kind() == io::ErrorKind::NotFound => {
296                return Err(StoreError::ManifestNotFound { id: id.to_owned() });
297            }
298            Err(err) => return Err(StoreError::Io(err)),
299        };
300
301        // The snapshot id is BLAKE3 of the comment-stripped manifest text with
302        // the oracle's trailing `echo` newline. Verify the stored bytes hash
303        // back to `id` before trusting them (oracle: the manifest id check on
304        // fetch). `snapshot_id` in core re-renders + re-hashes the parsed
305        // manifest, so parse first, then verify against the parsed form.
306        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
307            message: format!("manifest {id} is not valid UTF-8"),
308            source: Some(Box::new(err)),
309        })?;
310        let manifest = Manifest::parse(&text)?;
311
312        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
313        if actual != id {
314            return Err(StoreError::Integrity {
315                address: manifest_path(id),
316                expected: id.to_owned(),
317                actual,
318            });
319        }
320
321        Ok(manifest)
322    }
323
324    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
325        let hasher = Blake3Hasher::new();
326
327        // First, SEQUENTIAL pass: materialize every directory and pre-create
328        // each file's parent (so the parallel copies below never race on
329        // `create_dir_all` of the same ancestor), short-circuit files that are
330        // already present-and-verified (skip-if-present-and-verified — no object
331        // read at all, so a populated dest succeeds even if the store object is
332        // gone), and confirm the source object exists for the rest (preserving
333        // the `ObjectNotFound` error when a needed source is missing). The file
334        // entries that actually need copying are collected as `(source, target,
335        // checksum)` jobs for the parallel phase.
336        let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
337        for entry in manifest.entries() {
338            let rel = strip_leading_dot_slash(&entry.path);
339            let target = dest.join(rel);
340            match entry.path_type {
341                PathType::Directory => {
342                    fs::create_dir_all(&target)?;
343                }
344                PathType::File => {
345                    // A destination file that already exists and whose content
346                    // hashes to the manifest's checksum needs no copy. A
347                    // mismatching/corrupt local file falls through and is
348                    // repaired by the persist below.
349                    if file_present_and_verified(&target, &entry.checksum, &hasher) {
350                        // Skip-present: record it as skipped (advisory only).
351                        if let Some(m) = self.meter.as_deref() {
352                            m.add_skipped(1);
353                        }
354                        continue;
355                    }
356                    if let Some(parent) = target.parent() {
357                        fs::create_dir_all(parent)?;
358                    }
359                    let source = self.object_disk_path(&entry.checksum);
360                    if !source.exists() {
361                        return Err(StoreError::ObjectNotFound {
362                            checksum: entry.checksum.clone(),
363                        });
364                    }
365                    jobs.push((source, target, entry.checksum.clone()));
366                }
367            }
368        }
369
370        // Total to copy (bytes over the to-copy set), recorded so the bar can
371        // track bytes. Advisory: no effect on what is copied. No-op w/o meter.
372        if let Some(m) = self.meter.as_deref() {
373            let total: u64 = jobs
374                .iter()
375                .map(|(source, _, _)| fs::metadata(source).map_or(0, |md| md.len()))
376                .sum();
377            m.set_total(total);
378        }
379
380        // Parallel copy phase, bounded by `config.concurrency`. `try_for_each`
381        // propagates the first `StoreError` and stops scheduling new work. Each
382        // task uses a fresh, cheap, stateless `Blake3Hasher`.
383        self.parallel_copy(&jobs)
384    }
385
386    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
387        // Compute the snapshot id of the manifest we are about to push so we
388        // can locate (and skip-if-present) its manifest file.
389        let hasher = Blake3Hasher::new();
390        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
391        let manifest_target = self.manifest_disk_path(&id);
392
393        // Skip-if-present: nothing to do when the manifest already exists. A
394        // present manifest implies all its objects are present (we maintain
395        // that invariant by writing the manifest last).
396        if manifest_target.exists() {
397            return Ok(());
398        }
399
400        // Collect every referenced object that is absent (skip-if-present per
401        // object: an object already filed under its content address is trusted,
402        // it is content-addressable). These are copied BEFORE the manifest.
403        let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
404        for entry in manifest.entries() {
405            if entry.path_type != PathType::File {
406                continue;
407            }
408            let object_target = self.object_disk_path(&entry.checksum);
409            if object_target.exists() {
410                // Skip-present per object: record it as skipped (advisory only).
411                if let Some(m) = self.meter.as_deref() {
412                    m.add_skipped(1);
413                }
414                continue;
415            }
416            let rel = strip_leading_dot_slash(&entry.path);
417            let object_source = source.join(rel);
418            jobs.push((object_source, object_target, entry.checksum.clone()));
419        }
420
421        // Total to push (bytes over the to-push set), recorded so the bar can
422        // track bytes. Advisory: no effect on what is pushed. No-op w/o meter.
423        if let Some(m) = self.meter.as_deref() {
424            let total: u64 = jobs
425                .iter()
426                .map(|(src, _, _)| fs::metadata(src).map_or(0, |md| md.len()))
427                .sum();
428            m.set_total(total);
429        }
430
431        // Parallel copy phase, bounded by `config.concurrency`. ALL-OR-NOTHING:
432        // any error returns immediately and NO manifest is written; the
433        // manifest is written only after every object copy succeeds, preserving
434        // the invariant that a present manifest implies present objects.
435        self.parallel_copy(&jobs)?;
436
437        // Write the manifest last, via the same verify/retry/atomic-rename
438        // path, so a present manifest always implies present objects.
439        write_manifest(manifest, &manifest_target, &id, &hasher)?;
440        Ok(())
441    }
442}
443
444impl StreamStore for FileStore {
445    fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
446        Ok(self.object_disk_path(checksum).exists())
447    }
448
449    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
450        let path = self.object_disk_path(checksum);
451        let bytes = match fs::read(&path) {
452            Ok(bytes) => bytes,
453            Err(err) if err.kind() == io::ErrorKind::NotFound => {
454                return Err(StoreError::ObjectNotFound {
455                    checksum: checksum.to_owned(),
456                });
457            }
458            Err(err) => return Err(StoreError::Io(err)),
459        };
460
461        // Verify the stored blob hashes back to its content-address before
462        // returning it — corruption must surface as `Integrity`, never as bad
463        // bytes handed to a store-to-store copy.
464        let actual = Blake3Hasher::new().hash_hex(&bytes);
465        if actual != checksum {
466            return Err(StoreError::Integrity {
467                address: path.display().to_string(),
468                expected: checksum.to_owned(),
469                actual,
470            });
471        }
472        Ok(bytes)
473    }
474
475    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
476        // Verify BEFORE writing: a blob whose bytes do not hash to `checksum`
477        // must never land at that content-address (nothing is stored).
478        let actual = Blake3Hasher::new().hash_hex(&bytes);
479        if actual != checksum {
480            return Err(StoreError::Integrity {
481                address: object_path(checksum),
482                expected: checksum.to_owned(),
483                actual,
484            });
485        }
486
487        let target = self.object_disk_path(checksum);
488        if let Some(parent) = target.parent() {
489            fs::create_dir_all(parent)?;
490        }
491        // Temp sibling + atomic rename, the same write discipline `persist`
492        // uses, so a partially-written object is never visible at its address.
493        let tmp = temp_sibling(&target);
494        fs::write(&tmp, &bytes)?;
495        fs::rename(&tmp, &target)?;
496        Ok(())
497    }
498
499    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
500        write_manifest(
501            manifest,
502            &self.manifest_disk_path(id),
503            id,
504            &Blake3Hasher::new(),
505        )
506    }
507}
508
509/// Copies `source` to `target`, verifying the content BLAKE3 against
510/// `expected`, retrying up to [`MAX_PERSIST_RETRIES`] times, then atomically
511/// renaming into place. Mirrors `_snapdir_file_store_persit`.
512fn persist(
513    source: &Path,
514    target: &Path,
515    expected: &str,
516    hasher: &impl Hasher,
517) -> Result<(), StoreError> {
518    if let Some(parent) = target.parent() {
519        fs::create_dir_all(parent)?;
520    }
521
522    let mut attempts_left = MAX_PERSIST_RETRIES;
523    loop {
524        // Copy to a unique temp path beside the target so the final rename is
525        // an atomic, same-filesystem move (the oracle's `.tmp` discipline).
526        let tmp = temp_sibling(target);
527        copy_file(source, &tmp)?;
528
529        let actual = hash_file(&tmp, hasher)?;
530        if actual == expected {
531            // Atomic rename into the final content-addressed location.
532            fs::rename(&tmp, target)?;
533            return Ok(());
534        }
535
536        // Copied bytes did not verify. Clean up the temp file and decide
537        // whether to retry: the oracle only retries when the *source* still
538        // hashes to the expected value, otherwise the source itself is bad.
539        let _ = fs::remove_file(&tmp);
540        let source_actual = hash_file(source, hasher)?;
541        if source_actual != expected {
542            return Err(StoreError::Integrity {
543                address: source.display().to_string(),
544                expected: expected.to_owned(),
545                actual: source_actual,
546            });
547        }
548
549        attempts_left = attempts_left.saturating_sub(1);
550        if attempts_left == 0 {
551            return Err(StoreError::Integrity {
552                address: target.display().to_string(),
553                expected: expected.to_owned(),
554                actual,
555            });
556        }
557    }
558}
559
560/// Writes a manifest's text to `target`, verifying it hashes to `id`, then
561/// atomically renaming into place. The manifest's "content" is the
562/// snapshot-id-bearing text (`Display` + trailing newline), so we verify with
563/// [`snapdir_core::merkle::snapshot_id`] rather than a raw byte hash.
564fn write_manifest(
565    manifest: &Manifest,
566    target: &Path,
567    id: &str,
568    hasher: &impl Hasher,
569) -> Result<(), StoreError> {
570    if let Some(parent) = target.parent() {
571        fs::create_dir_all(parent)?;
572    }
573
574    // The on-disk manifest must hash (snapshot_id) back to `id`. Render once
575    // and confirm before writing.
576    let actual = snapdir_core::merkle::snapshot_id(manifest, hasher);
577    if actual != id {
578        return Err(StoreError::Integrity {
579            address: target.display().to_string(),
580            expected: id.to_owned(),
581            actual,
582        });
583    }
584
585    // Oracle stores `echo "${manifest}"` — the manifest text plus a single
586    // trailing newline (the same bytes snapshot_id hashes).
587    let mut text = manifest.to_string();
588    text.push('\n');
589
590    let tmp = temp_sibling(target);
591    fs::write(&tmp, text.as_bytes())?;
592    fs::rename(&tmp, target)?;
593    Ok(())
594}
595
596/// Copies a regular file's bytes from `source` to `target` (mirrors the
597/// oracle's `cp -RL -n`: dereference, do not clobber — `target` is a fresh
598/// temp path so the no-clobber aspect is implicit).
599fn copy_file(source: &Path, target: &Path) -> Result<(), StoreError> {
600    fs::copy(source, target)?;
601    Ok(())
602}
603
/// Returns a fresh temp path in the same directory as `target`, so a later
/// rename onto `target` stays on one filesystem.
///
/// The name is `<file name>.<pid>.<n>.tmp`, where `n` comes from a
/// process-wide counter that only ever increases, so concurrent callers in
/// this process never receive the same path. A target with no file name
/// contributes an empty prefix; a target with no parent yields a bare
/// relative name.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);

    let seq = NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed);
    let base = target
        .file_name()
        .map_or_else(String::new, |name| name.to_string_lossy().into_owned());
    let leaf = format!("{base}.{}.{seq}.tmp", std::process::id());

    target
        .parent()
        .map_or_else(|| PathBuf::from(&leaf), |dir| dir.join(&leaf))
}
622
/// Normalizes a manifest path for joining onto a destination root: removes
/// one leading `./` (relative-mode manifest paths) and then one trailing `/`
/// (directory entries). Anything else is returned untouched.
fn strip_leading_dot_slash(path: &str) -> &str {
    let without_prefix = match path.strip_prefix("./") {
        Some(rest) => rest,
        None => path,
    };
    match without_prefix.strip_suffix('/') {
        Some(rest) => rest,
        None => without_prefix,
    }
}
629
/// Turns a `store` URL/path into the directory it names, following the
/// oracle's `_snapdir_file_store_get_store_dir`:
///
/// ```sh
/// store_dir="$(echo "$store" | sed -E 's|^file:/*(localhost/?)?|/|')"
/// echo "${store_dir%/}"
/// ```
///
/// A leading `file:` plus any run of `/` (optionally followed by `localhost`
/// and one optional `/`) is replaced by a single `/`; input without the
/// `file:` prefix is taken verbatim. Afterwards one trailing slash is
/// removed, except that a bare `/` is kept as-is.
fn parse_store_dir(store: &str) -> PathBuf {
    let mut dir = match store.strip_prefix("file:") {
        // No scheme: the value is already a path.
        None => store.to_owned(),
        Some(after_scheme) => {
            // Swallow the slashes that follow the scheme.
            let after_slashes = after_scheme.trim_start_matches('/');
            // The regex also eats an optional `localhost` host segment and one
            // slash right after it.
            let path_part = match after_slashes.strip_prefix("localhost") {
                Some(after_host) => after_host.strip_prefix('/').unwrap_or(after_host),
                None => after_slashes,
            };
            // The substitution always yields exactly one leading `/`.
            let mut rooted = String::with_capacity(path_part.len() + 1);
            rooted.push('/');
            rooted.push_str(path_part);
            rooted
        }
    };

    // `${store_dir%/}`: drop a single trailing slash, but never reduce a bare
    // "/" to the empty string.
    if dir.len() > 1 && dir.ends_with('/') {
        dir.pop();
    }
    PathBuf::from(dir)
}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669    use snapdir_core::manifest::ManifestEntry;
670    use std::fs;
671    use std::path::Path;
672
    // A tiny temp-dir helper so tests don't pull in a dev-dependency. Creates a
    // unique directory under the system temp dir and removes it on drop.
    struct TempDir {
        // Directory created by `TempDir::new` and removed again in `Drop`.
        path: PathBuf,
    }
678
679    impl TempDir {
680        fn new(tag: &str) -> Self {
681            use std::sync::atomic::{AtomicU64, Ordering};
682            static COUNTER: AtomicU64 = AtomicU64::new(0);
683            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
684            let path = std::env::temp_dir().join(format!(
685                "snapdir-filestore-test-{}-{tag}-{n}",
686                std::process::id()
687            ));
688            fs::create_dir_all(&path).expect("create temp dir");
689            Self { path }
690        }
691
692        fn path(&self) -> &Path {
693            &self.path
694        }
695    }
696
697    impl Drop for TempDir {
698        fn drop(&mut self) {
699            let _ = fs::remove_dir_all(&self.path);
700        }
701    }
702
703    /// Builds a manifest for a source tree containing `foo` ("foo\n") and
704    /// `bar` ("bar\n") and writes those files into `source`. Returns the
705    /// manifest and its snapshot id. Checksums are the real BLAKE3 of the
706    /// file bytes so the store's verification passes.
707    fn make_foo_bar_source(source: &Path) -> (Manifest, String) {
708        let hasher = Blake3Hasher::new();
709        fs::write(source.join("foo"), b"foo\n").unwrap();
710        fs::write(source.join("bar"), b"bar\n").unwrap();
711        let foo_sum = hasher.hash_hex(b"foo\n");
712        let bar_sum = hasher.hash_hex(b"bar\n");
713
714        let root_sum =
715            snapdir_core::merkle::directory_checksum([foo_sum.as_str(), bar_sum.as_str()], &hasher);
716
717        let mut manifest = Manifest::new();
718        manifest.push(ManifestEntry::new(
719            PathType::Directory,
720            "700",
721            root_sum,
722            8,
723            "./",
724        ));
725        manifest.push(ManifestEntry::new(
726            PathType::File,
727            "600",
728            bar_sum,
729            4,
730            "./bar",
731        ));
732        manifest.push(ManifestEntry::new(
733            PathType::File,
734            "600",
735            foo_sum,
736            4,
737            "./foo",
738        ));
739        let manifest = Manifest::from_entries(manifest.entries().to_vec());
740        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
741        (manifest, id)
742    }
743
744    #[test]
745    fn file_store_parse_store_dir_matches_oracle_sed() {
746        // file:// + abs path -> abs path; trailing slash stripped.
747        assert_eq!(
748            parse_store_dir("file:///tmp/store"),
749            PathBuf::from("/tmp/store")
750        );
751        assert_eq!(
752            parse_store_dir("file:///tmp/store/"),
753            PathBuf::from("/tmp/store")
754        );
755        // localhost host segment dropped.
756        assert_eq!(
757            parse_store_dir("file://localhost/tmp/store"),
758            PathBuf::from("/tmp/store")
759        );
760        // file:// + abs path with two slashes.
761        assert_eq!(
762            parse_store_dir("file://tmp/store"),
763            PathBuf::from("/tmp/store")
764        );
765        // bare absolute path left intact.
766        assert_eq!(parse_store_dir("/tmp/store"), PathBuf::from("/tmp/store"));
767        // bare root preserved.
768        assert_eq!(parse_store_dir("file:///"), PathBuf::from("/"));
769    }
770
771    #[test]
772    fn file_store_push_lands_objects_at_sharded_keys_and_manifest_last() {
773        let store_dir = TempDir::new("store");
774        let src_dir = TempDir::new("src");
775        let (manifest, id) = make_foo_bar_source(src_dir.path());
776
777        let store = FileStore::from_root(store_dir.path());
778        store.push(&manifest, src_dir.path()).expect("push ok");
779
780        // Objects land at the exact sharded keys.
781        for entry in manifest.entries() {
782            if entry.path_type == PathType::File {
783                let obj = store_dir.path().join(object_path(&entry.checksum));
784                assert!(obj.exists(), "expected object at {}", obj.display());
785                // Content matches.
786                let bytes = fs::read(&obj).unwrap();
787                assert_eq!(
788                    Blake3Hasher::new().hash_hex(&bytes),
789                    entry.checksum,
790                    "object content must hash to its address"
791                );
792            }
793        }
794
795        // Manifest written at its sharded key, and hashes back to the id.
796        let man_path = store_dir.path().join(manifest_path(&id));
797        assert!(man_path.exists(), "manifest must exist after push");
798        let read_back = store.get_manifest(&id).expect("manifest reads back");
799        assert_eq!(read_back, manifest);
800    }
801
802    #[test]
803    fn file_store_push_skips_when_manifest_present() {
804        let store_dir = TempDir::new("store");
805        let src_dir = TempDir::new("src");
806        let (manifest, id) = make_foo_bar_source(src_dir.path());
807        let store = FileStore::from_root(store_dir.path());
808        store.push(&manifest, src_dir.path()).expect("first push");
809
810        // Remove an object but keep the manifest: a second push must skip
811        // entirely (manifest-present short-circuit), leaving the object gone.
812        let foo_entry = manifest
813            .entries()
814            .iter()
815            .find(|e| e.path == "./foo")
816            .unwrap();
817        let obj = store_dir.path().join(object_path(&foo_entry.checksum));
818        fs::remove_file(&obj).unwrap();
819
820        let _ = id;
821        store
822            .push(&manifest, src_dir.path())
823            .expect("second push skips");
824        assert!(
825            !obj.exists(),
826            "manifest-present push must be a full no-op (object stays removed)"
827        );
828    }
829
830    #[test]
831    fn file_store_push_skips_present_objects_but_adds_missing() {
832        let store_dir = TempDir::new("store");
833        let src_dir = TempDir::new("src");
834        let (manifest, id) = make_foo_bar_source(src_dir.path());
835        let store = FileStore::from_root(store_dir.path());
836        store.push(&manifest, src_dir.path()).expect("first push");
837
838        // Delete the manifest and one object; re-push must re-create the
839        // missing object (and the manifest) without erroring on the present one.
840        let man_path = store_dir.path().join(manifest_path(&id));
841        fs::remove_file(&man_path).unwrap();
842        let foo_entry = manifest
843            .entries()
844            .iter()
845            .find(|e| e.path == "./foo")
846            .unwrap();
847        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
848        fs::remove_file(&foo_obj).unwrap();
849
850        store.push(&manifest, src_dir.path()).expect("re-push");
851        assert!(foo_obj.exists(), "missing object must be re-added");
852        assert!(man_path.exists(), "manifest must be re-written");
853    }
854
855    #[test]
856    fn file_store_fetch_round_trips_and_verifies() {
857        let store_dir = TempDir::new("store");
858        let src_dir = TempDir::new("src");
859        let dest_dir = TempDir::new("dest");
860        let (manifest, id) = make_foo_bar_source(src_dir.path());
861        let store = FileStore::from_root(store_dir.path());
862        store.push(&manifest, src_dir.path()).expect("push");
863
864        let fetched = store.get_manifest(&id).expect("get manifest");
865        store
866            .fetch_files(&fetched, dest_dir.path())
867            .expect("fetch files");
868
869        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
870        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
871    }
872
873    #[test]
874    fn file_store_get_manifest_missing_is_not_found() {
875        let store_dir = TempDir::new("store");
876        let store = FileStore::from_root(store_dir.path());
877        let missing = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
878        match store.get_manifest(missing) {
879            Err(StoreError::ManifestNotFound { id }) => assert_eq!(id, missing),
880            other => panic!("expected ManifestNotFound, got {other:?}"),
881        }
882    }
883
884    #[test]
885    fn file_store_get_manifest_tampered_fails_integrity() {
886        let store_dir = TempDir::new("store");
887        let src_dir = TempDir::new("src");
888        let (manifest, id) = make_foo_bar_source(src_dir.path());
889        let store = FileStore::from_root(store_dir.path());
890        store.push(&manifest, src_dir.path()).expect("push");
891
892        // Tamper with the stored manifest bytes.
893        let man_path = store_dir.path().join(manifest_path(&id));
894        fs::write(&man_path, b"D 700 deadbeef 0 ./\n").unwrap();
895
896        match store.get_manifest(&id) {
897            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, id),
898            other => panic!("expected Integrity, got {other:?}"),
899        }
900    }
901
902    #[test]
903    fn file_store_fetch_missing_object_is_not_found() {
904        let store_dir = TempDir::new("store");
905        let dest_dir = TempDir::new("dest");
906        let hasher = Blake3Hasher::new();
907        let foo_sum = hasher.hash_hex(b"foo\n");
908
909        let mut manifest = Manifest::new();
910        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 4, "./"));
911        manifest.push(ManifestEntry::new(
912            PathType::File,
913            "600",
914            foo_sum.clone(),
915            4,
916            "./foo",
917        ));
918
919        let store = FileStore::from_root(store_dir.path());
920        match store.fetch_files(&manifest, dest_dir.path()) {
921            Err(StoreError::ObjectNotFound { checksum }) => assert_eq!(checksum, foo_sum),
922            other => panic!("expected ObjectNotFound, got {other:?}"),
923        }
924    }
925
926    #[test]
927    fn file_store_persist_rejects_corrupt_source() {
928        // A "source" object whose bytes do not match the claimed checksum must
929        // fail integrity (the oracle's "Invalid source checksum" path), not
930        // silently store corrupt data.
931        let store_dir = TempDir::new("store");
932        let src_dir = TempDir::new("src");
933        let dest_dir = TempDir::new("dest");
934        let hasher = Blake3Hasher::new();
935
936        // Real foo source/manifest, then corrupt the stored object so fetch's
937        // verify-on-copy trips and the source (the corrupt store object) fails.
938        let (manifest, id) = make_foo_bar_source(src_dir.path());
939        let store = FileStore::from_root(store_dir.path());
940        store.push(&manifest, src_dir.path()).expect("push");
941
942        let foo_entry = manifest
943            .entries()
944            .iter()
945            .find(|e| e.path == "./foo")
946            .unwrap();
947        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
948        fs::write(&foo_obj, b"corrupted not foo\n").unwrap();
949        // Sanity: the corrupted bytes really differ from the expected sum.
950        assert_ne!(hasher.hash_hex(b"corrupted not foo\n"), foo_entry.checksum);
951
952        let fetched = store.get_manifest(&id).expect("manifest still valid");
953        match store.fetch_files(&fetched, dest_dir.path()) {
954            Err(StoreError::Integrity { expected, .. }) => {
955                assert_eq!(expected, foo_entry.checksum);
956            }
957            other => panic!("expected Integrity from corrupt object, got {other:?}"),
958        }
959        // The corrupt object must NOT have been materialized at the dest.
960        assert!(!dest_dir.path().join("foo").exists());
961    }
962
963    #[test]
964    fn fetch_skip_present_verified() {
965        // Push a tree, fetch it (populating dest), then DELETE the store's whole
966        // `.objects` tree so any object read would now fail with ObjectNotFound.
967        // A second fetch into the SAME dest must still return Ok — proving every
968        // file was skipped via local checksum match (ZERO object reads).
969        let store_dir = TempDir::new("store");
970        let src_dir = TempDir::new("src");
971        let dest_dir = TempDir::new("dest");
972        let (manifest, id) = make_foo_bar_source(src_dir.path());
973
974        let store = FileStore::from_root(store_dir.path());
975        store.push(&manifest, src_dir.path()).expect("push");
976
977        let fetched = store.get_manifest(&id).expect("get manifest");
978        store
979            .fetch_files(&fetched, dest_dir.path())
980            .expect("first fetch populates dest");
981        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
982        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
983
984        // Nuke every object in the store. Any read of an object now fails.
985        let objects = store_dir.path().join(".objects");
986        fs::remove_dir_all(&objects).expect("remove .objects tree");
987        assert!(!objects.exists());
988
989        // Second fetch into the populated dest must succeed without reading a
990        // single (now-missing) object.
991        store
992            .fetch_files(&fetched, dest_dir.path())
993            .expect("second fetch skips every present+verified file (no object reads)");
994
995        // Dest contents intact.
996        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
997        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
998    }
999
1000    #[test]
1001    fn file_store_fetch_repairs_corrupt_dest_and_skips_intact() {
1002        // With store objects present: corrupt one dest file. The corrupted file
1003        // is re-fetched (repaired) to match its checksum again, while an
1004        // unrelated already-correct dest file is still skipped.
1005        let store_dir = TempDir::new("store");
1006        let src_dir = TempDir::new("src");
1007        let dest_dir = TempDir::new("dest");
1008        let (manifest, id) = make_foo_bar_source(src_dir.path());
1009
1010        let store = FileStore::from_root(store_dir.path());
1011        store.push(&manifest, src_dir.path()).expect("push");
1012        let fetched = store.get_manifest(&id).expect("get manifest");
1013        store
1014            .fetch_files(&fetched, dest_dir.path())
1015            .expect("first fetch populates dest");
1016
1017        // Corrupt `foo` in the dest; leave `bar` correct.
1018        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
1019        // Remove `bar`'s store object so it CANNOT be re-fetched; the only way a
1020        // second fetch can succeed is if `bar` is skipped (present + verified).
1021        let bar_entry = manifest
1022            .entries()
1023            .iter()
1024            .find(|e| e.path == "./bar")
1025            .unwrap();
1026        let bar_obj = store_dir.path().join(object_path(&bar_entry.checksum));
1027        fs::remove_file(&bar_obj).unwrap();
1028
1029        store
1030            .fetch_files(&fetched, dest_dir.path())
1031            .expect("repair corrupt foo, skip intact bar");
1032
1033        // foo repaired back to its checksummed content; bar untouched.
1034        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
1035        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
1036    }
1037
1038    #[test]
1039    fn file_store_fetch_mismatch_then_missing_object_errors() {
1040        // Confirms the skip is checksum-gated, not mere existence: corrupt a
1041        // dest file AND remove its store object → fetch cannot repair and errors
1042        // ObjectNotFound (it did not blindly skip the present-but-wrong file).
1043        let store_dir = TempDir::new("store");
1044        let src_dir = TempDir::new("src");
1045        let dest_dir = TempDir::new("dest");
1046        let (manifest, id) = make_foo_bar_source(src_dir.path());
1047
1048        let store = FileStore::from_root(store_dir.path());
1049        store.push(&manifest, src_dir.path()).expect("push");
1050        let fetched = store.get_manifest(&id).expect("get manifest");
1051        store
1052            .fetch_files(&fetched, dest_dir.path())
1053            .expect("first fetch populates dest");
1054
1055        let foo_entry = manifest
1056            .entries()
1057            .iter()
1058            .find(|e| e.path == "./foo")
1059            .unwrap();
1060        // Corrupt the dest file so the skip gate fails for it...
1061        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
1062        // ...and remove its store object so it cannot be repaired.
1063        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
1064        fs::remove_file(&foo_obj).unwrap();
1065
1066        match store.fetch_files(&fetched, dest_dir.path()) {
1067            Err(StoreError::ObjectNotFound { checksum }) => {
1068                assert_eq!(checksum, foo_entry.checksum);
1069            }
1070            other => panic!("expected ObjectNotFound (cannot repair), got {other:?}"),
1071        }
1072    }
1073
1074    /// Builds a small nested tree under `source` (several files across nested
1075    /// directories) and returns its manifest + snapshot id, with real BLAKE3
1076    /// checksums so store verification passes. Layout:
1077    ///
1078    /// ```text
1079    /// ./a.txt            "a contents\n"
1080    /// ./b.txt            "b contents\n"
1081    /// ./sub/             (dir)
1082    /// ./sub/c.txt        "c contents\n"
1083    /// ./sub/deep/        (dir)
1084    /// ./sub/deep/d.txt   "d contents\n"
1085    /// ```
1086    fn make_nested_source(source: &Path) -> (Manifest, String) {
1087        let hasher = Blake3Hasher::new();
1088        let files: &[(&str, &[u8])] = &[
1089            ("a.txt", b"a contents\n"),
1090            ("b.txt", b"b contents\n"),
1091            ("sub/c.txt", b"c contents\n"),
1092            ("sub/deep/d.txt", b"d contents\n"),
1093        ];
1094
1095        fs::create_dir_all(source.join("sub/deep")).unwrap();
1096        for (rel, bytes) in files {
1097            fs::write(source.join(rel), bytes).unwrap();
1098        }
1099
1100        let mut manifest = Manifest::new();
1101        // Directory entries first; their checksums/sizes are not verified on
1102        // fetch (only files are content-addressed), so placeholder values are
1103        // fine for re-materialization. The snapshot id derivation in core hashes
1104        // the rendered text regardless, and we round-trip through it below.
1105        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
1106        manifest.push(ManifestEntry::new(
1107            PathType::Directory,
1108            "700",
1109            "x",
1110            0,
1111            "./sub/",
1112        ));
1113        manifest.push(ManifestEntry::new(
1114            PathType::Directory,
1115            "700",
1116            "x",
1117            0,
1118            "./sub/deep/",
1119        ));
1120        for (rel, bytes) in files {
1121            let sum = hasher.hash_hex(bytes);
1122            #[allow(clippy::cast_possible_truncation)]
1123            manifest.push(ManifestEntry::new(
1124                PathType::File,
1125                "600",
1126                sum,
1127                bytes.len() as u64,
1128                format!("./{rel}"),
1129            ));
1130        }
1131
1132        let manifest = Manifest::from_entries(manifest.entries().to_vec());
1133        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
1134        (manifest, id)
1135    }
1136
1137    /// Asserts the four nested files re-materialized byte-identically at `dest`.
1138    fn assert_nested_dest(dest: &Path) {
1139        assert_eq!(fs::read(dest.join("a.txt")).unwrap(), b"a contents\n");
1140        assert_eq!(fs::read(dest.join("b.txt")).unwrap(), b"b contents\n");
1141        assert_eq!(fs::read(dest.join("sub/c.txt")).unwrap(), b"c contents\n");
1142        assert_eq!(
1143            fs::read(dest.join("sub/deep/d.txt")).unwrap(),
1144            b"d contents\n"
1145        );
1146    }
1147
1148    #[test]
1149    fn filestore_parallel_roundtrip_byte_identical() {
1150        // A multi-threaded (concurrency=4) push+fetch round-trip of a nested
1151        // tree must re-materialize byte-identically, and a sequential
1152        // (concurrency=1) run must produce the identical store + dest.
1153        let src_dir = TempDir::new("src");
1154        let (manifest, id) = make_nested_source(src_dir.path());
1155
1156        // Parallel run.
1157        let par_store_dir = TempDir::new("store-par");
1158        let par_dest_dir = TempDir::new("dest-par");
1159        let par_store =
1160            FileStore::from_root_with_config(par_store_dir.path(), TransferConfig::new(4, None));
1161        par_store.push(&manifest, src_dir.path()).expect("par push");
1162        let par_manifest = par_store.get_manifest(&id).expect("par get manifest");
1163        assert_eq!(par_manifest, manifest, "round-tripped manifest matches");
1164        par_store
1165            .fetch_files(&par_manifest, par_dest_dir.path())
1166            .expect("par fetch");
1167        assert_nested_dest(par_dest_dir.path());
1168
1169        // Sequential run into a fresh store/dest.
1170        let seq_store_dir = TempDir::new("store-seq");
1171        let seq_dest_dir = TempDir::new("dest-seq");
1172        let seq_store =
1173            FileStore::from_root_with_config(seq_store_dir.path(), TransferConfig::new(1, None));
1174        seq_store.push(&manifest, src_dir.path()).expect("seq push");
1175        let seq_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
1176        assert_eq!(seq_id, id, "snapshot id is concurrency-independent");
1177        seq_store
1178            .fetch_files(&manifest, seq_dest_dir.path())
1179            .expect("seq fetch");
1180        assert_nested_dest(seq_dest_dir.path());
1181
1182        // Both stores landed every object at the identical sharded key with
1183        // identical bytes.
1184        for entry in manifest.entries() {
1185            if entry.path_type != PathType::File {
1186                continue;
1187            }
1188            let key = object_path(&entry.checksum);
1189            let par_obj = par_store_dir.path().join(&key);
1190            let seq_obj = seq_store_dir.path().join(&key);
1191            assert!(par_obj.exists(), "par object {key} present");
1192            assert!(seq_obj.exists(), "seq object {key} present");
1193            assert_eq!(
1194                fs::read(&par_obj).unwrap(),
1195                fs::read(&seq_obj).unwrap(),
1196                "par and seq object bytes identical"
1197            );
1198        }
1199    }
1200
1201    #[test]
1202    fn filestore_adaptive_push_fetch_same_snapshot_id_and_bytes() {
1203        // INVARIANT: an adaptive (policy On, low ceiling) push+fetch produces a
1204        // byte-identical store + dest and re-ids to the SAME snapshot id as the
1205        // non-adaptive (Off) path over the same input. Adaptive only changes
1206        // scheduling, never what is transferred.
1207        use crate::transfer::AdaptivePolicy;
1208
1209        let src_dir = TempDir::new("src");
1210        let (manifest, id) = make_nested_source(src_dir.path());
1211
1212        // Off path.
1213        let off_store_dir = TempDir::new("store-off");
1214        let off_dest_dir = TempDir::new("dest-off");
1215        let off =
1216            FileStore::from_root_with_config(off_store_dir.path(), TransferConfig::new(4, None));
1217        off.push(&manifest, src_dir.path()).expect("off push");
1218        let off_manifest = off.get_manifest(&id).expect("off manifest");
1219        off.fetch_files(&off_manifest, off_dest_dir.path())
1220            .expect("off fetch");
1221
1222        // Adaptive path (low ceiling = 2).
1223        let on_store_dir = TempDir::new("store-on");
1224        let on_dest_dir = TempDir::new("dest-on");
1225        let on_cfg = TransferConfig::new(4, None).with_adaptive(AdaptivePolicy::On {
1226            fraction: 0.8,
1227            ceiling: 2,
1228        });
1229        let on = FileStore::from_root_with_config(on_store_dir.path(), on_cfg);
1230        on.push(&manifest, src_dir.path()).expect("adaptive push");
1231        let on_manifest = on.get_manifest(&id).expect("adaptive manifest");
1232        on.fetch_files(&on_manifest, on_dest_dir.path())
1233            .expect("adaptive fetch");
1234
1235        // Same snapshot id, re-derived from the round-tripped manifest.
1236        let off_id = snapdir_core::merkle::snapshot_id(&off_manifest, &Blake3Hasher::new());
1237        let on_id = snapdir_core::merkle::snapshot_id(&on_manifest, &Blake3Hasher::new());
1238        assert_eq!(off_id, on_id, "adaptive must re-id to the same snapshot");
1239        assert_eq!(on_id, id, "and it matches the original id");
1240        assert_eq!(off_manifest, on_manifest, "round-tripped manifests equal");
1241
1242        // Byte-identical objects at identical sharded keys + identical dest trees.
1243        for entry in manifest.entries() {
1244            if entry.path_type != PathType::File {
1245                continue;
1246            }
1247            let key = object_path(&entry.checksum);
1248            let off_obj = off_store_dir.path().join(&key);
1249            let on_obj = on_store_dir.path().join(&key);
1250            assert!(on_obj.exists(), "adaptive object {key} present");
1251            assert_eq!(
1252                fs::read(&off_obj).unwrap(),
1253                fs::read(&on_obj).unwrap(),
1254                "Off vs On object bytes identical for {key}"
1255            );
1256        }
1257        assert_nested_dest(off_dest_dir.path());
1258        assert_nested_dest(on_dest_dir.path());
1259    }
1260
1261    #[test]
1262    fn filestore_parallel_concurrency_one_sequential() {
1263        // The concurrency=1 (single-thread pool) path is a correct sequential
1264        // copy: round-trips byte-identically.
1265        let store_dir = TempDir::new("store");
1266        let src_dir = TempDir::new("src");
1267        let dest_dir = TempDir::new("dest");
1268        let (manifest, id) = make_nested_source(src_dir.path());
1269
1270        let store =
1271            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(1, None));
1272        store.push(&manifest, src_dir.path()).expect("push");
1273        let fetched = store.get_manifest(&id).expect("get manifest");
1274        store.fetch_files(&fetched, dest_dir.path()).expect("fetch");
1275        assert_nested_dest(dest_dir.path());
1276    }
1277
1278    #[test]
1279    fn filestore_parallel_all_or_nothing_bad_object() {
1280        // A source file whose bytes do not match its manifest checksum must make
1281        // push fail with `Integrity` AND write NO manifest (all-or-nothing:
1282        // manifest is written only after every parallel object copy succeeds).
1283        let store_dir = TempDir::new("store");
1284        let src_dir = TempDir::new("src");
1285        let (manifest, id) = make_nested_source(src_dir.path());
1286
1287        // Corrupt one source file so its bytes no longer hash to the manifest
1288        // checksum; persist's source-verify trips and push returns Integrity.
1289        fs::write(src_dir.path().join("sub/c.txt"), b"TAMPERED\n").unwrap();
1290
1291        let store =
1292            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
1293        match store.push(&manifest, src_dir.path()) {
1294            Err(StoreError::Integrity { .. }) => {}
1295            other => panic!("expected Integrity from bad source object, got {other:?}"),
1296        }
1297
1298        // ALL-OR-NOTHING: the manifest must NOT have been written.
1299        let man_path = store.manifest_disk_path(&id);
1300        assert!(
1301            !man_path.exists(),
1302            "manifest must not be written when an object copy fails"
1303        );
1304    }
1305
1306    #[test]
1307    fn filestore_parallel_large_n_round_trips() {
1308        // Exercise the concurrency bound with N >> concurrency files.
1309        let store_dir = TempDir::new("store");
1310        let src_dir = TempDir::new("src");
1311        let dest_dir = TempDir::new("dest");
1312        let hasher = Blake3Hasher::new();
1313
1314        let mut manifest = Manifest::new();
1315        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
1316        let n = 50usize;
1317        for i in 0..n {
1318            let name = format!("file-{i:03}.txt");
1319            let contents = format!("contents of file {i}\n");
1320            fs::write(src_dir.path().join(&name), contents.as_bytes()).unwrap();
1321            let sum = hasher.hash_hex(contents.as_bytes());
1322            #[allow(clippy::cast_possible_truncation)]
1323            manifest.push(ManifestEntry::new(
1324                PathType::File,
1325                "600",
1326                sum,
1327                contents.len() as u64,
1328                format!("./{name}"),
1329            ));
1330        }
1331        let manifest = Manifest::from_entries(manifest.entries().to_vec());
1332        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
1333
1334        let store =
1335            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
1336        store.push(&manifest, src_dir.path()).expect("push N files");
1337        let fetched = store.get_manifest(&id).expect("get manifest");
1338        store
1339            .fetch_files(&fetched, dest_dir.path())
1340            .expect("fetch N files");
1341
1342        for i in 0..n {
1343            let name = format!("file-{i:03}.txt");
1344            let expected = format!("contents of file {i}\n");
1345            assert_eq!(
1346                fs::read(dest_dir.path().join(&name)).unwrap(),
1347                expected.as_bytes()
1348            );
1349        }
1350    }
1351
1352    #[test]
1353    fn meter_records_filestore_push_fetch() {
1354        // A FileStore wired `with_meter` doing push(manifest, src) then
1355        // fetch_files(manifest, dest) records add_in/add_out + objects_done
1356        // matching the object set. Push and fetch each touch every object once,
1357        // so over the two operations bytes_in/out == 2 * total bytes and
1358        // objects_done == 2 * N.
1359        let src_dir = TempDir::new("src");
1360        let (manifest, id) = make_nested_source(src_dir.path());
1361
1362        let n = manifest
1363            .entries()
1364            .iter()
1365            .filter(|e| e.path_type == PathType::File)
1366            .count() as u64;
1367        let total_bytes: u64 = manifest
1368            .entries()
1369            .iter()
1370            .filter(|e| e.path_type == PathType::File)
1371            .map(|e| e.size)
1372            .sum();
1373
1374        let store_dir = TempDir::new("store");
1375        let dest_dir = TempDir::new("dest");
1376        let meter = Arc::new(Meter::new());
1377        let store = FileStore::from_root(store_dir.path()).with_meter(Some(Arc::clone(&meter)));
1378
1379        store.push(&manifest, src_dir.path()).expect("push");
1380        let after_push = meter.snapshot();
1381        assert_eq!(after_push.bytes_in, total_bytes, "push read every object");
1382        assert_eq!(after_push.bytes_out, total_bytes, "push wrote every object");
1383        assert_eq!(after_push.objects_done, n, "push finished N objects");
1384        assert_eq!(after_push.objects_skipped, 0, "fresh store skips nothing");
1385        assert_eq!(after_push.objects_total, total_bytes, "push set byte total");
1386        assert_eq!(after_push.in_flight, 0, "nothing left in flight");
1387
1388        let fetched = store.get_manifest(&id).expect("get manifest");
1389        store
1390            .fetch_files(&fetched, dest_dir.path())
1391            .expect("fetch_files");
1392        let after_fetch = meter.snapshot();
1393        assert_eq!(
1394            after_fetch.bytes_in,
1395            2 * total_bytes,
1396            "fetch read every object again"
1397        );
1398        assert_eq!(
1399            after_fetch.bytes_out,
1400            2 * total_bytes,
1401            "fetch wrote every object again"
1402        );
1403        assert_eq!(after_fetch.objects_done, 2 * n, "push + fetch = 2N objects");
1404        assert_eq!(after_fetch.in_flight, 0, "nothing left in flight");
1405
1406        // Dest materialized correctly.
1407        assert_nested_dest(dest_dir.path());
1408    }
1409
1410    #[test]
1411    fn meter_records_none_is_identical() {
1412        // The same push+fetch with NO meter produces byte-identical store/dest
1413        // contents and the same snapshot id as a metered run — recording changes
1414        // nothing.
1415        let src_dir = TempDir::new("src");
1416        let (manifest, id) = make_nested_source(src_dir.path());
1417
1418        // Metered run.
1419        let metered_store_dir = TempDir::new("store-metered");
1420        let metered_dest_dir = TempDir::new("dest-metered");
1421        let meter = Arc::new(Meter::new());
1422        let metered =
1423            FileStore::from_root(metered_store_dir.path()).with_meter(Some(Arc::clone(&meter)));
1424        metered
1425            .push(&manifest, src_dir.path())
1426            .expect("metered push");
1427        let metered_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
1428        let metered_manifest = metered.get_manifest(&id).expect("metered manifest");
1429        metered
1430            .fetch_files(&metered_manifest, metered_dest_dir.path())
1431            .expect("metered fetch");
1432
1433        // Unmetered run (meter is None — the constructor default).
1434        let plain_store_dir = TempDir::new("store-plain");
1435        let plain_dest_dir = TempDir::new("dest-plain");
1436        let plain = FileStore::from_root(plain_store_dir.path());
1437        plain.push(&manifest, src_dir.path()).expect("plain push");
1438        let plain_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
1439        let plain_manifest = plain.get_manifest(&id).expect("plain manifest");
1440        plain
1441            .fetch_files(&plain_manifest, plain_dest_dir.path())
1442            .expect("plain fetch");
1443
1444        // Same snapshot id.
1445        assert_eq!(metered_id, plain_id, "snapshot id unaffected by the meter");
1446        assert_eq!(metered_id, id);
1447
1448        // Byte-identical objects at identical sharded keys.
1449        for entry in manifest.entries() {
1450            if entry.path_type != PathType::File {
1451                continue;
1452            }
1453            let key = object_path(&entry.checksum);
1454            let metered_obj = metered_store_dir.path().join(&key);
1455            let plain_obj = plain_store_dir.path().join(&key);
1456            assert!(metered_obj.exists(), "metered object {key} present");
1457            assert!(plain_obj.exists(), "plain object {key} present");
1458            assert_eq!(
1459                fs::read(&metered_obj).unwrap(),
1460                fs::read(&plain_obj).unwrap(),
1461                "metered and unmetered object bytes identical"
1462            );
1463        }
1464
1465        // Byte-identical dest trees.
1466        assert_nested_dest(metered_dest_dir.path());
1467        assert_nested_dest(plain_dest_dir.path());
1468    }
1469
1470    #[test]
1471    fn file_store_strip_leading_dot_slash() {
1472        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
1473        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
1474        assert_eq!(strip_leading_dot_slash("./a/"), "a");
1475        assert_eq!(strip_leading_dot_slash("./"), "");
1476        assert_eq!(strip_leading_dot_slash("/abs/path"), "/abs/path");
1477    }
1478
1479    // --- StreamStore (object/manifest-level streaming) -------------------
1480    //
1481    // Hermetic: FileStore is local, so these exercise the verify discipline
1482    // (BLAKE3 round-trip + corruption rejection) without any cloud creds.
1483
1484    #[test]
1485    fn stream_store_filestore_object_roundtrip() {
1486        let store_dir = TempDir::new("stream-roundtrip");
1487        let store = FileStore::from_root(store_dir.path());
1488
1489        let bytes = b"hello stream store\n".to_vec();
1490        let checksum = Blake3Hasher::new().hash_hex(&bytes);
1491
1492        // Absent before the write.
1493        assert!(!store.has_object(&checksum).unwrap());
1494
1495        store.put_object(&checksum, bytes.clone()).expect("put ok");
1496
1497        // Present after, and the round-tripped bytes are identical.
1498        assert!(store.has_object(&checksum).unwrap());
1499        assert_eq!(store.get_object(&checksum).unwrap(), bytes);
1500
1501        // It landed at the exact sharded content-address.
1502        assert!(store_dir.path().join(object_path(&checksum)).exists());
1503    }
1504
1505    #[test]
1506    fn stream_store_get_object_rejects_corruption() {
1507        let store_dir = TempDir::new("stream-corrupt");
1508        let store = FileStore::from_root(store_dir.path());
1509
1510        // Address a blob under `checksum` but write DIFFERENT bytes directly
1511        // to its on-disk path, simulating a corrupt/tampered object.
1512        let good = b"the real object bytes\n".to_vec();
1513        let checksum = Blake3Hasher::new().hash_hex(&good);
1514        let target = store_dir.path().join(object_path(&checksum));
1515        fs::create_dir_all(target.parent().unwrap()).unwrap();
1516        fs::write(&target, b"TAMPERED bytes that do not hash to the address\n").unwrap();
1517
1518        match store.get_object(&checksum) {
1519            Err(StoreError::Integrity {
1520                expected, actual, ..
1521            }) => {
1522                assert_eq!(expected, checksum);
1523                assert_ne!(actual, checksum, "actual must differ from the address");
1524            }
1525            other => panic!("expected Integrity, got {other:?}"),
1526        }
1527    }
1528
1529    #[test]
1530    fn stream_store_put_object_rejects_wrong_checksum() {
1531        let store_dir = TempDir::new("stream-wrong-checksum");
1532        let store = FileStore::from_root(store_dir.path());
1533
1534        let bytes = b"some payload\n".to_vec();
1535        // A syntactically-valid but WRONG content-address.
1536        let wrong = "dead".repeat(16); // 64 hex chars, not the real hash.
1537        assert_ne!(wrong, Blake3Hasher::new().hash_hex(&bytes));
1538
1539        match store.put_object(&wrong, bytes) {
1540            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, wrong),
1541            other => panic!("expected Integrity, got {other:?}"),
1542        }
1543
1544        // Nothing was stored at the bogus address.
1545        assert!(!store.has_object(&wrong).unwrap());
1546        assert!(!store_dir.path().join(object_path(&wrong)).exists());
1547    }
1548
1549    #[test]
1550    fn stream_store_put_manifest_roundtrips() {
1551        let store_dir = TempDir::new("stream-manifest");
1552        let src_dir = TempDir::new("stream-manifest-src");
1553        let store = FileStore::from_root(store_dir.path());
1554
1555        let (manifest, id) = make_foo_bar_source(src_dir.path());
1556
1557        store.put_manifest(&id, &manifest).expect("put_manifest ok");
1558
1559        // get_manifest reads it back, re-verifies the id, and yields an equal
1560        // manifest.
1561        let back = store.get_manifest(&id).expect("get_manifest ok");
1562        assert_eq!(back.entries(), manifest.entries());
1563        assert_eq!(
1564            snapdir_core::merkle::snapshot_id(&back, &Blake3Hasher::new()),
1565            id
1566        );
1567    }
1568}