mod guard;
mod memory;
mod polling;
#[cfg(test)]
mod tests;
use crate::guard::{AtomicStateNodeIdx, OptSlotIdx, PollThread, SlotIdx, StateNodeIdx};
use crate::memory::MemoryBacking;
use crate::polling::{AtomicStatus, poll_queue_reinsert_head, pop_pollable};
use cache_padded::CachePadded;
use diatomic_waker::DiatomicWaker;
use std::alloc::Layout;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll};
use triomphe::{Arc, HeaderSlice};
/// Marker trait for the kinds of memory a [`SlotPoller`] can be built on for futures of type `F`.
///
/// The trait declares no items of its own; all functionality comes from the
/// supertrait `memory::Memory<F>`. In this file it is implemented for
/// `Pin<&mut StackSlots<N, F>>` and for `HeapSlots<F>`.
// NOTE(review): the lint is presumably silenced because `memory::Memory` is not
// nameable outside this crate (the `memory` module is private) - confirm.
#[allow(private_bounds)]
pub trait SlotMemory<F: Future>: memory::Memory<F> {}
/// Slot storage that keeps its `N` slots inline, as an array inside the value itself.
///
/// [`SlotMemory`] is implemented for `Pin<&mut StackSlots<N, F>>`, so the storage is
/// handed to a poller through a pinned mutable reference.
pub struct StackSlots<const N: usize, F: Future> {
/// Single-owner bookkeeping (free-slot list head, active count, poll-queue head, pass counter).
activity: EngineActivity,
/// One slot per future that can be held at the same time.
slots: [Slot<F>; N],
/// Reference-counted header plus `StateNode` slice.
shared_storage: Arc<SharedStorage>,
}
/// Borrowed view of a pinned [`StackSlots`] with the const length `N` erased:
/// the slot array is exposed as a pinned slice instead.
struct StackSlotsRef<'pin, F: Future> {
/// Mutable borrow of the bookkeeping state.
activity: &'pin mut EngineActivity,
/// The slots, still pinned, as a slice.
slots: Pin<&'pin mut [Slot<F>]>,
/// Shared borrow of the reference-counted storage handle.
shared_storage: &'pin Arc<SharedStorage>,
}
impl<'pin, const N: usize, F: Future> From<Pin<&'pin mut StackSlots<N, F>>>
for StackSlotsRef<'pin, F>
{
fn from(memory: Pin<&'pin mut StackSlots<N, F>>) -> Self {
unsafe {
let memory = memory.get_unchecked_mut();
Self {
activity: &mut memory.activity,
slots: Pin::new_unchecked(&mut memory.slots),
shared_storage: &memory.shared_storage,
}
}
}
}
/// A pinned mutable borrow of [`StackSlots`] is accepted as poller memory.
impl<'pin, const N: usize, F: Future> SlotMemory<F> for Pin<&'pin mut StackSlots<N, F>> {}
/// Slot storage that refers to its slots through a pointer instead of holding them inline.
///
/// NOTE(review): allocation and release of the pointed-to slice are not visible in
/// this file - presumably handled in the `memory` module; confirm there.
pub struct HeapSlots<F: Future> {
/// Single-owner bookkeeping (free-slot list head, active count, poll-queue head, pass counter).
activity: EngineActivity,
/// Non-null pointer to the slice of slots.
slots_ptr: NonNull<[Slot<F>]>,
/// Reference-counted header plus `StateNode` slice.
shared_storage: Arc<SharedStorage>,
/// Zero-sized marker tying the type to `[Slot<F>]`
/// (presumably to express logical ownership of the slots - confirm).
_marker: PhantomData<[Slot<F>]>,
}
/// `HeapSlots` is accepted as poller memory by value.
impl<F: Future> SlotMemory<F> for HeapSlots<F> {}
// `HeapSlots` only holds a pointer to its slots, so moving the `HeapSlots` value
// itself does not move any slot; it is therefore declared `Unpin` for every `F`.
impl<F: Future> Unpin for HeapSlots<F> {}
/// Bookkeeping state that is only ever accessed through `&mut` (no atomics).
#[derive(Debug)]
struct EngineActivity {
/// Head of the free-slot list; the list is threaded through `Slot::empty_link`.
empty_head: OptSlotIdx,
/// Number of slots currently taken off the free list
/// (incremented by `empty_stack_pop`, decremented by `empty_stack_push`).
slots_active: u16,
/// Head index of the poll queue.
/// NOTE(review): consumed by `pop_pollable` / `poll_queue_reinsert_head` in the
/// `polling` module - exact semantics to be confirmed there.
poll_queue_head: StateNodeIdx,
/// Pass counter: bumped (wrapping) every time `poll_loop!` starts, and compared
/// against `Slot::last_poll_loop_idx` to detect a slot already polled in this pass.
poll_loop_idx: u16,
}
/// Bundle of borrows into a memory backing, as returned by `fields()` on the
/// backing and destructured by every operation in this file.
struct EngineFields<'m, F> {
/// Mutable borrow of the bookkeeping state.
activity: &'m mut EngineActivity,
/// The slots, pinned, as a slice.
slots: Pin<&'m mut [Slot<F>]>,
/// Shared borrow of the reference-counted storage handle.
shared_storage: &'m Arc<SharedStorage>,
}
/// Header fields stored in front of the `StateNode` slice of [`SharedStorage`].
#[derive(Debug)]
struct SharedStorageHeaderInner {
/// Atomic index of the poll queue's tail.
/// NOTE(review): presumably updated by wakers enqueuing nodes - confirm in `polling`.
poll_queue_tail: AtomicStateNodeIdx,
/// NOTE(review): presumably the number of `StateNode`s following the header - confirm.
nodes_len: u32,
/// Waker slot that `poll_loop!` registers the current task's waker into.
main_waker: DiatomicWaker,
}
/// [`SharedStorageHeaderInner`] wrapped in `CachePadded`.
///
/// `repr(transparent)` gives this wrapper the same layout as the padded inner value.
#[derive(Debug)]
#[repr(transparent)]
struct SharedStorageHeader(CachePadded<SharedStorageHeaderInner>);
impl SharedStorageHeader {
// Size in bytes of a `HeaderSlice` that carries this header and a zero-length
// `StateNode` array.
// NOTE(review): presumably used as the byte offset of the first node - confirm at use sites.
const BYTE_OFFSET: usize = Layout::new::<HeaderSlice<Self, [StateNode; 0]>>().size();
}
impl Deref for SharedStorageHeader {
type Target = SharedStorageHeaderInner;
#[inline]
fn deref(&self) -> &Self::Target {
&*self.0
}
}
/// Header followed by a dynamically sized slice of `StateNode`s; held behind `Arc` by the slot memories.
type SharedStorage = HeaderSlice<SharedStorageHeader, [StateNode]>;
/// Polls a bounded set of futures of type `F` stored in slot memory `M`.
///
/// Futures are added with `try_push` or through a `Vacancy`, and driven by the
/// futures returned from `next_vacancy`, `next_completion`, `drain` and `try_drain`.
pub struct SlotPoller<F: Future, M: SlotMemory<F>> {
/// Token obtained from the memory's `poll_thread()` at construction and passed
/// to every polling primitive.
token: PollThread,
/// The memory converted into its backing representation (`into_backing()`).
memory: M::Backing,
}
/// Debug output lists the poll token and the memory backing.
impl<F: Future, M: SlotMemory<F>> Debug for SlotPoller<F, M> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SlotPoller");
        out.field("token", &self.token);
        out.field("memory", &self.memory);
        out.finish()
    }
}
impl<F: Future, M: SlotMemory<F>> SlotPoller<F, M> {
    /// Builds a poller on top of `memory`.
    ///
    /// The poll token is read from `memory` first; `memory` is then consumed
    /// and converted into its backing representation.
    pub fn new(memory: M) -> Self {
        let token = memory.poll_thread();
        let memory = memory.into_backing();
        Self { token, memory }
    }
}
/// Storage cell for one future plus its per-slot bookkeeping.
struct Slot<F> {
/// The future, if any. Whether it is initialised is not recorded in this struct.
future: MaybeUninit<F>,
/// Next entry of the free-slot list while this slot is on that list
/// (written by `EngineActivity::empty_stack_push`, read by `empty_stack_pop`).
empty_link: OptSlotIdx,
/// Value of `EngineActivity::poll_loop_idx` during the pass that last polled this slot.
last_poll_loop_idx: u16,
}
impl<F> Slot<F> {
fn project(self: Pin<&mut Self>) -> SlotProject<'_, F> {
unsafe {
let this = self.get_unchecked_mut();
SlotProject {
future: Pin::new_unchecked(&mut this.future),
empty_link: &mut this.empty_link,
last_poll_loop_idx: &mut this.last_poll_loop_idx,
}
}
}
}
/// Result of `Slot::project`: per-field borrows of a pinned `Slot`.
struct SlotProject<'s, F> {
/// The future storage, still pinned.
future: Pin<&'s mut MaybeUninit<F>>,
/// Free-list link, handed out as a plain mutable reference.
empty_link: &'s mut OptSlotIdx,
/// Last-polled pass marker, handed out as a plain mutable reference.
last_poll_loop_idx: &'s mut u16,
}
/// Debug output for a slot. The future itself is never inspected (it may be
/// uninitialised and `F` need not be `Debug`); it is rendered as `opaque`.
impl<F> Debug for Slot<F> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Slot");
        // `format_args!` yields a `Debug` value that prints the bare word, unquoted.
        out.field("future", &format_args!("opaque"));
        out.field("empty_link", &self.empty_link);
        out.field("last_poll_loop_idx", &self.last_poll_loop_idx);
        out.finish()
    }
}
/// One element of the slice stored in [`SharedStorage`].
///
/// NOTE(review): field semantics below are inferred from names and types only;
/// the code that reads and writes them lives in the `polling` / `guard` modules.
#[derive(Debug)]
struct StateNode {
/// Atomic status word of the node.
status: AtomicStatus,
/// Atomic link to another node (presumably the next entry of the poll queue - confirm).
poll_queue_link: AtomicStateNodeIdx,
/// Index of a slot (presumably the slot this node belongs to - confirm).
slot_idx: SlotIdx,
}
/// Free-slot list operations. The list is a LIFO stack whose head is
/// `empty_head` and whose links live in `Slot::empty_link`.
impl EngineActivity {
    /// Takes the slot at the head of the free list.
    ///
    /// Returns the slot's index together with a pinned borrow of it, or `None`
    /// when the list is empty. On success the active-slot counter goes up by one.
    fn empty_stack_pop<'s, F>(
        &mut self,
        slots: Pin<&'s mut [Slot<F>]>,
    ) -> Option<(SlotIdx, Pin<&'s mut Slot<F>>)> {
        if !self.empty_head.is_set() {
            return None;
        }
        let idx = self.empty_head.into_slot_idx();
        let taken = idx.get_slot(slots);
        // The popped slot's link becomes the new head.
        self.empty_head = taken.empty_link;
        self.slots_active += 1;
        Some((idx, taken))
    }

    /// Puts `slot` (whose index is `slot_idx`) back at the head of the free list
    /// and lowers the active-slot counter by one.
    fn empty_stack_push<F>(&mut self, slot_idx: SlotIdx, slot: Pin<&mut Slot<F>>) {
        // Chain the previous head behind the returned slot.
        *slot.project().empty_link = self.empty_head;
        self.empty_head = OptSlotIdx::from(slot_idx);
        self.slots_active -= 1;
    }

    /// `true` when at least one slot is on the free list.
    #[inline]
    fn empty_stack_has_any(&self) -> bool {
        self.empty_head.is_set()
    }
}
// Shared body of every `Future::poll` implementation in this file: one pass over
// the poll queue.
//
// Arguments (identifiers are passed in so that `$if_none` / `$on_poll` can name them):
//   $cx             - the `Context` of the surrounding `poll`
//   $token          - the `PollThread` token
//   $activity       - `&mut EngineActivity`
//   $slots          - pinned mutable slot slice (must be a `mut` binding; `as_mut()` is called on it)
//   $shared_storage - the shared storage handle
//   $slot, $slot_idx, $state_node, $poll_res
//                   - names bound by the macro for the slot being polled, its index,
//                     its state node and the result of polling it
//   $if_none        - value the loop breaks with once `pop_pollable` reports no entry
//   $on_poll        - code run after each poll; it may `return` from the enclosing function
//
// The macro evaluates to the value of the `break` that ends the loop.
macro_rules! poll_loop {
($cx:ident, $token:ident, $activity:ident, $slots:ident, $shared_storage:ident, $slot:ident, $slot_idx:ident, $state_node:ident, $poll_res:ident, $if_none:expr, $on_poll:expr) => {{
// Remember this pass's number and advance the counter (wrapping at u16::MAX).
let poll_loop_idx = $activity.poll_loop_idx;
$activity.poll_loop_idx = poll_loop_idx.wrapping_add(1);
// Register the current task's waker before draining the queue.
// NOTE(review): `register` is unsafe; presumably sound because only the single
// poller side ever calls it - confirm against the `diatomic_waker` contract.
unsafe {
$shared_storage.header.main_waker.register($cx.waker());
}
loop {
let pop_pollable = pop_pollable($token, $activity, $shared_storage);
let ($slot_idx, pollable) = match pop_pollable {
// No entry left: finish with the caller-supplied value.
(OptSlotIdx::UNSET, _) => break $if_none,
// An entry was popped but yielded nothing to poll: try the next one.
(_slot_idx, None) => continue,
(slot_idx, Some(pollable)) => (slot_idx.into_slot_idx(), pollable),
};
let mut $slot = $slot_idx.get_slot($slots.as_mut());
let $state_node = pollable.state_node();
// This slot was already polled during the current pass: put it back at the
// head of the queue, request another poll of this task and stop with `Pending`
// instead of polling the same future again right away.
if $slot.last_poll_loop_idx == poll_loop_idx {
unsafe {
poll_queue_reinsert_head(
$token,
$activity,
$state_node,
StateNodeIdx::from($slot_idx),
);
}
$cx.waker().wake_by_ref();
break Poll::Pending;
}
// Mark the slot as polled in this pass, then poll it.
*$slot.as_mut().project().last_poll_loop_idx = poll_loop_idx;
let $poll_res = pollable.call_poll($token, $slot.as_mut());
$on_poll
}
}};
}
impl<F, M> SlotPoller<F, M>
where
    F: Future,
    M: SlotMemory<F>,
{
    /// Stores `future` in a free slot if there is one.
    ///
    /// # Errors
    ///
    /// Returns [`TryPushErr`] carrying the untouched `future` when no slot is free.
    pub fn try_push(&mut self, future: F) -> Result<(), TryPushErr<F>> {
        let token = self.token;
        let EngineFields {
            activity,
            slots,
            shared_storage,
        } = self.memory.fields();

        let Some((slot_idx, slot)) = activity.empty_stack_pop(slots) else {
            return Err(TryPushErr(future));
        };

        let storage: &SharedStorage = shared_storage;
        let state_node = slot_idx.get_state_node(storage);
        // NOTE(review): `init_future`'s safety contract is defined outside this file;
        // the slot was just taken off the free list, `state_node` was looked up
        // with the same index.
        unsafe {
            slot.init_future(token, slot_idx, state_node, storage, future);
        }
        Ok(())
    }

    /// Returns a future that resolves once a slot is free, yielding a [`Vacancy`]
    /// (and the output of the future that completed to free it, if one had to).
    #[inline]
    pub fn next_vacancy(&mut self) -> NextVacancyFuture<'_, F, M> {
        let token = self.token;
        NextVacancyFuture {
            token,
            poller: Some(self),
        }
    }

    /// Returns a future resolving to the output of the next stored future to
    /// complete, or `None` when no slot is currently active.
    #[inline]
    pub fn next_completion(&mut self) -> Option<NextCompletionFuture<'_, F, M>> {
        let nothing_active = self.memory.fields().activity.slots_active == 0;
        if nothing_active {
            return None;
        }
        let token = self.token;
        Some(NextCompletionFuture {
            token,
            poller: self,
        })
    }

    /// Returns a future that polls the stored futures until none is active,
    /// passing every output to `drain_function`.
    #[inline]
    pub fn drain<D>(&mut self, drain_function: D) -> DrainFuture<'_, F, M, D>
    where
        D: FnMut(F::Output),
    {
        let token = self.token;
        let drain_function = AssertUnpin(drain_function);
        DrainFuture {
            token,
            poller: self,
            drain_function,
        }
    }

    /// Like [`drain`](Self::drain), but the returned future resolves early with
    /// the first `Err` produced by `drain_function`.
    #[inline]
    pub fn try_drain<D, E>(&mut self, drain_function: D) -> TryDrainFuture<'_, F, M, D, E>
    where
        D: FnMut(F::Output) -> Result<(), E>,
    {
        let token = self.token;
        let drain_function = AssertUnpin(drain_function);
        TryDrainFuture {
            token,
            poller: self,
            drain_function,
        }
    }
}
/// Error returned when a future cannot be pushed because every slot is occupied.
///
/// The rejected future is handed back in field `0`, so the caller can retry
/// later or drop it.
pub struct TryPushErr<F>(pub F);

// Written by hand so that `F` is not required to implement `Debug`; the
// wrapped future is deliberately not printed.
impl<F> Debug for TryPushErr<F> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Failed to push future: out of space")
    }
}

/// User-facing message; identical to the `Debug` text.
impl<F> std::fmt::Display for TryPushErr<F> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Failed to push future: out of space")
    }
}

/// Allows the error to be used wherever a `std::error::Error` is expected
/// (e.g. behind `dyn Error`). There is no underlying source error.
impl<F> std::error::Error for TryPushErr<F> {}
/// Proof that the poller has a free slot, produced by `NextVacancyFuture`.
///
/// It holds the poller's mutable borrow, so nothing else can use the poller
/// (and thereby claim the slot) until the vacancy is consumed or dropped.
pub struct Vacancy<'p, F: Future, M: SlotMemory<F>> {
/// Poll token copied from the poller.
token: PollThread,
/// The poller whose free slot this vacancy stands for.
poller: &'p mut SlotPoller<F, M>,
}
impl<'p, F, M> Vacancy<'p, F, M>
where
    F: Future,
    M: SlotMemory<F>,
{
    /// Consumes the vacancy and stores `future` in the free slot it stands for.
    ///
    /// # Panics
    ///
    /// Panics with "Missing empty slot" if the free-slot list is unexpectedly empty.
    pub fn insert(self, future: F) {
        let token = self.token;
        let EngineFields {
            activity,
            slots,
            shared_storage,
        } = self.poller.memory.fields();

        let popped = activity.empty_stack_pop(slots);
        let (slot_idx, slot) = popped.expect("Missing empty slot");

        let storage: &SharedStorage = shared_storage;
        let state_node = slot_idx.get_state_node(storage);
        // NOTE(review): `init_future`'s safety contract is defined outside this file;
        // the slot was just taken off the free list, `state_node` was looked up
        // with the same index.
        unsafe {
            slot.init_future(token, slot_idx, state_node, storage, future);
        }
    }
}
/// Future returned by `SlotPoller::next_vacancy`; resolves once a slot is free.
#[must_use = "futures do nothing unless polled"]
pub struct NextVacancyFuture<'p, F: Future, M: SlotMemory<F>> {
/// Poll token copied from the poller.
token: PollThread,
/// The borrowed poller; taken (left as `None`) when the future resolves so the
/// borrow can be moved into the returned `Vacancy`.
poller: Option<&'p mut SlotPoller<F, M>>,
}
/// Resolves to `(output, vacancy)`:
/// * `(None, vacancy)` right away when a slot is already free;
/// * `(Some(output), vacancy)` once one of the stored futures completes and its
///   slot has been released.
///
/// Panics with "Future polled after completion" if polled again after it has
/// returned `Poll::Ready`.
impl<'p, F, M> Future for NextVacancyFuture<'p, F, M>
where
F: Future,
M: SlotMemory<F>,
{
type Output = (Option<F::Output>, Vacancy<'p, F, M>);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let token = self.token;
let EngineFields {
activity,
mut slots,
shared_storage,
} = self
.poller
.as_mut()
.expect("Future polled after completion")
.memory
.fields();
// Fast path: a free slot already exists, no polling needed.
if activity.empty_stack_has_any() {
return Poll::Ready((
None,
Vacancy {
token,
// Move the poller borrow into the vacancy.
poller: self.poller.take().unwrap(),
},
));
}
// Otherwise poll queued futures until one completes. If the queue runs dry
// first, the result is `Poll::Pending`.
poll_loop!(
cx,
token,
activity,
slots,
shared_storage,
slot,
slot_idx,
state_node,
poll_res,
Poll::Pending, {
if let Poll::Ready((res, droppable)) = poll_res {
// Put the slot back on the free list, then drop the finished future.
activity.empty_stack_push(slot_idx, slot.as_mut());
droppable.drop_future(slot, state_node);
return Poll::Ready((
Some(res),
Vacancy {
token,
poller: self.poller.take().unwrap(),
},
));
}
}
)
}
}
/// Future returned by `SlotPoller::next_completion`; resolves to the output of
/// the next stored future that completes.
#[must_use = "futures do nothing unless polled"]
pub struct NextCompletionFuture<'p, F: Future, M: SlotMemory<F>> {
/// Poll token copied from the poller.
token: PollThread,
/// The borrowed poller.
poller: &'p mut SlotPoller<F, M>,
}
/// Polls queued futures until one completes, releases its slot and resolves
/// to its output. Yields `Poll::Pending` when the poll queue runs dry (or when
/// `poll_loop!` stops the pass early).
impl<'p, F, M> Future for NextCompletionFuture<'p, F, M>
where
F: Future,
M: SlotMemory<F>,
{
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let token = self.token;
let EngineFields {
activity,
mut slots,
shared_storage,
} = self.poller.memory.fields();
poll_loop!(
cx,
token,
activity,
slots,
shared_storage,
slot,
slot_idx,
state_node,
poll_res,
Poll::Pending, {
if let Poll::Ready((res, droppable)) = poll_res {
// Put the slot back on the free list, then drop the finished future.
activity.empty_stack_push(slot_idx, slot.as_mut());
droppable.drop_future(slot, state_node);
return Poll::Ready(res);
}
}
)
}
}
/// Wrapper that is `Unpin` no matter what it contains.
///
/// Used for the drain closures, which are only ever accessed through
/// `&mut` (via `get_mut()` on the pinned drain future) and never pinned themselves.
struct AssertUnpin<V>(V);
impl<V> Unpin for AssertUnpin<V> {}
/// Future returned by `SlotPoller::drain`; feeds every completed output to a closure.
#[must_use = "futures do nothing unless polled"]
pub struct DrainFuture<'p, F: Future, M: SlotMemory<F>, D: FnMut(F::Output)> {
/// Poll token copied from the poller.
token: PollThread,
/// The borrowed poller.
poller: &'p mut SlotPoller<F, M>,
/// Closure called with each output, wrapped so the future is `Unpin` regardless of `D`.
drain_function: AssertUnpin<D>,
}
/// Future returned by `SlotPoller::try_drain`; like `DrainFuture`, but the
/// closure is fallible and its first error ends the drain.
#[must_use = "futures do nothing unless polled"]
pub struct TryDrainFuture<'p, F: Future, M: SlotMemory<F>, D: FnMut(F::Output) -> Result<(), E>, E>
{
/// Poll token copied from the poller.
token: PollThread,
/// The borrowed poller.
poller: &'p mut SlotPoller<F, M>,
/// Fallible closure called with each output, wrapped so the future is `Unpin` regardless of `D`.
drain_function: AssertUnpin<D>,
}
/// Polls queued futures, passing each output to the drain closure, and
/// resolves to `()` once the poll queue is empty and no slot is active.
impl<'p, F, M, D> Future for DrainFuture<'p, F, M, D>
where
F: Future,
M: SlotMemory<F>,
D: FnMut(F::Output),
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let token = self.token;
// `get_mut` is available because every field of the future is `Unpin`
// (the closure through `AssertUnpin`).
let this = self.get_mut();
let EngineFields {
activity,
mut slots,
shared_storage,
} = this.poller.memory.fields();
let drain_function = &mut this.drain_function.0;
poll_loop!(
cx,
token,
activity,
slots,
shared_storage,
slot,
slot_idx,
state_node,
poll_res,
{
// Queue is empty: done only if no future is still pending in a slot.
if activity.slots_active == 0 {
Poll::Ready(())
} else {
Poll::Pending
}
},
{
if let Poll::Ready((res, droppable)) = poll_res {
// Release the slot and drop the finished future before handing the
// output to the closure; the loop then continues with the next entry.
activity.empty_stack_push(slot_idx, slot.as_mut());
droppable.drop_future(slot, state_node);
drain_function(res);
}
}
)
}
}
/// Polls queued futures, passing each output to the fallible drain closure.
///
/// Resolves to `Ok(())` once the poll queue is empty and no slot is active, or
/// to the first `Err` returned by the closure. On error this `poll` call
/// returns immediately without polling any further entries.
impl<'p, F, M, D, E> Future for TryDrainFuture<'p, F, M, D, E>
where
F: Future,
M: SlotMemory<F>,
D: FnMut(F::Output) -> Result<(), E>,
{
type Output = Result<(), E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let token = self.token;
// `get_mut` is available because every field of the future is `Unpin`
// (the closure through `AssertUnpin`).
let this = self.get_mut();
let EngineFields {
activity,
mut slots,
shared_storage,
} = this.poller.memory.fields();
let drain_function = &mut this.drain_function.0;
poll_loop!(
cx,
token,
activity,
slots,
shared_storage,
slot,
slot_idx,
state_node,
poll_res,
{
// Queue is empty: done only if no future is still pending in a slot.
if activity.slots_active == 0 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
},
{
if let Poll::Ready((res, droppable)) = poll_res {
// Release the slot and drop the finished future before calling the closure,
// so the slot is already free again if the closure reports an error.
activity.empty_stack_push(slot_idx, slot.as_mut());
droppable.drop_future(slot, state_node);
if let Err(e) = drain_function(res) {
return Poll::Ready(Err(e));
}
}
}
)
}
}