1use crate::stack::{Stack, pop, push};
9use crate::value::Value;
10use may::net::{TcpListener, TcpStream};
11use std::io::{Read, Write};
12use std::sync::Mutex;
13
14const MAX_SOCKETS: usize = 10_000;
16
17const MAX_READ_SIZE: usize = 1_048_576; struct SocketRegistry<T> {
22 sockets: Vec<Option<T>>,
23 free_ids: Vec<usize>,
24}
25
26impl<T> SocketRegistry<T> {
27 const fn new() -> Self {
28 Self {
29 sockets: Vec::new(),
30 free_ids: Vec::new(),
31 }
32 }
33
34 fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
35 if let Some(id) = self.free_ids.pop() {
37 self.sockets[id] = Some(socket);
38 return Ok(id as i64);
39 }
40
41 if self.sockets.len() >= MAX_SOCKETS {
43 return Err("Maximum socket limit reached");
44 }
45
46 let id = self.sockets.len();
48 self.sockets.push(Some(socket));
49 Ok(id as i64)
50 }
51
52 fn get_mut(&mut self, id: usize) -> Option<&mut Option<T>> {
53 self.sockets.get_mut(id)
54 }
55
56 fn free(&mut self, id: usize) {
57 if let Some(slot) = self.sockets.get_mut(id)
58 && slot.is_some()
59 {
60 *slot = None;
61 self.free_ids.push(id);
62 }
63 }
64}
65
66static LISTENERS: Mutex<SocketRegistry<TcpListener>> = Mutex::new(SocketRegistry::new());
68static STREAMS: Mutex<SocketRegistry<TcpStream>> = Mutex::new(SocketRegistry::new());
69
70#[unsafe(no_mangle)]
80pub unsafe extern "C" fn patch_seq_tcp_listen(stack: Stack) -> Stack {
81 unsafe {
82 let (stack, port_val) = pop(stack);
83 let port = match port_val {
84 Value::Int(p) => p,
85 _ => {
86 let stack = push(stack, Value::Int(0));
88 return push(stack, Value::Bool(false));
89 }
90 };
91
92 if !(0..=65535).contains(&port) {
94 let stack = push(stack, Value::Int(0));
95 return push(stack, Value::Bool(false));
96 }
97
98 let addr = format!("0.0.0.0:{}", port);
100 let listener = match TcpListener::bind(&addr) {
101 Ok(l) => l,
102 Err(_) => {
103 let stack = push(stack, Value::Int(0));
104 return push(stack, Value::Bool(false));
105 }
106 };
107
108 let mut listeners = LISTENERS.lock().unwrap();
110 match listeners.allocate(listener) {
111 Ok(listener_id) => {
112 let stack = push(stack, Value::Int(listener_id));
113 push(stack, Value::Bool(true))
114 }
115 Err(_) => {
116 let stack = push(stack, Value::Int(0));
117 push(stack, Value::Bool(false))
118 }
119 }
120 }
121}
122
123#[unsafe(no_mangle)]
133pub unsafe extern "C" fn patch_seq_tcp_accept(stack: Stack) -> Stack {
134 unsafe {
135 let (stack, listener_id_val) = pop(stack);
136 let listener_id = match listener_id_val {
137 Value::Int(id) => id as usize,
138 _ => {
139 let stack = push(stack, Value::Int(0));
140 return push(stack, Value::Bool(false));
141 }
142 };
143
144 let listener = {
146 let mut listeners = LISTENERS.lock().unwrap();
147 match listeners.get_mut(listener_id).and_then(|opt| opt.take()) {
148 Some(l) => l,
149 None => {
150 let stack = push(stack, Value::Int(0));
151 return push(stack, Value::Bool(false));
152 }
153 }
154 };
155 let (stream, _addr) = match listener.accept() {
159 Ok(result) => result,
160 Err(_) => {
161 let mut listeners = LISTENERS.lock().unwrap();
163 if let Some(slot) = listeners.get_mut(listener_id) {
164 *slot = Some(listener);
165 }
166 let stack = push(stack, Value::Int(0));
167 return push(stack, Value::Bool(false));
168 }
169 };
170
171 {
173 let mut listeners = LISTENERS.lock().unwrap();
174 if let Some(slot) = listeners.get_mut(listener_id) {
175 *slot = Some(listener);
176 }
177 }
178
179 let mut streams = STREAMS.lock().unwrap();
181 match streams.allocate(stream) {
182 Ok(client_id) => {
183 let stack = push(stack, Value::Int(client_id));
184 push(stack, Value::Bool(true))
185 }
186 Err(_) => {
187 let stack = push(stack, Value::Int(0));
188 push(stack, Value::Bool(false))
189 }
190 }
191 }
192}
193
194#[unsafe(no_mangle)]
204pub unsafe extern "C" fn patch_seq_tcp_read(stack: Stack) -> Stack {
205 unsafe {
206 let (stack, socket_id_val) = pop(stack);
207 let socket_id = match socket_id_val {
208 Value::Int(id) => id as usize,
209 _ => {
210 let stack = push(stack, Value::String("".into()));
211 return push(stack, Value::Bool(false));
212 }
213 };
214
215 let mut stream = {
217 let mut streams = STREAMS.lock().unwrap();
218 match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
219 Some(s) => s,
220 None => {
221 let stack = push(stack, Value::String("".into()));
222 return push(stack, Value::Bool(false));
223 }
224 }
225 };
226 let mut buffer = Vec::new();
232 let mut chunk = [0u8; 4096];
233 let mut read_error = false;
234
235 loop {
238 if buffer.len() >= MAX_READ_SIZE {
240 read_error = true;
241 break;
242 }
243
244 match stream.read(&mut chunk) {
245 Ok(0) => {
246 break;
247 }
248 Ok(n) => {
249 let bytes_to_add = n.min(MAX_READ_SIZE.saturating_sub(buffer.len()));
251 buffer.extend_from_slice(&chunk[..bytes_to_add]);
252 if bytes_to_add < n {
253 break; }
255 break;
259 }
260 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
261 if buffer.is_empty() {
264 may::coroutine::yield_now();
265 continue;
266 }
267 break;
269 }
270 Err(_) => {
271 read_error = true;
272 break;
273 }
274 }
275 }
276
277 {
279 let mut streams = STREAMS.lock().unwrap();
280 if let Some(slot) = streams.get_mut(socket_id) {
281 *slot = Some(stream);
282 }
283 }
284
285 if read_error {
286 let stack = push(stack, Value::String("".into()));
287 return push(stack, Value::Bool(false));
288 }
289
290 match String::from_utf8(buffer) {
291 Ok(data) => {
292 let stack = push(stack, Value::String(data.into()));
293 push(stack, Value::Bool(true))
294 }
295 Err(_) => {
296 let stack = push(stack, Value::String("".into()));
297 push(stack, Value::Bool(false))
298 }
299 }
300 }
301}
302
303#[unsafe(no_mangle)]
313pub unsafe extern "C" fn patch_seq_tcp_write(stack: Stack) -> Stack {
314 unsafe {
315 let (stack, socket_id_val) = pop(stack);
316 let socket_id = match socket_id_val {
317 Value::Int(id) => id as usize,
318 _ => {
319 return push(stack, Value::Bool(false));
320 }
321 };
322
323 let (stack, data_val) = pop(stack);
324 let data = match data_val {
325 Value::String(s) => s,
326 _ => {
327 return push(stack, Value::Bool(false));
328 }
329 };
330
331 let mut stream = {
333 let mut streams = STREAMS.lock().unwrap();
334 match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
335 Some(s) => s,
336 None => {
337 return push(stack, Value::Bool(false));
338 }
339 }
340 };
341 let write_result = stream.write_all(data.as_str().as_bytes());
345 let flush_result = if write_result.is_ok() {
346 stream.flush()
347 } else {
348 write_result
349 };
350
351 {
353 let mut streams = STREAMS.lock().unwrap();
354 if let Some(slot) = streams.get_mut(socket_id) {
355 *slot = Some(stream);
356 }
357 }
358
359 push(stack, Value::Bool(flush_result.is_ok()))
360 }
361}
362
363#[unsafe(no_mangle)]
373pub unsafe extern "C" fn patch_seq_tcp_close(stack: Stack) -> Stack {
374 unsafe {
375 let (stack, socket_id_val) = pop(stack);
376 let socket_id = match socket_id_val {
377 Value::Int(id) => id as usize,
378 _ => {
379 return push(stack, Value::Bool(false));
380 }
381 };
382
383 let mut streams = STREAMS.lock().unwrap();
385 let existed = streams
386 .get_mut(socket_id)
387 .map(|slot| slot.is_some())
388 .unwrap_or(false);
389
390 if existed {
391 streams.free(socket_id);
392 }
393
394 push(stack, Value::Bool(existed))
395 }
396}
397
398pub use patch_seq_tcp_accept as tcp_accept;
400pub use patch_seq_tcp_close as tcp_close;
401pub use patch_seq_tcp_listen as tcp_listen;
402pub use patch_seq_tcp_read as tcp_read;
403pub use patch_seq_tcp_write as tcp_write;
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use crate::arithmetic::push_int;
409 use crate::scheduler::scheduler_init;
410
411 #[test]
412 fn test_tcp_listen() {
413 unsafe {
414 scheduler_init();
415
416 let stack = crate::stack::alloc_test_stack();
417 let stack = push_int(stack, 0); let stack = tcp_listen(stack);
419
420 let (stack, success) = pop(stack);
422 assert!(
423 matches!(success, Value::Bool(true)),
424 "tcp_listen should succeed"
425 );
426
427 let (_stack, result) = pop(stack);
428 match result {
429 Value::Int(listener_id) => {
430 assert!(listener_id >= 0, "Listener ID should be non-negative");
431 }
432 _ => panic!("Expected Int (listener_id), got {:?}", result),
433 }
434 }
435 }
436
437 #[test]
438 fn test_tcp_listen_invalid_port_negative() {
439 unsafe {
440 scheduler_init();
441 let stack = crate::stack::alloc_test_stack();
442 let stack = push_int(stack, -1);
443 let stack = tcp_listen(stack);
444
445 let (stack, success) = pop(stack);
447 assert!(
448 matches!(success, Value::Bool(false)),
449 "Invalid port should return false"
450 );
451 let (_stack, result) = pop(stack);
452 assert!(
453 matches!(result, Value::Int(0)),
454 "Invalid port should return 0"
455 );
456 }
457 }
458
459 #[test]
460 fn test_tcp_listen_invalid_port_too_high() {
461 unsafe {
462 scheduler_init();
463 let stack = crate::stack::alloc_test_stack();
464 let stack = push_int(stack, 65536);
465 let stack = tcp_listen(stack);
466
467 let (stack, success) = pop(stack);
469 assert!(
470 matches!(success, Value::Bool(false)),
471 "Invalid port should return false"
472 );
473 let (_stack, result) = pop(stack);
474 assert!(
475 matches!(result, Value::Int(0)),
476 "Invalid port should return 0"
477 );
478 }
479 }
480
481 #[test]
482 fn test_tcp_port_range_valid() {
483 unsafe {
484 scheduler_init();
485
486 let stack = push_int(crate::stack::alloc_test_stack(), 0);
488 let stack = tcp_listen(stack);
489 let (stack, success) = pop(stack);
490 assert!(matches!(success, Value::Bool(true)));
491 let (_, result) = pop(stack);
492 assert!(matches!(result, Value::Int(_)));
493
494 let stack = push_int(crate::stack::alloc_test_stack(), 9999);
497 let stack = tcp_listen(stack);
498 let (stack, success) = pop(stack);
499 assert!(matches!(success, Value::Bool(true)));
500 let (_, result) = pop(stack);
501 assert!(matches!(result, Value::Int(_)));
502
503 }
507 }
508
509 #[test]
510 fn test_socket_id_reuse_after_close() {
511 unsafe {
512 scheduler_init();
513
514 let stack = push_int(crate::stack::alloc_test_stack(), 0);
516 let stack = tcp_listen(stack);
517 let (stack, success) = pop(stack);
518 assert!(matches!(success, Value::Bool(true)));
519 let (_stack, listener_result) = pop(stack);
520
521 let listener_id = match listener_result {
522 Value::Int(id) => id,
523 _ => panic!("Expected listener ID"),
524 };
525
526 assert!(listener_id >= 0);
528
529 }
534 }
535
536 #[test]
537 fn test_tcp_read_invalid_socket_id() {
538 unsafe {
539 scheduler_init();
540
541 let stack = push_int(crate::stack::alloc_test_stack(), 9999);
543 let stack = tcp_read(stack);
544
545 let (stack, success) = pop(stack);
546 assert!(
547 matches!(success, Value::Bool(false)),
548 "Invalid socket should return false"
549 );
550 let (_stack, result) = pop(stack);
551 match result {
552 Value::String(s) => assert_eq!(s.as_str(), ""),
553 _ => panic!("Expected empty string"),
554 }
555 }
556 }
557
558 #[test]
559 fn test_tcp_write_invalid_socket_id() {
560 unsafe {
561 scheduler_init();
562
563 let stack = push(
565 crate::stack::alloc_test_stack(),
566 Value::String("test".into()),
567 );
568 let stack = push_int(stack, 9999);
569 let stack = tcp_write(stack);
570
571 let (_stack, success) = pop(stack);
572 assert!(
573 matches!(success, Value::Bool(false)),
574 "Invalid socket should return false"
575 );
576 }
577 }
578
579 #[test]
580 fn test_tcp_close_idempotent() {
581 unsafe {
582 scheduler_init();
583
584 let stack = push_int(crate::stack::alloc_test_stack(), 0);
586 let stack = tcp_listen(stack);
587 let (stack, success) = pop(stack);
588 assert!(matches!(success, Value::Bool(true)));
589 let (stack, _listener_result) = pop(stack);
590
591 let stack = push_int(stack, 9999);
593 let stack = tcp_close(stack);
594
595 let (_stack, success) = pop(stack);
596 assert!(
597 matches!(success, Value::Bool(false)),
598 "Invalid socket close should return false"
599 );
600 }
601 }
602
603 #[test]
604 fn test_socket_registry_capacity() {
605 assert_eq!(MAX_SOCKETS, 10_000);
616 }
617
618 #[test]
619 fn test_max_read_size_limit() {
620 assert_eq!(MAX_READ_SIZE, 1_048_576); }
627}