Skip to main content

seq_runtime/weave/
spawn.rs

1//! Weave creation: `patch_seq_weave` spawns a coroutine from a quotation or
2//! closure and returns a handle the caller uses with `strand.resume`.
3
4use crate::stack::{Stack, pop, push};
5use crate::tagged_stack::StackValue;
6use crate::value::{Value, WeaveChannelData, WeaveMessage};
7use may::sync::mpmc;
8use std::sync::Arc;
9
10use super::strand_lifecycle::cleanup_strand;
11
12/// Create a woven strand from a quotation
13///
14/// Stack effect: ( Quotation -- WeaveHandle )
15///
16/// Creates a weave from the quotation. The weave is initially suspended,
17/// waiting to be resumed with the first value. The quotation will receive
18/// a WeaveCtx on its stack that it must pass to yield operations.
19///
20/// Returns a WeaveHandle that the caller uses with strand.resume.
21///
22/// # Error Handling
23///
24/// This function never panics (panicking in extern "C" is UB). On fatal error
25/// (null stack, null function pointer, type mismatch), it prints an error
26/// and aborts the process.
27///
28/// # Safety
29/// Stack must have a Quotation on top
30#[unsafe(no_mangle)]
31pub unsafe extern "C" fn patch_seq_weave(stack: Stack) -> Stack {
32    // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
33    if stack.is_null() {
34        eprintln!("strand.weave: stack is null (fatal programming error)");
35        std::process::abort();
36    }
37
38    // Create the two internal channels - NO registry, just Arc values
39    // Uses WeaveMessage for type-safe control flow (no sentinel values)
40    let (yield_sender, yield_receiver) = mpmc::channel();
41    let yield_chan = Arc::new(WeaveChannelData {
42        sender: yield_sender,
43        receiver: yield_receiver,
44    });
45
46    let (resume_sender, resume_receiver) = mpmc::channel();
47    let resume_chan = Arc::new(WeaveChannelData {
48        sender: resume_sender,
49        receiver: resume_receiver,
50    });
51
52    // Pop the quotation from stack
53    let (stack, quot_value) = unsafe { pop(stack) };
54
55    // Clone channels for the spawned strand's WeaveCtx
56    let weave_ctx_yield = Arc::clone(&yield_chan);
57    let weave_ctx_resume = Arc::clone(&resume_chan);
58
59    // Clone for the WeaveHandle returned to caller
60    let handle_yield = Arc::clone(&yield_chan);
61    let handle_resume = Arc::clone(&resume_chan);
62
63    match quot_value {
64        Value::Quotation { wrapper, .. } => {
65            if wrapper == 0 {
66                eprintln!(
67                    "strand.weave: quotation wrapper function pointer is null (compiler bug)"
68                );
69                std::process::abort();
70            }
71
72            use crate::scheduler::ACTIVE_STRANDS;
73            use may::coroutine;
74            use std::sync::atomic::Ordering;
75
76            let fn_ptr: extern "C" fn(Stack) -> Stack = unsafe { std::mem::transmute(wrapper) };
77
78            // Clone the stack for the child
79            let (child_stack, child_base) = unsafe { crate::stack::clone_stack_with_base(stack) };
80
81            // Convert pointers to usize (which is Send)
82            let stack_addr = child_stack as usize;
83            let base_addr = child_base as usize;
84
85            // NOTE: We do NOT increment ACTIVE_STRANDS here!
86            // The weave is "dormant" until first resume. This allows the scheduler
87            // to exit cleanly if a weave is created but never resumed (fixes #287).
88            // ACTIVE_STRANDS is incremented only after receiving the first resume.
89
90            unsafe {
91                coroutine::spawn(move || {
92                    let child_stack = stack_addr as *mut StackValue;
93                    let child_base = base_addr as *mut StackValue;
94
95                    if !child_base.is_null() {
96                        crate::stack::patch_seq_set_stack_base(child_base);
97                    }
98
99                    // Wait for first resume value before executing
100                    // The weave is dormant at this point - not counted in ACTIVE_STRANDS
101                    let first_msg = match weave_ctx_resume.receiver.recv() {
102                        Ok(msg) => msg,
103                        Err(_) => {
104                            // Channel closed before we were resumed - just exit
105                            // Don't call cleanup_strand since we never activated
106                            return;
107                        }
108                    };
109
110                    // Check for cancellation before starting
111                    let first_value = match first_msg {
112                        WeaveMessage::Cancel => {
113                            // Weave was cancelled before it started - clean exit
114                            // Don't call cleanup_strand since we never activated
115                            crate::arena::arena_reset();
116                            return;
117                        }
118                        WeaveMessage::Value(v) => v,
119                        WeaveMessage::Done => {
120                            // Shouldn't happen - Done is sent on yield_chan
121                            // Don't call cleanup_strand since we never activated
122                            return;
123                        }
124                    };
125
126                    // NOW we're activated - increment ACTIVE_STRANDS
127                    // From this point on, we must call cleanup_strand on exit
128                    ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
129
130                    // Push WeaveCtx onto stack (yield_chan, resume_chan as a pair)
131                    let weave_ctx = Value::WeaveCtx {
132                        yield_chan: weave_ctx_yield.clone(),
133                        resume_chan: weave_ctx_resume.clone(),
134                    };
135                    let stack_with_ctx = push(child_stack, weave_ctx);
136
137                    // Push the first resume value
138                    let stack_with_value = push(stack_with_ctx, first_value);
139
140                    // Execute the quotation - it receives (WeaveCtx, resume_value)
141                    let final_stack = fn_ptr(stack_with_value);
142
143                    // Quotation returned - pop WeaveCtx and signal completion
144                    let (_, ctx_value) = pop(final_stack);
145                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
146                        let _ = yield_chan.sender.send(WeaveMessage::Done);
147                    }
148
149                    crate::arena::arena_reset();
150                    cleanup_strand();
151                });
152            }
153        }
154        Value::Closure { fn_ptr, env } => {
155            if fn_ptr == 0 {
156                eprintln!("strand.weave: closure function pointer is null (compiler bug)");
157                std::process::abort();
158            }
159
160            use crate::scheduler::ACTIVE_STRANDS;
161            use may::coroutine;
162            use std::sync::atomic::Ordering;
163
164            let fn_ref: extern "C" fn(Stack, *const Value, usize) -> Stack =
165                unsafe { std::mem::transmute(fn_ptr) };
166            let env_clone: Vec<Value> = env.iter().cloned().collect();
167
168            let child_base = crate::stack::alloc_stack();
169            let base_addr = child_base as usize;
170
171            // NOTE: We do NOT increment ACTIVE_STRANDS here!
172            // The weave is "dormant" until first resume. This allows the scheduler
173            // to exit cleanly if a weave is created but never resumed (fixes #287).
174            // ACTIVE_STRANDS is incremented only after receiving the first resume.
175
176            unsafe {
177                coroutine::spawn(move || {
178                    let child_base = base_addr as *mut StackValue;
179                    crate::stack::patch_seq_set_stack_base(child_base);
180
181                    // Wait for first resume value
182                    // The weave is dormant at this point - not counted in ACTIVE_STRANDS
183                    let first_msg = match weave_ctx_resume.receiver.recv() {
184                        Ok(msg) => msg,
185                        Err(_) => {
186                            // Channel closed before we were resumed - just exit
187                            // Don't call cleanup_strand since we never activated
188                            return;
189                        }
190                    };
191
192                    // Check for cancellation before starting
193                    let first_value = match first_msg {
194                        WeaveMessage::Cancel => {
195                            // Weave was cancelled before it started - clean exit
196                            // Don't call cleanup_strand since we never activated
197                            crate::arena::arena_reset();
198                            return;
199                        }
200                        WeaveMessage::Value(v) => v,
201                        WeaveMessage::Done => {
202                            // Shouldn't happen - Done is sent on yield_chan
203                            // Don't call cleanup_strand since we never activated
204                            return;
205                        }
206                    };
207
208                    // NOW we're activated - increment ACTIVE_STRANDS
209                    // From this point on, we must call cleanup_strand on exit
210                    ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
211
212                    // Push WeaveCtx onto stack
213                    let weave_ctx = Value::WeaveCtx {
214                        yield_chan: weave_ctx_yield.clone(),
215                        resume_chan: weave_ctx_resume.clone(),
216                    };
217                    let stack_with_ctx = push(child_base, weave_ctx);
218                    let stack_with_value = push(stack_with_ctx, first_value);
219
220                    // Execute the closure
221                    let final_stack = fn_ref(stack_with_value, env_clone.as_ptr(), env_clone.len());
222
223                    // Signal completion
224                    let (_, ctx_value) = pop(final_stack);
225                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
226                        let _ = yield_chan.sender.send(WeaveMessage::Done);
227                    }
228
229                    crate::arena::arena_reset();
230                    cleanup_strand();
231                });
232            }
233        }
234        _ => {
235            eprintln!(
236                "strand.weave: expected Quotation or Closure, got {:?} (compiler bug or memory corruption)",
237                quot_value
238            );
239            std::process::abort();
240        }
241    }
242
243    // Return WeaveHandle (contains both channels for resume to use)
244    let handle = Value::WeaveCtx {
245        yield_chan: handle_yield,
246        resume_chan: handle_resume,
247    };
248    unsafe { push(stack, handle) }
249}