//! Chunkers that split file contents into block sections, optionally
//! deduplicating repeated content via content-defined chunking (CDC).

use std::{
    collections::{HashMap, hash_map::Entry},
    fmt,
    io::{Read, Write},
    num::NonZero,
};

use dwarfs::section::SectionType;
use rustic_cdc::{Rabin64, RollingHash64};
use sha2::{Digest, Sha512_256};

use crate::{
    Error, Result,
    metadata::Chunk,
    section::{self, CompressParam},
};

type Chunks = Vec<Chunk>;

/// A sink that splits input data into [`Chunk`]s stored in block sections.
pub trait Chunker {
    /// Reads `rdr` to end and returns the chunks its contents were stored as.
    fn put_reader(&mut self, rdr: &mut dyn Read) -> Result<Chunks>;

    /// Convenience wrapper over [`Chunker::put_reader`] for in-memory bytes.
    fn put_bytes(&mut self, mut bytes: &[u8]) -> Result<Chunks> {
        // `&mut &[u8]` implements `Read`, consuming the slice from the front.
        self.put_reader(&mut bytes)
    }
}

/// A [`Chunker`] that packs data back-to-back into fixed-size blocks.
pub struct BasicChunker<W> {
    /// Buffer for the block currently being filled, exactly one block long.
    buf: Box<[u8]>,
    /// Number of valid bytes at the start of `buf`.
    buf_len: usize,
    compression: CompressParam,
    w: section::Writer<W>,
}

impl<W: fmt::Debug> fmt::Debug for BasicChunker<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BasicChunker")
            .field("buf", &format_args!("{}/{}", self.buf_len, self.buf.len()))
            .field("compression", &self.compression)
            .field("w", &self.w)
            .finish()
    }
}

impl<W> BasicChunker<W> {
    /// Creates a chunker that writes blocks of `block_size` bytes to `w`,
    /// compressed according to `compression`.
    pub fn new(
        w: section::Writer<W>,
        block_size: NonZero<u32>,
        compression: CompressParam,
    ) -> Self {
        Self {
            buf: vec![0u8; block_size.get() as usize].into_boxed_slice(),
            buf_len: 0,
            compression,
            w,
        }
    }

    /// Flushes the final, possibly partial, block and returns the underlying
    /// section writer.
    pub fn finish(mut self) -> Result<section::Writer<W>>
    where
        W: Write,
    {
        if self.buf_len != 0 {
            self.w.write_section(
                SectionType::BLOCK,
                self.compression,
                &self.buf[..self.buf_len],
            )?;
            self.buf_len = 0;
        }
        Ok(self.w)
    }

    /// Copies `rdr` into the block buffer, emitting a full block section each
    /// time the buffer fills, and returns the contiguous byte range that was
    /// consumed as a [`SeqChunks`].
    fn put_reader_inner(&mut self, rdr: &mut dyn Read) -> Result<SeqChunks>
    where
        W: Write,
    {
        let mut chunks = SeqChunks {
            start_section_idx: self.w.section_count(),
            start_offset: self.buf_len as u32,
            len: 0,
        };
        loop {
            while self.buf_len < self.buf.len() {
                match rdr.read(&mut self.buf[self.buf_len..]) {
                    Ok(0) => return Ok(chunks),
                    Ok(n) => {
                        self.buf_len += n;
                        chunks.len += n as u64;
                    }
                    Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                    Err(err) => return Err(err.into()),
                }
            }

            debug_assert_eq!(self.buf_len, self.buf.len());
            self.w
                .write_section(SectionType::BLOCK, self.compression, &self.buf)?;
            self.buf_len = 0;
        }
    }
}

/// A contiguous run of bytes starting at `start_offset` within block section
/// `start_section_idx`, spilling into following sections when `len` exceeds
/// the space left in the first block.
#[derive(Debug, Clone, Copy)]
struct SeqChunks {
    start_section_idx: u32,
    start_offset: u32,
    len: u64,
}

impl SeqChunks {
    /// Splits the run at block boundaries, yielding one [`Chunk`] per block
    /// it touches.
    fn to_chunks(mut self, block_size: u32) -> impl Iterator<Item = Chunk> {
        std::iter::from_fn(move || {
            let rest_len = block_size - self.start_offset;
            if self.len == 0 {
                None
            } else if self.len <= u64::from(rest_len) {
                // The rest of the run fits within the current block.
                let c = Chunk {
                    section_idx: self.start_section_idx,
                    offset: self.start_offset,
                    size: self.len as u32,
                };
                self.len = 0;
                Some(c)
            } else {
                // Emit everything up to the end of the current block and
                // continue from the start of the next one.
                let c = Chunk {
                    section_idx: self.start_section_idx,
                    offset: self.start_offset,
                    size: rest_len,
                };
                self.len -= u64::from(rest_len);
                self.start_section_idx += 1;
                self.start_offset = 0;
                Some(c)
            }
        })
    }
}

impl<W: Write> Chunker for BasicChunker<W> {
    fn put_reader(&mut self, rdr: &mut dyn Read) -> Result<Chunks> {
        let seq = self.put_reader_inner(rdr)?;
        Ok(seq.to_chunks(self.buf.len() as u32).collect())
    }
}

/// A [`Chunker`] that performs content-defined chunking and deduplicates
/// repeated chunks before handing data to a [`BasicChunker`].
pub struct CdcChunker<W> {
    inner: BasicChunker<W>,
    rabin: Rabin64,
    /// Staging buffer holding bytes not yet assigned to a chunk.
    chunk_buf: Box<[u8]>,

    /// Map from the first 8 bytes of a chunk's SHA-512/256 digest to the
    /// already-written chunk it identifies.
    table: HashMap<u64, CdcChunk>,
    deduplicated_bytes: u64,
}

impl<W: fmt::Debug> fmt::Debug for CdcChunker<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CdcChunker")
            .field("inner", &self.inner)
            .field("table_size", &self.table.len())
            .field("deduplicated_bytes", &self.deduplicated_bytes)
            .finish_non_exhaustive()
    }
}

/// Location of a previously written chunk, keyed in [`CdcChunker::table`] by
/// the first 8 bytes of its SHA-512/256 digest.
struct CdcChunk {
    /// The remaining 24 bytes of the digest, used to reject key collisions.
    sha512_256_suffix: [u8; 24],
    start_section_idx: u32,
    start_offset: u32,
}

impl<W> CdcChunker<W> {
    const WINDOW_SIZE_BITS: u32 = 6;
    const WINDOW_SIZE: usize = 1usize << Self::WINDOW_SIZE_BITS;
    const CUT_MASK: u64 = (1u64 << 11) - 1;
    const MIN_CHUNK_SIZE: usize = Self::WINDOW_SIZE;
    const MAX_CHUNK_SIZE: usize = 64 << 10;
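    // A cut fires when the low 11 bits of the rolling hash are all ones.
    // Assuming the hash is close to uniform, that happens with probability
    // 2^-11 per byte, so chunks average roughly 2 KiB; sizes are clamped
    // between MIN_CHUNK_SIZE (64 B) and MAX_CHUNK_SIZE (64 KiB).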

    /// Wraps `inner`, which receives the chunked and deduplicated data.
    pub fn new(inner: BasicChunker<W>) -> Self {
        // Rolling hash over a window of 2^WINDOW_SIZE_BITS = 64 bytes.
        let rabin = Rabin64::new(Self::WINDOW_SIZE_BITS);
        CdcChunker {
            inner,
            rabin,
            chunk_buf: vec![0u8; Self::MAX_CHUNK_SIZE].into_boxed_slice(),
            table: HashMap::new(),
            deduplicated_bytes: 0,
        }
    }

    /// Total number of input bytes elided as duplicates of previously
    /// written chunks.
    pub fn deduplicated_bytes(&self) -> u64 {
        self.deduplicated_bytes
    }

    /// Finishes the inner [`BasicChunker`], flushing any partial block.
    pub fn finish(self) -> Result<section::Writer<W>>
    where
        W: Write,
    {
        self.inner.finish()
    }
}

impl<W: Write> Chunker for CdcChunker<W> {
    fn put_reader(&mut self, rdr: &mut dyn Read) -> Result<Chunks> {
        let block_size = self.inner.buf.len() as u32;

        let mut chunks = Chunks::new();
        let mut record_chunk = |cdchunk: &[u8]| {
            debug_assert_ne!(cdchunk.len(), 0);

            // Key the table by the first 8 bytes of the digest; keep the
            // remaining 24 bytes to verify a match.
            let hash = Sha512_256::new_with_prefix(cdchunk).finalize();
            let (&hash_prefix, hash_suffix) = hash.split_first_chunk::<8>().expect("hash is 32B");
            let hash_suffix: [u8; 24] = hash_suffix.try_into().expect("hash is 32B");

            let seq = match self.table.entry(u64::from_ne_bytes(hash_prefix)) {
                Entry::Vacant(ent) => {
                    // Unseen chunk: write it out and remember where it landed.
                    let seq = self.inner.put_reader_inner(&mut { cdchunk })?;
                    ent.insert(CdcChunk {
                        sha512_256_suffix: hash_suffix,
                        start_section_idx: seq.start_section_idx,
                        start_offset: seq.start_offset,
                    });
                    seq
                }
                Entry::Occupied(ent) if ent.get().sha512_256_suffix == hash_suffix => {
                    // Full digest match: reference the stored location
                    // instead of writing the bytes again.
                    self.deduplicated_bytes += cdchunk.len() as u64;
                    SeqChunks {
                        start_section_idx: ent.get().start_section_idx,
                        start_offset: ent.get().start_offset,
                        len: cdchunk.len() as u64,
                    }
                }
                // Key collision with a different digest: write the chunk out
                // but keep the original table entry.
                Entry::Occupied(_) => self.inner.put_reader_inner(&mut { cdchunk })?,
            };

            for c in seq.to_chunks(block_size) {
                // Coalesce chunks that are adjacent within the same section.
                if let Some(p) = chunks
                    .last_mut()
                    .filter(|p| (p.section_idx, p.offset + p.size) == (c.section_idx, c.offset))
                {
                    p.size += c.size;
                } else {
                    chunks.push(c);
                }
            }

            Ok::<_, Error>(())
        };

        self.rabin.reset();

        // Invariants: bytes in `chunk_buf[..cut_pos]` have already been
        // recorded as chunks; `chunk_buf[cut_pos..end_pos]` is the pending
        // chunk still growing; `chunk_buf[end_pos..]` is free space.
        let mut cut_pos = 0usize;
        let mut end_pos = 0usize;
        loop {
            assert_ne!(end_pos, self.chunk_buf.len());
            let read_len = match rdr.read(&mut self.chunk_buf[end_pos..]) {
                Ok(0) => break,
                Ok(n) => n,
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err.into()),
            };

            for (&b, pos) in self.chunk_buf[end_pos..end_pos + read_len]
                .iter()
                .zip(end_pos..)
            {
                self.rabin.slide(b);
                // Number of pending bytes up to and including `pos`.
                let len = pos - cut_pos + 1;

                // Cut when the rolling hash matches the mask pattern on a
                // sufficiently long chunk, or when the size cap is reached.
                // The byte at `pos` itself starts the next chunk.
                if len >= Self::MIN_CHUNK_SIZE && self.rabin.hash & Self::CUT_MASK == Self::CUT_MASK
                    || len >= Self::MAX_CHUNK_SIZE
                {
                    let cdchunk = &self.chunk_buf[cut_pos..pos];
                    cut_pos = pos;
                    record_chunk(cdchunk)?;
                }
            }
            end_pos += read_len;

            if end_pos >= self.chunk_buf.len() {
                // Buffer full: slide the pending bytes to the front. The
                // forced cut at `MAX_CHUNK_SIZE` keeps the pending tail
                // strictly shorter than the buffer.
                debug_assert_eq!(end_pos, self.chunk_buf.len());
                self.chunk_buf.copy_within(cut_pos.., 0);
                end_pos -= cut_pos;
                cut_pos = 0;
            }
        }

        // Record whatever remains as the final, possibly short, chunk.
        if cut_pos < end_pos {
            record_chunk(&self.chunk_buf[cut_pos..end_pos])?;
        }

        Ok(chunks)
    }
}
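
#[cfg(test)]
mod tests {
    use super::*;

    // Sanity check for the block-splitting arithmetic in
    // `SeqChunks::to_chunks`: a 20-byte run starting at offset 10 of
    // section 3, with 16-byte blocks, should leave 6 bytes in section 3
    // and spill the remaining 14 bytes into section 4.
    #[test]
    fn seq_chunks_split_at_block_boundaries() {
        let seq = SeqChunks {
            start_section_idx: 3,
            start_offset: 10,
            len: 20,
        };
        let got: Vec<_> = seq
            .to_chunks(16)
            .map(|c| (c.section_idx, c.offset, c.size))
            .collect();
        assert_eq!(got, [(3, 10, 6), (4, 0, 14)]);

        // A run that fits inside its first block yields a single chunk.
        let seq = SeqChunks {
            start_section_idx: 0,
            start_offset: 0,
            len: 5,
        };
        let got: Vec<_> = seq
            .to_chunks(16)
            .map(|c| (c.section_idx, c.offset, c.size))
            .collect();
        assert_eq!(got, [(0, 0, 5)]);
    }
}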