xet_data/deduplication/
data_aggregator.rs1use std::mem::take;
2
3use more_asserts::*;
4use xet_core_structures::merklehash::MerkleHash;
5use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
6
7use super::constants::{MAX_XORB_BYTES, MAX_XORB_CHUNKS};
8use super::{Chunk, RawXorbData};
9
/// Accumulates chunks, together with the pending file reconstruction info that
/// references them, until everything is packaged into one xorb by
/// [`DataAggregator::finalize`].
#[derive(Default, Debug)]
pub struct DataAggregator {
    /// Chunks collected so far for the xorb under construction.
    pub chunks: Vec<Chunk>,

    /// Cached sum of `chunk.data.len()` over `chunks`; `num_bytes()` debug-asserts
    /// that this cache stays in sync with the actual chunk data.
    num_bytes: usize,

    /// One entry per contributing file: `(file info, segment indices, file_id)`.
    /// The segment indices point into the file info's `segments` and name the
    /// segments whose `xorb_hash` is still the `MerkleHash::marker()` placeholder;
    /// `finalize` overwrites those with the hash of the xorb built here.
    pub pending_file_info: Vec<(MDBFileInfo, Vec<usize>, u64)>,

    /// Indices into `chunks` at which each contributing aggregator's (non-empty)
    /// run of chunks begins; handed to `RawXorbData::from_chunks` by `finalize`.
    pub file_boundaries: Vec<usize>,
}
33
34impl DataAggregator {
35 pub(crate) fn new(
36 chunks: Vec<Chunk>,
37 pending_file_info: MDBFileInfo,
38 internally_referencing_entries: Vec<usize>,
39 file_id: u64,
40 ) -> Self {
41 let num_bytes = chunks.iter().map(|c| c.data.len()).sum();
42
43 let file_boundaries = if chunks.is_empty() { vec![] } else { vec![0] };
45
46 Self {
47 chunks,
48 num_bytes,
49 pending_file_info: vec![(pending_file_info, internally_referencing_entries, file_id)],
50 file_boundaries,
51 }
52 }
53
54 pub fn is_empty(&self) -> bool {
55 self.chunks.is_empty() && self.pending_file_info.is_empty()
56 }
57
58 pub fn num_chunks(&self) -> usize {
59 self.chunks.len()
60 }
61
62 pub fn num_bytes(&self) -> usize {
63 debug_assert_eq!(self.chunks.iter().map(|c| c.data.len()).sum::<usize>(), self.num_bytes);
64 self.num_bytes
65 }
66
67 pub fn finalize(mut self) -> (RawXorbData, Vec<(u64, MDBFileInfo, u64)>) {
71 let xorb_data = RawXorbData::from_chunks(&self.chunks, take(&mut self.file_boundaries));
73 let xorb_hash = xorb_data.hash();
74
75 debug_assert_le!(self.num_bytes(), *MAX_XORB_BYTES);
76 debug_assert_le!(self.num_chunks(), *MAX_XORB_CHUNKS);
77
78 let mut ret = vec![0u64; self.pending_file_info.len()];
79
80 for (f_idx, (fi, chunk_hash_indices_ref, _file_id)) in self.pending_file_info.iter_mut().enumerate() {
83 for &i in chunk_hash_indices_ref.iter() {
84 debug_assert_eq!(fi.segments[i].xorb_hash, MerkleHash::marker());
85 fi.segments[i].xorb_hash = xorb_hash;
86 ret[f_idx] += fi.segments[i].unpacked_segment_bytes as u64;
87 }
88
89 chunk_hash_indices_ref.clear();
91
92 #[cfg(debug_assertions)]
93 {
94 for fse in fi.segments.iter() {
96 debug_assert_ne!(fse.xorb_hash, MerkleHash::marker());
97 }
98 }
99 }
100
101 (
102 xorb_data,
103 self.pending_file_info
104 .into_iter()
105 .zip(ret)
106 .map(|((fi, _, file_id), byte_count)| (file_id, fi, byte_count))
107 .collect(),
108 )
109 }
110
111 pub fn merge_in(&mut self, mut other: DataAggregator) {
112 debug_assert_le!(self.num_bytes() + other.num_bytes(), *MAX_XORB_BYTES);
113 debug_assert_le!(self.num_chunks() + other.num_chunks(), *MAX_XORB_BYTES);
114
115 let shift = self.chunks.len() as u32;
116 self.chunks.append(&mut other.chunks);
117 self.num_bytes += other.num_bytes;
118
119 for file_info in other.pending_file_info.iter_mut() {
121 for fi in file_info.0.segments.iter_mut() {
122 if fi.xorb_hash == MerkleHash::marker() {
126 fi.chunk_index_start += shift;
127 fi.chunk_index_end += shift;
128 }
129 }
130 }
131
132 self.pending_file_info.append(&mut other.pending_file_info);
133
134 self.file_boundaries
136 .extend(other.file_boundaries.into_iter().map(|idx| idx + (shift as usize)));
137 }
138}