tokio_bichannel/lib.rs
1use tokio::sync::mpsc::{channel as create_channel, Receiver, Sender};
2use tokio::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
3
4/// A bidirectional channel structure that supports sending and receiving messages.
5///
6/// # Examples
7///
8/// ```
9/// let (mut l, mut r) = channel::<String, String>(10);
10///
11/// l.send("Hello from chan1".to_string()).await.unwrap();
12/// r.send("Hello from chan2".to_string()).await.unwrap();
13///
14/// assert_eq!(r.recv().await.unwrap(), "Hello from chan1");
15/// assert_eq!(l.recv().await.unwrap(), "Hello from chan2");
16/// ```
17#[derive(Debug)]
18pub struct Channel<S, R> {
19 sender: Sender<S>,
20 receiver: Receiver<R>,
21}
22
23impl<S, R> Channel<S, R> {
24 /// Sends a message through the channel.
25 ///
26 /// # Arguments
27 ///
28 /// * `s` - The message to send.
29 ///
30 /// # Returns
31 ///
32 /// * `Result<(), SendError<S>>` - Returns `Ok(())` if the message was sent successfully, or an error if it wasn't.
33 ///
34 /// # Examples
35 ///
36 /// ```
37 /// channel.send("Hello".to_string()).await.unwrap();
38 /// ```
39 pub async fn send(&self, s: S) -> Result<(), SendError<S>> {
40 self.sender.send(s).await
41 }
42
43 /// Receives a message from the channel.
44 ///
45 /// # Returns
46 ///
47 /// * `Option<R>` - Returns `Some(message)` if a message was received, or `None` if the channel is closed.
48 ///
49 /// # Examples
50 ///
51 /// ```
52 /// while let Some(msg) = channel.recv().await {
53 /// println!("Received: {}", msg);
54 /// }
55 /// ```
56 pub async fn recv(&mut self) -> Option<R> {
57 self.receiver.recv().await
58 }
59
60 /// Attempts to send a message through the channel without blocking.
61 ///
62 /// # Arguments
63 ///
64 /// * `s` - The message to send.
65 ///
66 /// # Returns
67 ///
68 /// * `Result<(), SendError<S>>` - Returns `Ok(())` if the message was sent, or an error if the channel is full or closed.
69 ///
70 /// # Examples
71 ///
72 /// ```
73 /// match channel.try_send("Hello".to_string()) {
74 /// Ok(_) => println!("Message sent"),
75 /// Err(e) => println!("Error: {:?}", e),
76 /// }
77 /// ```
78 pub fn try_send(&self, s: S) -> Result<(), TrySendError<S>> {
79 self.sender.try_send(s)
80 }
81
82 /// Attempts to receive a message from the channel without blocking.
83 ///
84 /// # Returns
85 ///
86 /// * `Result<R, TryRecvError>` - Returns `Ok(message)` if a message was received, or an error if the channel is empty or closed.
87 ///
88 /// # Examples
89 /// ```
90 /// match channel.try_recv() {
91 /// Ok(msg) => println!("Received: {}", msg),
92 /// Err(e) => println!("Error: {:?}", e),
93 /// }
94 /// ```
95 pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
96 self.receiver.try_recv()
97 }
98}
99
100/// Creates a bidirectional channel with the specified buffer size.
101///
102/// # Arguments
103///
104/// * `buffer_size` - The size of the buffer for the channel.
105///
106/// # Returns
107///
108/// * `(Channel<T, U>, Channel<U, T>)` - Returns a tuple of two `Channel` instances.
109///
110/// # Examples
111///
112/// ```
113/// use tokio_bichannel::{channel, Channel};
114///
115/// #[tokio::main]
116/// async fn main() {
117/// let (mut chan1, mut chan2) = channel::<String, String>(10);
118///
119/// chan1.send("Hello from chan1".to_string()).await.unwrap();
120/// chan2.send("Hello from chan2".to_string()).await.unwrap();
121///
122/// let msg1 = chan2.recv().await.unwrap();
123/// let msg2 = chan1.recv().await.unwrap();
124///
125/// assert_eq!(msg1, "Hello from chan1");
126/// assert_eq!(msg2, "Hello from chan2");
127/// }
128/// ```
129pub fn channel<T, U>(buffer_size: usize) -> (Channel<T, U>, Channel<U, T>) {
130 let (ls, lr) = create_channel(buffer_size);
131 let (rs, rr) = create_channel(buffer_size);
132
133 (
134 Channel {
135 sender: ls,
136 receiver: rr,
137 },
138 Channel {
139 sender: rs,
140 receiver: lr,
141 },
142 )
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use tokio::runtime::Runtime;

    /// Messages sent from either endpoint arrive at the opposite endpoint.
    #[tokio::test]
    async fn test_send_recv() {
        let (mut chan1, mut chan2) = channel::<String, String>(10);

        chan1.send("Hello from chan1".to_string()).await.unwrap();
        chan2.send("Hello from chan2".to_string()).await.unwrap();

        let msg1 = chan2.recv().await.unwrap();
        let msg2 = chan1.recv().await.unwrap();

        assert_eq!(msg1, "Hello from chan1");
        assert_eq!(msg2, "Hello from chan2");
    }

    /// `try_recv` yields a buffered message, then reports an empty buffer.
    #[tokio::test]
    async fn test_try_recv() {
        // `chan1` only sends, which takes `&self`, so it need not be `mut`.
        let (chan1, mut chan2) = channel::<String, String>(10);

        chan1.send("Hello from chan1".to_string()).await.unwrap();

        let msg1 = chan2.try_recv().unwrap();
        assert_eq!(msg1, "Hello from chan1");

        // The peer is still alive, so the failure must be `Empty` rather
        // than `Disconnected`.
        assert_eq!(chan2.try_recv(), Err(TryRecvError::Empty));
    }

    /// `try_send` succeeds while there is capacity and reports `Full` after.
    #[test]
    fn test_try_send() {
        let (chan1, mut chan2) = channel::<String, String>(1);

        chan1.try_send("first".to_string()).unwrap();
        assert!(matches!(
            chan1.try_send("second".to_string()),
            Err(TrySendError::Full(_))
        ));

        assert_eq!(chan2.try_recv().unwrap(), "first");
    }

    /// Dropping one endpoint closes both directions for its peer.
    #[tokio::test]
    async fn test_peer_dropped() {
        let (chan1, mut chan2) = channel::<String, String>(10);
        drop(chan1);

        assert!(chan2.recv().await.is_none());
        assert_eq!(chan2.try_recv(), Err(TryRecvError::Disconnected));
        assert!(chan2.send("unheard".to_string()).await.is_err());
    }

    /// The two endpoints can be driven from different threads and runtimes.
    #[tokio::test]
    async fn test_threading() {
        let (mut chan1, mut chan2) = channel::<String, String>(10);

        // The spawned thread drives `chan1` on its own runtime.
        let handle = thread::spawn(move || {
            let rt = Runtime::new().unwrap();
            rt.block_on(async {
                chan1.send("Hello from chan1".to_string()).await.unwrap();
                let msg = chan1.recv().await.unwrap();
                assert_eq!(msg, "Hello from chan2");
            });
        });

        chan2.send("Hello from chan2".to_string()).await.unwrap();
        let msg = chan2.recv().await.unwrap();
        assert_eq!(msg, "Hello from chan1");

        // Surface any assertion failure raised inside the spawned thread.
        handle.join().unwrap();
    }
}