compio_net/
udp.rs

1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use compio_runtime::{BorrowedBuffer, BufferPool};
6use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
7
8use crate::{Socket, ToSocketAddrsAsync};
9
10/// A UDP socket.
11///
12/// UDP is "connectionless", unlike TCP. Meaning, regardless of what address
13/// you've bound to, a `UdpSocket` is free to communicate with many different
14/// remotes. There are basically two main ways to use `UdpSocket`:
15///
16/// * one to many: [`bind`](`UdpSocket::bind`) and use
17///   [`send_to`](`UdpSocket::send_to`) and
18///   [`recv_from`](`UdpSocket::recv_from`) to communicate with many different
19///   addresses
20/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single
21///   address, using [`send`](`UdpSocket::send`) and [`recv`](`UdpSocket::recv`)
22///   to communicate only with that remote address
23///
24/// # Examples
25/// Bind and connect a pair of sockets and send a packet:
26///
27/// ```
28/// use std::net::SocketAddr;
29///
30/// use compio_net::UdpSocket;
31///
32/// # compio_runtime::Runtime::new().unwrap().block_on(async {
33/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
34/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
35///
36/// // bind sockets
37/// let mut socket = UdpSocket::bind(first_addr).await.unwrap();
38/// let first_addr = socket.local_addr().unwrap();
39/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap();
40/// let second_addr = other_socket.local_addr().unwrap();
41///
42/// // connect sockets
43/// socket.connect(second_addr).await.unwrap();
44/// other_socket.connect(first_addr).await.unwrap();
45///
46/// let buf = Vec::with_capacity(12);
47///
48/// // write data
49/// socket.send("Hello world!").await.unwrap();
50///
51/// // read data
52/// let (n_bytes, buf) = other_socket.recv(buf).await.unwrap();
53///
54/// assert_eq!(n_bytes, buf.len());
55/// assert_eq!(buf, b"Hello world!");
56/// # });
57/// ```
58/// Send and receive packets without connecting:
59///
60/// ```
61/// use std::net::SocketAddr;
62///
63/// use compio_net::UdpSocket;
64/// use socket2::SockAddr;
65///
66/// # compio_runtime::Runtime::new().unwrap().block_on(async {
67/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
68/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
69///
70/// // bind sockets
71/// let mut socket = UdpSocket::bind(first_addr).await.unwrap();
72/// let first_addr = socket.local_addr().unwrap();
73/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap();
74/// let second_addr = other_socket.local_addr().unwrap();
75///
76/// let buf = Vec::with_capacity(32);
77///
78/// // write data
79/// socket.send_to("hello world", second_addr).await.unwrap();
80///
81/// // read data
82/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap();
83///
84/// assert_eq!(addr, first_addr);
85/// assert_eq!(n_bytes, buf.len());
86/// assert_eq!(buf, b"hello world");
87/// # });
88/// ```
89#[derive(Debug, Clone)]
90pub struct UdpSocket {
91    inner: Socket,
92}
93
94impl UdpSocket {
95    /// Creates a new UDP socket and attempt to bind it to the addr provided.
96    pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
97        super::each_addr(addr, |addr| async move {
98            Ok(Self {
99                inner: Socket::bind(&SockAddr::from(addr), Type::DGRAM, Some(Protocol::UDP))
100                    .await?,
101            })
102        })
103        .await
104    }
105
106    /// Connects this UDP socket to a remote address, allowing the `send` and
107    /// `recv` to be used to send data and also applies filters to only
108    /// receive data from the specified address.
109    ///
110    /// Note that usually, a successful `connect` call does not specify
111    /// that there is a remote server listening on the port, rather, such an
112    /// error would only be detected after the first send.
113    pub async fn connect(&self, addr: impl ToSocketAddrsAsync) -> io::Result<()> {
114        super::each_addr(addr, |addr| async move {
115            self.inner.connect(&SockAddr::from(addr))
116        })
117        .await
118    }
119
120    /// Creates new UdpSocket from a std::net::UdpSocket.
121    pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
122        Ok(Self {
123            inner: Socket::from_socket2(Socket2::from(socket))?,
124        })
125    }
126
127    /// Close the socket. If the returned future is dropped before polling, the
128    /// socket won't be closed.
129    pub fn close(self) -> impl Future<Output = io::Result<()>> {
130        self.inner.close()
131    }
132
133    /// Returns the socket address of the remote peer this socket was connected
134    /// to.
135    ///
136    /// # Examples
137    ///
138    /// ```no_run
139    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
140    ///
141    /// use compio_net::UdpSocket;
142    /// use socket2::SockAddr;
143    ///
144    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
145    /// let socket = UdpSocket::bind("127.0.0.1:34254")
146    ///     .await
147    ///     .expect("couldn't bind to address");
148    /// socket
149    ///     .connect("192.168.0.1:41203")
150    ///     .await
151    ///     .expect("couldn't connect to address");
152    /// assert_eq!(
153    ///     socket.peer_addr().unwrap(),
154    ///     SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 1), 41203))
155    /// );
156    /// # });
157    /// ```
158    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
159        self.inner
160            .peer_addr()
161            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
162    }
163
164    /// Returns the local address that this socket is bound to.
165    ///
166    /// # Example
167    ///
168    /// ```
169    /// use std::net::SocketAddr;
170    ///
171    /// use compio_net::UdpSocket;
172    /// use socket2::SockAddr;
173    ///
174    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
175    /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
176    /// let sock = UdpSocket::bind(&addr).await.unwrap();
177    /// // the address the socket is bound to
178    /// let local_addr = sock.local_addr().unwrap();
179    /// assert_eq!(local_addr, addr);
180    /// # });
181    /// ```
182    pub fn local_addr(&self) -> io::Result<SocketAddr> {
183        self.inner
184            .local_addr()
185            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
186    }
187
188    /// Receives a packet of data from the socket into the buffer, returning the
189    /// original buffer and quantity of data received.
190    pub async fn recv<T: IoBufMut>(&self, buffer: T) -> BufResult<usize, T> {
191        self.inner.recv(buffer).await
192    }
193
194    /// Receives a packet of data from the socket into the buffer, returning the
195    /// original buffer and quantity of data received.
196    pub async fn recv_vectored<T: IoVectoredBufMut>(&self, buffer: T) -> BufResult<usize, T> {
197        self.inner.recv_vectored(buffer).await
198    }
199
200    /// Read some bytes from this source with [`BufferPool`] and return
201    /// a [`BorrowedBuffer`].
202    ///
203    /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
204    /// if `len` > 0, `min(len, inner buffer size)` will be the read max len
205    pub async fn recv_managed<'a>(
206        &self,
207        buffer_pool: &'a BufferPool,
208        len: usize,
209    ) -> io::Result<BorrowedBuffer<'a>> {
210        self.inner.recv_managed(buffer_pool, len).await
211    }
212
213    /// Sends some data to the socket from the buffer, returning the original
214    /// buffer and quantity of data sent.
215    pub async fn send<T: IoBuf>(&self, buffer: T) -> BufResult<usize, T> {
216        self.inner.send(buffer).await
217    }
218
219    /// Sends some data to the socket from the buffer, returning the original
220    /// buffer and quantity of data sent.
221    pub async fn send_vectored<T: IoVectoredBuf>(&self, buffer: T) -> BufResult<usize, T> {
222        self.inner.send_vectored(buffer).await
223    }
224
225    /// Receives a single datagram message on the socket. On success, returns
226    /// the number of bytes received and the origin.
227    pub async fn recv_from<T: IoBufMut>(&self, buffer: T) -> BufResult<(usize, SocketAddr), T> {
228        self.inner
229            .recv_from(buffer)
230            .await
231            .map_res(|(n, addr)| (n, addr.as_socket().expect("should be SocketAddr")))
232    }
233
234    /// Receives a single datagram message on the socket. On success, returns
235    /// the number of bytes received and the origin.
236    pub async fn recv_from_vectored<T: IoVectoredBufMut>(
237        &self,
238        buffer: T,
239    ) -> BufResult<(usize, SocketAddr), T> {
240        self.inner
241            .recv_from_vectored(buffer)
242            .await
243            .map_res(|(n, addr)| (n, addr.as_socket().expect("should be SocketAddr")))
244    }
245
246    /// Receives a single datagram message and ancillary data on the socket. On
247    /// success, returns the number of bytes received and the origin.
248    pub async fn recv_msg<T: IoBufMut, C: IoBufMut>(
249        &self,
250        buffer: T,
251        control: C,
252    ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
253        self.inner
254            .recv_msg(buffer, control)
255            .await
256            .map_res(|(n, m, addr)| (n, m, addr.as_socket().expect("should be SocketAddr")))
257    }
258
259    /// Receives a single datagram message and ancillary data on the socket. On
260    /// success, returns the number of bytes received and the origin.
261    pub async fn recv_msg_vectored<T: IoVectoredBufMut, C: IoBufMut>(
262        &self,
263        buffer: T,
264        control: C,
265    ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
266        self.inner
267            .recv_msg_vectored(buffer, control)
268            .await
269            .map_res(|(n, m, addr)| (n, m, addr.as_socket().expect("should be SocketAddr")))
270    }
271
272    /// Sends data on the socket to the given address. On success, returns the
273    /// number of bytes sent.
274    pub async fn send_to<T: IoBuf>(
275        &self,
276        buffer: T,
277        addr: impl ToSocketAddrsAsync,
278    ) -> BufResult<usize, T> {
279        super::first_addr_buf(addr, buffer, |addr, buffer| async move {
280            self.inner.send_to(buffer, &SockAddr::from(addr)).await
281        })
282        .await
283    }
284
285    /// Sends data on the socket to the given address. On success, returns the
286    /// number of bytes sent.
287    pub async fn send_to_vectored<T: IoVectoredBuf>(
288        &self,
289        buffer: T,
290        addr: impl ToSocketAddrsAsync,
291    ) -> BufResult<usize, T> {
292        super::first_addr_buf(addr, buffer, |addr, buffer| async move {
293            self.inner
294                .send_to_vectored(buffer, &SockAddr::from(addr))
295                .await
296        })
297        .await
298    }
299
300    /// Sends data on the socket to the given address accompanied by ancillary
301    /// data. On success, returns the number of bytes sent.
302    pub async fn send_msg<T: IoBuf, C: IoBuf>(
303        &self,
304        buffer: T,
305        control: C,
306        addr: impl ToSocketAddrsAsync,
307    ) -> BufResult<usize, (T, C)> {
308        super::first_addr_buf(
309            addr,
310            (buffer, control),
311            |addr, (buffer, control)| async move {
312                self.inner
313                    .send_msg(buffer, control, &SockAddr::from(addr))
314                    .await
315            },
316        )
317        .await
318    }
319
320    /// Sends data on the socket to the given address accompanied by ancillary
321    /// data. On success, returns the number of bytes sent.
322    pub async fn send_msg_vectored<T: IoVectoredBuf, C: IoBuf>(
323        &self,
324        buffer: T,
325        control: C,
326        addr: impl ToSocketAddrsAsync,
327    ) -> BufResult<usize, (T, C)> {
328        super::first_addr_buf(
329            addr,
330            (buffer, control),
331            |addr, (buffer, control)| async move {
332                self.inner
333                    .send_msg_vectored(buffer, control, &SockAddr::from(addr))
334                    .await
335            },
336        )
337        .await
338    }
339
340    /// Gets a socket option.
341    ///
342    /// # Safety
343    ///
344    /// The caller must ensure `T` is the correct type for `level` and `name`.
345    pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
346        self.inner.get_socket_option(level, name)
347    }
348
349    /// Sets a socket option.
350    ///
351    /// # Safety
352    ///
353    /// The caller must ensure `T` is the correct type for `level` and `name`.
354    pub unsafe fn set_socket_option<T: Copy>(
355        &self,
356        level: i32,
357        name: i32,
358        value: &T,
359    ) -> io::Result<()> {
360        self.inner.set_socket_option(level, name, value)
361    }
362}
363
// Expands `compio_driver::impl_raw_fd!` for `UdpSocket`, naming `socket2::Socket`
// and the field path `inner` -> `socket`.
// NOTE(review): presumably this generates the raw fd / socket-handle trait
// impls by forwarding to `self.inner.socket` — confirm against the macro
// definition in `compio_driver`.
impl_raw_fd!(UdpSocket, socket2::Socket, inner, socket);