#![warn(missing_docs)]
#![cfg_attr(feature = "nightly", feature(alloc, read_initializer, specialization))]
#![cfg_attr(all(test, feature = "nightly"), feature(io, test))]
extern crate memchr;
extern crate safemem;
use std::any::Any;
use std::cell::RefCell;
use std::io::prelude::*;
use std::io::SeekFrom;
use std::mem::ManuallyDrop;
use std::{cmp, error, fmt, io, ptr};
#[cfg(all(feature = "nightly", test))]
mod benches;
#[cfg(test)]
mod std_tests;
#[cfg(all(test, feature = "slice-deque"))]
mod ringbuf_tests;
#[cfg(feature = "nightly")]
mod nightly;
#[cfg(feature = "nightly")]
use nightly::init_buffer;
mod buffer;
use buffer::BufImpl;
pub mod policy;
use self::policy::{ReaderPolicy, WriterPolicy, StdPolicy, FlushOnNewline};
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
pub struct BufReader<R, P = StdPolicy>{
buf: Buffer,
inner: R,
policy: P,
}
impl<R> BufReader<R, StdPolicy> {
pub fn new(inner: R) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
}
pub fn with_capacity(cap: usize, inner: R) -> Self {
Self::with_buffer(Buffer::with_capacity(cap), inner)
}
#[cfg(feature = "slice-deque")]
pub fn new_ringbuf(inner: R) -> Self {
Self::with_capacity_ringbuf(DEFAULT_BUF_SIZE, inner)
}
#[cfg(feature = "slice-deque")]
pub fn with_capacity_ringbuf(cap: usize, inner: R) -> Self {
Self::with_buffer(Buffer::with_capacity_ringbuf(cap), inner)
}
pub fn with_buffer(buf: Buffer, inner: R) -> Self {
BufReader {
buf, inner, policy: StdPolicy
}
}
}
impl<R, P> BufReader<R, P> {
pub fn set_policy<P_: ReaderPolicy>(self, policy: P_) -> BufReader<R, P_> {
BufReader {
inner: self.inner,
buf: self.buf,
policy
}
}
pub fn policy_mut(&mut self) -> &mut P { &mut self.policy }
pub fn policy(&self) -> &P {
&self.policy
}
pub fn make_room(&mut self) {
self.buf.make_room();
}
pub fn reserve(&mut self, additional: usize) {
self.buf.reserve(additional);
}
pub fn buffer(&self) -> &[u8] {
self.buf.buf()
}
pub fn buf_len(&self) -> usize {
self.buf.len()
}
pub fn capacity(&self) -> usize {
self.buf.capacity()
}
pub fn get_ref(&self) -> &R { &self.inner }
pub fn get_mut(&mut self) -> &mut R { &mut self.inner }
pub fn into_inner(self) -> R {
self.inner
}
pub fn into_inner_with_buffer(self) -> (R, Buffer) {
(self.inner, self.buf)
}
pub fn unbuffer(self) -> Unbuffer<R> {
Unbuffer {
inner: self.inner,
buf: Some(self.buf),
}
}
}
impl<R, P: ReaderPolicy> BufReader<R, P> {
#[inline]
fn should_read(&mut self) -> bool {
self.policy.before_read(&mut self.buf).0
}
}
impl<R: Read, P> BufReader<R, P> {
pub fn read_into_buf(&mut self) -> io::Result<usize> {
self.buf.read_from(&mut self.inner)
}
pub fn boxed<'a>(self) -> BufReader<Box<Read + 'a>, P> where R: 'a {
let inner: Box<Read + 'a> = Box::new(self.inner);
BufReader {
inner,
buf: self.buf,
policy: self.policy,
}
}
}
impl<R: Read, P: ReaderPolicy> Read for BufReader<R, P> {
fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
if self.buf.is_empty() && out.len() >= self.buf.capacity() {
return self.inner.read(out);
}
let nread = self.fill_buf()?.read(out)?;
self.consume(nread);
Ok(nread)
}
}
impl<R: Read, P: ReaderPolicy> BufRead for BufReader<R, P> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
while self.should_read() && self.buf.usable_space() > 0 {
if self.read_into_buf()? == 0 { break; };
}
Ok(self.buffer())
}
fn consume(&mut self, mut amt: usize) {
amt = cmp::min(amt, self.buf_len());
self.buf.consume(amt);
self.policy.after_consume(&mut self.buf, amt);
}
}
impl<R: fmt::Debug, P: fmt::Debug> fmt::Debug for BufReader<R, P> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("buf_redux::BufReader")
.field("reader", &self.inner)
.field("buf_len", &self.buf_len())
.field("capacity", &self.capacity())
.field("policy", &self.policy)
.finish()
}
}
impl<R: Seek, P: ReaderPolicy> Seek for BufReader<R, P> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let result: u64;
if let SeekFrom::Current(n) = pos {
let remainder = self.buf_len() as i64;
if let Some(offset) = n.checked_sub(remainder) {
result = self.inner.seek(SeekFrom::Current(offset))?;
} else {
self.inner.seek(SeekFrom::Current(-remainder))?;
self.buf.clear(); result = self.inner.seek(SeekFrom::Current(n))?;
}
} else {
result = self.inner.seek(pos)?;
}
self.buf.clear();
Ok(result)
}
}
pub struct BufWriter<W: Write, P = StdPolicy> {
buf: Buffer,
inner: W,
policy: P,
panicked: bool,
}
impl<W: Write> BufWriter<W> {
pub fn new(inner: W) -> Self {
Self::with_buffer(Buffer::new(), inner)
}
pub fn with_capacity(cap: usize, inner: W) -> Self {
Self::with_buffer(Buffer::with_capacity(cap), inner)
}
#[cfg(feature = "slice-deque")]
pub fn new_ringbuf(inner: W) -> Self {
Self::with_buffer(Buffer::new_ringbuf(), inner)
}
#[cfg(feature = "slice-deque")]
pub fn with_capacity_ringbuf(cap: usize, inner: W) -> Self {
Self::with_buffer(Buffer::with_capacity_ringbuf(cap), inner)
}
pub fn with_buffer(buf: Buffer, inner: W) -> BufWriter<W> {
BufWriter {
buf, inner, policy: StdPolicy, panicked: false,
}
}
}
impl<W: Write, P> BufWriter<W, P> {
pub fn set_policy<P_: WriterPolicy>(self, policy: P_) -> BufWriter<W, P_> {
let panicked = self.panicked;
let (inner, buf) = self.into_inner_();
BufWriter {
inner, buf, policy, panicked
}
}
pub fn policy_mut(&mut self) -> &mut P {
&mut self.policy
}
pub fn policy(&self) -> &P {
&self.policy
}
pub fn get_ref(&self) -> &W {
&self.inner
}
pub fn get_mut(&mut self) -> &mut W {
&mut self.inner
}
pub fn capacity(&self) -> usize {
self.buf.capacity()
}
pub fn buf_len(&self) -> usize {
self.buf.len()
}
pub fn reserve(&mut self, additional: usize) {
self.buf.reserve(additional);
}
pub fn make_room(&mut self) {
self.buf.make_room();
}
pub fn into_inner_with_buffer(self) -> (W, Buffer) {
self.into_inner_()
}
fn into_inner_(self) -> (W, Buffer) {
let s = ManuallyDrop::new(self);
unsafe {
let inner = ptr::read(&s.inner);
let buf = ptr::read(&s.buf);
(inner, buf)
}
}
fn flush_buf(&mut self, amt: usize) -> io::Result<()> {
if amt == 0 || amt > self.buf.len() { return Ok(()) }
self.panicked = true;
let ret = self.buf.write_max(amt, &mut self.inner);
self.panicked = false;
ret
}
}
impl<W: Write, P: WriterPolicy> BufWriter<W, P> {
pub fn into_inner(mut self) -> Result<W, IntoInnerError<Self>> {
match self.flush() {
Err(e) => Err(IntoInnerError(self, e)),
Ok(()) => Ok(self.into_inner_().0),
}
}
pub fn into_inner_with_err(mut self) -> (W, Option<io::Error>) {
let err = self.flush().err();
(self.into_inner_().0, err)
}
}
impl<W: Write, P: WriterPolicy> Write for BufWriter<W, P> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let flush_amt = self.policy.before_write(&mut self.buf, buf.len()).0;
self.flush_buf(flush_amt)?;
let written = if self.buf.is_empty() && buf.len() >= self.buf.capacity() {
self.panicked = true;
let result = self.inner.write(buf);
self.panicked = false;
result?
} else {
self.buf.copy_from_slice(buf)
};
let flush_amt = self.policy.after_write(&self.buf).0;
let _ = self.flush_buf(flush_amt);
Ok(written)
}
fn flush(&mut self) -> io::Result<()> {
let flush_amt = self.buf.len();
self.flush_buf(flush_amt)?;
self.inner.flush()
}
}
impl<W: Write + Seek, P: WriterPolicy> Seek for BufWriter<W, P> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.flush().and_then(|_| self.get_mut().seek(pos))
}
}
impl<W: Write + fmt::Debug, P: fmt::Debug> fmt::Debug for BufWriter<W, P> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("buf_redux::BufWriter")
.field("writer", &self.inner)
.field("capacity", &self.capacity())
.field("policy", &self.policy)
.finish()
}
}
impl<W: Write, P> Drop for BufWriter<W, P> {
fn drop(&mut self) {
if !self.panicked {
let buf_len = self.buf.len();
if let Err(err) = self.flush_buf(buf_len) {
DROP_ERR_HANDLER.with(|deh| {
(*deh.borrow())(&mut self.inner, &mut self.buf, err)
});
}
}
}
}
pub struct LineWriter<W: Write>(BufWriter<W, FlushOnNewline>);
impl<W: Write> LineWriter<W> {
pub fn new(inner: W) -> Self {
Self::with_buffer(Buffer::new(), inner)
}
pub fn with_capacity(cap: usize, inner: W) -> Self {
Self::with_buffer(Buffer::with_capacity(cap), inner)
}
#[cfg(feature = "slice-deque")]
pub fn new_ringbuf(inner: W) -> Self {
Self::with_buffer(Buffer::new_ringbuf(), inner)
}
#[cfg(feature = "slice-deque")]
pub fn with_capacity_ringbuf(cap: usize, inner: W) -> Self {
Self::with_buffer(Buffer::with_capacity_ringbuf(cap), inner)
}
pub fn with_buffer(buf: Buffer, inner: W) -> LineWriter<W> {
LineWriter(BufWriter::with_buffer(buf, inner).set_policy(FlushOnNewline))
}
pub fn get_ref(&self) -> &W {
self.0.get_ref()
}
pub fn get_mut(&mut self) -> &mut W {
self.0.get_mut()
}
pub fn capacity(&self) -> usize {
self.0.capacity()
}
pub fn buf_len(&self) -> usize {
self.0.buf_len()
}
pub fn reserve(&mut self, additional: usize) {
self.0.reserve(additional);
}
pub fn into_inner(self) -> Result<W, IntoInnerError<Self>> {
self.0.into_inner()
.map_err(|IntoInnerError(inner, e)| IntoInnerError(LineWriter(inner), e))
}
pub fn into_inner_with_err(self) -> (W, Option<io::Error>) {
self.0.into_inner_with_err()
}
pub fn into_inner_with_buf(self) -> (W, Buffer){
self.0.into_inner_with_buffer()
}
}
impl<W: Write> Write for LineWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
impl<W: Write + fmt::Debug> fmt::Debug for LineWriter<W> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("buf_redux::LineWriter")
.field("writer", self.get_ref())
.field("capacity", &self.capacity())
.finish()
}
}
#[derive(Debug)]
pub struct IntoInnerError<W>(pub W, pub io::Error);
impl<W> IntoInnerError<W> {
pub fn error(&self) -> &io::Error {
&self.1
}
pub fn into_inner(self) -> W {
self.0
}
}
impl<W> Into<io::Error> for IntoInnerError<W> {
fn into(self) -> io::Error {
self.1
}
}
impl<W: Any + Send + fmt::Debug> error::Error for IntoInnerError<W> {
fn description(&self) -> &str {
error::Error::description(self.error())
}
fn cause(&self) -> Option<&error::Error> {
Some(&self.1)
}
}
impl<W> fmt::Display for IntoInnerError<W> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.error().fmt(f)
}
}
pub struct Buffer {
buf: BufImpl,
zeroed: usize,
}
impl Buffer {
pub fn new() -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE)
}
pub fn with_capacity(cap: usize) -> Self {
Buffer {
buf: BufImpl::with_capacity(cap),
zeroed: 0,
}
}
#[cfg(feature = "slice-deque")]
pub fn new_ringbuf() -> Self {
Self::with_capacity_ringbuf(DEFAULT_BUF_SIZE)
}
#[cfg(feature = "slice-deque")]
pub fn with_capacity_ringbuf(cap: usize) -> Self {
Buffer {
buf: BufImpl::with_capacity_ringbuf(cap),
zeroed: 0,
}
}
pub fn is_ringbuf(&self) -> bool {
self.buf.is_ringbuf()
}
pub fn len(&self) -> usize {
self.buf.len()
}
pub fn usable_space(&self) -> usize {
self.buf.usable_space()
}
pub fn free_space(&self) -> usize {
self.capacity() - self.len()
}
pub fn capacity(&self) -> usize {
self.buf.capacity()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn make_room(&mut self) {
self.buf.make_room();
}
pub fn reserve(&mut self, additional: usize) {
if self.buf.reserve(additional) {
self.zeroed = 0;
}
}
pub fn buf(&self) -> &[u8] { self.buf.buf() }
pub fn buf_mut(&mut self) -> &mut [u8] { self.buf.buf_mut() }
pub fn read_from<R: Read + ?Sized>(&mut self, rdr: &mut R) -> io::Result<usize> {
if self.usable_space() == 0 {
return Ok(0);
}
let cap = self.capacity();
if self.zeroed < cap {
unsafe {
let buf = self.buf.write_buf();
init_buffer(&rdr, buf);
}
self.zeroed = cap;
}
let read = {
let mut buf = unsafe { self.buf.write_buf() };
rdr.read(buf)?
};
unsafe {
self.buf.bytes_written(read);
}
Ok(read)
}
pub fn copy_from_slice(&mut self, src: &[u8]) -> usize {
let len = unsafe {
let mut buf = self.buf.write_buf();
let len = cmp::min(buf.len(), src.len());
buf[..len].copy_from_slice(&src[..len]);
len
};
unsafe {
self.buf.bytes_written(len);
}
len
}
pub fn write_to<W: Write + ?Sized>(&mut self, wrt: &mut W) -> io::Result<usize> {
if self.len() == 0 {
return Ok(0);
}
let written = wrt.write(self.buf())?;
self.consume(written);
Ok(written)
}
pub fn write_max<W: Write + ?Sized>(&mut self, mut max: usize, wrt: &mut W) -> io::Result<()> {
while self.len() > 0 && max > 0 {
let len = cmp::min(self.len(), max);
let n = match wrt.write(&self.buf()[..len]) {
Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
"Buffer::write_all() got zero-sized write")),
Ok(n) => n,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
self.consume(n);
max = max.saturating_sub(n);
}
Ok(())
}
pub fn write_all<W: Write + ?Sized>(&mut self, wrt: &mut W) -> io::Result<()> {
while self.len() > 0 {
match self.write_to(wrt) {
Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
"Buffer::write_all() got zero-sized write")),
Ok(_) => (),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),
Err(e) => return Err(e),
}
}
Ok(())
}
pub fn copy_to_slice(&mut self, out: &mut [u8]) -> usize {
let len = {
let buf = self.buf();
let len = cmp::min(buf.len(), out.len());
out[..len].copy_from_slice(&buf[..len]);
len
};
self.consume(len);
len
}
pub fn push_bytes(&mut self, bytes: &[u8]) {
let s_len = bytes.len();
if self.usable_space() < s_len {
self.reserve(s_len * 2);
}
unsafe {
self.buf.write_buf()[..s_len].copy_from_slice(bytes);
self.buf.bytes_written(s_len);
}
}
pub fn consume(&mut self, amt: usize) {
self.buf.consume(amt);
}
pub fn clear(&mut self) {
let buf_len = self.len();
self.consume(buf_len);
}
}
impl fmt::Debug for Buffer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("buf_redux::Buffer")
.field("capacity", &self.capacity())
.field("len", &self.len())
.finish()
}
}
pub struct Unbuffer<R> {
inner: R,
buf: Option<Buffer>,
}
impl<R> Unbuffer<R> {
pub fn is_buf_empty(&self) -> bool {
!self.buf.is_some()
}
pub fn buf_len(&self) -> usize {
self.buf.as_ref().map(Buffer::len).unwrap_or(0)
}
pub fn buf(&self) -> &[u8] {
self.buf.as_ref().map_or(&[], Buffer::buf)
}
pub fn into_inner(self) -> R {
self.inner
}
}
impl<R: Read> Read for Unbuffer<R> {
fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
if let Some(ref mut buf) = self.buf.as_mut() {
let read = buf.copy_to_slice(out);
if out.len() != 0 && read != 0 {
return Ok(read);
}
}
self.buf = None;
self.inner.read(out)
}
}
impl<R: fmt::Debug> fmt::Debug for Unbuffer<R> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("buf_redux::Unbuffer")
.field("reader", &self.inner)
.field("buffer", &self.buf)
.finish()
}
}
pub fn copy_buf<B: BufRead, W: Write>(b: &mut B, w: &mut W) -> io::Result<u64> {
let mut total_copied = 0;
loop {
let copied = match b.fill_buf().and_then(|buf| w.write(buf)) {
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
Ok(buf) => buf,
};
if copied == 0 { break; }
b.consume(copied);
total_copied += copied as u64;
}
Ok(total_copied)
}
thread_local!(
static DROP_ERR_HANDLER: RefCell<Box<Fn(&mut Write, &mut Buffer, io::Error)>>
= RefCell::new(Box::new(|_, _, _| ()))
);
pub fn set_drop_err_handler<F: 'static>(handler: F)
where F: Fn(&mut Write, &mut Buffer, io::Error)
{
DROP_ERR_HANDLER.with(|deh| *deh.borrow_mut() = Box::new(handler))
}
#[cfg(not(feature = "nightly"))]
fn init_buffer<R: Read + ?Sized>(_r: &R, buf: &mut [u8]) {
safemem::write_bytes(buf, 0);
}