// ant_quic/connection/spaces.rs

use std::{
    cmp,
    collections::{BTreeMap, VecDeque},
    mem,
    ops::{Bound, Index, IndexMut},
};

use rand::Rng;
use rustc_hash::FxHashSet;
use tracing::trace;

use super::assembler::Assembler;
use crate::{
    Dir, Duration, Instant, SocketAddr, StreamId, TransportError, VarInt, connection::StreamsState,
    crypto::Keys, frame, packet::SpaceId, range_set::ArrayRangeSet, shared::IssuedCid,
};

pub(super) struct PacketSpace {
    pub(super) crypto: Option<Keys>,
    pub(super) dedup: Dedup,
    /// Highest received packet number
    pub(super) rx_packet: u64,

    /// Data to send
    pub(super) pending: Retransmits,
    /// Packet numbers to acknowledge
    pub(super) pending_acks: PendingAcks,

    /// The packet number of the next packet that will be sent, if any. In the Data space, the
    /// packet number stored here is sometimes skipped by [`PacketNumberFilter`] logic.
    pub(super) next_packet_number: u64,
    /// The largest packet number the remote peer acknowledged in an ACK frame.
    pub(super) largest_acked_packet: Option<u64>,
    pub(super) largest_acked_packet_sent: Instant,
    /// The highest-numbered ACK-eliciting packet we've sent
    pub(super) largest_ack_eliciting_sent: u64,
    /// Number of packets in `sent_packets` with numbers above `largest_ack_eliciting_sent`
    pub(super) unacked_non_ack_eliciting_tail: u64,
    /// Transmitted but not acked
    // We use a BTreeMap here so we can efficiently query by range on ACK and for loss detection
    pub(super) sent_packets: BTreeMap<u64, SentPacket>,
    /// Number of explicit congestion notification codepoints seen on incoming packets
    pub(super) ecn_counters: frame::EcnCounts,
    /// Recent ECN counters sent by the peer in ACK frames
    ///
    /// Updated (and inspected) whenever we receive an ACK with a new highest acked packet
    /// number. Stored per-space to simplify verification, which would otherwise have difficulty
    /// distinguishing between ECN bleaching and counts having been updated by a near-simultaneous
    /// ACK already processed in another space.
    pub(super) ecn_feedback: frame::EcnCounts,

    /// Incoming cryptographic handshake stream
    pub(super) crypto_stream: Assembler,
    /// Current offset of outgoing cryptographic handshake stream
    pub(super) crypto_offset: u64,

    /// The time the most recently sent retransmittable packet was sent.
    pub(super) time_of_last_ack_eliciting_packet: Option<Instant>,
    /// The time at which the earliest sent packet in this space will be considered lost based on
    /// exceeding the reordering window in time. Only set for packets numbered prior to a packet
    /// that has been acknowledged.
    pub(super) loss_time: Option<Instant>,
    /// Number of tail loss probes to send
    pub(super) loss_probes: u32,
    pub(super) ping_pending: bool,
    pub(super) immediate_ack_pending: bool,
    /// Number of congestion control "in flight" bytes
    pub(super) in_flight: u64,
    /// Number of packets sent in the current key phase
    pub(super) sent_with_keys: u64,
}

impl PacketSpace {
    pub(super) fn new(now: Instant) -> Self {
        Self {
            crypto: None,
            dedup: Dedup::new(),
            rx_packet: 0,

            pending: Retransmits::default(),
            pending_acks: PendingAcks::new(),

            next_packet_number: 0,
            largest_acked_packet: None,
            largest_acked_packet_sent: now,
            largest_ack_eliciting_sent: 0,
            unacked_non_ack_eliciting_tail: 0,
            sent_packets: BTreeMap::new(),
            ecn_counters: frame::EcnCounts::ZERO,
            ecn_feedback: frame::EcnCounts::ZERO,

            crypto_stream: Assembler::new(),
            crypto_offset: 0,

            time_of_last_ack_eliciting_packet: None,
            loss_time: None,
            loss_probes: 0,
            ping_pending: false,
            immediate_ack_pending: false,
            in_flight: 0,
            sent_with_keys: 0,
        }
    }

    /// Queue data for a tail loss probe (or anti-amplification deadlock prevention) packet
    ///
    /// Probes are sent similarly to normal packets when an expected ACK has not arrived. We never
    /// deem a packet lost until we receive an ACK that should have included it, but if a trailing
    /// run of packets (or their ACKs) are lost, this might not happen in a timely fashion. We send
    /// probe packets to force an ACK, and exempt them from congestion control to prevent a deadlock
    /// when the congestion window is filled with lost tail packets.
    ///
    /// We prefer to send new data, to make the most efficient use of bandwidth. If there's no data
    /// waiting to be sent, then we retransmit in-flight data to reduce odds of loss. If there's no
    /// in-flight data either, we're probably a client guarding against a handshake
    /// anti-amplification deadlock and we just make something up.
    pub(super) fn maybe_queue_probe(
        &mut self,
        request_immediate_ack: bool,
        streams: &StreamsState,
    ) {
        if self.loss_probes == 0 {
            return;
        }

        if request_immediate_ack {
            // The probe should be ACKed without delay (should only be used in the Data space and
            // when the peer supports the acknowledgement frequency extension)
            self.immediate_ack_pending = true;
        }

        if !self.pending.is_empty(streams) {
            // There's real data to send here, no need to make something up
            return;
        }

        // Retransmit the data of the oldest in-flight packet
        for packet in self.sent_packets.values_mut() {
            if !packet.retransmits.is_empty(streams) {
                // Remove retransmitted data from the old packet so we don't end up retransmitting
                // it *again* even if the copy we're sending now gets acknowledged.
                self.pending |= mem::take(&mut packet.retransmits);
                return;
            }
        }

        // Nothing new to send and nothing to retransmit, so fall back on a ping. This should only
        // happen in rare cases during the handshake when the server becomes blocked by
        // anti-amplification.
        if !self.immediate_ack_pending {
            self.ping_pending = true;
        }
    }

    /// Get the next outgoing packet number in this space
    ///
    /// In the Data space, the connection's [`PacketNumberFilter`] must be used rather than calling
    /// this directly.
    pub(super) fn get_tx_number(&mut self) -> u64 {
        // TODO: Handle packet number overflow gracefully
        assert!(self.next_packet_number < 2u64.pow(62));
        let x = self.next_packet_number;
        self.next_packet_number += 1;
        self.sent_with_keys += 1;
        x
    }

    pub(super) fn can_send(&self, streams: &StreamsState) -> SendableFrames {
        let acks = self.pending_acks.can_send();
        let other =
            !self.pending.is_empty(streams) || self.ping_pending || self.immediate_ack_pending;

        SendableFrames { acks, other }
    }

    /// Verifies sanity of an ECN block and returns whether congestion was encountered.
    pub(super) fn detect_ecn(
        &mut self,
        newly_acked: u64,
        ecn: frame::EcnCounts,
    ) -> Result<bool, &'static str> {
        let ect0_increase = ecn
            .ect0
            .checked_sub(self.ecn_feedback.ect0)
            .ok_or("peer ECT(0) count regression")?;
        let ect1_increase = ecn
            .ect1
            .checked_sub(self.ecn_feedback.ect1)
            .ok_or("peer ECT(1) count regression")?;
        let ce_increase = ecn
            .ce
            .checked_sub(self.ecn_feedback.ce)
            .ok_or("peer CE count regression")?;
        let total_increase = ect0_increase + ect1_increase + ce_increase;
        if total_increase < newly_acked {
            return Err("ECN bleaching");
        }
        if (ect0_increase + ce_increase) < newly_acked || ect1_increase != 0 {
            return Err("ECN corruption");
        }
        // If total_increase > newly_acked (which happens when ACKs are lost), this is required by
        // the draft so that long-term drift does not occur. If =, then the only question is whether
        // to count CE packets as CE or ECT0. Recording them as CE is more consistent and keeps the
        // congestion check obvious.
        self.ecn_feedback = ecn;
        Ok(ce_increase != 0)
    }

    /// Stop tracking sent packet `number`, and return what we knew about it
    pub(super) fn take(&mut self, number: u64) -> Option<SentPacket> {
        let packet = self.sent_packets.remove(&number)?;
        self.in_flight -= u64::from(packet.size);
        if !packet.ack_eliciting && number > self.largest_ack_eliciting_sent {
            self.unacked_non_ack_eliciting_tail =
                self.unacked_non_ack_eliciting_tail.checked_sub(1).unwrap();
        }
        Some(packet)
    }

    /// Returns the number of bytes to *remove* from the connection's in-flight count
    pub(super) fn sent(&mut self, number: u64, packet: SentPacket) -> u64 {
        // Retain state for at most this many non-ACK-eliciting packets sent after the most recently
        // sent ACK-eliciting packet. We're never guaranteed to receive an ACK for those, and we
        // can't judge them as lost without an ACK, so to limit memory in applications which receive
        // packets but don't send ACK-eliciting data for long periods, we must eventually start
        // forgetting about them, although it might also be reasonable to just kill the connection
        // due to weird peer behavior.
        const MAX_UNACKED_NON_ACK_ELICITING_TAIL: u64 = 1_000;

        let mut forgotten_bytes = 0;
        if packet.ack_eliciting {
            self.unacked_non_ack_eliciting_tail = 0;
            self.largest_ack_eliciting_sent = number;
        } else if self.unacked_non_ack_eliciting_tail > MAX_UNACKED_NON_ACK_ELICITING_TAIL {
            let oldest_after_ack_eliciting = *self
                .sent_packets
                .range((
                    Bound::Excluded(self.largest_ack_eliciting_sent),
                    Bound::Unbounded,
                ))
                .next()
                .unwrap()
                .0;
            // Per https://www.rfc-editor.org/rfc/rfc9000.html#name-frames-and-frame-types,
            // non-ACK-eliciting packets must only contain PADDING, ACK, and CONNECTION_CLOSE
            // frames, which require no special handling on ACK or loss beyond removal from
            // in-flight counters if padded.
            let packet = self
                .sent_packets
                .remove(&oldest_after_ack_eliciting)
                .unwrap();
            forgotten_bytes = u64::from(packet.size);
            self.in_flight -= forgotten_bytes;
        } else {
            self.unacked_non_ack_eliciting_tail += 1;
        }

        self.in_flight += u64::from(packet.size);
        self.sent_packets.insert(number, packet);
        forgotten_bytes
    }
}

impl Index<SpaceId> for [PacketSpace; 3] {
    type Output = PacketSpace;
    fn index(&self, space: SpaceId) -> &PacketSpace {
        &self.as_ref()[space as usize]
    }
}

impl IndexMut<SpaceId> for [PacketSpace; 3] {
    fn index_mut(&mut self, space: SpaceId) -> &mut PacketSpace {
        &mut self.as_mut()[space as usize]
    }
}

/// Bookkeeping for a single sent packet, retained until it is acknowledged, declared lost, or
/// forgotten
#[derive(Debug, Clone)]
pub(super) struct SentPacket {
    /// The time the packet was sent.
    pub(super) time_sent: Instant,
    /// The number of bytes sent in the packet, not including UDP or IP overhead, but including QUIC
    /// framing overhead. Zero if this packet is not counted towards congestion control, i.e. not an
    /// "in flight" packet.
    pub(super) size: u16,
    /// Whether an acknowledgement is expected directly in response to this packet.
    pub(super) ack_eliciting: bool,
    /// The largest packet number acknowledged by this packet
    pub(super) largest_acked: Option<u64>,
    /// Data which needs to be retransmitted in case the packet is lost.
    /// The data is boxed to minimize `SentPacket` size for the typical case of
    /// packets only containing ACKs and STREAM frames.
    pub(super) retransmits: ThinRetransmits,
    /// Metadata for stream frames in a packet
    ///
    /// The actual application data is stored with the stream state.
    pub(super) stream_frames: frame::StreamMetaVec,
}

/// Retransmittable data queue
#[derive(Debug, Default, Clone)]
pub struct Retransmits {
    pub(super) max_data: bool,
    pub(super) max_stream_id: [bool; 2],
    pub(super) reset_stream: Vec<(StreamId, VarInt)>,
    pub(super) stop_sending: Vec<frame::StopSending>,
    pub(super) max_stream_data: FxHashSet<StreamId>,
    pub(super) crypto: VecDeque<frame::Crypto>,
    pub(super) new_cids: Vec<IssuedCid>,
    pub(super) retire_cids: Vec<u64>,
    pub(super) ack_frequency: bool,
    pub(super) handshake_done: bool,
    /// For each enqueued NEW_TOKEN frame, a copy of the path's remote address
    ///
    /// There are 2 reasons this is unusual:
    ///
    /// - If the path changes, NEW_TOKEN frames bound for the old path are not retransmitted on the
    ///   new path. That is why this field stores the remote address: so that ones for old paths
    ///   can be filtered out.
    /// - If a token is lost, a new randomly generated token is retransmitted, rather than the
    ///   original. This is so that if both transmissions are received, the client won't risk
    ///   sending the same token twice. That is why this field does _not_ store any actual token.
    ///
    /// At any given time, a QUIC endpoint only effectively wants NEW_TOKEN frames enqueued for its
    /// current path. Based on that, we could conceivably change this from a vector to an
    /// `Option<(SocketAddr, usize)>` or just a `usize` or something. However, due to the
    /// architecture of Quinn, it is considerably simpler to not do that; consider what such a
    /// change would mean for implementing `BitOrAssign` on Self.
    pub(super) new_tokens: Vec<SocketAddr>,
    /// NAT traversal AddAddress frames to be sent
    pub(super) add_addresses: Vec<frame::AddAddress>,
    /// NAT traversal PunchMeNow frames to be sent
    pub(super) punch_me_now: Vec<frame::PunchMeNow>,
    /// NAT traversal RemoveAddress frames to be sent
    pub(super) remove_addresses: Vec<frame::RemoveAddress>,
    /// OBSERVED_ADDRESS frames to be sent
    pub(super) observed_addresses: Vec<frame::ObservedAddress>,
}

impl Retransmits {
    pub(super) fn is_empty(&self, streams: &StreamsState) -> bool {
        !self.max_data
            && !self.max_stream_id.into_iter().any(|x| x)
            && self.reset_stream.is_empty()
            && self.stop_sending.is_empty()
            && self
                .max_stream_data
                .iter()
                .all(|&id| !streams.can_send_flow_control(id))
            && self.crypto.is_empty()
            && self.new_cids.is_empty()
            && self.retire_cids.is_empty()
            && !self.ack_frequency
            && !self.handshake_done
            && self.new_tokens.is_empty()
            && self.add_addresses.is_empty()
            && self.punch_me_now.is_empty()
            && self.remove_addresses.is_empty()
            && self.observed_addresses.is_empty()
    }
}

impl ::std::ops::BitOrAssign for Retransmits {
    fn bitor_assign(&mut self, rhs: Self) {
        // We reduce in-stream head-of-line blocking by queueing retransmits before other data for
        // STREAM and CRYPTO frames.
        self.max_data |= rhs.max_data;
        for dir in Dir::iter() {
            self.max_stream_id[dir as usize] |= rhs.max_stream_id[dir as usize];
        }
        self.reset_stream.extend_from_slice(&rhs.reset_stream);
        self.stop_sending.extend_from_slice(&rhs.stop_sending);
        self.max_stream_data.extend(&rhs.max_stream_data);
        for crypto in rhs.crypto.into_iter().rev() {
            self.crypto.push_front(crypto);
        }
        self.new_cids.extend(&rhs.new_cids);
        self.retire_cids.extend(rhs.retire_cids);
        self.ack_frequency |= rhs.ack_frequency;
        self.handshake_done |= rhs.handshake_done;
        self.new_tokens.extend_from_slice(&rhs.new_tokens);
        self.add_addresses.extend_from_slice(&rhs.add_addresses);
        self.punch_me_now.extend_from_slice(&rhs.punch_me_now);
        self.remove_addresses
            .extend_from_slice(&rhs.remove_addresses);
        self.observed_addresses
            .extend_from_slice(&rhs.observed_addresses);
    }
}

impl ::std::ops::BitOrAssign<ThinRetransmits> for Retransmits {
    fn bitor_assign(&mut self, rhs: ThinRetransmits) {
        if let Some(retransmits) = rhs.retransmits {
            self.bitor_assign(*retransmits)
        }
    }
}

impl ::std::iter::FromIterator<Self> for Retransmits {
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = Self>,
    {
        let mut result = Self::default();
        for packet in iter {
            result |= packet;
        }
        result
    }
}

/// A variant of `Retransmits` which only allocates storage when required
#[derive(Debug, Default, Clone)]
pub(super) struct ThinRetransmits {
    retransmits: Option<Box<Retransmits>>,
}

impl ThinRetransmits {
    /// Returns `true` if no retransmits are necessary
    pub(super) fn is_empty(&self, streams: &StreamsState) -> bool {
        match &self.retransmits {
            Some(retransmits) => retransmits.is_empty(streams),
            None => true,
        }
    }

    /// Returns a reference to the retransmits stored in this box
    pub(super) fn get(&self) -> Option<&Retransmits> {
        self.retransmits.as_deref()
    }

    /// Returns a mutable reference to the stored retransmits
    ///
    /// This function will allocate backing storage if required.
    pub(super) fn get_or_create(&mut self) -> &mut Retransmits {
        if self.retransmits.is_none() {
            self.retransmits = Some(Box::default());
        }
        self.retransmits.as_deref_mut().unwrap()
    }
}

/// RFC4303-style sliding window packet number deduplicator.
///
/// A contiguous bitfield, where each bit corresponds to a packet number and the rightmost bit is
/// always set. A set bit represents a packet that has been successfully authenticated. Bits left of
/// the window are assumed to be set.
///
/// ```text
/// ...xxxxxxxxx 1 0
///     ^        ^ ^
/// window highest next
/// ```
pub(super) struct Dedup {
    window: Window,
    /// Lowest packet number higher than all yet authenticated.
    next: u64,
}

/// Inner bitfield type.
///
/// Because QUIC never reuses packet numbers, this only needs to be large enough to deal with
/// packets that are reordered but still delivered in a timely manner.
type Window = u128;

/// Number of packets tracked by `Dedup`.
const WINDOW_SIZE: u64 = 1 + mem::size_of::<Window>() as u64 * 8;

impl Dedup {
    /// Construct an empty window positioned at the start.
    pub(super) fn new() -> Self {
        Self { window: 0, next: 0 }
    }

    /// Highest packet number authenticated.
    fn highest(&self) -> u64 {
        self.next - 1
    }

    /// Record a newly authenticated packet number.
    ///
    /// Returns whether the packet might be a duplicate.
    pub(super) fn insert(&mut self, packet: u64) -> bool {
        if let Some(diff) = packet.checked_sub(self.next) {
            // Right of window
            self.window = ((self.window << 1) | 1)
                .checked_shl(cmp::min(diff, u64::from(u32::MAX)) as u32)
                .unwrap_or(0);
            self.next = packet + 1;
            false
        } else if self.highest() - packet < WINDOW_SIZE {
            // Within window
            if let Some(bit) = (self.highest() - packet).checked_sub(1) {
                // < highest
                let mask = 1 << bit;
                let duplicate = self.window & mask != 0;
                self.window |= mask;
                duplicate
            } else {
                // == highest
                true
            }
        } else {
            // Left of window
            true
        }
    }

    /// Returns the packet number of the smallest packet missing within the provided interval
    ///
    /// If there are no missing packets, returns `None`
    fn smallest_missing_in_interval(&self, lower_bound: u64, upper_bound: u64) -> Option<u64> {
        debug_assert!(lower_bound <= upper_bound);
        debug_assert!(upper_bound <= self.highest());
        const BITFIELD_SIZE: u64 = (mem::size_of::<Window>() * 8) as u64;

        // Since we already know the packets at the boundaries have been received, we only need to
        // check those in between them (this removes the necessity of extra logic to deal with the
        // highest packet, which is stored outside the bitfield)
        let lower_bound = lower_bound + 1;
        let upper_bound = upper_bound.saturating_sub(1);

        // Note: the offsets are counted from the right
        // The highest packet is not included in the bitfield, so we subtract 1 to account for that
        let start_offset = (self.highest() - upper_bound).max(1) - 1;
        if start_offset >= BITFIELD_SIZE {
            // The start offset is outside of the window. All packets outside of the window are
            // considered to be received.
            return None;
        }

        let end_offset_exclusive = self.highest().saturating_sub(lower_bound);

        // The range is clamped at the edge of the window, because any earlier packets are
        // considered to be received
        let range_len = end_offset_exclusive
            .saturating_sub(start_offset)
            .min(BITFIELD_SIZE);
        if range_len == 0 {
            return None;
        }

        // Ensure the shift is within bounds (we already know start_offset < BITFIELD_SIZE,
        // because of the early return)
        let mask = if range_len == BITFIELD_SIZE {
            u128::MAX
        } else {
            ((1u128 << range_len) - 1) << start_offset
        };
        let gaps = !self.window & mask;

        let smallest_missing_offset = 128 - gaps.leading_zeros() as u64;
        let smallest_missing_packet = self.highest() - smallest_missing_offset;

        if smallest_missing_packet <= upper_bound {
            Some(smallest_missing_packet)
        } else {
            None
        }
    }

    /// Returns true if there are any missing packets within the provided interval
    ///
    /// The provided packet numbers must have been received before calling this function
    fn missing_in_interval(&self, lower_bound: u64, upper_bound: u64) -> bool {
        self.smallest_missing_in_interval(lower_bound, upper_bound)
            .is_some()
    }
}

/// Indicates which data is available for sending
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(super) struct SendableFrames {
    pub(super) acks: bool,
    pub(super) other: bool,
}

impl SendableFrames {
    /// Returns that no data is available for sending
    pub(super) fn empty() -> Self {
        Self {
            acks: false,
            other: false,
        }
    }

    /// Whether no data is sendable
    pub(super) fn is_empty(&self) -> bool {
        !self.acks && !self.other
    }
}

#[derive(Debug)]
pub(super) struct PendingAcks {
    /// Whether we should send an ACK immediately, even if that means sending an ACK-only packet
    ///
    /// When `immediate_ack_required` is false, the normal behavior is to send ACK frames only when
    /// there is other data to send, or when the `MaxAckDelay` timer expires.
    immediate_ack_required: bool,
    /// The number of ack-eliciting packets received since the last ACK frame was sent
    ///
    /// Once the count _exceeds_ `ack_eliciting_threshold`, an immediate ACK is required
    ack_eliciting_since_last_ack_sent: u64,
    non_ack_eliciting_since_last_ack_sent: u64,
    ack_eliciting_threshold: u64,
    /// The reordering threshold, controlling how we respond to out-of-order ack-eliciting packets
    ///
    /// Different values enable different behavior:
    ///
    /// * `0`: no special action is taken
    /// * `1`: an ACK is sent immediately if a packet arrives out of order, per RFC 9000
    /// * `>1`: an ACK is sent immediately if a packet arrives out of order, per the ACK frequency
    ///   draft
    reordering_threshold: u64,
    /// The earliest ack-eliciting packet since the last ACK was sent, used to calculate the moment
    /// upon which `max_ack_delay` elapses
    earliest_ack_eliciting_since_last_ack_sent: Option<Instant>,
    /// The packet number ranges of ack-eliciting packets the peer hasn't confirmed receipt of ACKs
    /// for
    ranges: ArrayRangeSet,
    /// The packet with the largest packet number, and the time upon which it was received (used to
    /// calculate ACK delay in [`PendingAcks::ack_delay`])
    largest_packet: Option<(u64, Instant)>,
    /// The ack-eliciting packet we have received with the largest packet number
    largest_ack_eliciting_packet: Option<u64>,
    /// The largest acknowledged packet number sent in an ACK frame
    largest_acked: Option<u64>,
}

impl PendingAcks {
    fn new() -> Self {
        Self {
            immediate_ack_required: false,
            ack_eliciting_since_last_ack_sent: 0,
            non_ack_eliciting_since_last_ack_sent: 0,
            ack_eliciting_threshold: 1,
            reordering_threshold: 1,
            earliest_ack_eliciting_since_last_ack_sent: None,
            ranges: ArrayRangeSet::default(),
            largest_packet: None,
            largest_ack_eliciting_packet: None,
            largest_acked: None,
        }
    }

    pub(super) fn set_ack_frequency_params(&mut self, frame: &frame::AckFrequency) {
        self.ack_eliciting_threshold = frame.ack_eliciting_threshold.into_inner();
        self.reordering_threshold = frame.reordering_threshold.into_inner();
    }

    pub(super) fn set_immediate_ack_required(&mut self) {
        self.immediate_ack_required = true;
    }

    pub(super) fn on_max_ack_delay_timeout(&mut self) {
        self.immediate_ack_required = self.ack_eliciting_since_last_ack_sent > 0;
    }

    pub(super) fn max_ack_delay_timeout(&self, max_ack_delay: Duration) -> Option<Instant> {
        self.earliest_ack_eliciting_since_last_ack_sent
            .map(|earliest_unacked| earliest_unacked + max_ack_delay)
    }

    /// Whether any ACK frames can be sent
    pub(super) fn can_send(&self) -> bool {
        self.immediate_ack_required && !self.ranges.is_empty()
    }

    /// Returns the delay since the packet with the largest packet number was received
    pub(super) fn ack_delay(&self, now: Instant) -> Duration {
        self.largest_packet
            .map_or(Duration::default(), |(_, received)| now - received)
    }

    /// Handle receipt of a new packet
    ///
    /// Returns true if the max ack delay timer should be armed
    pub(super) fn packet_received(
        &mut self,
        now: Instant,
        packet_number: u64,
        ack_eliciting: bool,
        dedup: &Dedup,
    ) -> bool {
        if !ack_eliciting {
            self.non_ack_eliciting_since_last_ack_sent += 1;
            return false;
        }

        let prev_largest_ack_eliciting = self.largest_ack_eliciting_packet.unwrap_or(0);

        // Track largest ack-eliciting packet
        self.largest_ack_eliciting_packet = self
            .largest_ack_eliciting_packet
            .map(|pn| pn.max(packet_number))
            .or(Some(packet_number));

        // Handle ack_eliciting_threshold
        self.ack_eliciting_since_last_ack_sent += 1;
        self.immediate_ack_required |=
            self.ack_eliciting_since_last_ack_sent > self.ack_eliciting_threshold;

        // Handle out-of-order packets
        self.immediate_ack_required |=
            self.is_out_of_order(packet_number, prev_largest_ack_eliciting, dedup);

        // Arm max_ack_delay timer if necessary
        if self.earliest_ack_eliciting_since_last_ack_sent.is_none() && !self.can_send() {
            self.earliest_ack_eliciting_since_last_ack_sent = Some(now);
            return true;
        }

        false
    }

    fn is_out_of_order(
        &self,
        packet_number: u64,
        prev_largest_ack_eliciting: u64,
        dedup: &Dedup,
    ) -> bool {
        match self.reordering_threshold {
            0 => false,
            1 => {
                // From https://www.rfc-editor.org/rfc/rfc9000#section-13.2.1-7
                packet_number < prev_largest_ack_eliciting
                    || dedup.missing_in_interval(prev_largest_ack_eliciting, packet_number)
            }
            _ => {
                // From acknowledgement frequency draft, section 6.1: send an ACK immediately if
                // doing so would cause the sender to detect a new packet loss
                let Some((largest_acked, largest_unacked)) =
                    self.largest_acked.zip(self.largest_ack_eliciting_packet)
                else {
                    return false;
                };
                if self.reordering_threshold > largest_acked {
                    return false;
                }
                // The largest packet number that could be declared lost without a new ACK being
                // sent
                let largest_reported = largest_acked - self.reordering_threshold + 1;
                let Some(smallest_missing_unreported) =
                    dedup.smallest_missing_in_interval(largest_reported, largest_unacked)
                else {
                    return false;
                };
                largest_unacked - smallest_missing_unreported >= self.reordering_threshold
            }
        }
    }

    /// Should be called whenever ACKs have been sent
    ///
    /// This will suppress sending further ACKs until additional ACK eliciting frames arrive
    pub(super) fn acks_sent(&mut self) {
        // It is possible (though unlikely) that the ACKs we just sent do not cover all the
        // ACK-eliciting packets we have received (e.g. if there is not enough room in the packet to
        // fit all the ranges). To keep things simple, however, we assume they do. If there are
        // indeed some ACKs that weren't covered, the packets might be ACKed later anyway, because
        // they are still contained in `self.ranges`. If we somehow fail to send the ACKs at a later
        // moment, the peer will assume the packets got lost and will retransmit their frames in a
        // new packet, which is suboptimal, because we already received them. Our assumption here is
        // that simplicity results in code that is more performant, even in the presence of
        // occasional redundant retransmits.
        self.immediate_ack_required = false;
        self.ack_eliciting_since_last_ack_sent = 0;
        self.non_ack_eliciting_since_last_ack_sent = 0;
        self.earliest_ack_eliciting_since_last_ack_sent = None;
        self.largest_acked = self.largest_ack_eliciting_packet;
    }

    /// Insert one packet that needs to be acknowledged
    pub(super) fn insert_one(&mut self, packet: u64, now: Instant) {
        self.ranges.insert_one(packet);

        if self.largest_packet.is_none_or(|(pn, _)| packet > pn) {
            self.largest_packet = Some((packet, now));
        }

        if self.ranges.len() > MAX_ACK_BLOCKS {
            self.ranges.pop_min();
        }
    }

    /// Remove ACKs of packets numbered at or below `max` from the set of pending ACKs
    pub(super) fn subtract_below(&mut self, max: u64) {
        self.ranges.remove(0..(max + 1));
    }

    /// Returns the set of currently pending ACK ranges
    pub(super) fn ranges(&self) -> &ArrayRangeSet {
        &self.ranges
    }

    /// Queue an ACK if a significant number of non-ACK-eliciting packets have not yet been
    /// acknowledged
    ///
    /// Should be called immediately before a non-probing packet is composed, when we've already
    /// committed to sending a packet regardless.
    pub(super) fn maybe_ack_non_eliciting(&mut self) {
        // If we're going to send a packet anyway, and we've received a significant number of
        // non-ACK-eliciting packets, then include an ACK to help the peer perform timely loss
        // detection even if they're not sending any ACK-eliciting packets themselves. Exact
        // threshold chosen somewhat arbitrarily.
        const LAZY_ACK_THRESHOLD: u64 = 10;
        if self.non_ack_eliciting_since_last_ack_sent > LAZY_ACK_THRESHOLD {
            self.immediate_ack_required = true;
        }
    }
}

/// Helper for mitigating [optimistic ACK attacks]
///
/// A malicious peer could prompt the local application to begin a large data transfer, and then
/// send ACKs without first waiting for data to be received. This could defeat congestion control,
/// allowing the connection to consume disproportionate resources. We therefore occasionally skip
/// packet numbers, and classify any ACK referencing a skipped packet number as a transport error.
///
/// Skipped packet numbers occur only in the application data space (where costly transfers might
/// take place) and are distributed exponentially to reflect the reduced likelihood and impact of
/// bad behavior from a peer that has been well-behaved for an extended period.
///
/// ACKs for packet numbers that have not yet been allocated are also a transport error, but an
/// attacker with knowledge of the congestion control algorithm in use could time falsified ACKs to
/// arrive after the packets they reference are sent.
///
/// [optimistic ACK attacks]: https://www.rfc-editor.org/rfc/rfc9000.html#name-optimistic-ack-attack
pub(super) struct PacketNumberFilter {
    /// Next outgoing packet number to skip
    next_skipped_packet_number: u64,
    /// Most recently skipped packet number
    prev_skipped_packet_number: Option<u64>,
    /// Next packet number to skip is randomly selected from 2^n..2^(n+1)
    exponent: u32,
}

impl PacketNumberFilter {
    pub(super) fn new(rng: &mut (impl Rng + ?Sized)) -> Self {
        // First skipped PN is in 0..64
        let exponent = 6;
        Self {
            next_skipped_packet_number: rng.gen_range(0..2u64.saturating_pow(exponent)),
            prev_skipped_packet_number: None,
            exponent,
        }
    }

    #[cfg(test)]
    pub(super) fn disabled() -> Self {
        Self {
            next_skipped_packet_number: u64::MAX,
            prev_skipped_packet_number: None,
            exponent: u32::MAX,
        }
    }

    pub(super) fn peek(&self, space: &PacketSpace) -> u64 {
        let n = space.next_packet_number;
        if n != self.next_skipped_packet_number {
            return n;
        }
        n + 1
    }

    pub(super) fn allocate(
        &mut self,
        rng: &mut (impl Rng + ?Sized),
        space: &mut PacketSpace,
    ) -> u64 {
        let n = space.get_tx_number();
        if n != self.next_skipped_packet_number {
            return n;
        }

        trace!("skipping pn {n}");
        // Skip this packet number, and choose the next one to skip
        self.prev_skipped_packet_number = Some(self.next_skipped_packet_number);
        let next_exponent = self.exponent.saturating_add(1);
        self.next_skipped_packet_number =
            rng.gen_range(2u64.saturating_pow(self.exponent)..2u64.saturating_pow(next_exponent));
        self.exponent = next_exponent;

        space.get_tx_number()
    }

    pub(super) fn check_ack(
        &self,
        space_id: SpaceId,
        range: std::ops::RangeInclusive<u64>,
    ) -> Result<(), TransportError> {
        if space_id == SpaceId::Data
            && self
                .prev_skipped_packet_number
                .is_some_and(|x| range.contains(&x))
        {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        Ok(())
    }
}

/// Ensures we can always fit all our ACKs in a single minimum-MTU packet with room to spare
const MAX_ACK_BLOCKS: usize = 64;

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn sanity() {
        let mut dedup = Dedup::new();
        assert!(!dedup.insert(0));
        assert_eq!(dedup.next, 1);
        assert_eq!(dedup.window, 0b1);
        assert!(dedup.insert(0));
        assert_eq!(dedup.next, 1);
        assert_eq!(dedup.window, 0b1);
        assert!(!dedup.insert(1));
        assert_eq!(dedup.next, 2);
        assert_eq!(dedup.window, 0b11);
        assert!(!dedup.insert(2));
        assert_eq!(dedup.next, 3);
        assert_eq!(dedup.window, 0b111);
        assert!(!dedup.insert(4));
        assert_eq!(dedup.next, 5);
        assert_eq!(dedup.window, 0b11110);
        assert!(!dedup.insert(7));
        assert_eq!(dedup.next, 8);
        assert_eq!(dedup.window, 0b1111_0100);
        assert!(dedup.insert(4));
        assert!(!dedup.insert(3));
        assert_eq!(dedup.next, 8);
        assert_eq!(dedup.window, 0b1111_1100);
        assert!(!dedup.insert(6));
        assert_eq!(dedup.next, 8);
        assert_eq!(dedup.window, 0b1111_1101);
        assert!(!dedup.insert(5));
        assert_eq!(dedup.next, 8);
        assert_eq!(dedup.window, 0b1111_1111);
    }
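
    // Illustrative sketch (added) of the window edge described in the `Dedup` docs: packet
    // numbers at or beyond `WINDOW_SIZE` below the highest are left of the window and are
    // conservatively reported as possible duplicates.
    #[test]
    fn window_edge() {
        let mut dedup = Dedup::new();
        dedup.insert(WINDOW_SIZE);
        // Packet 0 is exactly WINDOW_SIZE below the highest, i.e. left of the window
        assert!(dedup.insert(0));
        // Packet 1 is the lowest number still inside the window, and hasn't been seen
        assert!(!dedup.insert(1));
    }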

    #[test]
    fn happypath() {
        let mut dedup = Dedup::new();
        for i in 0..(2 * WINDOW_SIZE) {
            assert!(!dedup.insert(i));
            for j in 0..=i {
                assert!(dedup.insert(j));
            }
        }
    }

    #[test]
    fn jump() {
        let mut dedup = Dedup::new();
        dedup.insert(2 * WINDOW_SIZE);
        assert!(dedup.insert(WINDOW_SIZE));
        assert_eq!(dedup.next, 2 * WINDOW_SIZE + 1);
        assert_eq!(dedup.window, 0);
        assert!(!dedup.insert(WINDOW_SIZE + 1));
        assert_eq!(dedup.next, 2 * WINDOW_SIZE + 1);
        assert_eq!(dedup.window, 1 << (WINDOW_SIZE - 2));
    }

    #[test]
    fn dedup_has_missing() {
        let mut dedup = Dedup::new();

        dedup.insert(0);
        assert!(!dedup.missing_in_interval(0, 0));

        dedup.insert(1);
        assert!(!dedup.missing_in_interval(0, 1));

        dedup.insert(3);
        assert!(dedup.missing_in_interval(1, 3));

        dedup.insert(4);
        assert!(!dedup.missing_in_interval(3, 4));
        assert!(dedup.missing_in_interval(0, 4));

        dedup.insert(2);
        assert!(!dedup.missing_in_interval(0, 4));
    }

    #[test]
    fn dedup_outside_of_window_has_missing() {
        let mut dedup = Dedup::new();

        for i in 0..140 {
            dedup.insert(i);
        }

        // 0 and 4 are outside of the window
        assert!(!dedup.missing_in_interval(0, 4));
        dedup.insert(160);
        assert!(!dedup.missing_in_interval(0, 4));
        assert!(!dedup.missing_in_interval(0, 140));
        assert!(dedup.missing_in_interval(0, 160));
    }

    #[test]
    fn dedup_smallest_missing() {
        let mut dedup = Dedup::new();

        dedup.insert(0);
        assert_eq!(dedup.smallest_missing_in_interval(0, 0), None);

        dedup.insert(1);
        assert_eq!(dedup.smallest_missing_in_interval(0, 1), None);

        dedup.insert(5);
        dedup.insert(7);
        assert_eq!(dedup.smallest_missing_in_interval(0, 7), Some(2));
        assert_eq!(dedup.smallest_missing_in_interval(5, 7), Some(6));

        dedup.insert(2);
        assert_eq!(dedup.smallest_missing_in_interval(1, 7), Some(3));

        dedup.insert(170);
        dedup.insert(172);
        dedup.insert(300);
        assert_eq!(dedup.smallest_missing_in_interval(170, 172), None);

        dedup.insert(500);
        assert_eq!(dedup.smallest_missing_in_interval(0, 500), Some(372));
        assert_eq!(dedup.smallest_missing_in_interval(0, 373), Some(372));
        assert_eq!(dedup.smallest_missing_in_interval(0, 372), None);
    }

    #[test]
    fn pending_acks_first_packet_is_not_considered_reordered() {
        let mut acks = PendingAcks::new();
        let mut dedup = Dedup::new();
        dedup.insert(0);
        acks.packet_received(Instant::now(), 0, true, &dedup);
        assert!(!acks.immediate_ack_required);
    }
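
    // Illustrative sketch (added): with the default `reordering_threshold` of 1, an
    // ack-eliciting packet that arrives while an earlier one is still missing requires an
    // immediate ACK, per RFC 9000 §13.2.1. `acks_sent` is called first so that the
    // ack-eliciting-count threshold doesn't trigger on its own.
    #[test]
    fn pending_acks_reordered_packet_triggers_immediate_ack() {
        let mut acks = PendingAcks::new();
        let mut dedup = Dedup::new();
        let now = Instant::now();

        dedup.insert(0);
        acks.insert_one(0, now);
        acks.packet_received(now, 0, true, &dedup);
        acks.acks_sent();
        assert!(!acks.immediate_ack_required);

        // Packet 1 is missing, so packet 2 is out of order
        dedup.insert(2);
        acks.insert_one(2, now);
        acks.packet_received(now, 2, true, &dedup);
        assert!(acks.immediate_ack_required);
    }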

    #[test]
    fn pending_acks_after_immediate_ack_set() {
        let mut acks = PendingAcks::new();
        let mut dedup = Dedup::new();

        // Receive ack-eliciting packet
        dedup.insert(0);
        let now = Instant::now();
        acks.insert_one(0, now);
        acks.packet_received(now, 0, true, &dedup);

        // Sanity check
        assert!(!acks.ranges.is_empty());
        assert!(!acks.can_send());

        // Can send ACK after max_ack_delay exceeded
        acks.set_immediate_ack_required();
        assert!(acks.can_send());
    }
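
    // Illustrative sketch (added): the second ack-eliciting packet since the last ACK was
    // sent pushes the count past the default `ack_eliciting_threshold` of 1, making an ACK
    // sendable immediately; `acks_sent` resets the counter.
    #[test]
    fn pending_acks_threshold_triggers_immediate_ack() {
        let mut acks = PendingAcks::new();
        let mut dedup = Dedup::new();
        let now = Instant::now();

        dedup.insert(0);
        acks.insert_one(0, now);
        acks.packet_received(now, 0, true, &dedup);
        assert!(!acks.can_send());

        dedup.insert(1);
        acks.insert_one(1, now);
        acks.packet_received(now, 1, true, &dedup);
        assert!(acks.can_send());

        acks.acks_sent();
        assert!(!acks.can_send());
    }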

    #[test]
    fn pending_acks_ack_delay() {
        let mut acks = PendingAcks::new();
        let mut dedup = Dedup::new();

        let t1 = Instant::now();
        let t2 = t1 + Duration::from_millis(2);
        let t3 = t2 + Duration::from_millis(5);
        assert_eq!(acks.ack_delay(t1), Duration::from_millis(0));
        assert_eq!(acks.ack_delay(t2), Duration::from_millis(0));
        assert_eq!(acks.ack_delay(t3), Duration::from_millis(0));

        // In-order packet
        dedup.insert(0);
        acks.insert_one(0, t1);
        acks.packet_received(t1, 0, true, &dedup);
        assert_eq!(acks.ack_delay(t1), Duration::from_millis(0));
        assert_eq!(acks.ack_delay(t2), Duration::from_millis(2));
        assert_eq!(acks.ack_delay(t3), Duration::from_millis(7));

        // Out of order (higher than expected)
        dedup.insert(3);
        acks.insert_one(3, t2);
        acks.packet_received(t2, 3, true, &dedup);
        assert_eq!(acks.ack_delay(t2), Duration::from_millis(0));
        assert_eq!(acks.ack_delay(t3), Duration::from_millis(5));

        // Out of order (lower than expected, so previous instant is kept)
        dedup.insert(2);
        acks.insert_one(2, t3);
        acks.packet_received(t3, 2, true, &dedup);
        assert_eq!(acks.ack_delay(t3), Duration::from_millis(5));
    }
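
    // Illustrative sketch (added): non-ack-eliciting packets never force an ACK-only
    // packet, but once more than the lazy-ACK threshold (10) of them have arrived,
    // `maybe_ack_non_eliciting` queues an ACK to piggyback on a packet we were about to
    // send anyway.
    #[test]
    fn pending_acks_lazy_ack_after_non_eliciting_packets() {
        let mut acks = PendingAcks::new();
        let dedup = Dedup::new();
        let now = Instant::now();

        for pn in 0..11 {
            acks.insert_one(pn, now);
            acks.packet_received(now, pn, false, &dedup);
        }
        assert!(!acks.can_send());

        acks.maybe_ack_non_eliciting();
        assert!(acks.can_send());
    }

    // Illustrative sketch (added): `subtract_below` drops pending ACK ranges once the peer
    // confirms it received our ACKs for those packet numbers.
    #[test]
    fn pending_acks_subtract_below() {
        let mut acks = PendingAcks::new();
        let now = Instant::now();
        for pn in 0..5 {
            acks.insert_one(pn, now);
        }
        acks.subtract_below(2);
        assert_eq!(acks.ranges().len(), 1);
        acks.subtract_below(4);
        assert!(acks.ranges().is_empty());
    }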

    #[test]
    fn sent_packet_size() {
        // The tracking state of sent packets should be minimal, and not grow
        // over time.
        assert!(std::mem::size_of::<SentPacket>() <= 128);
    }
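
    // Illustrative sketch (added) of `detect_ecn`; assumes `frame::EcnCounts` is `Copy`
    // and its fields are writable from this module, as they are read directly above.
    // Counters must keep pace with newly acked packets, and a CE increase signals
    // congestion.
    #[test]
    fn detect_ecn_congestion_and_bleaching() {
        let mut space = PacketSpace::new(Instant::now());

        // Three newly acked packets, all marked ECT(0): valid, no congestion
        let mut ecn = frame::EcnCounts::ZERO;
        ecn.ect0 = 3;
        assert_eq!(space.detect_ecn(3, ecn), Ok(false));

        // Two more acked, one remarked CE by the network: congestion experienced
        ecn.ect0 = 4;
        ecn.ce = 1;
        assert_eq!(space.detect_ecn(2, ecn), Ok(true));

        // Unchanged counters can't account for further acked packets: ECN bleaching
        assert!(space.detect_ecn(5, ecn).is_err());
    }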
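
    // Illustrative sketch (added) of `PacketNumberFilter`: the first skipped packet number
    // falls in 0..64, so the loop below always terminates, and an ACK claiming the skipped
    // number is rejected in the Data space only.
    #[test]
    fn packet_number_filter_skips_and_rejects() {
        let mut rng = rand::thread_rng();
        let mut space = PacketSpace::new(Instant::now());
        let mut filter = PacketNumberFilter::new(&mut rng);

        let mut allocated = Vec::new();
        while filter.prev_skipped_packet_number.is_none() {
            allocated.push(filter.allocate(&mut rng, &mut space));
        }

        let skipped = filter.prev_skipped_packet_number.unwrap();
        assert!(skipped < 64);
        assert!(!allocated.contains(&skipped));
        assert!(filter.check_ack(SpaceId::Data, skipped..=skipped).is_err());
        assert!(filter.check_ack(SpaceId::Initial, skipped..=skipped).is_ok());
    }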
}