tokio_bichannel/
lib.rs

1use tokio::sync::mpsc::{channel as create_channel, Receiver, Sender};
2use tokio::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
3
4/// A bidirectional channel structure that supports sending and receiving messages.
5/// 
6/// # Examples
7/// 
8/// ```
9/// let (mut l, mut r) = channel::<String, String>(10);
10/// 
11/// l.send("Hello from chan1".to_string()).await.unwrap();
12/// r.send("Hello from chan2".to_string()).await.unwrap();
13/// 
14/// assert_eq!(r.recv().await.unwrap(), "Hello from chan1");
15/// assert_eq!(l.recv().await.unwrap(), "Hello from chan2");
16/// ```
17#[derive(Debug)]
18pub struct Channel<S, R> {
19    sender: Sender<S>,
20    receiver: Receiver<R>,
21}
22
23impl<S, R> Channel<S, R> {
24    /// Sends a message through the channel.
25    /// 
26    /// # Arguments
27    /// 
28    /// * `s` - The message to send.
29    /// 
30    /// # Returns
31    /// 
32    /// * `Result<(), SendError<S>>` - Returns `Ok(())` if the message was sent successfully, or an error if it wasn't.
33    /// 
34    /// # Examples
35    /// 
36    /// ```
37    /// channel.send("Hello".to_string()).await.unwrap();
38    /// ```
39    pub async fn send(&self, s: S) -> Result<(), SendError<S>> {
40        self.sender.send(s).await
41    }
42
43    /// Receives a message from the channel.
44    /// 
45    /// # Returns
46    /// 
47    /// * `Option<R>` - Returns `Some(message)` if a message was received, or `None` if the channel is closed.
48    /// 
49    /// # Examples
50    /// 
51    /// ```
52    /// while let Some(msg) = channel.recv().await {
53    ///     println!("Received: {}", msg);
54    /// }
55    /// ```
56    pub async fn recv(&mut self) -> Option<R> {
57        self.receiver.recv().await
58    }
59
60    /// Attempts to send a message through the channel without blocking.
61    /// 
62    /// # Arguments
63    /// 
64    /// * `s` - The message to send.
65    /// 
66    /// # Returns
67    /// 
68    /// * `Result<(), SendError<S>>` - Returns `Ok(())` if the message was sent, or an error if the channel is full or closed.
69    /// 
70    /// # Examples
71    /// 
72    /// ```
73    /// match channel.try_send("Hello".to_string()) {
74    ///    Ok(_) => println!("Message sent"),
75    ///    Err(e) => println!("Error: {:?}", e),
76    /// }
77    /// ```
78    pub fn try_send(&self, s: S) -> Result<(), TrySendError<S>> {
79        self.sender.try_send(s)
80    }
81
82    /// Attempts to receive a message from the channel without blocking.
83    /// 
84    /// # Returns
85    /// 
86    /// * `Result<R, TryRecvError>` - Returns `Ok(message)` if a message was received, or an error if the channel is empty or closed.
87    /// 
88    /// # Examples
89    /// ```
90    /// match channel.try_recv() {
91    ///     Ok(msg) => println!("Received: {}", msg),
92    ///     Err(e) => println!("Error: {:?}", e),
93    /// }
94    /// ```
95    pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
96        self.receiver.try_recv()
97    }
98}
99
100/// Creates a bidirectional channel with the specified buffer size.
101/// 
102/// # Arguments
103/// 
104/// * `buffer_size` - The size of the buffer for the channel.
105/// 
106/// # Returns
107/// 
108/// * `(Channel<T, U>, Channel<U, T>)` - Returns a tuple of two `Channel` instances.
109/// 
110/// # Examples
111/// 
112/// ```
113/// use tokio_bichannel::{channel, Channel};
114/// 
115/// #[tokio::main]
116/// async fn main() {
117///     let (mut chan1, mut chan2) = channel::<String, String>(10);
118/// 
119///     chan1.send("Hello from chan1".to_string()).await.unwrap();
120///     chan2.send("Hello from chan2".to_string()).await.unwrap();
121/// 
122///     let msg1 = chan2.recv().await.unwrap();
123///     let msg2 = chan1.recv().await.unwrap();
124/// 
125///     assert_eq!(msg1, "Hello from chan1");
126///     assert_eq!(msg2, "Hello from chan2");
127/// }
128/// ```
129pub fn channel<T, U>(buffer_size: usize) -> (Channel<T, U>, Channel<U, T>) {
130    let (ls, lr) = create_channel(buffer_size);
131    let (rs, rr) = create_channel(buffer_size);
132
133    (
134        Channel {
135            sender: ls,
136            receiver: rr,
137        },
138        Channel {
139            sender: rs,
140            receiver: lr,
141        },
142    )
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use tokio::runtime::Runtime;

    /// Messages sent from each endpoint arrive at the opposite endpoint.
    #[tokio::test]
    async fn test_send_recv() {
        let (mut chan1, mut chan2) = channel::<String, String>(10);

        chan1.send("Hello from chan1".to_string()).await.unwrap();
        chan2.send("Hello from chan2".to_string()).await.unwrap();

        let msg1 = chan2.recv().await.unwrap();
        let msg2 = chan1.recv().await.unwrap();

        assert_eq!(msg1, "Hello from chan1");
        assert_eq!(msg2, "Hello from chan2");
    }

    /// `try_recv` yields a buffered message, then reports `Empty` while the
    /// peer is still alive.
    #[tokio::test]
    async fn test_try_recv() {
        let (chan1, mut chan2) = channel::<String, String>(10);

        chan1.send("Hello from chan1".to_string()).await.unwrap();

        let msg1 = chan2.try_recv().unwrap();
        assert_eq!(msg1, "Hello from chan1");

        assert_eq!(chan2.try_recv(), Err(TryRecvError::Empty));
    }

    /// `try_send` reports `Full` once the buffer is exhausted and succeeds
    /// again after the peer has consumed a message.
    #[tokio::test]
    async fn test_try_send_full() {
        let (chan1, mut chan2) = channel::<String, String>(1);

        chan1.try_send("first".to_string()).unwrap();
        assert!(matches!(
            chan1.try_send("second".to_string()),
            Err(TrySendError::Full(_))
        ));

        assert_eq!(chan2.recv().await.unwrap(), "first");
        chan1.try_send("third".to_string()).unwrap();
    }

    /// Dropping one endpoint closes both directions for the survivor, but
    /// messages buffered before the drop are still delivered.
    #[tokio::test]
    async fn test_peer_dropped() {
        let (mut chan1, chan2) = channel::<String, String>(10);

        chan2.send("last words".to_string()).await.unwrap();
        drop(chan2);

        assert_eq!(chan1.recv().await.unwrap(), "last words");
        assert_eq!(chan1.recv().await, None);
        assert_eq!(chan1.try_recv(), Err(TryRecvError::Disconnected));

        assert!(chan1.send("anyone?".to_string()).await.is_err());
        assert!(matches!(
            chan1.try_send("anyone?".to_string()),
            Err(TrySendError::Closed(_))
        ));
    }

    /// Endpoints can be driven from different threads, each with its own
    /// runtime.
    #[tokio::test]
    async fn test_threading() {
        let (mut chan1, mut chan2) = channel::<String, String>(10);

        // The spawned OS thread has no ambient runtime, so it builds its own
        // to drive its endpoint.
        let handle = thread::spawn(move || {
            let rt = Runtime::new().unwrap();
            rt.block_on(async {
                chan1.send("Hello from chan1".to_string()).await.unwrap();
                let msg = chan1.recv().await.unwrap();
                assert_eq!(msg, "Hello from chan2");
            });
        });

        chan2.send("Hello from chan2".to_string()).await.unwrap();
        let msg = chan2.recv().await.unwrap();
        assert_eq!(msg, "Hello from chan1");

        handle.join().unwrap();
    }
}