futures_channel/
oneshot.rs

1//! A channel for sending a single message between asynchronous tasks.
2
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::Ordering::SeqCst;
6use std::error::Error;
7use std::fmt;
8
9use futures_core::{Future, Poll, Async};
10use futures_core::task::{self, Waker};
11use futures_core::never::Never;
12
13use lock::Lock;
14
/// A future for a value that will be provided by another asynchronous task.
///
/// This is created by the [`channel`](channel) function. Polling it yields
/// the value handed to the paired [`Sender`](Sender), or
/// [`Canceled`](Canceled) when the channel completes without a value.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Receiver<T> {
    // State shared with the paired `Sender`.
    inner: Arc<Inner<T>>,
}
23
/// A means of transmitting a single value to another task.
///
/// This is created by the [`channel`](channel) function. Dropping a `Sender`
/// marks the channel as complete and wakes a task parked in the paired
/// [`Receiver`](Receiver).
#[derive(Debug)]
pub struct Sender<T> {
    // State shared with the paired `Receiver`.
    inner: Arc<Inner<T>>,
}
31
/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
/// the internal synchronization between the two for send/recv operations.
#[derive(Debug)]
struct Inner<T> {
    /// Indicates whether this oneshot is complete yet. This is set by
    /// `Sender::drop`, by `Receiver::drop` and by `Receiver::close`, and both
    /// sides interpret it appropriately. It is never reset to `false`.
    ///
    /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
    /// unlocked and ready to be inspected.
    ///
    /// For `Sender` if this is `true` then the receiving side has closed or
    /// gone away and it can return ready from `poll_cancel`.
    complete: AtomicBool,

    /// The actual data being transferred as part of this `Receiver`. This is
    /// filled in by `Sender::send` and read by `Receiver::poll` and
    /// `Receiver::try_recv`.
    ///
    /// Note that this is protected by `Lock`, but it is in theory safe to
    /// replace with an `UnsafeCell` as it's actually protected by `complete`
    /// above. I wouldn't recommend doing this, however, unless someone is
    /// supremely confident in the various atomic orderings here and there.
    data: Lock<Option<T>>,

    /// Field to store the task which is blocked in `Receiver::poll`.
    ///
    /// This is filled in when a oneshot is polled but not ready yet. Note that
    /// the `Lock` here, unlike in `data` above, is important to resolve races.
    /// Both the `Receiver` and the `Sender` halves understand that if they
    /// can't acquire the lock then some important interference is happening.
    rx_task: Lock<Option<Waker>>,

    /// Like `rx_task` above, except for the task blocked in
    /// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
    tx_task: Lock<Option<Waker>>,
}
68
69/// Creates a new one-shot channel for sending values across asynchronous tasks.
70///
71/// This function is similar to Rust's channel constructor found in the standard library.
72/// Two halves are returned, the first of which is a `Sender` handle, used to
73/// signal the end of a computation and provide its value. The second half is a
74/// `Receiver` which implements the `Future` trait, resolving to the value that
75/// was given to the `Sender` handle.
76///
77/// Each half can be separately owned and sent across tasks.
78///
79/// # Examples
80///
81/// ```
82/// extern crate futures;
83/// extern crate futures_channel;
84///
85/// use std::thread;
86///
87/// use futures_channel::oneshot;
88/// use futures::*;
89///
90/// fn main() {
91///     let (p, c) = oneshot::channel::<i32>();
92///
93/// # let t =
94///     thread::spawn(|| {
95///         let future = c.map(|i| {
96///             println!("got: {}", i);
97///         });
98///         // ...
99/// # return future;
100///     });
101///
102///     p.send(3).unwrap();
103/// # t.join().unwrap();
104/// }
105/// ```
106pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
107    let inner = Arc::new(Inner::new());
108    let receiver = Receiver {
109        inner: inner.clone(),
110    };
111    let sender = Sender {
112        inner: inner,
113    };
114    (sender, receiver)
115}
116
117impl<T> Inner<T> {
118    fn new() -> Inner<T> {
119        Inner {
120            complete: AtomicBool::new(false),
121            data: Lock::new(None),
122            rx_task: Lock::new(None),
123            tx_task: Lock::new(None),
124        }
125    }
126
127    fn send(&self, t: T) -> Result<(), T> {
128        if self.complete.load(SeqCst) {
129            return Err(t)
130        }
131
132        // Note that this lock acquisition may fail if the receiver
133        // is closed and sets the `complete` flag to true, whereupon
134        // the receiver may call `poll()`.
135        if let Some(mut slot) = self.data.try_lock() {
136            assert!(slot.is_none());
137            *slot = Some(t);
138            drop(slot);
139
140            // If the receiver called `close()` between the check at the
141            // start of the function, and the lock being released, then
142            // the receiver may not be around to receive it, so try to
143            // pull it back out.
144            if self.complete.load(SeqCst) {
145                // If lock acquisition fails, then receiver is actually
146                // receiving it, so we're good.
147                if let Some(mut slot) = self.data.try_lock() {
148                    if let Some(t) = slot.take() {
149                        return Err(t);
150                    }
151                }
152            }
153            Ok(())
154        } else {
155            // Must have been closed
156            Err(t)
157        }
158    }
159
160    fn poll_cancel(&self, cx: &mut task::Context) -> Poll<(), Never> {
161        // Fast path up first, just read the flag and see if our other half is
162        // gone. This flag is set both in our destructor and the oneshot
163        // destructor, but our destructor hasn't run yet so if it's set then the
164        // oneshot is gone.
165        if self.complete.load(SeqCst) {
166            return Ok(Async::Ready(()))
167        }
168
169        // If our other half is not gone then we need to park our current task
170        // and move it into the `notify_cancel` slot to get notified when it's
171        // actually gone.
172        //
173        // If `try_lock` fails, then the `Receiver` is in the process of using
174        // it, so we can deduce that it's now in the process of going away and
175        // hence we're canceled. If it succeeds then we just store our handle.
176        //
177        // Crucially we then check `oneshot_gone` *again* before we return.
178        // While we were storing our handle inside `notify_cancel` the `Receiver`
179        // may have been dropped. The first thing it does is set the flag, and
180        // if it fails to acquire the lock it assumes that we'll see the flag
181        // later on. So... we then try to see the flag later on!
182        let handle = cx.waker().clone();
183        match self.tx_task.try_lock() {
184            Some(mut p) => *p = Some(handle),
185            None => return Ok(Async::Ready(())),
186        }
187        if self.complete.load(SeqCst) {
188            Ok(Async::Ready(()))
189        } else {
190            Ok(Async::Pending)
191        }
192    }
193
    /// Reports the current value of the `complete` flag without parking any
    /// task. The answer may be stale as soon as it is returned, since the
    /// other half can set the flag concurrently.
    fn is_canceled(&self) -> bool {
        self.complete.load(SeqCst)
    }
197
198    fn drop_tx(&self) {
199        // Flag that we're a completed `Sender` and try to wake up a receiver.
200        // Whether or not we actually stored any data will get picked up and
201        // translated to either an item or cancellation.
202        //
203        // Note that if we fail to acquire the `rx_task` lock then that means
204        // we're in one of two situations:
205        //
206        // 1. The receiver is trying to block in `poll`
207        // 2. The receiver is being dropped
208        //
209        // In the first case it'll check the `complete` flag after it's done
210        // blocking to see if it succeeded. In the latter case we don't need to
211        // wake up anyone anyway. So in both cases it's ok to ignore the `None`
212        // case of `try_lock` and bail out.
213        //
214        // The first case crucially depends on `Lock` using `SeqCst` ordering
215        // under the hood. If it instead used `Release` / `Acquire` ordering,
216        // then it would not necessarily synchronize with `inner.complete`
217        // and deadlock might be possible, as was observed in
218        // https://github.com/rust-lang-nursery/futures-rs/pull/219.
219        self.complete.store(true, SeqCst);
220        if let Some(mut slot) = self.rx_task.try_lock() {
221            if let Some(task) = slot.take() {
222                drop(slot);
223                task.wake();
224            }
225        }
226    }
227
228    fn close_rx(&self) {
229        // Flag our completion and then attempt to wake up the sender if it's
230        // blocked. See comments in `drop` below for more info
231        self.complete.store(true, SeqCst);
232        if let Some(mut handle) = self.tx_task.try_lock() {
233            if let Some(task) = handle.take() {
234                drop(handle);
235                task.wake()
236            }
237        }
238    }
239
240    fn try_recv(&self) -> Result<Option<T>, Canceled> {
241        // If we're complete, either `::close_rx` or `::drop_tx` was called.
242        // We can assume a successful send if data is present.
243        if self.complete.load(SeqCst) {
244            if let Some(mut slot) = self.data.try_lock() {
245                if let Some(data) = slot.take() {
246                    return Ok(Some(data.into()));
247                }
248            }
249            Err(Canceled)
250        } else {
251            Ok(None)
252        }
253    }
254
255    fn recv(&self, cx: &mut task::Context) -> Poll<T, Canceled> {
256        let mut done = false;
257
258        // Check to see if some data has arrived. If it hasn't then we need to
259        // block our task.
260        //
261        // Note that the acquisition of the `rx_task` lock might fail below, but
262        // the only situation where this can happen is during `Sender::drop`
263        // when we are indeed completed already. If that's happening then we
264        // know we're completed so keep going.
265        if self.complete.load(SeqCst) {
266            done = true;
267        } else {
268            let task = cx.waker().clone();
269            match self.rx_task.try_lock() {
270                Some(mut slot) => *slot = Some(task),
271                None => done = true,
272            }
273        }
274
275        // If we're `done` via one of the paths above, then look at the data and
276        // figure out what the answer is. If, however, we stored `rx_task`
277        // successfully above we need to check again if we're completed in case
278        // a message was sent while `rx_task` was locked and couldn't notify us
279        // otherwise.
280        //
281        // If we're not done, and we're not complete, though, then we've
282        // successfully blocked our task and we return `Pending`.
283        if done || self.complete.load(SeqCst) {
284            // If taking the lock fails, the sender will realise that the we're
285            // `done` when it checks the `complete` flag on the way out, and will
286            // treat the send as a failure.
287            if let Some(mut slot) = self.data.try_lock() {
288                if let Some(data) = slot.take() {
289                    return Ok(data.into());
290                }
291            }
292            Err(Canceled)
293        } else {
294            Ok(Async::Pending)
295        }
296    }
297
298    fn drop_rx(&self) {
299        // Indicate to the `Sender` that we're done, so any future calls to
300        // `poll_cancel` are weeded out.
301        self.complete.store(true, SeqCst);
302
303        // If we've blocked a task then there's no need for it to stick around,
304        // so we need to drop it. If this lock acquisition fails, though, then
305        // it's just because our `Sender` is trying to take the task, so we
306        // let them take care of that.
307        if let Some(mut slot) = self.rx_task.try_lock() {
308            let task = slot.take();
309            drop(slot);
310            drop(task);
311        }
312
313        // Finally, if our `Sender` wants to get notified of us going away, it
314        // would have stored something in `tx_task`. Here we try to peel that
315        // out and unpark it.
316        //
317        // Note that the `try_lock` here may fail, but only if the `Sender` is
318        // in the process of filling in the task. If that happens then we
319        // already flagged `complete` and they'll pick that up above.
320        if let Some(mut handle) = self.tx_task.try_lock() {
321            if let Some(task) = handle.take() {
322                drop(handle);
323                task.wake()
324            }
325        }
326    }
327}
328
impl<T> Sender<T> {
    /// Completes this oneshot with a successful result.
    ///
    /// This function will consume `self` and indicate to the other end, the
    /// [`Receiver`](Receiver), that the value provided is the result of the
    /// computation this represents.
    ///
    /// If the value is successfully enqueued for the remote end to receive,
    /// then `Ok(())` is returned. If the receiving end was dropped or closed
    /// before the value could be handed over, however, then `Err` is returned
    /// with the value provided.
    ///
    /// Because `self` is consumed, the `Sender` is dropped when this function
    /// returns; that drop is what wakes a receiver blocked in `poll`.
    pub fn send(self, t: T) -> Result<(), T> {
        self.inner.send(t)
    }

    /// Polls this `Sender` half to detect whether its associated
    /// [`Receiver`](Receiver) has been dropped.
    ///
    /// # Return values
    ///
    /// If `Ok(Ready)` is returned then the associated `Receiver` has been
    /// dropped or closed, which means any work required for sending should be
    /// canceled.
    ///
    /// If `Ok(Pending)` is returned then the associated `Receiver` is still
    /// alive and may be able to receive a message if sent. The current task,
    /// however, is scheduled to receive a notification if the corresponding
    /// `Receiver` goes away.
    pub fn poll_cancel(&mut self, cx: &mut task::Context) -> Poll<(), Never> {
        self.inner.poll_cancel(cx)
    }

    /// Tests to see whether this `Sender`'s corresponding `Receiver`
    /// has been dropped or closed.
    ///
    /// Unlike [`poll_cancel`](Sender::poll_cancel), this function does not
    /// enqueue a task for wakeup upon cancellation, but merely reports the
    /// current state, which may be subject to concurrent modification.
    pub fn is_canceled(&self) -> bool {
        self.inner.is_canceled()
    }
}
370
/// Dropping the `Sender` completes the channel and wakes a receiver that is
/// blocked waiting for a value.
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // This also runs after `send`, which takes the `Sender` by value.
        self.inner.drop_tx()
    }
}
376
/// The error a [`Receiver`](Receiver) resolves to when its
/// [`Sender`](Sender) went away without delivering a value.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed, human-readable message for this error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

impl Error for Canceled {
    /// Same text as the `Display` output.
    fn description(&self) -> &str {
        "oneshot canceled"
    }
}
393
impl<T> Receiver<T> {
    /// Gracefully close this receiver, preventing any subsequent attempts to
    /// send to it.
    ///
    /// Any `send` operation which happens after this method returns is
    /// guaranteed to fail. After calling this method, you can use
    /// [`Receiver::poll`](Receiver::poll) to determine whether a message had
    /// previously been sent.
    ///
    /// A sender task parked in `poll_cancel` is woken by this call.
    pub fn close(&mut self) {
        self.inner.close_rx()
    }

    /// Attempts to receive a message outside of the context of a task.
    ///
    /// Useful when a [`Context`](Context) is not available such as within a
    /// `Drop` impl.
    ///
    /// Does not schedule a task wakeup or have any other side effects.
    ///
    /// A return value of `None` must be considered immediately stale (out of
    /// date) unless [`::close`](Receiver::close) has been called first.
    ///
    /// Returns `Err(Canceled)` once the channel is complete and no message
    /// can be taken: the sender was dropped without sending, this receiver
    /// was closed before a message arrived, or the message was already
    /// received.
    pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
        self.inner.try_recv()
    }
}
421
/// A `Receiver` resolves to the sent value, or to `Canceled` when the channel
/// completes without one.
impl<T> Future for Receiver<T> {
    type Item = T;
    type Error = Canceled;

    /// Returns the value if it is available; otherwise parks the current task
    /// so that the sender can wake it, and returns `Pending`.
    fn poll(&mut self, cx: &mut task::Context) -> Poll<T, Canceled> {
        self.inner.recv(cx)
    }
}
430
/// Dropping the `Receiver` completes the channel, so that later sends fail,
/// and wakes a sender that is parked in `poll_cancel`.
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        self.inner.drop_rx()
    }
}