slotpoller 0.2.1

Bounded, lock-free futures collection. Faster than FuturesUnordered and other crates.
Documentation
/*
 * Copyright © 2026 Anand Beh
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
use crate::guard::{AnchorPollThread, PollThread, shared_storage_init, slot_init, state_node_iter};
use crate::polling::SlotStatus;
use crate::{EngineActivity, EngineFields, HeapSlots, Slot, StackSlots, StackSlotsRef, polling};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::mem::MaybeUninit;
use std::num::NonZeroU16;
use std::pin::Pin;
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;

/*

This module is responsible for abstracting over HeapSlots/StackSlots

 */

//
// Construction
//

impl<const N: usize, F: Future> StackSlots<N, F> {
    /// Need at least one slot, to make progress on futures
    ///
    /// `N - 1` underflows when `N == 0`, so the `expect` fails during constant evaluation.
    ///
    /// ```compile_fail
    /// use std::future::PollFn;
    /// use std::task::{Context, Poll};
    /// use slotpoller::StackSlots;
    ///
    /// let stack_slots = StackSlots::<0, PollFn<fn(&mut Context) -> Poll<()>>>::new();
    /// ```
    const AT_LEAST_ONE_SLOT: usize = N
        .checked_sub(1)
        .expect("StackSlots must have at least 1 slot");
    /// Can have at most [u16::MAX] slots, by design.
    ///
    /// `u16::MAX - N` underflows when `N > u16::MAX`, so the `expect` fails during constant
    /// evaluation.
    ///
    /// ```compile_fail
    /// use std::future::PollFn;
    /// use std::task::{Context, Poll};
    /// use slotpoller::StackSlots;
    ///
    /// let stack_slots = StackSlots::<{ usize::MAX }, PollFn<fn(&mut Context) -> Poll<()>>>::new();
    /// ```
    const AT_MOST_U16_SLOTS: usize = (u16::MAX as usize)
        .checked_sub(N)
        .expect("StackSlots can have at most u16::MAX slots");

    ///
    /// Creates the stack memory.
    ///
    /// The placement of this function in your code will typically impose a significant amount of
    /// stack space.
    ///
    /// The slot count `N` is validated through the associated constants of this type: it must be
    /// at least 1 and at most [u16::MAX].
    ///
    pub fn new() -> Self {
        // Mention both checks, so that they apply to the `N` this function is used with
        let _ = Self::AT_LEAST_ONE_SLOT;
        let _ = Self::AT_MOST_U16_SLOTS;

        // The cast cannot truncate: AT_MOST_U16_SLOTS rejects any N greater than u16::MAX
        let shared_storage = shared_storage_init(N as u16);

        let slots = unsafe {
            // SAFETY
            // Initialize slots on the stack, one-by-one
            let mut slots = MaybeUninit::<[Slot<F>; N]>::uninit();
            // View the uninitialized array as an array of individually uninitialized slots
            let slots_ptr = slots.as_mut_ptr() as *mut [MaybeUninit<Slot<F>>; N];
            // Trusted function that initializes each slot
            slot_init(&mut *(slots_ptr));
            // Every element should now be initialized
            slots.assume_init()
        };
        Self {
            activity: EngineActivity::default(),
            slots,
            shared_storage,
        }
    }
}

impl<const N: usize, F: Future> Default for StackSlots<N, F> {
    fn default() -> Self {
        Self::new()
    }
}

impl<F: Future> HeapSlots<F> {
    ///
    /// Allocates the slots on the heap, with room for `capacity` slots.
    ///
    /// A capacity of 0 is ruled out by the [NonZeroU16] parameter.
    ///
    pub fn with_capacity(capacity: NonZeroU16) -> Self {
        let slot_count = capacity.get();
        let shared_storage = shared_storage_init(slot_count);

        // Allocating uninitialized memory is safe by itself; only the steps below need care
        let mut uninit_slots = Box::<[Slot<F>]>::new_uninit_slice(usize::from(slot_count));
        let slots_ptr = unsafe {
            // SAFETY
            // slot_init is trusted to initialize every slot, which is what makes
            // the following assume_init valid
            slot_init(&mut *uninit_slots);
            let raw_slots = Box::into_raw(uninit_slots.assume_init());
            // A pointer that comes out of a Box is never null
            NonNull::new_unchecked(raw_slots)
        };

        Self {
            activity: EngineActivity::default(),
            slots_ptr,
            shared_storage,
            _marker: PhantomData,
        }
    }
}

//
// Core accessors
//

/// A value that can be turned into the backing storage for slots.
///
/// Implemented in this module for `Pin<&mut StackSlots<N, F>>` and for `HeapSlots<F>`.
pub(super) trait Memory<F>: AnchorPollThread {
    /// What `into_backing` produces; it provides the engine fields and can be debug-printed.
    type Backing: MemoryBacking<F> + Debug;

    /// Consumes `self` and returns the backing.
    fn into_backing(self) -> Self::Backing;
}

/// Uniform access to the engine fields, regardless of where the slots are stored.
///
/// Implemented in this module for `StackSlotsRef` and for `HeapSlots`.
pub trait MemoryBacking<F> {
    /// Borrows the fields from `self`; the returned value cannot outlive that borrow.
    fn fields(&mut self) -> EngineFields<'_, F>;
}

impl<'pin, const N: usize, F: Future> Memory<F> for Pin<&'pin mut StackSlots<N, F>> {
    type Backing = StackSlotsRef<'pin, F>;

    /// Splits the pinned `StackSlots` into references to its individual fields.
    fn into_backing(self) -> Self::Backing {
        // SAFETY
        // Nothing is moved out of the pinned value; we only hand out field references
        let stack_slots = unsafe { self.get_unchecked_mut() };
        StackSlotsRef {
            activity: &mut stack_slots.activity,
            // SAFETY
            // The slots stay pinned - this is the projected field, while other fields are Unpin
            slots: unsafe { Pin::new_unchecked(&mut stack_slots.slots) },
            shared_storage: &stack_slots.shared_storage,
        }
    }
}

impl<F: Future> MemoryBacking<F> for StackSlotsRef<'_, F> {
    /// Reborrows every field for the duration of the `&mut self` borrow.
    fn fields(&mut self) -> EngineFields<'_, F> {
        let activity = &mut *self.activity;
        let slots = self.slots.as_mut();
        let shared_storage = self.shared_storage;
        EngineFields {
            activity,
            slots,
            shared_storage,
        }
    }
}

impl<F: Future> Debug for StackSlotsRef<'_, F> {
    /// Prints the three fields under the struct name `StackSlotsRef`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut printer = f.debug_struct("StackSlotsRef");
        printer.field("activity", self.activity);
        printer.field("slots", &self.slots);
        printer.field("shared_storage", self.shared_storage);
        printer.finish()
    }
}

impl<F: Future> Memory<F> for HeapSlots<F> {
    // The heap variant owns its slots, so it acts as its own backing
    type Backing = HeapSlots<F>;

    /// Returns `self` unchanged.
    fn into_backing(self) -> Self::Backing {
        self
    }
}

impl<F: Future> MemoryBacking<F> for HeapSlots<F> {
    /// Borrows the owned fields, and exposes the heap slots as a pinned slice.
    fn fields(&mut self) -> EngineFields<'_, F> {
        let raw_slots = self.slots_ptr.as_ptr();
        // SAFETY
        // HeapSlots always stores a valid pointer, that stays pinned
        let slots = unsafe { Pin::new_unchecked(&mut *raw_slots) };
        EngineFields {
            activity: &mut self.activity,
            slots,
            shared_storage: &self.shared_storage,
        }
    }
}

impl<F: Future> Debug for HeapSlots<F> {
    /// Prints the activity, the slots behind the pointer, and the shared storage.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw_slots = self.slots_ptr.as_ptr();
        // SAFETY
        // HeapSlots always stores a valid pointer
        let slots = unsafe { &*raw_slots };

        let mut printer = f.debug_struct("HeapSlots");
        printer.field("activity", &self.activity);
        printer.field("slots", &slots);
        printer.field("shared_storage", &self.shared_storage);
        printer.finish()
    }
}

//
// Upholding the Drop guarantee for pinned data
//

/*
Stackful drop guarantee: if StackSlotsRef was ever created, then StackSlots had
to be pinned. Therefore, we rely on the Drop guarantee of StackSlots in order to
satisfy the Drop guarantee of pollable futures.
 */
impl<const N: usize, F: Future> Drop for StackSlots<N, F> {
    // Drops the futures through drop_futures. If that panics, the guard below
    // turns the panic into a process abort.
    fn drop(&mut self) {
        let pinned_self = unsafe {
            /*
            SAFETY

            It's only relevant whether the futures are pinned. If any future was
            polled, then it is also pinned (StackSlotsRef requires pinned Self).

            Either:
            1. The future is never polled (StackSlotsRef never created)
            2. This StackSlots was pinned by the library user

            Both cases mean that pinning Self is accurate, for drop purposes.
             */
            Pin::new_unchecked(self)
        };
        // Fetch the token first: the conversion on the next line consumes pinned_self
        let token = pinned_self.poll_thread();
        let mut stack_slots_ref: StackSlotsRef<_> = pinned_self.into();

        /*
        We must drop all futures, or else abort the process.

        After this function returns, the stack space we currently occupy is definitely
        going to be re-used. If we can't drop some of the futures, though, then we can't
        let that happen. Hence, we abort if one of the destructors panics.

        Usage of regular stack-pinning, such as through pin!, doesn't have this issue.
        That's because only a single element is pinned, so its destructor is always called.

        But since we implement multiple futures, ALL of them must be successfully dropped.
        Anything else is undefined behavior (violating the Drop guarantee) when the stack
        space is (inevitably) reused.
         */
        // Guard value: its destructor only runs if we unwind out of drop_futures,
        // because the regular path forgets it (see the end of this function)
        struct AbortOnDrop;

        impl Drop for AbortOnDrop {
            // The abort() call sits behind panic!, so the compiler sees it as unreachable
            #[allow(unreachable_code)]
            fn drop(&mut self) {
                // If we're here, then code is already panicking
                panic!("Stackful future collection panicked on drop - aborting...");
                // Just in case, call explicit abort
                abort();
            }
        }
        // The guard has to exist before drop_futures is called, to cover a panic inside it
        let abort_on_panic = AbortOnDrop;
        stack_slots_ref.fields().drop_futures(token);
        // No panic occurred: disarm the guard without running its destructor
        mem::forget(abort_on_panic);
    }
}

/*
Heapful drop guarantee: if dropping the futures fails due to a panic, the slot memory
is leaked. This upholds the Drop guarantee that the memory won't be used again.
 */
impl<F: Future> Drop for HeapSlots<F> {
    /// Drops the futures first, then frees the slots array.
    fn drop(&mut self) {
        let poll_token = self.poll_thread();
        // If this panics, the deallocation below is never reached and the slots are leaked
        self.fields().drop_futures(poll_token);

        let raw_slots = self.slots_ptr.as_ptr();
        // SAFETY
        // HeapSlots always stores a valid pointer to the slots array
        drop(unsafe { Box::from_raw(raw_slots) });
    }
}

impl<F: Future> EngineFields<'_, F> {
    /// Calls `polling::call_drop` for every slot whose status says a future is initialized.
    ///
    /// Returns immediately when `slots_active` is 0.
    fn drop_futures(&mut self, token: PollThread) {
        // No slots have futures in them => skip iteration
        if self.activity.slots_active == 0 {
            return;
        }
        for (slot_idx, state_node) in state_node_iter(self.shared_storage) {
            let status = &state_node.status;
            /*
            Ordering:
            Relaxed - we're on the poll thread. We only need to know if future is initialized.
             */
            let holds_future = match status.load(Ordering::Relaxed) {
                SlotStatus::Uninit | SlotStatus::UninitButEnqueued => false,
                SlotStatus::Init | SlotStatus::Waiting | SlotStatus::Woken => true,
            };
            if !holds_future {
                // Not initialized
                continue;
            }
            let mut slot = slot_idx.get_slot(self.slots.as_mut());
            // SAFETY
            // Status guarantees future is initialized
            unsafe { polling::call_drop(token, slot.as_mut(), status) };
        }
    }
}