#![deny(missing_docs)]
#![allow(clippy::needless_doctest_main)]
//! A scheduler for driving a large number of futures.
//!
//! Unicycle provides the [Unordered] type, which is a futures abstraction that
//! runs a set of futures which may complete in any order, similar to
//! [FuturesUnordered]. But we aim to provide a stronger guarantee of fairness
//! (see below), and better memory locality for the futures being polled.
//!
//! **Note:** This project is experimental. It involves some amount of unsafe
//! code and possibly unsound assumptions, which need to be either vetted or
//! removed before you should consider putting it in production.
//!
//! ## Features
//!
//! * `parking-lot` - To enable locking using the [parking_lot] crate (optional).
//! * `vec-safety` - Avoid relying on the assumption that `&mut Vec<T>` can be
//!   safely coerced to `&mut Vec<U>` if `T` and `U` have identical memory
//!   layouts (enabled by default, [issue #1]).
//!
//! [issue #1]: https://github.com/udoprog/unicycle/issues/1
//! [parking_lot]: https://crates.io/crates/parking_lot
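//!
//! Features are toggled in `Cargo.toml`. A minimal sketch (the wildcard
//! version requirement below is a placeholder, not a recommendation):
//!
//! ```toml
//! [dependencies]
//! unicycle = { version = "*", features = ["parking-lot"] }
//! ```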
//!
//! ## Examples
//!
//! ```rust
//! use tokio::{stream::StreamExt as _, time};
//! use std::time::Duration;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut futures = unicycle::Unordered::new();
//!
//!     futures.push(time::delay_for(Duration::from_secs(2)));
//!     futures.push(time::delay_for(Duration::from_secs(3)));
//!     futures.push(time::delay_for(Duration::from_secs(1)));
//!
//!     while let Some(_) = futures.next().await {
//!         println!("tick");
//!     }
//!
//!     println!("done!");
//! }
//! ```
//!
//! ## Fairness
//!
//! You can think of abstractions like Unicycle as schedulers. They are provided a
//! set of child tasks, and try to do their best to drive them to completion. In
//! this regard, it's interesting to talk about _fairness_ in how the tasks are
//! being driven.
//!
//! The current implementation of [FuturesUnordered] maintains a queue of tasks
//! interested in waking up. As a task is woken up, it's added to the head of this
//! queue to signal its interest. When [FuturesUnordered] is being polled, it
//! checks the head of this queue in a loop. As long as there is a task interested
//! in being woken up, this task will be polled.
//! A side effect of this process is that tasks which aggressively signal
//! interest in waking up will receive priority and be polled more frequently.
//! This can lead to instances where a small number of tasks cause the polling
//! loop of [FuturesUnordered] to [spin abnormally].
//! This issue was [reported by Jon Gjengset], and improved on by [limiting the
//! amount FuturesUnordered is allowed to spin].
//!
//! Unicycle addresses this by limiting how frequently a child task may be polled
//! per _polling cycle_.
//! This is done by tracking polling interest in two separate sets.
//! Once we are polled, we swap out the active set, then take the swapped-out
//! set and use it as the basis for what to poll, in order, while limiting
//! ourselves to polling each child task only _once_.
//! Additional wakeups are only registered in the swapped-in set, which will be
//! polled during the next cycle.
//!
//! This way we hope to achieve a higher degree of fairness, never favoring the
//! behavior of one particular task.
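//!
//! To make the double-buffering concrete, here is a minimal sketch of the
//! idea, using `HashSet` in place of the lock-free bit sets this crate
//! actually uses:
//!
//! ```rust
//! use std::collections::HashSet;
//! use std::mem;
//!
//! struct WakeInterest {
//!     // Tasks signal interest in being woken up here.
//!     active: HashSet<usize>,
//!     // Drained during the current polling cycle.
//!     alternate: HashSet<usize>,
//! }
//!
//! impl WakeInterest {
//!     fn poll_cycle(&mut self, mut poll_task: impl FnMut(usize)) {
//!         // Swap the sets; new wakeups now land in the (empty) active set.
//!         mem::swap(&mut self.active, &mut self.alternate);
//!
//!         // Each task flagged before the swap is polled exactly once,
//!         // no matter how many times it signalled interest.
//!         for index in self.alternate.drain() {
//!             poll_task(index);
//!         }
//!     }
//! }
//!
//! let mut wake = WakeInterest {
//!     active: HashSet::new(),
//!     alternate: HashSet::new(),
//! };
//!
//! wake.active.insert(42);
//! wake.active.insert(42); // duplicate interest collapses into a single poll
//! wake.poll_cycle(|index| assert_eq!(index, 42));
//! ```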
//!
//! For more details, see the _Architecture_ section below.
//!
//! [spin abnormally]: https://github.com/udoprog/unicycle/blob/master/tests/spinning_futures_unordered.rs
//! [limiting the amount FuturesUnordered is allowed to spin]: https://github.com/rust-lang/futures-rs/pull/2049
//! [reported by Jon Gjengset]: https://github.com/rust-lang/futures-rs/issues/2047
//!
//! ## Architecture
//!
//! The [Unordered] type stores all futures being polled in a [PinSlab]
//! (inspired by the [slab] crate).
//! A slab is capable of automatically reclaiming storage at low cost, and will
//! maintain decent memory locality.
//! A [PinSlab] is different from a [Slab] in how it allocates the memory regions it
//! uses to store objects.
//! While a regular [Slab] is simply backed by a vector which grows as needed,
//! this approach is not viable for pinning, since it would cause the stored
//! objects to move when the vector is reallocated.
//! Instead [PinSlab] maintains a growable collection of fixed-size memory regions,
//! allowing it to store and reference immovable objects through the [pin API].
//! Each future inserted into the slab is assigned an _index_, which we will be
//! using below.
//! We now call the inserted future a _task_, and you can think of this index as a
//! unique task identifier.
//!
//! [slab]: https://github.com/carllerche/slab
//! [pin API]: https://doc.rust-lang.org/std/pin/index.html
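//!
//! As a brief sketch, the [PinSlab] operations this module relies on look
//! like this:
//!
//! ```rust
//! use unicycle::PinSlab;
//!
//! let mut slab = PinSlab::new();
//!
//! // Each inserted value is assigned a unique index.
//! let index = slab.insert(42);
//!
//! // Stored values are accessed through the pin API, so even `!Unpin`
//! // futures can safely be stored and polled in place.
//! assert!(slab.get_pin_mut(index).is_some());
//!
//! // Removing a value frees its slot; `remove` reports whether a value was
//! // present at the index.
//! assert!(slab.remove(index));
//! ```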
//!
//! Next to the slab we maintain two [BitSet]s, one _active_ and one _alternate_.
//! When a task registers interest in waking up, the bit associated with its
//! index is set in the active set, and the latest waker passed into [Unordered]
//! is invoked to wake it up.
//! Once [Unordered] is polled, it atomically swaps the active and alternate
//! [BitSet]s, waits until it has exclusive access to the now _alternate_
//! [BitSet], and drains from it all the indexes that have been flagged in
//! order to determine which tasks to poll.
//! Each task is then polled _once_, in order.
//! If a task is [Ready], its result is yielded.
//! After we receive control again, we continue draining the alternate set in
//! this manner until it is empty.
//! When this is done, we yield once, then start the cycle over again.
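//!
//! Since yielding a result returns control to the caller mid-drain, the drain
//! position must be remembered and resumed on the next poll. As a rough,
//! self-contained illustration of that resumable-drain pattern (a plain slice
//! and index stand in for the crate's `Drain` and `DrainSnapshot` types):
//!
//! ```rust
//! // Illustrative stand-in: drain flagged indexes, yielding "completed"
//! // tasks one at a time and resuming from `snapshot` on the next call.
//! fn drain_resumable(flagged: &[usize], snapshot: &mut usize) -> Option<usize> {
//!     while *snapshot < flagged.len() {
//!         let index = flagged[*snapshot];
//!         *snapshot += 1;
//!
//!         // Pretend tasks at even indexes completed, and yield them.
//!         if index % 2 == 0 {
//!             return Some(index);
//!         }
//!     }
//!
//!     None
//! }
//!
//! let flagged = [0, 3, 4, 7];
//! let mut snapshot = 0;
//!
//! assert_eq!(drain_resumable(&flagged, &mut snapshot), Some(0));
//! assert_eq!(drain_resumable(&flagged, &mut snapshot), Some(4));
//! assert_eq!(drain_resumable(&flagged, &mut snapshot), None);
//! ```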
//!
//! [Ready]: https://doc.rust-lang.org/std/task/enum.Poll.html
//! [Slab]: https://docs.rs/slab/latest/slab/struct.Slab.html
//! [FuturesUnordered]: https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html

pub use self::bit_set::{AtomicBitSet, BitSet, Drain, DrainSnapshot, Iter};
pub use self::pin_slab::PinSlab;
use self::wake_set::{LocalWakeSet, SharedWakeSet, WakeSet};
use self::waker::SharedWaker;
use futures_core::Stream;
use std::{
    future::Future,
    iter, mem,
    pin::Pin,
    ptr,
    sync::Arc,
    task::{Context, Poll},
};

mod bit_set;
mod lock;
mod pin_slab;
mod wake_set;
mod waker;

// Unwraps `Poll::Ready(value)` from an expression, or returns early with
// `Poll::Pending` (mirrors the `ready!` macro from the futures crate).
macro_rules! ready {
    ($expr:expr) => {
        match $expr {
            Poll::Ready(value) => value,
            Poll::Pending => return Poll::Pending,
        }
    };
}

/// Data that is shared across all sub-tasks.
struct Shared {
    /// The currently registered parent waker.
    waker: SharedWaker,
    /// The currently registered wake set.
    wake_set: SharedWakeSet,
}

impl Shared {
    /// Construct new shared data.
    fn new() -> Self {
        Self {
            waker: SharedWaker::new(),
            wake_set: SharedWakeSet::new(),
        }
    }

    /// Swap the active wake set with the alternate one.
    fn swap_active<'a>(
        &self,
        cx: &mut Context<'_>,
        lock_wake_alternate: &mut bool,
        max_capacity: usize,
        alternate: &'a mut *mut WakeSet,
    ) -> Poll<&'a mut LocalWakeSet> {
        unsafe {
            if !*lock_wake_alternate {
                {
                    let wake_set = (**alternate).as_local_mut();

                    if wake_set.set.capacity() <= max_capacity {
                        wake_set.set.reserve(max_capacity);
                    }
                }

                // Unlock. At this position, if someone adds an element to the wake set they are
                // also bound to call wake, which will cause us to wake up.
                //
                // There is a race going on between locking and unlocking, and it's beneficial
                // for child tasks to observe the locked state of the wake set so they refetch
                // the other set instead of having to wait until another wakeup.
                (**alternate).unlock_exclusive();

                let next = mem::replace(alternate, ptr::null_mut());
                *alternate = self.wake_set.swap(next);

                *lock_wake_alternate = true;
            }

            // Make sure no one else is using the alternate wake set.
            //
            // Safety: We are the only one swapping alternate, so at
            // this point we know that we have access to the most recent
            // active set. We _must_ call lock_exclusive before we
            // can punt this into a mutable reference though, because at
            // this point inner futures will still have access to references
            // to it (under a lock!). We must wait for these to expire.
            if !(**alternate).try_lock_exclusive() {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            *lock_wake_alternate = false;

            // Safety: While this is live we must _not_ mess with
            // `alternate` in any way.
            Poll::Ready((**alternate).as_local_mut())
        }
    }
}

/// A container for an unordered collection of [Future]s.
///
/// # Examples
///
/// ```rust,no_run
/// use tokio::{stream::StreamExt as _, time};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
///     let mut futures = unicycle::Unordered::new();
///
///     futures.push(time::delay_for(Duration::from_secs(2)));
///     futures.push(time::delay_for(Duration::from_secs(3)));
///     futures.push(time::delay_for(Duration::from_secs(1)));
///
///     while let Some(_) = futures.next().await {
///         println!("tick");
///     }
///
///     println!("done!");
/// }
/// ```
pub struct Unordered<F> {
    /// Indexes that need to be polled after they have been added.
    pollable: Vec<usize>,
    /// Slab of futures being polled.
    /// They need to be pinned on the heap, since the slab might grow to
    /// accommodate more futures.
    slab: PinSlab<F>,
    /// The capacity needed to cover every index inserted into the slab so far
    /// (largest index + 1). Used to reserve space in the wake sets.
    max_capacity: usize,
    /// Shared parent waker.
    /// Includes the current wake target. Each time we poll, we swap back and
    /// forth between this and `alternate`.
    shared: Arc<Shared>,
    /// Alternate wake set, used for growing the existing set when futures are
    /// added. This is then swapped out with the active set to receive polls.
    alternate: *mut WakeSet,
    /// A stored drain location for the alternate set iterator.
    alternate_drain: Option<DrainSnapshot>,
    /// Flags our intent to lock `alternate` exclusively.
    /// We set this flag instead of blocking when locking `alternate` fails,
    /// because by that point we have already swapped the active wake set and
    /// must not swap again until the lock has been acquired.
    lock_wake_alternate: bool,
}

// Safety: the raw `alternate` pointer is uniquely owned by this collection,
// so sending or sharing `Unordered` is safe as long as the stored futures are.
unsafe impl<F> Send for Unordered<F> where F: Send {}
unsafe impl<F> Sync for Unordered<F> where F: Sync {}

impl<F> Unpin for Unordered<F> {}

impl<F> Unordered<F> {
    /// Construct a new, empty [Unordered].
    ///
    /// # Examples
    ///
    /// ```rust
    /// use unicycle::Unordered;
    ///
    /// let mut futures = Unordered::new();
    /// assert!(futures.is_empty());
    ///
    /// futures.push(async { 42 });
    /// ```
    pub fn new() -> Self {
        let alternate = WakeSet::locked();

        Self {
            pollable: Vec::new(),
            slab: PinSlab::new(),
            max_capacity: 0,
            alternate_drain: None,
            shared: Arc::new(Shared::new()),
            alternate: Box::into_raw(Box::new(alternate)),
            lock_wake_alternate: false,
        }
    }

    /// Test if the collection of futures is empty.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use unicycle::Unordered;
    ///
    /// let mut futures = Unordered::<tokio::time::Delay>::new();
    /// assert!(futures.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.slab.is_empty()
    }

    /// Add the given future to the [Unordered] stream.
    ///
    /// Newly added futures are guaranteed to be polled, but there is no
    /// guarantee in which order this will happen.
    ///
    /// Pushed tasks are pinned by the [Unordered] collection automatically.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use unicycle::Unordered;
    ///
    /// let mut futures = Unordered::new();
    /// assert!(futures.is_empty());
    /// futures.push(async { 42 });
    /// assert!(!futures.is_empty());
    /// ```
    pub fn push(&mut self, future: F) {
        let index = self.slab.insert(future);
        self.max_capacity = usize::max(self.max_capacity, index + 1);
        self.pollable.push(index);
    }
}

impl<F> Default for Unordered<F> {
    fn default() -> Self {
        Self::new()
    }
}

impl<F> Drop for Unordered<F> {
    fn drop(&mut self) {
        // We intend to drop both wake sets. Therefore we need exclusive access
        // to both wakers. Unfortunately that means that at this point, any
        // calls to wake will have to serialize behind the shared wake set
        // while the alternate set is being dropped.
        let _write = self.shared.wake_set.prevent_drop_write();

        // Safety: we uniquely own `alternate`, so we are responsible for
        // dropping it. This is asserted when we swap it out during a poll by
        // calling WakeSet::lock_exclusive. We are also the _only_ one swapping
        // `alternate`, so we know that can't happen here.
        unsafe {
            drop(Box::from_raw(self.alternate));
        }
    }
}

impl<F> Stream for Unordered<F>
where
    F: Future,
{
    type Item = F::Output;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Self {
            ref mut pollable,
            ref mut slab,
            ref shared,
            ref mut alternate,
            ref mut lock_wake_alternate,
            max_capacity,
            ref mut alternate_drain,
            ..
        } = *self.as_mut();

        if slab.is_empty() {
            // Nothing to poll, nothing to add. End the stream since we don't have work to do.
            return Poll::Ready(None);
        }

        loop {
            let (snapshot, wake_last, swapped) = if let Some(snapshot) = alternate_drain.take() {
                (
                    Some(snapshot),
                    unsafe { (**alternate).as_local_mut() },
                    false,
                )
            } else {
                // Note: We defer swapping the waker until we are here, since we
                // `wake_by_ref` when reading results, and if we don't have any
                // child tasks (slab is empty) no one would benefit from an
                // update anyway.
                if !shared.waker.swap(cx.waker()) {
                    return Poll::Pending;
                }

                let wake_last =
                    ready!(shared.swap_active(cx, lock_wake_alternate, max_capacity, alternate));
                (None, wake_last, true)
            };

            let mut it = match snapshot {
                Some(snapshot) => wake_last.set.drain_from(snapshot),
                None => wake_last.set.drain(),
            };

            let indexes = iter::from_fn(|| pollable.pop()).chain(&mut it);

            for index in indexes {
                // NB: Since we defer pollables a little, a future might
                // have been polled and subsequently removed from the slab.
                // So we don't treat this as an error here.
                // If on the other hand it was removed _and_ re-added, we have
                // a case of a spurious poll. Luckily, that doesn't bother a
                // future much.
                let fut = match slab.get_pin_mut(index) {
                    Some(fut) => fut,
                    None => continue,
                };

                // Construct a new lightweight waker only capable of waking by
                // reference, with referential access to `shared`.
                let result = self::waker::poll_with_ref(shared, index, move |cx| fut.poll(cx));

                if let Poll::Ready(result) = result {
                    let removed = slab.remove(index);
                    debug_assert!(removed);
                    cx.waker().wake_by_ref();
                    *alternate_drain = it.snapshot();
                    return Poll::Ready(Some(result));
                }
            }

            // We've polled a full swapped-out snapshot, so it's time to yield.
            // `!swapped` means we got here by resuming the drain of the
            // previously swapped-out bit set, in which case we continue with
            // another iteration that actually swaps out the active bit set.
            if swapped {
                break;
            }
        }

        // We have successfully polled the last snapshot.
        // Yield and make sure that we are polled again.
        if !slab.is_empty() {
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        Poll::Ready(None)
    }
}