seq_runtime/scheduler/spawn.rs
1//! Strand (coroutine) spawn and lifecycle cleanup.
2
3use crate::stack::Stack;
4use crate::tagged_stack::StackValue;
5use may::coroutine;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use super::{
9 ACTIVE_STRANDS, PEAK_STRANDS, SHUTDOWN_CONDVAR, SHUTDOWN_MUTEX, TOTAL_COMPLETED, TOTAL_SPAWNED,
10};
11
// Source of unique strand IDs. The counter starts at 1, and each spawn takes the
// current value with an atomic `fetch_add(1)`, so IDs are handed out in
// increasing order and no two spawns observe the same value.
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);
14
15/// Spawn a strand (coroutine) with initial stack
16///
17/// # Safety
18/// - `entry` must be a valid function pointer that can safely execute on any thread
19/// - `initial_stack` must be either null or a valid pointer to a `StackValue` that:
20/// - Was heap-allocated (e.g., via Box)
21/// - Has a 'static lifetime or lives longer than the coroutine
22/// - Is safe to access from the spawned thread
23/// - The caller transfers ownership of `initial_stack` to the coroutine
24/// - Returns a unique strand ID (positive integer)
25///
26/// # Memory Management
27/// The spawned coroutine takes ownership of `initial_stack` and will automatically
28/// free the final stack returned by `entry` upon completion.
29#[unsafe(no_mangle)]
30pub unsafe extern "C" fn patch_seq_strand_spawn(
31 entry: extern "C" fn(Stack) -> Stack,
32 initial_stack: Stack,
33) -> i64 {
34 // For backwards compatibility, use null base (won't support nested spawns)
35 unsafe { patch_seq_strand_spawn_with_base(entry, initial_stack, std::ptr::null_mut()) }
36}
37
38/// Spawn a strand (coroutine) with initial stack and explicit stack base
39///
40/// This variant allows setting the STACK_BASE for the spawned strand, which is
41/// required for the child to perform operations like clone_stack (nested spawn).
42///
43/// # Safety
44/// - `entry` must be a valid function pointer that can safely execute on any thread
45/// - `initial_stack` must be a valid pointer to a `StackValue` array
46/// - `stack_base` must be the base of the stack (or null to skip setting STACK_BASE)
47/// - The caller transfers ownership of `initial_stack` to the coroutine
48/// - Returns a unique strand ID (positive integer)
49#[unsafe(no_mangle)]
50pub unsafe extern "C" fn patch_seq_strand_spawn_with_base(
51 entry: extern "C" fn(Stack) -> Stack,
52 initial_stack: Stack,
53 stack_base: Stack,
54) -> i64 {
55 // Generate unique strand ID
56 let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);
57
58 // Increment active strand counter and track total spawned
59 let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
60 TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);
61
62 // Update peak strands if this is a new high-water mark
63 // Uses a CAS loop to safely update the maximum without locks
64 // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
65 let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
66 while new_count > peak {
67 match PEAK_STRANDS.compare_exchange_weak(
68 peak,
69 new_count,
70 Ordering::Release,
71 Ordering::Relaxed,
72 ) {
73 Ok(_) => break,
74 Err(current) => peak = current,
75 }
76 }
77
78 // Register strand in the registry (for diagnostics visibility)
79 // If registry is full, strand still runs but isn't tracked
80 #[cfg(feature = "diagnostics")]
81 let _ = super::registry::strand_registry().register(strand_id);
82
83 // Function pointers are already Send, no wrapper needed
84 let entry_fn = entry;
85
86 // Convert pointers to usize (which is Send)
87 // This is necessary because *mut T is !Send, but the caller guarantees thread safety
88 let stack_addr = initial_stack as usize;
89 let base_addr = stack_base as usize;
90
91 unsafe {
92 coroutine::spawn(move || {
93 // Reconstruct pointers from addresses
94 let stack_ptr = stack_addr as *mut StackValue;
95 let base_ptr = base_addr as *mut StackValue;
96
97 // Debug assertion: validate stack pointer alignment and reasonable address
98 debug_assert!(
99 stack_ptr.is_null()
100 || stack_addr.is_multiple_of(std::mem::align_of::<StackValue>()),
101 "Stack pointer must be null or properly aligned"
102 );
103 debug_assert!(
104 stack_ptr.is_null() || stack_addr > 0x1000,
105 "Stack pointer appears to be in invalid memory region (< 0x1000)"
106 );
107
108 // Set STACK_BASE for this strand if provided
109 // This enables nested spawns and other operations that need clone_stack
110 if !base_ptr.is_null() {
111 crate::stack::patch_seq_set_stack_base(base_ptr);
112 }
113
114 // Execute the entry function
115 let final_stack = entry_fn(stack_ptr);
116
117 // Clean up the final stack to prevent memory leak
118 free_stack(final_stack);
119
120 // Unregister strand from registry (uses captured strand_id)
121 #[cfg(feature = "diagnostics")]
122 super::registry::strand_registry().unregister(strand_id);
123
124 // Decrement active strand counter first, then track completion
125 // This ordering ensures the invariant SPAWNED = COMPLETED + ACTIVE + lost
126 // is never violated from an external observer's perspective
127 // Use AcqRel to establish proper synchronization (both acquire and release barriers)
128 let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
129
130 // Track completion after decrementing active count
131 TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
132 if prev_count == 1 {
133 // We were the last strand - acquire mutex and signal shutdown
134 // The mutex must be held when calling notify to prevent missed wakeups
135 let _guard = SHUTDOWN_MUTEX.lock()
136 .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
137 SHUTDOWN_CONDVAR.notify_all();
138 }
139 });
140 }
141
142 strand_id as i64
143}
144
145/// Free a stack allocated by the runtime
146///
147/// With the tagged stack implementation, stack cleanup is handled differently.
148/// The contiguous array is freed when the TaggedStack is dropped.
149/// This function just resets the thread-local arena.
150///
151/// # Safety
152/// Stack pointer must be valid or null.
153pub(super) fn free_stack(_stack: Stack) {
154 // With tagged stack, the array is freed when TaggedStack is dropped.
155 // We just need to reset the arena for thread-local strings.
156
157 // Reset the thread-local arena to free all arena-allocated strings
158 // This is safe because:
159 // - Any arena strings in Values have been dropped above
160 // - Global strings are unaffected (they have their own allocations)
161 // - Channel sends clone to global, so no cross-strand arena pointers
162 crate::arena::arena_reset();
163}
164
165/// Legacy spawn_strand function (kept for compatibility)
166///
167/// # Safety
168/// `entry` must be a valid function pointer that can safely execute on any thread.
169#[unsafe(no_mangle)]
170pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
171 unsafe {
172 patch_seq_strand_spawn(entry, std::ptr::null_mut());
173 }
174}