futures_channel/oneshot.rs
1//! A channel for sending a single message between asynchronous tasks.
2
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::Ordering::SeqCst;
6use std::error::Error;
7use std::fmt;
8
9use futures_core::{Future, Poll, Async};
10use futures_core::task::{self, Waker};
11use futures_core::never::Never;
12
13use lock::Lock;
14
15/// A future for a value that will be provided by another asynchronous task.
16///
17/// This is created by the [`channel`](channel) function.
18#[must_use = "futures do nothing unless polled"]
19#[derive(Debug)]
20pub struct Receiver<T> {
21 inner: Arc<Inner<T>>,
22}
23
24/// A means of transmitting a single value to another task.
25///
26/// This is created by the [`channel`](channel) function.
27#[derive(Debug)]
28pub struct Sender<T> {
29 inner: Arc<Inner<T>>,
30}
31
32/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
33/// the internal synchronization between the two for send/recv operations.
34#[derive(Debug)]
35struct Inner<T> {
36 /// Indicates whether this oneshot is complete yet. This is filled in both
37 /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
38 /// appropriately.
39 ///
40 /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
41 /// unlocked and ready to be inspected.
42 ///
43 /// For `Sender` if this is `true` then the oneshot has gone away and it
44 /// can return ready from `poll_cancel`.
45 complete: AtomicBool,
46
47 /// The actual data being transferred as part of this `Receiver`. This is
48 /// filled in by `Sender::complete` and read by `Receiver::poll`.
49 ///
50 /// Note that this is protected by `Lock`, but it is in theory safe to
51 /// replace with an `UnsafeCell` as it's actually protected by `complete`
52 /// above. I wouldn't recommend doing this, however, unless someone is
53 /// supremely confident in the various atomic orderings here and there.
54 data: Lock<Option<T>>,
55
56 /// Field to store the task which is blocked in `Receiver::poll`.
57 ///
58 /// This is filled in when a oneshot is polled but not ready yet. Note that
59 /// the `Lock` here, unlike in `data` above, is important to resolve races.
60 /// Both the `Receiver` and the `Sender` halves understand that if they
61 /// can't acquire the lock then some important interference is happening.
62 rx_task: Lock<Option<Waker>>,
63
64 /// Like `rx_task` above, except for the task blocked in
65 /// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
66 tx_task: Lock<Option<Waker>>,
67}
68
69/// Creates a new one-shot channel for sending values across asynchronous tasks.
70///
71/// This function is similar to Rust's channel constructor found in the standard library.
72/// Two halves are returned, the first of which is a `Sender` handle, used to
73/// signal the end of a computation and provide its value. The second half is a
74/// `Receiver` which implements the `Future` trait, resolving to the value that
75/// was given to the `Sender` handle.
76///
77/// Each half can be separately owned and sent across tasks.
78///
79/// # Examples
80///
81/// ```
82/// extern crate futures;
83/// extern crate futures_channel;
84///
85/// use std::thread;
86///
87/// use futures_channel::oneshot;
88/// use futures::*;
89///
90/// fn main() {
91/// let (p, c) = oneshot::channel::<i32>();
92///
93/// # let t =
94/// thread::spawn(|| {
95/// let future = c.map(|i| {
96/// println!("got: {}", i);
97/// });
98/// // ...
99/// # return future;
100/// });
101///
102/// p.send(3).unwrap();
103/// # t.join().unwrap();
104/// }
105/// ```
106pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
107 let inner = Arc::new(Inner::new());
108 let receiver = Receiver {
109 inner: inner.clone(),
110 };
111 let sender = Sender {
112 inner: inner,
113 };
114 (sender, receiver)
115}
116
117impl<T> Inner<T> {
118 fn new() -> Inner<T> {
119 Inner {
120 complete: AtomicBool::new(false),
121 data: Lock::new(None),
122 rx_task: Lock::new(None),
123 tx_task: Lock::new(None),
124 }
125 }
126
127 fn send(&self, t: T) -> Result<(), T> {
128 if self.complete.load(SeqCst) {
129 return Err(t)
130 }
131
132 // Note that this lock acquisition may fail if the receiver
133 // is closed and sets the `complete` flag to true, whereupon
134 // the receiver may call `poll()`.
135 if let Some(mut slot) = self.data.try_lock() {
136 assert!(slot.is_none());
137 *slot = Some(t);
138 drop(slot);
139
140 // If the receiver called `close()` between the check at the
141 // start of the function, and the lock being released, then
142 // the receiver may not be around to receive it, so try to
143 // pull it back out.
144 if self.complete.load(SeqCst) {
145 // If lock acquisition fails, then receiver is actually
146 // receiving it, so we're good.
147 if let Some(mut slot) = self.data.try_lock() {
148 if let Some(t) = slot.take() {
149 return Err(t);
150 }
151 }
152 }
153 Ok(())
154 } else {
155 // Must have been closed
156 Err(t)
157 }
158 }
159
160 fn poll_cancel(&self, cx: &mut task::Context) -> Poll<(), Never> {
161 // Fast path up first, just read the flag and see if our other half is
162 // gone. This flag is set both in our destructor and the oneshot
163 // destructor, but our destructor hasn't run yet so if it's set then the
164 // oneshot is gone.
165 if self.complete.load(SeqCst) {
166 return Ok(Async::Ready(()))
167 }
168
169 // If our other half is not gone then we need to park our current task
170 // and move it into the `notify_cancel` slot to get notified when it's
171 // actually gone.
172 //
173 // If `try_lock` fails, then the `Receiver` is in the process of using
174 // it, so we can deduce that it's now in the process of going away and
175 // hence we're canceled. If it succeeds then we just store our handle.
176 //
177 // Crucially we then check `oneshot_gone` *again* before we return.
178 // While we were storing our handle inside `notify_cancel` the `Receiver`
179 // may have been dropped. The first thing it does is set the flag, and
180 // if it fails to acquire the lock it assumes that we'll see the flag
181 // later on. So... we then try to see the flag later on!
182 let handle = cx.waker().clone();
183 match self.tx_task.try_lock() {
184 Some(mut p) => *p = Some(handle),
185 None => return Ok(Async::Ready(())),
186 }
187 if self.complete.load(SeqCst) {
188 Ok(Async::Ready(()))
189 } else {
190 Ok(Async::Pending)
191 }
192 }
193
194 fn is_canceled(&self) -> bool {
195 self.complete.load(SeqCst)
196 }
197
198 fn drop_tx(&self) {
199 // Flag that we're a completed `Sender` and try to wake up a receiver.
200 // Whether or not we actually stored any data will get picked up and
201 // translated to either an item or cancellation.
202 //
203 // Note that if we fail to acquire the `rx_task` lock then that means
204 // we're in one of two situations:
205 //
206 // 1. The receiver is trying to block in `poll`
207 // 2. The receiver is being dropped
208 //
209 // In the first case it'll check the `complete` flag after it's done
210 // blocking to see if it succeeded. In the latter case we don't need to
211 // wake up anyone anyway. So in both cases it's ok to ignore the `None`
212 // case of `try_lock` and bail out.
213 //
214 // The first case crucially depends on `Lock` using `SeqCst` ordering
215 // under the hood. If it instead used `Release` / `Acquire` ordering,
216 // then it would not necessarily synchronize with `inner.complete`
217 // and deadlock might be possible, as was observed in
218 // https://github.com/rust-lang-nursery/futures-rs/pull/219.
219 self.complete.store(true, SeqCst);
220 if let Some(mut slot) = self.rx_task.try_lock() {
221 if let Some(task) = slot.take() {
222 drop(slot);
223 task.wake();
224 }
225 }
226 }
227
228 fn close_rx(&self) {
229 // Flag our completion and then attempt to wake up the sender if it's
230 // blocked. See comments in `drop` below for more info
231 self.complete.store(true, SeqCst);
232 if let Some(mut handle) = self.tx_task.try_lock() {
233 if let Some(task) = handle.take() {
234 drop(handle);
235 task.wake()
236 }
237 }
238 }
239
240 fn try_recv(&self) -> Result<Option<T>, Canceled> {
241 // If we're complete, either `::close_rx` or `::drop_tx` was called.
242 // We can assume a successful send if data is present.
243 if self.complete.load(SeqCst) {
244 if let Some(mut slot) = self.data.try_lock() {
245 if let Some(data) = slot.take() {
246 return Ok(Some(data.into()));
247 }
248 }
249 Err(Canceled)
250 } else {
251 Ok(None)
252 }
253 }
254
255 fn recv(&self, cx: &mut task::Context) -> Poll<T, Canceled> {
256 let mut done = false;
257
258 // Check to see if some data has arrived. If it hasn't then we need to
259 // block our task.
260 //
261 // Note that the acquisition of the `rx_task` lock might fail below, but
262 // the only situation where this can happen is during `Sender::drop`
263 // when we are indeed completed already. If that's happening then we
264 // know we're completed so keep going.
265 if self.complete.load(SeqCst) {
266 done = true;
267 } else {
268 let task = cx.waker().clone();
269 match self.rx_task.try_lock() {
270 Some(mut slot) => *slot = Some(task),
271 None => done = true,
272 }
273 }
274
275 // If we're `done` via one of the paths above, then look at the data and
276 // figure out what the answer is. If, however, we stored `rx_task`
277 // successfully above we need to check again if we're completed in case
278 // a message was sent while `rx_task` was locked and couldn't notify us
279 // otherwise.
280 //
281 // If we're not done, and we're not complete, though, then we've
282 // successfully blocked our task and we return `Pending`.
283 if done || self.complete.load(SeqCst) {
284 // If taking the lock fails, the sender will realise that the we're
285 // `done` when it checks the `complete` flag on the way out, and will
286 // treat the send as a failure.
287 if let Some(mut slot) = self.data.try_lock() {
288 if let Some(data) = slot.take() {
289 return Ok(data.into());
290 }
291 }
292 Err(Canceled)
293 } else {
294 Ok(Async::Pending)
295 }
296 }
297
298 fn drop_rx(&self) {
299 // Indicate to the `Sender` that we're done, so any future calls to
300 // `poll_cancel` are weeded out.
301 self.complete.store(true, SeqCst);
302
303 // If we've blocked a task then there's no need for it to stick around,
304 // so we need to drop it. If this lock acquisition fails, though, then
305 // it's just because our `Sender` is trying to take the task, so we
306 // let them take care of that.
307 if let Some(mut slot) = self.rx_task.try_lock() {
308 let task = slot.take();
309 drop(slot);
310 drop(task);
311 }
312
313 // Finally, if our `Sender` wants to get notified of us going away, it
314 // would have stored something in `tx_task`. Here we try to peel that
315 // out and unpark it.
316 //
317 // Note that the `try_lock` here may fail, but only if the `Sender` is
318 // in the process of filling in the task. If that happens then we
319 // already flagged `complete` and they'll pick that up above.
320 if let Some(mut handle) = self.tx_task.try_lock() {
321 if let Some(task) = handle.take() {
322 drop(handle);
323 task.wake()
324 }
325 }
326 }
327}
328
329impl<T> Sender<T> {
330 /// Completes this oneshot with a successful result.
331 ///
332 /// This function will consume `self` and indicate to the other end, the
333 /// [`Receiver`](Receiver), that the value provided is the result of the
334 /// computation this represents.
335 ///
336 /// If the value is successfully enqueued for the remote end to receive,
337 /// then `Ok(())` is returned. If the receiving end was dropped before
338 /// this function was called, however, then `Err` is returned with the value
339 /// provided.
340 pub fn send(self, t: T) -> Result<(), T> {
341 self.inner.send(t)
342 }
343
344 /// Polls this `Sender` half to detect whether its associated
345 /// [`Receiver`](Receiver) with has been dropped.
346 ///
347 /// # Return values
348 ///
349 /// If `Ok(Ready)` is returned then the associated `Receiver` has been
350 /// dropped, which means any work required for sending should be canceled.
351 ///
352 /// If `Ok(Pending)` is returned then the associated `Receiver` is still
353 /// alive and may be able to receive a message if sent. The current task,
354 /// however, is scheduled to receive a notification if the corresponding
355 /// `Receiver` goes away.
356 pub fn poll_cancel(&mut self, cx: &mut task::Context) -> Poll<(), Never> {
357 self.inner.poll_cancel(cx)
358 }
359
360 /// Tests to see whether this `Sender`'s corresponding `Receiver`
361 /// has been dropped.
362 ///
363 /// Unlike [`poll_cancel`](Sender::poll_cancel), this function does not
364 /// enqueue a task for wakeup upon cancellation, but merely reports the
365 /// current state, which may be subject to concurrent modification.
366 pub fn is_canceled(&self) -> bool {
367 self.inner.is_canceled()
368 }
369}
370
371impl<T> Drop for Sender<T> {
372 fn drop(&mut self) {
373 self.inner.drop_tx()
374 }
375}
376
/// The error a [`Receiver`](Receiver) resolves to when its corresponding
/// [`Sender`](Sender) is dropped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed, human-readable cancellation message.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

impl Error for Canceled {
    /// Returns the same fixed message that `Display` prints.
    fn description(&self) -> &str {
        "oneshot canceled"
    }
}
393
394impl<T> Receiver<T> {
395 /// Gracefully close this receiver, preventing any subsequent attempts to
396 /// send to it.
397 ///
398 /// Any `send` operation which happens after this method returns is
399 /// guaranteed to fail. After calling this method, you can use
400 /// [`Receiver::poll`](Receiver::poll) to determine whether a message had
401 /// previously been sent.
402 pub fn close(&mut self) {
403 self.inner.close_rx()
404 }
405
406 /// Attempts to receive a message outside of the context of a task.
407 ///
408 /// Useful when a [`Context`](Context) is not available such as within a
409 /// `Drop` impl.
410 ///
411 /// Does not schedule a task wakeup or have any other side effects.
412 ///
413 /// A return value of `None` must be considered immediately stale (out of
414 /// date) unless [`::close`](Receiver::close) has been called first.
415 ///
416 /// Returns an error if the sender was dropped.
417 pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
418 self.inner.try_recv()
419 }
420}
421
422impl<T> Future for Receiver<T> {
423 type Item = T;
424 type Error = Canceled;
425
426 fn poll(&mut self, cx: &mut task::Context) -> Poll<T, Canceled> {
427 self.inner.recv(cx)
428 }
429}
430
431impl<T> Drop for Receiver<T> {
432 fn drop(&mut self) {
433 self.inner.drop_rx()
434 }
435}