use std::mem::MaybeUninit;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU16, AtomicUsize};
use std::sync::Arc;
use crate::common::BLOCK_PER_PAGE;
use crate::block::Block;
use crate::page::shared_arena::{PageSharedArena, drop_page};
use crate::{ArenaArc, ArenaBox, ArenaRc};
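/// A thread-safe arena allocator.
///
/// Memory is carved into pages of `BLOCK_PER_PAGE` blocks. Pages with free
/// blocks sit on `free_list`, pages that regain free blocks while off that
/// list queue themselves on `pending_free_list`, and `full_list` tracks every
/// page the arena owns. A single `writer` flag serializes structural changes
/// (growing, promoting pending pages), while block allocation itself is
/// lock-free.
///
/// Minimal usage sketch (`shared_arena` is an assumed crate name, so the
/// example is not compiled as a doc test):
///
/// ```ignore
/// use shared_arena::SharedArena;
///
/// let arena = SharedArena::<usize>::new();
/// let value = arena.alloc(42);  // ArenaBox<usize>
/// assert_eq!(*value, 42);
/// drop(value);                  // the block returns to its page
/// arena.shrink_to_fit();        // release pages with no live blocks
/// ```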
pub struct SharedArena<T: Sized> {
    /// Pages that still have at least one free block, ready for allocation.
    free_list: AtomicPtr<PageSharedArena<T>>,
    /// Pages that re-add themselves here once some of their blocks are freed;
    /// drained into `free_list` by the writer. Shared with the pages via `Arc`.
    pending_free_list: Arc<AtomicPtr<PageSharedArena<T>>>,
    /// Every page owned by the arena, regardless of occupancy.
    full_list: AtomicPtr<PageSharedArena<T>>,
    /// Number of pages currently owned by the arena.
    npages: AtomicUsize,
    /// Flag serializing structural changes (growing, promoting pending pages).
    writer: AtomicBool,
    /// Set while a `shrink_to_fit` is in progress.
    shrinking: AtomicBool,
    /// Pages removed by `shrink_to_fit`, parked until they are dropped or reused.
    to_free: AtomicPtr<Vec<NonNull<PageSharedArena<T>>>>,
    /// Incremented on allocation slow paths; once it reaches
    /// `DELAY_DROP_SHRINK`, the pages parked in `to_free` are finally dropped.
    to_free_delay: AtomicU16,
}
unsafe impl<T: Sized> Send for SharedArena<T> {}
unsafe impl<T: Sized> Sync for SharedArena<T> {}
const DELAY_DROP_SHRINK: u16 = 10;
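/// RAII guard over the arena's `writer` flag: `new` tries to take the flag
/// once, `new_blocking` spins (yielding) until it succeeds, and dropping the
/// guard releases the flag.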
struct WriterGuard<'a> {
writer: &'a AtomicBool
}
impl WriterGuard<'_> {
fn new(writer: &AtomicBool) -> Option<WriterGuard> {
if !writer.load(Relaxed) && !writer.swap(true, AcqRel) {
Some(WriterGuard { writer })
} else {
None
}
}
fn new_blocking(writer: &AtomicBool) -> WriterGuard {
loop {
if !writer.swap(true, AcqRel) {
return WriterGuard { writer }
}
std::thread::yield_now();
}
}
}
impl Drop for WriterGuard<'_> {
fn drop(&mut self) {
self.writer.store(false, Release);
}
}
impl<T: Sized> SharedArena<T> {
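    /// Links a freshly built chain of pages into both `full_list` and
    /// `free_list`. Callers hold the writer flag and only call this while
    /// `free_list` is empty, which is what the asserts check.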
fn put_pages_in_lists(
&self,
npages: usize,
first: NonNull<PageSharedArena<T>>,
mut last: NonNull<PageSharedArena<T>>
) {
let first_ptr = first.as_ptr();
let last_ref = unsafe { last.as_mut() };
let current = self.full_list.load(Relaxed);
last_ref.next = AtomicPtr::new(current);
let old = self.full_list.swap(first_ptr, AcqRel);
assert_eq!(current, old);
let current = self.free_list.load(Relaxed);
assert!(current.is_null(), "Arena.free isn't null");
let old = self.free_list.swap(first_ptr, AcqRel);
assert!(old.is_null(), "Arena.free2 isn't null");
self.npages.fetch_add(npages, Relaxed);
}
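    /// Grows the arena by allocating as many new pages as it already owns
    /// (at least one, capped at 900_000), effectively doubling its capacity.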
fn alloc_new_page(&self) {
let to_allocate = self.npages
.load(Relaxed)
.max(1)
.min(900_000);
let (first, last) = PageSharedArena::make_list(to_allocate, &self.pending_free_list);
self.put_pages_in_lists(to_allocate, first, last);
}
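    /// Advances the deferred-free counter; the call that makes it reach
    /// `DELAY_DROP_SHRINK` drops the pages parked in `to_free` by a previous
    /// `shrink_to_fit`.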
fn maybe_free_pages(&self) {
if self.to_free_delay.load(Relaxed) < DELAY_DROP_SHRINK {
let old = self.to_free_delay.fetch_add(1, AcqRel);
if old == DELAY_DROP_SHRINK - 1 {
let to_free = self.to_free.swap(std::ptr::null_mut(), AcqRel);
if let Some(to_free) = unsafe { to_free.as_mut() } {
let to_free = unsafe { Box::from_raw(to_free) };
for page in &*to_free {
drop_page(page.as_ptr());
}
}
}
}
}
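    /// Allocation path: pops a free block from the pages on `free_list`,
    /// unlinking pages that are exhausted. When the list runs dry, whichever
    /// thread wins the writer flag either promotes `pending_free_list`, puts
    /// parked `to_free` pages back into service, or allocates new pages; the
    /// other threads yield and retry.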
fn find_place(&self) -> NonNull<Block<T>> {
loop {
while let Some(page) = unsafe { self.free_list.load(Acquire).as_mut() } {
if let Some(block) = page.acquire_free_block() {
return block;
}
let next = page.next_free.load(Acquire);
if self.free_list.compare_exchange(page, next, AcqRel, Relaxed).is_ok() {
page.in_free_list.store(false, Release);
}
}
if let Some(_guard) = WriterGuard::new(&self.writer) {
if self.free_list.load(Acquire).is_null() {
let pending = self.pending_free_list.load(Relaxed);
if !pending.is_null() {
let pending = self.pending_free_list.swap(std::ptr::null_mut(), AcqRel);
let old = self.free_list.swap(pending, Release);
assert!(old.is_null());
self.maybe_free_pages();
} else if !self.to_free.load(Relaxed).is_null() {
self.take_pages_to_be_freed();
} else {
self.alloc_new_page();
}
}
continue;
            }
if self.free_list.load(Relaxed).is_null() {
std::thread::yield_now();
} else {
self.maybe_free_pages();
}
}
}
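    /// Reuses pages that a previous `shrink_to_fit` parked in `to_free`
    /// instead of allocating new ones: up to `npages` of them are re-linked
    /// into the lists, the rest stay queued for deferred freeing.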
fn take_pages_to_be_freed(&self) {
if let Some(to_free) = unsafe {
self.to_free.swap(std::ptr::null_mut(), AcqRel).as_mut()
} {
let mut to_free = unsafe { Box::from_raw(to_free) };
let npages = self.npages.load(Relaxed).max(1);
let truncate_at = to_free.len().saturating_sub(npages);
let to_reinsert = &to_free[truncate_at..];
let (first, last) = PageSharedArena::make_list_from_slice(&to_reinsert);
self.put_pages_in_lists(to_reinsert.len(), first, last);
if truncate_at != 0 {
to_free.truncate(truncate_at);
let old = self.to_free.swap(Box::into_raw(to_free), Release);
assert!(old.is_null());
self.to_free_delay.store(0, Relaxed);
} else {
self.to_free_delay.store(DELAY_DROP_SHRINK, Relaxed);
}
}
}
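    /// Creates an arena with room for at least `cap` values, rounded up to
    /// whole pages of `BLOCK_PER_PAGE` blocks each.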
pub fn with_capacity(cap: usize) -> SharedArena<T> {
let npages = ((cap.max(1) - 1) / BLOCK_PER_PAGE) + 1;
let pending_free = Arc::new(AtomicPtr::new(std::ptr::null_mut()));
let (first, _) = PageSharedArena::make_list(npages, &pending_free);
SharedArena {
npages: AtomicUsize::new(npages),
free_list: AtomicPtr::new(first.as_ptr()),
pending_free_list: pending_free,
full_list: AtomicPtr::new(first.as_ptr()),
writer: AtomicBool::new(false),
shrinking: AtomicBool::new(false),
to_free: AtomicPtr::new(std::ptr::null_mut()),
to_free_delay: AtomicU16::new(DELAY_DROP_SHRINK)
}
}
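    /// Creates an arena with a single page's worth of capacity
    /// (`BLOCK_PER_PAGE` blocks).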
pub fn new() -> SharedArena<T> {
SharedArena::with_capacity(BLOCK_PER_PAGE)
}
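    /// Moves `value` into the arena and returns an `ArenaBox` owning it; the
    /// block is handed back to its page when the box is dropped.
    ///
    /// Sketch (mirrors the `alloc_fns` test below; not compiled as a doc test):
    ///
    /// ```ignore
    /// let arena = SharedArena::<usize>::new();
    /// let n = arena.alloc(104);
    /// assert_eq!(*n, 104);
    /// ```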
pub fn alloc(&self, value: T) -> ArenaBox<T> {
let block = self.find_place();
unsafe {
let ptr = block.as_ref().value.get();
ptr.write(value);
}
ArenaBox::new(block)
}
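    /// Allocates a block and lets `initializer` write the value in place. The
    /// closure receives the uninitialized slot and must return a reference to
    /// that same slot, otherwise this panics; fully initializing the value is
    /// the caller's responsibility.
    ///
    /// Sketch, following the pattern used by the tests in this module:
    ///
    /// ```ignore
    /// let arena = SharedArena::<usize>::new();
    /// let one = arena.alloc_with(|uninit| unsafe {
    ///     let ptr = uninit.as_mut_ptr();
    ///     ptr.write(1);   // initialize the slot in place
    ///     &*ptr           // return a reference to the same slot
    /// });
    /// assert_eq!(*one, 1);
    /// ```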
pub fn alloc_with<F>(&self, initializer: F) -> ArenaBox<T>
where
F: Fn(&mut MaybeUninit<T>) -> &T
{
let block = self.find_place();
let result = ArenaBox::new(block);
unsafe {
let ptr = block.as_ref().value.get();
let reference = initializer(&mut *(ptr as *mut std::mem::MaybeUninit<T>));
assert_eq!(
                ptr as *const T,
                reference as *const T,
                "`initializer` must return a reference to its argument"
);
}
result
}
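    /// Like `alloc`, but returns an `ArenaArc`, the `Arc`-like counterpart of
    /// `ArenaBox`.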
pub fn alloc_arc(&self, value: T) -> ArenaArc<T> {
let block = self.find_place();
unsafe {
let ptr = block.as_ref().value.get();
ptr.write(value);
}
ArenaArc::new(block)
}
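    /// Same in-place initialization contract as `alloc_with`, returning an
    /// `ArenaArc`.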
pub fn alloc_arc_with<F>(&self, initializer: F) -> ArenaArc<T>
where
F: Fn(&mut MaybeUninit<T>) -> &T
{
let block = self.find_place();
let result = ArenaArc::new(block);
unsafe {
let ptr = block.as_ref().value.get();
let reference = initializer(&mut *(ptr as *mut std::mem::MaybeUninit<T>));
assert_eq!(
                ptr as *const T,
                reference as *const T,
                "`initializer` must return a reference to its argument"
);
}
result
}
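    /// Like `alloc`, but returns an `ArenaRc`, the `Rc`-like counterpart of
    /// `ArenaBox`.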
pub fn alloc_rc(&self, value: T) -> ArenaRc<T> {
let block = self.find_place();
unsafe {
let ptr = block.as_ref().value.get();
ptr.write(value);
}
ArenaRc::new(block)
}
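    /// Same in-place initialization contract as `alloc_with`, returning an
    /// `ArenaRc`.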
pub fn alloc_rc_with<F>(&self, initializer: F) -> ArenaRc<T>
where
F: Fn(&mut MaybeUninit<T>) -> &T
{
let block = self.find_place();
let result = ArenaRc::new(block);
unsafe {
let ptr = block.as_ref().value.get();
let reference = initializer(&mut *(ptr as *mut std::mem::MaybeUninit<T>));
assert_eq!(
                ptr as *const T,
                reference as *const T,
                "`initializer` must return a reference to its argument"
);
}
result
}
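    /// Takes the writer flag (spinning until it is free), unlinks every page
    /// whose blocks are all free, and parks them in `to_free`; they are either
    /// reused if the arena needs to grow again or dropped after
    /// `DELAY_DROP_SHRINK` further allocation slow paths. Returns `false` if
    /// another shrink is already in progress.
    ///
    /// Sketch (mirrors the `arena_shrink` test; block counts assume a 64-bit
    /// target where a page holds 63 blocks):
    ///
    /// ```ignore
    /// let arena = SharedArena::<usize>::with_capacity(1000);
    /// assert_eq!(arena.stats(), (0, 1008)); // 16 pages * 63 blocks
    /// arena.shrink_to_fit();
    /// assert_eq!(arena.stats(), (0, 0));    // every empty page was reclaimed
    /// ```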
pub fn shrink_to_fit(&self) -> bool {
if self.shrinking.swap(true, AcqRel) {
return false;
}
let _guard = WriterGuard::new_blocking(&self.writer);
        // Detach the whole free list while the writer flag is held, then walk
        // it and unlink every page whose blocks are all free.
        let mut current: &AtomicPtr<PageSharedArena<T>> =
            &AtomicPtr::new(self.free_list.swap(std::ptr::null_mut(), AcqRel));
        let start = current;
let mut to_drop = Vec::with_capacity(self.npages.load(Relaxed));
while let Some(current_value) = unsafe { current.load(Relaxed).as_mut() } {
            let next = &current_value.next_free;
let next_value = next.load(Acquire);
if current_value.bitfield.load(Acquire) == !0 {
if current.compare_exchange(
current_value as *const _ as *mut _, next_value, AcqRel, Relaxed
).is_ok() {
let ptr = current_value as *const _ as *mut PageSharedArena<T>;
to_drop.push(NonNull::new(ptr).unwrap());
}
} else {
current = next;
}
}
to_drop.retain(|page| {
let page_ref = unsafe { page.as_ref() };
page_ref.bitfield.load(Acquire) == !0
});
let mut current: &AtomicPtr<PageSharedArena<T>> = &self.full_list;
while let Some(current_value) = unsafe { current.load(Relaxed).as_mut() } {
let ptr = unsafe { NonNull::new_unchecked(current_value) };
            let next = &current_value.next;
let next_value = next.load(Relaxed);
if to_drop.contains(&ptr) {
current.compare_exchange(
current_value as *const _ as *mut _, next_value, AcqRel, Relaxed
).expect("Something went wrong in shrinking.");
} else {
current = next;
}
}
let nfreed = to_drop.len();
if nfreed != 0 {
self.to_free_delay.store(0, Release);
if let Some(to_free) = unsafe { self.to_free.swap(std::ptr::null_mut(), AcqRel).as_mut() } {
to_free.append(&mut to_drop);
let old = self.to_free.swap(to_free, AcqRel);
assert!(old.is_null());
} else {
let ptr = Box::new(to_drop);
let old = self.to_free.swap(Box::into_raw(ptr), AcqRel);
assert!(old.is_null());
}
}
let old = self.free_list.swap(start.load(Relaxed), Release);
assert!(old.is_null(), "OLD NOT NULL");
self.npages.fetch_sub(nfreed, Release);
self.shrinking.store(false, Release);
true
}
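    /// Returns `(used, free)` block counts, computed by walking the free and
    /// pending lists.
    ///
    /// ```ignore
    /// let arena = SharedArena::<usize>::new();
    /// let _one = arena.alloc(1);
    /// let (used, _free) = arena.stats();
    /// assert_eq!(used, 1);
    /// ```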
pub fn stats(&self) -> (usize, usize) {
let mut next = self.free_list.load(Relaxed);
let mut free = 0;
while let Some(next_ref) = unsafe { next.as_mut() } {
let next_next = next_ref.next_free.load(Relaxed);
free += next_ref.bitfield.load(Relaxed).count_ones() as usize - 1;
next = next_next;
}
let mut next = self.pending_free_list.load(Relaxed);
while let Some(next_ref) = unsafe { next.as_mut() } {
let next_next = next_ref.next_free.load(Relaxed);
free += next_ref.bitfield.load(Relaxed).count_ones() as usize - 1;
next = next_next;
}
let used = (self.npages.load(Relaxed) * BLOCK_PER_PAGE) - free;
(used, free)
}
    #[cfg(target_pointer_width = "64")]
#[cfg(test)]
pub(crate) fn size_lists(&self) -> (usize, usize, usize) {
let mut next = self.full_list.load(Relaxed);
let mut size = 0;
while let Some(next_ref) = unsafe { next.as_mut() } {
next = next_ref.next.load(Relaxed);
size += 1;
}
let mut next = self.free_list.load(Relaxed);
let mut free = 0;
while let Some(next_ref) = unsafe { next.as_mut() } {
next = next_ref.next_free.load(Relaxed);
free += 1;
}
let mut next = self.pending_free_list.load(Relaxed);
let mut pending = 0;
while let Some(next_ref) = unsafe { next.as_mut() } {
next = next_ref.next_free.load(Relaxed);
pending += 1;
}
(size, free, pending)
}
#[allow(dead_code)]
#[cfg(test)]
pub(crate) fn display_list(&self) {
let mut full = vec![];
let mut next = self.full_list.load(Relaxed);
while let Some(next_ref) = unsafe { next.as_mut() } {
full.push(next);
next = next_ref.next.load(Relaxed);
}
let mut list_free = vec![];
let mut next = self.free_list.load(Relaxed);
while let Some(next_ref) = unsafe { next.as_mut() } {
list_free.push(next);
next = next_ref.next_free.load(Relaxed);
}
println!("FULL {} {:#?}", full.len(), full);
println!("FREE {} {:#?}", list_free.len(), list_free);
}
}
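// Dropping the arena releases any pages parked in `to_free` and then every
// page on `full_list`, which owns them all.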
impl<T> Drop for SharedArena<T> {
fn drop(&mut self) {
if let Some(to_free) = unsafe { self.to_free.load(Relaxed).as_mut() } {
let to_free = unsafe { Box::from_raw(to_free) };
for page in &*to_free {
drop_page(page.as_ptr());
}
}
let mut next = self.full_list.load(Relaxed);
while let Some(next_ref) = unsafe { next.as_mut() } {
let next_next = next_ref.next.load(Relaxed);
drop_page(next);
next = next_next;
}
}
}
impl<T: Sized> Default for SharedArena<T> {
fn default() -> SharedArena<T> {
SharedArena::new()
}
}
impl<T> std::fmt::Debug for SharedArena<T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
struct Page {
free: usize,
used: usize,
}
impl std::fmt::Debug for Page {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Page {{ free: {} used: {} }}", self.free, self.used)
}
}
let npages = self.npages.load(Relaxed);
let mut vec = Vec::with_capacity(npages);
let mut next = self.full_list.load(Relaxed);
while let Some(next_ref) = unsafe { next.as_mut() } {
let used = next_ref.bitfield.load(Relaxed).count_zeros() as usize;
vec.push(Page {
used,
free: BLOCK_PER_PAGE - used
});
next = next_ref.next.load(Relaxed);
}
let blocks_used: usize = vec.iter().map(|p| p.used).sum();
let blocks_free: usize = vec.iter().map(|p| p.free).sum();
f.debug_struct("SharedArena")
.field("blocks_free", &blocks_free)
.field("blocks_used", &blocks_used)
.field("npages", &npages)
.field("pages", &vec)
.finish()
}
}
#[allow(dead_code)]
fn arena_fail() {}
#[cfg(test)]
mod tests {
use super::SharedArena;
use std::mem::MaybeUninit;
use std::ptr;
    #[cfg(target_pointer_width = "64")]
#[test]
fn arena_shrink() {
let arena = SharedArena::<usize>::with_capacity(1000);
assert_eq!(arena.stats(), (0, 1008));
arena.shrink_to_fit();
assert_eq!(arena.stats(), (0, 0));
}
    #[cfg(target_pointer_width = "64")]
#[test]
fn arena_shrink2() {
let arena = SharedArena::<usize>::with_capacity(1000);
let _a = arena.alloc(1);
arena.shrink_to_fit();
assert_eq!(arena.stats(), (1, 62));
let _a = arena.alloc(1);
arena.shrink_to_fit();
assert_eq!(arena.stats(), (2, 61));
let mut values = Vec::with_capacity(64);
for _ in 0..64 {
values.push(arena.alloc(1));
}
assert_eq!(arena.stats(), (66, 60));
arena.shrink_to_fit();
assert_eq!(arena.stats(), (66, 60));
std::mem::drop(values);
assert_eq!(arena.stats(), (2, 124));
arena.shrink_to_fit();
assert_eq!(arena.stats(), (2, 61));
}
    #[cfg(target_pointer_width = "64")]
#[test]
fn arena_size() {
let arena = SharedArena::<usize>::with_capacity(1000);
assert_eq!(arena.size_lists(), (16, 16, 0));
let a = arena.alloc(1);
assert_eq!(arena.size_lists(), (16, 16, 0));
let mut values = Vec::with_capacity(539);
for _ in 0..539 {
values.push(arena.alloc(1));
}
assert_eq!(arena.size_lists(), (16, 8, 0));
arena.shrink_to_fit();
assert_eq!(arena.size_lists(), (9, 1, 0));
values.truncate(503);
arena.shrink_to_fit();
assert_eq!(arena.size_lists(), (8, 0, 0));
std::mem::drop(a);
for _ in 0..62 {
values.remove(0);
}
assert_eq!(arena.size_lists(), (8, 0, 1));
arena.shrink_to_fit();
assert_eq!(arena.size_lists(), (8, 0, 1));
values.clear();
assert_eq!(arena.size_lists(), (8, 0, 8));
arena.shrink_to_fit();
assert_eq!(arena.size_lists(), (8, 0, 8));
{
let _a = arena.alloc(1);
assert_eq!(arena.size_lists(), (8, 8, 0));
println!("{:?}", arena);
arena.display_list();
}
assert_eq!(arena.size_lists(), (8, 8, 0));
arena.shrink_to_fit();
assert_eq!(arena.size_lists(), (0, 0, 0));
let mut values = Vec::with_capacity(126);
for _ in 0..126 {
values.push(arena.alloc(1));
}
assert_eq!(arena.size_lists(), (2, 1, 0));
values.remove(0);
assert_eq!(arena.size_lists(), (2, 1, 1));
values.push(arena.alloc(1));
assert_eq!(arena.size_lists(), (2, 1, 0));
}
#[test]
fn alloc_with_initializer() {
struct MyData {
a: usize
}
fn initialize_data<'d>(uninit: &'d mut MaybeUninit<MyData>, source: &MyData) -> &'d MyData {
unsafe {
let ptr = uninit.as_mut_ptr();
ptr::copy(source, ptr, 1);
&*ptr
}
}
let arena = SharedArena::<MyData>::new();
let source = MyData { a: 101 };
let data = arena.alloc_with(|uninit| {
initialize_data(uninit, &source)
});
assert!(data.a == 101);
let source = MyData { a: 102 };
let data = arena.alloc_rc_with(|uninit| {
initialize_data(uninit, &source)
});
assert!(data.a == 102);
let source = MyData { a: 103 };
let data = arena.alloc_arc_with(|uninit| {
initialize_data(uninit, &source)
});
assert!(data.a == 103);
}
#[test]
#[should_panic]
fn alloc_with_panic() {
let arena = SharedArena::<usize>::new();
const SOURCE: usize = 10;
let _ = arena.alloc_with(|_| {
&SOURCE
});
}
#[test]
#[should_panic]
fn alloc_rc_with_panic() {
let arena = SharedArena::<usize>::new();
const SOURCE: usize = 10;
let _ = arena.alloc_rc_with(|_| {
&SOURCE
});
}
#[test]
#[should_panic]
fn alloc_arc_with_panic() {
let arena = SharedArena::<usize>::new();
const SOURCE: usize = 10;
let _ = arena.alloc_arc_with(|_| {
&SOURCE
});
}
#[test]
fn alloc_fns() {
let arena = SharedArena::<usize>::new();
use std::ptr;
let a = arena.alloc_with(|place| unsafe {
ptr::copy(&101, place.as_mut_ptr(), 1);
&*place.as_mut_ptr()
});
assert!(*a == 101);
let a = arena.alloc_arc_with(|place| unsafe {
ptr::copy(&102, place.as_mut_ptr(), 1);
&*place.as_mut_ptr()
});
assert!(*a == 102);
let a = arena.alloc_rc_with(|place| unsafe {
ptr::copy(&103, place.as_mut_ptr(), 1);
&*place.as_mut_ptr()
});
assert!(*a == 103);
let a = arena.alloc(104);
assert!(*a == 104);
let a = arena.alloc_arc(105);
assert!(*a == 105);
let a = arena.alloc_rc(106);
assert!(*a == 106);
}
#[test]
fn drop_arena_with_valid_allocated() {
let (a, b, c, d) = {
let arena = SharedArena::<usize>::new();
use std::ptr;
let a = arena.alloc_with(|place| unsafe {
ptr::copy(&101, place.as_mut_ptr(), 1);
&*place.as_mut_ptr()
});
let b = arena.alloc_arc_with(|place| unsafe {
ptr::copy(&102, place.as_mut_ptr(), 1);
&*place.as_mut_ptr()
});
let c = arena.alloc(103);
let d = arena.alloc_arc(104);
(a, b, c, d)
};
assert_eq!((*a, *b, *c, *d), (101, 102, 103, 104))
}
#[test]
#[cfg_attr(miri, ignore)]
fn arena_with_threads() {
test_with_threads(12, 1024 * 64, false);
}
#[test]
#[cfg_attr(miri, ignore)]
fn arena_with_threads_and_shrinks() {
test_with_threads(12, 1024 * 4, true);
}
#[test]
fn miri_arena_with_threads() {
test_with_threads(12, 128, false);
}
#[test]
fn miri_arena_with_threads_and_shrinks() {
test_with_threads(12, 64, true);
}
fn test_with_threads(nthreads: usize, nallocs: usize, with_shrink: bool) {
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
fn get_random_number(max: usize) -> usize {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.subsec_nanos() as usize;
nanos % max
}
let arena = Arc::new(SharedArena::<usize>::default());
let mut values = Vec::with_capacity(126);
for _ in 0..63 {
values.push(arena.alloc(1));
}
let mut handles = Vec::with_capacity(nthreads);
let barrier = Arc::new(Barrier::new(nthreads));
for _ in 0..nthreads {
let c = barrier.clone();
let arena = arena.clone();
            handles.push(thread::spawn(move || {
c.wait();
arena.shrink_to_fit();
let mut values = Vec::with_capacity(nallocs);
for i in 0..(nallocs) {
values.push(arena.alloc(1));
let rand = get_random_number(values.len());
if (i + 1) % 5 == 0 {
values.remove(rand);
}
                    if with_shrink && rand % 200 == 0 {
                        arena.shrink_to_fit();
                    }
}
}));
}
for handle in handles {
handle.join().unwrap();
}
}
}