round_pipers 0.2.0

A way to pipe ndarrays using circular buffers
Documentation
//! Buffer implementations for pipe data storage
//!
//! This module provides two buffer types for different use cases:
//!
//! ## CircularBuffer
//! A high-performance circular buffer using Linux `memfd` + double-mapped memory.
//! Uses `mmap` to create two contiguous mappings of the same memory region, allowing
//! seamless wraparound without data copying. Perfect for streaming data where the
//! write pointer can wrap around the buffer boundaries.
//!
//! ## Buffer  
//! A simple heap-allocated buffer using `Vec<u8>` for straightforward data storage.
//! Used by write-only pipes and other scenarios where dynamic resizing and simple
//! byte access is needed.
//!
//! Both buffers provide typed views via `view()` and `view_mut()`, which reinterpret the
//! underlying bytes as a slice of `T` after checking that the byte length is a whole multiple
//! of `size_of::<T>()`.
use crate::error::{PipeError, Result};
use bytemuck::Zeroable;
use nix::{
    fcntl::{fcntl, SealFlag, F_ADD_SEALS},
    sys::{
        memfd::{memfd_create, MemFdCreateFlag},
        mman::{mmap, munmap, MapFlags, ProtFlags},
    },
    unistd::ftruncate,
};
use std::any::type_name;
use std::ffi::{c_void, CString};
use std::mem::size_of;
use std::num::NonZeroUsize;
use std::os::fd::{AsRawFd, OwnedFd};
use std::path::Path;
use std::ptr::NonNull;
use std::slice::{from_raw_parts, from_raw_parts_mut};

/// A circular byte buffer built from a sealed Linux `memfd` that is mapped twice, back to back.
///
/// The file (`nbytes` long) is mapped at `rwptr` and again at `rwptr + nbytes`, so byte `i` and
/// byte `i + nbytes` of the `2 * nbytes` region are the same memory. Reads and writes that run
/// past the end of the first mapping therefore continue at the start of the buffer without any
/// copying.
pub(crate) struct CircularBuffer {
    /// The memfd backing both mappings; kept for the lifetime of the buffer.
    _fd: OwnedFd,
    /// Start of the first mapping; the mirror mapping starts `nbytes` bytes later.
    rwptr: NonNull<c_void>,
    /// Size of a single mapping in bytes (a multiple of 4096).
    nbytes: usize,
}

impl CircularBuffer {
    /// Get the size of the buffer in bytes (single mapping size, not double-mapped total)
    pub(crate) fn size_bytes(&self) -> usize {
        self.nbytes
    }

    /// Create a circular buffer that can hold at least `size` bytes.
    ///
    /// `path` is used only as the name passed to `memfd_create`. The capacity is
    /// `(size / 4096 + 1) * 4096` bytes: `size` rounded up to a whole page, plus one extra page
    /// when `size` is already a multiple of 4096.
    ///
    /// Once both mappings exist, the memfd is sealed against shrinking, growing, new writes and
    /// further seal changes.
    ///
    /// # Errors
    ///
    /// Fails if `path` contains a NUL byte, or if `memfd_create`, `ftruncate`, either `mmap`, or
    /// any `fcntl` sealing call fails. Any mapping created before the failure is unmapped.
    pub(crate) fn new(path: impl AsRef<Path>, size: usize) -> Result<CircularBuffer> {
        let fd = memfd_create(
            &CString::new(path.as_ref().as_os_str().as_encoded_bytes())?,
            MemFdCreateFlag::MFD_ALLOW_SEALING,
        )?;
        // NOTE(review): assumes 4 KiB pages. With larger pages the fixed mapping below would start
        // at a non-page-aligned address and `mmap` would return an error.
        let page_size = 4096;
        let nbytes = (size / page_size + 1) * page_size;
        ftruncate(&fd, nbytes.try_into()?)?;

        // Reserve `2 * nbytes` of contiguous address space backed by the memfd. The file is only
        // `nbytes` long; the second half of this reservation is replaced by the mirror mapping
        // below before anything touches it.
        //
        // SAFETY: the kernel picks the address (`None`), so no existing mapping is clobbered.
        // Fresh memfd pages read as zero, which is a valid value for every `Copy + Zeroable` view
        // handed out by `view` / `view_mut`. nix offers no safe `mmap`.
        let rwptr = unsafe {
            mmap(
                None,
                (nbytes * 2).try_into()?,
                ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
                MapFlags::MAP_SHARED,
                &fd,
                0,
            )?
        };

        // Hand the reservation to a `CircularBuffer` right away. If any step below returns early
        // (or panics), dropping `buffer` unmaps the whole `2 * nbytes` region instead of leaking
        // it.
        let buffer = CircularBuffer {
            _fd: fd,
            rwptr,
            nbytes,
        };

        let desired_pointer = (rwptr.as_ptr() as usize) + nbytes;
        // SAFETY: `MAP_FIXED` replaces whatever is mapped at the target. The target range
        // `[desired_pointer, desired_pointer + nbytes)` is exactly the second half of the
        // reservation above, so only our own mapping is overwritten. `nbytes` is a multiple of
        // the assumed page size, so the address is page-aligned.
        let rwptr_copy = unsafe {
            mmap(
                Some(
                    NonZeroUsize::new(desired_pointer).expect("desired pointer has to be non-zero"),
                ),
                nbytes.try_into()?,
                ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
                MapFlags::MAP_SHARED | MapFlags::MAP_FIXED,
                &buffer._fd,
                0,
            )?
        };
        assert_eq!(rwptr_copy.as_ptr() as usize, desired_pointer);

        // Freeze the backing file. After this it can no longer shrink, grow, or accept `write`
        // calls / new writable mappings; the two existing mappings stay writable.
        // F_SEAL_SEAL is applied last and prevents the seal set from changing again.
        let raw_fd = buffer._fd.as_raw_fd();
        for seal in [
            SealFlag::F_SEAL_SHRINK,
            SealFlag::F_SEAL_GROW,
            SealFlag::F_SEAL_FUTURE_WRITE,
            SealFlag::F_SEAL_SEAL,
        ] {
            fcntl(raw_fd, F_ADD_SEALS(seal))?;
        }

        Ok(buffer)
    }

    /// Number of `T` elements spanning both mappings.
    ///
    /// Returns a type-size mismatch error when `T` is zero-sized, rather than panicking on a
    /// division by zero. It also errors when a single mapping does not hold a whole number of
    /// `T`s.
    fn element_count<T>(&self) -> Result<usize> {
        let size = size_of::<T>();
        if size == 0 || self.nbytes % size != 0 {
            return Err(PipeError::type_size_mismatch(
                self.nbytes,
                size,
                type_name::<T>(),
            ));
        }
        // `mmap` returns page-aligned addresses, which satisfies the alignment of any `T` whose
        // alignment does not exceed the page size.
        debug_assert_eq!(self.rwptr.as_ptr() as usize % std::mem::align_of::<T>(), 0);
        Ok(self.nbytes * 2 / size)
    }

    /// View both mappings as one immutable slice of `2 * nbytes / size_of::<T>()` elements.
    ///
    /// Element `i` and element `i + nbytes / size_of::<T>()` are the same memory.
    ///
    /// # Errors
    ///
    /// Returns a type-size mismatch error if `T` is zero-sized, or if `size_of::<T>()` does not
    /// divide the single-mapping size.
    pub(crate) fn view<T: Copy + Zeroable>(&self) -> Result<&[T]> {
        let len = self.element_count::<T>()?;
        // SAFETY: the `2 * nbytes` bytes at `rwptr` stay mapped and readable while `self` lives
        // (they are unmapped only in `Drop`), and the address is page-aligned. Fresh pages are
        // zero, which is a valid `T` because `T: Zeroable`.
        // NOTE(review): `Copy + Zeroable` does not promise that *every* byte pattern is a valid
        // `T` (bytemuck's `AnyBitPattern` does). Bytes written through a differently-typed view
        // are only sound to read back for such types.
        Ok(unsafe { from_raw_parts(self.rwptr.as_ptr().cast::<T>(), len) })
    }

    /// View both mappings as one mutable slice of `2 * nbytes / size_of::<T>()` elements.
    ///
    /// Element `i` and element `i + nbytes / size_of::<T>()` are the same memory, so a write
    /// through one index is visible through the other.
    ///
    /// # Errors
    ///
    /// Returns a type-size mismatch error if `T` is zero-sized, or if `size_of::<T>()` does not
    /// divide the single-mapping size.
    ///
    /// NOTE(review): this takes `&self` yet returns `&mut [T]`, so safe code can hold two
    /// overlapping `&mut` views (or a `&mut` and a `&` view) of the same buffer at once. That
    /// violates Rust's aliasing rules. The `NonNull` field makes the type `!Sync` (unless an
    /// `unsafe impl` exists elsewhere), but that only rules out sharing across threads, not
    /// repeated borrows on one thread. Switching to `&mut self` would break callers, so the
    /// signature is kept: callers must never keep another view alive while a mutable one is in
    /// use.
    pub(crate) fn view_mut<T: Copy + Zeroable>(&self) -> Result<&mut [T]> {
        let len = self.element_count::<T>()?;
        // SAFETY: as in `view`; the mappings are also writable (PROT_WRITE). Exclusivity is NOT
        // enforced by the type system here — see the note above.
        Ok(unsafe { from_raw_parts_mut(self.rwptr.as_ptr().cast::<T>(), len) })
    }
}

impl Drop for CircularBuffer {
    /// Unmap both mappings with a single `munmap` call.
    fn drop(&mut self) {
        // `munmap` removes every mapping that overlaps `[addr, addr + len)`, so one call over the
        // full `2 * nbytes` range releases both halves. This also holds when `new` bails out
        // after the first `mmap` but before the mirror mapping exists.
        //
        // SAFETY: `rwptr` / `2 * nbytes` is exactly the range reserved in `new`. Views borrow
        // `self`, so none can outlive this call, and `drop` runs at most once. nix offers no
        // safe `munmap`.
        if let Err(e) = unsafe { munmap(self.rwptr, 2 * self.nbytes) } {
            eprintln!("munmap failed: {}", e);
        }
    }
}
pub(crate) struct Buffer {
    data: Vec<u8>,
}

impl Buffer {
    #[cfg(test)]
    pub(crate) fn new(size_bytes: usize) -> Buffer {
        Buffer {
            data: vec![0; size_bytes],
        }
    }

    pub(crate) fn with_capacity(capacity_bytes: usize) -> Buffer {
        Buffer {
            data: Vec::with_capacity(capacity_bytes),
        }
    }

    pub(crate) fn resize_to_fit(&mut self, min_bytes: usize) {
        if self.data.len() < min_bytes {
            self.data.resize(min_bytes, 0);
        }
    }

    pub(crate) fn as_bytes(&self) -> &[u8] {
        &self.data
    }

    #[cfg(test)]
    pub(crate) fn as_bytes_mut(&mut self) -> &mut [u8] {
        &mut self.data
    }

    #[cfg(test)]
    pub(crate) fn len(&self) -> usize {
        self.data.len()
    }

    #[cfg(test)]
    pub(crate) fn view<T: Copy + Zeroable>(&self) -> Result<&[T]> {
        let size = size_of::<T>();
        if self.data.len() % size != 0 {
            return Err(PipeError::type_size_mismatch(
                self.data.len(),
                size,
                type_name::<T>(),
            ));
        }

        // Safe because: Vec guarantees valid pointer and length, size calculation ensures proper alignment,
        // and lifetime is bounded by self
        Ok(unsafe { from_raw_parts(self.data.as_ptr() as *const T, self.data.len() / size) })
    }

    pub(crate) fn view_mut<T: Copy + Zeroable>(&mut self) -> Result<&mut [T]> {
        let size = size_of::<T>();
        if self.data.len() % size != 0 {
            return Err(PipeError::type_size_mismatch(
                self.data.len(),
                size,
                type_name::<T>(),
            ));
        }

        // Safe because: Vec guarantees valid pointer and length, size calculation ensures proper alignment,
        // lifetime is bounded by self, and we have exclusive mutable access
        Ok(unsafe { from_raw_parts_mut(self.data.as_mut_ptr() as *mut T, self.data.len() / size) })
    }
}

#[cfg(test)]
mod test {
    use super::*;

    // At one point I was debugging exactly how mmap / munmap worked. At the time I unmapped the
    // second copy of the buffer and then unmapped the first copy asking for the whole address
    // range, but that is not how munmap works. The only way to really see the problem was to have
    // multiple threads creating and destroying CircularBuffers at once. Running these three tests
    // in parallel (the default for `cargo test`) showed the problem pretty quickly.
    #[test]
    fn test_circular_buffer_ctor_1() -> Result<()> {
        let _buf = CircularBuffer::new(Path::new("junk1"), 8192)?;
        Ok(())
    }
    #[test]
    fn test_circular_buffer_ctor_2() -> Result<()> {
        let _buf = CircularBuffer::new(Path::new("junk2"), 8192)?;
        Ok(())
    }
    #[test]
    fn test_circular_buffer_ctor_3() -> Result<()> {
        let _buf = CircularBuffer::new(Path::new("junk3"), 8192)?;
        Ok(())
    }

    #[test]
    fn test_circular_buffer() -> Result<()> {
        let buf = CircularBuffer::new(Path::new("junk"), 1234)?;
        // 1234 bytes round up to a single 4096-byte page; views span both mappings.
        assert_eq!(buf.size_bytes(), 4096);
        let view: &mut [u8] = buf.view_mut()?;
        assert_eq!(view.len(), 8192);

        // Write only the first mapping...
        for (ii, byte) in view[..4096].iter_mut().enumerate() {
            *byte = (ii + 1) as u8;
        }
        // ...and the second mapping mirrors it. 4096 is a multiple of 256, so the expected byte
        // pattern continues seamlessly across the boundary.
        for (ii, &byte) in view.iter().enumerate() {
            assert_eq!(byte, (ii + 1) as u8);
        }

        let ro_view: &[u8] = buf.view()?;
        for (ii, &byte) in ro_view.iter().enumerate() {
            assert_eq!(byte, (ii + 1) as u8);
        }
        Ok(())
    }

    #[test]
    fn test_circular_buffer_size_mismatch() -> Result<()> {
        let buf = CircularBuffer::new(Path::new("junk_mismatch"), 1234)?;
        // 4096 is not a multiple of 3, so a 3-byte element type is rejected.
        assert!(buf.view::<[u8; 3]>().is_err());
        assert!(buf.view_mut::<[u8; 3]>().is_err());
        Ok(())
    }

    #[test]
    fn test_buffer() -> Result<()> {
        let mut buf = Buffer::new(1024);
        let view: &mut [u8] = buf.view_mut()?;
        assert_eq!(view.len(), 1024);

        for (ii, byte) in view.iter_mut().enumerate() {
            *byte = (ii % 256) as u8;
        }

        let ro_view: &[u8] = buf.view()?;
        for (ii, &byte) in ro_view.iter().enumerate() {
            assert_eq!(byte, (ii % 256) as u8);
        }
        Ok(())
    }

    #[test]
    fn test_buffer_size_mismatch() {
        // 10 bytes is not a whole number of f64s.
        let mut buf = Buffer::new(10);
        assert!(buf.view::<f64>().is_err());
        assert!(buf.view_mut::<f64>().is_err());
    }

    #[test]
    fn test_buffer_generic() -> Result<()> {
        // Create buffer with 100 f64s worth of bytes
        let mut buf = Buffer::new(100 * 8);
        assert_eq!(buf.len(), 800); // 100 f64s = 800 bytes

        // Test view as f64
        let slice: &mut [f64] = buf.view_mut()?;
        assert_eq!(slice.len(), 100);
        for (i, val) in slice.iter_mut().enumerate() {
            *val = i as f64;
        }

        // Values written through the typed view are visible through the read-only view...
        let ro: &[f64] = buf.view()?;
        assert!(ro.iter().enumerate().all(|(i, &v)| v == i as f64));

        // ...and through the raw bytes.
        let bytes = buf.as_bytes();
        assert_eq!(bytes.len(), 800); // 800 bytes
        assert_eq!(&bytes[8..16], &1.0f64.to_ne_bytes()[..]);

        // Growing keeps existing contents and zero-fills the new tail.
        buf.resize_to_fit(1200); // 150 f64s worth
        assert_eq!(buf.len(), 1200);
        let grown: &[f64] = buf.view()?;
        assert_eq!(grown.len(), 150);
        assert!(grown[..100].iter().enumerate().all(|(i, &v)| v == i as f64));
        assert!(grown[100..].iter().all(|&v| v == 0.0));

        // Asking for less than the current length never shrinks the buffer.
        buf.resize_to_fit(8);
        assert_eq!(buf.len(), 1200);

        Ok(())
    }
}