use crate::stack::{Stack, pop, push};
use crate::tagged_stack::StackValue;
use crate::value::{Value, WeaveChannelData, WeaveMessage};
use may::sync::mpmc;
use std::sync::Arc;
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_weave(stack: Stack) -> Stack {
if stack.is_null() {
eprintln!("strand.weave: stack is null (fatal programming error)");
std::process::abort();
}
let (yield_sender, yield_receiver) = mpmc::channel();
let yield_chan = Arc::new(WeaveChannelData {
sender: yield_sender,
receiver: yield_receiver,
});
let (resume_sender, resume_receiver) = mpmc::channel();
let resume_chan = Arc::new(WeaveChannelData {
sender: resume_sender,
receiver: resume_receiver,
});
let (stack, quot_value) = unsafe { pop(stack) };
let weave_ctx_yield = Arc::clone(&yield_chan);
let weave_ctx_resume = Arc::clone(&resume_chan);
let handle_yield = Arc::clone(&yield_chan);
let handle_resume = Arc::clone(&resume_chan);
match quot_value {
Value::Quotation { wrapper, .. } => {
if wrapper == 0 {
eprintln!(
"strand.weave: quotation wrapper function pointer is null (compiler bug)"
);
std::process::abort();
}
use crate::scheduler::ACTIVE_STRANDS;
use may::coroutine;
use std::sync::atomic::Ordering;
let fn_ptr: extern "C" fn(Stack) -> Stack = unsafe { std::mem::transmute(wrapper) };
let (child_stack, child_base) = unsafe { crate::stack::clone_stack_with_base(stack) };
let stack_addr = child_stack as usize;
let base_addr = child_base as usize;
unsafe {
coroutine::spawn(move || {
let child_stack = stack_addr as *mut StackValue;
let child_base = base_addr as *mut StackValue;
if !child_base.is_null() {
crate::stack::patch_seq_set_stack_base(child_base);
}
let first_msg = match weave_ctx_resume.receiver.recv() {
Ok(msg) => msg,
Err(_) => {
return;
}
};
let first_value = match first_msg {
WeaveMessage::Cancel => {
crate::arena::arena_reset();
return;
}
WeaveMessage::Value(v) => v,
WeaveMessage::Done => {
return;
}
};
ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
let weave_ctx = Value::WeaveCtx {
yield_chan: weave_ctx_yield.clone(),
resume_chan: weave_ctx_resume.clone(),
};
let stack_with_ctx = push(child_stack, weave_ctx);
let stack_with_value = push(stack_with_ctx, first_value);
let final_stack = fn_ptr(stack_with_value);
let (_, ctx_value) = pop(final_stack);
if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
let _ = yield_chan.sender.send(WeaveMessage::Done);
}
crate::arena::arena_reset();
cleanup_strand();
});
}
}
Value::Closure { fn_ptr, env } => {
if fn_ptr == 0 {
eprintln!("strand.weave: closure function pointer is null (compiler bug)");
std::process::abort();
}
use crate::scheduler::ACTIVE_STRANDS;
use may::coroutine;
use std::sync::atomic::Ordering;
let fn_ref: extern "C" fn(Stack, *const Value, usize) -> Stack =
unsafe { std::mem::transmute(fn_ptr) };
let env_clone: Vec<Value> = env.iter().cloned().collect();
let child_base = crate::stack::alloc_stack();
let base_addr = child_base as usize;
unsafe {
coroutine::spawn(move || {
let child_base = base_addr as *mut StackValue;
crate::stack::patch_seq_set_stack_base(child_base);
let first_msg = match weave_ctx_resume.receiver.recv() {
Ok(msg) => msg,
Err(_) => {
return;
}
};
let first_value = match first_msg {
WeaveMessage::Cancel => {
crate::arena::arena_reset();
return;
}
WeaveMessage::Value(v) => v,
WeaveMessage::Done => {
return;
}
};
ACTIVE_STRANDS.fetch_add(1, Ordering::Release);
let weave_ctx = Value::WeaveCtx {
yield_chan: weave_ctx_yield.clone(),
resume_chan: weave_ctx_resume.clone(),
};
let stack_with_ctx = push(child_base, weave_ctx);
let stack_with_value = push(stack_with_ctx, first_value);
let final_stack = fn_ref(stack_with_value, env_clone.as_ptr(), env_clone.len());
let (_, ctx_value) = pop(final_stack);
if let Value::WeaveCtx { yield_chan, .. } = ctx_value {
let _ = yield_chan.sender.send(WeaveMessage::Done);
}
crate::arena::arena_reset();
cleanup_strand();
});
}
}
_ => {
eprintln!(
"strand.weave: expected Quotation or Closure, got {:?} (compiler bug or memory corruption)",
quot_value
);
std::process::abort();
}
}
let handle = Value::WeaveCtx {
yield_chan: handle_yield,
resume_chan: handle_resume,
};
unsafe { push(stack, handle) }
}
fn block_forever() -> ! {
let (tx, rx): (mpmc::Sender<()>, mpmc::Receiver<()>) = mpmc::channel();
std::mem::forget(tx);
loop {
let _ = rx.recv();
}
}
/// Records that an active weave coroutine has finished.
///
/// Decrements `ACTIVE_STRANDS` and increments `TOTAL_COMPLETED`. If the
/// counter was 1 before the decrement (this was the last active strand), it
/// takes `SHUTDOWN_MUTEX` and notifies every waiter on `SHUTDOWN_CONDVAR`.
///
/// # Panics
///
/// Panics if `SHUTDOWN_MUTEX` is poisoned.
fn cleanup_strand() {
    use crate::scheduler::{ACTIVE_STRANDS, SHUTDOWN_CONDVAR, SHUTDOWN_MUTEX, TOTAL_COMPLETED};
    use std::sync::atomic::Ordering;

    let was_last_active = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel) == 1;
    TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
    if !was_last_active {
        return;
    }

    // The guard is held while notifying and released when the function returns.
    let _shutdown_guard = SHUTDOWN_MUTEX
        .lock()
        .expect("weave: shutdown mutex poisoned");
    SHUTDOWN_CONDVAR.notify_all();
}
/// Resumes a weave with a value and waits for its next yield.
///
/// Stack effect: pops the resume value and then the weave handle. Pushes back
/// the handle, a payload, and a `Value::Bool` flag:
///
/// * `handle, yielded, Bool(true)` when the weave yielded a value.
/// * `handle, Int(0), Bool(false)` in every other case: the resume value could
///   not be sent, the weave answered `Done` or `Cancel`, or receiving from the
///   yield channel failed.
///
/// The process is aborted if `stack` is null or if the popped handle is not a
/// `Value::WeaveCtx`.
///
/// # Safety
///
/// `stack` must be a valid stack pointer with the resume value on top and the
/// weave handle directly beneath it.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_resume(stack: Stack) -> Stack {
    if stack.is_null() {
        eprintln!("strand.resume: stack is null (fatal programming error)");
        std::process::abort();
    }

    let (stack, value) = unsafe { pop(stack) };
    let (stack, handle) = unsafe { pop(stack) };

    let (yield_chan, resume_chan) = match &handle {
        Value::WeaveCtx {
            yield_chan,
            resume_chan,
        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
        _ => {
            eprintln!("strand.resume: expected WeaveHandle, got {:?}", handle);
            std::process::abort();
        }
    };

    // `value` is owned and not used again, so it is moved into the message
    // rather than cloned.
    let yielded = if resume_chan.sender.send(WeaveMessage::Value(value)).is_err() {
        None
    } else {
        match yield_chan.receiver.recv() {
            Ok(WeaveMessage::Value(yielded)) => Some(yielded),
            Ok(WeaveMessage::Done) | Ok(WeaveMessage::Cancel) | Err(_) => None,
        }
    };

    // The handle always goes back on the stack, beneath the payload and flag.
    let stack = unsafe { push(stack, handle) };
    match yielded {
        Some(yielded) => {
            let stack = unsafe { push(stack, yielded) };
            unsafe { push(stack, Value::Bool(true)) }
        }
        None => {
            // Int(0) is only a placeholder so the stack shape matches the
            // "has more" case.
            let stack = unsafe { push(stack, Value::Int(0)) };
            unsafe { push(stack, Value::Bool(false)) }
        }
    }
}
/// Yields a value from inside a weave and waits to be resumed.
///
/// Stack effect: pops the value to yield and then the `Value::WeaveCtx`. When
/// the weave is resumed with a value, pushes the context back followed by that
/// resume value and returns.
///
/// This function never returns on the failure and termination paths. Instead
/// it resets the arena and parks the coroutine in `block_forever`:
///
/// * null stack, wrong value in the context slot, or a failed send of the
///   yielded value: `cleanup_strand` is also called, since the weave is still
///   counted as active at that point;
/// * a failed receive, or a `Done` message, while waiting for resume;
/// * a `Cancel` message while waiting for resume: `Done` is first sent on the
///   yield channel (a send failure is ignored).
///
/// # Safety
///
/// `stack` must be a valid stack pointer with the value to yield on top and
/// the weave context directly beneath it.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield(stack: Stack) -> Stack {
    use crate::scheduler::ACTIVE_STRANDS;
    use std::sync::atomic::Ordering;

    if stack.is_null() {
        eprintln!("yield: stack is null (fatal programming error)");
        crate::arena::arena_reset();
        cleanup_strand();
        block_forever();
    }

    let (stack, value) = unsafe { pop(stack) };
    let (stack, ctx) = unsafe { pop(stack) };

    let (yield_chan, resume_chan) = match &ctx {
        Value::WeaveCtx {
            yield_chan,
            resume_chan,
        } => (Arc::clone(yield_chan), Arc::clone(resume_chan)),
        _ => {
            eprintln!(
                "yield: expected WeaveCtx on stack, got {:?}. \
                 yield can only be called inside strand.weave with context threaded through.",
                ctx
            );
            crate::arena::arena_reset();
            cleanup_strand();
            block_forever();
        }
    };

    // `value` is owned and not used again, so it is moved into the message
    // rather than cloned.
    if yield_chan.sender.send(WeaveMessage::Value(value)).is_err() {
        crate::arena::arena_reset();
        cleanup_strand();
        block_forever();
    }

    // While waiting to be resumed the weave is not counted as active. The
    // termination paths below therefore must not call `cleanup_strand`, which
    // would decrement the counter a second time.
    // NOTE(review): this decrement does not notify SHUTDOWN_CONDVAR even if the
    // count reaches zero, unlike `cleanup_strand` — confirm the scheduler's
    // wait logic does not depend on that notification.
    ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);

    match resume_chan.receiver.recv() {
        Ok(WeaveMessage::Value(resume_value)) => {
            // Count the weave as active again before handing control back.
            ACTIVE_STRANDS.fetch_add(1, Ordering::AcqRel);
            let stack = unsafe { push(stack, ctx) };
            unsafe { push(stack, resume_value) }
        }
        Ok(WeaveMessage::Cancel) => {
            let _ = yield_chan.sender.send(WeaveMessage::Done);
            crate::arena::arena_reset();
            block_forever();
        }
        Ok(WeaveMessage::Done) | Err(_) => {
            crate::arena::arena_reset();
            block_forever();
        }
    }
}
/// Requests cancellation of a weave.
///
/// Stack effect: pops the weave handle and returns the remaining stack.
/// `WeaveMessage::Cancel` is sent on the handle's resume channel; a send
/// failure is ignored.
///
/// The process is aborted if `stack` is null or if the popped value is not a
/// `Value::WeaveCtx`.
///
/// # Safety
///
/// `stack` must be a valid stack pointer with the weave handle on top.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_weave_cancel(stack: Stack) -> Stack {
    if stack.is_null() {
        eprintln!("strand.weave-cancel: stack is null (fatal programming error)");
        std::process::abort();
    }

    let (rest, handle) = unsafe { pop(stack) };

    if let Value::WeaveCtx { resume_chan, .. } = &handle {
        // Best effort: the result of the send is deliberately discarded.
        let _ = resume_chan.sender.send(WeaveMessage::Cancel);
        return rest;
    }

    eprintln!(
        "strand.weave-cancel: expected WeaveHandle, got {:?}",
        handle
    );
    std::process::abort()
}
pub use patch_seq_resume as resume;
pub use patch_seq_weave as weave;
pub use patch_seq_weave_cancel as weave_cancel;
pub use patch_seq_yield as weave_yield;
#[cfg(test)]
mod tests {
    use super::*;
    use crate::quotations::push_quotation;
    use crate::scheduler::{scheduler_init, wait_all_strands};
    use crate::stack::{alloc_test_stack, pop, push};
    use crate::value::Value;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Allocates a test stack, pushes `quot` as a quotation (the same pointer
    /// is used for both `push_quotation` arguments) and calls `weave`.
    /// Returns the stack with the weave handle on top.
    unsafe fn start_weave(quot: unsafe extern "C" fn(Stack) -> Stack) -> Stack {
        unsafe {
            let stack = alloc_test_stack();
            let fn_ptr = quot as *const () as usize;
            let stack = push_quotation(stack, fn_ptr, fn_ptr);
            weave(stack)
        }
    }

    /// Pushes `value`, calls `resume`, then pops the flag and the payload.
    /// Returns `(stack, payload, has_more)`; the weave handle stays on `stack`.
    unsafe fn resume_with(stack: Stack, value: Value) -> (Stack, Value, Value) {
        unsafe {
            let stack = resume(push(stack, value));
            let (stack, has_more) = pop(stack);
            let (stack, payload) = pop(stack);
            (stack, payload, has_more)
        }
    }

    /// Quotation that yields `resume value + 100` once (a non-Int resume value
    /// counts as 0), then drops the next resume value and returns.
    unsafe extern "C" fn yield_once_quot(stack: Stack) -> Stack {
        unsafe {
            let (stack, resume_val) = pop(stack);
            let n = match resume_val {
                Value::Int(i) => i,
                _ => 0,
            };
            let stack = push(stack, Value::Int(n + 100));
            let stack = weave_yield(stack);
            let (stack, _new_resume) = pop(stack);
            stack
        }
    }

    /// Quotation that yields 1, 2 and 3 in turn, ignoring every resume value.
    unsafe extern "C" fn yield_three_times_quot(stack: Stack) -> Stack {
        unsafe {
            let (stack, _) = pop(stack);
            let stack = push(stack, Value::Int(1));
            let stack = weave_yield(stack);
            let (stack, _) = pop(stack);
            let stack = push(stack, Value::Int(2));
            let stack = weave_yield(stack);
            let (stack, _) = pop(stack);
            let stack = push(stack, Value::Int(3));
            let stack = weave_yield(stack);
            let (stack, _) = pop(stack);
            stack
        }
    }

    /// Quotation that drops its first resume value and returns without
    /// yielding.
    unsafe extern "C" fn no_yield_quot(stack: Stack) -> Stack {
        unsafe {
            let (stack, _) = pop(stack);
            stack
        }
    }

    /// Quotation that yields back each resume value until it receives a
    /// negative Int (or a non-Int, which is treated as -1).
    unsafe extern "C" fn echo_quot(stack: Stack) -> Stack {
        unsafe {
            let (mut stack, mut resume_val) = pop(stack);
            loop {
                let n = match resume_val {
                    Value::Int(i) => i,
                    _ => -1,
                };
                if n < 0 {
                    break;
                }
                stack = push(stack, Value::Int(n));
                stack = weave_yield(stack);
                let (new_stack, new_val) = pop(stack);
                stack = new_stack;
                resume_val = new_val;
            }
            stack
        }
    }

    #[test]
    fn test_weave_create() {
        unsafe {
            scheduler_init();
            let stack = start_weave(yield_once_quot);
            let (_, handle) = pop(stack);
            assert!(
                matches!(handle, Value::WeaveCtx { .. }),
                "Expected WeaveCtx (handle), got {:?}",
                handle
            );
        }
    }

    #[test]
    fn test_weave_single_yield() {
        unsafe {
            scheduler_init();
            let stack = start_weave(yield_once_quot);
            let (stack, yielded, has_more) = resume_with(stack, Value::Int(42));
            let (_, _handle) = pop(stack);
            assert_eq!(has_more, Value::Bool(true), "Should have more");
            assert_eq!(yielded, Value::Int(142), "Should yield 42 + 100 = 142");
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_completion() {
        unsafe {
            scheduler_init();
            let stack = start_weave(yield_once_quot);
            let (stack, _yielded, has_more1) = resume_with(stack, Value::Int(10));
            assert_eq!(has_more1, Value::Bool(true));
            let (_stack, _placeholder, has_more2) = resume_with(stack, Value::Int(0));
            assert_eq!(has_more2, Value::Bool(false), "Weave should be complete");
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_no_yield() {
        unsafe {
            scheduler_init();
            let stack = start_weave(no_yield_quot);
            let (_stack, _placeholder, has_more) = resume_with(stack, Value::Int(0));
            assert_eq!(
                has_more,
                Value::Bool(false),
                "Weave should complete immediately"
            );
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_multiple_yields() {
        unsafe {
            scheduler_init();
            let mut stack = start_weave(yield_three_times_quot);
            for expected in 1..=3 {
                let (rest, yielded, has_more) = resume_with(stack, Value::Int(0));
                assert_eq!(has_more, Value::Bool(true));
                assert_eq!(yielded, Value::Int(expected));
                stack = rest;
            }
            // The fourth resume lets the quotation return.
            let (_stack, _, has_more) = resume_with(stack, Value::Int(0));
            assert_eq!(has_more, Value::Bool(false));
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_echo() {
        unsafe {
            scheduler_init();
            let stack = start_weave(echo_quot);
            let (stack, yielded, has_more) = resume_with(stack, Value::Int(42));
            assert_eq!(has_more, Value::Bool(true));
            assert_eq!(yielded, Value::Int(42));
            let (stack, yielded, has_more) = resume_with(stack, Value::Int(99));
            assert_eq!(has_more, Value::Bool(true));
            assert_eq!(yielded, Value::Int(99));
            // A negative value makes echo_quot leave its loop.
            let (_stack, _, has_more) = resume_with(stack, Value::Int(-1));
            assert_eq!(has_more, Value::Bool(false));
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_cancel_before_resume() {
        unsafe {
            scheduler_init();
            let stack = start_weave(yield_three_times_quot);
            let _stack = weave_cancel(stack);
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_cancel_after_yield() {
        unsafe {
            scheduler_init();
            let stack = start_weave(yield_three_times_quot);
            let (stack, _, _) = resume_with(stack, Value::Int(0));
            let _stack = weave_cancel(stack);
            wait_all_strands();
        }
    }

    #[test]
    fn test_dormant_weave_doesnt_block_shutdown() {
        unsafe {
            scheduler_init();
            // The weave is created but never resumed.
            let _stack = start_weave(yield_three_times_quot);
            wait_all_strands();
        }
    }

    #[test]
    fn test_multiple_dormant_weaves() {
        unsafe {
            scheduler_init();
            for _ in 0..10 {
                let _stack = start_weave(yield_three_times_quot);
            }
            wait_all_strands();
        }
    }

    #[test]
    fn test_resume_wrong_type() {
        // Intentionally empty.
        // NOTE(review): presumably a placeholder because `resume` aborts the
        // whole process when the handle is not a WeaveCtx, so the case cannot
        // be exercised inside the test process — confirm or remove.
    }

    #[test]
    fn test_weave_with_active_strands() {
        unsafe {
            use crate::scheduler::strand_spawn;
            scheduler_init();
            static STRAND_COMPLETED: AtomicBool = AtomicBool::new(false);
            extern "C" fn simple_strand(_stack: Stack) -> Stack {
                STRAND_COMPLETED.store(true, Ordering::SeqCst);
                std::ptr::null_mut()
            }
            strand_spawn(simple_strand, std::ptr::null_mut());
            let stack = start_weave(yield_once_quot);
            let (stack, _, _) = resume_with(stack, Value::Int(5));
            let (_stack, _, _) = resume_with(stack, Value::Int(0));
            wait_all_strands();
            assert!(
                STRAND_COMPLETED.load(Ordering::SeqCst),
                "Regular strand should have completed"
            );
        }
    }

    #[test]
    fn test_weave_generator_pattern() {
        unsafe {
            scheduler_init();
            let mut current_stack = start_weave(yield_three_times_quot);
            let mut collected = Vec::new();
            loop {
                let (rest, value, has_more) = resume_with(current_stack, Value::Int(0));
                current_stack = rest;
                match has_more {
                    Value::Bool(true) => {
                        if let Value::Int(n) = value {
                            collected.push(n);
                        }
                    }
                    Value::Bool(false) => {
                        let (_s, _handle) = pop(current_stack);
                        break;
                    }
                    _ => panic!("Unexpected has_more value"),
                }
            }
            assert_eq!(collected, vec![1, 2, 3]);
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_yields_zero() {
        unsafe {
            scheduler_init();
            let stack = start_weave(echo_quot);
            // A yielded 0 must be distinguishable from the Int(0) placeholder
            // that accompanies Bool(false).
            let (stack, yielded, has_more) = resume_with(stack, Value::Int(0));
            assert_eq!(has_more, Value::Bool(true), "Should still have more");
            assert_eq!(yielded, Value::Int(0), "Should yield 0");
            let (_stack, _, has_more) = resume_with(stack, Value::Int(-1));
            assert_eq!(has_more, Value::Bool(false));
            wait_all_strands();
        }
    }

    #[test]
    fn test_weave_yields_negative() {
        unsafe {
            scheduler_init();
            let stack = start_weave(yield_once_quot);
            let (stack, yielded, has_more) = resume_with(stack, Value::Int(-50));
            assert_eq!(has_more, Value::Bool(true));
            assert_eq!(yielded, Value::Int(50));
            let (_stack, _, _) = resume_with(stack, Value::Int(0));
            wait_all_strands();
        }
    }
}