use super::{Inner, COMPLETE, INDEX_BITS, INDEX_MASK, RX_LOCK};
use alloc::arc::Arc;
use core::{fmt, ptr};
use core::sync::atomic::Ordering::*;
use sync::spsc::SpscInner;
pub struct Sender<T, E> {
inner: Arc<Inner<T, E>>,
}
pub struct SendError<T> {
pub value: T,
pub kind: SendErrorKind,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendErrorKind {
Canceled,
Overflow,
}
impl<T, E> Sender<T, E> {
#[inline(always)]
pub(super) fn new(inner: Arc<Inner<T, E>>) -> Self {
Self { inner }
}
#[inline(always)]
pub fn send(&mut self, value: T) -> Result<(), SendError<T>> {
self.inner.send(value)
}
#[inline(always)]
pub fn send_overwrite(&mut self, value: T) -> Result<(), T> {
self.inner.send_overwrite(value)
}
#[inline(always)]
pub fn send_err(self, err: E) -> Result<(), E> {
self.inner.send_err(err)
}
#[inline(always)]
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
self.inner.poll_cancel()
}
#[inline(always)]
pub fn is_canceled(&self) -> bool {
self.inner.is_canceled()
}
}
impl<T, E> Drop for Sender<T, E> {
#[inline(always)]
fn drop(&mut self) {
self.inner.drop_tx();
}
}
impl<T, E> Inner<T, E> {
fn send(&self, value: T) -> Result<(), SendError<T>> {
let state = self.state_load(Relaxed);
if state & COMPLETE != 0 {
Err(SendError::new(value, SendErrorKind::Canceled))
} else if let Some(index) = Self::put_index(state, self.buffer.cap()) {
self.put(value, state, index)
} else {
Err(SendError::new(value, SendErrorKind::Overflow))
}
}
fn send_overwrite(&self, value: T) -> Result<(), T> {
let mut state = self.state_load(Relaxed);
loop {
if state & COMPLETE != 0 {
break Err(value);
}
match Self::put_index(state, self.buffer.cap()) {
Some(index) => break self.put(value, state, index),
None => {
state = self
.update(state, Relaxed, Relaxed, |state| {
if let Some(index) = Self::take_index(state, self.buffer.cap()) {
Ok((*state, index))
} else {
Err(*state)
}
})
.map(|(state, index)| {
unsafe {
ptr::drop_in_place(self.buffer.ptr().offset(index as isize));
}
state
})
.unwrap_or_else(|state| state);
}
}
}
}
fn send_err(&self, err: E) -> Result<(), E> {
if self.is_canceled() {
Err(err)
} else {
unsafe { *self.err.get() = Some(err) };
Ok(())
}
}
#[inline(always)]
fn put_index(state: usize, capacity: usize) -> Option<usize> {
let count = state & INDEX_MASK;
if count == capacity {
None
} else {
let begin = state >> INDEX_BITS & INDEX_MASK;
let index = begin.wrapping_add(count).wrapping_rem(capacity);
Some(index)
}
}
#[inline(always)]
fn put<U>(&self, value: T, state: usize, index: usize) -> Result<(), U> {
unsafe {
ptr::write(self.buffer.ptr().offset(index as isize), value);
}
self
.update(state, AcqRel, Relaxed, |state| {
*state = state.wrapping_add(1);
if *state & RX_LOCK == 0 {
*state |= RX_LOCK;
Ok(Some(*state))
} else {
Ok(None)
}
})
.map(|state| {
state.map(|state| {
unsafe {
(*self.rx_task.get()).as_ref().map(|task| task.notify());
}
self.update(state, Release, Relaxed, |state| {
*state ^= RX_LOCK;
Ok::<(), ()>(())
})
});
})
}
}
impl<T> SendError<T> {
#[inline(always)]
fn new(value: T, kind: SendErrorKind) -> Self {
SendError { value, kind }
}
}
impl<T> fmt::Debug for SendError<T> {
#[inline(always)]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.kind.fmt(f)
}
}
impl<T> PartialEq for SendError<T> {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
self.kind.eq(&other.kind)
}
}
impl<T> Eq for SendError<T> {}