bidirectional_channel/lib.rs
1#![deny(missing_docs)]
2#![deny(unused)]
3//! An async channel with request-response semantics.
4//! When a [`Responder`] is asked to receive a request, it returns a [`ReceivedRequest`],
5//! which should be used to communicate back to the sender
6//!
7//! ```
8//! # use futures::join;
9//! # async_std::task::block_on( async {
10//! use bidirectional_channel::{bounded};
11//! let (requester, responder) = bounded(1);
12//! let requester = async { requester.send("hello").await.unwrap() };
13//! let responder = async {
14//! let request = responder.recv().await.unwrap();
15//! let len = request.len();
16//! request.respond(len).unwrap()
17//! };
18//! let (response, request) = join!(requester, responder);
19//! assert!(request.len() == response)
20//! # })
21//! ```
22
23use async_std::channel;
24/// An [`async_std::channel::Receiver`] which receives an [`UnRespondedRequest<Req, Resp>`] instead of a `Req`.
25pub use async_std::channel::Receiver as Responder;
26use derive_more::{AsMut, AsRef, Deref, DerefMut};
27use futures::channel::oneshot;
28use std::fmt::Debug;
29#[allow(unused_imports)]
30use std::ops::{Deref, DerefMut}; // Doc links
31use thiserror::Error;
32
33/// Error returned when sending a request
34#[derive(Error)]
35pub enum SendRequestError<Req> {
36 /// The [`Responder`] for this channel was dropped.
37 /// Returns ownership of the `Req` that failed to send
38 #[error("The Responder was dropped before the message was sent")]
39 Closed(Req),
40 /// The [`UnRespondedRequest`] for this request was dropped.
41 #[error("The UnRespondedRequest was dropped, not responded to")]
42 Ignored,
43}
44impl<Req> Debug for SendRequestError<Req> {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::Closed(_) => write!(f, "Closed(..)"),
48 Self::Ignored => write!(f, "Cancelled"),
49 }
50 }
51}
52
53/// Represents that the [`Requester`] associated with this communication is still waiting for a response.
54#[must_use = "You must respond to the request"]
55pub struct UnRespondedRequest<Resp> {
56 response_sender: oneshot::Sender<Resp>,
57}
58impl<Resp> UnRespondedRequest<Resp> {
59 /// Respond to the [`Requester`]'s request.
60 /// Fails if the associated [`Requester`] was dropped, and returns your response back
61 pub fn respond(self, response: Resp) -> Result<(), Resp> {
62 self.response_sender.send(response)
63 }
64}
65
66/// Represents the request.
67/// This implements [`AsRef`] and [`AsMut`] for the request itself for explicit use.
68/// Alternatively, you may use [`Deref`] and [`DerefMut`] either explicitly, or coerced.
69/// Must be used by calling [`ReceivedRequest::respond`], or destructured.
70#[must_use = "You must respond to the request"]
71#[derive(AsRef, AsMut, Deref, DerefMut)]
72pub struct ReceivedRequest<Req, Resp> {
73 /// The request itself
74 #[as_ref]
75 #[as_mut]
76 #[deref]
77 #[deref_mut]
78 pub request: Req,
79 /// Handle to respond to the [`Requester`]
80 pub unresponded: UnRespondedRequest<Resp>,
81}
82
83impl<Req, Resp> ReceivedRequest<Req, Resp> {
84 /// Respond to the [`Requester`]'s request, and take ownership of it
85 /// Fails if the associated [`Requester`] was dropped, and returns your response back
86 pub fn respond(self, response: Resp) -> Result<Req, (Req, Resp)> {
87 match self.unresponded.respond(response) {
88 Ok(_) => Ok(self.request),
89 Err(response) => Err((self.request, response)),
90 }
91 }
92}
93/// Represents the initiator for the request-response exchange
94#[derive(Clone)]
95pub struct Requester<Req, Resp> {
96 outgoing: channel::Sender<ReceivedRequest<Req, Resp>>,
97}
98
99impl<Req, Resp> Requester<Req, Resp> {
100 /// Make a request.
101 /// `await` the result to receive the response.
102 pub async fn send(&self, request: Req) -> Result<Resp, SendRequestError<Req>> {
103 // Create the return path
104 let (response_sender, response_receiver) = oneshot::channel();
105 self.outgoing
106 .send(ReceivedRequest {
107 request,
108 unresponded: UnRespondedRequest { response_sender },
109 })
110 .await
111 .map_err(|e| SendRequestError::Closed(e.into_inner().request))?;
112 let response = response_receiver
113 .await
114 .map_err(|_| SendRequestError::Ignored)?;
115 Ok(response)
116 }
117}
118
119/// Create a bounded [`Requester`]-[`Responder`] pair.
120/// That is, once the channel is full, future senders will yield when awaiting until there's space again
121pub fn bounded<Req, Resp>(
122 capacity: usize,
123) -> (Requester<Req, Resp>, Responder<ReceivedRequest<Req, Resp>>) {
124 let (sender, receiver) = channel::bounded(capacity);
125 (Requester { outgoing: sender }, receiver)
126}