// fastcdc/v2016/mod.rs

1//
2// Copyright (c) 2023 Nathan Fiedler
3//
4
5//! This module implements the canonical FastCDC algorithm as described in the
6//! [paper](https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf)
7//! by Wen Xia, et al., in 2016.
8//!
9//! The algorithm incorporates a simplified hash judgement using the fast Gear
10//! hash, sub-minimum chunk cut-point skipping, and normalized chunking to
11//! produce chunks of a more consistent length.
12//!
13//! There are two ways in which to use the `FastCDC` struct defined in this
14//! module. One is to simply invoke `cut()` while managing your own `start` and
15//! `remaining` values. The other is to use the struct as an `Iterator` that
16//! yields `Chunk` structs which represent the offset and size of the chunks.
17//! Note that attempting to use both `cut()` and `Iterator` on the same
18//! `FastCDC` instance will yield incorrect results.
19//!
20//! Note that the `cut()` function returns the 64-bit hash of the chunk, which
21//! may be useful in scenarios involving chunk size prediction using historical
22//! data, such as in RapidCDC or SuperCDC. This hash value is also given in the
23//! `hash` field of the `Chunk` struct. While this value has rather low entropy,
24//! it is computationally cost-free and can be put to some use with additional
25//! record keeping.
26//!
27//! The `StreamCDC` implementation is similar to `FastCDC` except that it will
28//! read data from a `Read` into an internal buffer of `max_size` and produce
29//! `ChunkData` values from the `Iterator`.
30use std::fmt;
31use std::io::Read;
32
33/// Smallest acceptable value for the minimum chunk size.
34pub const MINIMUM_MIN: u32 = 64;
35/// Largest acceptable value for the minimum chunk size.
36pub const MINIMUM_MAX: u32 = 1_048_576;
37/// Smallest acceptable value for the average chunk size.
38pub const AVERAGE_MIN: u32 = 256;
39/// Largest acceptable value for the average chunk size.
40pub const AVERAGE_MAX: u32 = 4_194_304;
41/// Smallest acceptable value for the maximum chunk size.
42pub const MAXIMUM_MIN: u32 = 1024;
43/// Largest acceptable value for the maximum chunk size.
44pub const MAXIMUM_MAX: u32 = 16_777_216;
45
46//
47// Masks for each of the desired number of bits, where 0 through 5 are unused.
// The values for sizes 64 bytes through 128 kilobytes come from the C
// reference implementation (found in the destor repository) while the extra
50// values come from the restic-FastCDC repository. The FastCDC paper claims that
51// the deduplication ratio is slightly improved when the mask bits are spread
52// relatively evenly, hence these seemingly "magic" values.
53//
54const MASKS: [u64; 26] = [
55    0,                  // padding
56    0,                  // padding
57    0,                  // padding
58    0,                  // padding
59    0,                  // padding
60    0x0000000001804110, // unused except for NC 3
61    0x0000000001803110, // 64B
62    0x0000000018035100, // 128B
63    0x0000001800035300, // 256B
64    0x0000019000353000, // 512B
65    0x0000590003530000, // 1KB
66    0x0000d90003530000, // 2KB
67    0x0000d90103530000, // 4KB
68    0x0000d90303530000, // 8KB
69    0x0000d90313530000, // 16KB
70    0x0000d90f03530000, // 32KB
71    0x0000d90303537000, // 64KB
72    0x0000d90703537000, // 128KB
73    0x0000d90707537000, // 256KB
74    0x0000d91707537000, // 512KB
75    0x0000d91747537000, // 1MB
76    0x0000d91767537000, // 2MB
77    0x0000d93767537000, // 4MB
78    0x0000d93777537000, // 8MB
79    0x0000d93777577000, // 16MB
80    0x0000db3777577000, // unused except for NC 3
81];
82
83//
84// GEAR contains seemingly random numbers which are created by computing the
85// MD5 digest of values from 0 to 255, using only the high 8 bytes of the 16
// byte digest. This is the "gear hash" referred to in the FastCDC paper.
87//
88// The program to produce this table is named table64.rs in examples.
89//
90#[rustfmt::skip]
91const GEAR: [u64; 256] = [
92    0x3b5d3c7d207e37dc, 0x784d68ba91123086, 0xcd52880f882e7298, 0xeacf8e4e19fdcca7,
93    0xc31f385dfbd1632b, 0x1d5f27001e25abe6, 0x83130bde3c9ad991, 0xc4b225676e9b7649,
94    0xaa329b29e08eb499, 0xb67fcbd21e577d58, 0x0027baaada2acf6b, 0xe3ef2d5ac73c2226,
95    0x0890f24d6ed312b7, 0xa809e036851d7c7e, 0xf0a6fe5e0013d81b, 0x1d026304452cec14,
96    0x03864632648e248f, 0xcdaacf3dcd92b9b4, 0xf5e012e63c187856, 0x8862f9d3821c00b6,
97    0xa82f7338750f6f8a, 0x1e583dc6c1cb0b6f, 0x7a3145b69743a7f1, 0xabb20fee404807eb,
98    0xb14b3cfe07b83a5d, 0xb9dc27898adb9a0f, 0x3703f5e91baa62be, 0xcf0bb866815f7d98,
99    0x3d9867c41ea9dcd3, 0x1be1fa65442bf22c, 0x14300da4c55631d9, 0xe698e9cbc6545c99,
100    0x4763107ec64e92a5, 0xc65821fc65696a24, 0x76196c064822f0b7, 0x485be841f3525e01,
101    0xf652bc9c85974ff5, 0xcad8352face9e3e9, 0x2a6ed1dceb35e98e, 0xc6f483badc11680f,
102    0x3cfd8c17e9cf12f1, 0x89b83c5e2ea56471, 0xae665cfd24e392a9, 0xec33c4e504cb8915,
103    0x3fb9b15fc9fe7451, 0xd7fd1fd1945f2195, 0x31ade0853443efd8, 0x255efc9863e1e2d2,
104    0x10eab6008d5642cf, 0x46f04863257ac804, 0xa52dc42a789a27d3, 0xdaaadf9ce77af565,
105    0x6b479cd53d87febb, 0x6309e2d3f93db72f, 0xc5738ffbaa1ff9d6, 0x6bd57f3f25af7968,
106    0x67605486d90d0a4a, 0xe14d0b9663bfbdae, 0xb7bbd8d816eb0414, 0xdef8a4f16b35a116,
107    0xe7932d85aaaffed6, 0x08161cbae90cfd48, 0x855507beb294f08b, 0x91234ea6ffd399b2,
108    0xad70cf4b2435f302, 0xd289a97565bc2d27, 0x8e558437ffca99de, 0x96d2704b7115c040,
109    0x0889bbcdfc660e41, 0x5e0d4e67dc92128d, 0x72a9f8917063ed97, 0x438b69d409e016e3,
110    0xdf4fed8a5d8a4397, 0x00f41dcf41d403f7, 0x4814eb038e52603f, 0x9dafbacc58e2d651,
111    0xfe2f458e4be170af, 0x4457ec414df6a940, 0x06e62f1451123314, 0xbd1014d173ba92cc,
112    0xdef318e25ed57760, 0x9fea0de9dfca8525, 0x459de1e76c20624b, 0xaeec189617e2d666,
113    0x126a2c06ab5a83cb, 0xb1321532360f6132, 0x65421503dbb40123, 0x2d67c287ea089ab3,
114    0x6c93bff5a56bd6b6, 0x4ffb2036cab6d98d, 0xce7b785b1be7ad4f, 0xedb42ef6189fd163,
115    0xdc905288703988f6, 0x365f9c1d2c691884, 0xc640583680d99bfe, 0x3cd4624c07593ec6,
116    0x7f1ea8d85d7c5805, 0x014842d480b57149, 0x0b649bcb5a828688, 0xbcd5708ed79b18f0,
117    0xe987c862fbd2f2f0, 0x982731671f0cd82c, 0xbaf13e8b16d8c063, 0x8ea3109cbd951bba,
118    0xd141045bfb385cad, 0x2acbc1a0af1f7d30, 0xe6444d89df03bfdf, 0xa18cc771b8188ff9,
119    0x9834429db01c39bb, 0x214add07fe086a1f, 0x8f07c19b1f6b3ff9, 0x56a297b1bf4ffe55,
120    0x94d558e493c54fc7, 0x40bfc24c764552cb, 0x931a706f8a8520cb, 0x32229d322935bd52,
121    0x2560d0f5dc4fefaf, 0x9dbcc48355969bb6, 0x0fd81c3985c0b56a, 0xe03817e1560f2bda,
122    0xc1bb4f81d892b2d5, 0xb0c4864f4e28d2d7, 0x3ecc49f9d9d6c263, 0x51307e99b52ba65e,
123    0x8af2b688da84a752, 0xf5d72523b91b20b6, 0x6d95ff1ff4634806, 0x562f21555458339a,
124    0xc0ce47f889336346, 0x487823e5089b40d8, 0xe4727c7ebc6d9592, 0x5a8f7277e94970ba,
125    0xfca2f406b1c8bb50, 0x5b1f8a95f1791070, 0xd304af9fc9028605, 0x5440ab7fc930e748,
126    0x312d25fbca2ab5a1, 0x10f4a4b234a4d575, 0x90301d55047e7473, 0x3b6372886c61591e,
127    0x293402b77c444e06, 0x451f34a4d3e97dd7, 0x3158d814d81bc57b, 0x034942425b9bda69,
128    0xe2032ff9e532d9bb, 0x62ae066b8b2179e5, 0x9545e10c2f8d71d8, 0x7ff7483eb2d23fc0,
129    0x00945fcebdc98d86, 0x8764bbbe99b26ca2, 0x1b1ec62284c0bfc3, 0x58e0fcc4f0aa362b,
130    0x5f4abefa878d458d, 0xfd74ac2f9607c519, 0xa4e3fb37df8cbfa9, 0xbf697e43cac574e5,
131    0x86f14a3f68f4cd53, 0x24a23d076f1ce522, 0xe725cd8048868cc8, 0xbf3c729eb2464362,
132    0xd8f6cd57b3cc1ed8, 0x6329e52425541577, 0x62aa688ad5ae1ac0, 0x0a242566269bf845,
133    0x168b1a4753aca74b, 0xf789afefff2e7e3c, 0x6c3362093b6fccdb, 0x4ce8f50bd28c09b2,
134    0x006a2db95ae8aa93, 0x975b0d623c3d1a8c, 0x18605d3935338c5b, 0x5bb6f6136cad3c71,
135    0x0f53a20701f8d8a6, 0xab8c5ad2e7e93c67, 0x40b5ac5127acaa29, 0x8c7bf63c2075895f,
136    0x78bd9f7e014a805c, 0xb2c9e9f4f9c8c032, 0xefd6049827eb91f3, 0x2be459f482c16fbd,
137    0xd92ce0c5745aaa8c, 0x0aaa8fb298d965b9, 0x2b37f92c6c803b15, 0x8c54a5e94e0f0e78,
138    0x95f9b6e90c0a3032, 0xe7939faa436c7874, 0xd16bfe8f6a8a40c9, 0x44982b86263fd2fa,
139    0xe285fb39f984e583, 0x779a8df72d7619d3, 0xf2d79a8de8d5dd1e, 0xd1037354d66684e2,
140    0x004c82a4e668a8e5, 0x31d40a7668b044e6, 0xd70578538bd02c11, 0xdb45431078c5f482,
141    0x977121bb7f6a51ad, 0x73d5ccbd34eff8dd, 0xe437a07d356e17cd, 0x47b2782043c95627,
142    0x9fb251413e41d49a, 0xccd70b60652513d3, 0x1c95b31e8a1b49b2, 0xcae73dfd1bcb4c1b,
143    0x34d98331b1f5b70f, 0x784e39f22338d92f, 0x18613d4a064df420, 0xf1d8dae25f0bcebe,
144    0x33f77c15ae855efc, 0x3c88b3b912eb109c, 0x956a2ec96bafeea5, 0x1aa005b5e0ad0e87,
145    0x5500d70527c4bb8e, 0xe36c57196421cc44, 0x13c4d286cc36ee39, 0x5654a23d818b2a81,
146    0x77b1dc13d161abdc, 0x734f44de5f8d5eb5, 0x60717e174a6c89a2, 0xd47d9649266a211e,
147    0x5b13a4322bb69e90, 0xf7669609f8b5fc3c, 0x21e6ac55bedcdac9, 0x9b56b62b61166dea,
148    0xf48f66b939797e9c, 0x35f332f9c0e6ae9a, 0xcc733f6a9a878db0, 0x3da161e41cc108c2,
149    0xb7d74ae535914d51, 0x4d493b0b11d36469, 0xce264d1dfba9741a, 0xa9d1f2dc7436dc06,
150    0x70738016604c2a27, 0x231d36e96e93f3d5, 0x7666881197838d19, 0x4a2a83090aaad40c,
151    0xf1e761591668b35d, 0x7363236497f730a7, 0x301080e37379dd4d, 0x502dea2971827042,
152    0xc2c5eb858f32625f, 0x786afb9edfafbdff, 0xdaee0d868490b2a4, 0x617366b3268609f6,
153    0xae0e35a0fe46173e, 0xd1a07de93e824f11, 0x079b8b115ea4cca8, 0x93a99274558faebb,
154    0xfb1e6e22e08a03b3, 0xea635fdba3698dd0, 0xcf53659328503a5c, 0xcde3b31e6fd5d780,
155    0x8e3e4221d3614413, 0xef14d0d86bf1a22c, 0xe1d830d3f16c5ddb, 0xaabd2b2a451504e1
156];
157
158// Find the next chunk cut point in the source.
159fn cut(
160    source: &[u8],
161    min_size: usize,
162    avg_size: usize,
163    max_size: usize,
164    mask_s: u64,
165    mask_l: u64,
166) -> (u64, usize) {
167    let mut remaining = source.len();
168    if remaining <= min_size {
169        return (0, remaining);
170    }
171    let mut center = avg_size;
172    if remaining > max_size {
173        remaining = max_size;
174    } else if remaining < center {
175        center = remaining;
176    }
177    let mut index = min_size;
178    // Paraphrasing from the paper: Use the mask with more 1 bits for the
179    // hash judgment when the current chunking position is smaller than the
180    // desired size, which makes it harder to generate smaller chunks.
181    let mut hash: u64 = 0;
182    while index < center {
183        hash = (hash << 1).wrapping_add(GEAR[source[index] as usize]);
184        if (hash & mask_s) == 0 {
185            return (hash, index);
186        }
187        index += 1;
188    }
189    // Again, paraphrasing: use the mask with fewer 1 bits for the hash
190    // judgment when the current chunking position is larger than the
191    // desired size, which makes it easier to generate larger chunks.
192    let last_pos = remaining;
193    while index < last_pos {
194        hash = (hash << 1).wrapping_add(GEAR[source[index] as usize]);
195        if (hash & mask_l) == 0 {
196            return (hash, index);
197        }
198        index += 1;
199    }
200    // If all else fails, return the largest chunk. This will happen with
201    // pathological data, such as all zeroes.
202    (hash, index)
203}
204
205///
206/// The level for the normalized chunking used by FastCDC and StreamCDC.
207///
208/// Normalized chunking "generates chunks whose sizes are normalized to a
209/// specified region centered at the expected chunk size," as described in
210/// section 4.4 of the FastCDC 2016 paper.
211///
212/// Note that lower levels of normalization will result in a larger range of
213/// generated chunk sizes. It may be beneficial to widen the minimum/maximum
214/// chunk size values given to the `FastCDC` constructor in that case.
215///
216/// Note that higher levels of normalization may result in the final chunk of
217/// data being smaller than the minimum chunk size, which results in a hash
218/// value of zero (`0`) since no calculations are performed for sub-minimum
219/// chunks.
220///
221#[derive(Copy, Clone, Debug)]
222pub enum Normalization {
223    /// No chunk size normalization, produces a wide range of chunk sizes.
224    Level0,
225    /// Level 1 normalization, in which fewer chunks are outside of the desired range.
226    Level1,
227    /// Level 2 normalization, where most chunks are of the desired size.
228    Level2,
229    /// Level 3 normalization, nearly all chunks are the desired size.
230    Level3,
231}
232
233impl Normalization {
234    fn bits(&self) -> u32 {
235        match self {
236            Normalization::Level0 => 0,
237            Normalization::Level1 => 1,
238            Normalization::Level2 => 2,
239            Normalization::Level3 => 3,
240        }
241    }
242}
243
244impl fmt::Display for Normalization {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        self.bits().fmt(f)
247    }
248}
249
250///
251/// Represents a chunk returned from the FastCDC iterator.
252///
253#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
254pub struct Chunk {
255    /// The gear hash value as of the end of the chunk.
256    pub hash: u64,
257    /// Starting byte position within the source.
258    pub offset: usize,
259    /// Length of the chunk in bytes.
260    pub length: usize,
261}
262
263///
264/// The FastCDC chunker implementation from 2016.
265///
266/// Use `new` to construct an instance, and then iterate over the `Chunk`s via
267/// the `Iterator` trait.
268///
269/// This example reads a file into memory and splits it into chunks that are
270/// roughly 16 KB in size. The minimum and maximum sizes are the absolute limit
271/// on the returned chunk sizes. With this algorithm, it is helpful to be more
272/// lenient on the maximum chunk size as the results are highly dependent on the
273/// input data. Changing the minimum chunk size will affect the results as the
274/// algorithm may find different cut points given it uses the minimum as a
275/// starting point (cut-point skipping).
276///
277/// ```no_run
278/// # use std::fs;
279/// # use fastcdc::v2016::FastCDC;
280/// let contents = fs::read("test/fixtures/SekienAkashita.jpg").unwrap();
281/// let chunker = FastCDC::new(&contents, 8192, 16384, 65535);
282/// for entry in chunker {
283///     println!("offset={} length={}", entry.offset, entry.length);
284/// }
285/// ```
286///
287#[derive(Debug, Clone, Eq, PartialEq)]
288pub struct FastCDC<'a> {
289    source: &'a [u8],
290    processed: usize,
291    remaining: usize,
292    min_size: usize,
293    avg_size: usize,
294    max_size: usize,
295    mask_s: u64,
296    mask_l: u64,
297}
298
299impl<'a> FastCDC<'a> {
300    ///
301    /// Construct a `FastCDC` that will process the given slice of bytes.
302    ///
303    /// Uses chunk size normalization level 1 by default.
304    ///
305    pub fn new(source: &'a [u8], min_size: u32, avg_size: u32, max_size: u32) -> Self {
306        FastCDC::with_level(source, min_size, avg_size, max_size, Normalization::Level1)
307    }
308
309    ///
310    /// Create a new `FastCDC` with the given normalization level.
311    ///
312    pub fn with_level(
313        source: &'a [u8],
314        min_size: u32,
315        avg_size: u32,
316        max_size: u32,
317        level: Normalization,
318    ) -> Self {
319        assert!(min_size >= MINIMUM_MIN);
320        assert!(min_size <= MINIMUM_MAX);
321        assert!(avg_size >= AVERAGE_MIN);
322        assert!(avg_size <= AVERAGE_MAX);
323        assert!(max_size >= MAXIMUM_MIN);
324        assert!(max_size <= MAXIMUM_MAX);
325        let bits = logarithm2(avg_size);
326        let normalization = level.bits();
327        let mask_s = MASKS[(bits + normalization) as usize];
328        let mask_l = MASKS[(bits - normalization) as usize];
329        Self {
330            source,
331            processed: 0,
332            remaining: source.len(),
333            min_size: min_size as usize,
334            avg_size: avg_size as usize,
335            max_size: max_size as usize,
336            mask_s,
337            mask_l,
338        }
339    }
340
341    ///
342    /// Find the next cut point in the data, where `start` is the position from
343    /// which to start processing the source data, and `remaining` are the
344    /// number of bytes left to be processed.
345    ///
346    /// The returned 2-tuple consists of the 64-bit hash (fingerprint) and the
347    /// byte offset of the end of the chunk.
348    ///
349    /// There is a special case in which the remaining bytes are less than the
350    /// minimum chunk size, at which point this function returns a hash of 0 and
351    /// the cut point is the end of the source data.
352    ///
353    pub fn cut(&self, start: usize, remaining: usize) -> (u64, usize) {
354        let end = start + remaining;
355        let (hash, count) = cut(
356            &self.source[start..end],
357            self.min_size,
358            self.avg_size,
359            self.max_size,
360            self.mask_s,
361            self.mask_l,
362        );
363        (hash, start + count)
364    }
365}
366
367impl Iterator for FastCDC<'_> {
368    type Item = Chunk;
369
370    fn next(&mut self) -> Option<Chunk> {
371        if self.remaining == 0 {
372            None
373        } else {
374            let (hash, cutpoint) = self.cut(self.processed, self.remaining);
375            if cutpoint == 0 {
376                None
377            } else {
378                let offset = self.processed;
379                let length = cutpoint - offset;
380                self.processed += length;
381                self.remaining -= length;
382                Some(Chunk {
383                    hash,
384                    offset,
385                    length,
386                })
387            }
388        }
389    }
390
391    fn size_hint(&self) -> (usize, Option<usize>) {
392        // NOTE: This intentionally returns the upper bound for both `size_hint`
393        // values, as the upper bound doesn't actually seem to get used by `std`
394        // and using the actual lower bound is practically guaranteed to require
395        // a second capacity growth.
396        let upper_bound = self.remaining / self.min_size;
397        (upper_bound, Some(upper_bound))
398    }
399}
400
401///
402/// The error type returned from the `StreamCDC` iterator.
403///
404#[derive(Debug)]
405pub enum Error {
406    /// End of source data reached.
407    Empty,
408    /// An I/O error occurred.
409    IoError(std::io::Error),
410    /// Something unexpected happened.
411    Other(String),
412}
413
414impl fmt::Display for Error {
415    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
416        write!(f, "chunker error: {self:?}")
417    }
418}
419
420impl std::error::Error for Error {}
421
422impl From<std::io::Error> for Error {
423    fn from(error: std::io::Error) -> Self {
424        Error::IoError(error)
425    }
426}
427
428impl From<Error> for std::io::Error {
429    fn from(error: Error) -> Self {
430        match error {
431            Error::IoError(ioerr) => ioerr,
432            Error::Empty => Self::from(std::io::ErrorKind::UnexpectedEof),
433            Error::Other(str) => Self::new(std::io::ErrorKind::Other, str),
434        }
435    }
436}
437
438///
439/// Represents a chunk returned from the StreamCDC iterator.
440///
441#[derive(Debug, Clone, Eq, PartialEq, Hash)]
442pub struct ChunkData {
443    /// The gear hash value as of the end of the chunk.
444    pub hash: u64,
445    /// Starting byte position within the source.
446    pub offset: u64,
447    /// Length of the chunk in bytes.
448    pub length: usize,
449    /// Source bytes contained in this chunk.
450    pub data: Vec<u8>,
451}
452
453///
454/// The FastCDC chunker implementation from 2016 with streaming support.
455///
456/// Use `new` to construct an instance, and then iterate over the `ChunkData`s
457/// via the `Iterator` trait.
458///
459/// Note that this struct allocates a `Vec<u8>` of `max_size` bytes to act as a
460/// buffer when reading from the source and finding chunk boundaries.
461///
462/// ```no_run
463/// # use std::fs::File;
464/// # use fastcdc::v2016::StreamCDC;
465/// let source = File::open("test/fixtures/SekienAkashita.jpg").unwrap();
466/// let chunker = StreamCDC::new(source, 4096, 16384, 65535);
467/// for result in chunker {
468///     let chunk = result.unwrap();
469///     println!("offset={} length={}", chunk.offset, chunk.length);
470/// }
471/// ```
472///
473pub struct StreamCDC<R: Read> {
474    /// Buffer of data from source for finding cut points.
475    buffer: Vec<u8>,
476    /// Maximum capacity of the buffer (always `max_size`).
477    capacity: usize,
478    /// Number of relevant bytes in the `buffer`.
479    length: usize,
480    /// Source from which data is read into `buffer`.
481    source: R,
482    /// Number of bytes read from the source so far.
483    processed: u64,
484    /// True when the source produces no more data.
485    eof: bool,
486    min_size: usize,
487    avg_size: usize,
488    max_size: usize,
489    mask_s: u64,
490    mask_l: u64,
491}
492
493impl<R: Read> StreamCDC<R> {
494    ///
495    /// Construct a `StreamCDC` that will process bytes from the given source.
496    ///
497    /// Uses chunk size normalization level 1 by default.
498    ///
499    pub fn new(source: R, min_size: u32, avg_size: u32, max_size: u32) -> Self {
500        StreamCDC::with_level(source, min_size, avg_size, max_size, Normalization::Level1)
501    }
502
503    ///
504    /// Create a new `StreamCDC` with the given normalization level.
505    ///
506    pub fn with_level(
507        source: R,
508        min_size: u32,
509        avg_size: u32,
510        max_size: u32,
511        level: Normalization,
512    ) -> Self {
513        assert!(min_size >= MINIMUM_MIN);
514        assert!(min_size <= MINIMUM_MAX);
515        assert!(avg_size >= AVERAGE_MIN);
516        assert!(avg_size <= AVERAGE_MAX);
517        assert!(max_size >= MAXIMUM_MIN);
518        assert!(max_size <= MAXIMUM_MAX);
519        let bits = logarithm2(avg_size);
520        let normalization = level.bits();
521        let mask_s = MASKS[(bits + normalization) as usize];
522        let mask_l = MASKS[(bits - normalization) as usize];
523        Self {
524            buffer: vec![0_u8; max_size as usize],
525            capacity: max_size as usize,
526            length: 0,
527            source,
528            eof: false,
529            processed: 0,
530            min_size: min_size as usize,
531            avg_size: avg_size as usize,
532            max_size: max_size as usize,
533            mask_s,
534            mask_l,
535        }
536    }
537
538    /// Fill the buffer with data from the source, returning the number of bytes
539    /// read (zero if end of source has been reached).
540    fn fill_buffer(&mut self) -> Result<usize, Error> {
541        // this code originally copied from asuran crate
542        if self.eof {
543            Ok(0)
544        } else {
545            let mut all_bytes_read = 0;
546            while !self.eof && self.length < self.capacity {
547                let bytes_read = self.source.read(&mut self.buffer[self.length..])?;
548                if bytes_read == 0 {
549                    self.eof = true;
550                } else {
551                    self.length += bytes_read;
552                    all_bytes_read += bytes_read;
553                }
554            }
555            Ok(all_bytes_read)
556        }
557    }
558
559    /// Drains a specified number of bytes from the buffer, then resizes the
560    /// buffer back to `capacity` size in preparation for further reads.
561    fn drain_bytes(&mut self, count: usize) -> Result<Vec<u8>, Error> {
562        // this code originally copied from asuran crate
563        if count > self.length {
564            Err(Error::Other(format!(
565                "drain_bytes() called with count larger than length: {} > {}",
566                count, self.length
567            )))
568        } else {
569            let data = self.buffer.drain(..count).collect::<Vec<u8>>();
570            self.length -= count;
571            self.buffer.resize(self.capacity, 0_u8);
572            Ok(data)
573        }
574    }
575
576    /// Find the next chunk in the source. If the end of the source has been
577    /// reached, returns `Error::Empty` as the error.
578    fn read_chunk(&mut self) -> Result<ChunkData, Error> {
579        self.fill_buffer()?;
580        if self.length == 0 {
581            Err(Error::Empty)
582        } else {
583            let (hash, count) = cut(
584                &self.buffer[..self.length],
585                self.min_size,
586                self.avg_size,
587                self.max_size,
588                self.mask_s,
589                self.mask_l,
590            );
591            if count == 0 {
592                Err(Error::Empty)
593            } else {
594                let offset = self.processed;
595                self.processed += count as u64;
596                let data = self.drain_bytes(count)?;
597                Ok(ChunkData {
598                    hash,
599                    offset,
600                    length: count,
601                    data,
602                })
603            }
604        }
605    }
606}
607
608impl<R: Read> Iterator for StreamCDC<R> {
609    type Item = Result<ChunkData, Error>;
610
611    fn next(&mut self) -> Option<Result<ChunkData, Error>> {
612        let slice = self.read_chunk();
613        if let Err(Error::Empty) = slice {
614            None
615        } else {
616            Some(slice)
617        }
618    }
619}
620
621///
622/// Base-2 logarithm function for unsigned 32-bit integers.
623///
624fn logarithm2(value: u32) -> u32 {
625    f64::from(value).log2().round() as u32
626}
627
628#[cfg(test)]
629mod tests {
630    use super::*;
631    use md5::{Digest, Md5};
632    use std::fs::{self, File};
633
634    #[test]
635    fn test_logarithm2() {
636        assert_eq!(logarithm2(0), 0);
637        assert_eq!(logarithm2(1), 0);
638        assert_eq!(logarithm2(2), 1);
639        assert_eq!(logarithm2(3), 2);
640        assert_eq!(logarithm2(5), 2);
641        assert_eq!(logarithm2(6), 3);
642        assert_eq!(logarithm2(11), 3);
643        assert_eq!(logarithm2(12), 4);
644        assert_eq!(logarithm2(19), 4);
645        assert_eq!(logarithm2(64), 6);
646        assert_eq!(logarithm2(128), 7);
647        assert_eq!(logarithm2(256), 8);
648        assert_eq!(logarithm2(512), 9);
649        assert_eq!(logarithm2(1024), 10);
650        assert_eq!(logarithm2(16383), 14);
651        assert_eq!(logarithm2(16384), 14);
652        assert_eq!(logarithm2(16385), 14);
653        assert_eq!(logarithm2(32767), 15);
654        assert_eq!(logarithm2(32768), 15);
655        assert_eq!(logarithm2(32769), 15);
656        assert_eq!(logarithm2(65535), 16);
657        assert_eq!(logarithm2(65536), 16);
658        assert_eq!(logarithm2(65537), 16);
659        assert_eq!(logarithm2(1_048_575), 20);
660        assert_eq!(logarithm2(1_048_576), 20);
661        assert_eq!(logarithm2(1_048_577), 20);
662        assert_eq!(logarithm2(4_194_303), 22);
663        assert_eq!(logarithm2(4_194_304), 22);
664        assert_eq!(logarithm2(4_194_305), 22);
665        assert_eq!(logarithm2(16_777_215), 24);
666        assert_eq!(logarithm2(16_777_216), 24);
667        assert_eq!(logarithm2(16_777_217), 24);
668    }
669
670    #[test]
671    #[should_panic]
672    fn test_minimum_too_low() {
673        let array = [0u8; 1024];
674        FastCDC::new(&array, 63, 256, 1024);
675    }
676
677    #[test]
678    #[should_panic]
679    fn test_minimum_too_high() {
680        let array = [0u8; 1024];
681        FastCDC::new(&array, 67_108_867, 256, 1024);
682    }
683
684    #[test]
685    #[should_panic]
686    fn test_average_too_low() {
687        let array = [0u8; 1024];
688        FastCDC::new(&array, 64, 255, 1024);
689    }
690
691    #[test]
692    #[should_panic]
693    fn test_average_too_high() {
694        let array = [0u8; 1024];
695        FastCDC::new(&array, 64, 268_435_457, 1024);
696    }
697
698    #[test]
699    #[should_panic]
700    fn test_maximum_too_low() {
701        let array = [0u8; 1024];
702        FastCDC::new(&array, 64, 256, 1023);
703    }
704
705    #[test]
706    #[should_panic]
707    fn test_maximum_too_high() {
708        let array = [0u8; 1024];
709        FastCDC::new(&array, 64, 256, 1_073_741_825);
710    }
711
712    #[test]
713    fn test_masks() {
714        let source = [0u8; 1024];
715        let chunker = FastCDC::new(&source, 64, 256, 1024);
716        assert_eq!(chunker.mask_l, MASKS[7]);
717        assert_eq!(chunker.mask_s, MASKS[9]);
718        let chunker = FastCDC::new(&source, 8192, 16384, 32768);
719        assert_eq!(chunker.mask_l, MASKS[13]);
720        assert_eq!(chunker.mask_s, MASKS[15]);
721        let chunker = FastCDC::new(&source, 1_048_576, 4_194_304, 16_777_216);
722        assert_eq!(chunker.mask_l, MASKS[21]);
723        assert_eq!(chunker.mask_s, MASKS[23]);
724    }
725
726    #[test]
727    fn test_cut_all_zeros() {
728        // for all zeros, always returns chunks of maximum size
729        let array = [0u8; 10240];
730        let chunker = FastCDC::new(&array, 64, 256, 1024);
731        let mut cursor: usize = 0;
732        for _ in 0..10 {
733            let (hash, pos) = chunker.cut(cursor, 10240 - cursor);
734            assert_eq!(hash, 14169102344523991076);
735            assert_eq!(pos, cursor + 1024);
736            cursor = pos;
737        }
738        // assert that nothing more should be returned
739        let (_, pos) = chunker.cut(cursor, 10240 - cursor);
740        assert_eq!(pos, 10240);
741    }
742
743    #[test]
744    fn test_cut_sekien_16k_chunks() {
745        let read_result = fs::read("test/fixtures/SekienAkashita.jpg");
746        assert!(read_result.is_ok());
747        let contents = read_result.unwrap();
748        let chunker = FastCDC::new(&contents, 4096, 16384, 65535);
749        let mut cursor: usize = 0;
750        let mut remaining: usize = contents.len();
751        let expected: Vec<(u64, usize)> = vec![
752            (17968276318003433923, 21325),
753            (4098594969649699419, 17140),
754            (15733367461443853673, 28084),
755            (4509236223063678303, 18217),
756            (2504464741100432583, 24700),
757        ];
758        for (e_hash, e_length) in expected.iter() {
759            let (hash, pos) = chunker.cut(cursor, remaining);
760            assert_eq!(hash, *e_hash);
761            assert_eq!(pos, cursor + e_length);
762            cursor = pos;
763            remaining -= e_length;
764        }
765        assert_eq!(remaining, 0);
766    }
767
768    #[test]
769    fn test_cut_sekien_32k_chunks() {
770        let read_result = fs::read("test/fixtures/SekienAkashita.jpg");
771        assert!(read_result.is_ok());
772        let contents = read_result.unwrap();
773        let chunker = FastCDC::new(&contents, 8192, 32768, 131072);
774        let mut cursor: usize = 0;
775        let mut remaining: usize = contents.len();
776        let expected: Vec<(u64, usize)> =
777            vec![(15733367461443853673, 66549), (2504464741100432583, 42917)];
778        for (e_hash, e_length) in expected.iter() {
779            let (hash, pos) = chunker.cut(cursor, remaining);
780            assert_eq!(hash, *e_hash);
781            assert_eq!(pos, cursor + e_length);
782            cursor = pos;
783            remaining -= e_length;
784        }
785        assert_eq!(remaining, 0);
786    }
787
788    #[test]
789    fn test_cut_sekien_64k_chunks() {
790        let read_result = fs::read("test/fixtures/SekienAkashita.jpg");
791        assert!(read_result.is_ok());
792        let contents = read_result.unwrap();
793        let chunker = FastCDC::new(&contents, 16384, 65536, 262144);
794        let mut cursor: usize = 0;
795        let mut remaining: usize = contents.len();
796        let expected: Vec<(u64, usize)> = vec![(2504464741100432583, 109466)];
797        for (e_hash, e_length) in expected.iter() {
798            let (hash, pos) = chunker.cut(cursor, remaining);
799            assert_eq!(hash, *e_hash);
800            assert_eq!(pos, cursor + e_length);
801            cursor = pos;
802            remaining -= e_length;
803        }
804        assert_eq!(remaining, 0);
805    }
806
    /// Expected values for a single chunk in the iterator/stream tests: the
    /// hash, position, and size reported by the chunker, plus an MD5 digest
    /// used to confirm the chunk covers the correct bytes of the fixture.
    struct ExpectedChunk {
        // hash value the chunker reports for this chunk
        hash: u64,
        // byte offset of the chunk within the source data
        offset: u64,
        // length of the chunk in bytes
        length: usize,
        // lowercase hex MD5 digest of the chunk contents
        digest: String,
    }
813
814    #[test]
815    fn test_iter_sekien_16k_chunks() {
816        let read_result = fs::read("test/fixtures/SekienAkashita.jpg");
817        assert!(read_result.is_ok());
818        let contents = read_result.unwrap();
819        // The digest values are not needed here, but they serve to validate
820        // that the streaming version tested below is returning the correct
821        // chunk data on each iteration.
822        let expected_chunks = vec![
823            ExpectedChunk {
824                hash: 17968276318003433923,
825                offset: 0,
826                length: 21325,
827                digest: "2bb52734718194617c957f5e07ee6054".into(),
828            },
829            ExpectedChunk {
830                hash: 4098594969649699419,
831                offset: 21325,
832                length: 17140,
833                digest: "badfb0757fe081c20336902e7131f768".into(),
834            },
835            ExpectedChunk {
836                hash: 15733367461443853673,
837                offset: 38465,
838                length: 28084,
839                digest: "18412d7414de6eb42f638351711f729d".into(),
840            },
841            ExpectedChunk {
842                hash: 4509236223063678303,
843                offset: 66549,
844                length: 18217,
845                digest: "04fe1405fc5f960363bfcd834c056407".into(),
846            },
847            ExpectedChunk {
848                hash: 2504464741100432583,
849                offset: 84766,
850                length: 24700,
851                digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(),
852            },
853        ];
854        let chunker = FastCDC::new(&contents, 4096, 16384, 65535);
855        let mut index = 0;
856        for chunk in chunker {
857            assert_eq!(chunk.hash, expected_chunks[index].hash);
858            assert_eq!(chunk.offset, expected_chunks[index].offset as usize);
859            assert_eq!(chunk.length, expected_chunks[index].length);
860            let mut hasher = Md5::new();
861            hasher.update(&contents[chunk.offset..chunk.offset + chunk.length]);
862            let table = hasher.finalize();
863            let digest = format!("{:x}", table);
864            assert_eq!(digest, expected_chunks[index].digest);
865            index += 1;
866        }
867        assert_eq!(index, 5);
868    }
869
870    #[test]
871    fn test_cut_sekien_16k_nc_0() {
872        let read_result = fs::read("test/fixtures/SekienAkashita.jpg");
873        assert!(read_result.is_ok());
874        let contents = read_result.unwrap();
875        let chunker = FastCDC::with_level(&contents, 4096, 16384, 65535, Normalization::Level0);
876        let mut cursor: usize = 0;
877        let mut remaining: usize = contents.len();
878        let expected: Vec<(u64, usize)> = vec![
879            (221561130519947581, 6634),
880            (15733367461443853673, 59915),
881            (10460176299449652894, 25597),
882            (6197802202431009942, 5237),
883            (2504464741100432583, 12083),
884        ];
885        for (e_hash, e_length) in expected.iter() {
886            let (hash, pos) = chunker.cut(cursor, remaining);
887            assert_eq!(hash, *e_hash);
888            assert_eq!(pos, cursor + e_length);
889            cursor = pos;
890            remaining -= e_length;
891        }
892        assert_eq!(remaining, 0);
893    }
894
895    #[test]
896    fn test_cut_sekien_16k_nc_3() {
897        let read_result = fs::read("test/fixtures/SekienAkashita.jpg");
898        assert!(read_result.is_ok());
899        let contents = read_result.unwrap();
900        let chunker = FastCDC::with_level(&contents, 4096, 16384, 65535, Normalization::Level3);
901        let mut cursor: usize = 0;
902        let mut remaining: usize = contents.len();
903        let expected: Vec<(u64, usize)> = vec![
904            (14582375164208481996, 17350),
905            (13104072099671895560, 19911),
906            (6161241554519610597, 17426),
907            (16009206469796846404, 17519),
908            (10460176299449652894, 19940),
909            (2504464741100432583, 17320),
910        ];
911        for (e_hash, e_length) in expected.iter() {
912            let (hash, pos) = chunker.cut(cursor, remaining);
913            assert_eq!(hash, *e_hash);
914            assert_eq!(pos, cursor + e_length);
915            cursor = pos;
916            remaining -= e_length;
917        }
918        assert_eq!(remaining, 0);
919    }
920
921    #[test]
922    fn test_error_fmt() {
923        let err = Error::Empty;
924        assert_eq!(format!("{err}"), "chunker error: Empty");
925    }
926
927    #[test]
928    fn test_stream_sekien_16k_chunks() {
929        let file_result = File::open("test/fixtures/SekienAkashita.jpg");
930        assert!(file_result.is_ok());
931        let file = file_result.unwrap();
932        // The set of expected results should match the non-streaming version.
933        let expected_chunks = vec![
934            ExpectedChunk {
935                hash: 17968276318003433923,
936                offset: 0,
937                length: 21325,
938                digest: "2bb52734718194617c957f5e07ee6054".into(),
939            },
940            ExpectedChunk {
941                hash: 4098594969649699419,
942                offset: 21325,
943                length: 17140,
944                digest: "badfb0757fe081c20336902e7131f768".into(),
945            },
946            ExpectedChunk {
947                hash: 15733367461443853673,
948                offset: 38465,
949                length: 28084,
950                digest: "18412d7414de6eb42f638351711f729d".into(),
951            },
952            ExpectedChunk {
953                hash: 4509236223063678303,
954                offset: 66549,
955                length: 18217,
956                digest: "04fe1405fc5f960363bfcd834c056407".into(),
957            },
958            ExpectedChunk {
959                hash: 2504464741100432583,
960                offset: 84766,
961                length: 24700,
962                digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(),
963            },
964        ];
965        let chunker = StreamCDC::new(file, 4096, 16384, 65535);
966        let mut index = 0;
967        for result in chunker {
968            assert!(result.is_ok());
969            let chunk = result.unwrap();
970            assert_eq!(chunk.hash, expected_chunks[index].hash);
971            assert_eq!(chunk.offset, expected_chunks[index].offset);
972            assert_eq!(chunk.length, expected_chunks[index].length);
973            let mut hasher = Md5::new();
974            hasher.update(&chunk.data);
975            let table = hasher.finalize();
976            let digest = format!("{:x}", table);
977            assert_eq!(digest, expected_chunks[index].digest);
978            index += 1;
979        }
980        assert_eq!(index, 5);
981    }
982}