slotpoller 0.2.1

Bounded, lock-free futures collection. Faster than FuturesUnordered and other crates.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
/*
 * Copyright © 2026 Anand Beh
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//!
//! This library implements a collection of pollable futures via a backing array. It has the
//! following properties:
//! 1. Futures can be stored on the stack.
//! 2. The number of futures is bounded.
//! 3. Futures are only polled when they generate wake-ups.
//!
//! The API reflects these properties. There are three main structs, [StackSlots], [HeapSlots],
//! and [SlotPoller].
//!
//! ### Stack slots
//!
//! Using [StackSlots] requires you to pin it:
//!
//! ```
//! use std::pin::pin;
//! use slotpoller::{SlotPoller, StackSlots};
//!
//! let stack_slots = pin!(StackSlots::<30, _>::default());
//! let mut slot_poller = SlotPoller::new(stack_slots);
//! # slot_poller.try_push(async {});
//! ```
//!
//! ### Heap slots
//!
//! [HeapSlots] lets you set the capacity at runtime. A capacity must be provided:
//!
//! ```
//! use std::num::NonZeroU16;
//! use slotpoller::{HeapSlots, SlotPoller};
//!
//! let capacity = NonZeroU16::new(50).unwrap();
//! let mut slot_poller = SlotPoller::new(HeapSlots::with_capacity(capacity));
//! # slot_poller.try_push(async {});
//! ```
//!
//! ### Basic Usage
//!
//! After construction, most methods can be found on the [SlotPoller] struct.
//!
//! ```
//! use std::io;
//! use std::pin::pin;
//! use slotpoller::{StackSlots, SlotPoller};
//!
//! async fn exec() -> Result<String, io::Error> {
//!   Ok(String::new())
//! }
//!
//! async fn exec_multiple() -> Result<(), io::Error> {
//!   let stack_slots = pin!(StackSlots::<20, _>::default());
//!   let mut slot_poller = SlotPoller::new(stack_slots);
//!
//!   for _ in 0..50 {
//!     let (res, vacancy) = slot_poller.next_vacancy().await;
//!     if let Some(Err(e)) = res {
//!       return Err(e);
//!     }
//!     vacancy.insert(exec());
//!   }
//!
//!   slot_poller.try_drain(|res| { res?; Ok(()) }).await
//! }
//! ```
//!
//! ### Allocation
//!
//! The allocation footprint of this crate is very minimal and stays constant over the lifetime of
//! the poller. Here are the allocating functions in this crate:
//!
//! * [HeapSlots::with_capacity] (2 allocations)
//! * [StackSlots::new] (1 allocation)
//!
//! By design, the Rust async model requires either heap allocation or static mutable memory in
//! order to implement wakers. Users who want an alloc-free solution (using static mutable memory)
//! should look at the `embassy-executor` crate.
//!
//! ### Panics
//!
//! Functions in this library should never generate a panic, with one exception: polling a future after it
//! has completed (see [Future::poll]).
//!

mod guard;
mod memory;
mod polling;
#[cfg(test)]
mod tests;

use crate::guard::{AtomicStateNodeIdx, OptSlotIdx, PollThread, SlotIdx, StateNodeIdx};
use crate::memory::MemoryBacking;
use crate::polling::{AtomicStatus, poll_queue_reinsert_head, pop_pollable};
use cache_padded::CachePadded;
use diatomic_waker::DiatomicWaker;
use std::alloc::Layout;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll};
use triomphe::{Arc, HeaderSlice};

///
/// An internal trait, used to identify different memory providers.
///
/// This is what allows the library to support both stack-based polling of a fixed size array,
/// and polling of a dynamically allocated array, at the same time.
///
#[allow(private_bounds)]
pub trait SlotMemory<F: Future>: memory::Memory<F> {}

///
/// Stack-friendly memory that can be used by the poller.
///
/// The generic parameter `N` determines the size of the array. Its value must **not** exceed
/// [u16::MAX], and the implementation checks this requirement. In the future, if the feature
/// `generic_const_exprs` is added to Rust, the type may change to `u16`.
///
/// Usage is simple. [SlotPoller] takes a pinned reference to this struct:
///
/// ```
/// use std::io;
/// use std::pin::pin;
/// use slotpoller::{StackSlots, SlotPoller};
///
/// async fn exec() -> Result<(), io::Error> {
///   Ok(())
/// }
///
/// async fn exec_multiple() {
///   let stack_slots = pin!(StackSlots::<20, _>::default());
///   let mut stack_poller = SlotPoller::new(stack_slots);
///
///   let (_, vacancy) = stack_poller.next_vacancy().await;
///   vacancy.insert(exec());
/// }
/// ```
///
pub struct StackSlots<const N: usize, F: Future> {
    activity: EngineActivity,
    slots: [Slot<F>; N],
    shared_storage: Arc<SharedStorage>,
}

///
/// An intermediate struct that erases the constant parameter on [StackSlots].
///
/// This generic erasure prevents excessive binary size due to runaway monomorphization.
///
// Not exposed: Does this have value to API users? Probably not
struct StackSlotsRef<'pin, F: Future> {
    activity: &'pin mut EngineActivity,
    slots: Pin<&'pin mut [Slot<F>]>,
    shared_storage: &'pin Arc<SharedStorage>,
}

impl<'pin, const N: usize, F: Future> From<Pin<&'pin mut StackSlots<N, F>>>
    for StackSlotsRef<'pin, F>
{
    fn from(memory: Pin<&'pin mut StackSlots<N, F>>) -> Self {
        unsafe {
            // SAFETY
            // Slots stays pinned - we project it, while other fields are Unpin
            let memory = memory.get_unchecked_mut();
            Self {
                activity: &mut memory.activity,
                slots: Pin::new_unchecked(&mut memory.slots),
                shared_storage: &memory.shared_storage,
            }
        }
    }
}

impl<'pin, const N: usize, F: Future> SlotMemory<F> for Pin<&'pin mut StackSlots<N, F>> {}

///
/// Allocates the space for the futures on the heap.
///
/// The advantage of this is that you can choose the buffer size at runtime.
/// This approach is very similar to other crates that implement collections of futures.
///
pub struct HeapSlots<F: Future> {
    activity: EngineActivity,
    // Don't use Box. We must free pointer manually, AFTER futures have been dropped
    slots_ptr: NonNull<[Slot<F>]>,
    shared_storage: Arc<SharedStorage>,
    _marker: PhantomData<[Slot<F>]>,
}

impl<F: Future> SlotMemory<F> for HeapSlots<F> {}

impl<F: Future> Unpin for HeapSlots<F> {}

#[derive(Debug)]
struct EngineActivity {
    /// A LIFO stack of empty slots.
    empty_head: OptSlotIdx,
    /// The number of slots with futures inside of them
    slots_active: u16,
    /// The head of the pollable queue is updated only by the polling thread
    poll_queue_head: StateNodeIdx,
    /// Fairness counter for the current polling loop
    poll_loop_idx: u16,
}

struct EngineFields<'m, F> {
    activity: &'m mut EngineActivity,
    slots: Pin<&'m mut [Slot<F>]>,
    shared_storage: &'m Arc<SharedStorage>,
}

/// The fixed length part of the shared storage
///
/// It is updated by everyone (poller and wakers), so it is wrapped in CachePadded for efficiency
#[derive(Debug)]
struct SharedStorageHeaderInner {
    /// The tail of the pollable linked queue
    poll_queue_tail: AtomicStateNodeIdx,
    /// Used to recover the full DST pointer
    nodes_len: u32,
    /// The waker of the poller's latest operations
    main_waker: DiatomicWaker,
}

#[derive(Debug)]
#[repr(transparent)]
struct SharedStorageHeader(CachePadded<SharedStorageHeaderInner>);

impl SharedStorageHeader {
    // HeaderSlice is repr(C), and creating a [T; 0] from a [T; N] is safe
    const BYTE_OFFSET: usize = Layout::new::<HeaderSlice<Self, [StateNode; 0]>>().size();
}

impl Deref for SharedStorageHeader {
    type Target = SharedStorageHeaderInner;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

type SharedStorage = HeaderSlice<SharedStorageHeader, [StateNode]>;

///
/// The main poller
///
/// It's called a [SlotPoller] because it can handle either stack or heap memory,
/// and because it polls multiple futures.
///
pub struct SlotPoller<F: Future, M: SlotMemory<F>> {
    token: PollThread,
    memory: M::Backing,
}

impl<F: Future, M: SlotMemory<F>> Debug for SlotPoller<F, M> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SlotPoller")
            .field("token", &self.token)
            .field("memory", &self.memory)
            .finish()
    }
}

impl<F: Future, M: SlotMemory<F>> SlotPoller<F, M> {
    ///
    /// Creates the poller.
    ///
    /// After calling this function, the memory is permanently tied to this poller.
    /// This poller can't be moved to other threads, either.
    ///
    pub fn new(memory: M) -> Self {
        Self {
            token: memory.poll_thread(),
            memory: memory.into_backing(),
        }
    }
}

/*

The lifecycle of a slot is managed through the struggle over its activity and contents.

The poll thread stores a &mut StackPoller and manages the on-stack slots and linked list of empty
slots. The waker can only access the shared heap storage, which is manipulated through atomic
operations. The pollable slot queue is shared between them using a control-exchanging algorithm.

A slot is empty if it has no future (status = Status::Uninit). Empty slots are tracked by a linked
list, in stack memory, that is updated by the polling thread. We call this the "empty slot stack"
since it exhibits LIFO behavior.

We can describe the lifecycle and its state transitions with this diagram:

     Status    | Empty/Pollable queue  |      Poll thread       |     Waker thread      |
 ~~~~~~~~~~~~~ | ~~~~~~~~~~~~~~~~~~~~~ | ~~~~~~~~~~~~~~~~~~~~~~ | ~~~~~~~~~~~~~~~~~~~~~ |
     Uninit    |  Queued as empty
       |               |
       |              \|/
       |              *** <--------------- Find a new slot
       |         Dequeued as empty              ||
      \|/                                       ||
      *** <------------------------------- Future inserted
      New                               (defaults to unpolled)
       |
       |
       |
       |
      \|/
      *** <------------------------------- Poll the future
    Waiting                                Create the waker ------\
       |               .                                           --\
       |               .                                              --\
       |               .                                                 |
       |               |                                                \|/
      \|/              |
      *** <------------|----------------------------------------- 1. CAS to Woken
     Woken             |
       |              \|/ <-------------------------------------- 2. If CAS succeeded
       |              ***                                            A. enqueue as pollable
       |         Enqueued as pollable                                B. call original waker
       |               |
       |              \|/
       |              *** <--------------- Withdrawn from queue
       |         Dequeued as pollable            ||
      \|/              |                         ||
      *** <------------|------------------ Poll the future again
     Waiting           |
    /       \          .                   If incomplete, recreate
   /         \         .                   waker and repeat. Else,
   |         |         .                   future is completed.
   |         |
 Pending   Ready
 (repeat)   ***  <------------------------- Future dropped
   |         |                                   ||
   .         |                                   ||
   .       Uninit      |                         ||
   .                  \|/                        ||
                      *** <---------------- Mark slot empty
                 Enqueued as empty
                 (diagram repeats)

 */

struct Slot<F> {
    /// Only initialized if state is Waiting or Woken
    /// If initialized, then the future has not panicked and has not yet completed
    future: MaybeUninit<F>,
    empty_link: OptSlotIdx,
    /// The index of the loop we were in, when polled. Used to ensure fairness
    last_poll_loop_idx: u16,
}

// Manual pin_project impl

impl<F> Slot<F> {
    fn project(self: Pin<&mut Self>) -> SlotProject<'_, F> {
        unsafe {
            // SAFETY
            // Future stays pinned
            let this = self.get_unchecked_mut();
            SlotProject {
                future: Pin::new_unchecked(&mut this.future),
                empty_link: &mut this.empty_link,
                last_poll_loop_idx: &mut this.last_poll_loop_idx,
            }
        }
    }
}

struct SlotProject<'s, F> {
    future: Pin<&'s mut MaybeUninit<F>>,
    empty_link: &'s mut OptSlotIdx,
    last_poll_loop_idx: &'s mut u16,
}

impl<F> Debug for Slot<F> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        struct Opaque;
        impl Debug for Opaque {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                f.write_str("opaque")
            }
        }
        f.debug_struct("Slot")
            .field("future", &Opaque)
            .field("empty_link", &self.empty_link)
            .field("last_poll_loop_idx", &self.last_poll_loop_idx)
            .finish()
    }
}

#[derive(Debug)]
struct StateNode {
    status: AtomicStatus,
    /// The next link in the queue. [StateNodeIdx::UNSET] for none.
    poll_queue_link: AtomicStateNodeIdx,
    /// The corresponding slot index.
    /// This value is recovered by the waker in order to reconstruct SharedStorage
    slot_idx: SlotIdx,
}

impl EngineActivity {
    fn empty_stack_pop<'s, F>(
        &mut self,
        slots: Pin<&'s mut [Slot<F>]>,
    ) -> Option<(SlotIdx, Pin<&'s mut Slot<F>>)> {
        let head = self.empty_head;
        if head.is_set() {
            let head = head.into_slot_idx();
            let head_slot = head.get_slot(slots);
            let new_head = head_slot.empty_link;
            self.empty_head = new_head;
            self.slots_active += 1;
            Some((head, head_slot))
        } else {
            None
        }
    }

    fn empty_stack_push<F>(&mut self, slot_idx: SlotIdx, slot: Pin<&mut Slot<F>>) {
        let prev_head = self.empty_head;
        *slot.project().empty_link = prev_head;
        self.empty_head = OptSlotIdx::from(slot_idx);
        self.slots_active -= 1;
    }

    #[inline]
    fn empty_stack_has_any(&self) -> bool {
        self.empty_head.is_set()
    }
}

/**

Dequeues pollable slots, polls them, and handles the result of that poll.

Includes cooperative yielding when a future is polled more than once, in the same loop.

 */
macro_rules! poll_loop {
    ($cx:ident, $token:ident, $activity:ident, $slots:ident, $shared_storage:ident, $slot:ident, $slot_idx:ident, $state_node:ident, $poll_res:ident, $if_none:expr, $on_poll:expr) => {{
        let poll_loop_idx = $activity.poll_loop_idx;
        $activity.poll_loop_idx = poll_loop_idx.wrapping_add(1);

        // Store the waker. If woken, we'll return back to this line
        unsafe {
            // SAFETY
            // register/unregister/wait_until can't be called concurrently
            // We're on the poll thread, and this is the only place we use this function
            $shared_storage.header.main_waker.register($cx.waker());
        }
        loop {
            let pop_pollable = pop_pollable($token, $activity, $shared_storage);
            let ($slot_idx, pollable) = match pop_pollable {
                // None found
                (OptSlotIdx::UNSET, _) => break $if_none,
                // Try again
                (_slot_idx, None) => continue,
                // Pollable slot
                (slot_idx, Some(pollable)) => (slot_idx.into_slot_idx(), pollable),
            };
            let mut $slot = $slot_idx.get_slot($slots.as_mut());
            let $state_node = pollable.state_node();

            if $slot.last_poll_loop_idx == poll_loop_idx {
                // Polled the same slot in this loop previously
                // To be fair (and maintain cooperative scheduling), yield once here
                // https://github.com/rust-lang/futures-rs/issues/2053

                // First, put the slot back on the queue
                unsafe {
                    // SAFETY
                    // 1. This node is not already enqueued
                    // 2. The state node and index are corresponding
                    // 3. It keeps the same status as it had previously (Init or Woken)
                    poll_queue_reinsert_head(
                        $token,
                        $activity,
                        $state_node,
                        StateNodeIdx::from($slot_idx),
                    );
                }
                // Now yield
                $cx.waker().wake_by_ref();
                break Poll::Pending;
            }
            // Ready to poll
            *$slot.as_mut().project().last_poll_loop_idx = poll_loop_idx;
            let $poll_res = pollable.call_poll($token, $slot.as_mut());
            $on_poll
        }
    }};
}

//
// Public API
//

impl<F, M> SlotPoller<F, M>
where
    F: Future,
    M: SlotMemory<F>,
{
    ///
    /// Tries to add another future to the stack. If there is no space, returns it back.
    ///
    pub fn try_push(&mut self, future: F) -> Result<(), TryPushErr<F>> {
        let EngineFields {
            activity,
            slots,
            shared_storage,
        } = self.memory.fields();
        if let Some((slot_idx, slot)) = activity.empty_stack_pop(slots) {
            let shared_storage = shared_storage as &SharedStorage;
            let state_node = slot_idx.get_state_node(shared_storage);
            unsafe {
                // SAFETY
                // slot is empty, and slot/slot_idx/state_node correspond
                slot.init_future(self.token, slot_idx, state_node, shared_storage, future);
            }
            Ok(())
        } else {
            Err(TryPushErr(future))
        }
    }

    ///
    /// Waits for the next vacant slot in the stack poller.
    ///
    /// The future created by this method returns a tuple. The first value is the output of any
    /// future that had to be completed, in order to find a free slot. The second value is the
    /// vacancy which is used to insert the future.
    ///
    #[inline]
    pub fn next_vacancy(&mut self) -> NextVacancyFuture<'_, F, M> {
        NextVacancyFuture {
            token: self.token,
            poller: Some(self),
        }
    }

    ///
    /// Waits for the next future to complete, if there are any futures.
    ///
    /// If there are no active futures, returns `None`. Otherwise, returns a future which polls
    /// tasks until one of them completes.
    ///
    #[inline]
    pub fn next_completion(&mut self) -> Option<NextCompletionFuture<'_, F, M>> {
        let fields = self.memory.fields();
        if fields.activity.slots_active == 0 {
            None
        } else {
            Some(NextCompletionFuture {
                token: self.token,
                poller: self,
            })
        }
    }

    ///
    /// Drains this poller, handling results using the function.
    ///
    /// This will poll all futures in the poller. When one future completes, it is handled using
    /// the given function.
    ///
    #[inline]
    pub fn drain<D>(&mut self, drain_function: D) -> DrainFuture<'_, F, M, D>
    where
        D: FnMut(F::Output),
    {
        DrainFuture {
            token: self.token,
            poller: self,
            drain_function: AssertUnpin(drain_function),
        }
    }

    ///
    /// Drains this poller, handling results using a fallible function.
    ///
    /// This is similar to [Self::drain]. However, if the function returns an error, that error
    /// is returned and the draining stops.
    ///
    #[inline]
    pub fn try_drain<D, E>(&mut self, drain_function: D) -> TryDrainFuture<'_, F, M, D, E>
    where
        D: FnMut(F::Output) -> Result<(), E>,
    {
        TryDrainFuture {
            token: self.token,
            poller: self,
            drain_function: AssertUnpin(drain_function),
        }
    }
}

//
// Pushing
//

///
/// The error produced from [SlotPoller::try_push], containing the future which wasn't pushed.
///
/// Because standard library methods like expect, unwrap, etc. require a [Debug] implementation,
/// this type provides it irrespective of the future type.
///
pub struct TryPushErr<F>(pub F);

impl<F> Debug for TryPushErr<F> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Failed to push future: out of space")
    }
}

///
/// An open slot inside the stack poller.
///
/// Dropping this struct is harmless, and will let the slot inside the poller stay empty.
///
pub struct Vacancy<'p, F: Future, M: SlotMemory<F>> {
    token: PollThread,
    poller: &'p mut SlotPoller<F, M>,
}

impl<'p, F, M> Vacancy<'p, F, M>
where
    F: Future,
    M: SlotMemory<F>,
{
    ///
    /// Inserts a future into this vacant slot, consuming the vacancy
    ///
    pub fn insert(self, future: F) {
        let EngineFields {
            activity,
            slots,
            shared_storage,
        } = self.poller.memory.fields();
        let (slot_idx, slot) = activity.empty_stack_pop(slots).expect("Missing empty slot");
        let shared_storage = shared_storage as &SharedStorage;
        let state_node = slot_idx.get_state_node(shared_storage);
        unsafe {
            // SAFETY
            // slot is empty, and slot/slot_idx/state_node correspond
            slot.init_future(self.token, slot_idx, state_node, shared_storage, future);
        }
    }
}

///
/// The future returned from [SlotPoller::next_vacancy]
///
#[must_use = "futures do nothing unless polled"]
pub struct NextVacancyFuture<'p, F: Future, M: SlotMemory<F>> {
    token: PollThread,
    /// None if this future finished
    poller: Option<&'p mut SlotPoller<F, M>>,
}

impl<'p, F, M> Future for NextVacancyFuture<'p, F, M>
where
    F: Future,
    M: SlotMemory<F>,
{
    type Output = (Option<F::Output>, Vacancy<'p, F, M>);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let token = self.token;
        let EngineFields {
            activity,
            mut slots,
            shared_storage,
        } = self
            .poller
            .as_mut()
            .expect("Future polled after completion")
            .memory
            .fields();
        if activity.empty_stack_has_any() {
            return Poll::Ready((
                None,
                Vacancy {
                    token,
                    poller: self.poller.take().unwrap(),
                },
            ));
        }
        poll_loop!(
            cx,
            token,
            activity,
            slots,
            shared_storage,
            slot,
            slot_idx,
            state_node,
            poll_res,
            Poll::Pending, // nothing to poll => keep waiting
            {
                if let Poll::Ready((res, droppable)) = poll_res {
                    // Future completed - mark as empty / drop it
                    activity.empty_stack_push(slot_idx, slot.as_mut());
                    droppable.drop_future(slot, state_node);

                    return Poll::Ready((
                        Some(res),
                        Vacancy {
                            token,
                            poller: self.poller.take().unwrap(),
                        },
                    ));
                }
            }
        )
    }
}

//
// Streaming
//

///
/// The future returned from [SlotPoller::next_completion].
///
/// This future will poll all the futures inside the slot poller until one of them returns a ready
/// result. Its existence as a struct statically guarantees that at least one future exists in the
/// poller.
///
#[must_use = "futures do nothing unless polled"]
pub struct NextCompletionFuture<'p, F: Future, M: SlotMemory<F>> {
    token: PollThread,
    poller: &'p mut SlotPoller<F, M>,
}

impl<'p, F, M> Future for NextCompletionFuture<'p, F, M>
where
    F: Future,
    M: SlotMemory<F>,
{
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let token = self.token;
        let EngineFields {
            activity,
            mut slots,
            shared_storage,
        } = self.poller.memory.fields();
        poll_loop!(
            cx,
            token,
            activity,
            slots,
            shared_storage,
            slot,
            slot_idx,
            state_node,
            poll_res,
            Poll::Pending, // nothing to poll => keep waiting
            {
                if let Poll::Ready((res, droppable)) = poll_res {
                    // Future completed - mark as empty / drop it
                    activity.empty_stack_push(slot_idx, slot.as_mut());
                    droppable.drop_future(slot, state_node);
                    return Poll::Ready(res);
                }
            }
        )
    }
}

//
// Draining
//

/// Helper struct that disavows pinning for the inner value
struct AssertUnpin<V>(V);

impl<V> Unpin for AssertUnpin<V> {}

///
/// The future returned from [SlotPoller::drain].
///
/// This future will poll all the futures inside the slot poller. When one of them completes, the
/// result is handled by the drain function.
///
#[must_use = "futures do nothing unless polled"]
pub struct DrainFuture<'p, F: Future, M: SlotMemory<F>, D: FnMut(F::Output)> {
    token: PollThread,
    poller: &'p mut SlotPoller<F, M>,
    drain_function: AssertUnpin<D>,
}

///
/// The future returned from [SlotPoller::try_drain].
///
/// This future will poll all the futures inside the slot poller. When one of them completes, the
/// result is handled by the drain function, which can return an error if it wants to abort the drain.
///
#[must_use = "futures do nothing unless polled"]
pub struct TryDrainFuture<'p, F: Future, M: SlotMemory<F>, D: FnMut(F::Output) -> Result<(), E>, E>
{
    token: PollThread,
    poller: &'p mut SlotPoller<F, M>,
    drain_function: AssertUnpin<D>,
}

impl<'p, F, M, D> Future for DrainFuture<'p, F, M, D>
where
    F: Future,
    M: SlotMemory<F>,
    D: FnMut(F::Output),
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let token = self.token;
        let this = self.get_mut();
        let EngineFields {
            activity,
            mut slots,
            shared_storage,
        } = this.poller.memory.fields();
        let drain_function = &mut this.drain_function.0;
        poll_loop!(
            cx,
            token,
            activity,
            slots,
            shared_storage,
            slot,
            slot_idx,
            state_node,
            poll_res,
            {
                // Nothing to poll, so check if complete
                if activity.slots_active == 0 {
                    Poll::Ready(())
                } else {
                    Poll::Pending
                }
            },
            {
                if let Poll::Ready((res, droppable)) = poll_res {
                    activity.empty_stack_push(slot_idx, slot.as_mut());
                    droppable.drop_future(slot, state_node);
                    drain_function(res);
                }
            }
        )
    }
}

impl<'p, F, M, D, E> Future for TryDrainFuture<'p, F, M, D, E>
where
    F: Future,
    M: SlotMemory<F>,
    D: FnMut(F::Output) -> Result<(), E>,
{
    type Output = Result<(), E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let token = self.token;
        let this = self.get_mut();
        let EngineFields {
            activity,
            mut slots,
            shared_storage,
        } = this.poller.memory.fields();
        let drain_function = &mut this.drain_function.0;
        poll_loop!(
            cx,
            token,
            activity,
            slots,
            shared_storage,
            slot,
            slot_idx,
            state_node,
            poll_res,
            {
                // Nothing to poll, so check if complete
                if activity.slots_active == 0 {
                    Poll::Ready(Ok(()))
                } else {
                    Poll::Pending
                }
            },
            {
                if let Poll::Ready((res, droppable)) = poll_res {
                    activity.empty_stack_push(slot_idx, slot.as_mut());
                    droppable.drop_future(slot, state_node);
                    if let Err(e) = drain_function(res) {
                        return Poll::Ready(Err(e));
                    }
                }
            }
        )
    }
}