//! A cloneable, asynchronous single-value slot.
//!
//! An [`Aption`] holds at most one value at a time. A value can be stored with
//! [`Aption::poll_put`] (or [`Aption::put`]) while the slot is empty and removed with
//! [`Aption::poll_take`] (or [`Aption::take`]) while it is full. Handles can be cloned;
//! all clones operate on the same slot. Once the slot has been closed with
//! [`Aption::poll_close`], further puts are rejected and takers observe [`Closed`]
//! after any remaining value has been taken.

// Lints enforced for the whole crate.
#![deny(
    unused_extern_crates,
    missing_debug_implementations,
    missing_docs,
    unreachable_pub
)]
// Under test, every warning is promoted to a hard error.
#![cfg_attr(test, deny(warnings))]
27
28use futures::{task, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
29use std::cell::UnsafeCell;
30use std::sync::Arc;
31use std::{fmt, mem};
32use tokio_sync::semaphore;
33
/// Error returned when taking from an [`Aption`] that has been closed and no longer
/// holds a value.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct Closed;

impl fmt::Display for Closed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("aption is closed and empty")
    }
}

// Implementing `Error` lets `Closed` be boxed as `dyn Error` and used with `?` in
// callers that aggregate errors.
impl std::error::Error for Closed {}
39
/// A handle to a shared slot that holds at most one value of type `T`.
///
/// Cloning a handle yields another handle to the same slot. Values are stored with
/// `poll_put`/`put` and removed with `poll_take`/`take`; the handle also implements
/// `Sink` and `Stream` in terms of those operations.
#[derive(Debug)]
pub struct Aption<T> {
    // State shared by every clone of this handle.
    inner: Arc<Inner<T>>,
    // This handle's own permit, acquired from `inner.semaphore` around each access to
    // the shared value and released again before the poll method returns.
    permit: semaphore::Permit,
}
50
51impl<T> Clone for Aption<T> {
52 fn clone(&self) -> Self {
53 Aption {
54 inner: self.inner.clone(),
55 permit: semaphore::Permit::new(),
56 }
57 }
58}
59
60#[allow(missing_docs)]
61pub fn new<T>() -> Aption<T> {
62 let m = Arc::new(Inner {
63 semaphore: semaphore::Semaphore::new(1),
64 value: UnsafeCell::new(CellValue::None),
65 put_task: task::AtomicTask::new(),
66 take_task: task::AtomicTask::new(),
67 });
68
69 Aption {
70 inner: m.clone(),
71 permit: semaphore::Permit::new(),
72 }
73}
74
/// Contents of the shared slot.
enum CellValue<T> {
    /// The slot is open and holds a value.
    Some(T),
    /// The slot is open and empty.
    None,
    /// The slot has been closed; it may still hold one last value that has not been
    /// taken yet.
    Fin(Option<T>),
}

impl<T> CellValue<T> {
    /// Returns `true` only for the open-and-empty state (`CellValue::None`).
    ///
    /// A closed slot (`Fin`) is never considered "none", even when it holds no value.
    fn is_none(&self) -> bool {
        match *self {
            CellValue::None => true,
            CellValue::Some(_) | CellValue::Fin(_) => false,
        }
    }

    /// Removes and returns the held value, if any.
    ///
    /// An open slot is left as `CellValue::None`; a closed slot stays closed and is left
    /// as `CellValue::Fin(None)`.
    fn take(&mut self) -> Option<T> {
        match mem::replace(self, CellValue::None) {
            CellValue::None => None,
            CellValue::Some(t) => Some(t),
            CellValue::Fin(f) => {
                // Restore the closed marker. A plain assignment is used because the
                // previous value (the temporary `None` written above) is not needed and
                // `mem::replace`'s return value must not be silently discarded.
                *self = CellValue::Fin(None);
                f
            }
        }
    }
}
102
/// State shared between all handles to one slot.
struct Inner<T> {
    // Created with a single permit in `new`; a handle acquires it before touching
    // `value`.
    semaphore: semaphore::Semaphore,
    // The slot contents. Accessed through a raw pointer by the poll methods while the
    // handle's permit is held.
    value: UnsafeCell<CellValue<T>>,
    // Task registered by `poll_put` (slot full) or `poll_close` (value still pending);
    // notified by `poll_take` when a value is removed.
    put_task: task::AtomicTask,
    // Task registered by `poll_take` (slot empty); notified by `poll_put` and
    // `poll_close`.
    take_task: task::AtomicTask,
}
109
110unsafe impl<T: Send> Sync for Inner<T> {}
112unsafe impl<T: Send> Send for Inner<T> {}
113
114impl<T> fmt::Debug for Inner<T> {
115 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
116 write!(f, "AptionInner")
117 }
118}
119
120struct TakeFuture<T>(Option<Aption<T>>);
121impl<T> Future for TakeFuture<T> {
122 type Item = (Aption<T>, T);
123 type Error = Closed;
124
125 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
126 let t = try_ready!(self
127 .0
128 .as_mut()
129 .expect("called poll after future resolved")
130 .poll_take());
131 Ok(Async::Ready((self.0.take().unwrap(), t)))
132 }
133}
134
135struct PutFuture<T>(Option<Aption<T>>, Option<T>);
136impl<T> Future for PutFuture<T> {
137 type Item = Aption<T>;
138 type Error = T;
139
140 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
141 let t = self.1.take().expect("called poll after future resolved");
142 match self
143 .0
144 .as_mut()
145 .expect("called poll after future resolved")
146 .poll_put(t)
147 {
148 Ok(AsyncSink::Ready) => Ok(Async::Ready(self.0.take().unwrap())),
149 Ok(AsyncSink::NotReady(t)) => {
150 self.1 = Some(t);
151 Ok(Async::NotReady)
152 }
153 Err(t) => Err(t),
154 }
155 }
156}
157
158impl<T> Aption<T> {
159 pub fn take(self) -> impl Future<Item = (Self, T), Error = Closed> {
161 TakeFuture(Some(self))
162 }
163
164 pub fn put(self, t: T) -> impl Future<Item = Self, Error = T> {
167 PutFuture(Some(self), Some(t))
168 }
169}
170
171impl<T> Aption<T> {
172 pub fn poll_take(&mut self) -> Poll<T, Closed> {
179 try_ready!(self
180 .permit
181 .poll_acquire(&self.inner.semaphore)
182 .map_err(|_| unreachable!("semaphore dropped while we have an Arc to it")));
183
184 let value = unsafe { &mut *self.inner.value.get() };
186
187 let v = value.take();
188 if v.is_none() {
189 if let CellValue::Fin(None) = *value {
192 self.permit.release(&self.inner.semaphore);
195 return Err(Closed);
196 }
197
198 self.inner.take_task.register();
202 }
203
204 self.permit.release(&self.inner.semaphore);
206
207 if let Some(t) = v {
208 self.inner.put_task.notify();
210 Ok(Async::Ready(t))
211 } else {
212 Ok(Async::NotReady)
213 }
214 }
215
216 pub fn poll_put(&mut self, t: T) -> Result<AsyncSink<T>, T> {
223 match self.permit.poll_acquire(&self.inner.semaphore) {
224 Ok(Async::Ready(())) => {}
225 Ok(Async::NotReady) => {
226 return Ok(AsyncSink::NotReady(t));
227 }
228 Err(_) => {
229 unreachable!("semaphore dropped while we have an Arc to it");
230 }
231 }
232
233 let value = unsafe { &mut *self.inner.value.get() };
235
236 if let CellValue::Fin(_) = *value {
238 self.permit.release(&self.inner.semaphore);
241 return Err(t);
242 }
243
244 if value.is_none() {
246 *value = CellValue::Some(t);
248
249 self.permit.release(&self.inner.semaphore);
251
252 self.inner.take_task.notify();
254
255 Ok(AsyncSink::Ready)
256 } else {
257 self.inner.put_task.register();
263
264 self.permit.release(&self.inner.semaphore);
266
267 Ok(AsyncSink::NotReady(t))
268 }
269 }
270
271 pub fn poll_close(&mut self) -> Poll<(), ()> {
277 try_ready!(self
278 .permit
279 .poll_acquire(&self.inner.semaphore)
280 .map_err(|_| unreachable!("semaphore dropped while we have an Arc to it")));
281
282 let value = unsafe { &mut *self.inner.value.get() };
284 let v = value.take();
285 *value = CellValue::Fin(v);
286
287 let ret = if let CellValue::Fin(None) = *value {
289 Async::Ready(())
290 } else {
291 self.inner.put_task.register();
294 Async::NotReady
295 };
296
297 self.permit.release(&self.inner.semaphore);
299
300 self.inner.take_task.notify();
302
303 Ok(ret)
304 }
305}
306
307impl<T> Sink for Aption<T> {
308 type SinkItem = T;
309 type SinkError = T;
310
311 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
312 self.poll_put(item)
313 }
314
315 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
316 self.poll_close()
317 .map_err(|_| unreachable!("failed to close because already closed elsewhere"))
318 }
319}
320
321impl<T> Stream for Aption<T> {
322 type Item = T;
323 type Error = ();
324
325 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
326 match self.poll_take() {
327 Ok(Async::Ready(v)) => Ok(Async::Ready(Some(v))),
328 Ok(Async::NotReady) => Ok(Async::NotReady),
329 Err(Closed) => {
330 Ok(Async::Ready(None))
332 }
333 }
334 }
335}
336
#[cfg(test)]
mod test {
    use super::*;
    use tokio_mock_task::MockTask;

    // Drives a single handle through put/take/close by hand and checks both the returned
    // poll results and whether the mock task was notified after each step.
    #[test]
    fn basic() {
        let mut mt = MockTask::new();

        let mut a = new::<usize>();
        // Taking from an empty slot is not ready and does not notify.
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        // Putting a value notifies the task that tried to take.
        assert_eq!(mt.enter(|| a.poll_put(42)), Ok(AsyncSink::Ready));
        assert!(mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(42)));

        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_put(43)), Ok(AsyncSink::Ready));
        assert!(mt.is_notified());
        // A second put while the slot is full hands the value back.
        assert_eq!(mt.enter(|| a.poll_put(44)), Ok(AsyncSink::NotReady(44)));
        assert!(!mt.is_notified());
        // Taking the value notifies the task whose put was rejected.
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(43)));
        assert!(mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_put(44)), Ok(AsyncSink::Ready));
        assert!(mt.is_notified());

        // Closing with a value still in the slot is not ready until that value is taken.
        assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::NotReady));
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(44)));
        assert!(mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::Ready(())));
        assert!(!mt.is_notified());
        // A closed, empty slot reports `Closed` to takers.
        assert_eq!(mt.enter(|| a.poll_take()), Err(Closed));
    }

    // Forwards a channel into one handle (as a `Sink`) while collecting from another
    // handle to the same slot (as a `Stream`), and checks that all items arrive in order.
    #[test]
    fn sink_stream() {
        use tokio::prelude::*;

        let a = new::<usize>();
        let (mut tx, rx) = tokio_sync::mpsc::unbounded_channel();
        tokio::run(future::lazy(move || {
            tokio::spawn(
                rx.forward(a.clone().sink_map_err(|_| unreachable!()))
                    .map(|_| ())
                    .map_err(|_| unreachable!()),
            );

            tx.try_send(1).unwrap();
            tx.try_send(2).unwrap();
            tx.try_send(3).unwrap();
            tx.try_send(4).unwrap();
            tx.try_send(5).unwrap();
            // Dropping the sender ends the receiver stream, letting `forward` finish.
            drop(tx);

            a.collect()
                .inspect(|v| {
                    assert_eq!(v, &[1, 2, 3, 4, 5]);
                })
                .map(|_| ())
        }));
    }

    // Chains the owned `put`/`take` futures twice on a single handle.
    #[test]
    fn futures() {
        use tokio::prelude::*;

        let a = new::<usize>();
        tokio::run(future::lazy(move || {
            a.put(42)
                .map_err(|_| unreachable!())
                .and_then(|a| a.take())
                .map_err(|_| unreachable!())
                .and_then(|(a, v)| {
                    assert_eq!(v, 42);
                    a.put(43)
                })
                .map_err(|_| unreachable!())
                .and_then(|a| a.take())
                .map_err(|_| unreachable!())
                .inspect(|(_, v)| {
                    assert_eq!(*v, 43);
                })
                .map(|_| ())
        }));
    }

    // A task waiting to take from an empty slot is notified when the slot is closed.
    #[test]
    fn notified_on_empty_drop() {
        let mut mt = MockTask::new();

        let mut a = new::<usize>();
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::Ready(())));
        assert!(mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_take()), Err(Closed));
    }
}