use super::*;
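// Each import block below applies only when exactly one of the `futures` and
// `tokio` features is enabled; each runtime supplies its own `AsyncRead` and
// `Stream` types under the same names.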
#[cfg(all(feature = "futures", not(feature = "tokio")))]
use futures::{
io::{AsyncRead, AsyncReadExt},
stream::Stream,
};
#[cfg(all(feature = "tokio", not(feature = "futures")))]
use tokio_stream::Stream;
#[cfg(all(feature = "tokio", not(feature = "futures")))]
use tokio::io::{AsyncRead, AsyncReadExt};
#[cfg(all(feature = "tokio", not(feature = "futures")))]
use async_stream::try_stream;
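
/// An async-streamable version of the FastCDC chunker implementation from
/// 2020, with streaming support.
///
/// Use `new` to construct an instance, and then `as_stream` to produce an
/// async [Stream] of the chunks. The `futures` and `tokio` runtimes are
/// supported via feature flags, one at a time.
///
/// Note that this struct allocates a [`Vec<u8>`] of `max_size` bytes to act
/// as a buffer.
///
/// For example, chunking a file and printing the chunk offsets (a sketch that
/// assumes the `tokio` feature is enabled and that this type is exported as
/// `fastcdc::v2020::AsyncStreamCDC`; the fixture path is illustrative):
///
/// ```no_run
/// use fastcdc::v2020::AsyncStreamCDC;
/// use tokio_stream::StreamExt;
///
/// async fn run() {
///     let contents = std::fs::read("test/fixtures/SekienAkashita.jpg").unwrap();
///     let mut chunker = AsyncStreamCDC::new(contents.as_ref(), 4096, 16384, 65535);
///     let stream = chunker.as_stream();
///     let chunks = stream.collect::<Vec<_>>().await;
///     for result in chunks {
///         let chunk = result.unwrap();
///         println!("offset={} length={}", chunk.offset, chunk.length);
///     }
/// }
/// ```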
pub struct AsyncStreamCDC<R> {
    /// Buffer of data from source for finding cut points.
    buffer: Vec<u8>,
    /// Maximum capacity of the buffer (always `max_size`).
    capacity: usize,
    /// Number of relevant bytes in the buffer.
    length: usize,
    /// Source from which data is read into the buffer.
    source: R,
    /// Number of bytes read from the source so far.
    processed: u64,
    /// True when the source produces no more data.
    eof: bool,
    min_size: usize,
    avg_size: usize,
    max_size: usize,
    /// Mask used while the chunk is below the average size.
    mask_s: u64,
    /// Mask used once the chunk has passed the average size.
    mask_l: u64,
    /// `mask_s << 1`, used by the two-bytes-per-round cut-point search.
    mask_s_ls: u64,
    /// `mask_l << 1`, used by the two-bytes-per-round cut-point search.
    mask_l_ls: u64,
    /// Gear hash table, derived from the seed.
    gear: Cow<'static, [u64]>,
    /// Left-shifted gear hash table, derived from the seed.
    gear_ls: Cow<'static, [u64]>,
}
impl<R: AsyncRead + Unpin> AsyncStreamCDC<R> {
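    /// Construct an `AsyncStreamCDC` that will process bytes from the given
    /// source, using chunk size normalization level 1 by default.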
pub fn new(source: R, min_size: usize, avg_size: usize, max_size: usize) -> Self {
Self::with_level(source, min_size, avg_size, max_size, Normalization::Level1)
}
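
    /// Construct an `AsyncStreamCDC` that will process bytes from the given
    /// source, using the given chunk size normalization level.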
pub fn with_level(
source: R,
min_size: usize,
avg_size: usize,
max_size: usize,
level: Normalization,
) -> Self {
Self::with_level_and_seed(source, min_size, avg_size, max_size, level, 0)
}
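
    /// Construct an `AsyncStreamCDC` with the given chunk size normalization
    /// level and a seed from which the gear hash tables are derived; `new` and
    /// `with_level` pass a seed of 0.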
pub fn with_level_and_seed(
source: R,
min_size: usize,
avg_size: usize,
max_size: usize,
level: Normalization,
seed: u64,
) -> Self {
debug_assert!(min_size >= MINIMUM_MIN);
debug_assert!(min_size <= MINIMUM_MAX);
debug_assert!(avg_size >= AVERAGE_MIN);
debug_assert!(avg_size <= AVERAGE_MAX);
debug_assert!(max_size >= MAXIMUM_MIN);
debug_assert!(max_size <= MAXIMUM_MAX);
let bits = avg_size.ilog2();
let normalization = level.bits();
let mask_s = MASKS[(bits + normalization) as usize];
let mask_l = MASKS[(bits - normalization) as usize];
let (gear, gear_ls) = get_gear_with_seed(seed);
Self {
buffer: vec![0_u8; max_size],
capacity: max_size,
length: 0,
source,
eof: false,
processed: 0,
min_size,
avg_size,
max_size,
mask_s,
mask_l,
mask_s_ls: mask_s << 1,
mask_l_ls: mask_l << 1,
gear,
gear_ls,
}
}
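
    /// Fill the buffer with data from the source, returning the number of
    /// bytes read (zero if the end of the source has been reached).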
    async fn fill_buffer(&mut self) -> Result<usize, Error> {
        if self.eof {
            Ok(0)
        } else {
            let mut all_bytes_read = 0;
            // A single read may return fewer bytes than requested, so keep
            // reading until the buffer is full or the source reports EOF.
            while !self.eof && self.length < self.capacity {
                let bytes_read = self.source.read(&mut self.buffer[self.length..]).await?;
                if bytes_read == 0 {
                    self.eof = true;
                } else {
                    self.length += bytes_read;
                    all_bytes_read += bytes_read;
                }
            }
            Ok(all_bytes_read)
        }
    }
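
    /// Remove the first `count` bytes from the buffer and return them in a new
    /// vector, shifting any remaining bytes to the front of the buffer.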
fn drain_bytes(&mut self, count: usize) -> Result<Vec<u8>, Error> {
if count > self.length {
Err(Error::Other(format!(
"drain_bytes() called with count larger than length: {} > {}",
count, self.length
)))
} else {
let mut data = Vec::with_capacity(count);
data.extend_from_slice(&self.buffer[..count]);
self.buffer.copy_within(count..self.length, 0);
self.length -= count;
Ok(data)
}
}
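
    /// Find the next chunk in the source. If the end of the source has been
    /// reached, returns `Error::Empty` as the error.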
async fn read_chunk(&mut self) -> Result<ChunkData, Error> {
self.fill_buffer().await?;
if self.length == 0 {
Err(Error::Empty)
} else {
let (hash, count) = cut_gear(
&self.buffer[..self.length],
self.min_size,
self.avg_size,
self.max_size,
self.mask_s,
self.mask_l,
self.mask_s_ls,
self.mask_l_ls,
&self.gear,
&self.gear_ls,
);
if count == 0 {
Err(Error::Empty)
} else {
let offset = self.processed;
self.processed += count as u64;
let data = self.drain_bytes(count)?;
Ok(ChunkData {
hash,
offset,
length: count,
data,
})
}
}
}
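
    /// Produce an async [Stream] of the chunks from the source, built with
    /// `async_stream::try_stream!`. The stream ends once the source is
    /// exhausted; any read error is yielded and also terminates the stream.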
#[cfg(all(feature = "tokio", not(feature = "futures")))]
pub fn as_stream(&mut self) -> impl Stream<Item = Result<ChunkData, Error>> + '_ {
        try_stream! {
            loop {
                match self.read_chunk().await {
                    Ok(chunk) => yield chunk,
                    // `Empty` signals that the source is exhausted; end cleanly.
                    Err(Error::Empty) => {
                        break;
                    }
                    // Yield any real error through the stream, which also ends it.
                    error @ Err(_) => {
                        error?;
                    }
                }
            }
        }
}
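
    /// Produce an async [Stream] of the chunks from the source, built with
    /// `futures::stream::unfold`. The stream ends once the source is
    /// exhausted; any read error is yielded as an item.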
#[cfg(all(feature = "futures", not(feature = "tokio")))]
pub fn as_stream(&mut self) -> impl Stream<Item = Result<ChunkData, Error>> + '_ {
        futures::stream::unfold(self, |this| async {
            let chunk = this.read_chunk().await;
            // `Empty` signals that the source is exhausted and ends the stream;
            // any other result, including real errors, is yielded as an item.
            if let Err(Error::Empty) = chunk {
                None
            } else {
                Some((chunk, this))
            }
        })
}
}
#[cfg(test)]
mod tests {
use super::AsyncStreamCDC;
use crate::v2020::MASKS;
#[test]
#[should_panic]
fn test_minimum_too_low() {
let array = [0u8; 1024];
AsyncStreamCDC::new(array.as_slice(), 63, 256, 1024);
}
#[test]
#[should_panic]
fn test_minimum_too_high() {
let array = [0u8; 1024];
AsyncStreamCDC::new(array.as_slice(), 67_108_867, 256, 1024);
}
#[test]
#[should_panic]
fn test_average_too_low() {
let array = [0u8; 1024];
AsyncStreamCDC::new(array.as_slice(), 64, 255, 1024);
}
#[test]
#[should_panic]
fn test_average_too_high() {
let array = [0u8; 1024];
AsyncStreamCDC::new(array.as_slice(), 64, 268_435_457, 1024);
}
#[test]
#[should_panic]
fn test_maximum_too_low() {
let array = [0u8; 1024];
AsyncStreamCDC::new(array.as_slice(), 64, 256, 1023);
}
#[test]
#[should_panic]
fn test_maximum_too_high() {
let array = [0u8; 1024];
AsyncStreamCDC::new(array.as_slice(), 64, 256, 1_073_741_825);
}
#[test]
fn test_masks() {
let source = [0u8; 1024];
let chunker = AsyncStreamCDC::new(source.as_slice(), 64, 256, 1024);
assert_eq!(chunker.mask_l, MASKS[7]);
assert_eq!(chunker.mask_s, MASKS[9]);
let chunker = AsyncStreamCDC::new(source.as_slice(), 8192, 16384, 32768);
assert_eq!(chunker.mask_l, MASKS[13]);
assert_eq!(chunker.mask_s, MASKS[15]);
let chunker = AsyncStreamCDC::new(source.as_slice(), 1_048_576, 4_194_304, 16_777_216);
assert_eq!(chunker.mask_l, MASKS[21]);
assert_eq!(chunker.mask_s, MASKS[23]);
}
struct ExpectedChunk {
hash: u64,
offset: u64,
length: usize,
digest: String,
}
#[cfg(all(feature = "futures", not(feature = "tokio")))]
use futures::stream::StreamExt;
#[cfg(all(feature = "tokio", not(feature = "futures")))]
use tokio_stream::StreamExt;
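    // Select the async test macro that matches the enabled runtime feature.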
#[cfg_attr(all(feature = "tokio", not(feature = "futures")), tokio::test)]
#[cfg_attr(all(feature = "futures", not(feature = "tokio")), futures_test::test)]
async fn test_iter_sekien_16k_chunks() {
        let contents = std::fs::read("test/fixtures/SekienAkashita.jpg")
            .expect("missing test fixture SekienAkashita.jpg");
let expected_chunks = [
ExpectedChunk {
hash: 17968276318003433923,
offset: 0,
length: 21325,
digest: "261930e84e14c240210ae8c459acc4bb85dd52f1b91c868f2106dbc1ceb3acca".into(),
},
ExpectedChunk {
hash: 8197189939299398838,
offset: 21325,
length: 17140,
digest: "a01747cf21202f0068b8897d2be92aa4479b7ac7207b3baa5057b8ec75fa1c10".into(),
},
ExpectedChunk {
hash: 13019990849178155730,
offset: 38465,
length: 28084,
digest: "01e5305fb8f54d214ed2946843ea360fb9bb3f5df66ef3e34fb024d32ebcaee1".into(),
},
ExpectedChunk {
hash: 4509236223063678303,
offset: 66549,
length: 18217,
digest: "fc28c67b6ef846a841452a215bf704058f65cba5c1d78160398d3c2e046642f9".into(),
},
ExpectedChunk {
hash: 2504464741100432583,
offset: 84766,
length: 24700,
digest: "f6996300fce24d3da56c81ea52e5f4f461ce6adb4496f65252996e1082471aac".into(),
},
];
let mut chunker = AsyncStreamCDC::new(contents.as_ref(), 4096, 16384, 65535);
let stream = chunker.as_stream();
let chunks = stream.collect::<Vec<_>>().await;
        assert_eq!(chunks.len(), 5);
        for (index, chunk) in chunks.into_iter().enumerate() {
            let chunk = chunk.unwrap();
            assert_eq!(chunk.hash, expected_chunks[index].hash);
            assert_eq!(chunk.offset, expected_chunks[index].offset);
            assert_eq!(chunk.length, expected_chunks[index].length);
            let mut hasher = blake3::Hasher::new();
            hasher
                .update(&contents[(chunk.offset as usize)..(chunk.offset as usize) + chunk.length]);
            let digest = format!("{}", hasher.finalize()).to_lowercase();
            assert_eq!(digest, expected_chunks[index].digest);
        }
}
}