1#![deny(warnings)]
24#![deny(missing_docs)]
25#![deny(missing_debug_implementations)]
26extern crate futures;
27
28use std::cell::RefCell;
29use std::fmt;
30use std::rc::Rc;
31
32use futures::{Future, Poll, Async};
33use futures::task::{self, Task};
34
35pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
37 let inner = Rc::new(RefCell::new(Inner {
38 value: None,
39 complete: false,
40 tx_task: None,
41 rx_task: None,
42 }));
43 let tx = Sender {
44 inner: inner.clone(),
45 };
46 let rx = Receiver {
47 inner: inner,
48 };
49 (tx, rx)
50}
51
52pub struct Sender<T> {
54 inner: Rc<RefCell<Inner<T>>>,
55}
56
57impl<T> Sender<T> {
58 pub fn complete(self, val: T) {
60 let mut borrow = self.inner.borrow_mut();
61 borrow.value = Some(val);
62 }
63
64 pub fn is_canceled(&self) -> bool {
66 self.inner.borrow().complete
67 }
68
69 pub fn waiting(self) -> Waiting<T> {
71 Waiting {
72 tx: Some(self),
73 }
74 }
75}
76
77impl<T> Drop for Sender<T> {
78 fn drop(&mut self) {
79 let rx_task = {
80 let mut borrow = self.inner.borrow_mut();
81 borrow.complete = true;
82 borrow.tx_task.take();
83 borrow.rx_task.take()
84 };
85 if let Some(task) = rx_task {
86 task.notify();
87 }
88 }
89}
90
91impl<T> fmt::Debug for Sender<T> {
92 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93 f.pad("Sender")
94 }
95}
96
97pub struct Receiver<T> {
101 inner: Rc<RefCell<Inner<T>>>,
102}
103
104impl<T> Receiver<T> {
105 pub fn is_canceled(&self) -> bool {
107 let borrow = self.inner.borrow();
108 borrow.complete && borrow.value.is_none()
109 }
110
111 pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
119 self.recv_inner(false)
120 }
121
122 fn recv_inner(&mut self, should_park: bool) -> Result<Option<T>, Canceled> {
123 let mut borrow = self.inner.borrow_mut();
124 if let Some(val) = borrow.value.take() {
125 Ok(Some(val))
126 } else if borrow.complete {
127 Err(Canceled)
128 } else {
129 if should_park {
130 borrow.rx_task = Some(task::current());
131 }
132 if let Some(task) = borrow.tx_task.take() {
133 task.notify();
134 }
135 Ok(None)
136 }
137 }
138}
139
140impl<T> Future for Receiver<T> {
141 type Item = T;
142 type Error = Canceled;
143
144 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
145 self.recv_inner(true).map(|opt| match opt {
146 Some(t) => Async::Ready(t),
147 None => Async::NotReady,
148 })
149 }
150}
151
152impl<T> Drop for Receiver<T> {
153 fn drop(&mut self) {
154 let tx_task = {
155 let mut borrow = self.inner.borrow_mut();
156 borrow.complete = true;
157 borrow.rx_task.take();
158 borrow.tx_task.take()
159 };
160 if let Some(task) = tx_task {
161 task.notify();
162 }
163 }
164}
165
166impl<T> fmt::Debug for Receiver<T> {
167 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
168 f.pad("Receiver")
169 }
170}
171
172pub struct Waiting<T> {
174 tx: Option<Sender<T>>,
175}
176
177impl<T> Future for Waiting<T> {
178 type Item = Sender<T>;
179 type Error = Canceled;
180
181 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
182 if self.tx.as_ref().unwrap().is_canceled() {
183 Err(Canceled)
184 } else if self.tx.as_ref().unwrap().inner.borrow().rx_task.is_some() {
185 Ok(Async::Ready(self.tx.take().unwrap()))
186 } else {
187 self.tx.as_ref().unwrap().inner.borrow_mut().tx_task = Some(task::current());
188 Ok(Async::NotReady)
189 }
190 }
191}
192
193impl<T> fmt::Debug for Waiting<T> {
194 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
195 f.pad("Waiting")
196 }
197}
198
199#[derive(Debug, Clone, Copy, PartialEq)]
201pub struct Canceled;
202
203struct Inner<T> {
204 value: Option<T>,
205 complete: bool,
206 tx_task: Option<Task>,
207 rx_task: Option<Task>,
208}
209
210#[cfg(test)]
211mod tests {
212 use futures::Future;
213 use super::channel;
214
215 #[test]
216 fn test_smoke() {
217 let (tx, rx) = channel();
218 tx.complete(33);
219 assert_eq!(rx.wait().unwrap(), 33);
220 }
221
222 #[test]
223 fn test_canceled() {
224 let (_, rx) = channel::<()>();
225 assert_eq!(rx.wait().unwrap_err(), super::Canceled);
226 }
227
228 #[test]
229 fn test_is_canceled() {
230 let (tx, _) = channel::<()>();
231 assert!(tx.is_canceled());
232
233 let (_, rx) = channel::<()>();
234 assert!(rx.is_canceled());
235
236 let (tx, rx) = channel::<()>();
237 assert!(!tx.is_canceled());
238 assert!(!rx.is_canceled());
239
240 tx.complete(());
241 assert!(!rx.is_canceled());
242 }
243
244 #[test]
245 fn test_tx_complete_rx_unparked() {
246 let (tx, rx) = channel();
247
248 let res = rx.join(::futures::lazy(move || {
249 tx.complete(55);
250 Ok(11)
251 }));
252 assert_eq!(res.wait().unwrap(), (55, 11));
253 }
254
255 #[test]
256 fn test_tx_dropped_rx_unparked() {
257 let (tx, rx) = channel::<i32>();
258
259 let res = rx.join(::futures::lazy(move || {
260 let _tx = tx;
261 Ok(11)
262 }));
263 assert_eq!(res.wait().unwrap_err(), super::Canceled);
264 }
265
266 #[test]
267 fn test_waiting_unparked() {
268 let (tx, rx) = channel::<i32>();
269
270 let res = tx.waiting().join(::futures::lazy(move || {
271 let mut rx = rx;
272 let _ = rx.poll(); Ok(rx)
274 })).and_then(|(tx, rx)| {
275 tx.complete(5);
276 rx
277 });
278 assert_eq!(res.wait().unwrap(), 5);
279 }
280
281 #[test]
282 fn test_waiting_canceled() {
283 let (tx, rx) = channel::<i32>();
284
285 let res = tx.waiting().join(::futures::lazy(move || {
286 let _rx = rx;
287 Ok(())
288 }));
289 assert_eq!(res.wait().unwrap_err(), super::Canceled);
290 }
291
292 #[test]
293 fn try_recv() {
294 let (tx, mut rx) = channel::<i32>();
295
296 assert!(rx.try_recv().unwrap().is_none());
297
298 ::futures::lazy(move || {
299 tx.complete(5);
300 Ok::<(), ()>(())
301 }).wait().unwrap();
302
303 assert_eq!(rx.try_recv().unwrap(), Some(5));
304 }
305
306 #[test]
307 fn try_recv_canceled() {
308 let (tx, mut rx) = channel::<i32>();
309
310 assert!(rx.try_recv().unwrap().is_none());
311
312 ::futures::lazy(move || {
313 let _tx = tx;
314 Ok::<(), ()>(())
315 }).wait().unwrap();
316
317 assert!(rx.try_recv().is_err());
318 }
319}