// hash_roll/zstd.rs
1#![cfg(feature = "zstd")]
2
//! zstd's `--rsyncable` option performs content defined chunking
//!
//! This has been minimally validated to match the implementation from zstd, with the following
//! caveats:
//!
//!   - Maximum chunk size is not implemented
//!   - Only 1 test case with a single chunk edge (ie: 2 chunks) has been tested
//!
//! It uses an internal [rolling
//! hash](https://github.com/facebook/zstd/blob/01261bc8b6fcfc77801788f8b1e2a2e5dd2e8e25/lib/compress/zstd_compress_internal.h#L658-L698)
//! with 1 multiple and 2 additions. (see `ZSTD_rollingHash_append()` for core functionality).
//!
//! The rolling hash is then used by
//! [`findSynchronizationPoint()`](https://github.com/facebook/zstd/blob/15c5e200235edc520c1bd678ed126a6dd05736e1/lib/compress/zstdmt_compress.c#L1931-L2001)
//! in various ways to find "synchronization points" (ie: edges of chunks).
//!
//! [This issue thread comment](https://github.com/facebook/zstd/issues/1155#issuecomment-520258862) also
//! includes some explanation on the mechanism.
//!
//! The zstd code _does_ include in its context information about the _previous_ block that was
//! emitted. In other words: the rolling hash isn't "reset" on block emission. (Most chunking
//! algorithms are reset on block emission).
25use crate::{Chunk, ChunkIncr, ToChunkIncr};
26use std::convert::TryInto;
27use std::num::Wrapping;
28
// Rolling-hash window size in bytes (`RSYNC_LENGTH` in zstd's zstdmt_compress.c).
const RSYNC_LENGTH: usize = 32;
// Multiplier of the rolling hash (`prime8bytes` in zstd's zstd_compress_internal.h).
const PRIME_8_BYTES: Wrapping<u64> = Wrapping(0xCF1BBCDCB7A56463);
// Offset added to every byte before it is mixed into the hash
// (`ZSTD_ROLL_HASH_CHAR_OFFSET` in zstd).
const ROLL_HASH_CHAR_OFFSET: Wrapping<u64> = Wrapping(10);
32
/// Parameters for zstd's `--rsyncable` content defined chunking.
///
/// Construct via [`Zstd::default()`] (8 MiB target section size) or
/// [`Zstd::with_target_section_size()`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Zstd {
    // A chunk edge fires when `(hash & hit_mask) == hit_mask`; this is
    // `(1 << rsync_bits) - 1`.
    hit_mask: u64,
    // `PRIME_8_BYTES^(RSYNC_LENGTH - 1)`, used by `ZstdState::rotate()` to
    // cancel the oldest byte's contribution to the rolling hash.
    prime_power: u64,
}
38
39impl Default for Zstd {
40    fn default() -> Self {
41        // ../lib/compress/zstdmt_compress.c: jobSizeMB: 8, rsyncBits: 23, hitMask: 7fffff, primePower: f5507fe35f91f8cb
42        Self::with_target_section_size(8 << 20)
43    }
44}
45
46impl Zstd {
47    /*
48     * ```notrust
49        /* Aim for the targetsectionSize as the average job size. */
50        U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20);
51        U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20;
52        assert(jobSizeMB >= 1);
53        DEBUGLOG(4, "rsyncLog = %u", rsyncBits);
54        mtctx->rsync.hash = 0;
55        mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
56        mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
57        ```
58    */
59    pub fn with_target_section_size(target_section_size: u64) -> Self {
60        let job_size_mb: u32 = (target_section_size >> 20).try_into().unwrap();
61        assert_ne!(job_size_mb, 0);
62        let rsync_bits = (job_size_mb.leading_zeros() ^ 31) + 20;
63        let hit_mask = (1u64 << rsync_bits) - 1;
64        let prime_power = PRIME_8_BYTES
65            .0
66            .wrapping_pow((RSYNC_LENGTH - 1).try_into().unwrap());
67        Self {
68            hit_mask,
69            prime_power,
70        }
71    }
72}
73
#[cfg(test)]
mod test {
    /// Default parameters must reproduce the values upstream zstd logs for an
    /// 8 MiB job size (rsyncBits = 23).
    #[test]
    fn test_zstd_init_matches_upstream() {
        let params = super::Zstd::default();
        assert_eq!(params.hit_mask, (1u64 << 23) - 1);
        assert_eq!(params.prime_power, 0xf550_7fe3_5f91_f8cb);
    }
}
83
/// Core rolling-hash state shared by the slice-based and incremental chunkers.
#[derive(Default, Debug, PartialEq, Eq)]
struct ZstdState {
    // Current rolling hash; `Wrapping` because zstd's hash arithmetic is
    // mod 2^64 by construction.
    hash: Wrapping<u64>,
}
88
89impl ZstdState {
90    // `ZSTD_rollingHash_append()`
91    fn append(&mut self, data: &[u8]) {
92        for i in data {
93            self.hash *= PRIME_8_BYTES;
94            self.hash += Wrapping(*i as u64) + ROLL_HASH_CHAR_OFFSET;
95        }
96    }
97
98    // `ZSTD_rollingHash_rotate()`
99    fn rotate(&mut self, to_remove: u8, to_add: u8, prime_power: u64) {
100        self.hash -= (Wrapping(to_remove as u64) + ROLL_HASH_CHAR_OFFSET) * Wrapping(prime_power);
101        self.hash *= PRIME_8_BYTES;
102        self.hash += Wrapping(to_add as u64) + ROLL_HASH_CHAR_OFFSET;
103    }
104
105    fn at_split(&mut self, params: &Zstd) -> bool {
106        (self.hash.0 & params.hit_mask) == params.hit_mask
107    }
108}
109
/// Resumable search state for [`Zstd`]'s `find_chunk_edge()`.
#[derive(Default, Debug, PartialEq, Eq)]
pub struct ZstdSearchState {
    // Rolling hash over the most recent window of input.
    state: ZstdState,
    // Position within the caller's current `data` slice: while below
    // RSYNC_LENGTH it counts bytes absorbed to seed the initial window;
    // after a no-edge return it is set to the number of retained bytes.
    offset: usize,
}
115
116impl ZstdSearchState {
117    fn append(&mut self, data: &[u8]) {
118        self.state.append(data);
119    }
120
121    fn rotate(&mut self, to_remove: u8, to_add: u8, prime_power: u64) {
122        self.state.rotate(to_remove, to_add, prime_power);
123    }
124
125    fn at_split(&mut self, params: &Zstd) -> bool {
126        self.state.at_split(params)
127    }
128}
129
/// Incremental chunking using Zstd's rsyncable algorithm
///
/// Performance note: Zstd's chunking requires buffer look back to remove previously inserted data,
/// and as a result requires `ZstdIncr` to maintain an internal buffer. This internal buffer may
/// reduce performance.
#[derive(Debug, PartialEq, Eq)]
pub struct ZstdIncr {
    // Chunking parameters (hit mask and prime power).
    params: Zstd,

    // Rolling hash over the contents of `window`.
    state: ZstdState,

    // Look-back ring buffer holding the last RSYNC_LENGTH bytes pushed.
    window: Box<[u8]>,
    // insert into the window at this offset
    window_offs: usize,
    // if true, we need to remove bytes from the window when inserting
    //
    // NOTE: by pre-filling `self.hash` with an appropriate value, we might be able to remove this
    // variable and always treat the window as full (of zeros initially).
    window_full: bool,

    // how many bytes since the last emitted block
    // used to cap the block size as zstd does
    //
    // NOTE(review): never read or updated in `push()` — the max-block-size cap
    // is not implemented yet (see the TODO there).
    input_offs: u64,
}
154
155impl ToChunkIncr for Zstd {
156    type Incr = ZstdIncr;
157
158    fn to_chunk_incr(&self) -> Self::Incr {
159        self.into()
160    }
161}
162
163impl From<Zstd> for ZstdIncr {
164    fn from(params: Zstd) -> Self {
165        Self {
166            params,
167            state: Default::default(),
168            window: vec![0; RSYNC_LENGTH].into_boxed_slice(),
169            window_offs: 0,
170            window_full: false,
171            input_offs: 0,
172        }
173    }
174}
175
176impl From<&Zstd> for ZstdIncr {
177    fn from(params: &Zstd) -> Self {
178        params.clone().into()
179    }
180}
181
impl Chunk for Zstd {
    type SearchState = ZstdSearchState;

    fn to_search_state(&self) -> Self::SearchState {
        Self::SearchState::default()
    }

    /// Find the next chunk edge in `data`, resuming from `state`.
    ///
    /// Returns `(edge, discard_ct)`: `edge` is `Some(i)` with `i` one past the
    /// last byte of the chunk; `discard_ct` is how many leading bytes of
    /// `data` the caller may drop before the next call (the trailing
    /// `RSYNC_LENGTH` bytes are retained for look-back).
    ///
    /// NOTE(review): semantics inferred from this implementation — confirm
    /// against the `Chunk` trait's documented contract.
    fn find_chunk_edge(
        &self,
        state: &mut Self::SearchState,
        data: &[u8],
    ) -> (Option<usize>, usize) {
        if state.offset < RSYNC_LENGTH {
            // Still priming: absorb bytes until a full RSYNC_LENGTH window
            // has been hashed; no edge can be reported before that.
            let seed_b = &data[state.offset..std::cmp::min(RSYNC_LENGTH, data.len())];
            state.append(seed_b);
            state.offset += seed_b.len();

            if state.offset < RSYNC_LENGTH {
                // not enough data
                return (None, 0);
            }
        }

        // TODO: track input_offs to split over-size blocks

        // we've got enough data, do rotations
        for i in state.offset..data.len() {
            let to_remove = data[i - RSYNC_LENGTH];
            let to_add = data[i];
            state.rotate(to_remove, to_add, self.prime_power);
            if state.at_split(self) {
                // Hash state is deliberately not reset at an edge; zstd
                // carries it across block boundaries (see module docs).
                let discard_ct = data.len().saturating_sub(RSYNC_LENGTH);
                return (Some(i + 1), discard_ct);
            }
        }

        // No edge found: let the caller discard all but the trailing window,
        // and record how many bytes are kept so the next call's rotation loop
        // resumes at the right index.
        let discard_ct = data.len().saturating_sub(RSYNC_LENGTH);
        let keep_ct = data.len() - discard_ct;
        state.offset = keep_ct;
        (None, discard_ct)
    }
}
225
impl ChunkIncr for ZstdIncr {
    /// Push `data` through the chunker.
    ///
    /// Returns `Some(i)` when a chunk edge fires, where `i` is the index in
    /// `data` of the byte at which the split occurred. Hash and window state
    /// persist across calls and are *not* reset when an edge is emitted
    /// (matching zstd; see module docs).
    ///
    /// NOTE(review): `find_chunk_edge()` returns `i + 1` (one past the edge
    /// byte) while this returns the edge byte's own index — confirm which
    /// convention the `ChunkIncr` trait expects.
    fn push(&mut self, data: &[u8]) -> Option<usize> {
        let use_len = if !self.window_full {
            // Window not yet primed: copy bytes in until it fills. No edge
            // can be emitted before a full window has been hashed.
            let use_len = std::cmp::min(self.window.len() - self.window_offs, data.len());
            self.window[self.window_offs..(self.window_offs + use_len)]
                .copy_from_slice(&data[..use_len]);
            self.window_offs += use_len;

            if self.window_offs != self.window.len() {
                return None;
            }

            // Window just filled: hash its entire contents in one shot, then
            // repurpose `window_offs` as the ring-buffer rotation cursor.
            self.window_full = true;
            self.window_offs = 0;
            self.state.append(&self.window[..]);
            use_len
        } else {
            0
        };

        // TODO: track input_offs to split over-size blocks

        // we have a full window, now rotate data through
        for (i, &v) in data[use_len..].iter().enumerate() {
            // `window_offs` points at the oldest byte; replace it with the
            // incoming byte and advance the ring cursor.
            let to_remove = self.window[self.window_offs];
            let to_add = v;
            self.state
                .rotate(to_remove, to_add, self.params.prime_power);
            self.window[self.window_offs] = to_add;
            self.window_offs = (self.window_offs + 1) % self.window.len();

            if self.state.at_split(&self.params) {
                // NOTE: don't clear window
                return Some(i + use_len);
            }
        }

        None
    }
}
265}