ddup_bak/
repository.rs

1use crate::{
2    archive::{
3        Archive, CompressionFormat, CompressionFormatCallback, ProgressCallback, entries::Entry,
4    },
5    chunks::{ChunkIndex, lock::LockMode, reader::EntryReader, storage},
6};
7use std::{
8    fs::{File, FileTimes},
9    io::{Cursor, Read, Write},
10    path::{Path, PathBuf},
11    sync::{Arc, Mutex, RwLock},
12};
13
14pub type DeletionProgressCallback = Option<Arc<dyn Fn(u64, bool) + Send + Sync + 'static>>;
15
16pub struct Repository {
17    pub directory: PathBuf,
18    pub save_on_drop: bool,
19
20    pub chunk_index: ChunkIndex,
21}
22
23impl Repository {
24    /// Opens an existing repository.
25    /// The repository must be initialized with `new` before use.
26    /// The repository directory must contain a `.ddup-bak` directory.
27    pub fn open(
28        directory: &Path,
29        chunks_directory: Option<&Path>,
30        storage: Option<Arc<dyn storage::ChunkStorage>>,
31    ) -> std::io::Result<Self> {
32        let chunk_index = ChunkIndex::open(
33            chunks_directory.map_or(directory.join(".ddup-bak/chunks"), |p| p.to_path_buf()),
34            storage.map_or(
35                Arc::new(storage::ChunkStorageLocal(
36                    directory.join(".ddup-bak/chunks"),
37                )),
38                |s| s,
39            ),
40        )?;
41
42        Ok(Self {
43            directory: directory.to_path_buf(),
44            save_on_drop: true,
45            chunk_index,
46        })
47    }
48
49    pub fn new(
50        directory: &Path,
51        chunk_size: usize,
52        max_chunk_count: usize,
53        storage: Option<Arc<dyn storage::ChunkStorage>>,
54    ) -> Self {
55        std::fs::create_dir_all(directory.join(".ddup-bak/archives")).unwrap();
56        std::fs::create_dir_all(directory.join(".ddup-bak/archives-restored")).unwrap();
57        std::fs::create_dir_all(directory.join(".ddup-bak/chunks")).unwrap();
58
59        let chunk_index = ChunkIndex::new(
60            directory.join(".ddup-bak/chunks"),
61            chunk_size,
62            max_chunk_count,
63            storage.map_or(
64                Arc::new(storage::ChunkStorageLocal(
65                    directory.join(".ddup-bak/chunks"),
66                )),
67                |s| s,
68            ),
69        );
70
71        Self {
72            directory: directory.to_path_buf(),
73            save_on_drop: true,
74            chunk_index,
75        }
76    }
77
78    pub fn save(&self) -> std::io::Result<()> {
79        self.chunk_index.save()?;
80
81        Ok(())
82    }
83
84    #[inline]
85    pub fn archive_path(&self, name: &str) -> PathBuf {
86        self.directory
87            .join(".ddup-bak/archives")
88            .join(format!("{name}.ddup"))
89    }
90
91    /// Sets the save_on_drop flag.
92    /// If set to true, the repository will save all changes to disk when dropped.
93    /// If set to false, the repository will not save changes when dropped.
94    /// This is useful for testing purposes, where you may want to discard changes.
95    /// By default, this flag is set to true and should NOT be changed.
96    #[inline]
97    pub const fn set_save_on_drop(&mut self, save_on_drop: bool) -> &mut Self {
98        self.save_on_drop = save_on_drop;
99
100        self
101    }
102
103    /// Lists all archives in the repository.
104    /// Returns a vector of archive names without the ".ddup" extension.
105    /// Example: "my_archive" instead of "my_archive.ddup".
106    /// The archives are stored in the ".ddup-bak/archives" directory.
107    pub fn list_archives(&self) -> std::io::Result<Vec<String>> {
108        let mut archives = Vec::new();
109        let archive_dir = self.directory.join(".ddup-bak/archives");
110
111        for entry in std::fs::read_dir(archive_dir)?.flatten() {
112            if let Some(name) = entry.file_name().to_str() {
113                if let Some(stripped) = name.strip_suffix(".ddup") {
114                    archives.push(stripped.to_string());
115                }
116            }
117        }
118
119        Ok(archives)
120    }
121
122    /// Gets an archive by name.
123    /// Do not use this method to extract data, the data is chunked and compressed.
124    /// Use `restore_archive` instead.
125    pub fn get_archive(&self, name: &str) -> std::io::Result<Archive> {
126        let archive_path = self.archive_path(name);
127
128        Archive::open(archive_path.to_str().unwrap())
129    }
130
131    pub fn clean(&self, progress: DeletionProgressCallback) -> std::io::Result<()> {
132        let mut w = self.chunk_index.lock.write_lock(LockMode::Destructive)?;
133        self.chunk_index.clean(progress)?;
134
135        w.unlock()?;
136
137        Ok(())
138    }
139
140    pub fn entry_reader(&self, entry: Entry) -> std::io::Result<EntryReader> {
141        match entry {
142            Entry::File(file_entry) => Ok(EntryReader::new(file_entry, self.chunk_index.clone())),
143            _ => Err(std::io::Error::new(
144                std::io::ErrorKind::InvalidData,
145                "Entry is not a file",
146            )),
147        }
148    }
149
150    #[inline]
151    pub fn archive_path_parent<'a>(
152        archive: &'a mut Archive,
153        entry: &Path,
154    ) -> Option<&'a mut Box<crate::archive::entries::DirectoryEntry>> {
155        archive
156            .find_archive_entry_mut(entry.parent()?)
157            .map(|e| match e {
158                Entry::Directory(dir) => dir,
159                _ => panic!("Parent entry is not a directory"),
160            })
161    }
162
163    #[allow(clippy::too_many_arguments)]
164    fn recursive_create_archive(
165        archive: Arc<Mutex<Option<Archive>>>,
166        chunk_index: &ChunkIndex,
167        entry: ignore::DirEntry,
168        metadata: std::fs::Metadata,
169        root_path: &Path,
170        progress_chunking: ProgressCallback,
171        compression_callback: CompressionFormatCallback,
172        scope: &rayon::Scope,
173        error: Arc<RwLock<Option<std::io::Error>>>,
174    ) -> std::io::Result<()> {
175        let path = entry.path().strip_prefix(root_path).map_err(|_| {
176            std::io::Error::new(
177                std::io::ErrorKind::InvalidInput,
178                "Path is not a subpath of the root directory",
179            )
180        })?;
181
182        if error.read().unwrap().is_some() {
183            return Ok(());
184        }
185
186        if let Some(f) = &progress_chunking {
187            f(entry.path())
188        }
189
190        if metadata.is_file() {
191            let compression = compression_callback
192                .as_ref()
193                .map(|f| f(path, &metadata))
194                .unwrap_or(CompressionFormat::Deflate);
195
196            let chunks =
197                chunk_index.chunk_file(&entry.path().to_path_buf(), compression, Some(scope))?;
198
199            let mut chunk_content = Vec::new();
200            for id in chunks {
201                chunk_content.extend_from_slice(&crate::varint::encode_u64(id));
202            }
203
204            let mut archive = archive.lock().unwrap();
205            let file_entry = archive.as_mut().unwrap().write_file_entry(
206                Cursor::new(chunk_content),
207                Some(metadata.len()),
208                path.file_name().unwrap().to_string_lossy().into_owned(),
209                metadata.permissions().into(),
210                metadata.modified().unwrap_or(std::time::SystemTime::now()),
211                {
212                    #[cfg(unix)]
213                    {
214                        use std::os::unix::fs::MetadataExt;
215                        (metadata.uid(), metadata.gid())
216                    }
217                    #[cfg(windows)]
218                    {
219                        (0, 0)
220                    }
221                },
222                compression,
223            )?;
224
225            if let Some(parent) = Self::archive_path_parent(archive.as_mut().unwrap(), path) {
226                parent.entries.push(Entry::File(file_entry));
227            } else {
228                archive
229                    .as_mut()
230                    .unwrap()
231                    .entries
232                    .push(Entry::File(file_entry));
233            }
234        } else if metadata.is_symlink() {
235            if let Ok(target) = std::fs::read_link(entry.path()) {
236                let mut archive = archive.lock().unwrap();
237
238                let link_entry = Entry::Symlink(Box::new(crate::archive::entries::SymlinkEntry {
239                    name: path.file_name().unwrap().to_string_lossy().into_owned(),
240                    mode: metadata.permissions().into(),
241                    mtime: metadata.modified().unwrap_or(std::time::SystemTime::now()),
242                    owner: {
243                        #[cfg(unix)]
244                        {
245                            use std::os::unix::fs::MetadataExt;
246                            (metadata.uid(), metadata.gid())
247                        }
248                        #[cfg(windows)]
249                        {
250                            (0, 0)
251                        }
252                    },
253                    target: target.to_string_lossy().into_owned(),
254                    target_dir: target.is_dir(),
255                }));
256
257                if let Some(parent) = Self::archive_path_parent(archive.as_mut().unwrap(), path) {
258                    parent.entries.push(link_entry);
259                } else {
260                    archive.as_mut().unwrap().entries.push(link_entry);
261                }
262            }
263        }
264
265        Ok(())
266    }
267
268    #[allow(clippy::too_many_arguments)]
269    pub fn create_archive(
270        &self,
271        name: &str,
272        directory: Option<ignore::Walk>,
273        directory_root: Option<&Path>,
274        progress_chunking: ProgressCallback,
275        compression_callback: CompressionFormatCallback,
276        threads: usize,
277    ) -> std::io::Result<Archive> {
278        if self.list_archives()?.contains(&name.to_string()) {
279            return Err(std::io::Error::new(
280                std::io::ErrorKind::AlreadyExists,
281                format!("Archive {name} already exists"),
282            ));
283        }
284
285        let mut w = self.chunk_index.lock.write_lock(LockMode::NonDestructive)?;
286
287        let archive_path = self.archive_path(name);
288
289        let worker_pool = Arc::new(
290            rayon::ThreadPoolBuilder::new()
291                .num_threads(threads)
292                .build()
293                .unwrap(),
294        );
295        let error = Arc::new(RwLock::new(None));
296
297        let walker = directory.unwrap_or_else(|| {
298            ignore::WalkBuilder::new(&self.directory)
299                .follow_links(false)
300                .git_global(false)
301                .build()
302        });
303
304        let archive = Arc::new(Mutex::new(Some(Archive::new(File::create(&archive_path)?))));
305
306        worker_pool.in_place_scope(|scope| {
307            for entry in walker.flatten() {
308                let path = entry.path();
309                let metadata = match path.symlink_metadata() {
310                    Ok(metadata) => metadata,
311                    Err(err) => {
312                        let mut error = error.write().unwrap();
313                        if error.is_none() {
314                            *error = Some(err);
315                        }
316                        break;
317                    }
318                };
319                if path.file_name() == Some(".ddup-bak".as_ref()) {
320                    continue;
321                }
322
323                if error.read().unwrap().is_some() {
324                    break;
325                }
326
327                if metadata.is_dir() {
328                    if path.file_name().is_none() {
329                        continue;
330                    }
331
332                    let mut archive = archive.lock().unwrap();
333
334                    let dir_entry =
335                        Entry::Directory(Box::new(crate::archive::entries::DirectoryEntry {
336                            name: path.file_name().unwrap().to_string_lossy().into_owned(),
337                            mode: metadata.permissions().into(),
338                            mtime: metadata.modified().unwrap_or(std::time::SystemTime::now()),
339                            owner: {
340                                #[cfg(unix)]
341                                {
342                                    use std::os::unix::fs::MetadataExt;
343                                    (metadata.uid(), metadata.gid())
344                                }
345                                #[cfg(windows)]
346                                {
347                                    (0, 0)
348                                }
349                            },
350                            entries: Vec::new(),
351                        }));
352
353                    if let Some(parent) = Self::archive_path_parent(
354                        archive.as_mut().unwrap(),
355                        path.strip_prefix(directory_root.unwrap_or(&self.directory))
356                            .unwrap(),
357                    ) {
358                        parent.entries.push(dir_entry);
359                    } else {
360                        archive.as_mut().unwrap().entries.push(dir_entry);
361                    }
362                }
363
364                scope.spawn({
365                    let error = Arc::clone(&error);
366                    let archive = Arc::clone(&archive);
367                    let chunk_index = self.chunk_index.clone();
368                    let directory_root = directory_root.unwrap_or(&self.directory);
369                    let progress_chunking = progress_chunking.clone();
370                    let compression_callback = compression_callback.clone();
371
372                    move |scope| {
373                        if let Err(err) = Self::recursive_create_archive(
374                            archive,
375                            &chunk_index,
376                            entry,
377                            metadata,
378                            directory_root,
379                            progress_chunking,
380                            compression_callback,
381                            scope,
382                            Arc::clone(&error),
383                        ) {
384                            let mut error = error.write().unwrap();
385                            if error.is_none() {
386                                *error = Some(err);
387                            }
388                        }
389                    }
390                });
391            }
392        });
393
394        if let Some(err) = error.write().unwrap().take() {
395            return Err(err);
396        }
397
398        let mut archive = archive.lock().unwrap().take().unwrap();
399        archive.write_end_header()?;
400
401        w.unlock()?;
402
403        Ok(archive)
404    }
405
406    pub fn read_entry_content<S: Write>(
407        &self,
408        entry: Entry,
409        stream: &mut S,
410    ) -> std::io::Result<()> {
411        match entry {
412            Entry::File(mut file_entry) => {
413                let mut buffer = [0; 4096];
414
415                loop {
416                    let chunk_id = crate::varint::decode_u64(&mut file_entry);
417                    if chunk_id == 0 {
418                        break;
419                    }
420
421                    let mut chunk = self.chunk_index.read_chunk_id_content(chunk_id)?;
422
423                    loop {
424                        let bytes_read = chunk.read(&mut buffer)?;
425                        if bytes_read == 0 {
426                            break;
427                        }
428
429                        stream.write_all(&buffer[..bytes_read])?;
430                    }
431                }
432
433                Ok(())
434            }
435            _ => Err(std::io::Error::new(
436                std::io::ErrorKind::InvalidData,
437                "Entry is not a file",
438            )),
439        }
440    }
441
442    fn recursive_restore_archive(
443        chunk_index: &ChunkIndex,
444        entry: Entry,
445        directory: &Path,
446        progress: ProgressCallback,
447        scope: &rayon::Scope,
448        error: Arc<RwLock<Option<std::io::Error>>>,
449    ) -> std::io::Result<()> {
450        let path = directory.join(entry.name());
451
452        if error.read().unwrap().is_some() {
453            return Ok(());
454        }
455
456        if let Some(f) = &progress {
457            f(&path)
458        }
459
460        match entry {
461            Entry::File(mut file_entry) => {
462                let mut file = File::create(&path)?;
463                let mut buffer = [0; 4096];
464
465                loop {
466                    let chunk_id = crate::varint::decode_u64(&mut file_entry);
467                    if chunk_id == 0 {
468                        break;
469                    }
470
471                    let mut chunk = chunk_index.read_chunk_id_content(chunk_id)?;
472
473                    loop {
474                        let bytes_read = chunk.read(&mut buffer)?;
475                        if bytes_read == 0 {
476                            break;
477                        }
478
479                        file.write_all(&buffer[..bytes_read])?;
480                    }
481                }
482
483                file.set_permissions(file_entry.mode.into())?;
484                file.set_times(FileTimes::new().set_modified(file_entry.mtime))?;
485
486                #[cfg(unix)]
487                {
488                    let (uid, gid) = file_entry.owner;
489
490                    std::os::unix::fs::lchown(&path, Some(uid), Some(gid))?;
491                }
492            }
493            Entry::Directory(dir_entry) => {
494                std::fs::create_dir_all(&path)?;
495
496                std::fs::set_permissions(&path, dir_entry.mode.into())?;
497
498                #[cfg(unix)]
499                {
500                    let (uid, gid) = dir_entry.owner;
501                    std::os::unix::fs::chown(&path, Some(uid), Some(gid))?;
502                }
503
504                for sub_entry in dir_entry.entries {
505                    scope.spawn({
506                        let error = Arc::clone(&error);
507                        let chunk_index = chunk_index.clone();
508                        let path = path.to_path_buf();
509                        let progress = progress.clone();
510
511                        move |scope| {
512                            if let Err(err) = Self::recursive_restore_archive(
513                                &chunk_index,
514                                sub_entry,
515                                &path,
516                                progress,
517                                scope,
518                                Arc::clone(&error),
519                            ) {
520                                let mut error = error.write().unwrap();
521                                if error.is_none() {
522                                    *error = Some(err);
523                                }
524                            }
525                        }
526                    });
527                }
528            }
529            #[cfg(unix)]
530            Entry::Symlink(link_entry) => {
531                std::os::unix::fs::symlink(link_entry.target, &path)?;
532                std::fs::set_permissions(&path, link_entry.mode.into())?;
533
534                let (uid, gid) = link_entry.owner;
535                std::os::unix::fs::lchown(&path, Some(uid), Some(gid))?;
536            }
537            #[cfg(windows)]
538            Entry::Symlink(link_entry) => {
539                if link_entry.target_dir {
540                    std::os::windows::fs::symlink_dir(link_entry.target, &path)?;
541                } else {
542                    std::os::windows::fs::symlink_file(link_entry.target, &path)?;
543                }
544
545                std::fs::set_permissions(&path, link_entry.mode.into())?;
546            }
547        }
548
549        Ok(())
550    }
551
552    pub fn restore_archive(
553        &self,
554        name: &str,
555        progress: ProgressCallback,
556        threads: usize,
557    ) -> std::io::Result<PathBuf> {
558        if !self.list_archives()?.contains(&name.to_string()) {
559            return Err(std::io::Error::new(
560                std::io::ErrorKind::NotFound,
561                format!("Archive {name} not found"),
562            ));
563        }
564
565        let mut r = self.chunk_index.lock.read_lock(LockMode::NonDestructive)?;
566
567        let archive_path = self.archive_path(name);
568        let archive = Archive::open(archive_path.to_str().unwrap())?;
569        let destination = self
570            .directory
571            .join(".ddup-bak/archives-restored")
572            .join(name);
573
574        std::fs::create_dir_all(&destination)?;
575
576        let worker_pool = Arc::new(
577            rayon::ThreadPoolBuilder::new()
578                .num_threads(threads)
579                .build()
580                .unwrap(),
581        );
582        let error = Arc::new(RwLock::new(None));
583
584        worker_pool.in_place_scope(|scope| {
585            for entry in archive.into_entries() {
586                scope.spawn({
587                    let error = Arc::clone(&error);
588                    let chunk_index = self.chunk_index.clone();
589                    let destination = destination.clone();
590                    let progress = progress.clone();
591
592                    move |scope| {
593                        if let Err(err) = Self::recursive_restore_archive(
594                            &chunk_index,
595                            entry,
596                            &destination,
597                            progress,
598                            scope,
599                            Arc::clone(&error),
600                        ) {
601                            let mut error = error.write().unwrap();
602                            if error.is_none() {
603                                *error = Some(err);
604                            }
605                        }
606                    }
607                });
608            }
609        });
610
611        if let Some(err) = error.write().unwrap().take() {
612            return Err(err);
613        }
614
615        r.unlock()?;
616
617        Ok(destination)
618    }
619
620    pub fn restore_entries(
621        &self,
622        name: &str,
623        entries: Vec<Entry>,
624        progress: ProgressCallback,
625        threads: usize,
626    ) -> std::io::Result<PathBuf> {
627        if !self.list_archives()?.contains(&name.to_string()) {
628            return Err(std::io::Error::new(
629                std::io::ErrorKind::NotFound,
630                format!("Archive {name} not found"),
631            ));
632        }
633
634        let mut r = self.chunk_index.lock.read_lock(LockMode::NonDestructive)?;
635
636        let destination = self
637            .directory
638            .join(".ddup-bak/archives-restored")
639            .join(name);
640
641        std::fs::create_dir_all(&destination)?;
642
643        let worker_pool = Arc::new(
644            rayon::ThreadPoolBuilder::new()
645                .num_threads(threads)
646                .build()
647                .unwrap(),
648        );
649        let error = Arc::new(RwLock::new(None));
650
651        worker_pool.in_place_scope(|scope| {
652            for entry in entries {
653                scope.spawn({
654                    let error = Arc::clone(&error);
655                    let chunk_index = self.chunk_index.clone();
656                    let destination = destination.clone();
657                    let progress = progress.clone();
658
659                    move |scope| {
660                        if let Err(err) = Self::recursive_restore_archive(
661                            &chunk_index,
662                            entry,
663                            &destination,
664                            progress,
665                            scope,
666                            Arc::clone(&error),
667                        ) {
668                            let mut error = error.write().unwrap();
669                            if error.is_none() {
670                                *error = Some(err);
671                            }
672                        }
673                    }
674                });
675            }
676        });
677
678        if let Some(err) = error.write().unwrap().take() {
679            return Err(err);
680        }
681
682        r.unlock()?;
683
684        Ok(destination)
685    }
686
687    fn recursive_delete_archive(
688        &self,
689        entry: Entry,
690        progress: DeletionProgressCallback,
691    ) -> std::io::Result<()> {
692        match entry {
693            Entry::File(mut file_entry) => loop {
694                let chunk_id = crate::varint::decode_u64(&mut file_entry);
695                if chunk_id == 0 {
696                    break;
697                }
698
699                if let Some(deleted) = self.chunk_index.dereference_chunk_id(chunk_id, true) {
700                    if let Some(f) = &progress {
701                        f(chunk_id, deleted)
702                    }
703                }
704            },
705            Entry::Directory(dir_entry) => {
706                for sub_entry in dir_entry.entries {
707                    self.recursive_delete_archive(sub_entry, progress.clone())?;
708                }
709            }
710            _ => {}
711        }
712
713        Ok(())
714    }
715
716    pub fn delete_archive(
717        &self,
718        name: &str,
719        progress: DeletionProgressCallback,
720    ) -> std::io::Result<()> {
721        if !self.list_archives()?.contains(&name.to_string()) {
722            return Err(std::io::Error::new(
723                std::io::ErrorKind::NotFound,
724                format!("Archive {name} not found"),
725            ));
726        }
727
728        let mut w = self.chunk_index.lock.write_lock(LockMode::Destructive)?;
729
730        let archive_path = self.archive_path(name);
731        let archive = Archive::open(archive_path.to_str().unwrap())?;
732
733        for entry in archive.into_entries() {
734            self.recursive_delete_archive(entry, progress.clone())?;
735        }
736
737        std::fs::remove_file(archive_path)?;
738
739        w.unlock()?;
740
741        Ok(())
742    }
743}
744
745impl Drop for Repository {
746    fn drop(&mut self) {
747        if self.save_on_drop {
748            if let Err(err) = self.save() {
749                eprintln!("Failed to save repository: {err}");
750            }
751        }
752    }
753}