seq_runtime/weave.rs
//! Weave operations for generator/coroutine-style concurrency
//!
//! A "weave" is a strand that can yield values back to its caller and be resumed.
//! Unlike regular strands (fire-and-forget), weaves allow bidirectional communication
//! with structured yield/resume semantics.
//!
//! ## Zero-Mutex Design
//!
//! Like channels, weaves pass their communication handles directly on the stack.
//! There is NO global registry and NO mutex contention. The weave context travels
//! with the stack values.
//!
//! ## API
//!
//! - `strand.weave`: ( Quotation -- WeaveHandle ) - creates a woven strand, returns handle
//! - `strand.resume`: ( WeaveHandle a -- WeaveHandle a Bool ) - resume with value
//! - `strand.weave-cancel`: ( WeaveHandle -- ) - cancel a weave and release its resources
//! - `yield`: ( WeaveCtx a -- WeaveCtx a ) - yield a value (only valid inside weave)
//!
//! ## Architecture
//!
//! Each weave has two internal channels that travel as values:
//! - The WeaveHandle (returned to caller) holds both channels: resume_chan for
//!   sending resume values and yield_chan for receiving yielded values
//! - The WeaveCtx (on weave's stack) holds the same two channels for yield to use
//!
//! Flow:
//! 1. strand.weave creates channels, spawns coroutine with WeaveCtx on stack
//! 2. The coroutine waits on resume_chan for the first resume value
//! 3. Caller calls strand.resume with WeaveHandle, sending value to resume_chan
//! 4. Coroutine wakes, receives value, runs until yield
//! 5. yield uses WeaveCtx to send/receive, returns with new resume value
//! 6. When quotation returns, WeaveCtx signals completion
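/*!
The flow above can be sketched outside the runtime. This is a minimal
illustration, not the runtime's implementation: it assumes OS threads and
`std::sync::mpsc` in place of May coroutines and `mpmc`, an `i64` payload in
place of `Value`, and the hypothetical names `Msg`, `Handle`, `spawn_weave`,
and `resume`.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Hypothetical stand-in for the runtime's WeaveMessage, with an i64 payload.
#[derive(Debug)]
enum Msg {
    Value(i64),
    Done,
}

// Caller-side view: sends on the resume channel, receives on the yield channel.
struct Handle {
    resume_tx: Sender<Msg>,
    yield_rx: Receiver<Msg>,
}

// Steps 1-2: create both channels and start a body that waits for a resume.
// The body yields `input + 1` for its first two resumes, then completes.
fn spawn_weave() -> Handle {
    let (resume_tx, resume_rx) = mpsc::channel::<Msg>();
    let (yield_tx, yield_rx) = mpsc::channel::<Msg>();
    thread::spawn(move || {
        let mut remaining = 2;
        // Steps 2-4: block until a resume value arrives, then run.
        while let Ok(Msg::Value(input)) = resume_rx.recv() {
            if remaining == 0 {
                break; // the body has nothing left to yield
            }
            remaining -= 1;
            // Step 5: yield sends a value, then loops back to wait for a resume.
            if yield_tx.send(Msg::Value(input + 1)).is_err() {
                return;
            }
        }
        // Step 6: the body returned, so signal completion.
        let _ = yield_tx.send(Msg::Done);
    });
    Handle { resume_tx, yield_rx }
}

// Caller side of steps 3-5: Some(value) while the weave has more, None once done.
fn resume(handle: &Handle, value: i64) -> Option<i64> {
    if handle.resume_tx.send(Msg::Value(value)).is_err() {
        return None;
    }
    match handle.yield_rx.recv() {
        Ok(Msg::Value(yielded)) => Some(yielded),
        _ => None,
    }
}
```

Resuming the sketch with 10, 20, and then 30 yields 11 and 21 and then reports
completion, which mirrors the `has_more` flag returned by `strand.resume`.
*/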
//!
//! ## Resource Management
//!
//! **Best practice:** Weaves should either be resumed until completion OR explicitly
//! cancelled with `strand.weave-cancel` to cleanly release resources.
//!
//! However, dropping a WeaveHandle without doing either is safe - the program will
//! still exit normally. The un-resumed weave is "dormant" (not counted as an active
//! strand) until its first resume, so it won't block program shutdown. The dormant
//! coroutine will be cleaned up when the program exits.
//!
//! **Resource implications of dormant weaves:** Each dormant weave consumes memory
//! for its coroutine stack (default 128KB, configurable via SEQ_STACK_SIZE) until
//! program exit. For short-lived programs or REPL sessions this is fine, but
//! long-running servers should properly cancel weaves to avoid accumulating memory.
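/*!
The dormant accounting described above can be pictured with a plain atomic
counter. This is a minimal sketch that assumes a hypothetical `ActiveCounter`
wrapper in place of the scheduler's `ACTIVE_STRANDS` static; the real shutdown
logic lives in the scheduler and is not shown in this file.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the scheduler's ACTIVE_STRANDS counter.
struct ActiveCounter(AtomicUsize);

impl ActiveCounter {
    fn new() -> Self {
        ActiveCounter(AtomicUsize::new(0))
    }

    // A weave received a resume value and is about to run user code.
    fn activate(&self) {
        self.0.fetch_add(1, Ordering::AcqRel);
    }

    // A weave parked inside yield or finished its body.
    fn deactivate(&self) {
        self.0.fetch_sub(1, Ordering::AcqRel);
    }

    // Shutdown is possible once no strand is running user code.
    fn can_shut_down(&self) -> bool {
        self.0.load(Ordering::Acquire) == 0
    }
}
```

A weave that is created but never resumed never calls `activate`, so in this
sketch it cannot hold `can_shut_down` at `false`.
*/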
//!
//! Proper cleanup options:
//!
//! **Option 1: Resume until completion**
//! ```seq
//! [ generator-body ] strand.weave  # Create weave
//! 0 strand.resume                  # Resume until...
//! if                               # ...has_more is false
//!   # process value...
//!   drop 0 strand.resume           # Keep resuming
//! else
//!   drop drop                      # Clean up when done
//! then
//! ```
//!
//! **Option 2: Explicit cancellation**
//! ```seq
//! [ generator-body ] strand.weave  # Create weave
//! 0 strand.resume                  # Get first value
//! if
//!   drop                           # We only needed the first value
//!   strand.weave-cancel            # Cancel and clean up
//! else
//!   drop drop
//! then
//! ```
//!
//! ## Implementation Notes
//!
//! Control flow (completion, cancellation) is handled via a type-safe `WeaveMessage`
//! enum rather than sentinel values. This means **any** Value can be safely yielded
//! and resumed, including edge cases like `i64::MIN`.
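/*!
The difference between a sentinel and an enum can be shown in a few lines.
This is a sketch under stated assumptions: `Message`, `decode_sentinel`, and
`decode_message` are hypothetical names, and the payload is simplified to
`i64` rather than the runtime's `Value`.

```rust
// Hypothetical stand-in for the runtime's message type.
#[derive(Debug)]
enum Message {
    Value(i64),
    Done,
}

// Sentinel style: i64::MIN is reserved to mean "done", so it cannot be yielded.
fn decode_sentinel(raw: i64) -> Option<i64> {
    if raw == i64::MIN { None } else { Some(raw) }
}

// Enum style: completion is a separate variant, so every i64 stays a valid payload.
fn decode_message(msg: Message) -> Option<i64> {
    match msg {
        Message::Value(v) => Some(v),
        Message::Done => None,
    }
}
```

With the sentinel decoder a yielded `i64::MIN` is indistinguishable from
completion, while the enum decoder returns it unchanged.
*/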
//!
//! ## Error Handling
//!
//! All weave functions are `extern "C"` and never panic (panicking across FFI is UB).
//!
//! - **Type mismatches** (e.g., `strand.resume` without a WeaveHandle): These indicate
//!   a compiler bug or memory corruption. The function prints an error to stderr and
//!   calls `std::process::abort()` to terminate immediately.
//!
//! - **Channel errors in `yield`**: If channels close unexpectedly while a coroutine
//!   is yielding, the coroutine cleans up and blocks forever. The main program can
//!   still terminate normally since the strand is marked as completed.
//!
//! - **Channel errors in `resume`**: Returns `(handle, placeholder, false)` to indicate
//!   the weave has completed or failed. The caller should check the Bool result.
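/*!
The no-panic rule also covers helper code reached from these functions. Lock
acquisition is one example: `std::sync::Mutex::lock` returns an error once the
mutex is poisoned, and unwrapping that error would panic. A minimal sketch of
a non-panicking acquisition, assuming a `std::sync::Mutex` and the
hypothetical helper name `lock_ignoring_poison`:

```rust
use std::sync::{Mutex, MutexGuard};

// Hypothetical helper: acquire the lock without ever panicking.
// A poisoned mutex still hands back its guard via PoisonError::into_inner.
fn lock_ignoring_poison<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    mutex.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}
```

Whether ignoring poison is acceptable depends on what the mutex protects; it
fits a lock that only serializes a notification, and is a poor fit for data
that a panicking writer may have left half-updated.
*/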

use crate::stack::{Stack, pop, push};
use crate::tagged_stack::StackValue;
use crate::value::{Value, WeaveChannelData, WeaveMessage};
use may::sync::mpmc;
use std::sync::Arc;

/// Create a woven strand from a quotation
///
/// Stack effect: ( Quotation -- WeaveHandle )
///
/// Creates a weave from the quotation. The weave is initially suspended,
/// waiting to be resumed with the first value. The quotation will receive
/// a WeaveCtx on its stack that it must pass to yield operations.
///
/// Returns a WeaveHandle that the caller uses with strand.resume.
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On fatal error
/// (null stack, null function pointer, type mismatch), it prints an error
/// and aborts the process.
///
/// # Safety
/// Stack must have a Quotation (or Closure) on top
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_weave(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
    if stack.is_null() {
        eprintln!("strand.weave: stack is null (fatal programming error)");
        std::process::abort();
    }

    // Create the two internal channels - NO registry, just Arc values
    // Uses WeaveMessage for type-safe control flow (no sentinel values)
    let (yield_sender, yield_receiver) = mpmc::channel();
    let yield_chan = Arc::new(WeaveChannelData {
        sender: yield_sender,
        receiver: yield_receiver,
    });

    let (resume_sender, resume_receiver) = mpmc::channel();
    let resume_chan = Arc::new(WeaveChannelData {
        sender: resume_sender,
        receiver: resume_receiver,
    });

    // Pop the quotation from stack
    let (stack, quot_value) = unsafe { pop(stack) };

    // Clone channels for the spawned strand's WeaveCtx
    let weave_ctx_yield = Arc::clone(&yield_chan);
    let weave_ctx_resume = Arc::clone(&resume_chan);

    // Clone for the WeaveHandle returned to caller
    let handle_yield = Arc::clone(&yield_chan);
    let handle_resume = Arc::clone(&resume_chan);

    match quot_value {
        Value::Quotation { wrapper, .. } => {
            if wrapper == 0 {
                eprintln!(
                    "strand.weave: quotation wrapper function pointer is null (compiler bug)"
                );
                std::process::abort();
            }

            use crate::scheduler::ACTIVE_STRANDS;
            use may::coroutine;
            use std::sync::atomic::Ordering;

            let fn_ptr: extern "C" fn(Stack) -> Stack = unsafe { std::mem::transmute(wrapper) };

            // Clone the stack for the child
            let (child_stack, child_base) = unsafe { crate::stack::clone_stack_with_base(stack) };

            // Convert pointers to usize (which is Send)
            let stack_addr = child_stack as usize;
            let base_addr = child_base as usize;

            // NOTE: We do NOT increment ACTIVE_STRANDS here!
            // The weave is "dormant" until first resume. This allows the scheduler
            // to exit cleanly if a weave is created but never resumed (fixes #287).
            // ACTIVE_STRANDS is incremented only after receiving the first resume.

            unsafe {
                coroutine::spawn(move || {
                    let child_stack = stack_addr as *mut StackValue;
                    let child_base = base_addr as *mut StackValue;

                    if !child_base.is_null() {
                        crate::stack::patch_seq_set_stack_base(child_base);
                    }

                    // Wait for first resume value before executing
                    // The weave is dormant at this point - not counted in ACTIVE_STRANDS
                    let first_msg = match weave_ctx_resume.receiver.recv() {
                        Ok(msg) => msg,
                        Err(_) => {
                            // Channel closed before we were resumed - just exit
                            // Don't call cleanup_strand since we never activated
                            return;
                        }
                    };

                    // Check for cancellation before starting
                    let first_value = match first_msg {
                        WeaveMessage::Cancel => {
                            // Weave was cancelled before it started - clean exit
                            // Don't call cleanup_strand since we never activated
                            crate::arena::arena_reset();
                            return;
                        }
                        WeaveMessage::Value(v) => v,
                        WeaveMessage::Done => {
                            // Shouldn't happen - Done is sent on yield_chan
                            // Don't call cleanup_strand since we never activated
                            return;
                        }
                    };

                    // NOW we're activated - increment ACTIVE_STRANDS
                    // From this point on, we must call cleanup_strand on exit
                    ACTIVE_STRANDS.fetch_add(1, Ordering::Release);

                    // Push WeaveCtx onto stack (yield_chan, resume_chan as a pair)
                    let weave_ctx = Value::WeaveCtx {
                        yield_chan: weave_ctx_yield.clone(),
                        resume_chan: weave_ctx_resume.clone(),
                    };
                    let stack_with_ctx = push(child_stack, weave_ctx);

                    // Push the first resume value
                    let stack_with_value = push(stack_with_ctx, first_value);

                    // Execute the quotation - it receives (WeaveCtx, resume_value)
                    let final_stack = fn_ptr(stack_with_value);

                    // Quotation returned - pop WeaveCtx and signal completion
                    let (_, ctx_value) = pop(final_stack);
                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
                        let _ = yield_chan.sender.send(WeaveMessage::Done);
                    }

                    crate::arena::arena_reset();
                    cleanup_strand();
                });
            }
        }
        Value::Closure { fn_ptr, env } => {
            if fn_ptr == 0 {
                eprintln!("strand.weave: closure function pointer is null (compiler bug)");
                std::process::abort();
            }

            use crate::scheduler::ACTIVE_STRANDS;
            use may::coroutine;
            use std::sync::atomic::Ordering;

            let fn_ref: extern "C" fn(Stack, *const Value, usize) -> Stack =
                unsafe { std::mem::transmute(fn_ptr) };
            let env_clone: Vec<Value> = env.iter().cloned().collect();

            let child_base = crate::stack::alloc_stack();
            let base_addr = child_base as usize;

            // NOTE: We do NOT increment ACTIVE_STRANDS here!
            // The weave is "dormant" until first resume. This allows the scheduler
            // to exit cleanly if a weave is created but never resumed (fixes #287).
            // ACTIVE_STRANDS is incremented only after receiving the first resume.

            unsafe {
                coroutine::spawn(move || {
                    let child_base = base_addr as *mut StackValue;
                    crate::stack::patch_seq_set_stack_base(child_base);

                    // Wait for first resume value
                    // The weave is dormant at this point - not counted in ACTIVE_STRANDS
                    let first_msg = match weave_ctx_resume.receiver.recv() {
                        Ok(msg) => msg,
                        Err(_) => {
                            // Channel closed before we were resumed - just exit
                            // Don't call cleanup_strand since we never activated
                            return;
                        }
                    };

                    // Check for cancellation before starting
                    let first_value = match first_msg {
                        WeaveMessage::Cancel => {
                            // Weave was cancelled before it started - clean exit
                            // Don't call cleanup_strand since we never activated
                            crate::arena::arena_reset();
                            return;
                        }
                        WeaveMessage::Value(v) => v,
                        WeaveMessage::Done => {
                            // Shouldn't happen - Done is sent on yield_chan
                            // Don't call cleanup_strand since we never activated
                            return;
                        }
                    };

                    // NOW we're activated - increment ACTIVE_STRANDS
                    // From this point on, we must call cleanup_strand on exit
                    ACTIVE_STRANDS.fetch_add(1, Ordering::Release);

                    // Push WeaveCtx onto stack
                    let weave_ctx = Value::WeaveCtx {
                        yield_chan: weave_ctx_yield.clone(),
                        resume_chan: weave_ctx_resume.clone(),
                    };
                    let stack_with_ctx = push(child_base, weave_ctx);
                    let stack_with_value = push(stack_with_ctx, first_value);

                    // Execute the closure
                    let final_stack = fn_ref(stack_with_value, env_clone.as_ptr(), env_clone.len());

                    // Signal completion
                    let (_, ctx_value) = pop(final_stack);
                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
                        let _ = yield_chan.sender.send(WeaveMessage::Done);
                    }

                    crate::arena::arena_reset();
                    cleanup_strand();
                });
            }
        }
        _ => {
            eprintln!(
                "strand.weave: expected Quotation or Closure, got {:?} (compiler bug or memory corruption)",
                quot_value
            );
            std::process::abort();
        }
    }

    // Return WeaveHandle (contains both channels for resume to use)
    let handle = Value::WeaveCtx {
        yield_chan: handle_yield,
        resume_chan: handle_resume,
    };
    unsafe { push(stack, handle) }
}

/// Block the current coroutine forever without panicking.
///
/// This is used when an unrecoverable error occurs in an extern "C" function.
/// We can't panic (UB across FFI) and we can't return (invalid state), so we
/// clean up and block forever. The coroutine is already marked as completed
/// via cleanup_strand(), so the program can still terminate normally.
///
/// # Safety
/// Must only be called from within a spawned coroutine, never from the main thread.
///
/// # Implementation
/// Uses May's coroutine-aware channel blocking. We keep the sender alive so that
/// recv() blocks the coroutine (not the OS thread) indefinitely. This is critical
/// because std::thread::park() would block the OS thread and starve all other
/// coroutines on that thread, potentially deadlocking the scheduler.
fn block_forever() -> ! {
    // Create channel and keep sender alive to prevent recv() from returning Err
    let (tx, rx): (mpmc::Sender<()>, mpmc::Receiver<()>) = mpmc::channel();
    // Leak the sender so it's never dropped - this ensures recv() blocks forever
    // rather than returning Err(RecvError) when all senders are dropped.
    // This is an intentional memory leak - we're blocking forever anyway.
    std::mem::forget(tx);
    // Block forever using May's coroutine-aware recv()
    // This yields the coroutine to the scheduler rather than blocking the OS thread
    loop {
        let _ = rx.recv();
    }
}

/// Helper to clean up strand on exit
fn cleanup_strand() {
    use crate::scheduler::{ACTIVE_STRANDS, SHUTDOWN_CONDVAR, SHUTDOWN_MUTEX, TOTAL_COMPLETED};
    use std::sync::atomic::Ordering;

    let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
    TOTAL_COMPLETED.fetch_add(1, Ordering::Release);

    if prev_count == 1 {
        // Recover the guard even if the mutex is poisoned: `expect` would panic,
        // and this helper is reached from extern "C" functions that must not unwind.
        let _guard = SHUTDOWN_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        SHUTDOWN_CONDVAR.notify_all();
    }
}

/// Resume a woven strand with a value
///
/// Stack effect: ( WeaveHandle a -- WeaveHandle a Bool )
///
/// Sends value `a` to the weave and waits for it to yield.
/// Returns (handle, yielded_value, has_more).
/// - has_more = true: weave yielded a value
/// - has_more = false: weave completed or its channel closed; the value slot
///   holds a placeholder `Int(0)`
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On fatal error
/// (null stack, type mismatch), it prints an error and aborts the process.
///
/// # Safety
/// Stack must have a value on top and WeaveHandle below it
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_resume(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
    if stack.is_null() {
        eprintln!("strand.resume: stack is null (fatal programming error)");
        std::process::abort();
    }

    // Pop the value to send
    let (stack, value) = unsafe { pop(stack) };

    // Pop the WeaveHandle
    let (stack, handle) = unsafe { pop(stack) };

    let (yield_chan, resume_chan) = match &handle {
        Value::WeaveCtx {
            yield_chan,
            resume_chan,
        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
        _ => {
            eprintln!("strand.resume: expected WeaveHandle, got {:?}", handle);
            std::process::abort();
        }
    };

    // Wrap value in WeaveMessage for sending (moved, not cloned: it is not used again)
    let msg_to_send = WeaveMessage::Value(value);

    // Send resume value to the weave
    if resume_chan.sender.send(msg_to_send).is_err() {
        // Channel closed - weave is done
        let stack = unsafe { push(stack, handle) };
        let stack = unsafe { push(stack, Value::Int(0)) };
        return unsafe { push(stack, Value::Bool(false)) };
    }

    // Wait for yielded value
    match yield_chan.receiver.recv() {
        Ok(msg) => match msg {
            WeaveMessage::Done => {
                // Weave completed
                let stack = unsafe { push(stack, handle) };
                let stack = unsafe { push(stack, Value::Int(0)) };
                unsafe { push(stack, Value::Bool(false)) }
            }
            WeaveMessage::Value(yielded) => {
                // Normal yield
                let stack = unsafe { push(stack, handle) };
                let stack = unsafe { push(stack, yielded) };
                unsafe { push(stack, Value::Bool(true)) }
            }
            WeaveMessage::Cancel => {
                // Shouldn't happen - Cancel is sent on resume_chan
                let stack = unsafe { push(stack, handle) };
                let stack = unsafe { push(stack, Value::Int(0)) };
                unsafe { push(stack, Value::Bool(false)) }
            }
        },
        Err(_) => {
            // Channel closed unexpectedly
            let stack = unsafe { push(stack, handle) };
            let stack = unsafe { push(stack, Value::Int(0)) };
            unsafe { push(stack, Value::Bool(false)) }
        }
    }
}

/// Yield a value from within a woven strand
///
/// Stack effect: ( WeaveCtx a -- WeaveCtx a )
///
/// Sends value `a` to the caller and waits for the next resume value.
/// The WeaveCtx must be passed through - it contains the channels.
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On error:
/// - Type mismatch: eprintln + cleanup + block forever
/// - Channel closed: cleanup + block forever
///
/// The coroutine is marked as completed before blocking, so the program
/// can still terminate normally.
///
/// # Safety
/// Stack must have a value on top and WeaveCtx below it
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). A null stack is a fatal
    // programming error, but we handle it gracefully to avoid UB.
    if stack.is_null() {
        eprintln!("yield: stack is null (fatal programming error)");
        crate::arena::arena_reset();
        cleanup_strand();
        block_forever();
    }

    // Pop the value to yield
    let (stack, value) = unsafe { pop(stack) };

    // Pop the WeaveCtx
    let (stack, ctx) = unsafe { pop(stack) };

    let (yield_chan, resume_chan) = match &ctx {
        Value::WeaveCtx {
            yield_chan,
            resume_chan,
        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
        _ => {
            // Type mismatch - yield called without WeaveCtx on stack
            // This is a programming error but we can't panic (UB)
            eprintln!(
                "yield: expected WeaveCtx on stack, got {:?}. \
                 yield can only be called inside strand.weave with context threaded through.",
                ctx
            );
            crate::arena::arena_reset();
            cleanup_strand();
            block_forever();
        }
    };

    // Wrap value in WeaveMessage for sending (moved, not cloned: it is not used again)
    let msg_to_send = WeaveMessage::Value(value);

    // Send the yielded value
    if yield_chan.sender.send(msg_to_send).is_err() {
        // Channel unexpectedly closed - caller dropped the handle
        // Clean up and block forever (can't panic in extern "C")
        // We're still active here, so call cleanup_strand
        crate::arena::arena_reset();
        cleanup_strand();
        block_forever();
    }

    // IMPORTANT: Become "dormant" before waiting for resume (fixes #287)
    // This allows the scheduler to exit if the program ends while we're waiting.
    // We'll re-activate after receiving the resume value.
    use crate::scheduler::ACTIVE_STRANDS;
    use std::sync::atomic::Ordering;
    ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);

    // Wait for resume value (we're dormant now - not counted as active)
    let resume_msg = match resume_chan.receiver.recv() {
        Ok(msg) => msg,
        Err(_) => {
            // Resume channel closed - caller dropped the handle
            // We're already dormant (decremented above), don't call cleanup_strand
            crate::arena::arena_reset();
            block_forever();
        }
    };

    // Handle the message
    match resume_msg {
        WeaveMessage::Cancel => {
            // Weave was cancelled - signal completion, then park this coroutine.
            // We can't return into the quotation after a cancel, so we block
            // instead of running more user code.
            // We're already dormant (decremented above), don't call cleanup_strand
            let _ = yield_chan.sender.send(WeaveMessage::Done);
            crate::arena::arena_reset();
            block_forever();
        }
        WeaveMessage::Value(resume_value) => {
            // Re-activate: we're about to run user code again
            // Use AcqRel for consistency with the decrement above
            ACTIVE_STRANDS.fetch_add(1, Ordering::AcqRel);

            // Push WeaveCtx back, then resume value
            let stack = unsafe { push(stack, ctx) };
            unsafe { push(stack, resume_value) }
        }
        WeaveMessage::Done => {
            // Protocol error - Done should only be sent on yield_chan
            // We're already dormant (decremented above), don't call cleanup_strand
            crate::arena::arena_reset();
            block_forever();
        }
    }
}

/// Cancel a weave, releasing its resources
///
/// Stack effect: ( WeaveHandle -- )
///
/// Sends a cancellation signal to the weave, causing it to stop running.
/// This is necessary to avoid resource leaks when abandoning a weave
/// before it completes naturally.
///
/// If the weave is:
/// - Waiting for first resume: exits immediately
/// - Waiting inside yield: receives the cancel signal, sends `Done`, resets its
///   arena, and parks without running more user code
/// - Already completed: no effect (signal is ignored)
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On fatal error
/// (null stack, type mismatch), it prints an error and aborts the process.
///
/// # Safety
/// Stack must have a WeaveHandle (WeaveCtx) on top
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_weave_cancel(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
    if stack.is_null() {
        eprintln!("strand.weave-cancel: stack is null (fatal programming error)");
        std::process::abort();
    }

    // Pop the WeaveHandle
    let (stack, handle) = unsafe { pop(stack) };

    // Extract the resume channel to send cancel signal
    match handle {
        Value::WeaveCtx { resume_chan, .. } => {
            // Send cancel signal - if this fails, weave is already done (fine)
            let _ = resume_chan.sender.send(WeaveMessage::Cancel);
        }
        _ => {
            eprintln!(
                "strand.weave-cancel: expected WeaveHandle, got {:?}",
                handle
            );
            std::process::abort();
        }
    }

    // Handle is consumed (dropped), stack returned without it
    stack
}

// Public re-exports
pub use patch_seq_resume as resume;
pub use patch_seq_weave as weave;
pub use patch_seq_weave_cancel as weave_cancel;
pub use patch_seq_yield as weave_yield;