1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
#![deny(missing_docs)] #![allow(clippy::needless_doctest_main)] //! A container for an unordered collection of [Future]s. //! This provides an experimental variant of `FuturesUnordered` aimed to be //! _fairer_. Easier to maintain, and store the futures being polled in a way which //! provides better memory locality. //! //! ## Architecture //! //! The `Unordered` type stores all futures being polled in a `PinSlab`. This [slab] //! maintains a growable collection of fixed-size memory regions, allowing it to //! store immovable objects. The primary feature of a slab is that it automatically //! reclaims memory at low cost. Each future inserted into the slab is asigned an //! _index_. //! //! Next to the futures we maintain two bitsets, one _active_ and one //! _alternate_. When a future is woken up, the bit associated with its index is //! set in the _active_ set, and the waker associated with the poll to `Unordered` //! is called. //! //! Once `Unordered` is polled, it atomically swaps the _active_ and _alternate_ //! bitsets, waits until it has exclusive access to the now _alternate_ bitset, and //! drains it from all the indexes which have been flagged to determine which //! futures to poll. //! //! We can also add futures to `Unordered`, this is achieved by inserting it into //! the slab, then marking that index in a special `pollable` collection that it //! should be polled the next time `Unordered` is. //! //! [slab]: https://github.com/carllerche/slab //! //! ## Examples //! //! ```rust,no_run //! use tokio::{stream::StreamExt as _, time}; //! use std::time::Duration; //! //! #[tokio::main] //! async fn main() { //! let mut futures = unicycle::Unordered::new(); //! //! futures.push(time::delay_for(Duration::from_secs(2))); //! futures.push(time::delay_for(Duration::from_secs(3))); //! futures.push(time::delay_for(Duration::from_secs(1))); //! //! while let Some(_) = futures.next().await { //! println!("tick"); //! } //! //! println!("done!"); //! } //! ``` #[doc(hidden)] pub use self::bit_set::{AtomicBitSet, BitSet}; #[doc(hidden)] pub use self::pin_slab::PinSlab; use self::wake_set::{SharedWakeSet, WakeSet}; use self::waker::SharedWaker; use futures_core::Stream; use std::{ collections::VecDeque, future::Future, iter, mem, pin::Pin, ptr, sync::Arc, task::{Context, Poll}, }; mod bit_set; mod pin_slab; mod wake_set; mod waker; /// Data that is shared across all sub-tasks. struct Shared { /// The currently registered parent waker. waker: SharedWaker, /// The currently registered wake set. wake_set: SharedWakeSet, } impl Shared { /// Construct new shared data. fn new() -> Self { Self { waker: SharedWaker::new(), wake_set: SharedWakeSet::new(), } } } /// A container for an unordered collection of [Future]s. pub struct Unordered<F> where F: Future, { // Indexes that needs to be polled after they have been added. pollable: Vec<usize>, // Slab of futures being polled. // They need to be pinned on the heap, since the slab might grow to // accomodate more futures. slab: PinSlab<F>, // The largest index inserted into the slab so far. max_capacity: usize, // Shared parent waker. // Includes the current wake target. Each time we poll, we swap back and // forth between this and `wake_alternate`. shared: Arc<Shared>, // Alternate wake set, used for growing the existing set when futures are // added. This is then swapped out with the active set to receive polls. wake_alternate: *mut WakeSet, // Pending outgoing results. Uses a queue to avoid interrupting polling. results: VecDeque<F::Output>, } unsafe impl<F> Send for Unordered<F> where F: Future {} unsafe impl<F> Sync for Unordered<F> where F: Future {} impl<F> Unpin for Unordered<F> where F: Future {} impl<F> Unordered<F> where F: Future, { /// Construct a new, empty [Unordered]. pub fn new() -> Self { let alternate = WakeSet::new(); alternate.lock_write(); Self { pollable: Vec::with_capacity(16), slab: PinSlab::new(), max_capacity: 0, shared: Arc::new(Shared::new()), wake_alternate: Box::into_raw(Box::new(alternate)), results: VecDeque::new(), } } /// Test if the collection of futures is empty. pub fn is_empty(&self) -> bool { self.slab.is_empty() } /// Add the given future to the [Unordered] stream. /// /// Newly added futures are guaranteed to be polled, but there is no /// guarantee in which order this will happen. pub fn push(&mut self, future: F) { let index = self.slab.insert(future); self.max_capacity = usize::max(self.max_capacity, index + 1); self.pollable.push(index); } } impl<F> Default for Unordered<F> where F: Future, { fn default() -> Self { Self::new() } } impl<F> Drop for Unordered<F> where F: Future, { fn drop(&mut self) { // Safety: we uniquely own `wake_alternate`, so we are responsible for // dropping it. This is asserted when we swap it out during a poll by // calling WakeSet::lock_write. We are also the _only_ one // swapping `wake_alternative`, so we know that can't happen here. unsafe { WakeSet::drop_raw(self.wake_alternate); } } } impl<F> Stream for Unordered<F> where F: Future, { type Item = F::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let Self { ref mut pollable, ref mut results, ref mut slab, ref shared, ref mut wake_alternate, max_capacity, .. } = *self.as_mut(); // Return pending result. if let Some(value) = results.pop_front() { cx.waker().wake_by_ref(); return Poll::Ready(Some(value)); } if slab.is_empty() { // Nothing to poll, nothing to add. End the stream since we don't have work to do. return Poll::Ready(None); } // Note: We defer swapping the waker until we are here since we `wake_by_ref` when // reading results, and if we don't have any child tasks (slab is empty) no one would // benefit from an update anyways. if !shared.waker.is_woken_by(cx.waker()) { shared.waker.swap(cx.waker().clone()); } let wake_last = unsafe { { let wake_set = (**wake_alternate).as_local_mut(); if wake_set.set.capacity() <= max_capacity { wake_set.set.reserve(max_capacity); } } // Unlock. At this position, if someone adds an element to the wake set they are // also bound to call wake, which will cause us to wake up. // // There is a race going on between locking and unlocking, and it's beneficial // for child tasks to observe the locked state of the wake set so they refetch // the other set instead of having to wait until another wakeup. (**wake_alternate).unlock_write(); let next = mem::replace(wake_alternate, ptr::null_mut()); *wake_alternate = shared.wake_set.swap(next); // Make sure no one else is using the alternate wake. // // Safety: We are the only one swapping wake_alternate, so at // this point we know that we have access to the most recent // active set. We _must_ call lock_write before we // can punt this into a mutable reference though, because at // this point inner futures will still have access to references // to it (under a lock!). We must wait for these to expire. (**wake_alternate).lock_write(); // Safety: While this is live we must _not_ mess with // `wake_alternate` in any way. (**wake_alternate).as_local_mut() }; let indexes = iter::from_fn(|| pollable.pop()).chain(wake_last.set.drain()); for index in indexes { // NB: Since we defer pollables a little, a future might // have been polled and subsequently removed from the slab. // So we don't treat this as an error here. // If on the other hand it was removed _and_ re-added, we have // a case of a spurious poll. Luckily, that doesn't bother a // future much. let fut = match slab.get_pin_mut(index) { Some(fut) => fut, None => continue, }; // Construct a new lightweight waker only capable of waking by // reference, with referential access to `shared`. let result = self::waker::poll_with_ref(shared, index, move |cx| fut.poll(cx)); if let Poll::Ready(result) = result { results.push_back(result); let removed = slab.remove(index); debug_assert!(removed); } } // Return produced result. if let Some(value) = results.pop_front() { cx.waker().wake_by_ref(); return Poll::Ready(Some(value)); } Poll::Pending } }