use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::error::Error;
use std::fmt;
use futures_core::{Future, Poll, Async};
use futures_core::task::{self, Waker};
use futures_core::never::Never;
use lock::Lock;
/// The receiving half of a one-shot channel, created by `channel`.
///
/// `Receiver` implements `Future` with `Item = T` and `Error = Canceled`, so it
/// has to be polled to make progress. Dropping it marks the shared state as
/// complete and wakes a sender task registered through `Sender::poll_cancel`.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Receiver<T> {
/// State shared with the paired `Sender`.
inner: Arc<Inner<T>>,
}
/// The sending half of a one-shot channel, created by `channel`.
///
/// `Sender::send` takes the sender by value, so at most one value can be sent.
/// Dropping the sender marks the shared state as complete and wakes a receiver
/// task that registered itself while polling.
#[derive(Debug)]
pub struct Sender<T> {
/// State shared with the paired `Receiver`.
inner: Arc<Inner<T>>,
}
/// State shared between the `Sender` and `Receiver` of one channel.
///
/// `Lock` is a project-local type (see the `lock` module); in this file it is
/// only ever accessed through `try_lock`, which yields `Some(guard)` or `None`.
#[derive(Debug)]
struct Inner<T> {
/// Set to `true` by `drop_tx`, `close_rx` and `drop_rx`; never reset to
/// `false` anywhere in this file. Always accessed with `SeqCst` ordering.
complete: AtomicBool,
/// Slot for the value in flight: filled by `send`, emptied by `recv` /
/// `try_recv` (or by `send` itself when it pulls the value back out).
data: Lock<Option<T>>,
/// Waker of the task polling the `Receiver`; stored by `recv`, taken and woken
/// by `drop_tx`, discarded by `drop_rx`.
rx_task: Lock<Option<Waker>>,
/// Waker of the task polling `Sender::poll_cancel`; stored by `poll_cancel`,
/// taken and woken by `close_rx` and `drop_rx`.
tx_task: Lock<Option<Waker>>,
}
/// Creates a new one-shot channel and returns its two halves.
///
/// A single reference-counted `Inner` is allocated and shared by both halves.
/// The tuple is ordered `(Sender, Receiver)`.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Inner::new());
    let tx = Sender {
        inner: Arc::clone(&shared),
    };
    let rx = Receiver { inner: shared };
    (tx, rx)
}
// Operations on the state shared by the two channel halves.
//
// Every lock in here is taken with `try_lock`, and a failed acquisition is
// handled as a signal (the method moves on) instead of being retried. The
// relative order of the `complete` loads/stores, the lock attempts and the
// explicit guard `drop`s is therefore significant; do not reorder statements
// without re-checking every interleaving of the two halves.
impl<T> Inner<T> {
/// Creates the initial shared state: not complete, no value stored and no
/// wakers registered.
fn new() -> Inner<T> {
Inner {
complete: AtomicBool::new(false),
data: Lock::new(None),
rx_task: Lock::new(None),
tx_task: Lock::new(None),
}
}
/// Tries to place `t` into the value slot.
///
/// Returns `Err(t)`, handing the value back to the caller, when:
/// * `complete` is already set on entry,
/// * the `data` lock cannot be acquired, or
/// * `complete` is observed after the store and the value could still be
///   taken back out of the slot.
///
/// Returns `Ok(())` otherwise.
///
/// # Panics
///
/// Panics if the slot already contains a value.
fn send(&self, t: T) -> Result<(), T> {
// Fast path: the channel was already marked complete.
if self.complete.load(SeqCst) {
return Err(t)
}
if let Some(mut slot) = self.data.try_lock() {
assert!(slot.is_none());
*slot = Some(t);
// Release the guard before re-checking, so the slot can be re-locked below
// (or locked by the receiving side).
drop(slot);
// `complete` may have been set between the first check and the store. In
// that case try to reclaim the value so it is returned to the caller.
if self.complete.load(SeqCst) {
// If this lock attempt fails, or the slot is already empty, we fall
// through to `Ok(())` — presumably the receiver is taking / has taken
// the value; NOTE(review): confirm against `Lock` semantics.
if let Some(mut slot) = self.data.try_lock() {
if let Some(t) = slot.take() {
return Err(t);
}
}
}
Ok(())
} else {
// The `data` lock was unavailable; give the value back.
Err(t)
}
}
/// Polls whether the channel has been marked complete.
///
/// Returns `Async::Ready(())` when `complete` is set (before or after the
/// waker is stored) or when the `tx_task` lock cannot be acquired. Otherwise
/// the current task's waker is left in `tx_task` and `Async::Pending` is
/// returned. The error type is `Never`; no path here returns `Err`.
fn poll_cancel(&self, cx: &mut task::Context) -> Poll<(), Never> {
if self.complete.load(SeqCst) {
return Ok(Async::Ready(()))
}
let handle = cx.waker().clone();
match self.tx_task.try_lock() {
Some(mut p) => *p = Some(handle),
// Lock unavailable: reported as ready without storing the waker —
// presumably the receiver side holds it while closing/dropping.
None => return Ok(Async::Ready(())),
}
// Re-check after publishing the waker: `close_rx` / `drop_rx` set `complete`
// before they try to take `tx_task`, so a completion that raced with the
// store above is caught here.
if self.complete.load(SeqCst) {
Ok(Async::Ready(()))
} else {
Ok(Async::Pending)
}
}
/// Returns the current value of the `complete` flag.
fn is_canceled(&self) -> bool {
self.complete.load(SeqCst)
}
/// Sender-side teardown, called from `Sender`'s `Drop` impl.
///
/// Sets `complete`, then wakes the receiver's registered task if `rx_task`
/// can be locked and holds a waker. A failed lock attempt is ignored.
fn drop_tx(&self) {
self.complete.store(true, SeqCst);
if let Some(mut slot) = self.rx_task.try_lock() {
if let Some(task) = slot.take() {
// Release the guard before invoking the waker.
drop(slot);
task.wake();
}
}
}
/// Receiver-side close, called from `Receiver::close`.
///
/// Sets `complete`, then wakes the sender's registered task if `tx_task` can
/// be locked and holds a waker. A failed lock attempt is ignored.
fn close_rx(&self) {
self.complete.store(true, SeqCst);
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
// Release the guard before invoking the waker.
drop(handle);
task.wake()
}
}
}
/// Non-blocking check for a value that does not register a waker.
///
/// * `Ok(None)` — `complete` is not set. The `data` slot is not inspected in
///   this case.
/// * `Ok(Some(value))` — `complete` is set and a value was taken from the slot.
/// * `Err(Canceled)` — `complete` is set and either the slot was empty or the
///   `data` lock could not be acquired.
fn try_recv(&self) -> Result<Option<T>, Canceled> {
if self.complete.load(SeqCst) {
if let Some(mut slot) = self.data.try_lock() {
if let Some(data) = slot.take() {
// The return type fixes the target of `.into()` to `T`, i.e. the
// identity conversion.
return Ok(Some(data.into()));
}
}
Err(Canceled)
} else {
Ok(None)
}
}
/// Polls for the value on behalf of the `Receiver` future.
///
/// If `complete` is not yet set, the current task's waker is stored in
/// `rx_task` so that `drop_tx` can wake it. Returns the value if one can be
/// taken from `data`, `Err(Canceled)` if the channel is finished without an
/// obtainable value, and `Async::Pending` otherwise.
fn recv(&self, cx: &mut task::Context) -> Poll<T, Canceled> {
// `done` means "go look at the slot now", either because `complete` is set
// or because the waker could not be registered.
let mut done = false;
if self.complete.load(SeqCst) {
done = true;
} else {
let task = cx.waker().clone();
match self.rx_task.try_lock() {
Some(mut slot) => *slot = Some(task),
// `rx_task` is unavailable — presumably `drop_tx` is holding it, in
// which case `complete` has already been stored; treat as finished.
None => done = true,
}
}
// Second load of `complete`: `drop_tx` sets the flag before it tries to take
// the waker, so a completion that raced with the registration above is
// caught here instead of being missed.
if done || self.complete.load(SeqCst) {
if let Some(mut slot) = self.data.try_lock() {
if let Some(data) = slot.take() {
// The return type requires an `Async<T>` here — presumably
// `Async::Ready(data)` via a `From<T>` impl in `futures_core`;
// TODO confirm.
return Ok(data.into());
}
}
// Either the slot was empty or the `data` lock was unavailable.
Err(Canceled)
} else {
Ok(Async::Pending)
}
}
/// Receiver-side teardown, called from `Receiver`'s `Drop` impl.
///
/// Sets `complete`, discards any waker the receiver had registered, and wakes
/// the sender's registered task if there is one. Failed lock attempts are
/// ignored.
fn drop_rx(&self) {
self.complete.store(true, SeqCst);
if let Some(mut slot) = self.rx_task.try_lock() {
let task = slot.take();
// Release the guard first, then drop the waker (if any) outside the lock.
drop(slot);
drop(task);
}
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
// Release the guard before invoking the waker.
drop(handle);
task.wake()
}
}
}
}
impl<T> Sender<T> {
/// Consumes the sender and attempts to hand `t` to the receiver.
///
/// Returns `Err(t)`, giving the value back, when `Inner::send` rejects it
/// (the channel was already marked complete or the value slot could not be
/// used). `Ok(())` means the value was stored; it does not by itself show
/// that the receiver has read it.
///
/// Because `self` is taken by value, the sender is dropped when this call
/// returns, which runs `Inner::drop_tx` and wakes a registered receiver task.
pub fn send(self, t: T) -> Result<(), T> {
self.inner.send(t)
}
/// Polls whether the channel has been marked complete, e.g. because the
/// `Receiver` was closed or dropped.
///
/// Returns `Async::Ready(())` in that case. Otherwise the current task's
/// waker is registered and `Async::Pending` is returned. See
/// `Inner::poll_cancel` for the lock-contention case.
pub fn poll_cancel(&mut self, cx: &mut task::Context) -> Poll<(), Never> {
self.inner.poll_cancel(cx)
}
/// Returns `true` once the shared `complete` flag has been set. Does not
/// register a waker.
pub fn is_canceled(&self) -> bool {
self.inner.is_canceled()
}
}
impl<T> Drop for Sender<T> {
/// Marks the channel complete and wakes a registered receiver task via
/// `Inner::drop_tx`. This also runs after `Sender::send`, which consumes
/// `self`.
fn drop(&mut self) {
self.inner.drop_tx()
}
}
/// Error returned by the receiving half when the channel is finished and no
/// value can be obtained from it.
///
/// It carries no data; both its `Display` output and its `Error::description`
/// are the fixed text `oneshot canceled`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed message; width/fill/precision flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

impl Error for Canceled {
    /// Short static description, identical to the `Display` text.
    fn description(&self) -> &str {
        "oneshot canceled"
    }
}
impl<T> Receiver<T> {
/// Closes the channel from the receiving side without dropping the receiver.
///
/// Marks the shared state complete and wakes a sender task registered through
/// `Sender::poll_cancel` (see `Inner::close_rx`). A value already stored in
/// the slot is left there, so it can still be obtained afterwards via
/// `try_recv` or by polling.
pub fn close(&mut self) {
self.inner.close_rx()
}
/// Checks for a value without registering the current task for wake-up.
///
/// * `Ok(None)` — the channel has not been marked complete yet.
/// * `Ok(Some(value))` — the channel is complete and a value was available.
/// * `Err(Canceled)` — the channel is complete and no value could be taken.
pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
self.inner.try_recv()
}
}
// Polling the receiver resolves to the sent value, or to `Canceled` when the
// channel is finished without an obtainable value.
impl<T> Future for Receiver<T> {
type Item = T;
type Error = Canceled;
/// Delegates to `Inner::recv`, which registers the current task's waker when
/// the channel is not yet complete and returns `Async::Pending` in that case.
fn poll(&mut self, cx: &mut task::Context) -> Poll<T, Canceled> {
self.inner.recv(cx)
}
}
impl<T> Drop for Receiver<T> {
/// Marks the channel complete, discards the receiver's own registered waker
/// and wakes a sender task waiting in `poll_cancel` (see `Inner::drop_rx`).
fn drop(&mut self) {
self.inner.drop_rx()
}
}