Skip to main content

ant_quic/high_level/
connection.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    any::Any,
10    collections::VecDeque,
11    fmt,
12    future::Future,
13    io,
14    net::{IpAddr, SocketAddr},
15    pin::Pin,
16    sync::{Arc, Weak},
17    task::{Context, Poll, Waker, ready},
18};
19
20use bytes::Bytes;
21use pin_project_lite::pin_project;
22use rustc_hash::FxHashMap;
23use thiserror::Error;
24use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
25use tracing::{Instrument, Span, debug, debug_span, error};
26
27use super::{
28    ConnectionEvent,
29    mutex::Mutex,
30    recv_stream::RecvStream,
31    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSender},
32    send_stream::SendStream,
33    udp_transmit,
34};
35use crate::{
36    ConnectionError, ConnectionHandle, ConnectionStats, DatagramDropStats, Dir, Duration,
37    EndpointEvent, Instant, Side, StreamEvent, StreamId, VarInt,
38    congestion::Controller,
39    path::{Path, PathId, PathSnapshot},
40};
41
/// In-progress connection attempt future
///
/// Resolves to a [`Connection`] when the handshake completes, or to the
/// [`ConnectionError`] recorded for the failed attempt.
#[derive(Debug)]
pub struct Connecting {
    // Shared connection state. Becomes `None` once it has been handed out, either by
    // `into_0rtt` or by the future resolving.
    conn: Option<ConnectionRef>,
    // Completion signal for the handshake. `ZeroRttAccepted` takes over this receiver and
    // interprets the `bool` payload as "0-RTT was accepted".
    connected: oneshot::Receiver<bool>,
    // One-shot signal awaited by `handshake_data`; taken on first use.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
49
/// Decide whether an I/O error from the background driver should be treated as routine.
///
/// An error counts as routine when its [`io::ErrorKind`] is one of the
/// address/reachability/connection kinds listed below, or when its raw OS error code is one
/// of 49, 51, 65, 101 or 113.
///
/// NOTE(review): the raw codes look like the macOS/BSD values of `EADDRNOTAVAIL` (49),
/// `ENETUNREACH` (51), `EHOSTUNREACH` (65) and the Linux values of `ENETUNREACH` (101),
/// `EHOSTUNREACH` (113); they are matched on every platform -- confirm against `errno.h`.
fn is_expected_background_io_error(error: &io::Error) -> bool {
    let benign_kind = matches!(
        error.kind(),
        io::ErrorKind::AddrNotAvailable
            | io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionReset
            | io::ErrorKind::HostUnreachable
            | io::ErrorKind::NetworkUnreachable
            | io::ErrorKind::NotConnected
            | io::ErrorKind::TimedOut
    );
    if benign_kind {
        return true;
    }

    // Fall back to the raw errno for platforms/versions where the kind is not categorised.
    error
        .raw_os_error()
        .is_some_and(|code| matches!(code, 49 | 51 | 65 | 101 | 113))
}
62
63impl Connecting {
64    pub(crate) fn new(
65        handle: ConnectionHandle,
66        conn: crate::Connection,
67        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
68        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
69        socket: Arc<dyn AsyncUdpSocket>,
70        runtime: Arc<dyn Runtime>,
71    ) -> Self {
72        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
73        let (on_connected_send, on_connected_recv) = oneshot::channel();
74        let conn = ConnectionRef::new(
75            handle,
76            conn,
77            endpoint_events,
78            conn_events,
79            on_handshake_data_send,
80            on_connected_send,
81            socket,
82            runtime.clone(),
83        );
84
85        let driver = ConnectionDriver(conn.clone());
86        runtime.spawn(Box::pin(
87            async {
88                if let Err(e) = driver.await {
89                    if is_expected_background_io_error(&e) {
90                        debug!("background I/O path ended: {e}");
91                    } else {
92                        tracing::error!("I/O error: {e}");
93                    }
94                }
95            }
96            .instrument(Span::current()),
97        ));
98
99        Self {
100            conn: Some(conn),
101            connected: on_connected_recv,
102            handshake_data_ready: Some(on_handshake_data_recv),
103        }
104    }
105
106    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
107    ///
108    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
109    /// If so, the returned [`Connection`] can be used to send application data without waiting for
110    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
111    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
112    /// complete, at which point subsequently opened streams and written data will have full
113    /// cryptographic protection.
114    ///
115    /// ## Outgoing
116    ///
117    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
118    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
119    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
120    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
121    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
122    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
123    /// If it was rejected, the existence of streams opened and other application data sent prior
124    /// to the handshake completing will not be conveyed to the remote application, and local
125    /// operations on them will return `ZeroRttRejected` errors.
126    ///
127    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
128    /// relevant resumption state to be stored in the server, which servers may limit or lose for
129    /// various reasons including not persisting resumption state across server restarts.
130    ///
131    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
132    /// implementation's docs for 0-RTT pitfalls.
133    ///
134    /// ## Incoming
135    ///
136    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
137    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
138    ///
139    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
140    /// implementation's docs for 0-RTT pitfalls.
141    ///
142    /// ## Security
143    ///
144    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
145    /// replay attacks, and should therefore never invoke non-idempotent operations.
146    ///
147    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
148    /// before TLS client authentication has occurred, and should therefore not be used to send
149    /// data for which client authentication is being used.
150    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
151        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
152        // have to release it explicitly before returning `self` by value.
153        let conn = match self.conn.as_mut() {
154            Some(conn) => conn.state.lock("into_0rtt"),
155            None => {
156                return Err(self);
157            }
158        };
159
160        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
161        drop(conn);
162
163        if is_ok {
164            match self.conn.take() {
165                Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
166                None => {
167                    tracing::error!("Connection state missing during 0-RTT acceptance");
168                    Err(self)
169                }
170            }
171        } else {
172            Err(self)
173        }
174    }
175
176    /// Parameters negotiated during the handshake
177    ///
178    /// The dynamic type returned is determined by the configured
179    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
180    /// be [`downcast`](Box::downcast) to a
181    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
182    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
183        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
184        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
185        // simple.
186        if let Some(x) = self.handshake_data_ready.take() {
187            let _ = x.await;
188        }
189        let conn = self.conn.as_ref().ok_or_else(|| {
190            tracing::error!("Connection state missing while retrieving handshake data");
191            ConnectionError::LocallyClosed
192        })?;
193        let inner = conn.state.lock("handshake");
194        inner
195            .inner
196            .crypto_session()
197            .handshake_data()
198            .ok_or_else(|| {
199                inner.error.clone().unwrap_or_else(|| {
200                    error!("Spurious handshake data ready notification with no error");
201                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
202                        "Spurious handshake notification".to_string(),
203                    ))
204                })
205            })
206    }
207
208    /// The local IP address which was used when the peer established
209    /// the connection
210    ///
211    /// This can be different from the address the endpoint is bound to, in case
212    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
213    ///
214    /// This will return `None` for clients, or when the platform does not expose this
215    /// information. See quinn_udp's RecvMeta::dst_ip for a list of
216    /// supported platforms when using quinn_udp for I/O, which is the default.
217    ///
218    /// Will panic if called after `poll` has returned `Ready`.
219    pub fn local_ip(&self) -> Option<IpAddr> {
220        let conn = self.conn.as_ref()?;
221        let inner = conn.state.lock("local_ip");
222
223        inner.inner.local_ip()
224    }
225
226    /// The peer's UDP address
227    ///
228    /// Returns an error if called after `poll` has returned `Ready`.
229    pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
230        let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
231            error!("Connection used after yielding Ready");
232            ConnectionError::LocallyClosed
233        })?;
234        Ok(conn_ref.state.lock("remote_address").inner.remote_address())
235    }
236}
237
238impl Future for Connecting {
239    type Output = Result<Connection, ConnectionError>;
240    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
241        Pin::new(&mut self.connected).poll(cx).map(|_| {
242            let conn = self.conn.take().ok_or_else(|| {
243                error!("Connection not available when connecting future resolves");
244                ConnectionError::LocallyClosed
245            })?;
246            let inner = conn.state.lock("connecting");
247            if inner.connected {
248                drop(inner);
249                Ok(Connection(conn))
250            } else {
251                Err(inner.error.clone().unwrap_or_else(|| {
252                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
253                        "connection failed without error".to_string(),
254                    ))
255                }))
256            }
257        })
258    }
259}
260
/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
///
/// Resolves to `false` if the sending half of the underlying channel is dropped without
/// sending a value.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
266
267impl Future for ZeroRttAccepted {
268    type Output = bool;
269    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
270        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
271    }
272}
273
/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// The future yields `Err` only when transmitting fails with an I/O error. A failure while
/// processing connection events terminates the connection and yields `Ok(())`, as does a
/// connection that has fully drained. Unlike other connection-related futures, this waits for
/// the draining period to complete to ensure that packets still in flight from the peer are
/// handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);
287
impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    /// Run one driving pass over the connection.
    ///
    /// In order: process queued connection events, transmit, service the timer, forward
    /// endpoint and application events, start channel binding once (if a global trust runtime
    /// is configured), and finally decide whether to re-schedule, park, or finish.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The state lock is held for the remainder of this function.
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        // A failure here terminates the connection but is not surfaced as an I/O error.
        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        // An I/O error from transmission is the only path that resolves this future to `Err`.
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. This pass does not loop itself; instead `keep_going`
        // causes a self-wake below so the driver is polled again until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        // Kick off automatic channel binding once connected, if configured
        // `binding_started` is only set when a global runtime exists, so without one this
        // check is repeated on every poll.
        if conn.connected && !conn.binding_started {
            if let Some(rt) = crate::trust::global_runtime() {
                // Delay NEW_TOKEN until binding completes
                conn.inner.set_delay_new_token_until_binding(true);

                // NOTE(review): `conn` above still holds the state lock.
                // `WeakConnectionHandle::upgrade` bumps `ref_count` under that same lock; if
                // `ConnectionRef::clone`/`Drop` (not visible here) do likewise, these clones
                // (and the drop of whichever handle is unused) would re-lock while the lock is
                // held -- confirm the mutex tolerates that.
                let hl_conn_server = Connection(self.0.clone());
                let hl_conn_client = hl_conn_server.clone();
                let store = rt.store.clone();
                let policy = rt.policy.clone();
                let signer = rt.local_secret_key.clone();
                let spki = rt.local_spki.clone();
                let runtime = conn.runtime.clone();

                if conn.inner.side().is_server() {
                    // Server side: wait for and verify the peer's binding in a detached task.
                    runtime.spawn(Box::pin(async move {
                        match crate::trust::recv_verify_binding(&hl_conn_server, &*store, &policy)
                            .await
                        {
                            Ok(peer) => {
                                // Record the verified peer, then lift the NEW_TOKEN delay.
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("set peer")
                                    .inner
                                    .set_token_binding_peer_id(peer);
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("allow tokens")
                                    .inner
                                    .set_delay_new_token_until_binding(false);
                            }
                            // The verification error itself is discarded; the connection is
                            // closed with code 0 and a fixed reason.
                            Err(_e) => {
                                hl_conn_server.close(0u32.into(), b"channel binding failed");
                            }
                        }
                    }));
                }

                if conn.inner.side().is_client() {
                    // Client side: derive the exporter and send the binding, best-effort.
                    // Failures of either step are silently ignored.
                    runtime.spawn(Box::pin(async move {
                        if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
                            let _ =
                                crate::trust::send_binding(&hl_conn_client, &exp, &signer, &spki)
                                    .await;
                        }
                    }));
                }

                conn.binding_started = true;
            }
        }

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                // Park: store the waker in `conn.driver` so the driver can be woken later.
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        // Invariant check: reaching the drained state without a recorded error is a bug.
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}
377
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection. To refer to a connection
/// without keeping it alive, use [`Connection::weak_handle()`].
///
/// [`Connection::close()`]: Connection::close
/// [`Connection::weak_handle()`]: Connection::weak_handle
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
395
/// Weak handle to a QUIC connection.
///
/// This lets observers retain a reference to connection identity without
/// keeping the connection alive. Obtain one with [`Connection::weak_handle`] and turn it back
/// into a [`Connection`] with [`WeakConnectionHandle::upgrade`].
#[derive(Debug, Clone)]
pub struct WeakConnectionHandle(Weak<ConnectionInner>);
402
403impl WeakConnectionHandle {
404    /// Returns true while the connection still exists and has not closed.
405    pub fn is_alive(&self) -> bool {
406        self.0
407            .upgrade()
408            .is_some_and(|inner| inner.state.lock("weak_is_alive").error.is_none())
409    }
410
411    /// Upgrade to a strong connection handle if the connection state still exists.
412    pub fn upgrade(&self) -> Option<Connection> {
413        let conn = ConnectionRef(self.0.upgrade()?);
414        conn.state.lock("weak_upgrade").ref_count += 1;
415        Some(Connection(conn))
416    }
417
418    /// Returns true if both weak handles refer to the same connection allocation.
419    pub fn is_same_connection(&self, other: &Self) -> bool {
420        self.0.ptr_eq(&other.0)
421    }
422}
423
424impl Connection {
425    /// Initiate a new outgoing unidirectional stream.
426    ///
427    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
428    /// consequence, the peer won't be notified that a stream has been opened until the stream is
429    /// actually used.
430    pub fn open_uni(&self) -> OpenUni<'_> {
431        OpenUni {
432            conn: &self.0,
433            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
434        }
435    }
436
437    /// Initiate a new outgoing bidirectional stream.
438    ///
439    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
440    /// consequence, the peer won't be notified that a stream has been opened until the stream is
441    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
442    /// anything to [`SendStream`] will never succeed.
443    ///
444    /// [`open_bi()`]: Self::open_bi
445    /// [`SendStream`]: crate::SendStream
446    /// [`RecvStream`]: crate::RecvStream
447    pub fn open_bi(&self) -> OpenBi<'_> {
448        OpenBi {
449            conn: &self.0,
450            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
451        }
452    }
453
454    /// Accept the next incoming uni-directional stream
455    pub fn accept_uni(&self) -> AcceptUni<'_> {
456        AcceptUni {
457            conn: &self.0,
458            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
459        }
460    }
461
462    /// Accept the next incoming bidirectional stream
463    ///
464    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
465    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
466    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
467    ///
468    /// [`accept_bi()`]: Self::accept_bi
469    /// [`open_bi()`]: Self::open_bi
470    /// [`SendStream`]: crate::SendStream
471    /// [`RecvStream`]: crate::RecvStream
472    pub fn accept_bi(&self) -> AcceptBi<'_> {
473        AcceptBi {
474            conn: &self.0,
475            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
476        }
477    }
478
479    /// Receive an application datagram
480    pub fn read_datagram(&self) -> ReadDatagram<'_> {
481        ReadDatagram {
482            conn: &self.0,
483            notify: self.0.shared.datagram_received.notified(),
484        }
485    }
486
487    /// Wait for the connection to be closed for any reason
488    ///
489    /// Despite the return type's name, closed connections are often not an error condition at the
490    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
491    /// and [`ConnectionError::ApplicationClosed`].
492    pub async fn closed(&self) -> ConnectionError {
493        {
494            let conn = self.0.state.lock("closed");
495            if let Some(error) = conn.error.as_ref() {
496                return error.clone();
497            }
498            // Construct the future while the lock is held to ensure we can't miss a wakeup if
499            // the `Notify` is signaled immediately after we release the lock. `await` it after
500            // the lock guard is out of scope.
501            self.0.shared.closed.notified()
502        }
503        .await;
504        self.0
505            .state
506            .lock("closed")
507            .error
508            .as_ref()
509            .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
510            .clone()
511    }
512
    /// Wait for the connection to be closed for any reason.
    ///
    /// Thin wrapper that returns the future produced by [`Connection::closed`].
    pub fn on_closed(&self) -> impl Future<Output = ConnectionError> + '_ {
        self.closed()
    }
517
    /// Downgrade this connection to a weak handle.
    ///
    /// The returned handle does not keep the connection alive; see
    /// [`WeakConnectionHandle::upgrade`] to obtain a strong handle again.
    pub fn weak_handle(&self) -> WeakConnectionHandle {
        WeakConnectionHandle(Arc::downgrade(&self.0.0))
    }
522
523    /// Return read-only handles for the connection's currently known paths.
524    ///
525    /// The current high-level API exposes the primary single-path route. Future
526    /// multipath support will extend this list without changing the handle
527    /// shape.
528    pub fn paths(&self) -> Vec<Path> {
529        self.path_snapshot(PathId::PRIMARY)
530            .map(|snapshot| Path::new(self.weak_handle(), PathId::PRIMARY, snapshot))
531            .into_iter()
532            .collect()
533    }
534
    /// Return statistics for a path if the path is currently known.
    ///
    /// Returns `None` whenever [`Connection::path_snapshot`] has no snapshot for `id`.
    pub fn path_stats(&self, id: PathId) -> Option<crate::connection::PathStats> {
        self.path_snapshot(id).map(|snapshot| snapshot.stats)
    }
539
540    pub(crate) fn path_snapshot(&self, id: PathId) -> Option<PathSnapshot> {
541        if id != PathId::PRIMARY {
542            return None;
543        }
544
545        let conn = self.0.state.lock("path_snapshot");
546        let stats = conn.inner.stats().path;
547        let remote_address = conn.inner.remote_address();
548        let observed_external_addr = conn.inner.observed_address();
549        Some(PathSnapshot {
550            stats,
551            remote_address,
552            observed_external_addr,
553        })
554    }
555
    /// Check if this connection is still alive (not closed or draining).
    ///
    /// Returns `true` if the connection has not been closed for any reason, i.e. while no
    /// error has been recorded in the connection state.
    /// This is useful for detecting phantom or stale connections that should
    /// be cleaned up before attempting deduplication.
    pub fn is_alive(&self) -> bool {
        self.0.state.lock("is_alive").error.is_none()
    }
564
    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open. The stored error is cloned, so the
    /// state lock is released before this returns.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }
571
    /// Report whether the underlying connection supports "ack receive v2".
    ///
    /// Forwards to the protocol-level connection under the state lock.
    // NOTE(review): the exact meaning of "ack receive v2" is defined by the inner connection
    // and is not visible here.
    pub(crate) fn supports_ack_receive_v2(&self) -> bool {
        self.0
            .state
            .lock("supports_ack_receive_v2")
            .inner
            .supports_ack_receive_v2()
    }
579
580    /// Close the connection immediately.
581    ///
582    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
583    /// more data is sent to the peer and the peer may drop buffered data upon receiving
584    /// the CONNECTION_CLOSE frame.
585    ///
586    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
587    ///
588    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
589    /// is preserved in full, it should be kept under 1KiB.
590    ///
591    /// # Gracefully closing a connection
592    ///
593    /// Only the peer last receiving application data can be certain that all data is
594    /// delivered. The only reliable action it can then take is to close the connection,
595    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
596    /// frame is very likely if both endpoints stay online long enough, and
597    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
598    /// remote peer will time out the connection, provided that the idle timeout is not
599    /// disabled.
600    ///
601    /// The sending side can not guarantee all stream data is delivered to the remote
602    /// application. It only knows the data is delivered to the QUIC stack of the remote
603    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
604    /// [`close()`] the remote endpoint may drop any data it received but is as yet
605    /// undelivered to the application, including data that was acknowledged as received to
606    /// the local endpoint.
607    ///
608    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
609    /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
610    /// [`close()`]: Connection::close
611    /// Wake the connection driver to trigger immediate transmission of
612    /// any pending frames. Call after queuing frames at the low level
613    /// (e.g., PUNCH_ME_NOW) that bypass the stream API.
614    pub fn wake_transmit(&self) {
615        self.0.state.lock("wake_transmit").wake();
616    }
617
618    /// Close the connection immediately with the given error code and reason.
619    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
620        let conn = &mut *self.0.state.lock("close");
621        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
622    }
623
624    /// Transmit `data` as an unreliable, unordered application datagram
625    ///
626    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
627    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
628    /// dictated by the peer.
629    ///
630    /// Previously queued datagrams which are still unsent may be discarded to make space for this
631    /// datagram, in order of oldest to newest.
632    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
633        let conn = &mut *self.0.state.lock("send_datagram");
634        if let Some(ref x) = conn.error {
635            return Err(SendDatagramError::ConnectionLost(x.clone()));
636        }
637        use crate::SendDatagramError::*;
638        match conn.inner.datagrams().send(data, true) {
639            Ok(()) => {
640                conn.wake();
641                Ok(())
642            }
643            Err(e) => Err(match e {
644                Blocked(..) => unreachable!(),
645                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
646                Disabled => SendDatagramError::Disabled,
647                TooLarge => SendDatagramError::TooLarge,
648            }),
649        }
650    }
651
652    /// Transmit `data` as an unreliable, unordered application datagram
653    ///
654    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
655    /// conditions, which effectively prioritizes old datagrams over new datagrams.
656    ///
657    /// See [`send_datagram()`] for details.
658    ///
659    /// [`send_datagram()`]: Connection::send_datagram
660    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
661        SendDatagram {
662            conn: &self.0,
663            data: Some(data),
664            notify: self.0.shared.datagrams_unblocked.notified(),
665        }
666    }
667
    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// The value is read under the connection state lock, so it reflects a single point in
    /// time.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }
687
    /// Bytes available in the outgoing datagram buffer
    ///
    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
    /// at most this size is guaranteed not to cause older datagrams to be dropped.
    ///
    /// The value is read under the connection state lock and may be stale by the time a
    /// subsequent call is made.
    pub fn datagram_send_buffer_space(&self) -> usize {
        self.0
            .state
            .lock("datagram_send_buffer_space")
            .inner
            .datagrams()
            .send_buffer_space()
    }
700
    /// Statistics on application datagrams that have been dropped due to receive buffer overflow
    ///
    /// Returns the `datagram_drops` member of the connection's current statistics.
    pub fn datagram_drop_stats(&self) -> DatagramDropStats {
        self.0
            .state
            .lock("datagram_drop_stats")
            .inner
            .stats()
            .datagram_drops
    }
710
711    /// Wait for the next datagram drop notification
712    pub fn on_datagram_drop(&self) -> DatagramDrop<'_> {
713        DatagramDrop {
714            conn: &self.0,
715            notify: self.0.shared.datagram_dropped.notified(),
716        }
717    }
718
719    /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
720    pub fn send_nat_address_advertisement(
721        &self,
722        address: SocketAddr,
723        priority: u32,
724    ) -> Result<u64, crate::ConnectionError> {
725        let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
726        conn.inner.send_nat_address_advertisement(address, priority)
727    }
728
729    /// Queue a REMOVE_ADDRESS NAT traversal frame via the underlying connection
730    pub fn send_nat_address_removal(&self, sequence: u64) -> Result<(), crate::ConnectionError> {
731        let conn = &mut *self.0.state.lock("send_nat_address_removal");
732        conn.inner.send_nat_address_removal(sequence)
733    }
734
735    /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
736    pub fn send_nat_punch_coordination(
737        &self,
738        paired_with_sequence_number: u64,
739        address: SocketAddr,
740        round: u32,
741    ) -> Result<(), crate::ConnectionError> {
742        let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
743        conn.inner
744            .send_nat_punch_coordination(paired_with_sequence_number, address, round)
745    }
746
747    /// Queue a PUNCH_ME_NOW frame via a coordinator to reach a target peer behind NAT
748    ///
749    /// This sends a PUNCH_ME_NOW to the current connection (acting as coordinator)
750    /// with a target peer ID, asking the coordinator to relay to the target peer.
751    pub fn send_nat_punch_via_relay(
752        &self,
753        target_peer_id: [u8; 32],
754        our_address: SocketAddr,
755        round: u32,
756    ) -> Result<(), crate::ConnectionError> {
757        let conn = &mut *self.0.state.lock("send_nat_punch_via_relay");
758        conn.inner
759            .send_nat_punch_via_relay(target_peer_id, our_address, round)
760    }
761
762    /// Whether NAT traversal was negotiated for this connection.
763    pub fn nat_traversal_supported(&self) -> bool {
764        self.0
765            .state
766            .lock("nat_traversal_supported")
767            .inner
768            .nat_traversal_supported()
769    }
770
771    /// Whether this connection will transmit RFC-format NAT traversal frames.
772    pub fn nat_traversal_uses_rfc_frame_format(&self) -> bool {
773        self.0
774            .state
775            .lock("nat_traversal_uses_rfc_frame_format")
776            .inner
777            .nat_traversal_uses_rfc_frame_format()
778    }
779
780    /// Whether this connection accepts legacy-format NAT traversal frames.
781    pub fn nat_traversal_accepts_legacy_frame_format(&self) -> bool {
782        self.0
783            .state
784            .lock("nat_traversal_accepts_legacy_frame_format")
785            .inner
786            .nat_traversal_accepts_legacy_frame_format()
787    }
788
789    /// The side of the connection (client or server)
790    pub fn side(&self) -> Side {
791        self.0.state.lock("side").inner.side()
792    }
793
794    /// The peer's UDP address
795    ///
796    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
797    /// switching to a cellular internet connection.
798    pub fn remote_address(&self) -> SocketAddr {
799        self.0.state.lock("remote_address").inner.remote_address()
800    }
801
802    /// The external/reflexive address observed by the remote peer
803    ///
804    /// Returns the address that the remote peer has observed for this connection,
805    /// as reported via OBSERVED_ADDRESS frames. This is useful for NAT traversal
806    /// to learn the public address of this endpoint as seen by others.
807    ///
808    /// Returns `None` if:
809    /// - Address discovery is not enabled
810    /// - No OBSERVED_ADDRESS frame has been received yet
811    /// - The connection hasn't completed the handshake
812    pub fn observed_address(&self) -> Option<SocketAddr> {
813        self.0
814            .state
815            .lock("observed_address")
816            .inner
817            .observed_address()
818    }
819
820    /// Returns all observed external addresses from all QUIC paths on this connection.
821    ///
822    /// Different paths may report different address families (IPv4 vs IPv6).
823    pub fn all_observed_addresses(&self) -> Vec<SocketAddr> {
824        self.0
825            .state
826            .lock("all_observed_addresses")
827            .inner
828            .all_observed_addresses()
829    }
830
831    /// Wait for the connection's observed-address set to change.
832    pub(crate) fn observed_address_updated(&self) -> Notified<'_> {
833        self.0.shared.observed_address_updated.notified()
834    }
835
836    /// The local IP address which was used when the peer established
837    /// the connection
838    ///
839    /// This can be different from the address the endpoint is bound to, in case
840    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
841    ///
842    /// This will return `None` for clients, or when the platform does not expose this
843    /// information. See quinn_udp's RecvMeta::dst_ip for a list of
844    /// supported platforms when using quinn_udp for I/O, which is the default.
845    pub fn local_ip(&self) -> Option<IpAddr> {
846        self.0.state.lock("local_ip").inner.local_ip()
847    }
848
849    /// Current best estimate of this connection's latency (round-trip-time)
850    pub fn rtt(&self) -> Duration {
851        self.0.state.lock("rtt").inner.rtt()
852    }
853
854    /// Returns connection statistics
855    pub fn stats(&self) -> ConnectionStats {
856        self.0.state.lock("stats").inner.stats()
857    }
858
859    /// Current state of the congestion control algorithm, for debugging purposes
860    pub fn congestion_state(&self) -> Box<dyn Controller> {
861        self.0
862            .state
863            .lock("congestion_state")
864            .inner
865            .congestion_state()
866            .clone_box()
867    }
868
869    /// Parameters negotiated during the handshake
870    ///
871    /// Guaranteed to return `Some` on fully established connections or after
872    /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
873    /// the returned value.
874    ///
875    /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
876    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
877        self.0
878            .state
879            .lock("handshake_data")
880            .inner
881            .crypto_session()
882            .handshake_data()
883    }
884
885    /// Cryptographic identity of the peer
886    ///
887    /// The dynamic type returned is determined by the configured
888    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
889    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
890    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
891        self.0
892            .state
893            .lock("peer_identity")
894            .inner
895            .crypto_session()
896            .peer_identity()
897    }
898
899    /// A stable identifier for this connection
900    ///
901    /// Peer addresses and connection IDs can change, but this value will remain
902    /// fixed for the lifetime of the connection.
903    pub fn stable_id(&self) -> usize {
904        self.0.stable_id()
905    }
906
907    /// Returns true if this connection negotiated post-quantum settings.
908    ///
909    /// This reflects either explicit PQC algorithms advertised via transport
910    /// parameters or in-band detection from handshake CRYPTO frames.
911    pub fn is_pqc(&self) -> bool {
912        let state = self.0.state.lock("is_pqc");
913        state.inner.is_pqc()
914    }
915
916    /// Debug-only hint: returns true when the underlying TLS provider was
917    /// configured to run in KEM-only (ML‑KEM) mode. This is a diagnostic aid
918    /// for tests and does not itself guarantee group enforcement.
919    pub fn debug_kem_only(&self) -> bool {
920        crate::crypto::rustls::debug_kem_only_enabled()
921    }
922
923    /// Update traffic keys spontaneously
924    ///
925    /// This primarily exists for testing purposes.
926    pub fn force_key_update(&self) {
927        self.0
928            .state
929            .lock("force_key_update")
930            .inner
931            .force_key_update()
932    }
933
934    /// Derive keying material from this connection's TLS session secrets.
935    ///
936    /// When both peers call this method with the same `label` and `context`
937    /// arguments and `output` buffers of equal length, they will get the
938    /// same sequence of bytes in `output`. These bytes are cryptographically
939    /// strong and pseudorandom, and are suitable for use as keying material.
940    ///
941    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
942    pub fn export_keying_material(
943        &self,
944        output: &mut [u8],
945        label: &[u8],
946        context: &[u8],
947    ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
948        self.0
949            .state
950            .lock("export_keying_material")
951            .inner
952            .crypto_session()
953            .export_keying_material(output, label, context)
954    }
955
956    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
957    ///
958    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
959    /// `count`s increase both minimum and worst-case memory consumption.
960    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
961        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
962        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
963        // May need to send MAX_STREAMS to make progress
964        conn.wake();
965    }
966
967    /// See [`crate::TransportConfig::receive_window()`]
968    pub fn set_receive_window(&self, receive_window: VarInt) {
969        let mut conn = self.0.state.lock("set_receive_window");
970        conn.inner.set_receive_window(receive_window);
971        conn.wake();
972    }
973
974    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
975    ///
976    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
977    /// `count`s increase both minimum and worst-case memory consumption.
978    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
979        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
980        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
981        // May need to send MAX_STREAMS to make progress
982        conn.wake();
983    }
984
985    /// Set up qlog for this connection.
986    #[cfg(feature = "__qlog")]
987    pub fn set_qlog(
988        &mut self,
989        writer: Box<dyn std::io::Write + Send + Sync>,
990        title: Option<String>,
991        description: Option<String>,
992    ) {
993        let mut state = self.0.state.lock("__qlog");
994        state
995            .inner
996            .set_qlog(writer, title, description, Instant::now());
997    }
998}
999
1000pin_project! {
1001    /// Future produced by [`Connection::open_uni`]
1002    pub struct OpenUni<'a> {
1003        conn: &'a ConnectionRef,
1004        #[pin]
1005        notify: Notified<'a>,
1006    }
1007}
1008
1009impl Future for OpenUni<'_> {
1010    type Output = Result<SendStream, ConnectionError>;
1011    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1012        let this = self.project();
1013        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
1014        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
1015    }
1016}
1017
1018pin_project! {
1019    /// Future produced by [`Connection::open_bi`]
1020    pub struct OpenBi<'a> {
1021        conn: &'a ConnectionRef,
1022        #[pin]
1023        notify: Notified<'a>,
1024    }
1025}
1026
1027impl Future for OpenBi<'_> {
1028    type Output = Result<(SendStream, RecvStream), ConnectionError>;
1029    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1030        let this = self.project();
1031        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
1032
1033        Poll::Ready(Ok((
1034            SendStream::new(conn.clone(), id, is_0rtt),
1035            RecvStream::new(conn, id, is_0rtt),
1036        )))
1037    }
1038}
1039
1040fn poll_open<'a>(
1041    ctx: &mut Context<'_>,
1042    conn: &'a ConnectionRef,
1043    mut notify: Pin<&mut Notified<'a>>,
1044    dir: Dir,
1045) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1046    let mut state = conn.state.lock("poll_open");
1047    if let Some(ref e) = state.error {
1048        return Poll::Ready(Err(e.clone()));
1049    } else if let Some(id) = state.inner.streams().open(dir) {
1050        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
1051        drop(state); // Release the lock so clone can take it
1052        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1053    }
1054    loop {
1055        match notify.as_mut().poll(ctx) {
1056            // `state` lock ensures we didn't race with readiness
1057            Poll::Pending => return Poll::Pending,
1058            // Spurious wakeup, get a new future
1059            Poll::Ready(()) => {
1060                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
1061            }
1062        }
1063    }
1064}
1065
1066pin_project! {
1067    /// Future produced by [`Connection::accept_uni`]
1068    pub struct AcceptUni<'a> {
1069        conn: &'a ConnectionRef,
1070        #[pin]
1071        notify: Notified<'a>,
1072    }
1073}
1074
1075impl Future for AcceptUni<'_> {
1076    type Output = Result<RecvStream, ConnectionError>;
1077
1078    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1079        let this = self.project();
1080        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
1081        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
1082    }
1083}
1084
1085pin_project! {
1086    /// Future produced by [`Connection::accept_bi`]
1087    pub struct AcceptBi<'a> {
1088        conn: &'a ConnectionRef,
1089        #[pin]
1090        notify: Notified<'a>,
1091    }
1092}
1093
1094impl Future for AcceptBi<'_> {
1095    type Output = Result<(SendStream, RecvStream), ConnectionError>;
1096
1097    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1098        let this = self.project();
1099        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
1100        Poll::Ready(Ok((
1101            SendStream::new(conn.clone(), id, is_0rtt),
1102            RecvStream::new(conn, id, is_0rtt),
1103        )))
1104    }
1105}
1106
1107fn poll_accept<'a>(
1108    ctx: &mut Context<'_>,
1109    conn: &'a ConnectionRef,
1110    mut notify: Pin<&mut Notified<'a>>,
1111    dir: Dir,
1112) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1113    let mut state = conn.state.lock("poll_accept");
1114    // Check for incoming streams before checking `state.error` so that already-received streams,
1115    // which are necessarily finite, can be drained from a closed connection.
1116    if let Some(id) = state.inner.streams().accept(dir) {
1117        let is_0rtt = state.inner.is_handshaking();
1118        state.wake(); // To send additional stream ID credit
1119        drop(state); // Release the lock so clone can take it
1120        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1121    } else if let Some(ref e) = state.error {
1122        return Poll::Ready(Err(e.clone()));
1123    }
1124    loop {
1125        match notify.as_mut().poll(ctx) {
1126            // `state` lock ensures we didn't race with readiness
1127            Poll::Pending => return Poll::Pending,
1128            // Spurious wakeup, get a new future
1129            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
1130        }
1131    }
1132}
1133
1134pin_project! {
1135    /// Future produced by [`Connection::read_datagram`]
1136    pub struct ReadDatagram<'a> {
1137        conn: &'a ConnectionRef,
1138        #[pin]
1139        notify: Notified<'a>,
1140    }
1141}
1142
1143impl Future for ReadDatagram<'_> {
1144    type Output = Result<Bytes, ConnectionError>;
1145    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1146        let mut this = self.project();
1147        let mut state = this.conn.state.lock("ReadDatagram::poll");
1148        // Check for buffered datagrams before checking `state.error` so that already-received
1149        // datagrams, which are necessarily finite, can be drained from a closed connection.
1150        match state.inner.datagrams().recv() {
1151            Some(x) => {
1152                return Poll::Ready(Ok(x));
1153            }
1154            _ => {
1155                if let Some(ref e) = state.error {
1156                    return Poll::Ready(Err(e.clone()));
1157                }
1158            }
1159        }
1160        loop {
1161            match this.notify.as_mut().poll(ctx) {
1162                // `state` lock ensures we didn't race with readiness
1163                Poll::Pending => return Poll::Pending,
1164                // Spurious wakeup, get a new future
1165                Poll::Ready(()) => this
1166                    .notify
1167                    .set(this.conn.shared.datagram_received.notified()),
1168            }
1169        }
1170    }
1171}
1172
1173pin_project! {
1174    /// Future produced by [`Connection::on_datagram_drop`]
1175    pub struct DatagramDrop<'a> {
1176        conn: &'a ConnectionRef,
1177        #[pin]
1178        notify: Notified<'a>,
1179    }
1180}
1181
1182impl Future for DatagramDrop<'_> {
1183    type Output = Result<DatagramDropStats, ConnectionError>;
1184
1185    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1186        let mut this = self.project();
1187        let mut state = this.conn.state.lock("DatagramDrop::poll");
1188        if let Some(drop) = state.datagram_drop_events.pop_front() {
1189            return Poll::Ready(Ok(drop));
1190        }
1191        if let Some(ref e) = state.error {
1192            return Poll::Ready(Err(e.clone()));
1193        }
1194        loop {
1195            match this.notify.as_mut().poll(ctx) {
1196                // `state` lock ensures we didn't race with readiness
1197                Poll::Pending => return Poll::Pending,
1198                Poll::Ready(()) => this
1199                    .notify
1200                    .set(this.conn.shared.datagram_dropped.notified()),
1201            }
1202        }
1203    }
1204}
1205
1206pin_project! {
1207    /// Future produced by [`Connection::send_datagram_wait`]
1208    pub struct SendDatagram<'a> {
1209        conn: &'a ConnectionRef,
1210        data: Option<Bytes>,
1211        #[pin]
1212        notify: Notified<'a>,
1213    }
1214}
1215
1216impl Future for SendDatagram<'_> {
1217    type Output = Result<(), SendDatagramError>;
1218    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1219        let mut this = self.project();
1220        let mut state = this.conn.state.lock("SendDatagram::poll");
1221        if let Some(ref e) = state.error {
1222            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1223        }
1224        use crate::SendDatagramError::*;
1225        match state.inner.datagrams().send(
1226            this.data.take().ok_or_else(|| {
1227                error!("SendDatagram future polled without data");
1228                SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
1229            })?,
1230            false,
1231        ) {
1232            Ok(()) => {
1233                state.wake();
1234                Poll::Ready(Ok(()))
1235            }
1236            Err(e) => Poll::Ready(Err(match e {
1237                Blocked(data) => {
1238                    this.data.replace(data);
1239                    loop {
1240                        match this.notify.as_mut().poll(ctx) {
1241                            Poll::Pending => return Poll::Pending,
1242                            // Spurious wakeup, get a new future
1243                            Poll::Ready(()) => this
1244                                .notify
1245                                .set(this.conn.shared.datagrams_unblocked.notified()),
1246                        }
1247                    }
1248                }
1249                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1250                Disabled => SendDatagramError::Disabled,
1251                TooLarge => SendDatagramError::TooLarge,
1252            })),
1253        }
1254    }
1255}
1256
1257#[derive(Debug)]
1258pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1259
1260impl ConnectionRef {
1261    #[allow(clippy::too_many_arguments)]
1262    fn new(
1263        handle: ConnectionHandle,
1264        conn: crate::Connection,
1265        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1266        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1267        on_handshake_data: oneshot::Sender<()>,
1268        on_connected: oneshot::Sender<bool>,
1269        socket: Arc<dyn AsyncUdpSocket>,
1270        runtime: Arc<dyn Runtime>,
1271    ) -> Self {
1272        Self(Arc::new(ConnectionInner {
1273            state: Mutex::new(State {
1274                inner: conn,
1275                driver: None,
1276                handle,
1277                on_handshake_data: Some(on_handshake_data),
1278                on_connected: Some(on_connected),
1279                connected: false,
1280                timer: None,
1281                timer_deadline: None,
1282                conn_events,
1283                endpoint_events,
1284                blocked_writers: FxHashMap::default(),
1285                blocked_readers: FxHashMap::default(),
1286                stopped: FxHashMap::default(),
1287                error: None,
1288                ref_count: 0,
1289                datagram_drop_events: VecDeque::new(),
1290                udp_sender: socket.create_sender(),
1291                socket,
1292                runtime,
1293                send_buffer: Vec::new(),
1294                buffered_transmit: None,
1295                binding_started: false,
1296            }),
1297            shared: Shared::default(),
1298        }))
1299    }
1300
1301    fn stable_id(&self) -> usize {
1302        &*self.0 as *const _ as usize
1303    }
1304}
1305
1306impl Clone for ConnectionRef {
1307    fn clone(&self) -> Self {
1308        self.state.lock("clone").ref_count += 1;
1309        Self(self.0.clone())
1310    }
1311}
1312
1313impl Drop for ConnectionRef {
1314    fn drop(&mut self) {
1315        let conn = &mut *self.state.lock("drop");
1316        if let Some(x) = conn.ref_count.checked_sub(1) {
1317            conn.ref_count = x;
1318            if x == 0 && !conn.inner.is_closed() {
1319                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1320                // not, we can't do any harm. If there were any streams being opened, then either
1321                // the connection will be closed for an unrelated reason or a fresh reference will
1322                // be constructed for the newly opened stream.
1323                conn.implicit_close(&self.shared);
1324            }
1325        }
1326    }
1327}
1328
1329impl std::ops::Deref for ConnectionRef {
1330    type Target = ConnectionInner;
1331    fn deref(&self) -> &Self::Target {
1332        &self.0
1333    }
1334}
1335
/// Data shared by every [`ConnectionRef`] handle to one connection.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    /// Mutable connection state, guarded by a mutex whose `lock` call takes a label string
    pub(crate) state: Mutex<State>,
    /// Notification primitives that can be reached without taking the `state` lock
    pub(crate) shared: Shared,
}
1341
/// Notification handles used to wake futures waiting on connection events.
///
/// The two-element arrays are indexed by `Dir as usize`.
#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    /// Awaited by [`ReadDatagram`] when no datagram is buffered
    datagram_received: Notify,
    /// Awaited by [`SendDatagram`] while sending is blocked
    datagrams_unblocked: Notify,
    /// Awaited by [`DatagramDrop`] when no drop event is queued
    datagram_dropped: Notify,
    /// Notified when handling a protocol event changed the set of observed addresses
    observed_address_updated: Notify,
    // NOTE(review): presumably notified when the connection closes; no use is visible in this
    // part of the file, so confirm against the rest of the module.
    closed: Notify,
}
1355
/// Mutable per-connection state, guarded by the mutex in [`ConnectionInner`].
pub(crate) struct State {
    /// Protocol-level connection state machine
    pub(crate) inner: crate::Connection,
    // NOTE(review): presumably the waker of the connection driver task; it is woken outside
    // this part of the file, so confirm there.
    driver: Option<Waker>,
    /// Handle sent along with every event forwarded to the endpoint
    handle: ConnectionHandle,
    /// Fired once when the handshake data becomes available
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Fired once when the connection is established, carrying whether 0-RTT was accepted
    on_connected: Option<oneshot::Sender<bool>>,
    /// Set to `true` when the `Connected` event has been seen
    connected: bool,
    // NOTE(review): `timer` and `timer_deadline` are presumably the armed timeout and the
    // instant it is set for; they are managed outside this part of the file.
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    timer_deadline: Option<Instant>,
    /// Events sent to this connection, polled in `process_conn_events`
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Channel used to forward this connection's events to the endpoint
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Wakers keyed by stream ID; an entry is woken when its stream becomes writable
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    /// Wakers keyed by stream ID, woken together with `blocked_writers` when 0-RTT is rejected
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    /// Per-stream notifiers, woken together with the blocked readers and writers when 0-RTT
    /// is rejected
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    /// Queued drop events, handed out one at a time by [`DatagramDrop`]
    datagram_drop_events: VecDeque<DatagramDropStats>,
    /// Socket currently in use; replaced when a `Rebind` event arrives
    socket: Arc<dyn AsyncUdpSocket>,
    /// Sender created from `socket`; recreated whenever the socket is replaced
    udp_sender: Pin<Box<dyn UdpSender>>,
    /// Runtime abstraction; supplies the current time for transmits
    runtime: Arc<dyn Runtime>,
    /// Reusable buffer that outgoing transmits are written into
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<crate::Transmit>,
    /// True once we've initiated automatic channel binding (if enabled)
    binding_started: bool,
}
1384
1385impl State {
1386    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1387        let now = self.runtime.now();
1388        let mut transmits = 0;
1389
1390        let max_datagrams = self
1391            .udp_sender
1392            .max_transmit_segments()
1393            .min(MAX_TRANSMIT_SEGMENTS);
1394
1395        loop {
1396            // Retry the last transmit, or get a new one.
1397            let t = match self.buffered_transmit.take() {
1398                Some(t) => t,
1399                None => {
1400                    self.send_buffer.clear();
1401                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
1402                    match self
1403                        .inner
1404                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1405                    {
1406                        Some(t) => {
1407                            transmits += match t.segment_size {
1408                                None => 1,
1409                                Some(s) => t.size.div_ceil(s), // round up
1410                            };
1411                            t
1412                        }
1413                        None => break,
1414                    }
1415                }
1416            };
1417
1418            // X0X-0043 GSO bundle accounting. We only count multi-segment
1419            // bundles (`segment_size.is_some()` and segment count > 1);
1420            // single datagrams are not GSO bundles and would muddy the
1421            // "did GSO actually fire?" question this telemetry exists to
1422            // answer. See `crate::diagnostics::gso` module docs for the
1423            // current observability limitation: ant-quic's in-tree
1424            // `UdpSender` impls return `max_transmit_segments() = 1`
1425            // and use `try_send_to(transmit.contents, ...)`, so under
1426            // current code paths `t.segment_size` is `None` and this
1427            // branch is dead until kernel GSO is wired into the runtime.
1428            let segment_count = match t.segment_size {
1429                Some(s) if s > 0 => t.size.div_ceil(s),
1430                _ => 1,
1431            };
1432            let is_gso_bundle = segment_count > 1;
1433
1434            let len = t.size;
1435            match self
1436                .udp_sender
1437                .as_mut()
1438                .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
1439            {
1440                Poll::Pending => {
1441                    self.buffered_transmit = Some(t);
1442                    return Ok(false);
1443                }
1444                Poll::Ready(Ok(())) => {
1445                    if is_gso_bundle {
1446                        crate::diagnostics::gso_diagnostics()
1447                            .record_bundle_submitted(segment_count);
1448                    }
1449                }
1450                Poll::Ready(Err(e)) => {
1451                    // X0X-0043: a hard error from the kernel send path is
1452                    // the closest measurable proxy for a "partial send"
1453                    // today. When the runtime is rewritten to use
1454                    // `quinn_udp::UdpSocketState::send`, the per-segment
1455                    // delivered count it returns will replace this heuristic.
1456                    if is_gso_bundle {
1457                        crate::diagnostics::gso_diagnostics()
1458                            .record_bundle_submitted(segment_count);
1459                        crate::diagnostics::gso_diagnostics().record_bundle_partial_send();
1460                    }
1461                    return Err(e);
1462                }
1463            }
1464
1465            if transmits >= MAX_TRANSMIT_DATAGRAMS {
1466                // TODO: What isn't ideal here yet is that if we don't poll all
1467                // datagrams that could be sent we don't go into the `app_limited`
1468                // state and CWND continues to grow until we get here the next time.
1469                // See https://github.com/quinn-rs/quinn/issues/1126
1470                return Ok(true);
1471            }
1472        }
1473
1474        Ok(false)
1475    }
1476
1477    fn forward_endpoint_events(&mut self) {
1478        while let Some(event) = self.inner.poll_endpoint_events() {
1479            // If the endpoint driver is gone, noop.
1480            let _ = self.endpoint_events.send((self.handle, event));
1481        }
1482    }
1483
1484    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1485    fn process_conn_events(
1486        &mut self,
1487        shared: &Shared,
1488        cx: &mut Context,
1489    ) -> Result<(), ConnectionError> {
1490        loop {
1491            match self.conn_events.poll_recv(cx) {
1492                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1493                    self.socket = socket;
1494                    self.udp_sender = self.socket.create_sender();
1495                    self.inner.local_address_changed();
1496                }
1497                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1498                    let mut observed_before = self.inner.all_observed_addresses();
1499                    observed_before.sort_unstable();
1500                    observed_before.dedup();
1501
1502                    self.inner.handle_event(event);
1503
1504                    let mut observed_after = self.inner.all_observed_addresses();
1505                    observed_after.sort_unstable();
1506                    observed_after.dedup();
1507                    if observed_before != observed_after {
1508                        shared.observed_address_updated.notify_one();
1509                    }
1510                }
1511                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1512                    self.close(error_code, reason, shared);
1513                }
1514                Poll::Ready(None) => {
1515                    return Err(ConnectionError::TransportError(crate::TransportError {
1516                        code: crate::TransportErrorCode::INTERNAL_ERROR,
1517                        frame: None,
1518                        reason: "endpoint driver future was dropped".to_string(),
1519                    }));
1520                }
1521                Poll::Pending => {
1522                    return Ok(());
1523                }
1524            }
1525        }
1526    }
1527
1528    fn forward_app_events(&mut self, shared: &Shared) {
1529        while let Some(event) = self.inner.poll() {
1530            use crate::Event::*;
1531            match event {
1532                HandshakeDataReady => {
1533                    if let Some(x) = self.on_handshake_data.take() {
1534                        let _ = x.send(());
1535                    }
1536                }
1537                Connected => {
1538                    self.connected = true;
1539                    if let Some(x) = self.on_connected.take() {
1540                        // We don't care if the on-connected future was dropped
1541                        let _ = x.send(self.inner.accepted_0rtt());
1542                    }
1543                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1544                        // Wake up rejected 0-RTT streams so they can fail immediately with
1545                        // `ZeroRttRejected` errors.
1546                        wake_all(&mut self.blocked_writers);
1547                        wake_all(&mut self.blocked_readers);
1548                        wake_all_notify(&mut self.stopped);
1549                    }
1550                }
1551                ConnectionLost { reason } => {
1552                    self.terminate(reason, shared);
1553                }
1554                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1555                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1556                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1557                }
1558                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1559                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1560                }
1561                DatagramReceived => {
1562                    shared.datagram_received.notify_waiters();
1563                }
1564                DatagramsUnblocked => {
1565                    shared.datagrams_unblocked.notify_waiters();
1566                }
1567                DatagramDropped(drop) => {
1568                    // Buffer overflow - surface to application via dedicated queue and notify
1569                    tracing::debug!(
1570                        datagrams = drop.datagrams,
1571                        bytes = drop.bytes,
1572                        "datagrams dropped due to receive buffer overflow"
1573                    );
1574                    self.datagram_drop_events.push_back(drop);
1575                    shared.datagram_dropped.notify_waiters();
1576                }
1577                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1578                Stream(StreamEvent::Available { dir }) => {
1579                    // Might mean any number of streams are ready, so we wake up everyone
1580                    shared.stream_budget_available[dir as usize].notify_waiters();
1581                }
1582                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1583                Stream(StreamEvent::Stopped { id, .. }) => {
1584                    wake_stream_notify(id, &mut self.stopped);
1585                    wake_stream(id, &mut self.blocked_writers);
1586                }
1587            }
1588        }
1589    }
1590
1591    fn drive_timer(&mut self, cx: &mut Context) -> bool {
1592        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1593        // timer is registered with the runtime (and check whether it's already
1594        // expired).
1595        match self.inner.poll_timeout() {
1596            Some(deadline) => {
1597                if let Some(delay) = &mut self.timer {
1598                    // There is no need to reset the tokio timer if the deadline
1599                    // did not change
1600                    if self
1601                        .timer_deadline
1602                        .map(|current_deadline| current_deadline != deadline)
1603                        .unwrap_or(true)
1604                    {
1605                        delay.as_mut().reset(deadline);
1606                    }
1607                } else {
1608                    self.timer = Some(self.runtime.new_timer(deadline));
1609                }
1610                // Store the actual expiration time of the timer
1611                self.timer_deadline = Some(deadline);
1612            }
1613            None => {
1614                self.timer_deadline = None;
1615                return false;
1616            }
1617        }
1618
1619        if self.timer_deadline.is_none() {
1620            return false;
1621        }
1622
1623        let delay = match self.timer.as_mut() {
1624            Some(timer) => timer.as_mut(),
1625            None => {
1626                error!("Timer missing in state where it should exist");
1627                return false;
1628            }
1629        };
1630        if delay.poll(cx).is_pending() {
1631            // Since there wasn't a timeout event, there is nothing new
1632            // for the connection to do
1633            return false;
1634        }
1635
1636        // A timer expired, so the caller needs to check for
1637        // new transmits, which might cause new timers to be set.
1638        self.inner.handle_timeout(self.runtime.now());
1639        self.timer_deadline = None;
1640        true
1641    }
1642
1643    /// Wake up a blocked `Driver` task to process I/O
1644    pub(crate) fn wake(&mut self) {
1645        if let Some(x) = self.driver.take() {
1646            x.wake();
1647        }
1648    }
1649
1650    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1651    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1652        self.error = Some(reason.clone());
1653        if let Some(x) = self.on_handshake_data.take() {
1654            let _ = x.send(());
1655        }
1656        wake_all(&mut self.blocked_writers);
1657        wake_all(&mut self.blocked_readers);
1658        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1659        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1660        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1661        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1662        shared.datagram_received.notify_waiters();
1663        shared.datagrams_unblocked.notify_waiters();
1664        shared.datagram_dropped.notify_waiters();
1665        shared.observed_address_updated.notify_waiters();
1666        if let Some(x) = self.on_connected.take() {
1667            let _ = x.send(false);
1668        }
1669        wake_all_notify(&mut self.stopped);
1670        shared.closed.notify_waiters();
1671    }
1672
1673    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1674        self.inner.close(self.runtime.now(), error_code, reason);
1675        self.terminate(ConnectionError::LocallyClosed, shared);
1676        self.wake();
1677    }
1678
1679    /// Close for a reason other than the application's explicit request
1680    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1681        self.close(0u32.into(), Bytes::new(), shared);
1682    }
1683
1684    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1685        if self.inner.is_handshaking()
1686            || self.inner.accepted_0rtt()
1687            || self.inner.side().is_server()
1688        {
1689            Ok(())
1690        } else {
1691            Err(())
1692        }
1693    }
1694}
1695
1696impl Drop for State {
1697    fn drop(&mut self) {
1698        if !self.inner.is_drained() {
1699            // Ensure the endpoint can tidy up
1700            let _ = self
1701                .endpoint_events
1702                .send((self.handle, crate::EndpointEvent::drained()));
1703        }
1704    }
1705}
1706
1707impl fmt::Debug for State {
1708    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1709        f.debug_struct("State").field("inner", &self.inner).finish()
1710    }
1711}
1712
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rustls::pki_types::{CertificateDer, PrivateKeyDer};
    use tokio::time::{Duration, timeout};

    use crate::config::{ClientConfig, ServerConfig};
    use crate::high_level::Endpoint;

    /// Upper bound applied to every awaited step of the test.
    const STEP_TIMEOUT: Duration = Duration::from_secs(10);

    /// Generate a self-signed certificate for `localhost` and its PKCS#8 private key.
    fn gen_self_signed_cert() -> (Vec<CertificateDer<'static>>, PrivateKeyDer<'static>) {
        let generated = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])
            .expect("generate self-signed cert");
        let chain = vec![CertificateDer::from(generated.cert)];
        let key = PrivateKeyDer::Pkcs8(generated.signing_key.serialize_der().into());
        (chain, key)
    }

    /// Build a client configuration that trusts exactly the certificates in `chain`.
    fn client_config(chain: &[CertificateDer<'static>]) -> ClientConfig {
        let mut roots = rustls::RootCertStore::empty();
        chain
            .iter()
            .cloned()
            .for_each(|cert| roots.add(cert).expect("add root"));
        ClientConfig::with_root_certificates(Arc::new(roots)).expect("client config")
    }

    /// A weak handle must report the connection as dead once it is closed, and must stop
    /// upgrading once every strong handle has been dropped.
    #[tokio::test]
    async fn weak_connection_handle_does_not_keep_connection_alive() {
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

        let (chain, key) = gen_self_signed_cert();

        // Server side: accept one connection and wait until it is closed.
        let server = {
            let config =
                ServerConfig::with_single_cert(chain.clone(), key).expect("server config");
            Endpoint::server(config, ([127, 0, 0, 1], 0).into()).expect("server endpoint")
        };
        let server_addr = server.local_addr().expect("server local addr");

        let server_task = tokio::spawn(async move {
            let incoming = timeout(STEP_TIMEOUT, server.accept())
                .await
                .expect("server accept wait")
                .expect("incoming connection");
            let conn = timeout(STEP_TIMEOUT, incoming)
                .await
                .expect("server handshake wait")
                .expect("server handshake");
            let _ = timeout(STEP_TIMEOUT, conn.closed()).await;
        });

        // Client side: connect and complete the handshake.
        let mut client = Endpoint::client(([127, 0, 0, 1], 0).into()).expect("client endpoint");
        client.set_default_client_config(client_config(&chain));
        let connecting = client
            .connect(server_addr, "localhost")
            .expect("connect start");
        let conn = timeout(STEP_TIMEOUT, connecting)
            .await
            .expect("client handshake wait")
            .expect("client handshake");

        // While the connection is live, the weak handle is alive and upgradable.
        let weak = conn.weak_handle();
        assert!(weak.is_alive());

        let upgraded = weak.upgrade().expect("weak upgrade while live");
        assert!(weak.is_same_connection(&upgraded.weak_handle()));
        drop(upgraded);

        // After an explicit close the weak handle must no longer report alive.
        conn.close(0u32.into(), b"x0x-0045");
        let _ = timeout(STEP_TIMEOUT, conn.on_closed())
            .await
            .expect("client closed");
        assert!(!weak.is_alive());

        drop(conn);
        drop(client);

        // With all strong handles gone, upgrading must eventually start failing.
        let released = async {
            while weak.upgrade().is_some() {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        timeout(STEP_TIMEOUT, released)
            .await
            .expect("weak handle released after strong handles dropped");

        server_task.await.expect("server task");
    }
}
1799
1800fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1801    if let Some(waker) = wakers.remove(&stream_id) {
1802        waker.wake();
1803    }
1804}
1805
1806fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1807    wakers.drain().for_each(|(_, waker)| waker.wake())
1808}
1809
1810fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1811    if let Some(notify) = wakers.remove(&stream_id) {
1812        notify.notify_waiters()
1813    }
1814}
1815
1816fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1817    wakers
1818        .drain()
1819        .for_each(|(_, notify)| notify.notify_waiters())
1820}
1821
1822/// Errors that can arise when sending a datagram
1823#[derive(Debug, Error, Clone, Eq, PartialEq)]
1824pub enum SendDatagramError {
1825    /// The peer does not support receiving datagram frames
1826    #[error("datagrams not supported by peer")]
1827    UnsupportedByPeer,
1828    /// Datagram support is disabled locally
1829    #[error("datagram support disabled")]
1830    Disabled,
1831    /// The datagram is larger than the connection can currently accommodate
1832    ///
1833    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1834    /// exceeded.
1835    #[error("datagram too large")]
1836    TooLarge,
1837    /// The connection was lost
1838    #[error("connection lost")]
1839    ConnectionLost(#[from] ConnectionError),
1840}
1841
/// Upper bound on the number of datagrams produced by a single `drive_transmit` call
///
/// Capping this bounds the CPU time spent generating datagrams in one go, which gives
/// other tasks (such as receiving ACKs) a chance to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1847
/// Upper bound on the number of datagrams sent in a single transmit
///
/// This may be lower than what the platform supports, in order to avoid excessive memory
/// allocations when calling `poll_transmit()`. Benchmarks have shown that values around 10
/// are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;