use std::io::Error as IOError;
use std::io::Result as IOResult;
use std::io::{ErrorKind, Read, Seek};
use std::num::NonZeroUsize;
use crate::vectored_read::{read_vectored_into_buf, resolve_read_vectored, VectoredReadSelect};
#[derive(Debug)]
pub struct ChunkedReaderIter<R> {
reader: R,
reader_vectored: VectoredReadSelect,
chunk_size: NonZeroUsize,
buf_size: NonZeroUsize,
buf: Vec<u8>,
undrained_byte_count: usize,
io_error_stash: Option<IOError>,
}
impl<R> ChunkedReaderIter<R> {
pub fn new(
reader: R,
chunk_size: NonZeroUsize,
buf_size: NonZeroUsize,
reader_vectored: VectoredReadSelect,
) -> Self {
assert!(buf_size >= chunk_size);
Self {
reader,
reader_vectored,
chunk_size,
buf_size,
buf: Vec::with_capacity(buf_size.into()),
undrained_byte_count: 0,
io_error_stash: None,
}
}
#[inline]
pub fn into_inner(self) -> (Box<[u8]>, Option<IOError>, R) {
(
self.buf[self.undrained_byte_count..]
.iter()
.copied()
.collect(),
self.io_error_stash,
self.reader,
)
}
#[inline]
pub fn chunk_size(&self) -> NonZeroUsize {
self.chunk_size
}
#[inline]
pub fn buf_size(&self) -> NonZeroUsize {
self.buf_size
}
#[inline]
pub fn vectored_read_select(&self) -> VectoredReadSelect {
self.reader_vectored
}
#[inline]
pub fn buf(&self) -> &[u8] {
&self.buf[self.undrained_byte_count..]
}
}
impl<R: Seek> ChunkedReaderIter<R> {
pub fn new_with_rewind(
mut reader: R,
chunk_size: NonZeroUsize,
buf_size: NonZeroUsize,
reader_vectored: VectoredReadSelect,
) -> Self {
reader.rewind().unwrap();
Self::new(reader, chunk_size, buf_size, reader_vectored)
}
}
impl<R: Read> Iterator for ChunkedReaderIter<R> {
type Item = IOResult<Box<[u8]>>;
fn next(&mut self) -> Option<Self::Item> {
if self.io_error_stash.is_some() {
let err_obj = self.io_error_stash.take().unwrap();
return Some(Err(err_obj));
}
let mut read_offset = self.buf.len();
assert!(self.undrained_byte_count <= read_offset);
self.buf.resize(self.buf_size.into(), 0x00);
while read_offset < self.chunk_size.into() {
let reader_result = match resolve_read_vectored(&self.reader, self.reader_vectored) {
true => read_vectored_into_buf(
&mut self.reader,
&mut self.buf[read_offset..],
self.chunk_size,
),
false => self.reader.read(&mut self.buf[read_offset..]),
};
match reader_result {
Ok(0) => {
break;
}
Ok(n) => {
read_offset += n;
}
Err(e) if e.kind() == ErrorKind::Interrupted => { }
Err(e) => {
self.buf.truncate(read_offset);
if read_offset > 0 {
assert!(self.io_error_stash.is_none());
self.io_error_stash = Some(e);
let boxed_data: Box<[u8]> =
self.buf.drain(self.undrained_byte_count..).collect();
self.undrained_byte_count = 0;
return Some(Ok(boxed_data));
}
return Some(Err(e));
}
}
}
let unyielded_count = read_offset - self.undrained_byte_count;
if unyielded_count == 0 {
self.buf.clear();
self.undrained_byte_count = 0;
return None;
}
let unyielded_count = NonZeroUsize::new(unyielded_count).unwrap();
self.buf.truncate(read_offset);
if self.chunk_size > unyielded_count {
let boxed_data: Box<[u8]> = self.buf.drain(self.undrained_byte_count..).collect();
self.buf.clear();
self.undrained_byte_count = 0;
Some(Ok(boxed_data))
} else {
let ret_buf = self.buf[self.undrained_byte_count
..self.undrained_byte_count + usize::from(self.chunk_size)]
.iter()
.copied()
.collect();
self.undrained_byte_count += usize::from(self.chunk_size);
assert!(read_offset >= self.undrained_byte_count);
if read_offset - self.undrained_byte_count < self.chunk_size.into() {
self.buf.drain(..self.undrained_byte_count);
self.undrained_byte_count = 0;
}
Some(Ok(ret_buf))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use crate::dev_helpers::{FunnyRead, IceCubeRead, TruncatedRead};
#[test]
fn chunked_read_iter_funnyread() {
let funny_read = FunnyRead::default();
let mut funny_read_iter = ChunkedReaderIter::new(
funny_read,
NonZeroUsize::new(4).unwrap(),
NonZeroUsize::new(5).unwrap(),
VectoredReadSelect::Yes,
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[0, 1, 2, 3]
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[4, 5, 6, 7]
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[8, 9, 10, 11]
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[12, 13, 14, 15]
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[16, 17, 18, 19]
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[20, 21, 22, 23]
);
}
#[test]
fn chunked_read_iter_icecuberead() {
let funny_read = IceCubeRead::default();
let mut funny_read_iter = ChunkedReaderIter::new(
funny_read,
NonZeroUsize::new(2).unwrap(),
NonZeroUsize::new(5).unwrap(),
VectoredReadSelect::No,
);
assert_eq!(funny_read_iter.next().unwrap().unwrap().as_ref(), &[9, 99]);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[0x99, 9]
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
&[99, 0x99]
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap_err().kind(),
ErrorKind::Other
);
assert!(funny_read_iter.next().is_none());
assert_eq!(funny_read_iter.next().unwrap().unwrap().as_ref(), &[9, 99]);
}
#[test]
fn chunked_read_iter_truncatedread() {
let funny_read = TruncatedRead::default();
let mut funny_read_iter = ChunkedReaderIter::new(
funny_read,
NonZeroUsize::new(3).unwrap(),
NonZeroUsize::new(3).unwrap(),
VectoredReadSelect::No,
);
assert_eq!(funny_read_iter.next().unwrap().unwrap().as_ref(), b"rei");
assert_eq!(funny_read_iter.next().unwrap().unwrap().as_ref(), b"mu");
assert_eq!(
funny_read_iter.next().unwrap().unwrap_err().kind(),
ErrorKind::Other
);
assert!(funny_read_iter.next().is_none());
assert_eq!(funny_read_iter.next().unwrap().unwrap().as_ref(), b"rei");
}
#[test]
fn chunked_read_iter_truncatedread_large() {
let funny_read = TruncatedRead::default();
let mut funny_read_iter = ChunkedReaderIter::new(
funny_read,
NonZeroUsize::new(11).unwrap(),
NonZeroUsize::new(22).unwrap(),
VectoredReadSelect::No,
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
b"reimureimu"
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap_err().kind(),
ErrorKind::Other
);
assert!(funny_read_iter.next().is_none());
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
b"reimureimu"
);
assert_eq!(
funny_read_iter.next().unwrap().unwrap_err().kind(),
ErrorKind::Other
);
assert!(funny_read_iter.next().is_none());
assert_eq!(
funny_read_iter.next().unwrap().unwrap().as_ref(),
b"reimureimu"
);
}
#[test]
fn chunked_read_iter_cursor_large() {
let data_buf = [1, 2, 3, 4, 5, 6, 7, 8, 9];
let data_cursor = Cursor::new(data_buf);
let mut data_chunk_iter = ChunkedReaderIter::new(
data_cursor,
NonZeroUsize::new(4).unwrap(),
NonZeroUsize::new(8).unwrap(),
VectoredReadSelect::No,
);
assert_eq!(
data_chunk_iter.next().unwrap().unwrap().as_ref(),
&[1, 2, 3, 4]
);
assert_eq!(
data_chunk_iter.next().unwrap().unwrap().as_ref(),
&[5, 6, 7, 8]
);
assert_eq!(data_chunk_iter.next().unwrap().unwrap().as_ref(), &[9]);
assert!(data_chunk_iter.next().is_none());
let (unyielded_data, unyielded_error, _) = data_chunk_iter.into_inner();
assert_eq!(unyielded_data.as_ref(), &[]);
assert!(unyielded_error.is_none());
}
#[test]
fn chunked_read_iter_cursor_large_into_inner() {
let data_buf = [1, 2, 3, 4, 5, 6, 7, 8, 9];
let data_cursor = Cursor::new(data_buf);
let mut data_chunk_iter = ChunkedReaderIter::new(
data_cursor,
NonZeroUsize::new(4).unwrap(),
NonZeroUsize::new(8).unwrap(),
VectoredReadSelect::No,
);
assert_eq!(
data_chunk_iter.next().unwrap().unwrap().as_ref(),
&[1, 2, 3, 4]
);
let (unyielded_data, unyielded_error, _) = data_chunk_iter.into_inner();
assert_eq!(unyielded_data.as_ref(), &[5, 6, 7, 8]);
assert!(unyielded_error.is_none());
}
#[test]
fn chunked_read_iter_cursor_while() {
let data_buf = [1, 2, 3, 4, 5, 6, 7, 8, 9];
let data_cursor = Cursor::new(data_buf);
let data_chunks: Vec<_> = ChunkedReaderIter::new(
data_cursor,
NonZeroUsize::new(4).unwrap(),
NonZeroUsize::new(8).unwrap(),
VectoredReadSelect::No,
)
.collect();
let data_chunks_as_slice: Vec<&[u8]> = data_chunks
.iter()
.map(|r| r.as_ref().unwrap().as_ref())
.collect();
let expected_data_chunks: &[&[u8]] = &[&[1, 2, 3, 4], &[5, 6, 7, 8], &[9]];
assert_eq!(data_chunks_as_slice.as_slice(), expected_data_chunks);
}
#[test]
fn chunked_read_iter_cursor_large_buf_eq_chunk() {
let data_buf = [1, 2, 3, 4, 5, 6, 7, 8, 9];
let data_cursor = Cursor::new(data_buf);
let mut data_chunk_iter = ChunkedReaderIter::new(
data_cursor,
NonZeroUsize::new(4).unwrap(),
NonZeroUsize::new(4).unwrap(),
VectoredReadSelect::No,
);
assert_eq!(
data_chunk_iter.next().unwrap().unwrap().as_ref(),
&[1, 2, 3, 4]
);
assert_eq!(
data_chunk_iter.next().unwrap().unwrap().as_ref(),
&[5, 6, 7, 8]
);
assert_eq!(data_chunk_iter.next().unwrap().unwrap().as_ref(), &[9]);
assert!(data_chunk_iter.next().is_none());
}
#[test]
fn chunked_read_iter_cursor_smol() {
let data_buf = [1, 2, 3];
let data_cursor = Cursor::new(data_buf);
let mut data_chunk_iter = ChunkedReaderIter::new(
data_cursor,
NonZeroUsize::new(4).unwrap(),
NonZeroUsize::new(4).unwrap(),
VectoredReadSelect::No,
);
assert_eq!(
data_chunk_iter.next().unwrap().unwrap().as_ref(),
&[1, 2, 3]
);
assert!(data_chunk_iter.next().is_none());
}
}