1use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41#[unsafe(no_mangle)]
51pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
52 let (sender, receiver) = mpmc::channel();
56
57 let channel = Arc::new(ChannelData { sender, receiver });
59
60 unsafe { push(stack, Value::Channel(channel)) }
61}
62
63#[unsafe(no_mangle)]
73pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
74 assert!(!stack.is_null(), "send: stack is empty");
75
76 let (stack, channel_value) = unsafe { pop(stack) };
78 let channel = match channel_value {
79 Value::Channel(ch) => ch,
80 _ => panic!("send: expected Channel on stack, got {:?}", channel_value),
81 };
82
83 assert!(!stack.is_null(), "send: stack has only one value");
84
85 let (rest, value) = unsafe { pop(stack) };
87
88 let global_value = value.clone();
91
92 channel
95 .sender
96 .send(global_value)
97 .expect("send: channel closed");
98
99 rest
100}
101
102#[unsafe(no_mangle)]
117pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
118 assert!(!stack.is_null(), "receive: stack is empty");
119
120 let (rest, channel_value) = unsafe { pop(stack) };
122 let channel = match channel_value {
123 Value::Channel(ch) => ch,
124 _ => panic!(
125 "receive: expected Channel on stack, got {:?}",
126 channel_value
127 ),
128 };
129
130 let value = match channel.receiver.recv() {
133 Ok(v) => v,
134 Err(_) => panic!("receive: channel closed"),
135 };
136
137 unsafe { push(rest, value) }
138}
139
140#[unsafe(no_mangle)]
150pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
151 assert!(!stack.is_null(), "close_channel: stack is empty");
152
153 let (rest, channel_value) = unsafe { pop(stack) };
155 match channel_value {
156 Value::Channel(_) => {} _ => panic!(
158 "close_channel: expected Channel on stack, got {:?}",
159 channel_value
160 ),
161 }
162
163 rest
164}
165
166#[unsafe(no_mangle)]
176pub unsafe extern "C" fn patch_seq_chan_send_safe(stack: Stack) -> Stack {
177 assert!(!stack.is_null(), "send-safe: stack is empty");
178
179 let (stack, channel_value) = unsafe { pop(stack) };
181 let channel = match channel_value {
182 Value::Channel(ch) => ch,
183 _ => {
184 if !stack.is_null() {
186 let (rest, _value) = unsafe { pop(stack) };
187 return unsafe { push(rest, Value::Int(0)) };
188 }
189 return unsafe { push(stack, Value::Int(0)) };
190 }
191 };
192
193 if stack.is_null() {
194 return unsafe { push(stack, Value::Int(0)) };
196 }
197
198 let (rest, value) = unsafe { pop(stack) };
200
201 let global_value = value.clone();
203
204 match channel.sender.send(global_value) {
206 Ok(()) => unsafe { push(rest, Value::Int(1)) },
207 Err(_) => unsafe { push(rest, Value::Int(0)) },
208 }
209}
210
211#[unsafe(no_mangle)]
226pub unsafe extern "C" fn patch_seq_chan_receive_safe(stack: Stack) -> Stack {
227 assert!(!stack.is_null(), "receive-safe: stack is empty");
228
229 let (rest, channel_value) = unsafe { pop(stack) };
231 let channel = match channel_value {
232 Value::Channel(ch) => ch,
233 _ => {
234 let stack = unsafe { push(rest, Value::Int(0)) };
236 return unsafe { push(stack, Value::Int(0)) };
237 }
238 };
239
240 match channel.receiver.recv() {
242 Ok(value) => {
243 let stack = unsafe { push(rest, value) };
244 unsafe { push(stack, Value::Int(1)) }
245 }
246 Err(_) => {
247 let stack = unsafe { push(rest, Value::Int(0)) };
248 unsafe { push(stack, Value::Int(0)) }
249 }
250 }
251}
252
253pub use patch_seq_chan_receive as receive;
255pub use patch_seq_chan_receive_safe as receive_safe;
256pub use patch_seq_chan_send as send;
257pub use patch_seq_chan_send_safe as send_safe;
258pub use patch_seq_close_channel as close_channel;
259pub use patch_seq_make_channel as make_channel;
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264 use crate::scheduler::{spawn_strand, wait_all_strands};
265 use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
266
267 #[test]
268 fn test_make_channel() {
269 unsafe {
270 let stack = crate::stack::alloc_test_stack();
271 let stack = make_channel(stack);
272
273 let (_stack, value) = pop(stack);
275 assert!(matches!(value, Value::Channel(_)));
276 }
277 }
278
279 #[test]
280 fn test_send_receive() {
281 unsafe {
282 let mut stack = crate::stack::alloc_test_stack();
284 stack = make_channel(stack);
285
286 let (_empty_stack, channel_value) = pop(stack);
288
289 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
291 stack = push(stack, channel_value.clone());
292 stack = send(stack);
293
294 stack = push(stack, channel_value);
296 stack = receive(stack);
297
298 let (_stack, received) = pop(stack);
300 assert_eq!(received, Value::Int(42));
301 }
302 }
303
304 #[test]
305 fn test_channel_dup_sharing() {
306 unsafe {
308 let mut stack = crate::stack::alloc_test_stack();
309 stack = make_channel(stack);
310
311 let (_, ch1) = pop(stack);
312 let ch2 = ch1.clone(); let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
316 stack = push(stack, ch1);
317 stack = send(stack);
318
319 stack = push(stack, ch2);
321 stack = receive(stack);
322
323 let (_, received) = pop(stack);
324 assert_eq!(received, Value::Int(99));
325 }
326 }
327
328 #[test]
329 fn test_multiple_sends_receives() {
330 unsafe {
331 let mut stack = crate::stack::alloc_test_stack();
333 stack = make_channel(stack);
334 let (_, channel_value) = pop(stack);
335
336 for i in 1..=5 {
338 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
339 stack = push(stack, channel_value.clone());
340 let _ = send(stack);
341 }
342
343 for i in 1..=5 {
345 let mut stack = push(crate::stack::alloc_test_stack(), channel_value.clone());
346 stack = receive(stack);
347 let (_, received) = pop(stack);
348 assert_eq!(received, Value::Int(i));
349 }
350 }
351 }
352
353 #[test]
354 fn test_close_channel() {
355 unsafe {
356 let mut stack = crate::stack::alloc_test_stack();
358 stack = make_channel(stack);
359
360 let _stack = close_channel(stack);
361 }
362 }
363
364 #[test]
365 fn test_arena_string_send_between_strands() {
366 unsafe {
368 static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
369 static VERIFIED: AtomicBool = AtomicBool::new(false);
370
371 let mut stack = crate::stack::alloc_test_stack();
373 stack = make_channel(stack);
374 let (_, channel_value) = pop(stack);
375
376 let ch_ptr = match &channel_value {
378 Value::Channel(arc) => Arc::as_ptr(arc) as i64,
379 _ => panic!("Expected Channel"),
380 };
381 CHANNEL_PTR.store(ch_ptr, Ordering::Release);
382
383 std::mem::forget(channel_value.clone());
385
386 extern "C" fn sender(_stack: Stack) -> Stack {
388 use crate::seqstring::arena_string;
389 use crate::value::ChannelData;
390
391 unsafe {
392 let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
393 let channel = Arc::from_raw(ch_ptr);
394 let channel_clone = Arc::clone(&channel);
395 std::mem::forget(channel); let msg = arena_string("Arena message!");
399 assert!(!msg.is_global(), "Should be arena-allocated initially");
400
401 let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
403 let stack = push(stack, Value::Channel(channel_clone));
404 send(stack)
405 }
406 }
407
408 extern "C" fn receiver(_stack: Stack) -> Stack {
410 use crate::value::ChannelData;
411
412 unsafe {
413 let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
414 let channel = Arc::from_raw(ch_ptr);
415 let channel_clone = Arc::clone(&channel);
416 std::mem::forget(channel); let mut stack = push(
419 crate::stack::alloc_test_stack(),
420 Value::Channel(channel_clone),
421 );
422 stack = receive(stack);
423 let (_, msg_val) = pop(stack);
424
425 match msg_val {
426 Value::String(s) => {
427 assert_eq!(s.as_str(), "Arena message!");
428 assert!(s.is_global(), "Received string should be global");
429 VERIFIED.store(true, Ordering::Release);
430 }
431 _ => panic!("Expected String"),
432 }
433
434 std::ptr::null_mut()
435 }
436 }
437
438 spawn_strand(sender);
439 spawn_strand(receiver);
440 wait_all_strands();
441
442 assert!(
443 VERIFIED.load(Ordering::Acquire),
444 "Receiver should have verified"
445 );
446 }
447 }
448
449 #[test]
450 fn test_send_safe_success() {
451 unsafe {
452 let mut stack = crate::stack::alloc_test_stack();
453 stack = make_channel(stack);
454 let (_, channel_value) = pop(stack);
455
456 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
458 stack = push(stack, channel_value.clone());
459 stack = send_safe(stack);
460
461 let (_stack, result) = pop(stack);
463 assert_eq!(result, Value::Int(1));
464
465 let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
467 stack = receive(stack);
468 let (_, received) = pop(stack);
469 assert_eq!(received, Value::Int(42));
470 }
471 }
472
473 #[test]
474 fn test_send_safe_wrong_type() {
475 unsafe {
476 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
478 stack = push(stack, Value::Int(999)); stack = send_safe(stack);
480
481 let (_stack, result) = pop(stack);
483 assert_eq!(result, Value::Int(0));
484 }
485 }
486
487 #[test]
488 fn test_receive_safe_success() {
489 unsafe {
490 let mut stack = crate::stack::alloc_test_stack();
491 stack = make_channel(stack);
492 let (_, channel_value) = pop(stack);
493
494 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
496 stack = push(stack, channel_value.clone());
497 let _ = send(stack);
498
499 let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
501 stack = receive_safe(stack);
502
503 let (stack, success) = pop(stack);
505 let (_stack, value) = pop(stack);
506 assert_eq!(success, Value::Int(1));
507 assert_eq!(value, Value::Int(42));
508 }
509 }
510
511 #[test]
512 fn test_receive_safe_wrong_type() {
513 unsafe {
514 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(999));
516 stack = receive_safe(stack);
517
518 let (stack, success) = pop(stack);
520 let (_stack, value) = pop(stack);
521 assert_eq!(success, Value::Int(0));
522 assert_eq!(value, Value::Int(0));
523 }
524 }
525
526 #[test]
527 fn test_mpmc_concurrent_receivers() {
528 unsafe {
530 const NUM_MESSAGES: i64 = 100;
531 const NUM_RECEIVERS: usize = 4;
532
533 static RECEIVER_COUNTS: [AtomicI64; 4] = [
534 AtomicI64::new(0),
535 AtomicI64::new(0),
536 AtomicI64::new(0),
537 AtomicI64::new(0),
538 ];
539 static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
540
541 for counter in &RECEIVER_COUNTS {
543 counter.store(0, Ordering::SeqCst);
544 }
545
546 let mut stack = crate::stack::alloc_test_stack();
548 stack = make_channel(stack);
549 let (_, channel_value) = pop(stack);
550
551 let ch_ptr = match &channel_value {
552 Value::Channel(arc) => Arc::as_ptr(arc) as i64,
553 _ => panic!("Expected Channel"),
554 };
555 CHANNEL_PTR.store(ch_ptr, Ordering::SeqCst);
556
557 for _ in 0..(NUM_RECEIVERS + 1) {
559 std::mem::forget(channel_value.clone());
560 }
561
562 fn make_receiver(idx: usize) -> extern "C" fn(Stack) -> Stack {
563 match idx {
564 0 => receiver_0,
565 1 => receiver_1,
566 2 => receiver_2,
567 3 => receiver_3,
568 _ => panic!("Invalid receiver index"),
569 }
570 }
571
572 extern "C" fn receiver_0(stack: Stack) -> Stack {
573 receive_loop(0, stack)
574 }
575 extern "C" fn receiver_1(stack: Stack) -> Stack {
576 receive_loop(1, stack)
577 }
578 extern "C" fn receiver_2(stack: Stack) -> Stack {
579 receive_loop(2, stack)
580 }
581 extern "C" fn receiver_3(stack: Stack) -> Stack {
582 receive_loop(3, stack)
583 }
584
585 fn receive_loop(idx: usize, _stack: Stack) -> Stack {
586 use crate::value::ChannelData;
587 unsafe {
588 let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
589 let channel = Arc::from_raw(ch_ptr);
590 let channel_clone = Arc::clone(&channel);
591 std::mem::forget(channel);
592
593 loop {
594 let mut stack = push(
595 crate::stack::alloc_test_stack(),
596 Value::Channel(channel_clone.clone()),
597 );
598 stack = receive_safe(stack);
599 let (stack, success) = pop(stack);
600 let (_, value) = pop(stack);
601
602 match (success, value) {
603 (Value::Int(1), Value::Int(v)) => {
604 if v < 0 {
605 break; }
607 RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
608 }
609 _ => break,
610 }
611 may::coroutine::yield_now();
612 }
613 std::ptr::null_mut()
614 }
615 }
616
617 for i in 0..NUM_RECEIVERS {
619 spawn_strand(make_receiver(i));
620 }
621
622 std::thread::sleep(std::time::Duration::from_millis(10));
623
624 for i in 0..NUM_MESSAGES {
626 let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
627 let channel = Arc::from_raw(ch_ptr);
628 let channel_clone = Arc::clone(&channel);
629 std::mem::forget(channel);
630
631 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
632 stack = push(stack, Value::Channel(channel_clone));
633 let _ = send(stack);
634 }
635
636 for _ in 0..NUM_RECEIVERS {
638 let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
639 let channel = Arc::from_raw(ch_ptr);
640 let channel_clone = Arc::clone(&channel);
641 std::mem::forget(channel);
642
643 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(-1));
644 stack = push(stack, Value::Channel(channel_clone));
645 let _ = send(stack);
646 }
647
648 wait_all_strands();
649
650 let total_received: i64 = RECEIVER_COUNTS
651 .iter()
652 .map(|c| c.load(Ordering::SeqCst))
653 .sum();
654 assert_eq!(total_received, NUM_MESSAGES);
655
656 let active_receivers = RECEIVER_COUNTS
657 .iter()
658 .filter(|c| c.load(Ordering::SeqCst) > 0)
659 .count();
660 assert!(active_receivers >= 2, "Messages should be distributed");
661 }
662 }
663}