async_reply/
lib.rs

// Copyright (C) 2020 O.S. Systems Software LTDA
2//
3// SPDX-License-Identifier: Apache-2.0
4
//! Allow the sending and receiving of typed messages.
6//!
7//! # Example
8//! ```
9//! # async_std::task::block_on(async {
10//! # use async_std::prelude::FutureExt;
11//! use async_reply::Message;
12//!
13//! #[derive(Debug, Message)]
14//! #[rtype(response = "Pong")]
15//! struct Ping;
16//!
17//! #[derive(Debug)]
18//! struct Pong;
19//!
20//! let (requester, replyer) = async_reply::endpoints();
21//!
22//! let ping_fut = async {
23//!     println!("Sending Ping");
24//!     let reply = requester.send(Ping).await.unwrap();
25//!     println!("Received {:?}", reply);
26//! };
27//!
28//! let pong_fut = async {
29//!     let (msg, handler) = replyer.recv::<Ping>().await.unwrap();
30//!     handler.respond(Pong).await.unwrap();
31//!     println!("Replied {:?} with Pong", msg);
32//! };
33//!
34//! ping_fut.join(pong_fut).await;
35//! # });
36//! ```
37
38#![forbid(unsafe_code)]
39#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
40
41use async_std::{channel, prelude::FutureExt, sync::Mutex};
42use std::any::Any;
43
44#[doc(hidden)]
45#[cfg(feature = "derive")]
46pub use async_reply_derive::*;
47
48/// Create a [`Requester`] and [`Replyer`] message endpoints which allow the
49/// sending and receiving of typed messages.
50pub fn endpoints() -> (Requester, Replyer) {
51    let (sndr, recv) = channel::bounded(10);
52    (
53        Requester { inner: sndr },
54        Replyer {
55            buffer: Mutex::default(),
56            inner: recv,
57        },
58    )
59}
60
61/// The requester side of a endpoint.
62#[derive(Debug, Clone)]
63pub struct Requester {
64    inner: channel::Sender<Box<dyn Any + Send>>,
65}
66
67/// The replyer side of a endpoint.
68#[derive(Debug)]
69pub struct Replyer {
70    buffer: Mutex<Vec<Box<dyn Any + Send>>>,
71    inner: channel::Receiver<Box<dyn Any + Send>>,
72}
73
74/// The reply handle to respond to the received message.
75#[must_use = "ReplyHandle should be used to respond to the received message"]
76#[derive(Debug)]
77pub struct ReplyHandle<T>(channel::Sender<T>);
78
79struct MessageHandle<M: Message> {
80    msg: M,
81    sndr: ReplyHandle<M::Response>,
82}
83
/// Associates a message type with the type of the response it expects.
///
/// Implementors must be `'static` and [`Send`].
pub trait Message: 'static + Send {
    /// The type sent back in answer to this message.
    type Response: Send;
}
89
90impl Requester {
91    /// Send the message and wait its response.
92    pub async fn send<M>(&self, msg: M) -> Result<M::Response, Error>
93    where
94        M: Message,
95    {
96        let (sndr, recv) = channel::bounded::<M::Response>(1);
97        let sndr = ReplyHandle(sndr);
98
99        self.inner
100            .send(Box::new(MessageHandle { msg, sndr }))
101            .await?;
102
103        recv.recv().await.map_err(Error::ReplayError)
104    }
105}
106
107impl Replyer {
108    /// Receives the message and provide the handle to respond back.
109    pub async fn recv<M>(&self) -> Result<(M, ReplyHandle<M::Response>), Error>
110    where
111        M: Message,
112    {
113        let is_message_type = |any: &Box<dyn Any + Send>| any.is::<MessageHandle<M>>();
114
115        loop {
116            let buffer_search_fut = async {
117                loop {
118                    let mut buffer = self.buffer.lock().await;
119                    let msg_index = buffer
120                        .iter()
121                        .enumerate()
122                        .find(|(_, elem)| is_message_type(elem))
123                        .map(|(index, _)| index);
124                    if let Some(index) = msg_index {
125                        // We have a buffereda message of this type, so we pop
126                        // and return it
127                        return Ok(buffer.remove(index));
128                    }
129                    async_std::task::yield_now().await;
130                }
131            };
132            let channel_search_fut = async { self.inner.recv().await.map_err(Error::ReceivError) };
133
134            let msg = buffer_search_fut.race(channel_search_fut).await?;
135            if is_message_type(&msg) {
136                return Ok(msg.downcast::<MessageHandle<M>>().unwrap().into_tuple());
137            }
138            self.buffer.lock().await.push(msg);
139        }
140    }
141}
142
143impl<T> ReplyHandle<T> {
144    /// Respond back to a received message.
145    pub async fn respond(&self, r: T) -> Result<(), Error> {
146        Ok(self.0.send(r).await?)
147    }
148}
149
150impl<M: Message> MessageHandle<M> {
151    fn into_tuple(self) -> (M, ReplyHandle<M::Response>) {
152        (self.msg, self.sndr)
153    }
154}
155
156/// Encapsulate the errors which can be triggered when sending or receiving a
157/// message.
158#[derive(Debug, derive_more::Display, derive_more::Error, derive_more::From)]
159pub enum Error {
160    /// Error while sending the message.
161    SendError,
162
163    /// Error to receive the response of sent message.
164    #[from(ignore)]
165    ReplayError(channel::RecvError),
166
167    /// Error while receiving the message.
168    ReceivError(channel::RecvError),
169}
170
171impl<T> From<channel::SendError<T>> for Error {
172    fn from(_: channel::SendError<T>) -> Self {
173        // The original error from async_std::channel::Sender carries the undelivered
174        // message for recovery. However here we want to avoid raising the arity of
175        // the Error type, losing that ability but making the error type more
176        // permissive
177        Error::SendError
178    }
179}