qconnection 0.5.0

Encapsulation of QUIC connections, a part of dquic
Documentation
use std::{
    io,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU16, Ordering},
    },
};

use qbase::{
    Epoch,
    error::Error,
    frame::{PathChallengeFrame, PathResponseFrame, io::ReceiveFrame},
    net::{
        route::{Link, PacketHeader, Pathway},
        tx::ArcSendWaker,
    },
    packet::PacketContent,
    param::ParameterId,
    time::{ArcDeferIdleTimer, ArcMaxIdleTimer, MaxIdleTimer},
};
use qcongestion::{Algorithm, ArcCC, Feedback, HandshakeStatus, MSS, PathStatus, Transport};
use qevent::{quic::connectivity::PathAssigned, telemetry::Instrument};
use qinterface::{
    Interface,
    bind_uri::BindUri,
    io::{IO, IoExt},
};
use tokio::time::Duration;

mod aa;
mod burst;
mod drive;
pub mod error;
pub mod paths;
pub mod util;
mod validate;
pub use aa::*;
pub use burst::PacketSpace;
pub use error::*;
pub use paths::*;
use tokio_util::task::AbortOnDropHandle;
use tracing::Instrument as _;
pub use util::*;

use crate::{ArcDcidCell, Components, path::burst::BurstError};
// pub mod burst;

pub struct Path {
    interface: Interface,
    validated: AtomicBool,
    active: AtomicBool,
    link: Link,
    pathway: Pathway,
    cc: ArcCC,
    dcid_cell: ArcDcidCell,
    anti_amplifier: AntiAmplifier,
    max_idle_timer: ArcMaxIdleTimer,
    heartbeat: ArcHeartbeat,
    challenge_sndbuf: SendBuffer<PathChallengeFrame>,
    response_sndbuf: SendBuffer<PathResponseFrame>,
    response_rcvbuf: RecvBuffer<PathResponseFrame>,
    tx_waker: ArcSendWaker,
    pmtu: Arc<AtomicU16>,
    status: PathStatus,
}

impl Components {
    pub fn get_or_try_create_path(
        &self,
        bind_uri: BindUri,
        link: Link,
        pathway: Pathway,
        is_probed: bool,
    ) -> Result<Arc<Path>, CreatePathFailure> {
        let try_create = || {
            let interface = self
                .interfaces
                .borrow(&bind_uri)
                .ok_or(CreatePathFailure::NoInterface(bind_uri))?;
            let dcid_cell = self.cid_registry.remote.apply_dcid();
            let max_ack_delay = self
                .parameters
                .lock_guard()?
                .get_local(ParameterId::MaxAckDelay)
                .expect("unreachable: default value will be got if the value unset");

            let is_initial_path = self.conn_state.try_entry_attempted(self, link)?;
            qevent::event!(PathAssigned {
                path_id: pathway.to_string(),
                path_local: link.src(),
                path_remote: link.dst(),
            });

            let path = Arc::new(Path::new(
                interface,
                link,
                pathway,
                dcid_cell,
                max_ack_delay,
                self.parameters.max_idle_timer(),
                self.defer_idle_timer.clone(),
                [
                    Arc::new(
                        self.spaces
                            .initial()
                            .tracker(self.crypto_streams[Epoch::Initial].clone()),
                    ),
                    Arc::new(
                        self.spaces
                            .handshake()
                            .tracker(self.crypto_streams[Epoch::Handshake].clone()),
                    ),
                    Arc::new(self.spaces.data().tracker(
                        self.crypto_streams[Epoch::Data].clone(),
                        self.data_streams.clone(),
                        self.reliable_frames.clone(),
                    )),
                ],
                self.quic_handshake.status(),
            ));

            let validate = {
                let path = path.clone();
                let paths = self.paths.clone();
                let tls_handshake = self.tls_handshake.clone();
                let conn_state = self.conn_state.clone();
                async move {
                    if !is_probed {
                        path.grant_anti_amplification();
                    }
                    if tls_handshake.info().await.is_err() {
                        return Ok(());
                    }

                    match paths.handshake_path() {
                        Some(handshake_path) if Arc::ptr_eq(&handshake_path, &path) => {
                            path.validated();
                            Ok(())
                        }
                        _ => {
                            if conn_state.handshaked().await.is_err() {
                                return Ok(());
                            }
                            path.validate().await
                        }
                    }
                }
            };

            let drive = {
                let path = path.clone();
                let tls_handshake = self.tls_handshake.clone();
                async move { path.drive(tls_handshake).await }
            };

            let burst = {
                let path = path.clone();
                let mut packages = self.packages();
                let burst = path.new_burst(self);
                async move {
                    let mut buffers = vec![];
                    loop {
                        match burst.burst(&mut packages, &mut buffers).await {
                            Ok(segments) => path.send_packets(&segments).await?,
                            Err(BurstError::Signals(s)) => path.tx_waker.wait_for(s).await,
                            Err(BurstError::PathDeactived) => return io::Result::Ok(()),
                        }
                    }
                }
            };

            let task = async move {
                Err(tokio::select! {
                    Ok(Err(e)) = AbortOnDropHandle::new(tokio::spawn(validate.instrument_in_current().in_current_span())) => PathDeactivated::from(e),
                    Ok(Err(e)) = AbortOnDropHandle::new(tokio::spawn(drive.instrument_in_current().in_current_span())) => e,
                    Ok(Err(e)) = AbortOnDropHandle::new(tokio::spawn(burst.instrument_in_current().in_current_span())) => PathDeactivated::from(e),
                })
            };

            let task =
                Instrument::instrument(task, qevent::span!(@current, path=pathway.to_string()))
                    .in_current_span();

            tracing::trace!(target: "quic", %pathway, %link, is_probed, is_initial_path, "add new path");

            Ok((path, task))
        };
        self.paths.get_or_try_create_with(pathway, try_create)
    }
}

impl Path {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        interface: Interface,
        link: Link,
        pathway: Pathway,
        dcid_cell: ArcDcidCell,
        max_ack_delay: Duration,
        max_idle_timer: MaxIdleTimer,
        defer_idle_timer: ArcDeferIdleTimer,
        feedbacks: [Arc<dyn Feedback>; 3],
        handshake_status: Arc<HandshakeStatus>,
    ) -> Self {
        let pmtu = Arc::new(AtomicU16::new(MSS as u16));
        let path_status = PathStatus::new(handshake_status, pmtu.clone());
        let tx_waker = ArcSendWaker::new();

        let cc = ArcCC::new(
            Algorithm::NewReno,
            max_ack_delay,
            feedbacks,
            path_status.clone(),
            tx_waker.clone(),
        );
        Self {
            interface,
            link,
            pathway,
            cc,
            dcid_cell,
            validated: AtomicBool::new(false),
            active: AtomicBool::new(true),
            anti_amplifier: AntiAmplifier::new(tx_waker.clone()),
            max_idle_timer: ArcMaxIdleTimer::from(max_idle_timer),
            heartbeat: ArcHeartbeat::new(defer_idle_timer, Duration::from_secs(1)),
            challenge_sndbuf: SendBuffer::new(tx_waker.clone()),
            response_sndbuf: SendBuffer::new(tx_waker.clone()),
            response_rcvbuf: Default::default(),
            tx_waker,
            pmtu,
            status: path_status,
        }
    }

    pub fn cc(&self) -> &ArcCC {
        &self.cc
    }

    pub fn on_packet_rcvd(
        &self,
        epoch: Epoch,
        pn: u64,
        size: usize,
        packet_content: PacketContent,
    ) {
        self.anti_amplifier.on_rcvd(size);
        if size > 0 {
            self.status.release_anti_amplification_limit();
        }
        if packet_content.is_ack_eliciting() {
            self.heartbeat.renew_on_effective_communicated();
        }
        if epoch == Epoch::Data {
            self.max_idle_timer.renew_on_received_1rtt();
        }
        self.cc()
            .on_pkt_rcvd(epoch, pn, packet_content.is_ack_eliciting());
    }

    pub fn grant_anti_amplification(&self) {
        self.anti_amplifier.grant();
        self.cc().grant_anti_amplification();
    }

    pub fn mtu(&self) -> u16 {
        self.pmtu.load(Ordering::Acquire)
    }

    pub async fn send_packets(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<()> {
        self.anti_amplifier
            .on_sent(bufs.iter().map(|s| s.len()).sum());
        if self.anti_amplifier.balance().is_err() {
            self.status.enter_anti_amplification_limit();
        }
        let hdr = PacketHeader::new(self.pathway, self.link, 64, None, self.mtu() as _);
        self.interface.sendmmsg(bufs, hdr).await
    }

    pub fn deactivate(&self) {
        self.active.store(false, Ordering::Release);
    }

    pub fn active(&self) {
        self.active.store(true, Ordering::Release);
    }

    pub fn link(&self) -> &Link {
        &self.link
    }

    pub fn pathway(&self) -> &Pathway {
        &self.pathway
    }

    pub fn bind_uri(&self) -> BindUri {
        self.interface.bind_uri()
    }
}

impl Drop for Path {
    fn drop(&mut self) {
        self.response_rcvbuf.dismiss();
    }
}

impl ReceiveFrame<PathChallengeFrame> for Path {
    type Output = ();

    fn recv_frame(&self, frame: &PathChallengeFrame) -> Result<Self::Output, Error> {
        self.response_sndbuf.write((*frame).into());
        Ok(())
    }
}

impl ReceiveFrame<PathResponseFrame> for Path {
    type Output = ();

    fn recv_frame(&self, frame: &PathResponseFrame) -> Result<Self::Output, Error> {
        self.response_rcvbuf.write(*frame);
        Ok(())
    }
}