use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
use nbchan::mpsc as nb_mpsc;
use std::fmt;
use std::sync::mpsc::{SendError, TryRecvError, TrySendError};
use super::Notifier;
/// Creates a channel and returns its `(Sender, Receiver)` pair.
///
/// The two halves wrap the corresponding halves of `nb_mpsc::channel()` and
/// share a single `Notifier`: the sender holds a clone, the receiver holds the
/// original instance.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Notifier::new();
    let (tx_half, rx_half) = nb_mpsc::channel();

    let sender = Sender {
        inner: tx_half,
        notifier: shared.clone(),
    };
    let receiver = Receiver {
        inner: rx_half,
        notifier: shared,
    };
    (sender, receiver)
}
/// Creates a channel and returns its `(SyncSender, Receiver)` pair.
///
/// `bound` is forwarded unchanged to `nb_mpsc::sync_channel`. As with
/// `channel`, both halves share one `Notifier` (the sender holds a clone).
///
/// This function is marked as deprecated.
#[deprecated]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
    let shared = Notifier::new();
    let (tx_half, rx_half) = nb_mpsc::sync_channel(bound);

    let sender = SyncSender {
        inner: tx_half,
        notifier: shared.clone(),
    };
    let receiver = Receiver {
        inner: rx_half,
        notifier: shared,
    };
    (sender, receiver)
}
/// The receiving half of a channel created by `channel` or `sync_channel`.
pub struct Receiver<T> {
    /// Underlying `nb_mpsc` receiver that values are pulled from.
    inner: nb_mpsc::Receiver<T>,
    /// Notifier shared with the sending half (senders hold clones of it).
    notifier: Notifier,
}
impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = ();

    /// Attempts to take the next value from the channel without blocking.
    ///
    /// Returns `Ready(Some(item))` when a value was received, `Ready(None)`
    /// when the underlying receiver reports `Disconnected`, and `NotReady`
    /// when the channel is still empty after the retry. This implementation
    /// never returns `Err`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let outcome = match self.inner.try_recv() {
            Err(TryRecvError::Empty) => {
                // Nothing queued: call `await()` on the shared notifier
                // (presumably this registers the current task for wake-up —
                // see `Notifier::await`), then try once more so that a value
                // sent between the first attempt and that call is not missed.
                self.notifier.await();
                self.inner.try_recv()
            }
            settled => settled,
        };

        match outcome {
            Ok(item) => Ok(Async::Ready(Some(item))),
            Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),
            Err(TryRecvError::Empty) => Ok(Async::NotReady),
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Signals the shared notifier when the receiving half is dropped.
    fn drop(&mut self) {
        // This body runs before the struct's fields are dropped.
        self.notifier.notify();
    }
}
impl<T> fmt::Debug for Receiver<T> {
    /// Writes the fixed placeholder `Receiver { .. }`; no field is inspected,
    /// so `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Receiver { .. }")
    }
}
/// The sending half of a channel created by `channel`.
pub struct Sender<T> {
    /// Underlying `nb_mpsc` sender that values are pushed into.
    inner: nb_mpsc::Sender<T>,
    /// Notifier shared with the receiving half; signalled after each
    /// successful send and on drop.
    notifier: Notifier,
}
impl<T> Sender<T> {
    /// Sends `t` through the channel.
    ///
    /// On success the shared notifier is signalled and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns the `SendError` produced by the underlying sender, without
    /// signalling the notifier, when the send is rejected.
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        let delivered = self.inner.send(t);
        if delivered.is_ok() {
            // Only a value that was actually queued warrants a notification.
            self.notifier.notify();
        }
        delivered
    }

    /// Reports whether the underlying sender considers the channel
    /// disconnected (delegates to `nb_mpsc::Sender::is_disconnected`).
    pub fn is_disconnected(&self) -> bool {
        self.inner.is_disconnected()
    }
}
// SAFETY: NOTE(review) — the justification is not visible in this file. This
// impl asserts that `&Sender<T>` may be used from several threads at once
// whenever `T: Send`; that presumably relies on `nb_mpsc::Sender` and
// `Notifier` being safe to use concurrently through shared references.
// TODO: confirm against their implementations.
unsafe impl<T: Send> Sync for Sender<T> {}
impl<T> Clone for Sender<T> {
    /// Produces another sender by cloning the underlying sender and then the
    /// shared notifier.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let notifier = self.notifier.clone();
        Sender { inner, notifier }
    }
}
impl<T> Drop for Sender<T> {
    /// Signals the shared notifier when a sender is dropped.
    fn drop(&mut self) {
        // NOTE(review): this body runs before the `inner` field is dropped, so
        // the notification is issued while the underlying sender still exists.
        // Confirm that a receiver woken at this point cannot miss the
        // subsequent disconnect.
        self.notifier.notify();
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Writes the fixed placeholder `Sender { .. }`; no field is inspected,
    /// so `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Sender { .. }")
    }
}
/// The sending half of a channel created by `sync_channel`.
pub struct SyncSender<T> {
    /// Underlying `nb_mpsc` sender; sends go through its `try_send`.
    inner: nb_mpsc::SyncSender<T>,
    /// Notifier shared with the receiving half; signalled after each
    /// successful send and on drop.
    notifier: Notifier,
}
impl<T> Sink for SyncSender<T> {
    type SinkItem = T;
    type SinkError = SendError<T>;

    /// Tries to queue `item` without blocking.
    ///
    /// * Queued: the shared notifier is signalled and `AsyncSink::Ready` is
    ///   returned.
    /// * Channel full: the item is handed back as `AsyncSink::NotReady(item)`.
    /// * Channel disconnected: the item is handed back inside `SendError`.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        if let Err(refusal) = self.inner.try_send(item) {
            return match refusal {
                // NOTE(review): no wake-up registration is visible on this
                // path, so a caller that parks on `NotReady` depends on being
                // re-polled by some other means — confirm against the
                // futures 0.1 `Sink::start_send` contract.
                TrySendError::Full(returned) => Ok(AsyncSink::NotReady(returned)),
                TrySendError::Disconnected(returned) => Err(SendError(returned)),
            };
        }

        self.notifier.notify();
        Ok(AsyncSink::Ready)
    }

    /// Always reports completion: `start_send` either queues the item
    /// immediately or hands it back, so nothing is buffered in this sink.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        Ok(Async::Ready(()))
    }
}
// SAFETY: NOTE(review) — the justification is not visible in this file. This
// impl asserts that `&SyncSender<T>` may be used from several threads at once
// whenever `T: Send`; that presumably relies on `nb_mpsc::SyncSender` and
// `Notifier` being safe to use concurrently through shared references.
// TODO: confirm against their implementations.
unsafe impl<T: Send> Sync for SyncSender<T> {}
impl<T> Clone for SyncSender<T> {
    /// Produces another sender by cloning the underlying sender and then the
    /// shared notifier.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let notifier = self.notifier.clone();
        SyncSender { inner, notifier }
    }
}
impl<T> Drop for SyncSender<T> {
    /// Signals the shared notifier when a sender is dropped.
    fn drop(&mut self) {
        // NOTE(review): this body runs before the `inner` field is dropped, so
        // the notification is issued while the underlying sender still exists.
        // Confirm that a receiver woken at this point cannot miss the
        // subsequent disconnect.
        self.notifier.notify();
    }
}
impl<T> fmt::Debug for SyncSender<T> {
    /// Writes the fixed placeholder `SyncSender { .. }`; no field is
    /// inspected, so `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("SyncSender { .. }")
    }
}