bi-directional-pipe 0.1.4

async bi-directional pipe.
Documentation
use crate::PipeError;
use std::cell::UnsafeCell;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

/// Shared state for one direction of the pipe: a single result slot plus the
/// waker to notify once that slot has been filled.
///
/// Both fields are `UnsafeCell`s, so the rest of this file reads and writes
/// them through raw pointers inside `unsafe` blocks.
struct Status<T> {
    /// Result waiting for the receiver; `None` while nothing is pending.
    /// There is only one slot, so storing a new entry replaces the old one.
    result: UnsafeCell<Option<Result<T>>>,
    /// Waker recorded by the receiving future's `poll`; senders take it out
    /// of the cell before calling `wake`.
    wake: UnsafeCell<Option<Waker>>,
}

/// Future returned by `Left::recv` and `Right::recv`.
///
/// Resolves to the next `Result<T>` stored in the shared slot it points at.
/// It owns its own `Rc` to that slot, so it does not borrow the endpoint that
/// created it.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PipeHandler<T> {
    /// Slot this future takes its result from and registers its waker in.
    my_status: Rc<Status<T>>,
}

/// Left endpoint of the pipe: receives `L` values and sends `R` values.
///
/// Dropping it stores a `PipeError::LeftDrop` error in both slots.
pub struct Left<L, R> {
    /// Slot the left side receives from (handed to the future made by `recv`).
    my_status: Rc<Status<L>>,
    /// Slot the left side writes into when `send` is called.
    right_status: Rc<Status<R>>,
}

impl<L, R> Left<L, R> {
    /// Returns a future that resolves to the next result stored in the left slot.
    ///
    /// The future holds its own `Rc` to the slot instead of borrowing `self`.
    #[inline]
    pub fn recv(&self) -> PipeHandler<L> {
        let my_status = Rc::clone(&self.my_status);
        PipeHandler { my_status }
    }

    /// Stores `Ok(v)` in the right slot and wakes the waker parked there, if any.
    ///
    /// The slot holds a single entry, so whatever it contained before is replaced.
    #[inline]
    pub fn send(&self, v: R) {
        let peer = &self.right_status;
        // SAFETY: the `Rc` fields make this type `!Send`/`!Sync`, so the cells are
        // only reached from one thread, and no reference into them escapes this
        // block. Assumes dropping the replaced entry does not re-enter this pipe.
        let parked = unsafe {
            *peer.result.get() = Some(Ok(v));
            (*peer.wake.get()).take()
        };
        if let Some(waker) = parked {
            waker.wake();
        }
    }
}

impl<L, R> Drop for Left<L, R> {
    /// Marks both directions as closed by storing a `PipeError::LeftDrop` error
    /// in each slot, then wakes every waker that was parked on either slot.
    ///
    /// The left slot's waker is woken too: a future obtained from `recv` owns
    /// its own `Rc` to that slot and can outlive this endpoint, so without the
    /// wake it would never observe the stored error.
    ///
    /// NOTE(review): both slots are overwritten unconditionally, so a value that
    /// was sent but not yet received is replaced by the error.
    #[inline]
    fn drop(&mut self) {
        // SAFETY: the `Rc` fields make this type `!Send`/`!Sync`, so the cells are
        // only reached from one thread, and no reference into them escapes this
        // block. Assumes dropping a replaced entry does not re-enter this pipe.
        let (peer_waker, own_waker) = unsafe {
            *self.my_status.result.get() =
                Some(Err(Error::new(ErrorKind::Other, PipeError::LeftDrop)));
            *self.right_status.result.get() =
                Some(Err(Error::new(ErrorKind::Other, PipeError::LeftDrop)));
            (
                (*self.right_status.wake.get()).take(),
                (*self.my_status.wake.get()).take(),
            )
        };
        // Both wakers are taken out of their cells before either one is invoked.
        if let Some(waker) = peer_waker {
            waker.wake();
        }
        if let Some(waker) = own_waker {
            waker.wake();
        }
    }
}

/// Right endpoint of the pipe: receives `R` values and sends `L` values.
///
/// Dropping it stores a `PipeError::RightDrop` error in both slots.
pub struct Right<L, R> {
    /// Slot the right side receives from (handed to the future made by `recv`).
    my_status: Rc<Status<R>>,
    /// Slot the right side writes into when `send` is called.
    left_status: Rc<Status<L>>,
}

impl<L, R> Right<L, R> {
    /// Returns a future that resolves to the next result stored in the right slot.
    ///
    /// The future holds its own `Rc` to the slot instead of borrowing `self`.
    #[inline]
    pub fn recv(&self) -> PipeHandler<R> {
        let my_status = Rc::clone(&self.my_status);
        PipeHandler { my_status }
    }

    /// Stores `Ok(v)` in the left slot and wakes the waker parked there, if any.
    ///
    /// The slot holds a single entry, so whatever it contained before is replaced.
    #[inline]
    pub fn send(&self, v: L) {
        let peer = &self.left_status;
        // SAFETY: the `Rc` fields make this type `!Send`/`!Sync`, so the cells are
        // only reached from one thread, and no reference into them escapes this
        // block. Assumes dropping the replaced entry does not re-enter this pipe.
        let parked = unsafe {
            *peer.result.get() = Some(Ok(v));
            (*peer.wake.get()).take()
        };
        if let Some(waker) = parked {
            waker.wake();
        }
    }
}

impl<L, R> Drop for Right<L, R> {
    /// Marks both directions as closed by storing a `PipeError::RightDrop` error
    /// in each slot, then wakes every waker that was parked on either slot.
    ///
    /// The right slot's waker is woken too: a future obtained from `recv` owns
    /// its own `Rc` to that slot and can outlive this endpoint, so without the
    /// wake it would never observe the stored error.
    ///
    /// NOTE(review): both slots are overwritten unconditionally, so a value that
    /// was sent but not yet received is replaced by the error.
    #[inline]
    fn drop(&mut self) {
        // SAFETY: the `Rc` fields make this type `!Send`/`!Sync`, so the cells are
        // only reached from one thread, and no reference into them escapes this
        // block. Assumes dropping a replaced entry does not re-enter this pipe.
        let (peer_waker, own_waker) = unsafe {
            *self.my_status.result.get() =
                Some(Err(Error::new(ErrorKind::Other, PipeError::RightDrop)));
            *self.left_status.result.get() =
                Some(Err(Error::new(ErrorKind::Other, PipeError::RightDrop)));
            (
                (*self.left_status.wake.get()).take(),
                (*self.my_status.wake.get()).take(),
            )
        };
        // Both wakers are taken out of their cells before either one is invoked.
        if let Some(waker) = peer_waker {
            waker.wake();
        }
        if let Some(waker) = own_waker {
            waker.wake();
        }
    }
}

impl<T> Future for PipeHandler<T> {
    type Output = Result<T>;

    /// Takes the pending result out of the slot if there is one; otherwise
    /// registers the current task's waker and returns `Poll::Pending`.
    ///
    /// The result is checked before the waker is touched, so a poll that
    /// returns `Ready` leaves no waker behind for a later send to wake
    /// spuriously. While pending, the stored waker is only replaced when it
    /// would not already wake the current task, which avoids a clone per poll.
    ///
    /// Only one waker is kept per slot: if several handlers for the same slot
    /// are polled from different tasks, only the most recently registered
    /// waker is woken.
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let status = &self.my_status;
        // SAFETY: the `Rc` field makes this type `!Send`/`!Sync`, so the cells are
        // only reached from one thread, and no reference into them escapes this
        // block.
        unsafe {
            if let Some(outcome) = (*status.result.get()).take() {
                return Poll::Ready(outcome);
            }
            let already_registered =
                matches!(&*status.wake.get(), Some(w) if w.will_wake(cx.waker()));
            if !already_registered {
                *status.wake.get() = Some(cx.waker().clone());
            }
        }
        Poll::Pending
    }
}

#[inline]
pub fn pipe<L, R>() -> (Left<L, R>, Right<L, R>) {
    let left_status = Rc::new(Status {
        result: Default::default(),
        wake: Default::default(),
    });

    let right_status = Rc::new(Status {
        result: Default::default(),
        wake: Default::default(),
    });

    let left = Left {
        my_status: left_status.clone(),
        right_status: right_status.clone(),
    };

    let right = Right {
        my_status: right_status,
        left_status,
    };

    (left, right)
}