ant_quic/connection/
spaces.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    cmp,
10    collections::{BTreeMap, VecDeque},
11    mem,
12    ops::{Bound, Index, IndexMut},
13};
14
15use rand::Rng;
16use rustc_hash::FxHashSet;
17use tracing::trace;
18
19use super::assembler::Assembler;
20use crate::{
21    Dir, Duration, Instant, SocketAddr, StreamId, TransportError, VarInt, connection::StreamsState,
22    crypto::Keys, frame, packet::SpaceId, range_set::ArrayRangeSet, shared::IssuedCid,
23};
24
/// State tracked separately for each packet number space of a connection
///
/// Groups the receive-side bookkeeping (deduplication, pending ACKs), the queue of frames waiting
/// to be (re)transmitted, the record of sent-but-unacknowledged packets, and the counters used
/// for loss detection and congestion control.
pub(super) struct PacketSpace {
    /// Keys for this space; `None` when no keys are held
    pub(super) crypto: Option<Keys>,
    /// Sliding-window duplicate detector for received packet numbers
    pub(super) dedup: Dedup,
    /// Highest received packet number
    pub(super) rx_packet: u64,

    /// Data to send
    pub(super) pending: Retransmits,
    /// Packet numbers to acknowledge
    pub(super) pending_acks: PendingAcks,

    /// The packet number of the next packet that will be sent, if any. In the Data space, the
    /// packet number stored here is sometimes skipped by [`PacketNumberFilter`] logic.
    pub(super) next_packet_number: u64,
    /// The largest packet number the remote peer acknowledged in an ACK frame.
    pub(super) largest_acked_packet: Option<u64>,
    /// NOTE(review): presumably the time at which `largest_acked_packet` was sent; it starts out
    /// as the construction time passed to `new` -- confirm against the ACK handling code.
    pub(super) largest_acked_packet_sent: Instant,
    /// The highest-numbered ACK-eliciting packet we've sent
    pub(super) largest_ack_eliciting_sent: u64,
    /// Number of packets in `sent_packets` with numbers above `largest_ack_eliciting_sent`
    pub(super) unacked_non_ack_eliciting_tail: u64,
    /// Transmitted but not acked
    // We use a BTreeMap here so we can efficiently query by range on ACK and for loss detection
    pub(super) sent_packets: BTreeMap<u64, SentPacket>,
    /// Number of explicit congestion notification codepoints seen on incoming packets
    pub(super) ecn_counters: frame::EcnCounts,
    /// Recent ECN counters sent by the peer in ACK frames
    ///
    /// Updated (and inspected) whenever we receive an ACK with a new highest acked packet
    /// number. Stored per-space to simplify verification, which would otherwise have difficulty
    /// distinguishing between ECN bleaching and counts having been updated by a near-simultaneous
    /// ACK already processed in another space.
    pub(super) ecn_feedback: frame::EcnCounts,

    /// Incoming cryptographic handshake stream
    pub(super) crypto_stream: Assembler,
    /// Current offset of outgoing cryptographic handshake stream
    pub(super) crypto_offset: u64,

    /// The time the most recently sent retransmittable packet was sent.
    pub(super) time_of_last_ack_eliciting_packet: Option<Instant>,
    /// The time at which the earliest sent packet in this space will be considered lost based on
    /// exceeding the reordering window in time. Only set for packets numbered prior to a packet
    /// that has been acknowledged.
    pub(super) loss_time: Option<Instant>,
    /// Number of tail loss probes to send
    pub(super) loss_probes: u32,
    /// Whether a ping should be sent; `maybe_queue_probe` raises this when a probe would
    /// otherwise have nothing to carry
    pub(super) ping_pending: bool,
    /// Whether the next transmission should ask the peer to acknowledge without delay;
    /// `maybe_queue_probe` raises this when called with `request_immediate_ack`
    pub(super) immediate_ack_pending: bool,
    /// Number of congestion control "in flight" bytes
    pub(super) in_flight: u64,
    /// Number of packets sent in the current key phase
    pub(super) sent_with_keys: u64,
}
79
80impl PacketSpace {
    /// Create the state for a fresh packet number space
    ///
    /// Every counter starts at zero, every queue and map starts empty, and no keys are set.
    /// `now` is only used to seed `largest_acked_packet_sent`.
    pub(super) fn new(now: Instant) -> Self {
        Self {
            crypto: None,
            dedup: Dedup::new(),
            rx_packet: 0,

            pending: Retransmits::default(),
            pending_acks: PendingAcks::new(),

            next_packet_number: 0,
            largest_acked_packet: None,
            largest_acked_packet_sent: now,
            largest_ack_eliciting_sent: 0,
            unacked_non_ack_eliciting_tail: 0,
            sent_packets: BTreeMap::new(),
            ecn_counters: frame::EcnCounts::ZERO,
            ecn_feedback: frame::EcnCounts::ZERO,

            crypto_stream: Assembler::new(),
            crypto_offset: 0,

            time_of_last_ack_eliciting_packet: None,
            loss_time: None,
            loss_probes: 0,
            ping_pending: false,
            immediate_ack_pending: false,
            in_flight: 0,
            sent_with_keys: 0,
        }
    }
111
112    /// Queue data for a tail loss probe (or anti-amplification deadlock prevention) packet
113    ///
114    /// Probes are sent similarly to normal packets when an expected ACK has not arrived. We never
115    /// deem a packet lost until we receive an ACK that should have included it, but if a trailing
116    /// run of packets (or their ACKs) are lost, this might not happen in a timely fashion. We send
117    /// probe packets to force an ACK, and exempt them from congestion control to prevent a deadlock
118    /// when the congestion window is filled with lost tail packets.
119    ///
120    /// We prefer to send new data, to make the most efficient use of bandwidth. If there's no data
121    /// waiting to be sent, then we retransmit in-flight data to reduce odds of loss. If there's no
122    /// in-flight data either, we're probably a client guarding against a handshake
123    /// anti-amplification deadlock and we just make something up.
124    pub(super) fn maybe_queue_probe(
125        &mut self,
126        request_immediate_ack: bool,
127        streams: &StreamsState,
128    ) {
129        if self.loss_probes == 0 {
130            return;
131        }
132
133        if request_immediate_ack {
134            // The probe should be ACKed without delay (should only be used in the Data space and
135            // when the peer supports the acknowledgement frequency extension)
136            self.immediate_ack_pending = true;
137        }
138
139        if !self.pending.is_empty(streams) {
140            // There's real data to send here, no need to make something up
141            return;
142        }
143
144        // Retransmit the data of the oldest in-flight packet
145        for packet in self.sent_packets.values_mut() {
146            if !packet.retransmits.is_empty(streams) {
147                // Remove retransmitted data from the old packet so we don't end up retransmitting
148                // it *again* even if the copy we're sending now gets acknowledged.
149                self.pending |= mem::take(&mut packet.retransmits);
150                return;
151            }
152        }
153
154        // Nothing new to send and nothing to retransmit, so fall back on a ping. This should only
155        // happen in rare cases during the handshake when the server becomes blocked by
156        // anti-amplification.
157        if !self.immediate_ack_pending {
158            self.ping_pending = true;
159        }
160    }
161
162    /// Get the next outgoing packet number in this space
163    ///
164    /// In the Data space, the connection's [`PacketNumberFilter`] must be used rather than calling
165    /// this directly.
166    pub(super) fn get_tx_number(&mut self) -> u64 {
167        // TODO: Handle packet number overflow gracefully
168        assert!(self.next_packet_number < 2u64.pow(62));
169        let x = self.next_packet_number;
170        self.next_packet_number += 1;
171        self.sent_with_keys += 1;
172        x
173    }
174
175    pub(super) fn can_send(&self, streams: &StreamsState) -> SendableFrames {
176        let acks = self.pending_acks.can_send();
177        let other =
178            !self.pending.is_empty(streams) || self.ping_pending || self.immediate_ack_pending;
179
180        SendableFrames { acks, other }
181    }
182
183    /// Verifies sanity of an ECN block and returns whether congestion was encountered.
184    pub(super) fn detect_ecn(
185        &mut self,
186        newly_acked: u64,
187        ecn: frame::EcnCounts,
188    ) -> Result<bool, &'static str> {
189        let ect0_increase = ecn
190            .ect0
191            .checked_sub(self.ecn_feedback.ect0)
192            .ok_or("peer ECT(0) count regression")?;
193        let ect1_increase = ecn
194            .ect1
195            .checked_sub(self.ecn_feedback.ect1)
196            .ok_or("peer ECT(1) count regression")?;
197        let ce_increase = ecn
198            .ce
199            .checked_sub(self.ecn_feedback.ce)
200            .ok_or("peer CE count regression")?;
201        let total_increase = ect0_increase + ect1_increase + ce_increase;
202        if total_increase < newly_acked {
203            return Err("ECN bleaching");
204        }
205        if (ect0_increase + ce_increase) < newly_acked || ect1_increase != 0 {
206            return Err("ECN corruption");
207        }
208        // If total_increase > newly_acked (which happens when ACKs are lost), this is required by
209        // the draft so that long-term drift does not occur. If =, then the only question is whether
210        // to count CE packets as CE or ECT0. Recording them as CE is more consistent and keeps the
211        // congestion check obvious.
212        self.ecn_feedback = ecn;
213        Ok(ce_increase != 0)
214    }
215
216    /// Stop tracking sent packet `number`, and return what we knew about it
217    pub(super) fn take(&mut self, number: u64) -> Option<SentPacket> {
218        let packet = self.sent_packets.remove(&number)?;
219        self.in_flight -= u64::from(packet.size);
220        if !packet.ack_eliciting && number > self.largest_ack_eliciting_sent {
221            self.unacked_non_ack_eliciting_tail =
222                self.unacked_non_ack_eliciting_tail.checked_sub(1).unwrap();
223        }
224        Some(packet)
225    }
226
227    /// Returns the number of bytes to *remove* from the connection's in-flight count
228    pub(super) fn sent(&mut self, number: u64, packet: SentPacket) -> u64 {
229        // Retain state for at most this many non-ACK-eliciting packets sent after the most recently
230        // sent ACK-eliciting packet. We're never guaranteed to receive an ACK for those, and we
231        // can't judge them as lost without an ACK, so to limit memory in applications which receive
232        // packets but don't send ACK-eliciting data for long periods use we must eventually start
233        // forgetting about them, although it might also be reasonable to just kill the connection
234        // due to weird peer behavior.
235        const MAX_UNACKED_NON_ACK_ELICTING_TAIL: u64 = 1_000;
236
237        let mut forgotten_bytes = 0;
238        if packet.ack_eliciting {
239            self.unacked_non_ack_eliciting_tail = 0;
240            self.largest_ack_eliciting_sent = number;
241        } else if self.unacked_non_ack_eliciting_tail > MAX_UNACKED_NON_ACK_ELICTING_TAIL {
242            let oldest_after_ack_eliciting = *self
243                .sent_packets
244                .range((
245                    Bound::Excluded(self.largest_ack_eliciting_sent),
246                    Bound::Unbounded,
247                ))
248                .next()
249                .unwrap()
250                .0;
251            // Per https://www.rfc-editor.org/rfc/rfc9000.html#name-frames-and-frame-types,
252            // non-ACK-eliciting packets must only contain PADDING, ACK, and CONNECTION_CLOSE
253            // frames, which require no special handling on ACK or loss beyond removal from
254            // in-flight counters if padded.
255            let packet = self
256                .sent_packets
257                .remove(&oldest_after_ack_eliciting)
258                .unwrap();
259            forgotten_bytes = u64::from(packet.size);
260            self.in_flight -= forgotten_bytes;
261        } else {
262            self.unacked_non_ack_eliciting_tail += 1;
263        }
264
265        self.in_flight += u64::from(packet.size);
266        self.sent_packets.insert(number, packet);
267        forgotten_bytes
268    }
269}
270
271impl Index<SpaceId> for [PacketSpace; 3] {
272    type Output = PacketSpace;
273    fn index(&self, space: SpaceId) -> &PacketSpace {
274        &self.as_ref()[space as usize]
275    }
276}
277
278impl IndexMut<SpaceId> for [PacketSpace; 3] {
279    fn index_mut(&mut self, space: SpaceId) -> &mut PacketSpace {
280        &mut self.as_mut()[space as usize]
281    }
282}
283
/// Represents one or more packets subject to retransmission
///
/// Stored in `PacketSpace::sent_packets`, keyed by packet number, until the packet is taken out
/// again or forgotten.
#[derive(Debug, Clone)]
pub(super) struct SentPacket {
    /// The time the packet was sent.
    pub(super) time_sent: Instant,
    /// The number of bytes sent in the packet, not including UDP or IP overhead, but including QUIC
    /// framing overhead. Zero if this packet is not counted towards congestion control, i.e. not an
    /// "in flight" packet.
    pub(super) size: u16,
    /// Whether an acknowledgement is expected directly in response to this packet.
    pub(super) ack_eliciting: bool,
    /// The largest packet number acknowledged by this packet
    pub(super) largest_acked: Option<u64>,
    /// Data which needs to be retransmitted in case the packet is lost.
    /// The data is boxed to minimize `SentPacket` size for the typical case of
    /// packets only containing ACKs and STREAM frames.
    ///
    /// `PacketSpace::maybe_queue_probe` may move this data out early to reuse it as probe payload.
    pub(super) retransmits: ThinRetransmits,
    /// Metadata for stream frames in a packet
    ///
    /// The actual application data is stored with the stream state.
    pub(super) stream_frames: frame::StreamMetaVec,
}
306
/// Retransmittable data queue
///
/// Boolean fields flag that something of that kind is wanted; collection fields hold one entry
/// per item still to be sent. Two queues are merged with `|=`.
///
/// NOTE(review): the mapping from the undocumented fields to concrete QUIC frame types is only
/// implied by their names here -- confirm against the frame-writing code.
#[derive(Debug, Default, Clone)]
pub struct Retransmits {
    pub(super) max_data: bool,
    /// One flag per stream direction, indexed by `Dir as usize`
    pub(super) max_stream_id: [bool; 2],
    pub(super) reset_stream: Vec<(StreamId, VarInt)>,
    pub(super) stop_sending: Vec<frame::StopSending>,
    /// Streams with a queued flow-control update
    ///
    /// An entry only makes the queue non-empty while `StreamsState::can_send_flow_control`
    /// holds for that stream.
    pub(super) max_stream_data: FxHashSet<StreamId>,
    /// Queued CRYPTO frames; merging puts the merged-in frames in front of the ones already here
    pub(super) crypto: VecDeque<frame::Crypto>,
    pub(super) new_cids: Vec<IssuedCid>,
    pub(super) retire_cids: Vec<u64>,
    pub(super) ack_frequency: bool,
    pub(super) handshake_done: bool,
    /// For each enqueued NEW_TOKEN frame, a copy of the path's remote address
    ///
    /// There are 2 reasons this is unusual:
    ///
    /// - If the path changes, NEW_TOKEN frames bound for the old path are not retransmitted on the
    ///   new path. That is why this field stores the remote address: so that ones for old paths
    ///   can be filtered out.
    /// - If a token is lost, a new randomly generated token is re-transmitted, rather than the
    ///   original. This is so that if both transmissions are received, the client won't risk
    ///   sending the same token twice. That is why this field does _not_ store any actual token.
    ///
    /// It is true that a QUIC endpoint will only want to effectively have NEW_TOKEN frames
    /// enqueued for its current path at a given point in time. Based on that, we could conceivably
    /// change this from a vector to an `Option<(SocketAddr, usize)>` or just a `usize` or
    /// something. However, due to the architecture of Quinn, it is considerably simpler to not do
    /// that; consider what such a change would mean for implementing `BitOrAssign` on Self.
    pub(super) new_tokens: Vec<SocketAddr>,
    /// NAT traversal AddAddress frames to be sent
    pub(super) add_addresses: Vec<frame::AddAddress>,
    /// NAT traversal PunchMeNow frames to be sent
    pub(super) punch_me_now: Vec<frame::PunchMeNow>,
    /// NAT traversal RemoveAddress frames to be sent
    pub(super) remove_addresses: Vec<frame::RemoveAddress>,
    /// OBSERVED_ADDRESS frames to be sent
    pub(super) observed_addresses: Vec<frame::ObservedAddress>,
}
346
347impl Retransmits {
348    pub(super) fn is_empty(&self, streams: &StreamsState) -> bool {
349        !self.max_data
350            && !self.max_stream_id.into_iter().any(|x| x)
351            && self.reset_stream.is_empty()
352            && self.stop_sending.is_empty()
353            && self
354                .max_stream_data
355                .iter()
356                .all(|&id| !streams.can_send_flow_control(id))
357            && self.crypto.is_empty()
358            && self.new_cids.is_empty()
359            && self.retire_cids.is_empty()
360            && !self.ack_frequency
361            && !self.handshake_done
362            && self.new_tokens.is_empty()
363            && self.add_addresses.is_empty()
364            && self.punch_me_now.is_empty()
365            && self.remove_addresses.is_empty()
366            && self.observed_addresses.is_empty()
367    }
368}
369
370impl ::std::ops::BitOrAssign for Retransmits {
371    fn bitor_assign(&mut self, rhs: Self) {
372        // We reduce in-stream head-of-line blocking by queueing retransmits before other data for
373        // STREAM and CRYPTO frames.
374        self.max_data |= rhs.max_data;
375        for dir in Dir::iter() {
376            self.max_stream_id[dir as usize] |= rhs.max_stream_id[dir as usize];
377        }
378        self.reset_stream.extend_from_slice(&rhs.reset_stream);
379        self.stop_sending.extend_from_slice(&rhs.stop_sending);
380        self.max_stream_data.extend(&rhs.max_stream_data);
381        for crypto in rhs.crypto.into_iter().rev() {
382            self.crypto.push_front(crypto);
383        }
384        self.new_cids.extend(&rhs.new_cids);
385        self.retire_cids.extend(rhs.retire_cids);
386        self.ack_frequency |= rhs.ack_frequency;
387        self.handshake_done |= rhs.handshake_done;
388        self.new_tokens.extend_from_slice(&rhs.new_tokens);
389        self.add_addresses.extend_from_slice(&rhs.add_addresses);
390        self.punch_me_now.extend_from_slice(&rhs.punch_me_now);
391        self.remove_addresses
392            .extend_from_slice(&rhs.remove_addresses);
393        self.observed_addresses
394            .extend_from_slice(&rhs.observed_addresses);
395    }
396}
397
398impl ::std::ops::BitOrAssign<ThinRetransmits> for Retransmits {
399    fn bitor_assign(&mut self, rhs: ThinRetransmits) {
400        if let Some(retransmits) = rhs.retransmits {
401            self.bitor_assign(*retransmits)
402        }
403    }
404}
405
406impl ::std::iter::FromIterator<Self> for Retransmits {
407    fn from_iter<T>(iter: T) -> Self
408    where
409        T: IntoIterator<Item = Self>,
410    {
411        let mut result = Self::default();
412        for packet in iter {
413            result |= packet;
414        }
415        result
416    }
417}
418
419/// A variant of `Retransmits` which only allocates storage when required
420#[derive(Debug, Default, Clone)]
421pub(super) struct ThinRetransmits {
422    retransmits: Option<Box<Retransmits>>,
423}
424
425impl ThinRetransmits {
426    /// Returns `true` if no retransmits are necessary
427    pub(super) fn is_empty(&self, streams: &StreamsState) -> bool {
428        match &self.retransmits {
429            Some(retransmits) => retransmits.is_empty(streams),
430            None => true,
431        }
432    }
433
434    /// Returns a reference to the retransmits stored in this box
435    pub(super) fn get(&self) -> Option<&Retransmits> {
436        self.retransmits.as_deref()
437    }
438
439    /// Returns a mutable reference to the stored retransmits
440    ///
441    /// This function will allocate a backing storage if required.
442    pub(super) fn get_or_create(&mut self) -> &mut Retransmits {
443        if self.retransmits.is_none() {
444            self.retransmits = Some(Box::default());
445        }
446        self.retransmits.as_deref_mut().unwrap()
447    }
448}
449
450/// RFC4303-style sliding window packet number deduplicator.
451///
452/// A contiguous bitfield, where each bit corresponds to a packet number and the rightmost bit is
453/// always set. A set bit represents a packet that has been successfully authenticated. Bits left of
454/// the window are assumed to be set.
455///
456/// ```text
457/// ...xxxxxxxxx 1 0
458///     ^        ^ ^
459/// window highest next
460/// ```
461pub(super) struct Dedup {
462    window: Window,
463    /// Lowest packet number higher than all yet authenticated.
464    next: u64,
465}
466
467/// Inner bitfield type.
468///
469/// Because QUIC never reuses packet numbers, this only needs to be large enough to deal with
470/// packets that are reordered but still delivered in a timely manner.
471type Window = u128;
472
473/// Number of packets tracked by `Dedup`.
474const WINDOW_SIZE: u64 = 1 + mem::size_of::<Window>() as u64 * 8;
475
476impl Dedup {
477    /// Construct an empty window positioned at the start.
478    pub(super) fn new() -> Self {
479        Self { window: 0, next: 0 }
480    }
481
482    /// Highest packet number authenticated.
483    fn highest(&self) -> u64 {
484        self.next - 1
485    }
486
487    /// Record a newly authenticated packet number.
488    ///
489    /// Returns whether the packet might be a duplicate.
490    pub(super) fn insert(&mut self, packet: u64) -> bool {
491        if let Some(diff) = packet.checked_sub(self.next) {
492            // Right of window
493            self.window = ((self.window << 1) | 1)
494                .checked_shl(cmp::min(diff, u64::from(u32::MAX)) as u32)
495                .unwrap_or(0);
496            self.next = packet + 1;
497            false
498        } else if self.highest() - packet < WINDOW_SIZE {
499            // Within window
500            if let Some(bit) = (self.highest() - packet).checked_sub(1) {
501                // < highest
502                let mask = 1 << bit;
503                let duplicate = self.window & mask != 0;
504                self.window |= mask;
505                duplicate
506            } else {
507                // == highest
508                true
509            }
510        } else {
511            // Left of window
512            true
513        }
514    }
515
516    /// Returns the packet number of the smallest packet missing between the provided interval
517    ///
518    /// If there are no missing packets, returns `None`
519    fn smallest_missing_in_interval(&self, lower_bound: u64, upper_bound: u64) -> Option<u64> {
520        debug_assert!(lower_bound <= upper_bound);
521        debug_assert!(upper_bound <= self.highest());
522        const BITFIELD_SIZE: u64 = (mem::size_of::<Window>() * 8) as u64;
523
524        // Since we already know the packets at the boundaries have been received, we only need to
525        // check those in between them (this removes the necessity of extra logic to deal with the
526        // highest packet, which is stored outside the bitfield)
527        let lower_bound = lower_bound + 1;
528        let upper_bound = upper_bound.saturating_sub(1);
529
530        // Note: the offsets are counted from the right
531        // The highest packet is not included in the bitfield, so we subtract 1 to account for that
532        let start_offset = (self.highest() - upper_bound).max(1) - 1;
533        if start_offset >= BITFIELD_SIZE {
534            // The start offset is outside of the window. All packets outside of the window are
535            // considered to be received.
536            return None;
537        }
538
539        let end_offset_exclusive = self.highest().saturating_sub(lower_bound);
540
541        // The range is clamped at the edge of the window, because any earlier packets are
542        // considered to be received
543        let range_len = end_offset_exclusive
544            .saturating_sub(start_offset)
545            .min(BITFIELD_SIZE);
546        if range_len == 0 {
547            return None;
548        }
549
550        // Ensure the shift is within bounds (we already know start_offset < BITFIELD_SIZE,
551        // because of the early return)
552        let mask = if range_len == BITFIELD_SIZE {
553            u128::MAX
554        } else {
555            ((1u128 << range_len) - 1) << start_offset
556        };
557        let gaps = !self.window & mask;
558
559        let smallest_missing_offset = 128 - gaps.leading_zeros() as u64;
560        let smallest_missing_packet = self.highest() - smallest_missing_offset;
561
562        if smallest_missing_packet <= upper_bound {
563            Some(smallest_missing_packet)
564        } else {
565            None
566        }
567    }
568
569    /// Returns true if there are any missing packets between the provided interval
570    ///
571    /// The provided packet numbers must have been received before calling this function
572    fn missing_in_interval(&self, lower_bound: u64, upper_bound: u64) -> bool {
573        self.smallest_missing_in_interval(lower_bound, upper_bound)
574            .is_some()
575    }
576}
577
578/// Indicates which data is available for sending
579#[derive(Clone, Copy, PartialEq, Eq, Debug)]
580pub(super) struct SendableFrames {
581    pub(super) acks: bool,
582    pub(super) other: bool,
583}
584
585impl SendableFrames {
586    /// Returns that no data is available for sending
587    pub(super) fn empty() -> Self {
588        Self {
589            acks: false,
590            other: false,
591        }
592    }
593
594    /// Whether no data is sendable
595    pub(super) fn is_empty(&self) -> bool {
596        !self.acks && !self.other
597    }
598}
599
/// Receive-side acknowledgement state: which packets still need to be ACKed, and whether an ACK
/// has to go out right away or may wait
#[derive(Debug)]
pub(super) struct PendingAcks {
    /// Whether we should send an ACK immediately, even if that means sending an ACK-only packet
    ///
    /// When `immediate_ack_required` is false, the normal behavior is to send ACK frames only when
    /// there is other data to send, or when the `MaxAckDelay` timer expires.
    immediate_ack_required: bool,
    /// The number of ack-eliciting packets received since the last ACK frame was sent
    ///
    /// Once the count _exceeds_ `ack_eliciting_threshold`, an immediate ACK is required
    ack_eliciting_since_last_ack_sent: u64,
    /// The number of non-ack-eliciting packets received since the last ACK frame was sent
    non_ack_eliciting_since_last_ack_sent: u64,
    /// How many ack-eliciting packets may be received without requiring an immediate ACK
    ///
    /// Starts at 1 and is replaced by `set_ack_frequency_params`.
    ack_eliciting_threshold: u64,
    /// The reordering threshold, controlling how we respond to out-of-order ack-eliciting packets
    ///
    /// Different values enable different behavior:
    ///
    /// * `0`: no special action is taken
    /// * `1`: an ACK is immediately sent if it is out-of-order according to RFC 9000
    /// * `>1`: an ACK is immediately sent if it is out-of-order according to the ACK frequency draft
    reordering_threshold: u64,
    /// The earliest ack-eliciting packet since the last ACK was sent, used to calculate the moment
    /// upon which `max_ack_delay` elapses
    earliest_ack_eliciting_since_last_ack_sent: Option<Instant>,
    /// The packet number ranges of ack-eliciting packets the peer hasn't confirmed receipt of ACKs
    /// for
    ranges: ArrayRangeSet,
    /// The packet with the largest packet number, and the time upon which it was received (used to
    /// calculate ACK delay in [`PendingAcks::ack_delay`])
    largest_packet: Option<(u64, Instant)>,
    /// The ack-eliciting packet we have received with the largest packet number
    largest_ack_eliciting_packet: Option<u64>,
    /// The largest acknowledged packet number sent in an ACK frame
    ///
    /// `acks_sent` sets this to the value of `largest_ack_eliciting_packet` at that moment.
    largest_acked: Option<u64>,
}
635
636impl PendingAcks {
    /// Create the initial state: nothing to acknowledge, no immediate ACK required, and both the
    /// ack-eliciting and the reordering threshold set to 1
    fn new() -> Self {
        Self {
            immediate_ack_required: false,
            ack_eliciting_since_last_ack_sent: 0,
            non_ack_eliciting_since_last_ack_sent: 0,
            ack_eliciting_threshold: 1,
            reordering_threshold: 1,
            earliest_ack_eliciting_since_last_ack_sent: None,
            ranges: ArrayRangeSet::default(),
            largest_packet: None,
            largest_ack_eliciting_packet: None,
            largest_acked: None,
        }
    }
651
    /// Adopt the thresholds carried by an `AckFrequency` frame
    ///
    /// Overwrites both `ack_eliciting_threshold` and `reordering_threshold` with the values from
    /// `frame`.
    pub(super) fn set_ack_frequency_params(&mut self, frame: &frame::AckFrequency) {
        self.ack_eliciting_threshold = frame.ack_eliciting_threshold.into_inner();
        self.reordering_threshold = frame.reordering_threshold.into_inner();
    }
656
    /// Demand that an ACK be sent immediately
    ///
    /// The flag stays set until `acks_sent` clears it (or `on_max_ack_delay_timeout` overwrites
    /// it).
    pub(super) fn set_immediate_ack_required(&mut self) {
        self.immediate_ack_required = true;
    }
660
    /// React to the expiry of the max ACK delay timer
    ///
    /// An immediate ACK becomes required exactly when at least one ack-eliciting packet has been
    /// received since the last ACK was sent. Note that this assigns the flag rather than OR-ing
    /// into it, so a previously set requirement is cleared when that count is zero.
    pub(super) fn on_max_ack_delay_timeout(&mut self) {
        self.immediate_ack_required = self.ack_eliciting_since_last_ack_sent > 0;
    }
664
665    pub(super) fn max_ack_delay_timeout(&self, max_ack_delay: Duration) -> Option<Instant> {
666        self.earliest_ack_eliciting_since_last_ack_sent
667            .map(|earliest_unacked| earliest_unacked + max_ack_delay)
668    }
669
    /// Whether any ACK frames can be sent
    ///
    /// True only when an immediate ACK is required *and* there is at least one range left to
    /// acknowledge.
    pub(super) fn can_send(&self) -> bool {
        self.immediate_ack_required && !self.ranges.is_empty()
    }
674
675    /// Returns the delay since the packet with the largest packet number was received
676    pub(super) fn ack_delay(&self, now: Instant) -> Duration {
677        self.largest_packet
678            .map_or(Duration::default(), |(_, received)| now - received)
679    }
680
681    /// Handle receipt of a new packet
682    ///
683    /// Returns true if the max ack delay timer should be armed
684    pub(super) fn packet_received(
685        &mut self,
686        now: Instant,
687        packet_number: u64,
688        ack_eliciting: bool,
689        dedup: &Dedup,
690    ) -> bool {
691        if !ack_eliciting {
692            self.non_ack_eliciting_since_last_ack_sent += 1;
693            return false;
694        }
695
696        let prev_largest_ack_eliciting = self.largest_ack_eliciting_packet.unwrap_or(0);
697
698        // Track largest ack-eliciting packet
699        self.largest_ack_eliciting_packet = self
700            .largest_ack_eliciting_packet
701            .map(|pn| pn.max(packet_number))
702            .or(Some(packet_number));
703
704        // Handle ack_eliciting_threshold
705        self.ack_eliciting_since_last_ack_sent += 1;
706        self.immediate_ack_required |=
707            self.ack_eliciting_since_last_ack_sent > self.ack_eliciting_threshold;
708
709        // Handle out-of-order packets
710        self.immediate_ack_required |=
711            self.is_out_of_order(packet_number, prev_largest_ack_eliciting, dedup);
712
713        // Arm max_ack_delay timer if necessary
714        if self.earliest_ack_eliciting_since_last_ack_sent.is_none() && !self.can_send() {
715            self.earliest_ack_eliciting_since_last_ack_sent = Some(now);
716            return true;
717        }
718
719        false
720    }
721
722    fn is_out_of_order(
723        &self,
724        packet_number: u64,
725        prev_largest_ack_eliciting: u64,
726        dedup: &Dedup,
727    ) -> bool {
728        match self.reordering_threshold {
729            0 => false,
730            1 => {
731                // From https://www.rfc-editor.org/rfc/rfc9000#section-13.2.1-7
732                packet_number < prev_largest_ack_eliciting
733                    || dedup.missing_in_interval(prev_largest_ack_eliciting, packet_number)
734            }
735            _ => {
736                // From acknowledgement frequency draft, section 6.1: send an ACK immediately if
737                // doing so would cause the sender to detect a new packet loss
738                let Some((largest_acked, largest_unacked)) =
739                    self.largest_acked.zip(self.largest_ack_eliciting_packet)
740                else {
741                    return false;
742                };
743                if self.reordering_threshold > largest_acked {
744                    return false;
745                }
746                // The largest packet number that could be declared lost without a new ACK being
747                // sent
748                let largest_reported = largest_acked - self.reordering_threshold + 1;
749                let Some(smallest_missing_unreported) =
750                    dedup.smallest_missing_in_interval(largest_reported, largest_unacked)
751                else {
752                    return false;
753                };
754                largest_unacked - smallest_missing_unreported >= self.reordering_threshold
755            }
756        }
757    }
758
759    /// Should be called whenever ACKs have been sent
760    ///
761    /// This will suppress sending further ACKs until additional ACK eliciting frames arrive
762    pub(super) fn acks_sent(&mut self) {
763        // It is possible (though unlikely) that the ACKs we just sent do not cover all the
764        // ACK-eliciting packets we have received (e.g. if there is not enough room in the packet to
765        // fit all the ranges). To keep things simple, however, we assume they do. If there are
766        // indeed some ACKs that weren't covered, the packets might be ACKed later anyway, because
767        // they are still contained in `self.ranges`. If we somehow fail to send the ACKs at a later
768        // moment, the peer will assume the packets got lost and will retransmit their frames in a
769        // new packet, which is suboptimal, because we already received them. Our assumption here is
770        // that simplicity results in code that is more performant, even in the presence of
771        // occasional redundant retransmits.
772        self.immediate_ack_required = false;
773        self.ack_eliciting_since_last_ack_sent = 0;
774        self.non_ack_eliciting_since_last_ack_sent = 0;
775        self.earliest_ack_eliciting_since_last_ack_sent = None;
776        self.largest_acked = self.largest_ack_eliciting_packet;
777    }
778
779    /// Insert one packet that needs to be acknowledged
780    pub(super) fn insert_one(&mut self, packet: u64, now: Instant) {
781        self.ranges.insert_one(packet);
782
783        if self.largest_packet.is_none_or(|(pn, _)| packet > pn) {
784            self.largest_packet = Some((packet, now));
785        }
786
787        if self.ranges.len() > MAX_ACK_BLOCKS {
788            self.ranges.pop_min();
789        }
790    }
791
792    /// Remove ACKs of packets numbered at or below `max` from the set of pending ACKs
793    pub(super) fn subtract_below(&mut self, max: u64) {
794        self.ranges.remove(0..(max + 1));
795    }
796
797    /// Returns the set of currently pending ACK ranges
798    pub(super) fn ranges(&self) -> &ArrayRangeSet {
799        &self.ranges
800    }
801
802    /// Queue an ACK if a significant number of non-ACK-eliciting packets have not yet been
803    /// acknowledged
804    ///
805    /// Should be called immediately before a non-probing packet is composed, when we've already
806    /// committed to sending a packet regardless.
807    pub(super) fn maybe_ack_non_eliciting(&mut self) {
808        // If we're going to send a packet anyway, and we've received a significant number of
809        // non-ACK-eliciting packets, then include an ACK to help the peer perform timely loss
810        // detection even if they're not sending any ACK-eliciting packets themselves. Exact
811        // threshold chosen somewhat arbitrarily.
812        const LAZY_ACK_THRESHOLD: u64 = 10;
813        if self.non_ack_eliciting_since_last_ack_sent > LAZY_ACK_THRESHOLD {
814            self.immediate_ack_required = true;
815        }
816    }
817}
818
819/// Helper for mitigating [optimistic ACK attacks]
820///
821/// A malicious peer could prompt the local application to begin a large data transfer, and then
822/// send ACKs without first waiting for data to be received. This could defeat congestion control,
823/// allowing the connection to consume disproportionate resources. We therefore occasionally skip
824/// packet numbers, and classify any ACK referencing a skipped packet number as a transport error.
825///
826/// Skipped packet numbers occur only in the application data space (where costly transfers might
827/// take place) and are distributed exponentially to reflect the reduced likelihood and impact of
828/// bad behavior from a peer that has been well-behaved for an extended period.
829///
830/// ACKs for packet numbers that have not yet been allocated are also a transport error, but an
831/// attacker with knowledge of the congestion control algorithm in use could time falsified ACKs to
832/// arrive after the packets they reference are sent.
833///
834/// [optimistic ACK attacks]: https://www.rfc-editor.org/rfc/rfc9000.html#name-optimistic-ack-attack
835pub(super) struct PacketNumberFilter {
836    /// Next outgoing packet number to skip
837    next_skipped_packet_number: u64,
838    /// Most recently skipped packet number
839    prev_skipped_packet_number: Option<u64>,
840    /// Next packet number to skip is randomly selected from 2^n..2^n+1
841    exponent: u32,
842}
843
844impl PacketNumberFilter {
845    pub(super) fn new(rng: &mut (impl Rng + ?Sized)) -> Self {
846        // First skipped PN is in 0..64
847        let exponent = 6;
848        Self {
849            next_skipped_packet_number: rng.gen_range(0..2u64.saturating_pow(exponent)),
850            prev_skipped_packet_number: None,
851            exponent,
852        }
853    }
854
855    #[cfg(test)]
856    pub(super) fn disabled() -> Self {
857        Self {
858            next_skipped_packet_number: u64::MAX,
859            prev_skipped_packet_number: None,
860            exponent: u32::MAX,
861        }
862    }
863
864    pub(super) fn peek(&self, space: &PacketSpace) -> u64 {
865        let n = space.next_packet_number;
866        if n != self.next_skipped_packet_number {
867            return n;
868        }
869        n + 1
870    }
871
872    pub(super) fn allocate(
873        &mut self,
874        rng: &mut (impl Rng + ?Sized),
875        space: &mut PacketSpace,
876    ) -> u64 {
877        let n = space.get_tx_number();
878        if n != self.next_skipped_packet_number {
879            return n;
880        }
881
882        trace!("skipping pn {n}");
883        // Skip this packet number, and choose the next one to skip
884        self.prev_skipped_packet_number = Some(self.next_skipped_packet_number);
885        let next_exponent = self.exponent.saturating_add(1);
886        self.next_skipped_packet_number =
887            rng.gen_range(2u64.saturating_pow(self.exponent)..2u64.saturating_pow(next_exponent));
888        self.exponent = next_exponent;
889
890        space.get_tx_number()
891    }
892
893    pub(super) fn check_ack(
894        &self,
895        space_id: SpaceId,
896        range: std::ops::RangeInclusive<u64>,
897    ) -> Result<(), TransportError> {
898        if space_id == SpaceId::Data
899            && self
900                .prev_skipped_packet_number
901                .is_some_and(|x| range.contains(&x))
902        {
903            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
904        }
905        Ok(())
906    }
907}
908
/// Cap on the number of pending ACK ranges, chosen so that all our ACKs always fit in a single
/// minimum-MTU packet with room to spare
const MAX_ACK_BLOCKS: usize = 64;
911
#[cfg(test)]
mod test {
    use super::*;

    // Steps a fresh `Dedup` through in-order, repeated and out-of-order inserts, checking the
    // value returned by `insert` and the raw `next`/`window` state along the way.
    #[test]
    fn sanity() {
        let mut seen = Dedup::new();

        assert!(!seen.insert(0));
        assert_eq!((seen.next, seen.window), (1, 0b1));

        // Inserting the same packet number again is reported and leaves the state untouched
        assert!(seen.insert(0));
        assert_eq!((seen.next, seen.window), (1, 0b1));

        assert!(!seen.insert(1));
        assert_eq!((seen.next, seen.window), (2, 0b11));

        assert!(!seen.insert(2));
        assert_eq!((seen.next, seen.window), (3, 0b111));

        // Leave gaps at 3, and then at 5 and 6
        assert!(!seen.insert(4));
        assert_eq!((seen.next, seen.window), (5, 0b11110));

        assert!(!seen.insert(7));
        assert_eq!((seen.next, seen.window), (8, 0b1111_0100));

        // 4 has been seen already; the gaps get filled in one at a time
        assert!(seen.insert(4));

        assert!(!seen.insert(3));
        assert_eq!((seen.next, seen.window), (8, 0b1111_1100));

        assert!(!seen.insert(6));
        assert_eq!((seen.next, seen.window), (8, 0b1111_1101));

        assert!(!seen.insert(5));
        assert_eq!((seen.next, seen.window), (8, 0b1111_1111));
    }

    // Each new packet number is accepted once; it and every earlier one are then reported as
    // already seen.
    #[test]
    fn happypath() {
        let mut seen = Dedup::new();
        for fresh in 0..(2 * WINDOW_SIZE) {
            assert!(!seen.insert(fresh));
            assert!((0..=fresh).all(|earlier| seen.insert(earlier)));
        }
    }

    // A large forward jump followed by inserts of much lower packet numbers.
    #[test]
    fn jump() {
        let far = 2 * WINDOW_SIZE;
        let mut seen = Dedup::new();

        seen.insert(far);
        assert!(seen.insert(WINDOW_SIZE));
        assert_eq!(seen.next, far + 1);
        assert_eq!(seen.window, 0);

        assert!(!seen.insert(WINDOW_SIZE + 1));
        assert_eq!(seen.next, far + 1);
        assert_eq!(seen.window, 1 << (WINDOW_SIZE - 2));
    }

    // `missing_in_interval` reports gaps between two packet numbers.
    #[test]
    fn dedup_has_missing() {
        let mut seen = Dedup::new();

        seen.insert(0);
        assert!(!seen.missing_in_interval(0, 0));

        seen.insert(1);
        assert!(!seen.missing_in_interval(0, 1));

        // 2 is skipped
        seen.insert(3);
        assert!(seen.missing_in_interval(1, 3));

        seen.insert(4);
        assert!(!seen.missing_in_interval(3, 4));
        assert!(seen.missing_in_interval(0, 4));

        // Filling in 2 closes the gap
        seen.insert(2);
        assert!(!seen.missing_in_interval(0, 4));
    }

    // Interval bounds that have fallen out of the window do not count as missing.
    #[test]
    fn dedup_outside_of_window_has_missing() {
        let mut seen = Dedup::new();

        (0..140).for_each(|pn| {
            seen.insert(pn);
        });

        // 0 and 4 are outside of the window
        assert!(!seen.missing_in_interval(0, 4));
        seen.insert(160);
        assert!(!seen.missing_in_interval(0, 4));
        assert!(!seen.missing_in_interval(0, 140));
        assert!(seen.missing_in_interval(0, 160));
    }

    // `smallest_missing_in_interval` returns the lowest gap found, or `None`.
    #[test]
    fn dedup_smallest_missing() {
        let mut seen = Dedup::new();

        seen.insert(0);
        assert_eq!(seen.smallest_missing_in_interval(0, 0), None);

        seen.insert(1);
        assert_eq!(seen.smallest_missing_in_interval(0, 1), None);

        for pn in [5, 7] {
            seen.insert(pn);
        }
        assert_eq!(seen.smallest_missing_in_interval(0, 7), Some(2));
        assert_eq!(seen.smallest_missing_in_interval(5, 7), Some(6));

        seen.insert(2);
        assert_eq!(seen.smallest_missing_in_interval(1, 7), Some(3));

        for pn in [170, 172, 300] {
            seen.insert(pn);
        }
        assert_eq!(seen.smallest_missing_in_interval(170, 172), None);

        seen.insert(500);
        assert_eq!(seen.smallest_missing_in_interval(0, 500), Some(372));
        assert_eq!(seen.smallest_missing_in_interval(0, 373), Some(372));
        assert_eq!(seen.smallest_missing_in_interval(0, 372), None);
    }

    #[test]
    fn pending_acks_first_packet_is_not_considered_reordered() {
        let mut seen = Dedup::new();
        let mut pending = PendingAcks::new();

        seen.insert(0);
        let arrival = Instant::now();
        pending.packet_received(arrival, 0, true, &seen);

        assert!(!pending.immediate_ack_required);
    }

    #[test]
    fn pending_acks_after_immediate_ack_set() {
        let mut seen = Dedup::new();
        let mut pending = PendingAcks::new();
        let arrival = Instant::now();

        // An ACK-eliciting packet arrives
        seen.insert(0);
        pending.insert_one(0, arrival);
        pending.packet_received(arrival, 0, true, &seen);

        // Sanity check: something is pending, but nothing may be sent yet
        assert!(!pending.ranges.is_empty());
        assert!(!pending.can_send());

        // Once an immediate ACK is required, the ACK may be sent
        pending.set_immediate_ack_required();
        assert!(pending.can_send());
    }

    #[test]
    fn pending_acks_ack_delay() {
        let ms = Duration::from_millis;
        let mut seen = Dedup::new();
        let mut pending = PendingAcks::new();

        let t1 = Instant::now();
        let t2 = t1 + ms(2);
        let t3 = t2 + ms(5);

        // Nothing received yet: the delay is zero at every instant
        for at in [t1, t2, t3] {
            assert_eq!(pending.ack_delay(at), ms(0));
        }

        // In-order packet
        seen.insert(0);
        pending.insert_one(0, t1);
        pending.packet_received(t1, 0, true, &seen);
        for (at, expected) in [(t1, 0), (t2, 2), (t3, 7)] {
            assert_eq!(pending.ack_delay(at), ms(expected));
        }

        // Out of order (higher than expected)
        seen.insert(3);
        pending.insert_one(3, t2);
        pending.packet_received(t2, 3, true, &seen);
        for (at, expected) in [(t2, 0), (t3, 5)] {
            assert_eq!(pending.ack_delay(at), ms(expected));
        }

        // Out of order (lower than expected, so previous instant is kept)
        seen.insert(2);
        pending.insert_one(2, t3);
        pending.packet_received(t3, 2, true, &seen);
        assert_eq!(pending.ack_delay(t3), ms(5));
    }

    #[test]
    fn sent_packet_size() {
        // The tracking state of sent packets should be minimal, and not grow over time.
        let size = std::mem::size_of::<SentPacket>();
        assert!(size <= 128);
    }
}