use crate::guard::{AnchorPollThread, PollThread, shared_storage_init, slot_init, state_node_iter};
use crate::polling::SlotStatus;
use crate::{EngineActivity, EngineFields, HeapSlots, Slot, StackSlots, StackSlotsRef, polling};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::mem::MaybeUninit;
use std::num::NonZeroU16;
use std::pin::Pin;
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
impl<const N: usize, F: Future> StackSlots<N, F> {
    /// Compile-time guard: evaluating this constant panics (and therefore
    /// fails the build) when `N == 0`.
    const AT_LEAST_ONE_SLOT: usize = N
        .checked_sub(1)
        .expect("StackSlots must have at least 1 slot");

    /// Compile-time guard: evaluating this constant panics (and therefore
    /// fails the build) when `N` exceeds `u16::MAX`. This is what keeps the
    /// `N as u16` cast in `new` lossless.
    const AT_MOST_U16_SLOTS: usize = (u16::MAX as usize)
        .checked_sub(N)
        .expect("StackSlots must have at most u16::MAX slots");

    /// Creates a collection whose `N` slots are stored inline in the value.
    ///
    /// `N` must satisfy `1 <= N <= u16::MAX`; any other value is rejected when
    /// this function is instantiated, through the two guard constants above.
    pub fn new() -> Self {
        // Referencing the associated constants forces them to be evaluated for
        // this particular `N`, which is what triggers the guards.
        let _ = Self::AT_LEAST_ONE_SLOT;
        let _ = Self::AT_MOST_U16_SLOTS;

        // Cannot truncate: `AT_MOST_U16_SLOTS` guarantees `N <= u16::MAX`.
        let shared_storage = shared_storage_init(N as u16);

        // SAFETY: `MaybeUninit<T>` has the same layout as `T`, so the storage
        // of `MaybeUninit<[Slot<F>; N]>` may be viewed as an array of
        // `MaybeUninit<Slot<F>>`, and that view requires no initialization.
        // `assume_init` relies on `slot_init` having initialized every
        // element (its definition is not visible here — TODO confirm).
        let slots = unsafe {
            let mut slots = MaybeUninit::<[Slot<F>; N]>::uninit();
            let slots_ptr = slots.as_mut_ptr() as *mut [MaybeUninit<Slot<F>>; N];
            slot_init(&mut *(slots_ptr));
            slots.assume_init()
        };

        Self {
            activity: EngineActivity::default(),
            slots,
            shared_storage,
        }
    }
}
impl<const N: usize, F: Future> Default for StackSlots<N, F> {
fn default() -> Self {
Self::new()
}
}
impl<F: Future> HeapSlots<F> {
    /// Creates a collection whose `capacity` slots live in a boxed slice.
    ///
    /// The allocation is kept as a raw `NonNull` pointer in `slots_ptr`; the
    /// `Drop` implementation for `HeapSlots` turns it back into a `Box` to
    /// release it.
    pub fn with_capacity(capacity: NonZeroU16) -> Self {
        let slot_count = capacity.get();
        let shared_storage = shared_storage_init(slot_count);

        // SAFETY: `assume_init` relies on `slot_init` having initialized every
        // element of the slice (its definition is not visible here — TODO
        // confirm). `Box::into_raw` never yields a null pointer, which makes
        // `NonNull::new_unchecked` valid.
        let slots_ptr = unsafe {
            let mut uninit_slots = Box::<[Slot<F>]>::new_uninit_slice(usize::from(slot_count));
            slot_init(&mut *uninit_slots);
            let raw_slots = Box::into_raw(uninit_slots.assume_init());
            NonNull::new_unchecked(raw_slots)
        };

        Self {
            activity: EngineActivity::default(),
            slots_ptr,
            shared_storage,
            _marker: PhantomData,
        }
    }
}
/// Slot storage that can be converted into a [`MemoryBacking`].
///
/// Visible to the parent module only. Implementors must also implement
/// `AnchorPollThread`.
pub(super) trait Memory<F>: AnchorPollThread {
/// The type produced by [`Memory::into_backing`]; it hands out the engine
/// fields and can be formatted for debugging.
type Backing: MemoryBacking<F> + Debug;
/// Consumes `self` and returns the corresponding backing value.
fn into_backing(self) -> Self::Backing;
}
/// Access to the fields of a slot collection as a single borrowed bundle.
pub trait MemoryBacking<F> {
/// Returns an `EngineFields` view that borrows from `self` for the duration
/// of the returned value's lifetime.
fn fields(&mut self) -> EngineFields<'_, F>;
}
impl<'pin, const N: usize, F: Future> Memory<F> for Pin<&'pin mut StackSlots<N, F>> {
type Backing = StackSlotsRef<'pin, F>;
fn into_backing(self) -> Self::Backing {
unsafe {
let this = self.get_unchecked_mut();
StackSlotsRef {
activity: &mut this.activity,
slots: Pin::new_unchecked(&mut this.slots),
shared_storage: &this.shared_storage,
}
}
}
}
impl<F: Future> MemoryBacking<F> for StackSlotsRef<'_, F> {
    /// Reborrows each held reference for the lifetime of `&mut self` and
    /// bundles them into an `EngineFields`.
    fn fields(&mut self) -> EngineFields<'_, F> {
        let activity = &mut *self.activity;
        let slots = self.slots.as_mut();
        let shared_storage = self.shared_storage;
        EngineFields {
            activity,
            slots,
            shared_storage,
        }
    }
}
impl<F: Future> Debug for StackSlotsRef<'_, F> {
    /// Formats the value as a struct named `StackSlotsRef` with its three
    /// fields, in declaration order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("StackSlotsRef");
        builder.field("activity", self.activity);
        builder.field("slots", &self.slots);
        builder.field("shared_storage", self.shared_storage);
        builder.finish()
    }
}
/// `HeapSlots` is used directly as its own backing.
impl<F: Future> Memory<F> for HeapSlots<F> {
type Backing = Self;
/// Returns `self` unchanged; no conversion is needed.
fn into_backing(self) -> Self::Backing {
self
}
}
impl<F: Future> MemoryBacking<F> for HeapSlots<F> {
    /// Borrows the activity counters, the slot slice behind `slots_ptr`, and
    /// the shared storage as one `EngineFields` bundle.
    fn fields(&mut self) -> EngineFields<'_, F> {
        // SAFETY: `slots_ptr` is assumed to always point at the live slot
        // slice allocated by `with_capacity` (verify no other constructor
        // exists). `&mut self` gives exclusive access for the returned
        // lifetime, and the slots are never moved out of their allocation
        // through this pinned view.
        let slots = unsafe { Pin::new_unchecked(&mut *self.slots_ptr.as_ptr()) };

        EngineFields {
            activity: &mut self.activity,
            slots,
            shared_storage: &self.shared_storage,
        }
    }
}
impl<F: Future> Debug for HeapSlots<F> {
    /// Formats the value as a struct named `HeapSlots`, showing the slot
    /// contents rather than the raw pointer.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // SAFETY: `slots_ptr` is assumed to point at the live slot slice
        // owned by this value (see `with_capacity`); only a shared view is
        // created here.
        let slots = unsafe { &*self.slots_ptr.as_ptr() };

        let mut builder = f.debug_struct("HeapSlots");
        builder.field("activity", &self.activity);
        builder.field("slots", &slots);
        builder.field("shared_storage", &self.shared_storage);
        builder.finish()
    }
}
impl<const N: usize, F: Future> Drop for StackSlots<N, F> {
    /// Drops every future still held in the slots.
    ///
    /// A panic raised while the futures are being dropped is not allowed to
    /// unwind past this function: a guard value converts it into a process
    /// abort.
    fn drop(&mut self) {
        /// Guard that is only ever dropped while unwinding out of
        /// `drop_futures`; on the normal path it is forgotten instead.
        struct AbortOnDrop;

        impl Drop for AbortOnDrop {
            // The `abort()` call after `panic!` is unreachable by
            // construction; the lint is silenced deliberately.
            #[allow(unreachable_code)]
            fn drop(&mut self) {
                panic!("Stackful future collection panicked on drop - aborting...");
                abort();
            }
        }

        // SAFETY: `self` is being destroyed and is not moved again; the
        // pinned handle is only used to reach the fields in place.
        let pinned = unsafe { Pin::new_unchecked(self) };
        let token = pinned.poll_thread();
        let mut backing: StackSlotsRef<_> = pinned.into();

        // Armed before the futures are dropped, disarmed right after.
        let guard = AbortOnDrop;
        backing.fields().drop_futures(token);
        mem::forget(guard);
    }
}
impl<F: Future> Drop for HeapSlots<F> {
    /// Drops every future still held in the slots, then releases the slot
    /// allocation itself.
    fn drop(&mut self) {
        let token = self.poll_thread();
        self.fields().drop_futures(token);

        // SAFETY: `slots_ptr` is assumed to come from `Box::into_raw` in
        // `with_capacity` (verify no other constructor exists), and it is not
        // used again after this point.
        drop(unsafe { Box::from_raw(self.slots_ptr.as_ptr()) });
    }
}
impl<F: Future> EngineFields<'_, F> {
    /// Calls `polling::call_drop` for every slot whose status is `Init`,
    /// `Waiting` or `Woken`; slots in either `Uninit*` state are skipped.
    ///
    /// Returns immediately when `activity.slots_active` is zero.
    fn drop_futures(&mut self, token: PollThread) {
        if self.activity.slots_active == 0 {
            return;
        }

        for (slot_idx, node) in state_node_iter(self.shared_storage) {
            let status = &node.status;
            // The match is kept exhaustive so that adding a `SlotStatus`
            // variant forces a decision here.
            match status.load(Ordering::Relaxed) {
                SlotStatus::Init | SlotStatus::Waiting | SlotStatus::Woken => {
                    let mut slot = slot_idx.get_slot(self.slots.as_mut());
                    // SAFETY: presumably `call_drop` requires the slot to be
                    // in one of the states matched by this arm — confirm
                    // against its documented contract.
                    unsafe {
                        polling::call_drop(token, slot.as_mut(), status);
                    }
                }
                SlotStatus::Uninit | SlotStatus::UninitButEnqueued => {}
            }
        }
    }
}