1use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41#[unsafe(no_mangle)]
51pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
52 let (sender, receiver) = mpmc::channel();
56
57 let channel = Arc::new(ChannelData { sender, receiver });
59
60 unsafe { push(stack, Value::Channel(channel)) }
61}
62
63#[unsafe(no_mangle)]
73pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
74 assert!(!stack.is_null(), "close_channel: stack is empty");
75
76 let (rest, channel_value) = unsafe { pop(stack) };
78 match channel_value {
79 Value::Channel(_) => {} _ => panic!(
81 "close_channel: expected Channel on stack, got {:?}",
82 channel_value
83 ),
84 }
85
86 rest
87}
88
89#[unsafe(no_mangle)]
99pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
100 assert!(!stack.is_null(), "chan.send: stack is empty");
101
102 let (stack, channel_value) = unsafe { pop(stack) };
104 let channel = match channel_value {
105 Value::Channel(ch) => ch,
106 _ => {
107 if !stack.is_null() {
109 let (rest, _value) = unsafe { pop(stack) };
110 return unsafe { push(rest, Value::Bool(false)) };
111 }
112 return unsafe { push(stack, Value::Bool(false)) };
113 }
114 };
115
116 if stack.is_null() {
117 return unsafe { push(stack, Value::Bool(false)) };
119 }
120
121 let (rest, value) = unsafe { pop(stack) };
123
124 let global_value = value.clone();
126
127 match channel.sender.send(global_value) {
129 Ok(()) => unsafe { push(rest, Value::Bool(true)) },
130 Err(_) => unsafe { push(rest, Value::Bool(false)) },
131 }
132}
133
134#[unsafe(no_mangle)]
149pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
150 assert!(!stack.is_null(), "chan.receive: stack is empty");
151
152 let (rest, channel_value) = unsafe { pop(stack) };
154 let channel = match channel_value {
155 Value::Channel(ch) => ch,
156 _ => {
157 let stack = unsafe { push(rest, Value::Int(0)) };
159 return unsafe { push(stack, Value::Bool(false)) };
160 }
161 };
162
163 match channel.receiver.recv() {
165 Ok(value) => {
166 let stack = unsafe { push(rest, value) };
167 unsafe { push(stack, Value::Bool(true)) }
168 }
169 Err(_) => {
170 let stack = unsafe { push(rest, Value::Int(0)) };
171 unsafe { push(stack, Value::Bool(false)) }
172 }
173 }
174}
175
176pub use patch_seq_chan_receive as receive;
178pub use patch_seq_chan_send as send;
179pub use patch_seq_close_channel as close_channel;
180pub use patch_seq_make_channel as make_channel;
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185 use crate::scheduler::{spawn_strand, wait_all_strands};
186 use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
187
#[test]
fn test_make_channel() {
    // make_channel must leave a Channel value on top of the stack.
    let top = unsafe {
        let stack = make_channel(crate::stack::alloc_test_stack());
        let (_rest, value) = pop(stack);
        value
    };

    assert!(matches!(top, Value::Channel(_)));
}
199
#[test]
fn test_send_receive() {
    // Round trip: a value sent on a channel comes back out of the same channel.
    unsafe {
        let (_base, chan) = pop(make_channel(crate::stack::alloc_test_stack()));

        // Layout expected by send: payload below, channel on top.
        let outbound = push(crate::stack::alloc_test_stack(), Value::Int(42));
        let outbound = push(outbound, chan.clone());
        let (after_send, sent_flag) = pop(send(outbound));
        assert_eq!(sent_flag, Value::Bool(true));

        // receive leaves the payload with the success flag on top of it.
        let (after_recv, recv_flag) = pop(receive(push(after_send, chan)));
        let (_rest, payload) = pop(after_recv);
        assert_eq!(recv_flag, Value::Bool(true));
        assert_eq!(payload, Value::Int(42));
    }
}
230
#[test]
fn test_channel_dup_sharing() {
    // A cloned channel value must refer to the same underlying channel:
    // what is sent through one handle is received through the other.
    unsafe {
        let (_, original) = pop(make_channel(crate::stack::alloc_test_stack()));
        let duplicate = original.clone();

        let outbound = push(crate::stack::alloc_test_stack(), Value::Int(99));
        let (after_send, _sent_flag) = pop(send(push(outbound, original)));

        let (after_recv, _recv_flag) = pop(receive(push(after_send, duplicate)));
        let (_, payload) = pop(after_recv);
        assert_eq!(payload, Value::Int(99));
    }
}
259
#[test]
fn test_multiple_sends_receives() {
    // Values queued on a channel are received in the order they were sent.
    unsafe {
        let (_, chan) = pop(make_channel(crate::stack::alloc_test_stack()));

        for n in 1..=5 {
            let outbound = push(crate::stack::alloc_test_stack(), Value::Int(n));
            let outbound = push(outbound, chan.clone());
            let (_, sent_flag) = pop(send(outbound));
            assert_eq!(sent_flag, Value::Bool(true));
        }

        for expected in 1..=5 {
            let inbound = push(crate::stack::alloc_test_stack(), chan.clone());
            let (after_flag, recv_flag) = pop(receive(inbound));
            let (_, payload) = pop(after_flag);
            assert_eq!(recv_flag, Value::Bool(true));
            assert_eq!(payload, Value::Int(expected));
        }
    }
}
288
#[test]
fn test_close_channel() {
    // close_channel must accept a channel on top of the stack without panicking.
    unsafe {
        let with_channel = make_channel(crate::stack::alloc_test_stack());
        let _remaining = close_channel(with_channel);
    }
}
299
#[test]
fn test_arena_string_send_between_strands() {
    // One strand sends an arena-allocated string and another receives it; the
    // received string must have the same contents and report itself as global.
    unsafe {
        // Strand entry points are plain `extern "C"` functions and cannot
        // capture locals, so the channel is handed over through a static.
        static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
        static VERIFIED: AtomicBool = AtomicBool::new(false);

        let (_, channel_value) = pop(make_channel(crate::stack::alloc_test_stack()));

        let Value::Channel(shared) = &channel_value else {
            panic!("Expected Channel");
        };
        CHANNEL_PTR.store(Arc::as_ptr(shared) as i64, Ordering::Release);

        // Deliberately leak one extra handle (presumably one strong count) so
        // the raw pointer published above stays valid for the strands.
        std::mem::forget(channel_value.clone());

        extern "C" fn sender(_stack: Stack) -> Stack {
            use crate::seqstring::arena_string;
            use crate::value::ChannelData;

            unsafe {
                // Obtain an owned handle without consuming the published count.
                let raw = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                Arc::increment_strong_count(raw);
                let handle = Arc::from_raw(raw);

                let msg = arena_string("Arena message!");
                assert!(!msg.is_global(), "Should be arena-allocated initially");

                let outbound = push(crate::stack::alloc_test_stack(), Value::String(msg));
                let outbound = push(outbound, Value::Channel(handle));
                let (remaining, _sent_flag) = pop(send(outbound));
                remaining
            }
        }

        extern "C" fn receiver(_stack: Stack) -> Stack {
            use crate::value::ChannelData;

            unsafe {
                // Obtain an owned handle without consuming the published count.
                let raw = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                Arc::increment_strong_count(raw);
                let handle = Arc::from_raw(raw);

                let inbound = push(crate::stack::alloc_test_stack(), Value::Channel(handle));
                let (after_flag, _recv_flag) = pop(receive(inbound));
                let (_, msg_val) = pop(after_flag);

                let Value::String(text) = msg_val else {
                    panic!("Expected String");
                };
                assert_eq!(text.as_str(), "Arena message!");
                assert!(text.is_global(), "Received string should be global");
                VERIFIED.store(true, Ordering::Release);

                std::ptr::null_mut()
            }
        }

        spawn_strand(sender);
        spawn_strand(receiver);
        wait_all_strands();

        assert!(
            VERIFIED.load(Ordering::Acquire),
            "Receiver should have verified"
        );
    }
}
389
#[test]
fn test_send_success() {
    // A successful send reports Bool(true) and the value is then receivable.
    unsafe {
        let (_, chan) = pop(make_channel(crate::stack::alloc_test_stack()));

        let outbound = push(crate::stack::alloc_test_stack(), Value::Int(42));
        let outbound = push(outbound, chan.clone());
        let (after_send, sent_flag) = pop(send(outbound));
        assert_eq!(sent_flag, Value::Bool(true));

        // Drain the value again to confirm it really went through the channel.
        let (after_flag, recv_flag) = pop(receive(push(after_send, chan)));
        let (_, payload) = pop(after_flag);
        assert_eq!(recv_flag, Value::Bool(true));
        assert_eq!(payload, Value::Int(42));
    }
}
415
#[test]
fn test_send_wrong_type() {
    // Sending "through" a non-channel value must report failure, not panic.
    unsafe {
        let stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
        // Int(999) sits where the channel is expected.
        let stack = send(push(stack, Value::Int(999)));

        let (_rest, flag) = pop(stack);
        assert_eq!(flag, Value::Bool(false));
    }
}
429
#[test]
fn test_receive_success() {
    // After a value has been sent, receive yields it together with Bool(true).
    unsafe {
        let (_, chan) = pop(make_channel(crate::stack::alloc_test_stack()));

        // Prime the channel with a single value; the send flag is not checked here.
        let outbound = push(crate::stack::alloc_test_stack(), Value::Int(42));
        let outbound = push(outbound, chan.clone());
        let (_, _) = pop(send(outbound));

        let inbound = push(crate::stack::alloc_test_stack(), chan);
        let (after_flag, recv_flag) = pop(receive(inbound));
        let (_rest, payload) = pop(after_flag);
        assert_eq!(recv_flag, Value::Bool(true));
        assert_eq!(payload, Value::Int(42));
    }
}
454
#[test]
fn test_receive_wrong_type() {
    // Receiving from a non-channel value must yield the Int(0) placeholder and
    // Bool(false) rather than panicking.
    unsafe {
        let inbound = push(crate::stack::alloc_test_stack(), Value::Int(999));
        let (after_flag, recv_flag) = pop(receive(inbound));
        let (_rest, placeholder) = pop(after_flag);

        assert_eq!(recv_flag, Value::Bool(false));
        assert_eq!(placeholder, Value::Int(0));
    }
}
469
#[test]
fn test_mpmc_concurrent_receivers() {
    // Several receiver strands drain one channel concurrently. Every message
    // must be counted exactly once overall, and more than one receiver must
    // have taken part.
    unsafe {
        const NUM_MESSAGES: i64 = 100;
        const NUM_RECEIVERS: usize = 4;

        static RECEIVER_COUNTS: [AtomicI64; 4] = [
            AtomicI64::new(0),
            AtomicI64::new(0),
            AtomicI64::new(0),
            AtomicI64::new(0),
        ];
        // Strand entry points cannot capture locals, so the channel is
        // published through a static as a raw pointer.
        static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);

        RECEIVER_COUNTS
            .iter()
            .for_each(|slot| slot.store(0, Ordering::SeqCst));

        let (_, channel_value) = pop(make_channel(crate::stack::alloc_test_stack()));

        let Value::Channel(shared) = &channel_value else {
            panic!("Expected Channel");
        };
        CHANNEL_PTR.store(Arc::as_ptr(shared) as i64, Ordering::SeqCst);

        // Deliberately leak extra handles so the published raw pointer stays
        // valid for every strand and for the sending loop below.
        for _ in 0..=NUM_RECEIVERS {
            std::mem::forget(channel_value.clone());
        }

        // Produces a new owned handle to the published channel without
        // consuming the count the raw pointer relies on.
        fn channel_handle() -> Arc<ChannelData> {
            use crate::value::ChannelData;
            unsafe {
                let raw = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
                Arc::increment_strong_count(raw);
                Arc::from_raw(raw)
            }
        }

        // Maps a receiver index to its dedicated entry point.
        fn make_receiver(idx: usize) -> extern "C" fn(Stack) -> Stack {
            match idx {
                0 => receiver_0,
                1 => receiver_1,
                2 => receiver_2,
                3 => receiver_3,
                _ => panic!("Invalid receiver index"),
            }
        }

        extern "C" fn receiver_0(stack: Stack) -> Stack {
            receive_loop(0, stack)
        }
        extern "C" fn receiver_1(stack: Stack) -> Stack {
            receive_loop(1, stack)
        }
        extern "C" fn receiver_2(stack: Stack) -> Stack {
            receive_loop(2, stack)
        }
        extern "C" fn receiver_3(stack: Stack) -> Stack {
            receive_loop(3, stack)
        }

        // Counts non-negative messages for receiver `idx` until a negative
        // sentinel (or any unexpected result) arrives.
        fn receive_loop(idx: usize, _stack: Stack) -> Stack {
            unsafe {
                let handle = channel_handle();

                loop {
                    let inbound = push(
                        crate::stack::alloc_test_stack(),
                        Value::Channel(Arc::clone(&handle)),
                    );
                    let (after_flag, flag) = pop(receive(inbound));
                    let (_, payload) = pop(after_flag);

                    match (flag, payload) {
                        (Value::Bool(true), Value::Int(v)) if v >= 0 => {
                            RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
                        }
                        _ => break,
                    }
                    // Give the other receivers a chance to run.
                    may::coroutine::yield_now();
                }
                std::ptr::null_mut()
            }
        }

        for idx in 0..NUM_RECEIVERS {
            spawn_strand(make_receiver(idx));
        }

        // Short pause before sending, giving the receivers time to start.
        std::thread::sleep(std::time::Duration::from_millis(10));

        // Real messages first, then one negative sentinel per receiver so that
        // every receive loop terminates.
        let sentinels = std::iter::repeat(-1).take(NUM_RECEIVERS);
        for payload in (0..NUM_MESSAGES).chain(sentinels) {
            let outbound = push(crate::stack::alloc_test_stack(), Value::Int(payload));
            let outbound = push(outbound, Value::Channel(channel_handle()));
            let _ = send(outbound);
        }

        wait_all_strands();

        let tallies: Vec<i64> = RECEIVER_COUNTS
            .iter()
            .map(|slot| slot.load(Ordering::SeqCst))
            .collect();

        let total_received: i64 = tallies.iter().sum();
        assert_eq!(total_received, NUM_MESSAGES);

        let active_receivers = tallies.iter().filter(|&&count| count > 0).count();
        assert!(active_receivers >= 2, "Messages should be distributed");
    }
}
607}