seq_runtime/channel.rs
1//! Channel operations for CSP-style concurrency
2//!
3//! Channels are the primary communication mechanism between strands.
4//! They use May's MPMC channels with cooperative blocking.
5//!
6//! ## Zero-Mutex Design
7//!
8//! Channels are passed directly as `Value::Channel` on the stack. There is NO
9//! global registry and NO mutex contention. Send/receive operations work directly
10//! on the channel handles with zero locking overhead.
11//!
12//! ## Non-Blocking Guarantee
13//!
14//! All channel operations (`send`, `receive`) cooperatively block using May's scheduler.
15//! They NEVER block OS threads - May handles scheduling other strands while waiting.
16//!
17//! ## Multi-Consumer Support
18//!
19//! Channels support multiple producers AND multiple consumers (MPMC). Multiple strands
20//! can receive from the same channel concurrently - each message is delivered to exactly
21//! one receiver (work-stealing semantics).
22//!
23//! ## Stack Effects
24//!
25//! - `chan.make`: ( -- Channel ) - creates a new channel
26//! - `chan.send`: ( value Channel -- Bool ) - sends value, returns success
27//! - `chan.receive`: ( Channel -- value Bool ) - receives value and success flag
28//!
29//! ## Error Handling
30//!
31//! All operations return success flags - errors are values, not crashes:
32//!
33//! - `chan.send`: ( value Channel -- Bool ) - returns true on success, false on closed
34//! - `chan.receive`: ( Channel -- value Bool ) - returns value and success flag
35
36use crate::stack::{Stack, pop, push};
37use crate::value::{ChannelData, Value};
38use may::sync::mpmc;
39use std::sync::Arc;
40
41#[cfg(feature = "diagnostics")]
42use std::sync::atomic::{AtomicU64, Ordering};
43
/// Running count of messages that `patch_seq_chan_send` handed to a channel
/// successfully (it is bumped only on the `Ok` path of the send).
/// Compiled in only when the `diagnostics` feature is enabled.
44#[cfg(feature = "diagnostics")]
45pub static TOTAL_MESSAGES_SENT: AtomicU64 = AtomicU64::new(0);
/// Running count of messages that `patch_seq_chan_receive` pulled out of a
/// channel successfully (it is bumped only on the `Ok` path of the receive).
/// Compiled in only when the `diagnostics` feature is enabled.
46#[cfg(feature = "diagnostics")]
47pub static TOTAL_MESSAGES_RECEIVED: AtomicU64 = AtomicU64::new(0);
48
49/// Create a new channel
50///
51/// Stack effect: ( -- Channel )
52///
53/// Returns a Channel value that can be used with send/receive operations.
54/// The channel can be duplicated (dup) to share between strands.
55///
56/// # Safety
57/// Always safe to call
58#[unsafe(no_mangle)]
59pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
60 // Create an unbounded MPMC channel
61 // May's mpmc::channel() creates coroutine-aware channels with multi-producer, multi-consumer
62 // The recv() operation cooperatively blocks (yields) instead of blocking the OS thread
63 let (sender, receiver) = mpmc::channel();
64
65 // Wrap in Arc<ChannelData> and push directly - NO registry, NO mutex
66 let channel = Arc::new(ChannelData { sender, receiver });
67
68 unsafe { push(stack, Value::Channel(channel)) }
69}
70
71/// Close a channel (drop it from the stack)
72///
73/// Stack effect: ( Channel -- )
74///
75/// Simply drops the channel. When all references are dropped, the channel is closed.
76/// This is provided for API compatibility but is equivalent to `drop`.
77///
78/// # Safety
79/// Stack must have a Channel on top
80#[unsafe(no_mangle)]
81pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
82 assert!(!stack.is_null(), "close_channel: stack is empty");
83
84 // Pop and drop the channel
85 let (rest, channel_value) = unsafe { pop(stack) };
86 match channel_value {
87 Value::Channel(_) => {} // Drop occurs here
88 _ => panic!(
89 "close_channel: expected Channel on stack, got {:?}",
90 channel_value
91 ),
92 }
93
94 rest
95}
96
97/// Send a value through a channel
98///
99/// Stack effect: ( value Channel -- Bool )
100///
101/// Returns true on success, false on failure (closed channel).
102/// Errors are values, not crashes.
103///
104/// # Safety
105/// Stack must have a Channel on top and a value below it
106#[unsafe(no_mangle)]
107pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
108 assert!(!stack.is_null(), "chan.send: stack is empty");
109
110 // Pop channel
111 let (stack, channel_value) = unsafe { pop(stack) };
112 let channel = match channel_value {
113 Value::Channel(ch) => ch,
114 _ => {
115 // Wrong type - consume value and return failure
116 if !stack.is_null() {
117 let (rest, _value) = unsafe { pop(stack) };
118 return unsafe { push(rest, Value::Bool(false)) };
119 }
120 return unsafe { push(stack, Value::Bool(false)) };
121 }
122 };
123
124 if stack.is_null() {
125 // No value to send - return failure
126 return unsafe { push(stack, Value::Bool(false)) };
127 }
128
129 // Pop value to send
130 let (rest, value) = unsafe { pop(stack) };
131
132 // Clone the value before sending
133 let global_value = value.clone();
134
135 // Send the value
136 match channel.sender.send(global_value) {
137 Ok(()) => {
138 #[cfg(feature = "diagnostics")]
139 TOTAL_MESSAGES_SENT.fetch_add(1, Ordering::Relaxed);
140 unsafe { push(rest, Value::Bool(true)) }
141 }
142 Err(_) => unsafe { push(rest, Value::Bool(false)) },
143 }
144}
145
146/// Receive a value from a channel
147///
148/// Stack effect: ( Channel -- value Bool )
149///
150/// Returns (value, true) on success, (0, false) on failure (closed channel).
151/// Errors are values, not crashes.
152///
153/// ## Multi-Consumer Support
154///
155/// Multiple strands can receive from the same channel concurrently (MPMC).
156/// Each message is delivered to exactly one receiver (work-stealing semantics).
157///
158/// # Safety
159/// Stack must have a Channel on top
160#[unsafe(no_mangle)]
161pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
162 assert!(!stack.is_null(), "chan.receive: stack is empty");
163
164 // Pop channel
165 let (rest, channel_value) = unsafe { pop(stack) };
166 let channel = match channel_value {
167 Value::Channel(ch) => ch,
168 _ => {
169 // Wrong type - return failure
170 let stack = unsafe { push(rest, Value::Int(0)) };
171 return unsafe { push(stack, Value::Bool(false)) };
172 }
173 };
174
175 // Receive a value
176 match channel.receiver.recv() {
177 Ok(value) => {
178 #[cfg(feature = "diagnostics")]
179 TOTAL_MESSAGES_RECEIVED.fetch_add(1, Ordering::Relaxed);
180 let stack = unsafe { push(rest, value) };
181 unsafe { push(stack, Value::Bool(true)) }
182 }
183 Err(_) => {
184 let stack = unsafe { push(rest, Value::Int(0)) };
185 unsafe { push(stack, Value::Bool(false)) }
186 }
187 }
188}
189
190// Public re-exports with short names for internal use
191pub use patch_seq_chan_receive as receive;
192pub use patch_seq_chan_send as send;
193pub use patch_seq_close_channel as close_channel;
194pub use patch_seq_make_channel as make_channel;
195
196#[cfg(test)]
197mod tests;