Skip to main content

nexus_async_rt/
task.rs

1//! Task storage: header + future/output union in a contiguous allocation.
2//!
3//! Each task is a `Task<F>` struct. The raw pointer to the allocation
4//! IS the task handle — no index layer, no separate metadata store.
5//!
6//! The waker holds the raw pointer directly. `wake()` sets `QUEUED`
7//! and pushes the pointer to the ready queue. Zero allocations.
8//!
9//! Tasks can be allocated via Box (default) or slab (power user).
10//! The `free_fn` in the header knows how to deallocate regardless
11//! of which allocator was used.
12//!
13//! ## Packed state word
14//!
15//! All task state (flags + refcount) is packed into a single `AtomicUsize`:
16//!
17//! ```text
18//! bits 0-5:   flags (COMPLETED, QUEUED, HAS_JOIN, ABORTED, OUTPUT_TAKEN, SLAB_ALLOCATED)
19//! bits 6+:    refcount (shifted by 6)
20//! ```
21//!
22//! This eliminates the SIGABRT race where `Executor::drop` reads
23//! `ref_count` and `is_completed` as separate atomics and a cross-thread
24//! waker can decrement the refcount between those reads.
25//!
26//! The state word naturally converges to `TERMINAL = COMPLETED = 1`
27//! when all refs are decremented and all transient flags are cleared.
28//! The free check is one comparison: `state == TERMINAL`.
29//!
30//! ## Union storage
31//!
32//! The slot at `storage_offset` holds either `F` (the future) or `T` (the output),
33//! never both. While running, `F` is live. When the future completes,
34//! `poll_join` drops `F` in place and writes `T` to the same bytes.
//! `drop_fn` is overwritten from `drop_future::<F>` (or `drop_future_in_union::<F>`) to `drop_output::<T>`
36//! so subsequent cleanup targets the correct type.
37//!
38//! ## `TaskRef` ownership rule
39//!
40//! [`TaskRef`] covers every refcount holder EXCEPT the executor's own
41//! `all_tasks` ref. That single ref uses [`complete_and_unref`] directly
42//! (atomic COMPLETED-set + decrement), bypassing TaskRef's Drop-time
43//! `dispose_terminal` routing. Wrapping `all_tasks` ownership in TaskRef
44//! would route terminal frees through `dispose_terminal` →
45//! `try_defer_free`, double-handling tasks the executor is already
46//! tracking. Subtle regression — don't do it.
47//!
48//! Everything else (local wakers, cross-thread wakers, channel slots,
49//! `JoinHandle`) IS a `TaskRef`.
50
51use std::cell::UnsafeCell;
52use std::future::Future;
53use std::marker::PhantomData;
54use std::pin::Pin;
55use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
56use std::task::{Context, Poll, Waker};
57
58use crate::cross_wake::CrossWakeContext;
59
60// =============================================================================
61// Packed state word — constants
62// =============================================================================
63
/// Set once the task is finished: its future returned `Ready` or it was aborted.
const COMPLETED: usize = 0b00_0001;
/// Set while the task sits in a ready queue (wake de-duplication flag).
const QUEUED: usize = 0b00_0010;
/// Set while a `JoinHandle` exists for this task.
const HAS_JOIN: usize = 0b00_0100;
/// Set when `abort()` was called.
const ABORTED: usize = 0b00_1000;
/// Set once the `JoinHandle` has consumed the output via poll.
const OUTPUT_TAKEN: usize = 0b01_0000;
/// Set at spawn for slab-allocated tasks; permanent.
const SLAB_ALLOCATED: usize = 0b10_0000;
/// One reference count unit (bit 6), the first bit above the flags.
const REF_ONE: usize = 1 << 6;
/// Mask for all flag bits (0-5): everything below `REF_ONE`.
const FLAG_MASK: usize = REF_ONE - 1;
/// Mask for the refcount bits (6 and up).
const REF_MASK: usize = !FLAG_MASK;

/// Lifecycle flags: a task cannot reach terminal while any of these is set.
/// - `QUEUED`: someone still has to pop this task from a queue.
/// - `HAS_JOIN`: a `JoinHandle` still exists and must be dropped.
const LIFECYCLE_MASK: usize = HAS_JOIN | QUEUED;

/// Inert flags: permanent metadata or history; they never block terminal.
/// - `SLAB_ALLOCATED`: permanent, set at spawn.
/// - `ABORTED`: historical, set on abort; no cleanup waits on clearing it.
/// - `OUTPUT_TAKEN`: historical, set when the output was read; same.
const INERT_MASK: usize = ABORTED | OUTPUT_TAKEN | SLAB_ALLOCATED;
93
/// Outcome of `ref_dec` / `complete_and_unref`: tells the caller whether
/// the decrement produced a terminal state and, if so, which kind of
/// allocation has to be released.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum FreeAction {
    /// Not terminal: refs are still outstanding or a lifecycle flag is
    /// still set. Nothing to do.
    Retain,
    /// Terminal, Box-allocated. Free from any thread via free_task.
    FreeBox,
    /// Terminal, slab-allocated. Route to the executor thread for the
    /// slab free.
    FreeSlab,
}
104
105// =============================================================================
106// TaskRef — RAII smart pointer pairing ref_inc with ref_dec
107// =============================================================================
108
/// Owning, refcounted handle to a task allocation.
///
/// Holding a `TaskRef` means holding one refcount unit: construction via
/// `acquire` performs the `ref_inc`, and `Drop` performs the matching
/// `ref_dec`, forwarding terminal results to `dispose_terminal`.
///
/// Every refcount holder is a `TaskRef` — local wakers, cross-thread
/// wakers, channel slots, `JoinHandle` — with one exception: the
/// executor's `all_tasks` ref (see the module-level doc-block).
///
/// # Invariants
///
/// - One `TaskRef` == exactly one refcount unit on the underlying task.
/// - `Drop` decrements; on a terminal result the task goes through
///   `crate::cross_wake::dispose_terminal` (deferred via `try_defer_free`
///   on the owning executor thread or for null-ctx test tasks; pushed
///   onto the cross-wake queue when dropped off-thread).
pub(crate) struct TaskRef {
    ptr: *mut u8,
}
128
129impl TaskRef {
130    /// Acquire a reference. Increments the task's refcount.
131    ///
132    /// # Safety
133    ///
134    /// `ptr` must point to a live task with refcount >= 1 at the time of call.
135    #[inline]
136    pub(crate) unsafe fn acquire(ptr: *mut u8) -> Self {
137        unsafe { ref_inc(ptr) };
138        Self { ptr }
139    }
140
141    /// Wrap a pre-incremented pointer (no `ref_inc` here).
142    ///
143    /// Use when the caller has already accounted for the ref (e.g., on
144    /// the boundary of a vtable handoff like `RawWaker::data` →
145    /// `wake_fn` consuming the ref).
146    ///
147    /// # Safety
148    ///
149    /// `ptr` owns one ref. The caller must not also drop it.
150    #[inline]
151    pub(crate) unsafe fn from_owned(ptr: *mut u8) -> Self {
152        Self { ptr }
153    }
154
155    /// The raw task pointer this handle holds.
156    #[inline]
157    pub(crate) fn as_ptr(&self) -> *mut u8 {
158        self.ptr
159    }
160}
161
162impl Drop for TaskRef {
163    #[inline]
164    fn drop(&mut self) {
165        match unsafe { ref_dec(self.ptr) } {
166            FreeAction::Retain => {}
167            FreeAction::FreeBox | FreeAction::FreeSlab => {
168                // SAFETY: terminal state — ref just dropped to 0 with
169                // all lifecycle flags clear. dispose_terminal routes
170                // per the task's header context and current thread:
171                //
172                //   - On-thread (or null-ctx test task): try_defer_free
173                //     pushes to DEFERRED_FREE TLS if a poll cycle is
174                //     active; otherwise the slot leaks until
175                //     Executor::drop reclaims via its all_tasks scan.
176                //   - Off-thread: queues via the cross-wake queue +
177                //     conditional eventfd poke.
178                //
179                // Direct-free is never used — would race
180                // Executor::all_tasks bookkeeping. See dispose_terminal's
181                // doc-comment in `cross_wake.rs` for the full rationale.
182                unsafe { crate::cross_wake::dispose_terminal(self.ptr) };
183            }
184        }
185    }
186}
187
// SAFETY: TaskRef is nothing more than a raw pointer plus the refcount
// discipline around it. The task allocation it points at is Send-safe:
// its state word is atomic and the header has no thread-affine fields.
// Cross-thread holders (tokio_compat, channel slots) keep a TaskRef on
// other threads, so the type has to be Send.
unsafe impl Send for TaskRef {}
// Deliberately not Sync — only the holder may drop. A clone of a TaskRef
// requires its own ref_inc; sharing through &TaskRef would allow two
// holders to ref_dec the same logical ref.
196
197// =============================================================================
198// Task layout
199// =============================================================================
200
/// Size of the task header in bytes, i.e. everything in `Task<F>` that
/// precedes the `storage` field. Must stay in sync with that layout; a
/// compile-time assertion in this file checks it against
/// `size_of::<Task<()>>()`.
pub const TASK_HEADER_SIZE: usize = 72;
204
/// Task header followed by its storage, in one contiguous allocation.
/// `repr(C)` makes the field order (and therefore every header offset
/// used by the raw-pointer accessors in this file) deterministic.
///
/// `S` is the storage type — either just `F` (fire-and-forget) or a union
/// of `F` and `T` (joinable). The header is always 72 bytes regardless of `S`.
///
/// Layout (64-bit):
/// ```text
/// offset  0: poll_fn        (8B, fn pointer — polls the future)
/// offset  8: drop_fn        (8B, fn pointer — drops F or T in place)
/// offset 16: free_fn        (8B, fn pointer — deallocates the task storage)
/// offset 24: state          (8B, AtomicUsize — packed flags + refcount)
/// offset 32: cross_next     (8B, AtomicPtr — intrusive cross-thread wake queue)
/// offset 40: join_waker     (16B, UnsafeCell<Option<Waker>>)
/// offset 56: storage_offset (2B, u16 — byte offset to storage field)
/// offset 58: _pad           (2B)
/// offset 60: tracker_key    (4B, u32 — index in Executor::all_tasks slab)
/// offset 64: cross_wake_ctx (8B, *const CrossWakeContext — cold; read by dispose_terminal)
/// offset 72: storage        (S bytes — future F or union { F, T })
/// ```
///
/// The accessor functions in this file read these fields through the
/// literal byte offsets above, so reordering or resizing a field means
/// updating those accessors as well.
///
/// `cross_wake_ctx` lives at the end of the header because it's only
/// touched on terminal Drop (cold path); hot-path reads (state, drop_fn,
/// poll_fn, free_fn) stay near the cache-line head.
#[repr(C)]
pub(crate) struct Task<S> {
    /// Polls the future. Receives the task base pointer.
    poll_fn: unsafe fn(*mut u8, &mut Context<'_>) -> Poll<()>,
    /// Drops the value at `storage_offset` (future F or output T). Receives base pointer.
    drop_fn: unsafe fn(*mut u8),
    /// Deallocates the task storage.
    free_fn: unsafe fn(*mut u8),
    /// Packed state word: flags (bits 0-5) + refcount (bits 6+).
    state: AtomicUsize,
    /// Intrusive next pointer for the cross-thread wake queue.
    cross_next: AtomicPtr<u8>,
    /// Waker for the task awaiting this JoinHandle.
    /// Written by `set_join_waker`, removed by `take_join_waker`.
    join_waker: UnsafeCell<Option<Waker>>,
    /// Byte offset from task base to the storage field.
    /// Set at construction from `offset_of!(Task<S>, storage)`.
    storage_offset: u16,
    /// Padding for alignment.
    _pad: [u8; 2],
    /// Index into the Executor's `all_tasks` slab.
    tracker_key: u32,
    /// Pointer to the runtime's [`CrossWakeContext`] (Arc-backed, heap-stable).
    /// Set at spawn time. Read by `dispose_terminal` on terminal Drop to
    /// route the task through the owning executor (defer locally) or its
    /// cross-thread queue (off-thread). Null for tasks not associated
    /// with any runtime (test-only `Task::new_boxed` path).
    cross_wake_ctx: *const CrossWakeContext,
    /// The future `F`, or for joinable tasks the `FutureOrOutput<F, T>` union.
    storage: S,
}
258
/// Union storage for joinable tasks. Sized to fit both the future F
/// and the output T in the same allocation.
///
/// Both fields are `ManuallyDrop`, so nothing is dropped automatically:
/// whichever variant is live is dropped through the task header's
/// `drop_fn`.
#[repr(C)]
pub(crate) union FutureOrOutput<F, T> {
    /// The future, live while the task is still running.
    pub(crate) future: std::mem::ManuallyDrop<F>,
    /// The output, live after the future completed.
    pub(crate) output: std::mem::ManuallyDrop<T>,
}
266
267// Static assertion: header layout matches TASK_HEADER_SIZE.
268const _: () = {
269    assert!(std::mem::size_of::<Task<()>>() == TASK_HEADER_SIZE);
270};
271
272impl<F: Future<Output = ()> + 'static> Task<F> {
273    /// Construct a fire-and-forget task (no JoinHandle) with Box-based free.
274    ///
275    /// Used internally for tests and low-level task construction.
276    /// `ref_count = 1` (executor only), `HAS_JOIN` not set.
277    /// `cross_wake_ctx` is null — test tasks aren't registered with any
278    /// runtime. Terminal frees go through `dispose_terminal`'s on-thread
279    /// defer path (`try_defer_free` if a poll cycle is active, otherwise
280    /// leak until `Executor::drop`'s `all_tasks` scan reclaims them).
281    /// Direct-free is unsafe even for null-ctx tasks because
282    /// `dispose_terminal` doesn't own `all_tasks` bookkeeping — see
283    /// `dispose_terminal`'s doc-comment in `cross_wake.rs` for the full
284    /// rationale.
285    ///
286    /// # Why `Output = ()` is required
287    ///
288    /// This uses `poll_join::<F>` which writes T at the storage offset
289    /// after dropping F. The storage is `F` (not `FutureOrOutput<F, T>`),
290    /// so it's only sized for F. With `T = ()` (ZST), the write is
291    /// zero-size and the `drop_fn` overwrite to `drop_output::<()>` is a
292    /// no-op. Relaxing this bound to non-ZST T would write T into
293    /// storage not sized for it — UB.
294    #[cfg(test)]
295    #[inline]
296    pub(crate) fn new_boxed(future: F, tracker_key: u32) -> Self {
297        Self {
298            poll_fn: poll_join::<F>,
299            drop_fn: drop_future::<F>,
300            free_fn: box_free::<F>,
301            state: AtomicUsize::new(REF_ONE),
302            cross_next: AtomicPtr::new(std::ptr::null_mut()),
303            join_waker: UnsafeCell::new(None),
304            storage_offset: std::mem::offset_of!(Task<F>, storage) as u16,
305            tracker_key,
306            _pad: [0; 2],
307            cross_wake_ctx: std::ptr::null(),
308            storage: future,
309        }
310    }
311}
312
313/// Allocate a joinable Box task and return the raw pointer.
314///
315/// The task has `ref_count = 2` (executor + JoinHandle) and `HAS_JOIN` set.
316/// The allocation is sized for `max(size_of::<F>(), size_of::<T>())` via
317/// the `FutureOrOutput<F, T>` union.
318///
319/// `cross_wake_ctx` should be `Arc::as_ptr(&runtime.cross_wake)` for
320/// real spawns (Arc-backed, heap-stable), or `std::ptr::null()` for
321/// tasks not associated with any runtime (test paths only). Read by
322/// `dispose_terminal` on terminal Drop.
323pub(crate) fn box_spawn_joinable<F>(
324    future: F,
325    tracker_key: u32,
326    cross_wake_ctx: *const CrossWakeContext,
327) -> *mut u8
328where
329    F: Future + 'static,
330    F::Output: 'static,
331{
332    type Storage<F> = FutureOrOutput<F, <F as Future>::Output>;
333
334    let task: Task<Storage<F>> = Task {
335        poll_fn: poll_join::<F>,
336        drop_fn: drop_future_in_union::<F>,
337        free_fn: box_free::<Storage<F>>,
338        state: AtomicUsize::new(HAS_JOIN | (2 * REF_ONE)),
339        cross_next: AtomicPtr::new(std::ptr::null_mut()),
340        join_waker: UnsafeCell::new(None),
341        storage_offset: std::mem::offset_of!(Task<Storage<F>>, storage) as u16,
342        tracker_key,
343        _pad: [0; 2],
344        cross_wake_ctx,
345        storage: FutureOrOutput {
346            future: std::mem::ManuallyDrop::new(future),
347        },
348    };
349    Box::into_raw(Box::new(task)) as *mut u8
350}
351
352/// Construct a joinable task for slab allocation.
353///
354/// Returns the task struct to be copied into a slab slot. Uses the
355/// `FutureOrOutput<F, T>` union so the allocation fits both.
356///
357/// See `box_spawn_joinable` for the `cross_wake_ctx` contract.
358pub(crate) fn new_joinable_slab<F>(
359    future: F,
360    tracker_key: u32,
361    free_fn: unsafe fn(*mut u8),
362    cross_wake_ctx: *const CrossWakeContext,
363) -> Task<FutureOrOutput<F, F::Output>>
364where
365    F: Future + 'static,
366    F::Output: 'static,
367{
368    Task {
369        poll_fn: poll_join::<F>,
370        drop_fn: drop_future_in_union::<F>,
371        free_fn,
372        state: AtomicUsize::new(HAS_JOIN | SLAB_ALLOCATED | (2 * REF_ONE)),
373        cross_next: AtomicPtr::new(std::ptr::null_mut()),
374        join_waker: UnsafeCell::new(None),
375        storage_offset: std::mem::offset_of!(Task<FutureOrOutput<F, F::Output>>, storage) as u16,
376        tracker_key,
377        _pad: [0; 2],
378        cross_wake_ctx,
379        storage: FutureOrOutput {
380            future: std::mem::ManuallyDrop::new(future),
381        },
382    }
383}
384
385// =============================================================================
386// Task handle — raw pointer operations
387// =============================================================================
388
/// Opaque task identifier. Wraps the raw pointer to the task.
/// The pointer is stable for the task's lifetime.
///
/// Equality and hashing are derived, so they compare the pointer value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct TaskId(pub(crate) *mut u8);
393
394// =============================================================================
395// JoinHandle
396// =============================================================================
397
/// Handle to a spawned task. Await to get the result.
///
/// Dropping the handle detaches the task — it continues running but the
/// output is dropped when the task completes. Use [`abort()`](Self::abort)
/// to cancel the task.
///
/// `JoinHandle` is `!Send` and `!Sync` — it must stay on the executor thread.
#[must_use = "dropping a JoinHandle detaches the task — await it or call .abort()"]
pub struct JoinHandle<T> {
    /// Raw pointer to the task allocation; the handle owns one ref on it.
    ptr: *mut u8,
    /// Ties the handle to the task's output type `T`.
    _marker: PhantomData<T>,
    /// A raw-pointer marker keeps the type off other threads.
    _not_send: PhantomData<*const ()>, // !Send + !Sync
}
411
412impl<T: 'static> Future for JoinHandle<T> {
413    type Output = T;
414
415    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
416        let ptr = self.ptr;
417
418        // SAFETY: ptr is valid — JoinHandle holds a ref (refcount >= 1).
419        if unsafe { is_completed(ptr) } {
420            let s = unsafe { state_load(ptr) };
421            assert!(s & ABORTED == 0, "polled JoinHandle after task was aborted");
422            // SAFETY: Task completed, so poll_join already transitioned the union
423            // from F to T. The output is live at storage_offset. ptr::read moves
424            // it out (bitwise copy). OUTPUT_TAKEN prevents double-read.
425            let output_ptr = unsafe { ptr.add(storage_offset(ptr)) };
426            let value = unsafe { std::ptr::read(output_ptr.cast::<T>()) };
427            unsafe { set_output_taken(ptr) };
428            Poll::Ready(value)
429        } else {
430            // SAFETY: Task still running, single-threaded — safe to write waker.
431            unsafe { set_join_waker(ptr, cx.waker().clone()) };
432            Poll::Pending
433        }
434    }
435}
436
437impl<T> JoinHandle<T> {
438    pub(crate) fn new(ptr: *mut u8) -> Self {
439        Self {
440            ptr,
441            _marker: PhantomData,
442            _not_send: PhantomData,
443        }
444    }
445
446    /// Returns `true` if the task has completed (output is ready).
447    pub fn is_finished(&self) -> bool {
448        unsafe { is_completed(self.ptr) }
449    }
450
451    /// Abort the task and consume the handle.
452    ///
453    /// The future is dropped on the next poll cycle. Consumes the handle
454    /// so it cannot be awaited after abort — this is enforced at the type
455    /// level rather than via a runtime panic.
456    ///
457    /// Returns `true` if the task was still running, `false` if it had
458    /// already completed (output is dropped by `JoinHandle::drop`).
459    #[must_use = "returns whether the task was still running"]
460    pub fn abort(self) -> bool {
461        let ptr = self.ptr;
462        let was_running = !unsafe { is_completed(ptr) };
463        if was_running {
464            unsafe { set_aborted(ptr) };
465        }
466        // self is consumed — Drop runs, which clears HAS_JOIN,
467        // takes the join waker, and decrements refcount.
468        was_running
469    }
470
471    /// Test-only raw pointer accessor for white-box scenarios that
472    /// need to drive the underlying task allocation directly (e.g.,
473    /// the cross_wake UAF regression test in PR 2). Avoids the
474    /// `repr(Rust)` layout assumption that `mem::transmute_copy`
475    /// would otherwise rely on.
476    #[cfg(test)]
477    pub(crate) fn raw_ptr(&self) -> *mut u8 {
478        self.ptr
479    }
480}
481
482impl<T> Drop for JoinHandle<T> {
483    fn drop(&mut self) {
484        let ptr = self.ptr;
485        // SAFETY: ptr is valid — JoinHandle holds a ref (refcount >= 1).
486        let s = unsafe { state_load(ptr) };
487
488        if (s & COMPLETED != 0) && (s & OUTPUT_TAKEN == 0) && (s & ABORTED == 0) {
489            // Task completed but output was never read — drop it.
490            // SAFETY: poll_join overwrote drop_fn to drop_output::<T>,
491            // so this drops the output T (not the future F).
492            unsafe { drop_task_future(ptr) };
493        }
494
495        // Clear HAS_JOIN so complete_task knows nobody is waiting.
496        // Take the join waker to release the parent task's refcount.
497        unsafe { clear_has_join(ptr) };
498        let _ = unsafe { take_join_waker(ptr) };
499
500        // Release our reference via TaskRef. Drop routes terminal state
501        // through dispose_terminal — defers via DEFERRED_FREE TLS on the
502        // executor thread (so all_tasks bookkeeping stays consistent),
503        // queues cross-thread otherwise, frees directly for null-ctx
504        // (test) tasks. JoinHandle is !Send so we're always on the
505        // executor thread here, but the routing handles all cases.
506        // SAFETY: JoinHandle owned exactly one ref on `ptr`; we hand
507        // it off to TaskRef which will ref_dec on Drop.
508        drop(unsafe { TaskRef::from_owned(ptr) });
509    }
510}
511
512// =============================================================================
513// Packed state accessor functions
514// =============================================================================
515
/// Loads the packed state word (flags + refcount) with `Acquire` ordering.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`.
#[inline]
unsafe fn state_load(ptr: *mut u8) -> usize {
    // SAFETY: in the repr(C) Task header the AtomicUsize state word
    // lives at byte offset 24.
    let state: &AtomicUsize = unsafe { &*ptr.add(24).cast::<AtomicUsize>() };
    state.load(Ordering::Acquire)
}
526
/// Borrows the packed state atomic from the task header.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`. The `'static` lifetime on the
/// result is not real: the reference must not be used after the task
/// is gone.
#[inline]
unsafe fn state_ref(ptr: *mut u8) -> &'static AtomicUsize {
    // SAFETY: in the repr(C) Task header the AtomicUsize state word
    // lives at byte offset 24.
    let state = unsafe { ptr.add(24) }.cast::<AtomicUsize>();
    unsafe { &*state }
}
538
/// Returns the `tracker_key` stored in the task header.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`.
#[inline]
pub(crate) unsafe fn tracker_key(ptr: *mut u8) -> u32 {
    // SAFETY: tracker_key is the u32 at byte offset 60 of the repr(C)
    // Task header.
    let field = unsafe { ptr.add(60) }.cast::<u32>();
    unsafe { field.read() }
}
549
550/// Increment the waker refcount. Called on waker clone.
551///
552/// # Safety
553///
554/// `ptr` must point to a live `Task<F>`.
555#[inline]
556pub(crate) unsafe fn ref_inc(ptr: *mut u8) {
557    let state = unsafe { state_ref(ptr) };
558    let prev = state.fetch_add(REF_ONE, Ordering::Relaxed);
559    debug_assert!((prev & REF_MASK) > 0, "ref_inc on zero refcount");
560}
561
562/// Decrement the refcount. Returns `FreeAction` indicating whether
563/// a terminal state was produced and what kind of allocation it is.
564///
565/// # Safety
566///
567/// `ptr` must point to a live (or completed) `Task<F>`.
568#[inline]
569pub(crate) unsafe fn ref_dec(ptr: *mut u8) -> FreeAction {
570    let state = unsafe { state_ref(ptr) };
571    let prev = state.fetch_sub(REF_ONE, Ordering::AcqRel);
572    debug_assert!((prev & REF_MASK) >= REF_ONE, "ref_dec on zero refcount");
573
574    // Was this the last ref?
575    if (prev & REF_MASK) != REF_ONE {
576        return FreeAction::Retain;
577    }
578
579    // Last ref. Check: COMPLETED must be set, lifecycle flags must be clear.
580    // ABORTED, OUTPUT_TAKEN, SLAB_ALLOCATED are inert — don't block terminal.
581    let flags = prev & FLAG_MASK;
582    if (flags & COMPLETED == 0) || (flags & LIFECYCLE_MASK != 0) {
583        return FreeAction::Retain;
584    }
585    if flags & SLAB_ALLOCATED != 0 {
586        FreeAction::FreeSlab
587    } else {
588        FreeAction::FreeBox
589    }
590}
591
592/// Read the refcount.
593///
594/// # Safety
595///
596/// `ptr` must point to a live `Task<F>`.
597#[inline]
598pub(crate) unsafe fn ref_count(ptr: *mut u8) -> usize {
599    (unsafe { state_load(ptr) } & REF_MASK) >> 6
600}
601
602/// Atomically set COMPLETED and decrement the executor's reference.
603/// Returns `FreeAction` indicating whether a terminal state was produced.
604///
605/// This is the key atomic operation that eliminates the race between
606/// `set_completed` and `ref_dec` that caused the SIGABRT.
607///
608/// # Safety
609///
610/// `ptr` must point to a live, not-yet-completed `Task<F>`.
611#[inline]
612pub(crate) unsafe fn complete_and_unref(ptr: *mut u8) -> FreeAction {
613    let state = unsafe { state_ref(ptr) };
614    // Atomically: set COMPLETED (add 1 to bit 0) + dec refcount (sub REF_ONE)
615    // Net subtraction = REF_ONE - COMPLETED.
616    let prev = state.fetch_sub(REF_ONE - COMPLETED, Ordering::AcqRel);
617    debug_assert!(prev & COMPLETED == 0, "double complete");
618    debug_assert!(
619        (prev & REF_MASK) >= REF_ONE,
620        "complete_and_unref on zero refcount"
621    );
622    // prev had COMPLETED=0. Last ref if prev had exactly REF_ONE.
623    // Lifecycle flags (QUEUED, HAS_JOIN) must be clear.
624    // Inert flags (ABORTED, OUTPUT_TAKEN, SLAB_ALLOCATED) don't matter.
625    if (prev & REF_MASK) != REF_ONE {
626        return FreeAction::Retain;
627    }
628    let flags = prev & FLAG_MASK;
629    if flags & LIFECYCLE_MASK != 0 {
630        return FreeAction::Retain;
631    }
632    if flags & SLAB_ALLOCATED != 0 {
633        FreeAction::FreeSlab
634    } else {
635        FreeAction::FreeBox
636    }
637}
638
639/// Check if the state is TERMINAL (safe to free).
640///
641/// # Safety
642///
643/// `ptr` must point to a live `Task<F>`.
644#[inline]
645pub(crate) unsafe fn is_terminal(ptr: *mut u8) -> bool {
646    let s = unsafe { state_load(ptr) };
647    // Strip inert flags (SLAB_ALLOCATED, ABORTED, OUTPUT_TAKEN).
648    // What remains must be exactly COMPLETED with zero refcount.
649    (s & !INERT_MASK) == COMPLETED
650}
651
652/// Read the is_completed flag.
653///
654/// # Safety
655///
656/// `ptr` must point to a (possibly completed) `Task<F>`.
657#[inline]
658pub(crate) unsafe fn is_completed(ptr: *mut u8) -> bool {
659    (unsafe { state_load(ptr) }) & COMPLETED != 0
660}
661
662/// Read the SLAB_ALLOCATED flag.
663///
664/// Used by `Executor::drop` to differentiate cleanup behavior between
665/// slab- and box-allocated tasks during unwinding.
666///
667/// # Safety
668///
669/// `ptr` must point to a live `Task<F>`.
670#[inline]
671pub(crate) unsafe fn is_slab_allocated(ptr: *mut u8) -> bool {
672    (unsafe { state_load(ptr) }) & SLAB_ALLOCATED != 0
673}
674
675/// Read the is_queued flag.
676///
677/// # Safety
678///
679/// `ptr` must point to a live `Task<F>`.
680#[inline]
681pub(crate) unsafe fn is_queued(ptr: *mut u8) -> bool {
682    (unsafe { state_load(ptr) }) & QUEUED != 0
683}
684
685/// Set the `is_queued` flag.
686///
687/// # Safety
688///
689/// `ptr` must point to a live `Task<F>`.
690#[inline]
691pub(crate) unsafe fn set_queued(ptr: *mut u8, queued: bool) {
692    let state = unsafe { state_ref(ptr) };
693    if queued {
694        state.fetch_or(QUEUED, Ordering::Release);
695    } else {
696        state.fetch_and(!QUEUED, Ordering::Release);
697    }
698}
699
700/// Atomically try to set QUEUED from false to true. Returns true if
701/// successful (was not queued). Used by cross-thread wakers.
702///
703/// # Safety
704///
705/// `ptr` must point to a live `Task<F>`.
706#[inline]
707pub(crate) unsafe fn try_set_queued(ptr: *mut u8) -> bool {
708    let state = unsafe { state_ref(ptr) };
709    // fetch_or always sets the bit. Check if it was already set.
710    let prev = state.fetch_or(QUEUED, Ordering::AcqRel);
711    (prev & QUEUED) == 0
712}
713
714/// Clear the QUEUED flag.
715///
716/// # Safety
717///
718/// `ptr` must point to a live `Task<F>`.
719#[inline]
720pub(crate) unsafe fn clear_queued(ptr: *mut u8) {
721    let state = unsafe { state_ref(ptr) };
722    state.fetch_and(!QUEUED, Ordering::Release);
723}
724
725/// Check if ABORTED flag is set.
726///
727/// # Safety
728///
729/// `ptr` must point to a live `Task<F>`.
730#[inline]
731pub(crate) unsafe fn is_aborted(ptr: *mut u8) -> bool {
732    (unsafe { state_load(ptr) }) & ABORTED != 0
733}
734
735/// Set the ABORTED flag.
736///
737/// # Safety
738///
739/// `ptr` must point to a live `Task<F>`.
740#[inline]
741pub(crate) unsafe fn set_aborted(ptr: *mut u8) {
742    let state = unsafe { state_ref(ptr) };
743    state.fetch_or(ABORTED, Ordering::Release);
744}
745
746/// Check if HAS_JOIN flag is set.
747///
748/// # Safety
749///
750/// `ptr` must point to a live `Task<F>`.
751#[inline]
752pub(crate) unsafe fn has_join(ptr: *mut u8) -> bool {
753    (unsafe { state_load(ptr) }) & HAS_JOIN != 0
754}
755
756/// Clear the HAS_JOIN flag.
757///
758/// # Safety
759///
760/// `ptr` must point to a live `Task<F>`.
761#[inline]
762pub(crate) unsafe fn clear_has_join(ptr: *mut u8) {
763    let state = unsafe { state_ref(ptr) };
764    state.fetch_and(!HAS_JOIN, Ordering::Release);
765}
766
767/// Set the OUTPUT_TAKEN flag.
768///
769/// # Safety
770///
771/// `ptr` must point to a live, completed `Task<F>`. Single-threaded.
772#[inline]
773unsafe fn set_output_taken(ptr: *mut u8) {
774    let state = unsafe { state_ref(ptr) };
775    state.fetch_or(OUTPUT_TAKEN, Ordering::Release);
776}
777
/// Returns a raw pointer to the header's `cross_next` atomic pointer.
/// Only the address is computed; nothing is read.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`.
#[inline]
pub(crate) unsafe fn cross_next(ptr: *mut u8) -> *const AtomicPtr<u8> {
    // SAFETY: cross_next sits at byte offset 32 of the repr(C) Task
    // header, so the offset stays inside the task allocation.
    let field = unsafe { ptr.add(32) };
    field.cast::<AtomicPtr<u8>>()
}
788
/// Returns the byte offset from the task base to its storage, as
/// recorded in the task header.
///
/// # Safety
///
/// `ptr` must point to a live `Task<S>`.
#[inline]
pub(crate) unsafe fn storage_offset(ptr: *mut u8) -> usize {
    // SAFETY: storage_offset is the u16 at byte offset 56 of the
    // repr(C) Task header.
    let raw: u16 = unsafe { ptr.add(56).cast::<u16>().read() };
    usize::from(raw)
}
799
800/// Read the `cross_wake_ctx` pointer from the task header.
801///
802/// Returns the runtime's [`CrossWakeContext`] pointer (Arc-backed,
803/// heap-stable) set at spawn time, or null for tasks not associated
804/// with any runtime (test path).
805///
806/// # Safety
807///
808/// `ptr` must point to a live `Task<S>` (header still valid).
809#[inline]
810pub(crate) unsafe fn header_cross_wake_ctx(ptr: *mut u8) -> *const CrossWakeContext {
811    // SAFETY: `cross_wake_ctx` is `*const CrossWakeContext` at offset 64
812    // in `repr(C) Task`. The field is initialized exactly once at spawn
813    // time under the spawning thread's exclusive ownership, then made
814    // visible to other threads via Arc/refcount publication (any TaskRef
815    // holder transitively keeps the owning runtime's Arc alive, and the
816    // Arc's atomic-counter publication establishes happens-before for the
817    // read). Immutable after init — concurrent reads from any thread are
818    // sound. dispose_terminal explicitly reads this from foreign threads
819    // when routing terminal drops from cross-thread waker holders.
820    unsafe { *(ptr.add(64).cast::<*const CrossWakeContext>()) }
821}
822
/// Record the waker that the JoinHandle awaiter wants notified.
///
/// Any waker already held in the slot is overwritten (and dropped).
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`. Single-threaded access only.
#[inline]
unsafe fn set_join_waker(ptr: *mut u8, waker: Waker) {
    /// Byte offset of `join_waker` inside the `repr(C)` task header.
    const JOIN_WAKER_OFFSET: usize = 40;

    // SAFETY: `join_waker` is an `UnsafeCell<Option<Waker>>` at this offset
    // and the caller guarantees a live task.
    let cell = unsafe { &*ptr.add(JOIN_WAKER_OFFSET).cast::<UnsafeCell<Option<Waker>>>() };
    // SAFETY: single-threaded access (caller contract) means no other
    // reference into the cell is active while we write.
    let slot: &mut Option<Waker> = unsafe { &mut *cell.get() };
    *slot = Some(waker);
}
834
/// Remove and return the stored join waker, leaving the slot empty.
///
/// Returns `None` when no waker is currently stored.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`. Single-threaded access only.
#[inline]
pub(crate) unsafe fn take_join_waker(ptr: *mut u8) -> Option<Waker> {
    /// Byte offset of `join_waker` inside the `repr(C)` task header.
    const JOIN_WAKER_OFFSET: usize = 40;

    // SAFETY: `join_waker` is an `UnsafeCell<Option<Waker>>` at this offset
    // and the caller guarantees a live task.
    let cell = unsafe { &*ptr.add(JOIN_WAKER_OFFSET).cast::<UnsafeCell<Option<Waker>>>() };
    // SAFETY: single-threaded access (caller contract), so this is the only
    // reference into the cell.
    let slot = unsafe { &mut *cell.get() };
    std::mem::take(slot)
}
845
/// Drive the task's future one step through its type-erased poll trampoline.
///
/// The trampoline stored in the header is invoked with the task base pointer
/// and `cx`; whatever it returns is returned unchanged.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`.
/// The future must not have been dropped.
#[inline]
pub(crate) unsafe fn poll_task(ptr: *mut u8, cx: &mut Context<'_>) -> Poll<()> {
    /// Signature of the type-erased poll trampoline.
    type PollFn = unsafe fn(*mut u8, &mut Context<'_>) -> Poll<()>;

    // SAFETY: `poll_fn` is the first field (offset 0) of the `repr(C)` task.
    let trampoline: PollFn = unsafe { ptr.cast::<PollFn>().read() };
    // The trampoline receives the base pointer, not the storage pointer: it
    // looks up `storage_offset` itself.
    unsafe { trampoline(ptr, cx) }
}
860
/// Run the task's current drop trampoline on its storage slot.
///
/// Depending on which trampoline is installed in the header, this drops the
/// future or the output held in the slot.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>`. Must only be called once.
#[inline]
pub(crate) unsafe fn drop_task_future(ptr: *mut u8) {
    /// Signature of the type-erased drop trampoline.
    type DropFn = unsafe fn(*mut u8);
    /// Byte offset of `drop_fn` inside the `repr(C)` task header.
    const DROP_FN_OFFSET: usize = 8;

    // SAFETY: the caller guarantees a live task, so the header holds a valid
    // function pointer at this offset.
    let trampoline: DropFn = unsafe { ptr.add(DROP_FN_OFFSET).cast::<DropFn>().read() };
    // The trampoline receives the base pointer and resolves `storage_offset`
    // on its own.
    unsafe { trampoline(ptr) }
}
873
/// Release the task's allocation through the `free_fn` stored in its header.
///
/// # Safety
///
/// `ptr` must point to a `Task<F>` whose future has already been dropped.
/// Must only be called once (after state reaches TERMINAL).
#[inline]
pub(crate) unsafe fn free_task(ptr: *mut u8) {
    /// Signature of the type-erased deallocation function.
    type FreeFn = unsafe fn(*mut u8);
    /// Byte offset of `free_fn` inside the `repr(C)` task header.
    const FREE_FN_OFFSET: usize = 16;

    // SAFETY: the header is still readable (caller contract) and holds a
    // valid function pointer at this offset.
    let release: FreeFn = unsafe { ptr.add(FREE_FN_OFFSET).cast::<FreeFn>().read() };
    unsafe { release(ptr) }
}
886
887// =============================================================================
888// Type-erased vtable functions
889// =============================================================================
890
891/// Poll trampoline for joinable tasks (Output = T).
892///
893/// On completion: drops F, writes T into the same location, overwrites
894/// drop_fn to target T instead of F.
895///
896/// # Safety
897///
898/// `ptr` must point to a live `Task<F>`. The future must not have been dropped.
899unsafe fn poll_join<F: Future>(ptr: *mut u8, cx: &mut Context<'_>) -> Poll<()>
900where
901    F::Output: 'static,
902{
903    // Check if aborted
904    if unsafe { is_aborted(ptr) } {
905        return Poll::Ready(());
906    }
907
908    let future_ptr = unsafe { ptr.add(storage_offset(ptr)) };
909    let future = unsafe { Pin::new_unchecked(&mut *future_ptr.cast::<F>()) };
910    match future.poll(cx) {
911        Poll::Pending => Poll::Pending,
912        Poll::Ready(value) => {
913            let drop_fn_slot = unsafe { ptr.add(8).cast::<unsafe fn(*mut u8)>() };
914            // 1. Overwrite drop_fn to no-op BEFORE dropping F.
915            //    If F::drop() panics, this prevents double-drop —
916            //    subsequent cleanup calls the no-op instead of
917            //    drop_future_in_union on a partially-dropped F.
918            //    The output (value) is dropped during unwind (stack-owned).
919            unsafe { *drop_fn_slot = drop_noop };
920            // 2. Drop the future in place (panic-safe now)
921            unsafe { std::ptr::drop_in_place(future_ptr.cast::<F>()) };
922            // 3. Write output T into the same location
923            unsafe { std::ptr::write(future_ptr.cast::<F::Output>(), value) };
924            // 4. Overwrite drop_fn: now drops T instead of F
925            unsafe { *drop_fn_slot = drop_output::<F::Output> };
926            Poll::Ready(())
927        }
928    }
929}
930
/// Drop trampoline for a future stored directly in the task
/// (fire-and-forget tasks). Compiled only for tests.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F>` with a live future at `storage_offset`.
#[cfg(test)]
unsafe fn drop_future<F>(ptr: *mut u8) {
    // SAFETY: the caller guarantees a live task, so its header can be read
    // and the recorded offset stays inside the allocation.
    let offset = unsafe { storage_offset(ptr) };
    let future = unsafe { ptr.add(offset) }.cast::<F>();
    // SAFETY: the caller guarantees a live `F` at that location.
    unsafe { std::ptr::drop_in_place(future) }
}
941
942/// Drop trampoline for futures stored in FutureOrOutput union.
943///
944/// # Safety
945///
946/// `ptr` must point to a `Task<FutureOrOutput<F, T>>` with a live future.
947unsafe fn drop_future_in_union<F: Future>(ptr: *mut u8) {
948    let storage_ptr = unsafe { ptr.add(storage_offset(ptr)) };
949    // The future is at the start of the union (same offset as the union itself).
950    unsafe { std::ptr::drop_in_place(storage_ptr.cast::<F>()) }
951}
952
/// Drop trampoline that deliberately does nothing.
///
/// `poll_join` installs it for the duration of the F→T transition so that a
/// panic inside `F::drop()` cannot lead to the future being dropped a second
/// time by later cleanup.
///
/// # Safety
///
/// Always safe — the pointer is never read or written.
unsafe fn drop_noop(_task: *mut u8) {
    // Intentionally empty.
}
960
961/// Drop trampoline for output values. Receives the task base pointer.
962///
963/// Installed by `poll_join` after the future completes, replacing `drop_future`.
964///
965/// # Safety
966///
967/// `ptr` must point to a `Task` with a live `T` at `storage_offset`.
968unsafe fn drop_output<T>(ptr: *mut u8) {
969    let output_ptr = unsafe { ptr.add(storage_offset(ptr)) };
970    unsafe { std::ptr::drop_in_place(output_ptr.cast::<T>()) }
971}
972
973/// Free function for Box-allocated tasks.
974///
975/// Deallocates the memory without running destructors — the future/output
976/// was already dropped via `drop_task_future`, and the header fields
977/// are all trivial. Only the heap allocation needs to be freed.
978///
979/// # Safety
980///
981/// `ptr` must have been produced by `Box::into_raw(Box::new(Task<F>))`.
982/// The value at offset 64 must already be dropped.
983unsafe fn box_free<F>(ptr: *mut u8) {
984    // SAFETY: Layout matches what Box::new(Task<F>) allocated.
985    let layout = std::alloc::Layout::new::<Task<F>>();
986    unsafe { std::alloc::dealloc(ptr, layout) }
987}
988
989#[cfg(test)]
990mod tests {
991    use super::*;
992
993    #[test]
994    fn task_header_size() {
995        assert_eq!(TASK_HEADER_SIZE, 72);
996        assert_eq!(std::mem::size_of::<Task<()>>(), 72);
997    }
998
999    #[test]
1000    fn task_layout_offsets() {
1001        assert_eq!(std::mem::offset_of!(Task<()>, poll_fn), 0);
1002        assert_eq!(std::mem::offset_of!(Task<()>, drop_fn), 8);
1003        assert_eq!(std::mem::offset_of!(Task<()>, free_fn), 16);
1004        assert_eq!(std::mem::offset_of!(Task<()>, state), 24);
1005        assert_eq!(std::mem::offset_of!(Task<()>, cross_next), 32);
1006        assert_eq!(std::mem::offset_of!(Task<()>, join_waker), 40);
1007        assert_eq!(std::mem::offset_of!(Task<()>, storage_offset), 56);
1008        assert_eq!(std::mem::offset_of!(Task<()>, _pad), 58);
1009        assert_eq!(std::mem::offset_of!(Task<()>, tracker_key), 60);
1010        assert_eq!(std::mem::offset_of!(Task<()>, cross_wake_ctx), 64);
1011        assert_eq!(std::mem::offset_of!(Task<()>, storage), 72);
1012    }
1013
1014    #[test]
1015    fn task_size_with_future() {
1016        #[allow(dead_code)]
1017        struct SmallFuture([u8; 24]);
1018        impl Future for SmallFuture {
1019            type Output = ();
1020            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1021                Poll::Ready(())
1022            }
1023        }
1024
1025        // 72 byte header + 24 byte future = 96 bytes
1026        assert_eq!(
1027            std::mem::size_of::<Task<SmallFuture>>(),
1028            TASK_HEADER_SIZE + 24
1029        );
1030    }
1031
1032    #[test]
1033    fn packed_state_fire_and_forget() {
1034        struct Noop;
1035        impl Future for Noop {
1036            type Output = ();
1037            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1038                Poll::Ready(())
1039            }
1040        }
1041
1042        let task = Box::new(Task::new_boxed(Noop, 0));
1043        let ptr = Box::into_raw(task) as *mut u8;
1044
1045        unsafe {
1046            // Initial state: 1 ref, no flags
1047            assert_eq!(ref_count(ptr), 1);
1048            assert!(!is_completed(ptr));
1049            assert!(!is_queued(ptr));
1050            assert!(!has_join(ptr));
1051            assert!(!is_terminal(ptr));
1052
1053            // Set and clear queued
1054            set_queued(ptr, true);
1055            assert!(is_queued(ptr));
1056            set_queued(ptr, false);
1057            assert!(!is_queued(ptr));
1058
1059            // complete_and_unref with 1 ref → TERMINAL
1060            drop_task_future(ptr);
1061            assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1062            assert!(is_terminal(ptr));
1063
1064            free_task(ptr);
1065        }
1066    }
1067
1068    #[test]
1069    fn packed_state_joinable() {
1070        struct Noop;
1071        impl Future for Noop {
1072            type Output = u64;
1073            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1074                Poll::Ready(42)
1075            }
1076        }
1077
1078        let ptr = box_spawn_joinable(Noop, 7, std::ptr::null());
1079        unsafe {
1080            assert!(has_join(ptr));
1081            assert!(!is_aborted(ptr));
1082            assert_eq!(ref_count(ptr), 2); // executor + JoinHandle
1083            assert_eq!(tracker_key(ptr), 7);
1084
1085            // Simulate: handle drops before completion
1086            clear_has_join(ptr);
1087            assert!(!has_join(ptr));
1088            assert!(matches!(ref_dec(ptr), FreeAction::Retain)); // still 1 ref, not completed
1089
1090            // complete_and_unref → TERMINAL
1091            drop_task_future(ptr);
1092            assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1093            assert!(is_terminal(ptr));
1094
1095            free_task(ptr);
1096        }
1097    }
1098
1099    #[test]
1100    fn packed_state_joinable_completion_before_handle_drop() {
1101        struct Noop;
1102        impl Future for Noop {
1103            type Output = u64;
1104            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1105                Poll::Ready(42)
1106            }
1107        }
1108
1109        let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1110        unsafe {
1111            // complete_and_unref with 2 refs → not terminal
1112            drop_task_future(ptr);
1113            assert!(matches!(complete_and_unref(ptr), FreeAction::Retain));
1114            assert!(is_completed(ptr));
1115            assert_eq!(ref_count(ptr), 1);
1116
1117            // Handle drop: clear HAS_JOIN, ref_dec → TERMINAL
1118            clear_has_join(ptr);
1119            assert!(matches!(ref_dec(ptr), FreeAction::FreeBox));
1120            assert!(is_terminal(ptr));
1121
1122            free_task(ptr);
1123        }
1124    }
1125
1126    #[test]
1127    fn packed_state_cross_thread_waker_scenario() {
1128        struct Noop;
1129        impl Future for Noop {
1130            type Output = u64;
1131            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1132                Poll::Ready(42)
1133            }
1134        }
1135
1136        let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1137        unsafe {
1138            // Waker clone: ref_inc
1139            ref_inc(ptr);
1140            assert_eq!(ref_count(ptr), 3);
1141
1142            // complete_and_unref: executor releases its ref
1143            drop_task_future(ptr);
1144            assert!(matches!(complete_and_unref(ptr), FreeAction::Retain));
1145
1146            // Handle drop: clear HAS_JOIN, ref_dec
1147            clear_has_join(ptr);
1148            assert!(matches!(ref_dec(ptr), FreeAction::Retain)); // still 1 ref (waker)
1149
1150            // Waker drop: ref_dec → TERMINAL
1151            assert!(matches!(ref_dec(ptr), FreeAction::FreeBox));
1152            assert!(is_terminal(ptr));
1153
1154            free_task(ptr);
1155        }
1156    }
1157
1158    // =========================================================================
1159    // Panic safety — drop_fn transitions
1160    // =========================================================================
1161
1162    /// Future whose Drop impl panics. Used to verify the drop_noop guard
1163    /// in poll_join prevents double-drop.
1164    struct PanickingDrop {
1165        drop_count: *mut u32,
1166    }
1167
1168    impl Future for PanickingDrop {
1169        type Output = u64;
1170        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1171            Poll::Ready(42)
1172        }
1173    }
1174
1175    impl Drop for PanickingDrop {
1176        fn drop(&mut self) {
1177            unsafe { *self.drop_count += 1 };
1178            panic!("intentional drop panic");
1179        }
1180    }
1181
1182    #[test]
1183    fn poll_join_panic_in_drop_prevents_double_drop() {
1184        use std::task::{RawWaker, RawWakerVTable, Waker};
1185
1186        static NOOP_VTABLE: RawWakerVTable =
1187            RawWakerVTable::new(|p| RawWaker::new(p, &NOOP_VTABLE), |_| {}, |_| {}, |_| {});
1188        let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
1189        let mut cx = Context::from_waker(&waker);
1190
1191        let mut drop_count: u32 = 0;
1192        let ptr = box_spawn_joinable(
1193            PanickingDrop {
1194                drop_count: &raw mut drop_count,
1195            },
1196            0,
1197            std::ptr::null(),
1198        );
1199
1200        // poll_join completes the future, then drops F — which panics.
1201        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| unsafe {
1202            poll_task(ptr, &mut cx)
1203        }));
1204
1205        // The panic should have been caught.
1206        assert!(result.is_err(), "expected panic from PanickingDrop");
1207        // F was dropped exactly once (by poll_join, before the panic propagated).
1208        assert_eq!(drop_count, 1, "future should be dropped exactly once");
1209
1210        // drop_fn should now be drop_noop — calling it must NOT double-drop F.
1211        unsafe { drop_task_future(ptr) };
1212        assert_eq!(
1213            drop_count, 1,
1214            "drop_task_future after panic must be a no-op (drop_noop)"
1215        );
1216
1217        // Clean up: dec both refs (executor + JoinHandle), then free.
1218        unsafe {
1219            ref_dec(ptr);
1220            ref_dec(ptr);
1221            free_task(ptr);
1222        }
1223    }
1224
1225    #[test]
1226    fn drop_fn_transitions_correctly_on_normal_completion() {
1227        use std::task::{RawWaker, RawWakerVTable, Waker};
1228
1229        static NOOP_VTABLE: RawWakerVTable =
1230            RawWakerVTable::new(|p| RawWaker::new(p, &NOOP_VTABLE), |_| {}, |_| {}, |_| {});
1231        let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
1232        let mut cx = Context::from_waker(&waker);
1233
1234        static mut OUTPUT_DROP_COUNT: u32 = 0;
1235        struct TrackedOutput;
1236        impl Drop for TrackedOutput {
1237            fn drop(&mut self) {
1238                unsafe { OUTPUT_DROP_COUNT += 1 };
1239            }
1240        }
1241
1242        struct ProduceTracked;
1243        impl Future for ProduceTracked {
1244            type Output = TrackedOutput;
1245            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<TrackedOutput> {
1246                Poll::Ready(TrackedOutput)
1247            }
1248        }
1249
1250        let ptr = box_spawn_joinable(ProduceTracked, 0, std::ptr::null());
1251
1252        // Poll to completion — F dropped, T written, drop_fn → drop_output.
1253        let result = unsafe { poll_task(ptr, &mut cx) };
1254        assert!(result.is_ready());
1255
1256        // drop_fn should now target T (TrackedOutput).
1257        unsafe { OUTPUT_DROP_COUNT = 0 };
1258        unsafe { drop_task_future(ptr) };
1259        assert_eq!(
1260            unsafe { OUTPUT_DROP_COUNT },
1261            1,
1262            "drop_fn should drop the output exactly once"
1263        );
1264
1265        // Clean up.
1266        unsafe {
1267            ref_dec(ptr);
1268            ref_dec(ptr);
1269            free_task(ptr);
1270        }
1271    }
1272
1273    // =========================================================================
1274    // Packed state word — SIGABRT root cause regression tests
1275    // =========================================================================
1276
1277    #[test]
1278    fn packed_state_fire_and_forget_terminal() {
1279        // Box task with 1 ref (no JoinHandle). complete_and_unref → FreeBox.
1280        // Verify terminal state is exactly TERMINAL_BOX (1).
1281        struct Noop;
1282        impl Future for Noop {
1283            type Output = ();
1284            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1285                Poll::Ready(())
1286            }
1287        }
1288
1289        let task = Box::new(Task::new_boxed(Noop, 0));
1290        let ptr = Box::into_raw(task) as *mut u8;
1291
1292        unsafe {
1293            assert_eq!(ref_count(ptr), 1);
1294            assert!(!has_join(ptr));
1295
1296            drop_task_future(ptr);
1297            let action = complete_and_unref(ptr);
1298            assert_eq!(action, FreeAction::FreeBox);
1299
1300            let s = state_load(ptr);
1301            assert_eq!(s, COMPLETED, "terminal state must have COMPLETED set");
1302            assert_eq!(s, 1);
1303            assert!(is_terminal(ptr));
1304
1305            free_task(ptr);
1306        }
1307    }
1308
1309    #[test]
1310    fn packed_state_slab_flag_terminal() {
1311        // Task with SLAB_ALLOCATED set. complete_and_unref → FreeSlab.
1312        // Verify terminal state is exactly TERMINAL_SLAB (33).
1313        struct Noop;
1314        impl Future for Noop {
1315            type Output = u64;
1316            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1317                Poll::Ready(42)
1318            }
1319        }
1320
1321        // Use new_joinable_slab to get SLAB_ALLOCATED flag set at construction.
1322        // Provide a free_fn that does Box dealloc (we box it manually below).
1323        type Storage = FutureOrOutput<Noop, u64>;
1324        unsafe fn slab_free(ptr: *mut u8) {
1325            let layout = std::alloc::Layout::new::<Task<Storage>>();
1326            std::alloc::dealloc(ptr, layout);
1327        }
1328
1329        let task = new_joinable_slab(Noop, 0, slab_free, std::ptr::null());
1330        let ptr = Box::into_raw(Box::new(task)) as *mut u8;
1331
1332        unsafe {
1333            assert_eq!(ref_count(ptr), 2); // executor + JoinHandle
1334            assert!(has_join(ptr));
1335
1336            // Simulate handle detach: clear HAS_JOIN + ref_dec
1337            clear_has_join(ptr);
1338            assert_eq!(ref_dec(ptr), FreeAction::Retain);
1339            assert_eq!(ref_count(ptr), 1);
1340
1341            // Executor completes task
1342            drop_task_future(ptr);
1343            let action = complete_and_unref(ptr);
1344            assert_eq!(action, FreeAction::FreeSlab);
1345
1346            let s = state_load(ptr);
1347            assert_eq!(
1348                s,
1349                COMPLETED | SLAB_ALLOCATED,
1350                "terminal state must be COMPLETED | SLAB_ALLOCATED"
1351            );
1352            assert_eq!(s, 33);
1353            assert!(is_terminal(ptr));
1354
1355            slab_free(ptr);
1356        }
1357    }
1358
1359    #[test]
1360    fn packed_state_joinable_handle_drops_first() {
1361        // Joinable task (2 refs + HAS_JOIN). Handle drops first:
1362        // clear HAS_JOIN → ref_dec → 1 ref remaining.
1363        // Then complete_and_unref → terminal.
1364        struct Noop;
1365        impl Future for Noop {
1366            type Output = u64;
1367            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1368                Poll::Ready(42)
1369            }
1370        }
1371
1372        let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1373        unsafe {
1374            assert_eq!(ref_count(ptr), 2);
1375            assert!(has_join(ptr));
1376
1377            // Handle drops: clear HAS_JOIN, ref_dec
1378            clear_has_join(ptr);
1379            assert!(!has_join(ptr));
1380            assert_eq!(ref_dec(ptr), FreeAction::Retain);
1381            assert_eq!(ref_count(ptr), 1);
1382            assert!(!is_terminal(ptr));
1383
1384            // Executor completes
1385            drop_task_future(ptr);
1386            assert_eq!(complete_and_unref(ptr), FreeAction::FreeBox);
1387            assert!(is_terminal(ptr));
1388
1389            free_task(ptr);
1390        }
1391    }
1392
1393    #[test]
1394    fn packed_state_joinable_completion_first_then_handle() {
1395        // Joinable task. Completion fires first (Retain because 2 refs).
1396        // Then handle clears HAS_JOIN + ref_dec → FreeBox.
1397        struct Noop;
1398        impl Future for Noop {
1399            type Output = u64;
1400            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1401                Poll::Ready(42)
1402            }
1403        }
1404
1405        let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1406        unsafe {
1407            // complete_and_unref: sets COMPLETED, dec ref → 1 ref remains
1408            drop_task_future(ptr);
1409            assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1410            assert!(is_completed(ptr));
1411            assert_eq!(ref_count(ptr), 1);
1412
1413            // Handle drops: clear HAS_JOIN, ref_dec → terminal
1414            clear_has_join(ptr);
1415            assert_eq!(ref_dec(ptr), FreeAction::FreeBox);
1416            assert!(is_terminal(ptr));
1417
1418            free_task(ptr);
1419        }
1420    }
1421
1422    #[test]
1423    fn packed_state_waker_clone_lifecycle() {
1424        // Joinable task (2 refs). Waker clone adds 3rd ref.
1425        // complete_and_unref → Retain (2 refs remain, HAS_JOIN still set).
1426        // Handle drops (clear HAS_JOIN + ref_dec) → Retain (1 ref from waker).
1427        // Waker drops (ref_dec) → FreeBox.
1428        struct Noop;
1429        impl Future for Noop {
1430            type Output = u64;
1431            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1432                Poll::Ready(42)
1433            }
1434        }
1435
1436        let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1437        unsafe {
1438            // Waker clone: ref_inc
1439            ref_inc(ptr);
1440            assert_eq!(ref_count(ptr), 3);
1441
1442            // Executor completes: complete_and_unref
1443            drop_task_future(ptr);
1444            assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1445            assert_eq!(ref_count(ptr), 2);
1446
1447            // Handle drops: clear HAS_JOIN, ref_dec
1448            clear_has_join(ptr);
1449            assert_eq!(ref_dec(ptr), FreeAction::Retain);
1450            assert_eq!(ref_count(ptr), 1);
1451
1452            // Waker drops: ref_dec → terminal
1453            assert_eq!(ref_dec(ptr), FreeAction::FreeBox);
1454            assert!(is_terminal(ptr));
1455
1456            free_task(ptr);
1457        }
1458    }
1459
1460    #[test]
1461    fn packed_state_leaked_flag_prevents_terminal() {
1462        // If HAS_JOIN is NOT cleared before the final ref_dec, the state
1463        // won't reach terminal (HAS_JOIN is a lifecycle flag that blocks it).
1464        // Result: Retain. This is safe — the lifecycle flag prevents
1465        // premature free.
1466        struct Noop;
1467        impl Future for Noop {
1468            type Output = u64;
1469            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1470                Poll::Ready(42)
1471            }
1472        }
1473
1474        let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1475        unsafe {
1476            // complete_and_unref with 2 refs → Retain
1477            drop_task_future(ptr);
1478            assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1479
1480            // ref_dec WITHOUT clearing HAS_JOIN → still not terminal
1481            // because HAS_JOIN is a lifecycle flag that blocks terminal.
1482            assert_eq!(ref_dec(ptr), FreeAction::Retain);
1483            assert!(!is_terminal(ptr));
1484
1485            // State is COMPLETED | HAS_JOIN | 0 refs — leaked but safe.
1486            // In real code this can't happen (JoinHandle::Drop always
1487            // clears HAS_JOIN), but the packed state correctly prevents
1488            // a free even if it did.
1489            let s = state_load(ptr);
1490            assert_eq!(s & COMPLETED, COMPLETED);
1491            assert_eq!(s & HAS_JOIN, HAS_JOIN);
1492            assert_eq!(ref_count(ptr), 0);
1493
1494            // Clean up: manually clear HAS_JOIN to reach terminal, then free.
1495            clear_has_join(ptr);
1496            assert!(is_terminal(ptr));
1497            free_task(ptr);
1498        }
1499    }
1500
1501    // =========================================================================
1502    // TaskRef — RAII inc/dec discipline
1503    // =========================================================================
1504
1505    #[test]
1506    fn taskref_acquire_drop_balances_refcount() {
1507        // Acquire a TaskRef on a task with rc=1; rc goes to 2. Drop the
1508        // TaskRef; rc goes back to 1. Verifies acquire(ref_inc) is paired
1509        // with Drop(ref_dec).
1510        let task = Box::new(Task::new_boxed(async {}, 0));
1511        let ptr = Box::into_raw(task) as *mut u8;
1512
1513        unsafe {
1514            assert_eq!(ref_count(ptr), 1);
1515            let task_ref = TaskRef::acquire(ptr);
1516            assert_eq!(ref_count(ptr), 2);
1517            drop(task_ref); // Not terminal — rc 2 → 1
1518            assert_eq!(ref_count(ptr), 1);
1519
1520            // Cleanup: complete and free directly (no TaskRef path).
1521            drop_task_future(ptr);
1522            assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1523            free_task(ptr);
1524        }
1525    }
1526
1527    #[test]
1528    fn taskref_from_owned_drop_balances_refcount() {
1529        // Manually ref_inc, then wrap with from_owned (no extra inc).
1530        // Drop releases the manual ref. Verifies from_owned's "wrap a
1531        // pre-incremented pointer" contract.
1532        let task = Box::new(Task::new_boxed(async {}, 0));
1533        let ptr = Box::into_raw(task) as *mut u8;
1534
1535        unsafe {
1536            assert_eq!(ref_count(ptr), 1);
1537            ref_inc(ptr); // simulate handoff (e.g. RawWaker::data ownership)
1538            assert_eq!(ref_count(ptr), 2);
1539            let task_ref = TaskRef::from_owned(ptr);
1540            // No additional ref_inc — TaskRef takes the existing ref.
1541            assert_eq!(ref_count(ptr), 2);
1542            drop(task_ref); // releases the manual ref
1543            assert_eq!(ref_count(ptr), 1);
1544
1545            // Cleanup.
1546            drop_task_future(ptr);
1547            assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1548            free_task(ptr);
1549        }
1550    }
1551
1552    #[test]
1553    fn taskref_drop_non_terminal_no_dispose() {
1554        // Drop a TaskRef when refcount > 1 — should NOT invoke
1555        // dispose_terminal. Verifies Drop's terminal gate (Retain branch
1556        // returns immediately).
1557        let task = Box::new(Task::new_boxed(async {}, 0));
1558        let ptr = Box::into_raw(task) as *mut u8;
1559
1560        unsafe {
1561            // rc=1, acquire to rc=2, drop to rc=1. Not terminal (no
1562            // COMPLETED, lifecycle flags clear). Drop hits the Retain
1563            // branch — no dispose_terminal call. If dispose_terminal
1564            // were called with a non-terminal task it would assert (or
1565            // worse, free a live task).
1566            let task_ref = TaskRef::acquire(ptr);
1567            assert_eq!(ref_count(ptr), 2);
1568            drop(task_ref);
1569            assert_eq!(ref_count(ptr), 1);
1570            assert!(!is_completed(ptr));
1571
1572            // Cleanup.
1573            drop_task_future(ptr);
1574            assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1575            free_task(ptr);
1576        }
1577    }
1578
1579    #[test]
1580    fn packed_state_many_refs_converge() {
1581        // Clone waker 10 times (ref_inc 10x), complete, then ref_dec 10x.
1582        // Only the last ref_dec returns FreeBox. All others Retain.
1583        struct Noop;
1584        impl Future for Noop {
1585            type Output = u64;
1586            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1587                Poll::Ready(42)
1588            }
1589        }
1590
1591        let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1592        unsafe {
1593            // 10 waker clones: ref 2 → 12
1594            for _ in 0..10 {
1595                ref_inc(ptr);
1596            }
1597            assert_eq!(ref_count(ptr), 12);
1598
1599            // Executor completes: ref 12 → 11
1600            drop_task_future(ptr);
1601            assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1602            assert_eq!(ref_count(ptr), 11);
1603
1604            // Handle drops: clear HAS_JOIN, ref_dec → 10
1605            clear_has_join(ptr);
1606            assert_eq!(ref_dec(ptr), FreeAction::Retain);
1607            assert_eq!(ref_count(ptr), 10);
1608
1609            // Drop 9 waker refs — all Retain
1610            for i in 0..9 {
1611                assert_eq!(
1612                    ref_dec(ptr),
1613                    FreeAction::Retain,
1614                    "ref_dec #{i} should Retain"
1615                );
1616            }
1617            assert_eq!(ref_count(ptr), 1);
1618
1619            // Last waker drop → FreeBox
1620            assert_eq!(ref_dec(ptr), FreeAction::FreeBox);
1621            assert!(is_terminal(ptr));
1622
1623            free_task(ptr);
1624        }
1625    }
1626}