tokio_channel/oneshot.rs
1//! A one-shot, futures-aware channel
2
3use lock::Lock;
4
5use futures::{Future, Poll, Async};
6use futures::task::{self, Task};
7
8use std::sync::Arc;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering::SeqCst;
11use std::error::Error;
12use std::fmt;
13
14/// A future representing the completion of a computation happening elsewhere in
15/// memory.
16///
17/// This is created by the `oneshot::channel` function.
18#[must_use = "futures do nothing unless polled"]
19#[derive(Debug)]
20pub struct Receiver<T> {
21 inner: Arc<Inner<T>>,
22}
23
24/// Represents the completion half of a oneshot through which the result of a
25/// computation is signaled.
26///
27/// This is created by the `oneshot::channel` function.
28#[derive(Debug)]
29pub struct Sender<T> {
30 inner: Arc<Inner<T>>,
31}
32
33/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
34/// the internal synchronization between the two for send/recv operations.
35#[derive(Debug)]
36struct Inner<T> {
37 /// Indicates whether this oneshot is complete yet. This is filled in both
38 /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
39 /// appropriately.
40 ///
41 /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
42 /// unlocked and ready to be inspected.
43 ///
44 /// For `Sender` if this is `true` then the oneshot has gone away and it
45 /// can return ready from `poll_cancel`.
46 complete: AtomicBool,
47
48 /// The actual data being transferred as part of this `Receiver`. This is
49 /// filled in by `Sender::complete` and read by `Receiver::poll`.
50 ///
51 /// Note that this is protected by `Lock`, but it is in theory safe to
52 /// replace with an `UnsafeCell` as it's actually protected by `complete`
53 /// above. I wouldn't recommend doing this, however, unless someone is
54 /// supremely confident in the various atomic orderings here and there.
55 data: Lock<Option<T>>,
56
57 /// Field to store the task which is blocked in `Receiver::poll`.
58 ///
59 /// This is filled in when a oneshot is polled but not ready yet. Note that
60 /// the `Lock` here, unlike in `data` above, is important to resolve races.
61 /// Both the `Receiver` and the `Sender` halves understand that if they
62 /// can't acquire the lock then some important interference is happening.
63 rx_task: Lock<Option<Task>>,
64
65 /// Like `rx_task` above, except for the task blocked in
66 /// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
67 tx_task: Lock<Option<Task>>,
68}
69
70/// Creates a new futures-aware, one-shot channel.
71///
72/// This function is similar to Rust's channels found in the standard library.
73/// Two halves are returned, the first of which is a `Sender` handle, used to
74/// signal the end of a computation and provide its value. The second half is a
75/// `Receiver` which implements the `Future` trait, resolving to the value that
76/// was given to the `Sender` handle.
77///
78/// Each half can be separately owned and sent across threads/tasks.
79///
80/// # Examples
81///
82/// ```
83/// extern crate tokio_channel;
84/// extern crate futures;
85///
86/// use tokio_channel::oneshot;
87/// use futures::*;
88/// use std::thread;
89///
90/// # fn main() {
91/// let (p, c) = oneshot::channel::<i32>();
92///
93/// thread::spawn(|| {
94/// c.map(|i| {
95/// println!("got: {}", i);
96/// }).wait();
97/// });
98///
99/// p.send(3).unwrap();
100/// # }
101/// ```
102pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
103 let inner = Arc::new(Inner::new());
104 let receiver = Receiver {
105 inner: inner.clone(),
106 };
107 let sender = Sender {
108 inner: inner,
109 };
110 (sender, receiver)
111}
112
113impl<T> Inner<T> {
114 fn new() -> Inner<T> {
115 Inner {
116 complete: AtomicBool::new(false),
117 data: Lock::new(None),
118 rx_task: Lock::new(None),
119 tx_task: Lock::new(None),
120 }
121 }
122
123 fn send(&self, t: T) -> Result<(), T> {
124 if self.complete.load(SeqCst) {
125 return Err(t)
126 }
127
128 // Note that this lock acquisition may fail if the receiver
129 // is closed and sets the `complete` flag to true, whereupon
130 // the receiver may call `poll()`.
131 if let Some(mut slot) = self.data.try_lock() {
132 assert!(slot.is_none());
133 *slot = Some(t);
134 drop(slot);
135
136 // If the receiver called `close()` between the check at the
137 // start of the function, and the lock being released, then
138 // the receiver may not be around to receive it, so try to
139 // pull it back out.
140 if self.complete.load(SeqCst) {
141 // If lock acquisition fails, then receiver is actually
142 // receiving it, so we're good.
143 if let Some(mut slot) = self.data.try_lock() {
144 if let Some(t) = slot.take() {
145 return Err(t);
146 }
147 }
148 }
149 Ok(())
150 } else {
151 // Must have been closed
152 Err(t)
153 }
154 }
155
156 fn poll_cancel(&self) -> Poll<(), ()> {
157 // Fast path up first, just read the flag and see if our other half is
158 // gone. This flag is set both in our destructor and the oneshot
159 // destructor, but our destructor hasn't run yet so if it's set then the
160 // oneshot is gone.
161 if self.complete.load(SeqCst) {
162 return Ok(Async::Ready(()))
163 }
164
165 // If our other half is not gone then we need to park our current task
166 // and move it into the `notify_cancel` slot to get notified when it's
167 // actually gone.
168 //
169 // If `try_lock` fails, then the `Receiver` is in the process of using
170 // it, so we can deduce that it's now in the process of going away and
171 // hence we're canceled. If it succeeds then we just store our handle.
172 //
173 // Crucially we then check `oneshot_gone` *again* before we return.
174 // While we were storing our handle inside `notify_cancel` the `Receiver`
175 // may have been dropped. The first thing it does is set the flag, and
176 // if it fails to acquire the lock it assumes that we'll see the flag
177 // later on. So... we then try to see the flag later on!
178 let handle = task::current();
179 match self.tx_task.try_lock() {
180 Some(mut p) => *p = Some(handle),
181 None => return Ok(Async::Ready(())),
182 }
183 if self.complete.load(SeqCst) {
184 Ok(Async::Ready(()))
185 } else {
186 Ok(Async::NotReady)
187 }
188 }
189
190 fn is_canceled(&self) -> bool {
191 self.complete.load(SeqCst)
192 }
193
194 fn drop_tx(&self) {
195 // Flag that we're a completed `Sender` and try to wake up a receiver.
196 // Whether or not we actually stored any data will get picked up and
197 // translated to either an item or cancellation.
198 //
199 // Note that if we fail to acquire the `rx_task` lock then that means
200 // we're in one of two situations:
201 //
202 // 1. The receiver is trying to block in `poll`
203 // 2. The receiver is being dropped
204 //
205 // In the first case it'll check the `complete` flag after it's done
206 // blocking to see if it succeeded. In the latter case we don't need to
207 // wake up anyone anyway. So in both cases it's ok to ignore the `None`
208 // case of `try_lock` and bail out.
209 //
210 // The first case crucially depends on `Lock` using `SeqCst` ordering
211 // under the hood. If it instead used `Release` / `Acquire` ordering,
212 // then it would not necessarily synchronize with `inner.complete`
213 // and deadlock might be possible, as was observed in
214 // https://github.com/rust-lang-nursery/futures-rs/pull/219.
215 self.complete.store(true, SeqCst);
216 if let Some(mut slot) = self.rx_task.try_lock() {
217 if let Some(task) = slot.take() {
218 drop(slot);
219 task.notify();
220 }
221 }
222 }
223
224 fn close_rx(&self) {
225 // Flag our completion and then attempt to wake up the sender if it's
226 // blocked. See comments in `drop` below for more info
227 self.complete.store(true, SeqCst);
228 if let Some(mut handle) = self.tx_task.try_lock() {
229 if let Some(task) = handle.take() {
230 drop(handle);
231 task.notify()
232 }
233 }
234 }
235
236 fn recv(&self) -> Poll<T, Canceled> {
237 let mut done = false;
238
239 // Check to see if some data has arrived. If it hasn't then we need to
240 // block our task.
241 //
242 // Note that the acquisition of the `rx_task` lock might fail below, but
243 // the only situation where this can happen is during `Sender::drop`
244 // when we are indeed completed already. If that's happening then we
245 // know we're completed so keep going.
246 if self.complete.load(SeqCst) {
247 done = true;
248 } else {
249 let task = task::current();
250 match self.rx_task.try_lock() {
251 Some(mut slot) => *slot = Some(task),
252 None => done = true,
253 }
254 }
255
256 // If we're `done` via one of the paths above, then look at the data and
257 // figure out what the answer is. If, however, we stored `rx_task`
258 // successfully above we need to check again if we're completed in case
259 // a message was sent while `rx_task` was locked and couldn't notify us
260 // otherwise.
261 //
262 // If we're not done, and we're not complete, though, then we've
263 // successfully blocked our task and we return `NotReady`.
264 if done || self.complete.load(SeqCst) {
265 // If taking the lock fails, the sender will realise that the we're
266 // `done` when it checks the `complete` flag on the way out, and will
267 // treat the send as a failure.
268 if let Some(mut slot) = self.data.try_lock() {
269 if let Some(data) = slot.take() {
270 return Ok(data.into());
271 }
272 }
273 Err(Canceled)
274 } else {
275 Ok(Async::NotReady)
276 }
277 }
278
279 fn drop_rx(&self) {
280 // Indicate to the `Sender` that we're done, so any future calls to
281 // `poll_cancel` are weeded out.
282 self.complete.store(true, SeqCst);
283
284 // If we've blocked a task then there's no need for it to stick around,
285 // so we need to drop it. If this lock acquisition fails, though, then
286 // it's just because our `Sender` is trying to take the task, so we
287 // let them take care of that.
288 if let Some(mut slot) = self.rx_task.try_lock() {
289 let task = slot.take();
290 drop(slot);
291 drop(task);
292 }
293
294 // Finally, if our `Sender` wants to get notified of us going away, it
295 // would have stored something in `tx_task`. Here we try to peel that
296 // out and unpark it.
297 //
298 // Note that the `try_lock` here may fail, but only if the `Sender` is
299 // in the process of filling in the task. If that happens then we
300 // already flagged `complete` and they'll pick that up above.
301 if let Some(mut handle) = self.tx_task.try_lock() {
302 if let Some(task) = handle.take() {
303 drop(handle);
304 task.notify()
305 }
306 }
307 }
308}
309
310impl<T> Sender<T> {
311 #[deprecated(note = "renamed to `send`", since = "0.1.11")]
312 #[doc(hidden)]
313 #[cfg(feature = "with-deprecated")]
314 pub fn complete(self, t: T) {
315 drop(self.send(t));
316 }
317
318 /// Completes this oneshot with a successful result.
319 ///
320 /// This function will consume `self` and indicate to the other end, the
321 /// `Receiver`, that the value provided is the result of the computation this
322 /// represents.
323 ///
324 /// If the value is successfully enqueued for the remote end to receive,
325 /// then `Ok(())` is returned. If the receiving end was deallocated before
326 /// this function was called, however, then `Err` is returned with the value
327 /// provided.
328 pub fn send(self, t: T) -> Result<(), T> {
329 self.inner.send(t)
330 }
331
332 /// Polls this `Sender` half to detect whether the `Receiver` this has
333 /// paired with has gone away.
334 ///
335 /// This function can be used to learn about when the `Receiver` (consumer)
336 /// half has gone away and nothing will be able to receive a message sent
337 /// from `send`.
338 ///
339 /// If `Ready` is returned then it means that the `Receiver` has disappeared
340 /// and the result this `Sender` would otherwise produce should no longer
341 /// be produced.
342 ///
343 /// If `NotReady` is returned then the `Receiver` is still alive and may be
344 /// able to receive a message if sent. The current task, however, is
345 /// scheduled to receive a notification if the corresponding `Receiver` goes
346 /// away.
347 ///
348 /// # Panics
349 ///
350 /// Like `Future::poll`, this function will panic if it's not called from
351 /// within the context of a task. In other words, this should only ever be
352 /// called from inside another future.
353 ///
354 /// If you're calling this function from a context that does not have a
355 /// task, then you can use the `is_canceled` API instead.
356 pub fn poll_cancel(&mut self) -> Poll<(), ()> {
357 self.inner.poll_cancel()
358 }
359
360 /// Tests to see whether this `Sender`'s corresponding `Receiver`
361 /// has gone away.
362 ///
363 /// This function can be used to learn about when the `Receiver` (consumer)
364 /// half has gone away and nothing will be able to receive a message sent
365 /// from `send`.
366 ///
367 /// Note that this function is intended to *not* be used in the context of a
368 /// future. If you're implementing a future you probably want to call the
369 /// `poll_cancel` function which will block the current task if the
370 /// cancellation hasn't happened yet. This can be useful when working on a
371 /// non-futures related thread, though, which would otherwise panic if
372 /// `poll_cancel` were called.
373 pub fn is_canceled(&self) -> bool {
374 self.inner.is_canceled()
375 }
376}
377
378impl<T> Drop for Sender<T> {
379 fn drop(&mut self) {
380 self.inner.drop_tx()
381 }
382}
383
/// Error returned from a `Receiver<T>` whenever the corresponding `Sender<T>`
/// is dropped.
///
/// The error carries no data; it formats as `"oneshot canceled"`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed message `"oneshot canceled"`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

impl Error for Canceled {
    /// Returns the same fixed message that `Display` produces.
    fn description(&self) -> &str {
        "oneshot canceled"
    }
}
400
401impl<T> Receiver<T> {
402 /// Gracefully close this receiver, preventing sending any future messages.
403 ///
404 /// Any `send` operation which happens after this method returns is
405 /// guaranteed to fail. Once this method is called the normal `poll` method
406 /// can be used to determine whether a message was actually sent or not. If
407 /// `Canceled` is returned from `poll` then no message was sent.
408 pub fn close(&mut self) {
409 self.inner.close_rx()
410 }
411}
412
413impl<T> Future for Receiver<T> {
414 type Item = T;
415 type Error = Canceled;
416
417 fn poll(&mut self) -> Poll<T, Canceled> {
418 self.inner.recv()
419 }
420}
421
422impl<T> Drop for Receiver<T> {
423 fn drop(&mut self) {
424 self.inner.drop_rx()
425 }
426}