rasi_default/
net.rs

1//! The implementation of [`Network`] syscall.
2
3use std::{
4    io::{self, Read, Write},
5    ops::Deref,
6};
7
8use mio::{
9    net::{TcpListener, TcpStream, UdpSocket},
10    Interest, Token,
11};
12use rasi_syscall::{ready, register_global_network, Handle, Network};
13
14use crate::{
15    reactor::{global_reactor, would_block, MioSocket},
16    TokenSequence,
17};
18
19/// This type implements the system call [`Network`] using the underlying [`mio`].
20#[derive(Default)]
21pub struct MioNetwork {}
22
23impl Network for MioNetwork {
24    fn udp_join_multicast_v4(
25        &self,
26        handle: &rasi_syscall::Handle,
27        multiaddr: &std::net::Ipv4Addr,
28        interface: &std::net::Ipv4Addr,
29    ) -> io::Result<()> {
30        let socket = handle
31            .downcast::<MioSocket<UdpSocket>>()
32            .expect("Expect udpsocket");
33
34        socket.join_multicast_v4(multiaddr, interface)
35    }
36
37    fn udp_join_multicast_v6(
38        &self,
39        handle: &rasi_syscall::Handle,
40        multiaddr: &std::net::Ipv6Addr,
41        interface: u32,
42    ) -> io::Result<()> {
43        let socket = handle
44            .downcast::<MioSocket<UdpSocket>>()
45            .expect("Expect udpsocket");
46
47        socket.join_multicast_v6(multiaddr, interface)
48    }
49
50    fn udp_leave_multicast_v4(
51        &self,
52        handle: &rasi_syscall::Handle,
53        multiaddr: &std::net::Ipv4Addr,
54        interface: &std::net::Ipv4Addr,
55    ) -> io::Result<()> {
56        let socket = handle
57            .downcast::<MioSocket<UdpSocket>>()
58            .expect("Expect udpsocket");
59
60        socket.leave_multicast_v4(multiaddr, interface)
61    }
62
63    fn udp_leave_multicast_v6(
64        &self,
65        handle: &rasi_syscall::Handle,
66        multiaddr: &std::net::Ipv6Addr,
67        interface: u32,
68    ) -> io::Result<()> {
69        let socket = handle
70            .downcast::<MioSocket<UdpSocket>>()
71            .expect("Expect udpsocket");
72
73        socket.leave_multicast_v6(multiaddr, interface)
74    }
75    fn udp_set_broadcast(&self, handle: &rasi_syscall::Handle, on: bool) -> std::io::Result<()> {
76        let socket = handle
77            .downcast::<MioSocket<UdpSocket>>()
78            .expect("Expect udpsocket");
79
80        socket.set_broadcast(on)
81    }
82
83    fn udp_broadcast(&self, handle: &rasi_syscall::Handle) -> std::io::Result<bool> {
84        let socket = handle
85            .downcast::<MioSocket<UdpSocket>>()
86            .expect("Expect udpsocket");
87
88        socket.broadcast()
89    }
90
91    fn udp_ttl(&self, handle: &rasi_syscall::Handle) -> std::io::Result<u32> {
92        let socket = handle
93            .downcast::<MioSocket<UdpSocket>>()
94            .expect("Expect udpsocket");
95
96        socket.ttl()
97    }
98
99    fn udp_set_ttl(&self, handle: &rasi_syscall::Handle, ttl: u32) -> std::io::Result<()> {
100        let socket = handle
101            .downcast::<MioSocket<UdpSocket>>()
102            .expect("Expect udpsocket");
103
104        socket.set_ttl(ttl)
105    }
106
107    fn udp_local_addr(
108        &self,
109        handle: &rasi_syscall::Handle,
110    ) -> std::io::Result<std::net::SocketAddr> {
111        let socket = handle
112            .downcast::<MioSocket<UdpSocket>>()
113            .expect("Expect udpsocket");
114
115        socket.local_addr()
116    }
117
118    fn udp_bind(
119        &self,
120        _waker: std::task::Waker,
121        laddrs: &[std::net::SocketAddr],
122    ) -> rasi_syscall::CancelablePoll<std::io::Result<rasi_syscall::Handle>> {
123        ready(|| {
124            let std_socket = std::net::UdpSocket::bind(laddrs)?;
125
126            std_socket.set_nonblocking(true)?;
127
128            let mut socket = UdpSocket::from_std(std_socket);
129            let token = Token::next();
130
131            global_reactor().register(
132                &mut socket,
133                token,
134                Interest::READABLE.add(Interest::WRITABLE),
135            )?;
136
137            Ok(Handle::new(MioSocket::from((token, socket))))
138        })
139    }
140
141    fn udp_send_to(
142        &self,
143        waker: std::task::Waker,
144        socket: &rasi_syscall::Handle,
145        buf: &[u8],
146        target: std::net::SocketAddr,
147    ) -> rasi_syscall::CancelablePoll<std::io::Result<usize>> {
148        let socket = socket
149            .downcast::<MioSocket<UdpSocket>>()
150            .expect("Expect udpsocket.");
151
152        would_block(socket.token, waker, Interest::WRITABLE, || {
153            socket.send_to(buf, target)
154        })
155    }
156
157    fn udp_recv_from(
158        &self,
159        waker: std::task::Waker,
160        socket: &rasi_syscall::Handle,
161        buf: &mut [u8],
162    ) -> rasi_syscall::CancelablePoll<std::io::Result<(usize, std::net::SocketAddr)>> {
163        let socket = socket
164            .downcast::<MioSocket<UdpSocket>>()
165            .expect("Expect udpsocket.");
166
167        would_block(socket.token, waker, Interest::READABLE, || {
168            socket.recv_from(buf)
169        })
170    }
171
172    fn tcp_listener_bind(
173        &self,
174        _waker: std::task::Waker,
175        laddrs: &[std::net::SocketAddr],
176    ) -> rasi_syscall::CancelablePoll<std::io::Result<rasi_syscall::Handle>> {
177        ready(|| {
178            let std_socket = std::net::TcpListener::bind(laddrs)?;
179
180            std_socket.set_nonblocking(true)?;
181
182            let mut socket = TcpListener::from_std(std_socket);
183            let token = Token::next();
184
185            global_reactor().register(
186                &mut socket,
187                token,
188                Interest::READABLE.add(Interest::WRITABLE),
189            )?;
190
191            Ok(Handle::new(MioSocket::from((token, socket))))
192        })
193    }
194
195    fn tcp_listener_local_addr(
196        &self,
197        handle: &rasi_syscall::Handle,
198    ) -> std::io::Result<std::net::SocketAddr> {
199        let socket = handle
200            .downcast::<MioSocket<TcpListener>>()
201            .expect("Expect TcpListener.");
202
203        socket.local_addr()
204    }
205
206    fn tcp_listener_ttl(&self, handle: &rasi_syscall::Handle) -> std::io::Result<u32> {
207        let socket = handle
208            .downcast::<MioSocket<TcpListener>>()
209            .expect("Expect TcpListener.");
210
211        socket.ttl()
212    }
213
214    fn tcp_listener_set_ttl(&self, handle: &rasi_syscall::Handle, ttl: u32) -> std::io::Result<()> {
215        let socket = handle
216            .downcast::<MioSocket<TcpListener>>()
217            .expect("Expect TcpListener.");
218
219        socket.set_ttl(ttl)
220    }
221
222    fn tcp_listener_accept(
223        &self,
224        waker: std::task::Waker,
225        handle: &rasi_syscall::Handle,
226    ) -> rasi_syscall::CancelablePoll<std::io::Result<(rasi_syscall::Handle, std::net::SocketAddr)>>
227    {
228        let socket = handle
229            .downcast::<MioSocket<TcpListener>>()
230            .expect("Expect TcpListener.");
231
232        would_block(socket.token, waker, Interest::READABLE, || {
233            match socket.accept() {
234                Ok((mut stream, raddr)) => {
235                    let token = Token::next();
236
237                    global_reactor().register(
238                        &mut stream,
239                        token,
240                        Interest::READABLE.add(Interest::WRITABLE),
241                    )?;
242
243                    Ok((Handle::new(MioSocket::from((token, stream))), raddr))
244                }
245                Err(err) => Err(err),
246            }
247        })
248    }
249
250    fn tcp_stream_connect(
251        &self,
252        _waker: std::task::Waker,
253        raddrs: &[std::net::SocketAddr],
254    ) -> rasi_syscall::CancelablePoll<std::io::Result<rasi_syscall::Handle>> {
255        ready(|| {
256            let std_socket = std::net::TcpStream::connect(raddrs)?;
257
258            std_socket.set_nonblocking(true)?;
259
260            let mut socket = TcpStream::from_std(std_socket);
261            let token = Token::next();
262
263            global_reactor().register(
264                &mut socket,
265                token,
266                Interest::READABLE.add(Interest::WRITABLE),
267            )?;
268
269            Ok(Handle::new(MioSocket::from((token, socket))))
270        })
271    }
272
273    fn tcp_stream_write(
274        &self,
275        waker: std::task::Waker,
276        socket: &rasi_syscall::Handle,
277        buf: &[u8],
278    ) -> rasi_syscall::CancelablePoll<std::io::Result<usize>> {
279        let socket = socket
280            .downcast::<MioSocket<TcpStream>>()
281            .expect("Expect TcpStream.");
282
283        would_block(socket.token, waker, Interest::WRITABLE, || {
284            socket.deref().write(buf)
285        })
286    }
287
288    fn tcp_stream_read(
289        &self,
290        waker: std::task::Waker,
291        socket: &rasi_syscall::Handle,
292        buf: &mut [u8],
293    ) -> rasi_syscall::CancelablePoll<std::io::Result<usize>> {
294        let socket = socket
295            .downcast::<MioSocket<TcpStream>>()
296            .expect("Expect TcpStream.");
297
298        would_block(socket.token, waker, Interest::READABLE, || {
299            socket.deref().read(buf)
300        })
301    }
302
303    fn tcp_stream_local_addr(
304        &self,
305        handle: &rasi_syscall::Handle,
306    ) -> std::io::Result<std::net::SocketAddr> {
307        let socket = handle
308            .downcast::<MioSocket<TcpStream>>()
309            .expect("Expect TcpStream.");
310
311        socket.local_addr()
312    }
313
314    fn tcp_stream_remote_addr(
315        &self,
316        handle: &rasi_syscall::Handle,
317    ) -> std::io::Result<std::net::SocketAddr> {
318        let socket = handle
319            .downcast::<MioSocket<TcpStream>>()
320            .expect("Expect TcpStream.");
321
322        socket.peer_addr()
323    }
324
325    fn tcp_stream_nodelay(&self, handle: &rasi_syscall::Handle) -> std::io::Result<bool> {
326        let socket = handle
327            .downcast::<MioSocket<TcpStream>>()
328            .expect("Expect TcpStream.");
329
330        socket.nodelay()
331    }
332
333    fn tcp_stream_set_nodelay(
334        &self,
335        handle: &rasi_syscall::Handle,
336        nodelay: bool,
337    ) -> std::io::Result<()> {
338        let socket = handle
339            .downcast::<MioSocket<TcpStream>>()
340            .expect("Expect TcpStream.");
341
342        socket.set_nodelay(nodelay)
343    }
344
345    fn tcp_stream_ttl(&self, handle: &rasi_syscall::Handle) -> std::io::Result<u32> {
346        let socket = handle
347            .downcast::<MioSocket<TcpStream>>()
348            .expect("Expect TcpStream.");
349
350        socket.ttl()
351    }
352
353    fn tcp_stream_set_ttl(&self, handle: &rasi_syscall::Handle, ttl: u32) -> std::io::Result<()> {
354        let socket = handle
355            .downcast::<MioSocket<TcpStream>>()
356            .expect("Expect TcpStream.");
357
358        socket.set_ttl(ttl)
359    }
360
361    fn tcp_stream_shutdown(
362        &self,
363        handle: &rasi_syscall::Handle,
364        how: std::net::Shutdown,
365    ) -> std::io::Result<()> {
366        let socket = handle
367            .downcast::<MioSocket<TcpStream>>()
368            .expect("Expect TcpStream.");
369
370        socket.shutdown(how)
371    }
372
373    #[cfg(all(unix, feature = "unix_socket"))]
374    fn unix_listener_bind(
375        &self,
376        _waker: std::task::Waker,
377        path: &std::path::Path,
378    ) -> rasi_syscall::CancelablePoll<io::Result<Handle>> {
379        ready(|| {
380            let mut socket = mio::net::UnixListener::bind(path)?;
381
382            let token = Token::next();
383
384            global_reactor().register(
385                &mut socket,
386                token,
387                Interest::READABLE.add(Interest::WRITABLE),
388            )?;
389
390            Ok(Handle::new(MioSocket::from((token, socket))))
391        })
392    }
393
394    #[cfg(all(unix, feature = "unix_socket"))]
395    fn unix_listener_accept(
396        &self,
397        waker: std::task::Waker,
398        handle: &Handle,
399    ) -> rasi_syscall::CancelablePoll<io::Result<(Handle, std::os::unix::net::SocketAddr)>> {
400        use mio::net::UnixListener;
401
402        let socket = handle
403            .downcast::<MioSocket<UnixListener>>()
404            .expect("Expect TcpListener.");
405
406        would_block(socket.token, waker, Interest::READABLE, || {
407            match socket.accept() {
408                Ok((mut stream, raddr)) => {
409                    let token = Token::next();
410
411                    global_reactor().register(
412                        &mut stream,
413                        token,
414                        Interest::READABLE.add(Interest::WRITABLE),
415                    )?;
416
417                    let raddr = mio_unix_address_to_std_unix_addr(raddr)?;
418
419                    Ok((Handle::new(MioSocket::from((token, stream))), raddr))
420                }
421                Err(err) => Err(err),
422            }
423        })
424    }
425
426    #[cfg(all(unix, feature = "unix_socket"))]
427    fn unix_listener_local_addr(
428        &self,
429        handle: &Handle,
430    ) -> io::Result<std::os::unix::net::SocketAddr> {
431        use mio::net::UnixListener;
432
433        let socket = handle
434            .downcast::<MioSocket<UnixListener>>()
435            .expect("Expect UnixListener.");
436
437        mio_unix_address_to_std_unix_addr(socket.local_addr()?)
438    }
439
440    #[cfg(all(unix, feature = "unix_socket"))]
441    fn unix_stream_connect(
442        &self,
443        _waker: std::task::Waker,
444        path: &std::path::Path,
445    ) -> rasi_syscall::CancelablePoll<io::Result<Handle>> {
446        ready(|| {
447            let mut socket = mio::net::UnixStream::connect(path)?;
448
449            let token = Token::next();
450
451            global_reactor().register(
452                &mut socket,
453                token,
454                Interest::READABLE.add(Interest::WRITABLE),
455            )?;
456
457            Ok(Handle::new(MioSocket::from((token, socket))))
458        })
459    }
460
461    #[cfg(all(unix, feature = "unix_socket"))]
462    fn unix_stream_local_addr(
463        &self,
464        handle: &Handle,
465    ) -> io::Result<std::os::unix::net::SocketAddr> {
466        use mio::net::UnixStream;
467
468        let socket = handle
469            .downcast::<MioSocket<UnixStream>>()
470            .expect("Expect UnixStream.");
471
472        mio_unix_address_to_std_unix_addr(socket.local_addr()?)
473    }
474
475    #[cfg(all(unix, feature = "unix_socket"))]
476    fn unix_stream_peer_addr(&self, handle: &Handle) -> io::Result<std::os::unix::net::SocketAddr> {
477        use mio::net::UnixStream;
478
479        let socket = handle
480            .downcast::<MioSocket<UnixStream>>()
481            .expect("Expect UnixStream.");
482
483        mio_unix_address_to_std_unix_addr(socket.peer_addr()?)
484    }
485
486    #[cfg(all(unix, feature = "unix_socket"))]
487    fn unix_stream_shutdown(&self, handle: &Handle, how: std::net::Shutdown) -> io::Result<()> {
488        use mio::net::UnixStream;
489
490        let socket = handle
491            .downcast::<MioSocket<UnixStream>>()
492            .expect("Expect UnixStream.");
493
494        socket.shutdown(how)
495    }
496
497    #[cfg(all(unix, feature = "unix_socket"))]
498    fn unix_stream_write(
499        &self,
500        waker: std::task::Waker,
501        socket: &Handle,
502        buf: &[u8],
503    ) -> rasi_syscall::CancelablePoll<io::Result<usize>> {
504        use mio::net::UnixStream;
505
506        let socket = socket
507            .downcast::<MioSocket<UnixStream>>()
508            .expect("Expect UnixStream.");
509
510        would_block(socket.token, waker, Interest::WRITABLE, || {
511            socket.deref().write(buf)
512        })
513    }
514
515    #[cfg(all(unix, feature = "unix_socket"))]
516    fn unix_stream_read(
517        &self,
518        waker: std::task::Waker,
519        socket: &Handle,
520        buf: &mut [u8],
521    ) -> rasi_syscall::CancelablePoll<io::Result<usize>> {
522        use mio::net::UnixStream;
523
524        let socket = socket
525            .downcast::<MioSocket<UnixStream>>()
526            .expect("Expect UnixStream.");
527
528        would_block(socket.token, waker, Interest::WRITABLE, || {
529            socket.deref().read(buf)
530        })
531    }
532}
533
534#[cfg(unix)]
535fn mio_unix_address_to_std_unix_addr(
536    addr: mio::net::SocketAddr,
537) -> io::Result<std::os::unix::net::SocketAddr> {
538    if let Some(path) = addr.as_abstract_namespace() {
539        std::os::unix::net::SocketAddr::from_pathname(format!("{}", path.escape_ascii()))
540    } else if let Some(path) = addr.as_pathname() {
541        std::os::unix::net::SocketAddr::from_pathname(path)
542    } else {
543        std::os::unix::net::SocketAddr::from_pathname("")
544    }
545}
546
547/// This function using [`register_global_network`] to register the [`MioNetwork`] to global registry.
548///
549/// So you may not call this function twice, otherwise will cause a panic. [`read more`](`register_global_network`)
550pub fn register_mio_network() {
551    register_global_network(MioNetwork::default())
552}
553
554#[cfg(test)]
555mod tests {
556    use std::sync::OnceLock;
557
558    use rasi_spec::network::run_network_spec;
559
560    use super::*;
561
562    static INIT: OnceLock<Box<dyn rasi_syscall::Network>> = OnceLock::new();
563
564    fn get_syscall() -> &'static dyn rasi_syscall::Network {
565        INIT.get_or_init(|| Box::new(MioNetwork::default()))
566            .as_ref()
567    }
568
569    #[futures_test::test]
570    async fn test_network() {
571        run_network_spec(get_syscall()).await;
572    }
573}