//! Request/response channels layered on top of `crossbeam_channel`.
//!
//! [`channel`] creates a [`RequestSender`] / [`RequestReceiver`] pair. Each
//! call to [`RequestSender::request`] hands back a [`ResponseReceiver`], and
//! each request taken from the [`RequestReceiver`] comes with a
//! [`ResponseSender`] used to answer it.
#![deny(missing_docs)]
use crossbeam_channel as cc;
/// Creates a connected request/response channel pair.
///
/// The returned [`RequestSender`] submits requests of type `Req`; the
/// [`RequestReceiver`] takes them off the queue, each paired with a
/// [`ResponseSender`] that answers with a `Res`.
///
/// The request queue is created with `crossbeam_channel::unbounded`.
pub fn channel<Req, Res>() -> (RequestSender<Req, Res>, RequestReceiver<Req, Res>) {
    let (tx, rx) = cc::unbounded::<(Req, ResponseSender<Res>)>();
    (RequestSender::new(tx), RequestReceiver::new(rx))
}
/// Errors reported by the request/response channel operations.
///
/// Implements [`std::error::Error`], so it can be boxed or propagated with
/// `?` into generic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestError {
    /// A receive on the underlying channel failed
    /// (converted from `crossbeam_channel::RecvError`).
    RecvError,
    /// A send on the underlying channel failed
    /// (converted from `crossbeam_channel::SendError`).
    SendError,
}

impl std::fmt::Display for RequestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            RequestError::RecvError => "receive failed: channel is empty and disconnected",
            RequestError::SendError => "send failed: channel is disconnected",
        };
        f.write_str(message)
    }
}

impl std::error::Error for RequestError {}
/// Maps a `crossbeam_channel` receive failure onto
/// [`RequestError::RecvError`], which lets `?` be used on `recv()` results.
impl From<cc::RecvError> for RequestError {
    fn from(_: cc::RecvError) -> Self {
        Self::RecvError
    }
}
/// Maps a `crossbeam_channel` send failure onto [`RequestError::SendError`].
///
/// The incoming error value is discarded; only the failure kind is kept.
impl<T> From<cc::SendError<T>> for RequestError {
    fn from(_: cc::SendError<T>) -> Self {
        Self::SendError
    }
}
/// Handle used to send a response of type `Res` back to the requester.
///
/// Handed out by [`RequestReceiver::poll`] together with the request it
/// belongs to.
pub struct ResponseSender<Res> {
    // Sending half of the response channel created for one request.
    response_sender: cc::Sender<Res>,
}
impl<Res> ResponseSender<Res> {
    /// Wraps the sending half of a response channel.
    fn new(response_sender: cc::Sender<Res>) -> ResponseSender<Res> {
        ResponseSender { response_sender }
    }

    /// Sends `response` back to the requester.
    ///
    /// # Panics
    ///
    /// Panics if the underlying channel send fails, i.e. the channel is
    /// disconnected (for example because the matching
    /// [`ResponseReceiver`] was dropped).
    pub fn respond(&self, response: Res) {
        if self.response_sender.send(response).is_err() {
            panic!("Response failed, send pipe was broken during request!")
        }
    }
}
/// Receiving end of the request channel created by [`channel`].
///
/// Yields each request of type `Req` together with the [`ResponseSender`]
/// used to answer it with a `Res`.
pub struct RequestReceiver<Req, Res> {
    // Queue of pending requests, each paired with its response handle.
    request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>,
}
impl<Req, Res> RequestReceiver<Req, Res> {
    /// Wraps the receiving half of the request channel.
    fn new(
        request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>,
    ) -> RequestReceiver<Req, Res> {
        RequestReceiver { request_receiver }
    }

    /// Waits for the next request and returns it with its [`ResponseSender`].
    ///
    /// Despite the name, this delegates to the channel's blocking `recv()`.
    ///
    /// # Errors
    ///
    /// Returns [`RequestError::RecvError`] when `recv()` fails, which
    /// `crossbeam_channel` reports once the channel is empty and every
    /// sender has been dropped.
    pub fn poll(&self) -> Result<(Req, ResponseSender<Res>), RequestError> {
        Ok(self.request_receiver.recv()?)
    }

    /// Calls `f` with every incoming request and its [`ResponseSender`].
    ///
    /// Returns once [`poll`](Self::poll) reports an error.
    pub fn poll_loop<F>(&self, mut f: F)
    where
        F: FnMut(Req, ResponseSender<Res>),
    {
        // `poll` can only fail with `RecvError`, so any error simply ends the
        // loop; no separate "impossible error" arm is needed.
        while let Ok((request, response_sender)) = self.poll() {
            f(request, response_sender);
        }
    }
}
/// Receiving side for the response to a request.
///
/// Returned by [`RequestSender::request`]. Clones share the same underlying
/// response channel.
pub struct ResponseReceiver<Res> {
    // Receiving half of the response channel created for one request.
    response_receiver: cc::Receiver<Res>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `Res: Clone` bound even though only the channel handle is cloned.
impl<Res> Clone for ResponseReceiver<Res> {
    fn clone(&self) -> Self {
        ResponseReceiver {
            response_receiver: self.response_receiver.clone(),
        }
    }
}
impl<Res> ResponseReceiver<Res> {
fn new(response_receiver: cc::Receiver<Res>) -> ResponseReceiver<Res> {
ResponseReceiver {
response_receiver
}
}
pub fn collect(&self) -> Result<Res, RequestError> {
Ok(self.response_receiver.recv()?)
}
}
/// Sending end of the request channel created by [`channel`].
///
/// Can be cloned to submit requests from several places; clones share the
/// same underlying request channel.
pub struct RequestSender<Req, Res> {
    // Queue onto which requests are pushed with their response handle.
    request_sender: cc::Sender<(Req, ResponseSender<Res>)>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add
// `Req: Clone` and `Res: Clone` bounds even though only the channel handle is
// cloned.
impl<Req, Res> Clone for RequestSender<Req, Res> {
    fn clone(&self) -> Self {
        RequestSender {
            request_sender: self.request_sender.clone(),
        }
    }
}
impl<Req, Res> RequestSender<Req, Res> {
    /// Wraps the sending half of the request channel.
    fn new(request_sender: cc::Sender<(Req, ResponseSender<Res>)>) -> Self {
        Self { request_sender }
    }

    /// Submits `request` and returns the handle on which its response arrives.
    ///
    /// A fresh unbounded response channel is created for every call: its
    /// sending half travels with the request, and its receiving half is
    /// returned wrapped in a [`ResponseReceiver`].
    ///
    /// # Errors
    ///
    /// Returns [`RequestError::SendError`] when the request cannot be sent.
    pub fn request(&self, request: Req) -> Result<ResponseReceiver<Res>, RequestError> {
        let (tx, rx) = cc::unbounded::<Res>();
        self.request_sender
            .send((request, ResponseSender::new(tx)))?;
        Ok(ResponseReceiver::new(rx))
    }
}