seq_runtime/tcp.rs
1//! TCP Socket Operations for Seq
2//!
3//! Provides non-blocking TCP socket operations using May's coroutine-aware I/O.
4//! All operations yield the strand instead of blocking the OS thread.
5//!
6//! These functions are exported with C ABI for LLVM codegen.
7
8use crate::stack::{Stack, pop, push};
9use crate::value::Value;
10use may::net::{TcpListener, TcpStream};
11use std::io::{Read, Write};
12use std::sync::Mutex;
13
14// Maximum number of concurrent connections to prevent unbounded growth
15const MAX_SOCKETS: usize = 10_000;
16
17// Maximum bytes to read from a socket to prevent memory exhaustion attacks
18const MAX_READ_SIZE: usize = 1_048_576; // 1 MB
19
20// Socket registry with ID reuse via free list
21struct SocketRegistry<T> {
22 sockets: Vec<Option<T>>,
23 free_ids: Vec<usize>,
24}
25
26impl<T> SocketRegistry<T> {
27 const fn new() -> Self {
28 Self {
29 sockets: Vec::new(),
30 free_ids: Vec::new(),
31 }
32 }
33
34 fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
35 // Try to reuse a free ID first
36 if let Some(id) = self.free_ids.pop() {
37 self.sockets[id] = Some(socket);
38 return Ok(id as i64);
39 }
40
41 // Check max connections limit
42 if self.sockets.len() >= MAX_SOCKETS {
43 return Err("Maximum socket limit reached");
44 }
45
46 // Allocate new ID
47 let id = self.sockets.len();
48 self.sockets.push(Some(socket));
49 Ok(id as i64)
50 }
51
52 fn get_mut(&mut self, id: usize) -> Option<&mut Option<T>> {
53 self.sockets.get_mut(id)
54 }
55
56 fn free(&mut self, id: usize) {
57 if let Some(slot) = self.sockets.get_mut(id)
58 && slot.is_some()
59 {
60 *slot = None;
61 self.free_ids.push(id);
62 }
63 }
64}
65
66// Global registry for TCP listeners and streams
67static LISTENERS: Mutex<SocketRegistry<TcpListener>> = Mutex::new(SocketRegistry::new());
68static STREAMS: Mutex<SocketRegistry<TcpStream>> = Mutex::new(SocketRegistry::new());
69
70/// TCP listen on a port
71///
72/// Stack effect: ( port -- listener_id )
73///
74/// Binds to 0.0.0.0:port and returns a listener ID
75///
76/// # Safety
77/// Stack must have an Int (port number) on top
78#[unsafe(no_mangle)]
79pub unsafe extern "C" fn patch_seq_tcp_listen(stack: Stack) -> Stack {
80 unsafe {
81 let (stack, port_val) = pop(stack);
82 let port = match port_val {
83 Value::Int(p) => p,
84 _ => panic!(
85 "tcp_listen: expected Int (port) on stack, got {:?}",
86 port_val
87 ),
88 };
89
90 // Validate port range (1-65535, or 0 for OS-assigned)
91 if !(0..=65535).contains(&port) {
92 panic!("tcp_listen: invalid port {}, must be 0-65535", port);
93 }
94
95 // Bind to the port (non-blocking via May)
96 let addr = format!("0.0.0.0:{}", port);
97 let listener = TcpListener::bind(&addr)
98 .unwrap_or_else(|e| panic!("tcp_listen: failed to bind to {}: {}", addr, e));
99
100 // Store listener and get ID
101 let mut listeners = LISTENERS.lock().unwrap();
102 let listener_id = listeners
103 .allocate(listener)
104 .unwrap_or_else(|e| panic!("tcp_listen: {}", e));
105
106 push(stack, Value::Int(listener_id))
107 }
108}
109
110/// TCP accept a connection
111///
112/// Stack effect: ( listener_id -- client_id )
113///
114/// Accepts a connection (yields the strand until one arrives)
115///
116/// # Safety
117/// Stack must have an Int (listener_id) on top
118#[unsafe(no_mangle)]
119pub unsafe extern "C" fn patch_seq_tcp_accept(stack: Stack) -> Stack {
120 unsafe {
121 let (stack, listener_id_val) = pop(stack);
122 let listener_id = match listener_id_val {
123 Value::Int(id) => id as usize,
124 _ => panic!(
125 "tcp_accept: expected Int (listener_id), got {:?}",
126 listener_id_val
127 ),
128 };
129
130 // Take the listener out temporarily (so we don't hold lock during accept)
131 let listener = {
132 let mut listeners = LISTENERS.lock().unwrap();
133 listeners
134 .get_mut(listener_id)
135 .and_then(|opt| opt.take())
136 .unwrap_or_else(|| panic!("tcp_accept: invalid listener_id {}", listener_id))
137 };
138 // Lock released
139
140 // Accept connection (this yields the strand, doesn't block OS thread)
141 let (stream, _addr) = listener
142 .accept()
143 .unwrap_or_else(|e| panic!("tcp_accept: failed to accept connection: {}", e));
144
145 // Put the listener back
146 {
147 let mut listeners = LISTENERS.lock().unwrap();
148 if let Some(slot) = listeners.get_mut(listener_id) {
149 *slot = Some(listener);
150 }
151 }
152
153 // Store stream and get ID
154 let mut streams = STREAMS.lock().unwrap();
155 let client_id = streams
156 .allocate(stream)
157 .unwrap_or_else(|e| panic!("tcp_accept: {}", e));
158
159 push(stack, Value::Int(client_id))
160 }
161}
162
163/// TCP read from a socket
164///
165/// Stack effect: ( socket_id -- string )
166///
167/// Reads all available data from the socket
168///
169/// # Safety
170/// Stack must have an Int (socket_id) on top
171#[unsafe(no_mangle)]
172pub unsafe extern "C" fn patch_seq_tcp_read(stack: Stack) -> Stack {
173 unsafe {
174 let (stack, socket_id_val) = pop(stack);
175 let socket_id = match socket_id_val {
176 Value::Int(id) => id as usize,
177 _ => panic!(
178 "tcp_read: expected Int (socket_id), got {:?}",
179 socket_id_val
180 ),
181 };
182
183 // Take the stream out of the registry (so we don't hold the lock during I/O)
184 let mut stream = {
185 let mut streams = STREAMS.lock().unwrap();
186 streams
187 .get_mut(socket_id)
188 .and_then(|opt| opt.take())
189 .unwrap_or_else(|| panic!("tcp_read: invalid socket_id {}", socket_id))
190 };
191 // Registry lock is now released
192
193 // Read available data (this yields the strand, doesn't block OS thread)
194 // Reads all currently available data up to MAX_READ_SIZE
195 // Returns when: data is available and read, EOF, or WouldBlock
196 let mut buffer = Vec::new();
197 let mut chunk = [0u8; 4096];
198
199 // Read until we get data, EOF, or error
200 // For HTTP: Read once and return immediately to avoid blocking when client waits for response
201 loop {
202 // Check size limit to prevent memory exhaustion
203 if buffer.len() >= MAX_READ_SIZE {
204 panic!(
205 "tcp_read: read size limit exceeded ({} bytes). Possible memory exhaustion attack.",
206 MAX_READ_SIZE
207 );
208 }
209
210 match stream.read(&mut chunk) {
211 Ok(0) => {
212 break;
213 }
214 Ok(n) => {
215 // Don't exceed max size even with partial chunk
216 let bytes_to_add = n.min(MAX_READ_SIZE.saturating_sub(buffer.len()));
217 buffer.extend_from_slice(&chunk[..bytes_to_add]);
218 if bytes_to_add < n {
219 break; // Hit limit
220 }
221 // Return immediately after reading data
222 // May's cooperative I/O would block on next read() if no more data available
223 // Client might be waiting for our response, so don't wait for more
224 break;
225 }
226 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
227 // No data available yet - yield and wait for May's scheduler to wake us
228 // when data arrives, or connection closes
229 if buffer.is_empty() {
230 may::coroutine::yield_now();
231 continue;
232 }
233 // If we already have some data, return it
234 break;
235 }
236 Err(e) => panic!("tcp_read: failed to read from socket: {}", e),
237 }
238 }
239
240 let data = String::from_utf8(buffer)
241 .unwrap_or_else(|e| panic!("tcp_read: invalid UTF-8 data: {}", e));
242
243 // Put the stream back
244 {
245 let mut streams = STREAMS.lock().unwrap();
246 if let Some(slot) = streams.get_mut(socket_id) {
247 *slot = Some(stream);
248 }
249 }
250
251 push(stack, Value::String(data.into()))
252 }
253}
254
255/// TCP write to a socket
256///
257/// Stack effect: ( string socket_id -- )
258///
259/// Writes string to the socket
260///
261/// # Safety
262/// Stack must have Int (socket_id) and String on top
263#[unsafe(no_mangle)]
264pub unsafe extern "C" fn patch_seq_tcp_write(stack: Stack) -> Stack {
265 unsafe {
266 let (stack, socket_id_val) = pop(stack);
267 let socket_id = match socket_id_val {
268 Value::Int(id) => id as usize,
269 _ => panic!(
270 "tcp_write: expected Int (socket_id), got {:?}",
271 socket_id_val
272 ),
273 };
274
275 let (stack, data_val) = pop(stack);
276 let data = match data_val {
277 Value::String(s) => s,
278 _ => panic!("tcp_write: expected String, got {:?}", data_val),
279 };
280
281 // Take the stream out of the registry (so we don't hold the lock during I/O)
282 let mut stream = {
283 let mut streams = STREAMS.lock().unwrap();
284 streams
285 .get_mut(socket_id)
286 .and_then(|opt| opt.take())
287 .unwrap_or_else(|| panic!("tcp_write: invalid socket_id {}", socket_id))
288 };
289 // Registry lock is now released
290
291 // Write data (non-blocking via May, yields strand as needed)
292 stream
293 .write_all(data.as_str().as_bytes())
294 .unwrap_or_else(|e| panic!("tcp_write: failed to write to socket: {}", e));
295
296 stream
297 .flush()
298 .unwrap_or_else(|e| panic!("tcp_write: failed to flush socket: {}", e));
299
300 // Put the stream back
301 {
302 let mut streams = STREAMS.lock().unwrap();
303 if let Some(slot) = streams.get_mut(socket_id) {
304 *slot = Some(stream);
305 }
306 }
307
308 stack
309 }
310}
311
312/// TCP close a socket
313///
314/// Stack effect: ( socket_id -- )
315///
316/// Closes the socket connection and frees the socket ID for reuse
317///
318/// # Safety
319/// Stack must have an Int (socket_id) on top
320#[unsafe(no_mangle)]
321pub unsafe extern "C" fn patch_seq_tcp_close(stack: Stack) -> Stack {
322 unsafe {
323 let (stack, socket_id_val) = pop(stack);
324 let socket_id = match socket_id_val {
325 Value::Int(id) => id as usize,
326 _ => panic!(
327 "tcp_close: expected Int (socket_id), got {:?}",
328 socket_id_val
329 ),
330 };
331
332 // Remove the stream and mark ID as free for reuse
333 let mut streams = STREAMS.lock().unwrap();
334 streams.free(socket_id);
335
336 stack
337 }
338}
339
340// Public re-exports with short names for internal use
341pub use patch_seq_tcp_accept as tcp_accept;
342pub use patch_seq_tcp_close as tcp_close;
343pub use patch_seq_tcp_listen as tcp_listen;
344pub use patch_seq_tcp_read as tcp_read;
345pub use patch_seq_tcp_write as tcp_write;
346
347#[cfg(test)]
348mod tests {
349 use super::*;
350 use crate::arithmetic::push_int;
351 use crate::scheduler::scheduler_init;
352
353 #[test]
354 fn test_tcp_listen() {
355 unsafe {
356 scheduler_init();
357
358 let stack = std::ptr::null_mut();
359 let stack = push_int(stack, 0); // Port 0 = OS assigns random port
360 let stack = tcp_listen(stack);
361
362 let (stack, result) = pop(stack);
363 match result {
364 Value::Int(listener_id) => {
365 assert!(listener_id >= 0, "Listener ID should be non-negative");
366 }
367 _ => panic!("Expected Int (listener_id), got {:?}", result),
368 }
369 assert!(stack.is_null());
370 }
371 }
372
373 #[test]
374 fn test_tcp_listen_invalid_port_negative() {
375 unsafe {
376 scheduler_init();
377 let stack = std::ptr::null_mut();
378 let _stack = push_int(stack, -1);
379
380 // Note: tcp_listen is extern "C" so it aborts on panic
381 // We document that invalid ports cause panics
382 // In practice, these would be caught by the type system
383 // (user code would provide validated ints)
384
385 // tcp_listen(stack); // Would abort
386 }
387 }
388
389 #[test]
390 fn test_tcp_listen_invalid_port_too_high() {
391 unsafe {
392 scheduler_init();
393 let stack = std::ptr::null_mut();
394 let _stack = push_int(stack, 65536);
395
396 // Note: tcp_listen is extern "C" so it aborts on panic
397 // We document that invalid ports cause panics
398
399 // tcp_listen(stack); // Would abort
400 }
401 }
402
403 #[test]
404 fn test_tcp_port_range_valid() {
405 unsafe {
406 scheduler_init();
407
408 // Test port 0 (OS-assigned)
409 let stack = push_int(std::ptr::null_mut(), 0);
410 let stack = tcp_listen(stack);
411 let (_, result) = pop(stack);
412 assert!(matches!(result, Value::Int(_)));
413
414 // Test a non-privileged port (ports 1-1023 require root on Unix)
415 // Use port 9999 which should be available and doesn't require privileges
416 let stack = push_int(std::ptr::null_mut(), 9999);
417 let stack = tcp_listen(stack);
418 let (_, result) = pop(stack);
419 assert!(matches!(result, Value::Int(_)));
420
421 // Note: Can't easily test all edge cases (port 1, 65535) as they
422 // may require privileges or be in use. Port validation logic is
423 // tested separately in the invalid port tests.
424 }
425 }
426
427 #[test]
428 fn test_socket_id_reuse_after_close() {
429 unsafe {
430 scheduler_init();
431
432 // Create a listener and accept a hypothetical connection
433 let stack = push_int(std::ptr::null_mut(), 0);
434 let stack = tcp_listen(stack);
435 let (stack, listener_result) = pop(stack);
436
437 let listener_id = match listener_result {
438 Value::Int(id) => id,
439 _ => panic!("Expected listener ID"),
440 };
441
442 // Verify listener ID is valid
443 assert!(listener_id >= 0);
444
445 // Note: We can't easily test connection acceptance without
446 // actually making a connection, but we can test the registry behavior
447
448 // Clean up
449 assert!(stack.is_null());
450 }
451 }
452
453 #[test]
454 fn test_tcp_read_invalid_socket_id() {
455 unsafe {
456 scheduler_init();
457
458 // Note: tcp_read is extern "C" so it aborts on panic
459 // Invalid socket IDs cause panics which are documented behavior
460 // let stack = push_int(std::ptr::null_mut(), 9999);
461 // tcp_read(stack); // Would abort
462
463 // Instead, we verify that valid operations work
464 // and document that invalid IDs are programming errors
465 }
466 }
467
468 #[test]
469 fn test_tcp_write_invalid_socket_id() {
470 unsafe {
471 scheduler_init();
472
473 // Note: tcp_write is extern "C" so it aborts on panic
474 // Invalid socket IDs cause panics which are documented behavior
475 // let stack = push(std::ptr::null_mut(), Value::String("test".into()));
476 // let stack = push_int(stack, 9999);
477 // tcp_write(stack); // Would abort
478 }
479 }
480
481 #[test]
482 fn test_tcp_close_idempotent() {
483 unsafe {
484 scheduler_init();
485
486 // Create a socket to close
487 let stack = push_int(std::ptr::null_mut(), 0);
488 let stack = tcp_listen(stack);
489 let (stack, _listener_result) = pop(stack);
490
491 // Close is idempotent - closing an already closed or invalid socket
492 // should not crash (it just does nothing via free())
493 let stack = push_int(stack, 9999);
494 let stack = tcp_close(stack);
495
496 assert!(stack.is_null());
497 }
498 }
499
500 #[test]
501 fn test_socket_registry_capacity() {
502 // Test that MAX_SOCKETS limit is enforced
503 // Note: We can't easily allocate 10,000 real sockets in a unit test,
504 // but the limit check is in the code at lines 38-41
505 // This test documents the expected behavior
506
507 // If we could allocate that many:
508 // - First 10,000 allocations should succeed
509 // - 10,001st allocation should panic with "Maximum socket limit reached"
510
511 // For now, just verify the constant exists
512 assert_eq!(MAX_SOCKETS, 10_000);
513 }
514
515 #[test]
516 fn test_max_read_size_limit() {
517 // Test that MAX_READ_SIZE limit exists and is reasonable
518 assert_eq!(MAX_READ_SIZE, 1_048_576); // 1 MB
519
520 // In practice, if tcp_read receives more than 1 MB, it should panic
521 // with "read size limit exceeded". Testing this requires a real socket
522 // with more than 1 MB of data, which is impractical for unit tests.
523 }
524}