// seq_runtime/udp.rs

1//! UDP Socket Operations for Seq
2//!
3//! Provides non-blocking UDP datagram operations using May's
4//! coroutine-aware I/O. `udp.receive-from` yields the strand
5//! while waiting for a datagram instead of blocking the OS thread.
6//!
7//! These functions are exported with C ABI for LLVM codegen.
8//!
9//! ## Payloads are byte-clean
10//!
11//! Datagrams carry whatever bytes the wire delivered — no UTF-8
12//! validation. Binary protocols (DNS records, NTP packets, OSC
13//! int32 / float32 arguments, multicast TLV, MessagePack-over-UDP)
14//! round-trip through `udp.send-to` / `udp.receive-from` byte for
15//! byte. See `docs/design/STRING_BYTE_CLEANLINESS.md` for the
16//! `SeqString` design that makes this possible.
17
18use crate::stack::{Stack, pop, push};
19use crate::value::Value;
20use may::net::UdpSocket;
21use std::sync::{Arc, Mutex};
22
// Maximum number of concurrent sockets to prevent unbounded growth.
// Same cap as `tcp.rs`.
const MAX_SOCKETS: usize = 10_000;

// Maximum bytes to read per datagram.
//
// The UDP length field is 16 bits and counts the 8-byte UDP header, so
// the largest payload that can arrive is 65,507 bytes over IPv4
// (65,535 minus the 20-byte IP header and the 8-byte UDP header) and
// 65,527 bytes over IPv6 without jumbograms (the IPv6 payload length is
// also 16 bits, but the fixed IPv6 header is not counted in it). We use
// the next power of two (65,536) as the receive buffer size — anything
// larger cannot arrive on the wire, so allocating more would be pure
// waste.
//
// This intentionally diverges from `tcp.rs`'s 1 MB cap, which makes
// sense for streaming reads but not for one-datagram-per-call recv.
const MAX_READ_SIZE: usize = 65_536;
38
39// Socket registry with ID reuse via free list.
40//
41// Slots hold `Arc<UdpSocket>` rather than the socket directly. Reasons:
42//
43// - `may::net::UdpSocket`'s I/O methods (`send_to`, `recv_from`,
44//   `local_addr`) all take `&self`, so multiple `Arc` clones across
45//   strands are safe without any further synchronisation.
46//
47// - I/O paths clone the `Arc` out of the registry under the lock, then
48//   drop the lock before doing the syscall. This is what the previous
49//   `take()`-and-restore pattern was reaching for, but with `Arc` we
50//   avoid the close-vs-in-flight race: `close` simply sets the slot to
51//   `None` (and frees the id) regardless of whether other strands
52//   currently hold an `Arc` clone. The in-flight strand's clone keeps
53//   the OS socket alive until its `recv_from` / `send_to` returns; the
54//   OS-level close only happens when the last `Arc` drops.
55//
56// - The id bookkeeping is now correct under all races: every successful
57//   `close` pushes the id to `free_ids`, even if the slot was being
58//   used for I/O.
59//
60// `tcp.rs` keeps the take-and-restore pattern because `TcpStream::read`
61// is `&mut self` — multiple strands cannot share a TcpStream the same
62// way. UDP's `&self`-only API is what makes the cleaner shape possible.
63struct SocketRegistry<T> {
64    sockets: Vec<Option<Arc<T>>>,
65    free_ids: Vec<usize>,
66}
67
68impl<T> SocketRegistry<T> {
69    const fn new() -> Self {
70        Self {
71            sockets: Vec::new(),
72            free_ids: Vec::new(),
73        }
74    }
75
76    fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
77        let socket = Arc::new(socket);
78        if let Some(id) = self.free_ids.pop() {
79            self.sockets[id] = Some(socket);
80            return Ok(id as i64);
81        }
82        if self.sockets.len() >= MAX_SOCKETS {
83            return Err("Maximum socket limit reached");
84        }
85        let id = self.sockets.len();
86        self.sockets.push(Some(socket));
87        Ok(id as i64)
88    }
89
90    /// Clone the `Arc` out of the slot so the caller can do I/O after
91    /// dropping the registry lock. Returns `None` if the slot is empty
92    /// (handle invalid, out of range, or already closed).
93    fn checkout(&self, id: usize) -> Option<Arc<T>> {
94        self.sockets.get(id).and_then(|slot| slot.clone())
95    }
96
97    /// Drop the slot's `Arc`. Returns whether the slot held a socket
98    /// (i.e. whether the close had any effect). Idempotent: a second
99    /// close on the same id returns `false`. Independent of any
100    /// in-flight I/O — those strands hold their own `Arc` clones.
101    fn free(&mut self, id: usize) -> bool {
102        if let Some(slot) = self.sockets.get_mut(id)
103            && slot.is_some()
104        {
105            *slot = None;
106            self.free_ids.push(id);
107            return true;
108        }
109        false
110    }
111}
112
113static SOCKETS: Mutex<SocketRegistry<UdpSocket>> = Mutex::new(SocketRegistry::new());
114
115/// Bind a UDP socket to a local port.
116///
117/// Stack effect: ( port -- socket bound-port Bool )
118///
119/// Binds to `0.0.0.0:port`. `port=0` lets the OS pick a free port; the
120/// returned `bound-port` is the actual bound port (equal to `port` if
121/// non-zero). On failure pushes `(0, 0, false)`.
122///
123/// # Safety
124/// Stack must have an Int (port) on top.
125#[unsafe(no_mangle)]
126pub unsafe extern "C" fn patch_seq_udp_bind(stack: Stack) -> Stack {
127    unsafe {
128        let (stack, port_val) = pop(stack);
129        let port = match port_val {
130            Value::Int(p) => p,
131            _ => return push_bind_failure(stack),
132        };
133
134        if !(0..=65535).contains(&port) {
135            return push_bind_failure(stack);
136        }
137
138        let addr = format!("0.0.0.0:{}", port);
139        let socket = match UdpSocket::bind(&addr) {
140            Ok(s) => s,
141            Err(_) => return push_bind_failure(stack),
142        };
143
144        // Capture the actual bound port before the registry takes ownership.
145        let bound_port = match socket.local_addr() {
146            Ok(addr) => addr.port() as i64,
147            Err(_) => return push_bind_failure(stack),
148        };
149
150        let mut sockets = SOCKETS.lock().unwrap();
151        match sockets.allocate(socket) {
152            Ok(socket_id) => {
153                let stack = push(stack, Value::Int(socket_id));
154                let stack = push(stack, Value::Int(bound_port));
155                push(stack, Value::Bool(true))
156            }
157            Err(_) => push_bind_failure(stack),
158        }
159    }
160}
161
162unsafe fn push_bind_failure(stack: Stack) -> Stack {
163    unsafe {
164        let stack = push(stack, Value::Int(0));
165        let stack = push(stack, Value::Int(0));
166        push(stack, Value::Bool(false))
167    }
168}
169
170/// Send a datagram to a host:port from a bound UDP socket.
171///
172/// Stack effect: ( bytes host port socket -- Bool )
173///
174/// Pops `socket`, `port`, `host`, `bytes` (in that order, so `bytes`
175/// is below all of them on entry). Returns `false` on type mismatch,
176/// invalid socket, address-resolution failure, or send error.
177///
178/// # Safety
179/// Stack must have Int (socket), Int (port), String (host),
180/// String (bytes) — top-down — on entry.
181#[unsafe(no_mangle)]
182pub unsafe extern "C" fn patch_seq_udp_send_to(stack: Stack) -> Stack {
183    unsafe {
184        let (stack, socket_val) = pop(stack);
185        // Reject negative ids before the `as usize` cast: a negative
186        // i64 wraps to usize::MAX, which would silently fall through
187        // to a benign `None` lookup. Catching it here is a clearer
188        // signal than the indirect not-found path.
189        let socket_id = match socket_val {
190            Value::Int(id) if id >= 0 => id as usize,
191            _ => return push(stack, Value::Bool(false)),
192        };
193
194        let (stack, port_val) = pop(stack);
195        let port = match port_val {
196            Value::Int(p) if (0..=65535).contains(&p) => p,
197            _ => return push(stack, Value::Bool(false)),
198        };
199
200        let (stack, host_val) = pop(stack);
201        let host = match host_val {
202            Value::String(s) => s,
203            _ => return push(stack, Value::Bool(false)),
204        };
205
206        let (stack, bytes_val) = pop(stack);
207        let bytes = match bytes_val {
208            Value::String(s) => s,
209            _ => return push(stack, Value::Bool(false)),
210        };
211
212        // Clone the Arc<UdpSocket> out of the registry. We don't hold
213        // the lock across the syscall, and a concurrent `close` is
214        // free to drop the registry's slot reference — our clone keeps
215        // the socket alive for the duration of this send.
216        let socket = {
217            let sockets = SOCKETS.lock().unwrap();
218            match sockets.checkout(socket_id) {
219                Some(s) => s,
220                None => return push(stack, Value::Bool(false)),
221            }
222        };
223
224        let addr = format!("{}:{}", host.as_str_or_empty(), port);
225        let result = socket.send_to(bytes.as_bytes(), &addr);
226        push(stack, Value::Bool(result.is_ok()))
227    }
228}
229
230/// Receive one datagram from a UDP socket.
231///
232/// Stack effect: ( socket -- bytes host port Bool )
233///
234/// Yields the strand until a datagram arrives. On failure pushes
235/// `("", "", 0, false)` — invalid socket, recv error, datagram larger
236/// than `MAX_READ_SIZE`, or non-UTF-8 payload (see module doc).
237///
238/// # Safety
239/// Stack must have an Int (socket) on top.
240#[unsafe(no_mangle)]
241pub unsafe extern "C" fn patch_seq_udp_receive_from(stack: Stack) -> Stack {
242    unsafe {
243        let (stack, socket_val) = pop(stack);
244        let socket_id = match socket_val {
245            Value::Int(id) if id >= 0 => id as usize,
246            _ => return push_receive_failure(stack),
247        };
248
249        // Clone the Arc<UdpSocket> out of the registry. The receive
250        // strand keeps the socket alive even if another strand closes
251        // the handle while we're in `recv_from`. When close drops the
252        // registry's clone and ours returns, the OS-level close fires.
253        let socket = {
254            let sockets = SOCKETS.lock().unwrap();
255            match sockets.checkout(socket_id) {
256                Some(s) => s,
257                None => return push_receive_failure(stack),
258            }
259        };
260
261        let mut buffer = vec![0u8; MAX_READ_SIZE];
262        let recv_result = socket.recv_from(&mut buffer);
263
264        let (size, src) = match recv_result {
265            Ok(pair) => pair,
266            Err(_) => return push_receive_failure(stack),
267        };
268
269        buffer.truncate(size);
270        // The payload is whatever bytes the wire delivered. We no longer
271        // require UTF-8 — datagrams for OSC, DNS, NTP, MessagePack, etc.
272        // routinely include high-bit bytes from int32 / float32 / blob
273        // fields. The bytes go into a byte-clean SeqString unchanged.
274        let stack = push(stack, Value::String(crate::seqstring::global_bytes(buffer)));
275        let stack = push(stack, Value::String(src.ip().to_string().into()));
276        let stack = push(stack, Value::Int(src.port() as i64));
277        push(stack, Value::Bool(true))
278    }
279}
280
281unsafe fn push_receive_failure(stack: Stack) -> Stack {
282    unsafe {
283        let stack = push(stack, Value::String("".into()));
284        let stack = push(stack, Value::String("".into()));
285        let stack = push(stack, Value::Int(0));
286        push(stack, Value::Bool(false))
287    }
288}
289
290/// Close a UDP socket and free its handle.
291///
292/// Stack effect: ( socket -- Bool )
293///
294/// Returns `true` if the handle was open (the registry slot held a
295/// socket), `false` if it was already invalid (never allocated, or
296/// previously closed). Idempotent across redundant calls on the same
297/// id.
298///
299/// Concurrent I/O is safe: any strand mid-`send_to` / `recv_from`
300/// holds its own `Arc<UdpSocket>` clone, so closing the registry slot
301/// from another strand only drops the registry's reference. The
302/// in-flight syscall completes; the OS-level close fires when the
303/// last `Arc` is dropped. The id is recycled to the free list as
304/// soon as `close` returns, regardless of any in-flight strand.
305///
306/// # Safety
307/// Stack must have an Int (socket) on top.
308#[unsafe(no_mangle)]
309pub unsafe extern "C" fn patch_seq_udp_close(stack: Stack) -> Stack {
310    unsafe {
311        let (stack, socket_val) = pop(stack);
312        let socket_id = match socket_val {
313            Value::Int(id) if id >= 0 => id as usize,
314            _ => return push(stack, Value::Bool(false)),
315        };
316
317        let mut sockets = SOCKETS.lock().unwrap();
318        let existed = sockets.free(socket_id);
319        push(stack, Value::Bool(existed))
320    }
321}
322
323// Public re-exports with short names for in-module callers — the
324// `tests` submodule below imports them via `use super::*`. The
325// crate-root re-exports in `lib.rs` are the linker-facing aliases.
326pub use patch_seq_udp_bind as udp_bind;
327pub use patch_seq_udp_close as udp_close;
328pub use patch_seq_udp_receive_from as udp_receive_from;
329pub use patch_seq_udp_send_to as udp_send_to;
330
331#[cfg(test)]
332mod tests;