bidirectional_channel/
lib.rs

1#![deny(missing_docs)]
2#![deny(unused)]
3//! An async channel with request-response semantics.  
4//! When a [`Responder`] is asked to receive a request, it returns a [`ReceivedRequest`],
5//! which should be used to communicate the response back to the requester.
6//!
7//! ```
8//! # use futures::join;
9//! # async_std::task::block_on( async {
10//! use bidirectional_channel::{bounded};
11//! let (requester, responder) = bounded(1);
12//! let requester = async { requester.send("hello").await.unwrap() };
13//! let responder = async {
14//!     let request = responder.recv().await.unwrap();
15//!     let len = request.len();
16//!     request.respond(len).unwrap()
17//! };
18//! let (response, request) = join!(requester, responder);
19//! assert!(request.len() == response)
20//! # })
21//! ```
22
23use async_std::channel;
24/// An [`async_std::channel::Receiver`] which receives a [`ReceivedRequest<Req, Resp>`] instead of a bare `Req`.
25pub use async_std::channel::Receiver as Responder;
26use derive_more::{AsMut, AsRef, Deref, DerefMut};
27use futures::channel::oneshot;
28use std::fmt::Debug;
29#[allow(unused_imports)]
30use std::ops::{Deref, DerefMut}; // Doc links
31use thiserror::Error;
32
33/// Error returned when sending a request
34#[derive(Error)]
35pub enum SendRequestError<Req> {
36    /// The [`Responder`] for this channel was dropped.
37    /// Returns ownership of the `Req` that failed to send
38    #[error("The Responder was dropped before the message was sent")]
39    Closed(Req),
40    /// The [`UnRespondedRequest`] for this request was dropped.
41    #[error("The UnRespondedRequest was dropped, not responded to")]
42    Ignored,
43}
44impl<Req> Debug for SendRequestError<Req> {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            Self::Closed(_) => write!(f, "Closed(..)"),
48            Self::Ignored => write!(f, "Cancelled"),
49        }
50    }
51}
52
53/// Represents that the [`Requester`] associated with this communication is still waiting for a response.
54#[must_use = "You must respond to the request"]
55pub struct UnRespondedRequest<Resp> {
56    response_sender: oneshot::Sender<Resp>,
57}
58impl<Resp> UnRespondedRequest<Resp> {
59    /// Respond to the [`Requester`]'s request.
60    /// Fails if the associated [`Requester`] was dropped, and returns your response back
61    pub fn respond(self, response: Resp) -> Result<(), Resp> {
62        self.response_sender.send(response)
63    }
64}
65
66/// Represents the request.
67/// This implements [`AsRef`] and [`AsMut`] for the request itself for explicit use.
68/// Alternatively, you may use [`Deref`] and [`DerefMut`] either explicitly, or coerced.
69/// Must be used by calling [`ReceivedRequest::respond`], or destructured.
70#[must_use = "You must respond to the request"]
71#[derive(AsRef, AsMut, Deref, DerefMut)]
72pub struct ReceivedRequest<Req, Resp> {
73    /// The request itself
74    #[as_ref]
75    #[as_mut]
76    #[deref]
77    #[deref_mut]
78    pub request: Req,
79    /// Handle to respond to the [`Requester`]
80    pub unresponded: UnRespondedRequest<Resp>,
81}
82
83impl<Req, Resp> ReceivedRequest<Req, Resp> {
84    /// Respond to the [`Requester`]'s request, and take ownership of it
85    /// Fails if the associated [`Requester`] was dropped, and returns your response back
86    pub fn respond(self, response: Resp) -> Result<Req, (Req, Resp)> {
87        match self.unresponded.respond(response) {
88            Ok(_) => Ok(self.request),
89            Err(response) => Err((self.request, response)),
90        }
91    }
92}
93/// Represents the initiator for the request-response exchange
94#[derive(Clone)]
95pub struct Requester<Req, Resp> {
96    outgoing: channel::Sender<ReceivedRequest<Req, Resp>>,
97}
98
99impl<Req, Resp> Requester<Req, Resp> {
100    /// Make a request.
101    /// `await` the result to receive the response.
102    pub async fn send(&self, request: Req) -> Result<Resp, SendRequestError<Req>> {
103        // Create the return path
104        let (response_sender, response_receiver) = oneshot::channel();
105        self.outgoing
106            .send(ReceivedRequest {
107                request,
108                unresponded: UnRespondedRequest { response_sender },
109            })
110            .await
111            .map_err(|e| SendRequestError::Closed(e.into_inner().request))?;
112        let response = response_receiver
113            .await
114            .map_err(|_| SendRequestError::Ignored)?;
115        Ok(response)
116    }
117}
118
119/// Create a bounded [`Requester`]-[`Responder`] pair.  
120/// That is, once the channel is full, future senders will yield when awaiting until there's space again
121pub fn bounded<Req, Resp>(
122    capacity: usize,
123) -> (Requester<Req, Resp>, Responder<ReceivedRequest<Req, Resp>>) {
124    let (sender, receiver) = channel::bounded(capacity);
125    (Requester { outgoing: sender }, receiver)
126}