1use shape_value::ValueWord;
9use shape_value::heap_value::{IoHandleData, IoResource};
10use std::io::{Read, Write};
11use std::sync::Arc;
12
13pub fn io_tcp_connect(
19 args: &[ValueWord],
20 ctx: &crate::module_exports::ModuleContext,
21) -> Result<ValueWord, String> {
22 let addr = args
23 .first()
24 .and_then(|a| a.as_str())
25 .ok_or_else(|| "io.tcp_connect() requires a string address".to_string())?;
26 crate::module_exports::check_net_permission(ctx, shape_abi_v1::Permission::NetConnect, addr)?;
27
28 let stream = std::net::TcpStream::connect(addr)
29 .map_err(|e| format!("io.tcp_connect(\"{}\"): {}", addr, e))?;
30
31 let handle = IoHandleData::new_tcp_stream(stream, addr.to_string());
32 Ok(ValueWord::from_io_handle(handle))
33}
34
35pub fn io_tcp_listen(
39 args: &[ValueWord],
40 ctx: &crate::module_exports::ModuleContext,
41) -> Result<ValueWord, String> {
42 let addr = args
43 .first()
44 .and_then(|a| a.as_str())
45 .ok_or_else(|| "io.tcp_listen() requires a string address".to_string())?;
46 crate::module_exports::check_net_permission(ctx, shape_abi_v1::Permission::NetListen, addr)?;
47
48 let listener = std::net::TcpListener::bind(addr)
49 .map_err(|e| format!("io.tcp_listen(\"{}\"): {}", addr, e))?;
50
51 let handle = IoHandleData::new_tcp_listener(listener, addr.to_string());
52 Ok(ValueWord::from_io_handle(handle))
53}
54
55pub fn io_tcp_accept(
60 args: &[ValueWord],
61 ctx: &crate::module_exports::ModuleContext,
62) -> Result<ValueWord, String> {
63 crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetListen)?;
64 let handle = args
65 .first()
66 .and_then(|a| a.as_io_handle())
67 .ok_or_else(|| "io.tcp_accept() requires a TcpListener IoHandle".to_string())?;
68
69 let guard = handle
70 .resource
71 .lock()
72 .map_err(|_| "io.tcp_accept(): lock poisoned".to_string())?;
73 let resource = guard
74 .as_ref()
75 .ok_or_else(|| "io.tcp_accept(): handle is closed".to_string())?;
76
77 match resource {
78 IoResource::TcpListener(listener) => {
79 let (stream, peer) = listener
80 .accept()
81 .map_err(|e| format!("io.tcp_accept(): {}", e))?;
82 let peer_str = peer.to_string();
83 let client = IoHandleData::new_tcp_stream(stream, peer_str);
84 Ok(ValueWord::from_io_handle(client))
85 }
86 _ => Err("io.tcp_accept(): handle is not a TcpListener".to_string()),
87 }
88}
89
90pub fn io_tcp_read(
95 args: &[ValueWord],
96 ctx: &crate::module_exports::ModuleContext,
97) -> Result<ValueWord, String> {
98 crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
99 let handle = args
100 .first()
101 .and_then(|a| a.as_io_handle())
102 .ok_or_else(|| "io.tcp_read() requires a TcpStream IoHandle".to_string())?;
103
104 let n = args.get(1).and_then(|a| a.as_number_coerce());
105
106 let mut guard = handle
107 .resource
108 .lock()
109 .map_err(|_| "io.tcp_read(): lock poisoned".to_string())?;
110 let resource = guard
111 .as_mut()
112 .ok_or_else(|| "io.tcp_read(): handle is closed".to_string())?;
113
114 match resource {
115 IoResource::TcpStream(stream) => {
116 let buf_size = n.map(|v| v as usize).unwrap_or(65536);
117 let mut buf = vec![0u8; buf_size];
118 let bytes_read = stream
119 .read(&mut buf)
120 .map_err(|e| format!("io.tcp_read(): {}", e))?;
121 buf.truncate(bytes_read);
122 let s = String::from_utf8(buf)
123 .map_err(|e| format!("io.tcp_read(): invalid UTF-8: {}", e))?;
124 Ok(ValueWord::from_string(Arc::new(s)))
125 }
126 _ => Err("io.tcp_read(): handle is not a TcpStream".to_string()),
127 }
128}
129
130pub fn io_tcp_write(
134 args: &[ValueWord],
135 ctx: &crate::module_exports::ModuleContext,
136) -> Result<ValueWord, String> {
137 crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
138 let handle = args
139 .first()
140 .and_then(|a| a.as_io_handle())
141 .ok_or_else(|| "io.tcp_write() requires a TcpStream IoHandle".to_string())?;
142
143 let data = args
144 .get(1)
145 .and_then(|a| a.as_str())
146 .ok_or_else(|| "io.tcp_write() requires a string as second argument".to_string())?;
147
148 let mut guard = handle
149 .resource
150 .lock()
151 .map_err(|_| "io.tcp_write(): lock poisoned".to_string())?;
152 let resource = guard
153 .as_mut()
154 .ok_or_else(|| "io.tcp_write(): handle is closed".to_string())?;
155
156 match resource {
157 IoResource::TcpStream(stream) => {
158 let written = stream
159 .write(data.as_bytes())
160 .map_err(|e| format!("io.tcp_write(): {}", e))?;
161 Ok(ValueWord::from_i64(written as i64))
162 }
163 _ => Err("io.tcp_write(): handle is not a TcpStream".to_string()),
164 }
165}
166
167pub fn io_tcp_close(
171 args: &[ValueWord],
172 ctx: &crate::module_exports::ModuleContext,
173) -> Result<ValueWord, String> {
174 crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
175 let handle = args
176 .first()
177 .and_then(|a| a.as_io_handle())
178 .ok_or_else(|| "io.tcp_close() requires an IoHandle".to_string())?;
179
180 Ok(ValueWord::from_bool(handle.close()))
181}
182
183pub fn io_udp_bind(
189 args: &[ValueWord],
190 ctx: &crate::module_exports::ModuleContext,
191) -> Result<ValueWord, String> {
192 let addr = args
193 .first()
194 .and_then(|a| a.as_str())
195 .ok_or_else(|| "io.udp_bind() requires a string address".to_string())?;
196 crate::module_exports::check_net_permission(ctx, shape_abi_v1::Permission::NetListen, addr)?;
197
198 let socket =
199 std::net::UdpSocket::bind(addr).map_err(|e| format!("io.udp_bind(\"{}\"): {}", addr, e))?;
200
201 let local = socket
203 .local_addr()
204 .map(|a| a.to_string())
205 .unwrap_or_else(|_| addr.to_string());
206
207 let handle = IoHandleData::new_udp_socket(socket, local);
208 Ok(ValueWord::from_io_handle(handle))
209}
210
211pub fn io_udp_send(
215 args: &[ValueWord],
216 ctx: &crate::module_exports::ModuleContext,
217) -> Result<ValueWord, String> {
218 crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
219 let handle = args
220 .first()
221 .and_then(|a| a.as_io_handle())
222 .ok_or_else(|| "io.udp_send() requires a UdpSocket IoHandle".to_string())?;
223
224 let data = args
225 .get(1)
226 .and_then(|a| a.as_str())
227 .ok_or_else(|| "io.udp_send() requires a string as second argument".to_string())?;
228
229 let target = args
230 .get(2)
231 .and_then(|a| a.as_str())
232 .ok_or_else(|| "io.udp_send() requires a target address as third argument".to_string())?;
233
234 let guard = handle
235 .resource
236 .lock()
237 .map_err(|_| "io.udp_send(): lock poisoned".to_string())?;
238 let resource = guard
239 .as_ref()
240 .ok_or_else(|| "io.udp_send(): handle is closed".to_string())?;
241
242 match resource {
243 IoResource::UdpSocket(socket) => {
244 let sent = socket
245 .send_to(data.as_bytes(), target)
246 .map_err(|e| format!("io.udp_send(): {}", e))?;
247 Ok(ValueWord::from_i64(sent as i64))
248 }
249 _ => Err("io.udp_send(): handle is not a UdpSocket".to_string()),
250 }
251}
252
253pub fn io_udp_recv(
258 args: &[ValueWord],
259 ctx: &crate::module_exports::ModuleContext,
260) -> Result<ValueWord, String> {
261 crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
262 let handle = args
263 .first()
264 .and_then(|a| a.as_io_handle())
265 .ok_or_else(|| "io.udp_recv() requires a UdpSocket IoHandle".to_string())?;
266
267 let n = args
268 .get(1)
269 .and_then(|a| a.as_number_coerce())
270 .unwrap_or(65536.0) as usize;
271
272 let guard = handle
273 .resource
274 .lock()
275 .map_err(|_| "io.udp_recv(): lock poisoned".to_string())?;
276 let resource = guard
277 .as_ref()
278 .ok_or_else(|| "io.udp_recv(): handle is closed".to_string())?;
279
280 match resource {
281 IoResource::UdpSocket(socket) => {
282 let mut buf = vec![0u8; n];
283 let (bytes_read, src_addr) = socket
284 .recv_from(&mut buf)
285 .map_err(|e| format!("io.udp_recv(): {}", e))?;
286 buf.truncate(bytes_read);
287 let data = String::from_utf8(buf)
288 .map_err(|e| format!("io.udp_recv(): invalid UTF-8: {}", e))?;
289
290 let pairs: Vec<(&str, ValueWord)> = vec![
291 ("data", ValueWord::from_string(Arc::new(data))),
292 (
293 "addr",
294 ValueWord::from_string(Arc::new(src_addr.to_string())),
295 ),
296 ];
297 Ok(crate::type_schema::typed_object_from_pairs(&pairs))
298 }
299 _ => Err("io.udp_recv(): handle is not a UdpSocket".to_string()),
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ModuleContext` with every optional hook left unset.
    ///
    /// NOTE(review): the tests rely on permission checks passing with
    /// `granted_permissions` and `scope_constraints` both `None` — confirm
    /// against `module_exports`.
    fn test_ctx() -> crate::module_exports::ModuleContext<'static> {
        // Leaked deliberately so the registry reference can be `'static`.
        let registry = Box::leak(Box::new(crate::type_schema::TypeSchemaRegistry::new()));
        crate::module_exports::ModuleContext {
            schemas: registry,
            invoke_callable: None,
            raw_invoker: None,
            function_hashes: None,
            vm_state: None,
            granted_permissions: None,
            scope_constraints: None,
            set_pending_resume: None,
            set_pending_frame_resume: None,
        }
    }

    /// Wraps a string slice as a string `ValueWord` argument.
    fn str_arg(text: &str) -> ValueWord {
        ValueWord::from_string(Arc::new(text.to_string()))
    }

    /// Returns the OS-assigned local address of a listener or UDP socket handle.
    fn local_addr_of(handle: &ValueWord) -> String {
        let data = handle.as_io_handle().expect("expected an IoHandle");
        let guard = data.resource.lock().unwrap();
        let addr = match guard.as_ref().expect("handle is closed") {
            IoResource::TcpListener(l) => l.local_addr().unwrap().to_string(),
            IoResource::UdpSocket(s) => s.local_addr().unwrap().to_string(),
            _ => panic!("expected TcpListener or UdpSocket"),
        };
        addr
    }

    /// Applies a read timeout to a stream or UDP socket handle so that a
    /// misbehaving peer cannot hang the test; other handle kinds are ignored.
    fn set_read_timeout_secs(handle: &ValueWord, secs: u64) {
        let data = handle.as_io_handle().unwrap();
        let guard = data.resource.lock().unwrap();
        let timeout = Some(std::time::Duration::from_secs(secs));
        match guard.as_ref() {
            Some(IoResource::TcpStream(s)) => s.set_read_timeout(timeout).unwrap(),
            Some(IoResource::UdpSocket(s)) => s.set_read_timeout(timeout).unwrap(),
            _ => {}
        }
    }

    #[test]
    fn test_tcp_connect_refused() {
        let ctx = test_ctx();
        // Port 1 on loopback is expected to have no listener.
        let result = io_tcp_connect(&[str_arg("127.0.0.1:1")], &ctx);
        assert!(result.is_err());
    }

    #[test]
    fn test_tcp_listen_and_accept_echo() {
        let ctx = test_ctx();
        let listener_handle = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();

        // Port 0 was requested, so ask the listener which port it really got.
        let bound_addr = local_addr_of(&listener_handle);

        // Make sure accept() below blocks until the client connects.
        {
            let data = listener_handle.as_io_handle().unwrap();
            let guard = data.resource.lock().unwrap();
            if let Some(IoResource::TcpListener(l)) = guard.as_ref() {
                l.set_nonblocking(false).unwrap();
            }
        }

        let addr_clone = bound_addr.clone();
        let client_thread = std::thread::spawn(move || {
            let ctx = test_ctx();
            let stream = std::net::TcpStream::connect(&addr_clone).unwrap();
            let handle = IoHandleData::new_tcp_stream(stream, addr_clone);
            let nb = ValueWord::from_io_handle(handle);

            io_tcp_write(&[nb.clone(), str_arg("ping")], &ctx).unwrap();

            set_read_timeout_secs(&nb, 5);
            let response = io_tcp_read(&[nb.clone()], &ctx).unwrap();
            assert_eq!(response.as_str().unwrap(), "ping");

            io_tcp_close(&[nb], &ctx).unwrap();
        });

        let server_conn = io_tcp_accept(&[listener_handle.clone()], &ctx).unwrap();
        set_read_timeout_secs(&server_conn, 5);

        let data = io_tcp_read(&[server_conn.clone()], &ctx).unwrap();
        assert_eq!(data.as_str().unwrap(), "ping");

        // Echo the payload back to the client.
        io_tcp_write(&[server_conn.clone(), data], &ctx).unwrap();

        client_thread.join().unwrap();

        io_tcp_close(&[server_conn], &ctx).unwrap();
        io_tcp_close(&[listener_handle], &ctx).unwrap();
    }

    #[test]
    fn test_tcp_close_returns_false_on_double_close() {
        let ctx = test_ctx();
        let listener = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();

        let first = io_tcp_close(&[listener.clone()], &ctx).unwrap();
        assert_eq!(first.as_bool(), Some(true));

        let second = io_tcp_close(&[listener], &ctx).unwrap();
        assert_eq!(second.as_bool(), Some(false));
    }

    #[test]
    fn test_tcp_read_on_closed_handle() {
        let ctx = test_ctx();
        let listener = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();
        let addr = local_addr_of(&listener);

        let conn = io_tcp_connect(&[str_arg(&addr)], &ctx).unwrap();
        io_tcp_close(&[conn.clone()], &ctx).unwrap();

        let result = io_tcp_read(&[conn], &ctx);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("closed"));
    }

    #[test]
    fn test_udp_send_recv() {
        let ctx = test_ctx();
        let sock_a = io_udp_bind(&[str_arg("127.0.0.1:0")], &ctx).unwrap();
        let sock_b = io_udp_bind(&[str_arg("127.0.0.1:0")], &ctx).unwrap();

        let addr_b = local_addr_of(&sock_b);
        set_read_timeout_secs(&sock_b, 5);

        let sent = io_udp_send(
            &[sock_a.clone(), str_arg("hello udp"), str_arg(&addr_b)],
            &ctx,
        )
        .unwrap();
        assert_eq!(sent.as_number_coerce(), Some(9.0));

        let recv_result = io_udp_recv(&[sock_b.clone()], &ctx).unwrap();
        assert_eq!(recv_result.type_name(), "object");

        io_tcp_close(&[sock_a], &ctx).unwrap();
        io_tcp_close(&[sock_b], &ctx).unwrap();
    }

    #[test]
    fn test_udp_bind_ephemeral() {
        let ctx = test_ctx();
        let handle = io_udp_bind(&[str_arg("127.0.0.1:0")], &ctx).unwrap();
        assert_eq!(handle.type_name(), "io_handle");

        let h = handle.as_io_handle().unwrap();
        // The stored path must carry the real port, not the requested `0`.
        assert!(h.path.starts_with("127.0.0.1:"));
        // `rsplit` yields the last segment first, so the port is reached
        // without walking the whole iterator.
        let port: u16 = h.path.rsplit(':').next().unwrap().parse().unwrap();
        assert!(port > 0);

        io_tcp_close(&[handle], &ctx).unwrap();
    }

    #[test]
    fn test_tcp_accept_on_non_listener() {
        let ctx = test_ctx();
        let listener = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();
        let addr = local_addr_of(&listener);

        let stream = io_tcp_connect(&[str_arg(&addr)], &ctx).unwrap();

        // A stream handle must be rejected by accept().
        let result = io_tcp_accept(&[stream.clone()], &ctx);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("not a TcpListener"));

        io_tcp_close(&[stream], &ctx).unwrap();
        io_tcp_close(&[listener], &ctx).unwrap();
    }
}