ant_quic/connection/
datagrams.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::collections::VecDeque;
9
10use bytes::Bytes;
11use thiserror::Error;
12use tracing::{debug, trace};
13
14use super::Connection;
15use crate::{
16    TransportError,
17    frame::{Datagram, FrameStruct},
18};
19
/// API to control datagram traffic
///
/// A handle that mutably borrows a [`Connection`] for `'a`. Its methods read and modify that
/// connection's datagram queues, consulting the connection's configuration, path state and the
/// peer's transport parameters.
pub struct Datagrams<'a> {
    /// Connection whose datagram state this handle operates on
    pub(super) conn: &'a mut Connection,
}
24
25impl Datagrams<'_> {
26    /// Queue an unreliable, unordered datagram for immediate transmission
27    ///
28    /// If `drop` is true, previously queued datagrams which are still unsent may be discarded to
29    /// make space for this datagram, in order of oldest to newest. If `drop` is false, and there
30    /// isn't enough space due to previously queued datagrams, this function will return
31    /// `SendDatagramError::Blocked`. `Event::DatagramsUnblocked` will be emitted once datagrams
32    /// have been sent.
33    ///
34    /// Returns `Err` iff a `len`-byte datagram cannot currently be sent.
35    pub fn send(&mut self, data: Bytes, drop: bool) -> Result<(), SendDatagramError> {
36        if self.conn.config.datagram_receive_buffer_size.is_none() {
37            return Err(SendDatagramError::Disabled);
38        }
39        let max = self
40            .max_size()
41            .ok_or(SendDatagramError::UnsupportedByPeer)?;
42        if data.len() > max {
43            return Err(SendDatagramError::TooLarge);
44        }
45        if drop {
46            while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
47                let prev = self
48                    .conn
49                    .datagrams
50                    .outgoing
51                    .pop_front()
52                    .expect("datagrams.outgoing_total desynchronized");
53                trace!(len = prev.data.len(), "dropping outgoing datagram");
54                self.conn.datagrams.outgoing_total -= prev.data.len();
55            }
56        } else if self.conn.datagrams.outgoing_total + data.len()
57            > self.conn.config.datagram_send_buffer_size
58        {
59            self.conn.datagrams.send_blocked = true;
60            return Err(SendDatagramError::Blocked(data));
61        }
62        self.conn.datagrams.outgoing_total += data.len();
63        self.conn.datagrams.outgoing.push_back(Datagram { data });
64        Ok(())
65    }
66
67    /// Compute the maximum size of datagrams that may passed to `send_datagram`
68    ///
69    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
70    ///
71    /// This may change over the lifetime of a connection according to variation in the path MTU
72    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
73    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
74    ///
75    /// Not necessarily the maximum size of received datagrams.
76    pub fn max_size(&self) -> Option<usize> {
77        // We use the conservative overhead bound for any packet number, reducing the budget by at
78        // most 3 bytes, so that PN size fluctuations don't cause users sending maximum-size
79        // datagrams to suffer avoidable packet loss.
80        let max_size = self.conn.path.current_mtu() as usize
81            - self.conn.predict_1rtt_overhead(None)
82            - Datagram::SIZE_BOUND;
83        let limit = self
84            .conn
85            .peer_params
86            .max_datagram_frame_size?
87            .into_inner()
88            .saturating_sub(Datagram::SIZE_BOUND as u64);
89        Some(limit.min(max_size as u64) as usize)
90    }
91
92    /// Receive an unreliable, unordered datagram
93    pub fn recv(&mut self) -> Option<Bytes> {
94        self.conn.datagrams.recv()
95    }
96
97    /// Bytes available in the outgoing datagram buffer
98    ///
99    /// When greater than zero, [`send`](Self::send)ing a datagram of at most this size is
100    /// guaranteed not to cause older datagrams to be dropped.
101    pub fn send_buffer_space(&self) -> usize {
102        self.conn
103            .config
104            .datagram_send_buffer_size
105            .saturating_sub(self.conn.datagrams.outgoing_total)
106    }
107}
108
/// Per-connection bookkeeping for buffered incoming and queued outgoing datagrams
///
/// `recv_buffered` and `outgoing_total` are running byte totals that the methods operating on
/// this state keep in step with the payloads held in `incoming` and `outgoing` respectively.
#[derive(Default)]
pub(super) struct DatagramState {
    /// Number of bytes of datagrams that have been received by the local transport but not
    /// delivered to the application
    pub(super) recv_buffered: usize,
    /// Received datagrams awaiting delivery to the application, oldest at the front
    pub(super) incoming: VecDeque<Datagram>,
    /// Datagrams queued for transmission, oldest at the front
    pub(super) outgoing: VecDeque<Datagram>,
    /// Sum of the payload lengths of the datagrams currently in `outgoing`
    pub(super) outgoing_total: usize,
    /// Set by `Datagrams::send` when it rejects a datagram with `SendDatagramError::Blocked`
    pub(super) send_blocked: bool,
}
119
120impl DatagramState {
121    pub(super) fn received(
122        &mut self,
123        datagram: Datagram,
124        window: &Option<usize>,
125    ) -> Result<bool, TransportError> {
126        let window = match window {
127            None => {
128                return Err(TransportError::PROTOCOL_VIOLATION(
129                    "unexpected DATAGRAM frame",
130                ));
131            }
132            Some(x) => *x,
133        };
134
135        if datagram.data.len() > window {
136            return Err(TransportError::PROTOCOL_VIOLATION("oversized datagram"));
137        }
138
139        let was_empty = self.recv_buffered == 0;
140        while datagram.data.len() + self.recv_buffered > window {
141            debug!("dropping stale datagram");
142            self.recv();
143        }
144
145        self.recv_buffered += datagram.data.len();
146        self.incoming.push_back(datagram);
147        Ok(was_empty)
148    }
149
150    /// Discard outgoing datagrams with a payload larger than `max_payload` bytes
151    ///
152    /// Used to ensure that reductions in MTU don't get us stuck in a state where we have a datagram
153    /// queued but can't send it.
154    pub(super) fn drop_oversized(&mut self, max_payload: usize) {
155        self.outgoing.retain(|datagram| {
156            let result = datagram.data.len() < max_payload;
157            if !result {
158                trace!(
159                    "dropping {} byte datagram violating {} byte limit",
160                    datagram.data.len(),
161                    max_payload
162                );
163                self.outgoing_total -= datagram.data.len();
164            }
165            result
166        });
167    }
168
169    /// Attempt to write a datagram frame into `buf`, consuming it from `self.outgoing`
170    ///
171    /// Returns whether a frame was written. At most `max_size` bytes will be written, including
172    /// framing.
173    pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
174        let datagram = match self.outgoing.pop_front() {
175            Some(x) => x,
176            None => return false,
177        };
178
179        if buf.len() + datagram.size(true) > max_size {
180            // Future work: we could be more clever about cramming small datagrams into
181            // mostly-full packets when a larger one is queued first
182            self.outgoing.push_front(datagram);
183            return false;
184        }
185
186        trace!(len = datagram.data.len(), "DATAGRAM");
187
188        self.outgoing_total -= datagram.data.len();
189        datagram.encode(true, buf);
190        true
191    }
192
193    pub(super) fn recv(&mut self) -> Option<Bytes> {
194        let x = self.incoming.pop_front()?.data;
195        self.recv_buffered -= x.len();
196        Some(x)
197    }
198}
199
/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    ///
    /// Returned when no maximum datagram size can be computed because the peer's transport
    /// parameters carry no `max_datagram_frame_size`.
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    ///
    /// Returned when no datagram receive buffer size is configured for the connection.
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// Send would block
    ///
    /// Returned when dropping older datagrams was not requested and the datagram does not fit in
    /// the send buffer alongside what is already queued. Carries the rejected payload back to the
    /// caller.
    #[error("datagram send blocked")]
    Blocked(Bytes),
}