embassy_net/
udp.rs

1//! UDP sockets.
2
3use core::future::{poll_fn, Future};
4use core::mem;
5use core::task::{Context, Poll};
6
7use smoltcp::iface::{Interface, SocketHandle};
8use smoltcp::socket::udp;
9pub use smoltcp::socket::udp::{PacketMetadata, UdpMetadata};
10use smoltcp::wire::IpListenEndpoint;
11
12use crate::Stack;
13
14/// Error returned by [`UdpSocket::bind`].
15#[derive(PartialEq, Eq, Clone, Copy, Debug)]
16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub enum BindError {
18    /// The socket was already open.
19    InvalidState,
20    /// No route to host.
21    NoRoute,
22}
23
24/// Error returned by [`UdpSocket::send_to`].
25#[derive(PartialEq, Eq, Clone, Copy, Debug)]
26#[cfg_attr(feature = "defmt", derive(defmt::Format))]
27pub enum SendError {
28    /// No route to host.
29    NoRoute,
30    /// Socket not bound to an outgoing port.
31    SocketNotBound,
32    /// There is not enough transmit buffer capacity to ever send this packet.
33    PacketTooLarge,
34}
35
36/// Error returned by [`UdpSocket::recv_from`].
37#[derive(PartialEq, Eq, Clone, Copy, Debug)]
38#[cfg_attr(feature = "defmt", derive(defmt::Format))]
39pub enum RecvError {
40    /// Provided buffer was smaller than the received packet.
41    Truncated,
42}
43
44/// An UDP socket.
45pub struct UdpSocket<'a> {
46    stack: Stack<'a>,
47    handle: SocketHandle,
48}
49
50impl<'a> UdpSocket<'a> {
51    /// Create a new UDP socket using the provided stack and buffers.
52    pub fn new(
53        stack: Stack<'a>,
54        rx_meta: &'a mut [PacketMetadata],
55        rx_buffer: &'a mut [u8],
56        tx_meta: &'a mut [PacketMetadata],
57        tx_buffer: &'a mut [u8],
58    ) -> Self {
59        let handle = stack.with_mut(|i| {
60            let rx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(rx_meta) };
61            let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
62            let tx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(tx_meta) };
63            let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
64            i.sockets.add(udp::Socket::new(
65                udp::PacketBuffer::new(rx_meta, rx_buffer),
66                udp::PacketBuffer::new(tx_meta, tx_buffer),
67            ))
68        });
69
70        Self { stack, handle }
71    }
72
73    /// Bind the socket to a local endpoint.
74    pub fn bind<T>(&mut self, endpoint: T) -> Result<(), BindError>
75    where
76        T: Into<IpListenEndpoint>,
77    {
78        let mut endpoint = endpoint.into();
79
80        if endpoint.port == 0 {
81            // If user didn't specify port allocate a dynamic port.
82            endpoint.port = self.stack.with_mut(|i| i.get_local_port());
83        }
84
85        match self.with_mut(|s, _| s.bind(endpoint)) {
86            Ok(()) => Ok(()),
87            Err(udp::BindError::InvalidState) => Err(BindError::InvalidState),
88            Err(udp::BindError::Unaddressable) => Err(BindError::NoRoute),
89        }
90    }
91
92    fn with<R>(&self, f: impl FnOnce(&udp::Socket, &Interface) -> R) -> R {
93        self.stack.with(|i| {
94            let socket = i.sockets.get::<udp::Socket>(self.handle);
95            f(socket, &i.iface)
96        })
97    }
98
99    fn with_mut<R>(&self, f: impl FnOnce(&mut udp::Socket, &mut Interface) -> R) -> R {
100        self.stack.with_mut(|i| {
101            let socket = i.sockets.get_mut::<udp::Socket>(self.handle);
102            let res = f(socket, &mut i.iface);
103            i.waker.wake();
104            res
105        })
106    }
107
108    /// Wait until the socket becomes readable.
109    ///
110    /// A socket is readable when a packet has been received, or when there are queued packets in
111    /// the buffer.
112    pub fn wait_recv_ready(&self) -> impl Future<Output = ()> + '_ {
113        poll_fn(move |cx| self.poll_recv_ready(cx))
114    }
115
116    /// Wait until a datagram can be read.
117    ///
118    /// When no datagram is readable, this method will return `Poll::Pending` and
119    /// register the current task to be notified when a datagram is received.
120    ///
121    /// When a datagram is received, this method will return `Poll::Ready`.
122    pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
123        self.with_mut(|s, _| {
124            if s.can_recv() {
125                Poll::Ready(())
126            } else {
127                // socket buffer is empty wait until at least one byte has arrived
128                s.register_recv_waker(cx.waker());
129                Poll::Pending
130            }
131        })
132    }
133
134    /// Receive a datagram.
135    ///
136    /// This method will wait until a datagram is received.
137    ///
138    /// Returns the number of bytes received and the remote endpoint.
139    pub fn recv_from<'s>(
140        &'s self,
141        buf: &'s mut [u8],
142    ) -> impl Future<Output = Result<(usize, UdpMetadata), RecvError>> + 's {
143        poll_fn(|cx| self.poll_recv_from(buf, cx))
144    }
145
146    /// Receive a datagram.
147    ///
148    /// When no datagram is available, this method will return `Poll::Pending` and
149    /// register the current task to be notified when a datagram is received.
150    ///
151    /// When a datagram is received, this method will return `Poll::Ready` with the
152    /// number of bytes received and the remote endpoint.
153    pub fn poll_recv_from(
154        &self,
155        buf: &mut [u8],
156        cx: &mut Context<'_>,
157    ) -> Poll<Result<(usize, UdpMetadata), RecvError>> {
158        self.with_mut(|s, _| match s.recv_slice(buf) {
159            Ok((n, meta)) => Poll::Ready(Ok((n, meta))),
160            // No data ready
161            Err(udp::RecvError::Truncated) => Poll::Ready(Err(RecvError::Truncated)),
162            Err(udp::RecvError::Exhausted) => {
163                s.register_recv_waker(cx.waker());
164                Poll::Pending
165            }
166        })
167    }
168
169    /// Receive a datagram with a zero-copy function.
170    ///
171    /// When no datagram is available, this method will return `Poll::Pending` and
172    /// register the current task to be notified when a datagram is received.
173    ///
174    /// When a datagram is received, this method will call the provided function
175    /// with a reference to the received bytes and the remote endpoint and return
176    /// `Poll::Ready` with the function's returned value.
177    pub async fn recv_from_with<F, R>(&mut self, f: F) -> R
178    where
179        F: FnOnce(&[u8], UdpMetadata) -> R,
180    {
181        let mut f = Some(f);
182        poll_fn(move |cx| {
183            self.with_mut(|s, _| {
184                match s.recv() {
185                    Ok((buffer, endpoint)) => Poll::Ready(unwrap!(f.take())(buffer, endpoint)),
186                    Err(udp::RecvError::Truncated) => unreachable!(),
187                    Err(udp::RecvError::Exhausted) => {
188                        // socket buffer is empty wait until at least one byte has arrived
189                        s.register_recv_waker(cx.waker());
190                        Poll::Pending
191                    }
192                }
193            })
194        })
195        .await
196    }
197
198    /// Wait until the socket becomes writable.
199    ///
200    /// A socket becomes writable when there is space in the buffer, from initial memory or after
201    /// dispatching datagrams on a full buffer.
202    pub fn wait_send_ready(&self) -> impl Future<Output = ()> + '_ {
203        poll_fn(|cx| self.poll_send_ready(cx))
204    }
205
206    /// Wait until a datagram can be sent.
207    ///
208    /// When no datagram can be sent (i.e. the buffer is full), this method will return
209    /// `Poll::Pending` and register the current task to be notified when
210    /// space is freed in the buffer after a datagram has been dispatched.
211    ///
212    /// When a datagram can be sent, this method will return `Poll::Ready`.
213    pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
214        self.with_mut(|s, _| {
215            if s.can_send() {
216                Poll::Ready(())
217            } else {
218                // socket buffer is full wait until a datagram has been dispatched
219                s.register_send_waker(cx.waker());
220                Poll::Pending
221            }
222        })
223    }
224
225    /// Send a datagram to the specified remote endpoint.
226    ///
227    /// This method will wait until the datagram has been sent.
228    ///
229    /// If the socket's send buffer is too small to fit `buf`, this method will return `Err(SendError::PacketTooLarge)`
230    ///
231    /// When the remote endpoint is not reachable, this method will return `Err(SendError::NoRoute)`
232    pub async fn send_to<T>(&self, buf: &[u8], remote_endpoint: T) -> Result<(), SendError>
233    where
234        T: Into<UdpMetadata>,
235    {
236        let remote_endpoint: UdpMetadata = remote_endpoint.into();
237        poll_fn(move |cx| self.poll_send_to(buf, remote_endpoint, cx)).await
238    }
239
240    /// Send a datagram to the specified remote endpoint.
241    ///
242    /// When the datagram has been sent, this method will return `Poll::Ready(Ok())`.
243    ///
244    /// When the socket's send buffer is full, this method will return `Poll::Pending`
245    /// and register the current task to be notified when the buffer has space available.
246    ///
247    /// If the socket's send buffer is too small to fit `buf`, this method will return `Poll::Ready(Err(SendError::PacketTooLarge))`
248    ///
249    /// When the remote endpoint is not reachable, this method will return `Poll::Ready(Err(Error::NoRoute))`.
250    pub fn poll_send_to<T>(&self, buf: &[u8], remote_endpoint: T, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>
251    where
252        T: Into<UdpMetadata>,
253    {
254        // Don't need to wake waker in `with_mut` if the buffer will never fit the udp tx_buffer.
255        let send_capacity_too_small = self.with(|s, _| s.payload_send_capacity() < buf.len());
256        if send_capacity_too_small {
257            return Poll::Ready(Err(SendError::PacketTooLarge));
258        }
259
260        self.with_mut(|s, _| match s.send_slice(buf, remote_endpoint) {
261            // Entire datagram has been sent
262            Ok(()) => Poll::Ready(Ok(())),
263            Err(udp::SendError::BufferFull) => {
264                s.register_send_waker(cx.waker());
265                Poll::Pending
266            }
267            Err(udp::SendError::Unaddressable) => {
268                // If no sender/outgoing port is specified, there is not really "no route"
269                if s.endpoint().port == 0 {
270                    Poll::Ready(Err(SendError::SocketNotBound))
271                } else {
272                    Poll::Ready(Err(SendError::NoRoute))
273                }
274            }
275        })
276    }
277
278    /// Send a datagram to the specified remote endpoint with a zero-copy function.
279    ///
280    /// This method will wait until the buffer can fit the requested size before
281    /// calling the function to fill its contents.
282    ///
283    /// If the socket's send buffer is too small to fit `size`, this method will return `Err(SendError::PacketTooLarge)`
284    ///
285    /// When the remote endpoint is not reachable, this method will return `Err(SendError::NoRoute)`
286    pub async fn send_to_with<T, F, R>(&mut self, size: usize, remote_endpoint: T, f: F) -> Result<R, SendError>
287    where
288        T: Into<UdpMetadata> + Copy,
289        F: FnOnce(&mut [u8]) -> R,
290    {
291        // Don't need to wake waker in `with_mut` if the buffer will never fit the udp tx_buffer.
292        let send_capacity_too_small = self.with(|s, _| s.payload_send_capacity() < size);
293        if send_capacity_too_small {
294            return Err(SendError::PacketTooLarge);
295        }
296
297        let mut f = Some(f);
298        poll_fn(move |cx| {
299            self.with_mut(|s, _| {
300                match s.send(size, remote_endpoint) {
301                    Ok(buffer) => Poll::Ready(Ok(unwrap!(f.take())(buffer))),
302                    Err(udp::SendError::BufferFull) => {
303                        s.register_send_waker(cx.waker());
304                        Poll::Pending
305                    }
306                    Err(udp::SendError::Unaddressable) => {
307                        // If no sender/outgoing port is specified, there is not really "no route"
308                        if s.endpoint().port == 0 {
309                            Poll::Ready(Err(SendError::SocketNotBound))
310                        } else {
311                            Poll::Ready(Err(SendError::NoRoute))
312                        }
313                    }
314                }
315            })
316        })
317        .await
318    }
319
320    /// Flush the socket.
321    ///
322    /// This method will wait until the socket is flushed.
323    pub fn flush(&mut self) -> impl Future<Output = ()> + '_ {
324        poll_fn(|cx| {
325            self.with_mut(|s, _| {
326                if s.send_queue() == 0 {
327                    Poll::Ready(())
328                } else {
329                    s.register_send_waker(cx.waker());
330                    Poll::Pending
331                }
332            })
333        })
334    }
335
336    /// Returns the local endpoint of the socket.
337    pub fn endpoint(&self) -> IpListenEndpoint {
338        self.with(|s, _| s.endpoint())
339    }
340
341    /// Returns whether the socket is open.
342
343    pub fn is_open(&self) -> bool {
344        self.with(|s, _| s.is_open())
345    }
346
347    /// Close the socket.
348    pub fn close(&mut self) {
349        self.with_mut(|s, _| s.close())
350    }
351
352    /// Returns whether the socket is ready to send data, i.e. it has enough buffer space to hold a packet.
353    pub fn may_send(&self) -> bool {
354        self.with(|s, _| s.can_send())
355    }
356
357    /// Returns whether the socket is ready to receive data, i.e. it has received a packet that's now in the buffer.
358    pub fn may_recv(&self) -> bool {
359        self.with(|s, _| s.can_recv())
360    }
361
362    /// Return the maximum number packets the socket can receive.
363    pub fn packet_recv_capacity(&self) -> usize {
364        self.with(|s, _| s.packet_recv_capacity())
365    }
366
367    /// Return the maximum number packets the socket can receive.
368    pub fn packet_send_capacity(&self) -> usize {
369        self.with(|s, _| s.packet_send_capacity())
370    }
371
372    /// Return the maximum number of bytes inside the recv buffer.
373    pub fn payload_recv_capacity(&self) -> usize {
374        self.with(|s, _| s.payload_recv_capacity())
375    }
376
377    /// Return the maximum number of bytes inside the transmit buffer.
378    pub fn payload_send_capacity(&self) -> usize {
379        self.with(|s, _| s.payload_send_capacity())
380    }
381
382    /// Set the hop limit field in the IP header of sent packets.
383    pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) {
384        self.with_mut(|s, _| s.set_hop_limit(hop_limit))
385    }
386}
387
388impl Drop for UdpSocket<'_> {
389    fn drop(&mut self) {
390        self.stack.with_mut(|i| i.sockets.remove(self.handle));
391    }
392}
393
394fn _assert_covariant<'a, 'b: 'a>(x: UdpSocket<'b>) -> UdpSocket<'a> {
395    x
396}