use crate::chunker::{Chunk, ChunkSlice, Chunker, ChunkerState, RewindChunker, VacuumChunker};
use fastcdc::v2020::{Chunk as FastCdcChunk, FastCDC};
/// Size bounds, in bytes, that govern how the chunker cuts its input.
///
/// The three values are passed unchanged to `FastCDC::new` every time a chunk
/// is cut.
///
/// NOTE(review): the fastcdc constructor is understood to assert that each
/// value lies within its supported range; confirm against the crate docs
/// before building a config from untrusted input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContentDefinedChunkerConfig {
    /// Upper bound on a chunk's size. Also used as the initial capacity of the
    /// chunker's buffer and as the number of unconsumed bytes that must be
    /// buffered before `next_chunk` attempts a cut.
    pub max_size: u32,
    /// Lower bound on a chunk's size, as handed to FastCDC.
    pub min_size: u32,
    /// Target average chunk size, as handed to FastCDC.
    pub avg_size: u32,
}
/// Buffering chunker that cuts pushed bytes into content-defined chunks
/// using FastCDC.
///
/// Bytes are appended to an internal buffer and handed back out as borrowed
/// slices; a cursor tracks how much of the buffer has already been returned.
#[derive(Debug)]
pub struct ContentDefinedChunker {
/// Every byte pushed so far that has not been vacuumed away. The prefix
/// `buf[..pos]` has already been returned to the caller.
buf: Vec<u8>,
/// Index into `buf` of the first byte not yet returned as part of a chunk.
pos: usize,
/// Chunk size bounds forwarded to FastCDC on every cut.
config: ContentDefinedChunkerConfig,
}
impl ContentDefinedChunker {
    /// Creates an empty chunker that will cut chunks according to `config`.
    ///
    /// The internal buffer is pre-allocated with room for `config.max_size`
    /// bytes, and the cursor starts at the beginning of that buffer.
    pub fn new(config: ContentDefinedChunkerConfig) -> Self {
        let initial_capacity = config.max_size as usize;
        let buf = Vec::with_capacity(initial_capacity);
        Self {
            buf,
            pos: 0,
            config,
        }
    }
}
impl ContentDefinedChunker {
    /// Finds the next FastCDC cut inside the unconsumed part of the buffer.
    ///
    /// Returns `(offset, length)` of the first chunk FastCDC reports, with
    /// `offset` measured from the cursor. Returns `None` when the buffer is
    /// empty, when fewer than `config.max_size` unconsumed bytes are
    /// buffered, or when FastCDC yields no chunk.
    ///
    /// Only indices are returned (never a slice) so that callers remain free
    /// to move the cursor afterwards without fighting the borrow checker.
    ///
    /// # Panics
    ///
    /// Panics if the cursor lies beyond the end of the buffer.
    fn locate_next_cut(&self) -> Option<(usize, usize)> {
        let window = self
            .buf
            .get(self.pos..)
            .expect("Cursor unexpectedly out of bounds.");
        // A cut is only attempted once at least `max_size` unconsumed bytes
        // are buffered.
        if self.buf.is_empty() || window.len() < self.config.max_size as usize {
            return None;
        }
        FastCDC::new(
            window,
            self.config.min_size,
            self.config.avg_size,
            self.config.max_size,
        )
        .next()
        .map(|FastCdcChunk { offset, length, .. }| (offset, length))
    }

    /// Returns the bytes described by `cut` (as produced by
    /// [`Self::locate_next_cut`]) and advances the cursor by the length of
    /// the returned slice.
    ///
    /// # Panics
    ///
    /// Panics if the described range does not lie inside the buffer.
    fn take_cut(&mut self, cut: (usize, usize)) -> Chunk<'_> {
        let (offset, length) = cut;
        let start = self.pos + offset;
        let chunk = self
            .buf
            .get(start..start + length)
            .expect("Cursor unexpectedly out of bounds.");
        self.pos += chunk.len();
        chunk
    }
}

impl ChunkSlice for ContentDefinedChunker {
    /// Appends `bytes` to the end of the internal buffer.
    fn push_bytes(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// Returns the next content-defined chunk and advances the cursor past it.
    ///
    /// Returns `None` without consuming anything while fewer than
    /// `config.max_size` unconsumed bytes are buffered.
    ///
    /// # Panics
    ///
    /// Panics if the cursor lies beyond the end of the buffer.
    fn next_chunk(&mut self) -> Option<Chunk<'_>> {
        let cut = self.locate_next_cut()?;
        Some(self.take_cut(cut))
    }

    /// Returns the number of buffered bytes that have not been returned yet.
    ///
    /// Relies on the cursor never exceeding the buffer length.
    fn buffer_size(&mut self) -> usize {
        self.buf.len() - self.pos
    }

    /// Returns every unconsumed byte (possibly an empty slice) and moves the
    /// cursor to the end of the buffer.
    ///
    /// # Panics
    ///
    /// Panics if the cursor lies beyond the end of the buffer.
    fn remaining(&mut self) -> Chunk<'_> {
        let tail = self
            .buf
            .get(self.pos..)
            .expect("Cursor unexpectedly out of bounds.");
        self.pos = self.buf.len();
        tail
    }

    /// Returns the next content-defined chunk if one can be cut; otherwise
    /// returns all unconsumed bytes as a single final chunk.
    ///
    /// Returns `None` when the resulting slice would be empty, so callers
    /// never receive a zero-length chunk from this method.
    ///
    /// # Panics
    ///
    /// Panics if the cursor lies beyond the end of the buffer.
    fn next_chunk_or_remaining(&mut self) -> Option<Chunk<'_>> {
        let chunk = match self.locate_next_cut() {
            Some(cut) => self.take_cut(cut),
            // No cut available: flush whatever is left in the buffer.
            None => self.remaining(),
        };
        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    }
}
impl VacuumChunker for ContentDefinedChunker {
    /// Drops the already-consumed prefix of the buffer and resets the cursor
    /// to the start, so the unconsumed bytes now begin at index 0.
    fn vacuum(&mut self) {
        let consumed = self.pos;
        // Dropping the drain iterator immediately removes the drained range.
        drop(self.buf.drain(..consumed));
        self.pos = 0;
    }
}
impl RewindChunker for ContentDefinedChunker {
    type State = ChunkerState;

    /// Captures the current cursor position and buffer length.
    fn dump_state(&self) -> Self::State {
        let pos = self.pos;
        let buf_len = self.buf.len();
        ChunkerState { pos, buf_len }
    }

    /// Restores the cursor recorded in `state` and truncates the buffer to
    /// the recorded length, discarding any bytes pushed after the snapshot.
    ///
    /// The buffer is only ever shortened here; it is never grown back.
    fn rewind_to(&mut self, state: Self::State) {
        let (cursor, kept_len) = (state.pos, state.buf_len);
        self.buf.truncate(kept_len);
        self.pos = cursor;
    }
}
// Opts `ContentDefinedChunker` into the `Chunker` trait from `crate::chunker`.
// The body is empty, so nothing is overridden here: any items the trait has
// fall back to their defaults.
impl Chunker for ContentDefinedChunker {}
#[cfg(test)]
mod tests {
    use super::*;
    use xpct::{be_gt, be_none, be_some, equal, expect, have_len};

    /// Chunk size bounds shared by every test in this module.
    fn test_config() -> ContentDefinedChunkerConfig {
        ContentDefinedChunkerConfig {
            max_size: 1024,
            avg_size: 256,
            min_size: 64,
        }
    }

    /// A chunker built from [`test_config`] with nothing pushed yet.
    fn fresh_chunker() -> ContentDefinedChunker {
        ContentDefinedChunker::new(test_config())
    }

    /// A chunker built from [`test_config`] that already holds `len` zero bytes.
    fn zero_filled_chunker(len: usize) -> ContentDefinedChunker {
        let mut chunker = fresh_chunker();
        chunker.push_bytes(&vec![0u8; len]);
        chunker
    }

    mod push_bytes {
        use super::*;

        #[test]
        fn appends_bytes_to_internal_buffer() {
            let mut chunker = fresh_chunker();

            chunker.push_bytes(b"hello");
            expect!(chunker.buffer_size()).to(equal(5));

            chunker.push_bytes(b" world");
            expect!(chunker.buffer_size()).to(equal(11));
        }
    }

    mod next_chunk {
        use super::*;

        #[test]
        fn returns_none_for_empty_buffer() {
            let mut chunker = fresh_chunker();
            expect!(chunker.next_chunk()).to(be_none());
        }

        #[test]
        fn returns_none_when_buffer_smaller_than_max_size() {
            let mut chunker = zero_filled_chunker(512);
            expect!(chunker.next_chunk()).to(be_none());
        }

        #[test]
        fn returns_chunk_when_buffer_at_least_max_size() {
            let mut chunker = zero_filled_chunker(1024);
            expect!(chunker.next_chunk()).to(be_some());
        }

        #[test]
        fn extracts_sequential_chunks() {
            let mut chunker = zero_filled_chunker(3000);

            let first_len = expect!(chunker.next_chunk())
                .to(be_some())
                .into_inner()
                .len();
            let second_len = expect!(chunker.next_chunk())
                .to(be_some())
                .into_inner()
                .len();

            expect!(chunker.buffer_size()).to(equal(3000 - first_len - second_len));
        }
    }

    mod buffer_size {
        use super::*;

        #[test]
        fn returns_unconsumed_byte_count() {
            let mut chunker = fresh_chunker();
            expect!(chunker.buffer_size()).to(equal(0));

            chunker.push_bytes(b"test data");
            expect!(chunker.buffer_size()).to(equal(9));
        }

        #[test]
        fn decreases_after_chunk_extraction() {
            let mut chunker = zero_filled_chunker(2048);
            let size_before = chunker.buffer_size();

            let chunk_len = expect!(chunker.next_chunk())
                .to(be_some())
                .into_inner()
                .len();

            expect!(chunker.buffer_size()).to(equal(size_before - chunk_len));
        }
    }

    mod remaining {
        use super::*;

        #[test]
        fn returns_all_unconsumed_bytes() {
            let mut chunker = fresh_chunker();
            let data = b"test data";
            chunker.push_bytes(data);

            let tail = chunker.remaining();

            expect!(tail).to(equal(data.as_slice()));
        }

        #[test]
        fn advances_position_to_end() {
            let mut chunker = fresh_chunker();
            chunker.push_bytes(b"test data");

            let _ = chunker.remaining();

            expect!(chunker.buffer_size()).to(equal(0));
        }

        #[test]
        fn returns_bytes_after_chunk_extraction() {
            let mut chunker = zero_filled_chunker(2048);
            let chunk_len = expect!(chunker.next_chunk())
                .to(be_some())
                .into_inner()
                .len();

            let tail = chunker.remaining();

            expect!(tail).to(have_len(2048 - chunk_len));
        }
    }

    mod next_chunk_or_remaining {
        use super::*;

        #[test]
        fn returns_none_for_empty_buffer() {
            let mut chunker = fresh_chunker();
            expect!(chunker.next_chunk_or_remaining()).to(be_none());
        }

        #[test]
        fn returns_remaining_bytes_when_buffer_smaller_than_max_size() {
            let mut chunker = fresh_chunker();
            let data = b"partial data";
            chunker.push_bytes(data);

            let flushed = expect!(chunker.next_chunk_or_remaining())
                .to(be_some())
                .into_inner();

            expect!(flushed).to(equal(data.as_slice()));
            expect!(chunker.buffer_size()).to(equal(0));
        }

        #[test]
        fn returns_chunk_when_buffer_at_least_max_size() {
            let mut chunker = zero_filled_chunker(2048);

            let chunk_len = expect!(chunker.next_chunk_or_remaining())
                .to(be_some())
                .into_inner()
                .len();

            expect!(chunker.buffer_size()).to(be_gt(0));
            expect!(chunk_len + chunker.buffer_size()).to(equal(2048));
        }
    }

    mod vacuum {
        use super::*;

        #[test]
        fn removes_consumed_bytes_preserves_unconsumed() {
            let mut chunker = zero_filled_chunker(2048);
            let chunk_len = expect!(chunker.next_chunk())
                .to(be_some())
                .into_inner()
                .len();
            let unconsumed_before = chunker.buffer_size();

            chunker.vacuum();

            expect!(chunker.buffer_size()).to(equal(unconsumed_before));
            expect!(chunker.buf).to(have_len(2048 - chunk_len));
        }

        #[test]
        fn resets_position_to_zero() {
            let mut chunker = zero_filled_chunker(2048);
            let _ = chunker.next_chunk();

            chunker.vacuum();

            expect!(chunker.pos).to(equal(0));
        }
    }

    mod rewind {
        use super::*;

        #[test]
        fn dump_state_captures_position_and_buffer_length() {
            let mut chunker = zero_filled_chunker(2048);
            let _ = chunker.next_chunk();

            let state = chunker.dump_state();

            expect!(state.pos).to(equal(chunker.pos));
            expect!(state.buf_len).to(equal(chunker.buf.len()));
        }

        #[test]
        fn rewind_to_restores_state_after_push_bytes() {
            let mut chunker = fresh_chunker();
            chunker.push_bytes(b"initial data");
            let state = chunker.dump_state();
            let pos_before = chunker.pos;
            let len_before = chunker.buf.len();

            chunker.push_bytes(b" more data");
            expect!(&chunker.buf).to(have_len(len_before + 10));

            chunker.rewind_to(state);

            expect!(chunker.pos).to(equal(pos_before));
            expect!(chunker.buf).to(have_len(len_before));
        }

        #[test]
        fn rewind_to_restores_state_after_next_chunk() {
            let mut chunker = zero_filled_chunker(2048);
            let state = chunker.dump_state();
            let pos_before = chunker.pos;

            let _ = chunker.next_chunk();
            expect!(chunker.pos).to(be_gt(pos_before));

            chunker.rewind_to(state);

            expect!(chunker.pos).to(equal(pos_before));
        }
    }
}