1use crate::stack::{Stack, pop, push};
9use crate::value::Value;
10use may::net::{TcpListener, TcpStream};
11use std::io::{Read, Write};
12use std::sync::Mutex;
13
/// Largest number of slots a single registry is allowed to grow to.
const MAX_SOCKETS: usize = 10_000;

/// Cap, in bytes, applied to the data kept from a read.
const MAX_READ_SIZE: usize = 1_048_576;

/// Slot table that maps small integer ids to sockets of type `T`.
///
/// An id is simply an index into `sockets`. Ids of released slots are
/// remembered in `free_ids` and handed out again before the table grows.
struct SocketRegistry<T> {
    /// Slot `i` holds the socket with id `i`. A slot is `None` when it has
    /// been released, or while a caller has temporarily taken the socket out
    /// through [`SocketRegistry::get_mut`].
    sockets: Vec<Option<T>>,
    /// Ids released by [`SocketRegistry::free`]; the most recently released
    /// id is reused first.
    free_ids: Vec<usize>,
}

impl<T> SocketRegistry<T> {
    /// Creates an empty registry. `const` so it can initialise a `static`.
    const fn new() -> Self {
        Self {
            sockets: Vec::new(),
            free_ids: Vec::new(),
        }
    }

    /// Stores `socket` and returns the id assigned to it.
    ///
    /// A previously released id is recycled when one is available; otherwise
    /// the table grows by one slot.
    ///
    /// # Errors
    ///
    /// Returns `Err("Maximum socket limit reached")` when no id can be
    /// recycled and the table already has `MAX_SOCKETS` slots. The socket
    /// passed in is dropped in that case.
    fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
        let id = match self.free_ids.pop() {
            Some(recycled) => {
                // Ids only enter `free_ids` via `free`, which checks that the
                // slot exists, so this index is in bounds.
                self.sockets[recycled] = Some(socket);
                recycled
            }
            None => {
                if self.sockets.len() >= MAX_SOCKETS {
                    return Err("Maximum socket limit reached");
                }
                self.sockets.push(Some(socket));
                self.sockets.len() - 1
            }
        };
        Ok(id as i64)
    }

    /// Returns mutable access to the slot for `id`, or `None` when `id` is
    /// past the end of the table. The slot itself may still be empty.
    fn get_mut(&mut self, id: usize) -> Option<&mut Option<T>> {
        self.sockets.get_mut(id)
    }

    /// Releases the socket stored under `id` and makes the id reusable.
    ///
    /// Does nothing when `id` is out of range or the slot is already empty,
    /// so an id can never be queued for reuse twice.
    fn free(&mut self, id: usize) {
        if let Some(slot) = self.sockets.get_mut(id) {
            if slot.take().is_some() {
                self.free_ids.push(id);
            }
        }
    }
}
65
66static LISTENERS: Mutex<SocketRegistry<TcpListener>> = Mutex::new(SocketRegistry::new());
68static STREAMS: Mutex<SocketRegistry<TcpStream>> = Mutex::new(SocketRegistry::new());
69
70#[unsafe(no_mangle)]
80pub unsafe extern "C" fn patch_seq_tcp_listen(stack: Stack) -> Stack {
81 unsafe {
82 let (stack, port_val) = pop(stack);
83 let port = match port_val {
84 Value::Int(p) => p,
85 _ => {
86 let stack = push(stack, Value::Int(0));
88 return push(stack, Value::Bool(false));
89 }
90 };
91
92 if !(0..=65535).contains(&port) {
94 let stack = push(stack, Value::Int(0));
95 return push(stack, Value::Bool(false));
96 }
97
98 let addr = format!("0.0.0.0:{}", port);
100 let listener = match TcpListener::bind(&addr) {
101 Ok(l) => l,
102 Err(_) => {
103 let stack = push(stack, Value::Int(0));
104 return push(stack, Value::Bool(false));
105 }
106 };
107
108 let mut listeners = LISTENERS.lock().unwrap();
110 match listeners.allocate(listener) {
111 Ok(listener_id) => {
112 let stack = push(stack, Value::Int(listener_id));
113 push(stack, Value::Bool(true))
114 }
115 Err(_) => {
116 let stack = push(stack, Value::Int(0));
117 push(stack, Value::Bool(false))
118 }
119 }
120 }
121}
122
123#[unsafe(no_mangle)]
133pub unsafe extern "C" fn patch_seq_tcp_accept(stack: Stack) -> Stack {
134 unsafe {
135 let (stack, listener_id_val) = pop(stack);
136 let listener_id = match listener_id_val {
137 Value::Int(id) => id as usize,
138 _ => {
139 let stack = push(stack, Value::Int(0));
140 return push(stack, Value::Bool(false));
141 }
142 };
143
144 let listener = {
146 let mut listeners = LISTENERS.lock().unwrap();
147 match listeners.get_mut(listener_id).and_then(|opt| opt.take()) {
148 Some(l) => l,
149 None => {
150 let stack = push(stack, Value::Int(0));
151 return push(stack, Value::Bool(false));
152 }
153 }
154 };
155 let (stream, _addr) = match listener.accept() {
159 Ok(result) => result,
160 Err(_) => {
161 let mut listeners = LISTENERS.lock().unwrap();
163 if let Some(slot) = listeners.get_mut(listener_id) {
164 *slot = Some(listener);
165 }
166 let stack = push(stack, Value::Int(0));
167 return push(stack, Value::Bool(false));
168 }
169 };
170
171 {
173 let mut listeners = LISTENERS.lock().unwrap();
174 if let Some(slot) = listeners.get_mut(listener_id) {
175 *slot = Some(listener);
176 }
177 }
178
179 let mut streams = STREAMS.lock().unwrap();
181 match streams.allocate(stream) {
182 Ok(client_id) => {
183 let stack = push(stack, Value::Int(client_id));
184 push(stack, Value::Bool(true))
185 }
186 Err(_) => {
187 let stack = push(stack, Value::Int(0));
188 push(stack, Value::Bool(false))
189 }
190 }
191 }
192}
193
194#[unsafe(no_mangle)]
204pub unsafe extern "C" fn patch_seq_tcp_read(stack: Stack) -> Stack {
205 unsafe {
206 let (stack, socket_id_val) = pop(stack);
207 let socket_id = match socket_id_val {
208 Value::Int(id) => id as usize,
209 _ => {
210 let stack = push(stack, Value::String("".into()));
211 return push(stack, Value::Bool(false));
212 }
213 };
214
215 let mut stream = {
217 let mut streams = STREAMS.lock().unwrap();
218 match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
219 Some(s) => s,
220 None => {
221 let stack = push(stack, Value::String("".into()));
222 return push(stack, Value::Bool(false));
223 }
224 }
225 };
226 let mut buffer = Vec::new();
232 let mut chunk = [0u8; 4096];
233 let mut read_error = false;
234
235 loop {
238 if buffer.len() >= MAX_READ_SIZE {
240 read_error = true;
241 break;
242 }
243
244 match stream.read(&mut chunk) {
245 Ok(0) => {
246 break;
247 }
248 Ok(n) => {
249 let bytes_to_add = n.min(MAX_READ_SIZE.saturating_sub(buffer.len()));
251 buffer.extend_from_slice(&chunk[..bytes_to_add]);
252 if bytes_to_add < n {
253 break; }
255 break;
259 }
260 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
261 if buffer.is_empty() {
264 may::coroutine::yield_now();
265 continue;
266 }
267 break;
269 }
270 Err(_) => {
271 read_error = true;
272 break;
273 }
274 }
275 }
276
277 {
279 let mut streams = STREAMS.lock().unwrap();
280 if let Some(slot) = streams.get_mut(socket_id) {
281 *slot = Some(stream);
282 }
283 }
284
285 if read_error {
286 let stack = push(stack, Value::String("".into()));
287 return push(stack, Value::Bool(false));
288 }
289
290 match String::from_utf8(buffer) {
291 Ok(data) => {
292 let stack = push(stack, Value::String(data.into()));
293 push(stack, Value::Bool(true))
294 }
295 Err(_) => {
296 let stack = push(stack, Value::String("".into()));
297 push(stack, Value::Bool(false))
298 }
299 }
300 }
301}
302
303#[unsafe(no_mangle)]
313pub unsafe extern "C" fn patch_seq_tcp_write(stack: Stack) -> Stack {
314 unsafe {
315 let (stack, socket_id_val) = pop(stack);
316 let socket_id = match socket_id_val {
317 Value::Int(id) => id as usize,
318 _ => {
319 return push(stack, Value::Bool(false));
320 }
321 };
322
323 let (stack, data_val) = pop(stack);
324 let data = match data_val {
325 Value::String(s) => s,
326 _ => {
327 return push(stack, Value::Bool(false));
328 }
329 };
330
331 let mut stream = {
333 let mut streams = STREAMS.lock().unwrap();
334 match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
335 Some(s) => s,
336 None => {
337 return push(stack, Value::Bool(false));
338 }
339 }
340 };
341 let write_result = stream.write_all(data.as_str().as_bytes());
345 let flush_result = if write_result.is_ok() {
346 stream.flush()
347 } else {
348 write_result
349 };
350
351 {
353 let mut streams = STREAMS.lock().unwrap();
354 if let Some(slot) = streams.get_mut(socket_id) {
355 *slot = Some(stream);
356 }
357 }
358
359 push(stack, Value::Bool(flush_result.is_ok()))
360 }
361}
362
363#[unsafe(no_mangle)]
373pub unsafe extern "C" fn patch_seq_tcp_close(stack: Stack) -> Stack {
374 unsafe {
375 let (stack, socket_id_val) = pop(stack);
376 let socket_id = match socket_id_val {
377 Value::Int(id) => id as usize,
378 _ => {
379 return push(stack, Value::Bool(false));
380 }
381 };
382
383 let mut streams = STREAMS.lock().unwrap();
385 let existed = streams
386 .get_mut(socket_id)
387 .map(|slot| slot.is_some())
388 .unwrap_or(false);
389
390 if existed {
391 streams.free(socket_id);
392 }
393
394 push(stack, Value::Bool(existed))
395 }
396}
397
398pub use patch_seq_tcp_accept as tcp_accept;
400pub use patch_seq_tcp_close as tcp_close;
401pub use patch_seq_tcp_listen as tcp_listen;
402pub use patch_seq_tcp_read as tcp_read;
403pub use patch_seq_tcp_write as tcp_write;
404
405#[cfg(test)]
406mod tests;