1use crate::stack::{Stack, pop, push};
9use crate::value::Value;
10use may::net::{TcpListener, TcpStream};
11use std::io::{Read, Write};
12use std::sync::Mutex;
13
14const MAX_SOCKETS: usize = 10_000;
16
17const MAX_READ_SIZE: usize = 1_048_576; struct SocketRegistry<T> {
22 sockets: Vec<Option<T>>,
23 free_ids: Vec<usize>,
24}
25
26impl<T> SocketRegistry<T> {
27 const fn new() -> Self {
28 Self {
29 sockets: Vec::new(),
30 free_ids: Vec::new(),
31 }
32 }
33
34 fn allocate(&mut self, socket: T) -> Result<i64, &'static str> {
35 if let Some(id) = self.free_ids.pop() {
37 self.sockets[id] = Some(socket);
38 return Ok(id as i64);
39 }
40
41 if self.sockets.len() >= MAX_SOCKETS {
43 return Err("Maximum socket limit reached");
44 }
45
46 let id = self.sockets.len();
48 self.sockets.push(Some(socket));
49 Ok(id as i64)
50 }
51
52 fn get_mut(&mut self, id: usize) -> Option<&mut Option<T>> {
53 self.sockets.get_mut(id)
54 }
55
56 fn free(&mut self, id: usize) {
57 if let Some(slot) = self.sockets.get_mut(id)
58 && slot.is_some()
59 {
60 *slot = None;
61 self.free_ids.push(id);
62 }
63 }
64}
65
66static LISTENERS: Mutex<SocketRegistry<TcpListener>> = Mutex::new(SocketRegistry::new());
68static STREAMS: Mutex<SocketRegistry<TcpStream>> = Mutex::new(SocketRegistry::new());
69
70#[unsafe(no_mangle)]
80pub unsafe extern "C" fn patch_seq_tcp_listen(stack: Stack) -> Stack {
81 unsafe {
82 let (stack, port_val) = pop(stack);
83 let port = match port_val {
84 Value::Int(p) => p,
85 _ => {
86 let stack = push(stack, Value::Int(0));
88 return push(stack, Value::Bool(false));
89 }
90 };
91
92 if !(0..=65535).contains(&port) {
94 let stack = push(stack, Value::Int(0));
95 return push(stack, Value::Bool(false));
96 }
97
98 let addr = format!("0.0.0.0:{}", port);
100 let listener = match TcpListener::bind(&addr) {
101 Ok(l) => l,
102 Err(_) => {
103 let stack = push(stack, Value::Int(0));
104 return push(stack, Value::Bool(false));
105 }
106 };
107
108 let mut listeners = LISTENERS.lock().unwrap();
110 match listeners.allocate(listener) {
111 Ok(listener_id) => {
112 let stack = push(stack, Value::Int(listener_id));
113 push(stack, Value::Bool(true))
114 }
115 Err(_) => {
116 let stack = push(stack, Value::Int(0));
117 push(stack, Value::Bool(false))
118 }
119 }
120 }
121}
122
123#[unsafe(no_mangle)]
133pub unsafe extern "C" fn patch_seq_tcp_accept(stack: Stack) -> Stack {
134 unsafe {
135 let (stack, listener_id_val) = pop(stack);
136 let listener_id = match listener_id_val {
137 Value::Int(id) => id as usize,
138 _ => {
139 let stack = push(stack, Value::Int(0));
140 return push(stack, Value::Bool(false));
141 }
142 };
143
144 let listener = {
146 let mut listeners = LISTENERS.lock().unwrap();
147 match listeners.get_mut(listener_id).and_then(|opt| opt.take()) {
148 Some(l) => l,
149 None => {
150 let stack = push(stack, Value::Int(0));
151 return push(stack, Value::Bool(false));
152 }
153 }
154 };
155 let (stream, _addr) = match listener.accept() {
159 Ok(result) => result,
160 Err(_) => {
161 let mut listeners = LISTENERS.lock().unwrap();
163 if let Some(slot) = listeners.get_mut(listener_id) {
164 *slot = Some(listener);
165 }
166 let stack = push(stack, Value::Int(0));
167 return push(stack, Value::Bool(false));
168 }
169 };
170
171 {
173 let mut listeners = LISTENERS.lock().unwrap();
174 if let Some(slot) = listeners.get_mut(listener_id) {
175 *slot = Some(listener);
176 }
177 }
178
179 let mut streams = STREAMS.lock().unwrap();
181 match streams.allocate(stream) {
182 Ok(client_id) => {
183 let stack = push(stack, Value::Int(client_id));
184 push(stack, Value::Bool(true))
185 }
186 Err(_) => {
187 let stack = push(stack, Value::Int(0));
188 push(stack, Value::Bool(false))
189 }
190 }
191 }
192}
193
194#[unsafe(no_mangle)]
204pub unsafe extern "C" fn patch_seq_tcp_read(stack: Stack) -> Stack {
205 unsafe {
206 let (stack, socket_id_val) = pop(stack);
207 let socket_id = match socket_id_val {
208 Value::Int(id) => id as usize,
209 _ => {
210 let stack = push(stack, Value::String("".into()));
211 return push(stack, Value::Bool(false));
212 }
213 };
214
215 let mut stream = {
217 let mut streams = STREAMS.lock().unwrap();
218 match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
219 Some(s) => s,
220 None => {
221 let stack = push(stack, Value::String("".into()));
222 return push(stack, Value::Bool(false));
223 }
224 }
225 };
226 let mut buffer = Vec::new();
232 let mut chunk = [0u8; 4096];
233 let mut read_error = false;
234
235 loop {
238 if buffer.len() >= MAX_READ_SIZE {
240 read_error = true;
241 break;
242 }
243
244 match stream.read(&mut chunk) {
245 Ok(0) => {
246 break;
247 }
248 Ok(n) => {
249 let bytes_to_add = n.min(MAX_READ_SIZE.saturating_sub(buffer.len()));
251 buffer.extend_from_slice(&chunk[..bytes_to_add]);
252 if bytes_to_add < n {
253 break; }
255 break;
259 }
260 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
261 if buffer.is_empty() {
264 may::coroutine::yield_now();
265 continue;
266 }
267 break;
269 }
270 Err(_) => {
271 read_error = true;
272 break;
273 }
274 }
275 }
276
277 {
279 let mut streams = STREAMS.lock().unwrap();
280 if let Some(slot) = streams.get_mut(socket_id) {
281 *slot = Some(stream);
282 }
283 }
284
285 if read_error {
286 let stack = push(stack, Value::String("".into()));
287 return push(stack, Value::Bool(false));
288 }
289
290 let stack = push(stack, Value::String(crate::seqstring::global_bytes(buffer)));
295 push(stack, Value::Bool(true))
296 }
297}
298
299#[unsafe(no_mangle)]
309pub unsafe extern "C" fn patch_seq_tcp_write(stack: Stack) -> Stack {
310 unsafe {
311 let (stack, socket_id_val) = pop(stack);
312 let socket_id = match socket_id_val {
313 Value::Int(id) => id as usize,
314 _ => {
315 return push(stack, Value::Bool(false));
316 }
317 };
318
319 let (stack, data_val) = pop(stack);
320 let data = match data_val {
321 Value::String(s) => s,
322 _ => {
323 return push(stack, Value::Bool(false));
324 }
325 };
326
327 let mut stream = {
329 let mut streams = STREAMS.lock().unwrap();
330 match streams.get_mut(socket_id).and_then(|opt| opt.take()) {
331 Some(s) => s,
332 None => {
333 return push(stack, Value::Bool(false));
334 }
335 }
336 };
337 let write_result = stream.write_all(data.as_bytes());
341 let flush_result = if write_result.is_ok() {
342 stream.flush()
343 } else {
344 write_result
345 };
346
347 {
349 let mut streams = STREAMS.lock().unwrap();
350 if let Some(slot) = streams.get_mut(socket_id) {
351 *slot = Some(stream);
352 }
353 }
354
355 push(stack, Value::Bool(flush_result.is_ok()))
356 }
357}
358
359#[unsafe(no_mangle)]
369pub unsafe extern "C" fn patch_seq_tcp_close(stack: Stack) -> Stack {
370 unsafe {
371 let (stack, socket_id_val) = pop(stack);
372 let socket_id = match socket_id_val {
373 Value::Int(id) => id as usize,
374 _ => {
375 return push(stack, Value::Bool(false));
376 }
377 };
378
379 let mut streams = STREAMS.lock().unwrap();
381 let existed = streams
382 .get_mut(socket_id)
383 .map(|slot| slot.is_some())
384 .unwrap_or(false);
385
386 if existed {
387 streams.free(socket_id);
388 }
389
390 push(stack, Value::Bool(existed))
391 }
392}
393
394pub use patch_seq_tcp_accept as tcp_accept;
396pub use patch_seq_tcp_close as tcp_close;
397pub use patch_seq_tcp_listen as tcp_listen;
398pub use patch_seq_tcp_read as tcp_read;
399pub use patch_seq_tcp_write as tcp_write;
400
401#[cfg(test)]
402mod tests;