tiny_actor/channel/
receiving.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use crate::*;
7use concurrent_queue::PopError;
8use event_listener::EventListener;
9use futures::{pin_mut, Future, FutureExt};
10use tokio::task::yield_now;
11
12impl<M> Channel<M> {
13 pub fn try_recv(&self, signaled_halt: &mut bool) -> Result<M, TryRecvError> {
16 if !(*signaled_halt) && self.inbox_should_halt() {
17 *signaled_halt = true;
18 Err(TryRecvError::Halted)
19 } else {
20 self.pop_msg().map_err(|e| match e {
21 PopError::Empty => TryRecvError::Empty,
22 PopError::Closed => TryRecvError::ClosedAndEmpty,
23 })
24 }
25 }
26
27 pub fn recv<'a>(
29 &'a self,
30 signaled_halt: &'a mut bool,
31 listener: &'a mut Option<EventListener>,
32 ) -> RecvFut<'a, M> {
33 RecvFut {
34 channel: self,
35 signaled_halt,
36 listener,
37 }
38 }
39
40 fn poll_try_recv(
41 &self,
42 signaled_halt: &mut bool,
43 listener: &mut Option<EventListener>,
44 ) -> Option<Result<M, RecvError>> {
45 match self.try_recv(signaled_halt) {
46 Ok(msg) => {
47 *listener = None;
48 Some(Ok(msg))
49 }
50 Err(signal) => match signal {
51 TryRecvError::Halted => {
52 *listener = None;
53 Some(Err(RecvError::Halted))
54 }
55 TryRecvError::ClosedAndEmpty => {
56 *listener = None;
57 Some(Err(RecvError::ClosedAndEmpty))
58 }
59 TryRecvError::Empty => None,
60 },
61 }
62 }
63}
64
/// Future created by [`Channel::recv`]; it resolves to `Result<M, RecvError>`.
///
/// The future owns no state of its own: it only borrows the channel and two
/// caller-owned slots.
#[derive(Debug)]
pub struct RecvFut<'a, M> {
    /// Channel that messages are received from.
    channel: &'a Channel<M>,
    /// Caller-owned flag handed to the channel's receive attempts on every poll.
    signaled_halt: &'a mut bool,
    /// Caller-owned slot holding the listener this future waits on, if any.
    /// It is reset to `None` when the future is dropped.
    listener: &'a mut Option<EventListener>,
}
74
// Explicit `Unpin` marker for `RecvFut`, for every `M`. The `Future` impl relies on
// it to reach `&mut Self` through `Pin<&mut Self>` inside `poll`.
impl<'a, M> Unpin for RecvFut<'a, M> {}
76
77impl<'a, M> Future for RecvFut<'a, M> {
78 type Output = Result<M, RecvError>;
79
80 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81 let Self {
82 channel,
83 signaled_halt,
84 listener,
85 } = &mut *self;
86
87 if let Some(res) = channel.poll_try_recv(signaled_halt, listener) {
89 let fut = yield_now();
90 pin_mut!(fut);
91 let _ = fut.poll(cx);
92 return Poll::Ready(res);
93 }
94
95 loop {
96 if listener.is_none() {
98 **listener = Some(channel.get_recv_listener())
99 }
100
101 if let Some(res) = channel.poll_try_recv(signaled_halt, listener) {
103 return Poll::Ready(res);
104 }
105
106 match listener.as_mut().unwrap().poll_unpin(cx) {
108 Poll::Ready(()) => {
109 **listener = None;
110 if let Some(res) = channel.poll_try_recv(signaled_halt, listener) {
112 return Poll::Ready(res);
113 }
114 }
115 Poll::Pending => return Poll::Pending,
116 }
117 }
118 }
119}
120
121impl<'a, M> Drop for RecvFut<'a, M> {
122 fn drop(&mut self) {
123 *self.listener = None;
124 }
125}
126
#[cfg(test)]
mod test {
    use std::{future::ready, sync::Arc, time::Duration};

    use crate::*;

    /// With no halt pending, messages are returned whatever the flag's value,
    /// and an open, drained channel reports `Empty`.
    #[test]
    fn try_recv() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        channel.push_msg(()).unwrap();
        channel.push_msg(()).unwrap();

        assert!(channel.try_recv(&mut true).is_ok());
        assert!(channel.try_recv(&mut false).is_ok());
        assert_eq!(channel.try_recv(&mut true), Err(TryRecvError::Empty));
        assert_eq!(channel.try_recv(&mut false), Err(TryRecvError::Empty));
    }

    /// A closed channel still hands out its queued messages and then reports
    /// `ClosedAndEmpty`.
    #[test]
    fn try_recv_closed() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        channel.push_msg(()).unwrap();
        channel.push_msg(()).unwrap();
        channel.close();

        assert!(channel.try_recv(&mut true).is_ok());
        assert!(channel.try_recv(&mut false).is_ok());
        assert_eq!(
            channel.try_recv(&mut true),
            Err(TryRecvError::ClosedAndEmpty)
        );
        assert_eq!(
            channel.try_recv(&mut false),
            Err(TryRecvError::ClosedAndEmpty)
        );
    }

    /// After `halt_some(1)`, the first call with an unset flag reports `Halted`;
    /// later calls return the queued messages and then `Empty`.
    #[test]
    fn try_recv_halt() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        channel.push_msg(()).unwrap();
        channel.push_msg(()).unwrap();
        channel.halt_some(1);

        assert_eq!(channel.try_recv(&mut false), Err(TryRecvError::Halted));
        assert!(channel.try_recv(&mut true).is_ok());
        assert!(channel.try_recv(&mut false).is_ok());
        assert_eq!(channel.try_recv(&mut true), Err(TryRecvError::Empty));
        assert_eq!(channel.try_recv(&mut false), Err(TryRecvError::Empty));
    }

    /// `recv` resolves with the queued message and then with `ClosedAndEmpty`
    /// when the channel was filled and closed beforehand.
    #[tokio::test]
    async fn recv_immediate() {
        let channel = Channel::<()>::new(1, 1, Capacity::default());
        let mut listener = None;
        channel.push_msg(()).unwrap();
        channel.close();

        assert_eq!(channel.recv(&mut false, &mut listener).await, Ok(()));
        assert_eq!(
            channel.recv(&mut false, &mut listener).await,
            Err(RecvError::ClosedAndEmpty)
        );
    }

    /// A `recv` started on an empty channel completes once a message is pushed
    /// from another task, and the next `recv` sees the close.
    #[tokio::test]
    async fn recv_delayed() {
        let channel = Arc::new(Channel::<()>::new(1, 1, Capacity::default()));
        let channel_clone = channel.clone();

        let handle = tokio::task::spawn(async move {
            let mut listener = None;
            assert_eq!(channel_clone.recv(&mut false, &mut listener).await, Ok(()));
            assert_eq!(
                channel_clone.recv(&mut false, &mut listener).await,
                Err(RecvError::ClosedAndEmpty)
            );
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        channel.push_msg(()).unwrap();
        channel.close();
        handle.await.unwrap();
    }

    /// A second `recv` still receives the message after a first, already polled
    /// `recv` on the same channel has been dropped.
    #[tokio::test]
    async fn dropping_recv_notifies_next() {
        let channel = Arc::new(Channel::<()>::new(1, 1, Capacity::default()));
        let channel_clone = channel.clone();

        let handle = tokio::task::spawn(async move {
            let mut listener1 = None;
            let mut halt1 = false;
            let mut recv1 = channel_clone.recv(&mut halt1, &mut listener1);

            // Poll `recv1` exactly once (biased order), then fall through to the
            // always-ready branch. The channel is still empty at this point.
            tokio::select! {
                biased;
                _ = &mut recv1 => {
                    panic!("recv1 resolved although the channel is still empty")
                }
                _ = ready(()) => {}
            }

            let mut listener2 = None;
            let mut halt2 = false;
            let recv2 = channel_clone.recv(&mut halt2, &mut listener2);
            drop(recv1);
            recv2.await.unwrap();
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        channel.push_msg(()).unwrap();
        channel.close();
        handle.await.unwrap();
    }
}