#[cfg(test)]
extern crate rand;
use std::io::{self, Read};
use std::mem::swap;
use std::num::Wrapping;
/// Strategy that decides where one chunk ends and the next one begins.
///
/// Input is handed over piece by piece through `find_boundary`, so an
/// implementation has to carry its state from one call to the next until
/// `reset` is invoked.
pub trait ChunkerImpl {
    /// Scans `data` for the end of the current chunk.
    ///
    /// Returns `Some(i)` when `data[i]` is the last byte of the chunk (callers
    /// in this file assert that `i` lies inside `data`), or `None` when the
    /// chunk continues past the end of `data`.
    fn find_boundary(&mut self, data: &[u8]) -> Option<usize>;

    /// Drops any per-chunk state.
    ///
    /// Callers in this file invoke it after a boundary has been reported and
    /// before the following chunk is scanned. The default does nothing.
    fn reset(&mut self) {}
}
/// Size, in bytes, of the buffer a `ChunkStream` reads its input into.
#[cfg(not(test))]
const BUF_SIZE: usize = 4 * 1024;
/// Test builds use a tiny buffer instead (presumably so that even short
/// fixtures span several buffer refills).
#[cfg(test)]
const BUF_SIZE: usize = 8;
/// Owns a boundary detector and turns it into one of several chunk producers.
///
/// Every adapter below takes the `Chunker` by value.
pub struct Chunker<I: ChunkerImpl> {
    inner: I,
}

impl<I: ChunkerImpl> Chunker<I> {
    /// Wraps the boundary detector `inner`.
    pub fn new(inner: I) -> Chunker<I> {
        Chunker { inner: inner }
    }

    /// Chunks `reader`, yielding every chunk as an owned `Vec<u8>`.
    pub fn whole_chunks<R: Read>(self, reader: R) -> WholeChunks<R, I> {
        let stream = self.stream(reader);
        WholeChunks {
            stream: stream,
            buffer: Vec::new(),
        }
    }

    /// Chunks `reader` until it is exhausted and returns all chunks at once.
    ///
    /// Stops at, and returns, the first error yielded while chunking.
    pub fn all_chunks<R: Read>(self, reader: R)
        -> io::Result<Vec<Vec<u8>>>
    {
        // Collecting into a `Result` short-circuits on the first `Err`.
        self.whole_chunks(reader).collect()
    }

    /// Chunks `reader` through the low-level event interface: the returned
    /// stream's `read` method hands out `ChunkInput::Data` pieces borrowed
    /// from its internal buffer, separated by `ChunkInput::End` markers.
    pub fn stream<R: Read>(self, reader: R) -> ChunkStream<R, I> {
        let Chunker { inner } = self;
        ChunkStream {
            reader: reader,
            inner: inner,
            buffer: [0u8; BUF_SIZE],
            pos: 0,
            len: 0,
            status: EmitStatus::Data,
        }
    }

    /// Chunks `reader`, yielding only the offset and length of every chunk.
    pub fn chunks<R: Read>(self, reader: R) -> ChunkInfoStream<R, I> {
        let stream = self.stream(reader);
        ChunkInfoStream {
            stream: stream,
            last_chunk: 0,
            pos: 0,
        }
    }

    /// Chunks an in-memory `buffer`, yielding each chunk as a subslice of it.
    pub fn slices(self, buffer: &[u8]) -> Slices<I> {
        let Chunker { inner } = self;
        Slices {
            inner: inner,
            buffer: buffer,
            pos: 0,
        }
    }

    /// Returns a chunker that additionally ends a chunk once it is `max`
    /// bytes long.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero.
    pub fn max_size(self, max: usize) -> Chunker<SizeLimited<I>> {
        assert!(max > 0);
        let limited = SizeLimited {
            inner: self.inner,
            pos: 0,
            max_size: max,
        };
        Chunker { inner: limited }
    }
}
/// Iterator that assembles the pieces produced by a `ChunkStream` into
/// complete, owned chunks.
pub struct WholeChunks<R: Read, I: ChunkerImpl> {
    stream: ChunkStream<R, I>,
    /// Bytes of the chunk currently being assembled.
    buffer: Vec<u8>,
}

impl<R: Read, I: ChunkerImpl> Iterator for WholeChunks<R, I> {
    type Item = io::Result<Vec<u8>>;

    /// Pulls events from the underlying stream until a chunk is complete.
    ///
    /// Yields `Some(Err(_))` as soon as the stream reports an error; bytes
    /// gathered so far stay in the internal buffer. Yields `None` once the
    /// stream itself returns `None`.
    fn next(&mut self) -> Option<io::Result<Vec<u8>>> {
        loop {
            let event = match self.stream.read() {
                Some(Ok(event)) => event,
                Some(Err(e)) => return Some(Err(e)),
                None => return None,
            };
            match event {
                ChunkInput::Data(bytes) => self.buffer.extend_from_slice(bytes),
                ChunkInput::End => {
                    // Hand the finished chunk out and leave an empty buffer
                    // behind for the next one.
                    let mut whole = Vec::new();
                    swap(&mut whole, &mut self.buffer);
                    return Some(Ok(whole));
                }
            }
        }
    }
}
/// One event handed out by `ChunkStream::read`.
pub enum ChunkInput<'a> {
    /// A run of bytes belonging to the current chunk. A single chunk may be
    /// delivered as several consecutive `Data` events.
    Data(&'a [u8]),
    /// The current chunk is complete.
    End,
}
/// Records what `ChunkStream::read` emitted last, which determines what it
/// has to emit next.
#[derive(PartialEq, Eq)]
enum EmitStatus {
    /// A chunk terminator was emitted; no chunk is currently open.
    End,
    /// Data was emitted without reaching a boundary, so a chunk is open.
    ///
    /// `Chunker::stream` also starts a fresh stream in this state, which is
    /// why an empty reader still produces a single `ChunkInput::End`.
    Data,
    /// Data ending exactly on a boundary was emitted; the terminator is owed
    /// on the next call.
    AtSplit,
}

/// Low-level chunking interface over a reader.
///
/// Call `read` repeatedly: it hands out the input as `ChunkInput::Data`
/// pieces borrowed from an internal buffer and inserts a `ChunkInput::End`
/// after the last piece of every chunk.
pub struct ChunkStream<R: Read, I: ChunkerImpl> {
    reader: R,
    inner: I,
    /// Holds the bytes obtained by the most recent read of `reader`.
    buffer: [u8; BUF_SIZE],
    /// Number of valid bytes in `buffer`.
    len: usize,
    /// Offset in `buffer` of the first byte not handed out yet.
    pos: usize,
    status: EmitStatus,
}

impl<R: Read, I: ChunkerImpl> ChunkStream<R, I> {
    /// Produces the next event of the stream.
    ///
    /// * `Some(Ok(ChunkInput::Data(_)))` - more bytes of the current chunk.
    /// * `Some(Ok(ChunkInput::End))` - the current chunk is complete.
    /// * `Some(Err(_))` - the reader failed. The stream's position is left
    ///   untouched, so calling `read` again retries the failed read instead
    ///   of replaying bytes that were already handed out.
    /// * `None` - the reader reported end of input and no chunk is open.
    ///
    /// # Panics
    ///
    /// Panics if the chunker reports a boundary outside the data it was
    /// given.
    pub fn read<'a>(&'a mut self) -> Option<io::Result<ChunkInput<'a>>> {
        // The previous call ended on a boundary: close that chunk first and
        // give the chunker a clean slate for the next one.
        if self.status == EmitStatus::AtSplit {
            self.status = EmitStatus::End;
            self.inner.reset();
            return Some(Ok(ChunkInput::End));
        }

        if self.pos == self.len {
            // Everything buffered has been handed out: refill. `pos`/`len`
            // are only updated once the read succeeded, so that after an
            // error `pos == len` still holds and the next call reads again.
            let filled = match self.reader.read(&mut self.buffer) {
                Ok(n) => n,
                Err(e) => return Some(Err(e)),
            };
            self.pos = 0;
            self.len = filled;
            if filled == 0 {
                // End of input: terminate the open chunk, if there is one.
                if self.status == EmitStatus::Data {
                    self.status = EmitStatus::End;
                    return Some(Ok(ChunkInput::End));
                }
                return None;
            }
        }

        let start = self.pos;
        match self.inner.find_boundary(&self.buffer[start..self.len]) {
            Some(split) => {
                assert!(start + split < self.len);
                self.status = EmitStatus::AtSplit;
                // The boundary byte itself still belongs to this chunk.
                self.pos = start + split + 1;
            }
            None => {
                self.status = EmitStatus::Data;
                self.pos = self.len;
            }
        }
        Some(Ok(ChunkInput::Data(&self.buffer[start..self.pos])))
    }
}
/// Location of one chunk within the chunked input, measured in bytes.
pub struct ChunkInfo {
    start: usize,
    length: usize,
}

impl ChunkInfo {
    /// Offset of the first byte of the chunk.
    pub fn start(&self) -> usize {
        self.start
    }

    /// Number of bytes in the chunk.
    pub fn length(&self) -> usize {
        self.length
    }

    /// Offset one past the last byte of the chunk (`start + length`).
    pub fn end(&self) -> usize {
        self.start() + self.length()
    }
}
/// Iterator that reports where each chunk lies in the input without keeping
/// the chunk contents.
pub struct ChunkInfoStream<R: Read, I: ChunkerImpl> {
    stream: ChunkStream<R, I>,
    /// Offset at which the chunk currently being measured starts.
    last_chunk: usize,
    /// Total number of data bytes seen so far.
    pos: usize,
}

impl<R: Read, I: ChunkerImpl> Iterator for ChunkInfoStream<R, I> {
    type Item = io::Result<ChunkInfo>;

    /// Consumes stream events up to the next chunk terminator and returns the
    /// span of the chunk just completed.
    ///
    /// Errors from the stream are passed through; `None` is returned once the
    /// stream returns `None`.
    fn next(&mut self) -> Option<io::Result<ChunkInfo>> {
        loop {
            match self.stream.read() {
                None => return None,
                Some(Err(e)) => return Some(Err(e)),
                Some(Ok(ChunkInput::Data(bytes))) => self.pos += bytes.len(),
                Some(Ok(ChunkInput::End)) => break,
            }
        }
        // The chunk spans from the previous terminator up to the bytes
        // counted so far; the next chunk starts where this one ends.
        let start = self.last_chunk;
        self.last_chunk = self.pos;
        Some(Ok(ChunkInfo {
            start: start,
            length: self.pos - start,
        }))
    }
}
/// Iterator that cuts an in-memory buffer into chunks, yielding each chunk as
/// a subslice of that buffer.
pub struct Slices<'a, I: ChunkerImpl> {
    inner: I,
    buffer: &'a [u8],
    /// Offset in `buffer` where the next chunk starts.
    pos: usize,
}

impl<'a, I: ChunkerImpl> Iterator for Slices<'a, I> {
    type Item = &'a [u8];

    /// Returns the next chunk, or `None` once the whole buffer was consumed.
    ///
    /// Bytes left over after the last boundary are returned as a final chunk.
    ///
    /// # Panics
    ///
    /// Panics if the chunker reports a boundary outside the data it was
    /// given.
    fn next(&mut self) -> Option<&'a [u8]> {
        let whole: &'a [u8] = self.buffer;
        let start = self.pos;
        if start == whole.len() {
            return None;
        }
        match self.inner.find_boundary(&whole[start..]) {
            Some(split) => {
                assert!(start + split < whole.len());
                // The boundary byte is the last byte of this chunk.
                self.pos = start + split + 1;
                self.inner.reset();
            }
            None => self.pos = whole.len(),
        }
        Some(&whole[start..self.pos])
    }
}
/// Chunker adapter that ends a chunk once it is `max_size` bytes long, unless
/// the wrapped chunker ends it earlier.
pub struct SizeLimited<I: ChunkerImpl> {
    inner: I,
    /// Number of bytes of the current chunk accounted for so far.
    pos: usize,
    max_size: usize,
}

impl<I: ChunkerImpl> ChunkerImpl for SizeLimited<I> {
    /// Looks for the end of the current chunk in `data`, never letting the
    /// chunk grow beyond `max_size` bytes.
    ///
    /// # Panics
    ///
    /// Panics if the current chunk has already reached `max_size`, which
    /// happens when `reset` was not called after a reported boundary.
    fn find_boundary(&mut self, data: &[u8]) -> Option<usize> {
        assert!(self.max_size > self.pos);
        if data.is_empty() {
            return None;
        }

        let left = self.max_size - self.pos;
        if left == 1 {
            // Only one more byte fits, so it ends the chunk whatever the
            // wrapped chunker would say; it is not consulted.
            return Some(0);
        }

        // Show the wrapped chunker only as many bytes as still fit.
        let reaches_limit = data.len() >= left;
        let window = if reaches_limit { &data[..left] } else { data };

        if let Some(p) = self.inner.find_boundary(window) {
            self.pos += p + 1;
            return Some(p);
        }

        self.pos += window.len();
        if reaches_limit {
            // No natural boundary within the allowance: cut at the limit.
            Some(left - 1)
        } else {
            None
        }
    }

    /// Starts a new chunk: clears the byte count and resets the wrapped
    /// chunker.
    fn reset(&mut self) {
        self.pos = 0;
        self.inner.reset();
    }
}
/// Multiplier of the rolling hash; a fresh hasher also starts from this value.
const HM: Wrapping<u32> = Wrapping(123456791);

/// Rolling-hash boundary detector.
///
/// Every byte is mixed into a 32-bit hash; a boundary is reported whenever the
/// top `nbits` bits of the hash are all zero.
pub struct ZPAQ {
    /// `32 - nbits`: a boundary is hit when the hash is below `2^nbits_field`.
    nbits: usize,
    /// The byte fed to the previous `update` call (0 initially).
    c1: u8,
    /// For every byte value, the byte that most recently followed it.
    o1: [u8; 256],
    /// Current hash value.
    h: Wrapping<u32>,
}

impl ZPAQ {
    /// Creates a detector that reports a boundary when the top `nbits` bits of
    /// the hash are zero.
    ///
    /// `nbits == 0` makes every byte a boundary; `nbits == 32` requires the
    /// whole hash to be zero.
    ///
    /// # Panics
    ///
    /// Panics if `nbits` is greater than 32, the width of the hash.
    pub fn new(nbits: usize) -> ZPAQ {
        assert!(nbits <= 32, "ZPAQ: nbits must be at most 32, got {}", nbits);
        ZPAQ {
            nbits: 32 - nbits,
            c1: 0,
            o1: [0; 256],
            h: HM,
        }
    }

    /// Feeds one byte into the hash and returns `true` if the chunk ends with
    /// this byte.
    pub fn update(&mut self, byte: u8) -> bool {
        let prev = self.c1 as usize;
        // A byte that repeats what followed `prev` last time is mixed in with
        // the plain multiplier; any other byte doubles the multiplier.
        let scaled = if byte == self.o1[prev] {
            self.h * HM
        } else {
            self.h * HM * Wrapping(2)
        };
        self.h = scaled + Wrapping(byte as u32 + 1);
        self.o1[prev] = byte;
        self.c1 = byte;

        // Compare in 64 bits: the stored shift is 32 when `nbits` was 0, and
        // shifting a `u32` by its full width would overflow.
        u64::from(self.h.0) < (1u64 << self.nbits)
    }
}
impl ChunkerImpl for ZPAQ {
    /// Feeds `data` byte by byte into the hash and stops at the first byte
    /// that ends the chunk, returning its index. Bytes after that index are
    /// not consumed.
    fn find_boundary(&mut self, data: &[u8]) -> Option<usize> {
        data.iter().position(|&byte| self.update(byte))
    }

    /// Puts the hasher back into the state `ZPAQ::new` creates, keeping the
    /// configured number of bits.
    fn reset(&mut self) {
        self.h = HM;
        self.c1 = 0;
        self.o1 = [0u8; 256];
    }
}
#[cfg(test)]
mod tests {
    use rand::{self, Rng};
    use ::{Chunker, ChunkInput, ZPAQ};
    use std::io::{self, Read};
    use std::str::from_utf8;

    /// Shared fixture: a 3-bit ZPAQ chunker, the sample input both as a slice
    /// and as a reader, and the input with `|` appended to every expected
    /// chunk.
    fn base() -> (Chunker<ZPAQ>, &'static [u8],
                  io::Cursor<&'static [u8]>, &'static [u8]) {
        let data: &'static [u8] = b"defghijklmnopqrstuvwxyz1234567890";
        let expected: &'static [u8] =
            b"def|ghijk|lmno|pq|rstuvw|xyz123|4567890|";
        let chunker = Chunker::new(ZPAQ::new(3));
        (chunker, data, io::Cursor::new(data), expected)
    }

    /// Compares two byte strings as UTF-8 text so that a failure prints
    /// readable output.
    fn assert_same_text(actual: &[u8], expected: &[u8]) {
        assert_eq!(from_utf8(actual).unwrap(),
                   from_utf8(expected).unwrap());
    }

    #[test]
    fn test_whole_chunks() {
        let (chunker, _, reader, expected) = base();
        let mut rendered = Vec::new();
        for chunk in chunker.whole_chunks(reader) {
            rendered.extend(chunk.unwrap());
            rendered.push(b'|');
        }
        assert_same_text(&rendered, expected);
    }

    #[test]
    fn test_all_chunks() {
        let (chunker, _, reader, expected) = base();
        let chunks: Vec<Vec<u8>> = chunker.all_chunks(reader).unwrap();
        let mut rendered = Vec::new();
        for chunk in chunks {
            rendered.extend(chunk);
            rendered.push(b'|');
        }
        assert_same_text(&rendered, expected);
    }

    #[test]
    fn test_stream() {
        let (chunker, _, reader, expected) = base();
        let mut rendered = Vec::new();
        let mut events = chunker.stream(reader);
        while let Some(event) = events.read() {
            match event.unwrap() {
                ChunkInput::Data(bytes) => rendered.extend_from_slice(bytes),
                ChunkInput::End => rendered.push(b'|'),
            }
        }
        assert_same_text(&rendered, expected);
    }

    #[test]
    fn test_slices() {
        let (chunker, data, _, expected) = base();
        let mut rendered = Vec::new();
        for slice in chunker.slices(data) {
            rendered.extend_from_slice(slice);
            rendered.push(b'|');
        }
        assert_same_text(&rendered, expected);
    }

    #[test]
    fn test_chunks() {
        let (chunker, _, reader, _) = base();
        let spans: Vec<(usize, usize)> = chunker.chunks(reader)
            .map(|info| {
                let info = info.unwrap();
                (info.start(), info.length())
            })
            .collect();
        assert_eq!(spans,
                   vec![(0, 3), (3, 5), (8, 4), (12, 2),
                        (14, 6), (20, 6), (26, 7)]);
    }

    #[test]
    fn test_max_size() {
        let (chunker, _, reader, _) = base();
        let spans: Vec<(usize, usize)> = chunker.max_size(5).chunks(reader)
            .map(|info| {
                let info = info.unwrap();
                (info.start(), info.length())
            })
            .collect();
        assert_eq!(spans,
                   vec![(0, 3), (3, 5), (8, 4), (12, 2),
                        (14, 5), (19, 5), (24, 3), (27, 5), (32, 1)]);
    }

    /// Endless reader that fills every buffer completely with random bytes.
    struct RngFile<R: Rng>(R);

    impl<R: Rng> Read for RngFile<R> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.0.fill_bytes(buf);
            Ok(buf.len())
        }
    }

    /// Smoke test: pulls 4096 chunks out of random data. Nothing is asserted;
    /// the test only fails if chunking panics.
    #[test]
    fn test_random() {
        let chunker = Chunker::new(ZPAQ::new(8));
        let random = RngFile(rand::thread_rng());
        for _chunk in chunker.whole_chunks(random).take(4096) {}
    }
}