Skip to main content

seq_runtime/
channel.rs

1//! Channel operations for CSP-style concurrency
2//!
3//! Channels are the primary communication mechanism between strands.
4//! They use May's MPMC channels with cooperative blocking.
5//!
6//! ## Zero-Mutex Design
7//!
8//! Channels are passed directly as `Value::Channel` on the stack. There is NO
9//! global registry and NO mutex contention. Send/receive operations work directly
10//! on the channel handles with zero locking overhead.
11//!
12//! ## Non-Blocking Guarantee
13//!
14//! All channel operations (`send`, `receive`) cooperatively block using May's scheduler.
15//! They NEVER block OS threads - May handles scheduling other strands while waiting.
16//!
17//! ## Multi-Consumer Support
18//!
19//! Channels support multiple producers AND multiple consumers (MPMC). Multiple strands
20//! can receive from the same channel concurrently - each message is delivered to exactly
21//! one receiver (work-stealing semantics).
22//!
23//! ## Stack Effects
24//!
25//! - `chan.make`: ( -- Channel ) - creates a new channel
26//! - `chan.send`: ( value Channel -- Bool ) - sends value, returns success
27//! - `chan.receive`: ( Channel -- value Bool ) - receives value and success flag
28//!
29//! ## Error Handling
30//!
31//! `chan.send` and `chan.receive` return success flags - errors are values, not crashes:
32//!
33//! - `chan.send`: ( value Channel -- Bool ) - returns true on success, false on closed
34//! - `chan.receive`: ( Channel -- value Bool ) - returns value and success flag
35
36use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41#[cfg(feature = "diagnostics")]
42use std::sync::atomic::{AtomicU64, Ordering};
43
44#[cfg(feature = "diagnostics")]
45pub static TOTAL_MESSAGES_SENT: AtomicU64 = AtomicU64::new(0);
46#[cfg(feature = "diagnostics")]
47pub static TOTAL_MESSAGES_RECEIVED: AtomicU64 = AtomicU64::new(0);
48
49/// Create a new channel
50///
51/// Stack effect: ( -- Channel )
52///
53/// Returns a Channel value that can be used with send/receive operations.
54/// The channel can be duplicated (dup) to share between strands.
55///
56/// # Safety
57/// Always safe to call
58#[unsafe(no_mangle)]
59pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
60    // Create an unbounded MPMC channel
61    // May's mpmc::channel() creates coroutine-aware channels with multi-producer, multi-consumer
62    // The recv() operation cooperatively blocks (yields) instead of blocking the OS thread
63    let (sender, receiver) = mpmc::channel();
64
65    // Wrap in Arc<ChannelData> and push directly - NO registry, NO mutex
66    let channel = Arc::new(ChannelData { sender, receiver });
67
68    unsafe { push(stack, Value::Channel(channel)) }
69}
70
71/// Close a channel (drop it from the stack)
72///
73/// Stack effect: ( Channel -- )
74///
75/// Simply drops the channel. When all references are dropped, the channel is closed.
76/// This is provided for API compatibility but is equivalent to `drop`.
77///
78/// # Safety
79/// Stack must have a Channel on top
80#[unsafe(no_mangle)]
81pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
82    assert!(!stack.is_null(), "close_channel: stack is empty");
83
84    // Pop and drop the channel
85    let (rest, channel_value) = unsafe { pop(stack) };
86    match channel_value {
87        Value::Channel(_) => {} // Drop occurs here
88        _ => panic!(
89            "close_channel: expected Channel on stack, got {:?}",
90            channel_value
91        ),
92    }
93
94    rest
95}
96
97/// Send a value through a channel
98///
99/// Stack effect: ( value Channel -- Bool )
100///
101/// Returns true on success, false on failure (closed channel).
102/// Errors are values, not crashes.
103///
104/// # Safety
105/// Stack must have a Channel on top and a value below it
106#[unsafe(no_mangle)]
107pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
108    assert!(!stack.is_null(), "chan.send: stack is empty");
109
110    // Pop channel
111    let (stack, channel_value) = unsafe { pop(stack) };
112    let channel = match channel_value {
113        Value::Channel(ch) => ch,
114        _ => {
115            // Wrong type - consume value and return failure
116            if !stack.is_null() {
117                let (rest, _value) = unsafe { pop(stack) };
118                return unsafe { push(rest, Value::Bool(false)) };
119            }
120            return unsafe { push(stack, Value::Bool(false)) };
121        }
122    };
123
124    if stack.is_null() {
125        // No value to send - return failure
126        return unsafe { push(stack, Value::Bool(false)) };
127    }
128
129    // Pop value to send
130    let (rest, value) = unsafe { pop(stack) };
131
132    // Clone the value before sending
133    let global_value = value.clone();
134
135    // Send the value
136    match channel.sender.send(global_value) {
137        Ok(()) => {
138            #[cfg(feature = "diagnostics")]
139            TOTAL_MESSAGES_SENT.fetch_add(1, Ordering::Relaxed);
140            unsafe { push(rest, Value::Bool(true)) }
141        }
142        Err(_) => unsafe { push(rest, Value::Bool(false)) },
143    }
144}
145
146/// Receive a value from a channel
147///
148/// Stack effect: ( Channel -- value Bool )
149///
150/// Returns (value, true) on success, (0, false) on failure (closed channel).
151/// Errors are values, not crashes.
152///
153/// ## Multi-Consumer Support
154///
155/// Multiple strands can receive from the same channel concurrently (MPMC).
156/// Each message is delivered to exactly one receiver (work-stealing semantics).
157///
158/// # Safety
159/// Stack must have a Channel on top
160#[unsafe(no_mangle)]
161pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
162    assert!(!stack.is_null(), "chan.receive: stack is empty");
163
164    // Pop channel
165    let (rest, channel_value) = unsafe { pop(stack) };
166    let channel = match channel_value {
167        Value::Channel(ch) => ch,
168        _ => {
169            // Wrong type - return failure
170            let stack = unsafe { push(rest, Value::Int(0)) };
171            return unsafe { push(stack, Value::Bool(false)) };
172        }
173    };
174
175    // Receive a value
176    match channel.receiver.recv() {
177        Ok(value) => {
178            #[cfg(feature = "diagnostics")]
179            TOTAL_MESSAGES_RECEIVED.fetch_add(1, Ordering::Relaxed);
180            let stack = unsafe { push(rest, value) };
181            unsafe { push(stack, Value::Bool(true)) }
182        }
183        Err(_) => {
184            let stack = unsafe { push(rest, Value::Int(0)) };
185            unsafe { push(stack, Value::Bool(false)) }
186        }
187    }
188}
189
190// Public re-exports with short names for internal use
191pub use patch_seq_chan_receive as receive;
192pub use patch_seq_chan_send as send;
193pub use patch_seq_close_channel as close_channel;
194pub use patch_seq_make_channel as make_channel;
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::{spawn_strand, wait_all_strands};
    use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};

    /// Creates a channel via `make_channel` on a fresh test stack and pops it
    /// back off, returning the `Value::Channel` handle.
    unsafe fn fresh_channel() -> Value {
        unsafe {
            let stack = make_channel(crate::stack::alloc_test_stack());
            let (_, channel) = pop(stack);
            channel
        }
    }

    /// Lays out `( payload channel )` on a fresh test stack, runs `send`, and
    /// returns the resulting stack (success flag on top).
    unsafe fn send_value(payload: Value, channel: Value) -> Stack {
        unsafe {
            let stack = push(crate::stack::alloc_test_stack(), payload);
            send(push(stack, channel))
        }
    }

    /// Pushes `channel` onto `base`, runs `receive`, and pops the two results,
    /// returning them as `(value, success_flag)`.
    unsafe fn receive_from(base: Stack, channel: Value) -> (Value, Value) {
        unsafe {
            let stack = receive(push(base, channel));
            let (stack, flag) = pop(stack);
            let (_, value) = pop(stack);
            (value, flag)
        }
    }

    /// Produces a new strong handle to the channel whose raw `Arc` pointer was
    /// stashed (as an integer) in `slot`.
    ///
    /// The pointer is turned back into an `Arc` only long enough to clone it;
    /// the reconstituted `Arc` is then forgotten so the stashed reference is
    /// not released.
    unsafe fn shared_channel(slot: &AtomicI64, order: Ordering) -> Arc<ChannelData> {
        unsafe {
            let raw = slot.load(order) as *const ChannelData;
            let borrowed = Arc::from_raw(raw);
            let handle = Arc::clone(&borrowed);
            std::mem::forget(borrowed); // Don't drop
            handle
        }
    }

    #[test]
    fn test_make_channel() {
        unsafe {
            // make_channel must leave a Channel on top of the stack
            let value = fresh_channel();
            assert!(matches!(value, Value::Channel(_)));
        }
    }

    #[test]
    fn test_send_receive() {
        unsafe {
            let channel = fresh_channel();

            // Send 42 using one copy of the handle
            let stack = send_value(Value::Int(42), channel.clone());
            let (stack, send_success) = pop(stack);
            assert_eq!(send_success, Value::Bool(true));

            // Receive on the stack left over from the send
            let (received, recv_success) = receive_from(stack, channel);
            assert_eq!(recv_success, Value::Bool(true));
            assert_eq!(received, Value::Int(42));
        }
    }

    #[test]
    fn test_channel_dup_sharing() {
        // A cloned handle must refer to the same underlying sender/receiver
        unsafe {
            let first = fresh_channel();
            let second = first.clone(); // Simulates dup

            // Send through the first handle and discard the success flag
            let stack = send_value(Value::Int(99), first);
            let (stack, _) = pop(stack);

            // The value must come out of the second handle
            let (received, _) = receive_from(stack, second);
            assert_eq!(received, Value::Int(99));
        }
    }

    #[test]
    fn test_multiple_sends_receives() {
        unsafe {
            let channel = fresh_channel();

            // Queue up five values
            for i in 1..=5 {
                let (_, success) = pop(send_value(Value::Int(i), channel.clone()));
                assert_eq!(success, Value::Bool(true));
            }

            // They must come back in the order they were sent
            for i in 1..=5 {
                let (received, success) =
                    receive_from(crate::stack::alloc_test_stack(), channel.clone());
                assert_eq!(success, Value::Bool(true));
                assert_eq!(received, Value::Int(i));
            }
        }
    }

    #[test]
    fn test_close_channel() {
        unsafe {
            // Closing a freshly made channel must not panic
            let stack = make_channel(crate::stack::alloc_test_stack());
            let _stack = close_channel(stack);
        }
    }

    #[test]
    fn test_arena_string_send_between_strands() {
        // An arena-allocated string sent across strands must arrive global
        unsafe {
            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
            static VERIFIED: AtomicBool = AtomicBool::new(false);

            let channel_value = fresh_channel();

            // Publish the raw Arc pointer for the strands (hacky but works for test)
            let Value::Channel(arc) = &channel_value else {
                panic!("Expected Channel")
            };
            CHANNEL_PTR.store(Arc::as_ptr(arc) as i64, Ordering::Release);

            // Deliberately leak one reference so the Arc stays alive
            std::mem::forget(channel_value.clone());

            // Sender strand
            extern "C" fn sender(_stack: Stack) -> Stack {
                use crate::seqstring::arena_string;

                unsafe {
                    let channel = shared_channel(&CHANNEL_PTR, Ordering::Acquire);

                    // Create arena string (fast path)
                    let msg = arena_string("Arena message!");
                    assert!(!msg.is_global(), "Should be arena-allocated initially");

                    let stack = send_value(Value::String(msg), Value::Channel(channel));
                    // The success flag is discarded (we trust it worked for this test)
                    let (stack, _) = pop(stack);
                    stack
                }
            }

            // Receiver strand
            extern "C" fn receiver(_stack: Stack) -> Stack {
                unsafe {
                    let channel = shared_channel(&CHANNEL_PTR, Ordering::Acquire);

                    let (msg_val, _) = receive_from(
                        crate::stack::alloc_test_stack(),
                        Value::Channel(channel),
                    );

                    let Value::String(s) = msg_val else {
                        panic!("Expected String")
                    };
                    assert_eq!(s.as_str(), "Arena message!");
                    assert!(s.is_global(), "Received string should be global");
                    VERIFIED.store(true, Ordering::Release);

                    std::ptr::null_mut()
                }
            }

            spawn_strand(sender);
            spawn_strand(receiver);
            wait_all_strands();

            assert!(
                VERIFIED.load(Ordering::Acquire),
                "Receiver should have verified"
            );
        }
    }

    #[test]
    fn test_send_success() {
        unsafe {
            let channel = fresh_channel();

            // A send on a live channel reports true
            let stack = send_value(Value::Int(42), channel.clone());
            let (stack, result) = pop(stack);
            assert_eq!(result, Value::Bool(true));

            // Receive to confirm the value really went through
            let (received, success) = receive_from(stack, channel);
            assert_eq!(success, Value::Bool(true));
            assert_eq!(received, Value::Int(42));
        }
    }

    #[test]
    fn test_send_wrong_type() {
        unsafe {
            // An Int where the Channel should be must yield false, not a crash
            let stack = send_value(Value::Int(42), Value::Int(999));

            let (_stack, result) = pop(stack);
            assert_eq!(result, Value::Bool(false));
        }
    }

    #[test]
    fn test_receive_success() {
        unsafe {
            let channel = fresh_channel();

            // Send a value and discard the success flag
            let (_, _) = pop(send_value(Value::Int(42), channel.clone()));

            // Receive on a fresh stack: expect (value, true)
            let (value, success) = receive_from(crate::stack::alloc_test_stack(), channel);
            assert_eq!(success, Value::Bool(true));
            assert_eq!(value, Value::Int(42));
        }
    }

    #[test]
    fn test_receive_wrong_type() {
        unsafe {
            // An Int where the Channel should be must yield (0, false)
            let (value, success) =
                receive_from(crate::stack::alloc_test_stack(), Value::Int(999));
            assert_eq!(success, Value::Bool(false));
            assert_eq!(value, Value::Int(0));
        }
    }

    #[test]
    fn test_mpmc_concurrent_receivers() {
        // Several receivers draining one channel: every message is counted
        // exactly once and the work is spread over more than one receiver.
        unsafe {
            const NUM_MESSAGES: i64 = 100;
            const NUM_RECEIVERS: usize = 4;

            static RECEIVER_COUNTS: [AtomicI64; 4] = [
                AtomicI64::new(0),
                AtomicI64::new(0),
                AtomicI64::new(0),
                AtomicI64::new(0),
            ];
            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);

            // Reset counters
            RECEIVER_COUNTS
                .iter()
                .for_each(|counter| counter.store(0, Ordering::SeqCst));

            // Create the channel and publish its raw Arc pointer
            let channel_value = fresh_channel();
            let Value::Channel(arc) = &channel_value else {
                panic!("Expected Channel")
            };
            CHANNEL_PTR.store(Arc::as_ptr(arc) as i64, Ordering::SeqCst);

            // Deliberately leak references so the Arc stays alive
            for _ in 0..(NUM_RECEIVERS + 1) {
                std::mem::forget(channel_value.clone());
            }

            // One entry point per receiver, each bound to its own counter slot
            extern "C" fn receiver_0(stack: Stack) -> Stack {
                receive_loop(0, stack)
            }
            extern "C" fn receiver_1(stack: Stack) -> Stack {
                receive_loop(1, stack)
            }
            extern "C" fn receiver_2(stack: Stack) -> Stack {
                receive_loop(2, stack)
            }
            extern "C" fn receiver_3(stack: Stack) -> Stack {
                receive_loop(3, stack)
            }

            /// Receives until a sentinel (negative Int), a failed receive, or
            /// an unexpected value shows up, counting every real message.
            fn receive_loop(idx: usize, _stack: Stack) -> Stack {
                unsafe {
                    let channel = shared_channel(&CHANNEL_PTR, Ordering::SeqCst);

                    loop {
                        let (value, success) = receive_from(
                            crate::stack::alloc_test_stack(),
                            Value::Channel(Arc::clone(&channel)),
                        );

                        match (success, value) {
                            (Value::Bool(true), Value::Int(v)) if v >= 0 => {
                                RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
                            }
                            // Sentinel, failure, or anything unexpected
                            _ => break,
                        }
                        may::coroutine::yield_now();
                    }
                    std::ptr::null_mut()
                }
            }

            // Spawn receivers
            let entry_points: [extern "C" fn(Stack) -> Stack; NUM_RECEIVERS] =
                [receiver_0, receiver_1, receiver_2, receiver_3];
            for &entry in &entry_points {
                spawn_strand(entry);
            }

            std::thread::sleep(std::time::Duration::from_millis(10));

            // Send messages
            for i in 0..NUM_MESSAGES {
                let channel = shared_channel(&CHANNEL_PTR, Ordering::SeqCst);
                let _ = send_value(Value::Int(i), Value::Channel(channel));
            }

            // Send sentinels - one per receiver so each loop terminates
            for _ in 0..NUM_RECEIVERS {
                let channel = shared_channel(&CHANNEL_PTR, Ordering::SeqCst);
                let _ = send_value(Value::Int(-1), Value::Channel(channel));
            }

            wait_all_strands();

            let counts: Vec<i64> = RECEIVER_COUNTS
                .iter()
                .map(|c| c.load(Ordering::SeqCst))
                .collect();

            let total_received: i64 = counts.iter().sum();
            assert_eq!(total_received, NUM_MESSAGES);

            let active_receivers = counts.iter().filter(|&&n| n > 0).count();
            assert!(active_receivers >= 2, "Messages should be distributed");
        }
    }
}