local_sync/mpsc/
unbounded.rs

1use super::{
2    chan::{self, SendError, TryRecvError},
3    semaphore::Unlimited,
4};
5use futures_util::future::poll_fn;
6use std::task::{Context, Poll};
7
8pub struct Tx<T>(chan::Tx<T, Unlimited>);
9
10pub struct Rx<T>(chan::Rx<T, Unlimited>);
11
12pub fn channel<T>() -> (Tx<T>, Rx<T>) {
13    let semaphore = Unlimited::new();
14    let (tx, rx) = chan::channel(semaphore);
15    (Tx(tx), Rx(rx))
16}
17
18impl<T> Tx<T> {
19    pub fn send(&self, value: T) -> Result<(), SendError> {
20        self.0.send(value)
21    }
22
23    pub fn is_closed(&self) -> bool {
24        self.0.is_closed()
25    }
26
27    pub fn same_channel(&self, other: &Self) -> bool {
28        self.0.same_channel(&other.0)
29    }
30}
31
32impl<T> Clone for Tx<T> {
33    fn clone(&self) -> Self {
34        Self(self.0.clone())
35    }
36}
37
38impl<T> Rx<T> {
39    pub async fn recv(&mut self) -> Option<T> {
40        poll_fn(|cx| self.poll_recv(cx)).await
41    }
42
43    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
44        self.0.recv(cx)
45    }
46
47    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
48        self.0.try_recv()
49    }
50
51    pub fn close(&mut self) {
52        self.0.close()
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use super::channel;
59
60    #[monoio::test]
61    async fn tets_unbounded_channel() {
62        let (tx, mut rx) = channel();
63        tx.send(1).unwrap();
64        assert_eq!(rx.recv().await.unwrap(), 1);
65
66        drop(tx);
67        assert_eq!(rx.recv().await, None);
68    }
69}