tokio_channel/
oneshot.rs

1//! A one-shot, futures-aware channel
2
3use lock::Lock;
4
5use futures::{Future, Poll, Async};
6use futures::task::{self, Task};
7
8use std::sync::Arc;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering::SeqCst;
11use std::error::Error;
12use std::fmt;
13
14/// A future representing the completion of a computation happening elsewhere in
15/// memory.
16///
17/// This is created by the `oneshot::channel` function.
18#[must_use = "futures do nothing unless polled"]
19#[derive(Debug)]
20pub struct Receiver<T> {
21    inner: Arc<Inner<T>>,
22}
23
24/// Represents the completion half of a oneshot through which the result of a
25/// computation is signaled.
26///
27/// This is created by the `oneshot::channel` function.
28#[derive(Debug)]
29pub struct Sender<T> {
30    inner: Arc<Inner<T>>,
31}
32
33/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
34/// the internal synchronization between the two for send/recv operations.
35#[derive(Debug)]
36struct Inner<T> {
37    /// Indicates whether this oneshot is complete yet. This is filled in both
38    /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
39    /// appropriately.
40    ///
41    /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
42    /// unlocked and ready to be inspected.
43    ///
44    /// For `Sender` if this is `true` then the oneshot has gone away and it
45    /// can return ready from `poll_cancel`.
46    complete: AtomicBool,
47
48    /// The actual data being transferred as part of this `Receiver`. This is
49    /// filled in by `Sender::complete` and read by `Receiver::poll`.
50    ///
51    /// Note that this is protected by `Lock`, but it is in theory safe to
52    /// replace with an `UnsafeCell` as it's actually protected by `complete`
53    /// above. I wouldn't recommend doing this, however, unless someone is
54    /// supremely confident in the various atomic orderings here and there.
55    data: Lock<Option<T>>,
56
57    /// Field to store the task which is blocked in `Receiver::poll`.
58    ///
59    /// This is filled in when a oneshot is polled but not ready yet. Note that
60    /// the `Lock` here, unlike in `data` above, is important to resolve races.
61    /// Both the `Receiver` and the `Sender` halves understand that if they
62    /// can't acquire the lock then some important interference is happening.
63    rx_task: Lock<Option<Task>>,
64
65    /// Like `rx_task` above, except for the task blocked in
66    /// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
67    tx_task: Lock<Option<Task>>,
68}
69
70/// Creates a new futures-aware, one-shot channel.
71///
72/// This function is similar to Rust's channels found in the standard library.
73/// Two halves are returned, the first of which is a `Sender` handle, used to
74/// signal the end of a computation and provide its value. The second half is a
75/// `Receiver` which implements the `Future` trait, resolving to the value that
76/// was given to the `Sender` handle.
77///
78/// Each half can be separately owned and sent across threads/tasks.
79///
80/// # Examples
81///
82/// ```
83/// extern crate tokio_channel;
84/// extern crate futures;
85///
86/// use tokio_channel::oneshot;
87/// use futures::*;
88/// use std::thread;
89///
90/// # fn main() {
91/// let (p, c) = oneshot::channel::<i32>();
92///
93/// thread::spawn(|| {
94///     c.map(|i| {
95///         println!("got: {}", i);
96///     }).wait();
97/// });
98///
99/// p.send(3).unwrap();
100/// # }
101/// ```
102pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
103    let inner = Arc::new(Inner::new());
104    let receiver = Receiver {
105        inner: inner.clone(),
106    };
107    let sender = Sender {
108        inner: inner,
109    };
110    (sender, receiver)
111}
112
113impl<T> Inner<T> {
114    fn new() -> Inner<T> {
115        Inner {
116            complete: AtomicBool::new(false),
117            data: Lock::new(None),
118            rx_task: Lock::new(None),
119            tx_task: Lock::new(None),
120        }
121    }
122
123    fn send(&self, t: T) -> Result<(), T> {
124        if self.complete.load(SeqCst) {
125            return Err(t)
126        }
127
128        // Note that this lock acquisition may fail if the receiver
129        // is closed and sets the `complete` flag to true, whereupon
130        // the receiver may call `poll()`.
131        if let Some(mut slot) = self.data.try_lock() {
132            assert!(slot.is_none());
133            *slot = Some(t);
134            drop(slot);
135
136            // If the receiver called `close()` between the check at the
137            // start of the function, and the lock being released, then
138            // the receiver may not be around to receive it, so try to
139            // pull it back out.
140            if self.complete.load(SeqCst) {
141                // If lock acquisition fails, then receiver is actually
142                // receiving it, so we're good.
143                if let Some(mut slot) = self.data.try_lock() {
144                    if let Some(t) = slot.take() {
145                        return Err(t);
146                    }
147                }
148            }
149            Ok(())
150        } else {
151            // Must have been closed
152            Err(t)
153        }
154    }
155
156    fn poll_cancel(&self) -> Poll<(), ()> {
157        // Fast path up first, just read the flag and see if our other half is
158        // gone. This flag is set both in our destructor and the oneshot
159        // destructor, but our destructor hasn't run yet so if it's set then the
160        // oneshot is gone.
161        if self.complete.load(SeqCst) {
162            return Ok(Async::Ready(()))
163        }
164
165        // If our other half is not gone then we need to park our current task
166        // and move it into the `notify_cancel` slot to get notified when it's
167        // actually gone.
168        //
169        // If `try_lock` fails, then the `Receiver` is in the process of using
170        // it, so we can deduce that it's now in the process of going away and
171        // hence we're canceled. If it succeeds then we just store our handle.
172        //
173        // Crucially we then check `oneshot_gone` *again* before we return.
174        // While we were storing our handle inside `notify_cancel` the `Receiver`
175        // may have been dropped. The first thing it does is set the flag, and
176        // if it fails to acquire the lock it assumes that we'll see the flag
177        // later on. So... we then try to see the flag later on!
178        let handle = task::current();
179        match self.tx_task.try_lock() {
180            Some(mut p) => *p = Some(handle),
181            None => return Ok(Async::Ready(())),
182        }
183        if self.complete.load(SeqCst) {
184            Ok(Async::Ready(()))
185        } else {
186            Ok(Async::NotReady)
187        }
188    }
189
190    fn is_canceled(&self) -> bool {
191        self.complete.load(SeqCst)
192    }
193
194    fn drop_tx(&self) {
195        // Flag that we're a completed `Sender` and try to wake up a receiver.
196        // Whether or not we actually stored any data will get picked up and
197        // translated to either an item or cancellation.
198        //
199        // Note that if we fail to acquire the `rx_task` lock then that means
200        // we're in one of two situations:
201        //
202        // 1. The receiver is trying to block in `poll`
203        // 2. The receiver is being dropped
204        //
205        // In the first case it'll check the `complete` flag after it's done
206        // blocking to see if it succeeded. In the latter case we don't need to
207        // wake up anyone anyway. So in both cases it's ok to ignore the `None`
208        // case of `try_lock` and bail out.
209        //
210        // The first case crucially depends on `Lock` using `SeqCst` ordering
211        // under the hood. If it instead used `Release` / `Acquire` ordering,
212        // then it would not necessarily synchronize with `inner.complete`
213        // and deadlock might be possible, as was observed in
214        // https://github.com/rust-lang-nursery/futures-rs/pull/219.
215        self.complete.store(true, SeqCst);
216        if let Some(mut slot) = self.rx_task.try_lock() {
217            if let Some(task) = slot.take() {
218                drop(slot);
219                task.notify();
220            }
221        }
222    }
223
224    fn close_rx(&self) {
225        // Flag our completion and then attempt to wake up the sender if it's
226        // blocked. See comments in `drop` below for more info
227        self.complete.store(true, SeqCst);
228        if let Some(mut handle) = self.tx_task.try_lock() {
229            if let Some(task) = handle.take() {
230                drop(handle);
231                task.notify()
232            }
233        }
234    }
235
236    fn recv(&self) -> Poll<T, Canceled> {
237        let mut done = false;
238
239        // Check to see if some data has arrived. If it hasn't then we need to
240        // block our task.
241        //
242        // Note that the acquisition of the `rx_task` lock might fail below, but
243        // the only situation where this can happen is during `Sender::drop`
244        // when we are indeed completed already. If that's happening then we
245        // know we're completed so keep going.
246        if self.complete.load(SeqCst) {
247            done = true;
248        } else {
249            let task = task::current();
250            match self.rx_task.try_lock() {
251                Some(mut slot) => *slot = Some(task),
252                None => done = true,
253            }
254        }
255
256        // If we're `done` via one of the paths above, then look at the data and
257        // figure out what the answer is. If, however, we stored `rx_task`
258        // successfully above we need to check again if we're completed in case
259        // a message was sent while `rx_task` was locked and couldn't notify us
260        // otherwise.
261        //
262        // If we're not done, and we're not complete, though, then we've
263        // successfully blocked our task and we return `NotReady`.
264        if done || self.complete.load(SeqCst) {
265            // If taking the lock fails, the sender will realise that the we're
266            // `done` when it checks the `complete` flag on the way out, and will
267            // treat the send as a failure.
268            if let Some(mut slot) = self.data.try_lock() {
269                if let Some(data) = slot.take() {
270                    return Ok(data.into());
271                }
272            }
273            Err(Canceled)
274        } else {
275            Ok(Async::NotReady)
276        }
277    }
278
279    fn drop_rx(&self) {
280        // Indicate to the `Sender` that we're done, so any future calls to
281        // `poll_cancel` are weeded out.
282        self.complete.store(true, SeqCst);
283
284        // If we've blocked a task then there's no need for it to stick around,
285        // so we need to drop it. If this lock acquisition fails, though, then
286        // it's just because our `Sender` is trying to take the task, so we
287        // let them take care of that.
288        if let Some(mut slot) = self.rx_task.try_lock() {
289            let task = slot.take();
290            drop(slot);
291            drop(task);
292        }
293
294        // Finally, if our `Sender` wants to get notified of us going away, it
295        // would have stored something in `tx_task`. Here we try to peel that
296        // out and unpark it.
297        //
298        // Note that the `try_lock` here may fail, but only if the `Sender` is
299        // in the process of filling in the task. If that happens then we
300        // already flagged `complete` and they'll pick that up above.
301        if let Some(mut handle) = self.tx_task.try_lock() {
302            if let Some(task) = handle.take() {
303                drop(handle);
304                task.notify()
305            }
306        }
307    }
308}
309
310impl<T> Sender<T> {
311    #[deprecated(note = "renamed to `send`", since = "0.1.11")]
312    #[doc(hidden)]
313    #[cfg(feature = "with-deprecated")]
314    pub fn complete(self, t: T) {
315        drop(self.send(t));
316    }
317
318    /// Completes this oneshot with a successful result.
319    ///
320    /// This function will consume `self` and indicate to the other end, the
321    /// `Receiver`, that the value provided is the result of the computation this
322    /// represents.
323    ///
324    /// If the value is successfully enqueued for the remote end to receive,
325    /// then `Ok(())` is returned. If the receiving end was deallocated before
326    /// this function was called, however, then `Err` is returned with the value
327    /// provided.
328    pub fn send(self, t: T) -> Result<(), T> {
329        self.inner.send(t)
330    }
331
332    /// Polls this `Sender` half to detect whether the `Receiver` this has
333    /// paired with has gone away.
334    ///
335    /// This function can be used to learn about when the `Receiver` (consumer)
336    /// half has gone away and nothing will be able to receive a message sent
337    /// from `send`.
338    ///
339    /// If `Ready` is returned then it means that the `Receiver` has disappeared
340    /// and the result this `Sender` would otherwise produce should no longer
341    /// be produced.
342    ///
343    /// If `NotReady` is returned then the `Receiver` is still alive and may be
344    /// able to receive a message if sent. The current task, however, is
345    /// scheduled to receive a notification if the corresponding `Receiver` goes
346    /// away.
347    ///
348    /// # Panics
349    ///
350    /// Like `Future::poll`, this function will panic if it's not called from
351    /// within the context of a task. In other words, this should only ever be
352    /// called from inside another future.
353    ///
354    /// If you're calling this function from a context that does not have a
355    /// task, then you can use the `is_canceled` API instead.
356    pub fn poll_cancel(&mut self) -> Poll<(), ()> {
357        self.inner.poll_cancel()
358    }
359
360    /// Tests to see whether this `Sender`'s corresponding `Receiver`
361    /// has gone away.
362    ///
363    /// This function can be used to learn about when the `Receiver` (consumer)
364    /// half has gone away and nothing will be able to receive a message sent
365    /// from `send`.
366    ///
367    /// Note that this function is intended to *not* be used in the context of a
368    /// future. If you're implementing a future you probably want to call the
369    /// `poll_cancel` function which will block the current task if the
370    /// cancellation hasn't happened yet. This can be useful when working on a
371    /// non-futures related thread, though, which would otherwise panic if
372    /// `poll_cancel` were called.
373    pub fn is_canceled(&self) -> bool {
374        self.inner.is_canceled()
375    }
376}
377
378impl<T> Drop for Sender<T> {
379    fn drop(&mut self) {
380        self.inner.drop_tx()
381    }
382}
383
/// Error produced by a `Receiver<T>` when its `Sender<T>` was dropped
/// without a value being delivered.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed message `oneshot canceled`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

impl Error for Canceled {
    /// Returns the same fixed message as the `Display` implementation.
    fn description(&self) -> &str {
        "oneshot canceled"
    }
}
400
401impl<T> Receiver<T> {
402    /// Gracefully close this receiver, preventing sending any future messages.
403    ///
404    /// Any `send` operation which happens after this method returns is
405    /// guaranteed to fail. Once this method is called the normal `poll` method
406    /// can be used to determine whether a message was actually sent or not. If
407    /// `Canceled` is returned from `poll` then no message was sent.
408    pub fn close(&mut self) {
409        self.inner.close_rx()
410    }
411}
412
413impl<T> Future for Receiver<T> {
414    type Item = T;
415    type Error = Canceled;
416
417    fn poll(&mut self) -> Poll<T, Canceled> {
418        self.inner.recv()
419    }
420}
421
422impl<T> Drop for Receiver<T> {
423    fn drop(&mut self) {
424        self.inner.drop_rx()
425    }
426}