1use crate::stack::{Stack, pop, push};
34use crate::value::Value;
35use may::sync::mpmc;
36use std::collections::HashMap;
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::{Arc, Mutex, Once};
39
/// Source of channel IDs. Starts at 1, so ID 0 is never handed out.
static NEXT_CHANNEL_ID: AtomicU64 = AtomicU64::new(1);

/// Process-wide table mapping channel IDs to their sender/receiver pair.
///
/// Holds `None` until `init_registry` installs an empty map.
static CHANNEL_REGISTRY: Mutex<Option<HashMap<u64, ChannelPair>>> = Mutex::new(None);

/// Ensures the map in `CHANNEL_REGISTRY` is installed only once.
static REGISTRY_INIT: Once = Once::new();
49
/// Per-channel counters. Stored behind an `Arc` in `ChannelPair` so an operation
/// can keep updating them after it has released the registry lock.
#[derive(Debug)]
struct ChannelStatsInner {
    /// Incremented once per successful send.
    send_count: AtomicU64,
    /// Incremented once per successful receive.
    receive_count: AtomicU64,
}
58
/// Registry entry for one channel: both endpoints plus its counters.
///
/// Send/receive operations clone the endpoint they need out of the registry
/// rather than using these handles directly.
struct ChannelPair {
    /// Sending endpoint of the channel.
    sender: mpmc::Sender<Value>,
    /// Receiving endpoint of the channel.
    receiver: mpmc::Receiver<Value>,
    /// Shared send/receive counters for this channel.
    stats: Arc<ChannelStatsInner>,
}
67
68fn init_registry() {
70 REGISTRY_INIT.call_once(|| {
71 let mut guard = CHANNEL_REGISTRY.lock()
72 .expect("init_registry: channel registry lock poisoned during initialization - strand panicked while holding lock");
73 *guard = Some(HashMap::new());
74 });
75}
76
77pub fn channel_count() -> Option<usize> {
82 match CHANNEL_REGISTRY.try_lock() {
84 Ok(guard) => guard.as_ref().map(|registry| registry.len()),
85 Err(_) => None, }
87}
88
/// Point-in-time snapshot of one channel's counters, as produced by `channel_stats`.
#[derive(Debug, Clone)]
pub struct ChannelStats {
    /// Registry ID of the channel.
    pub id: u64,
    /// `send_count - receive_count`, clamped at zero.
    pub queue_depth: u64,
    /// Successful sends observed when the snapshot was taken.
    pub send_count: u64,
    /// Successful receives observed when the snapshot was taken.
    pub receive_count: u64,
}
101
102pub fn channel_stats() -> Option<Vec<ChannelStats>> {
111 match CHANNEL_REGISTRY.try_lock() {
113 Ok(guard) => {
114 guard.as_ref().map(|registry| {
115 registry
116 .iter()
117 .map(|(&id, pair)| {
118 let send_count = pair.stats.send_count.load(Ordering::Relaxed);
119 let receive_count = pair.stats.receive_count.load(Ordering::Relaxed);
120 let queue_depth = send_count.saturating_sub(receive_count);
122 ChannelStats {
123 id,
124 queue_depth,
125 send_count,
126 receive_count,
127 }
128 })
129 .collect()
130 })
131 }
132 Err(_) => None, }
134}
135
136#[unsafe(no_mangle)]
145pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
146 init_registry();
147
148 let (sender, receiver) = mpmc::channel();
153
154 let channel_id = NEXT_CHANNEL_ID.fetch_add(1, Ordering::Relaxed);
155
156 let mut guard = CHANNEL_REGISTRY.lock().expect(
158 "make_channel: channel registry lock poisoned - strand panicked while holding lock",
159 );
160
161 let registry = guard
162 .as_mut()
163 .expect("make_channel: channel registry not initialized - call init_registry first");
164
165 registry.insert(
166 channel_id,
167 ChannelPair {
168 sender,
169 receiver,
170 stats: Arc::new(ChannelStatsInner {
171 send_count: AtomicU64::new(0),
172 receive_count: AtomicU64::new(0),
173 }),
174 },
175 );
176
177 unsafe { push(stack, Value::Int(channel_id as i64)) }
179}
180
181#[unsafe(no_mangle)]
191pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
192 assert!(!stack.is_null(), "send: stack is empty");
193
194 let (stack, channel_id_value) = unsafe { pop(stack) };
196 let channel_id = match channel_id_value {
197 Value::Int(id) => {
198 if id < 0 {
199 panic!("send: channel ID must be positive, got {}", id);
200 }
201 id as u64
202 }
203 _ => panic!("send: expected channel ID (Int) on stack"),
204 };
205
206 assert!(!stack.is_null(), "send: stack has only one value");
207
208 let (rest, value) = unsafe { pop(stack) };
210
211 let guard = CHANNEL_REGISTRY
213 .lock()
214 .expect("send: channel registry lock poisoned - strand panicked while holding lock");
215
216 let registry = guard
217 .as_ref()
218 .expect("send: channel registry not initialized - call init_registry first");
219
220 let pair = match registry.get(&channel_id) {
221 Some(p) => p,
222 None => panic!("send: invalid channel ID {}", channel_id),
223 };
224
225 let sender = pair.sender.clone();
227 let stats = Arc::clone(&pair.stats);
228 drop(guard); let global_value = value.clone();
234
235 sender.send(global_value).expect("send: channel closed");
238
239 stats.send_count.fetch_add(1, Ordering::Relaxed);
241
242 rest
243}
244
245#[unsafe(no_mangle)]
261pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
262 assert!(!stack.is_null(), "receive: stack is empty");
263
264 let (rest, channel_id_value) = unsafe { pop(stack) };
266 let channel_id = match channel_id_value {
267 Value::Int(id) => {
268 if id < 0 {
269 panic!("receive: channel ID must be positive, got {}", id);
270 }
271 id as u64
272 }
273 _ => panic!("receive: expected channel ID (Int) on stack"),
274 };
275
276 let (receiver, stats) = {
279 let guard = CHANNEL_REGISTRY
280 .lock()
281 .expect("receive: channel registry lock poisoned - strand panicked while holding lock");
282
283 let registry = guard
284 .as_ref()
285 .expect("receive: channel registry not initialized - call init_registry first");
286
287 let pair = match registry.get(&channel_id) {
288 Some(p) => p,
289 None => panic!("receive: invalid channel ID {}", channel_id),
290 };
291
292 (pair.receiver.clone(), Arc::clone(&pair.stats))
293 }; let value = match receiver.recv() {
299 Ok(v) => v,
300 Err(_) => panic!("receive: channel closed"),
301 };
302
303 stats.receive_count.fetch_add(1, Ordering::Relaxed);
305
306 unsafe { push(rest, value) }
307}
308
309#[unsafe(no_mangle)]
318pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
319 assert!(!stack.is_null(), "close_channel: stack is empty");
320
321 let (rest, channel_id_value) = unsafe { pop(stack) };
323 let channel_id = match channel_id_value {
324 Value::Int(id) => {
325 if id < 0 {
326 panic!("close_channel: channel ID must be positive, got {}", id);
327 }
328 id as u64
329 }
330 _ => panic!("close_channel: expected channel ID (Int) on stack"),
331 };
332
333 let mut guard = CHANNEL_REGISTRY.lock().expect(
335 "close_channel: channel registry lock poisoned - strand panicked while holding lock",
336 );
337
338 let registry = guard
339 .as_mut()
340 .expect("close_channel: channel registry not initialized - call init_registry first");
341
342 registry.remove(&channel_id);
343
344 rest
345}
346
347#[unsafe(no_mangle)]
357pub unsafe extern "C" fn patch_seq_chan_send_safe(stack: Stack) -> Stack {
358 assert!(!stack.is_null(), "send-safe: stack is empty");
359
360 let (stack, channel_id_value) = unsafe { pop(stack) };
362 let channel_id = match channel_id_value {
363 Value::Int(id) => {
364 if id < 0 {
365 if !stack.is_null() {
367 let (rest, _value) = unsafe { pop(stack) };
368 return unsafe { push(rest, Value::Int(0)) };
369 }
370 return unsafe { push(stack, Value::Int(0)) };
371 }
372 id as u64
373 }
374 _ => panic!("send-safe: expected channel ID (Int) on stack"),
375 };
376
377 if stack.is_null() {
378 return unsafe { push(stack, Value::Int(0)) };
380 }
381
382 let (rest, value) = unsafe { pop(stack) };
384
385 let (sender, stats) = {
387 let guard = match CHANNEL_REGISTRY.lock() {
388 Ok(g) => g,
389 Err(_) => return unsafe { push(rest, Value::Int(0)) },
390 };
391
392 let registry = match guard.as_ref() {
393 Some(r) => r,
394 None => return unsafe { push(rest, Value::Int(0)) },
395 };
396
397 match registry.get(&channel_id) {
398 Some(p) => (p.sender.clone(), Arc::clone(&p.stats)),
399 None => return unsafe { push(rest, Value::Int(0)) },
400 }
401 };
402
403 let global_value = value.clone();
405
406 match sender.send(global_value) {
408 Ok(()) => {
409 stats.send_count.fetch_add(1, Ordering::Relaxed);
410 unsafe { push(rest, Value::Int(1)) }
411 }
412 Err(_) => unsafe { push(rest, Value::Int(0)) },
413 }
414}
415
416#[unsafe(no_mangle)]
431pub unsafe extern "C" fn patch_seq_chan_receive_safe(stack: Stack) -> Stack {
432 assert!(!stack.is_null(), "receive-safe: stack is empty");
433
434 let (rest, channel_id_value) = unsafe { pop(stack) };
436 let channel_id = match channel_id_value {
437 Value::Int(id) => {
438 if id < 0 {
439 let stack = unsafe { push(rest, Value::Int(0)) };
441 return unsafe { push(stack, Value::Int(0)) };
442 }
443 id as u64
444 }
445 _ => panic!("receive-safe: expected channel ID (Int) on stack"),
446 };
447
448 let (receiver, stats) = {
450 let guard = match CHANNEL_REGISTRY.lock() {
451 Ok(g) => g,
452 Err(_) => {
453 let stack = unsafe { push(rest, Value::Int(0)) };
454 return unsafe { push(stack, Value::Int(0)) };
455 }
456 };
457
458 let registry = match guard.as_ref() {
459 Some(r) => r,
460 None => {
461 let stack = unsafe { push(rest, Value::Int(0)) };
462 return unsafe { push(stack, Value::Int(0)) };
463 }
464 };
465
466 match registry.get(&channel_id) {
467 Some(p) => (p.receiver.clone(), Arc::clone(&p.stats)),
468 None => {
469 let stack = unsafe { push(rest, Value::Int(0)) };
470 return unsafe { push(stack, Value::Int(0)) };
471 }
472 }
473 };
474
475 match receiver.recv() {
477 Ok(value) => {
478 stats.receive_count.fetch_add(1, Ordering::Relaxed);
479 let stack = unsafe { push(rest, value) };
480 unsafe { push(stack, Value::Int(1)) }
481 }
482 Err(_) => {
483 let stack = unsafe { push(rest, Value::Int(0)) };
484 unsafe { push(stack, Value::Int(0)) }
485 }
486 }
487}
488
// Short aliases for the `patch_seq_*` entry points; the tests in this file
// call the operations through these names.
pub use patch_seq_chan_receive as receive;
pub use patch_seq_chan_receive_safe as receive_safe;
pub use patch_seq_chan_send as send;
pub use patch_seq_chan_send_safe as send_safe;
pub use patch_seq_close_channel as close_channel;
pub use patch_seq_make_channel as make_channel;
496
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::{spawn_strand, wait_all_strands};
    use std::sync::atomic::{AtomicI64, Ordering};

    /// Creates a channel on an empty stack and returns the ID value it pushed.
    fn open_channel() -> Value {
        unsafe {
            let (_, channel) = pop(make_channel(std::ptr::null_mut()));
            channel
        }
    }

    /// Extracts the integer carried by a `Value::Int`.
    fn expect_int(value: &Value) -> i64 {
        match value {
            Value::Int(n) => *n,
            _ => panic!("Expected Int"),
        }
    }

    /// One-element stack holding only `channel`: the input shape of
    /// `receive`, `receive_safe` and `close_channel`.
    fn channel_stack(channel: Value) -> Stack {
        unsafe { push(std::ptr::null_mut(), channel) }
    }

    /// Two-element stack with `payload` below `channel`: the input shape of
    /// `send` and `send_safe`.
    fn send_stack(payload: Value, channel: Value) -> Stack {
        unsafe {
            let with_payload = push(std::ptr::null_mut(), payload);
            push(with_payload, channel)
        }
    }

    /// Runs `receive_safe` on `channel` and unpacks its result as
    /// `(remaining stack, value, success flag)`.
    fn receive_safe_result(channel: Value) -> (Stack, Value, Value) {
        unsafe {
            let (below_flag, success) = pop(receive_safe(channel_stack(channel)));
            let (remaining, value) = pop(below_flag);
            (remaining, value, success)
        }
    }

    /// `make_channel` leaves exactly one `Int` on an empty stack.
    #[test]
    fn test_make_channel() {
        unsafe {
            let (remaining, id) = pop(make_channel(std::ptr::null_mut()));

            assert!(matches!(id, Value::Int(_)));
            assert!(remaining.is_null());
        }
    }

    /// A value sent on a channel comes back out of `receive` on the same thread.
    #[test]
    fn test_send_receive() {
        unsafe {
            let (empty_stack, channel) = pop(make_channel(std::ptr::null_mut()));
            assert!(empty_stack.is_null());

            let after_send = send(send_stack(Value::Int(42), channel.clone()));
            assert!(after_send.is_null());

            let after_receive = receive(push(after_send, channel));
            let (remaining, received) = pop(after_receive);
            assert_eq!(received, Value::Int(42));
            assert!(remaining.is_null());
        }
    }

    /// A strand blocked in `receive` gets the value sent from the test thread.
    #[test]
    fn test_channel_communication_between_strands() {
        // One slot, two roles: it first carries the channel ID to the strand,
        // then carries the received value back to the test.
        static HANDOFF: AtomicI64 = AtomicI64::new(0);

        extern "C" fn receiving_strand(_stack: Stack) -> Stack {
            unsafe {
                let channel_id = HANDOFF.load(Ordering::Acquire);
                let (_, value) = pop(receive(channel_stack(Value::Int(channel_id))));
                if let Value::Int(n) = value {
                    HANDOFF.store(n, Ordering::Release);
                }
                std::ptr::null_mut()
            }
        }

        unsafe {
            let channel_id = expect_int(&open_channel());
            HANDOFF.store(channel_id, Ordering::Release);

            spawn_strand(receiving_strand);

            // Give the strand a moment to reach `receive` before sending.
            std::thread::sleep(std::time::Duration::from_millis(10));

            let _ = send(send_stack(Value::Int(123), Value::Int(channel_id)));

            wait_all_strands();

            assert_eq!(HANDOFF.load(Ordering::Acquire), 123);
        }
    }

    /// Values are received in the order they were sent.
    #[test]
    fn test_multiple_sends_receives() {
        unsafe {
            let channel = open_channel();

            for i in 1..=5 {
                let _ = send(send_stack(Value::Int(i), channel.clone()));
            }

            for expected in 1..=5 {
                let (_, received) = pop(receive(channel_stack(channel.clone())));
                assert_eq!(received, Value::Int(expected));
            }
        }
    }

    /// `close_channel` consumes the channel ID and leaves the stack empty.
    #[test]
    fn test_close_channel() {
        unsafe {
            let (rest, channel) = pop(make_channel(std::ptr::null_mut()));

            let after_close = close_channel(push(rest, channel));
            assert!(after_close.is_null());
        }
    }

    /// An arena-allocated string sent by one strand arrives as a global
    /// string in another.
    #[test]
    fn test_arena_string_send_between_strands() {
        use std::sync::atomic::AtomicBool;

        static CHANNEL_ID: AtomicI64 = AtomicI64::new(0);
        static VERIFIED: AtomicBool = AtomicBool::new(false);

        extern "C" fn sender(_stack: Stack) -> Stack {
            use crate::seqstring::arena_string;

            unsafe {
                let chan_id = CHANNEL_ID.load(Ordering::Acquire);

                let msg = arena_string("Arena message!");
                assert!(!msg.is_global(), "Should be arena-allocated initially");

                send(send_stack(Value::String(msg), Value::Int(chan_id)))
            }
        }

        extern "C" fn receiver(_stack: Stack) -> Stack {
            unsafe {
                let chan_id = CHANNEL_ID.load(Ordering::Acquire);
                let (_, msg_val) = pop(receive(channel_stack(Value::Int(chan_id))));

                let Value::String(s) = msg_val else {
                    panic!("Expected String");
                };
                assert_eq!(s.as_str(), "Arena message!");
                assert!(s.is_global(), "Received string should be global");
                VERIFIED.store(true, Ordering::Release);

                std::ptr::null_mut()
            }
        }

        unsafe {
            let channel_id = expect_int(&open_channel());
            CHANNEL_ID.store(channel_id, Ordering::Release);

            spawn_strand(sender);
            spawn_strand(receiver);

            wait_all_strands();

            assert!(
                VERIFIED.load(Ordering::Acquire),
                "Receiver should have verified the message"
            );
        }
    }

    /// `send_safe` pushes 1 on success and the value is then receivable.
    #[test]
    fn test_send_safe_success() {
        unsafe {
            let channel = open_channel();

            let after_send = send_safe(send_stack(Value::Int(42), channel.clone()));
            let (remaining, result) = pop(after_send);
            assert_eq!(result, Value::Int(1));
            assert!(remaining.is_null());

            let (_, received) = pop(receive(channel_stack(channel)));
            assert_eq!(received, Value::Int(42));
        }
    }

    /// `send_safe` on an unregistered channel ID pushes 0 and consumes the value.
    #[test]
    fn test_send_safe_invalid_channel() {
        unsafe {
            let after_send = send_safe(send_stack(Value::Int(42), Value::Int(999999)));

            let (remaining, result) = pop(after_send);
            assert_eq!(result, Value::Int(0));
            assert!(remaining.is_null());
        }
    }

    /// `send_safe` with a negative channel ID pushes 0 and consumes the value.
    #[test]
    fn test_send_safe_negative_channel() {
        unsafe {
            let after_send = send_safe(send_stack(Value::Int(42), Value::Int(-1)));

            let (remaining, result) = pop(after_send);
            assert_eq!(result, Value::Int(0));
            assert!(remaining.is_null());
        }
    }

    /// `receive_safe` returns the sent value together with a success flag of 1.
    #[test]
    fn test_receive_safe_success() {
        unsafe {
            let channel = open_channel();
            let _ = send(send_stack(Value::Int(42), channel.clone()));

            let (remaining, value, success) = receive_safe_result(channel);
            assert_eq!(success, Value::Int(1));
            assert_eq!(value, Value::Int(42));
            assert!(remaining.is_null());
        }
    }

    /// `receive_safe` on an unregistered channel ID yields `(0, 0)`.
    #[test]
    fn test_receive_safe_invalid_channel() {
        let (remaining, value, success) = receive_safe_result(Value::Int(999999));
        assert_eq!(success, Value::Int(0));
        assert_eq!(value, Value::Int(0));
        assert!(remaining.is_null());
    }

    /// `receive_safe` on a channel that was closed yields `(0, 0)`.
    #[test]
    fn test_receive_safe_closed_channel() {
        unsafe {
            let channel = open_channel();
            let channel_id = expect_int(&channel);

            let _ = close_channel(channel_stack(channel));

            let (remaining, value, success) = receive_safe_result(Value::Int(channel_id));
            assert_eq!(success, Value::Int(0));
            assert_eq!(value, Value::Int(0));
            assert!(remaining.is_null());
        }
    }

    /// Polls `channel_stats` up to ten times, sleeping 1 ms after each miss.
    ///
    /// `channel_stats` uses `try_lock`, so it can miss while another test
    /// holds the registry lock.
    fn get_stats_with_retry() -> Option<Vec<super::ChannelStats>> {
        (0..10).find_map(|_| {
            let stats = super::channel_stats();
            if stats.is_none() {
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            stats
        })
    }

    /// Send/receive counters and the derived queue depth follow channel traffic.
    #[test]
    fn test_channel_stats() {
        /// Asserts the counters reported for channel `id` within `snapshot`.
        fn assert_counts(
            snapshot: &[ChannelStats],
            id: u64,
            sent: u64,
            received: u64,
            depth: u64,
        ) {
            let stat = snapshot.iter().find(|s| s.id == id).unwrap();
            assert_eq!(stat.send_count, sent);
            assert_eq!(stat.receive_count, received);
            assert_eq!(stat.queue_depth, depth);
        }

        unsafe {
            let channel = open_channel();
            let channel_id = expect_int(&channel) as u64;

            // If the registry stays contended, give up quietly after cleaning up.
            let Some(initial) = get_stats_with_retry() else {
                let _ = close_channel(channel_stack(channel));
                return;
            };
            assert!(
                initial.iter().any(|s| s.id == channel_id),
                "Our channel should be in stats"
            );
            assert_counts(&initial, channel_id, 0, 0, 0);

            for i in 1..=5 {
                let _ = send(send_stack(Value::Int(i), channel.clone()));
            }
            let after_sends = get_stats_with_retry().expect("Should get stats after retries");
            assert_counts(&after_sends, channel_id, 5, 0, 5);

            for _ in 0..3 {
                let _ = pop(receive(channel_stack(channel.clone())));
            }
            let after_receives = get_stats_with_retry().expect("Should get stats after retries");
            assert_counts(&after_receives, channel_id, 5, 3, 2);

            // Drain the remaining messages before closing.
            for _ in 0..2 {
                let _ = pop(receive(channel_stack(channel.clone())));
            }

            let _ = close_channel(channel_stack(channel));
        }
    }

    /// Several strands receiving from one channel share the messages between
    /// them, with each message delivered once.
    #[test]
    fn test_mpmc_concurrent_receivers() {
        const NUM_MESSAGES: i64 = 100;
        const NUM_RECEIVERS: usize = 4;

        static RECEIVER_COUNTS: [AtomicI64; NUM_RECEIVERS] = [
            AtomicI64::new(0),
            AtomicI64::new(0),
            AtomicI64::new(0),
            AtomicI64::new(0),
        ];
        static CHANNEL_ID: AtomicI64 = AtomicI64::new(0);

        /// Receives until a negative sentinel or a failed receive, counting
        /// every regular message under `RECEIVER_COUNTS[idx]`.
        fn receive_loop(idx: usize, _stack: Stack) -> Stack {
            unsafe {
                let chan_id = CHANNEL_ID.load(Ordering::SeqCst);
                loop {
                    let (_, value, success) = receive_safe_result(Value::Int(chan_id));

                    let (Value::Int(1), Value::Int(v)) = (success, value) else {
                        break;
                    };
                    if v < 0 {
                        break;
                    }
                    RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);

                    may::coroutine::yield_now();
                }
                std::ptr::null_mut()
            }
        }

        // Strand entry points cannot capture an index, so there is one per receiver.
        extern "C" fn receiver_0(stack: Stack) -> Stack {
            receive_loop(0, stack)
        }
        extern "C" fn receiver_1(stack: Stack) -> Stack {
            receive_loop(1, stack)
        }
        extern "C" fn receiver_2(stack: Stack) -> Stack {
            receive_loop(2, stack)
        }
        extern "C" fn receiver_3(stack: Stack) -> Stack {
            receive_loop(3, stack)
        }

        unsafe {
            for counter in RECEIVER_COUNTS.iter() {
                counter.store(0, Ordering::SeqCst);
            }

            let channel_id = expect_int(&open_channel());
            CHANNEL_ID.store(channel_id, Ordering::SeqCst);

            let entry_points: [extern "C" fn(Stack) -> Stack; NUM_RECEIVERS] =
                [receiver_0, receiver_1, receiver_2, receiver_3];
            for entry_point in entry_points.iter().copied() {
                spawn_strand(entry_point);
            }

            // Let the receivers start waiting before any message is sent.
            std::thread::sleep(std::time::Duration::from_millis(10));

            for i in 0..NUM_MESSAGES {
                let _ = send(send_stack(Value::Int(i), Value::Int(channel_id)));
            }

            // One negative sentinel per receiver tells each loop to stop.
            for _ in 0..NUM_RECEIVERS {
                let _ = send(send_stack(Value::Int(-1), Value::Int(channel_id)));
            }

            wait_all_strands();

            let total_received: i64 = RECEIVER_COUNTS
                .iter()
                .map(|c| c.load(Ordering::SeqCst))
                .sum();

            assert_eq!(
                total_received, NUM_MESSAGES,
                "Total received ({}) should equal messages sent ({})",
                total_received, NUM_MESSAGES
            );

            let active_receivers = RECEIVER_COUNTS
                .iter()
                .filter(|c| c.load(Ordering::SeqCst) > 0)
                .count();

            assert!(
                active_receivers >= 2,
                "Messages should be distributed across receivers, but only {} received any",
                active_receivers
            );

            let _ = close_channel(channel_stack(Value::Int(channel_id)));
        }
    }
}