seq_runtime/
tcp.rs

1//! TCP Socket Operations for Seq
2//!
3//! Provides non-blocking TCP socket operations using May's coroutine-aware I/O.
4//! All operations yield the strand instead of blocking the OS thread.
5//!
6//! These functions are exported with C ABI for LLVM codegen.
7
8use crate::stack::{Stack, pop, push};
9use crate::value::Value;
10use may::net::{TcpListener, TcpStream};
11use std::io::{Read, Write};
12use std::sync::Mutex;
13
/// Upper bound on the number of slots a single `SocketRegistry` will grow to.
///
/// The listener registry and the stream registry each enforce this limit on
/// their own, which keeps either table from growing without bound.
const MAX_SOCKETS: usize = 10_000;

/// Upper bound, in bytes, on the data one `patch_seq_tcp_read` call may buffer
/// (1 MiB). Guards against memory exhaustion by a misbehaving peer.
const MAX_READ_SIZE: usize = 1024 * 1024;
19
20// Socket registry with ID reuse via free list
21struct SocketRegistry<T> {
22    sockets: Vec<Option<T>>,
23    free_ids: Vec<usize>,
24}
25
26impl<T> SocketRegistry<T> {
27    const fn new() -> Self {
28        Self {
29            sockets: Vec::new(),
30            free_ids: Vec::new(),
31        }
32    }
33
34    fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
35        // Try to reuse a free ID first
36        if let Some(id) = self.free_ids.pop() {
37            self.sockets[id] = Some(socket);
38            return Ok(id as i64);
39        }
40
41        // Check max connections limit
42        if self.sockets.len() >= MAX_SOCKETS {
43            return Err("Maximum socket limit reached");
44        }
45
46        // Allocate new ID
47        let id = self.sockets.len();
48        self.sockets.push(Some(socket));
49        Ok(id as i64)
50    }
51
52    fn get_mut(&mut self, id: usize) -> Option<&mut Option<T>> {
53        self.sockets.get_mut(id)
54    }
55
56    fn free(&mut self, id: usize) {
57        if let Some(slot) = self.sockets.get_mut(id)
58            && slot.is_some()
59        {
60            *slot = None;
61            self.free_ids.push(id);
62        }
63    }
64}
65
// Global registries for TCP listeners and TCP streams, each behind its own mutex.
// The two registries number their sockets independently, so a listener and a
// stream may carry the same integer ID.
static LISTENERS: Mutex<SocketRegistry<TcpListener>> = Mutex::new(SocketRegistry::new());
static STREAMS: Mutex<SocketRegistry<TcpStream>> = Mutex::new(SocketRegistry::new());
69
70/// TCP listen on a port
71///
72/// Stack effect: ( port -- listener_id Bool )
73///
74/// Binds to 0.0.0.0:port and returns a listener ID with success flag.
75/// Returns (0, false) on failure (invalid port, bind error, socket limit).
76///
77/// # Safety
78/// Stack must have an Int (port number) on top
79#[unsafe(no_mangle)]
80pub unsafe extern "C" fn patch_seq_tcp_listen(stack: Stack) -> Stack {
81    unsafe {
82        let (stack, port_val) = pop(stack);
83        let port = match port_val {
84            Value::Int(p) => p,
85            _ => {
86                // Type error - return failure
87                let stack = push(stack, Value::Int(0));
88                return push(stack, Value::Bool(false));
89            }
90        };
91
92        // Validate port range (1-65535, or 0 for OS-assigned)
93        if !(0..=65535).contains(&port) {
94            let stack = push(stack, Value::Int(0));
95            return push(stack, Value::Bool(false));
96        }
97
98        // Bind to the port (non-blocking via May)
99        let addr = format!("0.0.0.0:{}", port);
100        let listener = match TcpListener::bind(&addr) {
101            Ok(l) => l,
102            Err(_) => {
103                let stack = push(stack, Value::Int(0));
104                return push(stack, Value::Bool(false));
105            }
106        };
107
108        // Store listener and get ID
109        let mut listeners = LISTENERS.lock().unwrap();
110        match listeners.allocate(listener) {
111            Ok(listener_id) => {
112                let stack = push(stack, Value::Int(listener_id));
113                push(stack, Value::Bool(true))
114            }
115            Err(_) => {
116                let stack = push(stack, Value::Int(0));
117                push(stack, Value::Bool(false))
118            }
119        }
120    }
121}
122
123/// TCP accept a connection
124///
125/// Stack effect: ( listener_id -- client_id Bool )
126///
127/// Accepts a connection (yields the strand until one arrives).
128/// Returns (0, false) on failure (invalid listener, accept error, socket limit).
129///
130/// # Safety
131/// Stack must have an Int (listener_id) on top
132#[unsafe(no_mangle)]
133pub unsafe extern "C" fn patch_seq_tcp_accept(stack: Stack) -> Stack {
134    unsafe {
135        let (stack, listener_id_val) = pop(stack);
136        let listener_id = match listener_id_val {
137            Value::Int(id) => id as usize,
138            _ => {
139                let stack = push(stack, Value::Int(0));
140                return push(stack, Value::Bool(false));
141            }
142        };
143
144        // Take the listener out temporarily (so we don't hold lock during accept)
145        let listener = {
146            let mut listeners = LISTENERS.lock().unwrap();
147            match listeners.get_mut(listener_id).and_then(|opt| opt.take()) {
148                Some(l) => l,
149                None => {
150                    let stack = push(stack, Value::Int(0));
151                    return push(stack, Value::Bool(false));
152                }
153            }
154        };
155        // Lock released
156
157        // Accept connection (this yields the strand, doesn't block OS thread)
158        let (stream, _addr) = match listener.accept() {
159            Ok(result) => result,
160            Err(_) => {
161                // Put listener back before returning
162                let mut listeners = LISTENERS.lock().unwrap();
163                if let Some(slot) = listeners.get_mut(listener_id) {
164                    *slot = Some(listener);
165                }
166                let stack = push(stack, Value::Int(0));
167                return push(stack, Value::Bool(false));
168            }
169        };
170
171        // Put the listener back
172        {
173            let mut listeners = LISTENERS.lock().unwrap();
174            if let Some(slot) = listeners.get_mut(listener_id) {
175                *slot = Some(listener);
176            }
177        }
178
179        // Store stream and get ID
180        let mut streams = STREAMS.lock().unwrap();
181        match streams.allocate(stream) {
182            Ok(client_id) => {
183                let stack = push(stack, Value::Int(client_id));
184                push(stack, Value::Bool(true))
185            }
186            Err(_) => {
187                let stack = push(stack, Value::Int(0));
188                push(stack, Value::Bool(false))
189            }
190        }
191    }
192}
193
194/// TCP read from a socket
195///
196/// Stack effect: ( socket_id -- string Bool )
197///
198/// Reads all available data from the socket.
199/// Returns ("", false) on failure (invalid socket, read error, size limit, invalid UTF-8).
200///
201/// # Safety
202/// Stack must have an Int (socket_id) on top
203#[unsafe(no_mangle)]
204pub unsafe extern "C" fn patch_seq_tcp_read(stack: Stack) -> Stack {
205    unsafe {
206        let (stack, socket_id_val) = pop(stack);
207        let socket_id = match socket_id_val {
208            Value::Int(id) => id as usize,
209            _ => {
210                let stack = push(stack, Value::String("".into()));
211                return push(stack, Value::Bool(false));
212            }
213        };
214
215        // Take the stream out of the registry (so we don't hold the lock during I/O)
216        let mut stream = {
217            let mut streams = STREAMS.lock().unwrap();
218            match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
219                Some(s) => s,
220                None => {
221                    let stack = push(stack, Value::String("".into()));
222                    return push(stack, Value::Bool(false));
223                }
224            }
225        };
226        // Registry lock is now released
227
228        // Read available data (this yields the strand, doesn't block OS thread)
229        // Reads all currently available data up to MAX_READ_SIZE
230        // Returns when: data is available and read, EOF, or WouldBlock
231        let mut buffer = Vec::new();
232        let mut chunk = [0u8; 4096];
233        let mut read_error = false;
234
235        // Read until we get data, EOF, or error
236        // For HTTP: Read once and return immediately to avoid blocking when client waits for response
237        loop {
238            // Check size limit to prevent memory exhaustion
239            if buffer.len() >= MAX_READ_SIZE {
240                read_error = true;
241                break;
242            }
243
244            match stream.read(&mut chunk) {
245                Ok(0) => {
246                    break;
247                }
248                Ok(n) => {
249                    // Don't exceed max size even with partial chunk
250                    let bytes_to_add = n.min(MAX_READ_SIZE.saturating_sub(buffer.len()));
251                    buffer.extend_from_slice(&chunk[..bytes_to_add]);
252                    if bytes_to_add < n {
253                        break; // Hit limit
254                    }
255                    // Return immediately after reading data
256                    // May's cooperative I/O would block on next read() if no more data available
257                    // Client might be waiting for our response, so don't wait for more
258                    break;
259                }
260                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
261                    // No data available yet - yield and wait for May's scheduler to wake us
262                    // when data arrives, or connection closes
263                    if buffer.is_empty() {
264                        may::coroutine::yield_now();
265                        continue;
266                    }
267                    // If we already have some data, return it
268                    break;
269                }
270                Err(_) => {
271                    read_error = true;
272                    break;
273                }
274            }
275        }
276
277        // Put the stream back
278        {
279            let mut streams = STREAMS.lock().unwrap();
280            if let Some(slot) = streams.get_mut(socket_id) {
281                *slot = Some(stream);
282            }
283        }
284
285        if read_error {
286            let stack = push(stack, Value::String("".into()));
287            return push(stack, Value::Bool(false));
288        }
289
290        match String::from_utf8(buffer) {
291            Ok(data) => {
292                let stack = push(stack, Value::String(data.into()));
293                push(stack, Value::Bool(true))
294            }
295            Err(_) => {
296                let stack = push(stack, Value::String("".into()));
297                push(stack, Value::Bool(false))
298            }
299        }
300    }
301}
302
303/// TCP write to a socket
304///
305/// Stack effect: ( string socket_id -- Bool )
306///
307/// Writes string to the socket.
308/// Returns false on failure (invalid socket, write error).
309///
310/// # Safety
311/// Stack must have Int (socket_id) and String on top
312#[unsafe(no_mangle)]
313pub unsafe extern "C" fn patch_seq_tcp_write(stack: Stack) -> Stack {
314    unsafe {
315        let (stack, socket_id_val) = pop(stack);
316        let socket_id = match socket_id_val {
317            Value::Int(id) => id as usize,
318            _ => {
319                return push(stack, Value::Bool(false));
320            }
321        };
322
323        let (stack, data_val) = pop(stack);
324        let data = match data_val {
325            Value::String(s) => s,
326            _ => {
327                return push(stack, Value::Bool(false));
328            }
329        };
330
331        // Take the stream out of the registry (so we don't hold the lock during I/O)
332        let mut stream = {
333            let mut streams = STREAMS.lock().unwrap();
334            match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
335                Some(s) => s,
336                None => {
337                    return push(stack, Value::Bool(false));
338                }
339            }
340        };
341        // Registry lock is now released
342
343        // Write data (non-blocking via May, yields strand as needed)
344        let write_result = stream.write_all(data.as_str().as_bytes());
345        let flush_result = if write_result.is_ok() {
346            stream.flush()
347        } else {
348            write_result
349        };
350
351        // Put the stream back
352        {
353            let mut streams = STREAMS.lock().unwrap();
354            if let Some(slot) = streams.get_mut(socket_id) {
355                *slot = Some(stream);
356            }
357        }
358
359        push(stack, Value::Bool(flush_result.is_ok()))
360    }
361}
362
363/// TCP close a socket
364///
365/// Stack effect: ( socket_id -- Bool )
366///
367/// Closes the socket connection and frees the socket ID for reuse.
368/// Returns true on success, false if socket_id was invalid.
369///
370/// # Safety
371/// Stack must have an Int (socket_id) on top
372#[unsafe(no_mangle)]
373pub unsafe extern "C" fn patch_seq_tcp_close(stack: Stack) -> Stack {
374    unsafe {
375        let (stack, socket_id_val) = pop(stack);
376        let socket_id = match socket_id_val {
377            Value::Int(id) => id as usize,
378            _ => {
379                return push(stack, Value::Bool(false));
380            }
381        };
382
383        // Check if socket exists before freeing
384        let mut streams = STREAMS.lock().unwrap();
385        let existed = streams
386            .get_mut(socket_id)
387            .map(|slot| slot.is_some())
388            .unwrap_or(false);
389
390        if existed {
391            streams.free(socket_id);
392        }
393
394        push(stack, Value::Bool(existed))
395    }
396}
397
398// Public re-exports with short names for internal use
399pub use patch_seq_tcp_accept as tcp_accept;
400pub use patch_seq_tcp_close as tcp_close;
401pub use patch_seq_tcp_listen as tcp_listen;
402pub use patch_seq_tcp_read as tcp_read;
403pub use patch_seq_tcp_write as tcp_write;
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arithmetic::push_int;
    use crate::scheduler::scheduler_init;

    /// Runs `tcp_listen` for `port` on a fresh test stack and returns the two
    /// results as `(listener_id_value, success_value)`.
    ///
    /// # Safety
    /// Same contract as `tcp_listen`; callers initialise the scheduler first,
    /// as every test in this module does.
    unsafe fn listen_on(port: i64) -> (Value, Value) {
        unsafe {
            let stack = push_int(crate::stack::alloc_test_stack(), port);
            let stack = tcp_listen(stack);

            // Results are (Int, Bool) with the Bool on top.
            let (stack, success) = pop(stack);
            let (_stack, listener_id) = pop(stack);
            (listener_id, success)
        }
    }

    #[test]
    fn test_tcp_listen() {
        unsafe {
            scheduler_init();

            // Port 0 = OS assigns random port
            let (result, success) = listen_on(0);
            assert!(
                matches!(success, Value::Bool(true)),
                "tcp_listen should succeed"
            );
            match result {
                Value::Int(listener_id) => {
                    assert!(listener_id >= 0, "Listener ID should be non-negative");
                }
                _ => panic!("Expected Int (listener_id), got {:?}", result),
            }
        }
    }

    #[test]
    fn test_tcp_listen_invalid_port_negative() {
        unsafe {
            scheduler_init();

            // Invalid port returns (0, false)
            let (result, success) = listen_on(-1);
            assert!(
                matches!(success, Value::Bool(false)),
                "Invalid port should return false"
            );
            assert!(
                matches!(result, Value::Int(0)),
                "Invalid port should return 0"
            );
        }
    }

    #[test]
    fn test_tcp_listen_invalid_port_too_high() {
        unsafe {
            scheduler_init();

            // Invalid port returns (0, false)
            let (result, success) = listen_on(65536);
            assert!(
                matches!(success, Value::Bool(false)),
                "Invalid port should return false"
            );
            assert!(
                matches!(result, Value::Int(0)),
                "Invalid port should return 0"
            );
        }
    }

    #[test]
    fn test_tcp_port_range_valid() {
        unsafe {
            scheduler_init();

            // Port 0 (OS-assigned)
            let (result, success) = listen_on(0);
            assert!(matches!(success, Value::Bool(true)));
            assert!(matches!(result, Value::Int(_)));

            // An explicit, non-privileged port (ports 1-1023 require root on
            // Unix). A hard-coded number fails whenever something else on the
            // machine already uses it, so ask the OS for a free port, release
            // it, and listen on that one.
            let free_port = {
                let probe =
                    std::net::TcpListener::bind("0.0.0.0:0").expect("probe listener should bind");
                probe
                    .local_addr()
                    .expect("probe listener should have a local address")
                    .port()
            };
            let (result, success) = listen_on(i64::from(free_port));
            assert!(matches!(success, Value::Bool(true)));
            assert!(matches!(result, Value::Int(_)));

            // Note: the extreme ports (1, 65535) are not exercised because they
            // may require privileges or be in use. Out-of-range values are
            // covered by the invalid-port tests.
        }
    }

    #[test]
    fn test_socket_id_reuse_after_close() {
        // Real streams need a connected peer, so the reuse rule that
        // `tcp_close` relies on is checked on the registry directly.
        let mut registry: SocketRegistry<u32> = SocketRegistry::new();
        assert_eq!(registry.allocate(10), Ok(0));
        assert_eq!(registry.allocate(20), Ok(1));

        // Freeing an ID makes exactly that ID available again.
        registry.free(0);
        assert!(matches!(registry.get_mut(0), Some(None)));
        assert_eq!(registry.allocate(30), Ok(0));
        assert!(matches!(registry.get_mut(0), Some(Some(30))));

        // Freeing twice, or freeing an unknown ID, must not queue an ID twice.
        registry.free(1);
        registry.free(1);
        registry.free(9999);
        assert_eq!(registry.allocate(40), Ok(1));
        assert_eq!(registry.allocate(50), Ok(2));
    }

    #[test]
    fn test_tcp_read_invalid_socket_id() {
        unsafe {
            scheduler_init();

            // Invalid socket ID returns ("", false)
            let stack = push_int(crate::stack::alloc_test_stack(), 9999);
            let stack = tcp_read(stack);

            let (stack, success) = pop(stack);
            assert!(
                matches!(success, Value::Bool(false)),
                "Invalid socket should return false"
            );
            let (_stack, result) = pop(stack);
            match result {
                Value::String(s) => assert_eq!(s.as_str(), ""),
                _ => panic!("Expected empty string"),
            }
        }
    }

    #[test]
    fn test_tcp_write_invalid_socket_id() {
        unsafe {
            scheduler_init();

            // Invalid socket ID returns false
            let stack = push(
                crate::stack::alloc_test_stack(),
                Value::String("test".into()),
            );
            let stack = push_int(stack, 9999);
            let stack = tcp_write(stack);

            let (_stack, success) = pop(stack);
            assert!(
                matches!(success, Value::Bool(false)),
                "Invalid socket should return false"
            );
        }
    }

    #[test]
    fn test_tcp_close_idempotent() {
        unsafe {
            scheduler_init();

            // Closing an ID that holds no stream reports false, every time.
            for _ in 0..2 {
                let stack = push_int(crate::stack::alloc_test_stack(), 9999);
                let stack = tcp_close(stack);

                let (_stack, success) = pop(stack);
                assert!(
                    matches!(success, Value::Bool(false)),
                    "Invalid socket close should return false"
                );
            }
        }
    }

    #[test]
    fn test_socket_registry_capacity() {
        // The limit is a property of the registry, so it can be checked
        // without opening MAX_SOCKETS real sockets.
        let mut registry: SocketRegistry<u8> = SocketRegistry::new();
        for expected_id in 0..MAX_SOCKETS {
            assert_eq!(registry.allocate(0), Ok(expected_id as i64));
        }

        // One past the limit is refused with an error (not a panic).
        assert_eq!(registry.allocate(0), Err("Maximum socket limit reached"));

        // A freed ID can still be handed out while the table is full.
        registry.free(3);
        assert_eq!(registry.allocate(0), Ok(3));
        assert!(registry.allocate(0).is_err());

        assert_eq!(MAX_SOCKETS, 10_000);
    }

    #[test]
    fn test_max_read_size_limit() {
        // Test that MAX_READ_SIZE limit exists and is reasonable
        assert_eq!(MAX_READ_SIZE, 1_048_576); // 1 MB

        // Exceeding the limit makes tcp_read return ("", false); it does not
        // panic. Exercising that path needs a connected socket, which is not
        // practical in a unit test.
    }
}
625        // with more than 1 MB of data, which is impractical for unit tests.
626    }
627}