1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
39use std::sync::Arc;
40
41use snapdir_core::manifest::PathType;
42use snapdir_core::store::StoreError;
43use snapdir_core::{Meter, Phase};
44
45use crate::stream::StreamStore;
46use crate::transfer::{BlockingRateLimiter, TransferConfig};
47
/// Outcome of one [`sync_snapshot`] call.
///
/// All counters are zero when the destination already had the manifest and
/// the call returned early.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SyncReport {
    /// Objects the destination was missing. In a real run these were read
    /// from the source and written to the destination; in a dry run they are
    /// only counted.
    pub objects_copied: usize,
    /// Objects the destination reported it already had.
    pub objects_skipped: usize,
    /// Sum of the byte lengths of the objects actually written. Stays zero in
    /// a dry run.
    pub bytes_copied: u64,
    /// Echo of the `dry_run` flag the call was made with.
    pub dry_run: bool,
}
65
66pub fn sync_snapshot(
85 from: &(dyn StreamStore + Sync),
86 to: &(dyn StreamStore + Sync),
87 id: &str,
88 config: &TransferConfig,
89 dry_run: bool,
90 meter: Option<&Meter>,
91) -> Result<SyncReport, StoreError> {
92 if to.get_manifest(id).is_ok() {
95 return Ok(SyncReport {
96 objects_copied: 0,
97 objects_skipped: 0,
98 bytes_copied: 0,
99 dry_run,
100 });
101 }
102
103 let manifest = from.get_manifest(id)?;
105
106 let files: Vec<&str> = manifest
109 .entries()
110 .iter()
111 .filter(|e| e.path_type == PathType::File)
112 .map(|e| e.checksum.as_str())
113 .collect();
114
115 if let Some(m) = meter {
119 m.set_phase(Phase::Transfer);
120 let total: u64 = manifest
121 .entries()
122 .iter()
123 .filter(|e| e.path_type == PathType::File)
124 .map(|e| e.size)
125 .sum();
126 m.set_total(total);
127 }
128
129 let copied = AtomicUsize::new(0);
130 let skipped = AtomicUsize::new(0);
131 let bytes = AtomicU64::new(0);
132
133 let limiter = Arc::new(BlockingRateLimiter::new(config.max_bytes_per_sec));
136
137 if !files.is_empty() {
138 let pool = rayon::ThreadPoolBuilder::new()
139 .num_threads(config.concurrency.get())
140 .build()
141 .map_err(|err| StoreError::Backend {
142 message: "failed to build sync thread pool".to_owned(),
143 source: Some(Box::new(err)),
144 })?;
145
146 pool.install(|| {
147 use rayon::prelude::*;
148 files.par_iter().try_for_each(|checksum| {
149 if to.has_object(checksum)? {
150 skipped.fetch_add(1, Ordering::Relaxed);
151 if let Some(m) = meter {
152 m.add_skipped(1);
153 }
154 return Ok(());
155 }
156 if dry_run {
157 copied.fetch_add(1, Ordering::Relaxed);
159 return Ok(());
160 }
161 if let Some(m) = meter {
164 m.object_started();
165 }
166 let blob = from.get_object(checksum)?;
167 let len = blob.len() as u64;
168 if let Some(m) = meter {
170 m.add_in(len);
171 }
172 limiter.acquire_blocking(len);
173 to.put_object(checksum, blob)?;
174 if let Some(m) = meter {
176 m.add_out(len);
177 m.object_finished();
178 }
179 copied.fetch_add(1, Ordering::Relaxed);
180 bytes.fetch_add(len, Ordering::Relaxed);
181 Ok::<(), StoreError>(())
182 })
183 })?;
184 }
185
186 if !dry_run {
190 to.put_manifest(id, &manifest)?;
191 }
192
193 Ok(SyncReport {
194 objects_copied: copied.into_inner(),
195 objects_skipped: skipped.into_inner(),
196 bytes_copied: bytes.into_inner(),
197 dry_run,
198 })
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204 use crate::file_store::FileStore;
205 use snapdir_core::manifest::{Manifest, ManifestEntry};
206 use snapdir_core::merkle::{Blake3Hasher, Hasher};
207 use snapdir_core::store::Store;
208 use std::fs;
209 use std::path::{Path, PathBuf};
210 use std::sync::Mutex;
211
/// Scratch directory under the system temp dir, deleted again on drop.
struct TempDir {
    path: PathBuf,
}

impl TempDir {
    /// Creates a new directory whose name combines the process id, `tag`, and
    /// a process-wide counter, so repeated calls never share a path.
    fn new(tag: &str) -> Self {
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir_name = format!("snapdir-sync-test-{}-{tag}-{n}", std::process::id());
        let path = std::env::temp_dir().join(dir_name);
        fs::create_dir_all(&path).expect("create temp dir");
        Self { path }
    }

    /// Borrowed path of the scratch directory.
    fn path(&self) -> &Path {
        self.path.as_path()
    }
}

impl Drop for TempDir {
    /// Best-effort cleanup: a failure to remove the directory is ignored.
    fn drop(&mut self) {
        let _ = fs::remove_dir_all(&self.path);
    }
}
241
242 fn make_source(source: &Path) -> (Manifest, String) {
246 let hasher = Blake3Hasher::new();
247 let files: [(&str, &[u8]); 3] = [("a", b"alpha\n"), ("b", b"bravo\n"), ("c", b"charlie\n")];
248 let mut sums: Vec<(String, String, u64)> = Vec::new();
249 for (name, bytes) in files {
250 fs::write(source.join(name), bytes).unwrap();
251 sums.push((
252 (*name).to_owned(),
253 hasher.hash_hex(bytes),
254 bytes.len() as u64,
255 ));
256 }
257 let root_sum = snapdir_core::merkle::directory_checksum(
258 sums.iter().map(|(_, s, _)| s.as_str()),
259 &hasher,
260 );
261
262 let mut entries = vec![ManifestEntry::new(
263 PathType::Directory,
264 "700",
265 root_sum,
266 0,
267 "./",
268 )];
269 for (name, sum, size) in &sums {
270 entries.push(ManifestEntry::new(
271 PathType::File,
272 "600",
273 sum.clone(),
274 *size,
275 format!("./{name}"),
276 ));
277 }
278 let manifest = Manifest::from_entries(entries);
279 let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
280 (manifest, id)
281 }
282
283 fn object_count(manifest: &Manifest) -> usize {
285 manifest
286 .entries()
287 .iter()
288 .filter(|e| e.path_type == PathType::File)
289 .count()
290 }
291
292 fn cfg() -> TransferConfig {
293 TransferConfig::new(4, None)
294 }
295
/// Syncing into an empty destination copies every object and the manifest.
#[test]
fn sync_snapshot_mirrors_snapshot() {
    let origin_root = TempDir::new("a");
    let dest_root = TempDir::new("b");
    let staging = TempDir::new("src");
    let (manifest, id) = make_source(staging.path());
    let expected_objects = object_count(&manifest);

    let origin = FileStore::from_root(origin_root.path());
    let dest = FileStore::from_root(dest_root.path());
    origin
        .push(&manifest, staging.path())
        .expect("stage into A");

    let report = sync_snapshot(&origin, &dest, &id, &cfg(), false, None).expect("sync ok");

    assert_eq!(report.objects_copied, expected_objects);
    assert_eq!(report.objects_skipped, 0);
    assert!(!report.dry_run);
    dest.get_manifest(&id).expect("B has manifest");

    // Every file entry's object must now be present in the destination.
    let file_entries = manifest
        .entries()
        .iter()
        .filter(|e| e.path_type == PathType::File);
    for entry in file_entries {
        assert!(
            dest.has_object(&entry.checksum).expect("has_object ok"),
            "B missing object {}",
            entry.checksum
        );
    }
}
325
/// The meter reflects a full copy on a fresh destination, and an all-skipped
/// run when the destination already holds every object but no manifest.
#[test]
fn meter_records_sync() {
    let origin_root = TempDir::new("a");
    let dest_root = TempDir::new("b");
    let staging = TempDir::new("src");
    let (manifest, id) = make_source(staging.path());
    let n = object_count(&manifest);

    // Expected byte total: the sizes of all file entries.
    let mut total_bytes: u64 = 0;
    for entry in manifest.entries() {
        if entry.path_type == PathType::File {
            total_bytes += entry.size;
        }
    }

    let origin = FileStore::from_root(origin_root.path());
    let dest = FileStore::from_root(dest_root.path());
    origin
        .push(&manifest, staging.path())
        .expect("stage into A");

    // First run: nothing is present at the destination.
    let meter = Arc::new(Meter::new());
    let report = sync_snapshot(&origin, &dest, &id, &cfg(), false, Some(&meter))
        .expect("first meter sync");
    assert_eq!(report.objects_copied, n);

    let snap = meter.snapshot();
    assert_eq!(snap.bytes_in, total_bytes, "bytes_in == total object bytes");
    assert_eq!(
        snap.bytes_out, total_bytes,
        "bytes_out == total object bytes"
    );
    assert_eq!(snap.objects_done, n as u64, "objects_done == N");
    assert_eq!(snap.objects_skipped, 0, "nothing skipped on a fresh dest");
    assert_eq!(snap.objects_total, total_bytes, "total == bytes total");
    assert_eq!(snap.in_flight, 0, "no objects left in flight");
    assert_eq!(snap.phase, Phase::Transfer, "phase set to Transfer");

    // Second run: a destination pre-seeded with every object but no manifest,
    // so the per-object skip path is exercised.
    let seed_root = TempDir::new("seed");
    let seeded = FileStore::from_root(seed_root.path());
    let file_entries = manifest
        .entries()
        .iter()
        .filter(|e| e.path_type == PathType::File);
    for entry in file_entries {
        let blob = origin.get_object(&entry.checksum).expect("get from A");
        seeded.put_object(&entry.checksum, blob).expect("seed dest");
    }

    let later = Arc::new(Meter::new());
    let later_report = sync_snapshot(&origin, &seeded, &id, &cfg(), false, Some(&later))
        .expect("second meter sync");
    assert_eq!(
        later_report.objects_skipped, n,
        "all objects already present"
    );

    let later_snap = later.snapshot();
    assert_eq!(later_snap.objects_skipped, n as u64, "meter skipped == N");
    assert_eq!(later_snap.objects_done, 0, "no objects copied");
    assert_eq!(later_snap.bytes_in, 0, "no bytes read");
    assert_eq!(later_snap.bytes_out, 0, "no bytes written");
}
392
/// A repeat sync of a snapshot the destination already has does no work: the
/// manifest is found up front, so every counter in the report stays at zero.
#[test]
fn sync_snapshot_skip_present_is_incremental() {
    let origin_root = TempDir::new("a");
    let dest_root = TempDir::new("b");
    let staging = TempDir::new("src");
    let (manifest, id) = make_source(staging.path());
    let expected_objects = object_count(&manifest);

    let origin = FileStore::from_root(origin_root.path());
    let dest = FileStore::from_root(dest_root.path());
    origin
        .push(&manifest, staging.path())
        .expect("stage into A");

    let config = cfg();
    let first = sync_snapshot(&origin, &dest, &id, &config, false, None).expect("first sync");
    assert_eq!(first.objects_copied, expected_objects);

    let config = cfg();
    let second = sync_snapshot(&origin, &dest, &id, &config, false, None).expect("second sync");
    assert_eq!(second.objects_copied, 0);
    assert_eq!(second.objects_skipped, 0);
    assert_eq!(second.bytes_copied, 0);
    dest.get_manifest(&id).expect("B still has manifest");
}
416
/// With one object pre-seeded at the destination (and no manifest), the sync
/// skips exactly that object and copies the rest.
#[test]
fn sync_snapshot_skip_present_per_object() {
    let origin_root = TempDir::new("a");
    let dest_root = TempDir::new("b");
    let staging = TempDir::new("src");
    let (manifest, id) = make_source(staging.path());
    let n = object_count(&manifest);

    let origin = FileStore::from_root(origin_root.path());
    let dest = FileStore::from_root(dest_root.path());
    origin
        .push(&manifest, staging.path())
        .expect("stage into A");

    // Seed the destination with the first file object only.
    let first_obj = manifest
        .entries()
        .iter()
        .find(|e| e.path_type == PathType::File)
        .unwrap();
    let seed_checksum = &first_obj.checksum;
    let blob = origin.get_object(seed_checksum).expect("get from A");
    dest.put_object(seed_checksum, blob).expect("seed B");

    let report = sync_snapshot(&origin, &dest, &id, &cfg(), false, None).expect("sync ok");
    assert_eq!(report.objects_copied, n - 1);
    assert_eq!(report.objects_skipped, 1);
    dest.get_manifest(&id).expect("B has manifest after sync");
}
445
446 struct FailingPutStore {
449 inner: FileStore,
450 fail_on: String,
451 attempted: Mutex<Vec<String>>,
453 }
454
455 impl Store for FailingPutStore {
456 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
457 self.inner.get_manifest(id)
458 }
459 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
460 self.inner.fetch_files(manifest, dest)
461 }
462 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
463 self.inner.push(manifest, source)
464 }
465 }
466
467 impl StreamStore for FailingPutStore {
468 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
469 self.inner.has_object(checksum)
470 }
471 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
472 self.inner.get_object(checksum)
473 }
474 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
475 self.attempted.lock().unwrap().push(checksum.to_owned());
476 if checksum == self.fail_on {
477 return Err(StoreError::Backend {
478 message: "synthetic put_object failure".to_owned(),
479 source: None,
480 });
481 }
482 self.inner.put_object(checksum, bytes)
483 }
484 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
485 self.inner.put_manifest(id, manifest)
486 }
487 }
488
/// A failing object write surfaces as the sync error and leaves the
/// destination without a manifest.
#[test]
fn sync_snapshot_all_or_nothing() {
    let origin_root = TempDir::new("a");
    let dest_root = TempDir::new("b");
    let staging = TempDir::new("src");
    let (manifest, id) = make_source(staging.path());

    let origin = FileStore::from_root(origin_root.path());
    origin
        .push(&manifest, staging.path())
        .expect("stage into A");

    // Make the write of the first file object fail.
    let first_file = manifest
        .entries()
        .iter()
        .find(|e| e.path_type == PathType::File)
        .unwrap();
    let fail_on = first_file.checksum.clone();

    let dest = FailingPutStore {
        inner: FileStore::from_root(dest_root.path()),
        fail_on,
        attempted: Mutex::new(Vec::new()),
    };

    // Run with a concurrency of one.
    let one = TransferConfig::new(1, None);
    let outcome = sync_snapshot(&origin, &dest, &id, &one, false, None);
    let err = outcome.expect_err("must surface put error");
    assert!(
        matches!(err, StoreError::Backend { ref message, .. } if message.contains("synthetic")),
        "unexpected error: {err:?}"
    );

    let manifest_lookup = dest.get_manifest(&id);
    assert!(
        manifest_lookup.is_err(),
        "dest must have no manifest after a failed sync"
    );
}
528
/// A dry run reports how many objects would be copied but writes neither
/// objects nor the manifest to the destination.
#[test]
fn sync_snapshot_dry_run_writes_nothing() {
    let origin_root = TempDir::new("a");
    let dest_root = TempDir::new("b");
    let staging = TempDir::new("src");
    let (manifest, id) = make_source(staging.path());
    let n = object_count(&manifest);

    let origin = FileStore::from_root(origin_root.path());
    let dest = FileStore::from_root(dest_root.path());
    origin
        .push(&manifest, staging.path())
        .expect("stage into A");

    let report = sync_snapshot(&origin, &dest, &id, &cfg(), true, None).expect("dry run ok");
    assert!(report.dry_run);
    assert_eq!(report.objects_copied, n, "would-copy count is N");
    assert_eq!(report.objects_skipped, 0);
    assert_eq!(report.bytes_copied, 0);

    // The destination must still be empty: no manifest, no objects.
    assert!(dest.get_manifest(&id).is_err(), "dry run wrote a manifest");
    let file_entries = manifest
        .entries()
        .iter()
        .filter(|e| e.path_type == PathType::File);
    for entry in file_entries {
        let present = dest.has_object(&entry.checksum).expect("has_object ok");
        assert!(!present, "dry run wrote an object");
    }
}
558
/// The sync must not create anything next to the store directories: the
/// top-level listing of the shared parent directory is identical before and
/// after the call.
#[test]
fn sync_snapshot_no_local_fs() {
    let parent = TempDir::new("parent");
    let a_root = parent.path().join("store-a");
    let b_root = parent.path().join("store-b");
    let src = parent.path().join("src");
    for dir in [&a_root, &b_root, &src] {
        fs::create_dir_all(dir).unwrap();
    }

    let (manifest, id) = make_source(&src);

    let origin = FileStore::from_root(&a_root);
    let dest = FileStore::from_root(&b_root);
    origin.push(&manifest, &src).expect("stage into A");

    // Snapshot of the parent's direct children, taken on demand.
    let top_level_entries = || -> std::collections::BTreeSet<PathBuf> {
        fs::read_dir(parent.path())
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .collect()
    };

    let before = top_level_entries();

    sync_snapshot(&origin, &dest, &id, &cfg(), false, None).expect("sync ok");

    let after = top_level_entries();

    assert_eq!(
        before,
        after,
        "sync_snapshot created an entry outside the store dirs: {:?}",
        after.difference(&before).collect::<Vec<_>>()
    );
}
598}