asupersync 0.3.1

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
//! QUIC connection type.
//!
//! Provides cancel-correct connection handling with stream management.

use super::error::QuicError;
use super::stream::{RecvStream, SendStream, StreamTracker};
use crate::cx::Cx;
use std::net::SocketAddr;
use std::sync::Arc;

/// A QUIC connection with cancel-correct stream management.
///
/// The connection tracks all open streams and ensures proper cleanup
/// on cancellation or connection close.
#[derive(Debug)]
pub struct QuicConnection {
    inner: quinn::Connection,
    tracker: Arc<StreamTracker>,
}

impl QuicConnection {
    /// Create a new connection wrapper.
    pub(crate) fn new(inner: quinn::Connection) -> Self {
        Self {
            inner,
            tracker: StreamTracker::new(),
        }
    }

    /// Get the remote address of the peer.
    #[must_use]
    pub fn remote_address(&self) -> SocketAddr {
        self.inner.remote_address()
    }

    /// Get the stable connection ID.
    #[must_use]
    pub fn stable_id(&self) -> usize {
        self.inner.stable_id()
    }

    /// Get the negotiated ALPN protocol, if any.
    #[must_use]
    pub fn alpn_protocol(&self) -> Option<Vec<u8>> {
        self.inner.handshake_data().and_then(|data| {
            data.downcast::<quinn::crypto::rustls::HandshakeData>()
                .ok()
                .and_then(|hs| hs.protocol.clone())
        })
    }

    /// Open a bidirectional stream.
    ///
    /// Returns both send and receive halves of the stream.
    pub async fn open_bi(&self, cx: &Cx) -> Result<(SendStream, RecvStream), QuicError> {
        cx.checkpoint()?;

        let (send, recv) = self.inner.open_bi().await?;

        Ok((
            SendStream::new(send, &self.tracker),
            RecvStream::new(recv, &self.tracker),
        ))
    }

    /// Open a unidirectional stream for sending.
    pub async fn open_uni(&self, cx: &Cx) -> Result<SendStream, QuicError> {
        cx.checkpoint()?;

        let send = self.inner.open_uni().await?;
        Ok(SendStream::new(send, &self.tracker))
    }

    /// Accept an incoming bidirectional stream from the peer.
    pub async fn accept_bi(&self, cx: &Cx) -> Result<(SendStream, RecvStream), QuicError> {
        cx.checkpoint()?;

        let (send, recv) = self.inner.accept_bi().await?;

        Ok((
            SendStream::new(send, &self.tracker),
            RecvStream::new(recv, &self.tracker),
        ))
    }

    /// Accept an incoming unidirectional stream from the peer.
    pub async fn accept_uni(&self, cx: &Cx) -> Result<RecvStream, QuicError> {
        cx.checkpoint()?;

        let recv = self.inner.accept_uni().await?;
        Ok(RecvStream::new(recv, &self.tracker))
    }

    /// Close the connection gracefully.
    ///
    /// Sends a close frame to the peer and waits for acknowledgement.
    pub async fn close(&self, cx: &Cx, code: u32, reason: &[u8]) -> Result<(), QuicError> {
        cx.checkpoint()?;

        // Mark all streams for cleanup
        self.tracker.mark_closing();

        // Close the connection
        self.inner.close(code.into(), reason);

        // Wait for the connection to fully close
        self.inner.closed().await;

        Ok(())
    }

    /// Close the connection immediately without waiting.
    pub fn close_immediately(&self, code: u32, reason: &[u8]) {
        self.tracker.mark_closing();
        self.inner.close(code.into(), reason);
    }

    /// Check if the connection is still open.
    #[must_use]
    pub fn is_open(&self) -> bool {
        !self.tracker.is_closing() && self.inner.close_reason().is_none()
    }

    /// Wait for the connection to close (for any reason).
    pub async fn closed(&self) {
        self.inner.closed().await;
    }

    /// Get the maximum datagram size that can be sent.
    #[must_use]
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.inner.max_datagram_size()
    }

    /// Send an unreliable datagram.
    ///
    /// Datagrams are not guaranteed to be delivered or arrive in order.
    pub fn send_datagram(&self, data: &[u8]) -> Result<(), QuicError> {
        self.inner.send_datagram(data.to_vec().into())?;
        Ok(())
    }

    /// Receive an unreliable datagram.
    ///
    /// Returns the datagram payload as a byte vector.
    pub async fn read_datagram(&self, cx: &Cx) -> Result<Vec<u8>, QuicError> {
        cx.checkpoint()?;

        let data = self.inner.read_datagram().await?;
        Ok(data.to_vec())
    }

    /// Get RTT (round-trip time) estimate.
    #[must_use]
    pub fn rtt(&self) -> std::time::Duration {
        self.inner.rtt()
    }

    /// Get a reference to the inner quinn connection.
    #[must_use]
    pub fn inner(&self) -> &quinn::Connection {
        &self.inner
    }
}

impl Drop for QuicConnection {
    fn drop(&mut self) {
        // Ensure streams are marked for cleanup
        self.tracker.mark_closing();
    }
}