async_option/
lib.rs

1//! This crate provides an asynchronous, atomic `Option` type.
2//!
//! At a high level, this crate is exactly like `Arc<Mutex<Option<T>>>`, except with support for
//! asynchronous operations. Given an [`Aption<T>`], you can call [`poll_put`] to attempt to place
//! a value into the `Option`, or [`poll_take`] to take a value out of the `Option`. If the
//! `Option` is occupied, `poll_put` returns `AsyncSink::NotReady`; if it is empty, `poll_take`
//! returns `Async::NotReady`. In either case the current task will at that point have been
//! scheduled to be notified when the `poll_*` call may succeed in the future. `Aption<T>` can
//! also be used as a `Sink<SinkItem = T>` and `Stream<Item = T>` by effectively operating as a
//! single-element channel.
10//!
11//! An `Aption<T>` can also be closed using [`poll_close`]. Any `poll_put` after a `poll_close`
12//! will fail, and the next `poll_take` will return the current value (if any), and from then on
13//! `poll_take` will return an error.
14//!
15//!   [`Aption<T>`]: struct.Aption.html
16//!   [`poll_put`]: struct.Aption.html#method.poll_put
17//!   [`poll_take`]: struct.Aption.html#method.poll_take
18//!   [`poll_close`]: struct.Aption.html#method.poll_close
19
20#![deny(
21    unused_extern_crates,
22    missing_debug_implementations,
23    missing_docs,
24    unreachable_pub
25)]
26#![cfg_attr(test, deny(warnings))]
27
28use futures::{task, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
29use std::cell::UnsafeCell;
30use std::sync::Arc;
31use std::{fmt, mem};
32use tokio_sync::semaphore;
33
/// Indicates that an [`Aption`] has been closed, and no further operations are available on it.
///
/// This is the error produced by `Aption::poll_take` (and the future returned by
/// `Aption::take`) once the `Aption` has been closed and its last value, if any, has been taken.
///
/// It implements `std::error::Error`, so it can be boxed or propagated like any other error.
///
///   [`Aption`]: struct.Aption.html
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct Closed;

impl fmt::Display for Closed {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Aption has been closed")
    }
}

impl std::error::Error for Closed {}
39
/// An asynchronous, atomic `Option` type.
///
/// See the [crate-level documentation] for details.
///
/// Each `Aption<T>` is a handle; cloning it yields another handle to the same underlying value.
///
///   [crate-level documentation]: ../
#[derive(Debug)]
pub struct Aption<T> {
    // state shared by every clone of this handle (the value slot, the semaphore guarding it, and
    // the task slots used for wake-ups)
    inner: Arc<Inner<T>>,
    // this handle's own permit for `inner.semaphore`; every clone gets a fresh one
    permit: semaphore::Permit,
}
50
51impl<T> Clone for Aption<T> {
52    fn clone(&self) -> Self {
53        Aption {
54            inner: self.inner.clone(),
55            permit: semaphore::Permit::new(),
56        }
57    }
58}
59
60#[allow(missing_docs)]
61pub fn new<T>() -> Aption<T> {
62    let m = Arc::new(Inner {
63        semaphore: semaphore::Semaphore::new(1),
64        value: UnsafeCell::new(CellValue::None),
65        put_task: task::AtomicTask::new(),
66        take_task: task::AtomicTask::new(),
67    });
68
69    Aption {
70        inner: m.clone(),
71        permit: semaphore::Permit::new(),
72    }
73}
74
/// The contents of the value slot shared by all handles of an `Aption`.
enum CellValue<T> {
    /// The slot is open and holds a value that has not been taken yet.
    Some(T),
    /// The slot is open and empty.
    None,
    /// The slot has been closed. Holds the value that was present when it was closed, until that
    /// value is taken.
    Fin(Option<T>),
}

impl<T> CellValue<T> {
    /// Returns `true` only if the slot is open and empty (`CellValue::None`).
    ///
    /// A closed slot is never "none", not even `Fin(None)`.
    fn is_none(&self) -> bool {
        match *self {
            CellValue::None => true,
            CellValue::Some(_) | CellValue::Fin(_) => false,
        }
    }

    /// Removes and returns the contained value, if any.
    ///
    /// An open slot is left as `CellValue::None`; a closed slot is left as
    /// `CellValue::Fin(None)` so that the closed marker is never lost.
    fn take(&mut self) -> Option<T> {
        match mem::replace(self, CellValue::None) {
            CellValue::None => None,
            CellValue::Some(t) => Some(t),
            CellValue::Fin(f) => {
                // restore the Fin marker; a plain assignment avoids discarding the
                // `#[must_use]` result of a second `mem::replace`
                *self = CellValue::Fin(None);
                f
            }
        }
    }
}
102
/// State shared between all handles (clones) of one `Aption`.
struct Inner<T> {
    /// Guards `value`. It is created with a single permit in `new`, and the `poll_*` methods
    /// acquire that permit before touching `value`.
    semaphore: semaphore::Semaphore,
    /// The value slot itself; accessed through `UnsafeCell::get` while the permit is held.
    value: UnsafeCell<CellValue<T>>,
    /// Task registered by `poll_put` and `poll_close` when they have to wait for the current
    /// value to be taken; notified by `poll_take` when it removes a value.
    put_task: task::AtomicTask,
    /// Task registered by `poll_take` when there is nothing to take; notified by `poll_put` and
    /// `poll_close`.
    take_task: task::AtomicTask,
}
109
// SAFETY: `value` is only accessed (in `poll_take`, `poll_put` and `poll_close`) after the
// handle has acquired the semaphore's permit, and the semaphore is created with one permit, so
// access to the `T` is exclusive. We never expose &T, only ever &mut T (or an owned T), so we
// only require T: Send.
// NOTE(review): this presumes `semaphore::Semaphore` and `task::AtomicTask` are themselves
// `Send + Sync` -- confirm against the tokio-sync / futures versions in use.
unsafe impl<T: Send> Sync for Inner<T> {}
unsafe impl<T: Send> Send for Inner<T> {}
113
114impl<T> fmt::Debug for Inner<T> {
115    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
116        write!(f, "AptionInner")
117    }
118}
119
120struct TakeFuture<T>(Option<Aption<T>>);
121impl<T> Future for TakeFuture<T> {
122    type Item = (Aption<T>, T);
123    type Error = Closed;
124
125    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
126        let t = try_ready!(self
127            .0
128            .as_mut()
129            .expect("called poll after future resolved")
130            .poll_take());
131        Ok(Async::Ready((self.0.take().unwrap(), t)))
132    }
133}
134
135struct PutFuture<T>(Option<Aption<T>>, Option<T>);
136impl<T> Future for PutFuture<T> {
137    type Item = Aption<T>;
138    type Error = T;
139
140    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
141        let t = self.1.take().expect("called poll after future resolved");
142        match self
143            .0
144            .as_mut()
145            .expect("called poll after future resolved")
146            .poll_put(t)
147        {
148            Ok(AsyncSink::Ready) => Ok(Async::Ready(self.0.take().unwrap())),
149            Ok(AsyncSink::NotReady(t)) => {
150                self.1 = Some(t);
151                Ok(Async::NotReady)
152            }
153            Err(t) => Err(t),
154        }
155    }
156}
157
158impl<T> Aption<T> {
159    /// Returns a `Future` that resolves when a value is successfully taken from the `Aption<T>`.
160    pub fn take(self) -> impl Future<Item = (Self, T), Error = Closed> {
161        TakeFuture(Some(self))
162    }
163
164    /// Returns a `Future` that resolves when the given value is successfully placed in the
165    /// `Aption<T>`.
166    pub fn put(self, t: T) -> impl Future<Item = Self, Error = T> {
167        PutFuture(Some(self), Some(t))
168    }
169}
170
171impl<T> Aption<T> {
172    /// Attempt to take the value contained in the `Aption`.
173    ///
174    /// Returns `NotReady` if no value is available, and schedules the current task to be woken up
175    /// when one might be.
176    ///
177    /// Returns an error if the `Aption` has been closed with `Aption::poll_close`.
178    pub fn poll_take(&mut self) -> Poll<T, Closed> {
179        try_ready!(self
180            .permit
181            .poll_acquire(&self.inner.semaphore)
182            .map_err(|_| unreachable!("semaphore dropped while we have an Arc to it")));
183
184        // we have the lock -- is there a value?
185        let value = unsafe { &mut *self.inner.value.get() };
186
187        let v = value.take();
188        if v.is_none() {
189            // no, sadly not...
190            // has it been closed altogether?
191            if let CellValue::Fin(None) = *value {
192                // it has! nothing more to do except release the permit
193                // don't even have to wake anyone up, since we didn't take anything
194                self.permit.release(&self.inner.semaphore);
195                return Err(Closed);
196            }
197
198            // we're going to have to wait for someone to put a value.
199            // we need to do this _before_ releasing the lock,
200            // otherwise we might miss a quick notify.
201            self.inner.take_task.register();
202        }
203
204        // give up the lock for someone to put
205        self.permit.release(&self.inner.semaphore);
206
207        if let Some(t) = v {
208            // let waiting putters know that they can now put
209            self.inner.put_task.notify();
210            Ok(Async::Ready(t))
211        } else {
212            Ok(Async::NotReady)
213        }
214    }
215
216    /// Attempt to put a value into the `Aption`.
217    ///
218    /// Returns `NotReady` if there's already a value there, and schedules the current task to be
219    /// woken up when the `Aption` is free again.
220    ///
221    /// Returns an error if the `Aption` has been closed with `Aption::poll_close`.
222    pub fn poll_put(&mut self, t: T) -> Result<AsyncSink<T>, T> {
223        match self.permit.poll_acquire(&self.inner.semaphore) {
224            Ok(Async::Ready(())) => {}
225            Ok(Async::NotReady) => {
226                return Ok(AsyncSink::NotReady(t));
227            }
228            Err(_) => {
229                unreachable!("semaphore dropped while we have an Arc to it");
230            }
231        }
232
233        // we have the lock!
234        let value = unsafe { &mut *self.inner.value.get() };
235
236        // has the channel already been closed?
237        if let CellValue::Fin(_) = *value {
238            // it has, so we're not going to get to send our value
239            // we do have to release the lock though
240            self.permit.release(&self.inner.semaphore);
241            return Err(t);
242        }
243
244        // is there already a value there?
245        if value.is_none() {
246            // no, we're home free!
247            *value = CellValue::Some(t);
248
249            // give up the lock so someone can take
250            self.permit.release(&self.inner.semaphore);
251
252            // and notify any waiting takers
253            self.inner.take_task.notify();
254
255            Ok(AsyncSink::Ready)
256        } else {
257            // yes, sadly, so we can't put...
258
259            // we're going to have to wait for someone to take the existing value.
260            // we need to do this _before_ releasing the lock,
261            // otherwise we might miss a quick notify.
262            self.inner.put_task.register();
263
264            // give up the lock so someone can take
265            self.permit.release(&self.inner.semaphore);
266
267            Ok(AsyncSink::NotReady(t))
268        }
269    }
270
271    /// Indicate the `Aption` as closed so that no future puts are permitted.
272    ///
273    /// Once this method succeeds, every subsequent call to `Aption::poll_put` will return an
274    /// error. If there is currently a value in the `Aption`, the next call to `Aption::poll_take`
275    /// will return that value. Any later calls to `Aption::poll_take` will return an error.
276    pub fn poll_close(&mut self) -> Poll<(), ()> {
277        try_ready!(self
278            .permit
279            .poll_acquire(&self.inner.semaphore)
280            .map_err(|_| unreachable!("semaphore dropped while we have an Arc to it")));
281
282        // we have the lock -- wrap whatever value is there in Fin
283        let value = unsafe { &mut *self.inner.value.get() };
284        let v = value.take();
285        *value = CellValue::Fin(v);
286
287        // if the value is None, we've closed successfully!
288        let ret = if let CellValue::Fin(None) = *value {
289            Async::Ready(())
290        } else {
291            // otherwise, we'll have to wait for someone to take
292            // and again, *before* we release the lock
293            self.inner.put_task.register();
294            Async::NotReady
295        };
296
297        // give up the lock so someone can take
298        self.permit.release(&self.inner.semaphore);
299
300        // and notify any waiting takers
301        self.inner.take_task.notify();
302
303        Ok(ret)
304    }
305}
306
307impl<T> Sink for Aption<T> {
308    type SinkItem = T;
309    type SinkError = T;
310
311    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
312        self.poll_put(item)
313    }
314
315    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
316        self.poll_close()
317            .map_err(|_| unreachable!("failed to close because already closed elsewhere"))
318    }
319}
320
321impl<T> Stream for Aption<T> {
322    type Item = T;
323    type Error = ();
324
325    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
326        match self.poll_take() {
327            Ok(Async::Ready(v)) => Ok(Async::Ready(Some(v))),
328            Ok(Async::NotReady) => Ok(Async::NotReady),
329            Err(Closed) => {
330                // error on take just means it's been closed
331                Ok(Async::Ready(None))
332            }
333        }
334    }
335}
336
#[cfg(test)]
mod test {
    use super::*;
    use tokio_mock_task::MockTask;

    // Walks a single handle through put, take and close from within one mock task, checking after
    // each step whether that task was notified.
    #[test]
    fn basic() {
        let mut mt = MockTask::new();

        let mut a = new::<usize>();
        // nothing to take yet, so the task is registered as the waiting taker
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_put(42)), Ok(AsyncSink::Ready));
        assert!(mt.is_notified()); // taker is notified
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(42)));

        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_put(43)), Ok(AsyncSink::Ready));
        assert!(mt.is_notified()); // taker is notified
        // the slot is occupied, so a second put hands the value back
        assert_eq!(mt.enter(|| a.poll_put(44)), Ok(AsyncSink::NotReady(44)));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(43)));
        assert!(mt.is_notified()); // putter is notified
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_put(44)), Ok(AsyncSink::Ready));
        assert!(mt.is_notified());

        // close is not ready yet since there's still a value waiting to be taken
        assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::NotReady));
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(44)));
        assert!(mt.is_notified()); // closer is notified
        assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::Ready(())));
        assert!(!mt.is_notified());
        // closed and drained, so take now fails
        assert_eq!(mt.enter(|| a.poll_take()), Err(Closed));
    }

    // Uses one clone as a `Sink` (fed by forwarding an mpsc channel into it) and the original
    // handle as a `Stream`, and checks that every item arrives in order.
    #[test]
    fn sink_stream() {
        use tokio::prelude::*;

        let a = new::<usize>();
        let (mut tx, rx) = tokio_sync::mpsc::unbounded_channel();
        tokio::run(future::lazy(move || {
            tokio::spawn(
                rx.forward(a.clone().sink_map_err(|_| unreachable!()))
                    .map(|_| ())
                    .map_err(|_| unreachable!()),
            );

            // send a bunch of things, and make sure we get them all
            tx.try_send(1).unwrap();
            tx.try_send(2).unwrap();
            tx.try_send(3).unwrap();
            tx.try_send(4).unwrap();
            tx.try_send(5).unwrap();
            drop(tx);

            a.collect()
                .inspect(|v| {
                    assert_eq!(v, &[1, 2, 3, 4, 5]);
                })
                .map(|_| ())
        }));
    }

    // Chains the `put` and `take` futures, passing the handle from one to the next.
    #[test]
    fn futures() {
        use tokio::prelude::*;

        let a = new::<usize>();
        tokio::run(future::lazy(move || {
            a.put(42)
                .map_err(|_| unreachable!())
                .and_then(|a| a.take())
                .map_err(|_| unreachable!())
                .and_then(|(a, v)| {
                    assert_eq!(v, 42);
                    a.put(43)
                })
                .map_err(|_| unreachable!())
                .and_then(|a| a.take())
                .map_err(|_| unreachable!())
                .inspect(|(_, v)| {
                    assert_eq!(*v, 43);
                })
                .map(|_| ())
        }));
    }

    // A taker waiting on an empty `Aption` is notified when the `Aption` is closed, and then
    // observes `Closed`.
    #[test]
    fn notified_on_empty_drop() {
        let mut mt = MockTask::new();

        let mut a = new::<usize>();
        assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
        assert!(!mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::Ready(())));
        assert!(mt.is_notified());
        assert_eq!(mt.enter(|| a.poll_take()), Err(Closed));
    }
}