s2n-quic-transport 0.16.0

Internal crate used by s2n-quic
Documentation
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
    connection, endpoint, path, path::Path, space::PacketSpaceManager, transmission,
    transmission::interest::Provider,
};
use core::time::Duration;
use s2n_codec::{Encoder, EncoderBuffer};
use s2n_quic_core::{
    event::{self, ConnectionPublisher as _},
    frame::ack_elicitation::AckElicitable,
    inet::ExplicitCongestionNotification,
    io::tx,
    packet::{encoding::PacketEncodingError, number::PacketNumberSpace},
    recovery::{CongestionController, MAX_BURST_PACKETS},
    time::Timestamp,
};

#[derive(Debug)]
pub struct ConnectionTransmissionContext<'a, 'sub, Config: endpoint::Config> {
    pub quic_version: u32,
    pub timestamp: Timestamp,
    pub path_id: path::Id,
    pub path_manager: &'a mut path::Manager<Config>,
    pub local_id_registry: &'a mut connection::LocalIdRegistry,
    pub outcome: &'a mut transmission::Outcome,
    pub ecn: ExplicitCongestionNotification,
    pub min_packet_len: Option<usize>,
    pub transmission_mode: transmission::Mode,
    pub publisher: &'a mut event::ConnectionPublisherSubscriber<'sub, Config::EventSubscriber>,
    pub packet_interceptor: &'a mut Config::PacketInterceptor,
}

impl<'a, 'sub, Config: endpoint::Config> ConnectionTransmissionContext<'a, 'sub, Config> {
    pub fn path(&self) -> &Path<Config> {
        &self.path_manager[self.path_id]
    }

    pub fn path_mut(&mut self) -> &mut Path<Config> {
        &mut self.path_manager[self.path_id]
    }
}

pub struct ConnectionTransmission<'a, 'sub, Config: endpoint::Config> {
    pub context: ConnectionTransmissionContext<'a, 'sub, Config>,
    pub space_manager: &'a mut PacketSpaceManager<Config>,
}

impl<'a, 'sub, Config: endpoint::Config> tx::Message for ConnectionTransmission<'a, 'sub, Config> {
    type Handle = Config::PathHandle;

    #[inline]
    fn path_handle(&self) -> &Self::Handle {
        &self.context.path().handle
    }

    #[inline]
    fn ecn(&mut self) -> ExplicitCongestionNotification {
        self.context.ecn
    }

    #[inline]
    fn delay(&mut self) -> Duration {
        // TODO return delay from pacer
        Default::default()
    }

    #[inline]
    fn ipv6_flow_label(&mut self) -> u32 {
        // TODO compute flow label from connection id
        0
    }

    #[inline]
    fn can_gso(&self, segment_len: usize, segment_count: usize) -> bool {
        if let Some(min_packet_len) = self.context.min_packet_len {
            if segment_len < min_packet_len {
                return false;
            }
        }

        if let Some(send_quantum) = self.context.path().congestion_controller.send_quantum() {
            if segment_len * segment_count >= send_quantum {
                return false;
            }
        }

        // If a packet can be GSO'd it means it's limited to the previously written packet
        // size. This becomes a problem for MTU probes where they will likely exceed that amount.
        // As such, if we're probing we want to let the IO layer know to not GSO the current
        // packet.
        !self.context.transmission_mode.is_mtu_probing()
    }

    fn write_payload(
        &mut self,
        buffer: tx::PayloadBuffer,
        gso_offset: usize,
    ) -> Result<usize, tx::Error> {
        let space_manager = &mut self.space_manager;
        let buffer = unsafe {
            // the WriterContext has its own checks for buffer capacity so convert `buffer` into a
            // `&mut [u8]`
            buffer.into_mut_slice()
        };

        let mtu = self
            .context
            .path()
            .clamp_mtu(buffer.len(), self.context.transmission_mode);
        debug_assert_ne!(
            mtu, 0,
            "the amplification limit should be checked before trying to transmit"
        );

        // limit the number of retries to the MAX_BURST_PACKETS
        for _ in 0..MAX_BURST_PACKETS {
            let encoder = EncoderBuffer::new(&mut buffer[..mtu]);
            let initial_capacity = encoder.capacity();

            //= https://www.rfc-editor.org/rfc/rfc9002#section-7
            //# An endpoint MUST NOT send a packet if it would cause bytes_in_flight
            //# (see Appendix B.2) to be larger than the congestion window, unless
            //# the packet is sent on a PTO timer expiration (see Section 6.2) or
            //# when entering recovery (see Section 7.3.2).

            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
            //# In addition to sending data in the packet number space for which the
            //# timer expired, the sender SHOULD send ack-eliciting packets from
            //# other packet number spaces with in-flight data, coalescing packets if
            //# possible.
            let transmission_constraint =
                if space_manager.requires_probe() && self.context.transmission_mode.is_normal() {
                    //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
                    //# When a PTO timer expires, a sender MUST send at least one ack-
                    //# eliciting packet in the packet number space as a probe.

                    //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4
                    //# An endpoint SHOULD include new data in packets that are sent on PTO
                    //# expiration.  Previously sent data MAY be sent if no new data can be
                    //# sent.

                    //= https://www.rfc-editor.org/rfc/rfc9002#section-7.5
                    //# Probe packets MUST NOT be blocked by the congestion controller.
                    self.context.transmission_mode = transmission::Mode::LossRecoveryProbing;
                    transmission::Constraint::None
                } else {
                    self.context.path().transmission_constraint()
                };

            //= https://www.rfc-editor.org/rfc/rfc9000#section-14.1
            //# A client MUST expand the payload of all UDP datagrams carrying
            //# Initial packets to at least the smallest allowed maximum datagram
            //# size of 1200 bytes by adding PADDING frames to the Initial packet or
            //# by coalescing the Initial packet; see Section 12.2.

            //= https://www.rfc-editor.org/rfc/rfc9000#section-14.1
            //# Similarly, a
            //# server MUST expand the payload of all UDP datagrams carrying ack-
            //# eliciting Initial packets to at least the smallest allowed maximum
            //# datagram size of 1200 bytes.

            // If the transmission contains an Initial packet, it must be padded. However, we should
            // add padding only if necessary, after packets from all packet number spaces have been
            // coalesced. Therefore, after confirming there will be an Initial packet, we first check
            // if there will be an ApplicationData packet, since those packets come at the end of the
            // datagram. If there is no ApplicationData packet, the Handshake packet will come at the
            // end, so we check that next. Finally, if there is no ApplicationData or Handshake packet
            // to transmit, the Initial packet itself will be padded.
            let mut pn_space_to_pad = {
                if !has_transmission(space_manager.initial(), transmission_constraint) {
                    // There is no Initial packet, so no padding is needed
                    None
                } else if has_transmission(space_manager.application(), transmission_constraint) {
                    Some(PacketNumberSpace::ApplicationData)
                } else if has_transmission(space_manager.handshake(), transmission_constraint) {
                    Some(PacketNumberSpace::Handshake)
                } else {
                    //= https://www.rfc-editor.org/rfc/rfc9001#section-4.9
                    //# These packets MAY also include PADDING frames.
                    Some(PacketNumberSpace::Initial)
                }
            };

            //= https://www.rfc-editor.org/rfc/rfc9001#section-4
            //# When packets of different types need to be sent,
            //# endpoints SHOULD use coalesced packets to send them in the same UDP
            //# datagram.
            // here we query all of the spaces to try and fill the current datagram

            let is_mtu_probing = self.context.transmission_mode.is_mtu_probing();

            let encoder = if let Some((space, handshake_status)) = space_manager
                .initial_mut()
                // MTU probes are only sent in the Application Space
                .filter(|_| !is_mtu_probing)
            {
                self.context.min_packet_len = pn_space_to_pad
                    .filter(|pn_space| pn_space.is_initial())
                    .map(|_| encoder.capacity());

                match space.on_transmit(
                    &mut self.context,
                    transmission_constraint,
                    handshake_status,
                    encoder,
                ) {
                    Ok((outcome, encoder)) => {
                        if Config::ENDPOINT_TYPE.is_server()
                            && !outcome.ack_elicitation().is_ack_eliciting()
                        {
                            //= https://www.rfc-editor.org/rfc/rfc9000#section-14.1
                            //# Similarly, a
                            //# server MUST expand the payload of all UDP datagrams carrying ack-
                            //# eliciting Initial packets to at least the smallest allowed maximum
                            //# datagram size of 1200 bytes.

                            // The Initial packet was not ack eliciting so there is no need to pad
                            pn_space_to_pad = None;
                        }
                        *self.context.outcome += outcome;
                        encoder
                    }
                    Err(PacketEncodingError::PacketNumberTruncationError(encoder)) => {
                        // TODO handle this
                        encoder
                    }
                    Err(PacketEncodingError::InsufficientSpace(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                    Err(PacketEncodingError::EmptyPayload(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                    Err(PacketEncodingError::AeadLimitReached(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                }
            } else {
                encoder
            };

            let encoder = if let Some((space, handshake_status)) = space_manager
                .handshake_mut()
                // MTU probes are only sent in the Application Space
                .filter(|_| !is_mtu_probing)
            {
                self.context.min_packet_len = pn_space_to_pad
                    .filter(|pn_space| pn_space.is_handshake())
                    .map(|_| encoder.capacity());

                let encoder = match space.on_transmit(
                    &mut self.context,
                    transmission_constraint,
                    handshake_status,
                    encoder,
                ) {
                    Ok((outcome, encoder)) => {
                        //= https://www.rfc-editor.org/rfc/rfc9001#section-4.9.1
                        //# a client MUST discard Initial keys when it first sends a
                        //# Handshake packet

                        if Config::ENDPOINT_TYPE.is_client() {
                            let path = &mut self.context.path_manager[self.context.path_id];
                            space_manager.discard_initial(
                                path,
                                self.context.path_id,
                                self.context.publisher,
                            );
                        }

                        *self.context.outcome += outcome;
                        encoder
                    }
                    Err(PacketEncodingError::PacketNumberTruncationError(encoder)) => {
                        // TODO handle this
                        encoder
                    }
                    Err(PacketEncodingError::InsufficientSpace(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                    Err(PacketEncodingError::EmptyPayload(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                    Err(PacketEncodingError::AeadLimitReached(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                };

                //= https://www.rfc-editor.org/rfc/rfc9001#section-4.9.2
                //# An endpoint MUST discard its handshake keys when the TLS handshake is
                //# confirmed (Section 4.1.2).
                if space_manager.is_handshake_confirmed() {
                    let path = &mut self.context.path_manager[self.context.path_id];
                    space_manager.discard_handshake(
                        path,
                        self.context.path_id,
                        self.context.publisher,
                    );
                }

                encoder
            } else {
                encoder
            };

            //= https://www.rfc-editor.org/rfc/rfc9001#section-4.9
            //# Though an endpoint might retain older keys, new data MUST be sent at
            //# the highest currently-available encryption level.

            // This requirement is automatically support with s2n-quic's implementation. Each space
            // acts mostly independent from another and will buffer its own CRYPTO and ACK state. Other
            // frames are only allowed in the ApplicationData space, which will always be the highest
            // current-available encryption level.

            let encoder = if let Some((space, handshake_status)) = space_manager.application_mut() {
                self.context.min_packet_len = pn_space_to_pad
                    .filter(|pn_space| pn_space.is_application_data())
                    .map(|_| encoder.capacity());

                // Pad the packet when sending path validation frames so that MTU is also validated.
                let path = &self.context.path_manager[self.context.path_id];

                //= https://www.rfc-editor.org/rfc/rfc9000#section-8.2.1
                //# An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
                //# to at least the smallest allowed maximum datagram size of 1200 bytes.
                //
                //= https://www.rfc-editor.org/rfc/rfc9000#section-8.2.2
                //# An endpoint MUST expand datagrams that contain a PATH_RESPONSE frame
                //# to at least the smallest allowed maximum datagram size of 1200 bytes.
                // Pad the packet when sending path validation frames so that MTU is also validated.
                //
                // The path's transmission_interest indicates if a PATH_CHALLENGE or PATH_RESPONSE
                // frame is to be written.
                //
                // We need to check is_validated because it is possible to receive a PATH_CHALLENGE on
                // an active path for Off-Path Packet Forwarding prevention. However, we would only
                // like to pad when validating the MTU.
                if !path.is_validated() && path.has_transmission_interest() {
                    self.context.min_packet_len = Some(encoder.capacity());
                }

                match space.on_transmit(
                    &mut self.context,
                    transmission_constraint,
                    handshake_status,
                    encoder,
                ) {
                    Ok((outcome, encoder)) => {
                        *self.context.outcome += outcome;
                        encoder
                    }
                    Err(PacketEncodingError::PacketNumberTruncationError(encoder)) => {
                        // TODO handle this
                        encoder
                    }
                    Err(PacketEncodingError::InsufficientSpace(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                    Err(PacketEncodingError::EmptyPayload(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                    Err(PacketEncodingError::AeadLimitReached(encoder)) => {
                        // move to the next packet space
                        encoder
                    }
                }
            } else {
                encoder
            };

            let datagram_len = initial_capacity - encoder.capacity();

            // the spaces didn't write anything so we're done
            if datagram_len == 0 {
                return Err(tx::Error::EmptyPayload);
            }

            // Emit the transmission event
            //
            // Even though the interceptor could alter the outgoing bytes, we're going to pretend
            // that it doesn't so it's closer to on-path datagram corruption.
            self.context.path_mut().on_bytes_transmitted(datagram_len);
            self.context
                .publisher
                .on_datagram_sent(event::builder::DatagramSent {
                    len: datagram_len as u16,
                    gso_offset,
                });

            let datagram_len = {
                use s2n_quic_core::{
                    event::IntoEvent,
                    packet::interceptor::{Datagram, Interceptor},
                };

                let mut encoder = EncoderBuffer::new(buffer);
                encoder.set_position(datagram_len);

                let subject = self.context.publisher.subject();

                let remote_address = self.context.path().remote_address();
                let local_address = self.context.path().local_address();

                let datagram = Datagram {
                    remote_address: remote_address.into_event(),
                    local_address: local_address.into_event(),
                    timestamp: self.context.timestamp,
                };

                self.context.packet_interceptor.intercept_tx_datagram(
                    &subject,
                    &datagram,
                    &mut encoder,
                );

                // if the packet interceptor cleared the encoder buffer, then try again
                if encoder.is_empty() {
                    continue;
                }

                encoder.len()
            };

            return Ok(datagram_len);
        }

        Err(tx::Error::EmptyPayload)
    }
}

fn has_transmission<P: transmission::interest::Provider>(
    transmission_interest_provider: Option<&P>,
    transmission_constraint: transmission::Constraint,
) -> bool {
    transmission_interest_provider.map_or(false, |provider| {
        provider.can_transmit(transmission_constraint)
    })
}