Skip to main content

snapdir_stores/
sync.rs

1//! Streaming store-to-store snapshot copy.
2//!
3//! [`sync_snapshot`] copies ONE snapshot — its manifest plus every raw object
4//! it references — directly from a source [`StreamStore`] to a destination
5//! [`StreamStore`], **through memory only**. There is no local filesystem
6//! staging: the function signature deliberately takes **no
7//! [`Path`](std::path::Path)** anywhere, so a blob read out of the source can
8//! only ever flow into the destination (it never touches scratch/cache on disk).
9//!
10//! # Sync methods, rayon threads — not the async driver
11//!
12//! [`StreamStore`]'s methods are **synchronous**: the network backends drive
13//! their async SDK calls on an internal runtime via `block_on`. Driving them
14//! from the async [`run_concurrent`](crate::transfer::run_concurrent) /
15//! [`RateLimiter`](crate::transfer::RateLimiter) would nest one tokio runtime
16//! inside another and panic. So this orchestrator parallelizes object copies
17//! across a **rayon [`ThreadPool`](rayon::ThreadPool)** sized to
18//! [`TransferConfig::concurrency`] — exactly the pattern
19//! [`FileStore::parallel_copy`](crate::file_store::FileStore). Rayon workers are
20//! plain OS threads, so each one may safely call the `block_on`-ing sync
21//! `get_object`/`put_object`. Bandwidth is throttled by the **synchronous**
22//! [`BlockingRateLimiter`] (one shared bucket via [`Arc`]), never the async
23//! [`RateLimiter`](crate::transfer::RateLimiter).
24//!
25//! # Invariants
26//!
27//! - **Skip-present / incremental:** an object the destination already
28//!   [`has_object`](StreamStore::has_object) is not re-copied.
29//! - **Manifest-last / all-or-nothing:** the destination manifest is written
30//!   only after every referenced object has landed. On the first object error
31//!   the copy stops and NO manifest is written, so a destination manifest always
32//!   implies its objects are present (mirroring
33//!   [`push`](snapdir_core::store::Store::push)).
34//! - **Verified:** every blob is BLAKE3-verified by the underlying
35//!   [`StreamStore`] on both read and write, and the source manifest is verified
36//!   to hash to `id` by [`get_manifest`](snapdir_core::store::Store::get_manifest).
37
38use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
39use std::sync::Arc;
40
41use snapdir_core::manifest::PathType;
42use snapdir_core::store::StoreError;
43use snapdir_core::{Meter, Phase};
44
45use crate::adaptive::{
46    p95_object_size, AdaptiveGate, AdaptivePolicy as ControllerPolicy, ControllerDriver, OpResult,
47    OpSample,
48};
49use crate::stream::StreamStore;
50use crate::transfer::{classify_error, AdaptivePolicy, BlockingRateLimiter, TransferConfig};
51
/// Outcome of a [`sync_snapshot`] call.
///
/// When the destination already holds the manifest (the fast path), every
/// counter is `0` and only `dry_run` echoes the caller's flag.
///
/// When `dry_run` is `true`, `objects_copied` is the number of objects that
/// *would* be copied (those absent from the destination) and `bytes_copied`
/// stays `0` — nothing is read or written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncReport {
    /// Objects actually copied source → dest (or, in a dry run, that would be
    /// copied). Counted once per File entry that was not already present.
    pub objects_copied: usize,
    /// Objects skipped because the destination already held them
    /// (`has_object` returned `true`).
    pub objects_skipped: usize,
    /// Total bytes copied into the destination, measured as the length of each
    /// blob read from the source (always `0` for a dry run).
    pub bytes_copied: u64,
    /// Whether this was a dry run (no object reads, object writes, or
    /// manifest write).
    pub dry_run: bool,
}
69
70/// Copies one snapshot's manifest + raw objects directly from `from` to `to`,
71/// through memory only (no local filesystem staging).
72///
73/// See the [module docs](crate::sync) for the rayon-pool / sync-rate-limiter
74/// design and the manifest-last invariant. The function takes **no
75/// [`Path`](std::path::Path)** — that is the structural guarantee nothing is
76/// staged on local disk.
77///
78/// # Fast path
79///
80/// If the destination already has the manifest for `id`, the snapshot is fully
81/// mirrored and a zero-transfer [`SyncReport`] is returned without touching the
82/// source's objects.
83///
84/// # Errors
85///
86/// Returns the first [`StoreError`] from any manifest/object operation. On an
87/// object error NO destination manifest is written.
88#[allow(clippy::too_many_lines)]
89pub fn sync_snapshot(
90    from: &(dyn StreamStore + Sync),
91    to: &(dyn StreamStore + Sync),
92    id: &str,
93    config: &TransferConfig,
94    dry_run: bool,
95    meter: Option<&Meter>,
96) -> Result<SyncReport, StoreError> {
97    // Fast path: a destination manifest implies all its objects are present, so
98    // an already-mirrored snapshot needs no work (and no source reads).
99    if to.get_manifest(id).is_ok() {
100        return Ok(SyncReport {
101            objects_copied: 0,
102            objects_skipped: 0,
103            bytes_copied: 0,
104            dry_run,
105        });
106    }
107
108    // Verifies the source manifest hashes back to `id` before we trust it.
109    let manifest = from.get_manifest(id)?;
110
111    // Sync moves content objects, not the directory tree, so only File entries
112    // carry an object to copy.
113    let files: Vec<&str> = manifest
114        .entries()
115        .iter()
116        .filter(|e| e.path_type == PathType::File)
117        .map(|e| e.checksum.as_str())
118        .collect();
119    // Manifest-declared object sizes (for the adaptive controller's memory
120    // guardrail p95). Advisory only; never gates what is copied.
121    let object_sizes: Vec<u64> = manifest
122        .entries()
123        .iter()
124        .filter(|e| e.path_type == PathType::File)
125        .map(|e| e.size)
126        .collect();
127
128    // Advisory progress: we are entering the transfer phase, and the total is
129    // the sum of the to-copy object sizes (the File entries' manifest sizes).
130    // No effect on what is copied; a no-op without a meter.
131    if let Some(m) = meter {
132        m.set_phase(Phase::Transfer);
133        let total: u64 = manifest
134            .entries()
135            .iter()
136            .filter(|e| e.path_type == PathType::File)
137            .map(|e| e.size)
138            .sum();
139        m.set_total(total);
140    }
141
142    let copied = AtomicUsize::new(0);
143    let skipped = AtomicUsize::new(0);
144    let bytes = AtomicU64::new(0);
145
146    // One shared synchronous token bucket across all rayon workers (Arc so the
147    // closure can be Sync/shared). Unlimited when max_bytes_per_sec is None/0.
148    let limiter = Arc::new(BlockingRateLimiter::new(config.max_bytes_per_sec));
149
150    if !files.is_empty() {
151        // The per-object copy step, shared by the fixed and adaptive passes so
152        // they copy byte-identically (only scheduling/rate differ). `report` is
153        // called with the measured `OpSample` after each copy (a no-op for the
154        // fixed path; feeds the controller in the adaptive path).
155        let copy_one = |checksum: &str, report: &dyn Fn(OpSample)| -> Result<(), StoreError> {
156            if to.has_object(checksum)? {
157                skipped.fetch_add(1, Ordering::Relaxed);
158                if let Some(m) = meter {
159                    m.add_skipped(1);
160                }
161                return Ok(());
162            }
163            if dry_run {
164                // Count as "would copy"; never read or write anything.
165                copied.fetch_add(1, Ordering::Relaxed);
166                return Ok(());
167            }
168            // Bytes live only in memory: read from source, throttle, write to
169            // dest. Never written to any path.
170            if let Some(m) = meter {
171                m.object_started();
172            }
173            let started = std::time::Instant::now();
174            let outcome = (|| {
175                let blob = from.get_object(checksum)?;
176                let len = blob.len() as u64;
177                // Read from source (bytes-in).
178                if let Some(m) = meter {
179                    m.add_in(len);
180                }
181                limiter.acquire_blocking(len);
182                to.put_object(checksum, blob)?;
183                Ok::<u64, StoreError>(len)
184            })();
185            let latency = started.elapsed();
186            match &outcome {
187                Ok(len) => report(OpSample {
188                    bytes: *len,
189                    latency,
190                    result: OpResult::Ok,
191                }),
192                Err(err) => report(OpSample {
193                    bytes: 0,
194                    latency,
195                    result: classify_error(err),
196                }),
197            }
198            let len = outcome?;
199            // Written to dest (bytes-out), object done.
200            if let Some(m) = meter {
201                m.add_out(len);
202                m.object_finished();
203            }
204            copied.fetch_add(1, Ordering::Relaxed);
205            bytes.fetch_add(len, Ordering::Relaxed);
206            Ok(())
207        };
208
209        match config.adaptive {
210            AdaptivePolicy::Off => {
211                let pool = rayon::ThreadPoolBuilder::new()
212                    .num_threads(config.concurrency.get())
213                    .build()
214                    .map_err(|err| StoreError::Backend {
215                        message: "failed to build sync thread pool".to_owned(),
216                        source: Some(Box::new(err)),
217                    })?;
218                let noop = |_: OpSample| {};
219                pool.install(|| {
220                    use rayon::prelude::*;
221                    files
222                        .par_iter()
223                        .try_for_each(|checksum| copy_one(checksum, &noop))
224                })?;
225            }
226            AdaptivePolicy::On { fraction, ceiling } => {
227                sync_objects_adaptive(
228                    &files,
229                    &object_sizes,
230                    config,
231                    &limiter,
232                    meter,
233                    fraction,
234                    ceiling,
235                    &copy_one,
236                )?;
237            }
238        }
239    }
240
241    // Manifest-last / all-or-nothing: only after every object copy succeeded
242    // (and never in a dry run) do we write the destination manifest, so a
243    // present manifest always implies present objects.
244    if !dry_run {
245        to.put_manifest(id, &manifest)?;
246    }
247
248    Ok(SyncReport {
249        objects_copied: copied.into_inner(),
250        objects_skipped: skipped.into_inner(),
251        bytes_copied: bytes.into_inner(),
252        dry_run,
253    })
254}
255
256/// Adaptive store-to-store copy pass: pool sized to the policy `ceiling`, each
257/// object gated to the controller's live limit (effective concurrency ≤
258/// ceiling), every copy timed + classified + recorded via `copy_one`'s report
259/// hook, with a background `std::thread` ticking the controller (~250ms) to
260/// resize the gate and retune the shared [`BlockingRateLimiter`]. The exact
261/// objects copied and first-error-wins semantics are identical to the fixed
262/// pass; only scheduling/rate differ.
263#[allow(clippy::too_many_arguments)]
264fn sync_objects_adaptive<C>(
265    files: &[&str],
266    object_sizes: &[u64],
267    config: &TransferConfig,
268    limiter: &Arc<BlockingRateLimiter>,
269    meter: Option<&Meter>,
270    fraction: f64,
271    ceiling: usize,
272    copy_one: &C,
273) -> Result<(), StoreError>
274where
275    C: Fn(&str, &dyn Fn(OpSample)) -> Result<(), StoreError> + Sync,
276{
277    use rayon::prelude::*;
278
279    let p95 = p95_object_size(object_sizes);
280    let total_ram = snapdir_core::resources::total_ram_bytes().unwrap_or(0);
281    let policy = ControllerPolicy::new(fraction, ceiling, total_ram, config.max_bytes_per_sec);
282
283    let gate = AdaptiveGate::new(config.concurrency.get(), ceiling);
284
285    // Retune the shared synchronous limiter live (its `set_rate` is sync).
286    let blocking_limiter = Arc::clone(limiter);
287    let rate_applier: Arc<dyn Fn(Option<u64>) + Send + Sync> =
288        Arc::new(move |rate| blocking_limiter.set_rate(rate));
289    // The orchestrator only has a borrowed `&Meter`; the driver's optional
290    // display-meter mirror needs an owned `Arc<Meter>`, so the live limit/rate
291    // display is left to the meter recording in `copy_one` (None here). The
292    // controller still drives concurrency + rate correctly.
293    let _ = meter;
294    let driver = ControllerDriver::new(policy, gate.clone(), p95, Some(rate_applier), None);
295
296    // Background tick thread, stopped on the shared flag once the copy ends.
297    let stop = Arc::new(AtomicBool::new(false));
298    let tick_driver = driver.clone();
299    let tick_stop = Arc::clone(&stop);
300    let ticker = std::thread::spawn(move || {
301        while !tick_stop.load(Ordering::Relaxed) {
302            std::thread::sleep(std::time::Duration::from_millis(250));
303            if tick_stop.load(Ordering::Relaxed) {
304                break;
305            }
306            tick_driver.tick();
307        }
308    });
309
310    let pool = rayon::ThreadPoolBuilder::new()
311        .num_threads(ceiling.max(1))
312        .build()
313        .map_err(|err| StoreError::Backend {
314            message: "failed to build sync thread pool".to_owned(),
315            source: Some(Box::new(err)),
316        })?;
317
318    let result = pool.install(|| {
319        files.par_iter().try_for_each(|checksum| {
320            // Gate to the controller's live limit (effective concurrency).
321            let _permit = gate.acquire_blocking();
322            let report = |sample: OpSample| driver.record_op(sample);
323            copy_one(checksum, &report)
324        })
325    });
326
327    stop.store(true, Ordering::Relaxed);
328    let _ = ticker.join();
329    result
330}
331
332#[cfg(test)]
333mod tests {
334    use super::*;
335    use crate::file_store::FileStore;
336    use snapdir_core::manifest::{Manifest, ManifestEntry};
337    use snapdir_core::merkle::{Blake3Hasher, Hasher};
338    use snapdir_core::store::Store;
339    use std::fs;
340    use std::path::{Path, PathBuf};
341    use std::sync::Mutex;
342
343    /// A tiny temp-dir helper so tests don't pull in a dev-dependency. Creates a
344    /// unique directory under the system temp dir and removes it on drop.
345    struct TempDir {
346        path: PathBuf,
347    }
348
349    impl TempDir {
350        fn new(tag: &str) -> Self {
351            use std::sync::atomic::AtomicU64;
352            static COUNTER: AtomicU64 = AtomicU64::new(0);
353            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
354            let path = std::env::temp_dir().join(format!(
355                "snapdir-sync-test-{}-{tag}-{n}",
356                std::process::id()
357            ));
358            fs::create_dir_all(&path).expect("create temp dir");
359            Self { path }
360        }
361
362        fn path(&self) -> &Path {
363            &self.path
364        }
365    }
366
367    impl Drop for TempDir {
368        fn drop(&mut self) {
369            let _ = fs::remove_dir_all(&self.path);
370        }
371    }
372
373    /// Builds a small multi-file source tree (`a`, `b`, `c`) under `source` and
374    /// returns its manifest + snapshot id. Checksums are the real BLAKE3 of the
375    /// file bytes so store verification passes.
376    fn make_source(source: &Path) -> (Manifest, String) {
377        let hasher = Blake3Hasher::new();
378        let files: [(&str, &[u8]); 3] = [("a", b"alpha\n"), ("b", b"bravo\n"), ("c", b"charlie\n")];
379        let mut sums: Vec<(String, String, u64)> = Vec::new();
380        for (name, bytes) in files {
381            fs::write(source.join(name), bytes).unwrap();
382            sums.push((
383                (*name).to_owned(),
384                hasher.hash_hex(bytes),
385                bytes.len() as u64,
386            ));
387        }
388        let root_sum = snapdir_core::merkle::directory_checksum(
389            sums.iter().map(|(_, s, _)| s.as_str()),
390            &hasher,
391        );
392
393        let mut entries = vec![ManifestEntry::new(
394            PathType::Directory,
395            "700",
396            root_sum,
397            0,
398            "./",
399        )];
400        for (name, sum, size) in &sums {
401            entries.push(ManifestEntry::new(
402                PathType::File,
403                "600",
404                sum.clone(),
405                *size,
406                format!("./{name}"),
407            ));
408        }
409        let manifest = Manifest::from_entries(entries);
410        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
411        (manifest, id)
412    }
413
414    /// The number of File objects in `manifest`.
415    fn object_count(manifest: &Manifest) -> usize {
416        manifest
417            .entries()
418            .iter()
419            .filter(|e| e.path_type == PathType::File)
420            .count()
421    }
422
423    fn cfg() -> TransferConfig {
424        TransferConfig::new(4, None)
425    }
426
427    #[test]
428    fn sync_snapshot_mirrors_snapshot() {
429        let a_dir = TempDir::new("a");
430        let b_dir = TempDir::new("b");
431        let src_dir = TempDir::new("src");
432        let (manifest, id) = make_source(src_dir.path());
433        let n = object_count(&manifest);
434
435        let a = FileStore::from_root(a_dir.path());
436        let b = FileStore::from_root(b_dir.path());
437        a.push(&manifest, src_dir.path()).expect("stage into A");
438
439        let report = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
440
441        assert_eq!(report.objects_copied, n);
442        assert_eq!(report.objects_skipped, 0);
443        assert!(!report.dry_run);
444        // B has the manifest and every object.
445        b.get_manifest(&id).expect("B has manifest");
446        for entry in manifest.entries() {
447            if entry.path_type == PathType::File {
448                assert!(
449                    b.has_object(&entry.checksum).expect("has_object ok"),
450                    "B missing object {}",
451                    entry.checksum
452                );
453            }
454        }
455    }
456
457    #[test]
458    fn meter_records_sync() {
459        // A multi-object snapshot synced A -> empty B records bytes-in ==
460        // bytes-out == total object bytes, objects_done == N, skipped == 0; a
461        // second sync into the now-populated B records the fast-path /
462        // skip-everything outcome (no copies).
463        let a_dir = TempDir::new("a");
464        let b_dir = TempDir::new("b");
465        let src_dir = TempDir::new("src");
466        let (manifest, id) = make_source(src_dir.path());
467        let n = object_count(&manifest);
468
469        // Total File-object bytes from the manifest sizes.
470        let total_bytes: u64 = manifest
471            .entries()
472            .iter()
473            .filter(|e| e.path_type == PathType::File)
474            .map(|e| e.size)
475            .sum();
476
477        let a = FileStore::from_root(a_dir.path());
478        let b = FileStore::from_root(b_dir.path());
479        a.push(&manifest, src_dir.path()).expect("stage into A");
480
481        let meter = Arc::new(Meter::new());
482        let report =
483            sync_snapshot(&a, &b, &id, &cfg(), false, Some(&meter)).expect("first meter sync");
484        assert_eq!(report.objects_copied, n);
485
486        let snap = meter.snapshot();
487        assert_eq!(snap.bytes_in, total_bytes, "bytes_in == total object bytes");
488        assert_eq!(
489            snap.bytes_out, total_bytes,
490            "bytes_out == total object bytes"
491        );
492        assert_eq!(snap.objects_done, n as u64, "objects_done == N");
493        assert_eq!(snap.objects_skipped, 0, "nothing skipped on a fresh dest");
494        assert_eq!(snap.objects_total, total_bytes, "total == bytes total");
495        assert_eq!(snap.in_flight, 0, "no objects left in flight");
496        assert_eq!(snap.phase, Phase::Transfer, "phase set to Transfer");
497
498        // Second sync into the now-fully-mirrored B. The fast path (dest has the
499        // manifest) short-circuits, so this records no new copies. Pre-seed every
500        // object into a fresh B' WITHOUT its manifest to exercise the per-object
501        // skip branch and assert objects_skipped == N, objects_done == 0.
502        let seed_dir = TempDir::new("seed");
503        let seeded = FileStore::from_root(seed_dir.path());
504        for entry in manifest.entries() {
505            if entry.path_type == PathType::File {
506                let blob = a.get_object(&entry.checksum).expect("get from A");
507                seeded.put_object(&entry.checksum, blob).expect("seed dest");
508            }
509        }
510        let later = Arc::new(Meter::new());
511        let later_report = sync_snapshot(&a, &seeded, &id, &cfg(), false, Some(&later))
512            .expect("second meter sync");
513        assert_eq!(
514            later_report.objects_skipped, n,
515            "all objects already present"
516        );
517        let later_snap = later.snapshot();
518        assert_eq!(later_snap.objects_skipped, n as u64, "meter skipped == N");
519        assert_eq!(later_snap.objects_done, 0, "no objects copied");
520        assert_eq!(later_snap.bytes_in, 0, "no bytes read");
521        assert_eq!(later_snap.bytes_out, 0, "no bytes written");
522    }
523
524    #[test]
525    fn sync_snapshot_skip_present_is_incremental() {
526        let a_dir = TempDir::new("a");
527        let b_dir = TempDir::new("b");
528        let src_dir = TempDir::new("src");
529        let (manifest, id) = make_source(src_dir.path());
530        let n = object_count(&manifest);
531
532        let a = FileStore::from_root(a_dir.path());
533        let b = FileStore::from_root(b_dir.path());
534        a.push(&manifest, src_dir.path()).expect("stage into A");
535
536        let first = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("first sync");
537        assert_eq!(first.objects_copied, n);
538
539        // Second run: destination already mirrored → fast path returns a
540        // zero-transfer report; B is unchanged.
541        let second = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("second sync");
542        assert_eq!(second.objects_copied, 0);
543        assert_eq!(second.objects_skipped, 0);
544        assert_eq!(second.bytes_copied, 0);
545        b.get_manifest(&id).expect("B still has manifest");
546    }
547
548    #[test]
549    fn sync_snapshot_skip_present_per_object() {
550        // Pre-seed one object into B (but NOT B's manifest), so the fast path
551        // does not trigger and we exercise the per-object skip branch.
552        let a_dir = TempDir::new("a");
553        let b_dir = TempDir::new("b");
554        let src_dir = TempDir::new("src");
555        let (manifest, id) = make_source(src_dir.path());
556        let n = object_count(&manifest);
557
558        let a = FileStore::from_root(a_dir.path());
559        let b = FileStore::from_root(b_dir.path());
560        a.push(&manifest, src_dir.path()).expect("stage into A");
561
562        // Copy one object from A into B directly.
563        let first_obj = manifest
564            .entries()
565            .iter()
566            .find(|e| e.path_type == PathType::File)
567            .unwrap();
568        let blob = a.get_object(&first_obj.checksum).expect("get from A");
569        b.put_object(&first_obj.checksum, blob).expect("seed B");
570
571        let report = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
572        assert_eq!(report.objects_copied, n - 1);
573        assert_eq!(report.objects_skipped, 1);
574        b.get_manifest(&id).expect("B has manifest after sync");
575    }
576
577    /// A dest store that wraps a [`FileStore`] but fails `put_object` for one
578    /// chosen checksum, to drive the all-or-nothing path.
579    struct FailingPutStore {
580        inner: FileStore,
581        fail_on: String,
582        // Records which checksums were attempted, for sanity.
583        attempted: Mutex<Vec<String>>,
584    }
585
586    impl Store for FailingPutStore {
587        fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
588            self.inner.get_manifest(id)
589        }
590        fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
591            self.inner.fetch_files(manifest, dest)
592        }
593        fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
594            self.inner.push(manifest, source)
595        }
596    }
597
598    impl StreamStore for FailingPutStore {
599        fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
600            self.inner.has_object(checksum)
601        }
602        fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
603            self.inner.get_object(checksum)
604        }
605        fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
606            self.attempted.lock().unwrap().push(checksum.to_owned());
607            if checksum == self.fail_on {
608                return Err(StoreError::Backend {
609                    message: "synthetic put_object failure".to_owned(),
610                    source: None,
611                });
612            }
613            self.inner.put_object(checksum, bytes)
614        }
615        fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
616            self.inner.put_manifest(id, manifest)
617        }
618    }
619
620    #[test]
621    fn sync_snapshot_all_or_nothing() {
622        let a_dir = TempDir::new("a");
623        let b_dir = TempDir::new("b");
624        let src_dir = TempDir::new("src");
625        let (manifest, id) = make_source(src_dir.path());
626
627        let a = FileStore::from_root(a_dir.path());
628        a.push(&manifest, src_dir.path()).expect("stage into A");
629
630        // Pick a checksum to fail on.
631        let fail_on = manifest
632            .entries()
633            .iter()
634            .find(|e| e.path_type == PathType::File)
635            .unwrap()
636            .checksum
637            .clone();
638
639        let b = FailingPutStore {
640            inner: FileStore::from_root(b_dir.path()),
641            fail_on,
642            attempted: Mutex::new(Vec::new()),
643        };
644
645        // Concurrency 1 keeps the failure deterministic.
646        let one = TransferConfig::new(1, None);
647        let err =
648            sync_snapshot(&a, &b, &id, &one, false, None).expect_err("must surface put error");
649        assert!(
650            matches!(err, StoreError::Backend { ref message, .. } if message.contains("synthetic")),
651            "unexpected error: {err:?}"
652        );
653        // NO manifest written to the dest.
654        assert!(
655            b.get_manifest(&id).is_err(),
656            "dest must have no manifest after a failed sync"
657        );
658    }
659
660    #[test]
661    fn sync_snapshot_adaptive_mirrors_same_snapshot() {
662        // INVARIANT: an adaptive (policy On, low ceiling) sync mirrors the SAME
663        // snapshot id + the same objects as the non-adaptive (Off) sync over the
664        // same source. Adaptive only changes scheduling/rate.
665        let a_dir = TempDir::new("a");
666        let off_dir = TempDir::new("off");
667        let on_dir = TempDir::new("on");
668        let src_dir = TempDir::new("src");
669        let (manifest, id) = make_source(src_dir.path());
670        let n = object_count(&manifest);
671
672        let a = FileStore::from_root(a_dir.path());
673        a.push(&manifest, src_dir.path()).expect("stage into A");
674
675        let off = FileStore::from_root(off_dir.path());
676        let off_report = sync_snapshot(&a, &off, &id, &cfg(), false, None).expect("off sync");
677
678        let on = FileStore::from_root(on_dir.path());
679        let on_cfg = TransferConfig::new(4, None).with_adaptive(AdaptivePolicy::On {
680            fraction: 0.8,
681            ceiling: 2,
682        });
683        let on_report = sync_snapshot(&a, &on, &id, &on_cfg, false, None).expect("adaptive sync");
684
685        assert_eq!(off_report.objects_copied, n);
686        assert_eq!(
687            on_report.objects_copied, n,
688            "adaptive copies the same count"
689        );
690        assert_eq!(on_report.objects_skipped, 0);
691
692        // Both dests have the manifest (same id) and every object, byte-identical.
693        on.get_manifest(&id).expect("On dest has the manifest");
694        for entry in manifest.entries() {
695            if entry.path_type == PathType::File {
696                let off_blob = off.get_object(&entry.checksum).expect("off object");
697                let on_blob = on.get_object(&entry.checksum).expect("on object");
698                assert_eq!(off_blob, on_blob, "Off vs On object bytes identical");
699            }
700        }
701    }
702
703    #[test]
704    fn sync_snapshot_dry_run_writes_nothing() {
705        let a_dir = TempDir::new("a");
706        let b_dir = TempDir::new("b");
707        let src_dir = TempDir::new("src");
708        let (manifest, id) = make_source(src_dir.path());
709        let n = object_count(&manifest);
710
711        let a = FileStore::from_root(a_dir.path());
712        let b = FileStore::from_root(b_dir.path());
713        a.push(&manifest, src_dir.path()).expect("stage into A");
714
715        let report = sync_snapshot(&a, &b, &id, &cfg(), true, None).expect("dry run ok");
716        assert!(report.dry_run);
717        assert_eq!(report.objects_copied, n, "would-copy count is N");
718        assert_eq!(report.objects_skipped, 0);
719        assert_eq!(report.bytes_copied, 0);
720
721        // B has NO manifest and NO objects.
722        assert!(b.get_manifest(&id).is_err(), "dry run wrote a manifest");
723        for entry in manifest.entries() {
724            if entry.path_type == PathType::File {
725                assert!(
726                    !b.has_object(&entry.checksum).expect("has_object ok"),
727                    "dry run wrote an object"
728                );
729            }
730        }
731    }
732
733    #[test]
734    fn sync_snapshot_no_local_fs() {
735        // Hold A and B under one parent tempdir and assert sync creates NOTHING
736        // outside A's and B's store dirs (no scratch/cache). The structural
737        // guarantee is that sync_snapshot takes no &Path; this test backs it up.
738        let parent = TempDir::new("parent");
739        let a_root = parent.path().join("store-a");
740        let b_root = parent.path().join("store-b");
741        let src = parent.path().join("src");
742        fs::create_dir_all(&a_root).unwrap();
743        fs::create_dir_all(&b_root).unwrap();
744        fs::create_dir_all(&src).unwrap();
745
746        let (manifest, id) = make_source(&src);
747
748        let a = FileStore::from_root(&a_root);
749        let b = FileStore::from_root(&b_root);
750        a.push(&manifest, &src).expect("stage into A");
751
752        // Snapshot the set of top-level entries under parent before sync.
753        let before: std::collections::BTreeSet<PathBuf> = fs::read_dir(parent.path())
754            .unwrap()
755            .map(|e| e.unwrap().path())
756            .collect();
757
758        sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
759
760        let after: std::collections::BTreeSet<PathBuf> = fs::read_dir(parent.path())
761            .unwrap()
762            .map(|e| e.unwrap().path())
763            .collect();
764
765        assert_eq!(
766            before,
767            after,
768            "sync_snapshot created an entry outside the store dirs: {:?}",
769            after.difference(&before).collect::<Vec<_>>()
770        );
771    }
772}