use futures::{Async, Future, Poll};
use nbchan;
use std::error;
use std::fmt;
use std::sync::mpsc::{RecvError, SendError};
use super::Notifier;
/// Creates a oneshot channel and returns its `(Sender, Receiver)` halves.
///
/// The two halves wrap the ends of an `nbchan::oneshot` channel. A single
/// `Notifier` is created for the pair: the sender stores a clone of it and
/// the receiver stores the original.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let notifier = Notifier::new();
    let (tx, rx) = nbchan::oneshot::channel();

    let sender = Sender {
        inner: Some(tx),
        notifier: notifier.clone(),
    };
    let receiver = Receiver {
        inner: rx,
        notifier,
    };
    (sender, receiver)
}
/// The sending half of the oneshot channel built by `channel`.
///
/// Wraps an `nbchan::oneshot::Sender` together with a `Notifier` handle.
pub struct Sender<T> {
// Underlying nbchan sender; kept in an `Option` so that `send` can move it
// out with `take()`.
inner: Option<nbchan::oneshot::Sender<T>>,
// Notifier handle; `channel` stores here a clone of the notifier it gives to
// the paired `Receiver`.
// NOTE(review): presumably clones share their state — confirm in `Notifier`.
notifier: Notifier,
}
impl<T> Sender<T> {
    /// Sends `t` through the underlying `nbchan` sender, consuming `self`.
    ///
    /// # Errors
    ///
    /// Propagates the `SendError` reported by the underlying sender.
    ///
    /// # Panics
    ///
    /// Panics with "Never fails" if `inner` is already empty. In the visible
    /// code `inner` is only taken here, and this method consumes the sender.
    pub fn send(mut self, t: T) -> Result<(), SendError<T>> {
        let tx = self.inner.take().expect("Never fails");
        tx.send(t)?;
        Ok(())
    }
}
// Calls `notify()` on the notifier every time a `Sender` is dropped. Because
// `send` takes `self` by value, this also runs when `send` returns.
// NOTE(review): presumably this wakes a task that registered through
// `Notifier::await` in `Receiver::poll` — confirm against `Notifier`.
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.notifier.notify();
}
}
impl<T> fmt::Debug for Sender<T> {
    /// Writes the fixed text `Sender { .. }`.
    ///
    /// No field is inspected, so `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Sender { .. }")
    }
}
/// The receiving half of the oneshot channel built by `channel`.
///
/// Wraps an `nbchan::oneshot::Receiver` together with a `Notifier` handle and
/// is driven through its `Future` implementation.
pub struct Receiver<T> {
// Underlying nbchan receiver, queried with `try_recv()` on every poll.
inner: nbchan::oneshot::Receiver<T>,
// Notifier created by `channel`; the paired `Sender` holds a clone of it.
notifier: Notifier,
}
impl<T> Future for Receiver<T> {
    type Item = T;
    type Error = RecvError;

    /// Tries to take the value out of the channel.
    ///
    /// Returns `Ready(value)` once a value is available, `NotReady` while the
    /// channel is still empty, and `Err(RecvError)` when the underlying
    /// receiver reports `Disconnected`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let outcome = match self.inner.try_recv() {
            Err(nbchan::oneshot::TryRecvError::Empty) => {
                // Nothing yet: call `await()` on the notifier first, then look
                // again before reporting `NotReady`.
                // NOTE(review): presumably `await()` registers the current task
                // for wake-up, and the second `try_recv` closes the window in
                // which a send could slip in unnoticed — confirm in `Notifier`.
                self.notifier.await();
                self.inner.try_recv()
            }
            settled => settled,
        };

        match outcome {
            Ok(value) => Ok(Async::Ready(value)),
            Err(nbchan::oneshot::TryRecvError::Disconnected) => Err(RecvError),
            Err(nbchan::oneshot::TryRecvError::Empty) => Ok(Async::NotReady),
        }
    }
}
// Calls `notify()` on the notifier when the `Receiver` is dropped.
// NOTE(review): nothing in this file waits on the notifier from the sending
// side, so the consumer of this notification is not visible here — confirm
// against `Notifier` and its other users.
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.notifier.notify();
}
}
impl<T> fmt::Debug for Receiver<T> {
    /// Writes the fixed text `Receiver { .. }`.
    ///
    /// No field is inspected, so `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Receiver { .. }")
    }
}
/// Creates a `(Monitored, Monitor)` pair on top of a fresh `channel`.
///
/// The `Monitored` half wraps the channel's sender and the `Monitor` half
/// wraps its receiver; the channel carries a `Result<T, E>`.
pub fn monitor<T, E>() -> (Monitored<T, E>, Monitor<T, E>) {
    let (sender, receiver) = channel();
    let reporter = Monitored(sender);
    let watcher = Monitor(receiver);
    (reporter, watcher)
}
/// The reporting half of a `monitor` pair.
///
/// Wraps a `Sender` whose payload is a `Result<T, E>`; the final result is
/// handed over with `exit`.
#[derive(Debug)]
pub struct Monitored<T, E>(Sender<Result<T, E>>);
impl<T, E> Monitored<T, E> {
pub fn exit(self, result: Result<T, E>) {
let _ = self.0.send(result);
}
}
/// The observing half of a `monitor` pair.
///
/// Wraps a `Receiver` whose payload is a `Result<T, E>`; its `Future`
/// implementation turns that payload into `T` or a `MonitorError<E>`.
#[derive(Debug)]
pub struct Monitor<T, E>(Receiver<Result<T, E>>);
impl<T, E> Future for Monitor<T, E> {
    type Item = T;
    type Error = MonitorError<E>;

    /// Polls the wrapped receiver and flattens its outcome.
    ///
    /// * The receiver fails -> `Err(MonitorError::Aborted)`.
    /// * It yields `Err(e)` -> `Err(MonitorError::Failed(e))`.
    /// * It yields `Ok(v)` -> `Ok(Async::Ready(v))`.
    /// * It is not ready -> `Ok(Async::NotReady)`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0.poll() {
            Err(_) => Err(MonitorError::Aborted),
            Ok(Async::Ready(Err(e))) => Err(MonitorError::Failed(e)),
            Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(v)),
            Ok(Async::NotReady) => Ok(Async::NotReady),
        }
    }
}
/// The error produced when polling a `Monitor`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MonitorError<E> {
    /// The receiving side failed without ever obtaining a result.
    Aborted,

    /// The monitored side reported `Err(E)` as its result.
    Failed(E),
}

impl<E> MonitorError<E> {
    /// Converts the payload of `Failed` with `f`; `Aborted` is passed through
    /// unchanged and `f` is not called.
    pub fn map<F, T>(self, f: F) -> MonitorError<T>
    where
        F: FnOnce(E) -> T,
    {
        match self {
            MonitorError::Aborted => MonitorError::Aborted,
            MonitorError::Failed(e) => MonitorError::Failed(f(e)),
        }
    }

    /// Returns the payload of `Failed`, or `or_error` for `Aborted`.
    pub fn unwrap_or(self, or_error: E) -> E {
        self.unwrap_or_else(|| or_error)
    }

    /// Returns the payload of `Failed`, or the value produced by `f` for
    /// `Aborted`. `f` is only called in the `Aborted` case.
    pub fn unwrap_or_else<F>(self, f: F) -> E
    where
        F: FnOnce() -> E,
    {
        match self {
            MonitorError::Aborted => f(),
            MonitorError::Failed(e) => e,
        }
    }
}

impl<E: error::Error> error::Error for MonitorError<E> {
    /// Returns a fixed, static summary of the variant.
    ///
    /// The text is a plain `&str` and is never run through a formatter, so it
    /// must not contain `{}` placeholders. The inner error's text is only
    /// available through `Display` or `cause`.
    fn description(&self) -> &str {
        match *self {
            MonitorError::Aborted => "Monitor target aborted",
            MonitorError::Failed(_) => "Monitor target failed",
        }
    }

    /// Returns the inner error for `Failed`, and `None` for `Aborted`.
    // Kept as `cause` rather than `source`: `source` would require an
    // `E: 'static` bound, which would narrow this impl for existing callers.
    fn cause(&self) -> Option<&dyn error::Error> {
        match *self {
            MonitorError::Aborted => None,
            MonitorError::Failed(ref e) => Some(e),
        }
    }
}

impl<E: fmt::Display> fmt::Display for MonitorError<E> {
    /// Writes the variant's message; `Failed` appends the inner error's
    /// `Display` output after `": "`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            MonitorError::Aborted => write!(f, "Monitor target aborted"),
            MonitorError::Failed(ref e) => write!(f, "Monitor target failed: {}", e),
        }
    }
}
/// Creates two cross-connected `Link`s.
///
/// Two `monitor` pairs are built. Each returned link holds the `Monitored`
/// half of one pair and the `Monitor` half of the other, so each link reports
/// its own result through `tx` and observes the other link's result through
/// `rx`.
pub fn link<T0, E0, T1, E1>() -> LinkPair<T0, E0, T1, E1> {
    let (report0, watch0) = monitor();
    let (report1, watch1) = monitor();

    let first = Link {
        tx: report0,
        rx: watch1,
    };
    let second = Link {
        tx: report1,
        rx: watch0,
    };
    (first, second)
}
/// The pair of links returned by `link`: the second link has its type
/// parameters swapped relative to the first, so what one side reports
/// (`T0`/`E0`) is what the other side observes, and vice versa.
pub type LinkPair<T0, E0, T1, E1> = (Link<T0, E0, T1, E1>, Link<T1, E1, T0, E0>);
/// One end of a bidirectional link created by `link`.
///
/// `T0`/`E0` describe the result this end reports; `T1`/`E1` describe the
/// result it observes from the other end and default to `T0`/`E0`.
#[derive(Debug)]
pub struct Link<T0, E0, T1 = T0, E1 = E0> {
// Reports this end's result (used by `exit`).
tx: Monitored<T0, E0>,
// Observes the other end's result (polled by the `Future` impl).
rx: Monitor<T1, E1>,
}
impl<T0, E0, T1, E1> Link<T0, E0, T1, E1> {
/// Reports `result` as this end's outcome by delegating to
/// `Monitored::exit` on `tx`. The link is consumed, so its `rx` half is
/// dropped as well.
pub fn exit(self, result: Result<T0, E0>) {
self.tx.exit(result);
}
}
// Polling a link polls its `rx` monitor, so the future resolves with the
// outcome observed through `rx`: `T1` on success, `MonitorError<E1>` otherwise.
impl<T0, E0, T1, E1> Future for Link<T0, E0, T1, E1> {
type Item = T1;
type Error = MonitorError<E1>;
// Pure delegation to the wrapped `Monitor`.
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.rx.poll()
}
}