Skip to main content

shape_runtime/stdlib_io/
network_ops.rs

1//! Network operation implementations for the io module.
2//!
3//! TCP: tcp_connect, tcp_listen, tcp_accept, tcp_read, tcp_write, tcp_close
4//! UDP: udp_bind, udp_send, udp_recv
5//!
6//! All operations use blocking std::net (not tokio).
7
8use shape_value::ValueWord;
9use shape_value::heap_value::{IoHandleData, IoResource};
10use std::io::{Read, Write};
11use std::sync::Arc;
12
13// ── TCP ─────────────────────────────────────────────────────────────────────
14
15/// io.tcp_connect(addr) -> IoHandle
16///
17/// Connect to a TCP server at `addr` (e.g. "127.0.0.1:8080").
18pub fn io_tcp_connect(
19    args: &[ValueWord],
20    ctx: &crate::module_exports::ModuleContext,
21) -> Result<ValueWord, String> {
22    let addr = args
23        .first()
24        .and_then(|a| a.as_str())
25        .ok_or_else(|| "io.tcp_connect() requires a string address".to_string())?;
26    crate::module_exports::check_net_permission(ctx, shape_abi_v1::Permission::NetConnect, addr)?;
27
28    let stream = std::net::TcpStream::connect(addr)
29        .map_err(|e| format!("io.tcp_connect(\"{}\"): {}", addr, e))?;
30
31    let handle = IoHandleData::new_tcp_stream(stream, addr.to_string());
32    Ok(ValueWord::from_io_handle(handle))
33}
34
35/// io.tcp_listen(addr) -> IoHandle
36///
37/// Bind a TCP listener to `addr` (e.g. "0.0.0.0:8080").
38pub fn io_tcp_listen(
39    args: &[ValueWord],
40    ctx: &crate::module_exports::ModuleContext,
41) -> Result<ValueWord, String> {
42    let addr = args
43        .first()
44        .and_then(|a| a.as_str())
45        .ok_or_else(|| "io.tcp_listen() requires a string address".to_string())?;
46    crate::module_exports::check_net_permission(ctx, shape_abi_v1::Permission::NetListen, addr)?;
47
48    let listener = std::net::TcpListener::bind(addr)
49        .map_err(|e| format!("io.tcp_listen(\"{}\"): {}", addr, e))?;
50
51    let handle = IoHandleData::new_tcp_listener(listener, addr.to_string());
52    Ok(ValueWord::from_io_handle(handle))
53}
54
55/// io.tcp_accept(listener) -> IoHandle
56///
57/// Accept the next incoming connection on a TcpListener.
58/// Returns a new IoHandle (TcpStream) for the accepted connection.
59pub fn io_tcp_accept(
60    args: &[ValueWord],
61    ctx: &crate::module_exports::ModuleContext,
62) -> Result<ValueWord, String> {
63    crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetListen)?;
64    let handle = args
65        .first()
66        .and_then(|a| a.as_io_handle())
67        .ok_or_else(|| "io.tcp_accept() requires a TcpListener IoHandle".to_string())?;
68
69    let guard = handle
70        .resource
71        .lock()
72        .map_err(|_| "io.tcp_accept(): lock poisoned".to_string())?;
73    let resource = guard
74        .as_ref()
75        .ok_or_else(|| "io.tcp_accept(): handle is closed".to_string())?;
76
77    match resource {
78        IoResource::TcpListener(listener) => {
79            let (stream, peer) = listener
80                .accept()
81                .map_err(|e| format!("io.tcp_accept(): {}", e))?;
82            let peer_str = peer.to_string();
83            let client = IoHandleData::new_tcp_stream(stream, peer_str);
84            Ok(ValueWord::from_io_handle(client))
85        }
86        _ => Err("io.tcp_accept(): handle is not a TcpListener".to_string()),
87    }
88}
89
90/// io.tcp_read(handle, n?) -> string
91///
92/// Read from a TCP stream. If `n` is given, read up to `n` bytes;
93/// otherwise read whatever is available in a single recv (up to 64KB).
94pub fn io_tcp_read(
95    args: &[ValueWord],
96    ctx: &crate::module_exports::ModuleContext,
97) -> Result<ValueWord, String> {
98    crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
99    let handle = args
100        .first()
101        .and_then(|a| a.as_io_handle())
102        .ok_or_else(|| "io.tcp_read() requires a TcpStream IoHandle".to_string())?;
103
104    let n = args.get(1).and_then(|a| a.as_number_coerce());
105
106    let mut guard = handle
107        .resource
108        .lock()
109        .map_err(|_| "io.tcp_read(): lock poisoned".to_string())?;
110    let resource = guard
111        .as_mut()
112        .ok_or_else(|| "io.tcp_read(): handle is closed".to_string())?;
113
114    match resource {
115        IoResource::TcpStream(stream) => {
116            let buf_size = n.map(|v| v as usize).unwrap_or(65536);
117            let mut buf = vec![0u8; buf_size];
118            let bytes_read = stream
119                .read(&mut buf)
120                .map_err(|e| format!("io.tcp_read(): {}", e))?;
121            buf.truncate(bytes_read);
122            let s = String::from_utf8(buf)
123                .map_err(|e| format!("io.tcp_read(): invalid UTF-8: {}", e))?;
124            Ok(ValueWord::from_string(Arc::new(s)))
125        }
126        _ => Err("io.tcp_read(): handle is not a TcpStream".to_string()),
127    }
128}
129
130/// io.tcp_write(handle, data) -> int
131///
132/// Write a string to a TCP stream. Returns bytes written.
133pub fn io_tcp_write(
134    args: &[ValueWord],
135    ctx: &crate::module_exports::ModuleContext,
136) -> Result<ValueWord, String> {
137    crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
138    let handle = args
139        .first()
140        .and_then(|a| a.as_io_handle())
141        .ok_or_else(|| "io.tcp_write() requires a TcpStream IoHandle".to_string())?;
142
143    let data = args
144        .get(1)
145        .and_then(|a| a.as_str())
146        .ok_or_else(|| "io.tcp_write() requires a string as second argument".to_string())?;
147
148    let mut guard = handle
149        .resource
150        .lock()
151        .map_err(|_| "io.tcp_write(): lock poisoned".to_string())?;
152    let resource = guard
153        .as_mut()
154        .ok_or_else(|| "io.tcp_write(): handle is closed".to_string())?;
155
156    match resource {
157        IoResource::TcpStream(stream) => {
158            let written = stream
159                .write(data.as_bytes())
160                .map_err(|e| format!("io.tcp_write(): {}", e))?;
161            Ok(ValueWord::from_i64(written as i64))
162        }
163        _ => Err("io.tcp_write(): handle is not a TcpStream".to_string()),
164    }
165}
166
167/// io.tcp_close(handle) -> bool
168///
169/// Shut down and close a TCP stream or listener. Returns true if it was open.
170pub fn io_tcp_close(
171    args: &[ValueWord],
172    ctx: &crate::module_exports::ModuleContext,
173) -> Result<ValueWord, String> {
174    crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
175    let handle = args
176        .first()
177        .and_then(|a| a.as_io_handle())
178        .ok_or_else(|| "io.tcp_close() requires an IoHandle".to_string())?;
179
180    Ok(ValueWord::from_bool(handle.close()))
181}
182
183// ── UDP ─────────────────────────────────────────────────────────────────────
184
185/// io.udp_bind(addr) -> IoHandle
186///
187/// Bind a UDP socket to `addr` (e.g. "0.0.0.0:0" for ephemeral port).
188pub fn io_udp_bind(
189    args: &[ValueWord],
190    ctx: &crate::module_exports::ModuleContext,
191) -> Result<ValueWord, String> {
192    let addr = args
193        .first()
194        .and_then(|a| a.as_str())
195        .ok_or_else(|| "io.udp_bind() requires a string address".to_string())?;
196    crate::module_exports::check_net_permission(ctx, shape_abi_v1::Permission::NetListen, addr)?;
197
198    let socket =
199        std::net::UdpSocket::bind(addr).map_err(|e| format!("io.udp_bind(\"{}\"): {}", addr, e))?;
200
201    // Get the actual bound address (useful when binding to port 0)
202    let local = socket
203        .local_addr()
204        .map(|a| a.to_string())
205        .unwrap_or_else(|_| addr.to_string());
206
207    let handle = IoHandleData::new_udp_socket(socket, local);
208    Ok(ValueWord::from_io_handle(handle))
209}
210
211/// io.udp_send(handle, data, target_addr) -> int
212///
213/// Send a datagram to `target_addr`. Returns bytes sent.
214pub fn io_udp_send(
215    args: &[ValueWord],
216    ctx: &crate::module_exports::ModuleContext,
217) -> Result<ValueWord, String> {
218    crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
219    let handle = args
220        .first()
221        .and_then(|a| a.as_io_handle())
222        .ok_or_else(|| "io.udp_send() requires a UdpSocket IoHandle".to_string())?;
223
224    let data = args
225        .get(1)
226        .and_then(|a| a.as_str())
227        .ok_or_else(|| "io.udp_send() requires a string as second argument".to_string())?;
228
229    let target = args
230        .get(2)
231        .and_then(|a| a.as_str())
232        .ok_or_else(|| "io.udp_send() requires a target address as third argument".to_string())?;
233
234    let guard = handle
235        .resource
236        .lock()
237        .map_err(|_| "io.udp_send(): lock poisoned".to_string())?;
238    let resource = guard
239        .as_ref()
240        .ok_or_else(|| "io.udp_send(): handle is closed".to_string())?;
241
242    match resource {
243        IoResource::UdpSocket(socket) => {
244            let sent = socket
245                .send_to(data.as_bytes(), target)
246                .map_err(|e| format!("io.udp_send(): {}", e))?;
247            Ok(ValueWord::from_i64(sent as i64))
248        }
249        _ => Err("io.udp_send(): handle is not a UdpSocket".to_string()),
250    }
251}
252
253/// io.udp_recv(handle, n?) -> object { data: string, addr: string }
254///
255/// Receive a datagram. Returns an object with the data and sender address.
256/// `n` is the max receive buffer (default 65536).
257pub fn io_udp_recv(
258    args: &[ValueWord],
259    ctx: &crate::module_exports::ModuleContext,
260) -> Result<ValueWord, String> {
261    crate::module_exports::check_permission(ctx, shape_abi_v1::Permission::NetConnect)?;
262    let handle = args
263        .first()
264        .and_then(|a| a.as_io_handle())
265        .ok_or_else(|| "io.udp_recv() requires a UdpSocket IoHandle".to_string())?;
266
267    let n = args
268        .get(1)
269        .and_then(|a| a.as_number_coerce())
270        .unwrap_or(65536.0) as usize;
271
272    let guard = handle
273        .resource
274        .lock()
275        .map_err(|_| "io.udp_recv(): lock poisoned".to_string())?;
276    let resource = guard
277        .as_ref()
278        .ok_or_else(|| "io.udp_recv(): handle is closed".to_string())?;
279
280    match resource {
281        IoResource::UdpSocket(socket) => {
282            let mut buf = vec![0u8; n];
283            let (bytes_read, src_addr) = socket
284                .recv_from(&mut buf)
285                .map_err(|e| format!("io.udp_recv(): {}", e))?;
286            buf.truncate(bytes_read);
287            let data = String::from_utf8(buf)
288                .map_err(|e| format!("io.udp_recv(): invalid UTF-8: {}", e))?;
289
290            let pairs: Vec<(&str, ValueWord)> = vec![
291                ("data", ValueWord::from_string(Arc::new(data))),
292                (
293                    "addr",
294                    ValueWord::from_string(Arc::new(src_addr.to_string())),
295                ),
296            ];
297            Ok(crate::type_schema::typed_object_from_pairs(&pairs))
298        }
299        _ => Err("io.udp_recv(): handle is not a UdpSocket".to_string()),
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Upper bound on how long a test socket may block on a read, so a broken
    /// test fails instead of hanging forever.
    const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    /// Build a `ModuleContext` with every optional hook and constraint unset.
    ///
    /// The schema registry is leaked to obtain the `'static` borrow the
    /// context needs; acceptable for short-lived test processes.
    fn test_ctx() -> crate::module_exports::ModuleContext<'static> {
        let registry = Box::leak(Box::new(crate::type_schema::TypeSchemaRegistry::new()));
        crate::module_exports::ModuleContext {
            schemas: registry,
            invoke_callable: None,
            raw_invoker: None,
            function_hashes: None,
            vm_state: None,
            granted_permissions: None,
            scope_constraints: None,
            set_pending_resume: None,
            set_pending_frame_resume: None,
        }
    }

    /// Wrap a string as a `ValueWord` argument.
    fn str_arg(text: &str) -> ValueWord {
        ValueWord::from_string(Arc::new(text.to_string()))
    }

    /// Return the OS-assigned local address of a TcpListener handle.
    fn listener_addr(listener: &ValueWord) -> String {
        let data = listener.as_io_handle().unwrap();
        let guard = data.resource.lock().unwrap();
        match guard.as_ref().unwrap() {
            IoResource::TcpListener(l) => l.local_addr().unwrap().to_string(),
            _ => panic!("expected TcpListener"),
        }
    }

    /// Apply `READ_TIMEOUT` to a TcpStream handle; other handle kinds are left alone.
    fn set_stream_read_timeout(conn: &ValueWord) {
        let h = conn.as_io_handle().unwrap();
        let g = h.resource.lock().unwrap();
        if let Some(IoResource::TcpStream(s)) = g.as_ref() {
            s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
        }
    }

    #[test]
    fn test_tcp_connect_refused() {
        let ctx = test_ctx();
        // Connecting to a port that nothing listens on should fail
        let result = io_tcp_connect(&[str_arg("127.0.0.1:1")], &ctx);
        assert!(result.is_err());
    }

    #[test]
    fn test_tcp_listen_and_accept_echo() {
        let ctx = test_ctx();
        // Bind listener to ephemeral port
        let listener_handle = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();

        // Get the actual bound address from the listener
        let bound_addr = listener_addr(&listener_handle);

        // Make blocking mode explicit: accept must wait for the client thread.
        // A non-blocking listener would race with the connect below and could
        // fail with WouldBlock before the client arrives.
        {
            let data = listener_handle.as_io_handle().unwrap();
            let guard = data.resource.lock().unwrap();
            if let Some(IoResource::TcpListener(l)) = guard.as_ref() {
                l.set_nonblocking(false).unwrap();
            }
        }

        // Connect from a client in a background thread
        let addr_clone = bound_addr.clone();
        let client_thread = std::thread::spawn(move || {
            let ctx = test_ctx();
            let stream = std::net::TcpStream::connect(&addr_clone).unwrap();
            let handle = IoHandleData::new_tcp_stream(stream, addr_clone);
            let nb = ValueWord::from_io_handle(handle);

            // Write
            io_tcp_write(&[nb.clone(), str_arg("ping")], &ctx).unwrap();

            // Read the echo back, with a timeout so we don't hang forever
            set_stream_read_timeout(&nb);
            let response = io_tcp_read(&[nb.clone()], &ctx).unwrap();
            assert_eq!(response.as_str().unwrap(), "ping");

            io_tcp_close(&[nb], &ctx).unwrap();
        });

        // Accept the incoming connection
        let server_conn = io_tcp_accept(&[listener_handle.clone()], &ctx).unwrap();

        // Set read timeout on server connection
        set_stream_read_timeout(&server_conn);

        // Read from client
        let data = io_tcp_read(&[server_conn.clone()], &ctx).unwrap();
        assert_eq!(data.as_str().unwrap(), "ping");

        // Echo back
        io_tcp_write(&[server_conn.clone(), data], &ctx).unwrap();

        // Wait for client thread
        client_thread.join().unwrap();

        // Close everything
        io_tcp_close(&[server_conn], &ctx).unwrap();
        io_tcp_close(&[listener_handle], &ctx).unwrap();
    }

    #[test]
    fn test_tcp_close_returns_false_on_double_close() {
        let ctx = test_ctx();
        let listener = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();

        let first = io_tcp_close(&[listener.clone()], &ctx).unwrap();
        assert_eq!(first.as_bool(), Some(true));

        let second = io_tcp_close(&[listener], &ctx).unwrap();
        assert_eq!(second.as_bool(), Some(false));
    }

    #[test]
    fn test_tcp_read_on_closed_handle() {
        let ctx = test_ctx();
        let listener = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();

        // Get the addr for a client connect
        let addr = listener_addr(&listener);

        let conn = io_tcp_connect(&[str_arg(&addr)], &ctx).unwrap();
        io_tcp_close(&[conn.clone()], &ctx).unwrap();

        let result = io_tcp_read(&[conn], &ctx);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("closed"));
    }

    #[test]
    fn test_udp_send_recv() {
        let ctx = test_ctx();
        // Bind two sockets
        let sock_a = io_udp_bind(&[str_arg("127.0.0.1:0")], &ctx).unwrap();
        let sock_b = io_udp_bind(&[str_arg("127.0.0.1:0")], &ctx).unwrap();

        // Get sock_b's address
        let addr_b = {
            let h = sock_b.as_io_handle().unwrap();
            let g = h.resource.lock().unwrap();
            match g.as_ref().unwrap() {
                IoResource::UdpSocket(s) => s.local_addr().unwrap().to_string(),
                _ => panic!("expected UdpSocket"),
            }
        };

        // Set recv timeout on sock_b
        {
            let h = sock_b.as_io_handle().unwrap();
            let g = h.resource.lock().unwrap();
            if let Some(IoResource::UdpSocket(s)) = g.as_ref() {
                s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
            }
        }

        // Send from A to B
        let sent = io_udp_send(
            &[sock_a.clone(), str_arg("hello udp"), str_arg(&addr_b)],
            &ctx,
        )
        .unwrap();
        assert_eq!(sent.as_number_coerce(), Some(9.0)); // "hello udp" = 9 bytes

        // Receive on B
        let recv_result = io_udp_recv(&[sock_b.clone()], &ctx).unwrap();
        // recv_result is a TypedObject { data, addr }; only its type is
        // verified here, not the payload contents.
        assert_eq!(recv_result.type_name(), "object");

        // Close both (io_tcp_close accepts any IoHandle)
        io_tcp_close(&[sock_a], &ctx).unwrap();
        io_tcp_close(&[sock_b], &ctx).unwrap();
    }

    #[test]
    fn test_udp_bind_ephemeral() {
        let ctx = test_ctx();
        let handle = io_udp_bind(&[str_arg("127.0.0.1:0")], &ctx).unwrap();
        assert_eq!(handle.type_name(), "io_handle");

        // The path should show the actual bound address, not "127.0.0.1:0"
        let h = handle.as_io_handle().unwrap();
        assert!(h.path.starts_with("127.0.0.1:"));
        // Port should be non-zero since OS picks one. `rsplit` yields the
        // final segment directly instead of walking the whole iterator.
        let port: u16 = h.path.rsplit(':').next().unwrap().parse().unwrap();
        assert!(port > 0);

        io_tcp_close(&[handle], &ctx).unwrap();
    }

    #[test]
    fn test_tcp_accept_on_non_listener() {
        let ctx = test_ctx();
        // Create a TCP stream handle and try accept -- should fail
        let listener = io_tcp_listen(&[str_arg("127.0.0.1:0")], &ctx).unwrap();
        let addr = listener_addr(&listener);

        let stream = io_tcp_connect(&[str_arg(&addr)], &ctx).unwrap();

        // Accept on the *stream* handle should fail
        let result = io_tcp_accept(&[stream.clone()], &ctx);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("not a TcpListener"));

        io_tcp_close(&[stream], &ctx).unwrap();
        io_tcp_close(&[listener], &ctx).unwrap();
    }
}
573}