use std::alloc::{alloc, alloc_zeroed, dealloc, Layout};
use std::ptr::NonNull;
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
use std::sync::Arc;
use crate::layout::PAGE_SIZE;
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
use super::buffer_pool::{BlobBufPool, BlobBufPoolInner};
pub const BUF_ALIGN: usize = 4096;
pub(crate) type FixedBufferIndex = u16;
pub struct AlignedBlobBuf {
ptr: NonNull<u8>,
owner: BlobBufOwner,
}
enum BlobBufOwner {
Heap,
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
Pool {
pool: Arc<BlobBufPoolInner>,
index: FixedBufferIndex,
},
}
impl AlignedBlobBuf {
#[must_use]
pub fn zeroed() -> Self {
let layout = Self::layout();
let raw = unsafe { alloc_zeroed(layout) };
let ptr = NonNull::new(raw).unwrap_or_else(|| std::alloc::handle_alloc_error(layout));
Self {
ptr,
owner: BlobBufOwner::Heap,
}
}
#[must_use]
pub fn uninit() -> Self {
let layout = Self::layout();
let raw = unsafe { alloc(layout) };
let ptr = NonNull::new(raw).unwrap_or_else(|| std::alloc::handle_alloc_error(layout));
Self {
ptr,
owner: BlobBufOwner::Heap,
}
}
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
#[must_use]
pub(crate) fn pooled_uninit(pool: &BlobBufPool) -> Option<Self> {
let index = pool.inner.alloc_slot()?;
let ptr = pool.inner.ptr_for_index(index);
Some(Self {
ptr,
owner: BlobBufOwner::Pool {
pool: Arc::clone(&pool.inner),
index,
},
})
}
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
#[must_use]
pub(crate) fn pooled_zeroed(pool: &BlobBufPool) -> Option<Self> {
let mut out = Self::pooled_uninit(pool)?;
out.as_mut_slice().fill(0);
Some(out)
}
#[must_use]
pub(crate) fn fixed_buffer_index(&self) -> Option<FixedBufferIndex> {
match &self.owner {
BlobBufOwner::Heap => None,
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
BlobBufOwner::Pool { index, .. } => Some(*index),
}
}
#[must_use]
pub(crate) fn zeroed_like(&self) -> Self {
match &self.owner {
BlobBufOwner::Heap => Self::zeroed(),
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
BlobBufOwner::Pool { pool, .. } => {
let wrapper = BlobBufPool {
inner: Arc::clone(pool),
};
Self::pooled_zeroed(&wrapper).unwrap_or_else(Self::zeroed)
}
}
}
#[must_use]
pub fn as_ptr(&self) -> *const u8 {
self.ptr.as_ptr()
}
#[must_use]
pub fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr.as_ptr()
}
#[must_use]
pub fn as_slice(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), PAGE_SIZE as usize) }
}
#[must_use]
pub fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), PAGE_SIZE as usize) }
}
#[must_use]
pub const fn len(&self) -> usize {
PAGE_SIZE as usize
}
#[must_use]
pub const fn is_empty(&self) -> bool {
false
}
fn layout() -> Layout {
Layout::from_size_align(PAGE_SIZE as usize, BUF_ALIGN)
.expect("PAGE_SIZE/BUF_ALIGN both > 0 and BUF_ALIGN is a power of two")
}
}
impl Drop for AlignedBlobBuf {
fn drop(&mut self) {
match &self.owner {
BlobBufOwner::Heap => unsafe { dealloc(self.ptr.as_ptr(), Self::layout()) },
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
BlobBufOwner::Pool { pool, index } => pool.free_slot(*index),
}
}
}
impl Default for AlignedBlobBuf {
fn default() -> Self {
Self::zeroed()
}
}
impl Clone for AlignedBlobBuf {
fn clone(&self) -> Self {
let mut out = match &self.owner {
BlobBufOwner::Heap => Self::uninit(),
#[cfg(any(test, all(target_os = "linux", feature = "io-uring")))]
BlobBufOwner::Pool { pool, .. } => {
let wrapper = BlobBufPool {
inner: Arc::clone(pool),
};
Self::pooled_uninit(&wrapper).unwrap_or_else(Self::uninit)
}
};
out.as_mut_slice().copy_from_slice(self.as_slice());
out
}
}
impl std::fmt::Debug for AlignedBlobBuf {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"AlignedBlobBuf({:p}, {} B, fixed={:?})",
self.ptr.as_ptr(),
PAGE_SIZE,
self.fixed_buffer_index(),
)
}
}
unsafe impl Send for AlignedBlobBuf {}
unsafe impl Sync for AlignedBlobBuf {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn zeroed_is_zeroed() {
let b = AlignedBlobBuf::zeroed();
assert_eq!(b.len(), PAGE_SIZE as usize);
assert!(b.as_slice().iter().all(|&x| x == 0));
}
#[test]
fn pointer_is_4k_aligned() {
for _ in 0..16 {
let b = AlignedBlobBuf::zeroed();
assert_eq!(b.as_ptr() as usize % BUF_ALIGN, 0);
}
}
#[test]
fn clone_is_independent_memcpy() {
let mut a = AlignedBlobBuf::zeroed();
a.as_mut_slice()[100] = 0xAB;
let mut b = a.clone();
assert_eq!(b.as_slice()[100], 0xAB);
b.as_mut_slice()[100] = 0xCD;
assert_eq!(a.as_slice()[100], 0xAB, "clone must not alias source");
assert_eq!(b.as_slice()[100], 0xCD);
}
#[test]
fn pooled_buffer_returns_fixed_index() {
let pool = BlobBufPool::new(2).unwrap();
let a = AlignedBlobBuf::pooled_zeroed(&pool).unwrap();
let b = AlignedBlobBuf::pooled_zeroed(&pool).unwrap();
assert_eq!(a.len(), PAGE_SIZE as usize);
assert!(a.fixed_buffer_index().is_some());
assert!(b.fixed_buffer_index().is_some());
assert_ne!(a.fixed_buffer_index(), b.fixed_buffer_index());
assert!(AlignedBlobBuf::pooled_uninit(&pool).is_none());
drop(a);
assert!(AlignedBlobBuf::pooled_uninit(&pool).is_some());
}
#[test]
fn pooled_buffer_free_list_survives_concurrent_churn() {
let pool = BlobBufPool::new(8).unwrap();
std::thread::scope(|scope| {
for _ in 0..8 {
let pool = pool.clone();
scope.spawn(move || {
for _ in 0..1000 {
let mut b = loop {
if let Some(b) = AlignedBlobBuf::pooled_uninit(&pool) {
break b;
}
std::hint::spin_loop();
};
b.as_mut_slice()[0] = 0x7B;
}
});
}
});
let leased: Vec<_> = (0..8)
.map(|_| AlignedBlobBuf::pooled_uninit(&pool).unwrap())
.collect();
assert!(AlignedBlobBuf::pooled_uninit(&pool).is_none());
drop(leased);
assert!(AlignedBlobBuf::pooled_uninit(&pool).is_some());
}
#[test]
fn uninit_is_writable() {
let mut b = AlignedBlobBuf::uninit();
b.as_mut_slice().fill(0x42);
assert!(b.as_slice().iter().all(|&x| x == 0x42));
}
#[test]
fn default_equals_zeroed() {
let b = AlignedBlobBuf::default();
assert!(b.as_slice().iter().all(|&x| x == 0));
}
}