seq_runtime/
channel.rs

1//! Channel operations for CSP-style concurrency
2//!
3//! Channels are the primary communication mechanism between strands.
4//! They use May's MPMC channels with cooperative blocking.
5//!
6//! ## Non-Blocking Guarantee
7//!
8//! All channel operations (`send`, `receive`) cooperatively block using May's scheduler.
9//! They NEVER block OS threads - May handles scheduling other strands while waiting.
10//!
11//! ## Multi-Consumer Support
12//!
13//! Channels support multiple producers AND multiple consumers (MPMC). Multiple strands
14//! can receive from the same channel concurrently - each message is delivered to exactly
15//! one receiver (work-stealing semantics).
16//!
17//! ## Error Handling
18//!
19//! Two variants are available for send/receive:
20//!
21//! - `send` / `receive` - Panic on errors (closed channel, invalid ID)
22//! - `send-safe` / `receive-safe` - Return success flag instead of panicking
23//!
24//! The safe variants enable graceful shutdown patterns:
25//! ```seq
26//! value channel-id send-safe if
27//!   # sent successfully
28//! else
29//!   # channel closed, handle gracefully
30//! then
31//! ```
32
33use crate::stack::{Stack, pop, push};
34use crate::value::Value;
35use may::sync::mpmc;
36use std::collections::HashMap;
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::{Arc, Mutex, Once};
39
/// Source of unique channel IDs.
///
/// Starts at 1; `patch_seq_make_channel` claims the next ID with an atomic
/// `fetch_add`, so IDs are handed out in increasing order.
static NEXT_CHANNEL_ID: AtomicU64 = AtomicU64::new(1);
42
/// Global channel registry.
///
/// Maps channel IDs to their sender/receiver pair (plus statistics).
/// Holds `None` until `init_registry` installs an empty map.
static CHANNEL_REGISTRY: Mutex<Option<HashMap<u64, ChannelPair>>> = Mutex::new(None);
46
/// One-time guard used by `init_registry` so the registry map is installed
/// exactly once, no matter how many times `init_registry` is called.
static REGISTRY_INIT: Once = Once::new();
49
/// Internal per-channel counters.
///
/// Stored behind an `Arc` in `ChannelPair` so that send/receive can bump the
/// counters with atomic operations after the registry lock has been released.
#[derive(Debug)]
struct ChannelStatsInner {
    /// Lifetime count of messages sent (only ever incremented)
    send_count: AtomicU64,
    /// Lifetime count of messages received (only ever incremented)
    receive_count: AtomicU64,
}
58
/// Registry entry for one channel: both endpoints plus its statistics.
///
/// The send/receive operations clone the endpoint they need (and the `Arc`
/// around the stats) out of the registry, then release the registry lock
/// before touching the channel itself.
struct ChannelPair {
    /// Sending half; cloned by the send operations
    sender: mpmc::Sender<Value>,
    /// Receiving half; cloned by the receive operations
    receiver: mpmc::Receiver<Value>,
    /// Shared send/receive counters for diagnostics
    stats: Arc<ChannelStatsInner>,
}
67
68/// Initialize the channel registry (lock-free after first call)
69fn init_registry() {
70    REGISTRY_INIT.call_once(|| {
71            let mut guard = CHANNEL_REGISTRY.lock()
72                .expect("init_registry: channel registry lock poisoned during initialization - strand panicked while holding lock");
73        *guard = Some(HashMap::new());
74    });
75}
76
77/// Get the number of open channels (for diagnostics)
78///
79/// Returns None if the registry lock is held (to avoid blocking in signal handler).
80/// This is a best-effort diagnostic - the count may be slightly stale.
81pub fn channel_count() -> Option<usize> {
82    // Use try_lock to avoid blocking in signal handler context
83    match CHANNEL_REGISTRY.try_lock() {
84        Ok(guard) => guard.as_ref().map(|registry| registry.len()),
85        Err(_) => None, // Lock held, return None rather than block
86    }
87}
88
/// Point-in-time statistics for one open channel, as reported by
/// `channel_stats` (diagnostics only).
#[derive(Debug, Clone)]
pub struct ChannelStats {
    /// Channel ID (the registry key)
    pub id: u64,
    /// Approximate queue depth: `send_count - receive_count`, saturating at 0
    pub queue_depth: u64,
    /// Lifetime count of messages sent
    pub send_count: u64,
    /// Lifetime count of messages received
    pub receive_count: u64,
}
101
102/// Get per-channel statistics for all open channels (for diagnostics)
103///
104/// Returns None if the registry lock is held (to avoid blocking in signal handler).
105/// Returns an empty Vec if no channels are open.
106///
107/// Queue depth is computed as send_count - receive_count. Due to the lock-free
108/// nature of the counters, there may be brief inconsistencies (e.g., depth < 0
109/// is clamped to 0), but this is acceptable for monitoring purposes.
110pub fn channel_stats() -> Option<Vec<ChannelStats>> {
111    // Use try_lock to avoid blocking in signal handler context
112    match CHANNEL_REGISTRY.try_lock() {
113        Ok(guard) => {
114            guard.as_ref().map(|registry| {
115                registry
116                    .iter()
117                    .map(|(&id, pair)| {
118                        let send_count = pair.stats.send_count.load(Ordering::Relaxed);
119                        let receive_count = pair.stats.receive_count.load(Ordering::Relaxed);
120                        // Queue depth = sends - receives, clamped to 0
121                        let queue_depth = send_count.saturating_sub(receive_count);
122                        ChannelStats {
123                            id,
124                            queue_depth,
125                            send_count,
126                            receive_count,
127                        }
128                    })
129                    .collect()
130            })
131        }
132        Err(_) => None, // Lock held, return None rather than block
133    }
134}
135
136/// Create a new channel
137///
138/// Stack effect: ( -- channel_id )
139///
140/// Returns a channel ID that can be used with send/receive operations.
141///
142/// # Safety
143/// Always safe to call
144#[unsafe(no_mangle)]
145pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
146    init_registry();
147
148    // Create an unbounded MPMC channel
149    // May's mpmc::channel() creates coroutine-aware channels with multi-producer, multi-consumer
150    // The recv() operation cooperatively blocks (yields) instead of blocking the OS thread
151    // Both sender and receiver are Clone - no mutex needed for sharing
152    let (sender, receiver) = mpmc::channel();
153
154    let channel_id = NEXT_CHANNEL_ID.fetch_add(1, Ordering::Relaxed);
155
156    // Store in registry
157    let mut guard = CHANNEL_REGISTRY.lock().expect(
158        "make_channel: channel registry lock poisoned - strand panicked while holding lock",
159    );
160
161    let registry = guard
162        .as_mut()
163        .expect("make_channel: channel registry not initialized - call init_registry first");
164
165    registry.insert(
166        channel_id,
167        ChannelPair {
168            sender,
169            receiver,
170            stats: Arc::new(ChannelStatsInner {
171                send_count: AtomicU64::new(0),
172                receive_count: AtomicU64::new(0),
173            }),
174        },
175    );
176
177    // Push channel ID onto stack
178    unsafe { push(stack, Value::Int(channel_id as i64)) }
179}
180
181/// Send a value through a channel
182///
183/// Stack effect: ( value channel_id -- )
184///
185/// Blocks the strand if the channel is full until space becomes available.
186/// This is cooperative blocking - the strand yields and May handles scheduling.
187///
188/// # Safety
189/// Stack must have a channel ID (Int) on top and a value below it
190#[unsafe(no_mangle)]
191pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
192    assert!(!stack.is_null(), "send: stack is empty");
193
194    // Pop channel ID
195    let (stack, channel_id_value) = unsafe { pop(stack) };
196    let channel_id = match channel_id_value {
197        Value::Int(id) => {
198            if id < 0 {
199                panic!("send: channel ID must be positive, got {}", id);
200            }
201            id as u64
202        }
203        _ => panic!("send: expected channel ID (Int) on stack"),
204    };
205
206    assert!(!stack.is_null(), "send: stack has only one value");
207
208    // Pop value to send
209    let (rest, value) = unsafe { pop(stack) };
210
211    // Get sender from registry
212    let guard = CHANNEL_REGISTRY
213        .lock()
214        .expect("send: channel registry lock poisoned - strand panicked while holding lock");
215
216    let registry = guard
217        .as_ref()
218        .expect("send: channel registry not initialized - call init_registry first");
219
220    let pair = match registry.get(&channel_id) {
221        Some(p) => p,
222        None => panic!("send: invalid channel ID {}", channel_id),
223    };
224
225    // Clone the sender and stats so we can use them outside the lock
226    let sender = pair.sender.clone();
227    let stats = Arc::clone(&pair.stats);
228    drop(guard); // Release lock before potentially blocking
229
230    // Clone the value before sending to ensure arena strings are promoted to global
231    // CemString::clone() allocates from global heap (see cemstring.rs:75-78)
232    // This prevents use-after-free when sender's arena is reset before receiver accesses the string
233    let global_value = value.clone();
234
235    // Send the value (may block if channel is full)
236    // May's scheduler will handle the blocking cooperatively
237    sender.send(global_value).expect("send: channel closed");
238
239    // Update stats after successful send
240    stats.send_count.fetch_add(1, Ordering::Relaxed);
241
242    rest
243}
244
245/// Receive a value from a channel
246///
247/// Stack effect: ( channel_id -- value )
248///
249/// Blocks the strand until a value is available.
250/// This is cooperative blocking - the strand yields and May handles scheduling.
251///
252/// ## Multi-Consumer Support
253///
254/// Multiple strands can receive from the same channel concurrently (MPMC).
255/// Each message is delivered to exactly one receiver (work-stealing semantics).
256/// No serialization - strands compete fairly for messages.
257///
258/// # Safety
259/// Stack must have a channel ID (Int) on top
260#[unsafe(no_mangle)]
261pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
262    assert!(!stack.is_null(), "receive: stack is empty");
263
264    // Pop channel ID
265    let (rest, channel_id_value) = unsafe { pop(stack) };
266    let channel_id = match channel_id_value {
267        Value::Int(id) => {
268            if id < 0 {
269                panic!("receive: channel ID must be positive, got {}", id);
270            }
271            id as u64
272        }
273        _ => panic!("receive: expected channel ID (Int) on stack"),
274    };
275
276    // Clone receiver and stats from registry (don't hold lock during recv!)
277    // MPMC receiver is Clone - no mutex needed
278    let (receiver, stats) = {
279        let guard = CHANNEL_REGISTRY
280            .lock()
281            .expect("receive: channel registry lock poisoned - strand panicked while holding lock");
282
283        let registry = guard
284            .as_ref()
285            .expect("receive: channel registry not initialized - call init_registry first");
286
287        let pair = match registry.get(&channel_id) {
288            Some(p) => p,
289            None => panic!("receive: invalid channel ID {}", channel_id),
290        };
291
292        (pair.receiver.clone(), Arc::clone(&pair.stats))
293    }; // Registry lock released here!
294
295    // Receive a value (cooperatively blocks the strand until available)
296    // May's recv() yields to the scheduler, not blocking the OS thread
297    // Multiple strands can wait concurrently - MPMC handles synchronization
298    let value = match receiver.recv() {
299        Ok(v) => v,
300        Err(_) => panic!("receive: channel closed"),
301    };
302
303    // Update stats after successful receive
304    stats.receive_count.fetch_add(1, Ordering::Relaxed);
305
306    unsafe { push(rest, value) }
307}
308
309/// Close a channel and remove it from the registry
310///
311/// Stack effect: ( channel_id -- )
312///
313/// After closing, send/receive operations on this channel will fail.
314///
315/// # Safety
316/// Stack must have a channel ID (Int) on top
317#[unsafe(no_mangle)]
318pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
319    assert!(!stack.is_null(), "close_channel: stack is empty");
320
321    // Pop channel ID
322    let (rest, channel_id_value) = unsafe { pop(stack) };
323    let channel_id = match channel_id_value {
324        Value::Int(id) => {
325            if id < 0 {
326                panic!("close_channel: channel ID must be positive, got {}", id);
327            }
328            id as u64
329        }
330        _ => panic!("close_channel: expected channel ID (Int) on stack"),
331    };
332
333    // Remove from registry
334    let mut guard = CHANNEL_REGISTRY.lock().expect(
335        "close_channel: channel registry lock poisoned - strand panicked while holding lock",
336    );
337
338    let registry = guard
339        .as_mut()
340        .expect("close_channel: channel registry not initialized - call init_registry first");
341
342    registry.remove(&channel_id);
343
344    rest
345}
346
347/// Send a value through a channel, with error handling
348///
349/// Stack effect: ( value channel_id -- success_flag )
350///
351/// Returns 1 on success, 0 on failure (closed channel or invalid ID).
352/// Does not panic on errors - returns 0 instead.
353///
354/// # Safety
355/// Stack must have a channel ID (Int) on top and a value below it
356#[unsafe(no_mangle)]
357pub unsafe extern "C" fn patch_seq_chan_send_safe(stack: Stack) -> Stack {
358    assert!(!stack.is_null(), "send-safe: stack is empty");
359
360    // Pop channel ID
361    let (stack, channel_id_value) = unsafe { pop(stack) };
362    let channel_id = match channel_id_value {
363        Value::Int(id) => {
364            if id < 0 {
365                // Invalid channel ID - consume value and return failure
366                if !stack.is_null() {
367                    let (rest, _value) = unsafe { pop(stack) };
368                    return unsafe { push(rest, Value::Int(0)) };
369                }
370                return unsafe { push(stack, Value::Int(0)) };
371            }
372            id as u64
373        }
374        _ => panic!("send-safe: expected channel ID (Int) on stack"),
375    };
376
377    if stack.is_null() {
378        // No value to send - return failure
379        return unsafe { push(stack, Value::Int(0)) };
380    }
381
382    // Pop value to send
383    let (rest, value) = unsafe { pop(stack) };
384
385    // Get sender and stats from registry
386    let (sender, stats) = {
387        let guard = match CHANNEL_REGISTRY.lock() {
388            Ok(g) => g,
389            Err(_) => return unsafe { push(rest, Value::Int(0)) },
390        };
391
392        let registry = match guard.as_ref() {
393            Some(r) => r,
394            None => return unsafe { push(rest, Value::Int(0)) },
395        };
396
397        match registry.get(&channel_id) {
398            Some(p) => (p.sender.clone(), Arc::clone(&p.stats)),
399            None => return unsafe { push(rest, Value::Int(0)) },
400        }
401    };
402
403    // Clone the value before sending to ensure arena strings are promoted to global
404    let global_value = value.clone();
405
406    // Send the value
407    match sender.send(global_value) {
408        Ok(()) => {
409            stats.send_count.fetch_add(1, Ordering::Relaxed);
410            unsafe { push(rest, Value::Int(1)) }
411        }
412        Err(_) => unsafe { push(rest, Value::Int(0)) },
413    }
414}
415
416/// Receive a value from a channel, with error handling
417///
418/// Stack effect: ( channel_id -- value success_flag )
419///
420/// Returns (value, 1) on success, (0, 0) on failure (closed channel or invalid ID).
421/// Does not panic on errors - returns (0, 0) instead.
422///
423/// ## Multi-Consumer Support
424///
425/// Multiple strands can receive from the same channel concurrently (MPMC).
426/// Each message is delivered to exactly one receiver (work-stealing semantics).
427///
428/// # Safety
429/// Stack must have a channel ID (Int) on top
430#[unsafe(no_mangle)]
431pub unsafe extern "C" fn patch_seq_chan_receive_safe(stack: Stack) -> Stack {
432    assert!(!stack.is_null(), "receive-safe: stack is empty");
433
434    // Pop channel ID
435    let (rest, channel_id_value) = unsafe { pop(stack) };
436    let channel_id = match channel_id_value {
437        Value::Int(id) => {
438            if id < 0 {
439                // Invalid channel ID - return failure
440                let stack = unsafe { push(rest, Value::Int(0)) };
441                return unsafe { push(stack, Value::Int(0)) };
442            }
443            id as u64
444        }
445        _ => panic!("receive-safe: expected channel ID (Int) on stack"),
446    };
447
448    // Clone receiver and stats from registry (MPMC receiver is Clone)
449    let (receiver, stats) = {
450        let guard = match CHANNEL_REGISTRY.lock() {
451            Ok(g) => g,
452            Err(_) => {
453                let stack = unsafe { push(rest, Value::Int(0)) };
454                return unsafe { push(stack, Value::Int(0)) };
455            }
456        };
457
458        let registry = match guard.as_ref() {
459            Some(r) => r,
460            None => {
461                let stack = unsafe { push(rest, Value::Int(0)) };
462                return unsafe { push(stack, Value::Int(0)) };
463            }
464        };
465
466        match registry.get(&channel_id) {
467            Some(p) => (p.receiver.clone(), Arc::clone(&p.stats)),
468            None => {
469                let stack = unsafe { push(rest, Value::Int(0)) };
470                return unsafe { push(stack, Value::Int(0)) };
471            }
472        }
473    };
474
475    // Receive a value - MPMC handles concurrent receivers
476    match receiver.recv() {
477        Ok(value) => {
478            stats.receive_count.fetch_add(1, Ordering::Relaxed);
479            let stack = unsafe { push(rest, value) };
480            unsafe { push(stack, Value::Int(1)) }
481        }
482        Err(_) => {
483            let stack = unsafe { push(rest, Value::Int(0)) };
484            unsafe { push(stack, Value::Int(0)) }
485        }
486    }
487}
488
489// Public re-exports with short names for internal use
490pub use patch_seq_chan_receive as receive;
491pub use patch_seq_chan_receive_safe as receive_safe;
492pub use patch_seq_chan_send as send;
493pub use patch_seq_chan_send_safe as send_safe;
494pub use patch_seq_close_channel as close_channel;
495pub use patch_seq_make_channel as make_channel;
496
497#[cfg(test)]
498mod tests {
499    use super::*;
500    use crate::scheduler::{spawn_strand, wait_all_strands};
501    use std::sync::atomic::{AtomicI64, Ordering};
502
503    #[test]
504    fn test_make_channel() {
505        unsafe {
506            let stack = std::ptr::null_mut();
507            let stack = make_channel(stack);
508
509            // Should have channel ID on stack
510            let (stack, value) = pop(stack);
511            assert!(matches!(value, Value::Int(_)));
512            assert!(stack.is_null());
513        }
514    }
515
516    #[test]
517    fn test_send_receive() {
518        unsafe {
519            // Create a channel
520            let mut stack = std::ptr::null_mut();
521            stack = make_channel(stack);
522
523            // Get channel ID
524            let (empty_stack, channel_id_value) = pop(stack);
525            assert!(empty_stack.is_null());
526
527            // Push value to send
528            let mut stack = push(std::ptr::null_mut(), Value::Int(42));
529            stack = push(stack, channel_id_value.clone());
530            stack = send(stack);
531            assert!(stack.is_null());
532
533            // Receive value
534            stack = push(stack, channel_id_value);
535            stack = receive(stack);
536
537            // Should have received value
538            let (stack, received) = pop(stack);
539            assert_eq!(received, Value::Int(42));
540            assert!(stack.is_null());
541        }
542    }
543
544    #[test]
545    fn test_channel_communication_between_strands() {
546        unsafe {
547            static RECEIVED_VALUE: AtomicI64 = AtomicI64::new(0);
548
549            // Create a channel
550            let mut stack = std::ptr::null_mut();
551            stack = make_channel(stack);
552            let (_, channel_id_value) = pop(stack);
553            let channel_id = match channel_id_value {
554                Value::Int(id) => id,
555                _ => panic!("Expected Int"),
556            };
557
558            // Receiver strand
559            extern "C" fn receiver(_stack: Stack) -> Stack {
560                unsafe {
561                    let channel_id = RECEIVED_VALUE.load(Ordering::Acquire); // Temporary storage
562                    let mut stack = push(std::ptr::null_mut(), Value::Int(channel_id));
563                    stack = receive(stack);
564                    let (_, value) = pop(stack);
565                    if let Value::Int(n) = value {
566                        RECEIVED_VALUE.store(n, Ordering::Release);
567                    }
568                    std::ptr::null_mut()
569                }
570            }
571
572            // Store channel ID temporarily
573            RECEIVED_VALUE.store(channel_id, Ordering::Release);
574
575            // Spawn receiver strand
576            spawn_strand(receiver);
577
578            // Give receiver time to start
579            std::thread::sleep(std::time::Duration::from_millis(10));
580
581            // Send value from main strand
582            let mut stack = push(std::ptr::null_mut(), Value::Int(123));
583            stack = push(stack, Value::Int(channel_id));
584            let _ = send(stack);
585
586            // Wait for all strands
587            wait_all_strands();
588
589            // Check received value
590            assert_eq!(RECEIVED_VALUE.load(Ordering::Acquire), 123);
591        }
592    }
593
594    #[test]
595    fn test_multiple_sends_receives() {
596        unsafe {
597            // Create a channel
598            let mut stack = std::ptr::null_mut();
599            stack = make_channel(stack);
600            let (_, channel_id_value) = pop(stack);
601
602            // Send multiple values
603            for i in 1..=5 {
604                let mut stack = push(std::ptr::null_mut(), Value::Int(i));
605                stack = push(stack, channel_id_value.clone());
606                let _ = send(stack);
607            }
608
609            // Receive them back in order
610            for i in 1..=5 {
611                let mut stack = push(std::ptr::null_mut(), channel_id_value.clone());
612                stack = receive(stack);
613                let (_, received) = pop(stack);
614                assert_eq!(received, Value::Int(i));
615            }
616        }
617    }
618
619    #[test]
620    fn test_close_channel() {
621        unsafe {
622            // Create and close a channel
623            let mut stack = std::ptr::null_mut();
624            stack = make_channel(stack);
625            let (rest, channel_id) = pop(stack);
626
627            stack = push(rest, channel_id);
628            stack = close_channel(stack);
629            assert!(stack.is_null());
630        }
631    }
632
633    #[test]
634    fn test_arena_string_send_between_strands() {
635        // This test verifies that arena-allocated strings are properly cloned
636        // to global storage when sent through channels (fix for issue #13)
637        unsafe {
638            use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
639
640            static CHANNEL_ID: AtomicI64 = AtomicI64::new(0);
641            static VERIFIED: AtomicBool = AtomicBool::new(false);
642
643            // Create a channel
644            let mut stack = std::ptr::null_mut();
645            stack = make_channel(stack);
646            let (_, channel_id_value) = pop(stack);
647            let channel_id = match channel_id_value {
648                Value::Int(id) => id,
649                _ => panic!("Expected Int"),
650            };
651
652            // Store channel ID for strands
653            CHANNEL_ID.store(channel_id, Ordering::Release);
654
655            // Sender strand: creates arena string and sends it
656            extern "C" fn sender(_stack: Stack) -> Stack {
657                use crate::seqstring::arena_string;
658                use crate::stack::push;
659                use crate::value::Value;
660                use std::sync::atomic::Ordering;
661
662                unsafe {
663                    let chan_id = CHANNEL_ID.load(Ordering::Acquire);
664
665                    // Create arena string (fast path)
666                    let msg = arena_string("Arena message!");
667                    assert!(!msg.is_global(), "Should be arena-allocated initially");
668
669                    // Send through channel (will be cloned to global)
670                    let stack = push(std::ptr::null_mut(), Value::String(msg));
671                    let stack = push(stack, Value::Int(chan_id));
672                    send(stack)
673                }
674            }
675
676            // Receiver strand: receives string and verifies it
677            extern "C" fn receiver(_stack: Stack) -> Stack {
678                use crate::stack::{pop, push};
679                use crate::value::Value;
680                use std::sync::atomic::Ordering;
681
682                unsafe {
683                    let chan_id = CHANNEL_ID.load(Ordering::Acquire);
684
685                    let mut stack = push(std::ptr::null_mut(), Value::Int(chan_id));
686                    stack = receive(stack);
687                    let (_, msg_val) = pop(stack);
688
689                    match msg_val {
690                        Value::String(s) => {
691                            assert_eq!(s.as_str(), "Arena message!");
692                            // Verify it was cloned to global
693                            assert!(s.is_global(), "Received string should be global");
694                            VERIFIED.store(true, Ordering::Release);
695                        }
696                        _ => panic!("Expected String"),
697                    }
698
699                    std::ptr::null_mut()
700                }
701            }
702
703            // Spawn both strands
704            spawn_strand(sender);
705            spawn_strand(receiver);
706
707            // Wait for both strands
708            wait_all_strands();
709
710            // Verify message was received correctly
711            assert!(
712                VERIFIED.load(Ordering::Acquire),
713                "Receiver should have verified the message"
714            );
715        }
716    }
717
    // Note: Cannot test negative channel ID panics with #[should_panic] because
    // these are extern "C" functions which cannot unwind. The validation is still
    // in place at runtime - see the `id < 0` checks in `patch_seq_chan_send`,
    // `patch_seq_chan_receive`, and `patch_seq_close_channel`.
721
722    #[test]
723    fn test_send_safe_success() {
724        unsafe {
725            // Create a channel
726            let mut stack = std::ptr::null_mut();
727            stack = make_channel(stack);
728            let (_, channel_id_value) = pop(stack);
729
730            // Send value using send-safe
731            let mut stack = push(std::ptr::null_mut(), Value::Int(42));
732            stack = push(stack, channel_id_value.clone());
733            stack = send_safe(stack);
734
735            // Should return success (1)
736            let (stack, result) = pop(stack);
737            assert_eq!(result, Value::Int(1));
738            assert!(stack.is_null());
739
740            // Receive value to verify it was sent
741            let mut stack = push(std::ptr::null_mut(), channel_id_value);
742            stack = receive(stack);
743            let (_, received) = pop(stack);
744            assert_eq!(received, Value::Int(42));
745        }
746    }
747
748    #[test]
749    fn test_send_safe_invalid_channel() {
750        unsafe {
751            // Try to send to invalid channel ID
752            let mut stack = push(std::ptr::null_mut(), Value::Int(42));
753            stack = push(stack, Value::Int(999999)); // Non-existent channel
754            stack = send_safe(stack);
755
756            // Should return failure (0)
757            let (stack, result) = pop(stack);
758            assert_eq!(result, Value::Int(0));
759            assert!(stack.is_null());
760        }
761    }
762
763    #[test]
764    fn test_send_safe_negative_channel() {
765        unsafe {
766            // Try to send to negative channel ID
767            let mut stack = push(std::ptr::null_mut(), Value::Int(42));
768            stack = push(stack, Value::Int(-1));
769            stack = send_safe(stack);
770
771            // Should return failure (0), value consumed per stack effect
772            let (stack, result) = pop(stack);
773            assert_eq!(result, Value::Int(0));
774            assert!(stack.is_null()); // Value was properly consumed
775        }
776    }
777
778    #[test]
779    fn test_receive_safe_success() {
780        unsafe {
781            // Create a channel and send a value
782            let mut stack = std::ptr::null_mut();
783            stack = make_channel(stack);
784            let (_, channel_id_value) = pop(stack);
785
786            // Send value
787            let mut stack = push(std::ptr::null_mut(), Value::Int(42));
788            stack = push(stack, channel_id_value.clone());
789            let _ = send(stack);
790
791            // Receive using receive-safe
792            let mut stack = push(std::ptr::null_mut(), channel_id_value);
793            stack = receive_safe(stack);
794
795            // Should return (value, 1)
796            let (stack, success) = pop(stack);
797            let (stack, value) = pop(stack);
798            assert_eq!(success, Value::Int(1));
799            assert_eq!(value, Value::Int(42));
800            assert!(stack.is_null());
801        }
802    }
803
804    #[test]
805    fn test_receive_safe_invalid_channel() {
806        unsafe {
807            // Try to receive from invalid channel ID
808            let mut stack = push(std::ptr::null_mut(), Value::Int(999999));
809            stack = receive_safe(stack);
810
811            // Should return (0, 0)
812            let (stack, success) = pop(stack);
813            let (stack, value) = pop(stack);
814            assert_eq!(success, Value::Int(0));
815            assert_eq!(value, Value::Int(0));
816            assert!(stack.is_null());
817        }
818    }
819
820    #[test]
821    fn test_receive_safe_closed_channel() {
822        unsafe {
823            // Create a channel
824            let mut stack = std::ptr::null_mut();
825            stack = make_channel(stack);
826            let (_, channel_id_value) = pop(stack);
827            let channel_id = match &channel_id_value {
828                Value::Int(id) => *id,
829                _ => panic!("Expected Int"),
830            };
831
832            // Close the channel
833            let stack = push(std::ptr::null_mut(), channel_id_value);
834            let _ = close_channel(stack);
835
836            // Try to receive from closed channel
837            let mut stack = push(std::ptr::null_mut(), Value::Int(channel_id));
838            stack = receive_safe(stack);
839
840            // Should return (0, 0)
841            let (stack, success) = pop(stack);
842            let (stack, value) = pop(stack);
843            assert_eq!(success, Value::Int(0));
844            assert_eq!(value, Value::Int(0));
845            assert!(stack.is_null());
846        }
847    }
848
849    // Helper to get stats with retry (handles parallel test lock contention)
850    fn get_stats_with_retry() -> Option<Vec<super::ChannelStats>> {
851        for _ in 0..10 {
852            if let Some(stats) = super::channel_stats() {
853                return Some(stats);
854            }
855            std::thread::sleep(std::time::Duration::from_millis(1));
856        }
857        None
858    }
859
860    #[test]
861    fn test_channel_stats() {
862        unsafe {
863            // Create a channel
864            let mut stack = std::ptr::null_mut();
865            stack = make_channel(stack);
866            let (_, channel_id_value) = pop(stack);
867            let channel_id = match &channel_id_value {
868                Value::Int(id) => *id as u64,
869                _ => panic!("Expected Int"),
870            };
871
872            // Initially, stats should show 0 sends and 0 receives
873            // Use retry to handle parallel test lock contention
874            let stats = match get_stats_with_retry() {
875                Some(s) => s,
876                None => {
877                    // Skip test if we can't get lock after retries (parallel test contention)
878                    let stack = push(std::ptr::null_mut(), channel_id_value);
879                    let _ = close_channel(stack);
880                    return;
881                }
882            };
883            let our_channel = stats.iter().find(|s| s.id == channel_id);
884            assert!(our_channel.is_some(), "Our channel should be in stats");
885            let stat = our_channel.unwrap();
886            assert_eq!(stat.send_count, 0);
887            assert_eq!(stat.receive_count, 0);
888            assert_eq!(stat.queue_depth, 0);
889
890            // Send some values
891            for i in 1..=5 {
892                let mut stack = push(std::ptr::null_mut(), Value::Int(i));
893                stack = push(stack, channel_id_value.clone());
894                let _ = send(stack);
895            }
896
897            // Check stats after sends
898            let stats = get_stats_with_retry().expect("Should get stats after retries");
899            let stat = stats.iter().find(|s| s.id == channel_id).unwrap();
900            assert_eq!(stat.send_count, 5);
901            assert_eq!(stat.receive_count, 0);
902            assert_eq!(stat.queue_depth, 5);
903
904            // Receive some values
905            for _ in 0..3 {
906                let mut stack = push(std::ptr::null_mut(), channel_id_value.clone());
907                stack = receive(stack);
908                let _ = pop(stack);
909            }
910
911            // Check stats after receives
912            let stats = get_stats_with_retry().expect("Should get stats after retries");
913            let stat = stats.iter().find(|s| s.id == channel_id).unwrap();
914            assert_eq!(stat.send_count, 5);
915            assert_eq!(stat.receive_count, 3);
916            assert_eq!(stat.queue_depth, 2);
917
918            // Clean up - receive remaining and close
919            for _ in 0..2 {
920                let mut stack = push(std::ptr::null_mut(), channel_id_value.clone());
921                stack = receive(stack);
922                let _ = pop(stack);
923            }
924
925            let stack = push(std::ptr::null_mut(), channel_id_value);
926            let _ = close_channel(stack);
927        }
928    }
929
930    #[test]
931    fn test_mpmc_concurrent_receivers() {
932        // Verify that multiple receivers can receive from the same channel concurrently
933        // and that messages are distributed (not duplicated)
934        unsafe {
935            use std::sync::atomic::{AtomicI64, Ordering};
936
937            const NUM_MESSAGES: i64 = 100;
938            const NUM_RECEIVERS: usize = 4;
939
940            // Shared counters for each receiver
941            static RECEIVER_COUNTS: [AtomicI64; 4] = [
942                AtomicI64::new(0),
943                AtomicI64::new(0),
944                AtomicI64::new(0),
945                AtomicI64::new(0),
946            ];
947            static CHANNEL_ID: AtomicI64 = AtomicI64::new(0);
948
949            // Reset counters
950            for counter in &RECEIVER_COUNTS {
951                counter.store(0, Ordering::SeqCst);
952            }
953
954            // Create channel
955            let mut stack = std::ptr::null_mut();
956            stack = make_channel(stack);
957            let (_, channel_id_value) = pop(stack);
958            let channel_id = match channel_id_value {
959                Value::Int(id) => id,
960                _ => panic!("Expected Int"),
961            };
962            CHANNEL_ID.store(channel_id, Ordering::SeqCst);
963
964            // Receiver strand factory
965            fn make_receiver(receiver_idx: usize) -> extern "C" fn(Stack) -> Stack {
966                match receiver_idx {
967                    0 => receiver_0,
968                    1 => receiver_1,
969                    2 => receiver_2,
970                    3 => receiver_3,
971                    _ => panic!("Invalid receiver index"),
972                }
973            }
974
975            extern "C" fn receiver_0(stack: Stack) -> Stack {
976                receive_loop(0, stack)
977            }
978            extern "C" fn receiver_1(stack: Stack) -> Stack {
979                receive_loop(1, stack)
980            }
981            extern "C" fn receiver_2(stack: Stack) -> Stack {
982                receive_loop(2, stack)
983            }
984            extern "C" fn receiver_3(stack: Stack) -> Stack {
985                receive_loop(3, stack)
986            }
987
988            fn receive_loop(idx: usize, _stack: Stack) -> Stack {
989                unsafe {
990                    let chan_id = CHANNEL_ID.load(Ordering::SeqCst);
991                    loop {
992                        let mut stack = push(std::ptr::null_mut(), Value::Int(chan_id));
993                        stack = receive_safe(stack);
994                        let (stack, success) = pop(stack);
995                        let (_, value) = pop(stack);
996
997                        match (success, value) {
998                            (Value::Int(1), Value::Int(v)) => {
999                                if v < 0 {
1000                                    // Sentinel - exit
1001                                    break;
1002                                }
1003                                RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
1004                            }
1005                            _ => break, // Channel closed or error
1006                        }
1007                        may::coroutine::yield_now();
1008                    }
1009                    std::ptr::null_mut()
1010                }
1011            }
1012
1013            // Spawn receivers
1014            for i in 0..NUM_RECEIVERS {
1015                crate::scheduler::spawn_strand(make_receiver(i));
1016            }
1017
1018            // Give receivers time to start
1019            std::thread::sleep(std::time::Duration::from_millis(10));
1020
1021            // Send messages
1022            for i in 0..NUM_MESSAGES {
1023                let mut stack = push(std::ptr::null_mut(), Value::Int(i));
1024                stack = push(stack, Value::Int(channel_id));
1025                let _ = send(stack);
1026            }
1027
1028            // Send sentinels to stop receivers
1029            for _ in 0..NUM_RECEIVERS {
1030                let mut stack = push(std::ptr::null_mut(), Value::Int(-1));
1031                stack = push(stack, Value::Int(channel_id));
1032                let _ = send(stack);
1033            }
1034
1035            // Wait for all strands
1036            crate::scheduler::wait_all_strands();
1037
1038            // Verify: total received should equal messages sent
1039            let total_received: i64 = RECEIVER_COUNTS
1040                .iter()
1041                .map(|c| c.load(Ordering::SeqCst))
1042                .sum();
1043
1044            assert_eq!(
1045                total_received, NUM_MESSAGES,
1046                "Total received ({}) should equal messages sent ({})",
1047                total_received, NUM_MESSAGES
1048            );
1049
1050            // Verify: messages were distributed (not all to one receiver)
1051            // At least 2 receivers should have received messages
1052            let active_receivers = RECEIVER_COUNTS
1053                .iter()
1054                .filter(|c| c.load(Ordering::SeqCst) > 0)
1055                .count();
1056
1057            assert!(
1058                active_receivers >= 2,
1059                "Messages should be distributed across receivers, but only {} received any",
1060                active_receivers
1061            );
1062
1063            // Clean up
1064            let stack = push(std::ptr::null_mut(), Value::Int(channel_id));
1065            let _ = close_channel(stack);
1066        }
1067    }
1068}