use std::io;
use futures_util::Stream;
use tokio::io::AsyncRead;
use super::{fixed_size::FixedSizeChunker, rolling_hash::RollingHashChunker, StreamingChunker};
use crate::{
rolling_hash::{BuzHash, RollSum},
Chunk,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FilterBits(pub u32);
impl FilterBits {
pub fn from_size(size: u32) -> Self {
Self(30 - size.leading_zeros())
}
pub fn from_bits(bits: u32) -> Self {
Self(bits)
}
pub fn mask(self) -> u32 {
!0 >> (32 - self.0)
}
pub fn chunk_target_average(self) -> u32 {
1 << (self.0 + 1)
}
pub fn bits(self) -> u32 {
self.0
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FilterConfig {
pub filter_bits: FilterBits,
pub min_chunk_size: usize,
pub max_chunk_size: usize,
pub window_size: usize,
}
impl Default for FilterConfig {
fn default() -> FilterConfig {
FilterConfig {
filter_bits: FilterBits::from_size(64 * 1024),
min_chunk_size: 16 * 1024,
max_chunk_size: 16 * 1024 * 1024,
window_size: 64,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Config {
BuzHash(FilterConfig),
RollSum(FilterConfig),
FixedSize(usize),
}
impl Config {
pub fn new_chunker<'r, R>(
&self,
source: R,
) -> Box<dyn Stream<Item = io::Result<(u64, Chunk)>> + Unpin + Send + 'r>
where
R: AsyncRead + Unpin + Send + 'r,
{
match self {
Config::BuzHash(filter) => Box::new(StreamingChunker::new(
RollingHashChunker::new(BuzHash::new(filter.window_size), filter),
source,
)),
Config::RollSum(filter) => Box::new(StreamingChunker::new(
RollingHashChunker::new(RollSum::new(filter.window_size), filter),
source,
)),
Config::FixedSize(fixed_size) => Box::new(StreamingChunker::new(
FixedSizeChunker::new(*fixed_size),
source,
)),
}
}
}