1use std::{
4 io::{self, Read, Write},
5 ops::Deref,
6};
7
8use mio::{
9 net::{TcpListener, TcpStream, UdpSocket},
10 Interest, Token,
11};
12use rasi_syscall::{ready, register_global_network, Handle, Network};
13
14use crate::{
15 reactor::{global_reactor, would_block, MioSocket},
16 TokenSequence,
17};
18
19#[derive(Default)]
21pub struct MioNetwork {}
22
23impl Network for MioNetwork {
24 fn udp_join_multicast_v4(
25 &self,
26 handle: &rasi_syscall::Handle,
27 multiaddr: &std::net::Ipv4Addr,
28 interface: &std::net::Ipv4Addr,
29 ) -> io::Result<()> {
30 let socket = handle
31 .downcast::<MioSocket<UdpSocket>>()
32 .expect("Expect udpsocket");
33
34 socket.join_multicast_v4(multiaddr, interface)
35 }
36
37 fn udp_join_multicast_v6(
38 &self,
39 handle: &rasi_syscall::Handle,
40 multiaddr: &std::net::Ipv6Addr,
41 interface: u32,
42 ) -> io::Result<()> {
43 let socket = handle
44 .downcast::<MioSocket<UdpSocket>>()
45 .expect("Expect udpsocket");
46
47 socket.join_multicast_v6(multiaddr, interface)
48 }
49
50 fn udp_leave_multicast_v4(
51 &self,
52 handle: &rasi_syscall::Handle,
53 multiaddr: &std::net::Ipv4Addr,
54 interface: &std::net::Ipv4Addr,
55 ) -> io::Result<()> {
56 let socket = handle
57 .downcast::<MioSocket<UdpSocket>>()
58 .expect("Expect udpsocket");
59
60 socket.leave_multicast_v4(multiaddr, interface)
61 }
62
63 fn udp_leave_multicast_v6(
64 &self,
65 handle: &rasi_syscall::Handle,
66 multiaddr: &std::net::Ipv6Addr,
67 interface: u32,
68 ) -> io::Result<()> {
69 let socket = handle
70 .downcast::<MioSocket<UdpSocket>>()
71 .expect("Expect udpsocket");
72
73 socket.leave_multicast_v6(multiaddr, interface)
74 }
75 fn udp_set_broadcast(&self, handle: &rasi_syscall::Handle, on: bool) -> std::io::Result<()> {
76 let socket = handle
77 .downcast::<MioSocket<UdpSocket>>()
78 .expect("Expect udpsocket");
79
80 socket.set_broadcast(on)
81 }
82
83 fn udp_broadcast(&self, handle: &rasi_syscall::Handle) -> std::io::Result<bool> {
84 let socket = handle
85 .downcast::<MioSocket<UdpSocket>>()
86 .expect("Expect udpsocket");
87
88 socket.broadcast()
89 }
90
91 fn udp_ttl(&self, handle: &rasi_syscall::Handle) -> std::io::Result<u32> {
92 let socket = handle
93 .downcast::<MioSocket<UdpSocket>>()
94 .expect("Expect udpsocket");
95
96 socket.ttl()
97 }
98
99 fn udp_set_ttl(&self, handle: &rasi_syscall::Handle, ttl: u32) -> std::io::Result<()> {
100 let socket = handle
101 .downcast::<MioSocket<UdpSocket>>()
102 .expect("Expect udpsocket");
103
104 socket.set_ttl(ttl)
105 }
106
107 fn udp_local_addr(
108 &self,
109 handle: &rasi_syscall::Handle,
110 ) -> std::io::Result<std::net::SocketAddr> {
111 let socket = handle
112 .downcast::<MioSocket<UdpSocket>>()
113 .expect("Expect udpsocket");
114
115 socket.local_addr()
116 }
117
118 fn udp_bind(
119 &self,
120 _waker: std::task::Waker,
121 laddrs: &[std::net::SocketAddr],
122 ) -> rasi_syscall::CancelablePoll<std::io::Result<rasi_syscall::Handle>> {
123 ready(|| {
124 let std_socket = std::net::UdpSocket::bind(laddrs)?;
125
126 std_socket.set_nonblocking(true)?;
127
128 let mut socket = UdpSocket::from_std(std_socket);
129 let token = Token::next();
130
131 global_reactor().register(
132 &mut socket,
133 token,
134 Interest::READABLE.add(Interest::WRITABLE),
135 )?;
136
137 Ok(Handle::new(MioSocket::from((token, socket))))
138 })
139 }
140
141 fn udp_send_to(
142 &self,
143 waker: std::task::Waker,
144 socket: &rasi_syscall::Handle,
145 buf: &[u8],
146 target: std::net::SocketAddr,
147 ) -> rasi_syscall::CancelablePoll<std::io::Result<usize>> {
148 let socket = socket
149 .downcast::<MioSocket<UdpSocket>>()
150 .expect("Expect udpsocket.");
151
152 would_block(socket.token, waker, Interest::WRITABLE, || {
153 socket.send_to(buf, target)
154 })
155 }
156
157 fn udp_recv_from(
158 &self,
159 waker: std::task::Waker,
160 socket: &rasi_syscall::Handle,
161 buf: &mut [u8],
162 ) -> rasi_syscall::CancelablePoll<std::io::Result<(usize, std::net::SocketAddr)>> {
163 let socket = socket
164 .downcast::<MioSocket<UdpSocket>>()
165 .expect("Expect udpsocket.");
166
167 would_block(socket.token, waker, Interest::READABLE, || {
168 socket.recv_from(buf)
169 })
170 }
171
172 fn tcp_listener_bind(
173 &self,
174 _waker: std::task::Waker,
175 laddrs: &[std::net::SocketAddr],
176 ) -> rasi_syscall::CancelablePoll<std::io::Result<rasi_syscall::Handle>> {
177 ready(|| {
178 let std_socket = std::net::TcpListener::bind(laddrs)?;
179
180 std_socket.set_nonblocking(true)?;
181
182 let mut socket = TcpListener::from_std(std_socket);
183 let token = Token::next();
184
185 global_reactor().register(
186 &mut socket,
187 token,
188 Interest::READABLE.add(Interest::WRITABLE),
189 )?;
190
191 Ok(Handle::new(MioSocket::from((token, socket))))
192 })
193 }
194
195 fn tcp_listener_local_addr(
196 &self,
197 handle: &rasi_syscall::Handle,
198 ) -> std::io::Result<std::net::SocketAddr> {
199 let socket = handle
200 .downcast::<MioSocket<TcpListener>>()
201 .expect("Expect TcpListener.");
202
203 socket.local_addr()
204 }
205
206 fn tcp_listener_ttl(&self, handle: &rasi_syscall::Handle) -> std::io::Result<u32> {
207 let socket = handle
208 .downcast::<MioSocket<TcpListener>>()
209 .expect("Expect TcpListener.");
210
211 socket.ttl()
212 }
213
214 fn tcp_listener_set_ttl(&self, handle: &rasi_syscall::Handle, ttl: u32) -> std::io::Result<()> {
215 let socket = handle
216 .downcast::<MioSocket<TcpListener>>()
217 .expect("Expect TcpListener.");
218
219 socket.set_ttl(ttl)
220 }
221
222 fn tcp_listener_accept(
223 &self,
224 waker: std::task::Waker,
225 handle: &rasi_syscall::Handle,
226 ) -> rasi_syscall::CancelablePoll<std::io::Result<(rasi_syscall::Handle, std::net::SocketAddr)>>
227 {
228 let socket = handle
229 .downcast::<MioSocket<TcpListener>>()
230 .expect("Expect TcpListener.");
231
232 would_block(socket.token, waker, Interest::READABLE, || {
233 match socket.accept() {
234 Ok((mut stream, raddr)) => {
235 let token = Token::next();
236
237 global_reactor().register(
238 &mut stream,
239 token,
240 Interest::READABLE.add(Interest::WRITABLE),
241 )?;
242
243 Ok((Handle::new(MioSocket::from((token, stream))), raddr))
244 }
245 Err(err) => Err(err),
246 }
247 })
248 }
249
250 fn tcp_stream_connect(
251 &self,
252 _waker: std::task::Waker,
253 raddrs: &[std::net::SocketAddr],
254 ) -> rasi_syscall::CancelablePoll<std::io::Result<rasi_syscall::Handle>> {
255 ready(|| {
256 let std_socket = std::net::TcpStream::connect(raddrs)?;
257
258 std_socket.set_nonblocking(true)?;
259
260 let mut socket = TcpStream::from_std(std_socket);
261 let token = Token::next();
262
263 global_reactor().register(
264 &mut socket,
265 token,
266 Interest::READABLE.add(Interest::WRITABLE),
267 )?;
268
269 Ok(Handle::new(MioSocket::from((token, socket))))
270 })
271 }
272
273 fn tcp_stream_write(
274 &self,
275 waker: std::task::Waker,
276 socket: &rasi_syscall::Handle,
277 buf: &[u8],
278 ) -> rasi_syscall::CancelablePoll<std::io::Result<usize>> {
279 let socket = socket
280 .downcast::<MioSocket<TcpStream>>()
281 .expect("Expect TcpStream.");
282
283 would_block(socket.token, waker, Interest::WRITABLE, || {
284 socket.deref().write(buf)
285 })
286 }
287
288 fn tcp_stream_read(
289 &self,
290 waker: std::task::Waker,
291 socket: &rasi_syscall::Handle,
292 buf: &mut [u8],
293 ) -> rasi_syscall::CancelablePoll<std::io::Result<usize>> {
294 let socket = socket
295 .downcast::<MioSocket<TcpStream>>()
296 .expect("Expect TcpStream.");
297
298 would_block(socket.token, waker, Interest::READABLE, || {
299 socket.deref().read(buf)
300 })
301 }
302
303 fn tcp_stream_local_addr(
304 &self,
305 handle: &rasi_syscall::Handle,
306 ) -> std::io::Result<std::net::SocketAddr> {
307 let socket = handle
308 .downcast::<MioSocket<TcpStream>>()
309 .expect("Expect TcpStream.");
310
311 socket.local_addr()
312 }
313
314 fn tcp_stream_remote_addr(
315 &self,
316 handle: &rasi_syscall::Handle,
317 ) -> std::io::Result<std::net::SocketAddr> {
318 let socket = handle
319 .downcast::<MioSocket<TcpStream>>()
320 .expect("Expect TcpStream.");
321
322 socket.peer_addr()
323 }
324
325 fn tcp_stream_nodelay(&self, handle: &rasi_syscall::Handle) -> std::io::Result<bool> {
326 let socket = handle
327 .downcast::<MioSocket<TcpStream>>()
328 .expect("Expect TcpStream.");
329
330 socket.nodelay()
331 }
332
333 fn tcp_stream_set_nodelay(
334 &self,
335 handle: &rasi_syscall::Handle,
336 nodelay: bool,
337 ) -> std::io::Result<()> {
338 let socket = handle
339 .downcast::<MioSocket<TcpStream>>()
340 .expect("Expect TcpStream.");
341
342 socket.set_nodelay(nodelay)
343 }
344
345 fn tcp_stream_ttl(&self, handle: &rasi_syscall::Handle) -> std::io::Result<u32> {
346 let socket = handle
347 .downcast::<MioSocket<TcpStream>>()
348 .expect("Expect TcpStream.");
349
350 socket.ttl()
351 }
352
353 fn tcp_stream_set_ttl(&self, handle: &rasi_syscall::Handle, ttl: u32) -> std::io::Result<()> {
354 let socket = handle
355 .downcast::<MioSocket<TcpStream>>()
356 .expect("Expect TcpStream.");
357
358 socket.set_ttl(ttl)
359 }
360
361 fn tcp_stream_shutdown(
362 &self,
363 handle: &rasi_syscall::Handle,
364 how: std::net::Shutdown,
365 ) -> std::io::Result<()> {
366 let socket = handle
367 .downcast::<MioSocket<TcpStream>>()
368 .expect("Expect TcpStream.");
369
370 socket.shutdown(how)
371 }
372
373 #[cfg(all(unix, feature = "unix_socket"))]
374 fn unix_listener_bind(
375 &self,
376 _waker: std::task::Waker,
377 path: &std::path::Path,
378 ) -> rasi_syscall::CancelablePoll<io::Result<Handle>> {
379 ready(|| {
380 let mut socket = mio::net::UnixListener::bind(path)?;
381
382 let token = Token::next();
383
384 global_reactor().register(
385 &mut socket,
386 token,
387 Interest::READABLE.add(Interest::WRITABLE),
388 )?;
389
390 Ok(Handle::new(MioSocket::from((token, socket))))
391 })
392 }
393
394 #[cfg(all(unix, feature = "unix_socket"))]
395 fn unix_listener_accept(
396 &self,
397 waker: std::task::Waker,
398 handle: &Handle,
399 ) -> rasi_syscall::CancelablePoll<io::Result<(Handle, std::os::unix::net::SocketAddr)>> {
400 use mio::net::UnixListener;
401
402 let socket = handle
403 .downcast::<MioSocket<UnixListener>>()
404 .expect("Expect TcpListener.");
405
406 would_block(socket.token, waker, Interest::READABLE, || {
407 match socket.accept() {
408 Ok((mut stream, raddr)) => {
409 let token = Token::next();
410
411 global_reactor().register(
412 &mut stream,
413 token,
414 Interest::READABLE.add(Interest::WRITABLE),
415 )?;
416
417 let raddr = mio_unix_address_to_std_unix_addr(raddr)?;
418
419 Ok((Handle::new(MioSocket::from((token, stream))), raddr))
420 }
421 Err(err) => Err(err),
422 }
423 })
424 }
425
426 #[cfg(all(unix, feature = "unix_socket"))]
427 fn unix_listener_local_addr(
428 &self,
429 handle: &Handle,
430 ) -> io::Result<std::os::unix::net::SocketAddr> {
431 use mio::net::UnixListener;
432
433 let socket = handle
434 .downcast::<MioSocket<UnixListener>>()
435 .expect("Expect UnixListener.");
436
437 mio_unix_address_to_std_unix_addr(socket.local_addr()?)
438 }
439
440 #[cfg(all(unix, feature = "unix_socket"))]
441 fn unix_stream_connect(
442 &self,
443 _waker: std::task::Waker,
444 path: &std::path::Path,
445 ) -> rasi_syscall::CancelablePoll<io::Result<Handle>> {
446 ready(|| {
447 let mut socket = mio::net::UnixStream::connect(path)?;
448
449 let token = Token::next();
450
451 global_reactor().register(
452 &mut socket,
453 token,
454 Interest::READABLE.add(Interest::WRITABLE),
455 )?;
456
457 Ok(Handle::new(MioSocket::from((token, socket))))
458 })
459 }
460
461 #[cfg(all(unix, feature = "unix_socket"))]
462 fn unix_stream_local_addr(
463 &self,
464 handle: &Handle,
465 ) -> io::Result<std::os::unix::net::SocketAddr> {
466 use mio::net::UnixStream;
467
468 let socket = handle
469 .downcast::<MioSocket<UnixStream>>()
470 .expect("Expect UnixStream.");
471
472 mio_unix_address_to_std_unix_addr(socket.local_addr()?)
473 }
474
475 #[cfg(all(unix, feature = "unix_socket"))]
476 fn unix_stream_peer_addr(&self, handle: &Handle) -> io::Result<std::os::unix::net::SocketAddr> {
477 use mio::net::UnixStream;
478
479 let socket = handle
480 .downcast::<MioSocket<UnixStream>>()
481 .expect("Expect UnixStream.");
482
483 mio_unix_address_to_std_unix_addr(socket.peer_addr()?)
484 }
485
486 #[cfg(all(unix, feature = "unix_socket"))]
487 fn unix_stream_shutdown(&self, handle: &Handle, how: std::net::Shutdown) -> io::Result<()> {
488 use mio::net::UnixStream;
489
490 let socket = handle
491 .downcast::<MioSocket<UnixStream>>()
492 .expect("Expect UnixStream.");
493
494 socket.shutdown(how)
495 }
496
497 #[cfg(all(unix, feature = "unix_socket"))]
498 fn unix_stream_write(
499 &self,
500 waker: std::task::Waker,
501 socket: &Handle,
502 buf: &[u8],
503 ) -> rasi_syscall::CancelablePoll<io::Result<usize>> {
504 use mio::net::UnixStream;
505
506 let socket = socket
507 .downcast::<MioSocket<UnixStream>>()
508 .expect("Expect UnixStream.");
509
510 would_block(socket.token, waker, Interest::WRITABLE, || {
511 socket.deref().write(buf)
512 })
513 }
514
515 #[cfg(all(unix, feature = "unix_socket"))]
516 fn unix_stream_read(
517 &self,
518 waker: std::task::Waker,
519 socket: &Handle,
520 buf: &mut [u8],
521 ) -> rasi_syscall::CancelablePoll<io::Result<usize>> {
522 use mio::net::UnixStream;
523
524 let socket = socket
525 .downcast::<MioSocket<UnixStream>>()
526 .expect("Expect UnixStream.");
527
528 would_block(socket.token, waker, Interest::WRITABLE, || {
529 socket.deref().read(buf)
530 })
531 }
532}
533
534#[cfg(unix)]
535fn mio_unix_address_to_std_unix_addr(
536 addr: mio::net::SocketAddr,
537) -> io::Result<std::os::unix::net::SocketAddr> {
538 if let Some(path) = addr.as_abstract_namespace() {
539 std::os::unix::net::SocketAddr::from_pathname(format!("{}", path.escape_ascii()))
540 } else if let Some(path) = addr.as_pathname() {
541 std::os::unix::net::SocketAddr::from_pathname(path)
542 } else {
543 std::os::unix::net::SocketAddr::from_pathname("")
544 }
545}
546
547pub fn register_mio_network() {
551 register_global_network(MioNetwork::default())
552}
553
554#[cfg(test)]
555mod tests {
556 use std::sync::OnceLock;
557
558 use rasi_spec::network::run_network_spec;
559
560 use super::*;
561
562 static INIT: OnceLock<Box<dyn rasi_syscall::Network>> = OnceLock::new();
563
564 fn get_syscall() -> &'static dyn rasi_syscall::Network {
565 INIT.get_or_init(|| Box::new(MioNetwork::default()))
566 .as_ref()
567 }
568
569 #[futures_test::test]
570 async fn test_network() {
571 run_network_spec(get_syscall()).await;
572 }
573}