nexus_async_rt/task.rs
1//! Task storage: header + future/output union in a contiguous allocation.
2//!
3//! Each task is a `Task<F>` struct. The raw pointer to the allocation
4//! IS the task handle — no index layer, no separate metadata store.
5//!
6//! The waker holds the raw pointer directly. `wake()` sets `QUEUED`
7//! and pushes the pointer to the ready queue. Zero allocations.
8//!
9//! Tasks can be allocated via Box (default) or slab (power user).
10//! The `free_fn` in the header knows how to deallocate regardless
11//! of which allocator was used.
12//!
13//! ## Packed state word
14//!
15//! All task state (flags + refcount) is packed into a single `AtomicUsize`:
16//!
17//! ```text
18//! bits 0-5: flags (COMPLETED, QUEUED, HAS_JOIN, ABORTED, OUTPUT_TAKEN, SLAB_ALLOCATED)
19//! bits 6+: refcount (shifted by 6)
20//! ```
21//!
22//! This eliminates the SIGABRT race where `Executor::drop` reads
23//! `ref_count` and `is_completed` as separate atomics and a cross-thread
24//! waker can decrement the refcount between those reads.
25//!
26//! The state word naturally converges to `TERMINAL = COMPLETED = 1`
27//! when all refs are decremented and all transient flags are cleared.
28//! The free check is one comparison: `state == TERMINAL`.
29//!
30//! ## Union storage
31//!
32//! The slot at `storage_offset` holds either `F` (the future) or `T` (the output),
33//! never both. While running, `F` is live. When the future completes,
34//! `poll_join` drops `F` in place and writes `T` to the same bytes.
35//! `drop_fn` is overwritten from `drop_fn::<F>` to `drop_output::<T>`
36//! so subsequent cleanup targets the correct type.
37//!
38//! ## `TaskRef` ownership rule
39//!
40//! [`TaskRef`] covers every refcount holder EXCEPT the executor's own
41//! `all_tasks` ref. That single ref uses [`complete_and_unref`] directly
42//! (atomic COMPLETED-set + decrement), bypassing TaskRef's Drop-time
43//! `dispose_terminal` routing. Wrapping `all_tasks` ownership in TaskRef
44//! would route terminal frees through `dispose_terminal` →
45//! `try_defer_free`, double-handling tasks the executor is already
46//! tracking. Subtle regression — don't do it.
47//!
48//! Everything else (local wakers, cross-thread wakers, channel slots,
49//! `JoinHandle`) IS a `TaskRef`.
50
51use std::cell::UnsafeCell;
52use std::future::Future;
53use std::marker::PhantomData;
54use std::pin::Pin;
55use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
56use std::task::{Context, Poll, Waker};
57
58use crate::cross_wake::CrossWakeContext;
59
60// =============================================================================
61// Packed state word — constants
62// =============================================================================
63
64/// Task has completed (future returned Ready or was aborted).
65const COMPLETED: usize = 1 << 0;
66/// Task is in a ready queue (dedup flag).
67const QUEUED: usize = 1 << 1;
68/// JoinHandle exists for this task.
69const HAS_JOIN: usize = 1 << 2;
70/// abort() was called.
71const ABORTED: usize = 1 << 3;
72/// JoinHandle consumed the output via poll.
73const OUTPUT_TAKEN: usize = 1 << 4;
74/// Task was allocated from the slab (permanent flag, set at spawn).
75const SLAB_ALLOCATED: usize = 1 << 5;
76/// Mask for all flag bits (0-5).
77const FLAG_MASK: usize = 0b11_1111;
78/// One reference count unit (bit 6).
79const REF_ONE: usize = 1 << 6;
80/// Mask for refcount bits (6+).
81const REF_MASK: usize = !FLAG_MASK;
82
83/// Lifecycle flags: must be cleared before a task can reach terminal.
84/// QUEUED: someone needs to pop this task from a queue.
85/// HAS_JOIN: a JoinHandle still exists and must be dropped.
86const LIFECYCLE_MASK: usize = QUEUED | HAS_JOIN;
87
88/// Inert flags: permanent metadata or historical — don't block terminal.
89/// SLAB_ALLOCATED: permanent, set at spawn.
90/// ABORTED: historical, set on abort — no cleanup gated on clearing it.
91/// OUTPUT_TAKEN: historical, set when output read — same.
92const INERT_MASK: usize = SLAB_ALLOCATED | ABORTED | OUTPUT_TAKEN;
93
94/// What to do when a ref_dec or complete_and_unref produces a terminal state.
95#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96pub(crate) enum FreeAction {
97 /// Task still has outstanding refs or unchecked flags. No action.
98 Retain,
99 /// Box-allocated terminal. Free from any thread via free_task.
100 FreeBox,
101 /// Slab-allocated terminal. Route to executor thread for slab free.
102 FreeSlab,
103}
104
105// =============================================================================
106// TaskRef — RAII smart pointer pairing ref_inc with ref_dec
107// =============================================================================
108
109/// Refcounted handle to a task. Pairs `ref_inc` with `ref_dec` at
110/// compile time — Drop calls `ref_dec` and routes terminal results
111/// through `dispose_terminal`.
112///
113/// `TaskRef` is the canonical refcount holder for everything except the
114/// executor's `all_tasks` ref (see the module-level doc-block). Local
115/// wakers, cross-thread wakers, channel slots, and `JoinHandle` all
116/// hold a `TaskRef`.
117///
118/// # Invariants
119///
120/// - Each `TaskRef` owns exactly one refcount unit on the underlying task.
121/// - `Drop` decrements; if terminal, the task is routed via
122/// `crate::cross_wake::dispose_terminal` (defer via `try_defer_free`
123/// on the owning executor thread or for null-ctx test tasks; queue
124/// via the cross-wake queue off-thread).
125pub(crate) struct TaskRef {
126 ptr: *mut u8,
127}
128
129impl TaskRef {
130 /// Acquire a reference. Increments the task's refcount.
131 ///
132 /// # Safety
133 ///
134 /// `ptr` must point to a live task with refcount >= 1 at the time of call.
135 #[inline]
136 pub(crate) unsafe fn acquire(ptr: *mut u8) -> Self {
137 unsafe { ref_inc(ptr) };
138 Self { ptr }
139 }
140
141 /// Wrap a pre-incremented pointer (no `ref_inc` here).
142 ///
143 /// Use when the caller has already accounted for the ref (e.g., on
144 /// the boundary of a vtable handoff like `RawWaker::data` →
145 /// `wake_fn` consuming the ref).
146 ///
147 /// # Safety
148 ///
149 /// `ptr` owns one ref. The caller must not also drop it.
150 #[inline]
151 pub(crate) unsafe fn from_owned(ptr: *mut u8) -> Self {
152 Self { ptr }
153 }
154
155 /// The raw task pointer this handle holds.
156 #[inline]
157 pub(crate) fn as_ptr(&self) -> *mut u8 {
158 self.ptr
159 }
160}
161
162impl Drop for TaskRef {
163 #[inline]
164 fn drop(&mut self) {
165 match unsafe { ref_dec(self.ptr) } {
166 FreeAction::Retain => {}
167 FreeAction::FreeBox | FreeAction::FreeSlab => {
168 // SAFETY: terminal state — ref just dropped to 0 with
169 // all lifecycle flags clear. dispose_terminal routes
170 // per the task's header context and current thread:
171 //
172 // - On-thread (or null-ctx test task): try_defer_free
173 // pushes to DEFERRED_FREE TLS if a poll cycle is
174 // active; otherwise the slot leaks until
175 // Executor::drop reclaims via its all_tasks scan.
176 // - Off-thread: queues via the cross-wake queue +
177 // conditional eventfd poke.
178 //
179 // Direct-free is never used — would race
180 // Executor::all_tasks bookkeeping. See dispose_terminal's
181 // doc-comment in `cross_wake.rs` for the full rationale.
182 unsafe { crate::cross_wake::dispose_terminal(self.ptr) };
183 }
184 }
185 }
186}
187
188// SAFETY: TaskRef is a raw pointer + refcount discipline. The underlying
189// task allocation is Send-safe (atomic state, no thread-affine fields
190// in the header). Cross-thread holders (tokio_compat, channel slots)
191// store TaskRef across threads, so it must be Send.
192unsafe impl Send for TaskRef {}
193// Not Sync — only the holder may drop. Cloning a TaskRef means a new
194// ref_inc; aliasing through &TaskRef would let two holders ref_dec
195// the same logical ref.
196
197// =============================================================================
198// Task layout
199// =============================================================================
200
201/// Header size in bytes. Must match the layout of `Task<F>` before the
202/// `storage` field.
203pub const TASK_HEADER_SIZE: usize = 72;
204
205/// Task header + storage in a contiguous allocation. `repr(C)` for
206/// deterministic layout.
207///
208/// `S` is the storage type — either just `F` (fire-and-forget) or a union
209/// of `F` and `T` (joinable). The header is always 72 bytes regardless of `S`.
210///
211/// Layout (64-bit):
212/// ```text
213/// offset 0: poll_fn (8B, fn pointer — polls the future)
214/// offset 8: drop_fn (8B, fn pointer — drops F or T in place)
215/// offset 16: free_fn (8B, fn pointer — deallocates the task storage)
216/// offset 24: state (8B, AtomicUsize — packed flags + refcount)
217/// offset 32: cross_next (8B, AtomicPtr — intrusive cross-thread wake queue)
218/// offset 40: join_waker (16B, UnsafeCell<Option<Waker>>)
219/// offset 56: storage_offset (2B, u16 — byte offset to storage field)
220/// offset 58: _pad (2B)
221/// offset 60: tracker_key (4B, u32 — index in Executor::all_tasks slab)
222/// offset 64: cross_wake_ctx (8B, *const CrossWakeContext — cold; read by dispose_terminal)
223/// offset 72: storage (S bytes — future F or union { F, T })
224/// ```
225///
226/// `cross_wake_ctx` lives at the end of the header because it's only
227/// touched on terminal Drop (cold path); hot-path reads (state, drop_fn,
228/// poll_fn, free_fn) stay near the cache-line head.
229#[repr(C)]
230pub(crate) struct Task<S> {
231 /// Polls the future. Receives the task base pointer.
232 poll_fn: unsafe fn(*mut u8, &mut Context<'_>) -> Poll<()>,
233 /// Drops the value at `storage_offset` (future F or output T). Receives base pointer.
234 drop_fn: unsafe fn(*mut u8),
235 /// Deallocates the task storage.
236 free_fn: unsafe fn(*mut u8),
237 /// Packed state word: flags (bits 0-5) + refcount (bits 6+).
238 state: AtomicUsize,
239 /// Intrusive next pointer for the cross-thread wake queue.
240 cross_next: AtomicPtr<u8>,
241 /// Waker for the task awaiting this JoinHandle.
242 join_waker: UnsafeCell<Option<Waker>>,
243 /// Byte offset from task base to the storage field.
244 /// Set at construction from `offset_of!(Task<S>, storage)`.
245 storage_offset: u16,
246 /// Padding for alignment.
247 _pad: [u8; 2],
248 /// Index into the Executor's `all_tasks` slab.
249 tracker_key: u32,
250 /// Pointer to the runtime's [`CrossWakeContext`] (Arc-backed, heap-stable).
251 /// Set at spawn time. Read by `dispose_terminal` on terminal Drop to
252 /// route the task through the owning executor (defer locally) or its
253 /// cross-thread queue (off-thread). Null for tasks not associated
254 /// with any runtime (test-only `Task::new_boxed` path).
255 cross_wake_ctx: *const CrossWakeContext,
256 storage: S,
257}
258
259/// Union storage for joinable tasks. Sized to fit both the future F
260/// and the output T in the same allocation.
261#[repr(C)]
262pub(crate) union FutureOrOutput<F, T> {
263 pub(crate) future: std::mem::ManuallyDrop<F>,
264 pub(crate) output: std::mem::ManuallyDrop<T>,
265}
266
267// Static assertion: header layout matches TASK_HEADER_SIZE.
268const _: () = {
269 assert!(std::mem::size_of::<Task<()>>() == TASK_HEADER_SIZE);
270};
271
272impl<F: Future<Output = ()> + 'static> Task<F> {
273 /// Construct a fire-and-forget task (no JoinHandle) with Box-based free.
274 ///
275 /// Used internally for tests and low-level task construction.
276 /// `ref_count = 1` (executor only), `HAS_JOIN` not set.
277 /// `cross_wake_ctx` is null — test tasks aren't registered with any
278 /// runtime. Terminal frees go through `dispose_terminal`'s on-thread
279 /// defer path (`try_defer_free` if a poll cycle is active, otherwise
280 /// leak until `Executor::drop`'s `all_tasks` scan reclaims them).
281 /// Direct-free is unsafe even for null-ctx tasks because
282 /// `dispose_terminal` doesn't own `all_tasks` bookkeeping — see
283 /// `dispose_terminal`'s doc-comment in `cross_wake.rs` for the full
284 /// rationale.
285 ///
286 /// # Why `Output = ()` is required
287 ///
288 /// This uses `poll_join::<F>` which writes T at the storage offset
289 /// after dropping F. The storage is `F` (not `FutureOrOutput<F, T>`),
290 /// so it's only sized for F. With `T = ()` (ZST), the write is
291 /// zero-size and the `drop_fn` overwrite to `drop_output::<()>` is a
292 /// no-op. Relaxing this bound to non-ZST T would write T into
293 /// storage not sized for it — UB.
294 #[cfg(test)]
295 #[inline]
296 pub(crate) fn new_boxed(future: F, tracker_key: u32) -> Self {
297 Self {
298 poll_fn: poll_join::<F>,
299 drop_fn: drop_future::<F>,
300 free_fn: box_free::<F>,
301 state: AtomicUsize::new(REF_ONE),
302 cross_next: AtomicPtr::new(std::ptr::null_mut()),
303 join_waker: UnsafeCell::new(None),
304 storage_offset: std::mem::offset_of!(Task<F>, storage) as u16,
305 tracker_key,
306 _pad: [0; 2],
307 cross_wake_ctx: std::ptr::null(),
308 storage: future,
309 }
310 }
311}
312
313/// Allocate a joinable Box task and return the raw pointer.
314///
315/// The task has `ref_count = 2` (executor + JoinHandle) and `HAS_JOIN` set.
316/// The allocation is sized for `max(size_of::<F>(), size_of::<T>())` via
317/// the `FutureOrOutput<F, T>` union.
318///
319/// `cross_wake_ctx` should be `Arc::as_ptr(&runtime.cross_wake)` for
320/// real spawns (Arc-backed, heap-stable), or `std::ptr::null()` for
321/// tasks not associated with any runtime (test paths only). Read by
322/// `dispose_terminal` on terminal Drop.
323pub(crate) fn box_spawn_joinable<F>(
324 future: F,
325 tracker_key: u32,
326 cross_wake_ctx: *const CrossWakeContext,
327) -> *mut u8
328where
329 F: Future + 'static,
330 F::Output: 'static,
331{
332 type Storage<F> = FutureOrOutput<F, <F as Future>::Output>;
333
334 let task: Task<Storage<F>> = Task {
335 poll_fn: poll_join::<F>,
336 drop_fn: drop_future_in_union::<F>,
337 free_fn: box_free::<Storage<F>>,
338 state: AtomicUsize::new(HAS_JOIN | (2 * REF_ONE)),
339 cross_next: AtomicPtr::new(std::ptr::null_mut()),
340 join_waker: UnsafeCell::new(None),
341 storage_offset: std::mem::offset_of!(Task<Storage<F>>, storage) as u16,
342 tracker_key,
343 _pad: [0; 2],
344 cross_wake_ctx,
345 storage: FutureOrOutput {
346 future: std::mem::ManuallyDrop::new(future),
347 },
348 };
349 Box::into_raw(Box::new(task)) as *mut u8
350}
351
352/// Construct a joinable task for slab allocation.
353///
354/// Returns the task struct to be copied into a slab slot. Uses the
355/// `FutureOrOutput<F, T>` union so the allocation fits both.
356///
357/// See `box_spawn_joinable` for the `cross_wake_ctx` contract.
358pub(crate) fn new_joinable_slab<F>(
359 future: F,
360 tracker_key: u32,
361 free_fn: unsafe fn(*mut u8),
362 cross_wake_ctx: *const CrossWakeContext,
363) -> Task<FutureOrOutput<F, F::Output>>
364where
365 F: Future + 'static,
366 F::Output: 'static,
367{
368 Task {
369 poll_fn: poll_join::<F>,
370 drop_fn: drop_future_in_union::<F>,
371 free_fn,
372 state: AtomicUsize::new(HAS_JOIN | SLAB_ALLOCATED | (2 * REF_ONE)),
373 cross_next: AtomicPtr::new(std::ptr::null_mut()),
374 join_waker: UnsafeCell::new(None),
375 storage_offset: std::mem::offset_of!(Task<FutureOrOutput<F, F::Output>>, storage) as u16,
376 tracker_key,
377 _pad: [0; 2],
378 cross_wake_ctx,
379 storage: FutureOrOutput {
380 future: std::mem::ManuallyDrop::new(future),
381 },
382 }
383}
384
385// =============================================================================
386// Task handle — raw pointer operations
387// =============================================================================
388
389/// Opaque task identifier. Wraps the raw pointer to the task.
390/// The pointer is stable for the task's lifetime.
391#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
392pub(crate) struct TaskId(pub(crate) *mut u8);
393
394// =============================================================================
395// JoinHandle
396// =============================================================================
397
398/// Handle to a spawned task. Await to get the result.
399///
400/// Dropping the handle detaches the task — it continues running but the
401/// output is dropped when the task completes. Use [`abort()`](Self::abort)
402/// to cancel the task.
403///
404/// `JoinHandle` is `!Send` and `!Sync` — it must stay on the executor thread.
405#[must_use = "dropping a JoinHandle detaches the task — await it or call .abort()"]
406pub struct JoinHandle<T> {
407 ptr: *mut u8,
408 _marker: PhantomData<T>,
409 _not_send: PhantomData<*const ()>, // !Send + !Sync
410}
411
412impl<T: 'static> Future for JoinHandle<T> {
413 type Output = T;
414
415 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
416 let ptr = self.ptr;
417
418 // SAFETY: ptr is valid — JoinHandle holds a ref (refcount >= 1).
419 if unsafe { is_completed(ptr) } {
420 let s = unsafe { state_load(ptr) };
421 assert!(s & ABORTED == 0, "polled JoinHandle after task was aborted");
422 // SAFETY: Task completed, so poll_join already transitioned the union
423 // from F to T. The output is live at storage_offset. ptr::read moves
424 // it out (bitwise copy). OUTPUT_TAKEN prevents double-read.
425 let output_ptr = unsafe { ptr.add(storage_offset(ptr)) };
426 let value = unsafe { std::ptr::read(output_ptr.cast::<T>()) };
427 unsafe { set_output_taken(ptr) };
428 Poll::Ready(value)
429 } else {
430 // SAFETY: Task still running, single-threaded — safe to write waker.
431 unsafe { set_join_waker(ptr, cx.waker().clone()) };
432 Poll::Pending
433 }
434 }
435}
436
437impl<T> JoinHandle<T> {
438 pub(crate) fn new(ptr: *mut u8) -> Self {
439 Self {
440 ptr,
441 _marker: PhantomData,
442 _not_send: PhantomData,
443 }
444 }
445
446 /// Returns `true` if the task has completed (output is ready).
447 pub fn is_finished(&self) -> bool {
448 unsafe { is_completed(self.ptr) }
449 }
450
451 /// Abort the task and consume the handle.
452 ///
453 /// The future is dropped on the next poll cycle. Consumes the handle
454 /// so it cannot be awaited after abort — this is enforced at the type
455 /// level rather than via a runtime panic.
456 ///
457 /// Returns `true` if the task was still running, `false` if it had
458 /// already completed (output is dropped by `JoinHandle::drop`).
459 #[must_use = "returns whether the task was still running"]
460 pub fn abort(self) -> bool {
461 let ptr = self.ptr;
462 let was_running = !unsafe { is_completed(ptr) };
463 if was_running {
464 unsafe { set_aborted(ptr) };
465 }
466 // self is consumed — Drop runs, which clears HAS_JOIN,
467 // takes the join waker, and decrements refcount.
468 was_running
469 }
470
471 /// Test-only raw pointer accessor for white-box scenarios that
472 /// need to drive the underlying task allocation directly (e.g.,
473 /// the cross_wake UAF regression test in PR 2). Avoids the
474 /// `repr(Rust)` layout assumption that `mem::transmute_copy`
475 /// would otherwise rely on.
476 #[cfg(test)]
477 pub(crate) fn raw_ptr(&self) -> *mut u8 {
478 self.ptr
479 }
480}
481
482impl<T> Drop for JoinHandle<T> {
483 fn drop(&mut self) {
484 let ptr = self.ptr;
485 // SAFETY: ptr is valid — JoinHandle holds a ref (refcount >= 1).
486 let s = unsafe { state_load(ptr) };
487
488 if (s & COMPLETED != 0) && (s & OUTPUT_TAKEN == 0) && (s & ABORTED == 0) {
489 // Task completed but output was never read — drop it.
490 // SAFETY: poll_join overwrote drop_fn to drop_output::<T>,
491 // so this drops the output T (not the future F).
492 unsafe { drop_task_future(ptr) };
493 }
494
495 // Clear HAS_JOIN so complete_task knows nobody is waiting.
496 // Take the join waker to release the parent task's refcount.
497 unsafe { clear_has_join(ptr) };
498 let _ = unsafe { take_join_waker(ptr) };
499
500 // Release our reference via TaskRef. Drop routes terminal state
501 // through dispose_terminal — defers via DEFERRED_FREE TLS on the
502 // executor thread (so all_tasks bookkeeping stays consistent),
503 // queues cross-thread otherwise, frees directly for null-ctx
504 // (test) tasks. JoinHandle is !Send so we're always on the
505 // executor thread here, but the routing handles all cases.
506 // SAFETY: JoinHandle owned exactly one ref on `ptr`; we hand
507 // it off to TaskRef which will ref_dec on Drop.
508 drop(unsafe { TaskRef::from_owned(ptr) });
509 }
510}
511
512// =============================================================================
513// Packed state accessor functions
514// =============================================================================
515
516/// Get the raw state value.
517///
518/// # Safety
519///
520/// `ptr` must point to a live `Task<F>`.
521#[inline]
522unsafe fn state_load(ptr: *mut u8) -> usize {
523 // SAFETY: state is AtomicUsize at offset 24 in repr(C) Task.
524 unsafe { &*ptr.add(24).cast::<AtomicUsize>() }.load(Ordering::Acquire)
525}
526
527/// Get a reference to the state atomic.
528///
529/// # Safety
530///
531/// `ptr` must point to a live `Task<F>`.
532#[inline]
533unsafe fn state_ref(ptr: *mut u8) -> &'static AtomicUsize {
534 // SAFETY: state is AtomicUsize at offset 24 in repr(C) Task.
535 // 'static is a lie — the caller must not outlive the task.
536 unsafe { &*ptr.add(24).cast::<AtomicUsize>() }
537}
538
539/// Read the `tracker_key` from a task pointer.
540///
541/// # Safety
542///
543/// `ptr` must point to a live `Task<F>`.
544#[inline]
545pub(crate) unsafe fn tracker_key(ptr: *mut u8) -> u32 {
546 // SAFETY: tracker_key is at offset 60 in repr(C) Task.
547 unsafe { *(ptr.add(60).cast::<u32>()) }
548}
549
550/// Increment the waker refcount. Called on waker clone.
551///
552/// # Safety
553///
554/// `ptr` must point to a live `Task<F>`.
555#[inline]
556pub(crate) unsafe fn ref_inc(ptr: *mut u8) {
557 let state = unsafe { state_ref(ptr) };
558 let prev = state.fetch_add(REF_ONE, Ordering::Relaxed);
559 debug_assert!((prev & REF_MASK) > 0, "ref_inc on zero refcount");
560}
561
562/// Decrement the refcount. Returns `FreeAction` indicating whether
563/// a terminal state was produced and what kind of allocation it is.
564///
565/// # Safety
566///
567/// `ptr` must point to a live (or completed) `Task<F>`.
568#[inline]
569pub(crate) unsafe fn ref_dec(ptr: *mut u8) -> FreeAction {
570 let state = unsafe { state_ref(ptr) };
571 let prev = state.fetch_sub(REF_ONE, Ordering::AcqRel);
572 debug_assert!((prev & REF_MASK) >= REF_ONE, "ref_dec on zero refcount");
573
574 // Was this the last ref?
575 if (prev & REF_MASK) != REF_ONE {
576 return FreeAction::Retain;
577 }
578
579 // Last ref. Check: COMPLETED must be set, lifecycle flags must be clear.
580 // ABORTED, OUTPUT_TAKEN, SLAB_ALLOCATED are inert — don't block terminal.
581 let flags = prev & FLAG_MASK;
582 if (flags & COMPLETED == 0) || (flags & LIFECYCLE_MASK != 0) {
583 return FreeAction::Retain;
584 }
585 if flags & SLAB_ALLOCATED != 0 {
586 FreeAction::FreeSlab
587 } else {
588 FreeAction::FreeBox
589 }
590}
591
592/// Read the refcount.
593///
594/// # Safety
595///
596/// `ptr` must point to a live `Task<F>`.
597#[inline]
598pub(crate) unsafe fn ref_count(ptr: *mut u8) -> usize {
599 (unsafe { state_load(ptr) } & REF_MASK) >> 6
600}
601
602/// Atomically set COMPLETED and decrement the executor's reference.
603/// Returns `FreeAction` indicating whether a terminal state was produced.
604///
605/// This is the key atomic operation that eliminates the race between
606/// `set_completed` and `ref_dec` that caused the SIGABRT.
607///
608/// # Safety
609///
610/// `ptr` must point to a live, not-yet-completed `Task<F>`.
611#[inline]
612pub(crate) unsafe fn complete_and_unref(ptr: *mut u8) -> FreeAction {
613 let state = unsafe { state_ref(ptr) };
614 // Atomically: set COMPLETED (add 1 to bit 0) + dec refcount (sub REF_ONE)
615 // Net subtraction = REF_ONE - COMPLETED.
616 let prev = state.fetch_sub(REF_ONE - COMPLETED, Ordering::AcqRel);
617 debug_assert!(prev & COMPLETED == 0, "double complete");
618 debug_assert!(
619 (prev & REF_MASK) >= REF_ONE,
620 "complete_and_unref on zero refcount"
621 );
622 // prev had COMPLETED=0. Last ref if prev had exactly REF_ONE.
623 // Lifecycle flags (QUEUED, HAS_JOIN) must be clear.
624 // Inert flags (ABORTED, OUTPUT_TAKEN, SLAB_ALLOCATED) don't matter.
625 if (prev & REF_MASK) != REF_ONE {
626 return FreeAction::Retain;
627 }
628 let flags = prev & FLAG_MASK;
629 if flags & LIFECYCLE_MASK != 0 {
630 return FreeAction::Retain;
631 }
632 if flags & SLAB_ALLOCATED != 0 {
633 FreeAction::FreeSlab
634 } else {
635 FreeAction::FreeBox
636 }
637}
638
639/// Check if the state is TERMINAL (safe to free).
640///
641/// # Safety
642///
643/// `ptr` must point to a live `Task<F>`.
644#[inline]
645pub(crate) unsafe fn is_terminal(ptr: *mut u8) -> bool {
646 let s = unsafe { state_load(ptr) };
647 // Strip inert flags (SLAB_ALLOCATED, ABORTED, OUTPUT_TAKEN).
648 // What remains must be exactly COMPLETED with zero refcount.
649 (s & !INERT_MASK) == COMPLETED
650}
651
652/// Read the is_completed flag.
653///
654/// # Safety
655///
656/// `ptr` must point to a (possibly completed) `Task<F>`.
657#[inline]
658pub(crate) unsafe fn is_completed(ptr: *mut u8) -> bool {
659 (unsafe { state_load(ptr) }) & COMPLETED != 0
660}
661
662/// Read the SLAB_ALLOCATED flag.
663///
664/// Used by `Executor::drop` to differentiate cleanup behavior between
665/// slab- and box-allocated tasks during unwinding.
666///
667/// # Safety
668///
669/// `ptr` must point to a live `Task<F>`.
670#[inline]
671pub(crate) unsafe fn is_slab_allocated(ptr: *mut u8) -> bool {
672 (unsafe { state_load(ptr) }) & SLAB_ALLOCATED != 0
673}
674
675/// Read the is_queued flag.
676///
677/// # Safety
678///
679/// `ptr` must point to a live `Task<F>`.
680#[inline]
681pub(crate) unsafe fn is_queued(ptr: *mut u8) -> bool {
682 (unsafe { state_load(ptr) }) & QUEUED != 0
683}
684
685/// Set the `is_queued` flag.
686///
687/// # Safety
688///
689/// `ptr` must point to a live `Task<F>`.
690#[inline]
691pub(crate) unsafe fn set_queued(ptr: *mut u8, queued: bool) {
692 let state = unsafe { state_ref(ptr) };
693 if queued {
694 state.fetch_or(QUEUED, Ordering::Release);
695 } else {
696 state.fetch_and(!QUEUED, Ordering::Release);
697 }
698}
699
700/// Atomically try to set QUEUED from false to true. Returns true if
701/// successful (was not queued). Used by cross-thread wakers.
702///
703/// # Safety
704///
705/// `ptr` must point to a live `Task<F>`.
706#[inline]
707pub(crate) unsafe fn try_set_queued(ptr: *mut u8) -> bool {
708 let state = unsafe { state_ref(ptr) };
709 // fetch_or always sets the bit. Check if it was already set.
710 let prev = state.fetch_or(QUEUED, Ordering::AcqRel);
711 (prev & QUEUED) == 0
712}
713
714/// Clear the QUEUED flag.
715///
716/// # Safety
717///
718/// `ptr` must point to a live `Task<F>`.
719#[inline]
720pub(crate) unsafe fn clear_queued(ptr: *mut u8) {
721 let state = unsafe { state_ref(ptr) };
722 state.fetch_and(!QUEUED, Ordering::Release);
723}
724
725/// Check if ABORTED flag is set.
726///
727/// # Safety
728///
729/// `ptr` must point to a live `Task<F>`.
730#[inline]
731pub(crate) unsafe fn is_aborted(ptr: *mut u8) -> bool {
732 (unsafe { state_load(ptr) }) & ABORTED != 0
733}
734
735/// Set the ABORTED flag.
736///
737/// # Safety
738///
739/// `ptr` must point to a live `Task<F>`.
740#[inline]
741pub(crate) unsafe fn set_aborted(ptr: *mut u8) {
742 let state = unsafe { state_ref(ptr) };
743 state.fetch_or(ABORTED, Ordering::Release);
744}
745
746/// Check if HAS_JOIN flag is set.
747///
748/// # Safety
749///
750/// `ptr` must point to a live `Task<F>`.
751#[inline]
752pub(crate) unsafe fn has_join(ptr: *mut u8) -> bool {
753 (unsafe { state_load(ptr) }) & HAS_JOIN != 0
754}
755
756/// Clear the HAS_JOIN flag.
757///
758/// # Safety
759///
760/// `ptr` must point to a live `Task<F>`.
761#[inline]
762pub(crate) unsafe fn clear_has_join(ptr: *mut u8) {
763 let state = unsafe { state_ref(ptr) };
764 state.fetch_and(!HAS_JOIN, Ordering::Release);
765}
766
767/// Set the OUTPUT_TAKEN flag.
768///
769/// # Safety
770///
771/// `ptr` must point to a live, completed `Task<F>`. Single-threaded.
772#[inline]
773unsafe fn set_output_taken(ptr: *mut u8) {
774 let state = unsafe { state_ref(ptr) };
775 state.fetch_or(OUTPUT_TAKEN, Ordering::Release);
776}
777
778/// Get a raw pointer to the `cross_next` atomic pointer.
779///
780/// # Safety
781///
782/// `ptr` must point to a live `Task<F>`.
783#[inline]
784pub(crate) unsafe fn cross_next(ptr: *mut u8) -> *const AtomicPtr<u8> {
785 // SAFETY: cross_next is at offset 32 in repr(C) Task.
786 unsafe { ptr.add(32).cast::<AtomicPtr<u8>>() }
787}
788
789/// Read the storage offset from the task header.
790///
791/// # Safety
792///
793/// `ptr` must point to a live `Task<S>`.
794#[inline]
795pub(crate) unsafe fn storage_offset(ptr: *mut u8) -> usize {
796 // SAFETY: storage_offset is u16 at offset 56 in repr(C) Task.
797 unsafe { *(ptr.add(56).cast::<u16>()) as usize }
798}
799
800/// Read the `cross_wake_ctx` pointer from the task header.
801///
802/// Returns the runtime's [`CrossWakeContext`] pointer (Arc-backed,
803/// heap-stable) set at spawn time, or null for tasks not associated
804/// with any runtime (test path).
805///
806/// # Safety
807///
808/// `ptr` must point to a live `Task<S>` (header still valid).
809#[inline]
810pub(crate) unsafe fn header_cross_wake_ctx(ptr: *mut u8) -> *const CrossWakeContext {
811 // SAFETY: `cross_wake_ctx` is `*const CrossWakeContext` at offset 64
812 // in `repr(C) Task`. The field is initialized exactly once at spawn
813 // time under the spawning thread's exclusive ownership, then made
814 // visible to other threads via Arc/refcount publication (any TaskRef
815 // holder transitively keeps the owning runtime's Arc alive, and the
816 // Arc's atomic-counter publication establishes happens-before for the
817 // read). Immutable after init — concurrent reads from any thread are
818 // sound. dispose_terminal explicitly reads this from foreign threads
819 // when routing terminal drops from cross-thread waker holders.
820 unsafe { *(ptr.add(64).cast::<*const CrossWakeContext>()) }
821}
822
823/// Store a waker for the JoinHandle awaiter.
824///
825/// # Safety
826///
827/// `ptr` must point to a live `Task<F>`. Single-threaded access only.
828#[inline]
829unsafe fn set_join_waker(ptr: *mut u8, waker: Waker) {
830 // SAFETY: join_waker is UnsafeCell<Option<Waker>> at offset 40.
831 let cell = unsafe { &*ptr.add(40).cast::<UnsafeCell<Option<Waker>>>() };
832 unsafe { *cell.get() = Some(waker) };
833}
834
835/// Take the join waker (if any).
836///
837/// # Safety
838///
839/// `ptr` must point to a live `Task<F>`. Single-threaded access only.
840#[inline]
841pub(crate) unsafe fn take_join_waker(ptr: *mut u8) -> Option<Waker> {
842 let cell = unsafe { &*ptr.add(40).cast::<UnsafeCell<Option<Waker>>>() };
843 unsafe { (*cell.get()).take() }
844}
845
846/// Poll the task's future.
847///
848/// # Safety
849///
850/// `ptr` must point to a live `Task<F>`.
851/// The future must not have been dropped.
852#[inline]
853pub(crate) unsafe fn poll_task(ptr: *mut u8, cx: &mut Context<'_>) -> Poll<()> {
854 // SAFETY: poll_fn is at offset 0 in repr(C) Task.
855 let poll_fn: unsafe fn(*mut u8, &mut Context<'_>) -> Poll<()> =
856 unsafe { *(ptr as *const unsafe fn(*mut u8, &mut Context<'_>) -> Poll<()>) };
857 // Pass the task base pointer — the trampoline reads storage_offset.
858 unsafe { poll_fn(ptr, cx) }
859}
860
861/// Drop the task's future (or output) in place.
862///
863/// # Safety
864///
865/// `ptr` must point to a live `Task<F>`. Must only be called once.
866#[inline]
867pub(crate) unsafe fn drop_task_future(ptr: *mut u8) {
868 // SAFETY: drop_fn is at offset 8 in repr(C) Task.
869 let drop_fn: unsafe fn(*mut u8) = unsafe { *(ptr.add(8) as *const unsafe fn(*mut u8)) };
870 // Pass base pointer — the trampoline reads storage_offset.
871 unsafe { drop_fn(ptr) }
872}
873
874/// Call the task's free function to deallocate its storage.
875///
876/// # Safety
877///
878/// `ptr` must point to a `Task<F>` whose future has already been dropped.
879/// Must only be called once (after state reaches TERMINAL).
880#[inline]
881pub(crate) unsafe fn free_task(ptr: *mut u8) {
882 // SAFETY: free_fn is at offset 16 in repr(C) Task.
883 let free_fn: unsafe fn(*mut u8) = unsafe { *(ptr.add(16) as *const unsafe fn(*mut u8)) };
884 unsafe { free_fn(ptr) }
885}
886
887// =============================================================================
888// Type-erased vtable functions
889// =============================================================================
890
891/// Poll trampoline for joinable tasks (Output = T).
892///
893/// On completion: drops F, writes T into the same location, overwrites
894/// drop_fn to target T instead of F.
895///
896/// # Safety
897///
898/// `ptr` must point to a live `Task<F>`. The future must not have been dropped.
899unsafe fn poll_join<F: Future>(ptr: *mut u8, cx: &mut Context<'_>) -> Poll<()>
900where
901 F::Output: 'static,
902{
903 // Check if aborted
904 if unsafe { is_aborted(ptr) } {
905 return Poll::Ready(());
906 }
907
908 let future_ptr = unsafe { ptr.add(storage_offset(ptr)) };
909 let future = unsafe { Pin::new_unchecked(&mut *future_ptr.cast::<F>()) };
910 match future.poll(cx) {
911 Poll::Pending => Poll::Pending,
912 Poll::Ready(value) => {
913 let drop_fn_slot = unsafe { ptr.add(8).cast::<unsafe fn(*mut u8)>() };
914 // 1. Overwrite drop_fn to no-op BEFORE dropping F.
915 // If F::drop() panics, this prevents double-drop —
916 // subsequent cleanup calls the no-op instead of
917 // drop_future_in_union on a partially-dropped F.
918 // The output (value) is dropped during unwind (stack-owned).
919 unsafe { *drop_fn_slot = drop_noop };
920 // 2. Drop the future in place (panic-safe now)
921 unsafe { std::ptr::drop_in_place(future_ptr.cast::<F>()) };
922 // 3. Write output T into the same location
923 unsafe { std::ptr::write(future_ptr.cast::<F::Output>(), value) };
924 // 4. Overwrite drop_fn: now drops T instead of F
925 unsafe { *drop_fn_slot = drop_output::<F::Output> };
926 Poll::Ready(())
927 }
928 }
929}
930
931/// Drop trampoline for futures stored directly (fire-and-forget tasks).
932///
933/// # Safety
934///
935/// `ptr` must point to a live `Task<F>` with a live future at `storage_offset`.
936#[cfg(test)]
937unsafe fn drop_future<F>(ptr: *mut u8) {
938 let future_ptr = unsafe { ptr.add(storage_offset(ptr)) };
939 unsafe { std::ptr::drop_in_place(future_ptr.cast::<F>()) }
940}
941
942/// Drop trampoline for futures stored in FutureOrOutput union.
943///
944/// # Safety
945///
946/// `ptr` must point to a `Task<FutureOrOutput<F, T>>` with a live future.
947unsafe fn drop_future_in_union<F: Future>(ptr: *mut u8) {
948 let storage_ptr = unsafe { ptr.add(storage_offset(ptr)) };
949 // The future is at the start of the union (same offset as the union itself).
950 unsafe { std::ptr::drop_in_place(storage_ptr.cast::<F>()) }
951}
952
953/// No-op drop trampoline. Installed temporarily during the F→T transition
954/// in `poll_join` to prevent double-drop if `F::drop()` panics.
955///
956/// # Safety
957///
958/// Always safe — does nothing.
959unsafe fn drop_noop(_ptr: *mut u8) {}
960
961/// Drop trampoline for output values. Receives the task base pointer.
962///
963/// Installed by `poll_join` after the future completes, replacing `drop_future`.
964///
965/// # Safety
966///
967/// `ptr` must point to a `Task` with a live `T` at `storage_offset`.
968unsafe fn drop_output<T>(ptr: *mut u8) {
969 let output_ptr = unsafe { ptr.add(storage_offset(ptr)) };
970 unsafe { std::ptr::drop_in_place(output_ptr.cast::<T>()) }
971}
972
973/// Free function for Box-allocated tasks.
974///
975/// Deallocates the memory without running destructors — the future/output
976/// was already dropped via `drop_task_future`, and the header fields
977/// are all trivial. Only the heap allocation needs to be freed.
978///
979/// # Safety
980///
981/// `ptr` must have been produced by `Box::into_raw(Box::new(Task<F>))`.
982/// The value at offset 64 must already be dropped.
983unsafe fn box_free<F>(ptr: *mut u8) {
984 // SAFETY: Layout matches what Box::new(Task<F>) allocated.
985 let layout = std::alloc::Layout::new::<Task<F>>();
986 unsafe { std::alloc::dealloc(ptr, layout) }
987}
988
989#[cfg(test)]
990mod tests {
991 use super::*;
992
993 #[test]
994 fn task_header_size() {
995 assert_eq!(TASK_HEADER_SIZE, 72);
996 assert_eq!(std::mem::size_of::<Task<()>>(), 72);
997 }
998
999 #[test]
1000 fn task_layout_offsets() {
1001 assert_eq!(std::mem::offset_of!(Task<()>, poll_fn), 0);
1002 assert_eq!(std::mem::offset_of!(Task<()>, drop_fn), 8);
1003 assert_eq!(std::mem::offset_of!(Task<()>, free_fn), 16);
1004 assert_eq!(std::mem::offset_of!(Task<()>, state), 24);
1005 assert_eq!(std::mem::offset_of!(Task<()>, cross_next), 32);
1006 assert_eq!(std::mem::offset_of!(Task<()>, join_waker), 40);
1007 assert_eq!(std::mem::offset_of!(Task<()>, storage_offset), 56);
1008 assert_eq!(std::mem::offset_of!(Task<()>, _pad), 58);
1009 assert_eq!(std::mem::offset_of!(Task<()>, tracker_key), 60);
1010 assert_eq!(std::mem::offset_of!(Task<()>, cross_wake_ctx), 64);
1011 assert_eq!(std::mem::offset_of!(Task<()>, storage), 72);
1012 }
1013
1014 #[test]
1015 fn task_size_with_future() {
1016 #[allow(dead_code)]
1017 struct SmallFuture([u8; 24]);
1018 impl Future for SmallFuture {
1019 type Output = ();
1020 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1021 Poll::Ready(())
1022 }
1023 }
1024
1025 // 72 byte header + 24 byte future = 96 bytes
1026 assert_eq!(
1027 std::mem::size_of::<Task<SmallFuture>>(),
1028 TASK_HEADER_SIZE + 24
1029 );
1030 }
1031
1032 #[test]
1033 fn packed_state_fire_and_forget() {
1034 struct Noop;
1035 impl Future for Noop {
1036 type Output = ();
1037 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1038 Poll::Ready(())
1039 }
1040 }
1041
1042 let task = Box::new(Task::new_boxed(Noop, 0));
1043 let ptr = Box::into_raw(task) as *mut u8;
1044
1045 unsafe {
1046 // Initial state: 1 ref, no flags
1047 assert_eq!(ref_count(ptr), 1);
1048 assert!(!is_completed(ptr));
1049 assert!(!is_queued(ptr));
1050 assert!(!has_join(ptr));
1051 assert!(!is_terminal(ptr));
1052
1053 // Set and clear queued
1054 set_queued(ptr, true);
1055 assert!(is_queued(ptr));
1056 set_queued(ptr, false);
1057 assert!(!is_queued(ptr));
1058
1059 // complete_and_unref with 1 ref → TERMINAL
1060 drop_task_future(ptr);
1061 assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1062 assert!(is_terminal(ptr));
1063
1064 free_task(ptr);
1065 }
1066 }
1067
1068 #[test]
1069 fn packed_state_joinable() {
1070 struct Noop;
1071 impl Future for Noop {
1072 type Output = u64;
1073 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1074 Poll::Ready(42)
1075 }
1076 }
1077
1078 let ptr = box_spawn_joinable(Noop, 7, std::ptr::null());
1079 unsafe {
1080 assert!(has_join(ptr));
1081 assert!(!is_aborted(ptr));
1082 assert_eq!(ref_count(ptr), 2); // executor + JoinHandle
1083 assert_eq!(tracker_key(ptr), 7);
1084
1085 // Simulate: handle drops before completion
1086 clear_has_join(ptr);
1087 assert!(!has_join(ptr));
1088 assert!(matches!(ref_dec(ptr), FreeAction::Retain)); // still 1 ref, not completed
1089
1090 // complete_and_unref → TERMINAL
1091 drop_task_future(ptr);
1092 assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1093 assert!(is_terminal(ptr));
1094
1095 free_task(ptr);
1096 }
1097 }
1098
1099 #[test]
1100 fn packed_state_joinable_completion_before_handle_drop() {
1101 struct Noop;
1102 impl Future for Noop {
1103 type Output = u64;
1104 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1105 Poll::Ready(42)
1106 }
1107 }
1108
1109 let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1110 unsafe {
1111 // complete_and_unref with 2 refs → not terminal
1112 drop_task_future(ptr);
1113 assert!(matches!(complete_and_unref(ptr), FreeAction::Retain));
1114 assert!(is_completed(ptr));
1115 assert_eq!(ref_count(ptr), 1);
1116
1117 // Handle drop: clear HAS_JOIN, ref_dec → TERMINAL
1118 clear_has_join(ptr);
1119 assert!(matches!(ref_dec(ptr), FreeAction::FreeBox));
1120 assert!(is_terminal(ptr));
1121
1122 free_task(ptr);
1123 }
1124 }
1125
1126 #[test]
1127 fn packed_state_cross_thread_waker_scenario() {
1128 struct Noop;
1129 impl Future for Noop {
1130 type Output = u64;
1131 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1132 Poll::Ready(42)
1133 }
1134 }
1135
1136 let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1137 unsafe {
1138 // Waker clone: ref_inc
1139 ref_inc(ptr);
1140 assert_eq!(ref_count(ptr), 3);
1141
1142 // complete_and_unref: executor releases its ref
1143 drop_task_future(ptr);
1144 assert!(matches!(complete_and_unref(ptr), FreeAction::Retain));
1145
1146 // Handle drop: clear HAS_JOIN, ref_dec
1147 clear_has_join(ptr);
1148 assert!(matches!(ref_dec(ptr), FreeAction::Retain)); // still 1 ref (waker)
1149
1150 // Waker drop: ref_dec → TERMINAL
1151 assert!(matches!(ref_dec(ptr), FreeAction::FreeBox));
1152 assert!(is_terminal(ptr));
1153
1154 free_task(ptr);
1155 }
1156 }
1157
1158 // =========================================================================
1159 // Panic safety — drop_fn transitions
1160 // =========================================================================
1161
1162 /// Future whose Drop impl panics. Used to verify the drop_noop guard
1163 /// in poll_join prevents double-drop.
1164 struct PanickingDrop {
1165 drop_count: *mut u32,
1166 }
1167
1168 impl Future for PanickingDrop {
1169 type Output = u64;
1170 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1171 Poll::Ready(42)
1172 }
1173 }
1174
1175 impl Drop for PanickingDrop {
1176 fn drop(&mut self) {
1177 unsafe { *self.drop_count += 1 };
1178 panic!("intentional drop panic");
1179 }
1180 }
1181
1182 #[test]
1183 fn poll_join_panic_in_drop_prevents_double_drop() {
1184 use std::task::{RawWaker, RawWakerVTable, Waker};
1185
1186 static NOOP_VTABLE: RawWakerVTable =
1187 RawWakerVTable::new(|p| RawWaker::new(p, &NOOP_VTABLE), |_| {}, |_| {}, |_| {});
1188 let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
1189 let mut cx = Context::from_waker(&waker);
1190
1191 let mut drop_count: u32 = 0;
1192 let ptr = box_spawn_joinable(
1193 PanickingDrop {
1194 drop_count: &raw mut drop_count,
1195 },
1196 0,
1197 std::ptr::null(),
1198 );
1199
1200 // poll_join completes the future, then drops F — which panics.
1201 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| unsafe {
1202 poll_task(ptr, &mut cx)
1203 }));
1204
1205 // The panic should have been caught.
1206 assert!(result.is_err(), "expected panic from PanickingDrop");
1207 // F was dropped exactly once (by poll_join, before the panic propagated).
1208 assert_eq!(drop_count, 1, "future should be dropped exactly once");
1209
1210 // drop_fn should now be drop_noop — calling it must NOT double-drop F.
1211 unsafe { drop_task_future(ptr) };
1212 assert_eq!(
1213 drop_count, 1,
1214 "drop_task_future after panic must be a no-op (drop_noop)"
1215 );
1216
1217 // Clean up: dec both refs (executor + JoinHandle), then free.
1218 unsafe {
1219 ref_dec(ptr);
1220 ref_dec(ptr);
1221 free_task(ptr);
1222 }
1223 }
1224
1225 #[test]
1226 fn drop_fn_transitions_correctly_on_normal_completion() {
1227 use std::task::{RawWaker, RawWakerVTable, Waker};
1228
1229 static NOOP_VTABLE: RawWakerVTable =
1230 RawWakerVTable::new(|p| RawWaker::new(p, &NOOP_VTABLE), |_| {}, |_| {}, |_| {});
1231 let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
1232 let mut cx = Context::from_waker(&waker);
1233
1234 static mut OUTPUT_DROP_COUNT: u32 = 0;
1235 struct TrackedOutput;
1236 impl Drop for TrackedOutput {
1237 fn drop(&mut self) {
1238 unsafe { OUTPUT_DROP_COUNT += 1 };
1239 }
1240 }
1241
1242 struct ProduceTracked;
1243 impl Future for ProduceTracked {
1244 type Output = TrackedOutput;
1245 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<TrackedOutput> {
1246 Poll::Ready(TrackedOutput)
1247 }
1248 }
1249
1250 let ptr = box_spawn_joinable(ProduceTracked, 0, std::ptr::null());
1251
1252 // Poll to completion — F dropped, T written, drop_fn → drop_output.
1253 let result = unsafe { poll_task(ptr, &mut cx) };
1254 assert!(result.is_ready());
1255
1256 // drop_fn should now target T (TrackedOutput).
1257 unsafe { OUTPUT_DROP_COUNT = 0 };
1258 unsafe { drop_task_future(ptr) };
1259 assert_eq!(
1260 unsafe { OUTPUT_DROP_COUNT },
1261 1,
1262 "drop_fn should drop the output exactly once"
1263 );
1264
1265 // Clean up.
1266 unsafe {
1267 ref_dec(ptr);
1268 ref_dec(ptr);
1269 free_task(ptr);
1270 }
1271 }
1272
1273 // =========================================================================
1274 // Packed state word — SIGABRT root cause regression tests
1275 // =========================================================================
1276
1277 #[test]
1278 fn packed_state_fire_and_forget_terminal() {
1279 // Box task with 1 ref (no JoinHandle). complete_and_unref → FreeBox.
1280 // Verify terminal state is exactly TERMINAL_BOX (1).
1281 struct Noop;
1282 impl Future for Noop {
1283 type Output = ();
1284 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1285 Poll::Ready(())
1286 }
1287 }
1288
1289 let task = Box::new(Task::new_boxed(Noop, 0));
1290 let ptr = Box::into_raw(task) as *mut u8;
1291
1292 unsafe {
1293 assert_eq!(ref_count(ptr), 1);
1294 assert!(!has_join(ptr));
1295
1296 drop_task_future(ptr);
1297 let action = complete_and_unref(ptr);
1298 assert_eq!(action, FreeAction::FreeBox);
1299
1300 let s = state_load(ptr);
1301 assert_eq!(s, COMPLETED, "terminal state must have COMPLETED set");
1302 assert_eq!(s, 1);
1303 assert!(is_terminal(ptr));
1304
1305 free_task(ptr);
1306 }
1307 }
1308
1309 #[test]
1310 fn packed_state_slab_flag_terminal() {
1311 // Task with SLAB_ALLOCATED set. complete_and_unref → FreeSlab.
1312 // Verify terminal state is exactly TERMINAL_SLAB (33).
1313 struct Noop;
1314 impl Future for Noop {
1315 type Output = u64;
1316 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1317 Poll::Ready(42)
1318 }
1319 }
1320
1321 // Use new_joinable_slab to get SLAB_ALLOCATED flag set at construction.
1322 // Provide a free_fn that does Box dealloc (we box it manually below).
1323 type Storage = FutureOrOutput<Noop, u64>;
1324 unsafe fn slab_free(ptr: *mut u8) {
1325 let layout = std::alloc::Layout::new::<Task<Storage>>();
1326 std::alloc::dealloc(ptr, layout);
1327 }
1328
1329 let task = new_joinable_slab(Noop, 0, slab_free, std::ptr::null());
1330 let ptr = Box::into_raw(Box::new(task)) as *mut u8;
1331
1332 unsafe {
1333 assert_eq!(ref_count(ptr), 2); // executor + JoinHandle
1334 assert!(has_join(ptr));
1335
1336 // Simulate handle detach: clear HAS_JOIN + ref_dec
1337 clear_has_join(ptr);
1338 assert_eq!(ref_dec(ptr), FreeAction::Retain);
1339 assert_eq!(ref_count(ptr), 1);
1340
1341 // Executor completes task
1342 drop_task_future(ptr);
1343 let action = complete_and_unref(ptr);
1344 assert_eq!(action, FreeAction::FreeSlab);
1345
1346 let s = state_load(ptr);
1347 assert_eq!(
1348 s,
1349 COMPLETED | SLAB_ALLOCATED,
1350 "terminal state must be COMPLETED | SLAB_ALLOCATED"
1351 );
1352 assert_eq!(s, 33);
1353 assert!(is_terminal(ptr));
1354
1355 slab_free(ptr);
1356 }
1357 }
1358
1359 #[test]
1360 fn packed_state_joinable_handle_drops_first() {
1361 // Joinable task (2 refs + HAS_JOIN). Handle drops first:
1362 // clear HAS_JOIN → ref_dec → 1 ref remaining.
1363 // Then complete_and_unref → terminal.
1364 struct Noop;
1365 impl Future for Noop {
1366 type Output = u64;
1367 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1368 Poll::Ready(42)
1369 }
1370 }
1371
1372 let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1373 unsafe {
1374 assert_eq!(ref_count(ptr), 2);
1375 assert!(has_join(ptr));
1376
1377 // Handle drops: clear HAS_JOIN, ref_dec
1378 clear_has_join(ptr);
1379 assert!(!has_join(ptr));
1380 assert_eq!(ref_dec(ptr), FreeAction::Retain);
1381 assert_eq!(ref_count(ptr), 1);
1382 assert!(!is_terminal(ptr));
1383
1384 // Executor completes
1385 drop_task_future(ptr);
1386 assert_eq!(complete_and_unref(ptr), FreeAction::FreeBox);
1387 assert!(is_terminal(ptr));
1388
1389 free_task(ptr);
1390 }
1391 }
1392
1393 #[test]
1394 fn packed_state_joinable_completion_first_then_handle() {
1395 // Joinable task. Completion fires first (Retain because 2 refs).
1396 // Then handle clears HAS_JOIN + ref_dec → FreeBox.
1397 struct Noop;
1398 impl Future for Noop {
1399 type Output = u64;
1400 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1401 Poll::Ready(42)
1402 }
1403 }
1404
1405 let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1406 unsafe {
1407 // complete_and_unref: sets COMPLETED, dec ref → 1 ref remains
1408 drop_task_future(ptr);
1409 assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1410 assert!(is_completed(ptr));
1411 assert_eq!(ref_count(ptr), 1);
1412
1413 // Handle drops: clear HAS_JOIN, ref_dec → terminal
1414 clear_has_join(ptr);
1415 assert_eq!(ref_dec(ptr), FreeAction::FreeBox);
1416 assert!(is_terminal(ptr));
1417
1418 free_task(ptr);
1419 }
1420 }
1421
1422 #[test]
1423 fn packed_state_waker_clone_lifecycle() {
1424 // Joinable task (2 refs). Waker clone adds 3rd ref.
1425 // complete_and_unref → Retain (2 refs remain, HAS_JOIN still set).
1426 // Handle drops (clear HAS_JOIN + ref_dec) → Retain (1 ref from waker).
1427 // Waker drops (ref_dec) → FreeBox.
1428 struct Noop;
1429 impl Future for Noop {
1430 type Output = u64;
1431 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1432 Poll::Ready(42)
1433 }
1434 }
1435
1436 let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1437 unsafe {
1438 // Waker clone: ref_inc
1439 ref_inc(ptr);
1440 assert_eq!(ref_count(ptr), 3);
1441
1442 // Executor completes: complete_and_unref
1443 drop_task_future(ptr);
1444 assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1445 assert_eq!(ref_count(ptr), 2);
1446
1447 // Handle drops: clear HAS_JOIN, ref_dec
1448 clear_has_join(ptr);
1449 assert_eq!(ref_dec(ptr), FreeAction::Retain);
1450 assert_eq!(ref_count(ptr), 1);
1451
1452 // Waker drops: ref_dec → terminal
1453 assert_eq!(ref_dec(ptr), FreeAction::FreeBox);
1454 assert!(is_terminal(ptr));
1455
1456 free_task(ptr);
1457 }
1458 }
1459
1460 #[test]
1461 fn packed_state_leaked_flag_prevents_terminal() {
1462 // If HAS_JOIN is NOT cleared before the final ref_dec, the state
1463 // won't reach terminal (HAS_JOIN is a lifecycle flag that blocks it).
1464 // Result: Retain. This is safe — the lifecycle flag prevents
1465 // premature free.
1466 struct Noop;
1467 impl Future for Noop {
1468 type Output = u64;
1469 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1470 Poll::Ready(42)
1471 }
1472 }
1473
1474 let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1475 unsafe {
1476 // complete_and_unref with 2 refs → Retain
1477 drop_task_future(ptr);
1478 assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1479
1480 // ref_dec WITHOUT clearing HAS_JOIN → still not terminal
1481 // because HAS_JOIN is a lifecycle flag that blocks terminal.
1482 assert_eq!(ref_dec(ptr), FreeAction::Retain);
1483 assert!(!is_terminal(ptr));
1484
1485 // State is COMPLETED | HAS_JOIN | 0 refs — leaked but safe.
1486 // In real code this can't happen (JoinHandle::Drop always
1487 // clears HAS_JOIN), but the packed state correctly prevents
1488 // a free even if it did.
1489 let s = state_load(ptr);
1490 assert_eq!(s & COMPLETED, COMPLETED);
1491 assert_eq!(s & HAS_JOIN, HAS_JOIN);
1492 assert_eq!(ref_count(ptr), 0);
1493
1494 // Clean up: manually clear HAS_JOIN to reach terminal, then free.
1495 clear_has_join(ptr);
1496 assert!(is_terminal(ptr));
1497 free_task(ptr);
1498 }
1499 }
1500
1501 // =========================================================================
1502 // TaskRef — RAII inc/dec discipline
1503 // =========================================================================
1504
1505 #[test]
1506 fn taskref_acquire_drop_balances_refcount() {
1507 // Acquire a TaskRef on a task with rc=1; rc goes to 2. Drop the
1508 // TaskRef; rc goes back to 1. Verifies acquire(ref_inc) is paired
1509 // with Drop(ref_dec).
1510 let task = Box::new(Task::new_boxed(async {}, 0));
1511 let ptr = Box::into_raw(task) as *mut u8;
1512
1513 unsafe {
1514 assert_eq!(ref_count(ptr), 1);
1515 let task_ref = TaskRef::acquire(ptr);
1516 assert_eq!(ref_count(ptr), 2);
1517 drop(task_ref); // Not terminal — rc 2 → 1
1518 assert_eq!(ref_count(ptr), 1);
1519
1520 // Cleanup: complete and free directly (no TaskRef path).
1521 drop_task_future(ptr);
1522 assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1523 free_task(ptr);
1524 }
1525 }
1526
1527 #[test]
1528 fn taskref_from_owned_drop_balances_refcount() {
1529 // Manually ref_inc, then wrap with from_owned (no extra inc).
1530 // Drop releases the manual ref. Verifies from_owned's "wrap a
1531 // pre-incremented pointer" contract.
1532 let task = Box::new(Task::new_boxed(async {}, 0));
1533 let ptr = Box::into_raw(task) as *mut u8;
1534
1535 unsafe {
1536 assert_eq!(ref_count(ptr), 1);
1537 ref_inc(ptr); // simulate handoff (e.g. RawWaker::data ownership)
1538 assert_eq!(ref_count(ptr), 2);
1539 let task_ref = TaskRef::from_owned(ptr);
1540 // No additional ref_inc — TaskRef takes the existing ref.
1541 assert_eq!(ref_count(ptr), 2);
1542 drop(task_ref); // releases the manual ref
1543 assert_eq!(ref_count(ptr), 1);
1544
1545 // Cleanup.
1546 drop_task_future(ptr);
1547 assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1548 free_task(ptr);
1549 }
1550 }
1551
1552 #[test]
1553 fn taskref_drop_non_terminal_no_dispose() {
1554 // Drop a TaskRef when refcount > 1 — should NOT invoke
1555 // dispose_terminal. Verifies Drop's terminal gate (Retain branch
1556 // returns immediately).
1557 let task = Box::new(Task::new_boxed(async {}, 0));
1558 let ptr = Box::into_raw(task) as *mut u8;
1559
1560 unsafe {
1561 // rc=1, acquire to rc=2, drop to rc=1. Not terminal (no
1562 // COMPLETED, lifecycle flags clear). Drop hits the Retain
1563 // branch — no dispose_terminal call. If dispose_terminal
1564 // were called with a non-terminal task it would assert (or
1565 // worse, free a live task).
1566 let task_ref = TaskRef::acquire(ptr);
1567 assert_eq!(ref_count(ptr), 2);
1568 drop(task_ref);
1569 assert_eq!(ref_count(ptr), 1);
1570 assert!(!is_completed(ptr));
1571
1572 // Cleanup.
1573 drop_task_future(ptr);
1574 assert!(matches!(complete_and_unref(ptr), FreeAction::FreeBox));
1575 free_task(ptr);
1576 }
1577 }
1578
1579 #[test]
1580 fn packed_state_many_refs_converge() {
1581 // Clone waker 10 times (ref_inc 10x), complete, then ref_dec 10x.
1582 // Only the last ref_dec returns FreeBox. All others Retain.
1583 struct Noop;
1584 impl Future for Noop {
1585 type Output = u64;
1586 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
1587 Poll::Ready(42)
1588 }
1589 }
1590
1591 let ptr = box_spawn_joinable(Noop, 0, std::ptr::null());
1592 unsafe {
1593 // 10 waker clones: ref 2 → 12
1594 for _ in 0..10 {
1595 ref_inc(ptr);
1596 }
1597 assert_eq!(ref_count(ptr), 12);
1598
1599 // Executor completes: ref 12 → 11
1600 drop_task_future(ptr);
1601 assert_eq!(complete_and_unref(ptr), FreeAction::Retain);
1602 assert_eq!(ref_count(ptr), 11);
1603
1604 // Handle drops: clear HAS_JOIN, ref_dec → 10
1605 clear_has_join(ptr);
1606 assert_eq!(ref_dec(ptr), FreeAction::Retain);
1607 assert_eq!(ref_count(ptr), 10);
1608
1609 // Drop 9 waker refs — all Retain
1610 for i in 0..9 {
1611 assert_eq!(
1612 ref_dec(ptr),
1613 FreeAction::Retain,
1614 "ref_dec #{i} should Retain"
1615 );
1616 }
1617 assert_eq!(ref_count(ptr), 1);
1618
1619 // Last waker drop → FreeBox
1620 assert_eq!(ref_dec(ptr), FreeAction::FreeBox);
1621 assert!(is_terminal(ptr));
1622
1623 free_task(ptr);
1624 }
1625 }
1626}