response_channel/lib.rs
1#![deny(missing_docs)]
2
3//! # Response Channel:
//! A wrapper crate (around the [`tokio::sync::mpsc`] channels) allowing for
//! bidirectional communication. One can simply create a bidirectional channel by
//! hand and manage all of the boilerplate in setting it up and handling
//! responses, but that gets cumbersome quite quickly (especially if many
//! bidirectional channels are required).
9//!
10//! This crate allows the developer to create a simple bidirectional
11//! response channel using the same API as creating a [`tokio::sync::mpsc`]
12//! channel (i.e., just a simple function call).
13//!
14//! ### Example:
//! ```rust
//! # tokio_test::block_on(async {
//! const BUFFER_SIZE: usize = 10;
//!
//! type Message = u8;
//! type Response = bool;
//!
//! let (mut tx, mut rx) = response_channel::channel::<Message, Response>(BUFFER_SIZE, None);
//!
//! tokio::task::spawn(async move {
//!     // send the initial message and wait for a response.
//!     let response = tx.send_await_automatic(100).await.unwrap().unwrap();
//!     assert!(response);
//! });
//!
//! // receive a message and destructure it into the actual message and reverse transmission line
//! // (the reverse transmission line is how you send the response back to the caller!)
//! let (message, reverse_tx) = rx.recv().await.unwrap();
//! let response = message >= 5;
//! reverse_tx.send(response).await.unwrap();
//! # });
//! ```
37
38/// The error type for this crate.
39pub mod error;
40
41use std::ops::Deref;
42
43use tokio::sync::mpsc;
44
45/// Creates a new permament, bidirectional response channel.
46///
47/// ### Notes:
48/// The reverse channel is implemented using a [`tokio::sync::mpsc`] channel,
49/// which means that it can support multiple messages. Namely, it does not need
50/// to be reinstantiated after a response has been sent.
51///
52/// An important thing to note is that [`Sender`] *can* be cloned!
53/// When cloning occurs, the same forward transmission line is cloned to the new
54/// struct, *but a new response channel is created*. This means that messages
55/// are local to the specific sender!
56///
57/// ### Arguments:
58/// - `buffer`: The size of the forward channel.
59/// - `reverse_buffer`: The size of the reverse channel. If this is [`None`],
60/// `buffer` will be used.
61///
62/// ### Examples:
63/// ```rust
64/// # tokio_test::block_on(async {
65/// const BUFFER_SIZE: usize = 100;
66///
67/// let (mut tx, mut rx) = response_channel::channel::<u8, bool>(BUFFER_SIZE, None);
68///
69/// tokio::task::spawn(async move {
70/// for i in 0..10 {
71/// let response = tx.send_await_automatic(i).await.unwrap().unwrap();
72/// assert_eq!(response, i >= 5);
73/// };
74/// });
75///
76/// while let Some((message, tx)) = rx.recv().await {
77/// let response = message >= 5;
78/// tx.send(response).await.unwrap();
79/// };
80/// # });
81/// ```
82pub fn channel<M, R>(
83 buffer: usize,
84 reverse_buffer: Option<usize>,
85) -> (Sender<M, R>, mpsc::Receiver<(M, mpsc::Sender<R>)>) {
86 let (tx, rx) = mpsc::channel(buffer);
87 let (reverse_tx, reverse_rx) =
88 mpsc::channel(reverse_buffer.unwrap_or(buffer));
89 (
90 Sender {
91 tx,
92 reverse_tx,
93 reverse_rx,
94 },
95 rx,
96 )
97}
98
99/// The [`Sender`] type which contains the necessary information to provide a
100/// bidirectional response channel.
101#[cfg_attr(not(release), derive(Debug))]
102pub struct Sender<M, R> {
103 pub(crate) tx: mpsc::Sender<(M, mpsc::Sender<R>)>,
104 pub(crate) reverse_tx: mpsc::Sender<R>,
105 pub(crate) reverse_rx: mpsc::Receiver<R>,
106}
107
108impl<M, R> Sender<M, R> {
109 /// Sends the given message to the receiver.
110 ///
111 /// ### Arguments:
112 /// - `message`: The message that needs to be sent.
113 ///
114 /// ### Notes:
115 /// This function does *not* try to receive the response!
116 /// The user must do this explicitly if they are required to read the
117 /// response.
118 ///
119 /// ### Example:
120 /// ```rust
121 /// # tokio_test::block_on(async {
122 /// type Message = u8;
123 /// type Response = bool;
124 /// # let (mut tx, rx) = response_channel::channel::<Message, Response>(100, None);
125 ///
126 /// // sends the first message (but does not eagerly await the response!)
127 /// tx.send_await(10).await.unwrap();
128 ///
129 /// // sends another message (but once again does not eagerly await the response!)
130 /// tx.send_await(11).await.unwrap();
131 /// # drop(rx);
132 /// # });
133 /// ```
134 ///
135 /// If you wish read the responses, please refer to [`Sender::recv`].
136 pub async fn send_await(&self, message: M) -> Result<(), error::Error<M>> {
137 self.tx
138 .send((message, self.reverse_tx.clone()))
139 .await
140 .map_err(|mpsc::error::SendError((m, _))| {
141 mpsc::error::SendError(m)
142 })?;
143 Ok(())
144 }
145
146 /// Sends the given message to the receiver.
147 ///
148 /// ### Arguments:
149 /// - `message`: The message that needs to be sent.
150 ///
151 /// ### Notes:
152 /// This function will send the message and then *automatically listen for
153 /// the response right away*. It is equivalent to calling
154 /// [`Sender::send_await`] followed immediately by [`Sender::recv`].
155 ///
156 /// ### Example:
157 /// ```rust
158 /// # tokio_test::block_on(async move {
159 /// // for example, consider the two type aliases:
160 /// type Message = u8;
161 /// type Response = u8;
162 /// # let (mut tx, mut rx) = response_channel::channel::<Message, Response>(1, None);
163 ///
164 /// let fut1 = tokio::task::spawn(async move {
165 /// let message = 100;
166 /// let response = tx.send_await_automatic(message).await.unwrap().unwrap();
167 /// assert_eq!(response, message + 1);
168 /// });
169 ///
170 /// let fut2 = tokio::task::spawn(async move {
171 /// while let Some((message, tx)) = rx.recv().await {
172 /// let response = message + 1;
173 /// tx.send(response).await.unwrap();
174 /// };
175 /// });
176 ///
177 /// # let (res1, res2) = tokio::join!(fut1, fut2);
178 /// # res1.unwrap();
179 /// # res2.unwrap();
180 /// # });
181 /// ```
182 ///
183 /// This is usually the most common usage (i.e., waiting for the response
184 /// right away).
185 pub async fn send_await_automatic(
186 &mut self,
187 message: M,
188 ) -> Result<Option<R>, error::Error<M>> {
189 self.send_await(message).await?;
190 let response = self.reverse_rx.recv().await;
191 Ok(response)
192 }
193
194 /// Receives a message from the reverse channel.
195 ///
196 /// ### Example:
197 /// ```rust
198 /// # tokio_test::block_on(async move {
199 /// // for example, consider the two type aliases:
200 /// type Message = u8;
201 /// type Response = u8;
202 /// # let (mut tx, mut rx) = response_channel::channel::<Message, Response>(1, None);
203 ///
204 /// let fut1 = tokio::task::spawn(async move {
205 /// let message1 = 100;
206 /// let message2 = 100;
207 /// tx.send_await(message1).await.unwrap();
208 /// tx.send_await(message2).await.unwrap();
209 ///
210 /// let response1 = tx.recv().await.unwrap();
211 /// let response2 = tx.recv().await.unwrap();
212 /// assert_eq!(response1, message1 + 1);
213 /// assert_eq!(response2, message2 + 1);
214 /// });
215 ///
216 /// let fut2 = tokio::task::spawn(async move {
217 /// while let Some((message, tx)) = rx.recv().await {
218 /// let response = message + 1;
219 /// tx.send(response).await.unwrap();
220 /// };
221 /// });
222 ///
223 /// # let (res1, res2) = tokio::join!(fut1, fut2);
224 /// # res1.unwrap();
225 /// # res2.unwrap();
226 /// # });
227 /// ```
228 pub async fn recv(&mut self) -> Option<R> {
229 self.reverse_rx.recv().await
230 }
231}
232
233impl<M, R> Clone for Sender<M, R> {
234 fn clone(&self) -> Self {
235 let reverse_buffer = self.reverse_tx.max_capacity();
236 let (reverse_tx, reverse_rx) = mpsc::channel(reverse_buffer);
237 Self {
238 tx: self.tx.clone(),
239 reverse_tx,
240 reverse_rx,
241 }
242 }
243}
244
245impl<M, R> Deref for Sender<M, R> {
246 type Target = mpsc::Sender<(M, mpsc::Sender<R>)>;
247
248 fn deref(&self) -> &Self::Target {
249 &self.tx
250 }
251}