use super::{AEADReader, BlockBuffer, PoolRef, Reader, Writer};
use crate::{chunks::ChunkPointer, ObjectId};
use serde::{Deserialize, Serialize};
use std::io::{self, Read, Write};
/// Default chunk size in bytes (500 KiB) used by `BufferedSink::new` and
/// `BufferedSink::with_buffer`; also the minimum buffer length `with_buffer` accepts.
const CHUNK_SIZE: usize = 500 * 1024;
/// An ordered list of [`ChunkPointer`]s that together describe one stream of data.
///
/// Instances are returned by `BufferedSink::finish` and `BufferedSink::clear`,
/// or built from a `Vec<ChunkPointer>` via `From`; the contents are read back
/// through `Stream::open_reader` / `Stream::open_with_buffer`.
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
pub struct Stream(Vec<ChunkPointer>);
/// Shorthand for a `crate::Deserializer` whose input is an `rmp_serde`
/// `ReadReader` wrapping a [`BufferedStream`] over a `PoolRef<AEADReader>`.
pub type DeserializeStream =
crate::Deserializer<rmp_serde::decode::ReadReader<BufferedStream<PoolRef<AEADReader>>>>;
impl Stream {
    /// Opens a [`BufferedStream`] over this stream's chunks, using a freshly
    /// created default [`BlockBuffer`] as scratch space.
    pub fn open_reader<R, M>(&self, reader: M) -> BufferedStream<M>
    where
        R: Reader,
        M: AsMut<R>,
    {
        self.open_with_buffer::<R, M>(reader, BlockBuffer::default())
    }

    /// Opens a [`BufferedStream`] over this stream's chunks, using the
    /// caller-supplied `buffer` as scratch space.
    pub fn open_with_buffer<R, M>(&self, reader: M, buffer: BlockBuffer) -> BufferedStream<M>
    where
        R: Reader,
        M: AsMut<R>,
    {
        // The reader consumes chunks with `Vec::pop`, so keep them reversed:
        // the first chunk of the stream ends up last in the vector.
        let mut pending = self.0.clone();
        pending.reverse();

        BufferedStream {
            reader,
            buffer,
            chunks: pending,
            pos: None,
            len: None,
        }
    }

    /// Returns `true` when the stream holds no chunk pointers.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Returns the distinct object ids referenced by the chunks of this
    /// stream. The order of the result is unspecified.
    pub fn objects(&self) -> Vec<ObjectId> {
        let unique: std::collections::HashSet<ObjectId> =
            self.0.iter().map(|ptr| *ptr.object_id()).collect();

        unique.into_iter().collect()
    }
}
impl From<Vec<ChunkPointer>> for Stream {
fn from(ptrs: Vec<ChunkPointer>) -> Self {
Self(ptrs)
}
}
/// A [`Read`] adapter that yields the contents of a [`Stream`]'s chunks back to back.
pub struct BufferedStream<Reader = AEADReader> {
/// Source used to load each chunk into `buffer`.
reader: Reader,
/// Scratch space holding the most recently loaded chunk.
buffer: BlockBuffer,
/// Chunks not yet loaded, stored in reverse so `pop()` yields them in stream order.
chunks: Vec<ChunkPointer>,
/// Read offset into the current chunk; `None` until the first chunk is loaded.
pos: Option<usize>,
/// Length of the current chunk; `None` until the first chunk is loaded.
len: Option<usize>,
}
impl<R: Reader> BufferedStream<R> {
    /// Loads the next pending chunk into the internal buffer.
    ///
    /// Returns `Ok(Some(len))` with the length of the chunk that was read,
    /// or `Ok(None)` once no chunks are left.
    ///
    /// # Errors
    ///
    /// A failure reported by the underlying reader is wrapped in an
    /// [`io::Error`] of kind [`io::ErrorKind::Other`].
    fn open_next_chunk(&mut self) -> io::Result<Option<usize>> {
        match self.chunks.pop() {
            None => Ok(None),
            Some(pointer) => {
                let data = self
                    .reader
                    .read_chunk(&pointer, self.buffer.as_mut())
                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

                Ok(Some(data.len()))
            }
        }
    }
}
impl<R: Reader> Read for BufferedStream<R> {
    /// Fills `buf` with bytes from the stream, loading further chunks as
    /// needed.
    ///
    /// Keeps going until `buf` is full or the chunk list is exhausted, so a
    /// return value smaller than `buf.len()` means the end of the stream was
    /// reached.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut filled = 0;

        while filled < buf.len() {
            // Unread range of the current chunk, if a chunk is loaded and
            // has not been fully consumed yet.
            let window = match (self.pos, self.len) {
                (Some(pos), Some(len)) if pos != len => Some((pos, len)),
                _ => None,
            };

            if let Some((pos, len)) = window {
                let count = (buf.len() - filled).min(len - pos);
                let next = pos + count;

                buf[filled..filled + count].copy_from_slice(&self.buffer[pos..next]);
                self.pos = Some(next);
                filled += count;
                continue;
            }

            // Nothing left in the current chunk: move on to the next one, or
            // stop when the stream has ended.
            match self.open_next_chunk()? {
                Some(len) => {
                    self.pos = Some(0);
                    self.len = Some(len);
                }
                None => break,
            }
        }

        Ok(filled)
    }
}
/// A [`Write`] adapter that stages written bytes in a buffer, hands them to a
/// `Writer` chunk by chunk, and records the [`ChunkPointer`] returned for each.
pub struct BufferedSink<Writer = super::AEADWriter, Buffer = BlockBuffer> {
/// Destination each staged chunk is written to.
writer: Writer,
/// Staging area for bytes not yet handed to `writer`.
buffer: Buffer,
/// Pointers returned by `writer` for the chunks written so far.
chunks: Vec<ChunkPointer>,
/// Offset in `buffer` where the next incoming bytes are copied.
pos: usize,
/// Number of bytes currently staged in `buffer`.
len: usize,
/// Configured chunk size in bytes, as reported by `chunk_size()`.
chunk_size: usize,
}
impl<W> BufferedSink<W>
where
W: Writer,
{
pub fn new(writer: W) -> BufferedSink<W> {
Self::with_chunk_size(writer, CHUNK_SIZE)
}
pub fn with_chunk_size(writer: W, chunk_size: usize) -> Self {
Self {
writer,
buffer: BlockBuffer::default(),
chunks: vec![],
pos: 0,
len: 0,
chunk_size,
}
}
}
impl<W, Buffer> BufferedSink<W, Buffer>
where
    W: Writer,
    Buffer: AsMut<[u8]>,
{
    /// Creates a sink over `writer` that stages data in the caller-supplied
    /// `buffer`, using the default chunk size (`CHUNK_SIZE`).
    ///
    /// # Errors
    ///
    /// Returns `ObjectError::BufferTooSmall` when `buffer` is shorter than
    /// `CHUNK_SIZE` bytes.
    pub fn with_buffer(writer: W, mut buffer: Buffer) -> super::Result<Self> {
        let buf_size = buffer.as_mut().len();
        if buf_size < CHUNK_SIZE {
            return Err(super::ObjectError::BufferTooSmall {
                min_size: CHUNK_SIZE,
                buf_size,
            });
        }

        Ok(Self {
            writer,
            buffer,
            chunks: vec![],
            pos: 0,
            len: 0,
            chunk_size: CHUNK_SIZE,
        })
    }

    /// Sets the chunk size to `size` bytes and returns the updated sink.
    ///
    /// # Errors
    ///
    /// Returns `ObjectError::BufferTooSmall` when the staging buffer cannot
    /// hold `size` bytes. The error carries the requested `size` as the
    /// minimum required length.
    pub fn set_chunk_size(mut self, size: usize) -> super::Result<Self> {
        let buf_size = self.buffer.as_mut().len();
        if buf_size < size {
            return Err(super::ObjectError::BufferTooSmall {
                // Report the size that was actually asked for, not the
                // crate-wide default.
                min_size: size,
                buf_size,
            });
        }

        self.chunk_size = size;
        Ok(self)
    }

    /// Returns the configured chunk size in bytes.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// Writes out any staged bytes as a final chunk, resets the sink for
    /// reuse, and returns the chunks written since the last reset as a
    /// [`Stream`].
    ///
    /// # Errors
    ///
    /// Propagates the error of the underlying writer if the staged bytes
    /// cannot be written.
    pub fn clear(&mut self) -> super::Result<Stream> {
        self.empty_buffer()?;

        self.pos = 0;
        self.len = 0;
        // Wipe the staging buffer before it is reused.
        self.buffer.as_mut().fill(0);

        // Move the pointers out instead of cloning and then clearing them.
        Ok(Stream(std::mem::take(&mut self.chunks)))
    }

    /// Writes out any staged bytes as a final chunk, flushes the underlying
    /// writer, and returns all written chunks as a [`Stream`].
    ///
    /// # Errors
    ///
    /// Propagates errors from writing the last chunk or from flushing.
    pub fn finish(mut self) -> super::Result<Stream> {
        self.empty_buffer()?;
        self.flush()?;
        Ok(Stream(self.chunks))
    }

    /// Hands the staged bytes (`buffer[..len]`) to the writer and records
    /// the returned chunk pointer. Does nothing when no bytes are staged.
    ///
    /// The cursor fields are left untouched; callers reset them.
    fn empty_buffer(&mut self) -> super::Result<()> {
        let internal = self.buffer.as_mut();
        if self.len > 0 {
            self.chunks.push(self.writer.write(&internal[0..self.len])?);
        }
        Ok(())
    }
}
impl<W, Buffer> Write for BufferedSink<W, Buffer>
where
    W: Writer,
    Buffer: AsMut<[u8]>,
{
    /// Stages `buf` in the internal buffer, writing a chunk to the
    /// underlying writer every time the configured chunk size is reached.
    ///
    /// Returns the number of bytes accepted. That is all of `buf` unless the
    /// effective chunk size is zero, in which case `Ok(0)` is returned.
    ///
    /// # Errors
    ///
    /// A failure of the underlying writer is wrapped in an [`io::Error`] of
    /// kind [`io::ErrorKind::Other`].
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Cut chunks at the configured size rather than the crate default,
        // and never beyond what the staging buffer can physically hold.
        let chunk_size = self.chunk_size.min(self.buffer.as_mut().len());

        let mut consumed = 0;
        while consumed < buf.len() {
            let room = chunk_size.saturating_sub(self.len);
            if room == 0 && self.len == 0 {
                // A zero chunk size cannot accept any data.
                break;
            }

            // `room` is zero here only when the staged data already meets or
            // exceeds the limit (e.g. the chunk size was lowered after data
            // had been staged); the copy is then a no-op and the flush below
            // makes space.
            let size = room.min(buf.len() - consumed);
            let end = self.len + size;
            self.buffer.as_mut()[self.len..end]
                .copy_from_slice(&buf[consumed..consumed + size]);
            self.len = end;
            self.pos = end;
            consumed += size;

            if self.len >= chunk_size {
                self.empty_buffer()
                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
                self.pos = 0;
                self.len = 0;
                // Wipe the staging buffer before it is reused.
                self.buffer.as_mut().fill(0);
            }
        }

        Ok(consumed)
    }

    /// Flushes the underlying writer. Bytes that are staged but do not yet
    /// form a full chunk stay in the buffer.
    fn flush(&mut self) -> io::Result<()> {
        self.writer
            .flush()
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
    }
}
#[cfg(test)]
mod tests {
    use crate::crypto::Scheme;

    /// Writes three blocks' worth of data through a `BufferedSink`, then
    /// reads it back through the resulting stream and checks the round trip.
    #[test]
    fn large_buffer_write_then_read() {
        use super::{
            super::{AEADReader, AEADWriter},
            BufferedSink,
        };
        use crate::{backends::test::InMemoryBackend, crypto::UsernamePassword};
        use std::io::{Read, Write};

        let key =
            UsernamePassword::with_credentials("asdf".to_string(), "fdsa".to_string()).unwrap();
        let backend = InMemoryBackend::shared();
        let mut sink =
            BufferedSink::new(AEADWriter::new(backend.clone(), key.chunk_key().unwrap()));

        const SIZE: usize = 3 * crate::BLOCK_SIZE;
        let buffer = vec![123u8; SIZE];
        assert_eq!(SIZE, sink.write(&buffer).unwrap());

        let chunks = sink.finish().unwrap();
        assert_eq!(25, chunks.0.len());

        let mut buffer2 = vec![0u8; SIZE];
        // `read_exact` fails loudly on a short read instead of silently
        // discarding the number of bytes that were actually read.
        chunks
            .open_reader(AEADReader::new(backend, key.chunk_key().unwrap()))
            .read_exact(&mut buffer2)
            .unwrap();
        assert_eq!(buffer, buffer2);
    }
}