seq_runtime/udp.rs
1//! UDP Socket Operations for Seq
2//!
3//! Provides non-blocking UDP datagram operations using May's
4//! coroutine-aware I/O. `udp.receive-from` yields the strand
5//! while waiting for a datagram instead of blocking the OS thread.
6//!
7//! These functions are exported with C ABI for LLVM codegen.
8//!
9//! ## Payloads are byte-clean
10//!
11//! Datagrams carry whatever bytes the wire delivered — no UTF-8
12//! validation. Binary protocols (DNS records, NTP packets, OSC
13//! int32 / float32 arguments, multicast TLV, MessagePack-over-UDP)
14//! round-trip through `udp.send-to` / `udp.receive-from` byte for
15//! byte. See `docs/design/STRING_BYTE_CLEANLINESS.md` for the
16//! `SeqString` design that makes this possible.
17
18use crate::stack::{Stack, pop, push};
19use crate::value::Value;
20use may::net::UdpSocket;
21use std::sync::{Arc, Mutex};
22
// Maximum number of concurrent sockets to prevent unbounded growth.
// Same cap as `tcp.rs`.
//
// The limit is checked against the length of the registry's slot table
// when a brand-new slot would have to be appended; reusing a previously
// freed id never grows the table and so is never refused.
const MAX_SOCKETS: usize = 10_000;
26
// Maximum bytes to read per datagram.
//
// The UDP length field is 16 bits and counts the 8-byte UDP header, so
// a datagram's payload is capped at 65,507 bytes over IPv4 (65,535
// minus the 20-byte IP header and the 8-byte UDP header) and at 65,527
// bytes over IPv6 without jumbograms (65,535 minus the UDP header). We
// use the next power of two (65,536) as the receive buffer size —
// anything larger cannot arrive on the wire, so allocating more would
// be pure waste.
//
// This intentionally diverges from `tcp.rs`'s 1 MB cap, which makes
// sense for streaming reads but not for one-datagram-per-call recv.
const MAX_READ_SIZE: usize = 65_536;
38
39// Socket registry with ID reuse via free list.
40//
41// Slots hold `Arc<UdpSocket>` rather than the socket directly. Reasons:
42//
43// - `may::net::UdpSocket`'s I/O methods (`send_to`, `recv_from`,
44// `local_addr`) all take `&self`, so multiple `Arc` clones across
45// strands are safe without any further synchronisation.
46//
47// - I/O paths clone the `Arc` out of the registry under the lock, then
48// drop the lock before doing the syscall. This is what the previous
49// `take()`-and-restore pattern was reaching for, but with `Arc` we
50// avoid the close-vs-in-flight race: `close` simply sets the slot to
51// `None` (and frees the id) regardless of whether other strands
52// currently hold an `Arc` clone. The in-flight strand's clone keeps
53// the OS socket alive until its `recv_from` / `send_to` returns; the
54// OS-level close only happens when the last `Arc` drops.
55//
56// - The id bookkeeping is now correct under all races: every successful
57// `close` pushes the id to `free_ids`, even if the slot was being
58// used for I/O.
59//
60// `tcp.rs` keeps the take-and-restore pattern because `TcpStream::read`
61// is `&mut self` — multiple strands cannot share a TcpStream the same
62// way. UDP's `&self`-only API is what makes the cleaner shape possible.
63struct SocketRegistry<T> {
64 sockets: Vec<Option<Arc<T>>>,
65 free_ids: Vec<usize>,
66}
67
68impl<T> SocketRegistry<T> {
69 const fn new() -> Self {
70 Self {
71 sockets: Vec::new(),
72 free_ids: Vec::new(),
73 }
74 }
75
76 fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
77 let socket = Arc::new(socket);
78 if let Some(id) = self.free_ids.pop() {
79 self.sockets[id] = Some(socket);
80 return Ok(id as i64);
81 }
82 if self.sockets.len() >= MAX_SOCKETS {
83 return Err("Maximum socket limit reached");
84 }
85 let id = self.sockets.len();
86 self.sockets.push(Some(socket));
87 Ok(id as i64)
88 }
89
90 /// Clone the `Arc` out of the slot so the caller can do I/O after
91 /// dropping the registry lock. Returns `None` if the slot is empty
92 /// (handle invalid, out of range, or already closed).
93 fn checkout(&self, id: usize) -> Option<Arc<T>> {
94 self.sockets.get(id).and_then(|slot| slot.clone())
95 }
96
97 /// Drop the slot's `Arc`. Returns whether the slot held a socket
98 /// (i.e. whether the close had any effect). Idempotent: a second
99 /// close on the same id returns `false`. Independent of any
100 /// in-flight I/O — those strands hold their own `Arc` clones.
101 fn free(&mut self, id: usize) -> bool {
102 if let Some(slot) = self.sockets.get_mut(id)
103 && slot.is_some()
104 {
105 *slot = None;
106 self.free_ids.push(id);
107 return true;
108 }
109 false
110 }
111}
112
// Process-wide table of open UDP sockets; the socket handles pushed to
// the Seq stack are indices into it. The send and receive paths clone
// the `Arc` out and release this lock before making the syscall, so
// the mutex is never held across blocking I/O.
static SOCKETS: Mutex<SocketRegistry<UdpSocket>> = Mutex::new(SocketRegistry::new());
114
115/// Bind a UDP socket to a local port.
116///
117/// Stack effect: ( port -- socket bound-port Bool )
118///
119/// Binds to `0.0.0.0:port`. `port=0` lets the OS pick a free port; the
120/// returned `bound-port` is the actual bound port (equal to `port` if
121/// non-zero). On failure pushes `(0, 0, false)`.
122///
123/// # Safety
124/// Stack must have an Int (port) on top.
125#[unsafe(no_mangle)]
126pub unsafe extern "C" fn patch_seq_udp_bind(stack: Stack) -> Stack {
127 unsafe {
128 let (stack, port_val) = pop(stack);
129 let port = match port_val {
130 Value::Int(p) => p,
131 _ => return push_bind_failure(stack),
132 };
133
134 if !(0..=65535).contains(&port) {
135 return push_bind_failure(stack);
136 }
137
138 let addr = format!("0.0.0.0:{}", port);
139 let socket = match UdpSocket::bind(&addr) {
140 Ok(s) => s,
141 Err(_) => return push_bind_failure(stack),
142 };
143
144 // Capture the actual bound port before the registry takes ownership.
145 let bound_port = match socket.local_addr() {
146 Ok(addr) => addr.port() as i64,
147 Err(_) => return push_bind_failure(stack),
148 };
149
150 let mut sockets = SOCKETS.lock().unwrap();
151 match sockets.allocate(socket) {
152 Ok(socket_id) => {
153 let stack = push(stack, Value::Int(socket_id));
154 let stack = push(stack, Value::Int(bound_port));
155 push(stack, Value::Bool(true))
156 }
157 Err(_) => push_bind_failure(stack),
158 }
159 }
160}
161
162unsafe fn push_bind_failure(stack: Stack) -> Stack {
163 unsafe {
164 let stack = push(stack, Value::Int(0));
165 let stack = push(stack, Value::Int(0));
166 push(stack, Value::Bool(false))
167 }
168}
169
170/// Send a datagram to a host:port from a bound UDP socket.
171///
172/// Stack effect: ( bytes host port socket -- Bool )
173///
174/// Pops `socket`, `port`, `host`, `bytes` (in that order, so `bytes`
175/// is below all of them on entry). Returns `false` on type mismatch,
176/// invalid socket, address-resolution failure, or send error.
177///
178/// # Safety
179/// Stack must have Int (socket), Int (port), String (host),
180/// String (bytes) — top-down — on entry.
181#[unsafe(no_mangle)]
182pub unsafe extern "C" fn patch_seq_udp_send_to(stack: Stack) -> Stack {
183 unsafe {
184 let (stack, socket_val) = pop(stack);
185 // Reject negative ids before the `as usize` cast: a negative
186 // i64 wraps to usize::MAX, which would silently fall through
187 // to a benign `None` lookup. Catching it here is a clearer
188 // signal than the indirect not-found path.
189 let socket_id = match socket_val {
190 Value::Int(id) if id >= 0 => id as usize,
191 _ => return push(stack, Value::Bool(false)),
192 };
193
194 let (stack, port_val) = pop(stack);
195 let port = match port_val {
196 Value::Int(p) if (0..=65535).contains(&p) => p,
197 _ => return push(stack, Value::Bool(false)),
198 };
199
200 let (stack, host_val) = pop(stack);
201 let host = match host_val {
202 Value::String(s) => s,
203 _ => return push(stack, Value::Bool(false)),
204 };
205
206 let (stack, bytes_val) = pop(stack);
207 let bytes = match bytes_val {
208 Value::String(s) => s,
209 _ => return push(stack, Value::Bool(false)),
210 };
211
212 // Clone the Arc<UdpSocket> out of the registry. We don't hold
213 // the lock across the syscall, and a concurrent `close` is
214 // free to drop the registry's slot reference — our clone keeps
215 // the socket alive for the duration of this send.
216 let socket = {
217 let sockets = SOCKETS.lock().unwrap();
218 match sockets.checkout(socket_id) {
219 Some(s) => s,
220 None => return push(stack, Value::Bool(false)),
221 }
222 };
223
224 let addr = format!("{}:{}", host.as_str_or_empty(), port);
225 let result = socket.send_to(bytes.as_bytes(), &addr);
226 push(stack, Value::Bool(result.is_ok()))
227 }
228}
229
230/// Receive one datagram from a UDP socket.
231///
232/// Stack effect: ( socket -- bytes host port Bool )
233///
234/// Yields the strand until a datagram arrives. On failure pushes
235/// `("", "", 0, false)` — invalid socket, recv error, datagram larger
236/// than `MAX_READ_SIZE`, or non-UTF-8 payload (see module doc).
237///
238/// # Safety
239/// Stack must have an Int (socket) on top.
240#[unsafe(no_mangle)]
241pub unsafe extern "C" fn patch_seq_udp_receive_from(stack: Stack) -> Stack {
242 unsafe {
243 let (stack, socket_val) = pop(stack);
244 let socket_id = match socket_val {
245 Value::Int(id) if id >= 0 => id as usize,
246 _ => return push_receive_failure(stack),
247 };
248
249 // Clone the Arc<UdpSocket> out of the registry. The receive
250 // strand keeps the socket alive even if another strand closes
251 // the handle while we're in `recv_from`. When close drops the
252 // registry's clone and ours returns, the OS-level close fires.
253 let socket = {
254 let sockets = SOCKETS.lock().unwrap();
255 match sockets.checkout(socket_id) {
256 Some(s) => s,
257 None => return push_receive_failure(stack),
258 }
259 };
260
261 let mut buffer = vec![0u8; MAX_READ_SIZE];
262 let recv_result = socket.recv_from(&mut buffer);
263
264 let (size, src) = match recv_result {
265 Ok(pair) => pair,
266 Err(_) => return push_receive_failure(stack),
267 };
268
269 buffer.truncate(size);
270 // The payload is whatever bytes the wire delivered. We no longer
271 // require UTF-8 — datagrams for OSC, DNS, NTP, MessagePack, etc.
272 // routinely include high-bit bytes from int32 / float32 / blob
273 // fields. The bytes go into a byte-clean SeqString unchanged.
274 let stack = push(stack, Value::String(crate::seqstring::global_bytes(buffer)));
275 let stack = push(stack, Value::String(src.ip().to_string().into()));
276 let stack = push(stack, Value::Int(src.port() as i64));
277 push(stack, Value::Bool(true))
278 }
279}
280
281unsafe fn push_receive_failure(stack: Stack) -> Stack {
282 unsafe {
283 let stack = push(stack, Value::String("".into()));
284 let stack = push(stack, Value::String("".into()));
285 let stack = push(stack, Value::Int(0));
286 push(stack, Value::Bool(false))
287 }
288}
289
290/// Close a UDP socket and free its handle.
291///
292/// Stack effect: ( socket -- Bool )
293///
294/// Returns `true` if the handle was open (the registry slot held a
295/// socket), `false` if it was already invalid (never allocated, or
296/// previously closed). Idempotent across redundant calls on the same
297/// id.
298///
299/// Concurrent I/O is safe: any strand mid-`send_to` / `recv_from`
300/// holds its own `Arc<UdpSocket>` clone, so closing the registry slot
301/// from another strand only drops the registry's reference. The
302/// in-flight syscall completes; the OS-level close fires when the
303/// last `Arc` is dropped. The id is recycled to the free list as
304/// soon as `close` returns, regardless of any in-flight strand.
305///
306/// # Safety
307/// Stack must have an Int (socket) on top.
308#[unsafe(no_mangle)]
309pub unsafe extern "C" fn patch_seq_udp_close(stack: Stack) -> Stack {
310 unsafe {
311 let (stack, socket_val) = pop(stack);
312 let socket_id = match socket_val {
313 Value::Int(id) if id >= 0 => id as usize,
314 _ => return push(stack, Value::Bool(false)),
315 };
316
317 let mut sockets = SOCKETS.lock().unwrap();
318 let existed = sockets.free(socket_id);
319 push(stack, Value::Bool(existed))
320 }
321}
322
323// Public re-exports with short names for in-module callers — the
324// `tests` submodule below imports them via `use super::*`. The
325// crate-root re-exports in `lib.rs` are the linker-facing aliases.
326pub use patch_seq_udp_bind as udp_bind;
327pub use patch_seq_udp_close as udp_close;
328pub use patch_seq_udp_receive_from as udp_receive_from;
329pub use patch_seq_udp_send_to as udp_send_to;
330
331#[cfg(test)]
332mod tests;