1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
use std::cmp::{max, min};
use std::io::Read;
use crate::byte_table::BYTE_TABLE;
use crate::file_list::{Block, FileBlocks};
use crate::hashing::{Hashing, HashType};
// be reasonable.
const MIN_WINDOW_BITS: u8 = 8;
const MIN_BLOCK_BITS: u8 = 8;
/// An engine that hashes data, looking for good split points. When it finds
/// a split point, it emits the length of the block and its hash (SHA-256,
/// Blake2, or Blake3).
///
/// You can configure the window size, preferred block size, and min/max
/// block sizes (all in powers of 2).
pub struct Scanner {
min_bits: u8,
pref_bits: u8,
max_bits: u8,
window_bits: u8,
// state:
processed: isize,
window: Vec<u32>,
window_index: usize,
sum: u32,
// hashing state:
digest: Hashing,
}
pub struct ScanIterator<'a, R: Read> {
scanner: &'a mut Scanner,
reader: &'a mut R,
buffer: &'a mut [u8],
buffer_index: usize,
buffer_size: usize,
ended: bool,
digest: &'a mut Hashing,
}
impl Scanner {
/// Create a new scanner with:
///
/// - hash_type: which hashing function to use
/// - min_bits: minimum block size
/// - pref_bits: preferred block size
/// - max_bits: maximum block size
/// - window_bits: size of the rolling buffer, or how far back to include
/// in the rolling hash
///
/// All sizes are in powers of 2, so `Scanner::new(18, 20, 22, 10)` will
/// use a window size of 1K, a preferred block size of 1M, and a min/max
/// of 256K/4M.
pub fn new(hash_type: HashType, min_bits: u8, pref_bits: u8, max_bits: u8, window_bits: u8) -> Scanner {
debug_assert!(window_bits >= MIN_WINDOW_BITS);
debug_assert!(min_bits >= MIN_BLOCK_BITS);
debug_assert!(min_bits < pref_bits);
debug_assert!(pref_bits < max_bits);
Scanner {
min_bits,
pref_bits,
max_bits,
window_bits,
processed: 0,
window: vec![0_u32; 1 << window_bits],
window_index: 0,
sum: 0,
digest: Hashing::new(hash_type),
}
}
/// Process a chunk of data and determine if we've hit a block boundary.
/// Returns the number of bytes consumed, and optionally block metadata.
/// If it returns a block, it may not have finished processing this chunk
/// of data, so if `consumed < data.len()`, then you should call this
/// method again, with `&data[consumed..]`.
pub fn feed(&mut self, data: &[u8]) -> (usize, Option<Block>) {
let min_size: isize = 1 << self.min_bits;
let max_size: isize = 1 << self.max_bits;
let window_size: isize = 1 << self.window_bits;
let mask: u32 = (1 << self.pref_bits) - 1;
// when to start & stop computing the hash, for this chunk of data:
let min_start = max(0, min_size - window_size);
let start = max(0, min_start - self.processed);
let end = min(data.len() as isize, max_size - self.processed);
let mut sum = self.sum;
let mut window_index = self.window_index;
for i in start..end {
let t = BYTE_TABLE[data[i as usize] as usize];
sum = sum.rotate_left(1) ^ t ^ self.window[window_index];
self.window[window_index] = t;
window_index = (window_index + 1) & (window_size as usize - 1);
if (sum & mask) == 0 && self.processed + i >= min_size {
// split the cell!
let stop = i + 1;
self.processed += i + 1;
self.digest.update(&data[0..stop as usize]);
return (stop as usize, Some(self.mark()));
}
}
self.digest.update(&data[0..(end as usize)]);
self.processed += end;
self.window_index = window_index;
self.sum = sum;
if self.processed < max_size {
(end as usize, None)
} else {
(end as usize, Some(self.mark()))
}
}
/// Terminate the scanner. Return a block if there was one in flight. It
/// may be smaller than the minimum size.
pub fn finish(&mut self) -> Option<Block> {
if self.processed == 0 {
None
} else {
Some(self.mark())
}
}
// mark the end of a block, reset internal state, and return the block data
fn mark(&mut self) -> Block {
let block = Block { size: self.processed as usize, hash: self.digest.finalize_reset() };
self.reset();
block
}
fn reset(&mut self) {
self.processed = 0;
self.window.clear();
self.window.resize(1 << self.window_bits, 0);
self.window_index = 0;
self.sum = 0;
}
/// Scan a reader (`Read`), using a given temporary buffer, and computing
/// a SHA-256 hash as we go.
pub fn reader_iter<'a, R: Read>(
&'a mut self,
reader: &'a mut R,
buffer: &'a mut [u8],
digest: &'a mut Hashing,
) -> ScanIterator<'a, R> {
ScanIterator {
scanner: self,
reader,
buffer,
buffer_index: 0,
buffer_size: 0,
ended: false,
digest,
}
}
/// Exhaustively scan a `Read` and return a vector of blocks. The scanner
/// is reset at the end so that it can be immediately reused on another
/// reader.
pub fn scan_reader(
&mut self,
reader: &mut impl Read,
buffer: &mut [u8],
hash_type: HashType
) -> std::io::Result<FileBlocks> {
let mut blocks: Vec<Block> = Vec::new();
let mut digest = Hashing::new(hash_type);
loop {
let n = reader.read(buffer)?;
if n == 0 { break; }
digest.update(&buffer[..n]);
let mut i = 0;
while i < n {
let (consumed, maybe_block) = self.feed(&buffer[i..n]);
i += consumed;
if let Some(block) = maybe_block {
blocks.push(block);
}
}
}
if let Some(block) = self.finish() {
blocks.push(block);
}
let hash = digest.finalize_reset();
Ok(FileBlocks { blocks, hash })
}
}
impl<R: Read> Iterator for ScanIterator<'_, R> {
type Item = std::io::Result<Block>;
fn next(&mut self) -> Option<std::io::Result<Block>> {
loop {
if self.ended { return None; }
if self.buffer_index == self.buffer_size {
match self.reader.read(self.buffer) {
Err(e) => {
self.ended = true;
return Some(Err(e));
}
Ok(n) => {
if n == 0 {
self.ended = true;
return self.scanner.finish().map(Ok);
}
self.buffer_index = 0;
self.buffer_size = n;
self.digest.update(&self.buffer[..n]);
}
}
}
let (consumed, maybe_block) = self.scanner.feed(&self.buffer[self.buffer_index .. self.buffer_size]);
self.buffer_index += consumed;
if let Some(block) = maybe_block {
return Some(Ok(block));
}
}
}
}