linked-buffer 0.0.2

Yet another linked buffer implemention.
use std::cell::UnsafeCell;

use bytes::Buf;

use crate::block::BLOCK_CAP;
use crate::Buffer as RawBuffer;
use crate::BufferMut as RawBufferMut;

const MIN_RESERVE: usize = 4096;
const DEFAULT_VEC_SIZE: usize = 16;

pub struct Buffer {
    inner: RawBuffer,
    read_vec: UnsafeCell<Vec<libc::iovec>>,
}

pub struct BufferMut {
    inner: RawBufferMut,
    read_vec: UnsafeCell<Vec<libc::iovec>>,
    write_vec: UnsafeCell<Vec<libc::iovec>>,
}

impl Default for BufferMut {
    fn default() -> Self {
        Self {
            inner: RawBufferMut::new(),
            read_vec: UnsafeCell::new(Vec::with_capacity(DEFAULT_VEC_SIZE)),
            write_vec: UnsafeCell::new(Vec::with_capacity(DEFAULT_VEC_SIZE)),
        }
    }
}

impl BufferMut {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            inner: RawBufferMut::with_capacity(capacity),
            read_vec: UnsafeCell::new(Vec::with_capacity(DEFAULT_VEC_SIZE)),
            write_vec: UnsafeCell::new(Vec::with_capacity(DEFAULT_VEC_SIZE)),
        }
    }

    pub fn with_vec_size(read_vec_size: usize, write_vec_size: usize) -> Self {
        Self {
            inner: RawBufferMut::new(),
            read_vec: UnsafeCell::new(Vec::with_capacity(read_vec_size)),
            write_vec: UnsafeCell::new(Vec::with_capacity(write_vec_size)),
        }
    }
}

macro_rules! impl_deref {
    ($self: ty, $target: ty) => {
        impl std::ops::Deref for $self {
            type Target = $target;

            fn deref(&self) -> &Self::Target {
                &self.inner
            }
        }

        impl std::ops::DerefMut for $self {
            fn deref_mut(&mut self) -> &mut Self::Target {
                &mut self.inner
            }
        }
    };
}

impl_deref!(Buffer, RawBuffer);
impl_deref!(BufferMut, RawBufferMut);

macro_rules! impl_io_vec_buf {
    ($self: ty) => {
        unsafe impl monoio::buf::IoVecBuf for $self {
            fn read_iovec_ptr(&self) -> *const libc::iovec {
                // Cache libc::iovecs to vec.
                let vec = unsafe { &mut *self.read_vec.get() };

                let ptr = vec.as_mut_ptr();
                let cap = vec.capacity();
                let slice = unsafe { std::slice::from_raw_parts_mut(ptr as *mut _, cap) };

                let new_len = self.chunks_vectored(slice);
                unsafe { vec.set_len(new_len) };

                vec.as_ptr()
            }

            fn read_iovec_len(&self) -> usize {
                // stable_iovec_ptr must have been called before.
                // so we just get vec length will be fine.
                let vec = unsafe { &*self.read_vec.get() };
                vec.len()
            }
        }
    };
}

impl_io_vec_buf!(Buffer);
impl_io_vec_buf!(BufferMut);

unsafe impl monoio::buf::IoVecBufMut for BufferMut {
    fn write_iovec_ptr(&mut self) -> *mut libc::iovec {
        // make sure there is at least MIN_RESERVE space to write.
        self.reserve(MIN_RESERVE);

        // current node
        let mut maybe_node = self.tail;
        // current block begin index
        let mut block_begin_offset = self.write_offset;
        // move to next block if the first block is empty
        if self.write_offset == BLOCK_CAP {
            // unwrap is safe since we checked is_empty()
            maybe_node = unsafe { maybe_node.unwrap().as_ref() }.next;
            block_begin_offset = 0;
        }

        // how many IoSlice we write
        let dst = unsafe { &mut *self.write_vec.get() };
        unsafe { dst.set_len(0) };
        while let Some(mut node) = maybe_node {
            if dst.len() == dst.capacity() {
                // if no space to write, return
                break;
            }

            let ptr = unsafe {
                node.as_mut()
                    .block
                    .as_mut()
                    .buf
                    .as_mut_ptr()
                    .add(block_begin_offset)
            };
            let len = BLOCK_CAP - block_begin_offset;
            dst.push(libc::iovec {
                iov_base: ptr as _,
                iov_len: len,
            });
            block_begin_offset = 0;
            maybe_node = unsafe { node.as_ref() }.next;
        }
        dst.as_mut_ptr()
    }

    fn write_iovec_len(&self) -> usize {
        let dst = unsafe { &*self.write_vec.get() };
        dst.len()
    }

    unsafe fn set_init(&mut self, pos: usize) {
        self.advance_mut(pos);
    }
}

#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use monoio::buf::{IoVecBuf, IoVecBufMut};

    use super::*;

    #[test]
    fn read_iovec() {
        let mut b = BufferMut::new();
        b.read_iovec_ptr();
        assert_eq!(b.read_iovec_len(), 0);

        // write the first block
        b.put_u32(0xdeadbeef);
        assert_eq!(b.len(), 4);
        assert_eq!(b.capacity(), BLOCK_CAP);

        let ptr = b.read_iovec_ptr();
        assert_eq!(b.read_iovec_len(), 1);
        assert_eq!(
            unsafe { std::ptr::read((*ptr).iov_base as *const [u8; 4]) },
            [0xde, 0xad, 0xbe, 0xef]
        );
        assert_eq!(unsafe { (*ptr).iov_len }, 4);

        // write another block
        unsafe { b.advance_mut(BLOCK_CAP - 4) };
        b.put_u32(0xcafebabe);
        assert_eq!(b.len(), 4 + BLOCK_CAP);
        assert_eq!(b.capacity(), BLOCK_CAP * 2);

        let ptr = b.read_iovec_ptr();
        assert_eq!(b.read_iovec_len(), 2);
        assert_eq!(
            unsafe { std::ptr::read((*ptr).iov_base as *const [u8; 4]) },
            [0xde, 0xad, 0xbe, 0xef]
        );
        assert_eq!(unsafe { (*ptr).iov_len }, BLOCK_CAP);
        assert_eq!(
            unsafe { std::ptr::read((*ptr.add(1)).iov_base as *const [u8; 4]) },
            [0xca, 0xfe, 0xba, 0xbe]
        );
        assert_eq!(unsafe { (*ptr.add(1)).iov_len }, 4);

        // mark read 2
        b.advance(2);
        assert_eq!(b.len(), 2 + BLOCK_CAP);
        assert_eq!(b.capacity(), BLOCK_CAP * 2 - 2);
        let ptr = b.read_iovec_ptr();
        assert_eq!(b.read_iovec_len(), 2);
        assert_eq!(
            unsafe { std::ptr::read((*ptr).iov_base as *const [u8; 2]) },
            [0xbe, 0xef]
        );
        assert_eq!(unsafe { (*ptr).iov_len }, BLOCK_CAP - 2);
        assert_eq!(
            unsafe { std::ptr::read((*ptr.add(1)).iov_base as *const [u8; 4]) },
            [0xca, 0xfe, 0xba, 0xbe]
        );
        assert_eq!(unsafe { (*ptr.add(1)).iov_len }, 4);

        // mark read whole block
        b.advance(BLOCK_CAP - 2);
        assert_eq!(b.len(), 4);
        assert_eq!(b.capacity(), BLOCK_CAP);
        let ptr = b.read_iovec_ptr();
        assert_eq!(b.read_iovec_len(), 1);
        assert_eq!(
            unsafe { std::ptr::read((*ptr).iov_base as *const [u8; 4]) },
            [0xca, 0xfe, 0xba, 0xbe]
        );
        assert_eq!(unsafe { (*ptr).iov_len }, 4);

        // mark all data read
        b.advance(4);
        assert!(b.is_empty());
        assert_eq!(b.capacity(), BLOCK_CAP - 4);
        b.read_iovec_ptr();
        assert_eq!(b.read_iovec_len(), 0);
    }

    #[test]
    fn write_iovec() {
        let mut b = BufferMut::new();

        // default reserve 4096 which is BLOCK_CAP.
        let ptr = b.write_iovec_ptr();
        assert_eq!(b.write_iovec_len(), 1);
        assert_eq!(unsafe { (*ptr).iov_len }, BLOCK_CAP);

        // write data which should alloc another block on converting
        b.put_u8(1);
        let ptr = b.write_iovec_ptr();
        assert_eq!(b.write_iovec_len(), 2);
        assert_eq!(unsafe { (*ptr).iov_len }, BLOCK_CAP - 1);
        assert_eq!(unsafe { (*ptr.add(1)).iov_len }, BLOCK_CAP);
    }
}