local_sync/mpsc/
bounded.rs

1use super::chan::{self, SendError, TryRecvError};
2use crate::semaphore::Inner;
3use futures_util::future::poll_fn;
4use std::task::{Context, Poll};
5
/// Sending half of the bounded channel returned by [`channel`].
///
/// Thin newtype over `chan::Tx<T, Inner>`; every method forwards to the
/// wrapped sender.
pub struct Tx<T>(chan::Tx<T, Inner>);
7
/// Receiving half of the bounded channel returned by [`channel`].
///
/// Thin newtype over `chan::Rx<T, Inner>`; every method forwards to the
/// wrapped receiver.
pub struct Rx<T>(chan::Rx<T, Inner>);
9
10pub fn channel<T>(buffer: usize) -> (Tx<T>, Rx<T>) {
11    let semaphore = Inner::new(buffer);
12    let (tx, rx) = chan::channel(semaphore);
13    (Tx(tx), Rx(rx))
14}
15
16impl<T> Tx<T> {
17    pub async fn send(&self, value: T) -> Result<(), SendError> {
18        // acquire semaphore first
19        self.0
20            .chan
21            .semaphore
22            .acquire(1)
23            .await
24            .map_err(|_| SendError::RxClosed)?;
25        self.0.send(value)
26    }
27
28    pub fn is_closed(&self) -> bool {
29        self.0.is_closed()
30    }
31
32    pub fn same_channel(&self, other: &Self) -> bool {
33        self.0.same_channel(&other.0)
34    }
35}
36
37impl<T> Clone for Tx<T> {
38    fn clone(&self) -> Self {
39        Self(self.0.clone())
40    }
41}
42
43impl<T> Rx<T> {
44    pub async fn recv(&mut self) -> Option<T> {
45        poll_fn(|cx| self.poll_recv(cx)).await
46    }
47
48    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
49        self.0.recv(cx)
50    }
51
52    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
53        self.0.try_recv()
54    }
55
56    pub fn close(&mut self) {
57        self.0.close()
58    }
59}
60
#[cfg(test)]
mod tests {
    use super::channel;

    /// Round-trips one value through a capacity-1 channel, then checks that
    /// the receiver observes the end of the stream once the sender is gone.
    #[monoio::test]
    async fn test_bounded_channel() {
        let (tx, mut rx) = channel(1);

        // A sent value must come out of the receiver unchanged.
        tx.send(1).await.unwrap();
        assert_eq!(rx.recv().await.unwrap(), 1);

        // With the only sender dropped, `recv` must yield `None`.
        drop(tx);
        assert_eq!(rx.recv().await, None);
    }
}