#![warn(unsafe_op_in_unsafe_fn)]
use alloc::alloc::{dealloc, handle_alloc_error, Layout};
use cordyceps::{
mpsc_queue::{Links, TryDequeueError},
Linked, MpscQueue,
};
use core::{
marker::PhantomData,
mem::ManuallyDrop,
ptr::{self, drop_in_place, NonNull},
sync::atomic::{self, AtomicUsize, Ordering},
task::Waker,
};
use diatomic_waker::primitives::DiatomicWaker;
use spin::mutex::SpinMutex;
/// Handle to a single heap allocation that holds a [`WakerHeader`] followed by
/// an array of [`WakerItem`]s (see `WakerList::new` and `WakerList::layout`).
///
/// The allocation is reference counted through `WakerHeader::strong`: dropping
/// this handle decrements the count and frees the allocation once it reaches
/// zero.
pub(crate) struct WakerList {
/// Start of the allocation, which is also the address of the header.
ptr: NonNull<WakerHeader>,
/// Zero-sized marker naming the full header-plus-items type.
/// NOTE(review): presumably here for ownership/drop-check purposes — confirm.
phantom: PhantomData<WakerListInner>,
}
/// Shape of the allocation behind a [`WakerList`]: the header first, then the
/// items.
///
/// In the visible code this type is only named inside `PhantomData`; the real
/// allocation is laid out by hand via `WakerList::layout` and `slice_offset`.
#[repr(C)]
pub(crate) struct WakerListInner {
/// Shared bookkeeping stored at the start of the allocation.
meta: WakerHeader,
/// The per-slot items that follow the header.
slice: [WakerItem],
}
/// Bookkeeping stored at the very start of a [`WakerList`] allocation.
pub(crate) struct WakerHeader {
/// Reference count for the allocation. Starts at 1 in `WakerList::new`, is
/// incremented when a slot waker is cloned, and is decremented when a cloned
/// waker or the `WakerList` itself is dropped.
strong: AtomicUsize,
/// Waker registered through `WakerList::register`; notified when a slot
/// waker newly enqueues its item.
waker: DiatomicWaker,
/// Capacity passed to `WakerList::new`; used to recompute the allocation
/// layout on deallocation. The trailing stub item is not counted.
len: usize,
/// Intrusive queue of the items that have been woken and not yet popped.
queue: MpscQueue<WakerItem>,
}
/// One slot of a [`WakerList`]; doubles as an intrusive node of the header's
/// queue.
pub(crate) struct WakerItem {
/// Intrusive links used by `MpscQueue`.
links: Links<Self>,
/// Guards a flag that is set to `true` when the item is enqueued (`push`,
/// `wake_by_ref`) and reset to `false` when it is dequeued (`pop`), so an
/// item is never enqueued twice.
wake_lock: SpinMutex<bool>,
/// Position of this item within the item array. `slot::meta_raw` relies on
/// it to walk back from an item pointer to the header.
index: usize,
}
// NOTE(review): the `Linked` safety contract (stable addresses, valid `links`
// pointers) is presumably upheld because items live inside the `WakerList`
// allocation and are never moved — confirm against the cordyceps docs.
unsafe impl Linked<Links<Self>> for WakerItem {
    /// The queue never owns an item; it only holds a raw pointer to it.
    type Handle = NonNull<Self>;

    /// A handle already is the raw pointer, so this is the identity.
    fn into_ptr(handle: Self::Handle) -> NonNull<Self> {
        handle
    }

    /// Inverse of [`Self::into_ptr`]; also the identity.
    unsafe fn from_ptr(raw: NonNull<Self>) -> Self::Handle {
        raw
    }

    /// Projects an item pointer to its `links` field without creating an
    /// intermediate reference to the item.
    unsafe fn links(item: NonNull<Self>) -> NonNull<Links<Self>> {
        // SAFETY: `item` is non-null, so the address of one of its fields is
        // non-null too; `addr_of_mut!` only computes the field address.
        unsafe {
            let field = ptr::addr_of_mut!((*item.as_ptr()).links);
            NonNull::new_unchecked(field)
        }
    }
}
/// Compile-time assertion helper: only type-checks when `T` is `Send + Sync`.
/// Does nothing at runtime.
const fn __assert_send_sync<T>()
where
    T: Send + Sync,
{
}
const _: () = {
    // NOTE(review): `WakerList` holds a raw `NonNull`, so these impls are
    // asserted by hand. Soundness presumably rests on the header and items
    // only being mutated through atomics, the spin lock and the MPSC queue —
    // confirm.
    unsafe impl Send for WakerList {}
    unsafe impl Sync for WakerList {}

    // Fails to compile if `WakerItem` ever stops being `Send + Sync`.
    __assert_send_sync::<WakerItem>();
};
impl WakerList {
fn slice_start(&self) -> *mut WakerItem {
unsafe {
let ptr = self.ptr.as_ptr().cast::<u8>();
ptr.add(slice_offset()).cast::<WakerItem>()
}
}
pub(crate) unsafe fn push(&self, index: usize) {
let queue = unsafe { &*ptr::addr_of!((*self.ptr.as_ptr()).queue) };
let slot = unsafe { self.slice_start().add(index) };
let mut wake_lock = unsafe { &*slot }.wake_lock.lock();
let prev = core::mem::replace(&mut *wake_lock, true);
if !prev {
queue.enqueue(unsafe { NonNull::new_unchecked(slot) });
}
}
pub(crate) fn register(&mut self, waker: &Waker) {
let meta = unsafe { &*self.ptr.as_ptr() };
unsafe { meta.waker.register(waker) }
}
fn get(&self, index: usize) -> ManuallyDrop<Waker> {
let slot = unsafe { self.slice_start().add(index) };
debug_assert_eq!(
unsafe { (*slot).index },
index,
"the slot should point at our index"
);
slot::waker(slot)
}
pub(crate) unsafe fn pop(&self) -> ReadySlot<(usize, ManuallyDrop<Waker>)> {
let queue = unsafe { &*ptr::addr_of!((*self.ptr.as_ptr()).queue) };
match unsafe { queue.try_dequeue_unchecked() } {
Ok(slot) => {
let slot = unsafe { &*slot.as_ptr() };
*slot.wake_lock.lock() = false;
ReadySlot::Ready((slot.index, self.get(slot.index)))
}
Err(TryDequeueError::Inconsistent) => ReadySlot::Inconsistent,
Err(TryDequeueError::Empty) => ReadySlot::None,
Err(TryDequeueError::Busy) => unreachable!(),
}
}
}
/// Outcome of `WakerList::pop`.
pub(crate) enum ReadySlot<T> {
/// An item was dequeued; carries the payload built for it.
Ready(T),
/// The queue reported `TryDequeueError::Inconsistent`.
Inconsistent,
/// The queue was empty.
None,
}
/// Builds `Waker`s whose data pointer is a `WakerItem` inside a `WakerList`
/// allocation.
mod slot {
    use core::{
        mem::ManuallyDrop,
        ptr::NonNull,
        task::{RawWaker, RawWakerVTable, Waker},
    };

    use super::{slice_offset, WakerHeader, WakerItem};

    /// Vtable shared by every waker created from an item pointer.
    static VTABLE: &RawWakerVTable =
        &RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);

    /// Walks back from an item pointer to the header of its allocation: first
    /// to item 0 using the stored index, then `slice_offset()` bytes further
    /// back.
    unsafe fn meta_raw(ptr: *mut WakerItem) -> *mut WakerHeader {
        unsafe {
            let first_item = ptr.sub((*ptr).index);
            first_item
                .cast::<u8>()
                .sub(slice_offset())
                .cast::<WakerHeader>()
        }
    }

    /// Same as [`meta_raw`], but hands out a shared reference with a
    /// caller-chosen lifetime.
    unsafe fn meta_ref<'a>(ptr: *const WakerItem) -> &'a WakerHeader {
        let header = unsafe { meta_raw(ptr.cast_mut()) };
        unsafe { &*header }
    }

    /// Vtable `clone`: bumps the strong count and reuses the same data pointer.
    unsafe fn clone_waker(data: *const ()) -> RawWaker {
        unsafe { meta_ref(data.cast()) }.inc_strong();
        RawWaker::new(data, VTABLE)
    }

    /// Vtable `wake`: wake by reference, then release this waker's count.
    unsafe fn wake(data: *const ()) {
        unsafe {
            wake_by_ref(data);
            drop_waker(data);
        }
    }

    /// Vtable `wake_by_ref`: marks the item as woken. If it was not already
    /// marked, enqueues it and notifies the registered waker.
    unsafe fn wake_by_ref(data: *const ()) {
        let item_ptr = data.cast::<WakerItem>();
        let item = unsafe { &*item_ptr };

        // The guard is held until return, covering the enqueue and notify.
        let mut queued = item.wake_lock.lock();
        if core::mem::replace(&mut *queued, true) {
            return;
        }

        let header = unsafe { meta_ref(item_ptr) };
        header
            .queue
            .enqueue(unsafe { NonNull::new_unchecked(item_ptr.cast_mut()) });
        header.waker.notify();
    }

    /// Vtable `drop`: releases one strong count and frees the allocation when
    /// it was the last one.
    unsafe fn drop_waker(data: *const ()) {
        let item_ptr = data.cast::<WakerItem>();
        let header = unsafe { meta_ref(item_ptr) };
        if !header.dec_strong() {
            return;
        }
        // Read the length before the header is dropped and freed.
        let len = header.len;
        unsafe { super::drop_inner(meta_raw(item_ptr.cast_mut()), len) }
    }

    /// Wraps an item pointer in a `Waker`.
    ///
    /// The strong count is not incremented here; the `ManuallyDrop` wrapper
    /// prevents `drop_waker` from running for the returned value. Clones made
    /// from it do take (and later release) a count.
    pub(super) fn waker(ptr: *const WakerItem) -> ManuallyDrop<Waker> {
        let raw = RawWaker::new(ptr.cast(), VTABLE);
        ManuallyDrop::new(unsafe { Waker::from_raw(raw) })
    }
}
impl WakerHeader {
    /// Adds one reference. Aborts the process if the previous count already
    /// exceeded `isize::MAX`.
    fn inc_strong(&self) {
        let previous = self.strong.fetch_add(1, Ordering::Relaxed);
        if previous > isize::MAX as usize {
            abort("too many arc clones");
        }
    }

    /// Removes one reference and returns `true` when it was the last one, in
    /// which case the caller is expected to free the allocation.
    fn dec_strong(&self) -> bool {
        let was_last = self.strong.fetch_sub(1, Ordering::Release) == 1;
        if was_last {
            // Pairs with the `Release` decrements of the other owners so that
            // their writes are visible before the allocation is torn down.
            atomic::fence(Ordering::Acquire);
        }
        was_last
    }
}
fn slice_offset() -> usize {
fn padding_needed_for(layout: &Layout, align: usize) -> usize {
let len = layout.size();
let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
len_rounded_up.wrapping_sub(len)
}
let layout = Layout::new::<WakerHeader>();
layout.size() + padding_needed_for(&layout, core::mem::align_of::<WakerItem>())
}
/// Drops the header in place and frees the whole allocation.
///
/// The items are not dropped individually.
///
/// # Safety
///
/// NOTE(review): `p` presumably must be the start of an allocation made by
/// `WakerList::new(capacity)` with no remaining owners — confirm at call sites.
unsafe fn drop_inner(p: *mut WakerHeader, capacity: usize) {
    // Recompute the layout used at allocation time before touching the memory.
    let layout = WakerList::layout(capacity);
    unsafe {
        drop_in_place(p);
        dealloc(p.cast::<u8>(), layout);
    }
}
impl Drop for WakerList {
    /// Releases this handle's strong count and frees the allocation when no
    /// cloned waker still holds one.
    fn drop(&mut self) {
        let header = unsafe { self.ptr.as_ref() };
        if !header.dec_strong() {
            return;
        }
        // Read the length before the header is dropped and freed.
        let len = header.len;
        unsafe { drop_inner(self.ptr.as_ptr(), len) }
    }
}
impl WakerList {
    /// Allocates a list with `cap` usable items plus one trailing stub item
    /// for the queue, and returns the handle holding the initial strong count.
    ///
    /// # Panics
    ///
    /// Panics if the allocation size for `cap` items overflows or does not
    /// form a valid `Layout`. Calls `handle_alloc_error` if the allocator
    /// fails.
    pub(crate) fn new(cap: usize) -> Self {
        let layout = Self::layout(cap);
        debug_assert!(layout.size() > 0);

        // SAFETY: NOTE(review): `alloc` requires a non-zero-sized layout; the
        // header is always part of it, which the debug assertion checks.
        let base = unsafe { alloc::alloc::alloc(layout) };
        if base.is_null() {
            handle_alloc_error(layout)
        }

        let meta = base.cast::<WakerHeader>();
        let slice = unsafe { base.add(slice_offset()).cast::<WakerItem>() };

        // SAFETY: `layout` reserves the header plus `cap + 1` items starting
        // at `slice_offset()`, so every write below stays inside the fresh
        // allocation. `ptr::write` is used because the memory is
        // uninitialized.
        unsafe {
            for i in 0..cap {
                ptr::write(
                    slice.add(i),
                    WakerItem {
                        index: i,
                        wake_lock: SpinMutex::new(false),
                        links: Links::new(),
                    },
                );
            }

            // The extra item at position `cap` serves as the queue's stub.
            let stub = slice.add(cap);
            ptr::write(
                stub,
                WakerItem {
                    index: cap,
                    wake_lock: SpinMutex::new(false),
                    links: Links::new_stub(),
                },
            );

            ptr::write(
                meta,
                WakerHeader {
                    strong: AtomicUsize::new(1),
                    len: cap,
                    waker: DiatomicWaker::new(),
                    queue: MpscQueue::new_with_stub(NonNull::new_unchecked(stub)),
                },
            );
        }

        Self {
            // SAFETY: `base` was checked to be non-null above.
            ptr: unsafe { NonNull::new_unchecked(meta) },
            phantom: PhantomData,
        }
    }

    /// Computes the allocation layout for a header followed by `cap + 1`
    /// items (the extra one being the queue stub).
    ///
    /// # Panics
    ///
    /// Panics if the item count or total size overflows `usize`, or if the
    /// result is not a valid `Layout`.
    fn layout(cap: usize) -> Layout {
        let item = Layout::new::<WakerItem>().pad_to_align();

        // `cap + 1` must be checked: in release builds an unchecked add wraps
        // to 0 for `cap == usize::MAX`, which would yield a header-only
        // layout while `new` still writes `cap + 1` items.
        let alloc_size = cap
            .checked_add(1)
            .and_then(|slots| item.size().checked_mul(slots))
            .expect("WakerList capacity overflows usize");

        let slice_layout = Layout::from_size_align(alloc_size, item.align()).unwrap();
        let (combined, offset) = Layout::new::<WakerHeader>()
            .extend(slice_layout)
            .unwrap();

        // The item array must start exactly where `slice_offset` says it does.
        debug_assert_eq!(offset, slice_offset());

        combined.pad_to_align()
    }
}
/// Aborts the program with message `s` without needing `std`.
///
/// Panics with `s` while a guard is alive whose destructor panics again
/// during unwinding; the second panic is what turns this into an abort.
fn abort(s: &str) -> ! {
    /// Guard whose only job is to panic when dropped.
    struct PanicOnDrop;

    impl Drop for PanicOnDrop {
        fn drop(&mut self) {
            panic!("panicking twice to abort the program");
        }
    }

    let _guard = PanicOnDrop;
    panic!("{}", s);
}