pub use super::{
NoRecv,
RecvErr::{self, *},
};
use incin::Pause;
use owned_alloc::OwnedAlloc;
use ptr::{bypass_null, check_null_align};
use removable::Removable;
use std::{
fmt,
ptr::{null_mut, NonNull},
sync::{
atomic::{AtomicPtr, Ordering::*},
Arc,
},
};
pub fn create<T>() -> (Sender<T>, Receiver<T>) {
with_incin(SharedIncin::new())
}
pub fn with_incin<T>(incin: SharedIncin<T>) -> (Sender<T>, Receiver<T>) {
check_null_align::<Node<T>>();
let alloc = OwnedAlloc::new(Node {
message: Removable::empty(),
next: AtomicPtr::new(null_mut()),
});
let single_node = alloc.into_raw();
let sender = Sender { back: single_node };
let receiver = Receiver {
inner: Arc::new(ReceiverInner {
front: AtomicPtr::new(single_node.as_ptr()),
incin,
}),
};
(sender, receiver)
}
pub struct Sender<T> {
back: NonNull<Node<T>>,
}
impl<T> Sender<T> {
pub fn send(&mut self, message: T) -> Result<(), NoRecv<T>> {
let alloc = OwnedAlloc::new(Node {
message: Removable::new(message),
next: AtomicPtr::new(null_mut()),
});
let nnptr = alloc.into_raw();
let res = unsafe {
self.back.as_ref().next.compare_exchange(
null_mut(),
nnptr.as_ptr(),
Release,
Relaxed,
)
};
if res.is_ok() {
self.back = nnptr;
Ok(())
} else {
let mut alloc = unsafe { OwnedAlloc::from_raw(nnptr) };
let message = alloc.message.replace(None).unwrap();
Err(NoRecv { message })
}
}
pub fn is_connected(&self) -> bool {
let back = unsafe { self.back.as_ref() };
back.next.load(Relaxed).is_null()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let res = unsafe {
self.back
.as_ref()
.next
.swap((null_mut::<Node<T>>() as usize | 1) as *mut _, Relaxed)
};
if !res.is_null() {
unsafe { OwnedAlloc::from_raw(self.back) };
}
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
fmtr.write_str("spmc::Sender")
}
}
unsafe impl<T> Send for Sender<T> where T: Send {}
unsafe impl<T> Sync for Sender<T> where T: Send {}
pub struct Receiver<T> {
inner: Arc<ReceiverInner<T>>,
}
impl<T> Receiver<T> {
#[allow(unused_must_use)]
pub fn recv(&self) -> Result<T, RecvErr> {
let pause = self.inner.incin.inner.pause();
let mut front_nnptr = unsafe {
bypass_null(self.inner.front.load(Relaxed))
};
loop {
match unsafe { front_nnptr.as_ref().message.take(AcqRel) } {
Some(val) => {
unsafe { self.try_clear_first(front_nnptr, &pause) };
break Ok(val);
},
None => unsafe {
front_nnptr = self.try_clear_first(front_nnptr, &pause)?;
},
}
}
}
pub fn is_connected(&self) -> bool {
let _pause = self.inner.incin.inner.pause();
let front = unsafe { &*self.inner.front.load(Relaxed) };
front.message.is_present(Relaxed)
|| front.next.load(Relaxed) as usize & 1 == 0
}
pub fn incin(&self) -> SharedIncin<T> {
self.inner.incin.clone()
}
unsafe fn try_clear_first(
&self,
expected: NonNull<Node<T>>,
pause: &Pause<OwnedAlloc<Node<T>>>,
) -> Result<NonNull<Node<T>>, RecvErr> {
let next = expected.as_ref().next.load(Acquire);
if next as usize & 1 == 1 {
Err(RecvErr::NoSender)
} else if next.is_null() {
Err(RecvErr::NoMessage)
} else {
let ptr = expected.as_ptr();
let next = match self
.inner
.front
.compare_exchange(ptr, next, Relaxed, Relaxed)
{
Ok(_) => {
pause.add_to_incin(OwnedAlloc::from_raw(expected));
next
},
Err(found) => found,
};
Ok(bypass_null(next))
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
write!(fmtr, "spmc::Receiver {} ptr: {:p} {}", '{', self.inner, '}')
}
}
unsafe impl<T> Send for Receiver<T> where T: Send {}
unsafe impl<T> Sync for Receiver<T> where T: Send {}
struct ReceiverInner<T> {
front: AtomicPtr<Node<T>>,
incin: SharedIncin<T>,
}
impl<T> Drop for ReceiverInner<T> {
fn drop(&mut self) {
let front = self.front.get_mut();
loop {
let front_nnptr = unsafe { bypass_null(*front) };
let res = unsafe {
front_nnptr.as_ref().next.compare_exchange(
null_mut(),
(null_mut::<Node<T>>() as usize | 1) as *mut _,
AcqRel,
Acquire,
)
};
match res {
Ok(_) => break,
Err(next) => {
unsafe { OwnedAlloc::from_raw(front_nnptr) };
if next as usize & 1 == 1 {
break;
}
*front = next;
},
}
}
}
}
#[repr(align(/* at least */ 2))]
struct Node<T> {
message: Removable<T>,
next: AtomicPtr<Node<T>>,
}
make_shared_incin! {
{ "`spmc::Receiver`" }
pub SharedIncin<T> of OwnedAlloc<Node<T>>
}
#[cfg(test)]
mod test {
use channel::spmc;
use std::{
sync::{
atomic::{AtomicBool, Ordering::*},
Arc,
},
thread,
};
#[test]
fn correct_numbers() {
const THREADS: usize = 8;
const MSGS: usize = 512;
let mut done = Vec::with_capacity(MSGS);
for _ in 0 .. MSGS {
done.push(AtomicBool::new(false));
}
let done = Arc::<[AtomicBool]>::from(done);
let (mut sender, receiver) = spmc::create::<usize>();
let mut threads = Vec::with_capacity(THREADS);
for _ in 0 .. THREADS {
let done = done.clone();
let receiver = receiver.clone();
threads.push(thread::spawn(move || loop {
match receiver.recv() {
Ok(i) => assert!(!done[i].swap(true, AcqRel)),
Err(spmc::NoSender) => break,
Err(spmc::NoMessage) => (),
}
}))
}
for i in 0 .. MSGS {
sender.send(i).unwrap();
}
drop(sender);
for thread in threads {
thread.join().unwrap();
}
for status in done.iter() {
assert!(status.load(Relaxed));
}
}
}