1use std::{
2 num::NonZeroU32,
3 sync::{Arc, RwLock},
4 time::{Duration, SystemTime},
5};
6
7use bytes::{Bytes, BytesMut};
8use chrono::Local;
9use crossbeam_channel::{Receiver, Sender, bounded};
10use integer_sqrt::IntegerSquareRoot;
11use log::warn;
12use pariter::{IteratorExt, scope};
13
14use crate::{
15 backend::{
16 FileType,
17 decrypt::{DecryptFullBackend, DecryptWriteBackend},
18 },
19 blob::{BlobId, BlobType},
20 crypto::{CryptoKey, hasher::hash},
21 error::{ErrorKind, RusticError, RusticResult},
22 index::indexer::SharedIndexer,
23 repofile::{
24 configfile::ConfigFile,
25 indexfile::{IndexBlob, IndexPack},
26 packfile::{PackHeaderLength, PackHeaderRef, PackId},
27 snapshotfile::SnapshotSummary,
28 },
29};
30
31#[derive(thiserror::Error, Debug, displaydoc::Display)]
33#[non_exhaustive]
34pub enum PackerErrorKind {
35 Conversion {
37 to: &'static str,
38 from: &'static str,
39 source: std::num::TryFromIntError,
40 },
41 SendingCrossbeamMessage {
43 size_limit: Option<u32>,
44 id: BlobId,
45 data: Bytes,
46 source: crossbeam_channel::SendError<(Bytes, BlobId, Option<u32>)>,
47 },
48 SendingCrossbeamDataMessage {
50 data: Bytes,
51 index_pack: IndexPack,
52 source: crossbeam_channel::SendError<(Bytes, IndexPack)>,
53 },
54}
55
/// Result type of packer-internal operations; errors are returned as a boxed [`PackerErrorKind`].
pub(crate) type PackerResult<T> = Result<T, Box<PackerErrorKind>>;
57
58pub(super) mod constants {
59 use std::time::Duration;
60
61 pub(super) const KB: u32 = 1024;
63 pub(super) const MB: u32 = 1024 * KB;
65 pub(super) const MAX_SIZE: u32 = 4076 * MB;
67 pub(super) const MAX_COUNT: u32 = 10_000;
69 pub(super) const MAX_AGE: Duration = Duration::from_secs(300);
71}
72
/// Computes target pack sizes from the repository config and the size accounted for so far.
#[derive(Debug, Clone, Copy)]
pub struct PackSizer {
    /// Base target pack size, taken from `ConfigFile::packsize`.
    default_size: u32,
    /// Factor applied to the integer square root of `current_size` when growing the target size.
    grow_factor: u32,
    /// Upper bound for the computed target size, taken from `ConfigFile::packsize`.
    size_limit: u32,
    /// Size accounted for so far: initialised by the caller, increased via `add_size`.
    current_size: u64,
    /// Sizes below this percentage of the target size count as "too small".
    min_packsize_tolerate_percent: u32,
    /// Sizes above this percentage of the target size count as "too large".
    max_packsize_tolerate_percent: u32,
}
89
90impl PackSizer {
91 #[must_use]
103 pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self {
104 let (default_size, grow_factor, size_limit) = config.packsize(blob_type);
105 let (min_packsize_tolerate_percent, max_packsize_tolerate_percent) =
106 config.packsize_ok_percents();
107 Self {
108 default_size,
109 grow_factor,
110 size_limit,
111 current_size,
112 min_packsize_tolerate_percent,
113 max_packsize_tolerate_percent,
114 }
115 }
116
117 #[must_use]
119 #[allow(clippy::cast_possible_truncation)]
123 pub fn pack_size(&self) -> u32 {
124 (self.current_size.integer_sqrt() as u32 * self.grow_factor + self.default_size)
125 .min(self.size_limit)
126 .min(constants::MAX_SIZE)
127 }
128
129 #[must_use]
135 pub fn size_ok(&self, size: u32) -> bool {
136 !self.is_too_small(size) && !self.is_too_large(size)
137 }
138
139 #[must_use]
145 pub fn is_too_small(&self, size: u32) -> bool {
146 let target_size = self.pack_size();
147 u64::from(size) * 100
149 < u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent)
150 }
151
152 #[must_use]
158 pub fn is_too_large(&self, size: u32) -> bool {
159 let target_size = self.pack_size();
160 u64::from(size) * 100
162 > u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent)
163 }
164
165 fn add_size(&mut self, added: u32) {
175 self.current_size += u64::from(added);
176 }
177}
178
/// Packs blobs into pack files using a background processing pipeline.
///
/// Clones share the same pipeline (channel sender, raw packer and indexer).
#[allow(missing_debug_implementations)]
#[allow(clippy::struct_field_names)]
#[derive(Clone)]
pub struct Packer<BE: DecryptWriteBackend> {
    /// The raw packer assembling the current pack; shared with the worker thread.
    raw_packer: Arc<RwLock<RawPacker<BE>>>,
    /// The shared indexer, used to skip blobs that are already indexed.
    indexer: SharedIndexer<BE>,
    /// Sends `(data, id, size_limit)` tuples to the worker pipeline.
    sender: Sender<(Bytes, BlobId, Option<u32>)>,
    /// Receives the final result of the worker pipeline.
    finish: Receiver<RusticResult<PackerStats>>,
}
199
200impl<BE: DecryptWriteBackend> Packer<BE> {
201 #[allow(clippy::unnecessary_wraps)]
220 pub fn new(
221 be: BE,
222 blob_type: BlobType,
223 indexer: SharedIndexer<BE>,
224 config: &ConfigFile,
225 total_size: u64,
226 ) -> RusticResult<Self> {
227 let raw_packer = Arc::new(RwLock::new(RawPacker::new(
228 be.clone(),
229 blob_type,
230 indexer.clone(),
231 config,
232 total_size,
233 )));
234
235 let (tx, rx) = bounded(0);
236 let (finish_tx, finish_rx) = bounded::<RusticResult<PackerStats>>(0);
237 let packer = Self {
238 raw_packer: raw_packer.clone(),
239 indexer: indexer.clone(),
240 sender: tx,
241 finish: finish_rx,
242 };
243
244 let _join_handle = std::thread::spawn(move || {
245 scope(|scope| {
246 let status = rx
247 .into_iter()
248 .readahead_scoped(scope)
249 .filter(|(_, id, _)| !indexer.read().unwrap().has(id))
251 .filter(|(_, id, _)| !raw_packer.read().unwrap().has(id))
252 .readahead_scoped(scope)
253 .parallel_map_scoped(
254 scope,
255 |(data, id, size_limit): (Bytes, BlobId, Option<u32>)| {
256 let (data, data_len, uncompressed_length) = be.process_data(&data)?;
257 Ok((
258 data,
259 id,
260 u64::from(data_len),
261 uncompressed_length,
262 size_limit,
263 ))
264 },
265 )
266 .readahead_scoped(scope)
267 .filter(|res| {
270 res.as_ref().map_or_else(
271 |_| true,
272 |(_, id, _, _, _)| !indexer.read().unwrap().has(id),
273 )
274 })
275 .try_for_each(|item: RusticResult<_>| -> RusticResult<()> {
276 let (data, id, data_len, ul, size_limit) = item?;
277 raw_packer
278 .write()
279 .unwrap()
280 .add_raw(&data, &id, data_len, ul, size_limit)
281 })
282 .and_then(|()| raw_packer.write().unwrap().finalize());
283 _ = finish_tx.send(status);
284 })
285 .unwrap();
286 });
287
288 Ok(packer)
289 }
290
291 pub fn add(&self, data: Bytes, id: BlobId) -> RusticResult<()> {
302 self.add_with_sizelimit(data, id, None).map_err(|err| {
304 RusticError::with_source(
305 ErrorKind::Internal,
306 "Failed to add blob `{id}` to packfile.",
307 err,
308 )
309 .attach_context("id", id.to_string())
310 .ask_report()
311 })
312 }
313
314 fn add_with_sizelimit(
326 &self,
327 data: Bytes,
328 id: BlobId,
329 size_limit: Option<u32>,
330 ) -> PackerResult<()> {
331 self.sender
332 .send((data.clone(), id, size_limit))
333 .map_err(|err| PackerErrorKind::SendingCrossbeamMessage {
334 size_limit,
335 id,
336 data,
337 source: err,
338 })?;
339 Ok(())
340 }
341
342 fn add_raw(
357 &self,
358 data: &[u8],
359 id: &BlobId,
360 data_len: u64,
361 uncompressed_length: Option<NonZeroU32>,
362 size_limit: Option<u32>,
363 ) -> RusticResult<()> {
364 if self.indexer.read().unwrap().has(id) {
366 Ok(())
367 } else {
368 self.raw_packer.write().unwrap().add_raw(
369 data,
370 id,
371 data_len,
372 uncompressed_length,
373 size_limit,
374 )
375 }
376 }
377
378 pub fn finalize(self) -> RusticResult<PackerStats> {
384 drop(self.sender);
386 self.finish
388 .recv()
389 .expect("Should be able to receive from channel to finalize packer.")
390 }
391}
392
/// Statistics accumulated while packing blobs.
#[derive(Default, Debug, Clone, Copy)]
pub struct PackerStats {
    /// Number of blobs added.
    blobs: u64,
    /// Sum of the `data_len` values of added blobs.
    ///
    /// This is the length reported by `process_data`, presumably the unprocessed size
    /// (TODO confirm). It is zero for blobs added via `Repacker::add_fast`.
    data: u64,
    /// Total length of the (processed) blob data written into packs.
    data_packed: u64,
}
403
404impl PackerStats {
405 pub fn apply(self, summary: &mut SnapshotSummary, tpe: BlobType) {
416 summary.data_added += self.data;
417 summary.data_added_packed += self.data_packed;
418 match tpe {
419 BlobType::Tree => {
420 summary.tree_blobs += self.blobs;
421 summary.data_added_trees += self.data;
422 summary.data_added_trees_packed += self.data_packed;
423 }
424 BlobType::Data => {
425 summary.data_blobs += self.blobs;
426 summary.data_added_files += self.data;
427 summary.data_added_files_packed += self.data_packed;
428 }
429 }
430 }
431}
432
/// Assembles blobs into one in-memory pack and hands finished packs to a background file writer.
#[allow(missing_debug_implementations, clippy::module_name_repetitions)]
pub(crate) struct RawPacker<BE: DecryptWriteBackend> {
    /// The backend; its key is used to encrypt the pack header.
    be: BE,
    /// The blob type of all blobs added to this packer.
    blob_type: BlobType,
    /// In-memory buffer holding the contents of the current pack.
    file: BytesMut,
    /// Number of bytes written into `file` so far.
    size: u32,
    /// Number of blobs in the current pack.
    count: u32,
    /// Time the current pack was started; packs older than `MAX_AGE` are saved.
    created: SystemTime,
    /// Index entries of the blobs in the current pack.
    index: IndexPack,
    /// Background writer for finished packs; `None` once `finalize` has taken it.
    file_writer: Option<Actor>,
    /// Computes the target pack size.
    pack_sizer: PackSizer,
    /// Statistics accumulated so far.
    stats: PackerStats,
}
461
462impl<BE: DecryptWriteBackend> RawPacker<BE> {
463 fn new(
477 be: BE,
478 blob_type: BlobType,
479 indexer: SharedIndexer<BE>,
480 config: &ConfigFile,
481 total_size: u64,
482 ) -> Self {
483 let file_writer = Some(Actor::new(
484 FileWriterHandle {
485 be: be.clone(),
486 indexer,
487 cacheable: blob_type.is_cacheable(),
488 },
489 1,
490 1,
491 ));
492
493 let pack_sizer = PackSizer::from_config(config, blob_type, total_size);
494
495 Self {
496 be,
497 blob_type,
498 file: BytesMut::new(),
499 size: 0,
500 count: 0,
501 created: SystemTime::now(),
502 index: IndexPack::default(),
503 file_writer,
504 pack_sizer,
505 stats: PackerStats::default(),
506 }
507 }
508
509 fn finalize(&mut self) -> RusticResult<PackerStats> {
515 self.save().map_err(|err| {
516 err.overwrite_kind(ErrorKind::Internal)
517 .prepend_guidance_line("Failed to save packfile. Data may be lost.")
518 .ask_report()
519 })?;
520
521 self.file_writer.take().unwrap().finalize()?;
522
523 Ok(std::mem::take(&mut self.stats))
524 }
525
526 fn write_data(&mut self, data: &[u8]) -> PackerResult<u32> {
536 let len = data
537 .len()
538 .try_into()
539 .map_err(|err| PackerErrorKind::Conversion {
540 to: "u32",
541 from: "usize",
542 source: err,
543 })?;
544 self.file.extend_from_slice(data);
545 self.size += len;
546 Ok(len)
547 }
548
549 fn add_raw(
563 &mut self,
564 data: &[u8],
565 id: &BlobId,
566 data_len: u64,
567 uncompressed_length: Option<NonZeroU32>,
568 size_limit: Option<u32>,
569 ) -> RusticResult<()> {
570 if self.has(id) {
571 return Ok(());
572 }
573 self.stats.blobs += 1;
574
575 self.stats.data += data_len;
576
577 let data_len_packed: u64 = data.len().try_into().map_err(|err| {
578 RusticError::with_source(
579 ErrorKind::Internal,
580 "Failed to convert data length `{length}` to u64.",
581 err,
582 )
583 .attach_context("length", data.len().to_string())
584 })?;
585
586 self.stats.data_packed += data_len_packed;
587
588 let size_limit = size_limit.unwrap_or_else(|| self.pack_sizer.pack_size());
589
590 let offset = self.size;
591
592 let len = self.write_data(data).map_err(|err| {
593 RusticError::with_source(
594 ErrorKind::Internal,
595 "Failed to write data to packfile for blob `{id}`.",
596 err,
597 )
598 .attach_context("id", id.to_string())
599 .attach_context("size_limit", size_limit.to_string())
600 .attach_context("data_length_packed", data_len_packed.to_string())
601 })?;
602
603 self.index
604 .add(*id, self.blob_type, offset, len, uncompressed_length);
605
606 self.count += 1;
607
608 let elapsed = self.created.elapsed().unwrap_or_else(|err| {
610 warn!("couldn't get elapsed time from system time: {err:?}");
611 Duration::ZERO
612 });
613
614 if self.count >= constants::MAX_COUNT
615 || self.size >= size_limit
616 || elapsed >= constants::MAX_AGE
617 {
618 self.pack_sizer.add_size(self.index.pack_size());
619 self.save()?;
620 self.size = 0;
621 self.count = 0;
622 self.created = SystemTime::now();
623 }
624 Ok(())
625 }
626
627 fn write_header(&mut self) -> RusticResult<()> {
634 let data = PackHeaderRef::from_index_pack(&self.index)
636 .to_binary()
637 .map_err(|err| -> Box<RusticError> {
638 RusticError::with_source(
639 ErrorKind::Internal,
640 "Failed to convert pack header `{index_pack_id}` to binary representation.",
641 err,
642 )
643 .attach_context("index_pack_id", self.index.id.to_string())
644 })?;
645
646 let data = self.be.key().encrypt_data(&data)?;
648
649 let headerlen: u32 = data.len().try_into().map_err(|err| {
650 RusticError::with_source(
651 ErrorKind::Internal,
652 "Failed to convert header length `{length}` to u32.",
653 err,
654 )
655 .attach_context("length", data.len().to_string())
656 })?;
657
658 _ = self.write_data(&data).map_err(|err| {
660 RusticError::with_source(
661 ErrorKind::Internal,
662 "Failed to write header with length `{length}` to packfile.",
663 err,
664 )
665 .attach_context("length", headerlen.to_string())
666 })?;
667
668 let binary_repr = PackHeaderLength::from_u32(headerlen)
670 .to_binary()
671 .map_err(|err| {
672 RusticError::with_source(
673 ErrorKind::Internal,
674 "Failed to convert header length `{length}` to binary representation.",
675 err,
676 )
677 .attach_context("length", headerlen.to_string())
678 })?;
679
680 _ = self.write_data(&binary_repr).map_err(|err| {
682 RusticError::with_source(
683 ErrorKind::Internal,
684 "Failed to write header length `{length}` to packfile.",
685 err,
686 )
687 .attach_context("length", headerlen.to_string())
688 })?;
689
690 Ok(())
691 }
692
693 fn save(&mut self) -> RusticResult<()> {
704 if self.size == 0 {
705 return Ok(());
706 }
707
708 self.write_header()?;
709
710 let index = std::mem::take(&mut self.index);
712 let file = std::mem::replace(&mut self.file, BytesMut::new());
713 self.file_writer
714 .as_ref()
715 .unwrap()
716 .send((file.into(), index))
717 .map_err(|err| {
718 RusticError::with_source(
719 ErrorKind::Internal,
720 "Failed to send packfile to file writer.",
721 err,
722 )
723 })?;
724
725 Ok(())
726 }
727
728 fn has(&self, id: &BlobId) -> bool {
729 self.index.blobs.iter().any(|b| &b.id == id)
730 }
731}
732
/// Writes finished pack files to the backend and registers their index with the indexer.
#[derive(Clone)]
pub(crate) struct FileWriterHandle<BE: DecryptWriteBackend> {
    /// The backend pack files are written to.
    be: BE,
    /// The shared indexer that receives the index of every written pack.
    indexer: SharedIndexer<BE>,
    /// Passed to `write_bytes` as the cacheable flag (derived from the blob type by the caller).
    cacheable: bool,
}
746
747impl<BE: DecryptWriteBackend> FileWriterHandle<BE> {
748 fn process(&self, load: (Bytes, PackId, IndexPack)) -> RusticResult<IndexPack> {
750 let (file, id, mut index) = load;
751 index.id = id;
752 self.be
753 .write_bytes(FileType::Pack, &id, self.cacheable, file)?;
754 index.time = Some(Local::now());
755 Ok(index)
756 }
757
758 fn index(&self, index: IndexPack) -> RusticResult<()> {
759 self.indexer.write().unwrap().add(index)?;
760 Ok(())
761 }
762}
763
/// Handle to a background thread that writes pack files (spawned by `Actor::new`).
pub(crate) struct Actor {
    /// Sends `(pack data, pack index)` pairs to the writer thread.
    sender: Sender<(Bytes, IndexPack)>,
    /// Receives the final status of the writer thread.
    finish: Receiver<RusticResult<()>>,
}
771
772impl Actor {
773 fn new<BE: DecryptWriteBackend>(
785 fwh: FileWriterHandle<BE>,
786 queue_len: usize,
787 _par: usize,
788 ) -> Self {
789 let (tx, rx) = bounded(queue_len);
790 let (finish_tx, finish_rx) = bounded::<RusticResult<()>>(0);
791
792 let _join_handle = std::thread::spawn(move || {
793 scope(|scope| {
794 let status = rx
795 .into_iter()
796 .readahead_scoped(scope)
797 .map(|(file, index): (Bytes, IndexPack)| {
798 let id = hash(&file);
799 (file, PackId::from(id), index)
800 })
801 .readahead_scoped(scope)
802 .map(|load| fwh.process(load))
803 .readahead_scoped(scope)
804 .try_for_each(|index| fwh.index(index?));
805 _ = finish_tx.send(status);
806 })
807 .unwrap();
808 });
809
810 Self {
811 sender: tx,
812 finish: finish_rx,
813 }
814 }
815
816 fn send(&self, load: (Bytes, IndexPack)) -> PackerResult<()> {
826 self.sender.send(load.clone()).map_err(|err| {
827 PackerErrorKind::SendingCrossbeamDataMessage {
828 data: load.0,
829 index_pack: load.1,
830 source: err,
831 }
832 })?;
833 Ok(())
834 }
835
836 fn finalize(self) -> RusticResult<()> {
842 drop(self.sender);
844 self.finish.recv().unwrap()
846 }
847}
848
/// Copies blobs from existing pack files into newly created packs.
#[allow(missing_debug_implementations)]
pub struct Repacker<BE>
where
    BE: DecryptFullBackend,
{
    /// The backend blobs are read from.
    be: BE,
    /// The packer building the new packs.
    packer: Packer<BE>,
    /// Pack size limit passed along with every blob added by this repacker.
    size_limit: u32,
}
866
867impl<BE: DecryptFullBackend> Repacker<BE> {
868 pub fn new(
886 be: BE,
887 blob_type: BlobType,
888 indexer: SharedIndexer<BE>,
889 config: &ConfigFile,
890 total_size: u64,
891 ) -> RusticResult<Self> {
892 let packer = Packer::new(be.clone(), blob_type, indexer, config, total_size)?;
893 let size_limit = PackSizer::from_config(config, blob_type, total_size).pack_size();
894 Ok(Self {
895 be,
896 packer,
897 size_limit,
898 })
899 }
900
901 pub fn add_fast(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> {
913 let data = self.be.read_partial(
914 FileType::Pack,
915 pack_id,
916 blob.tpe.is_cacheable(),
917 blob.offset,
918 blob.length,
919 )?;
920
921 self.packer
922 .add_raw(
923 &data,
924 &blob.id,
925 0,
926 blob.uncompressed_length,
927 Some(self.size_limit),
928 )
929 .map_err(|err| {
930 err.overwrite_kind(ErrorKind::Internal)
931 .prepend_guidance_line(
932 "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.",
933 )
934 .attach_context("blob_id", blob.id.to_string())
935 })?;
936
937 Ok(())
938 }
939
940 pub fn add(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> {
952 let data = self.be.read_encrypted_partial(
953 FileType::Pack,
954 pack_id,
955 blob.tpe.is_cacheable(),
956 blob.offset,
957 blob.length,
958 blob.uncompressed_length,
959 )?;
960
961 self.packer
962 .add_with_sizelimit(data, blob.id, Some(self.size_limit))
963 .map_err(|err| {
964 RusticError::with_source(
965 ErrorKind::Internal,
966 "Failed to add blob to packfile.",
967 err,
968 )
969 })?;
970
971 Ok(())
972 }
973
974 pub fn finalize(self) -> RusticResult<PackerStats> {
976 self.packer.finalize()
977 }
978}