use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use snapdir_core::manifest::PathType;
use snapdir_core::store::StoreError;
use snapdir_core::{Meter, Phase};
use crate::stream::StreamStore;
use crate::transfer::{BlockingRateLimiter, TransferConfig};
/// Summary of a `sync_snapshot` run.
///
/// When the destination already held the snapshot's manifest, nothing is
/// transferred and every counter is zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncReport {
/// Objects written to the destination. In a dry run, the objects that were
/// missing at the destination and would have been written.
pub objects_copied: usize,
/// Objects already present at the destination, which were not transferred.
pub objects_skipped: usize,
/// Total length of the object blobs written to the destination; always `0`
/// for a dry run.
pub bytes_copied: u64,
/// Whether this run was a dry run (nothing written to the destination).
pub dry_run: bool,
}
/// Copies snapshot `id` (its manifest and every file object it references)
/// from `from` into `to`.
///
/// If `to` already returns a manifest for `id`, the snapshot is treated as
/// fully present: nothing is transferred and the report has every counter at
/// zero (including `objects_skipped`).
///
/// Otherwise each *distinct* file checksum in the source manifest is handled
/// exactly once, in parallel on a dedicated rayon pool with
/// `config.concurrency` threads:
/// * objects `to` already has are counted as skipped;
/// * in a dry run, missing objects are only counted as copied — nothing is
///   read or written and `bytes_copied` stays `0`;
/// * otherwise the object is read from `from`, throttled through a
///   [`BlockingRateLimiter`] built from `config.max_bytes_per_sec`, and
///   written to `to`.
///
/// The manifest is written to `to` only after every object has been copied
/// successfully (and never in a dry run). A failed sync therefore never leaves
/// a destination manifest that references missing objects.
///
/// When `meter` is given, it is switched to [`Phase::Transfer`], its total is
/// set to the summed `size` of the distinct file objects, and per-object
/// skip/read/write progress is reported (not in dry runs).
///
/// # Errors
///
/// Returns the error from reading the source manifest, from any
/// `has_object` / `get_object` / `put_object` call, or from writing the
/// destination manifest. Returns [`StoreError::Backend`] if the worker pool
/// cannot be built. When several objects fail concurrently, which of their
/// errors is returned is unspecified.
pub fn sync_snapshot(
    from: &(dyn StreamStore + Sync),
    to: &(dyn StreamStore + Sync),
    id: &str,
    config: &TransferConfig,
    dry_run: bool,
    meter: Option<&Meter>,
) -> Result<SyncReport, StoreError> {
    // A manifest at the destination means the snapshot was already synced.
    // A lookup error is deliberately treated as "absent" so the sync proceeds.
    if to.get_manifest(id).is_ok() {
        return Ok(SyncReport {
            objects_copied: 0,
            objects_skipped: 0,
            bytes_copied: 0,
            dry_run,
        });
    }
    let manifest = from.get_manifest(id)?;

    // Objects are content-addressed, so files with identical content share a
    // checksum. Deduplicate so each object is fetched, written and counted
    // once, and two workers never write the same object concurrently.
    let mut objects: Vec<(&str, u64)> = manifest
        .entries()
        .iter()
        .filter(|e| e.path_type == PathType::File)
        .map(|e| (e.checksum.as_str(), e.size))
        .collect();
    objects.sort_unstable();
    objects.dedup_by(|a, b| a.0 == b.0);

    if let Some(m) = meter {
        m.set_phase(Phase::Transfer);
        // Sized from the deduplicated list so progress matches the bytes moved.
        let total: u64 = objects.iter().map(|&(_, size)| size).sum();
        m.set_total(total);
    }

    let copied = AtomicUsize::new(0);
    let skipped = AtomicUsize::new(0);
    let bytes = AtomicU64::new(0);
    let limiter = Arc::new(BlockingRateLimiter::new(config.max_bytes_per_sec));

    // Skip building a thread pool when there is nothing to transfer.
    if !objects.is_empty() {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(config.concurrency.get())
            .build()
            .map_err(|err| StoreError::Backend {
                message: "failed to build sync thread pool".to_owned(),
                source: Some(Box::new(err)),
            })?;
        pool.install(|| {
            use rayon::prelude::*;
            objects.par_iter().try_for_each(|&(checksum, _)| {
                if to.has_object(checksum)? {
                    skipped.fetch_add(1, Ordering::Relaxed);
                    if let Some(m) = meter {
                        m.add_skipped(1);
                    }
                    return Ok(());
                }
                if dry_run {
                    // Count what would be copied; touch neither store nor meter.
                    copied.fetch_add(1, Ordering::Relaxed);
                    return Ok(());
                }
                // NOTE(review): if get_object/put_object below fails, the
                // matching object_finished() is never called, so the meter's
                // in-flight count may stay raised — confirm whether Meter
                // needs an explicit failure hook.
                if let Some(m) = meter {
                    m.object_started();
                }
                let blob = from.get_object(checksum)?;
                let len = blob.len() as u64;
                if let Some(m) = meter {
                    m.add_in(len);
                }
                limiter.acquire_blocking(len);
                to.put_object(checksum, blob)?;
                if let Some(m) = meter {
                    m.add_out(len);
                    m.object_finished();
                }
                copied.fetch_add(1, Ordering::Relaxed);
                bytes.fetch_add(len, Ordering::Relaxed);
                Ok::<(), StoreError>(())
            })
        })?;
    }

    // Publish the manifest last: only once every object is in place.
    if !dry_run {
        to.put_manifest(id, &manifest)?;
    }
    Ok(SyncReport {
        objects_copied: copied.into_inner(),
        objects_skipped: skipped.into_inner(),
        bytes_copied: bytes.into_inner(),
        dry_run,
    })
}
/// Unit tests for `sync_snapshot`, run against on-disk `FileStore`s rooted in
/// temporary directories.
#[cfg(test)]
mod tests {
use super::*;
use crate::file_store::FileStore;
use snapdir_core::manifest::{Manifest, ManifestEntry};
use snapdir_core::merkle::{Blake3Hasher, Hasher};
use snapdir_core::store::Store;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
/// A directory under the system temp dir, removed (best-effort) on drop.
struct TempDir {
path: PathBuf,
}
impl TempDir {
/// Creates a directory whose name combines the process id, `tag`, and a
/// per-process counter, so concurrently running tests get distinct paths.
///
/// # Panics
///
/// Panics if the directory cannot be created.
fn new(tag: &str) -> Self {
use std::sync::atomic::AtomicU64;
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
let path = std::env::temp_dir().join(format!(
"snapdir-sync-test-{}-{tag}-{n}",
std::process::id()
));
fs::create_dir_all(&path).expect("create temp dir");
Self { path }
}
/// The directory's path.
fn path(&self) -> &Path {
&self.path
}
}
impl Drop for TempDir {
fn drop(&mut self) {
// Cleanup is best-effort; a removal failure is ignored.
let _ = fs::remove_dir_all(&self.path);
}
}
/// Writes three small files (`a`, `b`, `c`, each with distinct content) into
/// `source` and builds the matching manifest.
///
/// The manifest holds a root `./` directory entry, whose checksum is the
/// directory checksum over the three file checksums, followed by one file
/// entry per file. Returns the manifest and its snapshot id.
fn make_source(source: &Path) -> (Manifest, String) {
let hasher = Blake3Hasher::new();
let files: [(&str, &[u8]); 3] = [("a", b"alpha\n"), ("b", b"bravo\n"), ("c", b"charlie\n")];
// (file name, content checksum, content length) per file.
let mut sums: Vec<(String, String, u64)> = Vec::new();
for (name, bytes) in files {
fs::write(source.join(name), bytes).unwrap();
sums.push((
(*name).to_owned(),
hasher.hash_hex(bytes),
bytes.len() as u64,
));
}
let root_sum = snapdir_core::merkle::directory_checksum(
sums.iter().map(|(_, s, _)| s.as_str()),
&hasher,
);
let mut entries = vec![ManifestEntry::new(
PathType::Directory,
"700",
root_sum,
0,
"./",
)];
for (name, sum, size) in &sums {
entries.push(ManifestEntry::new(
PathType::File,
"600",
sum.clone(),
*size,
format!("./{name}"),
));
}
let manifest = Manifest::from_entries(entries);
let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
(manifest, id)
}
/// Number of file entries in `manifest`.
fn object_count(manifest: &Manifest) -> usize {
manifest
.entries()
.iter()
.filter(|e| e.path_type == PathType::File)
.count()
}
/// Transfer config shared by most tests.
/// NOTE(review): presumably 4 worker threads and no byte-rate cap — confirm
/// the argument order of `TransferConfig::new`.
fn cfg() -> TransferConfig {
TransferConfig::new(4, None)
}
/// A sync into an empty store copies every object and then the manifest.
#[test]
fn sync_snapshot_mirrors_snapshot() {
let a_dir = TempDir::new("a");
let b_dir = TempDir::new("b");
let src_dir = TempDir::new("src");
let (manifest, id) = make_source(src_dir.path());
let n = object_count(&manifest);
let a = FileStore::from_root(a_dir.path());
let b = FileStore::from_root(b_dir.path());
a.push(&manifest, src_dir.path()).expect("stage into A");
let report = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
assert_eq!(report.objects_copied, n);
assert_eq!(report.objects_skipped, 0);
assert!(!report.dry_run);
b.get_manifest(&id).expect("B has manifest");
for entry in manifest.entries() {
if entry.path_type == PathType::File {
assert!(
b.has_object(&entry.checksum).expect("has_object ok"),
"B missing object {}",
entry.checksum
);
}
}
}
/// The meter receives phase, total, byte and object counts for a full copy,
/// and only skip counts when the destination already holds every object.
#[test]
fn meter_records_sync() {
let a_dir = TempDir::new("a");
let b_dir = TempDir::new("b");
let src_dir = TempDir::new("src");
let (manifest, id) = make_source(src_dir.path());
let n = object_count(&manifest);
let total_bytes: u64 = manifest
.entries()
.iter()
.filter(|e| e.path_type == PathType::File)
.map(|e| e.size)
.sum();
let a = FileStore::from_root(a_dir.path());
let b = FileStore::from_root(b_dir.path());
a.push(&manifest, src_dir.path()).expect("stage into A");
let meter = Arc::new(Meter::new());
let report =
sync_snapshot(&a, &b, &id, &cfg(), false, Some(&meter)).expect("first meter sync");
assert_eq!(report.objects_copied, n);
let snap = meter.snapshot();
assert_eq!(snap.bytes_in, total_bytes, "bytes_in == total object bytes");
assert_eq!(
snap.bytes_out, total_bytes,
"bytes_out == total object bytes"
);
assert_eq!(snap.objects_done, n as u64, "objects_done == N");
assert_eq!(snap.objects_skipped, 0, "nothing skipped on a fresh dest");
assert_eq!(snap.objects_total, total_bytes, "total == bytes total");
assert_eq!(snap.in_flight, 0, "no objects left in flight");
assert_eq!(snap.phase, Phase::Transfer, "phase set to Transfer");
// Second scenario: a destination that holds every object but no
// manifest, so the sync runs and every object is skipped.
let seed_dir = TempDir::new("seed");
let seeded = FileStore::from_root(seed_dir.path());
for entry in manifest.entries() {
if entry.path_type == PathType::File {
let blob = a.get_object(&entry.checksum).expect("get from A");
seeded.put_object(&entry.checksum, blob).expect("seed dest");
}
}
let later = Arc::new(Meter::new());
let later_report = sync_snapshot(&a, &seeded, &id, &cfg(), false, Some(&later))
.expect("second meter sync");
assert_eq!(
later_report.objects_skipped, n,
"all objects already present"
);
let later_snap = later.snapshot();
assert_eq!(later_snap.objects_skipped, n as u64, "meter skipped == N");
assert_eq!(later_snap.objects_done, 0, "no objects copied");
assert_eq!(later_snap.bytes_in, 0, "no bytes read");
assert_eq!(later_snap.bytes_out, 0, "no bytes written");
}
/// Re-syncing a snapshot whose manifest is already at the destination does
/// nothing and reports all-zero counters.
#[test]
fn sync_snapshot_skip_present_is_incremental() {
let a_dir = TempDir::new("a");
let b_dir = TempDir::new("b");
let src_dir = TempDir::new("src");
let (manifest, id) = make_source(src_dir.path());
let n = object_count(&manifest);
let a = FileStore::from_root(a_dir.path());
let b = FileStore::from_root(b_dir.path());
a.push(&manifest, src_dir.path()).expect("stage into A");
let first = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("first sync");
assert_eq!(first.objects_copied, n);
// B now has the manifest, so this call takes the early-return path:
// even objects_skipped stays 0.
let second = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("second sync");
assert_eq!(second.objects_copied, 0);
assert_eq!(second.objects_skipped, 0);
assert_eq!(second.bytes_copied, 0);
b.get_manifest(&id).expect("B still has manifest");
}
/// An object already present at the destination (without its manifest) is
/// skipped while the rest are copied.
#[test]
fn sync_snapshot_skip_present_per_object() {
let a_dir = TempDir::new("a");
let b_dir = TempDir::new("b");
let src_dir = TempDir::new("src");
let (manifest, id) = make_source(src_dir.path());
let n = object_count(&manifest);
let a = FileStore::from_root(a_dir.path());
let b = FileStore::from_root(b_dir.path());
a.push(&manifest, src_dir.path()).expect("stage into A");
let first_obj = manifest
.entries()
.iter()
.find(|e| e.path_type == PathType::File)
.unwrap();
let blob = a.get_object(&first_obj.checksum).expect("get from A");
b.put_object(&first_obj.checksum, blob).expect("seed B");
let report = sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
assert_eq!(report.objects_copied, n - 1);
assert_eq!(report.objects_skipped, 1);
b.get_manifest(&id).expect("B has manifest after sync");
}
/// A `FileStore` wrapper whose `put_object` fails for one chosen checksum.
struct FailingPutStore {
/// The store every call is delegated to.
inner: FileStore,
/// Checksum whose `put_object` returns a synthetic backend error.
fail_on: String,
/// Every checksum passed to `put_object`, in call order.
/// NOTE(review): recorded but not asserted on by any test.
attempted: Mutex<Vec<String>>,
}
// Plain delegation to the wrapped FileStore.
impl Store for FailingPutStore {
fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
self.inner.get_manifest(id)
}
fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
self.inner.fetch_files(manifest, dest)
}
fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
self.inner.push(manifest, source)
}
}
impl StreamStore for FailingPutStore {
fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
self.inner.has_object(checksum)
}
fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
self.inner.get_object(checksum)
}
/// Records the attempt, then fails for `fail_on` or delegates otherwise.
fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
self.attempted.lock().unwrap().push(checksum.to_owned());
if checksum == self.fail_on {
return Err(StoreError::Backend {
message: "synthetic put_object failure".to_owned(),
source: None,
});
}
self.inner.put_object(checksum, bytes)
}
fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
self.inner.put_manifest(id, manifest)
}
}
/// A failing object write surfaces as the sync's error, and no manifest is
/// written to the destination.
#[test]
fn sync_snapshot_all_or_nothing() {
let a_dir = TempDir::new("a");
let b_dir = TempDir::new("b");
let src_dir = TempDir::new("src");
let (manifest, id) = make_source(src_dir.path());
let a = FileStore::from_root(a_dir.path());
a.push(&manifest, src_dir.path()).expect("stage into A");
let fail_on = manifest
.entries()
.iter()
.find(|e| e.path_type == PathType::File)
.unwrap()
.checksum
.clone();
let b = FailingPutStore {
inner: FileStore::from_root(b_dir.path()),
fail_on,
attempted: Mutex::new(Vec::new()),
};
// NOTE(review): presumably a single worker thread — confirm against
// `TransferConfig::new`.
let one = TransferConfig::new(1, None);
let err =
sync_snapshot(&a, &b, &id, &one, false, None).expect_err("must surface put error");
assert!(
matches!(err, StoreError::Backend { ref message, .. } if message.contains("synthetic")),
"unexpected error: {err:?}"
);
assert!(
b.get_manifest(&id).is_err(),
"dest must have no manifest after a failed sync"
);
}
/// A dry run reports the would-copy count but writes neither objects nor
/// the manifest.
#[test]
fn sync_snapshot_dry_run_writes_nothing() {
let a_dir = TempDir::new("a");
let b_dir = TempDir::new("b");
let src_dir = TempDir::new("src");
let (manifest, id) = make_source(src_dir.path());
let n = object_count(&manifest);
let a = FileStore::from_root(a_dir.path());
let b = FileStore::from_root(b_dir.path());
a.push(&manifest, src_dir.path()).expect("stage into A");
let report = sync_snapshot(&a, &b, &id, &cfg(), true, None).expect("dry run ok");
assert!(report.dry_run);
assert_eq!(report.objects_copied, n, "would-copy count is N");
assert_eq!(report.objects_skipped, 0);
assert_eq!(report.bytes_copied, 0);
assert!(b.get_manifest(&id).is_err(), "dry run wrote a manifest");
for entry in manifest.entries() {
if entry.path_type == PathType::File {
assert!(
!b.has_object(&entry.checksum).expect("has_object ok"),
"dry run wrote an object"
);
}
}
}
/// The sync creates no staging entries next to the store directories.
/// Only the direct children of `parent` are compared before and after.
#[test]
fn sync_snapshot_no_local_fs() {
let parent = TempDir::new("parent");
let a_root = parent.path().join("store-a");
let b_root = parent.path().join("store-b");
let src = parent.path().join("src");
fs::create_dir_all(&a_root).unwrap();
fs::create_dir_all(&b_root).unwrap();
fs::create_dir_all(&src).unwrap();
let (manifest, id) = make_source(&src);
let a = FileStore::from_root(&a_root);
let b = FileStore::from_root(&b_root);
a.push(&manifest, &src).expect("stage into A");
let before: std::collections::BTreeSet<PathBuf> = fs::read_dir(parent.path())
.unwrap()
.map(|e| e.unwrap().path())
.collect();
sync_snapshot(&a, &b, &id, &cfg(), false, None).expect("sync ok");
let after: std::collections::BTreeSet<PathBuf> = fs::read_dir(parent.path())
.unwrap()
.map(|e| e.unwrap().path())
.collect();
assert_eq!(
before,
after,
"sync_snapshot created an entry outside the store dirs: {:?}",
after.difference(&before).collect::<Vec<_>>()
);
}
}