//! armdb 0.1.14
//!
//! Sharded bitcask key-value storage optimized for NVMe.
//!
//! Documentation
use std::alloc::{Layout, alloc, dealloc};
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, Ordering};

/// Maximum number of bytes stored inline (without a heap allocation).
const INLINE_SIZE: usize = 20;

/// Immutable byte slice: inline up to 20 bytes, heap-allocated with reference counting for larger.
/// Total size: 32 bytes on 64-bit systems (4-byte `len` + padding + 24-byte union), alignment: 8.
pub struct ByteView {
    // Byte length; doubles as the union discriminant: len <= INLINE_SIZE means inline.
    len: u32,
    // Active variant is selected by `len` — see `is_inline()`.
    repr: ByteViewRepr,
}

/// Storage for `ByteView`. The union carries no tag of its own; which variant
/// is active is tracked externally by `ByteView::len` (`len <= INLINE_SIZE`
/// selects `inline`, otherwise `heap`).
#[repr(C)]
union ByteViewRepr {
    inline: InlineRepr,
    heap: HeapRepr,
}

/// Inline variant: the bytes live directly inside the value.
#[repr(C)]
#[derive(Clone, Copy)]
struct InlineRepr {
    // Only the first `ByteView::len` bytes are meaningful; the rest are zeroed.
    data: [u8; INLINE_SIZE],
}

/// Heap variant: an 8-byte-aligned buffer plus a shared reference count.
#[repr(C)]
#[derive(Clone, Copy)]
struct HeapRepr {
    // First 4 bytes of the data, duplicated here — presumably intended as a
    // fast path for comparisons, but nothing in this file reads it yet.
    prefix: [u8; 4],
    // Start of the heap buffer; exactly `ByteView::len` bytes long.
    ptr: *const u8,
    // Shared refcount, allocated via `Box` in `alloc_heap`, freed on last drop.
    _rc: *mut AtomicU32,
}

impl ByteView {
    /// Create a new `ByteView` from a byte slice.
    /// If `data.len() <= 20`, the data is stored inline.
    /// Otherwise, a heap allocation is made with a reference count of 1.
    ///
    /// # Panics
    ///
    /// Panics if `data.len()` exceeds `u32::MAX`.
    pub fn new(data: &[u8]) -> Self {
        // Lengths are stored as u32; reject anything larger up front.
        assert!(data.len() <= u32::MAX as usize);

        if data.len() <= INLINE_SIZE {
            let mut buf = [0u8; INLINE_SIZE];
            buf[..data.len()].copy_from_slice(data);
            Self {
                len: data.len() as u32,
                repr: ByteViewRepr {
                    inline: InlineRepr { data: buf },
                },
            }
        } else {
            Self::alloc_heap(data)
        }
    }

    /// Create a `ByteView` from a `Vec<u8>`.
    /// Inline for small values, heap-allocated with 8-byte alignment for larger.
    ///
    /// The bytes are copied in both cases; the vector's own allocation cannot
    /// be reused because `Vec<u8>` does not guarantee 8-byte alignment.
    pub fn from_vec(data: Vec<u8>) -> Self {
        if data.len() <= INLINE_SIZE {
            return Self::new(&data);
        }
        Self::alloc_heap(&data)
    }

    /// Whether the bytes are stored inline (no heap allocation involved).
    #[inline]
    pub fn is_inline(&self) -> bool {
        self.len <= INLINE_SIZE as u32
    }

    /// Length of the byte slice in bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.len as usize
    }

    /// Whether the slice is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// View the contents as a byte slice.
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        if self.is_inline() {
            // SAFETY: len <= INLINE_SIZE, so the union holds the inline
            // variant and its first `len` bytes were initialized in `new`.
            unsafe { &self.repr.inline.data[..self.len as usize] }
        } else {
            // SAFETY: not inline, so the union holds the heap variant; `ptr`
            // points to a live allocation of exactly `len` bytes made by
            // `alloc_heap` and kept alive by the refcount.
            unsafe { std::slice::from_raw_parts(self.repr.heap.ptr, self.len as usize) }
        }
    }

    /// Allocate an 8-byte-aligned heap buffer holding a copy of `data`, with
    /// a fresh reference count of 1.
    ///
    /// Must only be called with `data.len() > INLINE_SIZE`: the prefix copy
    /// below reads the first 4 bytes unconditionally, and `is_inline()` would
    /// misclassify a shorter value as inline.
    fn alloc_heap(data: &[u8]) -> Self {
        debug_assert!(data.len() > INLINE_SIZE);

        let len = data.len() as u32;
        let mut prefix = [0u8; 4];
        prefix.copy_from_slice(&data[..4]);

        let layout = Layout::from_size_align(data.len(), 8).expect("invalid layout");
        // SAFETY: the layout has non-zero size (len > INLINE_SIZE > 0).
        let ptr = unsafe { alloc(layout) };
        if ptr.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        // SAFETY: `ptr` is a fresh allocation of `data.len()` bytes and
        // cannot overlap `data`.
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr(), ptr, data.len());
        }

        // The refcount lives in its own Box so every clone can reach it
        // through the copied `HeapRepr`.
        let rc = Box::into_raw(Box::new(AtomicU32::new(1)));

        Self {
            len,
            repr: ByteViewRepr {
                heap: HeapRepr {
                    prefix,
                    ptr,
                    _rc: rc,
                },
            },
        }
    }

    /// Shared reference count. Only valid for heap-backed values.
    #[inline]
    fn rc(&self) -> &AtomicU32 {
        debug_assert!(!self.is_inline());
        // SAFETY: heap-backed values hold a valid `_rc` created by
        // `Box::into_raw` in `alloc_heap`; it is freed only on the last drop.
        unsafe { &*self.repr.heap._rc }
    }
}

impl Deref for ByteView {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl Clone for ByteView {
    /// Cheap clone: inline data is copied by value; heap data is shared by
    /// bumping the reference count.
    fn clone(&self) -> Self {
        if self.is_inline() {
            Self {
                len: self.len,
                repr: ByteViewRepr {
                    // SAFETY: `is_inline()` guarantees the inline variant is active.
                    inline: unsafe { self.repr.inline },
                },
            }
        } else {
            // Relaxed suffices for the increment: creating a new handle needs
            // no synchronization with other accesses (same policy as `Arc`).
            let old = self.rc().fetch_add(1, Ordering::Relaxed);
            // Guard against refcount overflow: wrapping the u32 past its max
            // would let a later drop free the buffer while handles still
            // exist (use-after-free). Abort, as `Arc` does, since unwinding
            // here could itself run drops that touch the corrupt count.
            if old > u32::MAX / 2 {
                std::process::abort();
            }
            Self {
                len: self.len,
                repr: ByteViewRepr {
                    // SAFETY: not inline, so the heap variant is active.
                    heap: unsafe { self.repr.heap },
                },
            }
        }
    }
}

impl Drop for ByteView {
    /// Releases this handle; frees the heap buffer and refcount box when the
    /// last heap-backed handle goes away. Inline values own nothing.
    fn drop(&mut self) {
        if self.is_inline() {
            return;
        }
        let rc = self.rc();
        // AcqRel: the Release half publishes this handle's accesses to
        // whichever thread performs the final decrement; the Acquire half
        // makes the final decrementer observe all earlier accesses before it
        // frees the memory (same contract as Arc's Release + Acquire fence).
        if rc.fetch_sub(1, Ordering::AcqRel) == 1 {
            unsafe {
                // SAFETY: we held the last reference (count hit zero), so no
                // other handle can touch the buffer or counter after this.
                let heap = self.repr.heap;
                // Layout must match the one used in `alloc_heap`: size = len, align = 8.
                let layout = Layout::from_size_align(self.len as usize, 8).expect("invalid layout");
                dealloc(heap.ptr as *mut u8, layout);
                drop(Box::from_raw(heap._rc));
            }
        }
    }
}

impl PartialEq for ByteView {
    /// Byte-wise content equality, independent of inline/heap representation.
    fn eq(&self, other: &Self) -> bool {
        let lhs: &[u8] = self.as_bytes();
        let rhs: &[u8] = other.as_bytes();
        lhs == rhs
    }
}

impl Eq for ByteView {}

impl Hash for ByteView {
    /// Hashes exactly like the equivalent `[u8]` slice (including its length
    /// prefix), so equal views hash equally as `Eq` requires.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(self.as_bytes(), state);
    }
}

impl From<&[u8]> for ByteView {
    #[inline]
    fn from(data: &[u8]) -> Self {
        Self::new(data)
    }
}

impl From<Vec<u8>> for ByteView {
    #[inline]
    fn from(data: Vec<u8>) -> Self {
        Self::from_vec(data)
    }
}

impl AsRef<[u8]> for ByteView {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl std::fmt::Debug for ByteView {
    /// Shows length and storage kind; deliberately omits the (possibly large) contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("ByteView");
        dbg.field("len", &self.len());
        dbg.field("inline", &self.is_inline());
        dbg.finish()
    }
}

// SAFETY: ByteView is immutable after construction. Heap data is shared via AtomicU32 ref count.
// The raw pointers are never used for mutation after `alloc_heap` returns, and
// clone/drop coordinate exclusively through the atomic counter, so moving or
// sharing a ByteView across threads cannot race on the buffer or the count.
unsafe impl Send for ByteView {}
unsafe impl Sync for ByteView {}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_inline_small() {
        let view = ByteView::new(&[1, 2, 3]);
        assert!(view.is_inline());
        assert_eq!(view.len(), 3);
        assert_eq!(view.as_bytes(), &[1, 2, 3]);
    }

    #[test]
    fn test_inline_max() {
        let data = [0xAB; 20];
        let view = ByteView::new(&data);
        assert!(view.is_inline());
        assert_eq!(view.len(), 20);
        assert_eq!(view.as_bytes(), &data);
    }

    #[test]
    fn test_heap_min() {
        let data = [0xCD; 21];
        let view = ByteView::new(&data);
        assert!(!view.is_inline());
        assert_eq!(view.len(), 21);
        assert_eq!(view.as_bytes(), &data);
    }

    #[test]
    fn test_heap_large() {
        let data = [42u8; 1000];
        let view = ByteView::new(&data);
        assert_eq!(view.as_bytes(), &data);
    }

    #[test]
    fn test_empty() {
        let view = ByteView::new(&[]);
        assert_eq!(view.len(), 0);
        assert!(view.is_empty());
        assert!(view.is_inline());
    }

    #[test]
    fn test_clone_inline() {
        let view = ByteView::new(&[10, 20, 30]);
        let cloned = view.clone();
        assert_eq!(view.as_bytes(), cloned.as_bytes());
    }

    #[test]
    fn test_clone_heap() {
        let data = [0xFF; 64];
        let view = ByteView::new(&data);
        let cloned = view.clone();
        assert_eq!(view.as_bytes(), cloned.as_bytes());
        drop(view);
        assert_eq!(cloned.as_bytes(), &data);
        drop(cloned);
    }

    // Exercises the refcount across several clones dropped out of creation
    // order; the last survivor must still see valid data.
    #[test]
    fn test_multi_clone_drop_order() {
        let data = [9u8; 50];
        let a = ByteView::new(&data);
        let b = a.clone();
        let c = b.clone();
        drop(b);
        drop(a);
        assert_eq!(c.as_bytes(), &data);
    }

    #[test]
    fn test_from_vec_inline() {
        let data = vec![5u8; 20];
        let from_vec = ByteView::from_vec(data.clone());
        let from_new = ByteView::new(&data);
        assert_eq!(from_vec, from_new);
    }

    #[test]
    fn test_from_vec_heap() {
        let data = vec![7u8; 100];
        let from_vec = ByteView::from_vec(data.clone());
        let from_new = ByteView::new(&data);
        assert_eq!(from_vec, from_new);
    }

    #[test]
    fn test_eq() {
        let a = ByteView::new(&[1, 2, 3]);
        let b = ByteView::new(&[1, 2, 3]);
        let c = ByteView::new(&[4, 5, 6]);
        assert_eq!(a, b);
        assert_ne!(a, c);
    }

    // Equal views must hash equally (the Hash/Eq contract), for both inline
    // and heap representations.
    #[test]
    fn test_hash_matches_eq() {
        use std::collections::hash_map::DefaultHasher;

        fn hash_of(v: &ByteView) -> u64 {
            let mut h = DefaultHasher::new();
            v.hash(&mut h);
            h.finish()
        }

        let a = ByteView::new(&[1, 2, 3]);
        let b = ByteView::new(&[1, 2, 3]);
        assert_eq!(hash_of(&a), hash_of(&b));

        let big = vec![8u8; 40];
        let x = ByteView::new(&big);
        let y = ByteView::from_vec(big);
        assert_eq!(hash_of(&x), hash_of(&y));
    }

    #[test]
    fn test_deref() {
        let data: &[u8] = &[9, 8, 7, 6, 5];
        let view = ByteView::new(data);
        let derefed: &[u8] = &view;
        assert_eq!(derefed, data);
    }
}