use std::result::Result;

use more_asserts::{debug_assert_le, debug_assert_lt};
use xet_core_structures::MerkleHashMap;
use xet_core_structures::merklehash::{MerkleHash, file_hash};
use xet_core_structures::metadata_shard::file_structs::{
    FileDataSequenceEntry, FileDataSequenceHeader, FileMetadataExt, FileVerificationEntry, MDBFileInfo,
};
use xet_core_structures::metadata_shard::hash_is_global_dedup_eligible;

use super::constants::{XORB_CUT_THRESHOLD_BYTES, XORB_CUT_THRESHOLD_CHUNKS};
use super::data_aggregator::DataAggregator;
use super::dedup_metrics::DeduplicationMetrics;
use super::defrag_prevention::DefragPrevention;
use super::interface::DeduplicationDataInterface;
use super::{Chunk, RawXorbData};
use crate::progress_tracking::upload_tracking::FileXorbDependency;

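/// Deduplicates a single file's chunks against three sources: shards already known to
/// the data interface, shards discovered through global dedup queries, and chunks
/// staged for the current in-progress xorb.  Chunks that do not dedup anywhere are
/// accumulated into new xorbs, which are cut and registered once they hit the
/// configured thresholds.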
pub struct FileDeduper<DataInterfaceType: DeduplicationDataInterface> {
    // Interface used for dedup queries, xorb registration, and dependency tracking.
    data_mng: DataInterfaceType,

    // Identifier attributing xorb dependencies and progress updates to this file.
    file_id: u64,

    // Chunks that failed deduplication and are accumulating toward the next xorb.
    new_data: Vec<Chunk>,

    // Total number of bytes currently held in `new_data`.
    new_data_size: usize,

    // Maps a chunk hash to its index in `new_data`, for dedup against in-flight data.
    new_data_hash_lookup: MerkleHashMap<usize>,

    // The hash and byte length of every chunk seen so far, in file order; this feeds
    // the final file hash and the per-segment verification entries.
    chunk_hashes: Vec<(MerkleHash, u64)>,

    // The file reconstruction entries built up as chunks are processed.
    file_info: Vec<FileDataSequenceEntry>,

    // Indices of entries in `file_info` that reference the in-progress xorb; their
    // placeholder marker hashes are patched once that xorb is cut.
    internally_referencing_entries: Vec<usize>,

    // Tracks dedup range sizes to avoid excessive fragmentation of the reconstruction.
    defrag_tracker: DefragPrevention,

    // Minimum number of chunks between consecutive global dedup queries.
    min_spacing_between_global_dedup_queries: usize,

    // The next chunk index at which a global dedup query may be issued.
    next_chunk_index_eligible_for_global_dedup_query: usize,

    // Running totals, merged in from each call to `process_chunks`.
    deduplication_metrics: DeduplicationMetrics,
}

impl<DataInterfaceType: DeduplicationDataInterface> FileDeduper<DataInterfaceType> {
    pub fn new(data_manager: DataInterfaceType, file_id: u64) -> Self {
        Self {
            data_mng: data_manager,
            file_id,
            new_data: Vec::new(),
            new_data_size: 0,
            new_data_hash_lookup: MerkleHashMap::new(),
            chunk_hashes: Vec::new(),
            file_info: Vec::new(),
            internally_referencing_entries: Vec::new(),
            defrag_tracker: DefragPrevention::default(),
            min_spacing_between_global_dedup_queries: 0,
            next_chunk_index_eligible_for_global_dedup_query: 0,
            deduplication_metrics: DeduplicationMetrics::default(),
        }
    }

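    /// Runs the dedup pipeline over one batch of a file's chunks, in order.  Updates
    /// the file reconstruction entries, registers any xorbs cut along the way, and
    /// returns the metrics for this batch; the running totals are kept in
    /// `self.deduplication_metrics`.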
    pub async fn process_chunks(
        &mut self,
        chunks: &[Chunk],
    ) -> Result<DeduplicationMetrics, DataInterfaceType::ErrorType> {
        let mut dedup_metrics = DeduplicationMetrics::default();

        let mut xorb_dependencies = Vec::new();

        let global_chunk_index_start = self.chunk_hashes.len();

        let chunk_hashes = Vec::from_iter(chunks.iter().map(|c| c.hash));

        let mut deduped_blocks = vec![None; chunks.len()];

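        // Dedup in at most two passes.  The first pass matches chunks against shards
        // the data interface already knows about while registering global dedup
        // queries for eligible chunk hashes.  If those queries bring in new shards, a
        // second pass re-runs the matching so the new shards' chunks can dedup too;
        // ranges that already matched are carried over in `deduped_blocks` and skipped.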
        for first_pass in [true, false] {
            let mut local_chunk_index = 0;
            while local_chunk_index < chunks.len() {
                let global_chunk_index = global_chunk_index_start + local_chunk_index;

                if let Some((n_deduped, _, _)) = &deduped_blocks[local_chunk_index] {
                    local_chunk_index += n_deduped;
                } else if let Some((n_deduped, fse, is_uploaded_shard)) =
                    self.data_mng.chunk_hash_dedup_query(&chunk_hashes[local_chunk_index..]).await?
                {
                    // Hits on the second pass come from shards pulled in by the global
                    // dedup queries, so attribute them to global dedup in the metrics.
                    if !first_pass {
                        dedup_metrics.deduped_chunks_by_global_dedup += n_deduped as u64;
                        dedup_metrics.deduped_bytes_by_global_dedup += fse.unpacked_segment_bytes as u64;
                    }

                    deduped_blocks[local_chunk_index] = Some((n_deduped, fse, is_uploaded_shard));
                    local_chunk_index += n_deduped;
                } else {
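                    // No match.  On the first pass, consider registering this chunk
                    // hash as a global dedup query: the file's first chunk is always
                    // queried, and later chunks are queried when their hash is dedup
                    // eligible and enough chunks have gone by since the last query.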
                    if first_pass
                        && (global_chunk_index == 0
                            || hash_is_global_dedup_eligible(&chunk_hashes[local_chunk_index]))
                        && global_chunk_index >= self.next_chunk_index_eligible_for_global_dedup_query
                    {
                        self.data_mng
                            .register_global_dedup_query(chunk_hashes[local_chunk_index])
                            .await?;

                        self.next_chunk_index_eligible_for_global_dedup_query =
                            global_chunk_index + self.min_spacing_between_global_dedup_queries;
                    }

                    local_chunk_index += 1;
                }
            }

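            // Wait for the global dedup queries registered above; any shards they pull
            // in become visible to `chunk_hash_dedup_query` on the next pass.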
            let new_shards_added = self.data_mng.complete_global_dedup_queries().await?;

            // If no new shards came in, a second pass would find nothing new.
            if !new_shards_added {
                break;
            }
        }

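        // Second phase: walk the chunks in order, consuming the dedup results and
        // turning everything that didn't dedup into new data.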
        let mut cur_idx = 0;

        while cur_idx < chunks.len() {
            let mut dedupe_query = deduped_blocks[cur_idx].take();

            // Nothing from the shard-based passes; try the chunks staged for the
            // in-progress xorb.
            if dedupe_query.is_none() {
                dedupe_query = self.dedup_query_against_local_data(&chunk_hashes[cur_idx..]);
            }

            if let Some((n_deduped, fse, is_external)) = dedupe_query {
                dedup_metrics.deduped_chunks += n_deduped as u64;
                dedup_metrics.deduped_bytes += fse.unpacked_segment_bytes as u64;
                dedup_metrics.total_chunks += n_deduped as u64;
                dedup_metrics.total_bytes += fse.unpacked_segment_bytes as u64;

                // Accept the dedup only if it extends the current entry or is large
                // enough that taking it won't overly fragment the reconstruction.
                if self.file_data_sequence_continues_current(&fse)
                    || self.defrag_tracker.allow_dedup_on_next_range(n_deduped)
                {
                    // A marker hash means the range points into the in-progress xorb,
                    // which isn't a registered dependency yet.
                    if fse.xorb_hash != MerkleHash::marker() {
                        xorb_dependencies.push(FileXorbDependency {
                            file_id: self.file_id,
                            xorb_hash: fse.xorb_hash,
                            n_bytes: fse.unpacked_segment_bytes as u64,
                            is_external,
                        });
                    }

                    self.add_file_data_sequence_entry(fse, n_deduped);

                    cur_idx += n_deduped;
                    continue;
                } else {
                    dedup_metrics.defrag_prevented_dedup_chunks += n_deduped as u64;
                    dedup_metrics.defrag_prevented_dedup_bytes += fse.unpacked_segment_bytes as u64;
                }
            }

            // No dedup anywhere; this chunk becomes new data.
            let n_bytes = chunks[cur_idx].data.len();

            dedup_metrics.total_chunks += 1;
            dedup_metrics.total_bytes += n_bytes as u64;
            dedup_metrics.new_bytes += n_bytes as u64;
            dedup_metrics.new_chunks += 1;
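
            // If this chunk would push the pending xorb over its byte or chunk-count
            // threshold, cut the xorb first, record this file's dependency on it, and
            // hand it to the data interface for upload.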
            if self.new_data_size + n_bytes > *XORB_CUT_THRESHOLD_BYTES
                || self.new_data.len() + 1 > *XORB_CUT_THRESHOLD_CHUNKS
            {
                let new_xorb = self.cut_new_xorb();
                xorb_dependencies.push(FileXorbDependency {
                    file_id: self.file_id,
                    xorb_hash: new_xorb.hash(),
                    n_bytes: new_xorb.num_bytes() as u64,
                    is_external: false,
                });
                self.data_mng.register_new_xorb(new_xorb).await?;
            }

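            // Record the chunk in the reconstruction info.  If the last entry already
            // targets the in-progress xorb (marker hash) and ends exactly at this
            // chunk's index, extend it in place; otherwise start a new marker entry.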
            if !self.file_info.is_empty()
                && self.file_info.last().unwrap().xorb_hash == MerkleHash::marker()
                && self.file_info.last().unwrap().chunk_index_end as usize == self.new_data.len()
            {
                let last_entry = self.file_info.last_mut().unwrap();
                last_entry.unpacked_segment_bytes += n_bytes as u32;
                last_entry.chunk_index_end += 1;
                self.defrag_tracker.increment_last_range_in_fragmentation_estimate(1);
            } else {
                // Remember this entry so its marker hash can be patched when the xorb
                // holding this chunk is eventually cut.
                let file_info_len = self.file_info.len();
                self.internally_referencing_entries.push(file_info_len);
                let chunk_idx = self.new_data.len();

                self.file_info.push(FileDataSequenceEntry::new(
                    MerkleHash::marker(),
                    n_bytes,
                    chunk_idx,
                    chunk_idx + 1,
                ));
                self.defrag_tracker.add_range_to_fragmentation_estimate(1);
            }

            // Stage the chunk for the next xorb and index it for local dedup.
            let chunk = chunks[cur_idx].clone();
            self.new_data_size += chunk.data.len();
            self.new_data_hash_lookup.insert(chunk.hash, self.new_data.len());
            self.new_data.push(chunk);

            cur_idx += 1;
        }

        self.deduplication_metrics.merge_in(&dedup_metrics);
        self.chunk_hashes.extend(chunks.iter().map(|c| (c.hash, c.data.len() as u64)));

        if !xorb_dependencies.is_empty() {
            self.data_mng.register_xorb_dependencies(&xorb_dependencies).await;
        }

        Ok(dedup_metrics)
    }

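    /// Returns true if `fse` starts exactly where the last recorded entry ends, in
    /// the same xorb, so the two can be merged into one contiguous entry.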
    fn file_data_sequence_continues_current(&self, fse: &FileDataSequenceEntry) -> bool {
        !self.file_info.is_empty()
            && self.file_info.last().unwrap().xorb_hash == fse.xorb_hash
            && self.file_info.last().unwrap().chunk_index_end == fse.chunk_index_start
    }

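    /// Appends `fse` to the file reconstruction info, merging it into the previous
    /// entry when contiguous.  `n_deduped` feeds the defragmentation estimate.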
    fn add_file_data_sequence_entry(&mut self, fse: FileDataSequenceEntry, n_deduped: usize) {
        if self.file_data_sequence_continues_current(&fse) {
            // Contiguous with the previous entry; just widen it.
            let last_entry = self.file_info.last_mut().unwrap();
            last_entry.unpacked_segment_bytes += fse.unpacked_segment_bytes;
            last_entry.chunk_index_end = fse.chunk_index_end;

            self.defrag_tracker.increment_last_range_in_fragmentation_estimate(n_deduped);
        } else {
            // Entries carrying the marker hash reference the in-progress xorb and
            // must be patched when it is cut.
            if fse.xorb_hash == MerkleHash::marker() {
                self.internally_referencing_entries.push(self.file_info.len());
            }
            self.file_info.push(fse);
            self.defrag_tracker.add_range_to_fragmentation_estimate(n_deduped);
        }
    }

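    /// Packages all staged chunks into a new xorb, patches every reconstruction entry
    /// that referenced the staging area (marker hash) with the xorb's real hash, and
    /// resets the staging state.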
    fn cut_new_xorb(&mut self) -> RawXorbData {
        let new_xorb = RawXorbData::from_chunks(&self.new_data[..], vec![0]);

        let xorb_hash = new_xorb.hash();

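        // Patch up the file reconstruction entries that referenced this data while it
        // was still in progress.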
        for &idx in self.internally_referencing_entries.iter() {
            let fse = &mut self.file_info[idx];
            debug_assert_eq!(fse.xorb_hash, MerkleHash::marker());
            debug_assert_lt!(fse.chunk_index_start as usize, self.new_data.len());
            debug_assert_le!(fse.chunk_index_end as usize, self.new_data.len());

            fse.xorb_hash = xorb_hash;
        }

        // In debug builds, verify that no marker hashes remain anywhere.
        #[cfg(debug_assertions)]
        {
            for fse in self.file_info.iter() {
                debug_assert_ne!(fse.xorb_hash, MerkleHash::marker());
            }
        }

        // Reset all the new-data staging state.
        self.new_data.clear();
        self.new_data_hash_lookup.clear();
        self.new_data_size = 0;
        self.internally_referencing_entries.clear();

        new_xorb
    }

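    /// Attempts to dedup the run of chunk hashes starting at `chunks[0]` against the
    /// chunks staged for the in-progress xorb, extending the match as long as the
    /// hashes stay consecutive in the staging buffer.  Returns the matched length and
    /// a marker-hash entry; such a match is never external.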
    fn dedup_query_against_local_data(
        &mut self,
        chunks: &[MerkleHash],
    ) -> Option<(usize, FileDataSequenceEntry, bool)> {
        if let Some(&base_idx) = self.new_data_hash_lookup.get(&chunks[0]) {
            let mut n_bytes = self.new_data[base_idx].data.len();

            // Extend the match while successive hashes continue the same run of
            // staged chunks.
            let mut end_idx = base_idx + 1;
            for (i, chunk) in chunks.iter().enumerate().skip(1) {
                if let Some(&idx) = self.new_data_hash_lookup.get(chunk)
                    && idx == base_idx + i
                {
                    end_idx = idx + 1;
                    n_bytes += self.new_data[idx].data.len();
                    continue;
                }
                break;
            }

            Some((
                end_idx - base_idx,
                FileDataSequenceEntry::new(MerkleHash::marker(), n_bytes, base_idx, end_idx),
                false,
            ))
        } else {
            None
        }
    }

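    /// Consumes the deduper, computing the file hash over all chunks seen and the
    /// per-segment verification entries.  Returns the file hash, a `DataAggregator`
    /// holding the still-unfinalized chunk data and reconstruction info, and the
    /// accumulated metrics.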
    pub fn finalize(self, metadata_ext: Option<FileMetadataExt>) -> (MerkleHash, DataAggregator, DeduplicationMetrics) {
        let file_hash = file_hash(&self.chunk_hashes);

        let metadata = FileDataSequenceHeader::new(file_hash, self.file_info.len(), true, metadata_ext.is_some());

        let mut chunk_idx = 0;

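        // For each reconstruction segment, compute the verification hash over the
        // chunk hashes it covers; segments appear in file order, so a running index
        // into `chunk_hashes` suffices.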
        let verification = self
            .file_info
            .iter()
            .map(|entry| {
                let n_chunks = (entry.chunk_index_end - entry.chunk_index_start) as usize;
                let chunk_hashes: Vec<_> = self.chunk_hashes[chunk_idx..chunk_idx + n_chunks]
                    .iter()
                    .map(|(hash, _)| *hash)
                    .collect();
                let range_hash =
                    xet_core_structures::metadata_shard::chunk_verification::range_hash_from_chunks(&chunk_hashes);
                chunk_idx += n_chunks;

                FileVerificationEntry::new(range_hash)
            })
            .collect();

        let fi = MDBFileInfo {
            metadata,
            segments: self.file_info,
            verification,
            metadata_ext,
        };

        let remaining_data = DataAggregator::new(self.new_data, fi, self.internally_referencing_entries, self.file_id);

        (file_hash, remaining_data, self.deduplication_metrics)
    }
}