use std::path::Path;
use snapdir_core::manifest::{Manifest, ManifestEntry, PathType};
use snapdir_core::merkle::{Blake3Hasher, Hasher};
use snapdir_core::store::StoreError;
use crate::transfer::{run_concurrent, RateLimiter, TransferConfig};
/// Normalizes a manifest-style path into a plain relative path.
///
/// At most one leading `./` and at most one trailing `/` are removed; the
/// result is a sub-slice of `path`, so nothing is allocated. Inputs without
/// either decoration are returned unchanged.
fn strip_leading_dot_slash(path: &str) -> &str {
    let without_prefix = match path.strip_prefix("./") {
        Some(rest) => rest,
        None => path,
    };
    match without_prefix.strip_suffix('/') {
        Some(rest) => rest,
        None => without_prefix,
    }
}
/// Loads the file behind `entry` from under `source` and checks its content hash.
///
/// The on-disk location is `source` joined with `entry.path` after
/// `strip_leading_dot_slash` has been applied. The whole file is read into
/// memory and hashed with `Blake3Hasher::hash_hex`; the hex digest must equal
/// `entry.checksum`.
///
/// On success the byte count is handed to `rate_limiter.acquire` (awaited)
/// before the bytes are returned, so the limiter is only charged for content
/// that passed verification.
///
/// # Errors
///
/// * Any error from `std::fs::read`, converted into `StoreError` by `?`.
/// * `StoreError::Integrity` when the computed digest differs from
///   `entry.checksum`; `address` carries the displayed file path.
async fn read_verified(
    entry: &ManifestEntry,
    source: &Path,
    rate_limiter: &RateLimiter,
) -> Result<Vec<u8>, StoreError> {
    let object_source = source.join(strip_leading_dot_slash(&entry.path));
    // NOTE(review): `std::fs::read` is a synchronous call inside an `async fn`;
    // confirm this is acceptable for the executor this runs on.
    let bytes = std::fs::read(&object_source)?;
    let actual = Blake3Hasher::new().hash_hex(&bytes);
    if actual == entry.checksum {
        rate_limiter.acquire(bytes.len() as u64).await;
        return Ok(bytes);
    }
    Err(StoreError::Integrity {
        address: object_source.display().to_string(),
        expected: entry.checksum.clone(),
        actual,
    })
}
/// Uploads every file entry of `manifest`, then writes the manifest itself.
///
/// Only entries whose `path_type` is `PathType::File` are selected. They are
/// handed to `run_concurrent` together with `config.concurrency` and the
/// `upload_one` callback. `write_manifest` is invoked (and awaited) only when
/// `run_concurrent` returned `Ok`; its result becomes the function's result.
///
/// # Errors
///
/// Returns the error from `run_concurrent` without calling `write_manifest`,
/// or otherwise whatever `write_manifest` returns.
pub(crate) async fn push_objects_concurrent<'a, U, UFut, W, WFut>(
    manifest: &'a Manifest,
    config: &TransferConfig,
    upload_one: U,
    write_manifest: W,
) -> Result<(), StoreError>
where
    U: Fn(&'a ManifestEntry) -> UFut,
    UFut: std::future::Future<Output = Result<(), StoreError>>,
    W: FnOnce() -> WFut,
    WFut: std::future::Future<Output = Result<(), StoreError>>,
{
    // Directories and any other non-file entries carry no object to upload.
    let mut files: Vec<&ManifestEntry> = Vec::new();
    for entry in manifest.entries().iter() {
        if entry.path_type == PathType::File {
            files.push(entry);
        }
    }

    // The `?` is what keeps the manifest from being written after a failed upload.
    run_concurrent(files, config.concurrency, upload_one).await?;
    write_manifest().await
}
/// Uploads a single object unless the store already has it.
///
/// `key_exists` is asked about `object_key` first. When it answers `true`
/// nothing is read or uploaded. Otherwise the entry's file is loaded and
/// checksum-verified through `read_verified`, and the bytes are passed to
/// `put_bytes` under the same key.
///
/// # Errors
///
/// Propagates errors from `key_exists`, `read_verified` and `put_bytes`.
pub(crate) async fn upload_object<KFut, PFut>(
    entry: &ManifestEntry,
    object_key: String,
    source: &Path,
    rate_limiter: &RateLimiter,
    key_exists: impl FnOnce(String) -> KFut,
    put_bytes: impl FnOnce(String, Vec<u8>) -> PFut,
) -> Result<(), StoreError>
where
    KFut: std::future::Future<Output = Result<bool, StoreError>>,
    PFut: std::future::Future<Output = Result<(), StoreError>>,
{
    // The key is cloned because `put_bytes` still needs ownership of it below.
    let already_stored = key_exists(object_key.clone()).await?;
    if already_stored {
        Ok(())
    } else {
        let payload = read_verified(entry, source, rate_limiter).await?;
        put_bytes(object_key, payload).await
    }
}
#[cfg(test)]
mod tests {
use super::*;
use snapdir_core::merkle::Hasher;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
/// Builds the multi-threaded tokio runtime used by the tests: four worker
/// threads with the time driver enabled (needed for `tokio::time::sleep`).
///
/// Panics with "build tokio runtime" if the runtime cannot be constructed.
fn runtime() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(4);
    builder.enable_time();
    builder.build().expect("build tokio runtime")
}
/// Scratch directory for a test; the directory tree is removed when the value is dropped.
struct TempDir {
    path: std::path::PathBuf,
}

impl TempDir {
    /// Creates a fresh directory under the system temp dir.
    ///
    /// The directory name combines the process id with a process-wide counter,
    /// so every call within one process gets a distinct path.
    ///
    /// Panics with "create temp dir" if the directory cannot be created.
    fn new() -> Self {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let dir_name = format!("snapdir-push-test-{pid}-{n}");
        let path = std::env::temp_dir().join(dir_name);
        std::fs::create_dir_all(&path).expect("create temp dir");
        Self { path }
    }

    /// Borrowed view of the directory's location.
    fn path(&self) -> &Path {
        self.path.as_path()
    }
}

impl Drop for TempDir {
    fn drop(&mut self) {
        // Best-effort cleanup: a failure to remove the directory is ignored.
        let _ = std::fs::remove_dir_all(self.path.as_path());
    }
}
/// Returns the hex digest that `Blake3Hasher::new().hash_hex` produces for `bytes`.
///
/// The tests use it both to build manifest checksums and as the object key,
/// so expected values are computed the same way as in `read_verified`.
fn checksum_of(bytes: &[u8]) -> String {
Blake3Hasher::new().hash_hex(bytes)
}
/// Writes `files` (relative path, contents) under `src` and returns a manifest describing them.
///
/// The manifest starts with a directory entry for `./` (mode "700", a
/// 64-zero checksum, size 0). Each file gets a `PathType::File` entry with
/// mode "600", the checksum of its contents, its byte length and the path
/// prefixed with `./`. Parent directories are created as needed.
///
/// The returned value is rebuilt through `Manifest::from_entries` from the
/// pushed entries.
///
/// Panics if a directory or file cannot be written.
fn manifest_and_source(files: &[(&str, &[u8])], src: &Path) -> Manifest {
    let mut m = Manifest::new();
    let root_entry = ManifestEntry::new(PathType::Directory, "700", "0".repeat(64), 0, "./");
    m.push(root_entry);

    for &(path, contents) in files {
        if let Some(parent) = Path::new(path).parent() {
            std::fs::create_dir_all(src.join(parent)).unwrap();
        }
        std::fs::write(src.join(path), contents).unwrap();

        let file_entry = ManifestEntry::new(
            PathType::File,
            "600",
            checksum_of(contents),
            contents.len() as u64,
            format!("./{path}"),
        );
        m.push(file_entry);
    }

    let entries = m.entries().to_vec();
    Manifest::from_entries(entries)
}
/// In-memory stand-in for an object store that records how the push code used it.
struct FakeStore {
    /// Keys that `key_exists` reports as already stored.
    present: HashSet<String>,
    /// Checksum of every payload that `put_bytes` accepted, in completion order.
    uploaded: Mutex<Vec<String>>,
    /// Number of `put_bytes` calls currently between entry and the end of their sleep.
    in_flight: AtomicUsize,
    /// Largest value `in_flight` reached.
    high_water: AtomicUsize,
    /// Set once `write_manifest` has run.
    manifest_written: AtomicBool,
    /// Length of `uploaded` at the moment `write_manifest` ran.
    uploads_done_at_manifest: AtomicUsize,
    /// Payload checksum for which `put_bytes` fails; an empty string disables the failure.
    fail_checksum: String,
}

impl FakeStore {
    /// Creates a store that already "contains" the keys in `present` and fails
    /// uploads whose payload checksum equals `fail_checksum` (if non-empty).
    fn new(present: &[&str], fail_checksum: &str) -> Arc<Self> {
        let present_keys: HashSet<String> = present.iter().copied().map(str::to_owned).collect();
        let store = Self {
            present: present_keys,
            uploaded: Mutex::new(Vec::new()),
            in_flight: AtomicUsize::new(0),
            high_water: AtomicUsize::new(0),
            manifest_written: AtomicBool::new(false),
            uploads_done_at_manifest: AtomicUsize::new(0),
            fail_checksum: String::from(fail_checksum),
        };
        Arc::new(store)
    }

    /// Reports whether `key` was listed as present at construction time.
    // Contains no `.await`; it stays `async` because callers in this module `.await` it.
    #[allow(clippy::unused_async)]
    async fn key_exists(&self, key: String) -> Result<bool, StoreError> {
        let found = self.present.contains(key.as_str());
        Ok(found)
    }

    /// Simulates a slow upload while tracking how many uploads overlap.
    ///
    /// Returns a `StoreError::Backend` ("upload blew up") when the payload's
    /// checksum matches the configured `fail_checksum`; otherwise records the
    /// checksum in `uploaded`.
    async fn put_bytes(&self, _key: String, bytes: Vec<u8>) -> Result<(), StoreError> {
        // Count this call as in flight and update the peak before yielding.
        let now_in_flight = self.in_flight.fetch_add(1, Ordering::SeqCst) + 1;
        self.high_water.fetch_max(now_in_flight, Ordering::SeqCst);
        // The sleep gives other uploads a window in which to overlap with this one.
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        let digest = checksum_of(&bytes);
        self.in_flight.fetch_sub(1, Ordering::SeqCst);

        let should_fail = !self.fail_checksum.is_empty() && digest == self.fail_checksum;
        if should_fail {
            return Err(StoreError::Backend {
                message: "upload blew up".to_owned(),
                source: None,
            });
        }
        self.uploaded.lock().unwrap().push(digest);
        Ok(())
    }

    /// Records that the manifest was written and how many uploads had completed by then.
    // Contains no `.await`; it stays `async` because callers in this module `.await` it.
    #[allow(clippy::unused_async)]
    async fn write_manifest(&self) -> Result<(), StoreError> {
        let completed = self.uploaded.lock().unwrap().len();
        self.uploads_done_at_manifest
            .store(completed, Ordering::SeqCst);
        self.manifest_written.store(true, Ordering::SeqCst);
        Ok(())
    }
}
/// Runs `push_objects_concurrent` for `manifest` against `fake` and returns its result.
///
/// A new runtime from `runtime()` is built for each call and blocked on until
/// the push finishes. The transfer config uses the given `concurrency` and the
/// rate limiter is constructed with `None`. Each entry's checksum doubles as
/// its object key, and source files are read from beneath `src`.
fn run_push(
fake: &Arc<FakeStore>,
manifest: &Manifest,
src: &Path,
concurrency: usize,
) -> Result<(), StoreError> {
let cfg = TransferConfig::new(concurrency, None);
let limiter = RateLimiter::new(None);
let rt = runtime();
rt.block_on(async {
push_objects_concurrent(
manifest,
&cfg,
// Per-entry upload callback: each invocation gets its own `Arc` handle to
// the fake and a shared borrow of the limiter.
|entry| {
let fake = Arc::clone(fake);
let limiter = &limiter;
async move {
upload_object(
entry,
// The checksum is used as the object key.
entry.checksum.clone(),
src,
limiter,
// Existence check, forwarded to the fake store.
|key| {
let fake = Arc::clone(&fake);
async move { fake.key_exists(key).await }
},
// Upload, forwarded to the fake store.
|key, bytes| {
let fake = Arc::clone(&fake);
async move { fake.put_bytes(key, bytes).await }
},
)
.await
}
},
// Manifest write, forwarded to the fake store.
|| {
let fake = Arc::clone(fake);
async move { fake.write_manifest().await }
},
)
.await
})
}
/// With nothing present in the store, every file must be uploaded, the peak
/// number of overlapping uploads must match the configured concurrency, and
/// the manifest must be written only after all uploads finished.
#[test]
fn concurrent_upload_all_objects_then_manifest() {
    let files: &[(&str, &[u8])] = &[
        ("a.txt", b"alpha" as &[u8]),
        ("nested/b.txt", b"bravo"),
        ("nested/deep/c.txt", b"charlie"),
        ("d.txt", b"delta"),
    ];

    // The expected checksum set is the same for every concurrency level.
    let mut expected: Vec<String> = files.iter().map(|(_, body)| checksum_of(body)).collect();
    expected.sort();

    for concurrency in [1usize, 4] {
        let src = TempDir::new();
        let manifest = manifest_and_source(files, src.path());
        let fake = FakeStore::new(&[], "");
        run_push(&fake, &manifest, src.path(), concurrency).expect("push must succeed");

        let mut uploaded = fake.uploaded.lock().unwrap().clone();
        uploaded.sort();
        assert_eq!(uploaded, expected, "all absent objects must be uploaded");

        let want = concurrency.min(files.len());
        let hw = fake.high_water.load(Ordering::SeqCst);
        assert_eq!(
            hw, want,
            "concurrency={concurrency}: peak in-flight {hw} != expected {want}"
        );

        let manifest_written = fake.manifest_written.load(Ordering::SeqCst);
        assert!(manifest_written, "manifest must be written");

        let done_at_manifest = fake.uploads_done_at_manifest.load(Ordering::SeqCst);
        assert_eq!(
            done_at_manifest,
            files.len(),
            "manifest must be written only after every object upload completed"
        );
    }
}
/// An object whose key the store already reports as present must not be
/// uploaded again; only the missing one is, and the manifest is still written.
#[test]
fn concurrent_upload_skips_present_objects() {
    let files: &[(&str, &[u8])] = &[
        ("present.txt", b"already-here" as &[u8]),
        ("missing.txt", b"needs-upload"),
    ];
    let present_sum = checksum_of(b"already-here");
    let missing_sum = checksum_of(b"needs-upload");

    let src = TempDir::new();
    let manifest = manifest_and_source(files, src.path());
    let fake = FakeStore::new(&[present_sum.as_str()], "");
    run_push(&fake, &manifest, src.path(), 4).expect("push must succeed");

    let uploaded = fake.uploaded.lock().unwrap().clone();
    assert!(
        uploaded.iter().all(|sum| *sum != present_sum),
        "present object must never be uploaded"
    );
    assert_eq!(
        uploaded,
        vec![missing_sum],
        "only the absent object should be uploaded"
    );
    assert!(fake.manifest_written.load(Ordering::SeqCst));
}
/// When one object upload fails, the backend error must reach the caller and
/// the manifest must not be written.
#[test]
fn concurrent_upload_all_or_nothing_on_failure() {
    let files: &[(&str, &[u8])] = &[
        ("ok1.txt", b"one" as &[u8]),
        ("boom.txt", b"two"),
        ("ok2.txt", b"three"),
    ];
    let src = TempDir::new();
    let manifest = manifest_and_source(files, src.path());

    // The fake store fails exactly the upload whose payload hashes to this value.
    let boom_sum = checksum_of(b"two");
    let fake = FakeStore::new(&[], boom_sum.as_str());

    let err = run_push(&fake, &manifest, src.path(), 4)
        .expect_err("a failing object upload must surface");
    let is_expected_failure =
        matches!(err, StoreError::Backend { ref message, .. } if message == "upload blew up");
    assert!(is_expected_failure, "unexpected error: {err:?}");

    let manifest_written = fake.manifest_written.load(Ordering::SeqCst);
    assert!(
        !manifest_written,
        "write_manifest must NEVER be called when an object upload fails"
    );
}
/// A source file whose contents no longer match the manifest checksum must
/// produce an `Integrity` error and leave the manifest unwritten.
#[test]
fn concurrent_upload_rejects_corrupt_source() {
    let files: &[(&str, &[u8])] = &[("good.txt", b"good" as &[u8]), ("bad.txt", b"bad")];
    let src = TempDir::new();
    let manifest = manifest_and_source(files, src.path());

    // Overwrite one file after the manifest was computed so its checksum is stale.
    let tampered_path = src.path().join("bad.txt");
    std::fs::write(tampered_path, b"tampered").unwrap();

    let fake = FakeStore::new(&[], "");
    let err = run_push(&fake, &manifest, src.path(), 4)
        .expect_err("a corrupt source must surface an Integrity error");
    let is_integrity_error = matches!(err, StoreError::Integrity { .. });
    assert!(is_integrity_error, "unexpected error: {err:?}");

    let manifest_written = fake.manifest_written.load(Ordering::SeqCst);
    assert!(!manifest_written, "a corrupt source must leave no manifest");
}
}