use std::collections::HashSet;
use std::io;
use std::io::{Error, Read, Result, Write};
use std::iter::Iterator;
use std::path::{Path, PathBuf};
use std::sync::{mpsc, Arc};

use sgdata::SGData;
use slog::{info, o, warn, FnValue, Level, Logger};
use slog_perf::TimeReporter;
use sodiumoxide::crypto::{self, box_, secretbox};
use url::Url;

use rdedup_cdc as rollsum;

mod iterators;

mod config;

mod aio;
use crate::aio::*;

mod chunking;
mod hashing;

mod chunk_processor;
use crate::chunk_processor::*;

mod sorting_recv;
use crate::sorting_recv::SortingIterator;

mod encryption;
use crate::encryption::EncryptionEngine;

mod compression;
use crate::compression::ArcCompression;

mod pwhash;

pub mod settings;

mod util;
use self::util::*;

mod reading;
use self::reading::*;

mod generation;
use self::generation::*;

mod name;
use self::name::*;

mod misc;
use self::misc::*;

pub mod backends {
    pub use crate::aio::backend::{Backend, BackendThread, Lock};
    pub use crate::aio::Metadata;

    pub mod local {
        pub use crate::aio::local::{Local, LocalThread};
    }

    #[cfg(feature = "backend-b2")]
    pub mod b2 {
        pub use crate::aio::b2::{Auth, B2Thread, Lock, B2};
    }
}

type ArcDecrypter = Arc<dyn encryption::Decrypter + Send + Sync + 'static>;
type ArcEncrypter = Arc<dyn encryption::Encrypter + Send + Sync + 'static>;

const INGRESS_BUFFER_SIZE: usize = 128 * 1024;
const DIGEST_SIZE: usize = 32;

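/// Callback used to obtain a passphrase whenever one is needed,
/// e.g. `&|| Ok("secret".to_string())` or a function prompting on a TTY.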
pub type PassphraseFn<'a> = &'a dyn Fn() -> io::Result<String>;

pub type BackendSelectFn = &'static (dyn Fn(&Url) -> io::Result<Box<dyn backends::Backend + Send + Sync>>
    + Send
    + Sync);

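/// Kind of chunk stored in the repository: `Data` chunks hold user bytes and
/// are compressed and encrypted; `Index` chunks hold digest lists and are not.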
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DataType {
    Index,
    Data,
}

impl DataType {
    fn should_compress(&self) -> bool {
        *self == DataType::Data
    }

    fn should_encrypt(&self) -> bool {
        *self == DataType::Data
    }
}

pub struct VerifyResults {
    pub scanned: usize,
    pub errors: Vec<(Vec<u8>, Error)>,
}

pub struct DuResults {
    pub chunks: usize,
    pub bytes: u64,
}

pub struct DecryptHandle {
    decrypter: ArcDecrypter,
}

pub struct EncryptHandle {
    encrypter: ArcEncrypter,
}

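/// Handle to an rdedup repository.
///
/// A minimal usage sketch; the URL, name and passphrase below are
/// placeholders and error handling is elided:
///
/// ```ignore
/// let url = url::Url::parse("file:///tmp/repo").unwrap();
/// let repo = Repo::open(&url, None)?;
/// let enc = repo.unlock_encrypt(&|| Ok("passphrase".to_string()))?;
/// let _stats = repo.write("backup-1", std::io::empty(), &enc)?;
/// ```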
#[derive(Clone)]
pub struct Repo {
    url: Url,
    backend_select: BackendSelectFn,
    config: config::Repo,

    compression: compression::ArcCompression,
    hasher: hashing::ArcHasher,

    log: slog::Logger,

    aio: aio::AsyncIO,
}

impl Repo {
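    /// Derive the decryption key from a passphrase and return a handle
    /// usable with [`Repo::read`], [`Repo::du`] and [`Repo::verify`].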
    pub fn unlock_decrypt(
        &self,
        pass: PassphraseFn<'_>,
    ) -> io::Result<DecryptHandle> {
        info!(self.log, "Opening read handle");
        let decrypter = self
            .config
            .encryption
            .decrypter(pass, &self.config.pwhash)?;

        Ok(DecryptHandle { decrypter })
    }

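    /// Derive the encryption key from a passphrase and return a handle
    /// usable with [`Repo::write`].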
    pub fn unlock_encrypt(
        &self,
        pass: PassphraseFn<'_>,
    ) -> io::Result<EncryptHandle> {
        info!(self.log, "Opening write handle");
        let encrypter = self
            .config
            .encryption
            .encrypter(pass, &self.config.pwhash)?;

        Ok(EncryptHandle { encrypter })
    }

    fn ensure_repo_empty_or_new(aio: &AsyncIO) -> Result<()> {
        let list = aio.list(PathBuf::from(".")).wait();

        if list.is_ok() && !list.unwrap().is_empty() {
            return Err(Error::new(
                io::ErrorKind::AlreadyExists,
                "repo dir must not exist or be empty to be used",
            ));
        }
        Ok(())
    }

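    /// Create a new repository at `url` using the default backend selection;
    /// fails if the location already exists and is not empty.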
    pub fn init<L>(
        url: &Url,
        passphrase: PassphraseFn<'_>,
        settings: settings::Repo,
        log: L,
    ) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        Self::init_custom(
            url,
            &aio::backend_from_url,
            passphrase,
            settings,
            log,
        )
    }

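    /// Open an existing repository at `url` using the default backend
    /// selection, reading its config to set up hashing, compression and
    /// encryption.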
    pub fn open<L>(url: &Url, log: L) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        Self::open_custom(url, &aio::backend_from_url, log)
    }

    pub fn init_custom<L>(
        url: &Url,
        backend_select: BackendSelectFn,
        passphrase: PassphraseFn<'_>,
        settings: settings::Repo,
        log: L,
    ) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        let log = log
            .into()
            .unwrap_or_else(|| Logger::root(slog::Discard, o!()));

        let backend = backend_select(url)?;
        let aio = aio::AsyncIO::new(backend, log.clone())?;

        Repo::ensure_repo_empty_or_new(&aio)?;
        let config = config::Repo::new_from_settings(passphrase, settings)?;
        config.write(&aio)?;

        let compression = config.compression.to_engine();
        let hasher = config.hashing.to_hasher();

        Ok(Repo {
            url: url.clone(),
            backend_select,
            config,
            compression,
            hasher,
            log,
            aio,
        })
    }

    pub fn open_custom<L>(
        url: &Url,
        backend_select: BackendSelectFn,
        log: L,
    ) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        let log = log
            .into()
            .unwrap_or_else(|| Logger::root(slog::Discard, o!()));

        let backend = backend_select(url)?;
        let aio = aio::AsyncIO::new(backend, log.clone())?;

        let config = config::Repo::read(&aio)?;

        let compression = config.compression.to_engine();
        let hasher = config.hashing.to_hasher();
        Ok(Repo {
            url: url.clone(),
            backend_select,
            config,
            compression,
            hasher,
            log,
            aio,
        })
    }

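    /// Change the passphrase protecting the repository's encryption key.
    /// Takes an exclusive lock; only the repository config is rewritten.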
    pub fn change_passphrase(
        &mut self,
        old_p: PassphraseFn<'_>,
        new_p: PassphraseFn<'_>,
    ) -> Result<()> {
        let _lock = self.aio.lock_exclusive();

        if self.config.version == 0 {
            Err(Error::new(
                io::ErrorKind::NotFound,
                "rdedup v0 config format not supported",
            ))
        } else {
            self.config.encryption.change_passphrase(
                old_p,
                new_p,
                &self.config.pwhash,
            )?;
            self.config.write(&self.aio)?;
            Ok(())
        }
    }

    fn chunk_and_write_data_thread<'a>(
        &'a self,
        input_data_iter: Box<dyn Iterator<Item = Vec<u8>> + Send + 'a>,
        process_tx: crossbeam_channel::Sender<chunk_processor::Message>,
        aio: aio::AsyncIO,
        data_type: DataType,
    ) -> io::Result<DataAddress> {
        let (digests_tx, digests_rx) = mpsc::channel();

        crossbeam::scope(move |scope| {
            let mut timer = slog_perf::TimeReporter::new_with_level(
                "index-processor",
                self.log.clone(),
                Level::Debug,
            );
            timer.start("spawn-chunker");

            scope.spawn({
                let process_tx = process_tx.clone();
                move |_| {
                    let mut timer = slog_perf::TimeReporter::new_with_level(
                        "chunker",
                        self.log.clone(),
                        Level::Debug,
                    );

                    let chunker = chunking::Chunker::new(
                        input_data_iter,
                        self.config.chunking.to_engine(),
                    );

                    let mut data = util::EnumerateU64::new(chunker);

                    while let Some(i_sg) =
                        timer.start_with("rx-and-chunking", || data.next())
                    {
                        timer.start("tx");
                        let (i, sg) = i_sg;
                        process_tx
                            .send(chunk_processor::Message {
                                data: (i as u64, sg),
                                response_tx: digests_tx.clone(),
                                data_type,
                            })
                            .expect("chunk process tx channel closed")
                    }
                    drop(digests_tx);
                }
            });

            timer.start("sorting-recv-create");
            let mut digests_rx = SortingIterator::new(digests_rx.into_iter());

            timer.start("digest-rx");
            let first_digest =
                digests_rx.next().expect("At least one index digest");

            if let Some(second_digest) =
                timer.start_with("digest-rx", || digests_rx.next())
            {
                let mut two_first = vec![first_digest, second_digest];
                let mut address = self.chunk_and_write_data_thread(
                    Box::new(
                        two_first
                            .drain(..)
                            .chain(digests_rx)
                            .map(|digest| digest.0),
                    ),
                    process_tx,
                    aio.clone(),
                    DataType::Index,
                )?;

                address.index_level += 1;
                Ok(address)
            } else {
                Ok(DataAddress {
                    index_level: 0,
                    digest: first_digest,
                })
            }
        })
        .expect("chunker thread failed")
    }

    fn write_cpu_thread_num(&self) -> usize {
        num_cpus::get()
    }

    fn input_reader_thread<R>(
        &self,
        reader: R,
        chunker_tx: mpsc::SyncSender<Vec<u8>>,
    ) where
        R: Read + Send,
    {
        let mut time = TimeReporter::new_with_level(
            "input-reader",
            self.log.clone(),
            Level::Debug,
        );

        let r2vi = ReaderVecIter::new(reader, INGRESS_BUFFER_SIZE);
        let mut while_ok = WhileOk::new(r2vi);

        while let Some(buf) = time.start_with("input", || while_ok.next()) {
            time.start("tx");
            chunker_tx.send(buf).expect("chunker tx channel closed")
        }

        if let Some(e) = while_ok.finish() {
            panic!("Input thread error: {}", e)
        }
    }

    fn get_chunk_accessor(
        &self,
        decrypter: Option<ArcDecrypter>,
        compression: ArcCompression,
        generations: Vec<Generation>,
    ) -> DefaultChunkAccessor<'_> {
        DefaultChunkAccessor::new(self, decrypter, compression, generations)
    }

    fn get_recording_chunk_accessor<'a>(
        &'a self,
        accessed: &'a mut HashSet<Vec<u8>>,
        decrypter: Option<ArcDecrypter>,
        compression: ArcCompression,
        generations: Vec<Generation>,
    ) -> RecordingChunkAccessor<'a> {
        RecordingChunkAccessor::new(
            self,
            accessed,
            decrypter,
            compression,
            generations,
        )
    }

    fn wipe_generation_maybe(
        &self,
        gen: Generation,
        min_age_secs: u64,
    ) -> io::Result<()> {
        let gen_config = match gen.load_config(&self.aio) {
            Ok(c) => c,
            Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
                info!(
                    self.log,
                    "Generation config file not found. Rerun GC later to finish";
                );

                return Ok(());
            }
            Err(e) => return Err(e),
        };

        if gen_config.created + chrono::Duration::seconds(min_age_secs as i64)
            > chrono::Utc::now()
        {
            info!(
                self.log,
                "Generation is not old enough. Rerun GC later to finish";
                "gen" => FnValue(|_| gen.to_string()),
                "gen-created" => gen_config.created.to_rfc3339(),
                "now" => chrono::Utc::now().to_rfc3339(),
            );
            return Ok(());
        }
        info!(
            self.log,
            "Reclaiming old generation finished. Deleting...";
            "gen" => FnValue(|_| gen.to_string()),
        );

        substitute_err_not_found(
            self.aio
                .remove_dir_all(
                    PathBuf::from(gen.to_string()).join(NAME_SUBDIR),
                )
                .wait(),
            || (),
        )?;

        substitute_err_not_found(
            self.aio
                .remove_dir_all(
                    PathBuf::from(gen.to_string()).join(config::DATA_SUBDIR),
                )
                .wait(),
            || (),
        )?;

        self.aio
            .remove_dir_all(PathBuf::from(gen.to_string()))
            .wait()?;

        Ok(())
    }

    fn update_name_to(
        &self,
        name_str: &str,
        cur_gen: Generation,
        generations: &[Generation],
    ) -> io::Result<()> {
        info!(
            self.log,
            "Updating name to current generation";
            "name" => name_str,
            "gen" => FnValue(|_| cur_gen.to_string())
        );
        let name = Name::load_from_any(name_str, generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let accessor = GenerationUpdateChunkAccessor::new(
            self,
            Arc::clone(&self.compression),
            generations.to_vec(),
        );
        {
            let traverser = ReadContext::new(&accessor);
            traverser.read_recursively(ReadRequest::new(
                DataType::Data,
                data_address.as_ref(),
                None,
                self.log.clone(),
            ))?;
        }

        Name::update_generation_to(name_str, cur_gen, generations, &self.aio)?;

        Ok(())
    }

    fn reachable_recursively_insert(
        &self,
        da: DataAddressRef<'_>,
        reachable_digests: &mut HashSet<Vec<u8>>,
        generations: Vec<Generation>,
    ) -> Result<()> {
        reachable_digests.insert(da.digest.0.into());

        let accessor = self.get_recording_chunk_accessor(
            reachable_digests,
            None,
            Arc::clone(&self.compression),
            generations,
        );
        let traverser = ReadContext::new(&accessor);
        traverser.read_recursively(ReadRequest::new(
            DataType::Data,
            da,
            None,
            self.log.clone(),
        ))
    }

    #[allow(dead_code)]
    fn list_reachable_chunks(&self) -> Result<HashSet<Vec<u8>>> {
        let generations = self.read_generations()?;
        let mut reachable_digests = HashSet::new();
        let all_names = Name::list_all(&generations, &self.aio)?;
        for name_str in &all_names {
            match Name::load_from_any(name_str, &generations, &self.aio) {
                Ok(name) => {
                    let data_address: DataAddress = name.into();
                    info!(self.log, "processing"; "name" => name_str);
                    self.reachable_recursively_insert(
                        data_address.as_ref(),
                        &mut reachable_digests,
                        generations.clone(),
                    )?;
                }
                Err(e) => {
                    info!(
                        self.log,
                        "skipped";
                        "name" => name_str, "error" => e.to_string()
                    );
                }
            }
        }

        Ok(reachable_digests)
    }

    fn chunk_rel_path_by_digest(
        &self,
        digest: DigestRef<'_>,
        gen_str: &str,
    ) -> PathBuf {
        self.config.nesting.get_path(
            Path::new(config::DATA_SUBDIR),
            digest.0,
            gen_str,
        )
    }

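    /// List all names stored in the repository, across all live generations.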
    pub fn list_names(&self) -> io::Result<Vec<String>> {
        let _lock = self.aio.lock_shared();
        Name::list_all(&self.read_generations()?, &self.aio)
    }

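    /// Remove a name from every generation it appears in; the chunks it
    /// referenced become eligible for reclamation by [`Repo::gc`].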
    pub fn rm(&self, name: &str) -> Result<()> {
        let _lock = self.aio.lock_exclusive();
        Name::remove_any(name, &self.read_generations()?, &self.aio)
    }

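    /// Garbage-collect unreferenced data.
    ///
    /// GC is generation based: a new generation is started (or a previously
    /// interrupted GC resumed), names still present in the oldest generation
    /// are moved to the current one, and once the oldest generation is empty
    /// and at least `min_age_secs` old it is deleted.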
    pub fn gc(&self, min_age_secs: u64) -> Result<()> {
        let _lock = self.aio.lock_exclusive();

        let generations = self.read_generations()?;

        if generations.is_empty() {
            info!(self.log, "Nothing in the repository yet, nothing to gc");
            return Ok(());
        }

        if generations.len() == 1 {
            let new_gen = generations.last().unwrap().gen_next();
            info!(self.log, "Creating new generation"; "gen" => FnValue(|_| new_gen.to_string()));
            new_gen.write(&self.aio)?;
        } else {
            info!(
                self.log,
                "Restarting previous GC operation";
                "gen" => FnValue(|_| generations.last().unwrap().to_string())
            );
        }

        loop {
            let generations = self.read_generations()?;
            assert!(!generations.is_empty());
            if generations.len() == 1 {
                info!(
                    self.log,
                    "One generation left - GC cycle complete";
                    "gen" => FnValue(|_| generations[0].to_string())
                );
                return Ok(());
            }
            let gen_oldest = generations[0];
            let gen_cur = generations.last().unwrap();

            let names = Name::list(gen_oldest, &self.aio)?;

            info!(
                self.log,
                "Names left in the generation to be GCed";
                "count" => names.len(),
                "gen" => FnValue(|_| gen_oldest.to_string())
            );
            if names.is_empty() {
                self.wipe_generation_maybe(gen_oldest, min_age_secs)?;
                return Ok(());
            }
            self.update_name_to(&names[0], *gen_cur, &generations)?;
        }
    }

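    /// Write the decoded contents stored under `name_str` into `writer`.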
    pub fn read<W: Write>(
        &self,
        name_str: &str,
        writer: &mut W,
        dec: &DecryptHandle,
    ) -> Result<()> {
        let _lock = self.aio.lock_shared();

        let generations = self.read_generations()?;

        let name = Name::load_from_any(name_str, &generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let accessor = self.get_chunk_accessor(
            Some(Arc::clone(&dec.decrypter)),
            Arc::clone(&self.compression),
            generations,
        );
        let traverser = ReadContext::new(&accessor);
        traverser.read_recursively(ReadRequest::new(
            DataType::Data,
            data_address.as_ref(),
            Some(writer),
            self.log.clone(),
        ))
    }

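    /// Report how many chunks make up `name_str` and how many bytes they
    /// decode to, verifying each chunk while traversing.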
    pub fn du(&self, name_str: &str, dec: &DecryptHandle) -> Result<DuResults> {
        let _lock = self.aio.lock_shared();

        let generations = self.read_generations()?;
        let name = Name::load_from_any(name_str, &generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let mut counter = CounterWriter::new();
        let accessor = VerifyingChunkAccessor::new(
            self,
            Some(Arc::clone(&dec.decrypter)),
            Arc::clone(&self.compression),
            generations,
        );
        {
            let traverser = ReadContext::new(&accessor);
            traverser.read_recursively(ReadRequest::new(
                DataType::Data,
                data_address.as_ref(),
                Some(&mut counter),
                self.log.clone(),
            ))?;
        }
        Ok(DuResults {
            chunks: accessor.get_results().scanned,
            bytes: counter.count,
        })
    }

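    /// Traverse every chunk reachable from `name_str`, returning how many
    /// were scanned and any per-chunk errors.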
    pub fn verify(
        &self,
        name_str: &str,
        dec: &DecryptHandle,
    ) -> Result<VerifyResults> {
        let _lock = self.aio.lock_shared();

        let generations = self.read_generations()?;

        let name = Name::load_from_any(name_str, &generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let mut counter = CounterWriter::new();
        let accessor = VerifyingChunkAccessor::new(
            self,
            Some(Arc::clone(&dec.decrypter)),
            Arc::clone(&self.compression),
            generations,
        );
        {
            let traverser = ReadContext::new(&accessor);
            traverser.read_recursively(ReadRequest::new(
                DataType::Data,
                data_address.as_ref(),
                Some(&mut counter),
                self.log.clone(),
            ))?;
        }
        Ok(accessor.get_results())
    }

    fn read_generations(&self) -> io::Result<Vec<Generation>> {
        let mut list: Vec<_> = self
            .aio
            .list(PathBuf::new())
            .wait()?
            .iter()
            .filter_map(|path| path.file_name().and_then(|file| file.to_str()))
            .filter(|&item| {
                item != config::CONFIG_YML_FILE
                    && item != config::LOCK_FILE
                    && !item.ends_with(".yml")
            })
            .filter_map(|item| match Generation::try_from(item) {
                Ok(gen) => {
                    if self.aio.read_metadata(gen.config_path()).wait().is_ok()
                    {
                        Some(gen)
                    } else {
                        warn!(
                            self.log,
                            "skipping dead generation: `{}` (config missing)",
                            item,
                        );
                        None
                    }
                }
                Err(e) => {
                    warn!(
                        self.log,
                        "skipping unknown generation: `{}` due to: `{}`",
                        item,
                        e
                    );
                    None
                }
            })
            .collect();

        list.sort();
        Ok(list)
    }

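    /// Store the contents of `reader` under `name_str`.
    ///
    /// Input is chunked, deduplicated, compressed and encrypted by a pool of
    /// worker threads before being written to the backend.
    ///
    /// A minimal sketch; the name and passphrase are placeholders and error
    /// handling is elided:
    ///
    /// ```ignore
    /// let enc = repo.unlock_encrypt(&|| Ok("passphrase".to_string()))?;
    /// let stats = repo.write("backup-1", std::io::stdin(), &enc)?;
    /// ```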
    pub fn write<R>(
        &self,
        name_str: &str,
        reader: R,
        enc: &EncryptHandle,
    ) -> Result<WriteStats>
    where
        R: Read + Send,
    {
        info!(self.log, "Writing data"; "name" => name_str);
        let _lock = self.aio.lock_shared();

        let mut generations = self.read_generations()?;

        if generations.is_empty() {
            let gen_first = Generation::gen_first();
            gen_first.write(&self.aio)?;
            generations.push(gen_first);
        }

        let mut timer = slog_perf::TimeReporter::new_with_level(
            "write",
            self.log.clone(),
            Level::Info,
        );
        timer.start("write");
        let num_threads = num_cpus::get();
        let (chunker_tx, chunker_rx) =
            mpsc::sync_channel(self.write_cpu_thread_num());

        let backend = (self.backend_select)(&self.url)?;
        let aio = aio::AsyncIO::new(backend, self.log.clone())?;

        let stats = aio.stats();

        let (process_tx, process_rx) = crossbeam_channel::bounded(num_threads);

        let data_address = crossbeam::scope(|scope| {
            scope.spawn(move |_| self.input_reader_thread(reader, chunker_tx));

            for _ in 0..num_threads {
                let process_rx = process_rx.clone();
                let aio = aio.clone();
                let encrypter = Arc::clone(&enc.encrypter);
                let compression = Arc::clone(&self.compression);
                let hasher = Arc::clone(&self.hasher);
                let generations = generations.clone();
                scope.spawn(move |_| {
                    let processor = ChunkProcessor::new(
                        self.clone(),
                        process_rx,
                        aio,
                        encrypter,
                        compression,
                        hasher,
                        generations,
                    );
                    processor.run();
                });
            }
            drop(process_rx);

            let chunk_and_write = scope.spawn(move |_| {
                self.chunk_and_write_data_thread(
                    Box::new(chunker_rx.into_iter()),
                    process_tx,
                    aio,
                    DataType::Data,
                )
            });

            chunk_and_write.join()
        })
        .expect("non-joined thread panicked (chunk processor?)");

        let data_address = data_address.map_err(|e| {
            if let Some(io_e) = e.downcast_ref::<io::Error>() {
                io::Error::new(io_e.kind(), format!("{}", io_e))
            } else {
                io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
            }
        })?;

        let name: Name = data_address?.into();
        name.write_as(name_str, *generations.last().unwrap(), &self.aio)?;
        Ok(stats.get_stats())
    }
}

#[cfg(test)]
mod tests;