// seq_runtime/scheduler/spawn.rs

1//! Strand (coroutine) spawn and lifecycle cleanup.
2
3use crate::stack::Stack;
4use crate::tagged_stack::StackValue;
5use may::coroutine;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use super::{
9    ACTIVE_STRANDS, PEAK_STRANDS, SHUTDOWN_CONDVAR, SHUTDOWN_MUTEX, TOTAL_COMPLETED, TOTAL_SPAWNED,
10};
11
// Unique strand ID generation: process-wide monotonic counter.
// Starts at 1 so that 0 is never a valid strand ID and can be used as a
// sentinel by callers.
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);
14
15/// Spawn a strand (coroutine) with initial stack
16///
17/// # Safety
18/// - `entry` must be a valid function pointer that can safely execute on any thread
19/// - `initial_stack` must be either null or a valid pointer to a `StackValue` that:
20///   - Was heap-allocated (e.g., via Box)
21///   - Has a 'static lifetime or lives longer than the coroutine
22///   - Is safe to access from the spawned thread
23/// - The caller transfers ownership of `initial_stack` to the coroutine
24/// - Returns a unique strand ID (positive integer)
25///
26/// # Memory Management
27/// The spawned coroutine takes ownership of `initial_stack` and will automatically
28/// free the final stack returned by `entry` upon completion.
29#[unsafe(no_mangle)]
30pub unsafe extern "C" fn patch_seq_strand_spawn(
31    entry: extern "C" fn(Stack) -> Stack,
32    initial_stack: Stack,
33) -> i64 {
34    // For backwards compatibility, use null base (won't support nested spawns)
35    unsafe { patch_seq_strand_spawn_with_base(entry, initial_stack, std::ptr::null_mut()) }
36}
37
38/// Spawn a strand (coroutine) with initial stack and explicit stack base
39///
40/// This variant allows setting the STACK_BASE for the spawned strand, which is
41/// required for the child to perform operations like clone_stack (nested spawn).
42///
43/// # Safety
44/// - `entry` must be a valid function pointer that can safely execute on any thread
45/// - `initial_stack` must be a valid pointer to a `StackValue` array
46/// - `stack_base` must be the base of the stack (or null to skip setting STACK_BASE)
47/// - The caller transfers ownership of `initial_stack` to the coroutine
48/// - Returns a unique strand ID (positive integer)
49#[unsafe(no_mangle)]
50pub unsafe extern "C" fn patch_seq_strand_spawn_with_base(
51    entry: extern "C" fn(Stack) -> Stack,
52    initial_stack: Stack,
53    stack_base: Stack,
54) -> i64 {
55    // Generate unique strand ID
56    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);
57
58    // Increment active strand counter and track total spawned
59    let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
60    TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);
61
62    // Update peak strands if this is a new high-water mark
63    // Uses a CAS loop to safely update the maximum without locks
64    // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
65    let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
66    while new_count > peak {
67        match PEAK_STRANDS.compare_exchange_weak(
68            peak,
69            new_count,
70            Ordering::Release,
71            Ordering::Relaxed,
72        ) {
73            Ok(_) => break,
74            Err(current) => peak = current,
75        }
76    }
77
78    // Register strand in the registry (for diagnostics visibility)
79    // If registry is full, strand still runs but isn't tracked
80    #[cfg(feature = "diagnostics")]
81    let _ = super::registry::strand_registry().register(strand_id);
82
83    // Function pointers are already Send, no wrapper needed
84    let entry_fn = entry;
85
86    // Convert pointers to usize (which is Send)
87    // This is necessary because *mut T is !Send, but the caller guarantees thread safety
88    let stack_addr = initial_stack as usize;
89    let base_addr = stack_base as usize;
90
91    unsafe {
92        coroutine::spawn(move || {
93            // Reconstruct pointers from addresses
94            let stack_ptr = stack_addr as *mut StackValue;
95            let base_ptr = base_addr as *mut StackValue;
96
97            // Debug assertion: validate stack pointer alignment and reasonable address
98            debug_assert!(
99                stack_ptr.is_null()
100                    || stack_addr.is_multiple_of(std::mem::align_of::<StackValue>()),
101                "Stack pointer must be null or properly aligned"
102            );
103            debug_assert!(
104                stack_ptr.is_null() || stack_addr > 0x1000,
105                "Stack pointer appears to be in invalid memory region (< 0x1000)"
106            );
107
108            // Set STACK_BASE for this strand if provided
109            // This enables nested spawns and other operations that need clone_stack
110            if !base_ptr.is_null() {
111                crate::stack::patch_seq_set_stack_base(base_ptr);
112            }
113
114            // Execute the entry function
115            let final_stack = entry_fn(stack_ptr);
116
117            // Clean up the final stack to prevent memory leak
118            free_stack(final_stack);
119
120            // Unregister strand from registry (uses captured strand_id)
121            #[cfg(feature = "diagnostics")]
122            super::registry::strand_registry().unregister(strand_id);
123
124            // Decrement active strand counter first, then track completion
125            // This ordering ensures the invariant SPAWNED = COMPLETED + ACTIVE + lost
126            // is never violated from an external observer's perspective
127            // Use AcqRel to establish proper synchronization (both acquire and release barriers)
128            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
129
130            // Track completion after decrementing active count
131            TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
132            if prev_count == 1 {
133                // We were the last strand - acquire mutex and signal shutdown
134                // The mutex must be held when calling notify to prevent missed wakeups
135                let _guard = SHUTDOWN_MUTEX.lock()
136                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
137                SHUTDOWN_CONDVAR.notify_all();
138            }
139        });
140    }
141
142    strand_id as i64
143}
144
145/// Free a stack allocated by the runtime
146///
147/// With the tagged stack implementation, stack cleanup is handled differently.
148/// The contiguous array is freed when the TaggedStack is dropped.
149/// This function just resets the thread-local arena.
150///
151/// # Safety
152/// Stack pointer must be valid or null.
153pub(super) fn free_stack(_stack: Stack) {
154    // With tagged stack, the array is freed when TaggedStack is dropped.
155    // We just need to reset the arena for thread-local strings.
156
157    // Reset the thread-local arena to free all arena-allocated strings
158    // This is safe because:
159    // - Any arena strings in Values have been dropped above
160    // - Global strings are unaffected (they have their own allocations)
161    // - Channel sends clone to global, so no cross-strand arena pointers
162    crate::arena::arena_reset();
163}
164
/// Legacy spawn_strand function (kept for compatibility)
///
/// Delegates to [`patch_seq_strand_spawn`] with a null initial stack and
/// discards the returned strand ID, so callers of this entry point cannot
/// track or join the spawned strand.
///
/// # Safety
/// `entry` must be a valid function pointer that can safely execute on any thread.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
    unsafe {
        // Strand ID intentionally ignored: the legacy API returns nothing.
        patch_seq_strand_spawn(entry, std::ptr::null_mut());
    }
}