// rustic_core/blob/packer.rs
1use std::{
2    num::NonZeroU32,
3    sync::{Arc, RwLock},
4    thread::scope,
5    time::{Duration, SystemTime},
6};
7
8use bytes::{Bytes, BytesMut};
9use crossbeam_channel::{Receiver, Sender, bounded};
10use integer_sqrt::IntegerSquareRoot;
11use jiff::Timestamp;
12use log::warn;
13use pariter::IteratorExt;
14
15use crate::{
16    Progress,
17    backend::{
18        FileType,
19        decrypt::{DecryptFullBackend, DecryptWriteBackend},
20    },
21    blob::{BlobId, BlobLocations, BlobType},
22    crypto::{CryptoKey, hasher::hash},
23    error::{ErrorKind, RusticError, RusticResult},
24    index::{IndexEntry, indexer::SharedIndexer},
25    repofile::{
26        configfile::ConfigFile,
27        indexfile::{IndexBlob, IndexPack},
28        packfile::{PackHeaderLength, PackHeaderRef, PackId},
29        snapshotfile::SnapshotSummary,
30    },
31};
32
/// [`PackerErrorKind`] describes the errors that can be returned for a Packer
// NOTE: the `///` doc comments on the variants double as the `Display` output
// via `displaydoc::Display`, so they are part of the user-visible error
// messages — do not reword them casually.
#[derive(thiserror::Error, Debug, displaydoc::Display)]
#[non_exhaustive]
pub enum PackerErrorKind {
    /// Conversion from `{from}` to `{to}` failed: `{source}`
    Conversion {
        // name of the target type of the failed conversion
        to: &'static str,
        // name of the source type of the failed conversion
        from: &'static str,
        // the underlying integer conversion error
        source: std::num::TryFromIntError,
    },
    /// Sending crossbeam message failed: `id`: `{id:?}`, `data`: `{data:?}` : `{source}`
    SendingCrossbeamMessage {
        // the blob id that failed to be sent
        id: BlobId,
        // the blob data that failed to be sent
        data: Bytes,
        source: crossbeam_channel::SendError<(Bytes, BlobId)>,
    },
    /// Sending crossbeam data message failed: `data`: `{data:?}`, `index_pack`: `{index_pack:?}` : `{source}`
    SendingCrossbeamDataMessage {
        // the packfile contents that failed to be sent
        data: Bytes,
        // boxed to keep the error variant small (clippy::result_large_err)
        index_pack: Box<IndexPack>,
        source: crossbeam_channel::SendError<(Bytes, IndexPack)>,
    },
}
56
/// Result type used within the packer; the error is boxed so that
/// `Result` values stay small (see clippy's `result_large_err`).
pub(crate) type PackerResult<T> = Result<T, Box<PackerErrorKind>>;
58
/// Constants bounding the size, blob count and age of a single pack file.
pub(super) mod constants {
    use std::time::Duration;

    /// Kilobyte in bytes
    pub(super) const KB: u32 = 1024;
    /// Megabyte in bytes
    pub(super) const MB: u32 = 1024 * KB;
    /// The absolute maximum size of a pack: including headers it should not exceed 4 GB
    // 4076 MB (rather than 4096 MB = 4 GB) leaves ~20 MB of headroom for
    // the pack header while the value still fits into a `u32`.
    pub(super) const MAX_SIZE: u32 = 4076 * MB;
    /// The maximum number of blobs in a pack
    pub(super) const MAX_COUNT: u32 = 10_000;
    /// The maximum age of a pack
    // a pack open for longer than this is saved even if it is not yet full
    pub(super) const MAX_AGE: Duration = Duration::from_secs(300);
}
73
/// The pack sizer is responsible for computing the size of the pack file.
#[derive(Debug, Clone, Copy)]
pub struct PackSizer {
    /// The default size of a pack file, in bytes.
    default_size: u32,
    /// The grow factor of a pack file; 0 disables growth.
    grow_factor: u32,
    /// The size limit of a pack file, in bytes.
    size_limit: u32,
    /// Total bytes accumulated via `add_size`; the target pack size
    /// grows with the square root of this value.
    current_size: u64,
    /// The minimum pack size tolerance in percent before a repack is triggered.
    min_packsize_tolerate_percent: u32,
    /// The maximum pack size tolerance in percent before a repack is triggered.
    max_packsize_tolerate_percent: u32,
}
90
91impl PackSizer {
92    /// Creates a new `PackSizer` from a config file.
93    ///
94    /// # Arguments
95    ///
96    /// * `config` - The config file.
97    /// * `blob_type` - The blob type.
98    /// * `current_size` - The current size of the pack file.
99    ///
100    /// # Returns
101    ///
102    /// A new `PackSizer`.
103    #[must_use]
104    pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self {
105        let (default_size, grow_factor, size_limit) = config.packsize(blob_type);
106        let (min_packsize_tolerate_percent, max_packsize_tolerate_percent) =
107            config.packsize_ok_percents();
108        Self {
109            default_size,
110            grow_factor,
111            size_limit,
112            current_size,
113            min_packsize_tolerate_percent,
114            max_packsize_tolerate_percent,
115        }
116    }
117
118    /// Creates a new `PackSizer` with a fixed size
119    ///
120    /// # Arguments
121    ///
122    /// * `size` - The fixed size to use.
123    ///
124    /// # Returns
125    ///
126    /// A new `PackSizer`.
127    #[must_use]
128    pub fn fixed(size: u32) -> Self {
129        Self {
130            default_size: size,
131            grow_factor: 0,
132            size_limit: size,
133            current_size: 0,
134            min_packsize_tolerate_percent: 100,
135            max_packsize_tolerate_percent: 100,
136        }
137    }
138
139    /// Computes the size of the pack file.
140    #[must_use]
141    #[allow(clippy::cast_possible_truncation)]
142    pub fn pack_size(&self) -> u32 {
143        let size = if self.grow_factor == 0 {
144            self.default_size
145        } else {
146            // The cast actually shouldn't pose any problems.
147            // `current_size` is `u64`, the maximum value is `2^64-1`.
148            // `isqrt(2^64-1) = 2^32-1` which fits into a `u32`. (@aawsome)
149            self.current_size.integer_sqrt() as u32 * self.grow_factor + self.default_size
150        };
151        size.min(self.size_limit).min(constants::MAX_SIZE)
152    }
153
154    /// Evaluates whether the given size is not too small or too large
155    ///
156    /// # Arguments
157    ///
158    /// * `size` - The size to check
159    #[must_use]
160    pub fn size_ok(&self, size: u32) -> bool {
161        !self.is_too_small(size) && !self.is_too_large(size)
162    }
163
164    /// Evaluates whether the given size is too small
165    ///
166    /// # Arguments
167    ///
168    /// * `size` - The size to check
169    #[must_use]
170    pub fn is_too_small(&self, size: u32) -> bool {
171        let target_size = self.pack_size();
172        // Note: we cast to u64 so that no overflow can occur in the multiplications
173        u64::from(size) * 100
174            < u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent)
175    }
176
177    /// Evaluates whether the given size is too large
178    ///
179    /// # Arguments
180    ///
181    /// * `size` - The size to check
182    #[must_use]
183    pub fn is_too_large(&self, size: u32) -> bool {
184        let target_size = self.pack_size();
185        // Note: we cast to u64 so that no overflow can occur in the multiplications
186        u64::from(size) * 100
187            > u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent)
188    }
189
190    /// Adds the given size to the current size.
191    ///
192    /// # Arguments
193    ///
194    /// * `added` - The size to add
195    ///
196    /// # Panics
197    ///
198    /// * If the size is too large
199    pub fn add_size(&mut self, added: u32) {
200        self.current_size += u64::from(added);
201    }
202}
203
/// The `Packer` is responsible for packing blobs into pack files.
///
/// # Type Parameters
///
/// * `BE` - The backend type.
#[allow(missing_debug_implementations)]
#[allow(clippy::struct_field_names)]
#[derive(Clone)]
pub struct Packer<BE: DecryptWriteBackend> {
    /// The raw packer wrapped in an `Arc` and `RwLock`.
    // This is a hack: raw_packer and indexer are only used in the add_raw() method.
    // TODO: Refactor as actor, like the other add() methods
    raw_packer: Arc<RwLock<RawPacker<BE>>>,
    /// The shared indexer containing the backend.
    indexer: SharedIndexer<BE>,
    /// Sender handing (data, id) pairs to the background processing thread,
    /// which deduplicates, processes and forwards them to the raw packer.
    sender: Sender<(Bytes, BlobId)>,
    /// Receiver for the final status/stats produced by the background thread.
    finish: Receiver<RusticResult<PackerStats>>,
}
224
225impl<BE: DecryptWriteBackend> Packer<BE> {
    /// Creates a new `Packer`.
    ///
    /// Spawns a background thread which deduplicates incoming blobs against
    /// the indexer, processes (compresses/encrypts) them in parallel, and
    /// writes them through the shared [`RawPacker`].
    ///
    /// # Type Parameters
    ///
    /// * `BE` - The backend type.
    ///
    /// # Arguments
    ///
    /// * `be` - The backend to write to.
    /// * `blob_type` - The blob type.
    /// * `indexer` - The indexer to write to.
    /// * `pack_sizer` - The sizer deciding the target pack size.
    ///
    /// # Errors
    ///
    /// * If sending the message to the raw packer fails.
    /// * If converting the data length to u64 fails
    #[allow(clippy::unnecessary_wraps)]
    pub fn new(
        be: BE,
        blob_type: BlobType,
        indexer: SharedIndexer<BE>,
        pack_sizer: PackSizer,
    ) -> RusticResult<Self> {
        let raw_packer = Arc::new(RwLock::new(RawPacker::new(
            be.clone(),
            blob_type,
            indexer.clone(),
            pack_sizer,
        )));

        // zero-capacity (rendezvous) channels: senders block until the
        // worker is ready, providing natural backpressure
        let (tx, rx) = bounded(0);
        let (finish_tx, finish_rx) = bounded::<RusticResult<PackerStats>>(0);
        let packer = Self {
            raw_packer: raw_packer.clone(),
            indexer: indexer.clone(),
            sender: tx,
            finish: finish_rx,
        };

        // worker thread: runs the processing pipeline until the sender is
        // dropped (see finalize()), then reports the final status
        let _join_handle = std::thread::spawn(move || {
            scope(|scope| {
                let status = rx
                    .into_iter()
                    .readahead_scoped(scope)
                    // early check if id is already contained
                    .filter(|(_, id)| !indexer.read().unwrap().has(id))
                    .filter(|(_, id)| !raw_packer.read().unwrap().has(id))
                    .readahead_scoped(scope)
                    // CPU-heavy step (compression/encryption) runs in parallel
                    .parallel_map_scoped(scope, |(data, id): (Bytes, BlobId)| {
                        let (data, data_len, uncompressed_length) = be.process_data(&data)?;
                        Ok((data, id, u64::from(data_len), uncompressed_length))
                    })
                    .readahead_scoped(scope)
                    // check again if id is already contained
                    // TODO: We may still save duplicate blobs - the indexer is only updated when the packfile write has completed
                    .filter(|res| {
                        res.as_ref()
                            .map_or_else(|_| true, |(_, id, _, _)| !indexer.read().unwrap().has(id))
                    })
                    .try_for_each(|item: RusticResult<_>| -> RusticResult<()> {
                        let (data, id, data_len, ul) = item?;
                        raw_packer
                            .write()
                            .unwrap()
                            .add_raw(&data, &id, data_len, ul)
                    })
                    // flush the last (partial) pack once the channel closes
                    .and_then(|()| raw_packer.write().unwrap().finalize());
                _ = finish_tx.send(status);
            });
        });

        Ok(packer)
    }
301
302    /// Adds the blob to the packfile
303    ///
304    /// # Arguments
305    ///
306    /// * `data` - The blob data
307    /// * `id` - The blob id
308    ///
309    /// # Errors
310    ///
311    /// * If sending the message to the raw packer fails.
312    pub fn add(&self, data: Bytes, id: BlobId) -> RusticResult<()> {
313        self.sender.send((data, id)).map_err(|err| {
314            RusticError::with_source(
315                ErrorKind::Internal,
316                "Sending crossbeam message failed: `id`: `{id}`",
317                err,
318            )
319            .ask_report()
320            .attach_context("id", id.to_hex().to_string())
321        })?;
322        Ok(())
323    }
324
325    /// Adds the already encrypted (and maybe compressed) blob to the packfile
326    ///
327    /// # Arguments
328    ///
329    /// * `data` - The blob data
330    /// * `id` - The blob id
331    /// * `data_len` - The length of the blob data
332    /// * `uncompressed_length` - The length of the blob data before compression
333    /// * `size_limit` - The size limit for the pack file
334    ///
335    /// # Errors
336    ///
337    /// * If the blob is already present in the index
338    /// * If sending the message to the raw packer fails.
339    fn add_raw(
340        &self,
341        data: &[u8],
342        id: &BlobId,
343        data_len: u64,
344        uncompressed_length: Option<NonZeroU32>,
345    ) -> RusticResult<()> {
346        // only add if this blob is not present
347        if self.indexer.read().unwrap().has(id) {
348            Ok(())
349        } else {
350            self.raw_packer
351                .write()
352                .unwrap()
353                .add_raw(data, id, data_len, uncompressed_length)
354        }
355    }
356
357    /// Finalizes the packer and does cleanup
358    ///
359    /// # Panics
360    ///
361    /// * If the channel could not be dropped
362    pub fn finalize(self) -> RusticResult<PackerStats> {
363        // cancel channel
364        drop(self.sender);
365        // wait for items in channel to be processed
366        self.finish
367            .recv()
368            .expect("Should be able to receive from channel to finalize packer.")
369    }
370}
371
/// Statistics collected while packing: blob count and byte totals.
#[derive(Default, Debug, Clone, Copy)]
pub struct PackerStats {
    /// The number of blobs added
    blobs: u64,
    /// Total unprocessed size of the data added, in bytes
    // (accumulated from `data_len` in `RawPacker::add_raw`)
    data: u64,
    /// Total processed (packed) size of the data added, in bytes
    data_packed: u64,
}
382
383impl PackerStats {
384    /// Adds the stats to the summary
385    ///
386    /// # Arguments
387    ///
388    /// * `summary` - The summary to add to
389    /// * `tpe` - The blob type
390    ///
391    /// # Panics
392    ///
393    /// * If the blob type is invalid
394    pub fn apply(self, summary: &mut SnapshotSummary, tpe: BlobType) {
395        summary.data_added += self.data;
396        summary.data_added_packed += self.data_packed;
397        match tpe {
398            BlobType::Tree => {
399                summary.tree_blobs += self.blobs;
400                summary.data_added_trees += self.data;
401                summary.data_added_trees_packed += self.data_packed;
402            }
403            BlobType::Data => {
404                summary.data_blobs += self.blobs;
405                summary.data_added_files += self.data;
406                summary.data_added_files_packed += self.data_packed;
407            }
408        }
409    }
410}
411
/// The `RawPacker` is responsible for packing blobs into pack files.
///
/// # Type Parameters
///
/// * `BE` - The backend type.
#[allow(missing_debug_implementations, clippy::module_name_repetitions)]
pub(crate) struct RawPacker<BE: DecryptWriteBackend> {
    /// The backend to write to.
    be: BE,
    /// The blob type to pack.
    blob_type: BlobType,
    /// In-memory buffer of the pack file currently being assembled.
    file: BytesMut,
    /// Number of bytes written to `file` so far.
    size: u32,
    /// Number of blobs in the currently open pack.
    count: u32,
    /// When the currently open pack was started (used for the max-age check).
    created: SystemTime,
    /// The index of the currently open pack.
    index: IndexPack,
    /// The actor to write the pack file; taken (set to `None`) on finalize.
    file_writer: Option<Actor>,
    /// The pack sizer
    pack_sizer: PackSizer,
    /// The packer stats
    stats: PackerStats,
}
440
441impl<BE: DecryptWriteBackend> RawPacker<BE> {
442    /// Creates a new `RawPacker`.
443    ///
444    /// # Type Parameters
445    ///
446    /// * `BE` - The backend type.
447    ///
448    /// # Arguments
449    ///
450    /// * `be` - The backend to write to.
451    /// * `blob_type` - The blob type.
452    /// * `indexer` - The indexer to write to.
453    /// * `config` - The config file.
454    /// * `total_size` - The total size of the pack file.
455    fn new(be: BE, blob_type: BlobType, indexer: SharedIndexer<BE>, pack_sizer: PackSizer) -> Self {
456        let file_writer = Some(Actor::new(
457            FileWriterHandle {
458                be: be.clone(),
459                indexer,
460                cacheable: blob_type.is_cacheable(),
461            },
462            1,
463            1,
464        ));
465
466        Self {
467            be,
468            blob_type,
469            file: BytesMut::new(),
470            size: 0,
471            count: 0,
472            created: SystemTime::now(),
473            index: IndexPack::default(),
474            file_writer,
475            pack_sizer,
476            stats: PackerStats::default(),
477        }
478    }
479
480    /// Saves the packfile and returns the stats
481    ///
482    /// # Errors
483    ///
484    /// * If the packfile could not be saved
485    fn finalize(&mut self) -> RusticResult<PackerStats> {
486        self.save().map_err(|err| {
487            err.overwrite_kind(ErrorKind::Internal)
488                .prepend_guidance_line("Failed to save packfile. Data may be lost.")
489                .ask_report()
490        })?;
491
492        self.file_writer.take().unwrap().finalize()?;
493
494        Ok(std::mem::take(&mut self.stats))
495    }
496
497    /// Writes the given data to the packfile.
498    ///
499    /// # Arguments
500    ///
501    /// * `data` - The data to write.
502    ///
503    /// # Returns
504    ///
505    /// The number of bytes written.
506    fn write_data(&mut self, data: &[u8]) -> PackerResult<u32> {
507        let len = data
508            .len()
509            .try_into()
510            .map_err(|err| PackerErrorKind::Conversion {
511                to: "u32",
512                from: "usize",
513                source: err,
514            })?;
515        self.file.extend_from_slice(data);
516        self.size += len;
517        Ok(len)
518    }
519
    /// Adds the already compressed/encrypted blob to the packfile without any check
    ///
    /// Updates stats, appends the blob to the in-memory pack, records it in
    /// the pack index, and saves the pack when it is full or too old.
    ///
    /// # Arguments
    ///
    /// * `data` - The blob data (already processed)
    /// * `id` - The blob id
    /// * `data_len` - The length of the blob data
    /// * `uncompressed_length` - The length of the blob data before compression
    ///
    /// # Errors
    ///
    /// * If converting the data length to u64 fails
    /// * If writing the data to the packfile fails
    fn add_raw(
        &mut self,
        data: &[u8],
        id: &BlobId,
        data_len: u64,
        uncompressed_length: Option<NonZeroU32>,
    ) -> RusticResult<()> {
        // skip blobs already contained in the currently open pack
        if self.has(id) {
            return Ok(());
        }
        self.stats.blobs += 1;

        self.stats.data += data_len;

        let data_len_packed: u64 = data.len().try_into().map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to convert data length `{length}` to u64.",
                err,
            )
            .attach_context("length", data.len().to_string())
        })?;

        self.stats.data_packed += data_len_packed;

        let size_limit = self.pack_sizer.pack_size();

        // the blob starts where the current pack buffer ends
        let offset = self.size;

        let len = self.write_data(data).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write data to packfile for blob `{id}`.",
                err,
            )
            .attach_context("id", id.to_string())
            .attach_context("size_limit", size_limit.to_string())
            .attach_context("data_length_packed", data_len_packed.to_string())
        })?;

        self.index
            .add(*id, self.blob_type, offset, len, uncompressed_length);

        self.count += 1;

        // check if PackFile needs to be saved
        let elapsed = self.created.elapsed().unwrap_or_else(|err| {
            // system clock went backwards; treat the pack as freshly created
            warn!("couldn't get elapsed time from system time: {err:?}");
            Duration::ZERO
        });

        if self.count >= constants::MAX_COUNT
            || self.size >= size_limit
            || elapsed >= constants::MAX_AGE
        {
            // record the finished pack's size before resetting for the next one
            self.pack_sizer.add_size(self.index.pack_size());
            self.save()?;
            self.size = 0;
            self.count = 0;
            self.created = SystemTime::now();
        }
        Ok(())
    }
596
    /// Writes header and length of header to packfile
    ///
    /// The header (serialized pack index) is encrypted and appended to the
    /// pack buffer, followed by the unencrypted header length.
    ///
    /// # Errors
    ///
    /// * If converting the header length to u32 fails
    /// * If the header could not be written
    fn write_header(&mut self) -> RusticResult<()> {
        // compute the pack header
        let data = PackHeaderRef::from_index_pack(&self.index)
            .to_binary()
            .map_err(|err| -> Box<RusticError> {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to convert pack header `{index_pack_id}` to binary representation.",
                    err,
                )
                .attach_context("index_pack_id", self.index.id.to_string())
            })?;

        // encrypt and write to pack file
        let data = self.be.key().encrypt_data(&data)?;

        // the encrypted length is what gets stored after the header
        let headerlen: u32 = data.len().try_into().map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to convert header length `{length}` to u32.",
                err,
            )
            .attach_context("length", data.len().to_string())
        })?;

        // write header to pack file
        _ = self.write_data(&data).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write header with length `{length}` to packfile.",
                err,
            )
            .attach_context("length", headerlen.to_string())
        })?;

        // convert header length to binary representation
        let binary_repr = PackHeaderLength::from_u32(headerlen)
            .to_binary()
            .map_err(|err| {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to convert header length `{length}` to binary representation.",
                    err,
                )
                .attach_context("length", headerlen.to_string())
            })?;

        // finally write length of header unencrypted to pack file
        _ = self.write_data(&binary_repr).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write header length `{length}` to packfile.",
                err,
            )
            .attach_context("length", headerlen.to_string())
        })?;

        Ok(())
    }
662
663    /// Saves the packfile
664    ///
665    /// # Errors
666    ///
667    /// If the header could not be written
668    ///
669    /// # Errors
670    ///
671    /// * If converting the header length to u32 fails
672    /// * If the header could not be written
673    fn save(&mut self) -> RusticResult<()> {
674        if self.size == 0 {
675            return Ok(());
676        }
677
678        self.write_header()?;
679
680        // write file to backend
681        let index = std::mem::take(&mut self.index);
682        let file = std::mem::replace(&mut self.file, BytesMut::new());
683        self.file_writer
684            .as_ref()
685            .unwrap()
686            .send((file.into(), index))
687            .map_err(|err| {
688                RusticError::with_source(
689                    ErrorKind::Internal,
690                    "Failed to send packfile to file writer.",
691                    err,
692                )
693            })?;
694
695        Ok(())
696    }
697
698    fn has(&self, id: &BlobId) -> bool {
699        self.index.blobs.iter().any(|b| &b.id == id)
700    }
701}
702
/// Bundles everything the pack-file writer thread needs: the backend to
/// write to, the indexer to register finished packs with, and whether the
/// written files are cacheable.
///
/// # Type Parameters
///
/// * `BE` - The backend type.
#[derive(Clone)]
pub(crate) struct FileWriterHandle<BE: DecryptWriteBackend> {
    /// The backend to write to.
    be: BE,
    /// The shared indexer containing the backend.
    indexer: SharedIndexer<BE>,
    /// Whether the file is cacheable.
    cacheable: bool,
}
716
impl<BE: DecryptWriteBackend> FileWriterHandle<BE> {
    /// Writes the given pack file to the backend and completes its index.
    ///
    /// Sets the pack id on the index before writing and stamps the write
    /// time afterwards.
    fn process(&self, load: (Bytes, PackId, IndexPack)) -> RusticResult<IndexPack> {
        let (file, id, mut index) = load;
        index.id = id;
        self.be
            .write_bytes(FileType::Pack, &id, self.cacheable, file)?;
        index.time = Some(Timestamp::now());
        Ok(index)
    }

    /// Registers the finished pack index with the shared indexer.
    fn index(&self, index: IndexPack) -> RusticResult<()> {
        self.indexer.write().unwrap().add(index)?;
        Ok(())
    }
}
733
/// The `Actor` owns a background thread which hashes, writes and indexes
/// completed pack files (see [`Actor::new`]).
pub(crate) struct Actor {
    /// Sender handing (packfile bytes, pack index) pairs to the writer thread.
    sender: Sender<(Bytes, IndexPack)>,
    /// Receiver for the final status reported by the writer thread.
    finish: Receiver<RusticResult<()>>,
}
741
742impl Actor {
    /// Creates a new `Actor`.
    ///
    /// Spawns a background thread which, for each received (file, index)
    /// pair, hashes the file to derive the pack id, writes the file via the
    /// handle, and registers the resulting index with the indexer.
    ///
    /// # Type Parameters
    ///
    /// * `BE` - The backend type.
    ///
    /// # Arguments
    ///
    /// * `fwh` - The file writer handle.
    /// * `queue_len` - The length of the queue.
    /// * `par` - The number of parallel threads (currently unused).
    fn new<BE: DecryptWriteBackend>(
        fwh: FileWriterHandle<BE>,
        queue_len: usize,
        _par: usize,
    ) -> Self {
        let (tx, rx) = bounded(queue_len);
        let (finish_tx, finish_rx) = bounded::<RusticResult<()>>(0);

        // worker thread: runs until `sender` is dropped (see finalize()),
        // then reports the overall status on the finish channel
        let _join_handle = std::thread::spawn(move || {
            scope(|scope| {
                let status = rx
                    .into_iter()
                    .readahead_scoped(scope)
                    // derive the pack id from the file's content hash
                    .map(|(file, index): (Bytes, IndexPack)| {
                        let id = hash(&file);
                        (file, PackId::from(id), index)
                    })
                    .readahead_scoped(scope)
                    // write the pack file to the backend
                    .map(|load| fwh.process(load))
                    .readahead_scoped(scope)
                    // register the finished pack with the indexer
                    .try_for_each(|index| fwh.index(index?));
                _ = finish_tx.send(status);
            });
        });

        Self {
            sender: tx,
            finish: finish_rx,
        }
    }
784
785    /// Sends the given data to the actor.
786    ///
787    /// # Arguments
788    ///
789    /// * `load` - The data to send.
790    ///
791    /// # Errors
792    ///
793    /// If sending the message to the actor fails.
794    fn send(&self, load: (Bytes, IndexPack)) -> PackerResult<()> {
795        self.sender.send(load.clone()).map_err(|err| {
796            PackerErrorKind::SendingCrossbeamDataMessage {
797                data: load.0,
798                index_pack: Box::new(load.1),
799                source: err,
800            }
801        })?;
802        Ok(())
803    }
804
805    /// Finalizes the actor and does cleanup
806    ///
807    /// # Panics
808    ///
809    /// * If the receiver is not present
810    fn finalize(self) -> RusticResult<()> {
811        // cancel channel
812        drop(self.sender);
813        // wait for items in channel to be processed
814        self.finish.recv().unwrap()
815    }
816}
817
/// A set of blob locations within one pack file that should be copied,
/// together with the id of that pack.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CopyPackBlobs {
    /// The pack the blobs live in.
    pub pack_id: PackId,
    /// The locations (and ids) of the blobs to copy from that pack.
    pub locations: BlobLocations<BlobId>,
}
823
824impl CopyPackBlobs {
825    pub fn from_index_blob(pack_id: PackId, blob: IndexBlob) -> Self {
826        Self {
827            pack_id,
828            locations: BlobLocations::from_blob_location(blob.location, blob.id),
829        }
830    }
831
832    pub fn from_index_entry(entry: IndexEntry, id: BlobId) -> Self {
833        Self {
834            pack_id: entry.pack,
835            locations: BlobLocations::from_blob_location(entry.location, id),
836        }
837    }
838
839    #[allow(clippy::result_large_err)]
840    /// coalesce two `RepackBlobs` if possible
841    pub fn coalesce(self, other: Self) -> Result<Self, (Self, Self)> {
842        if self.pack_id == other.pack_id && self.locations.can_coalesce(&other.locations) {
843            Ok(Self {
844                pack_id: self.pack_id,
845                locations: self.locations.append(other.locations),
846            })
847        } else {
848            Err((self, other))
849        }
850    }
851}
852
/// The `BlobCopier` is responsible for copying or repacking blobs into pack files.
///
/// # Type Parameters
///
/// * `BE` - The backend to read from.
#[allow(missing_debug_implementations)]
pub struct BlobCopier<BE>
where
    BE: DecryptFullBackend,
{
    /// The backend to read from.
    be_src: BE,
    /// The packer (writing to the destination backend) to write to.
    packer: Packer<BE>,
    /// The size limit of the pack file (snapshot of the sizer's target at creation).
    size_limit: u32,
    /// the blob type
    blob_type: BlobType,
}
872
873impl<BE: DecryptFullBackend> BlobCopier<BE> {
    /// Creates a new `BlobCopier`.
    ///
    /// # Type Parameters
    ///
    /// * `BE` - The backend to read from.
    ///
    /// # Arguments
    ///
    /// * `be_src` - The backend to read blobs from.
    /// * `be_dst` - The backend the new packer writes to.
    /// * `blob_type` - The blob type.
    /// * `indexer` - The indexer to write to.
    /// * `pack_sizer` - The sizer deciding the target pack size.
    ///
    /// # Errors
    ///
    /// * If the Packer could not be created
    pub fn new(
        be_src: BE,
        be_dst: BE,
        blob_type: BlobType,
        indexer: SharedIndexer<BE>,
        pack_sizer: PackSizer,
    ) -> RusticResult<Self> {
        let packer = Packer::new(be_dst, blob_type, indexer, pack_sizer)?;
        let size_limit = pack_sizer.pack_size();
        Ok(Self {
            be_src,
            packer,
            size_limit,
            blob_type,
        })
    }
907
908    /// Adds the blob to the packfile without any check
909    ///
910    /// # Arguments
911    ///
912    /// * `pack_id` - The pack id
913    /// * `blob` - The blob to add
914    ///
915    /// # Errors
916    ///
917    /// * If the blob could not be added
918    /// * If reading the blob from the backend fails
919    pub fn copy_fast(&self, pack_blobs: CopyPackBlobs, p: &Progress) -> RusticResult<()> {
920        let offset = pack_blobs.locations.offset;
921        let data = self.be_src.read_partial(
922            FileType::Pack,
923            &pack_blobs.pack_id,
924            self.blob_type.is_cacheable(),
925            offset,
926            pack_blobs.locations.length,
927        )?;
928
929        // TODO: write in parallel
930        for (blob, blob_id) in pack_blobs.locations.blobs {
931            let start = usize::try_from(blob.offset - offset)
932                .expect("convert from u32 to usize should not fail!");
933            let end = usize::try_from(blob.offset + blob.length - offset)
934                .expect("convert from u32 to usize should not fail!");
935            self.packer
936                .add_raw(
937                    &data[start..end],
938                    &blob_id,
939                    u64::from(blob.length),
940                    blob.uncompressed_length,
941                )
942                .map_err(|err| {
943                    err.overwrite_kind(ErrorKind::Internal)
944                        .prepend_guidance_line(
945                            "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.",
946                        )
947                        .attach_context("blob_id", blob_id.to_string())
948                })?;
949            p.inc(blob.length.into());
950        }
951
952        Ok(())
953    }
954
955    /// Adds the blob to the packfile
956    ///
957    /// # Arguments
958    ///
959    /// * `pack_id` - The pack id
960    /// * `blob` - The blob to add
961    ///
962    /// # Errors
963    ///
964    /// * If the blob could not be added
965    /// * If reading the blob from the backend fails
966    pub fn copy(&self, pack_blobs: CopyPackBlobs, p: &Progress) -> RusticResult<()> {
967        let offset = pack_blobs.locations.offset;
968        let read_data = self.be_src.read_partial(
969            FileType::Pack,
970            &pack_blobs.pack_id,
971            self.blob_type.is_cacheable(),
972            offset,
973            pack_blobs.locations.length,
974        )?;
975
976        // TODO: write in parallel
977        for (blob, blob_id) in pack_blobs.locations.blobs {
978            let start = usize::try_from(blob.offset - offset)
979                .expect("convert from u32 to usize should not fail!");
980            let end = usize::try_from(blob.offset + blob.length - offset)
981                .expect("convert from u32 to usize should not fail!");
982            let data = self
983                .be_src
984                .read_encrypted_from_partial(&read_data[start..end], blob.uncompressed_length)?;
985
986            self.packer.add(data, blob_id).map_err(|err| {
987                RusticError::with_source(
988                    ErrorKind::Internal,
989                    "Failed to add blob to packfile.",
990                    err,
991                )
992            })?;
993            p.inc(blob.length.into());
994        }
995
996        Ok(())
997    }
998
999    /// Finalizes the repacker and returns the stats
1000    pub fn finalize(self) -> RusticResult<PackerStats> {
1001        self.packer.finalize()
1002    }
1003}
1004
#[cfg(test)]
mod tests {
    use super::*;
    use insta::assert_ron_snapshot;

    // Feeds a growing sequence of sizes into differently configured
    // `PackSizer`s (config-derived tree/data sizers and a fixed one) and
    // snapshots `(current_size, pack_size())` after each step.
    #[test]
    fn pack_sizers() {
        let config = ConfigFile {
            treepack_size_limit: Some(5 * 1024 * 1024),
            ..Default::default()
        };
        let mut pack_sizers = [
            PackSizer::from_config(&config, BlobType::Tree, 0),
            PackSizer::from_config(&config, BlobType::Data, 0),
            PackSizer::fixed(12345),
        ];

        let output: Vec<_> = [
            0,
            10,
            1000,
            100_000,
            100_000,
            100_000,
            10_000_000,
            10_000_000,
            1_000_000_000,
            1_000_000_000,
        ]
        .into_iter()
        .map(|i| {
            pack_sizers
                .iter_mut()
                .map(|ps| {
                    ps.add_size(i);
                    (ps.current_size, ps.pack_size())
                })
                .collect::<Vec<_>>()
        })
        .collect();

        assert_ron_snapshot!(output);
    }
}