// seq_runtime/weave.rs

//! Weave operations for generator/coroutine-style concurrency
//!
//! A "weave" is a strand that can yield values back to its caller and be resumed.
//! Unlike regular strands (fire-and-forget), weaves allow bidirectional communication
//! with structured yield/resume semantics.
//!
//! ## Zero-Mutex Design
//!
//! Like channels, weaves pass their communication handles directly on the stack.
//! There is NO global registry and NO mutex contention. The weave context travels
//! with the stack values.
//!
//! ## API
//!
//! - `strand.weave`: ( Quotation -- WeaveHandle ) - creates a woven strand, returns handle
//! - `strand.resume`: ( WeaveHandle a -- WeaveHandle a Bool ) - resume with a value,
//!   returns the yielded value and a has_more flag
//! - `strand.weave-cancel`: ( WeaveHandle -- ) - cancel a weave and release its resources
//! - `yield`: ( WeaveCtx a -- WeaveCtx a ) - yield a value (only valid inside weave)
//!
//! ## Architecture
//!
//! Each weave has two internal channels that travel as values:
//! - The WeaveHandle (returned to caller) holds both channels: resume sends on
//!   resume_chan and receives on yield_chan
//! - The WeaveCtx (on weave's stack) holds the same two channels: yield sends on
//!   yield_chan and receives on resume_chan
//!
//! Flow:
//! 1. strand.weave creates channels, spawns coroutine with WeaveCtx on stack
//! 2. The coroutine waits on resume_chan for the first resume value
//! 3. Caller calls strand.resume with WeaveHandle, sending value to resume_chan
//! 4. Coroutine wakes, receives value, runs until yield
//! 5. yield uses WeaveCtx to send/receive, returns with new resume value
//! 6. When the quotation returns, the coroutine pops the WeaveCtx and sends
//!    `Done` on yield_chan to signal completion
//!
//! ## Resource Management
//!
//! **Important:** Weaves must either be resumed until completion OR explicitly
//! cancelled with `strand.weave-cancel`. Dropping a WeaveHandle without doing
//! either will cause the spawned coroutine to hang forever waiting on resume_chan.
//!
//! Proper cleanup options:
//!
//! **Option 1: Resume until completion**
//! ```seq
//! [ generator-body ] strand.weave  # Create weave
//! 0 strand.resume                   # Resume until...
//! if                                # ...has_more is false
//!   # process value...
//!   drop 0 strand.resume           # Keep resuming
//! else
//!   drop drop                       # Clean up when done
//! then
//! ```
//!
//! **Option 2: Explicit cancellation**
//! ```seq
//! [ generator-body ] strand.weave  # Create weave
//! 0 strand.resume                   # Get first value
//! if
//!   drop                           # We only needed the first value
//!   strand.weave-cancel            # Cancel and clean up
//! else
//!   drop drop
//! then
//! ```
//!
//! ## Implementation Notes
//!
//! Control flow (completion, cancellation) is handled via a type-safe `WeaveMessage`
//! enum rather than sentinel values. This means **any** Value can be safely yielded
//! and resumed, including edge cases like `i64::MIN`.
//!
//! ## Error Handling
//!
//! All weave functions are `extern "C"` and never panic (panicking across FFI is UB).
//!
//! - **Type mismatches** (e.g., `strand.resume` without a WeaveHandle): These indicate
//!   a compiler bug or memory corruption. The function prints an error to stderr and
//!   calls `std::process::abort()` to terminate immediately.
//!
//! - **Channel errors in `yield`**: If channels close unexpectedly while a coroutine
//!   is yielding, the coroutine cleans up and blocks forever. The main program can
//!   still terminate normally since the strand is marked as completed.
//!
//! - **Channel errors in `resume`**: Returns `(handle, placeholder, false)` to indicate
//!   the weave has completed or failed. The caller should check the Bool result.
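
/*
The flow described in the module docs can be sketched outside the runtime.
This is a minimal illustration under stated assumptions, not the runtime's
implementation: it uses plain OS threads and `std::sync::mpsc` in place of
`may` coroutines and `mpmc`, and `Msg`, `Handle`, `spawn_counter`, `resume`,
and `collect_all` are hypothetical names. Cancellation is left out.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Messages exchanged between caller and generator (stand-in for `WeaveMessage`).
enum Msg {
    Value(i64),
    Done,
}

/// Caller-side handle: sends on the resume channel, receives on the yield channel.
struct Handle {
    resume_tx: Sender<Msg>,
    yield_rx: Receiver<Msg>,
}

/// Spawn a generator that yields `count` values, counting up from the first
/// resume value.
fn spawn_counter(count: i64) -> Handle {
    let (resume_tx, resume_rx) = channel::<Msg>();
    let (yield_tx, yield_rx) = channel::<Msg>();
    thread::spawn(move || {
        // Step 2: wait for the first resume value before running the body.
        let mut current = match resume_rx.recv() {
            Ok(Msg::Value(v)) => v,
            _ => return, // handle dropped before the first resume
        };
        for _ in 0..count {
            current += 1;
            // Step 5: yield a value, then wait for the next resume value.
            if yield_tx.send(Msg::Value(current)).is_err() {
                return;
            }
            match resume_rx.recv() {
                Ok(Msg::Value(_)) => {}
                _ => return,
            }
        }
        // Step 6: the body finished, so signal completion.
        let _ = yield_tx.send(Msg::Done);
    });
    Handle { resume_tx, yield_rx }
}

/// Resume once; returns (yielded_value, has_more), like `strand.resume`.
fn resume(handle: &Handle, value: i64) -> (i64, bool) {
    if handle.resume_tx.send(Msg::Value(value)).is_err() {
        return (0, false);
    }
    match handle.yield_rx.recv() {
        Ok(Msg::Value(v)) => (v, true),
        _ => (0, false),
    }
}

/// Drive a generator to completion and collect everything it yielded.
fn collect_all(handle: &Handle, first: i64) -> Vec<i64> {
    let mut out = Vec::new();
    let mut next = resume(handle, first);
    while next.1 {
        out.push(next.0);
        next = resume(handle, 0);
    }
    out
}
```

With `spawn_counter(3)` and a first resume value of 10, `collect_all` gathers
11, 12, 13 and then observes `has_more == false`.
*/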

use crate::stack::{Stack, pop, push};
use crate::tagged_stack::StackValue;
use crate::value::{Value, WeaveChannelData, WeaveMessage};
use may::sync::mpmc;
use std::sync::Arc;

/// Create a woven strand from a quotation
///
/// Stack effect: ( Quotation -- WeaveHandle )
///
/// Creates a weave from the quotation. The weave is initially suspended,
/// waiting to be resumed with the first value. The quotation will receive
/// a WeaveCtx on its stack that it must pass to yield operations.
///
/// Returns a WeaveHandle that the caller uses with strand.resume.
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On fatal error
/// (null stack, null function pointer, type mismatch), it prints an error
/// and aborts the process.
///
/// # Safety
/// Stack must have a Quotation (or Closure) on top
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_weave(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
    if stack.is_null() {
        eprintln!("strand.weave: stack is null (fatal programming error)");
        std::process::abort();
    }

    // Create the two internal channels - NO registry, just Arc values
    // Uses WeaveMessage for type-safe control flow (no sentinel values)
    let (yield_sender, yield_receiver) = mpmc::channel();
    let yield_chan = Arc::new(WeaveChannelData {
        sender: yield_sender,
        receiver: yield_receiver,
    });

    let (resume_sender, resume_receiver) = mpmc::channel();
    let resume_chan = Arc::new(WeaveChannelData {
        sender: resume_sender,
        receiver: resume_receiver,
    });

    // Pop the quotation from stack
    let (stack, quot_value) = unsafe { pop(stack) };

    // Clone channels for the spawned strand's WeaveCtx
    let weave_ctx_yield = Arc::clone(&yield_chan);
    let weave_ctx_resume = Arc::clone(&resume_chan);

    // Clone for the WeaveHandle returned to caller
    let handle_yield = Arc::clone(&yield_chan);
    let handle_resume = Arc::clone(&resume_chan);

    match quot_value {
        Value::Quotation { wrapper, .. } => {
            if wrapper == 0 {
                eprintln!(
                    "strand.weave: quotation wrapper function pointer is null (compiler bug)"
                );
                std::process::abort();
            }

            use crate::scheduler::ACTIVE_STRANDS;
            use may::coroutine;
            use std::sync::atomic::Ordering;

            let fn_ptr: extern "C" fn(Stack) -> Stack = unsafe { std::mem::transmute(wrapper) };

            // Clone the stack for the child
            let (child_stack, child_base) = unsafe { crate::stack::clone_stack_with_base(stack) };

            // Convert pointers to usize (which is Send)
            let stack_addr = child_stack as usize;
            let base_addr = child_base as usize;

            ACTIVE_STRANDS.fetch_add(1, Ordering::Release);

            unsafe {
                coroutine::spawn(move || {
                    let child_stack = stack_addr as *mut StackValue;
                    let child_base = base_addr as *mut StackValue;

                    if !child_base.is_null() {
                        crate::stack::patch_seq_set_stack_base(child_base);
                    }

                    // Wait for first resume value before executing
                    let first_msg = match weave_ctx_resume.receiver.recv() {
                        Ok(msg) => msg,
                        Err(_) => {
                            cleanup_strand();
                            return;
                        }
                    };

                    // Check for cancellation before starting
                    let first_value = match first_msg {
                        WeaveMessage::Cancel => {
                            // Weave was cancelled before it started - clean exit
                            crate::arena::arena_reset();
                            cleanup_strand();
                            return;
                        }
                        WeaveMessage::Value(v) => v,
                        WeaveMessage::Done => {
                            // Shouldn't happen - Done is sent on yield_chan
                            cleanup_strand();
                            return;
                        }
                    };

                    // Push WeaveCtx onto stack (yield_chan, resume_chan as a pair)
                    let weave_ctx = Value::WeaveCtx {
                        yield_chan: weave_ctx_yield.clone(),
                        resume_chan: weave_ctx_resume.clone(),
                    };
                    let stack_with_ctx = push(child_stack, weave_ctx);

                    // Push the first resume value
                    let stack_with_value = push(stack_with_ctx, first_value);

                    // Execute the quotation - it receives (WeaveCtx, resume_value)
                    let final_stack = fn_ptr(stack_with_value);

                    // Quotation returned - pop WeaveCtx and signal completion
                    let (_, ctx_value) = pop(final_stack);
                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
                        let _ = yield_chan.sender.send(WeaveMessage::Done);
                    }

                    crate::arena::arena_reset();
                    cleanup_strand();
                });
            }
        }
        Value::Closure { fn_ptr, env } => {
            if fn_ptr == 0 {
                eprintln!("strand.weave: closure function pointer is null (compiler bug)");
                std::process::abort();
            }

            use crate::scheduler::ACTIVE_STRANDS;
            use may::coroutine;
            use std::sync::atomic::Ordering;

            let fn_ref: extern "C" fn(Stack, *const Value, usize) -> Stack =
                unsafe { std::mem::transmute(fn_ptr) };
            let env_clone: Vec<Value> = env.iter().cloned().collect();

            let child_base = crate::stack::alloc_stack();
            let base_addr = child_base as usize;

            ACTIVE_STRANDS.fetch_add(1, Ordering::Release);

            unsafe {
                coroutine::spawn(move || {
                    let child_base = base_addr as *mut StackValue;
                    crate::stack::patch_seq_set_stack_base(child_base);

                    // Wait for first resume value
                    let first_msg = match weave_ctx_resume.receiver.recv() {
                        Ok(msg) => msg,
                        Err(_) => {
                            cleanup_strand();
                            return;
                        }
                    };

                    // Check for cancellation before starting
                    let first_value = match first_msg {
                        WeaveMessage::Cancel => {
                            // Weave was cancelled before it started - clean exit
                            crate::arena::arena_reset();
                            cleanup_strand();
                            return;
                        }
                        WeaveMessage::Value(v) => v,
                        WeaveMessage::Done => {
                            // Shouldn't happen - Done is sent on yield_chan
                            cleanup_strand();
                            return;
                        }
                    };

                    // Push WeaveCtx onto stack
                    let weave_ctx = Value::WeaveCtx {
                        yield_chan: weave_ctx_yield.clone(),
                        resume_chan: weave_ctx_resume.clone(),
                    };
                    let stack_with_ctx = push(child_base, weave_ctx);
                    let stack_with_value = push(stack_with_ctx, first_value);

                    // Execute the closure
                    let final_stack = fn_ref(stack_with_value, env_clone.as_ptr(), env_clone.len());

                    // Signal completion
                    let (_, ctx_value) = pop(final_stack);
                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
                        let _ = yield_chan.sender.send(WeaveMessage::Done);
                    }

                    crate::arena::arena_reset();
                    cleanup_strand();
                });
            }
        }
        _ => {
            eprintln!(
                "strand.weave: expected Quotation or Closure, got {:?} (compiler bug or memory corruption)",
                quot_value
            );
            std::process::abort();
        }
    }

    // Return WeaveHandle (contains both channels for resume to use)
    let handle = Value::WeaveCtx {
        yield_chan: handle_yield,
        resume_chan: handle_resume,
    };
    unsafe { push(stack, handle) }
}
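
/*
Raw pointers are not `Send`, which is why `patch_seq_weave` casts the child
stack addresses to `usize` before spawning and casts them back inside the
coroutine. A minimal sketch of the same trick, assuming a `Box` and an OS
thread instead of the runtime's stack and a `may` coroutine; `sum_in_child`
is a hypothetical helper, not part of this module.

```rust
use std::thread;

/// Hand a heap buffer to another thread by passing its address as `usize`.
/// Returns the sum computed by the child thread.
fn sum_in_child(values: Vec<i64>) -> i64 {
    let boxed: Box<Vec<i64>> = Box::new(values);
    // A raw pointer would make the closure non-`Send`; an integer address does not.
    let addr = Box::into_raw(boxed) as usize;

    let child = thread::spawn(move || {
        // SAFETY: `addr` came from `Box::into_raw` above, and ownership is
        // transferred to this thread exactly once; the parent never touches
        // the pointer again.
        let owned: Box<Vec<i64>> = unsafe { Box::from_raw(addr as *mut Vec<i64>) };
        owned.iter().sum::<i64>()
    });

    child.join().expect("child thread panicked")
}
```

The cast only silences the compiler's `Send` check. Soundness still depends on
the child being the sole user of the memory once it is handed over.
*/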

/// Block the current coroutine forever without panicking.
///
/// This is used when an unrecoverable error occurs in an extern "C" function.
/// We can't panic (UB across FFI) and we can't return (invalid state), so we
/// clean up and block forever. The coroutine is already marked as completed
/// via cleanup_strand(), so the program can still terminate normally.
///
/// # Safety
/// Must only be called from within a spawned coroutine, never from the main thread.
fn block_forever() -> ! {
    let (_, rx): (mpmc::Sender<()>, mpmc::Receiver<()>) = mpmc::channel();
    let _ = rx.recv();
    // This loop is a fallback in case recv() somehow returns (it shouldn't)
    loop {
        std::thread::park();
    }
}

/// Helper to clean up strand on exit
fn cleanup_strand() {
    use crate::scheduler::{ACTIVE_STRANDS, SHUTDOWN_CONDVAR, SHUTDOWN_MUTEX, TOTAL_COMPLETED};
    use std::sync::atomic::Ordering;

    let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
    TOTAL_COMPLETED.fetch_add(1, Ordering::Release);

    if prev_count == 1 {
        // Recover the guard from a poisoned mutex instead of calling expect():
        // this helper runs inside extern "C" entry points, which must not panic.
        let _guard = SHUTDOWN_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        SHUTDOWN_CONDVAR.notify_all();
    }
}
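
/*
The counter-plus-condvar pattern in `cleanup_strand` can be shown in isolation.
The sketch below is an assumption-laden stand-in: `Shutdown`, `worker_finished`,
and `run_workers` are hypothetical names, OS threads replace strands, and the
struct fields play the roles of the scheduler's `ACTIVE_STRANDS`,
`SHUTDOWN_MUTEX`, and `SHUTDOWN_CONDVAR`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Shared shutdown state (stand-in for the scheduler statics).
struct Shutdown {
    active: AtomicUsize,
    lock: Mutex<()>,
    all_done: Condvar,
}

/// Called by each worker as it exits; the last one wakes the waiter.
fn worker_finished(state: &Shutdown) {
    let prev = state.active.fetch_sub(1, Ordering::AcqRel);
    if prev == 1 {
        // Take the lock before notifying so the waiter cannot check the
        // count, miss the notification, and then sleep forever. A poisoned
        // lock is recovered rather than turned into a panic.
        let _guard = state.lock.lock().unwrap_or_else(|e| e.into_inner());
        state.all_done.notify_all();
    }
}

/// Spawn `n` workers and block until all have finished; returns the final count.
fn run_workers(n: usize) -> usize {
    let state = Arc::new(Shutdown {
        active: AtomicUsize::new(n),
        lock: Mutex::new(()),
        all_done: Condvar::new(),
    });
    for _ in 0..n {
        let state = Arc::clone(&state);
        thread::spawn(move || worker_finished(&state));
    }
    let mut guard = state.lock.lock().unwrap_or_else(|e| e.into_inner());
    // Re-check the count after every wakeup to tolerate spurious wakeups.
    while state.active.load(Ordering::Acquire) != 0 {
        guard = state.all_done.wait(guard).unwrap_or_else(|e| e.into_inner());
    }
    drop(guard);
    state.active.load(Ordering::Acquire)
}
```

Only the worker that observes a previous count of 1 touches the mutex, so the
common exit path stays lock-free.
*/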

/// Resume a woven strand with a value
///
/// Stack effect: ( WeaveHandle a -- WeaveHandle a Bool )
///
/// Sends value `a` to the weave and waits for it to yield.
/// Returns (handle, yielded_value, has_more).
/// - has_more = true: weave yielded a value
/// - has_more = false: weave completed or its channels closed; the value slot
///   holds a placeholder `Int(0)`
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On fatal error
/// (null stack, type mismatch), it prints an error and aborts the process.
///
/// # Safety
/// Stack must have a value on top and WeaveHandle below it
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_resume(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
    if stack.is_null() {
        eprintln!("strand.resume: stack is null (fatal programming error)");
        std::process::abort();
    }

    // Pop the value to send
    let (stack, value) = unsafe { pop(stack) };

    // Pop the WeaveHandle
    let (stack, handle) = unsafe { pop(stack) };

    let (yield_chan, resume_chan) = match &handle {
        Value::WeaveCtx {
            yield_chan,
            resume_chan,
        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
        _ => {
            eprintln!("strand.resume: expected WeaveHandle, got {:?}", handle);
            std::process::abort();
        }
    };

    // Wrap value in WeaveMessage for sending
    let msg_to_send = WeaveMessage::Value(value.clone());

    // Send resume value to the weave
    if resume_chan.sender.send(msg_to_send).is_err() {
        // Channel closed - weave is done
        let stack = unsafe { push(stack, handle) };
        let stack = unsafe { push(stack, Value::Int(0)) };
        return unsafe { push(stack, Value::Bool(false)) };
    }

    // Wait for yielded value
    match yield_chan.receiver.recv() {
        Ok(msg) => match msg {
            WeaveMessage::Done => {
                // Weave completed
                let stack = unsafe { push(stack, handle) };
                let stack = unsafe { push(stack, Value::Int(0)) };
                unsafe { push(stack, Value::Bool(false)) }
            }
            WeaveMessage::Value(yielded) => {
                // Normal yield
                let stack = unsafe { push(stack, handle) };
                let stack = unsafe { push(stack, yielded) };
                unsafe { push(stack, Value::Bool(true)) }
            }
            WeaveMessage::Cancel => {
                // Shouldn't happen - Cancel is sent on resume_chan
                let stack = unsafe { push(stack, handle) };
                let stack = unsafe { push(stack, Value::Int(0)) };
                unsafe { push(stack, Value::Bool(false)) }
            }
        },
        Err(_) => {
            // Channel closed unexpectedly
            let stack = unsafe { push(stack, handle) };
            let stack = unsafe { push(stack, Value::Int(0)) };
            unsafe { push(stack, Value::Bool(false)) }
        }
    }
}

/// Yield a value from within a woven strand
///
/// Stack effect: ( WeaveCtx a -- WeaveCtx a )
///
/// Sends value `a` to the caller and waits for the next resume value.
/// The WeaveCtx must be passed through - it contains the channels.
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On error:
/// - Type mismatch: eprintln + cleanup + block forever
/// - Channel closed: cleanup + block forever
///
/// The coroutine is marked as completed before blocking, so the program
/// can still terminate normally.
///
/// # Safety
/// Stack must have a value on top and WeaveCtx below it
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). A null stack is a fatal
    // programming error, but we handle it gracefully to avoid UB.
    if stack.is_null() {
        eprintln!("yield: stack is null (fatal programming error)");
        crate::arena::arena_reset();
        cleanup_strand();
        block_forever();
    }

    // Pop the value to yield
    let (stack, value) = unsafe { pop(stack) };

    // Pop the WeaveCtx
    let (stack, ctx) = unsafe { pop(stack) };

    let (yield_chan, resume_chan) = match &ctx {
        Value::WeaveCtx {
            yield_chan,
            resume_chan,
        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
        _ => {
            // Type mismatch - yield called without WeaveCtx on stack
            // This is a programming error but we can't panic (UB)
            eprintln!(
                "yield: expected WeaveCtx on stack, got {:?}. \
                 yield can only be called inside strand.weave with context threaded through.",
                ctx
            );
            crate::arena::arena_reset();
            cleanup_strand();
            block_forever();
        }
    };

    // Wrap value in WeaveMessage for sending
    let msg_to_send = WeaveMessage::Value(value.clone());

    // Send the yielded value
    if yield_chan.sender.send(msg_to_send).is_err() {
        // Channel unexpectedly closed - caller dropped the handle
        // Clean up and block forever (can't panic in extern "C")
        crate::arena::arena_reset();
        cleanup_strand();
        block_forever();
    }

    // Wait for resume value
    let resume_msg = match resume_chan.receiver.recv() {
        Ok(msg) => msg,
        Err(_) => {
            // Resume channel closed - caller dropped the handle
            // Clean up and block forever (can't panic in extern "C")
            crate::arena::arena_reset();
            cleanup_strand();
            block_forever();
        }
    };

    // Handle the message
    match resume_msg {
        WeaveMessage::Cancel => {
            // Weave was cancelled - signal completion, release resources, and
            // park this coroutine (we can't unwind out of the quotation here)
            let _ = yield_chan.sender.send(WeaveMessage::Done);
            crate::arena::arena_reset();
            cleanup_strand();
            block_forever();
        }
        WeaveMessage::Value(resume_value) => {
            // Push WeaveCtx back, then resume value
            let stack = unsafe { push(stack, ctx) };
            unsafe { push(stack, resume_value) }
        }
        WeaveMessage::Done => {
            // Protocol error - Done should only be sent on yield_chan
            // Can't panic, so clean up and block
            crate::arena::arena_reset();
            cleanup_strand();
            block_forever();
        }
    }
}
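
/*
`patch_seq_yield` wraps every payload in `WeaveMessage::Value`, which is what
lets any value, including `i64::MIN`, pass through unharmed. The contrast with
a sentinel encoding can be sketched as follows; `Message`, `decode_sentinel`,
and `decode_message` are hypothetical names used only for illustration.

```rust
/// Stand-in for `WeaveMessage`: control signals are separate variants.
enum Message {
    Value(i64),
    Done,
}

/// Sentinel encoding: completion is smuggled in-band as `i64::MIN`,
/// so that one payload can never be yielded.
fn decode_sentinel(raw: i64) -> Option<i64> {
    if raw == i64::MIN { None } else { Some(raw) }
}

/// Enum encoding: completion is out-of-band, so every `i64` is a valid payload.
fn decode_message(msg: Message) -> Option<i64> {
    match msg {
        Message::Value(v) => Some(v),
        Message::Done => None,
    }
}
```

Under the sentinel scheme a generator that legitimately yields `i64::MIN` looks
finished to its caller; under the enum scheme the same payload decodes to
`Some(i64::MIN)`.
*/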

/// Cancel a weave, releasing its resources
///
/// Stack effect: ( WeaveHandle -- )
///
/// Sends a cancellation signal to the weave, causing it to stop cleanly.
/// This is necessary to avoid resource leaks when abandoning a weave
/// before it completes naturally.
///
/// If the weave is:
/// - Waiting for first resume: exits immediately
/// - Waiting inside yield: receives the cancel signal, cleans up, and stops running
/// - Already completed: no effect (signal is ignored)
///
/// # Error Handling
///
/// This function never panics (panicking in extern "C" is UB). On fatal error
/// (null stack, type mismatch), it prints an error and aborts the process.
///
/// # Safety
/// Stack must have a WeaveHandle (WeaveCtx) on top
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_weave_cancel(stack: Stack) -> Stack {
    // Note: We can't use assert! here (it panics). Use abort() for fatal errors.
    if stack.is_null() {
        eprintln!("strand.weave-cancel: stack is null (fatal programming error)");
        std::process::abort();
    }

    // Pop the WeaveHandle
    let (stack, handle) = unsafe { pop(stack) };

    // Extract the resume channel to send cancel signal
    match handle {
        Value::WeaveCtx { resume_chan, .. } => {
            // Send cancel signal - if this fails, weave is already done (fine)
            let _ = resume_chan.sender.send(WeaveMessage::Cancel);
        }
        _ => {
            eprintln!(
                "strand.weave-cancel: expected WeaveHandle, got {:?}",
                handle
            );
            std::process::abort();
        }
    }

    // Handle is consumed (dropped), stack returned without it
    stack
}
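
/*
Why an explicit cancel message is needed can be seen in a small model. This is
a sketch under assumptions, not the runtime's code: OS threads and
`std::sync::mpsc` stand in for coroutines and `mpmc`, and `Resume`,
`spawn_doubler`, and `take_one_then_cancel` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Resume-side messages (stand-in for the relevant `WeaveMessage` variants).
enum Resume {
    Value(i64),
    Cancel,
}

/// Spawn an endless generator that yields each resume value doubled. Once it
/// stops, the thread returns how many values it yielded.
fn spawn_doubler() -> (Sender<Resume>, Receiver<i64>, JoinHandle<usize>) {
    let (resume_tx, resume_rx) = channel::<Resume>();
    let (yield_tx, yield_rx) = channel::<i64>();
    let worker = thread::spawn(move || {
        let mut yielded = 0;
        loop {
            match resume_rx.recv() {
                Ok(Resume::Value(v)) => {
                    if yield_tx.send(v * 2).is_err() {
                        break;
                    }
                    yielded += 1;
                }
                // An explicit cancel or a closed channel ends the generator.
                Ok(Resume::Cancel) | Err(_) => break,
            }
        }
        yielded
    });
    (resume_tx, yield_rx, worker)
}

/// Take one value, then cancel; returns (first_value, values_yielded_by_worker).
fn take_one_then_cancel(input: i64) -> (i64, usize) {
    let (resume_tx, yield_rx, worker) = spawn_doubler();
    if resume_tx.send(Resume::Value(input)).is_err() {
        return (0, 0);
    }
    let first = yield_rx.recv().expect("worker yielded a value");
    // While `resume_tx` is alive, only this message lets the worker stop.
    let _ = resume_tx.send(Resume::Cancel);
    let yielded = worker.join().expect("worker panicked");
    (first, yielded)
}
```

One difference from the real module is worth noting: here dropping `resume_tx`
would also stop the worker, because the channel closes. In this file the
WeaveCtx on the coroutine's own stack keeps a sender for resume_chan alive, so
the channel appears never to close on its own and `strand.weave-cancel` is the
only way to release an unfinished weave.
*/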

// Public re-exports
pub use patch_seq_resume as resume;
pub use patch_seq_weave as weave;
pub use patch_seq_weave_cancel as weave_cancel;
pub use patch_seq_yield as weave_yield;