#[cfg(loom)]
use loom::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
};
use std::fmt;
#[cfg(not(loom))]
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
};
use crate::{notify::Notify, slot::Slot};
mod error;
mod notify;
mod slot;
pub use error::{RecvError, TryRecvError};
/// Creates a connected [`Sender`] / [`Receiver`] pair.
///
/// Both halves hold a reference-counted pointer to the same `Inner`. The flag
/// word starts at `0`, i.e. none of `WAITING`, `VALUE_SENT` or `CLOSED` is set.
#[inline]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Inner {
        state: AtomicUsize::new(0),
        value: Slot::new(),
        notify: Notify::new(),
    });
    // The sender gets its own handle; the receiver takes over the original one.
    let tx = Sender {
        inner: Some(Arc::clone(&shared)),
    };
    let rx = Receiver {
        inner: Some(shared),
    };
    (tx, rx)
}
/// Sending half of the channel returned by [`channel`].
///
/// [`Sender::send`] takes the sender by value, so at most one value can be
/// sent. Dropping a sender that still holds its shared state marks the channel
/// complete and wakes the receiver if the `WAITING` flag is set.
#[derive(Debug)]
pub struct Sender<T> {
// State shared with the paired `Receiver`. `send` and `Drop` move it out with
// `take()`, leaving `None` behind.
inner: Option<Arc<Inner<T>>>,
}
/// Receiving half of the channel returned by [`channel`].
///
/// The value is obtained either with [`Receiver::recv`], which parks the
/// calling thread while waiting, or with [`Receiver::try_recv`], which returns
/// immediately.
#[derive(Debug)]
pub struct Receiver<T> {
// State shared with the paired `Sender`. Becomes `None` once `recv` has taken
// it or `try_recv` has returned anything other than `TryRecvError::Empty`.
inner: Option<Arc<Inner<T>>>,
}
// Both handles only hold an `Option<Arc<Inner<T>>>`. These impls assert that
// the shared state may be moved to, and referenced from, other threads
// whenever `T: Send`.
// NOTE(review): whether that assertion is sound depends on how `Slot` and
// `Notify` guard their interior state, which is not visible in this file —
// confirm against those modules.
unsafe impl<T> Send for Sender<T> where T: Send {}
unsafe impl<T> Sync for Sender<T> where T: Send {}
unsafe impl<T> Send for Receiver<T> where T: Send {}
unsafe impl<T> Sync for Receiver<T> where T: Send {}
/// State shared between the `Sender` and the `Receiver` of one channel.
struct Inner<T> {
/// Bit set built from the `WAITING`, `VALUE_SENT` and `CLOSED` flags.
state: AtomicUsize,
/// Storage for the value in flight: written by `Sender::send`, emptied
/// through `consume_value`.
value: Slot<T>,
/// Wake-up handle: the receiver calls `set_current` on it before parking and
/// the sender calls `notify` on it when the `WAITING` flag was set.
notify: Notify,
}
impl<T> Sender<T> {
    /// Stores `value` in the shared slot and marks the channel complete.
    ///
    /// If the previous state had the `WAITING` flag set, the receiver is
    /// notified afterwards.
    ///
    /// # Errors
    ///
    /// Hands `value` back as `Err(value)` when the `CLOSED` flag was already
    /// set, which `Receiver::close` and the receiver's `Drop` do.
    #[inline]
    pub fn send(mut self, value: T) -> Result<(), T> {
        // Moving the shared state out turns this sender's `Drop` into a no-op.
        let shared = self.inner.take().unwrap();
        // The value is written before the flag is published.
        unsafe {
            shared.value.set(value);
        }
        let before = shared.set_complete();
        if before.is_closed() {
            // `set_complete` leaves a closed channel untouched, so the value
            // that was just stored is taken back out and returned.
            let rejected = unsafe { shared.consume_value() };
            return Err(rejected.unwrap());
        }
        if before.is_waiting() {
            unsafe {
                shared.notify();
            }
        }
        Ok(())
    }

    /// Reports whether the `CLOSED` flag is currently set on the shared state.
    pub fn is_closed(&self) -> bool {
        let shared = self.inner.as_ref().unwrap();
        let snapshot = shared.state.load(Ordering::Acquire);
        State(snapshot).is_closed()
    }
}
impl<T> Drop for Sender<T> {
    /// Marks the channel complete without storing a value and notifies the
    /// receiver when the previous state had the `WAITING` flag set.
    ///
    /// Does nothing if `send` already moved the shared state out.
    fn drop(&mut self) {
        let shared = match self.inner.take() {
            Some(shared) => shared,
            None => return,
        };
        if shared.set_complete().is_waiting() {
            unsafe {
                shared.notify.notify();
            }
        }
    }
}
impl<T> Receiver<T> {
    /// Waits for the channel to resolve, parking the calling thread in between
    /// checks, and returns the value that was sent.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError`] when no value can be delivered:
    /// - the channel is marked complete but the slot yields nothing (the
    ///   sender's `Drop` marks completion without storing a value),
    /// - the `CLOSED` flag is set and no value was sent, or
    /// - an earlier [`try_recv`](Self::try_recv) call already returned a
    ///   conclusive result and released the shared state.
    #[inline]
    pub fn recv(mut self) -> Result<T, RecvError> {
        // `try_recv` clears `inner` after any result other than `Empty`.
        // Report that as an error rather than panicking on an `unwrap`.
        let inner = match self.inner.take() {
            Some(inner) => inner,
            None => return Err(RecvError),
        };
        let mut state = inner.state.load(Ordering::Acquire);
        loop {
            // Completion is checked before closure so that a value sent before
            // `close` is still delivered.
            if State(state).is_complete() {
                let value = unsafe { inner.consume_value() };
                return value.ok_or(RecvError);
            } else if State(state).is_closed() {
                return Err(RecvError);
            }
            // Register with `notify` only once; on later iterations the
            // `WAITING` flag is already set.
            unsafe {
                if !State(state).is_waiting() {
                    inner.notify.set_current();
                }
            }
            match inner.state.compare_exchange(
                state,
                state | WAITING,
                Ordering::Release,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    thread::park();
                    // Re-read the flags after waking instead of assuming the
                    // sender ran; the loop parks again if nothing changed.
                    state = inner.state.load(Ordering::Acquire);
                }
                // The state changed underneath us: re-evaluate with the fresh value.
                Err(actual) => state = actual,
            }
        }
    }

    /// Returns the value if it is already available, without blocking.
    ///
    /// # Errors
    ///
    /// - [`TryRecvError::Empty`] while the channel is neither complete nor
    ///   closed; the receiver stays usable and the call may be repeated.
    /// - [`TryRecvError::Closed`] when the channel is complete without a
    ///   value, is closed, or the shared state was already released by an
    ///   earlier conclusive call.
    ///
    /// Any result other than `Empty` releases the shared state.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let result = match self.inner.as_ref() {
            None => Err(TryRecvError::Closed),
            Some(inner) => {
                let state = State(inner.state.load(Ordering::Acquire));
                if state.is_complete() {
                    let taken = unsafe { inner.consume_value() };
                    taken.ok_or(TryRecvError::Closed)
                } else if state.is_closed() {
                    Err(TryRecvError::Closed)
                } else {
                    // Nothing conclusive yet: keep the shared state for a later call.
                    return Err(TryRecvError::Empty);
                }
            }
        };
        self.inner = None;
        result
    }

    /// Sets the `CLOSED` flag so that a later `Sender::send` fails and returns
    /// its value to the caller.
    ///
    /// A value that was sent before the flag was set stays retrievable through
    /// `recv` / `try_recv`. Does nothing once the shared state was released.
    pub fn close(&mut self) {
        if let Some(inner) = self.inner.as_ref() {
            let _ = inner.set_close();
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Sets the `CLOSED` flag and, if the channel was already marked complete,
    /// takes whatever is in the slot out and drops it.
    ///
    /// Does nothing if the shared state was already released.
    fn drop(&mut self) {
        let shared = match self.inner.take() {
            Some(shared) => shared,
            None => return,
        };
        let before = shared.set_close();
        if before.is_complete() {
            // Nobody will receive the value any more: take it out and discard it.
            drop(unsafe { shared.consume_value() });
        }
    }
}
impl<T> Inner<T> {
    /// Sets the `VALUE_SENT` flag unless `CLOSED` is already set.
    ///
    /// Returns the state as it was observed *before* the flag was added; when
    /// the channel is closed the state is returned unchanged and nothing is
    /// written.
    #[inline]
    fn set_complete(&self) -> State {
        let mut observed = self.state.load(Ordering::Relaxed);
        while !State(observed).is_closed() {
            let swapped = self.state.compare_exchange_weak(
                observed,
                observed | VALUE_SENT,
                Ordering::AcqRel,
                Ordering::Relaxed,
            );
            match swapped {
                Ok(_) => break,
                // Lost a race or failed spuriously: retry with the current value.
                Err(current) => observed = current,
            }
        }
        State(observed)
    }

    /// Sets the `CLOSED` flag and returns the state that preceded the change.
    #[inline]
    fn set_close(&self) -> State {
        let before = self.state.fetch_or(CLOSED, Ordering::AcqRel);
        State(before)
    }

    /// Forwards to `Notify::notify` on the `notify` field.
    ///
    /// # Safety
    ///
    /// The caller must uphold whatever contract `Notify::notify` requires.
    /// NOTE(review): that contract is defined in the `notify` module and is
    /// not visible here.
    #[inline]
    unsafe fn notify(&self) {
        unsafe {
            self.notify.notify();
        }
    }

    /// Takes the current content out of the value slot via `Slot::take`.
    ///
    /// # Safety
    ///
    /// The caller must uphold whatever contract `Slot::take` requires.
    /// NOTE(review): presumably no other thread may access the slot at the
    /// same time — confirm against the `slot` module.
    #[inline]
    unsafe fn consume_value(&self) -> Option<T> {
        unsafe { self.value.take() }
    }
}
impl<T: fmt::Debug> fmt::Debug for Inner<T> {
    /// Formats only the decoded flag word; the slot and the notifier are not
    /// printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot = State(self.state.load(Ordering::Relaxed));
        let mut out = f.debug_struct("Inner");
        out.field("state", &snapshot);
        out.finish()
    }
}
/// Decoded view of the channel's flag word.
struct State(usize);

/// Set by the receiver before it parks.
const WAITING: usize = 0x1;
/// Set when the sending side has finished (by `send` or by being dropped).
const VALUE_SENT: usize = 0x2;
/// Set when the receiving side has closed the channel.
const CLOSED: usize = 0x4;

impl State {
    /// Tests whether every bit of `flag` is present in the wrapped word.
    #[inline]
    fn has(&self, flag: usize) -> bool {
        (self.0 & flag) == flag
    }

    /// `true` when the `CLOSED` flag is set.
    #[inline]
    fn is_closed(&self) -> bool {
        self.has(CLOSED)
    }

    /// `true` when the `WAITING` flag is set.
    #[inline]
    fn is_waiting(&self) -> bool {
        self.has(WAITING)
    }

    /// `true` when the `VALUE_SENT` flag is set.
    #[inline]
    fn is_complete(&self) -> bool {
        self.has(VALUE_SENT)
    }
}

impl fmt::Debug for State {
    /// Prints the three flags as named booleans instead of the raw integer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("State");
        out.field("is_complete", &self.is_complete());
        out.field("is_closed", &self.is_closed());
        out.field("is_waiting", &self.is_waiting());
        out.finish()
    }
}