use super::{Inner, COMPLETE, LOCK_BITS, LOCK_MASK, RX_LOCK};
use alloc::arc::Arc;
use core::sync::atomic::Ordering::*;
use sync::spsc::SpscInner;
/// The sending half of a single-producer, single-consumer channel.
///
/// Shares the channel state with the receiving half through a
/// reference-counted [`Inner`]; dropping the `Sender` releases its side of
/// the channel (see the `Drop` impl below).
pub struct Sender<E> {
// Shared channel state; also referenced by the receiver half.
inner: Arc<Inner<E>>,
}
/// Error returned from [`Sender::send`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendError {
/// The receiving half of the channel was dropped, so the value can never
/// be observed.
Canceled,
/// The channel's internal send counter would wrap around; too many sends
/// are pending.
Overflow,
}
impl<E> Sender<E> {
/// Creates a new `Sender` from the shared channel state. Crate-internal:
/// called by the channel constructor (not visible in this chunk).
#[inline(always)]
pub(super) fn new(inner: Arc<Inner<E>>) -> Self {
Self { inner }
}
/// Sends one unit value through the channel by bumping the shared send
/// counter. See [`Inner::send`] for the error conditions.
#[inline(always)]
pub fn send(&mut self) -> Result<(), SendError> {
self.inner.send()
}
/// Completes the channel with the error `err`, consuming the sender.
/// Returns `Err(err)` (handing the error back) if the receiver is gone.
#[inline(always)]
pub fn send_err(self, err: E) -> Result<(), E> {
self.inner.send_err(err)
}
/// Polls whether the receiving half has been dropped (futures-0.1 style
/// `Poll`). Delegates to the shared state; semantics defined by
/// `SpscInner::poll_cancel` — not visible in this chunk.
#[inline(always)]
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
self.inner.poll_cancel()
}
/// Returns `true` if the receiving half has been dropped, without
/// registering the current task for wake-up.
#[inline(always)]
pub fn is_canceled(&self) -> bool {
self.inner.is_canceled()
}
}
impl<E> Drop for Sender<E> {
/// Notifies the shared state that the sender side is gone so the receiver
/// can observe completion (`drop_tx` is provided by `SpscInner`).
#[inline(always)]
fn drop(&mut self) {
self.inner.drop_tx();
}
}
impl<E> Inner<E> {
/// Increments the send counter packed into the shared state word and, if
/// no other party currently holds `RX_LOCK`, notifies the receiver task.
///
/// State-word layout (inferred from the bit manipulation below — the
/// constants live in the parent module): the low `LOCK_BITS` bits hold
/// flag bits (`COMPLETE`, `RX_LOCK`, …), the remaining high bits hold a
/// send counter.
///
/// # Errors
///
/// - `SendError::Canceled` if `COMPLETE` is set (receiver dropped).
/// - `SendError::Overflow` if incrementing the counter would wrap to zero.
fn send(&self) -> Result<(), SendError> {
self
// Relaxed initial load is fine here: `update` (presumably a CAS loop
// provided by `SpscInner` — not visible in this chunk) re-reads the
// state with `Acquire` on each retry/success.
.update(self.state_load(Relaxed), Acquire, Relaxed, |state| {
// Extract the flag bits from the low part of the word.
let mut lock = *state & LOCK_MASK;
if lock & COMPLETE != 0 {
// Receiver is gone; the send can never be observed.
return Err(SendError::Canceled);
}
// Shift the counter down into place. The shift is done through
// `isize` (arithmetic shift) so the counter's top bit is preserved,
// which the wrap-to-zero overflow check below relies on.
*state = (*state as isize >> LOCK_BITS) as usize;
*state = state.wrapping_add(1);
if *state == 0 {
// Counter wrapped all the way around: report overflow instead of
// silently losing sends.
return Err(SendError::Overflow);
}
// Claim RX_LOCK if it is free, so exactly one party performs the
// receiver wake-up; if someone else holds it, they will notify.
let rx_locked = if lock & RX_LOCK == 0 {
lock |= RX_LOCK;
true
} else {
false
};
// Re-pack: counter back into the high bits, flags into the low bits.
*state <<= LOCK_BITS;
*state |= lock;
if rx_locked {
// `Some(new_state)` signals the closure below to notify + unlock.
Ok(Some(*state))
} else {
Ok(None)
}
})
.map(|state| {
// Only the winner of RX_LOCK reaches the notify/unlock path.
state.map(|state| {
unsafe {
// NOTE(review): reading `rx_task` (an UnsafeCell, presumably)
// without other synchronization looks sound only because we hold
// RX_LOCK here — confirm that invariant in `Inner`/receiver code.
(*self.rx_task.get()).as_ref().map(|task| task.notify());
}
// Release RX_LOCK. The `Result` of this CAS loop is deliberately
// discarded: the unlock is best-effort bookkeeping.
self.update(state, Release, Relaxed, |state| {
*state ^= RX_LOCK;
Ok::<(), ()>(())
})
});
})
}
/// Stores `err` as the channel's error value, or hands it back as
/// `Err(err)` if the receiving half has already been dropped.
fn send_err(&self, err: E) -> Result<(), E> {
if self.is_canceled() {
Err(err)
} else {
// NOTE(review): unsynchronized write through an UnsafeCell — this is
// sound only if the receiver reads `err` strictly after observing
// completion via the atomic state; confirm against the receiver half.
unsafe { *self.err.get() = Some(err) };
Ok(())
}
}
}