1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use {Future, Poll, Async}; use slot::{Slot, Token}; use lock::Lock; use task::{self, Task}; /// A future representing the completion of a computation happening elsewhere in /// memory. /// /// This is created by the `oneshot` function. #[must_use = "futures do nothing unless polled"] pub struct Oneshot<T> { inner: Arc<Inner<T>>, cancel_token: Option<Token>, } /// Represents the completion half of a oneshot through which the result of a /// computation is signaled. /// /// This is created by the `oneshot` function. pub struct Complete<T> { inner: Arc<Inner<T>>, completed: bool, } struct Inner<T> { slot: Slot<Option<T>>, oneshot_gone: AtomicBool, notify_cancel: Lock<Option<Task>>, } /// Creates a new in-memory oneshot used to represent completing a computation. /// /// A oneshot in this library is a concrete implementation of the `Future` trait /// used to complete a computation from one location with a future representing /// what to do in another. /// /// This function is similar to Rust's channels found in the standard library. /// Two halves are returned, the first of which is a `Oneshot` which implements /// the `Future` trait. The second half is a `Complete` handle which is used to /// signal the end of a computation. /// /// Each half can be separately owned and sent across threads. /// /// # Examples /// /// ``` /// use std::thread; /// use futures::*; /// /// let (c, p) = oneshot::<i32>(); /// /// thread::spawn(|| { /// p.map(|i| { /// println!("got: {}", i); /// }).wait(); /// }); /// /// c.complete(3); /// ``` pub fn oneshot<T>() -> (Complete<T>, Oneshot<T>) { let inner = Arc::new(Inner { slot: Slot::new(None), oneshot_gone: AtomicBool::new(false), notify_cancel: Lock::new(None), }); let oneshot = Oneshot { inner: inner.clone(), cancel_token: None, }; let complete = Complete { inner: inner, completed: false, }; (complete, oneshot) } impl<T> Complete<T> { /// Completes this oneshot with a successful result. /// /// This function will consume `self` and indicate to the other end, the /// `Oneshot`, that the error provided is the result of the computation this /// represents. pub fn complete(mut self, t: T) { self.completed = true; self.send(Some(t)) } /// Polls this `Complete` half to detect whether the `Oneshot` this has /// paired with has gone away. /// /// This function can be used to learn about when the `Oneshot` (consumer) /// half has gone away and nothing will be able to receive a message sent /// from `complete`. /// /// Like `Future::poll`, this function will panic if it's not called from /// within the context of a task. In otherwords, this should only ever be /// called from inside another future. /// /// If `Ready` is returned then it means that the `Oneshot` has disappeared /// and the result this `Complete` would otherwise produce should no longer /// be produced. /// /// If `NotReady` is returned then the `Oneshot` is still alive and may be /// able to receive a message if sent. The current task, however, is /// scheduled to receive a notification if the corresponding `Oneshot` goes /// away. pub fn poll_cancel(&mut self) -> Poll<(), ()> { // Fast path up first, just read the flag and see if our other half is // gone. if self.inner.oneshot_gone.load(Ordering::SeqCst) { return Ok(Async::Ready(())) } // If our other half is not gone then we need to park our current task // and move it into the `notify_cancel` slot to get notified when it's // actually gone. // // If `try_lock` fails, then the `Oneshot` is in the process of using // it, so we can deduce that it's now in the process of going away and // hence we're canceled. If it succeeds then we just store our handle. // // Crucially we then check `oneshot_gone` *again* before we return. // While we were storing our handle inside `notify_cancel` the `Oneshot` // may have been dropped. The first thing it does is set the flag, and // if it fails to acquire the lock it assumes that we'll see the flag // later on. So... we then try to see the flag later on! let handle = task::park(); match self.inner.notify_cancel.try_lock() { Some(mut p) => *p = Some(handle), None => return Ok(Async::Ready(())), } if self.inner.oneshot_gone.load(Ordering::SeqCst) { Ok(Async::Ready(())) } else { Ok(Async::NotReady) } } fn send(&mut self, t: Option<T>) { if let Err(e) = self.inner.slot.try_produce(t) { self.inner.slot.on_empty(Some(e.into_inner()), |slot, item| { slot.try_produce(item.unwrap()).ok() .expect("advertised as empty but wasn't"); }); } } } impl<T> Drop for Complete<T> { fn drop(&mut self) { if !self.completed { self.send(None); } } } /// Error returned from a `Oneshot<T>` whenever the correponding `Complete<T>` /// is dropped. #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct Canceled; impl<T> Future for Oneshot<T> { type Item = T; type Error = Canceled; fn poll(&mut self) -> Poll<T, Canceled> { if let Some(cancel_token) = self.cancel_token.take() { self.inner.slot.cancel(cancel_token); } match self.inner.slot.try_consume() { Ok(Some(e)) => Ok(Async::Ready(e)), Ok(None) => Err(Canceled), Err(_) => { let task = task::park(); self.cancel_token = Some(self.inner.slot.on_full(move |_| { task.unpark(); })); Ok(Async::NotReady) } } } } impl<T> Drop for Oneshot<T> { fn drop(&mut self) { // First up, if we squirreled away a task to get notified once the // oneshot was filled in, we cancel that notification. We'll never end // up actually receiving data (as we're being dropped) so no need to // hold onto the task. if let Some(cancel_token) = self.cancel_token.take() { self.inner.slot.cancel(cancel_token) } // Next up, inform the `Complete` half that we're going away. First up // we flag ourselves as gone, and next we'll attempt to wake up any // handle that was stored. // // If we fail to acquire the lock on the handle, that means that a // `Complete` is in the process of storing one, and it'll check // `oneshot_gone` on its way out to see our write here. self.inner.oneshot_gone.store(true, Ordering::SeqCst); if let Some(mut handle) = self.inner.notify_cancel.try_lock() { if let Some(task) = handle.take() { drop(handle); task.unpark() } } } }