use std::cell::OnceCell;
use std::collections::hash_map::IntoIter;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use file_declutter::FileDeclutter;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use walkdir::WalkDir;

#[derive(Debug, Error)]
pub enum Error {
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

type Result<R> = std::result::Result<R, Error>;

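/// A cloneable, serialisable wrapper around [`OnceCell`]: it round-trips
/// through serde as a plain `Option<T>` while still allowing interior,
/// write-once initialisation (used below to lazily cache chunk lists).
///
/// Illustrative sketch of the intended round-trip (hypothetical values;
/// not a doctest, since the type is private):
///
/// ```ignore
/// let lazy: LazyOption<u32> = LazyOption::from(Some(42));
/// let json = serde_json::to_string(&lazy)?; // serialised as `Option<u32>`
/// let back: LazyOption<u32> = serde_json::from_str(&json)?;
/// assert_eq!(back.get(), Some(&42)); // `OnceCell` API via `Deref`
/// ```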
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(from = "Option<T>")]
#[serde(into = "Option<T>")]
struct LazyOption<T>(OnceCell<T>)
where
    T: Clone;

impl<T> From<Option<T>> for LazyOption<T>
where
    T: Clone,
{
    fn from(value: Option<T>) -> Self {
        Self(if let Some(value) = value {
            OnceCell::from(value)
        } else {
            OnceCell::new()
        })
    }
}

impl<T> From<LazyOption<T>> for Option<T>
where
    T: Clone,
{
    fn from(value: LazyOption<T>) -> Self {
        value.0.get().cloned()
    }
}

impl<T> Deref for LazyOption<T>
where
    T: Clone,
{
    type Target = OnceCell<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// The digest algorithm used to fingerprint file chunks.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub enum HashingAlgorithm {
    MD5,
    SHA1,
    SHA256,
    SHA512,
}

impl HashingAlgorithm {
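    /// Instantiates a boxed [`sha2::digest::DynDigest`] hasher for this
    /// algorithm. Illustrative sketch of how the rest of this module uses
    /// it (not a doctest; the method is private):
    ///
    /// ```ignore
    /// let mut hasher = HashingAlgorithm::SHA256.select_hasher();
    /// hasher.update(b"hello rust");
    /// let hex = base16ct::lower::encode_string(&hasher.finalize());
    /// ```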
    fn select_hasher(&self) -> Box<dyn sha2::digest::DynDigest> {
        match self {
            Self::MD5 => Box::new(md5::Md5::default()),
            Self::SHA1 => Box::new(sha1::Sha1::default()),
            Self::SHA256 => Box::new(sha2::Sha256::default()),
            Self::SHA512 => Box::new(sha2::Sha512::default()),
        }
    }
}

/// A file relative to some base directory, identified by path, size and
/// mtime, with its chunk list computed lazily and cached.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FileWithChunks {
    #[serde(skip)]
    base: PathBuf,
    /// Path relative to the base directory.
    pub path: String,
    /// File size in bytes.
    pub size: u64,
    /// Last modification time.
    pub mtime: SystemTime,
    chunks: LazyOption<Vec<FileChunk>>,
    hashing_algorithm: HashingAlgorithm,
}

impl PartialEq for FileWithChunks {
    // Equality deliberately ignores the chunk list: two entries describe
    // the same file if path, size and mtime all match.
    fn eq(&self, other: &Self) -> bool {
        self.path == other.path && self.size == other.size && self.mtime == other.mtime
    }
}

impl Eq for FileWithChunks {}

impl FileWithChunks {
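    /// Stats `path`, strips the `source_path` prefix, and records size and
    /// mtime; chunks are not computed yet. Illustrative usage (assumes a
    /// file `tree/a.txt` exists; not a doctest):
    ///
    /// ```ignore
    /// let fwc = FileWithChunks::try_new("tree", "tree/a.txt", HashingAlgorithm::SHA256)?;
    /// assert_eq!(fwc.path, "a.txt");
    /// ```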
    pub fn try_new(
        source_path: impl Into<PathBuf>,
        path: impl Into<PathBuf>,
        hashing_algorithm: HashingAlgorithm,
    ) -> Result<Self> {
        let base = source_path.into();

        let path = path.into();
        let metadata = path.metadata()?;

        // `path` is expected to live under `base`.
        let path = path
            .strip_prefix(&base)
            .unwrap()
            .to_string_lossy()
            .to_string();
        let size = metadata.len();
        let mtime = metadata.modified()?;

        Ok(Self {
            base,
            path,
            size,
            mtime,
            chunks: Default::default(),
            hashing_algorithm,
        })
    }

    /// Returns the chunk list if it has already been computed (or loaded
    /// from a cache file); never triggers hashing.
    pub fn get_chunks(&self) -> Option<&Vec<FileChunk>> {
        self.chunks.get()
    }

    /// Returns the chunk list, computing and caching it on first access.
    pub fn get_or_calculate_chunks(&self) -> Result<&Vec<FileChunk>> {
        if self.chunks.get().is_none() {
            let chunks = self.calculate_chunks()?;

            // `set` only fails if the cell is already filled, which the
            // check above rules out.
            self.chunks.set(chunks.clone()).unwrap();
        }

        Ok(self.chunks.get().unwrap())
    }

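    /// Splits the file into fixed 1 MiB chunks and hashes each chunk in
    /// parallel via rayon. An empty file still yields a single zero-length
    /// chunk (the digest of empty input), so every file has at least one
    /// chunk entry.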
    fn calculate_chunks(&self) -> Result<Vec<FileChunk>> {
        let path = self.base.join(&self.path);

        let size = path.metadata()?.len();

        let hashing_algorithm = self.hashing_algorithm;

        let chunk_size = 1024 * 1024;

        if size == 0 {
            // Hash the empty input so that even empty files get a chunk entry.
            let hasher = hashing_algorithm.select_hasher();
            let hash = hasher.finalize();
            let hash = base16ct::lower::encode_string(&hash);

            Ok(vec![FileChunk::new(0, 0, hash)])
        } else {
            // Number of chunks, rounding up.
            (0..(size + chunk_size - 1) / chunk_size)
                .into_par_iter()
                .map(|start| {
                    let mut input = BufReader::new(File::open(&path)?);
                    input.seek(SeekFrom::Start(start * chunk_size))?;

                    // NB: `flatten()` silently skips read errors.
                    let chunk = input
                        .bytes()
                        .take(chunk_size as usize)
                        .flatten()
                        .collect::<Vec<_>>();

                    let mut hasher = hashing_algorithm.select_hasher();
                    hasher.update(&chunk);
                    let hash = hasher.finalize();
                    let hash = base16ct::lower::encode_string(&hash);

                    Ok::<FileChunk, Error>(FileChunk::new(
                        start * chunk_size,
                        chunk.len() as u64,
                        hash,
                    ))
                })
                .collect()
        }
    }
}

/// A contiguous chunk of a file: byte offset, length and content hash.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FileChunk {
    pub start: u64,
    pub size: u64,
    pub hash: String,
    /// Relative path of the owning file; only populated in memory by
    /// [`DedupCache::get_chunks`], never serialised.
    #[serde(skip)]
    pub path: Option<String>,
}

impl FileChunk {
    pub fn new(start: u64, size: u64, hash: String) -> Self {
        Self {
            start,
            size,
            hash,
            path: None,
        }
    }
}

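/// In-memory cache mapping each relative file path to its
/// [`FileWithChunks`] entry; persisted as (optionally zstd-compressed) JSON.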
pub struct DedupCache(HashMap<String, FileWithChunks>);

impl DedupCache {
    fn new() -> Self {
        Self(HashMap::new())
    }

    fn from_hashmap(hash_map: HashMap<String, FileWithChunks>) -> Self {
        Self(hash_map)
    }

    /// Merges entries from a cache file into this cache. A missing or
    /// unreadable file yields no entries; a present but corrupt file panics.
    fn read_from_file(&mut self, path: impl AsRef<Path>) {
        let reader = File::open(&path).map(BufReader::new);

        // Caches with a `.zst` extension are zstd-compressed JSON.
        let cache_from_file: Vec<FileWithChunks> =
            if path.as_ref().extension() == Some("zst".as_ref()) {
                reader
                    .and_then(zstd::Decoder::with_buffer)
                    .map(|reader| serde_json::from_reader(reader))
                    .map(|result| result.unwrap())
                    .unwrap_or_default()
            } else {
                reader
                    .map(|reader| serde_json::from_reader(reader))
                    .map(|result| result.unwrap())
                    .unwrap_or_default()
            };

        for x in cache_from_file {
            self.insert(x.path.clone(), x);
        }
    }

    /// Serialises all entries to `path`, zstd-compressing when the
    /// extension is `.zst`; parent directories are created as needed.
    fn write_to_file(&self, path: impl AsRef<Path>) {
        std::fs::create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let writer = File::create(&path).map(BufWriter::new);

        if path.as_ref().extension() == Some("zst".as_ref()) {
            writer
                .and_then(|writer| zstd::Encoder::new(writer, 0))
                .map(|encoder| encoder.auto_finish())
                .map(|writer| serde_json::to_writer(writer, &self.values().collect::<Vec<_>>()))
                .unwrap()
                .unwrap();
        } else {
            writer
                .map(|writer| serde_json::to_writer(writer, &self.values().collect::<Vec<_>>()))
                .unwrap()
                .unwrap();
        }
    }

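    /// Flattens the cache into `(hash, chunk, dirty)` tuples, computing
    /// chunks on demand. `dirty` is `true` only for the first chunk of a
    /// file whose chunks had to be freshly computed, so callers can count
    /// changed files. Illustrative sketch (not a doctest):
    ///
    /// ```ignore
    /// let changed_files = cache
    ///     .get_chunks()?
    ///     .filter(|(_, _, dirty)| *dirty)
    ///     .count();
    /// ```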
    pub fn get_chunks(&self) -> Result<impl Iterator<Item = (String, FileChunk, bool)> + '_> {
        Ok(self.values().flat_map(|fwc| {
            // Dirty if the chunks were not already cached for this file.
            let mut dirty = fwc.get_chunks().is_none();

            fwc.get_or_calculate_chunks()
                .unwrap()
                .iter()
                .map(move |chunk| {
                    let result = (
                        chunk.hash.clone(),
                        FileChunk {
                            path: Some(fwc.path.clone()),
                            ..chunk.clone()
                        },
                        dirty,
                    );

                    // Only the first chunk of a file carries the dirty flag.
                    dirty = false;

                    result
                })
        }))
    }

    pub fn get(&self, path: &str) -> Option<&FileWithChunks> {
        self.0.get(path)
    }

    pub fn get_mut(&mut self, path: &str) -> Option<&mut FileWithChunks> {
        self.0.get_mut(path)
    }

    fn insert(&mut self, path: String, fwc: FileWithChunks) {
        self.0.insert(path, fwc);
    }

    pub fn contains_key(&self, path: &str) -> bool {
        self.0.contains_key(path)
    }

    pub fn into_iter(self) -> IntoIter<String, FileWithChunks> {
        self.0.into_iter()
    }

    pub fn values(&self) -> impl Iterator<Item = &FileWithChunks> {
        self.0.values()
    }

    pub fn len(&self) -> usize {
        self.0.len()
    }
}

/// Walks a source tree, reuses a previously written cache where files are
/// unchanged, and writes deduplicated chunk data plus an updated cache.
pub struct Deduper {
    source_path: PathBuf,
    cache_path: PathBuf,
    pub cache: DedupCache,
}

impl Deduper {
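    /// Builds a deduper over `source_path`. All `cache_paths` are merged;
    /// they are read back-to-front, so entries from paths earlier in the
    /// list take precedence, and the first path is also where
    /// [`Self::write_cache`] persists. Illustrative usage (assumes the
    /// paths exist; not a doctest):
    ///
    /// ```ignore
    /// let deduper = Deduper::new("tree", vec!["cache.json"], HashingAlgorithm::SHA256, true);
    /// ```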
    pub fn new(
        source_path: impl Into<PathBuf>,
        cache_paths: Vec<impl Into<PathBuf>>,
        hashing_algorithm: HashingAlgorithm,
        same_file_system: bool,
    ) -> Self {
        let source_path = source_path.into();

        let mut cache = DedupCache::new();

        // Read the cache files back-to-front so that entries from paths
        // earlier in the list win; keep the first path as the write target.
        let cache_path = {
            let mut cache_path = PathBuf::new();
            for cache_path_from_iter in cache_paths.into_iter().rev() {
                cache_path = cache_path_from_iter.into();
                cache.read_from_file(&cache_path);
            }
            cache_path
        };

        // Drop cache entries whose files no longer exist on disk.
        cache = DedupCache::from_hashmap(
            cache
                .into_iter()
                .filter(|(path, _)| source_path.join(path).exists())
                .collect(),
        );

        let dir_walker = WalkDir::new(&source_path)
            .min_depth(1)
            .same_file_system(same_file_system);

        for entry in dir_walker {
            let entry = entry.unwrap().into_path();

            if !entry.is_file() {
                continue;
            }

            let fwc = FileWithChunks::try_new(&source_path, &entry, hashing_algorithm).unwrap();

            // Unchanged files keep their cached chunks; only the base
            // directory needs refreshing.
            if let Some(fwc_cache) = cache.get_mut(&fwc.path) {
                if fwc == *fwc_cache {
                    fwc_cache.base = source_path.clone();
                    continue;
                }
            }

            cache.insert(fwc.path.clone(), fwc);
        }

        Self {
            source_path,
            cache_path,
            cache,
        }
    }

    /// Atomically persists the cache: writes to a timestamped temporary
    /// file next to the target, then renames it into place.
    pub fn write_cache(&self) {
        let temp_path = self.cache_path.clone().with_extension(format!(
            "tmp.{}.{}",
            SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis(),
            self.cache_path
                .extension()
                .unwrap_or("ext".as_ref())
                .to_str()
                .unwrap()
        ));
        self.cache.write_to_file(&temp_path);
        std::fs::rename(temp_path, &self.cache_path).unwrap();
    }

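    /// Writes every chunk's bytes under `<target_path>/data`, named by hash
    /// and optionally fanned out into `declutter_levels` of nested
    /// directories; chunks that already exist on disk are skipped, which is
    /// what deduplicates the output. Illustrative usage (not a doctest):
    ///
    /// ```ignore
    /// deduper.write_chunks("backup", 3)?;
    /// ```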
    pub fn write_chunks(
        &mut self,
        target_path: impl Into<PathBuf>,
        declutter_levels: usize,
    ) -> Result<()> {
        let target_path = target_path.into();
        let data_dir = target_path.join("data");
        std::fs::create_dir_all(&data_dir)?;

        for (_, chunk, _) in self.cache.get_chunks()? {
            let mut chunk_file = PathBuf::from(&chunk.hash);
            if declutter_levels > 0 {
                chunk_file = FileDeclutter::oneshot(chunk_file, declutter_levels);
            }
            chunk_file = data_dir.join(chunk_file);

            // A chunk that is already present was written for an earlier
            // (or identical) file: this is where deduplication happens.
            if !chunk_file.exists() {
                std::fs::create_dir_all(chunk_file.parent().unwrap())?;
                let mut out = File::create(chunk_file)?;

                // Seek directly to the chunk instead of iterating over and
                // discarding all preceding bytes.
                let mut input = BufReader::new(File::open(
                    self.source_path.join(chunk.path.as_ref().unwrap()),
                )?);
                input.seek(SeekFrom::Start(chunk.start))?;
                let data_in = input
                    .bytes()
                    .take(chunk.size as usize)
                    .flatten()
                    .collect::<Vec<_>>();
                out.write_all(&data_in)?;
            }
        }

        Ok(())
    }
}

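/// Reads one or more cache files and restores, verifies, or garbage-collects
/// the deduplicated chunk store written by [`Deduper::write_chunks`].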
pub struct Hydrator {
    source_path: PathBuf,
    pub cache: DedupCache,
}

impl Hydrator {
    /// Builds a hydrator over `source_path` (the directory containing
    /// `data/`), merging all given cache files back-to-front.
    pub fn new(source_path: impl Into<PathBuf>, cache_paths: Vec<impl Into<PathBuf>>) -> Self {
        let source_path = source_path.into();

        let mut cache = DedupCache::new();

        for cache_path in cache_paths.into_iter().rev() {
            let cache_path = cache_path.into();
            cache.read_from_file(&cache_path);
        }

        Self { source_path, cache }
    }

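    /// Reassembles every cached file under `target_path` by concatenating
    /// its chunks from the `data/` directory and restoring its mtime.
    /// `declutter_levels` must match the value used when the chunks were
    /// written. Illustrative usage (not a doctest):
    ///
    /// ```ignore
    /// Hydrator::new("backup", vec!["cache.json"]).restore_files("restored", 3);
    /// ```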
    pub fn restore_files(&self, target_path: impl Into<PathBuf>, declutter_levels: usize) {
        let data_dir = self.source_path.join("data");
        let target_path = target_path.into();
        std::fs::create_dir_all(&target_path).unwrap();

        for fwc in self.cache.values() {
            let target = target_path.join(&fwc.path);
            std::fs::create_dir_all(target.parent().unwrap()).unwrap();
            let target_file = File::create(&target).unwrap();
            let mut target = BufWriter::new(&target_file);

            for chunk in fwc.get_chunks().unwrap() {
                let mut chunk_file = PathBuf::from(&chunk.hash);
                if declutter_levels > 0 {
                    chunk_file = FileDeclutter::oneshot(chunk_file, declutter_levels);
                }
                chunk_file = data_dir.join(chunk_file);

                let mut source = File::open(chunk_file).unwrap();
                std::io::copy(&mut source, &mut target).unwrap();
            }

            target.flush().unwrap();
            target_file.set_modified(fwc.mtime).unwrap();
        }
    }

    /// Verifies that every chunk referenced by the cache exists in `data/`
    /// with the expected size, reporting problems on stderr. Note that the
    /// declutter depth is currently hard-coded to 3 here.
    pub fn check_cache(&self) -> bool {
        let mut success = true;

        let path_data = self.source_path.join("data");
        for (hash, meta) in self
            .cache
            .get_chunks()
            .unwrap()
            .map(|(hash, meta, ..)| (PathBuf::from(hash), meta))
        {
            let path = path_data.join(FileDeclutter::oneshot(hash, 3));

            if !path.exists() {
                eprintln!("Does not exist: {}", path.display());
                success = false;
                continue;
            }

            if path.metadata().unwrap().len() != meta.size {
                eprintln!(
                    "Does not have expected size of {}: {}",
                    meta.size,
                    path.display()
                );
                success = false;
                continue;
            }
        }

        success
    }

    /// Lists files under `data/` that no cached chunk references, i.e.
    /// candidates for garbage collection.
    pub fn list_extra_files(&self, declutter_levels: usize) -> impl Iterator<Item = PathBuf> {
        let files_in_cache = FileDeclutter::new_from_iter(
            self.cache
                .get_chunks()
                .unwrap()
                .map(|(hash, ..)| PathBuf::from(hash)),
        )
        .base(&self.source_path.join("data"))
        .levels(declutter_levels)
        .map(|(_, path)| path)
        .collect::<HashSet<_>>();

        WalkDir::new(&self.source_path.join("data"))
            .min_depth(1)
            .same_file_system(false)
            .into_iter()
            .filter(move |entry| {
                entry
                    .as_ref()
                    .map(|entry| {
                        entry.file_type().is_file() && !files_in_cache.contains(entry.path())
                    })
                    .unwrap_or_default()
            })
            .flatten()
            .map(|entry| entry.into_path())
    }

    /// Deletes everything returned by [`Self::list_extra_files`].
    pub fn delete_extra_files(&self, declutter_levels: usize) -> anyhow::Result<()> {
        for path in self.list_extra_files(declutter_levels) {
            std::fs::remove_file(&path)?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use assert_fs::fixture::ChildPath;
    use assert_fs::prelude::*;
    use assert_fs::{NamedTempFile, TempDir};

    use super::*;

    /// Creates a temp tree with one file, dedupes it into `deduped/` and
    /// writes the cache; returns the paths for the individual tests.
    fn setup() -> anyhow::Result<(TempDir, ChildPath, ChildPath, ChildPath)> {
        let temp = TempDir::new()?;

        let origin = temp.child("origin");
        origin.create_dir_all()?;
        origin.child("README.md").write_str("Hello, world!")?;

        let deduped = temp.child("deduped");
        deduped.create_dir_all()?;

        let cache = temp.child("cache.json");

        {
            let mut deduper = Deduper::new(
                origin.to_path_buf(),
                vec![cache.to_path_buf()],
                HashingAlgorithm::MD5,
                true,
            );
            deduper.write_chunks(deduped.to_path_buf(), 3)?;
            deduper.write_cache();
        }

        Ok((temp, origin, deduped, cache))
    }

    #[test]
    fn compare_filechunk_objects() -> anyhow::Result<()> {
        let temp = TempDir::new()?;

        let file_1 = temp.child("file_1");
        std::fs::write(&file_1, "content_1")?;

        let file_2 = temp.child("file_2");
        std::fs::write(&file_2, "content_2")?;

        let fwc_1 = FileWithChunks::try_new(temp.path(), file_1.path(), HashingAlgorithm::MD5)?;
        let fwc_1_same =
            FileWithChunks::try_new(temp.path(), file_1.path(), HashingAlgorithm::MD5)?;
        let fwc_2 = FileWithChunks::try_new(temp.path(), file_2.path(), HashingAlgorithm::MD5)?;

        assert_eq!(fwc_1, fwc_1);
        assert_eq!(fwc_1, fwc_1_same);
        assert_ne!(fwc_1, fwc_2);

        // Touching the mtime must make the file count as changed.
        File::open(&file_1)?.set_modified(SystemTime::now())?;

        let fwc_1_new =
            FileWithChunks::try_new(temp.path(), file_1.path(), HashingAlgorithm::MD5)?;

        assert_ne!(fwc_1, fwc_1_new);

        Ok(())
    }

    #[test]
    fn check_all_hashing_algorithms() -> anyhow::Result<()> {
        // Expected digests of the string "hello rust" for each algorithm.
        let algorithms = &[
            (HashingAlgorithm::MD5, "0fb073cd346f46f60c15e719f3820482"),
            (
                HashingAlgorithm::SHA1,
                "5503f5edc1bba66a7733c5ec38f4e9d449021be9",
            ),
            (
                HashingAlgorithm::SHA256,
                "e8c73ac958a87f17906b092bd99f37038788ee23b271574aad6d5bf1c76cc61c",
            ),
            (
                HashingAlgorithm::SHA512,
                "e6eda213df25f96ca380dd07640df530574e380c1b93d5d863fec05d5908a4880a3075fef4a438cfb1023cc51affb4624002f54b4790fe8362c7de032eb39aaa",
            ),
        ];

        let temp = TempDir::new()?;
        let file = temp.child("file");
        std::fs::write(&file, "hello rust")?;

        for (algorithm, expected_hash) in algorithms.iter().copied() {
            let cache_file = NamedTempFile::new("cache.json")?;

            let chunks = Deduper::new(temp.path(), vec![cache_file.path()], algorithm, true)
                .cache
                .get_chunks()?
                .collect::<Vec<_>>();

            assert_eq!(chunks.len(), 1, "Expected exactly one chunk");

            let (hash, _, _) = &chunks[0];
            assert_eq!(
                hash, &expected_hash,
                "Algorithm {:?} does not produce expected hash",
                algorithm
            );
        }

        Ok(())
    }

    #[test]
    fn check_cache() -> anyhow::Result<()> {
        let (_temp, _origin, deduped, cache) = setup()?;

        assert!(
            Hydrator::new(deduped.to_path_buf(), vec![cache.to_path_buf()]).check_cache(),
            "Cache checking failed when it shouldn't"
        );

        // Remove one declutter subdirectory so a referenced chunk goes missing.
        std::fs::remove_dir_all(deduped.child("data").read_dir()?.next().unwrap()?.path())?;

        assert!(
            !Hydrator::new(deduped.to_path_buf(), vec![cache.to_path_buf()]).check_cache(),
            "Cache checking didn't fail when it should"
        );

        Ok(())
    }

    #[test]
    fn check_list_extra() -> anyhow::Result<()> {
        let (_temp, _origin, deduped, cache) = setup()?;

        assert_eq!(
            Hydrator::new(deduped.to_path_buf(), vec![cache.to_path_buf()])
                .list_extra_files(3)
                .count(),
            0,
            "Extra files present when there shouldn't be"
        );

        deduped
            .child("data")
            .child("extra_file")
            .write_str("Hello, world!")?;

        assert_eq!(
            Hydrator::new(deduped.to_path_buf(), vec![cache.to_path_buf()])
                .list_extra_files(3)
                .count(),
            1,
            "Number of extra files present is not 1"
        );

        deduped
            .child("data")
            .child("e")
            .child("x")
            .child("t")
            .child("extra_file")
            .write_str("Hello, world!")?;

        assert_eq!(
            Hydrator::new(deduped.to_path_buf(), vec![cache.to_path_buf()])
                .list_extra_files(3)
                .count(),
            2,
            "Number of extra files present is not 2"
        );

        Ok(())
    }
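
    // Illustrative round-trip sketch: it only exercises APIs defined above
    // (`setup` and `Hydrator::restore_files`); the `restored` child path is
    // an arbitrary name chosen for this test.
    #[test]
    fn restore_roundtrip() -> anyhow::Result<()> {
        let (temp, _origin, deduped, cache) = setup()?;

        let restored = temp.child("restored");
        Hydrator::new(deduped.to_path_buf(), vec![cache.to_path_buf()])
            .restore_files(restored.to_path_buf(), 3);

        assert_eq!(
            std::fs::read_to_string(restored.child("README.md").path())?,
            "Hello, world!",
            "Restored file does not match the original"
        );

        Ok(())
    }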
}