ant_quic/connection/
datagrams.rs1use std::collections::VecDeque;
9
10use bytes::Bytes;
11use thiserror::Error;
12use tracing::{debug, trace};
13
14use super::Connection;
15use crate::{
16 TransportError,
17 frame::{Datagram, FrameStruct},
18};
19
20pub struct Datagrams<'a> {
22 pub(super) conn: &'a mut Connection,
23}
24
25impl Datagrams<'_> {
26 pub fn send(&mut self, data: Bytes, drop: bool) -> Result<(), SendDatagramError> {
36 if self.conn.config.datagram_receive_buffer_size.is_none() {
37 return Err(SendDatagramError::Disabled);
38 }
39 let max = self
40 .max_size()
41 .ok_or(SendDatagramError::UnsupportedByPeer)?;
42 if data.len() > max {
43 return Err(SendDatagramError::TooLarge);
44 }
45 if drop {
46 while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
47 let prev = self
48 .conn
49 .datagrams
50 .outgoing
51 .pop_front()
52 .expect("datagrams.outgoing_total desynchronized");
53 debug!(
54 len = prev.data.len(),
55 "dropping outgoing datagram (send buffer full)"
56 );
57 self.conn.datagrams.outgoing_total -= prev.data.len();
58 }
59 } else if self.conn.datagrams.outgoing_total + data.len()
60 > self.conn.config.datagram_send_buffer_size
61 {
62 self.conn.datagrams.send_blocked = true;
63 return Err(SendDatagramError::Blocked(data));
64 }
65 self.conn.datagrams.outgoing_total += data.len();
66 self.conn.datagrams.outgoing.push_back(Datagram { data });
67 Ok(())
68 }
69
70 pub fn max_size(&self) -> Option<usize> {
80 let max_size = self.conn.path.current_mtu() as usize
84 - self.conn.predict_1rtt_overhead(None)
85 - Datagram::SIZE_BOUND;
86 let limit = self
87 .conn
88 .peer_params
89 .max_datagram_frame_size?
90 .into_inner()
91 .saturating_sub(Datagram::SIZE_BOUND as u64);
92 Some(limit.min(max_size as u64) as usize)
93 }
94
95 pub fn recv(&mut self) -> Option<Bytes> {
97 self.conn.datagrams.recv()
98 }
99
100 pub fn send_buffer_space(&self) -> usize {
105 self.conn
106 .config
107 .datagram_send_buffer_size
108 .saturating_sub(self.conn.datagrams.outgoing_total)
109 }
110}
111
/// Summary of what happened when an incoming datagram was buffered.
#[derive(Debug, Clone, Copy, Default)]
pub struct DatagramReceivedResult {
    /// `true` if no payload bytes were buffered before this datagram arrived (evaluated before
    /// any older datagrams were dropped to make room).
    pub was_empty: bool,
    /// Number of previously buffered datagrams discarded to make room for this one.
    pub dropped_count: usize,
    /// Combined payload size, in bytes, of the datagrams counted in `dropped_count`.
    pub dropped_bytes: usize,
}
122
123#[derive(Default)]
124pub(super) struct DatagramState {
125 pub(super) recv_buffered: usize,
128 pub(super) incoming: VecDeque<Datagram>,
129 pub(super) outgoing: VecDeque<Datagram>,
130 pub(super) outgoing_total: usize,
131 pub(super) send_blocked: bool,
132}
133
134impl DatagramState {
135 pub(super) fn received(
136 &mut self,
137 datagram: Datagram,
138 window: &Option<usize>,
139 ) -> Result<DatagramReceivedResult, TransportError> {
140 let window = match window {
141 None => {
142 return Err(TransportError::PROTOCOL_VIOLATION(
143 "unexpected DATAGRAM frame",
144 ));
145 }
146 Some(x) => *x,
147 };
148
149 if datagram.data.len() > window {
150 return Err(TransportError::PROTOCOL_VIOLATION("oversized datagram"));
151 }
152
153 let was_empty = self.recv_buffered == 0;
154 let mut dropped_count = 0;
155 let mut dropped_bytes = 0;
156
157 while datagram.data.len() + self.recv_buffered > window {
158 if let Some(dropped) = self.recv() {
159 dropped_count += 1;
160 dropped_bytes += dropped.len();
161 debug!(
162 dropped_count,
163 dropped_bytes,
164 recv_buffered = self.recv_buffered,
165 incoming_len = datagram.data.len(),
166 window,
167 "dropping stale datagram (buffer full) - application not reading fast enough"
168 );
169 } else {
170 break;
172 }
173 }
174
175 self.recv_buffered += datagram.data.len();
176 self.incoming.push_back(datagram);
177 Ok(DatagramReceivedResult {
178 was_empty,
179 dropped_count,
180 dropped_bytes,
181 })
182 }
183
184 pub(super) fn drop_oversized(&mut self, max_payload: usize) {
189 self.outgoing.retain(|datagram| {
190 let result = datagram.data.len() < max_payload;
191 if !result {
192 trace!(
193 "dropping {} byte datagram violating {} byte limit",
194 datagram.data.len(),
195 max_payload
196 );
197 self.outgoing_total -= datagram.data.len();
198 }
199 result
200 });
201 }
202
203 pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
208 let datagram = match self.outgoing.pop_front() {
209 Some(x) => x,
210 None => return false,
211 };
212
213 if buf.len() + datagram.size(true) > max_size {
214 self.outgoing.push_front(datagram);
217 return false;
218 }
219
220 trace!(len = datagram.data.len(), "DATAGRAM");
221
222 self.outgoing_total -= datagram.data.len();
223 datagram.encode(true, buf);
224 true
225 }
226
227 pub(super) fn recv(&mut self) -> Option<Bytes> {
228 let x = self.incoming.pop_front()?.data;
229 self.recv_buffered -= x.len();
230 Some(x)
231 }
232}
233
234#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
236pub enum SendDatagramError {
237 #[error("datagrams not supported by peer")]
239 UnsupportedByPeer,
240 #[error("datagram support disabled")]
242 Disabled,
243 #[error("datagram too large")]
248 TooLarge,
249 #[error("datagram send blocked")]
251 Blocked(Bytes),
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Receive window used by the buffering tests.
    const WINDOW: Option<usize> = Some(1024);

    /// Build a datagram whose payload is `len` copies of `fill`.
    fn filled(fill: u8, len: usize) -> Datagram {
        Datagram {
            data: Bytes::from(vec![fill; len]),
        }
    }

    /// A datagram that fits in an empty buffer is stored without evicting anything.
    #[test]
    fn test_datagram_received_no_drop() {
        let mut state = DatagramState::default();

        let outcome = state.received(filled(0, 100), &WINDOW).unwrap();

        assert!(outcome.was_empty);
        assert_eq!(outcome.dropped_count, 0);
        assert_eq!(outcome.dropped_bytes, 0);
        assert_eq!(state.recv_buffered, 100);
    }

    /// A second datagram that overflows the window evicts the first one.
    #[test]
    fn test_datagram_received_with_drop() {
        let mut state = DatagramState::default();

        let first = state.received(filled(0, 800), &WINDOW).unwrap();
        assert!(first.was_empty);
        assert_eq!(first.dropped_count, 0);

        // 800 + 500 exceeds 1024, so the 800-byte datagram has to go.
        let second = state.received(filled(1, 500), &WINDOW).unwrap();

        assert!(!second.was_empty);
        assert_eq!(second.dropped_count, 1);
        assert_eq!(second.dropped_bytes, 800);

        assert_eq!(state.recv_buffered, 500);
        assert_eq!(state.incoming.len(), 1);
    }

    /// A large datagram evicts as many small ones as needed to fit.
    #[test]
    fn test_datagram_received_multiple_drops() {
        let mut state = DatagramState::default();

        for fill in 0..5u8 {
            state.received(filled(fill, 200), &WINDOW).unwrap();
        }

        assert_eq!(state.recv_buffered, 1000);
        assert_eq!(state.incoming.len(), 5);

        // 900 bytes only fits once all five 200-byte datagrams are gone.
        let outcome = state.received(filled(99, 900), &WINDOW).unwrap();

        assert_eq!(outcome.dropped_count, 5);
        assert_eq!(outcome.dropped_bytes, 1000);
        assert_eq!(state.recv_buffered, 900);
        assert_eq!(state.incoming.len(), 1);
    }

    /// With no receive window configured, any datagram is rejected.
    #[test]
    fn test_datagram_received_disabled() {
        let mut state = DatagramState::default();

        let outcome = state.received(filled(0, 100), &None);

        assert!(outcome.is_err());
    }

    /// A datagram larger than the whole window is rejected.
    #[test]
    fn test_datagram_received_oversized() {
        let mut state = DatagramState::default();

        let outcome = state.received(filled(0, 200), &Some(100));

        assert!(outcome.is_err());
    }
}