Skip to main content

xet_data/deduplication/
data_aggregator.rs

1use std::mem::take;
2
3use more_asserts::*;
4use xet_core_structures::merklehash::MerkleHash;
5use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
6
7use super::constants::{MAX_XORB_BYTES, MAX_XORB_CHUNKS};
8use super::{Chunk, RawXorbData};
9
10#[derive(Default, Debug)]
11pub struct DataAggregator {
12    // Bytes of all chunks accumulated in one CAS block concatenated together.
13    pub chunks: Vec<Chunk>,
14
15    // Number of bytes
16    num_bytes: usize,
17
18    // The file info of files that are still being processed.
19    // As we're building this up, we assume that all files that do not have a size in the header are
20    // not finished yet and thus cannot be uploaded.
21    //
22    // All the cases the marker hash for a cas info entry will be filled in with the cas hash for
23    // an entry once the cas block is finalized and uploaded.  These correspond to the indices given
24    // alongwith the file info.
25    // This tuple contains the file info (which may be modified) and the divisions in the chunks corresponding
26    // to this file.  It also includes an optional file ID
27    pub pending_file_info: Vec<(MDBFileInfo, Vec<usize>, u64)>,
28
29    // The specific chunk indices at which a new file starts.  This is used for the compression
30    // heuristic; which compression method to use is calculated once per file section for each xorb.
31    pub file_boundaries: Vec<usize>,
32}
33
34impl DataAggregator {
35    pub(crate) fn new(
36        chunks: Vec<Chunk>,
37        pending_file_info: MDBFileInfo,
38        internally_referencing_entries: Vec<usize>,
39        file_id: u64,
40    ) -> Self {
41        let num_bytes = chunks.iter().map(|c| c.data.len()).sum();
42
43        // This is just one file here, so start it off like this.
44        let file_boundaries = if chunks.is_empty() { vec![] } else { vec![0] };
45
46        Self {
47            chunks,
48            num_bytes,
49            pending_file_info: vec![(pending_file_info, internally_referencing_entries, file_id)],
50            file_boundaries,
51        }
52    }
53
54    pub fn is_empty(&self) -> bool {
55        self.chunks.is_empty() && self.pending_file_info.is_empty()
56    }
57
58    pub fn num_chunks(&self) -> usize {
59        self.chunks.len()
60    }
61
62    pub fn num_bytes(&self) -> usize {
63        debug_assert_eq!(self.chunks.iter().map(|c| c.data.len()).sum::<usize>(), self.num_bytes);
64        self.num_bytes
65    }
66
67    /// Finalize the result, returning the xorb data, and a Vec of (file_id, file_info, n_bytes_in_xorb);
68    /// i.e. the file info that's included in this along
69    /// with the number of bytes in each file that is part of this xorb.
70    pub fn finalize(mut self) -> (RawXorbData, Vec<(u64, MDBFileInfo, u64)>) {
71        // First, cut the xorb for this one.
72        let xorb_data = RawXorbData::from_chunks(&self.chunks, take(&mut self.file_boundaries));
73        let xorb_hash = xorb_data.hash();
74
75        debug_assert_le!(self.num_bytes(), *MAX_XORB_BYTES);
76        debug_assert_le!(self.num_chunks(), *MAX_XORB_CHUNKS);
77
78        let mut ret = vec![0u64; self.pending_file_info.len()];
79
80        // Now that we have the CAS hash, fill in any blocks with the referencing xorb
81        // hash as needed.
82        for (f_idx, (fi, chunk_hash_indices_ref, _file_id)) in self.pending_file_info.iter_mut().enumerate() {
83            for &i in chunk_hash_indices_ref.iter() {
84                debug_assert_eq!(fi.segments[i].xorb_hash, MerkleHash::marker());
85                fi.segments[i].xorb_hash = xorb_hash;
86                ret[f_idx] += fi.segments[i].unpacked_segment_bytes as u64;
87            }
88
89            // Incorporated this info, so clear this.
90            chunk_hash_indices_ref.clear();
91
92            #[cfg(debug_assertions)]
93            {
94                // Make sure our bookkeeping along the way was good.
95                for fse in fi.segments.iter() {
96                    debug_assert_ne!(fse.xorb_hash, MerkleHash::marker());
97                }
98            }
99        }
100
101        (
102            xorb_data,
103            self.pending_file_info
104                .into_iter()
105                .zip(ret)
106                .map(|((fi, _, file_id), byte_count)| (file_id, fi, byte_count))
107                .collect(),
108        )
109    }
110
111    pub fn merge_in(&mut self, mut other: DataAggregator) {
112        debug_assert_le!(self.num_bytes() + other.num_bytes(), *MAX_XORB_BYTES);
113        debug_assert_le!(self.num_chunks() + other.num_chunks(), *MAX_XORB_BYTES);
114
115        let shift = self.chunks.len() as u32;
116        self.chunks.append(&mut other.chunks);
117        self.num_bytes += other.num_bytes;
118
119        // Adjust the chunk indices and shifts for
120        for file_info in other.pending_file_info.iter_mut() {
121            for fi in file_info.0.segments.iter_mut() {
122                // To transfer the cas chunks from the other data aggregator to this one,
123                // shift chunk indices so the new index start and end values reflect the
124                // append opperation above.
125                if fi.xorb_hash == MerkleHash::marker() {
126                    fi.chunk_index_start += shift;
127                    fi.chunk_index_end += shift;
128                }
129            }
130        }
131
132        self.pending_file_info.append(&mut other.pending_file_info);
133
134        // Append the file boundaries from the other aggregator, tracking the shifts.
135        self.file_boundaries
136            .extend(other.file_boundaries.into_iter().map(|idx| idx + (shift as usize)));
137    }
138}