#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use async_std::{channel, prelude::FutureExt, sync::Mutex};
use std::any::Any;
#[doc(hidden)]
#[cfg(feature = "derive")]
pub use async_reply_derive::*;
/// Creates a connected [`Requester`] / [`Replyer`] pair.
///
/// Both ends share one bounded channel with a capacity of 10 in-flight
/// requests. The [`Replyer`] starts out with an empty buffer for messages
/// that were pulled from the channel but not yet claimed by a matching
/// `recv` call.
pub fn endpoints() -> (Requester, Replyer) {
    let (request_tx, request_rx) = channel::bounded(10);

    let requester = Requester { inner: request_tx };
    let replyer = Replyer {
        buffer: Mutex::new(Vec::new()),
        inner: request_rx,
    };

    (requester, replyer)
}
/// The requesting end of an endpoint pair created by [`endpoints`].
///
/// Messages are sent with `send`, which also waits for the matching response.
/// The type derives `Clone`, so several requesters can feed the same
/// [`Replyer`].
#[derive(Debug, Clone)]
pub struct Requester {
// Sending half of the request channel; every message is boxed and
// type-erased as `dyn Any + Send` before it is sent.
inner: channel::Sender<Box<dyn Any + Send>>,
}
/// The replying end of an endpoint pair created by [`endpoints`].
///
/// Requests are taken out with `recv`, which is generic over the message type
/// the caller wants to handle.
#[derive(Debug)]
pub struct Replyer {
// Messages that were received from the channel while a different message
// type was being waited for; `recv` searches this buffer as well as the channel.
buffer: Mutex<Vec<Box<dyn Any + Send>>>,
// Receiving half of the request channel carrying type-erased messages.
inner: channel::Receiver<Box<dyn Any + Send>>,
}
/// Handle used to send the response of type `T` for one received message.
///
/// It wraps the sending half of the per-request reply channel. The type is
/// marked `#[must_use]` because a received message is expected to be answered.
#[must_use = "ReplyHandle should be used to respond to the received message"]
#[derive(Debug)]
pub struct ReplyHandle<T>(channel::Sender<T>);
// Internal envelope that travels through the request channel: the message
// itself together with the handle needed to answer it. It is boxed as
// `dyn Any + Send` when sent and downcast back to this type on receipt.
struct MessageHandle<M: Message> {
// The request payload.
msg: M,
// Handle through which the response to `msg` is sent back.
sndr: ReplyHandle<M::Response>,
}
/// A request type that can be sent through a [`Requester`].
///
/// Implementors must be `'static + Send` because messages are type-erased
/// (`dyn Any + Send`) while they travel through the channel.
pub trait Message: 'static + Send {
/// The type of the value sent back in response to this message.
type Response: Send;
}
impl Requester {
pub async fn send<M>(&self, msg: M) -> Result<M::Response, Error>
where
M: Message,
{
let (sndr, recv) = channel::bounded::<M::Response>(1);
let sndr = ReplyHandle(sndr);
self.inner
.send(Box::new(MessageHandle { msg, sndr }))
.await?;
recv.recv().await.map_err(Error::ReplayError)
}
}
impl Replyer {
pub async fn recv<M>(&self) -> Result<(M, ReplyHandle<M::Response>), Error>
where
M: Message,
{
let is_message_type = |any: &Box<dyn Any + Send>| any.is::<MessageHandle<M>>();
loop {
let buffer_search_fut = async {
loop {
let mut buffer = self.buffer.lock().await;
let msg_index = buffer
.iter()
.enumerate()
.find(|(_, elem)| is_message_type(elem))
.map(|(index, _)| index);
if let Some(index) = msg_index {
return Ok(buffer.remove(index));
}
async_std::task::yield_now().await;
}
};
let channel_search_fut = async { self.inner.recv().await.map_err(Error::ReceivError) };
let msg = buffer_search_fut.race(channel_search_fut).await?;
if is_message_type(&msg) {
return Ok(msg.downcast::<MessageHandle<M>>().unwrap().into_tuple());
}
self.buffer.lock().await.push(msg);
}
}
}
impl<T> ReplyHandle<T> {
    /// Sends `r` as the response to the message this handle belongs to.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SendError`] if the response could not be sent on the
    /// reply channel.
    pub async fn respond(&self, r: T) -> Result<(), Error> {
        let reply_channel = &self.0;
        reply_channel.send(r).await?;
        Ok(())
    }
}
impl<M: Message> MessageHandle<M> {
    // Splits the envelope into the message and the handle used to answer it.
    fn into_tuple(self) -> (M, ReplyHandle<M::Response>) {
        let Self { msg, sndr } = self;
        (msg, sndr)
    }
}
/// Errors returned by the request/reply endpoints.
#[derive(Debug, derive_more::Display, derive_more::Error, derive_more::From)]
pub enum Error {
/// A message or a response could not be sent on its channel.
SendError,
/// Receiving the response to a sent message failed (returned by
/// `Requester::send`).
// Excluded from the derived `From`; `ReceivError` wraps the same inner
// type, so presumably this avoids two conflicting conversions.
#[from(ignore)]
ReplayError(channel::RecvError),
/// Receiving a message from the request channel failed (returned by
/// `Replyer::recv`).
ReceivError(channel::RecvError),
}
/// Lets `?` turn any channel send failure into [`Error::SendError`].
impl<T> From<channel::SendError<T>> for Error {
    /// Maps the send failure to [`Error::SendError`]; the unsent value
    /// carried by the channel error is dropped.
    fn from(_unsent: channel::SendError<T>) -> Self {
        Self::SendError
    }
}