use core::{
fmt,
mem::{self, MaybeUninit},
ops,
ptr::{self, NonNull},
slice,
};
use crossbeam_utils::Backoff;
use crate::{common::*, ArenaOptions};
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
use crate::{MmapOptions, OpenOptions};
#[allow(unused_imports)]
use std::boxed::Box;
mod backed;
use backed::*;
mod bytes;
pub use bytes::*;
mod object;
pub use object::*;
#[cfg(test)]
mod tests;
const OVERHEAD: usize = mem::size_of::<Header>();
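/// Maximum number of times the free-list slow path is retried before an
/// allocation fails with [`ArenaError`].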
const SLOW_RETRIES: usize = 5;
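/// Arena metadata shared by every handle: `sentinel` is the head of the
/// segment free list, `allocated` is the bump-allocation cursor, and
/// `min_segment_size` is the smallest segment accepted back into the free
/// list on deallocation.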
#[derive(Debug)]
#[repr(C)]
pub(super) struct Header {
sentinel: AtomicU64,
allocated: AtomicU32,
min_segment_size: AtomicU32,
}
impl Header {
#[inline]
fn new(size: u32, min_segment_size: u32) -> Self {
Self {
allocated: AtomicU32::new(size),
sentinel: AtomicU64::new(encode_segment_node(u32::MAX, u32::MAX)),
min_segment_size: AtomicU32::new(min_segment_size),
}
}
}
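/// Error returned when an allocation cannot be satisfied because the arena
/// is full.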
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub struct ArenaError;
impl fmt::Display for ArenaError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "allocation failed because the arena is full")
}
}
#[cfg(feature = "std")]
impl std::error::Error for ArenaError {}
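/// Offset and capacity of a successful allocation within the arena.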
struct Allocated {
offset: u32,
cap: u32,
}
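/// A lock-free arena allocator backed either by an in-memory buffer or, with
/// the `memmap` feature enabled, by a (possibly anonymous) memory map.
/// Cloning an [`Arena`] only bumps a reference count; all clones share the
/// same underlying memory.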
pub struct Arena {
write_data_ptr: NonNull<u8>,
read_data_ptr: *const u8,
header_ptr: *const u8,
ptr: *mut u8,
data_offset: u32,
inner: AtomicPtr<()>,
cap: u32,
}
impl fmt::Debug for Arena {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let header = self.header();
let allocated = header.allocated.load(Ordering::Acquire);
let data =
unsafe { slice::from_raw_parts(self.read_data_ptr, (allocated - self.data_offset) as usize) };
f.debug_struct("Arena")
.field("cap", &self.cap)
.field("header", header)
.field("data", &data)
.finish()
}
}
impl Clone for Arena {
fn clone(&self) -> Self {
unsafe {
let shared: *mut Shared = self.inner.load(Ordering::Relaxed).cast();
let old_size = (*shared).refs.fetch_add(1, Ordering::Release);
if old_size > usize::MAX >> 1 {
abort();
}
Self {
write_data_ptr: self.write_data_ptr,
read_data_ptr: self.read_data_ptr,
header_ptr: self.header_ptr,
ptr: self.ptr,
data_offset: self.data_offset,
inner: AtomicPtr::new(shared as _),
cap: self.cap,
}
}
}
}
impl Arena {
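/// Returns the number of bytes allocated so far (the current position of the
/// allocation cursor).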
#[inline]
pub fn size(&self) -> usize {
self.header().allocated.load(Ordering::Acquire) as usize
}
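/// Returns the total capacity of the arena in bytes.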
#[inline]
pub fn capacity(&self) -> usize {
self.cap as usize
}
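/// Returns the number of bytes still available for bump allocation.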
#[inline]
pub fn remaining(&self) -> usize {
(self.cap as usize).saturating_sub(self.size())
}
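/// Returns the number of [`Arena`] handles currently sharing the underlying
/// memory.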
#[inline]
pub fn refs(&self) -> usize {
unsafe {
let shared: *mut Shared = self.inner.load(Ordering::Relaxed).cast();
(*shared).refs.load(Ordering::Acquire)
}
}
#[inline]
pub(super) fn header(&self) -> &Header {
unsafe { &*(self.header_ptr as *const _) }
}
}
unsafe impl Send for Arena {}
unsafe impl Sync for Arena {}
impl Arena {
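/// Creates a new arena backed by an in-memory buffer sized and configured
/// according to `opts`.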
#[inline]
pub fn new(opts: ArenaOptions) -> Self {
Self::new_in(Shared::new_vec(
opts.capacity(),
opts.maximum_alignment(),
opts.minimum_segment_size(),
))
}
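/// Creates an arena backed by a writable memory-mapped file at `path`,
/// opened with `open_options` and mapped with `mmap_options`.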
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
#[inline]
pub fn map_mut<P: AsRef<std::path::Path>>(
path: P,
opts: ArenaOptions,
open_options: OpenOptions,
mmap_options: MmapOptions,
) -> std::io::Result<Self> {
Shared::map_mut(
path,
open_options,
mmap_options,
opts.maximum_alignment(),
opts.minimum_segment_size(),
)
.map(Self::new_in)
}
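/// Opens an existing arena from a read-only memory-mapped file at `path`.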
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
#[inline]
pub fn map<P: AsRef<std::path::Path>>(
path: P,
open_options: OpenOptions,
mmap_options: MmapOptions,
) -> std::io::Result<Self> {
Shared::map(path, open_options, mmap_options).map(Self::new_in)
}
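/// Creates an arena backed by an anonymous memory map.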
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
#[inline]
pub fn map_anon(opts: ArenaOptions, mmap_options: MmapOptions) -> std::io::Result<Self> {
Shared::map_anon(
mmap_options,
opts.maximum_alignment(),
opts.minimum_segment_size(),
)
.map(Self::new_in)
}
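/// Allocates space for a value of type `T` and returns a mutable handle to
/// the uninitialized slot. Zero-sized types consume no arena space.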
#[inline]
pub fn alloc<T>(&self) -> Result<RefMut<'_, T>, ArenaError> {
if mem::size_of::<T>() == 0 {
return Ok(RefMut::new_zst(self));
}
let allocated = self
.alloc_in::<T>()?
.expect("allocated size is not zero, but got None");
let ptr = unsafe { self.get_aligned_pointer_mut::<T>(allocated.offset as usize) };
if mem::needs_drop::<T>() {
unsafe {
let ptr: *mut MaybeUninit<T> = ptr.cast();
ptr::write(ptr, MaybeUninit::uninit());
Ok(RefMut::new(
ptr::read(ptr),
allocated.offset,
allocated.cap as usize,
self,
))
}
} else {
unsafe {
Ok(RefMut::new_inline(
NonNull::new_unchecked(ptr.cast()),
allocated.offset,
allocated.cap as usize,
self,
))
}
}
}
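/// Allocates `size` bytes and returns an owned handle to them.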
#[inline]
pub fn alloc_bytes_owned(&self, size: u32) -> Result<BytesMut, ArenaError> {
self.alloc_bytes(size).map(|mut b| b.to_owned())
}
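/// Allocates `size` bytes and returns a mutable handle borrowed from this
/// arena. Requesting zero bytes yields an empty (null) handle.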
#[inline]
pub fn alloc_bytes(&self, size: u32) -> Result<BytesRefMut, ArenaError> {
self.alloc_bytes_in(size).map(|a| match a {
None => BytesRefMut::null(self),
Some(allocated) => unsafe { BytesRefMut::new(self, allocated.cap, allocated.offset) },
})
}
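// Two-phase allocation: first try to bump the `allocated` cursor with a CAS
// loop; once the arena runs out of fresh space, fall back to the segment
// free list for up to `SLOW_RETRIES` attempts.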
fn alloc_bytes_in(&self, size: u32) -> Result<Option<Allocated>, ArenaError> {
if size == 0 {
return Ok(None);
}
let header = self.header();
let mut allocated = header.allocated.load(Ordering::Acquire);
loop {
let want = allocated + size;
if want > self.cap {
break;
}
match header.allocated.compare_exchange_weak(
allocated,
want,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(offset) => return Ok(Some(Allocated { offset, cap: size })),
Err(x) => allocated = x,
}
}
for _ in 0..SLOW_RETRIES {
match self.alloc_slow_path(size) {
Ok(bytes) => return Ok(bytes),
Err(_) => continue,
}
}
Err(ArenaError)
}
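// Pops a segment from the lock-free free list. The sentinel is first swapped
// to a marker node with size 0 so that concurrent callers back off, then
// replaced by the next node. Any unused tail of the popped segment is handed
// back to the free list via `dealloc`.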
fn alloc_slow_path(&self, size: u32) -> Result<Option<Allocated>, ArenaError> {
let backoff = Backoff::new();
let header = self.header();
loop {
let head = header.sentinel.load(Ordering::Acquire);
let (next, node_size) = decode_segment_node(head);
if next == u32::MAX && node_size == u32::MAX {
return Err(ArenaError);
}
if node_size == 0 {
backoff.snooze();
continue;
}
if size > node_size {
return Err(ArenaError);
}
let removed_head = encode_segment_node(next, 0);
if header
.sentinel
.compare_exchange_weak(head, removed_head, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
backoff.snooze();
continue;
}
let next_node = unsafe { self.get_segment_node(next) };
let next_node_val = next_node.load(Ordering::Acquire);
match header.sentinel.compare_exchange(
removed_head,
next_node_val,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
self.dealloc(next + size, node_size - size);
return Ok(Some(Allocated {
offset: next,
cap: size,
}));
}
Err(current_sentinel) => {
let (_, size) = decode_segment_node(current_sentinel);
if size == 0 {
backoff.snooze();
continue;
}
backoff.spin();
}
}
}
}
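// Like `alloc_bytes_in`, but reserves enough space to place a `T` at its
// required alignment starting from the current cursor.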
fn alloc_in<T>(&self) -> Result<Option<Allocated>, ArenaError> {
if mem::size_of::<T>() == 0 {
return Ok(None);
}
let header = self.header();
let mut allocated = header.allocated.load(Ordering::Acquire);
unsafe {
loop {
let ptr = self.get_pointer(allocated as usize);
let aligned_offset = ptr.align_offset(mem::align_of::<T>()) as u32;
let size = aligned_offset + mem::size_of::<T>() as u32;
let want = allocated + size;
if want > self.cap {
break;
}
match header.allocated.compare_exchange_weak(
allocated,
want,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(offset) => return Ok(Some(Allocated { offset, cap: size })),
Err(x) => allocated = x,
}
}
}
for _ in 0..SLOW_RETRIES {
match self.alloc_slow_path(Self::pad::<T>() as u32) {
Ok(bytes) => return Ok(bytes),
Err(_) => continue,
}
}
Err(ArenaError)
}
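// Returns a segment to the arena: the bytes are zeroed, a free-list node is
// written at the segment's aligned start, and the node is linked into the
// size-ordered free list with a CAS on its predecessor (or the sentinel).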
fn dealloc(&self, offset: u32, size: u32) {
if !self.validate_segment(offset, size) {
return;
}
let backoff = Backoff::new();
unsafe {
let ptr = self.write_data_ptr.as_ptr().add(offset as usize);
ptr::write_bytes(ptr, 0, size as usize);
let header = self.header();
loop {
let (prev, next) = self.find_free_list_position(size);
let prev_node = prev
.map(|p| self.get_segment_node(p))
.unwrap_or(&header.sentinel);
let next_node_offset = next.unwrap_or(u32::MAX);
self.write_segment_node(next_node_offset, offset, size);
let prev_node_val = prev_node.load(Ordering::Acquire);
let (_, prev_node_size) = decode_segment_node(prev_node_val);
if prev_node_size == 0 {
backoff.snooze();
continue;
}
match prev_node.compare_exchange(
prev_node_val,
encode_segment_node(offset, size),
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(current_prev) => {
let (_, size) = decode_segment_node(current_prev);
if size == 0 {
backoff.snooze();
continue;
}
backoff.spin();
}
}
}
}
}
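// A segment is only recycled if it can hold a free-list node after alignment
// and is at least `min_segment_size` bytes long.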
#[inline]
fn validate_segment(&self, offset: u32, size: u32) -> bool {
unsafe {
let ptr = self.write_data_ptr.as_ptr().add(offset as usize);
let aligned_offset = ptr.align_offset(mem::align_of::<AtomicU64>());
let want = aligned_offset + mem::size_of::<AtomicU64>() + mem::size_of::<u32>();
if want >= size as usize {
return false;
}
if size < self.header().min_segment_size.load(Ordering::Acquire) {
return false;
}
true
}
}
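// Walks the free list (ordered by segment size, largest first) and returns
// the offsets of the nodes between which a segment of `val` bytes should be
// inserted; `None` stands for the sentinel or the end of the list.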
fn find_free_list_position(&self, val: u32) -> (Option<u32>, Option<u32>) {
let header = self.header();
let mut current = &header.sentinel;
let mut prev = 0;
let backoff = Backoff::new();
loop {
let current_node = current.load(Ordering::Acquire);
let (current_next, current_node_size) = decode_segment_node(current_node);
if current_next == u32::MAX {
if prev == 0 {
return (None, None);
}
return (Some(prev), None);
}
if current_node_size == 0 {
backoff.snooze();
continue;
}
if val >= current_node_size {
if prev == 0 {
return (None, Some(current_next));
}
return (Some(prev), Some(current_next));
}
let next = unsafe { self.get_segment_node(current_next) };
prev = current_next;
current = next;
backoff.spin();
}
}
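// Reinterprets the aligned bytes at `offset` as the free-list node stored
// there.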
unsafe fn get_segment_node(&self, offset: u32) -> &AtomicU64 {
let ptr = self.read_data_ptr.add(offset as usize);
let aligned_offset = ptr.align_offset(mem::align_of::<AtomicU64>());
let ptr = ptr.add(aligned_offset);
&*(ptr as *const _)
}
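// Writes a free-list node (`next` offset and segment `size`) at the aligned
// start of the segment at `offset`, returning `offset` unchanged.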
unsafe fn write_segment_node(&self, next: u32, offset: u32, size: u32) -> u32 {
let ptr = self.write_data_ptr.as_ptr().add(offset as usize);
let aligned_offset = ptr.align_offset(mem::align_of::<AtomicU64>());
let ptr = ptr.add(aligned_offset);
let node = &*(ptr as *const AtomicU64);
node.store(encode_segment_node(next, size), Ordering::Release);
offset
}
#[inline]
fn new_in(mut shared: Shared) -> Self {
let read_data_ptr = shared.as_ptr();
let header_ptr = shared.header_ptr();
let ptr = shared.null_mut();
let write_data_ptr = shared
.as_mut_ptr()
.map(|p| unsafe { NonNull::new_unchecked(p) })
.unwrap_or_else(NonNull::dangling);
Self {
cap: shared.cap(),
write_data_ptr,
read_data_ptr,
header_ptr,
ptr,
data_offset: shared.data_offset as u32,
inner: AtomicPtr::new(Box::into_raw(Box::new(shared)) as _),
}
}
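/// Flushes outstanding modifications of a memory-mapped arena to disk.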
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
pub fn flush(&self) -> std::io::Result<()> {
let shared = self.inner.load(Ordering::Acquire);
{
let shared: *mut Shared = shared.cast();
unsafe { (*shared).flush() }
}
}
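/// Asynchronously flushes outstanding modifications of a memory-mapped arena
/// to disk.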
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
pub fn flush_async(&self) -> std::io::Result<()> {
let shared = self.inner.load(Ordering::Acquire);
{
let shared: *mut Shared = shared.cast();
unsafe { (*shared).flush_async() }
}
}
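// Worst-case number of bytes needed to store a `T` at its required alignment
// starting from an arbitrary offset.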
#[inline]
fn pad<T>() -> usize {
let size = mem::size_of::<T>();
let align = mem::align_of::<T>();
size + align - 1
}
#[inline]
pub(super) const unsafe fn get_bytes(&self, offset: usize, size: usize) -> &[u8] {
if offset == 0 {
return &[];
}
let ptr = self.get_pointer(offset);
slice::from_raw_parts(ptr, size)
}
#[allow(clippy::mut_from_ref)]
#[inline]
pub(super) unsafe fn get_bytes_mut(&self, offset: usize, size: usize) -> &mut [u8] {
if offset == 0 {
return &mut [];
}
let ptr = self.get_pointer_mut(offset);
slice::from_raw_parts_mut(ptr, size)
}
#[inline]
pub(super) const unsafe fn get_pointer(&self, offset: usize) -> *const u8 {
if offset == 0 {
return self.ptr;
}
self.read_data_ptr.add(offset)
}
#[inline]
pub(super) unsafe fn get_pointer_mut(&self, offset: usize) -> *mut u8 {
if offset == 0 {
return self.ptr;
}
self.write_data_ptr.as_ptr().add(offset)
}
#[inline]
pub(super) unsafe fn get_aligned_pointer_mut<T>(&self, offset: usize) -> *mut T {
if offset == 0 {
return ptr::null_mut();
}
let ptr = self.write_data_ptr.as_ptr().add(offset);
let aligned_offset = ptr.align_offset(mem::align_of::<T>());
ptr.add(aligned_offset) as *mut T
}
}
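// Dropping the last handle frees the shared state. The `Release` decrement
// paired with the `Acquire` load below guarantees that all prior writes are
// visible before the backing memory is unmounted.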
impl Drop for Arena {
fn drop(&mut self) {
unsafe {
self.inner.with_mut(|shared| {
let shared: *mut Shared = shared.cast();
if (*shared).refs.fetch_sub(1, Ordering::Release) != 1 {
return;
}
(*shared).refs.load(Ordering::Acquire);
let mut shared = Box::from_raw(shared);
shared.unmount();
});
}
}
}
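/// Wraps an arbitrary error in an `std::io::Error` of kind `InvalidData`.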
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
fn invalid_data<E: std::error::Error + Send + Sync + 'static>(e: E) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::InvalidData, e)
}
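// A free-list node is packed into a single `u64`: the high 32 bits hold the
// offset of the next segment and the low 32 bits hold the segment size.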
#[inline]
const fn decode_segment_node(val: u64) -> (u32, u32) {
((val >> 32) as u32, val as u32)
}
#[inline]
const fn encode_segment_node(next: u32, size: u32) -> u64 {
((next as u64) << 32) | size as u64
}
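// Aborts the process. Without `std`, this is emulated by panicking while a
// panic is already unwinding (a double panic), which forces an abort.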
#[inline(never)]
#[cold]
fn abort() -> ! {
#[cfg(feature = "std")]
{
std::process::abort()
}
#[cfg(not(feature = "std"))]
{
struct Abort;
impl Drop for Abort {
fn drop(&mut self) {
panic!();
}
}
let _a = Abort;
panic!("abort");
}
}