seq_runtime/weave.rs
1//! Weave operations for generator/coroutine-style concurrency
2//!
3//! A "weave" is a strand that can yield values back to its caller and be resumed.
4//! Unlike regular strands (fire-and-forget), weaves allow bidirectional communication
5//! with structured yield/resume semantics.
6//!
7//! ## Zero-Mutex Design
8//!
9//! Like channels, weaves pass their communication handles directly on the stack.
10//! There is NO global registry and NO mutex contention. The weave context travels
11//! with the stack values.
12//!
13//! ## API
14//!
15//! - `strand.weave`: ( Quotation -- WeaveHandle ) - creates a woven strand, returns handle
16//! - `strand.resume`: ( WeaveHandle a -- WeaveHandle a Bool ) - resume with value
17//! - `strand.weave-cancel`: ( WeaveHandle -- ) - cancel a weave and release its resources
18//! - `yield`: ( WeaveCtx a -- WeaveCtx a ) - yield a value (only valid inside weave)
19//!
20//! ## Architecture
21//!
22//! Each weave has two internal channels that travel as values:
23//! - The WeaveHandle (returned to caller) contains the yield_chan for receiving
24//! - The WeaveCtx (on weave's stack) contains both channels for yield to use
25//!
26//! Flow:
27//! 1. strand.weave creates channels, spawns coroutine with WeaveCtx on stack
28//! 2. The coroutine waits on resume_chan for the first resume value
29//! 3. Caller calls strand.resume with WeaveHandle, sending value to resume_chan
30//! 4. Coroutine wakes, receives value, runs until yield
31//! 5. yield uses WeaveCtx to send/receive, returns with new resume value
32//! 6. When quotation returns, WeaveCtx signals completion
33//!
34//! ## Resource Management
35//!
36//! **Important:** Weaves must either be resumed until completion OR explicitly
37//! cancelled with `strand.weave-cancel`. Dropping a WeaveHandle without doing
38//! either will cause the spawned coroutine to hang forever waiting on resume_chan.
39//!
40//! Proper cleanup options:
41//!
42//! **Option 1: Resume until completion**
43//! ```seq
44//! [ generator-body ] strand.weave # Create weave
45//! 0 strand.resume # Resume until...
46//! if # ...has_more is false
47//! # process value...
48//! drop 0 strand.resume # Keep resuming
49//! else
50//! drop drop # Clean up when done
51//! then
52//! ```
53//!
54//! **Option 2: Explicit cancellation**
55//! ```seq
56//! [ generator-body ] strand.weave # Create weave
57//! 0 strand.resume # Get first value
58//! if
59//! drop # We only needed the first value
60//! strand.weave-cancel # Cancel and clean up
61//! else
62//! drop drop
63//! then
64//! ```
65//!
66//! ## Limitations
67//!
68//! - Yielding `i64::MIN` (-9223372036854775808) is not supported as it's used
69//! as the completion sentinel. This value will be interpreted as weave completion.
70//! - Yielding `i64::MIN + 1` (-9223372036854775807) is not supported as it's used
71//! as the cancellation sentinel.
72
73use crate::stack::{Stack, pop, push};
74use crate::tagged_stack::StackValue;
75use crate::value::{ChannelData, Value};
76use may::sync::mpmc;
77use std::sync::Arc;
78
79/// Sentinel value to signal weave completion.
80///
81/// **Warning:** This means `i64::MIN` cannot be yielded from a weave.
82/// If a weave yields this exact value, it will be misinterpreted as completion.
83/// This is an extremely unlikely edge case (would require yielding -9223372036854775808).
84const DONE_SENTINEL: i64 = i64::MIN;
85
86/// Sentinel value to signal weave cancellation.
87/// Sent by strand.weave-cancel to wake a blocked weave and tell it to exit.
88const CANCEL_SENTINEL: i64 = i64::MIN + 1;
89
90/// Create a woven strand from a quotation
91///
92/// Stack effect: ( Quotation -- WeaveHandle )
93///
94/// Creates a weave from the quotation. The weave is initially suspended,
95/// waiting to be resumed with the first value. The quotation will receive
96/// a WeaveCtx on its stack that it must pass to yield operations.
97///
98/// Returns a WeaveHandle that the caller uses with strand.resume.
99///
100/// # Safety
101/// Stack must have a Quotation on top
102#[unsafe(no_mangle)]
103pub unsafe extern "C" fn patch_seq_weave(stack: Stack) -> Stack {
104 // Create the two internal channels - NO registry, just Arc values
105 let (yield_sender, yield_receiver) = mpmc::channel();
106 let yield_chan = Arc::new(ChannelData {
107 sender: yield_sender,
108 receiver: yield_receiver,
109 });
110
111 let (resume_sender, resume_receiver) = mpmc::channel();
112 let resume_chan = Arc::new(ChannelData {
113 sender: resume_sender,
114 receiver: resume_receiver,
115 });
116
117 // Pop the quotation from stack
118 let (stack, quot_value) = unsafe { pop(stack) };
119
120 // Clone channels for the spawned strand's WeaveCtx
121 let weave_ctx_yield = Arc::clone(&yield_chan);
122 let weave_ctx_resume = Arc::clone(&resume_chan);
123
124 // Clone for the WeaveHandle returned to caller
125 let handle_yield = Arc::clone(&yield_chan);
126 let handle_resume = Arc::clone(&resume_chan);
127
128 match quot_value {
129 Value::Quotation { wrapper, .. } => {
130 if wrapper == 0 {
131 panic!("strand.weave: quotation wrapper function pointer is null");
132 }
133
134 use crate::scheduler::ACTIVE_STRANDS;
135 use may::coroutine;
136 use std::sync::atomic::Ordering;
137
138 let fn_ptr: extern "C" fn(Stack) -> Stack = unsafe { std::mem::transmute(wrapper) };
139
140 // Clone the stack for the child
141 let (child_stack, child_base) = unsafe { crate::stack::clone_stack_with_base(stack) };
142
143 // Convert pointers to usize (which is Send)
144 let stack_addr = child_stack as usize;
145 let base_addr = child_base as usize;
146
147 ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
148
149 unsafe {
150 coroutine::spawn(move || {
151 let child_stack = stack_addr as *mut StackValue;
152 let child_base = base_addr as *mut StackValue;
153
154 if !child_base.is_null() {
155 crate::stack::patch_seq_set_stack_base(child_base);
156 }
157
158 // Wait for first resume value before executing
159 let first_value = match weave_ctx_resume.receiver.recv() {
160 Ok(v) => v,
161 Err(_) => {
162 cleanup_strand();
163 return;
164 }
165 };
166
167 // Check for cancellation before starting
168 if let Value::Int(v) = &first_value
169 && *v == CANCEL_SENTINEL
170 {
171 // Weave was cancelled before it started - clean exit
172 crate::arena::arena_reset();
173 cleanup_strand();
174 return;
175 }
176
177 // Push WeaveCtx onto stack (yield_chan, resume_chan as a pair)
178 // We use a Variant to bundle both channels
179 let weave_ctx = Value::WeaveCtx {
180 yield_chan: weave_ctx_yield.clone(),
181 resume_chan: weave_ctx_resume.clone(),
182 };
183 let stack_with_ctx = push(child_stack, weave_ctx);
184
185 // Push the first resume value
186 let stack_with_value = push(stack_with_ctx, first_value);
187
188 // Execute the quotation - it receives (WeaveCtx, resume_value)
189 let final_stack = fn_ptr(stack_with_value);
190
191 // Quotation returned - pop WeaveCtx and signal completion
192 let (_, ctx_value) = pop(final_stack);
193 if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
194 let _ = yield_chan.sender.send(Value::Int(DONE_SENTINEL));
195 }
196
197 crate::arena::arena_reset();
198 cleanup_strand();
199 });
200 }
201 }
202 Value::Closure { fn_ptr, env } => {
203 if fn_ptr == 0 {
204 panic!("strand.weave: closure function pointer is null");
205 }
206
207 use crate::scheduler::ACTIVE_STRANDS;
208 use may::coroutine;
209 use std::sync::atomic::Ordering;
210
211 let fn_ref: extern "C" fn(Stack, *const Value, usize) -> Stack =
212 unsafe { std::mem::transmute(fn_ptr) };
213 let env_clone: Vec<Value> = env.iter().cloned().collect();
214
215 let child_base = crate::stack::alloc_stack();
216 let base_addr = child_base as usize;
217
218 ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
219
220 unsafe {
221 coroutine::spawn(move || {
222 let child_base = base_addr as *mut StackValue;
223 crate::stack::patch_seq_set_stack_base(child_base);
224
225 // Wait for first resume value
226 let first_value = match weave_ctx_resume.receiver.recv() {
227 Ok(v) => v,
228 Err(_) => {
229 cleanup_strand();
230 return;
231 }
232 };
233
234 // Check for cancellation before starting
235 if let Value::Int(v) = &first_value
236 && *v == CANCEL_SENTINEL
237 {
238 // Weave was cancelled before it started - clean exit
239 crate::arena::arena_reset();
240 cleanup_strand();
241 return;
242 }
243
244 // Push WeaveCtx onto stack
245 let weave_ctx = Value::WeaveCtx {
246 yield_chan: weave_ctx_yield.clone(),
247 resume_chan: weave_ctx_resume.clone(),
248 };
249 let stack_with_ctx = push(child_base, weave_ctx);
250 let stack_with_value = push(stack_with_ctx, first_value);
251
252 // Execute the closure
253 let final_stack = fn_ref(stack_with_value, env_clone.as_ptr(), env_clone.len());
254
255 // Signal completion
256 let (_, ctx_value) = pop(final_stack);
257 if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
258 let _ = yield_chan.sender.send(Value::Int(DONE_SENTINEL));
259 }
260
261 crate::arena::arena_reset();
262 cleanup_strand();
263 });
264 }
265 }
266 _ => panic!(
267 "strand.weave: expected Quotation or Closure, got {:?}",
268 quot_value
269 ),
270 }
271
272 // Return WeaveHandle (contains both channels for resume to use)
273 let handle = Value::WeaveCtx {
274 yield_chan: handle_yield,
275 resume_chan: handle_resume,
276 };
277 unsafe { push(stack, handle) }
278}
279
280/// Helper to clean up strand on exit
281fn cleanup_strand() {
282 use crate::scheduler::{ACTIVE_STRANDS, SHUTDOWN_CONDVAR, SHUTDOWN_MUTEX, TOTAL_COMPLETED};
283 use std::sync::atomic::Ordering;
284
285 let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
286 TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
287
288 if prev_count == 1 {
289 let _guard = SHUTDOWN_MUTEX
290 .lock()
291 .expect("weave: shutdown mutex poisoned");
292 SHUTDOWN_CONDVAR.notify_all();
293 }
294}
295
296/// Resume a woven strand with a value
297///
298/// Stack effect: ( WeaveHandle a -- WeaveHandle a Bool )
299///
300/// Sends value `a` to the weave and waits for it to yield.
301/// Returns (handle, yielded_value, has_more).
302/// - has_more = true: weave yielded a value
303/// - has_more = false: weave completed
304///
305/// # Safety
306/// Stack must have a value on top and WeaveHandle below it
307#[unsafe(no_mangle)]
308pub unsafe extern "C" fn patch_seq_resume(stack: Stack) -> Stack {
309 assert!(!stack.is_null(), "strand.resume: stack is empty");
310
311 // Pop the value to send
312 let (stack, value) = unsafe { pop(stack) };
313
314 // Pop the WeaveHandle
315 let (stack, handle) = unsafe { pop(stack) };
316
317 let (yield_chan, resume_chan) = match &handle {
318 Value::WeaveCtx {
319 yield_chan,
320 resume_chan,
321 } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
322 _ => panic!("strand.resume: expected WeaveHandle, got {:?}", handle),
323 };
324
325 // Clone value for sending
326 let value_to_send = value.clone();
327
328 // Send resume value to the weave
329 if resume_chan.sender.send(value_to_send).is_err() {
330 // Channel closed - weave is done
331 let stack = unsafe { push(stack, handle) };
332 let stack = unsafe { push(stack, Value::Int(0)) };
333 return unsafe { push(stack, Value::Bool(false)) };
334 }
335
336 // Wait for yielded value
337 match yield_chan.receiver.recv() {
338 Ok(yielded) => {
339 // Check for Done sentinel
340 if let Value::Int(DONE_SENTINEL) = yielded {
341 // Weave completed
342 let stack = unsafe { push(stack, handle) };
343 let stack = unsafe { push(stack, Value::Int(0)) };
344 unsafe { push(stack, Value::Bool(false)) }
345 } else {
346 // Normal yield
347 let stack = unsafe { push(stack, handle) };
348 let stack = unsafe { push(stack, yielded) };
349 unsafe { push(stack, Value::Bool(true)) }
350 }
351 }
352 Err(_) => {
353 // Channel closed unexpectedly
354 let stack = unsafe { push(stack, handle) };
355 let stack = unsafe { push(stack, Value::Int(0)) };
356 unsafe { push(stack, Value::Bool(false)) }
357 }
358 }
359}
360
361/// Yield a value from within a woven strand
362///
363/// Stack effect: ( WeaveCtx a -- WeaveCtx a )
364///
365/// Sends value `a` to the caller and waits for the next resume value.
366/// The WeaveCtx must be passed through - it contains the channels.
367///
368/// # Panics
369/// Panics if WeaveCtx is not on the stack.
370///
371/// # Safety
372/// Stack must have a value on top and WeaveCtx below it
373#[unsafe(no_mangle)]
374pub unsafe extern "C" fn patch_seq_yield(stack: Stack) -> Stack {
375 assert!(!stack.is_null(), "yield: stack is empty");
376
377 // Pop the value to yield
378 let (stack, value) = unsafe { pop(stack) };
379
380 // Pop the WeaveCtx
381 let (stack, ctx) = unsafe { pop(stack) };
382
383 let (yield_chan, resume_chan) = match &ctx {
384 Value::WeaveCtx {
385 yield_chan,
386 resume_chan,
387 } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
388 _ => panic!(
389 "yield: expected WeaveCtx on stack, got {:?}. yield can only be called inside strand.weave with context threaded through.",
390 ctx
391 ),
392 };
393
394 // Clone value for sending
395 let value_to_send = value.clone();
396
397 // Send the yielded value
398 if yield_chan.sender.send(value_to_send).is_err() {
399 panic!("yield: yield channel closed unexpectedly");
400 }
401
402 // Wait for resume value
403 let resume_value = match resume_chan.receiver.recv() {
404 Ok(v) => v,
405 Err(_) => panic!("yield: resume channel closed unexpectedly"),
406 };
407
408 // Check for cancellation
409 if let Value::Int(v) = &resume_value
410 && *v == CANCEL_SENTINEL
411 {
412 // Weave was cancelled - signal completion and exit
413 let _ = yield_chan.sender.send(Value::Int(DONE_SENTINEL));
414 crate::arena::arena_reset();
415 cleanup_strand();
416 // Block this coroutine forever - it's already marked as completed.
417 // We can't panic because that would cross an extern "C" boundary (UB).
418 // Instead, we block on an empty channel that will never receive.
419 let (_, rx): (mpmc::Sender<()>, mpmc::Receiver<()>) = mpmc::channel();
420 let _ = rx.recv();
421 // Unreachable - but satisfy the compiler's return type
422 unreachable!("cancelled weave should block forever");
423 }
424
425 // Push WeaveCtx back, then resume value
426 let stack = unsafe { push(stack, ctx) };
427 unsafe { push(stack, resume_value) }
428}
429
430/// Cancel a weave, releasing its resources
431///
432/// Stack effect: ( WeaveHandle -- )
433///
434/// Sends a cancellation signal to the weave, causing it to exit cleanly.
435/// This is necessary to avoid resource leaks when abandoning a weave
436/// before it completes naturally.
437///
438/// If the weave is:
439/// - Waiting for first resume: exits immediately
440/// - Waiting inside yield: receives cancel signal and can exit
441/// - Already completed: no effect (signal is ignored)
442///
443/// # Safety
444/// Stack must have a WeaveHandle (WeaveCtx) on top
445#[unsafe(no_mangle)]
446pub unsafe extern "C" fn patch_seq_weave_cancel(stack: Stack) -> Stack {
447 assert!(!stack.is_null(), "strand.weave-cancel: stack is null");
448
449 // Pop the WeaveHandle
450 let (stack, handle) = unsafe { pop(stack) };
451
452 // Extract the resume channel to send cancel signal
453 match handle {
454 Value::WeaveCtx { resume_chan, .. } => {
455 // Send cancel signal - if this fails, weave is already done (fine)
456 let _ = resume_chan.sender.send(Value::Int(CANCEL_SENTINEL));
457 }
458 _ => panic!(
459 "strand.weave-cancel: expected WeaveHandle, got {:?}",
460 handle
461 ),
462 }
463
464 // Handle is consumed (dropped), stack returned without it
465 stack
466}
467
468// Public re-exports
469pub use patch_seq_resume as resume;
470pub use patch_seq_weave as weave;
471pub use patch_seq_weave_cancel as weave_cancel;
472pub use patch_seq_yield as weave_yield;