async_std/net/udp/
mod.rs

1use std::io;
2use std::net::SocketAddr;
3use std::net::{Ipv4Addr, Ipv6Addr};
4
5use crate::future;
6use crate::net::driver::Watcher;
7use crate::net::ToSocketAddrs;
8use crate::utils::Context as _;
9
/// A UDP socket.
///
/// After creating a `UdpSocket` by [`bind`]ing it to a socket address, data can be [sent to] and
/// [received from] any other socket address.
///
/// As stated in the User Datagram Protocol's specification in [IETF RFC 768], UDP is an unordered,
/// unreliable protocol. Refer to [`TcpListener`] and [`TcpStream`] for async TCP primitives.
///
/// This type is an async version of [`std::net::UdpSocket`].
///
/// [`bind`]: #method.bind
/// [received from]: #method.recv_from
/// [sent to]: #method.send_to
/// [`TcpListener`]: struct.TcpListener.html
/// [`TcpStream`]: struct.TcpStream.html
/// [`std::net`]: https://doc.rust-lang.org/std/net/index.html
/// [IETF RFC 768]: https://tools.ietf.org/html/rfc768
/// [`std::net::UdpSocket`]: https://doc.rust-lang.org/std/net/struct.UdpSocket.html
///
/// # Examples
///
/// A minimal echo loop: every datagram received is sent straight back to its sender.
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::net::UdpSocket;
///
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
/// let mut buf = vec![0u8; 1024];
///
/// loop {
///     let (n, peer) = socket.recv_from(&mut buf).await?;
///     socket.send_to(&buf[..n], &peer).await?;
/// }
/// #
/// # }) }
/// ```
#[derive(Debug)]
pub struct UdpSocket {
    /// The underlying `mio` UDP socket wrapped in the driver's `Watcher`; every method of
    /// `UdpSocket` reaches the socket through this field.
    watcher: Watcher<mio::net::UdpSocket>,
}
50
51impl UdpSocket {
52    /// Creates a UDP socket from the given address.
53    ///
54    /// Binding with a port number of 0 will request that the OS assigns a port to this socket. The
55    /// port allocated can be queried via the [`local_addr`] method.
56    ///
57    /// [`local_addr`]: #method.local_addr
58    ///
59    /// # Examples
60    ///
61    /// ```no_run
62    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
63    /// #
64    /// use async_std::net::UdpSocket;
65    ///
66    /// let socket = UdpSocket::bind("127.0.0.1:0").await?;
67    /// #
68    /// # Ok(()) }) }
69    /// ```
70    pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<UdpSocket> {
71        let mut last_err = None;
72        let addrs = addrs
73            .to_socket_addrs()
74            .await?;
75
76        for addr in addrs {
77            match mio::net::UdpSocket::bind(&addr) {
78                Ok(mio_socket) => {
79                    return Ok(UdpSocket {
80                        watcher: Watcher::new(mio_socket),
81                    });
82                }
83                Err(err) => last_err = Some(err),
84            }
85        }
86
87        Err(last_err.unwrap_or_else(|| {
88            io::Error::new(
89                io::ErrorKind::InvalidInput,
90                "could not resolve to any addresses",
91            )
92        }))
93    }
94
95    /// Returns the local address that this listener is bound to.
96    ///
97    /// This can be useful, for example, when binding to port 0 to figure out which port was
98    /// actually bound.
99    ///
100    /// # Examples
101    ///
102    /// ```no_run
103    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
104    /// #
105    ///	use async_std::net::UdpSocket;
106    ///
107    /// let socket = UdpSocket::bind("127.0.0.1:0").await?;
108    /// let addr = socket.local_addr()?;
109    /// #
110    /// # Ok(()) }) }
111    /// ```
112    pub fn local_addr(&self) -> io::Result<SocketAddr> {
113        self.watcher
114            .get_ref()
115            .local_addr()
116            .context(|| String::from("could not get local address"))
117    }
118
119    /// Sends data on the socket to the given address.
120    ///
121    /// On success, returns the number of bytes written.
122    ///
123    /// # Examples
124    ///
125    /// ```no_run
126    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
127    /// #
128    /// use async_std::net::UdpSocket;
129    ///
130    /// const THE_MERCHANT_OF_VENICE: &[u8] = b"
131    ///     If you prick us, do we not bleed?
132    ///     If you tickle us, do we not laugh?
133    ///     If you poison us, do we not die?
134    ///     And if you wrong us, shall we not revenge?
135    /// ";
136    ///
137    /// let socket = UdpSocket::bind("127.0.0.1:0").await?;
138    ///
139    /// let addr = "127.0.0.1:7878";
140    /// let sent = socket.send_to(THE_MERCHANT_OF_VENICE, &addr).await?;
141    /// println!("Sent {} bytes to {}", sent, addr);
142    /// #
143    /// # Ok(()) }) }
144    /// ```
145    pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], addrs: A) -> io::Result<usize> {
146        let addr = match addrs.to_socket_addrs().await?.next() {
147            Some(addr) => addr,
148            None => {
149                return Err(io::Error::new(
150                    io::ErrorKind::InvalidInput,
151                    "no addresses to send data to",
152                ));
153            }
154        };
155
156        future::poll_fn(|cx| {
157            self.watcher
158                .poll_write_with(cx, |inner| inner.send_to(buf, &addr))
159        })
160        .await
161        .context(|| format!("could not send packet to {}", addr))
162    }
163
164    /// Receives data from the socket.
165    ///
166    /// On success, returns the number of bytes read and the origin.
167    ///
168    /// # Examples
169    ///
170    /// ```no_run
171    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
172    /// #
173    /// use async_std::net::UdpSocket;
174    ///
175    /// let socket = UdpSocket::bind("127.0.0.1:0").await?;
176    ///
177    /// let mut buf = vec![0; 1024];
178    /// let (n, peer) = socket.recv_from(&mut buf).await?;
179    /// println!("Received {} bytes from {}", n, peer);
180    /// #
181    /// # Ok(()) }) }
182    /// ```
183    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
184        future::poll_fn(|cx| {
185            self.watcher
186                .poll_read_with(cx, |inner| inner.recv_from(buf))
187        })
188        .await
189        .context(|| {
190            use std::fmt::Write;
191
192            let mut error = String::from("could not receive data on ");
193            if let Ok(addr) = self.local_addr() {
194                let _ = write!(&mut error, "{}", addr);
195            } else {
196                error.push_str("socket");
197            }
198            error
199        })
200    }
201
202    /// Connects the UDP socket to a remote address.
203    ///
204    /// When connected, methods [`send`] and [`recv`] will use the specified address for sending
205    /// and receiving messages. Additionally, a filter will be applied to [`recv_from`] so that it
206    /// only receives messages from that same address.
207    ///
208    /// [`send`]: #method.send
209    /// [`recv`]: #method.recv
210    /// [`recv_from`]: #method.recv_from
211    ///
212    /// # Examples
213    ///
214    /// ```no_run
215    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
216    /// #
217    /// use async_std::net::UdpSocket;
218    ///
219    /// let socket = UdpSocket::bind("127.0.0.1:0").await?;
220    /// socket.connect("127.0.0.1:8080").await?;
221    /// #
222    /// # Ok(()) }) }
223    /// ```
224    pub async fn connect<A: ToSocketAddrs>(&self, addrs: A) -> io::Result<()> {
225        let mut last_err = None;
226        let addrs = addrs
227            .to_socket_addrs()
228            .await
229            .context(|| String::from("could not resolve addresses"))?;
230
231        for addr in addrs {
232            // TODO(stjepang): connect on the blocking pool
233            match self.watcher.get_ref().connect(addr) {
234                Ok(()) => return Ok(()),
235                Err(err) => last_err = Some(err),
236            }
237        }
238
239        Err(last_err.unwrap_or_else(|| {
240            io::Error::new(
241                io::ErrorKind::InvalidInput,
242                "could not resolve to any addresses",
243            )
244        }))
245    }
246
247    /// Sends data on the socket to the remote address to which it is connected.
248    ///
249    /// The [`connect`] method will connect this socket to a remote address.
250    /// This method will fail if the socket is not connected.
251    ///
252    /// [`connect`]: #method.connect
253    ///
254    /// # Examples
255    ///
256    /// ```no_run
257    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
258    /// #
259    /// use async_std::net::UdpSocket;
260    ///
261    /// let socket = UdpSocket::bind("127.0.0.1:34254").await?;
262    /// socket.connect("127.0.0.1:8080").await?;
263    /// let bytes = socket.send(b"Hi there!").await?;
264    ///
265    /// println!("Sent {} bytes", bytes);
266    /// #
267    /// # Ok(()) }) }
268    /// ```
269    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
270        future::poll_fn(|cx| self.watcher.poll_write_with(cx, |inner| inner.send(buf)))
271            .await
272            .context(|| {
273                use std::fmt::Write;
274
275                let mut error = String::from("could not send data on ");
276                if let Ok(addr) = self.local_addr() {
277                    let _ = write!(&mut error, "{}", addr);
278                } else {
279                    error.push_str("socket");
280                }
281                error
282            })
283    }
284
285    /// Receives data from the socket.
286    ///
287    /// On success, returns the number of bytes read.
288    ///
289    /// # Examples
290    ///
291    /// ```no_run
292    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
293    /// #
294    /// use async_std::net::UdpSocket;
295    ///
296    /// let socket = UdpSocket::bind("127.0.0.1:0").await?;
297    /// socket.connect("127.0.0.1:8080").await?;
298    ///
299    /// let mut buf = vec![0; 1024];
300    /// let n = socket.recv(&mut buf).await?;
301    /// println!("Received {} bytes", n);
302    /// #
303    /// # Ok(()) }) }
304    /// ```
305    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
306        future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.recv(buf)))
307            .await
308            .context(|| {
309                use std::fmt::Write;
310
311                let mut error = String::from("could not receive data on ");
312                if let Ok(addr) = self.local_addr() {
313                    let _ = write!(&mut error, "{}", addr);
314                } else {
315                    error.push_str("socket");
316                }
317                error
318            })
319    }
320
321    /// Gets the value of the `SO_BROADCAST` option for this socket.
322    ///
323    /// For more information about this option, see [`set_broadcast`].
324    ///
325    /// [`set_broadcast`]: #method.set_broadcast
326    pub fn broadcast(&self) -> io::Result<bool> {
327        self.watcher.get_ref().broadcast()
328    }
329
330    /// Sets the value of the `SO_BROADCAST` option for this socket.
331    ///
332    /// When enabled, this socket is allowed to send packets to a broadcast address.
333    pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
334        self.watcher.get_ref().set_broadcast(on)
335    }
336
337    /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket.
338    ///
339    /// For more information about this option, see [`set_multicast_loop_v4`].
340    ///
341    /// [`set_multicast_loop_v4`]: #method.set_multicast_loop_v4
342    pub fn multicast_loop_v4(&self) -> io::Result<bool> {
343        self.watcher.get_ref().multicast_loop_v4()
344    }
345
346    /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket.
347    ///
348    /// If enabled, multicast packets will be looped back to the local socket.
349    ///
350    /// # Note
351    ///
352    /// This may not have any affect on IPv6 sockets.
353    pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
354        self.watcher.get_ref().set_multicast_loop_v4(on)
355    }
356
357    /// Gets the value of the `IP_MULTICAST_TTL` option for this socket.
358    ///
359    /// For more information about this option, see [`set_multicast_ttl_v4`].
360    ///
361    /// [`set_multicast_ttl_v4`]: #method.set_multicast_ttl_v4
362    pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
363        self.watcher.get_ref().multicast_ttl_v4()
364    }
365
366    /// Sets the value of the `IP_MULTICAST_TTL` option for this socket.
367    ///
368    /// Indicates the time-to-live value of outgoing multicast packets for this socket. The default
369    /// value is 1 which means that multicast packets don't leave the local network unless
370    /// explicitly requested.
371    ///
372    /// # Note
373    ///
374    /// This may not have any affect on IPv6 sockets.
375    pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
376        self.watcher.get_ref().set_multicast_ttl_v4(ttl)
377    }
378
379    /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
380    ///
381    /// For more information about this option, see [`set_multicast_loop_v6`].
382    ///
383    /// [`set_multicast_loop_v6`]: #method.set_multicast_loop_v6
384    pub fn multicast_loop_v6(&self) -> io::Result<bool> {
385        self.watcher.get_ref().multicast_loop_v6()
386    }
387
388    /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
389    ///
390    /// Controls whether this socket sees the multicast packets it sends itself.
391    ///
392    /// # Note
393    ///
394    /// This may not have any affect on IPv4 sockets.
395    pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
396        self.watcher.get_ref().set_multicast_loop_v6(on)
397    }
398
399    /// Gets the value of the `IP_TTL` option for this socket.
400    ///
401    /// For more information about this option, see [`set_ttl`].
402    ///
403    /// [`set_ttl`]: #method.set_ttl
404    pub fn ttl(&self) -> io::Result<u32> {
405        self.watcher.get_ref().ttl()
406    }
407
408    /// Sets the value for the `IP_TTL` option on this socket.
409    ///
410    /// This value sets the time-to-live field that is used in every packet sent
411    /// from this socket.
412    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
413        self.watcher.get_ref().set_ttl(ttl)
414    }
415
416    /// Executes an operation of the `IP_ADD_MEMBERSHIP` type.
417    ///
418    /// This method specifies a new multicast group for this socket to join. The address must be
419    /// a valid multicast address, and `interface` is the address of the local interface with which
420    /// the system should join the multicast group. If it's equal to `INADDR_ANY` then an
421    /// appropriate interface is chosen by the system.
422    ///
423    /// # Examples
424    ///
425    /// ```no_run
426    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
427    /// #
428    /// use std::net::Ipv4Addr;
429    ///
430    /// use async_std::net::UdpSocket;
431    ///
432    /// let interface = Ipv4Addr::new(0, 0, 0, 0);
433    /// let mdns_addr = Ipv4Addr::new(224, 0, 0, 123);
434    ///
435    /// let socket = UdpSocket::bind("127.0.0.1:0").await?;
436    /// socket.join_multicast_v4(mdns_addr, interface)?;
437    /// #
438    /// # Ok(()) }) }
439    /// ```
440    pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
441        self.watcher
442            .get_ref()
443            .join_multicast_v4(&multiaddr, &interface)
444    }
445
446    /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type.
447    ///
448    /// This method specifies a new multicast group for this socket to join. The address must be
449    /// a valid multicast address, and `interface` is the index of the interface to join/leave (or
450    /// 0 to indicate any interface).
451    ///
452    /// # Examples
453    ///
454    /// ```no_run
455    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
456    /// #
457    /// use std::net::{Ipv6Addr, SocketAddr};
458    ///
459    /// use async_std::net::UdpSocket;
460    ///
461    /// let socket_addr = SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0);
462    /// let mdns_addr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x0123);
463    /// let socket = UdpSocket::bind(&socket_addr).await?;
464    ///
465    /// socket.join_multicast_v6(&mdns_addr, 0)?;
466    /// #
467    /// # Ok(()) }) }
468    /// ```
469    pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
470        self.watcher
471            .get_ref()
472            .join_multicast_v6(multiaddr, interface)
473    }
474
475    /// Executes an operation of the `IP_DROP_MEMBERSHIP` type.
476    ///
477    /// For more information about this option, see [`join_multicast_v4`].
478    ///
479    /// [`join_multicast_v4`]: #method.join_multicast_v4
480    pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
481        self.watcher
482            .get_ref()
483            .leave_multicast_v4(&multiaddr, &interface)
484    }
485
486    /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type.
487    ///
488    /// For more information about this option, see [`join_multicast_v6`].
489    ///
490    /// [`join_multicast_v6`]: #method.join_multicast_v6
491    pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
492        self.watcher
493            .get_ref()
494            .leave_multicast_v6(multiaddr, interface)
495    }
496}
497
498impl From<std::net::UdpSocket> for UdpSocket {
499    /// Converts a `std::net::UdpSocket` into its asynchronous equivalent.
500    fn from(socket: std::net::UdpSocket) -> UdpSocket {
501        let mio_socket = mio::net::UdpSocket::from_socket(socket).unwrap();
502        UdpSocket {
503            watcher: Watcher::new(mio_socket),
504        }
505    }
506}
507
508cfg_unix! {
509    use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
510
511    impl AsRawFd for UdpSocket {
512        fn as_raw_fd(&self) -> RawFd {
513            self.watcher.get_ref().as_raw_fd()
514        }
515    }
516
517    impl FromRawFd for UdpSocket {
518        unsafe fn from_raw_fd(fd: RawFd) -> UdpSocket {
519            std::net::UdpSocket::from_raw_fd(fd).into()
520        }
521    }
522
523    impl IntoRawFd for UdpSocket {
524        fn into_raw_fd(self) -> RawFd {
525            self.watcher.into_inner().into_raw_fd()
526        }
527    }
528}
529
cfg_windows! {
    // TODO: raw-socket conversions are not provided on Windows yet. The sketch below is kept
    // commented out, so no items are currently passed to `cfg_windows!`.
    //
    // NOTE(review): the sketch imports the `*RawHandle` names but implements the `*RawSocket`
    // traits, and it reads a `raw_socket` field that `UdpSocket` does not have. Reconcile both
    // before enabling it.
    //
    // use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
    //
    // impl AsRawSocket for UdpSocket {
    //     fn as_raw_socket(&self) -> RawSocket {
    //         self.raw_socket
    //     }
    // }
    //
    // impl FromRawSocket for UdpSocket {
    //     unsafe fn from_raw_socket(handle: RawSocket) -> UdpSocket {
    //         net::UdpSocket::from_raw_socket(handle).into()
    //     }
    // }
    //
    // impl IntoRawSocket for UdpSocket {
    //     fn into_raw_socket(self) -> RawSocket {
    //         self.raw_socket
    //     }
    // }
}