tiny_actor/channel/
receiving.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use crate::*;
7use concurrent_queue::PopError;
8use event_listener::EventListener;
9use futures::{pin_mut, Future, FutureExt};
10use tokio::task::yield_now;
11
12impl<M> Channel<M> {
13    /// This will attempt to receive a message from the [Inbox]. If there is no message, this
14    /// will return `None`.
15    pub fn try_recv(&self, signaled_halt: &mut bool) -> Result<M, TryRecvError> {
16        if !(*signaled_halt) && self.inbox_should_halt() {
17            *signaled_halt = true;
18            Err(TryRecvError::Halted)
19        } else {
20            self.pop_msg().map_err(|e| match e {
21                PopError::Empty => TryRecvError::Empty,
22                PopError::Closed => TryRecvError::ClosedAndEmpty,
23            })
24        }
25    }
26
27    /// Wait until there is a message in the [Inbox].
28    pub fn recv<'a>(
29        &'a self,
30        signaled_halt: &'a mut bool,
31        listener: &'a mut Option<EventListener>,
32    ) -> RecvFut<'a, M> {
33        RecvFut {
34            channel: self,
35            signaled_halt,
36            listener,
37        }
38    }
39
40    fn poll_try_recv(
41        &self,
42        signaled_halt: &mut bool,
43        listener: &mut Option<EventListener>,
44    ) -> Option<Result<M, RecvError>> {
45        match self.try_recv(signaled_halt) {
46            Ok(msg) => {
47                *listener = None;
48                Some(Ok(msg))
49            }
50            Err(signal) => match signal {
51                TryRecvError::Halted => {
52                    *listener = None;
53                    Some(Err(RecvError::Halted))
54                }
55                TryRecvError::ClosedAndEmpty => {
56                    *listener = None;
57                    Some(Err(RecvError::ClosedAndEmpty))
58                }
59                TryRecvError::Empty => None,
60            },
61        }
62    }
63}
64
65/// A future returned by receiving messages from an [Inbox].
66///
67/// This can be awaited or streamed to get the messages.
68#[derive(Debug)]
69pub struct RecvFut<'a, M> {
70    channel: &'a Channel<M>,
71    signaled_halt: &'a mut bool,
72    listener: &'a mut Option<EventListener>,
73}
74
75impl<'a, M> Unpin for RecvFut<'a, M> {}
76
77impl<'a, M> Future for RecvFut<'a, M> {
78    type Output = Result<M, RecvError>;
79
80    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81        let Self {
82            channel,
83            signaled_halt,
84            listener,
85        } = &mut *self;
86
87        // First try to receive once, and yield if successful
88        if let Some(res) = channel.poll_try_recv(signaled_halt, listener) {
89            let fut = yield_now();
90            pin_mut!(fut);
91            let _ = fut.poll(cx);
92            return Poll::Ready(res);
93        }
94
95        loop {
96            // Otherwise, acquire a listener, if we don't have one yet
97            if listener.is_none() {
98                **listener = Some(channel.get_recv_listener())
99            }
100
101            // Attempt to receive a message, and return if ready
102            if let Some(res) = channel.poll_try_recv(signaled_halt, listener) {
103                return Poll::Ready(res);
104            }
105
106            // And poll the listener
107            match listener.as_mut().unwrap().poll_unpin(cx) {
108                Poll::Ready(()) => {
109                    **listener = None;
110                    // Attempt to receive a message, and return if ready
111                    if let Some(res) = channel.poll_try_recv(signaled_halt, listener) {
112                        return Poll::Ready(res);
113                    }
114                }
115                Poll::Pending => return Poll::Pending,
116            }
117        }
118    }
119}
120
121impl<'a, M> Drop for RecvFut<'a, M> {
122    fn drop(&mut self) {
123        *self.listener = None;
124    }
125}
126
#[cfg(test)]
mod test {
    use std::{future::ready, sync::Arc, time::Duration};

    use crate::*;

    /// Messages are handed out regardless of the halt flag when no halt is pending, and an
    /// open, drained channel reports `Empty`.
    #[test]
    fn try_recv() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        channel.push_msg(()).unwrap();
        channel.push_msg(()).unwrap();

        assert!(channel.try_recv(&mut true).is_ok());
        assert!(channel.try_recv(&mut false).is_ok());
        assert_eq!(channel.try_recv(&mut true), Err(TryRecvError::Empty));
        assert_eq!(channel.try_recv(&mut false), Err(TryRecvError::Empty));
    }

    /// A closed channel still hands out its queued messages, then reports `ClosedAndEmpty`.
    #[test]
    fn try_recv_closed() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        channel.push_msg(()).unwrap();
        channel.push_msg(()).unwrap();
        channel.close();

        assert!(channel.try_recv(&mut true).is_ok());
        assert!(channel.try_recv(&mut false).is_ok());
        assert_eq!(
            channel.try_recv(&mut true),
            Err(TryRecvError::ClosedAndEmpty)
        );
        assert_eq!(
            channel.try_recv(&mut false),
            Err(TryRecvError::ClosedAndEmpty)
        );
    }

    /// A pending halt is reported once, ahead of queued messages; afterwards the messages
    /// are delivered as usual.
    #[test]
    fn try_recv_halt() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        channel.push_msg(()).unwrap();
        channel.push_msg(()).unwrap();
        channel.halt_some(1);

        assert_eq!(channel.try_recv(&mut false), Err(TryRecvError::Halted));
        assert!(channel.try_recv(&mut true).is_ok());
        assert!(channel.try_recv(&mut false).is_ok());
        assert_eq!(channel.try_recv(&mut true), Err(TryRecvError::Empty));
        assert_eq!(channel.try_recv(&mut false), Err(TryRecvError::Empty));
    }

    /// `recv` resolves without waiting when a message, or the closed state, is already there.
    #[tokio::test]
    async fn recv_immediate() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        let mut listener = None;
        channel.push_msg(()).unwrap();
        channel.close();

        assert_eq!(channel.recv(&mut false, &mut listener).await, Ok(()));
        assert_eq!(
            channel.recv(&mut false, &mut listener).await,
            Err(RecvError::ClosedAndEmpty)
        );
    }

    /// A receiver that starts waiting on an empty channel is woken by a later push and close.
    #[tokio::test]
    async fn recv_delayed() {
        let channel = Arc::new(Channel::<()>::new(1, 1, Capacity::default()));
        let channel_clone = channel.clone();

        let handle = tokio::task::spawn(async move {
            let mut listener = None;
            assert_eq!(channel_clone.recv(&mut false, &mut listener).await, Ok(()));
            assert_eq!(
                channel_clone.recv(&mut false, &mut listener).await,
                Err(RecvError::ClosedAndEmpty)
            );
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        channel.push_msg(()).unwrap();
        channel.close();
        handle.await.unwrap();
    }

    /// A second `recv` still receives the message after an earlier, already-polled `recv`
    /// has been dropped.
    #[tokio::test]
    async fn dropping_recv_notifies_next() {
        let channel = Arc::new(Channel::<()>::new(1, 1, Capacity::default()));
        let channel_clone = channel.clone();

        let handle = tokio::task::spawn(async move {
            let mut listener = None;
            let mut halt = false;
            let mut recv1 = channel_clone.recv(&mut halt, &mut listener);
            // `biased` polls `recv1` first, so it gets polled once on the empty channel
            // before the always-ready branch completes the select.
            tokio::select! {
                biased;
                _ = &mut recv1 => {
                    panic!("recv1 resolved although the channel is still empty")
                }
                _ = ready(()) => {}
            }
            let mut listener = None;
            let mut halt = false;
            let recv2 = channel_clone.recv(&mut halt, &mut listener);
            drop(recv1);
            recv2.await.unwrap();
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        channel.push_msg(()).unwrap();
        channel.close();
        handle.await.unwrap();
    }
}