seq_runtime/tcp.rs
1//! TCP Socket Operations for Seq
2//!
3//! Provides non-blocking TCP socket operations using May's coroutine-aware I/O.
4//! All operations yield the strand instead of blocking the OS thread.
5//!
6//! These functions are exported with C ABI for LLVM codegen.
7
8use crate::stack::{Stack, pop, push};
9use crate::value::Value;
10use may::net::{TcpListener, TcpStream};
11use std::io::{Read, Write};
12use std::sync::Mutex;
13
/// Upper bound on the number of slots a registry will ever create, so the
/// slot table cannot grow without limit.
const MAX_SOCKETS: usize = 10_000;

/// Cap on the number of bytes taken from a socket by a read, guarding against
/// memory exhaustion (1 MB).
const MAX_READ_SIZE: usize = 1_048_576;

/// Table mapping small integer IDs to sockets.
///
/// An ID is simply an index into `sockets`. IDs released through `free` are
/// remembered in `free_ids` and handed out again before any new slot is
/// created.
struct SocketRegistry<T> {
    /// `sockets[id]` is the socket registered under `id`; `None` is an empty slot.
    sockets: Vec<Option<T>>,
    /// IDs released by `free`; `allocate` reuses the most recently freed one first.
    free_ids: Vec<usize>,
}

impl<T> SocketRegistry<T> {
    /// Creates an empty registry. `const` so it can initialise a `static`.
    const fn new() -> Self {
        Self {
            sockets: Vec::new(),
            free_ids: Vec::new(),
        }
    }

    /// Stores `socket` and returns the ID it was registered under.
    ///
    /// A previously freed ID is recycled when one exists; otherwise a new
    /// slot is appended. Returns `Err` once `MAX_SOCKETS` slots exist and no
    /// freed ID is available.
    fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
        let id = match self.free_ids.pop() {
            // A recycled ID always refers to an existing (empty) slot.
            Some(recycled) => {
                self.sockets[recycled] = Some(socket);
                recycled
            }
            None if self.sockets.len() >= MAX_SOCKETS => {
                return Err("Maximum socket limit reached");
            }
            None => {
                self.sockets.push(Some(socket));
                self.sockets.len() - 1
            }
        };
        Ok(id as i64)
    }

    /// Returns the slot for `id`, or `None` when `id` is past the end of the table.
    ///
    /// The slot itself may be empty (`Some(&mut None)`).
    fn get_mut(&mut self, id: usize) -> Option<&mut Option<T>> {
        self.sockets.get_mut(id)
    }

    /// Drops the socket stored under `id` and makes the ID available for reuse.
    ///
    /// Does nothing when `id` is out of range or its slot is already empty,
    /// so an ID is never queued for reuse twice.
    fn free(&mut self, id: usize) {
        let Some(slot) = self.sockets.get_mut(id) else {
            return;
        };
        if slot.take().is_some() {
            self.free_ids.push(id);
        }
    }
}
65
66// Global registry for TCP listeners and streams
67static LISTENERS: Mutex<SocketRegistry<TcpListener>> = Mutex::new(SocketRegistry::new());
68static STREAMS: Mutex<SocketRegistry<TcpStream>> = Mutex::new(SocketRegistry::new());
69
70/// TCP listen on a port
71///
72/// Stack effect: ( port -- listener_id )
73///
74/// Binds to 0.0.0.0:port and returns a listener ID
75///
76/// # Safety
77/// Stack must have an Int (port number) on top
78#[unsafe(no_mangle)]
79pub unsafe extern "C" fn patch_seq_tcp_listen(stack: Stack) -> Stack {
80 unsafe {
81 let (stack, port_val) = pop(stack);
82 let port = match port_val {
83 Value::Int(p) => p,
84 _ => panic!(
85 "tcp_listen: expected Int (port) on stack, got {:?}",
86 port_val
87 ),
88 };
89
90 // Validate port range (1-65535, or 0 for OS-assigned)
91 if !(0..=65535).contains(&port) {
92 panic!("tcp_listen: invalid port {}, must be 0-65535", port);
93 }
94
95 // Bind to the port (non-blocking via May)
96 let addr = format!("0.0.0.0:{}", port);
97 let listener = TcpListener::bind(&addr)
98 .unwrap_or_else(|e| panic!("tcp_listen: failed to bind to {}: {}", addr, e));
99
100 // Store listener and get ID
101 let mut listeners = LISTENERS.lock().unwrap();
102 let listener_id = listeners
103 .allocate(listener)
104 .unwrap_or_else(|e| panic!("tcp_listen: {}", e));
105
106 push(stack, Value::Int(listener_id))
107 }
108}
109
110/// TCP accept a connection
111///
112/// Stack effect: ( listener_id -- client_id )
113///
114/// Accepts a connection (yields the strand until one arrives)
115///
116/// # Safety
117/// Stack must have an Int (listener_id) on top
118#[unsafe(no_mangle)]
119pub unsafe extern "C" fn patch_seq_tcp_accept(stack: Stack) -> Stack {
120 unsafe {
121 let (stack, listener_id_val) = pop(stack);
122 let listener_id = match listener_id_val {
123 Value::Int(id) => id as usize,
124 _ => panic!(
125 "tcp_accept: expected Int (listener_id), got {:?}",
126 listener_id_val
127 ),
128 };
129
130 // Take the listener out temporarily (so we don't hold lock during accept)
131 let listener = {
132 let mut listeners = LISTENERS.lock().unwrap();
133 listeners
134 .get_mut(listener_id)
135 .and_then(|opt| opt.take())
136 .unwrap_or_else(|| panic!("tcp_accept: invalid listener_id {}", listener_id))
137 };
138 // Lock released
139
140 // Accept connection (this yields the strand, doesn't block OS thread)
141 let (stream, _addr) = listener
142 .accept()
143 .unwrap_or_else(|e| panic!("tcp_accept: failed to accept connection: {}", e));
144
145 // Put the listener back
146 {
147 let mut listeners = LISTENERS.lock().unwrap();
148 if let Some(slot) = listeners.get_mut(listener_id) {
149 *slot = Some(listener);
150 }
151 }
152
153 // Store stream and get ID
154 let mut streams = STREAMS.lock().unwrap();
155 let client_id = streams
156 .allocate(stream)
157 .unwrap_or_else(|e| panic!("tcp_accept: {}", e));
158
159 push(stack, Value::Int(client_id))
160 }
161}
162
163/// TCP read from a socket
164///
165/// Stack effect: ( socket_id -- string )
166///
167/// Reads all available data from the socket
168///
169/// # Safety
170/// Stack must have an Int (socket_id) on top
171#[unsafe(no_mangle)]
172pub unsafe extern "C" fn patch_seq_tcp_read(stack: Stack) -> Stack {
173 unsafe {
174 let (stack, socket_id_val) = pop(stack);
175 let socket_id = match socket_id_val {
176 Value::Int(id) => id as usize,
177 _ => panic!(
178 "tcp_read: expected Int (socket_id), got {:?}",
179 socket_id_val
180 ),
181 };
182
183 // Take the stream out of the registry (so we don't hold the lock during I/O)
184 let mut stream = {
185 let mut streams = STREAMS.lock().unwrap();
186 streams
187 .get_mut(socket_id)
188 .and_then(|opt| opt.take())
189 .unwrap_or_else(|| panic!("tcp_read: invalid socket_id {}", socket_id))
190 };
191 // Registry lock is now released
192
193 // Read available data (this yields the strand, doesn't block OS thread)
194 // Reads all currently available data up to MAX_READ_SIZE
195 // Returns when: data is available and read, EOF, or WouldBlock
196 let mut buffer = Vec::new();
197 let mut chunk = [0u8; 4096];
198
199 // Read until we get data, EOF, or error
200 // For HTTP: Read once and return immediately to avoid blocking when client waits for response
201 loop {
202 // Check size limit to prevent memory exhaustion
203 if buffer.len() >= MAX_READ_SIZE {
204 panic!(
205 "tcp_read: read size limit exceeded ({} bytes). Possible memory exhaustion attack.",
206 MAX_READ_SIZE
207 );
208 }
209
210 match stream.read(&mut chunk) {
211 Ok(0) => {
212 break;
213 }
214 Ok(n) => {
215 // Don't exceed max size even with partial chunk
216 let bytes_to_add = n.min(MAX_READ_SIZE.saturating_sub(buffer.len()));
217 buffer.extend_from_slice(&chunk[..bytes_to_add]);
218 if bytes_to_add < n {
219 break; // Hit limit
220 }
221 // Return immediately after reading data
222 // May's cooperative I/O would block on next read() if no more data available
223 // Client might be waiting for our response, so don't wait for more
224 break;
225 }
226 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
227 // No data available yet - yield and wait for May's scheduler to wake us
228 // when data arrives, or connection closes
229 if buffer.is_empty() {
230 may::coroutine::yield_now();
231 continue;
232 }
233 // If we already have some data, return it
234 break;
235 }
236 Err(e) => panic!("tcp_read: failed to read from socket: {}", e),
237 }
238 }
239
240 let data = String::from_utf8(buffer)
241 .unwrap_or_else(|e| panic!("tcp_read: invalid UTF-8 data: {}", e));
242
243 // Put the stream back
244 {
245 let mut streams = STREAMS.lock().unwrap();
246 if let Some(slot) = streams.get_mut(socket_id) {
247 *slot = Some(stream);
248 }
249 }
250
251 push(stack, Value::String(data.into()))
252 }
253}
254
255/// TCP write to a socket
256///
257/// Stack effect: ( string socket_id -- )
258///
259/// Writes string to the socket
260///
261/// # Safety
262/// Stack must have Int (socket_id) and String on top
263#[unsafe(no_mangle)]
264pub unsafe extern "C" fn patch_seq_tcp_write(stack: Stack) -> Stack {
265 unsafe {
266 let (stack, socket_id_val) = pop(stack);
267 let socket_id = match socket_id_val {
268 Value::Int(id) => id as usize,
269 _ => panic!(
270 "tcp_write: expected Int (socket_id), got {:?}",
271 socket_id_val
272 ),
273 };
274
275 let (stack, data_val) = pop(stack);
276 let data = match data_val {
277 Value::String(s) => s,
278 _ => panic!("tcp_write: expected String, got {:?}", data_val),
279 };
280
281 // Take the stream out of the registry (so we don't hold the lock during I/O)
282 let mut stream = {
283 let mut streams = STREAMS.lock().unwrap();
284 streams
285 .get_mut(socket_id)
286 .and_then(|opt| opt.take())
287 .unwrap_or_else(|| panic!("tcp_write: invalid socket_id {}", socket_id))
288 };
289 // Registry lock is now released
290
291 // Write data (non-blocking via May, yields strand as needed)
292 stream
293 .write_all(data.as_str().as_bytes())
294 .unwrap_or_else(|e| panic!("tcp_write: failed to write to socket: {}", e));
295
296 stream
297 .flush()
298 .unwrap_or_else(|e| panic!("tcp_write: failed to flush socket: {}", e));
299
300 // Put the stream back
301 {
302 let mut streams = STREAMS.lock().unwrap();
303 if let Some(slot) = streams.get_mut(socket_id) {
304 *slot = Some(stream);
305 }
306 }
307
308 stack
309 }
310}
311
312/// TCP close a socket
313///
314/// Stack effect: ( socket_id -- )
315///
316/// Closes the socket connection and frees the socket ID for reuse
317///
318/// # Safety
319/// Stack must have an Int (socket_id) on top
320#[unsafe(no_mangle)]
321pub unsafe extern "C" fn patch_seq_tcp_close(stack: Stack) -> Stack {
322 unsafe {
323 let (stack, socket_id_val) = pop(stack);
324 let socket_id = match socket_id_val {
325 Value::Int(id) => id as usize,
326 _ => panic!(
327 "tcp_close: expected Int (socket_id), got {:?}",
328 socket_id_val
329 ),
330 };
331
332 // Remove the stream and mark ID as free for reuse
333 let mut streams = STREAMS.lock().unwrap();
334 streams.free(socket_id);
335
336 stack
337 }
338}
339
340// Public re-exports with short names for internal use
341pub use patch_seq_tcp_accept as tcp_accept;
342pub use patch_seq_tcp_close as tcp_close;
343pub use patch_seq_tcp_listen as tcp_listen;
344pub use patch_seq_tcp_read as tcp_read;
345pub use patch_seq_tcp_write as tcp_write;
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arithmetic::push_int;
    use crate::scheduler::scheduler_init;

    /// Asks the OS for a TCP port that is free right now.
    ///
    /// The probe listener is dropped on return, releasing the port for the
    /// test to bind. Another process could grab it in between, but that is
    /// far less likely than a fixed port number already being taken.
    fn unused_port() -> i64 {
        let probe =
            std::net::TcpListener::bind("0.0.0.0:0").expect("failed to bind probe listener");
        let port = probe
            .local_addr()
            .expect("probe listener has no local address")
            .port();
        i64::from(port)
    }

    /// Listening on port 0 yields a non-negative listener ID.
    #[test]
    fn test_tcp_listen() {
        unsafe {
            scheduler_init();

            let stack = crate::stack::alloc_test_stack();
            let stack = push_int(stack, 0); // Port 0 = OS assigns random port
            let stack = tcp_listen(stack);

            let (_stack, result) = pop(stack);
            match result {
                Value::Int(listener_id) => {
                    assert!(listener_id >= 0, "Listener ID should be non-negative");
                }
                _ => panic!("Expected Int (listener_id), got {:?}", result),
            }
        }
    }

    /// Placeholder: a negative port makes tcp_listen panic, and a panic in an
    /// `extern "C"` function aborts the test process, so the call itself
    /// cannot be made here.
    #[test]
    fn test_tcp_listen_invalid_port_negative() {
        unsafe {
            scheduler_init();
            let stack = crate::stack::alloc_test_stack();
            let _stack = push_int(stack, -1);

            // tcp_listen(stack); // Would abort
        }
    }

    /// Placeholder: a port above 65535 makes tcp_listen panic, and a panic in
    /// an `extern "C"` function aborts the test process, so the call itself
    /// cannot be made here.
    #[test]
    fn test_tcp_listen_invalid_port_too_high() {
        unsafe {
            scheduler_init();
            let stack = crate::stack::alloc_test_stack();
            let _stack = push_int(stack, 65536);

            // tcp_listen(stack); // Would abort
        }
    }

    /// Both port 0 and an explicit non-privileged port are accepted.
    #[test]
    fn test_tcp_port_range_valid() {
        unsafe {
            scheduler_init();

            // Test port 0 (OS-assigned)
            let stack = push_int(crate::stack::alloc_test_stack(), 0);
            let stack = tcp_listen(stack);
            let (_, result) = pop(stack);
            assert!(matches!(result, Value::Int(_)));

            // Test an explicit port. A hard-coded number may already be in use
            // on the machine running the tests, and a failed bind aborts the
            // whole test binary, so ask the OS for a free one instead.
            let stack = push_int(crate::stack::alloc_test_stack(), unused_port());
            let stack = tcp_listen(stack);
            let (_, result) = pop(stack);
            assert!(matches!(result, Value::Int(_)));

            // Note: ports 1-1023 may require privileges, so the low end of
            // the range is not exercised here.
        }
    }

    /// A freed ID is handed out again before any new slot is created.
    #[test]
    fn test_socket_id_reuse_after_close() {
        unsafe {
            scheduler_init();

            // A real listener still gets a valid ID.
            let stack = push_int(crate::stack::alloc_test_stack(), 0);
            let stack = tcp_listen(stack);
            let (_stack, listener_result) = pop(stack);

            let listener_id = match listener_result {
                Value::Int(id) => id,
                _ => panic!("Expected listener ID"),
            };
            assert!(listener_id >= 0);
        }

        // Accepting a connection needs a live peer, so the reuse behaviour is
        // checked on a private registry instead of the global tables.
        let mut registry: SocketRegistry<&str> = SocketRegistry::new();
        let first = registry.allocate("a").unwrap();
        let second = registry.allocate("b").unwrap();
        assert_eq!((first, second), (0, 1));

        registry.free(first as usize);
        registry.free(first as usize); // second free must not queue the ID twice

        assert_eq!(registry.allocate("c"), Ok(0)); // recycled
        assert_eq!(registry.allocate("d"), Ok(2)); // fresh slot, not 0 again
    }

    /// Placeholder: an invalid socket ID makes tcp_read panic, which aborts
    /// the test process because tcp_read is `extern "C"`.
    #[test]
    fn test_tcp_read_invalid_socket_id() {
        unsafe {
            scheduler_init();

            // let stack = push_int(crate::stack::alloc_test_stack(), 9999);
            // tcp_read(stack); // Would abort
        }
    }

    /// Placeholder: an invalid socket ID makes tcp_write panic, which aborts
    /// the test process because tcp_write is `extern "C"`.
    #[test]
    fn test_tcp_write_invalid_socket_id() {
        unsafe {
            scheduler_init();

            // let stack = push(crate::stack::alloc_test_stack(), Value::String("test".into()));
            // let stack = push_int(stack, 9999);
            // tcp_write(stack); // Would abort
        }
    }

    /// Closing an ID with no stream behind it is a no-op, however often it is repeated.
    #[test]
    fn test_tcp_close_idempotent() {
        unsafe {
            scheduler_init();

            // Create a listener so the runtime has been exercised at least once
            let stack = push_int(crate::stack::alloc_test_stack(), 0);
            let stack = tcp_listen(stack);
            let (stack, _listener_result) = pop(stack);

            // No stream is registered under 9999; closing it must not crash,
            // and closing it again must not either.
            let stack = push_int(stack, 9999);
            let stack = tcp_close(stack);
            let stack = push_int(stack, 9999);
            let _stack = tcp_close(stack);
        }
    }

    /// The registry refuses to grow past MAX_SOCKETS but still recycles freed IDs.
    #[test]
    fn test_socket_registry_capacity() {
        assert_eq!(MAX_SOCKETS, 10_000);

        // Unit values stand in for sockets so no real connections are needed.
        let mut registry: SocketRegistry<()> = SocketRegistry::new();
        for expected in 0..MAX_SOCKETS {
            assert_eq!(registry.allocate(()), Ok(expected as i64));
        }
        assert_eq!(registry.allocate(()), Err("Maximum socket limit reached"));

        // Freeing a slot makes room again without growing the table.
        registry.free(42);
        assert_eq!(registry.allocate(()), Ok(42));
        assert_eq!(registry.allocate(()), Err("Maximum socket limit reached"));
    }

    /// Pins the configured read cap.
    #[test]
    fn test_max_read_size_limit() {
        assert_eq!(MAX_READ_SIZE, 1_048_576); // 1 MB

        // How tcp_read behaves at the cap can only be observed with a live
        // connection, which is out of scope for these unit tests.
    }
}