Skip to main content

ant_quic/connection/
datagrams.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::collections::VecDeque;
9
10use bytes::Bytes;
11use thiserror::Error;
12use tracing::{debug, trace};
13
14use super::Connection;
15use crate::{
16    TransportError,
17    frame::{Datagram, FrameStruct},
18};
19
/// API to control datagram traffic
///
/// A handle that mutably borrows a [`Connection`] and exposes its datagram operations:
/// queueing outgoing datagrams, taking received ones, and querying the current size and
/// buffer limits.
pub struct Datagrams<'a> {
    /// Connection whose configuration and datagram state this handle reads and updates
    pub(super) conn: &'a mut Connection,
}
24
25impl Datagrams<'_> {
26    /// Queue an unreliable, unordered datagram for immediate transmission
27    ///
28    /// If `drop` is true, previously queued datagrams which are still unsent may be discarded to
29    /// make space for this datagram, in order of oldest to newest. If `drop` is false, and there
30    /// isn't enough space due to previously queued datagrams, this function will return
31    /// `SendDatagramError::Blocked`. `Event::DatagramsUnblocked` will be emitted once datagrams
32    /// have been sent.
33    ///
34    /// Returns `Err` iff a `len`-byte datagram cannot currently be sent.
35    pub fn send(&mut self, data: Bytes, drop: bool) -> Result<(), SendDatagramError> {
36        if self.conn.config.datagram_receive_buffer_size.is_none() {
37            return Err(SendDatagramError::Disabled);
38        }
39        let max = self
40            .max_size()
41            .ok_or(SendDatagramError::UnsupportedByPeer)?;
42        if data.len() > max {
43            return Err(SendDatagramError::TooLarge);
44        }
45        if drop {
46            while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
47                let prev = self
48                    .conn
49                    .datagrams
50                    .outgoing
51                    .pop_front()
52                    .expect("datagrams.outgoing_total desynchronized");
53                debug!(
54                    len = prev.data.len(),
55                    "dropping outgoing datagram (send buffer full)"
56                );
57                self.conn.datagrams.outgoing_total -= prev.data.len();
58            }
59        } else if self.conn.datagrams.outgoing_total + data.len()
60            > self.conn.config.datagram_send_buffer_size
61        {
62            self.conn.datagrams.send_blocked = true;
63            return Err(SendDatagramError::Blocked(data));
64        }
65        self.conn.datagrams.outgoing_total += data.len();
66        self.conn.datagrams.outgoing.push_back(Datagram { data });
67        Ok(())
68    }
69
70    /// Compute the maximum size of datagrams that may passed to `send_datagram`
71    ///
72    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
73    ///
74    /// This may change over the lifetime of a connection according to variation in the path MTU
75    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
76    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
77    ///
78    /// Not necessarily the maximum size of received datagrams.
79    pub fn max_size(&self) -> Option<usize> {
80        // We use the conservative overhead bound for any packet number, reducing the budget by at
81        // most 3 bytes, so that PN size fluctuations don't cause users sending maximum-size
82        // datagrams to suffer avoidable packet loss.
83        let max_size = self.conn.path.current_mtu() as usize
84            - self.conn.predict_1rtt_overhead(None)
85            - Datagram::SIZE_BOUND;
86        let limit = self
87            .conn
88            .peer_params
89            .max_datagram_frame_size?
90            .into_inner()
91            .saturating_sub(Datagram::SIZE_BOUND as u64);
92        Some(limit.min(max_size as u64) as usize)
93    }
94
95    /// Receive an unreliable, unordered datagram
96    pub fn recv(&mut self) -> Option<Bytes> {
97        self.conn.datagrams.recv()
98    }
99
100    /// Bytes available in the outgoing datagram buffer
101    ///
102    /// When greater than zero, [`send`](Self::send)ing a datagram of at most this size is
103    /// guaranteed not to cause older datagrams to be dropped.
104    pub fn send_buffer_space(&self) -> usize {
105        self.conn
106            .config
107            .datagram_send_buffer_size
108            .saturating_sub(self.conn.datagrams.outgoing_total)
109    }
110}
111
/// Result of receiving a datagram, including any drops that occurred
///
/// Produced by `DatagramState::received` each time an incoming datagram is accepted into the
/// receive buffer.
#[derive(Debug, Clone, Copy, Default)]
pub struct DatagramReceivedResult {
    /// Whether the receive buffer was empty before this datagram
    ///
    /// Derived from the buffered byte count being zero, so a buffer holding only zero-length
    /// datagrams is also reported as empty.
    pub was_empty: bool,
    /// Number of old datagrams that were dropped to make room
    pub dropped_count: usize,
    /// Total bytes of dropped datagrams
    pub dropped_bytes: usize,
}
122
/// Per-connection datagram queues and their byte accounting
#[derive(Default)]
pub(super) struct DatagramState {
    /// Number of bytes of datagrams that have been received by the local transport but not
    /// delivered to the application
    pub(super) recv_buffered: usize,
    /// Received datagrams awaiting delivery, oldest at the front
    pub(super) incoming: VecDeque<Datagram>,
    /// Datagrams queued for transmission, oldest at the front
    pub(super) outgoing: VecDeque<Datagram>,
    /// Sum of the payload lengths of all datagrams in `outgoing`
    pub(super) outgoing_total: usize,
    /// Set when a non-dropping send was rejected because the send buffer was full
    pub(super) send_blocked: bool,
}
133
134impl DatagramState {
135    pub(super) fn received(
136        &mut self,
137        datagram: Datagram,
138        window: &Option<usize>,
139    ) -> Result<DatagramReceivedResult, TransportError> {
140        let window = match window {
141            None => {
142                return Err(TransportError::PROTOCOL_VIOLATION(
143                    "unexpected DATAGRAM frame",
144                ));
145            }
146            Some(x) => *x,
147        };
148
149        if datagram.data.len() > window {
150            return Err(TransportError::PROTOCOL_VIOLATION("oversized datagram"));
151        }
152
153        let was_empty = self.recv_buffered == 0;
154        let mut dropped_count = 0;
155        let mut dropped_bytes = 0;
156
157        while datagram.data.len() + self.recv_buffered > window {
158            if let Some(dropped) = self.recv() {
159                dropped_count += 1;
160                dropped_bytes += dropped.len();
161                debug!(
162                    dropped_count,
163                    dropped_bytes,
164                    recv_buffered = self.recv_buffered,
165                    incoming_len = datagram.data.len(),
166                    window,
167                    "dropping stale datagram (buffer full) - application not reading fast enough"
168                );
169            } else {
170                // Buffer is empty but still can't fit - shouldn't happen with valid window
171                break;
172            }
173        }
174
175        self.recv_buffered += datagram.data.len();
176        self.incoming.push_back(datagram);
177        Ok(DatagramReceivedResult {
178            was_empty,
179            dropped_count,
180            dropped_bytes,
181        })
182    }
183
184    /// Discard outgoing datagrams with a payload larger than `max_payload` bytes
185    ///
186    /// Used to ensure that reductions in MTU don't get us stuck in a state where we have a datagram
187    /// queued but can't send it.
188    pub(super) fn drop_oversized(&mut self, max_payload: usize) {
189        self.outgoing.retain(|datagram| {
190            let result = datagram.data.len() < max_payload;
191            if !result {
192                trace!(
193                    "dropping {} byte datagram violating {} byte limit",
194                    datagram.data.len(),
195                    max_payload
196                );
197                self.outgoing_total -= datagram.data.len();
198            }
199            result
200        });
201    }
202
203    /// Attempt to write a datagram frame into `buf`, consuming it from `self.outgoing`
204    ///
205    /// Returns whether a frame was written. At most `max_size` bytes will be written, including
206    /// framing.
207    pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
208        let datagram = match self.outgoing.pop_front() {
209            Some(x) => x,
210            None => return false,
211        };
212
213        if buf.len() + datagram.size(true) > max_size {
214            // Future work: we could be more clever about cramming small datagrams into
215            // mostly-full packets when a larger one is queued first
216            self.outgoing.push_front(datagram);
217            return false;
218        }
219
220        trace!(len = datagram.data.len(), "DATAGRAM");
221
222        self.outgoing_total -= datagram.data.len();
223        datagram.encode(true, buf);
224        true
225    }
226
227    pub(super) fn recv(&mut self) -> Option<Bytes> {
228        let x = self.incoming.pop_front()?.data;
229        self.recv_buffered -= x.len();
230        Some(x)
231    }
232}
233
/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    ///
    /// Returned when no maximum datagram size can be computed because the peer has not
    /// advertised a maximum datagram frame size.
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    ///
    /// Returned when no datagram receive buffer size is configured.
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// Send would block
    ///
    /// The send buffer is full and dropping older datagrams was not requested. Carries the
    /// rejected datagram back to the caller so it can be retried without copying.
    #[error("datagram send blocked")]
    Blocked(Bytes),
}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a datagram whose payload is `len` copies of the byte `fill`.
    fn filled(len: usize, fill: u8) -> Datagram {
        Datagram {
            data: Bytes::from(vec![fill; len]),
        }
    }

    /// A datagram that fits into an empty buffer is stored without evicting anything.
    #[test]
    fn test_datagram_received_no_drop() {
        let window = Some(1024);
        let mut state = DatagramState::default();

        let outcome = state.received(filled(100, 0), &window).unwrap();

        assert!(outcome.was_empty);
        assert_eq!(outcome.dropped_count, 0);
        assert_eq!(outcome.dropped_bytes, 0);
        assert_eq!(state.recv_buffered, 100);
    }

    /// A datagram that overflows the window evicts the older one to make room.
    #[test]
    fn test_datagram_received_with_drop() {
        let window = Some(1024);
        let mut state = DatagramState::default();

        // 800 bytes fit on their own.
        let first = state.received(filled(800, 0), &window).unwrap();
        assert!(first.was_empty);
        assert_eq!(first.dropped_count, 0);

        // 800 + 500 exceeds 1024, so the first datagram has to go.
        let second = state.received(filled(500, 1), &window).unwrap();
        assert!(!second.was_empty);
        assert_eq!(second.dropped_count, 1);
        assert_eq!(second.dropped_bytes, 800);

        // Only the newer datagram remains buffered.
        assert_eq!(state.recv_buffered, 500);
        assert_eq!(state.incoming.len(), 1);
    }

    /// A large datagram can force several older datagrams out at once.
    #[test]
    fn test_datagram_received_multiple_drops() {
        let window = Some(1024);
        let mut state = DatagramState::default();

        // Five 200-byte datagrams: 1000 bytes buffered in total.
        for fill in 0..5u8 {
            state.received(filled(200, fill), &window).unwrap();
        }
        assert_eq!(state.recv_buffered, 1000);
        assert_eq!(state.incoming.len(), 5);

        // 900 bytes only fit once all five older datagrams are gone.
        let outcome = state.received(filled(900, 99), &window).unwrap();

        assert_eq!(outcome.dropped_count, 5);
        assert_eq!(outcome.dropped_bytes, 1000);
        assert_eq!(state.recv_buffered, 900);
        assert_eq!(state.incoming.len(), 1);
    }

    /// Without a receive window, any incoming datagram is rejected.
    #[test]
    fn test_datagram_received_disabled() {
        let window: Option<usize> = None; // Datagrams disabled
        let mut state = DatagramState::default();

        let outcome = state.received(filled(100, 0), &window);

        assert!(outcome.is_err());
    }

    /// A datagram larger than the whole window is rejected outright.
    #[test]
    fn test_datagram_received_oversized() {
        let window = Some(100);
        let mut state = DatagramState::default();

        let outcome = state.received(filled(200, 0), &window);

        assert!(outcome.is_err());
    }
}