#![allow(incomplete_features)]
#![feature(thread_local)]
#![feature(async_fn_in_trait)]
mod roll;
pub use roll::*;
mod piece;
pub use piece::*;
use std::{
cell::{RefCell, RefMut},
collections::VecDeque,
marker::PhantomData,
ops::{self, Bound, RangeBounds},
};
use memmap2::MmapMut;
/// Size, in bytes, of each buffer in the pool.
pub const BUF_SIZE: u16 = 4096;
/// Number of buffers in the pool for regular (non-`miri`) builds.
#[cfg(not(feature = "miri"))]
pub const NUM_BUF: u32 = 64 * 1024;
/// Number of buffers in the pool when the `miri` feature is enabled.
#[cfg(feature = "miri")]
pub const NUM_BUF: u32 = 64;
/// Per-thread buffer pool used by `BufMut` and `Buf`. It starts empty; the
/// backing memory is set up lazily the first time the pool is borrowed.
#[thread_local]
static BUF_POOL: BufPool = BufPool::new_empty(BUF_SIZE, NUM_BUF);
// Keeps the anonymous memory map that backs this thread's pool alive; it is
// stored here (in non-`miri` builds) when the pool is first initialized.
thread_local! {
static BUF_POOL_DESTRUCTOR: RefCell<Option<MmapMut>> = RefCell::new(None);
}
/// Result alias whose error type defaults to this crate's [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors produced by the buffer pool.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Creating the memory map that backs the pool failed; wraps the I/O error.
#[error("could not mmap buffer")]
Mmap(#[from] std::io::Error),
/// The pool has no free buffer left to hand out.
#[error("out of memory")]
OutOfMemory,
}
/// A fixed-size pool of equally sized buffers with lazily created storage.
pub(crate) struct BufPool {
// Size of a single buffer, in bytes.
buf_size: u16,
// Total number of buffers in the pool.
num_buf: u32,
// `None` until the pool is first borrowed; then holds the live state.
inner: RefCell<Option<BufPoolInner>>,
}
/// Live state of an initialized [`BufPool`].
struct BufPoolInner {
// Start of the contiguous region holding all buffers back to back.
ptr: *mut u8,
// Indices of buffers currently available for allocation.
free: VecDeque<u32>,
// Per-buffer reference count; a buffer goes back to `free` when its count
// drops to zero.
ref_counts: Vec<i16>,
}
impl BufPool {
pub(crate) const fn new_empty(buf_size: u16, num_buf: u32) -> BufPool {
BufPool {
buf_size,
num_buf,
inner: RefCell::new(None),
}
}
pub(crate) fn alloc(&self) -> Result<BufMut> {
let mut inner = self.borrow_mut()?;
if let Some(index) = inner.free.pop_front() {
inner.ref_counts[index as usize] += 1;
Ok(BufMut {
index,
off: 0,
len: self.buf_size as _,
_non_send: PhantomData,
})
} else {
Err(Error::OutOfMemory)
}
}
fn inc(&self, index: u32) {
let mut inner = self.inner.borrow_mut();
let inner = inner.as_mut().unwrap();
inner.ref_counts[index as usize] += 1;
}
fn dec(&self, index: u32) {
let mut inner = self.inner.borrow_mut();
let inner = inner.as_mut().unwrap();
inner.ref_counts[index as usize] -= 1;
if inner.ref_counts[index as usize] == 0 {
inner.free.push_back(index);
}
}
#[cfg(test)]
pub(crate) fn num_free(&self) -> Result<usize> {
Ok(self.borrow_mut()?.free.len())
}
fn borrow_mut(&self) -> Result<RefMut<BufPoolInner>> {
let mut inner = self.inner.borrow_mut();
if inner.is_none() {
let len = self.num_buf as usize * self.buf_size as usize;
let ptr: *mut u8;
#[cfg(feature = "miri")]
{
let mut map = vec![0; len];
ptr = map.as_mut_ptr();
std::mem::forget(map);
}
#[cfg(not(feature = "miri"))]
{
let mut map = memmap2::MmapOptions::new().len(len).map_anon()?;
ptr = map.as_mut_ptr();
BUF_POOL_DESTRUCTOR.with(|destructor| {
*destructor.borrow_mut() = Some(map);
});
}
let mut free = VecDeque::with_capacity(self.num_buf as usize);
for i in 0..self.num_buf {
free.push_back(i);
}
let ref_counts = vec![0; self.num_buf as usize];
*inner = Some(BufPoolInner {
ptr,
free,
ref_counts,
});
}
let r = RefMut::map(inner, |o| o.as_mut().unwrap());
Ok(r)
}
#[inline(always)]
unsafe fn base_ptr(&self, index: u32) -> *mut u8 {
let start = index as usize * self.buf_size as usize;
self.inner.borrow_mut().as_mut().unwrap().ptr.add(start)
}
}
/// A mutable handle onto a byte range of one pool buffer. Dropping it
/// releases one reference on that buffer.
pub struct BufMut {
// Index of the pool buffer this handle refers to.
pub(crate) index: u32,
// Offset of the first visible byte within the buffer.
off: u16,
// Number of visible bytes.
len: u16,
// Raw-pointer marker: keeps the handle `!Send`/`!Sync`, since the pool is
// thread-local.
_non_send: PhantomData<*mut ()>,
}
impl BufMut {
    /// Allocates a buffer from this thread's pool.
    ///
    /// # Errors
    /// Returns [`Error::OutOfMemory`] when no buffer is free, or
    /// [`Error::Mmap`] if the pool's backing storage cannot be created.
    #[inline(always)]
    pub fn alloc() -> Result<BufMut, Error> {
        BUF_POOL.alloc()
    }

    /// Number of bytes visible through this handle.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len as _
    }

    /// Whether this handle covers zero bytes.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Converts this handle into an immutable [`Buf`] over the same bytes.
    #[inline]
    pub fn freeze(self) -> Buf {
        let b = Buf {
            index: self.index,
            off: self.off,
            len: self.len,
            _non_send: PhantomData,
        };
        // Ownership of our reference moves to `b`, so skip our own `Drop`.
        std::mem::forget(self);
        b
    }

    /// Creates an immutable [`Buf`] over `range` (relative to this handle)
    /// while leaving this handle usable.
    ///
    /// The caller is responsible for not writing to the frozen bytes through
    /// `self` afterwards.
    ///
    /// # Panics
    /// Panics if `range` does not fit within `0..self.len()`.
    pub(crate) fn freeze_slice(&self, range: impl RangeBounds<usize>) -> Buf {
        // `self` keeps its own reference and the returned `Buf` releases one
        // when dropped, so it needs a reference of its own. Without this the
        // buffer would return to the free list while `self` is still alive.
        BUF_POOL.inc(self.index);
        let b = Buf {
            index: self.index,
            off: self.off,
            len: self.len,
            _non_send: PhantomData,
        };
        b.slice(range)
    }

    /// Splits this handle into `[0, at)` and `[at, len)`.
    ///
    /// # Panics
    /// Panics if `at > self.len()`.
    #[inline]
    pub fn split_at(self, at: usize) -> (Self, Self) {
        assert!(at <= self.len as usize);
        // `at <= self.len`, so the `u16` casts below are lossless.
        let left = BufMut {
            index: self.index,
            off: self.off,
            len: at as _,
            _non_send: PhantomData,
        };
        let right = BufMut {
            index: self.index,
            off: self.off + at as u16,
            len: (self.len - at as u16),
            _non_send: PhantomData,
        };
        // One handle becomes two: keep the existing reference (no `Drop` for
        // `self`) and add one more.
        std::mem::forget(self);
        BUF_POOL.inc(left.index);
        (left, right)
    }

    /// Drops the first `n` bytes from the front of this handle.
    ///
    /// # Panics
    /// Panics if `n > self.len()`.
    pub fn skip(&mut self, n: usize) {
        assert!(n <= self.len as usize);
        let u16_n: u16 = n.try_into().unwrap();
        self.off += u16_n;
        self.len -= u16_n;
    }
}
impl ops::Deref for BufMut {
    type Target = [u8];

    /// Views the bytes covered by this handle as a shared slice.
    #[inline(always)]
    fn deref(&self) -> &[u8] {
        let offset = usize::from(self.off);
        let length = usize::from(self.len);
        // SAFETY: relies on `index` naming a buffer of the initialized pool
        // and on `off + len` staying inside that buffer, which the
        // constructors in this file maintain through their assertions.
        unsafe {
            let start = BUF_POOL.base_ptr(self.index).add(offset);
            std::slice::from_raw_parts(start, length)
        }
    }
}
impl ops::DerefMut for BufMut {
    /// Views the bytes covered by this handle as a mutable slice.
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let offset = usize::from(self.off);
        let length = usize::from(self.len);
        // SAFETY: relies on `index` naming a buffer of the initialized pool
        // and on `off + len` staying inside that buffer.
        // NOTE(review): exclusivity additionally depends on no other live
        // handle overlapping this byte range — confirm for `freeze_slice` users.
        unsafe {
            let start = BUF_POOL.base_ptr(self.index).add(offset);
            std::slice::from_raw_parts_mut(start, length)
        }
    }
}
// Exposes the handle's byte range as an I/O source buffer.
unsafe impl fluke_maybe_uring::buf::IoBuf for BufMut {
    /// Pointer to the first byte covered by this handle.
    fn stable_ptr(&self) -> *const u8 {
        let offset = usize::from(self.off);
        // SAFETY: relies on `index` naming a buffer of the initialized pool
        // and on `off` lying inside that buffer.
        let start: *const u8 = unsafe { BUF_POOL.base_ptr(self.index).add(offset) };
        start
    }

    /// The whole visible range is reported as initialized.
    fn bytes_init(&self) -> usize {
        usize::from(self.len)
    }

    /// Capacity equals the visible length.
    fn bytes_total(&self) -> usize {
        usize::from(self.len)
    }
}
// Exposes the handle's byte range as an I/O destination buffer.
unsafe impl fluke_maybe_uring::buf::IoBufMut for BufMut {
    /// Mutable pointer to the first byte covered by this handle.
    fn stable_mut_ptr(&mut self) -> *mut u8 {
        let offset = usize::from(self.off);
        // SAFETY: relies on `index` naming a buffer of the initialized pool
        // and on `off` lying inside that buffer.
        unsafe { BUF_POOL.base_ptr(self.index).add(offset) }
    }

    /// Intentionally does nothing: `bytes_init` already reports the full
    /// length, so there is no separate initialized cursor to advance.
    unsafe fn set_init(&mut self, _pos: usize) {}
}
impl Drop for BufMut {
    /// Gives this handle's reference back to the pool.
    fn drop(&mut self) {
        let index = self.index;
        BUF_POOL.dec(index);
    }
}
/// An immutable, cloneable handle onto a byte range of one pool buffer.
/// Cloning adds a reference on the buffer; dropping releases one.
pub struct Buf {
// Index of the pool buffer this handle refers to.
pub(crate) index: u32,
// Offset of the first visible byte within the buffer.
off: u16,
// Number of visible bytes.
len: u16,
// Raw-pointer marker: keeps the handle `!Send`/`!Sync`, since the pool is
// thread-local.
_non_send: PhantomData<*mut ()>,
}
impl Buf {
    /// Number of bytes visible through this handle.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len as _
    }

    /// Whether this handle covers zero bytes.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Narrows this handle to `range`, interpreted relative to its current
    /// start.
    ///
    /// # Panics
    /// Panics if the range is inverted, extends past `self.len()`, or has a
    /// bound that cannot be represented (e.g. `..=usize::MAX`).
    pub fn slice(mut self, range: impl RangeBounds<usize>) -> Self {
        let current_len = self.len();
        // `checked_add` makes an unrepresentable bound panic in every build
        // profile instead of wrapping to 0 in release builds.
        let new_start = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n
                .checked_add(1)
                .expect("slice start bound overflows usize"),
            Bound::Unbounded => 0,
        };
        let new_end = match range.end_bound() {
            Bound::Included(&n) => n
                .checked_add(1)
                .expect("slice end bound overflows usize"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => current_len,
        };
        assert!(new_start <= new_end);
        assert!(new_end <= current_len);
        // Both values are bounded by `self.len` (a `u16`), so the casts are
        // lossless.
        self.off += new_start as u16;
        self.len = (new_end - new_start) as u16;
        self
    }

    /// Splits this handle into `[0, at)` and `[at, len)`.
    ///
    /// # Panics
    /// Panics if `at > self.len()`.
    #[inline]
    pub fn split_at(self, at: usize) -> (Self, Self) {
        assert!(at <= self.len as usize);
        // `at <= self.len`, so the `u16` casts below are lossless.
        let left = Buf {
            index: self.index,
            off: self.off,
            len: at as _,
            _non_send: PhantomData,
        };
        let right = Buf {
            index: self.index,
            off: self.off + at as u16,
            len: (self.len - at as u16),
            _non_send: PhantomData,
        };
        // One handle becomes two: keep the existing reference (no `Drop` for
        // `self`) and add one more.
        std::mem::forget(self);
        BUF_POOL.inc(left.index);
        (left, right)
    }
}
// Exposes the handle's byte range as an I/O source buffer.
unsafe impl fluke_maybe_uring::buf::IoBuf for Buf {
    /// Pointer to the first byte covered by this handle.
    fn stable_ptr(&self) -> *const u8 {
        let offset = usize::from(self.off);
        // SAFETY: relies on `index` naming a buffer of the initialized pool
        // and on `off` lying inside that buffer.
        let start: *const u8 = unsafe { BUF_POOL.base_ptr(self.index).add(offset) };
        start
    }

    /// The whole visible range is reported as initialized.
    fn bytes_init(&self) -> usize {
        usize::from(self.len)
    }

    /// Capacity equals the visible length.
    fn bytes_total(&self) -> usize {
        usize::from(self.len)
    }
}
impl ops::Deref for Buf {
    type Target = [u8];

    /// Views the bytes covered by this handle as a shared slice.
    #[inline(always)]
    fn deref(&self) -> &[u8] {
        let offset = usize::from(self.off);
        let length = usize::from(self.len);
        // SAFETY: relies on `index` naming a buffer of the initialized pool
        // and on `off + len` staying inside that buffer, which the
        // constructors in this file maintain through their assertions.
        unsafe {
            let start = BUF_POOL.base_ptr(self.index).add(offset);
            std::slice::from_raw_parts(start, length)
        }
    }
}
impl Clone for Buf {
fn clone(&self) -> Self {
BUF_POOL.inc(self.index);
Self {
index: self.index,
off: self.off,
len: self.len,
_non_send: PhantomData,
}
}
}
impl Drop for Buf {
    /// Gives this handle's reference back to the pool.
    fn drop(&mut self) {
        let index = self.index;
        BUF_POOL.dec(index);
    }
}
// Unit tests for handle layout and reference counting.
#[cfg(test)]
mod tests {
use crate::{Buf, BufMut, BUF_POOL};
use std::rc::Rc;
// Pins the in-memory size of the handles and of enums that embed them.
#[test]
fn size_test() {
assert_eq!(8, std::mem::size_of::<BufMut>());
assert_eq!(8, std::mem::size_of::<Buf>());
assert_eq!(16, std::mem::size_of::<Box<[u8]>>());
assert_eq!(16, std::mem::size_of::<&[u8]>());
#[allow(dead_code)]
enum BufOrBox {
Buf(Buf),
Box((Rc<Box<[u8]>>, u32, u32)),
}
assert_eq!(16, std::mem::size_of::<BufOrBox>());
#[allow(dead_code)]
enum Chunk {
Buf(Buf),
Box(Box<[u8]>),
Static(&'static [u8]),
}
assert_eq!(24, std::mem::size_of::<Chunk>());
}
// Freezing and cloning must keep the buffer checked out until the last
// handle is dropped.
#[test]
fn freeze_test() -> eyre::Result<()> {
let total_bufs = BUF_POOL.num_free()?;
let mut bm = BufMut::alloc().unwrap();
assert_eq!(total_bufs - 1, BUF_POOL.num_free()?);
assert_eq!(bm.len(), 4096);
bm[..11].copy_from_slice(b"hello world");
assert_eq!(&bm[..11], b"hello world");
// Freezing transfers the reference; the free count must not change.
let b = bm.freeze();
assert_eq!(&b[..11], b"hello world");
assert_eq!(total_bufs - 1, BUF_POOL.num_free()?);
let b2 = b.clone();
assert_eq!(&b[..11], b"hello world");
assert_eq!(total_bufs - 1, BUF_POOL.num_free()?);
// Dropping one of two handles keeps the buffer checked out.
drop(b);
assert_eq!(total_bufs - 1, BUF_POOL.num_free()?);
// Dropping the last handle returns the buffer to the pool.
drop(b2);
assert_eq!(total_bufs, BUF_POOL.num_free()?);
Ok(())
}
// Splitting yields two handles onto disjoint halves of the same buffer.
#[test]
fn split_test() -> eyre::Result<()> {
let total_bufs = BUF_POOL.num_free()?;
let mut bm = BufMut::alloc().unwrap();
bm[..12].copy_from_slice(b"yellowjacket");
let (a, b) = bm.split_at(6);
// Both halves share one pool buffer, so only one buffer is checked out.
assert_eq!(total_bufs - 1, BUF_POOL.num_free()?);
assert_eq!(&a[..], b"yellow");
assert_eq!(&b[..6], b"jacket");
drop((a, b));
Ok(())
}
}