seq_runtime/
weave.rs

1//! Weave operations for generator/coroutine-style concurrency
2//!
3//! A "weave" is a strand that can yield values back to its caller and be resumed.
4//! Unlike regular strands (fire-and-forget), weaves allow bidirectional communication
5//! with structured yield/resume semantics.
6//!
7//! ## Zero-Mutex Design
8//!
9//! Like channels, weaves pass their communication handles directly on the stack.
10//! There is NO global registry and NO mutex contention. The weave context travels
11//! with the stack values.
12//!
13//! ## API
14//!
15//! - `strand.weave`: ( Quotation -- WeaveHandle ) - creates a woven strand, returns handle
16//! - `strand.resume`: ( WeaveHandle a -- WeaveHandle a Bool ) - resume with value
17//! - `strand.weave-cancel`: ( WeaveHandle -- ) - cancel a weave and release its resources
18//! - `yield`: ( WeaveCtx a -- WeaveCtx a ) - yield a value (only valid inside weave)
19//!
20//! ## Architecture
21//!
22//! Each weave has two internal channels that travel as values:
//! - The WeaveHandle (returned to caller) contains both channels: strand.resume sends on
//!   resume_chan and receives on yield_chan
//! - The WeaveCtx (on weave's stack) contains both channels for yield to use
25//!
26//! Flow:
27//! 1. strand.weave creates channels, spawns coroutine with WeaveCtx on stack
28//! 2. The coroutine waits on resume_chan for the first resume value
29//! 3. Caller calls strand.resume with WeaveHandle, sending value to resume_chan
30//! 4. Coroutine wakes, receives value, runs until yield
31//! 5. yield uses WeaveCtx to send/receive, returns with new resume value
32//! 6. When quotation returns, WeaveCtx signals completion
33//!
34//! ## Resource Management
35//!
36//! **Important:** Weaves must either be resumed until completion OR explicitly
37//! cancelled with `strand.weave-cancel`. Dropping a WeaveHandle without doing
38//! either will cause the spawned coroutine to hang forever waiting on resume_chan.
39//!
40//! Proper cleanup options:
41//!
42//! **Option 1: Resume until completion**
43//! ```seq
44//! [ generator-body ] strand.weave  # Create weave
45//! 0 strand.resume                   # Resume until...
46//! if                                # ...has_more is false
47//!   # process value...
48//!   drop 0 strand.resume           # Keep resuming
49//! else
50//!   drop drop                       # Clean up when done
51//! then
52//! ```
53//!
54//! **Option 2: Explicit cancellation**
55//! ```seq
56//! [ generator-body ] strand.weave  # Create weave
57//! 0 strand.resume                   # Get first value
58//! if
59//!   drop                           # We only needed the first value
60//!   strand.weave-cancel            # Cancel and clean up
61//! else
62//!   drop drop
63//! then
64//! ```
65//!
66//! ## Limitations
67//!
68//! - Yielding `i64::MIN` (-9223372036854775808) is not supported as it's used
69//!   as the completion sentinel. This value will be interpreted as weave completion.
//! - Resuming a weave with `i64::MIN + 1` (-9223372036854775807) is not supported as it's used
//!   as the cancellation sentinel. This value will be interpreted as a cancellation request.
72
73use crate::stack::{Stack, pop, push};
74use crate::tagged_stack::StackValue;
75use crate::value::{ChannelData, Value};
76use may::sync::mpmc;
77use std::sync::Arc;
78
/// Sentinel value sent on the yield channel to signal weave completion.
///
/// **Warning:** This means `i64::MIN` cannot be yielded from a weave.
/// If a weave yields this exact value, it will be misinterpreted as completion.
/// This is an extremely unlikely edge case (would require yielding -9223372036854775808).
const DONE_SENTINEL: i64 = i64::MIN;

/// Sentinel value sent on the resume channel to signal weave cancellation.
/// Sent by strand.weave-cancel to wake a blocked weave and tell it to exit.
///
/// **Warning:** The sentinel is compared against values received on the resume
/// channel, so a weave that is resumed with this exact integer (`i64::MIN + 1`)
/// treats it as a cancellation request rather than as data.
const CANCEL_SENTINEL: i64 = i64::MIN + 1;
89
90/// Create a woven strand from a quotation
91///
92/// Stack effect: ( Quotation -- WeaveHandle )
93///
94/// Creates a weave from the quotation. The weave is initially suspended,
95/// waiting to be resumed with the first value. The quotation will receive
96/// a WeaveCtx on its stack that it must pass to yield operations.
97///
98/// Returns a WeaveHandle that the caller uses with strand.resume.
99///
100/// # Safety
101/// Stack must have a Quotation on top
102#[unsafe(no_mangle)]
103pub unsafe extern "C" fn patch_seq_weave(stack: Stack) -> Stack {
104    // Create the two internal channels - NO registry, just Arc values
105    let (yield_sender, yield_receiver) = mpmc::channel();
106    let yield_chan = Arc::new(ChannelData {
107        sender: yield_sender,
108        receiver: yield_receiver,
109    });
110
111    let (resume_sender, resume_receiver) = mpmc::channel();
112    let resume_chan = Arc::new(ChannelData {
113        sender: resume_sender,
114        receiver: resume_receiver,
115    });
116
117    // Pop the quotation from stack
118    let (stack, quot_value) = unsafe { pop(stack) };
119
120    // Clone channels for the spawned strand's WeaveCtx
121    let weave_ctx_yield = Arc::clone(&yield_chan);
122    let weave_ctx_resume = Arc::clone(&resume_chan);
123
124    // Clone for the WeaveHandle returned to caller
125    let handle_yield = Arc::clone(&yield_chan);
126    let handle_resume = Arc::clone(&resume_chan);
127
128    match quot_value {
129        Value::Quotation { wrapper, .. } => {
130            if wrapper == 0 {
131                panic!("strand.weave: quotation wrapper function pointer is null");
132            }
133
134            use crate::scheduler::ACTIVE_STRANDS;
135            use may::coroutine;
136            use std::sync::atomic::Ordering;
137
138            let fn_ptr: extern "C" fn(Stack) -> Stack = unsafe { std::mem::transmute(wrapper) };
139
140            // Clone the stack for the child
141            let (child_stack, child_base) = unsafe { crate::stack::clone_stack_with_base(stack) };
142
143            // Convert pointers to usize (which is Send)
144            let stack_addr = child_stack as usize;
145            let base_addr = child_base as usize;
146
147            ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
148
149            unsafe {
150                coroutine::spawn(move || {
151                    let child_stack = stack_addr as *mut StackValue;
152                    let child_base = base_addr as *mut StackValue;
153
154                    if !child_base.is_null() {
155                        crate::stack::patch_seq_set_stack_base(child_base);
156                    }
157
158                    // Wait for first resume value before executing
159                    let first_value = match weave_ctx_resume.receiver.recv() {
160                        Ok(v) => v,
161                        Err(_) => {
162                            cleanup_strand();
163                            return;
164                        }
165                    };
166
167                    // Check for cancellation before starting
168                    if let Value::Int(v) = &first_value
169                        && *v == CANCEL_SENTINEL
170                    {
171                        // Weave was cancelled before it started - clean exit
172                        crate::arena::arena_reset();
173                        cleanup_strand();
174                        return;
175                    }
176
177                    // Push WeaveCtx onto stack (yield_chan, resume_chan as a pair)
178                    // We use a Variant to bundle both channels
179                    let weave_ctx = Value::WeaveCtx {
180                        yield_chan: weave_ctx_yield.clone(),
181                        resume_chan: weave_ctx_resume.clone(),
182                    };
183                    let stack_with_ctx = push(child_stack, weave_ctx);
184
185                    // Push the first resume value
186                    let stack_with_value = push(stack_with_ctx, first_value);
187
188                    // Execute the quotation - it receives (WeaveCtx, resume_value)
189                    let final_stack = fn_ptr(stack_with_value);
190
191                    // Quotation returned - pop WeaveCtx and signal completion
192                    let (_, ctx_value) = pop(final_stack);
193                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
194                        let _ = yield_chan.sender.send(Value::Int(DONE_SENTINEL));
195                    }
196
197                    crate::arena::arena_reset();
198                    cleanup_strand();
199                });
200            }
201        }
202        Value::Closure { fn_ptr, env } => {
203            if fn_ptr == 0 {
204                panic!("strand.weave: closure function pointer is null");
205            }
206
207            use crate::scheduler::ACTIVE_STRANDS;
208            use may::coroutine;
209            use std::sync::atomic::Ordering;
210
211            let fn_ref: extern "C" fn(Stack, *const Value, usize) -> Stack =
212                unsafe { std::mem::transmute(fn_ptr) };
213            let env_clone: Vec<Value> = env.iter().cloned().collect();
214
215            let child_base = crate::stack::alloc_stack();
216            let base_addr = child_base as usize;
217
218            ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
219
220            unsafe {
221                coroutine::spawn(move || {
222                    let child_base = base_addr as *mut StackValue;
223                    crate::stack::patch_seq_set_stack_base(child_base);
224
225                    // Wait for first resume value
226                    let first_value = match weave_ctx_resume.receiver.recv() {
227                        Ok(v) => v,
228                        Err(_) => {
229                            cleanup_strand();
230                            return;
231                        }
232                    };
233
234                    // Check for cancellation before starting
235                    if let Value::Int(v) = &first_value
236                        && *v == CANCEL_SENTINEL
237                    {
238                        // Weave was cancelled before it started - clean exit
239                        crate::arena::arena_reset();
240                        cleanup_strand();
241                        return;
242                    }
243
244                    // Push WeaveCtx onto stack
245                    let weave_ctx = Value::WeaveCtx {
246                        yield_chan: weave_ctx_yield.clone(),
247                        resume_chan: weave_ctx_resume.clone(),
248                    };
249                    let stack_with_ctx = push(child_base, weave_ctx);
250                    let stack_with_value = push(stack_with_ctx, first_value);
251
252                    // Execute the closure
253                    let final_stack = fn_ref(stack_with_value, env_clone.as_ptr(), env_clone.len());
254
255                    // Signal completion
256                    let (_, ctx_value) = pop(final_stack);
257                    if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
258                        let _ = yield_chan.sender.send(Value::Int(DONE_SENTINEL));
259                    }
260
261                    crate::arena::arena_reset();
262                    cleanup_strand();
263                });
264            }
265        }
266        _ => panic!(
267            "strand.weave: expected Quotation or Closure, got {:?}",
268            quot_value
269        ),
270    }
271
272    // Return WeaveHandle (contains both channels for resume to use)
273    let handle = Value::WeaveCtx {
274        yield_chan: handle_yield,
275        resume_chan: handle_resume,
276    };
277    unsafe { push(stack, handle) }
278}
279
280/// Helper to clean up strand on exit
281fn cleanup_strand() {
282    use crate::scheduler::{ACTIVE_STRANDS, SHUTDOWN_CONDVAR, SHUTDOWN_MUTEX, TOTAL_COMPLETED};
283    use std::sync::atomic::Ordering;
284
285    let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
286    TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
287
288    if prev_count == 1 {
289        let _guard = SHUTDOWN_MUTEX
290            .lock()
291            .expect("weave: shutdown mutex poisoned");
292        SHUTDOWN_CONDVAR.notify_all();
293    }
294}
295
296/// Resume a woven strand with a value
297///
298/// Stack effect: ( WeaveHandle a -- WeaveHandle a Bool )
299///
300/// Sends value `a` to the weave and waits for it to yield.
301/// Returns (handle, yielded_value, has_more).
302/// - has_more = true: weave yielded a value
303/// - has_more = false: weave completed
304///
305/// # Safety
306/// Stack must have a value on top and WeaveHandle below it
307#[unsafe(no_mangle)]
308pub unsafe extern "C" fn patch_seq_resume(stack: Stack) -> Stack {
309    assert!(!stack.is_null(), "strand.resume: stack is empty");
310
311    // Pop the value to send
312    let (stack, value) = unsafe { pop(stack) };
313
314    // Pop the WeaveHandle
315    let (stack, handle) = unsafe { pop(stack) };
316
317    let (yield_chan, resume_chan) = match &handle {
318        Value::WeaveCtx {
319            yield_chan,
320            resume_chan,
321        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
322        _ => panic!("strand.resume: expected WeaveHandle, got {:?}", handle),
323    };
324
325    // Clone value for sending
326    let value_to_send = value.clone();
327
328    // Send resume value to the weave
329    if resume_chan.sender.send(value_to_send).is_err() {
330        // Channel closed - weave is done
331        let stack = unsafe { push(stack, handle) };
332        let stack = unsafe { push(stack, Value::Int(0)) };
333        return unsafe { push(stack, Value::Bool(false)) };
334    }
335
336    // Wait for yielded value
337    match yield_chan.receiver.recv() {
338        Ok(yielded) => {
339            // Check for Done sentinel
340            if let Value::Int(DONE_SENTINEL) = yielded {
341                // Weave completed
342                let stack = unsafe { push(stack, handle) };
343                let stack = unsafe { push(stack, Value::Int(0)) };
344                unsafe { push(stack, Value::Bool(false)) }
345            } else {
346                // Normal yield
347                let stack = unsafe { push(stack, handle) };
348                let stack = unsafe { push(stack, yielded) };
349                unsafe { push(stack, Value::Bool(true)) }
350            }
351        }
352        Err(_) => {
353            // Channel closed unexpectedly
354            let stack = unsafe { push(stack, handle) };
355            let stack = unsafe { push(stack, Value::Int(0)) };
356            unsafe { push(stack, Value::Bool(false)) }
357        }
358    }
359}
360
361/// Yield a value from within a woven strand
362///
363/// Stack effect: ( WeaveCtx a -- WeaveCtx a )
364///
365/// Sends value `a` to the caller and waits for the next resume value.
366/// The WeaveCtx must be passed through - it contains the channels.
367///
368/// # Panics
369/// Panics if WeaveCtx is not on the stack.
370///
371/// # Safety
372/// Stack must have a value on top and WeaveCtx below it
373#[unsafe(no_mangle)]
374pub unsafe extern "C" fn patch_seq_yield(stack: Stack) -> Stack {
375    assert!(!stack.is_null(), "yield: stack is empty");
376
377    // Pop the value to yield
378    let (stack, value) = unsafe { pop(stack) };
379
380    // Pop the WeaveCtx
381    let (stack, ctx) = unsafe { pop(stack) };
382
383    let (yield_chan, resume_chan) = match &ctx {
384        Value::WeaveCtx {
385            yield_chan,
386            resume_chan,
387        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
388        _ => panic!(
389            "yield: expected WeaveCtx on stack, got {:?}. yield can only be called inside strand.weave with context threaded through.",
390            ctx
391        ),
392    };
393
394    // Clone value for sending
395    let value_to_send = value.clone();
396
397    // Send the yielded value
398    if yield_chan.sender.send(value_to_send).is_err() {
399        panic!("yield: yield channel closed unexpectedly");
400    }
401
402    // Wait for resume value
403    let resume_value = match resume_chan.receiver.recv() {
404        Ok(v) => v,
405        Err(_) => panic!("yield: resume channel closed unexpectedly"),
406    };
407
408    // Check for cancellation
409    if let Value::Int(v) = &resume_value
410        && *v == CANCEL_SENTINEL
411    {
412        // Weave was cancelled - signal completion and exit
413        let _ = yield_chan.sender.send(Value::Int(DONE_SENTINEL));
414        crate::arena::arena_reset();
415        cleanup_strand();
416        // Block this coroutine forever - it's already marked as completed.
417        // We can't panic because that would cross an extern "C" boundary (UB).
418        // Instead, we block on an empty channel that will never receive.
419        let (_, rx): (mpmc::Sender<()>, mpmc::Receiver<()>) = mpmc::channel();
420        let _ = rx.recv();
421        // Unreachable - but satisfy the compiler's return type
422        unreachable!("cancelled weave should block forever");
423    }
424
425    // Push WeaveCtx back, then resume value
426    let stack = unsafe { push(stack, ctx) };
427    unsafe { push(stack, resume_value) }
428}
429
430/// Cancel a weave, releasing its resources
431///
432/// Stack effect: ( WeaveHandle -- )
433///
434/// Sends a cancellation signal to the weave, causing it to exit cleanly.
435/// This is necessary to avoid resource leaks when abandoning a weave
436/// before it completes naturally.
437///
438/// If the weave is:
439/// - Waiting for first resume: exits immediately
440/// - Waiting inside yield: receives cancel signal and can exit
441/// - Already completed: no effect (signal is ignored)
442///
443/// # Safety
444/// Stack must have a WeaveHandle (WeaveCtx) on top
445#[unsafe(no_mangle)]
446pub unsafe extern "C" fn patch_seq_weave_cancel(stack: Stack) -> Stack {
447    assert!(!stack.is_null(), "strand.weave-cancel: stack is null");
448
449    // Pop the WeaveHandle
450    let (stack, handle) = unsafe { pop(stack) };
451
452    // Extract the resume channel to send cancel signal
453    match handle {
454        Value::WeaveCtx { resume_chan, .. } => {
455            // Send cancel signal - if this fails, weave is already done (fine)
456            let _ = resume_chan.sender.send(Value::Int(CANCEL_SENTINEL));
457        }
458        _ => panic!(
459            "strand.weave-cancel: expected WeaveHandle, got {:?}",
460            handle
461        ),
462    }
463
464    // Handle is consumed (dropped), stack returned without it
465    stack
466}
467
468// Public re-exports
469pub use patch_seq_resume as resume;
470pub use patch_seq_weave as weave;
471pub use patch_seq_weave_cancel as weave_cancel;
472pub use patch_seq_yield as weave_yield;