rustic_core/blob/packer.rs

1use std::{
2    num::NonZeroU32,
3    sync::{Arc, RwLock},
4    time::{Duration, SystemTime},
5};
6
7use bytes::{Bytes, BytesMut};
8use chrono::Local;
9use crossbeam_channel::{Receiver, Sender, bounded};
10use integer_sqrt::IntegerSquareRoot;
11use log::warn;
12use pariter::{IteratorExt, scope};
13
14use crate::{
15    backend::{
16        FileType,
17        decrypt::{DecryptFullBackend, DecryptWriteBackend},
18    },
19    blob::{BlobId, BlobType},
20    crypto::{CryptoKey, hasher::hash},
21    error::{ErrorKind, RusticError, RusticResult},
22    index::indexer::SharedIndexer,
23    repofile::{
24        configfile::ConfigFile,
25        indexfile::{IndexBlob, IndexPack},
26        packfile::{PackHeaderLength, PackHeaderRef, PackId},
27        snapshotfile::SnapshotSummary,
28    },
29};
30
/// [`PackerErrorKind`] describes the errors that can be returned for a Packer
// NOTE: with `displaydoc::Display`, the `///` doc comment on each variant *is*
// the `Display` format string — do not edit those lines unless you intend to
// change the rendered error message.
#[derive(thiserror::Error, Debug, displaydoc::Display)]
#[non_exhaustive]
pub enum PackerErrorKind {
    /// Conversion from `{from}` to `{to}` failed: `{source}`
    Conversion {
        // name of the target type of the failed integer conversion
        to: &'static str,
        // name of the source type of the failed integer conversion
        from: &'static str,
        source: std::num::TryFromIntError,
    },
    /// Sending crossbeam message failed: `size_limit`: `{size_limit:?}`, `id`: `{id:?}`, `data`: `{data:?}` : `{source}`
    // Returned when the channel from `Packer` to its worker thread is
    // disconnected; the undeliverable message is kept for diagnostics.
    SendingCrossbeamMessage {
        size_limit: Option<u32>,
        id: BlobId,
        data: Bytes,
        source: crossbeam_channel::SendError<(Bytes, BlobId, Option<u32>)>,
    },
    /// Sending crossbeam data message failed: `data`: `{data:?}`, `index_pack`: `{index_pack:?}` : `{source}`
    // Returned when the channel from `RawPacker` to the pack-file writing
    // `Actor` is disconnected.
    SendingCrossbeamDataMessage {
        data: Bytes,
        index_pack: IndexPack,
        source: crossbeam_channel::SendError<(Bytes, IndexPack)>,
    },
}
55
/// Result type for packer operations; the error is boxed to keep the `Err`
/// variant (and thus every `Result`) small.
pub(crate) type PackerResult<T> = Result<T, Box<PackerErrorKind>>;
57
/// Limits at which an in-progress pack file is flushed to the backend.
pub(super) mod constants {
    use std::time::Duration;

    /// Kilobyte in bytes
    pub(super) const KB: u32 = 1024;
    /// Megabyte in bytes
    pub(super) const MB: u32 = 1024 * KB;
    /// The absolute maximum size of a pack: including headers it should not exceed 4 GB
    // 4076 MiB, i.e. slightly below 4 GiB, leaving headroom for the pack header.
    pub(super) const MAX_SIZE: u32 = 4076 * MB;
    /// The maximum number of blobs in a pack
    pub(super) const MAX_COUNT: u32 = 10_000;
    /// The maximum age of a pack
    // A pack that stays open longer than this is flushed even if still small.
    pub(super) const MAX_AGE: Duration = Duration::from_secs(300);
}
72
/// The pack sizer is responsible for computing the size of the pack file.
///
/// The target size grows with the square root of the total size already
/// packed (see `PackSizer::pack_size`), bounded by `size_limit`.
#[derive(Debug, Clone, Copy)]
pub struct PackSizer {
    /// The default size of a pack file.
    default_size: u32,
    /// The grow factor of a pack file.
    grow_factor: u32,
    /// The size limit of a pack file.
    size_limit: u32,
    /// The current size of a pack file.
    // Total size packed so far; `u64` because it can exceed the `u32`
    // per-pack limits.
    current_size: u64,
    /// The minimum pack size tolerance in percent before a repack is triggered.
    min_packsize_tolerate_percent: u32,
    /// The maximum pack size tolerance in percent before a repack is triggered.
    max_packsize_tolerate_percent: u32,
}
89
90impl PackSizer {
91    /// Creates a new `PackSizer` from a config file.
92    ///
93    /// # Arguments
94    ///
95    /// * `config` - The config file.
96    /// * `blob_type` - The blob type.
97    /// * `current_size` - The current size of the pack file.
98    ///
99    /// # Returns
100    ///
101    /// A new `PackSizer`.
102    #[must_use]
103    pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self {
104        let (default_size, grow_factor, size_limit) = config.packsize(blob_type);
105        let (min_packsize_tolerate_percent, max_packsize_tolerate_percent) =
106            config.packsize_ok_percents();
107        Self {
108            default_size,
109            grow_factor,
110            size_limit,
111            current_size,
112            min_packsize_tolerate_percent,
113            max_packsize_tolerate_percent,
114        }
115    }
116
117    /// Computes the size of the pack file.
118    #[must_use]
119    // The cast actually shouldn't pose any problems.
120    // `current_size` is `u64`, the maximum value is `2^64-1`.
121    // `isqrt(2^64-1) = 2^32-1` which fits into a `u32`. (@aawsome)
122    #[allow(clippy::cast_possible_truncation)]
123    pub fn pack_size(&self) -> u32 {
124        (self.current_size.integer_sqrt() as u32 * self.grow_factor + self.default_size)
125            .min(self.size_limit)
126            .min(constants::MAX_SIZE)
127    }
128
129    /// Evaluates whether the given size is not too small or too large
130    ///
131    /// # Arguments
132    ///
133    /// * `size` - The size to check
134    #[must_use]
135    pub fn size_ok(&self, size: u32) -> bool {
136        !self.is_too_small(size) && !self.is_too_large(size)
137    }
138
139    /// Evaluates whether the given size is too small
140    ///
141    /// # Arguments
142    ///
143    /// * `size` - The size to check
144    #[must_use]
145    pub fn is_too_small(&self, size: u32) -> bool {
146        let target_size = self.pack_size();
147        // Note: we cast to u64 so that no overflow can occur in the multiplications
148        u64::from(size) * 100
149            < u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent)
150    }
151
152    /// Evaluates whether the given size is too large
153    ///
154    /// # Arguments
155    ///
156    /// * `size` - The size to check
157    #[must_use]
158    pub fn is_too_large(&self, size: u32) -> bool {
159        let target_size = self.pack_size();
160        // Note: we cast to u64 so that no overflow can occur in the multiplications
161        u64::from(size) * 100
162            > u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent)
163    }
164
165    /// Adds the given size to the current size.
166    ///
167    /// # Arguments
168    ///
169    /// * `added` - The size to add
170    ///
171    /// # Panics
172    ///
173    /// * If the size is too large
174    fn add_size(&mut self, added: u32) {
175        self.current_size += u64::from(added);
176    }
177}
178
/// The `Packer` is responsible for packing blobs into pack files.
///
/// Blobs are handed over a bounded channel to a background thread which
/// compresses/encrypts and packs them (see `Packer::new`).
///
/// # Type Parameters
///
/// * `BE` - The backend type.
#[allow(missing_debug_implementations)]
#[allow(clippy::struct_field_names)]
#[derive(Clone)]
pub struct Packer<BE: DecryptWriteBackend> {
    /// The raw packer wrapped in an `Arc` and `RwLock`.
    // This is a hack: raw_packer and indexer are only used in the add_raw() method.
    // TODO: Refactor as actor, like the other add() methods
    raw_packer: Arc<RwLock<RawPacker<BE>>>,
    /// The shared indexer containing the backend.
    indexer: SharedIndexer<BE>,
    /// The sender to send blobs to the raw packer.
    sender: Sender<(Bytes, BlobId, Option<u32>)>,
    /// The receiver to receive the status from the raw packer.
    // Yields exactly one message: the final stats (or error) on finalize.
    finish: Receiver<RusticResult<PackerStats>>,
}
199
impl<BE: DecryptWriteBackend> Packer<BE> {
    /// Creates a new `Packer`.
    ///
    /// Spawns a background thread running a pipeline which deduplicates,
    /// processes (compresses/encrypts) and packs incoming blobs.
    ///
    /// # Type Parameters
    ///
    /// * `BE` - The backend type.
    ///
    /// # Arguments
    ///
    /// * `be` - The backend to write to.
    /// * `blob_type` - The blob type.
    /// * `indexer` - The indexer to write to.
    /// * `config` - The config file.
    /// * `total_size` - The total size of the pack file.
    ///
    /// # Errors
    ///
    /// * If sending the message to the raw packer fails.
    /// * If converting the data length to u64 fails
    #[allow(clippy::unnecessary_wraps)]
    pub fn new(
        be: BE,
        blob_type: BlobType,
        indexer: SharedIndexer<BE>,
        config: &ConfigFile,
        total_size: u64,
    ) -> RusticResult<Self> {
        let raw_packer = Arc::new(RwLock::new(RawPacker::new(
            be.clone(),
            blob_type,
            indexer.clone(),
            config,
            total_size,
        )));

        // zero-capacity (rendezvous) channels: senders block until the
        // worker thread is ready to take the next item
        let (tx, rx) = bounded(0);
        let (finish_tx, finish_rx) = bounded::<RusticResult<PackerStats>>(0);
        let packer = Self {
            raw_packer: raw_packer.clone(),
            indexer: indexer.clone(),
            sender: tx,
            finish: finish_rx,
        };

        // worker thread: runs until `tx` is dropped (see `finalize`), then
        // reports the final status through `finish_tx`
        let _join_handle = std::thread::spawn(move || {
            scope(|scope| {
                let status = rx
                    .into_iter()
                    .readahead_scoped(scope)
                    // early check if id is already contained
                    .filter(|(_, id, _)| !indexer.read().unwrap().has(id))
                    .filter(|(_, id, _)| !raw_packer.read().unwrap().has(id))
                    .readahead_scoped(scope)
                    // compress/encrypt blobs in parallel
                    .parallel_map_scoped(
                        scope,
                        |(data, id, size_limit): (Bytes, BlobId, Option<u32>)| {
                            let (data, data_len, uncompressed_length) = be.process_data(&data)?;
                            Ok((
                                data,
                                id,
                                u64::from(data_len),
                                uncompressed_length,
                                size_limit,
                            ))
                        },
                    )
                    .readahead_scoped(scope)
                    // check again if id is already contained
                    // TODO: We may still save duplicate blobs - the indexer is only updated when the packfile write has completed
                    .filter(|res| {
                        res.as_ref().map_or_else(
                            |_| true,
                            |(_, id, _, _, _)| !indexer.read().unwrap().has(id),
                        )
                    })
                    // append each processed blob to the current pack file;
                    // stops on the first error
                    .try_for_each(|item: RusticResult<_>| -> RusticResult<()> {
                        let (data, id, data_len, ul, size_limit) = item?;
                        raw_packer
                            .write()
                            .unwrap()
                            .add_raw(&data, &id, data_len, ul, size_limit)
                    })
                    .and_then(|()| raw_packer.write().unwrap().finalize());
                // ignore a send error: it only means nobody is waiting for
                // the status anymore
                _ = finish_tx.send(status);
            })
            .unwrap();
        });

        Ok(packer)
    }

    /// Adds the blob to the packfile
    ///
    /// # Arguments
    ///
    /// * `data` - The blob data
    /// * `id` - The blob id
    ///
    /// # Errors
    ///
    /// * If sending the message to the raw packer fails.
    pub fn add(&self, data: Bytes, id: BlobId) -> RusticResult<()> {
        // compute size limit based on total size and size bounds
        self.add_with_sizelimit(data, id, None).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to add blob `{id}` to packfile.",
                err,
            )
            .attach_context("id", id.to_string())
            .ask_report()
        })
    }

    /// Adds the blob to the packfile, allows specifying a size limit for the pack file
    ///
    /// # Arguments
    ///
    /// * `data` - The blob data
    /// * `id` - The blob id
    /// * `size_limit` - The size limit for the pack file
    ///
    /// # Errors
    ///
    /// * If sending the message to the raw packer fails.
    fn add_with_sizelimit(
        &self,
        data: Bytes,
        id: BlobId,
        size_limit: Option<u32>,
    ) -> PackerResult<()> {
        // `Bytes::clone` is a cheap refcount bump; the clone is kept for the
        // error report should the send fail
        self.sender
            .send((data.clone(), id, size_limit))
            .map_err(|err| PackerErrorKind::SendingCrossbeamMessage {
                size_limit,
                id,
                data,
                source: err,
            })?;
        Ok(())
    }

    /// Adds the already encrypted (and maybe compressed) blob to the packfile
    ///
    /// # Arguments
    ///
    /// * `data` - The blob data
    /// * `id` - The blob id
    /// * `data_len` - The length of the blob data
    /// * `uncompressed_length` - The length of the blob data before compression
    /// * `size_limit` - The size limit for the pack file
    ///
    /// # Errors
    ///
    /// * If the blob is already present in the index
    /// * If sending the message to the raw packer fails.
    fn add_raw(
        &self,
        data: &[u8],
        id: &BlobId,
        data_len: u64,
        uncompressed_length: Option<NonZeroU32>,
        size_limit: Option<u32>,
    ) -> RusticResult<()> {
        // only add if this blob is not present
        if self.indexer.read().unwrap().has(id) {
            Ok(())
        } else {
            self.raw_packer.write().unwrap().add_raw(
                data,
                id,
                data_len,
                uncompressed_length,
                size_limit,
            )
        }
    }

    /// Finalizes the packer and does cleanup
    ///
    /// # Panics
    ///
    /// * If the channel could not be dropped
    pub fn finalize(self) -> RusticResult<PackerStats> {
        // cancel channel
        drop(self.sender);
        // wait for items in channel to be processed
        self.finish
            .recv()
            .expect("Should be able to receive from channel to finalize packer.")
    }
}
392
/// Statistics about the blobs and bytes added by a packer.
#[derive(Default, Debug, Clone, Copy)]
pub struct PackerStats {
    /// The number of blobs added
    blobs: u64,
    // Total (unpacked) size in bytes of the added blobs; `RawPacker::add_raw`
    // accumulates `data_len` here.
    data: u64,
    // Total packed (on-disk) size in bytes of the added blobs;
    // `RawPacker::add_raw` accumulates the processed data length here.
    data_packed: u64,
}
403
impl PackerStats {
    /// Adds the stats to the summary
    ///
    /// # Arguments
    ///
    /// * `summary` - The summary to add to
    /// * `tpe` - The blob type
    pub fn apply(self, summary: &mut SnapshotSummary, tpe: BlobType) {
        // totals, counted regardless of the blob type
        summary.data_added += self.data;
        summary.data_added_packed += self.data_packed;
        // exhaustive match over `BlobType` — cannot panic
        match tpe {
            BlobType::Tree => {
                summary.tree_blobs += self.blobs;
                summary.data_added_trees += self.data;
                summary.data_added_trees_packed += self.data_packed;
            }
            BlobType::Data => {
                summary.data_blobs += self.blobs;
                summary.data_added_files += self.data;
                summary.data_added_files_packed += self.data_packed;
            }
        }
    }
}
432
/// The `RawPacker` is responsible for packing blobs into pack files.
///
/// It accumulates already-processed blobs in an in-memory pack file and
/// hands finished packs to an `Actor` which writes them to the backend.
///
/// # Type Parameters
///
/// * `BE` - The backend type.
#[allow(missing_debug_implementations, clippy::module_name_repetitions)]
pub(crate) struct RawPacker<BE: DecryptWriteBackend> {
    /// The backend to write to.
    be: BE,
    /// The blob type to pack.
    blob_type: BlobType,
    /// The file to write to
    // in-memory buffer holding the current, not yet finished pack file
    file: BytesMut,
    /// The size of the file
    // number of bytes already written into `file`
    size: u32,
    /// The number of blobs in the pack
    count: u32,
    /// The time the pack was created
    // used to flush packs older than `constants::MAX_AGE`
    created: SystemTime,
    /// The index of the pack
    index: IndexPack,
    /// The actor to write the pack file
    // `Some` until `finalize()` consumes it
    file_writer: Option<Actor>,
    /// The pack sizer
    pack_sizer: PackSizer,
    /// The packer stats
    stats: PackerStats,
}
461
impl<BE: DecryptWriteBackend> RawPacker<BE> {
    /// Creates a new `RawPacker`.
    ///
    /// # Type Parameters
    ///
    /// * `BE` - The backend type.
    ///
    /// # Arguments
    ///
    /// * `be` - The backend to write to.
    /// * `blob_type` - The blob type.
    /// * `indexer` - The indexer to write to.
    /// * `config` - The config file.
    /// * `total_size` - The total size of the pack file.
    fn new(
        be: BE,
        blob_type: BlobType,
        indexer: SharedIndexer<BE>,
        config: &ConfigFile,
        total_size: u64,
    ) -> Self {
        // actor writing finished pack files to the backend on its own thread
        // (queue length 1, single writer)
        let file_writer = Some(Actor::new(
            FileWriterHandle {
                be: be.clone(),
                indexer,
                cacheable: blob_type.is_cacheable(),
            },
            1,
            1,
        ));

        let pack_sizer = PackSizer::from_config(config, blob_type, total_size);

        Self {
            be,
            blob_type,
            file: BytesMut::new(),
            size: 0,
            count: 0,
            created: SystemTime::now(),
            index: IndexPack::default(),
            file_writer,
            pack_sizer,
            stats: PackerStats::default(),
        }
    }

    /// Saves the packfile and returns the stats
    ///
    /// # Errors
    ///
    /// * If the packfile could not be saved
    ///
    /// # Panics
    ///
    /// * If called twice: `file_writer` is `Some` until this method takes it.
    fn finalize(&mut self) -> RusticResult<PackerStats> {
        self.save().map_err(|err| {
            err.overwrite_kind(ErrorKind::Internal)
                .prepend_guidance_line("Failed to save packfile. Data may be lost.")
                .ask_report()
        })?;

        // consume the actor and wait until all queued packs are written
        self.file_writer.take().unwrap().finalize()?;

        Ok(std::mem::take(&mut self.stats))
    }

    /// Writes the given data to the packfile.
    ///
    /// # Arguments
    ///
    /// * `data` - The data to write.
    ///
    /// # Returns
    ///
    /// The number of bytes written.
    ///
    /// # Errors
    ///
    /// * If the data length does not fit into a `u32`.
    fn write_data(&mut self, data: &[u8]) -> PackerResult<u32> {
        let len = data
            .len()
            .try_into()
            .map_err(|err| PackerErrorKind::Conversion {
                to: "u32",
                from: "usize",
                source: err,
            })?;
        self.file.extend_from_slice(data);
        // NOTE(review): assumes `size + len` stays below `u32::MAX`; packs are
        // flushed at `size_limit` (<= `constants::MAX_SIZE`), but a single very
        // large blob added near the limit could overflow — confirm upstream.
        self.size += len;
        Ok(len)
    }

    /// Adds the already compressed/encrypted blob to the packfile without any check
    ///
    /// # Arguments
    ///
    /// * `data` - The blob data
    /// * `id` - The blob id
    /// * `data_len` - The length of the blob data
    /// * `uncompressed_length` - The length of the blob data before compression
    /// * `size_limit` - The size limit for the pack file
    ///
    /// # Errors
    ///
    /// * If converting the data length to u64 fails
    fn add_raw(
        &mut self,
        data: &[u8],
        id: &BlobId,
        data_len: u64,
        uncompressed_length: Option<NonZeroU32>,
        size_limit: Option<u32>,
    ) -> RusticResult<()> {
        // skip blobs already contained in the current pack
        if self.has(id) {
            return Ok(());
        }
        self.stats.blobs += 1;

        self.stats.data += data_len;

        let data_len_packed: u64 = data.len().try_into().map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to convert data length `{length}` to u64.",
                err,
            )
            .attach_context("length", data.len().to_string())
        })?;

        self.stats.data_packed += data_len_packed;

        // without an explicit limit, ask the pack sizer for the target size
        let size_limit = size_limit.unwrap_or_else(|| self.pack_sizer.pack_size());

        let offset = self.size;

        let len = self.write_data(data).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write data to packfile for blob `{id}`.",
                err,
            )
            .attach_context("id", id.to_string())
            .attach_context("size_limit", size_limit.to_string())
            .attach_context("data_length_packed", data_len_packed.to_string())
        })?;

        self.index
            .add(*id, self.blob_type, offset, len, uncompressed_length);

        self.count += 1;

        // check if PackFile needs to be saved
        let elapsed = self.created.elapsed().unwrap_or_else(|err| {
            warn!("couldn't get elapsed time from system time: {err:?}");
            Duration::ZERO
        });

        if self.count >= constants::MAX_COUNT
            || self.size >= size_limit
            || elapsed >= constants::MAX_AGE
        {
            self.pack_sizer.add_size(self.index.pack_size());
            self.save()?;
            // start a fresh pack file
            self.size = 0;
            self.count = 0;
            self.created = SystemTime::now();
        }
        Ok(())
    }

    /// Writes header and length of header to packfile
    ///
    /// # Errors
    ///
    /// * If converting the header length to u32 fails
    /// * If the header could not be written
    fn write_header(&mut self) -> RusticResult<()> {
        // compute the pack header
        let data = PackHeaderRef::from_index_pack(&self.index)
            .to_binary()
            .map_err(|err| -> Box<RusticError> {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to convert pack header `{index_pack_id}` to binary representation.",
                    err,
                )
                .attach_context("index_pack_id", self.index.id.to_string())
            })?;

        // encrypt and write to pack file
        let data = self.be.key().encrypt_data(&data)?;

        let headerlen: u32 = data.len().try_into().map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to convert header length `{length}` to u32.",
                err,
            )
            .attach_context("length", data.len().to_string())
        })?;

        // write header to pack file
        _ = self.write_data(&data).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write header with length `{length}` to packfile.",
                err,
            )
            .attach_context("length", headerlen.to_string())
        })?;

        // convert header length to binary representation
        let binary_repr = PackHeaderLength::from_u32(headerlen)
            .to_binary()
            .map_err(|err| {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to convert header length `{length}` to binary representation.",
                    err,
                )
                .attach_context("length", headerlen.to_string())
            })?;

        // finally write length of header unencrypted to pack file
        _ = self.write_data(&binary_repr).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write header length `{length}` to packfile.",
                err,
            )
            .attach_context("length", headerlen.to_string())
        })?;

        Ok(())
    }

    /// Saves the packfile
    ///
    /// # Errors
    ///
    /// * If converting the header length to u32 fails
    /// * If the header could not be written
    fn save(&mut self) -> RusticResult<()> {
        // nothing to do for an empty pack file
        if self.size == 0 {
            return Ok(());
        }

        self.write_header()?;

        // write file to backend; `take`/`replace` reset the pack state
        // without cloning the buffers
        let index = std::mem::take(&mut self.index);
        let file = std::mem::replace(&mut self.file, BytesMut::new());
        self.file_writer
            .as_ref()
            .unwrap()
            .send((file.into(), index))
            .map_err(|err| {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to send packfile to file writer.",
                    err,
                )
            })?;

        Ok(())
    }

    /// Checks whether a blob with the given id is already contained in the
    /// current (not yet written) pack file.
    fn has(&self, id: &BlobId) -> bool {
        self.index.blobs.iter().any(|b| &b.id == id)
    }
}
732
/// Everything needed to write a finished pack file to the backend and
/// register it in the shared indexer.
///
/// # Type Parameters
///
/// * `BE` - The backend type.
#[derive(Clone)]
pub(crate) struct FileWriterHandle<BE: DecryptWriteBackend> {
    /// The backend to write to.
    be: BE,
    /// The shared indexer containing the backend.
    indexer: SharedIndexer<BE>,
    /// Whether the file is cacheable.
    cacheable: bool,
}
746
impl<BE: DecryptWriteBackend> FileWriterHandle<BE> {
    /// Writes the given pack file to the backend and returns its index entry
    /// with the pack id and write time filled in.
    ///
    /// # Arguments
    ///
    /// * `load` - The pack file contents, its id, and the associated index.
    ///
    /// # Errors
    ///
    /// * If writing to the backend fails.
    fn process(&self, load: (Bytes, PackId, IndexPack)) -> RusticResult<IndexPack> {
        let (file, id, mut index) = load;
        index.id = id;
        self.be
            .write_bytes(FileType::Pack, &id, self.cacheable, file)?;
        // record the time only after the backend write succeeded
        index.time = Some(Local::now());
        Ok(index)
    }

    /// Adds the index entry of a written pack to the shared indexer.
    ///
    /// # Errors
    ///
    /// * If adding to the indexer fails.
    fn index(&self, index: IndexPack) -> RusticResult<()> {
        self.indexer.write().unwrap().add(index)?;
        Ok(())
    }
}
763
/// Actor which writes pack files to the backend on a dedicated worker thread.
pub(crate) struct Actor {
    /// The sender to hand pack files (contents + index) to the worker thread.
    sender: Sender<(Bytes, IndexPack)>,
    /// The receiver to receive the final status from the worker thread.
    finish: Receiver<RusticResult<()>>,
}
771
772impl Actor {
773    /// Creates a new `Actor`.
774    ///
775    /// # Type Parameters
776    ///
777    /// * `BE` - The backend type.
778    ///
779    /// # Arguments
780    ///
781    /// * `fwh` - The file writer handle.
782    /// * `queue_len` - The length of the queue.
783    /// * `par` - The number of parallel threads.
784    fn new<BE: DecryptWriteBackend>(
785        fwh: FileWriterHandle<BE>,
786        queue_len: usize,
787        _par: usize,
788    ) -> Self {
789        let (tx, rx) = bounded(queue_len);
790        let (finish_tx, finish_rx) = bounded::<RusticResult<()>>(0);
791
792        let _join_handle = std::thread::spawn(move || {
793            scope(|scope| {
794                let status = rx
795                    .into_iter()
796                    .readahead_scoped(scope)
797                    .map(|(file, index): (Bytes, IndexPack)| {
798                        let id = hash(&file);
799                        (file, PackId::from(id), index)
800                    })
801                    .readahead_scoped(scope)
802                    .map(|load| fwh.process(load))
803                    .readahead_scoped(scope)
804                    .try_for_each(|index| fwh.index(index?));
805                _ = finish_tx.send(status);
806            })
807            .unwrap();
808        });
809
810        Self {
811            sender: tx,
812            finish: finish_rx,
813        }
814    }
815
816    /// Sends the given data to the actor.
817    ///
818    /// # Arguments
819    ///
820    /// * `load` - The data to send.
821    ///
822    /// # Errors
823    ///
824    /// If sending the message to the actor fails.
825    fn send(&self, load: (Bytes, IndexPack)) -> PackerResult<()> {
826        self.sender.send(load.clone()).map_err(|err| {
827            PackerErrorKind::SendingCrossbeamDataMessage {
828                data: load.0,
829                index_pack: load.1,
830                source: err,
831            }
832        })?;
833        Ok(())
834    }
835
836    /// Finalizes the actor and does cleanup
837    ///
838    /// # Panics
839    ///
840    /// * If the receiver is not present
841    fn finalize(self) -> RusticResult<()> {
842        // cancel channel
843        drop(self.sender);
844        // wait for items in channel to be processed
845        self.finish.recv().unwrap()
846    }
847}
848
/// The `Repacker` is responsible for repacking blobs into pack files.
///
/// It reads blobs from existing packs and feeds them to a fresh `Packer`.
///
/// # Type Parameters
///
/// * `BE` - The backend to read from.
#[allow(missing_debug_implementations)]
pub struct Repacker<BE>
where
    BE: DecryptFullBackend,
{
    /// The backend to read from.
    be: BE,
    /// The packer to write to.
    packer: Packer<BE>,
    /// The size limit of the pack file.
    // Fixed at construction time from the pack sizer's target size.
    size_limit: u32,
}
866
867impl<BE: DecryptFullBackend> Repacker<BE> {
868    /// Creates a new `Repacker`.
869    ///
870    /// # Type Parameters
871    ///
872    /// * `BE` - The backend to read from.
873    ///
874    /// # Arguments
875    ///
876    /// * `be` - The backend to read from.
877    /// * `blob_type` - The blob type.
878    /// * `indexer` - The indexer to write to.
879    /// * `config` - The config file.
880    /// * `total_size` - The total size of the pack file.
881    ///
882    /// # Errors
883    ///
884    /// * If the Packer could not be created
885    pub fn new(
886        be: BE,
887        blob_type: BlobType,
888        indexer: SharedIndexer<BE>,
889        config: &ConfigFile,
890        total_size: u64,
891    ) -> RusticResult<Self> {
892        let packer = Packer::new(be.clone(), blob_type, indexer, config, total_size)?;
893        let size_limit = PackSizer::from_config(config, blob_type, total_size).pack_size();
894        Ok(Self {
895            be,
896            packer,
897            size_limit,
898        })
899    }
900
901    /// Adds the blob to the packfile without any check
902    ///
903    /// # Arguments
904    ///
905    /// * `pack_id` - The pack id
906    /// * `blob` - The blob to add
907    ///
908    /// # Errors
909    ///
910    /// * If the blob could not be added
911    /// * If reading the blob from the backend fails
912    pub fn add_fast(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> {
913        let data = self.be.read_partial(
914            FileType::Pack,
915            pack_id,
916            blob.tpe.is_cacheable(),
917            blob.offset,
918            blob.length,
919        )?;
920
921        self.packer
922            .add_raw(
923                &data,
924                &blob.id,
925                0,
926                blob.uncompressed_length,
927                Some(self.size_limit),
928            )
929            .map_err(|err| {
930                err.overwrite_kind(ErrorKind::Internal)
931                    .prepend_guidance_line(
932                        "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.",
933                    )
934                    .attach_context("blob_id", blob.id.to_string())
935            })?;
936
937        Ok(())
938    }
939
940    /// Adds the blob to the packfile
941    ///
942    /// # Arguments
943    ///
944    /// * `pack_id` - The pack id
945    /// * `blob` - The blob to add
946    ///
947    /// # Errors
948    ///
949    /// * If the blob could not be added
950    /// * If reading the blob from the backend fails
951    pub fn add(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> {
952        let data = self.be.read_encrypted_partial(
953            FileType::Pack,
954            pack_id,
955            blob.tpe.is_cacheable(),
956            blob.offset,
957            blob.length,
958            blob.uncompressed_length,
959        )?;
960
961        self.packer
962            .add_with_sizelimit(data, blob.id, Some(self.size_limit))
963            .map_err(|err| {
964                RusticError::with_source(
965                    ErrorKind::Internal,
966                    "Failed to add blob to packfile.",
967                    err,
968                )
969            })?;
970
971        Ok(())
972    }
973
974    /// Finalizes the repacker and returns the stats
975    pub fn finalize(self) -> RusticResult<PackerStats> {
976        self.packer.finalize()
977    }
978}