use crate::error::{PipeError, Result};
use bytemuck::Zeroable;
use nix::{
fcntl::{fcntl, SealFlag, F_ADD_SEALS},
sys::{
memfd::{memfd_create, MemFdCreateFlag},
mman::{mmap, munmap, MapFlags, ProtFlags},
},
unistd::ftruncate,
};
use std::any::type_name;
use std::ffi::{c_void, CString};
use std::mem::size_of;
use std::num::NonZeroUsize;
use std::os::fd::{AsRawFd, OwnedFd};
use std::path::Path;
use std::ptr::NonNull;
use std::slice::{from_raw_parts, from_raw_parts_mut};
/// A memory file mapped twice, back to back, into one contiguous address range.
///
/// `new` creates the file with `memfd_create`, maps `2 * nbytes` bytes of address
/// space, and then re-maps the file over the upper half, so byte `i` and byte
/// `i + nbytes` of the range refer to the same file offset. The range is unmapped
/// when the value is dropped.
pub(crate) struct CircularBuffer {
/// Descriptor of the backing memory file. Never read after construction; it is
/// stored so the descriptor is closed when the buffer is dropped.
_fd: OwnedFd,
/// Start address of the `2 * nbytes` byte mapping.
rwptr: NonNull<c_void>,
/// Size of the backing file in bytes, i.e. half the length of the mapped range.
nbytes: usize,
}
impl CircularBuffer {
    /// Returns the size in bytes of the backing memory file.
    ///
    /// The slices returned by [`view`](Self::view) and [`view_mut`](Self::view_mut)
    /// cover twice this many bytes, because the file is mapped twice in a row.
    pub(crate) fn size_bytes(&self) -> usize {
        self.nbytes
    }

    /// Creates a memory file named after `path` and maps it twice, back to back.
    ///
    /// `size` is rounded up to the next multiple of 4096 bytes. The rounding always
    /// adds at least one byte, so a `size` that is already a multiple of 4096 grows
    /// by one full page (and `size == 0` yields one page).
    ///
    /// After mapping, the file is sealed against shrinking, growing, future writes
    /// through new descriptors/mappings, and further sealing.
    ///
    /// # Errors
    ///
    /// Returns an error if `path` contains an interior NUL byte, if a size
    /// conversion fails, or if any of `memfd_create`, `ftruncate`, `mmap` or
    /// `fcntl` fails. Any mapping that was already established is released before
    /// the error is returned.
    ///
    /// # Panics
    ///
    /// Panics if the kernel places the second mapping somewhere other than the
    /// requested address.
    pub(crate) fn new(path: impl AsRef<Path>, size: usize) -> Result<CircularBuffer> {
        // NOTE(review): hard-coded page size. The second mapping is placed at
        // `base + nbytes` with MAP_FIXED, which needs a page-aligned address, so
        // this presumably fails on systems with larger pages -- confirm.
        const PAGE_SIZE: usize = 4096;

        let name = CString::new(path.as_ref().as_os_str().as_encoded_bytes())?;
        let fd = memfd_create(&name, MemFdCreateFlag::MFD_ALLOW_SEALING)?;

        let nbytes = (size / PAGE_SIZE + 1) * PAGE_SIZE;
        ftruncate(&fd, nbytes.try_into()?)?;

        // Reserve twice the file size in one contiguous range. Only the lower half
        // is backed by the file at this point.
        //
        // SAFETY: no address hint is given, so the kernel picks an unused range and
        // no existing mapping is affected.
        let rwptr = unsafe {
            mmap(
                None,
                (nbytes * 2).try_into()?,
                ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
                MapFlags::MAP_SHARED,
                &fd,
                0,
            )?
        };

        // Hand ownership to the struct immediately: from here on every early return
        // (and the assertion below) drops `buffer`, whose `Drop` unmaps the whole
        // range instead of leaking it.
        let buffer = CircularBuffer {
            _fd: fd,
            rwptr,
            nbytes,
        };

        // Map the file a second time over the upper half of the reserved range so
        // both halves alias the same bytes.
        let desired_pointer = (buffer.rwptr.as_ptr() as usize) + nbytes;
        // SAFETY: the target range `[desired_pointer, desired_pointer + nbytes)` lies
        // entirely inside the mapping created above, which this function owns, so
        // MAP_FIXED replaces only our own pages.
        let rwptr_copy = unsafe {
            mmap(
                Some(
                    NonZeroUsize::new(desired_pointer).expect("desired pointer has to be non-zero"),
                ),
                nbytes.try_into()?,
                ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
                MapFlags::MAP_SHARED | MapFlags::MAP_FIXED,
                &buffer._fd,
                0,
            )?
        };
        assert_eq!(rwptr_copy.as_ptr() as usize, desired_pointer);

        // Seals are applied one at a time, in the same order as before.
        for seal in [
            SealFlag::F_SEAL_SHRINK,
            SealFlag::F_SEAL_GROW,
            SealFlag::F_SEAL_FUTURE_WRITE,
            SealFlag::F_SEAL_SEAL,
        ] {
            fcntl(buffer._fd.as_raw_fd(), F_ADD_SEALS(seal))?;
        }

        Ok(buffer)
    }

    /// Returns the whole doubled mapping as a read-only slice of `T`.
    ///
    /// The slice holds `2 * size_bytes() / size_of::<T>()` elements; the second half
    /// aliases the first.
    ///
    /// # Errors
    ///
    /// Returns a type-size-mismatch error if `T` is zero-sized or if the file size
    /// is not a multiple of `size_of::<T>()`.
    pub(crate) fn view<T: Copy + Zeroable>(&self) -> Result<&[T]> {
        let len = self.mapped_len::<T>()?;
        // SAFETY: `rwptr` is the start of a mapping of `2 * nbytes` bytes that stays
        // alive until `Drop`, and `len * size_of::<T>() == 2 * nbytes`. The address
        // comes from `mmap`, so it is page-aligned.
        // NOTE(review): nothing here prevents a concurrent `view_mut` slice, and the
        // bytes are only known to be valid `T` while they are zero -- callers must
        // uphold both.
        Ok(unsafe { from_raw_parts(self.rwptr.as_ptr().cast::<T>(), len) })
    }

    /// Returns the whole doubled mapping as a mutable slice of `T`.
    ///
    /// The slice holds `2 * size_bytes() / size_of::<T>()` elements; a write to
    /// element `i` in the first half is visible at the matching position in the
    /// second half and vice versa.
    ///
    /// This takes `&self`, so the borrow checker does not stop callers from holding
    /// several slices over the same memory at once; callers must not use
    /// overlapping views concurrently.
    ///
    /// # Errors
    ///
    /// Returns a type-size-mismatch error if `T` is zero-sized or if the file size
    /// is not a multiple of `size_of::<T>()`.
    pub(crate) fn view_mut<T: Copy + Zeroable>(&self) -> Result<&mut [T]> {
        let len = self.mapped_len::<T>()?;
        // SAFETY: same region and alignment argument as in `view`; the mapping was
        // created with PROT_WRITE. Exclusive access is the caller's responsibility.
        Ok(unsafe { from_raw_parts_mut(self.rwptr.as_ptr().cast::<T>(), len) })
    }

    /// Number of `T` elements that fit in the doubled mapping, or a
    /// type-size-mismatch error when `T` is zero-sized or does not divide the file
    /// size evenly.
    fn mapped_len<T>(&self) -> Result<usize> {
        let size = size_of::<T>();
        // Check `size == 0` first: the modulo below would otherwise panic.
        if size == 0 || self.nbytes % size != 0 {
            return Err(PipeError::type_size_mismatch(
                self.nbytes,
                size,
                type_name::<T>(),
            ));
        }
        Ok(self.nbytes * 2 / size)
    }
}
impl Drop for CircularBuffer {
    /// Unmaps the full doubled range starting at `rwptr`.
    ///
    /// A failing `munmap` cannot be propagated from `drop`, so it is reported on
    /// standard error and otherwise ignored.
    fn drop(&mut self) {
        let mapped_len = 2 * self.nbytes;
        // SAFETY: `rwptr` and `mapped_len` describe the range this value mapped in
        // its constructor; slices handed out by the view methods borrow `self` and
        // therefore cannot outlive this call.
        let outcome = unsafe { munmap(self.rwptr, mapped_len) };
        if let Err(e) = outcome {
            eprintln!("munmap failed: {}", e);
        }
    }
}
/// A heap-allocated byte buffer that can be grown on demand and reinterpreted as a
/// slice of a caller-chosen element type.
pub(crate) struct Buffer {
/// Backing bytes. The length (not the capacity) is what the view methods expose;
/// bytes added by `resize_to_fit` are zero-filled.
data: Vec<u8>,
}
impl Buffer {
#[cfg(test)]
pub(crate) fn new(size_bytes: usize) -> Buffer {
Buffer {
data: vec![0; size_bytes],
}
}
pub(crate) fn with_capacity(capacity_bytes: usize) -> Buffer {
Buffer {
data: Vec::with_capacity(capacity_bytes),
}
}
pub(crate) fn resize_to_fit(&mut self, min_bytes: usize) {
if self.data.len() < min_bytes {
self.data.resize(min_bytes, 0);
}
}
pub(crate) fn as_bytes(&self) -> &[u8] {
&self.data
}
#[cfg(test)]
pub(crate) fn as_bytes_mut(&mut self) -> &mut [u8] {
&mut self.data
}
#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.data.len()
}
#[cfg(test)]
pub(crate) fn view<T: Copy + Zeroable>(&self) -> Result<&[T]> {
let size = size_of::<T>();
if self.data.len() % size != 0 {
return Err(PipeError::type_size_mismatch(
self.data.len(),
size,
type_name::<T>(),
));
}
Ok(unsafe { from_raw_parts(self.data.as_ptr() as *const T, self.data.len() / size) })
}
pub(crate) fn view_mut<T: Copy + Zeroable>(&mut self) -> Result<&mut [T]> {
let size = size_of::<T>();
if self.data.len() % size != 0 {
return Err(PipeError::type_size_mismatch(
self.data.len(),
size,
type_name::<T>(),
));
}
Ok(unsafe { from_raw_parts_mut(self.data.as_mut_ptr() as *mut T, self.data.len() / size) })
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// Builds and immediately drops a two-page circular buffer with the given name.
    fn construct(name: &str) -> Result<()> {
        CircularBuffer::new(Path::new(name), 8192).map(|_buf| ())
    }

    // The three constructor tests use distinct names so they can run side by side.
    #[test]
    fn test_circular_buffer_ctor_1() -> Result<()> {
        construct("junk1")
    }

    #[test]
    fn test_circular_buffer_ctor_2() -> Result<()> {
        construct("junk2")
    }

    #[test]
    fn test_circular_buffer_ctor_3() -> Result<()> {
        construct("junk3")
    }

    /// Writes a pattern into the first half of the mapping and checks that it shows
    /// up in both halves, through the mutable and the read-only view.
    #[test]
    fn test_circular_buffer() -> Result<()> {
        let buf = CircularBuffer::new(Path::new("junk"), 1234)?;

        let mirrored = buf.view_mut::<u8>()?;
        assert_eq!(mirrored.len(), 8192);

        // Fill only the first page; the pattern repeats every 256 bytes, so the
        // mirrored second page must satisfy the same per-index expectation.
        for (idx, byte) in mirrored.iter_mut().take(4096).enumerate() {
            *byte = (idx + 1) as u8;
        }
        for (idx, byte) in mirrored.iter().enumerate() {
            assert_eq!(*byte, (idx + 1) as u8);
        }

        let readonly = buf.view::<u8>()?;
        for (idx, byte) in readonly.iter().take(8192).enumerate() {
            assert_eq!(*byte, (idx + 1) as u8);
        }
        assert!(readonly.len() >= 8192);
        Ok(())
    }

    /// Round-trips a byte pattern through the mutable and read-only byte views.
    #[test]
    fn test_buffer() -> Result<()> {
        let mut buf = Buffer::new(1024);

        let writable = buf.view_mut::<u8>()?;
        assert_eq!(writable.len(), 1024);
        writable
            .iter_mut()
            .enumerate()
            .for_each(|(idx, byte)| *byte = (idx % 256) as u8);

        let readonly = buf.view::<u8>()?;
        assert!(readonly.len() >= 1024);
        for (idx, byte) in readonly.iter().take(1024).enumerate() {
            assert_eq!(*byte, (idx % 256) as u8);
        }
        Ok(())
    }

    /// Views the buffer as `f64`, then checks byte length before and after growing.
    #[test]
    fn test_buffer_generic() -> Result<()> {
        let mut buf = Buffer::new(100 * 8);
        assert_eq!(buf.len(), 800);

        let doubles = buf.view_mut::<f64>()?;
        assert_eq!(doubles.len(), 100);
        doubles
            .iter_mut()
            .enumerate()
            .for_each(|(idx, slot)| *slot = idx as f64);

        assert_eq!(buf.as_bytes().len(), 800);

        buf.resize_to_fit(1200);
        assert_eq!(buf.len(), 1200);
        Ok(())
    }
}