use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use arc_swap::{ArcSwapOption, Guard};
use crossbeam::queue::SegQueue;
const CHUNK_SIZE: usize = 1024;
#[derive(Debug)]
pub struct ChunkedLinkedList<T> {
free_entries: SegQueue<(*const Chunk<T>, usize)>,
head: AtomicPtr<Chunk<T>>,
estimated_len: AtomicUsize,
}
unsafe impl<T> Send for ChunkedLinkedList<T> where T: Send + Sync {}
unsafe impl<T> Sync for ChunkedLinkedList<T> where T: Send + Sync {}
struct Chunk<T> {
values: [ArcSwapOption<T>; CHUNK_SIZE],
next: *const Chunk<T>,
}
unsafe impl<T> Send for Chunk<T> where T: Send {}
unsafe impl<T> Sync for Chunk<T> where T: Sync {}
impl<T> Chunk<T> {
fn iter_this<F: Fn(Arc<T>) + Sync>(&self, f: &F) {
for i in 0..CHUNK_SIZE {
let v = Guard::into_inner(self.values[i].load());
if let Some(arc) = v {
f(arc)
}
}
}
fn par_iter_rest<F: Fn(Arc<T>) + Sync>(&self, f: &F)
where
T: Send + Sync,
{
if self.next.is_null() {
self.iter_this(f);
} else {
rayon::join(
|| self.iter_this(f),
|| {
let next = unsafe { &*self.next };
next.par_iter_rest(f)
},
);
}
}
fn retain_this<F: Fn(&Arc<T>) -> bool + Sync>(&self, f: &F, host: &ChunkedLinkedList<T>)
where
T: Send + Sync,
{
for i in 0..CHUNK_SIZE {
let current = self.values[i].load();
let should_retain = match &*current {
Some(arc) => f(arc),
None => true,
};
if !should_retain {
let res = self.values[i].compare_and_swap(¤t, None);
if let (Some(res_ref), Some(cur_ref)) = (res.as_ref(), current.as_ref()) {
if Arc::as_ptr(res_ref) == Arc::as_ptr(cur_ref) {
host.estimated_len.fetch_sub(1, Ordering::Relaxed);
host.free_entries.push((self as _, i))
}
}
}
}
}
fn par_retain_rest<F: Fn(&Arc<T>) -> bool + Sync>(&self, f: &F, host: &ChunkedLinkedList<T>)
where
T: Send + Sync,
{
if self.next.is_null() {
self.retain_this(f, host);
} else {
let next = unsafe { &*self.next };
rayon::join(
|| self.retain_this(f, host),
|| next.par_retain_rest(f, host),
);
}
}
}
#[derive(Debug)]
pub struct CLLItem<T> {
pub v: Arc<T>,
from: *const Chunk<T>,
idx: usize,
}
unsafe impl<T> Send for CLLItem<T> {}
unsafe impl<T> Sync for CLLItem<T> {}
impl<T> Clone for CLLItem<T> {
fn clone(&self) -> Self {
Self {
v: self.v.clone(),
from: self.from,
idx: self.idx,
}
}
}
impl<T> ChunkedLinkedList<T> {
pub fn new() -> Self {
let free_entries = SegQueue::new();
let head = Box::into_raw(Box::new(Chunk {
values: initialize_values(),
next: ptr::null(),
}));
for i in 0..CHUNK_SIZE {
free_entries.push((head as *const _, i));
}
Self {
free_entries,
head: AtomicPtr::new(head as *mut _),
estimated_len: AtomicUsize::new(0),
}
}
fn expand(&self) {
let mut new_head;
loop {
let old_head = self.head.load(Ordering::Relaxed);
new_head = Box::into_raw(Box::new(Chunk {
values: initialize_values(),
next: old_head,
}));
let swap_result = self.head.compare_exchange(
old_head,
new_head,
Ordering::Relaxed,
Ordering::Relaxed,
);
if swap_result.is_ok() {
break;
}
unsafe {
Box::from_raw(new_head);
}
}
for i in 0..CHUNK_SIZE {
self.free_entries.push((new_head as *const _, i));
}
}
#[allow(clippy::redundant_else)]
pub fn insert(&self, v: Arc<T>) -> CLLItem<T> {
loop {
if let Some(idx) = self.free_entries.pop() {
let chunk = unsafe { &*idx.0 };
let slot = &chunk.values[idx.1];
slot.store(Some(v.clone()));
let res = CLLItem {
v,
from: idx.0,
idx: idx.1,
};
self.estimated_len.fetch_add(1, Ordering::Relaxed);
return res;
} else {
self.expand();
}
}
}
pub fn remove(&self, cll_item: &CLLItem<T>) {
let chunk = unsafe { &*cll_item.from };
let slot = &chunk.values[cll_item.idx];
let res = slot.compare_and_swap(&cll_item.v, None);
if let Some(prev_v) = &*res {
if Arc::as_ptr(prev_v) == Arc::as_ptr(&cll_item.v) {
self.estimated_len.fetch_sub(1, Ordering::Relaxed);
self.free_entries.push((cll_item.from, cll_item.idx))
}
}
}
pub fn par_retain<F: Fn(&Arc<T>) -> bool + Sync>(&self, f: F)
where
T: Send + Sync,
{
let head = unsafe { &*self.head.load(Ordering::Relaxed) };
head.par_retain_rest(&f, self);
}
pub fn par_iter<F: Fn(Arc<T>) + Sync>(&self, f: F)
where
T: Send + Sync,
{
let head = unsafe { &*self.head.load(Ordering::Relaxed) };
head.par_iter_rest(&f);
}
pub fn estimate_len(&self) -> usize {
self.estimated_len.load(Ordering::Relaxed)
}
}
fn initialize_values<T>() -> [ArcSwapOption<T>; CHUNK_SIZE] {
unsafe {
let mut data: [MaybeUninit<ArcSwapOption<T>>; CHUNK_SIZE] =
MaybeUninit::uninit().assume_init();
for elem in &mut data[..] {
ptr::write(elem.as_mut_ptr(), ArcSwapOption::new(None));
}
mem::transmute(data)
}
}