use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering;
use crate::loom_exports::cell::UnsafeCell;
use crate::loom_exports::sync::atomic::{self, AtomicUsize};
// State flag (bit 0): set by the drop handler of either handle, and also set
// (together with `POPULATED`) by `SlotWriter::write`. Whoever finds it already
// set is the last user of the shared state and is responsible for freeing it.
const CLOSED: usize = 0b01;
// State flag (bit 1): set by `SlotWriter::write`, always together with
// `CLOSED`, once a value has been stored. `SlotReader::try_read` clears it
// again when it takes the value out.
const POPULATED: usize = 0b10;
/// Heap-allocated state shared by a `SlotWriter` and its `SlotReader`.
struct Inner<T> {
/// Bit field holding the `CLOSED` and `POPULATED` flags.
state: AtomicUsize,
/// Storage for the value; only initialized while `POPULATED` is set.
value: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Inner<T> {
    /// Stores `t` in the value cell without dropping whatever the cell
    /// contained before.
    ///
    /// # Safety
    ///
    /// The caller must have exclusive access to the value cell.
    unsafe fn write_value(&self, t: T) {
        // SAFETY: exclusivity is guaranteed by the caller. Assigning a
        // `MaybeUninit` never runs a destructor on the previous content.
        unsafe {
            self.value.with_mut(|cell| {
                *cell = MaybeUninit::new(t);
            })
        }
    }

    /// Returns a bitwise copy of the stored value.
    ///
    /// # Safety
    ///
    /// The value must be initialized and the caller must have exclusive
    /// access to it. After the call the cell must be treated as
    /// uninitialized, otherwise the value could be dropped twice.
    unsafe fn read_value(&self) -> T {
        // SAFETY: initialization and exclusivity are guaranteed by the caller.
        unsafe { self.value.with(|cell| (&*cell).assume_init_read()) }
    }

    /// Runs the destructor of the stored value in place; the cell itself is
    /// not deallocated.
    ///
    /// # Safety
    ///
    /// The value must be initialized and the caller must have exclusive
    /// access to it.
    unsafe fn drop_value_in_place(&self) {
        // SAFETY: initialization and exclusivity are guaranteed by the caller.
        unsafe { self.value.with_mut(|cell| (&mut *cell).assume_init_drop()) }
    }
}
/// Writing half of a slot, created by `slot`.
///
/// The handle is consumed by `write`; dropping it without writing sets the
/// `CLOSED` flag on the shared state.
#[derive(Debug)]
pub(crate) struct SlotWriter<T> {
/// Pointer to the heap-allocated state shared with the `SlotReader`.
inner: NonNull<Inner<T>>,
/// Marks that this handle may end up dropping an `Inner<T>`.
_phantom: PhantomData<Inner<T>>,
}
impl<T> SlotWriter<T> {
    /// Stores `value` in the slot, consuming the writer.
    ///
    /// # Errors
    ///
    /// Returns `WriteError` if the slot was already closed (the reader was
    /// dropped). In that case the value is dropped and the shared state is
    /// freed before returning.
    pub(crate) fn write(self, value: T) -> Result<(), WriteError> {
        // The drop handler must not run: this method decides by itself who
        // frees the shared state.
        let this = ManuallyDrop::new(self);
        let shared = this.inner;

        // SAFETY: the writer has not set `CLOSED` yet, so the shared state is
        // still allocated, and only the writer touches the value cell until
        // `POPULATED` is published.
        unsafe {
            shared.as_ref().write_value(value);

            // Release: publishes the value written above to the Acquire loads
            // performed by the reader.
            let previous = shared
                .as_ref()
                .state
                .fetch_or(POPULATED | CLOSED, Ordering::Release);

            if previous & CLOSED == 0 {
                // The reader is still alive and now owns the value and the
                // allocation.
                return Ok(());
            }

            // The reader closed the slot first. Acquire: pairs with the
            // reader's closing `fetch_or` so that its accesses are complete
            // before the state is freed.
            atomic::fence(Ordering::Acquire);

            // Nobody will ever read the value just written: drop it, then
            // release the allocation.
            shared.as_ref().drop_value_in_place();
            drop(Box::from_raw(shared.as_ptr()));
        }

        Err(WriteError {})
    }
}
impl<T> Drop for SlotWriter<T> {
    /// Closes the slot, and frees the shared state if the slot had already
    /// been closed.
    fn drop(&mut self) {
        let shared = self.inner;

        // SAFETY: the writer has not set `CLOSED` yet, so the shared state is
        // still allocated.
        unsafe {
            // First a cheap Acquire load; only when the slot is still open do
            // we try to close it ourselves. `||` short-circuits, so the
            // `fetch_or` runs only if the load did not observe `CLOSED`.
            let closed_by_peer = shared.as_ref().state.load(Ordering::Acquire) & CLOSED != 0
                || shared.as_ref().state.fetch_or(CLOSED, Ordering::AcqRel) & CLOSED != 0;

            // If we were the first to close, cleanup is left to the reader.
            // Otherwise we are the last user and free the state. No value
            // needs dropping here: `write` consumes the writer without
            // running this handler.
            if closed_by_peer {
                drop(Box::from_raw(shared.as_ptr()));
            }
        }
    }
}
// SAFETY: a `SlotWriter<T>` moves a `T` into the shared state and may drop it
// (see `write`), hence the `T: Send` bound.
unsafe impl<T: Send> Send for SlotWriter<T> {}
// SAFETY: `write` takes the writer by value and `drop` takes `&mut self`, so a
// shared `&SlotWriter<T>` gives no access to the slot.
// NOTE(review): holds for the methods visible in this file; re-check if a
// `&self` method is ever added.
unsafe impl<T: Send> Sync for SlotWriter<T> {}
// NOTE(review): these are asserted manually, presumably because the
// `UnsafeCell` inside `Inner` prevents the auto impls; confirm.
impl<T> UnwindSafe for SlotWriter<T> {}
impl<T> RefUnwindSafe for SlotWriter<T> {}
/// Reading half of a slot, created by `slot`.
///
/// `try_read` takes the value out once it has been written; dropping the
/// handle sets the `CLOSED` flag on the shared state if it is not set yet.
#[derive(Debug)]
pub(crate) struct SlotReader<T> {
/// Pointer to the heap-allocated state shared with the `SlotWriter`.
inner: NonNull<Inner<T>>,
/// Marks that this handle may end up dropping an `Inner<T>`.
_phantom: PhantomData<Inner<T>>,
}
impl<T> SlotReader<T> {
    /// Takes the value out of the slot if one has been written.
    ///
    /// # Errors
    ///
    /// * `ReadError::NoValue` if no flag is set yet, i.e. nothing was written
    ///   and the writer has not closed the slot.
    /// * `ReadError::Closed` if the slot is closed and holds no value; this
    ///   is also what any call following a successful read returns.
    pub(crate) fn try_read(&mut self) -> Result<T, ReadError> {
        let shared = self.inner;

        // SAFETY: the reader has not set `CLOSED` itself, so the shared state
        // has not been freed by the writer.
        unsafe {
            // Acquire: pairs with the Release `fetch_or` in
            // `SlotWriter::write`, making the stored value visible whenever
            // `POPULATED` is observed.
            match shared.as_ref().state.load(Ordering::Acquire) {
                0 => Err(ReadError::NoValue),
                state if state & POPULATED == 0 => Err(ReadError::Closed),
                _ => {
                    // `POPULATED` is only ever set together with `CLOSED`, and
                    // the writer does not touch the state after setting them,
                    // so a relaxed store is enough to clear `POPULATED`.
                    shared.as_ref().state.store(CLOSED, Ordering::Relaxed);

                    // The flag was cleared above, so the value will not be
                    // dropped a second time by the drop handler.
                    Ok(shared.as_ref().read_value())
                }
            }
        }
    }
}
impl<T> Drop for SlotReader<T> {
    /// Closes the slot. If it had already been closed, drops any value that
    /// was never read and frees the shared state.
    fn drop(&mut self) {
        let shared = self.inner;

        // SAFETY: the reader has not set `CLOSED` itself, so the shared state
        // has not been freed by the writer.
        unsafe {
            // Acquire: pairs with the Release operations of the writer, so
            // that a written value is visible before it is dropped below.
            let observed = shared.as_ref().state.load(Ordering::Acquire);

            let state = if observed & CLOSED != 0 {
                observed
            } else {
                // The slot looked open: try to be the one closing it.
                let previous = shared.as_ref().state.fetch_or(CLOSED, Ordering::AcqRel);
                if previous & CLOSED == 0 {
                    // The writer is still alive and will do the cleanup.
                    return;
                }
                previous
            };

            // A value that was written but never read is dropped here.
            if state & POPULATED != 0 {
                shared.as_ref().drop_value_in_place();
            }

            drop(Box::from_raw(shared.as_ptr()));
        }
    }
}
// SAFETY: a `SlotReader<T>` can hand out or drop the stored `T` (see
// `try_read` and the `Drop` impl), hence the `T: Send` bound.
unsafe impl<T: Send> Send for SlotReader<T> {}
// SAFETY: `try_read` and `drop` both take `&mut self`, so a shared
// `&SlotReader<T>` gives no access to the slot.
// NOTE(review): holds for the methods visible in this file; re-check if a
// `&self` method is ever added.
unsafe impl<T: Send> Sync for SlotReader<T> {}
// NOTE(review): these are asserted manually, presumably because the
// `UnsafeCell` inside `Inner` prevents the auto impls; confirm.
impl<T> UnwindSafe for SlotReader<T> {}
impl<T> RefUnwindSafe for SlotReader<T> {}
/// Error returned by `SlotReader::try_read` when no value can be taken.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum ReadError {
    /// The slot is still open but nothing has been written to it yet.
    NoValue,
    /// The slot is closed and holds no value.
    Closed,
}

impl fmt::Display for ReadError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match *self {
            Self::NoValue => "no value in the slot",
            Self::Closed => "slot closed by writer",
        };

        fmt.write_str(message)
    }
}

impl Error for ReadError {}
/// Error returned by `SlotWriter::write` when the slot was already closed.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct WriteError {}

impl fmt::Display for WriteError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.write_str("slot closed by reader")
    }
}

impl Error for WriteError {}
/// Creates a new slot and returns its writing and reading halves.
///
/// Both handles point to the same heap-allocated state, which starts with no
/// flag set and an uninitialized value cell. The allocation is released by
/// whichever handle finds the slot already closed.
pub(crate) fn slot<T>() -> (SlotWriter<T>, SlotReader<T>) {
    let shared = Box::new(Inner {
        state: AtomicUsize::new(0),
        value: UnsafeCell::new(MaybeUninit::uninit()),
    });

    // Leaking the box hands ownership of the allocation over to the two
    // handles; a reference obtained this way is never null.
    let inner = NonNull::from(Box::leak(shared));

    (
        SlotWriter {
            inner,
            _phantom: PhantomData,
        },
        SlotReader {
            inner,
            _phantom: PhantomData,
        },
    )
}
/// Tests run with regular threads (not under loom).
#[cfg(all(test, not(nexosim_loom)))]
mod tests {
    use super::*;

    use std::thread;

    /// A value written while the reader is alive can be read back.
    #[test]
    fn slot_single_threaded_write() {
        let (writer, mut reader) = slot();

        assert_eq!(reader.try_read(), Err(ReadError::NoValue));
        assert!(writer.write(42).is_ok());
        assert_eq!(reader.try_read(), Ok(42));
    }

    /// Dropping the writer without writing closes the slot.
    #[test]
    fn slot_single_threaded_drop_writer() {
        let (writer, mut reader) = slot::<i32>();

        assert_eq!(reader.try_read(), Err(ReadError::NoValue));
        drop(writer);
        assert_eq!(reader.try_read(), Err(ReadError::Closed));
    }

    /// Writing after the reader was dropped fails.
    #[test]
    fn slot_single_threaded_drop_reader() {
        let writer = slot().0;

        assert!(writer.write(42).is_err());
    }

    /// A value written from another thread eventually becomes readable.
    #[test]
    fn slot_multi_threaded_write() {
        let (writer, mut reader) = slot();

        let th = thread::spawn(move || {
            assert!(writer.write(42).is_ok());
        });

        let value = loop {
            match reader.try_read() {
                Ok(v) => break v,
                // Not written yet: let the writer thread make progress.
                Err(ReadError::NoValue) => thread::yield_now(),
                // Fail instead of spinning forever if the writer went away
                // without writing.
                Err(ReadError::Closed) => panic!("slot closed before a value was written"),
            }
        };
        assert_eq!(value, 42);

        // Join so that a panic in the writer thread fails the test.
        th.join().unwrap();
    }

    /// A writer dropped from another thread eventually closes the slot.
    #[test]
    fn slot_multi_threaded_drop_writer() {
        let (writer, mut reader) = slot::<i32>();

        let th = thread::spawn(move || {
            drop(writer);
        });

        loop {
            match reader.try_read() {
                Err(ReadError::Closed) => break,
                // Not closed yet: let the other thread make progress.
                Err(ReadError::NoValue) => thread::yield_now(),
                Ok(v) => panic!("unexpected value read from the slot: {}", v),
            }
        }

        // Join so that a panic in the spawned thread fails the test.
        th.join().unwrap();
    }
}
/// Loom model-checking tests.
#[cfg(all(test, nexosim_loom))]
mod tests {
    use super::*;

    use loom::model::Builder;
    use loom::thread;

    /// Preemption bound used when the builder does not already carry one.
    const DEFAULT_PREEMPTION_BOUND: usize = 4;

    /// Returns a loom builder whose preemption bound is set, falling back to
    /// `DEFAULT_PREEMPTION_BOUND` when `Builder::new()` left it unset.
    fn bounded_builder() -> Builder {
        let mut builder = Builder::new();
        builder.preemption_bound = builder
            .preemption_bound
            .or(Some(DEFAULT_PREEMPTION_BOUND));

        builder
    }

    /// A concurrent write always succeeds while the reader is alive, and the
    /// reader never observes anything other than the written value.
    #[test]
    fn loom_slot_write() {
        bounded_builder().check(|| {
            let (writer, mut reader) = slot();

            let writer_thread = thread::spawn(move || assert!(writer.write(42).is_ok()));

            if let Ok(value) = reader.try_read() {
                assert_eq!(value, 42);
            }

            writer_thread.join().unwrap();
        });
    }

    /// A reader racing with a writer being dropped never obtains a value.
    #[test]
    fn loom_slot_drop_writer() {
        bounded_builder().check(|| {
            let (writer, mut reader) = slot::<i32>();

            let writer_thread = thread::spawn(move || drop(writer));

            assert!(reader.try_read().is_err());

            writer_thread.join().unwrap();
        });
    }
}