Skip to main content

msg_common/
channel.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{Sink, SinkExt, Stream};
7use tokio::sync::mpsc::{
8    self, Receiver,
9    error::{TryRecvError, TrySendError},
10};
11use tokio_util::sync::{PollSendError, PollSender};
12
13/// A bounded, bi-directional channel for sending and receiving messages.
14/// Relies on Tokio's [`mpsc`] channel.
15///
16/// Channel also implements the [`Stream`] and [`Sink`] traits for convenience.
17pub struct Channel<S, R> {
18    tx: PollSender<S>,
19    rx: Receiver<R>,
20}
21
22/// Creates a new channel with the given buffer size. This will return a tuple of
23/// 2 [`Channel`]s, both of which can be used to send and receive messages.
24///
25/// It works with 2 generic types, `S` and `R`, which represent the types of
26/// messages that can be sent and received, respectively. The first channel in
27/// the tuple can be used to send messages of type `S` and receive messages of
28/// type `R`. The second channel can be used to send messages of type `R` and
29/// receive messages of type `S`.
30pub fn channel<S, R>(tx_buffer: usize, rx_buffer: usize) -> (Channel<S, R>, Channel<R, S>)
31where
32    S: Send,
33    R: Send,
34{
35    let (tx1, rx1) = mpsc::channel(tx_buffer);
36    let (tx2, rx2) = mpsc::channel(rx_buffer);
37
38    let tx1 = PollSender::new(tx1);
39    let tx2 = PollSender::new(tx2);
40
41    (Channel { tx: tx1, rx: rx2 }, Channel { tx: tx2, rx: rx1 })
42}
43
44impl<S: Send + 'static, R> Channel<S, R> {
45    /// Sends a value, waiting until there is capacity.
46    ///
47    /// A successful send occurs when it is determined that the other end of the
48    /// channel has not hung up already. An unsuccessful send would be one where
49    /// the corresponding receiver has already been closed. Note that a return
50    /// value of `Err` means that the data will never be received, but a return
51    /// value of `Ok` does not mean that the data will be received. It is
52    /// possible for the corresponding receiver to hang up immediately after
53    /// this function returns `Ok`.
54    pub async fn send(&mut self, msg: S) -> Result<(), PollSendError<S>> {
55        self.tx.send(msg).await
56    }
57
58    /// Attempts to immediately send a message on this channel.
59    ///
60    /// This method differs from `send` by returning immediately if the channel's
61    /// buffer is full or no receiver is waiting to acquire some data. Compared
62    /// with `send`, this function has two failure cases instead of one (one for
63    /// disconnection, one for a full buffer).
64    pub fn try_send(&mut self, msg: S) -> Result<(), TrySendError<S>> {
65        if let Some(tx) = self.tx.get_ref() {
66            tx.try_send(msg)
67        } else {
68            Err(TrySendError::Closed(msg))
69        }
70    }
71
72    /// Receives the next value for this channel.
73    ///
74    /// This method returns `None` if the channel has been closed and there are
75    /// no remaining messages in the channel's buffer. This indicates that no
76    /// further values can ever be received from this `Receiver`. The channel is
77    /// closed when all senders have been dropped, or when `close` is called.
78    ///
79    /// If there are no messages in the channel's buffer, but the channel has
80    /// not yet been closed, this method will sleep until a message is sent or
81    /// the channel is closed.  Note that if `close` is called, but there are
82    /// still outstanding `Permits` from before it was closed, the channel is
83    /// not considered closed by `recv` until the permits are released.
84    pub async fn recv(&mut self) -> Option<R> {
85        self.rx.recv().await
86    }
87
88    /// Tries to receive the next value for this receiver.
89    ///
90    /// This method returns the [`Empty`](TryRecvError::Empty) error if the channel is currently
91    /// empty, but there are still outstanding senders or permits.
92    ///
93    /// This method returns the [`Disconnected`](TryRecvError::Disconnected) error if the channel is
94    /// currently empty, and there are no outstanding senders or permits.
95    ///
96    /// Unlike the [`poll_recv`](Self::poll_recv) method, this method will never return an
97    /// [`Empty`](TryRecvError::Empty) error spuriously.
98    pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
99        self.rx.try_recv()
100    }
101
102    /// Polls to receive the next message on this channel.
103    ///
104    /// This method returns:
105    ///
106    ///  * `Poll::Pending` if no messages are available but the channel is not closed, or if a
107    ///    spurious failure happens.
108    ///  * `Poll::Ready(Some(message))` if a message is available.
109    ///  * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was
110    ///    closed have been received.
111    ///
112    /// When the method returns `Poll::Pending`, the `Waker` in the provided
113    /// `Context` is scheduled to receive a wakeup when a message is sent on any
114    /// receiver, or when the channel is closed.  Note that on multiple calls to
115    /// `poll_recv`, only the `Waker` from the `Context` passed to the most
116    /// recent call is scheduled to receive a wakeup.
117    ///
118    /// If this method returns `Poll::Pending` due to a spurious failure, then
119    /// the `Waker` will be notified when the situation causing the spurious
120    /// failure has been resolved. Note that receiving such a wakeup does not
121    /// guarantee that the next call will succeed — it could fail with another
122    /// spurious failure.
123    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<R>> {
124        self.rx.poll_recv(cx)
125    }
126}
127
128impl<S, R> Stream for Channel<S, R> {
129    type Item = R;
130
131    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
132        self.rx.poll_recv(cx)
133    }
134}
135
136impl<S: Send + 'static, R> Sink<S> for Channel<S, R> {
137    type Error = PollSendError<S>;
138
139    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
140        self.tx.poll_ready_unpin(cx)
141    }
142
143    fn start_send(mut self: Pin<&mut Self>, item: S) -> Result<(), Self::Error> {
144        self.tx.start_send_unpin(item)
145    }
146
147    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148        self.tx.poll_flush_unpin(cx)
149    }
150
151    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152        self.tx.poll_close_unpin(cx)
153    }
154}