s2n-quic-transport 0.16.0

Internal crate used by s2n-quic
Documentation
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
    contexts::WriteContext,
    endpoint,
    path::{self, ecn::ValidationOutcome, path_event, Path},
    recovery::{
        manager::persistent_congestion::PersistentCongestionCalculator, SentPacketInfo, SentPackets,
    },
    transmission,
};
use core::{cmp::max, time::Duration};
use s2n_quic_core::{
    event::{self, builder::CongestionSource, IntoEvent},
    frame,
    frame::ack::EcnCounts,
    inet::ExplicitCongestionNotification,
    packet::number::{PacketNumber, PacketNumberRange, PacketNumberSpace},
    recovery::{congestion_controller, CongestionController, RttEstimator, K_GRANULARITY},
    time::{timer, Timer, Timestamp},
    transport,
};
use smallvec::SmallVec;

type PacketDetails<PacketInfo> = (PacketNumber, SentPacketInfo<PacketInfo>);

#[derive(Debug)]
pub struct Manager<Config: endpoint::Config> {
    // The packet space for this recovery manager
    space: PacketNumberSpace,

    //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.3
    //# The largest packet number acknowledged in the packet number space so far.
    largest_acked_packet: Option<PacketNumber>,

    //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.3
    //# An association of packet numbers in a packet number space to information about them.
    //  These are packets that are pending acknowledgement.
    sent_packets: SentPackets<<<Config::CongestionControllerEndpoint as congestion_controller::Endpoint>::CongestionController as congestion_controller::CongestionController>::PacketInfo>,

    // Timer set when packets may be declared lost at a time in the future
    loss_timer: Timer,

    //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2
    //# A Probe Timeout (PTO) triggers the sending of one or two probe
    //# datagrams when ack-eliciting packets are not acknowledged within the
    //# expected period of time or the server may not have validated the
    //# client's address.  A PTO enables a connection to recover from loss of
    //# tail packets or acknowledgments.
    pto: Pto,

    //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.3
    //# The time the most recent ack-eliciting packet was sent.
    time_of_last_ack_eliciting_packet: Option<Timestamp>,

    // The last processed ECN counts received in an ACK frame. Used to
    // validate new ECN counts and to detect increases in the reported ECN-CE counter.
    baseline_ecn_counts: EcnCounts,

    // The total ecn counts for outstanding (unacknowledged) packets
    sent_packet_ecn_counts: EcnCounts,
}

//= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.1
//# The RECOMMENDED initial value for the packet reordering threshold
//# (kPacketThreshold) is 3, based on best practices for TCP loss
//# detection [RFC5681] [RFC6675].  In order to remain similar to TCP,
//# implementations SHOULD NOT use a packet threshold less than 3; see
//# [RFC5681].
const K_PACKET_THRESHOLD: u64 = 3;

/// Initial capacity of the SmallVec used for keeping track of packets
/// acked in an ack frame
// TODO: Determine if there is a more appropriate default
const ACKED_PACKETS_INITIAL_CAPACITY: usize = 32;

macro_rules! recovery_event {
    ($path_id:ident, $path:ident) => {
        event::builder::RecoveryMetrics {
            path: event::builder::Path {
                local_addr: $path.local_address().into_event(),
                local_cid: $path.local_connection_id.into_event(),
                remote_addr: $path.remote_address().into_event(),
                remote_cid: $path.peer_connection_id.into_event(),
                id: $path_id as u64,
                is_active: $path.is_active(),
            },
            min_rtt: $path.rtt_estimator.min_rtt(),
            smoothed_rtt: $path.rtt_estimator.smoothed_rtt(),
            latest_rtt: $path.rtt_estimator.latest_rtt(),
            rtt_variance: $path.rtt_estimator.rttvar(),
            max_ack_delay: $path.rtt_estimator.max_ack_delay(),
            pto_count: ($path.pto_backoff as f32).log2() as u32,
            congestion_window: $path.congestion_controller.congestion_window(),
            bytes_in_flight: $path.congestion_controller.bytes_in_flight(),
            congestion_limited: $path.transmission_constraint().is_congestion_limited(),
        }
    };
}

pub(crate) use recovery_event;

// Since `SentPacketInfo` is generic over a type supplied by the Congestion Controller implementation,
// the type definition is particularly lengthy, especially since rust requires the fully-qualified
// syntax to eliminate ambiguity. This macro can be used where ever the Congestion Controller
// generic PacketInfo type is required to help with readability.
macro_rules! packet_info_type {
    () => {
        <<Config::CongestionControllerEndpoint as congestion_controller::Endpoint>::CongestionController as congestion_controller::CongestionController>::PacketInfo
    }
}

#[allow(clippy::type_complexity)]
impl<Config: endpoint::Config> Manager<Config> {
    /// Constructs a new `recovery::Manager`
    pub fn new(space: PacketNumberSpace) -> Self {
        Self {
            space,
            largest_acked_packet: None,
            sent_packets: SentPackets::default(),
            loss_timer: Timer::default(),
            pto: Pto::default(),
            time_of_last_ack_eliciting_packet: None,
            baseline_ecn_counts: EcnCounts::default(),
            sent_packet_ecn_counts: EcnCounts::default(),
        }
    }

    /// Invoked when the Client processes a Retry packet.
    ///
    /// Reset congestion controller state by discarding sent bytes and replacing recovery
    /// manager with a new instance of itself.
    pub fn on_retry_packet<Pub: event::ConnectionPublisher>(
        &mut self,
        path: &mut Path<Config>,
        path_id: path::Id,
        publisher: &mut Pub,
    ) {
        debug_assert!(
            Config::ENDPOINT_TYPE.is_client(),
            "only a Client should process a Retry packet"
        );

        let mut discarded_bytes = 0;
        for (_, unacked_sent_info) in self.sent_packets.iter() {
            discarded_bytes += unacked_sent_info.sent_bytes as usize;
        }
        path.congestion_controller.on_packet_discarded(
            discarded_bytes,
            &mut congestion_controller::PathPublisher::new(publisher, path_id),
        );

        *self = Self::new(self.space);
    }

    pub fn on_timeout<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        timestamp: Timestamp,
        random_generator: &mut Config::RandomGenerator,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) {
        if self.loss_timer.is_armed() {
            if self.loss_timer.poll_expiration(timestamp).is_ready() {
                self.detect_and_remove_lost_packets(
                    timestamp,
                    random_generator,
                    context,
                    publisher,
                );
            }
        } else {
            let pto_expired = self
                .pto
                .on_timeout(!self.sent_packets.is_empty(), timestamp);

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2
            //# A PTO timer expiration event does not indicate packet loss and MUST
            //# NOT cause prior unacknowledged packets to be marked as lost.

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
            //# When a PTO timer expires, the PTO backoff MUST be increased,
            //# resulting in the PTO period being set to twice its current value.
            if pto_expired {
                // Note: the pseudocode updates the pto timer in OnLossDetectionTimeout
                // (see section A.9). We don't do that here since it will be rearmed in
                // `on_packet_sent`, which immediately follows a timeout.
                context.path_mut().pto_backoff *= 2;
            }
        }

        let path_id = context.path_id().as_u8();
        let path = context.path_mut();
        publisher.on_recovery_metrics(recovery_event!(path_id, path));
    }

    //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.5
    //# After a packet is sent, information about the packet is stored.
    #[allow(clippy::too_many_arguments)]
    pub fn on_packet_sent<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        packet_number: PacketNumber,
        outcome: transmission::Outcome,
        time_sent: Timestamp,
        ecn: ExplicitCongestionNotification,
        transmission_mode: transmission::Mode,
        app_limited: Option<bool>,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) {
        //= https://www.rfc-editor.org/rfc/rfc9002#section-7
        //# Similar to TCP, packets containing only ACK frames do not count
        //# towards bytes in flight and are not congestion controlled.

        // Everything else (including probe packets) are counted, as specified below:
        //= https://www.rfc-editor.org/rfc/rfc9002#section-7.5
        //# A sender MUST however count these packets as being additionally in
        //# flight, since these packets add network load without establishing
        //# packet loss.
        let congestion_controlled_bytes = if outcome.is_congestion_controlled {
            outcome.bytes_sent
        } else {
            0
        };

        let path_id = context.path_id();
        let path = context.path_mut();
        let cc_packet_info = path.congestion_controller.on_packet_sent(
            time_sent,
            congestion_controlled_bytes,
            app_limited,
            &path.rtt_estimator,
            &mut congestion_controller::PathPublisher::new(publisher, path_id),
        );

        self.sent_packets.insert(
            packet_number,
            SentPacketInfo::new(
                outcome.is_congestion_controlled,
                congestion_controlled_bytes,
                time_sent,
                outcome.ack_elicitation,
                path_id,
                ecn,
                transmission_mode,
                cc_packet_info,
            ),
        );
        path.ecn_controller
            .on_packet_sent(ecn, path_event!(path, path_id), publisher);
        self.sent_packet_ecn_counts.increment(ecn);

        if outcome.is_congestion_controlled {
            if outcome.ack_elicitation.is_ack_eliciting() {
                self.time_of_last_ack_eliciting_packet = Some(time_sent);
            }
            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
            //# A sender SHOULD restart its PTO timer every time an ack-eliciting
            //# packet is sent or acknowledged,
            let is_handshake_confirmed = context.is_handshake_confirmed();
            let path = context.path_mut_by_id(context.path_id());
            self.update_pto_timer(path, time_sent, is_handshake_confirmed);
        }
    }

    /// Updates the PTO timer
    pub fn update_pto_timer(
        &mut self,
        path: &Path<Config>,
        now: Timestamp,
        is_handshake_confirmed: bool,
    ) {
        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
        //# If no additional data can be sent, the server's PTO timer MUST NOT be
        //# armed until datagrams have been received from the client, because
        //# packets sent on PTO count against the anti-amplification limit.
        if path.at_amplification_limit() {
            // The server's timer is not set if nothing can be sent.
            self.pto.cancel();
            return;
        }

        let ack_eliciting_packets_in_flight = self.sent_packets.iter().any(|(_, sent_info)| {
            sent_info.congestion_controlled && sent_info.ack_elicitation.is_ack_eliciting()
        });

        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
        //# it is the client's responsibility to send packets to unblock the server
        //# until it is certain that the server has finished its address validation
        if !ack_eliciting_packets_in_flight && path.is_peer_validated() {
            // There is nothing to detect lost, so no timer is set.
            // However, the client needs to arm the timer if the
            // server might be blocked by the anti-amplification limit.
            self.pto.cancel();
            return;
        }

        let pto_base_timestamp = if ack_eliciting_packets_in_flight {
            self.time_of_last_ack_eliciting_packet
                .expect("there is at least one ack eliciting packet in flight")
        } else {
            // Arm PTO from now when there are no inflight packets.
            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
            //# That is,
            //# the client MUST set the PTO timer if the client has not received an
            //# acknowledgment for any of its Handshake packets and the handshake is
            //# not confirmed (see Section 4.1.2 of [QUIC-TLS]), even if there are no
            //# packets in flight.
            now
        };

        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
        //# An endpoint MUST NOT set its PTO timer for the Application Data
        //# packet number space until the handshake is confirmed.
        if self.space.is_application_data() && !is_handshake_confirmed {
            self.pto.cancel();
        } else {
            self.pto
                .update(pto_base_timestamp, path.pto_period(self.space));
        }
    }

    /// Queries the component for any outgoing frames that need to get sent
    pub fn on_transmit<W: WriteContext>(&mut self, context: &mut W) {
        self.pto.on_transmit(context)
    }

    /// Process ACK frame.
    ///
    /// Update congestion controller, timers and meta data around acked packet ranges.
    pub fn on_ack_frame<
        A: frame::ack::AckRanges,
        Ctx: Context<Config>,
        Pub: event::ConnectionPublisher,
    >(
        &mut self,
        timestamp: Timestamp,
        frame: frame::Ack<A>,
        packet_number: PacketNumber,
        random_generator: &mut Config::RandomGenerator,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) -> Result<(), transport::Error> {
        let space = self.space;
        let largest_acked_packet_number = space.new_packet_number(frame.largest_acknowledged());

        self.process_acks(
            timestamp,
            frame.ack_ranges().map(|ack_range| {
                let (start, end) = ack_range.into_inner();
                PacketNumberRange::new(space.new_packet_number(start), space.new_packet_number(end))
            }),
            largest_acked_packet_number,
            frame.ack_delay(),
            frame.ecn_counts,
            packet_number,
            random_generator,
            context,
            publisher,
        )?;

        Ok(())
    }

    /// Generic interface for processing ACK ranges.
    #[allow(clippy::too_many_arguments)]
    fn process_acks<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        timestamp: Timestamp,
        ranges: impl Iterator<Item = PacketNumberRange>,
        largest_acked_packet_number: PacketNumber,
        ack_delay: Duration,
        ecn_counts: Option<EcnCounts>,
        packet_number: PacketNumber,
        random_generator: &mut Config::RandomGenerator,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) -> Result<(), transport::Error> {
        // Update the largest acked packet if the largest packet acked in this frame is larger
        let acked_new_largest_packet = match self.largest_acked_packet {
            Some(current_largest) if current_largest > largest_acked_packet_number => false,
            _ => {
                self.largest_acked_packet = Some(largest_acked_packet_number);
                true
            }
        };

        let mut newly_acked_packets = SmallVec::<
            [SentPacketInfo<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
        >::new();
        let (largest_newly_acked, includes_ack_eliciting) = self.process_ack_range(
            &mut newly_acked_packets,
            timestamp,
            packet_number,
            ranges,
            context,
            publisher,
        )?;

        //= https://www.rfc-editor.org/rfc/rfc9002#section-5.1
        //# An endpoint generates an RTT sample on receiving an ACK frame that
        //# meets the following two conditions:
        //#
        //# *  the largest acknowledged packet number is newly acknowledged, and
        //#
        //# *  at least one of the newly acknowledged packets was ack-eliciting.
        if let Some(largest_newly_acked) = largest_newly_acked {
            self.update_congestion_control(
                largest_newly_acked,
                largest_acked_packet_number,
                includes_ack_eliciting,
                timestamp,
                ack_delay,
                context,
                publisher,
            );

            let (_, largest_newly_acked_info) = largest_newly_acked;
            self.process_new_acked_packets(
                &newly_acked_packets,
                largest_newly_acked_info,
                acked_new_largest_packet,
                timestamp,
                ecn_counts,
                random_generator,
                context,
                publisher,
            );
        }

        let path_id = context.path_id().as_u8();
        let path = context.path_mut();
        publisher.on_recovery_metrics(recovery_event!(path_id, path));

        Ok(())
    }

    // Process ack_range and return largest_newly_acked and if the packet is ack eliciting.
    fn process_ack_range<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        newly_acked_packets: &mut SmallVec<
            [SentPacketInfo<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
        >,
        timestamp: Timestamp,
        packet_number: PacketNumber,
        ranges: impl Iterator<Item = PacketNumberRange>,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) -> Result<(Option<PacketDetails<packet_info_type!()>>, bool), transport::Error> {
        let mut largest_newly_acked: Option<PacketDetails<packet_info_type!()>> = None;
        let mut includes_ack_eliciting = false;

        for pn_range in ranges {
            // The path the ack was received on
            let rx_path_id = context.path_id();
            let rx_path = context.path_mut();
            publisher.on_ack_range_received(event::builder::AckRangeReceived {
                packet_header: event::builder::PacketHeader::new(
                    packet_number,
                    publisher.quic_version(),
                ),
                path: path_event!(rx_path, rx_path_id),
                ack_range: pn_range.into_event(),
            });

            context.validate_packet_ack(timestamp, &pn_range)?;
            // notify components of packets acked
            context.on_packet_ack(timestamp, &pn_range);

            let mut newly_acked_range: Option<(PacketNumber, PacketNumber)> = None;

            for (packet_number, acked_packet_info) in self.sent_packets.remove_range(pn_range) {
                newly_acked_packets.push(acked_packet_info);

                if largest_newly_acked.map_or(true, |(pn, _)| packet_number > pn) {
                    largest_newly_acked = Some((packet_number, acked_packet_info));
                }

                if let Some((start, end)) = newly_acked_range.as_mut() {
                    debug_assert!(
                        packet_number > *start && packet_number > *end,
                        "remove_range should return packet numbers in ascending order"
                    );
                    *end = packet_number;
                } else {
                    newly_acked_range = Some((packet_number, packet_number));
                };

                includes_ack_eliciting |= acked_packet_info.ack_elicitation.is_ack_eliciting();

                let path = context.path_mut_by_id(acked_packet_info.path_id);
                path.mtu_controller.on_packet_ack(
                    packet_number,
                    acked_packet_info.sent_bytes,
                    &mut path.congestion_controller,
                    acked_packet_info.path_id,
                    publisher,
                );
                path.ecn_controller
                    .on_packet_ack(acked_packet_info.time_sent, acked_packet_info.ecn);
            }

            if let Some((start, end)) = newly_acked_range {
                // notify components of packets that are newly acked
                context.on_new_packet_ack(&PacketNumberRange::new(start, end), publisher);
            }
        }

        Ok((largest_newly_acked, includes_ack_eliciting))
    }

    #[allow(clippy::too_many_arguments)]
    fn update_congestion_control<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        largest_newly_acked: PacketDetails<packet_info_type!()>,
        largest_acked_packet_number: PacketNumber,
        includes_ack_eliciting: bool,
        timestamp: Timestamp,
        ack_delay: Duration,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) {
        let mut should_update_rtt = true;
        let is_handshake_confirmed = context.is_handshake_confirmed();
        let (largest_newly_acked_packet_number, largest_newly_acked_info) = largest_newly_acked;

        //= https://www.rfc-editor.org/rfc/rfc9000#section-9.4
        //# Packets sent on the old path MUST NOT contribute to
        //# congestion control or RTT estimation for the new path.
        should_update_rtt &= context.path_id() == largest_newly_acked_info.path_id;

        //= https://www.rfc-editor.org/rfc/rfc9002#section-5.1
        //# To avoid generating multiple RTT samples for a single packet, an ACK
        //# frame SHOULD NOT be used to update RTT estimates if it does not newly
        //# acknowledge the largest acknowledged packet.
        should_update_rtt &= largest_newly_acked_packet_number == largest_acked_packet_number;

        //= https://www.rfc-editor.org/rfc/rfc9002#section-5.1
        //# An RTT sample MUST NOT be generated on receiving an ACK frame that
        //# does not newly acknowledge at least one ack-eliciting packet.
        should_update_rtt &= includes_ack_eliciting;

        if should_update_rtt {
            let latest_rtt = timestamp - largest_newly_acked_info.time_sent;
            let path = context.path_mut_by_id(largest_newly_acked_info.path_id);
            path.rtt_estimator.update_rtt(
                ack_delay,
                latest_rtt,
                timestamp,
                is_handshake_confirmed,
                largest_acked_packet_number.space(),
            );

            // Update the congestion controller with the latest RTT estimate
            path.congestion_controller.on_rtt_update(
                largest_newly_acked_info.time_sent,
                timestamp,
                &path.rtt_estimator,
                &mut congestion_controller::PathPublisher::new(
                    publisher,
                    largest_newly_acked_info.path_id,
                ),
            );

            // Notify components the RTT estimate was updated
            context.on_rtt_update();
        }
    }

    #[allow(clippy::too_many_arguments)]
    fn process_new_acked_packets<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        newly_acked_packets: &SmallVec<
            [SentPacketInfo<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
        >,
        largest_newly_acked: SentPacketInfo<packet_info_type!()>,
        new_largest_packet: bool,
        timestamp: Timestamp,
        ecn_counts: Option<EcnCounts>,
        random_generator: &mut Config::RandomGenerator,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) {
        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.2
        //# Once a later packet within the same packet number space has been
        //# acknowledged, an endpoint SHOULD declare an earlier packet lost if it
        //# was sent a threshold amount of time in the past.
        self.detect_and_remove_lost_packets(timestamp, random_generator, context, publisher);

        let current_path_id = context.path_id();
        let is_handshake_confirmed = context.is_handshake_confirmed();
        let mut current_path_acked_bytes = 0;
        let mut newly_acked_ecn_counts = EcnCounts::default();

        for acked_packet_info in newly_acked_packets {
            let path = context.path_mut_by_id(acked_packet_info.path_id);

            let sent_bytes = acked_packet_info.sent_bytes as usize;
            newly_acked_ecn_counts.increment(acked_packet_info.ecn);

            if acked_packet_info.path_id == current_path_id {
                current_path_acked_bytes += sent_bytes;
            } else if sent_bytes > 0 {
                path.congestion_controller.on_ack(
                    acked_packet_info.time_sent,
                    sent_bytes,
                    acked_packet_info.cc_packet_info,
                    &path.rtt_estimator,
                    random_generator,
                    timestamp,
                    &mut congestion_controller::PathPublisher::new(
                        publisher,
                        acked_packet_info.path_id,
                    ),
                );
            }

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
            //# The PTO backoff factor is reset when an acknowledgment is received,
            //# except in the following case.  A server might take longer to respond
            //# to packets during the handshake than otherwise.  To protect such a
            //# server from repeated client probes, the PTO backoff is not reset at a
            //# client that is not yet certain that the server has finished
            //# validating the client's address.  That is, a client does not reset
            //# the PTO backoff factor on receiving acknowledgments in Initial
            //# packets.
            if path.is_peer_validated() {
                path.reset_pto_backoff();
            }

            if acked_packet_info.path_id != current_path_id {
                self.update_pto_timer(path, timestamp, is_handshake_confirmed);
            }
        }

        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
        //# A sender SHOULD restart its PTO timer every time an ack-eliciting
        //# packet is sent or acknowledged,
        debug_assert!(
            !newly_acked_packets.is_empty(),
            "this method assumes there was at least one newly-acked packet"
        );

        //= https://www.rfc-editor.org/rfc/rfc9000#section-13.4.2.1
        //# Validating ECN counts from reordered ACK frames can result in failure.
        //# An endpoint MUST NOT fail ECN validation as a result of processing an
        //# ACK frame that does not increase the largest acknowledged packet number.
        if new_largest_packet {
            self.process_ecn(
                newly_acked_ecn_counts,
                ecn_counts,
                timestamp,
                context,
                publisher,
            );
        }

        let path = context.path_mut();

        if current_path_acked_bytes > 0 {
            path.congestion_controller.on_ack(
                largest_newly_acked.time_sent,
                current_path_acked_bytes,
                largest_newly_acked.cc_packet_info,
                &path.rtt_estimator,
                random_generator,
                timestamp,
                &mut congestion_controller::PathPublisher::new(publisher, current_path_id),
            );
            self.update_pto_timer(path, timestamp, is_handshake_confirmed);
        }
    }

    fn process_ecn<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        newly_acked_ecn_counts: EcnCounts,
        ack_frame_ecn_counts: Option<EcnCounts>,
        timestamp: Timestamp,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) {
        let path_id = context.path_id();
        let path = context.path_mut();

        let outcome = path.ecn_controller.validate(
            newly_acked_ecn_counts,
            self.sent_packet_ecn_counts,
            self.baseline_ecn_counts,
            ack_frame_ecn_counts,
            timestamp,
            path.rtt_estimator.smoothed_rtt(),
            path_event!(path, path_id),
            publisher,
        );

        if let ValidationOutcome::CongestionExperienced(ce_count) = outcome {
            //= https://www.rfc-editor.org/rfc/rfc9002#section-7.1
            //# If a path has been validated to support Explicit Congestion
            //# Notification (ECN) [RFC3168] [RFC8311], QUIC treats a Congestion
            //# Experienced (CE) codepoint in the IP header as a signal of
            //# congestion.
            context
                .path_mut()
                .congestion_controller
                .on_explicit_congestion(
                    ce_count.as_u64(),
                    timestamp,
                    &mut congestion_controller::PathPublisher::new(publisher, path_id),
                );
            let path = context.path();
            publisher.on_congestion(event::builder::Congestion {
                path: path_event!(path, path_id),
                source: CongestionSource::Ecn,
            })
        }

        self.baseline_ecn_counts = ack_frame_ecn_counts.unwrap_or_default();
        self.sent_packet_ecn_counts -= newly_acked_ecn_counts;
    }

    /// Returns `true` if the recovery manager requires a probe packet to be sent.
    #[inline]
    pub fn requires_probe(&self) -> bool {
        matches!(self.pto.state, PtoState::RequiresTransmission(_))
    }

    //= https://www.rfc-editor.org/rfc/rfc9002#appendix-B.9
    //# When Initial or Handshake keys are discarded, packets sent in that
    //# space no longer count toward bytes in flight.
    /// Clears bytes in flight for sent packets.
    pub fn on_packet_number_space_discarded<Pub: event::ConnectionPublisher>(
        &mut self,
        path: &mut Path<Config>,
        path_id: path::Id,
        publisher: &mut Pub,
    ) {
        debug_assert_ne!(self.space, PacketNumberSpace::ApplicationData);

        let path_id_idx = path_id.as_u8();
        publisher.on_recovery_metrics(recovery_event!(path_id_idx, path));

        // Remove any unacknowledged packets from flight.
        let mut discarded_bytes = 0;
        for (_, unacked_sent_info) in self.sent_packets.iter() {
            debug_assert_eq!(
                unacked_sent_info.path_id,
                path_id,
                "this implementation assumes the connection has a single path when discarding packets"
            );
            discarded_bytes += unacked_sent_info.sent_bytes as usize;
        }
        path.congestion_controller.on_packet_discarded(
            discarded_bytes,
            &mut congestion_controller::PathPublisher::new(publisher, path_id),
        );
    }

    //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.10
    //# DetectAndRemoveLostPackets is called every time an ACK is received or the time threshold
    //# loss detection timer expires. This function operates on the sent_packets for that packet
    //# number space and returns a list of packets newly detected as lost.
    fn detect_and_remove_lost_packets<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        now: Timestamp,
        random_generator: &mut Config::RandomGenerator,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) {
        // Cancel the loss timer. It will be armed again if any unacknowledged packets are
        // older than the largest acked packet, but not old enough to be considered lost yet
        self.loss_timer.cancel();

        let (persistent_congestion_duration, sent_packets_to_remove) =
            self.detect_lost_packets(now, context, publisher);

        self.remove_lost_packets(
            now,
            persistent_congestion_duration,
            sent_packets_to_remove,
            random_generator,
            context,
            publisher,
        );
    }

    fn detect_lost_packets<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        now: Timestamp,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) -> (Duration, Vec<PacketDetails<packet_info_type!()>>) {
        let largest_acked_packet = self
            .largest_acked_packet
            .expect("This function is only called after an ack has been received");

        // TODO: Investigate a more efficient mechanism for managing sent packets to remove
        //       See: https://github.com/aws/s2n-quic/issues/1075
        let mut sent_packets_to_remove = Vec::new();
        let mut persistent_congestion_calculator = PersistentCongestionCalculator::new(
            context.path().rtt_estimator.first_rtt_sample(),
            context.path_id(),
        );

        for (unacked_packet_number, unacked_sent_info) in self.sent_packets.iter() {
            if unacked_packet_number > largest_acked_packet {
                // sent_packets is ordered by packet number, so all remaining packets will be larger
                break;
            }

            let unacked_path_id = unacked_sent_info.path_id;
            let path = &context.path_by_id(unacked_path_id);
            // Calculate how long we wait until a packet is declared lost
            let time_threshold = Self::calculate_loss_time_threshold(&path.rtt_estimator);
            // Calculate at what time this particular packet is considered lost based on the
            // current path `time_threshold`
            let packet_lost_time = unacked_sent_info.time_sent + time_threshold;

            // If the `packet_lost_time` exceeds the current time, it's lost
            let time_threshold_exceeded = packet_lost_time.has_elapsed(now);

            let packet_number_threshold_exceeded = largest_acked_packet
                .checked_distance(unacked_packet_number)
                .expect("largest_acked_packet >= unacked_packet_number")
                >= K_PACKET_THRESHOLD;

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.1
            //# A packet is declared lost if it meets all of the following
            //# conditions:
            //#
            //#     *  The packet is unacknowledged, in flight, and was sent prior to an
            //#        acknowledged packet.
            //#
            //#     *  The packet was sent kPacketThreshold packets before an
            //#        acknowledged packet (Section 6.1.1), or it was sent long enough in
            //#        the past (Section 6.1.2).
            if time_threshold_exceeded || packet_number_threshold_exceeded {
                sent_packets_to_remove.push((unacked_packet_number, *unacked_sent_info));

                if unacked_sent_info.congestion_controlled {
                    // The packet is "in-flight", ie congestion controlled
                    // TODO merge contiguous packet numbers
                    let range =
                        PacketNumberRange::new(unacked_packet_number, unacked_packet_number);
                    context.on_packet_loss(&range, publisher);
                }

                persistent_congestion_calculator
                    .on_lost_packet(unacked_packet_number, unacked_sent_info);
            } else {
                //= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.2
                //# If packets sent prior to the largest acknowledged packet cannot yet
                //# be declared lost, then a timer SHOULD be set for the remaining time.
                self.loss_timer.set(packet_lost_time);
                debug_assert!(
                    !self.loss_timer.is_expired(now),
                    "loss timer was not armed in the future; now: {}, threshold: {:?}\nmanager: {:#?}",
                    now,
                    time_threshold,
                    self
                );

                //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
                //# The PTO timer MUST NOT be set if a timer is set for time threshold
                //# loss detection; see Section 6.1.2.  A timer that is set for time
                //# threshold loss detection will expire earlier than the PTO timer in
                //# most cases and is less likely to spuriously retransmit data.
                self.pto.cancel();

                // assuming sent_packets is ordered by packet number and sent time, all remaining
                // packets will have a larger packet number and sent time, and are thus not lost.
                break;
            }
        }

        (
            persistent_congestion_calculator.persistent_congestion_duration(),
            sent_packets_to_remove,
        )
    }

    fn remove_lost_packets<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
        &mut self,
        now: Timestamp,
        persistent_congestion_duration: Duration,
        sent_packets_to_remove: Vec<PacketDetails<packet_info_type!()>>,
        random_generator: &mut Config::RandomGenerator,
        context: &mut Ctx,
        publisher: &mut Pub,
    ) {
        let current_path_id = context.path_id();
        let mut is_congestion_event = false;
        let mut prev_lost_packet_number = None;

        // Remove the lost packets and account for the bytes on the proper congestion controller
        for (packet_number, sent_info) in sent_packets_to_remove {
            let path = context.path_mut_by_id(sent_info.path_id);
            self.sent_packets.remove(packet_number);

            //= https://www.rfc-editor.org/rfc/rfc9002#section-7.6.2
            //# A sender that does not have state for all packet
            //# number spaces or an implementation that cannot compare send times
            //# across packet number spaces MAY use state for just the packet number
            //# space that was acknowledged.
            let persistent_congestion = persistent_congestion_duration
                > path.rtt_estimator.persistent_congestion_threshold()
                // Check that the packet was sent on this path
                && sent_info.path_id == current_path_id;

            let new_loss_burst = prev_lost_packet_number.map_or(true, |prev: PacketNumber| {
                packet_number.checked_distance(prev) != Some(1)
            });

            if sent_info.transmission_mode.is_mtu_probing() {
                //= https://www.rfc-editor.org/rfc/rfc9000#section-14.4
                //# Loss of a QUIC packet that is carried in a PMTU probe is therefore not a
                //# reliable indication of congestion and SHOULD NOT trigger a congestion
                //# control reaction; see Item 7 in Section 3 of [DPLPMTUD].

                //= https://www.rfc-editor.org/rfc/rfc8899#section-3
                //# Loss of a probe packet SHOULD NOT be treated as an
                //# indication of congestion and SHOULD NOT trigger a congestion
                //# control reaction [RFC4821] because this could result in
                //# unnecessary reduction of the sending rate.
                path.congestion_controller.on_packet_discarded(
                    sent_info.sent_bytes as usize,
                    &mut congestion_controller::PathPublisher::new(publisher, sent_info.path_id),
                );
            } else if sent_info.sent_bytes > 0 {
                path.congestion_controller.on_packet_lost(
                    sent_info.sent_bytes as u32,
                    sent_info.cc_packet_info,
                    persistent_congestion,
                    new_loss_burst,
                    random_generator,
                    now,
                    &mut congestion_controller::PathPublisher::new(publisher, sent_info.path_id),
                );
                is_congestion_event = true;
            }

            publisher.on_packet_lost(event::builder::PacketLost {
                packet_header: event::builder::PacketHeader::new(
                    packet_number,
                    publisher.quic_version(),
                ),
                path: path_event!(path, current_path_id),
                bytes_lost: sent_info.sent_bytes,
                is_mtu_probe: sent_info.transmission_mode.is_mtu_probing(),
            });

            // Notify the MTU controller of packet loss even if it wasn't a probe since it uses
            // that information for blackhole detection.
            path.mtu_controller.on_packet_loss(
                packet_number,
                sent_info.sent_bytes,
                now,
                &mut path.congestion_controller,
                sent_info.path_id,
                publisher,
            );

            let path_id = sent_info.path_id;

            // Notify the ECN controller of packet loss for blackhole detection.
            path.ecn_controller.on_packet_loss(
                sent_info.time_sent,
                sent_info.ecn,
                now,
                path_event!(path, path_id),
                publisher,
            );

            if persistent_congestion {
                //= https://www.rfc-editor.org/rfc/rfc9002#section-5.2
                //# Endpoints SHOULD set the min_rtt to the newest RTT sample after
                //# persistent congestion is established.
                path.rtt_estimator.on_persistent_congestion();
            }

            prev_lost_packet_number = Some(packet_number);
        }

        if is_congestion_event {
            let path = context.path();
            publisher.on_congestion(event::builder::Congestion {
                path: path_event!(path, current_path_id),
                source: CongestionSource::PacketLoss,
            })
        }
    }

    fn calculate_loss_time_threshold(rtt_estimator: &RttEstimator) -> Duration {
        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.2
        //# The time threshold is:
        //#
        //# max(kTimeThreshold * max(smoothed_rtt, latest_rtt), kGranularity)
        let mut time_threshold = max(rtt_estimator.smoothed_rtt(), rtt_estimator.latest_rtt());

        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.2
        //# The RECOMMENDED time threshold (kTimeThreshold), expressed as an
        //# RTT multiplier, is 9/8.
        time_threshold = (time_threshold * 9) / 8;

        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.2
        //# To avoid declaring
        //# packets as lost too early, this time threshold MUST be set to at
        //# least the local timer granularity, as indicated by the kGranularity
        //# constant.
        max(time_threshold, K_GRANULARITY)
    }
}

impl<Config: endpoint::Config> timer::Provider for Manager<Config> {
    #[inline]
    fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
        //# The PTO timer MUST NOT be set if a timer is set for time threshold
        //# loss detection; see Section 6.1.2.  A timer that is set for time
        //# threshold loss detection will expire earlier than the PTO timer in
        //# most cases and is less likely to spuriously retransmit data.

        if self.loss_timer.is_armed() {
            self.loss_timer.timers(query)?;
        } else {
            self.pto.timers(query)?;
        }

        Ok(())
    }
}

pub trait Context<Config: endpoint::Config> {
    const ENDPOINT_TYPE: endpoint::Type;

    fn is_handshake_confirmed(&self) -> bool;

    fn path(&self) -> &Path<Config>;

    fn path_mut(&mut self) -> &mut Path<Config>;

    fn path_by_id(&self, path_id: path::Id) -> &path::Path<Config>;

    fn path_mut_by_id(&mut self, path_id: path::Id) -> &mut path::Path<Config>;

    fn path_id(&self) -> path::Id;

    fn validate_packet_ack(
        &mut self,
        timestamp: Timestamp,
        packet_number_range: &PacketNumberRange,
    ) -> Result<(), transport::Error>;

    fn on_new_packet_ack<Pub: event::ConnectionPublisher>(
        &mut self,
        packet_number_range: &PacketNumberRange,
        publisher: &mut Pub,
    );
    fn on_packet_ack(&mut self, timestamp: Timestamp, packet_number_range: &PacketNumberRange);
    fn on_packet_loss<Pub: event::ConnectionPublisher>(
        &mut self,
        packet_number_range: &PacketNumberRange,
        publisher: &mut Pub,
    );
    fn on_rtt_update(&mut self);
}

impl<Config: endpoint::Config> transmission::interest::Provider for Manager<Config> {
    fn transmission_interest<Q: transmission::interest::Query>(
        &self,
        query: &mut Q,
    ) -> transmission::interest::Result {
        self.pto.transmission_interest(query)
    }
}

/// Manages the probe time out calculation and probe packet transmission
#[derive(Debug, Default)]
struct Pto {
    timer: Timer,
    state: PtoState,
}

#[derive(Debug, PartialEq)]
enum PtoState {
    Idle,
    RequiresTransmission(u8),
}

impl Default for PtoState {
    fn default() -> Self {
        Self::Idle
    }
}

impl Pto {
    /// Called when a timeout has occurred. Returns true if the PTO timer had expired.
    pub fn on_timeout(&mut self, packets_in_flight: bool, timestamp: Timestamp) -> bool {
        if self.timer.poll_expiration(timestamp).is_ready() {
            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
            //# When a PTO timer expires, a sender MUST send at least one ack-
            //# eliciting packet in the packet number space as a probe.

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
            //# Since the server could be blocked until more datagrams are received
            //# from the client, it is the client's responsibility to send packets to
            //# unblock the server until it is certain that the server has finished
            //# its address validation

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
            //# An endpoint
            //# MAY send up to two full-sized datagrams containing ack-eliciting
            //# packets to avoid an expensive consecutive PTO expiration due to a
            //# single lost datagram or to transmit data from multiple packet number
            //# spaces.

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
            //# Sending two packets on PTO
            //# expiration increases resilience to packet drops, thus reducing the
            //# probability of consecutive PTO events.
            let transmission_count = if packets_in_flight { 2 } else { 1 };

            self.state = PtoState::RequiresTransmission(transmission_count);
            true
        } else {
            false
        }
    }

    /// Queries the component for any outgoing frames that need to get sent
    pub fn on_transmit<W: WriteContext>(&mut self, context: &mut W) {
        if !context.transmission_mode().is_loss_recovery_probing() {
            // If we aren't currently in loss recovery probing mode, don't
            // send a probe. We could be in this state even if PtoState is
            // RequiresTransmission if we are just transmitting a ConnectionClose
            // frame.
            return;
        }

        match self.state {
            PtoState::RequiresTransmission(0) => self.state = PtoState::Idle,
            PtoState::RequiresTransmission(remaining) => {
                //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
                //# When there is no data to send, the sender SHOULD send
                //# a PING or other ack-eliciting frame in a single packet, re-arming the
                //# PTO timer.

                //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
                //# When the PTO fires, the client MUST send a Handshake packet if it
                //# has Handshake keys, otherwise it MUST send an Initial packet in a
                //# UDP datagram with a payload of at least 1200 bytes.

                //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.9
                //# // Client sends an anti-deadlock packet: Initial is padded
                //# // to earn more anti-amplification credit,
                //# // a Handshake packet proves address ownership.

                //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
                //# All probe packets sent on a PTO MUST be ack-eliciting.

                //= https://www.rfc-editor.org/rfc/rfc9002#section-7.5
                //# Probe packets MUST NOT be blocked by the congestion controller.

                // The early transmission will automatically ensure all initial packets sent by the
                // client are padded to 1200 bytes
                if context.ack_elicitation().is_ack_eliciting()
                    || context.write_frame_forced(&frame::Ping).is_some()
                {
                    let remaining = remaining - 1;
                    self.state = if remaining == 0 {
                        PtoState::Idle
                    } else {
                        PtoState::RequiresTransmission(remaining)
                    };
                }
            }
            _ => {}
        }
    }

    //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
    //# A sender SHOULD restart its PTO timer every time an ack-eliciting
    //# packet is sent or acknowledged, or when Initial or Handshake keys are
    //# discarded (Section 4.9 of [QUIC-TLS]).
    pub fn update(&mut self, base_timestamp: Timestamp, pto_period: Duration) {
        self.timer.set(base_timestamp + pto_period);
    }

    /// Cancels the PTO timer
    pub fn cancel(&mut self) {
        self.timer.cancel();
    }
}

impl timer::Provider for Pto {
    #[inline]
    fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
        self.timer.timers(query)?;
        Ok(())
    }
}

impl transmission::interest::Provider for Pto {
    #[inline]
    fn transmission_interest<Q: transmission::interest::Query>(
        &self,
        query: &mut Q,
    ) -> transmission::interest::Result {
        if matches!(self.state, PtoState::RequiresTransmission(_)) {
            query.on_forced()?;
        }

        Ok(())
    }
}

mod persistent_congestion;
#[cfg(test)]
mod tests;