seq_runtime/
channel.rs

1//! Channel operations for CSP-style concurrency
2//!
3//! Channels are the primary communication mechanism between strands.
4//! They use May's MPMC channels with cooperative blocking.
5//!
6//! ## Zero-Mutex Design
7//!
8//! Channels are passed directly as `Value::Channel` on the stack. There is NO
9//! global registry and NO mutex contention. Send/receive operations work directly
10//! on the channel handles with zero locking overhead.
11//!
12//! ## Non-Blocking Guarantee
13//!
14//! All channel operations (`send`, `receive`) cooperatively block using May's scheduler.
15//! They NEVER block OS threads - May handles scheduling other strands while waiting.
16//!
17//! ## Multi-Consumer Support
18//!
19//! Channels support multiple producers AND multiple consumers (MPMC). Multiple strands
20//! can receive from the same channel concurrently - each message is delivered to exactly
21//! one receiver (work-stealing semantics).
22//!
23//! ## Stack Effects
24//!
25//! - `chan.make`: ( -- Channel ) - creates a new channel
26//! - `chan.send`: ( value Channel -- ) - sends value through channel
27//! - `chan.receive`: ( Channel -- value ) - receives value from channel
28//!
29//! ## Error Handling
30//!
31//! Two variants are available for send/receive:
32//!
33//! - `send` / `receive` - Panic on errors (closed channel)
34//! - `send-safe` / `receive-safe` - Return success flag instead of panicking
35
36use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41/// Create a new channel
42///
43/// Stack effect: ( -- Channel )
44///
45/// Returns a Channel value that can be used with send/receive operations.
46/// The channel can be duplicated (dup) to share between strands.
47///
48/// # Safety
49/// Always safe to call
50#[unsafe(no_mangle)]
51pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
52    // Create an unbounded MPMC channel
53    // May's mpmc::channel() creates coroutine-aware channels with multi-producer, multi-consumer
54    // The recv() operation cooperatively blocks (yields) instead of blocking the OS thread
55    let (sender, receiver) = mpmc::channel();
56
57    // Wrap in Arc<ChannelData> and push directly - NO registry, NO mutex
58    let channel = Arc::new(ChannelData { sender, receiver });
59
60    unsafe { push(stack, Value::Channel(channel)) }
61}
62
63/// Send a value through a channel
64///
65/// Stack effect: ( value Channel -- )
66///
67/// Cooperatively blocks if the channel is full until space becomes available.
68/// The strand yields and May handles scheduling.
69///
70/// # Safety
71/// Stack must have a Channel on top and a value below it
72#[unsafe(no_mangle)]
73pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
74    assert!(!stack.is_null(), "send: stack is empty");
75
76    // Pop channel
77    let (stack, channel_value) = unsafe { pop(stack) };
78    let channel = match channel_value {
79        Value::Channel(ch) => ch,
80        _ => panic!("send: expected Channel on stack, got {:?}", channel_value),
81    };
82
83    assert!(!stack.is_null(), "send: stack has only one value");
84
85    // Pop value to send
86    let (rest, value) = unsafe { pop(stack) };
87
88    // Clone the value before sending to ensure arena strings are promoted to global
89    // This prevents use-after-free when sender's arena is reset before receiver accesses
90    let global_value = value.clone();
91
92    // Send the value directly - NO mutex, NO registry lookup
93    // May's scheduler will handle the blocking cooperatively
94    channel
95        .sender
96        .send(global_value)
97        .expect("send: channel closed");
98
99    rest
100}
101
102/// Receive a value from a channel
103///
104/// Stack effect: ( Channel -- value )
105///
106/// Cooperatively blocks until a value is available.
107/// The strand yields and May handles scheduling.
108///
109/// ## Multi-Consumer Support
110///
111/// Multiple strands can receive from the same channel concurrently (MPMC).
112/// Each message is delivered to exactly one receiver (work-stealing semantics).
113///
114/// # Safety
115/// Stack must have a Channel on top
116#[unsafe(no_mangle)]
117pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
118    assert!(!stack.is_null(), "receive: stack is empty");
119
120    // Pop channel
121    let (rest, channel_value) = unsafe { pop(stack) };
122    let channel = match channel_value {
123        Value::Channel(ch) => ch,
124        _ => panic!(
125            "receive: expected Channel on stack, got {:?}",
126            channel_value
127        ),
128    };
129
130    // Receive a value directly - NO mutex, NO registry lookup
131    // May's recv() yields to the scheduler, not blocking the OS thread
132    let value = match channel.receiver.recv() {
133        Ok(v) => v,
134        Err(_) => panic!("receive: channel closed"),
135    };
136
137    unsafe { push(rest, value) }
138}
139
140/// Close a channel (drop it from the stack)
141///
142/// Stack effect: ( Channel -- )
143///
144/// Simply drops the channel. When all references are dropped, the channel is closed.
145/// This is provided for API compatibility but is equivalent to `drop`.
146///
147/// # Safety
148/// Stack must have a Channel on top
149#[unsafe(no_mangle)]
150pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
151    assert!(!stack.is_null(), "close_channel: stack is empty");
152
153    // Pop and drop the channel
154    let (rest, channel_value) = unsafe { pop(stack) };
155    match channel_value {
156        Value::Channel(_) => {} // Drop occurs here
157        _ => panic!(
158            "close_channel: expected Channel on stack, got {:?}",
159            channel_value
160        ),
161    }
162
163    rest
164}
165
166/// Send a value through a channel, with error handling
167///
168/// Stack effect: ( value Channel -- success_flag )
169///
170/// Returns 1 on success, 0 on failure (closed channel or wrong type).
171/// Does not panic on errors - returns 0 instead.
172///
173/// # Safety
174/// Stack must have a Channel on top and a value below it
175#[unsafe(no_mangle)]
176pub unsafe extern "C" fn patch_seq_chan_send_safe(stack: Stack) -> Stack {
177    assert!(!stack.is_null(), "send-safe: stack is empty");
178
179    // Pop channel
180    let (stack, channel_value) = unsafe { pop(stack) };
181    let channel = match channel_value {
182        Value::Channel(ch) => ch,
183        _ => {
184            // Wrong type - consume value and return failure
185            if !stack.is_null() {
186                let (rest, _value) = unsafe { pop(stack) };
187                return unsafe { push(rest, Value::Int(0)) };
188            }
189            return unsafe { push(stack, Value::Int(0)) };
190        }
191    };
192
193    if stack.is_null() {
194        // No value to send - return failure
195        return unsafe { push(stack, Value::Int(0)) };
196    }
197
198    // Pop value to send
199    let (rest, value) = unsafe { pop(stack) };
200
201    // Clone the value before sending
202    let global_value = value.clone();
203
204    // Send the value
205    match channel.sender.send(global_value) {
206        Ok(()) => unsafe { push(rest, Value::Int(1)) },
207        Err(_) => unsafe { push(rest, Value::Int(0)) },
208    }
209}
210
211/// Receive a value from a channel, with error handling
212///
213/// Stack effect: ( Channel -- value success_flag )
214///
215/// Returns (value, 1) on success, (0, 0) on failure (closed channel or wrong type).
216/// Does not panic on errors - returns (0, 0) instead.
217///
218/// ## Multi-Consumer Support
219///
220/// Multiple strands can receive from the same channel concurrently (MPMC).
221/// Each message is delivered to exactly one receiver (work-stealing semantics).
222///
223/// # Safety
224/// Stack must have a Channel on top
225#[unsafe(no_mangle)]
226pub unsafe extern "C" fn patch_seq_chan_receive_safe(stack: Stack) -> Stack {
227    assert!(!stack.is_null(), "receive-safe: stack is empty");
228
229    // Pop channel
230    let (rest, channel_value) = unsafe { pop(stack) };
231    let channel = match channel_value {
232        Value::Channel(ch) => ch,
233        _ => {
234            // Wrong type - return failure
235            let stack = unsafe { push(rest, Value::Int(0)) };
236            return unsafe { push(stack, Value::Int(0)) };
237        }
238    };
239
240    // Receive a value
241    match channel.receiver.recv() {
242        Ok(value) => {
243            let stack = unsafe { push(rest, value) };
244            unsafe { push(stack, Value::Int(1)) }
245        }
246        Err(_) => {
247            let stack = unsafe { push(rest, Value::Int(0)) };
248            unsafe { push(stack, Value::Int(0)) }
249        }
250    }
251}
252
253// Public re-exports with short names for internal use
254pub use patch_seq_chan_receive as receive;
255pub use patch_seq_chan_receive_safe as receive_safe;
256pub use patch_seq_chan_send as send;
257pub use patch_seq_chan_send_safe as send_safe;
258pub use patch_seq_close_channel as close_channel;
259pub use patch_seq_make_channel as make_channel;
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264    use crate::scheduler::{spawn_strand, wait_all_strands};
265    use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
266
267    #[test]
268    fn test_make_channel() {
269        unsafe {
270            let stack = crate::stack::alloc_test_stack();
271            let stack = make_channel(stack);
272
273            // Should have Channel on stack
274            let (_stack, value) = pop(stack);
275            assert!(matches!(value, Value::Channel(_)));
276        }
277    }
278
279    #[test]
280    fn test_send_receive() {
281        unsafe {
282            // Create a channel
283            let mut stack = crate::stack::alloc_test_stack();
284            stack = make_channel(stack);
285
286            // Get channel (but keep it on stack for receive via dup-like pattern)
287            let (_empty_stack, channel_value) = pop(stack);
288
289            // Push value to send, then channel
290            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
291            stack = push(stack, channel_value.clone());
292            stack = send(stack);
293
294            // Receive value
295            stack = push(stack, channel_value);
296            stack = receive(stack);
297
298            // Should have received value
299            let (_stack, received) = pop(stack);
300            assert_eq!(received, Value::Int(42));
301        }
302    }
303
304    #[test]
305    fn test_channel_dup_sharing() {
306        // Verify that duplicating a channel shares the same underlying sender/receiver
307        unsafe {
308            let mut stack = crate::stack::alloc_test_stack();
309            stack = make_channel(stack);
310
311            let (_, ch1) = pop(stack);
312            let ch2 = ch1.clone(); // Simulates dup
313
314            // Send on ch1
315            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
316            stack = push(stack, ch1);
317            stack = send(stack);
318
319            // Receive on ch2
320            stack = push(stack, ch2);
321            stack = receive(stack);
322
323            let (_, received) = pop(stack);
324            assert_eq!(received, Value::Int(99));
325        }
326    }
327
328    #[test]
329    fn test_multiple_sends_receives() {
330        unsafe {
331            // Create a channel
332            let mut stack = crate::stack::alloc_test_stack();
333            stack = make_channel(stack);
334            let (_, channel_value) = pop(stack);
335
336            // Send multiple values
337            for i in 1..=5 {
338                let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
339                stack = push(stack, channel_value.clone());
340                let _ = send(stack);
341            }
342
343            // Receive them back in order
344            for i in 1..=5 {
345                let mut stack = push(crate::stack::alloc_test_stack(), channel_value.clone());
346                stack = receive(stack);
347                let (_, received) = pop(stack);
348                assert_eq!(received, Value::Int(i));
349            }
350        }
351    }
352
353    #[test]
354    fn test_close_channel() {
355        unsafe {
356            // Create and close a channel
357            let mut stack = crate::stack::alloc_test_stack();
358            stack = make_channel(stack);
359
360            let _stack = close_channel(stack);
361        }
362    }
363
364    #[test]
365    fn test_arena_string_send_between_strands() {
366        // Verify that arena-allocated strings are properly cloned to global storage
367        unsafe {
368            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
369            static VERIFIED: AtomicBool = AtomicBool::new(false);
370
371            // Create a channel
372            let mut stack = crate::stack::alloc_test_stack();
373            stack = make_channel(stack);
374            let (_, channel_value) = pop(stack);
375
376            // Store channel pointer for strands (hacky but works for test)
377            let ch_ptr = match &channel_value {
378                Value::Channel(arc) => Arc::as_ptr(arc) as i64,
379                _ => panic!("Expected Channel"),
380            };
381            CHANNEL_PTR.store(ch_ptr, Ordering::Release);
382
383            // Keep the Arc alive
384            std::mem::forget(channel_value.clone());
385
386            // Sender strand
387            extern "C" fn sender(_stack: Stack) -> Stack {
388                use crate::seqstring::arena_string;
389                use crate::value::ChannelData;
390
391                unsafe {
392                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
393                    let channel = Arc::from_raw(ch_ptr);
394                    let channel_clone = Arc::clone(&channel);
395                    std::mem::forget(channel); // Don't drop
396
397                    // Create arena string (fast path)
398                    let msg = arena_string("Arena message!");
399                    assert!(!msg.is_global(), "Should be arena-allocated initially");
400
401                    // Send through channel
402                    let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
403                    let stack = push(stack, Value::Channel(channel_clone));
404                    send(stack)
405                }
406            }
407
408            // Receiver strand
409            extern "C" fn receiver(_stack: Stack) -> Stack {
410                use crate::value::ChannelData;
411
412                unsafe {
413                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
414                    let channel = Arc::from_raw(ch_ptr);
415                    let channel_clone = Arc::clone(&channel);
416                    std::mem::forget(channel); // Don't drop
417
418                    let mut stack = push(
419                        crate::stack::alloc_test_stack(),
420                        Value::Channel(channel_clone),
421                    );
422                    stack = receive(stack);
423                    let (_, msg_val) = pop(stack);
424
425                    match msg_val {
426                        Value::String(s) => {
427                            assert_eq!(s.as_str(), "Arena message!");
428                            assert!(s.is_global(), "Received string should be global");
429                            VERIFIED.store(true, Ordering::Release);
430                        }
431                        _ => panic!("Expected String"),
432                    }
433
434                    std::ptr::null_mut()
435                }
436            }
437
438            spawn_strand(sender);
439            spawn_strand(receiver);
440            wait_all_strands();
441
442            assert!(
443                VERIFIED.load(Ordering::Acquire),
444                "Receiver should have verified"
445            );
446        }
447    }
448
449    #[test]
450    fn test_send_safe_success() {
451        unsafe {
452            let mut stack = crate::stack::alloc_test_stack();
453            stack = make_channel(stack);
454            let (_, channel_value) = pop(stack);
455
456            // Send using send-safe
457            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
458            stack = push(stack, channel_value.clone());
459            stack = send_safe(stack);
460
461            // Should return success (1)
462            let (_stack, result) = pop(stack);
463            assert_eq!(result, Value::Int(1));
464
465            // Receive to verify
466            let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
467            stack = receive(stack);
468            let (_, received) = pop(stack);
469            assert_eq!(received, Value::Int(42));
470        }
471    }
472
473    #[test]
474    fn test_send_safe_wrong_type() {
475        unsafe {
476            // Try to send with Int instead of Channel
477            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
478            stack = push(stack, Value::Int(999)); // Wrong type
479            stack = send_safe(stack);
480
481            // Should return failure (0)
482            let (_stack, result) = pop(stack);
483            assert_eq!(result, Value::Int(0));
484        }
485    }
486
487    #[test]
488    fn test_receive_safe_success() {
489        unsafe {
490            let mut stack = crate::stack::alloc_test_stack();
491            stack = make_channel(stack);
492            let (_, channel_value) = pop(stack);
493
494            // Send value
495            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
496            stack = push(stack, channel_value.clone());
497            let _ = send(stack);
498
499            // Receive using receive-safe
500            let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
501            stack = receive_safe(stack);
502
503            // Should return (value, 1)
504            let (stack, success) = pop(stack);
505            let (_stack, value) = pop(stack);
506            assert_eq!(success, Value::Int(1));
507            assert_eq!(value, Value::Int(42));
508        }
509    }
510
511    #[test]
512    fn test_receive_safe_wrong_type() {
513        unsafe {
514            // Try to receive with Int instead of Channel
515            let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(999));
516            stack = receive_safe(stack);
517
518            // Should return (0, 0)
519            let (stack, success) = pop(stack);
520            let (_stack, value) = pop(stack);
521            assert_eq!(success, Value::Int(0));
522            assert_eq!(value, Value::Int(0));
523        }
524    }
525
526    #[test]
527    fn test_mpmc_concurrent_receivers() {
528        // Verify that multiple receivers work with MPMC
529        unsafe {
530            const NUM_MESSAGES: i64 = 100;
531            const NUM_RECEIVERS: usize = 4;
532
533            static RECEIVER_COUNTS: [AtomicI64; 4] = [
534                AtomicI64::new(0),
535                AtomicI64::new(0),
536                AtomicI64::new(0),
537                AtomicI64::new(0),
538            ];
539            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
540
541            // Reset counters
542            for counter in &RECEIVER_COUNTS {
543                counter.store(0, Ordering::SeqCst);
544            }
545
546            // Create channel
547            let mut stack = crate::stack::alloc_test_stack();
548            stack = make_channel(stack);
549            let (_, channel_value) = pop(stack);
550
551            let ch_ptr = match &channel_value {
552                Value::Channel(arc) => Arc::as_ptr(arc) as i64,
553                _ => panic!("Expected Channel"),
554            };
555            CHANNEL_PTR.store(ch_ptr, Ordering::SeqCst);
556
557            // Keep Arc alive
558            for _ in 0..(NUM_RECEIVERS + 1) {
559                std::mem::forget(channel_value.clone());
560            }
561
562            fn make_receiver(idx: usize) -> extern "C" fn(Stack) -> Stack {
563                match idx {
564                    0 => receiver_0,
565                    1 => receiver_1,
566                    2 => receiver_2,
567                    3 => receiver_3,
568                    _ => panic!("Invalid receiver index"),
569                }
570            }
571
572            extern "C" fn receiver_0(stack: Stack) -> Stack {
573                receive_loop(0, stack)
574            }
575            extern "C" fn receiver_1(stack: Stack) -> Stack {
576                receive_loop(1, stack)
577            }
578            extern "C" fn receiver_2(stack: Stack) -> Stack {
579                receive_loop(2, stack)
580            }
581            extern "C" fn receiver_3(stack: Stack) -> Stack {
582                receive_loop(3, stack)
583            }
584
585            fn receive_loop(idx: usize, _stack: Stack) -> Stack {
586                use crate::value::ChannelData;
587                unsafe {
588                    let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
589                    let channel = Arc::from_raw(ch_ptr);
590                    let channel_clone = Arc::clone(&channel);
591                    std::mem::forget(channel);
592
593                    loop {
594                        let mut stack = push(
595                            crate::stack::alloc_test_stack(),
596                            Value::Channel(channel_clone.clone()),
597                        );
598                        stack = receive_safe(stack);
599                        let (stack, success) = pop(stack);
600                        let (_, value) = pop(stack);
601
602                        match (success, value) {
603                            (Value::Int(1), Value::Int(v)) => {
604                                if v < 0 {
605                                    break; // Sentinel
606                                }
607                                RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
608                            }
609                            _ => break,
610                        }
611                        may::coroutine::yield_now();
612                    }
613                    std::ptr::null_mut()
614                }
615            }
616
617            // Spawn receivers
618            for i in 0..NUM_RECEIVERS {
619                spawn_strand(make_receiver(i));
620            }
621
622            std::thread::sleep(std::time::Duration::from_millis(10));
623
624            // Send messages
625            for i in 0..NUM_MESSAGES {
626                let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
627                let channel = Arc::from_raw(ch_ptr);
628                let channel_clone = Arc::clone(&channel);
629                std::mem::forget(channel);
630
631                let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
632                stack = push(stack, Value::Channel(channel_clone));
633                let _ = send(stack);
634            }
635
636            // Send sentinels
637            for _ in 0..NUM_RECEIVERS {
638                let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
639                let channel = Arc::from_raw(ch_ptr);
640                let channel_clone = Arc::clone(&channel);
641                std::mem::forget(channel);
642
643                let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(-1));
644                stack = push(stack, Value::Channel(channel_clone));
645                let _ = send(stack);
646            }
647
648            wait_all_strands();
649
650            let total_received: i64 = RECEIVER_COUNTS
651                .iter()
652                .map(|c| c.load(Ordering::SeqCst))
653                .sum();
654            assert_eq!(total_received, NUM_MESSAGES);
655
656            let active_receivers = RECEIVER_COUNTS
657                .iter()
658                .filter(|c| c.load(Ordering::SeqCst) > 0)
659                .count();
660            assert!(active_receivers >= 2, "Messages should be distributed");
661        }
662    }
663}