seq_runtime/weave/spawn.rs
1//! Weave creation: `patch_seq_weave` spawns a coroutine from a quotation or
2//! closure and returns a handle the caller uses with `strand.resume`.
3
4use crate::stack::{Stack, pop, push};
5use crate::tagged_stack::StackValue;
6use crate::value::{Value, WeaveChannelData, WeaveMessage};
7use may::sync::mpmc;
8use std::sync::Arc;
9
10use super::strand_lifecycle::cleanup_strand;
11
12/// Create a woven strand from a quotation
13///
14/// Stack effect: ( Quotation -- WeaveHandle )
15///
16/// Creates a weave from the quotation. The weave is initially suspended,
17/// waiting to be resumed with the first value. The quotation will receive
18/// a WeaveCtx on its stack that it must pass to yield operations.
19///
20/// Returns a WeaveHandle that the caller uses with strand.resume.
21///
22/// # Error Handling
23///
24/// This function never panics (panicking in extern "C" is UB). On fatal error
25/// (null stack, null function pointer, type mismatch), it prints an error
26/// and aborts the process.
27///
28/// # Safety
29/// Stack must have a Quotation on top
30#[unsafe(no_mangle)]
31pub unsafe extern "C" fn patch_seq_weave(stack: Stack) -> Stack {
32 // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
33 if stack.is_null() {
34 eprintln!("strand.weave: stack is null (fatal programming error)");
35 std::process::abort();
36 }
37
38 // Create the two internal channels - NO registry, just Arc values
39 // Uses WeaveMessage for type-safe control flow (no sentinel values)
40 let (yield_sender, yield_receiver) = mpmc::channel();
41 let yield_chan = Arc::new(WeaveChannelData {
42 sender: yield_sender,
43 receiver: yield_receiver,
44 });
45
46 let (resume_sender, resume_receiver) = mpmc::channel();
47 let resume_chan = Arc::new(WeaveChannelData {
48 sender: resume_sender,
49 receiver: resume_receiver,
50 });
51
52 // Pop the quotation from stack
53 let (stack, quot_value) = unsafe { pop(stack) };
54
55 // Clone channels for the spawned strand's WeaveCtx
56 let weave_ctx_yield = Arc::clone(&yield_chan);
57 let weave_ctx_resume = Arc::clone(&resume_chan);
58
59 // Clone for the WeaveHandle returned to caller
60 let handle_yield = Arc::clone(&yield_chan);
61 let handle_resume = Arc::clone(&resume_chan);
62
63 match quot_value {
64 Value::Quotation { wrapper, .. } => {
65 if wrapper == 0 {
66 eprintln!(
67 "strand.weave: quotation wrapper function pointer is null (compiler bug)"
68 );
69 std::process::abort();
70 }
71
72 use crate::scheduler::ACTIVE_STRANDS;
73 use may::coroutine;
74 use std::sync::atomic::Ordering;
75
76 let fn_ptr: extern "C" fn(Stack) -> Stack = unsafe { std::mem::transmute(wrapper) };
77
78 // Clone the stack for the child
79 let (child_stack, child_base) = unsafe { crate::stack::clone_stack_with_base(stack) };
80
81 // Convert pointers to usize (which is Send)
82 let stack_addr = child_stack as usize;
83 let base_addr = child_base as usize;
84
85 // NOTE: We do NOT increment ACTIVE_STRANDS here!
86 // The weave is "dormant" until first resume. This allows the scheduler
87 // to exit cleanly if a weave is created but never resumed (fixes #287).
88 // ACTIVE_STRANDS is incremented only after receiving the first resume.
89
90 unsafe {
91 coroutine::spawn(move || {
92 let child_stack = stack_addr as *mut StackValue;
93 let child_base = base_addr as *mut StackValue;
94
95 if !child_base.is_null() {
96 crate::stack::patch_seq_set_stack_base(child_base);
97 }
98
99 // Wait for first resume value before executing
100 // The weave is dormant at this point - not counted in ACTIVE_STRANDS
101 let first_msg = match weave_ctx_resume.receiver.recv() {
102 Ok(msg) => msg,
103 Err(_) => {
104 // Channel closed before we were resumed - just exit
105 // Don't call cleanup_strand since we never activated
106 return;
107 }
108 };
109
110 // Check for cancellation before starting
111 let first_value = match first_msg {
112 WeaveMessage::Cancel => {
113 // Weave was cancelled before it started - clean exit
114 // Don't call cleanup_strand since we never activated
115 crate::arena::arena_reset();
116 return;
117 }
118 WeaveMessage::Value(v) => v,
119 WeaveMessage::Done => {
120 // Shouldn't happen - Done is sent on yield_chan
121 // Don't call cleanup_strand since we never activated
122 return;
123 }
124 };
125
126 // NOW we're activated - increment ACTIVE_STRANDS
127 // From this point on, we must call cleanup_strand on exit
128 ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
129
130 // Push WeaveCtx onto stack (yield_chan, resume_chan as a pair)
131 let weave_ctx = Value::WeaveCtx {
132 yield_chan: weave_ctx_yield.clone(),
133 resume_chan: weave_ctx_resume.clone(),
134 };
135 let stack_with_ctx = push(child_stack, weave_ctx);
136
137 // Push the first resume value
138 let stack_with_value = push(stack_with_ctx, first_value);
139
140 // Execute the quotation - it receives (WeaveCtx, resume_value)
141 let final_stack = fn_ptr(stack_with_value);
142
143 // Quotation returned - pop WeaveCtx and signal completion
144 let (_, ctx_value) = pop(final_stack);
145 if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
146 let _ = yield_chan.sender.send(WeaveMessage::Done);
147 }
148
149 crate::arena::arena_reset();
150 cleanup_strand();
151 });
152 }
153 }
154 Value::Closure { fn_ptr, env } => {
155 if fn_ptr == 0 {
156 eprintln!("strand.weave: closure function pointer is null (compiler bug)");
157 std::process::abort();
158 }
159
160 use crate::scheduler::ACTIVE_STRANDS;
161 use may::coroutine;
162 use std::sync::atomic::Ordering;
163
164 let fn_ref: extern "C" fn(Stack, *const Value, usize) -> Stack =
165 unsafe { std::mem::transmute(fn_ptr) };
166 let env_clone: Vec<Value> = env.iter().cloned().collect();
167
168 let child_base = crate::stack::alloc_stack();
169 let base_addr = child_base as usize;
170
171 // NOTE: We do NOT increment ACTIVE_STRANDS here!
172 // The weave is "dormant" until first resume. This allows the scheduler
173 // to exit cleanly if a weave is created but never resumed (fixes #287).
174 // ACTIVE_STRANDS is incremented only after receiving the first resume.
175
176 unsafe {
177 coroutine::spawn(move || {
178 let child_base = base_addr as *mut StackValue;
179 crate::stack::patch_seq_set_stack_base(child_base);
180
181 // Wait for first resume value
182 // The weave is dormant at this point - not counted in ACTIVE_STRANDS
183 let first_msg = match weave_ctx_resume.receiver.recv() {
184 Ok(msg) => msg,
185 Err(_) => {
186 // Channel closed before we were resumed - just exit
187 // Don't call cleanup_strand since we never activated
188 return;
189 }
190 };
191
192 // Check for cancellation before starting
193 let first_value = match first_msg {
194 WeaveMessage::Cancel => {
195 // Weave was cancelled before it started - clean exit
196 // Don't call cleanup_strand since we never activated
197 crate::arena::arena_reset();
198 return;
199 }
200 WeaveMessage::Value(v) => v,
201 WeaveMessage::Done => {
202 // Shouldn't happen - Done is sent on yield_chan
203 // Don't call cleanup_strand since we never activated
204 return;
205 }
206 };
207
208 // NOW we're activated - increment ACTIVE_STRANDS
209 // From this point on, we must call cleanup_strand on exit
210 ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
211
212 // Push WeaveCtx onto stack
213 let weave_ctx = Value::WeaveCtx {
214 yield_chan: weave_ctx_yield.clone(),
215 resume_chan: weave_ctx_resume.clone(),
216 };
217 let stack_with_ctx = push(child_base, weave_ctx);
218 let stack_with_value = push(stack_with_ctx, first_value);
219
220 // Execute the closure
221 let final_stack = fn_ref(stack_with_value, env_clone.as_ptr(), env_clone.len());
222
223 // Signal completion
224 let (_, ctx_value) = pop(final_stack);
225 if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
226 let _ = yield_chan.sender.send(WeaveMessage::Done);
227 }
228
229 crate::arena::arena_reset();
230 cleanup_strand();
231 });
232 }
233 }
234 _ => {
235 eprintln!(
236 "strand.weave: expected Quotation or Closure, got {:?} (compiler bug or memory corruption)",
237 quot_value
238 );
239 std::process::abort();
240 }
241 }
242
243 // Return WeaveHandle (contains both channels for resume to use)
244 let handle = Value::WeaveCtx {
245 yield_chan: handle_yield,
246 resume_chan: handle_resume,
247 };
248 unsafe { push(stack, handle) }
249}