use std::io;
use std::fmt;
use std::cmp;
use std::io::{Error, ErrorKind};
use super::*;
// Flag handed to the `tracer!` macro in `Generic::data_helper`.  Presumably it
// switches the `t!` trace output on when set to `true`; the macro is defined
// elsewhere, so confirm its exact semantics there.
const TRACE: bool = false;
/// Buffering wrapper around an arbitrary `io::Read`.
///
/// Data pulled from `reader` is kept in `buffer`; `cursor` marks how much of
/// that buffer has already been consumed.  A caller-supplied `cookie` of type
/// `C` is carried alongside the reader.
pub struct Generic<T, C>
where
    T: io::Read + Send + Sync,
    C: fmt::Debug + Sync + Send,
{
    /// Bytes read from `reader`.  Only `buffer[cursor..]` is still unread;
    /// `None` until the first successful read.
    buffer: Option<Vec<u8>>,
    /// Offset into `buffer` of the next unconsumed byte.  Must be `0` while
    /// `buffer` is `None`.
    cursor: usize,
    /// The previously active buffer, kept so that its allocation can be
    /// reused the next time a fresh buffer is needed.
    unused_buffer: Option<Vec<u8>>,
    /// Chunk size hint used when sizing a new buffer for a read.
    preferred_chunk_size: usize,
    /// The wrapped reader.
    reader: T,
    /// A read error that could not be reported right away because some data
    /// was still returned; it is handed out by a later call.
    error: Option<Error>,
    /// Set once `reader` has returned zero bytes, so it is not polled again.
    eof: bool,
    /// Caller-supplied value, reachable through the `cookie_*` methods.
    cookie: C,
}
// Invokes the project's `assert_send_and_sync!` macro for `Generic<T, C>`.
// Presumably this is a compile-time check that the type is `Send` and `Sync`;
// the macro is defined elsewhere, so confirm there.  Note that the `where`
// clause passed here lists only `io::Read` and `fmt::Debug`.
assert_send_and_sync!(Generic<T, C>
where T: io::Read,
C: fmt::Debug);
impl<T, C> fmt::Display for Generic<T, C>
where
    T: io::Read + Send + Sync,
    C: fmt::Debug + Sync + Send,
{
    /// Writes the fixed type name `Generic`; no reader state is included.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Generic")
    }
}
impl<T, C> fmt::Debug for Generic<T, C>
where
    T: io::Read + Send + Sync,
    C: fmt::Debug + Sync + Send,
{
    /// Formats the reader as a `Generic` struct showing the preferred chunk
    /// size and the number of buffered bytes that have not been consumed yet.
    ///
    /// The wrapped reader and the cookie are not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Bytes sitting in the buffer past the cursor; zero without a buffer.
        let unread = match self.buffer {
            Some(ref data) => data.len() - self.cursor,
            None => 0,
        };

        let mut out = f.debug_struct("Generic");
        out.field("preferred_chunk_size", &self.preferred_chunk_size);
        out.field("buffer data", &unread);
        out.finish()
    }
}
impl<T: io::Read + Send + Sync> Generic<T, ()> {
pub fn new(reader: T, preferred_chunk_size: Option<usize>) -> Self {
Self::with_cookie(reader, preferred_chunk_size, ())
}
}
impl<T: io::Read + Send + Sync, C: fmt::Debug + Sync + Send> Generic<T, C> {
pub fn with_cookie(
reader: T, preferred_chunk_size: Option<usize>, cookie: C)
-> Self {
Generic {
buffer: None,
cursor: 0,
unused_buffer: None,
preferred_chunk_size:
if let Some(s) = preferred_chunk_size { s }
else { default_buf_size() },
reader,
error: None,
eof: false,
cookie,
}
}
/// Returns a shared reference to the wrapped reader.
pub fn reader_ref(&self) -> &T {
&self.reader
}
/// Returns a mutable reference to the wrapped reader.
///
/// Bytes that have already been pulled into the internal buffer are not
/// visible through this reference; reading from it directly bypasses them.
pub fn reader_mut(&mut self) -> &mut T {
&mut self.reader
}
/// Consumes the `Generic` and returns the wrapped reader.
///
/// Only the reader is handed back; whatever is still held in the internal
/// buffer is dropped together with `self`.
pub fn into_reader(self) -> T {
self.reader
}
/// Shared implementation behind `data`, `data_hard`, `data_consume` and
/// `data_consume_hard`.
///
/// Tries to make at least `amount` unread bytes available in the internal
/// buffer, reading from the wrapped reader as needed, and returns the
/// buffered data.
///
/// * `amount` - number of bytes the caller would like to have available.
/// * `hard` - when `true`, having fewer than `amount` bytes is an error;
///   when `false`, a short (possibly empty) result is acceptable.
/// * `and_consume` - when `true`, the cursor is advanced by
///   `min(amount, bytes buffered)` before returning.
///
/// # Returns
///
/// An empty slice if `amount` is `0` or nothing is buffered.  Otherwise the
/// slice starts at the cursor position from before any consumption and runs
/// to the end of the buffer, so it may be longer than `amount`.
///
/// # Errors
///
/// * An error stashed by an earlier call is returned first, before any
///   reading is attempted.
/// * A read error is returned right away if `hard` is set and fewer than
///   `amount` bytes are buffered, or if `hard` is not set and nothing is
///   buffered.  In every other case it is stashed and the call succeeds.
/// * `ErrorKind::UnexpectedEof` if `hard` is set and fewer than `amount`
///   bytes are available without a read error being the cause.
///
/// # Panics
///
/// Panics if the cursor lies beyond the end of the buffer, or is non-zero
/// while there is no buffer.
fn data_helper(&mut self, amount: usize, hard: bool, and_consume: bool)
    -> io::Result<&[u8]> {
    tracer!(TRACE, "Generic::data_helper");
    t!("amount: {}, hard: {}, and_consume: {} (cursor: {}, buffer: {:?})",
       amount, hard, and_consume,
       self.cursor,
       self.buffer.as_ref().map(|buffer| buffer.len()));

    // An error left over from the previous invocation is reported before
    // anything else happens.
    if let Some(e) = self.error.take() {
        t!("Returning stashed error: {}", e);
        return Err(e);
    }

    // Check the cursor invariant: within the buffer if there is one, zero
    // otherwise.
    if let Some(ref buffer) = self.buffer {
        assert!(self.cursor <= buffer.len());
    } else {
        assert_eq!(self.cursor, 0);
    }

    let amount_buffered
        = self.buffer.as_ref().map(|b| b.len() - self.cursor).unwrap_or(0);
    if amount > amount_buffered {
        // The caller wants more than is readily available: read more.
        //
        // Saturate rather than overflow.  With plain `+` / `*`, a huge
        // `amount` or chunk size panics in debug builds and wraps in
        // release builds; a wrapped capacity could produce a buffer that
        // is shorter than `amount`.
        let capacity : usize = amount.saturating_add(
            cmp::max(default_buf_size(),
                     self.preferred_chunk_size.saturating_mul(2)));

        // Reuse the spare buffer's allocation when there is one.
        let mut buffer_new = self.unused_buffer.take()
            .map(|mut v| {
                vec_resize(&mut v, capacity);
                v
            })
            .unwrap_or_else(|| vec![0u8; capacity]);

        // New data is read in behind a gap of `amount_buffered` bytes; the
        // still-unread old data is copied into that gap afterwards.
        let mut amount_read = 0;
        while amount_buffered + amount_read < amount {
            t!("Have {} bytes, need {} bytes",
               amount_buffered + amount_read, amount);

            if self.eof {
                t!("Hit EOF on the underlying reader, don't poll again.");
                break;
            }

            match self.reader.read(&mut buffer_new
                                   [amount_buffered + amount_read..]) {
                Ok(read) => {
                    t!("Read {} bytes", read);
                    if read == 0 {
                        // Remember EOF so the reader is not polled again.
                        self.eof = true;
                        break;
                    } else {
                        amount_read += read;
                        continue;
                    }
                },
                Err(ref err) if err.kind() == ErrorKind::Interrupted =>
                    continue,
                Err(err) => {
                    // Do not return yet: something may already have been
                    // read during this call.
                    self.error = Some(err);
                    break;
                },
            }
        }

        if amount_read > 0 {
            // Something was read, so switch over to the new buffer.
            if let Some(ref buffer) = self.buffer {
                // Carry over the unread tail of the old buffer.
                buffer_new[0..amount_buffered]
                    .copy_from_slice(
                        &buffer[self.cursor..self.cursor + amount_buffered]);
            }

            vec_truncate(&mut buffer_new, amount_buffered + amount_read);

            // Keep the old buffer around so its allocation can be reused.
            self.unused_buffer = self.buffer.take();
            self.buffer = Some(buffer_new);
            self.cursor = 0;
        }
    }

    // Recompute: the buffer may have been replaced above.
    let amount_buffered
        = self.buffer.as_ref().map(|b| b.len() - self.cursor).unwrap_or(0);

    if self.error.is_some() {
        t!("Encountered an error: {}", self.error.as_ref().unwrap());

        // Report the error now only if the request cannot be satisfied;
        // otherwise it stays stashed for the next call.
        if hard && amount > amount_buffered {
            t!("Not enough data to fulfill request, returning error");
            return Err(self.error.take().unwrap());
        }
        if !hard && amount_buffered == 0 {
            t!("No data data buffered, returning error");
            return Err(self.error.take().unwrap());
        }
    }

    if hard && amount_buffered < amount {
        t!("Unexpected EOF");
        Err(Error::new(ErrorKind::UnexpectedEof, "EOF"))
    } else if amount == 0 || amount_buffered == 0 {
        t!("Returning zero-length slice");
        Ok(&b""[..])
    } else {
        let buffer = self.buffer.as_ref().unwrap();

        if and_consume {
            // Never consume more than is actually buffered.
            let amount_consumed = cmp::min(amount_buffered, amount);
            self.cursor += amount_consumed;
            assert!(self.cursor <= buffer.len());
            t!("Consuming {} bytes, returning {} bytes",
               amount_consumed,
               buffer[self.cursor-amount_consumed..].len());
            // The slice starts where the cursor was before consuming.
            Ok(&buffer[self.cursor-amount_consumed..])
        } else {
            t!("Returning {} bytes",
               buffer[self.cursor..].len());
            Ok(&buffer[self.cursor..])
        }
    }
}
}
impl<T, C> io::Read for Generic<T, C>
where
    T: io::Read + Send + Sync,
    C: fmt::Debug + Sync + Send,
{
    /// Reads into `buf` by delegating to `buffered_reader_generic_read_impl`,
    /// which is defined elsewhere in the crate.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        buffered_reader_generic_read_impl(self, buf)
    }
}
impl<T, C> BufferedReader<C> for Generic<T, C>
where
    T: io::Read + Send + Sync,
    C: fmt::Debug + Sync + Send,
{
    /// Returns the buffered bytes that have not been consumed yet, without
    /// reading from the wrapped reader.  Empty if there is no buffer.
    fn buffer(&self) -> &[u8] {
        self.buffer
            .as_ref()
            .map(|data| &data[self.cursor..])
            .unwrap_or(&b""[..])
    }

    /// Soft, non-consuming request for `amount` bytes; see `data_helper`.
    fn data(&mut self, amount: usize) -> Result<&[u8], io::Error> {
        self.data_helper(amount, false, false)
    }

    /// Hard, non-consuming request for `amount` bytes; see `data_helper`.
    fn data_hard(&mut self, amount: usize) -> Result<&[u8], io::Error> {
        self.data_helper(amount, true, false)
    }

    /// Marks `amount` buffered bytes as consumed and returns the buffer
    /// contents starting at the position the cursor had before the call.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are consumed than are currently buffered; without
    /// a buffer only `amount == 0` is accepted.
    fn consume(&mut self, amount: usize) -> &[u8] {
        match self.buffer {
            Some(ref buffer) => {
                assert!(self.cursor <= buffer.len());
                assert!(amount <= buffer.len() - self.cursor,
                        "buffer contains just {} bytes, but you are trying to \
                         consume {} bytes. Did you forget to call data()?",
                        buffer.len() - self.cursor, amount);

                let start = self.cursor;
                self.cursor += amount;
                &buffer[start..]
            }
            None => {
                assert_eq!(amount, 0);
                &b""[..]
            }
        }
    }

    /// Soft, consuming request for `amount` bytes; see `data_helper`.
    fn data_consume(&mut self, amount: usize) -> Result<&[u8], io::Error> {
        self.data_helper(amount, false, true)
    }

    /// Hard, consuming request for `amount` bytes; see `data_helper`.
    fn data_consume_hard(&mut self, amount: usize) -> Result<&[u8], io::Error> {
        self.data_helper(amount, true, true)
    }

    /// Always `None`.  Presumably this is because the wrapped `T` is only
    /// known to be an `io::Read`, not another `BufferedReader`.
    fn get_mut(&mut self) -> Option<&mut dyn BufferedReader<C>> {
        None
    }

    /// Always `None`; see `get_mut`.
    fn get_ref(&self) -> Option<&dyn BufferedReader<C>> {
        None
    }

    /// Always `None`; see `get_mut`.  The boxed reader is dropped.
    fn into_inner<'b>(self: Box<Self>) -> Option<Box<dyn BufferedReader<C> + 'b>>
    where
        Self: 'b,
    {
        None
    }

    /// Installs `cookie` and returns the cookie that was stored before.
    fn cookie_set(&mut self, cookie: C) -> C {
        std::mem::replace(&mut self.cookie, cookie)
    }

    /// Returns a shared reference to the cookie.
    fn cookie_ref(&self) -> &C {
        &self.cookie
    }

    /// Returns a mutable reference to the cookie.
    fn cookie_mut(&mut self) -> &mut C {
        &mut self.cookie
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::fs::File;
    use std::path::PathBuf;

    /// Runs the shared data check against a `Generic` backed by a file on
    /// disk and against one backed by an in-memory copy of the same file.
    #[test]
    fn buffered_reader_generic_test() {
        // Reader backed by the test file on disk.
        {
            let path : PathBuf = [env!("CARGO_MANIFEST_DIR"),
                                  "src", "buffered-reader-test.txt"]
                .iter().collect();
            let mut file = File::open(&path).expect(&path.to_string_lossy());
            let mut reader = Generic::new(&mut file, None);

            buffered_reader_test_data_check(&mut reader);
        }

        // Reader backed by the same bytes embedded at compile time.
        {
            let mut bytes : &[u8] = include_bytes!("buffered-reader-test.txt");
            let mut reader = Generic::new(&mut bytes, None);

            buffered_reader_test_data_check(&mut reader);
        }
    }

    /// Walks through a large input one byte at a time and checks that every
    /// `data()` call returns non-empty data that matches both `buffer()` and
    /// the corresponding part of the input.
    #[test]
    fn buffer_test() {
        // Input: the byte values 0..=255 repeated until `size` bytes exist.
        let size = 10 * default_buf_size();
        let input: Vec<u8> =
            (0u8..=std::u8::MAX).cycle().take(size).collect();

        let mut reader = Generic::new(&input[..], None);

        // Histogram of returned lengths; the last slot collects everything
        // at or above `stats_count - 1`.
        let stats_count = 2 * default_buf_size();
        let mut stats = vec![0usize; stats_count];

        for i in 0..input.len() {
            let data = reader.data(default_buf_size() + 1).unwrap().to_vec();
            assert!(!data.is_empty());
            assert_eq!(data, reader.buffer());
            assert_eq!(data, &input[i..i+data.len()]);

            stats[cmp::min(data.len(), stats_count - 1)] += 1;

            // Advance by a single byte so consecutive requests overlap.
            reader.consume(1);
        }

        // Flip to `true` to dump the histogram when debugging.
        if false {
            let last = stats.len() - 1;
            for (len, &count) in stats.iter().enumerate() {
                if count > 0 {
                    if len == last {
                        eprint!(">=");
                    }
                    eprintln!("{}: {}", len, count);
                }
            }
        }
    }
}