use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use snapdir_core::manifest::{Manifest, PathType};
use snapdir_core::merkle::{Blake3Hasher, Hasher};
use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
use snapdir_core::Meter;
use crate::stream::StreamStore;
use crate::transfer::TransferConfig;
use crate::util::{file_present_and_verified, hash_file};
/// Total number of copy attempts `persist` makes for one object before giving up,
/// used only when the copied bytes fail verification while the source itself still
/// hashes to the expected checksum.
const MAX_PERSIST_RETRIES: u32 = 5;
/// A store backed by a plain directory on the local filesystem.
///
/// Objects and manifests live under `root` at the relative keys produced by
/// `object_path` / `manifest_path`. Bulk copies run on a rayon pool sized from
/// `config.concurrency`, and progress is optionally reported to `meter`.
#[derive(Debug, Clone)]
pub struct FileStore {
    // Directory onto which every object/manifest key is joined.
    root: PathBuf,
    // Transfer tuning; this file only reads `concurrency` from it.
    config: TransferConfig,
    // Optional progress meter supplied through `with_meter`.
    meter: Option<Arc<Meter>>,
}
impl FileStore {
#[must_use]
pub fn new(store: &str) -> Self {
Self::from_root(parse_store_dir(store))
}
#[must_use]
pub fn new_with_config(store: &str, config: TransferConfig) -> Self {
Self::from_root_with_config(parse_store_dir(store), config)
}
#[must_use]
pub fn from_root(root: impl Into<PathBuf>) -> Self {
Self::from_root_with_config(root, TransferConfig::default())
}
#[must_use]
pub fn from_root_with_config(root: impl Into<PathBuf>, config: TransferConfig) -> Self {
Self {
root: root.into(),
config,
meter: None,
}
}
#[must_use]
pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
self.meter = meter;
self
}
#[must_use]
pub fn root(&self) -> &Path {
&self.root
}
#[must_use]
pub fn transfer_config(&self) -> &TransferConfig {
&self.config
}
fn object_disk_path(&self, checksum: &str) -> PathBuf {
self.root.join(object_path(checksum))
}
fn manifest_disk_path(&self, id: &str) -> PathBuf {
self.root.join(manifest_path(id))
}
fn parallel_copy(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
use rayon::prelude::*;
if jobs.is_empty() {
return Ok(());
}
let meter = self.meter.as_deref();
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(self.config.concurrency.get())
.build()
.map_err(|err| StoreError::Backend {
message: "failed to build copy thread pool".to_owned(),
source: Some(Box::new(err)),
})?;
pool.install(|| {
jobs.par_iter().try_for_each(|(source, target, expected)| {
if let Some(m) = meter {
m.object_started();
}
let len = std::fs::metadata(source).map_or(0, |md| md.len());
persist(source, target, expected, &Blake3Hasher::new())?;
if let Some(m) = meter {
m.add_in(len);
m.add_out(len);
m.object_finished();
}
Ok(())
})
})
}
}
impl Store for FileStore {
    /// Reads the manifest stored under `id` and checks that it hashes back to `id`.
    ///
    /// # Errors
    ///
    /// * [`StoreError::ManifestNotFound`] when no manifest file exists for `id`.
    /// * [`StoreError::Io`] for any other read failure.
    /// * [`StoreError::Backend`] when the file is not valid UTF-8.
    /// * Whatever `Manifest::parse` reports for malformed text.
    /// * [`StoreError::Integrity`] when the parsed manifest's snapshot id is not `id`.
    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
        let raw = fs::read(self.manifest_disk_path(id)).map_err(|err| {
            if err.kind() == io::ErrorKind::NotFound {
                StoreError::ManifestNotFound { id: id.to_owned() }
            } else {
                StoreError::Io(err)
            }
        })?;
        let text = String::from_utf8(raw).map_err(|err| StoreError::Backend {
            message: format!("manifest {id} is not valid UTF-8"),
            source: Some(Box::new(err)),
        })?;
        let manifest = Manifest::parse(&text)?;
        let recomputed = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
        if recomputed == id {
            Ok(manifest)
        } else {
            Err(StoreError::Integrity {
                address: manifest_path(id),
                expected: id.to_owned(),
                actual: recomputed,
            })
        }
    }

    /// Materializes every entry of `manifest` under `dest`.
    ///
    /// Directory entries are created as they are encountered. A file entry whose
    /// destination already holds bytes matching its checksum is skipped (and counted
    /// as skipped on the meter). Every other file is queued and then copied from its
    /// store object by `parallel_copy`, which verifies each copy against the checksum.
    ///
    /// NOTE(review): entry paths are joined onto `dest` after only stripping `./`. An
    /// absolute path or a `..` component would land outside `dest` unless
    /// `Manifest::parse` already rejects such paths — confirm.
    ///
    /// # Errors
    ///
    /// [`StoreError::ObjectNotFound`] when a file that needs copying has no object in
    /// the store (detected before any copy starts); I/O or integrity errors otherwise.
    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
        let hasher = Blake3Hasher::new();
        let meter = self.meter.as_deref();
        let mut queued: Vec<(PathBuf, PathBuf, String)> = Vec::new();
        for entry in manifest.entries() {
            let target = dest.join(strip_leading_dot_slash(&entry.path));
            match entry.path_type {
                PathType::Directory => fs::create_dir_all(&target)?,
                PathType::File => {
                    if file_present_and_verified(&target, &entry.checksum, &hasher) {
                        if let Some(progress) = meter {
                            progress.add_skipped(1);
                        }
                        continue;
                    }
                    if let Some(parent) = target.parent() {
                        fs::create_dir_all(parent)?;
                    }
                    let object = self.object_disk_path(&entry.checksum);
                    if !object.exists() {
                        return Err(StoreError::ObjectNotFound {
                            checksum: entry.checksum.clone(),
                        });
                    }
                    queued.push((object, target, entry.checksum.clone()));
                }
            }
        }
        if let Some(progress) = meter {
            progress.set_total(total_source_bytes(&queued));
        }
        self.parallel_copy(&queued)
    }

    /// Uploads the files of `manifest` (read from `source`) and then the manifest itself.
    ///
    /// Objects are copied before the manifest is written, so an already-present
    /// manifest makes the whole push a no-op. Objects that already exist in the store
    /// are skipped (and counted as skipped on the meter).
    ///
    /// # Errors
    ///
    /// I/O or integrity errors from copying objects; the manifest is not written if
    /// any object copy fails.
    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
        let hasher = Blake3Hasher::new();
        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
        let manifest_target = self.manifest_disk_path(&id);
        if manifest_target.exists() {
            return Ok(());
        }
        let meter = self.meter.as_deref();
        let mut queued: Vec<(PathBuf, PathBuf, String)> = Vec::new();
        let files = manifest
            .entries()
            .iter()
            .filter(|entry| entry.path_type == PathType::File);
        for entry in files {
            let object = self.object_disk_path(&entry.checksum);
            if object.exists() {
                if let Some(progress) = meter {
                    progress.add_skipped(1);
                }
            } else {
                let local = source.join(strip_leading_dot_slash(&entry.path));
                queued.push((local, object, entry.checksum.clone()));
            }
        }
        if let Some(progress) = meter {
            progress.set_total(total_source_bytes(&queued));
        }
        self.parallel_copy(&queued)?;
        write_manifest(manifest, &manifest_target, &id, &hasher)
    }
}

/// Sums the on-disk size of every job's source file, counting unreadable sources as zero.
fn total_source_bytes(jobs: &[(PathBuf, PathBuf, String)]) -> u64 {
    let mut total = 0u64;
    for (source, _target, _checksum) in jobs {
        total += fs::metadata(source).map_or(0, |md| md.len());
    }
    total
}
impl StreamStore for FileStore {
    /// Reports whether an object file exists for `checksum`. Its content is not verified.
    fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
        Ok(self.object_disk_path(checksum).exists())
    }

    /// Reads the object stored for `checksum` and verifies the bytes hash back to it.
    ///
    /// # Errors
    ///
    /// [`StoreError::ObjectNotFound`] if the object file is missing, [`StoreError::Io`]
    /// for other read failures, and [`StoreError::Integrity`] if the bytes do not match.
    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
        let path = self.object_disk_path(checksum);
        let bytes = match fs::read(&path) {
            Ok(bytes) => bytes,
            Err(err) if err.kind() == io::ErrorKind::NotFound => {
                return Err(StoreError::ObjectNotFound {
                    checksum: checksum.to_owned(),
                });
            }
            Err(err) => return Err(StoreError::Io(err)),
        };
        let actual = Blake3Hasher::new().hash_hex(&bytes);
        if actual != checksum {
            return Err(StoreError::Integrity {
                address: path.display().to_string(),
                expected: checksum.to_owned(),
                actual,
            });
        }
        Ok(bytes)
    }

    /// Stores `bytes` under `checksum` after checking that they hash to it.
    ///
    /// The bytes are written to a unique temporary sibling and renamed into place, so
    /// the final object path is only ever created by a rename. If either the write or
    /// the rename fails, the temporary file is removed instead of being left behind
    /// in the object directory.
    ///
    /// # Errors
    ///
    /// [`StoreError::Integrity`] (and nothing is written) when `bytes` do not hash to
    /// `checksum`; I/O errors otherwise.
    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
        let actual = Blake3Hasher::new().hash_hex(&bytes);
        if actual != checksum {
            return Err(StoreError::Integrity {
                address: object_path(checksum),
                expected: checksum.to_owned(),
                actual,
            });
        }
        let target = self.object_disk_path(checksum);
        if let Some(parent) = target.parent() {
            fs::create_dir_all(parent)?;
        }
        let tmp = temp_sibling(&target);
        if let Err(err) = fs::write(&tmp, &bytes).and_then(|()| fs::rename(&tmp, &target)) {
            // Best effort: the temp file may not exist if the write failed before creating it.
            let _ = fs::remove_file(&tmp);
            return Err(err.into());
        }
        Ok(())
    }

    /// Writes `manifest` under `id` after verifying that `id` is its snapshot id.
    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
        write_manifest(
            manifest,
            &self.manifest_disk_path(id),
            id,
            &Blake3Hasher::new(),
        )
    }
}
/// Copies `source` to `target`, publishing it only once the copy hashes to `expected`.
///
/// Each attempt copies into a fresh temporary sibling of `target`, hashes it, and
/// renames it onto `target` on a match. On a mismatch the temp file is discarded and
/// the source is re-hashed. If the source itself is wrong, the function fails at once
/// with an integrity error naming the source. Otherwise it retries, making at most
/// `MAX_PERSIST_RETRIES` attempts in total.
///
/// The temporary file is removed on every failure path (copy, hash, or rename errors),
/// so no `.tmp` residue is left in the store.
///
/// # Errors
///
/// [`StoreError::Integrity`] when the source does not hash to `expected`, or when every
/// attempt produced a copy that did not verify. Any I/O error is propagated.
fn persist(
    source: &Path,
    target: &Path,
    expected: &str,
    hasher: &impl Hasher,
) -> Result<(), StoreError> {
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let mut attempts_left = MAX_PERSIST_RETRIES;
    loop {
        let tmp = temp_sibling(target);
        // `Ok(None)`: copy verified and published; `Ok(Some(hash))`: copy did not verify.
        let attempt = (|| -> Result<Option<String>, StoreError> {
            copy_file(source, &tmp)?;
            let actual = hash_file(&tmp, hasher)?;
            if actual == expected {
                fs::rename(&tmp, target)?;
                Ok(None)
            } else {
                Ok(Some(actual))
            }
        })();
        let actual = match attempt {
            Ok(None) => return Ok(()),
            Ok(Some(actual)) => {
                let _ = fs::remove_file(&tmp);
                actual
            }
            Err(err) => {
                // Best effort: the temp file may not exist if the copy failed early.
                let _ = fs::remove_file(&tmp);
                return Err(err);
            }
        };
        // Distinguish a bad source (fail fast) from a transient bad copy (retry).
        let source_actual = hash_file(source, hasher)?;
        if source_actual != expected {
            return Err(StoreError::Integrity {
                address: source.display().to_string(),
                expected: expected.to_owned(),
                actual: source_actual,
            });
        }
        attempts_left = attempts_left.saturating_sub(1);
        if attempts_left == 0 {
            return Err(StoreError::Integrity {
                address: target.display().to_string(),
                expected: expected.to_owned(),
                actual,
            });
        }
    }
}
/// Atomically writes `manifest` (plus a trailing newline) to `target`.
///
/// The manifest's snapshot id is verified against `id` *before* anything touches the
/// filesystem, so a rejected manifest leaves no directories behind. The text goes to a
/// unique temporary sibling that is renamed onto `target`; if the write or the rename
/// fails, the temporary file is removed.
///
/// # Errors
///
/// [`StoreError::Integrity`] when `id` is not the manifest's snapshot id; I/O errors
/// otherwise.
fn write_manifest(
    manifest: &Manifest,
    target: &Path,
    id: &str,
    hasher: &impl Hasher,
) -> Result<(), StoreError> {
    let actual = snapdir_core::merkle::snapshot_id(manifest, hasher);
    if actual != id {
        return Err(StoreError::Integrity {
            address: target.display().to_string(),
            expected: id.to_owned(),
            actual,
        });
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let mut text = manifest.to_string();
    text.push('\n');
    let tmp = temp_sibling(target);
    if let Err(err) = fs::write(&tmp, text.as_bytes()).and_then(|()| fs::rename(&tmp, target)) {
        // Best effort: the temp file may not exist if the write failed before creating it.
        let _ = fs::remove_file(&tmp);
        return Err(err.into());
    }
    Ok(())
}
fn copy_file(source: &Path, target: &Path) -> Result<(), StoreError> {
fs::copy(source, target)?;
Ok(())
}
/// Returns a unique temporary path in the same directory as `target`.
///
/// The name is `<file name>.<pid>.<counter>.tmp`, where the counter is a
/// process-wide atomic, so concurrent writers in this process never collide. A target
/// without a file name contributes an empty prefix.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    let base = match target.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };
    let tmp_name = format!("{base}.{}.{sequence}.tmp", std::process::id());
    target
        .parent()
        .map_or_else(|| PathBuf::from(&tmp_name), |dir| dir.join(&tmp_name))
}
/// Turns a manifest path such as `./a/b/` into the relative form `a/b`.
///
/// At most one leading `./` and one trailing `/` are removed; anything else
/// (including absolute paths) is returned unchanged.
fn strip_leading_dot_slash(path: &str) -> &str {
    let without_prefix = match path.strip_prefix("./") {
        Some(rest) => rest,
        None => path,
    };
    match without_prefix.strip_suffix('/') {
        Some(rest) => rest,
        None => without_prefix,
    }
}
/// Resolves a store specifier into the directory that backs it.
///
/// A `file:` URL has its scheme and leading slashes removed, then an optional
/// `localhost` authority, and is re-rooted at `/`. Any other string is taken as a path
/// as-is. A single trailing `/` is dropped, except for the root directory itself.
///
/// `localhost` is only stripped when it is the entire first segment (followed by `/`
/// or the end of the string). A directory that merely starts with those letters, e.g.
/// `file:///localhostdata/store`, is kept intact rather than mangled into `/data/store`.
fn parse_store_dir(store: &str) -> PathBuf {
    let resolved = if let Some(rest) = store.strip_prefix("file:") {
        let rest = rest.trim_start_matches('/');
        let rest = match rest.strip_prefix("localhost") {
            Some("") => "",
            // Only a whole `localhost/` segment is an authority; otherwise keep `rest`.
            Some(after) => after.strip_prefix('/').unwrap_or(rest),
            None => rest,
        };
        format!("/{rest}")
    } else {
        store.to_owned()
    };
    let trimmed = if resolved.len() > 1 {
        resolved.strip_suffix('/').unwrap_or(&resolved)
    } else {
        &resolved
    };
    PathBuf::from(trimmed)
}
#[cfg(test)]
mod tests {
    // Unit tests for `FileStore`: store URL parsing, push/fetch round trips, integrity
    // failures, skip-if-present behavior, parallel copies, metering, and the
    // `StreamStore` object/manifest API.
    use super::*;
    use snapdir_core::manifest::ManifestEntry;
    use std::fs;
    use std::path::Path;

    /// Scratch directory under the system temp dir, removed on drop.
    ///
    /// The name combines the process id, a caller tag, and a per-process counter so
    /// tests running concurrently never share a directory.
    struct TempDir {
        path: PathBuf,
    }

    impl TempDir {
        fn new(tag: &str) -> Self {
            use std::sync::atomic::{AtomicU64, Ordering};
            static COUNTER: AtomicU64 = AtomicU64::new(0);
            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
            let path = std::env::temp_dir().join(format!(
                "snapdir-filestore-test-{}-{tag}-{n}",
                std::process::id()
            ));
            fs::create_dir_all(&path).expect("create temp dir");
            Self { path }
        }

        fn path(&self) -> &Path {
            &self.path
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.path);
        }
    }

    /// Writes `foo` and `bar` into `source` and returns a matching manifest plus its
    /// snapshot id.
    fn make_foo_bar_source(source: &Path) -> (Manifest, String) {
        let hasher = Blake3Hasher::new();
        fs::write(source.join("foo"), b"foo\n").unwrap();
        fs::write(source.join("bar"), b"bar\n").unwrap();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let bar_sum = hasher.hash_hex(b"bar\n");
        let root_sum =
            snapdir_core::merkle::directory_checksum([foo_sum.as_str(), bar_sum.as_str()], &hasher);
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            8,
            "./",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            bar_sum,
            4,
            "./bar",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        (manifest, id)
    }

    #[test]
    fn file_store_parse_store_dir_matches_oracle_sed() {
        assert_eq!(
            parse_store_dir("file:///tmp/store"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(
            parse_store_dir("file:///tmp/store/"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(
            parse_store_dir("file://localhost/tmp/store"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(
            parse_store_dir("file://tmp/store"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(parse_store_dir("/tmp/store"), PathBuf::from("/tmp/store"));
        assert_eq!(parse_store_dir("file:///"), PathBuf::from("/"));
    }

    #[test]
    fn file_store_push_lands_objects_at_sharded_keys_and_manifest_last() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push ok");
        for entry in manifest.entries() {
            if entry.path_type == PathType::File {
                let obj = store_dir.path().join(object_path(&entry.checksum));
                assert!(obj.exists(), "expected object at {}", obj.display());
                let bytes = fs::read(&obj).unwrap();
                assert_eq!(
                    Blake3Hasher::new().hash_hex(&bytes),
                    entry.checksum,
                    "object content must hash to its address"
                );
            }
        }
        let man_path = store_dir.path().join(manifest_path(&id));
        assert!(man_path.exists(), "manifest must exist after push");
        let read_back = store.get_manifest(&id).expect("manifest reads back");
        assert_eq!(read_back, manifest);
    }

    #[test]
    fn file_store_push_skips_when_manifest_present() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, _id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("first push");
        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        let obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::remove_file(&obj).unwrap();
        store
            .push(&manifest, src_dir.path())
            .expect("second push skips");
        assert!(
            !obj.exists(),
            "manifest-present push must be a full no-op (object stays removed)"
        );
    }

    #[test]
    fn file_store_push_skips_present_objects_but_adds_missing() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("first push");
        let man_path = store_dir.path().join(manifest_path(&id));
        fs::remove_file(&man_path).unwrap();
        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::remove_file(&foo_obj).unwrap();
        store.push(&manifest, src_dir.path()).expect("re-push");
        assert!(foo_obj.exists(), "missing object must be re-added");
        assert!(man_path.exists(), "manifest must be re-written");
    }

    #[test]
    fn file_store_fetch_round_trips_and_verifies() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("fetch files");
        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
    }

    #[test]
    fn file_store_get_manifest_missing_is_not_found() {
        let store_dir = TempDir::new("store");
        let store = FileStore::from_root(store_dir.path());
        let missing = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
        match store.get_manifest(missing) {
            Err(StoreError::ManifestNotFound { id }) => assert_eq!(id, missing),
            other => panic!("expected ManifestNotFound, got {other:?}"),
        }
    }

    #[test]
    fn file_store_get_manifest_tampered_fails_integrity() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let man_path = store_dir.path().join(manifest_path(&id));
        fs::write(&man_path, b"D 700 deadbeef 0 ./\n").unwrap();
        match store.get_manifest(&id) {
            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, id),
            other => panic!("expected Integrity, got {other:?}"),
        }
    }

    #[test]
    fn file_store_fetch_missing_object_is_not_found() {
        let store_dir = TempDir::new("store");
        let dest_dir = TempDir::new("dest");
        let hasher = Blake3Hasher::new();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 4, "./"));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum.clone(),
            4,
            "./foo",
        ));
        let store = FileStore::from_root(store_dir.path());
        match store.fetch_files(&manifest, dest_dir.path()) {
            Err(StoreError::ObjectNotFound { checksum }) => assert_eq!(checksum, foo_sum),
            other => panic!("expected ObjectNotFound, got {other:?}"),
        }
    }

    #[test]
    fn file_store_persist_rejects_corrupt_source() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let hasher = Blake3Hasher::new();
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::write(&foo_obj, b"corrupted not foo\n").unwrap();
        assert_ne!(hasher.hash_hex(b"corrupted not foo\n"), foo_entry.checksum);
        let fetched = store.get_manifest(&id).expect("manifest still valid");
        match store.fetch_files(&fetched, dest_dir.path()) {
            Err(StoreError::Integrity { expected, .. }) => {
                assert_eq!(expected, foo_entry.checksum);
            }
            other => panic!("expected Integrity from corrupt object, got {other:?}"),
        }
        assert!(!dest_dir.path().join("foo").exists());
    }

    #[test]
    fn fetch_skip_present_verified() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("first fetch populates dest");
        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
        // With the whole object tree gone, a second fetch can only succeed by skipping.
        let objects = store_dir.path().join(".objects");
        fs::remove_dir_all(&objects).expect("remove .objects tree");
        assert!(!objects.exists());
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("second fetch skips every present+verified file (no object reads)");
        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
    }

    #[test]
    fn file_store_fetch_repairs_corrupt_dest_and_skips_intact() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("first fetch populates dest");
        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
        let bar_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./bar")
            .unwrap();
        let bar_obj = store_dir.path().join(object_path(&bar_entry.checksum));
        fs::remove_file(&bar_obj).unwrap();
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("repair corrupt foo, skip intact bar");
        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
    }

    #[test]
    fn file_store_fetch_mismatch_then_missing_object_errors() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("first fetch populates dest");
        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::remove_file(&foo_obj).unwrap();
        match store.fetch_files(&fetched, dest_dir.path()) {
            Err(StoreError::ObjectNotFound { checksum }) => {
                assert_eq!(checksum, foo_entry.checksum);
            }
            other => panic!("expected ObjectNotFound (cannot repair), got {other:?}"),
        }
    }

    /// Writes four files (two nested under `sub/` and `sub/deep/`) into `source` and
    /// returns a matching manifest plus its snapshot id. Directory checksums are
    /// placeholders.
    fn make_nested_source(source: &Path) -> (Manifest, String) {
        let hasher = Blake3Hasher::new();
        let files: &[(&str, &[u8])] = &[
            ("a.txt", b"a contents\n"),
            ("b.txt", b"b contents\n"),
            ("sub/c.txt", b"c contents\n"),
            ("sub/deep/d.txt", b"d contents\n"),
        ];
        fs::create_dir_all(source.join("sub/deep")).unwrap();
        for (rel, bytes) in files {
            fs::write(source.join(rel), bytes).unwrap();
        }
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            "x",
            0,
            "./sub/",
        ));
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            "x",
            0,
            "./sub/deep/",
        ));
        for (rel, bytes) in files {
            let sum = hasher.hash_hex(bytes);
            manifest.push(ManifestEntry::new(
                PathType::File,
                "600",
                sum,
                u64::try_from(bytes.len()).expect("file length fits in u64"),
                format!("./{rel}"),
            ));
        }
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        (manifest, id)
    }

    /// Asserts that `dest` holds exactly the files written by `make_nested_source`.
    fn assert_nested_dest(dest: &Path) {
        assert_eq!(fs::read(dest.join("a.txt")).unwrap(), b"a contents\n");
        assert_eq!(fs::read(dest.join("b.txt")).unwrap(), b"b contents\n");
        assert_eq!(fs::read(dest.join("sub/c.txt")).unwrap(), b"c contents\n");
        assert_eq!(
            fs::read(dest.join("sub/deep/d.txt")).unwrap(),
            b"d contents\n"
        );
    }

    #[test]
    fn filestore_parallel_roundtrip_byte_identical() {
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());
        let par_store_dir = TempDir::new("store-par");
        let par_dest_dir = TempDir::new("dest-par");
        let par_store =
            FileStore::from_root_with_config(par_store_dir.path(), TransferConfig::new(4, None));
        par_store.push(&manifest, src_dir.path()).expect("par push");
        let par_manifest = par_store.get_manifest(&id).expect("par get manifest");
        assert_eq!(par_manifest, manifest, "round-tripped manifest matches");
        par_store
            .fetch_files(&par_manifest, par_dest_dir.path())
            .expect("par fetch");
        assert_nested_dest(par_dest_dir.path());
        let seq_store_dir = TempDir::new("store-seq");
        let seq_dest_dir = TempDir::new("dest-seq");
        let seq_store =
            FileStore::from_root_with_config(seq_store_dir.path(), TransferConfig::new(1, None));
        seq_store.push(&manifest, src_dir.path()).expect("seq push");
        // Reading back under `id` proves the sequential push stored the manifest under
        // the same snapshot id as the parallel one.
        let seq_manifest = seq_store.get_manifest(&id).expect("seq get manifest");
        assert_eq!(
            seq_manifest, manifest,
            "snapshot id is concurrency-independent"
        );
        seq_store
            .fetch_files(&seq_manifest, seq_dest_dir.path())
            .expect("seq fetch");
        assert_nested_dest(seq_dest_dir.path());
        for entry in manifest.entries() {
            if entry.path_type != PathType::File {
                continue;
            }
            let key = object_path(&entry.checksum);
            let par_obj = par_store_dir.path().join(&key);
            let seq_obj = seq_store_dir.path().join(&key);
            assert!(par_obj.exists(), "par object {key} present");
            assert!(seq_obj.exists(), "seq object {key} present");
            assert_eq!(
                fs::read(&par_obj).unwrap(),
                fs::read(&seq_obj).unwrap(),
                "par and seq object bytes identical"
            );
        }
    }

    #[test]
    fn filestore_parallel_concurrency_one_sequential() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_nested_source(src_dir.path());
        let store =
            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(1, None));
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store.fetch_files(&fetched, dest_dir.path()).expect("fetch");
        assert_nested_dest(dest_dir.path());
    }

    #[test]
    fn filestore_parallel_all_or_nothing_bad_object() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());
        fs::write(src_dir.path().join("sub/c.txt"), b"TAMPERED\n").unwrap();
        let store =
            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
        match store.push(&manifest, src_dir.path()) {
            Err(StoreError::Integrity { .. }) => {}
            other => panic!("expected Integrity from bad source object, got {other:?}"),
        }
        let man_path = store.manifest_disk_path(&id);
        assert!(
            !man_path.exists(),
            "manifest must not be written when an object copy fails"
        );
    }

    #[test]
    fn filestore_parallel_large_n_round_trips() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let hasher = Blake3Hasher::new();
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
        let n = 50usize;
        for i in 0..n {
            let name = format!("file-{i:03}.txt");
            let contents = format!("contents of file {i}\n");
            fs::write(src_dir.path().join(&name), contents.as_bytes()).unwrap();
            let sum = hasher.hash_hex(contents.as_bytes());
            manifest.push(ManifestEntry::new(
                PathType::File,
                "600",
                sum,
                u64::try_from(contents.len()).expect("file length fits in u64"),
                format!("./{name}"),
            ));
        }
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        let store =
            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
        store.push(&manifest, src_dir.path()).expect("push N files");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("fetch N files");
        for i in 0..n {
            let name = format!("file-{i:03}.txt");
            let expected = format!("contents of file {i}\n");
            assert_eq!(
                fs::read(dest_dir.path().join(&name)).unwrap(),
                expected.as_bytes()
            );
        }
    }

    #[test]
    fn meter_records_filestore_push_fetch() {
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());
        let n = u64::try_from(
            manifest
                .entries()
                .iter()
                .filter(|e| e.path_type == PathType::File)
                .count(),
        )
        .expect("file count fits in u64");
        let total_bytes: u64 = manifest
            .entries()
            .iter()
            .filter(|e| e.path_type == PathType::File)
            .map(|e| e.size)
            .sum();
        let store_dir = TempDir::new("store");
        let dest_dir = TempDir::new("dest");
        let meter = Arc::new(Meter::new());
        let store = FileStore::from_root(store_dir.path()).with_meter(Some(Arc::clone(&meter)));
        store.push(&manifest, src_dir.path()).expect("push");
        let after_push = meter.snapshot();
        assert_eq!(after_push.bytes_in, total_bytes, "push read every object");
        assert_eq!(after_push.bytes_out, total_bytes, "push wrote every object");
        assert_eq!(after_push.objects_done, n, "push finished N objects");
        assert_eq!(after_push.objects_skipped, 0, "fresh store skips nothing");
        assert_eq!(after_push.objects_total, total_bytes, "push set byte total");
        assert_eq!(after_push.in_flight, 0, "nothing left in flight");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("fetch_files");
        let after_fetch = meter.snapshot();
        assert_eq!(
            after_fetch.bytes_in,
            2 * total_bytes,
            "fetch read every object again"
        );
        assert_eq!(
            after_fetch.bytes_out,
            2 * total_bytes,
            "fetch wrote every object again"
        );
        assert_eq!(after_fetch.objects_done, 2 * n, "push + fetch = 2N objects");
        assert_eq!(after_fetch.in_flight, 0, "nothing left in flight");
        assert_nested_dest(dest_dir.path());
    }

    #[test]
    fn meter_records_none_is_identical() {
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());
        let metered_store_dir = TempDir::new("store-metered");
        let metered_dest_dir = TempDir::new("dest-metered");
        let meter = Arc::new(Meter::new());
        let metered =
            FileStore::from_root(metered_store_dir.path()).with_meter(Some(Arc::clone(&meter)));
        metered
            .push(&manifest, src_dir.path())
            .expect("metered push");
        let metered_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
        let metered_manifest = metered.get_manifest(&id).expect("metered manifest");
        metered
            .fetch_files(&metered_manifest, metered_dest_dir.path())
            .expect("metered fetch");
        let plain_store_dir = TempDir::new("store-plain");
        let plain_dest_dir = TempDir::new("dest-plain");
        let plain = FileStore::from_root(plain_store_dir.path());
        plain.push(&manifest, src_dir.path()).expect("plain push");
        let plain_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
        let plain_manifest = plain.get_manifest(&id).expect("plain manifest");
        plain
            .fetch_files(&plain_manifest, plain_dest_dir.path())
            .expect("plain fetch");
        assert_eq!(metered_id, plain_id, "snapshot id unaffected by the meter");
        assert_eq!(metered_id, id);
        for entry in manifest.entries() {
            if entry.path_type != PathType::File {
                continue;
            }
            let key = object_path(&entry.checksum);
            let metered_obj = metered_store_dir.path().join(&key);
            let plain_obj = plain_store_dir.path().join(&key);
            assert!(metered_obj.exists(), "metered object {key} present");
            assert!(plain_obj.exists(), "plain object {key} present");
            assert_eq!(
                fs::read(&metered_obj).unwrap(),
                fs::read(&plain_obj).unwrap(),
                "metered and unmetered object bytes identical"
            );
        }
        assert_nested_dest(metered_dest_dir.path());
        assert_nested_dest(plain_dest_dir.path());
    }

    #[test]
    fn file_store_strip_leading_dot_slash() {
        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
        assert_eq!(strip_leading_dot_slash("./a/"), "a");
        assert_eq!(strip_leading_dot_slash("./"), "");
        assert_eq!(strip_leading_dot_slash("/abs/path"), "/abs/path");
    }

    #[test]
    fn stream_store_filestore_object_roundtrip() {
        let store_dir = TempDir::new("stream-roundtrip");
        let store = FileStore::from_root(store_dir.path());
        let bytes = b"hello stream store\n".to_vec();
        let checksum = Blake3Hasher::new().hash_hex(&bytes);
        assert!(!store.has_object(&checksum).unwrap());
        store.put_object(&checksum, bytes.clone()).expect("put ok");
        assert!(store.has_object(&checksum).unwrap());
        assert_eq!(store.get_object(&checksum).unwrap(), bytes);
        assert!(store_dir.path().join(object_path(&checksum)).exists());
    }

    #[test]
    fn stream_store_get_object_rejects_corruption() {
        let store_dir = TempDir::new("stream-corrupt");
        let store = FileStore::from_root(store_dir.path());
        let good = b"the real object bytes\n".to_vec();
        let checksum = Blake3Hasher::new().hash_hex(&good);
        let target = store_dir.path().join(object_path(&checksum));
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"TAMPERED bytes that do not hash to the address\n").unwrap();
        match store.get_object(&checksum) {
            Err(StoreError::Integrity {
                expected, actual, ..
            }) => {
                assert_eq!(expected, checksum);
                assert_ne!(actual, checksum, "actual must differ from the address");
            }
            other => panic!("expected Integrity, got {other:?}"),
        }
    }

    #[test]
    fn stream_store_put_object_rejects_wrong_checksum() {
        let store_dir = TempDir::new("stream-wrong-checksum");
        let store = FileStore::from_root(store_dir.path());
        let bytes = b"some payload\n".to_vec();
        let wrong = "dead".repeat(16);
        assert_ne!(wrong, Blake3Hasher::new().hash_hex(&bytes));
        match store.put_object(&wrong, bytes) {
            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, wrong),
            other => panic!("expected Integrity, got {other:?}"),
        }
        assert!(!store.has_object(&wrong).unwrap());
        assert!(!store_dir.path().join(object_path(&wrong)).exists());
    }

    #[test]
    fn stream_store_put_manifest_roundtrips() {
        let store_dir = TempDir::new("stream-manifest");
        let src_dir = TempDir::new("stream-manifest-src");
        let store = FileStore::from_root(store_dir.path());
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        store.put_manifest(&id, &manifest).expect("put_manifest ok");
        let back = store.get_manifest(&id).expect("get_manifest ok");
        assert_eq!(back.entries(), manifest.entries());
        assert_eq!(
            snapdir_core::merkle::snapshot_id(&back, &Blake3Hasher::new()),
            id
        );
    }
}