// xet_data/deduplication/file_deduplication.rs

1use std::result::Result;
2
3use more_asserts::{debug_assert_le, debug_assert_lt};
4use xet_core_structures::MerkleHashMap;
5use xet_core_structures::merklehash::{MerkleHash, file_hash};
6use xet_core_structures::metadata_shard::file_structs::{
7    FileDataSequenceEntry, FileDataSequenceHeader, FileMetadataExt, FileVerificationEntry, MDBFileInfo,
8};
9use xet_core_structures::metadata_shard::hash_is_global_dedup_eligible;
10
11use super::constants::{XORB_CUT_THRESHOLD_BYTES, XORB_CUT_THRESHOLD_CHUNKS};
12use super::data_aggregator::DataAggregator;
13use super::dedup_metrics::DeduplicationMetrics;
14use super::defrag_prevention::DefragPrevention;
15use super::interface::DeduplicationDataInterface;
16use super::{Chunk, RawXorbData};
17use crate::progress_tracking::upload_tracking::FileXorbDependency;
18
/// Streaming per-file deduplication state.  Chunks are fed in through
/// `process_chunks`, deduplicated against remote shards, global-dedup results,
/// and the locally buffered (not yet cut) xorb data; the remaining state is
/// converted into a `DataAggregator` by `finalize`.
pub struct FileDeduper<DataInterfaceType: DeduplicationDataInterface> {
    /// Interface used for dedup queries, xorb registration, and xorb-dependency
    /// reporting.
    data_mng: DataInterfaceType,

    /// A tag for tracking the file externally.
    file_id: u64,

    /// The new data here that hasn't yet been deduplicated; these chunks form
    /// the xorb currently being built.
    new_data: Vec<Chunk>,

    /// The amount of new data we have, in bytes.
    new_data_size: usize,

    /// Maps a chunk hash to its index in `new_data`, allowing deduplication
    /// against the chunks of the xorb currently being built.
    new_data_hash_lookup: MerkleHashMap<usize>,

    /// The (hash, size in bytes) of every chunk seen so far for this file, in
    /// file order; used for the file hash and verification entries.
    chunk_hashes: Vec<(MerkleHash, u64)>,

    /// The current file data entries.
    file_info: Vec<FileDataSequenceEntry>,

    /// Indices into `file_info` of entries that reference the in-progress xorb
    /// (marker hash); their xorb hash is filled in when that xorb is cut.
    internally_referencing_entries: Vec<usize>,

    /// Tracking the defragmentation of the file specification.
    defrag_tracker: DefragPrevention,

    /// The minimum number of chunks to wait for between generating global
    /// dedup queries.  Can be changed by testing code.
    min_spacing_between_global_dedup_queries: usize,

    /// The next chunk index that is eligible for global dedup queries.
    next_chunk_index_eligible_for_global_dedup_query: usize,

    /// The tracked deduplication metrics for this file, accumulated across all
    /// `process_chunks` calls.
    deduplication_metrics: DeduplicationMetrics,
}
56
57impl<DataInterfaceType: DeduplicationDataInterface> FileDeduper<DataInterfaceType> {
58    pub fn new(data_manager: DataInterfaceType, file_id: u64) -> Self {
59        Self {
60            data_mng: data_manager,
61            file_id,
62            new_data: Vec::new(),
63            new_data_size: 0,
64            new_data_hash_lookup: MerkleHashMap::new(),
65            chunk_hashes: Vec::new(),
66            file_info: Vec::new(),
67            internally_referencing_entries: Vec::new(),
68            defrag_tracker: DefragPrevention::default(),
69            min_spacing_between_global_dedup_queries: 0,
70            next_chunk_index_eligible_for_global_dedup_query: 0,
71            deduplication_metrics: DeduplicationMetrics::default(),
72        }
73    }
74
    /// Deduplicate a batch of chunks, appending the resulting file data
    /// sequence entries to this file's state.
    ///
    /// Chunks are matched (in order) against remote/known shards via the data
    /// interface, against global-dedup query results, and finally against the
    /// locally buffered xorb data.  Chunks that cannot be deduplicated are
    /// buffered; when the buffer crosses the xorb cut thresholds, a new xorb is
    /// cut and registered.
    ///
    /// Returns the metrics for this batch only; the running per-file totals are
    /// merged into `self.deduplication_metrics`.
    pub async fn process_chunks(
        &mut self,
        chunks: &[Chunk],
    ) -> Result<DeduplicationMetrics, DataInterfaceType::ErrorType> {
        // Deduplication statistics for this batch of chunks.
        let mut dedup_metrics = DeduplicationMetrics::default();

        // New xorb dependencies discovered in this batch; reported at the end.
        let mut xorb_dependencies = Vec::new();

        // All previously seen chunks for this file are stored in `chunk_hashes`,
        // so its length is the file-global index of the first chunk in this batch.
        let global_chunk_index_start = self.chunk_hashes.len();

        let chunk_hashes = Vec::from_iter(chunks.iter().map(|c| c.hash));

        // Now, parallelize the querying of potential new shards on the server end with
        // querying for dedup information of the chunks, which are the two most expensive
        // parts of the process.  Then when we go into the next section, everything is essentially
        // a local lookup table so the remaining work should be quite fast.

        // This holds the results of the dedup queries, indexed by the local
        // (batch-relative) chunk index of the run's first chunk.
        let mut deduped_blocks = vec![None; chunks.len()];

        // Do at most two passes; 1) with global dedup querying possibly enabled, and 2) possibly rerunning
        // if the global dedup query came back with a new shard.

        for first_pass in [true, false] {
            // Go through and test all of these for whether or not they can be deduplicated.
            let mut local_chunk_index = 0;
            while local_chunk_index < chunks.len() {
                let global_chunk_index = global_chunk_index_start + local_chunk_index;

                // First check to see if we don't already know what these blocks are from a previous pass.
                if let Some((n_deduped, _, _)) = &deduped_blocks[local_chunk_index] {
                    local_chunk_index += n_deduped;
                } else if let Some((n_deduped, fse, is_uploaded_shard)) =
                    self.data_mng.chunk_hash_dedup_query(&chunk_hashes[local_chunk_index..]).await?
                {
                    if !first_pass {
                        // A hit on the second pass means it came from a shard discovered by a
                        // global dedup query; attribute it to global dedup.  The base metrics
                        // for this run are recorded later, in the processing loop below.
                        dedup_metrics.deduped_chunks_by_global_dedup += n_deduped as u64;
                        dedup_metrics.deduped_bytes_by_global_dedup += fse.unpacked_segment_bytes as u64;
                    }

                    deduped_blocks[local_chunk_index] = Some((n_deduped, fse, is_uploaded_shard));
                    local_chunk_index += n_deduped;

                    // Now see if we can issue a background query against the global dedup server to see if
                    // any shards are present that give us more dedup ability.
                    //
                    // If we've already queried these against the global dedup, then we can proceed on without
                    // re-querying anything.  Only doing this on the first pass also guarantees that in the case of
                    // errors on shard retrieval, we don't get stuck in a loop trying to download
                    // and reprocess.
                } else {
                    // Check for global deduplication.
                    if
                    // Only do this query on the first pass.
                    first_pass
                        // The first hash of every file and those matching a pattern are eligible.
                        && (global_chunk_index == 0
                            || hash_is_global_dedup_eligible(&chunk_hashes[local_chunk_index]))
                        // Rate-limit queries by enforcing a minimum chunk spacing between them.
                        && global_chunk_index >= self.next_chunk_index_eligible_for_global_dedup_query
                    {
                        self.data_mng
                            .register_global_dedup_query(chunk_hashes[local_chunk_index])
                            .await?;

                        self.next_chunk_index_eligible_for_global_dedup_query =
                            global_chunk_index + self.min_spacing_between_global_dedup_queries;
                    }

                    local_chunk_index += 1;
                }
            }

            // Now, see if any of the background global-dedup queries have completed.
            let new_shards_added = self.data_mng.complete_global_dedup_queries().await?;

            // Only rerun the query loop when new shards appeared.
            if !new_shards_added {
                break;
            }
        }

        // Now, go through and process the result of the queries.
        let mut cur_idx = 0;

        while cur_idx < chunks.len() {
            let mut dedupe_query = deduped_blocks[cur_idx].take();

            if dedupe_query.is_none() {
                // In this case, do a second query against the local xorb to see if we're just repeating previous
                // information in the xorb.
                dedupe_query = self.dedup_query_against_local_data(&chunk_hashes[cur_idx..]);
            }

            if let Some((n_deduped, fse, is_external)) = dedupe_query {
                dedup_metrics.deduped_chunks += n_deduped as u64;
                dedup_metrics.deduped_bytes += fse.unpacked_segment_bytes as u64;
                dedup_metrics.total_chunks += n_deduped as u64;
                dedup_metrics.total_bytes += fse.unpacked_segment_bytes as u64;

                // Check the fragmentation state; if it is pretty fragmented,
                // we skip dedup.  However, continuing the previous range is always fine.
                if self.file_data_sequence_continues_current(&fse)
                    || self.defrag_tracker.allow_dedup_on_next_range(n_deduped)
                {
                    // Report this as a dependency.
                    // The case where it's deduped against the present (marker-hash) xorb is
                    // handled when that xorb gets cut and we know its hash.
                    if fse.xorb_hash != MerkleHash::marker() {
                        xorb_dependencies.push(FileXorbDependency {
                            file_id: self.file_id,
                            xorb_hash: fse.xorb_hash,
                            n_bytes: fse.unpacked_segment_bytes as u64,
                            is_external,
                        });
                    }

                    // We found one or more chunk hashes present.
                    self.add_file_data_sequence_entry(fse, n_deduped);

                    cur_idx += n_deduped;
                    continue;
                } else {
                    // Dedup was possible but declined to limit fragmentation.
                    dedup_metrics.defrag_prevented_dedup_chunks += n_deduped as u64;
                    dedup_metrics.defrag_prevented_dedup_bytes += fse.unpacked_segment_bytes as u64;
                }
            }

            // No dedup for this chunk; add it as new data.
            let n_bytes = chunks[cur_idx].data.len();

            dedup_metrics.total_chunks += 1;
            dedup_metrics.total_bytes += n_bytes as u64;
            dedup_metrics.new_bytes += n_bytes as u64;
            dedup_metrics.new_chunks += 1;

            // Do we need to cut a new xorb first?
            if self.new_data_size + n_bytes > *XORB_CUT_THRESHOLD_BYTES
                || self.new_data.len() + 1 > *XORB_CUT_THRESHOLD_CHUNKS
            {
                let new_xorb = self.cut_new_xorb();
                // The file depends on the xorb we just cut; it is local, not external.
                xorb_dependencies.push(FileXorbDependency {
                    file_id: self.file_id,
                    xorb_hash: new_xorb.hash(),
                    n_bytes: new_xorb.num_bytes() as u64,
                    is_external: false,
                });
                self.data_mng.register_new_xorb(new_xorb).await?;
            }

            if !self.file_info.is_empty()
                && self.file_info.last().unwrap().xorb_hash == MerkleHash::marker()
                && self.file_info.last().unwrap().chunk_index_end as usize == self.new_data.len()
            {
                // This is the next chunk in the CAS block we're building,
                // in which case we can just modify the previous entry.
                let last_entry = self.file_info.last_mut().unwrap();
                last_entry.unpacked_segment_bytes += n_bytes as u32;
                last_entry.chunk_index_end += 1;
                self.defrag_tracker.increment_last_range_in_fragmentation_estimate(1);
            } else {
                // This block is unrelated to the previous one.
                // This chunk will get the CAS hash updated when the local CAS block
                // is full and registered.
                let file_info_len = self.file_info.len();
                self.internally_referencing_entries.push(file_info_len);
                let chunk_idx = self.new_data.len();

                self.file_info.push(FileDataSequenceEntry::new(
                    MerkleHash::marker(),
                    n_bytes,
                    chunk_idx,
                    chunk_idx + 1,
                ));
                self.defrag_tracker.add_range_to_fragmentation_estimate(1);
            }

            // Buffer the chunk and index it for local dedup of later chunks.
            let chunk = chunks[cur_idx].clone();
            self.new_data_size += chunk.data.len();
            self.new_data_hash_lookup.insert(chunk.hash, self.new_data.len());
            self.new_data.push(chunk);

            // Next round.
            cur_idx += 1;
        }

        // Fold this batch into the per-file totals and record the chunk hashes.
        self.deduplication_metrics.merge_in(&dedup_metrics);
        self.chunk_hashes.extend(chunks.iter().map(|c| (c.hash, c.data.len() as u64)));

        // Register the xorb dependencies as needed.
        if !xorb_dependencies.is_empty() {
            self.data_mng.register_xorb_dependencies(&xorb_dependencies).await;
        }

        Ok(dedup_metrics)
    }
275
276    fn file_data_sequence_continues_current(&self, fse: &FileDataSequenceEntry) -> bool {
277        !self.file_info.is_empty()
278            && self.file_info.last().unwrap().xorb_hash == fse.xorb_hash
279            && self.file_info.last().unwrap().chunk_index_end == fse.chunk_index_start
280    }
281
282    /// Add a new file data sequence entry to the current process, possibly merging with the
283    /// previous entry.
284    fn add_file_data_sequence_entry(&mut self, fse: FileDataSequenceEntry, n_deduped: usize) {
285        // Do we modify the previous entry as this is the next logical chunk, or do we
286        // start a new entry?
287        if self.file_data_sequence_continues_current(&fse) {
288            // This block is the contiguous continuation of the last entry
289            let last_entry = self.file_info.last_mut().unwrap();
290            last_entry.unpacked_segment_bytes += fse.unpacked_segment_bytes;
291            last_entry.chunk_index_end = fse.chunk_index_end;
292
293            // Update the fragmentation estimation window
294            self.defrag_tracker.increment_last_range_in_fragmentation_estimate(n_deduped);
295        } else {
296            // Make sure we're tracking any that we need to fill in later.
297            if fse.xorb_hash == MerkleHash::marker() {
298                self.internally_referencing_entries.push(self.file_info.len());
299            }
300            // This block is new
301            self.file_info.push(fse);
302            self.defrag_tracker.add_range_to_fragmentation_estimate(n_deduped);
303        }
304    }
305
306    /// Cut a new xorb from the existing data.  
307    fn cut_new_xorb(&mut self) -> RawXorbData {
308        // Cut the new xorb.
309        let new_xorb = RawXorbData::from_chunks(&self.new_data[..], vec![0]);
310
311        let xorb_hash = new_xorb.hash();
312
313        // Go through and replace all the indices in the file sequence entries with
314        // the new xorb if referenced.
315        for &idx in self.internally_referencing_entries.iter() {
316            let fse = &mut self.file_info[idx];
317            debug_assert_eq!(fse.xorb_hash, MerkleHash::marker());
318            debug_assert_lt!(fse.chunk_index_start as usize, self.new_data.len());
319            debug_assert_le!(fse.chunk_index_end as usize, self.new_data.len());
320
321            fse.xorb_hash = xorb_hash;
322        }
323
324        #[cfg(debug_assertions)]
325        {
326            // For bookkeeping checks, make sure we have everything.
327            for fse in self.file_info.iter() {
328                debug_assert_ne!(fse.xorb_hash, MerkleHash::marker());
329            }
330        }
331
332        // Clear out the old data.
333        self.new_data.clear();
334        self.new_data_hash_lookup.clear();
335        self.new_data_size = 0;
336        self.internally_referencing_entries.clear();
337
338        new_xorb
339    }
340
341    /// Do a query against the local data; this would return an entry with MerkleHash::marker(), which
342    /// would need to get filled in.
343    fn dedup_query_against_local_data(
344        &mut self,
345        chunks: &[MerkleHash],
346    ) -> Option<(usize, FileDataSequenceEntry, bool)> {
347        // It's important for the defrag prevention to have a good estimate of the number of chunks in
348        // a row that can be deduplicated, so this pulls through the
349        if let Some(&base_idx) = self.new_data_hash_lookup.get(&chunks[0]) {
350            let mut n_bytes = self.new_data[base_idx].data.len();
351
352            let mut end_idx = base_idx + 1;
353            for (i, chunk) in chunks.iter().enumerate().skip(1) {
354                if let Some(&idx) = self.new_data_hash_lookup.get(chunk)
355                    && idx == base_idx + i
356                {
357                    end_idx = idx + 1;
358                    n_bytes += self.new_data[idx].data.len();
359                    continue;
360                }
361                break;
362            }
363
364            Some((
365                end_idx - base_idx,
366                FileDataSequenceEntry::new(MerkleHash::marker(), n_bytes, base_idx, end_idx),
367                false,
368            ))
369        } else {
370            None
371        }
372    }
373
    /// Finalize the internal state, converting remaining data to a DataAggregator object that contains the file info
    /// and remaining data.  Also returns the aggregated deduplication metrics and the list of xorb hashes that were
    /// registered as part of this run.
    ///
    /// Returns (file hash, data aggregation, deduplication metrics)
    pub fn finalize(self, metadata_ext: Option<FileMetadataExt>) -> (MerkleHash, DataAggregator, DeduplicationMetrics) {
        // The file hash is derived from the full ordered (hash, size) chunk list.
        let file_hash = file_hash(&self.chunk_hashes);

        let metadata = FileDataSequenceHeader::new(file_hash, self.file_info.len(), true, metadata_ext.is_some());

        // Running position in `chunk_hashes`; each segment consumes its chunk
        // count in file order.  NOTE(review): this assumes the segments' chunk
        // counts sum to exactly `chunk_hashes.len()` — verify against callers.
        let mut chunk_idx = 0;

        // Create the file verification stamp: one range hash per segment.
        let verification = self
            .file_info
            .iter()
            .map(|entry| {
                let n_chunks = (entry.chunk_index_end - entry.chunk_index_start) as usize;
                let chunk_hashes: Vec<_> = self.chunk_hashes[chunk_idx..chunk_idx + n_chunks]
                    .iter()
                    .map(|(hash, _)| *hash)
                    .collect();
                let range_hash =
                    xet_core_structures::metadata_shard::chunk_verification::range_hash_from_chunks(&chunk_hashes);
                chunk_idx += n_chunks;

                FileVerificationEntry::new(range_hash)
            })
            .collect();

        let fi = MDBFileInfo {
            metadata,
            segments: self.file_info,
            verification,
            metadata_ext,
        };

        // Hand off any still-buffered chunks (an uncut xorb) together with the
        // file info; the aggregator fills in the marker hashes when it cuts.
        let remaining_data = DataAggregator::new(self.new_data, fi, self.internally_referencing_entries, self.file_id);

        (file_hash, remaining_data, self.deduplication_metrics)
    }
415}