1use std::fs;
33use std::io;
34use std::path::{Path, PathBuf};
35
36use std::sync::Arc;
37
38use snapdir_core::manifest::{Manifest, PathType};
39use snapdir_core::merkle::{Blake3Hasher, Hasher};
40use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
41use snapdir_core::Meter;
42
43use crate::adaptive::{
44 p95_object_size, AdaptiveGate, AdaptivePolicy as ControllerPolicy, ControllerDriver, OpResult,
45 OpSample,
46};
47use crate::stream::StreamStore;
48use crate::transfer::{classify_error, AdaptivePolicy, TransferConfig};
49use crate::util::{file_present_and_verified, hash_file};
50
/// Number of copy-and-verify attempts `persist` makes for one object before it
/// reports an integrity failure against the target.
const MAX_PERSIST_RETRIES: u32 = 5;
54
/// Store backed by a directory on the local filesystem.
#[derive(Debug, Clone)]
pub struct FileStore {
    /// Directory that object and manifest paths are joined onto.
    root: PathBuf,
    /// Transfer settings; the copy paths read `concurrency` and `adaptive` from it.
    config: TransferConfig,
    /// Optional shared meter that receives progress counts during transfers.
    meter: Option<Arc<Meter>>,
}
69
70impl FileStore {
71 #[must_use]
79 pub fn new(store: &str) -> Self {
80 Self::from_root(parse_store_dir(store))
81 }
82
83 #[must_use]
86 pub fn new_with_config(store: &str, config: TransferConfig) -> Self {
87 Self::from_root_with_config(parse_store_dir(store), config)
88 }
89
90 #[must_use]
92 pub fn from_root(root: impl Into<PathBuf>) -> Self {
93 Self::from_root_with_config(root, TransferConfig::default())
94 }
95
96 #[must_use]
100 pub fn from_root_with_config(root: impl Into<PathBuf>, config: TransferConfig) -> Self {
101 Self {
102 root: root.into(),
103 config,
104 meter: None,
105 }
106 }
107
108 #[must_use]
114 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
115 self.meter = meter;
116 self
117 }
118
119 #[must_use]
121 pub fn root(&self) -> &Path {
122 &self.root
123 }
124
125 #[must_use]
128 pub fn transfer_config(&self) -> &TransferConfig {
129 &self.config
130 }
131
132 fn object_disk_path(&self, checksum: &str) -> PathBuf {
134 self.root.join(object_path(checksum))
135 }
136
137 fn manifest_disk_path(&self, id: &str) -> PathBuf {
139 self.root.join(manifest_path(id))
140 }
141
142 fn parallel_copy(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
152 if jobs.is_empty() {
153 return Ok(());
154 }
155 match self.config.adaptive {
156 AdaptivePolicy::Off => self.parallel_copy_fixed(jobs),
157 AdaptivePolicy::On { fraction, ceiling } => {
158 self.parallel_copy_adaptive(jobs, fraction, ceiling)
159 }
160 }
161 }
162
163 fn parallel_copy_fixed(&self, jobs: &[(PathBuf, PathBuf, String)]) -> Result<(), StoreError> {
166 use rayon::prelude::*;
167
168 let meter = self.meter.as_deref();
171
172 let pool = rayon::ThreadPoolBuilder::new()
173 .num_threads(self.config.concurrency.get())
174 .build()
175 .map_err(|err| StoreError::Backend {
176 message: "failed to build copy thread pool".to_owned(),
177 source: Some(Box::new(err)),
178 })?;
179
180 pool.install(|| {
181 jobs.par_iter().try_for_each(|(source, target, expected)| {
182 if let Some(m) = meter {
183 m.object_started();
184 }
185 let len = std::fs::metadata(source).map_or(0, |md| md.len());
189 persist(source, target, expected, &Blake3Hasher::new())?;
190 if let Some(m) = meter {
191 m.add_in(len);
192 m.add_out(len);
193 m.object_finished();
194 }
195 Ok(())
196 })
197 })
198 }
199
200 fn parallel_copy_adaptive(
210 &self,
211 jobs: &[(PathBuf, PathBuf, String)],
212 fraction: f64,
213 ceiling: usize,
214 ) -> Result<(), StoreError> {
215 use rayon::prelude::*;
216
217 let meter = self.meter.as_deref();
218
219 let sizes: Vec<u64> = jobs
220 .iter()
221 .map(|(source, _, _)| std::fs::metadata(source).map_or(0, |md| md.len()))
222 .collect();
223 let p95 = p95_object_size(&sizes);
224 let total_ram = snapdir_core::resources::total_ram_bytes().unwrap_or(0);
225 let policy = ControllerPolicy::new(fraction, ceiling, total_ram, None);
226
227 let gate = AdaptiveGate::new(self.config.concurrency.get(), ceiling);
228 let driver = ControllerDriver::new(policy, gate.clone(), p95, None, self.meter.clone());
230
231 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
233 let tick_driver = driver.clone();
234 let tick_stop = Arc::clone(&stop);
235 let ticker = std::thread::spawn(move || {
236 while !tick_stop.load(std::sync::atomic::Ordering::Relaxed) {
237 std::thread::sleep(std::time::Duration::from_millis(250));
238 if tick_stop.load(std::sync::atomic::Ordering::Relaxed) {
239 break;
240 }
241 tick_driver.tick();
242 }
243 });
244
245 let pool = rayon::ThreadPoolBuilder::new()
246 .num_threads(ceiling.max(1))
247 .build()
248 .map_err(|err| StoreError::Backend {
249 message: "failed to build copy thread pool".to_owned(),
250 source: Some(Box::new(err)),
251 })?;
252
253 let result = pool.install(|| {
254 jobs.par_iter().try_for_each(|(source, target, expected)| {
255 let _permit = gate.acquire_blocking();
257 if let Some(m) = meter {
258 m.object_started();
259 }
260 let len = std::fs::metadata(source).map_or(0, |md| md.len());
261 let started = std::time::Instant::now();
262 let outcome = persist(source, target, expected, &Blake3Hasher::new());
263 let latency = started.elapsed();
264 let (bytes, op_result) = match &outcome {
265 Ok(()) => (len, OpResult::Ok),
266 Err(err) => (0, classify_error(err)),
267 };
268 driver.record_op(OpSample {
269 bytes,
270 latency,
271 result: op_result,
272 });
273 outcome?;
274 if let Some(m) = meter {
275 m.add_in(len);
276 m.add_out(len);
277 m.object_finished();
278 }
279 Ok(())
280 })
281 });
282
283 stop.store(true, std::sync::atomic::Ordering::Relaxed);
285 let _ = ticker.join();
286 result
287 }
288}
289
290impl Store for FileStore {
291 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
292 let path = self.manifest_disk_path(id);
293 let bytes = match fs::read(&path) {
294 Ok(bytes) => bytes,
295 Err(err) if err.kind() == io::ErrorKind::NotFound => {
296 return Err(StoreError::ManifestNotFound { id: id.to_owned() });
297 }
298 Err(err) => return Err(StoreError::Io(err)),
299 };
300
301 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
307 message: format!("manifest {id} is not valid UTF-8"),
308 source: Some(Box::new(err)),
309 })?;
310 let manifest = Manifest::parse(&text)?;
311
312 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
313 if actual != id {
314 return Err(StoreError::Integrity {
315 address: manifest_path(id),
316 expected: id.to_owned(),
317 actual,
318 });
319 }
320
321 Ok(manifest)
322 }
323
324 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
325 let hasher = Blake3Hasher::new();
326
327 let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
337 for entry in manifest.entries() {
338 let rel = strip_leading_dot_slash(&entry.path);
339 let target = dest.join(rel);
340 match entry.path_type {
341 PathType::Directory => {
342 fs::create_dir_all(&target)?;
343 }
344 PathType::File => {
345 if file_present_and_verified(&target, &entry.checksum, &hasher) {
350 if let Some(m) = self.meter.as_deref() {
352 m.add_skipped(1);
353 }
354 continue;
355 }
356 if let Some(parent) = target.parent() {
357 fs::create_dir_all(parent)?;
358 }
359 let source = self.object_disk_path(&entry.checksum);
360 if !source.exists() {
361 return Err(StoreError::ObjectNotFound {
362 checksum: entry.checksum.clone(),
363 });
364 }
365 jobs.push((source, target, entry.checksum.clone()));
366 }
367 }
368 }
369
370 if let Some(m) = self.meter.as_deref() {
373 let total: u64 = jobs
374 .iter()
375 .map(|(source, _, _)| fs::metadata(source).map_or(0, |md| md.len()))
376 .sum();
377 m.set_total(total);
378 }
379
380 self.parallel_copy(&jobs)
384 }
385
386 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
387 let hasher = Blake3Hasher::new();
390 let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
391 let manifest_target = self.manifest_disk_path(&id);
392
393 if manifest_target.exists() {
397 return Ok(());
398 }
399
400 let mut jobs: Vec<(PathBuf, PathBuf, String)> = Vec::new();
404 for entry in manifest.entries() {
405 if entry.path_type != PathType::File {
406 continue;
407 }
408 let object_target = self.object_disk_path(&entry.checksum);
409 if object_target.exists() {
410 if let Some(m) = self.meter.as_deref() {
412 m.add_skipped(1);
413 }
414 continue;
415 }
416 let rel = strip_leading_dot_slash(&entry.path);
417 let object_source = source.join(rel);
418 jobs.push((object_source, object_target, entry.checksum.clone()));
419 }
420
421 if let Some(m) = self.meter.as_deref() {
424 let total: u64 = jobs
425 .iter()
426 .map(|(src, _, _)| fs::metadata(src).map_or(0, |md| md.len()))
427 .sum();
428 m.set_total(total);
429 }
430
431 self.parallel_copy(&jobs)?;
436
437 write_manifest(manifest, &manifest_target, &id, &hasher)?;
440 Ok(())
441 }
442}
443
444impl StreamStore for FileStore {
445 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
446 Ok(self.object_disk_path(checksum).exists())
447 }
448
449 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
450 let path = self.object_disk_path(checksum);
451 let bytes = match fs::read(&path) {
452 Ok(bytes) => bytes,
453 Err(err) if err.kind() == io::ErrorKind::NotFound => {
454 return Err(StoreError::ObjectNotFound {
455 checksum: checksum.to_owned(),
456 });
457 }
458 Err(err) => return Err(StoreError::Io(err)),
459 };
460
461 let actual = Blake3Hasher::new().hash_hex(&bytes);
465 if actual != checksum {
466 return Err(StoreError::Integrity {
467 address: path.display().to_string(),
468 expected: checksum.to_owned(),
469 actual,
470 });
471 }
472 Ok(bytes)
473 }
474
475 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
476 let actual = Blake3Hasher::new().hash_hex(&bytes);
479 if actual != checksum {
480 return Err(StoreError::Integrity {
481 address: object_path(checksum),
482 expected: checksum.to_owned(),
483 actual,
484 });
485 }
486
487 let target = self.object_disk_path(checksum);
488 if let Some(parent) = target.parent() {
489 fs::create_dir_all(parent)?;
490 }
491 let tmp = temp_sibling(&target);
494 fs::write(&tmp, &bytes)?;
495 fs::rename(&tmp, &target)?;
496 Ok(())
497 }
498
499 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
500 write_manifest(
501 manifest,
502 &self.manifest_disk_path(id),
503 id,
504 &Blake3Hasher::new(),
505 )
506 }
507}
508
509fn persist(
513 source: &Path,
514 target: &Path,
515 expected: &str,
516 hasher: &impl Hasher,
517) -> Result<(), StoreError> {
518 if let Some(parent) = target.parent() {
519 fs::create_dir_all(parent)?;
520 }
521
522 let mut attempts_left = MAX_PERSIST_RETRIES;
523 loop {
524 let tmp = temp_sibling(target);
527 copy_file(source, &tmp)?;
528
529 let actual = hash_file(&tmp, hasher)?;
530 if actual == expected {
531 fs::rename(&tmp, target)?;
533 return Ok(());
534 }
535
536 let _ = fs::remove_file(&tmp);
540 let source_actual = hash_file(source, hasher)?;
541 if source_actual != expected {
542 return Err(StoreError::Integrity {
543 address: source.display().to_string(),
544 expected: expected.to_owned(),
545 actual: source_actual,
546 });
547 }
548
549 attempts_left = attempts_left.saturating_sub(1);
550 if attempts_left == 0 {
551 return Err(StoreError::Integrity {
552 address: target.display().to_string(),
553 expected: expected.to_owned(),
554 actual,
555 });
556 }
557 }
558}
559
560fn write_manifest(
565 manifest: &Manifest,
566 target: &Path,
567 id: &str,
568 hasher: &impl Hasher,
569) -> Result<(), StoreError> {
570 if let Some(parent) = target.parent() {
571 fs::create_dir_all(parent)?;
572 }
573
574 let actual = snapdir_core::merkle::snapshot_id(manifest, hasher);
577 if actual != id {
578 return Err(StoreError::Integrity {
579 address: target.display().to_string(),
580 expected: id.to_owned(),
581 actual,
582 });
583 }
584
585 let mut text = manifest.to_string();
588 text.push('\n');
589
590 let tmp = temp_sibling(target);
591 fs::write(&tmp, text.as_bytes())?;
592 fs::rename(&tmp, target)?;
593 Ok(())
594}
595
596fn copy_file(source: &Path, target: &Path) -> Result<(), StoreError> {
600 fs::copy(source, target)?;
601 Ok(())
602}
603
/// Returns a fresh temporary path in the same directory as `target`.
///
/// The name is `<file name>.<process id>.<counter>.tmp`, where the counter is
/// a process-wide value that increases on every call. A target without a file
/// name contributes an empty prefix; a target without a parent yields a bare
/// relative file name.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let file_name = match target.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };
    let pid = std::process::id();
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    let tmp_name = format!("{file_name}.{pid}.{n}.tmp");

    if let Some(dir) = target.parent() {
        dir.join(tmp_name)
    } else {
        PathBuf::from(tmp_name)
    }
}
622
/// Removes one leading `./` and then one trailing `/` from `path`, if present.
fn strip_leading_dot_slash(path: &str) -> &str {
    let mut relative = path;
    if let Some(rest) = relative.strip_prefix("./") {
        relative = rest;
    }
    if let Some(rest) = relative.strip_suffix('/') {
        relative = rest;
    }
    relative
}
629
/// Turns a store locator into a directory path.
///
/// A locator starting with `file:` has that prefix and all following slashes
/// removed, then a leading `localhost` (and one slash after it) dropped, and
/// is re-rooted with a single leading `/`. Any other locator is used as given.
/// In both cases one trailing `/` is removed unless the result is a single
/// character.
fn parse_store_dir(store: &str) -> PathBuf {
    let mut resolved = match store.strip_prefix("file:") {
        None => store.to_owned(),
        Some(after_scheme) => {
            let rest = after_scheme.trim_start_matches('/');
            let rest = match rest.strip_prefix("localhost") {
                Some(after_host) => after_host.strip_prefix('/').unwrap_or(after_host),
                None => rest,
            };
            format!("/{rest}")
        }
    };

    // Keep a lone "/" intact; otherwise drop exactly one trailing slash.
    if resolved.len() > 1 && resolved.ends_with('/') {
        resolved.pop();
    }
    PathBuf::from(resolved)
}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669 use snapdir_core::manifest::ManifestEntry;
670 use std::fs;
671 use std::path::Path;
672
/// Uniquely named scratch directory that is deleted when the value is dropped.
struct TempDir {
    /// Location of the directory on disk.
    path: PathBuf,
}

impl TempDir {
    /// Creates a new directory under the system temp dir whose name combines
    /// the process id, `tag`, and a per-process counter.
    ///
    /// Panics when the directory cannot be created.
    fn new(tag: &str) -> Self {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);

        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir_name = format!(
            "snapdir-filestore-test-{}-{tag}-{n}",
            std::process::id()
        );
        let path = std::env::temp_dir().join(dir_name);
        fs::create_dir_all(&path).expect("create temp dir");
        Self { path }
    }

    /// Path of the scratch directory.
    fn path(&self) -> &Path {
        self.path.as_path()
    }
}

impl Drop for TempDir {
    /// Removes the directory tree, ignoring any failure.
    fn drop(&mut self) {
        fs::remove_dir_all(&self.path).ok();
    }
}
702
703 fn make_foo_bar_source(source: &Path) -> (Manifest, String) {
708 let hasher = Blake3Hasher::new();
709 fs::write(source.join("foo"), b"foo\n").unwrap();
710 fs::write(source.join("bar"), b"bar\n").unwrap();
711 let foo_sum = hasher.hash_hex(b"foo\n");
712 let bar_sum = hasher.hash_hex(b"bar\n");
713
714 let root_sum =
715 snapdir_core::merkle::directory_checksum([foo_sum.as_str(), bar_sum.as_str()], &hasher);
716
717 let mut manifest = Manifest::new();
718 manifest.push(ManifestEntry::new(
719 PathType::Directory,
720 "700",
721 root_sum,
722 8,
723 "./",
724 ));
725 manifest.push(ManifestEntry::new(
726 PathType::File,
727 "600",
728 bar_sum,
729 4,
730 "./bar",
731 ));
732 manifest.push(ManifestEntry::new(
733 PathType::File,
734 "600",
735 foo_sum,
736 4,
737 "./foo",
738 ));
739 let manifest = Manifest::from_entries(manifest.entries().to_vec());
740 let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
741 (manifest, id)
742 }
743
/// `parse_store_dir` maps each supported locator form to the expected directory.
#[test]
fn file_store_parse_store_dir_matches_oracle_sed() {
    let cases = [
        ("file:///tmp/store", "/tmp/store"),
        ("file:///tmp/store/", "/tmp/store"),
        ("file://localhost/tmp/store", "/tmp/store"),
        ("file://tmp/store", "/tmp/store"),
        ("/tmp/store", "/tmp/store"),
        ("file:///", "/"),
    ];
    for &(locator, expected) in &cases {
        assert_eq!(parse_store_dir(locator), PathBuf::from(expected));
    }
}
770
/// After a push, every file object sits at its `object_path` with matching
/// content, and the manifest exists and reads back unchanged.
#[test]
fn file_store_push_lands_objects_at_sharded_keys_and_manifest_last() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let (manifest, id) = make_foo_bar_source(src_dir.path());

    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("push ok");

    let file_entries = manifest
        .entries()
        .iter()
        .filter(|entry| entry.path_type == PathType::File);
    for entry in file_entries {
        let obj = store_dir.path().join(object_path(&entry.checksum));
        assert!(obj.exists(), "expected object at {}", obj.display());
        let bytes = fs::read(&obj).unwrap();
        assert_eq!(
            Blake3Hasher::new().hash_hex(&bytes),
            entry.checksum,
            "object content must hash to its address"
        );
    }

    let man_path = store_dir.path().join(manifest_path(&id));
    assert!(man_path.exists(), "manifest must exist after push");
    let read_back = store.get_manifest(&id).expect("manifest reads back");
    assert_eq!(read_back, manifest);
}
801
/// When the manifest already exists, a second push does nothing, so an object
/// removed in between stays removed.
#[test]
fn file_store_push_skips_when_manifest_present() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let (manifest, _id) = make_foo_bar_source(src_dir.path());
    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("first push");

    // Delete one object while leaving the manifest in place.
    let foo_entry = manifest
        .entries()
        .iter()
        .find(|entry| entry.path == "./foo")
        .unwrap();
    let obj = store_dir.path().join(object_path(&foo_entry.checksum));
    fs::remove_file(&obj).unwrap();

    store
        .push(&manifest, src_dir.path())
        .expect("second push skips");
    assert!(
        !obj.exists(),
        "manifest-present push must be a full no-op (object stays removed)"
    );
}
829
/// With the manifest and one object deleted, a re-push restores both.
#[test]
fn file_store_push_skips_present_objects_but_adds_missing() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let (manifest, id) = make_foo_bar_source(src_dir.path());
    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("first push");

    // Remove the manifest so the next push does not short-circuit.
    let man_path = store_dir.path().join(manifest_path(&id));
    fs::remove_file(&man_path).unwrap();

    // Remove the `foo` object; `bar` stays in the store.
    let foo_entry = manifest
        .entries()
        .iter()
        .find(|entry| entry.path == "./foo")
        .unwrap();
    let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
    fs::remove_file(&foo_obj).unwrap();

    store.push(&manifest, src_dir.path()).expect("re-push");
    assert!(foo_obj.exists(), "missing object must be re-added");
    assert!(man_path.exists(), "manifest must be re-written");
}
854
/// Push followed by fetch reproduces the source file contents in `dest`.
#[test]
fn file_store_fetch_round_trips_and_verifies() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let dest_dir = TempDir::new("dest");
    let (manifest, id) = make_foo_bar_source(src_dir.path());
    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("push");

    let fetched = store.get_manifest(&id).expect("get manifest");
    store
        .fetch_files(&fetched, dest_dir.path())
        .expect("fetch files");

    let dest = dest_dir.path();
    assert_eq!(fs::read(dest.join("foo")).unwrap(), b"foo\n");
    assert_eq!(fs::read(dest.join("bar")).unwrap(), b"bar\n");
}
872
/// Asking an empty store for a manifest yields `ManifestNotFound` carrying the id.
#[test]
fn file_store_get_manifest_missing_is_not_found() {
    let store_dir = TempDir::new("store");
    let store = FileStore::from_root(store_dir.path());
    let missing = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";

    let outcome = store.get_manifest(missing);
    match outcome {
        Err(StoreError::ManifestNotFound { id }) => assert_eq!(id, missing),
        other => panic!("expected ManifestNotFound, got {other:?}"),
    }
}
883
/// Overwriting a stored manifest with different content makes `get_manifest`
/// fail with an `Integrity` error that expects the original id.
#[test]
fn file_store_get_manifest_tampered_fails_integrity() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let (manifest, id) = make_foo_bar_source(src_dir.path());
    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("push");

    // Replace the manifest file with text that no longer matches `id`.
    let man_path = store_dir.path().join(manifest_path(&id));
    fs::write(&man_path, b"D 700 deadbeef 0 ./\n").unwrap();

    let outcome = store.get_manifest(&id);
    match outcome {
        Err(StoreError::Integrity { expected, .. }) => assert_eq!(expected, id),
        other => panic!("expected Integrity, got {other:?}"),
    }
}
901
/// Fetching a manifest whose file object was never stored reports
/// `ObjectNotFound` with that file's checksum.
#[test]
fn file_store_fetch_missing_object_is_not_found() {
    let store_dir = TempDir::new("store");
    let dest_dir = TempDir::new("dest");
    let foo_sum = Blake3Hasher::new().hash_hex(b"foo\n");

    let mut manifest = Manifest::new();
    manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 4, "./"));
    manifest.push(ManifestEntry::new(
        PathType::File,
        "600",
        foo_sum.clone(),
        4,
        "./foo",
    ));

    let store = FileStore::from_root(store_dir.path());
    let outcome = store.fetch_files(&manifest, dest_dir.path());
    match outcome {
        Err(StoreError::ObjectNotFound { checksum }) => assert_eq!(checksum, foo_sum),
        other => panic!("expected ObjectNotFound, got {other:?}"),
    }
}
925
/// A corrupted object in the store makes fetch fail with `Integrity` and
/// leaves no `foo` file in the destination.
#[test]
fn file_store_persist_rejects_corrupt_source() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let dest_dir = TempDir::new("dest");
    let hasher = Blake3Hasher::new();

    let (manifest, id) = make_foo_bar_source(src_dir.path());
    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("push");

    // Overwrite the stored `foo` object with bytes that hash differently.
    let foo_entry = manifest
        .entries()
        .iter()
        .find(|entry| entry.path == "./foo")
        .unwrap();
    let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
    fs::write(&foo_obj, b"corrupted not foo\n").unwrap();
    assert_ne!(hasher.hash_hex(b"corrupted not foo\n"), foo_entry.checksum);

    let fetched = store.get_manifest(&id).expect("manifest still valid");
    let outcome = store.fetch_files(&fetched, dest_dir.path());
    match outcome {
        Err(StoreError::Integrity { expected, .. }) => {
            assert_eq!(expected, foo_entry.checksum);
        }
        other => panic!("expected Integrity from corrupt object, got {other:?}"),
    }
    assert!(!dest_dir.path().join("foo").exists());
}
962
/// A second fetch into an already populated destination succeeds even after
/// the whole `.objects` tree has been deleted from the store.
#[test]
fn fetch_skip_present_verified() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let dest_dir = TempDir::new("dest");
    let (manifest, id) = make_foo_bar_source(src_dir.path());

    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("push");

    let fetched = store.get_manifest(&id).expect("get manifest");
    let dest = dest_dir.path();
    store
        .fetch_files(&fetched, dest)
        .expect("first fetch populates dest");
    assert_eq!(fs::read(dest.join("foo")).unwrap(), b"foo\n");
    assert_eq!(fs::read(dest.join("bar")).unwrap(), b"bar\n");

    // With no objects left, the fetch can only pass by skipping every file.
    let objects = store_dir.path().join(".objects");
    fs::remove_dir_all(&objects).expect("remove .objects tree");
    assert!(!objects.exists());

    store
        .fetch_files(&fetched, dest)
        .expect("second fetch skips every present+verified file (no object reads)");

    assert_eq!(fs::read(dest.join("foo")).unwrap(), b"foo\n");
    assert_eq!(fs::read(dest.join("bar")).unwrap(), b"bar\n");
}
999
/// A re-fetch rewrites a corrupted destination file from the store and leaves
/// an intact one alone, even when the intact file's object is gone.
#[test]
fn file_store_fetch_repairs_corrupt_dest_and_skips_intact() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let dest_dir = TempDir::new("dest");
    let (manifest, id) = make_foo_bar_source(src_dir.path());

    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("push");
    let fetched = store.get_manifest(&id).expect("get manifest");
    let dest = dest_dir.path();
    store
        .fetch_files(&fetched, dest)
        .expect("first fetch populates dest");

    // Corrupt `foo` in the destination.
    fs::write(dest.join("foo"), b"WRONG\n").unwrap();

    // Remove the `bar` object: the fetch can only succeed if `bar` is skipped.
    let bar_entry = manifest
        .entries()
        .iter()
        .find(|entry| entry.path == "./bar")
        .unwrap();
    let bar_obj = store_dir.path().join(object_path(&bar_entry.checksum));
    fs::remove_file(&bar_obj).unwrap();

    store
        .fetch_files(&fetched, dest)
        .expect("repair corrupt foo, skip intact bar");

    assert_eq!(fs::read(dest.join("foo")).unwrap(), b"foo\n");
    assert_eq!(fs::read(dest.join("bar")).unwrap(), b"bar\n");
}
1037
/// When a destination file is corrupt and its object is missing from the
/// store, fetch reports `ObjectNotFound` for that checksum.
#[test]
fn file_store_fetch_mismatch_then_missing_object_errors() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let dest_dir = TempDir::new("dest");
    let (manifest, id) = make_foo_bar_source(src_dir.path());

    let store = FileStore::from_root(store_dir.path());
    store.push(&manifest, src_dir.path()).expect("push");
    let fetched = store.get_manifest(&id).expect("get manifest");
    store
        .fetch_files(&fetched, dest_dir.path())
        .expect("first fetch populates dest");

    let foo_entry = manifest
        .entries()
        .iter()
        .find(|entry| entry.path == "./foo")
        .unwrap();

    // Corrupt the destination copy, then delete the object that could repair it.
    fs::write(dest_dir.path().join("foo"), b"WRONG\n").unwrap();
    let foo_obj = store_dir.path().join(object_path(&foo_entry.checksum));
    fs::remove_file(&foo_obj).unwrap();

    let outcome = store.fetch_files(&fetched, dest_dir.path());
    match outcome {
        Err(StoreError::ObjectNotFound { checksum }) => {
            assert_eq!(checksum, foo_entry.checksum);
        }
        other => panic!("expected ObjectNotFound (cannot repair), got {other:?}"),
    }
}
1073
1074 fn make_nested_source(source: &Path) -> (Manifest, String) {
1087 let hasher = Blake3Hasher::new();
1088 let files: &[(&str, &[u8])] = &[
1089 ("a.txt", b"a contents\n"),
1090 ("b.txt", b"b contents\n"),
1091 ("sub/c.txt", b"c contents\n"),
1092 ("sub/deep/d.txt", b"d contents\n"),
1093 ];
1094
1095 fs::create_dir_all(source.join("sub/deep")).unwrap();
1096 for (rel, bytes) in files {
1097 fs::write(source.join(rel), bytes).unwrap();
1098 }
1099
1100 let mut manifest = Manifest::new();
1101 manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
1106 manifest.push(ManifestEntry::new(
1107 PathType::Directory,
1108 "700",
1109 "x",
1110 0,
1111 "./sub/",
1112 ));
1113 manifest.push(ManifestEntry::new(
1114 PathType::Directory,
1115 "700",
1116 "x",
1117 0,
1118 "./sub/deep/",
1119 ));
1120 for (rel, bytes) in files {
1121 let sum = hasher.hash_hex(bytes);
1122 #[allow(clippy::cast_possible_truncation)]
1123 manifest.push(ManifestEntry::new(
1124 PathType::File,
1125 "600",
1126 sum,
1127 bytes.len() as u64,
1128 format!("./{rel}"),
1129 ));
1130 }
1131
1132 let manifest = Manifest::from_entries(manifest.entries().to_vec());
1133 let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
1134 (manifest, id)
1135 }
1136
/// Asserts that `dest` holds the four files of the nested fixture with their
/// expected contents. Panics when a file is unreadable or differs.
fn assert_nested_dest(dest: &Path) {
    let expected: [(&str, &[u8]); 4] = [
        ("a.txt", b"a contents\n"),
        ("b.txt", b"b contents\n"),
        ("sub/c.txt", b"c contents\n"),
        ("sub/deep/d.txt", b"d contents\n"),
    ];
    for &(rel, contents) in &expected {
        assert_eq!(fs::read(dest.join(rel)).unwrap(), contents);
    }
}
1147
/// Push and fetch with concurrency 4 and with concurrency 1 produce the same
/// snapshot id, the same destination tree, and byte-identical objects.
#[test]
fn filestore_parallel_roundtrip_byte_identical() {
    let src_dir = TempDir::new("src");
    let (manifest, id) = make_nested_source(src_dir.path());
    let source = src_dir.path();

    // Concurrency 4.
    let par_store_dir = TempDir::new("store-par");
    let par_dest_dir = TempDir::new("dest-par");
    let par_store =
        FileStore::from_root_with_config(par_store_dir.path(), TransferConfig::new(4, None));
    par_store.push(&manifest, source).expect("par push");
    let par_manifest = par_store.get_manifest(&id).expect("par get manifest");
    assert_eq!(par_manifest, manifest, "round-tripped manifest matches");
    par_store
        .fetch_files(&par_manifest, par_dest_dir.path())
        .expect("par fetch");
    assert_nested_dest(par_dest_dir.path());

    // Concurrency 1.
    let seq_store_dir = TempDir::new("store-seq");
    let seq_dest_dir = TempDir::new("dest-seq");
    let seq_store =
        FileStore::from_root_with_config(seq_store_dir.path(), TransferConfig::new(1, None));
    seq_store.push(&manifest, source).expect("seq push");
    let seq_id = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
    assert_eq!(seq_id, id, "snapshot id is concurrency-independent");
    seq_store
        .fetch_files(&manifest, seq_dest_dir.path())
        .expect("seq fetch");
    assert_nested_dest(seq_dest_dir.path());

    // Both stores must hold the same bytes under the same keys.
    let file_entries = manifest
        .entries()
        .iter()
        .filter(|entry| entry.path_type == PathType::File);
    for entry in file_entries {
        let key = object_path(&entry.checksum);
        let par_obj = par_store_dir.path().join(&key);
        let seq_obj = seq_store_dir.path().join(&key);
        assert!(par_obj.exists(), "par object {key} present");
        assert!(seq_obj.exists(), "seq object {key} present");
        assert_eq!(
            fs::read(&par_obj).unwrap(),
            fs::read(&seq_obj).unwrap(),
            "par and seq object bytes identical"
        );
    }
}
1200
/// Push and fetch with the adaptive policy on give the same snapshot id,
/// manifest, object bytes and destination tree as with the default policy.
#[test]
fn filestore_adaptive_push_fetch_same_snapshot_id_and_bytes() {
    use crate::transfer::AdaptivePolicy;

    let src_dir = TempDir::new("src");
    let (manifest, id) = make_nested_source(src_dir.path());
    let source = src_dir.path();

    // Baseline: configuration as built by `TransferConfig::new`.
    let off_store_dir = TempDir::new("store-off");
    let off_dest_dir = TempDir::new("dest-off");
    let off =
        FileStore::from_root_with_config(off_store_dir.path(), TransferConfig::new(4, None));
    off.push(&manifest, source).expect("off push");
    let off_manifest = off.get_manifest(&id).expect("off manifest");
    off.fetch_files(&off_manifest, off_dest_dir.path())
        .expect("off fetch");

    // Same transfer with the adaptive controller switched on.
    let on_store_dir = TempDir::new("store-on");
    let on_dest_dir = TempDir::new("dest-on");
    let on_cfg = TransferConfig::new(4, None).with_adaptive(AdaptivePolicy::On {
        fraction: 0.8,
        ceiling: 2,
    });
    let on = FileStore::from_root_with_config(on_store_dir.path(), on_cfg);
    on.push(&manifest, source).expect("adaptive push");
    let on_manifest = on.get_manifest(&id).expect("adaptive manifest");
    on.fetch_files(&on_manifest, on_dest_dir.path())
        .expect("adaptive fetch");

    let hasher = Blake3Hasher::new();
    let off_id = snapdir_core::merkle::snapshot_id(&off_manifest, &hasher);
    let on_id = snapdir_core::merkle::snapshot_id(&on_manifest, &hasher);
    assert_eq!(off_id, on_id, "adaptive must re-id to the same snapshot");
    assert_eq!(on_id, id, "and it matches the original id");
    assert_eq!(off_manifest, on_manifest, "round-tripped manifests equal");

    let file_entries = manifest
        .entries()
        .iter()
        .filter(|entry| entry.path_type == PathType::File);
    for entry in file_entries {
        let key = object_path(&entry.checksum);
        let off_obj = off_store_dir.path().join(&key);
        let on_obj = on_store_dir.path().join(&key);
        assert!(on_obj.exists(), "adaptive object {key} present");
        assert_eq!(
            fs::read(&off_obj).unwrap(),
            fs::read(&on_obj).unwrap(),
            "Off vs On object bytes identical for {key}"
        );
    }
    assert_nested_dest(off_dest_dir.path());
    assert_nested_dest(on_dest_dir.path());
}
1260
/// A store configured with concurrency 1 still round-trips the nested fixture.
#[test]
fn filestore_parallel_concurrency_one_sequential() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let dest_dir = TempDir::new("dest");
    let (manifest, id) = make_nested_source(src_dir.path());

    let single_worker = TransferConfig::new(1, None);
    let store = FileStore::from_root_with_config(store_dir.path(), single_worker);
    store.push(&manifest, src_dir.path()).expect("push");

    let fetched = store.get_manifest(&id).expect("get manifest");
    store.fetch_files(&fetched, dest_dir.path()).expect("fetch");
    assert_nested_dest(dest_dir.path());
}
1277
/// When one source file no longer matches its manifest checksum, push fails
/// with `Integrity` and no manifest is written.
#[test]
fn filestore_parallel_all_or_nothing_bad_object() {
    let store_dir = TempDir::new("store");
    let src_dir = TempDir::new("src");
    let (manifest, id) = make_nested_source(src_dir.path());

    // Change a file after the manifest was computed.
    fs::write(src_dir.path().join("sub/c.txt"), b"TAMPERED\n").unwrap();

    let store =
        FileStore::from_root_with_config(store_dir.path(), TransferConfig::new(4, None));
    let outcome = store.push(&manifest, src_dir.path());
    match outcome {
        Err(StoreError::Integrity { .. }) => {}
        other => panic!("expected Integrity from bad source object, got {other:?}"),
    }

    let man_path = store.manifest_disk_path(&id);
    assert!(
        !man_path.exists(),
        "manifest must not be written when an object copy fails"
    );
}
1305
#[test]
fn filestore_parallel_large_n_round_trips() {
    // Round-trips a flat directory of 50 small generated files through a store
    // built with `TransferConfig::new(4, None)` and checks every file's bytes.
    let backing = TempDir::new("store");
    let source = TempDir::new("src");
    let restored = TempDir::new("dest");
    let hasher = Blake3Hasher::new();

    let file_count = 50usize;
    // Name and contents are pure functions of the index, so the verification
    // loop below can regenerate exactly what was written.
    let name_of = |i: usize| format!("file-{i:03}.txt");
    let body_of = |i: usize| format!("contents of file {i}\n");

    let mut draft = Manifest::new();
    draft.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
    for i in 0..file_count {
        let name = name_of(i);
        let body = body_of(i);
        fs::write(source.path().join(&name), body.as_bytes()).unwrap();
        let digest = hasher.hash_hex(body.as_bytes());
        #[allow(clippy::cast_possible_truncation)]
        draft.push(ManifestEntry::new(
            PathType::File,
            "600",
            digest,
            body.len() as u64,
            format!("./{name}"),
        ));
    }
    // Rebuild through `from_entries` before computing the snapshot id.
    let manifest = Manifest::from_entries(draft.entries().to_vec());
    let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

    let config = TransferConfig::new(4, None);
    let store = FileStore::from_root_with_config(backing.path(), config);
    store.push(&manifest, source.path()).expect("push N files");
    let round_tripped = store.get_manifest(&id).expect("get manifest");
    store
        .fetch_files(&round_tripped, restored.path())
        .expect("fetch N files");

    for i in 0..file_count {
        let restored_bytes = fs::read(restored.path().join(name_of(i))).unwrap();
        let expected = body_of(i);
        assert_eq!(restored_bytes, expected.as_bytes());
    }
}
1351
#[test]
fn meter_records_filestore_push_fetch() {
    // Checks the counters an attached `Meter` reports after a push and again
    // after a fetch of the same snapshot.
    let source = TempDir::new("src");
    let (manifest, id) = make_nested_source(source.path());

    // Expected figures come from the manifest's file entries only.
    let file_sizes: Vec<u64> = manifest
        .entries()
        .iter()
        .filter(|entry| entry.path_type == PathType::File)
        .map(|entry| entry.size)
        .collect();
    let n = file_sizes.len() as u64;
    let total_bytes: u64 = file_sizes.iter().copied().sum();

    let backing = TempDir::new("store");
    let restored = TempDir::new("dest");
    let meter = Arc::new(Meter::new());
    let store = FileStore::from_root(backing.path()).with_meter(Some(Arc::clone(&meter)));

    store.push(&manifest, source.path()).expect("push");
    let pushed = meter.snapshot();
    assert_eq!(pushed.bytes_in, total_bytes, "push read every object");
    assert_eq!(pushed.bytes_out, total_bytes, "push wrote every object");
    assert_eq!(pushed.objects_done, n, "push finished N objects");
    assert_eq!(pushed.objects_skipped, 0, "fresh store skips nothing");
    // NOTE(review): `objects_total` is compared against the byte total here,
    // which matches the assertion message — confirm against `Meter`'s fields.
    assert_eq!(pushed.objects_total, total_bytes, "push set byte total");
    assert_eq!(pushed.in_flight, 0, "nothing left in flight");

    let round_tripped = store.get_manifest(&id).expect("get manifest");
    store
        .fetch_files(&round_tripped, restored.path())
        .expect("fetch_files");

    // The same meter keeps accumulating, so the totals double after the fetch.
    let fetched = meter.snapshot();
    let doubled_bytes = 2 * total_bytes;
    assert_eq!(
        fetched.bytes_in, doubled_bytes,
        "fetch read every object again"
    );
    assert_eq!(
        fetched.bytes_out, doubled_bytes,
        "fetch wrote every object again"
    );
    assert_eq!(fetched.objects_done, 2 * n, "push + fetch = 2N objects");
    assert_eq!(fetched.in_flight, 0, "nothing left in flight");

    assert_nested_dest(restored.path());
}
1409
#[test]
fn meter_records_none_is_identical() {
    // Runs the same push + fetch through a metered and an unmetered store and
    // checks that the meter changes nothing observable: the round-tripped
    // manifests, the ids they hash to, the stored object bytes, and the
    // restored trees must all agree.
    let src_dir = TempDir::new("src");
    let (manifest, id) = make_nested_source(src_dir.path());

    // Metered run.
    let metered_store_dir = TempDir::new("store-metered");
    let metered_dest_dir = TempDir::new("dest-metered");
    let meter = Arc::new(Meter::new());
    let metered =
        FileStore::from_root(metered_store_dir.path()).with_meter(Some(Arc::clone(&meter)));
    metered
        .push(&manifest, src_dir.path())
        .expect("metered push");
    let metered_manifest = metered.get_manifest(&id).expect("metered manifest");
    // Re-id the manifest that came back from the store. Hashing the input
    // `manifest` on both sides would make the id comparison below vacuous.
    let metered_id = snapdir_core::merkle::snapshot_id(&metered_manifest, &Blake3Hasher::new());
    metered
        .fetch_files(&metered_manifest, metered_dest_dir.path())
        .expect("metered fetch");

    // Unmetered run against a separate store and destination.
    let plain_store_dir = TempDir::new("store-plain");
    let plain_dest_dir = TempDir::new("dest-plain");
    let plain = FileStore::from_root(plain_store_dir.path());
    plain.push(&manifest, src_dir.path()).expect("plain push");
    let plain_manifest = plain.get_manifest(&id).expect("plain manifest");
    let plain_id = snapdir_core::merkle::snapshot_id(&plain_manifest, &Blake3Hasher::new());
    plain
        .fetch_files(&plain_manifest, plain_dest_dir.path())
        .expect("plain fetch");

    assert_eq!(metered_id, plain_id, "snapshot id unaffected by the meter");
    assert_eq!(metered_id, id);
    assert_eq!(
        metered_manifest, plain_manifest,
        "round-tripped manifests equal regardless of the meter"
    );

    // Every file object must exist in both stores with identical bytes.
    for entry in manifest.entries() {
        if entry.path_type != PathType::File {
            continue;
        }
        let key = object_path(&entry.checksum);
        let metered_obj = metered_store_dir.path().join(&key);
        let plain_obj = plain_store_dir.path().join(&key);
        assert!(metered_obj.exists(), "metered object {key} present");
        assert!(plain_obj.exists(), "plain object {key} present");
        assert_eq!(
            fs::read(&metered_obj).unwrap(),
            fs::read(&plain_obj).unwrap(),
            "metered and unmetered object bytes identical"
        );
    }

    assert_nested_dest(metered_dest_dir.path());
    assert_nested_dest(plain_dest_dir.path());
}
1469
#[test]
fn file_store_strip_leading_dot_slash() {
    // (input, expected) pairs for `strip_leading_dot_slash`.
    let cases = [
        ("./foo", "foo"),
        ("./a/b/c", "a/b/c"),
        ("./a/", "a"),
        ("./", ""),
        ("/abs/path", "/abs/path"),
    ];
    for (input, expected) in cases {
        assert_eq!(strip_leading_dot_slash(input), expected);
    }
}
1478
#[test]
fn stream_store_filestore_object_roundtrip() {
    // put_object / has_object / get_object round trip for a single payload,
    // addressed by its own BLAKE3 hex digest.
    let backing = TempDir::new("stream-roundtrip");
    let store = FileStore::from_root(backing.path());

    let payload = b"hello stream store\n".to_vec();
    let address = Blake3Hasher::new().hash_hex(&payload);

    // Absent before the put.
    let present_before = store.has_object(&address).unwrap();
    assert!(!present_before);

    store.put_object(&address, payload.clone()).expect("put ok");

    // Present afterwards, and read back unchanged.
    let present_after = store.has_object(&address).unwrap();
    assert!(present_after);
    let read_back = store.get_object(&address).unwrap();
    assert_eq!(read_back, payload);

    // The object lives at the `object_path` location under the store root.
    let on_disk = backing.path().join(object_path(&address));
    assert!(on_disk.exists());
}
1504
#[test]
fn stream_store_get_object_rejects_corruption() {
    let backing = TempDir::new("stream-corrupt");
    let store = FileStore::from_root(backing.path());

    // Plant bytes at the address of `genuine` that do not hash to that address.
    let genuine = b"the real object bytes\n".to_vec();
    let address = Blake3Hasher::new().hash_hex(&genuine);
    let planted_at = backing.path().join(object_path(&address));
    fs::create_dir_all(planted_at.parent().unwrap()).unwrap();
    fs::write(
        &planted_at,
        b"TAMPERED bytes that do not hash to the address\n",
    )
    .unwrap();

    // Reading must fail with an integrity error that names both digests.
    let outcome = store.get_object(&address);
    if let Err(StoreError::Integrity {
        expected, actual, ..
    }) = &outcome
    {
        assert_eq!(*expected, address);
        assert_ne!(*actual, address, "actual must differ from the address");
    } else {
        panic!("expected Integrity, got {outcome:?}");
    }
}
1528
#[test]
fn stream_store_put_object_rejects_wrong_checksum() {
    let backing = TempDir::new("stream-wrong-checksum");
    let store = FileStore::from_root(backing.path());

    let payload = b"some payload\n".to_vec();
    // A 64-character hex string that is not the payload's digest.
    let wrong = "dead".repeat(16);
    let real_digest = Blake3Hasher::new().hash_hex(&payload);
    assert_ne!(wrong, real_digest);

    // The put must be refused, reporting the address that was asked for.
    let outcome = store.put_object(&wrong, payload);
    if let Err(StoreError::Integrity { expected, .. }) = &outcome {
        assert_eq!(*expected, wrong);
    } else {
        panic!("expected Integrity, got {outcome:?}");
    }

    // Nothing may have been stored under the bogus address.
    let present = store.has_object(&wrong).unwrap();
    assert!(!present);
    let on_disk = backing.path().join(object_path(&wrong));
    assert!(!on_disk.exists());
}
1548
#[test]
fn stream_store_put_manifest_roundtrips() {
    // A manifest stored with `put_manifest` must come back from `get_manifest`
    // with the same entries and must hash to the same snapshot id.
    let backing = TempDir::new("stream-manifest");
    let source = TempDir::new("stream-manifest-src");
    let store = FileStore::from_root(backing.path());

    let (manifest, id) = make_foo_bar_source(source.path());

    store.put_manifest(&id, &manifest).expect("put_manifest ok");
    let restored = store.get_manifest(&id).expect("get_manifest ok");

    assert_eq!(restored.entries(), manifest.entries());

    let restored_id = snapdir_core::merkle::snapshot_id(&restored, &Blake3Hasher::new());
    assert_eq!(restored_id, id);
}
1568}