#![cfg(feature = "zstd")]

//! zstd's `--rsyncable` option performs content-defined chunking
//!
//! This has been minimally validated to match the implementation in zstd, with the following
//! caveats:
//!
//!   - Maximum chunk size is not implemented
//!   - Only 1 test case with a single chunk edge (i.e., 2 chunks) has been tested
//!
//! It uses an internal [rolling
//! hash](https://github.com/facebook/zstd/blob/01261bc8b6fcfc77801788f8b1e2a2e5dd2e8e25/lib/compress/zstd_compress_internal.h#L658-L698)
//! with 1 multiply and 2 additions (see `ZSTD_rollingHash_append()` for the core functionality).
//!
//! The rolling hash is then used by
//! [`findSynchronizationPoint()`](https://github.com/facebook/zstd/blob/15c5e200235edc520c1bd678ed126a6dd05736e1/lib/compress/zstdmt_compress.c#L1931-L2001)
//! in various ways to find "synchronization points" (i.e., the edges of chunks).
//!
//! [This issue thread comment](https://github.com/facebook/zstd/issues/1155#issuecomment-520258862) also
//! includes some explanation on the mechanism.
//!
//! The zstd code _does_ carry, in its context, information about the _previous_ block that was
//! emitted. In other words: the rolling hash isn't "reset" when a block is emitted. (Most chunking
//! algorithms reset their state on block emission.)
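//!
//! A minimal usage sketch; the `hash_roll` import paths below are assumed and may need adjusting
//! to the actual crate layout, so the block is marked `ignore`:
//!
//! ```ignore
//! use hash_roll::{ChunkIncr, ToChunkIncr};
//! use hash_roll::zstd::Zstd;
//!
//! let mut incr = Zstd::default().to_chunk_incr();
//! let data = vec![0u8; 16 << 20];
//! match incr.push(&data) {
//!     Some(edge) => println!("first chunk edge at byte {}", edge),
//!     None => println!("no chunk edge found in this input"),
//! }
//! ```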
use crate::{Chunk, ChunkIncr, ToChunkIncr};
use std::convert::TryInto;
use std::num::Wrapping;

const RSYNC_LENGTH: usize = 32;
const PRIME_8_BYTES: Wrapping<u64> = Wrapping(0xCF1BBCDCB7A56463);
const ROLL_HASH_CHAR_OFFSET: Wrapping<u64> = Wrapping(10);

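/// Parameters for zstd's `--rsyncable` style content-defined chunking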
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Zstd {
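    // low-bit mask compared against the rolling hash; a window whose hash has all of these bits
    // set marks a chunk edge (upstream's `hitMask`)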
    hit_mask: u64,
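    // `PRIME_8_BYTES` raised to `RSYNC_LENGTH - 1`; used by `rotate()` to strip the oldest
    // byte's contribution from the rolling hash (upstream's `primePower`)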
    prime_power: u64,
}

impl Default for Zstd {
    fn default() -> Self {
        // ../lib/compress/zstdmt_compress.c: jobSizeMB: 8, rsyncBits: 23, hitMask: 7fffff, primePower: f5507fe35f91f8cb
        Self::with_target_section_size(8 << 20)
    }
}

impl Zstd {
    /*
     * ```notrust
        /* Aim for the targetsectionSize as the average job size. */
        U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20);
        U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20;
        assert(jobSizeMB >= 1);
        DEBUGLOG(4, "rsyncLog = %u", rsyncBits);
        mtctx->rsync.hash = 0;
        mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
        mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
        ```
    */
    pub fn with_target_section_size(target_section_size: u64) -> Self {
        let job_size_mb: u32 = (target_section_size >> 20).try_into().unwrap();
        assert_ne!(job_size_mb, 0);
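        // `leading_zeros() ^ 31` is the index of the highest set bit (floor(log2)) of a non-zero
        // u32, mirroring upstream's `ZSTD_highbit32(jobSizeMB)`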
        let rsync_bits = (job_size_mb.leading_zeros() ^ 31) + 20;
        let hit_mask = (1u64 << rsync_bits) - 1;
        let prime_power = PRIME_8_BYTES
            .0
            .wrapping_pow((RSYNC_LENGTH - 1).try_into().unwrap());
        Self {
            hit_mask,
            prime_power,
        }
    }
}

#[cfg(test)]
mod test {
    #[test]
    fn test_zstd_init_matches_upstream() {
        let zstd = super::Zstd::default();
        assert_eq!(zstd.hit_mask, 0x7f_ffff);
        assert_eq!(zstd.prime_power, 0xf5507fe35f91f8cb);
    }
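
    #[test]
    fn test_rotate_matches_append_over_window() {
        // Added sanity check (not taken from upstream zstd): once the window is full, rotating
        // the oldest byte out and a new byte in must produce the same hash as recomputing the
        // hash from scratch over the shifted window.
        let prime_power = super::PRIME_8_BYTES
            .0
            .wrapping_pow((super::RSYNC_LENGTH - 1) as u32);

        // arbitrary, deterministic test data
        let data: Vec<u8> = (0u32..100).map(|i| (i * 7 % 251) as u8).collect();

        // seed the rolling hash with the first window of bytes
        let mut rolled = super::ZstdState::default();
        rolled.append(&data[..super::RSYNC_LENGTH]);

        for i in super::RSYNC_LENGTH..data.len() {
            rolled.rotate(data[i - super::RSYNC_LENGTH], data[i], prime_power);

            // recompute over exactly the bytes the window now covers
            let mut fresh = super::ZstdState::default();
            fresh.append(&data[(i + 1 - super::RSYNC_LENGTH)..=i]);
            assert_eq!(rolled, fresh, "hash mismatch after rotating in byte {}", i);
        }
    }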
}

#[derive(Default, Debug, PartialEq, Eq)]
struct ZstdState {
    hash: Wrapping<u64>,
}

impl ZstdState {
    // `ZSTD_rollingHash_append()`
    fn append(&mut self, data: &[u8]) {
        for i in data {
            self.hash *= PRIME_8_BYTES;
            self.hash += Wrapping(*i as u64) + ROLL_HASH_CHAR_OFFSET;
        }
    }

    // `ZSTD_rollingHash_rotate()`
    fn rotate(&mut self, to_remove: u8, to_add: u8, prime_power: u64) {
        self.hash -= (Wrapping(to_remove as u64) + ROLL_HASH_CHAR_OFFSET) * Wrapping(prime_power);
        self.hash *= PRIME_8_BYTES;
        self.hash += Wrapping(to_add as u64) + ROLL_HASH_CHAR_OFFSET;
    }

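    // a chunk edge is hit when every bit selected by `hit_mask` is set in the hash, matching
    // upstream's `(hash & hitMask) == hitMask` check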
    fn at_split(&mut self, params: &Zstd) -> bool {
        (self.hash.0 & params.hit_mask) == params.hit_mask
    }
}

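/// Rolling-hash search state used by [`Zstd`]'s `Chunk::find_chunk_edge` implementation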
#[derive(Default, Debug, PartialEq, Eq)]
pub struct ZstdSearchState {
    state: ZstdState,
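    // number of leading bytes of the caller's buffer that have already been fed into the hash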
    offset: usize,
}

impl ZstdSearchState {
    fn append(&mut self, data: &[u8]) {
        self.state.append(data);
    }

    fn rotate(&mut self, to_remove: u8, to_add: u8, prime_power: u64) {
        self.state.rotate(to_remove, to_add, prime_power);
    }

    fn at_split(&mut self, params: &Zstd) -> bool {
        self.state.at_split(params)
    }
}

/// Incremental chunking using zstd's rsyncable algorithm
///
/// Performance note: zstd's chunking needs to look back at previously inserted data in order to
/// remove it from the rolling hash, so `ZstdIncr` maintains an internal window buffer. This
/// internal buffer may reduce performance.
#[derive(Debug, PartialEq, Eq)]
pub struct ZstdIncr {
    params: Zstd,

    state: ZstdState,

    window: Box<[u8]>,
    // insert into the window at this offset
    window_offs: usize,
    // if true, we need to remove bytes from the window when inserting
    //
    // NOTE: by pre-filling `self.hash` with an appropriate value, we might be able to remove this
    // variable and always treat the window as full (of zeros initially).
    window_full: bool,

    // how many bytes since the last emitted block
    // used to cap the block size as zstd does
    input_offs: u64,
}

impl ToChunkIncr for Zstd {
    type Incr = ZstdIncr;

    fn to_chunk_incr(&self) -> Self::Incr {
        self.into()
    }
}

impl From<Zstd> for ZstdIncr {
    fn from(params: Zstd) -> Self {
        Self {
            params,
            state: Default::default(),
            window: vec![0; RSYNC_LENGTH].into_boxed_slice(),
            window_offs: 0,
            window_full: false,
            input_offs: 0,
        }
    }
}

impl From<&Zstd> for ZstdIncr {
    fn from(params: &Zstd) -> Self {
        params.clone().into()
    }
}

impl Chunk for Zstd {
    type SearchState = ZstdSearchState;

    fn to_search_state(&self) -> Self::SearchState {
        Self::SearchState::default()
    }

    fn find_chunk_edge(
        &self,
        state: &mut Self::SearchState,
        data: &[u8],
    ) -> (Option<usize>, usize) {
        if state.offset < RSYNC_LENGTH {
            // push some data in
            let seed_b = &data[state.offset..std::cmp::min(RSYNC_LENGTH, data.len())];
            state.append(seed_b);
            state.offset += seed_b.len();

            if state.offset < RSYNC_LENGTH {
                // not enough data
                return (None, 0);
            }
        }

        // TODO: track input_offs to split over-size blocks

        // we've got enough data, do rotations
        for i in state.offset..data.len() {
            let to_remove = data[i - RSYNC_LENGTH];
            let to_add = data[i];
            state.rotate(to_remove, to_add, self.prime_power);
            if state.at_split(self) {
                let discard_ct = data.len().saturating_sub(RSYNC_LENGTH);
                return (Some(i + 1), discard_ct);
            }
        }

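        // no edge found in this buffer: the caller only needs to retain the trailing window of
        // bytes; record how many of those retained bytes are already included in the hash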
        let discard_ct = data.len().saturating_sub(RSYNC_LENGTH);
        let keep_ct = data.len() - discard_ct;
        state.offset = keep_ct;
        (None, discard_ct)
    }
}

impl ChunkIncr for ZstdIncr {
    fn push(&mut self, data: &[u8]) -> Option<usize> {
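        // until the window has been filled once, buffer incoming bytes; once it is full, append
        // the whole window to the hash in one shot, after which rotation can begin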
        let use_len = if !self.window_full {
            let use_len = std::cmp::min(self.window.len() - self.window_offs, data.len());
            self.window[self.window_offs..(self.window_offs + use_len)]
                .copy_from_slice(&data[..use_len]);
            self.window_offs += use_len;

            if self.window_offs != self.window.len() {
                return None;
            }

            self.window_full = true;
            self.window_offs = 0;
            self.state.append(&self.window[..]);
            use_len
        } else {
            0
        };

        // TODO: track input_offs to split over-size blocks

        // we have a full window, now rotate data through
        for (i, &v) in data[use_len..].iter().enumerate() {
            let to_remove = self.window[self.window_offs];
            let to_add = v;
            self.state
                .rotate(to_remove, to_add, self.params.prime_power);
            self.window[self.window_offs] = to_add;
            self.window_offs = (self.window_offs + 1) % self.window.len();

            if self.state.at_split(&self.params) {
                // NOTE: don't clear window
                return Some(i + use_len);
            }
        }

        None
    }
}