#![feature(vec_into_raw_parts)]
#![feature(core_intrinsics)]
#![feature(sync_unsafe_cell)]
#![feature(let_chains)]
#![feature(const_trait_impl)]
#![feature(const_ptr_write)]
#![feature(strict_provenance)]
use std::{
    cell::SyncUnsafeCell,
    fmt::Debug,
    hint::spin_loop,
    intrinsics::unlikely,
    mem::{align_of, ManuallyDrop, MaybeUninit},
    ops::Index,
    ptr::invalid_mut,
    sync::{
        atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
        RwLock,
    },
};
use aliasable::prelude::*;
use once_cell::sync::OnceCell;
use option::AtomicOption;
mod option;
#[cfg(test)]
mod tests;
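/// A concurrent, self-growing buffer made of a linked list of fixed-capacity
/// nodes. Elements are inserted through a shared atomic cursor and addressed
/// by a [`LarivIndex`].
///
/// A minimal usage sketch (assuming this file is the root of a crate named
/// `lariv`):
///
/// ```ignore
/// let buf = lariv::Lariv::new(16);
/// let idx = buf.push(42i32);
/// assert_eq!(*buf[idx].read().unwrap(), 42);
/// buf.remove(idx);
/// ```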
pub struct Lariv<'a, T> {
    /// Head node of the linked list of buffers.
    list: AliasableBox<LarivNode<'a, T>>,
    /// Shared state; leaked in `new` and reclaimed in `Drop`.
    shared: &'a SharedItems<'a, T>,
}
#[derive(Debug)]
struct LarivNode<'a, T> {
    /// Pointer to this node's fixed-capacity buffer of slots.
    ptr: AtomicPtr<AtomicOption<T>>,
    /// Next node, created lazily when the list fills up.
    next: OnceCell<AliasableBox<Self>>,
    /// Zero-based position of this node in the list.
    nth: usize,
    /// State shared by all nodes.
    shared: &'a SharedItems<'a, T>,
}
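/// Location of an element in a [`Lariv`]: the node it lives in and its slot
/// within that node.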
#[derive(Copy, Clone, Debug)]
pub struct LarivIndex {
    node: u64,
    index: u64,
}
#[derive(Debug)]
struct SharedItems<'a, T> {
    /// Reference to the head node, written once during construction.
    head: SyncUnsafeCell<MaybeUninit<&'a LarivNode<'a, T>>>,
    /// Index of the next slot to try inside the current node.
    cursor: AtomicUsize,
    /// Node that `cursor` currently indexes into.
    cursor_node_ptr: AtomicPtr<LarivNode<'a, T>>,
    /// Capacity of every node's buffer.
    cap: usize,
    /// Countdown decremented by removals; decides whether a full wrap rescans
    /// from the head or allocates a new node.
    allocation_threshold: AtomicUsize,
    /// Set while a thread is appending a new node.
    reallocating: AtomicBool,
    /// Number of nodes in the list.
    nodes: AtomicUsize,
}
impl<'a, T> Lariv<'a, T> {
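    /// Creates a new `Lariv` whose nodes each hold `buf_cap` elements.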
    #[must_use]
    pub fn new(buf_cap: usize) -> Self {
        // Allocate the head node's buffer; `len` is 0 for a fresh Vec, so the
        // cursor starts at the first slot.
        let (ptr, len, cap) = Vec::with_capacity(buf_cap).into_raw_parts();
        Self::init_buf(ptr, cap);
        let shared_items: &'a _ = Box::leak(Box::new(SharedItems {
            head: SyncUnsafeCell::new(MaybeUninit::uninit()),
            cursor: AtomicUsize::new(len),
            // Dangling but well-aligned placeholder, replaced just below.
            cursor_node_ptr: AtomicPtr::new(invalid_mut(align_of::<LarivNode<'a, T>>())),
            cap,
            allocation_threshold: AtomicUsize::new(0),
            reallocating: AtomicBool::new(false),
            nodes: AtomicUsize::new(1),
        }));
        let head = LarivNode::new(ptr, 0, shared_items);
        shared_items.cursor_node_ptr.store(
            head.as_ref() as *const LarivNode<'a, T> as *mut _,
            Ordering::Relaxed,
        );
        // SAFETY: the boxed head node has a stable address for the lifetime
        // of the `Lariv`.
        unsafe { (*shared_items.head.get()).write(&*(head.as_ref() as *const _)) };
        Self {
            list: head,
            shared: shared_items,
        }
    }
    /// Initializes every slot up front when running under Miri, which would
    /// otherwise flag reads of uninitialized slots; on other targets this is
    /// a no-op.
    #[cfg_attr(not(miri), allow(unused_variables, unused_mut))]
    #[inline]
    const fn init_buf<V: ~const Default>(ptr: *mut V, cap: usize) {
        let mut i = 0;
        #[cfg(miri)]
        loop {
            if i == cap {
                break;
            }
            // SAFETY: `i < cap`, and the allocation holds `cap` elements.
            unsafe { ptr.add(i).write(V::default()) }
            i += 1;
        }
    }
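    /// Inserts an element and returns the index it was stored at.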
    #[inline]
    pub fn push(&self, element: T) -> LarivIndex {
        // SAFETY: `cursor_node_ptr` always points to a live node.
        unsafe { &*self.shared.cursor_node_ptr.load(Ordering::Acquire) }.push(element)
    }
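    /// Returns the element at `index`, if present.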
    #[must_use]
    #[inline]
    pub fn get(&self, index: LarivIndex) -> Option<&RwLock<T>> {
        // SAFETY: `traverse_get` only returns pointers into live node buffers.
        self.traverse_get(index).and_then(|p| unsafe { &*p }.get())
    }
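    /// Removes the element at `index`, if present.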
    #[inline]
    pub fn remove(&self, index: LarivIndex) {
        if let Some(e) = self.traverse_get(index) {
            // SAFETY: `traverse_get` returned an in-bounds slot pointer.
            unsafe { &*e }.empty();
            // Each removal decrements the allocation threshold, saturating at
            // zero, making it less likely that a full wrap allocates a node.
            let _ = self.shared.allocation_threshold.fetch_update(
                Ordering::AcqRel,
                Ordering::Acquire,
                |i| i.checked_sub(1),
            );
        }
    }
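    /// Walks the node list and returns a raw pointer to the slot addressed
    /// by `li`, or `None` if the index is out of bounds.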
    #[must_use]
    #[inline]
    fn traverse_get(&self, mut li: LarivIndex) -> Option<*mut AtomicOption<T>> {
        let mut node = &self.list;
        if li.index as usize >= node.shared.cap {
            return None;
        }
        // Walk forward `li.node` links; fail if the list is shorter than that.
        while li.node > 0 {
            node = node.next.get()?;
            li.node -= 1;
        }
        // SAFETY: `li.index` is within this node's buffer (checked above).
        Some(unsafe { node.ptr.load(Ordering::Relaxed).add(li.index as usize) })
    }
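    /// Returns the total capacity across all nodes.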
    #[must_use]
    #[inline]
    pub fn cap(&self) -> usize {
        self.shared.nodes.load(Ordering::Acquire) * self.shared.cap
    }
}
impl<'a, T> Index<LarivIndex> for Lariv<'a, T> {
    type Output = RwLock<T>;
    #[inline]
    fn index(&self, index: LarivIndex) -> &Self::Output {
        self.get(index).expect("index out of bounds")
    }
}
impl<'a, T> LarivNode<'a, T> {
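    /// Heap-allocates a node at position `nth` wrapping the given buffer.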
    fn new(
        ptr: *mut AtomicOption<T>,
        nth: usize,
        shared_items: &'a SharedItems<'a, T>,
    ) -> AliasableBox<Self> {
        // `AliasableBox` keeps the node's address stable while the raw
        // pointers held elsewhere alias it.
        AliasableBox::from_unique(UniqueBox::new(Self {
            ptr: AtomicPtr::new(ptr),
            next: OnceCell::new(),
            nth,
            shared: shared_items,
        }))
    }
    /// Finds a free slot, writes `element` into it, and returns its location,
    /// moving across nodes and appending a new one when the list is full.
    #[inline]
    fn push(&self, element: T) -> LarivIndex {
        let mut node = self;
        // Claim a slot index by bumping the shared cursor.
        let mut index = node.shared.cursor.fetch_add(1, Ordering::AcqRel);
        loop {
            break if index < node.shared.cap && let Some(mut pos) =
                unsafe { &*node.ptr.load(Ordering::Relaxed).add(index) }.try_set()
            {
                // The slot was free and is now reserved: write the element.
                pos.write(element);
                LarivIndex::new(node.nth, index)
            } else {
                // Slot taken or out of range: advance the cursor, clamping it
                // back into range once it has run past the buffer's end.
                let mut end = false;
                index = node
                    .shared
                    .cursor
                    .fetch_update(Ordering::AcqRel, Ordering::Acquire, |mut i| {
                        if i > node.shared.cap {
                            i -= 1;
                            end = unlikely(i == node.shared.cap);
                            Some(i % node.shared.cap)
                        } else {
                            Some(i + 1)
                        }
                    })
                    .unwrap_or_else(|i| i);
                node = unsafe { &*node.shared.cursor_node_ptr.load(Ordering::Acquire) };
                if end {
                    if let Some(next) = node.next.get() {
                        // This node is full: move the shared cursor to the
                        // next node and retry from its first slot.
                        node.shared.cursor_node_ptr.store(
                            next.as_ref() as *const LarivNode<'a, T> as *mut _,
                            Ordering::Release,
                        );
                        node.shared.cursor.store(0, Ordering::Release);
                        continue
                    } else if node
                        .shared
                        .allocation_threshold
                        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |i| {
                            if i == 0 {
                                Some(node.calculate_allocate_threshold())
                            } else {
                                None
                            }
                        })
                        .is_ok()
                    {
                        // First wrap of the whole list: rescan from the head
                        // in case removals have freed slots.
                        node.shared.cursor_node_ptr.store(
                            unsafe {
                                (*node.shared.head.get()).assume_init()
                                    as *const LarivNode<'a, T>
                            }
                            .cast_mut(),
                            Ordering::Release,
                        );
                        node.shared.cursor.store(0, Ordering::Release);
                        continue
                    } else if node.next.get().is_none()
                        && node.shared.allocation_threshold.load(Ordering::Acquire) != 0
                        && !node.shared.reallocating.fetch_or(true, Ordering::AcqRel)
                    {
                        // The rescan did not yield a free slot and no other
                        // thread is allocating: append a new node.
                        let li = node.extend(element);
                        node.shared.reallocating.store(false, Ordering::Release);
                        li
                    } else {
                        node = unsafe { &*node.shared.cursor_node_ptr.load(Ordering::Acquire) };
                        continue
                    }
                } else {
                    // Wait out any in-progress allocation, then retry.
                    while unlikely(node.shared.reallocating.load(Ordering::Acquire)) {
                        spin_loop();
                    }
                    node = unsafe { &*node.shared.cursor_node_ptr.load(Ordering::Acquire) };
                    continue
                }
            };
        }
    }
    /// Allocates a fresh node, stores `first_element` in its first slot, and
    /// links it after `self`.
    fn extend(&self, first_element: T) -> LarivIndex {
        let (ptr, _, cap) = Vec::with_capacity(self.shared.cap).into_raw_parts();
        Lariv::<T>::init_buf::<AtomicOption<T>>(ptr, cap);
        unsafe {
            // SAFETY: `ptr` points to the start of a fresh allocation of
            // `cap` slots.
            *ptr = AtomicOption::some(first_element);
            let nth = self.nth + 1;
            let node = Self::new(ptr, nth, self.shared);
            let node_ptr = node.as_ref() as *const _ as *mut _;
            // Only one thread extends at a time (guarded by `reallocating`),
            // so `next` must still be empty here.
            debug_assert!(self.next.get().is_none());
            self.next.set(node).unwrap_unchecked();
            self.shared.nodes.fetch_add(1, Ordering::AcqRel);
            self.shared.allocation_threshold.store(0, Ordering::Release);
            self.shared
                .cursor_node_ptr
                .store(node_ptr, Ordering::Release);
            LarivIndex::new(nth, 0)
        }
    }
    #[inline]
    fn calculate_allocate_threshold(&self) -> usize {
        // Roughly 30% of the current total capacity.
        ((self.shared.nodes.load(Ordering::Acquire) * self.shared.cap) as f64 * 0.3) as usize
    }
}
impl<'a, T> Debug for Lariv<'a, T>
where
    T: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut next = Some(&self.list);
        while let Some(node) = next {
            writeln!(f, "{:?}", unsafe {
                // View the node's buffer as a Vec without taking ownership;
                // `ManuallyDrop` keeps it from being freed here.
                ManuallyDrop::new(Vec::<AtomicOption<T>>::from_raw_parts(
                    node.ptr.load(Ordering::Relaxed),
                    node.shared.cap,
                    node.shared.cap,
                ))
                .iter()
                .map(|x| x.get().and_then(|x| x.read().ok()))
                .collect::<Vec<_>>()
            })?;
            next = node.next.get();
        }
        Ok(())
    }
}
impl<'a, T> Drop for Lariv<'a, T> {
    fn drop(&mut self) {
        let mut next = Some(&self.list);
        while let Some(node) = next {
            // SAFETY: each buffer came from `Vec::into_raw_parts` with this
            // capacity, so it can be rebuilt and dropped here.
            unsafe {
                drop(Vec::from_raw_parts(
                    node.ptr.load(Ordering::Relaxed),
                    node.shared.cap,
                    node.shared.cap,
                ));
            }
            next = node.next.get();
        }
        // Reclaim the shared state leaked in `Lariv::new`; under Miri it is
        // left leaked instead.
        #[cfg(not(miri))]
        unsafe {
            drop(Box::from_raw(
                self.shared as *const _ as *mut SharedItems<'a, T>,
            ));
        }
    }
}
impl LarivIndex {
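    /// Builds an index from a node number and a slot within that node.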
    #[inline]
    pub fn new(node: usize, index: usize) -> Self {
        Self {
            node: node as u64,
            index: index as u64,
        }
    }
}
impl From<LarivIndex> for u128 {
    /// Packs the index into a `u128`, with the node number in the low 64
    /// bits and the slot index in the high 64 bits.
    #[inline]
    fn from(value: LarivIndex) -> Self {
        u128::from(value.node) | (u128::from(value.index) << 64)
    }
}