slotpoller 0.2.1

Bounded, lock-free futures collection. Faster than FuturesUnordered and other crates.
Documentation
/*
 * Copyright © 2026 Anand Beh
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//!
//! An internal module for protecting internal preconditions
//!
//! Using this module makes the whole library a lot easier to implement.
//!

use crate::polling::AtomicStatus;
use crate::{
    EngineActivity, HeapSlots, SharedStorage, SharedStorageHeader, SharedStorageHeaderInner, Slot,
    StackSlots, StateNode,
};
use cache_padded::CachePadded;
use diatomic_waker::DiatomicWaker;
use std::cell::Cell;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::sync::MutexGuard;
use std::sync::atomic::{AtomicU32, Ordering};
use triomphe::Arc;

/*

A token that is !Sync and !Send. Like PhantomUnsend/PhantomUnsync if they existed.

This token gets created when the poller is created. So, although the StackSlots/HeapSlots
memory providers might be sendable, the poller certainly won't be. We rely on this
in src/polling.rs during the pollable slot dequeue operation. Also, I'm not sure if our
waker storage mechanisms are quite suited to moving a poller to a different thread, where
the runtime might impose a different waker.

If we do more research on this, it's possible we could relax this in the future, to enable
Send pollers such as for purposes like scoped threads. However, I haven't tested such a
niche use case, and I don't really see the point of it.

So for now, we are !Sync and !Send once the poller is made.

 */

/// A token indicating we are on the poller thread (which created the poller)
///
/// The token is zero-sized: its only field is a [`PhantomData`] marker, so it carries no
/// data and exists purely to make thread affinity visible in the type system. It is `Copy`,
/// so it can be passed by value wherever it is needed.
#[derive(Copy, Clone)]
// Struct value guarantees we are not Sync (Cell) and not Send (MutexGuard)
// PhantomData<T> takes on the auto traits of T, so wrapping a tuple of the two is enough
pub(super) struct PollThread(PhantomData<(Cell<usize>, MutexGuard<'static, ()>)>);

impl Debug for PollThread {
    /// Prints just the type name; the token has no data worth showing.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("PollThread")
    }
}

/// A trait that is invoked when the poller is first created
///
/// Implementors hand out the [`PollThread`] token.
///
/// # Safety
///
/// NOTE(review): the trait is `unsafe`, but its contract is not written down here. Judging
/// by the comments on the implementations in this file, an implementor presumably has to
/// make sure the token it returns cannot end up being used from a thread other than the
/// poller's thread. Confirm and spell out the exact requirement.
pub(super) unsafe trait AnchorPollThread {
    /// Produces the thread token.
    fn poll_thread(&self) -> PollThread;
}

/// Safety
/// Once pinned, a StackSlots stays on that thread (and the token can only be made once)
unsafe impl<const N: usize, F> AnchorPollThread for Pin<&mut StackSlots<N, F>>
where
    F: Future,
{
    fn poll_thread(&self) -> PollThread {
        // The token is nothing more than a marker; its type is inferred from PollThread
        let marker = PhantomData;
        PollThread(marker)
    }
}

/// Safety: see the note inside the impl
unsafe impl<F> AnchorPollThread for HeapSlots<F>
where
    F: Future,
{
    /*
    In theory, nothing stops this from being called on different threads. In practice that
    is harmless: the heap memory is consumed by the poller when the poller is created, and
    the poller stores the thread token inside itself.

    Heap memory that was never given to a poller is fine too, since it doesn't matter which
    thread calls drop handling.
     */
    fn poll_thread(&self) -> PollThread {
        // The token is nothing more than a marker; its type is inferred from PollThread
        let marker = PhantomData;
        PollThread(marker)
    }
}

/*

A slot index corresponds to the array of futures. They are numbered 0..N where N is the
total number of slots. Note that N is exclusive.

We limit ourselves to u16::MAX slots. This means N <= u16::MAX, so that any slot index is
at most u16::MAX - 1.

 */

/// The index of a slot in the array of futures.
///
/// Slots are numbered from 0. With the slot count capped at `u16::MAX`, a valid index is
/// at most `u16::MAX - 1`. The wrapped value is private, so code outside this module can
/// only obtain a `SlotIdx` through the functions defined here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(super) struct SlotIdx(u16);

impl SlotIdx {
    pub(super) fn value(self) -> u16 {
        self.0
    }
}

/*

An "optional" slot index takes advantage of the u16::MAX niche to represent UNSET.

This provides better memory efficiency by packing Option<SlotIdx> into a single u16.

 */

/// Either a slot index or "no slot", packed into a single `u16`.
///
/// The value `u16::MAX` is reserved to mean unset (see [`OptSlotIdx::UNSET`]); every other
/// value is the raw number of a slot.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(super) struct OptSlotIdx(u16);

impl Debug for OptSlotIdx {
    /// Shows the wrapped slot number, or a dedicated marker when no slot is set.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if !self.is_set() {
            return f.write_str("OptSlotIdx::UNSET");
        }
        let mut tuple = f.debug_tuple("OptSlotIdx");
        tuple.field(&self.0);
        tuple.finish()
    }
}

impl OptSlotIdx {
    pub(super) const UNSET: Self = Self(u16::MAX);

    pub(super) fn is_set(&self) -> bool {
        *self != Self::UNSET
    }

    pub(super) fn into_slot_idx(self) -> SlotIdx {
        // We expect this to be optimized out, following a check to Self::is_set
        assert_ne!(self, Self::UNSET, "Self is unset!");
        SlotIdx(self.0)
    }
}

impl From<SlotIdx> for OptSlotIdx {
    fn from(value: SlotIdx) -> Self {
        Self(value.0)
    }
}

impl SlotIdx {
    /// Projects a pinned slice of slots down to the single pinned slot at this index.
    pub(super) fn get_slot<F>(self, slots: Pin<&mut [Slot<F>]>) -> Pin<&mut Slot<F>> {
        let position = usize::from(self.0);
        // SAFETY
        // Nothing is moved out of the slice: we only pick one element and pin it again below
        let unpinned = unsafe { slots.get_unchecked_mut() };
        // Internally, we guarantee valid indices
        debug_assert!(position < unpinned.len());
        // SAFETY
        // The index is valid, as per the internal guarantee checked above
        let target = unsafe { unpinned.get_unchecked_mut(position) };
        // SAFETY
        // Passthrough pinning guarantee
        // This relies on the backing memory in src/memory.rs upholding the Drop guarantee
        unsafe { Pin::new_unchecked(target) }
    }
}

impl Default for EngineActivity {
    /// The starting state: no active slots, the empty list headed by slot 0, and the poll
    /// queue head resting on the stub node.
    fn default() -> Self {
        // In src/memory.rs, we guarantee at least 1 slot, so slot 0 is always there
        let first_slot = SlotIdx(0);
        Self {
            poll_loop_idx: 1,
            poll_queue_head: StateNodeIdx::STUB,
            slots_active: 0,
            empty_head: OptSlotIdx::from(first_slot),
        }
    }
}

/// Writes the initial value into every slot.
///
/// Each slot starts without a future and is linked to the slot after it, so that the slots
/// form a chain of empty slots. The last slot ends the chain with [`OptSlotIdx::UNSET`].
pub(super) fn slot_init<F>(slots: &mut [MaybeUninit<Slot<F>>]) {
    let total = slots.len();
    // Debug-only checks: src/memory.rs is expected to uphold both of these
    debug_assert_ne!(0, total, "Length must be non-zero");
    debug_assert!(
        total <= u16::MAX as usize,
        "Length can be at most u16::MAX"
    );

    // Pair every slot with the index of the slot that follows it
    for (slot, successor) in slots.iter_mut().zip(1usize..) {
        let empty_link = if successor < total {
            OptSlotIdx::from(SlotIdx(successor as u16))
        } else {
            // The last slot in the array. Has no more empty slots after it
            OptSlotIdx::UNSET
        };
        slot.write(Slot {
            last_poll_loop_idx: 0,
            empty_link,
            future: MaybeUninit::uninit(),
        });
    }
}

/*

State nodes are very similar to slots. However, there is a crucial difference: the stub node.

We have (1 + slot_count) state nodes. The first state node represents the stub of the concurrent
linked queue. The stub node's poll_queue_link (what we are interested in) doesn't exist as a separate
field, in order to remove branches and treat the stub node as interchangeable with the others.

The stub node has an index of 0, StateNodeIdx::STUB.

Every other state node has an index of (1 + slot_index). These state nodes have indices 1 all the way
up to the number of slots. They are thus at least 1 and at most u16::MAX.

Finally, StateNodeIdx::UNSET represents no state node. This has to be checked for manually, as our
code for state nodes is more complicated than our code for slots. As such, operations involving state
nodes are sometimes unsafe, if they assume a non-UNSET state node.

 */

/// Allocates the shared storage for `len` slots: a header plus `len + 1` state nodes (the
/// stub node, followed by one node per slot).
///
/// Panics if `len` is 0.
pub(super) fn shared_storage_init(len: u16) -> Arc<SharedStorage> {
    assert_ne!(len, 0, "Length cannot be 0");

    // The queue tail starts out on the stub node
    let header = SharedStorageHeader(CachePadded::new(SharedStorageHeaderInner {
        poll_queue_tail: AtomicStateNodeIdx::new(StateNodeIdx::STUB),
        nodes_len: u32::from(len) + 1,
        main_waker: DiatomicWaker::new(),
    }));

    // Create len + 1 state nodes, and initialize their slot indices
    let nodes = (0..=len).map(|node_idx| {
        let slot_idx = match node_idx {
            // Stub node. This slot index is a dummy and shouldn't be used
            0 => SlotIdx(0),
            occupied => SlotIdx(occupied - 1),
        };
        StateNode {
            status: AtomicStatus::default(),
            poll_queue_link: AtomicStateNodeIdx::new(StateNodeIdx::UNSET),
            slot_idx,
        }
    });

    Arc::from_header_and_iter(header, nodes)
}

/// Iterates over the state nodes that belong to slots, each paired with its slot index.
/// The stub node is left out.
pub(super) fn state_node_iter(
    shared_storage: &SharedStorage,
) -> impl IntoIterator<Item = (SlotIdx, &StateNode)> {
    // Drop the stub node at position 0
    let slot_nodes = &shared_storage.slice[1..];
    slot_nodes
        .iter()
        // With the stub gone, positions in this sub-slice are exactly the slot indices
        .enumerate()
        .map(|(offset, node)| (SlotIdx(offset as u16), node))
}

impl SlotIdx {
    /// Looks up the state node that belongs to this slot.
    pub(super) fn get_state_node(self, shared_storage: &SharedStorage) -> &StateNode {
        let nodes = &shared_storage.slice;
        // The stub node occupies position 0, so slot nodes are shifted up by one
        let position = usize::from(self.0 + 1);
        // Internally, we guarantee valid indices
        debug_assert!(position < nodes.len());
        // SAFETY
        // The position is in bounds, as per the internal guarantee checked above
        unsafe { nodes.get_unchecked(position) }
    }
}

/// The index of a state node in the shared storage.
///
/// Index 0 is the stub node ([`StateNodeIdx::STUB`]) and the node for slot `i` sits at
/// index `i + 1`. The value `u32::MAX` ([`StateNodeIdx::UNSET`]) stands for "no state node"
/// and must never be used to look up a node.
#[derive(Copy, Clone, PartialEq, Eq)]
pub(super) struct StateNodeIdx(u32);

impl Debug for StateNodeIdx {
    /// Names the two special indices explicitly and prints any other index as a number.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let special_name = if *self == Self::STUB {
            Some("StateNodeIdx::STUB")
        } else if *self == Self::UNSET {
            Some("StateNodeIdx::UNSET")
        } else {
            None
        };
        match special_name {
            Some(name) => f.write_str(name),
            None => f.debug_tuple("StateNodeIdx").field(&self.0).finish(),
        }
    }
}

impl StateNodeIdx {
    pub(super) const UNSET: Self = Self(u32::MAX);
    pub(super) const STUB: Self = Self(0);

    /// Gets the state node at the given index.
    ///
    /// Take care: if this index is for the stub, then the stub node will be returned.
    ///
    /// SAFETY
    ///
    /// Caller ensures that this index is not equal to [Self::UNSET]
    ///
    /// Caller promises that if index is equal to [Self::STUB], that the slot index in the
    /// returned dummy state node will not be used.
    pub(super) unsafe fn get_state_node(self, shared_storage: &SharedStorage) -> &StateNode {
        let idx = self.0 as usize;
        unsafe {
            // SAFETY
            // Internally, we guarantee valid indices except for UNSET
            debug_assert!(idx < shared_storage.slice.len());
            shared_storage.slice.get_unchecked(idx)
        }
    }

    /// Gets the state node's link to the next node, for the node at the givenindex.
    ///
    /// SAFETY
    ///
    /// Caller ensures that this index is not equal to [Self::UNSET]
    pub(super) unsafe fn get_poll_queue_link(
        self,
        shared_storage: &SharedStorage,
    ) -> &AtomicStateNodeIdx {
        unsafe {
            // SAFETY
            // We guarantee valid indices except for UNSET
            // If self == STUB, that's okay, since we never touch the dummy slot index
            &self.get_state_node(shared_storage).poll_queue_link
        }
    }

    /// Converts into the slot index
    ///
    /// SAFETY
    ///
    /// Caller ensures that this index is not equal to [Self::UNSET] or [Self::STUB]
    pub(super) unsafe fn into_slot_idx(self) -> SlotIdx {
        unsafe {
            // SAFETY
            // u16::MAX is the maximum state node index, except for UNSET
            let val = self.0;
            // self != STUB means that val != 0
            std::hint::assert_unchecked(val != 0);
            SlotIdx((val as u16) - 1)
        }
    }
}

impl From<SlotIdx> for StateNodeIdx {
    /// Maps a slot index to the index of its state node, which sits one position later
    /// because position 0 belongs to the stub node.
    fn from(value: SlotIdx) -> Self {
        // Safety: slot_idx is at most u16::MAX - 1, so adding one still fits in a u16
        let node_position: u16 = value.0 + 1;
        Self(u32::from(node_position))
    }
}

/// A [`StateNodeIdx`] kept in an [`AtomicU32`], so that it can be read and replaced through
/// a shared reference.
pub(super) struct AtomicStateNodeIdx(AtomicU32);

impl Debug for AtomicStateNodeIdx {
    /// Formats a relaxed snapshot of the current value exactly like a plain `StateNodeIdx`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let snapshot = StateNodeIdx(self.0.load(Ordering::Relaxed));
        Debug::fmt(&snapshot, f)
    }
}

impl AtomicStateNodeIdx {
    pub(super) const fn new(value: StateNodeIdx) -> Self {
        Self(AtomicU32::new(value.0))
    }
}

impl AtomicStateNodeIdx {
    pub(super) fn load(&self, ordering: Ordering) -> StateNodeIdx {
        StateNodeIdx(self.0.load(ordering))
    }

    pub(super) fn store(&self, new_value: StateNodeIdx, ordering: Ordering) {
        self.0.store(new_value.0, ordering);
    }

    pub(super) fn swap(&self, new_value: StateNodeIdx, ordering: Ordering) -> StateNodeIdx {
        let previous = self.0.swap(new_value.0, ordering);
        StateNodeIdx(previous)
    }
}