Skip to main content

asupersync/cx/
scope.rs

1//! Scope API for spawning work within a region.
2//!
3//! A `Scope` provides the API for spawning tasks, creating child regions,
4//! and registering finalizers.
5//!
6//! # Execution Tiers and Soundness Rules
7//!
8//! Asupersync defines two execution tiers with different constraints:
9//!
10//! ## Fiber Tier (Phase 0)
11//!
12//! - Single-thread, borrow-friendly execution
13//! - Can capture borrowed references (`&T`) since no migration
14//! - Implemented via `spawn_local` (currently requires Send bounds; relaxed in Phase 1+)
15//!
16//! ## Task Tier (Phase 1+)
17//!
18//! - Multi-threaded, `Send` tasks that may migrate across workers
19//! - **Must capture only `Send + 'static` data** by construction
20//! - Can reference region-owned data via [`RRef<T>`](crate::types::rref::RRef)
21//!
22//! # Soundness Rules for Send Tasks
23//!
24//! The [`spawn`](Scope::spawn) method enforces the following bounds:
25//!
26//! | Component | Bound | Rationale |
27//! |-----------|-------|-----------|
28//! | Factory | `F: Send + 'static` | Factory may be called on any worker |
29//! | Future | `Fut: Send + 'static` | Task may migrate between polls |
30//! | Output | `Fut::Output: Send + 'static` | Result sent to potentially different thread |
31//!
32//! ## What Can Be Captured
33//!
34//! **Allowed captures in Send tasks:**
35//! - Owned `'static` data that is `Send` (e.g., `String`, `Vec<T>`, `Arc<T>`)
36//! - [`RRef<T>`](crate::types::rref::RRef) handles to region-heap-allocated data
37//! - Atomic types (`AtomicU64`, etc.)
38//! - Clone'd `Cx` (the capability context)
39//!
40//! **Disallowed captures:**
41//! - Borrowed references (`&T`, `&mut T`) - not `'static`
42//! - `Rc<T>`, `RefCell<T>` - not `Send`
43//! - Raw pointers (unless wrapped in a `Send` type)
44//! - References to stack-local data
45//!
46//! ## RRef for Region-Owned Data
47//!
48//! When tasks need to share data within a region without cloning, use the region
49//! heap and [`RRef<T>`](crate::types::rref::RRef):
50//!
51//! ```ignore
52//! // Allocate in region heap
53//! let index = region.heap_alloc(expensive_data);
54//! let rref = RRef::<ExpensiveData>::new(region_id, index);
55//!
56//! // Pass RRef to task - it's Copy + Send
57//! scope.spawn(state, &cx, move |cx| async move {
58//!     // Access via region record (requires runtime lookup)
59//!     let data = rref.get_via_region(&region_record)?;
60//!     process(data).await
61//! });
62//! ```
63//!
64//! # Compile-Time Enforcement
65//!
66//! The bounds are enforced at compile time. Attempting to capture non-Send
67//! or non-static data will result in a compilation error:
68//!
69//! ```compile_fail
70//! use std::rc::Rc;
71//! use asupersync::cx::Scope;
72//!
73//! fn try_capture_rc(scope: &Scope, state: &mut RuntimeState, cx: &Cx) {
74//!     let rc = Rc::new(42); // Rc is !Send
75//!     scope.spawn(state, cx, move |_| async move {
76//!         println!("{}", rc); // ERROR: Rc is not Send
77//!     });
78//! }
79//! ```
80//!
81//! ```compile_fail
82//! use asupersync::cx::Scope;
83//!
84//! fn try_capture_borrow(scope: &Scope, state: &mut RuntimeState, cx: &Cx) {
85//!     let local = 42;
86//!     let reference = &local; // Borrowed, not 'static
87//!     scope.spawn(state, cx, move |_| async move {
88//!         println!("{}", reference); // ERROR: borrowed data not 'static
89//!     });
90//! }
91//! ```
92//!
93//! # Lab Runtime Compatibility
94//!
95//! The Send bounds do not affect lab runtime determinism. The lab runtime
96//! simulates multi-worker scheduling deterministically (same seed = same
97//! execution), regardless of whether tasks are actually migrated.
98
99use crate::channel::oneshot;
100use crate::combinator::{Either, Select};
101use crate::cx::{Cx, cap};
102use crate::record::{AdmissionError, TaskRecord};
103use crate::runtime::task_handle::{JoinError, TaskHandle};
104use crate::runtime::{RegionCreateError, RuntimeState, SpawnError, StoredTask};
105use crate::tracing_compat::{debug, debug_span};
106use crate::types::{Budget, CancelReason, Outcome, PanicPayload, Policy, RegionId, TaskId};
107use std::future::Future;
108use std::marker::PhantomData;
109use std::pin::Pin;
110use std::sync::Arc;
111use std::task::{Context, Poll};
112
113/// A scope for spawning work within a region.
114///
115/// The scope provides methods for:
116/// - Spawning tasks
117/// - Creating child regions
118/// - Registering finalizers
119/// - Cancelling all children
120pub struct Scope<'r, P: Policy = crate::types::policy::FailFast> {
121    /// The region this scope belongs to.
122    pub(crate) region: RegionId,
123    /// The budget for this scope.
124    pub(crate) budget: Budget,
125    /// Phantom data for the policy type.
126    pub(crate) _policy: PhantomData<&'r P>,
127}
128
129#[pin_project::pin_project]
130pub(crate) struct CatchUnwind<F> {
131    #[pin]
132    pub(crate) inner: F,
133}
134
135impl<F: Future> Future for CatchUnwind<F> {
136    type Output = std::thread::Result<F::Output>;
137
138    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139        let mut this = self.project();
140        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
141            this.inner.as_mut().poll(cx)
142        }));
143        match result {
144            Ok(Poll::Pending) => Poll::Pending,
145            Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
146            Err(payload) => Poll::Ready(Err(payload)),
147        }
148    }
149}
150
151pub(crate) fn payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
152    payload
153        .downcast_ref::<&str>()
154        .map(ToString::to_string)
155        .or_else(|| payload.downcast_ref::<String>().cloned())
156        .unwrap_or_else(|| "unknown panic".to_string())
157}
158
159struct RegionRunner<'a, Fut> {
160    fut: Pin<&'a mut CatchUnwind<Fut>>,
161    state: Option<&'a mut RuntimeState>,
162    child_region: RegionId,
163}
164
165impl<'a, Fut: Future> Future for RegionRunner<'a, Fut> {
166    type Output = (std::thread::Result<Fut::Output>, &'a mut RuntimeState);
167
168    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
169        let this = self.get_mut();
170        match this.fut.as_mut().poll(cx) {
171            Poll::Ready(res) => {
172                let state = this.state.take().expect("polled after ready");
173                Poll::Ready((res, state))
174            }
175            Poll::Pending => Poll::Pending,
176        }
177    }
178}
179
180impl<Fut> Drop for RegionRunner<'_, Fut> {
181    fn drop(&mut self) {
182        if let Some(state) = self.state.take() {
183            let reason = CancelReason::fail_fast().with_region(self.child_region);
184            let _ = state.cancel_request(self.child_region, &reason, None);
185            if let Some(region) = state.region(self.child_region) {
186                region.begin_close(None);
187            }
188            state.advance_region_state(self.child_region);
189        }
190    }
191}
192
193struct RegionCloseFuture {
194    state: Arc<parking_lot::Mutex<crate::record::region::RegionCloseState>>,
195}
196
197impl Future for RegionCloseFuture {
198    type Output = ();
199
200    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
201        let mut state = self.state.lock();
202        if state.closed {
203            Poll::Ready(())
204        } else {
205            if !state
206                .waker
207                .as_ref()
208                .is_some_and(|w| w.will_wake(cx.waker()))
209            {
210                state.waker = Some(cx.waker().clone());
211            }
212            Poll::Pending
213        }
214    }
215}
216
217impl Drop for RegionCloseFuture {
218    fn drop(&mut self) {
219        let mut state = self.state.lock();
220        state.waker = None;
221    }
222}
223
224impl<P: Policy> Scope<'_, P> {
225    /// Creates a new scope (internal use).
226    #[must_use]
227    #[allow(dead_code)]
228    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
229    pub(crate) fn new(region: RegionId, budget: Budget) -> Self {
230        Self {
231            region,
232            budget,
233            _policy: PhantomData,
234        }
235    }
236
237    /// Returns the region ID for this scope.
238    #[must_use]
239    pub fn region_id(&self) -> RegionId {
240        self.region
241    }
242
243    /// Returns the budget for this scope.
244    #[must_use]
245    pub fn budget(&self) -> Budget {
246        self.budget
247    }
248
249    // =========================================================================
250    // Task Spawning
251    // =========================================================================
252
253    /// Spawns a new task within this scope's region.
254    ///
255    /// This is the **Task Tier** spawn method for parallel execution. The task
256    /// may migrate between worker threads, so all captured data must be thread-safe.
257    ///
258    /// The task will be owned by the region and will be cancelled if the
259    /// region is cancelled. The returned `TaskHandle` can be used to await
260    /// the task's result.
261    ///
262    /// # Arguments
263    ///
264    /// * `state` - The runtime state
265    /// * `cx` - The capability context (used for tracing/authorization)
266    /// * `f` - A closure that produces the future, receiving the new task's `Cx`
267    ///
268    /// # Returns
269    ///
270    /// A `TaskHandle<T>` that can be used to await the task's result.
271    ///
272    /// # Soundness Rules (Type Bounds)
273    ///
274    /// The following bounds encode the soundness rules for Send tasks:
275    ///
276    /// * `F: FnOnce(Cx) -> Fut + Send + 'static` - Factory called on any worker
277    /// * `Fut: Future + Send + 'static` - Task may migrate between polls
278    /// * `Fut::Output: Send + 'static` - Result crosses thread boundary
279    ///
280    /// These bounds ensure captured data can safely cross thread boundaries.
281    /// Use [`RRef<T>`](crate::types::rref::RRef) for region-heap-allocated data.
282    ///
283    /// # Allowed Captures
284    ///
285    /// | Type | Allowed | Reason |
286    /// |------|---------|--------|
287    /// | `String`, `Vec<T>`, owned data | ✅ | Send + 'static by ownership |
288    /// | `Arc<T>` where T: Send + Sync | ✅ | Thread-safe shared ownership |
289    /// | `RRef<T>` | ✅ | Region-heap reference, Copy + Send |
290    /// | `Cx` (cloned) | ✅ | Capability context is Send + Sync |
291    /// | `Rc<T>`, `RefCell<T>` | ❌ | Not Send |
292    /// | `&T`, `&mut T` | ❌ | Not 'static |
293    ///
294    /// # Example
295    ///
296    /// ```ignore
297    /// let handle = scope.spawn(&mut state, &cx, |cx| async move {
298    ///     cx.trace("Child task running");
299    ///     compute_value().await
300    /// });
301    ///
302    /// let result = handle.join(&cx).await?;
303    /// ```
304    ///
305    /// # Example with RRef
306    ///
307    /// ```ignore
308    /// // Allocate expensive data in region heap
309    /// let index = region_record.heap_alloc(vec![1, 2, 3, 4, 5]);
310    /// let rref = RRef::<Vec<i32>>::new(region_id, index);
311    ///
312    /// // RRef is Copy + Send, can be captured by multiple tasks
313    /// scope.spawn(&mut state, &cx, move |cx| async move {
314    ///     // Would access via runtime state in real code
315    ///     process_data(rref).await
316    /// });
317    /// ```
318    ///
319    /// # Compile-Time Errors
320    ///
321    /// Attempting to capture `!Send` types fails at compile time:
322    ///
323    /// ```compile_fail,E0277
324    /// # // This test demonstrates that Rc cannot be captured
325    /// use std::rc::Rc;
326    /// fn require_send<T: Send>(_: &T) {}
327    /// fn test_rc_rejected<'r, P: asupersync::types::Policy>(
328    ///     scope: &asupersync::cx::Scope<'r, P>,
329    ///     state: &mut asupersync::runtime::RuntimeState,
330    ///     cx: &asupersync::cx::Cx,
331    /// ) {
332    ///     let rc = Rc::new(42);
333    ///     require_send(&rc);
334    ///     let _ = scope.spawn(state, cx, move |_| async move {
335    ///         let _ = rc;  // Rc<i32> is not Send
336    ///     });
337    /// }
338    /// ```
339    ///
340    /// Attempting to capture non-`'static` references fails:
341    ///
342    /// ```compile_fail,E0597
343    /// # // This test demonstrates that borrowed data cannot be captured
344    /// fn require_static<T: 'static>(_: T) {}
345    /// fn test_borrow_rejected<'r, P: asupersync::types::Policy>(
346    ///     scope: &asupersync::cx::Scope<'r, P>,
347    ///     state: &mut asupersync::runtime::RuntimeState,
348    ///     cx: &asupersync::cx::Cx,
349    /// ) {
350    ///     let local = 42;
351    ///     let borrow = &local;
352    ///     require_static(borrow);
353    ///     let _ = scope.spawn(state, cx, move |_| async move {
354    ///         let _ = borrow;  // &i32 is not 'static
355    ///     });
356    /// }
357    /// ```
358    pub fn spawn<F, Fut, Caps>(
359        &self,
360        state: &mut RuntimeState,
361        cx: &Cx<Caps>,
362        f: F,
363    ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
364    where
365        Caps: cap::HasSpawn + Send + Sync + 'static,
366        F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
367        Fut: Future + Send + 'static,
368        Fut::Output: Send + 'static,
369    {
370        // Create oneshot channel for result delivery
371        let (tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();
372
373        // Create task record
374        let task_id = self.create_task_record(state)?;
375
376        // Trace task spawn event
377        let _span = debug_span!(
378            "task_spawn",
379            task_id = ?task_id,
380            region_id = ?self.region,
381            initial_state = "Created",
382            budget_deadline = ?self.budget.deadline,
383            budget_poll_quota = self.budget.poll_quota,
384            budget_cost_quota = ?self.budget.cost_quota,
385            budget_priority = self.budget.priority,
386            budget_source = "scope"
387        )
388        .entered();
389        debug!(
390            task_id = ?task_id,
391            region_id = ?self.region,
392            initial_state = "Created",
393            budget_deadline = ?self.budget.deadline,
394            budget_poll_quota = self.budget.poll_quota,
395            budget_cost_quota = ?self.budget.cost_quota,
396            budget_priority = self.budget.priority,
397            budget_source = "scope",
398            "task spawned"
399        );
400
401        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
402
403        // Create the TaskHandle
404        let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
405
406        // Set the shared inner state in the TaskRecord
407        // This links the user-facing Cx to the runtime's TaskRecord
408        if let Some(record) = state.task_mut(task_id) {
409            record.set_cx_inner(child_cx.inner.clone());
410            record.set_cx(child_cx_full.clone());
411        }
412
413        // Capture child_cx for result sending
414        let cx_for_send = child_cx_full;
415
416        // Instantiate the future with the child context.
417        // We use a guard to rollback task creation if the factory panics.
418        // This prevents zombie tasks (recorded but never started) which would
419        // cause the region to never close (deadlock).
420        let future = {
421            struct TaskCreationGuard<'a> {
422                state: &'a mut RuntimeState,
423                task_id: TaskId,
424                region_id: RegionId,
425                committed: bool,
426            }
427
428            impl Drop for TaskCreationGuard<'_> {
429                fn drop(&mut self) {
430                    if !self.committed {
431                        // Rollback task creation
432                        if let Some(region) = self.state.region_mut(self.region_id) {
433                            region.remove_task(self.task_id);
434                        }
435                        self.state.remove_task(self.task_id);
436                    }
437                }
438            }
439
440            let mut guard = TaskCreationGuard {
441                state,
442                task_id,
443                region_id: self.region,
444                committed: false,
445            };
446
447            let fut = f(child_cx);
448            guard.committed = true;
449            fut
450        };
451
452        // Wrap the future to send its result through the channel
453        // We use CatchUnwind to ensure panics are propagated as JoinError::Panicked
454        // rather than silent channel closure (which looks like cancellation).
455        let wrapped = async move {
456            let result_result = CatchUnwind { inner: future }.await;
457            match result_result {
458                Ok(result) => {
459                    let _ = tx.send(&cx_for_send, Ok(result));
460                    crate::types::Outcome::Ok(())
461                }
462                Err(payload) => {
463                    let msg = payload_to_string(&payload);
464                    let panic_payload = PanicPayload::new(msg);
465                    let _ = tx.send(
466                        &cx_for_send,
467                        Err(JoinError::Panicked(panic_payload.clone())),
468                    );
469                    crate::types::Outcome::Panicked(panic_payload)
470                }
471            }
472        };
473
474        // Create stored task with task_id for poll tracing
475        let stored = StoredTask::new_with_id(wrapped, task_id);
476
477        Ok((handle, stored))
478    }
479
480    /// Spawns a Send task (explicit Task Tier API).
481    ///
482    /// This is an explicit alias for [`spawn`](Self::spawn) that makes the
483    /// execution tier clear in the API. Use this when you want to emphasize
484    /// that the task may migrate between workers.
485    ///
486    /// # Type Bounds (Soundness Rules)
487    ///
488    /// Same as [`spawn`](Self::spawn):
489    /// - `F: FnOnce(Cx) -> Fut + Send + 'static`
490    /// - `Fut: Future + Send + 'static`
491    /// - `Fut::Output: Send + 'static`
492    ///
493    /// # Example
494    ///
495    /// ```ignore
496    /// // Explicit task tier spawn
497    /// let (handle, stored) = scope.spawn_task(&mut state, &cx, |cx| async move {
498    ///     // This task may run on any worker
499    ///     compute_parallel().await
500    /// })?;
501    /// ```
502    #[inline]
503    pub fn spawn_task<F, Fut, Caps>(
504        &self,
505        state: &mut RuntimeState,
506        cx: &Cx<Caps>,
507        f: F,
508    ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
509    where
510        Caps: cap::HasSpawn + Send + Sync + 'static,
511        F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
512        Fut: Future + Send + 'static,
513        Fut::Output: Send + 'static,
514    {
515        self.spawn(state, cx, f)
516    }
517
518    /// Spawns a task and registers it with the runtime state.
519    ///
520    /// This is a convenience method that combines `spawn()` with
521    /// `RuntimeState::store_spawned_task()`. It's the primary method
522    /// used by the `spawn!` macro.
523    ///
524    /// # Arguments
525    ///
526    /// * `state` - The runtime state (for storing the task)
527    /// * `cx` - The capability context (for creating child context)
528    /// * `f` - A closure that produces the future, receiving the new task's `Cx`
529    ///
530    /// # Returns
531    ///
532    /// A `TaskHandle<T>` for awaiting the task's result.
533    ///
534    /// # Example
535    ///
536    /// ```ignore
537    /// let handle = scope.spawn_registered(&mut state, &cx, |cx| async move {
538    ///     cx.trace("Child task running");
539    ///     compute_value().await
540    /// })?;
541    ///
542    /// let result = handle.join(&cx).await?;
543    /// ```
544    pub fn spawn_registered<F, Fut, Caps>(
545        &self,
546        state: &mut RuntimeState,
547        cx: &Cx<Caps>,
548        f: F,
549    ) -> Result<TaskHandle<Fut::Output>, SpawnError>
550    where
551        Caps: cap::HasSpawn + Send + Sync + 'static,
552        F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
553        Fut: Future + Send + 'static,
554        Fut::Output: Send + 'static,
555    {
556        let (handle, stored) = self.spawn(state, cx, f)?;
557        state.store_spawned_task(handle.task_id(), stored);
558        Ok(handle)
559    }
560
561    /// Spawns a local (non-Send) task within this scope's region (**Fiber Tier**).
562    ///
563    /// This is the **Fiber Tier** spawn method. Local tasks are pinned to the
564    /// current worker thread and cannot be stolen by other workers. This enables
565    /// borrow-friendly execution with `!Send` types like `Rc` or `RefCell`.
566    ///
567    /// # Execution Tier: Fiber
568    ///
569    /// | Property | Value |
570    /// |----------|-------|
571    /// | Migration | Never (thread-pinned) |
572    /// | Send bound | Not required |
573    /// | Borrowing | Requires `'static` (no local `&T`) |
574    /// | Use case | `!Send` types, borrowed data |
575    ///
576    /// # Arguments
577    ///
578    /// * `state` - The runtime state
579    /// * `cx` - The capability context
580    /// * `f` - A closure that produces the future, receiving the new task's `Cx`
581    ///
582    /// # Panics
583    ///
584    /// Panics if called from a blocking thread (spawn_blocking context).
585    ///
586    /// # Example
587    ///
588    /// ```ignore
589    /// use std::rc::Rc;
590    /// use std::cell::RefCell;
591    ///
592    /// let counter = Rc::new(RefCell::new(0));
593    /// let counter_clone = counter.clone();
594    ///
595    /// let handle = scope.spawn_local(&mut state, &cx, |cx| async move {
596    ///     // Rc<RefCell<_>> is !Send but allowed in local tasks
597    ///     *counter_clone.borrow_mut() += 1;
598    /// });
599    /// ```
600    #[allow(clippy::too_many_lines)]
601    pub fn spawn_local<F, Fut, Caps>(
602        &self,
603        state: &mut RuntimeState,
604        cx: &Cx<Caps>,
605        f: F,
606    ) -> Result<TaskHandle<Fut::Output>, SpawnError>
607    where
608        Caps: cap::HasSpawn + Send + Sync + 'static,
609        F: FnOnce(Cx<Caps>) -> Fut + 'static,
610        Fut: Future + 'static,
611        Fut::Output: Send + 'static,
612    {
613        use crate::runtime::stored_task::LocalStoredTask;
614        use crate::runtime::task_handle::JoinError;
615
616        // Create oneshot channel for result delivery
617        let (result_tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();
618
619        // Create task record
620        let task_id = self.create_task_record(state)?;
621
622        // Trace task spawn event
623        let _span = debug_span!(
624            "task_spawn",
625            task_id = ?task_id,
626            region_id = ?self.region,
627            initial_state = "Created",
628            budget_deadline = ?self.budget.deadline,
629            budget_poll_quota = self.budget.poll_quota,
630            budget_cost_quota = ?self.budget.cost_quota,
631            budget_priority = self.budget.priority,
632            budget_source = "scope_local"
633        )
634        .entered();
635        debug!(
636            task_id = ?task_id,
637            region_id = ?self.region,
638            initial_state = "Created",
639            budget_deadline = ?self.budget.deadline,
640            budget_poll_quota = self.budget.poll_quota,
641            budget_cost_quota = ?self.budget.cost_quota,
642            budget_priority = self.budget.priority,
643            budget_source = "scope_local",
644            "local task spawned"
645        );
646
647        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
648
649        // Create the TaskHandle
650        let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
651
652        // Set the shared inner state in the TaskRecord
653        if let Some(record) = state.task_mut(task_id) {
654            record.set_cx_inner(child_cx.inner.clone());
655            record.set_cx(child_cx_full.clone());
656        }
657
658        // Capture child_cx for result sending
659        let cx_for_send = child_cx_full;
660
661        // Instantiate the future with the child context.
662        // We use a guard to rollback task creation if the factory panics.
663        let future = {
664            struct TaskCreationGuard<'a> {
665                state: &'a mut RuntimeState,
666                task_id: TaskId,
667                region_id: RegionId,
668                committed: bool,
669            }
670
671            impl Drop for TaskCreationGuard<'_> {
672                fn drop(&mut self) {
673                    if !self.committed {
674                        // Rollback task creation
675                        if let Some(region) = self.state.region_mut(self.region_id) {
676                            region.remove_task(self.task_id);
677                        }
678                        self.state.remove_task(self.task_id);
679                    }
680                }
681            }
682
683            let mut guard = TaskCreationGuard {
684                state,
685                task_id,
686                region_id: self.region,
687                committed: false,
688            };
689
690            let fut = f(child_cx);
691            guard.committed = true;
692            fut
693        };
694
695        // Wrap the future to send its result through the channel
696        let wrapped = async move {
697            let result_result = CatchUnwind { inner: future }.await;
698            match result_result {
699                Ok(result) => {
700                    let _ = result_tx.send(&cx_for_send, Ok(result));
701                    crate::types::Outcome::Ok(())
702                }
703                Err(payload) => {
704                    let msg = payload_to_string(&payload);
705                    let panic_payload = PanicPayload::new(msg);
706                    let _ = result_tx.send(
707                        &cx_for_send,
708                        Err(JoinError::Panicked(panic_payload.clone())),
709                    );
710                    crate::types::Outcome::Panicked(panic_payload)
711                }
712            }
713        };
714
715        // Create local stored task
716        let stored = LocalStoredTask::new_with_id(wrapped, task_id);
717
718        // Store in thread-local storage
719        crate::runtime::local::store_local_task(task_id, stored);
720
721        // Mark the task record as local so that safety guards in the scheduler
722        // (inject_ready panic, try_steal debug_assert) can detect accidental
723        // cross-thread migration of !Send futures.
724        if let Some(record) = state.task_mut(task_id) {
725            if let Some(worker_id) = crate::runtime::scheduler::three_lane::current_worker_id() {
726                record.pin_to_worker(worker_id);
727            } else {
728                record.mark_local();
729            }
730            record.wake_state.notify();
731        }
732
733        // Schedule the task on the current worker's NON-STEALABLE local scheduler.
734        // spawn_local tasks MUST NOT be stealable.
735        let scheduled = crate::runtime::scheduler::three_lane::schedule_local_task(task_id);
736
737        if scheduled {
738            if let Some(record) = state.task(task_id) {
739                let _ = record.wake_state.notify();
740            }
741            return Ok(handle);
742        }
743
744        // No local scheduler available: rollback to avoid a permanently parked task.
745        let _ = crate::runtime::local::remove_local_task(task_id);
746        if let Some(region) = state.region(self.region) {
747            region.remove_task(task_id);
748        }
749        state.remove_task(task_id);
750        Err(SpawnError::LocalSchedulerUnavailable)
751    }
752
753    /// Spawns a blocking operation on a dedicated thread pool.
754    ///
755    /// This is used for CPU-bound or legacy synchronous operations that
756    /// should not block async workers. The closure runs on a separate
757    /// thread pool designed for blocking work.
758    ///
759    /// # Arguments
760    ///
761    /// * `state` - The runtime state
762    /// * `cx` - The capability context
763    /// * `f` - The blocking closure to run, receiving a context
764    ///
765    /// # Type Bounds
766    ///
767    /// * `F: FnOnce(Cx) -> R + Send + 'static` - The closure must be Send
768    /// * `R: Send + 'static` - The result must be Send
769    ///
770    /// # Example
771    ///
772    /// ```ignore
773    /// let (handle, stored) = scope.spawn_blocking(&mut state, &cx, |cx| {
774    ///     cx.trace("Starting blocking work");
775    ///     // CPU-intensive work
776    ///     expensive_computation()
777    /// });
778    ///
779    /// let result = handle.join(&cx).await?;
780    /// ```
781    ///
782    /// # Note
783    ///
784    /// In Phase 0 (single-threaded), blocking operations run inline.
785    /// A proper blocking pool is implemented in Phase 1+.
786    pub fn spawn_blocking<F, R, Caps>(
787        &self,
788        state: &mut RuntimeState,
789        cx: &Cx<Caps>, // Parent Cx
790        f: F,
791    ) -> Result<(TaskHandle<R>, StoredTask), SpawnError>
792    where
793        Caps: cap::HasSpawn + Send + Sync + 'static,
794        F: FnOnce(Cx<Caps>) -> R + Send + 'static,
795        R: Send + 'static,
796    {
797        // Create oneshot channel for result delivery
798        let (tx, rx) = oneshot::channel::<Result<R, JoinError>>();
799
800        // Create task record
801        let task_id = self.create_task_record(state)?;
802
803        // Trace task spawn event
804        debug!(
805            task_id = ?task_id,
806            region_id = ?self.region,
807            initial_state = "Created",
808            poll_quota = self.budget.poll_quota,
809            spawn_kind = "blocking",
810            "blocking task spawned"
811        );
812
813        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
814
815        // Create the TaskHandle
816        let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
817
818        // Set the shared inner state in the TaskRecord
819        if let Some(record) = state.task_mut(task_id) {
820            record.set_cx_inner(child_cx.inner.clone());
821            record.set_cx(child_cx_full.clone());
822        }
823
824        // Capture child_cx for result sending
825        let cx_for_send = child_cx_full;
826
827        // For Phase 0, we run blocking code as an async task
828        // In Phase 1+, this would spawn on a blocking thread pool
829        let wrapped = async move {
830            // Execute the blocking closure with child context
831            // Catch panics to report them correctly
832            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(child_cx)));
833            match result {
834                Ok(res) => {
835                    let _ = tx.send(&cx_for_send, Ok(res));
836                    crate::types::Outcome::Ok(())
837                }
838                Err(payload) => {
839                    let msg = payload_to_string(&payload);
840                    let panic_payload = PanicPayload::new(msg);
841                    let _ = tx.send(
842                        &cx_for_send,
843                        Err(JoinError::Panicked(panic_payload.clone())),
844                    );
845                    crate::types::Outcome::Panicked(panic_payload)
846                }
847            }
848        };
849
850        let stored = StoredTask::new_with_id(wrapped, task_id);
851
852        Ok((handle, stored))
853    }
854
855    // =========================================================================
856    // Child Regions
857    // =========================================================================
858
859    /// Creates a child region and runs the provided future within a child scope.
860    ///
861    /// The child region inherits the parent's budget by default. Use
862    /// [`Scope::region_with_budget`] to tighten constraints for the child.
863    ///
864    /// The returned outcome is the result of the body future. After the body
865    /// completes, the child region begins its close sequence and advances until
866    /// it can close (assuming all child tasks have completed and obligations are resolved).
867    ///
868    /// # Errors
869    ///
870    /// Returns [`RegionCreateError`] if the parent is closed, missing, or at capacity.
871    pub async fn region<P2, F, Fut, T, Caps>(
872        &self,
873        state: &mut RuntimeState,
874        cx: &Cx<Caps>,
875        policy: P2,
876        f: F,
877    ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
878    where
879        P2: Policy,
880        F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
881        Fut: Future<Output = Outcome<T, P2::Error>>,
882    {
883        self.region_with_budget(state, cx, self.budget, policy, f)
884            .await
885    }
886
887    /// Creates a child region with an explicit budget (met with the parent budget).
888    ///
889    /// The effective budget is `parent.meet(child)` to ensure nested scopes can
890    /// never relax constraints.
891    pub async fn region_with_budget<P2, F, Fut, T, Caps>(
892        &self,
893        state: &mut RuntimeState,
894        _cx: &Cx<Caps>,
895        budget: Budget,
896        _policy: P2,
897        f: F,
898    ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
899    where
900        P2: Policy,
901        F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
902        Fut: Future<Output = Outcome<T, P2::Error>>,
903    {
904        let child_region = state.create_child_region(self.region, budget)?;
905        let child_budget = state
906            .region(child_region)
907            .map_or(self.budget, crate::record::RegionRecord::budget);
908        let child_scope = Scope::<P2>::new(child_region, child_budget);
909
910        let fut_result =
911            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(child_scope, &mut *state)));
912
913        let fut = match fut_result {
914            Ok(fut) => fut,
915            Err(payload) => {
916                let reason = CancelReason::fail_fast().with_region(child_region);
917                let _ = state.cancel_request(child_region, &reason, None);
918                if let Some(region) = state.region(child_region) {
919                    region.begin_close(None);
920                }
921                state.advance_region_state(child_region);
922                std::panic::resume_unwind(payload);
923            }
924        };
925
926        let pinned_fut = std::pin::pin!(CatchUnwind { inner: fut });
927
928        let runner = RegionRunner {
929            fut: pinned_fut,
930            state: Some(state),
931            child_region,
932        };
933
934        let (result, state) = runner.await;
935        let outcome = match result {
936            Ok(outcome) => outcome,
937            Err(payload) => {
938                let msg = payload_to_string(&payload);
939                Outcome::Panicked(PanicPayload::new(msg))
940            }
941        };
942
943        match &outcome {
944            Outcome::Ok(_) => {
945                if let Some(region) = state.region(child_region) {
946                    region.begin_close(None);
947                }
948            }
949            Outcome::Cancelled(reason) => {
950                let _ = state.cancel_request(child_region, reason, None);
951                if let Some(region) = state.region(child_region) {
952                    region.begin_close(None);
953                }
954            }
955            Outcome::Err(_) | Outcome::Panicked(_) => {
956                let reason = CancelReason::fail_fast().with_region(child_region);
957                let _ = state.cancel_request(child_region, &reason, None);
958                if let Some(region) = state.region(child_region) {
959                    region.begin_close(None);
960                }
961            }
962        }
963
964        let close_notify = state.region(child_region).map(|r| r.close_notify.clone());
965        state.advance_region_state(child_region);
966
967        if let Some(notify) = close_notify {
968            RegionCloseFuture { state: notify }.await;
969        }
970
971        Ok(outcome)
972    }
973
974    // =========================================================================
975    // Combinators
976    // =========================================================================
977
978    /// Joins two tasks, waiting for both to complete.
979    ///
980    /// This method waits for both tasks to complete, regardless of their outcome.
981    /// It returns a tuple of results.
982    ///
983    /// # Example
984    /// ```ignore
985    /// let (h1, _) = scope.spawn(...);
986    /// let (h2, _) = scope.spawn(...);
987    /// let (r1, r2) = scope.join(cx, h1, h2).await;
988    /// ```
989    pub async fn join<T1, T2>(
990        &self,
991        cx: &Cx,
992        mut h1: TaskHandle<T1>,
993        mut h2: TaskHandle<T2>,
994    ) -> (Result<T1, JoinError>, Result<T2, JoinError>) {
995        let mut f1 = h1.join(cx);
996        let mut f2 = h2.join(cx);
997        let r1 = std::pin::Pin::new(&mut f1).await;
998        let r2 = std::pin::Pin::new(&mut f2).await;
999        (r1, r2)
1000    }
1001
1002    /// Races two tasks, waiting for the first to complete.
1003    ///
1004    /// The loser is cancelled and drained (awaited until it completes cancellation).
1005    ///
1006    /// # Example
1007    /// ```ignore
1008    /// let (h1, _) = scope.spawn(...);
1009    /// let (h2, _) = scope.spawn(...);
1010    /// match scope.race(cx, h1, h2).await {
1011    ///     Ok(val) => println!("Winner result: {val}"),
1012    ///     Err(e) => println!("Race failed: {e}"),
1013    /// }
1014    /// ```
1015    fn best_effort_poll_loser_join<T>(cx: &Cx, handle: &mut TaskHandle<T>) {
1016        let mut drain = std::pin::pin!(handle.join(cx));
1017        let waker = std::task::Waker::noop();
1018        let mut poll_cx = std::task::Context::from_waker(waker);
1019        let _ = drain.as_mut().poll(&mut poll_cx);
1020    }
1021
1022    /// Races two task handles and returns the winner while draining the loser.
1023    pub async fn race<T>(
1024        &self,
1025        cx: &Cx,
1026        mut h1: TaskHandle<T>,
1027        mut h2: TaskHandle<T>,
1028    ) -> Result<T, JoinError> {
1029        let winner = {
1030            let f1 = h1.join_with_drop_reason(cx, CancelReason::race_loser());
1031            let mut f1 = std::pin::pin!(f1);
1032            let f2 = h2.join_with_drop_reason(cx, CancelReason::race_loser());
1033            let mut f2 = std::pin::pin!(f2);
1034            Select::new(f1.as_mut(), f2.as_mut())
1035                .await
1036                .map_err(|_| JoinError::PolledAfterCompletion)?
1037        };
1038
1039        match winner {
1040            Either::Left(res) => {
1041                if matches!(&res, Err(JoinError::Panicked(_)))
1042                    && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1043                {
1044                    // In direct block_on tests there is no scheduler driving the
1045                    // loser task after the winner panic surfaces. Best-effort poll
1046                    // once so a cooperative loser can observe cancellation, then
1047                    // preserve the winner panic without deadlocking the test.
1048                    Self::best_effort_poll_loser_join(cx, &mut h2);
1049                    return res;
1050                }
1051                let loser_res = h2.join(cx).await;
1052                if let Err(JoinError::Panicked(p)) = res {
1053                    Err(JoinError::Panicked(p))
1054                } else if let Err(JoinError::Panicked(p)) = loser_res {
1055                    Err(JoinError::Panicked(p))
1056                } else {
1057                    res
1058                }
1059            }
1060            Either::Right(res) => {
1061                if matches!(&res, Err(JoinError::Panicked(_)))
1062                    && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1063                {
1064                    // See the left-branch comment above.
1065                    Self::best_effort_poll_loser_join(cx, &mut h1);
1066                    return res;
1067                }
1068                let loser_res = h1.join(cx).await;
1069                if let Err(JoinError::Panicked(p)) = res {
1070                    Err(JoinError::Panicked(p))
1071                } else if let Err(JoinError::Panicked(p)) = loser_res {
1072                    Err(JoinError::Panicked(p))
1073                } else {
1074                    res
1075                }
1076            }
1077        }
1078    }
1079
1080    /// Hedges a primary operation with a backup operation.
1081    ///
1082    /// 1. Spawns the primary task immediately.
1083    /// 2. Waits for the delay.
1084    /// 3. If primary finishes before delay: returns primary result.
1085    /// 4. If delay fires: spawns backup task and races them.
1086    ///
1087    /// The loser is cancelled and drained.
1088    ///
1089    /// # Arguments
1090    /// * `state` - The runtime state
1091    /// * `cx` - The capability context
1092    /// * `delay` - The hedge delay
1093    /// * `primary` - The primary future factory
1094    /// * `backup` - The backup future factory
1095    ///
1096    /// # Returns
1097    /// `Ok(T)` if successful, `Err(JoinError)` if failed/cancelled.
1098    pub async fn hedge<F1, Fut1, F2, Fut2, T>(
1099        &self,
1100        state: &mut RuntimeState,
1101        cx: &Cx,
1102        delay: std::time::Duration,
1103        primary: F1,
1104        backup: F2,
1105    ) -> Result<T, JoinError>
1106    where
1107        F1: FnOnce(Cx) -> Fut1 + Send + 'static,
1108        Fut1: Future<Output = T> + Send + 'static,
1109        F2: FnOnce(Cx) -> Fut2 + Send + 'static,
1110        Fut2: Future<Output = T> + Send + 'static,
1111        T: Send + 'static,
1112    {
1113        use crate::combinator::Either;
1114        use crate::combinator::select::Select;
1115        // 1. Spawn primary
1116        let mut h1 = self
1117            .spawn_registered(state, cx, primary)
1118            .map_err(|_| JoinError::Cancelled(CancelReason::resource_unavailable()))?;
1119
1120        // 2. Race primary vs delay.
1121        // Scope the pinned join future so we can safely reuse h1 afterwards.
1122        let primary_or_delay = {
1123            let f1_primary = h1.join(cx);
1124            let mut f1_primary = std::pin::pin!(f1_primary);
1125
1126            let now = cx
1127                .timer_driver()
1128                .map_or_else(crate::time::wall_now, |d| d.now());
1129            let sleep_fut = crate::time::sleep(now, delay);
1130            let mut sleep_pinned = std::pin::pin!(sleep_fut);
1131
1132            let res = Select::new(f1_primary.as_mut(), sleep_pinned.as_mut())
1133                .await
1134                .map_err(|_| JoinError::PolledAfterCompletion)?;
1135            if matches!(res, Either::Right(())) {
1136                f1_primary.defuse_drop_abort();
1137            }
1138            res
1139        };
1140
1141        match primary_or_delay {
1142            Either::Left(res) => {
1143                // Primary finished first
1144                res
1145            }
1146            Either::Right(()) => {
1147                // Timeout fired. Spawn backup.
1148                let Ok(mut h2) = self.spawn_registered(state, cx, backup) else {
1149                    // Backup admission failed after primary already started.
1150                    // Request cancellation on primary to avoid orphaned work.
1151                    h1.abort_with_reason(CancelReason::resource_unavailable());
1152
1153                    if crate::runtime::scheduler::three_lane::current_worker_id().is_some() {
1154                        // In scheduler-backed runtime execution, fully drain the
1155                        // cancelled primary before returning.
1156                        match h1.join(cx).await {
1157                            Ok(res) => return Ok(res),
1158                            Err(JoinError::Panicked(p)) => return Err(JoinError::Panicked(p)),
1159                            Err(JoinError::Cancelled(_) | JoinError::PolledAfterCompletion) => {}
1160                        }
1161                    } else {
1162                        // In no-scheduler contexts (e.g. direct unit-test block_on),
1163                        // full join can deadlock because nothing drives stored tasks.
1164                        // Keep this as best-effort and return promptly.
1165                        let mut drain = std::pin::pin!(h1.join(cx));
1166                        let waker = std::task::Waker::noop();
1167                        let mut poll_cx = Context::from_waker(waker);
1168                        match drain.as_mut().poll(&mut poll_cx) {
1169                            std::task::Poll::Ready(Ok(res)) => return Ok(res),
1170                            std::task::Poll::Ready(Err(JoinError::Panicked(p))) => {
1171                                return Err(JoinError::Panicked(p));
1172                            }
1173                            _ => {}
1174                        }
1175                    }
1176
1177                    return Err(JoinError::Cancelled(CancelReason::resource_unavailable()));
1178                };
1179
1180                // Now race h1 and h2 with bounded future borrows.
1181                let race_outcome = {
1182                    let f1_race = h1.join_with_drop_reason(cx, CancelReason::race_loser());
1183                    let mut f1_race = std::pin::pin!(f1_race);
1184                    let f2_race = h2.join_with_drop_reason(cx, CancelReason::race_loser());
1185                    let mut f2_race = std::pin::pin!(f2_race);
1186                    Select::new(f1_race.as_mut(), f2_race.as_mut())
1187                        .await
1188                        .map_err(|_| JoinError::PolledAfterCompletion)?
1189                };
1190
1191                match race_outcome {
1192                    Either::Left(res) => {
1193                        if matches!(&res, Err(JoinError::Panicked(_)))
1194                            && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1195                        {
1196                            Self::best_effort_poll_loser_join(cx, &mut h2);
1197                            return res;
1198                        }
1199                        let loser_res = h2.join(cx).await;
1200                        if let Err(JoinError::Panicked(p)) = res {
1201                            Err(JoinError::Panicked(p))
1202                        } else if let Err(JoinError::Panicked(p)) = loser_res {
1203                            Err(JoinError::Panicked(p))
1204                        } else {
1205                            res
1206                        }
1207                    }
1208                    Either::Right(res) => {
1209                        if matches!(&res, Err(JoinError::Panicked(_)))
1210                            && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1211                        {
1212                            Self::best_effort_poll_loser_join(cx, &mut h1);
1213                            return res;
1214                        }
1215                        let loser_res = h1.join(cx).await;
1216                        if let Err(JoinError::Panicked(p)) = res {
1217                            Err(JoinError::Panicked(p))
1218                        } else if let Err(JoinError::Panicked(p)) = loser_res {
1219                            Err(JoinError::Panicked(p))
1220                        } else {
1221                            res
1222                        }
1223                    }
1224                }
1225            }
1226        }
1227    }
1228
1229    /// Races multiple tasks, waiting for the first to complete.
1230    ///
1231    /// The winner's result is returned. Losers are cancelled and drained.
1232    ///
1233    /// # Arguments
1234    /// * `cx` - The capability context
1235    /// * `handles` - Vector of task handles to race
1236    ///
1237    /// # Returns
1238    /// `Ok((value, index))` if the winner succeeded.
1239    /// `Err(e)` if the winner failed (error/cancel/panic).
1240    pub async fn race_all<T>(
1241        &self,
1242        cx: &Cx,
1243        handles: Vec<TaskHandle<T>>,
1244    ) -> Result<(T, usize), JoinError> {
1245        let mut handles = handles;
1246        if handles.is_empty() {
1247            return std::future::pending().await;
1248        }
1249
1250        let mut futures: Vec<_> = handles
1251            .iter_mut()
1252            .map(|h| h.join_with_drop_reason(cx, CancelReason::race_loser()))
1253            .collect();
1254        let mut ready_results: Vec<Option<Result<T, JoinError>>> = std::iter::repeat_with(|| None)
1255            .take(futures.len())
1256            .collect();
1257
1258        // Poll every candidate in each round and keep all same-round ready
1259        // outcomes. This prevents losing loser panic outcomes when multiple
1260        // tasks become ready in the same poll.
1261        let winner_idx = std::future::poll_fn(|poll_cx| {
1262            let mut newly_ready = Vec::new();
1263
1264            for (i, future) in futures.iter_mut().enumerate() {
1265                if ready_results[i].is_some() {
1266                    continue;
1267                }
1268                if let std::task::Poll::Ready(res) = std::pin::Pin::new(future).poll(poll_cx) {
1269                    ready_results[i] = Some(res);
1270                    newly_ready.push(i);
1271                }
1272            }
1273
1274            if newly_ready.is_empty() {
1275                std::task::Poll::Pending
1276            } else {
1277                // Fairly select a winner among all that became ready in this round
1278                let chosen = newly_ready[cx.random_usize(newly_ready.len())];
1279                std::task::Poll::Ready(chosen)
1280            }
1281        })
1282        .await;
1283
1284        let winner_result = ready_results[winner_idx]
1285            .take()
1286            .expect("winner index must have a ready result");
1287
1288        // Release mutable borrows of handles held by JoinFuture values before
1289        // explicit loser cancellation/join.
1290        drop(futures);
1291
1292        // Drain completed losers first so terminal panic outcomes are not
1293        // obscured by strengthening cancellation reasons on already-finished tasks.
1294        let mut loser_panic = None;
1295        let mut pending_loser_indices = Vec::new();
1296        for (i, handle) in handles.iter_mut().enumerate() {
1297            if i == winner_idx {
1298                continue;
1299            }
1300            if let Some(res) = ready_results[i].take() {
1301                if let Err(JoinError::Panicked(p)) = res {
1302                    if loser_panic.is_none() {
1303                        loser_panic = Some(p);
1304                    }
1305                }
1306            } else if handle.is_finished() {
1307                let res = handle.join(cx).await;
1308                if let Err(JoinError::Panicked(p)) = res {
1309                    if loser_panic.is_none() {
1310                        loser_panic = Some(p);
1311                    }
1312                }
1313            } else {
1314                pending_loser_indices.push(i);
1315            }
1316        }
1317
1318        // Cancel and drain unfinished losers.
1319        // Note: Losers may also already have a race-loser reason from dropped
1320        // join futures; strengthening keeps attribution deterministic.
1321        for &idx in &pending_loser_indices {
1322            handles[idx].abort_with_reason(CancelReason::race_loser());
1323        }
1324        if matches!(&winner_result, Err(JoinError::Panicked(_)))
1325            && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1326        {
1327            // In direct block_on tests there is no scheduler driving pending
1328            // losers after the winner panic surfaces. Best-effort poll each
1329            // loser once so cooperative tasks can observe cancellation, then
1330            // preserve the winner panic without deadlocking the test.
1331            for idx in pending_loser_indices {
1332                Self::best_effort_poll_loser_join(cx, &mut handles[idx]);
1333            }
1334            return winner_result.map(|val| (val, winner_idx));
1335        }
1336        for idx in pending_loser_indices {
1337            let res = handles[idx].join(cx).await;
1338            if let Err(JoinError::Panicked(p)) = res {
1339                if loser_panic.is_none() {
1340                    loser_panic = Some(p);
1341                }
1342            }
1343        }
1344
1345        let winner_result = winner_result.map(|val| (val, winner_idx));
1346        if matches!(&winner_result, Err(JoinError::Panicked(_))) {
1347            return winner_result;
1348        }
1349
1350        loser_panic.map_or(winner_result, |panic_payload| {
1351            Err(JoinError::Panicked(panic_payload))
1352        })
1353    }
1354
1355    /// Joins multiple tasks, waiting for all to complete.
1356    ///
1357    /// Returns a vector of results in the same order as the input handles.
1358    pub async fn join_all<T>(
1359        &self,
1360        cx: &Cx,
1361        mut handles: Vec<TaskHandle<T>>,
1362    ) -> Vec<Result<T, JoinError>> {
1363        let mut futures: Vec<_> = handles.iter_mut().map(|h| h.join(cx)).collect();
1364        let mut results = Vec::with_capacity(futures.len());
1365        for fut in &mut futures {
1366            results.push(std::pin::Pin::new(fut).await);
1367        }
1368        results
1369    }
1370
1371    pub(crate) fn build_child_task_cx<Caps>(
1372        &self,
1373        state: &RuntimeState,
1374        parent_cx: &Cx<Caps>,
1375        task_id: TaskId,
1376    ) -> (Cx<Caps>, Cx<cap::All>) {
1377        let child_observability = parent_cx.child_observability(self.region, task_id);
1378        let child_entropy = parent_cx.child_entropy(task_id);
1379        let io_driver = state.io_driver_handle();
1380        let timer_driver = state.timer_driver_handle();
1381        let logical_clock = state
1382            .logical_clock_mode()
1383            .build_handle(timer_driver.clone());
1384
1385        let child_cx = Cx::<Caps>::new_with_drivers(
1386            self.region,
1387            task_id,
1388            self.budget,
1389            Some(child_observability),
1390            io_driver,
1391            parent_cx.io_cap_handle(),
1392            timer_driver,
1393            Some(child_entropy),
1394        )
1395        .with_logical_clock(logical_clock)
1396        .with_registry_handle(parent_cx.registry_handle())
1397        .with_remote_cap_handle(parent_cx.remote_cap_handle())
1398        .with_blocking_pool_handle(parent_cx.blocking_pool_handle())
1399        .with_evidence_sink(parent_cx.evidence_sink_handle())
1400        .with_macaroon_handle(parent_cx.macaroon_handle());
1401        let child_cx = if let Some(pressure) = parent_cx.pressure_handle() {
1402            child_cx.with_pressure(pressure)
1403        } else {
1404            child_cx
1405        };
1406        child_cx.set_trace_buffer(state.trace_handle());
1407        let child_cx_full = child_cx.retype::<cap::All>();
1408
1409        (child_cx, child_cx_full)
1410    }
1411
1412    /// Creates a task record in the runtime state.
1413    ///
1414    /// This is a helper method used by all spawn variants.
1415    pub(crate) fn create_task_record(
1416        &self,
1417        state: &mut RuntimeState,
1418    ) -> Result<TaskId, SpawnError> {
1419        use crate::util::ArenaIndex;
1420
1421        let now = state
1422            .timer_driver()
1423            .map_or(state.now, crate::time::TimerDriverHandle::now);
1424
1425        // Create placeholder task record
1426        let idx = state.insert_task(TaskRecord::new_with_time(
1427            TaskId::from_arena(ArenaIndex::new(0, 0)), // placeholder ID
1428            self.region,
1429            self.budget,
1430            now,
1431        ));
1432
1433        // Get the real task ID from the arena index
1434        let task_id = TaskId::from_arena(idx);
1435
1436        // Update the task record with the correct ID
1437        if let Some(record) = state.task_mut(task_id) {
1438            record.id = task_id;
1439        }
1440
1441        // Add task to the owning region
1442        if let Some(region) = state.region(self.region) {
1443            if let Err(err) = region.add_task(task_id) {
1444                // Rollback task creation
1445                state.remove_task(task_id);
1446                return Err(match err {
1447                    AdmissionError::Closed => SpawnError::RegionClosed(self.region),
1448                    AdmissionError::LimitReached { limit, live, .. } => {
1449                        SpawnError::RegionAtCapacity {
1450                            region: self.region,
1451                            limit,
1452                            live,
1453                        }
1454                    }
1455                });
1456            }
1457        } else {
1458            // Rollback task creation
1459            state.remove_task(task_id);
1460            return Err(SpawnError::RegionNotFound(self.region));
1461        }
1462
1463        state.record_task_spawn(task_id, self.region);
1464
1465        Ok(task_id)
1466    }
1467
1468    // =========================================================================
1469    // Finalizer Registration
1470    // =========================================================================
1471
1472    /// Registers a synchronous finalizer to run when the region closes.
1473    ///
1474    /// Finalizers are stored in LIFO order and executed during the Finalizing
1475    /// phase, after all children have completed. Use this for lightweight
1476    /// cleanup that doesn't need to await.
1477    ///
1478    /// # Arguments
1479    /// * `state` - The runtime state
1480    /// * `f` - The synchronous cleanup function
1481    ///
1482    /// # Returns
1483    /// `true` if the finalizer was registered successfully.
1484    ///
1485    /// # Example
1486    /// ```ignore
1487    /// scope.defer_sync(&mut state, || {
1488    ///     println!("Cleaning up!");
1489    /// });
1490    /// ```
1491    pub fn defer_sync<F>(&self, state: &mut RuntimeState, f: F) -> bool
1492    where
1493        F: FnOnce() + Send + 'static,
1494    {
1495        state.register_sync_finalizer(self.region, f)
1496    }
1497
1498    /// Registers an asynchronous finalizer to run when the region closes.
1499    ///
1500    /// Async finalizers run under a cancel mask to prevent interruption.
1501    /// They are driven to completion with a bounded budget. Use this for
1502    /// cleanup that needs to perform async operations (e.g., closing
1503    /// connections, flushing buffers).
1504    ///
1505    /// # Arguments
1506    /// * `state` - The runtime state
1507    /// * `future` - The async cleanup future
1508    ///
1509    /// # Returns
1510    /// `true` if the finalizer was registered successfully.
1511    ///
1512    /// # Example
1513    /// ```ignore
1514    /// scope.defer_async(&mut state, async {
1515    ///     close_connection().await;
1516    /// });
1517    /// ```
1518    pub fn defer_async<F>(&self, state: &mut RuntimeState, future: F) -> bool
1519    where
1520        F: Future<Output = ()> + Send + 'static,
1521    {
1522        state.register_async_finalizer(self.region, future)
1523    }
1524}
1525
1526impl<P: Policy> std::fmt::Debug for Scope<'_, P> {
1527    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1528        f.debug_struct("Scope")
1529            .field("region", &self.region)
1530            .field("budget", &self.budget)
1531            .finish()
1532    }
1533}
1534
1535#[cfg(test)]
1536mod tests {
1537    use super::*;
1538    use crate::record::RegionLimits;
1539    use crate::runtime::RuntimeState;
1540    use crate::types::{CancelKind, Outcome, Time};
1541    use crate::util::ArenaIndex;
1542    use futures_lite::future::block_on;
1543    use std::sync::Arc;
1544
1545    fn test_cx() -> Cx {
1546        Cx::new(
1547            RegionId::from_arena(ArenaIndex::new(0, 0)),
1548            TaskId::from_arena(ArenaIndex::new(0, 0)),
1549            Budget::INFINITE,
1550        )
1551    }
1552
1553    fn test_scope(region: RegionId, budget: Budget) -> Scope<'static> {
1554        Scope::new(region, budget)
1555    }
1556
1557    #[test]
1558    fn spawn_creates_task_record() {
1559        let mut state = RuntimeState::new();
1560        let cx = test_cx();
1561        let region = state.create_root_region(Budget::INFINITE);
1562        let scope = test_scope(region, Budget::INFINITE);
1563
1564        let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1565
1566        // Task should exist in state
1567        let task = state.task(handle.task_id());
1568        assert!(task.is_some());
1569
1570        // Task should be owned by the region
1571        let task = task.unwrap();
1572        assert_eq!(task.owner, region);
1573    }
1574
1575    #[test]
1576    fn spawn_inherits_registry_and_remote_capabilities() {
1577        use crate::cx::registry::RegistryHandle;
1578        use crate::remote::{NodeId, RemoteCap};
1579        use std::task::Context;
1580
1581        let mut state = RuntimeState::new();
1582
1583        let registry = crate::cx::NameRegistry::new();
1584        let registry_handle = RegistryHandle::new(Arc::new(registry));
1585        let parent_registry_arc = registry_handle.as_arc();
1586
1587        let cx = test_cx()
1588            .with_registry_handle(Some(registry_handle))
1589            .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")));
1590
1591        let region = state.create_root_region(Budget::INFINITE);
1592        let scope = test_scope(region, Budget::INFINITE);
1593
1594        let mut handle = scope
1595            .spawn_registered(&mut state, &cx, move |cx| async move {
1596                let child_registry = cx.registry_handle().expect("child must inherit registry");
1597                let child_registry_arc = child_registry.as_arc();
1598                let same_registry = Arc::ptr_eq(&child_registry_arc, &parent_registry_arc);
1599
1600                let child_remote = cx.remote().expect("child must inherit remote cap");
1601                let origin = child_remote.local_node().as_str().to_owned();
1602
1603                (same_registry, origin)
1604            })
1605            .unwrap();
1606
1607        let waker = std::task::Waker::noop().clone();
1608        let mut poll_cx = Context::from_waker(&waker);
1609
1610        let stored = state
1611            .get_stored_future(handle.task_id())
1612            .expect("spawn_registered must store the task");
1613        assert!(stored.poll(&mut poll_cx).is_ready());
1614
1615        let mut join_fut = std::pin::pin!(handle.join(&cx));
1616        match join_fut.as_mut().poll(&mut poll_cx) {
1617            Poll::Ready(Ok((same_registry, origin))) => {
1618                assert!(
1619                    same_registry,
1620                    "child should observe the same RegistryCap instance"
1621                );
1622                assert_eq!(origin, "origin-test");
1623            }
1624            other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1625        }
1626    }
1627
1628    #[test]
1629    fn spawn_inherits_runtime_timer_driver() {
1630        use std::task::Context;
1631
1632        let mut state = RuntimeState::new();
1633        let clock = Arc::new(crate::time::VirtualClock::new());
1634        state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
1635
1636        let cx = test_cx();
1637        let region = state.create_root_region(Budget::INFINITE);
1638        let scope = test_scope(region, Budget::INFINITE);
1639
1640        let (mut handle, mut stored) = scope
1641            .spawn(&mut state, &cx, |cx| async move { cx.has_timer() })
1642            .expect("spawn should succeed");
1643
1644        let waker = std::task::Waker::noop().clone();
1645        let mut poll_cx = Context::from_waker(&waker);
1646        assert!(stored.poll(&mut poll_cx).is_ready());
1647
1648        let mut join_fut = std::pin::pin!(handle.join(&cx));
1649        match join_fut.as_mut().poll(&mut poll_cx) {
1650            Poll::Ready(Ok(has_timer)) => assert!(has_timer),
1651            other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1652        }
1653    }
1654
1655    #[test]
1656    fn create_task_record_uses_runtime_timer_driver_time() {
1657        let mut state = RuntimeState::new();
1658        let clock = Arc::new(crate::time::VirtualClock::starting_at(Time::from_millis(
1659            11,
1660        )));
1661        state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(
1662            clock.clone(),
1663        ));
1664
1665        let region = state.create_root_region(Budget::INFINITE);
1666        let scope = test_scope(region, Budget::INFINITE);
1667
1668        clock.advance(Time::from_millis(7).as_nanos());
1669        let task_id = scope
1670            .create_task_record(&mut state)
1671            .expect("task record should be created");
1672
1673        let task = state.task(task_id).expect("task record");
1674        assert_eq!(task.created_at, Time::from_millis(18));
1675    }
1676
1677    #[test]
1678    fn spawn_blocking_inherits_runtime_timer_driver() {
1679        use std::task::Context;
1680
1681        let mut state = RuntimeState::new();
1682        let clock = Arc::new(crate::time::VirtualClock::new());
1683        state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
1684
1685        let cx = test_cx();
1686        let region = state.create_root_region(Budget::INFINITE);
1687        let scope = test_scope(region, Budget::INFINITE);
1688
1689        let (mut handle, mut stored) = scope
1690            .spawn_blocking(&mut state, &cx, |cx| cx.has_timer())
1691            .expect("spawn_blocking should succeed");
1692
1693        let waker = std::task::Waker::noop().clone();
1694        let mut poll_cx = Context::from_waker(&waker);
1695        assert!(stored.poll(&mut poll_cx).is_ready());
1696
1697        let mut join_fut = std::pin::pin!(handle.join(&cx));
1698        match join_fut.as_mut().poll(&mut poll_cx) {
1699            Poll::Ready(Ok(has_timer)) => assert!(has_timer),
1700            other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1701        }
1702    }
1703
1704    #[test]
1705    fn spawn_registered_stores_task() {
1706        let mut state = RuntimeState::new();
1707        let cx = test_cx();
1708        let region = state.create_root_region(Budget::INFINITE);
1709        let scope = test_scope(region, Budget::INFINITE);
1710
1711        // spawn_registered should both create and store the task
1712        let handle = scope
1713            .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
1714            .unwrap();
1715
1716        // Task record should exist
1717        let task = state.task(handle.task_id());
1718        assert!(task.is_some());
1719        assert_eq!(task.unwrap().owner, region);
1720
1721        // StoredTask should be registered (can be retrieved for polling)
1722        let stored = state.get_stored_future(handle.task_id());
1723        assert!(stored.is_some(), "spawn_registered should store the task");
1724    }
1725
1726    #[test]
1727    fn spawn_registered_task_can_be_polled() {
1728        use std::task::Context;
1729
1730        let mut state = RuntimeState::new();
1731        let cx = test_cx();
1732        let region = state.create_root_region(Budget::INFINITE);
1733        let scope = test_scope(region, Budget::INFINITE);
1734
1735        let mut handle = scope
1736            .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
1737            .unwrap();
1738
1739        // Get the stored future and poll it
1740        let waker = std::task::Waker::noop().clone();
1741        let mut poll_cx = Context::from_waker(&waker);
1742
1743        let stored = state.get_stored_future(handle.task_id()).unwrap();
1744        let poll_result = stored.poll(&mut poll_cx);
1745        assert!(
1746            poll_result.is_ready(),
1747            "Simple async should complete in one poll"
1748        );
1749
1750        // Join should now have the result
1751        let mut join_fut = std::pin::pin!(handle.join(&cx));
1752        match join_fut.as_mut().poll(&mut poll_cx) {
1753            Poll::Ready(Ok(val)) => assert_eq!(val, 42),
1754            other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
1755        }
1756    }
1757
1758    #[test]
1759    fn spawn_blocking_creates_task_record() {
1760        let mut state = RuntimeState::new();
1761        let cx = test_cx();
1762        let region = state.create_root_region(Budget::INFINITE);
1763        let scope = test_scope(region, Budget::INFINITE);
1764
1765        let (handle, _stored) = scope.spawn_blocking(&mut state, &cx, |_| 42_i32).unwrap();
1766
1767        // Task should exist
1768        let task = state.task(handle.task_id());
1769        assert!(task.is_some());
1770        assert_eq!(task.unwrap().owner, region);
1771    }
1772
1773    #[test]
1774    fn spawn_local_creates_task_record() {
1775        let mut state = RuntimeState::new();
1776        let cx = test_cx();
1777        let region = state.create_root_region(Budget::INFINITE);
1778        let scope = test_scope(region, Budget::INFINITE);
1779
1780        let local_ready = Arc::new(parking_lot::Mutex::new(std::collections::VecDeque::new()));
1781        let _local_ready_guard =
1782            crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
1783        let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);
1784
1785        // In Phase 0, spawn_local requires Send bounds
1786        // In Phase 1+, this will work with !Send futures
1787        let handle = scope
1788            .spawn_local(&mut state, &cx, |_| async move { 42_i32 })
1789            .unwrap();
1790
1791        // Task should exist
1792        let task = state.task(handle.task_id());
1793        assert!(task.is_some());
1794        assert_eq!(task.unwrap().owner, region);
1795    }
1796
1797    #[test]
1798    fn spawn_local_without_scheduler_fails_and_rolls_back() {
1799        let mut state = RuntimeState::new();
1800        let cx = test_cx();
1801        let region = state.create_root_region(Budget::INFINITE);
1802        let scope = test_scope(region, Budget::INFINITE);
1803
1804        let result = scope.spawn_local(&mut state, &cx, |_| async move { 5_i32 });
1805        assert!(matches!(result, Err(SpawnError::LocalSchedulerUnavailable)));
1806
1807        // Task should not exist
1808        assert!(state.tasks_is_empty());
1809        let region_record = state.region(region).unwrap();
1810        assert!(region_record.task_ids().is_empty());
1811    }
1812
1813    #[test]
1814    fn spawn_local_makes_progress_via_local_ready() {
1815        use std::sync::Arc;
1816        use std::task::Context;
1817
1818        let mut state = RuntimeState::new();
1819        let cx = test_cx();
1820        let region = state.create_root_region(Budget::INFINITE);
1821        let scope = test_scope(region, Budget::INFINITE);
1822
1823        let local_ready = Arc::new(parking_lot::Mutex::new(std::collections::VecDeque::new()));
1824        let _local_ready_guard =
1825            crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
1826        let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);
1827
1828        let mut handle = scope
1829            .spawn_local(&mut state, &cx, |_| async move { 7_i32 })
1830            .unwrap();
1831
1832        let queued = {
1833            let queue = local_ready.lock();
1834            queue.contains(&handle.task_id())
1835        };
1836        assert!(queued, "spawn_local should enqueue into local_ready");
1837
1838        let task_id = {
1839            let mut queue = local_ready.lock();
1840            queue
1841                .pop_front()
1842                .expect("local_ready should contain spawned task")
1843        };
1844
1845        let mut join_fut = std::pin::pin!(handle.join(&cx));
1846        let waker = std::task::Waker::noop().clone();
1847        let mut ctx = Context::from_waker(&waker);
1848
1849        assert!(join_fut.as_mut().poll(&mut ctx).is_pending());
1850
1851        let mut local_task =
1852            crate::runtime::local::remove_local_task(task_id).expect("local task missing");
1853        assert!(local_task.poll(&mut ctx).is_ready());
1854
1855        match join_fut.as_mut().poll(&mut ctx) {
1856            Poll::Ready(Ok(val)) => assert_eq!(val, 7),
1857            res => unreachable!("Expected Ready(Ok(7)), got {res:?}"),
1858        }
1859    }
1860
1861    #[test]
1862    fn task_added_to_region() {
1863        let mut state = RuntimeState::new();
1864        let cx = test_cx();
1865        let region = state.create_root_region(Budget::INFINITE);
1866        let scope = test_scope(region, Budget::INFINITE);
1867
1868        let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1869
1870        // Check region has the task
1871        let region_record = state.region(region).unwrap();
1872        assert!(region_record.task_ids().contains(&handle.task_id()));
1873    }
1874
1875    #[test]
1876    fn multiple_spawns_create_distinct_tasks() {
1877        let mut state = RuntimeState::new();
1878        let cx = test_cx();
1879        let region = state.create_root_region(Budget::INFINITE);
1880        let scope = test_scope(region, Budget::INFINITE);
1881
1882        let (handle1, _) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
1883        let (handle2, _) = scope.spawn(&mut state, &cx, |_| async { 2_i32 }).unwrap();
1884        let (handle3, _) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();
1885
1886        // All task IDs should be different
1887        assert_ne!(handle1.task_id(), handle2.task_id());
1888        assert_ne!(handle2.task_id(), handle3.task_id());
1889        assert_ne!(handle1.task_id(), handle3.task_id());
1890
1891        // All tasks should be in the region
1892        let region_record = state.region(region).unwrap();
1893        assert!(region_record.task_ids().contains(&handle1.task_id()));
1894        assert!(region_record.task_ids().contains(&handle2.task_id()));
1895        assert!(region_record.task_ids().contains(&handle3.task_id()));
1896    }
1897
1898    #[test]
1899    fn spawn_into_closing_region_should_fail() {
1900        let mut state = RuntimeState::new();
1901        let cx = test_cx();
1902        let region = state.create_root_region(Budget::INFINITE);
1903        let scope = test_scope(region, Budget::INFINITE);
1904
1905        // Transition region to Closing
1906        let region_record = state.region_mut(region).expect("region");
1907        region_record.begin_close(None);
1908
1909        // Attempt to spawn should fail
1910        let result = scope.spawn(&mut state, &cx, |_| async { 42 });
1911        assert!(matches!(result, Err(SpawnError::RegionClosed(_))));
1912    }
1913
1914    #[test]
1915    fn test_join_manual_poll() {
1916        use std::task::Context;
1917
1918        let mut state = RuntimeState::new();
1919        let cx = test_cx();
1920        let region = state.create_root_region(Budget::INFINITE);
1921        let scope = test_scope(region, Budget::INFINITE);
1922
1923        // Spawn a task
1924        let (mut handle, mut stored_task) =
1925            scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1926        // The stored task is returned directly, not put in state by scope.spawn
1927
1928        // Create join future
1929        let mut join_fut = std::pin::pin!(handle.join(&cx));
1930
1931        // Create waker context
1932        let waker = std::task::Waker::noop().clone();
1933        let mut ctx = Context::from_waker(&waker);
1934
1935        // Poll join - should be pending
1936        assert!(join_fut.as_mut().poll(&mut ctx).is_pending());
1937
1938        // Poll stored task - should complete and send result
1939        assert!(stored_task.poll(&mut ctx).is_ready());
1940
1941        // Poll join - should be ready now
1942        match join_fut.as_mut().poll(&mut ctx) {
1943            Poll::Ready(Ok(val)) => assert_eq!(val, 42),
1944            other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
1945        }
1946    }
1947
1948    #[test]
1949    fn spawn_abort_cancels_task() {
1950        use std::task::{Context, Poll};
1951
1952        let mut state = RuntimeState::new();
1953        let cx = test_cx();
1954        let region = state.create_root_region(Budget::INFINITE);
1955        let scope = test_scope(region, Budget::INFINITE);
1956
1957        // Spawn a task that checks for cancellation
1958        let (mut handle, mut stored_task) = scope
1959            .spawn(&mut state, &cx, |cx| async move {
1960                // We expect to be cancelled immediately because abort() is called before we run
1961                if cx.checkpoint().is_err() {
1962                    return "cancelled";
1963                }
1964                "finished"
1965            })
1966            .unwrap();
1967
1968        // Abort the task via handle
1969        handle.abort();
1970
1971        // Drive the task
1972        let waker = std::task::Waker::noop().clone();
1973        let mut ctx = Context::from_waker(&waker);
1974
1975        // Task should run, see cancellation, and return "cancelled"
1976        match stored_task.poll(&mut ctx) {
1977            Poll::Ready(crate::types::Outcome::Ok(())) => {}
1978            res => unreachable!("Task should have completed with Ok(()), got {res:?}"),
1979        }
1980
1981        // Check result via handle
1982        let mut join_fut = std::pin::pin!(handle.join(&cx));
1983        match join_fut.as_mut().poll(&mut ctx) {
1984            Poll::Ready(Ok(val)) => assert_eq!(val, "cancelled"),
1985            Poll::Ready(Err(e)) => unreachable!("Task failed unexpectedly: {e}"),
1986            Poll::Pending => unreachable!("Join should be ready"),
1987        }
1988    }
1989
1990    #[test]
1991    fn hedge_backup_spawn_failure_aborts_primary() {
1992        let mut state = RuntimeState::new();
1993        let cx = test_cx();
1994        let region = state.create_root_region(Budget::INFINITE);
1995        let scope = test_scope(region, Budget::INFINITE);
1996
1997        let limits = RegionLimits {
1998            max_tasks: Some(1),
1999            ..RegionLimits::unlimited()
2000        };
2001        assert!(state.set_region_limits(region, limits));
2002
2003        let result = block_on(scope.hedge(
2004            &mut state,
2005            &cx,
2006            std::time::Duration::ZERO,
2007            |_| async { 1_u8 },
2008            |_| async { 2_u8 },
2009        ));
2010
2011        assert!(matches!(
2012            result,
2013            Err(JoinError::Cancelled(reason))
2014                if reason.kind == CancelKind::ResourceUnavailable
2015        ));
2016
2017        let task_id = *state
2018            .region(region)
2019            .expect("region missing")
2020            .task_ids()
2021            .first()
2022            .expect("primary task should remain tracked");
2023
2024        let task = state.task(task_id).expect("primary task record missing");
2025        let (cancel_requested, cancel_reason_kind) = {
2026            let inner = task
2027                .cx_inner
2028                .as_ref()
2029                .expect("primary task must have shared Cx inner")
2030                .read();
2031            (
2032                inner.cancel_requested,
2033                inner.cancel_reason.as_ref().map(|r| r.kind),
2034            )
2035        };
2036
2037        assert!(
2038            cancel_requested,
2039            "primary task must be cancellation-requested when backup spawn fails"
2040        );
2041        assert_eq!(cancel_reason_kind, Some(CancelKind::ResourceUnavailable));
2042    }
2043
2044    #[test]
2045    fn region_closes_empty_child() {
2046        let mut state = RuntimeState::new();
2047        let cx = test_cx();
2048        let parent = state.create_root_region(Budget::INFINITE);
2049        let scope = test_scope(parent, Budget::INFINITE);
2050
2051        let outcome = block_on(scope.region(
2052            &mut state,
2053            &cx,
2054            crate::types::policy::FailFast,
2055            |child, _state| {
2056                let child_id = child.region_id();
2057                async move { Outcome::Ok(child_id) }
2058            },
2059        ))
2060        .expect("child region created");
2061
2062        let child_id = match outcome {
2063            Outcome::Ok(id) => id,
2064            other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
2065        };
2066
2067        assert!(
2068            state.region(child_id).is_none(),
2069            "closed child region should be reclaimed from arena"
2070        );
2071
2072        let parent_record = state.region(parent).expect("parent record missing");
2073        assert!(
2074            !parent_record.child_ids().contains(&child_id),
2075            "closed child should be removed from parent"
2076        );
2077    }
2078
2079    #[test]
2080    fn region_budget_is_met_with_parent() {
2081        let mut state = RuntimeState::new();
2082        let cx = test_cx();
2083        let parent = state.create_root_region(Budget::with_deadline_secs(10));
2084        let scope = test_scope(parent, Budget::with_deadline_secs(10));
2085
2086        let outcome = block_on(scope.region_with_budget(
2087            &mut state,
2088            &cx,
2089            Budget::with_deadline_secs(30),
2090            crate::types::policy::FailFast,
2091            |child, _state| {
2092                let child_id = child.region_id();
2093                let child_budget = child.budget();
2094                async move { Outcome::Ok((child_id, child_budget)) }
2095            },
2096        ))
2097        .expect("child region created");
2098
2099        let (child_id, child_budget) = match outcome {
2100            Outcome::Ok(tuple) => tuple,
2101            other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
2102        };
2103
2104        assert_eq!(
2105            child_budget.deadline,
2106            Some(crate::types::Time::from_secs(10))
2107        );
2108        assert!(
2109            state.region(child_id).is_none(),
2110            "closed child region should be reclaimed from arena"
2111        );
2112    }
2113
2114    #[test]
2115    fn region_spawns_tasks_in_child() {
2116        use std::task::{Context, Poll};
2117
2118        let mut state = RuntimeState::new();
2119        let cx = test_cx();
2120        let parent = state.create_root_region(Budget::INFINITE);
2121        let scope = test_scope(parent, Budget::INFINITE);
2122
2123        let outcome = block_on(scope.region(
2124            &mut state,
2125            &cx,
2126            crate::types::policy::FailFast,
2127            |child, state| {
2128                let child_id = child.region_id();
2129                let (handle, mut stored) = child
2130                    .spawn(state, &cx, |_| async { 7_i32 })
2131                    .expect("spawn in child");
2132
2133                let parent_has = state
2134                    .region(parent)
2135                    .expect("parent record missing")
2136                    .task_ids()
2137                    .contains(&handle.task_id());
2138                let child_has = state
2139                    .region(child_id)
2140                    .expect("child record missing")
2141                    .task_ids()
2142                    .contains(&handle.task_id());
2143
2144                let waker = std::task::Waker::noop().clone();
2145                let mut poll_cx = Context::from_waker(&waker);
2146                let poll_result = stored.poll(&mut poll_cx);
2147                if let Poll::Ready(outcome) = poll_result {
2148                    let task_outcome = match outcome {
2149                        Outcome::Ok(()) => Outcome::Ok(()),
2150                        Outcome::Panicked(payload) => Outcome::Panicked(payload),
2151                        other => unreachable!("unexpected task outcome: {other:?}"),
2152                    };
2153                    if let Some(task_record) = state.task_mut(handle.task_id()) {
2154                        task_record.complete(task_outcome);
2155                    }
2156                    let _ = state.task_completed(handle.task_id());
2157                }
2158
2159                std::future::ready(Outcome::Ok((child_id, parent_has, child_has)))
2160            },
2161        ))
2162        .expect("child region created");
2163
2164        let (child_id, parent_has, child_has) = match outcome {
2165            Outcome::Ok(tuple) => tuple,
2166            other => unreachable!("expected Outcome::Ok(tuple), got {other:?}"),
2167        };
2168
2169        assert!(!parent_has, "task should not be owned by parent region");
2170        assert!(child_has, "task should be owned by child region");
2171
2172        let parent_record = state.region(parent).expect("parent record missing");
2173        assert!(
2174            !parent_record.child_ids().contains(&child_id),
2175            "closed child should be removed from parent"
2176        );
2177    }
2178
2179    #[test]
2180    fn spawn_panic_propagates_as_panicked_error() {
2181        use std::task::{Context, Poll};
2182
2183        let mut state = RuntimeState::new();
2184        let cx = test_cx();
2185        let region = state.create_root_region(Budget::INFINITE);
2186        let scope = test_scope(region, Budget::INFINITE);
2187
2188        let (mut handle, mut stored_task) = scope
2189            .spawn(&mut state, &cx, |_| async {
2190                std::panic::panic_any("oops");
2191            })
2192            .unwrap();
2193
2194        // Drive the task
2195        let waker = std::task::Waker::noop().clone();
2196        let mut ctx = Context::from_waker(&waker);
2197
2198        // Polling stored task should return Ready(Panicked) even if it panics (caught inside)
2199        match stored_task.poll(&mut ctx) {
2200            Poll::Ready(crate::types::Outcome::Panicked(_)) => {}
2201            res => unreachable!("Task should have completed with Panicked, got {res:?}"),
2202        }
2203
2204        // Check result via handle
2205        let mut join_fut = std::pin::pin!(handle.join(&cx));
2206        match join_fut.as_mut().poll(&mut ctx) {
2207            Poll::Ready(Err(JoinError::Panicked(p))) => {
2208                assert_eq!(p.message(), "oops");
2209            }
2210            res => unreachable!("Expected Panicked, got {res:?}"),
2211        }
2212    }
2213
2214    #[test]
2215    fn join_all_success() {
2216        use std::task::{Context, Poll};
2217
2218        let mut state = RuntimeState::new();
2219        let cx = test_cx();
2220        let region = state.create_root_region(Budget::INFINITE);
2221        let scope = test_scope(region, Budget::INFINITE);
2222
2223        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();
2224        let (h2, mut t2) = scope.spawn(&mut state, &cx, |_| async { 2 }).unwrap();
2225
2226        // Drive tasks to completion
2227        let waker = std::task::Waker::noop().clone();
2228        let mut ctx = Context::from_waker(&waker);
2229        assert!(t1.poll(&mut ctx).is_ready());
2230        assert!(t2.poll(&mut ctx).is_ready());
2231
2232        let handles = vec![h1, h2];
2233        let mut fut = Box::pin(scope.join_all(&cx, handles));
2234
2235        match fut.as_mut().poll(&mut ctx) {
2236            Poll::Ready(results) => {
2237                assert_eq!(results.len(), 2);
2238                assert_eq!(results[0].as_ref().unwrap(), &1);
2239                assert_eq!(results[1].as_ref().unwrap(), &2);
2240            }
2241            Poll::Pending => unreachable!("join_all should be ready"),
2242        }
2243    }
2244
2245    #[test]
2246    fn race_all_aborted_task_is_drained() {
2247        use std::task::{Context, Poll};
2248
2249        let mut state = RuntimeState::new();
2250        let cx = test_cx();
2251        let region = state.create_root_region(Budget::INFINITE);
2252        let scope = test_scope(region, Budget::INFINITE);
2253
2254        // Task 1: completes immediately
2255        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();
2256
2257        // Task 2: yields once, checking for cancellation
2258        let (h2, mut t2) = scope
2259            .spawn(&mut state, &cx, |cx| async move {
2260                // Yield once to simulate running
2261                struct YieldOnce(bool);
2262                impl std::future::Future for YieldOnce {
2263                    type Output = ();
2264                    fn poll(
2265                        mut self: std::pin::Pin<&mut Self>,
2266                        cx: &mut std::task::Context<'_>,
2267                    ) -> std::task::Poll<()> {
2268                        if self.0 {
2269                            std::task::Poll::Ready(())
2270                        } else {
2271                            self.0 = true;
2272                            cx.waker().wake_by_ref();
2273                            std::task::Poll::Pending
2274                        }
2275                    }
2276                }
2277                YieldOnce(false).await;
2278
2279                // Check cancellation
2280                if cx.checkpoint().is_err() {
2281                    return 0; // Cancelled
2282                }
2283                2
2284            })
2285            .unwrap();
2286
2287        let waker = std::task::Waker::noop().clone();
2288        let mut ctx = Context::from_waker(&waker);
2289
2290        // Drive t1 to completion (winner)
2291        assert!(t1.poll(&mut ctx).is_ready());
2292
2293        // Initialize race_all
2294        let handles = vec![h1, h2];
2295        let mut race_fut = Box::pin(scope.race_all(&cx, handles));
2296
2297        // Poll race_all.
2298        // It sees h1 ready. Winner=0.
2299        // It aborts h2.
2300        // It awaits h2 drain.
2301        // h2 is still pending (hasn't run), so h2.join() returns Pending.
2302        // race_fut returns Pending.
2303        assert!(race_fut.as_mut().poll(&mut ctx).is_pending());
2304
2305        // Now drive t2. It was aborted, so it should see cancellation if checked?
2306        // Wait, handle.abort() sets inner.cancel_requested.
2307        // But my t2 closure yields first.
2308        // So first poll of t2 -> YieldOnce returns Pending.
2309        assert!(t2.poll(&mut ctx).is_pending());
2310
2311        // Poll race_fut again. Still waiting for h2 drain.
2312        assert!(race_fut.as_mut().poll(&mut ctx).is_pending());
2313
2314        // Poll t2 again. YieldOnce finishes.
2315        // Then it hits checkpoint(). cancel_requested is true.
2316        // It returns 0 (simulated cancellation return).
2317        // Actually, normally tasks return Result or are wrapped.
2318        // Here spawn returns Result<i32>.
2319        // My closure returns i32.
2320        // So h2.join() will return Ok(0).
2321        // This counts as "drained".
2322        assert!(t2.poll(&mut ctx).is_ready());
2323
2324        // Now poll race_fut. h2 drain complete.
2325        // Should return (1, 0).
2326        match race_fut.as_mut().poll(&mut ctx) {
2327            Poll::Ready(Ok((val, idx))) => {
2328                assert_eq!(val, 1);
2329                assert_eq!(idx, 0);
2330            }
2331            res => unreachable!("Expected Ready(Ok((1, 0))), got {res:?}"),
2332        }
2333    }
2334
2335    #[test]
2336    fn race_surfaces_loser_panic_even_if_winner_succeeds() {
2337        use std::task::Context;
2338
2339        let mut state = RuntimeState::new();
2340        let cx = test_cx();
2341        let region = state.create_root_region(Budget::INFINITE);
2342        let scope = test_scope(region, Budget::INFINITE);
2343
2344        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
2345        let (h2, mut t2) = scope
2346            .spawn(&mut state, &cx, |_| async {
2347                std::panic::panic_any("loser panic");
2348            })
2349            .unwrap();
2350
2351        let waker = std::task::Waker::noop().clone();
2352        let mut poll_cx = Context::from_waker(&waker);
2353        assert!(t1.poll(&mut poll_cx).is_ready());
2354        assert!(t2.poll(&mut poll_cx).is_ready());
2355
2356        let result = block_on(scope.race(&cx, h1, h2));
2357        assert!(
2358            matches!(result, Err(JoinError::Panicked(_))),
2359            "loser panic must dominate race result, got {result:?}"
2360        );
2361    }
2362
2363    #[test]
2364    fn race_preserves_winner_panic_over_loser_panic() {
2365        use std::task::{Context, Poll};
2366
2367        let mut state = RuntimeState::new();
2368        let cx = test_cx();
2369        let region = state.create_root_region(Budget::INFINITE);
2370        let scope = test_scope(region, Budget::INFINITE);
2371
2372        let (h1, mut t1) = scope
2373            .spawn(&mut state, &cx, |_| async {
2374                std::panic::panic_any("winner panic");
2375            })
2376            .unwrap();
2377        let (h2, mut t2) = scope
2378            .spawn(&mut state, &cx, |_| {
2379                let mut first_poll = true;
2380                std::future::poll_fn(move |poll_cx| {
2381                    if first_poll {
2382                        first_poll = false;
2383                        poll_cx.waker().wake_by_ref();
2384                        Poll::Pending
2385                    } else {
2386                        std::panic::panic_any("loser panic");
2387                    }
2388                })
2389            })
2390            .unwrap();
2391
2392        let waker = std::task::Waker::noop().clone();
2393        let mut poll_cx = Context::from_waker(&waker);
2394        assert!(t1.poll(&mut poll_cx).is_ready());
2395        assert!(t2.poll(&mut poll_cx).is_pending());
2396
2397        let result = block_on(scope.race(&cx, h1, h2));
2398        match result {
2399            Err(JoinError::Panicked(payload)) => {
2400                assert_eq!(payload.message(), "winner panic");
2401            }
2402            other => unreachable!("winner panic must dominate race result, got {other:?}"),
2403        }
2404    }
2405
2406    #[test]
2407    fn race_all_surfaces_simultaneous_loser_panic() {
2408        use std::task::Context;
2409
2410        let mut state = RuntimeState::new();
2411        let cx = test_cx();
2412        let region = state.create_root_region(Budget::INFINITE);
2413        let scope = test_scope(region, Budget::INFINITE);
2414
2415        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
2416        let (h2, mut t2) = scope
2417            .spawn(&mut state, &cx, |_| async {
2418                std::panic::panic_any("simultaneous loser panic");
2419            })
2420            .unwrap();
2421        let (h3, mut t3) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();
2422
2423        let waker = std::task::Waker::noop().clone();
2424        let mut poll_cx = Context::from_waker(&waker);
2425        assert!(t1.poll(&mut poll_cx).is_ready());
2426        assert!(t2.poll(&mut poll_cx).is_ready());
2427        assert!(t3.poll(&mut poll_cx).is_ready());
2428
2429        let result = block_on(scope.race_all(&cx, vec![h1, h2, h3]));
2430        assert!(
2431            matches!(result, Err(JoinError::Panicked(_))),
2432            "simultaneous loser panic must dominate race_all result, got {result:?}"
2433        );
2434    }
2435
2436    #[test]
2437    fn race_all_preserves_winner_panic_over_loser_panic() {
2438        use std::task::{Context, Poll};
2439
2440        let mut state = RuntimeState::new();
2441        let cx = test_cx();
2442        let region = state.create_root_region(Budget::INFINITE);
2443        let scope = test_scope(region, Budget::INFINITE);
2444
2445        let (h1, mut t1) = scope
2446            .spawn(&mut state, &cx, |_| async {
2447                std::panic::panic_any("winner panic");
2448            })
2449            .unwrap();
2450        let (h2, mut t2) = scope
2451            .spawn(&mut state, &cx, |_| {
2452                let mut first_poll = true;
2453                std::future::poll_fn(move |poll_cx| {
2454                    if first_poll {
2455                        first_poll = false;
2456                        poll_cx.waker().wake_by_ref();
2457                        Poll::Pending
2458                    } else {
2459                        std::panic::panic_any("loser panic");
2460                    }
2461                })
2462            })
2463            .unwrap();
2464
2465        let waker = std::task::Waker::noop().clone();
2466        let mut poll_cx = Context::from_waker(&waker);
2467        assert!(t1.poll(&mut poll_cx).is_ready());
2468        assert!(t2.poll(&mut poll_cx).is_pending());
2469
2470        let result = block_on(scope.race_all(&cx, vec![h1, h2]));
2471        match result {
2472            Err(JoinError::Panicked(payload)) => {
2473                assert_eq!(payload.message(), "winner panic");
2474            }
2475            other => unreachable!("winner panic must dominate race_all result, got {other:?}"),
2476        }
2477    }
2478
2479    #[test]
2480    fn race_all_empty_is_pending() {
2481        let mut state = RuntimeState::new();
2482        let cx = test_cx();
2483        let region = state.create_root_region(Budget::INFINITE);
2484        let scope = test_scope(region, Budget::INFINITE);
2485
2486        let fut = scope.race_all::<i32>(&cx, vec![]);
2487        let waker = std::task::Waker::noop();
2488        let mut poll_cx = std::task::Context::from_waker(waker);
2489        let pinned = std::pin::pin!(fut);
2490        let status = std::future::Future::poll(pinned, &mut poll_cx);
2491        assert!(status.is_pending());
2492    }
2493
2494    // =============================================================================
2495    // CONFORMANCE TESTS: Structured Concurrency Invariants
2496    // =============================================================================
2497    //
2498    // These tests verify the core structured concurrency guarantees that must hold
2499    // for the spawn/join contract to be sound.
2500
2501    #[test]
2502    fn conformance_spawn_creates_trackable_task() {
2503        // INVARIANT: Every spawned task creates a trackable record that belongs to the spawning region
2504        let mut state = RuntimeState::new();
2505        let cx = test_cx();
2506        let region = state.create_root_region(Budget::INFINITE);
2507        let scope = test_scope(region, Budget::INFINITE);
2508
2509        let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
2510
2511        // Task must exist and be trackable
2512        let task_record = state
2513            .task(handle.task_id())
2514            .expect("spawned task must have a record");
2515
2516        // Task must belong to the spawning region
2517        assert_eq!(
2518            task_record.owner, region,
2519            "spawned task must be owned by the spawning region"
2520        );
2521
2522        // Region must track the task
2523        let region_record = state.region(region).expect("spawning region must exist");
2524        assert!(
2525            region_record.task_ids().contains(&handle.task_id()),
2526            "spawning region must track the spawned task"
2527        );
2528    }
2529
2530    #[test]
2531    fn conformance_spawn_enforces_send_bounds() {
2532        // INVARIANT: spawn() enforces Send + 'static bounds for cross-worker task migration
2533        let mut state = RuntimeState::new();
2534        let cx = test_cx();
2535        let region = state.create_root_region(Budget::INFINITE);
2536        let scope = test_scope(region, Budget::INFINITE);
2537
2538        // This should compile - Send + 'static data
2539        let send_data = String::from("test");
2540        let (handle, _stored) = scope
2541            .spawn(&mut state, &cx, move |_| async move {
2542                send_data.len() // Uses Send + 'static String
2543            })
2544            .unwrap();
2545
2546        // Task record should reflect Send bounds
2547        let task_record = state
2548            .task(handle.task_id())
2549            .expect("Send task must have a record");
2550        assert_eq!(task_record.owner, region);
2551
2552        // NOTE: Non-Send compile-time test examples are in module docstring
2553        // (compile_fail tests with Rc<T> and borrowed references)
2554    }
2555
2556    #[test]
2557    fn conformance_join_awaits_task_completion() {
2558        // INVARIANT: TaskHandle.join() waits for task completion and returns the result
2559
2560        use std::task::Context;
2561
2562        let mut state = RuntimeState::new();
2563        let cx = test_cx();
2564        let region = state.create_root_region(Budget::INFINITE);
2565        let scope = test_scope(region, Budget::INFINITE);
2566
2567        let (mut handle, mut stored) = scope.spawn(&mut state, &cx, |_| async { 123_i32 }).unwrap();
2568
2569        let waker = std::task::Waker::noop().clone();
2570        let mut poll_cx = Context::from_waker(&waker);
2571
2572        // Before task completion, join should be pending
2573        let mut join_fut = std::pin::pin!(handle.join(&cx));
2574        assert!(
2575            join_fut.as_mut().poll(&mut poll_cx).is_pending(),
2576            "join must be pending before task completion"
2577        );
2578
2579        // Complete the task
2580        assert!(
2581            stored.poll(&mut poll_cx).is_ready(),
2582            "test task must complete in one poll"
2583        );
2584
2585        // After task completion, join should return the result
2586        match join_fut.as_mut().poll(&mut poll_cx) {
2587            std::task::Poll::Ready(Ok(result)) => {
2588                assert_eq!(result, 123, "join must return the task's result");
2589            }
2590            other => panic!("join must be Ready(Ok(123)) after task completion, got {other:?}"),
2591        }
2592    }
2593
2594    #[test]
2595    fn conformance_child_region_task_isolation() {
2596        // INVARIANT: Tasks spawned in child regions belong to the child, not the parent
2597
2598        use std::task::Context;
2599
2600        let mut state = RuntimeState::new();
2601        let cx = test_cx();
2602        let parent_region = state.create_root_region(Budget::INFINITE);
2603        let scope = test_scope(parent_region, Budget::INFINITE);
2604
2605        let outcome = block_on(scope.region(
2606            &mut state,
2607            &cx,
2608            crate::types::policy::FailFast,
2609            |child_scope, state| {
2610                let child_region = child_scope.region_id();
2611
2612                // Spawn task in child region
2613                let (handle, mut stored) = child_scope
2614                    .spawn(state, &cx, |_| async { 456_i32 })
2615                    .expect("spawn in child region must succeed");
2616
2617                // Verify task ownership invariants
2618                let task_record = state
2619                    .task(handle.task_id())
2620                    .expect("child task must have a record");
2621                let child_owns = task_record.owner == child_region;
2622                let parent_owns = task_record.owner == parent_region;
2623
2624                // Verify region tracking invariants
2625                let parent_tracks = state
2626                    .region(parent_region)
2627                    .is_some_and(|r| r.task_ids().contains(&handle.task_id()));
2628                let child_tracks = state
2629                    .region(child_region)
2630                    .is_some_and(|r| r.task_ids().contains(&handle.task_id()));
2631
2632                // Complete the task for clean shutdown
2633                let waker = std::task::Waker::noop().clone();
2634                let mut poll_cx = Context::from_waker(&waker);
2635                if let std::task::Poll::Ready(outcome) = stored.poll(&mut poll_cx) {
2636                    if let Some(task) = state.task_mut(handle.task_id()) {
2637                        task.complete(outcome.map_err(|_| {
2638                            crate::error::Error::new(crate::error::ErrorKind::Internal)
2639                        }));
2640                    }
2641                    let _ = state.task_completed(handle.task_id());
2642                }
2643
2644                std::future::ready(Outcome::Ok((
2645                    child_owns,
2646                    parent_owns,
2647                    child_tracks,
2648                    parent_tracks,
2649                )))
2650            },
2651        ))
2652        .expect("child region must complete");
2653
2654        let (child_owns, parent_owns, child_tracks, parent_tracks) = match outcome {
2655            Outcome::Ok(tuple) => tuple,
2656            other => panic!("expected Ok(ownership_data), got {other:?}"),
2657        };
2658
2659        assert!(
2660            child_owns,
2661            "task spawned in child region must be owned by child"
2662        );
2663        assert!(
2664            !parent_owns,
2665            "task spawned in child region must NOT be owned by parent"
2666        );
2667        assert!(child_tracks, "child region must track its spawned tasks");
2668        assert!(!parent_tracks, "parent region must NOT track child's tasks");
2669    }
2670
2671    #[test]
2672    fn conformance_capability_inheritance() {
2673        // INVARIANT: Spawned tasks inherit capabilities from the parent Cx
2674        use crate::cx::macaroon::MacaroonToken;
2675        use crate::cx::registry::RegistryHandle;
2676        use crate::remote::{NodeId, RemoteCap};
2677        use crate::security::key::AuthKey;
2678        use crate::types::SystemPressure;
2679        use std::sync::Arc;
2680        use std::task::Context;
2681
2682        let mut state = RuntimeState::new();
2683        let region = state.create_root_region(Budget::INFINITE);
2684        let scope = test_scope(region, Budget::INFINITE);
2685
2686        // Setup parent Cx with capabilities
2687        let registry = crate::cx::NameRegistry::new();
2688        let registry_handle = RegistryHandle::new(Arc::new(registry));
2689        let parent_registry_arc = registry_handle.as_arc();
2690        let parent_io_cap: Arc<dyn crate::io::IoCap> = Arc::new(crate::io::LabIoCap::new());
2691        let parent_pressure = Arc::new(SystemPressure::new());
2692        parent_pressure.set_headroom(0.25);
2693        let auth_key = AuthKey::from_seed(7);
2694        let token = MacaroonToken::mint(&auth_key, "scope:spawn", "cx/scope");
2695
2696        let parent_cx = Cx::new_with_io(
2697            crate::types::RegionId::new_for_test(0, 0),
2698            crate::types::TaskId::new_for_test(0, 0),
2699            Budget::INFINITE,
2700            None,
2701            None,
2702            Some(Arc::clone(&parent_io_cap)),
2703            None,
2704        )
2705        .with_registry_handle(Some(registry_handle))
2706        .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("test-node")))
2707        .with_pressure(Arc::clone(&parent_pressure))
2708        .with_macaroon(token);
2709        let parent_macaroon = parent_cx
2710            .macaroon_handle()
2711            .expect("parent must retain macaroon capability");
2712
2713        let mut handle = scope
2714            .spawn_registered(&mut state, &parent_cx, move |child_cx| async move {
2715                // Verify registry inheritance
2716                let child_registry = child_cx
2717                    .registry_handle()
2718                    .expect("child must inherit registry capability");
2719                let same_registry = Arc::ptr_eq(&child_registry.as_arc(), &parent_registry_arc);
2720
2721                // Verify remote capability inheritance
2722                let child_remote = child_cx
2723                    .remote()
2724                    .expect("child must inherit remote capability");
2725                let node_name = child_remote.local_node().as_str().to_owned();
2726
2727                // Verify I/O capability inheritance
2728                let child_io_cap = child_cx
2729                    .io_cap_handle()
2730                    .expect("child must inherit I/O capability");
2731                let same_io_cap = Arc::ptr_eq(&child_io_cap, &parent_io_cap);
2732
2733                // Verify system pressure inheritance
2734                let child_pressure = child_cx
2735                    .pressure_handle()
2736                    .expect("child must inherit system pressure");
2737                let same_pressure = Arc::ptr_eq(&child_pressure, &parent_pressure);
2738
2739                // Verify macaroon inheritance
2740                let child_macaroon = child_cx
2741                    .macaroon_handle()
2742                    .expect("child must inherit macaroon capability");
2743                let same_macaroon = Arc::ptr_eq(&child_macaroon, &parent_macaroon);
2744
2745                // Verify timer capability inheritance (if parent has it)
2746                let has_timer = child_cx.has_timer();
2747
2748                (
2749                    same_registry,
2750                    node_name,
2751                    same_io_cap,
2752                    same_pressure,
2753                    same_macaroon,
2754                    has_timer,
2755                )
2756            })
2757            .unwrap();
2758
2759        // Complete the task and verify results
2760        let waker = std::task::Waker::noop().clone();
2761        let mut poll_cx = Context::from_waker(&waker);
2762
2763        let stored = state
2764            .get_stored_future(handle.task_id())
2765            .expect("spawn_registered must store the task");
2766        assert!(stored.poll(&mut poll_cx).is_ready());
2767
2768        let mut join_fut = std::pin::pin!(handle.join(&parent_cx));
2769        match join_fut.as_mut().poll(&mut poll_cx) {
2770            std::task::Poll::Ready(Ok((
2771                same_registry,
2772                node_name,
2773                same_io_cap,
2774                same_pressure,
2775                same_macaroon,
2776                has_timer,
2777            ))) => {
2778                assert!(
2779                    same_registry,
2780                    "child must inherit exact same registry instance"
2781                );
2782                assert_eq!(
2783                    node_name, "test-node",
2784                    "child must inherit remote capability"
2785                );
2786                assert!(same_io_cap, "child must inherit exact same I/O capability");
2787                assert!(
2788                    same_pressure,
2789                    "child must inherit exact same system pressure handle"
2790                );
2791                assert!(
2792                    same_macaroon,
2793                    "child must inherit exact same macaroon capability"
2794                );
2795                assert_eq!(
2796                    has_timer,
2797                    parent_cx.has_timer(),
2798                    "child timer capability should stay consistent with the runtime-backed parent"
2799                );
2800            }
2801            other => panic!("capability inheritance test failed: {other:?}"),
2802        }
2803    }
2804
2805    #[test]
2806    fn conformance_task_cancellation_propagation() {
2807        // INVARIANT: Cancelling a task via abort() propagates cancellation signal
2808
2809        use std::task::Context;
2810
2811        let mut state = RuntimeState::new();
2812        let cx = test_cx();
2813        let region = state.create_root_region(Budget::INFINITE);
2814        let scope = test_scope(region, Budget::INFINITE);
2815
2816        let (mut handle, mut stored) = scope
2817            .spawn(&mut state, &cx, |cx| async move {
2818                // Check cancellation status and respond accordingly
2819                if cx.checkpoint().is_err() {
2820                    "cancelled"
2821                } else {
2822                    "completed"
2823                }
2824            })
2825            .unwrap();
2826
2827        // Abort the task before it runs
2828        handle.abort();
2829
2830        let waker = std::task::Waker::noop().clone();
2831        let mut poll_cx = Context::from_waker(&waker);
2832
2833        // Task should complete and see the cancellation
2834        assert!(
2835            stored.poll(&mut poll_cx).is_ready(),
2836            "cancelled task must still complete"
2837        );
2838
2839        // Join should return the cancellation-aware result
2840        let mut join_fut = std::pin::pin!(handle.join(&cx));
2841        match join_fut.as_mut().poll(&mut poll_cx) {
2842            std::task::Poll::Ready(Ok(result)) => {
2843                assert_eq!(
2844                    result, "cancelled",
2845                    "cancelled task must observe cancellation via checkpoint()"
2846                );
2847            }
2848            other => panic!("cancelled task join failed: {other:?}"),
2849        }
2850    }
2851
2852    #[test]
2853    fn metamorphic_nested_scope_cancellation_closes_descendants_without_spawn_leaks() {
2854        use std::task::{Context, Poll};
2855
2856        struct YieldOnce(bool);
2857
2858        impl std::future::Future for YieldOnce {
2859            type Output = ();
2860
2861            fn poll(
2862                mut self: std::pin::Pin<&mut Self>,
2863                cx: &mut std::task::Context<'_>,
2864            ) -> Poll<()> {
2865                if self.0 {
2866                    Poll::Ready(())
2867                } else {
2868                    self.0 = true;
2869                    cx.waker().wake_by_ref();
2870                    Poll::Pending
2871                }
2872            }
2873        }
2874
2875        let mut state = RuntimeState::new();
2876        let cx = test_cx();
2877        let root = state.create_root_region(Budget::INFINITE);
2878        let child = state
2879            .create_child_region(root, Budget::INFINITE)
2880            .expect("child region");
2881        let grandchild = state
2882            .create_child_region(child, Budget::INFINITE)
2883            .expect("grandchild region");
2884        let child_scope = test_scope(child, Budget::INFINITE);
2885        let grandchild_scope = test_scope(grandchild, Budget::INFINITE);
2886        let finalizer_log = Arc::new(std::sync::Mutex::new(Vec::new()));
2887
2888        let child_log = Arc::clone(&finalizer_log);
2889        assert!(
2890            child_scope.defer_sync(&mut state, move || {
2891                child_log
2892                    .lock()
2893                    .expect("child finalizer log poisoned")
2894                    .push("child");
2895            }),
2896            "child finalizer should register before cancellation"
2897        );
2898
2899        let grandchild_log = Arc::clone(&finalizer_log);
2900        assert!(
2901            grandchild_scope.defer_sync(&mut state, move || {
2902                grandchild_log
2903                    .lock()
2904                    .expect("grandchild finalizer log poisoned")
2905                    .push("grandchild");
2906            }),
2907            "grandchild finalizer should register before cancellation"
2908        );
2909
2910        let mut child_handle = child_scope
2911            .spawn_registered(&mut state, &cx, |task_cx| async move {
2912                YieldOnce(false).await;
2913                if task_cx.checkpoint().is_err() {
2914                    "child_cancelled"
2915                } else {
2916                    "child_completed"
2917                }
2918            })
2919            .expect("spawn child task");
2920        let child_task_id = child_handle.task_id();
2921
2922        let mut grandchild_handle = grandchild_scope
2923            .spawn_registered(&mut state, &cx, |task_cx| async move {
2924                YieldOnce(false).await;
2925                if task_cx.checkpoint().is_err() {
2926                    "grandchild_cancelled"
2927                } else {
2928                    "grandchild_completed"
2929                }
2930            })
2931            .expect("spawn grandchild task");
2932        let grandchild_task_id = grandchild_handle.task_id();
2933
2934        let cancel_reason = CancelReason::shutdown().with_region(root);
2935        let cancelled = state.cancel_request(root, &cancel_reason, None);
2936        assert!(
2937            cancelled
2938                .iter()
2939                .any(|(task_id, _)| *task_id == child_task_id),
2940            "parent cancellation must reach child task"
2941        );
2942        assert!(
2943            cancelled
2944                .iter()
2945                .any(|(task_id, _)| *task_id == grandchild_task_id),
2946            "parent cancellation must reach grandchild task"
2947        );
2948
2949        let grandchild_tasks_before_failed_spawn = state
2950            .region(grandchild)
2951            .expect("grandchild region missing")
2952            .task_count();
2953        let live_tasks_before_failed_spawn = state.live_task_count();
2954        let failed_spawn = grandchild_scope.spawn(&mut state, &cx, |_| async { 99_u8 });
2955        let grandchild_tasks_after_failed_spawn = state
2956            .region(grandchild)
2957            .expect("grandchild region missing after failed spawn")
2958            .task_count();
2959        let live_tasks_after_failed_spawn = state.live_task_count();
2960
2961        let waker = std::task::Waker::noop().clone();
2962        let mut poll_cx = Context::from_waker(&waker);
2963        {
2964            let stored = state
2965                .get_stored_future(grandchild_task_id)
2966                .expect("grandchild stored task");
2967            let poll_result = stored.poll(&mut poll_cx);
2968            assert!(
2969                poll_result.is_pending(),
2970                "grandchild task should yield once before observing cancellation"
2971            );
2972        }
2973        {
2974            let stored = state
2975                .get_stored_future(grandchild_task_id)
2976                .expect("grandchild stored task");
2977            let poll_result = stored.poll(&mut poll_cx);
2978            let task_outcome = match poll_result {
2979                Poll::Ready(Outcome::Ok(())) => Outcome::Ok(()),
2980                Poll::Ready(Outcome::Panicked(payload)) => Outcome::Panicked(payload),
2981                other => panic!(
2982                    "grandchild task should complete once cancellation is observed: {other:?}"
2983                ),
2984            };
2985            if let Some(task_record) = state.task_mut(grandchild_task_id) {
2986                task_record.complete(task_outcome);
2987            }
2988        }
2989        let _ = state.task_completed(grandchild_task_id);
2990        state.advance_region_state(grandchild);
2991
2992        let mut grandchild_join_fut = std::pin::pin!(grandchild_handle.join(&cx));
2993        let grandchild_result = match grandchild_join_fut.as_mut().poll(&mut poll_cx) {
2994            Poll::Ready(Ok(result)) => result,
2995            other => panic!("grandchild cancellation join should succeed: {other:?}"),
2996        };
2997
2998        {
2999            let stored = state
3000                .get_stored_future(child_task_id)
3001                .expect("child stored task");
3002            let poll_result = stored.poll(&mut poll_cx);
3003            assert!(
3004                poll_result.is_pending(),
3005                "child task should yield once before observing cancellation"
3006            );
3007        }
3008        {
3009            let stored = state
3010                .get_stored_future(child_task_id)
3011                .expect("child stored task");
3012            let poll_result = stored.poll(&mut poll_cx);
3013            let task_outcome = match poll_result {
3014                Poll::Ready(Outcome::Ok(())) => Outcome::Ok(()),
3015                Poll::Ready(Outcome::Panicked(payload)) => Outcome::Panicked(payload),
3016                other => {
3017                    panic!("child task should complete once cancellation is observed: {other:?}")
3018                }
3019            };
3020            if let Some(task_record) = state.task_mut(child_task_id) {
3021                task_record.complete(task_outcome);
3022            }
3023        }
3024        let _ = state.task_completed(child_task_id);
3025        state.advance_region_state(child);
3026
3027        let mut child_join_fut = std::pin::pin!(child_handle.join(&cx));
3028        let child_result = match child_join_fut.as_mut().poll(&mut poll_cx) {
3029            Poll::Ready(Ok(result)) => result,
3030            other => panic!("child cancellation join should succeed: {other:?}"),
3031        };
3032
3033        assert_eq!(child_result, "child_cancelled");
3034        assert_eq!(grandchild_result, "grandchild_cancelled");
3035        assert!(
3036            matches!(failed_spawn, Err(SpawnError::RegionClosed(id)) if id == grandchild),
3037            "nested spawn after parent cancellation must fail against the closing grandchild region"
3038        );
3039        assert_eq!(
3040            grandchild_tasks_before_failed_spawn, grandchild_tasks_after_failed_spawn,
3041            "failed spawn after cancellation must not leak task membership into the grandchild region"
3042        );
3043        assert_eq!(
3044            live_tasks_before_failed_spawn, live_tasks_after_failed_spawn,
3045            "failed spawn after cancellation must not inflate runtime task count"
3046        );
3047        assert_eq!(
3048            *finalizer_log.lock().expect("finalizer log poisoned"),
3049            vec!["grandchild", "child"],
3050            "nested scope finalizers must run in reverse scope creation order"
3051        );
3052        assert!(
3053            state.region(grandchild).is_none(),
3054            "grandchild region should be reclaimed after close"
3055        );
3056        assert!(
3057            state.region(child).is_none(),
3058            "child region should be reclaimed after close"
3059        );
3060    }
3061
3062    #[test]
3063    fn conformance_race_loser_drain_invariant() {
3064        // INVARIANT: In race operations, losers are cancelled and fully drained
3065
3066        use std::task::{Context, Poll};
3067
3068        let mut state = RuntimeState::new();
3069        let cx = test_cx();
3070        let region = state.create_root_region(Budget::INFINITE);
3071        let scope = test_scope(region, Budget::INFINITE);
3072
3073        // Winner: completes immediately
3074        let (winner_handle, mut winner_stored) = scope
3075            .spawn(&mut state, &cx, |_| async { "winner" })
3076            .unwrap();
3077
3078        // Loser: would run longer, must be cancelled and drained
3079        let (loser_handle, mut loser_stored) = scope
3080            .spawn(&mut state, &cx, |cx| async move {
3081                // Simulate work that can be cancelled
3082                struct YieldOnce(bool);
3083                impl std::future::Future for YieldOnce {
3084                    type Output = ();
3085                    fn poll(
3086                        mut self: std::pin::Pin<&mut Self>,
3087                        cx: &mut std::task::Context<'_>,
3088                    ) -> std::task::Poll<()> {
3089                        if self.0 {
3090                            std::task::Poll::Ready(())
3091                        } else {
3092                            self.0 = true;
3093                            cx.waker().wake_by_ref();
3094                            std::task::Poll::Pending
3095                        }
3096                    }
3097                }
3098                YieldOnce(false).await;
3099
3100                // Check if we were cancelled
3101                if cx.checkpoint().is_err() {
3102                    "loser_cancelled"
3103                } else {
3104                    "loser_completed"
3105                }
3106            })
3107            .unwrap();
3108
3109        let waker = std::task::Waker::noop().clone();
3110        let mut poll_cx = Context::from_waker(&waker);
3111
3112        // Complete winner immediately
3113        assert!(winner_stored.poll(&mut poll_cx).is_ready());
3114
3115        // Start the race
3116        let handles = vec![winner_handle, loser_handle];
3117        let mut race_fut = std::pin::pin!(scope.race_all(&cx, handles));
3118
3119        // Race should be pending initially (waiting for loser drain)
3120        assert!(
3121            race_fut.as_mut().poll(&mut poll_cx).is_pending(),
3122            "race must wait for loser to be drained"
3123        );
3124
3125        // Drive loser to first yield (it's now pending, but abort signal propagates)
3126        assert!(loser_stored.poll(&mut poll_cx).is_pending());
3127
3128        // Still pending on race (loser not fully drained)
3129        assert!(race_fut.as_mut().poll(&mut poll_cx).is_pending());
3130
3131        // Drive loser to completion (should see cancellation)
3132        assert!(loser_stored.poll(&mut poll_cx).is_ready());
3133
3134        // Now race should complete with winner result
3135        match race_fut.as_mut().poll(&mut poll_cx) {
3136            Poll::Ready(Ok((result, winner_index))) => {
3137                assert_eq!(result, "winner", "race must return winner result");
3138                assert_eq!(winner_index, 0, "winner index must be correct");
3139            }
3140            other => panic!("race must complete after loser drain: {other:?}"),
3141        }
3142    }
3143
3144    #[test]
3145    fn conformance_region_quiescence_on_empty() {
3146        // INVARIANT: Empty regions reach quiescence and can be closed immediately
3147        let mut state = RuntimeState::new();
3148        let cx = test_cx();
3149        let parent_region = state.create_root_region(Budget::INFINITE);
3150        let scope = test_scope(parent_region, Budget::INFINITE);
3151
3152        // Create and immediately close a child region with no spawned tasks
3153        let outcome = block_on(scope.region(
3154            &mut state,
3155            &cx,
3156            crate::types::policy::FailFast,
3157            |_child_scope, _state| {
3158                // Don't spawn any tasks - region should be empty
3159                std::future::ready(Outcome::Ok("empty_region_completed"))
3160            },
3161        ))
3162        .expect("empty child region must complete");
3163
3164        match outcome {
3165            Outcome::Ok(result) => {
3166                assert_eq!(
3167                    result, "empty_region_completed",
3168                    "empty region must reach quiescence immediately"
3169                );
3170            }
3171            other => panic!("empty region must complete successfully: {other:?}"),
3172        }
3173
3174        // Child region should be cleaned up (no longer in parent's child list)
3175        let parent_record = state
3176            .region(parent_region)
3177            .expect("parent region must exist");
3178        assert!(
3179            parent_record.child_ids().is_empty(),
3180            "completed child region must be removed from parent"
3181        );
3182    }
3183
3184    #[test]
3185    fn conformance_spawn_into_closed_region_fails() {
3186        // INVARIANT: Cannot spawn tasks into regions that have begun closing
3187        let mut state = RuntimeState::new();
3188        let cx = test_cx();
3189        let region = state.create_root_region(Budget::INFINITE);
3190        let scope = test_scope(region, Budget::INFINITE);
3191
3192        // Close the region
3193        let region_record = state.region_mut(region).expect("region must exist");
3194        region_record.begin_close(None);
3195
3196        // Attempt to spawn should fail
3197        let spawn_result = scope.spawn(&mut state, &cx, |_| async { 42 });
3198
3199        assert!(
3200            matches!(spawn_result, Err(SpawnError::RegionClosed(_))),
3201            "spawning into closed region must fail with RegionClosed error"
3202        );
3203
3204        // State should remain consistent (no orphaned tasks)
3205        assert!(
3206            state.tasks_is_empty()
3207                || state
3208                    .region(region)
3209                    .is_none_or(|r| r.task_ids().is_empty()),
3210            "failed spawn must not create orphaned tasks"
3211        );
3212    }
3213
3214    #[test]
3215    fn conformance_join_multiple_tasks_preserves_results() {
3216        // INVARIANT: join_all preserves all task results in order
3217
3218        use std::task::{Context, Poll};
3219
3220        let mut state = RuntimeState::new();
3221        let cx = test_cx();
3222        let region = state.create_root_region(Budget::INFINITE);
3223        let scope = test_scope(region, Budget::INFINITE);
3224
3225        // Spawn multiple tasks with different results
3226        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 100_i32 }).unwrap();
3227        let (h2, mut t2) = scope.spawn(&mut state, &cx, |_| async { 200_i32 }).unwrap();
3228        let (h3, mut t3) = scope.spawn(&mut state, &cx, |_| async { 300_i32 }).unwrap();
3229
3230        let waker = std::task::Waker::noop().clone();
3231        let mut poll_cx = Context::from_waker(&waker);
3232
3233        // Complete all tasks
3234        assert!(t1.poll(&mut poll_cx).is_ready());
3235        assert!(t2.poll(&mut poll_cx).is_ready());
3236        assert!(t3.poll(&mut poll_cx).is_ready());
3237
3238        // Join all tasks
3239        let handles = vec![h1, h2, h3];
3240        let mut join_all_fut = std::pin::pin!(scope.join_all(&cx, handles));
3241
3242        match join_all_fut.as_mut().poll(&mut poll_cx) {
3243            Poll::Ready(results) => {
3244                assert_eq!(results.len(), 3, "join_all must return all results");
3245
3246                // Results must be in the same order as handles
3247                assert_eq!(
3248                    results[0].as_ref().unwrap(),
3249                    &100,
3250                    "first task result must be preserved"
3251                );
3252                assert_eq!(
3253                    results[1].as_ref().unwrap(),
3254                    &200,
3255                    "second task result must be preserved"
3256                );
3257                assert_eq!(
3258                    results[2].as_ref().unwrap(),
3259                    &300,
3260                    "third task result must be preserved"
3261                );
3262            }
3263            other @ Poll::Pending => panic!("join_all must complete with all results: {other:?}"),
3264        }
3265    }
3266
3267    #[test]
3268    fn conformance_panic_propagation_through_join() {
3269        // INVARIANT: Task panics are captured and propagated through join as JoinError::Panicked
3270
3271        use std::task::{Context, Poll};
3272
3273        let mut state = RuntimeState::new();
3274        let cx = test_cx();
3275        let region = state.create_root_region(Budget::INFINITE);
3276        let scope = test_scope(region, Budget::INFINITE);
3277
3278        // Spawn a task that panics with a specific message
3279        let (mut handle, mut stored) = scope
3280            .spawn(&mut state, &cx, |_| async {
3281                std::panic::panic_any("test_panic_message");
3282            })
3283            .unwrap();
3284
3285        let waker = std::task::Waker::noop().clone();
3286        let mut poll_cx = Context::from_waker(&waker);
3287
3288        // Task execution should complete with Panicked outcome
3289        match stored.poll(&mut poll_cx) {
3290            Poll::Ready(crate::types::Outcome::Panicked(_)) => {
3291                // Expected: panic was caught and wrapped as Outcome::Panicked
3292            }
3293            other => panic!("panicking task must complete with Panicked outcome: {other:?}"),
3294        }
3295
3296        // Join should propagate the panic as JoinError::Panicked
3297        let mut join_fut = std::pin::pin!(handle.join(&cx));
3298        match join_fut.as_mut().poll(&mut poll_cx) {
3299            Poll::Ready(Err(JoinError::Panicked(payload))) => {
3300                assert_eq!(
3301                    payload.message(),
3302                    "test_panic_message",
3303                    "join must preserve panic payload message"
3304                );
3305            }
3306            other => panic!("join of panicked task must return JoinError::Panicked: {other:?}"),
3307        }
3308    }
3309}