mod multi; pub use multi::*;
use std::{
mem::MaybeUninit,
ptr,
};
use crate::types::{
sync::{
atomic::{ AtomicPtr, AtomicBool, Ordering },
Arc,
},
cell::{ Cell, UnsafeCell },
thread::{ self, Thread },
hint::spin_loop,
};
use sptr::Strict;
pub type Fuze<T> = Fuse<T>;
pub type Fire = Flame;
const EXPLODED_STATE: usize = 1 << 0;
const DROPPED_STATE: usize = 1 << 1;
const PARKED_STATE: usize = 1 << 2;
const STATE_MASK: usize = 0b111;
#[repr(align(8))]
struct ParkedThread {
thread: Cell<Option<Thread>>,
signalled: AtomicBool,
next: *const ParkedThread,
}
struct Container<T> {
state_and_queue: Arc<AtomicPtr<ParkedThread>>,
cell: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Container<T> {
pub fn new() -> Self {
Self {
state_and_queue: Arc::new(AtomicPtr::new(ptr::null_mut())),
cell: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}
impl<T> Drop for Container<T> {
fn drop(&mut self) {
let state_and_thread = self.state_and_queue.swap(
sptr::invalid_mut(DROPPED_STATE),
Ordering::AcqRel
);
let state = Strict::addr(state_and_thread) & STATE_MASK;
debug_assert!(state & DROPPED_STATE == 0, "Bomb container has already been dropped.");
if state & EXPLODED_STATE != 0 {
unsafe {
#[cfg(not(loom))]
let value = self.cell.get().replace(MaybeUninit::uninit());
#[cfg(loom)]
let value = self.cell.get_mut().with(|p| p.replace(MaybeUninit::uninit()));
value.assume_init();
}
}
if state & PARKED_STATE == 0 {
return;
}
let thread = Strict::with_addr(state_and_thread,
Strict::addr(state_and_thread) & !STATE_MASK
) as *const ParkedThread;
unsafe {
if let Some(parked) = thread.as_ref() {
let thread = parked.thread.take().unwrap();
parked.signalled.store(true, Ordering::Release);
thread.unpark();
}
}
}
}
pub struct Bomb<T> {
data: Arc<Container<T>>,
}
unsafe impl<T: Send + Sync> Send for Bomb<T> { }
unsafe impl<T: Send + Sync> Sync for Bomb<T> { }
impl<T> Clone for Bomb<T> {
fn clone(&self) -> Self {
Self { data: self.data.clone() }
}
}
pub struct Fuse<T> {
data: Arc<Container<T>>,
}
unsafe impl<T: Send + Sync> Send for Fuse<T> { }
unsafe impl<T> Sync for Fuse<T> { }
pub struct Flame {
state_and_thread: Arc<AtomicPtr<ParkedThread>>,
}
impl<T> Bomb<T> {
pub fn new() -> (Fuse<T>, Bomb<T>) {
let fuse = Fuse::new();
let bomb = Self {
data: fuse.data.clone(),
};
(fuse, bomb)
}
pub fn exploded(&self) -> Option<&T> {
if self.data.state_and_queue.load(Ordering::Acquire) as usize & EXPLODED_STATE == 0 {
None
}
else {
#[cfg(not(loom))]
unsafe { Some(self.data.cell.get().as_ref().unwrap().assume_init_ref()) }
#[cfg(loom)]
unsafe { Some(self.data.cell.get().with(|p| p.as_ref().unwrap().assume_init_ref())) }
}
}
pub fn wait_for_explosion(&self) -> &T {
if let Some(value) = spin(SPIN_ITERATIONS, || self.exploded()) {
return value;
}
self.block();
self.exploded().unwrap()
}
fn block(&self) {
let mut state_and_queue = self.data.state_and_queue.load(Ordering::Relaxed);
loop {
if state_and_queue as usize & EXPLODED_STATE != 0 {
return;
}
let top = Strict::addr(state_and_queue) & !STATE_MASK;
let next = Strict::with_addr(state_and_queue, top) as *const _ ;
let node = ParkedThread {
thread: Cell::new(Some(thread::current())),
signalled: AtomicBool::new(false),
next,
};
let node_pointer = &node as *const _ as *mut _;
let exchange = self.data.state_and_queue.compare_exchange(
state_and_queue,
node_pointer,
Ordering::Release,
Ordering::Relaxed
);
if let Err(new_state) = exchange {
state_and_queue = new_state;
continue;
}
while !node.signalled.load(Ordering::Acquire) {
thread::park();
}
break;
}
}
}
impl<T> Fuse<T> {
fn new() -> Self {
Self {
data: Arc::new(Container::new()),
}
}
pub fn ignite(self, value: T) -> Flame {
self.light(value)
}
pub fn light(self, value: T) -> Flame {
#[cfg(not(loom))]
unsafe { *self.data.cell.get() = MaybeUninit::new(value); }
#[cfg(loom)]
unsafe { self.data.cell.get_mut().with(|p| { *p = MaybeUninit::new(value); }); }
let new_state = self.data.state_and_queue.swap(
sptr::invalid_mut(EXPLODED_STATE),
Ordering::AcqRel
);
let mut next = Strict::with_addr(
new_state,
Strict::addr(new_state) & !STATE_MASK
) as *const ParkedThread;
while !next.is_null() {
let current = unsafe { &*next };
next = current.next;
let thread = current.thread.take().unwrap();
current.signalled.store(true, Ordering::Release);
thread.unpark();
}
Flame {
state_and_thread: self.data.state_and_queue.clone(),
}
}
}
impl Flame {
pub fn extinguished(&self) -> bool {
self.state_and_thread.load(Ordering::Acquire) as usize & DROPPED_STATE != 0
}
pub fn wait_for_extinguish(&self) {
if spin(SPIN_ITERATIONS, || self.extinguished().then_some(())).is_some() { return; }
let mut state_and_thread = self.state_and_thread.load(Ordering::Relaxed);
loop {
if state_and_thread as usize & DROPPED_STATE != 0 {
return;
}
let thread = ParkedThread {
thread: Cell::new(Some(thread::current())),
signalled: AtomicBool::new(false),
next: ptr::null(),
};
let thread_pointer = &thread as *const ParkedThread;
let state = Strict::addr(state_and_thread) & STATE_MASK | PARKED_STATE;
let new_state_and_thread = state | Strict::addr(thread_pointer);
let new_state_and_thread = Strict::with_addr(
thread_pointer,
new_state_and_thread
);
let exchange = self.state_and_thread.compare_exchange(
state_and_thread,
new_state_and_thread as *mut _,
Ordering::Release,
Ordering::Relaxed
);
if let Err(new_state) = exchange {
state_and_thread = new_state;
continue;
}
while !thread.signalled.load(Ordering::Acquire) {
thread::park();
}
break;
}
}
}
const SPIN_ITERATIONS: usize = 10;
fn spin<T>(iterations: usize, mut f: impl FnMut() -> Option<T>) -> Option<T> {
for _ in 0..iterations {
match f() {
None => spin_loop(),
opt => return opt,
}
}
None
}