dwarfs_enc/
chunker.rs

//! File data slicing and/or deduplication.

use std::{
    collections::{HashMap, hash_map::Entry},
    fmt,
    io::{Read, Write},
    num::NonZero,
};

use dwarfs::section::SectionType;
use rustic_cdc::{Rabin64, RollingHash64};
use sha2::{Digest, Sha512_256};

use crate::{
    Error, Result,
    metadata::Chunk,
    section::{self, CompressParam},
};

type Chunks = Vec<Chunk>;

/// Algorithm to slice and/or deduplicate file content.
pub trait Chunker {
    /// Put data from a [`Read`] instance into the archive, and return the
    /// chunking result ready for [`crate::metadata::Builder::put_file`].
    fn put_reader(&mut self, rdr: &mut dyn Read) -> Result<Chunks>;

    /// Put in-memory data into the archive.
    ///
    /// This is a shortcut for [`Chunker::put_reader`].
    fn put_bytes(&mut self, mut bytes: &[u8]) -> Result<Chunks> {
        self.put_reader(&mut bytes)
    }
}

/// The simplest chunker: it concatenates all file data and slices it at the
/// block size.
///
/// This does no deduplication.
pub struct BasicChunker<W> {
    buf: Box<[u8]>,
    buf_len: usize,
    compression: CompressParam,
    w: section::Writer<W>,
}

impl<W: fmt::Debug> fmt::Debug for BasicChunker<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BasicChunker")
            .field("buf", &format_args!("{}/{}", self.buf_len, self.buf.len()))
            .field("compression", &self.compression)
            .field("w", &self.w)
            .finish()
    }
}

impl<W> BasicChunker<W> {
    /// Create a basic chunker with the given section writer and parameters.
    ///
    /// Note: `block_size` must match the block size configured for
    /// [`crate::metadata::Builder`]. You should always get it from
    /// [`crate::metadata::Builder::block_size`].
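    ///
    /// # Example
    ///
    /// A sketch of typical usage; `writer`, `compression`, and `builder` are
    /// placeholders for values whose construction is defined elsewhere in
    /// this crate:
    ///
    /// ```ignore
    /// let mut chunker = BasicChunker::new(writer, builder.block_size(), compression);
    /// let chunks = chunker.put_bytes(b"hello world")?;
    /// // `chunks` is ready for `Builder::put_file`; `finish` flushes the
    /// // last partial block and returns the section writer.
    /// let writer = chunker.finish()?;
    /// ```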
    pub fn new(
        w: section::Writer<W>,
        block_size: NonZero<u32>,
        compression: CompressParam,
    ) -> Self {
        Self {
            buf: vec![0u8; block_size.get() as usize].into_boxed_slice(),
            buf_len: 0,
            compression,
            w,
        }
    }

    /// Finalize data chunks and get back the underlying section writer.
    pub fn finish(mut self) -> Result<section::Writer<W>>
    where
        W: Write,
    {
        if self.buf_len != 0 {
            self.w.write_section(
                SectionType::BLOCK,
                self.compression,
                &self.buf[..self.buf_len],
            )?;
            self.buf_len = 0;
        }
        Ok(self.w)
    }

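    /// Append all data from `rdr` to the block buffer, writing out a full
    /// BLOCK section each time the buffer fills, and return the contiguous
    /// byte range the data occupies.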
    fn put_reader_inner(&mut self, rdr: &mut dyn Read) -> Result<SeqChunks>
    where
        W: Write,
    {
        let mut chunks = SeqChunks {
            start_section_idx: self.w.section_count(),
            start_offset: self.buf_len as u32,
            len: 0,
        };
        loop {
            while self.buf_len < self.buf.len() {
                match rdr.read(&mut self.buf[self.buf_len..]) {
                    Ok(0) => return Ok(chunks),
                    Ok(n) => {
                        self.buf_len += n;
                        chunks.len += n as u64;
                    }
                    Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                    Err(err) => return Err(err.into()),
                }
            }

            // The buffer holds exactly one full block; flush it as a section.
            debug_assert_eq!(self.buf_len, self.buf.len());
            self.w
                .write_section(SectionType::BLOCK, self.compression, &self.buf)?;
            self.buf_len = 0;
        }
    }
}

/// A contiguous range of bytes spanning one or more consecutive sections.
#[derive(Debug, Clone, Copy)]
struct SeqChunks {
    start_section_idx: u32,
    start_offset: u32,
    len: u64,
}

impl SeqChunks {
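    /// Split this contiguous range into per-block [`Chunk`]s.
    ///
    /// For example, with `block_size = 16`, a range starting at section 3,
    /// offset 12 with `len = 20` yields the `(section_idx, offset, size)`
    /// tuples `(3, 12, 4)` and then `(4, 0, 16)`.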
    fn to_chunks(mut self, block_size: u32) -> impl Iterator<Item = Chunk> {
        std::iter::from_fn(move || {
            let rest_len = block_size - self.start_offset;
            if self.len == 0 {
                None
            } else if self.len <= u64::from(rest_len) {
                let c = Chunk {
                    section_idx: self.start_section_idx,
                    offset: self.start_offset,
                    size: self.len as u32,
                };
                self.len = 0;
                Some(c)
            } else {
                let c = Chunk {
                    section_idx: self.start_section_idx,
                    offset: self.start_offset,
                    size: rest_len,
                };
                self.len -= u64::from(rest_len);
                self.start_section_idx += 1;
                self.start_offset = 0;
                Some(c)
            }
        })
    }
}

impl<W: Write> Chunker for BasicChunker<W> {
    fn put_reader(&mut self, rdr: &mut dyn Read) -> Result<Chunks> {
        let seq = self.put_reader_inner(rdr)?;
        Ok(seq.to_chunks(self.buf.len() as u32).collect())
    }
}

/// The deduplicating chunker using Content Defined Chunking (CDC).
///
/// The exact algorithm used may change. Currently it uses [rustic_cdc].
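///
/// # Example
///
/// A sketch of typical usage on top of a [`BasicChunker`] (see its docs for
/// construction); `data` is any in-memory file content:
///
/// ```ignore
/// let mut chunker = CdcChunker::new(basic);
/// let chunks_a = chunker.put_bytes(&data)?;
/// // Putting identical content again reuses the previously stored chunks
/// // (assuming no 64-bit hash-prefix collisions).
/// let chunks_b = chunker.put_bytes(&data)?;
/// assert_eq!(chunker.deduplicated_bytes(), data.len() as u64);
/// let writer = chunker.finish()?;
/// ```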
pub struct CdcChunker<W> {
    inner: BasicChunker<W>,
    // TODO: This struct is too large.
    rabin: Rabin64,
    chunk_buf: Box<[u8]>,

    table: HashMap<u64, CdcChunk>,
    deduplicated_bytes: u64,
}

impl<W: fmt::Debug> fmt::Debug for CdcChunker<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CdcChunker")
            .field("inner", &self.inner)
            .field("table_size", &self.table.len())
            .field("deduplicated_bytes", &self.deduplicated_bytes)
            .finish_non_exhaustive()
    }
}

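/// A record of a previously stored CDC chunk.
///
/// The dedup table is keyed by the first 8 bytes of the chunk's SHA-512/256
/// digest; the remaining 24 bytes are kept here to confirm a match and to
/// reject key collisions (see `put_reader` below).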
struct CdcChunk {
    hash_suffix: [u8; 24],
    start_section_idx: u32,
    start_offset: u32,
}

impl<W> CdcChunker<W> {
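    // Cut parameters: a 64-byte rolling window, cutting whenever the low 11
    // bits of the hash are all ones. Each byte position cuts with probability
    // 2^-11, so the expected chunk size is roughly `MIN_CHUNK_SIZE` + 2 KiB,
    // clamped to `MAX_CHUNK_SIZE`.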
    const WINDOW_SIZE_BITS: u32 = 6;
    const WINDOW_SIZE: usize = 1usize << Self::WINDOW_SIZE_BITS;
    const CUT_MASK: u64 = (1u64 << 11) - 1;
    const MIN_CHUNK_SIZE: usize = Self::WINDOW_SIZE;
    const MAX_CHUNK_SIZE: usize = 64 << 10;

    /// Create the deduplicating chunker on top of a [`BasicChunker`].
    pub fn new(inner: BasicChunker<W>) -> Self {
        let rabin = Rabin64::new(Self::WINDOW_SIZE_BITS);
        CdcChunker {
            inner,
            rabin,
            chunk_buf: vec![0u8; Self::MAX_CHUNK_SIZE].into_boxed_slice(),
            table: HashMap::new(),
            deduplicated_bytes: 0,
        }
    }

    /// Get the total number of bytes saved by deduplication.
    pub fn deduplicated_bytes(&self) -> u64 {
        self.deduplicated_bytes
    }

    /// Finalize data chunks and get back the underlying section writer.
    pub fn finish(self) -> Result<section::Writer<W>>
    where
        W: Write,
    {
        self.inner.finish()
    }
}

impl<W: Write> Chunker for CdcChunker<W> {
    fn put_reader(&mut self, rdr: &mut dyn Read) -> Result<Chunks> {
        let block_size = self.inner.buf.len() as u32;

        let mut chunks = Chunks::new();
        // Deduplicate one content-defined chunk against the table, or store it.
        let mut record_chunk = |cdchunk: &[u8]| {
            debug_assert_ne!(cdchunk.len(), 0);

            let hash = Sha512_256::new_with_prefix(cdchunk).finalize();
            let (&hash_prefix, hash_suffix) = hash.split_first_chunk::<8>().expect("hash is 32B");
            let hash_suffix: [u8; 24] = hash_suffix.try_into().expect("hash is 32B");

            let seq = match self.table.entry(u64::from_ne_bytes(hash_prefix)) {
                Entry::Vacant(ent) => {
                    let seq = self.inner.put_reader_inner(&mut { cdchunk })?;
                    ent.insert(CdcChunk {
                        hash_suffix,
                        start_section_idx: seq.start_section_idx,
                        start_offset: seq.start_offset,
                    });
                    seq
                }
                Entry::Occupied(ent) if ent.get().hash_suffix == hash_suffix => {
                    self.deduplicated_bytes += cdchunk.len() as u64;
                    SeqChunks {
                        start_section_idx: ent.get().start_section_idx,
                        start_offset: ent.get().start_offset,
                        len: cdchunk.len() as u64,
                    }
                }
                // Hash-prefix collision: store the data again instead of deduplicating.
                Entry::Occupied(_) => self.inner.put_reader_inner(&mut { cdchunk })?,
            };

            // Merge adjacent chunks where possible.
            for c in seq.to_chunks(block_size) {
                if let Some(p) = chunks
                    .last_mut()
                    .filter(|p| (p.section_idx, p.offset + p.size) == (c.section_idx, c.offset))
                {
                    p.size += c.size;
                } else {
                    chunks.push(c);
                }
            }

            Ok::<_, Error>(())
        };

        self.rabin.reset();

        // |               chunk_buf                            |
        // | ...chunk | chunk | partial chunk | next read | ... |
        //                    ^cut_pos        ^end_pos
        //                                     ~~~~~~~~~~~ read_len
        let mut cut_pos = 0usize;
        let mut end_pos = 0usize;
        loop {
            assert_ne!(end_pos, self.chunk_buf.len());
            let read_len = match rdr.read(&mut self.chunk_buf[end_pos..]) {
                Ok(0) => break,
                Ok(n) => n,
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err.into()),
            };

            for (&b, pos) in self.chunk_buf[end_pos..end_pos + read_len]
                .iter()
                .zip(end_pos..)
            {
                self.rabin.slide(b);
                // This is the length of the whole chunk, including previous partial data.
                // NB. the current byte at `pos` is included, hence the `+1`.
                let len = pos - cut_pos + 1;

                // `MIN_CHUNK_SIZE` guarantees that the sliding window is always filled.
                if len >= Self::MIN_CHUNK_SIZE && self.rabin.hash & Self::CUT_MASK == Self::CUT_MASK
                    || len >= Self::MAX_CHUNK_SIZE
                {
                    let cdchunk = &self.chunk_buf[cut_pos..pos];
                    cut_pos = pos;
                    record_chunk(cdchunk)?;
                }
            }
            end_pos += read_len;

            // Shift the last partial chunk down to the start once we reach the
            // end of the buffer.
            // For files smaller than `MAX_CHUNK_SIZE`, this path is never entered.
            if end_pos >= self.chunk_buf.len() {
                debug_assert_eq!(end_pos, self.chunk_buf.len());
                self.chunk_buf.copy_within(cut_pos.., 0);
                end_pos -= cut_pos;
                cut_pos = 0;
            }
        }

        // Emit the trailing partial chunk, if any.
        if cut_pos < end_pos {
            record_chunk(&self.chunk_buf[cut_pos..end_pos])?;
        }

        Ok(chunks)
    }
}