1use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41#[cfg(feature = "diagnostics")]
42use std::sync::atomic::{AtomicU64, Ordering};
43
44#[cfg(feature = "diagnostics")]
45pub static TOTAL_MESSAGES_SENT: AtomicU64 = AtomicU64::new(0);
46#[cfg(feature = "diagnostics")]
47pub static TOTAL_MESSAGES_RECEIVED: AtomicU64 = AtomicU64::new(0);
48
49#[unsafe(no_mangle)]
59pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
60 let (sender, receiver) = mpmc::channel();
64
65 let channel = Arc::new(ChannelData { sender, receiver });
67
68 unsafe { push(stack, Value::Channel(channel)) }
69}
70
71#[unsafe(no_mangle)]
81pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
82 assert!(!stack.is_null(), "close_channel: stack is empty");
83
84 let (rest, channel_value) = unsafe { pop(stack) };
86 match channel_value {
87 Value::Channel(_) => {} _ => panic!(
89 "close_channel: expected Channel on stack, got {:?}",
90 channel_value
91 ),
92 }
93
94 rest
95}
96
97#[unsafe(no_mangle)]
107pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
108 assert!(!stack.is_null(), "chan.send: stack is empty");
109
110 let (stack, channel_value) = unsafe { pop(stack) };
112 let channel = match channel_value {
113 Value::Channel(ch) => ch,
114 _ => {
115 if !stack.is_null() {
117 let (rest, _value) = unsafe { pop(stack) };
118 return unsafe { push(rest, Value::Bool(false)) };
119 }
120 return unsafe { push(stack, Value::Bool(false)) };
121 }
122 };
123
124 if stack.is_null() {
125 return unsafe { push(stack, Value::Bool(false)) };
127 }
128
129 let (rest, value) = unsafe { pop(stack) };
131
132 let global_value = value.clone();
134
135 match channel.sender.send(global_value) {
137 Ok(()) => {
138 #[cfg(feature = "diagnostics")]
139 TOTAL_MESSAGES_SENT.fetch_add(1, Ordering::Relaxed);
140 unsafe { push(rest, Value::Bool(true)) }
141 }
142 Err(_) => unsafe { push(rest, Value::Bool(false)) },
143 }
144}
145
146#[unsafe(no_mangle)]
161pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
162 assert!(!stack.is_null(), "chan.receive: stack is empty");
163
164 let (rest, channel_value) = unsafe { pop(stack) };
166 let channel = match channel_value {
167 Value::Channel(ch) => ch,
168 _ => {
169 let stack = unsafe { push(rest, Value::Int(0)) };
171 return unsafe { push(stack, Value::Bool(false)) };
172 }
173 };
174
175 match channel.receiver.recv() {
177 Ok(value) => {
178 #[cfg(feature = "diagnostics")]
179 TOTAL_MESSAGES_RECEIVED.fetch_add(1, Ordering::Relaxed);
180 let stack = unsafe { push(rest, value) };
181 unsafe { push(stack, Value::Bool(true)) }
182 }
183 Err(_) => {
184 let stack = unsafe { push(rest, Value::Int(0)) };
185 unsafe { push(stack, Value::Bool(false)) }
186 }
187 }
188}
189
190pub use patch_seq_chan_receive as receive;
192pub use patch_seq_chan_send as send;
193pub use patch_seq_close_channel as close_channel;
194pub use patch_seq_make_channel as make_channel;
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use crate::scheduler::{spawn_strand, wait_all_strands};
200 use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
201
202 #[test]
203 fn test_make_channel() {
204 unsafe {
205 let stack = crate::stack::alloc_test_stack();
206 let stack = make_channel(stack);
207
208 let (_stack, value) = pop(stack);
210 assert!(matches!(value, Value::Channel(_)));
211 }
212 }
213
214 #[test]
215 fn test_send_receive() {
216 unsafe {
217 let mut stack = crate::stack::alloc_test_stack();
219 stack = make_channel(stack);
220
221 let (_empty_stack, channel_value) = pop(stack);
223
224 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
226 stack = push(stack, channel_value.clone());
227 stack = send(stack);
228
229 let (stack, send_success) = pop(stack);
231 assert_eq!(send_success, Value::Bool(true));
232
233 let mut stack = push(stack, channel_value);
235 stack = receive(stack);
236
237 let (stack, recv_success) = pop(stack);
239 let (_stack, received) = pop(stack);
240 assert_eq!(recv_success, Value::Bool(true));
241 assert_eq!(received, Value::Int(42));
242 }
243 }
244
245 #[test]
246 fn test_channel_dup_sharing() {
247 unsafe {
249 let mut stack = crate::stack::alloc_test_stack();
250 stack = make_channel(stack);
251
252 let (_, ch1) = pop(stack);
253 let ch2 = ch1.clone(); let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
257 stack = push(stack, ch1);
258 stack = send(stack);
259
260 let (stack, _) = pop(stack);
262
263 let mut stack = push(stack, ch2);
265 stack = receive(stack);
266
267 let (stack, _) = pop(stack);
269 let (_, received) = pop(stack);
270 assert_eq!(received, Value::Int(99));
271 }
272 }
273
274 #[test]
275 fn test_multiple_sends_receives() {
276 unsafe {
277 let mut stack = crate::stack::alloc_test_stack();
279 stack = make_channel(stack);
280 let (_, channel_value) = pop(stack);
281
282 for i in 1..=5 {
284 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
285 stack = push(stack, channel_value.clone());
286 stack = send(stack);
287 let (_, success) = pop(stack);
288 assert_eq!(success, Value::Bool(true));
289 }
290
291 for i in 1..=5 {
293 let mut stack = push(crate::stack::alloc_test_stack(), channel_value.clone());
294 stack = receive(stack);
295 let (stack, success) = pop(stack);
296 let (_, received) = pop(stack);
297 assert_eq!(success, Value::Bool(true));
298 assert_eq!(received, Value::Int(i));
299 }
300 }
301 }
302
303 #[test]
304 fn test_close_channel() {
305 unsafe {
306 let mut stack = crate::stack::alloc_test_stack();
308 stack = make_channel(stack);
309
310 let _stack = close_channel(stack);
311 }
312 }
313
314 #[test]
315 fn test_arena_string_send_between_strands() {
316 unsafe {
318 static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
319 static VERIFIED: AtomicBool = AtomicBool::new(false);
320
321 let mut stack = crate::stack::alloc_test_stack();
323 stack = make_channel(stack);
324 let (_, channel_value) = pop(stack);
325
326 let ch_ptr = match &channel_value {
328 Value::Channel(arc) => Arc::as_ptr(arc) as i64,
329 _ => panic!("Expected Channel"),
330 };
331 CHANNEL_PTR.store(ch_ptr, Ordering::Release);
332
333 std::mem::forget(channel_value.clone());
335
336 extern "C" fn sender(_stack: Stack) -> Stack {
338 use crate::seqstring::arena_string;
339 use crate::value::ChannelData;
340
341 unsafe {
342 let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
343 let channel = Arc::from_raw(ch_ptr);
344 let channel_clone = Arc::clone(&channel);
345 std::mem::forget(channel); let msg = arena_string("Arena message!");
349 assert!(!msg.is_global(), "Should be arena-allocated initially");
350
351 let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
353 let stack = push(stack, Value::Channel(channel_clone));
354 let stack = send(stack);
355 let (stack, _) = pop(stack);
357 stack
358 }
359 }
360
361 extern "C" fn receiver(_stack: Stack) -> Stack {
363 use crate::value::ChannelData;
364
365 unsafe {
366 let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
367 let channel = Arc::from_raw(ch_ptr);
368 let channel_clone = Arc::clone(&channel);
369 std::mem::forget(channel); let mut stack = push(
372 crate::stack::alloc_test_stack(),
373 Value::Channel(channel_clone),
374 );
375 stack = receive(stack);
376 let (stack, _) = pop(stack);
378 let (_, msg_val) = pop(stack);
379
380 match msg_val {
381 Value::String(s) => {
382 assert_eq!(s.as_str(), "Arena message!");
383 assert!(s.is_global(), "Received string should be global");
384 VERIFIED.store(true, Ordering::Release);
385 }
386 _ => panic!("Expected String"),
387 }
388
389 std::ptr::null_mut()
390 }
391 }
392
393 spawn_strand(sender);
394 spawn_strand(receiver);
395 wait_all_strands();
396
397 assert!(
398 VERIFIED.load(Ordering::Acquire),
399 "Receiver should have verified"
400 );
401 }
402 }
403
404 #[test]
405 fn test_send_success() {
406 unsafe {
407 let mut stack = crate::stack::alloc_test_stack();
408 stack = make_channel(stack);
409 let (_, channel_value) = pop(stack);
410
411 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
413 stack = push(stack, channel_value.clone());
414 stack = send(stack);
415
416 let (stack, result) = pop(stack);
418 assert_eq!(result, Value::Bool(true));
419
420 let mut stack = push(stack, channel_value);
422 stack = receive(stack);
423 let (stack, success) = pop(stack);
424 let (_, received) = pop(stack);
425 assert_eq!(success, Value::Bool(true));
426 assert_eq!(received, Value::Int(42));
427 }
428 }
429
430 #[test]
431 fn test_send_wrong_type() {
432 unsafe {
433 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
435 stack = push(stack, Value::Int(999)); stack = send(stack);
437
438 let (_stack, result) = pop(stack);
440 assert_eq!(result, Value::Bool(false));
441 }
442 }
443
444 #[test]
445 fn test_receive_success() {
446 unsafe {
447 let mut stack = crate::stack::alloc_test_stack();
448 stack = make_channel(stack);
449 let (_, channel_value) = pop(stack);
450
451 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
453 stack = push(stack, channel_value.clone());
454 stack = send(stack);
455 let (_, _) = pop(stack); let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
459 stack = receive(stack);
460
461 let (stack, success) = pop(stack);
463 let (_stack, value) = pop(stack);
464 assert_eq!(success, Value::Bool(true));
465 assert_eq!(value, Value::Int(42));
466 }
467 }
468
469 #[test]
470 fn test_receive_wrong_type() {
471 unsafe {
472 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(999));
474 stack = receive(stack);
475
476 let (stack, success) = pop(stack);
478 let (_stack, value) = pop(stack);
479 assert_eq!(success, Value::Bool(false));
480 assert_eq!(value, Value::Int(0));
481 }
482 }
483
484 #[test]
485 fn test_mpmc_concurrent_receivers() {
486 unsafe {
488 const NUM_MESSAGES: i64 = 100;
489 const NUM_RECEIVERS: usize = 4;
490
491 static RECEIVER_COUNTS: [AtomicI64; 4] = [
492 AtomicI64::new(0),
493 AtomicI64::new(0),
494 AtomicI64::new(0),
495 AtomicI64::new(0),
496 ];
497 static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
498
499 for counter in &RECEIVER_COUNTS {
501 counter.store(0, Ordering::SeqCst);
502 }
503
504 let mut stack = crate::stack::alloc_test_stack();
506 stack = make_channel(stack);
507 let (_, channel_value) = pop(stack);
508
509 let ch_ptr = match &channel_value {
510 Value::Channel(arc) => Arc::as_ptr(arc) as i64,
511 _ => panic!("Expected Channel"),
512 };
513 CHANNEL_PTR.store(ch_ptr, Ordering::SeqCst);
514
515 for _ in 0..(NUM_RECEIVERS + 1) {
517 std::mem::forget(channel_value.clone());
518 }
519
520 fn make_receiver(idx: usize) -> extern "C" fn(Stack) -> Stack {
521 match idx {
522 0 => receiver_0,
523 1 => receiver_1,
524 2 => receiver_2,
525 3 => receiver_3,
526 _ => panic!("Invalid receiver index"),
527 }
528 }
529
530 extern "C" fn receiver_0(stack: Stack) -> Stack {
531 receive_loop(0, stack)
532 }
533 extern "C" fn receiver_1(stack: Stack) -> Stack {
534 receive_loop(1, stack)
535 }
536 extern "C" fn receiver_2(stack: Stack) -> Stack {
537 receive_loop(2, stack)
538 }
539 extern "C" fn receiver_3(stack: Stack) -> Stack {
540 receive_loop(3, stack)
541 }
542
543 fn receive_loop(idx: usize, _stack: Stack) -> Stack {
544 use crate::value::ChannelData;
545 unsafe {
546 let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
547 let channel = Arc::from_raw(ch_ptr);
548 let channel_clone = Arc::clone(&channel);
549 std::mem::forget(channel);
550
551 loop {
552 let mut stack = push(
553 crate::stack::alloc_test_stack(),
554 Value::Channel(channel_clone.clone()),
555 );
556 stack = receive(stack);
557 let (stack, success) = pop(stack);
558 let (_, value) = pop(stack);
559
560 match (success, value) {
561 (Value::Bool(true), Value::Int(v)) => {
562 if v < 0 {
563 break; }
565 RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
566 }
567 _ => break,
568 }
569 may::coroutine::yield_now();
570 }
571 std::ptr::null_mut()
572 }
573 }
574
575 for i in 0..NUM_RECEIVERS {
577 spawn_strand(make_receiver(i));
578 }
579
580 std::thread::sleep(std::time::Duration::from_millis(10));
581
582 for i in 0..NUM_MESSAGES {
584 let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
585 let channel = Arc::from_raw(ch_ptr);
586 let channel_clone = Arc::clone(&channel);
587 std::mem::forget(channel);
588
589 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
590 stack = push(stack, Value::Channel(channel_clone));
591 let _ = send(stack);
592 }
593
594 for _ in 0..NUM_RECEIVERS {
596 let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
597 let channel = Arc::from_raw(ch_ptr);
598 let channel_clone = Arc::clone(&channel);
599 std::mem::forget(channel);
600
601 let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(-1));
602 stack = push(stack, Value::Channel(channel_clone));
603 let _ = send(stack);
604 }
605
606 wait_all_strands();
607
608 let total_received: i64 = RECEIVER_COUNTS
609 .iter()
610 .map(|c| c.load(Ordering::SeqCst))
611 .sum();
612 assert_eq!(total_received, NUM_MESSAGES);
613
614 let active_receivers = RECEIVER_COUNTS
615 .iter()
616 .filter(|c| c.load(Ordering::SeqCst) > 0)
617 .count();
618 assert!(active_receivers >= 2, "Messages should be distributed");
619 }
620 }
621}