use std::collections::VecDeque;
use std::convert::TryInto;
use std::io::IoSlice;
use super::super::frame::{DecodeError, Frame, Payload};
use bytes::{Buf, BufMut, Bytes, BytesMut};
pub(crate) struct BufList<T> {
bufs: VecDeque<T>,
}
impl<T: Buf> BufList<T> {
pub(crate) fn new() -> BufList<T> {
BufList { bufs: VecDeque::new() }
}
#[inline]
pub(crate) fn push(&mut self, buf: T) {
debug_assert!(buf.has_remaining());
self.bufs.push_back(buf);
}
#[inline]
pub(crate) fn len(&self) -> usize {
self.bufs.len()
}
}
impl<T: Buf> TryInto<Payload> for BufList<T> {
type Error = DecodeError;
fn try_into(self) -> Result<Payload, Self::Error> {
let mut meta = BytesMut::new();
let mut data = BytesMut::new();
self.bufs.into_iter().try_for_each(|mut v| {
let (m, d) = match Frame::decode(&mut v)? {
Frame::RequestResponse(v) => v.payload().clone().split(),
Frame::RequestFnf(v) => v.payload().clone().split(),
Frame::RequestStream(v) => v.payload().clone().split(),
Frame::RequestChannel(v) => v.payload().clone().split(),
Frame::Payload(v) => v.payload().clone().split(),
_ => (None, None),
};
if let Some(b) = m {
meta.put(b);
}
if let Some(b) = d {
data.put(b);
}
Ok(())
})?;
let meta = if meta.is_empty() { None } else { Some(meta.freeze()) };
let data = if data.is_empty() { None } else { Some(data.freeze()) };
Ok(Payload::new(meta, data))
}
}
impl<T: Buf> Buf for BufList<T> {
#[inline]
fn remaining(&self) -> usize {
self.bufs.iter().map(|buf| buf.remaining()).sum()
}
#[inline]
fn chunk(&self) -> &[u8] {
self.bufs.front().map(Buf::chunk).unwrap_or_default()
}
#[inline]
fn advance(&mut self, mut cnt: usize) {
while cnt > 0 {
let front = &mut self.bufs[0];
let rem = front.remaining();
if rem > cnt {
front.advance(cnt);
return;
} else {
front.advance(rem);
cnt -= rem;
self.bufs.pop_front();
}
}
}
#[inline]
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
if dst.is_empty() {
return 0;
}
let mut vecs = 0;
for buf in &self.bufs {
vecs += buf.chunks_vectored(&mut dst[vecs..]);
if vecs == dst.len() {
break;
}
}
vecs
}
#[inline]
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
match self.bufs.front_mut() {
Some(front) if front.remaining() == len => {
let bytes = front.copy_to_bytes(len);
self.bufs.pop_front();
bytes
}
Some(front) if front.remaining() > len => front.copy_to_bytes(len),
_ => {
assert!(
len <= self.remaining(),
"`len` greater than remaining"
);
let mut bm = BytesMut::with_capacity(len);
bm.put(self.take(len));
bm.freeze()
}
}
}
}
#[cfg(test)]
mod tests {
use std::ptr;
use super::super::super::frame::{codec::RequestFnfFrame, Encode};
use super::*;
fn hello_world_buf() -> BufList<Bytes> {
BufList {
bufs: vec![
Bytes::from("Hello"),
Bytes::from(" "),
Bytes::from("World"),
]
.into(),
}
}
#[test]
fn to_bytes_shorter() {
let mut bufs = hello_world_buf();
let old_ptr = bufs.chunk().as_ptr();
let start = bufs.copy_to_bytes(4);
assert_eq!(start, "Hell");
assert!(ptr::eq(old_ptr, start.as_ptr()));
assert_eq!(bufs.chunk(), b"o");
assert!(ptr::eq(old_ptr.wrapping_add(4), bufs.chunk().as_ptr()));
assert_eq!(bufs.remaining(), 7);
}
#[test]
fn to_bytes_eq() {
let mut bufs = hello_world_buf();
let old_ptr = bufs.chunk().as_ptr();
let start = bufs.copy_to_bytes(5);
assert_eq!(start, "Hello");
assert!(ptr::eq(old_ptr, start.as_ptr()));
assert_eq!(bufs.chunk(), b" ");
assert_eq!(bufs.remaining(), 6);
}
#[test]
fn to_bytes_longer() {
let mut bufs = hello_world_buf();
let start = bufs.copy_to_bytes(7);
assert_eq!(start, "Hello W");
assert_eq!(bufs.remaining(), 4);
}
#[test]
fn one_long_buf_to_bytes() {
let mut buf = BufList::new();
buf.push(b"Hello World" as &[_]);
assert_eq!(buf.copy_to_bytes(5), "Hello");
assert_eq!(buf.chunk(), b" World");
}
#[test]
#[should_panic(expected = "`len` greater than remaining")]
fn buf_to_bytes_too_many() {
hello_world_buf().copy_to_bytes(42);
}
#[test]
fn try_into_payload() {
let f1 = RequestFnfFrame::new(
1,
true,
Payload::new(Some(Bytes::from("metadata 1 ")), None),
)
.to_bytes();
let f2 = RequestFnfFrame::new(
1,
true,
Payload::new(
Some(Bytes::from("metadata 2 ")),
Some(Bytes::from("data 1 ")),
),
)
.to_bytes();
let f3 = RequestFnfFrame::new(
1,
false,
Payload::new(None, Some(Bytes::from("data 2 "))),
)
.to_bytes();
let mut list = BufList::new();
list.push(f1);
list.push(f2);
list.push(f3);
let payload: Payload = list.try_into().unwrap();
let (meta, data) = payload.split();
assert_eq!(meta, Some(Bytes::from("metadata 1 metadata 2 ")));
assert_eq!(data, Some(Bytes::from("data 1 data 2 ")));
}
}