// ant_quic/high_level/connection.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    any::Any,
10    collections::VecDeque,
11    fmt,
12    future::Future,
13    io,
14    net::{IpAddr, SocketAddr},
15    pin::Pin,
16    sync::Arc,
17    task::{Context, Poll, Waker, ready},
18};
19
20use bytes::Bytes;
21use pin_project_lite::pin_project;
22use rustc_hash::FxHashMap;
23use thiserror::Error;
24use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
25use tracing::{Instrument, Span, debug, debug_span, error};
26
27use super::{
28    ConnectionEvent,
29    mutex::Mutex,
30    recv_stream::RecvStream,
31    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
32    send_stream::SendStream,
33    udp_transmit,
34};
35use crate::{
36    ConnectionError, ConnectionHandle, ConnectionStats, DatagramDropStats, Dir, Duration,
37    EndpointEvent, Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
38};
39
/// In-progress connection attempt future
#[derive(Debug)]
pub struct Connecting {
    // Shared connection state; taken (set to `None`) once the future resolves
    // or the attempt is converted via `into_0rtt`.
    conn: Option<ConnectionRef>,
    // Signalled by the driver on handshake completion; the payload is the
    // 0-RTT acceptance flag (see `ZeroRttAccepted`).
    connected: oneshot::Receiver<bool>,
    // Consumed by `handshake_data()`; signalled when handshake data is available.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
47
/// Whether an I/O error is a routine failure for a background connection
/// driver (transient reachability problems and the like), and should be
/// logged at debug level rather than as an unexpected error.
fn is_expected_background_io_error(error: &io::Error) -> bool {
    use io::ErrorKind::*;
    if matches!(
        error.kind(),
        AddrNotAvailable
            | ConnectionRefused
            | ConnectionReset
            | HostUnreachable
            | NetworkUnreachable
            | NotConnected
            | TimedOut
    ) {
        return true;
    }
    // Fall back to raw errno values for errors the kind mapping misses:
    // 49 = EADDRNOTAVAIL (BSD/macOS), 51 = ENETUNREACH (BSD/macOS),
    // 65 = EHOSTUNREACH (BSD/macOS), 101 = ENETUNREACH (Linux),
    // 113 = EHOSTUNREACH (Linux). TODO confirm platform values against errno.h.
    matches!(error.raw_os_error(), Some(49 | 51 | 65 | 101 | 113))
}
60
impl Connecting {
    /// Create the shared connection state and spawn the connection driver task
    ///
    /// The driver performs all I/O, timer, and event processing for this
    /// connection. Handshake-data availability and handshake completion are
    /// signalled back to this future via the two oneshot channels created here.
    pub(crate) fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
        let (on_connected_send, on_connected_recv) = oneshot::channel();
        let conn = ConnectionRef::new(
            handle,
            conn,
            endpoint_events,
            conn_events,
            on_handshake_data_send,
            on_connected_send,
            socket,
            runtime.clone(),
        );

        let driver = ConnectionDriver(conn.clone());
        runtime.spawn(Box::pin(
            async {
                if let Err(e) = driver.await {
                    // Transient reachability failures are routine for a
                    // background path; keep them at debug level to avoid noise.
                    if is_expected_background_io_error(&e) {
                        debug!("background I/O path ended: {e}");
                    } else {
                        tracing::error!("I/O error: {e}");
                    }
                }
            }
            .instrument(Span::current()),
        ));

        Self {
            conn: Some(conn),
            connected: on_connected_recv,
            handshake_data_ready: Some(on_handshake_data_recv),
        }
    }

    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
    ///
    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
    /// If so, the returned [`Connection`] can be used to send application data without waiting for
    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
    /// complete, at which point subsequently opened streams and written data will have full
    /// cryptographic protection.
    ///
    /// ## Outgoing
    ///
    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
    /// If it was rejected, the existence of streams opened and other application data sent prior
    /// to the handshake completing will not be conveyed to the remote application, and local
    /// operations on them will return `ZeroRttRejected` errors.
    ///
    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
    /// relevant resumption state to be stored in the server, which servers may limit or lose for
    /// various reasons including not persisting resumption state across server restarts.
    ///
    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Incoming
    ///
    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
    ///
    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Security
    ///
    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
    /// replay attacks, and should therefore never invoke non-idempotent operations.
    ///
    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
    /// before TLS client authentication has occurred, and should therefore not be used to send
    /// data for which client authentication is being used.
    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
        // have to release it explicitly before returning `self` by value.
        let conn = match self.conn.as_mut() {
            Some(conn) => conn.state.lock("into_0rtt"),
            None => {
                return Err(self);
            }
        };

        // Clients may try 0-RTT only when the session has it available;
        // servers can always send 0.5-RTT data.
        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
        drop(conn);

        if is_ok {
            match self.conn.take() {
                Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
                None => {
                    tracing::error!("Connection state missing during 0-RTT acceptance");
                    Err(self)
                }
            }
        } else {
            Err(self)
        }
    }

    /// Parameters negotiated during the handshake
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a
    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
        // simple.
        if let Some(x) = self.handshake_data_ready.take() {
            let _ = x.await;
        }
        let conn = self.conn.as_ref().ok_or_else(|| {
            tracing::error!("Connection state missing while retrieving handshake data");
            ConnectionError::LocallyClosed
        })?;
        let inner = conn.state.lock("handshake");
        inner
            .inner
            .crypto_session()
            .handshake_data()
            .ok_or_else(|| {
                // No handshake data after the ready signal means the connection
                // failed; surface its recorded error if there is one.
                inner.error.clone().unwrap_or_else(|| {
                    error!("Spurious handshake data ready notification with no error");
                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
                        "Spurious handshake notification".to_string(),
                    ))
                })
            })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See quinn_udp's RecvMeta::dst_ip for a list of
    /// supported platforms when using quinn_udp for I/O, which is the default.
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn local_ip(&self) -> Option<IpAddr> {
        let conn = self.conn.as_ref()?;
        let inner = conn.state.lock("local_ip");

        inner.inner.local_ip()
    }

    /// The peer's UDP address
    ///
    /// Returns an error if called after `poll` has returned `Ready`.
    pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
        let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
            error!("Connection used after yielding Ready");
            ConnectionError::LocallyClosed
        })?;
        Ok(conn_ref.state.lock("remote_address").inner.remote_address())
    }
}
235
236impl Future for Connecting {
237    type Output = Result<Connection, ConnectionError>;
238    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
239        Pin::new(&mut self.connected).poll(cx).map(|_| {
240            let conn = self.conn.take().ok_or_else(|| {
241                error!("Connection not available when connecting future resolves");
242                ConnectionError::LocallyClosed
243            })?;
244            let inner = conn.state.lock("connecting");
245            if inner.connected {
246                drop(inner);
247                Ok(Connection(conn))
248            } else {
249                Err(inner.error.clone().unwrap_or_else(|| {
250                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
251                        "connection failed without error".to_string(),
252                    ))
253                }))
254            }
255        })
256    }
257}
258
/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
// Wraps the `connected` receiver handed over by `Connecting::into_0rtt`.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
264
265impl Future for ZeroRttAccepted {
266    type Output = bool;
267    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
268        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
269    }
270}
271
/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// If the connection encounters an error condition, this future will yield an error. It will
/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
// Holds one strong reference to the shared connection state.
struct ConnectionDriver(ConnectionRef);
285
impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        // Apply events from the endpoint first; an error here terminates the
        // connection, which ends the driver without an I/O error.
        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        // Kick off automatic channel binding once connected, if configured
        if conn.connected && !conn.binding_started {
            if let Some(rt) = crate::trust::global_runtime() {
                // Delay NEW_TOKEN until binding completes
                conn.inner.set_delay_new_token_until_binding(true);

                // Clone everything the spawned tasks need before releasing any
                // borrow of `conn`.
                let hl_conn_server = Connection(self.0.clone());
                let hl_conn_client = hl_conn_server.clone();
                let store = rt.store.clone();
                let policy = rt.policy.clone();
                let signer = rt.local_secret_key.clone();
                let spki = rt.local_spki.clone();
                let runtime = conn.runtime.clone();

                // Server side: verify the peer's binding, then release NEW_TOKEN
                // frames; close the connection if verification fails.
                if conn.inner.side().is_server() {
                    runtime.spawn(Box::pin(async move {
                        match crate::trust::recv_verify_binding(&hl_conn_server, &*store, &policy)
                            .await
                        {
                            Ok(peer) => {
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("set peer")
                                    .inner
                                    .set_token_binding_peer_id(peer);
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("allow tokens")
                                    .inner
                                    .set_delay_new_token_until_binding(false);
                            }
                            Err(_e) => {
                                hl_conn_server.close(0u32.into(), b"channel binding failed");
                            }
                        }
                    }));
                }

                // Client side: derive the TLS exporter and send our binding.
                // Failures are intentionally ignored (best-effort).
                if conn.inner.side().is_client() {
                    runtime.spawn(Box::pin(async move {
                        if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
                            let _ =
                                crate::trust::send_binding(&hl_conn_client, &exp, &signer, &spki)
                                    .await;
                        }
                    }));
                }

                conn.binding_started = true;
            }
        }

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                // Park: record the waker so API calls can reschedule the driver.
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        // Drained connections are expected to carry a recorded error; this is
        // an internal invariant check, not a reachable application state.
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}
375
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
// Cheaply cloneable shared handle to the connection state.
pub struct Connection(ConnectionRef);
393
394impl Connection {
395    /// Initiate a new outgoing unidirectional stream.
396    ///
397    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
398    /// consequence, the peer won't be notified that a stream has been opened until the stream is
399    /// actually used.
400    pub fn open_uni(&self) -> OpenUni<'_> {
401        OpenUni {
402            conn: &self.0,
403            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
404        }
405    }
406
    /// Initiate a new outgoing bidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
    /// anything to [`SendStream`] will never succeed.
    ///
    /// [`open_bi()`]: Self::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
    pub fn open_bi(&self) -> OpenBi<'_> {
        OpenBi {
            conn: &self.0,
            // Fires when bidirectional stream budget becomes available.
            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
        }
    }
423
424    /// Accept the next incoming uni-directional stream
425    pub fn accept_uni(&self) -> AcceptUni<'_> {
426        AcceptUni {
427            conn: &self.0,
428            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
429        }
430    }
431
    /// Accept the next incoming bidirectional stream
    ///
    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
    ///
    /// [`accept_bi()`]: Self::accept_bi
    /// [`open_bi()`]: Self::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
    pub fn accept_bi(&self) -> AcceptBi<'_> {
        AcceptBi {
            conn: &self.0,
            // Fires when an incoming bidirectional stream is queued.
            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
        }
    }

    /// Receive an application datagram
    pub fn read_datagram(&self) -> ReadDatagram<'_> {
        ReadDatagram {
            conn: &self.0,
            // Fires when an application datagram arrives.
            notify: self.0.shared.datagram_received.notified(),
        }
    }
456
    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            // Fast path: the connection has already closed.
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            // Fallback if no error was recorded despite the close notification
            // (not expected); report the connection as locally closed.
            .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
            .clone()
    }
482
483    /// Check if this connection is still alive (not closed or draining).
484    ///
485    /// Returns `true` if the connection has not been closed for any reason.
486    /// This is useful for detecting phantom or stale connections that should
487    /// be cleaned up before attempting deduplication.
488    pub fn is_alive(&self) -> bool {
489        self.0.state.lock("is_alive").error.is_none()
490    }
491
    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }

    /// Whether ACK_RECEIVE_V1 is supported on this connection
    /// (delegates to the low-level connection state).
    pub(crate) fn supports_ack_receive_v1(&self) -> bool {
        self.0
            .state
            .lock("supports_ack_receive_v1")
            .inner
            .supports_ack_receive_v1()
    }
506
    /// Wake the connection driver to trigger immediate transmission of
    /// any pending frames. Call after queuing frames at the low level
    /// (e.g., PUNCH_ME_NOW) that bypass the stream API.
    pub fn wake_transmit(&self) {
        self.0.state.lock("wake_transmit").wake();
    }

    /// Close the connection immediately.
    ///
    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
    /// more data is sent to the peer and the peer may drop buffered data upon receiving
    /// the CONNECTION_CLOSE frame.
    ///
    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
    ///
    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
    /// is preserved in full, it should be kept under 1KiB.
    ///
    /// # Gracefully closing a connection
    ///
    /// Only the peer last receiving application data can be certain that all data is
    /// delivered. The only reliable action it can then take is to close the connection,
    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
    /// frame is very likely if both endpoints stay online long enough, and
    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
    /// remote peer will time out the connection, provided that the idle timeout is not
    /// disabled.
    ///
    /// The sending side can not guarantee all stream data is delivered to the remote
    /// application. It only knows the data is delivered to the QUIC stack of the remote
    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
    /// [`close()`] the remote endpoint may drop any data it received but is as yet
    /// undelivered to the application, including data that was acknowledged as received to
    /// the local endpoint.
    ///
    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
    /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
    /// [`close()`]: Connection::close
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let conn = &mut *self.0.state.lock("close");
        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
    }
550
    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
    /// dictated by the peer.
    ///
    /// Previously queued datagrams which are still unsent may be discarded to make space for this
    /// datagram, in order of oldest to newest.
    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
        let conn = &mut *self.0.state.lock("send_datagram");
        if let Some(ref x) = conn.error {
            return Err(SendDatagramError::ConnectionLost(x.clone()));
        }
        use crate::SendDatagramError::*;
        // `drop = true`: older queued datagrams may be discarded rather than
        // blocking (see doc comment above).
        match conn.inner.datagrams().send(data, true) {
            Ok(()) => {
                // Ensure the driver runs to actually flush the queued datagram.
                conn.wake();
                Ok(())
            }
            Err(e) => Err(match e {
                // With dropping enabled, the send can never report Blocked.
                Blocked(..) => unreachable!(),
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            }),
        }
    }
578
    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
    /// conditions, which effectively prioritizes old datagrams over new datagrams.
    ///
    /// See [`send_datagram()`] for details.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
        SendDatagram {
            conn: &self.0,
            // Held by the future until buffer space allows the send.
            data: Some(data),
            notify: self.0.shared.datagrams_unblocked.notified(),
        }
    }

    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }
614
615    /// Bytes available in the outgoing datagram buffer
616    ///
617    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
618    /// at most this size is guaranteed not to cause older datagrams to be dropped.
619    pub fn datagram_send_buffer_space(&self) -> usize {
620        self.0
621            .state
622            .lock("datagram_send_buffer_space")
623            .inner
624            .datagrams()
625            .send_buffer_space()
626    }
627
    /// Total number of application datagrams that have been dropped due to receive buffer overflow
    pub fn datagram_drop_stats(&self) -> DatagramDropStats {
        self.0
            .state
            .lock("datagram_drop_stats")
            .inner
            .stats()
            .datagram_drops
    }

    /// Wait for the next datagram drop notification
    pub fn on_datagram_drop(&self) -> DatagramDrop<'_> {
        DatagramDrop {
            conn: &self.0,
            // Fires when an incoming datagram is dropped.
            notify: self.0.shared.datagram_dropped.notified(),
        }
    }

    /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
    ///
    /// Returns the sequence number assigned to the advertisement by the
    /// low-level connection.
    pub fn send_nat_address_advertisement(
        &self,
        address: SocketAddr,
        priority: u32,
    ) -> Result<u64, crate::ConnectionError> {
        let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
        conn.inner.send_nat_address_advertisement(address, priority)
    }

    /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
    ///
    /// `paired_with_sequence_number` refers to a previously advertised address
    /// (see [`send_nat_address_advertisement`](Self::send_nat_address_advertisement)).
    pub fn send_nat_punch_coordination(
        &self,
        paired_with_sequence_number: u64,
        address: SocketAddr,
        round: u32,
    ) -> Result<(), crate::ConnectionError> {
        let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
        conn.inner
            .send_nat_punch_coordination(paired_with_sequence_number, address, round)
    }

    /// Queue a PUNCH_ME_NOW frame via a coordinator to reach a target peer behind NAT
    ///
    /// This sends a PUNCH_ME_NOW to the current connection (acting as coordinator)
    /// with a target peer ID, asking the coordinator to relay to the target peer.
    pub fn send_nat_punch_via_relay(
        &self,
        target_peer_id: [u8; 32],
        our_address: SocketAddr,
        round: u32,
    ) -> Result<(), crate::ConnectionError> {
        let conn = &mut *self.0.state.lock("send_nat_punch_via_relay");
        conn.inner
            .send_nat_punch_via_relay(target_peer_id, our_address, round)
    }
682
683    /// The side of the connection (client or server)
684    pub fn side(&self) -> Side {
685        self.0.state.lock("side").inner.side()
686    }
687
    /// The peer's UDP address
    ///
    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
    /// switching to a cellular internet connection.
    pub fn remote_address(&self) -> SocketAddr {
        self.0.state.lock("remote_address").inner.remote_address()
    }

    /// The external/reflexive address observed by the remote peer
    ///
    /// Returns the address that the remote peer has observed for this connection,
    /// as reported via OBSERVED_ADDRESS frames. This is useful for NAT traversal
    /// to learn the public address of this endpoint as seen by others.
    ///
    /// Returns `None` if:
    /// - Address discovery is not enabled
    /// - No OBSERVED_ADDRESS frame has been received yet
    /// - The connection hasn't completed the handshake
    pub fn observed_address(&self) -> Option<SocketAddr> {
        self.0
            .state
            .lock("observed_address")
            .inner
            .observed_address()
    }

    /// Returns all observed external addresses from all QUIC paths on this connection.
    ///
    /// Different paths may report different address families (IPv4 vs IPv6).
    pub fn all_observed_addresses(&self) -> Vec<SocketAddr> {
        self.0
            .state
            .lock("all_observed_addresses")
            .inner
            .all_observed_addresses()
    }

    /// Wait for the connection's observed-address set to change.
    ///
    /// Returns a `Notified` future tied to the shared
    /// `observed_address_updated` notifier.
    pub(crate) fn observed_address_updated(&self) -> Notified<'_> {
        self.0.shared.observed_address_updated.notified()
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See quinn_udp's RecvMeta::dst_ip for a list of
    /// supported platforms when using quinn_udp for I/O, which is the default.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.0.state.lock("local_ip").inner.local_ip()
    }
742
743    /// Current best estimate of this connection's latency (round-trip-time)
744    pub fn rtt(&self) -> Duration {
745        self.0.state.lock("rtt").inner.rtt()
746    }
747
748    /// Returns connection statistics
749    pub fn stats(&self) -> ConnectionStats {
750        self.0.state.lock("stats").inner.stats()
751    }
752
753    /// Current state of the congestion control algorithm, for debugging purposes
754    pub fn congestion_state(&self) -> Box<dyn Controller> {
755        self.0
756            .state
757            .lock("congestion_state")
758            .inner
759            .congestion_state()
760            .clone_box()
761    }
762
    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }
778
779    /// Cryptographic identity of the peer
780    ///
781    /// The dynamic type returned is determined by the configured
782    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
783    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
784    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
785        self.0
786            .state
787            .lock("peer_identity")
788            .inner
789            .crypto_session()
790            .peer_identity()
791    }
792
793    /// A stable identifier for this connection
794    ///
795    /// Peer addresses and connection IDs can change, but this value will remain
796    /// fixed for the lifetime of the connection.
797    pub fn stable_id(&self) -> usize {
798        self.0.stable_id()
799    }
800
801    /// Returns true if this connection negotiated post-quantum settings.
802    ///
803    /// This reflects either explicit PQC algorithms advertised via transport
804    /// parameters or in-band detection from handshake CRYPTO frames.
805    pub fn is_pqc(&self) -> bool {
806        let state = self.0.state.lock("is_pqc");
807        state.inner.is_pqc()
808    }
809
    /// Debug-only hint: returns true when the underlying TLS provider was
    /// configured to run in KEM-only (ML‑KEM) mode. This is a diagnostic aid
    /// for tests and does not itself guarantee group enforcement.
    ///
    /// Note: queries process-wide provider configuration; `&self` is unused.
    pub fn debug_kem_only(&self) -> bool {
        crate::crypto::rustls::debug_kem_only_enabled()
    }
816
817    /// Update traffic keys spontaneously
818    ///
819    /// This primarily exists for testing purposes.
820    pub fn force_key_update(&self) {
821        self.0
822            .state
823            .lock("force_key_update")
824            .inner
825            .force_key_update()
826    }
827
828    /// Derive keying material from this connection's TLS session secrets.
829    ///
830    /// When both peers call this method with the same `label` and `context`
831    /// arguments and `output` buffers of equal length, they will get the
832    /// same sequence of bytes in `output`. These bytes are cryptographically
833    /// strong and pseudorandom, and are suitable for use as keying material.
834    ///
835    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
836    pub fn export_keying_material(
837        &self,
838        output: &mut [u8],
839        label: &[u8],
840        context: &[u8],
841    ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
842        self.0
843            .state
844            .lock("export_keying_material")
845            .inner
846            .crypto_session()
847            .export_keying_material(output, label, context)
848    }
849
850    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
851    ///
852    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
853    /// `count`s increase both minimum and worst-case memory consumption.
854    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
855        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
856        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
857        // May need to send MAX_STREAMS to make progress
858        conn.wake();
859    }
860
861    /// See [`crate::TransportConfig::receive_window()`]
862    pub fn set_receive_window(&self, receive_window: VarInt) {
863        let mut conn = self.0.state.lock("set_receive_window");
864        conn.inner.set_receive_window(receive_window);
865        conn.wake();
866    }
867
868    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
869    ///
870    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
871    /// `count`s increase both minimum and worst-case memory consumption.
872    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
873        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
874        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
875        // May need to send MAX_STREAMS to make progress
876        conn.wake();
877    }
878
879    /// Set up qlog for this connection.
880    #[cfg(feature = "__qlog")]
881    pub fn set_qlog(
882        &mut self,
883        writer: Box<dyn std::io::Write + Send + Sync>,
884        title: Option<String>,
885        description: Option<String>,
886    ) {
887        let mut state = self.0.state.lock("__qlog");
888        state
889            .inner
890            .set_qlog(writer, title, description, Instant::now());
891    }
892}
893
pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        // Handle to the connection the stream will be opened on
        conn: &'a ConnectionRef,
        // Re-armed notification for stream ID budget becoming available
        #[pin]
        notify: Notified<'a>,
    }
}
902
903impl Future for OpenUni<'_> {
904    type Output = Result<SendStream, ConnectionError>;
905    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
906        let this = self.project();
907        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
908        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
909    }
910}
911
pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        // Handle to the connection the stream will be opened on
        conn: &'a ConnectionRef,
        // Re-armed notification for stream ID budget becoming available
        #[pin]
        notify: Notified<'a>,
    }
}
920
921impl Future for OpenBi<'_> {
922    type Output = Result<(SendStream, RecvStream), ConnectionError>;
923    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
924        let this = self.project();
925        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
926
927        Poll::Ready(Ok((
928            SendStream::new(conn.clone(), id, is_0rtt),
929            RecvStream::new(conn, id, is_0rtt),
930        )))
931    }
932}
933
/// Shared poll logic for [`OpenUni`]/[`OpenBi`]: try to open a locally
/// initiated stream of direction `dir`, or register for a wakeup when stream
/// ID budget becomes available. On success returns the connection handle, the
/// new stream's ID, and whether the stream is 0-RTT (client still handshaking).
fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    // Budget exhausted: park on the notify. The loop handles notifications
    // consumed before we got here by re-arming a fresh future each time.
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}
959
pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        // Handle to the connection streams are accepted from
        conn: &'a ConnectionRef,
        // Re-armed notification for peer-initiated streams arriving
        #[pin]
        notify: Notified<'a>,
    }
}
968
969impl Future for AcceptUni<'_> {
970    type Output = Result<RecvStream, ConnectionError>;
971
972    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
973        let this = self.project();
974        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
975        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
976    }
977}
978
pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        // Handle to the connection streams are accepted from
        conn: &'a ConnectionRef,
        // Re-armed notification for peer-initiated streams arriving
        #[pin]
        notify: Notified<'a>,
    }
}
987
988impl Future for AcceptBi<'_> {
989    type Output = Result<(SendStream, RecvStream), ConnectionError>;
990
991    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
992        let this = self.project();
993        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
994        Poll::Ready(Ok((
995            SendStream::new(conn.clone(), id, is_0rtt),
996            RecvStream::new(conn, id, is_0rtt),
997        )))
998    }
999}
1000
/// Shared poll logic for [`AcceptUni`]/[`AcceptBi`]: take the next
/// peer-initiated stream of direction `dir`, or register for a wakeup when one
/// arrives. On success returns the connection handle, the stream's ID, and
/// whether the stream is 0-RTT (connection still handshaking).
fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    // Nothing pending: park on the notify, re-arming on spurious wakeups.
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}
1027
pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        // Handle to the connection datagrams are read from
        conn: &'a ConnectionRef,
        // Re-armed notification for datagram arrival
        #[pin]
        notify: Notified<'a>,
    }
}
1036
1037impl Future for ReadDatagram<'_> {
1038    type Output = Result<Bytes, ConnectionError>;
1039    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1040        let mut this = self.project();
1041        let mut state = this.conn.state.lock("ReadDatagram::poll");
1042        // Check for buffered datagrams before checking `state.error` so that already-received
1043        // datagrams, which are necessarily finite, can be drained from a closed connection.
1044        match state.inner.datagrams().recv() {
1045            Some(x) => {
1046                return Poll::Ready(Ok(x));
1047            }
1048            _ => {
1049                if let Some(ref e) = state.error {
1050                    return Poll::Ready(Err(e.clone()));
1051                }
1052            }
1053        }
1054        loop {
1055            match this.notify.as_mut().poll(ctx) {
1056                // `state` lock ensures we didn't race with readiness
1057                Poll::Pending => return Poll::Pending,
1058                // Spurious wakeup, get a new future
1059                Poll::Ready(()) => this
1060                    .notify
1061                    .set(this.conn.shared.datagram_received.notified()),
1062            }
1063        }
1064    }
1065}
1066
pin_project! {
    /// Future produced by [`Connection::on_datagram_drop`]
    pub struct DatagramDrop<'a> {
        // Handle to the connection whose drop events are observed
        conn: &'a ConnectionRef,
        // Re-armed notification for datagram-drop events
        #[pin]
        notify: Notified<'a>,
    }
}
1075
1076impl Future for DatagramDrop<'_> {
1077    type Output = Result<DatagramDropStats, ConnectionError>;
1078
1079    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1080        let mut this = self.project();
1081        let mut state = this.conn.state.lock("DatagramDrop::poll");
1082        if let Some(drop) = state.datagram_drop_events.pop_front() {
1083            return Poll::Ready(Ok(drop));
1084        }
1085        if let Some(ref e) = state.error {
1086            return Poll::Ready(Err(e.clone()));
1087        }
1088        loop {
1089            match this.notify.as_mut().poll(ctx) {
1090                // `state` lock ensures we didn't race with readiness
1091                Poll::Pending => return Poll::Pending,
1092                Poll::Ready(()) => this
1093                    .notify
1094                    .set(this.conn.shared.datagram_dropped.notified()),
1095            }
1096        }
1097    }
1098}
1099
pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        // Handle to the connection the datagram is sent on
        conn: &'a ConnectionRef,
        // Payload; taken out on each attempt and restored when blocked
        data: Option<Bytes>,
        #[pin]
        notify: Notified<'a>,
    }
}
1109
impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use crate::SendDatagramError::*;
        // `data` is only `None` if a previous poll already consumed it; that
        // would mean the future was polled again after completing, which we
        // treat as an error rather than panicking.
        match state.inner.datagrams().send(
            this.data.take().ok_or_else(|| {
                error!("SendDatagram future polled without data");
                SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
            })?,
            false,
        ) {
            Ok(()) => {
                // Wake the driver so the datagram is actually transmitted
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    // Put the payload back for the next attempt, then park on
                    // the unblocked-notify; the loop re-arms on spurious wakeups.
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                // Map the remaining proto-level errors onto the public error type
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}
1150
/// Reference-counted handle to the shared connection state; cloning bumps an
/// internal handle count used to decide when to implicitly close (see `Drop`).
#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1153
impl ConnectionRef {
    /// Build the shared state for a fresh connection.
    ///
    /// `on_handshake_data`/`on_connected` are fired by the driver when the
    /// corresponding events surface from the proto-level connection;
    /// `endpoint_events`/`conn_events` are the two halves of the channel pair
    /// linking this connection to the endpoint driver.
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        on_handshake_data: oneshot::Sender<()>,
        on_connected: oneshot::Sender<bool>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        Self(Arc::new(ConnectionInner {
            state: Mutex::new(State {
                inner: conn,
                driver: None,
                handle,
                on_handshake_data: Some(on_handshake_data),
                on_connected: Some(on_connected),
                connected: false,
                timer: None,
                timer_deadline: None,
                conn_events,
                endpoint_events,
                blocked_writers: FxHashMap::default(),
                blocked_readers: FxHashMap::default(),
                stopped: FxHashMap::default(),
                error: None,
                // Starts at 0; `Clone` increments and `Drop` decrements
                ref_count: 0,
                datagram_drop_events: VecDeque::new(),
                // Poller is derived from the socket before the socket is moved in
                io_poller: socket.clone().create_io_poller(),
                socket,
                runtime,
                send_buffer: Vec::new(),
                buffered_transmit: None,
                binding_started: false,
            }),
            shared: Shared::default(),
        }))
    }

    /// Stable identity derived from the allocation address of the shared state;
    /// valid for as long as any handle (or the driver) keeps the Arc alive.
    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}
1199
1200impl Clone for ConnectionRef {
1201    fn clone(&self) -> Self {
1202        self.state.lock("clone").ref_count += 1;
1203        Self(self.0.clone())
1204    }
1205}
1206
impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        // `checked_sub` guards against underflow if `ref_count` is already 0
        // (e.g. for a reference that was never counted via `Clone`).
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}
1222
1223impl std::ops::Deref for ConnectionRef {
1224    type Target = ConnectionInner;
1225    fn deref(&self) -> &Self::Target {
1226        &self.0
1227    }
1228}
1229
/// State shared between all handles to one connection and its driver.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    // Mutex-protected mutable connection state
    pub(crate) state: Mutex<State>,
    // Lock-free notification primitives for blocked futures
    pub(crate) shared: Shared,
}
1235
#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    /// Notified when an application datagram has been received
    datagram_received: Notify,
    /// Notified when outgoing datagram sends may proceed after being blocked
    datagrams_unblocked: Notify,
    /// Notified when incoming datagrams were dropped (receive-buffer overflow)
    datagram_dropped: Notify,
    /// Notified when the set of externally observed addresses changes
    observed_address_updated: Notify,
    /// Notified when the connection terminates for any reason
    closed: Notify,
}
1249
pub(crate) struct State {
    /// The proto-level connection state machine
    pub(crate) inner: crate::Connection,
    /// Waker for the driver task; taken and fired by `wake()`
    driver: Option<Waker>,
    /// This connection's handle, used to tag events sent to the endpoint
    handle: ConnectionHandle,
    /// Fired once when handshake data becomes available
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Fired once on connection establishment (payload: whether 0-RTT was accepted)
    /// or with `false` on termination
    on_connected: Option<oneshot::Sender<bool>>,
    /// Set once the `Connected` event has been observed
    connected: bool,
    /// Runtime timer driving loss detection / idle timeouts; see `drive_timer`
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    /// Deadline the timer is currently armed for, to avoid redundant resets
    timer_deadline: Option<Instant>,
    /// Events arriving from the endpoint driver
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Events forwarded to the endpoint driver, tagged with `handle`
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Wakers for tasks blocked on stream writes, keyed by stream
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    /// Wakers for tasks blocked on stream reads, keyed by stream
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    /// Notifies for tasks waiting on stream finish/stop, keyed by stream
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    /// Queue of datagram-drop events surfaced to `DatagramDrop` futures
    datagram_drop_events: VecDeque<DatagramDropStats>,
    /// UDP socket used for I/O; replaced on `ConnectionEvent::Rebind`
    socket: Arc<dyn AsyncUdpSocket>,
    /// Writability poller derived from `socket`; recreated on rebind
    io_poller: Pin<Box<dyn UdpPoller>>,
    /// Runtime abstraction providing timers and the current time
    runtime: Arc<dyn Runtime>,
    /// Scratch buffer reused for encoding outgoing packets
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<crate::Transmit>,
    /// True once we've initiated automatic channel binding (if enabled)
    binding_started: bool,
}
1278
1279impl State {
    /// Encode and send as many outgoing datagrams as the socket will accept.
    ///
    /// Returns `Ok(true)` when the per-call transmit budget
    /// (`MAX_TRANSMIT_DATAGRAMS`) was exhausted and more work may remain;
    /// `Ok(false)` when there is nothing further to send right now (either the
    /// connection produced no more transmits, or the socket is not writable
    /// and a wakeup has been registered via `io_poller`).
    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
        let now = self.runtime.now();
        let mut transmits = 0;

        // Cap GSO batch size by what the socket supports
        let max_datagrams = self
            .socket
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            // Count individual datagrams: a segmented (GSO)
                            // transmit carries size/segment_size of them
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => t.size.div_ceil(s), // round up
                            };
                            t
                        }
                        None => break,
                    }
                }
            };

            if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
                // Retry after a future wakeup
                self.buffered_transmit = Some(t);
                return Ok(false);
            }

            let len = t.size;
            let retry = match self
                .socket
                .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
            {
                Ok(()) => false,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
                Err(e) => return Err(e),
            };
            if retry {
                // We thought the socket was writable, but it wasn't. Retry so that either another
                // `poll_writable` call determines that the socket is indeed not writable and
                // registers us for a wakeup, or the send succeeds if this really was just a
                // transient failure.
                self.buffered_transmit = Some(t);
                continue;
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }
1347
1348    fn forward_endpoint_events(&mut self) {
1349        while let Some(event) = self.inner.poll_endpoint_events() {
1350            // If the endpoint driver is gone, noop.
1351            let _ = self.endpoint_events.send((self.handle, event));
1352        }
1353    }
1354
    /// Drain events arriving from the endpoint driver and apply them to the
    /// proto-level connection.
    ///
    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
    fn process_conn_events(
        &mut self,
        shared: &Shared,
        cx: &mut Context,
    ) -> Result<(), ConnectionError> {
        loop {
            match self.conn_events.poll_recv(cx) {
                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
                    // Swap in the new socket and rebuild its writability poller
                    self.socket = socket;
                    self.io_poller = self.socket.clone().create_io_poller();
                    self.inner.local_address_changed();
                }
                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
                    // Snapshot the (sorted, deduped) observed-address set so we
                    // can detect whether handling this event changed it
                    let mut observed_before = self.inner.all_observed_addresses();
                    observed_before.sort_unstable();
                    observed_before.dedup();

                    self.inner.handle_event(event);

                    let mut observed_after = self.inner.all_observed_addresses();
                    observed_after.sort_unstable();
                    observed_after.dedup();
                    if observed_before != observed_after {
                        // NOTE(review): wakes a single waiter, unlike the
                        // notify_waiters used elsewhere — confirm the
                        // single-waiter assumption for observed_address_updated
                        shared.observed_address_updated.notify_one();
                    }
                }
                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
                    self.close(error_code, reason, shared);
                }
                Poll::Ready(None) => {
                    // Channel closed: the endpoint driver future was dropped
                    return Err(ConnectionError::TransportError(crate::TransportError {
                        code: crate::TransportErrorCode::INTERNAL_ERROR,
                        frame: None,
                        reason: "endpoint driver future was dropped".to_string(),
                    }));
                }
                Poll::Pending => {
                    return Ok(());
                }
            }
        }
    }
1398
    /// Dispatch application-facing events from the proto-level connection to
    /// the blocked futures and notifies that are waiting on them.
    fn forward_app_events(&mut self, shared: &Shared) {
        while let Some(event) = self.inner.poll() {
            use crate::Event::*;
            match event {
                HandshakeDataReady => {
                    // Receiver may have been dropped; ignore send failure
                    if let Some(x) = self.on_handshake_data.take() {
                        let _ = x.send(());
                    }
                }
                Connected => {
                    self.connected = true;
                    if let Some(x) = self.on_connected.take() {
                        // We don't care if the on-connected future was dropped
                        let _ = x.send(self.inner.accepted_0rtt());
                    }
                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
                        // Wake up rejected 0-RTT streams so they can fail immediately with
                        // `ZeroRttRejected` errors.
                        wake_all(&mut self.blocked_writers);
                        wake_all(&mut self.blocked_readers);
                        wake_all_notify(&mut self.stopped);
                    }
                }
                ConnectionLost { reason } => {
                    self.terminate(reason, shared);
                }
                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
                }
                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
                }
                DatagramReceived => {
                    shared.datagram_received.notify_waiters();
                }
                DatagramsUnblocked => {
                    shared.datagrams_unblocked.notify_waiters();
                }
                DatagramDropped(drop) => {
                    // Buffer overflow - surface to application via dedicated queue and notify
                    tracing::debug!(
                        datagrams = drop.datagrams,
                        bytes = drop.bytes,
                        "datagrams dropped due to receive buffer overflow"
                    );
                    self.datagram_drop_events.push_back(drop);
                    shared.datagram_dropped.notify_waiters();
                }
                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
                Stream(StreamEvent::Available { dir }) => {
                    // Might mean any number of streams are ready, so we wake up everyone
                    shared.stream_budget_available[dir as usize].notify_waiters();
                }
                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
                Stream(StreamEvent::Stopped { id, .. }) => {
                    // A stopped stream both unblocks `stopped` waiters and any writer
                    wake_stream_notify(id, &mut self.stopped);
                    wake_stream(id, &mut self.blocked_writers);
                }
            }
        }
    }
1461
1462    fn drive_timer(&mut self, cx: &mut Context) -> bool {
1463        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1464        // timer is registered with the runtime (and check whether it's already
1465        // expired).
1466        match self.inner.poll_timeout() {
1467            Some(deadline) => {
1468                if let Some(delay) = &mut self.timer {
1469                    // There is no need to reset the tokio timer if the deadline
1470                    // did not change
1471                    if self
1472                        .timer_deadline
1473                        .map(|current_deadline| current_deadline != deadline)
1474                        .unwrap_or(true)
1475                    {
1476                        delay.as_mut().reset(deadline);
1477                    }
1478                } else {
1479                    self.timer = Some(self.runtime.new_timer(deadline));
1480                }
1481                // Store the actual expiration time of the timer
1482                self.timer_deadline = Some(deadline);
1483            }
1484            None => {
1485                self.timer_deadline = None;
1486                return false;
1487            }
1488        }
1489
1490        if self.timer_deadline.is_none() {
1491            return false;
1492        }
1493
1494        let delay = match self.timer.as_mut() {
1495            Some(timer) => timer.as_mut(),
1496            None => {
1497                error!("Timer missing in state where it should exist");
1498                return false;
1499            }
1500        };
1501        if delay.poll(cx).is_pending() {
1502            // Since there wasn't a timeout event, there is nothing new
1503            // for the connection to do
1504            return false;
1505        }
1506
1507        // A timer expired, so the caller needs to check for
1508        // new transmits, which might cause new timers to be set.
1509        self.inner.handle_timeout(self.runtime.now());
1510        self.timer_deadline = None;
1511        true
1512    }
1513
1514    /// Wake up a blocked `Driver` task to process I/O
1515    pub(crate) fn wake(&mut self) {
1516        if let Some(x) = self.driver.take() {
1517            x.wake();
1518        }
1519    }
1520
1521    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1522    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1523        self.error = Some(reason.clone());
1524        if let Some(x) = self.on_handshake_data.take() {
1525            let _ = x.send(());
1526        }
1527        wake_all(&mut self.blocked_writers);
1528        wake_all(&mut self.blocked_readers);
1529        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1530        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1531        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1532        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1533        shared.datagram_received.notify_waiters();
1534        shared.datagrams_unblocked.notify_waiters();
1535        shared.datagram_dropped.notify_waiters();
1536        shared.observed_address_updated.notify_waiters();
1537        if let Some(x) = self.on_connected.take() {
1538            let _ = x.send(false);
1539        }
1540        wake_all_notify(&mut self.stopped);
1541        shared.closed.notify_waiters();
1542    }
1543
    /// Begin closing the connection with the given application error code and
    /// reason, then unblock every task waiting on it.
    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
        self.inner.close(self.runtime.now(), error_code, reason);
        self.terminate(ConnectionError::LocallyClosed, shared);
        // Kick the driver so the close is processed promptly.
        self.wake();
    }
1549
1550    /// Close for a reason other than the application's explicit request
1551    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1552        self.close(0u32.into(), Bytes::new(), shared);
1553    }
1554
1555    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1556        if self.inner.is_handshaking()
1557            || self.inner.accepted_0rtt()
1558            || self.inner.side().is_server()
1559        {
1560            Ok(())
1561        } else {
1562            Err(())
1563        }
1564    }
1565}
1566
1567impl Drop for State {
1568    fn drop(&mut self) {
1569        if !self.inner.is_drained() {
1570            // Ensure the endpoint can tidy up
1571            let _ = self
1572                .endpoint_events
1573                .send((self.handle, crate::EndpointEvent::drained()));
1574        }
1575    }
1576}
1577
1578impl fmt::Debug for State {
1579    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1580        f.debug_struct("State").field("inner", &self.inner).finish()
1581    }
1582}
1583
1584fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1585    if let Some(waker) = wakers.remove(&stream_id) {
1586        waker.wake();
1587    }
1588}
1589
1590fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1591    wakers.drain().for_each(|(_, waker)| waker.wake())
1592}
1593
1594fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1595    if let Some(notify) = wakers.remove(&stream_id) {
1596        notify.notify_waiters()
1597    }
1598}
1599
1600fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1601    wakers
1602        .drain()
1603        .for_each(|(_, notify)| notify.notify_waiters())
1604}
1605
/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    ///
    /// Carries the underlying [`ConnectionError`]; constructible via `From`.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}
1625
/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// The maximum amount of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;