1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
39use std::sync::Arc;
40
41use snapdir_core::manifest::PathType;
42use snapdir_core::store::StoreError;
43use snapdir_core::{Meter, Phase};
44
45use crate::adaptive::{
46 p95_object_size, AdaptiveGate, AdaptivePolicy as ControllerPolicy, ControllerDriver, OpResult,
47 OpSample,
48};
49use crate::stream::StreamStore;
50use crate::transfer::{classify_error, AdaptivePolicy, BlockingRateLimiter, TransferConfig};
51
/// Summary of one [`sync_snapshot`] call.
///
/// When the destination already holds the snapshot's manifest, every counter
/// is zero. In a dry run, `objects_copied` is the number of objects that
/// *would* have been copied and `bytes_copied` stays 0.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SyncReport {
    /// Objects transferred to the destination (or, in a dry run, that would
    /// have been transferred).
    pub objects_copied: usize,
    /// Objects not transferred because the destination already had them.
    pub objects_skipped: usize,
    /// Sum of the sizes of the object blobs written to the destination.
    pub bytes_copied: u64,
    /// `true` when the report comes from a dry run that wrote nothing.
    pub dry_run: bool,
}
69
70#[allow(clippy::too_many_lines)]
89pub fn sync_snapshot(
90 from: &(dyn StreamStore + Sync),
91 to: &(dyn StreamStore + Sync),
92 id: &str,
93 config: &TransferConfig,
94 dry_run: bool,
95 meter: Option<&Meter>,
96) -> Result<SyncReport, StoreError> {
97 if to.get_manifest(id).is_ok() {
100 return Ok(SyncReport {
101 objects_copied: 0,
102 objects_skipped: 0,
103 bytes_copied: 0,
104 dry_run,
105 });
106 }
107
108 let manifest = from.get_manifest(id)?;
110
111 let files: Vec<&str> = manifest
114 .entries()
115 .iter()
116 .filter(|e| e.path_type == PathType::File)
117 .map(|e| e.checksum.as_str())
118 .collect();
119 let object_sizes: Vec<u64> = manifest
122 .entries()
123 .iter()
124 .filter(|e| e.path_type == PathType::File)
125 .map(|e| e.size)
126 .collect();
127
128 if let Some(m) = meter {
132 m.set_phase(Phase::Transfer);
133 let total: u64 = manifest
134 .entries()
135 .iter()
136 .filter(|e| e.path_type == PathType::File)
137 .map(|e| e.size)
138 .sum();
139 m.set_total(total);
140 }
141
142 let copied = AtomicUsize::new(0);
143 let skipped = AtomicUsize::new(0);
144 let bytes = AtomicU64::new(0);
145
146 let limiter = Arc::new(BlockingRateLimiter::new(config.max_bytes_per_sec));
149
150 if !files.is_empty() {
151 let copy_one = |checksum: &str, report: &dyn Fn(OpSample)| -> Result<(), StoreError> {
156 if to.has_object(checksum)? {
157 skipped.fetch_add(1, Ordering::Relaxed);
158 if let Some(m) = meter {
159 m.add_skipped(1);
160 }
161 return Ok(());
162 }
163 if dry_run {
164 copied.fetch_add(1, Ordering::Relaxed);
166 return Ok(());
167 }
168 if let Some(m) = meter {
171 m.object_started();
172 }
173 let started = std::time::Instant::now();
174 let outcome = (|| {
175 let blob = from.get_object(checksum)?;
176 let len = blob.len() as u64;
177 if let Some(m) = meter {
179 m.add_in(len);
180 }
181 limiter.acquire_blocking(len);
182 to.put_object(checksum, blob)?;
183 Ok::<u64, StoreError>(len)
184 })();
185 let latency = started.elapsed();
186 match &outcome {
187 Ok(len) => report(OpSample {
188 bytes: *len,
189 latency,
190 result: OpResult::Ok,
191 }),
192 Err(err) => report(OpSample {
193 bytes: 0,
194 latency,
195 result: classify_error(err),
196 }),
197 }
198 let len = outcome?;
199 if let Some(m) = meter {
201 m.add_out(len);
202 m.object_finished();
203 }
204 copied.fetch_add(1, Ordering::Relaxed);
205 bytes.fetch_add(len, Ordering::Relaxed);
206 Ok(())
207 };
208
209 match config.adaptive {
210 AdaptivePolicy::Off => {
211 let pool = rayon::ThreadPoolBuilder::new()
212 .num_threads(config.concurrency.get())
213 .build()
214 .map_err(|err| StoreError::Backend {
215 message: "failed to build sync thread pool".to_owned(),
216 source: Some(Box::new(err)),
217 })?;
218 let noop = |_: OpSample| {};
219 pool.install(|| {
220 use rayon::prelude::*;
221 files
222 .par_iter()
223 .try_for_each(|checksum| copy_one(checksum, &noop))
224 })?;
225 }
226 AdaptivePolicy::On { fraction, ceiling } => {
227 sync_objects_adaptive(
228 &files,
229 &object_sizes,
230 config,
231 &limiter,
232 meter,
233 fraction,
234 ceiling,
235 ©_one,
236 )?;
237 }
238 }
239 }
240
241 if !dry_run {
245 to.put_manifest(id, &manifest)?;
246 }
247
248 Ok(SyncReport {
249 objects_copied: copied.into_inner(),
250 objects_skipped: skipped.into_inner(),
251 bytes_copied: bytes.into_inner(),
252 dry_run,
253 })
254}
255
256#[allow(clippy::too_many_arguments)]
264fn sync_objects_adaptive<C>(
265 files: &[&str],
266 object_sizes: &[u64],
267 config: &TransferConfig,
268 limiter: &Arc<BlockingRateLimiter>,
269 meter: Option<&Meter>,
270 fraction: f64,
271 ceiling: usize,
272 copy_one: &C,
273) -> Result<(), StoreError>
274where
275 C: Fn(&str, &dyn Fn(OpSample)) -> Result<(), StoreError> + Sync,
276{
277 use rayon::prelude::*;
278
279 let p95 = p95_object_size(object_sizes);
280 let total_ram = snapdir_core::resources::total_ram_bytes().unwrap_or(0);
281 let policy = ControllerPolicy::new(fraction, ceiling, total_ram, config.max_bytes_per_sec);
282
283 let gate = AdaptiveGate::new(config.concurrency.get(), ceiling);
284
285 let blocking_limiter = Arc::clone(limiter);
287 let rate_applier: Arc<dyn Fn(Option<u64>) + Send + Sync> =
288 Arc::new(move |rate| blocking_limiter.set_rate(rate));
289 let _ = meter;
294 let driver = ControllerDriver::new(policy, gate.clone(), p95, Some(rate_applier), None);
295
296 let stop = Arc::new(AtomicBool::new(false));
298 let tick_driver = driver.clone();
299 let tick_stop = Arc::clone(&stop);
300 let ticker = std::thread::spawn(move || {
301 while !tick_stop.load(Ordering::Relaxed) {
302 std::thread::sleep(std::time::Duration::from_millis(250));
303 if tick_stop.load(Ordering::Relaxed) {
304 break;
305 }
306 tick_driver.tick();
307 }
308 });
309
310 let pool = rayon::ThreadPoolBuilder::new()
311 .num_threads(ceiling.max(1))
312 .build()
313 .map_err(|err| StoreError::Backend {
314 message: "failed to build sync thread pool".to_owned(),
315 source: Some(Box::new(err)),
316 })?;
317
318 let result = pool.install(|| {
319 files.par_iter().try_for_each(|checksum| {
320 let _permit = gate.acquire_blocking();
322 let report = |sample: OpSample| driver.record_op(sample);
323 copy_one(checksum, &report)
324 })
325 });
326
327 stop.store(true, Ordering::Relaxed);
328 let _ = ticker.join();
329 result
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file_store::FileStore;
    use snapdir_core::manifest::{Manifest, ManifestEntry};
    use snapdir_core::merkle::{Blake3Hasher, Hasher};
    use snapdir_core::store::Store;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::Mutex;

    /// Scratch directory under the system temp dir, removed on drop.
    ///
    /// The name combines the process id, a caller tag and a process-wide
    /// counter, so tests running in parallel never share a directory.
    struct TempDir {
        path: PathBuf,
    }

    impl TempDir {
        fn new(tag: &str) -> Self {
            static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
            let dir_name = format!("snapdir-sync-test-{}-{tag}-{n}", std::process::id());
            let path = std::env::temp_dir().join(dir_name);
            fs::create_dir_all(&path).expect("create temp dir");
            TempDir { path }
        }

        fn path(&self) -> &Path {
            self.path.as_path()
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best-effort cleanup: a leftover directory must not fail a test.
            let _ = fs::remove_dir_all(&self.path);
        }
    }

    /// The `File` entries of `manifest`, i.e. those backed by stored objects.
    fn file_entries(manifest: &Manifest) -> impl Iterator<Item = &ManifestEntry> + '_ {
        manifest
            .entries()
            .iter()
            .filter(|entry| entry.path_type == PathType::File)
    }

    /// Writes three small files into `source`, then returns the manifest
    /// describing them (a root directory entry plus one entry per file) and
    /// its snapshot id.
    fn make_source(source: &Path) -> (Manifest, String) {
        let hasher = Blake3Hasher::new();
        let contents: [(&str, &[u8]); 3] =
            [("a", b"alpha\n"), ("b", b"bravo\n"), ("c", b"charlie\n")];

        let mut sums: Vec<(String, String, u64)> = Vec::with_capacity(contents.len());
        for (name, data) in contents {
            fs::write(source.join(name), data).unwrap();
            sums.push((name.to_owned(), hasher.hash_hex(data), data.len() as u64));
        }

        let root_sum = snapdir_core::merkle::directory_checksum(
            sums.iter().map(|(_, sum, _)| sum.as_str()),
            &hasher,
        );
        let root = ManifestEntry::new(PathType::Directory, "700", root_sum, 0, "./");
        let entries: Vec<ManifestEntry> = std::iter::once(root)
            .chain(sums.iter().map(|(name, sum, size)| {
                ManifestEntry::new(PathType::File, "600", sum.clone(), *size, format!("./{name}"))
            }))
            .collect();

        let manifest = Manifest::from_entries(entries);
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        (manifest, id)
    }

    /// Number of objects (`File` entries) in `manifest`.
    fn object_count(manifest: &Manifest) -> usize {
        file_entries(manifest).count()
    }

    /// Default non-adaptive config: 4 workers, no rate cap.
    fn cfg() -> TransferConfig {
        TransferConfig::new(4, None)
    }

    #[test]
    fn sync_snapshot_mirrors_snapshot() {
        let a_dir = TempDir::new("a");
        let b_dir = TempDir::new("b");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_source(src_dir.path());
        let n = object_count(&manifest);

        let a = FileStore::from_root(a_dir.path());
        let b = FileStore::from_root(b_dir.path());
        a.push(&manifest, src_dir.path()).expect("stage into A");

        let report = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
        assert_eq!(report.objects_copied, n);
        assert_eq!(report.objects_skipped, 0);
        assert!(!report.dry_run);

        b.get_manifest(&id).expect("B has manifest");
        for entry in file_entries(&manifest) {
            let present = b.has_object(&entry.checksum).expect("has_object ok");
            assert!(present, "B missing object {}", entry.checksum);
        }
    }

    #[test]
    fn meter_records_sync() {
        let a_dir = TempDir::new("a");
        let b_dir = TempDir::new("b");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_source(src_dir.path());
        let n = object_count(&manifest);
        let total_bytes: u64 = file_entries(&manifest).map(|entry| entry.size).sum();

        let a = FileStore::from_root(a_dir.path());
        let b = FileStore::from_root(b_dir.path());
        a.push(&manifest, src_dir.path()).expect("stage into A");

        // Fresh destination: every object is read, written and counted.
        let meter = Arc::new(Meter::new());
        let report =
            sync_snapshot(&a, &b, &id, &cfg(), false, Some(&meter)).expect("first meter sync");
        assert_eq!(report.objects_copied, n);

        let snap = meter.snapshot();
        assert_eq!(snap.bytes_in, total_bytes, "bytes_in == total object bytes");
        assert_eq!(snap.bytes_out, total_bytes, "bytes_out == total object bytes");
        assert_eq!(snap.objects_done, n as u64, "objects_done == N");
        assert_eq!(snap.objects_skipped, 0, "nothing skipped on a fresh dest");
        assert_eq!(snap.objects_total, total_bytes, "total == bytes total");
        assert_eq!(snap.in_flight, 0, "no objects left in flight");
        assert_eq!(snap.phase, Phase::Transfer, "phase set to Transfer");

        // Destination pre-seeded with every object but no manifest: all skipped.
        let seed_dir = TempDir::new("seed");
        let seeded = FileStore::from_root(seed_dir.path());
        for entry in file_entries(&manifest) {
            let blob = a.get_object(&entry.checksum).expect("get from A");
            seeded.put_object(&entry.checksum, blob).expect("seed dest");
        }

        let later = Arc::new(Meter::new());
        let later_report = sync_snapshot(&a, &seeded, &id, &cfg(), false, Some(&later))
            .expect("second meter sync");
        assert_eq!(later_report.objects_skipped, n, "all objects already present");

        let later_snap = later.snapshot();
        assert_eq!(later_snap.objects_skipped, n as u64, "meter skipped == N");
        assert_eq!(later_snap.objects_done, 0, "no objects copied");
        assert_eq!(later_snap.bytes_in, 0, "no bytes read");
        assert_eq!(later_snap.bytes_out, 0, "no bytes written");
    }

    #[test]
    fn sync_snapshot_skip_present_is_incremental() {
        let a_dir = TempDir::new("a");
        let b_dir = TempDir::new("b");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_source(src_dir.path());
        let n = object_count(&manifest);

        let a = FileStore::from_root(a_dir.path());
        let b = FileStore::from_root(b_dir.path());
        a.push(&manifest, src_dir.path()).expect("stage into A");

        let first = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("first sync");
        assert_eq!(first.objects_copied, n);

        // The manifest is already at B, so the second run short-circuits.
        let second = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("second sync");
        assert_eq!(second.objects_copied, 0);
        assert_eq!(second.objects_skipped, 0);
        assert_eq!(second.bytes_copied, 0);
        b.get_manifest(&id).expect("B still has manifest");
    }

    #[test]
    fn sync_snapshot_skip_present_per_object() {
        let a_dir = TempDir::new("a");
        let b_dir = TempDir::new("b");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_source(src_dir.path());
        let n = object_count(&manifest);

        let a = FileStore::from_root(a_dir.path());
        let b = FileStore::from_root(b_dir.path());
        a.push(&manifest, src_dir.path()).expect("stage into A");

        // Seed exactly one object into B ahead of the sync.
        let first_obj = file_entries(&manifest).next().unwrap();
        let blob = a.get_object(&first_obj.checksum).expect("get from A");
        b.put_object(&first_obj.checksum, blob).expect("seed B");

        let report = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
        assert_eq!(report.objects_copied, n - 1);
        assert_eq!(report.objects_skipped, 1);
        b.get_manifest(&id).expect("B has manifest after sync");
    }

    /// Destination store that fails `put_object` for one chosen checksum and
    /// records every checksum it was asked to put.
    struct FailingPutStore {
        inner: FileStore,
        fail_on: String,
        attempted: Mutex<Vec<String>>,
    }

    impl Store for FailingPutStore {
        fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
            self.inner.get_manifest(id)
        }
        fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
            self.inner.fetch_files(manifest, dest)
        }
        fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
            self.inner.push(manifest, source)
        }
    }

    impl StreamStore for FailingPutStore {
        fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
            self.inner.has_object(checksum)
        }
        fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
            self.inner.get_object(checksum)
        }
        fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
            self.attempted.lock().unwrap().push(checksum.to_owned());
            if checksum != self.fail_on {
                return self.inner.put_object(checksum, bytes);
            }
            Err(StoreError::Backend {
                message: "synthetic put_object failure".to_owned(),
                source: None,
            })
        }
        fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
            self.inner.put_manifest(id, manifest)
        }
    }

    #[test]
    fn sync_snapshot_all_or_nothing() {
        let a_dir = TempDir::new("a");
        let b_dir = TempDir::new("b");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_source(src_dir.path());

        let a = FileStore::from_root(a_dir.path());
        a.push(&manifest, src_dir.path()).expect("stage into A");

        let fail_on = file_entries(&manifest).next().unwrap().checksum.clone();
        let b = FailingPutStore {
            inner: FileStore::from_root(b_dir.path()),
            fail_on,
            attempted: Mutex::new(Vec::new()),
        };

        // A single worker makes the failure ordering deterministic.
        let one = TransferConfig::new(1, None);
        let err =
            sync_snapshot(&a, &b, &id, &one, false, None).expect_err("must surface put error");
        assert!(
            matches!(err, StoreError::Backend { ref message, .. } if message.contains("synthetic")),
            "unexpected error: {err:?}"
        );
        assert!(
            b.get_manifest(&id).is_err(),
            "dest must have no manifest after a failed sync"
        );
    }

    #[test]
    fn sync_snapshot_adaptive_mirrors_same_snapshot() {
        let a_dir = TempDir::new("a");
        let off_dir = TempDir::new("off");
        let on_dir = TempDir::new("on");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_source(src_dir.path());
        let n = object_count(&manifest);

        let a = FileStore::from_root(a_dir.path());
        a.push(&manifest, src_dir.path()).expect("stage into A");

        // The same snapshot synced twice: fixed concurrency vs. adaptive.
        let off = FileStore::from_root(off_dir.path());
        let off_report = sync_snapshot(&a, &off, &id, &cfg(), false, None).expect("off sync");

        let on = FileStore::from_root(on_dir.path());
        let adaptive = AdaptivePolicy::On {
            fraction: 0.8,
            ceiling: 2,
        };
        let on_cfg = TransferConfig::new(4, None).with_adaptive(adaptive);
        let on_report = sync_snapshot(&a, &on, &id, &on_cfg, false, None).expect("adaptive sync");

        assert_eq!(off_report.objects_copied, n);
        assert_eq!(on_report.objects_copied, n, "adaptive copies the same count");
        assert_eq!(on_report.objects_skipped, 0);

        on.get_manifest(&id).expect("On dest has the manifest");
        for entry in file_entries(&manifest) {
            let off_blob = off.get_object(&entry.checksum).expect("off object");
            let on_blob = on.get_object(&entry.checksum).expect("on object");
            assert_eq!(off_blob, on_blob, "Off vs On object bytes identical");
        }
    }

    #[test]
    fn sync_snapshot_dry_run_writes_nothing() {
        let a_dir = TempDir::new("a");
        let b_dir = TempDir::new("b");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_source(src_dir.path());
        let n = object_count(&manifest);

        let a = FileStore::from_root(a_dir.path());
        let b = FileStore::from_root(b_dir.path());
        a.push(&manifest, src_dir.path()).expect("stage into A");

        let report = sync_snapshot(&a, &b, &id, &cfg(), true, None).expect("dry run ok");
        assert!(report.dry_run);
        assert_eq!(report.objects_copied, n, "would-copy count is N");
        assert_eq!(report.objects_skipped, 0);
        assert_eq!(report.bytes_copied, 0);

        assert!(b.get_manifest(&id).is_err(), "dry run wrote a manifest");
        for entry in file_entries(&manifest) {
            let present = b.has_object(&entry.checksum).expect("has_object ok");
            assert!(!present, "dry run wrote an object");
        }
    }

    #[test]
    fn sync_snapshot_no_local_fs() {
        let parent = TempDir::new("parent");
        let a_root = parent.path().join("store-a");
        let b_root = parent.path().join("store-b");
        let src = parent.path().join("src");
        for dir in [&a_root, &b_root, &src] {
            fs::create_dir_all(dir).unwrap();
        }

        let (manifest, id) = make_source(&src);

        let a = FileStore::from_root(&a_root);
        let b = FileStore::from_root(&b_root);
        a.push(&manifest, &src).expect("stage into A");

        // Direct children of `parent`, captured before and after the sync.
        let listing = || -> std::collections::BTreeSet<PathBuf> {
            fs::read_dir(parent.path())
                .unwrap()
                .map(|e| e.unwrap().path())
                .collect()
        };

        let before = listing();
        sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
        let after = listing();

        assert_eq!(
            before,
            after,
            "sync_snapshot created an entry outside the store dirs: {:?}",
            after.difference(&before).collect::<Vec<_>>()
        );
    }
}
772}