Skip to main content

snapdir_stores/
sync.rs

1//! Streaming store-to-store snapshot copy.
2//!
3//! [`sync_snapshot`] copies ONE snapshot — its manifest plus every raw object
4//! it references — directly from a source [`StreamStore`] to a destination
5//! [`StreamStore`], **through memory only**. There is no local filesystem
6//! staging: the function signature deliberately takes **no
7//! [`Path`](std::path::Path)** anywhere, so a blob read out of the source can
8//! only ever flow into the destination (it never touches scratch/cache on disk).
9//!
10//! # Sync methods, rayon threads — not the async driver
11//!
12//! [`StreamStore`]'s methods are **synchronous**: the network backends drive
13//! their async SDK calls on an internal runtime via `block_on`. Driving them
14//! from the async [`run_concurrent`](crate::transfer::run_concurrent) /
15//! [`RateLimiter`](crate::transfer::RateLimiter) would nest one tokio runtime
16//! inside another and panic. So this orchestrator parallelizes object copies
17//! across a **rayon [`ThreadPool`](rayon::ThreadPool)** sized to
18//! [`TransferConfig::concurrency`] — exactly the pattern
19//! [`FileStore::parallel_copy`](crate::file_store::FileStore). Rayon workers are
20//! plain OS threads, so each one may safely call the `block_on`-ing sync
21//! `get_object`/`put_object`. Bandwidth is throttled by the **synchronous**
22//! [`BlockingRateLimiter`] (one shared bucket via [`Arc`]), never the async
23//! [`RateLimiter`](crate::transfer::RateLimiter).
24//!
25//! # Invariants
26//!
27//! - **Skip-present / incremental:** an object the destination already
28//!   [`has_object`](StreamStore::has_object) is not re-copied.
29//! - **Manifest-last / all-or-nothing:** the destination manifest is written
30//!   only after every referenced object has landed. On the first object error
31//!   the copy stops and NO manifest is written, so a destination manifest always
32//!   implies its objects are present (mirroring
33//!   [`push`](snapdir_core::store::Store::push)).
34//! - **Verified:** every blob is BLAKE3-verified by the underlying
35//!   [`StreamStore`] on both read and write, and the source manifest is verified
36//!   to hash to `id` by [`get_manifest`](snapdir_core::store::Store::get_manifest).
37
38use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
39use std::sync::Arc;
40
41use snapdir_core::manifest::PathType;
42use snapdir_core::store::StoreError;
43use snapdir_core::{Meter, Phase};
44
45use crate::stream::StreamStore;
46use crate::transfer::{BlockingRateLimiter, TransferConfig};
47
/// Summary of what a single [`sync_snapshot`] call did (or, for a dry run,
/// would have done).
///
/// For a dry run (`dry_run == true`) nothing is read or written:
/// `objects_copied` counts the objects that are absent from the destination
/// and therefore *would* be copied, and `bytes_copied` stays `0`.
///
/// The [`Default`] value is the all-zero, non-dry-run report, which allows
/// struct-update syntax such as `SyncReport { dry_run, ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncReport {
    /// Objects copied source → dest (in a dry run: objects that would be
    /// copied).
    pub objects_copied: usize,
    /// Objects left alone because the destination already held them.
    pub objects_skipped: usize,
    /// Total bytes written into the destination (always `0` for a dry run).
    pub bytes_copied: u64,
    /// Whether this was a dry run (no reads, no writes, no manifest).
    pub dry_run: bool,
}
65
66/// Copies one snapshot's manifest + raw objects directly from `from` to `to`,
67/// through memory only (no local filesystem staging).
68///
69/// See the [module docs](crate::sync) for the rayon-pool / sync-rate-limiter
70/// design and the manifest-last invariant. The function takes **no
71/// [`Path`](std::path::Path)** — that is the structural guarantee nothing is
72/// staged on local disk.
73///
74/// # Fast path
75///
76/// If the destination already has the manifest for `id`, the snapshot is fully
77/// mirrored and a zero-transfer [`SyncReport`] is returned without touching the
78/// source's objects.
79///
80/// # Errors
81///
82/// Returns the first [`StoreError`] from any manifest/object operation. On an
83/// object error NO destination manifest is written.
84pub fn sync_snapshot(
85    from: &(dyn StreamStore + Sync),
86    to: &(dyn StreamStore + Sync),
87    id: &str,
88    config: &TransferConfig,
89    dry_run: bool,
90    meter: Option<&Meter>,
91) -> Result<SyncReport, StoreError> {
92    // Fast path: a destination manifest implies all its objects are present, so
93    // an already-mirrored snapshot needs no work (and no source reads).
94    if to.get_manifest(id).is_ok() {
95        return Ok(SyncReport {
96            objects_copied: 0,
97            objects_skipped: 0,
98            bytes_copied: 0,
99            dry_run,
100        });
101    }
102
103    // Verifies the source manifest hashes back to `id` before we trust it.
104    let manifest = from.get_manifest(id)?;
105
106    // Sync moves content objects, not the directory tree, so only File entries
107    // carry an object to copy.
108    let files: Vec<&str> = manifest
109        .entries()
110        .iter()
111        .filter(|e| e.path_type == PathType::File)
112        .map(|e| e.checksum.as_str())
113        .collect();
114
115    // Advisory progress: we are entering the transfer phase, and the total is
116    // the sum of the to-copy object sizes (the File entries' manifest sizes).
117    // No effect on what is copied; a no-op without a meter.
118    if let Some(m) = meter {
119        m.set_phase(Phase::Transfer);
120        let total: u64 = manifest
121            .entries()
122            .iter()
123            .filter(|e| e.path_type == PathType::File)
124            .map(|e| e.size)
125            .sum();
126        m.set_total(total);
127    }
128
129    let copied = AtomicUsize::new(0);
130    let skipped = AtomicUsize::new(0);
131    let bytes = AtomicU64::new(0);
132
133    // One shared synchronous token bucket across all rayon workers (Arc so the
134    // closure can be Sync/shared). Unlimited when max_bytes_per_sec is None/0.
135    let limiter = Arc::new(BlockingRateLimiter::new(config.max_bytes_per_sec));
136
137    if !files.is_empty() {
138        let pool = rayon::ThreadPoolBuilder::new()
139            .num_threads(config.concurrency.get())
140            .build()
141            .map_err(|err| StoreError::Backend {
142                message: "failed to build sync thread pool".to_owned(),
143                source: Some(Box::new(err)),
144            })?;
145
146        pool.install(|| {
147            use rayon::prelude::*;
148            files.par_iter().try_for_each(|checksum| {
149                if to.has_object(checksum)? {
150                    skipped.fetch_add(1, Ordering::Relaxed);
151                    if let Some(m) = meter {
152                        m.add_skipped(1);
153                    }
154                    return Ok(());
155                }
156                if dry_run {
157                    // Count as "would copy"; never read or write anything.
158                    copied.fetch_add(1, Ordering::Relaxed);
159                    return Ok(());
160                }
161                // Bytes live only in memory: read from source, throttle, write
162                // to dest. Never written to any path.
163                if let Some(m) = meter {
164                    m.object_started();
165                }
166                let blob = from.get_object(checksum)?;
167                let len = blob.len() as u64;
168                // Read from source (bytes-in).
169                if let Some(m) = meter {
170                    m.add_in(len);
171                }
172                limiter.acquire_blocking(len);
173                to.put_object(checksum, blob)?;
174                // Written to dest (bytes-out), object done.
175                if let Some(m) = meter {
176                    m.add_out(len);
177                    m.object_finished();
178                }
179                copied.fetch_add(1, Ordering::Relaxed);
180                bytes.fetch_add(len, Ordering::Relaxed);
181                Ok::<(), StoreError>(())
182            })
183        })?;
184    }
185
186    // Manifest-last / all-or-nothing: only after every object copy succeeded
187    // (and never in a dry run) do we write the destination manifest, so a
188    // present manifest always implies present objects.
189    if !dry_run {
190        to.put_manifest(id, &manifest)?;
191    }
192
193    Ok(SyncReport {
194        objects_copied: copied.into_inner(),
195        objects_skipped: skipped.into_inner(),
196        bytes_copied: bytes.into_inner(),
197        dry_run,
198    })
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file_store::FileStore;
    use snapdir_core::manifest::{Manifest, ManifestEntry};
    use snapdir_core::merkle::{Blake3Hasher, Hasher};
    use snapdir_core::store::Store;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::Mutex;

    /// Self-cleaning scratch directory, hand-rolled so the tests need no
    /// dev-dependency. Each instance gets a unique directory under the system
    /// temp dir; the directory is removed (best effort) on drop.
    struct TempDir {
        path: PathBuf,
    }

    impl TempDir {
        /// Creates a fresh directory whose name embeds the process id, `tag`,
        /// and a process-wide counter so concurrent tests never collide.
        fn new(tag: &str) -> Self {
            static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
            let dir_name = format!("snapdir-sync-test-{}-{tag}-{n}", std::process::id());
            let path = std::env::temp_dir().join(dir_name);
            fs::create_dir_all(&path).expect("create temp dir");
            Self { path }
        }

        fn path(&self) -> &Path {
            &self.path
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure here must not fail the test.
            let _ = fs::remove_dir_all(&self.path);
        }
    }

    /// Writes the sample files `a`, `b`, `c` into `source` and returns the
    /// matching manifest plus its snapshot id. Checksums are the real BLAKE3
    /// of the file bytes so store verification passes.
    fn make_source(source: &Path) -> (Manifest, String) {
        let hasher = Blake3Hasher::new();
        let fixtures: [(&str, &[u8]); 3] =
            [("a", b"alpha\n"), ("b", b"bravo\n"), ("c", b"charlie\n")];

        // (name, checksum, size) per file, in fixture order.
        let hashed: Vec<(String, String, u64)> = fixtures
            .iter()
            .map(|&(name, contents)| {
                fs::write(source.join(name), contents).unwrap();
                (
                    name.to_owned(),
                    hasher.hash_hex(contents),
                    contents.len() as u64,
                )
            })
            .collect();

        let root_sum = snapdir_core::merkle::directory_checksum(
            hashed.iter().map(|(_, sum, _)| sum.as_str()),
            &hasher,
        );

        // Root directory entry first, then one File entry per sample file.
        let mut entries = Vec::with_capacity(hashed.len() + 1);
        entries.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            0,
            "./",
        ));
        entries.extend(hashed.iter().map(|(name, sum, size)| {
            ManifestEntry::new(
                PathType::File,
                "600",
                sum.clone(),
                *size,
                format!("./{name}"),
            )
        }));

        let manifest = Manifest::from_entries(entries);
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        (manifest, id)
    }

    /// Checksums of every File entry in `manifest`, in manifest order.
    fn file_checksums(manifest: &Manifest) -> Vec<&str> {
        manifest
            .entries()
            .iter()
            .filter(|e| e.path_type == PathType::File)
            .map(|e| e.checksum.as_str())
            .collect()
    }

    /// The number of File objects in `manifest`.
    fn object_count(manifest: &Manifest) -> usize {
        let mut files = 0;
        for entry in manifest.entries().iter() {
            if entry.path_type == PathType::File {
                files += 1;
            }
        }
        files
    }

    /// Default transfer settings for the tests: 4 workers, no bandwidth cap.
    fn cfg() -> TransferConfig {
        TransferConfig::new(4, None)
    }

    /// Shared setup: a source store `a` already holding the sample snapshot,
    /// plus an untouched directory for the destination store.
    struct Staged {
        manifest: Manifest,
        id: String,
        a: FileStore,
        b_dir: TempDir,
        // Held only so the directories outlive the test body.
        _a_dir: TempDir,
        _src_dir: TempDir,
    }

    impl Staged {
        fn new() -> Self {
            let a_dir = TempDir::new("a");
            let b_dir = TempDir::new("b");
            let src_dir = TempDir::new("src");
            let (manifest, id) = make_source(src_dir.path());

            let a = FileStore::from_root(a_dir.path());
            a.push(&manifest, src_dir.path()).expect("stage into A");

            Self {
                manifest,
                id,
                a,
                b_dir,
                _a_dir: a_dir,
                _src_dir: src_dir,
            }
        }

        /// A FileStore rooted at the (initially empty) destination directory.
        fn dest(&self) -> FileStore {
            FileStore::from_root(self.b_dir.path())
        }
    }

    #[test]
    fn sync_snapshot_mirrors_snapshot() {
        let s = Staged::new();
        let b = s.dest();
        let n = object_count(&s.manifest);

        let report = sync_snapshot(&s.a, &b, &s.id, &cfg(), false, None).expect("sync ok");

        assert_eq!(report.objects_copied, n);
        assert_eq!(report.objects_skipped, 0);
        assert!(!report.dry_run);

        // B has the manifest and every object.
        b.get_manifest(&s.id).expect("B has manifest");
        for checksum in file_checksums(&s.manifest) {
            let present = b.has_object(checksum).expect("has_object ok");
            assert!(present, "B missing object {}", checksum);
        }
    }

    #[test]
    fn meter_records_sync() {
        // Part 1: A -> empty B. The meter must record bytes-in == bytes-out ==
        // total object bytes, objects_done == N, and nothing skipped.
        // Part 2: A -> a destination pre-seeded with every object but no
        // manifest, which must record skip-everything (no copies).
        let s = Staged::new();
        let b = s.dest();
        let n = object_count(&s.manifest);

        // Total File-object bytes from the manifest sizes.
        let mut total_bytes: u64 = 0;
        for entry in s.manifest.entries().iter() {
            if entry.path_type == PathType::File {
                total_bytes += entry.size;
            }
        }

        let meter = Arc::new(Meter::new());
        let report = sync_snapshot(&s.a, &b, &s.id, &cfg(), false, Some(&meter))
            .expect("first meter sync");
        assert_eq!(report.objects_copied, n);

        let snap = meter.snapshot();
        assert_eq!(snap.bytes_in, total_bytes, "bytes_in == total object bytes");
        assert_eq!(
            snap.bytes_out, total_bytes,
            "bytes_out == total object bytes"
        );
        assert_eq!(snap.objects_done, n as u64, "objects_done == N");
        assert_eq!(snap.objects_skipped, 0, "nothing skipped on a fresh dest");
        assert_eq!(snap.objects_total, total_bytes, "total == bytes total");
        assert_eq!(snap.in_flight, 0, "no objects left in flight");
        assert_eq!(snap.phase, Phase::Transfer, "phase set to Transfer");

        // Syncing into B again would hit the manifest fast path and record
        // nothing. To reach the per-object skip branch instead, seed every
        // object into a fresh store WITHOUT its manifest, then expect
        // objects_skipped == N and objects_done == 0.
        let seed_dir = TempDir::new("seed");
        let seeded = FileStore::from_root(seed_dir.path());
        for checksum in file_checksums(&s.manifest) {
            let blob = s.a.get_object(checksum).expect("get from A");
            seeded.put_object(checksum, blob).expect("seed dest");
        }

        let later = Arc::new(Meter::new());
        let later_report = sync_snapshot(&s.a, &seeded, &s.id, &cfg(), false, Some(&later))
            .expect("second meter sync");
        assert_eq!(
            later_report.objects_skipped, n,
            "all objects already present"
        );

        let later_snap = later.snapshot();
        assert_eq!(later_snap.objects_skipped, n as u64, "meter skipped == N");
        assert_eq!(later_snap.objects_done, 0, "no objects copied");
        assert_eq!(later_snap.bytes_in, 0, "no bytes read");
        assert_eq!(later_snap.bytes_out, 0, "no bytes written");
    }

    #[test]
    fn sync_snapshot_skip_present_is_incremental() {
        let s = Staged::new();
        let b = s.dest();
        let n = object_count(&s.manifest);

        let first = sync_snapshot(&s.a, &b, &s.id, &cfg(), false, None).expect("first sync");
        assert_eq!(first.objects_copied, n);

        // Second run: destination already mirrored → fast path returns a
        // zero-transfer report; B is unchanged.
        let second = sync_snapshot(&s.a, &b, &s.id, &cfg(), false, None).expect("second sync");
        assert_eq!(second.objects_copied, 0);
        assert_eq!(second.objects_skipped, 0);
        assert_eq!(second.bytes_copied, 0);
        b.get_manifest(&s.id).expect("B still has manifest");
    }

    #[test]
    fn sync_snapshot_skip_present_per_object() {
        // Seed exactly one object into B (but NOT B's manifest): the fast path
        // stays cold and the per-object skip branch is exercised.
        let s = Staged::new();
        let b = s.dest();
        let n = object_count(&s.manifest);

        // Copy the first File object from A into B directly.
        let seeded = file_checksums(&s.manifest).into_iter().next().unwrap();
        let blob = s.a.get_object(seeded).expect("get from A");
        b.put_object(seeded, blob).expect("seed B");

        let report = sync_snapshot(&s.a, &b, &s.id, &cfg(), false, None).expect("sync ok");
        assert_eq!(report.objects_copied, n - 1);
        assert_eq!(report.objects_skipped, 1);
        b.get_manifest(&s.id).expect("B has manifest after sync");
    }

    /// Destination double: delegates everything to an inner [`FileStore`] but
    /// rejects `put_object` for one chosen checksum, to drive the
    /// all-or-nothing path.
    struct FailingPutStore {
        inner: FileStore,
        fail_on: String,
        // Every checksum passed to `put_object`, kept for sanity/debugging.
        attempted: Mutex<Vec<String>>,
    }

    impl Store for FailingPutStore {
        fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
            self.inner.get_manifest(id)
        }

        fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
            self.inner.fetch_files(manifest, dest)
        }

        fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
            self.inner.push(manifest, source)
        }
    }

    impl StreamStore for FailingPutStore {
        fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
            self.inner.has_object(checksum)
        }

        fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
            self.inner.get_object(checksum)
        }

        fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
            // Record the attempt first so failed puts are logged too.
            self.attempted.lock().unwrap().push(checksum.to_owned());
            if checksum != self.fail_on {
                return self.inner.put_object(checksum, bytes);
            }
            Err(StoreError::Backend {
                message: "synthetic put_object failure".to_owned(),
                source: None,
            })
        }

        fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
            self.inner.put_manifest(id, manifest)
        }
    }

    #[test]
    fn sync_snapshot_all_or_nothing() {
        let s = Staged::new();

        // Fail on the first File object's checksum.
        let fail_on = file_checksums(&s.manifest)
            .into_iter()
            .next()
            .unwrap()
            .to_owned();

        let b = FailingPutStore {
            inner: FileStore::from_root(s.b_dir.path()),
            fail_on,
            attempted: Mutex::new(Vec::new()),
        };

        // Concurrency 1 keeps the failure deterministic.
        let one = TransferConfig::new(1, None);
        let err = sync_snapshot(&s.a, &b, &s.id, &one, false, None)
            .expect_err("must surface put error");
        assert!(
            matches!(err, StoreError::Backend { ref message, .. } if message.contains("synthetic")),
            "unexpected error: {err:?}"
        );

        // NO manifest written to the dest.
        assert!(
            b.get_manifest(&s.id).is_err(),
            "dest must have no manifest after a failed sync"
        );
    }

    #[test]
    fn sync_snapshot_dry_run_writes_nothing() {
        let s = Staged::new();
        let b = s.dest();
        let n = object_count(&s.manifest);

        let report = sync_snapshot(&s.a, &b, &s.id, &cfg(), true, None).expect("dry run ok");
        assert!(report.dry_run);
        assert_eq!(report.objects_copied, n, "would-copy count is N");
        assert_eq!(report.objects_skipped, 0);
        assert_eq!(report.bytes_copied, 0);

        // B has NO manifest and NO objects.
        assert!(b.get_manifest(&s.id).is_err(), "dry run wrote a manifest");
        for checksum in file_checksums(&s.manifest) {
            let present = b.has_object(checksum).expect("has_object ok");
            assert!(!present, "dry run wrote an object");
        }
    }

    /// The set of direct children of `dir`.
    fn top_level_entries(dir: &Path) -> std::collections::BTreeSet<PathBuf> {
        fs::read_dir(dir)
            .unwrap()
            .map(|e| e.unwrap().path())
            .collect()
    }

    #[test]
    fn sync_snapshot_no_local_fs() {
        // Both stores live under one parent tempdir; the sync must create
        // NOTHING next to them (no scratch/cache). The structural guarantee is
        // that sync_snapshot takes no &Path; this test backs it up.
        let parent = TempDir::new("parent");
        let a_root = parent.path().join("store-a");
        let b_root = parent.path().join("store-b");
        let src = parent.path().join("src");
        for dir in [&a_root, &b_root, &src] {
            fs::create_dir_all(dir).unwrap();
        }

        let (manifest, id) = make_source(&src);

        let a = FileStore::from_root(&a_root);
        let b = FileStore::from_root(&b_root);
        a.push(&manifest, &src).expect("stage into A");

        // Top-level entries under parent, before and after the sync.
        let before = top_level_entries(parent.path());
        sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
        let after = top_level_entries(parent.path());

        assert_eq!(
            before,
            after,
            "sync_snapshot created an entry outside the store dirs: {:?}",
            after.difference(&before).collect::<Vec<_>>()
        );
    }
}