use crate::{
    archive::{
        Archive, CompressionFormat, CompressionFormatCallback, ProgressCallback, entries::Entry,
    },
    chunks::{ChunkIndex, lock::LockMode, reader::EntryReader, storage},
};
use std::{
    fs::{File, FileTimes},
    io::{Cursor, Read, Write},
    path::{Path, PathBuf},
    sync::{Arc, Mutex, RwLock},
};

pub type DeletionProgressCallback = Option<Arc<dyn Fn(u64, bool) + Send + Sync + 'static>>;

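/// A deduplicating backup repository rooted at a directory on disk.
///
/// Archives are stored under `.ddup-bak/archives`, restored trees under
/// `.ddup-bak/archives-restored`, and deduplicated chunk data under
/// `.ddup-bak/chunks`.
///
/// A minimal end-to-end sketch; the path, chunk size, chunk count, and thread
/// count below are illustrative placeholders, not recommended values:
///
/// ```ignore
/// use std::path::Path;
///
/// let repo = Repository::new(Path::new("/backups/repo"), 1024 * 1024, 1024, None);
/// repo.create_archive("daily", None, None, None, None, 4)?;
/// let restored = repo.restore_archive("daily", None, 4)?;
/// repo.delete_archive("daily", None)?;
/// ```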
pub struct Repository {
    pub directory: PathBuf,
    pub save_on_drop: bool,

    pub chunk_index: ChunkIndex,
}

impl Repository {
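    /// Opens an existing repository rooted at `directory`.
    ///
    /// When `chunks_directory` or `storage` is `None`, the chunk index defaults
    /// to `.ddup-bak/chunks` inside the repository and to local chunk storage.
    ///
    /// A minimal sketch, assuming the repository was previously created with
    /// [`Repository::new`] (the path is a placeholder):
    ///
    /// ```ignore
    /// let repo = Repository::open(Path::new("/backups/repo"), None, None)?;
    /// println!("{:?}", repo.list_archives()?);
    /// ```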
    pub fn open(
        directory: &Path,
        chunks_directory: Option<&Path>,
        storage: Option<Arc<dyn storage::ChunkStorage>>,
    ) -> std::io::Result<Self> {
        let chunk_index = ChunkIndex::open(
            chunks_directory.map_or(directory.join(".ddup-bak/chunks"), |p| p.to_path_buf()),
            storage.map_or(
                Arc::new(storage::ChunkStorageLocal(
                    directory.join(".ddup-bak/chunks"),
                )),
                |s| s,
            ),
        )?;

        Ok(Self {
            directory: directory.to_path_buf(),
            save_on_drop: true,
            chunk_index,
        })
    }

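    /// Creates a new repository rooted at `directory`, creating the
    /// `.ddup-bak/archives`, `.ddup-bak/archives-restored`, and
    /// `.ddup-bak/chunks` directories as a side effect.
    ///
    /// A minimal sketch; the chunk size and maximum chunk count are
    /// illustrative, not tuned values:
    ///
    /// ```ignore
    /// let repo = Repository::new(Path::new("/backups/repo"), 1024 * 1024, 1024, None);
    /// ```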
    pub fn new(
        directory: &Path,
        chunk_size: usize,
        max_chunk_count: usize,
        storage: Option<Arc<dyn storage::ChunkStorage>>,
    ) -> Self {
        std::fs::create_dir_all(directory.join(".ddup-bak/archives")).unwrap();
        std::fs::create_dir_all(directory.join(".ddup-bak/archives-restored")).unwrap();
        std::fs::create_dir_all(directory.join(".ddup-bak/chunks")).unwrap();

        let chunk_index = ChunkIndex::new(
            directory.join(".ddup-bak/chunks"),
            chunk_size,
            max_chunk_count,
            storage.map_or(
                Arc::new(storage::ChunkStorageLocal(
                    directory.join(".ddup-bak/chunks"),
                )),
                |s| s,
            ),
        );

        Self {
            directory: directory.to_path_buf(),
            save_on_drop: true,
            chunk_index,
        }
    }

    pub fn save(&self) -> std::io::Result<()> {
        self.chunk_index.save()?;

        Ok(())
    }

    #[inline]
    pub fn archive_path(&self, name: &str) -> PathBuf {
        self.directory
            .join(".ddup-bak/archives")
            .join(format!("{name}.ddup"))
    }

    #[inline]
    pub const fn set_save_on_drop(&mut self, save_on_drop: bool) -> &mut Self {
        self.save_on_drop = save_on_drop;

        self
    }

    pub fn list_archives(&self) -> std::io::Result<Vec<String>> {
        let mut archives = Vec::new();
        let archive_dir = self.directory.join(".ddup-bak/archives");

        for entry in std::fs::read_dir(archive_dir)?.flatten() {
            if let Some(name) = entry.file_name().to_str() {
                if let Some(stripped) = name.strip_suffix(".ddup") {
                    archives.push(stripped.to_string());
                }
            }
        }

        Ok(archives)
    }

    pub fn get_archive(&self, name: &str) -> std::io::Result<Archive> {
        let archive_path = self.archive_path(name);

        Archive::open(archive_path.to_str().unwrap())
    }

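    /// Runs the chunk index's cleanup pass under a destructive write lock.
    ///
    /// A minimal sketch, assuming the callback is invoked with a chunk id and
    /// a flag indicating whether that chunk was deleted:
    ///
    /// ```ignore
    /// repo.clean(Some(Arc::new(|chunk_id, deleted| {
    ///     println!("chunk {chunk_id}: deleted = {deleted}");
    /// })))?;
    /// ```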
    pub fn clean(&self, progress: DeletionProgressCallback) -> std::io::Result<()> {
        let mut w = self.chunk_index.lock.write_lock(LockMode::Destructive)?;
        self.chunk_index.clean(progress)?;

        w.unlock()?;

        Ok(())
    }

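    /// Returns a streaming [`EntryReader`] over a file entry's content,
    /// reassembled from its chunks; non-file entries are rejected with
    /// `InvalidData`.
    ///
    /// A minimal sketch, assuming `EntryReader` implements `std::io::Read`:
    ///
    /// ```ignore
    /// for entry in repo.get_archive("daily")?.into_entries() {
    ///     if matches!(entry, Entry::File(_)) {
    ///         let mut reader = repo.entry_reader(entry)?;
    ///         std::io::copy(&mut reader, &mut std::io::sink())?;
    ///     }
    /// }
    /// ```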
    pub fn entry_reader(&self, entry: Entry) -> std::io::Result<EntryReader> {
        match entry {
            Entry::File(file_entry) => Ok(EntryReader::new(file_entry, self.chunk_index.clone())),
            _ => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Entry is not a file",
            )),
        }
    }

    #[inline]
    pub fn archive_path_parent<'a>(
        archive: &'a mut Archive,
        entry: &Path,
    ) -> Option<&'a mut Box<crate::archive::entries::DirectoryEntry>> {
        archive
            .find_archive_entry_mut(entry.parent()?)
            .map(|e| match e {
                Entry::Directory(dir) => dir,
                _ => panic!("Parent entry is not a directory"),
            })
    }

    #[allow(clippy::too_many_arguments)]
    fn recursive_create_archive(
        archive: Arc<Mutex<Option<Archive>>>,
        chunk_index: &ChunkIndex,
        entry: ignore::DirEntry,
        metadata: std::fs::Metadata,
        root_path: &Path,
        progress_chunking: ProgressCallback,
        compression_callback: CompressionFormatCallback,
        scope: &rayon::Scope,
        error: Arc<RwLock<Option<std::io::Error>>>,
    ) -> std::io::Result<()> {
        let path = entry.path().strip_prefix(root_path).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "Path is not a subpath of the root directory",
            )
        })?;

        // Bail out early if another worker already recorded an error.
        if error.read().unwrap().is_some() {
            return Ok(());
        }

        if let Some(f) = &progress_chunking {
            f(entry.path())
        }

        if metadata.is_file() {
            let compression = compression_callback
                .as_ref()
                .map(|f| f(path, &metadata))
                .unwrap_or(CompressionFormat::Deflate);

            // Deduplicate the file into chunks; the archive entry stores only
            // the varint-encoded chunk ids, not the file content itself.
            let chunks =
                chunk_index.chunk_file(&entry.path().to_path_buf(), compression, Some(scope))?;

            let mut chunk_content = Vec::new();
            for id in chunks {
                chunk_content.extend_from_slice(&crate::varint::encode_u64(id));
            }

            let mut archive = archive.lock().unwrap();
            let file_entry = archive.as_mut().unwrap().write_file_entry(
                Cursor::new(chunk_content),
                Some(metadata.len()),
                path.file_name().unwrap().to_string_lossy().into_owned(),
                metadata.permissions().into(),
                metadata.modified().unwrap_or(std::time::SystemTime::now()),
                {
                    #[cfg(unix)]
                    {
                        use std::os::unix::fs::MetadataExt;
                        (metadata.uid(), metadata.gid())
                    }
                    #[cfg(windows)]
                    {
                        (0, 0)
                    }
                },
                compression,
            )?;

            // Attach the entry to its parent directory, or to the archive root
            // when no parent directory entry exists.
            if let Some(parent) = Self::archive_path_parent(archive.as_mut().unwrap(), path) {
                parent.entries.push(Entry::File(file_entry));
            } else {
                archive
                    .as_mut()
                    .unwrap()
                    .entries
                    .push(Entry::File(file_entry));
            }
        } else if metadata.is_symlink() {
            if let Ok(target) = std::fs::read_link(entry.path()) {
                let mut archive = archive.lock().unwrap();

                let link_entry = Entry::Symlink(Box::new(crate::archive::entries::SymlinkEntry {
                    name: path.file_name().unwrap().to_string_lossy().into_owned(),
                    mode: metadata.permissions().into(),
                    mtime: metadata.modified().unwrap_or(std::time::SystemTime::now()),
                    owner: {
                        #[cfg(unix)]
                        {
                            use std::os::unix::fs::MetadataExt;
                            (metadata.uid(), metadata.gid())
                        }
                        #[cfg(windows)]
                        {
                            (0, 0)
                        }
                    },
                    target: target.to_string_lossy().into_owned(),
                    target_dir: target.is_dir(),
                }));

                if let Some(parent) = Self::archive_path_parent(archive.as_mut().unwrap(), path) {
                    parent.entries.push(link_entry);
                } else {
                    archive.as_mut().unwrap().entries.push(link_entry);
                }
            }
        }

        Ok(())
    }

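    /// Creates a new archive named `name` by walking `directory` (or the
    /// repository root when `None`), chunking files into the shared chunk
    /// index, and writing the archive to `.ddup-bak/archives/<name>.ddup`.
    /// Fails with `AlreadyExists` if an archive of that name already exists.
    ///
    /// A minimal sketch; the progress closure and thread count are
    /// illustrative, and `None` for the compression callback falls back to
    /// `CompressionFormat::Deflate`:
    ///
    /// ```ignore
    /// let archive = repo.create_archive(
    ///     "daily",
    ///     None, // walk the repository root
    ///     None, // entry paths are made relative to the root
    ///     Some(Arc::new(|path: &Path| println!("chunking {}", path.display()))),
    ///     None, // per-file compression callback
    ///     4,
    /// )?;
    /// ```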
    #[allow(clippy::too_many_arguments)]
    pub fn create_archive(
        &self,
        name: &str,
        directory: Option<ignore::Walk>,
        directory_root: Option<&Path>,
        progress_chunking: ProgressCallback,
        compression_callback: CompressionFormatCallback,
        threads: usize,
    ) -> std::io::Result<Archive> {
        if self.list_archives()?.contains(&name.to_string()) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::AlreadyExists,
                format!("Archive {name} already exists"),
            ));
        }

        let mut w = self.chunk_index.lock.write_lock(LockMode::NonDestructive)?;

        let archive_path = self.archive_path(name);

        let worker_pool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(threads)
                .build()
                .unwrap(),
        );
        let error = Arc::new(RwLock::new(None));

        let walker = directory.unwrap_or_else(|| {
            ignore::WalkBuilder::new(&self.directory)
                .follow_links(false)
                .git_global(false)
                .build()
        });

        let archive = Arc::new(Mutex::new(Some(Archive::new(File::create(&archive_path)?))));

        worker_pool.in_place_scope(|scope| {
            for entry in walker.flatten() {
                let path = entry.path();
                let metadata = match path.symlink_metadata() {
                    Ok(metadata) => metadata,
                    Err(err) => {
                        let mut error = error.write().unwrap();
                        if error.is_none() {
                            *error = Some(err);
                        }
                        break;
                    }
                };
                // Never archive the repository's own metadata directory.
                if path.file_name() == Some(".ddup-bak".as_ref()) {
                    continue;
                }

                if error.read().unwrap().is_some() {
                    break;
                }

                // Directories are recorded inline so that parent entries exist
                // before spawned workers attach file and symlink entries.
                if metadata.is_dir() {
                    if path.file_name().is_none() {
                        continue;
                    }

                    let mut archive = archive.lock().unwrap();

                    let dir_entry =
                        Entry::Directory(Box::new(crate::archive::entries::DirectoryEntry {
                            name: path.file_name().unwrap().to_string_lossy().into_owned(),
                            mode: metadata.permissions().into(),
                            mtime: metadata.modified().unwrap_or(std::time::SystemTime::now()),
                            owner: {
                                #[cfg(unix)]
                                {
                                    use std::os::unix::fs::MetadataExt;
                                    (metadata.uid(), metadata.gid())
                                }
                                #[cfg(windows)]
                                {
                                    (0, 0)
                                }
                            },
                            entries: Vec::new(),
                        }));

                    if let Some(parent) = Self::archive_path_parent(
                        archive.as_mut().unwrap(),
                        path.strip_prefix(directory_root.unwrap_or(&self.directory))
                            .unwrap(),
                    ) {
                        parent.entries.push(dir_entry);
                    } else {
                        archive.as_mut().unwrap().entries.push(dir_entry);
                    }
                }

                scope.spawn({
                    let error = Arc::clone(&error);
                    let archive = Arc::clone(&archive);
                    let chunk_index = self.chunk_index.clone();
                    let directory_root = directory_root.unwrap_or(&self.directory);
                    let progress_chunking = progress_chunking.clone();
                    let compression_callback = compression_callback.clone();

                    move |scope| {
                        if let Err(err) = Self::recursive_create_archive(
                            archive,
                            &chunk_index,
                            entry,
                            metadata,
                            directory_root,
                            progress_chunking,
                            compression_callback,
                            scope,
                            Arc::clone(&error),
                        ) {
                            let mut error = error.write().unwrap();
                            if error.is_none() {
                                *error = Some(err);
                            }
                        }
                    }
                });
            }
        });

        // Surface the first error recorded by any worker.
        if let Some(err) = error.write().unwrap().take() {
            return Err(err);
        }

        let mut archive = archive.lock().unwrap().take().unwrap();
        archive.write_end_header()?;

        w.unlock()?;

        Ok(archive)
    }

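    /// Streams a file entry's content into `stream` by decoding the entry's
    /// varint-encoded chunk id list and copying each chunk's content in order;
    /// non-file entries are rejected with `InvalidData`.
    ///
    /// A minimal sketch that collects an entry's content into a buffer:
    ///
    /// ```ignore
    /// let mut buffer = Vec::new();
    /// repo.read_entry_content(entry, &mut buffer)?;
    /// ```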
    pub fn read_entry_content<S: Write>(
        &self,
        entry: Entry,
        stream: &mut S,
    ) -> std::io::Result<()> {
        match entry {
            Entry::File(mut file_entry) => {
                let mut buffer = [0; 4096];

                // A chunk id of 0 marks the end of the entry's chunk list.
                loop {
                    let chunk_id = crate::varint::decode_u64(&mut file_entry);
                    if chunk_id == 0 {
                        break;
                    }

                    let mut chunk = self.chunk_index.read_chunk_id_content(chunk_id)?;

                    loop {
                        let bytes_read = chunk.read(&mut buffer)?;
                        if bytes_read == 0 {
                            break;
                        }

                        stream.write_all(&buffer[..bytes_read])?;
                    }
                }

                Ok(())
            }
            _ => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Entry is not a file",
            )),
        }
    }

    fn recursive_restore_archive(
        chunk_index: &ChunkIndex,
        entry: Entry,
        directory: &Path,
        progress: ProgressCallback,
        scope: &rayon::Scope,
        error: Arc<RwLock<Option<std::io::Error>>>,
    ) -> std::io::Result<()> {
        let path = directory.join(entry.name());

        if error.read().unwrap().is_some() {
            return Ok(());
        }

        if let Some(f) = &progress {
            f(&path)
        }

        match entry {
            Entry::File(mut file_entry) => {
                let mut file = File::create(&path)?;
                let mut buffer = [0; 4096];

                loop {
                    let chunk_id = crate::varint::decode_u64(&mut file_entry);
                    if chunk_id == 0 {
                        break;
                    }

                    let mut chunk = chunk_index.read_chunk_id_content(chunk_id)?;

                    loop {
                        let bytes_read = chunk.read(&mut buffer)?;
                        if bytes_read == 0 {
                            break;
                        }

                        file.write_all(&buffer[..bytes_read])?;
                    }
                }

                // Restore the recorded permissions, timestamps, and ownership.
                file.set_permissions(file_entry.mode.into())?;
                file.set_times(FileTimes::new().set_modified(file_entry.mtime))?;

                #[cfg(unix)]
                {
                    let (uid, gid) = file_entry.owner;

                    std::os::unix::fs::lchown(&path, Some(uid), Some(gid))?;
                }
            }
            Entry::Directory(dir_entry) => {
                std::fs::create_dir_all(&path)?;

                std::fs::set_permissions(&path, dir_entry.mode.into())?;

                #[cfg(unix)]
                {
                    let (uid, gid) = dir_entry.owner;
                    std::os::unix::fs::chown(&path, Some(uid), Some(gid))?;
                }

                // Restore children in parallel on the shared rayon scope.
                for sub_entry in dir_entry.entries {
                    scope.spawn({
                        let error = Arc::clone(&error);
                        let chunk_index = chunk_index.clone();
                        let path = path.to_path_buf();
                        let progress = progress.clone();

                        move |scope| {
                            if let Err(err) = Self::recursive_restore_archive(
                                &chunk_index,
                                sub_entry,
                                &path,
                                progress,
                                scope,
                                Arc::clone(&error),
                            ) {
                                let mut error = error.write().unwrap();
                                if error.is_none() {
                                    *error = Some(err);
                                }
                            }
                        }
                    });
                }
            }
            #[cfg(unix)]
            Entry::Symlink(link_entry) => {
                std::os::unix::fs::symlink(link_entry.target, &path)?;
                std::fs::set_permissions(&path, link_entry.mode.into())?;

                let (uid, gid) = link_entry.owner;
                std::os::unix::fs::lchown(&path, Some(uid), Some(gid))?;
            }
            #[cfg(windows)]
            Entry::Symlink(link_entry) => {
                if link_entry.target_dir {
                    std::os::windows::fs::symlink_dir(link_entry.target, &path)?;
                } else {
                    std::os::windows::fs::symlink_file(link_entry.target, &path)?;
                }

                std::fs::set_permissions(&path, link_entry.mode.into())?;
            }
        }

        Ok(())
    }

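    /// Restores every entry of the named archive into
    /// `.ddup-bak/archives-restored/<name>` and returns that path. Fails with
    /// `NotFound` if the archive does not exist.
    ///
    /// A minimal sketch; the thread count is illustrative:
    ///
    /// ```ignore
    /// let restored_root = repo.restore_archive("daily", None, 4)?;
    /// println!("restored to {}", restored_root.display());
    /// ```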
    pub fn restore_archive(
        &self,
        name: &str,
        progress: ProgressCallback,
        threads: usize,
    ) -> std::io::Result<PathBuf> {
        if !self.list_archives()?.contains(&name.to_string()) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                format!("Archive {name} not found"),
            ));
        }

        let mut r = self.chunk_index.lock.read_lock(LockMode::NonDestructive)?;

        let archive_path = self.archive_path(name);
        let archive = Archive::open(archive_path.to_str().unwrap())?;
        let destination = self
            .directory
            .join(".ddup-bak/archives-restored")
            .join(name);

        std::fs::create_dir_all(&destination)?;

        let worker_pool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(threads)
                .build()
                .unwrap(),
        );
        let error = Arc::new(RwLock::new(None));

        worker_pool.in_place_scope(|scope| {
            for entry in archive.into_entries() {
                scope.spawn({
                    let error = Arc::clone(&error);
                    let chunk_index = self.chunk_index.clone();
                    let destination = destination.clone();
                    let progress = progress.clone();

                    move |scope| {
                        if let Err(err) = Self::recursive_restore_archive(
                            &chunk_index,
                            entry,
                            &destination,
                            progress,
                            scope,
                            Arc::clone(&error),
                        ) {
                            let mut error = error.write().unwrap();
                            if error.is_none() {
                                *error = Some(err);
                            }
                        }
                    }
                });
            }
        });

        if let Some(err) = error.write().unwrap().take() {
            return Err(err);
        }

        r.unlock()?;

        Ok(destination)
    }

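    /// Restores only the given entries of the named archive into
    /// `.ddup-bak/archives-restored/<name>` and returns that path.
    ///
    /// A minimal sketch that restores just the file entries found at the
    /// archive root:
    ///
    /// ```ignore
    /// let files: Vec<_> = repo
    ///     .get_archive("daily")?
    ///     .into_entries()
    ///     .into_iter()
    ///     .filter(|entry| matches!(entry, Entry::File(_)))
    ///     .collect();
    /// let restored_root = repo.restore_entries("daily", files, None, 4)?;
    /// ```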
    pub fn restore_entries(
        &self,
        name: &str,
        entries: Vec<Entry>,
        progress: ProgressCallback,
        threads: usize,
    ) -> std::io::Result<PathBuf> {
        if !self.list_archives()?.contains(&name.to_string()) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                format!("Archive {name} not found"),
            ));
        }

        let mut r = self.chunk_index.lock.read_lock(LockMode::NonDestructive)?;

        let destination = self
            .directory
            .join(".ddup-bak/archives-restored")
            .join(name);

        std::fs::create_dir_all(&destination)?;

        let worker_pool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(threads)
                .build()
                .unwrap(),
        );
        let error = Arc::new(RwLock::new(None));

        worker_pool.in_place_scope(|scope| {
            for entry in entries {
                scope.spawn({
                    let error = Arc::clone(&error);
                    let chunk_index = self.chunk_index.clone();
                    let destination = destination.clone();
                    let progress = progress.clone();

                    move |scope| {
                        if let Err(err) = Self::recursive_restore_archive(
                            &chunk_index,
                            entry,
                            &destination,
                            progress,
                            scope,
                            Arc::clone(&error),
                        ) {
                            let mut error = error.write().unwrap();
                            if error.is_none() {
                                *error = Some(err);
                            }
                        }
                    }
                });
            }
        });

        if let Some(err) = error.write().unwrap().take() {
            return Err(err);
        }

        r.unlock()?;

        Ok(destination)
    }

    fn recursive_delete_archive(
        &self,
        entry: Entry,
        progress: DeletionProgressCallback,
    ) -> std::io::Result<()> {
        match entry {
            // Dereference every chunk the file entry points to; the callback
            // reports whether each chunk was deleted as a result.
            Entry::File(mut file_entry) => loop {
                let chunk_id = crate::varint::decode_u64(&mut file_entry);
                if chunk_id == 0 {
                    break;
                }

                if let Some(deleted) = self.chunk_index.dereference_chunk_id(chunk_id, true) {
                    if let Some(f) = &progress {
                        f(chunk_id, deleted)
                    }
                }
            },
            Entry::Directory(dir_entry) => {
                for sub_entry in dir_entry.entries {
                    self.recursive_delete_archive(sub_entry, progress.clone())?;
                }
            }
            _ => {}
        }

        Ok(())
    }

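    /// Deletes the named archive: dereferences every chunk referenced by its
    /// entries under a destructive write lock, then removes the `.ddup` file.
    /// Fails with `NotFound` if the archive does not exist.
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// repo.delete_archive("daily", Some(Arc::new(|chunk_id, deleted| {
    ///     if deleted {
    ///         println!("chunk {chunk_id} removed");
    ///     }
    /// })))?;
    /// ```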
    pub fn delete_archive(
        &self,
        name: &str,
        progress: DeletionProgressCallback,
    ) -> std::io::Result<()> {
        if !self.list_archives()?.contains(&name.to_string()) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                format!("Archive {name} not found"),
            ));
        }

        let mut w = self.chunk_index.lock.write_lock(LockMode::Destructive)?;

        let archive_path = self.archive_path(name);
        let archive = Archive::open(archive_path.to_str().unwrap())?;

        for entry in archive.into_entries() {
            self.recursive_delete_archive(entry, progress.clone())?;
        }

        std::fs::remove_file(archive_path)?;

        w.unlock()?;

        Ok(())
    }
}

impl Drop for Repository {
    fn drop(&mut self) {
        if self.save_on_drop {
            if let Err(err) = self.save() {
                eprintln!("Failed to save repository: {err}");
            }
        }
    }
}