// rdedup_lib/lib.rs

// {{{ use and mod
use std::collections::HashSet;
use std::io;
use std::io::{Error, Read, Result, Write};
use std::iter::Iterator;
use std::path::{Path, PathBuf};
use std::sync::{mpsc, Arc};

use sgdata::SGData;
use slog::{info, o, warn, FnValue, Level, Logger};
use slog_perf::TimeReporter;
use sodiumoxide::crypto::{self, box_, secretbox};
use url::Url;

use rdedup_cdc as rollsum;

mod iterators;

mod config;

mod aio;
use crate::aio::*;

mod chunking;
mod hashing;

mod chunk_processor;
use crate::chunk_processor::*;

mod sorting_recv;
use crate::sorting_recv::SortingIterator;

mod encryption;
use crate::encryption::EncryptionEngine;

mod compression;
use crate::compression::ArcCompression;

mod pwhash;

pub mod settings;

mod util;
use self::util::*;

mod reading;
use self::reading::*;

mod generation;
use self::generation::*;

mod name;
use self::name::*;

mod misc;
use self::misc::*;
// }}}

// Fancy re-export of the backends API and particular backend structs
pub mod backends {
    pub use crate::aio::backend::{Backend, BackendThread, Lock};
    pub use crate::aio::Metadata;

    pub mod local {
        pub use crate::aio::local::{Local, LocalThread};
    }

    #[cfg(feature = "backend-b2")]
    pub mod b2 {
        pub use crate::aio::b2::{Auth, B2Thread, Lock, B2};
    }
}

type ArcDecrypter = Arc<dyn encryption::Decrypter + Send + Sync + 'static>;
type ArcEncrypter = Arc<dyn encryption::Encrypter + Send + Sync + 'static>;

const INGRESS_BUFFER_SIZE: usize = 128 * 1024;
const DIGEST_SIZE: usize = 32;

/// Type of a user-provided closure that will ask the user for a passphrase if needed
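///
/// # Example
///
/// An illustrative sketch (marked `ignore`, so it is not compiled as a
/// doc-test); the environment variable name is only an example:
///
/// ```ignore
/// let ask_pass: PassphraseFn<'_> = &|| {
///     std::env::var("EXAMPLE_PASSPHRASE")
///         .map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e))
/// };
/// ```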
pub type PassphraseFn<'a> = &'a dyn Fn() -> io::Result<String>;

/// Type of a user-provided closure that selects a backend based on a URL
pub type BackendSelectFn = &'static (dyn Fn(&Url) -> io::Result<Box<dyn backends::Backend + Send + Sync>>
              + Send
              + Sync);

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Data type (index/data)
pub enum DataType {
    Index,
    Data,
}

impl DataType {
    fn should_compress(&self) -> bool {
        *self == DataType::Data
    }

    fn should_encrypt(&self) -> bool {
        *self == DataType::Data
    }
}

pub struct VerifyResults {
    pub scanned: usize,
    pub errors: Vec<(Vec<u8>, Error)>,
}

pub struct DuResults {
    pub chunks: usize,
    pub bytes: u64,
}

/// A decryption handle
///
/// Used as an argument to operations that decrypt data.
pub struct DecryptHandle {
    decrypter: ArcDecrypter,
}

/// An encryption handle
///
/// Used as an argument to operations that encrypt data.
pub struct EncryptHandle {
    encrypter: ArcEncrypter,
}

// {{{ Repo
/// Rdedup repository handle
#[derive(Clone)]
pub struct Repo {
    url: Url,
    backend_select: BackendSelectFn,
    config: config::Repo,

    compression: compression::ArcCompression,
    hasher: hashing::ArcHasher,

    /// Logger
    log: slog::Logger,

    aio: aio::AsyncIO,
}

impl Repo {
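    /// Derive a decryption handle from the user's passphrase
    ///
    /// The returned handle is passed to read-side operations such as
    /// `read`, `du`, and `verify`.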
    pub fn unlock_decrypt(
        &self,
        pass: PassphraseFn<'_>,
    ) -> io::Result<DecryptHandle> {
        info!(self.log, "Opening read handle");
        let decrypter = self
            .config
            .encryption
            .decrypter(pass, &self.config.pwhash)?;

        Ok(DecryptHandle { decrypter })
    }

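    /// Derive an encryption handle from the user's passphrase
    ///
    /// The returned handle is required by `write`.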
    pub fn unlock_encrypt(
        &self,
        pass: PassphraseFn<'_>,
    ) -> io::Result<EncryptHandle> {
        info!(self.log, "Opening write handle");
        let encrypter = self
            .config
            .encryption
            .encrypter(pass, &self.config.pwhash)?;

        Ok(EncryptHandle { encrypter })
    }

    fn ensure_repo_empty_or_new(aio: &AsyncIO) -> Result<()> {
        let list = aio.list(PathBuf::from(".")).wait();

        if list.is_ok() && !list.unwrap().is_empty() {
            return Err(Error::new(
                io::ErrorKind::AlreadyExists,
                "repo dir must not exist or be empty to be used",
            ));
        }
        Ok(())
    }

    /// Create a new rdedup repository
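    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`, so it is not compiled as a
    /// doc-test); it assumes a local `file://` URL and a `settings` value
    /// built elsewhere:
    ///
    /// ```ignore
    /// let url = url::Url::parse("file:///tmp/my-repo").unwrap();
    /// let ask_pass = || -> std::io::Result<String> { Ok("secret".into()) };
    /// let repo = Repo::init(&url, &ask_pass, settings, None::<slog::Logger>)?;
    /// ```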
    pub fn init<L>(
        url: &Url,
        passphrase: PassphraseFn<'_>,
        settings: settings::Repo,
        log: L,
    ) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        Self::init_custom(
            url,
            &aio::backend_from_url,
            passphrase,
            settings,
            log,
        )
    }

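    /// Open an existing rdedup repository
    ///
    /// A minimal sketch (marked `ignore`, so it is not compiled as a
    /// doc-test):
    ///
    /// ```ignore
    /// let url = url::Url::parse("file:///tmp/my-repo").unwrap();
    /// let repo = Repo::open(&url, None::<slog::Logger>)?;
    /// ```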
    pub fn open<L>(url: &Url, log: L) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        Self::open_custom(url, &aio::backend_from_url, log)
    }

    /// Create a new rdedup repository using a custom backend selection function
    pub fn init_custom<L>(
        url: &Url,
        backend_select: BackendSelectFn,
        passphrase: PassphraseFn<'_>,
        settings: settings::Repo,
        log: L,
    ) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        let log = log
            .into()
            .unwrap_or_else(|| Logger::root(slog::Discard, o!()));

        let backend = backend_select(url)?;
        let aio = aio::AsyncIO::new(backend, log.clone())?;

        Repo::ensure_repo_empty_or_new(&aio)?;
        let config = config::Repo::new_from_settings(passphrase, settings)?;
        config.write(&aio)?;

        let compression = config.compression.to_engine();
        let hasher = config.hashing.to_hasher();

        Ok(Repo {
            url: url.clone(),
            backend_select,
            config,
            compression,
            hasher,
            log,
            aio,
        })
    }

    pub fn open_custom<L>(
        url: &Url,
        backend_select: BackendSelectFn,
        log: L,
    ) -> Result<Repo>
    where
        L: Into<Option<Logger>>,
    {
        let log = log
            .into()
            .unwrap_or_else(|| Logger::root(slog::Discard, o!()));

        let backend = backend_select(url)?;
        let aio = aio::AsyncIO::new(backend, log.clone())?;

        let config = config::Repo::read(&aio)?;

        let compression = config.compression.to_engine();
        let hasher = config.hashing.to_hasher();
        Ok(Repo {
            url: url.clone(),
            backend_select,
            config,
            compression,
            hasher,
            log,
            aio,
        })
    }

    /// Change the passphrase
    pub fn change_passphrase(
        &mut self,
        old_p: PassphraseFn<'_>,
        new_p: PassphraseFn<'_>,
    ) -> Result<()> {
        let _lock = self.aio.lock_exclusive();

        if self.config.version == 0 {
            Err(Error::new(
                io::ErrorKind::NotFound,
                "rdedup v0 config format not supported",
            ))
        } else {
            self.config.encryption.change_passphrase(
                old_p,
                new_p,
                &self.config.pwhash,
            )?;
            self.config.write(&self.aio)?;
            Ok(())
        }
    }

    /// Chunk the input data and write it to the repo, recursively building
    /// index chunks until a single top-level digest remains.
    fn chunk_and_write_data_thread<'a>(
        &'a self,
        input_data_iter: Box<dyn Iterator<Item = Vec<u8>> + Send + 'a>,
        process_tx: crossbeam_channel::Sender<chunk_processor::Message>,
        aio: aio::AsyncIO,
        data_type: DataType,
    ) -> io::Result<DataAddress> {
        // Note: this channel is intentionally unbounded.
        // The processing pipeline is effectively a cycle (really a recursive
        // spiral). If this channel were bounded, a deadlock would be
        // possible: the `index-processor` waits for the `chunker`, the
        // `chunker` waits for the `chunk-processor`, and the
        // `chunk-processor` waits for the `index-processor`.
        //
        // In practice there is always less index data than chunk data
        // (that's the whole point of keeping an index), so this channel
        // does not need to be bounded.
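        //
        // The recursion bottoms out when a level produces a single digest;
        // `index_level` records how many index indirections sit between the
        // returned digest and the data itself (0 means the digest addresses
        // the data directly).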
        let (digests_tx, digests_rx) = mpsc::channel();

        crossbeam::scope(move |scope| {
            let mut timer = slog_perf::TimeReporter::new_with_level(
                "index-processor",
                self.log.clone(),
                Level::Debug,
            );
            timer.start("spawn-chunker");

            scope.spawn({
                let process_tx = process_tx.clone();
                move |_| {
                    let mut timer = slog_perf::TimeReporter::new_with_level(
                        "chunker",
                        self.log.clone(),
                        Level::Debug,
                    );

                    let chunker = chunking::Chunker::new(
                        input_data_iter,
                        self.config.chunking.to_engine(),
                    );

                    let mut data = util::EnumerateU64::new(chunker);

                    while let Some(i_sg) =
                        timer.start_with("rx-and-chunking", || data.next())
                    {
                        timer.start("tx");
                        let (i, sg) = i_sg;
                        process_tx
                            .send(chunk_processor::Message {
                                data: (i as u64, sg),
                                response_tx: digests_tx.clone(),
                                data_type,
                            })
                            .expect("chunk process tx channel closed")
                    }
                    drop(digests_tx);
                }
            });

            timer.start("sorting-recv-create");
            let mut digests_rx = SortingIterator::new(digests_rx.into_iter());

            timer.start("digest-rx");
            let first_digest =
                digests_rx.next().expect("At least one index digest");

            if let Some(second_digest) =
                timer.start_with("digest-rx", || digests_rx.next())
            {
                let mut two_first = vec![first_digest, second_digest];
                let mut address = self.chunk_and_write_data_thread(
                    Box::new(
                        two_first
                            .drain(..)
                            .chain(digests_rx)
                            .map(|digest| digest.0),
                    ),
                    process_tx,
                    aio.clone(),
                    DataType::Index,
                )?;

                address.index_level += 1;
                Ok(address)
            } else {
                Ok(DataAddress {
                    index_level: 0,
                    digest: first_digest,
                })
            }
        })
        .expect("chunker thread failed")
    }

    /// Number of threads to use to parallelize the CPU-intensive part of
    /// the workload.
    fn write_cpu_thread_num(&self) -> usize {
        num_cpus::get()
    }

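    /// Read `reader` in `INGRESS_BUFFER_SIZE` blocks and feed them to the
    /// chunker.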
    fn input_reader_thread<R>(
        &self,
        reader: R,
        chunker_tx: mpsc::SyncSender<Vec<u8>>,
    ) where
        R: Read + Send,
    {
        let mut time = TimeReporter::new_with_level(
            "input-reader",
            self.log.clone(),
            Level::Debug,
        );

        let r2vi = ReaderVecIter::new(reader, INGRESS_BUFFER_SIZE);
        let mut while_ok = WhileOk::new(r2vi);

        while let Some(buf) = time.start_with("input", || while_ok.next()) {
            time.start("tx");
            chunker_tx.send(buf).expect("chunker tx channel closed")
        }

        if let Some(e) = while_ok.finish() {
            panic!("Input thread error: {}", e)
        }
    }

    fn get_chunk_accessor(
        &self,
        decrypter: Option<ArcDecrypter>,
        compression: ArcCompression,
        generations: Vec<Generation>,
    ) -> DefaultChunkAccessor<'_> {
        DefaultChunkAccessor::new(self, decrypter, compression, generations)
    }

    fn get_recording_chunk_accessor<'a>(
        &'a self,
        accessed: &'a mut HashSet<Vec<u8>>,
        decrypter: Option<ArcDecrypter>,
        compression: ArcCompression,
        generations: Vec<Generation>,
    ) -> RecordingChunkAccessor<'a> {
        RecordingChunkAccessor::new(
            self,
            accessed,
            decrypter,
            compression,
            generations,
        )
    }

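    /// Delete an old generation's data, but only if it is at least
    /// `min_age_secs` old; otherwise leave it for a later GC run.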
    fn wipe_generation_maybe(
        &self,
        gen: Generation,
        min_age_secs: u64,
    ) -> io::Result<()> {
        let gen_config = match gen.load_config(&self.aio) {
            Ok(c) => c,
            Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
                info!(
                    self.log,
                    "Generation config file not found. Rerun GC later to finish";
                );

                return Ok(());
            }
            Err(e) => return Err(e),
        };

        if gen_config.created + chrono::Duration::seconds(min_age_secs as i64)
            > chrono::Utc::now()
        {
            info!(
                self.log,
                "Generation is not old enough. Rerun GC later to finish";
                "gen" => FnValue(|_| gen.to_string()),
                "gen-created" => gen_config.created.to_rfc3339(),
                "now" => chrono::Utc::now().to_rfc3339(),
            );
            return Ok(());
        }
        info!(
            self.log,
            "Reclaiming old generation finished. Deleting...";
            "gen" => FnValue(|_| gen.to_string()),
        );

        // Make sure chunks are successfully removed before
        // attempting to delete the generation dir itself
        // so that we don't leave garbage with no Generation
        // config file.
        substitute_err_not_found(
            self.aio
                .remove_dir_all(
                    PathBuf::from(gen.to_string()).join(NAME_SUBDIR),
                )
                .wait(),
            || (),
        )?;

        substitute_err_not_found(
            self.aio
                .remove_dir_all(
                    PathBuf::from(gen.to_string()).join(config::DATA_SUBDIR),
                )
                .wait(),
            || (),
        )?;

        self.aio
            .remove_dir_all(PathBuf::from(gen.to_string()))
            .wait()?;

        Ok(())
    }

    fn update_name_to(
        &self,
        name_str: &str,
        cur_gen: Generation,
        generations: &[Generation],
    ) -> io::Result<()> {
        // Traverse all the chunks (both index and data) reachable from this
        // name and move them to the newest generation.
        info!(
            self.log,
            "Updating name to current generation";
            "name" => name_str,
            "gen" => FnValue(|_| cur_gen.to_string())
        );
        let name = Name::load_from_any(name_str, generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let accessor = GenerationUpdateChunkAccessor::new(
            self,
            Arc::clone(&self.compression),
            generations.to_vec(),
        );
        {
            let traverser = ReadContext::new(&accessor);
            traverser.read_recursively(ReadRequest::new(
                DataType::Data,
                data_address.as_ref(),
                None,
                self.log.clone(),
            ))?;
        }

        Name::update_generation_to(name_str, cur_gen, generations, &self.aio)?;

        Ok(())
    }

    fn reachable_recursively_insert(
        &self,
        da: DataAddressRef<'_>,
        reachable_digests: &mut HashSet<Vec<u8>>,
        generations: Vec<Generation>,
    ) -> Result<()> {
        reachable_digests.insert(da.digest.0.into());

        let accessor = self.get_recording_chunk_accessor(
            reachable_digests,
            None,
            Arc::clone(&self.compression),
            generations,
        );
        let traverser = ReadContext::new(&accessor);
        traverser.read_recursively(ReadRequest::new(
            DataType::Data,
            da,
            None,
            self.log.clone(),
        ))
    }

    /// Return all reachable chunks
    #[allow(dead_code)] // tests
    fn list_reachable_chunks(&self) -> Result<HashSet<Vec<u8>>> {
        let generations = self.read_generations()?;
        let mut reachable_digests = HashSet::new();
        let all_names = Name::list_all(&generations, &self.aio)?;
        for name_str in &all_names {
            match Name::load_from_any(name_str, &generations, &self.aio) {
                Ok(name) => {
                    let data_address: DataAddress = name.into();
                    info!(self.log, "processing"; "name" => name_str);
                    self.reachable_recursively_insert(
                        data_address.as_ref(),
                        &mut reachable_digests,
                        generations.clone(),
                    )?;
                }
                Err(e) => {
                    info!(
                        self.log,
                        "skipped";
                        "name" => name_str, "error" => e.to_string()
                    );
                }
            }
        }

        Ok(reachable_digests)
    }

    fn chunk_rel_path_by_digest(
        &self,
        digest: DigestRef<'_>,
        gen_str: &str,
    ) -> PathBuf {
        self.config.nesting.get_path(
            Path::new(config::DATA_SUBDIR),
            digest.0,
            gen_str,
        )
    }

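    /// List all names stored in the repository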
    pub fn list_names(&self) -> io::Result<Vec<String>> {
        let _lock = self.aio.lock_shared();
        Name::list_all(&self.read_generations()?, &self.aio)
    }

    /// Remove a stored name from the repo
    pub fn rm(&self, name: &str) -> Result<()> {
        let _lock = self.aio.lock_exclusive();
        Name::remove_any(name, &self.read_generations()?, &self.aio)
    }

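    /// Garbage-collect chunks no longer reachable from any name
    ///
    /// A sketch of the scheme implemented below: under an exclusive lock a
    /// new generation is created (or a previously interrupted GC run is
    /// resumed), every name still stored in the oldest generation is moved
    /// into the current one together with all chunks it reaches, and once
    /// the oldest generation holds no names it is deleted, provided it is
    /// at least `min_age_secs` old.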
    pub fn gc(&self, min_age_secs: u64) -> Result<()> {
        let _lock = self.aio.lock_exclusive();

        let generations = self.read_generations()?;

        if generations.is_empty() {
            info!(self.log, "Nothing in the repository yet, nothing to gc");
            return Ok(());
        }

        if generations.len() == 1 {
            let new_gen = generations.last().unwrap().gen_next();
            info!(self.log, "Creating new generation"; "gen" => FnValue(|_| new_gen.to_string()));
            new_gen.write(&self.aio)?;
        } else {
            info!(
                self.log,
                "Restarting previous GC operation";
                "gen" => FnValue(|_| generations.last().unwrap().to_string())
            );
        }

        loop {
            let generations = self.read_generations()?;
            assert!(!generations.is_empty());
            if generations.len() == 1 {
                info!(
                    self.log,
                    "One generation left - GC cycle complete";
                    "gen" => FnValue(|_| generations[0].to_string())
                );
                return Ok(());
            }
            let gen_oldest = generations[0];
            let gen_cur = generations.last().unwrap();

            let names = Name::list(gen_oldest, &self.aio)?;

            info!(
                self.log,
                "Names left in the generation to be GCed";
                "count" => names.len(),
                "gen" => FnValue(|_| gen_oldest.to_string())
            );
            if names.is_empty() {
                self.wipe_generation_maybe(gen_oldest, min_age_secs)?;
                return Ok(());
            }
            self.update_name_to(&names[0], *gen_cur, &generations)?;
        }
    }

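    /// Read the data stored under `name_str` and write it to `writer`
    ///
    /// A minimal sketch (marked `ignore`, so it is not compiled as a
    /// doc-test); the stored name and the passphrase closure are assumed
    /// to exist:
    ///
    /// ```ignore
    /// let dec = repo.unlock_decrypt(&ask_pass)?;
    /// let mut out = Vec::new();
    /// repo.read("my-backup", &mut out, &dec)?;
    /// ```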
    pub fn read<W: Write>(
        &self,
        name_str: &str,
        writer: &mut W,
        dec: &DecryptHandle,
    ) -> Result<()> {
        let _lock = self.aio.lock_shared();

        let generations = self.read_generations()?;

        let name = Name::load_from_any(name_str, &generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let accessor = self.get_chunk_accessor(
            Some(Arc::clone(&dec.decrypter)),
            Arc::clone(&self.compression),
            generations,
        );
        let traverser = ReadContext::new(&accessor);
        traverser.read_recursively(ReadRequest::new(
            DataType::Data,
            data_address.as_ref(),
            Some(writer),
            self.log.clone(),
        ))
    }

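    /// Measure the data stored under `name_str`: the number of chunks
    /// traversed and the number of bytes it decodes to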
    pub fn du(&self, name_str: &str, dec: &DecryptHandle) -> Result<DuResults> {
        let _lock = self.aio.lock_shared();

        let generations = self.read_generations()?;
        let name = Name::load_from_any(name_str, &generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let mut counter = CounterWriter::new();
        let accessor = VerifyingChunkAccessor::new(
            self,
            Some(Arc::clone(&dec.decrypter)),
            Arc::clone(&self.compression),
            generations,
        );
        {
            let traverser = ReadContext::new(&accessor);
            traverser.read_recursively(ReadRequest::new(
                DataType::Data,
                data_address.as_ref(),
                Some(&mut counter),
                self.log.clone(),
            ))?;
        }
        Ok(DuResults {
            chunks: accessor.get_results().scanned,
            bytes: counter.count,
        })
    }

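    /// Verify integrity of the data stored under `name_str`, returning the
    /// number of chunks scanned and any per-chunk errors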
    pub fn verify(
        &self,
        name_str: &str,
        dec: &DecryptHandle,
    ) -> Result<VerifyResults> {
        let _lock = self.aio.lock_shared();

        let generations = self.read_generations()?;

        let name = Name::load_from_any(name_str, &generations, &self.aio)?;
        let data_address: DataAddress = name.into();

        let mut counter = CounterWriter::new();
        let accessor = VerifyingChunkAccessor::new(
            self,
            Some(Arc::clone(&dec.decrypter)),
            Arc::clone(&self.compression),
            generations,
        );
        {
            let traverser = ReadContext::new(&accessor);
            traverser.read_recursively(ReadRequest::new(
                DataType::Data,
                data_address.as_ref(),
                Some(&mut counter),
                self.log.clone(),
            ))?;
        }
        Ok(accessor.get_results())
    }

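    /// List the generations present in the repository, oldest first
    ///
    /// Entries that do not parse as a generation, or whose generation
    /// config is missing, are skipped with a warning.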
    fn read_generations(&self) -> io::Result<Vec<Generation>> {
        let mut list: Vec<_> = self
            .aio
            .list(PathBuf::new())
            .wait()?
            .iter()
            .filter_map(|path| path.file_name().and_then(|file| file.to_str()))
            .filter(|&item| {
                item != config::CONFIG_YML_FILE
                    && item != config::LOCK_FILE
                    && !item.ends_with(".yml")
            })
            .filter_map(|item| match Generation::try_from(item) {
                Ok(gen) => {
                    if self.aio.read_metadata(gen.config_path()).wait().is_ok()
                    {
                        Some(gen)
                    } else {
                        warn!(
                            self.log,
                            "skipping dead generation: `{}` (config missing)",
                            item,
                        );
                        None
                    }
                }
                Err(e) => {
                    warn!(
                        self.log,
                        "skipping unknown generation: `{}` due to: `{}`",
                        item,
                        e
                    );
                    None
                }
            })
            .collect();

        list.sort();
        Ok(list)
    }

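    /// Store the data read from `reader` under `name_str`
    ///
    /// A minimal sketch (marked `ignore`, so it is not compiled as a
    /// doc-test); the file path and the passphrase closure are only
    /// examples:
    ///
    /// ```ignore
    /// let enc = repo.unlock_encrypt(&ask_pass)?;
    /// let input = std::fs::File::open("/tmp/data.tar")?;
    /// let stats = repo.write("my-backup", input, &enc)?;
    /// ```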
    pub fn write<R>(
        &self,
        name_str: &str,
        reader: R,
        enc: &EncryptHandle,
    ) -> Result<WriteStats>
    where
        R: Read + Send,
    {
        info!(self.log, "Writing data"; "name" => name_str);
        let _lock = self.aio.lock_shared();

        let mut generations = self.read_generations()?;

        if generations.is_empty() {
            let gen_first = Generation::gen_first();
            gen_first.write(&self.aio)?;
            generations.push(gen_first);
        }

        let mut timer = slog_perf::TimeReporter::new_with_level(
            "write",
            self.log.clone(),
            Level::Info,
        );
        timer.start("write");
        let num_threads = num_cpus::get();
        let (chunker_tx, chunker_rx) =
            mpsc::sync_channel(self.write_cpu_thread_num());

        let backend = (self.backend_select)(&self.url)?;
        let aio = aio::AsyncIO::new(backend, self.log.clone())?;

        let stats = aio.stats();

        // MPMC queue used as an SPMC fan-out
        let (process_tx, process_rx) = crossbeam_channel::bounded(num_threads);

        let data_address = crossbeam::scope(|scope| {
            scope.spawn(move |_| self.input_reader_thread(reader, chunker_tx));

            for _ in 0..num_threads {
                let process_rx = process_rx.clone();
                let aio = aio.clone();
                let encrypter = Arc::clone(&enc.encrypter);
                let compression = Arc::clone(&self.compression);
                let hasher = Arc::clone(&self.hasher);
                let generations = generations.clone();
                scope.spawn(move |_| {
                    let processor = ChunkProcessor::new(
                        self.clone(),
                        process_rx,
                        aio,
                        encrypter,
                        compression,
                        hasher,
                        generations,
                    );
                    processor.run();
                });
            }
            drop(process_rx);

            let chunk_and_write = scope.spawn(move |_| {
                self.chunk_and_write_data_thread(
                    Box::new(chunker_rx.into_iter()),
                    process_tx,
                    aio,
                    DataType::Data,
                )
            });

            chunk_and_write.join()
        })
        .expect("non-joined thread panicked (chunk processor?)");

        let data_address = data_address.map_err(|e| {
            if let Some(io_e) = e.downcast_ref::<io::Error>() {
                io::Error::new(io_e.kind(), format!("{}", io_e))
            } else {
                io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
            }
        })?;

        let name: Name = data_address?.into();
        name.write_as(name_str, *generations.last().unwrap(), &self.aio)?;
        Ok(stats.get_stats())
    }
}
// }}}

#[cfg(test)]
mod tests;

// vim: foldmethod=marker foldmarker={{{,}}}