// asupersync/cx/scope.rs
1//! Scope API for spawning work within a region.
2//!
3//! A `Scope` provides the API for spawning tasks, creating child regions,
4//! and registering finalizers.
5//!
6//! # Execution Tiers and Soundness Rules
7//!
8//! Asupersync defines two execution tiers with different constraints:
9//!
10//! ## Fiber Tier (Phase 0)
11//!
12//! - Single-thread, borrow-friendly execution
13//! - Can capture borrowed references (`&T`) since no migration
//! - Implemented via `spawn_local` (no `Send` bound on the factory or future;
//!   only the output must be `Send`)
15//!
16//! ## Task Tier (Phase 1+)
17//!
18//! - Multi-threaded, `Send` tasks that may migrate across workers
19//! - **Must capture only `Send + 'static` data** by construction
20//! - Can reference region-owned data via [`RRef<T>`](crate::types::rref::RRef)
21//!
22//! # Soundness Rules for Send Tasks
23//!
24//! The [`spawn`](Scope::spawn) method enforces the following bounds:
25//!
26//! | Component | Bound | Rationale |
27//! |-----------|-------|-----------|
28//! | Factory | `F: Send + 'static` | Factory may be called on any worker |
29//! | Future | `Fut: Send + 'static` | Task may migrate between polls |
30//! | Output | `Fut::Output: Send + 'static` | Result sent to potentially different thread |
31//!
32//! ## What Can Be Captured
33//!
34//! **Allowed captures in Send tasks:**
35//! - Owned `'static` data that is `Send` (e.g., `String`, `Vec<T>`, `Arc<T>`)
36//! - [`RRef<T>`](crate::types::rref::RRef) handles to region-heap-allocated data
37//! - Atomic types (`AtomicU64`, etc.)
//! - Cloned `Cx` (the capability context)
39//!
40//! **Disallowed captures:**
41//! - Borrowed references (`&T`, `&mut T`) - not `'static`
42//! - `Rc<T>`, `RefCell<T>` - not `Send`
43//! - Raw pointers (unless wrapped in a `Send` type)
44//! - References to stack-local data
45//!
46//! ## RRef for Region-Owned Data
47//!
48//! When tasks need to share data within a region without cloning, use the region
49//! heap and [`RRef<T>`](crate::types::rref::RRef):
50//!
51//! ```ignore
52//! // Allocate in region heap
53//! let index = region.heap_alloc(expensive_data);
54//! let rref = RRef::<ExpensiveData>::new(region_id, index);
55//!
56//! // Pass RRef to task - it's Copy + Send
57//! scope.spawn(state, &cx, move |cx| async move {
58//!     // Access via region record (requires runtime lookup)
59//!     let data = rref.get_via_region(&region_record)?;
60//!     process(data).await
61//! });
62//! ```
63//!
64//! # Compile-Time Enforcement
65//!
66//! The bounds are enforced at compile time. Attempting to capture non-Send
67//! or non-static data will result in a compilation error:
68//!
69//! ```compile_fail
70//! use std::rc::Rc;
71//! use asupersync::cx::Scope;
72//!
73//! fn try_capture_rc(scope: &Scope, state: &mut RuntimeState, cx: &Cx) {
74//!     let rc = Rc::new(42); // Rc is !Send
75//!     scope.spawn(state, cx, move |_| async move {
76//!         println!("{}", rc); // ERROR: Rc is not Send
77//!     });
78//! }
79//! ```
80//!
81//! ```compile_fail
82//! use asupersync::cx::Scope;
83//!
84//! fn try_capture_borrow(scope: &Scope, state: &mut RuntimeState, cx: &Cx) {
85//!     let local = 42;
86//!     let reference = &local; // Borrowed, not 'static
87//!     scope.spawn(state, cx, move |_| async move {
88//!         println!("{}", reference); // ERROR: borrowed data not 'static
89//!     });
90//! }
91//! ```
92//!
93//! # Lab Runtime Compatibility
94//!
95//! The Send bounds do not affect lab runtime determinism. The lab runtime
96//! simulates multi-worker scheduling deterministically (same seed = same
97//! execution), regardless of whether tasks are actually migrated.
98
99use crate::channel::oneshot;
100use crate::combinator::{Either, Select};
101use crate::cx::{Cx, cap};
102use crate::record::{AdmissionError, TaskRecord};
103use crate::runtime::task_handle::{JoinError, TaskHandle};
104use crate::runtime::{RegionCreateError, RuntimeState, SpawnError, StoredTask};
105use crate::tracing_compat::{debug, debug_span};
106use crate::types::{Budget, CancelReason, Outcome, PanicPayload, Policy, RegionId, TaskId};
107use std::future::Future;
108use std::marker::PhantomData;
109use std::pin::Pin;
110use std::sync::Arc;
111use std::task::{Context, Poll};
112
/// A scope for spawning work within a region.
///
/// The scope provides methods for:
/// - Spawning tasks
/// - Creating child regions
/// - Registering finalizers
/// - Cancelling all children
pub struct Scope<'r, P: Policy = crate::types::policy::FailFast> {
    /// The region this scope belongs to; all spawned tasks are owned by it.
    pub(crate) region: RegionId,
    /// The budget for this scope (deadline, poll/cost quotas, priority).
    pub(crate) budget: Budget,
    /// Ties the scope to policy `P` and lifetime `'r` without storing any data.
    pub(crate) _policy: PhantomData<&'r P>,
}
128
/// Future adapter that turns a panic during `poll` into a `Result::Err`
/// carrying the panic payload, instead of unwinding into the executor.
#[pin_project::pin_project]
pub(crate) struct CatchUnwind<F> {
    /// The wrapped future; structurally pinned via `pin_project`.
    #[pin]
    pub(crate) inner: F,
}
134
135impl<F: Future> Future for CatchUnwind<F> {
136    type Output = std::thread::Result<F::Output>;
137
138    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139        let mut this = self.project();
140        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
141            this.inner.as_mut().poll(cx)
142        }));
143        match result {
144            Ok(Poll::Pending) => Poll::Pending,
145            Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
146            Err(payload) => Poll::Ready(Err(payload)),
147        }
148    }
149}
150
151pub(crate) fn payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
152    payload
153        .downcast_ref::<&str>()
154        .map(ToString::to_string)
155        .or_else(|| payload.downcast_ref::<String>().cloned())
156        .unwrap_or_else(|| "unknown panic".to_string())
157}
158
/// Drives a child-region body future while holding exclusive access to the
/// runtime state, handing the state back to the caller on completion.
///
/// If dropped before completion, `Drop` cancels the child region so it is
/// not left dangling.
struct RegionRunner<'a, Fut> {
    /// The panic-catching body future of the child region.
    fut: Pin<&'a mut CatchUnwind<Fut>>,
    /// Runtime state; taken out (set to `None`) exactly once, on completion
    /// (in `poll`) or on early drop (in `Drop`).
    state: Option<&'a mut RuntimeState>,
    /// Region to cancel if this runner is dropped before the body finishes.
    child_region: RegionId,
}
164
165impl<'a, Fut: Future> Future for RegionRunner<'a, Fut> {
166    type Output = (std::thread::Result<Fut::Output>, &'a mut RuntimeState);
167
168    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
169        let this = self.get_mut();
170        match this.fut.as_mut().poll(cx) {
171            Poll::Ready(res) => {
172                let state = this.state.take().expect("polled after ready");
173                Poll::Ready((res, state))
174            }
175            Poll::Pending => Poll::Pending,
176        }
177    }
178}
179
180impl<Fut> Drop for RegionRunner<'_, Fut> {
181    fn drop(&mut self) {
182        if let Some(state) = self.state.take() {
183            let reason = CancelReason::fail_fast().with_region(self.child_region);
184            let _ = state.cancel_request(self.child_region, &reason, None);
185            state.advance_region_state(self.child_region);
186        }
187    }
188}
189
/// Future that resolves once a region's shared close state reports `closed`.
struct RegionCloseFuture {
    /// Shared close state; another party sets `closed` and wakes the stored waker.
    state: Arc<parking_lot::Mutex<crate::record::region::RegionCloseState>>,
}
193
194impl Future for RegionCloseFuture {
195    type Output = ();
196
197    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
198        let mut state = self.state.lock();
199        if state.closed {
200            Poll::Ready(())
201        } else {
202            if !state
203                .waker
204                .as_ref()
205                .is_some_and(|w| w.will_wake(cx.waker()))
206            {
207                state.waker = Some(cx.waker().clone());
208            }
209            Poll::Pending
210        }
211    }
212}
213
214impl Drop for RegionCloseFuture {
215    fn drop(&mut self) {
216        let mut state = self.state.lock();
217        state.waker = None;
218    }
219}
220
221impl<P: Policy> Scope<'_, P> {
    /// Creates a new scope for the given region with the given budget (internal use).
    ///
    /// Made `pub` only under the `test-internals` feature via the
    /// `visibility` attribute below.
    #[must_use]
    #[allow(dead_code)]
    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
    pub(crate) fn new(region: RegionId, budget: Budget) -> Self {
        Self {
            region,
            budget,
            _policy: PhantomData,
        }
    }
233
    /// Returns the ID of the region this scope belongs to.
    #[must_use]
    pub fn region_id(&self) -> RegionId {
        self.region
    }
239
    /// Returns the budget for this scope (`Budget` is `Copy`).
    #[must_use]
    pub fn budget(&self) -> Budget {
        self.budget
    }
245
246    // =========================================================================
247    // Task Spawning
248    // =========================================================================
249
250    /// Spawns a new task within this scope's region.
251    ///
252    /// This is the **Task Tier** spawn method for parallel execution. The task
253    /// may migrate between worker threads, so all captured data must be thread-safe.
254    ///
255    /// The task will be owned by the region and will be cancelled if the
256    /// region is cancelled. The returned `TaskHandle` can be used to await
257    /// the task's result.
258    ///
259    /// # Arguments
260    ///
261    /// * `state` - The runtime state
262    /// * `cx` - The capability context (used for tracing/authorization)
263    /// * `f` - A closure that produces the future, receiving the new task's `Cx`
264    ///
265    /// # Returns
266    ///
267    /// A `TaskHandle<T>` that can be used to await the task's result.
268    ///
269    /// # Soundness Rules (Type Bounds)
270    ///
271    /// The following bounds encode the soundness rules for Send tasks:
272    ///
273    /// * `F: FnOnce(Cx) -> Fut + Send + 'static` - Factory called on any worker
274    /// * `Fut: Future + Send + 'static` - Task may migrate between polls
275    /// * `Fut::Output: Send + 'static` - Result crosses thread boundary
276    ///
277    /// These bounds ensure captured data can safely cross thread boundaries.
278    /// Use [`RRef<T>`](crate::types::rref::RRef) for region-heap-allocated data.
279    ///
280    /// # Allowed Captures
281    ///
282    /// | Type | Allowed | Reason |
283    /// |------|---------|--------|
284    /// | `String`, `Vec<T>`, owned data | ✅ | Send + 'static by ownership |
285    /// | `Arc<T>` where T: Send + Sync | ✅ | Thread-safe shared ownership |
286    /// | `RRef<T>` | ✅ | Region-heap reference, Copy + Send |
287    /// | `Cx` (cloned) | ✅ | Capability context is Send + Sync |
288    /// | `Rc<T>`, `RefCell<T>` | ❌ | Not Send |
289    /// | `&T`, `&mut T` | ❌ | Not 'static |
290    ///
291    /// # Example
292    ///
293    /// ```ignore
294    /// let handle = scope.spawn(&mut state, &cx, |cx| async move {
295    ///     cx.trace("Child task running");
296    ///     compute_value().await
297    /// });
298    ///
299    /// let result = handle.join(&cx).await?;
300    /// ```
301    ///
302    /// # Example with RRef
303    ///
304    /// ```ignore
305    /// // Allocate expensive data in region heap
306    /// let index = region_record.heap_alloc(vec![1, 2, 3, 4, 5]);
307    /// let rref = RRef::<Vec<i32>>::new(region_id, index);
308    ///
309    /// // RRef is Copy + Send, can be captured by multiple tasks
310    /// scope.spawn(&mut state, &cx, move |cx| async move {
311    ///     // Would access via runtime state in real code
312    ///     process_data(rref).await
313    /// });
314    /// ```
315    ///
316    /// # Compile-Time Errors
317    ///
318    /// Attempting to capture `!Send` types fails at compile time:
319    ///
320    /// ```compile_fail,E0277
321    /// # // This test demonstrates that Rc cannot be captured
322    /// use std::rc::Rc;
323    /// fn require_send<T: Send>(_: &T) {}
324    /// fn test_rc_rejected<'r, P: asupersync::types::Policy>(
325    ///     scope: &asupersync::cx::Scope<'r, P>,
326    ///     state: &mut asupersync::runtime::RuntimeState,
327    ///     cx: &asupersync::cx::Cx,
328    /// ) {
329    ///     let rc = Rc::new(42);
330    ///     require_send(&rc);
331    ///     let _ = scope.spawn(state, cx, move |_| async move {
332    ///         let _ = rc;  // Rc<i32> is not Send
333    ///     });
334    /// }
335    /// ```
336    ///
337    /// Attempting to capture non-`'static` references fails:
338    ///
339    /// ```compile_fail,E0597
340    /// # // This test demonstrates that borrowed data cannot be captured
341    /// fn require_static<T: 'static>(_: T) {}
342    /// fn test_borrow_rejected<'r, P: asupersync::types::Policy>(
343    ///     scope: &asupersync::cx::Scope<'r, P>,
344    ///     state: &mut asupersync::runtime::RuntimeState,
345    ///     cx: &asupersync::cx::Cx,
346    /// ) {
347    ///     let local = 42;
348    ///     let borrow = &local;
349    ///     require_static(borrow);
350    ///     let _ = scope.spawn(state, cx, move |_| async move {
351    ///         let _ = borrow;  // &i32 is not 'static
352    ///     });
353    /// }
354    /// ```
    pub fn spawn<F, Fut, Caps>(
        &self,
        state: &mut RuntimeState,
        cx: &Cx<Caps>,
        f: F,
    ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
    where
        Caps: cap::HasSpawn + Send + Sync + 'static,
        F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
        Fut: Future + Send + 'static,
        Fut::Output: Send + 'static,
    {
        // Create oneshot channel for result delivery (task -> TaskHandle).
        let (tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();

        // Create task record; `?` propagates SpawnError without any cleanup
        // needed (nothing has been allocated for this task yet).
        let task_id = self.create_task_record(state)?;

        // Trace task spawn event.
        let _span = debug_span!(
            "task_spawn",
            task_id = ?task_id,
            region_id = ?self.region,
            initial_state = "Created",
            budget_deadline = ?self.budget.deadline,
            budget_poll_quota = self.budget.poll_quota,
            budget_cost_quota = ?self.budget.cost_quota,
            budget_priority = self.budget.priority,
            budget_source = "scope"
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region,
            initial_state = "Created",
            budget_deadline = ?self.budget.deadline,
            budget_poll_quota = self.budget.poll_quota,
            budget_cost_quota = ?self.budget.cost_quota,
            budget_priority = self.budget.priority,
            budget_source = "scope",
            "task spawned"
        );

        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);

        // Create the TaskHandle. A weak reference is used so the handle does
        // not keep the task's inner context alive by itself.
        let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));

        // Set the shared inner state in the TaskRecord.
        // This links the user-facing Cx to the runtime's TaskRecord.
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx_full.clone());
        }

        // Capture child_cx for result sending inside the wrapped future.
        let cx_for_send = child_cx_full;

        // Instantiate the future with the child context.
        // We use a guard to rollback task creation if the factory panics.
        // This prevents zombie tasks (recorded but never started) which would
        // cause the region to never close (deadlock).
        let future = {
            struct TaskCreationGuard<'a> {
                state: &'a mut RuntimeState,
                task_id: TaskId,
                region_id: RegionId,
                committed: bool,
            }

            impl Drop for TaskCreationGuard<'_> {
                fn drop(&mut self) {
                    if !self.committed {
                        // Rollback task creation; runs even if `f` unwinds.
                        if let Some(region) = self.state.region_mut(self.region_id) {
                            region.remove_task(self.task_id);
                        }
                        self.state.remove_task(self.task_id);
                    }
                }
            }

            let mut guard = TaskCreationGuard {
                state,
                task_id,
                region_id: self.region,
                committed: false,
            };

            // If `f` panics here, `guard` is still uncommitted and rolls back.
            let fut = f(child_cx);
            guard.committed = true;
            fut
        };

        // Wrap the future to send its result through the channel.
        // We use CatchUnwind to ensure panics are propagated as JoinError::Panicked
        // rather than silent channel closure (which looks like cancellation).
        let wrapped = async move {
            let result_result = CatchUnwind { inner: future }.await;
            match result_result {
                Ok(result) => {
                    // Send failure (receiver dropped) is deliberately ignored.
                    let _ = tx.send(&cx_for_send, Ok(result));
                    crate::types::Outcome::Ok(())
                }
                Err(payload) => {
                    // Report the panic both to the joiner and as the task outcome.
                    let msg = payload_to_string(&payload);
                    let panic_payload = PanicPayload::new(msg);
                    let _ = tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(panic_payload.clone())),
                    );
                    crate::types::Outcome::Panicked(panic_payload)
                }
            }
        };

        // Create stored task with task_id for poll tracing.
        let stored = StoredTask::new_with_id(wrapped, task_id);

        Ok((handle, stored))
    }
476
477    /// Spawns a Send task (explicit Task Tier API).
478    ///
479    /// This is an explicit alias for [`spawn`](Self::spawn) that makes the
480    /// execution tier clear in the API. Use this when you want to emphasize
481    /// that the task may migrate between workers.
482    ///
483    /// # Type Bounds (Soundness Rules)
484    ///
485    /// Same as [`spawn`](Self::spawn):
486    /// - `F: FnOnce(Cx) -> Fut + Send + 'static`
487    /// - `Fut: Future + Send + 'static`
488    /// - `Fut::Output: Send + 'static`
489    ///
490    /// # Example
491    ///
492    /// ```ignore
493    /// // Explicit task tier spawn
494    /// let (handle, stored) = scope.spawn_task(&mut state, &cx, |cx| async move {
495    ///     // This task may run on any worker
496    ///     compute_parallel().await
497    /// })?;
498    /// ```
    // Pure delegation: exists only to make the Task Tier explicit at call sites.
    #[inline]
    pub fn spawn_task<F, Fut, Caps>(
        &self,
        state: &mut RuntimeState,
        cx: &Cx<Caps>,
        f: F,
    ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
    where
        Caps: cap::HasSpawn + Send + Sync + 'static,
        F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
        Fut: Future + Send + 'static,
        Fut::Output: Send + 'static,
    {
        self.spawn(state, cx, f)
    }
514
515    /// Spawns a task and registers it with the runtime state.
516    ///
517    /// This is a convenience method that combines `spawn()` with
518    /// `RuntimeState::store_spawned_task()`. It's the primary method
519    /// used by the `spawn!` macro.
520    ///
521    /// # Arguments
522    ///
523    /// * `state` - The runtime state (for storing the task)
524    /// * `cx` - The capability context (for creating child context)
525    /// * `f` - A closure that produces the future, receiving the new task's `Cx`
526    ///
527    /// # Returns
528    ///
529    /// A `TaskHandle<T>` for awaiting the task's result.
530    ///
531    /// # Example
532    ///
533    /// ```ignore
534    /// let handle = scope.spawn_registered(&mut state, &cx, |cx| async move {
535    ///     cx.trace("Child task running");
536    ///     compute_value().await
537    /// })?;
538    ///
539    /// let result = handle.join(&cx).await?;
540    /// ```
541    pub fn spawn_registered<F, Fut, Caps>(
542        &self,
543        state: &mut RuntimeState,
544        cx: &Cx<Caps>,
545        f: F,
546    ) -> Result<TaskHandle<Fut::Output>, SpawnError>
547    where
548        Caps: cap::HasSpawn + Send + Sync + 'static,
549        F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
550        Fut: Future + Send + 'static,
551        Fut::Output: Send + 'static,
552    {
553        let (handle, stored) = self.spawn(state, cx, f)?;
554        state.store_spawned_task(handle.task_id(), stored);
555        Ok(handle)
556    }
557
558    /// Spawns a local (non-Send) task within this scope's region (**Fiber Tier**).
559    ///
560    /// This is the **Fiber Tier** spawn method. Local tasks are pinned to the
561    /// current worker thread and cannot be stolen by other workers. This enables
562    /// borrow-friendly execution with `!Send` types like `Rc` or `RefCell`.
563    ///
564    /// # Execution Tier: Fiber
565    ///
566    /// | Property | Value |
567    /// |----------|-------|
568    /// | Migration | Never (thread-pinned) |
569    /// | Send bound | Not required |
570    /// | Borrowing | Can capture `&T` (same-thread) |
571    /// | Use case | `!Send` types, borrowed data |
572    ///
573    /// # Arguments
574    ///
575    /// * `state` - The runtime state
576    /// * `cx` - The capability context
577    /// * `f` - A closure that produces the future, receiving the new task's `Cx`
578    ///
579    /// # Panics
580    ///
581    /// Panics if called from a blocking thread (spawn_blocking context).
582    ///
583    /// # Example
584    ///
585    /// ```ignore
586    /// use std::rc::Rc;
587    /// use std::cell::RefCell;
588    ///
589    /// let counter = Rc::new(RefCell::new(0));
590    /// let counter_clone = counter.clone();
591    ///
592    /// let handle = scope.spawn_local(&mut state, &cx, |cx| async move {
593    ///     // Rc<RefCell<_>> is !Send but allowed in local tasks
594    ///     *counter_clone.borrow_mut() += 1;
595    /// });
596    /// ```
    #[allow(clippy::too_many_lines)]
    pub fn spawn_local<F, Fut, Caps>(
        &self,
        state: &mut RuntimeState,
        cx: &Cx<Caps>,
        f: F,
    ) -> Result<TaskHandle<Fut::Output>, SpawnError>
    where
        Caps: cap::HasSpawn + Send + Sync + 'static,
        // Note: no `Send` bound on F or Fut — the future never leaves this
        // thread. Only the output crosses threads (via the oneshot channel).
        F: FnOnce(Cx<Caps>) -> Fut + 'static,
        Fut: Future + 'static,
        Fut::Output: Send + 'static,
    {
        use crate::runtime::stored_task::LocalStoredTask;
        use crate::runtime::task_handle::JoinError;

        // Create oneshot channel for result delivery (task -> TaskHandle).
        let (result_tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();

        // Create task record; `?` propagates SpawnError before anything else
        // has been set up.
        let task_id = self.create_task_record(state)?;

        // Trace task spawn event.
        let _span = debug_span!(
            "task_spawn",
            task_id = ?task_id,
            region_id = ?self.region,
            initial_state = "Created",
            budget_deadline = ?self.budget.deadline,
            budget_poll_quota = self.budget.poll_quota,
            budget_cost_quota = ?self.budget.cost_quota,
            budget_priority = self.budget.priority,
            budget_source = "scope_local"
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region,
            initial_state = "Created",
            budget_deadline = ?self.budget.deadline,
            budget_poll_quota = self.budget.poll_quota,
            budget_cost_quota = ?self.budget.cost_quota,
            budget_priority = self.budget.priority,
            budget_source = "scope_local",
            "local task spawned"
        );

        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);

        // Create the TaskHandle (weak ref: the handle must not keep the
        // task's inner context alive).
        let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));

        // Set the shared inner state in the TaskRecord.
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx_full.clone());
        }

        // Capture child_cx for result sending inside the wrapped future.
        let cx_for_send = child_cx_full;

        // Instantiate the future with the child context.
        // We use a guard to rollback task creation if the factory panics
        // (same zombie-task prevention as in `spawn`).
        let future = {
            struct TaskCreationGuard<'a> {
                state: &'a mut RuntimeState,
                task_id: TaskId,
                region_id: RegionId,
                committed: bool,
            }

            impl Drop for TaskCreationGuard<'_> {
                fn drop(&mut self) {
                    if !self.committed {
                        // Rollback task creation; runs even if `f` unwinds.
                        if let Some(region) = self.state.region_mut(self.region_id) {
                            region.remove_task(self.task_id);
                        }
                        self.state.remove_task(self.task_id);
                    }
                }
            }

            let mut guard = TaskCreationGuard {
                state,
                task_id,
                region_id: self.region,
                committed: false,
            };

            let fut = f(child_cx);
            guard.committed = true;
            fut
        };

        // Wrap the future to send its result through the channel, converting
        // panics into JoinError::Panicked (mirrors `spawn`).
        let wrapped = async move {
            let result_result = CatchUnwind { inner: future }.await;
            match result_result {
                Ok(result) => {
                    let _ = result_tx.send(&cx_for_send, Ok(result));
                    crate::types::Outcome::Ok(())
                }
                Err(payload) => {
                    let msg = payload_to_string(&payload);
                    let panic_payload = PanicPayload::new(msg);
                    let _ = result_tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(panic_payload.clone())),
                    );
                    crate::types::Outcome::Panicked(panic_payload)
                }
            }
        };

        // Create local stored task (!Send storage, unlike StoredTask).
        let stored = LocalStoredTask::new_with_id(wrapped, task_id);

        // Store in thread-local storage — the future must stay on this thread.
        crate::runtime::local::store_local_task(task_id, stored);

        // Mark the task record as local so that safety guards in the scheduler
        // (inject_ready panic, try_steal debug_assert) can detect accidental
        // cross-thread migration of !Send futures.
        if let Some(record) = state.task_mut(task_id) {
            if let Some(worker_id) = crate::runtime::scheduler::three_lane::current_worker_id() {
                record.pin_to_worker(worker_id);
            } else {
                record.mark_local();
            }
            record.wake_state.notify();
        }

        // Schedule the task on the current worker's NON-STEALABLE local scheduler.
        // spawn_local tasks MUST NOT be stealable.
        let scheduled = crate::runtime::scheduler::three_lane::schedule_local_task(task_id);

        if scheduled {
            if let Some(record) = state.task(task_id) {
                let _ = record.wake_state.notify();
            }
            return Ok(handle);
        }

        // No local scheduler available: rollback to avoid a permanently parked task.
        let _ = crate::runtime::local::remove_local_task(task_id);
        if let Some(region) = state.region(self.region) {
            region.remove_task(task_id);
        }
        state.remove_task(task_id);
        Err(SpawnError::LocalSchedulerUnavailable)
    }
749
750    /// Spawns a blocking operation on a dedicated thread pool.
751    ///
752    /// This is used for CPU-bound or legacy synchronous operations that
753    /// should not block async workers. The closure runs on a separate
754    /// thread pool designed for blocking work.
755    ///
756    /// # Arguments
757    ///
758    /// * `state` - The runtime state
759    /// * `cx` - The capability context
760    /// * `f` - The blocking closure to run, receiving a context
761    ///
762    /// # Type Bounds
763    ///
764    /// * `F: FnOnce(Cx) -> R + Send + 'static` - The closure must be Send
765    /// * `R: Send + 'static` - The result must be Send
766    ///
767    /// # Example
768    ///
769    /// ```ignore
770    /// let (handle, stored) = scope.spawn_blocking(&mut state, &cx, |cx| {
771    ///     cx.trace("Starting blocking work");
772    ///     // CPU-intensive work
773    ///     expensive_computation()
774    /// });
775    ///
776    /// let result = handle.join(&cx).await?;
777    /// ```
778    ///
779    /// # Note
780    ///
781    /// In Phase 0 (single-threaded), blocking operations run inline.
782    /// A proper blocking pool is implemented in Phase 1+.
    pub fn spawn_blocking<F, R, Caps>(
        &self,
        state: &mut RuntimeState,
        cx: &Cx<Caps>, // Parent Cx
        f: F,
    ) -> Result<(TaskHandle<R>, StoredTask), SpawnError>
    where
        Caps: cap::HasSpawn + Send + Sync + 'static,
        F: FnOnce(Cx<Caps>) -> R + Send + 'static,
        R: Send + 'static,
    {
        // Create oneshot channel for result delivery (closure -> TaskHandle).
        let (tx, rx) = oneshot::channel::<Result<R, JoinError>>();

        // Create task record; `?` propagates SpawnError before any setup.
        let task_id = self.create_task_record(state)?;

        // Trace task spawn event.
        debug!(
            task_id = ?task_id,
            region_id = ?self.region,
            initial_state = "Created",
            poll_quota = self.budget.poll_quota,
            spawn_kind = "blocking",
            "blocking task spawned"
        );

        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);

        // Create the TaskHandle (weak ref so the handle does not keep the
        // task's inner context alive).
        let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));

        // Set the shared inner state in the TaskRecord.
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx_full.clone());
        }

        // Capture child_cx for result sending inside the wrapped future.
        let cx_for_send = child_cx_full;

        // For Phase 0, we run blocking code as an async task.
        // In Phase 1+, this would spawn on a blocking thread pool.
        // No TaskCreationGuard is needed here: unlike `spawn`, the closure is
        // not invoked during spawning — only when the stored task is polled.
        let wrapped = async move {
            // Execute the blocking closure with the child context, catching
            // panics so they are reported as JoinError::Panicked.
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(child_cx)));
            match result {
                Ok(res) => {
                    let _ = tx.send(&cx_for_send, Ok(res));
                    crate::types::Outcome::Ok(())
                }
                Err(payload) => {
                    let msg = payload_to_string(&payload);
                    let panic_payload = PanicPayload::new(msg);
                    let _ = tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(panic_payload.clone())),
                    );
                    crate::types::Outcome::Panicked(panic_payload)
                }
            }
        };

        let stored = StoredTask::new_with_id(wrapped, task_id);

        Ok((handle, stored))
    }
851
852    // =========================================================================
853    // Child Regions
854    // =========================================================================
855
856    /// Creates a child region and runs the provided future within a child scope.
857    ///
858    /// The child region inherits the parent's budget by default. Use
859    /// [`Scope::region_with_budget`] to tighten constraints for the child.
860    ///
861    /// The returned outcome is the result of the body future. After the body
862    /// completes, the child region begins its close sequence and advances until
863    /// it can close (assuming all child tasks have completed and obligations are resolved).
864    ///
865    /// # Errors
866    ///
867    /// Returns [`RegionCreateError`] if the parent is closed, missing, or at capacity.
868    pub async fn region<P2, F, Fut, T, Caps>(
869        &self,
870        state: &mut RuntimeState,
871        cx: &Cx<Caps>,
872        policy: P2,
873        f: F,
874    ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
875    where
876        P2: Policy,
877        F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
878        Fut: Future<Output = Outcome<T, P2::Error>>,
879    {
880        self.region_with_budget(state, cx, self.budget, policy, f)
881            .await
882    }
883
    /// Creates a child region with an explicit budget (met with the parent budget).
    ///
    /// The effective budget is `parent.meet(child)` to ensure nested scopes can
    /// never relax constraints.
    ///
    /// After the body future completes, the child region is driven through its
    /// close sequence: a successful body begins a normal close, a cancelled or
    /// failed body escalates to cancellation of the child region, and a panic
    /// in the body is caught and reported as `Outcome::Panicked`.
    ///
    /// # Errors
    ///
    /// Returns [`RegionCreateError`] if the child region cannot be created
    /// (parent closed, missing, or at capacity).
    pub async fn region_with_budget<P2, F, Fut, T, Caps>(
        &self,
        state: &mut RuntimeState,
        _cx: &Cx<Caps>,
        budget: Budget,
        _policy: P2,
        f: F,
    ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
    where
        P2: Policy,
        F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
        Fut: Future<Output = Outcome<T, P2::Error>>,
    {
        let child_region = state.create_child_region(self.region, budget)?;
        // Read back the budget actually recorded for the new child region;
        // fall back to the parent budget if the record is unexpectedly absent.
        let child_budget = state
            .region(child_region)
            .map_or(self.budget, crate::record::RegionRecord::budget);
        let child_scope = Scope::<P2>::new(child_region, child_budget);

        // Wrap the body in CatchUnwind so a panic surfaces as an Err payload
        // below instead of unwinding through the runtime.
        let fut = f(child_scope, &mut *state);
        let pinned_fut = std::pin::pin!(CatchUnwind { inner: fut });

        // RegionRunner threads `state` through polls of the body future and
        // hands the borrow back once the body resolves.
        let runner = RegionRunner {
            fut: pinned_fut,
            state: Some(state),
            child_region,
        };

        let (result, state) = runner.await;
        let outcome = match result {
            Ok(outcome) => outcome,
            Err(payload) => {
                // Body panicked: convert the unwind payload to an Outcome.
                let msg = payload_to_string(&payload);
                Outcome::Panicked(PanicPayload::new(msg))
            }
        };

        // Kick off the child's close sequence, escalating based on how the
        // body ended (clean close vs. cancellation vs. fail-fast).
        match &outcome {
            Outcome::Ok(_) => {
                if let Some(region) = state.region(child_region) {
                    region.begin_close(None);
                }
            }
            Outcome::Cancelled(reason) => {
                let _ = state.cancel_request(child_region, reason, None);
            }
            Outcome::Err(_) | Outcome::Panicked(_) => {
                let reason = CancelReason::fail_fast().with_region(child_region);
                let _ = state.cancel_request(child_region, &reason, None);
            }
        }

        // Advance the close state machine, then wait until the child region
        // signals it has fully closed.
        let close_notify = state.region(child_region).map(|r| r.close_notify.clone());
        state.advance_region_state(child_region);

        if let Some(notify) = close_notify {
            RegionCloseFuture { state: notify }.await;
        }

        Ok(outcome)
    }
949
950    // =========================================================================
951    // Combinators
952    // =========================================================================
953
954    /// Joins two tasks, waiting for both to complete.
955    ///
956    /// This method waits for both tasks to complete, regardless of their outcome.
957    /// It returns a tuple of results.
958    ///
959    /// # Example
960    /// ```ignore
961    /// let (h1, _) = scope.spawn(...);
962    /// let (h2, _) = scope.spawn(...);
963    /// let (r1, r2) = scope.join(cx, h1, h2).await;
964    /// ```
965    pub async fn join<T1, T2>(
966        &self,
967        cx: &Cx,
968        mut h1: TaskHandle<T1>,
969        mut h2: TaskHandle<T2>,
970    ) -> (Result<T1, JoinError>, Result<T2, JoinError>) {
971        let mut f1 = h1.join(cx);
972        let mut f2 = h2.join(cx);
973        let r1 = std::pin::Pin::new(&mut f1).await;
974        let r2 = std::pin::Pin::new(&mut f2).await;
975        (r1, r2)
976    }
977
978    /// Races two tasks, waiting for the first to complete.
979    ///
980    /// The loser is cancelled and drained (awaited until it completes cancellation).
981    ///
982    /// # Example
983    /// ```ignore
984    /// let (h1, _) = scope.spawn(...);
985    /// let (h2, _) = scope.spawn(...);
986    /// match scope.race(cx, h1, h2).await {
987    ///     Ok(val) => println!("Winner result: {val}"),
988    ///     Err(e) => println!("Race failed: {e}"),
989    /// }
990    /// ```
991    pub async fn race<T>(
992        &self,
993        cx: &Cx,
994        mut h1: TaskHandle<T>,
995        mut h2: TaskHandle<T>,
996    ) -> Result<T, JoinError> {
997        let winner = {
998            let f1 = h1.join_with_drop_reason(cx, CancelReason::race_loser());
999            let mut f1 = std::pin::pin!(f1);
1000            let f2 = h2.join_with_drop_reason(cx, CancelReason::race_loser());
1001            let mut f2 = std::pin::pin!(f2);
1002            Select::new(f1.as_mut(), f2.as_mut())
1003                .await
1004                .map_err(|_| JoinError::PolledAfterCompletion)?
1005        };
1006
1007        match winner {
1008            Either::Left(res) => {
1009                let loser_res = h2.join(cx).await;
1010                if let Err(JoinError::Panicked(p)) = res {
1011                    Err(JoinError::Panicked(p))
1012                } else if let Err(JoinError::Panicked(p)) = loser_res {
1013                    Err(JoinError::Panicked(p))
1014                } else {
1015                    res
1016                }
1017            }
1018            Either::Right(res) => {
1019                let loser_res = h1.join(cx).await;
1020                if let Err(JoinError::Panicked(p)) = res {
1021                    Err(JoinError::Panicked(p))
1022                } else if let Err(JoinError::Panicked(p)) = loser_res {
1023                    Err(JoinError::Panicked(p))
1024                } else {
1025                    res
1026                }
1027            }
1028        }
1029    }
1030
    /// Hedges a primary operation with a backup operation.
    ///
    /// 1. Spawns the primary task immediately.
    /// 2. Waits for the delay.
    /// 3. If primary finishes before delay: returns primary result.
    /// 4. If delay fires: spawns backup task and races them.
    ///
    /// The loser is cancelled and drained. A panic from either side is
    /// surfaced instead of the winner's value.
    ///
    /// # Arguments
    /// * `state` - The runtime state
    /// * `cx` - The capability context
    /// * `delay` - The hedge delay before the backup is spawned
    /// * `primary` - The primary future factory
    /// * `backup` - The backup future factory
    ///
    /// # Returns
    /// `Ok(T)` if successful, `Err(JoinError)` if failed/cancelled.
    pub async fn hedge<F1, Fut1, F2, Fut2, T>(
        &self,
        state: &mut RuntimeState,
        cx: &Cx,
        delay: std::time::Duration,
        primary: F1,
        backup: F2,
    ) -> Result<T, JoinError>
    where
        F1: FnOnce(Cx) -> Fut1 + Send + 'static,
        Fut1: Future<Output = T> + Send + 'static,
        F2: FnOnce(Cx) -> Fut2 + Send + 'static,
        Fut2: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        use crate::combinator::Either;
        use crate::combinator::select::Select;
        // 1. Spawn primary
        // Admission failure is reported as a resource-unavailable cancellation.
        let mut h1 = self
            .spawn_registered(state, cx, primary)
            .map_err(|_| JoinError::Cancelled(CancelReason::resource_unavailable()))?;

        // 2. Race primary vs delay.
        // Scope the pinned join future so we can safely reuse h1 afterwards.
        let primary_or_delay = {
            let f1_primary = h1.join(cx);
            let mut f1_primary = std::pin::pin!(f1_primary);

            // Use the timer driver's clock when available; otherwise fall
            // back to wall-clock time.
            let now = cx
                .timer_driver()
                .map_or_else(crate::time::wall_now, |d| d.now());
            let sleep_fut = crate::time::sleep(now, delay);
            let mut sleep_pinned = std::pin::pin!(sleep_fut);

            let res = Select::new(f1_primary.as_mut(), sleep_pinned.as_mut())
                .await
                .map_err(|_| JoinError::PolledAfterCompletion)?;
            if matches!(res, Either::Right(())) {
                // The delay won: the primary must keep running so it can race
                // the backup, so defuse the drop-abort on its join future
                // before the future goes out of scope.
                f1_primary.defuse_drop_abort();
            }
            res
        };

        match primary_or_delay {
            Either::Left(res) => {
                // Primary finished first
                res
            }
            Either::Right(()) => {
                // Timeout fired. Spawn backup.
                let Ok(mut h2) = self.spawn_registered(state, cx, backup) else {
                    // Backup admission failed after primary already started.
                    // Request cancellation on primary to avoid orphaned work.
                    h1.abort_with_reason(CancelReason::resource_unavailable());

                    if crate::runtime::scheduler::three_lane::current_worker_id().is_some() {
                        // In scheduler-backed runtime execution, fully drain the
                        // cancelled primary before returning.
                        match h1.join(cx).await {
                            Ok(res) => return Ok(res),
                            Err(JoinError::Panicked(p)) => return Err(JoinError::Panicked(p)),
                            Err(JoinError::Cancelled(_) | JoinError::PolledAfterCompletion) => {}
                        }
                    } else {
                        // In no-scheduler contexts (e.g. direct unit-test block_on),
                        // full join can deadlock because nothing drives stored tasks.
                        // Keep this as best-effort and return promptly.
                        let mut drain = std::pin::pin!(h1.join(cx));
                        let waker = std::task::Waker::noop();
                        let mut poll_cx = Context::from_waker(waker);
                        match drain.as_mut().poll(&mut poll_cx) {
                            std::task::Poll::Ready(Ok(res)) => return Ok(res),
                            std::task::Poll::Ready(Err(JoinError::Panicked(p))) => {
                                return Err(JoinError::Panicked(p));
                            }
                            _ => {}
                        }
                    }

                    return Err(JoinError::Cancelled(CancelReason::resource_unavailable()));
                };

                // Now race h1 and h2 with bounded future borrows.
                let race_outcome = {
                    let f1_race = h1.join_with_drop_reason(cx, CancelReason::race_loser());
                    let mut f1_race = std::pin::pin!(f1_race);
                    let f2_race = h2.join_with_drop_reason(cx, CancelReason::race_loser());
                    let mut f2_race = std::pin::pin!(f2_race);
                    Select::new(f1_race.as_mut(), f2_race.as_mut())
                        .await
                        .map_err(|_| JoinError::PolledAfterCompletion)?
                };

                // Drain the loser; a panic on either side takes precedence
                // over the winner's value.
                match race_outcome {
                    Either::Left(res) => {
                        let loser_res = h2.join(cx).await;
                        if let Err(JoinError::Panicked(p)) = res {
                            Err(JoinError::Panicked(p))
                        } else if let Err(JoinError::Panicked(p)) = loser_res {
                            Err(JoinError::Panicked(p))
                        } else {
                            res
                        }
                    }
                    Either::Right(res) => {
                        let loser_res = h1.join(cx).await;
                        if let Err(JoinError::Panicked(p)) = res {
                            Err(JoinError::Panicked(p))
                        } else if let Err(JoinError::Panicked(p)) = loser_res {
                            Err(JoinError::Panicked(p))
                        } else {
                            res
                        }
                    }
                }
            }
        }
    }
1167
    /// Races multiple tasks, waiting for the first to complete.
    ///
    /// The winner's result is returned. Losers are cancelled and drained.
    /// A panic from any loser takes precedence over the winner's value.
    ///
    /// If `handles` is empty, the returned future never completes.
    ///
    /// # Arguments
    /// * `cx` - The capability context
    /// * `handles` - Vector of task handles to race
    ///
    /// # Returns
    /// `Ok((value, index))` if the winner succeeded.
    /// `Err(e)` if the winner failed (error/cancel/panic).
    pub async fn race_all<T>(
        &self,
        cx: &Cx,
        handles: Vec<TaskHandle<T>>,
    ) -> Result<(T, usize), JoinError> {
        let mut handles = handles;
        if handles.is_empty() {
            // No candidates: never resolve rather than fabricate an error.
            return std::future::pending().await;
        }

        let mut futures: Vec<_> = handles
            .iter_mut()
            .map(|h| h.join_with_drop_reason(cx, CancelReason::race_loser()))
            .collect();
        // One result slot per candidate, filled as candidates become ready.
        let mut ready_results: Vec<Option<Result<T, JoinError>>> = std::iter::repeat_with(|| None)
            .take(futures.len())
            .collect();
        let mut winner_idx = None;

        // Poll every candidate in each round and keep all same-round ready
        // outcomes. This prevents losing loser panic outcomes when multiple
        // tasks become ready in the same poll.
        let winner_idx = std::future::poll_fn(|poll_cx| {
            let mut newly_ready = Vec::new();

            for (i, future) in futures.iter_mut().enumerate() {
                if ready_results[i].is_some() {
                    // Already captured in an earlier round; don't re-poll.
                    continue;
                }
                if let std::task::Poll::Ready(res) = std::pin::Pin::new(future).poll(poll_cx) {
                    ready_results[i] = Some(res);
                    newly_ready.push(i);
                }
            }

            if let Some(existing) = winner_idx {
                return std::task::Poll::Ready(existing);
            }

            if newly_ready.is_empty() {
                std::task::Poll::Pending
            } else {
                // Fairly select a winner among all that became ready in this round
                let chosen = newly_ready[cx.random_usize(newly_ready.len())];
                winner_idx = Some(chosen);
                std::task::Poll::Ready(chosen)
            }
        })
        .await;

        let winner_result = ready_results[winner_idx]
            .take()
            .expect("winner index must have a ready result");

        // Release mutable borrows of handles held by JoinFuture values before
        // explicit loser cancellation/join.
        drop(futures);

        // Drain completed losers first so terminal panic outcomes are not
        // obscured by strengthening cancellation reasons on already-finished tasks.
        let mut loser_panic = None;
        let mut pending_loser_indices = Vec::new();
        for (i, handle) in handles.iter_mut().enumerate() {
            if i == winner_idx {
                continue;
            }
            if let Some(res) = ready_results[i].take() {
                // Loser became ready in the same round as the winner.
                if let Err(JoinError::Panicked(p)) = res {
                    if loser_panic.is_none() {
                        loser_panic = Some(p);
                    }
                }
            } else if handle.is_finished() {
                // Loser finished between the select and this drain pass.
                let res = handle.join(cx).await;
                if let Err(JoinError::Panicked(p)) = res {
                    if loser_panic.is_none() {
                        loser_panic = Some(p);
                    }
                }
            } else {
                pending_loser_indices.push(i);
            }
        }

        // Cancel and drain unfinished losers.
        // Note: Losers may also already have a race-loser reason from dropped
        // join futures; strengthening keeps attribution deterministic.
        for &idx in &pending_loser_indices {
            handles[idx].abort_with_reason(CancelReason::race_loser());
        }
        for idx in pending_loser_indices {
            let res = handles[idx].join(cx).await;
            if let Err(JoinError::Panicked(p)) = res {
                if loser_panic.is_none() {
                    loser_panic = Some(p);
                }
            }
        }

        // The first observed loser panic wins attribution over the winner.
        loser_panic.map_or_else(
            || winner_result.map(|val| (val, winner_idx)),
            |p| Err(JoinError::Panicked(p)),
        )
    }
1283
1284    /// Joins multiple tasks, waiting for all to complete.
1285    ///
1286    /// Returns a vector of results in the same order as the input handles.
1287    pub async fn join_all<T>(
1288        &self,
1289        cx: &Cx,
1290        mut handles: Vec<TaskHandle<T>>,
1291    ) -> Vec<Result<T, JoinError>> {
1292        let mut futures: Vec<_> = handles.iter_mut().map(|h| h.join(cx)).collect();
1293        let mut results = Vec::with_capacity(futures.len());
1294        for fut in &mut futures {
1295            results.push(std::pin::Pin::new(fut).await);
1296        }
1297        results
1298    }
1299
    /// Builds the `Cx` pair handed to a newly spawned child task.
    ///
    /// Derives per-task observability and entropy from the parent context,
    /// wires in the runtime's I/O, timer, and logical-clock drivers, and
    /// propagates the registry, remote-cap, blocking-pool, and evidence-sink
    /// handles from the parent. Returns both the caps-typed context
    /// (`Cx<Caps>`) given to the task body and a `cap::All` retyped copy used
    /// internally (e.g. for sending the task result).
    pub(crate) fn build_child_task_cx<Caps>(
        &self,
        state: &RuntimeState,
        parent_cx: &Cx<Caps>,
        task_id: TaskId,
    ) -> (Cx<Caps>, Cx<cap::All>) {
        // Child-specific observability and entropy derived from the parent.
        let child_observability = parent_cx.child_observability(self.region, task_id);
        let child_entropy = parent_cx.child_entropy(task_id);
        let io_driver = state.io_driver_handle();
        let timer_driver = state.timer_driver_handle();
        let logical_clock = state
            .logical_clock_mode()
            .build_handle(timer_driver.clone());

        let child_cx = Cx::<Caps>::new_with_drivers(
            self.region,
            task_id,
            self.budget,
            Some(child_observability),
            io_driver,
            None,
            timer_driver,
            Some(child_entropy),
        )
        .with_logical_clock(logical_clock)
        .with_registry_handle(parent_cx.registry_handle())
        .with_remote_cap_handle(parent_cx.remote_cap_handle())
        .with_blocking_pool_handle(parent_cx.blocking_pool_handle())
        .with_evidence_sink(parent_cx.evidence_sink_handle());
        child_cx.set_trace_buffer(state.trace_handle());
        // Internal copy with full capabilities for runtime-side use.
        let child_cx_full = child_cx.retype::<cap::All>();

        (child_cx, child_cx_full)
    }
1334
    /// Creates a task record in the runtime state.
    ///
    /// This is a helper method used by all spawn variants. The record is
    /// inserted with a placeholder ID (the arena assigns the real index on
    /// insertion), then patched with the real ID and admitted to the owning
    /// region. Any admission failure rolls back the insertion.
    ///
    /// # Errors
    ///
    /// * `SpawnError::RegionClosed` if the owning region refuses admission
    ///   because it is closed.
    /// * `SpawnError::RegionAtCapacity` if the region's live-task limit is hit.
    /// * `SpawnError::RegionNotFound` if the owning region record is missing.
    pub(crate) fn create_task_record(
        &self,
        state: &mut RuntimeState,
    ) -> Result<TaskId, SpawnError> {
        use crate::util::ArenaIndex;

        // Create placeholder task record; the arena index returned by
        // `insert_task` determines the real task ID.
        let idx = state.insert_task(TaskRecord::new_with_time(
            TaskId::from_arena(ArenaIndex::new(0, 0)), // placeholder ID
            self.region,
            self.budget,
            state.now,
        ));

        // Get the real task ID from the arena index
        let task_id = TaskId::from_arena(idx);

        // Update the task record with the correct ID
        if let Some(record) = state.task_mut(task_id) {
            record.id = task_id;
        }

        // Add task to the owning region
        if let Some(region) = state.region(self.region) {
            if let Err(err) = region.add_task(task_id) {
                // Rollback task creation so no orphan record survives a
                // refused admission.
                state.remove_task(task_id);
                return Err(match err {
                    AdmissionError::Closed => SpawnError::RegionClosed(self.region),
                    AdmissionError::LimitReached { limit, live, .. } => {
                        SpawnError::RegionAtCapacity {
                            region: self.region,
                            limit,
                            live,
                        }
                    }
                });
            }
        } else {
            // Rollback task creation: the owning region no longer exists.
            state.remove_task(task_id);
            return Err(SpawnError::RegionNotFound(self.region));
        }

        state.record_task_spawn(task_id, self.region);

        Ok(task_id)
    }
1386
1387    // =========================================================================
1388    // Finalizer Registration
1389    // =========================================================================
1390
1391    /// Registers a synchronous finalizer to run when the region closes.
1392    ///
1393    /// Finalizers are stored in LIFO order and executed during the Finalizing
1394    /// phase, after all children have completed. Use this for lightweight
1395    /// cleanup that doesn't need to await.
1396    ///
1397    /// # Arguments
1398    /// * `state` - The runtime state
1399    /// * `f` - The synchronous cleanup function
1400    ///
1401    /// # Returns
1402    /// `true` if the finalizer was registered successfully.
1403    ///
1404    /// # Example
1405    /// ```ignore
1406    /// scope.defer_sync(&mut state, || {
1407    ///     println!("Cleaning up!");
1408    /// });
1409    /// ```
1410    pub fn defer_sync<F>(&self, state: &mut RuntimeState, f: F) -> bool
1411    where
1412        F: FnOnce() + Send + 'static,
1413    {
1414        state.register_sync_finalizer(self.region, f)
1415    }
1416
1417    /// Registers an asynchronous finalizer to run when the region closes.
1418    ///
1419    /// Async finalizers run under a cancel mask to prevent interruption.
1420    /// They are driven to completion with a bounded budget. Use this for
1421    /// cleanup that needs to perform async operations (e.g., closing
1422    /// connections, flushing buffers).
1423    ///
1424    /// # Arguments
1425    /// * `state` - The runtime state
1426    /// * `future` - The async cleanup future
1427    ///
1428    /// # Returns
1429    /// `true` if the finalizer was registered successfully.
1430    ///
1431    /// # Example
1432    /// ```ignore
1433    /// scope.defer_async(&mut state, async {
1434    ///     close_connection().await;
1435    /// });
1436    /// ```
1437    pub fn defer_async<F>(&self, state: &mut RuntimeState, future: F) -> bool
1438    where
1439        F: Future<Output = ()> + Send + 'static,
1440    {
1441        state.register_async_finalizer(self.region, future)
1442    }
1443}
1444
1445impl<P: Policy> std::fmt::Debug for Scope<'_, P> {
1446    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447        f.debug_struct("Scope")
1448            .field("region", &self.region)
1449            .field("budget", &self.budget)
1450            .finish()
1451    }
1452}
1453
1454#[cfg(test)]
1455mod tests {
1456    use super::*;
1457    use crate::record::RegionLimits;
1458    use crate::runtime::RuntimeState;
1459    use crate::types::{CancelKind, Outcome};
1460    use crate::util::ArenaIndex;
1461    use futures_lite::future::block_on;
1462    use std::sync::Arc;
1463
1464    fn test_cx() -> Cx {
1465        Cx::new(
1466            RegionId::from_arena(ArenaIndex::new(0, 0)),
1467            TaskId::from_arena(ArenaIndex::new(0, 0)),
1468            Budget::INFINITE,
1469        )
1470    }
1471
    /// Builds a `'static` scope over `region` with `budget` for unit tests.
    fn test_scope(region: RegionId, budget: Budget) -> Scope<'static> {
        Scope::new(region, budget)
    }
1475
1476    #[test]
1477    fn spawn_creates_task_record() {
1478        let mut state = RuntimeState::new();
1479        let cx = test_cx();
1480        let region = state.create_root_region(Budget::INFINITE);
1481        let scope = test_scope(region, Budget::INFINITE);
1482
1483        let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1484
1485        // Task should exist in state
1486        let task = state.task(handle.task_id());
1487        assert!(task.is_some());
1488
1489        // Task should be owned by the region
1490        let task = task.unwrap();
1491        assert_eq!(task.owner, region);
1492    }
1493
    #[test]
    fn spawn_inherits_registry_and_remote_capabilities() {
        use crate::cx::registry::RegistryHandle;
        use crate::remote::{NodeId, RemoteCap};
        use std::task::{Context, Waker};

        // No-op waker: the stored future is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();

        // Parent context carries a name registry and a remote capability that
        // the spawned child is expected to inherit.
        let registry = crate::cx::NameRegistry::new();
        let registry_handle = RegistryHandle::new(Arc::new(registry));
        let parent_registry_arc = registry_handle.as_arc();

        let cx = test_cx()
            .with_registry_handle(Some(registry_handle))
            .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")));

        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // The child task reports whether it sees the same registry instance
        // (pointer equality) and which local node its remote cap names.
        let mut handle = scope
            .spawn_registered(&mut state, &cx, move |cx| async move {
                let child_registry = cx.registry_handle().expect("child must inherit registry");
                let child_registry_arc = child_registry.as_arc();
                let same_registry = Arc::ptr_eq(&child_registry_arc, &parent_registry_arc);

                let child_remote = cx.remote().expect("child must inherit remote cap");
                let origin = child_remote.local_node().as_str().to_owned();

                (same_registry, origin)
            })
            .unwrap();

        let waker = Waker::from(Arc::new(NoopWaker));
        let mut poll_cx = Context::from_waker(&waker);

        // Drive the stored future to completion by hand (no scheduler here).
        let stored = state
            .get_stored_future(handle.task_id())
            .expect("spawn_registered must store the task");
        assert!(stored.poll(&mut poll_cx).is_ready());

        let mut join_fut = std::pin::pin!(handle.join(&cx));
        match join_fut.as_mut().poll(&mut poll_cx) {
            Poll::Ready(Ok((same_registry, origin))) => {
                assert!(
                    same_registry,
                    "child should observe the same RegistryCap instance"
                );
                assert_eq!(origin, "origin-test");
            }
            other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
        }
    }
1551
    #[test]
    fn spawn_inherits_runtime_timer_driver() {
        use std::task::{Context, Waker};

        // No-op waker: the stored future is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        // Install a virtual-clock timer driver on the runtime state; the
        // spawned child should see it through its inherited context.
        let mut state = RuntimeState::new();
        let clock = Arc::new(crate::time::VirtualClock::new());
        state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));

        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        let (mut handle, mut stored) = scope
            .spawn(&mut state, &cx, |cx| async move { cx.has_timer() })
            .expect("spawn should succeed");

        let waker = Waker::from(Arc::new(NoopWaker));
        let mut poll_cx = Context::from_waker(&waker);
        assert!(stored.poll(&mut poll_cx).is_ready());

        // The child should report that a timer driver is available.
        let mut join_fut = std::pin::pin!(handle.join(&cx));
        match join_fut.as_mut().poll(&mut poll_cx) {
            Poll::Ready(Ok(has_timer)) => assert!(has_timer),
            other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
        }
    }
1583
    #[test]
    fn spawn_blocking_inherits_runtime_timer_driver() {
        use std::task::{Context, Waker};

        // No-op waker: the stored future is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        // Install a virtual-clock timer driver; the blocking child's context
        // should inherit it exactly like an async spawn does.
        let mut state = RuntimeState::new();
        let clock = Arc::new(crate::time::VirtualClock::new());
        state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));

        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        let (mut handle, mut stored) = scope
            .spawn_blocking(&mut state, &cx, |cx| cx.has_timer())
            .expect("spawn_blocking should succeed");

        let waker = Waker::from(Arc::new(NoopWaker));
        let mut poll_cx = Context::from_waker(&waker);
        assert!(stored.poll(&mut poll_cx).is_ready());

        // The blocking closure should have observed a timer driver.
        let mut join_fut = std::pin::pin!(handle.join(&cx));
        match join_fut.as_mut().poll(&mut poll_cx) {
            Poll::Ready(Ok(has_timer)) => assert!(has_timer),
            other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
        }
    }
1615
1616    #[test]
1617    fn spawn_registered_stores_task() {
1618        let mut state = RuntimeState::new();
1619        let cx = test_cx();
1620        let region = state.create_root_region(Budget::INFINITE);
1621        let scope = test_scope(region, Budget::INFINITE);
1622
1623        // spawn_registered should both create and store the task
1624        let handle = scope
1625            .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
1626            .unwrap();
1627
1628        // Task record should exist
1629        let task = state.task(handle.task_id());
1630        assert!(task.is_some());
1631        assert_eq!(task.unwrap().owner, region);
1632
1633        // StoredTask should be registered (can be retrieved for polling)
1634        let stored = state.get_stored_future(handle.task_id());
1635        assert!(stored.is_some(), "spawn_registered should store the task");
1636    }
1637
    #[test]
    fn spawn_registered_task_can_be_polled() {
        use std::sync::Arc;
        use std::task::{Context, Waker};

        // Waker that does nothing: the test drives polling manually.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        let mut handle = scope
            .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
            .unwrap();

        // Retrieve the stored future from runtime state and poll it to
        // completion; the task body has no await points, so one poll suffices.
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut poll_cx = Context::from_waker(&waker);

        let stored = state.get_stored_future(handle.task_id()).unwrap();
        let poll_result = stored.poll(&mut poll_cx);
        assert!(
            poll_result.is_ready(),
            "Simple async should complete in one poll"
        );

        // The result must now be observable through the join handle.
        let mut join_fut = std::pin::pin!(handle.join(&cx));
        match join_fut.as_mut().poll(&mut poll_cx) {
            Poll::Ready(Ok(val)) => assert_eq!(val, 42),
            other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
        }
    }
1675
1676    #[test]
1677    fn spawn_blocking_creates_task_record() {
1678        let mut state = RuntimeState::new();
1679        let cx = test_cx();
1680        let region = state.create_root_region(Budget::INFINITE);
1681        let scope = test_scope(region, Budget::INFINITE);
1682
1683        let (handle, _stored) = scope.spawn_blocking(&mut state, &cx, |_| 42_i32).unwrap();
1684
1685        // Task should exist
1686        let task = state.task(handle.task_id());
1687        assert!(task.is_some());
1688        assert_eq!(task.unwrap().owner, region);
1689    }
1690
1691    #[test]
1692    fn spawn_local_creates_task_record() {
1693        let mut state = RuntimeState::new();
1694        let cx = test_cx();
1695        let region = state.create_root_region(Budget::INFINITE);
1696        let scope = test_scope(region, Budget::INFINITE);
1697
1698        let local_ready = Arc::new(parking_lot::Mutex::new(Vec::new()));
1699        let _local_ready_guard =
1700            crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
1701        let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);
1702
1703        // In Phase 0, spawn_local requires Send bounds
1704        // In Phase 1+, this will work with !Send futures
1705        let handle = scope
1706            .spawn_local(&mut state, &cx, |_| async move { 42_i32 })
1707            .unwrap();
1708
1709        // Task should exist
1710        let task = state.task(handle.task_id());
1711        assert!(task.is_some());
1712        assert_eq!(task.unwrap().owner, region);
1713    }
1714
1715    #[test]
1716    fn spawn_local_without_scheduler_fails_and_rolls_back() {
1717        let mut state = RuntimeState::new();
1718        let cx = test_cx();
1719        let region = state.create_root_region(Budget::INFINITE);
1720        let scope = test_scope(region, Budget::INFINITE);
1721
1722        let result = scope.spawn_local(&mut state, &cx, |_| async move { 5_i32 });
1723        assert!(matches!(result, Err(SpawnError::LocalSchedulerUnavailable)));
1724
1725        // Task should not exist
1726        assert!(state.tasks_is_empty());
1727        let region_record = state.region(region).unwrap();
1728        assert!(region_record.task_ids().is_empty());
1729    }
1730
    #[test]
    fn spawn_local_makes_progress_via_local_ready() {
        use std::sync::Arc;
        use std::task::{Context, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // Install a scoped local-ready queue + worker id so spawn_local has a
        // local scheduler lane to enqueue into.
        let local_ready = Arc::new(parking_lot::Mutex::new(Vec::new()));
        let _local_ready_guard =
            crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
        let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);

        let mut handle = scope
            .spawn_local(&mut state, &cx, |_| async move { 7_i32 })
            .unwrap();

        // The freshly spawned task id must appear on the local-ready queue.
        let queued = {
            let queue = local_ready.lock();
            queue.contains(&handle.task_id())
        };
        assert!(queued, "spawn_local should enqueue into local_ready");

        // Dequeue it, as a worker would.
        let task_id = {
            let mut queue = local_ready.lock();
            queue.remove(0)
        };

        let mut join_fut = std::pin::pin!(handle.join(&cx));
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut ctx = Context::from_waker(&waker);

        // Join stays pending until the local task is actually polled.
        assert!(join_fut.as_mut().poll(&mut ctx).is_pending());

        // Take the task from thread-local storage and run it to completion.
        let mut local_task =
            crate::runtime::local::remove_local_task(task_id).expect("local task missing");
        assert!(local_task.poll(&mut ctx).is_ready());

        // The result is now observable through join.
        match join_fut.as_mut().poll(&mut ctx) {
            Poll::Ready(Ok(val)) => assert_eq!(val, 7),
            res => unreachable!("Expected Ready(Ok(7)), got {res:?}"),
        }
    }
1781
1782    #[test]
1783    fn task_added_to_region() {
1784        let mut state = RuntimeState::new();
1785        let cx = test_cx();
1786        let region = state.create_root_region(Budget::INFINITE);
1787        let scope = test_scope(region, Budget::INFINITE);
1788
1789        let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1790
1791        // Check region has the task
1792        let region_record = state.region(region).unwrap();
1793        assert!(region_record.task_ids().contains(&handle.task_id()));
1794    }
1795
1796    #[test]
1797    fn multiple_spawns_create_distinct_tasks() {
1798        let mut state = RuntimeState::new();
1799        let cx = test_cx();
1800        let region = state.create_root_region(Budget::INFINITE);
1801        let scope = test_scope(region, Budget::INFINITE);
1802
1803        let (handle1, _) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
1804        let (handle2, _) = scope.spawn(&mut state, &cx, |_| async { 2_i32 }).unwrap();
1805        let (handle3, _) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();
1806
1807        // All task IDs should be different
1808        assert_ne!(handle1.task_id(), handle2.task_id());
1809        assert_ne!(handle2.task_id(), handle3.task_id());
1810        assert_ne!(handle1.task_id(), handle3.task_id());
1811
1812        // All tasks should be in the region
1813        let region_record = state.region(region).unwrap();
1814        assert!(region_record.task_ids().contains(&handle1.task_id()));
1815        assert!(region_record.task_ids().contains(&handle2.task_id()));
1816        assert!(region_record.task_ids().contains(&handle3.task_id()));
1817    }
1818
1819    #[test]
1820    fn spawn_into_closing_region_should_fail() {
1821        let mut state = RuntimeState::new();
1822        let cx = test_cx();
1823        let region = state.create_root_region(Budget::INFINITE);
1824        let scope = test_scope(region, Budget::INFINITE);
1825
1826        // Transition region to Closing
1827        let region_record = state.region_mut(region).expect("region");
1828        region_record.begin_close(None);
1829
1830        // Attempt to spawn should fail
1831        let result = scope.spawn(&mut state, &cx, |_| async { 42 });
1832        assert!(matches!(result, Err(SpawnError::RegionClosed(_))));
1833    }
1834
    #[test]
    fn test_join_manual_poll() {
        use std::sync::Arc;
        use std::task::{Context, Waker};

        // Waker that does nothing: the test drives polling manually.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // Spawn a task
        let (mut handle, mut stored_task) =
            scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
        // The stored task is returned directly, not put in state by scope.spawn

        // Create join future
        let mut join_fut = std::pin::pin!(handle.join(&cx));

        // Create waker context
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut ctx = Context::from_waker(&waker);

        // Poll join before the task has run - no result yet, must be pending
        assert!(join_fut.as_mut().poll(&mut ctx).is_pending());

        // Poll stored task - should complete and send result
        assert!(stored_task.poll(&mut ctx).is_ready());

        // Poll join again - the result must now be available
        match join_fut.as_mut().poll(&mut ctx) {
            Poll::Ready(Ok(val)) => assert_eq!(val, 42),
            other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
        }
    }
1874
    #[test]
    fn spawn_abort_cancels_task() {
        use std::sync::Arc;
        use std::task::{Context, Poll, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // Spawn a task that cooperatively checks for cancellation.
        let (mut handle, mut stored_task) = scope
            .spawn(&mut state, &cx, |cx| async move {
                // We expect to be cancelled immediately because abort() is called before we run
                if cx.checkpoint().is_err() {
                    return "cancelled";
                }
                "finished"
            })
            .unwrap();

        // Abort via the handle before the task is ever polled.
        handle.abort();

        // Drive the task
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut ctx = Context::from_waker(&waker);

        // Task should run, observe the cancellation at its checkpoint, and
        // finish with Ok(()) after returning "cancelled".
        match stored_task.poll(&mut ctx) {
            Poll::Ready(crate::types::Outcome::Ok(())) => {}
            res => unreachable!("Task should have completed with Ok(()), got {res:?}"),
        }

        // The cooperative-cancellation value is observable through join.
        let mut join_fut = std::pin::pin!(handle.join(&cx));
        match join_fut.as_mut().poll(&mut ctx) {
            Poll::Ready(Ok(val)) => assert_eq!(val, "cancelled"),
            Poll::Ready(Err(e)) => unreachable!("Task failed unexpectedly: {e}"),
            Poll::Pending => unreachable!("Join should be ready"),
        }
    }
1922
    #[test]
    fn hedge_backup_spawn_failure_aborts_primary() {
        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // Cap the region at one task: the hedge primary occupies the slot,
        // so spawning the backup must fail with a resource error.
        let limits = RegionLimits {
            max_tasks: Some(1),
            ..RegionLimits::unlimited()
        };
        assert!(state.set_region_limits(region, limits));

        // Duration::ZERO hedge delay - the backup spawn is attempted immediately.
        let result = block_on(scope.hedge(
            &mut state,
            &cx,
            std::time::Duration::ZERO,
            |_| async { 1_u8 },
            |_| async { 2_u8 },
        ));

        // The failed backup spawn must surface as a cancellation with
        // ResourceUnavailable rather than hanging or returning Ok.
        assert!(matches!(
            result,
            Err(JoinError::Cancelled(reason))
                if reason.kind == CancelKind::ResourceUnavailable
        ));

        // The primary task is still tracked by the region.
        let task_id = *state
            .region(region)
            .expect("region missing")
            .task_ids()
            .first()
            .expect("primary task should remain tracked");

        // Read the primary's shared Cx state to confirm it was asked to
        // cancel, and with the same ResourceUnavailable reason.
        let task = state.task(task_id).expect("primary task record missing");
        let (cancel_requested, cancel_reason_kind) = {
            let inner = task
                .cx_inner
                .as_ref()
                .expect("primary task must have shared Cx inner")
                .read();
            (
                inner.cancel_requested,
                inner.cancel_reason.as_ref().map(|r| r.kind),
            )
        };

        assert!(
            cancel_requested,
            "primary task must be cancellation-requested when backup spawn fails"
        );
        assert_eq!(cancel_reason_kind, Some(CancelKind::ResourceUnavailable));
    }
1976
    #[test]
    fn region_closes_empty_child() {
        let mut state = RuntimeState::new();
        let cx = test_cx();
        let parent = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(parent, Budget::INFINITE);

        // Run an empty child region to completion; the body only reports the
        // child's id so cleanup can be checked afterwards.
        let outcome = block_on(scope.region(
            &mut state,
            &cx,
            crate::types::policy::FailFast,
            |child, _state| {
                let child_id = child.region_id();
                async move { Outcome::Ok(child_id) }
            },
        ))
        .expect("child region created");

        let child_id = match outcome {
            Outcome::Ok(id) => id,
            other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
        };

        // After the body completes, the child must be fully reclaimed...
        assert!(
            state.region(child_id).is_none(),
            "closed child region should be reclaimed from arena"
        );

        // ...and unlinked from its parent's child list.
        let parent_record = state.region(parent).expect("parent record missing");
        assert!(
            !parent_record.child_ids().contains(&child_id),
            "closed child should be removed from parent"
        );
    }
2011
    #[test]
    fn region_budget_is_met_with_parent() {
        let mut state = RuntimeState::new();
        let cx = test_cx();
        // Parent deadline: 10 seconds.
        let parent = state.create_root_region(Budget::with_deadline_secs(10));
        let scope = test_scope(parent, Budget::with_deadline_secs(10));

        // The child asks for a 30s deadline - looser than the parent's 10s.
        let outcome = block_on(scope.region_with_budget(
            &mut state,
            &cx,
            Budget::with_deadline_secs(30),
            crate::types::policy::FailFast,
            |child, _state| {
                let child_id = child.region_id();
                let child_budget = child.budget();
                async move { Outcome::Ok((child_id, child_budget)) }
            },
        ))
        .expect("child region created");

        let (child_id, child_budget) = match outcome {
            Outcome::Ok(tuple) => tuple,
            other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
        };

        // The effective child deadline is the tighter parent deadline (10s),
        // not the requested 30s.
        assert_eq!(
            child_budget.deadline,
            Some(crate::types::Time::from_secs(10))
        );
        assert!(
            state.region(child_id).is_none(),
            "closed child region should be reclaimed from arena"
        );
    }
2046
    #[test]
    fn region_spawns_tasks_in_child() {
        use std::task::{Context, Poll, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let parent = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(parent, Budget::INFINITE);

        let outcome = block_on(scope.region(
            &mut state,
            &cx,
            crate::types::policy::FailFast,
            |child, state| {
                let child_id = child.region_id();
                // Spawn through the *child* scope; ownership should land on
                // the child region, not the parent.
                let (handle, mut stored) = child
                    .spawn(state, &cx, |_| async { 7_i32 })
                    .expect("spawn in child");

                // Record which region tracks the new task id (asserted after
                // the region body returns).
                let parent_has = state
                    .region(parent)
                    .expect("parent record missing")
                    .task_ids()
                    .contains(&handle.task_id());
                let child_has = state
                    .region(child_id)
                    .expect("child record missing")
                    .task_ids()
                    .contains(&handle.task_id());

                // Drive the spawned task and mark it complete in runtime
                // state, so the child region can close cleanly.
                let waker = Waker::from(Arc::new(NoopWaker));
                let mut poll_cx = Context::from_waker(&waker);
                let poll_result = stored.poll(&mut poll_cx);
                if let Poll::Ready(outcome) = poll_result {
                    let task_outcome = match outcome {
                        Outcome::Ok(()) => Outcome::Ok(()),
                        Outcome::Panicked(payload) => Outcome::Panicked(payload),
                        other => unreachable!("unexpected task outcome: {other:?}"),
                    };
                    if let Some(task_record) = state.task_mut(handle.task_id()) {
                        task_record.complete(task_outcome);
                    }
                    let _ = state.task_completed(handle.task_id());
                }

                std::future::ready(Outcome::Ok((child_id, parent_has, child_has)))
            },
        ))
        .expect("child region created");

        let (child_id, parent_has, child_has) = match outcome {
            Outcome::Ok(tuple) => tuple,
            other => unreachable!("expected Outcome::Ok(tuple), got {other:?}"),
        };

        assert!(!parent_has, "task should not be owned by parent region");
        assert!(child_has, "task should be owned by child region");

        // Once closed, the child must be unlinked from the parent.
        let parent_record = state.region(parent).expect("parent record missing");
        assert!(
            !parent_record.child_ids().contains(&child_id),
            "closed child should be removed from parent"
        );
    }
2116
    #[test]
    fn spawn_panic_propagates_as_panicked_error() {
        use std::sync::Arc;
        use std::task::{Context, Poll, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // The task body panics with a string payload.
        let (mut handle, mut stored_task) = scope
            .spawn(&mut state, &cx, |_| async {
                std::panic::panic_any("oops");
            })
            .unwrap();

        // Drive the task
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut ctx = Context::from_waker(&waker);

        // Polling stored task should return Ready(Panicked) even if it panics (caught inside)
        match stored_task.poll(&mut ctx) {
            Poll::Ready(crate::types::Outcome::Panicked(_)) => {}
            res => unreachable!("Task should have completed with Panicked, got {res:?}"),
        }

        // The panic payload must surface through join as JoinError::Panicked.
        let mut join_fut = std::pin::pin!(handle.join(&cx));
        match join_fut.as_mut().poll(&mut ctx) {
            Poll::Ready(Err(JoinError::Panicked(p))) => {
                assert_eq!(p.message(), "oops");
            }
            res => unreachable!("Expected Panicked, got {res:?}"),
        }
    }
2157
    #[test]
    fn join_all_success() {
        use std::sync::Arc;
        use std::task::{Context, Poll, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();
        let (h2, mut t2) = scope.spawn(&mut state, &cx, |_| async { 2 }).unwrap();

        // Drive both tasks to completion before joining.
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut ctx = Context::from_waker(&waker);
        assert!(t1.poll(&mut ctx).is_ready());
        assert!(t2.poll(&mut ctx).is_ready());

        let handles = vec![h1, h2];
        let mut fut = Box::pin(scope.join_all(&cx, handles));

        // join_all resolves with results[i] corresponding to handles[i].
        match fut.as_mut().poll(&mut ctx) {
            Poll::Ready(results) => {
                assert_eq!(results.len(), 2);
                assert_eq!(results[0].as_ref().unwrap(), &1);
                assert_eq!(results[1].as_ref().unwrap(), &2);
            }
            Poll::Pending => unreachable!("join_all should be ready"),
        }
    }
2194
    #[test]
    fn race_all_aborted_task_is_drained() {
        use std::sync::Arc;
        use std::task::{Context, Poll, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // Task 1: completes immediately (the race winner).
        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();

        // Task 2: yields once before checking for cancellation, so it needs
        // two polls to finish - letting the test observe the drain phase.
        let (h2, mut t2) = scope
            .spawn(&mut state, &cx, |cx| async move {
                // Minimal future that returns Pending exactly once.
                struct YieldOnce(bool);
                impl std::future::Future for YieldOnce {
                    type Output = ();
                    fn poll(
                        mut self: std::pin::Pin<&mut Self>,
                        cx: &mut std::task::Context<'_>,
                    ) -> std::task::Poll<()> {
                        if self.0 {
                            std::task::Poll::Ready(())
                        } else {
                            self.0 = true;
                            cx.waker().wake_by_ref();
                            std::task::Poll::Pending
                        }
                    }
                }
                YieldOnce(false).await;

                // Cooperative cancellation check after the yield point.
                if cx.checkpoint().is_err() {
                    return 0; // Cancelled
                }
                2
            })
            .unwrap();

        let waker = Waker::from(Arc::new(NoopWaker));
        let mut ctx = Context::from_waker(&waker);

        // Drive t1 to completion (winner)
        assert!(t1.poll(&mut ctx).is_ready());

        // Initialize race_all
        let handles = vec![h1, h2];
        let mut race_fut = Box::pin(scope.race_all(&cx, handles));

        // First poll: race_all observes h1 ready (winner index 0), aborts h2,
        // and then waits for h2 to drain. h2 has not run yet, so its join is
        // still pending and race_all itself returns Pending.
        assert!(race_fut.as_mut().poll(&mut ctx).is_pending());

        // First poll of t2: the abort set the cancellation request, but the
        // task body yields (YieldOnce) before its checkpoint, so this poll is
        // Pending.
        assert!(t2.poll(&mut ctx).is_pending());

        // race_all is still waiting on the h2 drain.
        assert!(race_fut.as_mut().poll(&mut ctx).is_pending());

        // Second poll of t2: YieldOnce completes, the checkpoint observes the
        // abort, and the body returns 0 (its simulated-cancellation value).
        // h2.join() therefore resolves Ok(0), which counts as drained.
        assert!(t2.poll(&mut ctx).is_ready());

        // With h2 drained, race_all resolves with the winner: value 1, index 0.
        match race_fut.as_mut().poll(&mut ctx) {
            Poll::Ready(Ok((val, idx))) => {
                assert_eq!(val, 1);
                assert_eq!(idx, 0);
            }
            res => unreachable!("Expected Ready(Ok((1, 0))), got {res:?}"),
        }
    }
2290
    #[test]
    fn race_surfaces_loser_panic_even_if_winner_succeeds() {
        use std::sync::Arc;
        use std::task::{Context, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // h1 succeeds; h2 panics.
        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
        let (h2, mut t2) = scope
            .spawn(&mut state, &cx, |_| async {
                std::panic::panic_any("loser panic");
            })
            .unwrap();

        // Drive both tasks to completion before racing, so race() observes a
        // successful winner and a panicked loser simultaneously.
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut poll_cx = Context::from_waker(&waker);
        assert!(t1.poll(&mut poll_cx).is_ready());
        assert!(t2.poll(&mut poll_cx).is_ready());

        // The loser's panic must win over the winner's Ok result.
        let result = block_on(scope.race(&cx, h1, h2));
        assert!(
            matches!(result, Err(JoinError::Panicked(_))),
            "loser panic must dominate race result, got {result:?}"
        );
    }
2324
    #[test]
    fn race_all_surfaces_simultaneous_loser_panic() {
        use std::sync::Arc;
        use std::task::{Context, Waker};

        // Waker that does nothing: polling is driven manually below.
        struct NoopWaker;
        impl std::task::Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let mut state = RuntimeState::new();
        let cx = test_cx();
        let region = state.create_root_region(Budget::INFINITE);
        let scope = test_scope(region, Budget::INFINITE);

        // Two successful tasks bracket one that panics.
        let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
        let (h2, mut t2) = scope
            .spawn(&mut state, &cx, |_| async {
                std::panic::panic_any("simultaneous loser panic");
            })
            .unwrap();
        let (h3, mut t3) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();

        // Drive all three to completion before racing, so race_all sees
        // winners and the panicked loser at the same time.
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut poll_cx = Context::from_waker(&waker);
        assert!(t1.poll(&mut poll_cx).is_ready());
        assert!(t2.poll(&mut poll_cx).is_ready());
        assert!(t3.poll(&mut poll_cx).is_ready());

        // The loser's panic must dominate any successful winner.
        let result = block_on(scope.race_all(&cx, vec![h1, h2, h3]));
        assert!(
            matches!(result, Err(JoinError::Panicked(_))),
            "simultaneous loser panic must dominate race_all result, got {result:?}"
        );
    }
2360
2361    #[test]
2362    fn race_all_empty_is_pending() {
2363        let mut state = RuntimeState::new();
2364        let cx = test_cx();
2365        let region = state.create_root_region(Budget::INFINITE);
2366        let scope = test_scope(region, Budget::INFINITE);
2367
2368        let fut = scope.race_all::<i32>(&cx, vec![]);
2369        let waker = std::task::Waker::noop();
2370        let mut poll_cx = std::task::Context::from_waker(waker);
2371        let pinned = std::pin::pin!(fut);
2372        let status = std::future::Future::poll(pinned, &mut poll_cx);
2373        assert!(status.is_pending());
2374    }
2375}