1use std::fs;
33use std::io;
34use std::path::{Path, PathBuf};
35
36use std::sync::Arc;
37
38use snapdir_core::manifest::{Manifest, PathType};
39use snapdir_core::merkle::{Blake3Hasher, Hasher};
40use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
41use snapdir_core::Meter;
42
43use crate::stream::StreamStore;
44use crate::transfer::TransferConfig;
45use crate::util::{file_present_and_verified, hash_file};
46
/// Number of copy attempts `persist` makes for a single object before it
/// gives up and reports an integrity error against the target path.
const MAX_PERSIST_RETRIES: u32 = 5;
50
/// Store backend rooted at a directory on the local filesystem.
///
/// Objects and manifests live at `root` joined with the relative keys
/// produced by `object_path` and `manifest_path`.
#[derive(Debug, Clone)]
pub struct FileStore {
    /// Directory under which object and manifest keys are resolved.
    root: PathBuf,
    /// Transfer settings; `concurrency` sizes the copy thread pool.
    config: TransferConfig,
    /// Optional progress meter updated by push/fetch; `None` disables metering.
    meter: Option<Arc<Meter>>,
}
65
66impl FileStore {
67 #[must_use]
75 pub fn new(store: &str) -> Self {
76 Self::from_root(parse_store_dir(store))
77 }
78
79 #[must_use]
82 pub fn new_with_config(store: &str, config: TransferConfig) -> Self {
83 Self::from_root_with_config(parse_store_dir(store), config)
84 }
85
86 #[must_use]
88 pub fn from_root(root: impl Into<PathBuf>) -> Self {
89 Self::from_root_with_config(root, TransferConfig::default())
90 }
91
92 #[must_use]
96 pub fn from_root_with_config(root: impl Into<PathBuf>, config: TransferConfig) -> Self {
97 Self {
98 root: root.into(),
99 config,
100 meter: None,
101 }
102 }
103
104 #[must_use]
110 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
111 self.meter = meter;
112 self
113 }
114
115 #[must_use]
117 pub fn root(&self) -> &Path {
118 &self.root
119 }
120
121 #[must_use]
124 pub fn transfer_config(&self) -> &TransferConfig {
125 &self.config
126 }
127
128 fn object_disk_path(&self, checksum: &str) -> PathBuf {
130 self.root.join(object_path(checksum))
131 }
132
133 fn manifest_disk_path(&self, id: &str) -> PathBuf {
135 self.root.join(manifest_path(id))
136 }
137
138 fn parallel_copy(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
148 use rayon::prelude::*;
149
150 if jobs.is_empty() {
151 return Ok(());
152 }
153
154 let meter = self.meter.as_deref();
157
158 let pool = rayon::ThreadPoolBuilder::new()
159 .num_threads(self.config.concurrency.get())
160 .build()
161 .map_err(|err| StoreError::Backend {
162 message: "failed to build copy thread pool".to_owned(),
163 source: Some(Box::new(err)),
164 })?;
165
166 pool.install(|| {
167 jobs.par_iter().try_for_each(|(source, target, expected)| {
168 if let Some(m) = meter {
169 m.object_started();
170 }
171 let len = std::fs::metadata(source).map_or(0, |md| md.len());
175 persist(source, target, expected, &Blake3Hasher::new())?;
176 if let Some(m) = meter {
177 m.add_in(len);
178 m.add_out(len);
179 m.object_finished();
180 }
181 Ok(())
182 })
183 })
184 }
185}
186
187impl Store for FileStore {
188 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
189 let path = self.manifest_disk_path(id);
190 let bytes = match fs::read(&path) {
191 Ok(bytes) => bytes,
192 Err(err) if err.kind() == io::ErrorKind::NotFound => {
193 return Err(StoreError::ManifestNotFound { id: id.to_owned() });
194 }
195 Err(err) => return Err(StoreError::Io(err)),
196 };
197
198 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
204 message: format!("manifest {id} is not valid UTF-8"),
205 source: Some(Box::new(err)),
206 })?;
207 let manifest = Manifest::parse(&text)?;
208
209 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
210 if actual != id {
211 return Err(StoreError::Integrity {
212 address: manifest_path(id),
213 expected: id.to_owned(),
214 actual,
215 });
216 }
217
218 Ok(manifest)
219 }
220
221 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
222 let hasher = Blake3Hasher::new();
223
224 let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
234 for entry in manifest.entries() {
235 let rel = strip_leading_dot_slash(&entry.path);
236 let target = dest.join(rel);
237 match entry.path_type {
238 PathType::Directory => {
239 fs::create_dir_all(&target)?;
240 }
241 PathType::File => {
242 if file_present_and_verified(&target, &entry.checksum, &hasher) {
247 if let Some(m) = self.meter.as_deref() {
249 m.add_skipped(1);
250 }
251 continue;
252 }
253 if let Some(parent) = target.parent() {
254 fs::create_dir_all(parent)?;
255 }
256 let source = self.object_disk_path(&entry.checksum);
257 if !source.exists() {
258 return Err(StoreError::ObjectNotFound {
259 checksum: entry.checksum.clone(),
260 });
261 }
262 jobs.push((source, target, entry.checksum.clone()));
263 }
264 }
265 }
266
267 if let Some(m) = self.meter.as_deref() {
270 let total: u64 = jobs
271 .iter()
272 .map(|(source, _, _)| fs::metadata(source).map_or(0, |md| md.len()))
273 .sum();
274 m.set_total(total);
275 }
276
277 self.parallel_copy(&jobs)
281 }
282
283 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
284 let hasher = Blake3Hasher::new();
287 let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
288 let manifest_target = self.manifest_disk_path(&id);
289
290 if manifest_target.exists() {
294 return Ok(());
295 }
296
297 let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
301 for entry in manifest.entries() {
302 if entry.path_type != PathType::File {
303 continue;
304 }
305 let object_target = self.object_disk_path(&entry.checksum);
306 if object_target.exists() {
307 if let Some(m) = self.meter.as_deref() {
309 m.add_skipped(1);
310 }
311 continue;
312 }
313 let rel = strip_leading_dot_slash(&entry.path);
314 let object_source = source.join(rel);
315 jobs.push((object_source, object_target, entry.checksum.clone()));
316 }
317
318 if let Some(m) = self.meter.as_deref() {
321 let total: u64 = jobs
322 .iter()
323 .map(|(src, _, _)| fs::metadata(src).map_or(0, |md| md.len()))
324 .sum();
325 m.set_total(total);
326 }
327
328 self.parallel_copy(&jobs)?;
333
334 write_manifest(manifest, &manifest_target, &id, &hasher)?;
337 Ok(())
338 }
339}
340
341impl StreamStore for FileStore {
342 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
343 Ok(self.object_disk_path(checksum).exists())
344 }
345
346 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
347 let path = self.object_disk_path(checksum);
348 let bytes = match fs::read(&path) {
349 Ok(bytes) => bytes,
350 Err(err) if err.kind() == io::ErrorKind::NotFound => {
351 return Err(StoreError::ObjectNotFound {
352 checksum: checksum.to_owned(),
353 });
354 }
355 Err(err) => return Err(StoreError::Io(err)),
356 };
357
358 let actual = Blake3Hasher::new().hash_hex(&bytes);
362 if actual != checksum {
363 return Err(StoreError::Integrity {
364 address: path.display().to_string(),
365 expected: checksum.to_owned(),
366 actual,
367 });
368 }
369 Ok(bytes)
370 }
371
372 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
373 let actual = Blake3Hasher::new().hash_hex(&bytes);
376 if actual != checksum {
377 return Err(StoreError::Integrity {
378 address: object_path(checksum),
379 expected: checksum.to_owned(),
380 actual,
381 });
382 }
383
384 let target = self.object_disk_path(checksum);
385 if let Some(parent) = target.parent() {
386 fs::create_dir_all(parent)?;
387 }
388 let tmp = temp_sibling(&target);
391 fs::write(&tmp, &bytes)?;
392 fs::rename(&tmp, &target)?;
393 Ok(())
394 }
395
396 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
397 write_manifest(
398 manifest,
399 &self.manifest_disk_path(id),
400 id,
401 &Blake3Hasher::new(),
402 )
403 }
404}
405
406fn persist(
410 source: &Path,
411 target: &Path,
412 expected: &str,
413 hasher: &impl Hasher,
414) -> Result<(), StoreError> {
415 if let Some(parent) = target.parent() {
416 fs::create_dir_all(parent)?;
417 }
418
419 let mut attempts_left = MAX_PERSIST_RETRIES;
420 loop {
421 let tmp = temp_sibling(target);
424 copy_file(source, &tmp)?;
425
426 let actual = hash_file(&tmp, hasher)?;
427 if actual == expected {
428 fs::rename(&tmp, target)?;
430 return Ok(());
431 }
432
433 let _ = fs::remove_file(&tmp);
437 let source_actual = hash_file(source, hasher)?;
438 if source_actual != expected {
439 return Err(StoreError::Integrity {
440 address: source.display().to_string(),
441 expected: expected.to_owned(),
442 actual: source_actual,
443 });
444 }
445
446 attempts_left = attempts_left.saturating_sub(1);
447 if attempts_left == 0 {
448 return Err(StoreError::Integrity {
449 address: target.display().to_string(),
450 expected: expected.to_owned(),
451 actual,
452 });
453 }
454 }
455}
456
457fn write_manifest(
462 manifest: &Manifest,
463 target: &Path,
464 id: &str,
465 hasher: &impl Hasher,
466) -> Result<(), StoreError> {
467 if let Some(parent) = target.parent() {
468 fs::create_dir_all(parent)?;
469 }
470
471 let actual = snapdir_core::merkle::snapshot_id(manifest, hasher);
474 if actual != id {
475 return Err(StoreError::Integrity {
476 address: target.display().to_string(),
477 expected: id.to_owned(),
478 actual,
479 });
480 }
481
482 let mut text = manifest.to_string();
485 text.push('\n');
486
487 let tmp = temp_sibling(target);
488 fs::write(&tmp, text.as_bytes())?;
489 fs::rename(&tmp, target)?;
490 Ok(())
491}
492
493fn copy_file(source: &Path, target: &Path) -> Result<(), StoreError> {
497 fs::copy(source, target)?;
498 Ok(())
499}
500
/// Returns a temporary path in the same directory as `target`.
///
/// The file name is `<target file name>.<process id>.<counter>.tmp`, where
/// the counter is a process-wide value incremented on every call, so
/// successive calls yield distinct names. A target without a file name
/// contributes an empty prefix; a target without a parent yields a bare
/// relative file name.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    let base = match target.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };
    let tmp_name = format!("{base}.{pid}.{seq}.tmp", pid = std::process::id());

    target
        .parent()
        .map_or_else(|| PathBuf::from(&tmp_name), |dir| dir.join(&tmp_name))
}
519
/// Turns a manifest path into a form suitable for joining onto a base
/// directory: removes one leading `./` (if present) and then one trailing
/// `/` (if present). Anything else, including absolute paths, is returned
/// unchanged.
fn strip_leading_dot_slash(path: &str) -> &str {
    let without_prefix = match path.strip_prefix("./") {
        Some(rest) => rest,
        None => path,
    };
    match without_prefix.strip_suffix('/') {
        Some(rest) => rest,
        None => without_prefix,
    }
}
526
/// Resolves a store locator into a filesystem directory.
///
/// * A locator starting with `file:` has that scheme removed, then every
///   leading `/`, then an optional leading `localhost` (plus one `/` after
///   it); the remainder is re-rooted with a single leading `/`.
/// * Any other locator is used verbatim.
///
/// In both cases one trailing `/` is dropped unless the result is a single
/// character (so `/` stays `/`).
///
/// NOTE(review): the `localhost` strip is a plain prefix match, so a path
/// whose first segment merely starts with `localhost` (for example
/// `file:///localhost-data`) also loses that prefix — confirm this matches
/// the intended behaviour.
fn parse_store_dir(store: &str) -> PathBuf {
    let mut dir = match store.strip_prefix("file:") {
        None => store.to_owned(),
        Some(url_tail) => {
            let unslashed = url_tail.trim_start_matches('/');
            let path = match unslashed.strip_prefix("localhost") {
                Some(after_host) => after_host.strip_prefix('/').unwrap_or(after_host),
                None => unslashed,
            };
            format!("/{path}")
        }
    };

    if dir.len() > 1 && dir.ends_with('/') {
        dir.pop();
    }
    PathBuf::from(dir)
}
562
/// Unit tests for `FileStore` and the path helpers defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use snapdir_core::manifest::ManifestEntry;
    use std::fs;
    use std::path::Path;

    /// Scratch directory under the system temp dir; removed again on drop.
    struct TempDir {
        path: PathBuf,
    }

    impl TempDir {
        /// Creates a fresh directory whose name combines the process id,
        /// `tag`, and a process-wide counter.
        fn new(tag: &str) -> Self {
            use std::sync::atomic::{AtomicU64, Ordering};
            static COUNTER: AtomicU64 = AtomicU64::new(0);
            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
            let path = std::env::temp_dir().join(format!(
                "snapdir-filestore-test-{}-{tag}-{n}",
                std::process::id()
            ));
            fs::create_dir_all(&path).expect("create temp dir");
            Self { path }
        }

        /// Path of the scratch directory.
        fn path(&self) -> &Path {
            &self.path
        }
    }

    impl Drop for TempDir {
        /// Best-effort recursive removal; failures are ignored.
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.path);
        }
    }

    /// Writes `foo` and `bar` (contents `foo\n` / `bar\n`) into `source` and
    /// returns a matching three-entry manifest together with its snapshot id.
    fn make_foo_bar_source(source: &Path) -> (Manifest, String) {
        let hasher = Blake3Hasher::new();
        fs::write(source.join("foo"), b"foo\n").unwrap();
        fs::write(source.join("bar"), b"bar\n").unwrap();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let bar_sum = hasher.hash_hex(b"bar\n");

        // Root directory checksum is derived from the two file checksums.
        let root_sum =
            snapdir_core::merkle::directory_checksum([foo_sum.as_str(), bar_sum.as_str()], &hasher);

        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            8,
            "./",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            bar_sum,
            4,
            "./bar",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        (manifest, id)
    }

    /// `parse_store_dir` accepts `file:` URLs (with or without `localhost`)
    /// and plain paths, and drops a trailing slash except for `/`.
    #[test]
    fn file_store_parse_store_dir_matches_oracle_sed() {
        assert_eq!(
            parse_store_dir("file:///tmp/store"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(
            parse_store_dir("file:///tmp/store/"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(
            parse_store_dir("file://localhost/tmp/store"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(
            parse_store_dir("file://tmp/store"),
            PathBuf::from("/tmp/store")
        );
        assert_eq!(parse_store_dir("/tmp/store"), PathBuf::from("/tmp/store"));
        assert_eq!(parse_store_dir("file:///"), PathBuf::from("/"));
    }

    /// After a push every file object exists at its `object_path` key with
    /// matching content, and the manifest exists and reads back unchanged.
    #[test]
    fn file_store_push_lands_objects_at_sharded_keys_and_manifest_last() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_foo_bar_source(src_dir.path());

        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push ok");

        for entry in manifest.entries() {
            if entry.path_type == PathType::File {
                let obj = store_dir.path().join(object_path(&entry.checksum));
                assert!(obj.exists(), "expected object at {}", obj.display());
                let bytes = fs::read(&obj).unwrap();
                assert_eq!(
                    Blake3Hasher::new().hash_hex(&bytes),
                    entry.checksum,
                    "object content must hash to its address"
                );
            }
        }

        let man_path = store_dir.path().join(manifest_path(&id));
        assert!(man_path.exists(), "manifest must exist after push");
        let read_back = store.get_manifest(&id).expect("manifest reads back");
        assert_eq!(read_back, manifest);
    }

    /// With the manifest already stored, a second push does nothing: an
    /// object deleted in between is not restored.
    #[test]
    fn file_store_push_skips_when_manifest_present() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("first push");

        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        let obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::remove_file(&obj).unwrap();

        // `id` is not needed in this test; the binding is consumed explicitly.
        let _ = id;
        store
            .push(&manifest, src_dir.path())
            .expect("second push skips");
        assert!(
            !obj.exists(),
            "manifest-present push must be a full no-op (object stays removed)"
        );
    }

    /// With the manifest and one object removed, a re-push restores both.
    #[test]
    fn file_store_push_skips_present_objects_but_adds_missing() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("first push");

        let man_path = store_dir.path().join(manifest_path(&id));
        fs::remove_file(&man_path).unwrap();
        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::remove_file(&foo_obj).unwrap();

        store.push(&manifest, src_dir.path()).expect("re-push");
        assert!(foo_obj.exists(), "missing object must be re-added");
        assert!(man_path.exists(), "manifest must be re-written");
    }

    /// Push followed by fetch reproduces the source file contents in `dest`.
    #[test]
    fn file_store_fetch_round_trips_and_verifies() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");

        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("fetch files");

        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
    }

    /// Asking an empty store for a manifest yields `ManifestNotFound`
    /// carrying the requested id.
    #[test]
    fn file_store_get_manifest_missing_is_not_found() {
        let store_dir = TempDir::new("store");
        let store = FileStore::from_root(store_dir.path());
        let missing = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
        match store.get_manifest(missing) {
            Err(StoreError::ManifestNotFound { id }) => assert_eq!(id, missing),
            other => panic!("expected ManifestNotFound, got {other:?}"),
        }
    }

    /// Overwriting the stored manifest with different content makes
    /// `get_manifest` fail with `Integrity` for the requested id.
    #[test]
    fn file_store_get_manifest_tampered_fails_integrity() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");

        let man_path = store_dir.path().join(manifest_path(&id));
        fs::write(&man_path, b"D 700 deadbeef 0 ./\n").unwrap();

        match store.get_manifest(&id) {
            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, id),
            other => panic!("expected Integrity, got {other:?}"),
        }
    }

    /// Fetching a file whose object was never stored yields `ObjectNotFound`
    /// with that file's checksum.
    #[test]
    fn file_store_fetch_missing_object_is_not_found() {
        let store_dir = TempDir::new("store");
        let dest_dir = TempDir::new("dest");
        let hasher = Blake3Hasher::new();
        let foo_sum = hasher.hash_hex(b"foo\n");

        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 4, "./"));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum.clone(),
            4,
            "./foo",
        ));

        let store = FileStore::from_root(store_dir.path());
        match store.fetch_files(&manifest, dest_dir.path()) {
            Err(StoreError::ObjectNotFound { checksum }) => assert_eq!(checksum, foo_sum),
            other => panic!("expected ObjectNotFound, got {other:?}"),
        }
    }

    /// A stored object overwritten with wrong bytes makes fetch fail with
    /// `Integrity`, and the corresponding file is not created in `dest`.
    #[test]
    fn file_store_persist_rejects_corrupt_source() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let hasher = Blake3Hasher::new();

        let (manifest, id) = make_foo_bar_source(src_dir.path());
        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");

        // Corrupt the stored `foo` object in place.
        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::write(&foo_obj, b"corrupted not foo\n").unwrap();
        assert_ne!(hasher.hash_hex(b"corrupted not foo\n"), foo_entry.checksum);

        let fetched = store.get_manifest(&id).expect("manifest still valid");
        match store.fetch_files(&fetched, dest_dir.path()) {
            Err(StoreError::Integrity { expected, .. }) => {
                assert_eq!(expected, foo_entry.checksum);
            }
            other => panic!("expected Integrity from corrupt object, got {other:?}"),
        }
        assert!(!dest_dir.path().join("foo").exists());
    }

    /// A second fetch into an already-populated `dest` succeeds even after
    /// the whole `.objects` tree was deleted, i.e. no object is read.
    #[test]
    fn fetch_skip_present_verified() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());

        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");

        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("first fetch populates dest");
        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");

        let objects = store_dir.path().join(".objects");
        fs::remove_dir_all(&objects).expect("remove .objects tree");
        assert!(!objects.exists());

        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("second fetch skips every present+verified file (no object reads)");

        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
    }

    /// A corrupted destination file is re-fetched, while an intact one is
    /// skipped even though its object is gone from the store.
    #[test]
    fn file_store_fetch_repairs_corrupt_dest_and_skips_intact() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());

        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("first fetch populates dest");

        // Corrupt dest `foo`; remove the stored object for the intact `bar`.
        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
        let bar_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./bar")
            .unwrap();
        let bar_obj = store_dir.path().join(object_path(&bar_entry.checksum));
        fs::remove_file(&bar_obj).unwrap();

        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("repair corrupt foo, skip intact bar");

        assert_eq!(fs::read(dest_dir.path().join("foo")).unwrap(), b"foo\n");
        assert_eq!(fs::read(dest_dir.path().join("bar")).unwrap(), b"bar\n");
    }

    /// A corrupted destination file whose object is also missing cannot be
    /// repaired: fetch reports `ObjectNotFound`.
    #[test]
    fn file_store_fetch_mismatch_then_missing_object_errors() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_foo_bar_source(src_dir.path());

        let store = FileStore::from_root(store_dir.path());
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("first fetch populates dest");

        let foo_entry = manifest
            .entries()
            .iter()
            .find(|e| e.path == "./foo")
            .unwrap();
        fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
        let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
        fs::remove_file(&foo_obj).unwrap();

        match store.fetch_files(&fetched, dest_dir.path()) {
            Err(StoreError::ObjectNotFound { checksum }) => {
                assert_eq!(checksum, foo_entry.checksum);
            }
            other => panic!("expected ObjectNotFound (cannot repair), got {other:?}"),
        }
    }

    /// Writes four files across `./`, `./sub/` and `./sub/deep/` into
    /// `source` and returns a manifest for them plus its snapshot id.
    /// Directory entries use the placeholder checksum `"x"`.
    fn make_nested_source(source: &Path) -> (Manifest, String) {
        let hasher = Blake3Hasher::new();
        let files: &[(&str, &[u8])] = &[
            ("a.txt", b"a contents\n"),
            ("b.txt", b"b contents\n"),
            ("sub/c.txt", b"c contents\n"),
            ("sub/deep/d.txt", b"d contents\n"),
        ];

        fs::create_dir_all(source.join("sub/deep")).unwrap();
        for (rel, bytes) in files {
            fs::write(source.join(rel), bytes).unwrap();
        }

        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            "x",
            0,
            "./sub/",
        ));
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            "x",
            0,
            "./sub/deep/",
        ));
        for (rel, bytes) in files {
            let sum = hasher.hash_hex(bytes);
            #[allow(clippy::cast_possible_truncation)]
            manifest.push(ManifestEntry::new(
                PathType::File,
                "600",
                sum,
                bytes.len() as u64,
                format!("./{rel}"),
            ));
        }

        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        (manifest, id)
    }

    /// Asserts that `dest` holds the four files written by `make_nested_source`.
    fn assert_nested_dest(dest: &Path) {
        assert_eq!(fs::read(dest.join("a.txt")).unwrap(), b"a contents\n");
        assert_eq!(fs::read(dest.join("b.txt")).unwrap(), b"b contents\n");
        assert_eq!(fs::read(dest.join("sub/c.txt")).unwrap(), b"c contents\n");
        assert_eq!(
            fs::read(dest.join("sub/deep/d.txt")).unwrap(),
            b"d contents\n"
        );
    }

    /// Push + fetch with concurrency 4 and with concurrency 1 produce the
    /// same snapshot id, the same destination tree and identical objects.
    #[test]
    fn filestore_parallel_roundtrip_byte_identical() {
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());

        // Concurrency 4.
        let par_store_dir = TempDir::new("store-par");
        let par_dest_dir = TempDir::new("dest-par");
        let par_store =
            FileStore::from_root_with_config(par_store_dir.path(), TransferConfig::new(4, None));
        par_store.push(&manifest, src_dir.path()).expect("par push");
        let par_manifest = par_store.get_manifest(&id).expect("par get manifest");
        assert_eq!(par_manifest, manifest, "round-tripped manifest matches");
        par_store
            .fetch_files(&par_manifest, par_dest_dir.path())
            .expect("par fetch");
        assert_nested_dest(par_dest_dir.path());

        // Concurrency 1.
        let seq_store_dir = TempDir::new("store-seq");
        let seq_dest_dir = TempDir::new("dest-seq");
        let seq_store =
            FileStore::from_root_with_config(seq_store_dir.path(), TransferConfig::new(1, None));
        seq_store.push(&manifest, src_dir.path()).expect("seq push");
        let seq_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
        assert_eq!(seq_id, id, "snapshot id is concurrency-independent");
        seq_store
            .fetch_files(&manifest, seq_dest_dir.path())
            .expect("seq fetch");
        assert_nested_dest(seq_dest_dir.path());

        // Both stores must hold byte-identical objects at the same keys.
        for entry in manifest.entries() {
            if entry.path_type != PathType::File {
                continue;
            }
            let key = object_path(&entry.checksum);
            let par_obj = par_store_dir.path().join(&key);
            let seq_obj = seq_store_dir.path().join(&key);
            assert!(par_obj.exists(), "par object {key} present");
            assert!(seq_obj.exists(), "seq object {key} present");
            assert_eq!(
                fs::read(&par_obj).unwrap(),
                fs::read(&seq_obj).unwrap(),
                "par and seq object bytes identical"
            );
        }
    }

    /// Push + fetch round-trips the nested tree with concurrency 1.
    #[test]
    fn filestore_parallel_concurrency_one_sequential() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let (manifest, id) = make_nested_source(src_dir.path());

        let store =
            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(1, None));
        store.push(&manifest, src_dir.path()).expect("push");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store.fetch_files(&fetched, dest_dir.path()).expect("fetch");
        assert_nested_dest(dest_dir.path());
    }

    /// When one source file no longer matches its manifest checksum, push
    /// fails with `Integrity` and the manifest is not written.
    #[test]
    fn filestore_parallel_all_or_nothing_bad_object() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());

        // Change a source file after the manifest was computed.
        fs::write(src_dir.path().join("sub/c.txt"), b"TAMPERED\n").unwrap();

        let store =
            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
        match store.push(&manifest, src_dir.path()) {
            Err(StoreError::Integrity { .. }) => {}
            other => panic!("expected Integrity from bad source object, got {other:?}"),
        }

        let man_path = store.manifest_disk_path(&id);
        assert!(
            !man_path.exists(),
            "manifest must not be written when an object copy fails"
        );
    }

    /// Fifty distinct files round-trip through push + fetch with concurrency 4.
    #[test]
    fn filestore_parallel_large_n_round_trips() {
        let store_dir = TempDir::new("store");
        let src_dir = TempDir::new("src");
        let dest_dir = TempDir::new("dest");
        let hasher = Blake3Hasher::new();

        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
        let n = 50usize;
        for i in 0..n {
            let name = format!("file-{i:03}.txt");
            let contents = format!("contents of file {i}\n");
            fs::write(src_dir.path().join(&name), contents.as_bytes()).unwrap();
            let sum = hasher.hash_hex(contents.as_bytes());
            #[allow(clippy::cast_possible_truncation)]
            manifest.push(ManifestEntry::new(
                PathType::File,
                "600",
                sum,
                contents.len() as u64,
                format!("./{name}"),
            ));
        }
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

        let store =
            FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
        store.push(&manifest, src_dir.path()).expect("push N files");
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("fetch N files");

        for i in 0..n {
            let name = format!("file-{i:03}.txt");
            let expected = format!("contents of file {i}\n");
            assert_eq!(
                fs::read(dest_dir.path().join(&name)).unwrap(),
                expected.as_bytes()
            );
        }
    }

    /// The meter's byte and object counters reflect one push followed by
    /// one fetch of the nested tree.
    #[test]
    fn meter_records_filestore_push_fetch() {
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());

        // Expected figures: number of file entries and their summed sizes.
        let n = manifest
            .entries()
            .iter()
            .filter(|e| e.path_type == PathType::File)
            .count() as u64;
        let total_bytes: u64 = manifest
            .entries()
            .iter()
            .filter(|e| e.path_type == PathType::File)
            .map(|e| e.size)
            .sum();

        let store_dir = TempDir::new("store");
        let dest_dir = TempDir::new("dest");
        let meter = Arc::new(Meter::new());
        let store = FileStore::from_root(store_dir.path()).with_meter(Some(Arc::clone(&meter)));

        store.push(&manifest, src_dir.path()).expect("push");
        let after_push = meter.snapshot();
        assert_eq!(after_push.bytes_in, total_bytes, "push read every object");
        assert_eq!(after_push.bytes_out, total_bytes, "push wrote every object");
        assert_eq!(after_push.objects_done, n, "push finished N objects");
        assert_eq!(after_push.objects_skipped, 0, "fresh store skips nothing");
        assert_eq!(after_push.objects_total, total_bytes, "push set byte total");
        assert_eq!(after_push.in_flight, 0, "nothing left in flight");

        // The same meter keeps accumulating across the fetch.
        let fetched = store.get_manifest(&id).expect("get manifest");
        store
            .fetch_files(&fetched, dest_dir.path())
            .expect("fetch_files");
        let after_fetch = meter.snapshot();
        assert_eq!(
            after_fetch.bytes_in,
            2 * total_bytes,
            "fetch read every object again"
        );
        assert_eq!(
            after_fetch.bytes_out,
            2 * total_bytes,
            "fetch wrote every object again"
        );
        assert_eq!(after_fetch.objects_done, 2 * n, "push + fetch = 2N objects");
        assert_eq!(after_fetch.in_flight, 0, "nothing left in flight");

        assert_nested_dest(dest_dir.path());
    }

    /// A metered store and an unmetered store produce the same snapshot id,
    /// identical objects and identical destination trees.
    #[test]
    fn meter_records_none_is_identical() {
        let src_dir = TempDir::new("src");
        let (manifest, id) = make_nested_source(src_dir.path());

        // Store with a meter attached.
        let metered_store_dir = TempDir::new("store-metered");
        let metered_dest_dir = TempDir::new("dest-metered");
        let meter = Arc::new(Meter::new());
        let metered =
            FileStore::from_root(metered_store_dir.path()).with_meter(Some(Arc::clone(&meter)));
        metered
            .push(&manifest, src_dir.path())
            .expect("metered push");
        let metered_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
        let metered_manifest = metered.get_manifest(&id).expect("metered manifest");
        metered
            .fetch_files(&metered_manifest, metered_dest_dir.path())
            .expect("metered fetch");

        // Store without a meter.
        let plain_store_dir = TempDir::new("store-plain");
        let plain_dest_dir = TempDir::new("dest-plain");
        let plain = FileStore::from_root(plain_store_dir.path());
        plain.push(&manifest, src_dir.path()).expect("plain push");
        let plain_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
        let plain_manifest = plain.get_manifest(&id).expect("plain manifest");
        plain
            .fetch_files(&plain_manifest, plain_dest_dir.path())
            .expect("plain fetch");

        assert_eq!(metered_id, plain_id, "snapshot id unaffected by the meter");
        assert_eq!(metered_id, id);

        for entry in manifest.entries() {
            if entry.path_type != PathType::File {
                continue;
            }
            let key = object_path(&entry.checksum);
            let metered_obj = metered_store_dir.path().join(&key);
            let plain_obj = plain_store_dir.path().join(&key);
            assert!(metered_obj.exists(), "metered object {key} present");
            assert!(plain_obj.exists(), "plain object {key} present");
            assert_eq!(
                fs::read(&metered_obj).unwrap(),
                fs::read(&plain_obj).unwrap(),
                "metered and unmetered object bytes identical"
            );
        }

        assert_nested_dest(metered_dest_dir.path());
        assert_nested_dest(plain_dest_dir.path());
    }

    /// `strip_leading_dot_slash` drops a leading `./` and a trailing `/`,
    /// and leaves absolute paths untouched.
    #[test]
    fn file_store_strip_leading_dot_slash() {
        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
        assert_eq!(strip_leading_dot_slash("./a/"), "a");
        assert_eq!(strip_leading_dot_slash("./"), "");
        assert_eq!(strip_leading_dot_slash("/abs/path"), "/abs/path");
    }

    /// `put_object` followed by `has_object` / `get_object` round-trips the
    /// bytes and lands the object at its `object_path` key.
    #[test]
    fn stream_store_filestore_object_roundtrip() {
        let store_dir = TempDir::new("stream-roundtrip");
        let store = FileStore::from_root(store_dir.path());

        let bytes = b"hello stream store\n".to_vec();
        let checksum = Blake3Hasher::new().hash_hex(&bytes);

        assert!(!store.has_object(&checksum).unwrap());

        store.put_object(&checksum, bytes.clone()).expect("put ok");

        assert!(store.has_object(&checksum).unwrap());
        assert_eq!(store.get_object(&checksum).unwrap(), bytes);

        assert!(store_dir.path().join(object_path(&checksum)).exists());
    }

    /// `get_object` rejects an object file whose bytes do not hash to its
    /// address.
    #[test]
    fn stream_store_get_object_rejects_corruption() {
        let store_dir = TempDir::new("stream-corrupt");
        let store = FileStore::from_root(store_dir.path());

        // Plant wrong bytes directly at the address of `good`.
        let good = b"the real object bytes\n".to_vec();
        let checksum = Blake3Hasher::new().hash_hex(&good);
        let target = store_dir.path().join(object_path(&checksum));
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"TAMPERED bytes that do not hash to the address\n").unwrap();

        match store.get_object(&checksum) {
            Err(StoreError::Integrity {
                expected, actual, ..
            }) => {
                assert_eq!(expected, checksum);
                assert_ne!(actual, checksum, "actual must differ from the address");
            }
            other => panic!("expected Integrity, got {other:?}"),
        }
    }

    /// `put_object` with a checksum that does not match the bytes fails
    /// with `Integrity` and stores nothing.
    #[test]
    fn stream_store_put_object_rejects_wrong_checksum() {
        let store_dir = TempDir::new("stream-wrong-checksum");
        let store = FileStore::from_root(store_dir.path());

        let bytes = b"some payload\n".to_vec();
        let wrong = "dead".repeat(16);
        assert_ne!(wrong, Blake3Hasher::new().hash_hex(&bytes));

        match store.put_object(&wrong, bytes) {
            Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, wrong),
            other => panic!("expected Integrity, got {other:?}"),
        }

        assert!(!store.has_object(&wrong).unwrap());
        assert!(!store_dir.path().join(object_path(&wrong)).exists());
    }

    /// `put_manifest` followed by `get_manifest` returns the same entries
    /// and the same snapshot id.
    #[test]
    fn stream_store_put_manifest_roundtrips() {
        let store_dir = TempDir::new("stream-manifest");
        let src_dir = TempDir::new("stream-manifest-src");
        let store = FileStore::from_root(store_dir.path());

        let (manifest, id) = make_foo_bar_source(src_dir.path());

        store.put_manifest(&id, &manifest).expect("put_manifest ok");

        let back = store.get_manifest(&id).expect("get_manifest ok");
        assert_eq!(back.entries(), manifest.entries());
        assert_eq!(
            snapdir_core::merkle::snapshot_id(&back, &Blake3Hasher::new()),
            id
        );
    }
}