1use std::{
2 num::NonZeroU32,
3 sync::{Arc, RwLock},
4 thread::scope,
5 time::{Duration, SystemTime},
6};
7
8use bytes::{Bytes, BytesMut};
9use crossbeam_channel::{Receiver, Sender, bounded};
10use integer_sqrt::IntegerSquareRoot;
11use jiff::Timestamp;
12use log::warn;
13use pariter::IteratorExt;
14
15use crate::{
16 Progress,
17 backend::{
18 FileType,
19 decrypt::{DecryptFullBackend, DecryptWriteBackend},
20 },
21 blob::{BlobId, BlobLocations, BlobType},
22 crypto::{CryptoKey, hasher::hash},
23 error::{ErrorKind, RusticError, RusticResult},
24 index::{IndexEntry, indexer::SharedIndexer},
25 repofile::{
26 configfile::ConfigFile,
27 indexfile::{IndexBlob, IndexPack},
28 packfile::{PackHeaderLength, PackHeaderRef, PackId},
29 snapshotfile::SnapshotSummary,
30 },
31};
32
33#[derive(thiserror::Error, Debug, displaydoc::Display)]
35#[non_exhaustive]
36pub enum PackerErrorKind {
37 Conversion {
39 to: &'static str,
40 from: &'static str,
41 source: std::num::TryFromIntError,
42 },
43 SendingCrossbeamMessage {
45 id: BlobId,
46 data: Bytes,
47 source: crossbeam_channel::SendError<(Bytes, BlobId)>,
48 },
49 SendingCrossbeamDataMessage {
51 data: Bytes,
52 index_pack: Box<IndexPack>,
53 source: crossbeam_channel::SendError<(Bytes, IndexPack)>,
54 },
55}
56
/// Convenience result type for packer-internal operations.
/// The error is boxed to keep `Result` values small (cf. `clippy::result_large_err`).
pub(crate) type PackerResult<T> = Result<T, Box<PackerErrorKind>>;
58
/// Limits governing when an in-progress pack file is flushed.
pub(super) mod constants {
    use std::time::Duration;

    /// Kilobyte in bytes.
    pub(super) const KB: u32 = 1024;
    /// Megabyte in bytes.
    pub(super) const MB: u32 = 1024 * KB;
    /// Absolute upper bound for a pack file: 4076 MiB (fits into `u32`).
    pub(super) const MAX_SIZE: u32 = 4076 * MB;
    /// Maximum number of blobs stored in a single pack file.
    pub(super) const MAX_COUNT: u32 = 10_000;
    /// Maximum time a pack file may stay open before being flushed.
    pub(super) const MAX_AGE: Duration = Duration::from_secs(300);
}
73
/// Computes the target size for new pack files of one blob type.
#[derive(Debug, Clone, Copy)]
pub struct PackSizer {
    /// The base pack size in bytes.
    default_size: u32,
    /// Factor by which the target grows with the repository size
    /// (0 disables growing; see `pack_size`).
    grow_factor: u32,
    /// Upper bound for the computed pack size in bytes.
    size_limit: u32,
    /// Total size in bytes of blobs of this type tracked so far.
    current_size: u64,
    /// Packs below this percentage of the target size count as too small.
    min_packsize_tolerate_percent: u32,
    /// Packs above this percentage of the target size count as too large.
    max_packsize_tolerate_percent: u32,
}
90
91impl PackSizer {
92 #[must_use]
104 pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self {
105 let (default_size, grow_factor, size_limit) = config.packsize(blob_type);
106 let (min_packsize_tolerate_percent, max_packsize_tolerate_percent) =
107 config.packsize_ok_percents();
108 Self {
109 default_size,
110 grow_factor,
111 size_limit,
112 current_size,
113 min_packsize_tolerate_percent,
114 max_packsize_tolerate_percent,
115 }
116 }
117
118 #[must_use]
128 pub fn fixed(size: u32) -> Self {
129 Self {
130 default_size: size,
131 grow_factor: 0,
132 size_limit: size,
133 current_size: 0,
134 min_packsize_tolerate_percent: 100,
135 max_packsize_tolerate_percent: 100,
136 }
137 }
138
139 #[must_use]
141 #[allow(clippy::cast_possible_truncation)]
142 pub fn pack_size(&self) -> u32 {
143 let size = if self.grow_factor == 0 {
144 self.default_size
145 } else {
146 self.current_size.integer_sqrt() as u32 * self.grow_factor + self.default_size
150 };
151 size.min(self.size_limit).min(constants::MAX_SIZE)
152 }
153
154 #[must_use]
160 pub fn size_ok(&self, size: u32) -> bool {
161 !self.is_too_small(size) && !self.is_too_large(size)
162 }
163
164 #[must_use]
170 pub fn is_too_small(&self, size: u32) -> bool {
171 let target_size = self.pack_size();
172 u64::from(size) * 100
174 < u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent)
175 }
176
177 #[must_use]
183 pub fn is_too_large(&self, size: u32) -> bool {
184 let target_size = self.pack_size();
185 u64::from(size) * 100
187 > u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent)
188 }
189
190 pub fn add_size(&mut self, added: u32) {
200 self.current_size += u64::from(added);
201 }
202}
203
/// The `Packer` accepts blobs, processes them on a background pipeline and
/// bundles the results into pack files via a shared [`RawPacker`].
#[allow(missing_debug_implementations)]
#[allow(clippy::struct_field_names)]
#[derive(Clone)]
pub struct Packer<BE: DecryptWriteBackend> {
    /// The shared raw packer assembling the actual pack files.
    raw_packer: Arc<RwLock<RawPacker<BE>>>,
    /// The shared indexer, used to skip blobs that are already indexed.
    indexer: SharedIndexer<BE>,
    /// Channel handing `(data, id)` pairs to the worker thread.
    sender: Sender<(Bytes, BlobId)>,
    /// Channel delivering the final statistics once the worker finished.
    finish: Receiver<RusticResult<PackerStats>>,
}
224
impl<BE: DecryptWriteBackend> Packer<BE> {
    /// Creates a new `Packer` and spawns its background worker thread.
    ///
    /// The worker reads `(data, id)` pairs from a rendezvous channel, drops
    /// blobs already known to the indexer or queued in the raw packer,
    /// processes the data in parallel via `be.process_data` and appends the
    /// results to the raw packer. Its final status (including the finalized
    /// statistics) is delivered through the `finish` channel.
    ///
    /// # Arguments
    ///
    /// * `be` - The backend used to process (e.g. compress/encrypt) blob data.
    /// * `blob_type` - The blob type this packer handles.
    /// * `indexer` - The shared indexer used for duplicate detection.
    /// * `pack_sizer` - The sizer determining target pack sizes.
    #[allow(clippy::unnecessary_wraps)]
    pub fn new(
        be: BE,
        blob_type: BlobType,
        indexer: SharedIndexer<BE>,
        pack_sizer: PackSizer,
    ) -> RusticResult<Self> {
        let raw_packer = Arc::new(RwLock::new(RawPacker::new(
            be.clone(),
            blob_type,
            indexer.clone(),
            pack_sizer,
        )));

        // Capacity 0 makes both channels rendezvous channels.
        let (tx, rx) = bounded(0);
        let (finish_tx, finish_rx) = bounded::<RusticResult<PackerStats>>(0);
        let packer = Self {
            raw_packer: raw_packer.clone(),
            indexer: indexer.clone(),
            sender: tx,
            finish: finish_rx,
        };

        let _join_handle = std::thread::spawn(move || {
            scope(|scope| {
                let status = rx
                    .into_iter()
                    .readahead_scoped(scope)
                    // Drop blobs already indexed or already queued in the raw packer.
                    .filter(|(_, id)| !indexer.read().unwrap().has(id))
                    .filter(|(_, id)| !raw_packer.read().unwrap().has(id))
                    .readahead_scoped(scope)
                    // Process (e.g. compress/encrypt) blob data in parallel.
                    .parallel_map_scoped(scope, |(data, id): (Bytes, BlobId)| {
                        let (data, data_len, uncompressed_length) = be.process_data(&data)?;
                        Ok((data, id, u64::from(data_len), uncompressed_length))
                    })
                    .readahead_scoped(scope)
                    // Re-check the indexer after the parallel stage; errors are
                    // kept so they surface in try_for_each below.
                    .filter(|res| {
                        res.as_ref()
                            .map_or_else(|_| true, |(_, id, _, _)| !indexer.read().unwrap().has(id))
                    })
                    .try_for_each(|item: RusticResult<_>| -> RusticResult<()> {
                        let (data, id, data_len, ul) = item?;
                        raw_packer
                            .write()
                            .unwrap()
                            .add_raw(&data, &id, data_len, ul)
                    })
                    // Flush any remaining pack data and collect the statistics.
                    .and_then(|()| raw_packer.write().unwrap().finalize());
                _ = finish_tx.send(status);
            });
        });

        Ok(packer)
    }

    /// Adds a blob to the packer by sending it to the worker thread.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel to the worker thread is disconnected.
    pub fn add(&self, data: Bytes, id: BlobId) -> RusticResult<()> {
        self.sender.send((data, id)).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Sending crossbeam message failed: `id`: `{id}`",
                err,
            )
            .ask_report()
            .attach_context("id", id.to_hex().to_string())
        })?;
        Ok(())
    }

    /// Adds an already-processed blob directly to the raw packer, bypassing
    /// the worker pipeline, unless the indexer already knows the blob.
    ///
    /// # Arguments
    ///
    /// * `data` - The processed blob bytes.
    /// * `id` - The blob id.
    /// * `data_len` - The raw (unprocessed) length of the blob.
    /// * `uncompressed_length` - The uncompressed length, if compressed.
    fn add_raw(
        &self,
        data: &[u8],
        id: &BlobId,
        data_len: u64,
        uncompressed_length: Option<NonZeroU32>,
    ) -> RusticResult<()> {
        if self.indexer.read().unwrap().has(id) {
            Ok(())
        } else {
            self.raw_packer
                .write()
                .unwrap()
                .add_raw(data, id, data_len, uncompressed_length)
        }
    }

    /// Finalizes the packer: disconnects the worker's input channel and
    /// waits for the worker to report the collected statistics.
    pub fn finalize(self) -> RusticResult<PackerStats> {
        // Dropping the sender disconnects the channel so the worker's
        // iterator ends and it can finalize the raw packer.
        drop(self.sender);
        self.finish
            .recv()
            .expect("Should be able to receive from channel to finalize packer.")
    }
}
371
/// Statistics about what a packer has processed.
#[derive(Default, Debug, Clone, Copy)]
pub struct PackerStats {
    /// The number of blobs added.
    blobs: u64,
    /// The total raw (unpacked) blob data size in bytes.
    data: u64,
    /// The total processed (packed) data size in bytes.
    data_packed: u64,
}
382
383impl PackerStats {
384 pub fn apply(self, summary: &mut SnapshotSummary, tpe: BlobType) {
395 summary.data_added += self.data;
396 summary.data_added_packed += self.data_packed;
397 match tpe {
398 BlobType::Tree => {
399 summary.tree_blobs += self.blobs;
400 summary.data_added_trees += self.data;
401 summary.data_added_trees_packed += self.data_packed;
402 }
403 BlobType::Data => {
404 summary.data_blobs += self.blobs;
405 summary.data_added_files += self.data;
406 summary.data_added_files_packed += self.data_packed;
407 }
408 }
409 }
410}
411
/// The `RawPacker` appends already-processed blobs to an in-memory pack file
/// and flushes it to the file-writer actor when it becomes full or too old.
#[allow(missing_debug_implementations, clippy::module_name_repetitions)]
pub(crate) struct RawPacker<BE: DecryptWriteBackend> {
    /// The backend to write to.
    be: BE,
    /// The blob type this packer is responsible for.
    blob_type: BlobType,
    /// The contents of the pack file currently being assembled.
    file: BytesMut,
    /// The current size of `file` in bytes.
    size: u32,
    /// The number of blobs currently in `file`.
    count: u32,
    /// When the current pack file was started; drives the age-based flush.
    created: SystemTime,
    /// Index information for the blobs in the current pack file.
    index: IndexPack,
    /// The actor persisting finished pack files; `None` after finalization.
    file_writer: Option<Actor>,
    /// The sizer determining target pack sizes.
    pack_sizer: PackSizer,
    /// Statistics collected while packing.
    stats: PackerStats,
}
440
impl<BE: DecryptWriteBackend> RawPacker<BE> {
    /// Creates a new `RawPacker` with an empty in-memory pack file and a
    /// freshly spawned file-writer actor (queue length 1).
    fn new(be: BE, blob_type: BlobType, indexer: SharedIndexer<BE>, pack_sizer: PackSizer) -> Self {
        let file_writer = Some(Actor::new(
            FileWriterHandle {
                be: be.clone(),
                indexer,
                cacheable: blob_type.is_cacheable(),
            },
            1,
            1,
        ));

        Self {
            be,
            blob_type,
            file: BytesMut::new(),
            size: 0,
            count: 0,
            created: SystemTime::now(),
            index: IndexPack::default(),
            file_writer,
            pack_sizer,
            stats: PackerStats::default(),
        }
    }

    /// Flushes the current pack file (if any), shuts down the file-writer
    /// actor and returns the collected statistics.
    ///
    /// # Errors
    ///
    /// Returns an error if saving the pack file or finalizing the writer fails.
    fn finalize(&mut self) -> RusticResult<PackerStats> {
        self.save().map_err(|err| {
            err.overwrite_kind(ErrorKind::Internal)
                .prepend_guidance_line("Failed to save packfile. Data may be lost.")
                .ask_report()
        })?;

        // `file_writer` is always `Some` until finalization; taking it drops
        // the actor's sender so its worker thread can finish.
        self.file_writer.take().unwrap().finalize()?;

        Ok(std::mem::take(&mut self.stats))
    }

    /// Appends `data` to the in-memory pack file, tracking the size, and
    /// returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// Returns a `Conversion` error if `data.len()` does not fit into `u32`.
    fn write_data(&mut self, data: &[u8]) -> PackerResult<u32> {
        let len = data
            .len()
            .try_into()
            .map_err(|err| PackerErrorKind::Conversion {
                to: "u32",
                from: "usize",
                source: err,
            })?;
        self.file.extend_from_slice(data);
        self.size += len;
        Ok(len)
    }

    /// Adds an already-processed blob to the current pack file, updating the
    /// statistics and index, and flushes the pack when it exceeds the size,
    /// count or age limits.
    ///
    /// Blobs already present in the current pack's index are skipped.
    ///
    /// # Arguments
    ///
    /// * `data` - The processed blob bytes as stored in the pack file.
    /// * `id` - The blob id.
    /// * `data_len` - The raw (unprocessed) length of the blob.
    /// * `uncompressed_length` - The uncompressed length, if compressed.
    fn add_raw(
        &mut self,
        data: &[u8],
        id: &BlobId,
        data_len: u64,
        uncompressed_length: Option<NonZeroU32>,
    ) -> RusticResult<()> {
        if self.has(id) {
            return Ok(());
        }
        self.stats.blobs += 1;

        self.stats.data += data_len;

        let data_len_packed: u64 = data.len().try_into().map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to convert data length `{length}` to u64.",
                err,
            )
            .attach_context("length", data.len().to_string())
        })?;

        self.stats.data_packed += data_len_packed;

        let size_limit = self.pack_sizer.pack_size();

        // Record where this blob starts inside the pack file.
        let offset = self.size;

        let len = self.write_data(data).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write data to packfile for blob `{id}`.",
                err,
            )
            .attach_context("id", id.to_string())
            .attach_context("size_limit", size_limit.to_string())
            .attach_context("data_length_packed", data_len_packed.to_string())
        })?;

        self.index
            .add(*id, self.blob_type, offset, len, uncompressed_length);

        self.count += 1;

        // A failing system clock must not abort packing; log and treat it as
        // "no time elapsed".
        let elapsed = self.created.elapsed().unwrap_or_else(|err| {
            warn!("couldn't get elapsed time from system time: {err:?}");
            Duration::ZERO
        });

        // Flush the pack when it is full enough or has been open too long.
        if self.count >= constants::MAX_COUNT
            || self.size >= size_limit
            || elapsed >= constants::MAX_AGE
        {
            self.pack_sizer.add_size(self.index.pack_size());
            self.save()?;
            self.size = 0;
            self.count = 0;
            self.created = SystemTime::now();
        }
        Ok(())
    }

    /// Serializes and encrypts the pack header and appends it to the pack
    /// file, followed by the binary-encoded header length.
    fn write_header(&mut self) -> RusticResult<()> {
        let data = PackHeaderRef::from_index_pack(&self.index)
            .to_binary()
            .map_err(|err| -> Box<RusticError> {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to convert pack header `{index_pack_id}` to binary representation.",
                    err,
                )
                .attach_context("index_pack_id", self.index.id.to_string())
            })?;

        let data = self.be.key().encrypt_data(&data)?;

        let headerlen: u32 = data.len().try_into().map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to convert header length `{length}` to u32.",
                err,
            )
            .attach_context("length", data.len().to_string())
        })?;

        // Append the encrypted header after the blob data.
        _ = self.write_data(&data).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write header with length `{length}` to packfile.",
                err,
            )
            .attach_context("length", headerlen.to_string())
        })?;

        // The encoded header length is appended last.
        let binary_repr = PackHeaderLength::from_u32(headerlen)
            .to_binary()
            .map_err(|err| {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to convert header length `{length}` to binary representation.",
                    err,
                )
                .attach_context("length", headerlen.to_string())
            })?;

        _ = self.write_data(&binary_repr).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Internal,
                "Failed to write header length `{length}` to packfile.",
                err,
            )
            .attach_context("length", headerlen.to_string())
        })?;

        Ok(())
    }

    /// Finishes the current pack file: writes the header and hands the file
    /// plus its index over to the file-writer actor.
    ///
    /// Does nothing if the current pack file is empty.
    fn save(&mut self) -> RusticResult<()> {
        if self.size == 0 {
            return Ok(());
        }

        self.write_header()?;

        // Move the finished pack file and its index out, leaving fresh empty
        // ones behind for the next pack.
        let index = std::mem::take(&mut self.index);
        let file = std::mem::replace(&mut self.file, BytesMut::new());
        self.file_writer
            .as_ref()
            .unwrap()
            .send((file.into(), index))
            .map_err(|err| {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to send packfile to file writer.",
                    err,
                )
            })?;

        Ok(())
    }

    /// Returns whether the blob id is already contained in the current
    /// (unsaved) pack file's index.
    fn has(&self, id: &BlobId) -> bool {
        self.index.blobs.iter().any(|b| &b.id == id)
    }
}
702
/// Bundles everything the file-writer worker needs to persist pack files.
#[derive(Clone)]
pub(crate) struct FileWriterHandle<BE: DecryptWriteBackend> {
    /// The backend to write pack files to.
    be: BE,
    /// The shared indexer receiving finished index packs.
    indexer: SharedIndexer<BE>,
    /// Whether written pack files should be marked cacheable
    /// (derived from the blob type).
    cacheable: bool,
}
716
717impl<BE: DecryptWriteBackend> FileWriterHandle<BE> {
718 fn process(&self, load: (Bytes, PackId, IndexPack)) -> RusticResult<IndexPack> {
720 let (file, id, mut index) = load;
721 index.id = id;
722 self.be
723 .write_bytes(FileType::Pack, &id, self.cacheable, file)?;
724 index.time = Some(Timestamp::now());
725 Ok(index)
726 }
727
728 fn index(&self, index: IndexPack) -> RusticResult<()> {
729 self.indexer.write().unwrap().add(index)?;
730 Ok(())
731 }
732}
733
/// Handle to the file-writer worker thread.
pub(crate) struct Actor {
    /// Channel handing `(pack file, index)` pairs to the worker.
    sender: Sender<(Bytes, IndexPack)>,
    /// Channel delivering the worker's final status.
    finish: Receiver<RusticResult<()>>,
}
741
742impl Actor {
743 fn new<BE: DecryptWriteBackend>(
755 fwh: FileWriterHandle<BE>,
756 queue_len: usize,
757 _par: usize,
758 ) -> Self {
759 let (tx, rx) = bounded(queue_len);
760 let (finish_tx, finish_rx) = bounded::<RusticResult<()>>(0);
761
762 let _join_handle = std::thread::spawn(move || {
763 scope(|scope| {
764 let status = rx
765 .into_iter()
766 .readahead_scoped(scope)
767 .map(|(file, index): (Bytes, IndexPack)| {
768 let id = hash(&file);
769 (file, PackId::from(id), index)
770 })
771 .readahead_scoped(scope)
772 .map(|load| fwh.process(load))
773 .readahead_scoped(scope)
774 .try_for_each(|index| fwh.index(index?));
775 _ = finish_tx.send(status);
776 });
777 });
778
779 Self {
780 sender: tx,
781 finish: finish_rx,
782 }
783 }
784
785 fn send(&self, load: (Bytes, IndexPack)) -> PackerResult<()> {
795 self.sender.send(load.clone()).map_err(|err| {
796 PackerErrorKind::SendingCrossbeamDataMessage {
797 data: load.0,
798 index_pack: Box::new(load.1),
799 source: err,
800 }
801 })?;
802 Ok(())
803 }
804
805 fn finalize(self) -> RusticResult<()> {
811 drop(self.sender);
813 self.finish.recv().unwrap()
815 }
816}
817
/// A set of blob locations within one pack file that should be copied together.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CopyPackBlobs {
    /// The pack file containing the blobs.
    pub pack_id: PackId,
    /// The locations of the blobs to copy within that pack.
    pub locations: BlobLocations<BlobId>,
}
823
824impl CopyPackBlobs {
825 pub fn from_index_blob(pack_id: PackId, blob: IndexBlob) -> Self {
826 Self {
827 pack_id,
828 locations: BlobLocations::from_blob_location(blob.location, blob.id),
829 }
830 }
831
832 pub fn from_index_entry(entry: IndexEntry, id: BlobId) -> Self {
833 Self {
834 pack_id: entry.pack,
835 locations: BlobLocations::from_blob_location(entry.location, id),
836 }
837 }
838
839 #[allow(clippy::result_large_err)]
840 pub fn coalesce(self, other: Self) -> Result<Self, (Self, Self)> {
842 if self.pack_id == other.pack_id && self.locations.can_coalesce(&other.locations) {
843 Ok(Self {
844 pack_id: self.pack_id,
845 locations: self.locations.append(other.locations),
846 })
847 } else {
848 Err((self, other))
849 }
850 }
851}
852
/// Copies blobs from a source backend into a destination repository via a
/// [`Packer`].
#[allow(missing_debug_implementations)]
pub struct BlobCopier<BE>
where
    BE: DecryptFullBackend,
{
    /// The backend to read blobs from.
    be_src: BE,
    /// The packer writing to the destination repository.
    packer: Packer<BE>,
    /// The destination's target pack size, captured at construction time.
    size_limit: u32,
    /// The blob type being copied.
    blob_type: BlobType,
}
872
impl<BE: DecryptFullBackend> BlobCopier<BE> {
    /// Creates a new `BlobCopier` reading from `be_src` and writing to
    /// `be_dst` through a fresh [`Packer`].
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the destination packer.
    pub fn new(
        be_src: BE,
        be_dst: BE,
        blob_type: BlobType,
        indexer: SharedIndexer<BE>,
        pack_sizer: PackSizer,
    ) -> RusticResult<Self> {
        let packer = Packer::new(be_dst, blob_type, indexer, pack_sizer)?;
        let size_limit = pack_sizer.pack_size();
        Ok(Self {
            be_src,
            packer,
            size_limit,
            blob_type,
        })
    }

    /// Copies the given blobs by transferring their bytes unchanged: the pack
    /// region is read once and each blob's slice is added directly via
    /// `add_raw`, without re-processing the data.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the source or adding to the packer fails.
    pub fn copy_fast(&self, pack_blobs: CopyPackBlobs, p: &Progress) -> RusticResult<()> {
        let offset = pack_blobs.locations.offset;
        let data = self.be_src.read_partial(
            FileType::Pack,
            &pack_blobs.pack_id,
            self.blob_type.is_cacheable(),
            offset,
            pack_blobs.locations.length,
        )?;

        for (blob, blob_id) in pack_blobs.locations.blobs {
            // Blob offsets are relative to the pack file; rebase them onto
            // the partially-read buffer.
            let start = usize::try_from(blob.offset - offset)
                .expect("convert from u32 to usize should not fail!");
            let end = usize::try_from(blob.offset + blob.length - offset)
                .expect("convert from u32 to usize should not fail!");
            self.packer
                .add_raw(
                    &data[start..end],
                    &blob_id,
                    u64::from(blob.length),
                    blob.uncompressed_length,
                )
                .map_err(|err| {
                    err.overwrite_kind(ErrorKind::Internal)
                        .prepend_guidance_line(
                            "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.",
                        )
                        .attach_context("blob_id", blob_id.to_string())
                })?;
            p.inc(blob.length.into());
        }

        Ok(())
    }

    /// Copies the given blobs by decrypting each one from the source pack
    /// region and re-adding it through the packer's normal `add` path.
    ///
    /// # Errors
    ///
    /// Returns an error if reading, decrypting or adding a blob fails.
    pub fn copy(&self, pack_blobs: CopyPackBlobs, p: &Progress) -> RusticResult<()> {
        let offset = pack_blobs.locations.offset;
        let read_data = self.be_src.read_partial(
            FileType::Pack,
            &pack_blobs.pack_id,
            self.blob_type.is_cacheable(),
            offset,
            pack_blobs.locations.length,
        )?;

        for (blob, blob_id) in pack_blobs.locations.blobs {
            // Same offset rebasing as in `copy_fast`.
            let start = usize::try_from(blob.offset - offset)
                .expect("convert from u32 to usize should not fail!");
            let end = usize::try_from(blob.offset + blob.length - offset)
                .expect("convert from u32 to usize should not fail!");
            let data = self
                .be_src
                .read_encrypted_from_partial(&read_data[start..end], blob.uncompressed_length)?;

            self.packer.add(data, blob_id).map_err(|err| {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Failed to add blob to packfile.",
                    err,
                )
            })?;
            p.inc(blob.length.into());
        }

        Ok(())
    }

    /// Finalizes the underlying packer and returns its statistics.
    pub fn finalize(self) -> RusticResult<PackerStats> {
        self.packer.finalize()
    }
}
1004
#[cfg(test)]
mod tests {
    use super::*;
    use insta::assert_ron_snapshot;

    // Feed a growing sequence of sizes into differently-configured pack
    // sizers and snapshot the resulting (current_size, pack_size) pairs.
    #[test]
    fn pack_sizers() {
        let config = ConfigFile {
            treepack_size_limit: Some(5 * 1024 * 1024),
            ..Default::default()
        };
        let mut pack_sizers = [
            PackSizer::from_config(&config, BlobType::Tree, 0),
            PackSizer::from_config(&config, BlobType::Data, 0),
            PackSizer::fixed(12345),
        ];

        let output: Vec<_> = [
            0,
            10,
            1000,
            100_000,
            100_000,
            100_000,
            10_000_000,
            10_000_000,
            1_000_000_000,
            1_000_000_000,
        ]
        .into_iter()
        .map(|i| {
            pack_sizers
                .iter_mut()
                .map(|ps| {
                    ps.add_size(i);
                    (ps.current_size, ps.pack_size())
                })
                .collect::<Vec<_>>()
        })
        .collect();

        assert_ron_snapshot!(output);
    }
}