Skip to main content

seq_runtime/
channel.rs

1//! Channel operations for CSP-style concurrency
2//!
3//! Channels are the primary communication mechanism between strands.
4//! They use May's MPMC channels with cooperative blocking.
5//!
6//! ## Zero-Mutex Design
7//!
8//! Channels are passed directly as `Value::Channel` on the stack. There is NO
9//! global registry and NO mutex contention. Send/receive operations work directly
10//! on the channel handles with zero locking overhead.
11//!
12//! ## Non-Blocking Guarantee
13//!
14//! All channel operations (`send`, `receive`) cooperatively block using May's scheduler.
15//! They NEVER block OS threads - May handles scheduling other strands while waiting.
16//!
17//! ## Multi-Consumer Support
18//!
19//! Channels support multiple producers AND multiple consumers (MPMC). Multiple strands
20//! can receive from the same channel concurrently - each message is delivered to exactly
21//! one receiver (work-stealing semantics).
22//!
23//! ## Stack Effects
24//!
25//! - `chan.make`: ( -- Channel ) - creates a new channel
26//! - `chan.send`: ( value Channel -- Bool ) - sends value, returns success
27//! - `chan.receive`: ( Channel -- value Bool ) - receives value and success flag
28//!
29//! ## Error Handling
30//!
31//! All operations return success flags - errors are values, not crashes:
32//!
33//! - `chan.send`: ( value Channel -- Bool ) - returns true on success, false on closed
34//! - `chan.receive`: ( Channel -- value Bool ) - returns value and success flag
35
36use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41#[cfg(feature = "diagnostics")]
42use std::sync::atomic::{AtomicU64, Ordering};
43
44#[cfg(feature = "diagnostics")]
45pub static TOTAL_MESSAGES_SENT: AtomicU64 = AtomicU64::new(0);
46#[cfg(feature = "diagnostics")]
47pub static TOTAL_MESSAGES_RECEIVED: AtomicU64 = AtomicU64::new(0);
48
49/// Create a new channel
50///
51/// Stack effect: ( -- Channel )
52///
53/// Returns a Channel value that can be used with send/receive operations.
54/// The channel can be duplicated (dup) to share between strands.
55///
56/// # Safety
57/// Always safe to call
58#[unsafe(no_mangle)]
59pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
60    // Create an unbounded MPMC channel
61    // May's mpmc::channel() creates coroutine-aware channels with multi-producer, multi-consumer
62    // The recv() operation cooperatively blocks (yields) instead of blocking the OS thread
63    let (sender, receiver) = mpmc::channel();
64
65    // Wrap in Arc<ChannelData> and push directly - NO registry, NO mutex
66    let channel = Arc::new(ChannelData { sender, receiver });
67
68    unsafe { push(stack, Value::Channel(channel)) }
69}
70
71/// Close a channel (drop it from the stack)
72///
73/// Stack effect: ( Channel -- )
74///
75/// Simply drops the channel. When all references are dropped, the channel is closed.
76/// This is provided for API compatibility but is equivalent to `drop`.
77///
78/// # Safety
79/// Stack must have a Channel on top
80#[unsafe(no_mangle)]
81pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
82    assert!(!stack.is_null(), "close_channel: stack is empty");
83
84    // Pop and drop the channel
85    let (rest, channel_value) = unsafe { pop(stack) };
86    match channel_value {
87        Value::Channel(_) => {} // Drop occurs here
88        _ => panic!(
89            "close_channel: expected Channel on stack, got {:?}",
90            channel_value
91        ),
92    }
93
94    rest
95}
96
97/// Send a value through a channel
98///
99/// Stack effect: ( value Channel -- Bool )
100///
101/// Returns true on success, false on failure (closed channel).
102/// Errors are values, not crashes.
103///
104/// # Safety
105/// Stack must have a Channel on top and a value below it
106#[unsafe(no_mangle)]
107pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
108    assert!(!stack.is_null(), "chan.send: stack is empty");
109
110    // Pop channel
111    let (stack, channel_value) = unsafe { pop(stack) };
112    let channel = match channel_value {
113        Value::Channel(ch) => ch,
114        _ => {
115            // Wrong type - consume value and return failure
116            if !stack.is_null() {
117                let (rest, _value) = unsafe { pop(stack) };
118                return unsafe { push(rest, Value::Bool(false)) };
119            }
120            return unsafe { push(stack, Value::Bool(false)) };
121        }
122    };
123
124    if stack.is_null() {
125        // No value to send - return failure
126        return unsafe { push(stack, Value::Bool(false)) };
127    }
128
129    // Pop value to send
130    let (rest, value) = unsafe { pop(stack) };
131
132    // Clone the value before sending
133    let global_value = value.clone();
134
135    // Send the value
136    match channel.sender.send(global_value) {
137        Ok(()) => {
138            #[cfg(feature = "diagnostics")]
139            TOTAL_MESSAGES_SENT.fetch_add(1, Ordering::Relaxed);
140            unsafe { push(rest, Value::Bool(true)) }
141        }
142        Err(_) => unsafe { push(rest, Value::Bool(false)) },
143    }
144}
145
146/// Receive a value from a channel
147///
148/// Stack effect: ( Channel -- value Bool )
149///
150/// Returns (value, true) on success, (0, false) on failure (closed channel).
151/// Errors are values, not crashes.
152///
153/// ## Multi-Consumer Support
154///
155/// Multiple strands can receive from the same channel concurrently (MPMC).
156/// Each message is delivered to exactly one receiver (work-stealing semantics).
157///
158/// # Safety
159/// Stack must have a Channel on top
160#[unsafe(no_mangle)]
161pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
162    assert!(!stack.is_null(), "chan.receive: stack is empty");
163
164    // Pop channel
165    let (rest, channel_value) = unsafe { pop(stack) };
166    let channel = match channel_value {
167        Value::Channel(ch) => ch,
168        _ => {
169            // Wrong type - return failure
170            let stack = unsafe { push(rest, Value::Int(0)) };
171            return unsafe { push(stack, Value::Bool(false)) };
172        }
173    };
174
175    // Receive a value
176    match channel.receiver.recv() {
177        Ok(value) => {
178            #[cfg(feature = "diagnostics")]
179            TOTAL_MESSAGES_RECEIVED.fetch_add(1, Ordering::Relaxed);
180            let stack = unsafe { push(rest, value) };
181            unsafe { push(stack, Value::Bool(true)) }
182        }
183        Err(_) => {
184            let stack = unsafe { push(rest, Value::Int(0)) };
185            unsafe { push(stack, Value::Bool(false)) }
186        }
187    }
188}
189
190// Public re-exports with short names for internal use
191pub use patch_seq_chan_receive as receive;
192pub use patch_seq_chan_send as send;
193pub use patch_seq_close_channel as close_channel;
194pub use patch_seq_make_channel as make_channel;
195
196#[cfg(test)]
197mod tests;