ublox_sockets/
udp.rs

1use core::cmp::min;
2use fugit::{ExtU32, SecsDurationU32};
3
4use super::{Error, Instant, Result, RingBuffer, Socket, SocketHandle, SocketMeta};
5pub use embedded_nal::{Ipv4Addr, SocketAddr, SocketAddrV4};
6
/// A UDP socket ring buffer.
///
/// Byte-oriented [`RingBuffer`] parameterised by the const generic `N`; it is
/// the type of the receive buffer owned by [`UdpSocket`].
pub type SocketBuffer<const N: usize> = RingBuffer<u8, N>;
9
/// Connection state tracked for a [`UdpSocket`].
///
/// A socket is created in [`State::Closed`]; afterwards the value only changes
/// through [`UdpSocket::set_state`].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum State {
    /// Initial state of a socket, and the value of `State::default()`.
    Closed,
    /// NOTE(review): presumably set by the driver once the socket has been
    /// opened on the modem side — confirm against the callers of `set_state`.
    Established,
}
16
17impl Default for State {
18    fn default() -> Self {
19        State::Closed
20    }
21}
22
/// A User Datagram Protocol socket.
///
/// A UDP socket can be bound to an endpoint and owns a receive ring buffer.
/// `TIMER_HZ` is the tick rate of the [`Instant`] timestamps it stores and `L`
/// is the const parameter of its receive [`SocketBuffer`].
#[derive(Debug)]
pub struct UdpSocket<const TIMER_HZ: u32, const L: usize> {
    /// Socket metadata; holds the socket handle.
    pub(crate) meta: SocketMeta,
    /// Endpoint set by `bind` and cleared by `close`; the socket counts as
    /// open while this is `Some`.
    pub(crate) endpoint: Option<SocketAddr>,
    /// Minimum time between two `should_update_available_data` calls for the
    /// later call to return `true`.
    check_interval: SecsDurationU32,
    /// Time after a recorded remote close once which `recycle` returns `true`;
    /// `None` makes `recycle` always return `false`.
    read_timeout: Option<SecsDurationU32>,
    /// Connection state, see [`State`].
    state: State,
    /// Number of bytes available to ingress, as stored by `set_available_data`.
    available_data: usize,
    /// Receive buffer filled by `rx_enqueue_slice` and drained by the `recv*`
    /// methods.
    rx_buffer: SocketBuffer<L>,
    /// Timestamp passed to the most recent `should_update_available_data` call.
    last_check_time: Option<Instant<TIMER_HZ>>,
    /// Timestamp recorded by `closed_by_remote`.
    closed_time: Option<Instant<TIMER_HZ>>,
}
39
40impl<const TIMER_HZ: u32, const L: usize> UdpSocket<TIMER_HZ, L> {
41    /// Create an UDP socket with the given buffers.
42    pub fn new(socket_id: u8) -> UdpSocket<TIMER_HZ, L> {
43        UdpSocket {
44            meta: SocketMeta {
45                handle: SocketHandle(socket_id),
46            },
47            check_interval: 15.secs(),
48            state: State::Closed,
49            read_timeout: Some(15.secs()),
50            endpoint: None,
51            available_data: 0,
52            rx_buffer: SocketBuffer::new(),
53            last_check_time: None,
54            closed_time: None,
55        }
56    }
57
    /// Return the socket handle stored in this socket's metadata.
    pub fn handle(&self) -> SocketHandle {
        self.meta.handle
    }
62
    /// Replace this socket's handle with `handle`.
    ///
    /// The current and the new handle are passed to `debug!` before the
    /// metadata is updated.
    pub fn update_handle(&mut self, handle: SocketHandle) {
        debug!(
            "[UDP Socket] [{:?}] Updating handle {:?}",
            self.handle(),
            handle
        );
        self.meta.update(handle)
    }
71
    /// Return the bound endpoint, or `None` if the socket is not bound.
    pub fn endpoint(&self) -> Option<SocketAddr> {
        self.endpoint
    }
76
    /// Return the connection state currently stored for this socket.
    ///
    /// This is [`State::Closed`] for a new socket and otherwise whatever was
    /// last passed to `set_state`.
    pub fn state(&self) -> State {
        self.state
    }
81
    /// Set the connection state to `state`.
    ///
    /// The handle, the previous state and the new state are passed to `debug!`
    /// before the new value is stored.
    pub fn set_state(&mut self, state: State) {
        debug!(
            "[UDP Socket] {:?}, state change: {:?} -> {:?}",
            self.handle(),
            self.state,
            state
        );
        self.state = state
    }
91
92    pub fn should_update_available_data(&mut self, ts: Instant<TIMER_HZ>) -> bool {
93        self.last_check_time
94            .replace(ts)
95            .and_then(|last_check_time| ts.checked_duration_since(last_check_time))
96            .map(|dur| dur >= self.check_interval)
97            .unwrap_or(false)
98    }
99
100    pub fn recycle(&self, ts: Instant<TIMER_HZ>) -> bool {
101        if let Some(read_timeout) = self.read_timeout {
102            self.closed_time
103                .and_then(|closed_time| ts.checked_duration_since(closed_time))
104                .map(|dur| dur >= read_timeout)
105                .unwrap_or(false)
106        } else {
107            false
108        }
109    }
110
111    pub fn closed_by_remote(&mut self, ts: Instant<TIMER_HZ>) {
112        self.closed_time.replace(ts);
113    }
114
    /// Set the number of bytes available to ingress to `available_data`.
    ///
    /// The value is only stored; the receive buffer is not touched.
    pub fn set_available_data(&mut self, available_data: usize) {
        self.available_data = available_data;
    }
119
    /// Get the number of bytes available to ingress, as last stored by
    /// `set_available_data` (zero for a new socket).
    pub fn get_available_data(&self) -> usize {
        self.available_data
    }
124
    /// Return the value reported by the receive buffer's `window()`.
    ///
    /// NOTE(review): presumably the free space left in the receive buffer —
    /// confirm against `RingBuffer::window`.
    pub fn rx_window(&self) -> usize {
        self.rx_buffer.window()
    }
128
129    /// Bind the socket to the given endpoint.
130    ///
131    /// This function returns `Err(Error::Illegal)` if the socket was open
132    /// (see [is_open](#method.is_open)), and `Err(Error::Unaddressable)`
133    /// if the port in the given endpoint is zero.
134    pub fn bind<T: Into<SocketAddr>>(&mut self, endpoint: T) -> Result<()> {
135        if self.is_open() {
136            return Err(Error::Illegal);
137        }
138
139        self.endpoint.replace(endpoint.into());
140        Ok(())
141    }
142
    /// Check whether the socket is open, i.e. whether an endpoint is currently
    /// bound.
    pub fn is_open(&self) -> bool {
        self.endpoint.is_some()
    }
147
    /// Check whether the receive buffer can accept more data.
    ///
    /// Returns `true` as long as the receive buffer is *not* full.
    pub fn can_recv(&self) -> bool {
        !self.rx_buffer.is_full()
    }
152
153    // /// Return the maximum number packets the socket can receive.
154    // #[inline]
155    // pub fn packet_recv_capacity(&self) -> usize {
156    //     self.rx_buffer.packet_capacity()
157    // }
158
159    // /// Return the maximum number of bytes inside the recv buffer.
160    // #[inline]
161    // pub fn payload_recv_capacity(&self) -> usize {
162    //     self.rx_buffer.payload_capacity()
163    // }
164
165    fn recv_impl<'b, F, R>(&'b mut self, f: F) -> Result<R>
166    where
167        F: FnOnce(&'b mut SocketBuffer<L>) -> (usize, R),
168    {
169        // We may have received some data inside the initial SYN, but until the connection
170        // is fully open we must not dequeue any data, as it may be overwritten by e.g.
171        // another (stale) SYN. (We do not support TCP Fast Open.)
172        if !self.is_open() {
173            return Err(Error::Illegal);
174        }
175
176        let (_size, result) = f(&mut self.rx_buffer);
177        Ok(result)
178    }
179
    /// Dequeue received data by handing `f` a mutable slice of the receive
    /// buffer.
    ///
    /// `f` returns a `(usize, R)` pair and the `R` part is passed back to the
    /// caller.
    /// NOTE(review): the `usize` is presumably the number of bytes `f`
    /// consumed — confirm against `RingBuffer::dequeue_many_with`.
    ///
    /// This function returns `Err(Error::Illegal)` if the socket is not open.
    pub fn recv<'b, F, R>(&'b mut self, f: F) -> Result<R>
    where
        F: FnOnce(&'b mut [u8]) -> (usize, R),
    {
        self.recv_impl(|rx_buffer| rx_buffer.dequeue_many_with(f))
    }
190
191    /// Dequeue a packet received from a remote endpoint, copy the payload into the given slice,
192    /// and return the amount of octets copied as well as the endpoint.
193    ///
194    /// See also [recv](#method.recv).
195    pub fn recv_slice(&mut self, data: &mut [u8]) -> Result<usize> {
196        self.recv_impl(|rx_buffer| {
197            let size = rx_buffer.dequeue_slice(data);
198            (size, size)
199        })
200    }
201
    /// Enqueue `data` into the receive buffer and return the value reported by
    /// the buffer's `enqueue_slice`.
    ///
    /// NOTE(review): presumably the number of bytes actually stored, which may
    /// be less than `data.len()` when the buffer fills up — confirm against
    /// `RingBuffer::enqueue_slice`.
    pub fn rx_enqueue_slice(&mut self, data: &[u8]) -> usize {
        self.rx_buffer.enqueue_slice(data)
    }
205
206    /// Peek at a packet received from a remote endpoint, and return the endpoint as well
207    /// as a pointer to the payload without removing the packet from the receive buffer.
208    /// This function otherwise behaves identically to [recv](#method.recv).
209    ///
210    /// It returns `Err(Error::Exhausted)` if the receive buffer is empty.
211    pub fn peek(&mut self, size: usize) -> Result<&[u8]> {
212        if !self.is_open() {
213            return Err(Error::Illegal);
214        }
215
216        Ok(self.rx_buffer.get_allocated(0, size))
217    }
218
219    /// Peek at a packet received from a remote endpoint, copy the payload into the given slice,
220    /// and return the amount of octets copied as well as the endpoint without removing the
221    /// packet from the receive buffer.
222    /// This function otherwise behaves identically to [recv_slice](#method.recv_slice).
223    ///
224    /// See also [peek](#method.peek).
225    pub fn peek_slice(&mut self, data: &mut [u8]) -> Result<usize> {
226        let buffer = self.peek(data.len())?;
227        let length = min(data.len(), buffer.len());
228        data[..length].copy_from_slice(&buffer[..length]);
229        Ok(length)
230    }
231
232    pub fn close(&mut self) {
233        self.endpoint.take();
234    }
235}
236
#[cfg(feature = "defmt")]
impl<const TIMER_HZ: u32, const L: usize> defmt::Format for UdpSocket<TIMER_HZ, L> {
    /// Format the socket as its handle followed by its connection state.
    fn format(&self, fmt: defmt::Formatter) {
        let (handle, state) = (self.handle(), self.state());
        defmt::write!(fmt, "[{:?}, {:?}],", handle, state);
    }
}
243
244impl<const TIMER_HZ: u32, const L: usize> Into<Socket<TIMER_HZ, L>> for UdpSocket<TIMER_HZ, L> {
245    fn into(self) -> Socket<TIMER_HZ, L> {
246        Socket::Udp(self)
247    }
248}