response_channel/
lib.rs

1#![deny(missing_docs)]
2
3//! # Response Channel:
4//! A wrapper crate (around the [`tokio::sync::mpsc`] channels) allowing for
//! bidirectional communication. One can simply create a bidirectional channel by
6//! hand and manage all of the boilerplate in setting it up and handling
7//! responses, but that gets cumbersome quite quickly (especially if many
8//! bidirectional channels are required).
9//!
10//! This crate allows the developer to create a simple bidirectional
11//! response channel using the same API as creating a [`tokio::sync::mpsc`]
12//! channel (i.e., just a simple function call).
13//!
14//! ### Example:
15//! ```rust
16//! # tokio_test::block_on(async {
17//! const BUFFER_SIZE: usize = 10;
18//!
19//! type Message = u8;
20//! type Response = bool;
21//!
//! let (mut tx, mut rx) = response_channel::channel::<Message, Response>(BUFFER_SIZE, None);
//!
//! tokio::task::spawn(async move {
//!     // send the initial message and wait for the response.
//!     let response = tx.send_await_automatic(100).await.unwrap().unwrap();
//!     assert!(response);
//! });
//!
//! // receive a message and destructure it into the actual message and reverse transmission line
//! // (the reverse transmission line is how you send the response back to the caller!)
//! let (message, reverse_tx) = rx.recv().await.unwrap();
//! let response = message >= 5;
//! reverse_tx.send(response).await.unwrap();
35//! # });
36//! ```
37
38/// The error type for this crate.
39pub mod error;
40
41use std::ops::Deref;
42
43use tokio::sync::mpsc;
44
45/// Creates a new permament, bidirectional response channel.
46///
47/// ### Notes:
48/// The reverse channel is implemented using a [`tokio::sync::mpsc`] channel,
49/// which means that it can support multiple messages. Namely, it does not need
50/// to be reinstantiated after a response has been sent.
51///
52/// An important thing to note is that [`Sender`] *can* be cloned!
53/// When cloning occurs, the same forward transmission line is cloned to the new
54/// struct, *but a new response channel is created*. This means that messages
55/// are local to the specific sender!
56///
57/// ### Arguments:
58/// - `buffer`: The size of the forward channel.
59/// - `reverse_buffer`: The size of the reverse channel. If this is [`None`],
60///   `buffer` will be used.
61///
62/// ### Examples:
63/// ```rust
64/// # tokio_test::block_on(async {
65/// const BUFFER_SIZE: usize = 100;
66///
67/// let (mut tx, mut rx) = response_channel::channel::<u8, bool>(BUFFER_SIZE, None);
68///
69/// tokio::task::spawn(async move {
70///     for i in 0..10 {
71///         let response = tx.send_await_automatic(i).await.unwrap().unwrap();
72///         assert_eq!(response, i >= 5);
73///     };
74/// });
75///
76/// while let Some((message, tx)) = rx.recv().await {
77///     let response = message >= 5;
78///     tx.send(response).await.unwrap();
79/// };
80/// # });
81/// ```
82pub fn channel<M, R>(
83    buffer: usize,
84    reverse_buffer: Option<usize>,
85) -> (Sender<M, R>, mpsc::Receiver<(M, mpsc::Sender<R>)>) {
86    let (tx, rx) = mpsc::channel(buffer);
87    let (reverse_tx, reverse_rx) =
88        mpsc::channel(reverse_buffer.unwrap_or(buffer));
89    (
90        Sender {
91            tx,
92            reverse_tx,
93            reverse_rx,
94        },
95        rx,
96    )
97}
98
99/// The [`Sender`] type which contains the necessary information to provide a
100/// bidirectional response channel.
101#[cfg_attr(not(release), derive(Debug))]
102pub struct Sender<M, R> {
103    pub(crate) tx: mpsc::Sender<(M, mpsc::Sender<R>)>,
104    pub(crate) reverse_tx: mpsc::Sender<R>,
105    pub(crate) reverse_rx: mpsc::Receiver<R>,
106}
107
108impl<M, R> Sender<M, R> {
109    /// Sends the given message to the receiver.
110    ///
111    /// ### Arguments:
112    /// - `message`: The message that needs to be sent.
113    ///
114    /// ### Notes:
115    /// This function does *not* try to receive the response!
116    /// The user must do this explicitly if they are required to read the
117    /// response.
118    ///
119    /// ### Example:
120    /// ```rust
121    /// # tokio_test::block_on(async {
122    /// type Message = u8;
123    /// type Response = bool;
124    /// # let (mut tx, rx) = response_channel::channel::<Message, Response>(100, None);
125    ///
126    /// // sends the first message (but does not eagerly await the response!)
127    /// tx.send_await(10).await.unwrap();
128    ///
129    /// // sends another message (but once again does not eagerly await the response!)
130    /// tx.send_await(11).await.unwrap();
131    /// # drop(rx);
132    /// # });
133    /// ```
134    ///
135    /// If you wish read the responses, please refer to [`Sender::recv`].
136    pub async fn send_await(&self, message: M) -> Result<(), error::Error<M>> {
137        self.tx
138            .send((message, self.reverse_tx.clone()))
139            .await
140            .map_err(|mpsc::error::SendError((m, _))| {
141                mpsc::error::SendError(m)
142            })?;
143        Ok(())
144    }
145
146    /// Sends the given message to the receiver.
147    ///
148    /// ### Arguments:
149    /// - `message`: The message that needs to be sent.
150    ///
151    /// ### Notes:
152    /// This function will send the message and then *automatically listen for
153    /// the response right away*. It is equivalent to calling
154    /// [`Sender::send_await`] followed immediately by [`Sender::recv`].
155    ///
156    /// ### Example:
157    /// ```rust
158    /// # tokio_test::block_on(async move {
159    /// // for example, consider the two type aliases:
160    /// type Message = u8;
161    /// type Response = u8;
162    /// # let (mut tx, mut rx) = response_channel::channel::<Message, Response>(1, None);
163    ///
164    /// let fut1 = tokio::task::spawn(async move {
165    ///     let message = 100;
166    ///     let response = tx.send_await_automatic(message).await.unwrap().unwrap();
167    ///     assert_eq!(response, message + 1);
168    /// });
169    ///
170    /// let fut2 = tokio::task::spawn(async move {
171    ///     while let Some((message, tx)) = rx.recv().await {
172    ///         let response = message + 1;
173    ///         tx.send(response).await.unwrap();
174    ///     };
175    /// });
176    ///
177    /// # let (res1, res2) = tokio::join!(fut1, fut2);
178    /// # res1.unwrap();
179    /// # res2.unwrap();
180    /// # });
181    /// ```
182    ///
183    /// This is usually the most common usage (i.e., waiting for the response
184    /// right away).
185    pub async fn send_await_automatic(
186        &mut self,
187        message: M,
188    ) -> Result<Option<R>, error::Error<M>> {
189        self.send_await(message).await?;
190        let response = self.reverse_rx.recv().await;
191        Ok(response)
192    }
193
194    /// Receives a message from the reverse channel.
195    ///
196    /// ### Example:
197    /// ```rust
198    /// # tokio_test::block_on(async move {
199    /// // for example, consider the two type aliases:
200    /// type Message = u8;
201    /// type Response = u8;
202    /// # let (mut tx, mut rx) = response_channel::channel::<Message, Response>(1, None);
203    ///
204    /// let fut1 = tokio::task::spawn(async move {
205    ///     let message1 = 100;
206    ///     let message2 = 100;
207    ///     tx.send_await(message1).await.unwrap();
208    ///     tx.send_await(message2).await.unwrap();
209    ///
210    ///     let response1 = tx.recv().await.unwrap();
211    ///     let response2 = tx.recv().await.unwrap();
212    ///     assert_eq!(response1, message1 + 1);
213    ///     assert_eq!(response2, message2 + 1);
214    /// });
215    ///
216    /// let fut2 = tokio::task::spawn(async move {
217    ///     while let Some((message, tx)) = rx.recv().await {
218    ///         let response = message + 1;
219    ///         tx.send(response).await.unwrap();
220    ///     };
221    /// });
222    ///
223    /// # let (res1, res2) = tokio::join!(fut1, fut2);
224    /// # res1.unwrap();
225    /// # res2.unwrap();
226    /// # });
227    /// ```
228    pub async fn recv(&mut self) -> Option<R> {
229        self.reverse_rx.recv().await
230    }
231}
232
233impl<M, R> Clone for Sender<M, R> {
234    fn clone(&self) -> Self {
235        let reverse_buffer = self.reverse_tx.max_capacity();
236        let (reverse_tx, reverse_rx) = mpsc::channel(reverse_buffer);
237        Self {
238            tx: self.tx.clone(),
239            reverse_tx,
240            reverse_rx,
241        }
242    }
243}
244
245impl<M, R> Deref for Sender<M, R> {
246    type Target = mpsc::Sender<(M, mpsc::Sender<R>)>;
247
248    fn deref(&self) -> &Self::Target {
249        &self.tx
250    }
251}