seq_runtime/
channel.rs

1//! Channel operations for CSP-style concurrency
2//!
3//! Channels are the primary communication mechanism between strands.
4//! They use May's MPMC channels with cooperative blocking.
5//!
6//! ## Zero-Mutex Design
7//!
8//! Channels are passed directly as `Value::Channel` on the stack. There is NO
9//! global registry and NO mutex contention. Send/receive operations work directly
10//! on the channel handles with zero locking overhead.
11//!
12//! ## Non-Blocking Guarantee
13//!
//! Channel operations (`send`, `receive`) NEVER block OS threads. When an operation has to
//! wait, it blocks cooperatively: the strand yields and May schedules other strands meanwhile.
16//!
17//! ## Multi-Consumer Support
18//!
19//! Channels support multiple producers AND multiple consumers (MPMC). Multiple strands
20//! can receive from the same channel concurrently - each message is delivered to exactly
21//! one receiver (work-stealing semantics).
22//!
23//! ## Stack Effects
24//!
25//! - `chan.make`: ( -- Channel ) - creates a new channel
26//! - `chan.send`: ( value Channel -- Bool ) - sends value, returns success
27//! - `chan.receive`: ( Channel -- value Bool ) - receives value and success flag
28//!
29//! ## Error Handling
30//!
31//! All operations return success flags - errors are values, not crashes:
32//!
33//! - `chan.send`: ( value Channel -- Bool ) - returns true on success, false on closed
34//! - `chan.receive`: ( Channel -- value Bool ) - returns value and success flag
35
36use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41/// Create a new channel
42///
43/// Stack effect: ( -- Channel )
44///
45/// Returns a Channel value that can be used with send/receive operations.
46/// The channel can be duplicated (dup) to share between strands.
47///
48/// # Safety
49/// Always safe to call
50#[unsafe(no_mangle)]
51pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
52    // Create an unbounded MPMC channel
53    // May's mpmc::channel() creates coroutine-aware channels with multi-producer, multi-consumer
54    // The recv() operation cooperatively blocks (yields) instead of blocking the OS thread
55    let (sender, receiver) = mpmc::channel();
56
57    // Wrap in Arc<ChannelData> and push directly - NO registry, NO mutex
58    let channel = Arc::new(ChannelData { sender, receiver });
59
60    unsafe { push(stack, Value::Channel(channel)) }
61}
62
63/// Close a channel (drop it from the stack)
64///
65/// Stack effect: ( Channel -- )
66///
67/// Simply drops the channel. When all references are dropped, the channel is closed.
68/// This is provided for API compatibility but is equivalent to `drop`.
69///
70/// # Safety
71/// Stack must have a Channel on top
72#[unsafe(no_mangle)]
73pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
74    assert!(!stack.is_null(), "close_channel: stack is empty");
75
76    // Pop and drop the channel
77    let (rest, channel_value) = unsafe { pop(stack) };
78    match channel_value {
79        Value::Channel(_) => {} // Drop occurs here
80        _ => panic!(
81            "close_channel: expected Channel on stack, got {:?}",
82            channel_value
83        ),
84    }
85
86    rest
87}
88
89/// Send a value through a channel
90///
91/// Stack effect: ( value Channel -- Bool )
92///
93/// Returns true on success, false on failure (closed channel).
94/// Errors are values, not crashes.
95///
96/// # Safety
97/// Stack must have a Channel on top and a value below it
98#[unsafe(no_mangle)]
99pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
100    assert!(!stack.is_null(), "chan.send: stack is empty");
101
102    // Pop channel
103    let (stack, channel_value) = unsafe { pop(stack) };
104    let channel = match channel_value {
105        Value::Channel(ch) => ch,
106        _ => {
107            // Wrong type - consume value and return failure
108            if !stack.is_null() {
109                let (rest, _value) = unsafe { pop(stack) };
110                return unsafe { push(rest, Value::Bool(false)) };
111            }
112            return unsafe { push(stack, Value::Bool(false)) };
113        }
114    };
115
116    if stack.is_null() {
117        // No value to send - return failure
118        return unsafe { push(stack, Value::Bool(false)) };
119    }
120
121    // Pop value to send
122    let (rest, value) = unsafe { pop(stack) };
123
124    // Clone the value before sending
125    let global_value = value.clone();
126
127    // Send the value
128    match channel.sender.send(global_value) {
129        Ok(()) => unsafe { push(rest, Value::Bool(true)) },
130        Err(_) => unsafe { push(rest, Value::Bool(false)) },
131    }
132}
133
134/// Receive a value from a channel
135///
136/// Stack effect: ( Channel -- value Bool )
137///
138/// Returns (value, true) on success, (0, false) on failure (closed channel).
139/// Errors are values, not crashes.
140///
141/// ## Multi-Consumer Support
142///
143/// Multiple strands can receive from the same channel concurrently (MPMC).
144/// Each message is delivered to exactly one receiver (work-stealing semantics).
145///
146/// # Safety
147/// Stack must have a Channel on top
148#[unsafe(no_mangle)]
149pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
150    assert!(!stack.is_null(), "chan.receive: stack is empty");
151
152    // Pop channel
153    let (rest, channel_value) = unsafe { pop(stack) };
154    let channel = match channel_value {
155        Value::Channel(ch) => ch,
156        _ => {
157            // Wrong type - return failure
158            let stack = unsafe { push(rest, Value::Int(0)) };
159            return unsafe { push(stack, Value::Bool(false)) };
160        }
161    };
162
163    // Receive a value
164    match channel.receiver.recv() {
165        Ok(value) => {
166            let stack = unsafe { push(rest, value) };
167            unsafe { push(stack, Value::Bool(true)) }
168        }
169        Err(_) => {
170            let stack = unsafe { push(rest, Value::Int(0)) };
171            unsafe { push(stack, Value::Bool(false)) }
172        }
173    }
174}
175
176// Public re-exports with short names for internal use
177pub use patch_seq_chan_receive as receive;
178pub use patch_seq_chan_send as send;
179pub use patch_seq_close_channel as close_channel;
180pub use patch_seq_make_channel as make_channel;
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185    use crate::scheduler::{spawn_strand, wait_all_strands};
186    use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
187
188    #[test]
189    fn test_make_channel() {
190        unsafe {
191            let stack = crate::stack::alloc_test_stack();
192            let stack = make_channel(stack);
193
194            // Should have Channel on stack
195            let (_stack, value) = pop(stack);
196            assert!(matches!(value, Value::Channel(_)));
197        }
198    }
199
200    #[test]
201    fn test_send_receive() {
202        unsafe {
203            // Create a channel
204            let mut stack = crate::stack::alloc_test_stack();
205            stack = make_channel(stack);
206
207            // Get channel (but keep it on stack for receive via dup-like pattern)
208            let (_empty_stack, channel_value) = pop(stack);
209
210            // Push value to send, then channel
211            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
212            stack = push(stack, channel_value.clone());
213            stack = send(stack);
214
215            // Check send succeeded
216            let (stack, send_success) = pop(stack);
217            assert_eq!(send_success, Value::Bool(true));
218
219            // Receive value
220            let mut stack = push(stack, channel_value);
221            stack = receive(stack);
222
223            // Check receive succeeded and got correct value
224            let (stack, recv_success) = pop(stack);
225            let (_stack, received) = pop(stack);
226            assert_eq!(recv_success, Value::Bool(true));
227            assert_eq!(received, Value::Int(42));
228        }
229    }
230
231    #[test]
232    fn test_channel_dup_sharing() {
233        // Verify that duplicating a channel shares the same underlying sender/receiver
234        unsafe {
235            let mut stack = crate::stack::alloc_test_stack();
236            stack = make_channel(stack);
237
238            let (_, ch1) = pop(stack);
239            let ch2 = ch1.clone(); // Simulates dup
240
241            // Send on ch1
242            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
243            stack = push(stack, ch1);
244            stack = send(stack);
245
246            // Pop send success flag
247            let (stack, _) = pop(stack);
248
249            // Receive on ch2
250            let mut stack = push(stack, ch2);
251            stack = receive(stack);
252
253            // Pop success flag then value
254            let (stack, _) = pop(stack);
255            let (_, received) = pop(stack);
256            assert_eq!(received, Value::Int(99));
257        }
258    }
259
260    #[test]
261    fn test_multiple_sends_receives() {
262        unsafe {
263            // Create a channel
264            let mut stack = crate::stack::alloc_test_stack();
265            stack = make_channel(stack);
266            let (_, channel_value) = pop(stack);
267
268            // Send multiple values
269            for i in 1..=5 {
270                let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
271                stack = push(stack, channel_value.clone());
272                stack = send(stack);
273                let (_, success) = pop(stack);
274                assert_eq!(success, Value::Bool(true));
275            }
276
277            // Receive them back in order
278            for i in 1..=5 {
279                let mut stack = push(crate::stack::alloc_test_stack(), channel_value.clone());
280                stack = receive(stack);
281                let (stack, success) = pop(stack);
282                let (_, received) = pop(stack);
283                assert_eq!(success, Value::Bool(true));
284                assert_eq!(received, Value::Int(i));
285            }
286        }
287    }
288
289    #[test]
290    fn test_close_channel() {
291        unsafe {
292            // Create and close a channel
293            let mut stack = crate::stack::alloc_test_stack();
294            stack = make_channel(stack);
295
296            let _stack = close_channel(stack);
297        }
298    }
299
300    #[test]
301    fn test_arena_string_send_between_strands() {
302        // Verify that arena-allocated strings are properly cloned to global storage
303        unsafe {
304            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
305            static VERIFIED: AtomicBool = AtomicBool::new(false);
306
307            // Create a channel
308            let mut stack = crate::stack::alloc_test_stack();
309            stack = make_channel(stack);
310            let (_, channel_value) = pop(stack);
311
312            // Store channel pointer for strands (hacky but works for test)
313            let ch_ptr = match &channel_value {
314                Value::Channel(arc) => Arc::as_ptr(arc) as i64,
315                _ => panic!("Expected Channel"),
316            };
317            CHANNEL_PTR.store(ch_ptr, Ordering::Release);
318
319            // Keep the Arc alive
320            std::mem::forget(channel_value.clone());
321
322            // Sender strand
323            extern "C" fn sender(_stack: Stack) -> Stack {
324                use crate::seqstring::arena_string;
325                use crate::value::ChannelData;
326
327                unsafe {
328                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
329                    let channel = Arc::from_raw(ch_ptr);
330                    let channel_clone = Arc::clone(&channel);
331                    std::mem::forget(channel); // Don't drop
332
333                    // Create arena string (fast path)
334                    let msg = arena_string("Arena message!");
335                    assert!(!msg.is_global(), "Should be arena-allocated initially");
336
337                    // Send through channel
338                    let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
339                    let stack = push(stack, Value::Channel(channel_clone));
340                    let stack = send(stack);
341                    // Pop success flag (we trust it worked for this test)
342                    let (stack, _) = pop(stack);
343                    stack
344                }
345            }
346
347            // Receiver strand
348            extern "C" fn receiver(_stack: Stack) -> Stack {
349                use crate::value::ChannelData;
350
351                unsafe {
352                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
353                    let channel = Arc::from_raw(ch_ptr);
354                    let channel_clone = Arc::clone(&channel);
355                    std::mem::forget(channel); // Don't drop
356
357                    let mut stack = push(
358                        crate::stack::alloc_test_stack(),
359                        Value::Channel(channel_clone),
360                    );
361                    stack = receive(stack);
362                    // Pop success flag first
363                    let (stack, _) = pop(stack);
364                    let (_, msg_val) = pop(stack);
365
366                    match msg_val {
367                        Value::String(s) => {
368                            assert_eq!(s.as_str(), "Arena message!");
369                            assert!(s.is_global(), "Received string should be global");
370                            VERIFIED.store(true, Ordering::Release);
371                        }
372                        _ => panic!("Expected String"),
373                    }
374
375                    std::ptr::null_mut()
376                }
377            }
378
379            spawn_strand(sender);
380            spawn_strand(receiver);
381            wait_all_strands();
382
383            assert!(
384                VERIFIED.load(Ordering::Acquire),
385                "Receiver should have verified"
386            );
387        }
388    }
389
390    #[test]
391    fn test_send_success() {
392        unsafe {
393            let mut stack = crate::stack::alloc_test_stack();
394            stack = make_channel(stack);
395            let (_, channel_value) = pop(stack);
396
397            // Send value
398            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
399            stack = push(stack, channel_value.clone());
400            stack = send(stack);
401
402            // Should return success (true)
403            let (stack, result) = pop(stack);
404            assert_eq!(result, Value::Bool(true));
405
406            // Receive to verify
407            let mut stack = push(stack, channel_value);
408            stack = receive(stack);
409            let (stack, success) = pop(stack);
410            let (_, received) = pop(stack);
411            assert_eq!(success, Value::Bool(true));
412            assert_eq!(received, Value::Int(42));
413        }
414    }
415
416    #[test]
417    fn test_send_wrong_type() {
418        unsafe {
419            // Try to send with Int instead of Channel
420            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
421            stack = push(stack, Value::Int(999)); // Wrong type
422            stack = send(stack);
423
424            // Should return failure (false)
425            let (_stack, result) = pop(stack);
426            assert_eq!(result, Value::Bool(false));
427        }
428    }
429
430    #[test]
431    fn test_receive_success() {
432        unsafe {
433            let mut stack = crate::stack::alloc_test_stack();
434            stack = make_channel(stack);
435            let (_, channel_value) = pop(stack);
436
437            // Send value
438            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
439            stack = push(stack, channel_value.clone());
440            stack = send(stack);
441            let (_, _) = pop(stack); // pop send success
442
443            // Receive
444            let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
445            stack = receive(stack);
446
447            // Should return (value, true)
448            let (stack, success) = pop(stack);
449            let (_stack, value) = pop(stack);
450            assert_eq!(success, Value::Bool(true));
451            assert_eq!(value, Value::Int(42));
452        }
453    }
454
455    #[test]
456    fn test_receive_wrong_type() {
457        unsafe {
458            // Try to receive with Int instead of Channel
459            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(999));
460            stack = receive(stack);
461
462            // Should return (0, false)
463            let (stack, success) = pop(stack);
464            let (_stack, value) = pop(stack);
465            assert_eq!(success, Value::Bool(false));
466            assert_eq!(value, Value::Int(0));
467        }
468    }
469
470    #[test]
471    fn test_mpmc_concurrent_receivers() {
472        // Verify that multiple receivers work with MPMC
473        unsafe {
474            const NUM_MESSAGES: i64 = 100;
475            const NUM_RECEIVERS: usize = 4;
476
477            static RECEIVER_COUNTS: [AtomicI64; 4] = [
478                AtomicI64::new(0),
479                AtomicI64::new(0),
480                AtomicI64::new(0),
481                AtomicI64::new(0),
482            ];
483            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
484
485            // Reset counters
486            for counter in &RECEIVER_COUNTS {
487                counter.store(0, Ordering::SeqCst);
488            }
489
490            // Create channel
491            let mut stack = crate::stack::alloc_test_stack();
492            stack = make_channel(stack);
493            let (_, channel_value) = pop(stack);
494
495            let ch_ptr = match &channel_value {
496                Value::Channel(arc) => Arc::as_ptr(arc) as i64,
497                _ => panic!("Expected Channel"),
498            };
499            CHANNEL_PTR.store(ch_ptr, Ordering::SeqCst);
500
501            // Keep Arc alive
502            for _ in 0..(NUM_RECEIVERS + 1) {
503                std::mem::forget(channel_value.clone());
504            }
505
506            fn make_receiver(idx: usize) -> extern "C" fn(Stack) -> Stack {
507                match idx {
508                    0 => receiver_0,
509                    1 => receiver_1,
510                    2 => receiver_2,
511                    3 => receiver_3,
512                    _ => panic!("Invalid receiver index"),
513                }
514            }
515
516            extern "C" fn receiver_0(stack: Stack) -> Stack {
517                receive_loop(0, stack)
518            }
519            extern "C" fn receiver_1(stack: Stack) -> Stack {
520                receive_loop(1, stack)
521            }
522            extern "C" fn receiver_2(stack: Stack) -> Stack {
523                receive_loop(2, stack)
524            }
525            extern "C" fn receiver_3(stack: Stack) -> Stack {
526                receive_loop(3, stack)
527            }
528
529            fn receive_loop(idx: usize, _stack: Stack) -> Stack {
530                use crate::value::ChannelData;
531                unsafe {
532                    let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
533                    let channel = Arc::from_raw(ch_ptr);
534                    let channel_clone = Arc::clone(&channel);
535                    std::mem::forget(channel);
536
537                    loop {
538                        let mut stack = push(
539                            crate::stack::alloc_test_stack(),
540                            Value::Channel(channel_clone.clone()),
541                        );
542                        stack = receive(stack);
543                        let (stack, success) = pop(stack);
544                        let (_, value) = pop(stack);
545
546                        match (success, value) {
547                            (Value::Bool(true), Value::Int(v)) => {
548                                if v < 0 {
549                                    break; // Sentinel
550                                }
551                                RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
552                            }
553                            _ => break,
554                        }
555                        may::coroutine::yield_now();
556                    }
557                    std::ptr::null_mut()
558                }
559            }
560
561            // Spawn receivers
562            for i in 0..NUM_RECEIVERS {
563                spawn_strand(make_receiver(i));
564            }
565
566            std::thread::sleep(std::time::Duration::from_millis(10));
567
568            // Send messages
569            for i in 0..NUM_MESSAGES {
570                let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
571                let channel = Arc::from_raw(ch_ptr);
572                let channel_clone = Arc::clone(&channel);
573                std::mem::forget(channel);
574
575                let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
576                stack = push(stack, Value::Channel(channel_clone));
577                let _ = send(stack);
578            }
579
580            // Send sentinels
581            for _ in 0..NUM_RECEIVERS {
582                let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
583                let channel = Arc::from_raw(ch_ptr);
584                let channel_clone = Arc::clone(&channel);
585                std::mem::forget(channel);
586
587                let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(-1));
588                stack = push(stack, Value::Channel(channel_clone));
589                let _ = send(stack);
590            }
591
592            wait_all_strands();
593
594            let total_received: i64 = RECEIVER_COUNTS
595                .iter()
596                .map(|c| c.load(Ordering::SeqCst))
597                .sum();
598            assert_eq!(total_received, NUM_MESSAGES);
599
600            let active_receivers = RECEIVER_COUNTS
601                .iter()
602                .filter(|c| c.load(Ordering::SeqCst) > 0)
603                .count();
604            assert!(active_receivers >= 2, "Messages should be distributed");
605        }
606    }
607}