use std::fmt;
use std::io;
use std::hash;
use std::mem;
use std::cmp;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
use io::Io;
/// Default buffer size in bytes (8 KiB).
///
/// Used as the capacity requested by `EasyBuf::new`, as the minimum capacity
/// of the vector allocated when `EasyBuf::get_mut` has to copy shared data,
/// and as both the initial capacity and the backpressure boundary of the
/// write buffer held by `Framed`.
const INITIAL_CAPACITY: usize = 8 * 1024;
/// A cheaply cloneable window onto a shared byte vector.
///
/// The visible bytes are `buf[start..end]`. Cloning copies the `Arc` handle
/// and the two offsets, not the bytes themselves.
#[derive(Clone, Eq)]
pub struct EasyBuf {
// Backing storage; may be shared with other `EasyBuf` values.
buf: Arc<Vec<u8>>,
// Index of the first visible byte in `buf`.
start: usize,
// Index one past the last visible byte in `buf`.
end: usize,
}
/// Mutable handle to the vector backing an `EasyBuf`, as returned by
/// `EasyBuf::get_mut`.
///
/// Dereferences to `Vec<u8>`. When the handle is dropped, the vector's length
/// is written into the `end` offset borrowed from the owning `EasyBuf`.
pub struct EasyBufMut<'a> {
// The uniquely borrowed backing vector.
buf: &'a mut Vec<u8>,
// The owning buffer's `end` offset, updated on drop.
end: &'a mut usize,
}
impl EasyBuf {
    /// Creates an empty buffer whose backing vector is requested with
    /// `INITIAL_CAPACITY` bytes of capacity.
    pub fn new() -> EasyBuf {
        EasyBuf::with_capacity(INITIAL_CAPACITY)
    }

    /// Creates an empty buffer whose backing vector is created with
    /// `Vec::with_capacity(cap)`.
    pub fn with_capacity(cap: usize) -> EasyBuf {
        EasyBuf {
            buf: Arc::new(Vec::with_capacity(cap)),
            start: 0,
            end: 0,
        }
    }

    /// Moves the start of the visible window to the absolute index `start`.
    ///
    /// # Panics
    ///
    /// Panics if `start` lies past the backing vector or past the current end.
    fn set_start(&mut self, start: usize) -> &mut EasyBuf {
        assert!(start <= self.buf.len());
        assert!(start <= self.end);
        self.start = start;
        self
    }

    /// Moves the end of the visible window to the absolute index `end`.
    ///
    /// # Panics
    ///
    /// Panics if `end` lies past the backing vector or before the current
    /// start.
    fn set_end(&mut self, end: usize) -> &mut EasyBuf {
        assert!(end <= self.buf.len());
        assert!(self.start <= end);
        self.end = end;
        self
    }

    /// Returns the number of visible bytes.
    pub fn len(&self) -> usize {
        self.end - self.start
    }

    /// Returns the visible bytes as a slice.
    pub fn as_slice(&self) -> &[u8] {
        self.as_ref()
    }

    /// Splits the buffer in two at the relative index `at`.
    ///
    /// Afterwards `self` contains the bytes `[0, at)` and the returned buffer
    /// contains the bytes `[at, len)`. Both share the same backing vector; no
    /// bytes are copied.
    ///
    /// # Panics
    ///
    /// Panics if `at > self.len()`.
    pub fn split_off(&mut self, at: usize) -> EasyBuf {
        // Checking up front means `start + at` can never overflow and `self`
        // is never touched when the index is invalid.
        assert!(at <= self.len(), "split_off index out of bounds");
        let idx = self.start + at;
        let other = EasyBuf {
            buf: self.buf.clone(),
            start: idx,
            end: self.end,
        };
        self.set_end(idx);
        other
    }

    /// Splits the buffer in two at the relative index `at`.
    ///
    /// Afterwards `self` contains the bytes `[at, len)` and the returned
    /// buffer contains the bytes `[0, at)`. Both share the same backing
    /// vector; no bytes are copied.
    ///
    /// # Panics
    ///
    /// Panics if `at > self.len()`.
    pub fn drain_to(&mut self, at: usize) -> EasyBuf {
        assert!(at <= self.len(), "drain_to index out of bounds");
        let idx = self.start + at;
        let other = EasyBuf {
            buf: self.buf.clone(),
            start: self.start,
            end: idx,
        };
        self.set_start(idx);
        other
    }

    /// Returns a mutable handle to a vector holding exactly the visible
    /// bytes.
    ///
    /// If this buffer is the only owner of its backing vector, the vector is
    /// trimmed in place and reused. Otherwise the visible bytes are copied
    /// into a fresh vector with at least `INITIAL_CAPACITY` bytes of
    /// capacity, leaving other owners of the old vector untouched.
    ///
    /// The window offsets are brought in line with the trimmed vector before
    /// the handle is returned, so the buffer stays consistent even if the
    /// handle is leaked instead of dropped.
    pub fn get_mut(&mut self) -> EasyBufMut {
        if Arc::get_mut(&mut self.buf).is_none() {
            // Shared storage: copy our window so that other owners never
            // observe the caller's mutations.
            let mut copy = Vec::with_capacity(cmp::max(INITIAL_CAPACITY, self.len()));
            copy.extend_from_slice(self.as_slice());
            self.end = copy.len();
            self.start = 0;
            self.buf = Arc::new(copy);
        }

        // Either the Arc was already unique or it was replaced just above.
        let buf = Arc::get_mut(&mut self.buf)
            .expect("EasyBuf backing storage must be uniquely owned here");

        // Drop everything outside the window (a no-op after a fresh copy).
        buf.truncate(self.end);
        buf.drain(..self.start);
        self.start = 0;
        self.end = buf.len();

        EasyBufMut {
            buf: buf,
            end: &mut self.end,
        }
    }
}
/// Exposes the visible window of the buffer as a byte slice.
impl AsRef<[u8]> for EasyBuf {
    fn as_ref(&self) -> &[u8] {
        // View the whole backing vector first, then narrow it to the window.
        let storage: &[u8] = &self.buf;
        &storage[self.start..self.end]
    }
}
/// Gives read access to the borrowed backing vector.
impl<'a> Deref for EasyBufMut<'a> {
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8> {
        &*self.buf
    }
}
/// Gives write access to the borrowed backing vector.
impl<'a> DerefMut for EasyBufMut<'a> {
    fn deref_mut(&mut self) -> &mut Vec<u8> {
        &mut *self.buf
    }
}
/// Wraps an existing vector without copying it; the whole vector is visible.
impl From<Vec<u8>> for EasyBuf {
    fn from(vec: Vec<u8>) -> EasyBuf {
        // The length has to be read before the vector moves into the `Arc`.
        let visible = vec.len();
        let shared = Arc::new(vec);
        EasyBuf { buf: shared, start: 0, end: visible }
    }
}
/// Compares the visible bytes against anything that can be viewed as a byte
/// slice, including another `EasyBuf`.
impl<T: AsRef<[u8]>> PartialEq<T> for EasyBuf {
    fn eq(&self, other: &T) -> bool {
        let rhs: &[u8] = other.as_ref();
        self.as_slice() == rhs
    }
}
/// Orders buffers by their visible bytes, using the ordering of byte slices.
impl Ord for EasyBuf {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        let (lhs, rhs) = (self.as_slice(), other.as_slice());
        Ord::cmp(lhs, rhs)
    }
}
impl<T: AsRef<[u8]>> PartialOrd<T> for EasyBuf {
fn partial_cmp(&self, other: &T) -> Option<cmp::Ordering> {
self.as_slice().partial_cmp(other.as_ref())
}
}
/// Hashes the visible bytes exactly as the equivalent byte slice would be
/// hashed.
impl hash::Hash for EasyBuf {
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        let visible = self.as_slice();
        hash::Hash::hash(visible, state)
    }
}
/// Writes the vector's final length back into the owning buffer's `end`
/// offset when the handle goes away.
impl<'a> Drop for EasyBufMut<'a> {
    fn drop(&mut self) {
        let final_len = self.buf.len();
        *self.end = final_len;
    }
}
/// Formats the buffer as `EasyBuf{len=<visible>/<backing> [..]}`.
///
/// Windows shorter than ten bytes are printed in full; longer ones show only
/// their first and last four bytes.
impl fmt::Debug for EasyBuf {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let visible: &[u8] = self.as_ref();
        let (shown, total) = (self.len(), self.buf.len());

        if shown < 10 {
            return write!(formatter, "EasyBuf{{len={}/{} {:?}}}", shown, total, visible);
        }

        // Long window: abbreviate to the leading and trailing four bytes.
        let head = &visible[..4];
        let tail = &visible[shown - 4..];
        write!(formatter, "EasyBuf{{len={}/{} [{}, {}, {}, {}, ..., {}, {}, {}, {}]}}", shown, total, head[0], head[1], head[2], head[3], tail[0], tail[1], tail[2], tail[3])
    }
}
impl Into<Vec<u8>> for EasyBuf {
fn into(mut self) -> Vec<u8> {
mem::replace(self.get_mut().buf, vec![])
}
}
/// Encoding and decoding of frames, as used by `Framed`.
pub trait Codec {
    /// The type of frames produced by decoding.
    type In;

    /// The type of messages accepted for encoding.
    type Out;

    /// Attempts to decode one frame from `buf`.
    ///
    /// `Ok(Some(frame))` yields a frame and `Ok(None)` reports that no frame
    /// could be produced from the bytes currently in `buf`; `Framed` then
    /// tries to read more data before calling `decode` again.
    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>>;

    /// Decodes a frame once no more data will arrive.
    ///
    /// The default implementation delegates to `decode` and converts
    /// `Ok(None)` into an error of kind `Other`.
    fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result<Self::In> {
        self.decode(buf).and_then(|decoded| {
            decoded.ok_or_else(|| {
                io::Error::new(io::ErrorKind::Other, "bytes remaining on stream")
            })
        })
    }

    /// Encodes `msg` into `buf`.
    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>;
}
/// Pairs an I/O object with a `Codec`, buffering incoming bytes to be decoded
/// and encoded bytes waiting to be written.
///
/// Created by `framed`.
#[must_use = "streams do nothing unless polled"]
pub struct Framed<T, C> {
// The wrapped I/O object.
upstream: T,
// Decoder/encoder applied to the buffered bytes.
codec: C,
// Set once a read from `upstream` completed with `Ok`; no further reads are
// attempted afterwards.
eof: bool,
// True when `rd` should be offered to the codec before reading again.
is_readable: bool,
// Bytes read from `upstream` that have not been decoded yet.
rd: EasyBuf,
// Encoded bytes that have not been written to `upstream` yet.
wr: Vec<u8>,
}
/// Yields frames decoded from the bytes read from `upstream`.
impl<T: Io, C: Codec> Stream for Framed<T, C> {
type Item = C::In;
type Error = io::Error;
/// Alternates between decoding from the read buffer and refilling it from
/// `upstream` until a frame, end of stream, an error, or "not ready" can be
/// reported.
fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
loop {
// Decode phase: only entered when the buffer may hold undecoded data.
if self.is_readable {
if self.eof {
// No more input will arrive: an empty buffer ends the stream, anything
// left over is handed to `decode_eof`.
if self.rd.len() == 0 {
return Ok(None.into())
} else {
let frame = try!(self.codec.decode_eof(&mut self.rd));
return Ok(Async::Ready(Some(frame)))
}
}
trace!("attempting to decode a frame");
if let Some(frame) = try!(self.codec.decode(&mut self.rd)) {
trace!("frame decoded from buffer");
return Ok(Async::Ready(Some(frame)));
}
// The codec could not produce a frame; read more before decoding again.
self.is_readable = false;
}
// Read phase. With `eof` set, every path through the decode phase returns,
// so reaching this point after EOF would be a logic error.
assert!(!self.eof);
let before = self.rd.len();
// The temporary `EasyBufMut` is dropped at the end of this statement, which
// updates the read buffer's `end`, so `rd.len()` below reflects the bytes
// that were appended.
let ret = self.upstream.read_to_end(&mut self.rd.get_mut());
match ret {
// A successful `read_to_end` is treated as end of input.
Ok(_n) => self.eof = true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// Nothing new arrived: report not ready. Otherwise fall through and try to
// decode what was appended before the read would have blocked.
if self.rd.len() == before {
return Ok(Async::NotReady)
}
}
Err(e) => return Err(e),
}
self.is_readable = true;
}
}
}
/// Encodes outgoing messages into the write buffer and flushes that buffer
/// to `upstream`.
impl<T: Io, C: Codec> Sink for Framed<T, C> {
    type SinkItem = C::Out;
    type SinkError = io::Error;

    /// Encodes `item` into the write buffer.
    ///
    /// If the buffer already holds more than `INITIAL_CAPACITY` bytes, a
    /// flush is attempted first; if the buffer is still over that limit
    /// afterwards, the item is handed back as `AsyncSink::NotReady`.
    fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
        const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;

        if self.wr.len() > BACKPRESSURE_BOUNDARY {
            try!(self.poll_complete());
        }
        // Only true if the flush attempt above ran and did not drain enough.
        if self.wr.len() > BACKPRESSURE_BOUNDARY {
            return Ok(AsyncSink::NotReady(item));
        }

        self.codec
            .encode(item, &mut self.wr)
            .map(|()| AsyncSink::Ready)
    }

    /// Writes the buffered bytes to `upstream` and then flushes it.
    ///
    /// A write that accepts zero bytes is reported as a `WriteZero` error.
    fn poll_complete(&mut self) -> Poll<(), io::Error> {
        trace!("flushing framed transport");

        loop {
            if self.wr.is_empty() {
                break;
            }
            trace!("writing; remaining={}", self.wr.len());

            let written = try_nb!(self.upstream.write(&self.wr));
            match written {
                0 => {
                    return Err(io::Error::new(io::ErrorKind::WriteZero,
                                              "failed to write frame to transport"));
                }
                count => {
                    // Discard the prefix that the transport accepted.
                    self.wr.drain(..count);
                }
            }
        }

        try_nb!(self.upstream.flush());
        trace!("framed transport flushed");
        Ok(Async::Ready(()))
    }

    /// Flushes all buffered bytes; completes once `poll_complete` is ready.
    fn close(&mut self) -> Poll<(), io::Error> {
        try_ready!(self.poll_complete());
        Ok(Async::Ready(()))
    }
}
/// Wraps `io` and `codec` in a `Framed` with empty read and write buffers.
///
/// The write buffer is requested with `INITIAL_CAPACITY` bytes of capacity
/// and the read buffer is created with `EasyBuf::new`.
pub fn framed<T, C>(io: T, codec: C) -> Framed<T, C> {
    let read_buf = EasyBuf::new();
    let write_buf = Vec::with_capacity(INITIAL_CAPACITY);

    Framed {
        upstream: io,
        codec: codec,
        // Nothing has been read yet, so there is neither EOF nor data to decode.
        eof: false,
        is_readable: false,
        rd: read_buf,
        wr: write_buf,
    }
}
impl<T, C> Framed<T, C> {
pub fn get_ref(&self) -> &T {
&self.upstream
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.upstream
}
pub fn into_inner(self) -> T {
self.upstream
}
}
// Unit tests for `EasyBuf`: Debug formatting, `get_mut`, conversion into
// `Vec<u8>`, and equality.
#[cfg(test)]
mod tests {
use super::{INITIAL_CAPACITY, EasyBuf};
use std::mem;
// An empty buffer prints zero lengths and an empty list.
#[test]
fn debug_empty_easybuf() {
let buf: EasyBuf = vec![].into();
assert_eq!("EasyBuf{len=0/0 []}", format!("{:?}", buf));
}
// Fewer than ten visible bytes are printed in full.
#[test]
fn debug_small_easybuf() {
let buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into();
assert_eq!("EasyBuf{len=6/6 [1, 2, 3, 4, 5, 6]}", format!("{:?}", buf));
}
// After a split both halves report their own length next to the length of
// the shared backing vector.
#[test]
fn debug_small_easybuf_split() {
let mut buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into();
let split = buf.split_off(4);
assert_eq!("EasyBuf{len=4/6 [1, 2, 3, 4]}", format!("{:?}", buf));
assert_eq!("EasyBuf{len=2/6 [5, 6]}", format!("{:?}", split));
}
// Ten or more visible bytes are abbreviated to the first and last four.
#[test]
fn debug_large_easybuf() {
let vec: Vec<u8> = (0u8..255u8).collect();
let buf: EasyBuf = vec.into();
assert_eq!("EasyBuf{len=255/255 [0, 1, 2, 3, ..., 251, 252, 253, 254]}", format!("{:?}", buf));
}
// `get_mut` exposes only the visible window. The buffers returned by
// `split_off` and `drain_to` are dropped immediately, so `buf` is the sole
// owner of the backing vector.
#[test]
fn easybuf_get_mut_sliced() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
buf.split_off(9);
buf.drain_to(3);
assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]);
}
// While `clone` is alive the backing vector is shared, so `get_mut` must
// copy into a new vector with the minimum capacity.
#[test]
fn easybuf_get_mut_sliced_allocating_at_least_initial_capacity() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
buf.split_off(9);
buf.drain_to(3);
let clone = buf.clone();
assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]);
assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY);
// Dropped explicitly here so the clone stays alive during the calls above.
mem::drop(clone);
}
// When the visible window is larger than the minimum, the copy is sized to
// the window.
#[test]
fn easybuf_get_mut_sliced_allocating_required_capacity() {
let vec: Vec<u8> = (0..INITIAL_CAPACITY * 2).map(|_|0u8).collect();
let mut buf: EasyBuf = vec.into();
buf.drain_to(INITIAL_CAPACITY / 2);
let clone = buf.clone();
assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY + INITIAL_CAPACITY / 2);
// Dropped explicitly here so the clone stays alive during the call above.
mem::drop(clone)
}
// Converting a sole owner into a Vec hands back the original allocation.
#[test]
fn easybuf_into_vec_simple() {
let vec: Vec<u8> = (0u8..10u8).collect();
let reference = vec.clone();
let buf: EasyBuf = vec.into();
let original_pointer = buf.buf.as_ref().as_ptr();
let result: Vec<u8> = buf.into();
assert_eq!(result, reference);
let new_pointer = result.as_ptr();
assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec");
}
// Same as above for a narrowed window: the allocation is reused and only the
// visible bytes remain.
#[test]
fn easybuf_into_vec_sliced() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
let original_pointer = buf.buf.as_ref().as_ptr();
buf.split_off(9);
buf.drain_to(3);
let result: Vec<u8> = buf.into();
let reference: Vec<u8> = (3u8..9u8).collect();
assert_eq!(result, reference);
let new_pointer = result.as_ptr();
assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec");
}
// With another owner (`original`) still alive the conversion must copy, and
// the other owner must keep seeing the untouched data.
#[test]
fn easybuf_into_vec_sliced_allocating() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
let original_pointer = buf.buf.as_ref().as_ptr();
let original = buf.clone();
buf.split_off(9);
buf.drain_to(3);
let result: Vec<u8> = buf.into();
let reference: Vec<u8> = (3u8..9u8).collect();
assert_eq!(result, reference);
let original_reference: EasyBuf =(0u8..10u8).collect::<Vec<u8>>().into();
assert_eq!(original.as_ref(), original_reference.as_ref());
let new_pointer = result.as_ptr();
assert_ne!(original_pointer, new_pointer, "A new vec should be allocated");
}
// Equality compares visible bytes, not the identity of the backing vector:
// two windows onto the same vector differ when their contents differ.
#[test]
fn easybuf_equality_same_underlying_vec() {
let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
assert_eq!(buf, buf);
let other = buf.drain_to(5);
assert_ne!(buf, other);
let buf: EasyBuf = (0u8..5).collect::<Vec<_>>().into();
assert_eq!(buf, other);
}
// Buffers backed by different vectors are equal whenever their visible bytes
// match.
#[test]
fn easybuf_equality_different_underlying_vec() {
let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
let mut other: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
assert_eq!(buf, other);
buf = buf.drain_to(5);
assert_ne!(buf, other);
other = other.drain_to(5);
assert_eq!(buf, other);
}
}