//! Buffer list (`BufList`) used to reassemble frames into a single payload.
use std::collections::VecDeque;
use std::convert::TryInto;
use std::io::IoSlice;

use super::super::frame::{DecodeError, Frame, Payload};
use bytes::{Buf, BufMut, Bytes, BytesMut};

/// A specialized buf list for frame reassembly.
///
/// Buffers are kept in a queue: new buffers are appended at the back and
/// reading (via the `Buf` implementation) consumes from the front.
///
/// This is borrowed from `hyper` <https://crates.io/crates/hyper>.
pub(crate) struct BufList<T> {
    /// Queued buffers in insertion order; the front element is read first.
    bufs: VecDeque<T>,
}

impl<T: Buf> BufList<T> {
    /// Creates a list that holds no buffers yet.
    pub(crate) fn new() -> Self {
        let bufs = VecDeque::new();
        Self { bufs }
    }

    /// Appends `buf` to the back of the queue.
    ///
    /// In debug builds this asserts that `buf` still has bytes remaining.
    #[inline]
    pub(crate) fn push(&mut self, buf: T) {
        debug_assert!(buf.has_remaining());
        self.bufs.push_back(buf);
    }

    /// Returns how many buffers are queued (a buffer count, not a byte count).
    #[inline]
    pub(crate) fn len(&self) -> usize {
        VecDeque::len(&self.bufs)
    }
}

impl<T: Buf> TryInto<Payload> for BufList<T> {
    type Error = DecodeError;

    fn try_into(self) -> Result<Payload, Self::Error> {
        let mut meta = BytesMut::new();
        let mut data = BytesMut::new();

        self.bufs.into_iter().try_for_each(|mut v| {
            let (m, d) = match Frame::decode(&mut v)? {
                Frame::RequestResponse(v) => v.payload().clone().split(),
                Frame::RequestFnf(v) => v.payload().clone().split(),
                Frame::RequestStream(v) => v.payload().clone().split(),
                Frame::RequestChannel(v) => v.payload().clone().split(),
                Frame::Payload(v) => v.payload().clone().split(),
                _ => (None, None),
            };
            if let Some(b) = m {
                meta.put(b);
            }
            if let Some(b) = d {
                data.put(b);
            }
            Ok(())
        })?;

        let meta = if meta.is_empty() { None } else { Some(meta.freeze()) };
        let data = if data.is_empty() { None } else { Some(data.freeze()) };
        Ok(Payload::new(meta, data))
    }
}

/// Presents the queued buffers as one logical `Buf`, read front to back.
impl<T: Buf> Buf for BufList<T> {
    /// Total bytes remaining, summed over every queued buffer.
    #[inline]
    fn remaining(&self) -> usize {
        self.bufs.iter().map(Buf::remaining).sum()
    }

    /// The current chunk of the front buffer, or an empty slice when the
    /// list holds no buffers.
    #[inline]
    fn chunk(&self) -> &[u8] {
        match self.bufs.front() {
            Some(head) => head.chunk(),
            None => Default::default(),
        }
    }

    /// Skips `cnt` bytes, dropping each front buffer once it is fully
    /// consumed.
    ///
    /// # Panics
    ///
    /// Indexing the front buffer panics if the list runs out of buffers
    /// before `cnt` bytes have been skipped.
    #[inline]
    fn advance(&mut self, cnt: usize) {
        let mut left = cnt;
        while left > 0 {
            let head = &mut self.bufs[0];
            let available = head.remaining();
            if available > left {
                // The front buffer outlasts the request: consume part of it.
                head.advance(left);
                return;
            }
            // The front buffer is used up entirely; discard it and continue.
            head.advance(available);
            left -= available;
            self.bufs.pop_front();
        }
    }

    /// Fills `dst` with slices from the queued buffers, in order, and
    /// returns how many entries of `dst` were written.
    #[inline]
    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        let capacity = dst.len();
        if capacity == 0 {
            return 0;
        }

        let mut filled = 0;
        for buf in self.bufs.iter() {
            filled += buf.chunks_vectored(&mut dst[filled..]);
            if filled == capacity {
                break;
            }
        }
        filled
    }

    /// Takes the next `len` bytes out of the list as `Bytes`.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds `self.remaining()`.
    #[inline]
    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        // Our inner buffer may have an optimized version of copy_to_bytes, so
        // delegate to the front buffer whenever it alone can satisfy the
        // whole request.
        if let Some(head) = self.bufs.front_mut() {
            let available = head.remaining();
            if available > len {
                return head.copy_to_bytes(len);
            }
            if available == len {
                // The request drains the front buffer exactly; remove it.
                let bytes = head.copy_to_bytes(len);
                self.bufs.pop_front();
                return bytes;
            }
        }

        // The request spans several buffers (or the list is empty): gather
        // the bytes into a freshly allocated buffer.
        assert!(len <= self.remaining(), "`len` greater than remaining");
        let mut gathered = BytesMut::with_capacity(len);
        gathered.put(self.take(len));
        gathered.freeze()
    }
}

/// Unit tests for the `Buf` implementation of `BufList` and for its
/// conversion into a `Payload`.
#[cfg(test)]
mod tests {
    use std::ptr;

    use super::super::super::frame::{codec::RequestFnfFrame, Encode};
    use super::*;

    /// Builds a list of three `Bytes` buffers: "Hello", " " and "World"
    /// (11 bytes in total).
    fn hello_world_buf() -> BufList<Bytes> {
        BufList {
            bufs: vec![
                Bytes::from("Hello"),
                Bytes::from(" "),
                Bytes::from("World"),
            ]
            .into(),
        }
    }

    /// Requesting fewer bytes than the front buffer holds.
    #[test]
    fn to_bytes_shorter() {
        let mut bufs = hello_world_buf();
        // Captured before the copy so the pointers can be compared afterwards.
        let old_ptr = bufs.chunk().as_ptr();
        let start = bufs.copy_to_bytes(4);
        assert_eq!(start, "Hell");
        // The returned bytes start at the same address as the original chunk.
        assert!(ptr::eq(old_ptr, start.as_ptr()));
        assert_eq!(bufs.chunk(), b"o");
        // The front buffer was advanced in place by the four bytes taken.
        assert!(ptr::eq(old_ptr.wrapping_add(4), bufs.chunk().as_ptr()));
        assert_eq!(bufs.remaining(), 7);
    }

    /// Requesting exactly as many bytes as the front buffer holds.
    #[test]
    fn to_bytes_eq() {
        let mut bufs = hello_world_buf();
        let old_ptr = bufs.chunk().as_ptr();
        let start = bufs.copy_to_bytes(5);
        assert_eq!(start, "Hello");
        // The returned bytes start at the same address as the original chunk.
        assert!(ptr::eq(old_ptr, start.as_ptr()));
        // The drained front buffer is gone; the next chunk is the second buffer.
        assert_eq!(bufs.chunk(), b" ");
        assert_eq!(bufs.remaining(), 6);
    }

    /// Requesting more bytes than the front buffer holds gathers bytes
    /// across several buffers.
    #[test]
    fn to_bytes_longer() {
        let mut bufs = hello_world_buf();
        let start = bufs.copy_to_bytes(7);
        assert_eq!(start, "Hello W");
        assert_eq!(bufs.remaining(), 4);
    }

    /// A list holding one `&[u8]` buffer: a partial copy leaves the rest of
    /// that buffer as the current chunk.
    #[test]
    fn one_long_buf_to_bytes() {
        let mut buf = BufList::new();
        buf.push(b"Hello World" as &[_]);
        assert_eq!(buf.copy_to_bytes(5), "Hello");
        assert_eq!(buf.chunk(), b" World");
    }

    /// Requesting more bytes than the whole list holds must panic with the
    /// documented message.
    #[test]
    #[should_panic(expected = "`len` greater than remaining")]
    fn buf_to_bytes_too_many() {
        hello_world_buf().copy_to_bytes(42);
    }

    /// Three encoded frames are merged into one payload whose metadata and
    /// data are the concatenation, in push order, of each frame's parts.
    #[test]
    fn try_into_payload() {
        // NOTE(review): the first two arguments are presumably a stream id and
        // a "follows" flag; confirm against `RequestFnfFrame::new`.
        // Frame 1 carries metadata only.
        let f1 = RequestFnfFrame::new(
            1,
            true,
            Payload::new(Some(Bytes::from("metadata 1 ")), None),
        )
        .to_bytes();
        // Frame 2 carries both metadata and data.
        let f2 = RequestFnfFrame::new(
            1,
            true,
            Payload::new(
                Some(Bytes::from("metadata 2 ")),
                Some(Bytes::from("data 1 ")),
            ),
        )
        .to_bytes();
        // Frame 3 carries data only.
        let f3 = RequestFnfFrame::new(
            1,
            false,
            Payload::new(None, Some(Bytes::from("data 2 "))),
        )
        .to_bytes();

        let mut list = BufList::new();
        list.push(f1);
        list.push(f2);
        list.push(f3);

        let payload: Payload = list.try_into().unwrap();
        let (meta, data) = payload.split();
        assert_eq!(meta, Some(Bytes::from("metadata 1 metadata 2 ")));
        assert_eq!(data, Some(Bytes::from("data 1 data 2 ")));
    }
}