async_transport/runtime/
smol.rs

1use crate::runtime::AsyncUdpSocket;
2use crate::{Capabilities, RecvMeta, Transmit, UdpSocketState};
3use async_io::Async;
4use smol::net::AsyncToSocketAddrs;
5use std::{
6    future::poll_fn,
7    io,
8    net::SocketAddr,
9    task::{Context, Poll},
10};
11
12#[derive(Debug)]
13pub struct UdpSocket {
14    io: Async<std::net::UdpSocket>,
15    inner: UdpSocketState,
16}
17
18impl AsyncUdpSocket for UdpSocket {
19    fn poll_send(
20        &self,
21        cx: &mut Context<'_>,
22        capabilities: &Capabilities,
23        transmits: &[Transmit],
24    ) -> Poll<io::Result<usize>> {
25        loop {
26            ready!(self.io.poll_writable(cx))?;
27            if let Ok(res) = self.inner.send((&self.io).into(), capabilities, transmits) {
28                return Poll::Ready(Ok(res));
29            }
30        }
31    }
32
33    fn poll_recv(
34        &self,
35        cx: &mut Context<'_>,
36        bufs: &mut [io::IoSliceMut<'_>],
37        meta: &mut [RecvMeta],
38    ) -> Poll<io::Result<usize>> {
39        loop {
40            ready!(self.io.poll_readable(cx))?;
41            if let Ok(res) = self.inner.recv((&self.io).into(), bufs, meta) {
42                return Poll::Ready(Ok(res));
43            }
44        }
45    }
46
47    fn local_addr(&self) -> io::Result<SocketAddr> {
48        self.io.as_ref().local_addr()
49    }
50
51    fn peer_addr(&self) -> io::Result<SocketAddr> {
52        self.io.as_ref().peer_addr()
53    }
54}
55
56impl UdpSocket {
57    pub async fn bind<A: AsyncToSocketAddrs>(addr: A) -> io::Result<Self> {
58        let mut last_err = None;
59
60        for addr in addr.to_socket_addrs().await? {
61            match Async::<std::net::UdpSocket>::bind(addr) {
62                Ok(socket) => {
63                    UdpSocketState::configure((&socket).into())?;
64                    return Ok(Self {
65                        io: socket,
66                        inner: UdpSocketState::new(),
67                    });
68                }
69                Err(err) => last_err = Some(err),
70            }
71        }
72
73        Err(last_err.unwrap_or_else(|| {
74            io::Error::new(
75                io::ErrorKind::InvalidInput,
76                "could not bind to any of the addresses",
77            )
78        }))
79    }
80
81    pub async fn connect<A: AsyncToSocketAddrs>(&self, addr: A) -> io::Result<()> {
82        let mut last_err = None;
83
84        for addr in addr.to_socket_addrs().await? {
85            match self.io.get_ref().connect(addr) {
86                Ok(()) => return Ok(()),
87                Err(err) => last_err = Some(err),
88            }
89        }
90
91        Err(last_err.unwrap_or_else(|| {
92            io::Error::new(
93                io::ErrorKind::InvalidInput,
94                "could not connect to any of the addresses",
95            )
96        }))
97    }
98
99    pub async fn send_to<A: AsyncToSocketAddrs>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
100        let addr = match addr.to_socket_addrs().await?.next() {
101            Some(addr) => addr,
102            None => {
103                return Err(io::Error::new(
104                    io::ErrorKind::InvalidInput,
105                    "no addresses to send data to",
106                ));
107            }
108        };
109
110        self.io.send_to(buf, addr).await
111    }
112
113    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
114        self.io.recv_from(buf).await
115    }
116
117    pub async fn send(
118        &self,
119        capabilities: &Capabilities,
120        transmits: &[Transmit],
121    ) -> io::Result<usize> {
122        poll_fn(|cx| self.poll_send(cx, capabilities, transmits)).await
123    }
124
125    pub async fn recv(
126        &self,
127        bufs: &mut [io::IoSliceMut<'_>],
128        meta: &mut [RecvMeta],
129    ) -> io::Result<usize> {
130        poll_fn(|cx| self.poll_recv(cx, bufs, meta)).await
131    }
132}