seq_runtime/
tcp.rs

1//! TCP Socket Operations for Seq
2//!
3//! Provides non-blocking TCP socket operations using May's coroutine-aware I/O.
4//! All operations yield the strand instead of blocking the OS thread.
5//!
6//! These functions are exported with C ABI for LLVM codegen.
7
8use crate::stack::{Stack, pop, push};
9use crate::value::Value;
10use may::net::{TcpListener, TcpStream};
11use std::io::{Read, Write};
12use std::sync::Mutex;
13
// Cap on the number of slots a socket registry may grow to, so that a flood
// of connections cannot make it expand without bound.
const MAX_SOCKETS: usize = 10_000;

// Ceiling, in bytes, on the data a read may accumulate (1 MiB); guards
// against memory exhaustion attacks.
const MAX_READ_SIZE: usize = 1024 * 1024;
19
20// Socket registry with ID reuse via free list
21struct SocketRegistry<T> {
22    sockets: Vec<Option<T>>,
23    free_ids: Vec<usize>,
24}
25
26impl<T> SocketRegistry<T> {
27    const fn new() -> Self {
28        Self {
29            sockets: Vec::new(),
30            free_ids: Vec::new(),
31        }
32    }
33
34    fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
35        // Try to reuse a free ID first
36        if let Some(id) = self.free_ids.pop() {
37            self.sockets[id] = Some(socket);
38            return Ok(id as i64);
39        }
40
41        // Check max connections limit
42        if self.sockets.len() >= MAX_SOCKETS {
43            return Err("Maximum socket limit reached");
44        }
45
46        // Allocate new ID
47        let id = self.sockets.len();
48        self.sockets.push(Some(socket));
49        Ok(id as i64)
50    }
51
52    fn get_mut(&mut self, id: usize) -> Option<&mut Option<T>> {
53        self.sockets.get_mut(id)
54    }
55
56    fn free(&mut self, id: usize) {
57        if let Some(slot) = self.sockets.get_mut(id)
58            && slot.is_some()
59        {
60            *slot = None;
61            self.free_ids.push(id);
62        }
63    }
64}
65
66// Global registry for TCP listeners and streams
67static LISTENERS: Mutex<SocketRegistry<TcpListener>> = Mutex::new(SocketRegistry::new());
68static STREAMS: Mutex<SocketRegistry<TcpStream>> = Mutex::new(SocketRegistry::new());
69
70/// TCP listen on a port
71///
72/// Stack effect: ( port -- listener_id )
73///
74/// Binds to 0.0.0.0:port and returns a listener ID
75///
76/// # Safety
77/// Stack must have an Int (port number) on top
78#[unsafe(no_mangle)]
79pub unsafe extern "C" fn patch_seq_tcp_listen(stack: Stack) -> Stack {
80    unsafe {
81        let (stack, port_val) = pop(stack);
82        let port = match port_val {
83            Value::Int(p) => p,
84            _ => panic!(
85                "tcp_listen: expected Int (port) on stack, got {:?}",
86                port_val
87            ),
88        };
89
90        // Validate port range (1-65535, or 0 for OS-assigned)
91        if !(0..=65535).contains(&port) {
92            panic!("tcp_listen: invalid port {}, must be 0-65535", port);
93        }
94
95        // Bind to the port (non-blocking via May)
96        let addr = format!("0.0.0.0:{}", port);
97        let listener = TcpListener::bind(&addr)
98            .unwrap_or_else(|e| panic!("tcp_listen: failed to bind to {}: {}", addr, e));
99
100        // Store listener and get ID
101        let mut listeners = LISTENERS.lock().unwrap();
102        let listener_id = listeners
103            .allocate(listener)
104            .unwrap_or_else(|e| panic!("tcp_listen: {}", e));
105
106        push(stack, Value::Int(listener_id))
107    }
108}
109
110/// TCP accept a connection
111///
112/// Stack effect: ( listener_id -- client_id )
113///
114/// Accepts a connection (yields the strand until one arrives)
115///
116/// # Safety
117/// Stack must have an Int (listener_id) on top
118#[unsafe(no_mangle)]
119pub unsafe extern "C" fn patch_seq_tcp_accept(stack: Stack) -> Stack {
120    unsafe {
121        let (stack, listener_id_val) = pop(stack);
122        let listener_id = match listener_id_val {
123            Value::Int(id) => id as usize,
124            _ => panic!(
125                "tcp_accept: expected Int (listener_id), got {:?}",
126                listener_id_val
127            ),
128        };
129
130        // Take the listener out temporarily (so we don't hold lock during accept)
131        let listener = {
132            let mut listeners = LISTENERS.lock().unwrap();
133            listeners
134                .get_mut(listener_id)
135                .and_then(|opt| opt.take())
136                .unwrap_or_else(|| panic!("tcp_accept: invalid listener_id {}", listener_id))
137        };
138        // Lock released
139
140        // Accept connection (this yields the strand, doesn't block OS thread)
141        let (stream, _addr) = listener
142            .accept()
143            .unwrap_or_else(|e| panic!("tcp_accept: failed to accept connection: {}", e));
144
145        // Put the listener back
146        {
147            let mut listeners = LISTENERS.lock().unwrap();
148            if let Some(slot) = listeners.get_mut(listener_id) {
149                *slot = Some(listener);
150            }
151        }
152
153        // Store stream and get ID
154        let mut streams = STREAMS.lock().unwrap();
155        let client_id = streams
156            .allocate(stream)
157            .unwrap_or_else(|e| panic!("tcp_accept: {}", e));
158
159        push(stack, Value::Int(client_id))
160    }
161}
162
163/// TCP read from a socket
164///
165/// Stack effect: ( socket_id -- string )
166///
167/// Reads all available data from the socket
168///
169/// # Safety
170/// Stack must have an Int (socket_id) on top
171#[unsafe(no_mangle)]
172pub unsafe extern "C" fn patch_seq_tcp_read(stack: Stack) -> Stack {
173    unsafe {
174        let (stack, socket_id_val) = pop(stack);
175        let socket_id = match socket_id_val {
176            Value::Int(id) => id as usize,
177            _ => panic!(
178                "tcp_read: expected Int (socket_id), got {:?}",
179                socket_id_val
180            ),
181        };
182
183        // Take the stream out of the registry (so we don't hold the lock during I/O)
184        let mut stream = {
185            let mut streams = STREAMS.lock().unwrap();
186            streams
187                .get_mut(socket_id)
188                .and_then(|opt| opt.take())
189                .unwrap_or_else(|| panic!("tcp_read: invalid socket_id {}", socket_id))
190        };
191        // Registry lock is now released
192
193        // Read available data (this yields the strand, doesn't block OS thread)
194        // Reads all currently available data up to MAX_READ_SIZE
195        // Returns when: data is available and read, EOF, or WouldBlock
196        let mut buffer = Vec::new();
197        let mut chunk = [0u8; 4096];
198
199        // Read until we get data, EOF, or error
200        // For HTTP: Read once and return immediately to avoid blocking when client waits for response
201        loop {
202            // Check size limit to prevent memory exhaustion
203            if buffer.len() >= MAX_READ_SIZE {
204                panic!(
205                    "tcp_read: read size limit exceeded ({} bytes). Possible memory exhaustion attack.",
206                    MAX_READ_SIZE
207                );
208            }
209
210            match stream.read(&mut chunk) {
211                Ok(0) => {
212                    break;
213                }
214                Ok(n) => {
215                    // Don't exceed max size even with partial chunk
216                    let bytes_to_add = n.min(MAX_READ_SIZE.saturating_sub(buffer.len()));
217                    buffer.extend_from_slice(&chunk[..bytes_to_add]);
218                    if bytes_to_add < n {
219                        break; // Hit limit
220                    }
221                    // Return immediately after reading data
222                    // May's cooperative I/O would block on next read() if no more data available
223                    // Client might be waiting for our response, so don't wait for more
224                    break;
225                }
226                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
227                    // No data available yet - yield and wait for May's scheduler to wake us
228                    // when data arrives, or connection closes
229                    if buffer.is_empty() {
230                        may::coroutine::yield_now();
231                        continue;
232                    }
233                    // If we already have some data, return it
234                    break;
235                }
236                Err(e) => panic!("tcp_read: failed to read from socket: {}", e),
237            }
238        }
239
240        let data = String::from_utf8(buffer)
241            .unwrap_or_else(|e| panic!("tcp_read: invalid UTF-8 data: {}", e));
242
243        // Put the stream back
244        {
245            let mut streams = STREAMS.lock().unwrap();
246            if let Some(slot) = streams.get_mut(socket_id) {
247                *slot = Some(stream);
248            }
249        }
250
251        push(stack, Value::String(data.into()))
252    }
253}
254
255/// TCP write to a socket
256///
257/// Stack effect: ( string socket_id -- )
258///
259/// Writes string to the socket
260///
261/// # Safety
262/// Stack must have Int (socket_id) and String on top
263#[unsafe(no_mangle)]
264pub unsafe extern "C" fn patch_seq_tcp_write(stack: Stack) -> Stack {
265    unsafe {
266        let (stack, socket_id_val) = pop(stack);
267        let socket_id = match socket_id_val {
268            Value::Int(id) => id as usize,
269            _ => panic!(
270                "tcp_write: expected Int (socket_id), got {:?}",
271                socket_id_val
272            ),
273        };
274
275        let (stack, data_val) = pop(stack);
276        let data = match data_val {
277            Value::String(s) => s,
278            _ => panic!("tcp_write: expected String, got {:?}", data_val),
279        };
280
281        // Take the stream out of the registry (so we don't hold the lock during I/O)
282        let mut stream = {
283            let mut streams = STREAMS.lock().unwrap();
284            streams
285                .get_mut(socket_id)
286                .and_then(|opt| opt.take())
287                .unwrap_or_else(|| panic!("tcp_write: invalid socket_id {}", socket_id))
288        };
289        // Registry lock is now released
290
291        // Write data (non-blocking via May, yields strand as needed)
292        stream
293            .write_all(data.as_str().as_bytes())
294            .unwrap_or_else(|e| panic!("tcp_write: failed to write to socket: {}", e));
295
296        stream
297            .flush()
298            .unwrap_or_else(|e| panic!("tcp_write: failed to flush socket: {}", e));
299
300        // Put the stream back
301        {
302            let mut streams = STREAMS.lock().unwrap();
303            if let Some(slot) = streams.get_mut(socket_id) {
304                *slot = Some(stream);
305            }
306        }
307
308        stack
309    }
310}
311
312/// TCP close a socket
313///
314/// Stack effect: ( socket_id -- )
315///
316/// Closes the socket connection and frees the socket ID for reuse
317///
318/// # Safety
319/// Stack must have an Int (socket_id) on top
320#[unsafe(no_mangle)]
321pub unsafe extern "C" fn patch_seq_tcp_close(stack: Stack) -> Stack {
322    unsafe {
323        let (stack, socket_id_val) = pop(stack);
324        let socket_id = match socket_id_val {
325            Value::Int(id) => id as usize,
326            _ => panic!(
327                "tcp_close: expected Int (socket_id), got {:?}",
328                socket_id_val
329            ),
330        };
331
332        // Remove the stream and mark ID as free for reuse
333        let mut streams = STREAMS.lock().unwrap();
334        streams.free(socket_id);
335
336        stack
337    }
338}
339
340// Public re-exports with short names for internal use
341pub use patch_seq_tcp_accept as tcp_accept;
342pub use patch_seq_tcp_close as tcp_close;
343pub use patch_seq_tcp_listen as tcp_listen;
344pub use patch_seq_tcp_read as tcp_read;
345pub use patch_seq_tcp_write as tcp_write;
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arithmetic::push_int;
    use crate::scheduler::scheduler_init;

    // The `tcp_*` entry points are `extern "C"`, so a panic inside them aborts
    // the process instead of unwinding. Their failure paths therefore cannot
    // be exercised from a test; the tests named `*_invalid_*` only record the
    // expected behaviour. Registry behaviour is tested directly on
    // `SocketRegistry` with plain values instead of real sockets.

    /// Listening on port 0 pushes a non-negative listener ID and nothing else.
    #[test]
    fn test_tcp_listen() {
        unsafe {
            scheduler_init();

            let stack = std::ptr::null_mut();
            let stack = push_int(stack, 0); // Port 0 = OS assigns random port
            let stack = tcp_listen(stack);

            let (stack, result) = pop(stack);
            match result {
                Value::Int(listener_id) => {
                    assert!(listener_id >= 0, "Listener ID should be non-negative");
                }
                _ => panic!("Expected Int (listener_id), got {:?}", result),
            }
            assert!(stack.is_null());
        }
    }

    /// Documents that a negative port is rejected.
    #[test]
    fn test_tcp_listen_invalid_port_negative() {
        unsafe {
            scheduler_init();
            let stack = std::ptr::null_mut();
            let _stack = push_int(stack, -1);

            // Calling tcp_listen with this stack would panic with
            // "invalid port" and abort the test process, so it is not called.
        }
    }

    /// Documents that a port above 65535 is rejected.
    #[test]
    fn test_tcp_listen_invalid_port_too_high() {
        unsafe {
            scheduler_init();
            let stack = std::ptr::null_mut();
            let _stack = push_int(stack, 65536);

            // Calling tcp_listen with this stack would panic with
            // "invalid port" and abort the test process, so it is not called.
        }
    }

    /// Listening works both on an OS-assigned port and on an explicit one.
    #[test]
    fn test_tcp_port_range_valid() {
        unsafe {
            scheduler_init();

            // Test port 0 (OS-assigned)
            let stack = push_int(std::ptr::null_mut(), 0);
            let stack = tcp_listen(stack);
            let (_, result) = pop(stack);
            assert!(matches!(result, Value::Int(_)));

            // Test a non-privileged port (ports 1-1023 require root on Unix).
            // NOTE: this needs port 9999 to be free on the test machine; if it
            // is in use, tcp_listen panics and the test process aborts.
            let stack = push_int(std::ptr::null_mut(), 9999);
            let stack = tcp_listen(stack);
            let (_, result) = pop(stack);
            assert!(matches!(result, Value::Int(_)));

            // Edge cases (port 1, 65535) are not tested: they may require
            // privileges or be in use.
        }
    }

    /// A freed ID is handed out again before the table grows, and freeing the
    /// same ID twice does not queue it twice.
    #[test]
    fn test_socket_id_reuse_after_close() {
        let mut registry: SocketRegistry<&str> = SocketRegistry::new();
        assert_eq!(registry.allocate("a"), Ok(0));
        assert_eq!(registry.allocate("b"), Ok(1));

        registry.free(0);
        registry.free(0); // already empty: must be a no-op
        registry.free(99); // out of range: must be a no-op
        assert!(matches!(registry.get_mut(0), Some(None)));

        // ID 0 is recycled exactly once; the next allocation gets a new slot.
        assert_eq!(registry.allocate("c"), Ok(0));
        assert_eq!(registry.allocate("d"), Ok(2));
        assert!(matches!(registry.get_mut(0), Some(Some("c"))));
        assert!(matches!(registry.get_mut(1), Some(Some("b"))));
    }

    /// Documents that reading from an unknown socket ID is a programming error.
    #[test]
    fn test_tcp_read_invalid_socket_id() {
        unsafe {
            scheduler_init();

            // tcp_read on an ID that was never allocated (e.g. 9999) panics
            // with "invalid socket_id" and would abort the test process, so
            // it is not called here.
        }
    }

    /// Documents that writing to an unknown socket ID is a programming error.
    #[test]
    fn test_tcp_write_invalid_socket_id() {
        unsafe {
            scheduler_init();

            // tcp_write with a String and an ID that was never allocated
            // (e.g. 9999) panics with "invalid socket_id" and would abort the
            // test process, so it is not called here.
        }
    }

    /// Closing an ID that names no open socket does nothing and consumes only
    /// the ID from the stack.
    #[test]
    fn test_tcp_close_idempotent() {
        unsafe {
            scheduler_init();

            // Create a listener so the runtime has at least one socket open
            let stack = push_int(std::ptr::null_mut(), 0);
            let stack = tcp_listen(stack);
            let (stack, _listener_result) = pop(stack);

            // An ID that was never allocated
            let stack = push_int(stack, 9999);
            let stack = tcp_close(stack);
            assert!(stack.is_null());

            // A negative ID
            let stack = push_int(stack, -1);
            let stack = tcp_close(stack);
            assert!(stack.is_null());
        }
    }

    /// The registry refuses to grow past MAX_SOCKETS, but freeing a slot makes
    /// room again.
    #[test]
    fn test_socket_registry_capacity() {
        assert_eq!(MAX_SOCKETS, 10_000);

        // Plain bytes stand in for sockets, so filling the registry is cheap.
        let mut registry: SocketRegistry<u8> = SocketRegistry::new();
        for expected in 0..MAX_SOCKETS {
            assert_eq!(registry.allocate(0), Ok(expected as i64));
        }
        assert_eq!(registry.allocate(0), Err("Maximum socket limit reached"));

        registry.free(42);
        assert_eq!(registry.allocate(0), Ok(42));
        assert_eq!(registry.allocate(0), Err("Maximum socket limit reached"));
    }

    /// MAX_READ_SIZE keeps its documented value.
    #[test]
    fn test_max_read_size_limit() {
        assert_eq!(MAX_READ_SIZE, 1_048_576); // 1 MB

        // tcp_read returns after the first read that yields data, and that
        // read uses a 4096-byte chunk, so the limit acts as a safety net that
        // a single call does not reach. Exercising tcp_read itself requires a
        // real connection and is not done here.
    }
}